正确的使用 Golang 的 Channel 进行并发通信
基础的Channel操作这里不再赘言,讨论一个重要的点:如何正确的关闭一个Channel
场景一:
func Publish(text string, delay time.Duration) (wait <-chan struct{}) {
ch := make(chan struct{})
go func() {
time.Sleep(delay)
fmt.Println("BREAKING NEWS:", text)
close(ch) // 广播 - 一个关闭的管道都会发送一个零值
}()
return ch
}
func main() {
wait := Publish("Channels let goroutines communicate.", 5*time.Second)
fmt.Println("Waiting for the news...")
<-wait
fmt.Println("The news is out, time to leave.")
}
这段代码的输出是:
Waiting for the news...
BREAKING NEWS: Channels let goroutines communicate.
The news is out, time to leave.
在这个场景中,sender是Publish,receiver是main函数。在新闻被广播之后,sender主动关闭了Channel,receiver收到零值解除阻塞。一切都是正常且容易理解的。
场景二:
func Publish(text string, pipe chan string) {
for {
pipe <- text
}
}
func main() {
pipe := make(chan string)
go Publish("Channels let goroutines communicate.", pipe)
fmt.Println("Waiting for the news...")
for i := 0; i < 5; i++ {
text := <-pipe
fmt.Println(text)
}
close(pipe)
fmt.Println("The news is out, time to leave.")
time.Sleep(time.Second)
}
这段代码的输出时:
panic: send on closed channel
goroutine 5 [running]:
main.Publish(0x1443c8, 0x24, 0x1042e040, 0x0)
/tmp/sandbox452988919/main.go:10 +0x60
created by main.main
/tmp/sandbox452988919/main.go:16 +0xa0
goroutine 1 [sleep]:
main.main()
/tmp/sandbox452988919/main.go:24 +0x3c0
goroutine 6 [runnable]:
runtime.timerproc()
/usr/src/go/src/runtime/time.go:151
runtime.goexit()
/usr/src/go/src/runtime/asm_amd64p32.s:1086 +0x1
在这个场景中,sender依然是Publish,receiver也依然是main函数。但是和上个场景的区别在于,这次是receiver主动关闭了Channel,这就造成往一个已经被关闭的Channel写入数据的问题。
Golang并没有提供检测一个Channel在写入时是否已经被关闭的方法。然而在一些复杂的业务逻辑中,很可能需要receiver来关闭Channel,如何解决呢。按照Golang的风格, 这种情况应该使用其他的辅助逻辑来让关闭Channel的始终是sender,这样可能会使逻辑变得非常复杂。不过也有一种比较好的方法来以一种容易理解的方式解决这个问题。
如果不想使逻辑更加复杂,在读写Channel时加锁也可以达到效果。
以下为错误示例
场景二的改进代码:
type Pipe struct {
pipe chan string
closed chan bool
}
func Publish(text string, pipeStruct *Pipe) {
for {
select {
case <-pipeStruct.closed:
close(pipeStruct.pipe)
return
case pipeStruct.pipe <- text:
}
}
}
func main() {
pipeStruct := &Pipe{
pipe: make(chan string),
closed: make(chan bool),
}
go Publish("Channels let goroutines communicate.", pipeStruct)
fmt.Println("Waiting for the news...")
for i := 0; i < 5; i++ {
text := <-pipeStruct.pipe
fmt.Println(text)
}
close(pipeStruct.closed)
fmt.Println("The news is out, time to leave.")
time.Sleep(time.Second)
}
这里使用select来进行写入,select的case的执行是根据代码的顺序,当有一个或多个的case都是非block时,随机选择一个非block的case执行。当close了一个Channel时,第一个case为非block,有效的避免了panic