Dragonboat Log Replication: A Code Walkthrough
Dragonboat is an open-source, high-performance implementation of the Raft consensus protocol in Go, with good performance and robustness proven by its community. Having come across it by chance, I decided to read its source code alongside the Raft PhD thesis. Today we walk through the code for one of Raft's three core mechanisms: log replication.
The structure of Dragonboat's log replication implementation

Dragonboat's calls into the network interface live mainly in node.go. The author abstracts the network interface, so the underlying transport implementation can be swapped freely. This walkthrough covers only the logic that invokes these network interfaces, i.e. the workflow; it does not discuss the underlying network protocol implementation. The author defines msg.Type in protobuf, and a routing function dispatches messages of different types to different handler functions.
msg.Type and its routing handlers
First, the routing function that dispatches on msg.Type.
The routing function: initializeHandlerMap
func (r *raft) Handle(m pb.Message) {
    if !r.onMessageTermNotMatched(m) {
        r.doubleCheckTermMatched(m.Term)
        r.handle(r, m)
    } ...
}
func (r *raft) initializeHandlerMap() {
    // candidate
    ...
    // follower
    r.handlers[follower][pb.Propose] = r.handleFollowerPropose
    r.handlers[follower][pb.Replicate] = r.handleFollowerReplicate
    r.handlers[follower][pb.Heartbeat] = r.handleFollowerHeartbeat
    r.handlers[follower][pb.ReadIndex] = r.handleFollowerReadIndex
    r.handlers[follower][pb.LeaderTransfer] = r.handleFollowerLeaderTransfer
    r.handlers[follower][pb.ReadIndexResp] = r.handleFollowerReadIndexResp
    r.handlers[follower][pb.InstallSnapshot] = r.handleFollowerInstallSnapshot
    r.handlers[follower][pb.Election] = r.handleNodeElection
    r.handlers[follower][pb.RequestVote] = r.handleNodeRequestVote
    r.handlers[follower][pb.TimeoutNow] = r.handleFollowerTimeoutNow
    r.handlers[follower][pb.ConfigChangeEvent] = r.handleNodeConfigChange
    r.handlers[follower][pb.LocalTick] = r.handleLocalTick
    r.handlers[follower][pb.SnapshotReceived] = r.handleRestoreRemote
    // leader
    r.handlers[leader][pb.LeaderHeartbeat] = r.handleLeaderHeartbeat
    r.handlers[leader][pb.CheckQuorum] = r.handleLeaderCheckQuorum
    r.handlers[leader][pb.Propose] = r.handleLeaderPropose
    r.handlers[leader][pb.ReadIndex] = r.handleLeaderReadIndex
    r.handlers[leader][pb.ReplicateResp] = lw(r, r.handleLeaderReplicateResp)
    r.handlers[leader][pb.HeartbeatResp] = lw(r, r.handleLeaderHeartbeatResp)
    r.handlers[leader][pb.SnapshotStatus] = lw(r, r.handleLeaderSnapshotStatus)
    r.handlers[leader][pb.Unreachable] = lw(r, r.handleLeaderUnreachable)
    r.handlers[leader][pb.LeaderTransfer] = r.handleLeaderTransfer
    r.handlers[leader][pb.Election] = r.handleNodeElection
    r.handlers[leader][pb.RequestVote] = r.handleNodeRequestVote
    r.handlers[leader][pb.ConfigChangeEvent] = r.handleNodeConfigChange
    r.handlers[leader][pb.LocalTick] = r.handleLocalTick
    r.handlers[leader][pb.SnapshotReceived] = r.handleRestoreRemote
    r.handlers[leader][pb.RateLimit] = r.handleLeaderRateLimit
    // observer
    ...
    // witness
    ...
}
The four handlers to focus on are r.handlers[follower][pb.Propose] = r.handleFollowerPropose, r.handlers[follower][pb.Replicate] = r.handleFollowerReplicate, r.handlers[leader][pb.Propose] = r.handleLeaderPropose, and r.handlers[leader][pb.ReplicateResp] = lw(r, r.handleLeaderReplicateResp). They correspond to the follower handling Propose and Replicate messages, and the leader handling Propose and ReplicateResp messages. Below we read these four functions in turn, along with their call stacks. Each local call stack eventually ends at the send function. send is very simple: it merely appends the message to the r.msgs field. Later, a node scans the raft struct's msgs and logs fields for buffered messages and performs the actual network interaction.
send
func (r *raft) send(m pb.Message) {
    m.From = r.nodeID
    m = r.finalizeMessageTerm(m)
    r.msgs = append(r.msgs, m)
}
It updates the message's term and source node ID, then appends it to the raft struct's msgs field.
handleFollowerPropose
func (r *raft) handleFollowerPropose(m pb.Message) {
    if r.leaderID == NoLeader {
        plog.Warningf("%s dropped proposal, no leader", r.describe())
        r.reportDroppedProposal(m)
        return
    }
    m.To = r.leaderID
    // the message might be queued by the transport layer, this violates the
    // requirement of the entryQueue.get() func. copy the m.Entries to its
    // own space.
    m.Entries = newEntrySlice(m.Entries)
    r.send(m)
}
When a follower receives a client proposal, it must forward it to the leader: it sets the destination msg.To to the leader's ID and immediately forwards the message by calling send.
handleLeaderPropose and its callees
func (r *raft) handleLeaderPropose(m pb.Message) {
    r.mustBeLeader()
    if r.leaderTransfering() {
        plog.Warningf("%s dropped proposal, leader transferring", r.describe())
        r.reportDroppedProposal(m)
        return
    }
    for i, e := range m.Entries {
        if e.Type == pb.ConfigChangeEntry {
            if r.hasPendingConfigChange() {
                plog.Warningf("%s dropped config change, pending change", r.describe())
                r.reportDroppedConfigChange(m.Entries[i])
                m.Entries[i] = pb.Entry{Type: pb.ApplicationEntry}
            }
            r.setPendingConfigChange()
        }
    }
    r.appendEntries(m.Entries)
    r.broadcastReplicateMessage()
}
Most of this function is not our focus: it asserts that the node is the leader, checks the cluster state (dropping proposals during a leadership transfer), and handles config-change entries. The last two lines are what caught my attention: r.appendEntries(m.Entries) and r.broadcastReplicateMessage().
func (r *raft) appendEntries(entries []pb.Entry) {
    lastIndex := r.log.lastIndex()
    for i := range entries {
        entries[i].Term = r.term
        entries[i].Index = lastIndex + 1 + uint64(i)
    }
    r.log.append(entries)
    r.remotes[r.nodeID].tryUpdate(r.log.lastIndex())
    if r.isSingleNodeQuorum() {
        r.tryCommit()
    }
}
In appendEntries, each entry is stamped with the current term and a consecutive index, and the entries are appended to r.log. The leader also updates its own match index and, in a single-node cluster, tries to commit immediately.
func (r *raft) broadcastReplicateMessage() {
    r.mustBeLeader()
    for nid := range r.observers {
        if nid == r.nodeID {
            plog.Panicf("%s observer is broadcasting Replicate msg", r.describe())
        }
    }
    for _, nid := range r.nodes() {
        if nid != r.nodeID {
            r.sendReplicateMessage(nid)
        }
    }
}
In broadcastReplicateMessage, after the leadership check, r.sendReplicateMessage(nid) is called for every peer to send the messages.
func (r *raft) sendReplicateMessage(to uint64) {
    var rp *remote
    if v, ok := r.remotes[to]; ok {
        rp = v
    } else if v, ok := r.observers[to]; ok {
        rp = v
    } else {
        rp, ok = r.witnesses[to]
        if !ok {
            plog.Panicf("%s failed to get the remote instance", r.describe())
        }
    }
    if rp.isPaused() {
        return
    }
    m, err := r.makeReplicateMessage(to, rp.next, maxEntrySize)
    if err != nil {
        // log not available due to compaction, send snapshot
        if !rp.isActive() {
            plog.Warningf("%s, %s is not active, sending snapshot is skipped",
                r.describe(), NodeID(to))
            return
        }
        index := r.makeInstallSnapshotMessage(to, &m)
        plog.Infof("%s is sending snapshot (%d) to %s, r.Next %d, r.Match %d, %v",
            r.describe(), index, NodeID(to), rp.next, rp.match, err)
        rp.becomeSnapshot(index)
    } else if len(m.Entries) > 0 {
        lastIndex := m.Entries[len(m.Entries)-1].Index
        rp.progress(lastIndex)
    }
    r.send(m)
}
After a series of state checks (note the fallback: if the required entries have been compacted away, an InstallSnapshot message is built instead), the last line makes the point: the function still ends in the send method described at the beginning of this section.
handleFollowerReplicate
func (r *raft) handleFollowerReplicate(m pb.Message) {
    r.leaderIsAvailable()
    r.setLeaderID(m.From)
    r.handleReplicateMessage(m)
}
The first two lines record the leader's availability and identity. The last line calls r.handleReplicateMessage(m) to process the Replicate message.
Depending on the committed index carried by the message, there are two paths: replicating log entries and advancing the commit point.
func (r *raft) handleReplicateMessage(m pb.Message) {
    resp := pb.Message{
        To:   m.From,
        Type: pb.ReplicateResp,
    }
    if m.LogIndex < r.log.committed {
        resp.LogIndex = r.log.committed
        r.send(resp)
        return
    }
    if r.log.matchTerm(m.LogIndex, m.LogTerm) {
        r.log.tryAppend(m.LogIndex, m.Entries)
        lastIdx := m.LogIndex + uint64(len(m.Entries))
        r.log.commitTo(min(lastIdx, m.Commit))
        resp.LogIndex = lastIdx
    } else {
        plog.Debugf("%s rejected Replicate index %d term %d from %s",
            r.describe(), m.LogIndex, m.Term, NodeID(m.From))
        resp.Reject = true
        resp.LogIndex = m.LogIndex
        resp.Hint = r.log.lastIndex()
        if r.events != nil {
            info := server.ReplicationInfo{
                ClusterID: r.clusterID,
                NodeID:    r.nodeID,
                Index:     m.LogIndex,
                Term:      m.LogTerm,
                From:      m.From,
            }
            r.events.ReplicationRejected(info)
        }
    }
    r.send(resp)
}
func (l *entryLog) tryAppend(index uint64, ents []pb.Entry) bool {
    conflictIndex := l.getConflictIndex(ents)
    if conflictIndex != 0 {
        if conflictIndex <= l.committed {
            plog.Panicf("entry %d conflicts with committed entry, committed %d",
                conflictIndex, l.committed)
        }
        l.append(ents[conflictIndex-index-1:])
        return true
    }
    return false
}
func (l *entryLog) getConflictIndex(entries []pb.Entry) uint64 {
    for _, e := range entries {
        if !l.matchTerm(e.Index, e.Term) {
            return e.Index
        }
    }
    return 0
}
func (l *entryLog) commitTo(index uint64) {
    if index <= l.committed {
        return
    }
    if index > l.lastIndex() {
        plog.Panicf("invalid commitTo index %d, lastIndex() %d",
            index, l.lastIndex())
    }
    l.committed = index
}
func (l *entryLog) lastIndex() uint64 {
    index, ok := l.inmem.getLastIndex()
    if ok {
        return index
    }
    _, index = l.logdb.GetRange()
    return index
}
The function first constructs the ReplicateResp and compares m.LogIndex against the local committed index, rejecting (by replying with the current committed index) any message that precedes what has already been committed. Then, after the term check via matchTerm, the entries are appended to r.log, truncating from the first conflicting index if necessary, and the committed index is advanced to min(lastIdx, m.Commit). A mismatch instead produces a response with Reject set and a Hint of the follower's last index. Finally, the response is returned using the send method described earlier.
handleLeaderReplicateResp
func (r *raft) handleLeaderReplicateResp(m pb.Message, rp *remote) {
    r.mustBeLeader()
    rp.setActive()
    if !m.Reject {
        paused := rp.isPaused()
        if rp.tryUpdate(m.LogIndex) {
            rp.respondedTo()
            if r.tryCommit() {
                r.broadcastReplicateMessage()
            } else if paused {
                r.sendReplicateMessage(m.From)
            }
            // according to the leadership transfer protocol listed on the p29 of the
            // raft thesis
            if r.leaderTransfering() && m.From == r.leaderTransferTarget &&
                r.log.lastIndex() == rp.match {
                r.sendTimeoutNowMessage(r.leaderTransferTarget)
            }
        }
    } else {
        // the replication flow control code is derived from etcd raft, it resets
        // nextIndex to match + 1. it is thus even more conservative than the raft
        // thesis's approach of nextIndex = nextIndex - 1 mentioned on the p21 of
        // the thesis.
        if rp.decreaseTo(m.LogIndex, m.Hint) {
            r.enterRetryState(rp)
            r.sendReplicateMessage(m.From)
        }
    }
}
Leaving the rejection path aside, the key calls are r.tryCommit() and r.broadcastReplicateMessage(): the former decides whether the commit index can advance, the latter broadcasts the replication messages.
func (r *raft) tryCommit() bool {
    r.mustBeLeader()
    if r.numVotingMembers() != len(r.matched) {
        r.resetMatchValueArray()
    }
    idx := 0
    for _, v := range r.remotes {
        r.matched[idx] = v.match
        idx++
    }
    for _, v := range r.witnesses {
        r.matched[idx] = v.match
        idx++
    }
    r.sortMatchValues()
    q := r.matched[r.numVotingMembers()-r.quorum()]
    // see p8 raft paper
    // "Raft never commits log entries from previous terms by counting replicas.
    // Only log entries from the leader's current term are committed by counting
    // replicas"
    return r.log.tryCommit(q, r.term)
}
After asserting leadership, tryCommit gathers the match index of every voting member into r.matched, sorts the array, and takes the value at position numVotingMembers - quorum: the highest index known to be replicated on a quorum of nodes. That candidate index is then handed to entryLog.tryCommit. For the leader this completes the commit; followers still learn the new commit index from the Commit field of subsequent Replicate (or heartbeat) messages from the leader.
func (l *entryLog) tryCommit(index uint64, term uint64) bool {
    if index <= l.committed {
        return false
    }
    lterm, err := l.term(index)
    if err == ErrCompacted {
        lterm = 0
    } else if err != nil {
        panic(err)
    }
    if index > l.committed && lterm == term {
        l.commitTo(index)
        return true
    }
    return false
}
The actual commit logic lives in the entryLog method: the index must exceed the current committed index, and the entry's term must equal the leader's current term, per the rule quoted above that entries from previous terms are never committed by counting replicas.
func (r *raft) broadcastReplicateMessage() {
    r.mustBeLeader()
    for nid := range r.observers {
        if nid == r.nodeID {
            plog.Panicf("%s observer is broadcasting Replicate msg", r.describe())
        }
    }
    for _, nid := range r.nodes() {
        if nid != r.nodeID {
            r.sendReplicateMessage(nid)
        }
    }
}
After the state checks, the last line performs the message sending for each peer.
func (r *raft) sendReplicateMessage(to uint64) {
    var rp *remote
    if v, ok := r.remotes[to]; ok {
        rp = v
    } else if v, ok := r.observers[to]; ok {
        rp = v
    } else {
        rp, ok = r.witnesses[to]
        if !ok {
            plog.Panicf("%s failed to get the remote instance", r.describe())
        }
    }
    if rp.isPaused() {
        return
    }
    m, err := r.makeReplicateMessage(to, rp.next, maxEntrySize)
    if err != nil {
        // log not available due to compaction, send snapshot
        if !rp.isActive() {
            plog.Warningf("%s, %s is not active, sending snapshot is skipped",
                r.describe(), NodeID(to))
            return
        }
        index := r.makeInstallSnapshotMessage(to, &m)
        plog.Infof("%s is sending snapshot (%d) to %s, r.Next %d, r.Match %d, %v",
            r.describe(), index, NodeID(to), rp.next, rp.match, err)
        rp.becomeSnapshot(index)
    } else if len(m.Entries) > 0 {
        lastIndex := m.Entries[len(m.Entries)-1].Index
        rp.progress(lastIndex)
    }
    r.send(m)
}
The function builds a Replicate message from the remote's next index; the progress method here manages the tracked state of the remote node, optimistically advancing rp.next past the entries just sent.
func (r *raft) makeReplicateMessage(to uint64,
    next uint64, maxSize uint64) (pb.Message, error) {
    term, err := r.log.term(next - 1)
    if err != nil {
        return pb.Message{}, err
    }
    entries, err := r.log.entries(next, maxSize)
    if err != nil {
        return pb.Message{}, err
    }
    if len(entries) > 0 {
        lastIndex := entries[len(entries)-1].Index
        expected := next - 1 + uint64(len(entries))
        if lastIndex != expected {
            plog.Panicf("%s expected last index in Replicate %d, got %d",
                r.describe(), expected, lastIndex)
        }
    }
    // Don't send actual log entry to witness as they won't replicate real message,
    // unless there is a config change.
    if _, ok := r.witnesses[to]; ok {
        entries = makeMetadataEntries(entries)
    }
    return pb.Message{
        To:       to,
        Type:     pb.Replicate,
        LogIndex: next - 1,
        LogTerm:  term,
        Entries:  entries,
        Commit:   r.log.committed,
    }, nil
}
This builds the Replicate message that is then sent to the follower: LogIndex and LogTerm identify the entry immediately preceding the batch, Entries carries the payload, and Commit carries the leader's committed index.
func (r *remote) progress(lastIndex uint64) {
    if r.state == remoteReplicate {
        r.next = lastIndex + 1
    } else if r.state == remoteRetry {
        r.retryToWait()
    } else {
        panic("unexpected remote state")
    }
}
Interaction logic on the node side
The main worker goroutine runs an endless for/select loop that processes changes as they arrive.
func (e *engine) stepWorkerMain(workerID uint64) {
    nodes := make(map[uint64]*node)
    ticker := time.NewTicker(nodeReloadInterval)
    defer ticker.Stop()
    cci := uint64(0)
    stopC := e.nodeStopper.ShouldStop()
    updates := make([]pb.Update, 0)
    for {
        select {
        case <-stopC:
            e.offloadNodeMap(nodes)
            return
        case <-ticker.C:
            nodes, cci = e.loadStepNodes(workerID, cci, nodes)
            e.processSteps(workerID, make(map[uint64]struct{}), nodes, updates, stopC)
        case <-e.stepCCIReady.waitCh(workerID):
            nodes, cci = e.loadStepNodes(workerID, cci, nodes)
        case <-e.stepWorkReady.waitCh(workerID):
            if cci == 0 || len(nodes) == 0 {
                nodes, cci = e.loadStepNodes(workerID, cci, nodes)
            }
            active := e.stepWorkReady.getReadyMap(workerID)
            e.processSteps(workerID, active, nodes, updates, stopC)
        }
    }
}
Within this loop, e.processSteps(workerID, active, nodes, updates, stopC) watches the state of ready nodes and processes their pending work.
func (e *engine) processSteps(workerID uint64,
    active map[uint64]struct{},
    nodes map[uint64]*node, nodeUpdates []pb.Update, stopC chan struct{}) {
    if len(nodes) == 0 {
        return
    }
    if len(active) == 0 {
        for cid := range nodes {
            active[cid] = struct{}{}
        }
    }
    nodeUpdates = nodeUpdates[:0]
    for cid := range active {
        node, ok := nodes[cid]
        if !ok || node.stopped() {
            continue
        }
        if ud, hasUpdate := node.stepNode(); hasUpdate {
            nodeUpdates = append(nodeUpdates, ud)
        }
    }
    e.applySnapshotAndUpdate(nodeUpdates, nodes, true)
    // see raft thesis section 10.2.1 on details why we send Replicate message
    // before those entries are persisted to disk
    for _, ud := range nodeUpdates {
        node := nodes[ud.ClusterID]
        node.sendReplicateMessages(ud)
        node.processReadyToRead(ud)
        node.processDroppedEntries(ud)
        node.processDroppedReadIndexes(ud)
    }
    if err := e.logdb.SaveRaftState(nodeUpdates, workerID); err != nil {
        panic(err)
    }
    if err := e.onSnapshotSaved(nodeUpdates, nodes); err != nil {
        panic(err)
    }
    e.applySnapshotAndUpdate(nodeUpdates, nodes, false)
    for _, ud := range nodeUpdates {
        node := nodes[ud.ClusterID]
        if err := node.processRaftUpdate(ud); err != nil {
            panic(err)
        }
        e.processMoreCommittedEntries(ud)
        node.commitRaftUpdate(ud)
    }
    if lazyFreeCycle > 0 {
        resetNodeUpdate(nodeUpdates)
    }
}
In this method, stepNode handles each node's local work, including proposals from local clients and messages sent to this node by its peers, while processRaftUpdate carries out the network interaction.
func (n *node) processRaftUpdate(ud pb.Update) error {
    if err := n.logReader.Append(ud.EntriesToSave); err != nil {
        return err
    }
    n.sendMessages(ud.Messages)
    if err := n.removeLog(); err != nil {
        return err
    }
    if err := n.runSyncTask(); err != nil {
        return err
    }
    if n.saveSnapshotRequired(ud.LastApplied) {
        n.pushTakeSnapshotRequest(rsm.SSRequest{})
    }
    return nil
}
The n.sendMessages(ud.Messages) call:
func (n *node) sendMessages(msgs []pb.Message) {
    for _, msg := range msgs {
        if !isFreeOrderMessage(msg) {
            msg.ClusterId = n.clusterID
            n.sendRaftMessage(msg)
        }
    }
}
n.sendRaftMessage(msg) is a method supplied by the layer above:
func (nh *NodeHost) sendMessage(msg pb.Message) {
    if nh.isPartitioned() {
        return
    }
    if msg.Type != pb.InstallSnapshot {
        nh.transport.Send(msg)
    } else {
        witness := msg.Snapshot.Witness
        plog.Debugf("%s is sending snapshot to %s, witness %t, index %d, size %d",
            dn(msg.ClusterId, msg.From), dn(msg.ClusterId, msg.To),
            witness, msg.Snapshot.Index, msg.Snapshot.FileSize)
        if n, ok := nh.getCluster(msg.ClusterId); ok {
            if witness || !n.OnDiskStateMachine() {
                nh.transport.SendSnapshot(msg)
            } else {
                n.pushStreamSnapshotRequest(msg.ClusterId, msg.To)
            }
        }
        nh.events.sys.Publish(server.SystemEvent{
            Type:      server.SendSnapshotStarted,
            ClusterID: msg.ClusterId,
            NodeID:    msg.To,
            From:      msg.From,
        })
    }
}
which in turn calls nh.transport.Send(msg):
// Send asynchronously sends raft messages to their target nodes.
//
// The generic async send Go pattern used in Send() is found in CockroachDB's
// codebase.
func (t *Transport) Send(req pb.Message) bool {
    v, _ := t.send(req)
    if !v {
        t.metrics.messageSendFailure(1)
    }
    return v
}
(The original post closes with two diagrams illustrating the Raft log replication process in detail: one for log replication and one for log commit.)
Original article: https://www.cnblogs.com/aibot/archive/2022/10/11/16780090.html