K8s Informer介绍
背景
之前面试多次被问到过k8s的informer中如果请求丢失了会发生什么,感觉这应该是Operator中的经典面试题。正好也整理一下这方面的内容。
这个问题类似于:
- 在 Kubernetes 系统中,组件之间的通过 HTTP 协议进行通信,在不依赖任何中间件的情况下需要保证消息的实时性、可靠性、顺序性等。那么 Kubernetes 是如何做到的呢?答案就是 Informer 机制,尤其是其中定期发送的resync机制
框架概览
日常开发是只需要关注绿色的部分,即:
调用
AddEventHandler
,添加相应的逻辑处理AddFunc
、DeleteFunc
、UpdateFunc
实现 worker 逻辑从 workqueue 中消费 obj-key 即可。
其余的部分都是由client-go完成,主要关注SharedIndexInformer。
SharedIndexInformer
为了避免多个Listener去监控同一个资源的变化,从而加大APIServer的压力,主要的设计思路是使用单例模式,一个资源只实例化一个Informer,后续所有的 Listener 都共享这一个 Informer 实例即可。
从下面的源代码可以看出所有的 Informer 都通过同一个工厂SharedInformerFactory
来生成:
其内部存在一个 map,名为
informers
来存储所有当前已经实例化的所有 informer。通过
InformerFor
这个方法来实现共享机制,也就是 Singleton 模式,具体见下述代码和注解。
1 |
|
一个Informer中包含了以下几个组件:
Reflector:一个 informer 就有一个 Reflector。 Reflector 负责监控对应的资源,其中包含 ListerWatcher、store(DeltaFIFO)、lastSyncResourceVersion、resyncPeriod 等信息, 当资源发生变化时,会触发相应 obj 的变更事件,并将该 obj 的 delta 放入 DeltaFIFO 中。 DeltaFIFO会被消费然后用来更新本地的状态。
Indexer:它是 Informer 机制中本地最全的数据存储,其会通过 DeltaFIFO 中最新的 Delta 不停的更新自身信息,并对外提供状态信息,例如支持通过get获取。
Listerner:Informer中包含了多个由
AddEventHandler
或AddEventHandlerWithResyncPeriod
向 informer 注册新的 Listener。当 HandleDeltas 处理 DeltaFIFO 中的 Delta 时,会将这些更新事件派发给注册的 Listener。当然这里具体派发给哪些 Listener 有一定的规则,具体如下:派发给
listeners
:DeltaType 为Added
、Updated
、Deleted
、新旧资源版本号不一致的Replaced
派发给
syncingListeners
:DeltaType 为Sync
、新旧资源版本号一致的Replaced
数据同步流
有四类数据存储需要同步:API Server、DeltaFIFO、Indexer、Listener。对于这四部分,可以简单理解:API Server 侧为最权威的数据、DeltaFIFO 为本地最新的数据、Indexer 为本地最全的数据、Listener 为用户侧做逻辑用的数据。
远端通路:远端(API Server) ⇔ 本地(DeltaFIFO、Indexer、Listener)
负责将APIServer侧的obj同步到本地,主要同步类型有两类:
通过
List
行为产生的同步行为,这类 event 的 DeltaType 为Replaced
,同时只有在 Reflector 初始启动时才会产生。通过
Watch
行为产生的同步行为,对于 watch 到的Added、Modified、Deleted
类型的 event,对应的 DeltaType 为Added、Updated、Deleted
。
本地通路:本地(DeltaFIFO、Indexer、SyncingListener)之间同步
本地通路是通过 Reflector 的ListAndWatch
方法中运行一个 goroutine 来执行定期的Resync
操作。它会从 Indxer 拉一遍对应到所有 objs 的Delta 到 DeltaFIFO 中(list),其中的 Delta 为Sync
状态。之后 handleDeltas 就会同步 DeltaFIFO 中的 Sync Delta 给 syncingListeners 和 Indexer。
关键源码解析
ListAndWatch 方法
简化后的代码如下:
1 |
|
BackoffUntil包装了ListAndWatch,用来在ListAndWatch退出的时候重新启动它,只有stopChan发来停止消息的时候才会真正的停止。
其内部做了三件事:list->resync->watch,主要介绍如下。
List
1 |
|
list 操作在 ListAndWatch 中只会运行一次,简单来说,也可看作三个步骤:
派发 goroutine 去 API Server 拉取最新的 Obj 集合
等待 goroutine 结束,
listCh
接收到信号,表示 list 完成。或者stopCh
、panicCh
发来信号。其中 stopCh 表示调用者需要停止,panicCh 表示 goroutine 的 list 过程出错了整理 API Server 侧拉取到的最新 obj 集合,同时
syncWith
到 DeltaFIFO 中(最终调用 DeltaFIFO 的 Replace 方法)。
Resync
1 |
|
这部分是通过派发 goroutine 来完成的,内部通过 for 死循环来定期执行Resync
操作,resyncChan()
会定期向resyncCh
发来信号,定期的时间由 resyncPeriod 属性来设置。 整个过程直到cancelCh
或者stopCh
发来停止信号,其中 cancelCh 表示本次 ListAndWatch 结束了,stopCh 表示上层(调用者)发来停止信号。 在每次的Resync
操作操作中:
- 首先调用
ShouldResync
函数,其具体实现在 sharedProcessor 中,其会根据每一个 Listener 的同步时间来选出当前期待/需要进行 Resync 的 Listener 放入syncingListeners
中。
1 |
|
- 调用 store.Resync(),具体由 DeltaFIFO 中的 Resync 来实现,想要完成将 Indexer 中的 obj 全部刷到 DeltaFIFO 中(list)。 需要注意,在这个过程中,如果 DeltaFIFO 的 items 中已经存在该 obj,就不需要放了。因为我们的目的就是同步本地之间的 obj 信息, 既然在 items 中已经存在了该信息,并且该信息一定是本地最新的,未来也会被处理同步到本地所有存储中,因此这里就不需要再添加了。 具体处理细节看下面代码注解。
1 |
|
注意Resync也就是保证丢失的信息可以再次被处理的关键。
如果是从 Resync 重新同步到 Delta FIFO 队列的事件,会分发到 updateNotification 中触发 onUpdate 的回调
1 |
|
Watch
1 |
|
整个 watch 包在一个 for 死循环中,具体的 watch 行为通过watchHandler
函数来实现,其内部循环监听 watch 对象(由 listerWatcher.Watch 产生)的ResultChan
。 如果发来 evenet,并且没有出错,就按照四种类型进行处理: 分别为Added、Modified、Deleted、Bookmark,表示有 Obj 被:添加、修改、删除,以及版本更新。之后对于前三种类型, 分别调用 store(DeltaFIFO)的Add、Update、Delete
方法, 向 DeltaFIFO 中添加 DeltaType 为Added、Updated、Deleted
的 Delta。后续通过 Pop 函数中的 HandleDeltas 消费这些 Deltas。
HandleDeltas 方法
该方法的功能就是循环处理 item(Deltas)中的 Delta,对于每一个 Delta:按照操作类型分类,Deleted
为一类,剩余操作Sync, Replaced, Added, Updated
归为另一类:
对于
Deleted
:首先调用 indexer 的Delete方法,在本地存储中删除该 Obj。之后调用 distribute 方法,对所有的 Listener 进行deleteNotification通知删除 Obj 消息;对于
Sync, Replaced, Added, Updated
:首先查看在 indexer 中是否能够 get 到该 Obj:如果可以 get:调用 indexer 的Update方法,更新本地存储的 Obj,之后调用 distribute 方法,对所有的 Listener 进行updateNotification通知更新 Obj 消息;(注意:这部分的 distribute 针对 Sync 和部分 Replaced(见下述说明)只需要通知
syncingListeners
,而不是所有的 listeners。通过 distribute 方法最后的 bool 参数来设定,大部分情况设定为 false,说明通知所有的 listeners)如果 get 不到:调用 indexer 的Add方法,在本地存储添加该 Obj,之后调用 distribute 方法,对所有的 Listener 进行addNotification通知添加 Obj 消息;
AddEventHandler
下面这部分代码负责将各资源的informer的eventHandler注册进来,有几个注意点点:
如果s.defaultEventHandlerResyncPeriod不大于0,那么就不会尝试更新resyncPeriod
如果s没有启动,那么就不会只是单纯的注册进去,否则还会遍历所有的indexer,为每个给listener触发一个addNotification
1 |
|