📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

这一节终于来到了我们最为熟悉的一个对象 deployment,通常这可能是我们学习 k8s 接触的第一个大对象了,我们一般的应用也是以 deployment 来进行部署的,那么对于熟悉的它来说,我们应该从源码里面去找什么目标来看呢?对于我来说,deployment 的更新是我最好奇的,在我重新修改镜像版本之后,deployment 是如何一步步控制已有的 pod 进行更新的呢?这一节我们就从源码中揭秘这个过程。

前置知识

  • deployment 的基础使用
  • 滚动更新

心路历程

在我看来其他的属性与 pod 类似,而 deployment 作为一个 pod 的集合。那,为什么 deployment 要让 pod 的有多个副本呢?从最初的角度角度来说肯定是高可用了,所以 deployment 中最为关键的就是对 pod 的控制,也就是当 pod 的数量变化的时候,它是如何操作的。

码前提问

  1. deployment 是由哪个对象控制的?
  2. 应用更新的时候 deployment 是如何控制更新过程的?

源码分析

寻码过程

像 deployment 这样的源码比起其他就好找很多了,毕竟命名比较直接。在看来前几节之后,我不知道你是否发现了一个规律。通常看源码的正向思路可以被总结为:

  1. 找到对应实现的数据结构,通常是一个或多个结构体
  2. 看它的初始化,初始化能告诉你其中哪些必要的准备步骤,和具体一些字段的基础能力
  3. 看它的方法,通常就能知道你想要具体实现原理了

Deployment 结构

话不多说,先找到它的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// vendor/k8s.io/api/apps/v1/types.go:355
type Deployment struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

// Specification of the desired behavior of the Deployment.
// +optional
Spec DeploymentSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

// Most recently observed status of the Deployment.
// +optional
Status DeploymentStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

可以看到,数据结构的定义和我们平常使用的 yaml 文件的定义是一一对应的,非常容易理解,可以简单浏览一下 Spec 的属性。

那么关键的问题来了,是哪个结构在控制 deployment?于是开始寻找 Deployment 的引用,看哪些位置在使用这个数据结构,引用很多,但是你只需要按文件去看就可以了。

阅读其他源码时,如果一个对象的引用,不要去寻找每一个代码引用的位置,而应该先从文件入手,如果引用的文件还是很多,可以从包的角度入手,一个包下通常能力方向也是类似的

于是,我找到了 DeploymentController 这个关键的对象(看命名也应该是它了,控制器嘛),今天我们后面就是围绕着它展开的。注意哦,

DeploymentController

DeploymentController 结构

还是类似的,先来看看结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// pkg/controller/deployment/deployment_controller.go:66
type DeploymentController struct {
// rsControl is used for adopting/releasing replica sets.
rsControl controller.RSControlInterface
client clientset.Interface

eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder

// To allow injection of syncDeployment for testing.
syncHandler func(ctx context.Context, dKey string) error
// used for unit testing
enqueueDeployment func(deployment *apps.Deployment)

// dLister can list/get deployments from the shared informer's store
dLister appslisters.DeploymentLister
// rsLister can list/get replica sets from the shared informer's store
rsLister appslisters.ReplicaSetLister
// podLister can list/get pods from the shared informer's store
podLister corelisters.PodLister

// dListerSynced returns true if the Deployment store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
dListerSynced cache.InformerSynced
// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rsListerSynced cache.InformerSynced
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podListerSynced cache.InformerSynced

// Deployments that need to be synced
queue workqueue.RateLimitingInterface
}

注意两个点就好,一个是 syncHandler 还有一个是 queue 看到这两个字段我心里其实已经有个大概的思路了。下面就要用到我们在第一节提到的 informer 机制了。

NewDeploymentController

初始化是在 NewDeploymentController 方法中,我省略了其中一些部分,留下了重要的几个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// pkg/controller/deployment/deployment_controller.go:101
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
//....
dc := &DeploymentController{
//....
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}
//....

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dc.addDeployment(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dc.updateDeployment(logger, oldObj, newObj)
},
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: func(obj interface{}) {
dc.deleteDeployment(logger, obj)
},
})
//....

dc.syncHandler = dc.syncDeployment
dc.enqueueDeployment = dc.enqueue

//....
return dc, nil
}

有了前面的知识,这里的代码我们就很容易理解了,关键是在于注册了有个 ResourceEvent 处理的各种能力,比如当 Add 事件来的时候,调用 addDeployment。先留心注意下面的两个部分 syncHandlerenqueueDeployment 后面会用到。接下来我们肯定会好奇,addDeployment 究竟是如何处理这个事件的,所以我们继续深入看里面的实现。

addDeployment

这里面的调用链路很清晰:addDeployment -> enqueueDeployment -> enqueue -> dc.queue.Add(key)

1
2
3
4
5
6
7
8
9
10
// pkg/controller/deployment/deployment_controller.go:391
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
key, err := controller.KeyFunc(deployment)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
return
}

dc.queue.Add(key)
}

其实这些处理的工作,将 deployment 对应的 key 丢到队列里面去,所以下面我们只需要找到哪里在处理队列中的消息就可以了

Run

地方也很好找,是在 Run 里面,运行的时候启动了一定数量的 worker,然后每个 worker 循环去取消息。

1
2
3
4
5
6
7
8
9
10
11
// pkg/controller/deployment/deployment_controller.go:157
// Run begins watching and syncing.
func (dc *DeploymentController) Run(ctx context.Context, workers int) {
//...

for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, dc.worker, time.Second)
}

<-ctx.Done()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// pkg/controller/deployment/deployment_controller.go:473
func (dc *DeploymentController) worker(ctx context.Context) {
for dc.processNextWorkItem(ctx) {
}
}

func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
key, quit := dc.queue.Get()
if quit {
return false
}
defer dc.queue.Done(key)

err := dc.syncHandler(ctx, key.(string))
dc.handleErr(ctx, err, key)

return true
}

整个路径就是:Run -> worker -> processNextWorkItem -> syncHandler

可以看到就是一个标准的生产者消费者模型。然后关键就来到了 syncHandler 变量,还记得 dc.syncHandler = dc.syncDeployment 吗?对的,它在初始化时候被赋值为了 syncDeployment 这就到了我们这一节的重点方法了,注意看。

syncDeployment

这里我不想省略太多的代码,因为它本身是一个顺序结构,很容易理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// pkg/controller/deployment/deployment_controller.go:581
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
//...

deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
return nil
}
if err != nil {
return err
}

// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
d := deployment.DeepCopy()

//...

// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
if err != nil {
return err
}
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// Current uses of the podMap are:
//
// * check if a Pod is labeled correctly with the pod-template-hash label.
// * check that no old Pods are running in the middle of Recreate Deployments.
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return err
}

//...

if d.Spec.Paused {
return dc.sync(ctx, d, rsList)
}

// rollback is not re-entrant in case the underlying replica sets are updated with a new
// revision so we should ensure that we won't proceed to update replica sets until we
// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
if getRollbackTo(d) != nil {
return dc.rollback(ctx, d, rsList)
}

scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
if err != nil {
return err
}
if scalingEvent {
return dc.sync(ctx, d, rsList)
}

switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
return dc.rolloutRecreate(ctx, d, rsList, podMap)
case apps.RollingUpdateDeploymentStrategyType:
return dc.rolloutRolling(ctx, d, rsList)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
  1. 第一步就是找到 deployment
  2. 第二步是找到 rsList 也是我们说的 RS
  3. 然后找到 podMap

寻找完了之后就开始根据状态进行操作。有哪些操作呢?

  • rollback 回滚
  • scaling 判断现在是不是在调整大小
  • rollout 关键来了,这就是更新,有两种模式
    • Recreate 重建
    • Rolling 滚动更新

这里我们最关心的策略终于暴露出来了,那就是滚动更新了,我们赶快来看看里面是怎么实现的。

rolloutRolling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// pkg/controller/deployment/rolling.go:31
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)

// Scale up, if we can.
scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

// Scale down, if we can.
scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

if deploymentutil.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
return err
}
}

// Sync deployment status
return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

步骤其实比我想的要简单:

  1. 得到新旧的 RS 进行比较
  2. 先看要不要扩副本数,如果要则直接扩,并且就直接 sync 了不继续了
  3. 然后才轮到缩容,能操作就直接操作了。

然后,我们来回忆一下 pod 数量在实际更新中的变动过程,如果目前的 pod 是 3/3(目标/现有),那么扩容之后就会变成 3/4,此时下一次进来就不能扩了,只能变成缩了变成 3/3 然后不断往复,直到所以 pod 都满足期望要求的版本。想想真的蛮奇妙的,就是利用了简单的状态管理就实现了整个滚动更新过程,慢慢的就靠近了目标。这可能就是状态机的优雅吧,你只管改状态,剩下的协调交给我。

码后解答

  1. deployment 是由哪个对象控制的?
    1. DeploymentController
  2. 应用更新的时候 deployment 是如何控制更新过程的?
    1. 关键其实就在于:rolloutRolling ,将目标态的 pod 添加,打破平衡(状态变化),将不满足的旧状态移除,从而慢慢协调到最终状态。再说的简单一点:先尝试 scaledUp 然后尝试 scaledDown

总结提升

设计上

deployment 这里我们能学到哪些设计上的提升点呢?我个人有下面几个

  1. 首先就是 NewDeploymentController 里面对于 Informer 机制的运用
  2. 利用策略模式 RollingUpdateRecreate 两种不同实现很清晰
  3. 利用状态的管理来构建当前 rolloutRolling 的操作,对于编码来说清晰

熟悉了这部分的实现,那么对于其他对象类似的功能,我觉得你应该也能有自己的把握了。