delayqueue的Go简易实现

1k 词

一、缘起

很多时候,业务有“在一段时间之后,完成一个工作任务”的需求。
例如:滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。
一般来说怎么实现这类“48小时后自动评价为5星”需求呢?
常见方案:启动一个cron定时任务,每小时跑一次,将完成时间超过48小时的订单取出,置为5星,并把评价状态置为已评价。
假设订单表的结构为:t_order(oid, finish_time, stars, status, …),更具体的,定时任务每隔一个小时会这么做一次:
select oid from t_order where finish_time > 48hours and status=0;
update t_order set stars=5 and status=1 where oid in[…];
如果数据量很大,需要分页查询,分页update,这将会是一个for循环。
方案的不足:
(1)轮询效率比较低
(2)每次扫库,已经被执行过记录,仍然会被扫描(只是不会出现在结果集中),有重复计算的嫌疑
(3)时效性不够好,如果每小时轮询一次,最差的情况下,时间误差会达到1小时
(4)如果通过增加cron轮询频率来减少(3)中的时间误差,(1)中轮询低效和(2)中重复计算的问题会进一步凸显

二、高效延时消息设计与实现

高效延时消息,包含两个重要的数据结构:
(1)环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)
(2)任务集合,环上每一个slot是一个Set
同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。
Task结构中有两个很重要的属性:
(1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务
(2)Task-Function:需要执行的任务指针
image.png
使用了“延时消息”方案之后,“订单48小时后关闭评价”的需求,只需将在订单关闭时,触发一个48小时之后的延时消息即可:
(1)无需再轮询全部订单,效率高
(2)一个订单,任务只执行一次
(3)时效性好,精确到秒(控制timer移动频率可以控制精度)

三、Go代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package main

import (
"errors"
"fmt"
"time"
)

const cicleSectionNum = 10

type TaskFunc func(args ...any)

type Task struct {
runTime time.Time //初次运行时间
cycleNum int //需要第几圈
curIndex int //当前运行到第几格
//执行的函数
exec TaskFunc
params []any
}

type DelayMessage struct {
cycleNum int //当前运行到第几圈了
curIndex int //当前运行到第几格
slots [cicleSectionNum]map[string]*Task
closed chan bool
taskClose chan bool
timeClose chan bool
startTime time.Time
}

func NewDelayMessage() *DelayMessage {
dm := &DelayMessage{
cycleNum: 0,
curIndex: 0,
closed: make(chan bool),
taskClose: make(chan bool),
timeClose: make(chan bool),
startTime: time.Now(),
}
for i := 0; i < cicleSectionNum; i++ {
dm.slots[i] = make(map[string]*Task)
}
return dm
}

func (dm *DelayMessage) Start() {
go dm.taskLoop()
go dm.timeLoop()
select {
case <-dm.closed:
dm.taskClose <- true
dm.timeClose <- true
break
}
}

func (dm *DelayMessage) Stop() {
dm.closed <- true
}

func (dm *DelayMessage) taskLoop() {
defer func() {
fmt.Println("任务遍历结束!")
}()
for {
select {
case <-dm.taskClose:
return
default:
{
tasks := dm.slots[dm.curIndex]
if len(tasks) > 0 {
for k, v := range tasks {
if v.cycleNum == dm.cycleNum {
go v.exec(v.params...)
delete(tasks, k)
}
}
}
}
}

}
}

func (dm *DelayMessage) timeLoop() {
defer func() {
fmt.Println("时间遍历结束!")
}()
tick := time.NewTicker(time.Second)
for {
select {
case <-dm.timeClose:
return
case <-tick.C:
fmt.Println(time.Now().Format(time.DateTime))
dm.curIndex = (dm.curIndex + 1) % cicleSectionNum
if dm.curIndex == 0 {
dm.cycleNum += 1
}
fmt.Println("当前循环时间", dm.cycleNum, dm.curIndex)
}
}

}

func (dm *DelayMessage) AddTask(t time.Time, key string, exec TaskFunc, params []any) error {
if dm.startTime.After(t) {
return errors.New("时间错误")
}
//当前时间与指定时间相差秒数
subSecond := t.Unix() - dm.startTime.Unix()
//计算循环次数
cycleNum := int(subSecond / cicleSectionNum)
//计算任务所在的slots的下标
ix := subSecond % cicleSectionNum
//把任务加入tasks中
tasks := dm.slots[ix]
if _, ok := tasks[key]; ok {
return errors.New("该slots中已存在key为" + key + "的任务")
}
tasks[key] = &Task{
runTime: t,
cycleNum: cycleNum,
curIndex: int(ix),
exec: exec,
params: params,
}
return nil
}

func main() {
fmt.Println("abc")
dm := NewDelayMessage()
//添加任务
dm.AddTask(time.Now().Add(time.Second*3), "test1", func(args ...any) {
fmt.Println(args...)
}, []any{1, 2, 3})

dm.AddTask(time.Now().Add(time.Second*3), "test1", func(args ...any) {
fmt.Println(args...)
}, []any{2, 2, 3})

dm.AddTask(time.Now().Add(time.Second*3), "test2", func(args ...any) {
fmt.Println(args...)
}, []any{3, 2, 3})

dm.AddTask(time.Now().Add(time.Second*11), "test11", func(args ...any) {
fmt.Println(args...)
}, []any{11, 2, 3})

dm.AddTask(time.Now().Add(time.Second*12), "test11", func(args ...any) {
fmt.Println(args...)
}, []any{11, 2, 3})

//40秒后关闭
time.AfterFunc(time.Second*40, func() {
dm.Stop()
})
dm.Start()
}
留言