Tag : distributed-system

  • Deconstructing a working Raft implementation - 2

    By Gyubong Lee

    In the previous post, we provided an overview centered around the types in raft-rs, and explored how leader elections occur during network failures, how log inconsistencies are resolved, and how a consistent state is maintained after overcoming failures, based on three scenarios.

    In this article, we will continue from the last post and examine the operation of the Raft implementation across several scenarios.

    The scenarios we will look into cover how the state of a Raft cluster is saved to stable storage, and how the cluster is rebooted and recovers its previous state from logs and snapshots.

    💡 Raftify is a high-level Raft implementation developed by Lablup. If you're curious about Raftify, check out this post.

    Exploring the raft-rs Architecture Centered Around Types

    In this article, before we proceed with the scenario analysis, let's first look at some of the types in raft-rs that will be featured in this discussion.

    ConfState

    The cluster is composed of multiple nodes, and each node is classified as either a voter or a learner depending on whether it participates in voting during an election triggered by a failure. Both voters and learners are members of the cluster and share the consensus, but learners do not participate in voting.

    Information about these cluster members is also included in the consensus among the members, and thus can be established or modified by applying log entries.

    💡 In raft-rs, the EntryType is divided into EntryConfChange for such ConfState configuration changes and EntryNormal for general state changes.

    Among the types used in raft-rs, those used in the network layer are defined in the eraftpb.proto file and are compiled into Rust code by tonic.

    message ConfState {
        repeated uint64 voters = 1;
        repeated uint64 learners = 2;
    
        // The voters in the outgoing config. If not empty the node is in joint consensus.
        repeated uint64 voters_outgoing = 3;
        // The nodes that will become learners when the outgoing config is removed.
        // These nodes are necessarily currently in nodes_joint (or they would have
        // been added to the incoming config right away).
        repeated uint64 learners_next = 4;
        // If set, the config is joint and Raft will automatically transition into
        // the final config (i.e. remove the outgoing config) when this is safe.
        bool auto_leave = 5;
    }
    

    voters_outgoing, learners_next, auto_leave are fields supporting joint consensus. In this article, we will omit the explanation of joint consensus.
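
    To make the voter/learner distinction concrete, here is a minimal sketch using a simplified stand-in for ConfState. The struct below is not the generated raft-rs type, and the helper names is_member(), can_vote(), and majority() are hypothetical, introduced only for illustration.

```rust
/// Simplified stand-in for raft-rs's `ConfState` (joint-consensus fields omitted).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ConfState {
    pub voters: Vec<u64>,
    pub learners: Vec<u64>,
}

impl ConfState {
    /// Voters and learners are both members of the cluster.
    pub fn is_member(&self, id: u64) -> bool {
        self.voters.contains(&id) || self.learners.contains(&id)
    }

    /// Only voters take part in elections (hypothetical helper).
    pub fn can_vote(&self, id: u64) -> bool {
        self.voters.contains(&id)
    }

    /// Number of votes that forms a majority; learners are not counted.
    pub fn majority(&self) -> usize {
        self.voters.len() / 2 + 1
    }
}
```

    With three voters and one learner, the learner counts as a member but cannot vote, and two votes are enough for a majority.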

    Snapshot, SnapshotMetadata

    To ensure system availability, logs cannot be accumulated indefinitely and must be deleted once they have been applied to the state machine.

    The process of removing logs up to a specific index in the log sequence is referred to as log compaction, and the recorded state up to that index after applying the log entries is called a snapshot.

    Snapshots, which are the main focus of this post, will be examined in detail in the scenario analysis below. They are used to transfer the state of the cluster to newly joined nodes or for recovery from failures.
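
    The idea of log compaction can be sketched with a small in-memory log. Entry and Log below are hypothetical types, not the raft-rs ones, and this sketch removes the entry at the compaction index as well; whether a real storage keeps that boundary entry is an implementation detail.

```rust
/// Illustrative log entry; the real raft-rs `Entry` has more fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub index: u64,
    pub term: u64,
}

/// Hypothetical in-memory log that remembers where it was last compacted.
#[derive(Debug, Default)]
pub struct Log {
    pub entries: Vec<Entry>,
    /// Index and term of the last entry covered by the snapshot.
    pub snapshot_index: u64,
    pub snapshot_term: u64,
}

impl Log {
    /// Drops every entry with `index <= up_to` and records the boundary.
    /// Returns the number of entries removed.
    pub fn compact(&mut self, up_to: u64) -> usize {
        let before = self.entries.len();
        if let Some(last) = self.entries.iter().rev().find(|e| e.index <= up_to) {
            self.snapshot_index = last.index;
            self.snapshot_term = last.term;
        }
        self.entries.retain(|e| e.index > up_to);
        before - self.entries.len()
    }

    /// Index of the first entry still available in the log.
    pub fn first_index(&self) -> u64 {
        self.snapshot_index + 1
    }
}
```

    The recorded index and term play the same role as the index and term fields of SnapshotMetadata: they describe the last log position that the snapshot covers.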

    message Snapshot {
        bytes data = 1;
        SnapshotMetadata metadata = 2;
    }
    
    message SnapshotMetadata {
        // The current `ConfState`.
        ConfState conf_state = 1;
        // The applied index.
        uint64 index = 2;
        // The term of the applied index.
        uint64 term = 3;
    }
    

    SnapshotMetadata represents the metadata of a snapshot at the time it was created.

    Specifically, each field signifies the following:

    • conf_state: Indicates the cluster membership information at the time the snapshot was created.
    • index: Represents the index of the last log entry that was compacted when the snapshot was created.
    • term: Indicates the term value of the last log entry at the time the snapshot was created.

    Such metadata is essential for maintaining log consistency when utilizing snapshots.

    For instance, when restoring state information from a snapshot, if the term of the log entry at the snapshot's index does not match the term in the snapshot metadata, the snapshot application request must be ignored to maintain consistency.

    Scenario Analysis

    1 - Snapshot Recording

    In Raftify, snapshot creation is initiated by calling the make_snapshot() method of a RaftNode, passing the specific index and the term value of the log entry at that index as arguments.

    The data to be stored in the snapshot comes from the return of the self.fsm.snapshot() method, which represents the current state of the state machine.

    💡 The self.fsm.snapshot() method can be implemented differently depending on how the Finite State Machine (FSM) is to be stored, and it is one of the implementations that Raftify users must provide. For example, in the HashStore example, where the FSM is stored in memory, snapshot() simply serializes and returns a HashMap.
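
    As a rough illustration of what such a pair of methods can look like for an in-memory store, the sketch below serializes a HashMap into bytes and restores it again. MemStore, its byte layout, and the synchronous signatures are assumptions made for this example; they are not Raftify's actual HashStore or its async trait methods.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory state machine, loosely modeled on the HashStore example.
#[derive(Debug, Default, PartialEq)]
pub struct MemStore {
    pub data: HashMap<u64, String>,
}

impl MemStore {
    /// Serializes the map as repeated records of
    /// (key: u64 LE, value length: u32 LE, value bytes).
    pub fn snapshot(&self) -> Vec<u8> {
        let mut out = Vec::new();
        for (key, value) in &self.data {
            out.extend_from_slice(&key.to_le_bytes());
            out.extend_from_slice(&(value.len() as u32).to_le_bytes());
            out.extend_from_slice(value.as_bytes());
        }
        out
    }

    /// Replaces the current state with the one encoded in `bytes`.
    /// Returns `None` (leaving the state untouched) if the input is
    /// truncated or a value is not valid UTF-8.
    pub fn restore(&mut self, bytes: &[u8]) -> Option<()> {
        let mut data = HashMap::new();
        let mut pos = 0;
        while pos < bytes.len() {
            let mut key_buf = [0u8; 8];
            key_buf.copy_from_slice(bytes.get(pos..pos + 8)?);
            pos += 8;
            let mut len_buf = [0u8; 4];
            len_buf.copy_from_slice(bytes.get(pos..pos + 4)?);
            pos += 4;
            let len = u32::from_le_bytes(len_buf) as usize;
            let value = String::from_utf8(bytes.get(pos..pos + len)?.to_vec()).ok()?;
            pos += len;
            data.insert(u64::from_le_bytes(key_buf), value);
        }
        self.data = data;
        Some(())
    }
}
```

    A real implementation would more likely rely on a serialization library; the hand-written layout here only keeps the example free of external dependencies.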

    Next, the index of the last log entry applied to the state machine, last_applied, is passed to compact(), which deletes log entries up to that index.

    // lablup/raftify/blob/main/src/raft_node/mod.rs
    pub async fn make_snapshot(&mut self, index: u64, term: u64) -> Result<()> {
        ...
        let snapshot_data = self.fsm.snapshot().await?;
    
        let last_applied = self.raw_node.raft.raft_log.applied;
        let store = self.raw_node.mut_store();
        store.compact(last_applied)?;
        store.create_snapshot(snapshot_data, index, term)?;
        Ok(())
    }
    

    create_snapshot() records the snapshot metadata along with the snapshot data (data) received as an argument.

    // lablup/raftify/blob/main/src/heed_storage/mod.rs
    fn create_snapshot(&mut self, data: Vec<u8>, index: u64, term: u64) -> Result<()> {
        let store = self.wl();
        let mut writer = store.env.write_txn()?;
        let conf_state = store.conf_state(&writer)?;
    
        let mut snapshot = Snapshot::default();
        snapshot.set_data(data);
    
        let meta = snapshot.mut_metadata();
        meta.set_conf_state(conf_state);
        meta.index = index;
        meta.term = term;
    
        store.set_snapshot(&mut writer, &snapshot)?;
        writer.commit()?;
        Ok(())
    }
    

    2 - Transmitting Snapshots to Newly Joined Nodes

    Scenario

    When a new node joins the cluster, it must receive the state of the existing cluster to maintain consistency.

    However, replicating every log entry individually each time a new node joins is inefficient. Since all nodes share the same state machine, transmitting just the snapshot—the result of applied log entries—instead of every log entry can solve this issue. The type of message used to transmit snapshot data is MsgSnapshot.

    In this section, let's assume that Node 1 is the leader and Node 2 is the newly joined node. We will focus on the code and logs related to the MsgSnapshot message to understand what happens during this process.

    In Raftify, a newly joined follower does not send a separate snapshot request to the leader node.

    When a configuration change request (hereafter ConfChange) is committed, the leader tries to send this log entry to the newly joined node, which will reject this MsgAppend message because it does not have the required log entries.

    Do you remember the scenario from a previous part where MsgAppend messages were rejected due to a network failure, causing inconsistencies between nodes?

    In that scenario, inconsistencies were resolved by synchronizing mismatched log entries one by one using prepare_send_entries(). The difference when resolving log inconsistencies with a newly joined node is that instead of synchronizing log entries one by one, synchronization is done through a snapshot (prepare_send_snapshot()).
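
    The choice between the two synchronization paths can be pictured as a comparison between the follower's next index and the oldest index the leader still holds. This is a deliberately simplified sketch: NextSend and choose_next_send() are hypothetical names, and the real decision in raft-rs depends on whether the term and entries can actually be read from the log.

```rust
/// What the leader sends next to a follower (illustrative only).
#[derive(Debug, PartialEq)]
pub enum NextSend {
    /// Replicate log entries starting at this index.
    Entries { from: u64 },
    /// The needed entries were compacted away; send the snapshot instead.
    Snapshot,
}

/// `first_index` is the oldest index still present in the leader's log;
/// `next_idx` is the next index the leader intends to send to the follower.
pub fn choose_next_send(first_index: u64, next_idx: u64) -> NextSend {
    if next_idx < first_index {
        NextSend::Snapshot
    } else {
        NextSend::Entries { from: next_idx }
    }
}
```

    For a follower whose next index is 1 while the leader's log starts at 8, the sketch selects the snapshot path; a follower that is only slightly behind stays on the entry-by-entry path.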

    Now, let’s delve into the code and log analysis to understand in detail how this scenario unfolds.

    Code Analysis

    Let's start by examining the part of the code where the MsgAppend message sent by the leader to the newly joined node is rejected.

    The maybe_send_append() function is as follows:

    In the code below, since the progress for the newly joined node is empty, fetching the term or the entries from the log fails and the fallback branch calls prepare_send_snapshot(). maybe_send_append() returns false only when the snapshot cannot be prepared; otherwise the message, now a MsgSnapshot, is sent in place of the rejected MsgAppend.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn maybe_send_append(
        &mut self,
        to: u64,
        pr: &mut Progress,
        allow_empty: bool,
        msgs: &mut Vec<Message>,
    ) -> bool {
        ...
            let term = self.raft_log.term(pr.next_idx - 1);
            match (term, ents) {
                (Ok(term), Ok(mut ents)) => {
                    if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                        return true;
                    }
                    self.prepare_send_entries(&mut m, pr, term, ents)
                }
                (_, Err(Error::Store(StorageError::LogTemporarilyUnavailable))) => {
                    // wait for storage to fetch entries asynchronously
                    return false;
                }
                _ => {
                    // 💡 In this scenario, the following branch is executed.
                    // send snapshot if we failed to get term or entries.
                    if !self.prepare_send_snapshot(&mut m, pr, to) {
                        return false;
                    }
                }
            }
        }
        self.send(m, msgs);
        true
    }
    

    The prepare_send_snapshot() called in this scenario is a function that retrieves snapshot data by invoking the self.raft_log.snapshot() method and sets this data in the message to be sent.

    Afterwards, it marks the progress object of the node as being in a snapshot state before returning.

    💡 The node's state being marked as snapshot indicates that the node is in the process of snapshot replication, which means that log replication to this node will be temporarily suspended.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
        ...
        m.set_msg_type(MessageType::MsgSnapshot);
        let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot, to);
        if let Err(ref e) = snapshot_r {
            if *e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
                self.logger.debug(
                    format!(
                        "failed to send snapshot to {} because snapshot is temporarily unavailable",
                        to
                    )
                    .as_str(),
                );
                return false;
            }
            self.logger
                .fatal(format!("unexpected error: {:?}", e).as_str());
        }
        let snapshot = snapshot_r.unwrap();
        if snapshot.get_metadata().index == 0 {
            self.logger.fatal("need non-empty snapshot");
        }
        let (sindex, sterm) = (snapshot.get_metadata().index, snapshot.get_metadata().term);
        m.set_snapshot(snapshot);
        self.logger.debug(format!(
            "[firstindex: {first_index}, commit: {committed}] sent snapshot[index: {snapshot_index}, term: {snapshot_term}] to {to}; progress: {progress}",
            first_index = self.raft_log.first_index(),
            committed = self.raft_log.committed,
            snapshot_index = sindex,
            snapshot_term = sterm,
            to = to,
            progress = format!("{:?}", pr)
        ).as_str());
    
        pr.become_snapshot(sindex);
        self.logger.debug(
            format!(
                "paused sending replication messages to {}; progress: {:?}",
                to, pr
            )
            .as_str(),
        );
        true
    }
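
    The effect of pr.become_snapshot(sindex) can be pictured with a reduced model of the progress object. The Progress struct below keeps only a few of the fields that appear in the logs, and replication_paused() is a hypothetical helper; it is a sketch of the state transition, not the raft-rs implementation.

```rust
/// Replication states, named after the `state` values visible in the logs.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ProgressState {
    Probe,
    Replicate,
    Snapshot,
}

/// Simplified stand-in for raft-rs's `Progress`; most fields are omitted.
#[derive(Debug, Clone, PartialEq)]
pub struct Progress {
    pub matched: u64,
    pub next_idx: u64,
    pub state: ProgressState,
    pub pending_snapshot: u64,
}

impl Progress {
    pub fn new(next_idx: u64) -> Self {
        Progress {
            matched: 0,
            next_idx,
            state: ProgressState::Probe,
            pending_snapshot: 0,
        }
    }

    /// Marks the follower as waiting for the snapshot at `snapshot_idx`.
    pub fn become_snapshot(&mut self, snapshot_idx: u64) {
        self.state = ProgressState::Snapshot;
        self.pending_snapshot = snapshot_idx;
    }

    /// In this sketch, log replication is suspended while a snapshot is in flight.
    pub fn replication_paused(&self) -> bool {
        self.state == ProgressState::Snapshot
    }
}
```

    Starting from Probe with next_idx 1, calling become_snapshot(9) yields state Snapshot with pending_snapshot 9, which is the same transition the leader's debug output reports for Node 2.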
    

    Because prepare_send_snapshot() requires a non-empty snapshot, Raftify prepares one in advance by calling RaftNode.make_snapshot() when a ConfChange is committed, as we explored in scenario 1.

    The transmitted snapshot is detected and recovered by the Snapshot handling logic in the newly joined node's Raft loop. Through the logic below, the state machine is restored using the received snapshot data via self.fsm.restore(), and it is also applied to Stable storage via store.apply_snapshot().

    // lablup/raftify/blob/main/raftify/src/raft_node/mod.rs
    async fn on_ready(&mut self) -> Result<()> {
        ...
        if *ready.snapshot() != Snapshot::default() {
            self.logger
                .info("Restoring state machine and snapshot metadata...");
            let snapshot = ready.snapshot();
            if !snapshot.get_data().is_empty() {
                self.fsm.restore(snapshot.get_data().to_vec()).await?;
            }
            let store = self.raw_node.mut_store();
            store.apply_snapshot(snapshot.clone())?;
        }
        ...
    }
    

    Leader Node Log Analysis

    Now, let's analyze the logs generated on the leader node sequentially when a new node joins:

    1. Node 1 receives a join request from Node 2, and the cluster configuration is modified.
    Apr 11 06:51:14.189 INFO Node 2 (127.0.0.1:60062) joined the cluster as voter.
    Apr 11 06:51:14.189 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    Apr 11 06:51:14.189 DEBG Entries [9, 10) requested.
    
    2. Because a new log entry has been added, the leader sends a MsgAppend message to Node 2 to replicate it.
    Apr 11 06:51:14.189 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 8, entries: [Entry { context: 7, data: ConfChangeV2 { transition: 0, changes: [ConfChangeSingle { change_type: AddNode, node_id: 2 }], context: [127.0.0.1:60062] }, entry_type: EntryConfChangeV2, index: 9, sync_log: false, term: 1 }], commit: 9, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    
    3. However, since the newly joined node does not have the existing cluster's log, it rejects this MsgAppend message, and Node 1 receives the following rejection response.
    Apr 11 06:51:14.298 DEBG >>> Node 1 received Raft message from the node 2, Message { msg_type: MsgAppendResponse, to: 1, from: 2, term: 1, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 11 06:51:14.298 DEBG received msgAppend rejection; reject_hint_index: 0, reject_hint_term: 0, from: 2, index: 8
    Apr 11 06:51:14.298 DEBG decreased progress of 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
    
    4. As described earlier, since the progress of the newly joined node is empty, the snapshot is stored in stable storage, and log entries up to the specified index are removed. In this case, log entries up to index 8 are removed, and the log entry corresponding to Node 2's join request has index 9. Therefore, the snapshot is sent along with a log line reporting a first_index of 8 and a commit of 9.
    Apr 11 06:51:14.298 DEBG [firstindex: 8, commit: 9] sent snapshot[index: 9, term: 1] to 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
    
    5. Replication of log entries is paused to transmit the snapshot.
    Apr 11 06:51:14.299 DEBG paused sending replication messages to 2; progress: Progress { matched: 0, next_idx: 1, state: Snapshot, paused: false, pending_snapshot: 9, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
    
    6. A MsgSnapshot message is sent to transmit the snapshot. The snapshot contains the data {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"}, which was inserted earlier as arbitrary test data.
    Apr 11 06:51:14.299 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgSnapshot, to: 2, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    

    Follower Node Log Analysis

    Analyzing the logs generated on the newly joined follower node, we find:

    1. Becomes a new follower node at term 1.
    Apr 15 06:37:27.421 INFO became follower at term 1
    
    2. Rejects the MsgAppend message received from the leader node.
    Apr 15 06:37:27.421 DEBG rejected msgApp [logterm: 1, index: 8] from 1; index: 8, logterm: Ok(0)
    Apr 15 06:37:27.421 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    
    3. So that it is not detected as failed and no unnecessary election is triggered, the node must still respond properly to MsgHeartbeat messages.
    Apr 15 06:37:27.423 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgHeartbeat, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 15 06:37:27.423 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeatResponse, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    
    4. Receives the snapshot through a MsgSnapshot message.
    Apr 15 06:37:27.424 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgSnapshot, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {3: "A", 5: "A", 2: "A", 4: "A", 1: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 15 06:37:27.424 INFO log [committed=0, persisted=0, applied=0, unstable.offset=1, unstable.entries.len()=0] starts to restore snapshot [index: 9, term: 1]
    Apr 15 06:37:27.424 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    
    5. Restores the state using the received snapshot.
    Apr 15 06:37:27.424 INFO restored snapshot; commit: 9, last_index: 9, last_term: 1, snapshot_index: 9, snapshot_term: 1
    Apr 15 06:37:27.424 INFO [commit: 9, term: 1] restored snapshot [index: 9, term: 1]
    Apr 15 06:37:27.425 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 9, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 15 06:37:27.425 INFO Restoring state machine and snapshot metadata...
    Apr 15 06:37:27.425 DEBG snapshot's persisted index  9
    

    3 - Recovery in Case of Failure in the Majority of Nodes

    When a specific node fails, it is not a problem because the node can simply replicate new log entries from the leader node after the network is restored. Even if a node needs to join anew, it can recover its state through snapshots as discussed in scenario 2.

    However, if failures occur in a majority of nodes, the cluster cannot recover on its own.

    In this case, the administrator must intervene manually to decide which node's log sequence to consider as the correct state, and then re-bootstrap the cluster from that log sequence.

    Depending on the administrator's judgment, recovery can be done by directly applying all log entries one by one to the state machine or by restoring the state from the last created snapshot.
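
    The two recovery options can be compared with a small sketch. Insert, recover_from_entries(), and recover_from_snapshot() are hypothetical names for this example, and the sketch assumes the snapshot covers every entry in the log; under that assumption both paths end in the same state.

```rust
use std::collections::HashMap;

/// Illustrative command carried by a normal log entry.
#[derive(Debug, Clone)]
pub struct Insert {
    pub key: u64,
    pub value: String,
}

/// Recovery path 1: replay every entry, in order, through the apply step.
pub fn recover_from_entries(entries: &[Insert]) -> HashMap<u64, String> {
    let mut state = HashMap::new();
    for e in entries {
        // Same effect as calling apply() once per committed entry.
        state.insert(e.key, e.value.clone());
    }
    state
}

/// Recovery path 2: adopt the snapshot's state directly, without replaying anything.
pub fn recover_from_snapshot(snapshot: &HashMap<u64, String>) -> HashMap<u64, String> {
    snapshot.clone()
}
```

    Replaying costs one apply step per entry, while restoring from a snapshot costs a single copy of the state, which is why the snapshot path becomes attractive as the log grows.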

    State Recovery from WAL Snapshots

    In this section, we will use the Raftify example code.

    To recreate the example, we will first insert a few key-value pairs into Node 1, then call the make_snapshot() method via the /snapshot API to create a snapshot. We will then assume that the node has failed and shut it down.

    To recover from a WAL snapshot, pass the node_id of the node to be recovered to the restore_wal_snapshot_from option. Here, we will recover from Node 1's snapshot, so we will use 1.

    To verify whether the log entries have been applied, we will log "Inserted: (key, value)" each time apply() is called.

    💡 apply() is also an abstract method of the StateMachine that Raftify users need to define, similar to restore(). It is called when log entries are committed.

    After taking the snapshot and shutting down Node 1, we can use the CLI commands provided by Raftify to dump the storage, as shown below.

    From the logs below, we can see that the snapshot is stored in the storage and contains the data {2: "A", 5: "A", 3: "A", 4: "A", 1: "A"}.

    ❯ raftify-cli debug persisted-all ./logs
    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 1, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: HashStore(RwLock { data: {2: \"A\", 5: \"A\", 3: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }"
    Last index: 8
    

    Then, let's re-bootstrap Node 1 using the command ./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-snapshot-from=1.

    The logs generated on Node 1 will be as follows. Since the state is restored directly from the snapshot, apply() has not been executed for any of the log entries.

    Apr 15 07:54:44.703 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }, 2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: None, restore_wal_snapshot_from: Some(1), }
    Apr 15 07:54:44.705 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    Apr 15 07:54:44.705 DEBG reset election timeout 0 -> 10 at 0
    Apr 15 07:54:44.705 INFO became follower at term 3
    Apr 15 07:54:44.705 INFO newRaft; term: 3, commit: 0, applied: 0, last index: 0, last term: 0, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }
    Apr 15 07:54:44.705 INFO RawNode created with id 1.
    Apr 15 07:54:44.748 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"...
    

    And then let's dump the storage again.

    Since the recovery is from its own snapshot, we can confirm that no state changes have occurred.

    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 1, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: HashStore(RwLock { data: {3: \"A\", 2: \"A\", 5: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }"
    Last index: 8
    

    State Recovery from WAL Logs

    This time, let's recover the state from a specific log sequence.

    In this case, as shown in the logs below, the storage has an empty snapshot but contains log entries necessary for state recovery.

    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }"
    Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }"
    Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }"
    Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }"
    Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }"
    Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }"
    Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }"
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 2, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }"
    Last index: 8
    

    Let's assume a failure and shut down Node 1, then see what happens when we re-bootstrap it.

    After shutting down Node 1, re-bootstrap it using the command ./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-from=1.

    The following logs will be printed on Node 1, showing that the previously stored log entries are replayed one by one via apply(), restoring the previous state.

    Apr 15 07:46:50.710 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }, 1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: Some(1), restore_wal_snapshot_from: None, }
    Apr 15 07:46:50.712 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    Apr 15 07:46:50.712 DEBG reset election timeout 0 -> 10 at 0
    Apr 15 07:46:50.712 INFO became follower at term 1
    Apr 15 07:46:50.712 INFO newRaft; term: 1, commit: 8, applied: 0, last index: 8, last term: 1, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }
    Apr 15 07:46:50.712 INFO RawNode created with id 1.
    Apr 15 07:46:50.753 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"...
    Apr 15 07:46:50.855 DEBG Entries [1, 9) requested.
    
    // Applies log entries one by one to restore the state machine state.
    Inserted: (1, A)
    Inserted: (1, A)
    Inserted: (1, A)
    Inserted: (2, A)
    Inserted: (3, A)
    Inserted: (4, A)
    Inserted: (5, A)
    

    Similarly, since the state is recovered to its pre-crash state, dumping the storage will show that it is the same as before. The difference is that, compared to the previous recovery via snapshot, all log entries were applied one by one this time.

    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }"
    Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }"
    Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }"
    Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }"
    Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }"
    Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }"
    Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }"
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 2, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }"
    Last index: 8
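
    The replay shown in the log above can be pictured with a small sketch. It assumes a hypothetical Command type and a HashMap-backed state machine, neither of which is raftify's actual type; it only illustrates that applying the same committed entries in order reproduces the same state.

    ```rust
    use std::collections::HashMap;

    /// Hypothetical command type; raftify's real entry payload differs.
    #[derive(Clone, Debug)]
    pub enum Command {
        Insert { key: u64, value: String },
    }

    /// Rebuilds a key-value state machine by applying commands in log order.
    pub fn replay(commands: &[Command]) -> HashMap<u64, String> {
        let mut state = HashMap::new();
        for cmd in commands {
            match cmd {
                Command::Insert { key, value } => {
                    // Re-inserting an existing key simply overwrites it.
                    state.insert(*key, value.clone());
                }
            }
        }
        state
    }

    /// The seven inserts from the dump above (log indices 2 through 8).
    pub fn sample_commands() -> Vec<Command> {
        [1, 1, 1, 2, 3, 4, 5]
            .iter()
            .map(|&key| Command::Insert { key, value: "A".to_string() })
            .collect()
    }
    ```

    Replaying sample_commands() yields five keys, because the three inserts for key 1 collapse into a single entry in the state machine.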
    

    Conclusion

    In this article, following up from the previous one, we explored the issues of resolving log inconsistencies and recovery scenarios with snapshots, especially when there is a newly joined node.

    Raftify is participating as an interactive project in the 2024 Open Source Contribution Academy and is recruiting mentees interested in implementing distributed systems! (Recruitment period: ~ 06.23)

    Participants will have the opportunity to experience everything from learning the basic concepts of distributed systems to the actual implementation process, alongside mentors.

    We look forward to your interest! Thank you 😊

    29 May 2024

  • Deconstructing a working Raft implementation - 1

    By Gyubong Lee

    In this article, we'll assume that the reader has a theoretical background in Raft, and we'll dig into the tikv/raft-rs code to see how state machines in distributed systems actually synchronize and behave in a few brief scenarios.

    While this article focuses on analyzing the raft-rs code, we will use the raftify source code as an example in some sections for a complete understanding, because the raft-rs implementation deliberately leaves out the network and storage layers for flexibility.

    💡 raftify is a high-level Raft implementation developed by Lablup. In this post, I'll only describe raftify with the minimal code needed to understand how Raft works. If you're curious about raftify, check out this post.

    Img from: https://github.com/tikv/raft-rs

    raft-rs architecture with a focus on types

    Before we dive into the scenarios, let's take a quick look at the architecture, focusing on the typical types used in the code base.

    Raft

    The Raft object of each Raft node holds a message queue msgs in memory and interacts with other Raft nodes through this queue.

    In a high-level implementation like raftify, the network layer is responsible for putting messages into this queue through an abstraction layer that will be described later.

    This message queue can therefore be seen as an endpoint for communication, and the Raft implementation will process these messages according to its current state, maintaining a consistent state between nodes.

    The RaftCore type holds the data corresponding to the state of this Raft node.

    There is also a type called Progress that holds metadata for synchronizing log entries with other Raft nodes, and these are updated appropriately in the ProgressTracker depending on the situation.

    As a result, the Raft type is defined as follows:

    // tikv/raft-rs/blob/master/src/raft.rs
    pub struct Raft<T: Storage> {
        pub msgs: Vec<Message>,
        pub r: RaftCore<T>,
        prs: ProgressTracker,
    }
    

    RaftLog

    A representative piece of the data held by RaftCore is RaftLog, which abstracts access to a sequence of log entries.

    RaftLog<T: Storage> abstracts the types Unstable and T so that they can be handled together. Here, T corresponds to persistent storage that needs to be implemented at a higher level, such as raftify, and Unstable is a buffer that entries pass through before being written to this storage.

    // tikv/raft-rs/blob/master/src/raft_log.rs
    pub struct RaftLog<T: Storage> {
        pub store: T,
        pub unstable: Unstable,
    
        ...
    }
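
    To make the split between storage and buffer concrete, here is a toy sketch. ToyLog and its fields are hypothetical stand-ins rather than raft-rs types: stable plays the role of T, unstable plays the role of Unstable, and persisting is reduced to moving entries from one vector to the other.

    ```rust
    /// Toy stand-in for RaftLog; not the raft-rs type.
    pub struct ToyLog {
        /// Entries already persisted; stable[i] holds log index i + 1.
        pub stable: Vec<String>,
        /// Entries not yet persisted; unstable[i] holds log index offset + i.
        pub unstable: Vec<String>,
        /// Log index of the first unstable entry.
        pub offset: u64,
    }

    impl ToyLog {
        pub fn new() -> Self {
            ToyLog { stable: Vec::new(), unstable: Vec::new(), offset: 1 }
        }

        /// New entries land in the unstable buffer first.
        pub fn append(&mut self, data: &str) -> u64 {
            self.unstable.push(data.to_string());
            self.last_index()
        }

        pub fn last_index(&self) -> u64 {
            self.offset + self.unstable.len() as u64 - 1
        }

        /// Reads an entry without the caller knowing which tier holds it.
        pub fn get(&self, index: u64) -> Option<&String> {
            if index == 0 {
                None
            } else if index >= self.offset {
                self.unstable.get((index - self.offset) as usize)
            } else {
                self.stable.get((index - 1) as usize)
            }
        }

        /// Moves every buffered entry into stable storage and advances the offset.
        pub fn persist(&mut self) {
            self.offset += self.unstable.len() as u64;
            self.stable.append(&mut self.unstable);
        }
    }
    ```

    Callers only ever use get and last_index, so whether an entry has already been persisted stays an internal detail of the log.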
    

    💡 If you're interested in learning more about the RaftCore type, check out this link.

    Raft Loop

    Raft implementations perform an iterative process of updating their state machine in an infinite loop in order to communicate with other Raft nodes and maintain a consistent state. In this article, we'll call this loop a Raft loop.

    The source code for implementing a Raft loop in raftify is shown below.

    (You can also see the example code in tikv/raft-rs if you want to see the most minimal implementation).

    // lablup/raftify/blob/main/raftify/src/raft_node/mod.rs
    async fn on_ready(&mut self) -> Result<()> {
        if !self.raw_node.has_ready() {
            return Ok(());
        }
        let mut ready = self.raw_node.ready();
    
        if !ready.messages().is_empty() {
            self.send_messages(ready.take_messages()).await;
        }
    
        if *ready.snapshot() != Snapshot::default() {
            slog::info!(
                self.logger,
                "Restoring state machine and snapshot metadata..."
            );
            let snapshot = ready.snapshot();
            if !snapshot.get_data().is_empty() {
                self.fsm.restore(snapshot.get_data().to_vec()).await?;
            }
            let store = self.raw_node.mut_store();
            store.apply_snapshot(snapshot.clone())?;
        }
    
        self.handle_committed_entries(ready.take_committed_entries())
            .await?;
    
        if !ready.entries().is_empty() {
            let entries = &ready.entries()[..];
            let store = self.raw_node.mut_store();
            store.append(entries)?;
        }
    
        if let Some(hs) = ready.hs() {
            let store = self.raw_node.mut_store();
            store.set_hard_state(hs)?;
        }
    
        if !ready.persisted_messages().is_empty() {
            self.send_messages(ready.take_persisted_messages()).await;
        }
    
        let mut light_rd = self.raw_node.advance(ready);
    
        if let Some(commit) = light_rd.commit_index() {
            let store = self.raw_node.mut_store();
            store.set_hard_state_commit(commit)?;
        }
    
        if !light_rd.messages().is_empty() {
            self.send_messages(light_rd.take_messages()).await;
        }
    
        self.handle_committed_entries(light_rd.take_committed_entries())
            .await?;
    
        self.raw_node.advance_apply();
    
        Ok(())
    }
    

    RawNode

    Each Raft node has a higher-level instance of a type called RawNode that contains the Raft module. A RawNode keeps the previous SoftState (state held only in memory) and HardState (state stored in persistent storage), along with a records field that holds the metadata of Ready instances that have not yet been stored.

    💡 Ready is the data structure that is passed to the Raft node when it needs to be updated.

    // tikv/raft-rs/blob/master/src/raw_node.rs
    pub struct RawNode<T: Storage> {
        pub raft: Raft<T>,
        prev_ss: SoftState,
        prev_hs: HardState,
        max_number: u64,
        records: VecDeque<ReadyRecord>,
        commit_since_index: u64,
    }
    

    In the first part of the Raft loop, when the ready method is called, the metadata from Ready is stored in records, and after all the snapshots, entries, etc. that need to be stored are processed, the last part of the loop, advance, calls commit_ready and updates the offset of the buffer Unstable.

    RaftNode

    RaftNode is raftify's higher-level abstraction over RawNode, integrating it with the network and storage layers.

    In a separate asynchronous task, raftify receives messages sent by the gRPC client and passes them over the channel to the RaftNode.run task.

    After processing the messages, it handles state changes in a function (Raft loop) named on_ready.

    // lablup/raftify/blob/main/raftify/src/raft_node/mod.rs
    pub async fn run(mut self) -> Result<()> {
        let mut tick_timer = Duration::from_secs_f32(self.config.tick_interval);
        let fixed_tick_timer = tick_timer;
        let mut now = Instant::now();
    
        loop {
            ...
            tokio::select! {
                msg = timeout(fixed_tick_timer, self.server_rcv.recv()) => {
                    if let Ok(Some(msg)) = msg {
                        self.handle_server_request_msg(msg).await?;
                    }
                }
                ...
            }
    
            let elapsed = now.elapsed();
            now = Instant::now();
            if elapsed > tick_timer {
                tick_timer = Duration::from_millis(100);
                self.raw_node.tick();
            } else {
                tick_timer -= elapsed;
            }
    
            self.on_ready().await?
        }
    }
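
    The tick bookkeeping at the bottom of this loop can be isolated into a small pure function. This is a sketch, not raftify code: advance_tick_timer and TICK_INTERVAL are hypothetical names, and the 100 ms value simply mirrors the reset in the loop above.

    ```rust
    use std::time::Duration;

    /// Assumed tick interval, mirroring the 100 ms reset in the loop above.
    pub const TICK_INTERVAL: Duration = Duration::from_millis(100);

    /// Returns the time left until the next tick and whether a tick is due now.
    pub fn advance_tick_timer(remaining: Duration, elapsed: Duration) -> (Duration, bool) {
        if elapsed > remaining {
            // The countdown ran out: fire a tick and restart the countdown.
            (TICK_INTERVAL, true)
        } else {
            // Not yet due: only subtract the time spent in this iteration.
            (remaining - elapsed, false)
        }
    }
    ```

    Keeping the countdown separate from the select! timeout means that a burst of incoming messages shortens the remaining time instead of postponing the tick indefinitely.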
    

    To explain raftify's implementation in more detail, raftify iterates through the following process:

    1. The client generates a request (e.g. by calling RaftServiceClient.propose or RaftNode.propose).
    2. RaftServiceClient.propose on the remote Raft node is called via gRPC.
    3. RaftServiceClient.propose passes the Propose message over the channel to the RaftNode.run coroutine.
    4. RaftNode.run polls the message queue and calls RawNode::propose when it receives a Propose message.
    5. When there are changes to the state machine that need to be applied, a Ready instance is created and passed to the on_ready handler.
    6. When entries are committed, the on_ready handler processes the committed entries and responds to the client.
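
    The request path in the steps above can be sketched with standard-library channels. Everything here is a simplifying assumption: Propose, spawn_node and propose are hypothetical names, a thread stands in for the asynchronous RaftNode.run task, and the toy node treats every proposal as committed immediately instead of replicating it.

    ```rust
    use std::sync::mpsc::{channel, Sender};
    use std::thread;

    /// Hypothetical request type standing in for raftify's Propose message.
    pub struct Propose {
        pub data: Vec<u8>,
        /// Channel used to answer the client once the entry is "committed".
        pub reply: Sender<u64>,
    }

    /// Spawns a toy node loop and returns the sending half of its request channel.
    pub fn spawn_node() -> Sender<Propose> {
        let (tx, rx) = channel::<Propose>();
        thread::spawn(move || {
            let mut log: Vec<Vec<u8>> = Vec::new();
            // The loop ends once every sender has been dropped.
            for req in rx {
                log.push(req.data);
                // Answer with the index assigned to the new entry.
                let _ = req.reply.send(log.len() as u64);
            }
        });
        tx
    }

    /// Client-side helper: sends a proposal and blocks until the node answers.
    pub fn propose(node: &Sender<Propose>, data: &[u8]) -> Option<u64> {
        let (reply, response) = channel();
        node.send(Propose { data: data.to_vec(), reply }).ok()?;
        response.recv().ok()
    }
    ```

    Carrying a reply channel inside the request is one common way to let the loop answer the right client after the entry has been handled.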

    With the theoretical stuff out of the way, let's analyze a few scenarios and see what happens.

    💡 What we arbitrarily call Propose messages in this paragraph is a type of message defined for the purpose of proposing a state change to the cluster.

    Scenario analysis

    1 - Add a new log entry

    What happens under the hood when you request (propose) a change to the cluster to alter its state machine? In this section, we'll break down what happens when you call RawNode.propose. Here's a look at the RawNode.propose function:

    // tikv/raft-rs/blob/master/src/raw_node.rs
    pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgPropose);
        m.from = self.raft.id;
        let mut e = Entry::default();
        e.data = data.into();
        e.context = context.into();
        m.set_entries(vec![e].into());
        self.raft.step(m)
    }
    

    From the code above, you can see that the propose function calls step to make it handle a message of type MsgPropose.

    Here, step is the function that corresponds to the actual message handler in raft-rs. Depending on the role of the node calling step, it dispatches to step_leader for a leader, step_follower for a follower, or step_candidate for a candidate.

    The code for step is quite complex, but let's follow the code to see how the MsgPropose type is handled on the leader node.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn step_leader(&mut self, mut m: Message) -> Result<()> {
        ...
        match m.get_msg_type() {
            MessageType::MsgPropose => {
                ...
                if !self.append_entry(m.mut_entries()) {
                    ...
                }
                self.bcast_append();
                return Ok(());
            }
        ...
        }
    }
    

    Raft.append_entry calls RaftLog.append to add an entry. RaftLog.append appends the entries to the Unstable buffer via self.unstable.truncate_and_append.

    // tikv/raft-rs/blob/master/src/raft_log.rs
    pub fn append(&mut self, ents: &[Entry]) -> u64 {
        ...
        self.unstable.truncate_and_append(ents);
        self.last_index()
    }
    

    As previously described, the entries added to the buffer will be persisted in a Raft loop, and updating the state machine via an advance-like function will automatically update the offset and clear the buffer.
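
    What truncate_and_append means for the buffer can be sketched as follows. This is a simplified model based on the method's name and the behavior described here, not the raft-rs source: Entry and Buffer are reduced, hypothetical types, and callers are assumed never to leave a gap in the indices.

    ```rust
    /// Simplified entry with only the fields this sketch needs.
    #[derive(Clone, Debug, PartialEq)]
    pub struct Entry {
        pub index: u64,
        pub term: u64,
    }

    /// Toy unstable buffer: entries[i] holds log index offset + i.
    pub struct Buffer {
        pub offset: u64,
        pub entries: Vec<Entry>,
    }

    impl Buffer {
        pub fn truncate_and_append(&mut self, ents: &[Entry]) {
            if ents.is_empty() {
                return;
            }
            let after = ents[0].index;
            let next = self.offset + self.entries.len() as u64;
            if after == next {
                // Contiguous with the buffer: a plain append.
            } else if after <= self.offset {
                // Starts at or before the buffer: replace the buffer entirely.
                self.offset = after;
                self.entries.clear();
            } else {
                // Starts inside the buffer: drop the conflicting tail first.
                self.entries.truncate((after - self.offset) as usize);
            }
            self.entries.extend_from_slice(ents);
        }
    }
    ```

    The middle and last branches are what allow a follower's unpersisted tail to be overwritten when a newer leader sends different entries for the same indices.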

    Let's take a look at the next call, bcast_append.

    You can see that we're calling core.send_append with each follower's progress as an argument, using the ProgressTracker (prs) described in the previous section to synchronize the log entries of the leader and followers.

    // tikv/raft-rs/blob/master/src/raft.rs
    pub fn bcast_append(&mut self) {
        let self_id = self.id;
        let core = &mut self.r;
        let msgs = &mut self.msgs;
        self.prs
            .iter_mut()
            .filter(|&(id, _)| *id != self_id)
            .for_each(|(id, pr)| core.send_append(*id, pr, msgs));
    }
    

    send_append has the following simple structure:

    // tikv/raft-rs/blob/master/src/raft.rs
    fn send_append(&mut self, to: u64, pr: &mut Progress, msgs: &mut Vec<Message>) {
        self.maybe_send_append(to, pr, true, msgs);
    }
    

    maybe_send_append reads the log entries starting at pr.next_idx via RaftLog.entries, bounded by max_msg_size, and passes them to prepare_send_entries for the follower identified by to.

    (As you can infer from the maybe_ prefix in its name, the function returns true on success and false on failure.)

    // tikv/raft-rs/blob/master/src/raft.rs
    fn maybe_send_append(
        &mut self,
        to: u64,
        pr: &mut Progress,
        allow_empty: bool,
        msgs: &mut Vec<Message>,
    ) -> bool {
        ...
        let ents = self.raft_log.entries(
            pr.next_idx,
            self.max_msg_size,
            GetEntriesContext(GetEntriesFor::SendAppend {
                to,
                term: self.term,
                aggressively: !allow_empty,
            }),
        );
        ...
            match (term, ents) {
                (Ok(term), Ok(mut ents)) => {
                    if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                        return true;
                    }
                    self.prepare_send_entries(&mut m, pr, term, ents)
                }
                ...
            }
        ...
        self.send(m, msgs);
        true
    }
    

    prepare_send_entries turns the message object m into a MsgAppend message and puts the entries into it. It then updates the follower's progress and returns.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn prepare_send_entries(
        &mut self,
        m: &mut Message,
        pr: &mut Progress,
        term: u64,
        ents: Vec<Entry>,
    ) {
        m.set_msg_type(MessageType::MsgAppend);
        m.index = pr.next_idx - 1;
        m.log_term = term;
        m.set_entries(ents.into());
        m.commit = self.raft_log.committed;
        if !m.entries.is_empty() {
            let last = m.entries.last().unwrap().index;
            pr.update_state(last);
        }
    }
    

    Then self.send(m, msgs) puts this prepared message into the msgs message queue.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn send(&mut self, mut m: Message, msgs: &mut Vec<Message>) {
        ...
        msgs.push(m);
    }
    

    The MsgAppend message that enters the message queue will be sent to the follower node from send_messages through the network layer. Therefore, we need to see how the follower node handles the MsgAppend message after receiving it.

    Next, let's take a look at what happens on the follower node. To find out what happens when a follower node receives a MsgAppend message, we can look at step_follower.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn step_follower(&mut self, mut m: Message) -> Result<()> {
        match m.get_msg_type() {
            ...
            MessageType::MsgAppend => {
                self.election_elapsed = 0;
                self.leader_id = m.from;
                self.handle_append_entries(&m);
            }
            ...
        }
    }
    

    From the code above, you can see that the follower node that received the MsgAppend message is calling handle_append_entries.

    This function creates a to_send, a message of type MsgAppendResponse, and calls RaftLog.maybe_append, as shown below.

    // tikv/raft-rs/blob/master/src/raft.rs
    pub fn handle_append_entries(&mut self, m: &Message) {
        ...
        let mut to_send = Message::default();
        to_send.to = m.from;
        to_send.set_msg_type(MessageType::MsgAppendResponse);
    
        if let Some((_, last_idx)) = self
            .raft_log
            .maybe_append(m.index, m.log_term, m.commit, &m.entries)
        {
            ...
            // Accept the MsgAppend message
        } else {
            ...
            // Reject the MsgAppend message
        }
        ...
        self.r.send(to_send, &mut self.msgs);
    }
    

    maybe_append calls match_term to check whether the message's log_term and the term of the corresponding log entry are the same, calls find_conflict to check for conflicts in the log entry sequence, and calls RaftLog.append if it finds entries that need to be written.

    // tikv/raft-rs/blob/master/src/raft_log.rs
    pub fn maybe_append(
        &mut self,
        idx: u64,
        term: u64,
        committed: u64,
        ents: &[Entry],
    ) -> Option<(u64, u64)> {
        if self.match_term(idx, term) {
            let conflict_idx = self.find_conflict(ents);
            if conflict_idx == 0 {
            } else if conflict_idx <= self.committed {
                fatal!(
                    self.unstable.logger,
                    "entry {} conflict with committed entry {}",
                    conflict_idx,
                    self.committed
                )
            } else {
                let start = (conflict_idx - (idx + 1)) as usize;
                self.append(&ents[start..]);
    
                if self.persisted > conflict_idx - 1 {
                    self.persisted = conflict_idx - 1;
                }
            }
            let last_new_index = idx + ents.len() as u64;
            self.commit_to(cmp::min(committed, last_new_index));
            return Some((conflict_idx, last_new_index));
        }
        None
    }
    

    We've seen RaftLog.append before. It was the last function called when a log entry was proposed on the leader node.

    As on the leader, where Raft.append_entry calls RaftLog.append to add the entry, RaftLog.append appends the entries to the Unstable buffer via self.unstable.truncate_and_append.
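
    The follower-side check can be reproduced on a toy log that stores only terms. The sketch below is a simplified model of the logic walked through above, not the raft-rs code: TermLog is a hypothetical type, entries are reduced to their terms, and it returns only the new last index.

    ```rust
    /// Toy log where terms[i] is the term of the entry at index i + 1.
    pub struct TermLog {
        pub terms: Vec<u64>,
        pub committed: u64,
    }

    impl TermLog {
        /// Term of the entry at `index`; index 0 is the empty prefix with term 0.
        fn term(&self, index: u64) -> Option<u64> {
            if index == 0 {
                Some(0)
            } else {
                self.terms.get(index as usize - 1).copied()
            }
        }

        /// Accepts `new_terms` (terms of the entries following `prev_index`) only
        /// if the local entry at `prev_index` has term `prev_term`.
        pub fn maybe_append(
            &mut self,
            prev_index: u64,
            prev_term: u64,
            leader_commit: u64,
            new_terms: &[u64],
        ) -> Option<u64> {
            if self.term(prev_index) != Some(prev_term) {
                return None; // Reject: the logs diverge at or before prev_index.
            }
            for (i, &t) in new_terms.iter().enumerate() {
                let index = prev_index + 1 + i as u64;
                if self.term(index) != Some(t) {
                    // First missing or conflicting entry: overwrite from here on.
                    assert!(index > self.committed, "conflict with committed entry");
                    self.terms.truncate(index as usize - 1);
                    self.terms.extend_from_slice(&new_terms[i..]);
                    break;
                }
            }
            let last_new = prev_index + new_terms.len() as u64;
            self.committed = self.committed.max(leader_commit.min(last_new));
            Some(last_new)
        }
    }
    ```

    For example, a follower holding terms [1, 1, 2] rejects an append whose previous entry is (index 3, term 1), but accepts one anchored at (index 2, term 1) and overwrites its third entry.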

    This outlines a scenario where logs added to the leader are persisted on the leader node and copied to the follower nodes.

    2 - Leader and follower node log sequence mismatch

    In scenario 1, we looked at the code assuming a normal situation, but in reality, issues such as network disconnection can cause mismatches between leader and follower nodes. Let's take another look at the code, this time focusing on how to detect and resolve mismatches between leader and follower nodes.

    Let's say you have a cluster of three nodes that is processing thousands of requests that are successively changing the state machine, and then a network failure occurs.

    In the event of a failure, we should start by looking at the logs written to the nodes, persisted log entries, and debugging information to get some context, but to avoid making this post too long, we'll just pick out the logs that will give us a general idea of what's happening on the nodes and analyze them.

    First of all, node 3 is leaving a rejected msgApp... log indicating that it has rejected a message.

    Nov 28 05:30:59.233 DEBG rejected msgApp [logterm: 7, index: 3641] from 2, logterm: Ok(0), index: 3641, from: 2, msg_index: 3641, msg_log_term: 7
    

    From the log above, we can see that node 3 is a follower node, node 2 is the newly elected leader node after the failure, and that the MsgAppend message trying to replicate the 3641st entry was rejected.

    If we look up what function this log is output from, we can see that it is called from handle_append_entries, which we saw in Scenario 1 (the function that handles the MsgAppend messages that the follower receives from the leader).

    pub fn handle_append_entries(&mut self, m: &Message) {
        ...
        let mut to_send = Message::default();
        to_send.to = m.from;
        to_send.set_msg_type(MessageType::MsgAppendResponse);
        ...
        if let Some((_, last_idx)) = self
            .raft_log
            .maybe_append(m.index, m.log_term, m.commit, &m.entries)
        {
            ...
        } else {
            debug!(
                self.logger,
                "rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \
                from {from}",
                msg_log_term = m.log_term,
                msg_index = m.index,
                from = m.from;
                "index" => m.index,
                "logterm" => ?self.raft_log.term(m.index),
            );
    
            let hint_index = cmp::min(m.index, self.raft_log.last_index());
            let (hint_index, hint_term) =
                self.raft_log.find_conflict_by_term(hint_index, m.log_term);
    
            if hint_term.is_none() {
                fatal!(
                    self.logger,
                    "term({index}) must be valid",
                    index = hint_index
                )
            }
    
            to_send.index = m.index;
            to_send.reject = true;
            to_send.reject_hint = hint_index;
            to_send.log_term = hint_term.unwrap();
        }
    
        to_send.set_commit(self.raft_log.committed);
        self.r.send(to_send, &mut self.msgs);
    }
    

    Looking at the function, the fact that this log was output means that maybe_append returned None, which in turn means that match_term returned false. In other words, there is a mismatch between the log_term in the message and the term of entry 3641 on the follower.

    So the follower finds the point of conflict by term (find_conflict_by_term), puts that point (hint_index) into the reject_hint of the message, and sends it back to the leader in the form of a MsgAppendResponse message.

    So what does the leader do with this rejected MsgAppendResponse message?

    The leader node whose message was rejected leaves a log saying that the MsgAppend was rejected, as shown below.

    Nov 28 05:30:59.279 DEBG received msgAppend rejection, index: 3641, from: 3, reject_hint_term: 7, reject_hint_index: 3611
    

    So the next thing we need to look at is the function that receives the MsgAppendResponse carrying this rejection and outputs "received msgAppend rejection".

    This function is called handle_append_response, and while the function itself is quite long, it's not that long when you cut it down to just what happens when an MsgAppend is rejected.

    fn handle_append_response(&mut self, m: &Message) {
        let mut next_probe_index: u64 = m.reject_hint;
        ...
        if m.reject {
            debug!(
                self.r.logger,
                "received msgAppend rejection";
                "reject_hint_index" => m.reject_hint,
                "reject_hint_term" => m.log_term,
                "from" => m.from,
                "index" => m.index,
            );
    
            if pr.maybe_decr_to(m.index, next_probe_index, m.request_snapshot) {
                debug!(
                    self.r.logger,
                    "decreased progress of {}",
                    m.from;
                    "progress" => ?pr,
                );
                if pr.state == ProgressState::Replicate {
                    pr.become_probe();
                }
    
                self.send_append(m.from);
            }
            return;
        }
        ...
    }
    

    The leader takes the reject_hint from the message as next_probe_index and calls Progress.maybe_decr_to to decrement the progress. It then marks the Progress as being in the Probe state and calls send_append to send another MsgAppend message.

    💡 ProgressState is an enum that represents the synchronization progress of each node. It is "Replicate" under normal circumstances, when logs are being replicated to the node; "Probe" when the leader does not know the last index replicated on the follower; and "Snapshot" when the leader is replicating the log to the follower by sending a snapshot.

    To summarize, to find the index (next_probe_index) of the log entry before the conflict, the leader decrements the node's progress and sends another MsgAppend message. This process is repeated until the common log prefix of the leader and follower nodes is found.

    Once the common log prefix is found, log entries after that index are replicated in one direction, from the leader to the follower, overwriting the follower's entries. This process can be seen in the maybe_send_append function.
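
    The back-and-forth of rejections can be simulated on two term-only logs. This is a deliberately simplified sketch: find_common_prefix and term_at are hypothetical helpers, each rejection moves the probe back by one index (or jumps to the follower's last index), whereas find_conflict_by_term in raft-rs can skip further using terms.

    ```rust
    /// Term of the entry at `index`, where log[i] holds index i + 1 and index 0
    /// stands for the empty prefix with term 0.
    fn term_at(log: &[u64], index: u64) -> Option<u64> {
        if index == 0 {
            Some(0)
        } else {
            log.get(index as usize - 1).copied()
        }
    }

    /// Returns the last index on which both logs agree (0 if none), together
    /// with the number of rejected probes needed to find it.
    pub fn find_common_prefix(leader: &[u64], follower: &[u64]) -> (u64, u32) {
        let mut probe = leader.len() as u64;
        let mut rejections = 0;
        loop {
            let leader_term = term_at(leader, probe).expect("probe is within the leader log");
            if term_at(follower, probe) == Some(leader_term) {
                return (probe, rejections);
            }
            rejections += 1;
            // The hint never points beyond the follower's last index.
            let hint = probe.min(follower.len() as u64);
            probe = if hint < probe { hint } else { probe - 1 };
        }
    }
    ```

    With a leader log of terms [1, 1, 3, 3] and a follower log of [1, 1, 2, 2], the probe is rejected at indices 4 and 3 and succeeds at index 2, after which entries 3 and 4 are overwritten on the follower.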

    The log entries are fetched through RaftLog.entries in the SendAppend context and then replicated, as shown below. Here, max_msg_size is max_size_per_msg from Config, which defaults to 0. RaftLog.entries passes this value as the max_size argument of LMDBStorage.entries (the persistent storage type, corresponding to T in RaftLog). Based on this comment, a max_size of 0 means that, unless you set it, log entries are synchronized one at a time when the logs of the leader and follower nodes do not match.

    After that, prepare_send_entries is used to prepare the MsgAppend message as described in the previous section, and Raft.send is used to replicate the entries to the follower node.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn maybe_send_append(
        &mut self,
        to: u64,
        pr: &mut Progress,
        allow_empty: bool,
        msgs: &mut Vec<Message>,
    ) -> bool {
        ...
        let mut m = Message::default();
        m.to = to;
        if pr.pending_request_snapshot != INVALID_INDEX {
            ...
        } else {
            let ents = self.raft_log.entries(
                pr.next_idx,
                self.max_msg_size,
                GetEntriesContext(GetEntriesFor::SendAppend {
                    to,
                    term: self.term,
                    aggressively: !allow_empty,
                }),
            );
            ...
            let term = self.raft_log.term(pr.next_idx - 1);
            match (term, ents) {
                (Ok(term), Ok(mut ents)) => {
                    if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                        return true;
                    }
                    self.prepare_send_entries(&mut m, pr, term, ents)
                }
                ...
            }
        }
        self.send(m, msgs);
        true
    }
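
    The one-at-a-time synchronization described above follows from how a size limit of this kind is typically applied: the first entry is always kept so that replication can make progress, and later entries are kept only while the running total fits. The sketch below illustrates that rule under assumptions of its own: limit_size here is a hypothetical helper, and entry size is approximated by byte length.

    ```rust
    /// Keeps the longest prefix of `entries` whose total size fits in `max_size`,
    /// but never fewer than one entry when any exist.
    pub fn limit_size(entries: &mut Vec<Vec<u8>>, max_size: u64) {
        let mut total: u64 = 0;
        let mut keep = 0;
        for entry in entries.iter() {
            total += entry.len() as u64;
            // The first entry is always kept, even if it exceeds the limit.
            if keep > 0 && total > max_size {
                break;
            }
            keep += 1;
        }
        entries.truncate(keep);
    }
    ```

    With max_size set to 0 every call keeps exactly one non-empty entry, which matches the single-entry MsgAppend messages in the log below.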
    

    There are a lot of logs missing in the middle, but you can see that after the synchronization between the leader and the follower has occurred through the above process from the 3612th entry to the 3642nd entry, the follower's progress state changes to Replicate and it starts sending and receiving Heartbeat messages normally.

    2023-11-28 14:30:59,269 - INFO     - Entries [3612, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3611, entries: [Entry { context: "1810", data: "{'key': '2292', 'value': '1'}", entry_type: EntryNormal, index: 3612, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,259 - INFO     - Entries [3613, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3612, entries: [Entry { context: "1811", data: "{'key': '2294', 'value': '1'}", entry_type: EntryNormal, index: 3613, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,259 - INFO     - Entries [3614, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3613, entries: [Entry { context: "1812", data: "{'key': '2295', 'value': '1'}", entry_type: EntryNormal, index: 3614, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,259 - INFO     - Entries [3615, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3614, entries: [Entry { context: "1813", data: "{'key': '2296', 'value': '1'}", entry_type: EntryNormal, index: 3615, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    ...
    
    2023-11-28 14:30:59,284 - INFO     - Entries [3641, 3643) requested
    Nov 28 05:30:59.283 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3640, entries: [Entry { context: "1839", data: "{'key': '2457', 'value': '1'}", entry_type: EntryNormal, index: 3641, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,284 - INFO     - Entries [3642, 3643) requested
    Nov 28 05:30:59.284 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3641, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 3642, sync_log: false, term: 12 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    Nov 28 05:31:01.635 DEBG Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeat, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 2
    Nov 28 05:31:01.635 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgHeartbeat, to: 3, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:31:01,637
    

    3 - Electing a leader

    In Scenario 2, we could tell from the increase in the term value that the leader election was caused by a network failure, but in this scenario we'll take a closer look at the leader election process.

    To see what logs are produced when the leader fails, we'll simply create a cluster of 3 nodes, force the leader process to shut down, and look at the logs of the process that is newly elected leader.

    To summarize the logs, after the leader node shuts down, node 3 starts the election, transitions to the Candidate state with the term increased to 2, and sends MsgRequestVote messages to the other voters. It then receives a MsgRequestVoteResponse message from node 2, is elected as the new leader because it now holds a majority of the votes (its own and node 2's), and sends a special kind of message (an empty MsgAppend) to announce that it is the elected leader.

    💡 A follower node that has not received a heartbeat message within election_tick will start an election. To avoid split votes, the election timeout is set to a random value between min_election_tick and max_election_tick each time. Therefore, after the leader node is terminated, either of the remaining two nodes can become the leader, and the node that draws the smaller timeout starts its election first and is likely to be elected.
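
    The two rules at work here, a randomized timeout and a majority vote, can be written down in a few lines. This is only a sketch: quorum, has_won and randomized_timeout are hypothetical helpers, the random draw is passed in as a parameter to keep the example dependency-free, and the half-open range [min_tick, max_tick) is an assumption.

    ```rust
    /// Number of votes needed to win an election among `voters` voting members.
    pub fn quorum(voters: usize) -> usize {
        voters / 2 + 1
    }

    /// True when the granted votes (including the candidate's own) reach quorum.
    pub fn has_won(granted: usize, voters: usize) -> bool {
        granted >= quorum(voters)
    }

    /// Maps an arbitrary random draw onto the range [min_tick, max_tick).
    pub fn randomized_timeout(min_tick: u64, max_tick: u64, draw: u64) -> u64 {
        assert!(max_tick > min_tick, "the range must not be empty");
        min_tick + draw % (max_tick - min_tick)
    }
    ```

    In the three-node cluster of this scenario, quorum(3) is 2, so node 3 wins as soon as node 2 grants its vote, even though node 1 is unreachable.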

    Nov 29 01:30:30.210 INFO starting a new election, term: 1
    Nov 29 01:30:30.210 DEBG reset election timeout 16 -> 10 at 0, election_elapsed: 0, timeout: 10, prev_timeout: 16
    Nov 29 01:30:30.210 INFO became candidate at term 2, term: 2
    Nov 29 01:30:30.210 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgRequestVote, to: 1, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3
    Nov 29 01:30:30.210 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgRequestVote, to: 2, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3
    Nov 29 01:30:30.211 INFO broadcasting vote request, to: [1, 2], log_index: 3, log_term: 1, term: 2, type: MsgRequestVote
    2023-11-29 10:30:30,217 - WARNING  - Failed to connect to node 1 elapsed from first failure: 0.0000s. Err message: <AioRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused"
        debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused {created_time:"2023-11-29T10:30:30.216855+09:00", grpc_status:14}"
    >
    2023-11-29 10:30:30,222 - DEBUG    - Node 3 received Raft message from the node 2, Message: Message { msg_type: MsgRequestVoteResponse, to: 3, from: 2, term: 2, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: "None", metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }
    Nov 29 01:30:30.223 INFO received votes response, term: 2, type: MsgRequestVoteResponse, approvals: 2, rejections: 0, from: 2, vote: true
    Nov 29 01:30:30.223 TRCE ENTER become_leader
    Nov 29 01:30:30.223 DEBG reset election timeout 10 -> 17 at 0, election_elapsed: 0, timeout: 17, prev_timeout: 10
    Nov 29 01:30:30.223 TRCE Entries being appended to unstable list, ents: Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }
    Nov 29 01:30:30.223 INFO became leader at term 2, term: 2
    Nov 29 01:30:30.223 TRCE EXIT become_leader
    Nov 29 01:30:30.223 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgAppend, to: 1, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3
    Nov 29 01:30:30.223 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3
    

    Let's take a look at the logs to see what's going on in the code.

    First of all, the function that is printing the log "starting a new election" is hup.

    hup is called while processing messages of type MsgHup in step and messages of type MsgTimeoutNow in step_follower.

    Note that MsgTimeoutNow is the message type used for leader transfer, not leader election. When the leader receives a MsgTransferLeader message, it sends a MsgTimeoutNow message to the follower that should take over, and that follower runs the hup function with the transfer_leader flag set to true. While leader election is the process of electing a new leader after a leader failure, leader transfer is the process of a leader handing leadership over to a follower.

    So the message we need to follow now is MsgHup. Since the election started because no heartbeat arrived within the election timeout, we can guess that it was the tick_election function below that stepped the MsgHup message.

    Remember how we called self.raw_node.tick() every tick_timer on RaftNode? This RawNode.tick makes the node step a MsgHup message to itself once election_elapsed reaches randomized_election_timeout. (The timeout is randomized to prevent a situation where all nodes start an election at the same time and all vote for themselves.)

    // raw_node.rs
    pub fn tick(&mut self) -> bool {
        self.raft.tick()
    }
    
    // raft.rs
    pub fn tick(&mut self) -> bool {
        match self.state {
            StateRole::Follower | StateRole::PreCandidate | StateRole::Candidate => {
                self.tick_election()
            }
            StateRole::Leader => self.tick_heartbeat(),
        }
    }
    
    // raft.rs
    pub fn tick_election(&mut self) -> bool {
        self.election_elapsed += 1;
        if !self.pass_election_timeout() || !self.promotable {
            return false;
        }
        
        self.election_elapsed = 0;
        let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id));
        let _ = self.step(m);
        true
    }
    
    // raft.rs
    pub fn step(&mut self, m: Message) -> Result<()> {
        ...
        match m.get_msg_type() {
            ...
            MessageType::MsgHup => {
                self.hup(false)
            },
        }
    }
    
    // raft.rs
    pub fn pass_election_timeout(&self) -> bool {
        self.election_elapsed >= self.randomized_election_timeout
    }
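
    The effect of the randomized timeout can be sketched with a small stand-alone model. The ElectionTimer struct and first_to_campaign function below are hypothetical names for illustration, not raft-rs types; the promotable check is left out, and the timeouts are fixed at the values seen in the log (16 and 10) instead of being drawn randomly.

```rust
/// Minimal stand-in for the election-timer fields of a Raft node.
/// This is an illustrative model, not the raft-rs `Raft` struct.
struct ElectionTimer {
    id: u64,
    election_elapsed: usize,
    randomized_election_timeout: usize,
}

impl ElectionTimer {
    fn new(id: u64, randomized_election_timeout: usize) -> Self {
        Self { id, election_elapsed: 0, randomized_election_timeout }
    }

    /// Follows the shape of `tick_election`: returns true when the node
    /// would step a `MsgHup` to itself and start campaigning.
    fn tick(&mut self) -> bool {
        self.election_elapsed += 1;
        if self.election_elapsed < self.randomized_election_timeout {
            return false;
        }
        self.election_elapsed = 0;
        true
    }
}

/// Ticks all timers in lockstep and returns (node id, tick count) for the
/// first node whose timer fires. Assumes `timers` is not empty.
fn first_to_campaign(timers: &mut [ElectionTimer]) -> (u64, usize) {
    let mut ticks = 0;
    loop {
        ticks += 1;
        for t in timers.iter_mut() {
            if t.tick() {
                return (t.id, ticks);
            }
        }
    }
}

fn main() {
    // Fixed timeouts for reproducibility; raft-rs picks them randomly.
    let mut timers = vec![ElectionTimer::new(2, 16), ElectionTimer::new(3, 10)];
    let (id, ticks) = first_to_campaign(&mut timers);
    println!("node {} starts an election after {} ticks", id, ticks);
}
```

    With these values, node 3 reaches its timeout first, which is consistent with node 3 being the one that logs "starting a new election" above.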
    

    In short, the hup function runs the campaign function with the CAMPAIGN_ELECTION type, as shown below.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn hup(&mut self, transfer_leader: bool) {
        ...
        info!(
            self.logger,
            "starting a new election";
            "term" => self.term,
        );
    
        ...
        self.campaign(CAMPAIGN_ELECTION);
    }
    

    The campaign function transitions its own state to the Candidate state and starts voting, as shown below.

    First of all, self_id is the node's own id, as the name suggests, so self.poll(self_id, vote_msg, true) means to vote for yourself.

    If the result is VoteResult::Won, the node has already won the election with its own vote alone, so it returns immediately as the leader.

    So you can see that messages like MsgRequestVote, MsgRequestVoteResponse, etc. will not be sent back and forth in a single-node cluster.

    But of course, this scenario is not the case because it is not a single-node cluster.

    // tikv/raft-rs/blob/master/src/raft.rs
    pub fn campaign(&mut self, campaign_type: &'static [u8]) {
        let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
            ...
        } else {
            self.become_candidate();
            (MessageType::MsgRequestVote, self.term)
        };
        let self_id = self.id;
        if VoteResult::Won == self.poll(self_id, vote_msg, true) {
            // We won the election after voting for ourselves (which must mean that
            // this is a single-node cluster).
            return;
        }
        ...
    }
    

    Before we dive into the latter part of campaign, let's take a look at how poll works.

    poll calls record_vote and tally_votes and then acts on the result: if the node wins the vote, it transitions to the leader and broadcasts (bcast_append) that it is the new leader of the cluster.

    If it loses the vote, it transitions to a follower node, and if the result is Pending, it returns without doing anything.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn poll(&mut self, from: u64, t: MessageType, vote: bool) -> VoteResult {
        self.prs.record_vote(from, vote);
        let (gr, rj, res) = self.prs.tally_votes();
        if from != self.id {
            info!(
                self.logger,
                "received votes response";
                "vote" => vote,
                "from" => from,
                "rejections" => rj,
                "approvals" => gr,
                "type" => ?t,
                "term" => self.term,
            );
        }
    
        match res {
            VoteResult::Won => {
                if self.state == StateRole::PreCandidate {
                    self.campaign(CAMPAIGN_ELECTION);
                } else {
                    self.become_leader();
                    self.bcast_append();
                }
            }
            VoteResult::Lost => {
                let term = self.term;
                self.become_follower(term, INVALID_ID);
            }
            VoteResult::Pending => (),
        }
        res
    }
    

    The role of record_vote is quite simple. It records, in the votes hashmap of the ProgressTracker, whether the node with the given id granted or rejected the vote. Because it uses or_insert, only the first response from each node is kept.

    // tikv/raft-rs/blob/master/src/tracker.rs
    pub fn record_vote(&mut self, id: u64, vote: bool) {
        self.votes.entry(id).or_insert(vote);
    }
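
    The "first response wins" behaviour of or_insert can be checked in isolation. The Votes struct below is a hypothetical stand-in for the votes field of the tracker; only the body of record_vote is taken from the excerpt above.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the `votes` map kept by the tracker.
struct Votes {
    votes: HashMap<u64, bool>,
}

impl Votes {
    fn new() -> Self {
        Self { votes: HashMap::new() }
    }

    /// Same body as `record_vote` in the excerpt above.
    fn record_vote(&mut self, id: u64, vote: bool) {
        self.votes.entry(id).or_insert(vote);
    }
}

fn main() {
    let mut v = Votes::new();
    v.record_vote(3, true); // the candidate votes for itself
    v.record_vote(2, true); // node 2 grants its vote
    v.record_vote(2, false); // a later duplicate does not overwrite the first
    println!("{:?}", v.votes.get(&2));
}
```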
    

    Let's look at tally_votes. It iterates over the votes hashmap, skips ids that are not voters, counts the voters that granted the vote and the voters that rejected it, and returns both counts together with the overall result as a tuple.

    💡 The word "tally" refers to the act of counting or aggregating points, so "tally_votes" is a function that counts and aggregates votes.

    // tikv/raft-rs/blob/master/src/tracker.rs
    pub fn tally_votes(&self) -> (usize, usize, VoteResult) {
        let (mut granted, mut rejected) = (0, 0);
        for (id, vote) in &self.votes {
            if !self.conf.voters.contains(*id) {
                continue;
            }
            if *vote {
                granted += 1;
            } else {
                rejected += 1;
            }
        }
        let result = self.vote_result(&self.votes);
        (granted, rejected, result)
    }
    

    Let's take a look at how we determine the outcome of a vote.

    For a joint quorum, we need to get the consensus of both quorums (Incoming quorum, Outgoing quorum) to win the vote.

    So we need to look at the three vote_result functions below.

    In tracker.rs, we pass a callback function, check, as an argument. Given a node id, it looks the id up in the votes hashmap and returns whether that node granted the vote, or None if the node has not responded yet.

    In joint.rs, we return VoteResult::Won only if both configurations win, and VoteResult::Lost if either side loses the vote. Otherwise, we return VoteResult::Pending.

    The actual counting of votes is done in vote_result in majority.rs.

    It counts the voters that granted the vote (yes) and the voters that have not responded yet (missing). It returns VoteResult::Won if yes reaches the quorum, VoteResult::Pending if the quorum has not been reached yet but could still be reached once the missing voters respond (yes + missing >= q), and VoteResult::Lost otherwise.

    // tracker.rs
    pub fn vote_result(&self, votes: &HashMap<u64, bool>) -> VoteResult {
        self.conf.voters.vote_result(|id| votes.get(&id).cloned())
    }
    
    // joint.rs
    pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
        let i = self.incoming.vote_result(&check);
        let o = self.outgoing.vote_result(check);
        match (i, o) {
            // It won if won in both.
            (VoteResult::Won, VoteResult::Won) => VoteResult::Won,
            // It lost if lost in either.
            (VoteResult::Lost, _) | (_, VoteResult::Lost) => VoteResult::Lost,
            // It remains pending if pending in both or just won in one side.
            _ => VoteResult::Pending,
        }
    }
    
    // majority.rs
    pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
        ...
    
        let (mut yes, mut missing) = (0, 0);
        for v in &self.voters {
            match check(*v) {
                Some(true) => yes += 1,
                None => missing += 1,
                _ => (),
            }
        }
        let q = crate::majority(self.voters.len());
        if yes >= q {
            VoteResult::Won
        } else if yes + missing >= q {
            VoteResult::Pending
        } else {
            VoteResult::Lost
        }
    }
    
    // util.rs
    pub fn majority(total: usize) -> usize {
        (total / 2) + 1
    }
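
    To make the three outcomes concrete, here is a stand-alone sketch of the same counting logic applied to this scenario's 3-node cluster. The free functions and the slice-based voter sets are simplifications chosen for illustration; the part elided with "..." in majority.rs is not reproduced, so the sketch only uses non-empty voter sets.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum VoteResult {
    Pending,
    Lost,
    Won,
}

fn majority(total: usize) -> usize {
    (total / 2) + 1
}

/// Majority check over a single voter set, following the excerpt above.
fn vote_result(voters: &[u64], votes: &HashMap<u64, bool>) -> VoteResult {
    let (mut yes, mut missing) = (0, 0);
    for v in voters {
        match votes.get(v) {
            Some(true) => yes += 1,
            None => missing += 1,
            Some(false) => (),
        }
    }
    let q = majority(voters.len());
    if yes >= q {
        VoteResult::Won
    } else if yes + missing >= q {
        VoteResult::Pending
    } else {
        VoteResult::Lost
    }
}

/// Joint check: both configurations must be won; losing either one loses.
fn joint_vote_result(incoming: &[u64], outgoing: &[u64], votes: &HashMap<u64, bool>) -> VoteResult {
    match (vote_result(incoming, votes), vote_result(outgoing, votes)) {
        (VoteResult::Won, VoteResult::Won) => VoteResult::Won,
        (VoteResult::Lost, _) | (_, VoteResult::Lost) => VoteResult::Lost,
        _ => VoteResult::Pending,
    }
}

fn main() {
    let voters: [u64; 3] = [1, 2, 3];
    let mut votes: HashMap<u64, bool> = HashMap::new();

    votes.insert(3, true); // node 3 votes for itself: 1 yes, 2 missing
    println!("{:?}", vote_result(&voters, &votes));

    votes.insert(2, true); // node 2 grants its vote: 2 yes reaches the quorum of 2
    println!("{:?}", vote_result(&voters, &votes));

    // Hypothetical joint configuration: won on one side, still pending on the other.
    println!("{:?}", joint_vote_result(&[1, 2, 3], &[3, 4, 5], &votes));
}
```

    The second call corresponds to the "approvals: 2, rejections: 0" line in the log: node 1 is down and never answers, but two grants out of three voters are enough.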
    

    We've seen how the voting process is based on the votes hashmap, but before this can happen, this hashmap needs to be updated appropriately via the MsgRequestVote, MsgRequestVoteResponse messages.

    So, let's continue following the campaign function.

    We can see that the campaign function is creating messages of type MsgRequestVote and sending them to voters.

    So next, let's follow the handler for the MsgRequestVote message.

    // tikv/raft-rs/blob/master/src/raft.rs
    pub fn campaign(&mut self, campaign_type: &'static [u8]) {
        let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
            ...
        } else {
            self.become_candidate();
            (MessageType::MsgRequestVote, self.term)
        };
        let self_id = self.id;
        if VoteResult::Won == self.poll(self_id, vote_msg, true) {
            // We won the election after voting for ourselves (which must mean that
            // this is a single-node cluster).
            return;
        }
        // Only send vote request to voters.
        for id in self.prs.conf().voters().ids().iter() {
            if id == self_id {
                continue;
            }
            ...
            let mut m = new_message(id, vote_msg, None);
            m.term = term;
            m.index = self.raft_log.last_index();
            m.log_term = self.raft_log.last_term();
            m.commit = commit;
            m.commit_term = commit_term;
            ...
            self.r.send(m, &mut self.msgs);
        }
        ...
    }
    
    

    At first glance, it seems complicated, but at the end of the day, what the handler of MsgRequestVote does is create and send a message that grants or rejects the vote.

    Since the message we sent is of type MsgRequestVote, vote_resp_msg_type maps the response to the MsgRequestVoteResponse type. (We'll skip describing the prevote algorithm in this article.)

    So let's see when a node agrees to vote and when it disagrees. If you peruse the code along with the comments, you'll notice that three conditions must be met for a node to agree to a vote.

    1. can_vote is true (either you already voted for the node, or you don't know the leader_id for this term and haven't voted yet)

    2. self.raft_log.is_up_to_date is true (the message's log_term is greater than RaftLog.last_term or, if they are equal, the message's index is greater than or equal to RaftLog.last_index)

    3. the index of the message is greater than RaftLog.last_index, or the candidate's priority is at least as high as the node's own priority.

    If all three conditions are met, we send a message that grants the vote; if any of them is not met, we reject the vote.

    Now let's move on to the receiver of the MsgRequestVoteResponse.

    // raft.rs
    pub fn step(&mut self, m: Message) -> Result<()> {
        ...
        match m.get_msg_type() {
            MessageType::MsgRequestVote => {
                // We can vote if this is a repeat of a vote we've already cast...
                let can_vote = (self.vote == m.from) ||
                    // ...we haven't voted and we don't think there's a leader yet in this term...
                    (self.vote == INVALID_ID && self.leader_id == INVALID_ID);
                    
                // ...and we believe the candidate is up to date.
                if can_vote
                    && self.raft_log.is_up_to_date(m.index, m.log_term)
                    && (m.index > self.raft_log.last_index() || self.priority <= get_priority(&m))
                {
                    self.log_vote_approve(&m);
                    let mut to_send =
                        new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
                    to_send.reject = false;
                    to_send.term = m.term;
                    self.r.send(to_send, &mut self.msgs);
                    if m.get_msg_type() == MessageType::MsgRequestVote {
                        // Only record real votes.
                        self.election_elapsed = 0;
                        self.vote = m.from;
                    }
                } else {
                    self.log_vote_reject(&m);
                    let mut to_send =
                        new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
                    to_send.reject = true;
                    to_send.term = self.term;
                    let (commit, commit_term) = self.raft_log.commit_info();
                    to_send.commit = commit;
                    to_send.commit_term = commit_term;
                    self.r.send(to_send, &mut self.msgs);
                    self.maybe_commit_by_vote(&m);
                }
            }
        }
    }
    
    // raft.rs
    pub fn vote_resp_msg_type(t: MessageType) -> MessageType {
        match t {
            MessageType::MsgRequestVote => MessageType::MsgRequestVoteResponse,
            MessageType::MsgRequestPreVote => MessageType::MsgRequestPreVoteResponse,
            _ => panic!("Not a vote message: {:?}", t),
        }
    }
    
    // raft_log.rs
    pub fn is_up_to_date(&self, last_index: u64, term: u64) -> bool {
        term > self.last_term() || (term == self.last_term() && last_index >= self.last_index())
    }
    

    The MsgRequestVoteResponse message handler is very simple!

    It calls the poll function we saw earlier to update the votes hashmap and update the StateRole if the vote has been decided.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn step_candidate(&mut self, m: Message) -> Result<()> {
        match m.get_msg_type() {
            ...
            MessageType::MsgRequestVoteResponse => {
                ...
                self.poll(m.from, m.get_msg_type(), !m.reject);
                self.maybe_commit_by_vote(&m);
            }
        }
    }
    

    Summary

    In this article, we looked at the code architecture based on the types used in raft-rs, and then followed and analyzed the code of a Raft implementation in three basic scenarios. We hope that this article has helped you expand your understanding of the Raft module. In the next installment, we'll take a deeper look at how the Raft implementation works with more scenarios.

    Thanks 😊

    This post is automatically translated from Korean

    29 March 2024

  • Introducing Raftify: High-level Raft framework created with a focus on scalability

    By Gyubong Lee

    Hello, I've been working on introducing Raft to the Backend.AI manager processes at Lablup since last year.

    Here's a rough breakdown of the related tasks I'm working on.

    1. Introducing Raft to the Backend.AI manager process and making it a leader-follower structure.
    2. Replacing the existing distributed lock-based GlobalTimer with a Raft-based global timer, and ensuring that a specific task is performed exactly once in the cluster.
    3. Embedding a global, shareable state store in the manager process and synchronizing it appropriately.

    In this post, I'll introduce the Raft framework I've been grinding away at over the past year to accomplish these tasks, describe some of the issues I've encountered while developing it, and walk through raftify example code that implements a distributed key-value store in less than 300 lines of code in total.

    Introducing raftify

    raftify is a Raft framework developed with a focus on extensibility so that it can be easily integrated with any server application.

    raftify is built on top of tikv's raft-rs, one of the Raft implementations used in production, and uses LMDB as stable storage and gRPC as the network layer.

    Writing binding of the Raft module

    I decided that building and maintaining a reliable Raft implementation from the ground up would be a significant burden, so I decided to write a Python binding for the Raft module first.

    So I initially thought I'd try writing a Python binding for hashicorp/raft, the most starred Raft implementation on GitHub, using gopy.

    However, gopy didn't support binding goroutines, and it didn't support the latest Python version.

    Then, on the advice of a senior developer in the company, I learned about a Rust implementation called tikv/raft-rs and PyO3, which inspired me to try writing a Python binding for tikv/raft-rs using PyO3.

    rraft-py

    Thus, I decided to develop a Python binding for the Raft module named rraft-py, a name that combines Rust, Raft, and Py.

    My first concern in developing rraft-py was to make the semantics of the rust code and the python code as close to a 1:1 match as possible.

    To achieve a 1:1 match, I needed to bypass some of the details of Rust's syntax.

    My main concern at the time was how to expose Rust references to the Python side, which you can see in my PyCon KR presentation if you're interested.

    The result is rraft-py, which, backed by a port of over 10,000 lines of integration test code from raft-rs, has become a fairly reliable implementation of Raft bindings that can be used directly in Python.

    Currently, raftify is in the process of being completely rewritten in Rust, and rraft-py is no longer used, but it was a great experience to write my first PyO3 bindings and try out the APIs of a Raft implementation.

    riteraft-py

    After developing rraft-py and porting over 10,000 lines of integration tests from raft-rs and even the multiple-mem-node example to Python code to get it working, my only thought was that I still didn't know where to start.

    raft-rs really only provided the Raft implementation itself, and I had no idea how to integrate it into my application.

    While browsing GitHub, I came across a high-level Rust implementation based on tikv/raft-rs called riteraft in an issue called How to use this lib?, and it was much more intuitive to figure out how to use. So I decided to develop riteraft-py with the goal of mimicking its behavior in Python and integrating it at the application level.

    The job of riteraft is to integrate the Raft module directly with the log storage, the state machine, and the network layer, but the problem was that, apart from being intuitive to use, it didn't work very well.

    Leader election not happening when the leader is dead, data replication not happening in certain scenarios, panic when the commit count goes over 255, etc... All sorts of miscellaneous issues had to be resolved.

    Even after resolving all of these issues and getting the cluster to look like it was working, the issues kept coming: it would seem to be working fine, but then certain failures would cause catastrophic issues, such as cluster inconsistency or log synchronization locking up.

    Each time we encountered an issue, we needed to be able to dig into the technical details of raft-rs and understand them, which ended up being a process of ripping apart raft-rs's code and understanding it piece by piece.

    raftify

    While troubleshooting the issue, I decided to use a different abstraction than riteraft and implemented many changes, including a CLI module for debugging node and cluster state, which led me to rename the library to raftify.

    When I first started developing the library, the goal was to make it compatible with any Python application, hence the name raftify, as in "to Raft-ify" an application.

    I am no longer developing the Python implementation, but you can find it on its branch.

    raftify written in Rust

    Developed in Python on top of rraft-py, raftify ended up working well, but the crude test harness written in a multi-process structure was hard to test in CI, easily broke cluster consistency, and got out of control at the slightest hint of code complexity.

    As a result, we decided to completely rewrite raftify's internal logic in Rust and expose only the high-level interface of the Raft package in Python.

    Once completely rewritten in Rust, raftify was single-threaded, integration testable, and could be tested in CI, which helped eliminate the fear of making code changes.

    raftify example code

    In this section, we'll create a simple distributed key-value store using raftify.

    For the full source code, see this link.

    Define the state machine

    The first thing we need to do is define the log entries and state machine that we will use in our key-value store.

    For the sake of this article, we'll just define a simple Insert command, which inserts a key-value pair, as the only log entry type.

    💡 Disclaimer: This article does not explain the Rust language syntax and the theoretical background of Raft.

    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub enum LogEntry {
        Insert { key: u64, value: String },
    }
    

    Let's define a state machine of type HashMap as shown below.

    #[derive(Clone, Debug)]
    pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);
    

    Then we need to define encode and decode methods to indicate how we want to serialize and deserialize these data structures. You can use the bincode crate to define these as simply as below.

    impl AbstractLogEntry for LogEntry {
        fn encode(&self) -> Result<Vec<u8>> {
            serialize(self).map_err(|e| e.into())
        }
    
        fn decode(bytes: &[u8]) -> Result<LogEntry> {
            let log_entry: LogEntry = deserialize(bytes)?;
            Ok(log_entry)
        }
    }
    
    impl AbstractStateMachine for HashStore {
        fn encode(&self) -> Result<Vec<u8>> {
            serialize(&self.0.read().unwrap().clone()).map_err(|e| e.into())
        }
    
        fn decode(bytes: &[u8]) -> Result<Self> {
            let db: HashMap<u64, String> = deserialize(bytes)?;
            Ok(Self(Arc::new(RwLock::new(db))))
        }
    }
    

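    Finally, we need to define three methods on HashStore that will be used by raftify's internal code. (Rust allows only one implementation of a trait per type, so in compiled code these methods live in the same AbstractStateMachine impl as encode and decode; the block is split here only for readability.)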

    Define apply, a method that will be called when a new log entry is applied to the HashStore, snapshot, which will be called when saving the current state of the HashStore as a snapshot, and restore, which will be called when restoring the state of the HashStore via a snapshot byte slice, as shown below.

    #[async_trait]
    impl AbstractStateMachine for HashStore {
        async fn apply(&mut self, data: Vec<u8>) -> Result<Vec<u8>> {
            let log_entry: LogEntry = LogEntry::decode(&data)?;
            match log_entry {
                LogEntry::Insert { ref key, ref value } => {
                    let mut db = self.0.write().unwrap();
                    log::info!("Inserted: ({}, {})", key, value);
                    db.insert(*key, value.clone());
                }
            };
            Ok(data)
        }
    
        async fn snapshot(&self) -> Result<Vec<u8>> {
            Ok(serialize(&self.0.read().unwrap().clone())?)
        }
    
        async fn restore(&mut self, snapshot: Vec<u8>) -> Result<()> {
            let new: HashMap<u64, String> = deserialize(&snapshot[..]).unwrap();
            let mut db = self.0.write().unwrap();
            let _ = std::mem::replace(&mut *db, new);
            Ok(())
        }
    }
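
    The snapshot and restore methods above form a round trip: whatever bytes snapshot produces, restore must be able to turn back into the same map. The sketch below shows that property with the standard library only. The free functions and the hand-rolled byte layout are stand-ins chosen for illustration in place of bincode; they are not raftify's actual snapshot format.

```rust
use std::collections::HashMap;

/// Reads a little-endian u64 starting at `pos`, or None if out of bounds.
fn read_u64(bytes: &[u8], pos: usize) -> Option<u64> {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(bytes.get(pos..pos.checked_add(8)?)?);
    Some(u64::from_le_bytes(buf))
}

/// Encodes the map as repeated records: key (u64 LE), length (u64 LE), UTF-8 bytes.
fn snapshot(db: &HashMap<u64, String>) -> Vec<u8> {
    let mut out = Vec::new();
    for (key, value) in db {
        out.extend_from_slice(&key.to_le_bytes());
        out.extend_from_slice(&(value.len() as u64).to_le_bytes());
        out.extend_from_slice(value.as_bytes());
    }
    out
}

/// Decodes bytes produced by `snapshot`; returns None on truncated or invalid input.
fn restore(bytes: &[u8]) -> Option<HashMap<u64, String>> {
    let mut db = HashMap::new();
    let mut pos = 0;
    while pos < bytes.len() {
        let key = read_u64(bytes, pos)?;
        let len = read_u64(bytes, pos + 8)? as usize;
        let start = pos + 16;
        let end = start.checked_add(len)?;
        let value = std::str::from_utf8(bytes.get(start..end)?).ok()?;
        db.insert(key, value.to_string());
        pos = end;
    }
    Some(db)
}

fn main() {
    let mut db: HashMap<u64, String> = HashMap::new();
    db.insert(1, "test".to_string());
    db.insert(2, "raft".to_string());

    let bytes = snapshot(&db);
    let restored = restore(&bytes).expect("snapshot should decode");
    println!("round trip ok: {}", restored == db);
}
```

    Returning None on malformed input, rather than unwrapping as the excerpt does, is a design choice of this sketch: a corrupted snapshot then surfaces as an error instead of a panic.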
    

    Define the web server API

    Let's define the web server API that will be used in our example. We will use this API to access the Raft object on the node and manipulate the HashStore.

    For our example, we'll use the actix-web crate and define it as shown below.

    The put command can be implemented by calling the propose method on the RaftNode of the Raft object. We can do this by encoding the Insert type LogEntry we defined earlier and passing it as an argument to the RaftNode::propose method.

    The get command can be implemented by returning the value corresponding to the id from the HashMap stored in memory.

    #[get("/put/{id}/{value}")]
    async fn put(data: web::Data<(HashStore, Raft)>, path: web::Path<(u64, String)>) -> impl Responder {
        let log_entry = LogEntry::Insert {
            key: path.0,
            value: path.1.clone(),
        };
        data.1.raft_node.propose(log_entry.encode().unwrap()).await;
    
        "OK".to_string()
    }
    
    #[get("/get/{id}")]
    async fn get(data: web::Data<(HashStore, Raft)>, path: web::Path<u64>) -> impl Responder {
        let id = path.into_inner();
    
        let response = data.0.get(id);
        format!("{:?}", response)
    }
    
    let web_server = tokio::spawn(
        HttpServer::new(move || {
            App::new()
                .app_data(web::Data::new((store.clone(), raft.clone())))
                .service(put)
                .service(get)
        })
        .bind(addr)
        .unwrap()
        .run(),
    );
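
    The get handler calls data.0.get(id), but the article does not show that method on HashStore. A minimal sketch of what such a helper could look like follows; the get and new methods here are assumptions for illustration, and the actual raftify example may define them differently.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Clone, Debug)]
pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);

impl HashStore {
    /// Hypothetical constructor creating an empty store.
    pub fn new() -> Self {
        Self(Arc::new(RwLock::new(HashMap::new())))
    }

    /// Hypothetical read helper: takes the read lock and clones the value out,
    /// so the lock is released before the caller uses the result.
    pub fn get(&self, id: u64) -> Option<String> {
        self.0.read().unwrap().get(&id).cloned()
    }
}

fn main() {
    let store = HashStore::new();
    store.0.write().unwrap().insert(1, "test".to_string());

    // Cloning the store clones the Arc, so both handles see the same map.
    let handle = store.clone();
    println!("{:?}", handle.get(1));
    println!("{:?}", handle.get(2));
}
```

    Formatting the returned Option with {:?} is what yields output of the form Some("test") from the /get endpoint.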
    

    Bootstrap a Raft cluster

    Next, let's bootstrap a cluster of RaftNodes.

    If the --peer-addr argument is given, the node sends a join request to the cluster via peer-addr to obtain a new node_id; if the argument is not given, the node bootstraps a new cluster.

    Leader

    In this example, we'll fix the node_id of the leader node to 1 to keep things easy to follow. This means that when we call Raft::bootstrap_cluster, we create the Raft object of the leader node by passing 1 as the node_id. After that, we can call the Raft::run method and the RaftNode will run.

    Followers

    As shown below, after calling Raft::request_id to receive a ClusterJoinTicket that can be used to join the cluster, you will call Raft::new_follower with the specified node_id to create a follower Raft node object, call the Raft::run method to have the RaftNode run, and join the cluster via the Raft::join method.

    let (raft, raft_handle) = match peer_addr {
        Some(peer_addr) => {
            log::info!("Running in Follower mode");
    
            let ticket = Raft::request_id(raft_addr, peer_addr.clone(), logger.clone()).await.unwrap();
            let node_id = ticket.reserved_id;
    
            let raft = Raft::new_follower(
                node_id,
                raft_addr,
                store.clone(),
                cfg,
                None,
                logger.clone(),
            )?;
    
            let handle = tokio::spawn(raft.clone().run());
            raft.join(ticket).await;
            (raft, handle)
        }
        None => {
            log::info!("Bootstrap a Raft Cluster");
            let node_id = 1;
            let raft = Raft::bootstrap_cluster(
                node_id,
                raft_addr,
                store.clone(),
                cfg,
                None,
                logger.clone(),
            )?;
            let handle = tokio::spawn(raft.clone().run());
            (raft, handle)
        }
    };
    
    let _ = tokio::try_join!(raft_handle)?;
    

    You can now bootstrap a Raft cluster of three nodes in the terminal as shown below.

    $ ./target/debug/memstore --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001
    $ ./target/debug/memstore --raft-addr=127.0.0.1:60062 --peer-addr=127.0.0.1:60061 --web-server=127.0.0.1:8002
    $ ./target/debug/memstore --raft-addr=127.0.0.1:60063 --peer-addr=127.0.0.1:60061 --web-server=127.0.0.1:8003
    

    Test

    We can now try out the key-value store we defined through the actix-web server API via the curl command.

    ❯ curl http://localhost:8001/put/1/test
    OK
    
    ❯ curl http://localhost:8001/get/1
    Some("test")
    

    If you're interested in learning more, you can check out the raftify repository for instructions on how to use the CLI module to help with debugging, example code for RaftServiceClient, and more.

    Summary

    raftify is an experimental framework that aims to make it easier for anyone to integrate a Raft module, something that is otherwise hard for ordinary developers to approach.

    It was developed to introduce a leader-follower structure to Backend.AI manager processes, but as I've shown in this post, it could be used in a variety of places where you need an HA structure, such as creating your own simple distributed key-value store with short source code.

    If you're intrigued by the inner workings of the tikv/raft-rs implementation, stay tuned for my next post where I'll be analyzing what happens inside the source code line by line in a few scenarios.

    This post is automatically translated from Korean

    26 January 2024
