用golang实现一个定时器任务队列实例

2020-01-28 12:58:45王旭

 很有幸得到公司信任,采用新的语言进行一些底层服务的开发,在实现功能的同时,也获得了一些感悟,因此在这记录一下,方便自己查看也可以共享给大家。

golang中定时器

golang中提供了2种定时器timer和ticker(如果JS很熟悉的话应该会很了解),分别是一次性定时器和重复任务定时器。

一般用法:


func main() { 
 input := make(chan interface{}) 
 //producer - produce the messages
 go func() {
  for i := 0; i < 5; i++ {
   input <- i
  }
  input <- "hello, world"
 }()
 
 t1 := time.NewTimer(time.Second * 5)
 t2 := time.NewTimer(time.Second * 10)
 
 for {
  select {
  //consumer - consume the messages
  case msg := <-input:
   fmt.Println(msg)
 
  case <-t1.C:
   println("5s timer")
   t1.Reset(time.Second * 5)
 
  case <-t2.C:
   println("10s timer")
   t2.Reset(time.Second * 10)
  }
 }
}

源码观察

这个C是啥,我们去源码看看,以timer为例:


type Timer struct {
 C <-chan Time
 r runtimeTimer
}

原来是一个channel,其实有GO基础的都知道,GO的运算符当出现的->或者<-的时候,必然是有一端是指channel。按照上面的例子来看,就是阻塞在一个for循环内,等待到了定时器的C从channel出来,当获取到值的时候,进行想要的操作。

设计我们的定时任务队列

我的需求

当时我的需求是这样,我需要接收到客户端的请求并产生一个定时任务,会在固定时间执行,可能是一次,也可能是多次,也可能到指定时间自动停止,可能当任务终止的时候,我还要能停止掉。

具体我画了个流程图,差不多如下,画图水平有限,请见谅。

定义结构


type OnceCron struct {
 tasks []*Task   //任务的列队
 add chan *Task  //当遭遇到新任务的时候
 remove chan string  //当遭遇到删除任务的时候
 stop chan struct{}  //当遇到停止信号的时候
 Logger *log.Logger  //日志 
}
type Job interface {
 Run()     //执行接口
}
type Task struct {
  Job  Job   //要执行的任务 
 Uuid string   //任务标识,删除时用
 RunTime int64   //执行时间
 Spacing int64   //间隔时间
 EndTime int64   //结束时间
 Number int    //总共要次数
}

队列实现

首先,我们要获得一个队列任务

func NewCron() *OnceCron 常规操作,为了节省篇幅,我就不写出来,具体可以看源码,贴在了底部。

然后,开始定时器队列的运行,一般,都会命名为Start。那么就有一个问题,我们刚开始启动程序的时候,这个时候是没有任务队列,那岂不是for{ select{}}在等待个毛毛球?所以,我们需要在Start的时候添加一个默认的任务, 我是这么做的,添加了一个一小时执行一次的重复队列,防止队列退出。