Engineering

  • Model Variant: Easily Serving Various Model Services

    By Jihyun Kang

    Introduction

    Imagine a scenario where you need to train an AI for research purposes and produce results. Our job would simply be to wait for the AI to correctly learn the data we've taught it. However, if we assume we're creating a service that 'utilizes' AI, things get more complicated. Every factor becomes a concern, from how to apply various models to the system to what criteria to use for scaling under load conditions. We can't carelessly modify the production environment where users exist to get answers to these concerns. If an accident occurs while expanding or reducing the production environment, terrible things could happen. If something terrible does happen, we'll need time to recover from it, and we can't expect the same patience from consumers using our service as we would from researchers waiting for model training. Besides engineering difficulties, there are also cost challenges. Obviously, there's a cost to serving models, and users are incurring expenses even at the moment of training models as resources are being consumed.

    However, there's no need to worry. Many well-made models already exist in the world, and in many cases, it's sufficient for us to take these models and serve them. As those interested in our solution may already know, Backend.AI already supports various features you need when serving models. It's possible to increase or decrease services according to traffic, and to serve various models tailored to users' preferences.

    But the Backend.AI team doesn't stop here. We have enhanced the model service provided from Backend.AI version 23.09 and improved it to easily serve various models. Through this post, we'll explore how to serve various models easily and conveniently.

    This post introduces features that allow you to serve various types of models more conveniently. Since we've already given an explanation about model service when releasing the 23.09 version update, we'll skip the detailed explanation. If you're unfamiliar with Backend.AI's model service, we recommend reading the following post first: Backend.AI Model Service Preview

    Existing Method

    | Requirement | Existing Method | Model Variant |
    |-------------|-----------------|---------------|
    | Writing model definition file (model-definition.yaml) | O | X |
    | Uploading model definition file to model folder | O | X |
    | Model metadata needed | O | △ (Some can be received automatically) |

    Previously, the Backend.AI model service required a model definition file (model-definition.yaml), which lists in a fixed format the commands to execute when serving the model, in addition to the model metadata needed to run it. The workflow was as follows: write the model definition file, upload it to a model-type folder so it can be read, and mount that model folder when starting the model service. An API server then starts automatically; it forwards the end user's input to the model according to the model definition file and returns the response. However, this approach meant opening and editing the file every time the model definition needed to change. Because the model path was hard-coded in the model definition file, it was also cumbersome to write a different definition file each time the model changed.

    The Model Variant introduced this time is a feature that allows you to serve models immediately by inputting a few configuration values or without any input at all, using only model metadata without a model definition file. Model Variant supports command, vLLM, and NIM (NVIDIA Inference Microservice) methods. The methods of serving and verifying model service execution are as follows.

    Basically, model service requires metadata of the model to be served. Download the model you want to serve from Hugging Face, where you can easily access model metadata. In this example, we used the Llama-2-7b-hf model and Calm3-22b-chat model from Hugging Face. For how to upload model metadata to the model folder, refer to the "Preparing Model Storage" section in the previous post.

    Automatically Serving Model from Built Image (Command Method)

    The command method, introduced first, bakes the command that serves the model (the part that would otherwise go in the model definition file) into the execution image. Specify the command to execute in the CMD environment variable and build the image; when you actually serve the model, the image runs immediately without any other input. The command method does not support the health check, which verifies that the service is running properly. It is therefore better suited to quickly setting up and checking a prototype service than to running large-scale services. The execution method is as follows:

    1. On the start screen, in the Model Storage To Mount item, select Llama-2-7b-hf to mount the model folder containing the metadata of the model to be served, and select Predefined Image Command in the Inference Runtime Variant item.

    Activate the Open To Public switch if you want the model service to be accessible without a separate token.

    [Image: Model service start screen: mounting model metadata and selecting CMD]

    2. Select the environment to serve. Here, we use vllm:0.5.0 and allocate 4 CPU cores, 16 GiB of memory, and 10 fGPU of NVIDIA CUDA GPU as resources.

    [Image: Model service start screen: selecting the execution environment and allocating resources]

    3. Finally, select the cluster size and click the start button. The cluster size is set to single node, single container.

    [Image: Model service start screen: selecting the cluster size and starting]

    If the service has been successfully launched, the service status will change to HEALTHY and the endpoint address will appear.

    [Image: Model service detail screen]

    Verifying the Service

    If the service has been launched normally, check the service model name with the cURL command:

    curl https://cmd-model-service.asia03.app.backend.ai/v1/models \
    -H "Content-Type: application/json"
    

    [Image: Checking the model name]

    Now, let's send input to the service with the cURL command and check the response:

    For model services run with CMD, the model name is already defined in the image, so after checking the model name, you must enter the model name as the value of the model key when sending a request.

    curl https://cmd-model-service.asia03.app.backend.ai/v1/completions \
    -H "Content-Type: application/json" \
    -d '{
    "model": "image-model",
    "prompt": "San Francisco is a",
    "max_tokens": 7,
    "temperature": 0}'
    

    [Image: Model service request result screen]
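
    The two cURL calls above can also be issued from Python. The following is a minimal sketch using only the standard library, not an official client. The helper names (build_models_request, build_completion_request, first_model_id, send) are hypothetical, and the sketch assumes the endpoint returns an OpenAI-style model list of the form {"data": [{"id": ...}]}, which this post shows only as a screenshot.

```python
import json
import urllib.request


def build_models_request(endpoint: str) -> urllib.request.Request:
    """Build the GET request that lists the models served at the endpoint."""
    return urllib.request.Request(
        f"{endpoint.rstrip('/')}/v1/models",
        headers={"Content-Type": "application/json"},
    )


def build_completion_request(
    endpoint: str,
    model: str,
    prompt: str,
    max_tokens: int = 7,
    temperature: float = 0,
) -> urllib.request.Request:
    """Build the POST request for /v1/completions, mirroring the cURL body."""
    body = {
        "model": model,
        "prompt": prompt,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    return urllib.request.Request(
        f"{endpoint.rstrip('/')}/v1/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def first_model_id(models_response: dict) -> str:
    """Assumption: the response has the shape {"data": [{"id": ...}]}."""
    return models_response["data"][0]["id"]


def send(request: urllib.request.Request) -> dict:
    """Send a prepared request and decode the JSON reply (needs network access)."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

    To use it, call send(build_models_request(endpoint)), pass the result to first_model_id, and use that name as the model argument of build_completion_request before sending the second request.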

    Serving Models in vLLM Mode

    The vLLM mode is similar to the command method introduced earlier, but various options entered when running vLLM can be written as environment variables. The execution method is as follows:

    How to Run

    1. On the start screen, mount the model folder for the model service to be served and select vLLM in the Inference Runtime Variant item.

    [Image: Model service start screen: mounting model metadata and selecting vLLM]

    2. Select the environment to serve. As with the command method explained earlier, select vllm:0.5.0. You could allocate the same resources as before, but this time we'll allocate 16 CPU cores, 64 GiB of memory, and 10 fGPU of NVIDIA CUDA GPU.

    [Image: Model service start screen: selecting the execution environment and allocating resources]

    3. Finally, select the cluster size and enter the environment variable BACKEND_MODEL_NAME. This value corresponds to the --served-model-name option in vLLM and becomes the model value that users specify when sending a request to the service. Once the input is complete, click the START button to create the service.

    [Image: Model service start screen: selecting the execution environment and allocating resources]

    Likewise, if the service has been successfully launched, the service status will change to HEALTHY, and the endpoint address where the service is launched will appear.

    [Image: Model service detail screen]

    Verifying the Service

    Let's send input to the service with the cURL command and check the response. For the model value, enter the BACKEND_MODEL_NAME value you set earlier.

    curl https://vllm-calm3-22b-chat.asia03.app.backend.ai/v1/completions \
    -H "Content-Type: application/json" \
    -d '{
    "model": "vllm-model",
    "prompt": "初めて会う日本人ビジネスマンに渡す最高の挨拶は何でしょうか?",
    "max_tokens":  200,
    "temperature": 0
    }'
    

    [Image: Model service request result screen]

    Serving Models in NIM Mode

    To run NIM, you need an API key issued from an account that can access NGC's NIM model registry. For how to obtain the key value, please refer to the following content: NVIDIA Docs Hub : How to get NGC API Key

    The NIM (NVIDIA Inference Microservice) mode is also similar to the command mode, but it must be run with an image that has NVIDIA's NIM-supporting model server built-in. Also, when loading the model, the NGC API key value is needed. Assuming everything is ready, let's start the model service.

    How to Run

    1. On the start screen, select an empty model-type folder to cache the metadata that will be received from NIM, and select NIM in the Inference Runtime Variant item.

    [Image: Model service start screen: mounting the model folder and selecting NIM]

    2. Select the environment to serve. Here, we use ngc-nim:1.0.0-llama3.8b and allocate 8 CPU cores, 32 GiB of memory, and 15 fGPU of NVIDIA CUDA GPU as resources.

    [Image: Model service start screen: selecting the execution environment and allocating resources]

    3. Finally, select the cluster size and set the environment variable HF_HOME to the default path /models. Then add NGC_API_KEY and enter the issued key value. Once the input is complete, click the CREATE button to create the service.

    [Image: Model service start screen: selecting the cluster size, entering environment variables, and starting]

    When using NIM, the first run may take some time because the model metadata is downloaded from the repository. You can check the progress on the session page by viewing the container logs of the routing session that backs the service.

    [Image: Routing session corresponding to the model service]

    [Image: Container log screen showing data being received from NIM]

    Like the command and vLLM modes, if the service has been successfully launched, the service status will change to HEALTHY. Using the endpoint address where the service is launched, send a request to the service as follows and check the response.

    Verifying the Service

    from openai import OpenAI

    # Point the OpenAI client at the model service endpoint.
    client = OpenAI(
        base_url="https://nim-model-service.asia03.app.backend.ai/v1",
        api_key="$YOUR_NGC_API_KEY",  # replace with your issued NGC API key
    )

    # Request a streamed chat completion; the earlier turns provide conversation context.
    completion = client.chat.completions.create(
        model="meta/llama3-8b-instruct",
        messages=[
            {"role": "user", "content": "Hello! How are you?"},
            {"role": "assistant", "content": "Hi! I am quite well, how can I help you today?"},
            {"role": "user", "content": "Can you write me a song?"},
        ],
        temperature=0.5,
        top_p=1,
        max_tokens=1024,
        stream=True,
    )

    # Print each generated fragment as soon as it arrives.
    for chunk in completion:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="")
    

    [Image: Model service request result screen]

    Conclusion

    The Model Variant feature will be of great help to researchers and companies aiming to provide actual services with already trained models. Based on a powerful resource management system and support for various AI accelerators such as NVIDIA GPU, AMD ROCm, TPU, Graphcore IPU, Furiosa Warboy, Rebellions ATOM, Hyperaccel LPU, etc., Backend.AI now provides an integrated environment that can easily deploy services beyond simply training models. Try serving your desired AI model anytime with Backend.AI!

    11 July 2024

  • Backend.AI Open Source Contribution Guide (Jul. 2024)

    By Daehyun Sung

    Backend.AI's core engine utilizes many open-source software components and is itself being developed as open source. When enterprise customers encounter inconveniences or bugs while using Backend.AI, we provide issue tracking and support through customer support and technical support channels. However, those using the open-source version can also directly contribute to the project.

    There are mainly two ways to contribute: creating an issue that explains in detail what problem exists or what improvement ideas you have, and making a pull request to directly contribute code changes. In this post, we'd like to introduce a few things that are good to know in advance for more effective and faster communication with the development team during the contribution process.

    Introduction to GitHub Repositories

    As seen in the previous post Backend.AI Open Source Contribution Guide, Backend.AI was originally developed with repositories divided into the Backend.AI meta-repository and several sub-components.

    However, from version "22.06", Backend.AI has changed to a mono-repository using Pants.

    This transition in the development workflow has greatly helped in resolving package compatibility issues that often occur across multiple individual components, creating a more convenient development environment.

    Pants is a fast, scalable, and user-friendly build system.

    If you want to submit an issue, the first place to look is the Backend.AI repository. The repository named Backend.AI integrates multiple packages using Pants. This repository is not only for project management but also contains code that actually performs functions. All issues related to Backend.AI's server and Client SDK are managed here, and links to other projects are provided through the README.

    When creating a new issue, two default templates are provided: bug report and feature request. Following them is not strictly required, but given the complexity of Backend.AI and its various usage environments, writing your issue along these templates makes it easier to share the context needed to identify the problem.

    Introduction to Mono-repository

    From version "22.06", Backend.AI has changed to a mono-repository using Pants. A mono-repository is a single, integrated code base in which multiple projects share their basic dependencies, data models, features, tooling, and processes. In other words, the projects that were previously maintained separately are now integrated and operated as a single repository.

    Introduction to Pants

    Backend.AI is installed using Pants as a build system. For more details about Pants, please check the following link Pants - Getting started.

    Relationship between Backend.AI Components

    Figure 1. Relationship structure between major Backend.AI components

    Figure 1 is a diagram showing the relationship between the major components of Backend.AI.

    Figure 2. Major component structure of Backend.AI and examples of execution methods

    Figure 2 is a diagram showing the major component structure of Backend.AI, and shows the location of the source code of the components and execution commands.

    Most of Backend.AI's components are managed in the Backend.AI repository, and the source code is located in the src/ai/backend/ subdirectory. Here is a brief summary of what each component does, by directory:

    • src/ai/backend/manager (Manager): Core service that monitors computational resources of the entire cluster, handles session scheduling, provides user authentication and APIs for session execution
    • src/ai/backend/agent (Agent): Service installed on compute nodes to manage and control containers
    • src/ai/backend/common (Common): Library of functions and data formats commonly or frequently used across multiple server-side components
    • src/ai/backend/client (Client SDK for Python): Official command-line interface and library providing API wrapper functions and classes for Python
    • src/ai/backend/storage (Storage Proxy): Service that allows user web browsers or Client SDK to directly perform large-volume I/O from network storage
    • src/ai/backend/web (Web Server): HTTP service that provides routing for Web UI and SPA (single-page app) implementation and web session-based user authentication
    • src/ai/backend/webui (Web UI & Desktop App): Web component-based implementation of the actual UI that users interact with. Also supports Electron-based desktop app builds. Also includes a local lightweight version of the app proxy that allows users to directly access application ports running inside containers.

    Backend.AI Version Management Method

    Backend.AI has major releases every 6 months (March and September each year), with post-release support provided for about 1 year. Therefore, the version number follows the CalVer format in the form of YY.0M.micro (e.g., 20.09.14, 21.03.8). However, due to the version number normalization of the Python packaging system, the version of the wheel package is in the format YY.MM.micro without zero-padding in the month part (e.g., 20.9.14, 21.3.8). Some detailed components with version update cycles different from the main release cycle follow the general SemVer format.
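
    As an illustration of the zero-padding difference described above, here is a small sketch that converts a CalVer string into the form the wheel version takes. The function name wheel_version is hypothetical, and the sketch only handles purely numeric YY.0M.micro strings; it is not a full implementation of Python's version normalization.

```python
def wheel_version(calver: str) -> str:
    """Drop zero-padding from each numeric part of a YY.0M.micro version."""
    # int("09") == 9, so converting each part to int and back removes the padding.
    return ".".join(str(int(part)) for part in calver.split("."))


print(wheel_version("20.09.14"))
print(wheel_version("21.03.8"))
```

    The two calls reproduce the examples from the text: 20.09.14 becomes 20.9.14, and 21.03.8 becomes 21.3.8.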

    Essential Packages to Install Before Development

    Before installing Backend.AI, you need to install Docker, Docker Compose v2, and other prerequisites. When you install Backend.AI using the scripts/install-dev.sh script in the repository, it checks whether Docker, Docker Compose v2, and so on are installed and guides you through installing them. If Python, pyenv, Docker, or npm is not installed, install these essential packages as follows. For Python, install Python 3 from your system's packages. Then install pyenv and pyenv-virtualenv:

    $ curl https://pyenv.run | bash
    

    Then, you can install Docker and Docker Compose v2 as follows:

    MacOS

    For MacOS, Docker Desktop on Mac automatically installs Docker and Docker Compose v2.

    Ubuntu, Debian, CentOS, Fedora Core, and other Linux environments

    For Ubuntu, Debian, CentOS, Fedora Core, you can automatically install Docker and Docker Compose v2 using the following script:

    $ curl -fsSL https://get.docker.io | sudo bash
    

    After installing Docker, running docker without sudo may fail with an access permission error on unix:///var/run/docker.sock, like this:

    $ docker ps
    Got permission denied while trying to connect to the Docker daemon socket at unix:///var/run/docker.sock: Get "http://%2Fvar%2Frun%2Fdocker.sock/v1.24/containers/json": dial unix /var/run/docker.sock: connect: permission denied
    

    If you see this permission error, fix it with the following commands:

    $ sudo usermod -aG docker $(whoami)
    $ sudo chown root:docker /var/run/docker.sock
    

    After that, reboot and run docker run hello-world to confirm that it runs normally.

    $ docker run hello-world
    Unable to find image 'hello-world:latest' locally
    latest: Pulling from library/hello-world
    c1ec31eb5944: Pull complete
    Digest: sha256:94323f3e5e09a8b9515d74337010375a456c909543e1ff1538f5116d38ab3989
    Status: Downloaded newer image for hello-world:latest
    
    Hello from Docker!
    This message shows that your installation appears to be working correctly.
    
    To generate this message, Docker took the following steps:
    1. The Docker client contacted the Docker daemon.
    2. The Docker daemon pulled the "hello-world" image from the Docker Hub.
        (amd64)
    3. The Docker daemon created a new container from that image which runs the
        executable that produces the output you are currently reading.
    4. The Docker daemon streamed that output to the Docker client, which sent it
        to your terminal.
    
    To try something more ambitious, you can run an Ubuntu container with:
    $ docker run -it ubuntu bash
    
    Share images, automate workflows, and more with a free Docker ID:
    https://hub.docker.com/
    
    For more examples and ideas, visit:
    https://docs.docker.com/get-started/
    

    Instead of changing the group ownership of /var/run/docker.sock with chown, you can change the permissions of the /var/run/docker.sock file to 666, which lets every user on the machine access it without rebooting.

    $ sudo chmod 666 /var/run/docker.sock
    

    However, setting the permissions of the /var/run/docker.sock file to 666 creates a security vulnerability, because any local user can then control the Docker daemon.
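
    To see which of the two situations applies on a machine, the permission bits of the socket can be inspected. The following is a minimal sketch using the standard library; the function names describe_mode and describe_socket are hypothetical, and the default path is simply the one used in this post.

```python
import os
import stat


def describe_mode(mode: int) -> dict:
    """Summarize the permission bits of a file mode such as 0o660 or 0o666."""
    perms = stat.S_IMODE(mode)  # strip the file-type bits, keep permissions only
    return {
        "octal": format(perms, "03o"),
        # Both group read and group write are set (what the docker group needs).
        "group_read_write": (perms & 0o060) == 0o060,
        # Any user on the machine can write to the file (the risky 666 case).
        "world_writable": bool(perms & stat.S_IWOTH),
    }


def describe_socket(path: str = "/var/run/docker.sock") -> dict:
    """Read the mode of the Docker socket (the path must exist)."""
    return describe_mode(os.stat(path).st_mode)
```

    A result with world_writable set to True corresponds to the chmod 666 shortcut, while group_read_write without world_writable corresponds to the group-based setup.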

    You can check if Docker Compose v2 is installed as follows:

    $ sudo docker compose version
    Docker Compose version v2.28.1
    

    If nvm is not installed, you should install nvm as shown in the following link nvm - install & Update Script.

    $ curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash
    

    After installing nvm, install the latest LTS version of Node.js and set it up for use.

    $ nvm install --lts
    $ nvm use --lts
    

    How to Install the Development Environment

    To actually contribute code, you need to write a pull request, and unless it's a simple typo correction or documentation-related contribution, you need to directly modify the code and run it, so it's essential to set up your own development environment. Backend.AI has a structure where multiple components work together, so installation is not complete just by cloning one repository and creating a Python virtual environment with an editable install[1]. At a minimum, you need to set up and run manager, agent, storage-proxy, webserver, and wsproxy to check the functioning GUI, and for the CLI environment, you need to install the client SDK separately. Also, Redis, PostgreSQL, and etcd servers need to be run together for manager operation and communication with the agent.

    If you have installed the essential packages introduced earlier and want to install multiple components of Backend.AI, you can install them using the scripts/install-dev.sh script in the repository. This script does the following:

    • Checks for the installation of pyenv, Python, Docker, npm, etc., and guides the installation method
    • Installs all of these various components in their respective directories
      • At this time, components such as accelerator-cuda, which are necessary for the operation of other components, are additionally installed in an editable state.
    • Adds database/etcd fixtures, including default port settings that let the components find each other and example authentication keys
    • Creates and runs PostgreSQL, Redis, etcd services using Docker Compose under the name "halfstack"
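
    The first item in the list above, the prerequisite check, can be approximated before running the script. This is only a sketch of the idea and not what install-dev.sh actually does; the function name missing_tools and the executable names in REQUIRED_TOOLS are assumptions based on the tools named in this post.

```python
import shutil

# Assumed executable names for the prerequisites mentioned in the post.
REQUIRED_TOOLS = ("python3", "pyenv", "docker", "npm")


def missing_tools(tools=REQUIRED_TOOLS) -> list:
    """Return the tools that cannot be found on the current PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    missing = missing_tools()
    if missing:
        print("Not found on PATH:", ", ".join(missing))
    else:
        print("All prerequisite tools were found.")
```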

    When the install-dev script execution is successfully completed, it outputs commands to run service daemons such as manager and agent, and basic configured example account information. Following the instructions, use terminal multiplexers like tmux, screen, or multiple tab features of terminal apps to run service daemons in separate shells, and confirm that the hello world example works. Then you're ready to develop and test Backend.AI.

    Currently, this method supports only Intel (amd64/x86_64) and ARM-based machines running macOS or a Linux distribution on which Docker Compose can be installed, such as Ubuntu, Debian, CentOS, or Fedora.

    When you first run the install-dev script, it often stops partway because of errors or pre-check failures and needs to be run again. In that case, you can easily clean up the partial installation with the scripts/delete-dev.sh script.

    Installing and Uninstalling Backend.AI

    Using these install-dev and delete-dev scripts, you can freely install and uninstall Backend.AI. First, clone the Backend.AI repository.

    $ git clone https://github.com/lablup/backend.ai 
    

    Then install Backend.AI.

    $ cd backend.ai
    $ ./scripts/install-dev.sh 
    

    After the installation is complete, please take note of the result content that appears on the screen.

    If you want to uninstall Backend.AI, run the scripts/delete-dev.sh script from the location where you cloned the Backend.AI repository.

    $ cd backend.ai
    $ ./scripts/delete-dev.sh 
    

    Things to Know Before Contributing

    As with most projects managed in distributed version control systems, to contribute to Backend.AI, code work should be based on the latest commit of the main branch of the original remote repository, and if conflicts occur, they should be resolved before requesting a review. If you've forked the original repository, your fork and the original repository need to be synchronized.

    Before explaining the method, please refer to the following terminology to help understanding:

    • Original remote repository (upstream): The original Backend.AI repository. All major commit contents are reflected here.
    • Forked original repository (origin): The Backend.AI repository copied to "your" account via GitHub. (Note: Original remote repository != Forked original repository)
    • Code copy (local working copy): The forked repository currently downloaded to your local machine

    Git command branch notation

    • main: The main branch of the current local working copy
    • origin/main: The main branch of the repository (origin) from which I cloned to create my local working copy
    • upstream/main: The main branch belonging to the separately added upstream remote repository

    Workflow concepts

    • At the time of forking, origin/main is created
    • When you clone the forked repository, main is created on your work computer
    • Create a new topic branch from main and proceed with work
    • When you upload this work branch to origin and create a PR, GitHub automatically points to the original repository of the fork
    • At this point, to synchronize changes to the main of the original repository during work, follow the procedure below

    The method of synchronization is as follows:

    • step1: Add the original remote repository as a name called upstream
    $ git remote add upstream https://github.com/lablup/backend.ai
    
    • step2: Fetch the latest commits of the main branch of the original remote repository to the code copy (local working copy)
    $ git fetch upstream
    
    • step3: Fast-forward the main branch of the code copy (local working copy) to the latest commit of the main branch of the original remote repository (upstream/main)
    $ git switch main && git merge --ff upstream/main
    
    • step4: Push the changes made to the code copy (local working copy) in steps 1 to 3 to origin (your fork on GitHub)
    $ git push origin main
    

    Now upstream/main and origin/main are synchronized through main.

    • step5: Bring the latest updates into the topic branch you're working on
    $ git switch topic
    $ git merge main
    

    If the histories of origin/main and upstream/main diverge during this process and step 5 is performed incorrectly, recovery can become extremely difficult. Also, the CI tools that Backend.AI uses to test PRs are set up to find the common ancestor commit in order to see the differences between upstream/main and origin/topic; if you reuse the name main for a topic branch, these tools will not work properly. Whenever possible, give every new branch a new name.
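
    The five synchronization steps can also be collected into a small helper. The following is a sketch under stated assumptions, not a tool shipped with Backend.AI: the function names sync_commands and run_sync are hypothetical, the remotes are assumed to be named upstream and origin as in the steps above, and the helper defaults to a dry run that only prints the commands.

```python
import subprocess

UPSTREAM_URL = "https://github.com/lablup/backend.ai"


def sync_commands(topic_branch: str, upstream_url: str = UPSTREAM_URL) -> list:
    """Return the git commands for steps 1 to 5, in order."""
    if topic_branch == "main":
        # The text advises against reusing the name main for a topic branch.
        raise ValueError("use a dedicated topic branch name, not 'main'")
    return [
        # step1 (fails harmlessly if the upstream remote already exists)
        ["git", "remote", "add", "upstream", upstream_url],
        ["git", "fetch", "upstream"],               # step2
        ["git", "switch", "main"],                  # step3
        ["git", "merge", "--ff", "upstream/main"],  # step3 (continued)
        ["git", "push", "origin", "main"],          # step4
        ["git", "switch", topic_branch],            # step5
        ["git", "merge", "main"],                   # step5 (continued)
    ]


def run_sync(topic_branch: str, dry_run: bool = True) -> None:
    """Print each command; execute it only when dry_run is False."""
    for command in sync_commands(topic_branch):
        print("$", " ".join(command))
        if not dry_run:
            subprocess.run(command, check=True)
```

    Calling run_sync("fix/memory-leak-in-stats") prints the command sequence so it can be reviewed before running it for real with dry_run=False inside the working copy.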

    How to Write a Pull Request

    To send a specific bug patch or feature implementation as a PR, you first need to upload it to GitHub. There are several methods, but the following is recommended:

    • Fork the repository on the GitHub repository page. (If you have direct commit permissions, it's recommended to create a branch directly without forking.)
    • In your local working copy, use git remote to point to that forked repository.
      • Following convention, it's good to name Lablup's original repository as upstream and the newly created forked repository as origin.
      • If you installed with install-dev first instead of cloning after forking, the original repository will be origin, so you need to rename the remote.
    • Create a new branch.
      • For branch names, prepend fix/ for bug fixes or feature/ for feature additions or improvements, and summarize the topic in kebab-case. (e.g., feature/additional-cluster-env-vars, fix/memory-leak-in-stats) Other prefixes like docs/, refactor/ are also used.
      • It's possible to write a PR by directly modifying the main branch, but during PR review and modification periods, if additional changes occur on the main branch, you'll have to rebase or merge every time you synchronize with the upstream repository, which is more troublesome. Having a separate branch allows you to rebase and merge when you want.
    • Commit changes to that branch.
      • Commit messages should follow the conventional commit style as much as possible. Like branch names, use title prefixes such as fix:, feat:, refactor:, docs:, release:, and for Backend.AI specifically, setup: for dependency-related commits, repo: for cases like gitignore updates or repository directory structure changes. You can also indicate affected components in parentheses. (e.g., fix(scripts/install-dev): Update for v21.03 release)
      • Commit messages should be written in English.
    • Push the branch and write the PR.
      • For PRs with separate issues, you should write the issue number in the PR body. If you want to reference an issue in the repository, look at the number in the issue link like https://github.com/lablup/backend.ai/issues/401 and write it in the format #401, and GitHub will automatically link it.
      • There's no specific format required for the PR body, but it's good to write what problem it's solving, what principle it's written on, or what tools or libraries were used, and why those choices were made.
      • PR titles and bodies can be written in English or Korean.
      • When you create a PR, you'll see various automated inspection tools in action. In particular, you must sign (register your GitHub username) the CLA (contributor license agreement) for the review to proceed.
      • You must pass all basic coding style and coding rule checks for each language. (For Python code, flake8, mypy, etc.)
      • In repositories with a changes directory and towncrier check, when you create a PR and receive its number, create a file named changes/<PR number>.<modification type> and write a one-line English sentence summarizing the changes in Markdown syntax. (For relatively simple content or if there's a separate existing issue, this content can also serve as the PR body.) Modification types include fix, feature, breaking, misc, deprecation, doc, and parts that differ by project are defined in each repository's pyproject.toml. You can refer to files like CHANGELOG.md or CHANGES.md to see how existing messages were written.
    • Proceed with the review process.
      • When completed, the reviewer usually organizes the commit log in a squash-merge form to create a single commit for merging.
      • Therefore, don't feel burdened about making frequent small modification commits during the review process, and feel free to make commits whenever you think of something.
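
    The naming conventions in the list above lend themselves to a quick local check before pushing. The sketch below is illustrative only and is not part of Backend.AI's CI: the function names are hypothetical, the prefix lists contain only the prefixes named in this post (others may be in use), and the PR number used in the example is made up.

```python
import re

# Prefixes named in the post; the real projects may accept more.
BRANCH_PREFIXES = ("fix", "feature", "docs", "refactor")
COMMIT_PREFIXES = ("fix", "feat", "refactor", "docs", "release", "setup", "repo")
CHANGE_TYPES = ("fix", "feature", "breaking", "misc", "deprecation", "doc")

_branch_alt = "|".join(BRANCH_PREFIXES)
_commit_alt = "|".join(COMMIT_PREFIXES)

# prefix/kebab-case-topic, e.g. fix/memory-leak-in-stats
BRANCH_RE = re.compile(rf"(?:{_branch_alt})/[a-z0-9]+(?:-[a-z0-9]+)*")
# prefix, optional (component), colon, space, summary
COMMIT_RE = re.compile(rf"(?:{_commit_alt})(?:\([^()]+\))?: \S.*")


def is_valid_branch_name(name: str) -> bool:
    """Check the prefix/kebab-case branch naming convention."""
    return BRANCH_RE.fullmatch(name) is not None


def is_valid_commit_title(title: str) -> bool:
    """Check the conventional-commit style title prefix."""
    return COMMIT_RE.fullmatch(title) is not None


def news_fragment_path(pr_number: int, change_type: str) -> str:
    """Build changes/<PR number>.<modification type> as described above."""
    if change_type not in CHANGE_TYPES:
        raise ValueError(f"unknown modification type: {change_type}")
    return f"changes/{pr_number}.{change_type}"


print(is_valid_branch_name("feature/additional-cluster-env-vars"))
print(is_valid_commit_title("fix(scripts/install-dev): Update for v21.03 release"))
print(news_fragment_path(1234, "fix"))  # 1234 is a made-up PR number
```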

    It can also help to use tools like GitHub CLI, SourceTree, or GitKraken alongside git commands.

    Summary

    We've looked at Backend.AI's overall component structure and repository structure, how to install the development environment, and how to write pull requests. I hope this guide has helped you take one step closer to Backend.AI's source code.


    [1]: An "editable" installation refers to a method of installing a Python package to directly look at the source directory, allowing changes to be immediately reflected when importing the package just by modifying the source directory without editing inside the site-packages directory.

    10 July 2024

  • FastTrack Guide: Receiving Notifications for Model Training Results

    By Jeongseok Kang

    From the now-classic AlexNet to various large language models (LLMs) that are garnering a lot of attention these days, we train and evaluate various models to suit our needs. However, realistically, it's difficult for us to gauge when the training will end until we run the model multiple times and gain experience.

    Backend.AI's excellent scheduling minimizes GPU idle time and allows model training to run even while we sleep. So what if we could also receive the results of a model that finished training while we were asleep? In this article, we'll cover how to receive model training results as messages by combining a new FastTrack feature with Slack.

    This article is based on the Backend.AI FastTrack version 24.03.3.

    Before We Start

    This article does not cover how to create a Slack App and Bot. For detailed information, we recommend referring to the official documentation.

    Creating a Pipeline

    Let's create a pipeline for model training. A pipeline is a unit of work used in FastTrack. Each pipeline can be expressed as a collection of tasks, the minimum execution unit. Multiple tasks included in a single pipeline can have interdependencies, and they are executed sequentially according to these dependencies. Resource allocation can be set for each task, allowing flexible resource management.

    When an execution command is sent to a pipeline, it is executed by replicating the exact state at that point, and this unit is called a pipeline job. Multiple pipeline jobs can be run from a single pipeline, and each pipeline job is generated from a single pipeline.
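
    To make the relationship between pipelines, tasks, and pipeline jobs concrete, here is a small Python model of it. This is only a sketch of the concepts described above, not FastTrack's actual data model: the class names, the create_job() method, and the notify.py command are assumptions made for the example.

```python
import copy
from dataclasses import dataclass, field
from graphlib import TopologicalSorter


@dataclass
class Task:
    name: str
    command: str
    depends_on: list[str] = field(default_factory=list)


@dataclass
class PipelineJob:
    pipeline_name: str
    tasks: dict[str, Task]

    def execution_order(self) -> list[str]:
        # A task becomes runnable only after every task it depends on has finished.
        graph = {name: set(task.depends_on) for name, task in self.tasks.items()}
        return list(TopologicalSorter(graph).static_order())


@dataclass
class Pipeline:
    name: str
    tasks: dict[str, Task] = field(default_factory=dict)

    def add_task(self, task: Task) -> None:
        self.tasks[task.name] = task

    def create_job(self) -> PipelineJob:
        # A job is a copy of the pipeline exactly as it is when "Run" is requested.
        return PipelineJob(pipeline_name=self.name, tasks=copy.deepcopy(self.tasks))


pipeline = Pipeline("slack-pipeline-0")
pipeline.add_task(Task("model-training-task", "sleep 3"))
pipeline.add_task(
    Task("slack-alarm-task", "python notify.py", depends_on=["model-training-task"])
)

job = pipeline.create_job()
# Editing the pipeline afterwards does not affect the job that was already created.
pipeline.tasks["model-training-task"].command = "sleep 10"
```

    Because the job holds its own copy, you can keep editing a pipeline while earlier jobs are still running.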

    Create Pipeline button

    Click the Create Pipeline button ("+") at the top of the pipeline list.

    Creating a Pipeline

    You can specify the pipeline's name, description, location of the data store to use, environment variables to be applied commonly across the pipeline, and the method of pipeline initialization. Enter the name "slack-pipeline-0", and then click the "Create" button at the bottom to create the pipeline.

    Creating Tasks

    Dragging a Task

    You can see that the new pipeline has been created. Now let's add some tasks. From the task template list (Task templates) at the top, drag and drop the "Custom Task" block onto the workspace below.

    Entering the Task's Actions

    A task details window appears on the right where you can enter the task's specifics. You can give it a name like model-training-task to indicate its role, and set it to use the pytorch:1.11-py38-cuda11.3 image for model training. Since actual model training can take a long time, for this example, we'll have it execute the following simple commands:

    # Pause for 3 seconds to increase the execution time.
    sleep 3
    # Create a `result.txt` file in the pipeline-dedicated folder. Assume this is the accuracy of the trained model.
    echo "0.$RANDOM" > /pipeline/outputs/result.txt
    

    Creating a Task (1)

    Finally, enter the resource allocation for the task, and then click the "Save" button at the bottom to create the task.

    Dragging Another Task

    You can see that the model-training-task has been created in the workspace. This time, to create a task that reads the value from the result.txt file saved earlier and sends a Slack notification, drag another "Custom Task" block into the workspace below.

    Entering the Task-level Environment Variable `SLACK_TOKEN`

    For this task, set the name to slack-alarm-task, and enter the following script to send a notification to Slack:

    pip install slack-sdk
    python -c '
    import os
    from pathlib import Path
    from slack_sdk import WebClient

    # Task-level variable set in the task details window.
    SLACK_BOT_TOKEN = os.environ.get("SLACK_TOKEN")
    # Injected automatically by Backend.AI and FastTrack.
    JOB_ID = os.environ.get("BACKENDAI_PIPELINE_JOB_ID")

    def main():
        # Written by the preceding task; strip the trailing newline that echo adds.
        result = Path("/pipeline/input1/result.txt").read_text().strip()
        client = WebClient(token=SLACK_BOT_TOKEN)
        client.chat_postMessage(
            channel="#notification",
            text="Pipeline job({}) finished with accuracy {}".format(JOB_ID, result),
        )
    if __name__ == "__main__":
        main()
    '
    

    The code above uses two environment variables: SLACK_TOKEN and BACKENDAI_PIPELINE_JOB_ID. Environment variables in the BACKENDAI_* format are values automatically added by the Backend.AI and FastTrack systems, where BACKENDAI_PIPELINE_JOB_ID represents the unique identifier of the pipeline job in which each task is running.

    The other environment variable, SLACK_TOKEN, is a task-level environment variable. This feature allows you to manage and change various values without modifying the code.
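
    One practical consequence is that the script should fail clearly when the variable has not been set. The helper below is a minimal sketch of that idea; the function name read_settings, the "unknown" fallback, and the placeholder values are assumptions for illustration only.

```python
import os
from typing import Mapping


def read_settings(environ: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Collect the values the notification script needs, failing early if the token is missing."""
    token = environ.get("SLACK_TOKEN")
    if not token:
        raise RuntimeError("SLACK_TOKEN is not set; add it as a task-level environment variable")
    return {
        "token": token,
        # Injected by the platform; the fallback only matters when testing outside a pipeline job.
        "job_id": environ.get("BACKENDAI_PIPELINE_JOB_ID", "unknown"),
    }


# Example with a fake environment (placeholder values, not real credentials).
settings = read_settings(
    {"SLACK_TOKEN": "xoxb-placeholder", "BACKENDAI_PIPELINE_JOB_ID": "job-123"}
)
```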

    Creating a Task (2)

    After allocating appropriate resources for the slack-alarm-task, click the "Save" button at the bottom to create the task.

    Adding Task Dependencies

    Adding Task Dependencies

    Now there are two tasks (model-training-task and slack-alarm-task) in the workspace. Since slack-alarm-task should be executed after model-training-task completes, we need to add a dependency between the two tasks. Drag the mouse from the bottom of the task that should run first (model-training-task) to the top of the task that should run later (slack-alarm-task).

    Running the Pipeline

    Running the Pipeline (1)

    You can see an arrow connecting from model-training-task to slack-alarm-task, indicating that the dependency has been added. Now, to run the pipeline, click the "Run" button in the top right.

    Running the Pipeline (2)

    Before running the pipeline, you can review a brief summary of it. After confirming the presence of the two tasks, click the "Run" button at the bottom.

    Running the Pipeline (3)

    The pipeline was successfully run, and a pipeline job was created. Click "OK" at the bottom to view the pipeline job information.

    Pipeline Job

    The pipeline job was created successfully. You can see that the model training (model-training-task) has completed, and slack-alarm-task is running.

    Receiving Slack Notification

    Slack Notification (1)

    Slack Notification (2)

    You can see that the pipeline job execution results have been delivered to the user via Slack. Now we can sleep soundly.

    30 May 2024

  • Deconstructing a working Raft implementation - 2

    By Gyubong Lee

    In the previous post, we provided an overview centered around the types in raft-rs, and explored how leader elections occur during network failures, how log inconsistencies are resolved, and how a consistent state is maintained after overcoming failures, based on three scenarios.

    In this article, we will continue from the last post and examine the operation of the Raft implementation across several scenarios.

    The scenario we will look into involves the process by which the state of a Raft cluster is saved to stable storage, and how the cluster is rebooted and recovers the previous state from logs and snapshots.

    💡 Raftify is a high-level Raft implementation developed by Lablup. If you're curious about Raftify, check out this post.

    Exploring the raft-rs Architecture Centered Around Types

    In this article, before we proceed with the scenario analysis, let's first look at some of the types in raft-rs that will be featured in this discussion.

    ConfState

    The cluster is composed of multiple nodes, and each node is classified as either a voter or a learner depending on whether it participates in voting when a failure triggers an election. Both voters and learners are members of the cluster and share the consensus, but learners do not participate in voting.

    Information about these cluster members is also included in the consensus among the members, and thus can be established or modified by applying log entries.

    💡 In raft-rs, the EntryType is divided into EntryConfChange for such ConfState configuration changes and EntryNormal for general state changes.

    Among the types used in raft-rs, those used in the network layer are defined in the eraftpb.proto file and are compiled into Rust code by tonic.

    message ConfState {
        repeated uint64 voters = 1;
        repeated uint64 learners = 2;
    
        // The voters in the outgoing config. If not empty the node is in joint consensus.
        repeated uint64 voters_outgoing = 3;
        // The nodes that will become learners when the outgoing config is removed.
        // These nodes are necessarily currently in nodes_joint (or they would have
        // been added to the incoming config right away).
        repeated uint64 learners_next = 4;
        // If set, the config is joint and Raft will automatically transition into
        // the final config (i.e. remove the outgoing config) when this is safe.
        bool auto_leave = 5;
    }
    

    voters_outgoing, learners_next, and auto_leave are fields that support joint consensus. This article does not cover joint consensus.
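
    The voter/learner distinction can be sketched in a few lines of Python. This is an illustrative model rather than the raft-rs API: the methods members(), quorum(), and is_joint() are names invented for the example, and learners_next and auto_leave are left out for brevity.

```python
from dataclasses import dataclass, field


@dataclass
class ConfState:
    voters: list[int] = field(default_factory=list)
    learners: list[int] = field(default_factory=list)
    voters_outgoing: list[int] = field(default_factory=list)

    def members(self) -> set[int]:
        # Voters and learners both receive the replicated log.
        return set(self.voters) | set(self.learners)

    def quorum(self) -> int:
        # Only voters count toward the majority; learners are excluded.
        return len(self.voters) // 2 + 1

    def is_joint(self) -> bool:
        # Follows the proto comment: a non-empty outgoing config means joint consensus.
        return bool(self.voters_outgoing)


conf = ConfState(voters=[1, 2, 3], learners=[4])
```

    With three voters and one learner, the cluster has four members but still needs only two votes for a majority.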

    Snapshot, SnapshotMetadata

    To keep the system available, logs cannot be allowed to accumulate indefinitely, so entries that have already been applied to the state machine must eventually be deleted.

    The process of removing logs up to a specific index in the log sequence is referred to as log compaction, and the recorded state up to that index after applying the log entries is called a snapshot.
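
    The following toy example shows the idea in Python, assuming a key-value state machine and a log held in a plain list. The Entry type and the compact() function are hypothetical and far simpler than what raft-rs or Raftify actually do.

```python
from dataclasses import dataclass


@dataclass
class Entry:
    index: int
    term: int
    key: int
    value: str


def compact(log: list[Entry], state: dict[int, str], upto: int) -> tuple[dict, list[Entry]]:
    """Apply entries with index <= upto to a copy of state, then drop them from the log.

    Returns (snapshot, remaining_log). The snapshot records the resulting state
    together with the index and term of the last entry it covers.
    """
    new_state = dict(state)
    last_term = 0
    for entry in log:
        if entry.index <= upto:
            new_state[entry.key] = entry.value
            last_term = entry.term
    snapshot = {"data": new_state, "index": upto, "term": last_term}
    remaining = [entry for entry in log if entry.index > upto]
    return snapshot, remaining


log = [Entry(1, 1, 1, "A"), Entry(2, 1, 2, "A"), Entry(3, 2, 1, "B"), Entry(4, 2, 3, "A")]
snapshot, remaining = compact(log, {}, upto=3)
```

    After compaction, three entries have been replaced by a single snapshot, and only the entry at index 4 remains in the log.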

    Snapshots, which are the main focus of this post, will be examined in detail in the scenario analysis below. They are used to transfer the state of the cluster to newly joined nodes or for recovery from failures.

    message Snapshot {
        bytes data = 1;
        SnapshotMetadata metadata = 2;
    }
    
    message SnapshotMetadata {
        // The current `ConfState`.
        ConfState conf_state = 1;
        // The applied index.
        uint64 index = 2;
        // The term of the applied index.
        uint64 term = 3;
    }
    

    SnapshotMetadata represents the metadata of a snapshot at the time it was created.

    Specifically, each field signifies the following:

    • conf_state: Indicates the cluster membership information at the time the snapshot was created.
    • index: Represents the index of the last log entry that was compacted when the snapshot was created.
    • term: Indicates the term value of the last log entry at the time the snapshot was created.

    Such metadata is essential for maintaining log consistency when utilizing snapshots.

    For instance, when restoring state information from a snapshot, if the term of the log entry at the snapshot's index does not match the term in the snapshot metadata, the snapshot application request must be ignored to maintain consistency.

    Scenario Analysis

    1 - Snapshot Recording

    In Raftify, snapshot creation is initiated by calling the make_snapshot() method of a RaftNode, passing the specific index and the term value of the log entry at that index as arguments.

    The data to be stored in the snapshot is the return value of the self.fsm.snapshot() method, which represents the current state of the state machine.

    💡 The self.fsm.snapshot() method can be implemented differently depending on how the Finite State Machine (FSM) is to be stored, and it is one of the implementations that Raftify users must provide. For example, in the HashStore example, where the FSM is stored in memory, snapshot() simply serializes and returns a HashMap.

    Passing last_applied, the index of the last log entry applied to the state machine, to compact() deletes the log entries up to that index.

    // lablup/raftify/blob/main/src/raft_node/mod.rs
    pub async fn make_snapshot(&mut self, index: u64, term: u64) -> Result<()> {
        ...
        let snapshot_data = self.fsm.snapshot().await?;
    
        let last_applied = self.raw_node.raft.raft_log.applied;
        let store = self.raw_node.mut_store();
        store.compact(last_applied)?;
        store.create_snapshot(snapshot_data, index, term)?;
        Ok(())
    }
    

    create_snapshot() records the snapshot metadata along with the snapshot data (data) received as an argument.

    // lablup/raftify/blob/main/src/heed_storage/mod.rs
    fn create_snapshot(&mut self, data: Vec<u8>, index: u64, term: u64) -> Result<()> {
        let store = self.wl();
        let mut writer = store.env.write_txn()?;
        let conf_state = store.conf_state(&writer)?;
    
        let mut snapshot = Snapshot::default();
        snapshot.set_data(data);
    
        let meta = snapshot.mut_metadata();
        meta.set_conf_state(conf_state);
        meta.index = index;
        meta.term = term;
    
        store.set_snapshot(&mut writer, &snapshot)?;
        writer.commit()?;
        Ok(())
    }
    

    2 - Transmitting Snapshots to Newly Joined Nodes

    Scenario

    When a new node joins the cluster, it must receive the state of the existing cluster to maintain consistency.

    However, replicating every log entry individually each time a new node joins is inefficient. Since all nodes share the same state machine, transmitting just the snapshot—the result of applied log entries—instead of every log entry can solve this issue. The type of message used to transmit snapshot data is MsgSnapshot.

    In this section, let's assume that Node 1 is the leader and Node 2 is the newly joined node. We will focus on the code and logs related to the MsgSnapshot message to understand what happens during this process.

    In Raftify, a newly joined follower does not send a separate snapshot request to the leader node.

    When a configuration change request (hereafter ConfChange) is committed, the leader tries to send this log entry to the newly joined node, which will reject this MsgAppend message because it does not have the required log entries.

    Do you remember the scenario from a previous part where MsgAppend messages were rejected due to a network failure, causing inconsistencies between nodes?

    In that scenario, inconsistencies were resolved by synchronizing mismatched log entries one by one using prepare_send_entries(). The difference when resolving log inconsistencies with a newly joined node is that instead of synchronizing log entries one by one, synchronization is done through a snapshot (prepare_send_snapshot()).

    Now, let’s delve into the code and log analysis to understand in detail how this scenario unfolds.

    Code Analysis

    Let's start by examining the part of the code where the MsgAppend message sent by the leader to the newly joined node is rejected.

    In the maybe_send_append() function shown below, the progress for the newly joined node is empty, so the call to self.raft_log.term() fails and prepare_send_snapshot() is triggered, causing maybe_send_append() to return false (rejecting the MsgAppend message).

    // tikv/raft-rs/blob/master/src/raft.rs
    fn maybe_send_append(
        &mut self,
        to: u64,
        pr: &mut Progress,
        allow_empty: bool,
        msgs: &mut Vec<Message>,
    ) -> bool {
        ...
            let term = self.raft_log.term(pr.next_idx - 1);
            match (term, ents) {
                (Ok(term), Ok(mut ents)) => {
                    if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                        return true;
                    }
                    self.prepare_send_entries(&mut m, pr, term, ents)
                }
                (_, Err(Error::Store(StorageError::LogTemporarilyUnavailable))) => {
                    // wait for storage to fetch entries asynchronously
                    return false;
                }
                _ => {
                    // 💡 In this scenario, the following branch is executed.
                    // send snapshot if we failed to get term or entries.
                    if !self.prepare_send_snapshot(&mut m, pr, to) {
                        return false;
                    }
                }
            }
        }
        self.send(m, msgs);
        true
    }
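
    The fallback in the last match arm can be modeled with a simplified Python sketch. It only captures the idea that the leader falls back to a snapshot when the entries a follower needs have been compacted away; it ignores the term lookup and batching. LeaderLog, Compacted, and plan_replication are hypothetical names, and the index values mirror the leader logs shown later in this section.

```python
class Compacted(Exception):
    """Raised when the requested entries were already removed by log compaction."""


class LeaderLog:
    def __init__(self, first_index: int, last_index: int) -> None:
        self.first_index = first_index  # oldest entry still kept in the log
        self.last_index = last_index

    def entries_from(self, next_idx: int) -> list[int]:
        if next_idx < self.first_index:
            raise Compacted(f"entries before index {self.first_index} are gone")
        return list(range(next_idx, self.last_index + 1))


def plan_replication(log: LeaderLog, next_idx: int) -> str:
    """Return which kind of message the leader would send to a follower."""
    try:
        log.entries_from(next_idx)
    except Compacted:
        # The follower is too far behind to catch up entry by entry.
        return "MsgSnapshot"
    return "MsgAppend"


leader_log = LeaderLog(first_index=8, last_index=9)
new_node_plan = plan_replication(leader_log, next_idx=1)
caught_up_plan = plan_replication(leader_log, next_idx=9)
```

    A brand-new node starts at next_idx 1, which is below the leader's first retained index, so it receives a snapshot instead of individual entries.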
    

    The prepare_send_snapshot() called in this scenario is a function that retrieves snapshot data by invoking the self.raft_log.snapshot() method and sets this data in the message to be sent.

    Afterwards, it marks the progress object of the node as being in a snapshot state before returning.

    💡 The node's state being marked as snapshot indicates that the node is in the process of snapshot replication, which means that log replication to this node will be temporarily suspended.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
        ...
        m.set_msg_type(MessageType::MsgSnapshot);
        let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot, to);
        if let Err(ref e) = snapshot_r {
            if *e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
                self.logger.debug(
                    format!(
                        "failed to send snapshot to {} because snapshot is temporarily unavailable",
                        to
                    )
                    .as_str(),
                );
                return false;
            }
            self.logger
                .fatal(format!("unexpected error: {:?}", e).as_str());
        }
        let snapshot = snapshot_r.unwrap();
        if snapshot.get_metadata().index == 0 {
            self.logger.fatal("need non-empty snapshot");
        }
        let (sindex, sterm) = (snapshot.get_metadata().index, snapshot.get_metadata().term);
        m.set_snapshot(snapshot);
        self.logger.debug(format!(
            "[firstindex: {first_index}, commit: {committed}] sent snapshot[index: {snapshot_index}, term: {snapshot_term}] to {to}; progress: {progress}",
            first_index = self.raft_log.first_index(),
            committed = self.raft_log.committed,
            snapshot_index = sindex,
            snapshot_term = sterm,
            to = to,
            progress = format!("{:?}", pr)
        ).as_str());
    
        pr.become_snapshot(sindex);
        self.logger.debug(
            format!(
                "paused sending replication messages to {}; progress: {:?}",
                to, pr
            )
            .as_str(),
        );
        true
    }
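
    The effect of become_snapshot() on a follower's progress can be pictured with the small Python model below. It is a sketch under simplifying assumptions, not the raft-rs Progress type: can_send_entries() and snapshot_acknowledged() are invented names, and the transition back to the probe state is a simplified description of what happens once the follower reports that it has caught up.

```python
from dataclasses import dataclass
from enum import Enum


class ProgressState(Enum):
    PROBE = "Probe"
    REPLICATE = "Replicate"
    SNAPSHOT = "Snapshot"


@dataclass
class Progress:
    matched: int = 0
    next_idx: int = 1
    state: ProgressState = ProgressState.PROBE
    pending_snapshot: int = 0

    def become_snapshot(self, snapshot_index: int) -> None:
        # Remember which snapshot is in flight and stop normal log replication.
        self.state = ProgressState.SNAPSHOT
        self.pending_snapshot = snapshot_index

    def can_send_entries(self) -> bool:
        return self.state is not ProgressState.SNAPSHOT

    def snapshot_acknowledged(self, index: int) -> None:
        # Once the follower reports the snapshot index, resume from the next entry.
        if self.state is ProgressState.SNAPSHOT and index >= self.pending_snapshot:
            self.matched = index
            self.next_idx = index + 1
            self.pending_snapshot = 0
            self.state = ProgressState.PROBE


pr = Progress()
pr.become_snapshot(9)
paused_while_sending = not pr.can_send_entries()
pr.snapshot_acknowledged(9)
```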
    

    For this reason, when a ConfChange is committed, Raftify calls RaftNode.make_snapshot(), which we explored in scenario 1, so that the snapshot to be sent to the new node is prepared in advance.

    The transmitted snapshot is detected and recovered by the Snapshot handling logic in the newly joined node's Raft loop. Through the logic below, the state machine is restored using the received snapshot data via self.fsm.restore(), and it is also applied to Stable storage via store.apply_snapshot().

    // lablup/raftify/blob/main/raftify/src/raft_node/mod.rs
    async fn on_ready(&mut self) -> Result<()> {
        ...
        if *ready.snapshot() != Snapshot::default() {
            self.logger
                .info("Restoring state machine and snapshot metadata...");
            let snapshot = ready.snapshot();
            if !snapshot.get_data().is_empty() {
                self.fsm.restore(snapshot.get_data().to_vec()).await?;
            }
            let store = self.raw_node.mut_store();
            store.apply_snapshot(snapshot.clone())?;
        }
        ...
    }
    

    Leader Node Log Analysis

    Now, let's analyze the logs generated on the leader node sequentially when a new node joins:

    1. Node 1 receives a join request from Node 2, and the cluster configuration is modified.
    Apr 11 06:51:14.189 INFO Node 2 (127.0.0.1:60062) joined the cluster as voter.
    Apr 11 06:51:14.189 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    Apr 11 06:51:14.189 DEBG Entries [9, 10) requested.
    
    2. Because a new log entry has been added to the leader, it sends a MsgAppend message to Node 2 to replicate this log entry.
    Apr 11 06:51:14.189 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 8, entries: [Entry { context: 7, data: ConfChangeV2 { transition: 0, changes: [ConfChangeSingle { change_type: AddNode, node_id: 2 }], context: [127.0.0.1:60062] }, entry_type: EntryConfChangeV2, index: 9, sync_log: false, term: 1 }], commit: 9, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    
    3. However, since the newly joined node does not possess the information of the existing cluster, this MsgAppend message is rejected, and Node 1 receives a message indicating that the request has been rejected as follows.
    Apr 11 06:51:14.298 DEBG >>> Node 1 received Raft message from the node 2, Message { msg_type: MsgAppendResponse, to: 1, from: 2, term: 1, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 11 06:51:14.298 DEBG received msgAppend rejection; reject_hint_index: 0, reject_hint_term: 0, from: 2, index: 8
    Apr 11 06:51:14.298 DEBG decreased progress of 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
    
    4. As described earlier, since the progress of the newly joined node is empty, the snapshot is stored in Stable storage, and log entries up to the specified index are removed. In this case, log entries up to index 8 are removed, and the log entry index corresponding to Node 2's join request is 9. Therefore, a snapshot message is sent along with a log indicating that the first_index is 8 and the commit is 9.
    Apr 11 06:51:14.298 DEBG [firstindex: 8, commit: 9] sent snapshot[index: 9, term: 1] to 2; progress: Progress { matched: 0, next_idx: 1, state: Probe, paused: false, pending_snapshot: 0, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
    
    5. Replication of log entries is paused to transmit the snapshot.
    Apr 11 06:51:14.299 DEBG paused sending replication messages to 2; progress: Progress { matched: 0, next_idx: 1, state: Snapshot, paused: false, pending_snapshot: 9, pending_request_snapshot: 0, recent_active: true, ins: Inflights { start: 0, count: 0, buffer: [], cap: 256, incoming_cap: None }, commit_group_id: 0, committed_index: 0 }
    
    6. A MsgSnapshot type message is sent to transmit the snapshot. The snapshot contains the data {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"} that was previously arbitrarily inserted.
    Apr 11 06:51:14.299 DEBG <<< Sending from 1 to 2, msg: Message { msg_type: MsgSnapshot, to: 2, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {4: "A", 3: "A", 2: "A", 1: "A", 5: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    

    Follower Node Log Analysis

    Analyzing the logs generated on the newly joined follower node, we find:

    1. Becomes a new follower node at term 1.
    Apr 15 06:37:27.421 INFO became follower at term 1
    
    2. Rejects the MsgAppend message received from the leader node.
    Apr 15 06:37:27.421 DEBG rejected msgApp [logterm: 1, index: 8] from 1; index: 8, logterm: Ok(0)
    Apr 15 06:37:27.421 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 8, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: true, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    
    3. So that the leader does not detect the node as failed and trigger unnecessary voting, the node must still respond normally to MsgHeartbeat messages.
    Apr 15 06:37:27.423 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgHeartbeat, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 15 06:37:27.423 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeatResponse, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    
    4. Receives the snapshot through a MsgSnapshot message.
    Apr 15 06:37:27.424 DEBG >>> Node 2 received Raft message from the node 1, Message { msg_type: MsgSnapshot, to: 2, from: 1, term: 1, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: HashStore(RwLock { data: {3: "A", 5: "A", 2: "A", 4: "A", 1: "A"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [1, 2], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 9, term: 1 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 15 06:37:27.424 INFO log [committed=0, persisted=0, applied=0, unstable.offset=1, unstable.entries.len()=0] starts to restore snapshot [index: 9, term: 1]
    Apr 15 06:37:27.424 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    
    5. Restores the state using the received snapshot.
    Apr 15 06:37:27.424 INFO restored snapshot; commit: 9, last_index: 9, last_term: 1, snapshot_index: 9, snapshot_term: 1
    Apr 15 06:37:27.424 INFO [commit: 9, term: 1] restored snapshot [index: 9, term: 1]
    Apr 15 06:37:27.425 DEBG <<< Sending from 2 to 1, msg: Message { msg_type: MsgAppendResponse, to: 1, from: 0, term: 0, log_term: 0, index: 9, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: [], metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: [], deprecated_priority: 0, priority: 0 }
    Apr 15 06:37:27.425 INFO Restoring state machine and snapshot metadata...
    Apr 15 06:37:27.425 DEBG snapshot's persisted index  9
    

    3 - Recovery in Case of Failure in the Majority of Nodes

    When a single node fails, this is not a problem: once the network is restored, the node can simply replicate the new log entries from the leader node. Even if a node needs to join anew, it can recover its state through snapshots as discussed in scenario 2.

    However, if failures occur in a majority of nodes, the cluster cannot recover on its own.

    In this case, the administrator must intervene manually to decide which node's log sequence to consider as the correct state, and then re-bootstrap the cluster from that log sequence.

    Depending on the administrator's judgment, recovery can be done by directly applying all log entries one by one to the state machine or by restoring the state from the last created snapshot.

    State Recovery from WAL Snapshots

    In this section, we will use the Raftify example code.

    To recreate the example, we will first insert a few key-value pairs into Node 1, then call the make_snapshot() method via the /snapshot API to create a snapshot. We will then assume that the node has failed and shut it down.

    To recover from a WAL snapshot, pass the node_id of the node to be recovered to the restore_wal_snapshot_from option. Here, we will recover from Node 1's snapshot, so we will use 1.

    To verify whether the log entries have been applied, we will log "Inserted: (key, value)" each time apply() is called.

    💡 apply() is also an abstract method of the StateMachine that Raftify users need to define, similar to restore(). It is called when log entries are committed.
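
    To illustrate how apply(), snapshot(), and restore() relate to each other, here is a minimal in-memory state machine in Python. It is loosely modeled on the idea of the HashStore example but is not Raftify's actual interface: the class layout, the JSON encoding, and the apply_calls counter are assumptions made for this sketch.

```python
import json
from abc import ABC, abstractmethod


class StateMachine(ABC):
    @abstractmethod
    def apply(self, entry: bytes) -> None: ...

    @abstractmethod
    def snapshot(self) -> bytes: ...

    @abstractmethod
    def restore(self, snapshot: bytes) -> None: ...


class HashStore(StateMachine):
    """In-memory key-value store used only for this illustration."""

    def __init__(self) -> None:
        self.data: dict[str, str] = {}
        self.apply_calls = 0

    def apply(self, entry: bytes) -> None:
        # Called once per committed log entry.
        command = json.loads(entry)
        self.data[command["key"]] = command["value"]
        self.apply_calls += 1
        print(f"Inserted: ({command['key']}, {command['value']})")

    def snapshot(self) -> bytes:
        # Serialize the whole current state.
        return json.dumps(self.data, sort_keys=True).encode()

    def restore(self, snapshot: bytes) -> None:
        # Replaces the whole state at once; apply() is not involved.
        self.data = json.loads(snapshot)


entries = [json.dumps({"key": str(k), "value": "A"}).encode() for k in range(1, 6)]

# Recovery by replaying log entries one by one.
replayed = HashStore()
for entry in entries:
    replayed.apply(entry)

# Recovery from a snapshot of the same state.
restored = HashStore()
restored.restore(replayed.snapshot())
```

    Both stores end up with the same data, but only the replayed one prints "Inserted" lines. This is the difference to look for in the node logs when comparing the two recovery methods.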

    After taking the snapshot and shutting down Node 1, we can use the CLI commands provided by Raftify to dump the storage, as shown below.

    From the logs below, we can see that the snapshot is stored in the storage and contains the data {2: "A", 5: "A", 3: "A", 4: "A", 1: "A"}.

    ❯ raftify-cli debug persisted-all ./logs
    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 1, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: HashStore(RwLock { data: {2: \"A\", 5: \"A\", 3: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }"
    Last index: 8
    

    Then, let's re-bootstrap Node 1 using the command ./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-snapshot-from=1.

    The logs generated on Node 1 will be as follows. Since the state is restored directly from the snapshot, apply() has not been executed for any of the log entries.

    Apr 15 07:54:44.703 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }, 2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: None, restore_wal_snapshot_from: Some(1), }
    Apr 15 07:54:44.705 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    Apr 15 07:54:44.705 DEBG reset election timeout 0 -> 10 at 0
    Apr 15 07:54:44.705 INFO became follower at term 3
    Apr 15 07:54:44.705 INFO newRaft; term: 3, commit: 0, applied: 0, last index: 0, last term: 0, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }
    Apr 15 07:54:44.705 INFO RawNode created with id 1.
    Apr 15 07:54:44.748 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"...
    

    And then let's dump the storage again.

    Since the recovery is from its own snapshot, we can confirm that no state changes have occurred.

    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 1, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: HashStore(RwLock { data: {3: \"A\", 2: \"A\", 5: \"A\", 4: \"A\", 1: \"A\"}, poisoned: false, .. }), metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 8, term: 2 }) }"
    Last index: 8
    

    State Recovery from WAL Logs

    This time, let's recover the state from a specific log sequence.

    In this case, as shown in the logs below, the storage has an empty snapshot but contains log entries necessary for state recovery.

    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }"
    Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }"
    Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }"
    Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }"
    Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }"
    Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }"
    Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }"
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 2, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }"
    Last index: 8
    

    Let's assume a failure and shut down Node 1, then see what happens when we re-bootstrap it.

    After shutting down Node 1, re-bootstrap it using the command ./target/debug/memstore-static-members --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001 --restore-wal-from=1.

    The following logs will be printed on Node 1, showing that the previously stored log entries are replayed via apply(), restoring the previous state.

    Apr 15 07:46:50.710 INFO RaftNode bootstrapped. Config { raft_config: { id: 0, election_tick: 10, heartbeat_tick: 3, applied: 0, max_size_per_msg: 0, max_inflight_msgs: 256, check_quorum: false, pre_vote: false, min_election_tick: 0, max_election_tick: 0, read_only_option: Safe, skip_bcast_commit: false, batch_append: false, priority: 0, max_uncommitted_size: 18446744073709551615, max_committed_size_per_ready: 18446744073709551615, }, log_dir: ./logs, save_compacted_logs: true, compacted_log_dir: ./logs, compacted_log_size_threshold: 1073741824, snapshot_interval: None, tick_interval: 0.1, initial_peers: Some(Peers { inner: {2: Peer { addr: 127.0.0.1:60062, role: Voter, client: None }, 1: Peer { addr: 127.0.0.1:60061, role: Voter, client: None }} }), lmdb_map_size: 1073741824, cluster_id: default, conf_change_request_timeout: 2, restore_wal_from: Some(1), restore_wal_snapshot_from: None, }
    Apr 15 07:46:50.712 INFO switched to configuration; config: Configuration { voters: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }, learners: {}, learners_next: {}, auto_leave: false }
    Apr 15 07:46:50.712 DEBG reset election timeout 0 -> 10 at 0
    Apr 15 07:46:50.712 INFO became follower at term 1
    Apr 15 07:46:50.712 INFO newRaft; term: 1, commit: 8, applied: 0, last index: 8, last term: 1, peers: Configuration { incoming: Configuration { voters: {1, 2} }, outgoing: Configuration { voters: {} } }
    Apr 15 07:46:50.712 INFO RawNode created with id 1.
    Apr 15 07:46:50.753 DEBG RaftServer starts to listen gRPC requests on "127.0.0.1:60061"...
    Apr 15 07:46:50.855 DEBG Entries [1, 9) requested.
    
    // Applies log entries one by one to restore the state machine state.
    Inserted: (1, A)
    Inserted: (1, A)
    Inserted: (1, A)
    Inserted: (2, A)
    Inserted: (3, A)
    Inserted: (4, A)
    Inserted: (5, A)
    

    Similarly, since the state is recovered to its pre-crash state, dumping the storage will show that it is the same as before. The difference is that, compared to the previous recovery via snapshot, all log entries were applied one by one this time.

    *----- node-1 -----*
    ---- Persisted entries ----
    Key: 1, "Entry { context: [], data: [], entry_type: EntryNormal, index: 1, sync_log: false, term: 2 }"
    Key: 2, "Entry { context: 0, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 2, sync_log: false, term: 2 }"
    Key: 3, "Entry { context: 1, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 3, sync_log: false, term: 2 }"
    Key: 4, "Entry { context: 2, data: Insert { key: 1, value: \"A\" }, entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }"
    Key: 5, "Entry { context: 3, data: Insert { key: 2, value: \"A\" }, entry_type: EntryNormal, index: 5, sync_log: false, term: 2 }"
    Key: 6, "Entry { context: 4, data: Insert { key: 3, value: \"A\" }, entry_type: EntryNormal, index: 6, sync_log: false, term: 2 }"
    Key: 7, "Entry { context: 5, data: Insert { key: 4, value: \"A\" }, entry_type: EntryNormal, index: 7, sync_log: false, term: 2 }"
    Key: 8, "Entry { context: 6, data: Insert { key: 5, value: \"A\" }, entry_type: EntryNormal, index: 8, sync_log: false, term: 2 }"
    
    ---- Metadata ----
    HardState { term: 2, vote: 1, commit: 8 }
    ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }
    "Snapshot { data: [], metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [2, 1], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }"
    Last index: 8
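
    The replay described above can be sketched in a few lines. This is a minimal model and not raftify's actual code: LogData, replay, and sample_wal are hypothetical names, and the state machine is reduced to a HashMap, similar to the HashStore seen in the dumps.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the payload of a persisted log entry (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub enum LogData {
    /// Entry without data, like the entry at index 1 in the dump.
    Empty,
    /// Key-value insertion, like the Insert entries in the dump.
    Insert { key: u64, value: String },
}

/// Replays entries in index order on top of an empty state machine.
pub fn replay(entries: &[LogData]) -> HashMap<u64, String> {
    let mut store = HashMap::new();
    for entry in entries {
        if let LogData::Insert { key, value } = entry {
            // Re-applying the same key simply overwrites the previous value.
            store.insert(*key, value.clone());
        }
    }
    store
}

/// The eight persisted entries from the dump: one empty entry, then seven inserts.
pub fn sample_wal() -> Vec<LogData> {
    let mut wal = vec![LogData::Empty];
    for key in [1, 1, 1, 2, 3, 4, 5] {
        wal.push(LogData::Insert { key, value: "A".to_string() });
    }
    wal
}
```

    Replaying sample_wal() yields five keys, each mapped to "A", which is the same state that the snapshot-based recovery restored in a single step.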
    

    Conclusion

    In this article, following up from the previous one, we explored the issues of resolving log inconsistencies and recovery scenarios with snapshots, especially when there is a newly joined node.

    Raftify is participating as an interactive project in the 2024 Open Source Contribution Academy and is recruiting mentees interested in implementing distributed systems! (Recruitment period: ~ 06.23)

    Participants will have the opportunity to experience everything from learning the basic concepts of distributed systems to the actual implementation process, alongside mentors.

    We look forward to your interest! Thank you 😊

    29 May 2024

  • Deconstructing a working Raft implementation - 1

    By Gyubong Lee

    In this article, we'll assume that the reader has a theoretical background in Raft, and we'll dig into the tikv/raft-rs code to see how state machines in distributed systems actually synchronize and behave in a few brief scenarios.

    While this article focuses on analyzing the raft-rs code, some sections use the raftify source code as an example to give a complete picture, because raft-rs deliberately leaves out the network and storage layers for flexibility.

    💡 raftify is a high-level Raft implementation developed by Lablup. In this post, I'm only going to describe raftify with minimal code to understand how raft works. If you're curious about raftify, check out this post.

    Img from: https://github.com/tikv/raft-rs

    raft-rs architecture with a focus on types

    Before we dive into the scenarios, let's take a quick look at the architecture, focusing on the typical types used in the code base.

    Raft

    The Raft object of each Raft node holds a message queue msgs in memory and interacts with other Raft nodes through this queue.

    In a high-level implementation like raftify, the network layer is responsible for putting messages into this queue through an abstraction layer that will be described later.

    This message queue can therefore be seen as an endpoint for communication, and the Raft implementation will process these messages according to its current state, maintaining a consistent state between nodes.

    The RaftCore type holds the data corresponding to the state of this Raft node.

    There is also a type called Progress that holds metadata for synchronizing log entries with other Raft nodes, and these are updated appropriately in the ProgressTracker depending on the situation.

    As a result, the Raft type is defined as follows:

    // tikv/raft-rs/blob/master/src/raft.rs
    pub struct Raft<T: Storage> {
        pub msgs: Vec<Message>,
        pub r: RaftCore<T>,
        prs: ProgressTracker,
    }
    

    RaftLog

    A representative piece of the data held by RaftCore is RaftLog, which abstracts access to the sequence of log entries.

    RaftLog<T: Storage> wraps the types Unstable and T so that they can be handled together. Here, T corresponds to persistent storage that must be implemented at a higher level, such as in raftify, and Unstable is a buffer that entries pass through before being written to this storage.

    // tikv/raft-rs/blob/master/src/raft_log.rs
    pub struct RaftLog<T: Storage> {
        pub store: T,
        pub unstable: Unstable,
    
        ...
    }
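
    To make the relationship between the buffer and the storage concrete, here is a minimal sketch under simplifying assumptions. SimpleEntry, SimpleStorage, MemStore, SimpleRaftLog, and persist are hypothetical names; the real Storage trait and Unstable type carry much more (snapshots, offsets, error handling).

```rust
/// Minimal log entry for illustration (the real Entry is a protobuf message).
#[derive(Debug, Clone, PartialEq)]
pub struct SimpleEntry {
    pub index: u64,
    pub term: u64,
}

/// Hypothetical, much-reduced counterpart of the Storage trait.
pub trait SimpleStorage {
    fn last_index(&self) -> u64;
    fn append(&mut self, ents: &[SimpleEntry]);
}

/// In-memory stand-in for persistent storage.
#[derive(Default)]
pub struct MemStore {
    entries: Vec<SimpleEntry>,
}

impl SimpleStorage for MemStore {
    fn last_index(&self) -> u64 {
        self.entries.last().map_or(0, |e| e.index)
    }
    fn append(&mut self, ents: &[SimpleEntry]) {
        self.entries.extend_from_slice(ents);
    }
}

/// Handles the stable store and the unstable buffer together.
pub struct SimpleRaftLog<T: SimpleStorage> {
    pub store: T,
    pub unstable: Vec<SimpleEntry>,
}

impl<T: SimpleStorage> SimpleRaftLog<T> {
    pub fn new(store: T) -> Self {
        Self { store, unstable: Vec::new() }
    }

    /// New entries land in the in-memory buffer first.
    pub fn append(&mut self, ents: &[SimpleEntry]) -> u64 {
        self.unstable.extend_from_slice(ents);
        self.last_index()
    }

    /// The buffer, when non-empty, holds the newest entries.
    pub fn last_index(&self) -> u64 {
        match self.unstable.last() {
            Some(e) => e.index,
            None => self.store.last_index(),
        }
    }

    /// Moves the buffered entries into the store and empties the buffer.
    pub fn persist(&mut self) {
        let ents = std::mem::take(&mut self.unstable);
        self.store.append(&ents);
    }
}
```

    Callers only ever ask the log for last_index; whether the newest entry is still buffered or already persisted stays hidden behind the abstraction.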
    

    💡 If you're interested in learning more about the RaftCore type, check out this link.

    Raft Loop

    Raft implementations perform an iterative process of updating their state machine in an infinite loop in order to communicate with other Raft nodes and maintain a consistent state. In this article, we'll call this loop a Raft loop.

    The source code for implementing a Raft loop in raftify is shown below.

    (You can also see the example code in tikv/raft-rs if you want to see the most minimal implementation).

    // lablup/raftify/blob/main/raftify/src/raft_node/mod.rs
    async fn on_ready(&mut self) -> Result<()> {
        if !self.raw_node.has_ready() {
            return Ok(());
        }
        let mut ready = self.raw_node.ready();
    
        if !ready.messages().is_empty() {
            self.send_messages(ready.take_messages()).await;
        }
    
        if *ready.snapshot() != Snapshot::default() {
            slog::info!(
                self.logger,
                "Restoring state machine and snapshot metadata..."
            );
            let snapshot = ready.snapshot();
            if !snapshot.get_data().is_empty() {
                self.fsm.restore(snapshot.get_data().to_vec()).await?;
            }
            let store = self.raw_node.mut_store();
            store.apply_snapshot(snapshot.clone())?;
        }
    
        self.handle_committed_entries(ready.take_committed_entries())
            .await?;
    
        if !ready.entries().is_empty() {
            let entries = &ready.entries()[..];
            let store = self.raw_node.mut_store();
            store.append(entries)?;
        }
    
        if let Some(hs) = ready.hs() {
            let store = self.raw_node.mut_store();
            store.set_hard_state(hs)?;
        }
    
        if !ready.persisted_messages().is_empty() {
            self.send_messages(ready.take_persisted_messages()).await;
        }
    
        let mut light_rd = self.raw_node.advance(ready);
    
        if let Some(commit) = light_rd.commit_index() {
            let store = self.raw_node.mut_store();
            store.set_hard_state_commit(commit)?;
        }
    
        if !light_rd.messages().is_empty() {
            self.send_messages(light_rd.take_messages()).await;
        }
    
        self.handle_committed_entries(light_rd.take_committed_entries())
            .await?;
    
        self.raw_node.advance_apply();
    
        Ok(())
    }
    

    RawNode

    Each Raft node has a higher-level instance of a type called RawNode that contains the Raft module. A RawNode keeps SoftState, a state held only in memory, and HardState, a state stored in persistent storage, along with a records field that holds the metadata of Ready instances that have not yet been persisted.

    💡 Ready is the data structure that is passed to the Raft node when it needs to be updated.

    // tikv/raft-rs/blob/master/src/raw_node.rs
    pub struct RawNode<T: Storage> {
        pub raft: Raft<T>,
        prev_ss: SoftState,
        prev_hs: HardState,
        max_number: u64,
        records: VecDeque<ReadyRecord>,
        commit_since_index: u64,
    }
    

    In the first part of the Raft loop, when the ready method is called, the metadata from Ready is stored in records. After all the snapshots, entries, etc. that need to be stored have been processed, advance, the last part of the loop, calls commit_ready and updates the offset of the Unstable buffer.

    RaftNode

    RaftNode is the type with which raftify abstracts RawNode at a higher level, integrating it with the network and storage layers.

    In a separate asynchronous task, raftify receives messages sent by the gRPC client and passes them over the channel to the RaftNode.run task.

    After processing the messages, it handles state changes in a function (Raft loop) named on_ready.

    // lablup/raftify/blob/main/raftify/src/raft_node/mod.rs
    pub async fn run(mut self) -> Result<()> {
        let mut tick_timer = Duration::from_secs_f32(self.config.tick_interval);
        let fixed_tick_timer = tick_timer;
        let mut now = Instant::now();
    
        loop {
            ...
            tokio::select! {
                msg = timeout(fixed_tick_timer, self.server_rcv.recv()) => {
                    if let Ok(Some(msg)) = msg {
                        self.handle_server_request_msg(msg).await?;
                    }
                }
                ...
            }
    
            let elapsed = now.elapsed();
            now = Instant::now();
            if elapsed > tick_timer {
                tick_timer = Duration::from_millis(100);
                self.raw_node.tick();
            } else {
                tick_timer -= elapsed;
            }
    
            self.on_ready().await?
        }
    }
    

    To explain raftify's implementation in more detail, raftify iterates through the following process:

    1. The client generates a request (e.g. by calling RaftServiceClient.propose or RaftNode.propose).
    2. RaftServiceClient.propose on the remote Raft node is called via gRPC.
    3. RaftServiceClient.propose passes the Propose message over the channel to the RaftNode.run coroutine.
    4. RaftNode.run polls the message queue and calls RawNode::propose when it receives a Propose message.
    5. When there are changes to the state machine that need to be applied, a Ready instance is created and passed to the on_ready handler.
    6. When entries are committed, the on_ready handler processes the committed entries and responds to the client.
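
    The channel hand-off in steps 3 to 6 can be sketched with the standard library alone. This is only an illustration of the message flow: ServerRequest, run_once, and propose are hypothetical names, and the sketch applies a proposal immediately, whereas a real node goes through RawNode::propose and waits until the entry is committed.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical request type; raftify's actual message types differ.
pub enum ServerRequest {
    Propose {
        key: u64,
        value: String,
        /// Channel used to answer the client once the request is handled.
        reply: Sender<bool>,
    },
}

/// One iteration of a simplified node loop: drain pending requests and apply them.
/// Consensus is skipped entirely here (assumption made for brevity).
pub fn run_once(rx: &Receiver<ServerRequest>, fsm: &mut HashMap<u64, String>) -> usize {
    let mut handled = 0;
    while let Ok(req) = rx.try_recv() {
        match req {
            ServerRequest::Propose { key, value, reply } => {
                fsm.insert(key, value);
                // The client may have gone away; ignore a failed reply.
                let _ = reply.send(true);
                handled += 1;
            }
        }
    }
    handled
}

/// Client side: push a Propose message into the channel and return the reply handle.
pub fn propose(tx: &Sender<ServerRequest>, key: u64, value: &str) -> Receiver<bool> {
    let (reply_tx, reply_rx) = channel();
    tx.send(ServerRequest::Propose {
        key,
        value: value.to_string(),
        reply: reply_tx,
    })
    .expect("node loop has shut down");
    reply_rx
}
```

    Carrying a reply channel inside the message is one common way to let the loop answer the caller after the state change has been handled.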

    With the theoretical stuff out of the way, let's analyze a few scenarios and see what happens.

    💡 What we arbitrarily call Propose messages in this paragraph is a type of message defined for the purpose of proposing a state change to the cluster.

    Scenario analysis

    1 - Add a new log entry

    What happens under the hood when you request (propose) a change to the cluster to alter its state machine? In this section, we'll break down what happens when you call RawNode.propose. Here's a look at the RawNode.propose function:

    // tikv/raft-rs/blob/master/src/raw_node.rs
    pub fn propose(&mut self, context: Vec<u8>, data: Vec<u8>) -> Result<()> {
        let mut m = Message::default();
        m.set_msg_type(MessageType::MsgPropose);
        m.from = self.raft.id;
        let mut e = Entry::default();
        e.data = data.into();
        e.context = context.into();
        m.set_entries(vec![e].into());
        self.raft.step(m)
    }
    

    From the code above, you can see that the propose function calls step to make it handle a message of type MsgPropose.

    Here, step is the function that corresponds to the actual message handler in raft-rs. If the node calling step is the leader, step_leader is called, if it is a follower, step_follower is called, and if it is a candidate, step_candidate is called.

    The code for step is quite complex, but let's follow the code to see how the MsgPropose type is handled on the leader node.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn step_leader(&mut self, mut m: Message) -> Result<()> {
        ...
        match m.get_msg_type() {
            MessageType::MsgPropose => {
                ...
                if !self.append_entry(m.mut_entries()) {
                    ...
                }
                self.bcast_append();
                return Ok(());
            }
        ...
        }
    }
    

    Raft.append_entry calls RaftLog.append to add the entry. RaftLog.append then appends the entries to the Unstable buffer via self.unstable.truncate_and_append.

    // tikv/raft-rs/blob/master/src/raft_log.rs
    pub fn append(&mut self, ents: &[Entry]) -> u64 {
        ...
        self.unstable.truncate_and_append(ents);
        self.last_index()
    }
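
    As a rough illustration of what a truncate-and-append buffer does, here is a sketch modeled on the cases such a buffer has to handle, using simplified types. SimpleEntry and SimpleUnstable are hypothetical stand-ins, and the real Unstable in raft-rs also tracks snapshots and other state.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct SimpleEntry {
    pub index: u64,
    pub term: u64,
}

/// Simplified buffer of entries that are not yet persisted.
pub struct SimpleUnstable {
    /// Log index of entries[0].
    pub offset: u64,
    pub entries: Vec<SimpleEntry>,
}

impl SimpleUnstable {
    pub fn truncate_and_append(&mut self, ents: &[SimpleEntry]) {
        if ents.is_empty() {
            return;
        }
        let after = ents[0].index;
        let next = self.offset + self.entries.len() as u64;
        // This sketch does not support holes in the log.
        assert!(after <= next, "gap between buffered and new entries");

        if after <= self.offset {
            // The new entries start at or before the buffer: replace it entirely.
            self.offset = after;
            self.entries.clear();
        } else if after < next {
            // The new entries overlap the tail: keep only what precedes them.
            self.entries.truncate((after - self.offset) as usize);
        }
        // When after == next the entries are contiguous and nothing is cut.
        self.entries.extend_from_slice(ents);
    }
}
```

    Appending entries 7 and 8 to a buffer that already holds 5, 6, 7, 8 keeps 5 and 6 and overwrites the rest, which is the behavior a follower relies on when the leader overwrites conflicting entries.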
    

    As previously described, the entries added to the buffer will be persisted in a Raft loop, and updating the state machine via an advance-like function will automatically update the offset and clear the buffer.

    Let's take a look at the next call, bcast_append.

    You can see that we're calling core.send_append with each follower's progress as an argument, using the ProgressTracker (prs) described in the previous section to synchronize the log entries of the leader and followers.

    // tikv/raft-rs/blob/master/src/raft.rs
    pub fn bcast_append(&mut self) {
        let self_id = self.id;
        let core = &mut self.r;
        let msgs = &mut self.msgs;
        self.prs
            .iter_mut()
            .filter(|&(id, _)| *id != self_id)
            .for_each(|(id, pr)| core.send_append(*id, pr, msgs));
    }
    

    send_append has the following simple structure:

    // tikv/raft-rs/blob/master/src/raft.rs
    fn send_append(&mut self, to: u64, pr: &mut Progress, msgs: &mut Vec<Message>) {
        self.maybe_send_append(to, pr, true, msgs);
    }
    

    maybe_send_append reads the log entries starting at pr.next_idx via RaftLog.entries and passes them to prepare_send_entries.

    (As you can infer from the maybe_ prefix to its name, the function returns true on success and false on failure.)

    // tikv/raft-rs/blob/master/src/raft.rs
    fn maybe_send_append(
        &mut self,
        to: u64,
        pr: &mut Progress,
        allow_empty: bool,
        msgs: &mut Vec<Message>,
    ) -> bool {
        ...
        let ents = self.raft_log.entries(
            pr.next_idx,
            self.max_msg_size,
            GetEntriesContext(GetEntriesFor::SendAppend {
                to,
                term: self.term,
                aggressively: !allow_empty,
            }),
        );
        ...
            match (term, ents) {
                (Ok(term), Ok(mut ents)) => {
                    if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                        return true;
                    }
                    self.prepare_send_entries(&mut m, pr, term, ents)
                }
                ...
            }
        ...
        self.send(m, msgs);
        true
    }
    

    prepare_send_entries turns the message object m into a message of type MsgAppend and puts the entries into it. It then updates the Progress and returns.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn prepare_send_entries(
        &mut self,
        m: &mut Message,
        pr: &mut Progress,
        term: u64,
        ents: Vec<Entry>,
    ) {
        m.set_msg_type(MessageType::MsgAppend);
        m.index = pr.next_idx - 1;
        m.log_term = term;
        m.set_entries(ents.into());
        m.commit = self.raft_log.committed;
        if !m.entries.is_empty() {
            let last = m.entries.last().unwrap().index;
            pr.update_state(last);
        }
    }
    

    Then self.send(m, msgs) puts this prepared message into the msgs message queue.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn send(&mut self, mut m: Message, msgs: &mut Vec<Message>) {
        ...
        msgs.push(m);
    }
    

    The MsgAppend message that enters the message queue will be sent to the follower node from send_messages through the network layer. Therefore, we need to see how the follower node handles the MsgAppend message after receiving it.

    Next, let's take a look at what happens on the follower node. To find out what happens when a follower node receives a MsgAppend message, we can look at step_follower.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn step_follower(&mut self, mut m: Message) -> Result<()> {
        match m.get_msg_type() {
            ...
            MessageType::MsgAppend => {
                self.election_elapsed = 0;
                self.leader_id = m.from;
                self.handle_append_entries(&m);
            }
            ...
        }
    }
    

    From the code above, you can see that the follower node that received the MsgAppend message is calling handle_append_entries.

    This function creates a to_send, a message of type MsgAppendResponse, and calls RaftLog.maybe_append, as shown below.

    // tikv/raft-rs/blob/master/src/raft.rs
    pub fn handle_append_entries(&mut self, m: &Message) {
        ...
        let mut to_send = Message::default();
        to_send.to = m.from;
        to_send.set_msg_type(MessageType::MsgAppendResponse);
    
        if let Some((_, last_idx)) = self
            .raft_log
            .maybe_append(m.index, m.log_term, m.commit, &m.entries)
        {
            ...
            // Accept the MsgAppend message
        } else {
            ...
            // Reject the MsgAppend message
        }
        ...
        self.r.send(to_send, &mut self.msgs);
    }
    

    RaftLog.maybe_append calls match_term to check whether the message's log_term matches the term of the corresponding log entry, calls find_conflict to check for conflicts in the log entry sequence, and, if there are new or conflicting entries beyond the committed index, calls RaftLog.append to write them.

    // tikv/raft-rs/blob/master/src/raft_log.rs
    pub fn maybe_append(
        &mut self,
        idx: u64,
        term: u64,
        committed: u64,
        ents: &[Entry],
    ) -> Option<(u64, u64)> {
        if self.match_term(idx, term) {
            let conflict_idx = self.find_conflict(ents);
            if conflict_idx == 0 {
            } else if conflict_idx <= self.committed {
                fatal!(
                    self.unstable.logger,
                    "entry {} conflict with committed entry {}",
                    conflict_idx,
                    self.committed
                )
            } else {
                let start = (conflict_idx - (idx + 1)) as usize;
                self.append(&ents[start..]);
    
                if self.persisted > conflict_idx - 1 {
                    self.persisted = conflict_idx - 1;
                }
            }
            let last_new_index = idx + ents.len() as u64;
            self.commit_to(cmp::min(committed, last_new_index));
            return Some((conflict_idx, last_new_index));
        }
        None
    }
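
    The interplay of match_term and find_conflict is easier to follow on a toy log. The sketch below assumes a log without compaction, where entry i sits at position i - 1; SimpleEntry and SimpleLog are hypothetical types, and maybe_append is reduced to returning only the last new index.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct SimpleEntry {
    pub index: u64,
    pub term: u64,
}

/// In-memory log without compaction: entries[i] has index i + 1.
pub struct SimpleLog {
    pub entries: Vec<SimpleEntry>,
    pub committed: u64,
}

impl SimpleLog {
    /// Term of the entry at idx; index 0 is the empty log with term 0.
    pub fn term(&self, idx: u64) -> Option<u64> {
        if idx == 0 {
            return Some(0);
        }
        self.entries.get(idx as usize - 1).map(|e| e.term)
    }

    pub fn match_term(&self, idx: u64, term: u64) -> bool {
        self.term(idx) == Some(term)
    }

    /// Index of the first entry that is missing or has a different term, or 0.
    pub fn find_conflict(&self, ents: &[SimpleEntry]) -> u64 {
        ents.iter()
            .find(|e| !self.match_term(e.index, e.term))
            .map_or(0, |e| e.index)
    }

    /// Returns the last new index on success, None when the previous entry mismatches.
    pub fn maybe_append(
        &mut self,
        idx: u64,
        term: u64,
        committed: u64,
        ents: &[SimpleEntry],
    ) -> Option<u64> {
        if !self.match_term(idx, term) {
            return None;
        }
        let conflict_idx = self.find_conflict(ents);
        if conflict_idx != 0 {
            assert!(conflict_idx > self.committed, "conflict with committed entry");
            // Drop everything from the conflict point, then write the new tail.
            self.entries.truncate(conflict_idx as usize - 1);
            let start = (conflict_idx - (idx + 1)) as usize;
            self.entries.extend_from_slice(&ents[start..]);
        }
        let last_new_index = idx + ents.len() as u64;
        self.committed = self.committed.max(committed.min(last_new_index));
        Some(last_new_index)
    }
}
```

    A None result corresponds to the rejection path in handle_append_entries, while a conflict index of 0 means the follower already holds every entry in the message.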
    

    We've seen RaftLog.append before. It was the last function called when a log entry was proposed on the leader node.

    As before, RaftLog.append appends the given entries to the Unstable buffer via self.unstable.truncate_and_append.

    This outlines a scenario where logs added to the leader are persisted on the leader node and copied to the follower nodes.

    2 - Leader and follower node log sequence mismatch

    In scenario 1, we looked at the code assuming a normal situation, but in reality, issues such as network disconnection can cause mismatches between leader and follower nodes. Let's take another look at the code, this time focusing on how to detect and resolve mismatches between leader and follower nodes.

    Let's say you have a cluster of three nodes that is processing thousands of requests that are successively changing the state machine, and then a network failure occurs.

    In the event of a failure, we should start by looking at the logs written to the nodes, persisted log entries, and debugging information to get some context, but to avoid making this post too long, we'll just pick out the logs that will give us a general idea of what's happening on the nodes and analyze them.

    First of all, node 3 is leaving a rejected msgApp... log indicating that it has rejected a message.

    Nov 28 05:30:59.233 DEBG rejected msgApp [logterm: 7, index: 3641] from 2, logterm: Ok(0), index: 3641, from: 2, msg_index: 3641, msg_log_term: 7
    

    From the log above, we can see that node 3 is a follower node, node 2 is the newly elected leader node after the failure, and that the MsgAppend message trying to replicate the 3641st entry was rejected.

    If we look up what function this log is output from, we can see that it is called from handle_append_entries, which we saw in Scenario 1 (the function that handles the MsgAppend messages that the follower receives from the leader).

    pub fn handle_append_entries(&mut self, m: &Message) {
        ...
        let mut to_send = Message::default();
        to_send.to = m.from;
        to_send.set_msg_type(MessageType::MsgAppendResponse);
        ...
        if let Some((_, last_idx)) = self
            .raft_log
            .maybe_append(m.index, m.log_term, m.commit, &m.entries)
        {
            ...
        } else {
            debug!(
                self.logger,
                "rejected msgApp [logterm: {msg_log_term}, index: {msg_index}] \
                from {from}",
                msg_log_term = m.log_term,
                msg_index = m.index,
                from = m.from;
                "index" => m.index,
                "logterm" => ?self.raft_log.term(m.index),
            );
    
            let hint_index = cmp::min(m.index, self.raft_log.last_index());
            let (hint_index, hint_term) =
                self.raft_log.find_conflict_by_term(hint_index, m.log_term);
    
            if hint_term.is_none() {
                fatal!(
                    self.logger,
                    "term({index}) must be valid",
                    index = hint_index
                )
            }
    
            to_send.index = m.index;
            to_send.reject = true;
            to_send.reject_hint = hint_index;
            to_send.log_term = hint_term.unwrap();
        }
    
        to_send.set_commit(self.raft_log.committed);
        self.r.send(to_send, &mut self.msgs);
    }
    

    The fact that this log was output means that maybe_append returned None, which in turn means that match_term returned false. In other words, the log_term in the message does not match the term of entry 3641 on the follower.

    So the follower finds the point of conflict by term (find_conflict_by_term), puts it (hint_index) into the reject_hint of the message, and sends it back to the leader in the form of a MsgAppendResponse message.
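
    The search for the hint can be sketched as a walk backwards through the follower's log. This is a simplified model, assuming a log without compaction that is represented only by its terms (terms[i] is the term of entry i + 1); the real function works on RaftLog and logs a warning for out-of-range indexes.

```rust
/// Walks back from `index` until it finds an entry whose term is not greater
/// than `term`, and returns that index together with its term.
/// Returns (index, None) when `index` lies beyond the end of the log.
pub fn find_conflict_by_term(terms: &[u64], index: u64, term: u64) -> (u64, Option<u64>) {
    // Term lookup; index 0 stands for the empty log and has term 0.
    let term_at = |idx: u64| -> Option<u64> {
        if idx == 0 {
            Some(0)
        } else {
            terms.get(idx as usize - 1).copied()
        }
    };

    let mut conflict_index = index;
    loop {
        match term_at(conflict_index) {
            // Entries with a larger term cannot match the leader's entry: skip them.
            Some(t) if t > term => conflict_index -= 1,
            // Either a usable entry (Some) or an index outside the log (None).
            found => return (conflict_index, found),
        }
    }
}
```

    For a follower whose entries have the terms [1, 1, 2, 4, 4, 5, 5], a message with log term 3 at index 7 produces the hint (3, Some(2)), so the leader can skip entries 4 through 7 in one step instead of probing them one by one.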

    So what does the leader do with this rejected MsgAppendResponse message?

    The leader node that receives the rejection leaves a log stating that the MsgAppend was rejected, as shown below.

    Nov 28 05:30:59.279 DEBG received msgAppend rejection, index: 3641, from: 3, reject_hint_term: 7, reject_hint_index: 3611
    

    So the next thing we need to look at is the function that receives this rejecting MsgAppendResponse message and outputs "received msgAppend rejection".

    This function is called handle_append_response. The function itself is quite long, but it becomes short once cut down to just what happens when a MsgAppend is rejected.

    fn handle_append_response(&mut self, m: &Message) {
        let mut next_probe_index: u64 = m.reject_hint;
        ...
        if m.reject {
            debug!(
                self.r.logger,
                "received msgAppend rejection";
                "reject_hint_index" => m.reject_hint,
                "reject_hint_term" => m.log_term,
                "from" => m.from,
                "index" => m.index,
            );
    
            if pr.maybe_decr_to(m.index, next_probe_index, m.request_snapshot) {
                debug!(
                    self.r.logger,
                    "decreased progress of {}",
                    m.from;
                    "progress" => ?pr,
                );
                if pr.state == ProgressState::Replicate {
                    pr.become_probe();
                }
    
                self.send_append(m.from);
            }
            return;
        }
        ...
    }
    

    The leader takes the reject_hint from the message as next_probe_index and calls Progress.maybe_decr_to to decrement the progress. If the progress was in the Replicate state, it is switched to the Probe state, and send_append is called to send another MsgAppend message.

    💡 ProgressState is an enum that represents the synchronization progress of each node. It is "Replicate" under normal circumstances, when the node is replicating logs; "Probe" when the leader does not know the last index replicated to the follower; and "Snapshot" when the log is being replicated to the follower by sending a snapshot.
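
    The effect of decrementing the progress can be tried out with the numbers from the logs above. The sketch below is a reduced model: SimpleState and SimpleProgress are hypothetical types, snapshot requests are left out, and the starting values (Probe state, next_idx 3642) are assumptions inferred from the rejected message at index 3641.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SimpleState {
    Probe,
    Replicate,
}

/// Leader-side view of how far one follower has been replicated.
#[derive(Debug)]
pub struct SimpleProgress {
    pub matched: u64,
    pub next_idx: u64,
    pub state: SimpleState,
}

impl SimpleProgress {
    /// Returns false when the rejection is stale, true when next_idx was lowered.
    /// Snapshot handling is omitted in this sketch.
    pub fn maybe_decr_to(&mut self, rejected: u64, match_hint: u64) -> bool {
        if self.state == SimpleState::Replicate {
            // Anything at or below the matched index was already acknowledged.
            if rejected <= self.matched {
                return false;
            }
            self.next_idx = self.matched + 1;
            return true;
        }
        // In Probe state only the most recently sent index can be rejected.
        if self.next_idx == 0 || self.next_idx - 1 != rejected {
            return false;
        }
        // Jump back to just after the follower's hint, but never below 1.
        self.next_idx = rejected.min(match_hint + 1).max(1);
        true
    }
}
```

    With rejected = 3641 and a hint of 3611, next_idx drops from 3642 to 3612, which lines up with the "Entries [3612, 3643) requested" line in the leader's log.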

    To summarize, to find the index (next_probe_index) of the log entry before the conflict, we decrement the node's progress and send another MsgAppend message. This process is repeated until we find the common log prefix of the leader and follower nodes.

    Once the common log prefix is found, log entries after that index are replicated in a unidirectional fashion from the leader to the follower and overwritten. This process can be seen in the maybe_send_append function.

    The log entries are fetched through RaftLog.entries in the SendAppend context, as shown below. Here, max_msg_size is max_size_per_msg from Config, which defaults to 0. RaftLog.entries passes this value as the max_size argument of LMDBStorage.entries (the persistent storage type, corresponding to T in RaftLog). According to this comment, a max_size of 0 means that log entries are synchronized one at a time when the logs of the leader and follower nodes do not match.
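
    Why a limit of 0 still yields one entry per message becomes clear from a size-limiting helper that always keeps the first entry. The sketch below is written in the spirit of such a helper, with assumptions: entries are plain byte vectors, their length stands in for the encoded size, and u64::MAX is treated as "no limit".

```rust
/// Truncates `entries` so that their total size stays within `max`,
/// but always keeps at least the first entry.
pub fn limit_size(entries: &mut Vec<Vec<u8>>, max: Option<u64>) {
    if entries.len() <= 1 {
        return;
    }
    let max = match max {
        // No limit configured: keep everything.
        None | Some(u64::MAX) => return,
        Some(max) => max,
    };

    let mut size = 0u64;
    let limit = entries
        .iter()
        .take_while(|e| {
            // The first entry is accepted regardless of its size.
            let first = size == 0;
            size += e.len() as u64;
            first || size <= max
        })
        .count();
    entries.truncate(limit);
}
```

    With max set to Some(0), three 10-byte entries are cut down to a single entry, which matches the one-entry-per-MsgAppend pattern in the logs shown further down.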

    After that, prepare_send_entries is used to prepare the MsgAppend message as described in the previous section, and Raft.send is used to replicate the entries to the follower node.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn maybe_send_append(
        &mut self,
        to: u64,
        pr: &mut Progress,
        allow_empty: bool,
        msgs: &mut Vec<Message>,
    ) -> bool {
        ...
        let mut m = Message::default();
        m.to = to;
        if pr.pending_request_snapshot != INVALID_INDEX {
            ...
        } else {
            let ents = self.raft_log.entries(
                pr.next_idx,
                self.max_msg_size,
                GetEntriesContext(GetEntriesFor::SendAppend {
                    to,
                    term: self.term,
                    aggressively: !allow_empty,
                }),
            );
            ...
            let term = self.raft_log.term(pr.next_idx - 1);
            match (term, ents) {
                (Ok(term), Ok(mut ents)) => {
                    if self.batch_append && self.try_batching(to, msgs, pr, &mut ents) {
                        return true;
                    }
                    self.prepare_send_entries(&mut m, pr, term, ents)
                }
                ...
            }
        }
        self.send(m, msgs);
        true
    }
    

    Many logs in between are omitted, but you can see that entries 3612 through 3642 are synchronized between the leader and the follower through the above process, after which the follower's progress state changes to Replicate and the nodes start exchanging Heartbeat messages normally.

    2023-11-28 14:30:59,269 - INFO     - Entries [3612, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3611, entries: [Entry { context: "1810", data: "{'key': '2292', 'value': '1'}", entry_type: EntryNormal, index: 3612, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,259 - INFO     - Entries [3613, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3612, entries: [Entry { context: "1811", data: "{'key': '2294', 'value': '1'}", entry_type: EntryNormal, index: 3613, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,259 - INFO     - Entries [3614, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3613, entries: [Entry { context: "1812", data: "{'key': '2295', 'value': '1'}", entry_type: EntryNormal, index: 3614, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,259 - INFO     - Entries [3615, 3643) requested
    Nov 28 05:30:59.269 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3614, entries: [Entry { context: "1813", data: "{'key': '2296', 'value': '1'}", entry_type: EntryNormal, index: 3615, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    ...
    
    2023-11-28 14:30:59,284 - INFO     - Entries [3641, 3643) requested
    Nov 28 05:30:59.283 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3640, entries: [Entry { context: "1839", data: "{'key': '2457', 'value': '1'}", entry_type: EntryNormal, index: 3641, sync_log: false, term: 7 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:30:59,284 - INFO     - Entries [3642, 3643) requested
    Nov 28 05:30:59.284 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgAppend, to: 3, from: 0, term: 0, log_term: 7, index: 3641, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 3642, sync_log: false, term: 12 }], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    Nov 28 05:31:01.635 DEBG Sending from 2 to 1, msg: Message { msg_type: MsgHeartbeat, to: 1, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 2
    Nov 28 05:31:01.635 DEBG Sending from 2 to 3, msg: Message { msg_type: MsgHeartbeat, to: 3, from: 0, term: 0, log_term: 0, index: 0, entries: [], commit: 3642, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 3, from: 2
    2023-11-28 14:31:01,637
    

    3 - Electing a leader

    In Scenario 2, we could tell from the increase in the term value that the network failure had triggered a leader election. In this scenario, we'll take a closer look at the leader election process itself.

    To see what logs are produced when the leader fails, we'll simply create a cluster of 3 nodes, force the leader process to shut down, and look at the logs of the process that is newly elected leader.

    To summarize the logs: after the leader node shuts down, node 3 starts an election, increases the term value to 2 as it transitions to the Candidate state, and sends a MsgRequestVote message to the other voters. It then receives a MsgRequestVoteResponse message from node 2, is elected the new leader because it now holds a majority of the votes (its own and node 2's), and sends a special kind of message (an empty MsgAppend) to announce that it is the elected leader.

    💡 A follower node that has not received a heartbeat message within election_tick starts an election. To avoid split votes, the election timeout is set to a random value between min_election_tick and max_election_tick each time it is reset. Therefore, after the leader node is terminated, either of the two remaining nodes can become the leader, and normally the one with the smaller election timeout is elected.

    Nov 29 01:30:30.210 INFO starting a new election, term: 1
    Nov 29 01:30:30.210 DEBG reset election timeout 16 -> 10 at 0, election_elapsed: 0, timeout: 10, prev_timeout: 16
    Nov 29 01:30:30.210 INFO became candidate at term 2, term: 2
    Nov 29 01:30:30.210 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgRequestVote, to: 1, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3
    Nov 29 01:30:30.210 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgRequestVote, to: 2, from: 0, term: 2, log_term: 1, index: 3, entries: [], commit: 3, commit_term: 1, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3
    Nov 29 01:30:30.211 INFO broadcasting vote request, to: [1, 2], log_index: 3, log_term: 1, term: 2, type: MsgRequestVote
    2023-11-29 10:30:30,217 - WARNING  - Failed to connect to node 1 elapsed from first failure: 0.0000s. Err message: <AioRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused"
        debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:60061: Failed to connect to remote host: Connection refused {created_time:"2023-11-29T10:30:30.216855+09:00", grpc_status:14}"
    >
    2023-11-29 10:30:30,222 - DEBUG    - Node 3 received Raft message from the node 2, Message: Message { msg_type: MsgRequestVoteResponse, to: 3, from: 2, term: 2, log_term: 0, index: 0, entries: [], commit: 0, commit_term: 0, snapshot: Snapshot { data: "None", metadata: Some(SnapshotMetadata { conf_state: Some(ConfState { voters: [], learners: [], voters_outgoing: [], learners_next: [], auto_leave: false }), index: 0, term: 0 }) }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }
    Nov 29 01:30:30.223 INFO received votes response, term: 2, type: MsgRequestVoteResponse, approvals: 2, rejections: 0, from: 2, vote: true
    Nov 29 01:30:30.223 TRCE ENTER become_leader
    Nov 29 01:30:30.223 DEBG reset election timeout 10 -> 17 at 0, election_elapsed: 0, timeout: 17, prev_timeout: 10
    Nov 29 01:30:30.223 TRCE Entries being appended to unstable list, ents: Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }
    Nov 29 01:30:30.223 INFO became leader at term 2, term: 2
    Nov 29 01:30:30.223 TRCE EXIT become_leader
    Nov 29 01:30:30.223 DEBG Sending from 3 to 1, msg: Message { msg_type: MsgAppend, to: 1, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 1, from: 3
    Nov 29 01:30:30.223 DEBG Sending from 3 to 2, msg: Message { msg_type: MsgAppend, to: 2, from: 0, term: 0, log_term: 1, index: 3, entries: [Entry { context: "None", data: "None", entry_type: EntryNormal, index: 4, sync_log: false, term: 2 }], commit: 3, commit_term: 0, snapshot: Snapshot { data: "None", metadata: None }, request_snapshot: 0, reject: false, reject_hint: 0, context: "None", deprecated_priority: 0, priority: 0 }, to: 2, from: 3
    

    Let's take a look at the logs to see what's going on in the code.

    First of all, the function that is printing the log "starting a new election" is hup.

    hup is called while processing messages of type MsgHup in step and MsgTimeoutNow in step_follower.

    Note that the MsgTimeoutNow message is the message type used for Leader transfer, not Leader election. This means that when the leader receives the MsgTransferLeader message, it will send a message of type MsgTimeoutNow to the follower it is transferring leadership to, and the hup function will be executed there with the transfer_leader flag set to true. While Leader election is the process of electing a new leader in the event of a leader failure, Leader transfer is the process of a leader process transferring leadership to another follower process.

    So the message we need to follow now is MsgHup. We can guess that it was the tick_election function below that produced the MsgHup message: the node did not receive a Heartbeat within the election timeout, so it started electing a leader.

    Remember how we called self.raw_node.tick() every tick_timer on RaftNode? This RawNode.tick makes the node step a MsgHup message to itself once election_elapsed has reached randomized_election_timeout. (The election timeout is randomized to prevent a situation where all nodes start voting at the same time and all nodes vote for themselves.)

    // raw_node.rs
    pub fn tick(&mut self) -> bool {
        self.raft.tick()
    }
    
    // raft.rs
    pub fn tick(&mut self) -> bool {
        match self.state {
            StateRole::Follower | StateRole::PreCandidate | StateRole::Candidate => {
                self.tick_election()
            }
            StateRole::Leader => self.tick_heartbeat(),
        }
    }
    
    // raft.rs
    pub fn tick_election(&mut self) -> bool {
        self.election_elapsed += 1;
        if !self.pass_election_timeout() || !self.promotable {
            return false;
        }
        
        self.election_elapsed = 0;
        let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id));
        let _ = self.step(m);
        true
    }
    
    // raft.rs
    pub fn step(&mut self, m: Message) -> Result<()> {
        ...
        match m.get_msg_type() {
            ...
            MessageType::MsgHup => {
                self.hup(false)
            },
        }
    }
    
    // raft.rs
    pub fn pass_election_timeout(&self) -> bool {
        self.election_elapsed >= self.randomized_election_timeout
    }
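
    The timer logic above can be tried out in isolation. The following is a minimal sketch under stated assumptions: Follower and first_to_campaign are hypothetical names, the timeouts are fixed numbers instead of random ones, and the promotable check is left out.

```rust
/// A stripped-down follower that only tracks its election timer.
/// `randomized_election_timeout` is a fixed value here for illustration.
struct Follower {
    id: u64,
    election_elapsed: usize,
    randomized_election_timeout: usize,
}

impl Follower {
    fn new(id: u64, randomized_election_timeout: usize) -> Self {
        Follower {
            id,
            election_elapsed: 0,
            randomized_election_timeout,
        }
    }

    /// Same shape as tick_election: returns true when the node would
    /// step a MsgHup message to itself.
    fn tick_election(&mut self) -> bool {
        self.election_elapsed += 1;
        if self.election_elapsed < self.randomized_election_timeout {
            return false;
        }
        self.election_elapsed = 0;
        true
    }
}

/// Ticks all followers in lockstep and returns the id of the first node
/// whose timer fires, together with the tick at which it fired.
/// On a tie, the node that comes first in the slice is reported.
fn first_to_campaign(followers: &mut [Follower]) -> Option<(u64, usize)> {
    if followers.is_empty() {
        return None;
    }
    let mut tick = 0;
    loop {
        tick += 1;
        for f in followers.iter_mut() {
            if f.tick_election() {
                return Some((f.id, tick));
            }
        }
    }
}
```

    For example, with node 2 at a hypothetical timeout of 17 and node 3 at 16 (the prev_timeout value in the log above), first_to_campaign returns Some((3, 16)): node 3 is the first to start an election.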
    

    In short, the hup function runs the campaign function with the CAMPAIGN_ELECTION type, as shown below.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn hup(&mut self, transfer_leader: bool) {
        ...
        info!(
            self.logger,
            "starting a new election";
            "term" => self.term,
        );
    
        ...
        self.campaign(CAMPAIGN_ELECTION);
    }
    

    The campaign function transitions its own state to the Candidate state and starts voting, as shown below.

    First of all, self_id is the node's own id, as the name suggests, so self.poll(self_id, vote_msg, true) means to vote for yourself.

    If the result is VoteResult::Won, the node has won the election with its own vote alone, so it returns right away as the leader.

    So you can see that messages like MsgRequestVote and MsgRequestVoteResponse are never exchanged in a single-node cluster.

    That is not the case in this scenario, of course, because the cluster has more than one node.

    // tikv/raft-rs/blob/master/src/raft.rs
    pub fn campaign(&mut self, campaign_type: &'static [u8]) {
        let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
            ...
        } else {
            self.become_candidate();
            (MessageType::MsgRequestVote, self.term)
        };
        let self_id = self.id;
        if VoteResult::Won == self.poll(self_id, vote_msg, true) {
            // We won the election after voting for ourselves (which must mean that
            // this is a single-node cluster).
            return;
        }
        ...
    }
    

    Before we dive into the latter part of campaign, let's take a look at how poll works.

    poll is a function that calls record_vote and tally_votes and then acts on the result: if the node has won the vote, it transitions to the leader and broadcasts (bcast_append) that it is the new leader of the cluster.

    If it loses the vote, it transitions to a follower, and if the result is Pending, it returns without doing anything.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn poll(&mut self, from: u64, t: MessageType, vote: bool) -> VoteResult {
        self.prs.record_vote(from, vote);
        let (gr, rj, res) = self.prs.tally_votes();
        if from != self.id {
            info!(
                self.logger,
                "received votes response";
                "vote" => vote,
                "from" => from,
                "rejections" => rj,
                "approvals" => gr,
                "type" => ?t,
                "term" => self.term,
            );
        }
    
        match res {
            VoteResult::Won => {
                if self.state == StateRole::PreCandidate {
                    self.campaign(CAMPAIGN_ELECTION);
                } else {
                    self.become_leader();
                    self.bcast_append();
                }
            }
            VoteResult::Lost => {
                let term = self.term;
                self.become_follower(term, INVALID_ID);
            }
            VoteResult::Pending => (),
        }
        res
    }
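
    The match at the end of poll boils down to a small state transition table. Here is a hedged sketch of it: Role and next_role are hypothetical names, and the sketch ignores the single-node shortcut in which a pre-candidate that wins can become leader right away through campaign.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Role {
    Follower,
    PreCandidate,
    Candidate,
    Leader,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum VoteResult {
    Pending,
    Lost,
    Won,
}

/// Role a node ends up in once poll has seen the given tally result.
fn next_role(current: Role, result: VoteResult) -> Role {
    match (result, current) {
        // Winning a pre-vote only starts the real election.
        (VoteResult::Won, Role::PreCandidate) => Role::Candidate,
        // Winning the real election makes the node the leader.
        (VoteResult::Won, _) => Role::Leader,
        // Losing sends the node back to follower.
        (VoteResult::Lost, _) => Role::Follower,
        // Nothing is decided yet, so nothing changes.
        (VoteResult::Pending, role) => role,
    }
}
```

    In the log above, node 3 is a Candidate and the tally comes back as Won, so next_role(Role::Candidate, VoteResult::Won) gives Role::Leader, matching the "became leader at term 2" line.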
    

    The role of record_vote is quite simple. It records in the votes hashmap of the ProgressTracker whether the node with the given id voted for this node. Because it uses or_insert, only the first response recorded for each id is kept.

    // tikv/raft-rs/blob/master/src/tracker.rs
    pub fn record_vote(&mut self, id: u64, vote: bool) {
        self.votes.entry(id).or_insert(vote);
    }
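
    One detail worth seeing in action is what entry(id).or_insert(vote) does when the same node answers twice. The standalone function below is a sketch with the same body as record_vote, written against a plain HashMap instead of the ProgressTracker.

```rust
use std::collections::HashMap;

/// Same body as record_vote, but on a bare map: the first answer recorded
/// for a node is kept, and later answers for that node are ignored.
fn record_vote(votes: &mut HashMap<u64, bool>, id: u64, vote: bool) {
    votes.entry(id).or_insert(vote);
}
```

    If node 2 is recorded first with true and then again with false, the map still holds true for node 2 and contains a single entry.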
    

    Let's look at tally_votes. It goes through the votes hashmap, counts the voters that voted for us and the voters that rejected us, and returns both counts together with the overall result as a tuple.

    💡 The word "tally" refers to the act of counting or aggregating points, so "tally_votes" is a function that counts and aggregates votes.

    // tikv/raft-rs/blob/master/src/tracker.rs
    pub fn tally_votes(&self) -> (usize, usize, VoteResult) {
        let (mut granted, mut rejected) = (0, 0);
        for (id, vote) in &self.votes {
            if !self.conf.voters.contains(*id) {
                continue;
            }
            if *vote {
                granted += 1;
            } else {
                rejected += 1;
            }
        }
        let result = self.vote_result(&self.votes);
        (granted, rejected, result)
    }
    

    Let's take a look at how we determine the outcome of a vote.

    For a joint quorum, we need to get the consensus of both quorums (Incoming quorum, Outgoing quorum) to win the vote.

    So we need to look at the three vote_result functions below.

    In tracker.rs, we pass a callback function, check, as an argument. Given a node id, check looks up in the votes hashmap how that node voted, if it has voted at all.

    In joint.rs, we return VoteResult::Won only if both configurations win, and VoteResult::Lost if either side loses the vote. Otherwise, we return VoteResult::Pending.

    The actual counting of votes is done in vote_result in majority.rs.

    It counts the voters that voted for this node (yes) and the voters that have not responded yet (missing). It returns VoteResult::Won if yes reaches the majority, VoteResult::Pending if yes has not reached the majority yet but could still do so once the missing responses arrive (yes + missing reaches the majority), and VoteResult::Lost otherwise.

    // tracker.rs
    pub fn vote_result(&self, votes: &HashMap<u64, bool>) -> VoteResult {
        self.conf.voters.vote_result(|id| votes.get(&id).cloned())
    }
    
    // joint.rs
    pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
        let i = self.incoming.vote_result(&check);
        let o = self.outgoing.vote_result(check);
        match (i, o) {
            // It won if won in both.
            (VoteResult::Won, VoteResult::Won) => VoteResult::Won,
            // It lost if lost in either.
            (VoteResult::Lost, _) | (_, VoteResult::Lost) => VoteResult::Lost,
            // It remains pending if pending in both or just won in one side.
            _ => VoteResult::Pending,
        }
    }
    
    // majority.rs
    pub fn vote_result(&self, check: impl Fn(u64) -> Option<bool>) -> VoteResult {
        ...
    
        let (mut yes, mut missing) = (0, 0);
        for v in &self.voters {
            match check(*v) {
                Some(true) => yes += 1,
                None => missing += 1,
                _ => (),
            }
        }
        let q = crate::majority(self.voters.len());
        if yes >= q {
            VoteResult::Won
        } else if yes + missing >= q {
            VoteResult::Pending
        } else {
            VoteResult::Lost
        }
    }
    
    // util.rs
    pub fn majority(total: usize) -> usize {
        (total / 2) + 1
    }
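
    To check our reading of these functions, here is a self-contained sketch that replays the counting on plain slices and a HashMap. The free functions and their signatures are hypothetical simplifications. We also assume that an empty voter set counts as won, which is what lets a configuration with no outgoing voters behave like a plain majority quorum.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum VoteResult {
    Pending,
    Lost,
    Won,
}

fn majority(total: usize) -> usize {
    total / 2 + 1
}

/// Outcome within a single voter set, using the yes/missing counting.
fn vote_result(voters: &[u64], votes: &HashMap<u64, bool>) -> VoteResult {
    if voters.is_empty() {
        // Assumption: an empty configuration wins by convention.
        return VoteResult::Won;
    }
    let (mut yes, mut missing) = (0, 0);
    for id in voters {
        match votes.get(id).copied() {
            Some(true) => yes += 1,
            None => missing += 1,
            Some(false) => (),
        }
    }
    let q = majority(voters.len());
    if yes >= q {
        VoteResult::Won
    } else if yes + missing >= q {
        VoteResult::Pending
    } else {
        VoteResult::Lost
    }
}

/// Outcome for a joint configuration: both halves must win.
fn joint_vote_result(
    incoming: &[u64],
    outgoing: &[u64],
    votes: &HashMap<u64, bool>,
) -> VoteResult {
    match (vote_result(incoming, votes), vote_result(outgoing, votes)) {
        (VoteResult::Won, VoteResult::Won) => VoteResult::Won,
        (VoteResult::Lost, _) | (_, VoteResult::Lost) => VoteResult::Lost,
        _ => VoteResult::Pending,
    }
}
```

    For the three-node cluster in this scenario, node 3's own vote alone gives Pending (1 yes, 2 missing, majority 2), and adding node 2's approval gives Won, which is the "approvals: 2" line in the log. With a hypothetical outgoing set [3, 4, 5] and the same two votes, the joint result stays Pending because the outgoing half has only one approval.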
    

    We've seen how the voting process is based on the votes hashmap, but before this can happen, this hashmap needs to be updated appropriately via the MsgRequestVote, MsgRequestVoteResponse messages.

    So, let's continue following the campaign function.

    As the code below shows, the campaign function creates messages of type MsgRequestVote and sends them to the voters.

    After that, we'll follow the handler for the MsgRequestVote message.

    // tikv/raft-rs/blob/master/src/raft.rs
    pub fn campaign(&mut self, campaign_type: &'static [u8]) {
        let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
            ...
        } else {
            self.become_candidate();
            (MessageType::MsgRequestVote, self.term)
        };
        let self_id = self.id;
        if VoteResult::Won == self.poll(self_id, vote_msg, true) {
            // We won the election after voting for ourselves (which must mean that
            // this is a single-node cluster).
            return;
        }
        // Only send vote request to voters.
        for id in self.prs.conf().voters().ids().iter() {
            if id == self_id {
                continue;
            }
            ...
            let mut m = new_message(id, vote_msg, None);
            m.term = term;
            m.index = self.raft_log.last_index();
            m.log_term = self.raft_log.last_term();
            m.commit = commit;
            m.commit_term = commit_term;
            ...
            self.r.send(m, &mut self.msgs);
        }
        ...
    }
    
    

    At first glance the handler of MsgRequestVote (the step code shown further below) seems complicated, but at the end of the day, what it does is create and send a message that agrees or disagrees with the vote.

    According to vote_resp_msg_type, since the type we sent is MsgRequestVote, the type of the response message is MsgRequestVoteResponse. (We'll skip describing the prevote algorithm in this article.)

    So let's see when a node agrees to vote and when it disagrees. If you peruse the code along with the comments, you'll notice that three conditions must be met for a node to agree to a vote.

    1. can_vote is true (either you already voted for this node, or you don't know the leader_id for this term and haven't voted yet)

    2. self.raft_log.is_up_to_date is true (the message's log_term is greater than RaftLog.last_term or, if they are equal, the message's index is greater than or equal to RaftLog.last_index)

    3. the index of the message is greater than RaftLog.last_index, or the candidate's priority is not lower than ours.

    If all three conditions are met, we send a message that we agree to the vote; if any of them is not met, we reject the vote.
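
    The three conditions can be condensed into a small predicate. The sketch below is an illustration rather than the raft-rs code: Voter, VoteRequest, and grants are hypothetical names, the priority is carried as a plain field instead of being read with get_priority, the prevote branch is left out, and INVALID_ID is assumed to be 0.

```rust
const INVALID_ID: u64 = 0;

/// The parts of a voter's state that the decision depends on.
struct Voter {
    vote: u64,      // id we voted for in this term, or INVALID_ID
    leader_id: u64, // leader we know of in this term, or INVALID_ID
    last_index: u64,
    last_term: u64,
    priority: i64,
}

/// The fields of a MsgRequestVote that matter for the decision.
struct VoteRequest {
    from: u64,
    index: u64,
    log_term: u64,
    priority: i64,
}

impl Voter {
    /// Condition 2: the candidate's log is at least as up to date as ours.
    fn is_up_to_date(&self, last_index: u64, term: u64) -> bool {
        term > self.last_term || (term == self.last_term && last_index >= self.last_index)
    }

    /// True when all three conditions hold and the vote is granted.
    fn grants(&self, m: &VoteRequest) -> bool {
        // Condition 1: a repeat of our earlier vote, or no vote and no leader yet.
        let can_vote = self.vote == m.from
            || (self.vote == INVALID_ID && self.leader_id == INVALID_ID);
        can_vote
            && self.is_up_to_date(m.index, m.log_term)
            // Condition 3: a longer log, or a priority that is not lower than ours.
            && (m.index > self.last_index || self.priority <= m.priority)
    }
}
```

    With the values from the log (node 3 asks with index 3 and log_term 1, and the voter's own log also ends at index 3, term 1, with no vote cast yet), grants returns true. If the voter had already voted for node 1 in this term, the same request would be rejected.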

    The handler code is shown below. After that, we'll move on to the receiver of the MsgRequestVoteResponse.

    // raft.rs
    pub fn step(&mut self, m: Message) -> Result<()> {
        ...
        match m.get_msg_type() {
            MessageType::MsgRequestVote => {
                // We can vote if this is a repeat of a vote we've already cast...
                let can_vote = (self.vote == m.from) ||
                    // ...we haven't voted and we don't think there's a leader yet in this term...
                    (self.vote == INVALID_ID && self.leader_id == INVALID_ID);
                    
                // ...and we believe the candidate is up to date.
                if can_vote
                    && self.raft_log.is_up_to_date(m.index, m.log_term)
                    && (m.index > self.raft_log.last_index() || self.priority <= get_priority(&m))
                {
                    self.log_vote_approve(&m);
                    let mut to_send =
                        new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
                    to_send.reject = false;
                    to_send.term = m.term;
                    self.r.send(to_send, &mut self.msgs);
                    if m.get_msg_type() == MessageType::MsgRequestVote {
                        // Only record real votes.
                        self.election_elapsed = 0;
                        self.vote = m.from;
                    }
                } else {
                    self.log_vote_reject(&m);
                    let mut to_send =
                        new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
                    to_send.reject = true;
                    to_send.term = self.term;
                    let (commit, commit_term) = self.raft_log.commit_info();
                    to_send.commit = commit;
                    to_send.commit_term = commit_term;
                    self.r.send(to_send, &mut self.msgs);
                    self.maybe_commit_by_vote(&m);
                }
            }
        }
    }
    
    // raft.rs
    pub fn vote_resp_msg_type(t: MessageType) -> MessageType {
        match t {
            MessageType::MsgRequestVote => MessageType::MsgRequestVoteResponse,
            MessageType::MsgRequestPreVote => MessageType::MsgRequestPreVoteResponse,
            _ => panic!("Not a vote message: {:?}", t),
        }
    }
    
    // raft_log.rs
    pub fn is_up_to_date(&self, last_index: u64, term: u64) -> bool {
        term > self.last_term() || (term == self.last_term() && last_index >= self.last_index())
    }
    

    The MsgRequestVoteResponse message handler is very simple!

    It calls the poll function we saw earlier to update the votes hashmap and update the StateRole if the vote has been decided.

    // tikv/raft-rs/blob/master/src/raft.rs
    fn step_candidate(&mut self, m: Message) -> Result<()> {
        match m.get_msg_type() {
            ...
            MessageType::MsgRequestVoteResponse => {
                ...
                self.poll(m.from, m.get_msg_type(), !m.reject);
                self.maybe_commit_by_vote(&m);
            }
        }
    }
    

    Summary

    In this article, we looked at the code architecture based on the types used in RAFT-RS, and then followed and analyzed the code of a RAFT implementation in three basic scenarios. We hope that this article has helped you expand your understanding of the RAFT module. In the next installment, we'll take a deeper look at how the RAFT implementation works with more scenarios.

    Thanks 😊

    This post was automatically translated from Korean

    29 March 2024

  • 2024 GTC Event Live Rankings: How to Utilize GraphQL Subscription

    By Sujin Kim

    Lablup commemorated the 2024 GTC event by hosting a special event. Participants created images similar to a given image using the LLM provided by Lablup, and an NVIDIA RTX 4090 graphics card was awarded by lottery among the high scorers. 🫢
    In this post, we aim to highlight the subscription feature of GraphQL, which was used in the leaderboard page of the event, allowing participants to monitor their scores in real time.

    GTC24 event page

    What is a Subscription?

    A subscription is a mechanism that lets the client receive data in response to a server-side event stream. When data changes in real time, for example in real-time logs or chat applications, updates can be reflected immediately as they are pushed from the server.

    Subscription sends data only when the required information changes on the server. Therefore, in the case where data changes are not frequent, Subscription can reduce data traffic, which can also lead to cost savings.

    A similar approach is to set the fetchPolicy of a query to network-only so that you always get the latest results, but this is different from a subscription. It guarantees fresh data by sending a request to the server every time the client needs data, and every request carries a network cost. So setting fetchPolicy to network-only is fine when you only need the latest results whenever a button is clicked, but if you use it to retrieve frequently updated data, like a stock trading window, the network costs become significant.

    How to Use

    Defining Subscription

    The usage is similar to Query; just use the keyword subscription instead.

      const leaderboardSubscriptions = graphql`
        subscription Ranking_leaderboardSubscription {
          leaderboard {
            submissions {
              id
              name
              score
              imageUrl
            }
            lastUpdatedAt
          }
        }
      `;
    

    When an event occurs in the leaderboard stream, a notification is sent to the application, and the client can get the updated result.

    Then the following result can be obtained.

    leaderboard: {
      submissions: [
        {
          "id": "76293167-e369-4610-b7ac-4c0f6aa8f699",
          "name": "test",
          "score": 0.5910864472389221,
          "imageUrl": "<IMAGE_URL>"
        },
      ],
      lastUpdatedAt: 1710176566.493705
    }
    

    subscribe

    To display real-time rankings, subscribe when the user enters the page and call dispose to unsubscribe when the user leaves it. Both are done inside useEffect.

    import { useEffect } from 'react';
    import { requestSubscription } from 'react-relay';
    
    useEffect(() => {
      const subscriptionConfig = {
        subscription: leaderboardSubscriptions,
        variables: {},
        onNext: (response: any) => {
          setLeaderboard(response.leaderboard.submissions); // pre-defined state
        },
        onError: (error: any) => {
          console.error('Leaderboard subscription error', error);
        },
      };
      const { dispose } = requestSubscription(
        RelayEnvironment, // see 'How to set up (+Relay)' below
        subscriptionConfig,
      );
      return () => {
        dispose();
      };
    }, []); // The empty dependency array makes this run only when the component mounts and unmounts
    

    requestSubscription

    • Provides a Disposable object as a return value.
    • This Disposable object includes a dispose method to cancel the subscription.

    onNext

    • As data is updated through subscription, it updates the pre-defined state to display real-time rankings.
    • Besides onNext and onError, there are other options, such as onCompleted, which is called when the subscription ends, and updater, which updates the in-memory Relay store based on the server response. For detailed descriptions, refer to this link.

    dispose

    • A cleanup function is returned in the useEffect hook and the dispose method is called to end the subscription when the component is unmounted.

    How to set up (+Relay)

    According to the Relay documentation, GraphQL subscriptions communicate over WebSockets, and you can set up the network layer using graphql-ws. (There is also a way to use subscriptions-transport-ws, but it's deprecated, so we'll skip it.)

    import { ExecutionResult, Sink, createClient } from 'graphql-ws';
    import {
      Environment,
      Network,
      RecordSource,
      Store,
      SubscribeFunction,
      RelayFeatureFlags,
      FetchFunction,
      Observable,
      GraphQLResponse,
    } from 'relay-runtime';
    import { RelayObservable } from 'relay-runtime/lib/network/RelayObservable';
    
    const wsClient = createClient({
      url: GRAPHQL_SUBSCRIPTION_ENDPOINT,
      connectionParams: () => {
        return {
          mode: 'cors',
          credentials: 'include',
        };
      },
    });
    
    const subscribeFn: SubscribeFunction = (operation, variables) => {
      return Observable.create((sink: Sink<ExecutionResult<GraphQLResponse>>) => {
        if (!operation.text) {
          return sink.error(new Error('Operation text cannot be empty'));
        }
        return wsClient.subscribe(
          {
            operationName: operation.name,
            query: operation.text,
            variables,
          },
          sink,
        );
      }) as RelayObservable<GraphQLResponse>;
    };
    
    // Export a singleton instance of Relay Environment
    // configured with our network function:
    export const createRelayEnvironment = () => {
      return new Environment({
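        // fetchFn, the FetchFunction that handles queries and mutations, is assumed to be defined elsewhere
        network: Network.create(fetchFn, subscribeFn),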
        store: new Store(new RecordSource()),
      });
    };
    
    export const RelayEnvironment = createRelayEnvironment();
    

    wsClient

    • For url, enter the websocket URL of the GraphQL server.
    • credentials can be set via connectionParams.

    subscribeFn

    • Defines the subscription behavior of the Observable.
    • Validate the query string in if (!operation.text) { ... } and if it is invalid, raise an error and abort the execution.
    • Finally, the return wsClient.subscribe( ... ) code actually subscribes to the subscription using the WebSocket client and passes the payload of the GraphQL operation to the sink (i.e., the Observer).
    • In short, this function is responsible for handling the GraphQL subscription request and pushing the result to the Observable stream whenever a subscription event occurs.

    createRelayEnvironment

    • Create and return a new Relay Environment.
    • A Relay environment is a container that manages other high-level Relay objects, network layer, cache, etc.
    • We passed fetchFn to handle GraphQL query/mutation requests and subscribeFn to handle subscription requests.
    • To store and manage cached data, we created a Relay Store backed by a RecordSource.

    RelayEnvironment

    • The createRelayEnvironment function is called to initialize the RelayEnvironment and export it for later import and use elsewhere.
    • This configured RelayEnvironment is mainly used by QueryRenderer, useLazyLoadQuery, commitMutation, etc.

    CORS error

    Initially, I read the config.toml file used on the server side to find the websocket URL of the GraphQL server and set that address directly. However, I kept getting CORS and Unauthorized errors every time I sent a request. After a lot of trial and error, and with the help of my colleague, I was able to solve it. (Thank you so much 🥹🙏)

    The solution is to use http-proxy-middleware to set up setupProxy!

    As described in the create-react-app manual, a setupProxy file lets you proxy requests from your development server to a specific path on your real server. This is usually done to prevent CORS issues in development environments where the frontend and backend are separated.

    The code looks like this:

    const { createProxyMiddleware } = require('http-proxy-middleware');
    
    module.exports = function (app) {
      app.use(
        createProxyMiddleware('/graphql', {
          target: 'http://127.0.0.1:9220',
          changeOrigin: true,
          followRedirects: true,
          ws: true,
        }),
      );
    };
    

    createProxyMiddleware('/graphql', { ... })

    • Sets up the middleware to handle all HTTP requests whose path starts with '/graphql'.

    target: 'http://127.0.0.1:9220'

    • Set the address of the server to which proxied requests will be forwarded. Here we set it to port 9220.

    changeOrigin: true

    • Change the host header of the request to the host of the target. Use this to work around CORS issues.

    followRedirects: true

    • This setting causes the proxy to follow redirects when the server sends a redirect response to a request.

    ws: true

    • This setting enables the WebSocket proxy. The websocket connection between the client and server is also passed through this proxy, which we set to true for subscribe.
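
    To make the options above more concrete, here is a minimal Python sketch of the decision a development proxy makes for each request. It is not how http-proxy-middleware is implemented; the function name forward_request and its return shape are hypothetical, and only the path matching and the Host header rewrite are modeled.

```python
from urllib.parse import urlsplit


def forward_request(path, headers, target="http://127.0.0.1:9220",
                    context="/graphql", change_origin=True):
    """Return (url, headers) for a proxied request, or None if not proxied.

    Hypothetical helper for illustration only.
    """
    # Only paths under the context prefix are proxied; everything else is
    # served by the development server itself.
    if not path.startswith(context):
        return None
    forwarded = dict(headers)
    if change_origin:
        # changeOrigin: replace the Host header with the target's host:port.
        forwarded["Host"] = urlsplit(target).netloc
    return target + path, forwarded


print(forward_request("/graphql", {"Host": "localhost:3000"}))
print(forward_request("/static/app.js", {"Host": "localhost:3000"}))
```

    The first call is forwarded to the target with a rewritten Host header, while the second is left to the development server.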

    Leaderboard page

    After a lot of digging, we've finally finished the leaderboard page! 🎉 A big thank you to everyone who participated. 🙇🏻‍♀️

    Conclusion

    Using GraphQL subscriptions, we were able to implement features like real-time rankings. Although I struggled with how to set it up because of CORS, it was not difficult to use because it is not much different from writing a query.

    I think the biggest advantages of subscriptions are real-time updates and efficiency. Because it receives data from the server in real time, users always see the latest status, and because it only gets updates when the data it needs changes, it can minimize server requests for data that doesn't change often.

    However, it is complex as it requires an implementation of websockets or similar real-time protocols, as well as logic to manage the connection state between the client and server. Although not covered in this article, subscription requires additional work on the server side. And because it requires a real-time connection, it can consume server resources and client resources.

    Therefore, which method is more cost or performance efficient depends on many factors, including the nature of your application, the frequency of data updates, and the number of concurrent users, so use your best judgment.

    References

    • https://relay.dev/docs/v10.1.3/subscriptions/
    • https://relay.dev/docs/guided-tour/updating-data/graphql-subscriptions/#configuring-the-network-layer
    • https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API
    • https://github.com/enisdenjo/graphql-ws
    • https://github.com/apollographql/subscriptions-transport-ws
    • https://graphql.org/blog/subscriptions-in-graphql-and-relay
    • https://create-react-app.dev/docs/proxying-api-requests-in-development

    This post is automatically translated from Korean

    28 March 2024

  • Backend.AI Meets Tool LLMs : Revolutionizing AI Interaction with Tools - Part 3

    By Sergey Leksikov

    Part 3. Making your own API Retriever and Question Answering system locally with a few lines of code, without training or serving an LLM

    Previously, in Part 1, we talked about Tool LLMs and their usage. Part 2 demonstrated how to run Gorilla LLM on Backend.AI. Part 3 covers the case where no GPU is available, but we still want help and assistance regarding our API.

    Suppose we have Backend.AI, and we want to get information about the Backend.AI REST API and Functional API in a more interactive, question-answering style. The REST API is described in this documentation: https://docs.backend.ai/en/latest/manager/rest-reference/index.html

    Figure 1. Backend.AI REST API Documentation

    In addition, Backend.AI REST API documentation can be exported into openapi.json format:

    Figure 2. Backend.AI openapi.json

    Another source of Backend.AI API information is the functional API defined in the Backend.AI Client. We want to know how to interact with Backend.AI and which parts of the code are responsible. The client code repository is responsible for managing and interacting with the cloud and computing environment.

    Steps to make a Question Answering API system

    1. Let's set up the Backend.AI Client from https://github.com/lablup/backend.ai/tree/main/src/ai/backend/client on our local PC and create a new directory bai-dev/src/ai/backend/client/gpt_api_client.

    Figure 3. The directory location of gpt_api_client

    2. In the vector_data directory, let's create two subdirectories: data1/, which will store the REST API documentation (openapi.json), and data2/, which will store the selected B.AI Client files over which we want to do API Question Answering.

    Figure 4. Overview of data directories with openapi.json and client function code files

    3. Let's install the Python library LlamaIndex: pip install llama-index. Note that LlamaIndex is not related to Meta's LLaMA language model. LlamaIndex provides data structures and methods for efficiently processing and storing documents for retrieval.

    4. Let's convert our API and code files into embedded vectors and store them in a vector database with LlamaIndex. Let's use the Jupyter Notebook interactive environment, which is also integrated into our VSCode on a local PC.

    Figure 5. Jupyter Notebook interactive environment. Loading openapi.json from data/ directory. Then asking questions from query engine over a vector index.

    5. Vectorize the data2/ directory with our code functions.

    Figure 6. Load data2/ directory with code files from B.AI Client. Then vectorize them into index and create a question answering engine.
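
    The idea behind a vector index and its query engine can be sketched with the standard library alone. The snippet below is not LlamaIndex and does not use learned embeddings: it swaps in plain word counts and cosine similarity as a stand-in, and the class name ToyVectorIndex as well as the documentation snippets are made up for illustration.

```python
import math
import re
from collections import Counter


def embed(text):
    """Toy embedding: lowercase word counts (real indexes use learned vectors)."""
    return Counter(re.findall(r"[a-z0-9_]+", text.lower()))


def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class ToyVectorIndex:
    """Hypothetical stand-in for a vector index over documents."""

    def __init__(self, documents):
        # Vectorize every document once, when the index is built.
        self.entries = [(doc, embed(doc)) for doc in documents]

    def query(self, question, top_k=1):
        # Vectorize the question and return the most similar documents.
        q = embed(question)
        ranked = sorted(self.entries, key=lambda e: cosine(q, e[1]), reverse=True)
        return [doc for doc, _ in ranked[:top_k]]


# Made-up documentation snippets, not taken from the real openapi.json.
docs = [
    "POST /session creates a new compute session",
    "GET /folders lists the storage folders of the user",
    "DELETE /session/{name} terminates a compute session",
]
index = ToyVectorIndex(docs)
print(index.query("How do I create a new session?"))
```

    A real query engine additionally passes the retrieved text to a language model to phrase the answer; this sketch stops at the retrieval step.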

    We can save both indexes using the Python Pickle or Joblib libraries, which are commonly used for serializing objects to storage so that they can be loaded back later: joblib.dump(index, "rest_api_index.joblib") and joblib.dump(index, "functional_index.joblib")
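
    As a minimal sketch of this save-and-load round trip, the snippet below uses the standard-library pickle module on a small dictionary that stands in for an index. The dictionary contents and the .pkl file name are assumptions; whether a real index object serializes cleanly depends on the library version.

```python
import pickle
import tempfile
from pathlib import Path

# Stand-in for a vector index: any picklable Python object works the same way.
index = {"documents": ["openapi.json"], "vectors": [[0.1, 0.2, 0.3]]}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "rest_api_index.pkl"
    # Serialize the object to disk.
    with path.open("wb") as f:
        pickle.dump(index, f)
    # Load it back, as the server would do at startup.
    with path.open("rb") as f:
        restored = pickle.load(f)

print(restored == index)
```

    Pickle files can execute code when loaded, so only load index files that you created yourself.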

    6. The Jupyter Notebook environment already lets us ask questions and get responses interactively. Additionally, we can load the saved vectorized indexes on a FastAPI server and answer questions over the web. In Part 2, we set up a computational session with Gorilla LLM, and from that demo we still have a computational session with a FastAPI server.

    7. Let's transfer the files rest_api_index.joblib and functional_index.joblib to the api_helper/ vFolder in the Backend.AI Cloud session.

    8. In server.py, load the vector indexes and define the query engines.

    Figure 7. server.py definition of index files and query engine.

    9. For each query engine, we specify a FastAPI endpoint.

    Figure 8. Code snippets for REST and Functional API retrieval
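
    The one-endpoint-per-query-engine layout can be sketched without a web framework. The snippet below is not the FastAPI code from the figure: make_engine, handle_post, and the "response" key are hypothetical, and the engines simply echo the instruction instead of querying a real index.

```python
import json


def make_engine(name):
    """Hypothetical stand-in for a query engine built from a saved index."""
    def query(instruction):
        return f"[{name}] answer for: {instruction}"
    return query


# One route per query engine, mirroring the /rest_api and /functional endpoints.
ROUTES = {
    "/rest_api": make_engine("rest_api_index"),
    "/functional": make_engine("functional_index"),
}


def handle_post(path, body):
    """Return (status_code, JSON string) for a POST with a JSON body."""
    engine = ROUTES.get(path)
    if engine is None:
        return 404, json.dumps({"error": "unknown endpoint"})
    try:
        instruction = json.loads(body)["instruction"]
    except (ValueError, KeyError, TypeError):
        return 400, json.dumps({"error": "expected a JSON object with 'instruction'"})
    return 200, json.dumps({"response": engine(instruction)})


print(handle_post("/rest_api", '{"instruction": "Create a new session"}'))
```

    Keeping the routing table separate from the engines makes it easy to add a third index later without touching the request handling.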

    10. Test the server response from your local PC using the curl command. When the server is queried on a specific endpoint, it returns an answer from the corresponding query engine to the user.
    curl -X POST -H "Content-Type: application/json" -d '{"instruction":"Create a new session"}' http://127.0.0.1:8000/rest_api
    

    Figure 9. Command line response from curl command. Example 1

    curl -X POST -H "Content-Type: application/json" -d '{"instruction":"Create a new session"}' http://127.0.0.1:8000/functional
    

    Figure 10. Command line response from curl command. Example 2
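
    The same request can also be built from Python with the standard library. This is a sketch under the assumption that the server accepts the JSON body used in the curl commands; build_query_request is a hypothetical helper, and the actual send is left commented out because it needs the server to be running.

```python
import json
import urllib.request


def build_query_request(endpoint, instruction, base_url="http://127.0.0.1:8000"):
    """Build the same POST request as the curl commands (hypothetical helper)."""
    payload = json.dumps({"instruction": instruction}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/{endpoint}",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_query_request("rest_api", "Create a new session")
print(request.get_method(), request.full_url)

# Sending requires the server from the previous steps to be running:
# with urllib.request.urlopen(request, timeout=30) as response:
#     print(response.read().decode("utf-8"))
```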

    In addition, we can make a web app which receives user input, sends it to the corresponding endpoint, and displays the answer.

    Figure 11. A web app prototype for Question Answering over Backend.AI REST and Functional API. Example 1

    Figure 12. A web app prototype for Question Answering over Backend.AI REST and Functional API. Example 2

    Conclusion

    In Part 3, we demonstrated how to create a Question-Answering system locally using the open-source Python library LlamaIndex, which helped convert our documents and Backend.AI code into vector form. Question answering can be done interactively in a Jupyter Notebook environment, which Visual Studio Code supports with plugins. Furthermore, we moved those vector indexes to a Backend.AI Cloud environment where a Gorilla LLM API-tuned model is served. Then an API Question-Answering web app was implemented to assist users over the network.

    Reference:

    • LlamaIndex. https://docs.llamaindex.ai/en/stable/

    Demo video for Backend.AI API Helper and Gorilla LLM:

    30 January 2024

  • Backend.AI Meets Tool LLMs : Revolutionizing AI Interaction with Tools - Part 2

    By Sergey Leksikov

    Part 2. Backend.AI Gorilla LLM model serving

    Previously, we talked about Tool LLM capabilities and usage. This article gives a step-by-step demonstration of how to run the Gorilla LLM model on the Backend.AI Cloud using the Backend.AI Desktop app.

    Figure 1. The Backend.AI Desktop app installed on macOS

    1. Press a start button to make a session creation menu appear.

    Figure 2. New session start interactive screen

    2. Select the NGC-Pytorch 23.07 image.

    3. Attach a vFolder, which is a working directory containing the model files. For example: a directory named api_helper/.

    Figure 3. Attaching vFolder screen

    4. Select the resource amount: 128 GB RAM and 5 fGPU.

    Figure 4. Resource selection screen

    5. Select the Visual Studio Code Desktop environment.

    Figure 5. IDE environment selection screen

    6. In the /home/work/api_helper/ directory, create a server.py file.

    7. Create a requirements.txt file.

    Figure 6. Content of requirements.txt file

    To install requirements run the command: pip install -r requirements.txt

    Figure 7. Executing install requirements command

    8. In server.py, define the tokenizer and model loader using the transformers library.

    Figure 8. Code snippet of server.py

    9. Define the server IP address and port number.

    Figure 9. Definition of server IP address and port number

    10. To run the model, type: python server.py

    Figure 10. Starting a server.py

    11. Accessing the created server

    VSCode automatically creates a port tunneling session from your device to the Backend.AI Cloud server. You can check the server status by accessing the localhost address, and the request will be tunneled to Backend.AI Cloud. In addition, you may define other custom endpoints according to your needs.

    Figure 11. The server run log

    Figure 12. VSCode Port Forwarding configuration

    Figure 13. Accessing the root of a server

    Up to this point, we have created a computation session on Backend.AI Cloud and attached an api_helper/ vFolder directory containing requirements.txt and server.py. Then we started our FastAPI server, where Gorilla LLM is downloaded from the HuggingFace repository and loaded into the computation session memory behind an /inference API endpoint.

    12. API inference testing. To test the API inference of Gorilla LLM, you may send a curl request from your local computer's command line:
    curl -X POST -H "Content-Type: application/json" -d '{"text":"Object detection on a photo. <<<api_domain>>>:"}' http://127.0.0.1:8000/inference
    

    Figure 14. An example of curl request

    Figure 15. The GPU workload on a server after receiving the request

    Figure 16. The server logs of receiving the request and printing the result

    13. Defining a UI web app. You may use any web technology to make a UI app which displays the result in a better way. For example, you may use HTML and JavaScript files and place them in a static directory next to server.py. Then define an endpoint for the web app.

    Figure 17. Example of adding an html web app to a FastAPI server

    14. Gorilla LLM Web App prototype: an API-tuned Large Language Model for API question answering and code generation.

    Figure 18. Gorilla LLM web app prototype. Example 1

    Figure 19. Gorilla LLM web app prototype. Example 2

    Conclusion

    Despite some difficulties in serving Gorilla LLM, an LLM tuned on your own API has great potential and promise. The model can provide the most recent results with more accurate parameters and function calls than commercial large models, and it can be useful in tasks such as question answering over APIs, code autocompletion, and API code execution.

    Limitations and difficulties:

    While trying to serve the Gorilla LLM model, the following issues had to be considered:

    • The model may generate a response in an unexpected format
    • The model may generate different results for the same question
    • Parsing and rendering LLM response
    • Eliminating the duplicate sentences and lines
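
    As one example, the last item can be handled with a small post-processing step. The sketch below is an assumption about how such a cleanup could look, not the code used in the demo; drop_duplicate_lines and the sample text are hypothetical.

```python
import re


def drop_duplicate_lines(text):
    """Keep the first occurrence of each non-empty line, preserving order."""
    seen = set()
    kept = []
    for line in text.splitlines():
        # Compare lines case-insensitively and ignore whitespace differences.
        key = re.sub(r"\s+", " ", line).strip().lower()
        if key and key in seen:
            continue
        seen.add(key)
        kept.append(line)
    return "\n".join(kept)


# Made-up model output with repeated lines.
raw = "Use the pipeline API.\nuse the  pipeline API.\n\nLoad the model.\nUse the pipeline API."
print(drop_duplicate_lines(raw))
```

    Blank lines are always kept so that paragraph breaks in the response survive the cleanup.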

    29 January 2024

  • Backend.AI Meets Tool LLMs : Revolutionizing AI Interaction with Tools - Part 1

    By Sergey Leksikov

    Part 1. Introduction to LLMs and Tool Interaction

    What if future AI technology capabilities were available now? On your way home from work, you could ask an AI Assistant to turn on the air conditioner before you arrive. At the same time, you are planning a vacation and, after narrowing down a few options, you ask an AI model to book a hotel on your behalf. As the model books your trip, you receive a notification from a cloud provider about your deep learning model's training progress. You ask the AI Assistant to run another session with another set of parameters for the experiment while targeting specific values for performance accuracy. How can such a futuristic scenario be realized today?

    This kind of interaction between an LLM and the real world is possible via Application Programming Interfaces (APIs). A Tool Large Language Model (LLM) fine-tuned on an API dataset can respond to a user's query with a specific API call, and that call can invoke a program or function to make a real-world impact. Large Language Models (LLMs) are rising in popularity due to their outstanding ability to generate text in context while also reasoning about problems. Text model usage ranges from generating and editing text to serving as a copilot for programmers. How else can LLMs extend their usage beyond their text-generating capabilities?

    With Tool LLMs, we are stepping into an era where AI, in addition to understanding our requests, can act on those requests using a universe of online tools. Tool LLMs are pushing the boundaries of what AI can do with tools via functional and REST APIs.

    GPT-4 is currently the state of the art among LLMs, topping most AI benchmarks. Consider this scenario: a GPT-4 model is asked to transcribe an audio file into text in another language. When prompted to use specific APIs, GPT-4 may hallucinate and suggest non-existent APIs or provide incorrect arguments. As a consequence, the function execution fails and the user's task is not accomplished.

    Besides issues with hallucinations and inaccuracies, API documentation and versions are constantly changing. Retraining a general-purpose LLM is costly, so it is not practical to keep such models updated with constantly changing documentation. Tool LLMs provide a solution to the hallucination issues of general large models, enabling interaction with the physical world via programmatic interfaces. Tool LLMs are much smaller, making it feasible to retrain them periodically with recent data. In addition, an API documentation Retriever module can be added to the model serving pipeline to supplement the model with the most recent API documentation relevant to the user's input query.

    To overcome these challenges, researchers have recently proposed two notable open-source methods for enhancing the tool-use abilities of LLMs, Gorilla LLM and ToolLLaMA, each with its own advantages and specific use cases. Moreover, these models can be prepared for inference serving on Backend.AI Cloud.

    What is Tool LLM?

    A Tool LLM is an LLM trained on a dataset of user queries and API requests together with relevant context information, such as API code usage and API description documentation. The response from such an LLM can be executed as code. Code execution implies that the LLM can interact with various online services and tools, such as cloud computing providers, Kubernetes, and machine learning and deep learning libraries and repositories such as HuggingFace, TorchHub, and TensorFlowHub.

    The main advantage of such a Tool LLM is its ability to accurately generate an API response to a user query, which can then be executed to obtain the results.

    Understanding the Types of API

    An Application Programming Interface (API) is a crucial element in modern computing, serving as a set of rules and protocols for how different software applications or hardware systems can communicate and interact.

    Functional APIs are designed to be invoked through function calls within a programming environment. For instance, machine learning and deep learning libraries like HuggingFace and TensorFlow offer various models that can be loaded into memory and utilized through Functional API calls. These APIs are integral in executing specific functions and operations within the software.

    This capability of LLMs to generate code related to an API extends their utility far beyond basic text generation and processing. Tool LLMs can seamlessly integrate with diverse online services and tools, ranging from cloud computing platforms to advanced machine learning libraries. Furthermore, their application is not limited to human queries; they can also be integrated into systems where they interact with other programs or AI agents. This versatility positions Tool LLMs as vital components in complex systems and infrastructures, enhancing their potential for real-world applications.

    In the following sections, we'll delve into how Tool LLMs are trained and how they operate. After that, two specific research examples will be covered: Gorilla LLM and ToolLLaMA.

    Tool LLM Training and Inference Workflow

    Tool LLM training involves several steps: setting up an API database, creating a training dataset, model training, and inference.

    The API database includes descriptions and relevant code samples. To generate a Self-Instruct training dataset, the API database samples need to be pre-processed into {Input User Query-API Output} pairs. ChatGPT can help generate such a dataset automatically by covering the various scenarios and query complexities that humans might ask about, from specific cases to general and abstract ones. After the Self-Instruct dataset is generated, the model is trained to accurately predict the API for a given user input query.
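
    The pre-processing into {Input User Query-API Output} pairs could look roughly like the sketch below. Everything here is illustrative: the database layout and field names are assumptions, the API call strings reuse the illustrative pairs from the Q&A section of this article rather than real library signatures, and the queries are written by hand where a real pipeline would ask a model such as ChatGPT to generate variations.

```python
import json

# Hypothetical API database entries. The call strings reuse the illustrative
# pairs from the Q&A section of this article, not real library signatures.
api_database = [
    {
        "api_call": 'HuggingFace.summarize(text="Article text here", '
                    'model="facebook/bart-large-cnn")',
        "queries": [
            "Summarize this article about space exploration.",
            "Give me a short summary of this article.",  # hand-written variation
        ],
    },
    {
        "api_call": 'HuggingFace.speech_to_text(audio_file="path/to/recording.wav", '
                    'model="facebook/wav2vec2-base-960h")',
        "queries": ["Convert this speech recording to text."],
    },
]


def to_training_pairs(database):
    """Flatten the API database into one {query, api_output} record per query."""
    return [
        {"user_query": query, "api_output": entry["api_call"]}
        for entry in database
        for query in entry["queries"]
    ]


def to_jsonl(records):
    """Serialize records as JSON Lines, a common format for fine-tuning data."""
    return "\n".join(json.dumps(record) for record in records)


pairs = to_training_pairs(api_database)
print(to_jsonl(pairs))
```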

    For Tool LLM inference, it's crucial that the LLM not only responds with accurate argument parameters but also uses the latest API documentation. Thus, an API Document Retriever is used, which keeps the model up to date with the most recent API changes.
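
    A rough sketch of how a retriever can feed current documentation into the prompt at inference time is shown below. The retriever here is a toy word-overlap match, the documentation strings are made up, and the wording of the appended instruction is an assumption rather than a required format.

```python
def retrieve(query, api_docs):
    """Pick the document sharing the most words with the query (toy retriever)."""
    query_words = set(query.lower().split())
    return max(api_docs, key=lambda doc: len(query_words & set(doc.lower().split())))


def build_prompt(query, api_docs):
    """Append the best-matching, current documentation to the user query."""
    doc = retrieve(query, api_docs)
    return f"{query}\nUse this API documentation for reference: {doc}"


# Made-up documentation entries; updating this list changes what the model
# sees at inference time without any retraining.
docs = [
    "summarize v2: summarize(text, model, max_length) returns a short summary of the text",
    "translate v1: translate(text, target_language) returns the translated text",
]
query = "summarize the text of this article"
print(build_prompt(query, docs))
```

    Because the documentation is looked up per request, an API change only requires updating the document store.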

    Figure 1. An overview of the Tool LLM training and inference workflow over an API instruction dataset

    Case Studies: Gorilla LLM and ToolLLaMA

    Gorilla

    Gorilla is a fine-tuned model based on LLaMA 7B that outperforms GPT-4 in writing API calls. The notable aspects of Gorilla are:

    • It addresses the limitations of current LLMs in generating accurate input arguments for APIs and their tendency to hallucinate incorrect API usage.
    • Gorilla integrates with a document API retriever, allowing it to adapt to real-time changes in documentation, a significant advantage considering how frequently APIs get updated.
    • The authors have developed a dataset called APIBench to evaluate the model's abilities, which includes APIs from HuggingFace, TorchHub, and TensorHub totaling 1600+ APIs.
    • Gorilla seems to mitigate hallucination issues and improves the reliability of LLM outputs. Also, Gorilla got updated and extended to work with Cloud providers such as AWS, GCP and managing Kubernetes clusters.

    ToolLLaMA

    ToolLLaMA is a model fine-tuned on ToolBench, an instruction-tuning dataset for tool use based on the RapidAPI repository. The key points of ToolLLaMA are:

    • ToolBench covers an impressive range of over 16,000 real-world APIs, offering diverse instruction sets and solution paths.
    • The paper proposes a novel Depth-First Search-Based Decision Tree algorithm (DFSDT) to enhance the reasoning capabilities of LLMs such as multiple tool usage and multi-step reasoning.
    • ToolLLaMA fine-tuned on ToolBench matches the performance of ChatGPT and demonstrates generalization abilities on out-of-distribution datasets like APIBench.
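
    To give an intuition for the depth-first search mentioned above, here is a deliberately simplified sketch of depth-first search with backtracking over tool calls. It is not the DFSDT algorithm from the paper: states are plain integers, the two tools are made up, and every candidate is tried in a fixed order instead of being proposed and judged by an LLM.

```python
def dfs_solve(state, goal, tools, path=(), max_depth=4):
    """Depth-first search over tool calls; backtrack when a branch dead-ends."""
    if state == goal:
        return list(path)
    if len(path) >= max_depth:
        return None
    for name, tool in tools.items():
        next_state = tool(state)
        if next_state is None or next_state == state:
            continue  # failed call: try a sibling instead of giving up
        solution = dfs_solve(next_state, goal, tools, path + (name,), max_depth)
        if solution is not None:
            return solution
    return None  # every branch failed: the caller backtracks


# Made-up tools acting on an integer state.
tools = {
    "double": lambda x: x * 2,
    "add_three": lambda x: x + 3,
}
print(dfs_solve(1, 7, tools))
```

    In this run the search first follows "double" three times, finds no solution within the depth limit, backtracks to the state 4, and then reaches the goal with "add_three".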

    Both papers are significant in pushing the boundaries of LLM capabilities in real-world tool use by navigating and utilizing a vast array of APIs. This advancement is crucial for practical applications. A comparative summary table is provided below.

    Figure 2. A comparative table between two API tuned LLM

    Synergy between Backend.AI and ToolLLM

    Training or serving an LLM requires significant computing resources, especially since there is a huge demand for Graphics Processing Units (GPUs) with large memory capacity and high computational speed.

    Backend.AI offers a scalable foundation for building, training, and serving diverse models. Backend.AI includes an on-demand scaling feature for model inference, adding external nodes for serving, and load balancing to optimize the workload. Backend.AI provides vLLM and TensorRT servers which can be used for high-performance inference of LLMs. In addition, there is a well-designed, user-friendly interface and the FastTrack pipeline-building tool for creating computing environment sessions of various complexities.

    Conclusion

    The futuristic scenario in which various AI Assistants and Agents interact with various devices and services can be realized today through APIs and Tool LLMs specifically fine-tuned on such interactions. Gorilla LLM and ToolLLaMA offer a good opportunity to incorporate them into complex tasks. The workflow of how they are trained and served is easy to comprehend. Gorilla LLM can be recommended for machine learning and cloud administration tasks, while ToolLLaMA suits more general API usage, multi-tool, and multi-step cases.

    There is also an advantage in training your own model on your own API documentation or code, so that you have an LLM which understands your code. Such an LLM can help assist or interact with users who want to get relevant information.

    Backend.AI can effectively serve as a backbone for model training and scalable model serving while offering a simple GUI. How to set up such models will be explained step by step in the other parts.

    Commonly asked questions:

    • Q: What is the source of hallucinations and LLM limitations, and how is it solved in Tool LLMs?
    • A: GPT-4, like other Large Language Models, faces limitations such as hallucinations and inaccuracies, which are primarily due to its training on extensive yet potentially outdated or inaccurate datasets from the internet. These 'hallucinations' refer to instances where the model confidently produces information that's either factually incorrect or not based in reality, a challenge stemming from the nature of its purely text-based training data and not directly from its size or lack of interaction with the physical world. To address these issues, Tool LLMs are being developed with a focus on specialization and frequent updates. They are fine-tuned on specific datasets, like API documentation, enabling direct interaction with real-world systems through programmatic interfaces for more accurate and current information. The retraining frequency of Tool LLMs varies, depending on the application and the pace of change in the relevant field, with updates potentially needed monthly, quarterly, or bi-annually to keep the model up-to-date with the latest trends and information.
    • Q: What are example pairs of user Query and API?
    • A: The example pairs are provided below.
    • User Query: "Summarize this article about space exploration."
    • API Output: HuggingFace.summarize(text="Article text here", model="facebook/bart-large-cnn")
    • User Query: "What is the sentiment of this customer review?"
    • API Output: HuggingFace.analyze_sentiment(text="Customer review text", model="distilbert-base-uncased-finetuned-sst-2-english")
    • User Query: "Identify the objects in this photo."
    • API Output: HuggingFace.image_recognition(image_file="path/to/photo.jpg", model="google/vit-base-patch16-224")
    • User Query: "Convert this speech recording to text."
    • API Output: HuggingFace.speech_to_text(audio_file="path/to/recording.wav", model="facebook/wav2vec2-base-960h")
    • Q: How do the GorillaLLM and ToolLLaMA papers differ in their approach to utilizing API documentation during the training and inference of their models?
    • A: GorillaLLM appends relevant API documentation during training and offers two inference modes, while ToolLLaMA employs Sentence-BERT for fine-tuning embeddings in the API domain. GorillaLLM uses BM25 and GPT-Retriever from LlamaIndex for documentation retrieval, whereas ToolLLaMA uses Sentence-BERT for a similar purpose.
    • Q: How frequently should small API models be retrained, and what role does the API Retriever play in handling changes in API documentation?
    • A: Training small API models annually is reasonable, but monthly retraining for API changes isn't practical. The API Retriever, using up-to-date documentation, can mitigate the need for frequent retraining. Evaluating and benchmarking fine-tuned API models and RAG methods is essential for effectiveness.
    • Q: What is the difference between ToolLLM and RAG systems, and how do they function in the context of LLMs?
    • A: ToolLLM is a model fine-tuned on API documentation, focusing on incorporating knowledge. RAG systems, on the other hand, are algorithms for data chunking, storage, search, re-ranking, and synthesis. They can work independently or in combination to enhance LLM efficiency, especially in handling context limits and knowledge updates.

    Reference:

    • Gorilla: Large Language Model Connected with Massive APIs. https://gorilla.cs.berkeley.edu/
    • ToolLLM: Facilitating Large Language Models To Master 16000+ Real-World APIs. https://github.com/OpenBMB/ToolBench

    28 January 2024

  • Introducing raftify: A High-level Raft Framework Created with a Focus on Scalability

    By Gyubong Lee

    Hello, I've been working on introducing Raft to the Backend.AI manager processes at Lablup since last year.

    Here's a rough breakdown of the related tasks I'm working on.

    1. Introducing Raft to the Backend.AI manager process and making it a leader-follower structure.
    2. Replacing the existing distributed lock-based GlobalTimer with a Raft-based global timer, and ensuring that a specific task is performed exactly once in the cluster.
    3. Embedding a global, shareable state store in the manager process and synchronizing it appropriately.

    In this post, I'll introduce the Raft framework I've been working hard on over the past year to accomplish these tasks, describe some of the issues I encountered while developing it, and walk through raftify example code that implements a distributed key-value store in less than 300 lines in total.

    Introducing raftify

    raftify is a Raft framework developed with a focus on extensibility so that it can be easily integrated with any server application.

    raftify is built on top of tikv's raft-rs, one of the Raft implementations used in production, with LMDB as stable storage and gRPC as the network layer.

    Writing binding of the Raft module

    I decided that building and maintaining a reliable Raft implementation from the ground up would be a significant burden, so I decided to write a Python binding for the Raft module first.

    So I initially thought I'd try using gopy to write a Python binding for hashicorp/raft, the most starred Raft implementation on GitHub.

    However, gopy didn't support binding goroutines, and it didn't support the latest Python version.

    Then, on the advice of a senior developer in the company, I learned about a Rust implementation called tikv/raft-rs and PyO3, which inspired me to try writing a Python binding for tikv/raft-rs using PyO3.

    rraft-py

    Thus, I decided to develop a Python binding for the Raft module named rraft-py, a name that combines Rust, Raft, and Py.

    My first concern in developing rraft-py was to make the semantics of the rust code and the python code as close to a 1:1 match as possible.

    To achieve a 1:1 match, I needed to bypass some of the details of Rust's syntax.

    My main concern at the time was how to expose Rust references to the Python side, which you can see in my PyCon KR presentation if you're interested.

    The result is rraft-py which, backed by a port of over 10,000 lines of integration test code from raft-rs, has become a fairly reliable implementation of Raft bindings that can be used directly in Python.

    Currently, raftify is in the process of being completely rewritten in Rust, and rraft-py is no longer used, but it was a great experience to write my first PyO3 bindings and try out the APIs of a Raft implementation.

    riteraft-py

    After developing rraft-py and porting over 10,000 lines of integration tests from raft-rs, and even the multiple-mem-node example, to Python code to get it working, my only thought was that I still didn't know where to start.

    The raft-rs really only provided the Raft implementation itself and I had no idea how to integrate it into my application.

    While browsing GitHub, I came across a high-level Rust implementation based on tikv/raft-rs called riteraft in an issue called How to use this lib?, and it was much more intuitive to figure out how to use. So I decided to develop riteraft-py with the goal of mimicking its behavior in Python and integrating it at the application level.

    The job of riteraft is to integrate the Raft implementation directly with the log storage, state machine, and network layers, but the problem was that, apart from being intuitive to use, it didn't work very well.

    Leader election not happening when the leader is dead, data replication not happening in certain scenarios, panic when the commit count goes over 255, etc... All sorts of miscellaneous issues had to be resolved.

    Even after resolving all of these issues and getting the cluster to look like it was working, the issues kept coming: it would seem to be working fine, but then certain failures would cause catastrophic issues, such as cluster inconsistency or log synchronization locking up.

    Each time I encountered an issue, I needed to dig into the technical details of raft-rs and understand them, which ended up being a process of taking raft-rs's code apart and understanding it piece by piece.

    raftify

    While troubleshooting these issues, I decided to use a different abstraction than riteraft and implemented many changes, including a CLI module for debugging node and cluster state, which led me to rename the library to raftify.

    When I first started developing the library, the goal was to make it compatible with any Python application, hence the name raftify, as in "to Raft-ify" an application.

    I am no longer developing the Python implementation, but you can find it on its branch.

    raftify written in Rust

    Developed in Python on top of rraft-py, raftify ended up working well, but the crude test harness written in a multi-process structure was hard to test in CI, easily broke cluster consistency, and got out of control at the slightest hint of code complexity.

    As a result, we decided to completely rewrite raftify's internal logic in Rust and expose only the high-level interface of the Raft package in Python.

    Once completely rewritten in Rust, raftify was single-threaded, integration testable, and could be tested in CI, which helped eliminate the fear of making code changes.

    raftify example code

    In this section, we'll create a simple distributed key-value store using raftify.

    For the full source code, see this link.

    Define the state machine

    The first thing we need to do is define the log entries and state machine that we will use in our key-value store.

    For the sake of this article, we'll define just one command as a log entry: a simple Insert that stores a value under a key.

    💡 Disclaimer: This article does not explain the Rust language syntax and the theoretical background of Raft.

    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub enum LogEntry {
        Insert { key: u64, value: String },
    }
    

    Let's define a state machine of type HashMap as shown below.

    #[derive(Clone, Debug)]
    pub struct HashStore(pub Arc<RwLock<HashMap<u64, String>>>);
    

    Then we need to define encode and decode methods to indicate how we want to serialize and deserialize these data structures. You can use the bincode crate to define these as simply as below.

    impl AbstractLogEntry for LogEntry {
        fn encode(&self) -> Result<Vec<u8>> {
            serialize(self).map_err(|e| e.into())
        }
    
        fn decode(bytes: &[u8]) -> Result<LogEntry> {
            let log_entry: LogEntry = deserialize(bytes)?;
            Ok(log_entry)
        }
    }
    
    impl AbstractStateMachine for HashStore {
        fn encode(&self) -> Result<Vec<u8>> {
            serialize(&self.0.read().unwrap().clone()).map_err(|e| e.into())
        }
    
        fn decode(bytes: &[u8]) -> Result<Self> {
            let db: HashMap<u64, String> = deserialize(bytes)?;
            Ok(Self(Arc::new(RwLock::new(db))))
        }
    }
    

    Finally, we need to define three methods in the HashStore that will be used by raftify's internal code.

    Define apply, which is called when a new log entry is applied to the HashStore; snapshot, which is called to save the current state of the HashStore as a snapshot; and restore, which is called to restore the state of the HashStore from a snapshot byte slice, as shown below.

    #[async_trait]
    impl AbstractStateMachine for HashStore {
        async fn apply(&mut self, data: Vec<u8>) -> Result<Vec<u8>> {
            let log_entry: LogEntry = LogEntry::decode(&data)?;
            match log_entry {
                LogEntry::Insert { ref key, ref value } => {
                    let mut db = self.0.write().unwrap();
                    log::info!("Inserted: ({}, {})", key, value);
                    db.insert(*key, value.clone());
                }
            };
            Ok(data)
        }
    
        async fn snapshot(&self) -> Result<Vec<u8>> {
            Ok(serialize(&self.0.read().unwrap().clone())?)
        }
    
        async fn restore(&mut self, snapshot: Vec<u8>) -> Result<()> {
            let new: HashMap<u64, String> = deserialize(&snapshot[..]).unwrap();
            let mut db = self.0.write().unwrap();
            let _ = std::mem::replace(&mut *db, new);
            Ok(())
        }
    }
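
    For readers more comfortable with Python, the same state machine contract can be sketched without any Raft library. This is only an illustration of the apply / snapshot / restore idea, not raftify's Python API: the names InsertEntry and DictStore are hypothetical, and JSON stands in for bincode.

```python
import json
from dataclasses import dataclass, field


@dataclass
class InsertEntry:
    """Hypothetical Python counterpart of LogEntry::Insert."""

    key: int
    value: str

    def encode(self) -> bytes:
        return json.dumps({"key": self.key, "value": self.value}).encode()

    @classmethod
    def decode(cls, data: bytes) -> "InsertEntry":
        obj = json.loads(data)
        return cls(key=obj["key"], value=obj["value"])


@dataclass
class DictStore:
    """Hypothetical in-memory state machine mirroring HashStore."""

    db: dict[int, str] = field(default_factory=dict)

    def apply(self, data: bytes) -> bytes:
        # Called once for every committed log entry.
        entry = InsertEntry.decode(data)
        self.db[entry.key] = entry.value
        return data

    def snapshot(self) -> bytes:
        # JSON object keys must be strings, so store (key, value) pairs.
        return json.dumps(sorted(self.db.items())).encode()

    def restore(self, snapshot: bytes) -> None:
        # Replace the whole state with the contents of the snapshot.
        self.db = {int(k): v for k, v in json.loads(snapshot)}


store = DictStore()
store.apply(InsertEntry(1, "test").encode())

# A fresh replica catches up from a snapshot instead of replaying the log.
replica = DictStore()
replica.restore(store.snapshot())
```

    The point of the three methods is that a replica can reach the same state either by applying every log entry in order or by restoring a snapshot taken on another node.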
    

    Define the web server API

    Let's define the web server API that will be used in our example. We will use this API to access the Raft object on the node and manipulate the HashStore.

    For our example, we'll use the actix-web crate and define it as shown below.

    The put command can be implemented by calling the propose method on the RaftNode of the Raft object. We can do this by encoding the Insert type LogEntry we defined earlier and passing it as an argument to the RaftNode::propose method.

    The get command can be implemented by returning the value corresponding to the id from the HashMap stored in memory.

    #[get("/put/{id}/{value}")]
    async fn put(data: web::Data<(HashStore, Raft)>, path: web::Path<(u64, String)>) -> impl Responder {
        let log_entry = LogEntry::Insert {
            key: path.0,
            value: path.1.clone(),
        };
        data.1.raft_node.propose(log_entry.encode().unwrap()).await;
    
        "OK".to_string()
    }
    
    #[get("/get/{id}")]
    async fn get(data: web::Data<(HashStore, Raft)>, path: web::Path<u64>) -> impl Responder {
        let id = path.into_inner();
    
        let response = data.0.get(id);
        format!("{:?}", response)
    }
    
    let web_server = tokio::spawn(
        HttpServer::new(move || {
            App::new()
                .app_data(web::Data::new((store.clone(), raft.clone())))
                .service(put)
                .service(get)
        })
        .bind(addr)
        .unwrap()
        .run(),
    );
    

    Bootstrap a Raft cluster

    Next, let's bootstrap a cluster of RaftNodes.

    If the --peer-addr argument is given, the node sends a join request to the cluster through peer-addr to obtain a new node_id. If the argument is not given, the node bootstraps a new cluster.

    Leader

    In this example, we'll fix the node_id of the leader node to 1 for intuitive understanding. This means that when we call Raft::bootstrap_cluster, we can create the leader node's Raft object by passing 1 as the node_id. After that, calling the Raft::run method starts the RaftNode.

    Followers

    As shown below, a follower first calls Raft::request_id to receive a ClusterJoinTicket that can be used to join the cluster, then calls Raft::new_follower with the reserved node_id to create a follower Raft object, calls the Raft::run method to start the RaftNode, and finally joins the cluster via the Raft::join method.

    let (raft, raft_handle) = match peer_addr {
        Some(peer_addr) => {
            log::info!("Running in Follower mode");
    
            let ticket = Raft::request_id(raft_addr, peer_addr.clone(), logger.clone()).await.unwrap();
            let node_id = ticket.reserved_id;
    
            let raft = Raft::new_follower(
                node_id,
                raft_addr,
                store.clone(),
                cfg,
                None,
                logger.clone(),
            )?;
    
            let handle = tokio::spawn(raft.clone().run());
            raft.join(ticket).await;
            (raft, handle)
        }
        None => {
            log::info!("Bootstrap a Raft Cluster");
            let node_id = 1;
            let raft = Raft::bootstrap_cluster(
                node_id,
                raft_addr,
                store.clone(),
                cfg,
                None,
                logger.clone(),
            )?;
            let handle = tokio::spawn(raft.clone().run());
            (raft, handle)
        }
    };
    
    let _ = tokio::try_join!(raft_handle)?;
    

    You can now bootstrap a Raft cluster of three nodes in the terminal as shown below.

    $ ./target/debug/memstore --raft-addr=127.0.0.1:60061 --web-server=127.0.0.1:8001
    $ ./target/debug/memstore --raft-addr=127.0.0.1:60062 --peer-addr=127.0.0.1:60061 --web-server=127.0.0.1:8002
    $ ./target/debug/memstore --raft-addr=127.0.0.1:60063 --peer-addr=127.0.0.1:60061 --web-server=127.0.0.1:8003
    

    Test

    We can now try out the key-value store we defined through the actix-web server API via the curl command.

    ❯ curl http://localhost:8001/put/1/test
    OK
    
    ❯ curl http://localhost:8001/get/1
    Some("test")
    

    If you're interested in learning more, you can check out the raftify repository for instructions on how to use the CLI module to help with debugging, example code for RaftServiceClient, and more.

    Summary

    raftify is an experimental framework that aims to make Raft modules, which are otherwise hard for ordinary developers to approach, easy for anyone to integrate.

    It was developed to introduce a leader-follower structure to Backend.AI manager processes, but as I've shown in this post, it could be used in a variety of places where you need an HA structure, such as creating your own simple distributed key-value store with short source code.

    If you're intrigued by the inner workings of the tikv/raft-rs implementation, stay tuned for my next post where I'll be analyzing what happens inside the source code line by line in a few scenarios.

    This post is automatically translated from Korean

    26 January 2024

  • Raft Consensus algorithm for Backend.AI: Leader election

    By Jeongseok Kang

    High availability (HA) has become an indispensable concept when talking about modern applications. High availability is the ability of an IT system to remain nearly 100% accessible and reliable at all times by eliminating or minimizing downtime^1. Backend.AI, which is developed and serviced by Lablup, also employs various methods to maintain high availability.

    Backend.AI architecture

    Background

    Backend.AI consists of many different components, including managers and agents, storage proxies, and web servers. Each of these components runs as multiple processes in a distributed environment to increase reliability, especially the manager, which is responsible for scheduling session execution and many core functions of Backend.AI. Currently, the manager has an Active-Active HA structure that ensures high availability through load balancing.

    One of the many features of the Backend.AI Manager is event handling. Backend.AI raises various events, such as AgentStartedEvent and DoScheduleEvent, to track the lifecycle of agents and sessions and provide optimal scheduling. For example, when a Backend.AI Agent process starts, it generates an AgentStartedEvent, and the Backend.AI Manager process receives this event and performs a specific action (schedule()). Backend.AI Manager also raises a DoScheduleEvent internally to ensure periodic scheduling. This is where the problem arises. If you are running multiple Backend.AI Manager processes for high availability, having each process raise events with its own timer adds unnecessary load and can make the health of the entire system unreliable. To prevent this, the Backend.AI Manager implements a GlobalTimer, which uses distributed locks to guarantee mutual exclusion between processes so that only one manager process generates events within the same system.

    @preserve_termination_log
    async def generate_tick(self) -> None:
        try:
            await asyncio.sleep(self.initial_delay)
            if self._stopped:
                return
            while True:
                try:
                    async with self._dist_lock:
                        if self._stopped:
                            return
                        await self._event_producer.produce_event(self._event_factory())
                        if self._stopped:
                            return
                        await asyncio.sleep(self.interval)
                except asyncio.TimeoutError:  # timeout raised from etcd lock
                    if self._stopped:
                        return
                    log.warn("timeout raised while trying to acquire lock. retrying...")
        except asyncio.CancelledError:
            pass
    

    Currently, Backend.AI provides an interface for distributed locks, AbstractDistributedLock (https://github.com/lablup/backend.ai/blob/2f90d03c4477eda8e0beeabb7fe4b067c56dae09/src/ai/backend/common/lock.py#L33-L44), and we have developed and are using the following actual implementations: FileLock (https://github.com/lablup/backend.ai/blob/2f90d03c4477eda8e0beeabb7fe4b067c56dae09/src/ai/backend/common/lock.py#L47-L142), EtcdLock (https://github.com/lablup/backend.ai/blob/2f90d03c4477eda8e0beeabb7fe4b067c56dae09/src/ai/backend/common/lock.py#L145-L190) based on the etcd concurrency API (https://etcd.io/docs/v3.5/dev-guide/api_concurrency_reference_v3/), and RedisLock (https://github.com/lablup/backend.ai/blob/2f90d03c4477eda8e0beeabb7fe4b067c56dae09/src/ai/backend/common/lock.py#L193-L248) based on Redis Lock (https://redis.io/docs/manual/patterns/distributed-locks/).

    etcd is a distributed, open-source key-value store used to store and manage critical information needed to keep distributed systems running^2, most notably in Kubernetes.

    class AbstractDistributedLock(metaclass=abc.ABCMeta):
        def __init__(self, *, lifetime: Optional[float] = None) -> None:
            assert lifetime is None or lifetime >= 0.0
            self._lifetime = lifetime
    
        @abc.abstractmethod
        async def __aenter__(self) -> Any:
            raise NotImplementedError
    
        @abc.abstractmethod
        async def __aexit__(self, *exc_info) -> Optional[bool]:
            raise NotImplementedError
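
    To make the role of the lock concrete, here is a minimal sketch in which two timers contend for the same lock. It is not Backend.AI code: InProcessLock is a hypothetical stand-in that only works inside one process (it wraps asyncio.Lock), and ticker is a simplified, hypothetical version of generate_tick. A real deployment would need one of the distributed implementations listed above.

```python
import asyncio
from typing import Any, Optional


class InProcessLock:
    """Hypothetical stand-in for a distributed lock (single process only)."""

    def __init__(self, *, lifetime: Optional[float] = None) -> None:
        assert lifetime is None or lifetime >= 0.0
        self._lifetime = lifetime
        self._lock = asyncio.Lock()

    async def __aenter__(self) -> Any:
        await self._lock.acquire()
        return self

    async def __aexit__(self, *exc_info) -> Optional[bool]:
        self._lock.release()
        return None


async def ticker(name: str, lock: InProcessLock, events: list[str], ticks: int) -> None:
    # Simplified tick loop: only the lock holder may produce an event.
    for _ in range(ticks):
        async with lock:
            events.append(f"{name}:start")
            await asyncio.sleep(0.01)  # stands in for producing an event
            events.append(f"{name}:end")


async def main() -> list[str]:
    lock = InProcessLock()
    events: list[str] = []
    await asyncio.gather(
        ticker("manager-1", lock, events, 2),
        ticker("manager-2", lock, events, 2),
    )
    return events


events = asyncio.run(main())
```

    Because every tick happens inside the lock, the recorded start and end markers never interleave between the two managers, which is the mutual exclusion the GlobalTimer relies on.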
    

    Requirements

    The GlobalTimer does a good job of controlling event generation on a per-process basis in a distributed environment. However, requirements are always changing and the software needs to change with them. This time, the added requirement was to implement a rate limit for requests. With the current load balancing scheme, we can't guarantee that every request is handled by the same manager, which can lead to the following problems because the state of each manager is not shared.

    1. Set the counters for both managers to 0 and the request count limit to 1.
    2. The first request is received by manager 1.
    3. Increase the counter on manager 1 by 1. (C1: 0 -> 1)
    4. The counter reaches the maximum allowed number of requests, so the next request should be rejected.
    5. Manager 2 receives the second request due to load balancing.
    6. The counter on manager 2 has not reached the maximum allowed number of times because it is still 0. (C2: 0)
    7. Manager 2 processes the request.
    8. The request count limit didn't work!
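
    The scenario above can be reproduced in a few lines. The sketch below is a toy model, not Backend.AI code: Manager and accepted_requests are hypothetical names, the load balancer is assumed to be round-robin, and a plain dictionary stands in for whatever shared state a real system would use.

```python
from itertools import cycle


class Manager:
    """Hypothetical manager that enforces a request limit with a counter."""

    def __init__(self, limit: int, counter: dict[str, int]) -> None:
        self.limit = limit
        self.counter = counter  # private to this manager, or shared

    def handle(self) -> bool:
        if self.counter["count"] >= self.limit:
            return False  # rejected
        self.counter["count"] += 1
        return True  # accepted


def accepted_requests(managers: list[Manager], requests: int) -> int:
    # Round-robin load balancing across the managers.
    balancer = cycle(managers)
    return sum(next(balancer).handle() for _ in range(requests))


# Each manager keeps its own counter: the limit of 1 is exceeded.
local = [Manager(1, {"count": 0}), Manager(1, {"count": 0})]
local_accepted = accepted_requests(local, 2)

# Both managers consult one shared counter: the limit holds.
shared_counter = {"count": 0}
shared = [Manager(1, shared_counter), Manager(1, shared_counter)]
shared_accepted = accepted_requests(shared, 2)

print(local_accepted, shared_accepted)  # prints "2 1"
```

    With private counters both requests are accepted even though the limit is 1. Once the counter is shared, the second request is rejected as intended.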
    

    Therefore, the following issue has been proposed to discuss ways to improve these limitations.

    Issue suggesting improvements to distributed timers (lablup/backend.ai#415)

    To delegate global state management to a single manager process acting as the leader, we investigated consensus algorithms and decided to use the Raft consensus algorithm (hereafter Raft). Raft is used in projects such as etcd, which serves as the backing store of Kubernetes (https://kubernetes.io/docs/concepts/overview/components/#etcd), so we believe it has been well validated.

    Raft consensus algorithm

    The Raft algorithm was proposed in "In Search of an Understandable Consensus Algorithm"^3 submitted to USENIX in 2014. It was created to improve upon Paxos^4, the leading algorithm at the time, which was difficult to understand and implement in practice due to its complex consensus process, hence the title.

    But our most important goal — and most difficult challenge — was understandability.

    • In Search of an Understandable Consensus Algorithm

    A Raft cluster typically consists of five nodes, because a maximum of two nodes can fail and still satisfy a quorum to maintain the system. Each node in a cluster has one of three states: leader, follower, or candidate. In general, there can be at most one leader in each cluster, with the rest of the nodes being followers.

    Glossary #1

    • quorum: The minimum number of members required to make a decision. (⌊N/2⌋+1 for a cluster of N nodes)
    State transition diagram of a Raft node (Source: In Search of an Understandable Consensus Algorithm)
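
    The quorum arithmetic is easy to check by hand. The two helper functions below are hypothetical names introduced only for this illustration.

```python
def quorum(cluster_size: int) -> int:
    """Smallest number of nodes that forms a majority."""
    return cluster_size // 2 + 1


def tolerated_failures(cluster_size: int) -> int:
    """How many nodes may fail while a quorum can still be formed."""
    return cluster_size - quorum(cluster_size)


for n in (3, 4, 5):
    print(n, quorum(n), tolerated_failures(n))
# prints:
# 3 2 1
# 4 3 1
# 5 3 2
```

    Note that a four-node cluster tolerates no more failures than a three-node one, which is one reason odd cluster sizes such as five are common.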

    The Raft algorithm delegates all power to an elected leader and makes the flow of logs unidirectional, making it easier to understand the overall picture.

    Glossary #2

    • term: The generation of the current leader or candidate. Incremented by 1 each time a leader election begins.
    • index: Refers to the location of a specific value in the log.
    • commit: Indicates that a specific value in the log is safely replicated and may be applied to the state machine.
    • commitIndex: The highest index known to be committed.

    The Raft algorithm has the following characteristics:

    • Election Safety: Each term has at most one leader.
    • Leader Append-Only: Leaders cannot overwrite or delete logs; they can only add new ones.
    • Log Matching: If two logs have values with the same index and term, all values up to that index are the same.
    • Leader Completeness: If a value is committed to the log in a particular term, the leaders of all subsequent terms are guaranteed to have that value.
    • State Machine Safety: If one server applies a log value from a particular index to its state machine, another server cannot apply a different value from the same index.
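
    As an illustration of the Log Matching property, the following sketch checks it for two in-memory logs. Entry and satisfies_log_matching are hypothetical names, and list positions stand in for Raft's log indexes.

```python
from typing import NamedTuple


class Entry(NamedTuple):
    term: int
    value: str


def satisfies_log_matching(a: list[Entry], b: list[Entry]) -> bool:
    """If a[i] and b[i] share a term, the logs must agree up to and including i."""
    # Checking the highest position with equal terms is enough: if the
    # prefixes agree there, they agree at every earlier position too.
    for i in range(min(len(a), len(b)) - 1, -1, -1):
        if a[i].term == b[i].term:
            return a[: i + 1] == b[: i + 1]
    return True


leader_log = [Entry(1, "x"), Entry(1, "y"), Entry(2, "z")]
follower_log = [Entry(1, "x"), Entry(1, "y")]
corrupt_log = [Entry(1, "x"), Entry(1, "q")]

ok = satisfies_log_matching(leader_log, follower_log)
broken = satisfies_log_matching(leader_log, corrupt_log)
```

    Here ok is True because the follower simply lags behind, while broken is False because two logs hold different values at the same index and term, which Raft never allows.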

    Using the above features, Raft divides the entire consensus process into three independent parts.

    • Leader election: If the existing leader is not working, a new leader must be elected.
    • Log replication: The leader replicates the request logs it receives from clients to other nodes. The other nodes unconditionally accept the leader's logs.
    • Safety: When one server applies a log value from a particular index to its state machine, another server cannot apply a different value from the same index.

    In this article, we'll discuss the different states a Raft node can be in, and implement the leader election process in code.

    Follower

    Followers do not send requests themselves, but only receive and respond to requests from the leader or candidates. The behavior spec for a follower proposed in the paper and the code written based on it are shown below.

    • Handle RPC requests from leaders and candidates.
    async def on_append_entries(
        self,
        *,
        term: int,
        leader_id: RaftId,
        prev_log_index: int,
        prev_log_term: int,
        entries: Iterable[raft_pb2.Log],
        leader_commit: int,
    ) -> Tuple[int, bool]:
        await self._reset_timeout()
        if term < (current_term := self.current_term):
            return (current_term, False)
        await self._synchronize_term(term)
        return (self.current_term, True)
    
    async def on_request_vote(
        self,
        *,
        term: int,
        candidate_id: RaftId,
        last_log_index: int,
        last_log_term: int,
    ) -> Tuple[int, bool]:
        await self._reset_timeout()
        async with self._vote_request_lock:
            if term < (current_term := self.current_term):
                return (current_term, False)
            await self._synchronize_term(term)
    
            async with self._vote_lock:
                if self.voted_for in [None, candidate_id]:
                    self._voted_for = candidate_id
                    return (self.current_term, True)
            return (self.current_term, False)
    
    async def _synchronize_term(self, term: int) -> None:
        if term > self.current_term:
            self._current_term.set(term)
            await self._change_state(RaftState.FOLLOWER)
            async with self._vote_lock:
                self._voted_for = None
    
    • If you don't receive any requests from leaders or candidates for a period of time, become a candidate.
    async def _wait_for_election_timeout(self, interval: float = 1.0 / 30) -> None:
        while self._elapsed_time < self._election_timeout:
            await asyncio.sleep(interval)
            self._elapsed_time += interval
        await self._change_state(RaftState.CANDIDATE)
    

    Leaders must periodically announce their presence by sending heartbeat messages to their followers. If a follower does not receive any messages for a certain amount of time (election_timeout), it assumes that the cluster is leaderless and starts an election by becoming a candidate to become the new leader.
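
    The Raft paper recommends choosing each election timeout at random from a fixed interval (it gives 150 to 300 ms as an example) so that followers rarely time out at the same moment and split the vote. The sketch below shows that idea together with a timer that is reset whenever a heartbeat arrives. ElectionTimer and random_election_timeout are hypothetical names and are not taken from aioraft-ng.

```python
import random
from typing import Optional


def random_election_timeout(
    low: float = 0.15, high: float = 0.30, rng: Optional[random.Random] = None
) -> float:
    """Pick a timeout in seconds from [low, high]."""
    return (rng or random).uniform(low, high)


class ElectionTimer:
    """Hypothetical follower-side timer driven by explicit ticks."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout
        self.elapsed = 0.0

    def reset(self) -> None:
        # Called whenever a message from the leader or a candidate arrives.
        self.elapsed = 0.0

    def tick(self, dt: float) -> bool:
        # Returns True once the follower should become a candidate.
        self.elapsed += dt
        return self.elapsed >= self.timeout


timer = ElectionTimer(timeout=0.2)
first = timer.tick(0.1)      # no timeout yet
timer.reset()                # heartbeat received
second = timer.tick(0.15)    # still within the timeout after the reset
third = timer.tick(0.1)      # 0.25 s without a heartbeat: time out
```

    In this run first and second are False and third is True: the heartbeat postpones the election, and only a full timeout without any message triggers it.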

    Candidate

    The candidate's behavior spec and implementation code are as follows:

    • Become a follower when you receive the AppendEntries RPC request from the new leader (see on_append_entries() for followers).
    • Start the election with the following procedure:
      • Increase term by 1. (term += 1)
      • Vote for yourself.
      • Initialize the election timeout.
      • Send a RequestVote RPC request to the other nodes.
    async def _start_election(self) -> None:
        self._current_term.increase()
        async with self._vote_lock:
            self._voted_for = self.id
    
        current_term = self.current_term
    
        terms, grants = zip(
            *await asyncio.gather(
                *[
                    asyncio.create_task(
                        self._client.request_vote(
                            to=server,
                            term=current_term,
                            candidate_id=self.id,
                            last_log_index=0,
                            last_log_term=0,
                        ),
                    )
                    for server in self._configuration
                ]
            )
        )
    
    • If you receive votes from a majority of nodes, you are the leader.
        for term in terms:
            if term > current_term:
                await self._synchronize_term(term)
                break
        else:
            if sum(grants) + 1 >= self.quorum:
                await self._change_state(RaftState.LEADER)
    
    • If the election timeout occurs, start a new election.
    case RaftState.CANDIDATE:
        while self.__state is RaftState.CANDIDATE:
            await self._start_election()
            await self._reset_election_timeout()
            await self._initialize_volatile_state()
            if self.has_leadership():
                await self._initialize_leader_volatile_state()
                break
            await asyncio.sleep(self.__election_timeout)
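
    To see the follower and candidate rules working together, here is a self-contained sketch that runs elections among in-memory nodes. It is a simplification under stated assumptions: Node and run_election are hypothetical names, RPCs are replaced by direct method calls, and the log comparison is omitted, just as the code above sends last_log_index=0.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    id: int
    current_term: int = 0
    voted_for: Optional[int] = None

    def on_request_vote(self, term: int, candidate_id: int) -> tuple[int, bool]:
        if term < self.current_term:
            return (self.current_term, False)  # stale candidate
        if term > self.current_term:
            # A newer term resets our vote.
            self.current_term = term
            self.voted_for = None
        if self.voted_for in (None, candidate_id):
            self.voted_for = candidate_id
            return (self.current_term, True)
        return (self.current_term, False)


def run_election(candidate: Node, peers: list[Node]) -> bool:
    """Return True if the candidate wins the election."""
    candidate.current_term += 1
    candidate.voted_for = candidate.id
    term = candidate.current_term
    replies = [peer.on_request_vote(term, candidate.id) for peer in peers]
    newest = max((t for t, _ in replies), default=term)
    if newest > term:
        # Someone is ahead of us: adopt the term and give up.
        candidate.current_term = newest
        candidate.voted_for = None
        return False
    quorum = (len(peers) + 1) // 2 + 1
    votes = 1 + sum(granted for _, granted in replies)  # +1 for our own vote
    return votes >= quorum


nodes = [Node(i) for i in range(1, 6)]
won = run_election(nodes[0], nodes[1:])

# A node that missed several terms cannot win against up-to-date peers.
stale = Node(9, current_term=0)
lost = not run_election(stale, [Node(i, current_term=5) for i in range(1, 5)])
```

    In the first election all four peers grant their vote, so node 1 wins with five votes out of five. The stale node is rejected by every peer and adopts term 5 instead of becoming leader.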
    

    Leader

    • Send the first heartbeat message (an empty AppendEntries request) immediately after the election. Send heartbeat messages periodically thereafter.
    async def _publish_heartbeat(self) -> None:
        if not self.has_leadership():
            return
        terms, successes = zip(
            *await asyncio.gather(
                *[
                    asyncio.create_task(
                        self._client.append_entries(
                            to=server,
                            term=self.current_term,
                            leader_id=self.id,
                            prev_log_index=0,
                            prev_log_term=0,
                            entries=(),
                            leader_commit=self._commit_index,
                        ),
                    )
                    for server in self._configuration
                ]
            )
        )
        for term in terms:
            if term > self.current_term:
                await self._synchronize_term(term)
                break
    
    • When it receives a request from a client, it adds a value to the log. After applying that value to the state machine, it sends a response to the request.
    • If the leader's last log index is greater than or equal to the value it is tracking for a follower (nextIndex), replicate the log to that follower starting at nextIndex.
      • If successful, update the leader's nextIndex and matchIndex for that follower.
      • If it fails due to an inconsistency, decrement the leader's nextIndex and try again.
    • If a value N that satisfies all of the conditions below exists, update the commitIndex to N.
      • N is greater than the current commitIndex (N > commitIndex)
      • The majority of matchIndexes are greater than or equal to N (matchIndex >= N)
      • The term of the Nth log is the same as the current term

    The leader manages a nextIndex and a matchIndex for each follower.

    • nextIndex: The next index that should be sent to each follower.
    • matchIndex: the highest index that was successfully replicated to each follower
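
    The commitIndex rule can be sketched as a small pure function. This is an illustration rather than code from aioraft-ng: advance_commit_index is a hypothetical name, log indexes are 1-based as in the paper, and the leader is assumed to count itself as one replica.

```python
def advance_commit_index(
    commit_index: int,
    match_index: dict[int, int],
    log_terms: list[int],
    current_term: int,
) -> int:
    """Return the new commitIndex for the leader.

    match_index maps follower id -> highest replicated index.
    log_terms[i - 1] is the term of the leader's log entry at index i.
    """
    cluster_size = len(match_index) + 1  # followers plus the leader
    quorum = cluster_size // 2 + 1
    # Try the largest candidate N first and stop at the first one that fits.
    for n in range(len(log_terms), commit_index, -1):
        replicas = 1 + sum(1 for m in match_index.values() if m >= n)
        if replicas >= quorum and log_terms[n - 1] == current_term:
            return n
    return commit_index


# Five-node cluster: index 3 is on the leader and two followers.
new_commit = advance_commit_index(
    commit_index=1,
    match_index={2: 4, 3: 3, 4: 1, 5: 0},
    log_terms=[1, 1, 2, 2],
    current_term=2,
)

# Entries from an older term are never committed by counting replicas.
unchanged = advance_commit_index(
    commit_index=0,
    match_index={2: 2, 3: 2},
    log_terms=[1, 1],
    current_term=2,
)
```

    In the first call new_commit is 3: index 4 is held by only two nodes, while index 3 is held by three of five and belongs to the current term. In the second call unchanged stays 0 because both entries come from term 1.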

    Conclusion

    In this article, we've briefly covered the Raft algorithm and written code to perform a leader election. The remaining two features (log replication, membership changes) will face a variety of challenges in actual implementation, including timing issues. If you're interested in learning more about the Raft algorithm, we recommend reading the author's (Diego Ongaro) PhD thesis (CONSENSUS: BRIDGING THEORY AND PRACTICE)^6.

    Finally, let's end by checking out how ChatGPT describes the Raft algorithm. Raft algorithm explained by ChatGPT (Source: OpenAI ChatGPT 3.5)

    This article is based on the code in lablup/aioraft-ng. Please also pay attention to lablup/raftify, the next generation Raft project currently under development at Lablup.

    29 November 2023

  • Backend.AI Model Service Hands-on: Running GPT-NeoX

    By Kyujin Cho

    Backend.AI version 23.09 has been officially released to the public. We covered Model Service, a key feature in version 23.09, in our previous Sneak Peek: Backend.AI Model Service preview article. Since then, we have added a variety of new features, including GUI support, authentication token history management, and more, and we are going to walk you through them in a tutorial format to make it easy to understand the Backend.AI Model Service. In this tutorial post, we will show you how to use the Backend.AI Model Service to run GPT-NeoX models on top of Triton Inference Server. Triton Inference Server is an open source model inference framework from NVIDIA that enables easy HTTP and gRPC^1 delivery of its TensorRT, FasterTransformer, and TensorRT-LLM models, as well as PyTorch, TensorFlow, vLLM, and many others.

    Create a Model VFolder

    1. Navigate to the Data & Folders tab. Click the "New Folder" button to open the VFolder creation dialog.
    2. Create a new model folder. It does not matter how you name the folder, but make sure to set the "Usage" at the bottom to "Model". Once you have specified all the values, click the "Create" button at the bottom. Your model VFolder has now been created.

    FasterTransformer Format Model Conversion

    1. Navigate to the "Sessions" tab. Click the "Start" button to open the session creation dialog.
    2. Select ngc-pytorch for "Running Environment" and 23.07 for "Version". Once you have made your selections, click the arrow icon in the lower right corner.
    3. A window appears for selecting the VFolder to mount in the session. To load the model, select the VFolder you just created under the "Model storage folder to mount" section. Once you have made your selections, click the arrow icon in the lower right corner.
    4. A window appears for specifying the amount of resources to be used by the model session. You should allocate at least 16 CPU cores and 128 GB of RAM to ensure smooth model conversion. Once you have made your selections, click the arrow icon in the lower right corner.
    5. After confirming that all settings have been applied correctly, click the "Start" button below to start the session.
    6. Once the session is created, a popup will appear to select an app, as shown below. Click the "Console" app to access the terminal environment.
    7. Run the following shell script to download the GPT-NeoX 20B model and convert it to the FasterTransformer format. Note that where the script mentions <VFolder name>, you must replace it with the name of the model VFolder you created.
    cd /home/work/<VFolder name>
    pip install -U transformers bitsandbytes
    git clone https://github.com/NVIDIA/FasterTransformer
    git clone https://huggingface.co/EleutherAI/gpt-neox-20b
    cd gpt-neox-20b
    git lfs install
    git lfs pull
    

    The GPT-NeoX 20B model requires at least 40GB of VRAM to run. If the physical GPU you are using has less VRAM than this and you need to split the model across multiple GPUs, adjust the number in the -i_g parameter to match the number of GPUs you are using.

    cd /home/work/<VFolder name>
    mkdir -p triton-deploy/gpt-neox-20b-ft
    python ~/<VFolder name>/FasterTransformer/examples/pytorch/gptneox/utils/huggingface_gptneox_convert.py \
      -i /home/work/<VFolder name>/gpt-neox-20b \
      -o /home/work/<VFolder name>/triton-deploy/gpt-neox-20b-ft \
      -i_g 1 \
      -m_n GPT-NeoX-20B
    

    8. If you followed all the steps up to step 7, you should have the following folders under the VFolder.
    work@main1[PRRLCIqu-session]:~/GPT-NeoX-Triton-FT$ ls -al
    total 62
    drwxr-xr-x  5 work work 11776 Oct 12 12:14 .
    drwxr-xr-x  9 work work  4096 Oct 12 12:29 ..
    drwxr-xr-x 14 work work 12800 Oct 12 11:24 FasterTransformer
    drwxr-xr-x  3 work work 16896 Oct 12 10:18 gpt-neox-20b
    drwxr-xr-x  3 work work 11776 Oct 12 11:56 triton-deploy
    

    Now it's time to add the configuration file for Triton Inference Server. Create the file triton-deploy/gpt-neox-20b-ft/config.pbtxt and add the following contents.

    If you set the value of the -i_g parameter to anything other than 1 in step 7, you must modify the value of tensor_para_size in the settings below to match the value of -i_g.

    name: "gpt-neox-20b-ft"
    backend: "fastertransformer"
    default_model_filename: "gpt-neox-20b-ft"
    max_batch_size: 1024
    
    model_transaction_policy {
      decoupled: False
    }
    
    input [
      {
        name: "input_ids"
        data_type: TYPE_UINT32
        dims: [ -1 ]
      },
      {
        name: "start_id"
        data_type: TYPE_UINT32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "end_id"
        data_type: TYPE_UINT32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "input_lengths"
        data_type: TYPE_UINT32
        dims: [ 1 ]
        reshape: { shape: [ ] }
      },
      {
        name: "request_output_len"
        data_type: TYPE_UINT32
        dims: [ -1 ]
      },
      {
        name: "runtime_top_k"
        data_type: TYPE_UINT32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "runtime_top_p"
        data_type: TYPE_FP32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "beam_search_diversity_rate"
        data_type: TYPE_FP32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "temperature"
        data_type: TYPE_FP32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "len_penalty"
        data_type: TYPE_FP32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "repetition_penalty"
        data_type: TYPE_FP32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "random_seed"
        data_type: TYPE_UINT64
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "is_return_log_probs"
        data_type: TYPE_BOOL
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "beam_width"
        data_type: TYPE_UINT32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "bad_words_list"
        data_type: TYPE_INT32
        dims: [ 2, -1 ]
        optional: true
      },
      {
        name: "stop_words_list"
        data_type: TYPE_INT32
        dims: [ 2, -1 ]
        optional: true
      },
      {
        name: "prompt_learning_task_name_ids"
        data_type: TYPE_UINT32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "top_p_decay"
        data_type: TYPE_FP32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "top_p_min"
        data_type: TYPE_FP32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      },
      {
        name: "top_p_reset_ids"
        data_type: TYPE_UINT32
        dims: [ 1 ]
        reshape: { shape: [ ] }
        optional: true
      }
    ]
    output [
      {
        name: "output_ids"
        data_type: TYPE_UINT32
        dims: [ -1, -1 ]
      },
      {
        name: "sequence_length"
        data_type: TYPE_UINT32
        dims: [ -1 ]
      },
      {
        name: "cum_log_probs"
        data_type: TYPE_FP32
        dims: [ -1 ]
      },
      {
        name: "output_log_probs"
        data_type: TYPE_FP32
        dims: [ -1, -1 ]
      }
    ]
    instance_group [
      {
        count: 1
        kind: KIND_CPU
      }
    ]
    parameters {
      key: "tensor_para_size"
      value: {
        string_value: "1"
      }
    }
    parameters {
      key: "pipeline_para_size"
      value: {
        string_value: "1"
      }
    }
    parameters {
      key: "data_type"
      value: {
        string_value: "fp16"
      }
    }
    parameters {
      key: "model_type"
      value: {
        string_value: "GPT-NeoX"
      }
    }
    parameters {
      key: "model_checkpoint_path"
      value: {
        string_value: "/models/triton-deploy/gpt-neox-20b-ft/1-gpu"
      }
    }
    parameters {
      key: "enable_custom_all_reduce"
      value: {
        string_value: "0"
      }
    }
    
    1. Finally, add the Backend.AI Model Service definition file to the root of the VFolder as model-definition.yaml (model-definition.yml is also accepted). Let's take a closer look at the model definition file for running Triton Inference Server.
    models:
    - name: "GPT-NeoX"
      model_path: "/models/triton-deploy"
    ...
    

    This is where you specify the model name and the path to the model.

    The name and path you set here can be accessed by the model server process as the BACKEND_MODEL_NAME and BACKEND_MODEL_PATH environment variables, respectively.
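
    As a minimal sketch of how a model server process might pick these values up, the snippet below reads the two environment variables named above. The helper name model_location and the empty-string fallbacks are assumptions made for illustration, not part of Backend.AI.

```python
import os


def model_location(env=os.environ):
    """Return (name, path) as exposed through the environment.

    `model_location` is a hypothetical helper; only the two variable
    names come from the text above.
    """
    name = env.get("BACKEND_MODEL_NAME", "")
    path = env.get("BACKEND_MODEL_PATH", "")
    return name, path


if __name__ == "__main__":
    name, path = model_location()
    print(f"serving {name!r} from {path!r}")
```

    Passing the environment in as a parameter keeps the helper easy to exercise outside an inference session.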

    ...
      service:
        start_command:
          - tritonserver
          - --model-repository=/models/triton-deploy
          - --disable-auto-complete-config
          - --log-verbose
          - "1"
    ...
    

    This part defines the command line used to start the model server process.
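
    To see which command line the list above corresponds to, the following sketch joins the same arguments with Python's standard shlex module. How Backend.AI itself launches the process is not described here, so treat this only as a way to preview the equivalent shell command; as_shell_line is a hypothetical helper name.

```python
import shlex

# The same argument list as start_command in the model definition file.
start_command = [
    "tritonserver",
    "--model-repository=/models/triton-deploy",
    "--disable-auto-complete-config",
    "--log-verbose",
    "1",
]


def as_shell_line(argv):
    """Join an argument list into a single, safely quoted shell line."""
    return shlex.join(argv)


if __name__ == "__main__":
    print(as_shell_line(start_command))
```

    Writing each argument as its own list item avoids quoting problems, which is why "--log-verbose" and "1" appear as separate entries.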

    ...
        port: 8000
    ...
    

    This is where you specify the port that the model server process exposes for API communication. Unless configured otherwise, Triton Inference Server serves its HTTP API on port 8000, so that is the port to write in the model definition file.

    ...
        health_check:
          path: /v2/health/ready
          max_retries: 3
          max_wait_time: 5
          expected_status_code: 200
    

    This is where you enable and configure the Health Check feature. When it is enabled, Backend.AI continuously sends HTTP GET requests to the path and verifies that the response status code matches expected_status_code (optional, defaults to 200). If the model server does not respond, or responds with a different status code, Backend.AI considers the session unhealthy and excludes it from the service. A session excluded from the service is not terminated automatically; the Model Service administrator must take appropriate action manually, for example by checking the container logs. The Health Check feature can be disabled by omitting the health_check section entirely, in which case Backend.AI does not check the health of the model server and always assumes it is healthy. max_wait_time defines the API response timeout in seconds, and max_retries is the number of times the request is retried before the model server is judged to be unhealthy.
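
    The health check rules described above can be sketched roughly as follows. This is not Backend.AI's implementation: the function names are hypothetical, and the sketch assumes that max_retries counts the total number of attempts and that a wrong status code is retried in the same way as a timeout.

```python
import urllib.error
import urllib.request


def http_probe(url, max_wait_time=5):
    """Send one HTTP GET and return the status code.

    Raises an OSError subclass (timeout, refused connection, ...)
    when the server does not answer within max_wait_time seconds.
    """
    try:
        with urllib.request.urlopen(url, timeout=max_wait_time) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        # The server answered, just not with a 2xx code.
        return err.code


def is_healthy(probe, max_retries=3, expected_status_code=200):
    """Return True if any of max_retries attempts yields the expected code."""
    for _ in range(max_retries):
        try:
            if probe() == expected_status_code:
                return True
        except OSError:
            # No response in time: counts as one failed attempt.
            pass
    return False
```

    With the values from the definition file, a caller could run is_healthy(lambda: http_probe(base_url + "/v2/health/ready", 5), max_retries=3), where base_url is whatever address the model server is reachable at.
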
    The finished model definition file looks like this.

    models:
    - name: "GPT-NeoX"
      model_path: "/models/triton-deploy"
      service:
        start_command:
          - tritonserver
          - --model-repository=/models/triton-deploy
          - --disable-auto-complete-config
          - --log-verbose
          - "1"
        port: 8000
        health_check:
          path: /v2/health/ready
          max_retries: 3
          max_wait_time: 5
    

    More information about model definition files can be found in the Backend.AI WebUI documentation.

    Now you're all set to run the Model Service.

    Create a Model Service

    1. Navigate to the "Model Serving" tab. Click the "Start Service" button to open the Create Model Service window. Let's take a look at each section in a little more detail.
      • Service name: This is where you specify the name of the Model Service. The name of the Model Service can be used as a subdomain of the Model Service Endpoint (coming soon).
      • Resource Group: This is the field to select the resource group where the Inference Session for the Model Service will be created.
      • Open your app to the outside world: When this feature is disabled, all API requests to the model server must be accompanied by an authentication header before they can be made. For more information about Model Service authentication, see the Backend.AI WebUI documentation.
      • Desired number of routes: A field to specify the number of inference sessions the Model Server process runs in. Setting this value to a number greater than 1 creates multiple identical sessions and enables the round-robin load balancer feature, which distributes API requests evenly among these sessions. This value can be modified at any time after Model Service creation.
      • A panel that specifies the amount of resources for the inference session.

    The GPT-NeoX 20B model requires a minimum of 40 GB of vRAM to run. The relationship between fGPU units and vRAM in Backend.AI may apply differently depending on the settings of your Backend.AI. Consult with the administrator of your Backend.AI for more information. If you have set all the values correctly, press the "OK" button to create the Model Service.
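
    The 40 GB figure can be checked with simple arithmetic: 20 billion parameters stored as fp16 (the data_type set in config.pbtxt) take 2 bytes each. The helper below is a back-of-the-envelope sketch that covers the weights only and ignores activations and other runtime overhead; weight_memory_gb is a hypothetical name.

```python
def weight_memory_gb(num_params, bytes_per_param):
    """Memory needed to hold the model weights, in GB (10**9 bytes)."""
    return num_params * bytes_per_param / 1e9


if __name__ == "__main__":
    # GPT-NeoX 20B in fp16: 20e9 parameters * 2 bytes each
    print(weight_memory_gb(20e9, 2))  # 40.0
```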

    1. The Model Service has been created. While the model server process in the inference session is not yet ready, the status remains "PROVISIONING". Click the "INFERENCE" section of the "Sessions" tab and you'll see that an inference session has been created for the Model Service you created in the previous step. Model Service administrators can click the clipboard icon in the "Control" column to view the logs of the model server process in the inference session.
    2. When the model server process is running normally, the status of the route at the bottom and the status at the top will both change to "HEALTHY", and the address for accessing the Model Service will appear under "Service Endpoints". You can now access the Triton Inference Server running in the inference session through that address.
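
    Once the endpoint is available, a request to Triton's HTTP API could be prepared along the following lines. This sketch only builds the URL and JSON body and sends nothing. The endpoint address, the model name, and the token IDs are placeholders, and the leading batch dimension of 1 assumes that batching is enabled in config.pbtxt. Tokenization is assumed to happen on the client side.

```python
import json


def build_infer_request(token_ids, output_len):
    """Build a request body with the three non-optional inputs of the config."""
    n = len(token_ids)
    return {
        "inputs": [
            {"name": "input_ids", "shape": [1, n],
             "datatype": "UINT32", "data": list(token_ids)},
            {"name": "input_lengths", "shape": [1, 1],
             "datatype": "UINT32", "data": [n]},
            {"name": "request_output_len", "shape": [1, 1],
             "datatype": "UINT32", "data": [output_len]},
        ]
    }


def infer_url(endpoint, model_name):
    """URL of the inference route for one model (model_name is a placeholder)."""
    return f"{endpoint.rstrip('/')}/v2/models/{model_name}/infer"


if __name__ == "__main__":
    # Hypothetical endpoint, model name, and token IDs for illustration only.
    print(infer_url("https://service.example.invalid", "my-model"))
    print(json.dumps(build_infer_request([101, 2023, 2003], 16), indent=2))
```

    The optional inputs listed in config.pbtxt, such as runtime_top_k or temperature, would be added to the "inputs" list in the same way.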

    Conclusion

    In this article, you've learned how to start serving LLMs using the Backend.AI Model Service. The Model Service feature is available in Backend.AI's Cloud Beta. Start serving your own models today!

    1: Not supported by Backend.AI Model Service

    This post is automatically translated from Korean

    21 November 2023

  • High sky and plump horses, and Container Dieting

    By Mario (Manseok) Cho

    Introduction

    Most Linux distributions, such as Ubuntu, Red Hat, and CentOS, use glibc as the system's standard C library. When you install a library package, such as OpenSSL, with apt on Ubuntu or rpm (yum) on the Red Hat family, it is dynamically linked with glibc by default.

    GNU is an operating system project that includes a wide range of computer software. GNU is open source, and it is developed and maintained under the Free Software Foundation (FSF). Examples of software created by the GNU project include compilers and development tools such as GCC, G++, and Make. GNU uses glibc as its standard C library, and glibc is distributed under the GNU Lesser General Public License.

    musl is a standard C library for Linux distributed under the MIT license. It is developed by Rich Felker. Whereas glibc is normally linked dynamically, musl aims to be a POSIX-conforming standard C library that is well suited to static linking. It also implements non-standard features of Linux, BSD, and glibc.

    Differences between glibc and musl in the Linux environment

    When you install a package on Linux, it uses glibc by default. If you've ever built a C/C++ program using gcc, you've most likely done a glibc-based dynamic link build. However, in addition to this common glibc dynamic build, you can also do a musl-based dynamic/static build.

    There are the following differences between *-linux-gnu and *-linux-musl.

    | Build targets | Standard C libraries | Linking method |
    |----------------|-------------------|----------------|
    | *-linux-gnu | glibc | dynamic linking |
    | *-linux-musl | musl | dynamic/static linking |

    Consider the case of building an executable with Rust. When you install Rust on a Linux environment using rustup, *-linux-gnu is selected as the default target.

    If you don't specify any other options, Rust builds the binary for the *-linux-gnu target and dynamically links it with glibc. To run a binary built this way, glibc must be installed in the Linux environment. If the binary is dynamically linked against external libraries such as OpenSSL, you also need to install those libraries via a package manager such as apt. If you want to distribute such dynamically linked binaries to ordinary users, you can bundle them into a package such as a DEB or RPM that declares the dependencies on external libraries; the package manager will then automatically find and install the appropriate libraries. However, if you use a library that isn't registered with the package manager, or if there are subtle incompatibilities between the installed version of a library and the version you developed against, the binary may not run as intended.

    If you specify the *-linux-musl target, Rust will statically link with musl when building the binary. If you rely on external libraries like OpenSSL, it will also statically link those as well, embedding them all into the binary. This means that you end up with all of these libraries inside a single binary file in Rust. This static binary can run on any Linux environment, as long as it matches the CPU architecture and the set of system calls provided by the Linux kernel. This makes it easier to distribute binaries because you only need to pass a single binary to run it, rather than using a package like a DEB or RPM.

    If this makes deploying binaries so easy, why isn't the *-linux-musl target the default for Linux environments?

    The reason is that using musl makes build preparation somewhat more complicated. If a binary built for *-linux-musl relies on external libraries, those libraries must also be statically linked against musl instead of dynamically linked against glibc. This means that not only the program itself but also every library it depends on must be built from source as a static library with a compiler that targets musl.

    Fortunately, you don't have to build everything from scratch if it's a commonly used external library in Rust. By utilizing a Docker image that bundles frequently used libraries with the Rust compiler/gcc, you can easily create a musl-based static build. (In the command examples that follow, I'll arbitrarily use the <distro># prompt to distinguish the container environment for each Linux distribution).

    $ docker run -it --name ubuntu ubuntu:22.04 bash
    ubuntu# apt update && apt install -y curl gcc vim
    

    Let's set up both a dynamically linked glibc build and a statically linked musl build in a Rust environment. First, install Rust in the Ubuntu container.

    ubuntu# curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
    ubuntu# source $HOME/.cargo/env
    

    Let's compare dynamic and static linking using Rust's default example, "Hello World" output.

    First, let's build "Hello World" using glibc.

    ubuntu# cd
    ubuntu# cargo new --bin hello && cd $_
         Created binary (application) `hello` package
    ubuntu# cargo build --release
       Compiling hello v0.1.0 (/root/hello)
        Finished release [optimized] target(s) in 0.35s
    

    Let's use the ldd command to verify that the binary built in the glibc environment is dynamically linked. We can see that linux-vdso, libgcc_s, libc, and so on are linked dynamically.

    ubuntu# ldd target/release/hello
            linux-vdso.so.1 (0x00007fffe87df000)
            libgcc_s.so.1 => /lib/x86_64-linux-gnu/libgcc_s.so.1 (0x00007fdce9c3f000)
            libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fdce9a17000)
            /lib64/ld-linux-x86-64.so.2 (0x00007fdce9cc2000)
    

    Now let's add the musl target to the Rust toolchain so that we can build with static linking.

    ubuntu# rustup target add x86_64-unknown-linux-musl
    info: downloading component 'rust-std' for 'x86_64-unknown-linux-musl'
    info: installing component 'rust-std' for 'x86_64-unknown-linux-musl'
     34.7 MiB / 34.7 MiB (100%) 8.6 MiB/s in 4s ETA: 0s
    
    ubuntu# rustup show
    Default host: x86_64-unknown-linux-gnu
    rustup home: /root/.rustup
    
    installed targets for active toolchain
    --------------------------------------
    
    x86_64-unknown-linux-gnu
    x86_64-unknown-linux-musl
    
    active toolchain
    ----------------
    
    stable-x86_64-unknown-linux-gnu (default)
    rustc 1.72.0 (5680fa18f 2023-08-23)
    
    ubuntu# 
    

    Let's build "Hello World" to verify that static links are configured correctly.

    ubuntu# cargo build --release --target=x86_64-unknown-linux-musl
       Compiling hello v0.1.0 (/root/hello)
        Finished release [optimized] target(s) in 0.37s
    
    ubuntu# ldd target/x86_64-unknown-linux-musl/release/hello
    statically linked
    

    You can see that "Hello World" is statically linked when built for the musl target.
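
    If you want to run this check from a script rather than by eye, the ldd output can be classified with a few lines of Python. This is a minimal sketch: linked_libraries is a hypothetical helper, and it only recognizes the "statically linked" message shown above plus the "not a dynamic executable" message that some ldd versions print instead.

```python
def linked_libraries(ldd_output):
    """Return the shared libraries listed by ldd, or [] for a static binary."""
    text = ldd_output.strip()
    if "statically linked" in text or "not a dynamic executable" in text:
        return []
    libs = []
    for line in text.splitlines():
        parts = line.split()
        if parts:
            # The first field is the library name (or the loader path).
            libs.append(parts[0])
    return libs


if __name__ == "__main__":
    glibc_build = """
        linux-vdso.so.1 (0x00007fffe87df000)
        libgcc_s.so.1 => /lib/x86_64-linux-gnu/libgcc_s.so.1 (0x00007fdce9c3f000)
        libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fdce9a17000)
        /lib64/ld-linux-x86-64.so.2 (0x00007fdce9cc2000)
    """
    print(linked_libraries(glibc_build))
    print(linked_libraries("statically linked"))
```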

    Now let's copy the statically linked "Hello World" binary into CentOS and Alpine environments and run it there. CentOS 8 is a glibc-based distribution, while Alpine Linux is a musl-based one.

    CentOS Container Environment

    $ docker run -it --name centos centos:centos8 bash
    centos#
    

    Alpine Container Environment

    The Alpine distribution uses musl by default rather than glibc.

    $ docker run -it --rm --name alpine alpine:3.18
    alpine#
    

    Let's copy 'Hello World' into a glibc environment and a musl environment to see the behavior.

    $ docker cp ubuntu:/root/hello/target/x86_64-unknown-linux-musl/release/hello .
    $ docker cp hello centos:/root/
    $ docker cp hello alpine:/root/
    

    Let's check the behavior on CentOS.

    centos# /root/hello
    Hello, world!
    

    Let's check the behavior on Alpine.

    alpine# /root/hello
    Hello, world!
    

    Comparing glibc and musl using the Rust application 'slice'

    Let's take the Rust application 'slice' and compare the container images created with glibc and musl.

    'slice' is a tool written in Rust and modeled on Python's slice; it is publicly available in the GitHub repository https://github.com/ChanTsune/slice. 'slice' prints part of a file's contents from the front or back, like 'head' or 'tail'. For example, the command below will print lines 10 through 20 from 'file.txt'.

    $ slice 10:20 file.txt
    

    When you build 'slice' in a Rust environment and package it in a container, you can use it like this:

    $ docker run -i --rm -v `pwd`:`pwd` -w `pwd` slice
    

    Let's build a container using glibc in the Ubuntu 22.04 environment.

    FROM rust:latest as builder
    
    WORKDIR /work
    RUN git clone https://github.com/ChanTsune/slice /work/.
    RUN cargo build --release
    RUN strip /work/target/release/slice -o /slice
    
    FROM ubuntu:22.04
    COPY --from=builder /slice /usr/local/bin/
    
    ENTRYPOINT ["slice"]
    

    This time, we'll create a container image based on Ubuntu 22.04 using musl static links.

    FROM rust:latest as builder
    
    RUN rustup target add "$(uname -m)"-unknown-linux-musl
    WORKDIR /work
    RUN git clone https://github.com/ChanTsune/slice /work/.
    RUN cargo build --release --target "$(uname -m)"-unknown-linux-musl
    RUN strip /work/target/"$(uname -m)"-unknown-linux-musl/release/slice -o /slice
    
    FROM ubuntu:22.04
    COPY --from=builder /slice /usr/local/bin/
    
    ENTRYPOINT ["slice"]
    

    Let's create a container image based on the Alpine distribution using a musl static link.

    FROM rust:latest as builder
    
    RUN rustup target add "$(uname -m)"-unknown-linux-musl
    WORKDIR /work
    RUN git clone https://github.com/ChanTsune/slice /work/.
    RUN cargo build --release --target "$(uname -m)"-unknown-linux-musl
    RUN strip /work/target/"$(uname -m)"-unknown-linux-musl/release/slice -o /slice
    
    FROM alpine
    COPY --from=builder /slice /usr/local/bin/
    
    ENTRYPOINT ["slice"]
    

    If we compare the size of a glibc container image and a musl container image on Ubuntu 22.04 and a musl container image on Alpine, we can see that the container image with musl is smaller.

    $ docker images 
    REPOSITORY TAG               IMAGE ID       CREATED              SIZE
    slice      distroless-musl   d38a74f8568a   11 seconds ago        3.52MB
    slice      alpine-musl       e3abb5f0aace   39 seconds ago        8.4MB
    slice      ubuntu22.04-musl  467edd130e79   About a minute ago   78.9MB
    slice      ubuntu22.04-glibc 09fe5ad40d56   3 minutes ago        78.8MB
    

    In the Ubuntu environment, using glibc or musl doesn't make much difference in the size of the container image, but with the Alpine distribution, you can see that the container image size is reduced to about a tenth. This shows that by utilizing Alpine Linux with static builds, we can make our container images lightweight and reduce deployment time.
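
    The "about a tenth" figure follows directly from the sizes in the listing above. A small sketch of the arithmetic, using the Ubuntu glibc image as the baseline (the choice of baseline and the helper name relative_size are assumptions made here):

```python
# Image sizes in MB, copied from the `docker images` output above.
sizes_mb = {
    "distroless-musl": 3.52,
    "alpine-musl": 8.4,
    "ubuntu22.04-musl": 78.9,
    "ubuntu22.04-glibc": 78.8,
}


def relative_size(tag, baseline="ubuntu22.04-glibc"):
    """Size of an image as a fraction of the baseline image."""
    return sizes_mb[tag] / sizes_mb[baseline]


if __name__ == "__main__":
    for tag in sizes_mb:
        print(f"{tag:18s} {relative_size(tag):.1%} of the glibc image")
```

    The Alpine image comes out at roughly 11% of the Ubuntu glibc image, and the distroless image at under 5%.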

    Conclusion

    Using static links in programs that use standard C libraries can simplify the process of deploying Linux binaries. It also reduces the size of the container image compared to dynamic links, and makes deployment convenient regardless of the distribution. When you replace glibc with musl, you benefit not only from the difference in container image size, but also from features newly supported by musl, such as mDNS (a multicast-DNS-based zero config system) and NUMA clusters. Furthermore, if you use distroless, which is distributed by Google to better utilize musl, as your default container image, you can deploy and take advantage of smaller container images.

    This post is automatically translated from Korean

    20 September 2023

  • Digging bitsandbytes issue

    By Jeongseok Kang

    Backend.AI is a popular choice for developing LLMs because of its ease of use in running large clusters and distributed processing. In fact, we get a lot of feedback and requests from customers, and today I'd like to share how we solved one of them.

    On April 4, 2023, we received a report of an issue where an error occurs when running certain packages in the container environment provided by the NGC Catalog[^1] (NVIDIA GPU Cloud). The NGC Catalog is a list of containers[^2] with optimized environments for developing AI/ML, metaverse, and high-performance computing applications, and because it is operated and distributed directly by NVIDIA, it is highly trusted and considered the standard for CUDA environments in particular. Therefore, an issue with this environment represents a potential risk that many users will face in the future, and we have decided to address this issue as a high priority.

    Reproducing the problem

    I first went through the process of reproducing the issue to determine the exact cause. In this case, I was running ViperGPT[^3] developed by Columbia University and encountered an error in a package called bitsandbytes. ViperGPT has a dependency on bitsandbytes as shown below.

    accelerate==0.18.0
    backoff==2.2.1
    bitsandbytes==0.38.1
    cityscapesscripts==2.2.1
    git+https://github.com/openai/CLIP.git
    decord==0.6.0
    dill==0.3.6
    ...
    

    I was able to reproduce the problem by simply importing bitsandbytes.

    The execution environment used the nvcr.io/nvidia/pytorch:22.05-py3 image.

    $ pip install bitsandbytes  # 0.37.1
    $ python
    >>> import bitsandbytes
    ===================================BUG REPORT===================================
    Welcome to bitsandbytes. For bug reports, please submit your error trace to: https://github.com/TimDettmers/bitsandbytes/issues
    ================================================================================
    CUDA exception! Error code: OS call failed or operation not supported on this OS
    CUDA exception! Error code: initialization error
    CUDA SETUP: CUDA runtime path found: /home/work/data/miniconda3/envs/vipergpt/lib/libcudart.so
    /home/work/data/miniconda3/envs/vipergpt/lib/python3.10/site-packages/bitsandbytes/cuda_setup/main.py:136: UserWarning: WARNING: No GPU detected! Check your CUDA paths. Proceeding to load CPU-only library...
      warn(msg)
    CUDA SETUP: Detected CUDA version 116
    CUDA SETUP: Loading binary /home/work/data/miniconda3/envs/vipergpt/lib/python3.10/site-packages/bitsandbytes/libbitsandbytes_cpu.so...
    /home/work/data/miniconda3/envs/vipergpt/lib/python3.10/site-packages/bitsandbytes/cextension.py:31: UserWarning: The installed version of bitsandbytes was compiled without GPU support. 8-bit optimizers and GPU quantization are unavailable.
      warn("The installed version of bitsandbytes was compiled without GPU support. "
    

    bitsandbytes iterates over all the CUDA devices installed in the execution environment and checks their Compute Capability[^4]. To do so, it first obtains the number of CUDA devices through libcuda.so, as shown in the function below. We found that the error occurs in the call to cuDeviceGetCount()[^5]: it returns error 304, CUDA_ERROR_OPERATING_SYSTEM.

    def get_compute_capabilities(cuda):
        """
        1. find libcuda.so library (GPU driver) (/usr/lib)
           init_device -> init variables -> call function by reference
        2. call extern C function to determine CC
           (https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__DEVICE__DEPRECATED.html)
        3. Check for CUDA errors
           https://stackoverflow.com/questions/14038589/what-is-the-canonical-way-to-check-for-errors-using-the-cuda-runtime-api
        # bits taken from https://gist.github.com/f0k/63a664160d016a491b2cbea15913d549
        """
    
        nGpus = ct.c_int()
        cc_major = ct.c_int()
        cc_minor = ct.c_int()
    
        device = ct.c_int()
    
        check_cuda_result(cuda, cuda.cuDeviceGetCount(ct.byref(nGpus)))
        ccs = []
        for i in range(nGpus.value):
            check_cuda_result(cuda, cuda.cuDeviceGet(ct.byref(device), i))
            ref_major = ct.byref(cc_major)
            ref_minor = ct.byref(cc_minor)
            # 2. call extern C function to determine CC
            check_cuda_result(cuda, cuda.cuDeviceComputeCapability(ref_major, ref_minor, device))
            ccs.append(f"{cc_major.value}.{cc_minor.value}")
    
        return ccs
    

    What is bitsandbytes?

    Since the advent of the Transformer, language models have improved dramatically, and the trend has been to grow models by stacking more Transformer blocks. As a result, large amounts of GPU resources are required not only to train a model but also to serve it. For example, serving GPT-3 with 175B parameters requires eight 80GB A100 GPUs, each costing about $15,000. This is a huge burden not only for individuals but also for enterprises and research institutes, which is why there is a lot of research on making models lighter for inference.

    Image source: A Gentle Introduction to 8-bit Matrix Multiplication for transformers at scale using Hugging Face Transformers, Accelerate and bitsandbytes (Hugging Face)

    bitsandbytes is the open-source implementation of LLM.int8()[^6], a work by Tim Dettmers, a PhD candidate at the University of Washington, with Facebook AI Research (now Meta AI). It has been shown to reduce model size while maintaining performance by applying vector-wise quantization, which treats each vector independently when computing matrix products, and by mixing 8-bit and 16-bit precision so that important vectors are kept in 16-bit to minimize loss. It has been integrated into Hugging Face's Transformers library and is used in a variety of models including [Llama2](https://github.com/facebookresearch/llama-recipes/blob/cd82118b74d2fd739bd6227af33b661d04a97406/requirements.txt#L6), [QLoRA](https://github.com/artidoro/qlora/blob/6c6fc4653abd17ce550f48878a24c7bd8772e98a/requirements.txt#L1), [KoAlpaca](https://github.com/Beomi/KoAlpaca/blob/4596f882957d286b4d60559b97dcf783822d23f5/webui/requirements.txt#L5), and [KULLM](https://github.com/nlpai-lab/KULLM/blob/b7a78b62ed6cd9d83c51ad5a92a9dd40b9f35998/requirements.txt#L4).
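
    To give a feel for what "treating each vector independently" means, here is a minimal pure-Python sketch of absmax quantization applied to a single row. It is not the bitsandbytes implementation: it leaves out the matrix multiplication and the 16-bit handling of important (outlier) values, and the function names are hypothetical.

```python
def quantize_row(row):
    """Quantize one vector to int8 range using its own absmax scale."""
    absmax = max(abs(x) for x in row)
    if absmax == 0:
        return [0] * len(row), 1.0
    scale = 127.0 / absmax  # each row gets its own scale
    return [round(x * scale) for x in row], scale


def dequantize_row(quantized, scale):
    """Map int8 values back to approximate floating-point values."""
    return [v / scale for v in quantized]


if __name__ == "__main__":
    row = [0.5, -1.0, 0.25]
    q, scale = quantize_row(row)
    print(q, dequantize_row(q, scale))
```

    Because every row is scaled by its own largest absolute value, one row with large values does not reduce the precision available to the others.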

    Identify the cause

    Now that we've located and reproduced the problem, it's time to get to the bottom of it. I looked to see if there were any similar cases, but I couldn't find any. Also, cuInit() itself returned successfully, which made it even more difficult to pinpoint the cause.

    import ctypes
    
    count = ctypes.c_int()
    
    libcuda = ctypes.CDLL("libcuda.so")
    libcuda.cuInit(0)  # 0 (CUDA_SUCCESS)
    libcuda.cuDeviceGetCount(ctypes.byref(count))  # 304 (CUDA_ERROR_OPERATING_SYSTEM)
    
    libcudart = ctypes.CDLL("libcudart.so")
    libcudart.cudaGetDeviceCount(ctypes.byref(count))  # 304 (cudaErrorOperatingSystem)
    

    I filed an issue on the GitHub repo (TimDettmers/bitsandbytes#264) for advice, and was told to update the package to the latest version and try again. After updating to version 0.38.0.post1, which was the latest at the time, I tested again, and the same problem occurred. I couldn't afford to lose too much time, so I decided to switch gears and remove the offending part.

    Image source: Greco-Roman Mythology in Comics (Ghana Publishers)

    Troubleshooting

    My first approach was to use CUDA-Python[^7]. CUDA-Python is the CUDA Python Low-Level Bindings package officially distributed by NVIDIA. I had used it before and found it useful, so I immediately thought of it and decided to install and test it.

    $ pip install cuda-python
    
    from cuda import cuda
    from cuda import cudart
    
    cuda.cuInit(0)  # (<CUresult.CUDA_SUCCESS: 0>,)
    cudart.cudaGetDeviceCount()  # (<cudaError_t.cudaSuccess: 0>, 1)
    

    Fortunately, cudart.cudaGetDeviceCount() worked fine, and I proceeded to test integrating it into bitsandbytes. However, calling torch.cuda.is_available() after calling cuda.cuInit(0) resulted in an error. This was because torch.cuda.is_available() calls cudaGetDeviceCount() internally.

    from cuda import cuda, cudart
    
    cuda.cuInit(0)  # (<CUresult.CUDA_SUCCESS: 0>,)
    cudart.cudaGetDeviceCount()  # (<cudaError_t.cudaSuccess: 0>, 1)
    
    import bitsandbytes
    
    # ...
    # /opt/conda/lib/python3.8/site-packages/torch/cuda/__init__.py:82: UserWarning: CUDA initialization: Unexpected error from cudaGetDeviceCount(). Did you run some cuda functions before calling NumCudaDevices() that might have already set an error? Error 304: OS call failed or operation not supported on this OS (Triggered internally at /opt/pytorch/pytorch/c10/cuda/CUDAFunctions.cpp:109.)
    #   return torch._C._cuda_getDeviceCount() > 0
    # ...
    

    The problem seemed to be back to square one. I took a breath and calmly reread the error log above. Then something caught my eye.

    torch._C._cuda_getDeviceCount() > 0

    Note that bitsandbytes was already using PyTorch internally, which means it had a dependency on PyTorch. To be precise, bitsandbytes had a dependency on lion-pytorch, which in turn had a dependency on PyTorch. And PyTorch already had an interface to CUDA functions, which I decided to take advantage of this time.

    Fortunately, all of the CUDA functions used by bitsandbytes existed in PyTorch. I made the following changes to the functions that were previously called via libcuda.so and libcudart.so.

    | libcuda/libcudart | torch |
    |-------------------|-------|
    | libcuda.cuDeviceGetCount() | torch.cuda.device_count() |
    | libcuda.cuDeviceGet() | torch.cuda.device() |
    | libcuda.cuDeviceComputeCapability() | torch.cuda.get_device_capability() |
    | libcudart.cudaRuntimeGetVersion() | torch.version.cuda |

    After verifying that it worked after the change, I registered a PR in the GitHub repository (TimDettmers/bitsandbytes#375) to apply to the distribution package version.
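
    As an illustration of the idea, and not the code that was actually merged, the compute capability lookup can be written against the PyTorch functions named above. The sketch takes the CUDA interface as a parameter, so it works with torch.cuda or with any object that offers the same two methods.

```python
def get_compute_capabilities(cuda_api):
    """List the compute capability of every visible device as "major.minor".

    `cuda_api` is expected to provide device_count() and
    get_device_capability(index), as torch.cuda does.
    """
    ccs = []
    for index in range(cuda_api.device_count()):
        major, minor = cuda_api.get_device_capability(index)
        ccs.append(f"{major}.{minor}")
    return ccs
```

    In an environment with PyTorch installed, this would be called as get_compute_capabilities(torch.cuda) after import torch, with no direct calls into libcuda.so.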

    Postscript

    On July 14, 2023, about two months after registering the PR, the patch was merged into the main branch and included in version 0.40.1.

    I was also able to get some feedback from the author, Tim Dettmers, whose thoughts and philosophy are evident in this short article. Through this opportunity, I was able to learn more about the LLM ecosystem. It was also the first time in a long while that I felt the fun of open source work. I think the appeal of open source is that we can collaborate beyond spatial constraints and learn from each other's ideas. We run an open source version of Backend.AI alongside an enterprise version. We will always strive to provide a better user experience and a better developer experience.

    [^1]: NVIDIA GPU Cloud
    [^2]: The NGC catalog hosts containers for AI/ML, metaverse, and HPC applications that are performance-optimized, tested, and ready to deploy on GPU-powered on-prem, cloud, and edge systems.
    [^3]: ViperGPT: Visual Inference via Python Execution for Reasoning, March 14, 2023.
    [^4]: https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#compute-capability
    [^5]: https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__DEVICE.html#group__CUDA__DEVICE_1g52b5ce05cb8c5fb6831b2c0ff2887c74
    [^6]: LLM.int8(): 8-bit Matrix Multiplication for Transformers at Scale, November 10, 2022.
    [^7]: https://developer.nvidia.com/cuda-python

    This post is automatically translated from Korean

    28 July 2023

  • Sneak Peek: Backend.AI Model Service Preview

    By Kyujin Cho

    Introduction

    As super-sized AI models flood the market, there is a growing concern about not only developing the models, but also how to deliver them "well" and "efficiently" to users. Prior to Large Language Models (LLMs), the computing power of AI models was focused on training rather than inference, as the hardware requirements for attempting to make inferences with a trained model were much smaller than the computing power needed to train the model. Deployers of models could get enough power for inference from the NPU of a real user's end device (such as a smartphone). However, with the advent of LLMs, the tables were turned.

    Take Meta's OPT-175B (https://github.com/facebookresearch/metaseq) as an example: OPT-175B, as its name implies, has 175 billion parameters and requires roughly 320+ GB of GPU memory just to load them onto the GPU to perform inference tasks. That's a huge difference from the 4GB that pre-LLM image processing models used to require.

    With this change in AI model behavior, efficiently managing service resources has become paramount to keeping your service running reliably. In this article, we'll preview Backend.AI's upcoming model service feature, Backend.AI Model Service, and show you how it will allow you to efficiently run your AI model from training to serving with a single infrastructure.

    Backend.AI Model Service

    Backend.AI Model Service is a model serving system that runs on top of the existing Backend.AI solution. It takes Backend.AI's tried-and-true container management technology and container app delivery system, AppProxy[^1], to the next level, enabling both AI training and model service in one infrastructure without installing additional components and by simply upgrading the existing Backend.AI infrastructure. It also supports an auto-scaling feature that automatically scales up and down inference sessions based on per-session GPU usage, number of API calls, or time of day, allowing you to effectively manage AI resources used for inference.

    Inference Sessions

    Inference sessions in Backend.AI are conceptually the same as traditional training sessions. You can use the same execution environment you've been using for training for inference sessions, or you can deploy a dedicated execution environment just for inference sessions. Inference sessions are volatile and stateless, so you can terminate them at any time if the session is not performing well. In this case, Backend.AI will attempt to recover the original state by creating a new inference session, while simultaneously forwarding inference requests to other living inference sessions to minimize downtime for the inference service.

    Model storage

    Models to be served through Backend.AI are managed as "model storage" units. Model storage consists of model files, code for model services, and model definition files.

    Model definition file

    The model definition file is where you define the information for running a service provider's model in the Backend.AI Model Service. The model definition file contains information about the model, the ports exposed by the model service, and a set of tasks that must be executed to run the model service. If your model service provides a health check feature that reports its own health, you can use that information to take action, such as excluding sessions from the service if they are in bad health.

    models:
      - name: "KoAlpaca-5.8B-model"
        model_path: "/models/KoAlpaca-5.8B"
        service:
          pre_start_actions:
            - action: run_command
              args:
                command: ["pip3", "install", "-r", "/models/requirements.txt"]
          start_command:
            - uvicorn
            - --app-dir
            - /models
            - chatbot-api:app
            - --port
            - "8000"
            - --host
            - "0.0.0.0"
          port: 8000
          health_check:
            path: /health
            max_retries: 10
    

    Here is an example of a well-defined model definition file, which contains a set of steps to run the KoAlpaca 5.8B model as a model service.
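    To make the health_check block more concrete, here is a minimal Python sketch of how a caller might act on it. It is not Backend.AI's implementation: the helper names http_probe and is_session_healthy are hypothetical, and the reading of max_retries (a session counts as healthy as soon as one of up to max_retries probes succeeds) is an assumption made for illustration.

```python
import urllib.request
from typing import Callable


def http_probe(base_url: str, path: str = "/health", timeout: float = 2.0) -> Callable[[], bool]:
    """Build a probe that returns True when GET base_url + path answers 200."""
    def probe() -> bool:
        with urllib.request.urlopen(base_url + path, timeout=timeout) as resp:
            return resp.status == 200
    return probe


def is_session_healthy(probe: Callable[[], bool], max_retries: int = 10) -> bool:
    """Assumed semantics: healthy if any of max_retries attempts succeeds."""
    for _ in range(max_retries):
        try:
            if probe():
                return True
        except OSError:
            # URLError, HTTPError and socket timeouts are all OSError subclasses.
            continue
    return False
```

    A router could call is_session_healthy(http_probe("http://localhost:8000")) and stop forwarding requests to a session for which it returns False. A real checker would normally also wait between attempts.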

    Tutorial: Model Service with Backend.AI Model Service

    In this tutorial, we'll actually use Backend.AI to serve a KoAlpaca 5.8B model quantized to 8 bits.

    Write the API server code

    Write a simple API server to serve the model.

    import os
    from typing import Any, List
    
    from fastapi import FastAPI, Response
    from fastapi.responses import RedirectResponse, StreamingResponse, JSONResponse
    from fastapi.staticfiles import StaticFiles
    import numpy as np
    from pydantic import BaseModel
    import torch
    from transformers import pipeline, AutoModelForCausalLM
    import uvicorn
    
    URL = "localhost:8000"
    KOALPACA_MODEL = os.environ["BACKEND_MODEL_PATH"]
    
    torch.set_printoptions(precision=6)
    
    app = FastAPI()
    
    model = AutoModelForCausalLM.from_pretrained(
        KOALPACA_MODEL,
        device_map="auto",
        load_in_8bit=True,
    )
    
    
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=KOALPACA_MODEL,
    )
    
    
    class Message(BaseModel):
        role: str
        content: str
    
    
    class ChatRequest(BaseModel):
        messages: List[Message]
    
    
    BASE_CONTEXTS = [
        Message(role="맥락", content="KoAlpaca(코알파카)는 EleutherAI에서 개발한 Polyglot-ko 라는 한국어 모델을 기반으로, 자연어 처리 연구자 Beomi가 개발한 모델입니다."),
        Message(role="맥락", content="ChatKoAlpaca(챗코알파카)는 KoAlpaca를 채팅형으로 만든 것입니다."),
        Message(role="명령어", content="친절한 AI 챗봇인 ChatKoAlpaca 로서 답변을 합니다."),
        Message(role="명령어", content="인사에는 짧고 간단한 친절한 인사로 답하고, 아래 대화에 간단하고 짧게 답해주세요."),
    ]
    
    
    def preprocess_messages(messages: List[Message]) -> List[Message]:
        ...
    
    
    def flatten_messages(messages: List[Message]) -> str:
        ...
    
    
    def postprocess(answer: List[Any]) -> str:
        ...
    
    
    @app.post("/api/chat")
    async def chat(req: ChatRequest) -> StreamingResponse:
        messages = preprocess_messages(req.messages)
        conversation_history = flatten_messages(messages)
        ans = pipe(
            conversation_history,
            do_sample=True,
            max_new_tokens=512,
            temperature=0.7,
            top_p=0.9,
            return_full_text=False,
            eos_token_id=2,
        )
        msg = postprocess(ans)
    
        async def iterator():
            yield msg.strip().encode("utf-8")
    
        return StreamingResponse(iterator())
    
    
    @app.get("/health")
    async def health() -> Response:
        return JSONResponse(content={"healthy": True})
    
    
    @app.exception_handler(404)
    async def custom_404_handler(_, __):
        return RedirectResponse("/404.html")
    
    
    app.mount(
        "/",
        StaticFiles(directory=os.path.join(KOALPACA_MODEL, "..", "chatbot-ui"), html=True),
        name="html",
    )
    

    Create a model definition file

    Create a model definition file for your API server.

    models:
      - name: "KoAlpaca-5.8B-model"
        model_path: "/models/KoAlpaca-Ployglot-5.8B"
        service:
          pre_start_actions:
            - action: run_command
              args:
                command: ["pip3", "install", "-r", "/models/requirements.txt"]
          start_command:
            - uvicorn
            - --app-dir
            - /models
            - chatbot-api:app
            - --port
            - "8000"
            - --host
            - "0.0.0.0"
          port: 8000
          health_check:
            path: /health
            max_retries: 10
    

    In a session of the model service, model storage is always mounted under the /models path.

    Prepare model storage

    Add the model API server code you wrote, the model definition file, and the KoAlpaca model to your model storage.

    Create a model service

    With both the model file and the model definition file ready, you can now start the Backend.AI Model Service. The Model Service can be created using the backend.ai service create command in the Backend.AI CLI. The arguments accepted by service create are almost identical to the backend.ai session create command. After the image to use, you pass the ID of the model storage and the number of inference sessions to initially create.

    Using backend.ai service info, you can check the status of the model service and the inference sessions belonging to the service. You can see that one inference session has been successfully created.

    Use the Inference API

    You can use the backend.ai service get-endpoint command to see the inference endpoint of a created model service. The inference endpoint keeps the same unique value from the time the model service is created until it is removed. If a model service has multiple inference sessions, AppProxy will distribute requests across them.
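    As a rough illustration, the following Python sketch calls the /api/chat route of the tutorial's API server through an inference endpoint. The helper names build_chat_request and chat are hypothetical, the endpoint URL is whatever get-endpoint returns for your service, and the role string in the usage note is only a placeholder, since the tutorial elides how roles are preprocessed.

```python
import json
import urllib.request
from typing import Dict, List


def build_chat_request(endpoint: str, messages: List[Dict[str, str]]) -> urllib.request.Request:
    """Build (but do not send) a POST request for the /api/chat route."""
    body = json.dumps({"messages": messages}).encode("utf-8")
    return urllib.request.Request(
        endpoint.rstrip("/") + "/api/chat",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def chat(endpoint: str, messages: List[Dict[str, str]], timeout: float = 60.0) -> str:
    """Send the request and return the reply body decoded as UTF-8."""
    req = build_chat_request(endpoint, messages)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8")
```

    For example, chat(endpoint, [{"role": "user", "content": "Hello"}]) sends one message and returns the generated text, where endpoint is the URL printed by get-endpoint.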

    Restricting access to the Inference API

    If you want to restrict who can access the inference API, you can enable authentication for the inference API by starting the model service with the --public option removed. Authentication tokens can be issued with the backend.ai service generate-token command.

    Scaling inference sessions

    The backend.ai service scale command allows you to change the scale of inference sessions belonging to the model service.

    Closing thoughts

    So far, we've learned about Backend.AI Model Service and how to actually deploy a model service with the Model Service feature. Backend.AI Model Service is targeted for general availability in Backend.AI 23.03. We're working hard to make the Model Service feature publicly available in the near future, so stay tuned.

    ---

    [^1]: Available from Backend.AI Enterprise.

    This post is automatically translated from Korean

    30 May 2023

  • Concurrent React Changed Everything: Distinguishing Renders That Aren't Rushed

    By Jongeun Lee

    Backend.AI's MLOps platform, FastTrack, uses React 18. We will explore the difference between rushed and non-rushed renders, a distinction made possible by the Concurrent Renderer in React 18.

    The Concurrent feature in React, initially introduced as Async Rendering at JSConf Iceland in 2018, was not fully integrated into React 18 until the year 2022. As you might expect from this time period, the Concurrent Renderer is the biggest and most significant change in React 18. Even though the renderer has been updated, React developers can still run code written for versions before React 18 with minimal changes. It is possible to build user interfaces with React without knowledge of React's Concurrent Renderer. Understanding the Concurrent Renderer and its applications can simplify the complexities in your mind during React development, enabling you to create user interfaces(UI) that provide an enhanced user experience(UX). This article will not delve into the inner workings of the Concurrent Renderer. Instead, it will focus on defining what the Concurrent Renderer is and how it can transform the mindset of React developers, which is crucial for those creating applications using React.

    To summarize the content of this article, here's what you need to know:

    Because of the Concurrent Renderer,

    • Component rendering can be interrupted.
    • Parts of the tree can be rendered even when they are not visible on the screen.
    • This allows React developers to distinguish between non-rush renders like never before.

    “React components are abstractly pure functions.”
    React components are actually created as JavaScript functions. (You can also create it as a class, although this is generally not advised in most situations.) A function generates an output based on the provided input. Changing the input can alter the output, hence it is necessary to execute the function again to generate a new result. (A pure function consistently returns the same output for identical inputs.)

    What are the inputs and outputs of a React component?
    The inputs of a React component are its properties, or 'props', which the component function receives as its argument. The outputs are the React elements that the function returns.

    Is state via hooks also an input?
    'hooks' can be conceptually understood as inputs to a function. Similar to React props, they act as triggers that prompt a re-render when their values change, leading to variations in the output of our React component.

    Now, back to the topic of rendering.

    Component rendering can be interrupted.

    The essence of Concurrent React lies in the ability to interrupt rendering, a feature unavailable before React 18(except in experimental form). In previous versions, when a React component began rendering, all JavaScript operations were blocked until the rendering completed. This means that if the rendering function takes a long time, the event handler function that handles the user's click cannot be executed until the element is returned. However, with React 18, rendering can now be interrupted.

    const A = ({ count }) => {
      return (
        <div>
          <span>{count}</span>
          <B/>
          <C/>
        </div>
      );
    };
    
    const B = () => {
      const [text, setText] = useState("");
      return (
        <div>
          <input value={text} onChange={(e) => setText(e.target.value)} />
          <D text={text} />
        </div>
      );
    };
    
    const C = () => {
      return <span>C</span>;
    };
    
    const D = ({ text }) => {
      verySlowFunction(text); //Consider this function that takes a few seconds to compute.
      return <span>D</span>;
    };
    

    In versions earlier than React 18, rendering component A necessitated the rendering of components B and C, and component D had to be rendered for B. No other JavaScript operations could be performed until A's return value, a React element, was returned. The component tree that A returned was rendered as a single block, and it was not possible to interrupt A's rendering once it had begun.

    In Concurrent React, it is possible to interrupt the rendering process. Why is it necessary to interrupt rendering? You can think of the following:

    • When the current render in progress is no longer valid(stale)
      • For example, in the code above, suppose A is rendering with its count prop set to 1. Before this render completes, count changes to 2, which triggers a render request for A with count 2. The render for count 1 is now obsolete because it does not reflect the most recent value. By halting the render for count 1 and promptly starting the render for count 2, you can present the user with the most recent value quickly.
    • When you have something you want to do before the ongoing render updates the screen you want to show.
      • When a user event occurs during rendering, it's possible to halt the ongoing render and give precedence to the event handler for an immediate response.

    These are all cases where you're improving the UX by stopping rendering that component so it can do something else.

    It is possible to render sections of the tree that are not visible on the display.

    Concurrent React enables you to render components corresponding to specific screen areas separately, in addition to what is currently visible on the screen. This feature allows the existing render to remain visible and functional while independently rendering a future screen update in advance, swapping it in once rendering is complete. Concerns may arise about rendering more than necessary and reducing usability. Yet, thanks to the Concurrent Renderer, this separate rendering process can be halted at any moment, ensuring it does not disrupt user interactions. Ultimately, this capability can enhance the user experience.

    So far, we've seen two features of the Concurrent Renderer, and now we'll see how they are utilized to “distinguish between non-rush renders”.

    Distinguish between non-rush renders

    Examples of rushed and non-rushed renders
     
    Consider the experience of visiting your website for the first time via a browser. What's the most urgent thing you need to do when you're faced with a white, blank screen? The most critical action is to display your site's content promptly. If the screen remains white for an extended period, users may not wait and leave. Therefore, it's essential to prioritize rendering the initial content quickly.
     

    On the left sidebar of your homepage, there is a set of menus for navigation. If a user intends to select Menu A but accidentally selects Menu B, and then attempts to select Menu A again while Menu B is still loading, the screen for Menu B will complete rendering before the screen for Menu A appears.
     

    Now suppose the user pressed Menu B and then immediately pressed Menu A. Rendering the screen for Menu A is more urgent than rendering the screen for Menu B, because the screen for B is no longer valid.

    As a React developer, you can inform React about non-rush renders by specifying which input changes that trigger a render are not pressing. The hooks that facilitate this for developers are useDeferredValue and useTransition. Both APIs, introduced in React 18, serve to defer non-rush rendering. We will examine these two hooks individually to grasp the distinctions between them.

    useDeferredValue: Separate using a changed input value

    It is used by components that use a specific value and want to handle changes to that specific value in a non-rush manner.

    function App() {
      const [text, setText] = useState('');
      return (
        <>
          <input value={text} onChange={e => setText(e.target.value)} />
          <SlowList text={text} />
        </>
      );
    }
    

    The example code above is one of the useDeferredValue examples from beta.reactjs.org.

    In this scenario, text serves as a state variable; thus, any changes to text will cause the App to re-render. The same text is also passed as a prop to both <input> and <SlowList>. Consequently, when text is modified, it initiates a re-render of App, and as part of this process, both input and SlowList will update to reflect the new text. However, if SlowList has a lengthy render time, the user's input will not appear until the rendering is fully completed, regardless of how fast the user types.

    In this scenario, input represents the user's keyboard input, which is rendered quickly, while SlowList is a result of the user's input and is rendered more slowly than input. We can use useDeferredValue to create deferredText, which is updated in a non-rush render, while text triggers a rushed render.

    function App() {
      const [text, setText] = useState('');
      const deferredText = useDeferredValue(text);
      return (
        <>
          <input value={text} onChange={e => setText(e.target.value)} />
          <SlowList text={deferredText} />
        </>
      );
    }
    

    With this change, when text changes, the rushed render shows the new text in the input right away while deferredText still holds the previous value. Concurrently, React renders the tree off screen with deferredText set to the new value. Only after that render completes does deferredText update on screen, so that text and deferredText both reflect the latest value. The render for deferredText is not rushed and can be interrupted.

    If there are successive non-rushed render requests for the same component, the earlier non-rushed render is abandoned if it has not finished, and React starts rendering the most recent change instead. For instance, if a user types 'A' and then 'B' in quick succession into an empty input field, the render for 'A' starts first. If 'B' is entered before the render for 'A' finishes, the render for 'A' stops and the render for 'AB' begins.

    useTransition: Separate using a function that changes the input

    Previously, we discussed how both useTransition and useDeferredValue help manage non-urgent renderings. Now, let's explore the distinctions between the two and delve into useTransition.

    :warning: CAUTION

    To make the distinction clear, the useDeferredValue example has been modified to demonstrate useTransition. Note that useTransition cannot be used to control an input, because an input requires synchronous state updates. For an explanation of this limitation, refer to the Troubleshooting section on the useTransition page at beta.reactjs.org.

    function App() {
      const [text, setText] = useState("");
      const [isPending, startTransition] = useTransition();
      return (
        <>
          <button
            onClick={(e) => {
              startTransition(() => setText((v) => v + "a"));
            }}
          >
            a key
          </button>
          <SlowList text={text} />
        </>
      );
    }
    

    Differences:

    useDeferredValue marks a render as non-urgent through the value itself, text. useTransition instead works through setText, the function that changes the value and triggers the render. Even when you don't have access to text, having setText is enough.

    Because the change to text happens inside startTransition, it cannot be shown on screen immediately. A separate render starts for the updated text, and since it is separate, the render currently on screen does not see the updated value; it can only tell that the separate render is in progress through isPending. In short, useTransition delays the state change itself, while useDeferredValue defers part of the rendering that depends on state that has already changed.

    Similarities:

    If multiple non-rush render requests for the same component are made through startTransition, then, just as with useDeferredValue, the earlier render is canceled if it is still in progress and a new render with the latest value begins.

    Wrapping up

    React 18's Concurrent Renderer introduces the ability to "distinguish between non-rush renders." Utilizing useTransition and useDeferredValue, it allows for updates to complex structures without compromising usability. Prior to React 18, achieving such seamless usability demanded significant development work. Now, with the streamlined process of "distinguishing between non-rush renders," developers can offer users a smooth user experience.

    29 January 2023

  • Introducing FastTrack: Backend.AI MLOps Platform

    By Jihyun Kang

    Introducing FastTrack, the MLOps Platform of Backend.AI. FastTrack allows you to organize each step of data preprocessing, training, validation, deployment, and inference into a single pipeline. FastTrack makes it easy for you to customize each step as you build your pipeline. In this article, we'll explain why you need an MLOps platform, how Backend.AI FastTrack came to be, and what makes it unique.

    Rise of MLOps Platforms

    Over the past few years, the IT industry, as well as most industries undergoing digital transformation, has been working hard to adopt AI to make meaningful predictions from scattered data and respond to rapidly changing markets. To make good use of AI in this process, organizations have to handle many stages, such as model training and optimization, hardware procurement that accounts for data I/O, and model version management. The concept of MLOps (Machine Learning Operations) emerged from this. If you are unfamiliar with the concept, we recommend that you skim our 'MLOps series' before reading this article.

    FastTrack: History

    In 2019, we added the Backend.AI pipeline as a beta release to address the demand for DevOps pipelines. We developed and tested the ability to simplify the process of creating and managing complex pipelines, and to operate unidirectional pipelines that split into two or more paths in the middle. However, with the rise of the MLOps concept and the proliferation of various pipeline solutions such as Airflow, MLflow, and Kubeflow, we shifted our development direction toward integrating and supporting open source pipeline tools instead of building our own full-fledged pipeline feature.

    Meanwhile, AI development pipelines became increasingly complex, and it became clear that open-source MLOps pipeline tools were unable to meet the diverse needs of users. At this point, we decided to revive the pipeline feature of Backend.AI. While revitalizing and prototyping the Backend.AI pipeline, we changed the direction of development to an MLOps pipeline solution that works with the Backend.AI cluster but stands independently, so that we could directly address our users' requests.

    With such a colorful history behind it, Lablup's AI/MLOps solution is called 'FastTrack'. The name comes from the fast track lanes at airports and in logistics, which expedite passenger or customs clearance. FastTrack became available with Backend.AI 22.09 and is still being tested to meet our customers' standards.

    FastTrack: What it is

    FastTrack is a machine learning workflow platform that enables users to tailor multiple work units based on Backend.AI clusters and execute them as a Directed Acyclic Graph (DAG). Users can run sessions for each stage of the machine learning pipeline, linked through pre- and post-relationships, allowing them to integrate steps like data preprocessing, training, validation, deployment, monitoring, and optimization into a unified workflow as needed. This means users can more efficiently build and reuse models by structuring sessions into workflows and automatically scheduling them after each phase, rather than manually crafting them in a conventional Backend.AI cluster.
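    To illustrate how a DAG of work units determines execution order, here is a small Python sketch using the standard library's graphlib module (Python 3.9+). It is not FastTrack's scheduler: the function execution_waves and the task names are hypothetical, and the dictionary maps each task to the set of tasks that must finish before it.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_waves(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into waves; each task's predecessors are in earlier waves."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose predecessors are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Hypothetical pipeline: each key depends on the tasks in its set.
pipeline = {
    "preprocess": set(),
    "train": {"preprocess"},
    "validate": {"train"},
    "deploy": {"validate"},
    "report": {"validate"},
}

print(execution_waves(pipeline))
# [['preprocess'], ['train'], ['validate'], ['deploy', 'report']]
```

    Tasks in the same wave have no dependency on each other, so a scheduler could run them in parallel sessions.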

    FastTrack: Structure and features

    FastTrack calls workflow templates pipelines and executed workflows pipeline jobs. The units of work in a workflow are called tasks, and the units of work that are actually executed are called task instances. The following flowchart outlines the step-by-step progression of work within FastTrack.

    Pipeline

    A pipeline is a structured collection of data and tasks, represented by a Directed Acyclic Graph (DAG). In setting up an AI workflow, constructing a pipeline prompts FastTrack to create a specific folder in your Backend.AI cluster dedicated to pipelines. This setup facilitates the monitoring of training progress via artifacts. FastTrack streamlines the modification of task relationships with an intuitive drag-and-drop interface, allowing for immediate visual feedback in the form of a schematic flow and verification through a YAML file. Moreover, managing pipelines in YAML format allows for easy export, import, and sharing among users.

    Pipeline Job

    Within the FastTrack GUI, the progress of job units is indicated by the color of the nodes associated with each unit. As with pipelines, the information and relationships of the constituent task instances are managed in YAML. Upon completion of all task instances, the pipeline job's status is displayed as either successful or failed.

    Task

    A task is the smallest unit of execution in a pipeline that allows you to allocate resources by purpose. For example, a task dedicated to model training can be given a large share of GPU resources, unlike a preprocessing task, so that resources are used more efficiently. You can also specify the execution environment. Based on the images supported by the Backend.AI cluster, you can use images such as TensorFlow, PyTorch, Python 3.x, NGC TensorFlow, NGC PyTorch, etc. without a Docker build step. You can also mount virtual folders created by the Backend.AI cluster on a per-task basis as needed.

    Task Instance

    Task instances are the concrete objects created when a pipeline job is created, based on the task information that makes up the pipeline. Executing an AI workflow means that the task instances that make up the pipeline job are executed according to the specified preceding and following relationships. Task instances currently have a 1:1 correspondence with Sessions in the Backend.AI cluster, equating the state of a session with the state of a task instance, but we plan to expand beyond sessions to other units of execution in the near future.

    Wrap up

    So far, we've covered MLOps with an introduction to FastTrack, the Backend.AI MLOps platform. The latest release of Backend.AI FastTrack is version 22.09 (as of November 2022). Our development plans include a range of user-friendly features, such as debugging pipelines, creating dependencies between pipelines, optimizing the usage of resources for tasks, and providing support for GitHub-based model and data repositories. True to Lablup's vision of empowering anyone to develop and use AI models from anywhere, FastTrack will simplify the process of building automated models. We look forward to your interest in our future endeavors.

    29 November 2022

  • aiomonitor-ng: Debugging tool for complex asyncio applications

    By Joongi Kim

    As program complexity grows, software developers need robust debugging tools. The optimal debugging method involves pinpointing a reliable way to replicate an issue within a development setting conducive to free experimentation, followed by the creation of automated tests. However, when the reproduction scenario is overly complex or involves bugs that sporadically appear in production environments, detailed logging becomes the alternative to comprehend the issue retrospectively. In this post, we present 'aiomonitor-ng', designed to simplify the debugging of intricate asyncio programs.

    Debugging asyncio applications has its own difficulties. In Python, the stack trace is commonly used for debugging, revealing the program's location at the time of an exception. However, with asyncio's concurrent execution of multiple coroutine tasks, each with its own stack, it's crucial to examine not just the stack of the coroutine where the exception occurred but also those of 'related' coroutines to pinpoint if the error stemmed from another task. This issue intensifies when an external library implicitly generates a coroutine that invokes my code. Moreover, certain bugs, like coroutine task explosions that only manifest in production, or silent terminations of ongoing coroutine tasks, are particularly elusive in development settings, as they don't produce clear exceptions and are only detectable through post-incident logs.

    aiomonitor is a production-grade live debugging tool created by the asyncio core developers. Wrapping asyncio-based code within a monitor object allows you to initiate a telnet session to a pre-set TCP port outside the process while the code is active. Through simple commands, you can inspect the list of coroutine tasks running in the event loop and the status of individual stacks. Backend.AI has integrated aiomonitor, assigning a unique debugging telnet port to each service process. (For security purposes, only local connections are permitted.) This integration has significantly aided in troubleshooting production-specific issues. Nonetheless, pinpointing the cause of a coroutine task's failure due to an external library, not specific to Backend.AI's code, remains a challenge when using aiomonitor at the time of the problem's occurrence.
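    To give a feel for the kind of information such a monitor exposes, the sketch below lists the unfinished tasks in a running event loop and where each one is suspended, using only the standard asyncio API (asyncio.all_tasks() and Task.get_stack()). It is not aiomonitor's code: describe_tasks, worker, and main are hypothetical names, and a real monitor serves this over a network port instead of returning strings.

```python
import asyncio
from typing import List


def describe_tasks() -> List[str]:
    """Summarize every unfinished task in the running loop, like a tiny `ps`."""
    lines = []
    for task in asyncio.all_tasks():
        frames = task.get_stack(limit=1)
        if frames:
            where = f"{frames[0].f_code.co_name}:{frames[0].f_lineno}"
        else:
            where = "<no frame>"
        lines.append(f"{task.get_name()} @ {where}")
    return sorted(lines)


async def worker(delay: float) -> None:
    await asyncio.sleep(delay)


async def main() -> List[str]:
    tasks = [asyncio.create_task(worker(10), name=f"worker-{i}") for i in range(2)]
    await asyncio.sleep(0)  # let the workers start and suspend at their await
    report = describe_tasks()
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return report


if __name__ == "__main__":
    print("\n".join(asyncio.run(main())))
```

    Each line of the report names a task and the function and line number where its coroutine is currently suspended or running.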

    We have developed an enhanced version named aiomonitor-ng, where "ng" signifies next-generation. This version includes the following additions and enhancements:

    • Task creation tracker: For every running coroutine task, the stack trace at the moment of creation is preserved for each task that created it (asyncio.create_task()), so that the entire chain of task creation can be traced (the ps and where commands).
    • Task termination tracker: Up to N recently terminated coroutine tasks are preserved and can be viewed. In particular, when one task cancels another (Task.cancel()), the stack trace at the moment of cancellation is also preserved so that the entire cancellation chain can be traced (the ps-terminated and where-terminated commands).
    • Persistent task marker: By default, only up to N recently terminated tasks are tracked, to prevent memory leaks. However, tasks that must keep running for the application's entire lifespan can be marked with a decorator so that their termination logs are always preserved regardless of the history limit. The termination log query command also offers an option to filter for them (the aiomonitor.task.preserve_termination_log decorator).
    • Sophisticated terminal UI: We improved command-line processing, which was previously composed of a simple REPL (read-evaluate-print loop) based on handcrafted command parsing. We rewrote the aiomonitor server-side implementation to use Click and prompt_toolkit. We also developed a Telnet client that works natively with asyncio to provide autocompletion of arguments such as commands and task IDs.

    Here are some screenshots of the actual usage:

    We have successfully resolved resource leaks and performance issues caused by the grpcio library creating an excessive number of coroutine tasks through callbacks. We also fixed a problem where the tasks that monitor events from the Docker daemon silently stopped on specific input message patterns, which prevented the results of container creation or deletion from being reported and led to system crashes.

    We anticipate that developers, not only at Lablup but also those working on various other Python asyncio applications, will find aiomonitor-ng useful for debugging in the future.

    aiomonitor-ng can be installed via PyPI using the command pip install aiomonitor-ng, and it is open-sourced on my GitHub account for anyone to use and contribute.

    28 November 2022

Headquarter & HPC Lab

Namyoung Bldg. 4F/5F, 34, Seolleung-ro 100-gil, Gangnam-gu, Seoul, Republic of Korea

© Lablup Inc. All rights reserved.