Controller and CustomResourceDefinitions
Kubernetes的Custom Resource Definition(CRD)和Controller机制是Kubernetes提供的一种扩展Kubernetes API的方法,可以用于自定义Kubernetes资源类型以及对这些资源的控制。
CRD是一种自定义的Kubernetes资源类型,它允许用户在Kubernetes中创建新的资源类型。CRD通过Kubernetes API Server暴露出来,使得用户可以使用kubectl或其他工具对其进行管理。
Controller是CRD的另一个重要组成部分,它实现了对CRD资源的控制逻辑。Controller会根据CRD资源的状态变化来触发相应的操作,比如创建、更新、删除等。Controller通常会通过watch机制来监听CRD资源的变化,并根据实际情况对其进行处理。CRD和Controller机制为Kubernetes提供了更强大的扩展能力,使得用户可以更好地适应不同场景的需求。
Extend Kubernetes with CustomResourceDefinitions.
- 定义、创建自定义资源
# crd.yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: foos.samplecontroller.k8s.io
spec:
group: samplecontroller.k8s.io
version: v1alpha1
names:
kind: Foo
plural: foos
scope: Namespaced
kubectl apply -f crd.yaml
- Kubectl操作自定义资源
kubectl get foo
kubectl watch foo
- Client-go操作自定义资源
import (
"context"
samplecontrollerv1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func main() {
// 获取 kubernetes 集群的 rest.Config 对象
config, err := rest.InClusterConfig()
if err != nil {
panic(err)
}
// 创建 CRD 客户端
cs, err := clientset.NewForConfig(config)
if err != nil {
panic(err)
}
// 创建自定义对象 foo
foo := &samplecontrollerv1alpha1.Foo{
ObjectMeta: metav1.ObjectMeta{
Name: "example-foo",
Namespace: "default",
},
Spec: samplecontrollerv1alpha1.FooSpec{
Size: 3,
},
}
// 创建或更新自定义对象 foo
result, err := cs.SamplecontrollerV1alpha1().Foos("default").Create(context.Background(), foo, metav1.CreateOptions{})
if err != nil {
result, err = cs.SamplecontrollerV1alpha1().Foos("default").Update(context.Background(), foo, metav1.UpdateOptions{})
if err != nil {
panic(err)
}
}
// 删除自定义对象 foo
err = cs.SamplecontrollerV1alpha1().Foos("default").Delete(context.Background(), foo.Name, metav1.DeleteOptions{})
if err != nil {
panic(err)
}
}
Write Controller with client-go
Controller Architecture
CRD Client code generate
k8s.io/code-generator 是Kubernetes官方提供的一种代码生成工具。这个工具可以根据定义好的CRD来自动生成客户端代码,包括类型定义、列表和单个对象的获取、创建、更新和删除等操作的接口,以及相关实现代码。
通过使用k8s.io/code-generator, 我们可以避免手动编写大量重复的代码,从而提高开发效率和代码质量。
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \
k8s.io/sample-controller/pkg/generated k8s.io/sample-controller/pkg/apis \
samplecontroller:v1alpha1 \
--output-base "$(dirname "${BASH_SOURCE[0]}")/../../.." \
--go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
# pkg/generated
├── clientset
│ └── versioned
│ ├── clientset.go
│ ├── doc.go
│ ├── fake
│ │ ├── clientset_generated.go
│ │ ├── doc.go
│ │ └── register.go
│ ├── scheme
│ │ ├── doc.go
│ │ └── register.go
│ └── typed
│ └── samplecontroller
│ └── v1alpha1
│ ├── doc.go
│ ├── fake
│ │ ├── doc.go
│ │ ├── fake_foo.go
│ │ └── fake_samplecontroller_client.go
│ ├── foo.go
│ ├── generated_expansion.go
│ └── samplecontroller_client.go
├── informers
│ └── externalversions
│ ├── factory.go
│ ├── generic.go
│ ├── internalinterfaces
│ │ └── factory_interfaces.go
│ └── samplecontroller
│ ├── interface.go
│ └── v1alpha1
│ ├── foo.go
│ └── interface.go
└── listers
└── samplecontroller
└── v1alpha1
├── expansion_generated.go
└── foo.go
Client-go components
-
Reflector: 定义在type Reflector inside package cache,
通过Kubernetes API监控特定资源,资源可以为kubernetes内建资源也可以是自定义资源.
当reflector收到来自API的资源更新或创建通知,它将通过相应的API创建一个对象,并将它推入Delta Fifo队列. -
Informer: 定义在base controller inside package cache,
它从Delta Fifo队列弹出一个对象保存下来,同时调用我们的controller并传递该对象. -
Indexer: 定义在type Indexer inside package cache.
为对象提供索引功能,一个典型的使用场景是基于对象的labels创建索引.
Indexer维护索引通过一系列的索引函数,并使用一个线程安全的池子保存对象和它们的key.
Custom Controller components
-
Informer reference: Informer实例引用,
需要我们在我们的controller代码中自己创建. -
Indexer reference: Indexer实例引用,
需要我们在我们的controller代码中自己创建,我们在执行处理过程中通过它来取回对象. -
Resource Event Handlers: 一系列调函数,通过它们Informer传递对象给我们的controller.
一个典型的模式为该回调函数获得对象的key后将对象的key推入workqueue为接下来的处理. -
Work queue: 解耦对象传递和对象处理过程的队列.
-
Process Item: 对象具体的处理过程: 一般会使用Indexer reference或者Listing包装函数通过对象key取回对象.
import (
"context"
samplecontrollerv1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
informers "k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1"
listers "k8s.io/sample-controller/pkg/generated/listers/samplecontroller/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type Controller struct {
// 客户端,用于操作CRD
clientset clientset.Interface
// Informer 机制,确保 CRD 数据的及时获取和同步到内存中
fooInformer informers.FooInformer
fooLister listers.FooLister
// 工作队列,用于控制协程数目和并发度
workqueue workqueue.RateLimitingInterface
// 自定义对象转换接口
scheme *runtime.Scheme
}
func NewController(clientset clientset.Interface, fooInformer informers.FooInformer) *Controller {
c := &Controller{
clientset: clientset,
fooInformer: fooInformer,
fooLister: fooInformer.Lister(),
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
scheme: runtime.NewScheme(),
}
// 注册自定义 API 对象到 scheme 中
samplecontrollerv1alpha1.AddToScheme(c.scheme)
// 绑定各种事件触发的回调函数
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
c.enqueueFoo(new)
},
DeleteFunc: c.enqueueFooForDelete,
})
return c
}
func (c *Controller) Run(stopCh <-chan struct{}) error {
defer c.workqueue.ShutDown()
if !cache.WaitForCacheSync(stopCh, c.fooInformer.Informer().HasSynced) {
return fmt.Errorf("failed to wait for caches to sync")
}
// 启动多个协程,处理工作队列中待处理的任务
for i := 0; i < numWorkers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
return nil
}
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
defer c.workqueue.Done(obj)
key := obj.(string)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
c.workqueue.Forget(obj)
return true
}
foo, err := c.fooLister.Foos(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
c.workqueue.Forget(obj)
return true
}
c.workqueue.AddRateLimited(obj)
return true
}
// TODO: 根据业务逻辑处理自定义对象 foo
// 处理完一个任务,将其从队列中删除
c.workqueue.Forget(obj)
return true
}
func (c *Controller) enqueueFoo(obj interface{}) {
foo := obj.(*samplecontrollerv1alpha1.Foo)
key, err := cache.MetaNamespaceKeyFunc(foo)
if err != nil {
return
}
c.workqueue.Add(key)
}
func (c *Controller) enqueueFooForDelete(obj interface{}) {
foo, ok := obj.(*samplecontrollerv1alpha1.Foo)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
foo, ok = tombstone.Obj.(*samplecontrollerv1alpha1.Foo)
if !ok {
return
}
}
key, err := cache.MetaNamespaceKeyFunc(foo)
if err == nil {
c.workqueue.Add(key)
}
}
Controller Runtime
Controller-runtime是Kubernetes官方提供的一种Go语言工具包,用于编写自定义控制器。该工具包基于Kubernetes API机制,封装了常见的控制器开发模式,如Watch Reconcile、Leader Election、Event Emitting等。
Controller-runtime为Kubernetes控制器的开发提供了很多便利,使得开发者可以更加专注于核心业务逻辑的实现。
Controller-runtime Architecture
- Manager:管理器
Manager 是整个框架的核心,其负责以下几个任务:
- 创建并启动各种控制器(Controller)
- 启动 Web 服务器,提供健康检查和指标监控等服务
- 提供客户端(Client)实例,用于操作 kubernetes API
- 可以通过配置文件或环境变量来自定义控制器运行参数
- Client:客户端
Client 是操作 kubernetes API 的客户端,通常情况下我们使用 kubernetes/client-go 库即可,但是在 controller-runtime 框架中,它对原生的 client-go 库进行了封装,提供了更加易用的接口,同时还支持对多个版本的 API 对象进行操作。
- Cache:缓存
Cache 是用于缓存 kubernetes API 对象的组件,它可以高效地从 kubernetes API 中获取对象,并将其同步到内存中,以便控制器快速地获取和处理对象。
- Controller:控制器
Controller 是控制器的核心组件,用于监听和处理 kubernetes API 对象的变化,并在需要时调用 Reconciler 进行协调。
- Reconciler:协调器
Reconciler 是一个接口,用于协调处理 kubernetes API 对象的状态。在 controller-runtime 中,每个控制器都必须关联一个 Reconciler 实现,以便在对象出现变化时调用该 Reconciler 进行处理。Reconciler 接口定义如下:
type Reconciler interface {
// 计算出与期望状态不同的部分
Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
}
在 Reconcile 函数中,我们可以根据业务逻辑计算出当前对象的期望状态,然后与 kubernetes API 中的实际状态进行比较,如果两者不同,则更新实际状态,否则直接返回。
Cotroller-runtime 提供了一套完整的架构,封装了大量的功能代码,让开发 controller 变得更加简单方便。在使用 controller-runtime 构建 controller 时,我们继续只需要关注 Reconciler接口的实现即可。
Write Controller with controller-runtime
import (
"context"
samplecontrollerv1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
func main() {
// 获取 kubernetes 集群的 rest.Config 对象
config, err := rest.InClusterConfig()
if err != nil {
panic(err)
}
// 创建 kubernetes API 对象的编解码器
scheme := runtime.NewScheme()
samplecontrollerv1alpha1.AddToScheme(scheme)
// 创建 manager 实例
mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
})
if err != nil {
panic(err)
}
// 创建 Reconciler 实现
reconciler := &FooReconciler{
client: mgr.GetClient(),
scheme: scheme,
}
// 创建控制器对象并注册到manager
ctrl.NewControllerManagedBy(mgr).
For(&samplecontrollerv1alpha1.Foo{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(reconciler)
// 启动 manager,开始运行 controller
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
panic(err)
}
}
// FooReconciler 实现了 Reconciler 接口
type FooReconciler struct {
client client.Client
scheme *runtime.Scheme
}
func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// 获取 kubernetes API 对象
foo := &samplecontrollerv1alpha1.Foo{}
if err := r.client.Get(ctx, req.NamespacedName, foo); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// 更新或创建 kubernetes API 对象
if err := controllerutil.SetControllerReference(foo, foo, r.scheme); err != nil {
return ctrl.Result{}, err
}
_, err := r.client.CreateOrUpdate(ctx, foo)
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}