Kine是k3s-io为实现轻量化部署kubernetes而实现的一个支持kubernetes api-server直接读写,数据持久化到SQLite、MySQL、Postgres等后端的中间件。

Kubernetes api-server使用etcd作为后端存储集群的元数据,并且其controller机制强依赖etcd的watch机制。

Kine实现对etcd的替代,完整实现了kubernetes api-server用到的etcd接口。

透过kine这个轻量的中间件,既可以深入部分etcd原理,又可以了解kubernetes核心controller机制背后的支撑。

Etcd到Kine

所谓知己知彼,要替换etcd,首先就要了解etcd的实现本身。

如图,etcd核心分三个模块:

  • gRPC KV Server 基于gRPC的接口层
  • Raft 实现强一致的Raft日志层
  • mvccdb 基于boltdb的mvcc存储层

Etcd的读写事务请求,客户端通过gRPC发送给etcd server,etcd server形成raft日志提交给raft集群,最后etcd server apply日志,持久化数据到boltdb。

Etcd实现可靠和高性能watch,一方面基于gRPC/http2网络,一方面是数据存储的mvcc机制。

Kine实现了部分etcd相同接口,gRPC Server部分复用了etcd的gRPC代码,没有实现Raft日志模块,但是抽象了Backend层对接后端不同存储Driver,存储Driver需要实现支持mvcc的读写。

Kine Server

kine复用了etcd的go.etcd.io/etcd/api/v3/etcdserverpb所有代码实现etcd gRPC Server。

//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/server/server.go

type KVServerBridge struct {
    limited *LimitedServer
}

...

func (k *KVServerBridge) Register(server *grpc.Server) {
    ...
    etcdserverpb.RegisterWatchServer(server, k)
    etcdserverpb.RegisterKVServer(server, k)
}

KVServerBridge 实现了gRPC功能接口函数。

//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/server/kv.go

func (k *KVServerBridge) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) {
   ...
    resp, err := k.limited.Range(ctx, r)
    if err != nil {
        logrus.Errorf("error while range on %s %s: %v", r.Key, r.RangeEnd, err)
        return nil, err
    }
    ...
    return rangeResponse, nil
}

LimitedServer对接Backend存储接口


// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/server/limited.go

type LimitedServer struct {
    backend Backend
    scheme  string
}

func (l *LimitedServer) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) {
    if len(r.RangeEnd) == 0 {
        return l.get(ctx, r)
    }
    return l.list(ctx, r)
}

// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/server/types.go

type Backend interface {
    Start(ctx context.Context) error
    Get(ctx context.Context, key, rangeEnd string, limit, revision int64) (int64, *KeyValue, error)
    Create(ctx context.Context, key string, value []byte, lease int64) (int64, error)
    Delete(ctx context.Context, key string, revision int64) (int64, *KeyValue, bool, error)
    List(ctx context.Context, prefix, startKey string, limit, revision int64) (int64, []*KeyValue, error)
    Count(ctx context.Context, prefix string) (int64, int64, error)
    Update(ctx context.Context, key string, value []byte, revision, lease int64) (int64, *KeyValue, bool, error)
    Watch(ctx context.Context, key string, revision int64) <-chan []*Event
    DbSize(ctx context.Context) (int64, error)
}


//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/server/get.go

func (l *LimitedServer) get(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) {
    if r.Limit != 0 && len(r.RangeEnd) != 0 {
        return nil, fmt.Errorf("invalid combination of rangeEnd and limit, limit should be 0 got %d", r.Limit)
    }
    rev, kv, err := l.backend.Get(ctx, string(r.Key), string(r.RangeEnd), r.Limit, r.Revision)
   ...
    return resp, nil
}

Kine Backend

kine在pkg/logstructuredpkg/drivers中实现对接各种后端存储。

LogStructured实现Server的Backend接口,通过Log接口和底层Driver对接。


// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/logstructured/logstructured.go

type Log interface {
    Start(ctx context.Context) error
    CurrentRevision(ctx context.Context) (int64, error)
    List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
    After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
    Watch(ctx context.Context, prefix string) <-chan []*server.Event
    Count(ctx context.Context, prefix string) (int64, int64, error)
    Append(ctx context.Context, event *server.Event) (int64, error)
    DbSize(ctx context.Context) (int64, error)
}

type LogStructured struct {
    log Log
}

func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) {
   ...
    rev, err = l.log.Append(ctx, updateEvent)
    ...
    return rev, updateEvent.KV, true, err
}

Backend最终向Driver通过Log.Append使用Event结构存储数据。

// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/server/types.go

type KeyValue struct {
    Key            string
    CreateRevision int64
    ModRevision    int64
    Value          []byte
    Lease          int64
}


type Event struct {
    Delete bool
    Create bool
    KV     *KeyValue
    PrevKV *KeyValue
}

kine中EventKeyValue设计了和Etcd中keyIndex和mvccpb.KeyValue类似的可以记录版本信息的数据结构。存储Driver在执行数据存储动作时候,需要对接相关字段。