【k8s APIServer 源码阅读(一)】-对象缓存

Cacher结构

Cacher的结构如下(代码中的注释已翻译),其中关键的字段有:

  • incoming chan watchCacheEvent:事件分发的管道

    • 这里事件的结构如下:
    // watchCacheEvent is a single "watch event" delivered to users of
    // watchCache. On top of what a plain watch.Event carries, it also holds
    // the previous value of the object (with its labels and fields) so that
    // upper layers can filter events properly.
    type watchCacheEvent struct {
        // Type is the watch event type of this change.
        Type watch.EventType
        // Object is the object after the change; ObjLabels and ObjFields
        // are its labels and fields.
        Object    runtime.Object
        ObjLabels labels.Set
        ObjFields fields.Set
        // PrevObject, PrevObjLabels and PrevObjFields describe the version
        // of the object that was stored before this change, if any.
        PrevObject    runtime.Object
        PrevObjLabels labels.Set
        PrevObjFields fields.Set
        // Key is the object's key in the underlying storage.
        Key string
        // ResourceVersion is the resource version of this event.
        ResourceVersion uint64
        // RecordTime is the moment the watch cache recorded the event.
        RecordTime time.Time
    }

    // EventType names the kind of change a watch event reports.
    type EventType string

    // Known watch event types. Each one is a typed EventType constant.
    const (
        // Added: a new object appeared.
        Added = EventType("ADDED")
        // Modified: an existing object changed.
        Modified = EventType("MODIFIED")
        // Deleted: an object was removed.
        Deleted = EventType("DELETED")
        // Bookmark: carries a resource version rather than an object change.
        Bookmark = EventType("BOOKMARK")
        // Error: the watch reported an error.
        Error = EventType("ERROR")
    )
  • resourcePrefix string:该资源在底层存储中的键前缀(如 /pods、/deployments;注意它是存储键前缀而不是REST API路径,etcd3存储层还会在其前面加上全局前缀,默认为 /registry),确保不同资源类型的缓存相互独立

  • storage storage.Interface:对存储接口的抽象,该接口由具体存储实现(如etcd3)提供,上层组件(如Cacher)通过此接口与存储交互:

    • Cacher使用Watch方法监听底层存储变化

    • API服务器通过Get满足GET请求、通过GetList满足LIST请求

    • API服务器的registry层(而不是控制器本身)通过GuaranteedUpdate实现资源的安全更新:遇到资源版本冲突时会重新读取并重试更新

  • watchCache *watchCache:对象最近变更的“滑动窗口”(容量有限,只保留最近的一段事件),同时保存了该资源所有对象的当前状态

  • reflector *cache.Reflector:client-go中的Reflector组件(而非回调函数),它通过ListerWatcher对底层存储(etcd)执行List和Watch,并把得到的对象和事件写入watchCache这个滑动窗口

  • versioner storage.Versioner:处理资源的version相关信息

  • watchersBuffer []*cacheWatcher:可能对当前正在分发的事件感兴趣的watcher列表(每次分发时由startDispatching填充的临时缓冲);该资源的全部watcher保存在watchers字段中

// Cacher serves WATCH and LIST requests for a given resource from an
// in-memory cache and updates that cache in the background based on the
// contents of the underlying storage.
// Cacher implements storage.Interface, although most of its calls are simply
// delegated to the underlying storage.
type Cacher struct {
	// incomingHWM is a high-water mark used for performance debugging.
	// IMPORTANT: HighWaterMark is accessed via sync/atomic, so it has to stay
	// at the top of the struct to work around a bug on 32-bit platforms.
	// See https://golang.org/pkg/sync/atomic/ for more information.
	incomingHWM storage.HighWaterMark
	// incoming carries the events that should be dispatched to watchers.
	incoming chan watchCacheEvent

	// resourcePrefix is the storage key prefix of this resource.
	resourcePrefix string

	sync.RWMutex

	// ready has to report ok before the cacher's cache may be accessed.
	// This prevents users from touching structures that are uninitialized
	// or are currently being repopulated.
	// ready is set to false when the cacher is paused or stopped, and to true
	// once the cacher has been initialized and is ready to use.
	ready *ready

	// storage is the underlying storage.Interface.
	storage storage.Interface

	// objectType is the expected type of objects in the underlying cache.
	objectType reflect.Type
	// groupResource is used to tell apart *unstructured.Unstructured objects
	// (custom resources).
	groupResource schema.GroupResource

	// watchCache holds a "sliding window" of recent object changes plus the
	// current state; reflector is what feeds it from storage.
	watchCache *watchCache
	reflector  *cache.Reflector

	// versioner is used to handle resource versions.
	versioner storage.Versioner

	// newFunc creates a new empty object for storing objects of the cached type.
	newFunc func() runtime.Object

	// newListFunc creates a new empty list for storing objects of the cached type.
	newListFunc func() runtime.Object

	// indexedTrigger is used to reduce the number of watchers that need to
	// process an incoming event.
	indexedTrigger *indexedTriggerFunc
	// watchers maps the trigger-function value a watcher is interested in to
	// the watchers themselves.
	watcherIdx int
	watchers   indexedWatchers

	// dispatchTimeoutBudget is the time budget that event dispatching may
	// spend waiting for unready watchers before closing them.
	dispatchTimeoutBudget timeBudget

	// Graceful termination.
	stopLock sync.RWMutex
	stopped  bool
	stopCh   chan struct{}
	stopWg   sync.WaitGroup

	clock clock.Clock
	// timer is reused while dispatching to avoid unnecessary allocations in
	// the underlying watchers.
	timer *time.Timer

	// dispatching reports whether an event is currently being dispatched.
	dispatching bool
	// watchersBuffer lists the watchers potentially interested in the event
	// currently being dispatched.
	watchersBuffer []*cacheWatcher
	// blockedWatchers lists the watchers whose buffers are currently full.
	blockedWatchers []*cacheWatcher
	// watchersToStop lists the watchers that should be stopped during the
	// current dispatch; stopping them is deferred until the dispatch ends to
	// avoid racing with the watchers closing their channels.
	watchersToStop []*cacheWatcher
	// bookmarkWatchers is a timeout queue used to send bookmark events before
	// watchers time out.
	// NOTE: all accesses to this field must be protected by the Cacher lock.
	bookmarkWatchers *watcherBookmarkTimeBuckets
	// expiredBookmarkWatchers lists the watchers whose bookmark time has
	// expired and that need their next bookmark event scheduled.
	expiredBookmarkWatchers []*cacheWatcher
}

Cacher初始化

初始化的流程如下:

  1. 配置验证与前置检查

  2. 索引触发器初始化(支持资源索引功能)

  3. 初始化Cacher,注意这里可以看到incoming的事件管道的容量只有100

  4. 给Cacher创建watchCache与reflector

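  5. 后台运行progressRequester协程:它是以底层存储的RequestWatchProgress构造的“条件式”进度请求器。当有请求在等待watchCache追上某个资源版本时,它会请求底层存储(etcd)发送watch进度通知,使watchCache的资源版本及时推进。这与发送给客户端的bookmark事件不是一回事,后者由dispatchEvents负责。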

  6. 后台触发cacher.dispatchEvents()协程处理incoming通道中的事件并推送给WATCH客户端

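  7. 后台启动一个协程,通过wait.Until以1秒为周期调用startCaching(仅在cacher未停止时)。startCaching返回后,1秒后会再次被调用,直到stopCh关闭,以确保缓存能持续与底层存储同步。该协程退出时会调用terminateAllWatchers终止所有watcher。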

// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
//
// Besides constructing the Cacher, it starts background goroutines that are
// tied to the cacher's stop channel (stopCh):
//   - cacher.dispatchEvents, which fans events from cacher.incoming out to watchers;
//   - progressRequester.Run, built on config.Storage.RequestWatchProgress;
//   - a wait.Until loop that calls startCaching once per second while the
//     cacher is not stopped, and terminates all watchers when it exits.
//
// It returns an error if config.Codec cannot handle objects produced by
// config.NewFunc.
func NewCacherFromConfig(config Config) (*Cacher, error) {
	stopCh := make(chan struct{})
	obj := config.NewFunc()
	// 1. Validate the configuration: the codec must be able to handle objects
	// of the configured type.
	if err := runtime.CheckCodec(config.Codec, obj); err != nil {
		return nil, fmt.Errorf("storage codec doesn't seem to match given type: %w", err)
	}

	// 2. Set up the optional index trigger (resource indexing support).
	var indexedTrigger *indexedTriggerFunc
	if config.IndexerFuncs != nil {
		// ... indexer function handling elided in this excerpt ...
	}

	// 3. Construct the Cacher. Note that the incoming event channel only has
	// a capacity of 100.
	cacher := &Cacher{
		resourcePrefix: config.ResourcePrefix,
		ready:          newReady(config.Clock),
		storage:        config.Storage,
		objectType:     reflect.TypeOf(obj),
		groupResource:  config.GroupResource,
		versioner:      config.Versioner,
		newFunc:        config.NewFunc,
		newListFunc:    config.NewListFunc,
		indexedTrigger: indexedTrigger,
		watcherIdx:     0,
		watchers: indexedWatchers{
			allWatchers:   make(map[namespacedName]watchersMap),
			valueWatchers: make(map[string]watchersMap),
		},
		incoming:              make(chan watchCacheEvent, 100),
		dispatchTimeoutBudget: newTimeBudget(),
		stopCh:                stopCh,
		clock:                 config.Clock,
		timer:                 time.NewTimer(time.Duration(0)),
		bookmarkWatchers:      newTimeBucketWatchers(config.Clock, defaultBookmarkFrequency),
	}

	// The reusable dispatch timer was created with a zero duration, so it
	// fires right away. Stop it and, if it already fired, consume the pending
	// value so that the first Reset in dispatchEvent does not observe a
	// spurious timeout.
	if !cacher.timer.Stop() {
		<-cacher.timer.C
	}

	// 4. Create the watch cache and the reflector that fills it from storage.
	progressRequester := progress.NewConditionalProgressRequester(config.Storage.RequestWatchProgress)
	// NOTE(review): watchCache has a waitingUntilFresh
	// *progress.ConditionalProgressRequester field; presumably
	// progressRequester should be passed here as well. Confirm against
	// newWatchCache's actual signature.
	watchCache := newWatchCache(
		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner,
		config.Indexers, config.Clock,
	)
	listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
	reflector := cache.NewNamedReflector("storage/cacher.go:"+config.ResourcePrefix,
		listerWatcher, obj, watchCache, 0)
	// Chunk size the reflector requests when listing from storage.
	reflector.WatchListPageSize = storageWatchListPageSize
	// Keep retrying internal errors for up to 30s before the reflector gives
	// up. This is presumably meant to ride out transient storage errors;
	// confirm.
	reflector.MaxInternalErrorRetryDuration = time.Second * 30

	// Attach both to the cacher. dispatchEvents reads c.watchCache as soon as
	// it starts, so they must be set before the goroutines below are launched.
	cacher.watchCache = watchCache
	cacher.reflector = reflector

	// 5-7. Start the background goroutines.
	go cacher.dispatchEvents()
	go progressRequester.Run(stopCh)

	cacher.stopWg.Add(1)
	go func() {
		defer cacher.stopWg.Done()
		defer cacher.terminateAllWatchers()
		wait.Until(
			func() {
				if !cacher.isStopped() {
					cacher.startCaching(stopCh)
				}
			},
			time.Second, stopCh,
		)
	}()

	return cacher, nil
}

watchCache

watchCache的结构如下。其核心是cache []*watchCacheEvent这个循环数组(环形缓冲区),用于缓存最近的历史事件;对象的当前状态则保存在store中。

// watchCache implements a Store interface; however, it depends on the
// elements implementing the runtime.Object interface.
//
// watchCache is a "sliding window" (with a limited capacity) of objects
// observed from a watch.
type watchCache struct {
	sync.RWMutex

	// cond lets LIST requests wait until the cache has reached a
	// fresh-enough resource version.
	cond *sync.Cond

	// capacity is the maximum size of the history window.
	capacity int

	// upperBoundCapacity is the upper bound of the dynamically sized event cache.
	upperBoundCapacity int

	// lowerBoundCapacity is the lower bound of the dynamically sized event cache.
	lowerBoundCapacity int

	// keyFunc returns the key of a given object in the underlying storage.
	keyFunc func(runtime.Object) (string, error)

	// getAttrsFunc returns the labels and fields of an object.
	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)

	// cache is used as a cyclic buffer: its "current" contents live in the
	// range [startIndex%capacity, endIndex%capacity), so it holds exactly
	// endIndex-startIndex items.
	cache      []*watchCacheEvent
	startIndex int
	endIndex   int
	// removedEventSinceRelist reports whether any event has been removed
	// from the `cache` cyclic buffer since the last relist.
	removedEventSinceRelist bool

	// store backs LIST requests that start at the "end of cached history",
	// i.e. the moment right after the newest cached watch event. This is
	// what allows clients to start watching from "now".
	// NOTE: store is assumed to be thread-safe.
	store storeIndexer

	// resourceVersion is the resource version the watchCache has been
	// propagated to.
	resourceVersion uint64

	// listResourceVersion is the resource version of the last list result
	// (populated via the Replace() method).
	listResourceVersion uint64

	// onReplace runs at the end of every successful Replace() call.
	onReplace func()

	// eventHandler runs at the end of every Add/Update/Delete call and also
	// receives the previous value of the object.
	eventHandler func(*watchCacheEvent)

	// clock is used for testing timeouts.
	clock clock.Clock

	// eventFreshDuration is the minimum watch history the watchCache keeps.
	eventFreshDuration time.Duration

	// versioner is the underlying storage.Versioner.
	versioner storage.Versioner

	// groupResource is the group-resource of the owning cacher.
	groupResource schema.GroupResource

	// indexValidator is used for testing cache interval invalidation.
	indexValidator indexValidator

	// waitingUntilFresh requests progress notifications while some request
	// is waiting for the watch cache to become fresh.
	waitingUntilFresh *progress.ConditionalProgressRequester

	// snapshots keeps previous snapshots of the orderedLister so that
	// requests can be served from earlier revisions.
	snapshots Snapshotter
}

此外,watchCache还实现了Add、Update、Delete方法,供reflector把变更写入缓存。这些方法只是做了简单的包装:把事件类型和obj封装成watch.Event,再连同对store执行相应Add/Update/Delete的回调函数一起交给processEvent处理。

// Add handles an ADDED notification. obj is expected to be a runtime.Object;
// its object and resource version are extracted, wrapped into a watch.Event,
// and handed to processEvent together with a callback that inserts the
// resulting element into w.store.
func (w *watchCache) Add(obj interface{}) error {
	runtimeObj, rv, err := w.objectToVersionedRuntimeObject(obj)
	if err != nil {
		return err
	}
	addToStore := func(elem *storeElement) error {
		return w.store.Add(elem)
	}
	return w.processEvent(watch.Event{Type: watch.Added, Object: runtimeObj}, rv, addToStore)
}

// Update handles a MODIFIED notification. obj is expected to be a
// runtime.Object; its object and resource version are extracted, wrapped
// into a watch.Event, and handed to processEvent together with a callback
// that updates the corresponding element in w.store.
func (w *watchCache) Update(obj interface{}) error {
	runtimeObj, rv, err := w.objectToVersionedRuntimeObject(obj)
	if err != nil {
		return err
	}
	updateStore := func(elem *storeElement) error {
		return w.store.Update(elem)
	}
	return w.processEvent(watch.Event{Type: watch.Modified, Object: runtimeObj}, rv, updateStore)
}

// Delete handles a DELETED notification. obj is expected to be a
// runtime.Object; its object and resource version are extracted, wrapped
// into a watch.Event, and handed to processEvent together with a callback
// that removes the corresponding element from w.store.
func (w *watchCache) Delete(obj interface{}) error {
	runtimeObj, rv, err := w.objectToVersionedRuntimeObject(obj)
	if err != nil {
		return err
	}
	removeFromStore := func(elem *storeElement) error {
		return w.store.Delete(elem)
	}
	return w.processEvent(watch.Event{Type: watch.Deleted, Object: runtimeObj}, rv, removeFromStore)
}

watchCache.processEvent()

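其流程如下:

  • 首先在watch.Event的基础上补充更多信息:通过keyFunc计算key,通过getAttrsFunc获取Labels与Fields,并附上resourceVersion和RecordTime。如果store中已存在该对象,还会取出其旧值,填入PrevObject/PrevObjLabels/PrevObjFields,最终组成watchCacheEvent。

  • 然后在watchCache的写锁内执行以下操作:
    • 调用updateCache把事件追加到循环数组;
    • 更新w.resourceVersion;
    • 执行updateFunc更新store;
    • 若开启了快照,再记录一份快照。
  由于cond.Broadcast是通过defer注册的,它会在释放锁之前唤醒等待新资源版本的请求。

  • 最后在锁外调用eventHandler。对Cacher而言,它就是NewCacherFromConfig中传入的cacher.processEvent,由它把事件送入Cacher的incoming管道,供dispatchEvents分发。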
// processEvent is safe as long as there is at most one call to it in flight
// at any point in time.
//
// processEvent converts a raw watch.Event into a *watchCacheEvent and applies it:
//  1. It computes the object's storage key (keyFunc) and its labels and
//     fields (getAttrsFunc), stamps the event with resourceVersion and the
//     current clock time, and, if the store already holds an element for
//     the key, copies that element into PrevObject/PrevObjLabels/PrevObjFields.
//  2. Under w's write lock it appends the event to the cyclic buffer
//     (updateCache), advances w.resourceVersion, applies updateFunc to the
//     store and, when snapshots are enabled and the store is an
//     orderedLister, records a snapshot at the new resource version.
//     Waiters on w.cond are broadcast to before the lock is released.
//  3. Outside the lock it passes the event to eventHandler (if set) and
//     records the resource-version metric.
//
// Note that updateCache and the resourceVersion bump happen before
// updateFunc runs, so if updateFunc fails the event is already in the
// cyclic buffer; the error is then returned without calling eventHandler.
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
metrics.EventsReceivedCounter.WithLabelValues(w.groupResource.String()).Inc()

// Build the store element (key + object + labels/fields) for this event.
key, err := w.keyFunc(event.Object)
if err != nil {
return fmt.Errorf("couldn't compute key: %v", err)
}
elem := &storeElement{Key: key, Object: event.Object}
elem.Labels, elem.Fields, err = w.getAttrsFunc(event.Object)
if err != nil {
return err
}

wcEvent := &watchCacheEvent{
Type: event.Type,
Object: elem.Object,
ObjLabels: elem.Labels,
ObjFields: elem.Fields,
Key: key,
ResourceVersion: resourceVersion,
RecordTime: w.clock.Now(),
}

// We can call w.store.Get() outside of a critical section,
// because the w.store itself is thread-safe and the only
// place where w.store is modified is below (via updateFunc)
// and these calls are serialized because reflector is processing
// events one-by-one.
previous, exists, err := w.store.Get(elem)
if err != nil {
return err
}
if exists {
// Attach the previously stored version so upper layers can filter on it.
previousElem := previous.(*storeElement)
wcEvent.PrevObject = previousElem.Object
wcEvent.PrevObjLabels = previousElem.Labels
wcEvent.PrevObjFields = previousElem.Fields
}

// Critical section: mutate the event buffer, resource version and store
// atomically with respect to readers holding w's lock.
if err := func() error {
w.Lock()
defer w.Unlock()

w.updateCache(wcEvent)
w.resourceVersion = resourceVersion
// Deferred after Unlock, so (LIFO) it runs before Unlock, on every
// return path including the updateFunc error path.
defer w.cond.Broadcast()

err := updateFunc(elem)
if err != nil {
return err
}
if w.snapshots != nil {
if orderedLister, ordered := w.store.(orderedLister); ordered {
if w.isCacheFullLocked() {
// Presumably drops snapshots older than the oldest event still
// held in the cyclic buffer.
oldestRV := w.cache[w.startIndex%w.capacity].ResourceVersion
w.snapshots.RemoveLess(oldestRV)
}
w.snapshots.Add(w.resourceVersion, orderedLister)
}
}
// err is nil at this point.
return err
}(); err != nil {
return err
}

// Avoid calling event handler under lock.
// This is safe as long as there is at most one call to Add/Update/Delete and
// UpdateResourceVersion in flight at any point in time, which is true now,
// because reflector calls them synchronously from its main thread.
if w.eventHandler != nil {
w.eventHandler(wcEvent)
}
metrics.RecordResourceVersion(w.groupResource.String(), resourceVersion)
return nil
}

cacher.dispatchEvents()

该方法实现了事件分发的主循环,处理来自底层存储的资源变更事件,并定期生成书签事件以维持watch连接活性,是API服务器缓存系统向客户端推送实时更新的关键组件。

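  • 进入主循环前,先以10ms的间隔轮询,直到watchCache拿到首次list的资源版本(非0)。这是因为资源版本为0时无法发送bookmark;如果轮询期间stopCh被关闭,则直接退出。

  • bookmarkTimer大约每秒触发一次。wait.Jitter(time.Second, 0.25)会在1秒的基础上加入最多25%的随机抖动(即1~1.25秒),以平摊整体负载。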

  • 事件处理的主循环处理三种事件源:

    • incoming通道接收存储层事件(Added、Modified、Deleted、Bookmark、Error)。除BOOKMARK外的事件都会被分发给watcher。存储层的BOOKMARK事件可能非常频繁(甚至达到亚秒级),向下传播可能压垮整个系统,因此不会分发。无论是否分发,都会把最后处理的资源版本lastProcessedResourceVersion更新为该事件的资源版本。

    • bookmarkTimer触发时,依次执行以下步骤:
      • 重置定时器;
      • 用newFunc创建一个空对象;
      • 通过versioner.UpdateObject把该对象的ResourceVersion设为lastProcessedResourceVersion;
      • 把这个Bookmark事件分发给watcher。
      若设置资源版本失败,则记录错误并跳过本次bookmark。

    • stopCh被关闭(或incoming通道被关闭)时退出循环,完成优雅退出

// dispatchEvents is the Cacher's main dispatch loop. It forwards events
// arriving on c.incoming to watchers and periodically emits synthetic
// bookmark events carrying the last processed resource version.
// It returns when c.stopCh is closed or c.incoming is closed.
func (c *Cacher) dispatchEvents() {
	// Jitter (1s plus up to 25%) helps level out any aggregate load.
	bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
	defer bookmarkTimer.Stop()

	// The internal informer populates the RV as soon as it conducts the
	// first successful sync with the underlying store. The cache must wait
	// until this first sync is completed to be deemed ready. Since a
	// bookmark cannot be sent while lastProcessedResourceVersion is 0, poll
	// aggressively (every 10ms) for the first list RV before entering the
	// dispatch loop.
	lastProcessedResourceVersion := uint64(0)
	if err := wait.PollUntilContextCancel(wait.ContextForChannel(c.stopCh), 10*time.Millisecond, true, func(_ context.Context) (bool, error) {
		if rv := c.watchCache.getListResourceVersion(); rv != 0 {
			lastProcessedResourceVersion = rv
			return true, nil
		}
		return false, nil
	}); err != nil {
		// The condition above never returns an error, so a non-nil error
		// means that stopCh was closed.
		return
	}

	for {
		select {
		case event, ok := <-c.incoming:
			if !ok {
				return
			}
			// Don't dispatch bookmarks coming from the storage layer.
			// They can be very frequent (even to the level of subseconds)
			// to allow efficient watch resumption on kube-apiserver restarts,
			// and propagating them down may overload the whole system.
			//
			// TODO: If at some point we decide the performance and scalability
			// footprint is acceptable, this is the place to hook them in.
			// However, we then need to check if this was called as a result
			// of a bookmark event or regular Add/Update/Delete operation by
			// checking if resourceVersion here has changed.
			if event.Type != watch.Bookmark {
				c.dispatchEvent(&event)
			}
			lastProcessedResourceVersion = event.ResourceVersion
			metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()
		case <-bookmarkTimer.C():
			bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
			// Synthesize a bookmark: an empty object whose only meaningful
			// content is the last processed resource version.
			bookmarkEvent := &watchCacheEvent{
				Type:            watch.Bookmark,
				Object:          c.newFunc(),
				ResourceVersion: lastProcessedResourceVersion,
			}
			if err := c.versioner.UpdateObject(bookmarkEvent.Object, bookmarkEvent.ResourceVersion); err != nil {
				// Log the cause as well, then skip this bookmark. The timer
				// has already been re-armed, so the next tick tries again.
				klog.Errorf("failure to set resourceVersion to %d on bookmark event %+v: %v", bookmarkEvent.ResourceVersion, bookmarkEvent.Object, err)
				continue
			}
			c.dispatchEvent(bookmarkEvent)
		case <-c.stopCh:
			return
		}
	}
}

cacher.dispatchEvent()

  • 首先通过startDispatching构建watchersBuffer(可能对该事件感兴趣的watcher列表),并通过defer在返回时调用finishDispatching

  • 如果分发的事件是Bookmark:

    • 对watchersBuffer中的每个watcher调用nonblockingAdd。只有当 event.ResourceVersion >= 该watcher的bookmarkAfterResourceVersion 时,才会尝试把事件放入该watcher(cacheWatcher)自己的input通道。放入操作是非阻塞的:通道已满时,这次bookmark会被直接丢弃。
  • 如果是其他普通的事件:

    • 对每个watcher调用nonblockingAdd,成功则事件入队,失败则加入blockedWatchers列表

    • 如果blockedWatchers列表非空,则:

      • 超时预算管理:从dispatchTimeoutBudget获取可用超时时间,避免单个事件占用过多资源

      • 带超时的阻塞分发:调用watcher.add并传入计时器,允许在超时前等待watcher处理

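      • 计时器复用与超时处理:所有被阻塞的watcher共享同一个计时器。一旦watcher.add返回false(计时器已触发,该watcher被关闭),就把timer设为nil。此后剩余的watcher不再等待,仍处于阻塞状态的watcher会被直接关闭。循环结束后,若计时器尚未触发,则停止它,并在必要时清空其通道,避免下次复用时出现虚假超时。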

      • 资源回收:返回未使用的超时时间到预算池,实现整体资源调控

// dispatchEvent delivers a single event to every watcher that
// startDispatching collected into c.watchersBuffer; finishDispatching runs
// when it returns (deferred).
//
// Bookmark events are only offered via nonblockingAdd, and the result is
// ignored.
//
// Any other event is first wrapped for serialization caching
// (setCachingObjects on a shallow copy) and offered non-blockingly to every
// watcher. Watchers that could not accept it are collected in
// c.blockedWatchers and retried with a blocking add bounded by the shared
// c.timer, whose duration is taken from c.dispatchTimeoutBudget. Any time
// left over is returned to the budget afterwards.
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
c.startDispatching(event)
defer c.finishDispatching()
// Watchers stopped after startDispatching will be delayed to finishDispatching.

// Since add() can block, we explicitly add when cacher is unlocked.
// Dispatching event in nonblocking way first, which make faster watchers
// not be blocked by slower ones.
//
// Note: if we ever decide to cache the serialization of bookmark events,
// we will also need to modify the watchEncoder encoder
if event.Type == watch.Bookmark {
// The return value is ignored: a watcher that cannot take this bookmark
// right now (full input channel, or a bookmark older than its
// bookmarkAfterResourceVersion) simply misses it.
for _, watcher := range c.watchersBuffer {
watcher.nonblockingAdd(event)
}
} else {
// Set up caching of object serializations only for dispatching this event.
//
// Storing serializations in memory would result in increased memory usage,
// but it would help for caching encodings for watches started from old
// versions. However, we still don't have a convincing data that the gain
// from it justifies increased memory usage, so for now we drop the cached
// serializations after dispatching this event.
//
// Given that CachingObject is just wrapping the object and not performing
// deep-copying (until some field is explicitly being modified), we create
// it unconditionally to ensure safety and reduce deep-copying.
//
// Make a shallow copy to allow overwriting Object and PrevObject.
wcEvent := *event
setCachingObjects(&wcEvent, c.versioner)
event = &wcEvent

// Reuse the backing array of blockedWatchers across dispatches.
c.blockedWatchers = c.blockedWatchers[:0]
for _, watcher := range c.watchersBuffer {
if !watcher.nonblockingAdd(event) {
c.blockedWatchers = append(c.blockedWatchers, watcher)
}
}

if len(c.blockedWatchers) > 0 {
// dispatchEvent is called very often, so arrange
// to reuse timers instead of constantly allocating.
startTime := time.Now()
timeout := c.dispatchTimeoutBudget.takeAvailable()
c.timer.Reset(timeout)

// Send event to all blocked watchers. As long as timer is running,
// `add` will wait for the watcher to unblock. After timeout,
// `add` will not wait, but immediately close a still blocked watcher.
// Hence, every watcher gets the chance to unblock itself while timer
// is running, not only the first ones in the list.
timer := c.timer
for _, watcher := range c.blockedWatchers {
if !watcher.add(event, timer) {
// The timer fired; pass nil from now on so the remaining blocked
// watchers are not waited on.
timer = nil
}
}

// Stop the timer if it is not fired
if timer != nil && !timer.Stop() {
// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<-timer.C
}

// Give back whatever part of the timeout was not spent waiting.
c.dispatchTimeoutBudget.returnUnused(timeout - time.Since(startTime))
}
}
}

// nonblockingAdd tries to put event on the watcher's input channel without
// blocking and reports whether it succeeded.
//
// Until bookmarkAfterResourceVersion has been seen, a bookmark is offered
// roughly every second. Bookmarks whose resource version is still below
// that value are rejected up front, so they do not pollute the input
// channel.
//
// After a successful send, markBookmarkAfterRvAsReceived is called with the
// event.
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
	isBookmark := event.Type == watch.Bookmark
	if isBookmark && event.ResourceVersion < c.bookmarkAfterResourceVersion {
		// Too old to be the bookmark this watcher is waiting for.
		return false
	}

	enqueued := false
	select {
	case c.input <- event:
		enqueued = true
	default:
		// Input channel is full: give up instead of blocking.
	}
	if enqueued {
		c.markBookmarkAfterRvAsReceived(event)
	}
	return enqueued
}

【k8s APIServer 源码阅读(一)】-对象缓存
http://example.com/2025/08/24/k8s APIServer源码阅读【一】-对象缓存/
作者
滑滑蛋
发布于
2025年8月24日
许可协议