kine 中的存储 Driver 最终都需要对接 `Log` 接口。下面以 SQLite 为例，分析 kine 如何基于关系数据库实现 MVCC 存储。
// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/logstructured/logstructured.go
// Log is the storage abstraction that kine's backend drivers ultimately implement.
// Per-method notes below are reviewer annotations, not upstream doc comments.
type Log interface {
// Start starts the log backend.
Start(ctx context.Context) error
// CurrentRevision returns the current global revision (for SQL backends: the max row id).
CurrentRevision(ctx context.Context) (int64, error)
// List returns events for keys under prefix; revision 0 lists the current state.
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
// After returns events under prefix after the given revision, at most limit of them
// (presumably used for watch catch-up — confirm against callers).
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
// Watch streams batches of events for keys under prefix.
Watch(ctx context.Context, prefix string) <-chan []*server.Event
// Count counts keys under prefix; NOTE(review): the two int64 results are presumably
// (revision, count) — confirm in the implementation.
Count(ctx context.Context, prefix string) (int64, int64, error)
// Append persists one event and returns the revision assigned to it.
Append(ctx context.Context, event *server.Event) (int64, error)
// DbSize reports the size of the underlying database.
DbSize(ctx context.Context) (int64, error)
}
表设计和字段映射
// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/drivers/sqlite/sqlite.go
CREATE TABLE IF NOT EXISTS kine
(
id INTEGER PRIMARY KEY AUTOINCREMENT, -- revision of this record (KV.ModRevision); MAX(id) is the global revision
name INTEGER, -- the key (KV.Key); declared INTEGER, but SQLite type affinity still stores non-numeric key strings as TEXT
created INTEGER, -- 1 when this record creates the key (Event.Create)
deleted INTEGER, -- 1 when this record deletes the key (Event.Delete)
create_revision INTEGER, -- id of the key's creating record; 0 on the creating record itself
prev_revision INTEGER, -- id of the previous record of this key (PrevKV.ModRevision); unique together with name
lease INTEGER, -- lease attached to this version (KV.Lease)
value BLOB, -- value at this revision (KV.Value)
old_value BLOB -- value at the previous revision (PrevKV.Value)
)
// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/server/types.go
// KeyValue is one version of a key; each version maps to one row of the kine table.
type KeyValue struct {
Key string // stored in the name column
CreateRevision int64 // revision at which the key was created (create_revision)
ModRevision int64 // revision of this version, i.e. the row id
Value []byte // value column
Lease int64 // lease column
}
// Event is one change to a key, carrying the new version and the version it replaces.
type Event struct {
Delete bool // true when this change deletes the key (deleted column)
Create bool // true when this change creates the key (created column)
KV *KeyValue // the new version
PrevKV *KeyValue // the replaced version: ModRevision -> prev_revision, Value -> old_value
}
| Event 字段 | 表字段 |
|---|---|
| KV.ModRevision | id |
| KV.Key | name |
| Create | created |
| Delete | deleted |
| KV.CreateRevision | create_revision |
| PrevKV.ModRevision | prev_revision |
| KV.Lease | lease |
| KV.Value | value |
| PrevKV.Value | old_value |
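- 主键自增 id 即该记录对应 KeyValue 的 ModRevision，也就是这次写入产生的 revision。
- KeyValue 创建记录：created=1、deleted=0、create_revision=0（创建记录自身的 id 就是该 key 的 CreateRevision）。prev_revision 并非固定为 0：它取创建前读到的 revision；若该 key 之前被删除过，则为那条删除记录的 id。否则同一个 key 删除后再次创建时，新记录会与最初的创建记录在 (name, prev_revision) 唯一索引上冲突。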
- KeyValue 更新记录：created=0、deleted=0，create_revision 为该 key 创建记录（created=1）的 id，prev_revision 为上一条记录的 id。
- KeyValue 删除记录：deleted=1，create_revision 为创建记录的 id，prev_revision 为上一条记录的 id。
- value 与 old_value 字段分别保存当前版本和上一版本的 KeyValue 值。
MVCC实现
查询Key
获取 key 的当前版本：先按 name 分组，取每个 key 最新（id 最大）的一条记录，再过滤掉 deleted=1 的记录。若某个 key 的最新记录是删除记录，则该 key 视为不存在。
-- Latest version of every key whose name matches the LIKE pattern:
-- first pick MAX(id) per name, then drop keys whose latest record is a deletion.
SELECT *
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) AS id FROM kine AS mkv WHERE mkv.name LIKE ? GROUP BY mkv.name
) AS maxkv ON maxkv.id = kv.id
WHERE
kv.deleted = 0
写入Key
乐观、无锁、无需事务的并发安全写入：在 (name, prev_revision) 上建立联合唯一索引 `kine_name_prev_revision_uindex`。所有通过 `Log.Append` 写入的 Event 都携带 `PrevKV` 信息，`PrevKV.ModRevision` 作为新记录的 `prev_revision` 字段写入。若多个写入基于同一个前序版本并发提交，它们的 (name, prev_revision) 完全相同，唯一索引冲突会使其中只有一条写入成功，其余写入报错。
// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/drivers/sqlite/sqlite.go
// Unique (name, prev_revision): two writes derived from the same previous version
// of a key collide on this index, so at most one of them is inserted.
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`
//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/logstructured/logstructured.go
// Update writes a new version of key (excerpt; "..." marks code omitted upstream).
// NOTE(review): how the `revision` argument is checked lives in the elided code.
func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) {
...
// Read the key's current version; it becomes PrevKV of the new event.
rev, event, err := l.get(ctx, key, "", 1, 0, false)
...
updateEvent := &server.Event{
KV: &server.KeyValue{
Key: key,
// An update keeps the key's original creation revision.
CreateRevision: event.KV.CreateRevision,
Value: value,
Lease: lease,
},
// PrevKV.ModRevision is stored as prev_revision; a concurrent writer that read the
// same version produces the same (name, prev_revision) and fails on the unique index.
PrevKV: event.KV,
}
rev, err = l.log.Append(ctx, updateEvent)
...
}
获取全局Revision
表最大id作为全局Revision。
//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/drivers/generic/generic.go
-- Global revision = largest row id in the table.
SELECT MAX(rkv.id) AS id
FROM kine AS rkv
获取全局CompactRevision
内置 key `compact_rev_key` 用于记录已压缩到的版本：该 key 各条记录的 prev_revision 字段保存压缩版本，取其最大值即为当前 CompactRevision。
// https://github.com/k3s-io/kine/blob/v0.9.8/pkg/drivers/generic/generic.go
-- Compact revision: the largest prev_revision among rows of the internal 'compact_rev_key'.
SELECT MAX(crkv.prev_revision) AS prev_revision
FROM kine AS crkv
WHERE crkv.name = 'compact_rev_key'
获取Key列表
//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/logstructured/sqllog/sql.go
// List returns key events under prefix: revision 0 lists the current state, otherwise
// the state as of that revision (excerpt; "..." marks code omitted upstream).
func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (int64, []*server.Event, error) {
...
// Fetch the key list.
if revision == 0 {
rows, err = s.d.ListCurrent(ctx, prefix, limit, includeDeleted)
} else {
rows, err = s.d.List(ctx, prefix, startKey, limit, revision, includeDeleted)
}
if err != nil {
return 0, nil, err
}
// NOTE(review): turning rows into rev/compact/result happens in code elided from this excerpt.
// Handle the "requested revision was compacted" case.
if revision > 0 && len(result) == 0 {
// a zero length result won't have the compact revision so get it manually
compact, err = s.d.GetCompactRevision(ctx)
if err != nil {
return 0, nil, err
}
}
// The requested revision is older than the compaction point, so its history is gone.
if revision > 0 && revision < compact {
return rev, result, server.ErrCompacted
}
...
}
-- Shape of the List query, presumably the current-state variant since it has no revision filter.
-- (rev), (compactRev) and columns stand for the revision sub-selects and the column list;
-- [?x] marks a bound parameter. Like the single-key query, it takes the latest record per
-- name and keeps it only if it is not a deletion, unless includeDeleted is set.
SELECT *
FROM (
SELECT (rev), (compactRev), columns
FROM kine AS kv
JOIN (
SELECT MAX(mkv.id) AS id
FROM kine AS mkv
WHERE
mkv.name LIKE [?keyPrefix]
GROUP BY mkv.name) AS maxkv
ON maxkv.id = kv.id
WHERE
kv.deleted = 0 OR
[?includeDeleted]
) AS lkv
-- NOTE(review): theid is presumably the alias of kv.id inside `columns`; confirm in generic.go.
ORDER BY lkv.theid ASC
LIMIT [?limit]
Watch实现
Watch 的 gRPC 接口复用 etcd 的 API 定义（etcdserverpb），服务端处理逻辑由 kine 自行实现（`KVServerBridge.Watch`）。`SQLLog` 内部通过 `s.notify` 通道通知与定时轮询（默认每秒一次）两种方式触发查询，从而实现 Watch 事件推送。
//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/server/watch.go
// Watch serves one etcd Watch gRPC stream (excerpt; "..." marks code omitted upstream).
func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
w := watcher{
server: ws,
backend: s.limited.backend,
// NOTE(review): presumably watch ID -> cancel func for active watches; confirm in watch.go.
watches: map[int64]func(){},
}
...
// Dispatch each client request: a create request starts a watch, a cancel request stops one.
for {
...
if msg.GetCreateRequest() != nil {
w.Start(ws.Context(), msg.GetCreateRequest())
} else if msg.GetCancelRequest() != nil {
...
}
}
}
...
// Start begins a watch for the create request and consumes backend events in a
// goroutine (excerpt; what is done with each batch is elided here).
func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) {
...
go func() {
...
// Each receive is a batch of events from the backend, starting at r.StartRevision.
for events := range w.backend.Watch(ctx, key, r.StartRevision) {
...
}
...
}()
}
//https://github.com/k3s-io/kine/blob/v0.9.8/pkg/logstructured/sqllog/sql.go
// Watch subscribes to the broadcaster and returns a channel of event batches for prefix
// (excerpt; prefix filtering and channel setup are elided).
func (s *SQLLog) Watch(ctx context.Context, prefix string) <-chan []*server.Event {
// The broadcaster calls back into startWatch internally to obtain its event source.
values, err := s.broadcaster.Subscribe(ctx, s.startWatch)
...
return res
}
// startWatch builds the broadcaster's event source. It starts the compactor and the
// poll loop, with polling beginning at the current compact revision (excerpt; c is
// created in the elided code).
func (s *SQLLog) startWatch() (chan interface{}, error) {
pollStart, err := s.d.GetCompactRevision(s.ctx)
...
// NOTE(review): the compactor goroutine is launched from this watch-setup path.
go s.compactor(compactInterval)
go s.poll(c, pollStart)
return c, nil
}
// poll is the loop that feeds result; it closes result when it returns (excerpt; the
// query and emit logic is elided). While waiting it returns once s.ctx is cancelled.
func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
...
// Fallback trigger: re-check for new rows at least once per second.
wait := time.NewTicker(time.Second)
defer wait.Stop()
defer close(result)
for {
if waitForMore {
// Wait for shutdown, a notification, or the next tick.
select {
case <-s.ctx.Done():
return
case check := <-s.notify:
// Notification for a revision already processed: skip and wait again.
if check <= last {
continue
}
case <-wait.C:
}
}
...
}
}