在了解了基础的 pod 再到常用的 deployment,我们对于应用常用的 k8s 中对象应该已经了一个比较清晰的认识。对内没有问题之后,让我们来看看对外。要想让你部署的服务能被外部访问到,那么离不开的就是 service,也是我们最常见到的第一个有关与外部访问的对象了。所以在这一章的第一节我们先来看看 Service 是如何实现的。
在看源码之前问自己一个问题,为什么需要 service 呢?其实能想到的重要原因有两个:一个是默认情况下,容器没办法直接被外部访问到,就像我们使用 docker 一样,如果不绑定宿主机的端口是没办法为外部服务的;另一个就是负载均衡,因为我们的 pod 常常是有多个的,并且关键的是 pod 还会可能分在不同的机器上,如果没有一个合理的策略去将外部的流量转到对应的服务上就如同没有了导航方向标。
前置知识
Service 的基本使用
Service 的类型
心路历程
这里看源码很容易被陷入进去,由于 Service 功能并没有那么的直接涉及的细节和点(技术方案)很多,如果直接搜索,然后看名字然后一个个看下来容易迷茫并且很难串起来。让我们回到第一章第一节看源码的时候,重新出发。抛开所有我们暂时不关心的地方,仅仅看主线,所以我们应该关注什么呢?从类型来看 ClusterIP、NodePort、LoadBalancer 这几种,ClusterIP 是用来内部访问的,NodePort 将绑定每个节点的一个端口来访问。我觉得这两个应该是我们最先接触 k8s 用到的,一个用于服务与服务之间的访问,一个用于外部访问做测试,内网或者本地测试经常用到。
而我们先不聚焦使用,或者说原理,第一节我们就最基本的看下 Service 这个对象是如何被创建的,创建了之后做了什么。有经验的同学可能了解并知道 kube-proxy 以及 iptables、ipvs 等等,那么 service 的创建之后会对他们有什么影响吗?
这下其实就变得非常简单的,只要找到 Service 对象和创建、控制的过程就可以了。没错,这一节我们就仅仅看这么多,所以特别简单不用担心。
码前提问
Service 创建之后做了些什么?
源码分析
数据结构
看源码的方式还是一样的,首先我们最容易的也是最熟悉的是 Service 的数据结构,也就是我们常常看到的 yaml 文件
subsets := []v1.EndpointSubset{} var totalReadyEps int var totalNotReadyEps int
for _, pod := range pods { ep, err := podToEndpointAddressForService(service, pod)
// Allow headless service not to have ports. iflen(service.Spec.Ports) == 0 { if service.Spec.ClusterIP == api.ClusterIPNone { subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(logger, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses) // No need to repack subsets for headless service without ports. } } else { for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] portNum, err := podutil.FindPort(pod, servicePort) if err != nil { logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err) continue } epp := endpointPortFromServicePort(servicePort, portNum)
// See if there's actually an update here. currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
createEndpoints := len(currentEndpoints.ResourceVersion) == 0 compareLabels := currentEndpoints.Labels if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok { compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService) } // When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints // updates caused by Pod updates that we don't care, e.g. annotation update. if !createEndpoints && endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) && apiequality.Semantic.DeepEqual(compareLabels, service.Labels) && capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) { logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service)) returnnil } newEndpoints := currentEndpoints.DeepCopy() newEndpoints.Subsets = subsets newEndpoints.Labels = service.Labels
logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps) var updatedEndpoints *v1.Endpoints if createEndpoints { // No previous endpoints, create them _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{}) } else { // Pre-existing updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{}) }
returnnil }
第一件事情很简单,Service 最关键的东西是什么?port 其实就是端口,因为除了 ip 和端口,你在 Service 里面也很少配置其他东西了不是吗… 所以其实看到的第一件事情也很简单,就是遍历所有的 service 将其中有 Ports 的情况搬出来为后面准备。因为对于 ClusterIP 来说,内部访问,那么知道 ip + 端口才能行。也就是前半部 for _, pod := range pods { 里面的事情。
然后第二件事情是在 // See if there's actually an update here. 这句话开始的时候(注释已经说的很明显了)去根据上面查询出来的情况对比新旧情况,去配置对应的 Endpoints ,当然这也是为什么它称为 endpoints_controller 的原因。最终如果需要创建 createEndpoints 则创建,如果需要更新 updatedEndpoints 更新。
funcHandleCrash(additionalHandlers ...func(interface{})) { if r := recover(); r != nil { for _, fn := range PanicHandlers { fn(r) } for _, fn := range additionalHandlers { fn(r) } if ReallyCrash { // Actually proceed to panic. panic(r) } } }