【k8s APIServer 源码阅读(一)】-对象缓存
Cacher结构
cacher的结构如下,已将注释翻译成了中文,关键的内容有
incoming chan watchCacheEvent
:事件分发的管道- 这里事件的结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// watchCacheEvent is a single "watch event" that is send to users of
// watchCache. Additionally to a typical "watch.Event" it contains
// the previous value of the object to enable proper filtering in the
// upper layers.
type watchCacheEvent struct {
Type watch.EventType
Object runtime.Object
ObjLabels labels.Set
ObjFields fields.Set
PrevObject runtime.Object
PrevObjLabels labels.Set
PrevObjFields fields.Set
Key string
ResourceVersion uint64
RecordTime time.Time
}
// EventType defines the possible types of events.
type EventType string
const (
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
Bookmark EventType = "BOOKMARK"
Error EventType = "ERROR"
)resourcePrefix string
:存储API资源的路径前缀(如/api/v1/pods
或/apis/apps/v1/deployments
),确保不同资源类型的缓存相互独立storage storage.Interface
:对存储接口的抽象,该接口由具体存储实现(如etcd3)提供,上层组件(如Cacher)通过此接口与存储交互:Cacher使用
Watch
方法监听底层存储变化API服务器通过
Get
/GetList
满足LIST请求控制器通过
GuaranteedUpdate
实现资源的安全更新
watchCache *watchCache
:一个维护了当前kind的所有的资源变动事件的滑动窗口reflector *cache.Reflector
:回调函数,list并watch etcd 并将事件和资源存到watchCache这个滑动窗口中versioner storage.Versioner
:处理资源的version相关信息watchersBuffer []*cacheWatcher
:所有对该资源有watch需求的连接
1 |
|
Cache初始化
初始化的流程如下:
配置验证与前置检查
索引触发器初始化(支持资源索引功能)
初始化Cacher,注意这里可以看到incoming的事件管道的容量只有100
给Cacher创建watchCache与reflector
后台触发
progressRequester
协程定期发送watch进度更新(bookmark事件),防止连接超时,并向客户端发送最新的资源版本信息。后台触发
cacher.dispatchEvents()
协程处理incoming
通道中的事件并推送给WATCH客户端后台触发wait.Until
以1秒间隔执行startCaching
,确保缓存持续同步
1 |
|
watchCache
watchCache的结构如下,这里核心是使用了一个cache []*watchCacheEvent
这个循环数组来缓存历史事件。
1 |
|
此外还提供了Add
、Update
、Delete
接口来给reflector暴露更改的接口,注意这里的函数只是进行了简单的包装,即将事件类型还有obj包装成了watch.Event
,然后就使用了processEvent
来处理。
1 |
|
watchCache.processEvent()
其流程如下:
- 首先在watch.Event的基础上添加更多的信息,诸如Fields、resourceVersion、RecordTime等,此外如果之前就有,那么也会获取到previous,最终组成watchCacheEvent。
1 |
|
cacher.dispatchEvents()
该方法实现了事件分发的主循环,处理来自底层存储的资源变更事件,并定期生成书签事件以维持watch连接活性,是API服务器缓存系统向客户端推送实时更新的关键组件。
bookmarkTimer每s进行一次,但是为了避免过高的并发有0.25比例的随机扰动。
事件处理的主循环处理三种事件源:
从
incoming
通道接收存储层事件(Added、Modified、Deleted、Bookmark、Error),然后将非BOOKMARK的事件(这类事件很频繁,亚秒级)进行分发,然后更新最后处理的资源版本lastProcessedResourceVersion从
bookmarkTimer
定时器收到的事件,给cacher.version推送一个Bookmark事件的更新并附带最后处理的资源版本lastProcessedResourceVersion,从而让cacher.version更新这个object记录的ResourceVersion。从
stopCh
收到事件执行优雅退出
1 |
|
cacher.dispatchEvent()
首先通过startDispatching构建
watchBuffer
如果分发的事件是Bookmark:
- 给
watchBuffer
中的每个watcher通过nonblockingAdd添加,其会判断是否 event.ResourceVersion >= c.bookmarkAfterResourceVersion,只有是才会加入到cacher.input中。
- 给
如果是其他普通的事件:
对每个watcher调用
nonblockingAdd
,成功则事件入队,失败则加入blockedWatchers
列表如果
blockedWatchers
列表有时事件则:超时预算管理:从
dispatchTimeoutBudget
获取可用超时时间,避免单个事件占用过多资源带超时的阻塞分发:调用
watcher.add
并传入计时器,允许在超时前等待watcher处理智能计时器复用:如果计时器触发则设为nil,避免后续watcher使用已过期的计时器
资源回收:返回未使用的超时时间到预算池,实现整体资源调控
1 |
|