在 Project1 中,需要构建一个单机键值存储的 gRPC 服务器,并且支持 column familly(CF),CF 类似于一个命名空间,不同 CF 中的同名的键是不同的,可以简单的将不同的CF理解成不同的mini databases。
目标与层次结构
Project1主要是实现kv
中的部分代码来完成单机KV数据库服务器Server
。它的层次结构如下图所示:
Server结构体为:
1 | type Server struct { |
它有RawGet()
、RawPut()
、RawDelete()
、RawScan()
几个主要的方法,借助Storage
的Reader()
和Write()
方法来实现。
Storage是一个接口,它的定义如下 1
2
3
4
5
6type Storage interface {
Start() error
Stop() error
Write(ctx *kvrpcpb.Context, batch []Modify) error
Reader(ctx *kvrpcpb.Context) (StorageReader, error)
}Storage
接口的一个实现,它的定义如下:
1 | type StandAloneStorage struct { |
它Reader()
和Write()
方法的实现用到了engine_util.Engines
的方法,engine_util.Engines
实际上是对 badger键值API的一个封装:
1 | type Engines struct { |
对Engines
中键值的获取、插入、删除等操作,以及事务、迭代器BadgerIterator
都在kv/util/engine_util
中实现,Storage
的读写操作都需要依赖这些API实现。
代码实现
了解完整体的目标和实现逻辑后,就可以实现具体的代码了。
首先我们定义好单机存储StandAloneStorage
结构体: 1
2
3
4
5type StandAloneStorage struct {
// Your Data Here (1).
engines *engine_util.Engines
config *config.Config
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16func NewStandAloneStorage(conf *config.Config) *StandAloneStorage {
// Your Code Here (1).
kvPath := conf.DBPath + "/kv"
raftPath := conf.DBPath + "/raft"
kvEngine := engine_util.CreateDB(kvPath, false)
var raftEngine *badger.DB
if conf.Raft {
raftEngine = engine_util.CreateDB(raftPath, true)
}
engines := engine_util.NewEngines(kvEngine, raftEngine, kvPath, raftPath)
return &StandAloneStorage{
engines: engines,
config: conf,
}
}Storage
接口,完成Start()
、Stop()
、Reader()
、Write()
四个函数,为Server提供基本的读写操作。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37func (s *StandAloneStorage) Start() error {
// Your Code Here (1).
return nil
}
func (s *StandAloneStorage) Stop() error {
// Your Code Here (1).
return s.engines.Close()
}
func (s *StandAloneStorage) Reader(ctx *kvrpcpb.Context) (storage.StorageReader, error) {
// Your Code Here (1).
txn := s.engines.Kv.NewTransaction(false)
return &StandAloneStorageReader{
txn: txn,
}, nil
}
func (s *StandAloneStorage) Write(ctx *kvrpcpb.Context, batch []storage.Modify) error {
// Your Code Here (1).
for _, modify := range batch {
switch modify.Data.(type) {
case storage.Put:
err := engine_util.PutCF(s.engines.Kv, modify.Cf(), modify.Key(), modify.Value())
if err != nil {
return err
}
case storage.Delete:
err := engine_util.DeleteCF(s.engines.Kv, modify.Cf(), modify.Key())
if err != nil {
return err
}
}
}
return nil
}Reader
的实现需要用到badger.Txn
,它为我们提供了键值的一致性快照,可以随时供之后读取。为此,我们要实现StorageReader
接口: 1
2
3
4
5
6type StorageReader interface {
// When the key doesn't exist, return nil for the value
GetCF(cf string, key []byte) ([]byte, error)
IterCF(cf string) engine_util.DBIterator
Close()
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19type StandAloneStorageReader struct {
txn *badger.Txn
}
func (s *StandAloneStorageReader) GetCF(cf string, key []byte) ([]byte, error) {
value, err := engine_util.GetCFFromTxn(s.txn, cf, key)
if err == badger.ErrKeyNotFound {
return nil, nil
}
return value, err
}
func (s *StandAloneStorageReader) IterCF(cf string) engine_util.DBIterator {
return engine_util.NewCFIterator(cf, s.txn)
}
func (s *StandAloneStorageReader) Close() {
s.txn.Discard()
}StandAloneStorage
后,便可以实现Server的RawGet()
、RawPut()
、RawDelete()
、RawScan()
四个方法,完成用于的各种数据库操作请求。
1 | func (server *Server) RawGet(_ context.Context, req *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) { |
测试
完成以上代码后可在根目录下运行 make project1
来对代码进行测试。
在代码测试中可以帮助我们解决很多bug,比如有
- 在获取错误的字符串时使用了
err.Error()
,但是err
可能为空,需要进行判断 - 在实现
StandAloneStorageReader
的GetCF
函数时,如果出现了ErrKeyNotFound
错误,需要返回空值而不是返回错误 - 迭代器
BadgerIterator
的Valid
函数是判断当前位置是否有效,而不是判断Next是否有效