// A Ticker holds a channel that delivers ``ticks'' of a clock // at intervals. type Ticker struct { C <-chan Time // The channel on which the ticks are delivered. r runtimeTimer }
// NewTicker returns a new Ticker containing a channel that will send // the time on the channel after each tick. The period of the ticks is // specified by the duration argument. The ticker will adjust the time // interval or drop ticks to make up for slow receivers. // The duration d must be greater than zero; if not, NewTicker will // panic. Stop the ticker to release associated resources. funcNewTicker(d Duration) *Ticker { if d <= 0 { panic(errors.New("non-positive interval for NewTicker")) } // Give the channel a 1-element time buffer. // If the client falls behind while reading, we drop ticks // on the floor until the client catches up. c := make(chan Time, 1) t := &Ticker{ C: c, r: runtimeTimer{ when: when(d), period: int64(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t }
// Stop turns off a ticker. After Stop, no more ticks will be sent. // Stop does not close the channel, to prevent a concurrent goroutine // reading from the channel from seeing an erroneous "tick". func(t *Ticker) Stop() { stopTimer(&t.r) }
// Reset stops a ticker and resets its period to the specified duration. // The next tick will arrive after the new period elapses. func(t *Ticker) Reset(d Duration) { if t.r.f == nil { panic("time: Reset called on uninitialized Ticker") } modTimer(&t.r, when(d), int64(d), t.r.f, t.r.arg, t.r.seq) }
// Tick is a convenience wrapper for NewTicker providing access to the ticking // channel only. While Tick is useful for clients that have no need to shut down // the Ticker, be aware that without a way to shut it down the underlying // Ticker cannot be recovered by the garbage collector; it "leaks". // Unlike NewTicker, Tick will return nil if d <= 0. funcTick(d Duration) <-chan Time { if d <= 0 { returnnil } return NewTicker(d).C }
// Add a timer to the heap and start or kick timerproc if the new timer is // earlier than any of the others. // Timers are locked. // Returns whether all is well: false if the data structure is corrupt // due to user-level races. func(tb *timersBucket) addtimerLocked(t *timer) bool { // when must never be negative; otherwise timerproc will overflow // during its delta calculation and never expire other runtime timers. if t.when < 0 { t.when = 1<<63 - 1 } t.i = len(tb.t) tb.t = append(tb.t, t) if !siftupTimer(tb.t, t.i) { returnfalse } if t.i == 0 { // siftup moved to top: new earliest deadline. if tb.sleeping && tb.sleepUntil > t.when { tb.sleeping = false notewakeup(&tb.waitnote) } if tb.rescheduling { tb.rescheduling = false goready(tb.gp, 0) } if !tb.created { tb.created = true go timerproc(tb) } } returntrue }
funcsiftupTimer(t []*timer, i int)bool { if i >= len(t) { returnfalse } when := t[i].when tmp := t[i] for i > 0 { p := (i - 1) / 4// parent if when >= t[p].when { break } t[i] = t[p] t[i].i = i i = p } if tmp != t[i] { t[i] = tmp t[i].i = i } returntrue }
总的来说,启动一个 timer 就是三步走:
加锁
将新的 timer 添加到数组末尾
堆化
stopTimer
其实知道了启动,停止就不难了,也是类似的:从数组中删除对应的 timer 之后再堆化就可以了
1 2 3 4 5 6
// stopTimer removes t from the timer heap if it is there. // It returns true if t was removed, false if t wasn't even there. //go:linkname stopTimer time.stopTimer funcstopTimer(t *timer)bool { return deltimer(t) }
// Delete timer t from the heap. // Do not need to update the timerproc: if it wakes up early, no big deal. funcdeltimer(t *timer)bool { if t.tb == nil { // t.tb can be nil if the user created a timer // directly, without invoking startTimer e.g // time.Ticker{C: c} // In this case, return early without any deletion. // See Issue 21874. returnfalse }
tb := t.tb
lock(&tb.lock) removed, ok := tb.deltimerLocked(t) unlock(&tb.lock) if !ok { badTimer() } return removed }
func(tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) { // t may not be registered anymore and may have // a bogus i (typically 0, if generated by Go). // Verify it before proceeding. i := t.i last := len(tb.t) - 1 if i < 0 || i > last || tb.t[i] != t { returnfalse, true } if i != last { tb.t[i] = tb.t[last] tb.t[i].i = i } tb.t[last] = nil tb.t = tb.t[:last] ok = true if i != last { if !siftupTimer(tb.t, i) { ok = false } if !siftdownTimer(tb.t, i) { ok = false } } returntrue, ok }
// Values for the timer status field.
const (
	// Timer has no status set yet.
	timerNoStatus = iota

	// Waiting for timer to fire.
	// The timer is in some P's heap.
	timerWaiting

	// Running the timer function.
	// A timer will only have this status briefly.
	timerRunning

	// The timer is deleted and should be removed.
	// It should not be run, but it is still in some P's heap.
	timerDeleted

	// The timer is being removed.
	// The timer will only have this status briefly.
	timerRemoving

	// The timer has been stopped.
	// It is not in any P's heap.
	timerRemoved

	// The timer is being modified.
	// The timer will only have this status briefly.
	timerModifying

	// The timer has been modified to an earlier time.
	// The new when value is in the nextwhen field.
	// The timer is in some P's heap, possibly in the wrong place.
	timerModifiedEarlier

	// The timer has been modified to the same or a later time.
	// The new when value is in the nextwhen field.
	// The timer is in some P's heap, possibly in the wrong place.
	timerModifiedLater

	// The timer has been modified and is being moved.
	// The timer will only have this status briefly.
	timerMoving
)
// addtimer adds a timer to the current P. // This should only be called with a newly created timer. // That avoids the risk of changing the when field of a timer in some P's heap, // which could cause the heap to become unsorted. funcaddtimer(t *timer) { //.................
when := t.when
// Disable preemption while using pp to avoid changing another P's heap. mp := acquirem()
// 获取当前 g 所绑定的 m,然后再拿到绑定的 p pp := getg().m.p.ptr() lock(&pp.timersLock) // 首先做清除,清除那些已经标记为删除的 timer cleantimers(pp) // 然后将当前的 timer 加入到当前 p 所属的 timer 列表中 doaddtimer(pp, t) unlock(&pp.timersLock)
wakeNetPoller(when)
releasem(mp) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// doaddtimer adds t to the current P's heap. // The caller must have locked the timers for pp. funcdoaddtimer(pp *p, t *timer) { // ............... t.pp.set(pp) i := len(pp.timers) // 这里的操作和之前类似,只不过这次不是放在桶里了,而是放到了 P 上,放完之后依旧是堆化 pp.timers = append(pp.timers, t) siftupTimer(pp.timers, i) if t == pp.timers[0] { atomic.Store64(&pp.timer0When, uint64(t.when)) } atomic.Xadd(&pp.numTimers, 1) }
// Lock for timers. We normally access the timers while running // on this P, but the scheduler can also do it from a different P. timersLock mutex
// P 里面是有一个专门的地方来保存这个 timer 堆的 // Actions to take at some time. This is used to implement the // standard library's time package. // Must hold timersLock to access. timers []*timer
// Number of timers in P's heap. // Modified using atomic instructions. numTimers uint32
// Number of timerDeleted timers in P's heap. // Modified using atomic instructions. deletedTimers uint32 // .............. }
这里可以看到我们的 timer 堆已经不再是一个放在全局各个桶下面的了,而是在 P 内部保存 timer 堆,其他和原来的基本思路一致
// deltimer deletes the timer t. It may be on some other P, so we can't // actually remove it from the timers heap. We can only mark it as deleted. // It will be removed in due course by the P whose heap it is on. // Reports whether the timer was removed before it was run. funcdeltimer(t *timer)bool { for { switch s := atomic.Load(&t.status); s { case timerWaiting, timerModifiedLater: // Prevent preemption while the timer is in timerModifying. // This could lead to a self-deadlock. See #38070. mp := acquirem() if atomic.Cas(&t.status, s, timerModifying) { // Must fetch t.pp before changing status, // as cleantimers in another goroutine // can clear t.pp of a timerDeleted timer. tpp := t.pp.ptr() if !atomic.Cas(&t.status, timerModifying, timerDeleted) { badTimer() } releasem(mp) atomic.Xadd(&tpp.deletedTimers, 1) // Timer was not yet run. returntrue } else { releasem(mp) } case timerModifiedEarlier: // Prevent preemption while the timer is in timerModifying. // This could lead to a self-deadlock. See #38070. mp := acquirem() if atomic.Cas(&t.status, s, timerModifying) { // Must fetch t.pp before setting status // to timerDeleted. tpp := t.pp.ptr() if !atomic.Cas(&t.status, timerModifying, timerDeleted) { badTimer() } releasem(mp) atomic.Xadd(&tpp.deletedTimers, 1) // Timer was not yet run. returntrue } else { releasem(mp) } case timerDeleted, timerRemoving, timerRemoved: // Timer was already run. returnfalse case timerRunning, timerMoving: // The timer is being run or moved, by a different P. // Wait for it to complete. osyield() case timerNoStatus: // Removing timer that was never added or // has already been run. Also see issue 21874. returnfalse case timerModifying: // Simultaneous calls to deltimer and modtimer. // Wait for the other call to complete. osyield() default: badTimer() } } }
何时触发?
那么问题来了,在新版本里面 timer 是什么时候触发的?其实如果之前没有看过调度相关的源码,还真的有点难找。
schedule -> checkTimers -> runtimer
stealWork -> checkTimers -> runtimer
findrunnable -> checkTimers -> runtimer
其实是当调度的时候触发的 timer 检查,检查的时候触发的对应执行。而且如果你第一次看你会觉得神奇,为什么 work steal 的时候还会进行 timer 的检查呢?我们慢慢往下看。
// One round of scheduler: find a runnable goroutine and execute it. // Never returns. funcschedule() { _g_ := getg()
// ....................
top: pp := _g_.m.p.ptr() pp.preempt = false
if sched.gcwaiting != 0 { gcstopm() goto top } if pp.runSafePointFn != 0 { runSafePointFn() }
// Sanity check: if we are spinning, the run queue should be empty. // Check this before calling checkTimers, as that might call // goready to put a ready goroutine on the local run queue. if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) { throw("schedule: spinning with local work") }
// Finds a runnable goroutine to execute. // Tries to steal from other P's, get g from local or global queue, poll network. funcfindrunnable() (gp *g, inheritTime bool) { _g_ := getg()
// The conditions here and in handoffp must agree: if // findrunnable would return a G to run, handoffp must start // an M.
top: _p_ := _g_.m.p.ptr() if sched.gcwaiting != 0 { gcstopm() goto top } if _p_.runSafePointFn != 0 { runSafePointFn() }
funccheckTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { // If it's not yet time for the first timer, or the first adjusted // timer, then there is nothing to do. next := int64(atomic.Load64(&pp.timer0When)) nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest)) if next == 0 || (nextAdj != 0 && nextAdj < next) { next = nextAdj }
if next == 0 { // No timers to run or adjust. return now, 0, false }
if now == 0 { now = nanotime() } if now < next { // Next timer is not ready to run, but keep going // if we would clear deleted timers. // This corresponds to the condition below where // we decide whether to call clearDeletedTimers. // 当下一次触发实现还没有到的时候,这里有一个小细节,当需要删除 timer 个数小于 1/4 的时候是不操作的,直接返回,也就是说等着批量一起处理 if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) { return now, next, false } }
lock(&pp.timersLock)
iflen(pp.timers) > 0 { // 进行当前 p 的 timer 堆的调整,这个方法里面还有很多细节,这里不展开,推荐看一眼 adjusttimers(pp, now) forlen(pp.timers) > 0 { // Note that runtimer may temporarily unlock // pp.timersLock. // 如果有需要执行的 timer 的话,那么就调用 runtimer 方法去执行 if tw := runtimer(pp, now); tw != 0 { if tw > 0 { pollUntil = tw } break } ran = true } }
// If this is the local P, and there are a lot of deleted timers, // clear them out. We only do this for the local P to reduce // lock contention on timersLock. if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 { clearDeletedTimers(pp) }
funcruntimer(pp *p, now int64)int64 { for { // 取出堆顶元素 t := pp.timers[0] if t.pp.ptr() != pp { throw("runtimer: bad p") } switch s := atomic.Load(&t.status); s { case timerWaiting: if t.when > now { // Not ready to run. return t.when }
if !atomic.Cas(&t.status, s, timerRunning) { continue } // Note that runOneTimer may temporarily unlock // pp.timersLock. // 如果已经到了当前触发时间,就运行当前这个 timer runOneTimer(pp, t, now) return0 case timerDeleted: //..................... case timerModifiedEarlier, timerModifiedLater: //..................... case timerModifying: osyield() case timerNoStatus, timerRemoved: badTimer() case timerRunning, timerRemoving, timerMoving: badTimer() default: badTimer() } } }
iflen(pp.timers) > 0 { // 找一个别的 P plocal := getg().m.p.ptr() // The world is stopped, but we acquire timersLock to // protect against sysmon calling timeSleepUntil. // This is the only case where we hold the timersLock of // more than one P, so there are no deadlock concerns. lock(&plocal.timersLock) lock(&pp.timersLock) // 这里把当前 P 上的 timer 都移走 moveTimers(plocal, pp.timers) //..................... }
版本对比
看完了源码你会发现,1.14 前后 timer 变化点主要在两个方面:
存储方式由原来的放在全局的桶里转而放到了 P 上
触发方式的由原来的单个 goroutine 方法循环定期触发改为调度中触发
接下来就是本篇最后的重点部分了:为什么 1.14 前后 timer 需要做这样的优化?更快了吗?
我用图来让你快速明白为什么会有这个改动。
存储结构
存储结构上的改变很容易看出来,就是从原来的桶换成了 P,那么为什么呢?
问题关键
PS:图中的 TP 意思是 运行 timerproc 的 G
可以看到,改动之前,timer 的触发需要频繁的做 M 和 P 的绑定和解绑操作。
就这?对这就是问题的关键。我们举个例子,如果有一个 ticker 每秒触发一次,每触发一次就需要绑定一次 M 解绑一次,而当系统中的 timer 越来越多,那么随之带来的就是越加频繁的切换了。
而改动之后,timer 的触发是在调度循环里面,而且存储在本地的 P 中,所以没有了绑定和解绑的过程,也不再需要一个运行 timerproc goroutine 单独去维护触发。