if id == r.id { // The candidate votes for itself and should account for this self // vote once the vote has been durably persisted (since it doesn't // send a MsgVote to itself). This response message will be added to // msgsAfterAppend and delivered back to this node after the vote // has been written to stable storage. // 自己给自己发送拉票请求 并不需要真的从发送请求到接收响应走完 既然是自己给自己投票 那肯定是同意 所以直接给自己模拟个赞成票就行 // 这个赞成的响应信息会被放到msgsAfterAppend r.send(pb.Message{To: id, Term: term, Type: voteRespMsgType(voteMsg)}) continue }
// advancec的用途是什么 在不是异步存储的场景 默认就是同步方式 怎么保证相间的顺序是同步的呢 就是靠这个通信 // raft把ready清单 这个清单里面就有要给集群其他节点发送的 告诉etcd后就把advancec赋值 上层处理完后通过advancec告诉raft 在上层通知处理完之前raft不再向上层发送ready清单 if advancec == nil && n.rn.HasReady() { // Populate a Ready. Note that this Ready is not guaranteed to // actually be handled. We will arm readyc, but there's no guarantee // that we will actually send on it. It's possible that we will // service another channel instead, loop around, and then populate // the Ready again. We could instead force the previous Ready to be // handled first, but it's generally good to emit larger Readys plus // it simplifies testing (by emitting less frequently and more // predictably). // ready清单 rd = n.rn.readyWithoutAccept() readyc = n.readyc }
case rd := <-rc.node.Ready(): // raftexample收到raft给的readyc通知 readyc里面有raft打包好的ready清单 // Must save the snapshot file and WAL snapshot entry before saving any other entries // or hardstate to ensure that recovery after a snapshot restore is possible. if !raft.IsEmptySnap(rd.Snapshot) { rc.saveSnap(rd.Snapshot) } rc.wal.Save(rd.HardState, rd.Entries) if !raft.IsEmptySnap(rd.Snapshot) { rc.raftStorage.ApplySnapshot(rd.Snapshot) rc.publishSnapshot(rd.Snapshot) } rc.raftStorage.Append(rd.Entries) // 走RPC向其他节点发送 rc.transport.Send(rc.processMessages(rd.Messages)) applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)) if !ok { rc.stop() return } rc.maybeTriggerSnapshot(applyDoneC) // raft给的ready清单已经处理完 raftexample再通过advancec通知raft已经处理完了 rc.node.Advance()
8 raft处理投票
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// raft把ready清单通过readyc投递给raftexample // raftexample处理完后通过advancec通知raft func(rn *RawNode) Advance(_ Ready) { // The actions performed by this function are encoded into stepsOnAdvance in // acceptReady. In earlier versions of this library, they were computed from // the provided Ready struct. Retain the unused parameter for compatibility. if rn.asyncStorageWrites { rn.raft.logger.Panicf("Advance must not be called when using AsyncStorageWrites") } // raft在打包ready清单时候已经把自己给自己的投票放到了stepsOnAdvance里面了 // 在收到raftexample的advance通知后 这个时机就可以处理到那个投票 进入得票统计流程 for i, m := range rn.stepsOnAdvance { _ = rn.raft.Step(m) rn.stepsOnAdvance[i] = pb.Message{} } rn.stepsOnAdvance = rn.stepsOnAdvance[:0] }