Lab 2b

Raft Lab 2B #

怎么发送日志 #

2b 是 Leader 接收客户端请求,将客户端请求里的 Command,写入到自己的 log 数组里,同时同步到其他节点的 log 数组,但是该怎么做呢?以下是笔者的一些思考过程:

  1. 客户端是调用的 Start 函数,因此可以考虑:在 Start 函数里,先写入到自己的 log,然后再调用一个 doAppendEntry 的函数,将日志同步到其他节点,最后将结果返回给客户端
  2. Follower 节点,通过和 PreLogIndex 和 PreLogTerm 的比较,可以确认当前这条日志是否可以同步到自身的 log 数组里
  3. 如果某个节点当前的日志比较落后,拒绝了这次的同步日志的请求,该怎么办呢?Leader 在同步日志时,维护一个 nextIndex 数组,如果某个节点 i 同步日志失败,则将 nextIndex [i]–,同时将 nextIndex[i] 到当前的日志,放到日志数组里,再发送给节点 i,直到成功为止(或者自身不是 leader 状态)
  4. 每个Raft节点都需要维护自身的 commitIndex(表示同步到超过半数节点的 最大的 log 下标),其中 Leader 节点在超过半数的节点同步日志成功后,将自身的 commitIndex 增加,然后返回结果给客户端;Follower 节点在收到心跳请求后,根据 LeaderCommit 的信息,将自身的 commitIndex 增加

leader 选举修改 #

lab 2a 里面,是通过比较 candidate 与 当前节点的 term 大小,来确定是否给 candidate 投票

    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.state = Follower
        rf.votedFor = args.CandidateId

        reply.Term = rf.currentTerm
        reply.VoteGranted = true
    } else {
        reply.Term = rf.currentTerm
        reply.VoteGranted = false
    }

在 lab 2b 里面,在投票的请求里,带上了 Candidate 的 LastLogIndex 和 LastLogTerm ,因此当 Term 一样时,需要比较 LastLog 的大小

    if args.Term > rf.currentTerm || (args.Term == rf.currentTerm &&
        (args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastIndex))) {
        rf.currentTerm = args.Term
        rf.state = Follower
        rf.votedFor = args.CandidateId

        reply.Term = rf.currentTerm
        reply.VoteGranted = true
    } else {
        reply.Term = rf.currentTerm
        reply.VoteGranted = false
    }

然而上面的比较其实是有 bug 的,没有办法通过全部的测试用例。比如下面这个场景

由 3 个节点组成的 raft 集群,节点 1 为 leader,节点 2 和节点 3 都是 follower。当节点 3 与节点 1、2 网络隔绝时:

  • 节点 3 由于无法收到节点 1 传过来的心跳而超时,因此会发起投票,Term 不断自增
  • 节点 1 和节点 2 正常接收客户端请求,处理客户端请求,logs不断增加

当节点 3 与节点 1、2 的网络恢复时,节点 3 发起投票,在上面的代码中,节点 1、2 会投票给节点 3,让节点 3 变为 Leader,然而节点 3 并没有全部的 logs,这与论文 5.4.1(Election restriction)的内容冲突。

因此上面的代码还需要再修改一下,在 args.Term > rf.currentTerm 时,将自身的状态变为 follower,然后比较 LastLog 的大小,再确认是否进行投票。

    if args.Term > rf.currentTerm || (args.Term == rf.currentTerm &&
        (args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastIndex))) {
        rf.currentTerm = args.Term
        rf.state = Follower

        if args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastIndex) {
            rf.votedFor = args.CandidateId
            reply.Term = rf.currentTerm
            reply.VoteGranted = true
        } else {
            rf.votedFor = -1
            reply.Term = rf.currentTerm
            reply.VoteGranted = false
        }
    } else {
        reply.Term = rf.currentTerm
        reply.VoteGranted = false
    }

事实证明,上述代码不仅仅需要在 leader 选举接收投票请求 时修改,还需要在 AppendEntry Request 接收心跳日志 的地方修改,不然 2A 就跑不过了

config.go 里面的 logs 为空的问题 #

config.go 里面会校验每个 raft 节点的日志的一致性,但是它的 logs,并不是直接复制 raft 节点里面的 log。config.go 在创建每个 raft 节点的时候,在构造参数里会传递一个 channel 给 raft 节点,然后再创建一个 goroutine 不断读取channel里的消息,同步节点中已经 commit 过的 log。因此在每个 raft 节点 commit 一个新的 log 之后,都应该向对应的 channel 里面发送 ApplyMsg,这样 config.go 才能从 channel 获取,更新它自己的 logs

在 raft.go 的代码前面也有 ApplyMsg 的注释和结构体

// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
... 

//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 2D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
//
type ApplyMsg struct {
    CommandValid bool
    Command      interface{}
    CommandIndex int

    // For 2D:
    SnapshotValid bool
    Snapshot      []byte
    SnapshotTerm  int
    SnapshotIndex int
}

在 commit 一个 log 之后,应该向 channel 发送一条 ApplyMsg,且应该保证这条 log 之前的所有 logs 都已经被 commit 且已经发送到 channel 里面。再考虑并发较高的场景,向 channel 发送 ApplyMsg 时,应该按照 logs 的下标顺序发送(在 rf.applyCh <- ApplyMsg 之前如果加一个 go,启动一个新的 goroutine,会有 logs 乱序的问题,无法通过测试)

for i := rf.commitIndex + 1; i <= args.LeaderCommit; i++ {
    rf.applyCh <- ApplyMsg{
        CommandValid: true,
        Command:      rf.log[i].Command,
        CommandIndex: i,
    }
}

commitIndex 问题 #

leader 在超过半数的节点 AppendEntry 完成后,就增加 commitIndex

follower 怎么处理呢

  1. 由心跳日志定时触发:心跳日志的请求参数和普通的 AppendEntry 请求参数一样, 只不过里面的 Entries 为空。请求参数里有 PreLogIndex、PrevLogTerm 和 LeaderCommit,follower 根据这些参数可以确定 commitIndex 的值
  2. Leader commit 之后立刻触发:Leader commit 之后,立刻发送一个心跳日志,让 follower commit

第二种做法其实也可以,但是在 lab 2b 里面最后有一个 RPC Count 测试,如果 Leader Commit 之后立刻发送心跳日志,会因为发送太多的 RPC 请求而导致无法通过最后一个测试

还有一个问题就是:如果 leader 在自身 commit 之后,与其他 follower 之间的网络断开或者自身挂了,导致 follower 没有进行 commit,那怎么办。这就是论文 5.4.2(Committing entries from previous terms)提到的内容了,会有两种情况,对应论文里 Figure 8 的(d)和(e)

next 数组的维护 #

next 数组的维护,主要有以下几点需要注意:

  1. 每个 raft 节点初始化 logs 数组,在下标为 0 的地方填充一条日志,将 next 数组初始化为 1,这样可以减少越界问题的处理。

  2. leader 对 follower 发送 AppendEntry 请求时,Entries 设置为 leader.logs[next[i]:logIndex],即 logs 第 next[i] 个到 logs 数组的最新的一个。当 follower AppendEntry失败,减少 next[i] 的值(但 next[i] 不会减少到 0,因为第 0 条日志每个节点都是一样的)

  3. 在 follower 对 AppendEntry 请求返回 true 之后,修改 next 数组的值为 logIndex(当前 logs 数组中最新的日志的下标)。同时要注意并发问题,有可能上一个 AppendEntry 还未完成,等上一个 AppendEntry 完成时,把 next[i] 修改变小

    rf.mu.Lock()
    if logIndex > rf.nextIndex[i] {
        rf.nextIndex[i] = logIndex
    }
    rf.mu.Unlock()
    

性能提升 #

刚开始跑过 2b 的时候,发现跑完总共大概需要 80s。而在实验的说明里:If your solution uses much more than a minute of real time for the 2B tests, or much more than 5 seconds of CPU time, you may run into trouble later on. 也就是说,如果跑完 2b 需要超过 1 分钟的时间,后续的实验可能就有问题,因此需要对代码进行一下优化,提升性能,减少执行时间

我大概做了如下 3 点的优化:

  1. 优化退出条件:原本在向其他所有节点发送请求时(包括选举、心跳、追加日志请求),有一个 condition 的锁,在不满足条件时继续等待

        appendEntryMu.Lock()
        for liveServers < majorityNum && finished < peersNum {
            cond.Wait()
        }
    

    优化一下这个 for 的退出条件(但好像性能的提升并不明显)

        appendEntryMu.Lock()
        for liveServers < majorityNum && finished < peersNum && maxTerm == currentTerm {
            cond.Wait()
        }
    
  2. 优化心跳间隔:在写 2a 的时候,设置的是每隔 300ms 发送一次心跳。修改为 100 ms,执行时间一下就从 80s 变成 45s。

  3. 优化心跳超时时间:在写 2a 的时候,心跳超时时间为 1500ms,修改为 300ms(性能提升不确定,但是修复了一个 bug,在修改了 接受投票 和 接受心跳日志的 if 条件后,2a 概率性过不了,发现是这个心跳超时时间设置的太大导致)

其他一些思考 #

  • Lab2b 里面的测试,只有各个节点的网络断开恢复的测试,并没有节点崩溃重启的一些测试。因此当前的实现还有一些问题:节点崩溃重启之后,logs数组怎么恢复,commitIndex、next 数组怎么初始化,这个节点怎么参与选举,这些都没有实现。一个很明显的做法就是进行持久化,可是需要持久化哪些数据呢?
  • Lab2b 里面,我并没有维护 matchIndex[] 数组,就可以通过所有的测试,这个 matchIndex[] 是否需要呢?