引言:本文实现了 MIT 6.824 中的 Lab 2A,即 leader 选举部分。
Raft 结构及初始化:为 Raft 中的每个节点新增的变量主要有:
currentTerm: 当前任期 
votedFor: 为谁投票, -1表示没有投票,注意一个任期只能投一次票 
state: 当前节点的状态 
heartbeatTimeout: 心跳计时器(*time.Timer) 
electionTimeout: 选举超时计时器(*time.Timer) 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type  Raft struct  { 	mu               sync.RWMutex         	peers            []*labrpc.ClientEnd  	persister        *Persister           	me               int                   	dead             int32                 	currentTerm      int  	votedFor         int  	state            NodeState 	heartbeatTimeout *time.Timer 	electionTimeout  *time.Timer }type  NodeState uint8 const  ( 	Follower NodeState = iota  	Candidate 	Leader )
 
初始化Make函数如下,注意为新添加的变量进行初始化,可以看到初始化之后就会启动一个ticker的goroutine来让节点不断运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func  Make (peers []*labrpc.ClientEnd, me int , 	persister *Persister, applyCh chan  ApplyMsg)   *Raft { 	rf := &Raft{ 		peers:            peers, 		persister:        persister, 		me:               me, 		dead:             0 , 		currentTerm:      0 , 		votedFor:         -1 , 		state:            Follower, 		heartbeatTimeout: time.NewTimer(time.Duration(StableHeartbeatTimeout())), 		electionTimeout:  time.NewTimer(time.Duration(RandomizedElectionTimeout())), 	} 	 	 	rf.readPersist(persister.ReadRaftState()) 	 	go  rf.ticker() 	return  rf }
 
计时器:如上所述,节点维护两个计时器:
heartbeatTimeout: 倒计时结束时,需要向其他节点发送心跳,以维持自己的leader地位 
electionTimeout: 倒计时结束时,需要转化为candidate开始选举,如果在倒计时结束前收到了leader的心跳,则重置倒计时。 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func  (rf *Raft)   ticker() { 	for  rf.killed() == false  { 		select  { 		case  <-rf.heartbeatTimeout.C: 			rf.mu.Lock() 			if  rf.state == Leader { 				rf.broadcastHeartbeat() 				rf.heartbeatTimeout.Reset(StableHeartbeatTimeout()) 			} 			rf.mu.Unlock() 		case  <-rf.electionTimeout.C: 			rf.mu.Lock() 			rf.changeState(Candidate) 			rf.currentTerm++ 			rf.startElection() 			rf.electionTimeout.Reset(RandomizedElectionTimeout()) 			rf.mu.Unlock() 		} 	} }
 
选举leader:选举leader主要依靠发送RequestVote RPC来进行,选举的过程如下:
electionTimeout计时器到期,节点转化为candidate状态,增加currentTerm并开始选举 
发送RequestVote RPC给其他节点,请求投票 
接收到其他节点的投票结果。 
 
按照Raft论文的描述可以将选举结果分为三种:
得到了大多数节点的投票,成为leader 
有其他节点成为了leader,或自己的任期已经落后,自己转化为follower。可以通过两种方式感知:
通过RequestVoteReply中的Term字段:如果Term比自己的大,说明自己的任期已经过时(集群中已经出现了更高的任期,可能已选出leader),需要更新自己的term并转化为follower 
接收到了任期不小于自己的leader发来的心跳,说明已经有其他节点成为了leader 
 
 
大家平分选票,没有leader产生,等待electionTimeout计时器到期,重新开始选举 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func  (rf *Raft)   startElection() { 	request := rf.genRequestVoteRequest() 	DPrintf("{Node %v} starts election with RequestVoteRequest %v" , rf.me, request) 	rf.votedFor = rf.me 	grantedVoteNum := 1  	 	for  peer := range  rf.peers { 		if  peer != rf.me { 			if  peer == rf.me { 				continue  			} 			go  rf.electionRequestOnce(peer, &grantedVoteNum, request) 		} 	} }func  (rf *Raft)   electionRequestOnce(peer int , grantedVoteNum *int , request *RequestVoteArgs) { 	reply := new (RequestVoteReply) 	if  rf.sendRequestVote(peer, request, reply) { 		rf.mu.Lock() 		defer  rf.mu.Unlock() 		DPrintf("{Node %v} received RequestVoteReply {%v} from {Node %v}" , rf.me, reply, peer) 		if  rf.currentTerm == request.Term && rf.state == Candidate { 			if  reply.VoteGranted { 				*grantedVoteNum++ 				if  *grantedVoteNum > len (rf.peers)/2  { 					rf.changeState(Leader) 					rf.broadcastHeartbeat() 				} 			} 		} else  if  reply.Term > rf.currentTerm { 			DPrintf("{Node %v} found higher term %v in RequestVoteReply %v from {Node %v}" , rf.me, reply.Term, reply, peer) 			rf.currentTerm = reply.Term 			rf.votedFor = -1  			rf.changeState(Follower) 		} 	} }
 
节点在进行投票时的规则如下:
如果自己的term比对方大,则拒绝投票 
如果在当前term中已经投过票给其他candidate,则拒绝投票 
如果对方的日志不如自己的新(isLogUpToDate 检查不通过),则拒绝投票 
其余情况下投票给对方并更新votedFor;若对方的term比自己大,还需先更新自己的term并转化为follower状态 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func  (rf *Raft)   RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { 	 	rf.mu.Lock() 	defer  rf.mu.Unlock() 	defer  DPrintf("{Node %v}'s state is {state: %v, term: %v}, the RequestVoteReply is {%v}" , rf.me, rf.state, rf.currentTerm, reply) 	if  args.Term < rf.currentTerm || (args.Term == rf.currentTerm && rf.votedFor != -1  && rf.votedFor != args.CandidateId) { 		reply.Term, reply.VoteGranted = rf.currentTerm, false  		return  	} 	if  args.Term > rf.currentTerm { 		rf.currentTerm, rf.votedFor = args.Term, -1  		rf.changeState(Follower) 	} 	if  !rf.isLogUpToDate(args.LastLogIndex, args.LastLogTerm) { 		reply.Term, reply.VoteGranted = rf.currentTerm, false  		return  	} 	rf.votedFor = args.CandidateId 	 	reply.Term, reply.VoteGranted = rf.currentTerm, true  }type  RequestVoteArgs struct  { 	Term         int  	CandidateId  int  	LastLogIndex int  	LastLogTerm  int  }type  RequestVoteReply struct  { 	Term        int  	VoteGranted bool  }
 
注意在状态转化时需要对计时器进行相应的修改,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func  (rf *Raft)   changeState(newState NodeState) { 	if  rf.state == newState { 		return  	} 	DPrintf("{Node %v} changes state from %s to %s" , rf.me, rf.state, newState) 	rf.state = newState 	switch  newState { 	case  Follower: 		rf.heartbeatTimeout.Stop() 		rf.electionTimeout.Reset(RandomizedElectionTimeout()) 	case  Candidate: 	case  Leader: 		rf.broadcastHeartbeat() 		rf.heartbeatTimeout.Reset(StableHeartbeatTimeout()) 		rf.electionTimeout.Stop() 	} }
 
心跳广播:理论上心跳与日志复制使用的是同一种RPC(AppendEntries),但lab2A不需要实现日志复制,所以这里做了简化,只要能发送心跳来维持自己的leader地位即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func  (rf *Raft)   replicateOneRound(peer int ) { 	if  rf.state != Leader { 		return  	} 	request := rf.genAppendEntriesRequest(peer) 	reply := new (AppendEntriesReply) 	if  rf.sendAppendEntries(peer, request, reply) { 		DPrintf("{Node %v} received AppendEntriesReply {%v} from {Node %v}" , rf.me, reply, peer) 	} }func  (rf *Raft)   AppendEntries(args *AppendEntriesRequest, reply *AppendEntriesReply) { 	DPrintf("{Node %v} received AppendEntriesRequest {%v}" , rf.me, args) 	rf.changeState(Follower) 	rf.electionTimeout.Reset(RandomizedElectionTimeout()) 	reply.Term, reply.Success = rf.currentTerm, true  }type  AppendEntriesRequest struct  { 	Term     int  	LeaderId int  }type  AppendEntriesReply struct  { 	Term    int  	Success bool  }
 
运行结果:运行结果如下,能通过所有测试: