Engineering
Nov 29, 2023
Raft Consensus algorithm for Backend.AI: Leader election
Jeongseok Kang
Researcher
High availability (HA) has become an indispensable concept when talking about modern applications. High availability is the ability of an IT system to remain nearly 100% accessible and reliable at all times by eliminating or minimizing downtime[1]. Backend.AI, which is developed and serviced by Lablup, also employs various methods to maintain high availability.
[Image: Backend.AI architecture]

Background
Backend.AI consists of many different components, including managers and agents, storage proxies, and web servers. Each of these components runs as multiple processes in a distributed environment to increase reliability, especially the manager, which is responsible for scheduling session execution and many core functions of Backend.AI. Currently, the manager has an Active-Active HA structure that ensures high availability through load balancing.
One of the many features of the Backend.AI Manager is event handling. Backend.AI raises various events, such as `AgentStartedEvent` and `DoScheduleEvent`, to track the lifecycle of agents and sessions and to provide optimal scheduling. For example, when a Backend.AI Agent process starts, it generates an `AgentStartedEvent`, and the Backend.AI Manager process receives this event and performs a specific action (`schedule()`). The Backend.AI Manager also raises a `DoScheduleEvent` internally to ensure periodic scheduling. This is where the problem arises. If multiple Backend.AI Manager processes run for high availability and each raises events on its own timer, the result is unnecessary load and an unreliable picture of the overall system's health. To ensure that only one manager process generates events within the same system, the Backend.AI Manager implements a `GlobalTimer`. The `GlobalTimer` uses a distributed lock to guarantee mutual exclusion among processes, so that only one process generates events.
```python
@preserve_termination_log
async def generate_tick(self) -> None:
    try:
        await asyncio.sleep(self.initial_delay)
        if self._stopped:
            return
        while True:
            try:
                async with self._dist_lock:
                    if self._stopped:
                        return
                    await self._event_producer.produce_event(self._event_factory())
                    if self._stopped:
                        return
                    await asyncio.sleep(self.interval)
            except asyncio.TimeoutError:  # timeout raised from etcd lock
                if self._stopped:
                    return
                log.warn("timeout raised while trying to acquire lock. retrying...")
    except asyncio.CancelledError:
        pass
```
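To show how these pieces fit together, here is a hedged usage sketch. The parameter names are inferred from the attributes used in `generate_tick()` above (`_dist_lock`, `_event_producer`, `_event_factory`, `interval`, `initial_delay`); the actual `GlobalTimer` constructor and lifecycle API in Backend.AI may differ.

```python
# Hypothetical wiring of a GlobalTimer; names are inferred from the
# attributes in generate_tick() and may not match the real Backend.AI API.
async def run_global_schedule_timer(dist_lock, event_producer) -> None:
    # dist_lock: an AbstractDistributedLock implementation (e.g., EtcdLock)
    # event_producer: an object exposing `async def produce_event(event)`
    timer = GlobalTimer(
        dist_lock=dist_lock,
        event_producer=event_producer,
        event_factory=lambda: DoScheduleEvent(),
        interval=10.0,       # one cluster-wide tick every 10 seconds
        initial_delay=5.0,   # grace period before the first tick
    )
    # Every manager runs this coroutine, but the distributed lock ensures
    # that only the current lock holder actually produces events.
    await timer.generate_tick()
```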
Currently, Backend.AI provides an interface for distributed locks, [AbstractDistributedLock](https://github.com/lablup/backend.ai/blob/2f90d03c4477eda8e0beeabb7fe4b067c56dae09/src/ai/backend/common/lock.py#L33-L44), and we have developed and are using [FileLock](https://github.com/lablup/backend.ai/blob/2f90d03c4477eda8e0beeabb7fe4b067c56dae09/src/ai/backend/common/lock.py#L47-L142), [EtcdLock](https://github.com/lablup/backend.ai/blob/2f90d03c4477eda8e0beeabb7fe4b067c56dae09/src/ai/backend/common/lock.py#L145-L190) based on the [etcd concurrency API](https://etcd.io/docs/v3.5/dev-guide/api_concurrency_reference_v3/), and [RedisLock](https://github.com/lablup/backend.ai/blob/2f90d03c4477eda8e0beeabb7fe4b067c56dae09/src/ai/backend/common/lock.py#L193-L248) based on [Redis Lock](https://redis.io/docs/manual/patterns/distributed-locks/) as the actual implementations.
etcd is a distributed, open-source key-value store used to store and manage the critical information needed to keep distributed systems running[2], most notably in Kubernetes.
```python
class AbstractDistributedLock(metaclass=abc.ABCMeta):
    def __init__(self, *, lifetime: Optional[float] = None) -> None:
        assert lifetime is None or lifetime >= 0.0
        self._lifetime = lifetime

    @abc.abstractmethod
    async def __aenter__(self) -> Any:
        raise NotImplementedError

    @abc.abstractmethod
    async def __aexit__(self, *exc_info) -> Optional[bool]:
        raise NotImplementedError
```
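To make the contract concrete, here is a minimal, process-local implementation of this interface. This is a toy for illustration only: it ignores `lifetime` and coordinates nothing across machines, unlike the real `FileLock`, `EtcdLock`, and `RedisLock`.

```python
import asyncio
from typing import Any, Optional

class InProcessLock(AbstractDistributedLock):
    """Toy lock: mutual exclusion within a single process only."""

    _shared = asyncio.Lock()  # class-level, shared by all instances in this process

    async def __aenter__(self) -> Any:
        await self._shared.acquire()

    async def __aexit__(self, *exc_info) -> Optional[bool]:
        self._shared.release()
        return None  # do not suppress exceptions
```

Any implementation is used the same way: `async with InProcessLock(): ...` guards the critical section, exactly as `self._dist_lock` does in `generate_tick()` above.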
Requirements
The `GlobalTimer` does a good job of controlling event generation on a per-process basis in a distributed environment. However, requirements are always changing, and the software needs to change with them. This time, the added requirement was to implement a rate limit for requests. With the current load-balancing scheme, we can't guarantee that every request is handled by the same manager, and because the state of each manager is not shared, this can lead to the following problem (condensed into a toy sketch after the list):
1. Set the counters of both managers to 0 and the request count limit to 1.
2. The first request is received by manager 1.
3. Manager 1 increments its counter by 1. (C1: 0 -> 1)
4. Manager 1's counter has reached the maximum allowed number of requests, so its next request would be rejected.
5. Due to load balancing, manager 2 receives the second request.
6. Manager 2's counter is still 0, so it has not reached the maximum. (C2: 0)
7. Manager 2 processes the request.
8. The request count limit didn't work!
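The same failure, condensed into a few lines of illustrative code (not Backend.AI code), with each manager keeping its own unshared counter:

```python
LIMIT = 1  # allow at most one request in total

class Manager:
    def __init__(self) -> None:
        self.counter = 0  # local state, invisible to the other manager

    def handle(self, request: str) -> bool:
        if self.counter >= LIMIT:
            return False  # rejected: limit reached on *this* manager
        self.counter += 1
        return True       # processed

m1, m2 = Manager(), Manager()
assert m1.handle("req-1") is True  # manager 1 accepts (C1: 0 -> 1)
assert m2.handle("req-2") is True  # manager 2 also accepts: C2 is still 0!
```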
Therefore, the following issue was proposed to discuss ways to overcome these limitations.

[Issue suggesting improvements to distributed timers (lablup/backend.ai#415)]

To delegate global state management to a single manager process, represented by a leader, we investigated consensus algorithms and decided to use the Raft Consensus Algorithm (hereafter Raft). Raft is used in projects such as [etcd](https://kubernetes.io/docs/concepts/overview/components/#etcd), the datastore of Kubernetes, so we believe it has been well validated.
Raft consensus algorithm
The Raft algorithm was proposed in "In Search of an Understandable Consensus Algorithm"[3], published at USENIX in 2014. It was created to improve upon Paxos[4][5], the leading algorithm at the time, whose complex consensus process made it difficult to understand and implement in practice, hence the title.
> But our most important goal — and most difficult challenge — was understandability.
>
> - In Search of an Understandable Consensus Algorithm
A Raft cluster typically consists of five nodes: with five nodes, up to two can fail while the remaining three still satisfy the quorum needed to keep the system running.
Each node in a cluster has one of three states: leader, follower, or candidate. In general, there can be at most one leader in each cluster, with the rest of the nodes being followers.
[Image: State transition diagram of a Raft node (Source: In Search of an Understandable Consensus Algorithm)]

Glossary #1
- quorum: The minimum number of nodes required to make a decision. (⌊N/2⌋ + 1)
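In code, this majority is a one-liner. The `self.quorum` used in the election code later in this article can be thought of as this value (a trivial sketch; the actual property definition in aioraft-ng may differ):

```python
def quorum(n: int) -> int:
    # Majority of an N-node cluster: floor(N/2) + 1.
    # quorum(5) == 3, so a 5-node cluster tolerates 2 failed nodes.
    return n // 2 + 1
```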
The Raft algorithm delegates all power to an elected leader and makes the flow of logs unidirectional, which makes the overall picture easier to understand. The algorithm guarantees the following properties:
Glossary #2
- term: The generation of the current leader or candidate. Incremented by 1 each time a leader election begins.
- index: Refers to the location of a specific value in the log.
- commit: Indicates that a specific value from the log was applied to the state machine.
- commitIndex: The highest log index known to be committed.
- Election Safety: Each term has a maximum of one leader.
- Leader Append-Only: A leader never overwrites or deletes entries in its log; it only appends new ones.
- Log Matching: If two logs contain an entry with the same index and term, the logs are identical in all entries up through that index.
- Leader Completeness: If a log entry is committed in a given term, that entry will be present in the logs of the leaders of all subsequent terms.
- State Machine Safety: If one server applies a log value from a particular index to its state machine, another server cannot apply a different value from the same index.
Using the above features, Raft divides the entire consensus process into three independent parts.
- Leader election: If the existing leader is not working, a new leader must be elected.
- Log replication: The leader replicates the request logs it receives from clients to other nodes. The other nodes unconditionally accept the leader's logs.
- Safety: When one server applies a log value from a particular index to its state machine, another server cannot apply a different value from the same index.
In this article, we'll discuss the different states a Raft node can be in, and implement the leader election process in code.
Follower
Followers do not send requests themselves; they only receive and respond to requests from the leader or candidates. The behavior spec for a follower proposed in the paper and the code written based on it are shown below.
- Handle RPC requests from leaders and candidates.
```python
async def on_append_entries(
    self,
    *,
    term: int,
    leader_id: RaftId,
    prev_log_index: int,
    prev_log_term: int,
    entries: Iterable[raft_pb2.Log],
    leader_commit: int,
) -> Tuple[int, bool]:
    await self._reset_timeout()
    if term < (current_term := self.current_term):
        return (current_term, False)
    await self._synchronize_term(term)
    return (self.current_term, True)

async def on_request_vote(
    self,
    *,
    term: int,
    candidate_id: RaftId,
    last_log_index: int,
    last_log_term: int,
) -> Tuple[int, bool]:
    await self._reset_timeout()
    async with self._vote_request_lock:
        if term < (current_term := self.current_term):
            return (current_term, False)
        await self._synchronize_term(term)
        async with self._vote_lock:
            if self.voted_for in [None, candidate_id]:
                self._voted_for = candidate_id
                return (self.current_term, True)
        return (self.current_term, False)

async def _synchronize_term(self, term: int) -> None:
    if term > self.current_term:
        self._current_term.set(term)
        await self._change_state(RaftState.FOLLOWER)
        async with self._vote_lock:
            self._voted_for = None
```
- If no request is received from the leader or a candidate for a period of time, transition to the candidate state.
```python
async def _wait_for_election_timeout(self, interval: float = 1.0 / 30) -> None:
    while self._elapsed_time < self._election_timeout:
        await asyncio.sleep(interval)
        self._elapsed_time += interval
    await self._change_state(RaftState.CANDIDATE)
```
Leaders must periodically announce their presence by sending heartbeat messages to their followers. If a follower does not receive any messages for a certain amount of time (`election_timeout`), it assumes that the cluster has no leader and becomes a candidate to start an election for a new leader.
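The paper also recommends randomizing each node's election timeout (choosing it anew from a range such as 150-300 ms) so that followers rarely time out at the same moment and split the vote. A minimal helper sketch (the names here are ours, not from aioraft-ng):

```python
import random

def next_election_timeout(min_s: float = 0.15, max_s: float = 0.3) -> float:
    # Pick a fresh, randomized timeout for each election cycle so that
    # candidates rarely start elections at the same instant.
    return random.uniform(min_s, max_s)
```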
Candidate
The behavior spec for a candidate and the implementation code are as follows.
- Become a follower when you receive an `AppendEntries` RPC request from the new leader (see `on_append_entries()` for followers).
- Start an election with the following procedure:
  - Increase the term by 1. (term += 1)
  - Vote for yourself.
  - Initialize the election timeout.
  - Send a `RequestVote` RPC request to the other nodes.
```python
async def _start_election(self) -> None:
    self._current_term.increase()
    async with self._vote_lock:
        self._voted_for = self.id
    current_term = self.current_term
    terms, grants = zip(
        *await asyncio.gather(
            *[
                asyncio.create_task(
                    self._client.request_vote(
                        to=server,
                        term=current_term,
                        candidate_id=self.id,
                        last_log_index=0,
                        last_log_term=0,
                    ),
                )
                for server in self._configuration
            ]
        )
    )
```
- If you receive votes from a majority of nodes, you are the leader.
```python
for term in terms:
    if term > current_term:
        await self._synchronize_term(term)
        break
else:
    if sum(grants) + 1 >= self.quorum:
        await self._change_state(RaftState.LEADER)
```
- If the election timeout occurs, start a new election.
```python
case RaftState.CANDIDATE:
    while self.__state is RaftState.CANDIDATE:
        await self._start_election()
        await self._reset_election_timeout()
        await self._initialize_volatile_state()
        if self.has_leadership():
            await self._initialize_leader_volatile_state()
            break
        await asyncio.sleep(self.__election_timeout)
```
Leader
- Send the first heartbeat message (an empty `AppendEntries` request) immediately after the election, and keep sending heartbeat messages periodically thereafter.
```python
async def _publish_heartbeat(self) -> None:
    if not self.has_leadership():
        return
    terms, successes = zip(
        *await asyncio.gather(
            *[
                asyncio.create_task(
                    self._client.append_entries(
                        to=server,
                        term=self.current_term,
                        leader_id=self.id,
                        prev_log_index=0,
                        prev_log_term=0,
                        entries=(),
                        leader_commit=self._commit_index,
                    ),
                )
                for server in self._configuration
            ]
        )
    )
    for term in terms:
        if term > self.current_term:
            await self._synchronize_term(term)
            break
```
- When the leader receives a request from a client, it appends the value to its log. After applying that value to the state machine, it sends a response to the request.
- If a follower's last log index is greater than or equal to the index the leader is tracking for it (nextIndex), replicate the log entries starting at nextIndex to that follower.
  - If successful, update the leader's nextIndex and matchIndex for that follower.
  - If it fails due to an inconsistency, decrement the leader's nextIndex and try again.
- If a value N satisfying the conditions below exists, update the commitIndex to N.
  - A majority of matchIndex values are greater than or equal to N. (matchIndex >= N)
  - The term of the Nth log entry is the same as the current term.
The leader manages a nextIndex and a matchIndex for each follower (a simplified sketch follows the list):
- nextIndex: The index of the next log entry to send to that follower.
- matchIndex: The highest index known to be successfully replicated on that follower.
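Log replication itself is beyond this article's scope, but the bookkeeping rules above can be sketched in a simplified, hypothetical form (illustration only, not code from aioraft-ng):

```python
from typing import Dict, List

class LeaderVolatileState:
    """Leader-side nextIndex/matchIndex bookkeeping (illustration only)."""

    def __init__(self, followers: List[str], last_log_index: int) -> None:
        # nextIndex starts optimistically, right after the leader's last entry.
        self.next_index: Dict[str, int] = {f: last_log_index + 1 for f in followers}
        # matchIndex starts at 0: nothing is known to be replicated yet.
        self.match_index: Dict[str, int] = {f: 0 for f in followers}

    def on_append_entries_success(self, follower: str, last_sent_index: int) -> None:
        # AppendEntries succeeded: advance both indices for this follower.
        self.match_index[follower] = last_sent_index
        self.next_index[follower] = last_sent_index + 1

    def on_append_entries_conflict(self, follower: str) -> None:
        # AppendEntries failed due to inconsistency: back off one entry and retry.
        self.next_index[follower] = max(1, self.next_index[follower] - 1)

    def advance_commit_index(
        self, commit_index: int, log_terms: List[int], current_term: int, quorum: int
    ) -> int:
        # Highest N > commitIndex replicated on a majority, whose entry was
        # written in the current term (log indices are 1-based).
        for n in range(len(log_terms), commit_index, -1):
            replicated = 1 + sum(1 for m in self.match_index.values() if m >= n)
            if replicated >= quorum and log_terms[n - 1] == current_term:
                return n
        return commit_index
```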
Conclusion
In this article, we've briefly covered the Raft algorithm and written code to perform a leader election. The remaining two features (log replication and membership changes) face a variety of challenges in actual implementation, including timing issues. If you're interested in learning more about the Raft algorithm, we recommend reading the author Diego Ongaro's PhD dissertation, "Consensus: Bridging Theory and Practice"[6].
Finally, let's end by checking out how ChatGPT describes the Raft algorithm.

[Image: Raft algorithm explained by ChatGPT (Source: OpenAI ChatGPT 3.5)]
This article is based on the code in lablup/aioraft-ng. Please also keep an eye on lablup/raftify, the next-generation Raft project currently under development at Lablup.