Raft Lab 2B #
怎么发送日志 #
2b 是 Leader 接收客户端请求,将客户端请求里的 Command,写入到自己的 log 数组里,同时同步到其他节点的 log 数组,但是该怎么做呢?以下是笔者的一些思考过程:
- 客户端是调用的 Start 函数,因此可以考虑:在 Start 函数里,先写入到自己的 log,然后再调用一个 doAppendEntry 的函数,将日志同步到其他节点,最后将结果返回给客户端
- Follower 节点,通过和 PreLogIndex 和 PreLogTerm 的比较,可以确认当前这条日志是否可以同步到自身的 log 数组里
- 如果某个节点当前的日志比较落后,拒绝了这次的同步日志的请求,该怎么办呢?Leader 在同步日志时,维护一个 nextIndex 数组,如果某个节点
i
同步日志失败,则将 nextIndex [i]–,同时将 nextIndex[i] 到当前的日志,放到日志数组里,再发送给节点i
,直到成功为止(或者自身不是 leader 状态) - 每个Raft节点都需要维护自身的 commitIndex(表示同步到超过半数节点的 最大的 log 下标),其中 Leader 节点在超过半数的节点同步日志成功后,将自身的 commitIndex 增加,然后返回结果给客户端;Follower 节点在收到心跳请求后,根据 LeaderCommit 的信息,将自身的 commitIndex 增加
leader 选举修改 #
lab 2a 里面,是通过比较 candidate 与 当前节点的 term 大小,来确定是否给 candidate 投票
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.state = Follower
rf.votedFor = args.CandidateId
reply.Term = rf.currentTerm
reply.VoteGranted = true
} else {
reply.Term = rf.currentTerm
reply.VoteGranted = false
}
在 lab 2b 里面,在投票的请求里,带上了 Candidate 的 LastLogIndex 和 LastLogTerm ,因此当 Term 一样时,需要比较 LastLog 的大小
if args.Term > rf.currentTerm || (args.Term == rf.currentTerm &&
(args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastIndex))) {
rf.currentTerm = args.Term
rf.state = Follower
rf.votedFor = args.CandidateId
reply.Term = rf.currentTerm
reply.VoteGranted = true
} else {
reply.Term = rf.currentTerm
reply.VoteGranted = false
}
然而上面的比较其实是有 bug 的,没有办法通过全部的测试用例。比如下面这个场景
由 3 个节点组成的 raft 集群,节点 1 为 leader,节点 2 和节点 3 都是 follower。当节点 3 与节点 1、2 网络隔绝时:
- 节点 3 由于无法收到节点 1 传过来的心跳而超时,因此会发起投票,Term 不断自增
- 节点 1 和节点 2 正常接收客户端请求,处理客户端请求,logs不断增加
当节点 3 与节点 1、2 的网络恢复时,节点 3 发起投票,在上面的代码中,节点 1、2 会投票给节点 3,让节点 3 变为 Leader,然而节点 3 并没有全部的 logs,这与论文 5.4.1(Election restriction)的内容冲突。
因此上面的代码还需要再修改一下,在 args.Term > rf.currentTerm
时,将自身的状态变为 follower,然后比较 LastLog 的大小,再确认是否进行投票。
if args.Term > rf.currentTerm || (args.Term == rf.currentTerm &&
(args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastIndex))) {
rf.currentTerm = args.Term
rf.state = Follower
if args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastIndex) {
rf.votedFor = args.CandidateId
reply.Term = rf.currentTerm
reply.VoteGranted = true
} else {
rf.votedFor = -1
reply.Term = rf.currentTerm
reply.VoteGranted = false
}
} else {
reply.Term = rf.currentTerm
reply.VoteGranted = false
}
事实证明,上述代码不仅仅需要在 leader 选举接收投票请求 时修改,还需要在 AppendEntry Request 接收心跳日志 的地方修改,不然 2A 就跑不过了
config.go 里面的 logs 为空的问题 #
config.go 里面会校验每个 raft 节点的日志的一致性,但是它的 logs,并不是直接复制 raft 节点里面的 log。config.go 在创建每个 raft 节点的时候,在构造参数里会传递一个 channel 给 raft 节点,然后再创建一个 goroutine 不断读取channel里的消息,同步节点中已经 commit 过的 log。因此在每个 raft 节点 commit 一个新的 log 之后,都应该向对应的 channel 里面发送 ApplyMsg,这样 config.go 才能从 channel 获取,更新它自己的 logs
在 raft.go 的代码前面也有 ApplyMsg 的注释和结构体
// ApplyMsg
// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester)
// in the same server.
...
//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 2D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
//
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
// For 2D:
SnapshotValid bool
Snapshot []byte
SnapshotTerm int
SnapshotIndex int
}
在 commit 一个 log 之后,应该向 channel 发送一条 ApplyMsg,且应该保证这条 log 之前的所有 logs 都已经被 commit 且已经发送到 channel 里面。再考虑并发较高的场景,向 channel 发送 ApplyMsg 时,应该按照 logs 的下标顺序发送(在 rf.applyCh <- ApplyMsg
之前如果加一个 go,启动一个新的 goroutine,会有 logs 乱序的问题,无法通过测试)
for i := rf.commitIndex + 1; i <= args.LeaderCommit; i++ {
rf.applyCh <- ApplyMsg{
CommandValid: true,
Command: rf.log[i].Command,
CommandIndex: i,
}
}
commitIndex 问题 #
leader 在超过半数的节点 AppendEntry 完成后,就增加 commitIndex
follower 怎么处理呢
- 由心跳日志定时触发:心跳日志的请求参数和普通的 AppendEntry 请求参数一样, 只不过里面的 Entries 为空。请求参数里有 PreLogIndex、PrevLogTerm 和 LeaderCommit,follower 根据这些参数可以确定 commitIndex 的值
Leader commit 之后立刻触发:Leader commit 之后,立刻发送一个心跳日志,让 follower commit
第二种做法其实也可以,但是在 lab 2b 里面最后有一个 RPC Count 测试,如果 Leader Commit 之后立刻发送心跳日志,会因为发送太多的 RPC 请求而导致无法通过最后一个测试
还有一个问题就是:如果 leader 在自身 commit 之后,与其他 follower 之间的网络断开或者自身挂了,导致 follower 没有进行 commit,那怎么办。这就是论文 5.4.2(Committing entries from previous terms)提到的内容了,会有两种情况,对应论文里 Figure 8 的(d)和(e)
next 数组的维护 #
next 数组的维护,主要有以下几点需要注意:
-
每个 raft 节点初始化 logs 数组,在下标为 0 的地方填充一条日志,将 next 数组初始化为 1,这样可以减少越界问题的处理。
-
leader 对 follower 发送 AppendEntry 请求时,Entries 设置为 leader.logs[next[i]:logIndex],即 logs 第 next[i] 个到 logs 数组的最新的一个。当 follower AppendEntry失败,减少 next[i] 的值(但 next[i] 不会减少到 0,因为第 0 条日志每个节点都是一样的)
-
在 follower 对 AppendEntry 请求返回 true 之后,修改 next 数组的值为 logIndex(当前 logs 数组中最新的日志的下标)。同时要注意并发问题,有可能上一个 AppendEntry 还未完成,等上一个 AppendEntry 完成时,把 next[i] 修改变小
rf.mu.Lock() if logIndex > rf.nextIndex[i] { rf.nextIndex[i] = logIndex } rf.mu.Unlock()
性能提升 #
刚开始跑过 2b 的时候,发现跑完总共大概需要 80s。而在实验的说明里:If your solution uses much more than a minute of real time for the 2B tests, or much more than 5 seconds of CPU time, you may run into trouble later on. 也就是说,如果跑完 2b 需要超过 1 分钟的时间,后续的实验可能就有问题,因此需要对代码进行一下优化,提升性能,减少执行时间
我大概做了如下 3 点的优化:
-
优化退出条件:原本在向其他所有节点发送请求时(包括选举、心跳、追加日志请求),有一个 condition 的锁,在不满足条件时继续等待
appendEntryMu.Lock() for liveServers < majorityNum && finished < peersNum { cond.Wait() }
优化一下这个
for
的退出条件(但好像性能的提升并不明显)appendEntryMu.Lock() for liveServers < majorityNum && finished < peersNum && maxTerm == currentTerm { cond.Wait() }
-
优化心跳间隔:在写 2a 的时候,设置的是每隔 300ms 发送一次心跳。修改为 100 ms,执行时间一下就从 80s 变成 45s。
-
优化心跳超时时间:在写 2a 的时候,心跳超时时间为 1500ms,修改为 300ms(性能提升不确定,但是修复了一个 bug,在修改了 接受投票 和 接受心跳日志的
if
条件后,2a 概率性过不了,发现是这个心跳超时时间设置的太大导致)
其他一些思考 #
- Lab2b 里面的测试,只有各个节点的网络断开恢复的测试,并没有节点崩溃重启的一些测试。因此当前的实现还有一些问题:节点崩溃重启之后,logs数组怎么恢复,commitIndex、next 数组怎么初始化,这个节点怎么参与选举,这些都没有实现。一个很明显的做法就是进行持久化,可是需要持久化哪些数据呢?
- Lab2b 里面,我并没有维护 matchIndex[] 数组,就可以通过所有的测试,这个 matchIndex[] 是否需要呢?