Golang 手写一个简单的并发任务 manager

2022-08-31 15:07:30
目录
前言errgroup需求拆解实战代码JobJobManager错误处理及时退出完整代码小结

前言

今天也是偏实战的内容,作为一个并发复习课,很简单,我们来看看怎样实现一个并发任务>

在微服务的场景下,我们有很多任务的执行是没有明确的先后顺序的,比如一个接口同时要做到任务 A 和 任务 B,两个任务分别拿到一些数据,最后组装裁剪后通过接口下发。

此时,A 和 B 两个任务没有依赖关系,如果我们串行来执行,会拖慢整个任务的执行节奏,用并发的方式来优化是一个方向。

那怎么实现呢?

errgroup

一个常见的想法是用>

今天我们不打算用这种实现,希望用更加基础的组件来引发思考,看看如何活用 sync 包提供的基础能力。另外一点是 errgroup 也有他的缺陷,如果在启动的协程中没有手动 recover,那么一旦在我们的任务中出现 panic,整个程序就 crash 了。

这一点还是很有争议的,很多开发者认为这是符合预期的,也有一些开发者希望在 New 一个 errgroup 的时候能够提供 option 控制是否来 recover。近期还有两个 issue 在进行激烈的讨论,目前看没有定论。

感兴趣的同学可以看下这两个 issue:

    x/sync/errgroup: why not recover the fn's err in errgroup #40484proposal: x/sync/errgroup: propagate panics and Goexits through Wait #53757

    需求拆解

    ok,我们来试着用>

      一定要能做到并发执行各个任务,开多个协程,而不是在一个 main goroutine 里串行执行各个任务;并发安全,我们当然不希望出现数据异常,不希望并发执行任务导致最后程序因为 runtime error 而挂掉;如果多个任务都失败,只返回一个 error 即可;能够 recover from panic,不需要开发者使用的时候再手动去写 recover 逻辑;性能有保障。

      并发执行这一点我们可以借助 sync.WaitGroup 的能力,每次启动一个goroutine,WaitGroup 就加 1,在 defer 里完成 Done,启动所有 goroutine 之后,等着 Wait 返回结果即可。常规的能力复用。

      需要额外处理的地方在于,怎么实现多个线程只有一个 error 能赋值,以及 recover 的适配。

      实战代码

      我们理一下思路,看看代码怎么写。

      Job

      首先一定需要定义一个通用的函数签名,使得开发者能够传入自己要执行的并发任务。

      type Job interface {
      	Do(ctx context.Context, param interface{}) error
      	Name() string
      }

      JobManager

      我们的>

      type JobManager []Job

      错误处理

      要达到只有一个>

        sync.Mutex 加锁;sync.Once 只执行一次。

        当然,什么时候我们都可以用一把大锁解决问题,但它的性能不会很好,能用原子操作解决的尽量还是不要用 Mutex,这里参照 errgroup,我们可以用一个 Once 对象来控制只赋值一次。

        panic 恢复可以直接在 defer 里面 recover 即可,需要能带出来 stack trace,把它变成一个 error 赋值

        及时退出

        有时候我们这个并发任务数量非常多,可能还没创建完>

        完整代码

        把上面的分析落地,这样我们就实现了一个带上了>

        package main
        import (
        	"context"
        	"errors"
        	"fmt"
        	"sync"
        	"sync/atomic"
        )
        type Job interface {
        	Do(ctx context.Context, param interface{}) error
        	Name() string
        }
        type JobManager []Job
        
        func (mgr JobManager) Execute(ctx context.Context, param interface{}) error {
        	var (
        		stop    int32 = 0
        		err     error
        		wg      sync.WaitGroup
        		errOnce sync.Once
        	)
        
        	for _, job := range mgr {
        		if atomic.LoadInt32(&stop) > 0 {
        			break
        		}
        
        		wg.Add(1)
        		go func(j Job) {
        			defer func() {
        				wg.Done()
        				if r := recover(); r != nil {
        					errMsg := fmt.Sprintf("JobManager panic: job: %v, reason: %v", j.Name(), r)
        					nerr := errors.New(errMsg)
        					errOnce.Do(func() {
        						if err == nil {
        							err = nerr
        						}
        					})
        					atomic.AddInt32(&stop, 1)
        				}
        			}()
        			nerr := j.Do(ctx, param)
        			if nerr != nil {
        				atomic.AddInt32(&stop, 1)
        				errOnce.Do(func() {
        					if err == nil {
        						err = nerr
        					}
        				})
        			}
        		}(job)
        	}
        	wg.Wait()
        	return err
        }

        使用方法也很简单:

        var mgr = JobManager{
        	AJob, BJob, CJob, // 这里的各个 Job 需要实现一开始我们定义的接口
        }
        err := mgr.Execute(ctx, param)

        这里我们需要定义统一的 param interface{},建议是一个接口,各个 Job 执行完毕后如果有需要写入的数据,可以调用 param 的 Setter 方法写入,最后直接拿 param 来做后续逻辑。

        小结

        今天我们用>

        建议大家回顾一下此前对于 once 以及 errgroup 的源码解析,相信你会更能融会贯通。

          Golang errgroup 设计和原理解析解析 Golang sync.Once 用法和原理

          到此这篇关于Golang 手写一个简单的并发任务 manager的文章就介绍到这了,更多相关Golang manager内容请搜索易采站长站以前的文章或继续浏览下面的相关文章希望大家以后多多支持易采站长站!