【K8s源码分析(四)】-K8s调度器绑定周期介绍

本次分析参考的K8s版本是v1.27.0

K8s的整体调度框架如下图所示。

Scheduling framework extension points

bindeCycle顶层函数

K8s调度器中绑定周期的函数bindingCyclepkg/scheduler/schedule_one.go:225中,如下,补充了一些注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// bindingCycle tries to bind an assumed Pod.
func (sched *Scheduler) bindingCycle(
ctx context.Context, // 调度上下文
state *framework.CycleState, // 调度周期状态
fwk framework.Framework, // 调度框架
scheduleResult ScheduleResult, // 调度结果
assumedPodInfo *framework.QueuedPodInfo, // 假定的 Pod 信息
start time.Time, // 绑定周期开始时间
podsToActivate *framework.PodsToActivate) *framework.Status { // 待激活的 Pods

assumedPod := assumedPodInfo.Pod // 获取假定的 Pod

// 运行 "permit" 插件,检查是否允许绑定操作
if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
return status
}

// 运行 "prebind" 插件,执行绑定前的检查和操作
if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
return status
}

// 运行 "bind" 插件,实际执行 Pod 到节点的绑定操作
if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
return status
}

// 日志记录 Pod 绑定成功的信息
klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)

// 更新 Pod 调度的指标
metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(assumedPodInfo.InitialAttemptTimestamp))

// 运行 "postbind" 插件,执行绑定后的检查和操作
fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)

// 成功绑定周期结束后,如果有必要,将一些 Pods 移动到activeQ队列中
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
// 与 schedulingCycle() 中的逻辑不同,我们不删除条目,
// 因为 podsToActivate.Map 不再被消费
}

// 返回 nil 表示没有错误
return nil
}

可以看到我们这里运行了permit、prebind、bind、postbind插件。下面具体来看这几个插件是如何运行的。

Permit插件

其运行函数WaitOnPermitpkg/scheduler/framework/runtime/framework.go:1250中,如下,补充了部分注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framework.Status {
waitingPod := f.waitingPods.get(pod.UID) // 根据pod的id来查看其是否在等待队列中
if waitingPod == nil {
return nil // 如果不在等待队列中,直接返回
}
defer f.waitingPods.remove(pod.UID) // 无论函数如何结束,都从等待列表中移除该 Pod

klog.V(4).InfoS("Pod waiting on permit", "pod", klog.KObj(pod)) // 日志记录 Pod 正在等待许可

startTime := time.Now() // 记录开始等待的时间
s := <-waitingPod.s // 从pod的通道中接收状态
metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime)) // 记录许可等待的持续时间

if !s.IsSuccess() {
if s.IsUnschedulable() {
klog.V(4).InfoS("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message()) // 日志记录 Pod 在等待许可时被拒绝
return s // 返回拒绝状态
}
err := s.AsError() // 将状态转换为错误
klog.ErrorS(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod)) // 日志记录等待许可失败
return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithFailedPlugin(s.FailedPlugin()) // 返回错误状态
}
return nil // 如果状态成功,则返回 nil 表示没有错误
}

主要流程包括:

  1. 查看等待队列是否包含该pod,如果不包含直接返回
  2. 如果包含,就记录开始等待的时间,并通过通道来进行阻塞等待
  3. 记录许可等待的持续时间。
  4. 检查接收到的状态。如果状态不成功:
    • 如果状态是不可调度的(Unschedulable),记录日志并返回拒绝状态。
    • 如果状态不是成功的,记录错误日志,并返回错误状态。
  5. 如果成功就直接返回。

PreBind插件

该插件对应的执行函数RunPreBindPluginspkg/scheduler/framework/runtime/framework.go:1048中,如下,补充了部分注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// RunPreBindPlugins runs the set of configured prebind plugins. It returns a
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
startTime := time.Now() // 记录 PreBind 插件开始运行的时间
defer func() {
// 记录 PreBind 插件的运行时间和最终状态
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.preBindPlugins {
// 遍历所有的 PreBind 插件
status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
if status.IsUnschedulable() {
// 如果插件返回的状态是不可调度的,则记录日志并返回该状态
klog.V(4).InfoS("Pod rejected by PreBind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message())
status.SetFailedPlugin(pl.Name()) // 设置失败的插件名称
return status
}
err := status.AsError() // 将状态转换为错误
klog.ErrorS(err, "Failed running PreBind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
// 如果插件运行失败,记录错误日志并返回错误状态
return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err))
}
}
return nil // 如果所有插件都成功运行,则返回 nil 表示没有错误
}

主要还是遍历所有的PreBind插件,并通过runPreBindPlugin函数进行运行,如果不成功就需要进行相关记录。runPreBindPlugin 函数在pkg/scheduler/framework/runtime/framework.go:1069

1
2
3
4
5
6
7
8
9
func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
if !state.ShouldRecordPluginMetrics() {
return pl.PreBind(ctx, state, pod, nodeName)
}
startTime := time.Now()
status := pl.PreBind(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(metrics.PreBind, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}

可以看到主要是调用插件的PreBind函数进行检查。

Bind插件

该插件对应的执行函数Bindpkg/scheduler/schedule_one.go:796中,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// bind binds a pod to a given node defined in a binding object.
// The precedence for binding is: (1) extenders and (2) framework plugins.
// We expect this to run asynchronously, so we handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (status *framework.Status) {
defer func() {
sched.finishBinding(fwk, assumed, targetNode, status)
}()

bound, err := sched.extendersBinding(assumed, targetNode)
if bound {
return framework.AsStatus(err)
}
return fwk.RunBindPlugins(ctx, state, assumed, targetNode)
}

可以看到首先会调用extendersBinding这个拓展插件进行运行,其代码在pkg/scheduler/schedule_one.go:809中,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
// TODO(#87159): Move this to a Plugin.
func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) {
for _, extender := range sched.Extenders {
if !extender.IsBinder() || !extender.IsInterested(pod) {
continue
}
return true, extender.Bind(&v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
Target: v1.ObjectReference{Kind: "Node", Name: node},
})
}
return false, nil
}

从TODO中可以看到这部分代码在后面可能也变成插件的形式。这里遍历每个拓展,检查看是不是拓展是不是对该pod感兴趣,需要去主动的bind。

如果这些拓展都不处理pod,就会调用RunBindPlugins来进行bind,代码在中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Bind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
if len(f.bindPlugins) == 0 {
return framework.NewStatus(framework.Skip, "")
}
for _, pl := range f.bindPlugins {
status = f.runBindPlugin(ctx, pl, state, pod, nodeName)
if status.IsSkip() {
continue
}
if !status.IsSuccess() {
if status.IsUnschedulable() {
klog.V(4).InfoS("Pod rejected by Bind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message())
status.SetFailedPlugin(pl.Name())
return status
}
err := status.AsError()
klog.ErrorS(err, "Failed running Bind plugin", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", pl.Name(), err))
}
return status
}
return status
}

可以看到他会遍历每一个bindPlug,看这些插件然后去对他进行相关的绑定处理,如果有一个不成功,就会被视为失败,然后进行相关的记录。

每一个binPlug的绑定操作runBindPluginpkg/scheduler/framework/runtime/framework.go:1108中。

1
2
3
4
5
6
7
8
9
func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
if !state.ShouldRecordPluginMetrics() {
return bp.Bind(ctx, state, pod, nodeName)
}
startTime := time.Now()
status := bp.Bind(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(metrics.Bind, bp.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
return status
}

可以看到是调用每个插件的Bind函数。

PostBind插件

该插件对应的执行函数RunPostBindPluginspkg/scheduler/framework/runtime/framework.go:1119中,如下。

1
2
3
4
5
6
7
8
9
10
// RunPostBindPlugins runs the set of configured postbind plugins.
func (f *frameworkImpl) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostBind, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range f.postBindPlugins {
f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
}
}

可以看到其会运行每一个postBindPlugins插件。

具体运行的函数runPostBindPluginpkg/scheduler/framework/runtime/framework.go:1129中,如下

1
2
3
4
5
6
7
8
9
func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.PostBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) {
if !state.ShouldRecordPluginMetrics() {
pl.PostBind(ctx, state, pod, nodeName)
return
}
startTime := time.Now()
pl.PostBind(ctx, state, pod, nodeName)
f.metricsRecorder.ObservePluginDurationAsync(metrics.PostBind, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
}

可以看到是调用了每个插件的PostBind函数。


【K8s源码分析(四)】-K8s调度器绑定周期介绍
http://example.com/2024/05/10/k8sSource4/
作者
滑滑蛋
发布于
2024年5月10日
许可协议