태그 : 프레임워크

  • Introducing Raftify: 확장성에 초점을 맞춰 개발된 High-level Raft Framework

    By 이규봉

    안녕하세요, 저는 작년부터 Lablup에서 Backend.AI 매니저 프로세스에 Raft를 도입하는 작업을 맡아 수행하고 있습니다.

    제가 수행 중인 관련 작업을 대략적으로 나타내어 보면 아래와 같습니다.

    1. Backend.AI 매니저 프로세스에 Raft를 도입해 리더-팔로워 구조로 만드는 것.
    2. 기존 분산 락 기반의 GlobalTimer를 Raft 기반의 글로벌 타이머로 변경하고, 클러스터에서 특정 작업이 정확히 한 번만 수행되도록 보장하는 것.
    3. 매니저 프로세스 간 공유 가능한 전역적인 상태 저장소를 매니저 프로세스에 내장시키고 적절하게 동기화하는 것.

    이 글에선 이러한 작업을 수행하기 위해 제가 지난 1년간 삽질하며 개발하게 된 Raft 프레임워크와 이를 개발하며 마주친 여러 이슈들에 대해 소개드리고 총 300줄이 되지 않는 간략한 코드를 통해 분산 키값 저장소를 구현하는 raftify 예제 코드에 대해 설명드려 보도록 하겠습니다.

    raftify 소개

    raftify는 어떤 서버 애플리케이션과도 쉽게 통합될 수 있도록 확장성에 초점을 맞추어 개발된 Raft 프레임워크입니다.

    raftify는 프로덕션에서 활용되고 있는 Raft 구현체들 중 tikv의 raft-rs 구현체 위에 LMDB를 stable storage로, gRPC를 네트워크 계층으로 사용해 개발되었습니다.

    raft 모듈 바인딩

    저는 신뢰할 수 있는 Raft 구현체를 밑바닥부터 모두 쌓아올려 유지 보수하는 것은 현실적으로 큰 짐이 될 수 있다고 판단해 우선 Raft 모듈의 파이썬 바인딩을 작성해보기로 결정했습니다.

    그래서 처음에는 GitHub에서 가장 스타를 많이 받은 Raft 구현체인 hashicorp/raft 구현체를 gopy를 사용해 파이썬 바인딩을 작성해보면 어떨까 생각했습니다.

    하지만 gopy는 고루틴에 대한 바인딩을 지원해주지 못했고 최신 파이썬 버전도 지원해 주지 않고 있었습니다.

    그러던 참에 사내 시니어 개발자님의 조언을 통해 tikv/raft-rs란 Rust 구현체와 PyO3에 대해 알게 되며 PyO3를 통해 tikv/raft-rs의 파이썬 바인딩을 작성해 보아야겠다고 생각하게 되었습니다.

    rraft-py

    그렇게 rust, raft, py를 합쳐 rraft-py란 이름으로 Raft 모듈의 파이썬 바인딩 개발에 도전해보게 되었습니다.

    rraft-py를 개발하면서 가장 먼저 신경 쓴 것은 rust 코드와 파이썬 코드의 의미가 가능한 1:1 매칭이 되도록 만들어야겠다는 것이었습니다.

    1:1 매칭이 가능하려면 러스트의 문법에 관련된 세부 사항들을 잘 우회할 필요가 있었습니다.

    제가 당시에 제일 고민했던 것은 러스트의 참조를 파이썬 측으로 어떻게 노출해주어야 좋을지에 관련된 것이었으며, 관심이 있으시다면 해당 파이콘 발표 영상을 참고하실 수 있습니다.

    이렇게 개발된 rraft-py는 1만 줄 이상의 raft-rs의 통합 테스트 코드를 그대로 포팅해 파이썬에서 바로 사용할 수 있는 나름 신뢰할 수 있는 Raft 바인딩 구현체가 되었습니다.

    현재 raftify는 Rust로 완전히 재작성하는 과정을 거친 후 rraft-py를 사용하지 않게 되었지만 처음으로 PyO3 바인딩을 작성해보고 Raft 구현체의 API들을 사용해보는 좋은 경험이 되었습니다.

    riteraft-py

    rraft-py를 개발하고 raft-rs의 1만 줄 가량의 통합 테스트들과 multiple-mem-node example까지 파이썬 코드로 포팅해 정상적으로 동작하도록 개발한 후 든 생각은 여전히 어디에서부터 시작해야 할지 모르겠다는 것이었습니다.

    raft-rs는 정말 Raft 구현체 자체만을 제공했고 이것을 어떻게 애플리케이션에 통합할 수 있을지 전혀 감이 잡히지 않았습니다.

    Github을 찾아보던 중 How to use this lib?란 이슈에서 riteraft란 이름으로 tikv/raft-rs를 기반으로 하는 하이 레벨의 Rust 구현체를 발견하게 되었고, 해당 라이브러리는 훨씬 직관적으로 사용 방법을 파악할 수 있었습니다. 그래서 저는 파이썬에서 이것의 동작을 그대로 모방해 애플리케이션 레벨에 통합하는 것을 목표로 riteraft-py를 개발하기로 결심했습니다.

    riteraft는 Raft 모듈과 로그, 상태 머신, 네트워크 계층과 이 Raft 구현체를 직접 통합하는 일을 수행하는데요, 문제는 직관적인 사용법과 별개로 제대로 동작하지 않는다는 점이었습니다.

    리더가 죽었는데 Leader election이 일어나지 않는 문제, 특정 시나리오에서 데이터 복제가 일어나지 않는 문제, 커밋 갯수가 255개를 넘어갈 때 일어나는 패닉 등... 온갖 잡다한 이슈를 모두 해결해야 했습니다.

    위 이슈들을 모두 해결하고 클러스터가 동작하는 것 처럼 보이게 만든 후에도 이슈는 계속해서 발생했습니다. 잘 동작하는 것 같다가도 특정 장애 상황을 마주하면 클러스터 일관성이 깨지거나 로그 동기화가 락인 되는 등 치명적인 문제들이 발생했습니다.

    이슈가 발생할 때마다 매번 raft-rs의 기술적인 세부 사항들을 들여다보고 이해할 수 있어야 했으며 이 과정은 결국 raft-rs의 코드를 뜯어보고 하나 하나 이해해 나가는 과정을 요구했습니다.

    raftify

    이슈를 해결하는 과정에서 riteraft와 다른 추상화를 사용하기로 결정했고 노드와 클러스터 상태를 디버깅 하기 위한 CLI 모듈 등 여러 변경 사항들을 구현하면서 라이브러리 이름을 raftify로 변경하게 되었습니다.

    해당 라이브러리를 처음 개발하기 시작할 땐 어떤 파이썬 애플리케이션과도 잘 호환될 수 있도록 하는 것을 목표로 개발했기 때문에 raft화 시키겠다는 의미로 raftify라는 이름을 붙였습니다.

    파이썬 구현체는 현재는 더 이상 개발하고 있지 않지만 해당 브랜치에서 확인할 수 있습니다.

    raftify written in Rust

    rraft-py 위에 파이썬으로 개발된 raftify는 결과적으로 잘 작동되긴 했지만 멀티 프로세스 구조로 작성된 조잡한 테스트 하네스는 CI에서 테스트 하기 힘들었고 쉽게 클러스터 일관성이 깨졌으며 코드가 조금만 복잡해져도 제어하기 힘들어졌습니다.

    결과적으로 raftify 내부 로직을 러스트로 완전히 재작성하고 Raft 패키지의 하이 레벨에서의 인터페이스만을 파이썬으로 노출시키기로 결정하게 되었습니다.

    그렇게 완전히 러스트로 재작성된 raftify는 싱글 스레드만으로 통합 테스트 수행이 가능했고, CI에서 테스트할 수 있어 코드 변경의 두려움을 없애도록 도와주었습니다.

    raftify 예제 코드

    이 섹션에선 raftify를 사용해 간단한 분산 키값 저장소를 만들어봅니다.

    전체 소스 코드는 해당 링크를 참고하세요.

    상태 머신 정의

    우선은 키값 저장소에서 사용할 로그 엔트리와 상태 머신을 정의해야 합니다.

    이 글에선 간단하게 로그 엔트리로 값을 정의하는 Insert라는 타입의 명령어만 정의해보겠습니다.

    💡 이 글에선 Rust 문법, Raft의 이론적 배경에 대해 설명하지 않습니다.

    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub enum LogEntry {
        Insert { key: u64, value: String },
    }
    

    상태 머신은 HashMap 타입으로 아래처럼 정의해보겠습니다.

    #[derive(Clone, Debug)]
    pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);
    

    그런 다음 이 자료구조들을 어떻게 직렬화, 역직렬화 할지 나타낼 encode, decode 메서드를 정의해주어야 합니다. bincode 크레이트를 사용해 아래처럼 간단하게 정의할 수 있습니다.

    impl AbstractLogEntry for LogEntry {
        fn encode(&self) -> Result<Vec<u8>> {
            serialize(self).map_err(|e| e.into())
        }
    
        fn decode(bytes: &[u8]) -> Result<LogEntry> {
            let log_entry: LogEntry = deserialize(bytes)?;
            Ok(log_entry)
        }
    }
    
    impl AbstractStateMachine for HashStore {
        fn encode(&self) -> Result<Vec<u8>> {
            serialize(&self.0.read().unwrap().clone()).map_err(|e| e.into())
        }
    
        fn decode(bytes: &[u8]) -> Result<Self> {
            let db: HashMap<u64, String> = deserialize(bytes)?;
            Ok(Self(Arc::new(RwLock::new(db))))
        }
    }
    

    마지막으로 HashStore에 raftify 내부 코드에서 사용될 세 가지 메서드를 정의하면 됩니다.

    HashStore에 새 로그 엔트리가 적용될 때 호출될 메서드인 apply, 현재 HashStore의 상태를 스냅샷으로 저장할 때 호출될 snapshot, 스냅샷 바이트 슬라이스를 통해 HashStore의 상태를 복구할 때 호출될 restore를 아래처럼 정의해줍니다.

    #[async_trait]
    impl AbstractStateMachine for HashStore {
        async fn apply(&mut self, data: Vec<u8>) -> Result<Vec<u8>> {
            let log_entry: LogEntry = LogEntry::decode(&data)?;
            match log_entry {
                LogEntry::Insert { ref key, ref value } => {
                    let mut db = self.0.write().unwrap();
                    log::info!("Inserted: ({}, {})", key, value);
                    db.insert(*key, value.clone());
                }
            };
            Ok(data)
        }
    
        async fn snapshot(&self) -> Result<Vec<u8>> {
            Ok(serialize(&self.0.read().unwrap().clone())?)
        }
    
        async fn restore(&mut self, snapshot: Vec<u8>) -> Result<()> {
            let new: HashMap<u64, String> = deserialize(&snapshot[..]).unwrap();
            let mut db = self.0.write().unwrap();
            let _ = std::mem::replace(&mut *db, new);
            Ok(())
        }
    }
    

    웹 서버 API 정의

    예제에서 사용될 웹 서버 API를 정의해봅시다. 이 API를 통해 노드의 Raft 객체에 접근해 HashStore를 조작할 것입니다.

    예제에선 actix-web 크레이트를 사용해 아래처럼 정의해보도록 하겠습니다.

    put 명령은 Raft 객체의 RaftNode에서 propose 메서드를 호출함으로써 구현할 수 있습니다. 이전에 정의한 Insert 타입 LogEntry를 인코딩해 RaftNode::propose 메서드의 인자에 넘겨주면 됩니다.

    get 명령은 인메모리에 저장되어 있는 HashMap에서 id에 해당하는 값을 리턴하는 것으로 구현할 수 있습니다.

    #[get("/put/{id}/{value}")]
    async fn put(data: web::Data<(HashStore, Raft)>, path: web::Path<(u64, String)>) -> impl Responder {
        let log_entry = LogEntry::Insert {
            key: path.0,
            value: path.1.clone(),
        };
        data.1.raft_node.propose(log_entry.encode().unwrap()).await;
    
        "OK".to_string()
    }
    
    #[get("/get/{id}")]
    async fn get(data: web::Data<(HashStore, Raft)>, path: web::Path<u64>) -> impl Responder {
        let id = path.into_inner();
    
        let response = data.0.get(id);
        format!("{:?}", response)
    }
    
    

    Raft 클러스터 부트스트랩

    이제 RaftNode들의 클러스터를 부트스트랩 시켜 봅시다.

    이 예제에서는 아래와 같은 구성을 갖는 Raft 클러스터를 부트스트랩할 것입니다.

    아래와 같은 구성에서는 세 개의 노드가 voter로서 부트스트랩되므로 클러스터에 리더가 존재하지 않아 election_timeout 후 바로 리더를 선출하게 됩니다.

    [[raft.peers]]
    ip = "127.0.0.1"
    port = 60061
    node_id = 1
    role = "voter"
    
    [[raft.peers]]
    ip = "127.0.0.1"
    port = 60062
    node_id = 2
    role = "voter"
    
    [[raft.peers]]
    ip = "127.0.0.1"
    port = 60063
    node_id = 3
    role = "voter"
    
    let options = Options::from_args();
    let store = HashStore::new();
    let initial_peers = load_peers().await?;
    
    let mut cfg = build_config();
    cfg.initial_peers = Some(initial_peers.clone());
    
    let node_id = initial_peers
        .get_node_id_by_addr(options.raft_addr.clone())
        .unwrap();
    
    let raft = Raft::bootstrap(
        node_id,
        options.raft_addr,
        store.clone(),
        cfg.clone(),
        logger.clone(),
    )?;
    
    let handle = tokio::spawn(raft.clone().run());
    
    // ...
    tokio::try_join!(handle)?;
    

    그리고 이 Raft 서버와 통신하기 위한 웹 서버를 붙여 줍시다.

    if let Some(addr) = options.web_server {
        let _web_server = tokio::spawn(
            HttpServer::new(move || {
                App::new()
                    .app_data(web::Data::new((store.clone(), raft.clone())))
                    .service(put)
                    .service(get)
            })
            .bind(addr)
            .unwrap()
            .run(),
        );
    }
    

    이제 터미널에서 아래처럼 세 노드로 이뤄진 Raft 클러스터를 부트스트랩 시킬 수 있습니다.

    $ ./target/debug/memstore --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001
    $ ./target/debug/memstore --raft-addr=127.0.0.1:60062 --web-server=127.0.0.1:8002
    $ ./target/debug/memstore --raft-addr=127.0.0.1:60063 --web-server=127.0.0.1:8003
    

    테스트

    이제 curl 명령을 통해 actix-web 서버 API를 통해 우리가 정의한 키값 저장소를 사용해 볼 수 있습니다.

    ❯ curl http://localhost:8001/put/1/test
    OK
    
    ❯ curl http://localhost:8001/get/1
    Some("test")
    

    더 자세한 내용이 궁금하시다면 raftify 레포지토리에서 디버깅을 도와주는 CLI 모듈의 사용법, RaftServiceClient의 예제 코드 등을 확인하실 수 있습니다.

    정리

    raftify는 일반 개발자 입장에서 접근하기 쉽지 않은 Raft 모듈을 누구나 쉽게 통합시킬 수 있도록 만드는 것을 목표로 하고 있는 실험적인 프레임워크입니다.

    Backend.AI 매니저 프로세스들에 리더-팔로워 구조를 도입하겠다는 목적으로 개발되었지만 이 글에서 설명드린 것 처럼 짧은 소스 코드로 자신만의 간단한 분산 키값 저장소를 만드는 등 HA 구조가 필요한 곳에서 다양하게 활용될 수 있을 것으로 보입니다.

    혹시 tikv/raft-rs 구현체 내부 동작 방식에 흥미가 생기셨다면 다음 글에서 몇 가지 시나리오에서 일어나는 일들을 소스코드 내부를 한 줄 한 줄 따라가며 분석해 볼 예정이니 기대해주시면 감사하겠습니다.

    26 January 2024

도움이 필요하신가요?

내용을 작성해 주시면 곧 연락 드리겠습니다.

문의하기

Headquarter & HPC Lab

서울특별시 강남구 선릉로100길 34 남영빌딩 4층, 5층

© Lablup Inc. All rights reserved.