Engineering

May 29, 2024

Deconstructing a working Raft implementation - 2

  • Gyubong Lee

    Software Engineer

In the previous post, we provided an overview centered around the types in raft-rs, and explored how leader elections occur during network failures, how log inconsistencies are resolved, and how a consistent state is maintained after overcoming failures, based on three scenarios.

In this article, we will continue from the last post and examine the operation of the Raft implementation across several scenarios.

The scenarios we will look into cover how the state of a Raft cluster is saved to stable storage, and how a rebooted cluster recovers its previous state from logs and snapshots.

💡 Raftify is a high-level Raft implementation developed by Lablup. If you're curious about Raftify, check out this post.

Exploring the raft-rs Architecture Centered Around Types

In this article, before we proceed with the scenario analysis, let's first look at some of the types in raft-rs that will be featured in this discussion.

ConfState

The cluster is composed of multiple nodes, and each node is classified as either a voter or a learner depending on whether it participates in voting during an election triggered by a failure. Both voters and learners are members of the cluster and share the consensus, but learners do not vote.

Information about these cluster members is also included in the consensus among the members, and thus can be established or modified by applying log entries.

💡 In raft-rs, the EntryType is divided into EntryConfChange for such ConfState configuration changes and EntryNormal for general state changes.

Among the types used in raft-rs, those used in the network layer are defined in the eraftpb.proto file and are compiled into Rust code by tonic.

message ConfState {
    repeated uint64 voters = 1;
    repeated uint64 learners = 2;

    // The voters in the outgoing config. If not empty the node is in joint consensus.
    repeated uint64 voters_outgoing = 3;
    // The nodes that will become learners when the outgoing config is removed.
    // These nodes are necessarily currently in nodes_joint (or they would have
    // been added to the incoming config right away).
    repeated uint64 learners_next = 4;
    // If set, the config is joint and Raft will automatically transition into
    // the final config (i.e. remove the outgoing config) when this is safe.
    bool auto_leave = 5;
}

voters_outgoing, learners_next, auto_leave are fields supporting joint consensus. In this article, we will omit the explanation of joint consensus.
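
To make the voter/learner distinction concrete, here is a minimal sketch of a simplified ConfState. It is not the generated raft-rs type: the joint-consensus fields are left out, and the Role enum and the role_of() and quorum() helpers are hypothetical names introduced only for this illustration.

```rust
/// Simplified stand-in for the `ConfState` message (joint-consensus fields omitted).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ConfState {
    pub voters: Vec<u64>,
    pub learners: Vec<u64>,
}

/// Hypothetical role enum used only in this sketch.
#[derive(Debug, PartialEq)]
pub enum Role {
    Voter,
    Learner,
    NotMember,
}

impl ConfState {
    /// Classify a node id according to the membership lists.
    pub fn role_of(&self, id: u64) -> Role {
        if self.voters.contains(&id) {
            Role::Voter
        } else if self.learners.contains(&id) {
            Role::Learner
        } else {
            Role::NotMember
        }
    }

    /// Votes needed to win an election: a majority of the voters.
    /// Learners are members but are not counted.
    pub fn quorum(&self) -> usize {
        self.voters.len() / 2 + 1
    }
}
```

With voters [1, 2, 3] and learners [4], node 4 is a member of the cluster, yet the quorum stays at 2 because only the three voters are counted.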

Snapshot, SnapshotMetadata

To keep the system available, logs cannot be allowed to accumulate indefinitely; entries must be deleted once they have been applied to the state machine.

The process of removing logs up to a specific index in the log sequence is referred to as log compaction, and the recorded state up to that index after applying the log entries is called a snapshot.

Snapshots, which are the main focus of this post, will be examined in detail in the scenario analysis below. They are used to transfer the state of the cluster to newly joined nodes or for recovery from failures.
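
As a rough illustration of log compaction, the sketch below models the log as an in-memory vector. ToyLog, Entry, and their methods are hypothetical names, not raft-rs types. One assumption to note: this sketch keeps the entry at the compaction index itself and drops only the entries before it; whether the boundary entry is kept is an implementation detail of the storage.

```rust
/// A single log entry in the toy log.
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub index: u64,
    pub term: u64,
    pub data: String,
}

/// Hypothetical in-memory log used only for this illustration.
#[derive(Debug, Default)]
pub struct ToyLog {
    entries: Vec<Entry>,
}

impl ToyLog {
    /// Append an entry and return the index it was assigned.
    pub fn append(&mut self, term: u64, data: &str) -> u64 {
        let index = self.last_index() + 1;
        self.entries.push(Entry { index, term, data: data.to_string() });
        index
    }

    /// Index of the oldest entry still stored (0 when the log is empty).
    pub fn first_index(&self) -> u64 {
        self.entries.first().map_or(0, |e| e.index)
    }

    /// Index of the newest entry (0 when the log is empty).
    pub fn last_index(&self) -> u64 {
        self.entries.last().map_or(0, |e| e.index)
    }

    /// Number of entries still stored.
    pub fn entry_count(&self) -> usize {
        self.entries.len()
    }

    /// Drop every entry before `index`. The index is clamped to the last
    /// entry so that the boundary entry always survives (sketch assumption).
    pub fn compact(&mut self, index: u64) {
        let index = index.min(self.last_index());
        self.entries.retain(|e| e.index >= index);
    }
}
```

After appending eight entries and calling compact(8), only the entry at index 8 remains, and the next append is still assigned index 9. The state produced by the dropped entries has to live somewhere else, which is the role of the snapshot.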

message Snapshot {
    bytes data = 1;
    SnapshotMetadata metadata = 2;
}

message SnapshotMetadata {
    // The current `ConfState`.
    ConfState conf_state = 1;
    // The applied index.
    uint64 index = 2;
    // The term of the applied index.
    uint64 term = 3;
}

SnapshotMetadata represents the metadata of a snapshot at the time it was created.

Specifically, each field signifies the following:

  • conf_state: Indicates the cluster membership information at the time the snapshot was created.
  • index: Represents the index of the last log entry that was compacted when the snapshot was created.
  • term: Indicates the term value of the last log entry at the time the snapshot was created.

Such metadata is essential for maintaining log consistency when utilizing snapshots.

For instance, when restoring state information from a snapshot, if the term of the log entry at the snapshot's index does not match the term in the snapshot metadata, the snapshot application request must be ignored to maintain consistency.

Scenario Analysis

1 - Snapshot Recording

In Raftify, snapshot creation is initiated by calling the make_snapshot() method of a RaftNode, passing the specific index and the term value of the log entry at that index as arguments.

The data to be stored in the snapshot comes from the return of the self.fsm.snapshot() method, which represents the current state of the state machine.

💡 The self.fsm.snapshot() method can be implemented differently depending on how the Finite State Machine (FSM) is to be stored, and it is one of the implementations that Raftify users must provide. For example, in the HashStore example, where the FSM is stored in memory, snapshot() simply serializes and returns a HashMap.

The index of the last log entry applied to the state machine, last_applied, is then passed to compact(), which deletes the log entries up to that index.

// lablup/raftify/blob/main/src/raft_node/mod.rs
pub async fn make_snapshot(&mut self, index: u64, term: u64) -> Result<()> {
    ...
    let snapshot_data = self.fsm.snapshot().await?;

    let last_applied = self.raw_node.raft.raft_log.applied;
    let store = self.raw_node.mut_store();
    store.compact(last_applied)?;
    store.create_snapshot(snapshot_data, index, term)?;
    Ok(())
}

create_snapshot() records the snapshot metadata together with the snapshot data (data) received as an argument.

// lablup/raftify/blob/main/src/heed_storage/mod.rs
fn create_snapshot(&mut self, data: Vec<u8>, index: u64, term: u64) -> Result<()> {
    let store = self.wl();
    let mut writer = store.env.write_txn()?;
    let conf_state = store.conf_state(&writer)?;

    let mut snapshot = Snapshot::default();
    snapshot.set_data(data);

    let meta = snapshot.mut_metadata();
    meta.set_conf_state(conf_state);
    meta.index = index;
    meta.term = term;

    store.set_snapshot(&mut writer, &snapshot)?;
    writer.commit()?;
    Ok(())
}
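
To give a feel for what a snapshot()/restore() pair can look like for an in-memory store, here is a minimal sketch using only the standard library. ToyStore is a hypothetical type, not Raftify's HashStore, and the "key=value" line format is an assumption chosen to keep the example self-contained; a real implementation would more likely use a serialization library.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory state machine: a map from u64 keys to strings.
#[derive(Debug, Default, PartialEq)]
pub struct ToyStore {
    pub map: HashMap<u64, String>,
}

impl ToyStore {
    /// What applying an "insert" log entry would do to the state.
    pub fn apply_insert(&mut self, key: u64, value: &str) {
        self.map.insert(key, value.to_string());
    }

    /// Serialize the whole state as "key=value" lines, sorted by key so the
    /// output is deterministic. Values containing newlines are not supported.
    pub fn snapshot(&self) -> Vec<u8> {
        let mut pairs: Vec<(&u64, &String)> = self.map.iter().collect();
        pairs.sort();
        let mut out = String::new();
        for (k, v) in pairs {
            out.push_str(&format!("{}={}\n", k, v));
        }
        out.into_bytes()
    }

    /// Rebuild the state from bytes produced by `snapshot()`.
    pub fn restore(data: &[u8]) -> Result<Self, String> {
        let text = std::str::from_utf8(data).map_err(|e| e.to_string())?;
        let mut map = HashMap::new();
        for line in text.lines() {
            let (k, v) = line
                .split_once('=')
                .ok_or_else(|| format!("malformed line: {}", line))?;
            let key = k.parse::<u64>().map_err(|e| e.to_string())?;
            map.insert(key, v.to_string());
        }
        Ok(ToyStore { map })
    }
}
```

The property that matters is the round trip: restoring from the bytes returned by snapshot() must yield exactly the state that produced them, because that is what a follower relies on when it receives the snapshot instead of the individual log entries.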

2 - Transmitting Snapshots to Newly Joined Nodes

Scenario

When a new node joins the cluster, it must receive the state of the existing cluster to maintain consistency.

However, replicating every log entry individually each time a new node joins is inefficient. Since all nodes share the same state machine, transmitting just the snapshot—the result of applied log entries—instead of every log entry can solve this issue. The type of message used to transmit snapshot data is MsgSnapshot.

In this section, let's assume that Node 1 is the leader and Node 2 is the newly joined node. We will focus on the code and logs related to the MsgSnapshot message to understand what happens during this process.

In Raftify, a newly joined follower does not send a separate snapshot request to the leader node.

When a configuration change request (hereafter ConfChange) is committed, the leader tries to send this log entry to the newly joined node, which will reject this MsgAppend message because it does not have the required log entries.

Do you remember the scenario from a previous part where MsgAppend messages were rejected due to a network failure, causing inconsistencies between nodes?

In that scenario, inconsistencies were resolved by synchronizing mismatched log entries one by one using prepare_send_entries(). The difference when resolving log inconsistencies with a newly joined node is that instead of synchronizing log entries one by one, synchronization is done through a snapshot (prepare_send_snapshot()).

Now, let’s delve into the code and log analysis to understand in detail how this scenario unfolds.

Code Analysis

Let's start by examining the part of the code where the MsgAppend message sent by the leader to the newly joined node is rejected.

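The maybe_send_append() function is shown below.

Because the progress for the newly joined node is empty, the call to self.raft_log.term() fails and the fallback branch of the match runs, which calls prepare_send_snapshot() so that a MsgSnapshot is prepared in place of the MsgAppend. maybe_send_append() returns false without sending anything only when prepare_send_snapshot() itself returns false, for example because the snapshot is temporarily unavailable.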

// tikv/raft-rs/blob/master/src/raft.rs
fn maybe_send_append(
    &mut self,
    to: u64,
    pr: &mut Progress,
    allow_empty: bool,
    msgs: &mut Vec<Message>,
) -> bool {
    ...
        let term = self.raft_log.term(pr.next_idx - 1);
        match (term, ents) {
            (Ok(term), Ok(mut ents)) => {
                if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                    return true;
                }
                self.prepare_send_entries(&mut m, pr, term, ents)
            }
            (_, Err(Error::Store(StorageError::LogTemporarilyUnavailable))) => {
                // wait for storage to fetch entries asynchronously
                return false;
            }
            _ => {
                // 💡 In this scenario, the following branch is executed.
                // send snapshot if we failed to get term or entries.
                if !self.prepare_send_snapshot(&mut m, pr, to) {
                    return false;
                }
            }
        }
    }
    self.send(m, msgs);
    true
}
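
The three-way decision in the match above can be isolated into a small, testable sketch. FetchError, Action, and choose_action() are hypothetical names made up for this illustration; the real function works on raft-rs's own error and message types and has side effects that are omitted here.

```rust
/// Hypothetical stand-in for the storage errors that matter in this decision.
#[derive(Debug, PartialEq)]
pub enum FetchError {
    /// The storage is still fetching the entries asynchronously.
    TemporarilyUnavailable,
    /// The requested index has already been compacted away.
    Compacted,
}

/// What the leader decides to do for one follower.
#[derive(Debug, PartialEq)]
pub enum Action {
    SendEntries { prev_term: u64, count: usize },
    Wait,
    SendSnapshot,
}

/// Mirrors the shape of the `match (term, ents)` expression:
/// both lookups succeed -> replicate entries,
/// entries temporarily unavailable -> wait,
/// anything else -> fall back to a snapshot.
pub fn choose_action(
    term: Result<u64, FetchError>,
    ents: Result<Vec<u64>, FetchError>,
) -> Action {
    match (term, ents) {
        (Ok(prev_term), Ok(ents)) => Action::SendEntries {
            prev_term,
            count: ents.len(),
        },
        (_, Err(FetchError::TemporarilyUnavailable)) => Action::Wait,
        _ => Action::SendSnapshot,
    }
}
```

In the scenario of this section the term lookup is the part that fails, so choose_action(Err(FetchError::Compacted), Ok(vec![9])) yields Action::SendSnapshot even though the entry itself could be read.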

The prepare_send_snapshot() called in this scenario is a function that retrieves snapshot data by invoking the self.raft_log.snapshot() method and sets this data in the message to be sent.

Afterwards, it marks the progress object of the node as being in a snapshot state before returning.

💡 The node's state being marked as snapshot indicates that the node is in the process of snapshot replication, which means that log replication to this node will be temporarily suspended.

// tikv/raft-rs/blob/master/src/raft.rs
fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
    ...
    m.set_msg_type(MessageType::MsgSnapshot);
    let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot, to);
    if let Err(ref e) = snapshot_r {
        if *e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
            self.logger.debug(
                format!(
                    "failed to send snapshot to {} because snapshot is temporarily unavailable",
                    to
                )
                .as_str(),
            );
            return false;
        }
        self.logger
            .fatal(format!("unexpected error: {:?}", e).as_str());
    }
    let snapshot = snapshot_r.unwrap();
    if snapshot.get_metadata().index == 0 {
        self.logger.fatal("need non-empty snapshot");
    }
    let (sindex, sterm) = (snapshot.get_metadata().index, snapshot.get_metadata().term);
    m.set_snapshot(snapshot);
    self.logger.debug(format!(
        "[firstindex: {first_index}, commit: {committed}] sent snapshot[index: {snapshot_index}, term: {snapshot_term}] to {to}; progress: {progress}",
        first_index = self.raft_log.first_index(),
        committed = self.raft_log.committed,
        snapshot_index = sindex,
        snapshot_term = sterm,
        to = to,
        progress = format!("{:?}", pr)
    ).as_str());
    pr.become_snapshot(sindex);
    self.logger.debug(
        format!(
            "paused sending replication messages to {}; progress: {:?}",
            to, pr
        )
        .as_str(),
    );
    true
}
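
The effect of pr.become_snapshot(sindex) on replication can be sketched with a stripped-down progress tracker. ToyProgress, can_send_entries(), and on_ack() are hypothetical names; the resume rule in on_ack() is a simplification we assume for illustration and should not be read as the exact raft-rs logic.

```rust
/// Replication state the leader keeps per follower.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ProgressState {
    Probe,
    Replicate,
    Snapshot,
}

/// Hypothetical, stripped-down progress tracker for one follower.
#[derive(Debug)]
pub struct ToyProgress {
    pub matched: u64,
    pub next_idx: u64,
    pub state: ProgressState,
    pub pending_snapshot: u64,
}

impl ToyProgress {
    /// A follower the leader knows nothing about yet.
    pub fn new() -> Self {
        ToyProgress {
            matched: 0,
            next_idx: 1,
            state: ProgressState::Probe,
            pending_snapshot: 0,
        }
    }

    /// Mark the follower as receiving the snapshot taken at `snapshot_idx`.
    pub fn become_snapshot(&mut self, snapshot_idx: u64) {
        self.state = ProgressState::Snapshot;
        self.pending_snapshot = snapshot_idx;
    }

    /// Log replication is suspended while a snapshot is in flight.
    pub fn can_send_entries(&self) -> bool {
        self.state != ProgressState::Snapshot
    }

    /// Sketch assumption: once the follower acknowledges an index that covers
    /// the pending snapshot, the leader goes back to probing.
    pub fn on_ack(&mut self, acked: u64) {
        if acked > self.matched {
            self.matched = acked;
            self.next_idx = acked + 1;
        }
        if self.state == ProgressState::Snapshot && self.matched >= self.pending_snapshot {
            self.state = ProgressState::Probe;
            self.pending_snapshot = 0;
        }
    }
}
```

Calling become_snapshot(9) on a fresh ToyProgress leaves matched at 0 and next_idx at 1 while switching the state to Snapshot with pending_snapshot 9; an acknowledgement for index 9 then moves next_idx to 10 and allows entries to be sent again.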

This is why Raftify prepares the snapshot in advance: when a ConfChange is committed, it calls RaftNode.make_snapshot(), as we explored in scenario 1, so that a snapshot is ready to be sent to the new node.

The transmitted snapshot is detected and recovered by the Snapshot handling logic in the newly joined node's Raft loop. Through the logic below, the state machine is restored using the received snapshot data via self.fsm.restore(), and it is also applied to Stable storage via store.apply_snapshot().

// lablup/raftify/blob/main/raftify/src/raft_node/mod.rs
async fn on_ready(&mut self) -> Result<()> {
    ...
    if *ready.snapshot() != Snapshot::default() {
        self.logger
            .info("Restoring state machine and snapshot metadata...");
        let snapshot = ready.snapshot();
        if !snapshot.get_data().is_empty() {
            self.fsm.restore(snapshot.get_data().to_vec()).await?;
        }
        let store = self.raw_node.mut_store();
        store.apply_snapshot(snapshot.clone())?;
    }
    ...
}
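
The two guards in on_ready() are easy to miss, so here is a synchronous sketch of the same control flow. ToySnapshot and ToyFollower are hypothetical types; the real code is asynchronous and writes to the node's stable storage rather than to a struct field.

```rust
/// Hypothetical snapshot: payload plus the metadata that matters here.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ToySnapshot {
    pub data: Vec<u8>,
    pub index: u64,
    pub term: u64,
}

/// Hypothetical follower holding a state machine and a stored snapshot.
#[derive(Debug, Default)]
pub struct ToyFollower {
    pub fsm_state: Vec<u8>,
    pub stored_snapshot: ToySnapshot,
}

impl ToyFollower {
    /// Returns true when a snapshot was actually handled.
    pub fn on_ready(&mut self, snapshot: &ToySnapshot) -> bool {
        // Guard 1: a default (empty) snapshot means there is nothing to do.
        if *snapshot == ToySnapshot::default() {
            return false;
        }
        // Guard 2: only touch the state machine when there is payload data.
        if !snapshot.data.is_empty() {
            self.fsm_state = snapshot.data.clone();
        }
        // The snapshot, including its metadata, is persisted either way.
        self.stored_snapshot = snapshot.clone();
        true
    }
}
```

A snapshot that carries metadata but no data therefore updates the stored snapshot while leaving the state machine untouched.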

Leader Node Log Analysis

Now, let's analyze the logs generated on the leader node sequentially when a new node joins:

  1. Node 1 receives a join request from Node 2, and the cluster configuration is modified.
Apr 11 06:51:14.189 INFO Node 2 (127.0.0.1:60062) joined the cluster as voter.
Apr 11 06:51:14.189 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
Apr 11 06:51:14.189 DEBG Entries [9, 10) requested.
  2. Because a new log entry has been added to the leader, it sends a MsgAppend message to Node 2 to replicate this log entry.
Apr 11 06:51:14.189 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 8, entries: [Entry { context: 7, data: ConfChangeV2 { transition: 0, changes: [ConfChangeSingle { change_type: AddNode, node_id: 2 }], context: [127.0.0.1:60062] }, entry_type: EntryConfChangeV2, index: 9, sync_log: false, term: 1 }], commit: 9, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
  3. However, since the newly joined node does not possess the information of the existing cluster, this MsgAppend message is rejected, and Node 1 receives a message indicating that the request has been rejected as follows.
Apr 11 06:51:14.298 DEBG >>> Node 1 received Raft message from the node 2, Message { msg_type: MsgAppendResponse, to: 1, from: 2, term: 1, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
Apr 11 06:51:14.298 DEBG received msgAppend rejection; reject_hint_index: 0, reject_hint_term: 0, from: 2, index: 8
Apr 11 06:51:14.298 DEBG decreased progress of 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
  4. As described earlier, since the progress of the newly joined node is empty, the snapshot is stored in Stable storage, and log entries up to the specified index are removed. In this case, log entries up to index 8 are removed, and the log entry index corresponding to Node 2's join request is 9. Therefore, a snapshot message is sent along with a log indicating that the first_index is 8 and the commit is 9.
Apr 11 06:51:14.298 DEBG [firstindex: 8, commit: 9] sent snapshot[index: 9, term: 1] to 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
  5. Replication of log entries is paused to transmit the snapshot.
Apr 11 06:51:14.299 DEBG paused sending replication messages to 2; progress: Progress { matched: 0, next_idx: 1, state: Snapshot, paused: false, pending_snapshot: 9, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
  6. A MsgSnapshot type message is sent to transmit the snapshot. The snapshot contains the data {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"} that was previously arbitrarily inserted.
Apr 11 06:51:14.299 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgSnapshot, to: 2, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
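
The "decreased progress" line in step 3 shows next_idx dropping straight to 1 after a rejection at index 8 with a reject hint of 0. The sketch below reproduces that arithmetic. decreased_next_idx() is a hypothetical helper modeled on our understanding of how a leader treats a follower in the Probe state; it is an assumption for illustration, not a copy of the raft-rs code.

```rust
/// Compute the new `next_idx` for a probing follower after it rejected an
/// append whose previous-entry index was `rejected`.
///
/// Returns `None` for a stale rejection, i.e. one that does not refer to the
/// entry right before the current `next_idx`.
pub fn decreased_next_idx(next_idx: u64, rejected: u64, reject_hint: u64) -> Option<u64> {
    if next_idx == 0 || next_idx - 1 != rejected {
        return None;
    }
    // Jump back to just after the follower's hint, but never past the
    // rejected index and never below 1.
    Some(rejected.min(reject_hint + 1).max(1))
}
```

With the values from the logs, the leader had sent the entry at index 9 on top of index 8, so next_idx was 9; the rejection at index 8 with hint 0 gives decreased_next_idx(9, 8, 0) == Some(1), matching next_idx: 1 in the log.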

Follower Node Log Analysis

Analyzing the logs generated on the newly joined follower node, we find:

  1. Becomes a new follower node at term 1.
Apr 15 06:37:27.421 INFO became follower at term 1
  2. Rejects the MsgAppend message received from the leader node.
Apr 15 06:37:27.421 DEBG rejected msgApp [logterm: 1, index: 8] from 1; index: 8, logterm: Ok(0)
Apr 15 06:37:27.421 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
  3. To avoid being detected as failed, which would trigger an unnecessary election, the node must still respond normally to MsgHeartbeat messages.
Apr 15 06:37:27.423 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgHeartbeat, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
Apr 15 06:37:27.423 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeatResponse, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
  4. Receives the snapshot through a MsgSnapshot message.
Apr 15 06:37:27.424 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgSnapshot, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {3: "A", 5: "A", 2: "A", 4: "A", 1: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
Apr 15 06:37:27.424 INFO log [committed=0, persisted=0, applied=0, unstable.offset=1, unstable.entries.len()=0] starts to restore snapshot [index: 9, term: 1]
Apr 15 06:37:27.424 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
  5. Restores the state using the received snapshot.
Apr 15 06:37:27.424 INFO restored snapshot; commit: 9, last_index: 9, last_term: 1, snapshot_index: 9, snapshot_term: 1
Apr 15 06:37:27.424 INFO [commit: 9, term: 1] restored snapshot [index: 9, term: 1]
Apr 15 06:37:27.425 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 9, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
Apr 15 06:37:27.425 INFO Restoring state machine and snapshot metadata...
Apr 15 06:37:27.425 DEBG snapshot's persisted index 9

3 - Recovery in Case of Failure in the Majority of Nodes

When a specific node fails, it is not a problem because the node can simply replicate new log entries from the leader node after the network is restored. Even if a node needs to join anew, it can recover its state through snapshots as discussed in scenario 2.

However, if failures occur in a majority of nodes, the cluster cannot recover on its own.

In this case, the administrator must intervene manually to decide which node's log sequence to consider as the correct state, and then re-bootstrap the cluster from that log sequence.

Depending on the administrator's judgment, recovery can be done by directly applying all log entries one by one to the state machine or by restoring the state from the last created snapshot.

State Recovery from WAL Snapshots

In this section, we will use the Raftify example code.

To recreate the example, we will first insert a few key-value pairs into Node 1, then call the make_snapshot() method via the /snapshot API to create a snapshot. We will then assume that the node has failed and shut it down.

To recover from a WAL snapshot, pass the node_id of the node to be recovered to the restore_wal_snapshot_from option. Here, we will recover from Node 1's snapshot, so we will use 1.

To verify whether the log entries have been applied, we will log "Inserted: (key, value)" each time apply() is called.

💡 apply() is also an abstract method of the StateMachine that Raftify users need to define, similar to restore(). It is called when log entries are committed.

After taking the snapshot and shutting down Node 1, we can use the CLI commands provided by Raftify to dump the storage, as shown below.

From the dump below, we can see that the snapshot is stored in the storage and contains the data {2: "A", 5: "A", 3: "A", 4: "A", 1: "A"}.

❯ raftify-cli debug persisted-all ./logs
*----- node-1 -----*
---- Persisted entries ----
Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"

---- Metadata ----
HardState { term: 1, vote: 1, commit: 8 }
ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
"Snapshot { data: HashStore(RwLock { data: {2: \"A\", 5: \"A\", 3: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }"
Last index: 8

Then, let's re-bootstrap Node 1 using the command ./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-snapshot-from=1.

The logs generated on Node 1 will be as follows. Since the state is restored directly from the snapshot, apply() has not been executed for any of the log entries.

Apr 15 07:54:44.703 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }, 2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: None, restore_wal_snapshot_from: Some(1), }
Apr 15 07:54:44.705 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
Apr 15 07:54:44.705 DEBG reset election timeout 0 -> 10 at 0
Apr 15 07:54:44.705 INFO became follower at term 3
Apr 15 07:54:44.705 INFO newRaft; term: 3, commit: 0, applied: 0, last index: 0, last term: 0, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }
Apr 15 07:54:44.705 INFO RawNode created with id 1.
Apr 15 07:54:44.748 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"...

And then let's dump the storage again.

Since the recovery is from its own snapshot, we can confirm that no state changes have occurred.

*----- node-1 -----*
---- Persisted entries ----
Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"

---- Metadata ----
HardState { term: 1, vote: 1, commit: 8 }
ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
"Snapshot { data: HashStore(RwLock { data: {3: \"A\", 2: \"A\", 5: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }"
Last index: 8

State Recovery from WAL Logs

This time, let's recover the state from a specific log sequence.

In this case, as shown in the logs below, the storage has an empty snapshot but contains log entries necessary for state recovery.

*----- node-1 -----*
---- Persisted entries ----
Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }"
Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }"
Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }"
Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }"
Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }"
Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }"
Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }"
Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"

---- Metadata ----
HardState { term: 2, vote: 1, commit: 8 }
ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
"Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }"
Last index: 8

Let's assume a failure and shut down Node 1, then see what happens when we re-bootstrap it.

After shutting down Node 1, re-bootstrap it using the command ./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-from=1.

The following logs will be printed on Node 1, showing that the previously stored log entries are replayed through apply() in a single pass at startup, restoring the previous state.

Apr 15 07:46:50.710 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }, 1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: Some(1), restore_wal_snapshot_from: None, }
Apr 15 07:46:50.712 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
Apr 15 07:46:50.712 DEBG reset election timeout 0 -> 10 at 0
Apr 15 07:46:50.712 INFO became follower at term 1
Apr 15 07:46:50.712 INFO newRaft; term: 1, commit: 8, applied: 0, last index: 8, last term: 1, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }
Apr 15 07:46:50.712 INFO RawNode created with id 1.
Apr 15 07:46:50.753 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"...
Apr 15 07:46:50.855 DEBG Entries [1, 9) requested.
// Applies log entries one by one to restore the state machine state.
Inserted: (1, A)
Inserted: (1, A)
Inserted: (1, A)
Inserted: (2, A)
Inserted: (3, A)
Inserted: (4, A)
Inserted: (5, A)

Similarly, since the state is recovered to its pre-crash state, dumping the storage will show that it is the same as before. The difference is that, compared to the previous recovery via snapshot, all log entries were applied one by one this time.

*----- node-1 -----*
---- Persisted entries ----
Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }"
Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }"
Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }"
Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }"
Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }"
Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }"
Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }"
Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"

---- Metadata ----
HardState { term: 2, vote: 1, commit: 8 }
ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
"Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }"
Last index: 8
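
The two recovery paths in this scenario should converge on the same state. The sketch below checks that with the data from the dumps: replaying the seven Insert entries (indexes 2 to 8) versus loading the five key-value pairs held in the snapshot. replay() and restore_from_snapshot() are hypothetical helpers, and a BTreeMap stands in for the state machine so the comparison is straightforward.

```rust
use std::collections::BTreeMap;

/// Apply every (key, value) insert in order, as `apply()` would, and report
/// how many entries were applied.
pub fn replay(entries: &[(u64, &str)]) -> (BTreeMap<u64, String>, usize) {
    let mut state = BTreeMap::new();
    let mut applied = 0;
    for (key, value) in entries {
        state.insert(*key, value.to_string());
        applied += 1;
    }
    (state, applied)
}

/// Load the state directly from the pairs stored in a snapshot; no log
/// entries are applied on this path.
pub fn restore_from_snapshot(snapshot: &[(u64, &str)]) -> BTreeMap<u64, String> {
    snapshot.iter().map(|(k, v)| (*k, v.to_string())).collect()
}
```

Replaying [(1, "A"), (1, "A"), (1, "A"), (2, "A"), (3, "A"), (4, "A"), (5, "A")] applies seven entries and ends with five keys, the same map that restore_from_snapshot() builds from the snapshot's five pairs. The difference lies only in the work done: seven apply() calls on one path, none on the other.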

Conclusion

In this article, following up from the previous one, we explored the issues of resolving log inconsistencies and recovery scenarios with snapshots, especially when there is a newly joined node.

Raftify is participating as an interactive project in the 2024 Open Source Contribution Academy and is recruiting mentees interested in implementing distributed systems! (Recruitment period: ~ 06.23)

Participants will have the opportunity to experience everything from learning the basic concepts of distributed systems to the actual implementation process, alongside mentors.

We look forward to your interest! Thank you 😊
