📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

在了解了基础的 pod 再到常用的 deployment,我们对于应用常用的 k8s 中对象应该已经了一个比较清晰的认识。对内没有问题之后,让我们来看看对外。要想让你部署的服务能被外部访问到,那么离不开的就是 service,也是我们最常见到的第一个有关与外部访问的对象了。所以在这一章的第一节我们先来看看 Service 是如何实现的。

在看源码之前问自己一个问题,为什么需要 service 呢?其实能想到的重要原因有两个:一个是默认情况下,容器没办法直接被外部访问到,就像我们使用 docker 一样,如果不绑定宿主机的端口是没办法为外部服务的;另一个就是负载均衡,因为我们的 pod 常常是有多个的,并且关键的是 pod 还会可能分在不同的机器上,如果没有一个合理的策略去将外部的流量转到对应的服务上就如同没有了导航方向标。

前置知识

  1. Service 的基本使用
  2. Service 的类型

心路历程

这里看源码很容易被陷入进去,由于 Service 功能并没有那么的直接涉及的细节和点(技术方案)很多,如果直接搜索,然后看名字然后一个个看下来容易迷茫并且很难串起来。让我们回到第一章第一节看源码的时候,重新出发。抛开所有我们暂时不关心的地方,仅仅看主线,所以我们应该关注什么呢?从类型来看 ClusterIP、NodePort、LoadBalancer 这几种,ClusterIP 是用来内部访问的,NodePort 将绑定每个节点的一个端口来访问。我觉得这两个应该是我们最先接触 k8s 用到的,一个用于服务与服务之间的访问,一个用于外部访问做测试,内网或者本地测试经常用到。

而我们先不聚焦使用,或者说原理,第一节我们就最基本的看下 Service 这个对象是如何被创建的,创建了之后做了什么。有经验的同学可能了解并知道 kube-proxy 以及 iptables、ipvs 等等,那么 service 的创建之后会对他们有什么影响吗?

这下其实就变得非常简单的,只要找到 Service 对象和创建、控制的过程就可以了。没错,这一节我们就仅仅看这么多,所以特别简单不用担心。

码前提问

  1. Service 创建之后做了些什么?

源码分析

数据结构

看源码的方式还是一样的,首先我们最容易的也是最熟悉的是 Service 的数据结构,也就是我们常常看到的 yaml 文件

1
2
3
4
5
6
7
8
9
apiVersion: v1
kind: Service
spec:
type: ClusterIP
ports:
- protocol: TCP
port: 80
targetPort: 80
clusterIP: x.x.x.x

只要他是一个对象,那么在源码里面一定是一个 struct 的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// staging/src/k8s.io/api/core/v1/types.go:5049
// vendor/k8s.io/api/core/v1/types.go:5049 (也是一样的)
type Service struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
Spec ServiceSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
Status ServiceStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

// staging/src/k8s.io/api/core/v1/types.go:4743
type ServiceSpec struct {
Ports []ServicePort `json:"ports,omitempty" patchStrategy:"merge" patchMergeKey:"port" protobuf:"bytes,1,rep,name=ports"`
Selector map[string]string `json:"selector,omitempty" protobuf:"bytes,2,rep,name=selector"`
ClusterIP string `json:"clusterIP,omitempty" protobuf:"bytes,3,opt,name=clusterIP"`
ClusterIPs []string `json:"clusterIPs,omitempty" protobuf:"bytes,18,opt,name=clusterIPs"`
Type ServiceType `json:"type,omitempty" protobuf:"bytes,4,opt,name=type,casttype=ServiceType"`
//...
}

显然这个结构就是我们常常见到 yaml 的对应了

控制器

有了之前第二章的经验,我们知道,在 k8s 中往往对象的控制都是通过一个控制器(controller)去控制的,并且控制的方式也都是通过 Informer 的方式,Service 也不例外,不过它不太好找。之前我们看 Deployment 那我们去找 DeploymentController 对吧,但我们这次可没找到一个叫做 ServiceController 的东西。并且你搜 ServiceController 容易误区找到其他一个对象。而它其实称为 EndpointController ,我们可以通过 ServiceInformer 找到它。先来看看 NewEndpointController 也就是创建的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// pkg/controller/endpoint/endpoints_controller.go:73
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})

e := &Controller{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
workerLoopPeriod: time.Second,
}

serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.onServiceUpdate,
UpdateFunc: func(old, cur interface{}) {
e.onServiceUpdate(cur)
},
DeleteFunc: e.onServiceDelete,
})
e.serviceLister = serviceInformer.Lister()
e.servicesSynced = serviceInformer.Informer().HasSynced

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
})
e.podLister = podInformer.Lister()
e.podsSynced = podInformer.Informer().HasSynced

endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: e.onEndpointsDelete,
})
e.endpointsLister = endpointsInformer.Lister()
e.endpointsSynced = endpointsInformer.Informer().HasSynced

e.staleEndpointsTracker = newStaleEndpointsTracker()
e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
e.eventBroadcaster = broadcaster
e.eventRecorder = recorder

e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod

return e
}

无须细看,这套路我们再熟悉不过了,和之前一样的模式,Informer、Event 我们关注的 Service 就在里面。

1
2
3
4
5
6
7
8
9
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.onServiceUpdate,
UpdateFunc: func(old, cur interface{}) {
e.onServiceUpdate(cur)
},
DeleteFunc: e.onServiceDelete,
})
e.serviceLister = serviceInformer.Lister()
e.servicesSynced = serviceInformer.Informer().HasSynced

onServiceUpdate 里面就只是简单入队而已,然后依旧和之前一样,还是看它的 Run 方法,然后通过启动多个 worker 执行 worker 方法就

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// pkg/controller/endpoint/endpoints_controller.go:166
func (e *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()

// Start events processing pipeline.
e.eventBroadcaster.StartStructuredLogging(0)
e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")})
defer e.eventBroadcaster.Shutdown()

defer e.queue.ShutDown()

logger := klog.FromContext(ctx)
logger.Info("Starting endpoint controller")
defer logger.Info("Shutting down endpoint controller")

if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod)
}

go func() {
defer utilruntime.HandleCrash()
e.checkLeftoverEndpoints()
}()

<-ctx.Done()
}

然后 worker -> processNextWorkItem -> syncService 最终在里面处理。

syncService

这个方法很长对吧?200+ 行,但其实本身并不复杂。本质就做了两件大事。我精简了部分之后,大体看上去是这样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// pkg/controller/endpoint/endpoints_controller.go:358
func (e *Controller) syncService(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)

service, err := e.serviceLister.Services(namespace).Get(name)

pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())

subsets := []v1.EndpointSubset{}
var totalReadyEps int
var totalNotReadyEps int

for _, pod := range pods {
ep, err := podToEndpointAddressForService(service, pod)

// Allow headless service not to have ports.
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(logger, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
// No need to repack subsets for headless service without ports.
}
} else {
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
portNum, err := podutil.FindPort(pod, servicePort)
if err != nil {
logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err)
continue
}
epp := endpointPortFromServicePort(servicePort, portNum)

var readyEps, notReadyEps int
subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
}
}
subsets = endpoints.RepackSubsets(subsets)

// See if there's actually an update here.
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)

createEndpoints := len(currentEndpoints.ResourceVersion) == 0
compareLabels := currentEndpoints.Labels
if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
}
// When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
// updates caused by Pod updates that we don't care, e.g. annotation update.
if !createEndpoints &&
endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service))
return nil
}
newEndpoints := currentEndpoints.DeepCopy()
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels


logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
var updatedEndpoints *v1.Endpoints
if createEndpoints {
// No previous endpoints, create them
_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
} else {
// Pre-existing
updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
}

return nil
}

第一件事情很简单,Service 最关键的东西是什么?port 其实就是端口,因为除了 ip 和端口,你在 Service 里面也很少配置其他东西了不是吗… 所以其实看到的第一件事情也很简单,就是遍历所有的 service 将其中有 Ports 的情况搬出来为后面准备。因为对于 ClusterIP 来说,内部访问,那么知道 ip + 端口才能行。也就是前半部 for _, pod := range pods { 里面的事情。

然后第二件事情是在 // See if there's actually an update here. 这句话开始的时候(注释已经说的很明显了)去根据上面查询出来的情况对比新旧情况,去配置对应的 Endpoints ,当然这也是为什么它称为 endpoints_controller 的原因。最终如果需要创建 createEndpoints 则创建,如果需要更新 updatedEndpoints 更新。

好了,这个时候有同学就要问题 Endpoint 是啥呀,我怎么从来没听过呢?其实它也是 k8s 里面一个对象,只是由于我们不需要直接接触和配置它,通常使用的时候接触不到。你可以通过

1
kubectl get endpoints --all-namespaces

这样的命令看到,这不就是我们内部访问的 ip 和 端口吗?

1
2
3
4
5
kube-system                    kube-controller-manager-svc               10.0.10.205:10257                                                   2y111d
kube-system kube-dns 172.16.0.183:53,172.16.0.191:53,172.16.0.183:53 + 3 more... 2y118d
kube-system kube-scheduler-svc 10.0.10.205:10259 2y111d
kube-system kubelet 10.0.10.207:10250,10.0.10.209:10250,10.0.10.205:10250 + 6 more... 2y111d
kube-system metrics-server 10.0.10.207:4443 2y111d

也就是说,其实 service 只要正常的配置,正常的更新就可以了,而在 EndpointController 里面会根据这部分更新来更新 Endpoints ,从而配置好内部的访问情况。这也为什么我让你看了 EndpointController 的原因。这也是为什么我说整体比较简单的原因,因为其实本质上没有复杂的业务逻辑和调度切换关系。只有配置的更新。

码后解答

  1. Service 创建之后做了些什么?
    1. 从 Endpoint 角度来说,Service 的更新就直接影响了 Endpoint 的配置,根据不同的配置端口信息有了不同的配置

总结提升

编码上

这次在编码上有看到一个小细节,在 Run 的时候,使用了一个 defer utilruntime.HandleCrash(),别小看它,虽然只是对于一个 panic 的 recover 封装处理,但要记住,很多写法都会导致 panic 无法正确被捕获到哦。然后,内部默认有一个 PanicHandler,也可以自己传入 handler 去处理。最后判断 ReallyCrash 确认是否真的要崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func HandleCrash(additionalHandlers ...func(interface{})) {
if r := recover(); r != nil {
for _, fn := range PanicHandlers {
fn(r)
}
for _, fn := range additionalHandlers {
fn(r)
}
if ReallyCrash {
// Actually proceed to panic.
panic(r)
}
}
}

总之是一个很不错的 panic 的 recover 封装,可以直接拿来使用。

设计上

其实和其他设计一样,都是对象的改变引起了配置的改变。而每一个改变也都是通过 Informer 机制联系在一起的。其实这一节的关键是要告诉你,有 Endpoint 这个对象的存在,有了它,映射关系就有了,就能定位到一个对象应该如何被正确访问了。当然这一节我们只是定位到了配置上的变动,下面我们将从原理上看看流量到底是如何被转发的。