엔지니어링
- 실제로 동작하는 Raft 구현체 뜯어 보기 - 2
- 실제로 동작하는 Raft 구현체 뜯어 보기 - 1
- Introduce raftify: 확장성에 초점을 맞추어 개발된 하이 레벨의 Raft 프레임워크
이 글에선 독자들이 Raft에 관한 이론적인 배경지식이 있다고 가정하고, tikv/raft-rs 코드를 샅샅이 훑어 보며 어떻게 실제로 분산 시스템에서의 상태 머신이 동기화되고 동작하게 되는지 몇 개의 간략한 시나리오에 걸쳐 알아보겠습니다.
이 글은 raft-rs 코드 분석에 초점을 맞추고 있지만, raft-rs 구현체가 유연성을 위해 네트워크 및 스토리지 계층을 포함하지 않기 때문에 온전한 이해를 위해 일부 섹션에서 raftify 소스 코드를 예제로 사용합니다.
💡 raftify는 Lablup에서 개발한 하이레벨의 Raft 구현체입니다. 이 글에선 raftify에 대해선 raft의 동작 방식을 이해하기 위한 최소한의 코드만을 설명합니다. raftify에 대해 궁금하시다면 해당 포스팅을 참고해보세요.
타입을 중심으로 살펴보는 raft-rs 아키텍처
시나리오를 살펴 보기에 앞서 코드 베이스에서 사용되는 대표적인 타입들을 중심으로 아키텍쳐를 대략적으로 살펴봅시다.
Raft
각 Raft 노드들의 Raft
객체는 메시지 큐 msgs
를 메모리에 들고 있으며 이 큐를 통해 다른 Raft 노드들과 상호작용합니다.
raftify와 같은 하이레벨의 구현체에서 네트워크 계층은 후에 설명할 추상화 계층을 통해 이 큐에 메시지를 넣는 역할을 하게 됩니다.
따라서 이 메시지 큐는 통신의 엔드 포인트로 볼 수 있으며, Raft
구현체는 현재 상태에 따라 이 메시지들을 처리해 나가며 노드 간의 일관된 상태를 유지합니다.
이 Raft 노드의 상태에 해당하는 데이터들을 들고 있는 것이 RaftCore
타입입니다.
또한 다른 Raft 노드들과의 로그 엔트리들을 동기화하기 위한 메타 데이터들을 담는 Progress
란 타입이 있으며, 이것들은 상황에 따라 ProgressTracker
에서 적절하게 업데이트 됩니다.
결과적으로 Raft는 아래와 같은 타입이 됩니다.
pub struct Raft<T: Storage> {
pub msgs: Vec<Message>,
pub r: RaftCore<T>,
prs: ProgressTracker,
}
RaftLog
RaftCore
가 갖는 대표적인 데이터로 로그 엔트리 시퀸스에 대한 접근을 추상화하는 RaftLog
가 있습니다.
RaftLog<T: Storage>
는 Unstable
과 T
타입을 함께 다룰 수 있도록 추상화합니다. 여기서 T
는 raftify와 같은 보다 높은 레벨에서 구현해 넣어 주어야 하는 영속적인 스토리지에 해당하며, Unstable
은 이 스토리지에 기록되기 전 거치는 버퍼입니다.
pub struct RaftLog<T: Storage> {
pub store: T,
pub unstable: Unstable,
...
}
💡 RaftCore 타입에 대해 더 궁금하다면 이 링크를 참고하세요.
Raft Loop
Raft 구현체들은 다른 Raft 노드들과 통신하며 일관된 상태를 유지하기 위해 무한 루프를 돌며 자신의 상태 머신을 업데이트 하는 반복적인 프로세스를 수행합니다. 이 글에선 이러한 루프를 Raft loop라고 부르겠습니다.
raftify에서 Raft loop를 구현하는 소스 코드는 아래와 같습니다.
(가장 minimal한 구현을 보고 싶다면 tikv/raft-rs의 예제 코드를 참고하실 수도 있습니다.)
async fn on_ready(&mut self) -> Result<()> {
if !self.raw_node.has_ready() {
return Ok(());
}
let mut ready = self.raw_node.ready();
if !ready.messages().is_empty() {
self.send_messages(ready.take_messages()).await;
}
if *ready.snapshot() != Snapshot::default() {
slog::info!(
self.logger,
"Restoring state machine and snapshot metadata..."
);
let snapshot = ready.snapshot();
if !snapshot.get_data().is_empty() {
self.fsm.restore(snapshot.get_data().to_vec()).await?;
}
let store = self.raw_node.mut_store();
store.apply_snapshot(snapshot.clone())?;
}
self.handle_committed_entries(ready.take_committed_entries())
.await?;
if !ready.entries().is_empty() {
let entries = &ready.entries()[..];
let store = self.raw_node.mut_store();
store.append(entries)?;
}
if let Some(hs) = ready.hs() {
let store = self.raw_node.mut_store();
store.set_hard_state(hs)?;
}
if !ready.persisted_messages().is_empty() {
self.send_messages(ready.take_persisted_messages()).await;
}
let mut light_rd = self.raw_node.advance(ready);
if let Some(commit) = light_rd.commit_index() {
let store = self.raw_node.mut_store();
store.set_hard_state_commit(commit)?;
}
if !light_rd.messages().is_empty() {
self.send_messages(light_rd.take_messages()).await;
}
self.handle_committed_entries(light_rd.take_committed_entries())
.await?;
self.raw_node.advance_apply();
Ok(())
}
RawNode
각 Raft 노드들은 Raft 모듈을 포함하는 RawNode
란 타입의 좀 더 하이레벨의 인스턴스를 갖습니다.
RawNode
는 메모리에만 유지되는 상태인 SoftState
, 영속적인 스토리지에 저장되는 상태인 HardState
와 아직 저장되지 않은 Ready
의 메타 데이터들을 나타내는 records
필드를 갖고 있습니다.
💡 Ready란 Raft 노드를 업데이트 해야 할 필요가 있을 때 갱신되어야 할 데이터들을 한꺼번에 넘겨주는 자료구조입니다.
pub struct RawNode<T: Storage> {
pub raft: Raft<T>,
prev_ss: SoftState,
prev_hs: HardState,
max_number: u64,
records: VecDeque<ReadyRecord>,
commit_since_index: u64,
}
ready()
메서드가 호출될 때 Ready
의 메타 데이터가 records
에 저장되고, 저장되어야 하는 스냅샷, 로그 엔트리 등이 모두 처리된 후 함수의 마지막 부분인 RawNode::advance()
에서 RawNode::commit_ready()
를 호출하며 버퍼 Unstable
의 스냅샷, 엔트리를 비웁니다.
RaftNode
RaftNode
는 raftify에서 RawNode
를 네트워크, 스토리지 계층과 통합해 좀 더 하이 레벨에서 추상화하는 타입입니다.
raftify는 별개의 비동기 태스크에서 gRPC 클라이언트에서 보낸 메시지들을 수신하여, 채널을 통해 RaftNode.run()
태스크에 이 메시지들을 넘겨줍니다.
메시지를 처리하고 난 후엔 on_ready()
란 이름의 메서드(Raft loop)에서 상태 변경을 처리합니다.
pub async fn run(mut self) -> Result<()> {
let mut tick_timer = Duration::from_secs_f32(self.config.tick_interval);
let fixed_tick_timer = tick_timer;
let mut now = Instant::now();
loop {
...
tokio::select! {
msg = timeout(fixed_tick_timer, self.server_rcv.recv()) => {
if let Ok(Some(msg)) = msg {
self.handle_server_request_msg(msg).await?;
}
}
...
}
let elapsed = now.elapsed();
now = Instant::now();
if elapsed > tick_timer {
tick_timer = Duration::from_millis(100);
self.raw_node.tick();
} else {
tick_timer -= elapsed;
}
self.on_ready().await?
}
}
좀 더 raftify의 구현에 대해 자세히 설명해보자면 raftify는 아래와 같은 과정을 반복 처리합니다.
- 클라이언트에서 요청 생성. (예를 들어
RaftServiceClient.propose()
나RaftNode.propose()
를 호출) - gRPC를 통해 원격 Raft 노드의
RaftServiceClient.propose()
가 호출됨. RaftServiceClient.propose()
가 채널을 통해Propose
메시지를RaftNode.run()
비동기 태스크로 넘김.- 메시지 큐를 폴링하던
RaftNode.run()
은Propose
메시지가 들어오면RawNode.propose()
호출. - 상태 머신에 적용되어야 하는 변경 사항이 생기면
Ready
인스턴스가 생성되어on_ready()
핸들러로 전달됨. on_ready()
핸들러에서 커밋된 엔트리들을 처리한 후 클라이언트에 응답함.
이론적인 내용들은 이쯤에서 마무리하고 시나리오 몇 개를 분석해보며 어떤 일들이 일어나는지 살펴봅시다.
💡 이 단락에서 Propose 메시지라고 임의로 칭한 것은 클러스터에 상태 변경을 제안하기 위한 목적으로 정의된 타입의 메시지입니다.
시나리오 분석
1 - 새 로그 엔트리 추가
리더 노드 분석
상태 머신을 변경하기 위해 클러스터에 변경 사항을 요청하면 (propose) 내부에서 어떤 일이 일어날까요?
이 섹션에선 RawNode.propose()
를 호출했을 때 어떤 과정을 거치게 되는지 하나씩 분석해보겠습니다.
RawNode.propose()
함수를 살펴보면 아래와 같습니다.
pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
let mut m = Message::default();
m.set_msg_type(MessageType::MsgPropose);
m.from = self.raft.id;
let mut e = Entry::default();
e.data = data.into();
e.context = context.into();
m.set_entries(vec![e].into());
self.raft.step(m)
}
위 코드를 통해 propose()
함수는 step()
을 호출해 MsgPropose
타입의 메시지를 처리하도록 만드는 것을 알 수 있습니다.
여기서 step()
은 raft-rs의 실질적인 메시지 핸들러에 해당하는 함수입니다. step()
을 호출한 노드가 리더인 경우 step_leader()
, 팔로워인 경우 step_follower()
, 후보자인 경우 step_candidate()
가 호출됩니다.
step()
의 코드를 모두 이해하는 것은 다소 복잡하기 때문에 여기선 리더 노드에서 MsgPropose
타입이 어떻게 처리되는지 코드를 따라가봅시다.
fn step_leader(&mut self, mut m: Message) -> Result<()> {
...
match m.get_msg_type() {
MessageType::MsgPropose => {
...
if !self.append_entry(m.mut_entries()) {
...
}
self.bcast_append();
return Ok(());
}
...
}
}
Raft.append_entry()
는 RaftLog.append()
를 호출해 엔트리들을 추가합니다. RaftLog.append()
는 self.unstable.truncate_and_append()
에서 Unstable 버퍼에 엔트리들을 추가합니다. 버퍼에 추가된 엔트리들은 Raft loop에서 Stable storage에 persist 될 것입니다.
pub fn append(&mut self, ents: &[Entry]) -> u64 {
...
self.unstable.truncate_and_append(ents);
self.last_index()
}
그 다음으로 호출되는 bcast_append()
에 대해 살펴보도록 하겠습니다.
이전 섹션에서 설명한, 리더와 팔로워들의 로그 엔트리들을 동기화하기 위한 ProgressTracker
(prs
)를 통해 각 팔로워의 progress
를 인자로 RaftCore.send_append()
를 호출하는 것을 볼 수 있습니다.
pub fn bcast_append(&mut self) {
let self_id = self.id;
let core = &mut self.r;
let msgs = &mut self.msgs;
self.prs
.iter_mut()
.filter(|&(id, _)| *id != self_id)
.for_each(|(id, pr)| core.send_append(*id, pr, msgs));
}
send_append()
는 아래와 같은 간략한 구조를 갖고 있습니다.
fn send_append(&mut self, to: u64, pr: &mut Progress, msgs: &mut Vec<Message>) {
self.maybe_send_append(to, pr, true, msgs);
}
maybe_send_append()
는 RaftLog.entries
를 통해 pr.next_idx ~ to 범위의 로그 엔트리들을 읽어온 후 prepare_send_entries()
에 넘겨주며 성공하면 true, 실패하면 false를 리턴합니다.
fn maybe_send_append(
&mut self,
to: u64,
pr: &mut Progress,
allow_empty: bool,
msgs: &mut Vec<Message>,
) -> bool {
...
let ents = self.raft_log.entries(
pr.next_idx,
self.max_msg_size,
GetEntriesContext(GetEntriesFor::SendAppend {
to,
term: self.term,
aggressively: !allow_empty,
}),
);
...
match (term, ents) {
(Ok(term), Ok(mut ents)) => {
if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
return true;
}
self.prepare_send_entries(&mut m, pr, term, ents)
}
...
}
...
self.send(m, msgs);
true
}
prepare_send_entries()
는 메시지 객체 m을 MsgAppend
타입으로 만들고 엔트리들을 메시지에 넣어줍니다. 그 후 progress
를 업데이트 해 준 후 리턴합니다.
fn prepare_send_entries(
&mut self,
m: &mut Message,
pr: &mut Progress,
term: u64,
ents: Vec<Entry>,
) {
m.set_msg_type(MessageType::MsgAppend);
m.index = pr.next_idx - 1;
m.log_term = term;
m.set_entries(ents.into());
m.commit = self.raft_log.committed;
if !m.entries.is_empty() {
let last = m.entries.last().unwrap().index;
pr.update_state(last);
}
}
그리고 self.send(m, msgs)
에서 이 준비한 메시지를 msgs
메시지 큐에 넣어 줍니다.
fn send(&mut self, mut m: Message, msgs: &mut Vec<Message>) {
...
msgs.push(m);
}
메시지 큐에 들어간 MsgAppend
메시지는 네트워크 계층을 통해 send_messages()
에서 팔로워 노드로 전송되게 됩니다. 따라서, 우리는 팔로워 노드가 MsgAppend
메시지를 받은 후 어떻게 처리하는지를 봐야 합니다.
팔로워 노드 분석
다음으로 팔로워 노드에서 일어나는 일을 살펴보면 아래와 같습니다. 팔로워 노드에서 MsgAppend
메시지를 수신했을 때 일어나는 일을 알아보려면 step_follower()
를 보면 됩니다.
fn step_follower(&mut self, mut m: Message) -> Result<()> {
match m.get_msg_type() {
...
MessageType::MsgAppend => {
self.election_elapsed = 0;
self.leader_id = m.from;
self.handle_append_entries(&m);
}
...
}
}
위 코드를 통해 MsgAppend
메시지를 수신한 팔로워 노드가 handle_append_entries()
를 호출하고 있는 것을 알 수 있습니다.
이 함수는 아래처럼 MsgAppendResponse
타입의 메시지인 to_send
를 만들고 RaftLog.maybe_append()
를 호출합니다.
pub fn handle_append_entries(&mut self, m: &Message) {
...
let mut to_send = Message::default();
to_send.to = m.from;
to_send.set_msg_type(MessageType::MsgAppendResponse);
if let Some((_, last_idx)) = self
.raft_log
.maybe_append(m.index, m.log_term, m.commit, &m.entries)
{
...
// MsgAppend 메시지를 수신
} else {
...
// MsgAppend 메시지를 거절
}
...
self.r.send(to_send, &mut self.msgs);
}
이 함수는 아래처럼 match_term()
을 호출해 메시지의 logTerm
과 로그 엔트리의 term
값이 같은지 확인하고, find_conflict()
를 호출해 로그 엔트리 시퀸스에 충돌이 있는지 검사한 후 문제가 없다고 판단하면 Raft.append()
를 호출합니다.
pub fn maybe_append(
&mut self,
idx: u64,
term: u64,
committed: u64,
ents: &[Entry],
) -> Option<(u64, u64)> {
if self.match_term(idx, term) {
let conflict_idx = self.find_conflict(ents);
if conflict_idx == 0 {
} else if conflict_idx <= self.committed {
fatal!(
self.unstable.logger,
"entry {} conflict with committed entry {}",
conflict_idx,
self.committed
)
} else {
let start = (conflict_idx - (idx + 1)) as usize;
self.append(&ents[start..]);
if self.persisted > conflict_idx - 1 {
self.persisted = conflict_idx - 1;
}
}
let last_new_index = idx + ents.len() as u64;
self.commit_to(cmp::min(committed, last_new_index));
return Some((conflict_idx, last_new_index));
}
None
}
우리는 이 함수를 본 적이 있습니다. 리더 노드에서 로그 엔트리가 제안되었을 때 RaftLog.append()
의 호출 전 마지막으로 호출된 함수였죠.
이전과 마찬가지로 Raft.append_entry()
는 RaftLog.append()
를 호출해 엔트리들을 추가합니다. RaftLog.append()
는 self.unstable.truncate_and_append()
에서 Unstable 버퍼에 엔트리들을 append 합니다.
이것으로 리더에 추가된 로그가 리더 노드에 persist 되고 팔로워 노드에 복사되는 시나리오를 간략하게 알아보았습니다.
2 - 리더와 팔로워 노드 로그 시퀀스 불일치 시
우리는 시나리오 1에서 정상적인 상황을 가정하고 코드를 들여다보았습니다. 하지만 실제로는 네트워크 단절 등의 이슈로 리더 노드와 팔로워 노드에 불일치가 생길 수 있습니다. 이번엔 리더 노드와 팔로워 노드 사이에 불일치가 생겼을 때 이를 어떻게 감지하고 해소하는지를 중심으로 다시 한번 코드를 들여다보겠습니다.
3개의 노드로 이루어진 클러스터가 연속해서 상태 머신을 변경하는 수천 개의 요청을 처리하다가 네트워크 장애가 발생했다고 가정해봅시다.
장애가 발생한 경우 코드부터 보는 게 아니라 우선 노드들에 출력된 로그들과 persist된 로그 엔트리들, 디버깅 정보들을 먼저 들여다 보며 맥락을 파악하는 것부터 시작해야 하지만, 글이 지나치게 장황해지는 것을 피하기 위해 노드들에 어떤 일들이 발생하고 있는지 대략적으로 파악하게 해 줄 로그만 골라서 분석해보겠습니다.
우선 3번 노드에선 메시지를 reject 했음을 나타내는 rejected msgApp... 로그를 남기고 있습니다.
Nov 28 05:30:59.233 DEBG rejected msgApp [logterm: 7, index: 3641] from 2, logterm: Ok(0), index: 3641, from: 2, msg_index: 3641, msg_log_term: 7
위 로그를 통해 3번 노드는 팔로워 노드, 2번 노드가 장애가 발생한 후 새로 선출된 리더 노드이며 3641 번째 엔트리를 복제하려는 MsgAppend
메시지가 거절되었다는 것을 알 수 있습니다.
이 로그가 어떤 함수에서 출력된 것인지 찾아보면, 시나리오 1에서 살펴 보았었던 handle_append_entries()
에서 호출하는 것을 알 수 있는데요. (팔로워가 리더로부터 받은 MsgAppend
메시지를 처리하는 함수)
pub fn handle_append_entries(&mut self, m: &Message) {
...
let mut to_send = Message::default();
to_send.to = m.from;
to_send.set_msg_type(MessageType::MsgAppendResponse);
...
if let Some((_, last_idx)) = self
.raft_log
.maybe_append(m.index, m.log_term, m.commit, &m.entries)
{
...
} else {
debug!(
self.logger,
"rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \
from {from}",
msg_log_term = m.log_term,
msg_index = m.index,
from = m.from;
"index" => m.index,
"logterm" => ?self.raft_log.term(m.index),
);
let hint_index = cmp::min(m.index, self.raft_log.last_index());
let (hint_index, hint_term) =
self.raft_log.find_conflict_by_term(hint_index, m.log_term);
if hint_term.is_none() {
fatal!(
self.logger,
"term({index}) must be valid",
index = hint_index
)
}
to_send.index = m.index;
to_send.reject = true;
to_send.reject_hint = hint_index;
to_send.log_term = hint_term.unwrap();
}
to_send.set_commit(self.raft_log.committed);
self.r.send(to_send, &mut self.msgs);
}
함수를 살펴보면 이 로그가 출력되었다는 것에서 maybe_append()
가 None을 리턴했다는 것, 즉 match_term()
이 False를 반환했다는 것을 알 수 있습니다. 이것은 메시지의 logTerm
과 3641번 엔트리의 term
값에 불일치가 발생했다는 것을 의미합니다.
따라서 term
을 통해 충돌한 지점을 찾고 (find_conflict_by_term()
) 충돌한 지점(hint_index
)을 메시지의 reject_hint
에 넣어 리더에 MsgAppendResponse
메시지 형태로 되돌려 줍니다.
그럼 리더는 이 거절된 MsgAppendResponse
메시지를 어떻게 처리할까요?
메시지를 거절한 리더 노드는 아래와 같은 MsgAppend
가 거절했다는 로그를 남기게 됩니다.
Nov 28 05:30:59.279 DEBG received msgAppend rejection, index: 3641, from: 3, reject_hint_term: 7, reject_hint_index: 3611
따라서 우리가 그 다음으로 들여다 보아야 하는 것은 이 거절된 MsgAppend
메시지를 받은 후 "received msgAppend rejection"를 출력하는 함수입니다.
이 함수는 handle_append_response()
인데요, 함수 자체는 꽤 길지만 MsgAppend
메시지가 reject 되었을 때의 처리만 잘라놓고 보면 그리 길지 않습니다.
fn handle_append_response(&mut self, m: &Message) {
let mut next_probe_index: u64 = m.reject_hint;
...
if m.reject {
debug!(
self.r.logger,
"received msgAppend rejection";
"reject_hint_index" => m.reject_hint,
"reject_hint_term" => m.log_term,
"from" => m.from,
"index" => m.index,
);
if pr.maybe_decr_to(m.index, next_probe_index, m.request_snapshot) {
debug!(
self.r.logger,
"decreased progress of {}",
m.from;
"progress" => ?pr,
);
if pr.state == ProgressState::Replicate {
pr.become_probe();
}
self.send_append(m.from);
}
return;
}
...
}
메시지의 reject_hint
를 가져와 next_probe_index
로 만들고, Progress.maybe_decr_to()
를 호출해 progress를 감소시킵니다. Progress
가 Probe 상태임을 표시하고, send_append()
를 호출해 다시 MsgAppend
메시지를 보내줍니다.
💡 ProgressState는 각 노드들의 동기화 진행 상태를 나타내는 enum 입니다. 정상적인 상황, 로그를 복제하고 있는 상태에선 "Replicate" 이며, 복제된 마지막 인덱스를 모르고 있는 팔로워 노드는 조사 중인 상태란 의미로 "Probe", 스냅샷 전송을 통해 팔로워에 로그를 복제 중인 경우 "Snapshot"입니다.
요약하자면 충돌이 발생하기 전 로그 엔트리의 인덱스 (next_probe_index
)를 찾기 위해 해당 노드의 progress
를 감소시키고 다시 MsgAppend
메시지를 보낸다는 것입니다. 이 과정은 리더와 팔로워 노드의 Common log prefix를 찾게 될 때까지 반복됩니다.
Common log prefix를 찾게 되면, 해당 인덱스 이후의 로그 엔트리들은 리더로부터 팔로워로 단방향으로 복제되어 덮어쓰게 됩니다. 이 과정은 maybe_send_append()
함수에서 확인할 수 있습니다.
아래와 같이 RaftLog.entries
를 통해 얻어진 로그 엔트리들이 SendAppend
컨텍스트로 복제됩니다. 이때 max_msg_size
는 Config의 max_size_per_msg
이며, 이 값의 디폴트 값은 0
입니다. RaftLog.entries
를 통해 LMDBStorage.entries()
의 (RaftLog
의 T에 해당하는 persistent 스토리지 타입) 인자의 max_size
에 0
이 주어지는데, 이 주석을 토대로 생각해보면 이것의 의미는 따로 설정해 주지 않으면 리더와 팔로워 노드의 로그에 불일치가 발생했을 때 로그 엔트리를 한 개씩 동기화하란 의미임을 알 수 있습니다.
이후엔 이전 섹션에서 설명한 것과 같이 prepare_send_entries()
를 통해 MsgAppend
메시지를 준비하고, Raft.send()
를 통해 팔로워 노드로 로그 엔트리들을 복제하기 위한 메시지가 전달됩니다.
fn maybe_send_append(
&mut self,
to: u64,
pr: &mut Progress,
allow_empty: bool,
msgs: &mut Vec<Message>,
) -> bool {
...
let mut m = Message::default();
m.to = to;
if pr.pending_request_snapshot != INVALID_INDEX {
...
} else {
let ents = self.raft_log.entries(
pr.next_idx,
self.max_msg_size,
GetEntriesContext(GetEntriesFor::SendAppend {
to,
term: self.term,
aggressively: !allow_empty,
}),
);
...
let term = self.raft_log.term(pr.next_idx - 1);
match (term, ents) {
(Ok(term), Ok(mut ents)) => {
if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
return true;
}
self.prepare_send_entries(&mut m, pr, term, ents)
}
...
}
}
self.send(m, msgs);
true
}
중간에 많은 로그들이 생략되어 있지만 리더와 팔로워 사이에 3612 번째 엔트리부터 3642번째 엔트리 까지 위와 같은 과정을 거쳐 동기화가 일어난 후, 팔로워 노드로의 복제가 모두 끝나면 해당 팔로워의 ProgressState
가 Replicate
로 변하며, 정상적으로 Heartbeat
메시지를 주고 받기 시작하는 것을 알 수 있습니다.
2023-11-28 14:30:59,269 - INFO - Entries [3612, 3643) requested
Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3611, entries: [Entry { context: "1810", data: "{'key': '2292', 'value': '1'}", entry_type: EntryNormal, index: 3612, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
2023-11-28 14:30:59,259 - INFO - Entries [3613, 3643) requested
Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3612, entries: [Entry { context: "1811", data: "{'key': '2294', 'value': '1'}", entry_type: EntryNormal, index: 3613, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
2023-11-28 14:30:59,259 - INFO - Entries [3614, 3643) requested
Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3613, entries: [Entry { context: "1812", data: "{'key': '2295', 'value': '1'}", entry_type: EntryNormal, index: 3614, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
2023-11-28 14:30:59,259 - INFO - Entries [3615, 3643) requested
Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3614, entries: [Entry { context: "1813", data: "{'key': '2296', 'value': '1'}", entry_type: EntryNormal, index: 3615, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
...
2023-11-28 14:30:59,284 - INFO - Entries [3641, 3643) requested
Nov 28 05:30:59.283 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3640, entries: [Entry { context: "1839", data: "{'key': '2457', 'value': '1'}", entry_type: EntryNormal, index: 3641, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
2023-11-28 14:30:59,284 - INFO - Entries [3642, 3643) requested
Nov 28 05:30:59.284 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3641, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 3642, sync_log: false, term: 12 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
Nov 28 05:31:01.635 DEBG Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeat, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 2
Nov 28 05:31:01.635 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgHeartbeat, to: 3, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
2023-11-28 14:31:01,637
3 - 리더 선출
시나리오 2에서 네트워크 장애로 인해 리더 선출이 일어났었던 것을 term
값의 증가를 통해 확인할 수 있었는데요, 이 시나리오에선 이 리더 선출 과정에 대해 자세히 들여다보도록 하겠습니다.
리더에 장애가 생긴 경우 어떤 로그들이 찍히게 되는지 확인하기 위해 간단하게 3개의 노드로 이뤄진 클러스터를 만들고 리더 프로세스를 강제로 종료시켜 본 후 새로 리더로 선출되는 프로세스의 로그를 들여다보겠습니다.
로그의 내용을 요약해보자면 리더 노드가 종료된 후, 3번 노드에서 선거를 시작하고 후보자(Candidate) 상태로 전이한 후 다른 voter들에게 MsgRequestVote
메시지를 보냅니다. 2번 노드로부터 MsgRequestVoteResponse
메시지를 받고, 자신은 본인에게 투표하기 때문에 과반수 이상의 투표를 받게 되어 새 리더로 선출된 후 term 값을 2로 증가시키고 자신이 리더로 선출되었음을 알리기 위한 특수한 종류의 메시지(Empty MsgAppend
)를 보내는 과정이라고 요약할 수 있습니다.
💡 election_tick 만큼 heartbeat 메시지를 받지 못한 팔로워 노드가 투표를 시작하게 됩니다. 이 때 투표 분열(Split vote)을 방지하기 위해 election_tick은 매번 min_election_tick ~ max_election_tick 사이에서 무작위 값으로 결정됩니다. 따라서 리더 노드가 종료된 후 나머지 두 노드들 중 어떤 노드라도 리더 노드가 될 수 있으며 이는 더 작은 election_tick을 가진 노드로 선출됩니다.
Nov 29 01:30:30.210 INFO starting a new election, term: 1
Nov 29 01:30:30.210 DEBG reset election timeout 16 -> 10 at 0, election_elapsed: 0, timeout: 10, prev_timeout: 16
Nov 29 01:30:30.210 INFO became candidate at term 2, term: 2
Nov 29 01:30:30.210 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgRequestVote, to: 1, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3
Nov 29 01:30:30.210 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgRequestVote, to: 2, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3
Nov 29 01:30:30.211 INFO broadcasting vote request, to: [1, 2], log_index: 3, log_term: 1, term: 2, type: MsgRequestVote
2023-11-29 10:30:30,217 - WARNING - Failed to connect to node 1 elapsed from first failure: 0.0000s. Err message: <AioRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused"
debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused {created_time:"2023-11-29T10:30:30.216855+09:00", grpc_status:14}"
>
2023-11-29 10:30:30,222 - DEBUG - Node 3 received Raft message from the node 2, Message: Message { msg_type: MsgRequestVoteResponse, to: 3, from: 2, term: 2, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: "None", metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }
Nov 29 01:30:30.223 INFO received votes response, term: 2, type: MsgRequestVoteResponse, approvals: 2, rejections: 0, from: 2, vote: true
Nov 29 01:30:30.223 TRCE ENTER become_leader
Nov 29 01:30:30.223 DEBG reset election timeout 10 -> 17 at 0, election_elapsed: 0, timeout: 17, prev_timeout: 10
Nov 29 01:30:30.223 TRCE Entries being appended to unstable list, ents: Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }
Nov 29 01:30:30.223 INFO became leader at term 2, term: 2
Nov 29 01:30:30.223 TRCE EXIT become_leader
Nov 29 01:30:30.223 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgAppend, to: 1, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3
Nov 29 01:30:30.223 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3
그럼 이제 로그 내용을 바탕으로 코드에서 어떤 일들이 벌어지고 있는지 알아 봅시다.
우선 "starting a new election" 이라는 로그를 출력하고 있는 함수는 hup()
입니다.
hup()
는 step()
의 MsgHup
타입, step_follower()
의 MsgTimeoutNow
타입의 메시지에 대한 처리 과정에서 호출됩니다.
여기서 MsgTimeoutNow
메시지는 Leader election이 아닌, Leader transfer에 사용되는 메시지 타입입니다. 즉 리더가 MsgTransferLeader
메시지를 받게 되면 팔로워들에게 MsgTimeoutNow
타입의 메시지를 전송하게 되고 transfer_leader
플래그를 True로 둔 채 hup()
함수가 실행되게 됩니다. Leader election은 리더 장애 등의 상황으로 리더를 새로 선출하는 과정이지만, Leader transfer은 리더 프로세스가 다른 팔로워 프로세스에게 리더를 양도하는 과정입니다.
그러므로 우리가 지금 따라가보아야 하는 메시지는 MsgHup
임을 알 수 있습니다. election_tick
이 지났는데도 Heartbeat를 받지 못했기 때문에 리더 선출을 시작했다는 점을 통해 MsgHup
메시지를 넣어준 것이 아래 tick_election()
함수인 것을 추측해 볼 수 있습니다.
RaftNode
에서 tick_timer
마다 self.raw_node.tick()
을 호출했던 것을 기억하시나요? 이 RawNode.tick()
을 통해 노드가 election_elapsed
가 randomized_election_timeout
를 경과한 경우 자기 자신에게 MsgHup
메시지를 step 하게 되는 것입니다. (여기서 election_elapsed
을 랜덤화하는 것은 모든 노드가 동시에 투표를 시작해 모든 노드가 자기 자신에게 투표하는 상황을 방지하기 위한 것입니다.)
// raw_node.rs
pub fn tick(&mut self) -> bool {
self.raft.tick()
}
// raft.rs
pub fn tick(&mut self) -> bool {
match self.state {
StateRole::Follower | StateRole::PreCandidate | StateRole::Candidate => {
self.tick_election()
}
StateRole::Leader => self.tick_heartbeat(),
}
}
// raft.rs
pub fn tick_election(&mut self) -> bool {
self.election_elapsed += 1;
if !self.pass_election_timeout() || !self.promotable {
return false;
}
self.election_elapsed = 0;
let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id));
let _ = self.step(m);
true
}
// raft.rs
pub fn step(&mut self, m: Message) -> Result<()> {
...
match m.get_msg_type() {
...
MessageType::MsgHup => {
self.hup(false)
},
}
}
// raft.rs
pub fn pass_election_timeout(&self) -> bool {
self.election_elapsed >= self.randomized_election_timeout
}
hup()
함수는 간단하게 요약해보자면 아래처럼 campaign()
함수를 CAMPAIGN_ELECTION
타입으로 실행합니다.
fn hup(&mut self, transfer_leader: bool) {
...
info!(
self.logger,
"starting a new election";
"term" => self.term,
);
...
self.campaign(CAMPAIGN_ELECTION);
}
campaign()
함수는 아래처럼 자신의 상태를 Candidate
상태로 전이시킨 후 투표를 시작합니다.
우선 self_id
는 이름대로 노드 자신의 id입니다. 따라서 self.poll(self_id, vote_msg, true)
는 자기 자신에게 투표한다는 의미입니다.
이 결과가 VoteResult::Won
인 경우 그대로 투표에서 승리하며 노드 본인이 리더가 되고 리턴합니다.
따라서 MsgRequestVote
, MsgRequestVoteResponse
등의 메시지는 싱글 노드 클러스터에서 오고 가지 않을 것임을 알 수 있습니다.
하지만 물론 이 시나리오는 싱글 노드 클러스터가 아니기 때문에 경우에 해당하지 않습니다.
pub fn campaign(&mut self, campaign_type: &'static [u8]) {
let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
...
} else {
self.become_candidate();
(MessageType::MsgRequestVote, self.term)
};
let self_id = self.id;
if VoteResult::Won == self.poll(self_id, vote_msg, true) {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster).
return;
}
...
}
campaign()
의 뒷부분을 더 들여다보기 전에 poll()
은 어떻게 동작하는 것인지 알아봅시다.
poll()
은 아래처럼 record_vote()
, tally_votes()
를 호출하는 함수이며, 투표 결과에 따라 투표에서 승리했다면 리더 노드로 전이한 후, 자신이 클러스터의 새 리더라는 것을 브로드캐스팅 (bcast_append()
) 합니다.
투표에서 진 경우 팔로워 노드로 전이하며, 결과가 Pending
인 경우 아무일도 수행하지 않고 리턴합니다.
fn poll(&mut self, from: u64, t: MessageType, vote: bool) -> VoteResult {
self.prs.record_vote(from, vote);
let (gr, rj, res) = self.prs.tally_votes();
if from != self.id {
info!(
self.logger,
"received votes response";
"vote" => vote,
"from" => from,
"rejections" => rj,
"approvals" => gr,
"type" => ?t,
"term" => self.term,
);
}
match res {
VoteResult::Won => {
if self.state == StateRole::PreCandidate {
self.campaign(CAMPAIGN_ELECTION);
} else {
self.become_leader();
self.bcast_append();
}
}
VoteResult::Lost => {
let term = self.term;
self.become_follower(term, INVALID_ID);
}
VoteResult::Pending => (),
}
res
}
record_vote()
의 역할은 아주 단순합니다. id
값을 가진 노드가 자기 자신에게 투표했을 때 ProgressTracker
의 해시맵 객체 votes
에 기록하는 함수입니다.
pub fn record_vote(&mut self, id: u64, vote: bool) {
self.votes.entry(id).or_insert(vote);
}
tally_votes
를 봅시다. 해시맵 votes
를 통해 자기 자신에게 투표한 노드의 수와 거절한 노드의 수를 세서 튜플 형태로 리턴해주고 있는 것을 볼 수 있습니다.
💡 "tally"라는 단어는 점수를 세거나 집계하는 행위를 의미합니다. 즉 "tally_votes"는 투표를 세서 집계하는 함수입니다.
pub fn tally_votes(&self) -> (usize, usize, VoteResult) {
let (mut granted, mut rejected) = (0, 0);
for (id, vote) in &self.votes {
if !self.conf.voters.contains(*id) {
continue;
}
if *vote {
granted += 1;
} else {
rejected += 1;
}
}
let result = self.vote_result(&self.votes);
(granted, rejected, result)
}
투표 결과를 어떻게 판단하는지 들여다볼까요?
조인트 쿼럼의 경우 두 쿼럼 (Incoming quorum, Outgoing quorum)의 동의를 모두 얻어야 투표에서 승리할 수 있습니다.
따라서 우리는 아래 세 vote_result()
함수를 들여다봐야 합니다.
tracker.rs에선 해시맵 votes
를 통해 노드 id
가 자신에게 투표했는지 알 수 있게 해 주는 콜백 함수 check
를 인자로 넘겨 줍니다.
joint.rs에선 두 구성에서 모두 승리한 경우에만 VoteResult::Won
를 리턴하고, 한 쪽에서라도 투표에서 졌다면 VoteResult::Lost
를 리턴합니다. 그 외의 경우인 경우 VoteResult::Pending
를 리턴합니다.
득표 수를 실제로 카운트하는 작업은 majority.rs의 vote_result()
에서 진행됩니다.
클러스터의 voter들 중 자기 자신에게 투표한 노드의 수와 투표하지 않은 노드의 수를 세서 과반수보다 많은 노드들이 동의한 경우 VoteResult::Won
, 과반수 이상의 투표를 얻지 못했지만 응답을 보내주지 못한 노드까지 포함시켰을 때 과반수를 넘는다면 VoteResult::Pending
, 그 이외의 경우 VoteResult::Lost
를 반환합니다.
// tracker.rs
pub fn vote_result(&self, votes: &HashMap<u64, bool>) -> VoteResult {
self.conf.voters.vote_result(|id| votes.get(&id).cloned())
}
// joint.rs
pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
let i = self.incoming.vote_result(&check);
let o = self.outgoing.vote_result(check);
match (i, o) {
// It won if won in both.
(VoteResult::Won, VoteResult::Won) => VoteResult::Won,
// It lost if lost in either.
(VoteResult::Lost, _) | (_, VoteResult::Lost) => VoteResult::Lost,
// It remains pending if pending in both or just won in one side.
_ => VoteResult::Pending,
}
}
// majority.rs
pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
...
let (mut yes, mut missing) = (0, 0);
for v in &self.voters {
match check(*v) {
Some(true) => yes += 1,
None => missing += 1,
_ => (),
}
}
let q = crate::majority(self.voters.len());
if yes >= q {
VoteResult::Won
} else if yes + missing >= q {
VoteResult::Pending
} else {
VoteResult::Lost
}
}
// util.rs
pub fn majority(total: usize) -> usize {
(total / 2) + 1
}
투표 과정이 votes
해시맵을 기반으로 어떻게 진행되는지 살펴보았습니다. 하지만 이 과정을 밟기 전 MsgRequestVote
, MsgRequestVoteResponse
메시지를 통해 이 해시맵이 적절하게 업데이트 되어야 합니다.
따라서 campaign()
함수를 계속 따라가 보도록 합시다.
campaign()
함수가 MsgRequestVote
타입의 메시지를 만들어 voter들에게 전송하고 있다는 것을 알 수 있습니다.
따라서 그 다음으론 MsgRequestVote
메시지의 핸들러를 따라가 봅시다.
pub fn campaign(&mut self, campaign_type: &'static [u8]) {
let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
...
} else {
self.become_candidate();
(MessageType::MsgRequestVote, self.term)
};
let self_id = self.id;
if VoteResult::Won == self.poll(self_id, vote_msg, true) {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster).
return;
}
// Only send vote request to voters.
for id in self.prs.conf().voters().ids().iter() {
if id == self_id {
continue;
}
...
let mut m = new_message(id, vote_msg, None);
m.term = term;
m.index = self.raft_log.last_index();
m.log_term = self.raft_log.last_term();
m.commit = commit;
m.commit_term = commit_term;
...
self.r.send(m, &mut self.msgs);
}
...
}
얼핏 보면 복잡해 보이지만 결국 MsgRequestVote
메시지의 핸들러가 하는 일은 이 투표에 동의하거나 동의하지 않는다는 메시지를 만들어 전송해주는 일입니다.
vote_resp_msg_type
에 따라 우리가 보낸 메시지 타입은 MsgRequestVote
이므로 응답 메시지의 타입은 MsgRequestVoteResponse
가 될 것입니다. (이 글에선 prevote 알고리즘에 대한 설명은 생략합니다)
그럼 노드가 언제 투표에 동의하고 언제 동의하지 않는지 살펴봅시다. 주석과 함께 코드를 찬찬히 살펴보면 투표에 동의하기 위해선 아래 세 조건이 만족되어야 함을 알 수 있습니다.
-
can_vote
가 true (이미 해당 노드에 투표한 경우이거나, 이번term
에서의leader_id
를 모르고 아직 투표 하지 않은 경우) -
self.raft_log.is_up_to_date
가 true (메시지의term
값이RaftLog.last_term
보다 크거나, 만약 같다면RaftLog.last_index
보다 메시지의 인덱스가 큰 경우) -
메시지의 인덱스가
RaftLog.last_index
보다 크거나, 더 높은 우선 순위를 갖는 경우
이 세 조건이 만족된 경우 Vote에 동의하며 만족되지 않는 조건이 있다면 Vote를 거절한다는 메시지를 보냅니다.
그럼 이제 MsgRequestVoteResponse
의 수신부로 넘어가봅시다.
// raft.rs
pub fn step(&mut self, m: Message) -> Result<()> {
...
match m.get_msg_type() {
MessageType::MsgRequestVote => {
// We can vote if this is a repeat of a vote we've already cast...
let can_vote = (self.vote == m.from) ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(self.vote == INVALID_ID && self.leader_id == INVALID_ID)
// ...and we believe the candidate is up to date.
if can_vote
&& self.raft_log.is_up_to_date(m.index, m.log_term)
&& (m.index > self.raft_log.last_index() || self.priority <= get_priority(&m))
{
self.log_vote_approve(&m);
let mut to_send =
new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
to_send.reject = false;
to_send.term = m.term;
self.r.send(to_send, &mut self.msgs);
if m.get_msg_type() == MessageType::MsgRequestVote {
// Only record real votes.
self.election_elapsed = 0;
self.vote = m.from;
}
} else {
self.log_vote_reject(&m);
let mut to_send =
new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
to_send.reject = true;
to_send.term = self.term;
let (commit, commit_term) = self.raft_log.commit_info();
to_send.commit = commit;
to_send.commit_term = commit_term;
self.r.send(to_send, &mut self.msgs);
self.maybe_commit_by_vote(&m);
}
}
}
}
// raft.rs
pub fn vote_resp_msg_type(t: MessageType) -> MessageType {
match t {
MessageType::MsgRequestVote => MessageType::MsgRequestVoteResponse,
MessageType::MsgRequestPreVote => MessageType::MsgRequestPreVoteResponse,
_ => panic!("Not a vote message: {:?}", t),
}
}
// raft_log.rs
pub fn is_up_to_date(&self, last_index: u64, term: u64) -> bool {
term > self.last_term() || (term == self.last_term() && last_index >= self.last_index())
}
MsgRequestVoteResponse
메시지 핸들러는 매우 단순합니다!
우리가 아까 봤었던 poll()
함수를 호출하여 votes
해시맵을 업데이트 하고 투표 결과가 결정된 경우 StateRole
을 업데이트 합니다.
fn step_candidate(&mut self, m: Message) -> Result<()> {
match m.get_msg_type() {
...
MessageType::MsgRequestVoteResponse => {
...
self.poll(m.from, m.get_msg_type(), !m.reject);
self.maybe_commit_by_vote(&m);
}
}
}
정리
이 글에선 raft-rs에서 사용되는 타입들을 바탕으로 코드 아키텍쳐를 살펴본 후, 세 가지 기초적인 시나리오를 바탕으로 raft 구현체의 코드를 따라가며 분석해보았습니다. 이 글이 raft 모듈에 대한 이해를 넓히는데 도움이 되었기를 바랍니다. 다음 글에선 좀 더 다양한 시나리오들을 통해 raft 구현체의 작동 방식을 보다 깊이 살펴보도록 하겠습니다.
감사합니다 😊