📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

在 k8s 中 pod 之间的访问在所难免,多个系统之间往往有着调用和互动。那么他们之间的访问就离不开 DNS 的帮助,两个相同 namespace 下的 pod 可以直接通过名称访问,而不同 namespace 下也只需要加上 namespace 就可以了。而实现的关键就是我们的 “交通指挥员” Core-DNS。

前置知识

心路历程

第一次知道它是哪个时候还是 kube-dns,后来才是 Core-DNS。因为在整个系统安装好之后,就会有一个 pod 名字带 core-dns 的 pod。我当时就好奇这个 pod 是干什么的,于是就去研究,后来才知道它是负责 DNS 的。今天我们就来一起读读 Core-DNS 的源码,看看它是如何工作的。

码前提问

在看实际的源码之前,首先我们要明白 Core-DNS 的基本架构和工作原理。Core-DNS 本质是一个灵活的 DNS 服务器,我们知道 DNS 服务器的主要功能是解析域名,将域名转换为 IP 地址。而 Core-DNS 解析的域名不是普通的域名,而是 Kubernetes 集群中的服务和 Pod 的域名。而 IP 地址则是 Pod 的 IP 地址。从而让 pod 之间访问的时候不要用记不住的 IP 地址,而且 IP 地址还会变动。那么其实我们最关心的就是 Core-DNS 是如何知道 Pod 的 IP 地址的,以及它是如何处理 DNS 查询的。

  1. Core-DNS 是如何知道 pod 的 IP 地址的?
  2. Core-DNS 是如何处理 DNS 查询的?
  3. Core-DNS 有什么优化措施来提高性能?

源码分析

首先 Core-DNS 在 https://github.com/coredns/coredns

由于 Core-DNS 其实是利用了老版本的 caddy 作为一个启动器,而所有必要的功能都是以插件的形式存在的,所以看源码的时候可能会跳出这个仓库本身。并且跳出去之后可能通过查看引用的方式是没有办法直接跳回来的,所以请注意跳转的时候记住来源。

入口

入口其实特别简单,就在最外面的文件中,引入插件,然后调用 coremain.Run() 启动 Core-DNS。

1
2
3
4
5
6
7
8
9
// coredns.go
import (
_ "github.com/coredns/coredns/core/plugin" // Plug in CoreDNS.
"github.com/coredns/coredns/coremain"
)

func main() {
coremain.Run()
}

然后你点进去 coremain.Run() 就会发现懵了,因为其本质是在启动一个 caddy 实例。而并没有任何调用 Core-DNS 的代码。怎么回事呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Run is CoreDNS's main() function.
func Run() {
caddy.TrapSignals()
flag.Parse()

if len(flag.Args()) > 0 {
mustLogFatal(fmt.Errorf("extra command line arguments: %s", flag.Args()))
}

log.SetOutput(os.Stdout)
log.SetFlags(LogFlags)

if version {
showVersion()
os.Exit(0)
}
if plugins {
fmt.Println(caddy.DescribePlugins())
os.Exit(0)
}

_, err := maxprocs.Set(maxprocs.Logger(log.Printf))
if err != nil {
log.Println("[WARNING] Failed to set GOMAXPROCS:", err)
}

// Get Corefile input
corefile, err := caddy.LoadCaddyfile(serverType)
if err != nil {
mustLogFatal(err)
}

// Start your engines
instance, err := caddy.Start(corefile)
if err != nil {
mustLogFatal(err)
}

if !dnsserver.Quiet {
showVersion()
}

// Twiddle your thumbs
instance.Wait()
}

此时,不要慌,我们可以尝试去 caddy.Start 里面看看,就会看到其实调用关系是 caddy.Start -> startWithListenerFds -> inst.context.MakeServers 。而其中的 context 是一个接口

1
2
3
4
5
6
7
8
9
type Context interface {
InspectServerBlocks(string, []caddyfile.ServerBlock) ([]caddyfile.ServerBlock, error)

// This is what Caddy calls to make server instances.
// By this time, all directives have been executed and,
// presumably, the context has enough state to produce
// server instances for Caddy to start.
MakeServers() ([]Server, error)
}

而在我们的 /core/dnsserver/register.go 有一个 dnsContext 实现了这个接口。

1
2
3
4
5
6
7
// core/dnsserver/register.go:37
type dnsContext struct {
keysToConfigs map[string]*Config

// configs is the master list of all site configs.
configs []*Config
}

而且其中的 MakeServers 方法就是我们需要的。它会调用 makeServersForGroup 方法来创建服务器实例。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// core/dnsserver/register.go:301
func makeServersForGroup(addr string, group []*Config) ([]caddy.Server, error) {
//....

var servers []caddy.Server
for range numSockets {
// switch on addr
switch tr, _ := parse.Transport(addr); tr {
case transport.DNS:
s, err := NewServer(addr, group)
// ...
case transport.TLS:
s, err := NewServerTLS(addr, group)
// ...
case transport.QUIC:
s, err := NewServerQUIC(addr, group)
// ...
case transport.GRPC:
s, err := NewServergRPC(addr, group)
// ...
case transport.HTTPS:
s, err := NewServerHTTPS(addr, group)
// ...
}
}
return servers, nil
}

看到这里的 NewServer 方法了吗?它就是我们 Core-DNS 的核心服务器。我们可以继续深入看看它的实现。而 NewServer 创建的 Server 是实现了 caddy.TCPServercaddy.UDPServer 接口的,即是有 ServeServePacket 方法的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// core/dnsserver/server.go:148
func (s *Server) Serve(l net.Listener) error {
s.m.Lock()

s.server[tcp] = &dns.Server{Listener: l,
Net: "tcp",
TsigSecret: s.tsigSecret,
MaxTCPQueries: tcpMaxQueries,
ReadTimeout: s.readTimeout,
WriteTimeout: s.writeTimeout,
IdleTimeout: func() time.Duration {
return s.idleTimeout
},
Handler: dns.HandlerFunc(func(w dns.ResponseWriter, r *dns.Msg) {
ctx := context.WithValue(context.Background(), Key{}, s)
ctx = context.WithValue(ctx, LoopKey{}, 0)
s.ServeDNS(ctx, w, r)
})}

s.m.Unlock()

return s.server[tcp].ActivateAndServe()
}

到这里,其实对于整个启动过程我们有了一个大致的认识,这里非常绕的原因是因为它的启动依赖于 Caddy ,所以真正的运行是在那边里面,而这里仅仅只是实现了必要的接口而已。所以有时候看源码单独只是看本仓库,往往会完全不理解在干什么,这件事告诉我们,有的时候如果看不明或找不到一些入口的时候,可能是因为它依赖了其他仓库的代码。

插件系统核心

其实插件本身并不复杂,在这里插件的实现就仅仅是实现接口,然后注册,最后被调用而已。接口是下面这样:

1
2
3
4
5
// plugin/plugin.go:50
Handler interface {
ServeDNS(context.Context, dns.ResponseWriter, *dns.Msg) (int, error)
Name() string
}

而注册就更简单了

1
func init() { plugin.Register("whoami", setup) }

就是在每个插件的 init 函数中调用 plugin.Register 方法来注册插件。这样在 Core-DNS 启动时就会自动加载这些插件。当然,不要忘记本分,我们今天最重要的目的是看它在 k8s 中是如何配合工作的,所以我们需要关注的是 Kubernetes 插件的实现。

Kubernetes 插件分析

相比与其他插件,kubernetes 插件代码就要多的多了。Kubernetes 插件是 Core-DNS 的核心,负责与 K8s API 交互。首先让我们来看到注册的部分:

1
2
3
4
// plugin/kubernetes/setup.go:26
const pluginName = "kubernetes"

func init() { plugin.Register(pluginName, setup) }

和其他插件一样,没什么好说的,继续,我们来看看 setup 函数,其中调用了 InitKubeCache 这个方法里面调用了 newdnsController 这是我们的关键:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// plugin/kubernetes/controller.go:130
func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, mcsClient mcsClientset.MulticlusterV1alpha1Interface, opts dnsControlOpts) *dnsControl {
dns := dnsControl{
client: kubeClient,
mcsClient: mcsClient,
selector: opts.selector,
namespaceSelector: opts.namespaceSelector,
stopCh: make(chan struct{}),
zones: opts.zones,
endpointNameMode: opts.endpointNameMode,
multiclusterZones: opts.multiclusterZones,
}

dns.svcLister, dns.svcController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: serviceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Service{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc, svcExtIPIndex: svcExtIPIndexFunc},
object.DefaultProcessor(object.ToService, nil),
)

podLister, podController := object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Pod{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{podIPIndex: podIPIndexFunc},
object.DefaultProcessor(object.ToPod, nil),
)
dns.podLister = podLister
if opts.initPodCache {
dns.podController = podController
}

epLister, epController := object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&discovery.EndpointSlice{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
)
dns.epLister = epLister
if opts.initEndpointsCache {
dns.epController = epController
}

dns.nsLister, dns.nsController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: namespaceListFunc(ctx, dns.client, dns.namespaceSelector),
WatchFunc: namespaceWatchFunc(ctx, dns.client, dns.namespaceSelector),
},
&api.Namespace{},
cache.ResourceEventHandlerFuncs{},
cache.Indexers{},
object.DefaultProcessor(object.ToNamespace, nil),
)

//....

return &dns
}

看到熟悉的 Informer 了吗?它是 Kubernetes 的核心组件之一,用于监听和缓存 Kubernetes API 对象的变化。这里我们创建了多个 Informer,分别用于监听 Service、Pod、Endpoints 和 Namespace 的变化。我们知道,不管是在同一个 namespace 还是不同 namespace 下,Pod 都可以通过 DNS 名称访问其他 Pod。所以这些不同的事件变动都要监听。而这几个资源的变化都会触发 注册的 eventHandler 也就是 AddUpdateDelete 方法。

你一定以为 AddUpdateDelete 这些方法会具体处理数据?但其实你实际去看看,他们其实本质都是去更新了一下时间戳而已。而真正的数据 cache 在 cache.Indexers 里面,在 dnsControl 中有几个 cache.Indexer 专门用了放他们。而本地缓存的意义就在于避免去频繁的访问 Kubernetes API Server,这是显而易见的。

其中有一个 podLister cache.Indexer 我们后面还会看到。

ServeDNS

最后我们来看看请求来的时候是如何处理的,而这里的关键则就在与 ServeDNS 方法了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// plugin/kubernetes/handler.go:13
func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
//....
switch state.QType() {
case dns.TypeA:
records, truncated, err = plugin.A(ctx, &k, zone, state, nil, plugin.Options{})
case dns.TypeAAAA:
records, truncated, err = plugin.AAAA(ctx, &k, zone, state, nil, plugin.Options{})
case dns.TypeTXT:
records, truncated, err = plugin.TXT(ctx, &k, zone, state, nil, plugin.Options{})
case dns.TypeCNAME:
records, err = plugin.CNAME(ctx, &k, zone, state, plugin.Options{})
case dns.TypePTR:
records, err = plugin.PTR(ctx, &k, zone, state, plugin.Options{})
case dns.TypeMX:
records, extra, err = plugin.MX(ctx, &k, zone, state, plugin.Options{})
case dns.TypeSRV:
records, extra, err = plugin.SRV(ctx, &k, zone, state, plugin.Options{})
case dns.TypeSOA:
if qname == zone {
records, err = plugin.SOA(ctx, &k, zone, state, plugin.Options{})
}
case dns.TypeAXFR, dns.TypeIXFR:
return dns.RcodeRefused, nil

//....
return dns.RcodeSuccess, nil
}

可以看到,这里会根据不同的 DNS 查询类型来调用不同的处理方法。比如对于 A 记录查询,会调用 plugin.A 方法,而对于 AAAA 记录查询,则会调用 plugin.AAAA 方法。这些方法会根据当前的状态和查询条件来返回相应的 DNS 记录。

而最终不同的方法都会转回到 Services 方法中,通过 k.Records(ctx, state, false) 最后我们可以确认我们找到了,findPods 方法

1
2
3
4
5
6
7
// plugin/kubernetes/kubernetes.go:95
func (k *Kubernetes) Services(ctx context.Context, state request.Request, exact bool, opt plugin.Options) (svcs []msg.Service, err error) {
//...
s, e := k.Records(ctx, state, false)
//...
return internal, e
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// plugin/kubernetes/kubernetes.go:310
func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error) {
//...
if r.podOrSvc == Pod {
pods, err := k.findPods(r, state.Zone)
return pods, err
}

var services []msg.Service
var err error
if !multicluster {
services, err = k.findServices(r, state.Zone)
} else {
services, err = k.findMultiClusterServices(r, state.Zone)
}
return services, err
}

findPods 当我看到这个名字的时候我就知道距离胜利不远了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// plugin/kubernetes/kubernetes.go:359
func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, err error) {
//...

zonePath := msg.Path(zone, coredns)

var ip string
if strings.Count(podname, "-") == 3 && !strings.Contains(podname, "--") {
ip = strings.ReplaceAll(podname, "-", ".")
} else {
ip = strings.ReplaceAll(podname, "-", ":")
}

//...

for _, p := range k.APIConn.PodIndex(ip) {
// check for matching ip and namespace
if ip == p.PodIP && match(namespace, p.Namespace) {
s := msg.Service{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip, TTL: k.ttl}
pods = append(pods, s)

err = nil
}
}
return pods, err
}

显然这里就是对于请求的 Pod 的 IP 地址进行查询。我们只需要去看 PodIndex 方法就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// plugin/kubernetes/controller.go:504
func (dns *dnsControl) PodIndex(ip string) (pods []*object.Pod) {
os, err := dns.podLister.ByIndex(podIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
p, ok := o.(*object.Pod)
if !ok {
continue
}
pods = append(pods, p)
}
return pods
}

而这个方法中的 podLister 就是我们之前看到的 podLister cache.Indexer 。它就是 Indexer 缓存。里面存放的就是 Pod 的信息。而 ByIndex 方法则是根据索引来查询 Pod 的信息。

至此整个链路就全部串起来了。

码后解答

  1. Core-DNS 是如何知道 pod 的 IP 地址的?
  • 还是老一套 Informer 机制而已,Core-DNS 通过 Kubernetes 的 API Server 获取 Pod 的信息,并将其缓存到本地的 cache.Indexer 中。每当 Pod 的信息发生变化时,相关的 Informer 会触发事件,更新缓存中的 Pod 信息。
  1. Core-DNS 是如何处理 DNS 查询的?
  • 其实本质就是启动一个 DNS 服务,只不过它对于 DNS 查询会处理 k8s 中的服务和 Pod 的域名解析而已。
  1. Core-DNS 有什么优化措施来提高性能?
  • cache 机制,Core-DNS 使用了本地缓存来存储 Pod 和 Service 的信息,避免频繁访问 Kubernetes API Server。通过 Informer 机制监听资源变化,并更新本地缓存,从而提高查询性能。

总结提升

插件机制

其实 Core DNS 本质里面有两种模式在里面,一个是对于 caddy 的套壳,它完全是利用了 caddy 作为了一个启动器,虽然从代码层面来说减少了很多项目启动运行部分的代码,但是实际中我们也看到了,它已经删除了 caddy 许多功能,完全与主干已经脱节了。所以其实它完全可以把那部分直接合并过来作为一个项目里面。我们在看代码的时候会发现跳来跳去,非常的麻烦。对于新人确实不好理解。而另一个模式是 plugin 机制,利用 golang 中的 init 方法实现注册,只要 import 了就会自动注册。很多插件系统的设计也都是如此。

Informer

最后提一次它吧,一路看到这里,我相信你已经明白为什么很多人吹 Informer 了。它的设计确实非常的巧妙,利用了缓存和事件驱动的方式来处理 Kubernetes 中的资源变化,而且无论是内部组件还是外部组件都可以用它。通过 Informer,我们可以轻松地监听和处理资源的增删改查,而不需要频繁地访问 API Server。这种设计大大提高了性能和效率。而最关键的是利用了它,所有其他想要扩展 Kubernetes 的功能都可以利用它一方面是简化了开发工作,一方面接入行为也统一。