Go 并发控制context实现原理剖析(小结)

2020-01-28 13:31:00王冬梅

cancelCtx.err默认是nil,在context被cancel时指定一个error变量: var Canceled = errors.New("context canceled")

2.3.3 cancel()接口实现

cancel()内部方法是理解cancelCtx的最关键的方法,其作用是关闭自己和其后代,其后代存储在cancelCtx.children的map中,其中key值即后代对象,value值并没有意义,这里使用map只是为了方便查询而已。

cancel方法实现伪代码如下所示:


func (c *cancelCtx) cancel(removeFromParent bool, err error) {
  c.mu.Lock()
	
  c.err = err	           //设置一个error,说明关闭原因
  close(c.done)           //将channel关闭,以此通知派生的context
	
  for child := range c.children {  //遍历所有children,逐个调用cancel方法
    child.cancel(false, err)
  }
  c.children = nil
  c.mu.Unlock()

  if removeFromParent {      //正常情况下,需要将自己从parent删除
    removeChild(c.Context, c)
  }
}

实际上,WithCancel()返回的第二个用于cancel context的方法正是此cancel()。

2.3.4 WithCancel()方法实现

WithCancel()方法作了三件事:

初始化一个cancelCtx实例 将cancelCtx实例添加到其父节点的children中(如果父节点也可以被cancel的话) 返回cancelCtx实例和cancel()方法

其实现源码如下所示:


func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)  //将自身添加到父节点
	return &c, func() { c.cancel(true, Canceled) }
}

这里将自身添加到父节点的过程有必要简单说明一下:

如果父节点也支持cancel,也就是说其父节点肯定有children成员,那么把新context添加到children里即可; 如果父节点不支持cancel,就继续向上查询,直到找到一个支持cancel的节点,把新context添加到children里; 如果所有的父节点均不支持cancel,则启动一个协程等待父节点结束,然后再把当前context结束。

2.3.5 典型使用案例

一个典型的使用cancel context的例子如下所示:


package main

import (
  "fmt"
  "time"
  "context"
)

func HandelRequest(ctx context.Context) {
  go WriteRedis(ctx)
  go WriteDatabase(ctx)
  for {
    select {
    case <-ctx.Done():
      fmt.Println("HandelRequest Done.")
      return
    default:
      fmt.Println("HandelRequest running")
      time.Sleep(2 * time.Second)
    }
  }
}

func WriteRedis(ctx context.Context) {
  for {
    select {
    case <-ctx.Done():
      fmt.Println("WriteRedis Done.")
      return
    default:
      fmt.Println("WriteRedis running")
      time.Sleep(2 * time.Second)
    }
  }
}

func WriteDatabase(ctx context.Context) {
  for {
    select {
    case <-ctx.Done():
      fmt.Println("WriteDatabase Done.")
      return
    default:
      fmt.Println("WriteDatabase running")
      time.Sleep(2 * time.Second)
    }
  }
}

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  go HandelRequest(ctx)

  time.Sleep(5 * time.Second)
  fmt.Println("It's time to stop all sub goroutines!")
  cancel()

  //Just for test whether sub goroutines exit or not
  time.Sleep(5 * time.Second)
}