Complete the functions persist() and readPersist() in raft.go by adding code to save and restore persistent state. You will need to encode (or “serialize”) the state as an array of bytes in order to pass it to the Persister. Use the labgob encoder; see the comments in persist() and readPersist(). labgob is like Go’s gob encoder but prints error messages if you try to encode structures with lower-case field names. For now, pass nil as the second argument to persister.Save(). Insert calls to persist() at the points where your implementation changes persistent state. Once you’ve done this, and if the rest of your implementation is correct, you should pass all of the 2C tests.
lab2D的实验要求如下
Implement Snapshot() and the InstallSnapshot RPC, as well as the changes to Raft to support these (e.g., operation with a trimmed log). Your solution is complete when it passes the 2D tests (and all the previous Lab 2 tests).
func(rf *Raft) Snapshot(index int, snapshot []byte) { rf.mu.Lock() defer rf.mu.Unlock() if index <= rf.getFirstIndex() { Debug(dSnap, "S%v ignores the snapshot request with end index %v, because the index is not bigger than the first index %v", rf.me, index, rf.getFirstIndex()) return }
rf.logs = append([]LogEntry{{index, rf.logs[index-rf.getFirstIndex()].Term, nil}}, rf.logs[index-rf.getFirstIndex()+1:]...) rf.persister.Save(rf.encodeState(), snapshot) Debug(dSnap, "S%v applies the snapshot with end index %v, now the len(logs)=%v", rf.me, index, len(rf.logs)) }
func(rf *Raft) applier() { for !rf.killed() { rf.mu.Lock() for rf.lastApplied >= rf.commitIndex { rf.applyCond.Wait() }
if rf.waitApplySnapshotRequest.Term != -1 { if rf.lastApplied < rf.waitApplySnapshotRequest.LastIncludeIndex { rf.mu.Unlock()
rf.applyCh <- ApplyMsg{ //Question: two applyCh update way, how to update orderly? SnapshotValid: true, Snapshot: rf.waitApplySnapshotRequest.Data, SnapshotTerm: rf.waitApplySnapshotRequest.LastIncludeTerm, SnapshotIndex: rf.waitApplySnapshotRequest.LastIncludeIndex, }
rf.mu.Lock() rf.lastApplied = rf.waitApplySnapshotRequest.LastIncludeIndex Debug(dSnap, "S%v applies snapshot from S%v, now the lastApplied is %v", rf.me, rf.waitApplySnapshotRequest.LeaderId, rf.lastApplied)
} rf.waitApplySnapshotRequest = InstallSnapshotRequest{Term: -1} rf.mu.Unlock() } else { commitIndex, lastApplied := rf.commitIndex, rf.lastApplied if rf.getFirstIndex() != 0 && lastApplied+1-rf.getFirstIndex() <= 0 { Debug(dWarn, "S%v has no log to apply, because lastApplied %v < firstIndex %v", rf.me, lastApplied, rf.getFirstIndex()) rf.mu.Unlock() continue } entries := make([]LogEntry, commitIndex-lastApplied) Debug(dInfo, "S%v pre to apply log entries. LastApplied: %v, FirstIndex: %v, commitIndex: %v)", rf.me, lastApplied, rf.getFirstIndex(), commitIndex) copy(entries, rf.logs[lastApplied+1-rf.getFirstIndex():commitIndex+1-rf.getFirstIndex()]) rf.mu.Unlock()
for _, entry := range entries { rf.applyCh <- ApplyMsg{ CommandValid: true, Command: entry.Command, CommandIndex: entry.Index, CommandTerm: entry.Term, } }