📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

pod 是 k8s 调度的最小单位,也就是整个 k8s 的基础之一,那么如何创建 pod 就是我们今天的关键了。这也是为什么我将它放在第一章的原因。

前置知识

  • 什么是 pod?
  • 什么是 kubelet?
  • pod 的生命周期?

心路历程

想看 k8s 源码,我不知如何下手,肯定是挑最熟悉最基础的部分,pod 肯定就是其中之一。而且日常的使用也让我们更熟悉 pod 的生命周期,所以我准备从 pod 入手。那么我知道 kubelet 作为操作 pod 的关键,那肯定就是代码的重点。于是我直接在代码中搜 kubelet,找到对应文件名称为 kubelet 的文件,应该就是我们今天的目标了。

1
pkg/kubelet/kubelet.go

然后开始聚焦,由于源码很多,不可能面面俱到,所以一开始我们就要设定范围,看什么,不看什么。而我们今天的目标就是 pod 的创建 其他都和我们没有关系。所以,kubelet 本身的初始化等其他细节我们看到就略过。

码前提问

看源码之前都自己先提出一些问题,这些问题能帮助我们更快的进入状态,以便能快速定位到所需的关键。

  1. kubelet 怎么知道要创建 pod 的?
  2. 是 kubelet 本身去操作 CRI 的吗?还是有别人的帮助?
  3. pod 创建完毕之后需要做通知或其他操作吗?

源码分析

寻码过程

由于是第一篇,我就把详细的寻找过程也写进来,给小白提供思路。可略过。

  1. 在 IDE(我使用 Goland) 中打开 pkg/kubelet/kubelet.go
  2. shift + command + - 折叠所有方法
  3. 通过方法名寻找合适的需要查看的代码

HandlePodAdditions

我第一眼发下的三个方法:

  • HandlePodAdditions
  • HandlePodUpdates
  • HandlePodRemoves

显然这些方法是用于操作 pod 的,再看了一眼注释没错,那我们先看 HandlePodAdditions

向上的引用链路

通过 IDE command+点击方法名称可以看到哪里调用了这个方法

我习惯先看向上的链路,也就是是谁调用的这个方法,整个链路很清晰:

1
Run -> syncLoop -> syncLoopIteration -> HandlePodAdditions

然后简单查看一下 syncLoop,从这里我们就可以理解到,kubelet 本质处理模式就是事件循环处理。启动之后通过一个 syncLoop 来不断循环处理过来的事件,在 syncLoopIteration 中根据不同的事件类型通过不同的方法处理事件,从而完成对 pod 的操作。下面的代码就描述了对于不同事件的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// pkg/kubelet/kubelet.go:2387
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}

方法内部的操作

然后再看方法的内部,精简后,主要逻辑就是下面这样:

1
2
3
4
5
6
7
8
9
10
11
// pkg/kubelet/kubelet.go:2506
for _, pod := range pods {
kl.podManager.AddPod(pod)

kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodCreate,
StartTime: start,
})
}

podManager 中添加,在 podWorkers 中更新。也就是 kubelet 有两个帮手:podManagerpodWorkers

那么接下来的 UpdatePod 就“有你好看”了,通常第一次看源码容易迷失的大多数原因就来源于大量的代码被吓怕了。还是那句话,我是来看 pod 如何创建的。所以其他的什么 if 判断全部都可以扔掉,因为它们都是在处理 pod 的其他状态,对于创建无关。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// pkg/kubelet/pod_workers.go:926
// start the pod worker goroutine if it doesn't exist
podUpdates, exists := p.podUpdates[uid]
if !exists {
// spawn a pod worker
go func() {
p.podWorkerLoop(uid, outCh)
}()
}


// notify the pod worker there is a pending update
status.pendingUpdate = &options
status.working = true
klog.V(4).InfoS("Notifying pod of pending update", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
select {
case podUpdates <- struct{}{}:
default:
}
  1. 创建一个 goroutine 执行 podWorkerLoop(当 uid 查不到的时候也就是不存在的时候)
  2. 通知 pod worker 需要处理了

根据这样的流程,你可以按照下面的路径开始理解和寻觅:

  1. podWorkerLoop pkg/kubelet/pod_workers.go:1213
  2. SyncPod pkg/kubelet/pod_workers.go:1285
  3. Kubelet.SyncPod pkg/kubelet/kubelet.go:1687
  4. kl.containerRuntime.SyncPod pkg/kubelet/kubelet.go:1934
  5. startContainer pkg/kubelet/kuberuntime/kuberuntime_container.go:177
1
2
3
4
5
6
7
8
9
10
// SyncPod syncs the running pod into the desired pod by executing following steps://  
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Resize running containers (if InPlacePodVerticalScaling==true)
// 8. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(

SyncPod 的注释写的很清楚,步骤 123… ,这就是我们所说的 pod 的创建过程,有关 sandbox 我们稍后文章再说,你可以简单理解为这里在创建 pod 所需要的环境。其中我们关注到两个步骤:

  • 步骤 6:创建 init 容器,这就是我们会在实际中用到的在 spec 中配置 initContainers,能帮助你在真正的业务容器启动前运行一些容器去初始化一些数据或环境变量,当然它的用途很多。
  • 步骤 8:创建 normal 容器,这就是真正的我们常用的业务容器了。

而创建容器的方法是 startContainer

1
2
3
4
5
6
7
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(

同样的,注释步骤很清晰,就是拉取镜像、创建镜像、启动,而这些最终的操作都落到了 ContainerManager 具体会在 vendor/k8s.io/cri-api/pkg/apis/services.go:34 也就是我们常说的 CRI 了。

码后解答

  1. kubelet 怎么知道要创建 pod 的?
    1. syncLoop 中有 updates chan 这个通道传递了 kubetypes.PodUpdate 事件,有事件(创建的事件)来的时候就会创建 pod。
  2. kubelet 本身去操作 CRI 的吗?还是有别人的帮助?
    1. kubeletkubeGenericRuntimeManager 其中有 RuntimeService 也就是 ContainerManager 也就是最终得 CRI。
  3. pod 创建完毕之后需要做通知或其他操作吗?
    1. 没有,创建只管创建,职责很清晰。

额外扩展

下面这些,这些就是不看源码所很难了解到的内部细节了,虽然不影响整体理解,但可以作为额外扩展来学习一下。

MirrorPod

HandlePodAdditions 创建的过程中出现了一个方法是:GetPodAndMirrorPod ,那么什么是 MirrorPod 呢?

kubelet 会为每个 静态 pod 创建一个 MirrorPod,而静态 pod 直接由 kubelet 管理,而不交给 apiserver。如果 静态 pod 出现 crashes 那么 kubelet 会直接重启。而通过 kubectl get pod 看到的就是 MirrorPod。其主要功能还是为了k8s 内部一些实现和操作方便,所以我们平常不需要知道它,也就是为什么看源码才知道,平常不知道的原因。

Static Pod 生命是 kubelet 控制的
普通的 Pod 生命是 control plane 控制的
那怎么让 APIserver 能看到它呢?答案就是 MirrorPod

如果还是不理解,我总结的不一定完整,建议看原文的参考文档:

总结提升

设计上

事件循环,这是一个非常常见的设计,就是如同 kubelet 一样,通过 syncLoop 来不断循环来读取事件,通过不同类型的事件来执行对应操作。这样的优点就是解耦,并且职责清晰,还能通过事件类型来不断地扩展相对应的功能。当然其中配合 go 中 channel 和 select 写起来更加舒适。

编码上

小细节,我觉得缺少函数式编程的耳濡目染很难写出这样的代码:在 SyncPod 方法中,启动容器被封装成了一个内置函数 start 通过这样来共享了 pod 的相关配置,如果抽离成一个新的函数,参数会很多,又要封装新的对象,不划算。故,这样的写法,值得学习和感染一下。

1
2
# pkg/kubelet/kuberuntime/kuberuntime_manager.go:1225
start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {