【K8s源码分析(五)】-K8s中Pod亲和性调度插件介绍
本次分析参考的K8s版本是v1.27.0。
前言
K8s调度器v1版的的默认的插件都在pkg/scheduler/apis/config/v1/default_plugins.go:30 中,如下
| 1 |  | 
而本次我们主要关注的是InterPodAffinity插件,也就是pod的亲和性调度插件,此处是官方对其的介绍:Pod 间亲和性和反亲和性
在 Kubernetes 中,Pod 亲和性(Pod Affinity)是一种特性,它允许根据特定的规则来约束 Pod 可以调度的节点。使用 Pod 亲和性就可以控制 Pod 应该(或不应该)与某些其他 Pod 在同一个域内运行。这在需要一组 Pod 彼此靠近以优化网络通信或共享资源时非常有用。
Pod 亲和性分为两种类型:
- Pod 亲和性(Pod Affinity):定义了 Pod 应该与具有某些特定标签的 Pod 调度在同一域内。
- Pod 反亲和性(Pod Anti-Affinity):定义了 Pod 不应该与具有某些特定标签的 Pod 调度在同一域内。
注意如果当前正在调度的 Pod 是与其自身具有亲和性的系列中的第一个,则在通过所有其他亲和性检查后允许对其进行调度。这是通过验证集群中没有其他 pod 与该 pod 的命名空间和选择器匹配、该 pod 与其自己的术语匹配以及所选节点与所有请求的拓扑匹配来确定的。这可以确保即使所有 Pod 都指定了 Pod 间关联性,也不会出现死锁。
使用示例
我们现在首先定义一个基准Pod,名为pod-podaffinity-target,定义了它的label为{podenv: pro}并指定它必须放在node1上。
| 1 |  | 
创建完这个Pod之后,我们再定义一个pod,名为pod-podaffinity-target,这个pod与pod-podaffinity-target具有亲和性要求,定义的域为kubernetes.io/hostname,即需要在同一个node上,配置文件如下:
| 1 |  | 
然后我们再定义一个pod,名为pod-podantiaffinity-required,这个pod与pod-podaffinity-target具有反亲和性要求,定义的域为kubernetes.io/hostname,即不能在同一个node上运行,配置文件如下:
| 1 |  | 
全部运行后,得到的所有pod如下,可以看到pod-podaffinity-required 与pod-podaffinity-target 都部署到了node1上,而pod-podantiaffinity-required没有与pod-podaffinity-target 部署在同一个节点上,而是部署在了master节点上。
| 1 |  | 
亲和性细节介绍
下面整体介绍一下配置亲和性关系时的一些细节。
在affinity字段下面需要设置亲和性类型,类型有nodeAffinity、nodeAntiAffinity和podAffinity 、podAntiAffinity,而此次介绍的是podAffinity和podAntiAffinity 。
在podAffinity 和podAntiAffinity下面就需要指定各个级别限制的内容,限制的级别有:
- requiredDuringSchedulingIgnoredDuringExecution:在调度时必须要满足亲和性要求,但是在运行过程中可以无视亲和性要求
- preferredDuringSchedulingIgnoredDuringExecution:其配置的亲和性要求是优选的,但不一定必须满足。在运行过程中也是可以无视。
看名字K8s应该有准备在后面加入在运行过程中有变化时也得遵循的亲和性要求,但是目前还没有支持,可以后续期待一下。
每个限制级别下面就有各个选择器,用来选择检查和哪些pod的亲和性关系,亲和性粒度怎么样。主要有:
- labelSelector:根据label来筛选的pod进行亲和性检查。
- namespaces:直接规定亲和性要求所适用的namespace名字,不标注或值为[] 则表示只在此pod所对应的namespace中生效。
- namespaceSelector:根据标签查询来规定亲和性要求所适用的namespace,和namespaces是或的关系,namespace只需要满足两者其一就可以了。值为{}表示适用于所有namespace ,值为null或不标注着表示只在此pod所对应的namespace中生效。
- topologyKey:确定亲和性处理的范围,kubernetes.io/hostname就代表不能在同一个主机上,还有其他的key,例如:topology.kubernetes.io/zone、topology.kubernetes.io/region、topology.kubernetes.io/os、topology.kubernetes.io/architecture等,也可以自行定义。
- matchLabelKeys、mismatchLabelKeys(在v1.29均为alpha阶段,高于本次讨论版本):主要就是通过label的key值来确定要(或不要)使用的pod
亲和性调度插件代码分析
亲和性调度插件的代码在pkg/scheduler/framework/plugins/interpodaffinity 文件夹下面,其文件包含如下:
| 1 |  | 
plugin.go文件
首先查看其一开始部分的代码:
| 1 |  | 
可以看到它定义了自己的名字,然后检查了自己是否实现了以下插件的接口:
- PreFilter
- Filter
- PreScore
- Score
- EnqueueExtensions
然后看到其自身额外需要的变量也不多,各个需要的变量的含义如下:
- parallelizer:- 类型:parallelize.Parallelizer
- 用途:Parallelizer是一个并行处理工具,它允许调度框架并行地执行某些操作,例如并行地对多个节点进行过滤或打分。这样可以提高调度的效率。
 
- 类型:
- args:- 类型:config.InterPodAffinityArgs
- 用途:args存储了初始化插件时所需的参数。在InterPodAffinity的上下文中,config.InterPodAffinityArgs可能包含与 Pod 亲和性相关的配置,如默认的命名空间选择行为或其他与亲和性规则相关的设置。
 
- 类型:
- sharedLister:- 类型:framework.SharedLister
- 用途:sharedLister是一个共享的列表器(Lister),它提供了对 Kubernetes 资源的访问,如 Pods、Nodes 等。这个列表器允许插件在调度决策过程中获取当前集群状态的视图。
 
- 类型:
- nsLister:- 类型:listersv1.NamespaceLister
- 用途:nsLister是一个专门用于命名空间(Namespace)的列表器。它允许插件获取关于 Kubernetes 命名空间的信息,这在处理跨命名空间的亲和性规则时非常有用。
 
- 类型:
注意到EnqueueExtensions的接口定义如下,即希望对于所有会让pod失败的插件都定义一个EventsToRegister函数,来让系统知道何时可以产生一个事件让失败的pod重新调度。
| 1 |  | 
再查看该插件定义的EventsToRegister函数,可以看到如果使用这个插件,那么如果pod或者Node有添加、删除、label更新等操作都会触发事件,使得将失败的pod进行移入到activeQ或backOffQ中,详见之前的对调度队列的介绍:K8s源码分析(二)-K8s调度队列介绍
| 1 |  | 
再查看其定义的初始化代码New,如下。
| 1 |  | 
filtering.go
filter筛选时都是根据硬约束,即requiredDuringSchedulingIgnoredDuringExecution 的要求来进行筛选。其代码在pkg/scheduler/framework/plugins/interpodaffinity/filtering.go中。
PreFilter
其代码如下,补充了部分注释。
| 1 |  | 
下面具体来分析一下这部分代码。
- 获取了所有节点信息 
- 获取了所有拥有反亲和性 Pod 的节点列表 
- 合并亲和性生效的namespace,函数 - mergeAffinityTermNamespacesIfNotEmpty如下。其做法是将NamespaceSelector中筛选的所有namespace插入到直接规定的namespaces中,从这里也可以看出来上面提到的namespaceSelector和namespaces是或的关系。- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21- // Updates Namespaces with the set of namespaces identified by NamespaceSelector.
 // If successful, NamespaceSelector is set to nil.
 // The assumption is that the term is for an incoming pod, in which case
 // namespaceSelector is either unrolled into Namespaces (and so the selector
 // is set to Nothing()) or is Empty(), which means match everything. Therefore,
 // there when matching against this term, there is no need to lookup the existing
 // pod's namespace labels to match them against term's namespaceSelector explicitly.
 func (pl *InterPodAffinity) mergeAffinityTermNamespacesIfNotEmpty(at *framework.AffinityTerm) error {
 if at.NamespaceSelector.Empty() {
 return nil
 }
 ns, err := pl.nsLister.List(at.NamespaceSelector)
 if err != nil {
 return err
 }
 for _, n := range ns {
 at.Namespaces.Insert(n.Name)
 }
 at.NamespaceSelector = labels.Nothing()
 return nil
 }
- 合并反亲和性生效的namespace,与合并亲和性类似。 
- 得到namespace的label的快照,并保存起来。 
- 查看已经部署运行具有反亲和性约束的pod,查看这些已有的反亲和性约束与要调度的pod之间的约束结果,函数 - getExistingAntiAffinityCounts如下。- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30- // calculates the following for each existing pod on each node:
 // 1. Whether it has PodAntiAffinity
 // 2. Whether any AntiAffinityTerm matches the incoming pod
 func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo) topologyToMatchedTermCount {
 topoMaps := make([]topologyToMatchedTermCount, len(nodes))
 index := int32(-1)
 processNode := func(i int) {
 nodeInfo := nodes[i]
 node := nodeInfo.Node()
 if node == nil {
 klog.ErrorS(nil, "Node not found")
 return
 }
 topoMap := make(topologyToMatchedTermCount)
 for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity {
 topoMap.updateWithAntiAffinityTerms(existingPod.RequiredAntiAffinityTerms, pod, nsLabels, node, 1)
 }
 if len(topoMap) != 0 {
 topoMaps[atomic.AddInt32(&index, 1)] = topoMap
 }
 }
 pl.parallelizer.Until(ctx, len(nodes), processNode, pl.Name())
 result := make(topologyToMatchedTermCount)
 for i := 0; i <= int(index); i++ {
 result.append(topoMaps[i])
 }
 return result
 }- 首先看一下这个函数的中的topologyToMatchedTermCount的定义,如下 - 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39- type topologyPair struct {
 key string
 value string
 }
 type topologyToMatchedTermCount map[topologyPair]int64
 // ...
 // updates the topologyToMatchedTermCount map with the specified value
 // for each anti-affinity term matched the target pod.
 func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, value int64) {
 // Check anti-affinity terms.
 for _, t := range terms {
 if t.Matches(pod, nsLabels) {
 m.update(node, t.TopologyKey, value)
 }
 }
 }
 // ...
 // Matches returns true if the pod matches the label selector and namespaces or namespace selector.
 func (at *AffinityTerm) Matches(pod *v1.Pod, nsLabels labels.Set) bool {
 if at.Namespaces.Has(pod.Namespace) || at.NamespaceSelector.Matches(nsLabels) {
 return at.Selector.Matches(labels.Set(pod.Labels))
 }
 return false
 }
 func (m topologyToMatchedTermCount) update(node *v1.Node, tk string, value int64) {
 if tv, ok := node.Labels[tk]; ok {
 pair := topologyPair{key: tk, value: tv}
 m[pair] += value
 // value could be negative, hence we delete the entry if it is down to zero.
 if m[pair] == 0 {
 delete(m, pair)
 }
 }
 }- 可以看到是一个topologyToMatchedTermCount是一个map,不过这里其value一直为1.函数 - getExistingAntiAffinityCounts这里会遍历有反亲和性约束的pod,然后看其该pod反亲和性约束所生效的namespace和所生效的label的范围是不是包含了要调度的pod,如果是就将作用域及作用域的具体值所对应的值加上value(也就是1),然后返回回去。最后得到的结果就例如:对于hostname域为node1的具有反亲和性约束的pod的数量为2,对于region域为east1的具有反亲和性约束的pod的数量为1。如此就将对于pod的反亲和性约束转变为了对于各个域的反亲和性约束。
- 然后查看要调度的pod的亲和性与反亲和性约束,查看已有的pod与该pod的约束结果,函数 - getIncomingAffinityAntiAffinityCounts如下。- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45- // finds existing Pods that match affinity terms of the incoming pod's (anti)affinity terms.
 // It returns a topologyToMatchedTermCount that are checked later by the affinity
 // predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
 // need to check all the pods in the cluster.
 func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Context, podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
 affinityCounts := make(topologyToMatchedTermCount)
 antiAffinityCounts := make(topologyToMatchedTermCount)
 if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 {
 return affinityCounts, antiAffinityCounts
 }
 affinityCountsList := make([]topologyToMatchedTermCount, len(allNodes))
 antiAffinityCountsList := make([]topologyToMatchedTermCount, len(allNodes))
 index := int32(-1)
 processNode := func(i int) {
 nodeInfo := allNodes[i]
 node := nodeInfo.Node()
 if node == nil {
 klog.ErrorS(nil, "Node not found")
 return
 }
 affinity := make(topologyToMatchedTermCount)
 antiAffinity := make(topologyToMatchedTermCount)
 for _, existingPod := range nodeInfo.Pods {
 affinity.updateWithAffinityTerms(podInfo.RequiredAffinityTerms, existingPod.Pod, node, 1)
 // The incoming pod's terms have the namespaceSelector merged into the namespaces, and so
 // here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
 antiAffinity.updateWithAntiAffinityTerms(podInfo.RequiredAntiAffinityTerms, existingPod.Pod, nil, node, 1)
 }
 if len(affinity) > 0 || len(antiAffinity) > 0 {
 k := atomic.AddInt32(&index, 1)
 affinityCountsList[k] = affinity
 antiAffinityCountsList[k] = antiAffinity
 }
 }
 pl.parallelizer.Until(ctx, len(allNodes), processNode, pl.Name())
 for i := 0; i <= int(index); i++ {
 affinityCounts.append(affinityCountsList[i])
 antiAffinityCounts.append(antiAffinityCountsList[i])
 }
 return affinityCounts, antiAffinityCounts
 }- 操作的一些细节与6类似,主要就是并行遍历所有的node,然后运行检查每个node时,逐个检查node上的每个pod,然后检查与要调度的pod存在的亲和性、反亲和性约束的结果,并转化为了对各个域的亲和性、反亲和性约束结果,最终汇总返回。 
- 如果与现存的具有反亲和性约束的pod没有约束关系,或者本身也没有亲和性、反亲和性约束,那么就更新状态为可跳过,并返回。不然就写入这个预筛选结果到 - cycleState中
Filter
其代码如下,补充了部分注释。
| 1 |  | 
整体的逻辑很简明,首先是得到预筛选的结果,然后检查该node是否满足要调度的pod的亲和性、反亲和性约束,已经是否满足已运行的pod的反亲和性约束。
查看satisfyPodAffinity函数,如下
| 1 |  | 
在函数中首先遍历了要调度的pod的亲和性约束项,PreFilter转换的亲和性约束的结果查看node所在的域是否包含有亲和性pod,如果没有就不通过。例如要调度的pod的亲和性约束约束了region域,假设这个node的region的值为west(如果没有region域那么也不通过),然后根据PreFilter的结果来看,值为west的region中统计的与其具有亲和性的pod的数量是否大于0,如果是,那么这一项检查就通过了,如果不是就不通过。
然后这里也专门统计了是否存在与其具有亲和性约束的pod,如果完全不存在,那么这个pod可能就是所有亲和性约束的第一个,那么就检查一下当前的pod是否满足自己的亲和性要求,如下,如果通过了也能让其通过这部分的筛选。
| 1 |  | 
再查看satisfyPodAntiAffinity函数,如下
| 1 |  | 
也是类似,检查node所在的域是否有与该要调度的pod具有反亲和性约束的pod,如果有,就不通过筛选。
在查看satisfyExistingPodsAntiAffinity函数,如下:
| 1 |  | 
也是类似,检查node所在的域是否有与该要调度的pod具有反亲和性约束的pod,如果有,就不通过筛选。
scoring.go
打分时则依靠preferredDuringSchedulingIgnoredDuringExecution 来进行倾向性的打分。其代码在pkg/scheduler/framework/plugins/interpodaffinity/scoring.go中。
PreScore
PreScore函数的代码如下,添加了部分注释。
| 1 |  | 
整体的流程如下:
- 获取pod的亲和性即反亲和性优选约束
- 如果pod没有相关的优选约束且系统也指明了要忽略优选约束,那么就直接返回。(如果没有忽略,我们就还需要考虑其他pod的优选倾向)
- 如果pod有亲和性优选或者反亲和性优选约束,那么考虑的node就是所有的node,如果都没有,那么就只考虑带有亲和性约束的pod的node。
- 与PreFilter类似,分别合并亲和性、反亲和性优选约束生效的namespace。
- 获取命名空间快照。
- 通过并行的方式给2中得到的每个node进行打分。- 在打分时首先查看要调度的pod是否有亲和性优选或者反亲和性优选约束,如果有要处理的pod就是node上的所有pod,否则就是带有亲和性约束的pod。 
- 通过 - processExistingPod函数给每个要处理的pod进行打分,该函数如下- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45- func (pl *InterPodAffinity) processExistingPod(
 state *preScoreState,
 existingPod *framework.PodInfo,
 existingPodNodeInfo *framework.NodeInfo,
 incomingPod *v1.Pod,
 topoScore scoreMap,
 ) {
 existingPodNode := existingPodNodeInfo.Node()
 if len(existingPodNode.Labels) == 0 {
 return
 }
 // For every soft pod affinity term of <pod>, if <existingPod> matches the term,
 // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
 // value as that of <existingPods>`s node by the term`s weight.
 // Note that the incoming pod's terms have the namespaceSelector merged into the namespaces, and so
 // here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
 topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, nil, existingPodNode, 1)
 // For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
 // decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
 // value as that of <existingPod>`s node by the term`s weight.
 // Note that the incoming pod's terms have the namespaceSelector merged into the namespaces, and so
 // here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
 topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, nil, existingPodNode, -1)
 // For every hard pod affinity term of <existingPod>, if <pod> matches the term,
 // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
 // value as that of <existingPod>'s node by the constant <args.hardPodAffinityWeight>
 if pl.args.HardPodAffinityWeight > 0 && len(existingPodNode.Labels) != 0 {
 for _, t := range existingPod.RequiredAffinityTerms {
 topoScore.processTerm(&t, pl.args.HardPodAffinityWeight, incomingPod, state.namespaceLabels, existingPodNode, 1)
 }
 }
 // For every soft pod affinity term of <existingPod>, if <pod> matches the term,
 // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
 // value as that of <existingPod>'s node by the term's weight.
 topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, 1)
 // For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
 // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
 // value as that of <existingPod>'s node by the term's weight.
 topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, -1)
 }- 如注释所述,首先是查看这个已经存在的pod是否满足要调度的pod的亲和性优选约束,这会遍历各个亲和性优选约束,如果满足某个约束,那么就在这个node所属域的值上加上对应亲和性优选约束的weight。其主要的 - processTerms函数如下。例如亲和性优选约束是值为west的region,weight为10,那么如果当前pod也为west的region,那么值为west的region的分数就会加10.- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16- func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32) {
 for _, term := range terms {
 m.processTerm(&term.AffinityTerm, term.Weight, pod, nsLabels, node, multiplier)
 }
 }
 func (m scoreMap) processTerm(term *framework.AffinityTerm, weight int32, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32) {
 if term.Matches(pod, nsLabels) {
 if tpValue, tpValueExist := node.Labels[term.TopologyKey]; tpValueExist {
 if m[term.TopologyKey] == nil {
 m[term.TopologyKey] = make(map[string]int64)
 }
 m[term.TopologyKey][tpValue] += int64(weight * multiplier)
 }
 }
 }- 然后也会查看各个反亲和性优选约束,这里就是减去对应的比重,都是通过multiplier控制。 - 然后如果 - HardPodAffinityWeight>0还会去查看已经运行的pod的硬亲和性约束,如果匹配其具体的约束,那么该正在运行的pod对应的域的分数就会加上- HardPodAffinityWeight。- 然后还会统计以运行的pod的亲和性优选约束及反亲和性优选约束,如果匹配分数也会加上或者减去对应的权重值。 
- 如果有统计到分数,那么就加入结果中。 
 
- 最后就能得到各类型的域及对应值的分数,并放入到cycleState中。
Score
Score函数的代码如下。
| 1 |  | 
首先是获取PreScore的打分结果,然后匹配当前node的域与打分的域,如果匹配上了,就加上对应的分数。例如PreScore的结果是:
| 1 |  | 
而node在值为east的region,所在的host为node2,那么它的分数就是-5+8=3分。
NormalizeScore
这里再关注一下NormalizeScore函数,如下,它的作用是规范化每个过滤后的节点的分数。
| 1 |  | 
首先是检查了一下PreScore的结果是不是null,如果是就直接返回nil了。然后就是统计分数列别的最大值和最小值,然后使用规范化公式计算浮点分数:系统规定最大分值 * (当前分数 - 最小分数) / (最大分数 - 最小分数) ,并将计算出的浮点分数转换为 int64 类型,并更新节点的分数。