【k8s kubelet 源代码阅读(二)】-节点状态上报

主处理流程

在kubelet的Run中会调用相应的代码进行节点状态更新,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
if kl.kubeClient != nil {
// Start two go-routines to update the status.
//
// The first will report to the apiserver every nodeStatusUpdateFrequency and is aimed to provide regular status intervals,
// while the second is used to provide a more timely status update during initialization and runs an one-shot update to the apiserver
// once the node becomes ready, then exits afterwards.
//
// Introduce some small jittering to ensure that over time the requests won't start
// accumulating at approximately the same time from the set of nodes due to priority and
// fairness effect.
go func() {
// Call updateRuntimeUp once before syncNodeStatus to make sure kubelet had already checked runtime state
// otherwise when restart kubelet, syncNodeStatus will report node notReady in first report period
kl.updateRuntimeUp()
wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
}()

go kl.fastStatusUpdateOnce()

// start syncing lease
go kl.nodeLeaseController.Run(context.Background())

// Mirror pods for static pods may not be created immediately during node startup
// due to node registration or informer sync delays. They will be created eventually
// when static pods are resynced (every 1-1.5 minutes).
// To ensure kube-scheduler is aware of static pod resource usage faster,
// mirror pods are created as soon as the node registers.
go kl.fastStaticPodsRegistration(ctx)
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

updateRuntimeUp

它在最开始初始化时会被调用,后面也会被周期性地调用(每 5 秒一次),主要职责是:检查底层容器运行时的健康状况,并根据检查结果更新 Kubelet 内部的状态。当容器运行时首次准备就绪时,它还会触发一系列依赖于运行时的模块的初始化。其主要流程为:

  1. 通过 CRI (Container Runtime Interface) 调用底层容器运行时(如 containerd 或 CRI-O)的 Status 接口,获取一个包含运行时当前状态信息的结构体 s。

  2. 通过s中的NetworkReady来检查网络状态是否ready

  3. 通过s中的RuntimeReady来检查运行时是否ready

  4. 当 RuntimeReady 为 true 时才会去更新内部缓存,并初始化那些依赖容器运行时的模块(只会被初始调用一次),比如 cAdvisor(用于监控)、ContainerManager(用于资源管理)、EvictionManager(驱逐管理器)、pluginManager、shutdownManager等。这个设计确保了这些模块只在容器运行时真正可用后才被启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// updateRuntimeUp calls the container runtime status callback, initializing
// the runtime dependent modules when the container runtime first comes up,
// and returns an error if the status check fails. If the status check is OK,
// update the container runtime uptime in the kubelet runtimeState.
func (kl *Kubelet) updateRuntimeUp() {
kl.updateRuntimeMux.Lock()
defer kl.updateRuntimeMux.Unlock()
ctx := context.Background()

s, err := kl.containerRuntime.Status(ctx)
if err != nil {
klog.ErrorS(err, "Container runtime sanity check failed")
return
}
if s == nil {
klog.ErrorS(nil, "Container runtime status is nil")
return
}
// Periodically log the whole runtime status for debugging.
klog.V(4).InfoS("Container runtime status", "status", s)
klogErrorS := klog.ErrorS
if !kl.containerRuntimeReadyExpected {
klogErrorS = klog.V(4).ErrorS
}
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
klogErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
} else {
// Set nil if the container runtime network is ready.
kl.runtimeState.setNetworkState(nil)
}
// information in RuntimeReady condition will be propagated to NodeReady condition.
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
// If RuntimeReady is not set or is false, report an error.
if runtimeReady == nil || !runtimeReady.Status {
klogErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
return
}

kl.runtimeState.setRuntimeState(nil)
kl.runtimeState.setRuntimeHandlers(s.Handlers)
kl.runtimeState.setRuntimeFeatures(s.Features)
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}

syncNodeStatus

其首先会去registerWithAPIServer,然后再updateNodeStatus。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master if there is any change or enough time
// passed from the last sync, registering the kubelet first if necessary.
func (kl *Kubelet) syncNodeStatus() {
kl.syncNodeStatusMux.Lock()
defer kl.syncNodeStatusMux.Unlock()
ctx := context.Background()

if kl.kubeClient == nil || kl.heartbeatClient == nil {
return
}
if kl.registerNode {
// This will exit immediately if it doesn't need to do anything.
kl.registerWithAPIServer()
}
if err := kl.updateNodeStatus(ctx); err != nil {
klog.ErrorS(err, "Unable to update node status")
}
}

registerWithAPIServer

该函数的核心作用是:尝试将 Kubelet 所在的节点(Node)注册到 Kubernetes API Server。如果节点已经存在,则进行一次状态协调(reconcile)。

函数返回一个布尔值,true 表示注册/协调成功,false 表示失败。

下面是该函数的详细执行流程:

  1. 尝试创建新节点

    • kl.kubeClient.CoreV1().Nodes().Create(...): 函数首先会乐观地尝试直接创建一个新的 Node 对象。

    • 如果创建成功 (err == nil),说明这是一个全新的节点加入集群。函数会记录相关延迟指标,然后直接返回 true,表示注册成功。

  2. 处理创建失败的情况

    • 如果创建失败,函数会通过一个 switch 语句来分析错误类型:

      • apierrors.IsAlreadyExists(err): 这是最常见的情况。错误表明 API Server 中已经存在一个同名的 Node 对象。这意味着 Kubelet 可能是在重启,或者之前的注册请求因为某些原因中断了。在这种情况下,程序会继续往下执行,进入“协调”逻辑。

      • apierrors.IsForbidden(err): 这个错误表示 Kubelet 没有创建 Node 对象的权限。这通常是由于 RBAC 配置不正确导致的。如果 KubeletRegistrationGetOnExistsOnly 这个特性门控被启用,它会直接报错并返回 false。否则,它会继续尝试获取节点信息,因为节点可能已经由其他方式创建好了。

      • default: 对于其他类型的错误(如网络问题),函数会记录错误日志并直接返回 false,表示本次尝试失败。

  3. 获取已存在的节点对象

    • 如果是因为 AlreadyExistsForbidden(且特性门控未开启)进入后续流程,函数会调用 kl.kubeClient.CoreV1().Nodes().Get(...) 来获取 API Server 中已存在的 Node 对象。

    • 如果获取失败或返回的 existingNodenil,说明发生了异常,函数会记录错误并返回 false

  4. 协调(Reconcile)节点状态

    • 如果成功获取到了 existingNode,说明节点确实已经注册过了。这时 Kubelet 的任务就变成了确保当前 Kubelet 的配置和状态与 API Server 中的记录保持一致。

    • originalNode := existingNode.DeepCopy(): 先深度拷贝一份原始的节点对象,用于后续的 Patch 操作,以计算出差异。

    • 接下来会调用一系列的 reconcileupdate 函数,来检查并更新 existingNode 对象中的字段,包括:

      • reconcileCMADAnnotationWithExistingNode: 协调与存储卷挂载相关的注解。

      • updateDefaultLabels: 更新节点的默认标签(如操作系统、架构等)。

      • reconcileExtendedResource: 协调扩展资源(如 GPU 等)。

      • reconcileHugePageResource: 协调大页内存资源。

    • requiresUpdate 标志位会记录在这些检查过程中是否有任何字段需要被更新。

  5. 发起 Patch 更新

    • 如果 requiresUpdatetrue,说明节点的某些状态需要更新。

    • nodeutil.PatchNodeStatus(...): 函数会调用这个帮助函数,向 API Server 发送一个 PATCH 请求。PATCH 请求只会更新发生变化的字段,比 UPDATE 请求更高效。

    • 如果 PATCH 操作失败,记录错误并返回 false

  6. 成功返回

    • 如果 PATCH 成功,或者根本不需要更新(requiresUpdatefalse),函数最终会返回 true,表示 Kubelet 已经成功地将自己与 API Server 中的节点状态同步了。

总结一下

tryRegisterWithAPIServer 函数封装了 Kubelet 节点注册的核心逻辑。它不仅处理了首次注册的场景,更重要的是处理了 Kubelet 重启后与现有节点对象进行状态同步的场景,通过一系列的协调和 PATCH 操作,确保了节点信息的准确性和一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// registerWithAPIServer registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithAPIServer() {
if kl.registrationCompleted {
return
}

kl.nodeStartupLatencyTracker.RecordAttemptRegisterNode()

step := 100 * time.Millisecond

for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}

node, err := kl.initialNode(context.TODO())
if err != nil {
klog.ErrorS(err, "Unable to construct v1.Node object for kubelet")
continue
}

klog.InfoS("Attempting to register node", "node", klog.KObj(node))
registered := kl.tryRegisterWithAPIServer(node)
if registered {
klog.InfoS("Successfully registered node", "node", klog.KObj(node))
kl.registrationCompleted = true
return
}
}
}

updateNodeStatus

该函数每10s会被调用一次,但是有4%的抖动以避免多个节点同时上报状态,使得APIServer压力过大。

具体在update时会最多尝试5次的tryUpdateNodeStatus。每次update的流程如下:

  1. 获取当前节点对象(originalNode

    • 这个函数的设计考虑到了大规模集群下对 API Server 的性能影响。

    • if tryNumber == 0: 如果是第一次尝试(tryNumber 为 0),它会优先从本地的 nodeLister(一个本地缓存)中获取 Node 对象。这样做可以极大地减少对 API Server 的 GET 请求,降低控制平面的负载。本地缓存的数据可能会有轻微延迟,但通常是可以接受的。

    • else: 如果不是第一次尝试(意味着前一次尝试可能因为数据冲突而失败),它会直接通过 heartbeatClient 向 API Server 发送 GET 请求,以获取最新、最准确的 Node 对象数据,避免再次发生冲突。

    • 如果获取 Node 对象失败,或者获取到的对象为 nil,则直接返回错误。

  2. 计算更新后的节点状态

    • node, changed := kl.updateNode(ctx, originalNode): 调用 updateNode 函数。这个函数会:

      • 创建一个 originalNode 的深拷贝(DeepCopy)。

      • 基于 Kubelet 当前的内部状态(如容器运行时状态、资源容量、Pod CIDR 等)来更新这个拷贝的 Status 字段。

      • 返回更新后的 node 对象和一个布尔值 changed,该值表示 node.StatusoriginalNode.Status 相比是否发生了实质性的变化。

  3. 判断是否需要发送更新

    • shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency: 这里决定了是否真的需要向 API Server 发送更新请求。满足以下任一条件即可:

      • changed: 节点状态发生了变化(比如 NodeReady 条件从 False 变成了 True)。

      • kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency: 距离上次成功上报状态的时间已经超过了预设的 nodeStatusReportFrequency(例如 5 分钟)。这是一个强制上报机制,确保即使节点状态一直没有变化,API Server 也能定期收到该节点的“心跳”,确认它还活着。

  4. 执行更新或跳过

    • if !shouldPatchNodeStatus: 如果不需要更新,函数会调用 kl.markVolumesFromNode(node) 来同步一下卷(Volume)的使用状态,然后直接返回 nil,表示本次操作成功完成(虽然没有发送网络请求)。

    • updatedNode, err := kl.patchNodeStatus(originalNode, node): 如果需要更新,则调用 patchNodeStatus 函数,向 API Server 发送一个 PATCH 请求,只更新发生变化的字段。

    • if err == nil: 如果 PATCH 请求成功,函数会用 API Server 返回的最新 updatedNode 对象来调用 kl.markVolumesFromNode(updatedNode),确保 Volume Manager 的状态与刚上报到 API Server 的状态一致。

    • 最后,返回 patchNodeStatus 的结果(err)。如果 err 不为 nil,外层的 updateNodeStatus 就会进行重试。

总结一下

tryUpdateNodeStatus 是一个既高效又健壮的函数。它通过优先使用本地缓存来降低 API Server 负载,同时通过强制上报周期和冲突后直连 API Server 的机制来保证状态同步的最终一致性和可靠性。它是 Kubelet 节点状态管理的核心实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// updateNodeStatus updates node status to master with retries if there is any
// change or enough time passed from the last sync.
func (kl *Kubelet) updateNodeStatus(ctx context.Context) error {
klog.V(5).InfoS("Updating node status")
for i := 0; i < nodeStatusUpdateRetry; i++ { // nodeStatusUpdateRetry = 5
if err := kl.tryUpdateNodeStatus(ctx, i); err != nil {
if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
kl.onRepeatedHeartbeatFailure()
}
klog.ErrorS(err, "Error updating node status, will retry")
} else {
return nil
}
}
return fmt.Errorf("update node status exceeds retry count")
}

// tryUpdateNodeStatus tries to update node status to master if there is any
// change or enough time passed from the last sync.
func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error {
// In large clusters, GET and PUT operations on Node objects coming
// from here are the majority of load on apiserver and etcd.
// To reduce the load on control-plane, we are serving GET operations from
// local lister (the data might be slightly delayed but it doesn't
// seem to cause more conflict - the delays are pretty small).
// If it result in a conflict, all retries are served directly from etcd.
var originalNode *v1.Node
var err error

if tryNumber == 0 {
originalNode, err = kl.nodeLister.Get(string(kl.nodeName))
} else {
opts := metav1.GetOptions{}
originalNode, err = kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
}
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
if originalNode == nil {
return fmt.Errorf("nil %q node object", kl.nodeName)
}

node, changed := kl.updateNode(ctx, originalNode)
shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency

if !shouldPatchNodeStatus {
kl.markVolumesFromNode(node)
return nil
}

updatedNode, err := kl.patchNodeStatus(originalNode, node)
if err == nil {
kl.markVolumesFromNode(updatedNode)
}
return err
}

fastStatusUpdateOnce


【k8s kubelet 源代码阅读(二)】-节点状态上报
http://example.com/2025/08/19/k8s kubelet源代码阅读【二】-节点状态上报/
作者
滑滑蛋
发布于
2025年8月19日
许可协议