K8s Informer介绍

背景

之前面试多次被问到过k8s的informer中如果请求丢失了会发生什么,感觉这应该是Operator中的经典面试题。正好也整理一下这方面的内容。

这个问题类似于:

  • 在 Kubernetes 系统中,组件之间的通过 HTTP 协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等。那么 Kubernetes 是如何做到的呢?答案就是 Informer 机制,尤其是其中定期发送的resync机制

框架概览

日常开发是只需要关注绿色的部分,即:

  1. 调用AddEventHandler,添加相应的逻辑处理AddFuncDeleteFuncUpdateFunc

  2. 实现 worker 逻辑从 workqueue 中消费 obj-key 即可。

其余的部分都是由client-go完成,主要关注SharedIndexInformer。

SharedIndexInformer

为了避免多个Listener去监控同一个资源的变化,从而加大APIServer的压力,主要的设计思路是使用单例模式,一个资源只实例化一个Informer,后续所有的 Listener 都共享这一个 Informer 实例即可。

从下面的源代码可以看出所有的 Informer 都通过同一个工厂SharedInformerFactory来生成:

  • 其内部存在一个 map,名为informers来存储所有当前已经实例化的所有 informer。

  • 通过InformerFor这个方法来实现共享机制,也就是 Singleton 模式,具体见下述代码和注解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type sharedInformerFactory struct {
...// 工厂级别(所有 informer)默认的 resync 时间defaultResync time.Duration// 每个 informer 具体的 resync 时间customResync map[reflect.Type]time.Duration// informer 实例的 mapinformers map[reflect.Type]cache.SharedIndexInformer...
}

// 共享机制 通过 InformerFor 来完成func (f *sharedInformerFactory) InformerFor(
obj runtime.Object,
newFunc internalinterfaces.NewInformerFunc,
) cache.SharedIndexInformer {
...informerType := reflect.TypeOf(obj)
// 如果已经有 informer 实例 就直接返回该实例informer, exists := f.informers[informerType]
if exists {
return informer
}
// 如果不存在该类型的 informer// 1. 设置 informer 的 resync 时间resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 2. 实例化该 informerinformer = newFunc(f.client, resyncPeriod)
// 3. 在 map 中记录该 informerf.informers[informerType] = informerreturn informer
}

一个Informer中包含了以下几个组件:

  • Reflector:一个 informer 就有一个 Reflector。 Reflector 负责监控对应的资源,其中包含 ListerWatcher、store(DeltaFIFO)、lastSyncResourceVersion、resyncPeriod 等信息, 当资源发生变化时,会触发相应 obj 的变更事件,并将该 obj 的 delta 放入 DeltaFIFO 中。 DeltaFIFO会被消费然后用来更新本地的状态。

  • Indexer:它是 Informer 机制中本地最全的数据存储,其会通过 DeltaFIFO 中最新的 Delta 不停的更新自身信息,并对外提供状态信息,例如支持通过get获取。

  • Listerner:Informer中包含了多个由AddEventHandlerAddEventHandlerWithResyncPeriod向 informer 注册新的 Listener。当 HandleDeltas 处理 DeltaFIFO 中的 Delta 时,会将这些更新事件派发给注册的 Listener。当然这里具体派发给哪些 Listener 有一定的规则,具体如下:

    • 派发给listeners:DeltaType 为AddedUpdatedDeleted、新旧资源版本号不一致的Replaced

    • 派发给syncingListeners:DeltaType 为Sync、新旧资源版本号一致的Replaced

数据同步流

有四类数据存储需要同步:API Server、DeltaFIFO、Indexer、Listener。对于这四部分,可以简单理解:API Server 侧为最权威的数据、DeltaFIFO 为本地最新的数据、Indexer 为本地最全的数据、Listener 为用户侧做逻辑用的数据。

远端通路:远端(API Server) ⇔ 本地(DeltaFIFO、Indexer、Listener)

负责将APIServer侧的obj同步到本地,主要同步类型有两类:

  • 通过List行为产生的同步行为,这类 event 的 DeltaType 为Replaced,同时只有在 Reflector 初始启动时才会产生。

  • 通过Watch行为产生的同步行为,对于 watch 到的Added、Modified、Deleted类型的 event,对应的 DeltaType 为Added、Updated、Deleted

本地通路:本地(DeltaFIFO、Indexer、SyncingListener)之间同步

本地通路是通过 Reflector 的ListAndWatch方法中运行一个 goroutine 来执行定期的Resync操作。它会从 Indxer 拉一遍对应到所有 objs 的Delta 到 DeltaFIFO 中(list),其中的 Delta 为Sync状态。之后 handleDeltas 就会同步 DeltaFIFO 中的 Sync Delta 给 syncingListeners 和 Indexer。

关键源码解析

ListAndWatch 方法

简化后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { 
// list...// resync...// watch...
}

func (r *Reflector) Run(stopCh <-chan struct{}) {
...wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
...
}

BackoffUntil包装了ListAndWatch,用来在ListAndWatch退出的时候重新启动它,只有stopChan发来停止消息的时候才会真正的停止。

其内部做了三件事:list->resync->watch,主要介绍如下。

List

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { 
// listif err := func() error {
...// 1. 开启 goroutine 执行 listgo func() {
defer func() {
if r := recover(); r != nil {
// list 失败,向 panicCh 发送信号panicCh <- r
}
}()
// 执行 list 操作,从 API Server 侧获取所有 obj 集合...// 成功完成 listclose(listCh)
}()
// 2. 等待执行 list 操作的 goroutine 结束,或者 stopCh、panicCh 终止select {
case <-stopCh:
return nilcase r := <-panicCh:
panic(r)
case <-listCh:
}
...// 待研究 watch cache 是什么?if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}

r.setIsLastSyncResourceVersionUnavailable(false) // list was successful...listMetaInterface, err := meta.ListAccessor(list)
...resourceVersion = listMetaInterface.GetResourceVersion()
// 从 list 中整理所有 obj 为一个数组items, err := meta.ExtractList(list)

// 3. 将 API Server 侧的最新 Obj 集合同步到 DeltaFIFO 中 最终调用 DeltaFIFO 的 Replace 方法if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
r.setLastSyncResourceVersion(resourceVersion)
...return nil
}(); err != nil {
return err
}
// resync...// watch...
}

list 操作在 ListAndWatch 中只会运行一次,简单来说,也可看作三个步骤:

  1. 派发 goroutine 去 API Server 拉取最新的 Obj 集合

  2. 等待 goroutine 结束,listCh接收到信号,表示 list 完成。或者stopChpanicCh发来信号。其中 stopCh 表示调用者需要停止,panicCh 表示 goroutine 的 list 过程出错了

  3. 整理 API Server 侧拉取到的最新 obj 集合,同时syncWith到 DeltaFIFO 中(最终调用 DeltaFIFO 的 Replace 方法)。

Resync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { 
// list
...
// resync
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
// watch
...
}

这部分是通过派发 goroutine 来完成的,内部通过 for 死循环来定期执行Resync操作,resyncChan()会定期向resyncCh发来信号,定期的时间由 resyncPeriod 属性来设置。 整个过程直到cancelCh或者stopCh发来停止信号,其中 cancelCh 表示本次 ListAndWatch 结束了,stopCh 表示上层(调用者)发来停止信号。 在每次的Resync操作操作中:

  1. 首先调用ShouldResync函数,其具体实现在 sharedProcessor 中,其会根据每一个 Listener 的同步时间来选出当前期待/需要进行 Resync 的 Listener 放入syncingListeners中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()

p.syncingListeners = []*processorListener{}

resyncNeeded := false
now := p.clock.Now()
// 遍历所有的 Listener,将同步时间已经到了的
// Listener 加入 syncingListeners
for _, listener := range p.listeners {
if listener.shouldResync(now) {
resyncNeeded = true
p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now)
}
}
return resyncNeeded
}
  • 调用 store.Resync(),具体由 DeltaFIFO 中的 Resync 来实现,想要完成将 Indexer 中的 obj 全部刷到 DeltaFIFO 中(list)。 需要注意,在这个过程中,如果 DeltaFIFO 的 items 中已经存在该 obj,就不需要放了。因为我们的目的就是同步本地之间的 obj 信息, 既然在 items 中已经存在了该信息,并且该信息一定是本地最新的,未来也会被处理同步到本地所有存储中,因此这里就不需要再添加了。 具体处理细节看下面代码注解。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
// knownObjects 可以理解为 Indexer
if f.knownObjects == nil {
return nil
}
// 将 Indexer 中所有的 obj 刷到 DeltaFIFO 中
keys := f.knownObjects.ListKeys()
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
// 通过 key 在 Indexer 中获得 obj
obj, exists, err := f.knownObjects.GetByKey(key)
...
// 计算 DeltaFIFO 中 Obj 的 key
id, err := f.KeyOf(obj)
...
// 如果在 items 中已经存在该 obj,就不需要再添加了
if len(f.items[id]) > 0 {
return nil
}
// 如果在 items 中没有该 obj,就添加 Sync 类型的 Deltas
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}

注意Resync也就是保证丢失的信息可以再次被处理的关键。

如果是从 Resync 重新同步到 Delta FIFO 队列的事件,会分发到 updateNotification 中触发 onUpdate 的回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

// from oldest to newestfor _, d := range obj.(Deltas) {
// 判断事件类型,看事件是通过新增、更新、替换、删除还是 Resync 重新同步产生的switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}

isSync := falseswitch {
case d.Type == Sync:
// 如果是通过 Resync 重新同步得到的事件则做个标记isSync = truecase d.Type == Replaced:
...
}
// 如果是通过 Resync 重新同步得到的事件,则触发 onUpdate 回调s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

Watch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { 
// list
...
// resync
...
// watch
for {
...
w, err := r.listerWatcher.Watch(options)
...
// 开始 watch
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
// 如果不是 stopCh 发来的主动停止,就记录日志
if err != errorStopRequested {
...
}
// 注意这里返回的为 nil,结合 BackoffUntil 函数看
return nil
}
}
}

整个 watch 包在一个 for 死循环中,具体的 watch 行为通过watchHandler函数来实现,其内部循环监听 watch 对象(由 listerWatcher.Watch 产生)的ResultChan。 如果发来 evenet,并且没有出错,就按照四种类型进行处理: 分别为Added、Modified、Deleted、Bookmark,表示有 Obj 被:添加、修改、删除,以及版本更新。之后对于前三种类型, 分别调用 store(DeltaFIFO)的Add、Update、Delete方法, 向 DeltaFIFO 中添加 DeltaType 为Added、Updated、Deleted的 Delta。后续通过 Pop 函数中的 HandleDeltas 消费这些 Deltas。

HandleDeltas 方法

该方法的功能就是循环处理 item(Deltas)中的 Delta,对于每一个 Delta:按照操作类型分类,Deleted为一类,剩余操作Sync, Replaced, Added, Updated归为另一类:

  1. 对于Deleted:首先调用 indexer 的Delete方法,在本地存储中删除该 Obj。之后调用 distribute 方法,对所有的 Listener 进行deleteNotification通知删除 Obj 消息;

  2. 对于Sync, Replaced, Added, Updated:首先查看在 indexer 中是否能够 get 到该 Obj:

    • 如果可以 get:调用 indexer 的Update方法,更新本地存储的 Obj,之后调用 distribute 方法,对所有的 Listener 进行updateNotification通知更新 Obj 消息;(注意:这部分的 distribute 针对 Sync 和部分 Replaced(见下述说明)只需要通知syncingListeners,而不是所有的 listeners。通过 distribute 方法最后的 bool 参数来设定,大部分情况设定为 false,说明通知所有的 listeners)

    • 如果 get 不到:调用 indexer 的Add方法,在本地存储添加该 Obj,之后调用 distribute 方法,对所有的 Listener 进行addNotification通知添加 Obj 消息;

AddEventHandler

下面这部分代码负责将各资源的informer的eventHandler注册进来,有几个注意点点:

  • 如果s.defaultEventHandlerResyncPeriod不大于0,那么就不会尝试更新resyncPeriod

  • 如果s没有启动,那么就不会只是单纯的注册进去,否则还会遍历所有的indexer,为每个给listener触发一个addNotification

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()

if s.stopped {
klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
return
}

if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}

if resyncPeriod < s.resyncCheckPeriod {
if s.started {
klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
// accordingly
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
}
}
}

listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

if !s.started {
s.processor.addListener(listener)
return
}

// in order to safely join, we have to
// 1. stop sending add/update/delete notifications
// 2. do a list against the store
// 3. send synthetic "Add" events to the new handler
// 4. unblock
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}

参考资料


K8s Informer介绍
http://example.com/2025/04/09/K8sInformer/
作者
滑滑蛋
发布于
2025年4月9日
许可协议