在 go 中当我们需要延迟一段时间后执行,或者需要间隔固定时间去执行某个行为的时候就需要使用到 timer,那么 timer 到底是如何实现的呢?我们今天就来看看 timer 里面是什么样的。

同时因为 1.14 版本前后 timer 的实现有很大的区别,我们顺便来了解一下之前的版本和现在的版本有什么样的不一样,到底做了什么样的优化。

前置知识点

有以下的知识点支持才能更好的理解今天的分析

  • 需要有 GMP 模型的基础
  • 需要有 go 调度相关的基础
  • 需要有数据结构中’堆‘的基础

ticker

要看 timer 可以先从 ticker 入手,ticker 其实我们经常使用到,ticker 顾名思义就是每次间隔一段时间触发一次,下面我们就来看看它的具体实现

带着问题

  • Ticker 如果当前时间到了,没有及时处理,下一次时间到了,会保留吗?是都在后面排队,还是直接被丢弃了?
  • NewTicker()Tick() 有什么区别?使用上需要注意什么?

数据结构

1
2
3
4
5
6
// A Ticker holds a channel that delivers ``ticks'' of a clock
// at intervals.
type Ticker struct {
C <-chan Time // The channel on which the ticks are delivered.
r runtimeTimer
}

可以看到它的数据结构非常简单,就是一个 channel 当时间到达就会向这个 channel 里面发送一个触发的时间

Start Stop Reset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// NewTicker returns a new Ticker containing a channel that will send
// the time on the channel after each tick. The period of the ticks is
// specified by the duration argument. The ticker will adjust the time
// interval or drop ticks to make up for slow receivers.
// The duration d must be greater than zero; if not, NewTicker will
// panic. Stop the ticker to release associated resources.
func NewTicker(d Duration) *Ticker {
if d <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
// Give the channel a 1-element time buffer.
// If the client falls behind while reading, we drop ticks
// on the floor until the client catches up.
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d),
period: int64(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}

// Stop turns off a ticker. After Stop, no more ticks will be sent.
// Stop does not close the channel, to prevent a concurrent goroutine
// reading from the channel from seeing an erroneous "tick".
func (t *Ticker) Stop() {
stopTimer(&t.r)
}

// Reset stops a ticker and resets its period to the specified duration.
// The next tick will arrive after the new period elapses.
func (t *Ticker) Reset(d Duration) {
if t.r.f == nil {
panic("time: Reset called on uninitialized Ticker")
}
modTimer(&t.r, when(d), int64(d), t.r.f, t.r.arg, t.r.seq)
}

注意点有以下几个:

  • ticker 中的 channel 长度为 1,这也就意味着里面只能放一个触发的时间,也就是说如果当前这次触发没有处理完成,下次触发来了可以先存着,但是再下一次就直接会被抛弃了。你是不是奇怪为什么要单独提出这一点来说,想要说明的是,ticker 的使用并不能保证一定间隔相同的时间触发,如果你再处理过程中阻塞住了,间隔时间短就可能出现连续,所以处理一定要保证及时。
  • stop 并不会关闭 channel,因为并发的时候可能同时到了触发时间,如果关闭了 channel 就有可能出现往一个关闭的 channel 中发消息的 panic;但也只有 stop 了之后相关的资源才会得到释放,所以用完之后记得关闭

可以看到三个方法都比较简单,主要就是利用 timer 去实现的 ticker,所以我们主要需要关注在 startTimer stopTimer modTimer 方法上

Tick 方法

1
2
3
4
5
6
7
8
9
10
11
// Tick is a convenience wrapper for NewTicker providing access to the ticking
// channel only. While Tick is useful for clients that have no need to shut down
// the Ticker, be aware that without a way to shut it down the underlying
// Ticker cannot be recovered by the garbage collector; it "leaks".
// Unlike NewTicker, Tick will return nil if d <= 0.
func Tick(d Duration) <-chan Time {
if d <= 0 {
return nil
}
return NewTicker(d).C
}

从这里我们很明显可以看到,其实 Tick 方法就是对 NewTicker 的一个封装,让使用 Ticker 更加简单,直接一行代码搞定,但是随之带来的就是你没有办法去关闭这个 Ticker 了。
这也就意味着会导致内存泄露,所以一般在项目中都会使用 NewTicker 方法,除非你的项目当 Tick 停止时就已经直接退出了,那也不必考虑这个问题。

好了,现在我们可以聚焦到这次我们的主角 Timer 上了

go1.13 的 Timer

老版本的 timer 实现比较简单,代码也比较清晰

startTimer

1
2
3
4
5
6
func startTimer(t *timer) {
if raceenabled {
racerelease(unsafe.Pointer(t))
}
addtimer(t)
}

这里我们可以看到,首先是分配了一个 bucket 然后加锁之后开始 addtimerLocked

1
2
3
4
5
6
7
8
9
func addtimer(t *timer) {
tb := t.assignBucket()
lock(&tb.lock)
ok := tb.addtimerLocked(t)
unlock(&tb.lock)
if !ok {
badTimer()
}
}
1
2
3
4
5
6
const timersLen = 64
func (t *timer) assignBucket() *timersBucket {
id := uint8(getg().m.p.ptr().id) % timersLen
t.tb = &timers[id].timersBucket
return t.tb
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Add a timer to the heap and start or kick timerproc if the new timer is
// earlier than any of the others.
// Timers are locked.
// Returns whether all is well: false if the data structure is corrupt
// due to user-level races.
func (tb *timersBucket) addtimerLocked(t *timer) bool {
// when must never be negative; otherwise timerproc will overflow
// during its delta calculation and never expire other runtime timers.
if t.when < 0 {
t.when = 1<<63 - 1
}
t.i = len(tb.t)
tb.t = append(tb.t, t)
if !siftupTimer(tb.t, t.i) {
return false
}
if t.i == 0 {
// siftup moved to top: new earliest deadline.
if tb.sleeping && tb.sleepUntil > t.when {
tb.sleeping = false
notewakeup(&tb.waitnote)
}
if tb.rescheduling {
tb.rescheduling = false
goready(tb.gp, 0)
}
if !tb.created {
tb.created = true
go timerproc(tb)
}
}
return true
}

当看到 siftupTimer 这个方法的时候你应该就豁然开朗了,因为这个很明显的就是一个堆的操作,只不过这里的堆是一个 4 叉堆,你看它找父节点的时候是 /4 的,siftdownTimer 也是类似这里也不多赘述了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func siftupTimer(t []*timer, i int) bool {
if i >= len(t) {
return false
}
when := t[i].when
tmp := t[i]
for i > 0 {
p := (i - 1) / 4 // parent
if when >= t[p].when {
break
}
t[i] = t[p]
t[i].i = i
i = p
}
if tmp != t[i] {
t[i] = tmp
t[i].i = i
}
return true
}

总的来说启动一个 timer 就是三步走

  • 加锁
  • 将新的 timer 添加到数组末尾
  • 堆化

stopTimer

其实知道了启动停止就不难了,也是类似的,从数组中删除之后然后堆化就可以了

1
2
3
4
5
6
// stopTimer removes t from the timer heap if it is there.
// It returns true if t was removed, false if t wasn't even there.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
return deltimer(t)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Delete timer t from the heap.
// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {
if t.tb == nil {
// t.tb can be nil if the user created a timer
// directly, without invoking startTimer e.g
// time.Ticker{C: c}
// In this case, return early without any deletion.
// See Issue 21874.
return false
}

tb := t.tb

lock(&tb.lock)
removed, ok := tb.deltimerLocked(t)
unlock(&tb.lock)
if !ok {
badTimer()
}
return removed
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) {
// t may not be registered anymore and may have
// a bogus i (typically 0, if generated by Go).
// Verify it before proceeding.
i := t.i
last := len(tb.t) - 1
if i < 0 || i > last || tb.t[i] != t {
return false, true
}
if i != last {
tb.t[i] = tb.t[last]
tb.t[i].i = i
}
tb.t[last] = nil
tb.t = tb.t[:last]
ok = true
if i != last {
if !siftupTimer(tb.t, i) {
ok = false
}
if !siftdownTimer(tb.t, i) {
ok = false
}
}
return true, ok
}

何时触发?

那么问题来了,时间到了之后什么地方触发往 timer 中的 channel 中发数据呢?其实前面的源码中已经给出了细节,在 addtimerLocked 方法中:

1
2
3
4
5
if !tb.created {
tb.created = true
// 这里创建了一个 goroutine 专门来运行 timerproc 方法
go timerproc(tb)
}

创建的时候会调用 timerproc 方法,我们来看看这个方法里面做了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Timerproc runs the time-driven events.
// It sleeps until the next event in the tb heap.
// If addtimer inserts a new earlier event, it wakes timerproc early.
func timerproc(tb *timersBucket) {
tb.gp = getg()
for {
lock(&tb.lock)
tb.sleeping = false
now := nanotime()
delta := int64(-1)
for {
if len(tb.t) == 0 {
delta = -1
break
}
// 获取堆顶元素
t := tb.t[0]
// 看是否满足触发时间
delta = t.when - now
if delta > 0 {
break
}
ok := true
// 当 period > 0 则说明这是一个需要周期性触发的 timer 也就是 ticker,否则就触发一次后直接从堆里面移除
if t.period > 0 {
// leave in heap but adjust next time to fire
// 修改当前元素的触发时间,然后直接开始堆化即可,自己就排到后面去了
t.when += t.period * (1 + -delta/t.period)
if !siftdownTimer(tb.t, 0) {
ok = false
}
} else {
// remove from heap
last := len(tb.t) - 1
if last > 0 {
tb.t[0] = tb.t[last]
tb.t[0].i = 0
}
tb.t[last] = nil
tb.t = tb.t[:last]
if last > 0 {
if !siftdownTimer(tb.t, 0) {
ok = false
}
}
t.i = -1 // mark as removed
}
f := t.f
arg := t.arg
seq := t.seq
unlock(&tb.lock)
if !ok {
badTimer()
}
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
// 这里就是真正触发定时方法的地方,如果是 ticker 的话就是初始化的 sendTime 方法,就是将当前时间发送到 channel 中
f(arg, seq)
lock(&tb.lock)
}
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
tb.rescheduling = true
goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
continue
}
// At least one timer pending. Sleep until then.
// 这里可以看到,如果对顶元素还没有到对应的触发时间,那么就睡眠相对应的时间即可
tb.sleeping = true
tb.sleepUntil = now + delta
noteclear(&tb.waitnote)
unlock(&tb.lock)
notetsleepg(&tb.waitnote, delta)
}
}

可以看到整体思路很清晰,就是将最先触发的元素拿出来,然后判断是否到时间,如果到了时间就触发,如果没到,就睡眠一个 delta 的时间等待触发。当然在 addtimerLocked 方法中也会尝试唤醒 (调用notewakeup方法),因为新加入的 timer 肯定会影响当前整个堆的下一次触发时间。

所以总的来说在 go1.13 版本中,timer 的实现还是比较简单清晰的

go1.17 的 Timer

那么我们来看看现在版本的 timer 是如何实现的,因为我们上面详细看过,这里就省略其中部分。

在当前新的版本中对于 timer 的定义有了各种状态的表示,下面的注释也很清晰,标识了各种状态所出现的情况,至于状态的转换这里就不给出具体的状态图了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Values for the timer status field.
const (
// Timer has no status set yet.
timerNoStatus = iota

// Waiting for timer to fire.
// The timer is in some P's heap.
timerWaiting

// Running the timer function.
// A timer will only have this status briefly.
timerRunning

// The timer is deleted and should be removed.
// It should not be run, but it is still in some P's heap.
timerDeleted

// The timer is being removed.
// The timer will only have this status briefly.
timerRemoving

// The timer has been stopped.
// It is not in any P's heap.
timerRemoved

// The timer is being modified.
// The timer will only have this status briefly.
timerModifying

// The timer has been modified to an earlier time.
// The new when value is in the nextwhen field.
// The timer is in some P's heap, possibly in the wrong place.
timerModifiedEarlier

// The timer has been modified to the same or a later time.
// The new when value is in the nextwhen field.
// The timer is in some P's heap, possibly in the wrong place.
timerModifiedLater

// The timer has been modified and is being moved.
// The timer will only have this status briefly.
timerMoving
)

addtimer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// addtimer adds a timer to the current P.
// This should only be called with a newly created timer.
// That avoids the risk of changing the when field of a timer in some P's heap,
// which could cause the heap to become unsorted.
func addtimer(t *timer) {
//.................

when := t.when

// Disable preemption while using pp to avoid changing another P's heap.
mp := acquirem()

// 获取当前 g 所绑定的 m,然后再拿到绑定的 p
pp := getg().m.p.ptr()
lock(&pp.timersLock)
// 首先做清除,清除那些已经标记为删除的 timer
cleantimers(pp)
// 然后将当前的 timer 加入到当前 p 所属的 timer 列表中
doaddtimer(pp, t)
unlock(&pp.timersLock)

wakeNetPoller(when)

releasem(mp)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// doaddtimer adds t to the current P's heap.
// The caller must have locked the timers for pp.
func doaddtimer(pp *p, t *timer) {
// ...............

t.pp.set(pp)
i := len(pp.timers)
// 这里的操作和之前类似,只不过这次不是放在桶里了,而是放到了 P 上,放完之后依旧是堆化
pp.timers = append(pp.timers, t)
siftupTimer(pp.timers, i)
if t == pp.timers[0] {
atomic.Store64(&pp.timer0When, uint64(t.when))
}
atomic.Xadd(&pp.numTimers, 1)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type p struct {
// ..............

// Lock for timers. We normally access the timers while running
// on this P, but the scheduler can also do it from a different P.
timersLock mutex

// P 里面是有一个专门的地方来保存这个 timer 堆的
// Actions to take at some time. This is used to implement the
// standard library's time package.
// Must hold timersLock to access.
timers []*timer

// Number of timers in P's heap.
// Modified using atomic instructions.
numTimers uint32

// Number of timerDeleted timers in P's heap.
// Modified using atomic instructions.
deletedTimers uint32
// ..............
}

这里可以看到我们的 timer 堆已经不再是一个放在全局各个桶下面的了,而是在 P 内部保存 timer 堆,其他和原来的基本思路一致

deltimer

删除和原来的操作就不一样了,原先删除后会直接进行堆的操作,而在新版本中不是的,只是标记了状态,根据当前不同的状态进行操作,如:没有运行怎么办,或已经运行了怎么办,当前还未被添加….

而在 cleantimers 方法中会对已经标记为删除的 timer 做相对应的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// deltimer deletes the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as deleted.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was removed before it was run.
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
// Must fetch t.pp before changing status,
// as cleantimers in another goroutine
// can clear t.pp of a timerDeleted timer.
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
// Timer was not yet run.
return true
} else {
releasem(mp)
}
case timerModifiedEarlier:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
// Must fetch t.pp before setting status
// to timerDeleted.
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
// Timer was not yet run.
return true
} else {
releasem(mp)
}
case timerDeleted, timerRemoving, timerRemoved:
// Timer was already run.
return false
case timerRunning, timerMoving:
// The timer is being run or moved, by a different P.
// Wait for it to complete.
osyield()
case timerNoStatus:
// Removing timer that was never added or
// has already been run. Also see issue 21874.
return false
case timerModifying:
// Simultaneous calls to deltimer and modtimer.
// Wait for the other call to complete.
osyield()
default:
badTimer()
}
}
}

何时触发?

那么问题来了,在新版本里面是什么时候出发的。其实如果之前没有看过调度相关的源码还真的有点难找。

  1. schedule -> checkTimers -> runtimer
  2. stealWork -> checkTimers -> runtimer
  3. findrunnable -> checkTimers -> runtimer

其实是当调度的时候触发的 timer 检查,检查的时候触发的对应执行。而且如果你第一次看你会觉得神奇,为什么 work steal 的时候还会进行 timer 的检查呢?我们慢慢往下看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
_g_ := getg()

// ....................

top:
pp := _g_.m.p.ptr()
pp.preempt = false

if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if pp.runSafePointFn != 0 {
runSafePointFn()
}

// Sanity check: if we are spinning, the run queue should be empty.
// Check this before calling checkTimers, as that might call
// goready to put a ready goroutine on the local run queue.
if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}

checkTimers(pp, 0)

// ..................
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()

// The conditions here and in handoffp must agree: if
// findrunnable would return a G to run, handoffp must start
// an M.

top:
_p_ := _g_.m.p.ptr()
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _p_.runSafePointFn != 0 {
runSafePointFn()
}

now, pollUntil, _ := checkTimers(_p_, 0)

// ..................

这里我们可以看到,确实是在调度的时候触发的 checkTimers 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
// If it's not yet time for the first timer, or the first adjusted
// timer, then there is nothing to do.
next := int64(atomic.Load64(&pp.timer0When))
nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest))
if next == 0 || (nextAdj != 0 && nextAdj < next) {
next = nextAdj
}

if next == 0 {
// No timers to run or adjust.
return now, 0, false
}

if now == 0 {
now = nanotime()
}
if now < next {
// Next timer is not ready to run, but keep going
// if we would clear deleted timers.
// This corresponds to the condition below where
// we decide whether to call clearDeletedTimers.
// 当下一次触发实现还没有到的时候,这里有一个小细节,当需要删除 timer 个数小于 1/4 的时候是不操作的,直接返回,也就是说等着批量一起处理
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}

lock(&pp.timersLock)

if len(pp.timers) > 0 {
// 进行当前 p 的 timer 堆的调整,这个方法里面还有很多细节,这里不展开,推荐看一眼
adjusttimers(pp, now)
for len(pp.timers) > 0 {
// Note that runtimer may temporarily unlock
// pp.timersLock.
// 如果有需要执行的 timer 的话,那么就调用 runtimer 方法去执行
if tw := runtimer(pp, now); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}

// If this is the local P, and there are a lot of deleted timers,
// clear them out. We only do this for the local P to reduce
// lock contention on timersLock.
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}

unlock(&pp.timersLock)

return now, pollUntil, ran
}

那么我们就来赶紧看看 runtimer 方法到底是如何运行的吧

runtimer

这个方法其实非常简单,就是将堆顶元素取出来看状态,根据不同状态去处理,如果满足运行时间则运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func runtimer(pp *p, now int64) int64 {
for {
// 取出堆顶元素
t := pp.timers[0]
if t.pp.ptr() != pp {
throw("runtimer: bad p")
}
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now {
// Not ready to run.
return t.when
}

if !atomic.Cas(&t.status, s, timerRunning) {
continue
}
// Note that runOneTimer may temporarily unlock
// pp.timersLock.
// 如果已经到了当前触发时间,就运行当前这个 timer
runOneTimer(pp, t, now)
return 0
case timerDeleted:
//.....................
case timerModifiedEarlier, timerModifiedLater:
//.....................
case timerModifying:
osyield()
case timerNoStatus, timerRemoved:
badTimer()
case timerRunning, timerRemoving, timerMoving:
badTimer()
default:
badTimer()
}
}
}

运行其实和原来的逻辑是一样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func runOneTimer(pp *p, t *timer, now int64) {
//.....................

f := t.f
arg := t.arg
seq := t.seq

// 这里的逻辑和原来的 timerproc 中的逻辑是一致的
if t.period > 0 {
// Leave in heap but adjust next time to fire.
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
if t.when < 0 { // check for overflow.
t.when = maxWhen
}
siftdownTimer(pp.timers, 0)
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp)
} else {
// Remove from heap.
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
badTimer()
}
}

//.....................

unlock(&pp.timersLock)

// 这里就是真正执行触发的方法了
f(arg, seq)

lock(&pp.timersLock)

//.....................
}

moveTimer

你以为这样就没有了?还有什么问题我们没有考虑到呢?我们现在已经知道新版本的 timer 堆是在 P 上的了,那么问题来了,当 P 被销毁的时候,可能当前的 P 上还有 timer 呢,那这些 timer 应该怎么办?当然是移走咯

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (pp *p) destroy() {
assertLockHeld(&sched.lock)
assertWorldStopped()

//.....................

if len(pp.timers) > 0 {
// 找一个别的 P
plocal := getg().m.p.ptr()
// The world is stopped, but we acquire timersLock to
// protect against sysmon calling timeSleepUntil.
// This is the only case where we hold the timersLock of
// more than one P, so there are no deadlock concerns.
lock(&plocal.timersLock)
lock(&pp.timersLock)
// 这里把当前 P 上的 timer 都移走
moveTimers(plocal, pp.timers)
//.....................
}

版本对比

看完了源码你会发现,1.14 前后 timer 变化点主要在两个方面:

  1. 存储方式由原来的放在全局的桶里转而放到了 P 上
  2. 触发方式的由原来的单个 goroutine 方法循环定期触发改为调度中触发

接下来就是篇的最后重点部分了:为什么 1.14 前后 timer 需要做这样的优化?更快了吗?

我用图来让你快速明白为什么会有这个改动。

存储结构

timer存储结构

timer存储结构

存储结构上的改变很容易看出来,就是从原来的桶换成了 P,那么为什么呢?

问题关键

timer问题关键

PS:图中的 TP 意思是 运行 timerproc 的 G

可以看到,改动之前,timer 的触发需要频繁的做 M 和 P 的绑定和解绑操作。

就这?对这就是问题的关键。我们举个例子,如果有一个 ticker 每秒触发一次,每触发一次就需要绑定一次 M 解绑一次,而当系统中的 timer 越来越多,那么随之带来的就是越加频繁的切换了。

而改动之后,timer 的触发是在调度循环里面,而且存储在本地的 P 中,所以没有了绑定和解绑的过程,也不再需要一个运行 timerproc goroutine 单独去维护触发。

总结

下面回顾总结几个点:

  1. timer 堆从原有的桶移动到了 P 上,是为了解决频繁切换 MP 的问题。
  2. 因为 checkTimers 是在调度循环里面执行的,所以一些操作被延后执行,比如删除 timer 的操作只是修改状态,而懒到后面一起去执行。
  3. 其实 timer 的设计说到底还是一个堆的存储,然后堆顶就是下一次最近要执行的 timer。

总的来说 timer 的实现还是比较清晰的,其实更老的版本中,一开始 timer 的实现的堆只有一个,而为了优化全局锁的并发性能才出现了 64 个桶这样的结构,然后又发现了切换的性能问题,继续优化才有了现在的 timer。所以其实现在看来很多 go 里面复杂的设计原本都是也是由一个非常简单的设计演变而来的。