client-go简介 client-go是一个调用Kubernetes集群资源对象API的客户端,即通过client-go实现对Kubernetes集群中资源对象(包括deployment、service、ingress、replicaSet、pod、namespace、node等)的增删改查操作。大部分对Kubernetes进行前置API封装的二次开发都通过client-go这个第三方包来实现。
client-go目录结构 1 2 3 4 5 6 7 8 9 10 11 12 13 . ├── discovery # 定义DsicoveryClient客户端。作用是用于发现k8s所支持GVR(Group, Version, Resources)。 ├── dynamic # 定义DynamicClient客户端。可以用于访问k8s Resources(如: Pod, Deploy...),也可以访问用户自定义资源(即: CRD)。 ├── informers # k8s中各种Resources的Informer机制的实现。 ├── kubernetes # 定义ClientSet客户端。它只能用于访问k8s Resources。每一种资源(如: Pod等)都可以看成是一个客端,而ClientSet是多个客户端的集合,它对RestClient进行了封装,引入了对Resources和Version的管理。通常来说ClientSet是client-gen来自动生成的。 ├── listers # 提供对Resources的获取功能。对于Get()和List()而言,listers提供给二者的数据都是从缓存中读取的。 ├── pkg ├── plugin # 提供第三方插件。如:GCP, OpenStack等。 ├── rest # 定义RestClient,实现了Restful的API。同时会支持Protobuf和Json格式数据。 ├── scale # 定义ScalClient。用于Deploy, RS, RC等的扩/缩容。 ├── tools # 定义诸如SharedInformer、Reflector、DealtFIFO和Indexer等常用工具。实现client查询和缓存机制,减少client与api-server请求次数,减少api-server的压力。 ├── transport └── util # 提供诸如WorkQueue、Certificate等常用方法。
流程
初始化rest.Config对象,用来初始化client。
按需初始化client。
进行具体的crud。
RESTClient是所有客户端的父类,底层调用了Go语言net\http
库,访问API Server的RESTful接口。
ClientSet ClientSet是使用最多的客户端,它继承自RESTClient,使用K8s的代码生成机制(client-gen机制),在编译过程中,会根据目前K8s内置的资源信息,自动生成他们的客户端代码(前提是需要添加适当的注解),使用者可以通过builder pattern进行初始化,得到自己在意的目标资源类型的客户端。ClientSet如同它的名字一样,代表的是一组内置资源的客户端。
1 2 3 clientset, err := kubernetes.NewForConfig(config) mustSuccess(err) podClient := clientset.CoreV1().Pods("development")
DynamicClient DynamiClient
动态客户端,可以根据传入的GVR(group version resource)
生成一个可以操作特定资源的客户端。但是不是内存安全的客户端,返回的结果通常是非结构化的。需要额外经过一次类型转换才能变为目标资源类型的对象,这一步存在内存安全的风险。相比ClientSet
,动态客户端不局限于K8s的内置资源,可以用于处理CRD(custome resource define)
自定义资源,但是缺点在于安全性不高。DynamicClient
使用的样例代码如下:
结构化的类型通常属于k8s runtime object的子类型;非结构化的对象通常是map[string]interface{}的形式,通过一个字典存储对象的属性;K8s所有的内置资源都可以通过代码生成机制,拥有默认的资源转换方法
1 2 3 4 5 6 7 8 9 10 11 12 13 dynamicClient, err := dynamic.NewForConfig(config) mustSuccess(err) gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"} unstructObj, err := dynamicClient.Resource(gvr).Namespace("sandbox").List(context.TODO(), metav1.ListOptions{Limit: 40 }) mustSuccess(err) podList := &corev1.PodList{} err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), podList) mustSuccess(err) for _, po := range podList.Items { fmt.Printf("NAMESPACE: %v \t NAME: %v \t STATUS: %v \n", po.Namespace, po.Name, po.Status) }
DiscoveryClient DiscoveryClient
发现客户端,主要用于处理向服务端请求当前集群支持的资源信息,例如命令kubectl api-resources
使用的就是发现客户端,由于发现客户端获取的数据量比较大,并且集群的资源信息变更并不频繁,因此发现客户端会在本地建立文件缓存,默认十分钟之内的请求,使用本地缓存,超过十分钟之后则重新请求服务端。DiscoveryClient
的使用样例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) mustSuccess(err) _, APIResourceList, err := discoveryClient.ServerGroupsAndResources() mustSuccess(err) for _, list := range APIResourceList { gv, err := schema.ParseGroupVersion(list.GroupVersion) mustSuccess(err) for _, resource := range list.APIResources { fmt.Printf("name: %v \t group: %v \t verison: %v \n", resource.Name, gv.Group, gv.Version) } }
总结一下
客户端名称
源码目录
简单描述
RESTClient
client-go/rest/
基础客户端,对HTTP Request封装
ClientSet
client-go/kubernetes/
在RESTClient基础上封装了对Resource和Version,也就是说我们使用ClientSet的话是必须要知道Resource和Version, 例如AppsV1().Deployments或者CoreV1.Pods,缺点是不能访问CRD自定义资源
DynamicClient
client-go/dynamic/
包含一组动态的客户端,可以对任意的K8S API对象执行通用操作,包括CRD自定义资源
DiscoveryClient
client-go/discovery/
ClientSet必须要知道Resource和Version, 但使用者通常很难记住所有的GVR信息,这个DiscoveryClient是提供一个发现客户端,发现API Server支持的资源组,资源版本和资源信息
client-go内部原理 

各组件介绍
client-go 组件
Reflector : 定义在 /tools/cache 包内的 Reflector 类型 中的 reflector 监视 Kubernetes API 以获取指定的资源类型 (Kind)。完成此操作的函数是 ListAndWatch。监视可以用于内建资源,也可以用于自定义资源。当 reflector 通过监视 API 的收到关于新资源实例存在的通知时,它使用相应的 listing API 获取新创建的对象,并将其放入 watchHandler 函数内的 Delta Fifo 队列中。
Informer : 在 /tools/cache 包内的基础 controller 中定义的一个 informer 从 Delta FIFO 队列中弹出对象。完成此操作的函数是 processLoop。这个基础 controller 的任务是保存对象以供以后检索,并调用 controller 将对象传递给它。
Indexer : indexer 为对象提供索引功能。它定义在 /tools/cache 包内的 Indexer 类型 。一个典型的索引用例是基于对象标签创建索引。Indexer 可以基于多个索引函数维护索引。Indexer 使用线程安全的数据存储来存储对象及其键值。在 /tools/cache 包内的 Store 类型 定义了一个名为MetaNamespaceKeyFunc
的默认函数,该函数为该对象生成一个名为 <namespace>/<name>
组合的对象键值。
Custom Controller 组件
Informer reference : 这是一个知道如何使用自定义资源对象的 Informer 实例的引用。您的自定义控制器代码需要创建适当的 Informer。
Indexer reference : 这是一个知道如何使用自定义资源对象的 Indexer 实例的引用。您的自定义控制器代码需要创建这个。您将使用此引用检索对象,以便稍后处理。
Resource Event Handlers : 当 Informer 想要分发一个对象给你的控制器时,会调用这些回调函数。编写这些函数的典型模式是获取已分配对象的键值,并将该键值放入一个工作队列中进行进一步处理。
Work queue : 这是在控制器代码中创建的队列,用于将对象的分发与处理解耦。编写 Resource Event Handler 函数来提取所分发对象的键值并将其添加到工作队列中。
Process Item : 这是在代码中创建的处理 work queue 中的 items 的函数。可以有一个或多个其他函数来执行实际的处理。这些函数通常使用 Indexer 引用 或 Listing wrapper 来获取与键值对应的对象。
流程 Indexer Indexer主要依赖于ThreadSafeStore实现,是client-go提供的一种缓存机制,通过检索本地缓存可以有效降低apiserver的压力

Indexer接口 Indexer接口主要是在Store接口的基础上拓展了对象的检索功能:
1 2 3 4 5 6 7 8 9 type Indexer interface { Store Index(indexName string , obj interface {}) ([]interface {}, error ) IndexKeys(indexName, indexedValue string ) ([]string , error ) ListIndexFuncValues(indexName string ) []string ByIndex(indexName, indexedValue string ) ([]interface {}, error ) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }
Indexer的默认实现是cache:
1 2 3 4 type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc }
cache对应两个方法体实现完全一样的New函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 func NewStore (keyFunc KeyFunc) Store { return &cache{ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), keyFunc: keyFunc, } } func NewIndexer (keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } }
ThreadSafeStore ThreadSafeStore是Indexer的核心逻辑所在,Indexer的多数方法是直接调用内部cacheStorage属性的方法实现的,同样先看接口定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type ThreadSafeStore interface { Add(key string , obj interface {}) Update(key string , obj interface {}) Delete(key string ) Get(key string ) (item interface {}, exists bool ) List() []interface {} ListKeys() []string Replace(map [string ]interface {}, string ) Index(indexName string , obj interface {}) ([]interface {}, error ) IndexKeys(indexName, indexKey string ) ([]string , error ) ListIndexFuncValues(name string ) []string ByIndex(indexName, indexKey string ) ([]interface {}, error ) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error Resync() error }
对应实现:
1 2 3 4 5 6 type threadSafeMap struct { lock sync.RWMutex items map [string ]interface {} indexers Indexers indices Indices }
这里的Indexers和Indices是:
1 2 3 type Index map [string ]sets.Stringtype Indexers map [string ]IndexFunctype Indices map [string ]Index
Indexers 里存的是 Index 函数 map,一个典型的实现是字符串 namespace 作为 key,IndexFunc 类型的实现 MetaNamespaceIndexFunc
函数作为 value,也就是我们希望通过 namespace 来检索时,通过 Indexers 可以拿到对应的计算 Index 的函数,接着拿着这个函数,把对象穿进去,就可以计算出这个对象对应的 key,在这里也就是具体的 namespace 值,比如 default、kube-system 这种。然后在 Indices 里存的也是一个 map,key 是上面计算出来的 default 这种 namespace 值,value 是一个 set,而 set 表示的是这个 default namespace 下的一些具体 pod 的 <namespace>/<name>
这类字符串。最后拿着这种 key,就可以在 items 里检索到对应的对象了。

总结一下 Indexer是Informer实现本地缓存的关键模块。作为Indexer的主要实现,cache
是一个存储在内存中的缓存器,初始化时,会指定keyFunc
,通常会根据对象的资源名与对象名组合成一个唯一的字符串作为对象键。此外,cache
将缓存的维护工作委托给threadSafeMap
来完成,threadSafeMap
内部实现了一套类似MySql覆盖索引、二级索引的存储机制,用户可以自行添加具有特定索引生成方法的二级索引,方便自己的数据存取。
DeltaFIFO DeltaFIFO其实是两个词:Delta+FIFO,Delta表示变化,FIFO则是先入先出的队列。

DeltaFIFO将接收来的资源event转化为特定的变化类型,存储在队列中,周期性的POP出去,分发到事件处理器,并更新Indexer中的本地缓存。
Client-go定义了以下几种变化类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type DeltaType string const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" Replaced DeltaType = "Replaced" Sync DeltaType = "Sync" ) type Delta struct { Type DeltaType Object interface {} } type Deltas []Delta
然后看一下Delta_FIFO的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 type DeltaFIFO struct { lock sync.RWMutex cond sync.Cond items map [string ]Deltas queue []string populated bool initialPopulationCount int keyFunc KeyFunc knownObjects KeyListerGetter emitDeltaTypeReplaced bool }
可以用一张图简单描述下Delta_FIFO里面items和queue的关系:

采用这样的结构把对象与事件的存储分离,好处就是不会因为某个对象的事件太多,而导致其他对象的事件一直得不到消费。
Delta_FIFO的核心操作有两个:往队列里面添加元素、从队列中POP元素,可以看下这两个方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface {}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } newDeltas := append (f.items[id], Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) if len (newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append (f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } else { delete (f.items, id) } return nil } func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface {}, error ) { f.lock.Lock() defer f.lock.Unlock() for { for len (f.queue) == 0 { if f.closed { return nil , ErrFIFOClosed } f.cond.Wait() } id := f.queue[0 ] f.queue = f.queue[1 :] if f.initialPopulationCount > 0 { f.initialPopulationCount-- } item, ok := f.items[id] if !ok { continue } delete (f.items, id) err := process(item) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } return item, err } }
Reflector K8s的设计是事件驱动的、充分微服务化的,我们可以从事件传递的角度重新理解一下K8s:
组件之间互相看作是事件的生产者、消费者,API Server看作是一个只用内存存储事件的Broker,我们可以从消息队列的角度取理解一下,如下图展示的:

k8s服务端通过读取etcd的资源变更信息,向所有客户端发布资源变更事件。k8s中,组件之间通过HTTP协议进行通信,在不额外引入其他中间件的情况下,保证消息传递的实时性、可靠性、顺序性不是一个容易的事情。K8s内部所有的组件都是通过Informer机制实现与API Server的通信的。Informer直译就是消息通知者的意思。
通常一个Informer只会关注一种特定的资源,Reflector负责从API Server拉取&同步该资源类型下所有对象的event。例如,如果当前informer关注Pod资源,那么Reflector会首先list集群中所有的Pod的信息,同步本地的ResourceVersion,之后基于当前的ResourceVerison,使用一个Http长连接Watch集群中Pod资源的事件,并传递到Delta_FIFO模块。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 type Reflector struct { name string expectedTypeName string expectedType reflect.Type expectedGVK *schema.GroupVersionKind store Store listerWatcher ListerWatcher resyncPeriod time.Duration ShouldResync func () bool clock clock.Clock ... }
ResourceVersion是ETCD生成的全局唯一且递增的序号,通过此序号,客户端可以知道目前与服务端信息同步的状态,每次只取大于等于本地ResourceVersion的事件,好处是可以实现事件的全局唯一,实现“断点续传”功能,不用担心本地客户端偶尔出现的网络异常。
可以关注到Reflector
三个比较关键的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 func (r *Reflector) Run(stopCh <-chan struct {}) { klog.V(2 ).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.BackoffUntil(func () { if err := r.ListAndWatch(stopCh); err != nil { r.watchErrorHandler(r, err) } }, r.backoffManager, true , stopCh) klog.V(2 ).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) } func (r *Reflector) ListAndWatch(stopCh <-chan struct {}) error { if err := func () error { ... r.setIsLastSyncResourceVersionUnavailable(false ) initTrace.Step("Objects listed") listMetaInterface, err := meta.ListAccessor(list) r.setLastSyncResourceVersion(resourceVersion) initTrace.Step("Resource version updated") return nil }(); err != nil { return err } go func () { resyncCh, cleanup := r.resyncChan() for { ... cleanup() resyncCh, cleanup = r.resyncChan() } }() for { ... if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { ... return nil } ... } } func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string , errc chan error , stopCh <-chan struct {}) error { eventCount := 0 defer w.Stop() loop: for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): if !ok { break loop } if event.Type == watch.Error { return apierrors.FromObject(event.Object) } meta, err := meta.Accessor(event.Object) newResourceVersion := meta.GetResourceVersion() switch event.Type { case watch.Added: err := r.store.Add(event.Object) case watch.Modified: err := r.store.Update(event.Object) case watch.Deleted: err := r.store.Delete(event.Object) case watch.Bookmark: default : utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) } *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) eventCount++ } } ... }
总结一下
Reflector利用apiserver的client列举全量对象(版本为0以后的对象全部列举出来)
将全量对象同步到DeltaFIFO中,并且更新资源的版本号,后续watch会依赖此版本号;
在后台启动一个定时resync的协程,把全量对象以Update事件的方式通知出去(如果没有设置同步周期,这一步可以不执行);
基于当前资源版本号watch资源;
一旦有对象发生变化,那么就会根据变化的类型(新增、更新、删除)调用DeltaFIFO的相应接口,同时更新当前资源的版本号
Controller 流程解析 reflector会持续监听k8s集群中指定资源类型的API,当发现变动和更新时,就会创建一个发生变动的对象的副本,并将其添加到队列DeltaFIFO中。
Indexer保存了来自apiServer的资源。使用listWatch方式来维护资源的增量变化。通过这种方式可以减小对apiServer的访问,减轻apiServer端的压力。
Indexer的接口如下,它继承了Store接口,Store中定义了对对象的增删改查等方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 type Indexer interface { Store Index(indexName string , obj interface {}) ([]interface {}, error ) IndexKeys(indexName, indexKey string ) ([]string , error ) ListIndexFuncValues(indexName string ) []string ByIndex(indexName, indexKey string ) ([]interface {}, error ) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 type Store interface { Add(obj interface {}) error Update(obj interface {}) error Delete(obj interface {}) error List() []interface {} ListKeys() []string Get(obj interface {}) (item interface {}, exists bool , err error ) GetByKey(key string ) (item interface {}, exists bool , err error ) Replace([]interface {}, string ) error Resync() error }
cache实现了Indexer接口,但cache是包内私有的(首字母小写),只能通过包内封装的函数进行调用。
1 2 3 4 5 6 7 8 type cache struct { cacheStorage ThreadSafeStore keyFunc KeyFunc }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type ThreadSafeStore interface { Add(key string , obj interface {}) Update(key string , obj interface {}) Delete(key string ) Get(key string ) (item interface {}, exists bool ) List() []interface {} ListKeys() []string Replace(map [string ]interface {}, string ) Index(indexName string , obj interface {}) ([]interface {}, error ) IndexKeys(indexName, indexKey string ) ([]string , error ) ListIndexFuncValues(name string ) []string ByIndex(indexName, indexKey string ) ([]interface {}, error ) GetIndexers() Indexers AddIndexers(newIndexers Indexers) error Resync() error }
可以通过NewStore和NewIndexer初始化cache来返回一个Store或Indexer指针(cache实现了Store和Indexer接口)。NewStore和NewIndexer返回的Store和Indexer接口的数据载体为threadSafeMap,threadSafeMap通过NewThreadSafeStore函数初始化。
1 2 3 4 5 6 7 8 9 func (c *cache) Add(obj interface {}) error { key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} } c.cacheStorage.Add(key, obj) return nil }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func NewStore (keyFunc KeyFunc) Store { return &cache{ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), keyFunc: keyFunc, } } func NewIndexer (keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } }
client-go中的很多实现封装都非常规范,index.go中给出了索引相关的操作(接口);store.go中给出了与操作存储相关的接口,并提供了一个cache实现,当然也可以实现自行实现Store接口;thread_safe_store.go为cache的私有实现。
client-go的indexer实际操作的还是threadSafeMap中的方法和数据,调用关系如下:

可以通过下图理解threadSafeMap中各种索引之间的关系

indexer实际的对象存储在threadSafeMap结构中
indexers划分了不同的索引类型(indexName,如namespace),并按照索引类型进行索引(indexFunc,如MetaNamespaceIndexFunc),得出符合该对象的索引键(indexKeys,如namespaces),一个对象在一个索引类型中可能有多个索引键。
indices按照索引类型保存了索引(index,如包含所有namespaces下面的obj),进而可以按照索引键找出特定的对象键(keys,如某个namespace下面的对象键),indices用于快速查找对象
items按照对象键保存了实际的对象
以namespace作为索引类型为例来讲,首先从indexers获取计算namespace的indexFunc,然后使用该indexFunc计算出与入参对象相关的所有namespaces。indices中保存了所有namespaces下面的对象键,可以获取特定namespace下面的所有对象键,在items中输入特定的对象键就可以得出特定的对象。indexers用于找出与特定对象相关的资源,如找出某Pod相关的secrets。
默认的indexFunc如下,根据对象的namespace进行分类
1 2 3 4 5 6 7 8 func MetaNamespaceIndexFunc (obj interface {}) ([]string , error ) { meta, err := meta.Accessor(obj) if err != nil { return []string {"" }, fmt.Errorf("object has no meta: %v" , err) } return []string {meta.GetNamespace()}, nil }
cache结构中的keyFunc用于生成objectKey,下面是默认的keyFunc。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func MetaNamespaceKeyFunc (obj interface {}) (string , error ) { if key, ok := obj.(ExplicitKey); ok { return string (key), nil } meta, err := meta.Accessor(obj) if err != nil { return "" , fmt.Errorf("object has no meta: %v" , err) } if len (meta.GetNamespace()) > 0 { return meta.GetNamespace() + "/" + meta.GetName(), nil } return meta.GetName(), nil }