Golang笔记-11-controller-runtime

文章目录

1. 前言

控制器模式是k8s的一个核心设计,它解决了实际状态与期望状态最终保持一致的问题,我们可以在k8s源码中找到大量的控制器实现:github.com/kubernetes/kubernetes/pkg/controller

阅读过这些控制器实现后,我们可以发现一个通用的控制器流程:

  1. 定义控制器对象
  2. 初始化控制器对象:
    1. 初始化recorder用于记录事件
    2. 初始化informer用于监听资源变动
    3. 初始化lister用于获取资源
    4. 初始化cache用于缓存资源
    5. 初始化workqueue用于缓存资源变动
  3. 运行控制器:
    1. 等待informer缓存同步
    2. 启动worker处理资源变动

这些基础代码占据了控制器很大一部分,最初接触k8s开发时,笔者都是选择复制粘贴基础代码,学会使用模板后,就通过模板生成代码,而现在我们有了更好的选择:controller-runtime

2. controller-runtime

controller-runtime,顾名思义,即控制器运行时,它将我们平时需要编写的大量基础代码加以封装,对外暴露出几个主要的组件,假设我们需要监听集群中的Pod变动,只需要以下代码即可:

 1func main() {
 2	entryLog := log.Log.WithName("entrypoint")
 3
 4	// 创建一个manager
 5	entryLog.Info("setting up manager")
 6	mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
 7	if err != nil {
 8		entryLog.Error(err, "unable to set up overall controller manager")
 9		os.Exit(1)
10	}
11
12	// 创建一个controller来同步pod
13	entryLog.Info("Setting up controller")
14	c, err := controller.New("pod-controller", mgr, controller.Options{
15		Reconciler: &reconcilePod{client: mgr.GetClient()},
16	})
17	if err != nil {
18		entryLog.Error(err, "unable to set up individual controller")
19		os.Exit(1)
20	}
21
22	// 监控Pod变动并将key写入workqueue
23	if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}); err != nil {
24		entryLog.Error(err, "unable to watch Pod")
25		os.Exit(1)
26	}
27
28	// 启动manager
29	entryLog.Info("starting manager")
30	if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
31		entryLog.Error(err, "unable to run manager")
32		os.Exit(1)
33	}
34}
35
36type reconcilePod struct {
37	// client 用于从APIServer获取资源对象
38	client client.Client
39}
40
41// 检查自定义控制器是否实现Reconcile方法
42var _ reconcile.Reconciler = &reconcilePod{}
43
44func (r *reconcilePod) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
45	// 初始化log
46	log := log.FromContext(ctx)
47
48	// 从缓存中获取Pod
49	pod := &corev1.Pod{}
50	err := r.client.Get(ctx, request.NamespacedName, rs)
51	if errors.IsNotFound(err) {
52		log.Error(nil, "Could not find Pod")
53		return reconcile.Result{}, nil
54	}
55
56	if err != nil {
57		return reconcile.Result{}, fmt.Errorf("could not fetch Pod: %+v", err)
58	}
59
60	// 打印Pod
61	log.Info("Reconciling Pod", "pod name", pod.Name)
62
63	return reconcile.Result{}, nil
64}

我们可以看到,原先复杂的准备工作现在已经简化为几个步骤:

  1. 创建manager
  2. 创建controller,添加需要监控的资源
  3. 实现Reconcile方法,处理资源变动

2.1 架构

下面是kubebuilder文档中借来的一张controller-runtime架构图:

2.2 Manager

下面是manager的定义:

 1// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
 2// A Manager is required to create Controllers.
 3type Manager interface {
 4	// Cluster holds a variety of methods to interact with a cluster.
 5	cluster.Cluster
 6
 7	// Add will set requested dependencies on the component, and cause the component to be
 8	// started when Start is called.  Add will inject any dependencies for which the argument
 9	// implements the inject interface - e.g. inject.Client.
10	// Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
11	// non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
12	Add(Runnable) error
13
14	// Elected is closed when this manager is elected leader of a group of
15	// managers, either because it won a leader election or because no leader
16	// election was configured.
17	Elected() <-chan struct{}
18
19	// AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics.
20	// Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be
21	// sensitive and shouldn't be exposed publicly.
22	// If the simple path -> handler mapping offered here is not enough, a new http server/listener should be added as
23	// Runnable to the manager via Add method.
24	AddMetricsExtraHandler(path string, handler http.Handler) error
25
26	// AddHealthzCheck allows you to add Healthz checker
27	AddHealthzCheck(name string, check healthz.Checker) error
28
29	// AddReadyzCheck allows you to add Readyz checker
30	AddReadyzCheck(name string, check healthz.Checker) error
31
32	// Start starts all registered Controllers and blocks until the context is cancelled.
33	// Returns an error if there is an error starting any controller.
34	//
35	// If LeaderElection is used, the binary must be exited immediately after this returns,
36	// otherwise components that need leader election might continue to run after the leader
37	// lock was lost.
38	Start(ctx context.Context) error
39
40	// GetWebhookServer returns a webhook.Server
41	GetWebhookServer() *webhook.Server
42
43	// GetLogger returns this manager's logger.
44	GetLogger() logr.Logger
45
46	// GetControllerOptions returns controller global configuration options.
47	GetControllerOptions() v1alpha1.ControllerConfigurationSpec
48}

完成manager初始化后,我们就可以获得以下能力:

  1. 与集群交互的能力,cluster.Cluster
  2. 添加多个自定义controller的能力
  3. 选举能力,利用k8s的APIServer接口实现服务高可用
  4. metrics服务,可用于对接普罗米修斯来采集指标
  5. 健康检查与就绪检查
  6. webhook服务,用于设置资源默认值和检查资源是否合规
  7. 统一的logger

下面是cluster.Cluster的定义:

 1// Cluster provides various methods to interact with a cluster.
 2type Cluster interface {
 3	// SetFields will set any dependencies on an object for which the object has implemented the inject
 4	// interface - e.g. inject.Client.
 5	// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
 6	SetFields(interface{}) error
 7
 8	// GetConfig returns an initialized Config
 9	GetConfig() *rest.Config
10
11	// GetScheme returns an initialized Scheme
12	GetScheme() *runtime.Scheme
13
14	// GetClient returns a client configured with the Config. This client may
15	// not be a fully "direct" client -- it may read from a cache, for
16	// instance.  See Options.NewClient for more information on how the default
17	// implementation works.
18	GetClient() client.Client
19
20	// GetFieldIndexer returns a client.FieldIndexer configured with the client
21	GetFieldIndexer() client.FieldIndexer
22
23	// GetCache returns a cache.Cache
24	GetCache() cache.Cache
25
26	// GetEventRecorderFor returns a new EventRecorder for the provided name
27	GetEventRecorderFor(name string) record.EventRecorder
28
29	// GetRESTMapper returns a RESTMapper
30	GetRESTMapper() meta.RESTMapper
31
32	// GetAPIReader returns a reader that will be configured to use the API server.
33	// This should be used sparingly and only when the client does not fit your
34	// use case.
35	GetAPIReader() client.Reader
36
37	// Start starts the cluster
38	Start(ctx context.Context) error
39}

我们可以在这里看到常用的client和recorder,其中client可以认为是对client-go的封装,支持CRUD操作,recorder直接引用了client-go中定义的EventRecorder,此外还支持自定义缓存和索引。

2.3 Client

 1// Client knows how to perform CRUD operations on Kubernetes objects.
 2type Client interface {
 3	Reader
 4	Writer
 5	StatusClient
 6
 7	// Scheme returns the scheme this client is using.
 8	Scheme() *runtime.Scheme
 9	// RESTMapper returns the rest this client is using.
10	RESTMapper() meta.RESTMapper
11}

controller-runtime的Client结合了GVK和GVR的优点,以更新一个node的注解为例,我们实现一个patchNodeAnnotaions方法:

1func (c *nodeController) patchNodeAnnotaions(ctx context.Context, curNode *v1.Node, nodeIP string) error {
2	newNode := curNode.DeepCopy()
3	newNode.Spec = curNode.Spec
4	if newNode.Annotations == nil {
5		newNode.Annotations = make(map[string]string)
6	}
7	newNode.Annotations[nodeIPAnnotation] = nodeIP
8	return c.client.Patch(ctx, newNode, client.StrategicMergeFrom(curNode))
9}

这里只需要复制节点,修改注解,然后使用指定的merge方式调用Patch方法即可。

使用GVK时,我们需要从clientset获取node的具体方法,再为node实现一个patch函数,假设需要操作数十种不同资源时,我们需要为每一种资源实现各自的函数。

使用GVR时,资源操作接口(Get/Delete/Patch...)是统一的,但GVR列表、Unstructured类型的序列化/反序列化、patchData的生成等等仍旧需要我们自行处理。

而使用controller-runtime的Client时,我们只需要提供资源名(Namespace/Name)、资源类型(结构体指针),即可开始CRUD操作。

2.4 Controller

 1// Controller implements a Kubernetes API.  A Controller manages a work queue fed reconcile.Requests
 2// from source.Sources.  Work is performed through the reconcile.Reconciler for each enqueued item.
 3// Work typically is reads and writes Kubernetes objects to make the system state match the state specified
 4// in the object Spec.
 5type Controller interface {
 6	// Reconciler is called to reconcile an object by Namespace/Name
 7	reconcile.Reconciler
 8
 9	// Watch takes events provided by a Source and uses the EventHandler to
10	// enqueue reconcile.Requests in response to the events.
11	//
12	// Watch may be provided one or more Predicates to filter events before
13	// they are given to the EventHandler.  Events will be passed to the
14	// EventHandler if all provided Predicates evaluate to true.
15	Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
16
17	// Start starts the controller.  Start blocks until the context is closed or a
18	// controller has an error starting.
19	Start(ctx context.Context) error
20
21	// GetLogger returns this controller logger prefilled with basic information.
22	GetLogger() logr.Logger
23}

Watch

Watch函数支持三种资源监听类型,通过定义 eventhandler 实现:

  • EnqueueRequestForObject:资源变动时将资源key加入workqueue,例如直接监听Pod变动
  • EnqueueRequestForOwner:资源变动时将资源owner的key加入workqueue,例如在Pod变动时,若Pod的Owner为ReplicaSet,则通知ReplicaSet发生了资源变动
  • EnqueueRequestsFromMapFunc:定义一个关联函数,资源变动时生成一组reconcile.Request,例如在集群扩容时添加了Node,通知一组对象发生了资源变动

此外还可以自定义 predicates,用于过滤资源,例如只监听指定命名空间、包含指定注解或标签的资源。

reconcile.Reconciler

 1
 2// Result contains the result of a Reconciler invocation.
 3type Result struct {
 4	// Requeue tells the Controller to requeue the reconcile key.  Defaults to false.
 5	Requeue bool
 6
 7	// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
 8	// Implies that Requeue is true, there is no need to set Requeue to true at the same time as RequeueAfter.
 9	RequeueAfter time.Duration
10}
11
12type Reconciler interface {
13	// Reconcile performs a full reconciliation for the object referred to by the Request.
14	// The Controller will requeue the Request to be processed again if an error is non-nil or
15	// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
16	Reconcile(context.Context, Request) (Result, error)
17}

控制器的Reconcile方法是我们实现业务逻辑的核心,在完成controller初始化,添加好需要监控的资源后,我们就可以接收到资源变动通知。

Reconcile方法执行结束时,若返回的error不为nil或者Result的Requeue为true,将会触发重试,重试延迟与workqueue类型有关,和我们手动实现controller时一样。

2.5 Webhook

Webhook提供了两个功能:

  1. 设置资源默认值:例如Pod不存在key为app的注解时,添加一个key为app,value为custom的注解
  2. 校验资源是否合规:例如检查Pod是否存在一个key为app,value为custom的注解,否则禁止后续操作

标准资源通常已经有了对应的Webhook,CRD则需要用户自定实现这两个Webhook来确保数据的完整性,例如用户编辑资源删除关键参数时返回错误,减少Reconcile阶段的数据检查工作。

3. 最后

从重复代码到复杂框架,经常是经历:复制粘贴、模板生成、封装,越往后便屏蔽越多的细节。

框架减少了我们的编码量,提高了开发效率,笔者改造了一个内部DNS程序后,代码量直接减少60%以上,但框架也带来更多学习成本和不确定的bug,笔者也是在过完几遍controller-runtime后才开始着手改造项目,而另一位同事则是选择定时轮询资源,每个人都有自己的喜好,一切还是以结果为准吧。

但在充满不确定性的开发工作中,笔者认为,有众人维护的开源框架的可靠性总是大于自行实现的轮子。