kine中的存储Driver最终都需要对接Log接口。下面以SQLite为例,分析kine如何基于关系数据库实现MVCC存储(mvccdb)。

// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/logstructured/logstructured.go

// Log is the interface a kine storage driver ultimately has to satisfy; the
// logstructured layer (e.g. LogStructured.Update) is built on top of it.
type Log interface {
    // Start prepares the log for use (presumably launching background work — confirm in the implementation).
    Start(ctx context.Context) error
    // CurrentRevision returns the global revision; for the SQL backend this is derived from MAX(id) of the kine table.
    CurrentRevision(ctx context.Context) (int64, error)
    // List returns events for keys matching prefix, starting at startKey and capped at limit,
    // either at the current state (revision == 0) or as of revision; includeDeletes controls
    // whether delete records are returned. The first result is a revision.
    List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
    // After returns events for prefix relative to revision (presumably those newer than it — confirm), capped at limit.
    After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
    // Watch returns a channel that delivers batches of events for prefix.
    Watch(ctx context.Context, prefix string) <-chan []*server.Event
    // Count returns two int64 values for prefix (presumably revision and number of keys — confirm).
    Count(ctx context.Context, prefix string) (int64, int64, error)
    // Append stores one event as a new record and returns the revision assigned to it.
    Append(ctx context.Context, event *server.Event) (int64, error)
    // DbSize reports the size of the backing database (unit not shown here; presumably bytes).
    DbSize(ctx context.Context) (int64, error)
}

表设计和字段映射

// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/drivers/sqlite/sqlite.go
-- kine stores every version of every key as one appended row in this single table.
CREATE TABLE IF NOT EXISTS kine
(
    id INTEGER PRIMARY KEY AUTOINCREMENT, -- revision of this row (KV.ModRevision); AUTOINCREMENT never reuses ids, even after rows are deleted
    name INTEGER, -- the key (KV.Key); declared INTEGER yet holds key strings, which SQLite's type affinity permits (non-numeric text is stored as TEXT)
    created INTEGER, -- Event.Create flag
    deleted INTEGER, -- Event.Delete flag
    create_revision INTEGER, -- KV.CreateRevision
    prev_revision INTEGER, -- PrevKV.ModRevision, i.e. the id of the previous version's row
    lease INTEGER, -- KV.Lease
    value BLOB, -- KV.Value (this version)
    old_value BLOB -- PrevKV.Value (previous version)
)

// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/server/types.go

// KeyValue is one version of a key as seen by kine's server layer.
type KeyValue struct {
    Key            string // stored in the name column
    CreateRevision int64  // id of the row that created the key (create_revision column)
    ModRevision    int64  // revision of this version, i.e. the row id
    Value          []byte // stored in the value column
    Lease          int64  // stored in the lease column
}

// Event is a single change to a key; each Event is persisted as one row of the kine table.
type Event struct {
    Delete bool      // stored in the deleted column
    Create bool      // stored in the created column
    KV     *KeyValue // the new version
    PrevKV *KeyValue // the previous version; its ModRevision becomes prev_revision and its Value becomes old_value
}
| Event 字段 | 表字段 |
|---|---|
| KV.ModRevision | id |
| KV.Key | name |
| Create | created |
| Delete | deleted |
| KV.CreateRevision | create_revision |
| PrevKV.ModRevision | prev_revision |
| KV.Lease | lease |
| KV.Value | value |
| PrevKV.Value | old_value |
  • 自增主键id即该记录对应KeyValue的Revision(KV.ModRevision)。
  • KeyValue创建记录:created=1、deleted=0、create_revision=0、prev_revision=0。
  • KeyValue更新记录:create_revision为该key创建记录(created=1)的id,prev_revision为前一条记录的id。
  • KeyValue删除记录:deleted=1,create_revision为创建记录的id,prev_revision为前一条记录的id。
  • value和old_value字段分别记录当前版本和上一版本的KeyValue值。

MVCC实现

查询Key

获取当前版本KeyValue:先按name分组,取每个key的最新记录(id最大);再过滤掉deleted=1的记录。注意,过滤发生在取最新记录之后。因此若最新记录是删除记录,该key视为不存在,而不会回退到更早的未删除版本。

-- Current version of each key whose name matches the LIKE pattern,
-- omitting keys whose latest record is a deletion.
SELECT *
FROM kine AS kv
JOIN (
   -- newest row (highest id = highest revision) per key name
   SELECT MAX(mkv.id) AS id FROM kine AS mkv WHERE mkv.name LIKE ? GROUP BY mkv.name
) AS maxkv ON maxkv.id = kv.id
WHERE
-- applied after picking the newest row, so a deleted key yields no row
-- instead of falling back to an older, still-live version
kv.deleted = 0

写入Key

写入采用乐观并发控制,无需加锁或事务即可保证并发安全:在name和prev_revision上建立联合唯一索引。

所有通过Log.Append写入的Event都携带PrevKV信息,PrevKV.ModRevision作为新记录的prev_revision字段。当多个写入方基于同一版本并发写入同一key时,它们插入的(name, prev_revision)相同,会触发kine_name_prev_revision_uindex联合唯一索引冲突而报错,最终只有一条写入成功。

// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/drivers/sqlite/sqlite.go

// Unique (name, prev_revision): concurrent writers that built on the same previous
// revision of a key insert the same pair, so only one of their INSERTs succeeds.
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`

//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/logstructured/logstructured.go

// Update writes a new version of key on top of the key's current version.
// Abridged excerpt: "..." marks code omitted from this article.
func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) {
	...
    // Read the key's current version (arguments presumably mirror List:
    // startKey "", limit 1, revision 0 = current, deletes excluded — confirm).
    rev, event, err := l.get(ctx, key, "", 1, 0, false)
	...
    // The new version keeps the original create revision; ModRevision is left
    // unset because the revision is the id of the row Append inserts.
    updateEvent := &server.Event{
        KV: &server.KeyValue{
            Key:            key,
            CreateRevision: event.KV.CreateRevision,
            Value:          value,
            Lease:          lease,
        },
        // The version just read becomes PrevKV, so its ModRevision is written as
        // the new row's prev_revision; a concurrent writer that read the same
        // version collides on the (name, prev_revision) unique index.
        PrevKV: event.KV,
    }

    // Append returns the revision assigned to the newly inserted row.
    rev, err = l.log.Append(ctx, updateEvent)
   ...
}

获取全局Revision

表最大id作为全局Revision。

//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/drivers/generic/generic.go

-- Global revision = highest row id, since every write appends a row.
SELECT MAX(rkv.id) AS id
FROM kine AS rkv

获取全局CompactRevision

kine使用内置key compact_rev_key记录已压缩的版本号,该版本号保存在这条记录的prev_revision字段中。

// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/drivers/generic/generic.go

-- Compacted revision: stored in the prev_revision column of rows for the
-- reserved key 'compact_rev_key'; MAX returns the highest recorded value.
SELECT MAX(crkv.prev_revision) AS prev_revision
FROM kine AS crkv
WHERE crkv.name = 'compact_rev_key'

获取Key列表

//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/logstructured/sqllog/sql.go

// List returns the events for keys under prefix, either at the current state
// (revision == 0) or as of a historical revision.
//
// Abridged excerpt: rows, err, result, rev and compact are declared and filled
// in the omitted ("...") code — presumably result/rev/compact come from
// scanning rows, which carry the (rev) and (compactRev) columns of the list query.
func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (int64, []*server.Event, error) {
  	...
	// Fetch the key list: the current state when revision is 0, otherwise the
	// state as of the requested revision.
    if revision == 0 {
        rows, err = s.d.ListCurrent(ctx, prefix, limit, includeDeleted)
    } else {
        rows, err = s.d.List(ctx, prefix, startKey, limit, revision, includeDeleted)
    }
    if err != nil {
        return 0, nil, err
    }

	// Detect reads of a revision that has already been compacted.
    if revision > 0 && len(result) == 0 {
        // a zero length result won't have the compact revision so get it manually
        compact, err = s.d.GetCompactRevision(ctx)
        if err != nil {
            return 0, nil, err
        }
    }

    // A requested revision older than the compact revision is rejected with ErrCompacted.
    if revision > 0 && revision < compact {
        return rev, result, server.ErrCompacted
    }
	...
}
-- List query template (presumably the ListCurrent variant, since it has no revision
-- bound — confirm in generic.go): newest row per matching key, optionally keeping
-- deletions, ordered by revision.
-- (rev), (compactRev) and columns are placeholders filled in by the Go code;
-- [?...] are bind parameters.
SELECT *
FROM (
	SELECT (rev), (compactRev), columns
	FROM kine AS kv
	JOIN (
		-- newest row (highest id) per key name under the prefix
		SELECT MAX(mkv.id) AS id
		FROM kine AS mkv
		WHERE
			mkv.name LIKE [?keyPrefix]
		GROUP BY mkv.name) AS maxkv
		ON maxkv.id = kv.id
	WHERE
		-- keep live keys, or everything when the includeDeleted flag is true
		kv.deleted = 0 OR
		[?includeDeleted]
) AS lkv
-- theid is presumably the kv.id alias produced by `columns` — confirm
ORDER BY lkv.theid ASC
LIMIT [?limit]

Watch实现

Watch的gRPC接口复用etcd的协议定义(etcdserverpb),由kine自行实现服务端的Watch处理逻辑(KVServerBridge.Watch)。

SQLLog内部实现Watch时,结合写入通知(s.notify)与定时轮询(每秒一次)来推送Watch事件。

//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/server/watch.go

// Watch serves an etcd Watch gRPC stream. Each create request starts a watch
// via watcher.Start; cancel requests are handled in the omitted code.
// Abridged excerpt: msg is presumably received from ws in the omitted "..." code.
func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
	w := watcher{
		server:  ws,
		backend: s.limited.backend,
		watches: map[int64]func(){}, // presumably watch id -> cancel func — confirm
	}
	...
	for {
		...
		if msg.GetCreateRequest() != nil {
			w.Start(ws.Context(), msg.GetCreateRequest())
		} else if msg.GetCancelRequest() != nil {
			...
		}
	}
}
...

// Start begins one watch in its own goroutine, consuming the backend's event
// channel for the requested key starting at r.StartRevision.
// Abridged excerpt: key is derived from r in the omitted code.
func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) {
	...
	go func() {
		...
		// The range loop ends when the backend closes the channel.
		for events := range w.backend.Watch(ctx, key, r.StartRevision) {
			...
		}
		...
	}()
}
//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/logstructured/sqllog/sql.go

// Watch subscribes to the SQLLog's broadcaster and returns a channel of event
// batches for prefix. Abridged excerpt: res and the per-prefix handling are
// built in the omitted code.
func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Event {
    // The broadcaster internally calls back startWatch to obtain its source channel.
    values, err := s.broadcaster.Subscribe(ctx, s.startWatch)
	...
    return res
}

// startWatch is the source factory passed to the broadcaster. It starts the
// compactor and the poll loop, seeding polling from the current compact
// revision, and returns the channel that poll writes to (and closes).
// Abridged excerpt: c and compactInterval come from the omitted code.
func (s *SQLLog) startWatch() (chan interface{}, error) {
    pollStart, err := s.d.GetCompactRevision(s.ctx)
   ...
    go s.compactor(compactInterval)
    go s.poll(c, pollStart)
    return c, nil
}

// poll feeds new events into result. While waiting, it wakes on a write
// notification (s.notify) newer than the last revision seen, or on a
// one-second ticker as a fallback. It returns, closing result, when s.ctx is done.
// Abridged excerpt: waitForMore and last are maintained in the omitted code.
func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
	...
    wait := time.NewTicker(time.Second)
    defer wait.Stop()
    defer close(result)

    for {
        if waitForMore {
            select {
            case <-s.ctx.Done():
                return
            case check := <-s.notify:
                // Notification for a revision already processed: skip this iteration.
                if check <= last {
                    continue
                }
            case <-wait.C:
                // Periodic fallback, presumably for writes not signalled via s.notify.
            }
        }
		...
	}
}