Golang笔记-11-controller-runtime
文章目录
1. 前言
控制器模式是k8s的一个核心设计,它解决了实际状态与期望状态最终保持一致的问题,我们可以在k8s源码中找到大量的控制器实现:github.com/kubernetes/kubernetes/pkg/controller。
阅读过这些控制器实现后,我们可以发现一个通用的控制器流程:
- 定义控制器对象
- 初始化控制器对象:
- 初始化recorder用于记录事件
- 初始化informer用于监听资源变动
- 初始化lister用于获取资源
- 初始化cache用于缓存资源
- 初始化workqueue用于缓存资源变动
- 运行控制器:
- 等待informer缓存同步
- 启动worker处理资源变动
这些基础代码占据了控制器很大一部分,最初接触k8s开发时,我都是选择复制粘贴基础代码,学会使用模板后,就通过模板生成代码,而现在我们有了更好的选择:controller-runtime
2. controller-runtime
controller-runtime,顾名思义,即控制器运行时,它将我们平时需要编写的大量基础代码加以封装,对外暴露出几个主要的组件,假设我们需要监听集群中的Pod变动,只需要以下代码即可:
1func main() {
2 entryLog := log.Log.WithName("entrypoint")
3
4 // 创建一个manager
5 entryLog.Info("setting up manager")
6 mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
7 if err != nil {
8 entryLog.Error(err, "unable to set up overall controller manager")
9 os.Exit(1)
10 }
11
12 // 创建一个controller来同步pod
13 entryLog.Info("Setting up controller")
14 c, err := controller.New("pod-controller", mgr, controller.Options{
15 Reconciler: &reconcilePod{client: mgr.GetClient()},
16 })
17 if err != nil {
18 entryLog.Error(err, "unable to set up individual controller")
19 os.Exit(1)
20 }
21
22 // 监控Pod变动并将key写入workqueue
23 if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}); err != nil {
24 entryLog.Error(err, "unable to watch Pod")
25 os.Exit(1)
26 }
27
28 // 启动manager
29 entryLog.Info("starting manager")
30 if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
31 entryLog.Error(err, "unable to run manager")
32 os.Exit(1)
33 }
34}
35
36type reconcilePod struct {
37 // client 用于从APIServer获取资源对象
38 client client.Client
39}
40
41// 检查自定义控制器是否实现Reconcile方法
42var _ reconcile.Reconciler = &reconcilePod{}
43
44func (r *reconcilePod) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
45 // 初始化log
46 log := log.FromContext(ctx)
47
48 // 从缓存中获取Pod
49 pod := &corev1.Pod{}
50 err := r.client.Get(ctx, request.NamespacedName, rs)
51 if errors.IsNotFound(err) {
52 log.Error(nil, "Could not find Pod")
53 return reconcile.Result{}, nil
54 }
55
56 if err != nil {
57 return reconcile.Result{}, fmt.Errorf("could not fetch Pod: %+v", err)
58 }
59
60 // 打印Pod
61 log.Info("Reconciling Pod", "pod name", pod.Name)
62
63 return reconcile.Result{}, nil
64}
我们可以看到,原先复杂的准备工作现在已经简化为几个步骤:
- 创建manager
- 创建controller,添加需要监控的资源
- 实现Reconcile方法,处理资源变动
2.1 架构
下面是kubebuilder文档中借来的一张controller-runtime架构图:
2.2 Manager
下面是manager的定义:
1// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
2// A Manager is required to create Controllers.
3type Manager interface {
4 // Cluster holds a variety of methods to interact with a cluster.
5 cluster.Cluster
6
7 // Add will set requested dependencies on the component, and cause the component to be
8 // started when Start is called. Add will inject any dependencies for which the argument
9 // implements the inject interface - e.g. inject.Client.
10 // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
11 // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
12 Add(Runnable) error
13
14 // Elected is closed when this manager is elected leader of a group of
15 // managers, either because it won a leader election or because no leader
16 // election was configured.
17 Elected() <-chan struct{}
18
19 // AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics.
20 // Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be
21 // sensitive and shouldn't be exposed publicly.
22 // If the simple path -> handler mapping offered here is not enough, a new http server/listener should be added as
23 // Runnable to the manager via Add method.
24 AddMetricsExtraHandler(path string, handler http.Handler) error
25
26 // AddHealthzCheck allows you to add Healthz checker
27 AddHealthzCheck(name string, check healthz.Checker) error
28
29 // AddReadyzCheck allows you to add Readyz checker
30 AddReadyzCheck(name string, check healthz.Checker) error
31
32 // Start starts all registered Controllers and blocks until the context is cancelled.
33 // Returns an error if there is an error starting any controller.
34 //
35 // If LeaderElection is used, the binary must be exited immediately after this returns,
36 // otherwise components that need leader election might continue to run after the leader
37 // lock was lost.
38 Start(ctx context.Context) error
39
40 // GetWebhookServer returns a webhook.Server
41 GetWebhookServer() *webhook.Server
42
43 // GetLogger returns this manager's logger.
44 GetLogger() logr.Logger
45
46 // GetControllerOptions returns controller global configuration options.
47 GetControllerOptions() v1alpha1.ControllerConfigurationSpec
48}
完成manager初始化后,我们就可以获得以下能力:
- 与集群交互的能力,cluster.Cluster
- 添加多个自定义controller的能力
- 选举能力,利用k8s的APIServer接口实现服务高可用
- metrics服务,可用于对接普罗米修斯来采集指标
- 健康检查与就绪检查
- webhook服务,用于设置资源默认值和检查资源是否合规
- 统一的logger
下面是cluster.Cluster的定义:
1// Cluster provides various methods to interact with a cluster.
2type Cluster interface {
3 // SetFields will set any dependencies on an object for which the object has implemented the inject
4 // interface - e.g. inject.Client.
5 // Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
6 SetFields(interface{}) error
7
8 // GetConfig returns an initialized Config
9 GetConfig() *rest.Config
10
11 // GetScheme returns an initialized Scheme
12 GetScheme() *runtime.Scheme
13
14 // GetClient returns a client configured with the Config. This client may
15 // not be a fully "direct" client -- it may read from a cache, for
16 // instance. See Options.NewClient for more information on how the default
17 // implementation works.
18 GetClient() client.Client
19
20 // GetFieldIndexer returns a client.FieldIndexer configured with the client
21 GetFieldIndexer() client.FieldIndexer
22
23 // GetCache returns a cache.Cache
24 GetCache() cache.Cache
25
26 // GetEventRecorderFor returns a new EventRecorder for the provided name
27 GetEventRecorderFor(name string) record.EventRecorder
28
29 // GetRESTMapper returns a RESTMapper
30 GetRESTMapper() meta.RESTMapper
31
32 // GetAPIReader returns a reader that will be configured to use the API server.
33 // This should be used sparingly and only when the client does not fit your
34 // use case.
35 GetAPIReader() client.Reader
36
37 // Start starts the cluster
38 Start(ctx context.Context) error
39}
我们可以在这里看到常用的client和recorder,其中client可以认为是对client-go的封装,支持CRUD操作,recorder直接引用了client-go中定义的EventRecorder,此外还支持自定义缓存和索引。
2.3 Client
1// Client knows how to perform CRUD operations on Kubernetes objects.
2type Client interface {
3 Reader
4 Writer
5 StatusClient
6
7 // Scheme returns the scheme this client is using.
8 Scheme() *runtime.Scheme
9 // RESTMapper returns the rest this client is using.
10 RESTMapper() meta.RESTMapper
11}
controller-runtime的Client结合了GVK和GVR的优点,以更新一个node的注解为例,我们实现一个patchNodeAnnotaions方法:
1func (c *nodeController) patchNodeAnnotaions(ctx context.Context, curNode *v1.Node, nodeIP string) error {
2 newNode := curNode.DeepCopy()
3 newNode.Spec = curNode.Spec
4 if newNode.Annotations == nil {
5 newNode.Annotations = make(map[string]string)
6 }
7 newNode.Annotations[nodeIPAnnotation] = nodeIP
8 return c.client.Patch(ctx, newNode, client.StrategicMergeFrom(curNode))
9}
这里只需要复制节点,修改注解,然后使用指定的merge方式调用Patch方法即可。
使用GVK时,我们需要从clientset获取node的具体方法,再为node实现一个patch函数,假设需要操作数十种不同资源时,我们需要为每一种资源实现各自的函数。
使用GVR时,资源操作接口(Get/Delete/Patch...)是统一的,但GVR列表、Unstructured类型的序列化/反序列化、patchData的生成等等仍旧需要我们自行处理。
而使用controller-runtime的Client时,我们只需要提供资源名(Namespace/Name)、资源类型(结构体指针),即可开始CRUD操作。
2.4 Controller
1// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
2// from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item.
3// Work typically is reads and writes Kubernetes objects to make the system state match the state specified
4// in the object Spec.
5type Controller interface {
6 // Reconciler is called to reconcile an object by Namespace/Name
7 reconcile.Reconciler
8
9 // Watch takes events provided by a Source and uses the EventHandler to
10 // enqueue reconcile.Requests in response to the events.
11 //
12 // Watch may be provided one or more Predicates to filter events before
13 // they are given to the EventHandler. Events will be passed to the
14 // EventHandler if all provided Predicates evaluate to true.
15 Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
16
17 // Start starts the controller. Start blocks until the context is closed or a
18 // controller has an error starting.
19 Start(ctx context.Context) error
20
21 // GetLogger returns this controller logger prefilled with basic information.
22 GetLogger() logr.Logger
23}
Watch
Watch函数支持三种资源监听类型,通过定义 eventhandler 实现:
- EnqueueRequestForObject:资源变动时将资源key加入workqueue,例如直接监听Pod变动
- EnqueueRequestForOwner:资源变动时将资源owner的key加入workqueue,例如在Pod变动时,若Pod的Owner为ReplicaSet,则通知ReplicaSet发生了资源变动
- EnqueueRequestsFromMapFunc:定义一个关联函数,资源变动时生成一组reconcile.Request,例如在集群扩容时添加了Node,通知一组对象发生了资源变动
此外还可以自定义 predicates,用于过滤资源,例如只监听指定命名空间、包含指定注解或标签的资源。
reconcile.Reconciler
1
2// Result contains the result of a Reconciler invocation.
3type Result struct {
4 // Requeue tells the Controller to requeue the reconcile key. Defaults to false.
5 Requeue bool
6
7 // RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
8 // Implies that Requeue is true, there is no need to set Requeue to true at the same time as RequeueAfter.
9 RequeueAfter time.Duration
10}
11
12type Reconciler interface {
13 // Reconcile performs a full reconciliation for the object referred to by the Request.
14 // The Controller will requeue the Request to be processed again if an error is non-nil or
15 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
16 Reconcile(context.Context, Request) (Result, error)
17}
控制器的Reconcile方法是我们实现业务逻辑的核心,在完成controller初始化,添加好需要监控的资源后,我们就可以接收到资源变动通知。
Reconcile方法执行结束时,若返回的error不为nil或者Result的Requeue为true,将会触发重试,重试延迟与workqueue类型有关,和我们手动实现controller时一样。
2.5 Webhook
Webhook提供了两个功能:
- 设置资源默认值:例如Pod不存在key为app的注解时,添加一个key为app,value为custom的注解
- 校验资源是否合规:例如检查Pod是否存在一个key为app,value为custom的注解,否则禁止后续操作
标准资源通常已经有了对应的Webhook,CRD则需要用户自定实现这两个Webhook来确保数据的完整性,例如用户编辑资源删除关键参数时返回错误,减少Reconcile阶段的数据检查工作。
3. 最后
从重复代码到复杂框架,经常是经历:复制粘贴、模板生成、封装,越往后便屏蔽越多的细节。
框架减少了我们的编码量,提高了开发效率,我改造了一个内部DNS程序后,代码量直接减少60%以上,但框架也带来更多学习成本和不确定的bug,我也是在过完几遍controller-runtime后才开始着手改造项目,而另一位同事则是选择定时轮询资源,每个人都有自己的喜好,一切还是以结果为准吧。
但在充满不确定性的开发工作中,我认为,有众人维护的开源框架的可靠性总是大于自行实现的轮子。