本文是MIT 6.824 Distrubuted System的lab2的实战经验总结。

6.824的测试代码对raft实现的robustness提出了非常高的要求,以Test 1B为例,Test 1B测试的是分布式系统在存在节点断连重连的情况下,Leader Election能否正常工作。关于Raft算法中Leader Election的算法细节,在raft-extend论文中有非常详细的讲解,此处不再赘述。

Test 1B中,系统有3个节点,分别设为0, 1, 2。在选举出Leader (假设为0) 之后,立即将该节点从网络中断开。断开后剩余的两个节点 (1, 2) 不会收到来自原来Leader的Heart Beat,因此它们之间会重新进行选举。

假设节点1首先Election Timeout,节点1成为Candidate。由于节点1无从知晓节点0已经从网络中断开,所以它还是会想节点0发送RequestVote RPC。当然,节点1也会向节点2发送RequestVote RPC。在我的实现中,Candidate需要在收到所有的节点的RequestVote Reply之后,才能决定自己是否当选为Leader,但是由于节点0已经从网络中断开,向其发送RPC不会得到恢复,在相当长时间后 (> 2s),RPC call才会返回,结果为fail。所以在节点1等待RPC call返回的过程中,节点2又会发生Election Timeout,将其置为Candiate,开始新一轮的Leader Election。在旧一轮的Leader Election没有结束的情况下,再开启新的一轮Leader Election肯定是不符合raft-extended paper对Leader Election实现的表述的,因此我们的实现需要改进。

改进的思路大概就是,如果RPC call在超过一段时间 (RPCTimeout) 没有返回,就将Empty Reply返回给RPC call的调用者。

首先设置一个常量RPCTimeout,代表能够容忍的RPC call的最高延时。在后面的测试中,测试代码会设置分布式系统的longDelays字段为true,这样的话,远程节点在相应RPC call后,系统再过200ms才会把结果返回给RPC的调用者。所以此处我将其设置为400ms:

1
const RPCTimeout = 400 * time.Millisecond

每一次调用RPC与其他节点通信,都要启动一个单独的线程,防止阻塞,同时需要给RPC call做一层封装,如果RPC call超时,那么将其返回的结果丢弃:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// This wrapper is used to deal with RPC network latency issues
// Every time you need to make an RPC call, put all info you need into a RPCInfo struct,
// start a new `RPCTimeoutWapper` goroutine, pass that struct to the wrapper.
// The wrapper will make an RPC call, wait for its return
// When it returns, it will check if timeout occurs
// if not, put RPC reply into channel `replyCh`, else exit
func (rf *Raft) RPCWrapper(info RPCInfo, replyCh chan interface{}) {
info.startTime = time.Now()

switch args := info.args.(type) {
case RequestVoteArgs:
reply := info.reply.(RequestVoteReply)
rf.peers[info.peer].Call(info.name, &args, &reply)
info.reply = reply
case AppendEntriesArgs:
reply := info.reply.(AppendEntriesReply)
rf.peers[info.peer].Call(info.name, &args, &reply)
info.reply = reply
case InstallSnapshotArgs:
reply := info.reply.(int)
rf.peers[info.peer].Call(info.name, &args, &reply)
info.reply = reply
}

if time.Since(info.startTime) < RPCTimeout {
replyCh <- info.reply
}
}

结构体RPCInfo的定义如下:

1
2
3
4
5
6
7
8
9
10
11
// All you need to make an RPC call
// type of args and reply is `interface{}`
// to support all kinds of RPCArgs and RPCReply
type RPCInfo struct {
name string
peer int // which raft peer to send RPC to
startTime time.Time // time the RPC is called

args interface{}
reply interface{}
}

这里没有使用go 1.18提供的泛型,而是使用了被广为诟病的interface{}。由于笔者对go 1.18泛型不够熟悉,便只好采用interface{}来支持不同类型的RPC Args结构体。

在RPC wrapper中,使用go的type switch判断info.args的类型,不同的类型调用不同的RPC。如果RPC call没有超时,那么我们就将结果返回给调用者。这里的返回不是传统的return,而是将reply放到事先定义好的channel replyCh中;调用者在启动线程后便阻塞在replyCh这个channel上,在这个线程将reply传入replyCh后,线程解除阻塞,实现了所谓的"返回"。

在启动单独的线程进行RPC call的同时,需要启动另外一个线程RPCTimeoutHandler,这个线程检测到之前的RPC call超时,就会将一个Empty Reply返回给RPC的调用者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Every time you make an RPC call, you need to start a `RPCTimeoutHandler` thread
// if the RPC call you make time out, this thread will send an empty reply to `replyCh`
// which means the return value of RPC call made before is ignored
// If RPC call finished successfully, main thread(who start this ticker thread) will
// send a value to channel `rpcFinished` to tell the thread not to send a empty reply to `replyCh`
func (rf *Raft) RPCTimeoutHandler(replyCh chan interface{}, info RPCInfo, rpcFinished chan bool) {
time.Sleep(RPCTimeout)
if len(replyCh) == 0 && len(rpcFinished) == 0 {
switch info.reply.(type) {
case RequestVoteReply:
replyCh <- RequestVoteReply{}
case AppendEntriesReply:
replyCh <- AppendEntriesReply{}
case int:
replyCh <- 0
}
}
}

同样的,这个线程也是在检测到超时之后,将Empty Reply传入预先定义好的channel replyCh中。但是如果RPC call没有超时,那么这个线程就不应该将Empty Reply传入replyCh了,但是这个该如何实现呢?只需要事先定义好另外一个channel rpcFinished,当RPC caller收到了来自其他节点的回复是,就会将一个值传入rpcFinished,此时len(rpcFinished) == 1;该线程会检测len(rpcFinished)的值,如果不为0,也就是说RPC caller通知该线程,RPC已经完成,且没有超时,那么这个线程就不会将Empty Reply传入replyCh了。

一个使用该方法来解决RPC call高延迟的示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Make sure that every time you call this function
// you must hold `rf.mu`
func (rf *Raft) issueInstallSnapshotRPC(peer int) int {
args := InstallSnapshotArgs{
Term: rf.currentTerm,
LeaderID: rf.me,
LastIncludedIndex: rf.lastIncludedIdx,
LastIncludedTerm: rf.lastIncludedTerm,
Data: rf.persister.ReadSnapshot(),
}
rf.mu.Unlock()

snapshotStr := "non-nil"
if args.Data == nil {
snapshotStr = "nil"
}
DebugLog(dSnapshot, rf.me, "INSTALL Snapshot -> PEER %d; {T:%d,LLI:%d,LIT:%d,DATA:%s}",
peer, args.Term, args.LastIncludedIndex, args.LastIncludedTerm, snapshotStr)

var reply int
replyCh := make(chan interface{}, 1)
rpcInfo := RPCInfo{
peer: peer,
name: "Raft.InstallSnapshot",
args: args,
reply: reply,
}
rpcFinished := make(chan bool, 1)

go rf.RPCWrapper(rpcInfo, replyCh)
go rf.RPCTimeoutHandler(replyCh, rpcInfo, rpcFinished)

replyTerm := (<-replyCh).(int)
rpcFinished <- true

return replyTerm
}

可以看到,正如我们上面所说的,当一个节点需要通过RPC与其他节点进行通信的时候,首先定义两个channel replyChrpcFinished,用来实现线程之间的通信;之后启动这两个线程:RPCWrapperRPCTimeoutHandler;之后RPC caller会阻塞在replyCh上。当RPC 返回时,caller接触阻塞,继续向下执行,并向rpcFinished传入一个值通知RPCTimeoutHandlerRPC call结束。

一个需要注意的点是,为了支持各种RPC reply,replyCh的类型为chan interface{},从这个channel中取出的值需要进行类型断言才能使用。

在写博客的时候,突然发现了代码中的一个小问题:当RPC call超时,RPCTimeoutHandler线程会将Empty Reply返回给caller,此时caller继续向下执行,但还是会向rpcFinished传入一个值,但此时RPCTimeoutHandler线程已经退出,不需要再向这个channel中传值了。如果您想出了这个问题的解决办法,欢迎联系我。