client-go简介

client-go是一个调用Kubernetes集群资源对象API的客户端,即通过client-go实现对Kubernetes集群中资源对象(包括deployment、service、ingress、replicaSet、pod、namespace、node等)的增删改查操作。大部分对Kubernetes进行前置API封装的二次开发都通过client-go这个第三方包来实现。

client-go目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
.
├── discovery # 定义DsicoveryClient客户端。作用是用于发现k8s所支持GVR(Group, Version, Resources)。
├── dynamic # 定义DynamicClient客户端。可以用于访问k8s Resources(如: Pod, Deploy...),也可以访问用户自定义资源(即: CRD)。
├── informers # k8s中各种Resources的Informer机制的实现。
├── kubernetes # 定义ClientSet客户端。它只能用于访问k8s Resources。每一种资源(如: Pod等)都可以看成是一个客端,而ClientSet是多个客户端的集合,它对RestClient进行了封装,引入了对Resources和Version的管理。通常来说ClientSet是client-gen来自动生成的。
├── listers # 提供对Resources的获取功能。对于Get()和List()而言,listers提供给二者的数据都是从缓存中读取的。
├── pkg
├── plugin # 提供第三方插件。如:GCP, OpenStack等。
├── rest # 定义RestClient,实现了Restful的API。同时会支持Protobuf和Json格式数据。
├── scale # 定义ScalClient。用于Deploy, RS, RC等的扩/缩容。
├── tools # 定义诸如SharedInformer、Reflector、DealtFIFO和Indexer等常用工具。实现client查询和缓存机制,减少client与api-server请求次数,减少api-server的压力。
├── transport
└── util # 提供诸如WorkQueue、Certificate等常用方法。

流程

  1. 初始化rest.Config对象,用来初始化client。
  2. 按需初始化client。
  3. 进行具体的crud。
image-20231109195552271

RESTClient是所有客户端的父类,底层调用了Go语言net\http库,访问API Server的RESTful接口。

ClientSet

ClientSet是使用最多的客户端,它继承自RESTClient,使用K8s的代码生成机制(client-gen机制),在编译过程中,会根据目前K8s内置的资源信息,自动生成他们的客户端代码(前提是需要添加适当的注解),使用者可以通过builder pattern进行初始化,得到自己在意的目标资源类型的客户端。ClientSet如同它的名字一样,代表的是一组内置资源的客户端。

1
2
3
clientset, err := kubernetes.NewForConfig(config) // 根据config对象创建clientSet对象
mustSuccess(err)
podClient := clientset.CoreV1().Pods("development") // 根据Pod资源的Group、Version、Recource Name创建资源定制客户端,传入的字符串表示资源所在的ns;podClient对象具有List\Update\Delete\Patch\Get等curd接口

DynamicClient

DynamiClient动态客户端,可以根据传入的GVR(group version resource)生成一个可以操作特定资源的客户端。但是不是内存安全的客户端,返回的结果通常是非结构化的。需要额外经过一次类型转换才能变为目标资源类型的对象,这一步存在内存安全的风险。相比ClientSet,动态客户端不局限于K8s的内置资源,可以用于处理CRD(custome resource define)自定义资源,但是缺点在于安全性不高。DynamicClient使用的样例代码如下:

结构化的类型通常属于k8s runtime object的子类型;非结构化的对象通常是map[string]interface{}的形式,通过一个字典存储对象的属性;K8s所有的内置资源都可以通过代码生成机制,拥有默认的资源转换方法

1
2
3
4
5
6
7
8
9
10
11
12
13
dynamicClient, err := dynamic.NewForConfig(config)
mustSuccess(err)
gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
// 返回非结构化的对象
unstructObj, err := dynamicClient.Resource(gvr).Namespace("sandbox").List(context.TODO(), metav1.ListOptions{Limit: 40})
mustSuccess(err)
podList := &corev1.PodList{}
// 额外做一次类型转换,如果这里传错类型,就会有类型安全的风险
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), podList)
mustSuccess(err)
for _, po := range podList.Items {
fmt.Printf("NAMESPACE: %v \t NAME: %v \t STATUS: %v \n", po.Namespace, po.Name, po.Status)
}

DiscoveryClient

DiscoveryClient发现客户端,主要用于处理向服务端请求当前集群支持的资源信息,例如命令kubectl api-resources使用的就是发现客户端,由于发现客户端获取的数据量比较大,并且集群的资源信息变更并不频繁,因此发现客户端会在本地建立文件缓存,默认十分钟之内的请求,使用本地缓存,超过十分钟之后则重新请求服务端。DiscoveryClient的使用样例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
mustSuccess(err)

_, APIResourceList, err := discoveryClient.ServerGroupsAndResources()
mustSuccess(err)

for _, list := range APIResourceList {
gv, err := schema.ParseGroupVersion(list.GroupVersion)
mustSuccess(err)

for _, resource := range list.APIResources {
fmt.Printf("name: %v \t group: %v \t verison: %v \n",
resource.Name, gv.Group, gv.Version)
}
}

总结一下

客户端名称 源码目录 简单描述
RESTClient client-go/rest/ 基础客户端,对HTTP Request封装
ClientSet client-go/kubernetes/ 在RESTClient基础上封装了对Resource和Version,也就是说我们使用ClientSet的话是必须要知道Resource和Version, 例如AppsV1().Deployments或者CoreV1.Pods,缺点是不能访问CRD自定义资源
DynamicClient client-go/dynamic/ 包含一组动态的客户端,可以对任意的K8S API对象执行通用操作,包括CRD自定义资源
DiscoveryClient client-go/discovery/ ClientSet必须要知道Resource和Version, 但使用者通常很难记住所有的GVR信息,这个DiscoveryClient是提供一个发现客户端,发现API Server支持的资源组,资源版本和资源信息

client-go内部原理

![image-20231110113355125](/Users/goiruri/Library/Application Support/typora-user-images/image-20231110113355125.png)

![image-20231110113147395](/Users/goiruri/Library/Application Support/typora-user-images/image-20231110113147395.png)

各组件介绍

  1. client-go 组件
    • Reflector: 定义在 /tools/cache 包内的 Reflector 类型 中的 reflector 监视 Kubernetes API 以获取指定的资源类型 (Kind)。完成此操作的函数是 ListAndWatch。监视可以用于内建资源,也可以用于自定义资源。当 reflector 通过监视 API 的收到关于新资源实例存在的通知时,它使用相应的 listing API 获取新创建的对象,并将其放入 watchHandler 函数内的 Delta Fifo 队列中。
    • Informer: 在 /tools/cache 包内的基础 controller 中定义的一个 informer 从 Delta FIFO 队列中弹出对象。完成此操作的函数是 processLoop。这个基础 controller 的任务是保存对象以供以后检索,并调用 controller 将对象传递给它。
    • Indexer: indexer 为对象提供索引功能。它定义在 /tools/cache 包内的 Indexer 类型。一个典型的索引用例是基于对象标签创建索引。Indexer 可以基于多个索引函数维护索引。Indexer 使用线程安全的数据存储来存储对象及其键值。在 /tools/cache 包内的 Store 类型 定义了一个名为MetaNamespaceKeyFunc的默认函数,该函数为该对象生成一个名为 <namespace>/<name> 组合的对象键值。
  2. Custom Controller 组件
    • Informer reference: 这是一个知道如何使用自定义资源对象的 Informer 实例的引用。您的自定义控制器代码需要创建适当的 Informer。
    • Indexer reference: 这是一个知道如何使用自定义资源对象的 Indexer 实例的引用。您的自定义控制器代码需要创建这个。您将使用此引用检索对象,以便稍后处理。
    • Resource Event Handlers: 当 Informer 想要分发一个对象给你的控制器时,会调用这些回调函数。编写这些函数的典型模式是获取已分配对象的键值,并将该键值放入一个工作队列中进行进一步处理。
    • Work queue: 这是在控制器代码中创建的队列,用于将对象的分发与处理解耦。编写 Resource Event Handler 函数来提取所分发对象的键值并将其添加到工作队列中。
    • Process Item: 这是在代码中创建的处理 work queue 中的 items 的函数。可以有一个或多个其他函数来执行实际的处理。这些函数通常使用 Indexer 引用 或 Listing wrapper 来获取与键值对应的对象。

流程

Indexer

Indexer主要依赖于ThreadSafeStore实现,是client-go提供的一种缓存机制,通过检索本地缓存可以有效降低apiserver的压力

![image-20231110113653209](/Users/goiruri/Library/Application Support/typora-user-images/image-20231110113653209.png)

Indexer接口

Indexer接口主要是在Store接口的基础上拓展了对象的检索功能:

1
2
3
4
5
6
7
8
9
type Indexer interface {
Store
Index(indexName string, obj interface{}) ([]interface{}, error) // 根据索引名和给定的对象返回符合条件的所有对象
IndexKeys(indexName, indexedValue string) ([]string, error) // 根据索引名和索引值返回符合条件的所有对象的 key
ListIndexFuncValues(indexName string) []string // 列出索引函数计算出来的所有索引值
ByIndex(indexName, indexedValue string) ([]interface{}, error) // 根据索引名和索引值返回符合条件的所有对象
GetIndexers() Indexers // 获取所有的 Indexers,对应 map[string]IndexFunc 类型
AddIndexers(newIndexers Indexers) error // 这个方法要在数据加入存储前调用,添加更多的索引方法,默认只通过 namespace 检索
}

Indexer的默认实现是cache:

1
2
3
4
type cache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}

cache对应两个方法体实现完全一样的New函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}

ThreadSafeStore

ThreadSafeStore是Indexer的核心逻辑所在,Indexer的多数方法是直接调用内部cacheStorage属性的方法实现的,同样先看接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error // 过期了,没有具体代码逻辑
}

对应实现:

1
2
3
4
5
6
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
indexers Indexers
indices Indices
}

这里的Indexers和Indices是:

1
2
3
type Index map[string]sets.String
type Indexers map[string]IndexFunc
type Indices map[string]Index

Indexers 里存的是 Index 函数 map,一个典型的实现是字符串 namespace 作为 key,IndexFunc 类型的实现 MetaNamespaceIndexFunc 函数作为 value,也就是我们希望通过 namespace 来检索时,通过 Indexers 可以拿到对应的计算 Index 的函数,接着拿着这个函数,把对象穿进去,就可以计算出这个对象对应的 key,在这里也就是具体的 namespace 值,比如 default、kube-system 这种。然后在 Indices 里存的也是一个 map,key 是上面计算出来的 default 这种 namespace 值,value 是一个 set,而 set 表示的是这个 default namespace 下的一些具体 pod 的 <namespace>/<name> 这类字符串。最后拿着这种 key,就可以在 items 里检索到对应的对象了。

![image-20231110114758819](/Users/goiruri/Library/Application Support/typora-user-images/image-20231110114758819.png)

总结一下

Indexer是Informer实现本地缓存的关键模块。作为Indexer的主要实现,cache是一个存储在内存中的缓存器,初始化时,会指定keyFunc,通常会根据对象的资源名与对象名组合成一个唯一的字符串作为对象键。此外,cache将缓存的维护工作委托给threadSafeMap来完成,threadSafeMap内部实现了一套类似MySql覆盖索引、二级索引的存储机制,用户可以自行添加具有特定索引生成方法的二级索引,方便自己的数据存取。

DeltaFIFO

DeltaFIFO其实是两个词:Delta+FIFO,Delta表示变化,FIFO则是先入先出的队列。

![image-20231110143712545](/Users/goiruri/Library/Application Support/typora-user-images/image-20231110143712545.png)

DeltaFIFO将接收来的资源event转化为特定的变化类型,存储在队列中,周期性的POP出去,分发到事件处理器,并更新Indexer中的本地缓存。

Client-go定义了以下几种变化类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 文件路径: k8s.io/client-go/tools/cache/delta_fifo.go
// DeltaType 其实是字符串类型的别名,代表一种变化
type DeltaType string

// Change type definition
const (
Added DeltaType = &quot;Added&quot; // 增
Updated DeltaType = &quot;Updated&quot; // 更新
Deleted DeltaType = &quot;Deleted&quot; // 删除
Replaced DeltaType = &quot;Replaced&quot; // 替换,list出错时,会触发relist,此时会替换
Sync DeltaType = &quot;Sync&quot; // 周期性的同步,底层会当作一个update类型处理
)
// Delta由一个对象+类型组成
type Delta struct {
Type DeltaType
Object interface{}
}

// Deltas是一组Delta
type Deltas []Delta

然后看一下Delta_FIFO的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 文件路径: k8s.io/client-go/tools/cache/delta_fifo.go

type DeltaFIFO struct {
// 读写锁与条件变量
lock sync.RWMutex
cond sync.Cond

// items是一个字典,存储了对象键与Delats的映射关系
// queue是一个FIFO队列,存储了先后进入队列的对象的对象键,queue里面的对象和items里的对象键是一一对应的
// items里的对象,至少有一个Delta
items map[string]Deltas
queue []string

// 通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
populated bool
// 通过Replace()接口将第一批对象放入队列的对象数量
initialPopulationCount int

// 用于计算对象键的方法
keyFunc KeyFunc

// 其实就是Indexer
knownObjects KeyListerGetter

// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
}

可以用一张图简单描述下Delta_FIFO里面items和queue的关系:

![image-20231110144144459](/Users/goiruri/Library/Application Support/typora-user-images/image-20231110144144459.png)

采用这样的结构把对象与事件的存储分离,好处就是不会因为某个对象的事件太多,而导致其他对象的事件一直得不到消费。

Delta_FIFO的核心操作有两个:往队列里面添加元素、从队列中POP元素,可以看下这两个方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// 文件路径: k8s.io/client-go/tools/cache/delta_fifo.go

// queueActionLocked 用于向队列中添加delta,调用前必须加写锁
// 传入delta类型、资源对象两个参数
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 获取资源对象的对象键
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}

// 向items中添加delta,并对操作进行去重,目前来看,只有连续两次操作都是删除操作的情况下,才可以合并,其他操作不会合并
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)

if len(newDeltas) &gt; 0 {
// 向queue和items中添加元素
// 添加以后,条件变量发出消息,通知可能正在阻塞的POP方法有事件进队列了
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// 冗余判断,其实是不会走到这个分支的,去重后的delta list长度怎么也不可能小于1
delete(f.items, id)
}
return nil
}

// Pop方法
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 如果队列是空的,利用条件变量阻塞住,直到有新的delta
// 如果Close()被调用,则退出
// 否则一直循环处理
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}

f.cond.Wait()
}
// 取队列第一个的所有deltas
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount &gt; 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item)
// 如果处理失败了,调用addIfNotPresent,addIfNotPresent意为:如果queue中没有则添加
// 本身刚刚从queue和items中取出对象,应该不会存在重复的对象,这里调用addIfNotPresent应该只是为了保险起见
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}

return item, err
}
}

Reflector

K8s的设计是事件驱动的、充分微服务化的,我们可以从事件传递的角度重新理解一下K8s:

组件之间互相看作是事件的生产者、消费者,API Server看作是一个只用内存存储事件的Broker,我们可以从消息队列的角度取理解一下,如下图展示的:

![image-20231110145802473](/Users/goiruri/Library/Application Support/typora-user-images/image-20231110145802473.png)

k8s服务端通过读取etcd的资源变更信息,向所有客户端发布资源变更事件。k8s中,组件之间通过HTTP协议进行通信,在不额外引入其他中间件的情况下,保证消息传递的实时性、可靠性、顺序性不是一个容易的事情。K8s内部所有的组件都是通过Informer机制实现与API Server的通信的。Informer直译就是消息通知者的意思。

通常一个Informer只会关注一种特定的资源,Reflector负责从API Server拉取&同步该资源类型下所有对象的event。例如,如果当前informer关注Pod资源,那么Reflector会首先list集群中所有的Pod的信息,同步本地的ResourceVersion,之后基于当前的ResourceVerison,使用一个Http长连接Watch集群中Pod资源的事件,并传递到Delta_FIFO模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 文件路径: k8s.io/client-go/tools/cache/reflector.go

// Reflector监控某一种资源的变化,并将这些变化传递到存储中
type Reflector struct {
name string // 名字,默认会被命名为 文件:行号
expectedTypeName string // 被监控的资源的类型名
expectedType reflect.Type // 监控的对象类型
expectedGVK *schema.GroupVersionKind // The GVK of the object we expect to place in the store if unstructured.
store Store // 存储,就是Delta_FIFO,这里的Store类型实际是Delta_FIFO的父类
listerWatcher ListerWatcher // 用来进行list&amp;watch的接口对象,大概知道做什么的就行了,底层是通过http长连接实现的资源监听

resyncPeriod time.Duration // 重新同步的周期
ShouldResync func() bool // 周期性的判断是否需要重新同步
clock clock.Clock // 时钟对象,主要是为了给测试留后门,方便修改时间
...
}

ResourceVersion是ETCD生成的全局唯一且递增的序号,通过此序号,客户端可以知道目前与服务端信息同步的状态,每次只取大于等于本地ResourceVersion的事件,好处是可以实现事件的全局唯一,实现“断点续传”功能,不用担心本地客户端偶尔出现的网络异常。

可以关注到Reflector三个比较关键的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// 文件路径: k8s.io/client-go/tools/cache/reflector.go 

// 反射器的入口方法,wait.Backoffutil会周期性的执行传入的匿名函数,直到接收到stopCh传来的终止信号
func (r *Reflector) Run(stopCh &lt;-chan struct{}) {
klog.V(2).Infof(&quot;Starting reflector %s (%s) from %s&quot;, r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof(&quot;Stopping reflector %s (%s) from %s&quot;, r.expectedTypeName, r.resyncPeriod, r.name)
}
// ListAndWatch方法比较长,这里截取部分展示,主要有三部分
// 1. list操作,更新本地的资源版本号,最先执行且只执行一次
// 2. 启动后台的周期性sync协程
// 3. 死循环执行watchHandler操作
func (r *Reflector) ListAndWatch(stopCh &lt;-chan struct{}) error {
// 全量list的逻辑,只执行一次
// 这一步里面会将list返回的结果实例化为对象数组,也就是反射的过程,这也是reflector名字的由来,但是reflector目前做的不仅仅是反射
if err := func() error {
...
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
initTrace.Step(&quot;Objects listed&quot;)
listMetaInterface, err := meta.ListAccessor(list)
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step(&quot;Resource version updated&quot;)
return nil
}(); err != nil {
return err
}
// 后台定期sync协程,会一直周期性执行
go func() {
resyncCh, cleanup := r.resyncChan()
for {
...
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()

// watch操作
for {
...
if err := r.watchHandler(start, w, &amp;resourceVersion, resyncerrc, stopCh); err != nil {
...
return nil
}
...
}
}
// 我们继续看看watchHandler里面做了什么
// 去掉了注释、日志、错误判断,只看核心逻辑,可以直观的看到,最后的处理逻辑落在了Delta_FIFO上
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh &lt;-chan struct{}) error {
eventCount := 0
defer w.Stop()
loop:
for {
select {
case &lt;-stopCh:
return errorStopRequested
case err := &lt;-errc:
return err
case event, ok := &lt;-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}

meta, err := meta.Accessor(event.Object)
newResourceVersion := meta.GetResourceVersion()
// 操作Delta_FIFO
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
case watch.Modified:
err := r.store.Update(event.Object)
case watch.Deleted:
err := r.store.Delete(event.Object)
case watch.Bookmark:
default:
utilruntime.HandleError(fmt.Errorf(&quot;%s: unable to understand watch event %#v&quot;, r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
...
}

总结一下

  1. Reflector利用apiserver的client列举全量对象(版本为0以后的对象全部列举出来)
  2. 将全量对象同步到DeltaFIFO中,并且更新资源的版本号,后续watch会依赖此版本号;
  3. 在后台启动一个定时resync的协程,把全量对象以Update事件的方式通知出去(如果没有设置同步周期,这一步可以不执行);
  4. 基于当前资源版本号watch资源;
  5. 一旦有对象发生变化,那么就会根据变化的类型(新增、更新、删除)调用DeltaFIFO的相应接口,同时更新当前资源的版本号

Controller

流程解析

reflector会持续监听k8s集群中指定资源类型的API,当发现变动和更新时,就会创建一个发生变动的对象的副本,并将其添加到队列DeltaFIFO中。

Indexer保存了来自apiServer的资源。使用listWatch方式来维护资源的增量变化。通过这种方式可以减小对apiServer的访问,减轻apiServer端的压力。

Indexer的接口如下,它继承了Store接口,Store中定义了对对象的增删改查等方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// client-go/tools/cache/index.go
type Indexer interface {
Store
// Retrieve list of objects that match on the named indexing function
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the set of keys that match on the named indexing function.
IndexKeys(indexName, indexKey string) ([]string, error)
// ListIndexFuncValues returns the list of generated values of an Index func
ListIndexFuncValues(indexName string) []string
// ByIndex lists object that match on the named indexing function with the exact key
ByIndex(indexName, indexKey string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers

// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// client-go/tools/cache/store.go
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)

// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
Resync() error
}

cache实现了Indexer接口,但cache是包内私有的(首字母小写),只能通过包内封装的函数进行调用。

1
2
3
4
5
6
7
8
// client-go/tools/cache/store.go
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// client-go/tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers

// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
Resync() error
}

可以通过NewStore和NewIndexer初始化cache来返回一个Store或Indexer指针(cache实现了Store和Indexer接口)。NewStore和NewIndexer返回的Store和Indexer接口的数据载体为threadSafeMap,threadSafeMap通过NewThreadSafeStore函数初始化。

1
2
3
4
5
6
7
8
9
// client-go/tools/cache/store.go
func (c *cache) Add(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Add(key, obj)
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// client-go/tools/cache/store.go
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}

client-go中的很多实现封装都非常规范,index.go中给出了索引相关的操作(接口);store.go中给出了与操作存储相关的接口,并提供了一个cache实现,当然也可以实现自行实现Store接口;thread_safe_store.go为cache的私有实现。

client-go的indexer实际操作的还是threadSafeMap中的方法和数据,调用关系如下:

![image-20231109145326413](/Users/goiruri/Library/Application Support/typora-user-images/image-20231109145326413.png)

可以通过下图理解threadSafeMap中各种索引之间的关系

![image-20231109150211829](/Users/goiruri/Library/Application Support/typora-user-images/image-20231109150211829.png)

  • indexer实际的对象存储在threadSafeMap结构中
  • indexers划分了不同的索引类型(indexName,如namespace),并按照索引类型进行索引(indexFunc,如MetaNamespaceIndexFunc),得出符合该对象的索引键(indexKeys,如namespaces),一个对象在一个索引类型中可能有多个索引键。
  • indices按照索引类型保存了索引(index,如包含所有namespaces下面的obj),进而可以按照索引键找出特定的对象键(keys,如某个namespace下面的对象键),indices用于快速查找对象
  • items按照对象键保存了实际的对象

以namespace作为索引类型为例来讲,首先从indexers获取计算namespace的indexFunc,然后使用该indexFunc计算出与入参对象相关的所有namespaces。indices中保存了所有namespaces下面的对象键,可以获取特定namespace下面的所有对象键,在items中输入特定的对象键就可以得出特定的对象。indexers用于找出与特定对象相关的资源,如找出某Pod相关的secrets。

默认的indexFunc如下,根据对象的namespace进行分类

1
2
3
4
5
6
7
8
// client-go/tools/cache/index.go
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}

cache结构中的keyFunc用于生成objectKey,下面是默认的keyFunc。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//client-go/tools/cache/thread_safe_store.go
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}