传统同步机制主要指面向共享内存的同步机制,比如排它锁、共享锁等。这两种情况导致的泄露还是比较常见的。go 由于 defer 的存在,第二类情况,一般情况下还是比较容易避免的。
chanel 引起的泄露
先说 channel,如果之前读过官方的那篇并发的文章[1],翻译版[2],你会发现 channel 的使用,一个不小心就泄露了。我们来具体总结下那些情况下可能导致。
发送不接收
我们知道,发送者一般都会配有相应的接收者。理想情况下,我们希望接收者总能接收完所有发送的数据,这样就不会有任何问题。但现实是,一旦接收者发生异常退出,停止继续接收上游数据,发送者就会被阻塞。这个情况在 前面说的文章[3] 中有非常细致的介绍。
示例代码:
package main
import "time"
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func main() {
defer func() {
fmt.Println("the number of goroutines: ", runtime.NumGoroutine())
}()
// Set up the pipeline.
out := gen(2, 3)
for n := range out {
fmt.Println(n) // 2
time.Sleep(5 * time.Second) // done thing, 可能异常中断接收
if true { // if err != nil
break
}
}
}
例子中,发送者通过 out chan 向下游发送数据,main 函数接收数据,接收者通常会依据接收到的数据做一些具体的处理,这里用 Sleep 代替。如果这期间发生异常,导致处理中断,退出循环。gen 函数中启动的 goroutine 并不会退出。
如何解决?
此处的主要问题在于,当接收者停止工作,发送者并不知道,还在傻傻地向下游发送数据。故而,我们需要一种机制去通知发送者。我直接说答案吧,就不循渐进了。Go 可以通过 channel 的关闭向所有的接收者发送广播信息。
修改后的代码:
package main
import "time"
func gen(done chan struct{}, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done:
return
}
}
}()
return out
}
func main() {
defer func() {
time.Sleep(time.Second)
fmt.Println("the number of goroutines: ", runtime.NumGoroutine())
}()
// Set up the pipeline.
done := make(chan struct{})
defer close(done)
out := gen(done, 2, 3)
for n := range out {
fmt.Println(n) // 2
time.Sleep(5 * time.Second) // done thing, 可能异常中断接收
if true { // if err != nil
break
}
}
}
函数 gen 中通过 select 实现 2 个 channel 的同时处理。当异常发生时,将进入 <-done 分支,实现 goroutine 退出。这里为了演示效果,保证资源顺利释放,退出时等待了几秒保证释放完成。
执行后的输出如下:
the number of goroutines: 1









