📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

前一节我们看到了 deployment 的滚动更新实现,如果你对它已经有一个比较清晰的认识,那么这一节的 replicaset 就非常容易理解了,因为基本实现都是差不多的。为了方便描述后面文中提及的 replicaset 统一简写为 RS。

在一开始学习 k8s 的时候其实我们不一定能碰到这个对象,如果只是日常的使用通常来说的都是 deployment 或者是 statefulset 这样。渐渐深入才会发现它。好像默默无闻的它是做什么的呢?

前置知识

  • RS 是什么?

心路历程

在不知道 RS 之前我一直都以为是 deployment 直接去控制的 pod。而在一开始了解之后,我会好奇为什么要设计一个 RS,直接控制不行吗?渐渐的深入,就会发现,其实它有着自己的设计在里面。

码前提问

  1. RS 和 Deployment 关系是什么?
  2. 有何特别的设计?

源码分析

寻码过程

有了 deployment 的经验其实 RS 寻码的过程就非常简单了。关键都是在 控制器 上。于是在相同的包下面我们就容易找到它。

1
2
kubernetes/pkg/controller/deployment
kubernetes/pkg/controller/replicaset

而且我相信有了前面的经验,你已经对这样的对象看源码的过程比较容易上手了,我们会依旧先从:结构、如何 New、如何使用,使用过程中的细节,这几个部分来着手。

结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// pkg/controller/replicaset/replica_set.go:81
type ReplicaSetController struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface
eventBroadcaster record.EventBroadcaster

// A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
// To allow injection of syncReplicaSet for testing.
syncHandler func(ctx context.Context, rsKey string) error

// A TTLCache of pod creates/deletes each rc expects to see.
expectations *controller.UIDTrackingControllerExpectations

// A store of ReplicaSets, populated by the shared informer passed to NewReplicaSetController
rsLister appslisters.ReplicaSetLister
// rsListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rsListerSynced cache.InformerSynced
rsIndexer cache.Indexer

// A store of pods, populated by the shared informer passed to NewReplicaSetController
podLister corelisters.PodLister
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podListerSynced cache.InformerSynced

// Controllers that need to be synced
queue workqueue.RateLimitingInterface
}

有没有一种熟悉的感觉,我第一次看的时候,就觉得和 deployment 几乎一模一样,而那么关键点也是在启动和同步(syncHandler)的的时候了,所以让我们直接往下看。

创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// pkg/controller/replicaset/replica_set.go:138
func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {

rsc := &ReplicaSetController{
GroupVersionKind: gvk,
kubeClient: kubeClient,
podControl: podControl,
eventBroadcaster: eventBroadcaster,
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
}

rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
rsc.addRS(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
rsc.updateRS(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
rsc.deleteRS(logger, obj)
},
})
//.....

rsc.syncHandler = rsc.syncReplicaSet

return rsc
}

还是一样的 Informer 熟悉的配方,只不过这次都换成了 RS 的方法。然后就让我惊奇的发现了下面的路径。

整个路径就是:Run -> worker -> processNextWorkItem -> syncHandler

这?这不就和 deployment 一模一样了么。_所以其实对于这类对象本身的操作设计和行为都是一致的_。还是那一套事件处理的机制,唯独不一样的是什么呢?让我们回到 deployment 里面看看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// pkg/controller/deployment/deployment_controller.go:101
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
// ....
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dc.addDeployment(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dc.updateDeployment(logger, oldObj, newObj)
},
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: func(obj interface{}) {
dc.deleteDeployment(logger, obj)
},
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dc.addReplicaSet(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dc.updateReplicaSet(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
dc.deleteReplicaSet(logger, obj)
},
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
dc.deletePod(logger, obj)
},
})
// ....
return dc, nil
}

之前我们没有发现,原来 rsInformer 就在里面,其实对于 RS 的事件 Deployment 也处理了,那么其实很容易理解了,其实最终控制 pod 的是 RS,而 Deployment 控制 RS ,RS 专心管 pod,而 Deployment 其实额外提供了之前说的升级和回滚等等利于实际应用升级部署的操作。所以我们上一节中更多关注在了 Deployment 本身的更新动作上,而 pod 其实还没仔细看。这里 RS 补充了这一部分。

于是说我就将源码的重心移动到了如何控制 pod 上面,于是我很快发现了 CreatePods 方法被调用的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// pkg/controller/replicaset/replica_set.go:566
func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
return nil
}
logger := klog.FromContext(ctx)
if diff < 0 {
diff *= -1
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}

rsc.expectations.ExpectCreations(logger, rsKey, diff)
logger.V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)

successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
if err != nil {
if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// if the namespace is being terminated, we don't have to do
// anything because any creation will fail
return nil
}
}
return err
})

// Any skipped pods that we never attempted to start shouldn't be expected.
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
if skippedPods := diff - successfulCreations; skippedPods > 0 {
logger.V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
for i := 0; i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
rsc.expectations.CreationObserved(logger, rsKey)
}
}
return err
}

//...
return nil
}

其实这里的大方向的逻辑非常简单,就是根据 diff 的不同来对 pod 进行不同的处理,如果 diff < 0 就会去创建,而 diff > 0 就会去删除。删除的代码这里就不做展示了。

这里也是一个和之前函数式调用类似的方式调用 slowStartBatch 方法的时候将 fn 传递了进去,fn 其中就就是具体创建的方法,而 slowStartBatch 又是一个很不错可以学习的设计。这个我们放在后面一起说。而在这之前有一个重要步骤是 expectations 的创建

1
rsc.expectations.ExpectCreations(logger, rsKey, diff)

那么其实 expectations 里面记录的就是对于 rsKEY 的 diff,也就是改变量,让我们具体来看看这个里面是什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// pkg/controller/controller_utils.go:332
type UIDTrackingControllerExpectations struct {
ControllerExpectationsInterface
// TODO: There is a much nicer way to do this that involves a single store,
// a lock per entry, and a ControlleeExpectationsInterface type.
uidStoreLock sync.Mutex
// Store used for the UIDs associated with any expectation tracked via the
// ControllerExpectationsInterface.
uidStore cache.Store
}

// pkg/controller/controller_utils.go:147
type ControllerExpectationsInterface interface {
GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
SatisfiedExpectations(logger klog.Logger, controllerKey string) bool
DeleteExpectations(logger klog.Logger, controllerKey string)
SetExpectations(logger klog.Logger, controllerKey string, add, del int) error
ExpectCreations(logger klog.Logger, controllerKey string, adds int) error
ExpectDeletions(logger klog.Logger, controllerKey string, dels int) error
CreationObserved(logger klog.Logger, controllerKey string)
DeletionObserved(logger klog.Logger, controllerKey string)
RaiseExpectations(logger klog.Logger, controllerKey string, add, del int)
LowerExpectations(logger klog.Logger, controllerKey string, add, del int)
}

上面的接口,而具体的实现是 ControllerExpectations 来实现的。它的结构非常简单其本质是 cache.Store 也就是一个泛型缓存,当然在那个年代里面还没有泛型,就是 interface 而已。

1
2
3
4
5
6
7
// pkg/controller/controller_utils.go:265
type ControlleeExpectations struct {
add int64
del int64
key string
timestamp time.Time
}

看到结构就容易理解了,其实内部存放的就是 add 和 del 的数量也就是期望的改变量。

1
2
3
4
// pkg/controller/controller_utils.go:281
func (e *ControlleeExpectations) Fulfilled() bool {
return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}

当二者都为 0 时则实际 == 预期,也就是我们在第一节提到的控制循环类似的思路。而其中还设计了一个过期时间,从而包装了整个逻辑,包括改变、校验等等。虽然感觉实际(工作中)用到的情况会比较少,但是这部分的代码依旧可以做一个案例参考,毕竟里面还是有一些并发操作的。而且这部分应该是还有优化的空间。

码后解答

  1. RS 和 Deployment 关系是什么?
    1. Deployment 控制 RS ,RS 控制 Pod
  2. 有何特别的设计?
    1. 那当然是 expectations 的设计

额外扩展

让我们回头来看看 slowStartBatch 的实现部分吧。从翻译上来看应该叫作批量慢启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
remaining := count
successes := 0
for batchSize := integer.IntMin(remaining, initialBatchSize); batchSize > 0; batchSize = integer.IntMin(2*batchSize, remaining) {
errCh := make(chan error, batchSize)
var wg sync.WaitGroup
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func() {
defer wg.Done()
if err := fn(); err != nil {
errCh <- err
}
}()
}
wg.Wait()
curSuccesses := batchSize - len(errCh)
successes += curSuccesses
if len(errCh) > 0 {
return successes, <-errCh
}
remaining -= batchSize
}
return successes, nil
}

看起来很复杂,其实很简单。

  1. initialBatchSize 从 1 开始,batchSize 就是从 1 开始(当然和 remaining 相比),每次批次是 x2 倍的递增,也就是 1 2 4 8 这样
  2. 每个执行批次通过 wg 控制并发
  3. 也就是每次只要前面一直成功并发就会越来越快

这个部分的设计几乎可以直接抄过来用的,这个 TCP 的拥塞控制还不一样,由于总工作量是有限制的,所以到后面不会并发爆发,也就不需要拥塞避免的部分,小体量的任务场景下好用。

总结提升

其实当我们看完源码,回过头来审视这两个对象的名称,DeploymentReplicaSet,我们就会发现,其实它们的名字就是它们的职责,Deployment 是部署,ReplicaSet 是复制集,而复制集就是去控制 pod 的复制,而部署就是去控制复制集的部署。只不过可能我们对于英文的不敏感,所以一开始不会这么直观的感受到。

设计上

从设计的角度而言,我觉得我们能学到的是有关与对象与职责的设计。我们可以看到 Deployment 并没有直接去控制 pod,而 ReplicaSet 去控制了 pod ,所以 ReplicaSet 的职责就是去管理 pod,而 Deployment 的职责是什么呢?其实是应用的生命周期,Deployment 允许你定义升级策略、回滚操作、滚动更新等功能,使得在应用程序更新时能够更加方便和可控,当你需要更新应用程序时,可以通过更新 Deployment 的来触发新的 ReplicaSet 的创建,然后逐步替换旧的 ReplicaSet 中的 Pod。

编码上

在编码上,我们可以学到两点,前面也提到了。一个是有关 expectations 的包装和设计,一个是有关 slowStartBatch 对于慢启动函数的设计,这些都是可以被我们学习和利用的。