Controller and CustomResourceDefinitions

Kubernetes的Custom Resource Definition(CRD)和Controller机制是Kubernetes提供的一种扩展Kubernetes API的方法,可以用于自定义Kubernetes资源类型以及对这些资源的控制。

CRD是一种自定义的Kubernetes资源类型,它允许用户在Kubernetes中创建新的资源类型。CRD通过Kubernetes API Server暴露出来,使得用户可以使用kubectl或其他工具对其进行管理。

Controller是CRD的另一个重要组成部分,它实现了对CRD资源的控制逻辑。Controller会根据CRD资源的状态变化来触发相应的操作,比如创建、更新、删除等。Controller通常会通过watch机制来监听CRD资源的变化,并根据实际情况对其进行处理。CRD和Controller机制为Kubernetes提供了更强大的扩展能力,使得用户可以更好地适应不同场景的需求。

Extend Kubernetes with CustomResourceDefinitions.

  • 定义、创建自定义资源
# crd.yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: foos.samplecontroller.k8s.io
spec:
  group: samplecontroller.k8s.io
  version: v1alpha1
  names:
    kind: Foo
    plural: foos
  scope: Namespaced

kubectl apply -f crd.yaml
  • Kubectl操作自定义资源
kubectl get foo
kubectl watch foo
  • Client-go操作自定义资源
import (
    "context"

    samplecontrollerv1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
    clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // 获取 kubernetes 集群的 rest.Config 对象
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }
    
    // 创建 CRD 客户端
    cs, err := clientset.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // 创建自定义对象 foo
    foo := &samplecontrollerv1alpha1.Foo{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "example-foo",
            Namespace: "default",
        },
        Spec: samplecontrollerv1alpha1.FooSpec{
            Size: 3,
        },
    }

    // 创建或更新自定义对象 foo
    result, err := cs.SamplecontrollerV1alpha1().Foos("default").Create(context.Background(), foo, metav1.CreateOptions{})
    if err != nil {
        result, err = cs.SamplecontrollerV1alpha1().Foos("default").Update(context.Background(), foo, metav1.UpdateOptions{})
        if err != nil {
            panic(err)
        }
    }

    // 删除自定义对象 foo
    err = cs.SamplecontrollerV1alpha1().Foos("default").Delete(context.Background(), foo.Name, metav1.DeleteOptions{})
    if err != nil {
        panic(err)
    }
}

Write Controller with client-go

Controller Architecture

CRD Client code generate

k8s.io/code-generator 是Kubernetes官方提供的一种代码生成工具。这个工具可以根据定义好的CRD来自动生成客户端代码,包括类型定义、列表和单个对象的获取、创建、更新和删除等操作的接口,以及相关实现代码。

通过使用k8s.io/code-generator, 我们可以避免手动编写大量重复的代码,从而提高开发效率和代码质量。

SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}

bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
k8s.io/sample-controller/pkg/generated k8s.io/sample-controller/pkg/apis \
samplecontroller:v1alpha1 \
--output-base "$(dirname "${BASH_SOURCE[0]}")/../../.." \
--go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
# pkg/generated
├── clientset
│   └── versioned
│       ├── clientset.go
│       ├── doc.go
│       ├── fake
│       │   ├── clientset_generated.go
│       │   ├── doc.go
│       │   └── register.go
│       ├── scheme
│       │   ├── doc.go
│       │   └── register.go
│       └── typed
│           └── samplecontroller
│               └── v1alpha1
│                   ├── doc.go
│                   ├── fake
│                   │   ├── doc.go
│                   │   ├── fake_foo.go
│                   │   └── fake_samplecontroller_client.go
│                   ├── foo.go
│                   ├── generated_expansion.go
│                   └── samplecontroller_client.go
├── informers
│   └── externalversions
│       ├── factory.go
│       ├── generic.go
│       ├── internalinterfaces
│       │   └── factory_interfaces.go
│       └── samplecontroller
│           ├── interface.go
│           └── v1alpha1
│               ├── foo.go
│               └── interface.go
└── listers
    └── samplecontroller
        └── v1alpha1
            ├── expansion_generated.go
            └── foo.go

Client-go components

  • Reflector: 定义在type Reflector inside package cache,
    通过Kubernetes API监控特定资源,资源可以为kubernetes内建资源也可以是自定义资源.
    当reflector收到来自API的资源更新或创建通知,它将通过相应的API创建一个对象,并将它推入Delta Fifo队列.

  • Informer: 定义在base controller inside package cache,
    它从Delta Fifo队列弹出一个对象保存下来,同时调用我们的controller并传递该对象.

  • Indexer: 定义在type Indexer inside package cache.
    为对象提供索引功能,一个典型的使用场景是基于对象的labels创建索引.
    Indexer维护索引通过一系列的索引函数,并使用一个线程安全的池子保存对象和它们的key.

Custom Controller components

  • Informer reference: Informer实例引用,
    需要我们在我们的controller代码中自己创建.

  • Indexer reference: Indexer实例引用,
    需要我们在我们的controller代码中自己创建,我们在执行处理过程中通过它来取回对象.

  • Resource Event Handlers: 一系列调函数,通过它们Informer传递对象给我们的controller.
    一个典型的模式为该回调函数获得对象的key后将对象的key推入workqueue为接下来的处理.

  • Work queue: 解耦对象传递和对象处理过程的队列.

  • Process Item: 对象具体的处理过程: 一般会使用Indexer reference或者Listing包装函数通过对象key取回对象.

import (
    "context"
    
    samplecontrollerv1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
    clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
    informers "k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1"
    listers "k8s.io/sample-controller/pkg/generated/listers/samplecontroller/v1alpha1"

    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

type Controller struct {
    // 客户端,用于操作CRD
    clientset clientset.Interface
    
    // Informer 机制,确保 CRD 数据的及时获取和同步到内存中
    fooInformer informers.FooInformer
    fooLister   listers.FooLister
    
    // 工作队列,用于控制协程数目和并发度
    workqueue workqueue.RateLimitingInterface
    
    // 自定义对象转换接口
    scheme *runtime.Scheme
}

func NewController(clientset clientset.Interface, fooInformer informers.FooInformer) *Controller {
    c := &Controller{
        clientset: clientset,
        fooInformer: fooInformer,
        fooLister: fooInformer.Lister(),
        workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
        scheme: runtime.NewScheme(),
    }
    // 注册自定义 API 对象到 scheme 中
    samplecontrollerv1alpha1.AddToScheme(c.scheme)
    // 绑定各种事件触发的回调函数
    fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: c.enqueueFoo,
        UpdateFunc: func(old, new interface{}) {
            c.enqueueFoo(new)
        },
        DeleteFunc: c.enqueueFooForDelete,
    })
    return c
}

func (c *Controller) Run(stopCh <-chan struct{}) error {
    defer c.workqueue.ShutDown()

    if !cache.WaitForCacheSync(stopCh, c.fooInformer.Informer().HasSynced) {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    // 启动多个协程,处理工作队列中待处理的任务
    for i := 0; i < numWorkers; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    <-stopCh
    return nil
}

func (c *Controller) runWorker() {
    for c.processNextWorkItem() {
    }
}

func (c *Controller) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    defer c.workqueue.Done(obj)

    key := obj.(string)
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        c.workqueue.Forget(obj)
        return true
    }

    foo, err := c.fooLister.Foos(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            c.workqueue.Forget(obj)
            return true
        }
        c.workqueue.AddRateLimited(obj)
        return true
    }

    // TODO: 根据业务逻辑处理自定义对象 foo
    
    // 处理完一个任务,将其从队列中删除
    c.workqueue.Forget(obj)
    return true
}

func (c *Controller) enqueueFoo(obj interface{}) {
    foo := obj.(*samplecontrollerv1alpha1.Foo)
    key, err := cache.MetaNamespaceKeyFunc(foo)
    if err != nil {
        return
    }
    c.workqueue.Add(key)
}

func (c *Controller) enqueueFooForDelete(obj interface{}) {
    foo, ok := obj.(*samplecontrollerv1alpha1.Foo)
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            return
        }
        foo, ok = tombstone.Obj.(*samplecontrollerv1alpha1.Foo)
        if !ok {
            return
        }
    }
    key, err := cache.MetaNamespaceKeyFunc(foo)
    if err == nil {
        c.workqueue.Add(key)
    }
}

Controller Runtime

Controller-runtime是Kubernetes官方提供的一种Go语言工具包,用于编写自定义控制器。该工具包基于Kubernetes API机制,封装了常见的控制器开发模式,如Watch Reconcile、Leader Election、Event Emitting等。

Controller-runtime为Kubernetes控制器的开发提供了很多便利,使得开发者可以更加专注于核心业务逻辑的实现。

Controller-runtime Architecture

  1. Manager:管理器

Manager 是整个框架的核心,其负责以下几个任务:

  • 创建并启动各种控制器(Controller)
  • 启动 Web 服务器,提供健康检查和指标监控等服务
  • 提供客户端(Client)实例,用于操作 kubernetes API
  • 可以通过配置文件或环境变量来自定义控制器运行参数
  1. Client:客户端

Client 是操作 kubernetes API 的客户端,通常情况下我们使用 kubernetes/client-go 库即可,但是在 controller-runtime 框架中,它对原生的 client-go 库进行了封装,提供了更加易用的接口,同时还支持对多个版本的 API 对象进行操作。

  1. Cache:缓存

Cache 是用于缓存 kubernetes API 对象的组件,它可以高效地从 kubernetes API 中获取对象,并将其同步到内存中,以便控制器快速地获取和处理对象。

  1. Controller:控制器

Controller 是控制器的核心组件,用于监听和处理 kubernetes API 对象的变化,并在需要时调用 Reconciler 进行协调。

  1. Reconciler:协调器

Reconciler 是一个接口,用于协调处理 kubernetes API 对象的状态。在 controller-runtime 中,每个控制器都必须关联一个 Reconciler 实现,以便在对象出现变化时调用该 Reconciler 进行处理。Reconciler 接口定义如下:

type Reconciler interface {
    // 计算出与期望状态不同的部分
    Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
}

在 Reconcile 函数中,我们可以根据业务逻辑计算出当前对象的期望状态,然后与 kubernetes API 中的实际状态进行比较,如果两者不同,则更新实际状态,否则直接返回。

Cotroller-runtime 提供了一套完整的架构,封装了大量的功能代码,让开发 controller 变得更加简单方便。在使用 controller-runtime 构建 controller 时,我们继续只需要关注 Reconciler接口的实现即可。

Write Controller with controller-runtime

import (
    "context"

    samplecontrollerv1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/apimachinery/pkg/api/errors"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func main() {
    // 获取 kubernetes 集群的 rest.Config 对象
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err)
    }

    // 创建 kubernetes API 对象的编解码器
    scheme := runtime.NewScheme()
    samplecontrollerv1alpha1.AddToScheme(scheme)

    // 创建 manager 实例
    mgr, err := ctrl.NewManager(config, ctrl.Options{
        Scheme: scheme,
    })
    if err != nil {
        panic(err)
    }

    // 创建 Reconciler 实现
    reconciler := &FooReconciler{
        client: mgr.GetClient(),
        scheme: scheme,
    }

    // 创建控制器对象并注册到manager
    ctrl.NewControllerManagedBy(mgr).
        For(&samplecontrollerv1alpha1.Foo{}).
        WithEventFilter(predicate.GenerationChangedPredicate{}).
        Complete(reconciler)

    // 启动 manager,开始运行 controller
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        panic(err)
    }
}

// FooReconciler 实现了 Reconciler 接口
type FooReconciler struct {
    client client.Client
    scheme *runtime.Scheme
}

func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // 获取 kubernetes API 对象
    foo := &samplecontrollerv1alpha1.Foo{}
    if err := r.client.Get(ctx, req.NamespacedName, foo); err != nil {
        if errors.IsNotFound(err) {
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }

    // 更新或创建 kubernetes API 对象
    if err := controllerutil.SetControllerReference(foo, foo, r.scheme); err != nil {
        return ctrl.Result{}, err
    }
    _, err := r.client.CreateOrUpdate(ctx, foo)
    if err != nil {
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}