  • Introducing Raftify: A high-level Raft framework created with a focus on scalability

    By Gyubong Lee

    Hello, I've been working on introducing Raft to the Backend.AI manager processes at Lablup since last year.

    Here's a rough breakdown of the related tasks I'm working on.

    1. Introducing Raft to the Backend.AI manager process and making it a leader-follower structure.
    2. Replacing the existing distributed lock-based GlobalTimer with a Raft-based global timer, and ensuring that a specific task is performed exactly once in the cluster.
    3. Embedding a global, shareable state store in the manager process and synchronizing it appropriately.

    In this post, I'll introduce the Raft framework I've been toiling away at over the past year to accomplish these tasks, describe some of the issues I encountered while developing it, and walk through raftify example code that implements a distributed key-value store in fewer than 300 lines in total.

    Introducing raftify

    raftify is a Raft framework developed with a focus on extensibility so that it can be easily integrated with any server application.

    raftify is built on top of tikv's raft-rs, one of the Raft implementations used in production, and uses LMDB as stable storage and gRPC as the network layer.

    Writing a binding for the Raft module

    Building and maintaining a reliable Raft implementation from the ground up would have been a significant burden, so I decided to write a Python binding for an existing Raft module first.

    My initial plan was to use gopy to write a Python binding for hashicorp/raft, the most-starred Raft implementation on GitHub.

    However, gopy didn't support binding goroutines, and it didn't support the latest Python version.

    Then, on the advice of a senior developer in the company, I learned about a Rust implementation called tikv/raft-rs and PyO3, which inspired me to try writing a Python binding for tikv/raft-rs using PyO3.

    rraft-py

    Thus, I decided to develop a Python binding for the Raft module named rraft-py, a name that combines Rust, Raft, and Py.

    My first concern in developing rraft-py was to make the semantics of the Rust code and the Python code match as closely to 1:1 as possible.

    To achieve a 1:1 match, I needed to work around some details of Rust's syntax.

    My main concern at the time was how to expose Rust references to the Python side, which you can see in my PyCon KR presentation if you're interested.

    The result is rraft-py. With over 10,000 lines of integration test code ported from raft-rs, it has become a fairly reliable Raft binding that can be used directly in Python.

    Currently, raftify is in the process of being completely rewritten in Rust, and rraft-py is no longer used, but it was a great experience to write my first PyO3 bindings and try out the APIs of a Raft implementation.

    riteraft-py

    After developing rraft-py, porting over 10,000 lines of integration tests from raft-rs, and even porting the multiple-mem-node example to Python and getting it to work, my only thought was that I still didn't know where to start.

    raft-rs really only provides the Raft implementation itself, and I had no idea how to integrate it into my application.

    While browsing GitHub, I came across a high-level Rust implementation based on tikv/raft-rs called riteraft in an issue called How to use this lib?, and it was much more intuitive to figure out how to use. So I decided to develop riteraft-py with the goal of mimicking its behavior in Python and integrating it at the application level.

    riteraft's job is to wire the Raft implementation directly to the logs, the state machine, and the network layer, but the problem was that, apart from being intuitive to use, it didn't work very well.

    Leader election did not happen when the leader died, data was not replicated in certain scenarios, a panic occurred when the commit count went over 255, and so on. All sorts of miscellaneous issues had to be resolved.

    Even after resolving all of these issues and getting the cluster to look like it was working, the issues kept coming: it would seem to be working fine, but then certain failures would cause catastrophic issues, such as cluster inconsistency or log synchronization locking up.

    Each time we encountered an issue, we had to dig into the technical details of raft-rs and understand them, which ended up being a process of picking apart the raft-rs code and understanding it piece by piece.

    raftify

    While troubleshooting these issues, I decided to use a different abstraction than riteraft and made many changes, including adding a CLI module for debugging node and cluster state, which led me to rename the library to raftify.

    When I first started developing the library, the goal was to make it compatible with any Python application, hence the name raftify, in the sense of "to Raft-ify" an application.

    I am no longer developing the Python implementation, but you can find it on its branch.

    raftify written in Rust

    Developed in Python on top of rraft-py, raftify ended up working well, but its crude multi-process test harness was hard to run in CI, cluster consistency broke easily, and things got out of control as soon as the code became even slightly more complex.

    As a result, we decided to completely rewrite raftify's internal logic in Rust and expose only the high-level interface of the Raft package in Python.

    Once completely rewritten in Rust, raftify was single-threaded, integration testable, and could be tested in CI, which helped eliminate the fear of making code changes.

    raftify example code

    In this section, we'll create a simple distributed key-value store using raftify.

    For the full source code, see this link.

    Define the state machine

    The first thing we need to do is define the log entries and state machine that we will use in our key-value store.

    For the sake of this article, we'll define just one kind of log entry: a simple Insert command that stores a value under a key.

    💡 Disclaimer: This article does not explain the Rust language syntax and the theoretical background of Raft.

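    use serde::{Deserialize, Serialize};

    // The only command in this example: store `value` under `key`.
    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub enum LogEntry {
        Insert { key: u64, value: String },
    }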
    

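    Let's define the state machine as a HashMap wrapped in a HashStore type, as shown below.

    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    // Arc<RwLock<..>> lets clones of the store share one map.
    #[derive(Clone, Debug)]
    pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);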
    

    Then we need to define encode and decode methods to indicate how we want to serialize and deserialize these data structures. You can use the bincode crate to define these as simply as below.

    use bincode::{deserialize, serialize};

    // Log entries are turned into bytes with bincode before being handed to Raft.
    impl AbstractLogEntry for LogEntry {
        fn encode(&self) -> Result<Vec<u8>> {
            serialize(self).map_err(|e| e.into())
        }

        fn decode(bytes: &[u8]) -> Result<LogEntry> {
            let log_entry: LogEntry = deserialize(bytes)?;
            Ok(log_entry)
        }
    }

    // The state machine is serialized the same way, by encoding the inner HashMap.
    impl AbstractStateMachine for HashStore {
        fn encode(&self) -> Result<Vec<u8>> {
            serialize(&self.0.read().unwrap().clone()).map_err(|e| e.into())
        }

        fn decode(bytes: &[u8]) -> Result<Self> {
            let db: HashMap<u64, String> = deserialize(bytes)?;
            Ok(Self(Arc::new(RwLock::new(db))))
        }
    }
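
    The contract behind encode and decode is that decoding an encoded entry gives back an equal entry. The following is a minimal sketch of that round trip using only the standard library. It deliberately swaps bincode for a hand-rolled byte layout (an 8-byte little-endian key followed by the UTF-8 value), which is an assumption made for illustration and not the format bincode produces. The helper names encode_insert and decode_insert are hypothetical and not part of raftify.

```rust
// Std-only sketch of the encode/decode round trip for an Insert entry.
// The byte layout is a made-up illustration, NOT bincode's format.

#[derive(Clone, Debug, PartialEq)]
pub enum LogEntry {
    Insert { key: u64, value: String },
}

/// Hypothetical helper: 8-byte little-endian key, then the UTF-8 value bytes.
pub fn encode_insert(entry: &LogEntry) -> Vec<u8> {
    match entry {
        LogEntry::Insert { key, value } => {
            let mut bytes = key.to_le_bytes().to_vec();
            bytes.extend_from_slice(value.as_bytes());
            bytes
        }
    }
}

/// Hypothetical helper: the inverse of `encode_insert`.
pub fn decode_insert(bytes: &[u8]) -> Result<LogEntry, String> {
    if bytes.len() < 8 {
        return Err("entry is shorter than the 8-byte key".to_string());
    }
    let (key_bytes, value_bytes) = bytes.split_at(8);
    let mut key_buf = [0u8; 8];
    key_buf.copy_from_slice(key_bytes);
    let value = String::from_utf8(value_bytes.to_vec()).map_err(|e| e.to_string())?;
    Ok(LogEntry::Insert {
        key: u64::from_le_bytes(key_buf),
        value,
    })
}

fn main() {
    let entry = LogEntry::Insert {
        key: 1,
        value: "test".to_string(),
    };
    let bytes = encode_insert(&entry);
    let decoded = decode_insert(&bytes).expect("round trip should succeed");
    // Decoding what we encoded must give back an equal entry.
    assert_eq!(decoded, entry);
    println!("{:?}", decoded);
}
```

    Whatever serializer is used, this round-trip property is what lets every node rebuild the same command from the bytes stored in the Raft log.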
    

    Finally, we need to define three methods on the HashStore that will be called by raftify's internal code.

    These are apply, which is called when a new log entry is applied to the HashStore; snapshot, which is called when saving the current state of the HashStore as a snapshot; and restore, which is called when restoring the state of the HashStore from a snapshot byte slice. They are defined as shown below.

    // Note: Rust accepts only one `impl AbstractStateMachine for HashStore`, so in a
    // compiled program these methods live in the same impl block as encode and decode.
    #[async_trait]
    impl AbstractStateMachine for HashStore {
        // Called for each committed log entry: decode it and update the map.
        async fn apply(&mut self, data: Vec<u8>) -> Result<Vec<u8>> {
            let log_entry: LogEntry = LogEntry::decode(&data)?;
            match log_entry {
                LogEntry::Insert { ref key, ref value } => {
                    let mut db = self.0.write().unwrap();
                    log::info!("Inserted: ({}, {})", key, value);
                    db.insert(*key, value.clone());
                }
            };
            Ok(data)
        }

        // Serialize a copy of the current map as the snapshot.
        async fn snapshot(&self) -> Result<Vec<u8>> {
            Ok(serialize(&self.0.read().unwrap().clone())?)
        }

        // Replace the whole map with the contents of the snapshot.
        async fn restore(&mut self, snapshot: Vec<u8>) -> Result<()> {
            let new: HashMap<u64, String> = deserialize(&snapshot[..])?;
            let mut db = self.0.write().unwrap();
            *db = new;
            Ok(())
        }
    }
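
    To make the relationship between the three methods concrete, here is a small synchronous sketch that uses only the standard library. MiniStore, read_u64, and the snapshot byte layout are hypothetical names and choices made for this illustration. raftify's real trait is async, and the example above serializes with bincode. The point is only the invariant: restoring a snapshot into another store should reproduce the state built up by apply.

```rust
// Std-only, synchronous sketch of the apply / snapshot / restore life cycle.
// `MiniStore` and its byte layout are hypothetical, not raftify's API.
use std::collections::HashMap;

#[derive(Debug, Default, PartialEq)]
pub struct MiniStore(pub HashMap<u64, String>);

impl MiniStore {
    /// Apply one committed Insert command to the state.
    pub fn apply(&mut self, key: u64, value: &str) {
        self.0.insert(key, value.to_string());
    }

    /// Dump the whole state as bytes: [key: u64 LE][len: u64 LE][value bytes]...
    pub fn snapshot(&self) -> Vec<u8> {
        let mut out = Vec::new();
        for (key, value) in &self.0 {
            out.extend_from_slice(&key.to_le_bytes());
            out.extend_from_slice(&(value.len() as u64).to_le_bytes());
            out.extend_from_slice(value.as_bytes());
        }
        out
    }

    /// Replace the current state with the one stored in `snapshot`.
    pub fn restore(&mut self, snapshot: &[u8]) -> Result<(), String> {
        let mut restored = HashMap::new();
        let mut rest = snapshot;
        while !rest.is_empty() {
            let key = read_u64(&mut rest)?;
            let len = read_u64(&mut rest)? as usize;
            if rest.len() < len {
                return Err("truncated value".to_string());
            }
            let (value, tail) = rest.split_at(len);
            let value = String::from_utf8(value.to_vec()).map_err(|e| e.to_string())?;
            restored.insert(key, value);
            rest = tail;
        }
        self.0 = restored;
        Ok(())
    }
}

/// Read one little-endian u64 from the front of `input` and advance it.
fn read_u64(input: &mut &[u8]) -> Result<u64, String> {
    if input.len() < 8 {
        return Err("truncated integer".to_string());
    }
    let (head, tail) = input.split_at(8);
    let mut buf = [0u8; 8];
    buf.copy_from_slice(head);
    *input = tail;
    Ok(u64::from_le_bytes(buf))
}

fn main() {
    let mut leader = MiniStore::default();
    leader.apply(1, "test");
    leader.apply(2, "raft");

    // A node that fell behind can catch up from a snapshot instead of replaying every entry.
    let mut follower = MiniStore::default();
    follower.restore(&leader.snapshot()).expect("snapshot should be valid");
    assert_eq!(follower, leader);
}
```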
    

    Define the web server API

    Let's define the web server API that will be used in our example. We will use this API to access the Raft object on the node and manipulate the HashStore.

    For our example, we'll use the actix-web crate and define it as shown below.

    The put command can be implemented by calling the propose method on the RaftNode of the Raft object. We can do this by encoding the Insert type LogEntry we defined earlier and passing it as an argument to the RaftNode::propose method.

    The get command can be implemented by returning the value corresponding to the id from the HashMap stored in memory.

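    // A write exposed over GET, so that it can be tried with a plain `curl`.
    #[get("/put/{id}/{value}")]
    async fn put(data: web::Data<(HashStore, Raft)>, path: web::Path<(u64, String)>) -> impl Responder {
        let log_entry = LogEntry::Insert {
            key: path.0,
            value: path.1.clone(),
        };
        // Propose the encoded entry to the cluster; it reaches the HashStore through `apply`.
        data.1.raft_node.propose(log_entry.encode().unwrap()).await;

        "OK".to_string()
    }

    // Reads go straight to this node's in-memory HashStore.
    #[get("/get/{id}")]
    async fn get(data: web::Data<(HashStore, Raft)>, path: web::Path<u64>) -> impl Responder {
        let id = path.into_inner();

        // `HashStore::get` is a lookup helper whose definition is not shown in this post.
        let response = data.0.get(id);
        format!("{:?}", response)
    }

    // Run the HTTP server as a background task, sharing the store and the Raft object
    // with every handler. `store`, `raft`, and `addr` are defined elsewhere in main.
    let web_server = tokio::spawn(
        HttpServer::new(move || {
            App::new()
                .app_data(web::Data::new((store.clone(), raft.clone())))
                .service(put)
                .service(get)
        })
        .bind(addr)
        .unwrap()
        .run(),
    );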
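
    The get handler above calls data.0.get(id), but the post does not show how that helper is defined. The following is a minimal sketch of what such a helper could look like, using only the standard library. The new and get methods here are assumptions for illustration, chosen so that the result matches the Option-style output seen when the server is queried.

```rust
// Std-only sketch of a read helper on HashStore.
// `new` and `get` are assumed definitions; the post does not show them.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Clone, Debug)]
pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);

impl HashStore {
    /// Create an empty store.
    pub fn new() -> Self {
        Self(Arc::new(RwLock::new(HashMap::new())))
    }

    /// Look up `id` under a read lock and return a clone of the value, if any.
    pub fn get(&self, id: u64) -> Option<String> {
        self.0.read().unwrap().get(&id).cloned()
    }
}

fn main() {
    let store = HashStore::new();
    store.0.write().unwrap().insert(1, "test".to_string());

    // Clones share the same underlying map because of the Arc.
    let handle = store.clone();
    println!("{:?}", handle.get(1)); // prints: Some("test")
    println!("{:?}", handle.get(2)); // prints: None
}
```

    Returning a cloned String keeps the read lock held only for the duration of the lookup, so a slow HTTP client never blocks writes coming from apply.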
    

    Bootstrap a Raft cluster

    Next, let's bootstrap a cluster of RaftNodes.

    If the --peer-addr argument is given, the node sends a join request to the cluster through that peer address and receives a new node_id; if the argument is not given, the node bootstraps a new cluster.

    Leader

    In this example, we'll fix the leader node's node_id to 1 to keep things easy to follow. This means that when we call Raft::bootstrap_cluster, we create the leader node's Raft object by passing 1 as the node_id. After that, calling the Raft::run method starts the RaftNode.

    Followers

    As shown below, a follower first calls Raft::request_id to receive a ClusterJoinTicket that can be used to join the cluster. It then calls Raft::new_follower with the node_id reserved in the ticket to create a follower Raft object, calls the Raft::run method to start the RaftNode, and joins the cluster via the Raft::join method.

    let (raft, raft_handle) = match peer_addr {
        // --peer-addr was given: join the existing cluster as a follower.
        Some(peer_addr) => {
            log::info!("Running in Follower mode");

            // Ask the peer for a ticket that reserves a node_id for this node.
            let ticket = Raft::request_id(raft_addr, peer_addr.clone(), logger.clone()).await.unwrap();
            let node_id = ticket.reserved_id;

            let raft = Raft::new_follower(
                node_id,
                raft_addr,
                store.clone(),
                cfg,
                None,
                logger.clone(),
            )?;

            // Start the RaftNode in the background, then join using the ticket.
            let handle = tokio::spawn(raft.clone().run());
            raft.join(ticket).await;
            (raft, handle)
        }
        // No --peer-addr: bootstrap a new cluster with this node as node 1.
        None => {
            log::info!("Bootstrap a Raft Cluster");
            let node_id = 1;
            let raft = Raft::bootstrap_cluster(
                node_id,
                raft_addr,
                store.clone(),
                cfg,
                None,
                logger.clone(),
            )?;
            let handle = tokio::spawn(raft.clone().run());
            (raft, handle)
        }
    };

    // Wait for the RaftNode task so the process keeps running.
    let _ = tokio::try_join!(raft_handle)?;
    

    You can now bootstrap a Raft cluster of three nodes in the terminal as shown below.

    $ ./target/debug/memstore --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001
    $ ./target/debug/memstore --raft-addr=127.0.0.1:60062 --peer-addr=127.0.0.1:60061 --web-server=127.0.0.1:8002
    $ ./target/debug/memstore --raft-addr=127.0.0.1:60063 --peer-addr=127.0.0.1:60061 --web-server=127.0.0.1:8003
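
    The post does not show how the example parses these flags, so the following is only a sketch, using the standard library, of how the three options could decide between bootstrapping and joining. Options, Mode, and parse_options are hypothetical names introduced for this illustration and are not part of raftify or of the example's source.

```rust
// Std-only sketch of how the three flags could select the node's mode.
// `Options`, `Mode`, and `parse_options` are hypothetical names.

#[derive(Debug, PartialEq)]
pub enum Mode {
    /// No --peer-addr: bootstrap a new cluster.
    Bootstrap,
    /// --peer-addr given: ask that peer for a node_id, then join.
    Join { peer_addr: String },
}

#[derive(Debug, PartialEq)]
pub struct Options {
    pub raft_addr: String,
    pub web_server: Option<String>,
    pub mode: Mode,
}

/// Parse `--name=value` style flags, as used in the commands above.
pub fn parse_options<I: IntoIterator<Item = String>>(args: I) -> Result<Options, String> {
    let (mut raft_addr, mut peer_addr, mut web_server) = (None, None, None);
    for arg in args {
        let (name, value) = arg
            .split_once('=')
            .ok_or_else(|| format!("expected --name=value, got {}", arg))?;
        match name {
            "--raft-addr" => raft_addr = Some(value.to_string()),
            "--peer-addr" => peer_addr = Some(value.to_string()),
            "--web-server" => web_server = Some(value.to_string()),
            other => return Err(format!("unknown flag {}", other)),
        }
    }
    // The presence of --peer-addr is what distinguishes a follower from the first node.
    let mode = match peer_addr {
        Some(peer_addr) => Mode::Join { peer_addr },
        None => Mode::Bootstrap,
    };
    Ok(Options {
        raft_addr: raft_addr.ok_or("--raft-addr is required")?,
        web_server,
        mode,
    })
}

fn main() {
    // Skip argv[0], the program name.
    match parse_options(std::env::args().skip(1)) {
        Ok(options) => println!("{:?}", options),
        Err(message) => eprintln!("error: {}", message),
    }
}
```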
    

    Test

    We can now try out the key-value store we defined through the actix-web server API via the curl command.

    ❯ curl http://localhost:8001/put/1/test
    OK
    
    ❯ curl http://localhost:8001/get/1
    Some("test")
    

    If you're interested in learning more, you can check out the raftify repository for instructions on how to use the CLI module to help with debugging, example code for RaftServiceClient, and more.

    Summary

    raftify is an experimental framework that aims to make Raft modules, which are otherwise hard for ordinary developers to approach, easy for anyone to integrate.

    It was developed to introduce a leader-follower structure to Backend.AI manager processes, but as I've shown in this post, it could be used in a variety of places where you need an HA structure, such as creating your own simple distributed key-value store with only a small amount of source code.

    If you're intrigued by the inner workings of the tikv/raft-rs implementation, stay tuned for my next post where I'll be analyzing what happens inside the source code line by line in a few scenarios.

    This post is automatically translated from Korean

    26 January 2024
