本文为MIT 6.824 Lab3的实战经验总结。

项目地址:https://github.com/TimeSea05/mit-6.824

Lab3A的第一个任务是在Lab2的基础上,在没有丢包、服务器崩溃的情况下,实现键值存储系统。在理清思路后,实现起来相对比较容易。Clerk会向集群中的KV server发送RPC请求,来从集群中获取某个key对应的值(Get),或者修改数据库中的key/value pair (PutAppend)。当一个非Leader的KV server收到来自Clerk的RPC请求时,应该返回错误ErrWrongLeader;Clerk在接收到来自KV server的错误信息后,向另外一个KV server发送同样的RPC请求,直到收到RPC请求的KV server为集群中的Leader,此时Leader会将该请求加入Log,并在集群中开始同步,同步完成后返回成功。

实际上,为了提高效率,在RPC返回成功时,Clerk应该记录下刚才向哪一个KV server发送RPC请求,也就是记录下那一个KV server是集群当中的Leader,这样在发送下一个RPC请求时,首先向之前记录下的KV server发送请求,这样就会大大减少搜索Leader浪费的时间。

Lab3A的第二个任务是,在存在丢包、服务器崩溃的情况下,实现键值存储系统。丢包会带来很多问题,比如Clerk向KV server发送PutAppend RPC请求,server返回成功,并对数据库做了相应的操作,但是请求在返回过程中丢失,导致Clerk重发请求,这会导致同样的请求执行两次或者多次,显然是违背常理的。

为了解决这个问题,就要实现raft-extended paper中的Linearizable Sematics:

Our goal for Raft is to implement linearizable semantics (each operation appears to execute instantaneously, exactly once, at some point between its invocation and its response).

Raft的作者在文中其实给出了一些解决方案,但当时自己没有认真读论文,闷着头自己想解决办法,最后失败而不得已求助于互联网,才知道论文里面已经给出了一些方案:

The solution is for clients to assign unique serial numbers to every command. Then, the state machine tracks the latest serial number processed for each client, along with the associated response. If it receives a command whose serial number has already been executed, it responds immediately without re-executing the request.

对每一个RPC请求进行编号,并且记录下已经执行过的操作的序号;如果执行过了,那么就直接返回,不再对数据库进行相应的操作。

但其实原文中给出的解决方案相对来说还是比较抽象和含糊的,落实到代码实现上还是没有什么头绪。于是我在互联网上继续寻找,找到了Raft高性能库dragonboat作者在知乎上的回答。看了他的回答,感觉如同醍醐灌顶,思路瞬间明晰了。

和Raft论文中所述的方案一样,需要为每一个RPC设置一个线性增长的ID,同时还需要为每一个Clerk设置一个ID。有了这两个ID之后,可以区分任意两个RPC请求。dragonboat作者回答的另外一个核心是,每一个RPC请求,可以被commit多次,在log中出现多次,但永远只会被apply一次。也就是说,在集群收到来自Clerk的RPC之后,我们不需要对其进行其他的处理,直接将其加入到Leader的Log中,开始同步即可;但是在读取被commit的log entry时,也就是要对数据库进行操作时,需要判断该log entry (op)是否已经被状态机执行过了,如果执行过,那么就忽略此次commit。这样就实现了幂等。

之前我想破头也想不出实现Linearizable Semantics的原因是,我是在集群刚刚收到来自Clerk RPC时,判断RPC请求执行的操作是否已经被执行过了,这样会带来一系列的问题和Bug,最终放弃。但是将上面的判断逻辑放到对数据库进行操作处之后,实现难度大大降低了。

首先来看KVServer的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()

maxraftstate int // snapshot if log grows this big

// Your definitions here.
term int // current term of raft peer
commitIndex int // committed log index of raft peer
cond *sync.Cond
db map[string]string

// clerkID -> `RPCID` of lastly executed RPC call
lastExecuted map[int]int

// waiting RPC calls
waitingReqs []ClientRequest
}

核心结构如上所示,为了实现线性语义,我添加了两个字段:对于每一个Clerk,lastExecuted[ClerkID]用来记录已经执行过的RPC ID的最大值;waitingReqs用来记录等待在cond上的RPC请求,用来精确唤醒线程。

其他的一些数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Op struct {
// Your definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
Type string // Get, Put or Append
Key string
Value string
ClerkID int
RPCID int
}

type ClientRequest struct {
ClerkID int
RPCID int
}

PutAppend RPC Handler为例,来说明整个键值存储系统的运行过程。

首先需要判断当前的KV server是否是Leader,如果不是则返回ErrWrongLeader,因为在Raft中,只有Leader能够发起同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
kv.mu.Lock()

term, isLeader := kv.rf.GetState()
if !isLeader {
reply.Err = ErrWrongLeader
raft.DebugLog(raft.DReplyPutOrAppend, kv.me, "Reject: %s", reply.Err)
kv.mu.Unlock()
return
}
// ...
}

之后需要判断term的值是否发生了变化;如果是,那么就代表集群中的term发生了改变,该KV server为新选举出来的Leader;无论该KV server之前是否担任过Leader,都应该调用kv.cond.Broadcast将等待在条件变量上的超时线程全部唤醒,来处理新的RPC请求,同时kv.waitingReq数组也应该清空。Clerk会丢弃来自这些超时线程的应答。

1
2
3
4
5
6
7
8
9
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// ...
if kv.term < term {
kv.term = term
kv.cond.Broadcast()
kv.waitingReqs = nil
}
// ...
}

此后调用kv.rf.Start将op记录到日志中,然后在条件变量kv.cond上等待,直到该op被commit之后被唤醒:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// ...
op := Op{
Type: args.Op,
Key: args.Key,
Value: args.Value,
ClerkID: args.ClerkID,
RPCID: args.RPCID,
}

kv.waitingReqs = append(kv.waitingReqs, ClientRequest{ClerkID: args.ClerkID, RPCID: args.RPCID})
// logging about RPC calls waiting on this key-value server
waitingReqsStr := "waitingReqs:["
for _, r := range kv.waitingReqs {
waitingReqsStr += fmt.Sprintf("%d:%d;", r.ClerkID, r.RPCID)
}
raft.DebugLog(raft.DReplyGet, kv.me, "%s]", waitingReqsStr)

kv.rf.Start(op)
kv.cond.Wait()
// ...
}

被唤醒之后,也是要进行同样的判断,看该KV server是否是Leader,以及判断term是否发生了改变;如果这些情况都符合要求,那么返回OK:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// ...
term, isLeader = kv.rf.GetState()
if !isLeader {
reply.Err = ErrWrongLeader
raft.DebugLog(raft.DReplyPutOrAppend, kv.me, "Reject: %s", reply.Err)
kv.mu.Unlock()
return
}

if kv.term < term {
kv.term = term
kv.cond.Broadcast()
kv.waitingReqs = nil
}

reply.Err = OK
raft.DebugLog(raft.DReplyPutOrAppend, kv.me, "Reply %s{%s:%s}: SUCCESS; db[%s]:%s",
args.Op, args.Key, args.Value, args.Key, kv.db[args.Key])
kv.mu.Unlock()
}

说了这么多,那么当线程在kv.cond上等待时,由哪一个线程来唤醒?Linearizable Semantics又是在哪里实现的?在另外一个函数KVServer.readApplyCh中。

每启动一个KV Server,都要启动一个相应的KVServer.readApplyCh线程,用来读取Raft peer commit的log entry,并对db作出相应的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func (kv *KVServer) readApplyCh() {
for msg := range kv.applyCh {
if kv.killed() {
return
}
kv.mu.Lock()

if msg.SnapshotValid { // install a snapshot
kv.ingestSnapshot(msg.Snapshot)
} else if msg.CommandValid { // commit a log entry
op := msg.Command.(Op)
kv.commitIndex = msg.CommandIndex

// check if this op has already been executed
if kv.lastExecuted[op.ClerkID] < op.RPCID {
switch op.Type {
case "Put":
kv.db[op.Key] = op.Value
case "Append":
kv.db[op.Key] += op.Value
}
raft.DebugLog(raft.DReplyPutOrAppend, kv.me, "(CK:%d,RPC:%d,Op:%s,Key:%s,Value:%s): db[%s]==%s",
op.ClerkID, op.RPCID, op.Type, op.Key, op.Value, op.Key, kv.db[op.Key])
raft.DebugLog(raft.DReplyGet, kv.me, "lastExecuted[%d]: %d -> %d", op.ClerkID,
kv.lastExecuted[op.ClerkID], op.RPCID)
kv.lastExecuted[op.ClerkID] = op.RPCID
}

if kv.maxraftstate != -1 && kv.rf.Persister.RaftStateSize() > kv.maxraftstate {
raft.DebugLog(raft.DSnapshot, kv.me, "Take Snapshot, Index:%d", kv.commitIndex)
kv.rf.Snapshot(kv.commitIndex, kv.takeSnapshot())
}

// if this kv server is not leader, then it should not wake up
// threads waiting on its conditional variable
_, isLeader := kv.rf.GetState()
if !isLeader {
kv.mu.Unlock()
continue
}

// Only when there are threads waiting on `kv.cond`: len(kv.waitingReqs > 0)
// and `ClerkID` && `RPCID` of the log entry commited just now is the same as
// the first thread waiting on `kv.cond`'s queue,
// can we use `kv.cond.Signal` to wake up the first thread waiting on `kv.cond`
if len(kv.waitingReqs) > 0 && kv.waitingReqs[0].ClerkID == op.ClerkID && kv.waitingReqs[0].RPCID == op.RPCID {
kv.cond.Signal()
kv.waitingReqs = kv.waitingReqs[1:]
}
}

kv.mu.Unlock()
}
}

这里的实现还涉及到Lab 3B的Snapshot的实现了,此处不进行展开,重点看非Snapshot部分。

此处判断一个OP或者RPC是否被执行过了,是比较op.RPCIDkv.lastExecuted[op.ClerkID];只有在kv.lastExecuted[op.ClerkID] < op.RPCID成立的情况下才可以执行op中的操作,并且将kv.lastExecyted[op.ClerkID]更新为op.RPCID

但是这样实现是不是有问题呢?按理说,kv.lastExecuted[op.ClerkID]应该记录所有已经被执行过的RPC的ID,而不是最近一次执行过的,每次判断RPC是否被执行过,都应该在这个数组中搜索op.RPCID。需要注意的是,RPCID的值是线性递增的,而且如果RPC请求失败的话,Clerk会不断重发相同ID的RPC请求。按如上所述,在它之前的RPCID为1, 2, ..., op.RPCID-1,都必定被执行过了,所以说我们只需要存储最近一次执行过的RPCID即可。

另外我们用kv.waitingReqs数组,实现了精确线程唤醒。为什么需要精确地唤醒线程?想想如下这样一个场景:

  • Clerk向集群发送了一个Put请求(Put{3:5})和Get请求(Get 3),两个相应的线程在kv.cond上等待commit,假设需要同步的这两个新的log entry的index分别为80,81
  • 当前集群中的Leader因为某种原因(如服务器崩溃重启),commitIndex比较落后,在不断地向前进行commit
  • 如果在commit index为70,71的时候,不小心将前面两个线程给唤醒了,但实际上index为80,81的log entry都没有被commit,相应的操作没有被执行,Clerk的Get请求(Get 3)得到的是旧值,违反了Linearizable Semantics

所以我们需要精确唤醒,只有等待在kv.cond上的第一个线程,和当前commit的log entry对应时,才可以调用kv.cond.Signal唤醒该线程,同时从kv.waitingReqs中去掉第一个元素。