一、缘起
很多时候,业务有“在一段时间之后,完成一个工作任务”的需求。
例如:滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。
一般来说怎么实现这类“48小时后自动评价为5星”需求呢?
常见方案:启动一个cron定时任务,每小时跑一次,将完成时间超过48小时的订单取出,置为5星,并把评价状态置为已评价。
假设订单表的结构为:t_order(oid, finish_time, stars, status, …),更具体的,定时任务每隔一个小时会这么做一次:
select oid from t_order where finish_time > 48hours and status=0;
update t_order set stars=5 and status=1 where oid in[…];
如果数据量很大,需要分页查询,分页update,这将会是一个for循环。
方案的不足:
(1)轮询效率比较低
(2)每次扫库,已经被执行过记录,仍然会被扫描(只是不会出现在结果集中),有重复计算的嫌疑
(3)时效性不够好,如果每小时轮询一次,最差的情况下,时间误差会达到1小时
(4)如果通过增加cron轮询频率来减少(3)中的时间误差,(1)中轮询低效和(2)中重复计算的问题会进一步凸显
二、高效延时消息设计与实现
高效延时消息,包含两个重要的数据结构:
(1)环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)
(2)任务集合,环上每一个slot是一个Set
同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。
Task结构中有两个很重要的属性:
(1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务
(2)Task-Function:需要执行的任务指针

使用了“延时消息”方案之后,“订单48小时后关闭评价”的需求,只需将在订单关闭时,触发一个48小时之后的延时消息即可:
(1)无需再轮询全部订单,效率高
(2)一个订单,任务只执行一次
(3)时效性好,精确到秒(控制timer移动频率可以控制精度)
三、Go代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
| package main
import ( "errors" "fmt" "time" )
const cicleSectionNum = 10
type TaskFunc func(args ...any)
type Task struct { runTime time.Time cycleNum int curIndex int exec TaskFunc params []any }
type DelayMessage struct { cycleNum int curIndex int slots [cicleSectionNum]map[string]*Task closed chan bool taskClose chan bool timeClose chan bool startTime time.Time }
func NewDelayMessage() *DelayMessage { dm := &DelayMessage{ cycleNum: 0, curIndex: 0, closed: make(chan bool), taskClose: make(chan bool), timeClose: make(chan bool), startTime: time.Now(), } for i := 0; i < cicleSectionNum; i++ { dm.slots[i] = make(map[string]*Task) } return dm }
func (dm *DelayMessage) Start() { go dm.taskLoop() go dm.timeLoop() select { case <-dm.closed: dm.taskClose <- true dm.timeClose <- true break } }
func (dm *DelayMessage) Stop() { dm.closed <- true }
func (dm *DelayMessage) taskLoop() { defer func() { fmt.Println("任务遍历结束!") }() for { select { case <-dm.taskClose: return default: { tasks := dm.slots[dm.curIndex] if len(tasks) > 0 { for k, v := range tasks { if v.cycleNum == dm.cycleNum { go v.exec(v.params...) delete(tasks, k) } } } } }
} }
func (dm *DelayMessage) timeLoop() { defer func() { fmt.Println("时间遍历结束!") }() tick := time.NewTicker(time.Second) for { select { case <-dm.timeClose: return case <-tick.C: fmt.Println(time.Now().Format(time.DateTime)) dm.curIndex = (dm.curIndex + 1) % cicleSectionNum if dm.curIndex == 0 { dm.cycleNum += 1 } fmt.Println("当前循环时间", dm.cycleNum, dm.curIndex) } }
}
func (dm *DelayMessage) AddTask(t time.Time, key string, exec TaskFunc, params []any) error { if dm.startTime.After(t) { return errors.New("时间错误") } subSecond := t.Unix() - dm.startTime.Unix() cycleNum := int(subSecond / cicleSectionNum) ix := subSecond % cicleSectionNum tasks := dm.slots[ix] if _, ok := tasks[key]; ok { return errors.New("该slots中已存在key为" + key + "的任务") } tasks[key] = &Task{ runTime: t, cycleNum: cycleNum, curIndex: int(ix), exec: exec, params: params, } return nil }
func main() { fmt.Println("abc") dm := NewDelayMessage() dm.AddTask(time.Now().Add(time.Second*3), "test1", func(args ...any) { fmt.Println(args...) }, []any{1, 2, 3})
dm.AddTask(time.Now().Add(time.Second*3), "test1", func(args ...any) { fmt.Println(args...) }, []any{2, 2, 3})
dm.AddTask(time.Now().Add(time.Second*3), "test2", func(args ...any) { fmt.Println(args...) }, []any{3, 2, 3})
dm.AddTask(time.Now().Add(time.Second*11), "test11", func(args ...any) { fmt.Println(args...) }, []any{11, 2, 3})
dm.AddTask(time.Now().Add(time.Second*12), "test11", func(args ...any) { fmt.Println(args...) }, []any{11, 2, 3})
time.AfterFunc(time.Second*40, func() { dm.Stop() }) dm.Start() }
|