📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

相比较于 deployment 和 StatefulSet,DaemonSet 是更简单的一个,也是最不常用的一个对象了。对于应用开发的同学来说可能几乎见不到它,而对于运维或者 SRE 的同学可能会熟悉一些。DaemonSet 用于确保集群中的每个节点运行有且仅有一个 pod 实例的场景。两个最常见的场景是:日志收集和监控。日志收集是为了收集每个节点上的日志,而监控则是为了监控每个节点的一些数据指标。通常来说以全局平台或者节点为场景的情况下才会想到它。那么 DaemonSet 的如何保证每个节点 pod 的数量呢?这一节让我从源码的角度看看它是如何实现的。

前置知识

  • DaemonSet 的基本使用

码前讨论

首先代码位置就不多说了,有前面的经验。

1
kubernetes/pkg/controller/daemon

由于前面我们已经看过了 deployment、rs、StatefulSet,那么其实对于 DaemonSet,我们也是一样几乎大致的形态结构都已经可以八九不离十了,而且它只有 daemon_controller.goupdate.go 两个文件,就像我前面说的也它其实很简单,并且功能也不复杂。所以这次我们换一种方式来认识源码,放大之前提问的部分。我们在最开始第一节的时候就提到过,看源码之前提几个问题能帮助我们快速进入状态和定位关键。而对于熟悉的结构,我们更可以通过这样的方式来快速阅读源码,而非逐字逐句去做翻译。

码前提问

问题 1

我们知道 DaemonSet 确保集群中每个节点有且仅有一个 pod ,那么当节点数量变化的时候,它一定会随之改变,那么 DaemonSet 的 controller 是如何感知这个变化的呢?如果是你去编写,你会从何处入手?在看源码之前你可以先大胆假设一下。

问题 2

关键的问题在于 DaemonSet 是如何保证集群中每个节点有且仅有一个 pod 的呢?需要做哪些设置呢。同样的,再看源码之前,你可以先问问自己,不是 DaemonSet 的情况,如果是一个普通的 deployment 你能否做的让 pod 调度到每个节点一个?如果可以,那么 DaemonSet 或许就是类似的思路。

问题 3

为了保证 pod 的关系和数量,我会猜测 DaemonSet 可能需要存 node 和 pod 的对应关系,如果有,是存在了哪里?

你可以先不看下面的分析,自己去寻找这三个问题的答案,找到之后再回来核对,看看是否与你的想法一致。


源码分析

问题 1

DaemonSet 是如何感知节点的变化的?

第一个问题相对来说比较简单。由于我们之前看过的所有对象来说,无论是对象本身的变化,还是 pod 的变化都是通过 informer 机制来告诉 controller 的。所以 node 的变化也无意外,也是通过这样的事件机制来做的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// pkg/controller/daemon/daemon_controller.go:134
// NewDaemonSetsController creates a new DaemonSetsController
func NewDaemonSetsController(
ctx context.Context,
daemonSetInformer appsinformers.DaemonSetInformer,
historyInformer appsinformers.ControllerRevisionInformer,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
eventBroadcaster := record.NewBroadcaster()
// ...

nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dsc.addNode(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dsc.updateNode(logger, oldObj, newObj)
},
},
)
dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
dsc.nodeLister = nodeInformer.Lister()

// ...

return dsc, nil
}

NewDaemonSetsController 方法中可以明确看到,通过 nodeInformer 添加了有关节点变化的 event 处理方法,当有对应事件的时候,也就是 node 有变化的时候我们就能知道,并做出相应的调整。

如果这部分你能在看源码之前猜测到,那我觉得对于 informer 整个机制应该是真的掌握了。

问题 2

DaemonSet 是如何保证集群中每个节点有且仅有一个 pod 的?

这个问题稍微复杂一些,考查了你对于 k8s 一些基础概念的了解。我特别也没有在前置知识里面提及是怕过早公布答案。首先,让我们来想一下后面一个小问题,也就是如何让 deployment 能均匀分布到各个节点上去。

如果想把某个 pod 直接调度到特定的节点上,我们可以直接在 spec 下配置 nodeName 来解决。

1
2
3
4
5
6
apiVersion: v1
kind: Pod
metadata:
name: nginx
spec:
nodeName: foo-node # 调度 Pod 到特定的节点

而对于整个对象 deployment 或者是 statefulset,那么答案是 亲和性 。比如官方就给出过对于 zk 的部署最佳实践中就提到,让 statefulset 的 pod 分布到不同的节点,以保证更好的高可用,不会因为所有 pod 都在一个节点,而这个节点挂了就一起挂了的情况。如下:https://kubernetes.io/zh-cn/docs/tutorials/stateful-application/zookeeper/#tolerating-node-failure

1
2
3
4
5
6
7
8
9
10
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zk
topologyKey: "kubernetes.io/hostname"

那么 DaemonSet 很大程度上会参考这样的规则,让调度器能把 pod 按照我们的要求每个节点调度一个。

于是乎,我们可以在源码中寻找来印证我们的想法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// pkg/controller/daemon/daemon_controller.go:993
// syncNodes deletes given pods and creates new daemon set pods on the given nodes
// returns slice with errors if any
func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
// ...

batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
errorCount := len(errCh)
createWait.Add(batchSize)
for i := pos; i < pos+batchSize; i++ {
go func(ix int) {
defer createWait.Done()

podTemplate := template.DeepCopy()
// The pod's NodeAffinity will be updated to make sure the Pod is bound
// to the target node by default scheduler. It is safe to do so because there
// should be no conflicting node affinity with the target node.
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))

// ...
}(i)
}
createWait.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := createDiff - (batchSize + pos)
// ...
}

// ...
}

我们可以看到在 syncNodes 方法中 dsc.podControl.CreatePods 之前,除了将原有的所有 template 属性 DeepCopy 了一份之外,单独处理了 Affinity (亲和性)并且处理的条件是什么呢?也就是 ReplaceDaemonSetPodNodeNameNodeAffinity 的第二个参数

1
func ReplaceDaemonSetPodNodeNameNodeAffinity(affinity *v1.Affinity, nodename string) *v1.Affinity {

nodename 破案了~ 所以,其实 DaemonSet 就是靠着来实现的,其他都是浮云,本质其实挺简单的。其实复杂的部分都给调度器了。

问题 3

为了保证 pod 的关系和数量,我会猜测 DaemonSet 可能需要存 node 和 pod 的对应关系,如果有,是存在了哪里?

这是一个很容易被疑惑和误导的问题,其实有了问题 2 做铺垫,这个问题也就能瞥见一点了。如果没有看过源码,你或许就可能会想,DaemonSet 应该存储了节点和 pod 的对应关系,方便在选择的时候选择合适的节点,并且当新来的时候可以确认当前没有 pod 的节点是哪一个。而事实并不是这样。DaemonSet 并不会保存这样的对应关系。有一个显然的理由是,在问题 2 中我们已经看到,pod 的调度完全是依靠调度器去完成的,控制器仅仅只是描述信息罢了,最终 pod 会调度到哪里其实并不归他管。

但是,DaemonSet 也必须要知道这个对应关系,没有这个关系,无论是后续更新还是本身的状态变化都需要依赖这个部分。于是乎,我们可以在 rollingUpdate 的时候发现它是如何操作的。

1
2
3
4
5
6
7
8
9
10
// pkg/controller/daemon/update.go:42
// rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
// remaining within the constraints imposed by the update strategy.
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
logger := klog.FromContext(ctx)
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
}
maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)

rollingUpdate 方法显然是用于执行 DaemonSet 滚动更新的时候用的,也就是 pod 不断更新的过程。而这个方法本身是用来计算出需要更新哪些 pod ,哪一些要删,哪一些要新增。具体就不再展开。关键是这个部分

1
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)

getNodesToDaemonPods 返回了一个 map,nodeToDaemonPods,key 是 NodeName 而 value 则是对应的 pod 列表。内部的实现其实也非常简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// pkg/controller/daemon/daemon_controller.go:755
func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet, includeDeletedTerminal bool) (map[string][]*v1.Pod, error) {
claimedPods, err := dsc.getDaemonPods(ctx, ds)
if err != nil {
return nil, err
}
// Group Pods by Node name.
nodeToDaemonPods := make(map[string][]*v1.Pod)
logger := klog.FromContext(ctx)
for _, pod := range claimedPods {
if !includeDeletedTerminal && podutil.IsPodTerminal(pod) && pod.DeletionTimestamp != nil {
// This Pod has a finalizer or is already scheduled for deletion from the
// store by the kubelet or the Pod GC. The DS controller doesn't have
// anything else to do with it.
continue
}
nodeName, err := util.GetTargetNodeName(pod)
if err != nil {
logger.V(4).Info("Failed to get target node name of Pod in DaemonSet",
"pod", klog.KObj(pod), "daemonset", klog.KObj(ds))
continue
}

nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
}

return nodeToDaemonPods, nil
}

可以看到就是将 DaemonPods 拿出来,通过 GetTargetNodeName 拿到对应的 nodeName 然后分好就可以了。其中内部就是通过 dsc.podLister.Pods(ds.Namespace).List(labels.Everything()) 来完成的。总结一下,就是其实当时直接查出来的。

看到这里你也许会好奇为什么我会单独把这个部分拿出来看,而不是去看其他创建或者计算的过程。首先我会觉得其他的部分可以算是 “业务” 它有着自己的逻辑,按部就班,并且正确计算条件即可。而之所以看这部分是想强化一下我们对于控制循环的理解,我们在这个大章节最开始就提到了它。控制循环的本质是根据当前状态和期望状态不一致,从而触发改变,让目标状态最终能变成期望状态,而关键在于是 ”当前状态“,这个状态可能会由于整个集群任何操作变化的改变而变动,所以只有当下去看,才能知道目前的状态是什么样的,改变的因素太多了。

总结提升

这一节我们看了 DaemonPods 的源码部分,如果你已经可以自己在源码中寻找到前面提出问题的答案,那么我相信对于各种其他的对象你也可以轻车熟路了。并且看到这里,你应该就能感觉到,其实看源码本身并不难,找准目标一步步往下走就可以了,虽然代码量很多,但是设计绝大多数其实都是相通的,一个类型看一个,都能举一反三。相信你渐渐能有这样的体会。

编码上

最后,在编码上,我们可以总结一个小点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
// ...

errCh := make(chan error, createDiff+deleteDiff)
createWait := sync.WaitGroup{}
// ...
batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
errorCount := len(errCh)
createWait.Add(batchSize)
for i := pos; i < pos+batchSize; i++ {
go func(ix int) {
defer createWait.Done()

// ...
if err != nil {
errCh <- err
}
}(i)
}
createWait.Wait()
// ...
}

// ...
errors := []error{}
close(errCh)
for err := range errCh {
errors = append(errors, err)
}
return utilerrors.NewAggregate(errors)
}

在我们前面看到的 syncNodes 方法中有一个非常标准的利用 WaitGroup 去并发处理任务并等待任务处理完毕,同时利用 chan error 将错误统一发送到 channel 最后一并处理合并的最佳实践。这一部分的编码我相信很多地方都是可以使用的,希望你也能学到。