引言
在 Kubernetes 中,如果你想获取资源对象的信息,有两种方式:
直接查询 APIServer:clientset.CoreV1().Pods("default").List(...)
- 数据来源:请求 apiserver
- 缺点:每次都会拉取全部数据,延迟非常高
使用本地缓存:lister.Pods(ns).Get(name)
- 数据来源:本地缓存
- 优点:速度非常快,不消耗 APIServer 资源
Informer 的设计就是为了解决这个问题。
Informer 的核心设计理念很简单:
- List:第一次启动时,拉取全量数据
- Watch:建立长连接,APIServer 主动推送变更事件(Add/Update/Delete)
- Cache:Informer 会在本地维护一份缓存(Store),以后查询直接从本地内存读取,耗时为 O(1),且不消耗 APIServer 资源
Kubernetes 集群中 99% 的操作是读(Controller 需要不断查询当前状态),只有 1% 是写。如果所有读都走网络,集群性能会受到严重影响。Informer 通过本地缓存解决了这个问题。
1. Reflector(反射器)
Reflector 是”搬运工”,它负责:
- 跟 APIServer 建立连接
- 执行 ListAndWatch 操作
- 接收远端数据变化事件
Reflector 的工作流程非常简单——死循环调用 ListAndWatch():
- 先发一个 HTTP GET 请求,把所有现有数据拉下来(List)
- 紧接着发一个 Watch 请求(本质是 HTTP Chunked 长连接)
当 Reflector 从网络连接里读到一个 Event(比如 Added),它会直接调用:
1
| r.store.Add(event.Object)
|
这里的 r.store 就是 DeltaFIFO。
逻辑闭环:
1
| APIServer (推送事件) -> Reflector (收到) -> DeltaFIFO.Add() -> 进入队列
|
2. DeltaFIFO(增量队列)
DeltaFIFO 是”缓冲区”,它的作用是防止处理逻辑太慢导致事件丢失。
数据结构定义
来自 k8s 源码 staging/src/k8s.io/client-go/tools/cache/delta_fifo.go:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| type DeltaFIFO struct { lock sync.RWMutex items map[string]Deltas queue []string }
type Delta struct { Type DeltaType Object interface{} }
type Deltas []Delta
|
事件压缩机制
DeltaFIFO 实际上做了一个压缩操作,把同一个 key 的多次事件都存在一个 list 里,等到处理的时候一次性处理完。
示例:
首次事件:
1 2
| Queue: ["Pod1"] <-- 新增 Key Map: {"Pod1": [Created]} <-- 创建 List
|
再次更新:
1 2
| Queue: ["Pod1"] <-- Key 已存在,不动!(这就是压缩) Map: {"Pod1": [Created, Updated]} <-- 追加到 List
|
第三次有新 Pod:
1 2 3 4 5
| Queue: ["Pod1", "Pod2"] <-- 新增 Key,排在后面 Map: { "Pod1": [Created, Updated], "Pod2": [Created] }
|
第四次 Pod1 被删除:
1 2 3 4 5
| Queue: ["Pod1", "Pod2"] <-- 依然只有两个元素! Map: { "Pod1": [Created, Updated, Deleted], <-- 包含了 3 个事件 "Pod2": [Created] }
|
这样可以显著减少重复处理同一个对象的次数,提升效率。因为我们遍历的永远是 Queue,所以这种处理方式可以显著减少 queue 的长度。
3. Indexer(本地缓存/Store)
Indexer 是”账本”,这是 Informer 极其重要的原因之一。
Informer 会把从 APIServer 拿到的最新对象,在本地内存里存一份(通常是一个巨大的 Map)。这样做的目的是实现 O(1) 的本地读取性能。
ProcessLoop 和 Pop 过程
来自 k8s 源码 staging/src/k8s.io/client-go/tools/cache/controller.go:
1 2 3 4 5 6 7 8 9 10 11 12
| func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(c.config.Process) if err != nil { } } }
|
Pop 函数的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { } id := f.queue[0] f.queue = f.queue[1:] item, ok := f.items[id] delete(f.items, id) err := process(item) } }
|
Controller 的实现
让我们通过一个实际的例子来看如何使用 Informer 构建 Controller。
实验步骤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| go run .
kubectl create service clusterip my-test-app --tcp=80:80
kubectl annotate service my-test-app k8s-learner/strategy=blue kubectl get svc my-test-app -o yaml
kubectl annotate service my-test-app k8s-learner/strategy=panic --overwrite
kubectl annotate service my-test-app k8s-learner/strategy=green --overwrite
go run .
|
为什么能自动恢复?
这不是什么 APIServer 重新推送的魔法。Informer 就是客户端本地起的,只要 panic 了,里面的全部东西都消失了。
我们的 controller 重新启动后,会去同步一遍全部的 list,然后对每一个都触发一个 OnAdd。此时我们的处理函数发现有个 pod 的 annotation 是 green,但是 selector 还是 blue,就会更新它为 green。
这就是一种水平触发的实现方法。
Annotation 是什么?
Annotation 是 Kubernetes 资源对象上的一组键值对,主要用于存储非标识性元数据。
作用:它不影响 K8s 的核心调度(K8s 核心逻辑看不见它),是专门给工具或第三方程序(比如我们要写的 Controller)看的。
如何给一个组件添加 annotation:
1 2 3
| kubectl annotate service my-test-app k8s-learner/strategy=blue kubectl annotate service my-test-app k8s-learner/strategy=panic --overwrite kubectl annotate service my-test-app k8s-learner/strategy=green --overwrite
|
Controller 的线程模型
典型的生产者 - 消费者模型:
1 2 3 4 5
| [Informer (后台线程)] ↓ 监听到变化 Call c.enqueueService() ↓ c.workqueue.Add("default/my-service") <-- 东西放进传送带
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| [Controller (主线程)] c.Run() ↓ 启动 goroutine: wait.Until(c.runWorker) ↓ c.runWorker() (死循环) ↓ for c.processNextItem() { ↓ c.workqueue.Get() <-- 阻塞等待... 拿到 "default/my-service"! ↓ c.syncHandler("default/my-service") <-- 终于调用业务逻辑了! }
|
WorkQueue 的作用
WorkQueue 是 client-go 库里提供的一个纯粹的数据结构(就像 Java 的 BlockingQueue 或 Go 的 Channel,但功能更强)。
WorkQueue 与 DeltaFIFO 的关系:
- DeltaFIFO 负责监听 K8s 资源变化,并把变化的对象传递给你
- WorkQueue 则是你自己创建的队列,用来存放你想处理的 Key
具体流程:
- Informer 中的 Reflector 监听到变化后放入到 DeltaFIFO 中
- Informer 中的 processloop(死循环不断 pop)不断从里面取对象
- 取出来的对象会根据事件类型触发 AddFunc
- ProcessLoop 会做两个工作:
- 更新本地缓存也就是 indexer
- 分发事件,也就是调用我们自己的 AddEventHandler
- 在 AddEventHandler 中,我们不直接处理业务,而是把内容放到我们自己的 WorkQueue 中
- 这样做的原因是避免阻塞 Informer,影响 Informer 的效率
为什么需要 WorkQueue?
- 避免阻塞:OnAdd 这些回调方法调用是同步的,如果在回调中直接处理业务逻辑,可能会阻塞 Informer 的处理,影响性能
- 去重功能:WorkQueue 提供了去重功能,避免重复处理同一个 Key
- 重试机制:处理失败的 Key 可以重新入队,参考 handleErr 方法。如果直接在 OnAdd 中处理,失败了就丢掉了
- 并发控制:WorkQueue 可以控制并发处理的数量,避免同时处理过多任务导致资源耗尽
Controller 核心代码解析
Controller 结构体
1 2 3 4 5 6 7 8 9 10 11 12
| type Controller struct { kubeclientset kubernetes.Interface servicesLister corelisters.ServiceLister servicesSynced cache.InformerSynced workqueue workqueue.RateLimitingInterface }
|
初始化函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| func NewController(kubeclientset kubernetes.Interface, informerFactory informers.SharedInformerFactory) *Controller { serviceInformer := informerFactory.Core().V1().Services() c := &Controller{ kubeclientset: kubeclientset, servicesLister: serviceInformer.Lister(), servicesSynced: serviceInformer.Informer().HasSynced, workqueue: workqueue.NewNamedRateLimitingQueue( workqueue.DefaultControllerRateLimiter(), "Services", ), } fmt.Println("Setting up event handlers...") serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueueService, UpdateFunc: func(old, new interface{}) { oldSvc := old.(*corev1.Service) newSvc := new.(*corev1.Service) if oldSvc.ResourceVersion == newSvc.ResourceVersion { return } c.enqueueService(new) }, }) return c }
|
EnqueueService 辅助函数
1 2 3 4 5 6 7 8 9
| func (c *Controller) enqueueService(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { runtime.HandleError(err) return } c.workqueue.Add(key) }
|
Run 启动入口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func (c *Controller) Run(stopCh <-chan struct{}) error { defer runtime.HandleCrash() defer c.workqueue.ShutDown() fmt.Println("Starting Service controller") if !cache.WaitForCacheSync(stopCh, c.servicesSynced) { return fmt.Errorf("failed to wait for caches to sync") } fmt.Println("Cache synced. Starting workers...") go wait.Until(c.runWorker, time.Second, stopCh) fmt.Println("Worker started") <-stopCh return nil }
|
ProcessNextItem 处理队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (c *Controller) processNextItem() bool { key, quit := c.workqueue.Get() if quit { return false } defer c.workqueue.Done(key) err := c.syncHandler(key.(string)) c.handleErr(err, key) return true }
|
SyncHandler 业务逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
func (c *Controller) syncHandler(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } service, err := c.servicesLister.Services(namespace).Get(name) if err != nil { if errors.IsNotFound(err) { return nil } return err } strategy, ok := service.Annotations[StrategyAnnotation] if !ok { return nil } if strategy == "panic" { fmt.Printf("💣 BOOM! Triggered panic for %s\n", key) panic("Simulated Controller Crash!") } fmt.Printf("⚡ Syncing Service: %s, Strategy: %s\n", name, strategy) currentSelector := service.Spec.Selector["app"] if currentSelector == strategy { return nil } serviceCopy := service.DeepCopy() if serviceCopy.Spec.Selector == nil { serviceCopy.Spec.Selector = make(map[string]string) } serviceCopy.Spec.Selector["app"] = strategy _, err = c.kubeclientset.CoreV1().Services(namespace).Update( context.TODO(), serviceCopy, metav1.UpdateOptions{}, ) if err != nil { return err } fmt.Printf("✅ Successfully updated Service %s selector to %s\n", name, strategy) return nil }
|
HandleErr 重试逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| func (c *Controller) handleErr(err error, key interface{}) { if err == nil { c.workqueue.Forget(key) return } if c.workqueue.NumRequeues(key) < 5 { fmt.Printf("Error syncing '%v': %v. Retrying...\n", key, err) c.workqueue.AddRateLimited(key) return } c.workqueue.Forget(key) runtime.HandleError(err) fmt.Printf("Dropping service '%q' out of the queue: %v\n", key, err) }
|
补充概念
SharedInformerFactory 可以复用连接的 Informer。如果你有多个 Controller,可以用同一个连接,APIServer 和 Informer 的连接只有一个。当 pod 发生变化时,Informer 负责分发事件给各个 Controller。
Resync(定期同步)机制
上述代码直接 panic 了,重启之后可以恢复状态,因为 informer 启动的时候会拉取全部的内容,然后触发全部 add 事件,这样就能恢复状态。
但是如果我们没有 panic 呢?如果 controller 只是出现了 bug,这样就不能通过重启时的全部拉取来恢复状态了。
我们通过 Resync 机制来解决问题:时间一到,强制将 indexer 里面的所有对象再次触发一遍 update 事件。这是一个兜底机制,保证 controller 状态和缓存最终一致。
如何避免逻辑复杂?通过在 update 事件里面判断一下 resourceVersion,如果相等说明是 Resync 触发的假更新。
线程安全问题
Indexer 是线程安全的,WorkQueue 也是线程安全的,但是我们的 OnAdd 和 OnUpdate 不是。我们的 informer 承诺了串行调用回调,所以回调 OnAdd(A) 和 OnAdd(B) 不会有并发问题。
但是 Controller 的 syncHandler 和 Informer 的 Reflector 是并发的。如果我们在 OnAdd 里面修改一个 map,而在 syncHandler 去读取整个 map 的值,一定会 panic,因为 map 不是线程安全的。
另外回调函数会阻塞 informer,所以回调函数一定要快速返回:
- 不能做 IO 操作
- 不能做阻塞操作
- 不能做长时间计算
否则会影响 informer 的效率。
总结
Informer 机制是 Kubernetes client-go 的核心设计,它通过:
- Reflector:负责与 APIServer 通信,执行 ListAndWatch
- DeltaFIFO:作为事件缓冲区和压缩器,合并同一对象的多次变更
- Indexer:提供 O(1) 性能的本地缓存
- WorkQueue:解耦事件处理和业务逻辑,提供重试和并发控制
- Controller:消费队列,执行业务调谐逻辑
这套组合拳使得 Kubernetes Controller 能够高效、可靠地监控和管理集群资源。