Go 并发控制context实现原理剖析(小结)

2020-01-28 13:31:00王冬梅

上面代码中协程HandelRequest()用于处理某个请求,其又会创建两个协程:WriteRedis()、WriteDatabase(),main协程创建创建context,并把context在各子协程间传递,main协程在适当的时机可以cancel掉所有子协程。

程序输出如下所示:


HandelRequest running
WriteDatabase running
WriteRedis running
HandelRequest running
WriteDatabase running
WriteRedis running
HandelRequest running
WriteDatabase running
WriteRedis running
It's time to stop all sub goroutines!
WriteDatabase Done.
HandelRequest Done.
WriteRedis Done.

2.4 timerCtx

源码包中src/context/context.go:timerCtx 定义了该类型context:


type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

timerCtx在cancelCtx基础上增加了deadline用于标示自动cancel的最终时间,而timer就是一个触发自动cancel的定时器。

由此,衍生出WithDeadline()和WithTimeout()。实现上这两种类型实现原理一样,只不过使用语境不一样:

deadline: 指定最后期限,比如context将2018.10.20 00:00:00之时自动结束 timeout: 指定最长存活时间,比如context将在30s后结束。

对于接口来说,timerCtx在cancelCtx基础上还需要实现Deadline()和cancel()方法,其中cancel()方法是重写的。

2.4.1 Deadline()接口实现

Deadline()方法仅仅是返回timerCtx.deadline而矣。而timerCtx.deadline是WithDeadline()或WithTimeout()方法设置的。

2.4.2 cancel()接口实现

cancel()方法基本继承cancelCtx,只需要额外把timer关闭。

timerCtx被关闭后,timerCtx.cancelCtx.err将会存储关闭原因:

如果deadline到来之前手动关闭,则关闭原因与cancelCtx显示一致; 如果deadline到来时自动关闭,则原因为:"context deadline exceeded"

2.4.3 WithDeadline()方法实现

WithDeadline()方法实现步骤如下:

初始化一个timerCtx实例 将timerCtx实例添加到其父节点的children中(如果父节点也可以被cancel的话) 启动定时器,定时器到期后会自动cancel本context 返回timerCtx实例和cancel()方法

也就是说,timerCtx类型的context不仅支持手动cancel,也会在定时器到来后自动cancel。

2.4.4 WithTimeout()方法实现

WithTimeout()实际调用了WithDeadline,二者实现原理一致。

看代码会非常清晰:


func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

2.4.5 典型使用案例

下面例子中使用WithTimeout()获得一个context并在其了协程中传递:


package main

import (
  "fmt"
  "time"
  "context"
)

func HandelRequest(ctx context.Context) {
  go WriteRedis(ctx)
  go WriteDatabase(ctx)
  for {
    select {
    case <-ctx.Done():
      fmt.Println("HandelRequest Done.")
      return
    default:
      fmt.Println("HandelRequest running")
      time.Sleep(2 * time.Second)
    }
  }
}

func WriteRedis(ctx context.Context) {
  for {
    select {
    case <-ctx.Done():
      fmt.Println("WriteRedis Done.")
      return
    default:
      fmt.Println("WriteRedis running")
      time.Sleep(2 * time.Second)
    }
  }
}

func WriteDatabase(ctx context.Context) {
  for {
    select {
    case <-ctx.Done():
      fmt.Println("WriteDatabase Done.")
      return
    default:
      fmt.Println("WriteDatabase running")
      time.Sleep(2 * time.Second)
    }
  }
}

func main() {
  ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
  go HandelRequest(ctx)

  time.Sleep(10 * time.Second)
}