Deconstructing a working Raft implementation - 2
By Gyubong Lee
In the previous post, we provided an overview centered around the types in raft-rs, and explored how leader elections occur during network failures, how log inconsistencies are resolved, and how a consistent state is maintained after overcoming failures, based on three scenarios.
In this article, we will continue from the last post and examine the operation of the Raft implementation across several scenarios.
The scenario we will look into involves the process by which the state of a Raft cluster is saved to stable storage, and how the cluster is rebooted and recovers the previous state from logs and snapshots.
💡 Raftify is a high-level Raft implementation developed by Lablup. If you're curious about Raftify, check out this post.
Exploring the raft-rs Architecture Centered Around Types
In this article, before we proceed with the scenario analysis, let's first look at some of the types in raft-rs that will be featured in this discussion.
ConfState
The cluster is composed of multiple nodes, and each node is classified as either a voter or a learner depending on whether they participate in voting during a fault-induced election. Both voters and learners are members of the cluster and share the consensus, but learners do not participate in voting.
Information about these cluster members is also included in the consensus among the members, and thus can be established or modified by applying log entries.
💡 In raft-rs, the EntryType is divided into EntryConfChange for such ConfState configuration changes and EntryNormal for general state changes.
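To make the distinction concrete, here is a minimal sketch of how the two entry types are typically proposed through a raft-rs `RawNode`. This is illustrative rather than raftify code; the payload bytes are placeholders and `node` is assumed to be an already initialized leader.

```rust
use raft::eraftpb::{ConfChange, ConfChangeType};
use raft::storage::MemStorage;
use raft::RawNode;

fn propose_examples(node: &mut RawNode<MemStorage>) -> raft::Result<()> {
    // EntryNormal: an ordinary state change, with a caller-defined payload.
    node.propose(vec![], b"{\"key\": 1, \"value\": \"A\"}".to_vec())?;

    // EntryConfChange: a cluster membership change, e.g. adding node 2 as a voter.
    let mut cc = ConfChange::default();
    cc.set_change_type(ConfChangeType::AddNode);
    cc.node_id = 2;
    node.propose_conf_change(vec![], cc)?;

    Ok(())
}
```

Both proposals travel through the same replication path; they differ only in their `EntryType` and in how they are applied once committed.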
Among the types used in raft-rs, those used in the network layer are defined in the `eraftpb.proto` file and are compiled into Rust code by tonic.

```protobuf
message ConfState {
    repeated uint64 voters = 1;
    repeated uint64 learners = 2;

    // The voters in the outgoing config. If not empty the node is in joint consensus.
    repeated uint64 voters_outgoing = 3;
    // The nodes that will become learners when the outgoing config is removed.
    // These nodes are necessarily currently in nodes_joint (or they would have
    // been added to the incoming config right away).
    repeated uint64 learners_next = 4;
    // If set, the config is joint and Raft will automatically transition into
    // the final config (i.e. remove the outgoing config) when this is safe.
    bool auto_leave = 5;
}
```
`voters_outgoing`, `learners_next`, and `auto_leave` are fields supporting joint consensus. In this article, we will omit the explanation of joint consensus.
Snapshot, SnapshotMetadata
To ensure system availability, logs cannot be indefinitely accumulated and must be deleted once they have been committed to the state machine.
The process of removing logs up to a specific index in the log sequence is referred to as log compaction, and the recorded state up to that index after applying the log entries is called a snapshot.
Snapshots, which are the main focus of this post, will be examined in detail in the scenario analysis below. They are used to transfer the state of the cluster to newly joined nodes or for recovery from failures.
```protobuf
message Snapshot {
    bytes data = 1;
    SnapshotMetadata metadata = 2;
}

message SnapshotMetadata {
    // The current `ConfState`.
    ConfState conf_state = 1;
    // The applied index.
    uint64 index = 2;
    // The term of the applied index.
    uint64 term = 3;
}
```
`SnapshotMetadata` represents the metadata of a snapshot at the time it was created. Specifically, each field signifies the following:
- `conf_state`: Indicates the cluster membership information at the time the snapshot was created.
- `index`: Represents the index of the last log entry that was compacted when the snapshot was created.
- `term`: Indicates the term value of the last log entry at the time the snapshot was created.
Such metadata is essential for maintaining log consistency when utilizing snapshots.
For instance, when restoring state information from a snapshot, if the `term` of the log entry at the snapshot's index does not match the `term` in the snapshot metadata, the snapshot application request must be ignored to maintain consistency.
Scenario Analysis
1 - Snapshot Recording
In Raftify, snapshot creation is initiated by calling the `make_snapshot()` method of a RaftNode, passing a specific index and the `term` value of the log entry at that index as arguments.
The data to be stored in the snapshot comes from the return value of the `self.fsm.snapshot()` method, which represents the current state of the state machine.
💡 The `self.fsm.snapshot()` method can be implemented differently depending on how the finite state machine (FSM) is to be stored, and it is one of the implementations that Raftify users must provide. For example, in the `HashStore` example, where the FSM is stored in memory, `snapshot()` simply serializes and returns a `HashMap`.
Passing the index of the last log entry applied to the state machine, `last_applied`, to `compact()` deletes log entries up to the given index.

```rust
// lablup/raftify/blob/main/src/raft_node/mod.rs
pub async fn make_snapshot(&mut self, index: u64, term: u64) -> Result<()> {
    ...
    let snapshot_data = self.fsm.snapshot().await?;

    let last_applied = self.raw_node.raft.raft_log.applied;
    let store = self.raw_node.mut_store();
    store.compact(last_applied)?;
    store.create_snapshot(snapshot_data, index, term)?;
    Ok(())
}
```
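For reference, a user-provided FSM along the lines of the `HashStore` example mentioned above might look roughly like the sketch below. The trait shown here is a simplified, synchronous stand-in for raftify's actual state machine abstraction; the trait name, method signatures, and the use of JSON for serialization are assumptions made purely for illustration.

```rust
use std::collections::HashMap;

// Simplified stand-in for the state machine trait a raftify user implements.
// raftify's real trait is async and richer; this sketch only illustrates the idea.
trait StateMachine {
    fn snapshot(&self) -> serde_json::Result<Vec<u8>>;
    fn restore(&mut self, snapshot: &[u8]) -> serde_json::Result<()>;
}

// An in-memory key-value FSM, analogous to the HashStore example.
#[derive(Default)]
struct HashStore {
    data: HashMap<u64, String>,
}

impl StateMachine for HashStore {
    fn snapshot(&self) -> serde_json::Result<Vec<u8>> {
        // The snapshot is simply the serialized contents of the HashMap.
        serde_json::to_vec(&self.data)
    }

    fn restore(&mut self, snapshot: &[u8]) -> serde_json::Result<()> {
        // Restoring replaces the in-memory state with the deserialized snapshot.
        self.data = serde_json::from_slice(snapshot)?;
        Ok(())
    }
}
```

The return value of `snapshot()` is what `make_snapshot()` stores, and `restore()` is what consumes the snapshot data on a node that receives one, as we will see in scenario 2.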
`create_snapshot()` records the snapshot metadata along with the snapshot `data` received as an argument.

```rust
// lablup/raftify/blob/main/src/heed_storage/mod.rs
fn create_snapshot(&mut self, data: Vec<u8>, index: u64, term: u64) -> Result<()> {
    let store = self.wl();
    let mut writer = store.env.write_txn()?;
    let conf_state = store.conf_state(&writer)?;

    let mut snapshot = Snapshot::default();
    snapshot.set_data(data);

    let meta = snapshot.mut_metadata();
    meta.set_conf_state(conf_state);
    meta.index = index;
    meta.term = term;

    store.set_snapshot(&mut writer, &snapshot)?;
    writer.commit()?;
    Ok(())
}
```
2 - Transmitting Snapshots to Newly Joined Nodes
Scenario
When a new node joins the cluster, it must receive the state of the existing cluster to maintain consistency.
However, replicating every log entry individually each time a new node joins is inefficient. Since all nodes share the same state machine, transmitting just the snapshot, the result of the applied log entries, instead of every log entry can solve this issue. The type of message used to transmit snapshot data is `MsgSnapshot`.
In this section, let's assume that Node 1 is the leader and Node 2 is the newly joined node. We will focus on the code and logs related to the `MsgSnapshot` message to understand what happens during this process.
In Raftify, a newly joined follower does not send a separate snapshot request to the leader node.
When a configuration change request (hereafter `ConfChange`) is committed, the leader tries to send this log entry to the newly joined node, which will reject this `MsgAppend` message because it does not have the required log entries.
Do you remember the scenario from a previous part where `MsgAppend` messages were rejected due to a network failure, causing inconsistencies between nodes?
In that scenario, inconsistencies were resolved by synchronizing the mismatched log entries one by one using `prepare_send_entries()`. The difference when resolving log inconsistencies with a newly joined node is that instead of synchronizing log entries one by one, synchronization is done through a snapshot (`prepare_send_snapshot()`).
Now, let's delve into the code and log analysis to understand in detail how this scenario unfolds.
Code Analysis
Let's start by examining the part of the code where the `MsgAppend` message sent by the leader to the newly joined node is rejected.
The `maybe_send_append()` function is as follows.
In the code below, since the `progress` for the newly joined node is empty, the call to `self.raft_log.term()` fails, `prepare_send_snapshot()` is triggered, and `maybe_send_append()` returns false (rejecting the `MsgAppend` message).

```rust
// tikv/raft-rs/blob/master/src/raft.rs
fn maybe_send_append(
    &mut self,
    to: u64,
    pr: &mut Progress,
    allow_empty: bool,
    msgs: &mut Vec<Message>,
) -> bool {
    ...
    let term = self.raft_log.term(pr.next_idx - 1);

    match (term, ents) {
        (Ok(term), Ok(mut ents)) => {
            if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                return true;
            }
            self.prepare_send_entries(&mut m, pr, term, ents)
        }
        (_, Err(Error::Store(StorageError::LogTemporarilyUnavailable))) => {
            // wait for storage to fetch entries asynchronously
            return false;
        }
        _ => {
            // 💡 In this scenario, the following branch is executed.
            // send snapshot if we failed to get term or entries.
            if !self.prepare_send_snapshot(&mut m, pr, to) {
                return false;
            }
        }
    }
    ...
    self.send(m, msgs);
    true
}
```
The `prepare_send_snapshot()` called in this scenario is a function that retrieves snapshot data by invoking the `self.raft_log.snapshot()` method and sets this data on the message to be sent.
Afterwards, it marks the `progress` object of the node as being in the `snapshot` state before returning.
💡 The node's state being marked as `snapshot` indicates that the node is in the process of snapshot replication, which means that log replication to this node will be temporarily suspended.

```rust
// tikv/raft-rs/blob/master/src/raft.rs
fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
    ...
    m.set_msg_type(MessageType::MsgSnapshot);
    let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot, to);
    if let Err(ref e) = snapshot_r {
        if *e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
            self.logger.debug(
                format!(
                    "failed to send snapshot to {} because snapshot is temporarily unavailable",
                    to
                )
                .as_str(),
            );
            return false;
        }
        self.logger
            .fatal(format!("unexpected error: {:?}", e).as_str());
    }
    let snapshot = snapshot_r.unwrap();
    if snapshot.get_metadata().index == 0 {
        self.logger.fatal("need non-empty snapshot");
    }
    let (sindex, sterm) = (snapshot.get_metadata().index, snapshot.get_metadata().term);
    m.set_snapshot(snapshot);
    self.logger.debug(format!(
        "[firstindex: {first_index}, commit: {committed}] sent snapshot[index: {snapshot_index}, term: {snapshot_term}] to {to}; progress: {progress}",
        first_index = self.raft_log.first_index(),
        committed = self.raft_log.committed,
        snapshot_index = sindex,
        snapshot_term = sterm,
        to = to,
        progress = format!("{:?}", pr)
    ).as_str());
    pr.become_snapshot(sindex);
    self.logger.debug(
        format!(
            "paused sending replication messages to {}; progress: {:?}",
            to, pr
        )
        .as_str(),
    );
    true
}
```
Thus, Raftify prepares the snapshot to be sent to the new node in advance via the `RaftNode.make_snapshot()` call when a `ConfChange` is committed, as we explored in scenario 1.
The transmitted snapshot is detected and handled by the snapshot handling logic in the newly joined node's Raft loop. Through the logic below, the state machine is restored from the received snapshot data via `self.fsm.restore()`, and the snapshot is also applied to stable storage via `store.apply_snapshot()`.

```rust
// lablup/raftify/blob/main/raftify/src/raft_node/mod.rs
async fn on_ready(&mut self) -> Result<()> {
    ...
    if *ready.snapshot() != Snapshot::default() {
        self.logger
            .info("Restoring state machine and snapshot metadata...");
        let snapshot = ready.snapshot();
        if !snapshot.get_data().is_empty() {
            self.fsm.restore(snapshot.get_data().to_vec()).await?;
        }
        let store = self.raw_node.mut_store();
        store.apply_snapshot(snapshot.clone())?;
    }
    ...
}
```
Leader Node Log Analysis
Now, let's analyze the logs generated on the leader node sequentially when a new node joins:
- Node 1 receives a join request from Node 2, and the cluster configuration is modified.
Apr 11 06:51:14.189 INFO Node 2 (127.0.0.1:60062) joined the cluster as voter. Apr 11 06:51:14.189 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false } Apr 11 06:51:14.189 DEBG Entries [9, 10) requested.
- Because a new log entry has been added to the leader, it sends a `MsgAppend` message to Node 2 to replicate this log entry.
Apr 11 06:51:14.189 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 8, entries: [Entry { context: 7, data: ConfChangeV2 { transition: 0, changes: [ConfChangeSingle { change_type: AddNode, node_id: 2 }], context: [127.0.0.1:60062] }, entry_type: EntryConfChangeV2, index: 9, sync_log: false, term: 1 }], commit: 9, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
- However, since the newly joined node does not possess the information of the existing cluster, this `MsgAppend` message is rejected, and Node 1 receives a message indicating that the request has been rejected as follows.
Apr 11 06:51:14.298 DEBG >>> Node 1 received Raft message from the node 2, Message { msg_type: MsgAppendResponse, to: 1, from: 2, term: 1, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 } Apr 11 06:51:14.298 DEBG received msgAppend rejection; reject_hint_index: 0, reject_hint_term: 0, from: 2, index: 8 Apr 11 06:51:14.298 DEBG decreased progress of 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
- As described earlier, since the `progress` of the newly joined node is empty, the snapshot is stored in stable storage and log entries up to the specified index are removed. In this case, log entries up to index 8 have been removed, and the log entry index corresponding to Node 2's join request is 9. Therefore, a snapshot message is sent along with a log indicating that the `first_index` is 8 and the `commit` is 9.
Apr 11 06:51:14.298 DEBG [firstindex: 8, commit: 9] sent snapshot[index: 9, term: 1] to 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
- Replication of log entries is paused to transmit the snapshot.
Apr 11 06:51:14.299 DEBG paused sending replication messages to 2; progress: Progress { matched: 0, next_idx: 1, state: Snapshot, paused: false, pending_snapshot: 9, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
- A `MsgSnapshot` type message is sent to transmit the snapshot. The snapshot contains the data `{4: "A", 3: "A", 2: "A", 1: "A", 5: "A"}` that was previously arbitrarily inserted.
Apr 11 06:51:14.299 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgSnapshot, to: 2, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
Follower Node Log Analysis
Analyzing the logs generated on the newly joined follower node, we find:
- Becomes a new follower node at term 1.
Apr 15 06:37:27.421 INFO became follower at term 1
- Rejects the `MsgAppend` message received from the leader node.
Apr 15 06:37:27.421 DEBG rejected msgApp [logterm: 1, index: 8] from 1; index: 8, logterm: Ok(0) Apr 15 06:37:27.421 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
- To avoid being considered a failed node and triggering an unnecessary election, it responds normally to the `MsgHeartbeat` messages received from the leader.
Apr 15 06:37:27.423 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgHeartbeat, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 } Apr 15 06:37:27.423 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeatResponse, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
- Receives the snapshot through a `MsgSnapshot` message.
Apr 15 06:37:27.424 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgSnapshot, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {3: "A", 5: "A", 2: "A", 4: "A", 1: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 } Apr 15 06:37:27.424 INFO log [committed=0, persisted=0, applied=0, unstable.offset=1, unstable.entries.len()=0] starts to restore snapshot [index: 9, term: 1] Apr 15 06:37:27.424 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
- Restores the state using the received snapshot.
Apr 15 06:37:27.424 INFO restored snapshot; commit: 9, last_index: 9, last_term: 1, snapshot_index: 9, snapshot_term: 1 Apr 15 06:37:27.424 INFO [commit: 9, term: 1] restored snapshot [index: 9, term: 1] Apr 15 06:37:27.425 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 9, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 } Apr 15 06:37:27.425 INFO Restoring state machine and snapshot metadata... Apr 15 06:37:27.425 DEBG snapshot's persisted index 9
3 - Recovery in Case of Failure in the Majority of Nodes
When an individual node fails, it is not a problem: after the network is restored, the node can simply replicate the new log entries from the leader. Even if a node needs to rejoin from scratch, it can recover its state through a snapshot, as discussed in scenario 2.
However, if failures occur in a majority of nodes, the cluster cannot recover on its own.
In this case, the administrator must intervene manually to decide which node's log sequence to consider as the correct state, and then re-bootstrap the cluster from that log sequence.
Depending on the administrator's judgment, recovery can be done by directly applying all log entries one by one to the state machine or by restoring the state from the last created snapshot.
State Recovery from WAL Snapshots
In this section, we will use the Raftify example code.
To recreate the example, we will first insert a few key-value pairs into Node 1, then call the `make_snapshot()` method via the `/snapshot` API to create a snapshot. We will then assume that the node has failed and shut it down.
To recover from a WAL snapshot, pass the `node_id` of the node to be recovered to the `restore_wal_snapshot_from` option. Here, we will recover from Node 1's snapshot, so we will use `1`.
To verify whether the log entries have been applied, we will log `Inserted: (key, value)` each time `apply()` is called.
💡 `apply()` is also an abstract method of the StateMachine that Raftify users need to define, similar to `restore()`. It is called when log entries are committed.
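As a rough, illustrative sketch (again using the hypothetical in-memory `HashStore` from earlier, with an assumed JSON command encoding; raftify's real `apply()` is async), such an implementation might look like this:

```rust
use std::collections::HashMap;

struct HashStore {
    data: HashMap<u64, String>,
}

impl HashStore {
    // Called once for every committed log entry: decode the command,
    // apply it to the in-memory state, and log the insertion.
    fn apply(&mut self, entry_data: &[u8]) -> serde_json::Result<()> {
        let (key, value): (u64, String) = serde_json::from_slice(entry_data)?;
        self.data.insert(key, value.clone());
        println!("Inserted: ({}, {})", key, value);
        Ok(())
    }
}
```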
After taking the snapshot and shutting down Node 1, we can use the CLI commands provided by Raftify to dump the storage, as shown below. From the dump, we can see that the snapshot is stored in the storage and contains data such as `{ data: {2: \"A\", 5: \"A\", 3: \"A\", 4: \"A\", 1: \"A\"} }`.
.❯ raftify-cli debug persisted-all ./logs *----- node-1 -----* ---- Persisted entries ---- Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }" ---- Metadata ---- HardState { term: 1, vote: 1, commit: 8 } ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false } "Snapshot { data: HashStore(RwLock { data: {2: \"A\", 5: \"A\", 3: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }" Last index: 8
Then, let's re-bootstrap Node 1 using the command `./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-snapshot-from=1`.
The logs generated on Node 1 will be as follows. Since the state is restored directly from the snapshot, `apply()`
has not been executed for any of the log entries.Apr 15 07:54:44.703 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }, 2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: None, restore_wal_snapshot_from: Some(1), } Apr 15 07:54:44.705 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false } Apr 15 07:54:44.705 DEBG reset election timeout 0 -> 10 at 0 Apr 15 07:54:44.705 INFO became follower at term 3 Apr 15 07:54:44.705 INFO newRaft; term: 3, commit: 0, applied: 0, last index: 0, last term: 0, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } } Apr 15 07:54:44.705 INFO RawNode created with id 1. Apr 15 07:54:44.748 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"...
And then let's dump the storage again.
Since the recovery is from its own snapshot, we can confirm that no state changes have occurred.
*----- node-1 -----* ---- Persisted entries ---- Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }" ---- Metadata ---- HardState { term: 1, vote: 1, commit: 8 } ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false } "Snapshot { data: HashStore(RwLock { data: {3: \"A\", 2: \"A\", 5: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }" Last index: 8
State Recovery from WAL Logs
This time, let's recover the state from a specific log sequence.
In this case, as shown in the logs below, the storage has an empty snapshot but contains log entries necessary for state recovery.
*----- node-1 -----* ---- Persisted entries ---- Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }" Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }" Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }" Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }" Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }" Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }" Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }" Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }" ---- Metadata ---- HardState { term: 2, vote: 1, commit: 8 } ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false } "Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }" Last index: 8
Let's assume a failure and shut down Node 1, then see what happens when we re-bootstrap it.
After shutting down Node 1, re-bootstrap it using the command `./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-from=1`.
The following logs will be printed on Node 1, showing that the previously entered log entries are applied at once via `apply()`
, restoring the previous state.Apr 15 07:46:50.710 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }, 1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: Some(1), restore_wal_snapshot_from: None, } Apr 15 07:46:50.712 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false } Apr 15 07:46:50.712 DEBG reset election timeout 0 -> 10 at 0 Apr 15 07:46:50.712 INFO became follower at term 1 Apr 15 07:46:50.712 INFO newRaft; term: 1, commit: 8, applied: 0, last index: 8, last term: 1, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } } Apr 15 07:46:50.712 INFO RawNode created with id 1. Apr 15 07:46:50.753 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"... Apr 15 07:46:50.855 DEBG Entries [1, 9) requested. // Applies log entries one by one to restore the state machine state. Inserted: (1, A) Inserted: (1, A) Inserted: (1, A) Inserted: (2, A) Inserted: (3, A) Inserted: (4, A) Inserted: (5, A)
Similarly, since the state is recovered to its pre-crash state, dumping the storage will show that it is the same as before. The difference is that, compared to the previous recovery via snapshot, all log entries were applied one by one this time.
*----- node-1 -----* ---- Persisted entries ---- Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }" Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }" Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }" Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }" Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }" Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }" Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }" Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }" ---- Metadata ---- HardState { term: 2, vote: 1, commit: 8 } ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false } "Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }" Last index: 8
Conclusion
In this article, following up from the previous one, we explored how log inconsistencies with a newly joined node are resolved through a snapshot, and how a cluster's state is recovered from WAL logs and snapshots after a failure.
Raftify is participating as an interactive project in the 2024 Open Source Contribution Academy and is recruiting mentees interested in implementing distributed systems! (Recruitment period: ~ 06.23)
Participants will have the opportunity to experience everything from learning the basic concepts of distributed systems to the actual implementation process, alongside mentors.
We look forward to your interest! Thank you 😊
29 May 2024
Deconstructing a working Raft implementation - 1
By Gyubong Lee
In this article, we'll assume that the reader has a theoretical background in Raft, and we'll dig into the tikv/raft-rs code to see how state machines in distributed systems actually synchronize and behave in a few brief scenarios.
While this article focuses on analyzing the raft-rs code, we will use the raftify source code as an example in some sections for a complete understanding, as the raft-rs implementation does not include network and storage layers for flexibility.
💡 raftify is a high-level Raft implementation developed by Lablup. In this post, I'm only going to describe raftify with minimal code to understand how raft works. If you're curious about raftify, check out this post.
Img from: https://github.com/tikv/raft-rs
raft-rs architecture with a focus on types
Before we dive into the scenarios, let's take a quick look at the architecture, focusing on the typical types used in the code base.
Raft
The `Raft` object of each Raft node holds a message queue `msgs` in memory and interacts with other Raft nodes through this queue.
In a high-level implementation like raftify, the network layer is responsible for putting messages into this queue through an abstraction layer that will be described later.
This message queue can therefore be seen as an endpoint for communication, and the `Raft` implementation will process these messages according to its current state, maintaining a consistent state between nodes.
The `RaftCore` type holds the data corresponding to the state of this Raft node.
There is also a type called `Progress` that holds metadata for synchronizing log entries with other Raft nodes, and these are updated appropriately in the `ProgressTracker` depending on the situation.
As a result, Raft has the following types:
// tikv/raft-rs/blob/master/src/raft.rs pub struct Raft<T: Storage> { pub msgs: Vec<Message>, pub r: RaftCore<T>, prs: ProgressTracker, }
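As a minimal illustration of that hand-off, a higher-level network layer ends up doing something like the sketch below with messages it receives from peers. This is not raftify's actual network code; `MemStorage` merely stands in for a real storage implementation.

```rust
use raft::eraftpb::Message;
use raft::storage::MemStorage;
use raft::RawNode;

/// Called by the network layer (for example a gRPC handler) whenever a Raft
/// message arrives from a peer. `step()` hands it to the local Raft state
/// machine; any responses it produces are queued in `msgs` and are later
/// drained through the `Ready` struct in the Raft loop and sent back out.
fn on_peer_message(node: &mut RawNode<MemStorage>, msg: Message) -> raft::Result<()> {
    node.step(msg)
}
```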
RaftLog
Representative of the data that `RaftCore` holds is `RaftLog`, which abstracts access to a sequence of log entries.
`RaftLog<T: Storage>` abstracts the types `Unstable` and `T` so that they can be handled together. Here, `T` corresponds to persistent storage that needs to be implemented at a higher level, such as in raftify, and `Unstable` is a buffer that entries pass through before being written to this storage.

```rust
// tikv/raft-rs/blob/master/src/raft_log.rs
pub struct RaftLog<T: Storage> {
    pub store: T,
    pub unstable: Unstable,
    ...
}
```
💡 If you're interested in learning more about the RaftCore type, check out this link.
Raft Loop
Raft implementations perform an iterative process of updating their state machine in an infinite loop in order to communicate with other Raft nodes and maintain a consistent state. In this article, we'll call this loop a Raft loop.
The source code for implementing a Raft loop in raftify is shown below.
(You can also see the example code in tikv/raft-rs if you want to see the most minimal implementation).
// lablup/raftify/blob/main/raftify/src/raft_node/mod.rs async fn on_ready(&mut self) -> Result<()> { if !self.raw_node.has_ready() { return Ok(()); } let mut ready = self.raw_node.ready(); if !ready.messages().is_empty() { self.send_messages(ready.take_messages()).await; } if *ready.snapshot() != Snapshot::default() { slog::info!( self.logger, "Restoring state machine and snapshot metadata..." ); let snapshot = ready.snapshot(); if !snapshot.get_data().is_empty() { self.fsm.restore(snapshot.get_data().to_vec()).await?; } let store = self.raw_node.mut_store(); store.apply_snapshot(snapshot.clone())?; } self.handle_committed_entries(ready.take_committed_entries()) .await?; if !ready.entries().is_empty() { let entries = &ready.entries()[..]; let store = self.raw_node.mut_store(); store.append(entries)?; } if let Some(hs) = ready.hs() { let store = self.raw_node.mut_store(); store.set_hard_state(hs)?; } if !ready.persisted_messages().is_empty() { self.send_messages(ready.take_persisted_messages()).await; } let mut light_rd = self.raw_node.advance(ready); if let Some(commit) = light_rd.commit_index() { let store = self.raw_node.mut_store(); store.set_hard_state_commit(commit)?; } if !light_rd.messages().is_empty() { self.send_messages(light_rd.take_messages()).await; } self.handle_committed_entries(light_rd.take_committed_entries()) .await?; self.raw_node.advance_apply(); Ok(()) }
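The loop above assumes a `RawNode` has already been constructed. For reference, a minimal way to build one against the `MemStorage` that raft-rs ships is sketched below; a real application such as raftify plugs in its own persistent `Storage` implementation and logger instead.

```rust
use raft::storage::MemStorage;
use raft::{Config, RawNode};

fn build_raw_node() -> raft::Result<RawNode<MemStorage>> {
    // A single-node cluster whose only voter is node 1.
    let storage = MemStorage::new_with_conf_state((vec![1], vec![]));

    let config = Config {
        id: 1,
        election_tick: 10,
        heartbeat_tick: 3,
        ..Default::default()
    };
    config.validate()?;

    // A discard logger keeps the sketch minimal; raft-rs logs through slog.
    let logger = slog::Logger::root(slog::Discard, slog::o!());

    RawNode::new(&config, storage, &logger)
}
```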
RawNode
Each Raft node has a higher-level instance of a type called
RawNode
that contains the Raft module. ARawNode
has arecords
field that represents the metadata ofSoftState
, a state that is kept only in memory,HardState
, a state that is stored in persistent storage, andReady
, which is not yet stored.💡 Ready is the data structure that is passed to the Raft node when it needs to be updated.
// tikv/raft-rs/blob/master/src/raw_node.rs pub struct RawNode<T: Storage> { pub raft: Raft<T>, prev_ss: SoftState, prev_hs: HardState, max_number: u64, records: VecDeque<ReadyRecord>, commit_since_index: u64, }
In the first part of the Raft loop, when the
ready
method is called, the metadata fromReady
is stored inrecords
, and after all the snapshots, entries, etc. that need to be stored are processed, the last part of the loop,advance
, callscommit_ready
and updates the offset of the bufferUnstable
.RaftNode
A
RaftNode
is a type that raftify abstracts aRawNode
at a higher level, integrating it with the network and storage layers.In a separate asynchronous task, raftify receives messages sent by the gRPC client and passes them over the channel to the
RaftNode.run
task.After processing the messages, it handles state changes in a function (Raft loop) named
on_ready
.// lablup/raftify/blob/main/raftify/src/raft_node/mod.rs pub async fn run(mut self) -> Result<()> { let mut tick_timer = Duration::from_secs_f32(self.config.tick_interval); let fixed_tick_timer = tick_timer; let mut now = Instant::now(); loop { ... tokio::select! { msg = timeout(fixed_tick_timer, self.server_rcv.recv()) => { if let Ok(Some(msg)) = msg { self.handle_server_request_msg(msg).await?; } } ... } let elapsed = now.elapsed(); now = Instant::now(); if elapsed > tick_timer { tick_timer = Duration::from_millis(100); self.raw_node.tick(); } else { tick_timer -= elapsed; } self.on_ready().await? } }
To explain raftify's implementation in more detail, raftify iterates through the following process
- generate a request from the client (e.g. call
RaftServiceClient.propose
orRaftNode.propose
) RaftServiceClient.propose
on the remote Raft node is called via gRPC.- RaftServiceClient.propose
passes the
Proposemessage over the channel to the
RaftNode.run` coroutine. 4. RaftNode.run
polls the message queue and callsRawNode::propose
when it receives aPropose
message.- when there are changes to the state machine that need to be applied, a
Ready
instance is created and passed to theon_ready
handler. - when entries are committed, the
on_ready
handler processes the committed entries and responds to the client.
With the theoretical stuff out of the way, let's analyze a few scenarios and see what happens.
💡 What we arbitrarily call Propose messages in this paragraph is a type of message defined for the purpose of proposing a state change to the cluster.
Scenario Analysis
1 - Add a new log entry
What happens under the hood when you request (propose) a change to the cluster to alter its state machine? In this section, we'll break down what happens when you call
RawNode.propose
. Here's a look at theRawNode.propose
function// tikv/raft-rs/blob/master/src/raw_node.rs pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> { let mut m = Message::default(); m.set_msg_type(MessageType::MsgPropose); m.from = self.raft.id; let mut e = Entry::default(); e.data = data.into(); e.context = context.into(); m.set_entries(vec![e].into()); self.raft.step(m) }
From the code above, you can see that the
propose
function callsstep
to make it handle a message of typeMsgPropose
.Here,
step
is the function that corresponds to the actual message handler in raft-rs. If the node callingstep
is the leader,step_leader
is called, if it is a follower,step_follower
is called, and if it is a candidate,step_candidate
is called.The code for
step
is quite complex, but let's follow the code to see how theMsgPropose
type is handled on the leader node.// tikv/raft-rs/blob/master/src/raft.rs fn step_leader(&mut self, mut m: Message) -> Result<()> { ... match m.get_msg_type() { MessageType::MsgPropose => { ... if !self.append_entry(m.mut_entries()) { ... } self.bcast_append(); return Ok(()); } ... } }
Raft.append_entry
calls
`RaftLog.append` to add an entry. `RaftLog.append`
appends the entries added to the Unstable buffer byself.unstable.truncate_and_append
.// tikv/raft-rs/blob/master/src/raft_log.rs pub fn append(&mut self, ents: &[Entry]) -> u64 { ... self.unstable.truncate_and_append(ents); self.last_index() }
As previously described, the entries added to the buffer will be persisted in a Raft loop, and updating the state machine via an
advance
-like function will automatically update the offset and clear the buffer.Let's take a look at the next call,
bcast_append
.You can see that we're calling
core.send_append
with each follower'sprogress
as an argument, using theProgressTracker
(prs
) described in the previous section to synchronize the log entries of the leader and followers.// tikv/raft-rs/blob/master/src/raft.rs pub fn bcast_append(&mut self) { let self_id = self.id; let core = &mut self.r; let msgs = &mut self.msgs; self.prs .iter_mut() .filter(|&(id, _)| *id != self_id) .for_each(|(id, pr)| core.send_append(*id, pr, msgs)); }
The
send_append
has the following simple structure// tikv/raft-rs/blob/master/src/raft.rs fn send_append(&mut self, to: u64, pr: &mut Progress, msgs: &mut Vec<Message>) { self.maybe_send_append(to, pr, true, msgs); }
The
maybe_send_append
reads the log entries in the range pr.next_idx to to viaRaftLog.entries
and passes them toprepare_send_entries
.(As you can infer from the
maybe_
prefix to its name, the function returns true on success and false on failure.)// tikv/raft-rs/blob/master/src/raft.rs fn maybe_send_append( &mut self, to: u64, pr: &mut Progress, allow_empty: bool, msgs: &mut Vec<Message>, ) -> bool { ... let ents = self.raft_log.entries( pr.next_idx, self.max_msg_size, GetEntriesContext(GetEntriesFor::SendAppend { to, term: self.term, aggressively: !allow_empty, }), ); ... match (term, ents) { (Ok(term), Ok(mut ents)) => { if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) { return true; } self.prepare_send_entries(&mut m, pr, term, ents) } ... } ... self.send(m, msgs); true }
Prepare_send_entries
creates a message object m of type
`MsgAppend` and puts the entries into the message. It then updates
progress` and returns it.// tikv/raft-rs/blob/master/src/raft.rs fn prepare_send_entries( &mut self, m: &mut Message, pr: &mut Progress, term: u64, ents: Vec<Entry>, ) { m.set_msg_type(MessageType::MsgAppend); m.index = pr.next_idx - 1; m.log_term = term; m.set_entries(ents.into()); m.commit = self.raft_log.committed; if !m.entries.is_empty() { let last = m.entries.last().unwrap().index; pr.update_state(last); } }
Then
self.send(m, msgs)
puts this prepared message into themsgs
message queue.// tikv/raft-rs/blob/master/src/raft.rs fn send(&mut self, mut m: Message, msgs: &mut Vec<Message>) { ... msgs.push(m); }
The
MsgAppend
message that enters the message queue will be sent to the follower node fromsend_messages
through the network layer. Therefore, we need to see how the follower node handles theMsgAppend
message after receiving it.
Next, let's take a look at what happens on the follower node. To find out what happens when a follower node receives an
MsgAppend
message, we can look atstep_follower
.// tikv/raft-rs/blob/master/src/raft.rs fn step_follower(&mut self, mut m: Message) -> Result<()> { match m.get_msg_type() { ... MessageType::MsgAppend => { self.election_elapsed = 0; self.leader_id = m.from; self.handle_append_entries(&m); } ... } }
From the code above, you can see that the follower node that received the
MsgAppend
message is callinghandle_append_entries
.This function creates a
to_send
, a message of typeMsgAppendResponse
, and callsRaftLog.maybe_append
, as shown below.

```rust
// tikv/raft-rs/blob/master/src/raft.rs
pub fn handle_append_entries(&mut self, m: &Message) {
    ...
    let mut to_send = Message::default();
    to_send.to = m.from;
    to_send.set_msg_type(MessageType::MsgAppendResponse);

    if let Some((_, last_idx)) = self
        .raft_log
        .maybe_append(m.index, m.log_term, m.commit, &m.entries)
    {
        ... // the MsgAppend message is accepted
    } else {
        ... // the MsgAppend message is rejected
    }
    ...
    self.r.send(to_send, &mut self.msgs);
}
```
This function calls
match_term
to check if the message'slogTerm
and the log entry'sterm
values are the same, callsfind_conflict
to check for conflicts in the log entry sequence, and callsRaft.append
if it determines there are no problems.// tikv/raft-rs/blob/master/src/raft.rs pub fn maybe_append( &mut self, idx: u64, term: u64, committed: u64, ents: &[Entry], ) -> Option<(u64, u64)> { if self.match_term(idx, term) { let conflict_idx = self.find_conflict(ents); if conflict_idx == 0 { } else if conflict_idx <= self.committed { fatal!( self.unstable.logger, "entry {} conflict with committed entry {}", conflict_idx, self.committed ) } else { let start = (conflict_idx - (idx + 1)) as usize; self.append(&ents[start..]); if self.persisted > conflict_idx - 1 { self.persisted = conflict_idx - 1; } } let last_new_index = idx + ents.len() as u64; self.commit_to(cmp::min(committed, last_new_index)); return Some((conflict_idx, last_new_index)); } None }
We've seen this function before. It was the last function called before the call to
RaftLog.append
when a log entry was proposed by the leader node.As before,
Raft.append_entry
callsRaftLog.append
to add the entry. `RaftLog.append` appends the entries added to the `Unstable` buffer from
self.unstable.truncate_and_append`.This outlines a scenario where logs added to the leader are persisted on the leader node and copied to the follower nodes.
2 - Leader and follower node log sequence mismatch
In scenario 1, we looked at the code assuming a normal situation, but in reality, issues such as network disconnection can cause mismatches between leader and follower nodes. Let's take another look at the code, this time focusing on how to detect and resolve mismatches between leader and follower nodes.
Let's say you have a cluster of three nodes that is processing thousands of requests that are successively changing the state machine, and then a network failure occurs.
In the event of a failure, we should start by looking at the logs written to the nodes, persisted log entries, and debugging information to get some context, but to avoid making this post too long, we'll just pick out the logs that will give us a general idea of what's happening on the nodes and analyze them.
First of all, node 3 leaves a "rejected msgApp..." log indicating that it has rejected a message.
Nov 28 05:30:59.233 DEBG rejected msgApp [logterm: 7, index: 3641] from 2, logterm: Ok(0), index: 3641, from: 2, msg_index: 3641, msg_log_term: 7
From the log above, we can see that node 3 is a follower node, node 2 is the newly elected leader node after the failure, and that the
MsgAppend
message trying to replicate the 3641st entry was rejected.
If we look up which function outputs this log, we can see that it is called from
handle_append_entries
, which we saw in Scenario 1 (the function that handles theMsgAppend
messages that the follower receives from the leader).pub fn handle_append_entries(&mut self, m: &Message) { ... let mut to_send = Message::default(); to_send.to = m.from; to_send.set_msg_type(MessageType::MsgAppendResponse); ... if let Some((_, last_idx)) = self .raft_log .maybe_append(m.index, m.log_term, m.commit, &m.entries) { ... } else { debug!( self.logger, "rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \ from {from}", msg_log_term = m.log_term, msg_index = m.index, from = m.from; "index" => m.index, "logterm" => ?self.raft_log.term(m.index), ); let hint_index = cmp::min(m.index, self.raft_log.last_index()); let (hint_index, hint_term) = self.raft_log.find_conflict_by_term(hint_index, m.log_term); if hint_term.is_none() { fatal!( self.logger, "term({index}) must be valid", index = hint_index ) } to_send.index = m.index; to_send.reject = true; to_send.reject_hint = hint_index; to_send.log_term = hint_term.unwrap(); } to_send.set_commit(self.raft_log.committed); self.r.send(to_send, &mut self.msgs); }
If you look at the function, you can see that this log was output, which means that
maybe_append
returned None, which means thatmatch_term
returned False. This means that there is a mismatch between thelogTerm
in the message and the value ofterm
in entry 3641.So we find the point of conflict via
term
(find_conflict_by_term
) and put the point of conflict (hint_index
) into thereject_hint
of the message and send it back to the leader in the form of an `MsgAppendResponse`
message.So what does the leader do with this rejected
MsgAppendResponse
message?The leader node that rejected the message will leave a log that the
MsgAppend
was rejected, as shown below.Nov 28 05:30:59.279 DEBG received msgAppend rejection, index: 3641, from: 3, reject_hint_term: 7, reject_hint_index: 3611
So the next thing we need to look at is the function that receives this rejected
MsgAppend
message and outputs "received msgAppend rejection".This function is called
handle_append_response
, and while the function itself is quite long, it's not that long when you cut it down to just what happens when anMsgAppend
is rejected.fn handle_append_response(&mut self, m: &Message) { let mut next_probe_index: u64 = m.reject_hint; ... if m.reject { debug!( self.r.logger, "received msgAppend rejection"; "reject_hint_index" => m.reject_hint, "reject_hint_term" => m.log_term, "from" => m.from, "index" => m.index, ); if pr.maybe_decr_to(m.index, next_probe_index, m.request_snapshot) { debug!( self.r.logger, "decreased progress of {}", m.from; "progress" => ?pr, ); if pr.state == ProgressState::Replicate { pr.become_probe(); } self.send_append(m.from); } return; } ... }
Take the
reject_hint
from the message and make it thenext_probe_index
, and callProgress.maybe_decr_to
to decrement the progress. Indicate thatProgress
is in the probe state, and callsend_append
to send anotherMsgAppend
message.💡 ProgressState is an enum that represents the synchronization progress of each node. Under normal circumstances, it is "Replicate" if the node is replicating logs, "Probe" if the follower node does not know the last index that was replicated, and "Snapshot" if the node is in a probing state and is replicating logs to the follower by sending snapshots.
To summarize, to find the index (
next_probe_index
) of the log entry before the conflict, we decrement the node's `progress`
and send anotherMsgAppend
message. This process is repeated until we find the Common log prefix of the leader and follower nodes.Once the Common log prefix is found, log entries after that index are replicated in a unidirectional fashion from the leader to the follower and overwritten. This process can be seen in the
maybe_send_append
function.The log entries obtained through
RaftLog.entries
are replicated into theSendAppend
context as shown below. Here,max_msg_size
ismax_size_per_msg
from Config, which defaults to0
. WithRaftLog.entries
, themax_size
of theLMDBStorage.entries
(persistent storage type, corresponding to T inRaftLog
) argument is given0
, which, based on this comment, means that if you don't set it, it will synchronize log entries one by one when there is a mismatch in the logs of the leader and follower nodes.After that,
prepare_send_entries
is used to prepare theMsgAppend
message as described in the previous section, andRaft.send
is used to replicate the entries to the follower node.// tikv/raft-rs/blob/master/src/raft.rs fn maybe_send_append( &mut self, to: u64, pr: &mut Progress, allow_empty: bool, msgs: &mut Vec<Message>, ) -> bool { ... let mut m = Message::default(); m.to = to; if pr.pending_request_snapshot != INVALID_INDEX { ... } else { let ents = self.raft_log.entries( pr.next_idx, self.max_msg_size, GetEntriesContext(GetEntriesFor::SendAppend { to, term: self.term, aggressively: !allow_empty, }), ); ... let term = self.raft_log.term(pr.next_idx - 1); match (term, ents) { (Ok(term), Ok(mut ents)) => { if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) { return true; } self.prepare_send_entries(&mut m, pr, term, ents) } ... } } self.send(m, msgs); true }
There are a lot of logs missing in the middle, but you can see that after synchronization between the leader and the follower has occurred through the above process, from the 3612th entry to the 3642nd entry, the follower's
progress state
changes toReplicate
and it starts sending and receivingHeartbeat
messages normally.2023-11-28 14:30:59,269 - INFO - Entries [3612, 3643) requested Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3611, entries: [Entry { context: "1810", data: "{'key': '2292', 'value': '1'}", entry_type: EntryNormal, index: 3612, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 2023-11-28 14:30:59,259 - INFO - Entries [3613, 3643) requested Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3612, entries: [Entry { context: "1811", data: "{'key': '2294', 'value': '1'}", entry_type: EntryNormal, index: 3613, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 2023-11-28 14:30:59,259 - INFO - Entries [3614, 3643) requested Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3613, entries: [Entry { context: "1812", data: "{'key': '2295', 'value': '1'}", entry_type: EntryNormal, index: 3614, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 2023-11-28 14:30:59,259 - INFO - Entries [3615, 3643) requested Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3614, entries: [Entry { context: "1813", data: "{'key': '2296', 'value': '1'}", entry_type: EntryNormal, index: 3615, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 ... 
2023-11-28 14:30:59,284 - INFO - Entries [3641, 3643) requested Nov 28 05:30:59.283 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3640, entries: [Entry { context: "1839", data: "{'key': '2457', 'value': '1'}", entry_type: EntryNormal, index: 3641, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 2023-11-28 14:30:59,284 - INFO - Entries [3642, 3643) requested Nov 28 05:30:59.284 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3641, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 3642, sync_log: false, term: 12 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 Nov 28 05:31:01.635 DEBG Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeat, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 2 Nov 28 05:31:01.635 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgHeartbeat, to: 3, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2 2023-11-28 14:31:01,637
3 - Electing a leader
In Scenario 2, we could tell from the increase in the
term
value that a leader election had been caused by the network failure; in this scenario we'll take a closer look at the leader election process itself.
To see what logs are produced when the leader fails, we'll simply create a cluster of 3 nodes, force the leader process to shut down, and look at the logs of the process that is newly elected leader.
To summarize the logs, after the leader node shuts down, node 3 starts the election and transitions to the Candidate state and sends a
MsgRequestVote
message to the other voters. The process can be summarized as: you receive aMsgRequestVoteResponse
message from node 2, you are elected as the new leader because you received a majority of the votes for yourself, you increase the term value to 2, and you send a special kind of message (an emptyMsgAppend
) to announce that you are the elected leader.💡 A follower node that has not received a heartbeat message by election_tick will start voting. In this case, to avoid split vote, election_tick is determined to be a random value between min_election_tick and max_election_tick each time. Therefore, after the leader node is terminated, any of the remaining two nodes can become the leader node, and it will be elected as the node with the smaller election_tick.
Nov 29 01:30:30.210 INFO starting a new election, term: 1 Nov 29 01:30:30.210 DEBG reset election timeout 16 -> 10 at 0, election_elapsed: 0, timeout: 10, prev_timeout: 16 Nov 29 01:30:30.210 INFO became candidate at term 2, term: 2 Nov 29 01:30:30.210 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgRequestVote, to: 1, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3 Nov 29 01:30:30.210 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgRequestVote, to: 2, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3 Nov 29 01:30:30.211 INFO broadcasting vote request, to: [1, 2], log_index: 3, log_term: 1, term: 2, type: MsgRequestVote 2023-11-29 10:30:30,217 - WARNING - Failed to connect to node 1 elapsed from first failure: 0.0000s. Err message: <AioRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused" debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused {created_time:"2023-11-29T10:30:30.216855+09:00", grpc_status:14}" > 2023-11-29 10:30:30,222 - DEBUG - Node 3 received Raft message from the node 2, Message: Message { msg_type: MsgRequestVoteResponse, to: 3, from: 2, term: 2, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: "None", metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 } Nov 29 01:30:30.223 INFO received votes response, term: 2, type: MsgRequestVoteResponse, approvals: 2, rejections: 0, from: 2, vote: true Nov 29 01:30:30.223 TRCE ENTER become_leader Nov 29 01:30:30.223 DEBG reset election timeout 10 -> 17 at 0, election_elapsed: 0, timeout: 17, prev_timeout: 10 Nov 29 01:30:30.223 TRCE Entries being appended to unstable list, ents: Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 } Nov 29 01:30:30.223 INFO became leader at term 2, term: 2 Nov 29 01:30:30.223 TRCE EXIT become_leader Nov 29 01:30:30.223 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgAppend, to: 1, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3 Nov 29 01:30:30.223 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, 
context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3
Let's follow the code to see what lies behind these logs.
First of all, the function that prints the "starting a new election" log is hup. hup is called while processing messages of type MsgHup (in step) and MsgTimeoutNow (in step_follower).
Note that the MsgTimeoutNow message type is used for Leader transfer, not Leader election. When the leader receives a MsgTransferLeader message, it sends a message of type MsgTimeoutNow to the target follower, and the hup function is then executed with the transfer_leader flag set to true. While Leader election is the process of electing a new leader when the leader fails, Leader transfer is the process of a leader deliberately handing leadership over to another follower.
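For completeness, leader transfer is normally kicked off from the application side rather than by a failure. A minimal sketch, assuming the raft-rs version in use exposes RawNode::transfer_leader (check your version's API before relying on it):

```rust
use raft::{storage::MemStorage, RawNode};

// Ask the current leader to hand leadership over to `transferee`. Internally
// this steps a MsgTransferLeader message; the leader then sends MsgTimeoutNow
// to the target follower, which runs hup() with the transfer_leader flag set.
fn transfer_leadership(node: &mut RawNode<MemStorage>, transferee: u64) {
    node.transfer_leader(transferee);
}
```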
So the message we need to follow now is MsgHup. We can guess that it was the tick_election function below that stepped the MsgHup message: no heartbeat arrived within the election timeout, so the node started electing a leader.
Remember how we called self.raw_node.tick() on every tick_timer of RaftNode? This RawNode.tick allows the node to step an MsgHup message to itself once election_elapsed has passed the randomized_election_timeout. (Randomizing the election timeout
here is to prevent a situation where all nodes start voting at the same time and all nodes vote for themselves).// raw_node.rs pub fn tick(&mut self) -> bool { self.raft.tick() } // raft.rs pub fn tick(&mut self) -> bool { match self.state { StateRole::Follower | StateRole::PreCandidate | StateRole::Candidate => { self.tick_election() } StateRole::Leader => self.tick_heartbeat(), } } // raft.rs pub fn tick_election(&mut self) -> bool { self.election_elapsed += 1; if !self.pass_election_timeout() || !self.promotable { return false; } self.election_elapsed = 0; let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id)); let _ = self.step(m); true } // raft.rs pub fn step(&mut self, m: Message) -> Result<()> { ... match m.get_msg_type() { ... MessageType::MsgHup => { self.hup(false) }, } } // raft.rs pub fn pass_election_timeout(&self) -> bool { self.election_elapsed >= self.randomized_election_timeout }
The hup function runs the campaign function with the CAMPAIGN_ELECTION
type, as shown below to summarize.// tikv/raft-rs/blob/master/src/raft.rs fn hup(&mut self, transfer_leader: bool) { ... info!( self.logger, "starting a new election"; "term" => self.term, ); ... self.campaign(CAMPAIGN_ELECTION); }
The campaign function transitions the node's own state to Candidate and starts the vote, as shown below.
First of all, self_id is the node's own id, as the name suggests, so self.poll(self_id, vote_msg, true) means voting for itself.
If the result is VoteResult::Won, the node wins the vote outright and returns as the leader. So you can see that in a single-node cluster, messages like MsgRequestVote and MsgRequestVoteResponse are never exchanged at all.
But of course that is not what happens here, because ours is not a single-node cluster.
// tikv/raft-rs/blob/master/src/raft.rs pub fn campaign(&mut self, campaign_type: &'static [u8]) { let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION { ... } else { self.become_candidate(); (MessageType::MsgRequestVote, self.term) }; let self_id = self.id; if VoteResult::Won == self.poll(self_id, vote_msg, true) { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). return; } ... }
Before we dive into the latter part of campaign, let's take a look at how poll works.
poll is a function that calls record_vote and tally_votes and then acts on the result of the vote: if the node wins, it transitions to the leader and broadcasts (bcast_append) that it is the new leader of the cluster. If it loses the vote, it transitions to a follower node, and if the result is Pending
, it returns without doing anything.// tikv/raft-rs/blob/master/src/raft.rs fn poll(&mut self, from: u64, t: MessageType, vote: bool) -> VoteResult { self.prs.record_vote(from, vote); let (gr, rj, res) = self.prs.tally_votes(); if from != self.id { info!( self.logger, "received votes response"; "vote" => vote, "from" => from, "rejections" => rj, "approvals" => gr, "type" => ?t, "term" => self.term, ); } match res { VoteResult::Won => { if self.state == StateRole::PreCandidate { self.campaign(CAMPAIGN_ELECTION); } else { self.become_leader(); self.bcast_append(); } } VoteResult::Lost => { let term = self.term; self.become_follower(term, INVALID_ID); } VoteResult::Pending => (), } res }
The role of record_vote is quite simple: it records in the votes hashmap of the ProgressTracker whether the node with the given id
has voted for itself.// tikv/raft-rs/blob/master/src/tracker.rs pub fn record_vote(&mut self, id: u64, vote: bool) { self.votes.entry(id).or_insert(vote); }
Let's look at tally_votes. You can see that it walks the votes hashmap, counting the number of nodes that granted the vote and the number that rejected it, and returns them as a tuple.
💡 The word "tally" refers to the act of counting or aggregating points, so tally_votes is a function that counts and aggregates votes.
// tikv/raft-rs/blob/master/src/tracker.rs pub fn tally_votes(&self) -> (usize, usize, VoteResult) { let (mut granted, mut rejected) = (0, 0); for (id, vote) in &self.votes { if !self.conf.voters.contains(*id) { continue; } if *vote { granted += 1; } else { rejected += 1; } } let result = self.vote_result(&self.votes); (granted, rejected, result) }
Let's take a look at how the outcome of a vote is determined.
For a joint quorum, the consensus of both quorums (the incoming quorum and the outgoing quorum) is needed to win the vote. So we need to look at the three vote_result functions below.
In tracker.rs, we pass in the callback function check, which tells us, for a given node id, whether that node's entry in the votes hashmap is a vote for us.
In joint.rs, we return VoteResult::Won only if both configurations win, and VoteResult::Lost if either side loses the vote. Otherwise, we return VoteResult::Pending.
The actual counting of votes is done by vote_result in majority.rs. It counts the number of voters that granted the vote and the number that have not responded yet, and returns VoteResult::Won if a majority agreed, VoteResult::Pending if a majority could still be reached once the missing responses arrive, or VoteResult::Lost
otherwise.// tracker.rs pub fn vote_result(&self, votes: &HashMap<u64, bool>) -> VoteResult { self.conf.voters.vote_result(|id| votes.get(&id).cloned()) } // joint.rs pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult { let i = self.incoming.vote_result(&check); let o = self.outgoing.vote_result(check); match (i, o) { // It won if won in both. (VoteResult::Won, VoteResult::Won) => VoteResult::Won, // It lost if lost in either. (VoteResult::Lost, _) | (_, VoteResult::Lost) => VoteResult::Lost, // It remains pending if pending in both or just won in one side. _ => VoteResult::Pending, } } // majority.rs pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult { ... let (mut yes, mut missing) = (0, 0); for v in &self.voters { match check(*v) { Some(true) => yes += 1, None => missing += 1, _ => (), } } let q = crate::majority(self.voters.len()); if yes >= q { VoteResult::Won } else if yes + missing >= q { VoteResult::Pending } else { VoteResult::Lost } } // util.rs pub fn majority(total: usize) -> usize { (total / 2) + 1 }
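To make the quorum arithmetic concrete, here is a small standalone illustration (not raft-rs code) of how the three outcomes fall out for a 3-voter cluster:

```rust
// Standalone illustration of the quorum arithmetic above (not raft-rs code):
// yes = votes granted so far, missing = voters that have not responded yet.
#[derive(Debug, PartialEq)]
enum VoteResult {
    Won,
    Pending,
    Lost,
}

fn majority(total: usize) -> usize {
    (total / 2) + 1
}

fn tally(yes: usize, missing: usize, voters: usize) -> VoteResult {
    let q = majority(voters);
    if yes >= q {
        VoteResult::Won
    } else if yes + missing >= q {
        VoteResult::Pending
    } else {
        VoteResult::Lost
    }
}

fn main() {
    // A 3-voter cluster needs 2 votes; the candidate always starts with its own.
    assert_eq!(tally(2, 0, 3), VoteResult::Won); // one extra approval is enough
    assert_eq!(tally(1, 2, 3), VoteResult::Pending); // nobody has answered yet
    assert_eq!(tally(1, 0, 3), VoteResult::Lost); // both other voters rejected
}
```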
We've seen how the voting process is driven by the votes hashmap, but before any of this can happen, that hashmap has to be filled in via the MsgRequestVote and MsgRequestVoteResponse messages.
So let's continue following the campaign function. We can see that it creates messages of type MsgRequestVote and sends them to the voters.
Next, let's follow the handler for the MsgRequestVote
message.// tikv/raft-rs/blob/master/src/raft.rs pub fn campaign(&mut self, campaign_type: &'static [u8]) { let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION { ... } else { self.become_candidate(); (MessageType::MsgRequestVote, self.term) }; let self_id = self.id; if VoteResult::Won == self.poll(self_id, vote_msg, true) { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). return; } // Only send vote request to voters. for id in self.prs.conf().voters().ids().iter() { if id == self_id { continue; } ... let mut m = new_message(id, vote_msg, None); m.term = term; m.index = self.raft_log.last_index(); m.log_term = self.raft_log.last_term(); m.commit = commit; m.commit_term = commit_term; ... self.r.send(m, &mut self.msgs); } ... }
At first glance it looks complicated, but at the end of the day what the MsgRequestVote handler does is create and send a message that either grants or rejects the vote.
Based on vote_resp_msg_type, since the type we sent is MsgRequestVote, the type of the response message will be MsgRequestVoteResponse. (We'll skip describing the prevote algorithm in this article.)
So let's see when a node grants a vote and when it refuses. If you read through the code along with its comments, you'll notice that three conditions must be met for a node to grant a vote:
- can_vote is true (either this node already voted for the sender, or it doesn't know the leader_id for this term and hasn't voted yet)
- self.raft_log.is_up_to_date is true (the message's term value is greater than RaftLog.last_term, or, if the terms are equal, the message's index is greater than or equal to RaftLog.last_index)
- the message's index is greater than RaftLog.last_index, or the candidate's priority is greater than or equal to this node's priority
If all three conditions are met, the node sends a message granting the vote; if any of them is not met, it rejects the vote.
Now let's move on to the receiver of the MsgRequestVoteResponse
.// raft.rs pub fn step(&mut self, m: Message) -> Result<()> { ... match m.get_msg_type() { MessageType::MsgRequestVote => { // We can vote if this is a repeat of a vote we've already cast... let can_vote = (self.vote == m.from) || // ...we haven't voted and we don't think there's a leader yet in this term... (self.vote == INVALID_ID && self.leader_id == INVALID_ID) // ...and we believe the candidate is up to date. if can_vote && self.raft_log.is_up_to_date(m.index, m.log_term) && (m.index > self.raft_log.last_index() || self.priority <= get_priority(&m)) { self.log_vote_approve(&m); let mut to_send = new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None); to_send.reject = false; to_send.term = m.term; self.r.send(to_send, &mut self.msgs); if m.get_msg_type() == MessageType::MsgRequestVote { // Only record real votes. self.election_elapsed = 0; self.vote = m.from; } } else { self.log_vote_reject(&m); let mut to_send = new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None); to_send.reject = true; to_send.term = self.term; let (commit, commit_term) = self.raft_log.commit_info(); to_send.commit = commit; to_send.commit_term = commit_term; self.r.send(to_send, &mut self.msgs); self.maybe_commit_by_vote(&m); } } } } // raft.rs pub fn vote_resp_msg_type(t: MessageType) -> MessageType { match t { MessageType::MsgRequestVote => MessageType::MsgRequestVoteResponse, MessageType::MsgRequestPreVote => MessageType::MsgRequestPreVoteResponse, _ => panic!("Not a vote message: {:?}", t), } } // raft_log.rs pub fn is_up_to_date(&self, last_index: u64, term: u64) -> bool { term > self.last_term() || (term == self.last_term() && last_index >= self.last_index()) }
The MsgRequestVoteResponse message handler is very simple! It calls the poll function we saw earlier to update the votes hashmap and updates the StateRole
if the vote has been decided.// tikv/raft-rs/blob/master/src/raft.rs fn step_candidate(&mut self, m: Message) -> Result<()> { match m.get_msg_type() { ... MessageType::MsgRequestVoteResponse => { ... self.poll(m.from, m.get_msg_type(), !m.reject); self.maybe_commit_by_vote(&m); } } }
Summary
In this article, we looked at the code architecture based on the types used in raft-rs, and then followed and analyzed the code of a Raft implementation in three basic scenarios. We hope that this article has helped you expand your understanding of the Raft module. In the next installment, we'll take a deeper look at how the Raft implementation works with more scenarios.
Thanks 😊
This post is automatically translated from Korean
29 March 2024
Introducing Raftify: A high-level Raft framework created with a focus on scalability
By Gyubong Lee
Hello, I've been working on introducing Raft to the Backend.AI manager processes at Lablup since last year.
Here's a rough breakdown of the related tasks I'm working on.
- Introducing Raft to the Backend.AI manager process and making it a leader-follower structure.
- Replacing the existing distributed lock-based GlobalTimer with a Raft-based global timer, and ensuring that a specific task is performed exactly once in the cluster.
- Embedding a global, shareable state store in the manager process and synchronizing it appropriately.
In this post, I'll introduce the Raft framework I've been toiling away on for the past year to accomplish these tasks, go over some of the issues I ran into while developing it, and walk through raftify example code that implements a distributed key-value store in less than 300 lines of code in total.
Introducing raftify
raftify is a Raft framework developed with a focus on extensibility so that it can be easily integrated with any server application.
raftify was developed on top of tikv's raft-rs, one of the Raft implementations used in production, with LMDB as its stable storage and gRPC as its network layer.
Writing a binding for the Raft module
I judged that building and maintaining a reliable Raft implementation from the ground up would be a significant burden, so I decided to start by writing a Python binding for an existing Raft module.
So I initially thought I'd try writing a Python binding for hashicorp/raft, the most starred Raft implementation on GitHub, using gopy.
However, gopy didn't support binding goroutines, and it didn't support the latest Python version.
Then, on the advice of a senior developer at the company, I learned about a Rust implementation called tikv/raft-rs and about PyO3, which inspired me to try writing a Python binding for tikv/raft-rs using PyO3.
rraft-py
Thus, I decided to develop a Python binding for the Raft module named rraft-py, a name combining Rust, Raft, and Py.
My first concern in developing rraft-py was to keep the semantics of the Rust code and the Python code as close to a 1:1 match as possible. To achieve that, I had to work around some of the details of Rust's syntax.
My main concern at the time was how to expose Rust references to the Python side, which you can see in my PyCon KR presentation if you're interested.
The result is rraft-py, a port of over 10,000 lines of integration test code from raft-rs, which has become a fairly reliable implementation of Raft bindings that can be used directly in Python.
Currently, raftify is in the process of being completely rewritten in Rust, and rraft-py is no longer used, but it was a great experience to write my first PyO3 bindings and try out the APIs of a Raft implementation.
riteraft-py
After developing rraft-py, porting over 10,000 lines of integration tests from raft-rs, and even porting the multiple-mem-node example to Python code to get it working, my only thought was that I still didn't know where to start.
raft-rs really only provided the Raft implementation itself, and I had no idea how to integrate it into my application.
While browsing GitHub, I came across riteraft, a high-level Rust implementation based on tikv/raft-rs, in an issue titled "How to use this lib?", and it was much more intuitive to figure out how to use. So I decided to develop riteraft-py, with the goal of mimicking its behavior in Python and integrating it at the application level.
The job of riteraft is to wire this Raft implementation directly into your own log storage, state machine, and network layer. The problem was that, apart from being intuitive to use, it didn't work very well.
Leader election not happening when the leader died, data replication not happening in certain scenarios, panics when the commit count went over 255, and so on: all sorts of miscellaneous issues had to be resolved.
Even after resolving all of these issues and getting the cluster to look like it was working, the issues kept coming: it would seem to be working fine, but then certain failures would cause catastrophic issues, such as cluster inconsistency or log synchronization locking up.
Each time I encountered an issue, I needed to dig into the technical details of raft-rs and understand them, which ended up being a process of taking raft-rs's code apart and understanding it piece by piece.
raftify
While troubleshooting these issues, I decided to use a different abstraction from riteraft and implemented many changes, including a CLI module for debugging node and cluster state, which led me to rename the library to raftify.
When I first started developing the library, the goal was to make it compatible with any Python application, hence the name raftify, meaning to "raft-ify" an application.
I am no longer developing the Python implementation, but you can find it on its branch.
raftify written in Rust
Developed in Python on top of rraft-py, raftify ended up working well, but the crude test harness written in a multi-process structure was hard to test in CI, easily broke cluster consistency, and got out of control at the slightest hint of code complexity.
As a result, we decided to completely rewrite raftify's internal logic in Rust and expose only the high-level interface of the Raft package in Python.
Once completely rewritten in Rust, raftify was single-threaded, integration testable, and could be tested in CI, which helped eliminate the fear of making code changes.
raftify example code
In this section, we'll create a simple distributed key-value store using raftify.
For the full source code, see this link.
Define the state machine
The first thing we need to do is define the log entries and state machine that we will use in our key-value store.
For the sake of this article, we'll define just one log entry type: a simple Insert command that sets a value.
💡 Disclaimer: This article does not explain Rust syntax or the theoretical background of Raft.
#[derive(Clone, Debug, Serialize, Deserialize)] pub enum LogEntry { Insert { key: u64, value: String }, }
Let's define a state machine of type HashMap
as shown below.#[derive(Clone, Debug)] pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);
Then we need to define encode and decode
methods to indicate how we want to serialize and deserialize these data structures. You can use the bincode crate to define these as simply as below.impl AbstractLogEntry for LogEntry { fn encode(&self) -> Result<Vec<u8>> { serialize(self).map_err(|e| e.into()) } fn decode(bytes: &[u8]) -> Result<LogEntry> { let log_entry: LogEntry = deserialize(bytes)?; Ok(log_entry) } } impl AbstractStateMachine for HashStore { fn encode(&self) -> Result<Vec<u8>> { serialize(&self.0.read().unwrap().clone()).map_err(|e| e.into()) } fn decode(bytes: &[u8]) -> Result<Self> { let db: HashMap<u64, String> = deserialize(bytes)?; Ok(Self(Arc::new(RwLock::new(db)))) } }
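As a quick sanity check, a round trip through these methods might look like the fragment below (a usage sketch that assumes the LogEntry definition and impl above are in scope):

```rust
// Usage sketch: serialize a log entry with encode() and read it back with decode().
let entry = LogEntry::Insert { key: 1, value: "one".to_string() };
let bytes = entry.encode().unwrap();
let decoded = LogEntry::decode(&bytes).unwrap();
assert!(matches!(decoded, LogEntry::Insert { key: 1, .. }));
```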
Finally, we need to define three methods in the HashStore that will be used by raftify's internal code.
Define apply, a method that will be called when a new log entry is applied to the HashStore; snapshot, which will be called when saving the current state of the HashStore as a snapshot; and restore
, which will be called when restoring the state of the HashStore via a snapshot byte slice, as shown below.#[async_trait] impl AbstractStateMachine for HashStore { async fn apply(&mut self, data: Vec<u8>) -> Result<Vec<u8>> { let log_entry: LogEntry = LogEntry::decode(&data)?; match log_entry { LogEntry::Insert { ref key, ref value } => { let mut db = self.0.write().unwrap(); log::info!("Inserted: ({}, {})", key, value); db.insert(*key, value.clone()); } }; Ok(data) } async fn snapshot(&self) -> Result<Vec<u8>> { Ok(serialize(&self.0.read().unwrap().clone())?) } async fn restore(&mut self, snapshot: Vec<u8>) -> Result<()> { let new: HashMap<u64, String> = deserialize(&snapshot[..]).unwrap(); let mut db = self.0.write().unwrap(); let _ = std::mem::replace(&mut *db, new); Ok(()) } }
Define the web server API
Let's define the web server API that will be used in our example. We will use this API to access the Raft object on the node and manipulate the HashStore.
For our example, we'll use the actix-web crate and define it as shown below.
The put command can be implemented by calling the propose method on the RaftNode of the Raft object. We can do this by encoding the Insert LogEntry we defined earlier and passing it as an argument to the RaftNode::propose method.
The get command can be implemented by returning the value corresponding to the id from the HashMap stored in memory.
#[get("/put/{id}/{value}")] async fn put(data: web::Data<(HashStore, Raft)>, path: web::Path<(u64, String)>) -> impl Responder { let log_entry = LogEntry::Insert { key: path.0, value: path.1.clone(), }; data.1.raft_node.propose(log_entry.encode().unwrap()).await; "OK".to_string() } #[get("/get/{id}")] async fn get(data: web::Data<(HashStore, Raft)>, path: web::Path<u64>) -> impl Responder { let id = path.into_inner(); let response = data.0.get(id); format!("{:?}", response) } let web_server = tokio::spawn( HttpServer::new(move || { App::new() .app_data(web::Data::new((store.clone(), raft.clone()))) .service(put) .service(get) }) .bind(addr) .unwrap() .run(), );
Bootstrap a Raft cluster
Next, let's bootstrap a cluster of RaftNodes.
If the --peer-addr argument is given, the node sends a join request to the cluster via that address to obtain a new node_id; if the argument is not given, it bootstraps a new cluster.
Leader
In this example, for intuitive understanding, we'll fix the node_id of the leader node to 1. This means that when we call Raft::bootstrap_cluster, we create the leader's Raft object by passing 1 as the node_id. After that, we can call the Raft::run method and the RaftNode will run.
Followers
As shown below, after calling Raft::request_id to receive a ClusterJoinTicket that can be used to join the cluster, you call Raft::new_follower with the specified node_id to create a follower Raft node object, call the Raft::run method to have the RaftNode run, and join the cluster via the Raft::join
method.let (raft, raft_handle) = match peer_addr { Some(peer_addr) => { log::info!("Running in Follower mode"); let ticket = Raft::request_id(raft_addr, peer_addr.clone(), logger.clone()).await.unwrap(); let node_id = ticket.reserved_id; let raft = Raft::new_follower( node_id, raft_addr, store.clone(), cfg, None, logger.clone(), )?; let handle = tokio::spawn(raft.clone().run()); raft.join(ticket).await; (raft, handle) } None => { log::info!("Bootstrap a Raft Cluster"); let node_id = 1; let raft = Raft::bootstrap_cluster( node_id, raft_addr, store.clone(), cfg, None, logger.clone(), )?; let handle = tokio::spawn(raft.clone().run()); (raft, handle) } }; let _ = tokio::try_join!(raft_handle)?;
You can now bootstrap a Raft cluster of three nodes in the terminal as shown below.
$ ./target/debug/memstore --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 $ ./target/debug/memstore --raft-addr=127.0.0.1:60062 --peer-addr=127.0.0.1:60061 --web-server=127.0.0.1:8002 $ ./target/debug/memstore --raft-addr=127.0.0.1:60063 --peer-addr=127.0.0.1:60061 --web-server=127.0.0.1:8003
Test
We can now try out the key-value store we defined through the actix-web server API via the curl
command.❯ curl http://localhost:8001/put/1/test OK ❯ curl http://localhost:8001/get/1 Some("test")
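As an extra check (not part of the original example output), you could also query one of the other nodes' web servers; once the entry has been committed and applied, the value should be readable from any member of the cluster:

```
❯ curl http://localhost:8002/get/1
Some("test")
```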
If you're interested in learning more, you can check out the raftify repository for instructions on how to use the CLI module to help with debugging, example code for RaftServiceClient, and more.
Summary
raftify is an experimental framework that aims to make it easier for anyone to integrate a Raft module, which is otherwise hard for the average developer to approach.
It was developed to introduce a leader-follower structure to the Backend.AI manager processes, but as shown in this post, it can be used in a variety of places where you need an HA structure, such as building your own simple distributed key-value store with only a small amount of source code.
If you're intrigued by the inner workings of the tikv/raft-rs implementation, stay tuned for my next post, where I'll analyze what happens inside the source code line by line across a few scenarios.
This post is automatically translated from Korean
26 January 2024