📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!

前言

前一节我们了解了 Service 和 Endpoint 的创建,发现他们本质上还是在改 k8s 内部的配置(数据),并没有实际去干活,也就是最终没有反应在机器的网络配置上。我们知道,想要让你的 pod 可以被 ClusterIP 访问或者是通过外部访问,都需要对于网络设备做一些规则的调整,也就是我们常说的 iptablesipvs。而 kube-proxy 组件就是来做这个工作的,它会将所需要的路由规则最终配置好。那么它究竟是如何做的呢?

前置知识

  • ipvs、iptables 的基础

码前提问

  1. kube-proxy 是什么类型的对象?
  2. ipvs 是由谁来配的?
  3. ipvs 后续如何更新?

源码分析

首先我们需要找到 kube-proxy 在哪。和之前不一样的是,之前我们看到都是一个个对象,而这次它是一个独立的组件。如果你之前没看过,也没了解过,你可以先思考一个问题 kube-proxy 应该是一个什么类型的对象呢?deployment 还是 statefulset 呢?最后我们再来验证一下。

启动

启动命令

一个独立的组件,其实也就是一个二进制,运行的一个程序而已,从 yaml 中启动的命令我们可以看到,参数是一个配置文件,还有一个 node 名称。

1
2
3
4
command:
- /usr/local/bin/kube-proxy
- '--config=/var/lib/kube-proxy/config.conf'
- '--hostname-override=$(NODE_NAME)'

入口

这样的组件我们非常容易通过 cmd 找到入口 kubernetes/cmd/kube-proxy/proxy.go。之前没有提到过,k8s 中是使用 cobra 来实现命令行的功能的,非常好用的一个库。然后 kube-proxy 就一个命令直接 run。

cmd/kube-proxy/app/server.go:501 很容易顺着路径找到。NewProxyCommand -> opts.Run() -> newProxyServer -> createProxier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// cmd/kube-proxy/app/server.go:359
func (o *Options) Run() error {
defer close(o.errCh)
if len(o.WriteConfigTo) > 0 {
return o.writeConfigFile()
}

if o.CleanupAndExit {
return cleanupAndExit()
}

proxyServer, err := newProxyServer(o.config, o.master)
if err != nil {
return err
}

o.proxyServer = proxyServer
return o.runLoop()
}

记住这里的,等下还要回来的,其实就是 newProxyServerr 然后 runLoop ,熟悉的感觉。

Proxier

然后是第一个要点,运行的模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// cmd/kube-proxy/app/server_others.go:128
func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration, dualStack bool) (proxy.Provider, error) {
var proxier proxy.Provider
var err error

// ....
if config.Mode == proxyconfigapi.ProxyModeIPTables {
klog.InfoS("Using iptables Proxier")
// ....
} else if config.Mode == proxyconfigapi.ProxyModeIPVS {
// ....
}
return proxier, nil
}
1
2
3
4
5
const (
ProxyModeIPTables ProxyMode = "iptables"
ProxyModeIPVS ProxyMode = "ipvs"
ProxyModeKernelspace ProxyMode = "kernelspace"
)

iptablesipvs 也就是 proxy 的模式,也就是 kube-proxy 最重要的实现了。

这里我们其实可以看到里面 NewProxier 这样类似的方法参数是非常多的,而且并没有封装成对象,可以看到观感是并不好的,而且如果改动起来还是比较麻烦的,你必须仔细核对每个参数的顺序。如果是实际中,我是不会建议这样写的。

我们知道 iptables 的实现是存在性能问题的,相较于它我们更关注 ipvs 的实现,而对于 NewProxier 里面的参数太多这里不一一说明。看一个小点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// pkg/proxy/ipvs/proxier.go:309
func NewProxier(ipFamily v1.IPFamily,
// ...
) (*Proxier, error) {
// ...

proxier := &Proxier{
// ...
}
// ...
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
proxier.gracefuldeleteManager.Run()
return proxier, nil
}

记住这里的 syncRunner 是一个 NewBoundedFrequencyRunner 类型,而其具体里面内部调用的方法其实是 proxier.syncProxyRules

Run

然后创建完,就是启动了,Run 里面的启动,关键就在下面这部分我精简了一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// cmd/kube-proxy/app/server.go:844
func (s *ProxyServer) Run() error {
// ...

// Create configs (i.e. Watches for Services and EndpointSlices)
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.Config.ConfigSyncPeriod.Duration)
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)

endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.Config.ConfigSyncPeriod.Duration)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)

// ...
nodeConfig.RegisterEventHandler(s.Proxier)

go nodeConfig.Run(wait.NeverStop)

// Birth Cry after the birth is successful
s.birthCry()

go s.Proxier.SyncLoop()

return <-errCh
}

可以看到三个

  • serviceConfig.RegisterEventHandler(s.Proxier)
  • endpointSliceConfig.RegisterEventHandler(s.Proxier)
  • nodeConfig.RegisterEventHandler(s.Proxier)

分别对应与三种不通类比的事件,service、endpoint、node。看到这里,我相信你应该也已经明白了,至少当这些资源出现变化的时候,显然 kube-proxy 也许需要做相应的处理。

不仅如此,让我们关注到最后的 go s.Proxier.SyncLoop() 也就是说,它自己本身也有一个同步循环在的。而上面三个,最后也落在来循环上,有兴趣你可以进去继续看看。

注意此时开始,我们不能通过简单的点击来查看源码了,因为这里涉及了两个看源码的容易迷糊的点。

  1. Proxier 有不同的实现,对此我们直接来到 pkg/proxy/ipvs/proxier.go 目录下看即可,不需要点,直接看具体方法实现
  2. 在 SyncLoop 里面其实是调用了 syncRunner.Loop 而如果你直接点击,啪,晕了,因为没有具体实现,而这就是我们前面提到的 NewBoundedFrequencyRunner,调用的方法其实是初始化的时候就传递进去了,也就是 内部调用的方法其实是 proxier.syncProxyRules

syncProxyRules

重点来了,本文的重点是它 syncProxyRules

1
2
// This is where all of the ipvs calls happen.
func (proxier *Proxier) syncProxyRules()

注释写的非常清楚,这就是 ipvs 干的全部的事情了,都在里面 600 行左右的代码。当然,如果不是为了研究 ipvs 本身,我觉得你没必要仔细研究里面内部的配置。

无论是 iptablesipvs 其本质是什么?路由规则,或者说路由表。它就是需要知道一个 ip 的路由规则是什么,应该怎么走。围绕这一点,你就可以大致了解到整个方法的内容了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (proxier *Proxier) syncProxyRules() {
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

// Build IPVS rules for each service.
for svcPortName, svcPort := range proxier.svcPortMap {
// ...
}

proxier.writeIptablesRules()

proxier.iptablesData.Reset()
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())
proxier.iptablesData.Write(proxier.filterChains.Bytes())
proxier.iptablesData.Write(proxier.filterRules.Bytes())

err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)

if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck services")
}
if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
}

metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len()))
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
}

没错,精简之后就是这部分。其中最为关键的就是 for svcPortName, svcPort := range proxier.svcPortMap { 这个循环了,里面将所有需要 ipvs 的规则找出来了。整个方法的过程可以简单总结为:

  1. 遍历 service、endpoint 配置
  2. 根据需要找到所有需要设置的 ipvs 规则
  3. 设置 ipvs 规则
  4. 同步 iptables 的配置

最终我们能通过 ipvsadm -ln 命令看到它配置的具体情况,当然还能通过 ip addr 命令看到 kube-ipvs 的虚拟网络设备

1
2
3
4
5
$ ipvsadm -ln
.....
TCP 10.94.0.1:443 rr
-> 192.168.51.101:6443
.....

总结一下,其实 kube-proxy 做的核心事情非常简单,就是监听并定期更新 service 和 endpoint 对象所需要配置的路由规则,最终的访问就是通过这些路由规则来完成找到对应服务的。

码后解答

  1. kube-proxy 是什么类型的对象?
    1. 答案是在 kube-system namespace 下的一个 DaemonSet,其实容易想到的,因为每个网络规则的配置最终应该配置到宿主机上,而每一个节点就对应了一个宿主机,而 DaemonSet 正好每个节点一个,非常适合。
  2. ipvs 是由谁来配的?
    1. 当然就是我们的 kube-proxy 组件咯
  3. ipvs 后续如何更新?
    1. 通过监听并定期更新

总结提升

其实从 kube-proxy 你更能体会到 Linux 的强大,无论是 docker 还是 k8s 其实最终极的技术本质都是建立在已有的 Linux 功能上的,无论是这里提到的 ipvs 还是 namespace、cgroup 等等都是。当我们慢慢构建整个系统的过程中,会遇到一个又一个问题,寻找并使用已有的技术手段去解决,同时在规模不断变大,会再次出现问题,然后再寻找更好更优秀的解决方案并优化迭代,这就是软件开发的魅力。

编码上

OOMAdjuster

在 ProxyServer 启动的时候有这样一部分代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (s *ProxyServer) Run() error {
// To help debugging, immediately log version
klog.InfoS("Version info", "version", version.Get())

klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

// TODO(vmarmol): Use container config for this.
var oomAdjuster *oom.OOMAdjuster
if s.Config.OOMScoreAdj != nil {
oomAdjuster = oom.NewOOMAdjuster()
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.Config.OOMScoreAdj)); err != nil {
klog.V(2).InfoS("Failed to apply OOMScore", "err", err)
}
}

你有没有好奇 oomAdjuster 是什么东西,它是做什么用的?

首先你需要承认 kube-proxy,是一个非常重要的组件,因为如果网络不通的话,那么其他服务都是“白给”。所以你需要保证它的正常稳定运行,而我们常见的一个机制 OOM(out of memory) 当内存不够的时候是有概率去 Kill 你的应用的,而 OOMAdjuster 目的就是去调整分数,让进程的优先级提高,让自己变得重要,从而让 OOM 机制先 Kill 别人。

算是一个小的科技,可以记下来,说不定什么时候也能用上。

birthCry

还有一个非常形象的方法命名是 birthCry

1
2
3
4
5
6
7
// Birth Cry after the birth is successful
s.birthCry()


func (s *ProxyServer) birthCry() {
s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeNormal, "Starting", "StartKubeProxy", "")
}

就好像刚出生的孩子的第一声啼哭一样,表示整个系统成功启动了。