📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

当我们知道了 pod 的生命周期,那么 k8s 如何知道一个 pod 的健康状态呢?就是通过今天要说的 Probe 也就是探针来检查 pod 的状态。一方面可以监控 pod 的健康状态,重启不健康的 pod;另一方面还可以监控 pod 的服务状态,当 pod 能提供服务时才会将流量打进来。

前置知识

  • livenessProbe
  • readinessProbe
  • startupProbe

要知道这三种探针的能力 https://kubernetes.io/zh-cn/docs/concepts/workloads/pods/pod-lifecycle/#types-of-probe

心路历程

探针这个东西就和 request limit 一样,你不配置的话,绝大多数适合,使用起来也问题不大。甚至在一开始的时候我都没注意到这个配置,但是当你的服务非常注重 SLA(承诺服务可用性) 或者你的容器出现了异常,无法服务又没有正确退出的时候,这个配置就显得非常有用了。而在实际中,不合适的探针配置也可能会导致奇怪的问题。

所以,针对探针,想要实际了解一下它具体是如何做的,防止一些意外使用。

码前提问

  1. 探针究竟是谁在探?master?worker?node?pod 自己?
  2. 探针是什么时候启动的?
  3. 探针何时停止?

源码分析

寻码过程

这次当然是搜索 probe 或者你搜索具体 livenessProbe 也可以找到对应的定义和接口。因为我看到的具体已经有连个目录名字就是 probe 所以优先确认目录下的代码是否为我需要的。

  • pkg/kubelet/prober 这里看起很像
  • pkg/probe 这个目录下有几个子目录,名称是:http、tcp 我就知道这个目录是 probe 的具体实现,具体是以一个什么方式去探活
    由于我们本次的目标不在于具体如何探活(发送一个 http 请求没啥看的)所以我们关注 pkg/kubelet/prober 目录下的源码

prober_manager.go

首先映入眼帘的就是 prober_manager.go 从命名就可以看出它是管理员,那先来看一下内部的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
type manager struct {
// Map of active workers for probes
workers map[probeKey]*worker
// Lock for accessing & mutating workers
workerLock sync.RWMutex

// The statusManager cache provides pod IP and container IDs for probing.
statusManager status.Manager

// readinessManager manages the results of readiness probes
readinessManager results.Manager

// livenessManager manages the results of liveness probes
livenessManager results.Manager

// startupManager manages the results of startup probes
startupManager results.Manager

// prober executes the probe actions.
prober *prober

start time.Time
}

// Manager manages pod probing. It creates a probe "worker" for every container that specifies a
// probe (AddPod). The worker periodically probes its assigned container and caches the results. The
// manager use the cached probe results to set the appropriate Ready state in the PodStatus when
// requested (UpdatePodStatus). Updating probe parameters is not currently supported.
type Manager interface {
// AddPod creates new probe workers for every container probe. This should be called for every
// pod created.
AddPod(pod *v1.Pod)

// StopLivenessAndStartup handles stopping liveness and startup probes during termination.
StopLivenessAndStartup(pod *v1.Pod)

// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
// deleting cached results.
RemovePod(pod *v1.Pod)

// CleanupPods handles cleaning up pods which should no longer be running.
// It takes a map of "desired pods" which should not be cleaned up.
CleanupPods(desiredPods map[types.UID]sets.Empty)

// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
// container based on container running status, cached probe results and worker states.
UpdatePodStatus(*v1.Pod, *v1.PodStatus)
}

有一个 map 包含了所有 worker,然后一个锁,那既然这样,可以猜测 worker 就是最终干活的了。也应该是它来完成最终的 探针 工作。

看完结构,再看方法,manager 有两个很重要的方法:

  • func (m *manager) AddPod(pod *v1.Pod)
  • func (m *manager) RemovePod(pod *v1.Pod)
    显然这两个方法就是将 pod 添加到管理中,还有移出去的。

下面的代码就是 AddPod 中遍历找到所有探针的配置,然后进行创建,可以看到,如果 workers map 中没有,那么就会新建一个 worker 并且开一个协程去跑这个 worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// pkg/kubelet/prober/prober_manager.go:185
for _, c := range append(pod.Spec.Containers, getRestartableInitContainers(pod)...) {
key.containerName = c.Name

if c.StartupProbe != nil {
key.probeType = startup
if _, ok := m.workers[key]; ok {
klog.V(8).ErrorS(nil, "Startup probe already exists for container",
"pod", klog.KObj(pod), "containerName", c.Name)
return
}
w := newWorker(m, startup, pod, c)
m.workers[key] = w
go w.run() // 重点就是这里了
}

if c.ReadinessProbe != nil {
// 与 StartupProbe 类似...
}

if c.LivenessProbe != nil {
// 与 StartupProbe 类似...
}
}

那么只要知道谁调用了 AddPod 方法就能知道什么时候探针被启动了。我们发现调用的位置只有一个:pkg/kubelet/kubelet.go:1916 也就是:func (kl *Kubelet) SyncPod 方法中。

此时让我们回忆一下 kubelet 创建 pod 的时候的调用过程:

  1. podWorkerLoop pkg/kubelet/pod_workers.go:1213
  2. SyncPod pkg/kubelet/pod_workers.go:1285
  3. Kubelet.SyncPod pkg/kubelet/kubelet.go:1687
  4. kl.containerRuntime.SyncPod pkg/kubelet/kubelet.go:1934
  5. startContainer pkg/kubelet/kuberuntime/kuberuntime_container.go:177
    没错就是第三步骤,而且注意是在第四步骤之前哦。

什么时候停止

在 go 中有一个编码规范:当你使用 go 启动一个协程时,你必须要清楚的知道它什么时候会退出。否则容易导致协程泄露。

那么既然 probe 是开协程启动的,那么什么时候会停止呢?那肯定要看 run 方法里面了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// pkg/kubelet/prober/worker.go:145
// run periodically probes the container.
func (w *worker) run() {
ctx := context.Background()
probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

//...

probeTicker := time.NewTicker(probeTickerPeriod)

defer func() {
// Clean up.
probeTicker.Stop()
if !w.containerID.IsEmpty() {
w.resultsManager.Remove(w.containerID)
}

w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
//.....
}()

probeLoop:
for w.doProbe(ctx) {
// Wait for next probe tick.
select {
case <-w.stopCh:
break probeLoop
case <-probeTicker.C:
case <-w.manualTriggerCh:
// continue
}
}
}

其本质就是根据用户配置的 PeriodSeconds 时间定时执行 doProbe() 方法,而退出则是在 stopCh 有消息的时候,那什么时候来消息呢?是 worker.stop() 的时候。而调用 worker.stop() 方法的位置有三个。

  1. func (m *manager) StopLivenessAndStartup
  2. func (m *manager) RemovePod
  3. func (m *manager) CleanupPods
    看方法名你应该就明白探针被关闭的时间了。

doProbe

for w.doProbe(ctx) { 执行探针的过程虽然代码长,但并不复杂。还是需要抓主要矛盾,精简之后就是如下的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (w *worker) doProbe(ctx context.Context) (keepGoing bool) {
defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
//.......

// Note, exec probe does NOT have access to pod environment variables or downward API
result, err := w.probeManager.prober.probe(ctx, w.probeType, w.pod, status, w.container, w.containerID)
if err != nil {
// Prober error, throw away the result.
return true
}

//.......

w.resultsManager.Set(w.containerID, result, w.pod)

if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
// The container fails a liveness/startup check, it will need to be restarted.
// Stop probing until we see a new container ID. This is to reduce the
// chance of hitting #21751, where running `docker exec` when a
// container is being stopped may lead to corrupted container state.
w.onHold = true
w.resultRun = 0
}

return true
}
  • w.probeManager.prober.probe 其实就是根据具体不同的探针类型去执行不同的探针方法了。
  • w.resultsManager.Set 最关键的就是这里,最终将探针探测的结果通过 Set 方法传递了出去,为啥说传递呢?因为其内部就是一个 updates chan Update 的 channel。这样解耦了探测和状态改变。

码后解答

  1. 探针究竟是谁在探?master?worker?node?pod 自己?
    • 原来还是 kubelet,它通过一个 goroutine 来启动探针。
  2. 探针是什么时候启动的?
    • 在 SuncPod 初始化的时候。
  3. 探针何时停止?
    • StopLivenessAndStartupRemovePodCleanupPods 方法执行时,也就是要么是 pod 状态异常,或者是 pod 要被移除或清理了,同时探针就会被一起关闭。

额外细节

看到 StopLivenessAndStartup 方法名的时候我就注意到了,为什么 readinessProbe 不在其中呢?原因是:kubelet 关闭 pod 的时候,先 StopLivenessAndStartup 停止 livenessstartup 探针,再来 killPod,最后调用 RemovePod 来移除全部的探针。然后想想 readinessProbe 的作用你就会明白为什么是这个顺序了。

总结提升

设计上

在设计上有两个常见的解耦:

  1. 探测结果和根据具体结果进行处理的解构,探测只管探测,后续的处理不管,发出结果的消息就完事了。
  2. 探测具体实现的解耦(其实不能严格意义上这样讲),根据不同的探测类型有不同的探测实现。

编码上

break+label

1
2
3
4
5
6
7
8
9
10
11
12
probeLoop:
for w.doProbe(ctx) {
// Wait for next probe tick.
select {
case <-w.stopCh:
break probeLoop
case <-probeTicker.C:
case <-w.manualTriggerCh:
// continue
}
}
}

一个比较常用的 break+label 的写法,如果你没见过,可以了解下。通常在循环嵌套不方便退出的时候用。

定长的数组

1
2
3
4
5
6
for _, probeType := range [...]probeType{readiness, liveness, startup} {
key.probeType = probeType
if worker, ok := m.workers[key]; ok {
worker.stop()
}
}

上面的代码有一个不起眼的小细节,这里用到了 [...] 也就是说这里最终其实是一个定长的数组,而不是 slice(切片),我们平常写可能 ... 就不加了,也不影响。可见 k8s 源码中的细节真的很多。