Go定时任务源码 - robfig/cron
Hyman 2022/11/21 探索Go
# 介绍
robfig/cron (opens new window)是Go语言实现的开源定时任务调度框架,核心代码是巧妙的使用chan + select + for实现了一个轻量级调度协程,不但语法简洁,而且具有很好的性能,是一个很好的并发编程素材。
# 设计
任务抽象(业务隔离):任务抽象成一个Job接口,业务逻辑类只需实现该接口,参数和返回值调度并不关心,所有都为空
type Job interface {
Run()
}
1
2
3
2
3
计划接口:通过当前时间计算任务的下次执行执行时间,具体实现类可以根据实际需求实现
type Schedule interface {
Next(time.Time) time.Time
}
1
2
3
2
3
定时任务对象:保存执行的任务Job、计算执行时间
type Entry struct {
ID EntryID // id
Schedule Schedule // 计划
Next time.Time // 下次执行时间
Job Job // 任务
}
1
2
3
4
5
6
2
3
4
5
6
任务调度管理:保存定时任务对象(Entry),调度任务执行,提供新增、删除接口(涉及关联资源竞争)
// 任务管理类
type Cron struct {
nextID int64 // 生成entry自增ID
entries []*Entry // 保存Entry
add chan *Entry // 添加
remove chan EntryID // 删除
}
// 删除
func (c *Cron) Remove(id EntryID) {
c.remove <- id
}
// 新增
func (c *Cron) Add(spec string, cmd Job) EntryID {
entry := &Entry{
ID: EntryID(atomic.AddInt64(&c.nextID, 1)),
Schedule: ParseStandard(spec),
Job: cmd,
}
c.add <- entry
return entry.ID
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
核心调度:计算下次执行时间 -> 排序 -> 取最早执行数据 -> timer 等待,因为只有一个协程在执行这个run的调度,所以不存在资源竞争,不需要加锁,另外考虑到执行任务可能涉及阻塞,例如:IO操作,所以一般startJob方法会开启协程执行
func (c *Cron) run() {
now := time.Now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now) // 计算下次执行时间
}
for {
sort.Sort(byTime(c.entries)) // 时间排序
timer := time.NewTimer(c.entries[0].Next.Sub(now))
select {
case now = <-timer.C:
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
c.startJob(e.Job) // 开协程执行
e.Next = e.Schedule.Next(now) // 计算下次执行时间
}
case newEntry := <-c.add: // 新增
timer.Stop()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
}
...
}
}
// 执行任务
func (c *Cron) startJob(j Job) {
go func() {
j.Run()
}()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
启动时会开启唯一协程执行run方法,计算任务执行时间,执行,任务管理等
func New() *Cron {
c := &Cron{
entries: nil,
add: make(chan *Entry),
remove: make(chan EntryID),
}
return c
}
func (c *Cron) Start() {
go c.run()
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 总结
- 共享资源(定时任务)的管理和调度由唯一协程管理
- 通过for + select + channel来循环计算执行时间,监听任务到期、增删事件
- 执行任务会新启协程执行,不阻塞调度
- 采用扇入/扇出原理,多协程添加、增删任务调度协程(Fan In),调度启动新协程执行任务(Fan Out)
- 调度协程使用的是CSP并发模型思想