【K8s源码分析(五)】-K8s中Pod亲和性调度插件介绍

本次分析参考的K8s版本是v1.27.0

前言

K8s调度器v1版的的默认的插件都在pkg/scheduler/apis/config/v1/default_plugins.go:30 中,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// getDefaultPlugins returns the default set of plugins.
func getDefaultPlugins() *v1.Plugins {
plugins := &v1.Plugins{
MultiPoint: v1.PluginSet{
Enabled: []v1.Plugin{
{Name: names.PrioritySort},
{Name: names.NodeUnschedulable},
{Name: names.NodeName},
{Name: names.TaintToleration, Weight: pointer.Int32(3)},
{Name: names.NodeAffinity, Weight: pointer.Int32(2)},
{Name: names.NodePorts},
{Name: names.NodeResourcesFit, Weight: pointer.Int32(1)},
{Name: names.VolumeRestrictions},
{Name: names.EBSLimits},
{Name: names.GCEPDLimits},
{Name: names.NodeVolumeLimits},
{Name: names.AzureDiskLimits},
{Name: names.VolumeBinding},
{Name: names.VolumeZone},
{Name: names.PodTopologySpread, Weight: pointer.Int32(2)},
{Name: names.InterPodAffinity, Weight: pointer.Int32(2)},
{Name: names.DefaultPreemption},
{Name: names.NodeResourcesBalancedAllocation, Weight: pointer.Int32(1)},
{Name: names.ImageLocality, Weight: pointer.Int32(1)},
{Name: names.DefaultBinder},
},
},
}
applyFeatureGates(plugins)

return plugins
}

而本次我们主要关注的是InterPodAffinity插件,也就是pod的亲和性调度插件,此处是官方对其的介绍:Pod 间亲和性和反亲和性

在 Kubernetes 中,Pod 亲和性(Pod Affinity)是一种特性,它允许根据特定的规则来约束 Pod 可以调度的节点。使用 Pod 亲和性就可以控制 Pod 应该(或不应该)与某些其他 Pod 在同一个域内运行。这在需要一组 Pod 彼此靠近以优化网络通信或共享资源时非常有用。

Pod 亲和性分为两种类型:

  1. Pod 亲和性(Pod Affinity):定义了 Pod 应该与具有某些特定标签的 Pod 调度在同一域内。
  2. Pod 反亲和性(Pod Anti-Affinity):定义了 Pod 不应该与具有某些特定标签的 Pod 调度在同一域内。

注意如果当前正在调度的 Pod 是与其自身具有亲和性的系列中的第一个,则在通过所有其他亲和性检查后允许对其进行调度。这是通过验证集群中没有其他 pod 与该 pod 的命名空间和选择器匹配、该 pod 与其自己的术语匹配以及所选节点与所有请求的拓扑匹配来确定的。这可以确保即使所有 Pod 都指定了 Pod 间关联性,也不会出现死锁。

使用示例

我们现在首先定义一个基准Pod,名为pod-podaffinity-target,定义了它的label为{podenv: pro}并指定它必须放在node1上。

1
2
3
4
5
6
7
8
9
10
11
12
apiVersion: v1
kind: Pod
metadata:
name: pod-podaffinity-target
namespace: dev
labels:
podenv: pro #设置标签
spec:
containers:
- name: nginx
image: nginx:1.17.1
nodeName: node1 # 将目标pod名确指定到node1上

创建完这个Pod之后,我们再定义一个pod,名为pod-podaffinity-target,这个pod与pod-podaffinity-target具有亲和性要求,定义的域为kubernetes.io/hostname,即需要在同一个node上,配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
apiVersion: v1
kind: Pod
metadata:
name: pod-podaffinity-required
namespace: dev
spec:
containers:
- name: nginx
image: nginx:1.17.1
affinity: #亲和性设置
podAffinity: #设置pod亲和性
requiredDuringSchedulingIgnoredDuringExecution: # 硬限制
- labelSelector:
matchExpressions: # 匹配env的值在["xxx"]中的标签
- key: podenv
operator: In
values: ["pro"]
namespaces:
- dev
namespaceSelector:
matchLabels:
environment: production
topologyKey: kubernetes.io/hostname

然后我们再定义一个pod,名为pod-podantiaffinity-required,这个pod与pod-podaffinity-target具有反亲和性要求,定义的域为kubernetes.io/hostname,即不能在同一个node上运行,配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
apiVersion: v1
kind: Pod
metadata:
name: pod-podantiaffinity-required
namespace: dev
spec:
containers:
- name: nginx
image: nginx:1.17.1
affinity: #亲和性设置
podAntiAffinity: #设置pod亲和性
requiredDuringSchedulingIgnoredDuringExecution: # 硬限制
- labelSelector:
matchExpressions: # 匹配podenv的值在["pro"]中的标签
- key: podenv
operator: In
values: ["pro"]
namespaces:
- dev
namespaceSelector:
matchLabels:
environment: production
topologyKey: kubernetes.io/hostname

全部运行后,得到的所有pod如下,可以看到pod-podaffinity-required 与pod-podaffinity-target 都部署到了node1上,而pod-podantiaffinity-required没有与pod-podaffinity-target 部署在同一个节点上,而是部署在了master节点上。

1
2
3
4
5
# kubectl get po -n dev -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
pod-podaffinity-required 1/1 Running 0 98s 192.168.166.169 node1 <none> <none>
pod-podaffinity-target 1/1 Running 0 2m6s 192.168.166.166 node1 <none> <none>
pod-podantiaffinity-required 1/1 Running 0 87s 192.168.219.106 master <none> <none>

亲和性细节介绍

下面整体介绍一下配置亲和性关系时的一些细节。

affinity字段下面需要设置亲和性类型,类型有nodeAffinitynodeAntiAffinitypodAffinitypodAntiAffinity,而此次介绍的是podAffinitypodAntiAffinity

podAffinitypodAntiAffinity下面就需要指定各个级别限制的内容,限制的级别有:

  • requiredDuringSchedulingIgnoredDuringExecution :在调度时必须要满足亲和性要求,但是在运行过程中可以无视亲和性要求
  • preferredDuringSchedulingIgnoredDuringExecution :其配置的亲和性要求是优选的,但不一定必须满足。在运行过程中也是可以无视。

看名字K8s应该有准备在后面加入在运行过程中有变化时也得遵循的亲和性要求,但是目前还没有支持,可以后续期待一下。

每个限制级别下面就有各个选择器,用来选择检查和哪些pod的亲和性关系,亲和性粒度怎么样。主要有:

  • labelSelector:根据label来筛选的pod进行亲和性检查。
  • namespaces:直接规定亲和性要求所适用的namespace名字,不标注或值为[] 则表示只在此pod所对应的namespace中生效。
  • namespaceSelector:根据标签查询来规定亲和性要求所适用的namespace,和namespaces是或的关系,namespace只需要满足两者其一就可以了。值为{}表示适用于所有namespace ,值为null或不标注着表示只在此pod所对应的namespace中生效。
  • topologyKey:确定亲和性处理的范围,kubernetes.io/hostname就代表不能在同一个主机上,还有其他的key,例如:topology.kubernetes.io/zonetopology.kubernetes.io/regiontopology.kubernetes.io/ostopology.kubernetes.io/architecture等,也可以自行定义。
  • matchLabelKeys、mismatchLabelKeys(在v1.29均为alpha阶段,高于本次讨论版本):主要就是通过label的key值来确定要(或不要)使用的pod

亲和性调度插件代码分析

亲和性调度插件的代码在pkg/scheduler/framework/plugins/interpodaffinity 文件夹下面,其文件包含如下:

1
2
3
4
5
6
7
interpodaffinity/
├── filtering.go
├── filtering_test.go
├── plugin.go
├── plugin_test.go
├── scoring.go
└── scoring_test.go

plugin.go文件

首先查看其一开始部分的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Name is the name of the plugin used in the plugin registry and configurations.
const Name = names.InterPodAffinity

var _ framework.PreFilterPlugin = &InterPodAffinity{}
var _ framework.FilterPlugin = &InterPodAffinity{}
var _ framework.PreScorePlugin = &InterPodAffinity{}
var _ framework.ScorePlugin = &InterPodAffinity{}
var _ framework.EnqueueExtensions = &InterPodAffinity{}

// InterPodAffinity is a plugin that checks inter pod affinity
type InterPodAffinity struct {
parallelizer parallelize.Parallelizer
args config.InterPodAffinityArgs
sharedLister framework.SharedLister
nsLister listersv1.NamespaceLister
}

可以看到它定义了自己的名字,然后检查了自己是否实现了以下插件的接口:

  • PreFilter
  • Filter
  • PreScore
  • Score
  • EnqueueExtensions

然后看到其自身额外需要的变量也不多,各个需要的变量的含义如下:

  1. parallelizer:
    • 类型:parallelize.Parallelizer
    • 用途:Parallelizer 是一个并行处理工具,它允许调度框架并行地执行某些操作,例如并行地对多个节点进行过滤或打分。这样可以提高调度的效率。
  2. args:
    • 类型:config.InterPodAffinityArgs
    • 用途:args 存储了初始化插件时所需的参数。在 InterPodAffinity 的上下文中,config.InterPodAffinityArgs 可能包含与 Pod 亲和性相关的配置,如默认的命名空间选择行为或其他与亲和性规则相关的设置。
  3. sharedLister:
    • 类型:framework.SharedLister
    • 用途:sharedLister 是一个共享的列表器(Lister),它提供了对 Kubernetes 资源的访问,如 Pods、Nodes 等。这个列表器允许插件在调度决策过程中获取当前集群状态的视图。
  4. nsLister:
    • 类型:listersv1.NamespaceLister
    • 用途:nsLister 是一个专门用于命名空间(Namespace)的列表器。它允许插件获取关于 Kubernetes 命名空间的信息,这在处理跨命名空间的亲和性规则时非常有用。

注意到EnqueueExtensions的接口定义如下,即希望对于所有会让pod失败的插件都定义一个EventsToRegister函数,来让系统知道何时可以产生一个事件让失败的pod重新调度。

1
2
3
4
5
6
7
8
9
10
11
12
// EnqueueExtensions is an optional interface that plugins can implement to efficiently
// move unschedulable Pods in internal scheduling queues. Plugins
// that fail pod scheduling (e.g., Filter plugins) are expected to implement this interface.
type EnqueueExtensions interface {
// EventsToRegister returns a series of possible events that may cause a Pod
// failed by this plugin schedulable.
// The events will be registered when instantiating the internal scheduling queue,
// and leveraged to build event handlers dynamically.
// Note: the returned list needs to be static (not depend on configuration parameters);
// otherwise it would lead to undefined behavior.
EventsToRegister() []ClusterEvent
}

再查看该插件定义的EventsToRegister函数,可以看到如果使用这个插件,那么如果pod或者Node有添加、删除、label更新等操作都会触发事件,使得将失败的pod进行移入到activeQ或backOffQ中,详见之前的对调度队列的介绍:K8s源码分析(二)-K8s调度队列介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// EventsToRegister returns the possible events that may make a failed Pod
// schedulable
func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEvent {
return []framework.ClusterEvent{
// All ActionType includes the following events:
// - Delete. An unschedulable Pod may fail due to violating an existing Pod's anti-affinity constraints,
// deleting an existing Pod may make it schedulable.
// - Update. Updating on an existing Pod's labels (e.g., removal) may make
// an unschedulable Pod schedulable.
// - Add. An unschedulable Pod may fail due to violating pod-affinity constraints,
// adding an assigned Pod may make it schedulable.
{Resource: framework.Pod, ActionType: framework.All},
{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel},
}
}

再查看其定义的初始化代码New,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// New initializes a new plugin and returns it.
func New(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
if h.SnapshotSharedLister() == nil {
return nil, fmt.Errorf("SnapshotSharedlister is nil")
}
args, err := getArgs(plArgs)
if err != nil {
return nil, err
}
if err := validation.ValidateInterPodAffinityArgs(nil, &args); err != nil {
return nil, err
}
pl := &InterPodAffinity{
parallelizer: h.Parallelizer(),
args: args,
sharedLister: h.SnapshotSharedLister(),
nsLister: h.SharedInformerFactory().Core().V1().Namespaces().Lister(),
}

return pl, nil
}

filtering.go

filter筛选时都是根据硬约束,即requiredDuringSchedulingIgnoredDuringExecution 的要求来进行筛选。其代码在pkg/scheduler/framework/plugins/interpodaffinity/filtering.go中。

PreFilter

其代码如下,补充了部分注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// PreFilter invoked at the prefilter extension point.
func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
// 定义几个变量,用于存储所有节点信息和具有必需的反亲和性 Pod 的节点信息
var allNodes []*framework.NodeInfo
var nodesWithRequiredAntiAffinityPods []*framework.NodeInfo
var err error

// 获取所有节点信息
if allNodes, err = pl.sharedLister.NodeInfos().List(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("failed to list NodeInfos: %w", err))
}
// 获取具有必需的反亲和性 Pod 的节点列表
if nodesWithRequiredAntiAffinityPods, err = pl.sharedLister.NodeInfos().HavePodsWithRequiredAntiAffinityList(); err != nil {
return nil, framework.AsStatus(fmt.Errorf("failed to list NodeInfos with pods with affinity: %w", err))
}

// 初始化预过滤状态
s := &preFilterState{}

// 从 Pod 中解析 Pod 信息
if s.podInfo, err = framework.NewPodInfo(pod); err != nil {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("parsing pod: %+v", err))
}

// 处理 Pod 亲和性项的命名空间
for i := range s.podInfo.RequiredAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAffinityTerms[i]); err != nil {
return nil, framework.AsStatus(err)
}
}
// 处理 Pod 反亲和性项的命名空间
for i := range s.podInfo.RequiredAntiAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&s.podInfo.RequiredAntiAffinityTerms[i]); err != nil {
return nil, framework.AsStatus(err)
}
}
// 获取命名空间标签的快照
s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)

// 获取现有反亲和性计数
s.existingAntiAffinityCounts = pl.getExistingAntiAffinityCounts(ctx, pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods)
// 获取即将到来的亲和性和反亲和性计数
s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(ctx, s.podInfo, allNodes)

// 如果没有现有的反亲和性计数,并且 Pod 没有必需的亲和性或反亲和性项,则跳过
if len(s.existingAntiAffinityCounts) == 0 && len(s.podInfo.RequiredAffinityTerms) == 0 && len(s.podInfo.RequiredAntiAffinityTerms) == 0 {
return nil, framework.NewStatus(framework.Skip)
}

// 将预过滤状态写入周期状态
cycleState.Write(preFilterStateKey, s)
// 函数返回 nil,表示不需要进一步处理
return nil, nil
}

下面具体来分析一下这部分代码。

  1. 获取了所有节点信息

  2. 获取了所有拥有反亲和性 Pod 的节点列表

  3. 合并亲和性生效的namespace,函数mergeAffinityTermNamespacesIfNotEmpty如下。其做法是将NamespaceSelector中筛选的所有namespace插入到直接规定的namespaces中,从这里也可以看出来上面提到的namespaceSelector和namespaces是或的关系。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // Updates Namespaces with the set of namespaces identified by NamespaceSelector.
    // If successful, NamespaceSelector is set to nil.
    // The assumption is that the term is for an incoming pod, in which case
    // namespaceSelector is either unrolled into Namespaces (and so the selector
    // is set to Nothing()) or is Empty(), which means match everything. Therefore,
    // there when matching against this term, there is no need to lookup the existing
    // pod's namespace labels to match them against term's namespaceSelector explicitly.
    func (pl *InterPodAffinity) mergeAffinityTermNamespacesIfNotEmpty(at *framework.AffinityTerm) error {
    if at.NamespaceSelector.Empty() {
    return nil
    }
    ns, err := pl.nsLister.List(at.NamespaceSelector)
    if err != nil {
    return err
    }
    for _, n := range ns {
    at.Namespaces.Insert(n.Name)
    }
    at.NamespaceSelector = labels.Nothing()
    return nil
    }
  4. 合并反亲和性生效的namespace,与合并亲和性类似。

  5. 得到namespace的label的快照,并保存起来。

  6. 查看已经部署运行具有反亲和性约束的pod,查看这些已有的反亲和性约束与要调度的pod之间的约束结果,函数getExistingAntiAffinityCounts如下。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // calculates the following for each existing pod on each node:
    // 1. Whether it has PodAntiAffinity
    // 2. Whether any AntiAffinityTerm matches the incoming pod
    func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo) topologyToMatchedTermCount {
    topoMaps := make([]topologyToMatchedTermCount, len(nodes))
    index := int32(-1)
    processNode := func(i int) {
    nodeInfo := nodes[i]
    node := nodeInfo.Node()
    if node == nil {
    klog.ErrorS(nil, "Node not found")
    return
    }
    topoMap := make(topologyToMatchedTermCount)
    for _, existingPod := range nodeInfo.PodsWithRequiredAntiAffinity {
    topoMap.updateWithAntiAffinityTerms(existingPod.RequiredAntiAffinityTerms, pod, nsLabels, node, 1)
    }
    if len(topoMap) != 0 {
    topoMaps[atomic.AddInt32(&index, 1)] = topoMap
    }
    }
    pl.parallelizer.Until(ctx, len(nodes), processNode, pl.Name())

    result := make(topologyToMatchedTermCount)
    for i := 0; i <= int(index); i++ {
    result.append(topoMaps[i])
    }

    return result
    }

    首先看一下这个函数的中的topologyToMatchedTermCount的定义,如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    type topologyPair struct {
    key string
    value string
    }
    type topologyToMatchedTermCount map[topologyPair]int64

    // ...

    // updates the topologyToMatchedTermCount map with the specified value
    // for each anti-affinity term matched the target pod.
    func (m topologyToMatchedTermCount) updateWithAntiAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, value int64) {
    // Check anti-affinity terms.
    for _, t := range terms {
    if t.Matches(pod, nsLabels) {
    m.update(node, t.TopologyKey, value)
    }
    }
    }

    // ...

    // Matches returns true if the pod matches the label selector and namespaces or namespace selector.
    func (at *AffinityTerm) Matches(pod *v1.Pod, nsLabels labels.Set) bool {
    if at.Namespaces.Has(pod.Namespace) || at.NamespaceSelector.Matches(nsLabels) {
    return at.Selector.Matches(labels.Set(pod.Labels))
    }
    return false
    }

    func (m topologyToMatchedTermCount) update(node *v1.Node, tk string, value int64) {
    if tv, ok := node.Labels[tk]; ok {
    pair := topologyPair{key: tk, value: tv}
    m[pair] += value
    // value could be negative, hence we delete the entry if it is down to zero.
    if m[pair] == 0 {
    delete(m, pair)
    }
    }
    }

    可以看到是一个topologyToMatchedTermCount是一个map,不过这里其value一直为1.函数getExistingAntiAffinityCounts这里会遍历有反亲和性约束的pod,然后看其该pod反亲和性约束所生效的namespace和所生效的label的范围是不是包含了要调度的pod,如果是就将作用域及作用域的具体值所对应的值加上value(也就是1),然后返回回去。最后得到的结果就例如:对于hostname域为node1的具有反亲和性约束的pod的数量为2,对于region域为east1的具有反亲和性约束的pod的数量为1。如此就将对于pod的反亲和性约束转变为了对于各个域的反亲和性约束。

  7. 然后查看要调度的pod的亲和性与反亲和性约束,查看已有的pod与该pod的约束结果,函数getIncomingAffinityAntiAffinityCounts如下。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    // finds existing Pods that match affinity terms of the incoming pod's (anti)affinity terms.
    // It returns a topologyToMatchedTermCount that are checked later by the affinity
    // predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
    // need to check all the pods in the cluster.
    func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Context, podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
    affinityCounts := make(topologyToMatchedTermCount)
    antiAffinityCounts := make(topologyToMatchedTermCount)
    if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 {
    return affinityCounts, antiAffinityCounts
    }

    affinityCountsList := make([]topologyToMatchedTermCount, len(allNodes))
    antiAffinityCountsList := make([]topologyToMatchedTermCount, len(allNodes))
    index := int32(-1)
    processNode := func(i int) {
    nodeInfo := allNodes[i]
    node := nodeInfo.Node()
    if node == nil {
    klog.ErrorS(nil, "Node not found")
    return
    }
    affinity := make(topologyToMatchedTermCount)
    antiAffinity := make(topologyToMatchedTermCount)
    for _, existingPod := range nodeInfo.Pods {
    affinity.updateWithAffinityTerms(podInfo.RequiredAffinityTerms, existingPod.Pod, node, 1)
    // The incoming pod's terms have the namespaceSelector merged into the namespaces, and so
    // here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
    antiAffinity.updateWithAntiAffinityTerms(podInfo.RequiredAntiAffinityTerms, existingPod.Pod, nil, node, 1)
    }

    if len(affinity) > 0 || len(antiAffinity) > 0 {
    k := atomic.AddInt32(&index, 1)
    affinityCountsList[k] = affinity
    antiAffinityCountsList[k] = antiAffinity
    }
    }
    pl.parallelizer.Until(ctx, len(allNodes), processNode, pl.Name())

    for i := 0; i <= int(index); i++ {
    affinityCounts.append(affinityCountsList[i])
    antiAffinityCounts.append(antiAffinityCountsList[i])
    }

    return affinityCounts, antiAffinityCounts
    }

    操作的一些细节与6类似,主要就是并行遍历所有的node,然后运行检查每个node时,逐个检查node上的每个pod,然后检查与要调度的pod存在的亲和性、反亲和性约束的结果,并转化为了对各个域的亲和性、反亲和性约束结果,最终汇总返回。

  8. 如果与现存的具有反亲和性约束的pod没有约束关系,或者本身也没有亲和性、反亲和性约束,那么就更新状态为可跳过,并返回。不然就写入这个预筛选结果到cycleState

Filter

其代码如下,补充了部分注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Filter invoked at the filter extension point.
// It checks if a pod can be scheduled on the specified node with pod affinity/anti-affinity configuration.
func (pl *InterPodAffinity) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
// 检查给定的节点信息是否有效,如果节点为空,则返回错误状态
if nodeInfo.Node() == nil {
return framework.NewStatus(framework.Error, "node not found")
}

// 获取调度前的过滤状态信息,如果获取失败,则返回错误状态
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.AsStatus(err)
}

// 检查是否满足 Pod 的亲和性规则,如果不满足,则返回不可调度且不可解决的状态
if !satisfyPodAffinity(state, nodeInfo) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonAffinityRulesNotMatch)
}

// 检查是否满足 Pod 的反亲和性规则,如果不满足,则返回不可调度的状态
if !satisfyPodAntiAffinity(state, nodeInfo) {
return framework.NewStatus(framework.Unschedulable, ErrReasonAntiAffinityRulesNotMatch)
}

// 检查是否满足现有 Pod 的反亲和性规则,如果不满足,则返回不可调度的状态
if !satisfyExistingPodsAntiAffinity(state, nodeInfo) {
return framework.NewStatus(framework.Unschedulable, ErrReasonExistingAntiAffinityRulesNotMatch)
}

// 如果所有检查都通过,则返回 nil,表示 Pod 可以被调度到该节点
return nil
}

整体的逻辑很简明,首先是得到预筛选的结果,然后检查该node是否满足要调度的pod的亲和性、反亲和性约束,已经是否满足已运行的pod的反亲和性约束。

查看satisfyPodAffinity函数,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Checks if the node satisfies the incoming pod's affinity rules.
func satisfyPodAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) bool {
podsExist := true
for _, term := range state.podInfo.RequiredAffinityTerms {
if topologyValue, ok := nodeInfo.Node().Labels[term.TopologyKey]; ok {
tp := topologyPair{key: term.TopologyKey, value: topologyValue}
if state.affinityCounts[tp] <= 0 {
podsExist = false
}
} else {
// All topology labels must exist on the node.
return false
}
}

if !podsExist {
// This pod may be the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod, the pod matches
// its own terms, and the node has all the requested topologies, then we allow the pod
// to pass the affinity check.
if len(state.affinityCounts) == 0 && podMatchesAllAffinityTerms(state.podInfo.RequiredAffinityTerms, state.podInfo.Pod) {
return true
}
return false
}
return true
}

在函数中首先遍历了要调度的pod的亲和性约束项,PreFilter转换的亲和性约束的结果查看node所在的域是否包含有亲和性pod,如果没有就不通过。例如要调度的pod的亲和性约束约束了region域,假设这个node的region的值为west(如果没有region域那么也不通过),然后根据PreFilter的结果来看,值为west的region中统计的与其具有亲和性的pod的数量是否大于0,如果是,那么这一项检查就通过了,如果不是就不通过。

然后这里也专门统计了是否存在与其具有亲和性约束的pod,如果完全不存在,那么这个pod可能就是所有亲和性约束的第一个,那么就检查一下当前的pod是否满足自己的亲和性要求,如下,如果通过了也能让其通过这部分的筛选。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// returns true IFF the given pod matches all the given terms.
func podMatchesAllAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod) bool {
if len(terms) == 0 {
return false
}
for _, t := range terms {
// The incoming pod NamespaceSelector was merged into the Namespaces set, and so
// we are not explicitly passing in namespace labels.
if !t.Matches(pod, nil) {
return false
}
}
return true
}

再查看satisfyPodAntiAffinity函数,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Checks if the node satisfies the incoming pod's anti-affinity rules.
func satisfyPodAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) bool {
if len(state.antiAffinityCounts) > 0 {
for _, term := range state.podInfo.RequiredAntiAffinityTerms {
if topologyValue, ok := nodeInfo.Node().Labels[term.TopologyKey]; ok {
tp := topologyPair{key: term.TopologyKey, value: topologyValue}
if state.antiAffinityCounts[tp] > 0 {
return false
}
}
}
}
return true
}

也是类似,检查node所在的域是否有与该要调度的pod具有反亲和性约束的pod,如果有,就不通过筛选。

在查看satisfyExistingPodsAntiAffinity函数,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Checks if scheduling the pod onto this node would break any anti-affinity
// terms indicated by the existing pods.
func satisfyExistingPodsAntiAffinity(state *preFilterState, nodeInfo *framework.NodeInfo) bool {
if len(state.existingAntiAffinityCounts) > 0 {
// Iterate over topology pairs to get any of the pods being affected by
// the scheduled pod anti-affinity terms
for topologyKey, topologyValue := range nodeInfo.Node().Labels {
tp := topologyPair{key: topologyKey, value: topologyValue}
if state.existingAntiAffinityCounts[tp] > 0 {
return false
}
}
}
return true
}

也是类似,检查node所在的域是否有与该要调度的pod具有反亲和性约束的pod,如果有,就不通过筛选。

scoring.go

打分时则依靠preferredDuringSchedulingIgnoredDuringExecution 来进行倾向性的打分。其代码在pkg/scheduler/framework/plugins/interpodaffinity/scoring.go中。

PreScore

PreScore函数的代码如下,添加了部分注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// PreScore builds and writes cycle state used by Score and NormalizeScore.
func (pl *InterPodAffinity) PreScore(
pCtx context.Context,
cycleState *framework.CycleState,
pod *v1.Pod,
nodes []*v1.Node,
) *framework.Status {
// 如果没有节点可供打分,则直接返回 nil。
if len(nodes) == 0 {
return nil
}

// 检查 sharedLister 是否为空,如果是,则返回错误状态。
if pl.sharedLister == nil {
return framework.NewStatus(framework.Error, "empty shared lister in InterPodAffinity PreScore")
}

// 获取 Pod 的亲和性配置。
affinity := pod.Spec.Affinity
// 检查是否存在亲和性约束。
hasPreferredAffinityConstraints := affinity != nil && affinity.PodAffinity != nil && len(affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0
// 检查是否存在反亲和性约束。
hasPreferredAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil && len(affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0

// 如果没有首选的亲和性或反亲和性约束,并且配置指示忽略现有 Pod 的首选术语,
// 则写入一个空的 topologyScore 映射并返回 nil。
if pl.args.IgnorePreferredTermsOfExistingPods && !hasPreferredAffinityConstraints && !hasPreferredAntiAffinityConstraints {
cycleState.Write(preScoreStateKey, &preScoreState{
topologyScore: make(map[string]map[string]int64),
})
return nil
}

var allNodes []*framework.NodeInfo
var err error
// 如果有亲和性或者反亲和性约束
if hasPreferredAffinityConstraints || hasPreferredAntiAffinityConstraints {
// 获取所有节点的信息
allNodes, err = pl.sharedLister.NodeInfos().List()
if err != nil {
// 如果从 sharedLister 获取所有节点失败,则返回错误状态。
return framework.AsStatus(fmt.Errorf("failed to get all nodes from shared lister: %w", err))
}
} else {
// 如果没有亲和性也没用反亲和性约束,则只获取有亲和性pod的node
allNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList()
if err != nil {
// 如果从 sharedLister 获取有亲和性 Pod 的节点列表失败,则返回错误状态。
return framework.AsStatus(fmt.Errorf("failed to get pods with affinity list: %w", err))
}
}

// 初始化 preScoreState 结构体,用于存储拓扑打分信息。
state := &preScoreState{
topologyScore: make(map[string]map[string]int64),
}

// 尝试为 Pod 创建 PodInfo,如果失败,则返回错误状态。
if state.podInfo, err = framework.NewPodInfo(pod); err != nil {
// 理论上我们不会到这里,因为错误会在 PreFilter 中被捕获。
return framework.AsStatus(fmt.Errorf("failed to parse pod: %w", err))
}

// 处理 Pod 的首选亲和性术语,如果合并命名空间失败,则返回错误状态。
for i := range state.podInfo.PreferredAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&state.podInfo.PreferredAffinityTerms[i].AffinityTerm); err != nil {
return framework.AsStatus(fmt.Errorf("updating PreferredAffinityTerms: %w", err))
}
}
// 处理 Pod 的首选反亲和性术语,如果合并命名空间失败,则返回错误状态。
for i := range state.podInfo.PreferredAntiAffinityTerms {
if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&state.podInfo.PreferredAntiAffinityTerms[i].AffinityTerm); err != nil {
return framework.AsStatus(fmt.Errorf("updating PreferredAntiAffinityTerms: %w", err))
}
}
// 获取命名空间标签的快照。
state.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)

// 初始化一个列表来存储每个节点的拓扑打分信息。
topoScores := make([]scoreMap, len(allNodes))
index := int32(-1)
processNode := func(i int) {
nodeInfo := allNodes[i]
if nodeInfo.Node() == nil {
return
}
// 如果 Pod 没有首选的亲和性术语,我们只需要处理节点上的有亲和性 Pod。
podsToProcess := nodeInfo.PodsWithAffinity
if hasPreferredAffinityConstraints || hasPreferredAntiAffinityConstraints {
// 如果 Pod 有首选的亲和性或反亲和性术语,我们需要处理节点上的所有 Pod。
podsToProcess = nodeInfo.Pods
}

// 初始化当前节点的拓扑打分映射。
topoScore := make(scoreMap)
// 遍历当前节点上的 Pod,处理现有 Pod 并计算拓扑打分。
for _, existingPod := range podsToProcess {
pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore)
}
// 如果当前节点有拓扑打分,则将其添加到 topoScores 列表中。
if len(topoScore) > 0 {
topoScores[atomic.AddInt32(&index, 1)] = topoScore
}
}
// 并行处理所有节点。
pl.parallelizer.Until(pCtx, len(allNodes), processNode, pl.Name())

// 将所有节点的拓扑打分信息累加到 state 中。
for i := 0; i <= int(index); i++ {
state.topologyScore.append(topoScores[i])
}

// 将构建好的周期状态写入 cycleState。
cycleState.Write(preScoreStateKey, state)
// 返回 nil 表示成功。
return nil
}

整体的流程如下:

  1. 获取pod的亲和性即反亲和性优选约束
  2. 如果pod没有相关的优选约束且系统也指明了要忽略优选约束,那么就直接返回。(如果没有忽略,我们就还需要考虑其他pod的优选倾向)
  3. 如果pod有亲和性优选或者反亲和性优选约束,那么考虑的node就是所有的node,如果都没有,那么就只考虑带有亲和性约束的pod的node。
  4. 与PreFilter类似,分别合并亲和性、反亲和性优选约束生效的namespace。
  5. 获取命名空间快照。
  6. 通过并行的方式给2中得到的每个node进行打分。
    1. 在打分时首先查看要调度的pod是否有亲和性优选或者反亲和性优选约束,如果有要处理的pod就是node上的所有pod,否则就是带有亲和性约束的pod。

    2. 通过processExistingPod函数给每个要处理的pod进行打分,该函数如下

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      func (pl *InterPodAffinity) processExistingPod(
      state *preScoreState,
      existingPod *framework.PodInfo,
      existingPodNodeInfo *framework.NodeInfo,
      incomingPod *v1.Pod,
      topoScore scoreMap,
      ) {
      existingPodNode := existingPodNodeInfo.Node()
      if len(existingPodNode.Labels) == 0 {
      return
      }

      // For every soft pod affinity term of <pod>, if <existingPod> matches the term,
      // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
      // value as that of <existingPods>`s node by the term`s weight.
      // Note that the incoming pod's terms have the namespaceSelector merged into the namespaces, and so
      // here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
      topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, nil, existingPodNode, 1)

      // For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
      // decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
      // value as that of <existingPod>`s node by the term`s weight.
      // Note that the incoming pod's terms have the namespaceSelector merged into the namespaces, and so
      // here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
      topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, nil, existingPodNode, -1)

      // For every hard pod affinity term of <existingPod>, if <pod> matches the term,
      // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
      // value as that of <existingPod>'s node by the constant <args.hardPodAffinityWeight>
      if pl.args.HardPodAffinityWeight > 0 && len(existingPodNode.Labels) != 0 {
      for _, t := range existingPod.RequiredAffinityTerms {
      topoScore.processTerm(&t, pl.args.HardPodAffinityWeight, incomingPod, state.namespaceLabels, existingPodNode, 1)
      }
      }

      // For every soft pod affinity term of <existingPod>, if <pod> matches the term,
      // increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
      // value as that of <existingPod>'s node by the term's weight.
      topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, 1)

      // For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
      // decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
      // value as that of <existingPod>'s node by the term's weight.
      topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, -1)
      }

      如注释所述,首先是查看这个已经存在的pod是否满足要调度的pod的亲和性优选约束,这会遍历各个亲和性优选约束,如果满足某个约束,那么就在这个node所属域的值上加上对应亲和性优选约束的weight。其主要的processTerms函数如下。例如亲和性优选约束是值为west的region,weight为10,那么如果当前pod也为west的region,那么值为west的region的分数就会加10.

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32) {
      for _, term := range terms {
      m.processTerm(&term.AffinityTerm, term.Weight, pod, nsLabels, node, multiplier)
      }
      }

      func (m scoreMap) processTerm(term *framework.AffinityTerm, weight int32, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32) {
      if term.Matches(pod, nsLabels) {
      if tpValue, tpValueExist := node.Labels[term.TopologyKey]; tpValueExist {
      if m[term.TopologyKey] == nil {
      m[term.TopologyKey] = make(map[string]int64)
      }
      m[term.TopologyKey][tpValue] += int64(weight * multiplier)
      }
      }
      }

      然后也会查看各个反亲和性优选约束,这里就是减去对应的比重,都是通过multiplier控制。

      然后如果HardPodAffinityWeight>0还会去查看已经运行的pod的硬亲和性约束,如果匹配其具体的约束,那么该正在运行的pod对应的域的分数就会加上HardPodAffinityWeight

      然后还会统计以运行的pod的亲和性优选约束及反亲和性优选约束,如果匹配分数也会加上或者减去对应的权重值。

    3. 如果有统计到分数,那么就加入结果中。

  7. 最后就能得到各类型的域及对应值的分数,并放入到cycleState中。

Score

Score函数的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Score invoked at the Score extension point.
// The "score" returned in this function is the sum of weights got from cycleState which have its topologyKey matching with the node's labels.
// it is normalized later.
// Note: the returned "score" is positive for pod-affinity, and negative for pod-antiaffinity.
func (pl *InterPodAffinity) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
if err != nil {
return 0, framework.AsStatus(fmt.Errorf("failed to get node %q from Snapshot: %w", nodeName, err))
}
node := nodeInfo.Node()

s, err := getPreScoreState(cycleState)
if err != nil {
return 0, framework.AsStatus(err)
}
var score int64
for tpKey, tpValues := range s.topologyScore {
if v, exist := node.Labels[tpKey]; exist {
score += tpValues[v]
}
}

return score, nil
}

首先是获取PreScore的打分结果,然后匹配当前node的域与打分的域,如果匹配上了,就加上对应的分数。例如PreScore的结果是:

1
{{{region,east}:-5}, {{region,west}:10}, {{hostname,node1}:-10}, {{hostname,node2}:8}}

而node在值为east的region,所在的host为node2,那么它的分数就是-5+8=3分。

NormalizeScore

这里再关注一下NormalizeScore函数,如下,它的作用是规范化每个过滤后的节点的分数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// NormalizeScore normalizes the score for each filteredNode.
func (pl *InterPodAffinity) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
// 从调度周期状态中获取预打分状态,如果获取失败,则返回错误状态。
s, err := getPreScoreState(cycleState)
if err != nil {
return framework.AsStatus(err)
}
// 如果拓扑打分映射为空,则直接返回 nil,表示没有分数需要规范化。
if len(s.topologyScore) == 0 {
return nil
}

// 初始化最小和最大计数变量为 int64 类型的最大和最小值。
var minCount int64 = math.MaxInt64
var maxCount int64 = math.MinInt64

// 遍历所有节点的分数,找出最大和最小分数。
for i := range scores {
score := scores[i].Score
// 如果当前分数大于已知的最大分数,则更新最大分数。
if score > maxCount {
maxCount = score
}
// 如果当前分数小于已知的最小分数,则更新最小分数。
if score < minCount {
minCount = score
}
}

// 计算最大分数和最小分数之间的差异。
maxMinDiff := maxCount - minCount

// 遍历所有节点的分数,进行规范化处理。
for i := range scores {
fScore := float64(0) // 初始化浮点分数为 0。

// 如果最大分数和最小分数之间的差异大于 0,则计算规范化后的分数。
if maxMinDiff > 0 {
// 使用规范化公式计算浮点分数:系统规定最大分值 * (当前分数 - 最小分数) / (最大分数 - 最小分数)。
fScore = float64(framework.MaxNodeScore) * (float64(scores[i].Score-minCount) / float64(maxMinDiff))
}

// 将计算出的浮点分数转换为 int64 类型,并更新节点的分数。
scores[i].Score = int64(fScore)
}

// 返回 nil,表示分数规范化成功完成。
return nil
}

首先是检查了一下PreScore的结果是不是null,如果是就直接返回nil了。然后就是统计分数列别的最大值和最小值,然后使用规范化公式计算浮点分数:系统规定最大分值 * (当前分数 - 最小分数) / (最大分数 - 最小分数) ,并将计算出的浮点分数转换为 int64 类型,并更新节点的分数。


【K8s源码分析(五)】-K8s中Pod亲和性调度插件介绍
http://example.com/2024/05/11/k8sSource5/
作者
John Doe
发布于
2024年5月11日
许可协议