[k8s kubelet source code reading (1)] Pod Management

Key Components

probeManager

Once the kubelet becomes aware of a pod, it calls the probeManager's AddPod to start probes for each container and monitor state changes. Concretely:

  • Collect all of pod.Spec.Containers, plus any restartable initContainers in the pod, and process them one by one

  • If a container is configured with a StartupProbe, ReadinessProbe, or LivenessProbe, a worker of the corresponding type is created via newWorker and run in its own goroutine

  • The worker goroutine runs in a loop. Before actually probing, it performs a few dedicated checks: whether the container is a new instance after a restart (i.e. whether the ContainerID has changed), whether it is in the onHold restart state, whether the container is Running, and whether the InitialDelaySeconds initial wait has elapsed. Only when these checks pass does it execute the probe.

  • If the container is not yet Started, only the StartupProbe runs and the ReadinessProbe and LivenessProbe are skipped; once it is Started, the StartupProbe no longer runs.

  • The probe types currently supported are:

    • Exec: run the specified command

    • HTTPGet: issue an HTTP request

    • TCPSocket: open a TCP socket

    • GRPC: make a gRPC call

  • Once consecutive successes or consecutive failures cross the configured threshold, the result is recorded in the prober's own cache (which PLEG uses when computing pod events) and an event is sent to the corresponding updates channel; syncLoopIteration then picks it up and sends podWorkers a SyncPodSync-type event for this pod to trigger an update. A simplified sketch of the worker loop follows.
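Below is a minimal hand-written sketch of the per-container worker loop described above. The names (worker, doProbe) are simplified stand-ins; the real implementation lives in pkg/kubelet/prober and carries more state (result cache, probe spec, onHold handling).

```go
package prober

import "time"

// worker is a stripped-down stand-in for the kubelet's probe worker.
type worker struct {
	initialDelay time.Duration // InitialDelaySeconds from the probe spec
	period       time.Duration // PeriodSeconds from the probe spec
	startedAt    time.Time     // when the probed container started
	stopCh       chan struct{}
}

// run loops until the worker is stopped or probing becomes pointless.
func (w *worker) run() {
	ticker := time.NewTicker(w.period)
	defer ticker.Stop()
	for {
		// doProbe returns false when the worker should exit,
		// e.g. the container was removed for good.
		if !w.doProbe() {
			return
		}
		select {
		case <-ticker.C:
		case <-w.stopCh:
			return
		}
	}
}

func (w *worker) doProbe() bool {
	// The real worker first checks for a changed ContainerID, the onHold
	// flag, and the Running state; here we only model InitialDelaySeconds.
	if time.Since(w.startedAt) < w.initialDelay {
		return true // too early, probe again on the next tick
	}
	// ... run the Exec/HTTPGet/TCPSocket/GRPC probe, compare consecutive
	// successes/failures against the thresholds, and record the result.
	return true
}
```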

StatusManager

The StatusManager's main responsibility is to batch-sync the pod statuses cached locally by the kubelet to the API server.

Its main loop is shown below; there are two kinds of update triggers:

  • Incremental updates: triggered via podStatusChannel

  • Full updates: triggered periodically every 10s

go wait.Forever(func() {
	for {
		select {
		case <-m.podStatusChannel:
			klog.V(4).InfoS("Syncing updated statuses")
			m.syncBatch(false) // incremental update
		case <-syncTicker:
			klog.V(4).InfoS("Syncing all statuses")
			m.syncBatch(true) // full update
		}
	}
}, 0)

The core syncBatch(all bool) works as follows:

  • First, it compares the cached API-server copy of each pod's status with the locally maintained status and collects the pods whose status needs updating:

    • Incremental update:

      • collect pods whose current status version > the version recorded for the API server
    • Full update:

      • pods whose version has increased

      • pods that have a deletionTimestamp and are in a terminal state

      • pods whose conditions differ when compared condition by condition

  • It then updates the status of each collected pod on the API server via syncPod:

    • First it GETs the existing pod

    • Then it compares the UIDs; if they differ, the pod was deleted and recreated under the same name, so this update is skipped and the stale cache entry is removed

    • Then the statuses are merged with mergePodStatus. Merging rather than replacing matters because some status fields may not be managed by the kubelet. The merge rules are:

      • For conditions, old conditions not managed by the kubelet are preserved, while kubelet-managed conditions take the latest value. The condition types currently managed by the kubelet are:

        v1.PodScheduled,
        v1.PodReady,
        v1.PodInitialized,
        v1.ContainersReady,
        v1.PodResizeInProgress,
        v1.PodResizePending,
        v1.PodReadyToStartContainers, (if the relevant feature gate is enabled)
      • The ResourceClaimStatuses field keeps its old value, since it is not managed by the kubelet

      • If the pod has entered a terminal phase but still has running containers, the Phase, Reason, and Message fields keep their old values, to avoid the inconsistent picture of a terminal phase with containers still running

      • If the pod's phase is terminal, the ContainersReady and PodReady conditions are forced to false, to avoid reporting a logically contradictory status.

    • The merged status is then written with a Patch to update the status, and the cache is refreshed from the pod returned by the patch

    • Finally, pods that have a deletionTimestamp and are in a terminal state are handled: the API server's Delete is called to remove the pod, and the corresponding cache entry is dropped. A sketch of the condition-merge rule follows.
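A hand-written sketch of the condition-merge rule above, assuming a helper map kubeletOwned in place of the real "managed by kubelet" check; this is illustrative, not the upstream mergePodStatus.

```go
package status

import v1 "k8s.io/api/core/v1"

// kubeletOwned approximates the "managed by kubelet" check; the resize and
// sandbox-ready condition types listed above would be added here when the
// corresponding feature gates are on.
var kubeletOwned = map[v1.PodConditionType]bool{
	v1.PodScheduled:    true,
	v1.PodReady:        true,
	v1.PodInitialized:  true,
	v1.ContainersReady: true,
}

// mergeConditions keeps conditions the kubelet does not manage from the old
// (API server) status and takes kubelet-managed ones from the new local status.
func mergeConditions(oldConds, newConds []v1.PodCondition) []v1.PodCondition {
	merged := make([]v1.PodCondition, 0, len(oldConds)+len(newConds))
	for _, c := range oldConds {
		if !kubeletOwned[c.Type] {
			merged = append(merged, c)
		}
	}
	for _, c := range newConds {
		if kubeletOwned[c.Type] {
			merged = append(merged, c)
		}
	}
	return merged
}
```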

PLEG

The latest k8s supports two kinds of PLEG:

  • GenericPLEG:

    • The traditional PLEG. Its core loop is go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh), with a default interval of 1s.

    • In g.Relist, it asks the container runtime for the state of every container of every pod on the node, then checks pod by pod, deriving events by comparing the old and new container states. The main event types are ContainerStarted, ContainerDied, ContainerRemoved, and ContainerChanged. The comparison code is:

    func generateEvents(logger klog.Logger, podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
    	if newState == oldState {
    		return nil
    	}

    	logger.V(4).Info("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
    	switch newState {
    	case plegContainerRunning:
    		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
    	case plegContainerExited:
    		return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
    	case plegContainerUnknown:
    		return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
    	case plegContainerNonExistent:
    		switch oldState {
    		case plegContainerExited:
    			// We already reported that the container died before.
    			return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
    		default:
    			return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
    		}
    	default:
    		panic(fmt.Sprintf("unrecognized container state: %v", newState))
    	}
    }
    • If events occurred, the pod's state in the kubelet cache is updated; the cached pod status looks like this:
    // PodStatus represents the status of the pod and its containers.
    // v1.PodStatus can be derived from examining PodStatus and v1.Pod.
    type PodStatus struct {
    	// ID of the pod.
    	ID types.UID
    	// Name of the pod.
    	Name string
    	// Namespace of the pod.
    	Namespace string
    	// All IPs assigned to this pod
    	IPs []string
    	// Status of containers in the pod.
    	ContainerStatuses []*Status
    	// Statuses of containers of the active sandbox in the pod.
    	ActiveContainerStatuses []*Status
    	// Status of the pod sandbox.
    	// Only for kuberuntime now, other runtime may keep it nil.
    	SandboxStatuses []*runtimeapi.PodSandboxStatus
    	// Timestamp at which container and pod statuses were recorded
    	TimeStamp time.Time
    }

    // Status represents the status of a container.
    //
    // Status does not contain VolumeMap because CRI API is unaware of volume names.
    type Status struct {
    	// ID of the container.
    	ID ContainerID
    	// Name of the container.
    	Name string
    	// Status of the container.
    	State State
    	// Creation time of the container.
    	CreatedAt time.Time
    	// Start time of the container.
    	StartedAt time.Time
    	// Finish time of the container.
    	FinishedAt time.Time
    	// Exit code of the container.
    	ExitCode int
    	// Name of the image, this also includes the tag of the image,
    	// the expected form is "NAME:TAG".
    	Image string
    	// ID of the image.
    	ImageID string
    	// The digested reference of the image used by the container.
    	ImageRef string
    	// Runtime handler used to pull the image if any.
    	ImageRuntimeHandler string
    	// Hash of the container, used for comparison.
    	Hash uint64
    	// Number of times that the container has been restarted.
    	RestartCount int
    	// A string explains why container is in such a status.
    	Reason string
    	// Message written by the container before exiting (stored in
    	// TerminationMessagePath).
    	Message string
    	// CPU and memory resources for this container
    	Resources *ContainerResources
    	// User identity information of the first process of this container
    	User *ContainerUser
    	// Mounts are the volume mounts of the container
    	Mounts []Mount
    	// StopSignal is used to show the container's effective stop signal in the Status
    	StopSignal *v1.Signal
    }
    • Finally, each event (except ContainerChanged) is sent to the corresponding channel via g.eventChannel <- events[i]
  • EventedPLEG:

    • This is the event-driven PLEG. It gets state updates directly from CRI events, which cuts node resource consumption compared with the polling approach, but it requires a CRI runtime that supports CRI events.

    • It has two main loops:
      go wait.Until(e.watchEventsChannel, 0, e.stopCh)
      go wait.Until(e.updateGlobalCache, globalCacheUpdatePeriod, e.stopCacheUpdateCh)

    • For e.watchEventsChannel:

      • It asks the container runtime to send all container lifecycle events (create, start, stop, remove) over a channel (containerEventsResponseCh). If the connection fails repeatedly, EventedPLEG is stopped and GenericPLEG is started instead; this fallback is sketched below.

      • It also keeps reading events from containerEventsResponseCh, fetching the corresponding pod's status, updating its own cache, and then forwarding the events to eventChannel
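A rough sketch of that watch-and-fallback behaviour, with illustrative names (openStream, fallback, maxStreamRetries) rather than the exact fields of pkg/kubelet/pleg/evented.go:

```go
package pleg

const maxStreamRetries = 5 // illustrative; the real limit differs

type eventedPLEG struct {
	failures int
	stopCh   chan struct{}
}

// watchEvents keeps (re)opening the CRI event stream. openStream blocks while
// the stream is healthy and returns an error when it breaks; after too many
// consecutive failures we give up and let the caller start GenericPLEG.
func (e *eventedPLEG) watchEvents(openStream func() error, fallback func()) {
	for {
		select {
		case <-e.stopCh:
			return
		default:
		}
		if err := openStream(); err != nil {
			e.failures++
			if e.failures >= maxStreamRetries {
				fallback() // switch to the polling GenericPLEG
				return
			}
			continue
		}
		e.failures = 0 // stream was healthy; reset the counter
	}
}
```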

The main loop: syncLoopIteration

The main loop's handler is syncLoopIteration. Its core is a select statement that listens on event channels from several different sources at once; as soon as any channel delivers an event it is handled immediately, driving the pods on the node toward their desired state. The code follows.

// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. housekeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// Here is an appropriate place to note that despite the syntactical
// similarity to the switch statement, the case statements in a select are
// evaluated in a pseudorandom order if there are multiple channels ready to
// read from when the select is evaluated. In other words, case statements
// are evaluated in random order, and you can not assume that the case
// statements evaluate in order if multiple channels have events.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// - configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// - plegCh: update the runtime cache; sync pod
// - syncCh: sync all pods waiting for sync
// - housekeepingCh: trigger cleanup of pods
// - health manager: sync pods that have failed or in which one or more
// containers have failed health checks
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}

switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}

kl.sourcesReady.AddSource(u.Source)

case e := <-plegCh:
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}

if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjSlice(podsToSync))
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

status := "not ready"
if ready {
status = "ready"
}
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

status := "unhealthy"
if started {
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
case update := <-kl.containerManager.Updates():
pods := []*v1.Pod{}
for _, p := range update.PodUIDs {
if pod, ok := kl.podManager.GetPodByUID(types.UID(p)); ok {
klog.V(3).InfoS("SyncLoop (containermanager): event for pod", "pod", klog.KObj(pod), "event", update)
pods = append(pods, pod)
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (containermanager): pod does not exist, ignore devices updates", "event", update)
}
}
if len(pods) > 0 {
// Updating the pod by syncing it again
// We do not apply the optimization by updating the status directly, but can do it later
handler.HandlePodSyncs(pods)
}

case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(ctx); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than expected", "expected", housekeepingWarningDuration, "actual", duration.Round(time.Millisecond))
}
klog.V(4).InfoS("SyncLoop (housekeeping) end", "duration", duration.Round(time.Millisecond))
}
}
return true
}

The monitored event sources are:

  • configCh (configuration changes)

    • Source: the API server, the static pod manifest directory, and HTTP endpoints. When a pod's desired state changes (ADD, UPDATE, REMOVE, RECONCILE, DELETE), an event is sent on this channel.

    • Handling: based on the event type (u.Op), the function calls the matching SyncHandler method (HandlePodAdditions, HandlePodUpdates, HandlePodRemoves, HandlePodReconcile) to dispatch the change, which ultimately has podWorkers carry out the concrete pod operations.

  • plegCh (pod lifecycle events)

    • Source: events the PLEG (Pod Lifecycle Event Generator) obtains by watching the underlying container runtime (e.g. containerd). They reflect actual container state changes, such as a container starting, dying, or failing to be created.

    • Handling: when a PLEG event worth syncing arrives (isSyncPodWorthy), the kubelet triggers a sync for the pod (HandlePodSyncs) so its status on the API server matches the node's real state.

  • syncCh (periodic sync)

    • Source: a timer defined in syncLoop that fires once per second.

    • Handling: getPodsToSync() returns every pod that needs syncing (e.g. ones whose last sync failed, or for which an internal module requested a sync), and HandlePodSyncs syncs them as a batch. This is a safety net: even if events are lost, pod state is eventually reconciled.

  • The Updates() channels of livenessManager, readinessManager, and startupManager (probe results)

    • Source: when a health probe (liveness/readiness/startup) sees a container's probe result change (e.g. from success to failure), it sends an event on the channel.

    • Handling: on a probe success or failure update, the kubelet immediately triggers a sync for the pod (HandlePodSyncs) to update its status.conditions. For example, a failing readiness probe flips the pod's Ready condition to False.

  • housekeepingCh (housekeeping)

    • Source: a timer that fires every housekeepingPeriod (2s by default).

    • Handling: handler.HandlePodCleanups() performs cleanup, such as garbage-collecting the resources held by terminated pods and containers (networking, mount directories, and so on). It only runs once all configuration sources are ready, to avoid deleting pods from sources that are not ready yet. A sketch of how the two timer-driven channels are wired up follows.
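For reference, a simplified sketch of how syncLoop could wire up these two timer channels before handing them to syncLoopIteration; the 1s and 2s periods match the defaults described above, but the function shape here is an assumption, not the full kubelet code.

```go
package kubelet

import "time"

func runSyncLoop(iterate func(syncCh, housekeepingCh <-chan time.Time) bool) {
	syncTicker := time.NewTicker(1 * time.Second) // drives syncCh
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(2 * time.Second) // housekeepingPeriod
	defer housekeepingTicker.Stop()
	for {
		// Each iteration handles exactly one event, as shown in the
		// syncLoopIteration listing above; returning false exits the loop.
		if !iterate(syncTicker.C, housekeepingTicker.C) {
			break
		}
	}
}
```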

Key Event Handling

SyncHandler covers the following handlers:

// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups(ctx context.Context) error
}

HandlePodAdditions

Handles pod-addition events from the API server, the static pod manifest directory, or HTTP. The flow:

  • Sort all added pods by creation time, then process them one by one

  • Add the pod to the podManager, marking the pod as already known to the kubelet

  • If it is a static pod:

    • the kubelet skips the normal creation flow; instead it finds the corresponding real pod (managed by the API server) and sends podWorkers a SyncPodUpdate request. This usually means updating the running pod to match changes in the static pod file.
  • If it is a normal pod:

    • Admission pre-check: check whether the pod is already marked for termination. For example, the kubelet may have just restarted and then received an ADD event for a pod that was already deleted; such events must be ignored.

    • Admission control: fetch all allocated pods, then run the admitHandlers to decide whether the node's current resources can accept this pod (the check is sketched after this list). Two of these handlers, predicateAdmitHandler and the eviction manager, work as follows:

      • predicateAdmitHandler:

        • First check constraints that preempting other pods cannot fix, such as the OS and hostname;

        • Build the full node view from the basic node info, existing pods, and extra plugin resources;

        • Run checks similar to the scheduler's filters: whether the node has enough CPU, memory, storage, and pod slots; whether a host port requested by the pod is already taken; whether the node's labels satisfy the pod's node affinity/selector; whether the pod tolerates the node's NoExecute taints; and so on

        • If the check fails but the pod is a critical pod, preemption is attempted; the preemption logic is roughly a greedy search for victim pods that free enough resources while giving up the least.

        • If that still fails, the first failure reason is returned.

      • eviction manager:

        • If there are no pressure conditions, the node is under no pressure and the pod is admitted directly

        • Check whether the pod is a critical pod; if so, admit it directly. A pod is critical if it is a static pod or its priority is >= 2 * HighestUserDefinablePriority = 2000000000.

        • If the only pressure condition is memory pressure, non-BestEffort pods are admitted; a BestEffort pod's tolerations are then checked, and one that tolerates the memory-pressure taint is admitted as well.

        • Under any other pressure the pod is rejected.

      • Finally podWorkers.UpdatePod is called with the pod and a kubetypes.SyncPodCreate event for further processing. UpdatePod:

        • Gets or creates the podSyncStatus

        • Handles state transitions; for a pod that is to be terminated, it sets status.terminatingAt to the current time

        • Starts or notifies the PodWorker goroutine.
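The admission check referenced above, sketched around the lifecycle.PodAdmitHandler interface; canAdmitPod here is a simplified rendering of the kubelet's helper of the same name, not the exact upstream code.

```go
package kubelet

import v1 "k8s.io/api/core/v1"

// PodAdmitAttributes carries the pod being admitted plus the pods already
// allocated on the node (simplified from pkg/kubelet/lifecycle).
type PodAdmitAttributes struct {
	Pod       *v1.Pod
	OtherPods []*v1.Pod
}

type PodAdmitResult struct {
	Admit   bool
	Reason  string
	Message string
}

type PodAdmitHandler interface {
	Admit(attrs *PodAdmitAttributes) PodAdmitResult
}

// canAdmitPod runs every admit handler; the first rejection wins, and its
// reason/message are what end up on the rejected pod's status.
func canAdmitPod(handlers []PodAdmitHandler, allocated []*v1.Pod, pod *v1.Pod) (bool, string, string) {
	attrs := &PodAdmitAttributes{Pod: pod, OtherPods: allocated}
	for _, h := range handlers {
		if r := h.Admit(attrs); !r.Admit {
			return false, r.Reason, r.Message
		}
	}
	return true, "", ""
}
```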

The PodWorker loop: podWorkerLoop

podWorkerLoop is the dedicated lifecycle manager the kubelet runs for each pod. The first time UpdatePod receives an update for a new pod, it starts an independent podWorkerLoop goroutine for it. This goroutine's job is to process, serially and in order, every state transition of the pod from creation to death, making sure each step executes correctly.

The heart of the function is a for range podUpdates loop. podUpdates is a channel; whenever UpdatePod receives a new instruction for this pod, it signals this channel to wake the loop, as sketched below.
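The wake-up pattern is easy to miss in the full listing further down, so here it is in isolation; a sketch of the idea, not the exact podWorkers code.

```go
package podworkers

// Each pod gets a buffered channel of capacity 1. UpdatePod stores the new
// work in pendingUpdate and then pokes the channel; because the buffer holds
// at most one token, many updates arriving while the worker is busy collapse
// into a single wake-up, and the worker always reads the latest pendingUpdate.
func wake(podUpdates chan struct{}) {
	select {
	case podUpdates <- struct{}{}:
	default: // a wake-up is already pending; nothing to do
	}
}
```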

  1. Wait for work (for range podUpdates):

    • The loop blocks here, waiting for the next update signal.
  2. Prepare the sync (p.startPodSync(podUID)):

    • Once woken, it first calls startPodSync, which:

      • Takes the pending update (pendingUpdate) out of the podSyncStatus.

      • Checks whether the pod may start working (for a static pod, for example, it must ensure no old pod with the same full name is still running).

      • If the pod cannot start yet, startPodSync returns canStart: false and podWorkerLoop continues, waiting for the next update.

      • If the pod has been marked as never able to start (e.g. it was deleted before it ever started), startPodSync returns canEverStart: false and podWorkerLoop returns, exiting for good.

      • If all is well, startPodSync returns canStart: true, canEverStart: true, and an update object containing the actual work.

  3. Run the core sync logic (err := func() error { ... }):

    • This is an anonymous function, mainly to make err handling convenient.

    • Fetch the latest state: status, err = p.podCache.GetNewerThan(...). Before doing anything, it tries to fetch the pod's latest runtime state (kubecontainer.PodStatus) from the PLEG cache. This ensures the decisions that follow are based on the freshest container state.

    • Dispatch by work type: switch { case update.WorkType == ... }

      • case TerminatedPod: for "terminated" work, call p.podSyncer.SyncTerminatedPod, which performs the final cleanup, such as unmounting volumes and tearing down networking.

      • case TerminatingPod: for "terminating" work, call p.podSyncer.SyncTerminatingPod, whose core task is to kill all of the pod's containers while honoring the graceful-termination grace period.

      • default (i.e. SyncPod): the most common case, calling p.podSyncer.SyncPod. This is the kubelet's core sync logic, which:

        • Compares the pod's desired state (pod.Spec) with its actual state (from the container runtime).

        • Creates missing containers.

        • Kills superfluous or unhealthy containers.

        • Sets up the sandbox (the pod's network namespace, etc.).

        • SyncPod returns a boolean isTerminal; if all of the pod's containers have finished and the restart policy (RestartPolicy) requires no restart, isTerminal is true.

  4. Handle the sync result:

    • switch { case err == context.Canceled: ... }: if the error is context.Canceled, UpdatePod cancelled this sync because a more urgent instruction arrived (such as terminate immediately). podWorkerLoop simply waits to be woken again.

    • case err != nil: any other error is logged, and completeWork adds the pod back to the work queue for a later backoff retry.

    • case update.WorkType == TerminatedPod: if SyncTerminatedPod completed successfully, the pod's lifecycle is fully over. completeTerminated cleans up the podWorkers' internal state, and podWorkerLoop can return, ending the goroutine.

    • case update.WorkType == TerminatingPod: if SyncTerminatingPod completed successfully, all containers have been killed. completeTerminating marks the pod as "terminated" and immediately queues another update, so the TerminatedPod phase can run its cleanup.

    • case isTerminal: if SyncPod returned isTerminal: true, the pod finished running naturally. completeSync marks the pod as "terminating" and immediately queues another update to enter the TerminatingPod phase.

  5. Finish the work and prepare the next iteration (p.completeWork(...)):

    • Based on the sync result (success, failure, phase transition), this decides the next retry interval and re-enqueues the pod's UID on the workQueue.

    • It also checks whether podSyncStatus already holds a new pendingUpdate; if so, it signals the podUpdates channel right away, triggering the next iteration and chaining phase transitions seamlessly.

// podWorkerLoop manages sequential state updates to a pod in a goroutine, exiting once the final
// state is reached. The loop is responsible for driving the pod through four main phases:
//
// 1. Wait to start, guaranteeing no two pods with the same UID or same fullname are running at the same time
// 2. Sync, orchestrating pod setup by reconciling the desired pod spec with the runtime state of the pod
// 3. Terminating, ensuring all running containers in the pod are stopped
// 4. Terminated, cleaning up any resources that must be released before the pod can be deleted
//
// The podWorkerLoop is driven by updates delivered to UpdatePod and by SyncKnownPods. If a particular
// sync method fails, p.workerQueue is updated with backoff but it is the responsibility of the kubelet
// to trigger new UpdatePod calls. SyncKnownPods will only retry pods that are no longer known to the
// caller. When a pod transitions working->terminating or terminating->terminated, the next update is
// queued immediately and no kubelet action is required.
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
var lastSyncTime time.Time
for range podUpdates {
ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
// If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate.
if !ok {
continue
}
// If the pod was terminated prior to the pod being allowed to start, we exit the loop.
if !canEverStart {
return
}
// If the pod is not yet ready to start, continue and wait for more updates.
if !canStart {
continue
}

podUID, podRef := podUIDAndRefForUpdate(update.Options)

klog.V(4).InfoS("Processing pod event", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
var isTerminal bool
err := func() error {
// The worker is responsible for ensuring the sync method sees the appropriate
// status updates on resyncs (the result of the last sync), transitions to
// terminating (no wait), or on terminated (whatever the most recent state is).
// Only syncing and terminating can generate pod status changes, while terminated
// pods ensure the most recent status makes it to the api server.
var status *kubecontainer.PodStatus
var err error
switch {
case update.Options.RunningPod != nil:
// when we receive a running pod, we don't need status at all because we are
// guaranteed to be terminating and we skip updates to the pod
default:
// wait until we see the next refresh from the PLEG via the cache (max 2s)
// TODO: this adds ~1s of latency on all transitions from sync to terminating
// to terminated, and on all termination retries (including evictions). We should
// improve latency by making the pleg continuous and by allowing pod status
// changes to be refreshed when key events happen (killPod, sync->terminating).
// Improving this latency also reduces the possibility that a terminated
// container's status is garbage collected before we have a chance to update the
// API server (thus losing the exit code).
status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)

if err != nil {
// This is the legacy event thrown by manage pod loop all other events are now dispatched
// from syncPodFn
p.recorder.Eventf(update.Options.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
return err
}
}

// Take the appropriate action (illegal phases are prevented by UpdatePod)
switch {
case update.WorkType == TerminatedPod:
err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)

case update.WorkType == TerminatingPod:
var gracePeriod *int64
if opt := update.Options.KillPodOptions; opt != nil {
gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
}
podStatusFn := p.acknowledgeTerminating(podUID)

// if we only have a running pod, terminate it directly
if update.Options.RunningPod != nil {
err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)
} else {
err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)
}

default:
isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
}

lastSyncTime = p.clock.Now()
return err
}()

var phaseTransition bool
switch {
case err == context.Canceled:
// when the context is cancelled we expect an update to already be queued
klog.V(2).InfoS("Sync exited with context cancellation error", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)

case err != nil:
// we will queue a retry
klog.ErrorS(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID)

case update.WorkType == TerminatedPod:
// we can shut down the worker
p.completeTerminated(podUID)
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
}
klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
return

case update.WorkType == TerminatingPod:
// pods that don't exist in config don't need to be terminated, other loops will clean them up
if update.Options.RunningPod != nil {
p.completeTerminatingRuntimePod(podUID)
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
}
klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
return
}
// otherwise we move to the terminating phase
p.completeTerminating(podUID)
phaseTransition = true

case isTerminal:
// if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
klog.V(4).InfoS("Pod is terminal", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
p.completeSync(podUID)
phaseTransition = true
}

// queue a retry if necessary, then put the next event in the channel if any
p.completeWork(podUID, phaseTransition, err)
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
}
klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
}
}

SyncPod in the Kubelet

SyncPod is the kubelet's core worker function for creating a pod and keeping it running. When podWorkerLoop decides to sync a pod, this is the function it calls. Its goal: drive the pod's actual state to its desired state.

The function is reentrant: it can be called repeatedly, and each call tries to move the pod closer to its final desired state. If a recoverable error occurs partway through, the next SyncPod call continues from where it failed.

SyncPod's core workflow:

The function executes a long, transactional sequence of operations to bring a pod up correctly. If any step fails, the function returns an error and podWorkerLoop retries the whole flow later.

  1. Preparation and status generation:

    • Record latency: if the pod is being created for the first time (SyncPodCreate), record the latency between the kubelet first seeing the pod and the podWorker starting to process it, for performance monitoring.

    • Handle resource resizes: if the InPlacePodVerticalScaling feature is enabled, pod resource-resize requests are checked and handled first.

    • Generate the API status (generateAPIPodStatus): a crucial step. It combines the pod's spec, the status cached in statusManager, and the latest podStatus from the container runtime to produce the final v1.PodStatus object to report to the API server.

    • Check for a terminal pod: if the generated apiPodStatus shows the pod is already Succeeded or Failed, it has finished running. SyncPod writes this final status to statusManager and returns isTerminal: true, telling podWorkerLoop to start the termination flow.

  2. Status update and pre-checks:

    • Update the status manager (statusManager.SetPodStatus): write the apiPodStatus generated above into the kubelet's status manager, the kubelet's authoritative internal record of pod status.

    • Check the network plugin: if the network plugin is not ready and the pod is not in hostNetwork mode, return an error and wait for networking to become ready.

    • Register Secrets/ConfigMaps: tell secretManager and configMapManager that this pod depends on certain Secrets and ConfigMaps, so they can start watching and mounting those resources.

  3. Resource preparation and creation:

    • Create cgroups (pcm.EnsureExists): with cgroups-per-qos enabled, create the pod's cgroup and apply resource limits (e.g. CPU and memory). One special case: if the kubelet restarts and finds an existing pod without a cgroup, it first kills all of the pod's containers, then creates the cgroup and brings the containers back up, ensuring the pod runs under the correct cgroup.

    • Create the mirror pod (tryReconcileMirrorPods): if this is a static pod without a mirror pod yet, SyncPod creates one. A mirror pod is the static pod's read-only reflection in the API server; its purpose is to let other cluster components (like the scheduler) see the pod and account for its resource usage.

    • Create the data directories (makePodDataDirs): create the pod's data directories, e.g. /var/lib/kubelet/pods/<pod-uid>/volumes.

    • Wait for volumes (volumeManager.WaitForAttachAndMount): a very important step. It blocks until all of the pod's volumes have been attached to the node and mounted into the pod's data directories.

  4. Container runtime sync:

    • Fetch pull secrets (getPullSecretsForPod): get the imagePullSecrets needed to pull the pod's images from the secret manager.

    • Start probes (probeManager.AddPod): tell probeManager to start liveness and readiness probing for this pod.

    • Call the container runtime (kl.containerRuntime.SyncPod): the most central call in the whole flow. The kubelet packs up the pod's spec, podStatus, pull secrets, and everything else, and hands them to the underlying container runtime (e.g. containerd or CRI-O), which then:

      • Creates or updates the pod's sandbox.

      • Pulls container images.

      • Creates and starts containers.

      • Applies container-level configuration.

    • containerRuntime.SyncPod returns a result containing any errors that occurred during the sync.

  5. Wrap-up:

    • Update the reason cache: record the runtime sync result in reasonCache, which helps with debugging.

    • Handle in-place resize results: if containers were resized in place, update the pod's PodResizeInProgress condition based on the outcome.

    • Return: pass containerRuntime.SyncPod's error back to podWorkerLoop. A nil error means this sync succeeded and the pod is in its desired running state.

Summary

SyncPod is a carefully designed, robust transaction script. Like a construction foreman, it follows a detailed blueprint (the pod spec), coordinates the various resources step by step (cgroups, networking, storage, Secrets), and finally directs its "construction crew", the container runtime, to build the pod. Its reentrancy and fine-grained steps ensure pods are created and maintained reliably even in complex environments.


// SyncPod is the transaction script for the sync of a single pod (setting up)
// a pod. This method is reentrant and expected to converge a pod towards the
// desired state of the spec. The reverse (teardown) is handled in
// SyncTerminatingPod and SyncTerminatedPod. If SyncPod exits without error,
// then the pod runtime state is in sync with the desired configuration state
// (pod is running). If SyncPod exits with a transient error, the next
// invocation of SyncPod is expected to make progress towards reaching the
// desired state. SyncPod exits with isTerminal when the pod was detected to
// have reached a terminal lifecycle phase due to container exits (for
// RestartNever or RestartOnFailure) and the next method invoked will be
// SyncTerminatingPod. If the pod terminates for any other reason, SyncPod
// will receive a context cancellation and should exit as soon as possible.
//
// Arguments:
//
// updateType - whether this is a create (first time) or an update, should
// only be used for metrics since this method must be reentrant
//
// pod - the pod that is being set up
//
// mirrorPod - the mirror pod known to the kubelet for this pod, if any
//
// podStatus - the most recent pod status observed for this pod which can
// be used to determine the set of actions that should be taken during
// this loop of SyncPod
//
// The workflow is:
// - If the pod is being created, record pod worker start latency
// - Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// - If the pod is being seen as running for the first time, record pod
// start latency
// - Update the status of the pod in the status manager
// - Stop the pod's containers if it should not be running due to soft
// admission
// - Ensure any background tracking for a runnable pod is started
// - Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// - Create the data directories for the pod if they do not exist
// - Wait for volumes to attach/mount
// - Fetch the pull secrets for the pod
// - Call the container runtime's SyncPod callback
// - Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next SyncPod call.
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not write an event if this operation returns an error.
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
ctx, otelSpan := kl.tracer.Start(ctx, "syncPod", trace.WithAttributes(
semconv.K8SPodUIDKey.String(string(pod.UID)),
attribute.String("k8s.pod", klog.KObj(pod).String()),
semconv.K8SPodNameKey.String(pod.Name),
attribute.String("k8s.pod.update_type", updateType.String()),
semconv.K8SNamespaceNameKey.String(pod.Namespace),
))
klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer func() {
if err != nil {
otelSpan.RecordError(err)
otelSpan.SetStatus(codes.Error, err.Error())
}
klog.V(4).InfoS("SyncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
otelSpan.End()
}()

// Latency measurements for the main workflow are relative to the
// first time the pod was seen by kubelet.
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
}

// Record pod worker start latency if being created
// TODO: make pod workers record their own latencies
if updateType == kubetypes.SyncPodCreate {
if !firstSeenTime.IsZero() {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
} else {
klog.V(3).InfoS("First seen time not recorded for pod",
"podUID", pod.UID,
"pod", klog.KObj(pod))
}
}

// handlePodResourcesResize updates the pod to use the allocated resources. This should come
// before the main business logic of SyncPod, so that a consistent view of the pod is used
// across the sync loop.
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// Handle pod resize here instead of doing it in HandlePodUpdates because
// this conveniently retries any Deferred resize requests
// TODO(vinaykul,InPlacePodVerticalScaling): Investigate doing this in HandlePodUpdates + periodic SyncLoop scan
// See: https://github.com/kubernetes/kubernetes/pull/102884#discussion_r663160060
pod, err = kl.handlePodResourcesResize(pod, podStatus)
if err != nil {
return false, err
}
}

// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
for _, ipInfo := range apiPodStatus.PodIPs {
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
}
if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
podStatus.IPs = []string{apiPodStatus.PodIP}
}

// If the pod is terminal, we don't need to continue to setup the pod
if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
kl.statusManager.SetPodStatus(pod, apiPodStatus)
isTerminal = true
return isTerminal, nil
}

// Record the time it takes for the pod to become running
// since kubelet first saw the pod if firstSeenTime is set.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
}

kl.statusManager.SetPodStatus(pod, apiPodStatus)

// If the network plugin is not ready, only start the pod if it uses the host network
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
}

// ensure the kubelet knows about referenced secrets or configmaps used by the pod
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
if kl.secretManager != nil {
kl.secretManager.RegisterPod(pod)
}
if kl.configMapManager != nil {
kl.configMapManager.RegisterPod(pod)
}
}

// Create Cgroups for the pod and apply resource parameters
// to them if cgroups-per-qos flag is enabled.
pcm := kl.containerManager.NewPodContainerManager()
// If pod has already been terminated then we need not create
// or update the pod's cgroup
// TODO: once context cancellation is added this check can be removed
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
// When the kubelet is restarted with the cgroups-per-qos
// flag enabled, all the pod's running containers
// should be killed intermittently and brought back up
// under the qos cgroup hierarchy.
// Check if this is the pod's first sync
firstSync := true
for _, containerStatus := range apiPodStatus.ContainerStatuses {
if containerStatus.State.Running != nil {
firstSync = false
break
}
}
// Don't kill containers in pod if pod's cgroups already
// exists or the pod is running for the first time
podKilled := false
if !pcm.Exists(pod) && !firstSync {
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.killPod(ctx, pod, p, nil); err == nil {
podKilled = true
} else {
if wait.Interrupted(err) {
return false, nil
}
klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
}
}
// Create and Update pod's Cgroups
// Don't create cgroups for run once pod if it was killed above
// The current policy is not to restart the run once pods when
// the kubelet is restarted with the new flag as run once pods are
// expected to run only once and if the kubelet is restarted then
// they are not expected to run again.
// We don't create and apply updates to cgroup if its a run once pod and was killed above
if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
if !pcm.Exists(pod) {
if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
}
if err := pcm.EnsureExists(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
}
}
}
}

// Create Mirror Pod for Static Pod if it doesn't already exist
kl.tryReconcileMirrorPods(pod, mirrorPod)

// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
return false, err
}

// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
if !wait.Interrupted(err) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
}
return false, err
}

// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)

// Ensure the pod is being probed
kl.probeManager.AddPod(pod)

// TODO(#113606): use cancellation from the incoming context parameter, which comes from the pod worker.
// Currently, using cancellation from that context causes test failures. To remove this WithoutCancel,
// any wait.Interrupted errors need to be filtered from result and bypass the reasonCache - cancelling
// the context for SyncPod is a known and deliberate error, not a generic error.
// Use WithoutCancel instead of a new context.TODO() to propagate trace context
// Call the container runtime's SyncPod callback
sctx := context.WithoutCancel(ctx)
result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.crashLoopBackOff)
kl.reasonCache.Update(pod.UID, result)

for _, r := range result.SyncResults {
if r.Action == kubecontainer.ResizePodInPlace {
if r.Error == nil {
// The pod was resized successfully, clear any pod resize errors in the PodResizeInProgress condition.
kl.statusManager.SetPodResizeInProgressCondition(pod.UID, "", "", true)
} else {
kl.statusManager.SetPodResizeInProgressCondition(pod.UID, v1.PodReasonError, r.Message, false)
}
}
}

return false, result.Error()
}

kubeGenericRuntimeManager's SyncPod

This function is the core of the kubelet's interaction with the underlying container runtime (containerd, CRI-O, etc.). When the kubelet's podWorkerLoop decides to sync a pod's state, this is the function that ultimately gets called. Its job is to move the pod's actual state to the desired state.

The whole function can be read as a carefully orchestrated, multi-step transactional process. If a step fails midway, the function returns an error, and the upper layer (podWorkerLoop) retries the entire SyncPod flow later.

SyncPod's 8 core steps:

The function's own comment lays out the workflow clearly; let's walk through it step by step:

Step 1: Compute the pod's changes (m.computePodActions)

This is SyncPod's "brain". It compares the pod's desired configuration (the pod spec) with the current actual state fetched from the container runtime (podStatus), and computes the concrete actions to take, packaged in the podContainerChanges object (its approximate shape is sketched after this list). The actions include:

  • KillPod: whether the whole pod (including its network sandbox) must be killed.

  • CreateSandbox: whether a new network sandbox must be created.

  • ContainersToKill: a list of containers that must be killed (e.g. failed health checks, changed definitions).

  • ContainersToStart: a list of app containers to start or restart.

  • InitContainersToStart: init containers to start.

  • EphemeralContainersToStart: ephemeral containers to start.

  • ContainersToUpdate: containers whose resources should be updated in place (for vertical scaling).
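As mentioned before the list, here is the approximate shape of that result. The field names follow the description above and the podActions struct in pkg/kubelet/kuberuntime, but the types are simplified and may differ between versions.

```go
package kuberuntime

// containerToKillInfo explains why a running container must go.
type containerToKillInfo struct {
	name    string
	reason  string
	message string
}

// podActions is the plan computed by computePodActions.
type podActions struct {
	KillPod       bool   // stop the sandbox and every container in it
	CreateSandbox bool   // a (new) sandbox must be created
	SandboxID     string // the existing sandbox, if one is kept
	Attempt       uint32 // sandbox attempt number for the new sandbox

	ContainersToKill           map[string]containerToKillInfo // keyed by container ID (simplified)
	ContainersToStart          []int                          // indexes into pod.Spec.Containers
	InitContainersToStart      []int                          // indexes into pod.Spec.InitContainers
	EphemeralContainersToStart []int                          // indexes into pod.Spec.EphemeralContainers
	ContainersToUpdate         map[string][]int               // in-place resize plan (simplified)
	UpdatePodResources         bool                           // pod-level resize signal
}
```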

Step 2: Kill the whole pod if necessary (if podContainerChanges.KillPod)

If step 1 decided the entire pod must be killed (usually because the sandbox's configuration changed, or all containers have terminated and need no restart), this step runs. It calls killPodWithSyncResult to:

  1. Stop all of the pod's containers.

  2. Stop and destroy the pod's network sandbox. If this step fails, SyncPod returns immediately.

Step 3: Kill unwanted containers

If the whole pod does not need to be killed, this step walks the ContainersToKill list from step 1 and calls m.killContainer to stop each container that is no longer needed.

Step 4: Create the pod sandbox if necessary (if podContainerChanges.CreateSandbox)

If step 1 decided a new sandbox is needed, this step calls m.createPodSandbox, asking the underlying runtime through CRI (the Container Runtime Interface) to create a new network environment (the pod sandbox). On success, the sandbox's IP address is fetched and recorded.

Step 5: Create ephemeral containers

Walk the EphemeralContainersToStart list and start the required ephemeral containers. Ephemeral containers are mostly used for debugging and can be added dynamically after the pod is running.

Step 6: Create init containers

Walk the InitContainersToStart list and start the init containers in order. This is a critical step:

  • For non-restartable init containers, if any fails to start, SyncPod aborts immediately with an error, because none of the subsequent containers may start.

  • For restartable init containers (sidecars), a start failure may be skipped, and the remaining containers are still attempted.

Step 7: Resize running containers

If In-Place Pod Vertical Scaling is enabled and step 1 produced a list of containers needing resource updates (ContainersToUpdate), this step calls the CRI to update the resource limits (CPU, memory) of the running containers.

Step 8: Create app containers

The final step walks the ContainersToStart list and starts all the regular application containers. Unlike init containers, if an app container fails to start, SyncPod usually does not abort; it records the failure and keeps trying to start the others.

Summary

SyncPod is one of the most robust and central functions in the kubelet. Through a clearly defined, retryable sequence of steps, it ensures that a pod's state converges reliably from whatever it currently is to the desired state. It covers everything from networking, storage, and configuration to container lifecycle, and is the foundation that keeps pods running correctly on a node.


// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Resize running containers (if InPlacePodVerticalScaling==true)
// 8. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(ctx, pod, podStatus)
klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(legacyscheme.Scheme, pod)
if err != nil {
klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
}
if podContainerChanges.SandboxID != "" {
m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
} else {
klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
}
}

// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
if podContainerChanges.CreateSandbox {
klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
} else {
klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
}

killResult := m.killPodWithSyncResult(ctx, pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
result.AddPodSyncResult(killResult)
if killResult.Error() != nil {
klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
return
}

if podContainerChanges.CreateSandbox {
m.purgeInitContainers(ctx, pod, podStatus)
}
} else {
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
result.AddSyncResult(killContainerResult)
if err := m.killContainer(ctx, pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil, nil); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
return
}
}
}

// Keep terminated init containers fairly aggressively controlled
// This is an optimization because container removals are typically handled
// by container garbage collector.
m.pruneInitContainersBeforeStart(ctx, pod, podStatus)

// We pass the value of the PRIMARY podIP and list of podIPs down to
// generatePodSandboxConfig and generateContainerConfig, which in turn
// passes it to various other functions, in order to facilitate functionality
// that requires this value (hosts file and downward API) and avoid races determining
// the pod IP in cases where a container requires restart but the
// podIP isn't in the status manager yet. The list of podIPs is used to
// generate the hosts file.
//
// We default to the IPs in the passed-in pod status, and overwrite them if the
// sandbox needs to be (re)started.
var podIPs []string
if podStatus != nil {
podIPs = podStatus.IPs
}

// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
var msg string
var err error

klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
metrics.StartedPodsTotal.Inc()
createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
result.AddSyncResult(createSandboxResult)

// ConvertPodSysctlsVariableToDotsSeparator converts sysctl variable
// in the Pod.Spec.SecurityContext.Sysctls slice into a dot as a separator.
// runc uses the dot as the separator to verify whether the sysctl variable
// is correct in a separate namespace, so when using the slash as the sysctl
// variable separator, runc returns an error: "sysctl is not in a separate kernel namespace"
// and the podSandBox cannot be successfully created. Therefore, before calling runc,
// we need to convert the sysctl variable, the dot is used as a separator to separate the kernel namespace.
// When runc supports slash as sysctl separator, this function can no longer be used.
sysctl.ConvertPodSysctlsVariableToDotsSeparator(pod.Spec.SecurityContext)

// Prepare resources allocated by the Dynammic Resource Allocation feature for the pod
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if err := m.runtimeHelper.PrepareDynamicResources(ctx, pod); err != nil {
ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
if referr != nil {
klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
return
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedPrepareDynamicResources, "Failed to prepare dynamic resources: %v", err)
klog.ErrorS(err, "Failed to prepare dynamic resources", "pod", klog.KObj(pod))
return
}
}

podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
if err != nil {
// createPodSandbox can return an error from CNI, CSI,
// or CRI if the Pod has been deleted while the POD is
// being created. If the pod has been deleted then it's
// not a real error.
//
// SyncPod can still be running when we get here, which
// means the PodWorker has not acked the deletion.
if m.podStateProvider.IsPodTerminationRequested(pod.UID) {
klog.V(4).InfoS("Pod was deleted and sandbox failed to be created", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
metrics.StartedPodsErrorsTotal.Inc()
createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
klog.ErrorS(err, "CreatePodSandbox for pod failed", "pod", klog.KObj(pod))
ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
if referr != nil {
klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
return
}
klog.V(4).InfoS("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))

resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
if err != nil {
ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
if referr != nil {
klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
}
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
klog.ErrorS(err, "Failed to get pod sandbox status; Skipping pod", "pod", klog.KObj(pod))
result.Fail(err)
return
}
if resp.GetStatus() == nil {
result.Fail(errors.New("pod sandbox status is nil"))
return
}

// If we ever allow updating a pod from non-host-network to
// host-network, we may use a stale IP.
if !kubecontainer.IsHostNetworkPod(pod) {
// Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, resp.GetStatus())
klog.V(4).InfoS("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
}
}

// the start containers routines depend on pod ip(as in primary pod ip)
// instead of trying to figure out if we have 0 < len(podIPs)
// everytime, we short circuit it here
podIP := ""
if len(podIPs) != 0 {
podIP = podIPs[0]
}

// Get podSandboxConfig for containers to start.
configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
result.AddSyncResult(configPodSandboxResult)
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
if err != nil {
message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
klog.ErrorS(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod))
configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
return
}

imageVolumePullResults, err := m.getImageVolumes(ctx, pod, podSandboxConfig, pullSecrets)
if err != nil {
klog.ErrorS(err, "Get image volumes for pod failed", "pod", klog.KObj(pod))
configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, err.Error())
return
}

// Helper containing boilerplate common to starting all types of containers.
// typeName is a description used to describe this type of container in log messages,
// currently: "container", "init container" or "ephemeral container"
// metricLabel is the label used to describe this type of container in monitoring metrics.
// currently: "container", "init_container" or "ephemeral_container"
start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
result.AddSyncResult(startContainerResult)

isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
return err
}

metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
if sc.HasWindowsHostProcessRequest(pod, spec.container) {
metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()
}
klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))

// We fail late here to populate the "ErrImagePull" and "ImagePullBackOff" correctly to the end user.
imageVolumes, err := m.toKubeContainerImageVolumes(imageVolumePullResults, spec.container, pod, startContainerResult)
if err != nil {
return err
}

// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
msg, err = m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs, imageVolumes)
incrementImageVolumeMetrics(err, msg, spec.container, imageVolumes)
if err != nil {
// startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
// useful to cluster administrators to distinguish "server errors" from "user errors".
metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
if sc.HasWindowsHostProcessRequest(pod, spec.container) {
metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
}
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
default:
utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
}
return err
}

return nil
}

// Step 5: start ephemeral containers
// These are started "prior" to init containers to allow running ephemeral containers even when there
// are errors starting an init container. In practice init containers will start first since ephemeral
// containers cannot be specified on pod creation.
for _, idx := range podContainerChanges.EphemeralContainersToStart {
start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
}

// TODO: Remove this code path as logically it is the subset of the next
// code path.
if !types.HasRestartableInitContainer(pod) && utilfeature.DefaultFeatureGate.Enabled(features.LegacySidecarContainers) {
// Step 6: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
return
}

// Successfully started the container; clear the entry in the failure
klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}
} else {
// Step 6: start init containers.
for _, idx := range podContainerChanges.InitContainersToStart {
container := &pod.Spec.InitContainers[idx]
// Start the next init container.
if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
if podutil.IsRestartableInitContainer(container) {
klog.V(4).InfoS("Failed to start the restartable init container for the pod, skipping", "initContainerName", container.Name, "pod", klog.KObj(pod))
continue
}
klog.V(4).InfoS("Failed to initialize the pod, as the init container failed to start, aborting", "initContainerName", container.Name, "pod", klog.KObj(pod))
return
}

// Successfully started the container; clear the entry in the failure
klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}
}

// Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
if resizable, _ := IsInPlacePodVerticalScalingAllowed(pod); resizable {
if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
result.SyncResults = append(result.SyncResults, m.doPodResizeAction(pod, podContainerChanges))
}
}

// Step 8: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
}

return
}
