📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!
 
前言 在 k8s 中 pod 之间的访问在所难免,多个系统之间往往有着调用和互动。那么他们之间的访问就离不开 DNS 的帮助,两个相同 namespace 下的 pod 可以直接通过名称访问,而不同 namespace 下也只需要加上 namespace 就可以了。而实现的关键就是我们的 “交通指挥员” Core-DNS。
前置知识 
心路历程 第一次知道它是哪个时候还是 kube-dns,后来才是 Core-DNS。因为在整个系统安装好之后,就会有一个 pod 名字带 core-dns 的 pod。我当时就好奇这个 pod 是干什么的,于是就去研究,后来才知道它是负责 DNS 的。今天我们就来一起读读 Core-DNS 的源码,看看它是如何工作的。
码前提问 在看实际的源码之前,首先我们要明白 Core-DNS 的基本架构和工作原理。Core-DNS 本质是一个灵活的 DNS 服务器,我们知道 DNS 服务器的主要功能是解析域名,将域名转换为 IP 地址。而 Core-DNS 解析的域名不是普通的域名,而是 Kubernetes 集群中的服务和 Pod 的域名。而 IP 地址则是 Pod 的 IP 地址。从而让 pod 之间访问的时候不要用记不住的 IP 地址,而且 IP 地址还会变动。那么其实我们最关心的就是 Core-DNS 是如何知道 Pod 的 IP 地址的,以及它是如何处理 DNS 查询的。
Core-DNS 是如何知道 pod 的 IP 地址的? 
Core-DNS 是如何处理 DNS 查询的? 
Core-DNS 有什么优化措施来提高性能? 
 
源码分析 首先 Core-DNS 在 https://github.com/coredns/coredns 
由于 Core-DNS 其实是利用了老版本的 caddy 作为一个启动器,而所有必要的功能都是以插件的形式存在的,所以看源码的时候可能会跳出这个仓库本身。并且跳出去之后可能通过查看引用的方式是没有办法直接跳回来的,所以请注意跳转的时候记住来源。
 
入口 入口其实特别简单,就在最外面的文件中,引入插件,然后调用 coremain.Run() 启动 Core-DNS。
1 2 3 4 5 6 7 8 9 import  (    _ "github.com/coredns/coredns/core/plugin"       "github.com/coredns/coredns/coremain"  ) func  main ()   {    coremain.Run() } 
 
然后你点进去 coremain.Run() 就会发现懵了,因为其本质是在启动一个 caddy 实例。而并没有任何调用 Core-DNS 的代码。怎么回事呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 func  Run ()   {    caddy.TrapSignals()     flag.Parse()     if  len (flag.Args()) > 0  {         mustLogFatal(fmt.Errorf("extra command line arguments: %s" , flag.Args()))     }     log.SetOutput(os.Stdout)     log.SetFlags(LogFlags)     if  version {         showVersion()         os.Exit(0 )     }     if  plugins {         fmt.Println(caddy.DescribePlugins())         os.Exit(0 )     }     _, err := maxprocs.Set(maxprocs.Logger(log.Printf))     if  err != nil  {         log.Println("[WARNING] Failed to set GOMAXPROCS:" , err)     }          corefile, err := caddy.LoadCaddyfile(serverType)     if  err != nil  {         mustLogFatal(err)     }          instance, err := caddy.Start(corefile)     if  err != nil  {         mustLogFatal(err)     }     if  !dnsserver.Quiet {         showVersion()     }          instance.Wait() } 
 
此时,不要慌,我们可以尝试去 caddy.Start 里面看看,就会看到其实调用关系是 caddy.Start -> startWithListenerFds -> inst.context.MakeServers 。而其中的 context 是一个接口
1 2 3 4 5 6 7 8 9 type  Context interface  {    InspectServerBlocks(string , []caddyfile.ServerBlock) ([]caddyfile.ServerBlock, error )                         MakeServers() ([]Server, error ) } 
 
而在我们的 /core/dnsserver/register.go 有一个 dnsContext 实现了这个接口。
1 2 3 4 5 6 7 type  dnsContext struct  {    keysToConfigs map [string ]*Config          configs []*Config } 
 
而且其中的 MakeServers 方法就是我们需要的。它会调用 makeServersForGroup 方法来创建服务器实例。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func  makeServersForGroup (addr string , group []*Config)   ([]caddy.Server, error ) {         var  servers []caddy.Server     for  range  numSockets {                  switch  tr, _ := parse.Transport(addr); tr {         case  transport.DNS:             s, err := NewServer(addr, group)                      case  transport.TLS:             s, err := NewServerTLS(addr, group)                      case  transport.QUIC:             s, err := NewServerQUIC(addr, group)                      case  transport.GRPC:             s, err := NewServergRPC(addr, group)                      case  transport.HTTPS:             s, err := NewServerHTTPS(addr, group)                      }     }     return  servers, nil  } 
 
看到这里的 NewServer 方法了吗?它就是我们 Core-DNS 的核心服务器。我们可以继续深入看看它的实现。而 NewServer 创建的 Server 是实现了 caddy.TCPServer 和 caddy.UDPServer 接口的,即是有 Serve 和 ServePacket 方法的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func  (s *Server)   Serve(l net.Listener) error  {    s.m.Lock()     s.server[tcp] = &dns.Server{Listener: l,         Net:           "tcp" ,         TsigSecret:    s.tsigSecret,         MaxTCPQueries: tcpMaxQueries,         ReadTimeout:   s.readTimeout,         WriteTimeout:  s.writeTimeout,         IdleTimeout: func ()   time.Duration {             return  s.idleTimeout         },         Handler: dns.HandlerFunc(func (w dns.ResponseWriter, r *dns.Msg)   {             ctx := context.WithValue(context.Background(), Key{}, s)             ctx = context.WithValue(ctx, LoopKey{}, 0 )             s.ServeDNS(ctx, w, r)         })}     s.m.Unlock()     return  s.server[tcp].ActivateAndServe() } 
 
到这里,其实对于整个启动过程我们有了一个大致的认识,这里非常绕的原因是因为它的启动依赖于 Caddy ,所以真正的运行是在那边里面,而这里仅仅只是实现了必要的接口而已。所以有时候看源码单独只是看本仓库,往往会完全不理解在干什么 ,这件事告诉我们,有的时候如果看不明或找不到一些入口的时候,可能是因为它依赖了其他仓库的代码。
插件系统核心 其实插件本身并不复杂,在这里插件的实现就仅仅是实现接口,然后注册,最后被调用而已。接口是下面这样:
1 2 3 4 5 Handler interface  {     ServeDNS(context.Context, dns.ResponseWriter, *dns.Msg) (int , error )     Name() string  } 
 
而注册就更简单了
1 func  init ()   { plugin.Register("whoami" , setup) }
 
就是在每个插件的 init 函数中调用 plugin.Register 方法来注册插件。这样在 Core-DNS 启动时就会自动加载这些插件。当然,不要忘记本分,我们今天最重要的目的是看它在 k8s 中是如何配合工作的,所以我们需要关注的是 Kubernetes 插件的实现。
Kubernetes 插件分析 相比与其他插件,kubernetes 插件代码就要多的多了。Kubernetes 插件是 Core-DNS 的核心,负责与 K8s API 交互。首先让我们来看到注册的部分:
1 2 3 4 const  pluginName = "kubernetes" func  init ()   { plugin.Register(pluginName, setup) }
 
和其他插件一样,没什么好说的,继续,我们来看看 setup 函数,其中调用了 InitKubeCache 这个方法里面调用了 newdnsController 这是我们的关键:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 func  newdnsController (ctx context.Context, kubeClient kubernetes.Interface, mcsClient mcsClientset.MulticlusterV1alpha1Interface, opts dnsControlOpts)   *dnsControl {    dns := dnsControl{         client:            kubeClient,         mcsClient:         mcsClient,         selector:          opts.selector,         namespaceSelector: opts.namespaceSelector,         stopCh:            make (chan  struct {}),         zones:             opts.zones,         endpointNameMode:  opts.endpointNameMode,         multiclusterZones: opts.multiclusterZones,     }     dns.svcLister, dns.svcController = object.NewIndexerInformer(         &cache.ListWatch{             ListFunc:  serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),             WatchFunc: serviceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),         },         &api.Service{},         cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},         cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc, svcExtIPIndex: svcExtIPIndexFunc},         object.DefaultProcessor(object.ToService, nil ),     )     podLister, podController := object.NewIndexerInformer(         &cache.ListWatch{             ListFunc:  podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),             WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),         },         &api.Pod{},         cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},         cache.Indexers{podIPIndex: podIPIndexFunc},         object.DefaultProcessor(object.ToPod, nil ),     )     dns.podLister = podLister     if  opts.initPodCache {         dns.podController = podController     }     epLister, epController := object.NewIndexerInformer(         &cache.ListWatch{             ListFunc:  endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),             WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),         },         &discovery.EndpointSlice{},         cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},         cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},         object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),     )     dns.epLister = epLister     if  opts.initEndpointsCache {         dns.epController = epController     }     dns.nsLister, dns.nsController = object.NewIndexerInformer(         &cache.ListWatch{             ListFunc:  namespaceListFunc(ctx, dns.client, dns.namespaceSelector),             WatchFunc: namespaceWatchFunc(ctx, dns.client, dns.namespaceSelector),         },         &api.Namespace{},         cache.ResourceEventHandlerFuncs{},         cache.Indexers{},         object.DefaultProcessor(object.ToNamespace, nil ),     )          return  &dns } 
 
看到熟悉的 Informer 了吗?它是 Kubernetes 的核心组件之一,用于监听和缓存 Kubernetes API 对象的变化。这里我们创建了多个 Informer,分别用于监听 Service、Pod、Endpoints 和 Namespace 的变化。我们知道,不管是在同一个 namespace 还是不同 namespace 下,Pod 都可以通过 DNS 名称访问其他 Pod。所以这些不同的事件变动都要监听。而这几个资源的变化都会触发 注册的 eventHandler 也就是 Add、Update 和 Delete 方法。
你一定以为 Add、Update 和 Delete 这些方法会具体处理数据?但其实你实际去看看,他们其实本质都是去更新了一下时间戳而已。而真正的数据 cache 在 cache.Indexers 里面,在 dnsControl 中有几个 cache.Indexer 专门用了放他们。而本地缓存的意义就在于避免去频繁的访问 Kubernetes API Server,这是显而易见的。
其中有一个 podLister cache.Indexer 我们后面还会看到。
ServeDNS 最后我们来看看请求来的时候是如何处理的,而这里的关键则就在与 ServeDNS 方法了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func  (k Kubernetes)   ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int , error ) {         switch  state.QType() {     case  dns.TypeA:         records, truncated, err = plugin.A(ctx, &k, zone, state, nil , plugin.Options{})     case  dns.TypeAAAA:         records, truncated, err = plugin.AAAA(ctx, &k, zone, state, nil , plugin.Options{})     case  dns.TypeTXT:         records, truncated, err = plugin.TXT(ctx, &k, zone, state, nil , plugin.Options{})     case  dns.TypeCNAME:         records, err = plugin.CNAME(ctx, &k, zone, state, plugin.Options{})     case  dns.TypePTR:         records, err = plugin.PTR(ctx, &k, zone, state, plugin.Options{})     case  dns.TypeMX:         records, extra, err = plugin.MX(ctx, &k, zone, state, plugin.Options{})     case  dns.TypeSRV:         records, extra, err = plugin.SRV(ctx, &k, zone, state, plugin.Options{})     case  dns.TypeSOA:         if  qname == zone {             records, err = plugin.SOA(ctx, &k, zone, state, plugin.Options{})         }     case  dns.TypeAXFR, dns.TypeIXFR:         return  dns.RcodeRefused, nil           return  dns.RcodeSuccess, nil  } 
 
可以看到,这里会根据不同的 DNS 查询类型来调用不同的处理方法。比如对于 A 记录查询,会调用 plugin.A 方法,而对于 AAAA 记录查询,则会调用 plugin.AAAA 方法。这些方法会根据当前的状态和查询条件来返回相应的 DNS 记录。
而最终不同的方法都会转回到 Services 方法中,通过 k.Records(ctx, state, false) 最后我们可以确认我们找到了,findPods 方法
1 2 3 4 5 6 7 func  (k *Kubernetes)   Services(ctx context.Context, state request.Request, exact bool , opt plugin.Options) (svcs []msg.Service, err error ) {         s, e := k.Records(ctx, state, false )          return  internal, e } 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func  (k *Kubernetes)   Records(ctx context.Context, state request.Request, exact bool ) ([]msg.Service, error ) {         if  r.podOrSvc == Pod {         pods, err := k.findPods(r, state.Zone)         return  pods, err     }     var  services []msg.Service     var  err error      if  !multicluster {         services, err = k.findServices(r, state.Zone)     } else  {         services, err = k.findMultiClusterServices(r, state.Zone)     }     return  services, err } 
 
而 findPods 当我看到这个名字的时候我就知道距离胜利不远了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func  (k *Kubernetes)   findPods(r recordRequest, zone string ) (pods []msg.Service, err error ) {         zonePath := msg.Path(zone, coredns)     var  ip string      if  strings.Count(podname, "-" ) == 3  && !strings.Contains(podname, "--" ) {         ip = strings.ReplaceAll(podname, "-" , "." )     } else  {         ip = strings.ReplaceAll(podname, "-" , ":" )     }          for  _, p := range  k.APIConn.PodIndex(ip) {                  if  ip == p.PodIP && match(namespace, p.Namespace) {             s := msg.Service{Key: strings.Join([]string {zonePath, Pod, namespace, podname}, "/" ), Host: ip, TTL: k.ttl}             pods = append (pods, s)             err = nil          }     }     return  pods, err } 
 
显然这里就是对于请求的 Pod 的 IP 地址进行查询。我们只需要去看 PodIndex 方法就可以了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func  (dns *dnsControl)   PodIndex(ip string ) (pods []*object.Pod) {    os, err := dns.podLister.ByIndex(podIPIndex, ip)     if  err != nil  {         return  nil      }     for  _, o := range  os {         p, ok := o.(*object.Pod)         if  !ok {             continue          }         pods = append (pods, p)     }     return  pods } 
 
而这个方法中的 podLister 就是我们之前看到的 podLister cache.Indexer 。它就是 Indexer 缓存。里面存放的就是 Pod 的信息。而 ByIndex 方法则是根据索引来查询 Pod 的信息。
至此整个链路就全部串起来了。
码后解答 
Core-DNS 是如何知道 pod 的 IP 地址的? 
 
还是老一套 Informer 机制而已,Core-DNS 通过 Kubernetes 的 API Server 获取 Pod 的信息,并将其缓存到本地的 cache.Indexer 中。每当 Pod 的信息发生变化时,相关的 Informer 会触发事件,更新缓存中的 Pod 信息。 
 
Core-DNS 是如何处理 DNS 查询的? 
 
其实本质就是启动一个 DNS 服务,只不过它对于 DNS 查询会处理 k8s 中的服务和 Pod 的域名解析而已。 
 
Core-DNS 有什么优化措施来提高性能? 
 
cache 机制,Core-DNS 使用了本地缓存来存储 Pod 和 Service 的信息,避免频繁访问 Kubernetes API Server。通过 Informer 机制监听资源变化,并更新本地缓存,从而提高查询性能。 
 
总结提升 插件机制 其实 Core DNS 本质里面有两种模式在里面,一个是对于 caddy 的套壳,它完全是利用了 caddy 作为了一个启动器,虽然从代码层面来说减少了很多项目启动运行部分的代码,但是实际中我们也看到了,它已经删除了 caddy 许多功能,完全与主干已经脱节了。所以其实它完全可以把那部分直接合并过来作为一个项目里面。我们在看代码的时候会发现跳来跳去,非常的麻烦。对于新人确实不好理解。而另一个模式是 plugin 机制,利用 golang 中的 init 方法实现注册,只要 import 了就会自动注册。很多插件系统的设计也都是如此。
最后提一次它吧,一路看到这里,我相信你已经明白为什么很多人吹 Informer 了。它的设计确实非常的巧妙,利用了缓存和事件驱动的方式来处理 Kubernetes 中的资源变化,而且无论是内部组件还是外部组件都可以用它。通过 Informer,我们可以轻松地监听和处理资源的增删改查,而不需要频繁地访问 API Server。这种设计大大提高了性能和效率。而最关键的是利用了它,所有其他想要扩展 Kubernetes 的功能都可以利用它一方面是简化了开发工作,一方面接入行为也统一。