태그 : 분산 시스템

  • 실제로 동작하는 Raft 구현체 뜯어 보기 - 2

    By 이규봉

    지난 포스팅 에서는 raft-rs 타입들을 중심으로한 전체적인 개요와 시스템에 네트워크 장애가 발생했을 때 리더 선출이 어떤 식으로 이뤄지는지, 어떻게 로그 비일관성을 해소하고 장애를 극복한 후 일관적인 상태를 유지하게 되는지 세 가지 시나리오를 기반으로 알아보았습니다.

    이 글에선 지난 글에 이어 Raft 구현체의 동작 방식을 몇몇 시나리오에 걸쳐 살펴보겠습니다.

    이번에 살펴볼 시나리오는 Raft 클러스터의 상태를 어떤 과정을 거쳐 Stable storage에 저장하고, 클러스터를 다시 부트스트랩 했을 때 어떻게 이전 상태를 로그와 스냅샷으로부터 복구하게 되는지 알아보겠습니다.

    💡 Raftify는 Lablup에서 개발한 하이레벨의 Raft 구현체입니다. Raftify에 대해 궁금하시다면 해당 포스팅을 참고해보세요.

    타입을 중심으로 살펴보는 raft-rs 아키텍쳐

    이번 글에서도 마찬가지로 시나리오 분석에 앞서 raft-rs의 타입들 중 이번 글에 등장할 몇몇 타입들을 알아보도록 하겠습니다.

    ConfState

    클러스터는 여러 노드들로 구성되어 있으며 각 노드들은 장애 발생으로 인한 투표 상황에서 투표에 참여할 지의 여부에 따라 voterlearner로 나뉩니다. voterlearner 모두 클러스터 구성원으로서 클러스터로부터 합의를 공유하지만 learner의 경우 투표에 참여하지 않습니다.

    이러한 클러스터 구성원들에 대한 정보 역시 클러스터 구성원 간의 합의에 포함되며, 그렇기 때문에 로그 엔트리를 적용함으로써 구성되거나 변경될 수 있습니다.

    💡 raft-rs의 EntryType은 이런 ConfState 구성 변경을 위한 EntryConfChange 타입과 일반적인 상태 변경을 위한 EntryNormal 타입으로 나뉩니다.

    raft-rs에서 사용되는 타입들 중 네트워크 계층에 사용되는 타입들은 eraftpb.proto 파일에 정의되어 있으며 tonic에 의해 러스트 코드로 컴파일 됩니다.

    message ConfState {
        repeated uint64 voters = 1;
        repeated uint64 learners = 2;
    
        // The voters in the outgoing config. If not empty the node is in joint consensus.
        repeated uint64 voters_outgoing = 3;
        // The nodes that will become learners when the outgoing config is removed.
        // These nodes are necessarily currently in nodes_joint (or they would have
        // been added to the incoming config right away).
        repeated uint64 learners_next = 4;
        // If set, the config is joint and Raft will automatically transition into
        // the final config (i.e. remove the outgoing config) when this is safe.
        bool auto_leave = 5;
    }
    

    voters_outgoing, learners_next, auto_leave는 Joint consensus 지원을 위한 필드로 이 글에선 Joint consensus에 대한 설명은 생략하도록 하겠습니다.

    Snapshot과 SnapshotMetadata

    시스템의 가용성을 위해 로그를 무한정 쌓아둘 수 없기 때문에 오래된 로그들은 삭제되어야 하며 제거되기 전 반드시 상태 머신에 반영되어야 합니다.

    로그 시퀸스에서 특정 인덱스까지의 로그를 지우는 것을 로그 컴팩션이라고 부르며 해당 인덱스까지 로그 엔트리가 적용된 상태를 기록한 것을 스냅샷이라고 부릅니다.

    스냅샷은 이번 포스팅의 핵심 주제로 아래 시나리오 분석에서 자세히 살펴보겠지만 새로 가입한 노드로 클러스터의 상태를 전송하거나, 장애로부터 복구하기 위한 용도로 활용됩니다.

    message Snapshot {
        bytes data = 1;
        SnapshotMetadata metadata = 2;
    }
    
    message SnapshotMetadata {
        // The current `ConfState`.
        ConfState conf_state = 1;
        // The applied index.
        uint64 index = 2;
        // The term of the applied index.
        uint64 term = 3;
    }
    

    SnapshotMetadata은 스냅샷이 생성될 당시의 메타 데이터입니다.

    구체적으로 각 필드들은 아래와 같은 의미를 갖습니다.

    • conf_state: 스냅샷이 생성될 당시의 클러스터 구성원 정보를 나타냅니다.
    • index: 스냅샷이 생성된 당시 컴팩션이 이뤄진 마지막 로그 엔트리의 인덱스를 나타냅니다.
    • term: 스냅샷 생성된 당시 마지막 로그 엔트리가 갖는 term 값을 나타냅니다.

    위와 같은 메타 데이터들은 스냅샷을 활용할 때 로그 일관성을 깨지 않기 위해 필수적인 요소입니다.

    예를 들어 스냅샷으로 상태 정보를 복원할 때 스냅샷의 인덱스에 해당하는 로그 엔트리의 term과 스냅샷 메타 데이터의 term이 일치하지 않는 경우 일관성 유지를 위해 스냅샷 적용 요청을 무시해야 합니다.

    시나리오 분석

    1 - 스냅샷 기록

    Raftify에서 스냅샷 생성은 아래와 같은 RaftNode의 make_snapshot()라는 메서드 호출로 이뤄집니다.

    특정 인덱스 및 해당 인덱스에서의 로그 엔트리의 term 값을 인자로 넘겨줍니다.

    스냅샷에 저장할 데이터는 self.fsm.snapshot() 메서드가 리턴한 데이터로, 현재 상태 머신의 상태에 해당합니다.

    💡 self.fsm.snapshot() 메서드는 FSM(Finite State Machine)을 어떻게 저장할 것인지 여부에 따라 다르게 구현될 수 있으므로 Raftify 유저가 구현해 넘겨주어야 하는 구현 중 하나입니다. 예를 들어 인메모리에 FSM을 저장하는 HashStore 예제의 경우 snapshot()은 단순히 HashMap을 직렬화해 리턴합니다.

    상태 머신에 적용된 마지막 로그 엔트리의 인덱스 last_appliedcompact()에 넘겨주면 로그 엔트리에서 주어진 인덱스 이전까지의 로그를 삭제합니다.

    // lablup/raftify/blob/main/src/raft_node/mod.rs
    pub async fn make_snapshot(&mut self, index: u64, term: u64) -> Result<()> {
        ...
        let snapshot_data = self.fsm.snapshot().await?;
    
        let last_applied = self.raw_node.raft.raft_log.applied;
        let store = self.raw_node.mut_store();
        store.compact(last_applied)?;
        store.create_snapshot(snapshot_data, index, term)?;
        Ok(())
    }
    

    create_snapshot()는 넘겨 받은 스냅샷 데이터 data와 함께 스냅샷 메타 데이터들을 기록합니다.

    // lablup/raftify/blob/main/src/heed_storage/mod.rs
    fn create_snapshot(&mut self, data: Vec<u8>, index: u64, term: u64) -> Result<()> {
        let store = self.wl();
        let mut writer = store.env.write_txn()?;
        let conf_state = store.conf_state(&writer)?;
    
        let mut snapshot = Snapshot::default();
        snapshot.set_data(data);
    
        let meta = snapshot.mut_metadata();
        meta.set_conf_state(conf_state);
        meta.index = index;
        meta.term = term;
    
        store.set_snapshot(&mut writer, &snapshot)?;
        writer.commit()?;
        Ok(())
    }
    

    2 - 새로 조인한 노드에 스냅샷 전송

    시나리오

    클러스터에 새로 조인한 노드는 일관성을 유지하기 위해 기존 클러스터의 상태를 전송받아야 합니다.

    하지만 새 노드가 클러스터에 참여할 때마다 모든 로그 엔트리를 하나 하나 복제하는 것은 비효율적인 일입니다. 모든 노드는 같은 상태 머신을 가지기 때문에 모든 로그 엔트리를 전송하는 대신, 로그 엔트리들이 적용된 결과물인 스냅샷만을 전송해 문제를 해결할 수 있으며, 이 때 스냅샷 데이터를 전송하는 메시지의 타입은 MsgSnapshot입니다.

    따라서 이 섹션에서는 1번 노드가 리더 노드이고 2번 노드가 새로 조인한 노드라고 가정한 후 MsgSnapshot 메시지와 관련된 코드와 로그를 중심으로 어떤 일이 일어나고 있는지 살펴보도록 하겠습니다.

    Raftify에선 새로 조인한 팔로워가 리더 노드에게 별개의 스냅샷 요청을 전송하지 않습니다.

    구성 변경 요청(이후 ConfChange) 이 커밋되면 리더가 해당 로그 엔트리를 새로 조인한 노드에 보내려고 시도하고, 새 노드는 이 로그 엔트리를 갖고 있지 않기 때문에 이 MsgAppend 메세지는 거절됩니다.

    전편의 시나리오 2에서 네트워크 장애로 인해 MsgAppend 메시지가 거절되었을 때 생기는 노드 사이의 비일관성을 해소하는 시나리오를 다뤘었던 것을 기억하시나요?

    해당 시나리오에선 prepare_send_entries()를 통해 불일치하는 로그 엔트리들을 하나씩 동기화 했었습니다. 새로 조인한 노드와의 로그 비일관성을 해소하는 경우는, 단지 로그 엔트리를 하나씩 동기화 하는 대신 스냅샷(prepare_send_snapshot())을 통해 동기화 한다는 점이 다르다고 볼 수 있습니다.

    그럼 아래에선 코드 및 로그 분석을 통해 해당 시나리오가 어떤 과정을 통해 일어나고 있는 것인지 자세히 알아보겠습니다.

    코드 분석

    우선 해당 시나리오와 관련된 코드들 중 리더가 새로 조인한 노드에게 보낸 MsgAppend 메시지가 거절되는 부분부터 살펴보도록 하겠습니다.

    maybe_send_append() 코드를 살펴보면 아래와 같습니다. 아래 코드에서 새로 조인한 노드의 progress는 비어 있기 때문에 self.raft_log.term() 호출은 실패하게 되고, prepare_send_snapshot()가 호출되면서 maybe_send_append()false를 리턴합니다 (MsgAppend 거절)

    // tikv/raft-rs/blob/master/src/raft.rs
    fn maybe_send_append(
        &mut self,
        to: u64,
        pr: &mut Progress,
        allow_empty: bool,
        msgs: &mut Vec<Message>,
    ) -> bool {
        ...
            let term = self.raft_log.term(pr.next_idx - 1);
            match (term, ents) {
                (Ok(term), Ok(mut ents)) => {
                    if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                        return true;
                    }
                    self.prepare_send_entries(&mut m, pr, term, ents)
                }
                (_, Err(Error::Store(StorageError::LogTemporarilyUnavailable))) => {
                    // wait for storage to fetch entries asynchronously
                    return false;
                }
                _ => {
                    // 💡 이번 시나리오에선 아래 분기가 실행됩니다.
                    // send snapshot if we failed to get term or entries.
                    if !self.prepare_send_snapshot(&mut m, pr, to) {
                        return false;
                    }
                }
            }
        }
        self.send(m, msgs);
        true
    }
    

    호출된 prepare_send_snapshot()는 아래와 같은 함수로, self.raft_log.snapshot() 메서드를 호출해 스냅샷 데이터를 가져온 후 송신할 메시지에 설정합니다.

    그 후 해당 노드의 progress 객체를 snapshot 상태라고 표시한 후 리턴합니다.

    💡 여기서 노드의 상태가 snapshot 상태라는 것은 해당 노드가 스냅샷 복제 상태이기 때문에 이 노드로의 로그 복제 작업이 잠시 중단될 것임을 나타냅니다.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
        ...
        m.set_msg_type(MessageType::MsgSnapshot);
        let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot, to);
        if let Err(ref e) = snapshot_r {
            if *e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
                self.logger.debug(
                    format!(
                        "failed to send snapshot to {} because snapshot is temporarily unavailable",
                        to
                    )
                    .as_str(),
                );
                return false;
            }
            self.logger
                .fatal(format!("unexpected error: {:?}", e).as_str());
        }
        let snapshot = snapshot_r.unwrap();
        if snapshot.get_metadata().index == 0 {
            self.logger.fatal("need non-empty snapshot");
        }
        let (sindex, sterm) = (snapshot.get_metadata().index, snapshot.get_metadata().term);
        m.set_snapshot(snapshot);
        self.logger.debug(format!(
            "[firstindex: {first_index}, commit: {committed}] sent snapshot[index: {snapshot_index}, term: {snapshot_term}] to {to}; progress: {progress}",
            first_index = self.raft_log.first_index(),
            committed = self.raft_log.committed,
            snapshot_index = sindex,
            snapshot_term = sterm,
            to = to,
            progress = format!("{:?}", pr)
        ).as_str());
    
        pr.become_snapshot(sindex);
        self.logger.debug(
            format!(
                "paused sending replication messages to {}; progress: {:?}",
                to, pr
            )
            .as_str(),
        );
        true
    }
    

    따라서 Raftify는 ConfChange가 커밋될 때 1번 시나리오에서 살펴봤었던 RaftNode.make_snapshot() 호출을 통해 새 노드에 전송할 스냅샷을 미리 준비해둡니다.

    이렇게 전송된 스냅샷은 새로 조인한 노드의 Raft loop의 Snapshot 핸들링 로직에서 감지되어 복구하게 됩니다. 아래 로직의 self.fsm.restore()을 통해 전송 받은 스냅샷 데이터로 상태 머신을 복구하고, store.apply_snapshot()을 통해 Stable storage에도 적용해줍니다.

    // lablup/raftify/blob/main/raftify/src/raft_node/mod.rs
    async fn on_ready(&mut self) -> Result<()> {
        ...
        if *ready.snapshot() != Snapshot::default() {
            self.logger
                .info("Restoring state machine and snapshot metadata...");
            let snapshot = ready.snapshot();
            if !snapshot.get_data().is_empty() {
                self.fsm.restore(snapshot.get_data().to_vec()).await?;
            }
            let store = self.raw_node.mut_store();
            store.apply_snapshot(snapshot.clone())?;
        }
        ...
    }
    

    리더 노드 로그 분석

    이번엔 새로운 노드가 조인 했을 때 리더 노드에 출력되는 로그들을 하나씩 순서대로 분석해보겠습니다.

    1. 1번 노드는 2번 노드로부터 조인 요청을 받고 클러스터 구성이 변경됩니다.
    Apr 11 06:51:14.189 INFO Node 2 (127.0.0.1:60062) joined the cluster as voter.
    Apr 11 06:51:14.189 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    Apr 11 06:51:14.189 DEBG Entries [9, 10) requested.
    
    1. 리더에 새로운 로그 엔트리가 추가되었기 때문에 2번 노드에 이 로그 엔트리를 복제하기 위해 MsgAppend 메시지를 송신합니다.
    Apr 11 06:51:14.189 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 8, entries: [Entry { context: 7, data: ConfChangeV2 { transition: 0, changes: [ConfChangeSingle { change_type: AddNode, node_id: 2 }], context: [127.0.0.1:60062] }, entry_type: EntryConfChangeV2, index: 9, sync_log: false, term: 1 }], commit: 9, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    
    1. 그러나 새로 조인한 노드는 기존 클러스터의 정보를 갖고 있지 못하기 때문에 이 MsgAppend 메시지는 거절되며 1번 노드는 아래와 같이 요청이 거절되었다는 메시지를 받게 됩니다.
    Apr 11 06:51:14.298 DEBG >>> Node 1 received Raft message from the node 2, Message { msg_type: MsgAppendResponse, to: 1, from: 2, term: 1, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 11 06:51:14.298 DEBG received msgAppend rejection; reject_hint_index: 0, reject_hint_term: 0, from: 2, index: 8
    Apr 11 06:51:14.298 DEBG decreased progress of 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
    
    1. 위에서 설명한것 처럼 새로 조인한 노드의 progress는 비어 있으므로, 스냅샷을 Stable storage에 저장하고 해당 인덱스까지의 로그 엔트리들을 제거하게 됩니다. 이 경우엔 8 이전까지의 로그 엔트리들이 제거되었으며 2번 노드의 조인 요청에 해당하는 로그 엔트리의 인덱스는 9입니다. 따라서 아래와 같이 first_index8이며, commit9라는 로그와 함께 스냅샷 메세지가 전송됩니다.
    Apr 11 06:51:14.298 DEBG [firstindex: 8, commit: 9] sent snapshot[index: 9, term: 1] to 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
    
    1. 스냅샷 전송을 위해 로그 엔트리 복제를 중단합니다.
    Apr 11 06:51:14.299 DEBG paused sending replication messages to 2; progress: Progress { matched: 0, next_idx: 1, state: Snapshot, paused: false, pending_snapshot: 9, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
    
    1. 스냅샷을 전송하는 MsgSnapshot 타입의 메시지를 송신합니다. 스냅샷엔 이전에 임의로 넣어 놓은 data: {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"} 라는 데이터가 들어 있는 것을 확인할 수 있습니다
    Apr 11 06:51:14.299 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgSnapshot, to: 2, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    

    팔로워 노드 로그 분석

    새로 조인한 팔로워 노드에 출력되는 로그를 분석해보면 아래와 같습니다.

    1. term 1에서 새로운 팔로워 노드가 됩니다.
    Apr 15 06:37:27.421 INFO became follower at term 1
    
    1. 리더 노드로부터 온 MsgAppend 메세지를 거절합니다.
    Apr 15 06:37:27.421 DEBG rejected msgApp [logterm: 1, index: 8] from 1; index: 8, logterm: Ok(0)
    Apr 15 06:37:27.421 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    
    1. 해당 노드가 장애 상태로 감지되어 불필요한 투표가 일어나선 안 되기 때문에 MsgHeartbeat 메시지엔 정상 응답해야 합니다.
    Apr 15 06:37:27.423 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgHeartbeat, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 15 06:37:27.423 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeatResponse, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    
    1. MsgSnapshot 메시지를 통해 스냅샷을 전송 받습니다.
    Apr 15 06:37:27.424 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgSnapshot, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {3: "A", 5: "A", 2: "A", 4: "A", 1: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 15 06:37:27.424 INFO log [committed=0, persisted=0, applied=0, unstable.offset=1, unstable.entries.len()=0] starts to restore snapshot [index: 9, term: 1]
    Apr 15 06:37:27.424 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    
    1. 전송받은 스냅샷을 통해 상태를 복구합니다.
    Apr 15 06:37:27.424 INFO restored snapshot; commit: 9, last_index: 9, last_term: 1, snapshot_index: 9, snapshot_term: 1
    Apr 15 06:37:27.424 INFO [commit: 9, term: 1] restored snapshot [index: 9, term: 1]
    Apr 15 06:37:27.425 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 9, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 15 06:37:27.425 INFO Restoring state machine and snapshot metadata...
    Apr 15 06:37:27.425 DEBG snapshot's persisted index  9
    

    3 - 대다수(Majority) 이상의 노드에 장애가 생긴 경우 복구

    특정 노드에 장애가 발생한 경우 해당 노드는 단지 네트워크가 복구된 후 리더 노드로부터 새 로그 엔트리들을 복제 받으면 되기 때문에 문제가 되지 않습니다. 노드가 새로 조인해야 하는 경우에도 2번 시나리오에서 다뤘듯이 스냅샷을 통해 상태를 복구할 수 있으므로 문제가 되지 않습니다.

    하지만 쿼럼 이상의 노드에 장애가 발생한 경우 클러스터는 스스로 장애를 복구할 수 없습니다.

    이 경우 관리자가 수동으로 개입해 어떤 노드의 로그 시퀸스를 정상 상태로 볼 것인지 결정한 후 해당 로그 시퀸스로부터 다시 클러스터를 부트스트랩 해 주어야 합니다.

    이 때 관리자의 판단에 따라 상태 머신에 모든 로그 엔트리를 하나 하나 직접 적용해가며 복구하거나 마지막으로 생성된 스냅샷으로부터 상태를 복구해야 합니다.

    WAL 스냅샷에서의 상태 복구

    해당 섹션에선 직접 Raftify의 예제 코드를 사용합니다.

    예제를 재현하기 위해 1번 노드에 간단하게 몇 개의 키값을 넣어준 후 /snapshot API를 통해 make_snapshot() 메서드를 호출해 스냅샷을 생성해줍니다. 그리고 노드에 장애가 일어났다고 가정하고 종료해볼 것입니다.

    WAL 스냅샷으로부터 복구하기 위해선 restore_wal_snapshot_from 라는 옵션에 복구할 노드의 node_id를 넘겨주면 됩니다. 여기선 1번 노드의 스냅샷으로 복구할 것이므로 1을 넣어주면 됩니다.

    그리고 로그 엔트리의 적용 여부를 확인하기 위해 apply()가 호출될 때 마다 "Inserted: (key, value)"와 같은 로그를 남겨보도록 하겠습니다.

    💡 apply() 역시 restore()와 마찬가지로 Raftify 유저가 정의해야 하는 StateMachine의 추상 메서드들 중 하나로 로그 엔트리가 커밋되는 시점에 호출됩니다.

    스냅샷을 찍고 1번 노드를 종료한 후 Raftify가 제공하는 CLI 명령어를 사용해 스토리지를 덤프해보면 아래와 같습니다.

    아래 로그를 통해 스토리지 내에 스냅샷이 저장되어 있고 { data: {2: \"A\", 5: \"A\", 3: \"A\", 4: \"A\", 1: \"A\"}와 같은 데이터를 갖고 있다는 것을 알 수 있습니다.

    ❯ raftify-cli debug persisted-all ./logs
    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 1, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: HashStore(RwLock { data: {2: \"A\", 5: \"A\", 3: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }"
    Last index: 8
    

    그 후 ./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-snapshot-from=1 명령어를 통해 1번 노드를 다시 부트스트랩 시켜 봅시다.

    이 때 1번 노드에 출력되는 로그는 아래와 같습니다. 상태를 스냅샷으로부터 바로 복구하므로 각 로그 엔트리들에 대한 apply()는 한 번도 실행되지 않았습니다.

    Apr 15 07:54:44.703 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }, 2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: None, restore_wal_snapshot_from: Some(1), }
    Apr 15 07:54:44.705 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    Apr 15 07:54:44.705 DEBG reset election timeout 0 -> 10 at 0
    Apr 15 07:54:44.705 INFO became follower at term 3
    Apr 15 07:54:44.705 INFO newRaft; term: 3, commit: 0, applied: 0, last index: 0, last term: 0, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }
    Apr 15 07:54:44.705 INFO RawNode created with id 1.
    Apr 15 07:54:44.748 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"...
    

    그리고 다시 스토리지를 덤프 해 봅시다.

    자기 자신의 스냅샷으로부터의 복구이기 때문에 아무런 상태 변화도 일어나지 않은 것을 확인할 수 있습니다.

    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 1, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: HashStore(RwLock { data: {3: \"A\", 2: \"A\", 5: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }"
    Last index: 8
    

    WAL 로그에서의 상태 복구

    이번에는 특정 로그 시퀸스로부터 상태를 복구해봅시다.

    이번엔 아래 로그와 같이 스토리지에 스냅샷은 비어 있으며 대신 상태를 복구하기 위한 로그 엔트리들이 저장되어 있습니다.

    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }"
    Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }"
    Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }"
    Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }"
    Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }"
    Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }"
    Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }"
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 2, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }"
    Last index: 8
    

    이전 섹션에서와 마찬가지로 장애를 가정해 1번 노드를 종료하고 다시 부트스트랩 했을 때 어떤 일이 일어나는지 살펴보겠습니다.

    1번 노드를 종료한 후 ./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-from=1 명령어로 1번 노드를 다시 부트스트랩 시켜 봅시다.

    1번 노드에 아래와 같은 로그가 출력되며 이전에 입력한 로그 엔트리들이 한 번에 apply() 되며 이전의 상태를 복구하는 것을 알 수 있습니다.

    Apr 15 07:46:50.710 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }, 1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: Some(1), restore_wal_snapshot_from: None, }
    Apr 15 07:46:50.712 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    Apr 15 07:46:50.712 DEBG reset election timeout 0 -> 10 at 0
    Apr 15 07:46:50.712 INFO became follower at term 1
    Apr 15 07:46:50.712 INFO newRaft; term: 1, commit: 8, applied: 0, last index: 8, last term: 1, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }
    Apr 15 07:46:50.712 INFO RawNode created with id 1.
    Apr 15 07:46:50.753 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"...
    Apr 15 07:46:50.855 DEBG Entries [1, 9) requested.
    
    // 하나씩 로그 엔트리들을 apply하며 상태 머신 상태를 복구해나감
    Inserted: (1, A)
    Inserted: (1, A)
    Inserted: (1, A)
    Inserted: (2, A)
    Inserted: (3, A)
    Inserted: (4, A)
    Inserted: (5, A)
    

    이번에도 마찬가지로 자기 자신의 크러시 이전 상태를 복구한 것이므로 스토리지를 덤프해보면 이전과 같습니다. 다른 점은 이전엔 스냅샷을 통해 빠르게 상태를 복구한 것에 비해 모든 로그 엔트리들을 하나 하나 적용했다는 점입니다.

    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }"
    Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }"
    Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }"
    Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }"
    Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }"
    Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }"
    Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }"
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 2, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }"
    Last index: 8
    

    마무리

    이번 글에선 지난 편에 이어 스냅샷을 중심으로 새로 조인한 노드가 있을 때 로그 비일관성 해소 문제와 장애 복구 시나리오에 대해 알아보았습니다.

    Raftify는 2024 오픈소스 컨트리뷰션 아카데미에 참여형 프로젝트로 참가해 분산 시스템 구현에 관심이 있는 멘티 분들을 모집하고 있습니다! (모집 기간: ~ 06.23)

    참가자분들은 멘토들과 함께 분산 시스템의 기본 개념 학습부터 실제 구현 과정까지 경험해 볼 수 있습니다.

    많은 관심 부탁드립니다! 감사합니다 😊

    29 May 2024

  • 실제로 동작하는 Raft 구현체 뜯어 보기 - 1

    By 이규봉

    이 글에선 독자들이 Raft에 관한 이론적인 배경지식이 있다고 가정하고, tikv/raft-rs 코드를 샅샅이 훑어 보며 어떻게 실제로 분산 시스템에서의 상태 머신이 동기화되고 동작하게 되는지 몇 개의 간략한 시나리오에 걸쳐 알아보겠습니다.

    이 글은 raft-rs 코드 분석에 초점을 맞추고 있지만, raft-rs 구현체가 유연성을 위해 네트워크 및 스토리지 계층을 포함하지 않기 때문에 온전한 이해를 위해 일부 섹션에서 raftify 소스 코드를 예제로 사용합니다.

    💡 raftify는 Lablup에서 개발한 하이레벨의 Raft 구현체입니다. 이 글에선 raftify에 대해선 raft의 동작 방식을 이해하기 위한 최소한의 코드만을 설명합니다. raftify에 대해 궁금하시다면 해당 포스팅을 참고해보세요.

    타입을 중심으로 살펴보는 raft-rs 아키텍처

    시나리오를 살펴 보기에 앞서 코드 베이스에서 사용되는 대표적인 타입들을 중심으로 아키텍쳐를 대략적으로 살펴봅시다.

    Raft

    각 Raft 노드들의 Raft 객체는 메시지 큐 msgs 를 메모리에 들고 있으며 이 큐를 통해 다른 Raft 노드들과 상호작용합니다.

    raftify와 같은 하이레벨의 구현체에서 네트워크 계층은 후에 설명할 추상화 계층을 통해 이 큐에 메시지를 넣는 역할을 하게 됩니다.

    따라서 이 메시지 큐는 통신의 엔드 포인트로 볼 수 있으며, Raft 구현체는 현재 상태에 따라 이 메시지들을 처리해 나가며 노드 간의 일관된 상태를 유지합니다.

    이 Raft 노드의 상태에 해당하는 데이터들을 들고 있는 것이 RaftCore 타입입니다.

    또한 다른 Raft 노드들과의 로그 엔트리들을 동기화하기 위한 메타 데이터들을 담는 Progress란 타입이 있으며, 이것들은 상황에 따라 ProgressTracker에서 적절하게 업데이트 됩니다.

    결과적으로 Raft는 아래와 같은 타입이 됩니다.

    pub struct Raft<T: Storage> {
        pub msgs: Vec<Message>,
        pub r: RaftCore<T>,
        prs: ProgressTracker,
    }
    

    RaftLog

    RaftCore가 갖는 대표적인 데이터로 로그 엔트리 시퀸스에 대한 접근을 추상화하는 RaftLog가 있습니다.

    RaftLog<T: Storage>UnstableT 타입을 함께 다룰 수 있도록 추상화합니다. 여기서 T는 raftify와 같은 보다 높은 레벨에서 구현해 넣어 주어야 하는 영속적인 스토리지에 해당하며, Unstable은 이 스토리지에 기록되기 전 거치는 버퍼입니다.

    pub struct RaftLog<T: Storage> {
        pub store: T,
        pub unstable: Unstable,
    
        ...
    }
    

    💡 RaftCore 타입에 대해 더 궁금하다면 이 링크를 참고하세요.

    Raft Loop

    Raft 구현체들은 다른 Raft 노드들과 통신하며 일관된 상태를 유지하기 위해 무한 루프를 돌며 자신의 상태 머신을 업데이트 하는 반복적인 프로세스를 수행합니다. 이 글에선 이러한 루프를 Raft loop라고 부르겠습니다.

    raftify에서 Raft loop를 구현하는 소스 코드는 아래와 같습니다.

    (가장 minimal한 구현을 보고 싶다면 tikv/raft-rs의 예제 코드를 참고하실 수도 있습니다.)

    async fn on_ready(&mut self) -> Result<()> {
        if !self.raw_node.has_ready() {
            return Ok(());
        }
        let mut ready = self.raw_node.ready();
    
        if !ready.messages().is_empty() {
            self.send_messages(ready.take_messages()).await;
        }
    
        if *ready.snapshot() != Snapshot::default() {
            slog::info!(
                self.logger,
                "Restoring state machine and snapshot metadata..."
            );
            let snapshot = ready.snapshot();
            if !snapshot.get_data().is_empty() {
                self.fsm.restore(snapshot.get_data().to_vec()).await?;
            }
            let store = self.raw_node.mut_store();
            store.apply_snapshot(snapshot.clone())?;
        }
    
        self.handle_committed_entries(ready.take_committed_entries())
            .await?;
    
        if !ready.entries().is_empty() {
            let entries = &ready.entries()[..];
            let store = self.raw_node.mut_store();
            store.append(entries)?;
        }
    
        if let Some(hs) = ready.hs() {
            let store = self.raw_node.mut_store();
            store.set_hard_state(hs)?;
        }
    
        if !ready.persisted_messages().is_empty() {
            self.send_messages(ready.take_persisted_messages()).await;
        }
    
        let mut light_rd = self.raw_node.advance(ready);
    
        if let Some(commit) = light_rd.commit_index() {
            let store = self.raw_node.mut_store();
            store.set_hard_state_commit(commit)?;
        }
    
        if !light_rd.messages().is_empty() {
            self.send_messages(light_rd.take_messages()).await;
        }
    
        self.handle_committed_entries(light_rd.take_committed_entries())
            .await?;
    
        self.raw_node.advance_apply();
    
        Ok(())
    }
    

    RawNode

    각 Raft 노드들은 Raft 모듈을 포함하는 RawNode란 타입의 좀 더 하이레벨의 인스턴스를 갖습니다. RawNode는 메모리에만 유지되는 상태인 SoftState, 영속적인 스토리지에 저장되는 상태인 HardState와 아직 저장되지 않은 Ready의 메타 데이터들을 나타내는 records 필드를 갖고 있습니다.

    💡 Ready란 Raft 노드를 업데이트 해야 할 필요가 있을 때 갱신되어야 할 데이터들을 한꺼번에 넘겨주는 자료구조입니다.

    pub struct RawNode<T: Storage> {
        pub raft: Raft<T>,
        prev_ss: SoftState,
        prev_hs: HardState,
        max_number: u64,
        records: VecDeque<ReadyRecord>,
        commit_since_index: u64,
    }
    

    ready() 메서드가 호출될 때 Ready의 메타 데이터가 records에 저장되고, 저장되어야 하는 스냅샷, 로그 엔트리 등이 모두 처리된 후 함수의 마지막 부분인 RawNode::advance()에서 RawNode::commit_ready()를 호출하며 버퍼 Unstable의 스냅샷, 엔트리를 비웁니다.

    RaftNode

    RaftNode는 raftify에서 RawNode를 네트워크, 스토리지 계층과 통합해 좀 더 하이 레벨에서 추상화하는 타입입니다.

    raftify는 별개의 비동기 태스크에서 gRPC 클라이언트에서 보낸 메시지들을 수신하여, 채널을 통해 RaftNode.run() 태스크에 이 메시지들을 넘겨줍니다.

    메시지를 처리하고 난 후엔 on_ready()란 이름의 메서드(Raft loop)에서 상태 변경을 처리합니다.

    pub async fn run(mut self) -> Result<()> {
        let mut tick_timer = Duration::from_secs_f32(self.config.tick_interval);
        let fixed_tick_timer = tick_timer;
        let mut now = Instant::now();
    
        loop {
            ...
            tokio::select! {
                msg = timeout(fixed_tick_timer, self.server_rcv.recv()) => {
                    if let Ok(Some(msg)) = msg {
                        self.handle_server_request_msg(msg).await?;
                    }
                }
                ...
            }
    
            let elapsed = now.elapsed();
            now = Instant::now();
            if elapsed > tick_timer {
                tick_timer = Duration::from_millis(100);
                self.raw_node.tick();
            } else {
                tick_timer -= elapsed;
            }
    
            self.on_ready().await?
        }
    }
    

    좀 더 raftify의 구현에 대해 자세히 설명해보자면 raftify는 아래와 같은 과정을 반복 처리합니다.

    1. 클라이언트에서 요청 생성. (예를 들어 RaftServiceClient.propose()RaftNode.propose()를 호출)
    2. gRPC를 통해 원격 Raft 노드의 RaftServiceClient.propose()가 호출됨.
    3. RaftServiceClient.propose()가 채널을 통해 Propose 메시지를 RaftNode.run() 비동기 태스크로 넘김.
    4. 메시지 큐를 폴링하던 RaftNode.run()Propose 메시지가 들어오면 RawNode.propose() 호출.
    5. 상태 머신에 적용되어야 하는 변경 사항이 생기면 Ready 인스턴스가 생성되어 on_ready() 핸들러로 전달됨.
    6. on_ready() 핸들러에서 커밋된 엔트리들을 처리한 후 클라이언트에 응답함.

    이론적인 내용들은 이쯤에서 마무리하고 시나리오 몇 개를 분석해보며 어떤 일들이 일어나는지 살펴봅시다.

    💡 이 단락에서 Propose 메시지라고 임의로 칭한 것은 클러스터에 상태 변경을 제안하기 위한 목적으로 정의된 타입의 메시지입니다.

    시나리오 분석

    1 - 새 로그 엔트리 추가

    리더 노드 분석

    상태 머신을 변경하기 위해 클러스터에 변경 사항을 요청하면 (propose) 내부에서 어떤 일이 일어날까요? 이 섹션에선 RawNode.propose()를 호출했을 때 어떤 과정을 거치게 되는지 하나씩 분석해보겠습니다. RawNode.propose() 함수를 살펴보면 아래와 같습니다.

    pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgPropose);
        m.from = self.raft.id;
        let mut e = Entry::default();
        e.data = data.into();
        e.context = context.into();
        m.set_entries(vec![e].into());
        self.raft.step(m)
    }
    

    위 코드를 통해 propose() 함수는 step()을 호출해 MsgPropose 타입의 메시지를 처리하도록 만드는 것을 알 수 있습니다.

    여기서 step()은 raft-rs의 실질적인 메시지 핸들러에 해당하는 함수입니다. step()을 호출한 노드가 리더인 경우 step_leader(), 팔로워인 경우 step_follower(), 후보자인 경우 step_candidate()가 호출됩니다.

    step()의 코드를 모두 이해하는 것은 다소 복잡하기 때문에 여기선 리더 노드에서 MsgPropose 타입이 어떻게 처리되는지 코드를 따라가봅시다.

    fn step_leader(&mut self, mut m: Message) -> Result<()> {
        ...
        match m.get_msg_type() {
            MessageType::MsgPropose => {
                ...
                if !self.append_entry(m.mut_entries()) {
                    ...
                }
                self.bcast_append();
                return Ok(());
            }
        ...
        }
    }
    

    Raft.append_entry()RaftLog.append()를 호출해 엔트리들을 추가합니다. RaftLog.append()self.unstable.truncate_and_append()에서 Unstable 버퍼에 엔트리들을 추가합니다. 버퍼에 추가된 엔트리들은 Raft loop에서 Stable storage에 persist 될 것입니다.

    pub fn append(&mut self, ents: &[Entry]) -> u64 {
        ...
        self.unstable.truncate_and_append(ents);
        self.last_index()
    }
    

    그 다음으로 호출되는 bcast_append()에 대해 살펴보도록 하겠습니다.

    이전 섹션에서 설명한, 리더와 팔로워들의 로그 엔트리들을 동기화하기 위한 ProgressTracker (prs)를 통해 각 팔로워의 progress를 인자로 RaftCore.send_append()를 호출하는 것을 볼 수 있습니다.

    pub fn bcast_append(&mut self) {
        let self_id = self.id;
        let core = &mut self.r;
        let msgs = &mut self.msgs;
        self.prs
            .iter_mut()
            .filter(|&(id, _)| *id != self_id)
            .for_each(|(id, pr)| core.send_append(*id, pr, msgs));
    }
    

    send_append()는 아래와 같은 간략한 구조를 갖고 있습니다.

    fn send_append(&mut self, to: u64, pr: &mut Progress, msgs: &mut Vec<Message>) {
        self.maybe_send_append(to, pr, true, msgs);
    }
    

    maybe_send_append()RaftLog.entries를 통해 pr.next_idx ~ to 범위의 로그 엔트리들을 읽어온 후 prepare_send_entries()에 넘겨주며 성공하면 true, 실패하면 false를 리턴합니다.

    fn maybe_send_append(
        &mut self,
        to: u64,
        pr: &mut Progress,
        allow_empty: bool,
        msgs: &mut Vec<Message>,
    ) -> bool {
        ...
        let ents = self.raft_log.entries(
            pr.next_idx,
            self.max_msg_size,
            GetEntriesContext(GetEntriesFor::SendAppend {
                to,
                term: self.term,
                aggressively: !allow_empty,
            }),
        );
        ...
            match (term, ents) {
                (Ok(term), Ok(mut ents)) => {
                    if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                        return true;
                    }
                    self.prepare_send_entries(&mut m, pr, term, ents)
                }
                ...
            }
        ...
        self.send(m, msgs);
        true
    }
    

    prepare_send_entries()는 메시지 객체 m을 MsgAppend 타입으로 만들고 엔트리들을 메시지에 넣어줍니다. 그 후 progress를 업데이트 해 준 후 리턴합니다.

    fn prepare_send_entries(
        &mut self,
        m: &mut Message,
        pr: &mut Progress,
        term: u64,
        ents: Vec<Entry>,
    ) {
        m.set_msg_type(MessageType::MsgAppend);
        m.index = pr.next_idx - 1;
        m.log_term = term;
        m.set_entries(ents.into());
        m.commit = self.raft_log.committed;
        if !m.entries.is_empty() {
            let last = m.entries.last().unwrap().index;
            pr.update_state(last);
        }
    }
    

    그리고 self.send(m, msgs)에서 이 준비한 메시지를 msgs 메시지 큐에 넣어 줍니다.

    fn send(&mut self, mut m: Message, msgs: &mut Vec<Message>) {
        ...
        msgs.push(m);
    }
    

    메시지 큐에 들어간 MsgAppend 메시지는 네트워크 계층을 통해 send_messages()에서 팔로워 노드로 전송되게 됩니다. 따라서, 우리는 팔로워 노드가 MsgAppend 메시지를 받은 후 어떻게 처리하는지를 봐야 합니다.

    팔로워 노드 분석

    다음으로 팔로워 노드에서 일어나는 일을 살펴보면 아래와 같습니다. 팔로워 노드에서 MsgAppend 메시지를 수신했을 때 일어나는 일을 알아보려면 step_follower()를 보면 됩니다.

    fn step_follower(&mut self, mut m: Message) -> Result<()> {
        match m.get_msg_type() {
            ...
            MessageType::MsgAppend => {
                self.election_elapsed = 0;
                self.leader_id = m.from;
                self.handle_append_entries(&m);
            }
            ...
        }
    }
    

    위 코드를 통해 MsgAppend 메시지를 수신한 팔로워 노드가 handle_append_entries()를 호출하고 있는 것을 알 수 있습니다.

    이 함수는 아래처럼 MsgAppendResponse 타입의 메시지인 to_send를 만들고 RaftLog.maybe_append()를 호출합니다.

    pub fn handle_append_entries(&mut self, m: &Message) {
        ...
        let mut to_send = Message::default();
        to_send.to = m.from;
        to_send.set_msg_type(MessageType::MsgAppendResponse);
    
        if let Some((_, last_idx)) = self
            .raft_log
            .maybe_append(m.index, m.log_term, m.commit, &m.entries)
        {
            ...
            // MsgAppend 메시지를 수신
        } else {
            ...
            // MsgAppend 메시지를 거절
        }
        ...
        self.r.send(to_send, &mut self.msgs);
    }
    

    이 함수는 아래처럼 match_term()을 호출해 메시지의 logTerm과 로그 엔트리의 term 값이 같은지 확인하고, find_conflict()를 호출해 로그 엔트리 시퀸스에 충돌이 있는지 검사한 후 문제가 없다고 판단하면 Raft.append()를 호출합니다.

    pub fn maybe_append(
        &mut self,
        idx: u64,
        term: u64,
        committed: u64,
        ents: &[Entry],
    ) -> Option<(u64, u64)> {
        if self.match_term(idx, term) {
            let conflict_idx = self.find_conflict(ents);
            if conflict_idx == 0 {
            } else if conflict_idx <= self.committed {
                fatal!(
                    self.unstable.logger,
                    "entry {} conflict with committed entry {}",
                    conflict_idx,
                    self.committed
                )
            } else {
                let start = (conflict_idx - (idx + 1)) as usize;
                self.append(&ents[start..]);
    
                if self.persisted > conflict_idx - 1 {
                    self.persisted = conflict_idx - 1;
                }
            }
            let last_new_index = idx + ents.len() as u64;
            self.commit_to(cmp::min(committed, last_new_index));
            return Some((conflict_idx, last_new_index));
        }
        None
    }
    

    우리는 이 함수를 본 적이 있습니다. 리더 노드에서 로그 엔트리가 제안되었을 때 RaftLog.append()의 호출 전 마지막으로 호출된 함수였죠.

    이전과 마찬가지로 Raft.append_entry()RaftLog.append()를 호출해 엔트리들을 추가합니다. RaftLog.append()self.unstable.truncate_and_append()에서 Unstable 버퍼에 엔트리들을 append 합니다.

    이것으로 리더에 추가된 로그가 리더 노드에 persist 되고 팔로워 노드에 복사되는 시나리오를 간략하게 알아보았습니다.

    2 - 리더와 팔로워 노드 로그 시퀀스 불일치 시

    우리는 시나리오 1에서 정상적인 상황을 가정하고 코드를 들여다보았습니다. 하지만 실제로는 네트워크 단절 등의 이슈로 리더 노드와 팔로워 노드에 불일치가 생길 수 있습니다. 이번엔 리더 노드와 팔로워 노드 사이에 불일치가 생겼을 때 이를 어떻게 감지하고 해소하는지를 중심으로 다시 한번 코드를 들여다보겠습니다.

    3개의 노드로 이루어진 클러스터가 연속해서 상태 머신을 변경하는 수천 개의 요청을 처리하다가 네트워크 장애가 발생했다고 가정해봅시다.

    장애가 발생한 경우 코드부터 보는 게 아니라 우선 노드들에 출력된 로그들과 persist된 로그 엔트리들, 디버깅 정보들을 먼저 들여다 보며 맥락을 파악하는 것부터 시작해야 하지만, 글이 지나치게 장황해지는 것을 피하기 위해 노드들에 어떤 일들이 발생하고 있는지 대략적으로 파악하게 해 줄 로그만 골라서 분석해보겠습니다.

    우선 3번 노드에선 메시지를 reject 했음을 나타내는 rejected msgApp... 로그를 남기고 있습니다.

    Nov 28 05:30:59.233 DEBG rejected msgApp [logterm: 7, index: 3641] from 2, logterm: Ok(0), index: 3641, from: 2, msg_index: 3641, msg_log_term: 7
    

    위 로그를 통해 3번 노드는 팔로워 노드, 2번 노드가 장애가 발생한 후 새로 선출된 리더 노드이며 3641 번째 엔트리를 복제하려는 MsgAppend 메시지가 거절되었다는 것을 알 수 있습니다.

    이 로그가 어떤 함수에서 출력된 것인지 찾아보면, 시나리오 1에서 살펴 보았었던 handle_append_entries()에서 호출하는 것을 알 수 있는데요. (팔로워가 리더로부터 받은 MsgAppend 메시지를 처리하는 함수)

    pub fn handle_append_entries(&mut self, m: &Message) {
        ...
        let mut to_send = Message::default();
        to_send.to = m.from;
        to_send.set_msg_type(MessageType::MsgAppendResponse);
        ...
        if let Some((_, last_idx)) = self
            .raft_log
            .maybe_append(m.index, m.log_term, m.commit, &m.entries)
        {
            ...
        } else {
            debug!(
                self.logger,
                "rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \
                from {from}",
                msg_log_term = m.log_term,
                msg_index = m.index,
                from = m.from;
                "index" => m.index,
                "logterm" => ?self.raft_log.term(m.index),
            );
    
            let hint_index = cmp::min(m.index, self.raft_log.last_index());
            let (hint_index, hint_term) =
                self.raft_log.find_conflict_by_term(hint_index, m.log_term);
    
            if hint_term.is_none() {
                fatal!(
                    self.logger,
                    "term({index}) must be valid",
                    index = hint_index
                )
            }
    
            to_send.index = m.index;
            to_send.reject = true;
            to_send.reject_hint = hint_index;
            to_send.log_term = hint_term.unwrap();
        }
    
        to_send.set_commit(self.raft_log.committed);
        self.r.send(to_send, &mut self.msgs);
    }
    

    함수를 살펴보면 이 로그가 출력되었다는 것에서 maybe_append()가 None을 리턴했다는 것, 즉 match_term()이 False를 반환했다는 것을 알 수 있습니다. 이것은 메시지의 logTerm과 3641번 엔트리의 term 값에 불일치가 발생했다는 것을 의미합니다.

    따라서 term을 통해 충돌한 지점을 찾고 (find_conflict_by_term()) 충돌한 지점(hint_index)을 메시지의 reject_hint에 넣어 리더에 MsgAppendResponse 메시지 형태로 되돌려 줍니다.

    그럼 리더는 이 거절된 MsgAppendResponse 메시지를 어떻게 처리할까요?

    메시지를 거절한 리더 노드는 아래와 같은 MsgAppend가 거절했다는 로그를 남기게 됩니다.

    Nov 28 05:30:59.279 DEBG received msgAppend rejection, index: 3641, from: 3, reject_hint_term: 7, reject_hint_index: 3611
    

    따라서 우리가 그 다음으로 들여다 보아야 하는 것은 이 거절된 MsgAppend 메시지를 받은 후 "received msgAppend rejection"를 출력하는 함수입니다.

    이 함수는 handle_append_response()인데요, 함수 자체는 꽤 길지만 MsgAppend 메시지가 reject 되었을 때의 처리만 잘라놓고 보면 그리 길지 않습니다.

    fn handle_append_response(&mut self, m: &Message) {
        let mut next_probe_index: u64 = m.reject_hint;
        ...
        if m.reject {
            debug!(
                self.r.logger,
                "received msgAppend rejection";
                "reject_hint_index" => m.reject_hint,
                "reject_hint_term" => m.log_term,
                "from" => m.from,
                "index" => m.index,
            );
    
            if pr.maybe_decr_to(m.index, next_probe_index, m.request_snapshot) {
                debug!(
                    self.r.logger,
                    "decreased progress of {}",
                    m.from;
                    "progress" => ?pr,
                );
                if pr.state == ProgressState::Replicate {
                    pr.become_probe();
                }
    
                self.send_append(m.from);
            }
            return;
        }
        ...
    }
    

    메시지의 reject_hint를 가져와 next_probe_index로 만들고, Progress.maybe_decr_to()를 호출해 progress를 감소시킵니다. Progress가 Probe 상태임을 표시하고, send_append()를 호출해 다시 MsgAppend 메시지를 보내줍니다.

    💡 ProgressState는 각 노드들의 동기화 진행 상태를 나타내는 enum 입니다. 정상적인 상황, 로그를 복제하고 있는 상태에선 "Replicate" 이며, 복제된 마지막 인덱스를 모르고 있는 팔로워 노드는 조사 중인 상태란 의미로 "Probe", 스냅샷 전송을 통해 팔로워에 로그를 복제 중인 경우 "Snapshot"입니다.

    요약하자면 충돌이 발생하기 전 로그 엔트리의 인덱스 (next_probe_index)를 찾기 위해 해당 노드의 progress를 감소시키고 다시 MsgAppend 메시지를 보낸다는 것입니다. 이 과정은 리더와 팔로워 노드의 Common log prefix를 찾게 될 때까지 반복됩니다.

    Common log prefix를 찾게 되면, 해당 인덱스 이후의 로그 엔트리들은 리더로부터 팔로워로 단방향으로 복제되어 덮어쓰게 됩니다. 이 과정은 maybe_send_append() 함수에서 확인할 수 있습니다.

    아래와 같이 RaftLog.entries를 통해 얻어진 로그 엔트리들이 SendAppend 컨텍스트로 복제됩니다. 이때 max_msg_size는 Config의 max_size_per_msg이며, 이 값의 디폴트 값은 0입니다. RaftLog.entries를 통해 LMDBStorage.entries()의 (RaftLog의 T에 해당하는 persistent 스토리지 타입) 인자의 max_size0이 주어지는데, 이 주석을 토대로 생각해보면 이것의 의미는 따로 설정해 주지 않으면 리더와 팔로워 노드의 로그에 불일치가 발생했을 때 로그 엔트리를 한 개씩 동기화하란 의미임을 알 수 있습니다.

    이후엔 이전 섹션에서 설명한 것과 같이 prepare_send_entries()를 통해 MsgAppend 메시지를 준비하고, Raft.send()를 통해 팔로워 노드로 로그 엔트리들을 복제하기 위한 메시지가 전달됩니다.

    fn maybe_send_append(
        &mut self,
        to: u64,
        pr: &mut Progress,
        allow_empty: bool,
        msgs: &mut Vec<Message>,
    ) -> bool {
        ...
        let mut m = Message::default();
        m.to = to;
        if pr.pending_request_snapshot != INVALID_INDEX {
            ...
        } else {
            let ents = self.raft_log.entries(
                pr.next_idx,
                self.max_msg_size,
                GetEntriesContext(GetEntriesFor::SendAppend {
                    to,
                    term: self.term,
                    aggressively: !allow_empty,
                }),
            );
            ...
            let term = self.raft_log.term(pr.next_idx - 1);
            match (term, ents) {
                (Ok(term), Ok(mut ents)) => {
                    if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                        return true;
                    }
                    self.prepare_send_entries(&mut m, pr, term, ents)
                }
                ...
            }
        }
        self.send(m, msgs);
        true
    }
    

    중간에 많은 로그들이 생략되어 있지만 리더와 팔로워 사이에 3612 번째 엔트리부터 3642번째 엔트리 까지 위와 같은 과정을 거쳐 동기화가 일어난 후, 팔로워 노드로의 복제가 모두 끝나면 해당 팔로워의 ProgressStateReplicate로 변하며, 정상적으로 Heartbeat 메시지를 주고 받기 시작하는 것을 알 수 있습니다.

    2023-11-28 14:30:59,269 - INFO     - Entries [3612, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3611, entries: [Entry { context: "1810", data: "{'key': '2292', 'value': '1'}", entry_type: EntryNormal, index: 3612, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,259 - INFO     - Entries [3613, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3612, entries: [Entry { context: "1811", data: "{'key': '2294', 'value': '1'}", entry_type: EntryNormal, index: 3613, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,259 - INFO     - Entries [3614, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3613, entries: [Entry { context: "1812", data: "{'key': '2295', 'value': '1'}", entry_type: EntryNormal, index: 3614, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,259 - INFO     - Entries [3615, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3614, entries: [Entry { context: "1813", data: "{'key': '2296', 'value': '1'}", entry_type: EntryNormal, index: 3615, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    ...
    
    2023-11-28 14:30:59,284 - INFO     - Entries [3641, 3643) requested
    Nov 28 05:30:59.283 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3640, entries: [Entry { context: "1839", data: "{'key': '2457', 'value': '1'}", entry_type: EntryNormal, index: 3641, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,284 - INFO     - Entries [3642, 3643) requested
    Nov 28 05:30:59.284 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3641, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 3642, sync_log: false, term: 12 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    Nov 28 05:31:01.635 DEBG Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeat, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 2
    Nov 28 05:31:01.635 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgHeartbeat, to: 3, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:31:01,637
    

    3 - 리더 선출

    시나리오 2에서 네트워크 장애로 인해 리더 선출이 일어났었던 것을 term 값의 증가를 통해 확인할 수 있었는데요, 이 시나리오에선 이 리더 선출 과정에 대해 자세히 들여다보도록 하겠습니다.

    리더에 장애가 생긴 경우 어떤 로그들이 찍히게 되는지 확인하기 위해 간단하게 3개의 노드로 이뤄진 클러스터를 만들고 리더 프로세스를 강제로 종료시켜 본 후 새로 리더로 선출되는 프로세스의 로그를 들여다보겠습니다.

    로그의 내용을 요약해보자면 리더 노드가 종료된 후, 3번 노드에서 선거를 시작하고 후보자(Candidate) 상태로 전이한 후 다른 voter들에게 MsgRequestVote 메시지를 보냅니다. 2번 노드로부터 MsgRequestVoteResponse 메시지를 받고, 자신은 본인에게 투표하기 때문에 과반수 이상의 투표를 받게 되어 새 리더로 선출된 후 term 값을 2로 증가시키고 자신이 리더로 선출되었음을 알리기 위한 특수한 종류의 메시지(Empty MsgAppend)를 보내는 과정이라고 요약할 수 있습니다.

    💡 election_tick 만큼 heartbeat 메시지를 받지 못한 팔로워 노드가 투표를 시작하게 됩니다. 이 때 투표 분열(Split vote)을 방지하기 위해 election_tick은 매번 min_election_tick ~ max_election_tick 사이에서 무작위 값으로 결정됩니다. 따라서 리더 노드가 종료된 후 나머지 두 노드들 중 어떤 노드라도 리더 노드가 될 수 있으며 이는 더 작은 election_tick을 가진 노드로 선출됩니다.

    Nov 29 01:30:30.210 INFO starting a new election, term: 1
    Nov 29 01:30:30.210 DEBG reset election timeout 16 -> 10 at 0, election_elapsed: 0, timeout: 10, prev_timeout: 16
    Nov 29 01:30:30.210 INFO became candidate at term 2, term: 2
    Nov 29 01:30:30.210 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgRequestVote, to: 1, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3
    Nov 29 01:30:30.210 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgRequestVote, to: 2, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3
    Nov 29 01:30:30.211 INFO broadcasting vote request, to: [1, 2], log_index: 3, log_term: 1, term: 2, type: MsgRequestVote
    2023-11-29 10:30:30,217 - WARNING  - Failed to connect to node 1 elapsed from first failure: 0.0000s. Err message: <AioRpcError of RPC that terminated with:
         status = StatusCode.UNAVAILABLE
         details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused"
         debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused {created_time:"2023-11-29T10:30:30.216855+09:00", grpc_status:14}"
    >
    2023-11-29 10:30:30,222 - DEBUG    - Node 3 received Raft message from the node 2, Message: Message { msg_type: MsgRequestVoteResponse, to: 3, from: 2, term: 2, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: "None", metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }
    Nov 29 01:30:30.223 INFO received votes response, term: 2, type: MsgRequestVoteResponse, approvals: 2, rejections: 0, from: 2, vote: true
    Nov 29 01:30:30.223 TRCE ENTER become_leader
    Nov 29 01:30:30.223 DEBG reset election timeout 10 -> 17 at 0, election_elapsed: 0, timeout: 17, prev_timeout: 10
    Nov 29 01:30:30.223 TRCE Entries being appended to unstable list, ents: Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }
    Nov 29 01:30:30.223 INFO became leader at term 2, term: 2
    Nov 29 01:30:30.223 TRCE EXIT become_leader
    Nov 29 01:30:30.223 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgAppend, to: 1, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3
    Nov 29 01:30:30.223 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3
    

    그럼 이제 로그 내용을 바탕으로 코드에서 어떤 일들이 벌어지고 있는지 알아 봅시다.

    우선 "starting a new election" 이라는 로그를 출력하고 있는 함수는 hup() 입니다.

    hup()step()MsgHup 타입, step_follower()MsgTimeoutNow 타입의 메시지에 대한 처리 과정에서 호출됩니다.

    여기서 MsgTimeoutNow 메시지는 Leader election이 아닌, Leader transfer에 사용되는 메시지 타입입니다. 즉 리더가 MsgTransferLeader 메시지를 받게 되면 팔로워들에게 MsgTimeoutNow 타입의 메시지를 전송하게 되고 transfer_leader 플래그를 True로 둔 채 hup() 함수가 실행되게 됩니다. Leader election은 리더 장애 등의 상황으로 리더를 새로 선출하는 과정이지만, Leader transfer은 리더 프로세스가 다른 팔로워 프로세스에게 리더를 양도하는 과정입니다.

    그러므로 우리가 지금 따라가보아야 하는 메시지는 MsgHup 임을 알 수 있습니다. election_tick이 지났는데도 Heartbeat를 받지 못했기 때문에 리더 선출을 시작했다는 점을 통해 MsgHup 메시지를 넣어준 것이 아래 tick_election() 함수인 것을 추측해 볼 수 있습니다.

    RaftNode에서 tick_timer마다 self.raw_node.tick()을 호출했던 것을 기억하시나요? 이 RawNode.tick()을 통해 노드가 election_elapsedrandomized_election_timeout를 경과한 경우 자기 자신에게 MsgHup 메시지를 step 하게 되는 것입니다. (여기서 election_elapsed을 랜덤화하는 것은 모든 노드가 동시에 투표를 시작해 모든 노드가 자기 자신에게 투표하는 상황을 방지하기 위한 것입니다.)

    // raw_node.rs
    pub fn tick(&mut self) -> bool {
        self.raft.tick()
    }
    
    // raft.rs
    pub fn tick(&mut self) -> bool {
        match self.state {
            StateRole::Follower | StateRole::PreCandidate | StateRole::Candidate => {
                self.tick_election()
            }
            StateRole::Leader => self.tick_heartbeat(),
        }
    }
    
    // raft.rs
    pub fn tick_election(&mut self) -> bool {
        self.election_elapsed += 1;
        if !self.pass_election_timeout() || !self.promotable {
            return false;
        }
        
        self.election_elapsed = 0;
        let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id));
        let _ = self.step(m);
        true
    }
    
    // raft.rs
    pub fn step(&mut self, m: Message) -> Result<()> {
        ...
        match m.get_msg_type() {
            ...
            MessageType::MsgHup => {
                self.hup(false)
            },
        }
    }
    
    // raft.rs
    pub fn pass_election_timeout(&self) -> bool {
        self.election_elapsed >= self.randomized_election_timeout
    }
    

    hup() 함수는 간단하게 요약해보자면 아래처럼 campaign() 함수를 CAMPAIGN_ELECTION 타입으로 실행합니다.

    fn hup(&mut self, transfer_leader: bool) {
        ...
        info!(
            self.logger,
            "starting a new election";
            "term" => self.term,
        );
    
        ...
        self.campaign(CAMPAIGN_ELECTION);
    }
    

    campaign() 함수는 아래처럼 자신의 상태를 Candidate 상태로 전이시킨 후 투표를 시작합니다.

    우선 self_id는 이름대로 노드 자신의 id입니다. 따라서 self.poll(self_id, vote_msg, true)는 자기 자신에게 투표한다는 의미입니다.

    이 결과가 VoteResult::Won인 경우 그대로 투표에서 승리하며 노드 본인이 리더가 되고 리턴합니다.

    따라서 MsgRequestVote, MsgRequestVoteResponse 등의 메시지는 싱글 노드 클러스터에서 오고 가지 않을 것임을 알 수 있습니다.

    하지만 물론 이 시나리오는 싱글 노드 클러스터가 아니기 때문에 경우에 해당하지 않습니다.

    pub fn campaign(&mut self, campaign_type: &'static [u8]) {
        let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
            ...
        } else {
            self.become_candidate();
            (MessageType::MsgRequestVote, self.term)
        };
        let self_id = self.id;
        if VoteResult::Won == self.poll(self_id, vote_msg, true) {
            // We won the election after voting for ourselves (which must mean that
            // this is a single-node cluster).
            return;
        }
        ...
    }
    

    campaign()의 뒷부분을 더 들여다보기 전에 poll()은 어떻게 동작하는 것인지 알아봅시다.

    poll()은 아래처럼 record_vote(), tally_votes()를 호출하는 함수이며, 투표 결과에 따라 투표에서 승리했다면 리더 노드로 전이한 후, 자신이 클러스터의 새 리더라는 것을 브로드캐스팅 (bcast_append()) 합니다.

    투표에서 진 경우 팔로워 노드로 전이하며, 결과가 Pending인 경우 아무일도 수행하지 않고 리턴합니다.

    fn poll(&mut self, from: u64, t: MessageType, vote: bool) -> VoteResult {
        self.prs.record_vote(from, vote);
        let (gr, rj, res) = self.prs.tally_votes();
        if from != self.id {
            info!(
                self.logger,
                "received votes response";
                "vote" => vote,
                "from" => from,
                "rejections" => rj,
                "approvals" => gr,
                "type" => ?t,
                "term" => self.term,
            );
        }
    
        match res {
            VoteResult::Won => {
                if self.state == StateRole::PreCandidate {
                    self.campaign(CAMPAIGN_ELECTION);
                } else {
                    self.become_leader();
                    self.bcast_append();
                }
            }
            VoteResult::Lost => {
                let term = self.term;
                self.become_follower(term, INVALID_ID);
            }
            VoteResult::Pending => (),
        }
        res
    }
    

    record_vote()의 역할은 아주 단순합니다. id 값을 가진 노드가 자기 자신에게 투표했을 때 ProgressTracker의 해시맵 객체 votes에 기록하는 함수입니다.

    pub fn record_vote(&mut self, id: u64, vote: bool) {
        self.votes.entry(id).or_insert(vote);
    }
    

    tally_votes를 봅시다. 해시맵 votes를 통해 자기 자신에게 투표한 노드의 수와 거절한 노드의 수를 세서 튜플 형태로 리턴해주고 있는 것을 볼 수 있습니다.

    💡 "tally"라는 단어는 점수를 세거나 집계하는 행위를 의미합니다. 즉 "tally_votes"는 투표를 세서 집계하는 함수입니다.

    pub fn tally_votes(&self) -> (usize, usize, VoteResult) {
        let (mut granted, mut rejected) = (0, 0);
        for (id, vote) in &self.votes {
            if !self.conf.voters.contains(*id) {
                continue;
            }
            if *vote {
                granted += 1;
            } else {
                rejected += 1;
            }
        }
        let result = self.vote_result(&self.votes);
        (granted, rejected, result)
    }
    

    투표 결과를 어떻게 판단하는지 들여다볼까요?

    조인트 쿼럼의 경우 두 쿼럼 (Incoming quorum, Outgoing quorum)의 동의를 모두 얻어야 투표에서 승리할 수 있습니다.

    따라서 우리는 아래 세 vote_result() 함수를 들여다봐야 합니다.

    tracker.rs에선 해시맵 votes를 통해 노드 id가 자신에게 투표했는지 알 수 있게 해 주는 콜백 함수 check를 인자로 넘겨 줍니다.

    joint.rs에선 두 구성에서 모두 승리한 경우에만 VoteResult::Won를 리턴하고, 한 쪽에서라도 투표에서 졌다면 VoteResult::Lost를 리턴합니다. 그 외의 경우인 경우 VoteResult::Pending를 리턴합니다.

    득표 수를 실제로 카운트하는 작업은 majority.rsvote_result()에서 진행됩니다.

    클러스터의 voter들 중 자기 자신에게 투표한 노드의 수와 투표하지 않은 노드의 수를 세서 과반수보다 많은 노드들이 동의한 경우 VoteResult::Won, 과반수 이상의 투표를 얻지 못했지만 응답을 보내주지 못한 노드까지 포함시켰을 때 과반수를 넘는다면 VoteResult::Pending, 그 이외의 경우 VoteResult::Lost를 반환합니다.

    // tracker.rs
    pub fn vote_result(&self, votes: &HashMap<u64, bool>) -> VoteResult {
        self.conf.voters.vote_result(|id| votes.get(&id).cloned())
    }
    
    // joint.rs
    pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
        let i = self.incoming.vote_result(&check);
        let o = self.outgoing.vote_result(check);
        match (i, o) {
            // It won if won in both.
            (VoteResult::Won, VoteResult::Won) => VoteResult::Won,
            // It lost if lost in either.
            (VoteResult::Lost, _) | (_, VoteResult::Lost) => VoteResult::Lost,
            // It remains pending if pending in both or just won in one side.
            _ => VoteResult::Pending,
        }
    }
    
    // majority.rs
    pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
        ...
    
        let (mut yes, mut missing) = (0, 0);
        for v in &self.voters {
            match check(*v) {
                Some(true) => yes += 1,
                None => missing += 1,
                _ => (),
            }
        }
        let q = crate::majority(self.voters.len());
        if yes >= q {
            VoteResult::Won
        } else if yes + missing >= q {
            VoteResult::Pending
        } else {
            VoteResult::Lost
        }
    }
    
    // util.rs
    pub fn majority(total: usize) -> usize {
        (total / 2) + 1
    }
    

    투표 과정이 votes 해시맵을 기반으로 어떻게 진행되는지 살펴보았습니다. 하지만 이 과정을 밟기 전 MsgRequestVote, MsgRequestVoteResponse 메시지를 통해 이 해시맵이 적절하게 업데이트 되어야 합니다.

    따라서 campaign() 함수를 계속 따라가 보도록 합시다.

    campaign() 함수가 MsgRequestVote 타입의 메시지를 만들어 voter들에게 전송하고 있다는 것을 알 수 있습니다.

    따라서 그 다음으론 MsgRequestVote 메시지의 핸들러를 따라가 봅시다.

    pub fn campaign(&mut self, campaign_type: &'static [u8]) {
        let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
            ...
        } else {
            self.become_candidate();
            (MessageType::MsgRequestVote, self.term)
        };
        let self_id = self.id;
        if VoteResult::Won == self.poll(self_id, vote_msg, true) {
            // We won the election after voting for ourselves (which must mean that
            // this is a single-node cluster).
            return;
        }
        // Only send vote request to voters.
        for id in self.prs.conf().voters().ids().iter() {
            if id == self_id {
                continue;
            }
            ...
            let mut m = new_message(id, vote_msg, None);
            m.term = term;
            m.index = self.raft_log.last_index();
            m.log_term = self.raft_log.last_term();
            m.commit = commit;
            m.commit_term = commit_term;
            ...
            self.r.send(m, &mut self.msgs);
        }
        ...
    }
    

    얼핏 보면 복잡해 보이지만 결국 MsgRequestVote 메시지의 핸들러가 하는 일은 이 투표에 동의하거나 동의하지 않는다는 메시지를 만들어 전송해주는 일입니다.

    vote_resp_msg_type에 따라 우리가 보낸 메시지 타입은 MsgRequestVote이므로 응답 메시지의 타입은 MsgRequestVoteResponse가 될 것입니다. (이 글에선 prevote 알고리즘에 대한 설명은 생략합니다)

    그럼 노드가 언제 투표에 동의하고 언제 동의하지 않는지 살펴봅시다. 주석과 함께 코드를 찬찬히 살펴보면 투표에 동의하기 위해선 아래 세 조건이 만족되어야 함을 알 수 있습니다.

    1. can_votetrue (이미 해당 노드에 투표한 경우이거나, 이번 term에서의 leader_id를 모르고 아직 투표 하지 않은 경우)

    2. self.raft_log.is_up_to_datetrue (메시지의 term 값이 RaftLog.last_term 보다 크거나, 만약 같다면 RaftLog.last_index보다 메시지의 인덱스가 큰 경우)

    3. 메시지의 인덱스가 RaftLog.last_index 보다 크거나, 더 높은 우선 순위를 갖는 경우

    이 세 조건이 만족된 경우 Vote에 동의하며 만족되지 않는 조건이 있다면 Vote를 거절한다는 메시지를 보냅니다.

    그럼 이제 MsgRequestVoteResponse의 수신부로 넘어가봅시다.

    // raft.rs
    pub fn step(&mut self, m: Message) -> Result<()> {
        ...
        match m.get_msg_type() {
            MessageType::MsgRequestVote => {
                // We can vote if this is a repeat of a vote we've already cast...
                let can_vote = (self.vote == m.from) ||
                    // ...we haven't voted and we don't think there's a leader yet in this term...
                    (self.vote == INVALID_ID && self.leader_id == INVALID_ID)
      
                // ...and we believe the candidate is up to date.
                if can_vote
                    && self.raft_log.is_up_to_date(m.index, m.log_term)
                    && (m.index > self.raft_log.last_index() || self.priority <= get_priority(&m))
                {
                    self.log_vote_approve(&m);
                    let mut to_send =
                        new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
                    to_send.reject = false;
                    to_send.term = m.term;
                    self.r.send(to_send, &mut self.msgs);
                    if m.get_msg_type() == MessageType::MsgRequestVote {
                        // Only record real votes.
                        self.election_elapsed = 0;
                        self.vote = m.from;
                    }
                } else {
                    self.log_vote_reject(&m);
                    let mut to_send =
                        new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
                    to_send.reject = true;
                    to_send.term = self.term;
                    let (commit, commit_term) = self.raft_log.commit_info();
                    to_send.commit = commit;
                    to_send.commit_term = commit_term;
                    self.r.send(to_send, &mut self.msgs);
                    self.maybe_commit_by_vote(&m);
                }
            }
        }
    }
    
    // raft.rs
    pub fn vote_resp_msg_type(t: MessageType) -> MessageType {
        match t {
            MessageType::MsgRequestVote => MessageType::MsgRequestVoteResponse,
            MessageType::MsgRequestPreVote => MessageType::MsgRequestPreVoteResponse,
            _ => panic!("Not a vote message: {:?}", t),
        }
    }
    
    // raft_log.rs
    pub fn is_up_to_date(&self, last_index: u64, term: u64) -> bool {
        term > self.last_term() || (term == self.last_term() && last_index >= self.last_index())
    }
    

    MsgRequestVoteResponse 메시지 핸들러는 매우 단순합니다!

    우리가 아까 봤었던 poll() 함수를 호출하여 votes 해시맵을 업데이트 하고 투표 결과가 결정된 경우 StateRole을 업데이트 합니다.

    fn step_candidate(&mut self, m: Message) -> Result<()> {
        match m.get_msg_type() {
            ...
            MessageType::MsgRequestVoteResponse => {
                ...
                self.poll(m.from, m.get_msg_type(), !m.reject);
                self.maybe_commit_by_vote(&m);
            }
        }
    }
    

    정리

    이 글에선 raft-rs에서 사용되는 타입들을 바탕으로 코드 아키텍쳐를 살펴본 후, 세 가지 기초적인 시나리오를 바탕으로 raft 구현체의 코드를 따라가며 분석해보았습니다. 이 글이 raft 모듈에 대한 이해를 넓히는데 도움이 되었기를 바랍니다. 다음 글에선 좀 더 다양한 시나리오들을 통해 raft 구현체의 작동 방식을 보다 깊이 살펴보도록 하겠습니다.

    감사합니다 😊

    29 March 2024

  • Introducing Raftify: 확장성에 초점을 맞춰 개발된 High-level Raft Framework

    By 이규봉

    안녕하세요, 저는 작년부터 Lablup에서 Backend.AI 매니저 프로세스에 Raft를 도입하는 작업을 맡아 수행하고 있습니다.

    제가 수행 중인 관련 작업을 대략적으로 나타내어 보면 아래와 같습니다.

    1. Backend.AI 매니저 프로세스에 Raft를 도입해 리더-팔로워 구조로 만드는 것.
    2. 기존 분산 락 기반의 GlobalTimer를 Raft 기반의 글로벌 타이머로 변경하고, 클러스터에서 특정 작업이 정확히 한 번만 수행되도록 보장하는 것.
    3. 매니저 프로세스 간 공유 가능한 전역적인 상태 저장소를 매니저 프로세스에 내장시키고 적절하게 동기화하는 것.

    이 글에선 이러한 작업을 수행하기 위해 제가 지난 1년간 삽질하며 개발하게 된 Raft 프레임워크와 이를 개발하며 마주친 여러 이슈들에 대해 소개드리고 총 300줄이 되지 않는 간략한 코드를 통해 분산 키값 저장소를 구현하는 raftify 예제 코드에 대해 설명드려 보도록 하겠습니다.

    raftify 소개

    raftify는 어떤 서버 애플리케이션과도 쉽게 통합될 수 있도록 확장성에 초점을 맞추어 개발된 Raft 프레임워크입니다.

    raftify는 프로덕션에서 활용되고 있는 Raft 구현체들 중 tikv의 raft-rs 구현체 위에 LMDB를 stable storage로, gRPC를 네트워크 계층으로 사용해 개발되었습니다.

    raft 모듈 바인딩

    저는 신뢰할 수 있는 Raft 구현체를 밑바닥부터 모두 쌓아올려 유지 보수하는 것은 현실적으로 큰 짐이 될 수 있다고 판단해 우선 Raft 모듈의 파이썬 바인딩을 작성해보기로 결정했습니다.

    그래서 처음에는 GitHub에서 가장 스타를 많이 받은 Raft 구현체인 hashicorp/raft 구현체를 gopy를 사용해 파이썬 바인딩을 작성해보면 어떨까 생각했습니다.

    하지만 gopy는 고루틴에 대한 바인딩을 지원해주지 못했고 최신 파이썬 버전도 지원해 주지 않고 있었습니다.

    그러던 참에 사내 시니어 개발자님의 조언을 통해 tikv/raft-rs란 Rust 구현체와 PyO3에 대해 알게 되며 PyO3를 통해 tikv/raft-rs의 파이썬 바인딩을 작성해 보아야겠다고 생각하게 되었습니다.

    rraft-py

    그렇게 rust, raft, py를 합쳐 rraft-py란 이름으로 Raft 모듈의 파이썬 바인딩 개발에 도전해보게 되었습니다.

    rraft-py를 개발하면서 가장 먼저 신경 쓴 것은 rust 코드와 파이썬 코드의 의미가 가능한 1:1 매칭이 되도록 만들어야겠다는 것이었습니다.

    1:1 매칭이 가능하려면 러스트의 문법에 관련된 세부 사항들을 잘 우회할 필요가 있었습니다.

    제가 당시에 제일 고민했던 것은 러스트의 참조를 파이썬 측으로 어떻게 노출해주어야 좋을지에 관련된 것이었으며, 관심이 있으시다면 해당 파이콘 발표 영상을 참고하실 수 있습니다.

    이렇게 개발된 rraft-py는 1만 줄 이상의 raft-rs의 통합 테스트 코드를 그대로 포팅해 파이썬에서 바로 사용할 수 있는 나름 신뢰할 수 있는 Raft 바인딩 구현체가 되었습니다.

    현재 raftify는 Rust로 완전히 재작성하는 과정을 거친 후 rraft-py를 사용하지 않게 되었지만 처음으로 PyO3 바인딩을 작성해보고 Raft 구현체의 API들을 사용해보는 좋은 경험이 되었습니다.

    riteraft-py

    rraft-py를 개발하고 raft-rs의 1만 줄 가량의 통합 테스트들과 multiple-mem-node example까지 파이썬 코드로 포팅해 정상적으로 동작하도록 개발한 후 든 생각은 여전히 어디에서부터 시작해야 할지 모르겠다는 것이었습니다.

    raft-rs는 정말 Raft 구현체 자체만을 제공했고 이것을 어떻게 애플리케이션에 통합할 수 있을지 전혀 감이 잡히지 않았습니다.

    Github을 찾아보던 중 How to use this lib?란 이슈에서 riteraft란 이름으로 tikv/raft-rs를 기반으로 하는 하이 레벨의 Rust 구현체를 발견하게 되었고, 해당 라이브러리는 훨씬 직관적으로 사용 방법을 파악할 수 있었습니다. 그래서 저는 파이썬에서 이것의 동작을 그대로 모방해 애플리케이션 레벨에 통합하는 것을 목표로 riteraft-py를 개발하기로 결심했습니다.

    riteraft는 Raft 모듈과 로그, 상태 머신, 네트워크 계층과 이 Raft 구현체를 직접 통합하는 일을 수행하는데요, 문제는 직관적인 사용법과 별개로 제대로 동작하지 않는다는 점이었습니다.

    리더가 죽었는데 Leader election이 일어나지 않는 문제, 특정 시나리오에서 데이터 복제가 일어나지 않는 문제, 커밋 갯수가 255개를 넘어갈 때 일어나는 패닉 등... 온갖 잡다한 이슈를 모두 해결해야 했습니다.

    위 이슈들을 모두 해결하고 클러스터가 동작하는 것 처럼 보이게 만든 후에도 이슈는 계속해서 발생했습니다. 잘 동작하는 것 같다가도 특정 장애 상황을 마주하면 클러스터 일관성이 깨지거나 로그 동기화가 락인 되는 등 치명적인 문제들이 발생했습니다.

    이슈가 발생할 때마다 매번 raft-rs의 기술적인 세부 사항들을 들여다보고 이해할 수 있어야 했으며 이 과정은 결국 raft-rs의 코드를 뜯어보고 하나 하나 이해해 나가는 과정을 요구했습니다.

    raftify

    이슈를 해결하는 과정에서 riteraft와 다른 추상화를 사용하기로 결정했고 노드와 클러스터 상태를 디버깅 하기 위한 CLI 모듈 등 여러 변경 사항들을 구현하면서 라이브러리 이름을 raftify로 변경하게 되었습니다.

    해당 라이브러리를 처음 개발하기 시작할 땐 어떤 파이썬 애플리케이션과도 잘 호환될 수 있도록 하는 것을 목표로 개발했기 때문에 raft화 시키겠다는 의미로 raftify라는 이름을 붙였습니다.

    파이썬 구현체는 현재는 더 이상 개발하고 있지 않지만 해당 브랜치에서 확인할 수 있습니다.

    raftify written in Rust

    rraft-py 위에 파이썬으로 개발된 raftify는 결과적으로 잘 작동되긴 했지만 멀티 프로세스 구조로 작성된 조잡한 테스트 하네스는 CI에서 테스트 하기 힘들었고 쉽게 클러스터 일관성이 깨졌으며 코드가 조금만 복잡해져도 제어하기 힘들어졌습니다.

    결과적으로 raftify 내부 로직을 러스트로 완전히 재작성하고 Raft 패키지의 하이 레벨에서의 인터페이스만을 파이썬으로 노출시키기로 결정하게 되었습니다.

    그렇게 완전히 러스트로 재작성된 raftify는 싱글 스레드만으로 통합 테스트 수행이 가능했고, CI에서 테스트할 수 있어 코드 변경의 두려움을 없애도록 도와주었습니다.

    raftify 예제 코드

    이 섹션에선 raftify를 사용해 간단한 분산 키값 저장소를 만들어봅니다.

    전체 소스 코드는 해당 링크를 참고하세요.

    상태 머신 정의

    우선은 키값 저장소에서 사용할 로그 엔트리와 상태 머신을 정의해야 합니다.

    이 글에선 간단하게 로그 엔트리로 값을 정의하는 Insert라는 타입의 명령어만 정의해보겠습니다.

    💡 이 글에선 Rust 문법, Raft의 이론적 배경에 대해 설명하지 않습니다.

    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub enum LogEntry {
        Insert { key: u64, value: String },
    }
    

    상태 머신은 HashMap 타입으로 아래처럼 정의해보겠습니다.

    #[derive(Clone, Debug)]
    pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);
    

    그런 다음 이 자료구조들을 어떻게 직렬화, 역직렬화 할지 나타낼 encode, decode 메서드를 정의해주어야 합니다. bincode 크레이트를 사용해 아래처럼 간단하게 정의할 수 있습니다.

    impl AbstractLogEntry for LogEntry {
        fn encode(&self) -> Result<Vec<u8>> {
            serialize(self).map_err(|e| e.into())
        }
    
        fn decode(bytes: &[u8]) -> Result<LogEntry> {
            let log_entry: LogEntry = deserialize(bytes)?;
            Ok(log_entry)
        }
    }
    
    impl AbstractStateMachine for HashStore {
        fn encode(&self) -> Result<Vec<u8>> {
            serialize(&self.0.read().unwrap().clone()).map_err(|e| e.into())
        }
    
        fn decode(bytes: &[u8]) -> Result<Self> {
            let db: HashMap<u64, String> = deserialize(bytes)?;
            Ok(Self(Arc::new(RwLock::new(db))))
        }
    }
    

    마지막으로 HashStore에 raftify 내부 코드에서 사용될 세 가지 메서드를 정의하면 됩니다.

    HashStore에 새 로그 엔트리가 적용될 때 호출될 메서드인 apply, 현재 HashStore의 상태를 스냅샷으로 저장할 때 호출될 snapshot, 스냅샷 바이트 슬라이스를 통해 HashStore의 상태를 복구할 때 호출될 restore를 아래처럼 정의해줍니다.

    #[async_trait]
    impl AbstractStateMachine for HashStore {
        async fn apply(&mut self, data: Vec<u8>) -> Result<Vec<u8>> {
            let log_entry: LogEntry = LogEntry::decode(&data)?;
            match log_entry {
                LogEntry::Insert { ref key, ref value } => {
                    let mut db = self.0.write().unwrap();
                    log::info!("Inserted: ({}, {})", key, value);
                    db.insert(*key, value.clone());
                }
            };
            Ok(data)
        }
    
        async fn snapshot(&self) -> Result<Vec<u8>> {
            Ok(serialize(&self.0.read().unwrap().clone())?)
        }
    
        async fn restore(&mut self, snapshot: Vec<u8>) -> Result<()> {
            let new: HashMap<u64, String> = deserialize(&snapshot[..]).unwrap();
            let mut db = self.0.write().unwrap();
            let _ = std::mem::replace(&mut *db, new);
            Ok(())
        }
    }
    

    웹 서버 API 정의

    예제에서 사용될 웹 서버 API를 정의해봅시다. 이 API를 통해 노드의 Raft 객체에 접근해 HashStore를 조작할 것입니다.

    예제에선 actix-web 크레이트를 사용해 아래처럼 정의해보도록 하겠습니다.

    put 명령은 Raft 객체의 RaftNode에서 propose 메서드를 호출함으로써 구현할 수 있습니다. 이전에 정의한 Insert 타입 LogEntry를 인코딩해 RaftNode::propose 메서드의 인자에 넘겨주면 됩니다.

    get 명령은 인메모리에 저장되어 있는 HashMap에서 id에 해당하는 값을 리턴하는 것으로 구현할 수 있습니다.

    #[get("/put/{id}/{value}")]
    async fn put(data: web::Data<(HashStore, Raft)>, path: web::Path<(u64, String)>) -> impl Responder {
        let log_entry = LogEntry::Insert {
            key: path.0,
            value: path.1.clone(),
        };
        data.1.raft_node.propose(log_entry.encode().unwrap()).await;
    
        "OK".to_string()
    }
    
    #[get("/get/{id}")]
    async fn get(data: web::Data<(HashStore, Raft)>, path: web::Path<u64>) -> impl Responder {
        let id = path.into_inner();
    
        let response = data.0.get(id);
        format!("{:?}", response)
    }
    
    

    Raft 클러스터 부트스트랩

    이제 RaftNode들의 클러스터를 부트스트랩 시켜 봅시다.

    이 예제에서는 아래와 같은 구성을 갖는 Raft 클러스터를 부트스트랩할 것입니다.

    아래와 같은 구성에서는 세 개의 노드가 voter로서 부트스트랩되므로 클러스터에 리더가 존재하지 않아 election_timeout 후 바로 리더를 선출하게 됩니다.

    [[raft.peers]]
    ip = "127.0.0.1"
    port = 60061
    node_id = 1
    role = "voter"
    
    [[raft.peers]]
    ip = "127.0.0.1"
    port = 60062
    node_id = 2
    role = "voter"
    
    [[raft.peers]]
    ip = "127.0.0.1"
    port = 60063
    node_id = 3
    role = "voter"
    
    let options = Options::from_args();
    let store = HashStore::new();
    let initial_peers = load_peers().await?;
    
    let mut cfg = build_config();
    cfg.initial_peers = Some(initial_peers.clone());
    
    let node_id = initial_peers
        .get_node_id_by_addr(options.raft_addr.clone())
        .unwrap();
    
    let raft = Raft::bootstrap(
        node_id,
        options.raft_addr,
        store.clone(),
        cfg.clone(),
        logger.clone(),
    )?;
    
    let handle = tokio::spawn(raft.clone().run());
    
    // ...
    tokio::try_join!(handle)?;
    

    그리고 이 Raft 서버와 통신하기 위한 웹 서버를 붙여 줍시다.

    if let Some(addr) = options.web_server {
        let _web_server = tokio::spawn(
            HttpServer::new(move || {
                App::new()
                    .app_data(web::Data::new((store.clone(), raft.clone())))
                    .service(put)
                    .service(get)
            })
            .bind(addr)
            .unwrap()
            .run(),
        );
    }
    

    이제 터미널에서 아래처럼 세 노드로 이뤄진 Raft 클러스터를 부트스트랩 시킬 수 있습니다.

    $ ./target/debug/memstore --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001
    $ ./target/debug/memstore --raft-addr=127.0.0.1:60062 --web-server=127.0.0.1:8002
    $ ./target/debug/memstore --raft-addr=127.0.0.1:60063 --web-server=127.0.0.1:8003
    

    테스트

    이제 curl 명령을 통해 actix-web 서버 API를 통해 우리가 정의한 키값 저장소를 사용해 볼 수 있습니다.

    ❯ curl http://localhost:8001/put/1/test
    OK
    
    ❯ curl http://localhost:8001/get/1
    Some("test")
    

    더 자세한 내용이 궁금하시다면 raftify 레포지토리에서 디버깅을 도와주는 CLI 모듈의 사용법, RaftServiceClient의 예제 코드 등을 확인하실 수 있습니다.

    정리

    raftify는 일반 개발자 입장에서 접근하기 쉽지 않은 Raft 모듈을 누구나 쉽게 통합시킬 수 있도록 만드는 것을 목표로 하고 있는 실험적인 프레임워크입니다.

    Backend.AI 매니저 프로세스들에 리더-팔로워 구조를 도입하겠다는 목적으로 개발되었지만 이 글에서 설명드린 것 처럼 짧은 소스 코드로 자신만의 간단한 분산 키값 저장소를 만드는 등 HA 구조가 필요한 곳에서 다양하게 활용될 수 있을 것으로 보입니다.

    혹시 tikv/raft-rs 구현체 내부 동작 방식에 흥미가 생기셨다면 다음 글에서 몇 가지 시나리오에서 일어나는 일들을 소스코드 내부를 한 줄 한 줄 따라가며 분석해 볼 예정이니 기대해주시면 감사하겠습니다.

    26 January 2024

도움이 필요하신가요?

내용을 작성해 주시면 곧 연락 드리겠습니다.

문의하기

본사 및 HPC 연구소

서울특별시 강남구 선릉로 577 CR타워 8층

© Lablup Inc. All rights reserved.