엔지니어링

May 29, 2024

엔지니어링

실제로 동작하는 Raft 구현체 뜯어 보기 - 2

  • 이규봉

    소프트웨어 엔지니어

May 29, 2024

엔지니어링

실제로 동작하는 Raft 구현체 뜯어 보기 - 2

  • 이규봉

    소프트웨어 엔지니어

지난 포스팅 에서는 raft-rs 타입들을 중심으로한 전체적인 개요와 시스템에 네트워크 장애가 발생했을 때 리더 선출이 어떤 식으로 이뤄지는지, 어떻게 로그 비일관성을 해소하고 장애를 극복한 후 일관적인 상태를 유지하게 되는지 세 가지 시나리오를 기반으로 알아보았습니다.

이 글에선 지난 글에 이어 Raft 구현체의 동작 방식을 몇몇 시나리오에 걸쳐 살펴보겠습니다.

이번에 살펴볼 시나리오는 Raft 클러스터의 상태를 어떤 과정을 거쳐 Stable storage에 저장하고, 클러스터를 다시 부트스트랩 했을 때 어떻게 이전 상태를 로그와 스냅샷으로부터 복구하게 되는지 알아보겠습니다.

💡 Raftify는 Lablup에서 개발한 하이레벨의 Raft 구현체입니다. Raftify에 대해 궁금하시다면 해당 포스팅을 참고해보세요.

타입을 중심으로 살펴보는 raft-rs 아키텍쳐

이번 글에서도 마찬가지로 시나리오 분석에 앞서 raft-rs의 타입들 중 이번 글에 등장할 몇몇 타입들을 알아보도록 하겠습니다.

ConfState

클러스터는 여러 노드들로 구성되어 있으며 각 노드들은 장애 발생으로 인한 투표 상황에서 투표에 참여할 지의 여부에 따라 voterlearner로 나뉩니다. voterlearner 모두 클러스터 구성원으로서 클러스터로부터 합의를 공유하지만 learner의 경우 투표에 참여하지 않습니다.

이러한 클러스터 구성원들에 대한 정보 역시 클러스터 구성원 간의 합의에 포함되며, 그렇기 때문에 로그 엔트리를 적용함으로써 구성되거나 변경될 수 있습니다.

💡 raft-rs의 EntryType은 이런 ConfState 구성 변경을 위한 EntryConfChange 타입과 일반적인 상태 변경을 위한 EntryNormal 타입으로 나뉩니다.

raft-rs에서 사용되는 타입들 중 네트워크 계층에 사용되는 타입들은 eraftpb.proto 파일에 정의되어 있으며 tonic에 의해 러스트 코드로 컴파일 됩니다.

message ConfState { repeated uint64 voters = 1; repeated uint64 learners = 2; // The voters in the outgoing config. If not empty the node is in joint consensus. repeated uint64 voters_outgoing = 3; // The nodes that will become learners when the outgoing config is removed. // These nodes are necessarily currently in nodes_joint (or they would have // been added to the incoming config right away). repeated uint64 learners_next = 4; // If set, the config is joint and Raft will automatically transition into // the final config (i.e. remove the outgoing config) when this is safe. bool auto_leave = 5; }

voters_outgoing, learners_next, auto_leave는 Joint consensus 지원을 위한 필드로 이 글에선 Joint consensus에 대한 설명은 생략하도록 하겠습니다.

Snapshot과 SnapshotMetadata

시스템의 가용성을 위해 로그를 무한정 쌓아둘 수 없기 때문에 오래된 로그들은 삭제되어야 하며 제거되기 전 반드시 상태 머신에 반영되어야 합니다.

로그 시퀸스에서 특정 인덱스까지의 로그를 지우는 것을 로그 컴팩션이라고 부르며 해당 인덱스까지 로그 엔트리가 적용된 상태를 기록한 것을 스냅샷이라고 부릅니다.

스냅샷은 이번 포스팅의 핵심 주제로 아래 시나리오 분석에서 자세히 살펴보겠지만 새로 가입한 노드로 클러스터의 상태를 전송하거나, 장애로부터 복구하기 위한 용도로 활용됩니다.

message Snapshot { bytes data = 1; SnapshotMetadata metadata = 2; } message SnapshotMetadata { // The current `ConfState`. ConfState conf_state = 1; // The applied index. uint64 index = 2; // The term of the applied index. uint64 term = 3; }

SnapshotMetadata은 스냅샷이 생성될 당시의 메타 데이터입니다.

구체적으로 각 필드들은 아래와 같은 의미를 갖습니다.

  • conf_state: 스냅샷이 생성될 당시의 클러스터 구성원 정보를 나타냅니다.
  • index: 스냅샷이 생성된 당시 컴팩션이 이뤄진 마지막 로그 엔트리의 인덱스를 나타냅니다.
  • term: 스냅샷 생성된 당시 마지막 로그 엔트리가 갖는 term 값을 나타냅니다.

위와 같은 메타 데이터들은 스냅샷을 활용할 때 로그 일관성을 깨지 않기 위해 필수적인 요소입니다.

예를 들어 스냅샷으로 상태 정보를 복원할 때 스냅샷의 인덱스에 해당하는 로그 엔트리의 term과 스냅샷 메타 데이터의 term이 일치하지 않는 경우 일관성 유지를 위해 스냅샷 적용 요청을 무시해야 합니다.

시나리오 분석

1 - 스냅샷 기록

Raftify에서 스냅샷 생성은 아래와 같은 RaftNode의 make_snapshot()라는 메서드 호출로 이뤄집니다.

특정 인덱스 및 해당 인덱스에서의 로그 엔트리의 term 값을 인자로 넘겨줍니다.

스냅샷에 저장할 데이터는 self.fsm.snapshot() 메서드가 리턴한 데이터로, 현재 상태 머신의 상태에 해당합니다.

💡 self.fsm.snapshot() 메서드는 FSM(Finite State Machine)을 어떻게 저장할 것인지 여부에 따라 다르게 구현될 수 있으므로 Raftify 유저가 구현해 넘겨주어야 하는 구현 중 하나입니다. 예를 들어 인메모리에 FSM을 저장하는 HashStore 예제의 경우 snapshot()은 단순히 HashMap을 직렬화해 리턴합니다.

상태 머신에 적용된 마지막 로그 엔트리의 인덱스 last_appliedcompact()에 넘겨주면 로그 엔트리에서 주어진 인덱스 이전까지의 로그를 삭제합니다.

// lablup/raftify/blob/main/src/raft_node/mod.rs pub async fn make_snapshot(&mut self, index: u64, term: u64) -> Result<()> { ... let snapshot_data = self.fsm.snapshot().await?; let last_applied = self.raw_node.raft.raft_log.applied; let store = self.raw_node.mut_store(); store.compact(last_applied)?; store.create_snapshot(snapshot_data, index, term)?; Ok(()) }

create_snapshot()는 넘겨 받은 스냅샷 데이터 data와 함께 스냅샷 메타 데이터들을 기록합니다.

// lablup/raftify/blob/main/src/heed_storage/mod.rs fn create_snapshot(&mut self, data: Vec<u8>, index: u64, term: u64) -> Result<()> { let store = self.wl(); let mut writer = store.env.write_txn()?; let conf_state = store.conf_state(&writer)?; let mut snapshot = Snapshot::default(); snapshot.set_data(data); let meta = snapshot.mut_metadata(); meta.set_conf_state(conf_state); meta.index = index; meta.term = term; store.set_snapshot(&mut writer, &snapshot)?; writer.commit()?; Ok(()) }

2 - 새로 조인한 노드에 스냅샷 전송

시나리오

클러스터에 새로 조인한 노드는 일관성을 유지하기 위해 기존 클러스터의 상태를 전송받아야 합니다.

하지만 새 노드가 클러스터에 참여할 때마다 모든 로그 엔트리를 하나 하나 복제하는 것은 비효율적인 일입니다. 모든 노드는 같은 상태 머신을 가지기 때문에 모든 로그 엔트리를 전송하는 대신, 로그 엔트리들이 적용된 결과물인 스냅샷만을 전송해 문제를 해결할 수 있으며, 이 때 스냅샷 데이터를 전송하는 메시지의 타입은 MsgSnapshot입니다.

따라서 이 섹션에서는 1번 노드가 리더 노드이고 2번 노드가 새로 조인한 노드라고 가정한 후 MsgSnapshot 메시지와 관련된 코드와 로그를 중심으로 어떤 일이 일어나고 있는지 살펴보도록 하겠습니다.

Raftify에선 새로 조인한 팔로워가 리더 노드에게 별개의 스냅샷 요청을 전송하지 않습니다.

구성 변경 요청(이후 ConfChange) 이 커밋되면 리더가 해당 로그 엔트리를 새로 조인한 노드에 보내려고 시도하고, 새 노드는 이 로그 엔트리를 갖고 있지 않기 때문에 이 MsgAppend 메세지는 거절됩니다.

전편의 시나리오 2에서 네트워크 장애로 인해 MsgAppend 메시지가 거절되었을 때 생기는 노드 사이의 비일관성을 해소하는 시나리오를 다뤘었던 것을 기억하시나요?

해당 시나리오에선 prepare_send_entries()를 통해 불일치하는 로그 엔트리들을 하나씩 동기화 했었습니다. 새로 조인한 노드와의 로그 비일관성을 해소하는 경우는, 단지 로그 엔트리를 하나씩 동기화 하는 대신 스냅샷(prepare_send_snapshot())을 통해 동기화 한다는 점이 다르다고 볼 수 있습니다.

그럼 아래에선 코드 및 로그 분석을 통해 해당 시나리오가 어떤 과정을 통해 일어나고 있는 것인지 자세히 알아보겠습니다.

코드 분석

우선 해당 시나리오와 관련된 코드들 중 리더가 새로 조인한 노드에게 보낸 MsgAppend 메시지가 거절되는 부분부터 살펴보도록 하겠습니다.

maybe_send_append() 코드를 살펴보면 아래와 같습니다. 아래 코드에서 새로 조인한 노드의 progress는 비어 있기 때문에 self.raft_log.term() 호출은 실패하게 되고, prepare_send_snapshot()가 호출되면서 maybe_send_append()false를 리턴합니다 (MsgAppend 거절)

// tikv/raft-rs/blob/master/src/raft.rs fn maybe_send_append( &mut self, to: u64, pr: &mut Progress, allow_empty: bool, msgs: &mut Vec<Message>, ) -> bool { ... let term = self.raft_log.term(pr.next_idx - 1); match (term, ents) { (Ok(term), Ok(mut ents)) => { if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) { return true; } self.prepare_send_entries(&mut m, pr, term, ents) } (_, Err(Error::Store(StorageError::LogTemporarilyUnavailable))) => { // wait for storage to fetch entries asynchronously return false; } _ => { // 💡 이번 시나리오에선 아래 분기가 실행됩니다. // send snapshot if we failed to get term or entries. if !self.prepare_send_snapshot(&mut m, pr, to) { return false; } } } } self.send(m, msgs); true }

호출된 prepare_send_snapshot()는 아래와 같은 함수로, self.raft_log.snapshot() 메서드를 호출해 스냅샷 데이터를 가져온 후 송신할 메시지에 설정합니다.

그 후 해당 노드의 progress 객체를 snapshot 상태라고 표시한 후 리턴합니다.

💡 여기서 노드의 상태가 snapshot 상태라는 것은 해당 노드가 스냅샷 복제 상태이기 때문에 이 노드로의 로그 복제 작업이 잠시 중단될 것임을 나타냅니다.

// tikv/raft-rs/blob/master/src/raft.rs fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool { ... m.set_msg_type(MessageType::MsgSnapshot); let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot, to); if let Err(ref e) = snapshot_r { if *e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) { self.logger.debug( format!( "failed to send snapshot to {} because snapshot is temporarily unavailable", to ) .as_str(), ); return false; } self.logger .fatal(format!("unexpected error: {:?}", e).as_str()); } let snapshot = snapshot_r.unwrap(); if snapshot.get_metadata().index == 0 { self.logger.fatal("need non-empty snapshot"); } let (sindex, sterm) = (snapshot.get_metadata().index, snapshot.get_metadata().term); m.set_snapshot(snapshot); self.logger.debug(format!( "[firstindex: {first_index}, commit: {committed}] sent snapshot[index: {snapshot_index}, term: {snapshot_term}] to {to}; progress: {progress}", first_index = self.raft_log.first_index(), committed = self.raft_log.committed, snapshot_index = sindex, snapshot_term = sterm, to = to, progress = format!("{:?}", pr) ).as_str()); pr.become_snapshot(sindex); self.logger.debug( format!( "paused sending replication messages to {}; progress: {:?}", to, pr ) .as_str(), ); true }

따라서 Raftify는 ConfChange가 커밋될 때 1번 시나리오에서 살펴봤었던 RaftNode.make_snapshot() 호출을 통해 새 노드에 전송할 스냅샷을 미리 준비해둡니다.

이렇게 전송된 스냅샷은 새로 조인한 노드의 Raft loop의 Snapshot 핸들링 로직에서 감지되어 복구하게 됩니다. 아래 로직의 self.fsm.restore()을 통해 전송 받은 스냅샷 데이터로 상태 머신을 복구하고, store.apply_snapshot()을 통해 Stable storage에도 적용해줍니다.

// lablup/raftify/blob/main/raftify/src/raft_node/mod.rs async fn on_ready(&mut self) -> Result<()> { ... if *ready.snapshot() != Snapshot::default() { self.logger .info("Restoring state machine and snapshot metadata..."); let snapshot = ready.snapshot(); if !snapshot.get_data().is_empty() { self.fsm.restore(snapshot.get_data().to_vec()).await?; } let store = self.raw_node.mut_store(); store.apply_snapshot(snapshot.clone())?; } ... }

리더 노드 로그 분석

이번엔 새로운 노드가 조인 했을 때 리더 노드에 출력되는 로그들을 하나씩 순서대로 분석해보겠습니다.

  1. 1번 노드는 2번 노드로부터 조인 요청을 받고 클러스터 구성이 변경됩니다.
Apr 11 06:51:14.189 INFO Node 2 (127.0.0.1:60062) joined the cluster as voter. Apr 11 06:51:14.189 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false } Apr 11 06:51:14.189 DEBG Entries [9, 10) requested.
  1. 리더에 새로운 로그 엔트리가 추가되었기 때문에 2번 노드에 이 로그 엔트리를 복제하기 위해 MsgAppend 메시지를 송신합니다.
Apr 11 06:51:14.189 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 8, entries: [Entry { context: 7, data: ConfChangeV2 { transition: 0, changes: [ConfChangeSingle { change_type: AddNode, node_id: 2 }], context: [127.0.0.1:60062] }, entry_type: EntryConfChangeV2, index: 9, sync_log: false, term: 1 }], commit: 9, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
  1. 그러나 새로 조인한 노드는 기존 클러스터의 정보를 갖고 있지 못하기 때문에 이 MsgAppend 메시지는 거절되며 1번 노드는 아래와 같이 요청이 거절되었다는 메시지를 받게 됩니다.
Apr 11 06:51:14.298 DEBG >>> Node 1 received Raft message from the node 2, Message { msg_type: MsgAppendResponse, to: 1, from: 2, term: 1, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 } Apr 11 06:51:14.298 DEBG received msgAppend rejection; reject_hint_index: 0, reject_hint_term: 0, from: 2, index: 8 Apr 11 06:51:14.298 DEBG decreased progress of 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
  1. 위에서 설명한것 처럼 새로 조인한 노드의 progress는 비어 있으므로, 스냅샷을 Stable storage에 저장하고 해당 인덱스까지의 로그 엔트리들을 제거하게 됩니다. 이 경우엔 8 이전까지의 로그 엔트리들이 제거되었으며 2번 노드의 조인 요청에 해당하는 로그 엔트리의 인덱스는 9입니다. 따라서 아래와 같이 first_index8이며, commit9라는 로그와 함께 스냅샷 메세지가 전송됩니다.
Apr 11 06:51:14.298 DEBG [firstindex: 8, commit: 9] sent snapshot[index: 9, term: 1] to 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
  1. 스냅샷 전송을 위해 로그 엔트리 복제를 중단합니다.
Apr 11 06:51:14.299 DEBG paused sending replication messages to 2; progress: Progress { matched: 0, next_idx: 1, state: Snapshot, paused: false, pending_snapshot: 9, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
  1. 스냅샷을 전송하는 MsgSnapshot 타입의 메시지를 송신합니다. 스냅샷엔 이전에 임의로 넣어 놓은 data: {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"} 라는 데이터가 들어 있는 것을 확인할 수 있습니다
Apr 11 06:51:14.299 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgSnapshot, to: 2, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }

팔로워 노드 로그 분석

새로 조인한 팔로워 노드에 출력되는 로그를 분석해보면 아래와 같습니다.

  1. term 1에서 새로운 팔로워 노드가 됩니다.
Apr 15 06:37:27.421 INFO became follower at term 1
  1. 리더 노드로부터 온 MsgAppend 메세지를 거절합니다.
Apr 15 06:37:27.421 DEBG rejected msgApp [logterm: 1, index: 8] from 1; index: 8, logterm: Ok(0) Apr 15 06:37:27.421 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
  1. 해당 노드가 장애 상태로 감지되어 불필요한 투표가 일어나선 안 되기 때문에 MsgHeartbeat 메시지엔 정상 응답해야 합니다.
Apr 15 06:37:27.423 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgHeartbeat, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 } Apr 15 06:37:27.423 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeatResponse, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
  1. MsgSnapshot 메시지를 통해 스냅샷을 전송 받습니다.
Apr 15 06:37:27.424 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgSnapshot, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {3: "A", 5: "A", 2: "A", 4: "A", 1: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 } Apr 15 06:37:27.424 INFO log [committed=0, persisted=0, applied=0, unstable.offset=1, unstable.entries.len()=0] starts to restore snapshot [index: 9, term: 1] Apr 15 06:37:27.424 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
  1. 전송받은 스냅샷을 통해 상태를 복구합니다.
Apr 15 06:37:27.424 INFO restored snapshot; commit: 9, last_index: 9, last_term: 1, snapshot_index: 9, snapshot_term: 1 Apr 15 06:37:27.424 INFO [commit: 9, term: 1] restored snapshot [index: 9, term: 1] Apr 15 06:37:27.425 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 9, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 } Apr 15 06:37:27.425 INFO Restoring state machine and snapshot metadata... Apr 15 06:37:27.425 DEBG snapshot's persisted index 9

3 - 대다수(Majority) 이상의 노드에 장애가 생긴 경우 복구

특정 노드에 장애가 발생한 경우 해당 노드는 단지 네트워크가 복구된 후 리더 노드로부터 새 로그 엔트리들을 복제 받으면 되기 때문에 문제가 되지 않습니다. 노드가 새로 조인해야 하는 경우에도 2번 시나리오에서 다뤘듯이 스냅샷을 통해 상태를 복구할 수 있으므로 문제가 되지 않습니다.

하지만 쿼럼 이상의 노드에 장애가 발생한 경우 클러스터는 스스로 장애를 복구할 수 없습니다.

이 경우 관리자가 수동으로 개입해 어떤 노드의 로그 시퀸스를 정상 상태로 볼 것인지 결정한 후 해당 로그 시퀸스로부터 다시 클러스터를 부트스트랩 해 주어야 합니다.

이 때 관리자의 판단에 따라 상태 머신에 모든 로그 엔트리를 하나 하나 직접 적용해가며 복구하거나 마지막으로 생성된 스냅샷으로부터 상태를 복구해야 합니다.

WAL 스냅샷에서의 상태 복구

해당 섹션에선 직접 Raftify의 예제 코드를 사용합니다.

예제를 재현하기 위해 1번 노드에 간단하게 몇 개의 키값을 넣어준 후 /snapshot API를 통해 make_snapshot() 메서드를 호출해 스냅샷을 생성해줍니다. 그리고 노드에 장애가 일어났다고 가정하고 종료해볼 것입니다.

WAL 스냅샷으로부터 복구하기 위해선 restore_wal_snapshot_from 라는 옵션에 복구할 노드의 node_id를 넘겨주면 됩니다. 여기선 1번 노드의 스냅샷으로 복구할 것이므로 1을 넣어주면 됩니다.

그리고 로그 엔트리의 적용 여부를 확인하기 위해 apply()가 호출될 때 마다 "Inserted: (key, value)"와 같은 로그를 남겨보도록 하겠습니다.

💡 apply() 역시 restore()와 마찬가지로 Raftify 유저가 정의해야 하는 StateMachine의 추상 메서드들 중 하나로 로그 엔트리가 커밋되는 시점에 호출됩니다.

스냅샷을 찍고 1번 노드를 종료한 후 Raftify가 제공하는 CLI 명령어를 사용해 스토리지를 덤프해보면 아래와 같습니다.

아래 로그를 통해 스토리지 내에 스냅샷이 저장되어 있고 { data: {2: \"A\", 5: \"A\", 3: \"A\", 4: \"A\", 1: \"A\"}와 같은 데이터를 갖고 있다는 것을 알 수 있습니다.

❯ raftify-cli debug persisted-all ./logs *----- node-1 -----* ---- Persisted entries ---- Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }" ---- Metadata ---- HardState { term: 1, vote: 1, commit: 8 } ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false } "Snapshot { data: HashStore(RwLock { data: {2: \"A\", 5: \"A\", 3: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }" Last index: 8

그 후 ./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-snapshot-from=1 명령어를 통해 1번 노드를 다시 부트스트랩 시켜 봅시다.

이 때 1번 노드에 출력되는 로그는 아래와 같습니다. 상태를 스냅샷으로부터 바로 복구하므로 각 로그 엔트리들에 대한 apply()는 한 번도 실행되지 않았습니다.

Apr 15 07:54:44.703 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }, 2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: None, restore_wal_snapshot_from: Some(1), } Apr 15 07:54:44.705 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false } Apr 15 07:54:44.705 DEBG reset election timeout 0 -> 10 at 0 Apr 15 07:54:44.705 INFO became follower at term 3 Apr 15 07:54:44.705 INFO newRaft; term: 3, commit: 0, applied: 0, last index: 0, last term: 0, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } } Apr 15 07:54:44.705 INFO RawNode created with id 1. Apr 15 07:54:44.748 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"...

그리고 다시 스토리지를 덤프 해 봅시다.

자기 자신의 스냅샷으로부터의 복구이기 때문에 아무런 상태 변화도 일어나지 않은 것을 확인할 수 있습니다.

*----- node-1 -----* ---- Persisted entries ---- Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }" ---- Metadata ---- HardState { term: 1, vote: 1, commit: 8 } ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false } "Snapshot { data: HashStore(RwLock { data: {3: \"A\", 2: \"A\", 5: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }" Last index: 8

WAL 로그에서의 상태 복구

이번에는 특정 로그 시퀸스로부터 상태를 복구해봅시다.

이번엔 아래 로그와 같이 스토리지에 스냅샷은 비어 있으며 대신 상태를 복구하기 위한 로그 엔트리들이 저장되어 있습니다.

*----- node-1 -----* ---- Persisted entries ---- Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }" Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }" Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }" Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }" Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }" Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }" Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }" Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }" ---- Metadata ---- HardState { term: 2, vote: 1, commit: 8 } ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false } "Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }" Last index: 8

이전 섹션에서와 마찬가지로 장애를 가정해 1번 노드를 종료하고 다시 부트스트랩 했을 때 어떤 일이 일어나는지 살펴보겠습니다.

1번 노드를 종료한 후 ./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-from=1 명령어로 1번 노드를 다시 부트스트랩 시켜 봅시다.

1번 노드에 아래와 같은 로그가 출력되며 이전에 입력한 로그 엔트리들이 한 번에 apply() 되며 이전의 상태를 복구하는 것을 알 수 있습니다.

Apr 15 07:46:50.710 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }, 1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: Some(1), restore_wal_snapshot_from: None, } Apr 15 07:46:50.712 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false } Apr 15 07:46:50.712 DEBG reset election timeout 0 -> 10 at 0 Apr 15 07:46:50.712 INFO became follower at term 1 Apr 15 07:46:50.712 INFO newRaft; term: 1, commit: 8, applied: 0, last index: 8, last term: 1, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } } Apr 15 07:46:50.712 INFO RawNode created with id 1. Apr 15 07:46:50.753 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"... Apr 15 07:46:50.855 DEBG Entries [1, 9) requested. // 하나씩 로그 엔트리들을 apply하며 상태 머신 상태를 복구해나감 Inserted: (1, A) Inserted: (1, A) Inserted: (1, A) Inserted: (2, A) Inserted: (3, A) Inserted: (4, A) Inserted: (5, A)

이번에도 마찬가지로 자기 자신의 크러시 이전 상태를 복구한 것이므로 스토리지를 덤프해보면 이전과 같습니다. 다른 점은 이전엔 스냅샷을 통해 빠르게 상태를 복구한 것에 비해 모든 로그 엔트리들을 하나 하나 적용했다는 점입니다.

*----- node-1 -----* ---- Persisted entries ---- Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }" Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }" Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }" Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }" Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }" Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }" Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }" Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }" ---- Metadata ---- HardState { term: 2, vote: 1, commit: 8 } ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false } "Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }" Last index: 8

마무리

이번 글에선 지난 편에 이어 스냅샷을 중심으로 새로 조인한 노드가 있을 때 로그 비일관성 해소 문제와 장애 복구 시나리오에 대해 알아보았습니다.

Raftify는 2024 오픈소스 컨트리뷰션 아카데미에 참여형 프로젝트로 참가해 분산 시스템 구현에 관심이 있는 멘티 분들을 모집하고 있습니다! (모집 기간: ~ 06.23)

참가자분들은 멘토들과 함께 분산 시스템의 기본 개념 학습부터 실제 구현 과정까지 경험해 볼 수 있습니다.

많은 관심 부탁드립니다! 감사합니다 😊

도움이 필요하신가요?

내용을 작성해 주시면 곧 연락 드리겠습니다.

문의하기

Headquarter & HPC Lab

서울특별시 강남구 선릉로100길 34 남영빌딩 4층, 5층

© Lablup Inc. All rights reserved.