
Implementation:Huggingface Datatrove BaseInferenceServer

From Leeroopedia
Knowledge Sources
Domains: Machine Learning Inference, Server Management
Last Updated: 2026-02-14 17:00 GMT

Overview

InferenceServer is an abstract base class that provides the shared lifecycle management, health monitoring, request handling, and async context manager protocol for all inference server implementations in datatrove.

Description

The InferenceServer class defines the core infrastructure that all five concrete server implementations (SGLang, VLLM, Dummy, Custom, Endpoint) inherit from. It manages the full server lifecycle through an asyncio.Future-based readiness state machine.
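As a rough illustration of what a Future-based readiness state can look like, the sketch below uses a single asyncio.Future to record whether startup succeeded or failed. ReadinessGate and its method names are hypothetical and are not taken from datatrove; the actual class may organize this differently.

```python
from __future__ import annotations

import asyncio


class ReadinessGate:
    """Hypothetical helper (not a datatrove class): one Future holds the startup outcome."""

    def __init__(self) -> None:
        self._ready = None  # created lazily, see _future()

    def _future(self) -> asyncio.Future:
        # Create the Future on first use so it is bound to the running event loop.
        if self._ready is None:
            self._ready = asyncio.get_running_loop().create_future()
        return self._ready

    def mark_ready(self) -> None:
        fut = self._future()
        if not fut.done():
            fut.set_result(True)

    def mark_failed(self, exc: BaseException) -> None:
        fut = self._future()
        if not fut.done():
            fut.set_exception(exc)

    async def wait(self, timeout: float | None = None) -> bool:
        # shield() keeps one waiter's timeout from cancelling the shared Future.
        return await asyncio.wait_for(asyncio.shield(self._future()), timeout)
```

Any number of coroutines can await wait(); they all resume once mark_ready() or mark_failed() is called, and a failure is re-raised in each waiter.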

The lifecycle works as follows. bg_start_server acquires a start lock and finds an available port via _find_available_port, which scans from a random base port with a rank-based offset. It then calls the abstract start_server() method to launch the process and starts a _monitor_server background task that wraps the abstract monitor_health() method. On master nodes, it polls is_ready(), which issues GET requests to /v1/models, until the server responds with HTTP 200. On worker nodes, the method blocks until the monitoring task completes.
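The port-scanning step can be sketched roughly as follows. This is not the datatrove implementation: the function name find_available_port, the 20000 to 40000 base range, and the use of a bind test are assumptions made for illustration. Only the random base, the rank-based offset, and the 200-port limit come from this page.

```python
from __future__ import annotations

import random
import socket


def find_available_port(rank: int, base_port: int | None = None, max_tries: int = 200) -> int:
    """Return the first bindable TCP port at or after base_port + rank."""
    if base_port is None:
        # Assumed range; the real base-port range is not stated on this page.
        base_port = random.randint(20000, 40000)
    start = base_port + rank
    # Never scan past the highest valid TCP port.
    for port in range(start, min(start + max_tries, 65536)):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind(("127.0.0.1", port))
            except OSError:
                continue  # port is taken, try the next one
            return port
    raise RuntimeError(f"no free port found in {max_tries} tries starting at {start}")
```

The rank offset makes co-located servers begin their scans at different ports, which reduces the chance that two of them pick the same one. A bind test like this is still racy, because another process can claim the port between the check and the actual server launch.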

The async context manager protocol (__aenter__/__aexit__) starts the server in a background task on the master node (allowing preprocessing to begin immediately) while blocking worker nodes until the server is fully initialized. Cleanup in __aexit__ cancels the background task and terminates server processes.
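The master/worker difference in the context manager can be illustrated with a small stand-in class. ManagedServer, is_master, and startup_delay are hypothetical names; the sleep stands in for launching the process and polling for readiness, and real cleanup would also terminate the server process.

```python
import asyncio


class ManagedServer:
    """Hypothetical stand-in (not a datatrove class) for the enter/exit behaviour."""

    def __init__(self, is_master: bool, startup_delay: float = 0.05) -> None:
        self.is_master = is_master
        self.startup_delay = startup_delay
        self.ready = False
        self.stopped = False
        self._bg_task = None

    async def _start(self) -> None:
        await asyncio.sleep(self.startup_delay)  # stands in for launch + readiness polling
        self.ready = True

    async def __aenter__(self):
        if self.is_master:
            # Master: start in the background so the caller can proceed immediately.
            self._bg_task = asyncio.create_task(self._start())
        else:
            # Worker: block until startup has finished.
            await self._start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self._bg_task is not None and not self._bg_task.done():
            self._bg_task.cancel()
            # Wait for the cancellation to land without re-raising it here.
            await asyncio.gather(self._bg_task, return_exceptions=True)
        self.stopped = True  # a real server would terminate its process here
```

With is_master=True the body of the async with block starts running before ready becomes True, so a master-side caller would wait for readiness before sending its first request.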

The module also includes a dependency-free raw HTTP POST helper (_raw_post) using asyncio streams for making inference requests to /v1/completions or /v1/chat/completions endpoints, and a port-finding utility that scans up to 200 ports to find an available one.
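For orientation, a dependency-free HTTP POST over asyncio streams might look like the sketch below. It is not the module's _raw_post: the name raw_post, its parameters, and its return shape are assumptions, and it only handles responses that end when the connection closes (no chunked transfer encoding).

```python
from __future__ import annotations

import asyncio
import json


async def raw_post(host: str, port: int, path: str, payload: dict) -> tuple[int, dict]:
    """POST a JSON payload and return (status_code, parsed_json_body)."""
    body = json.dumps(payload).encode("utf-8")
    request = (
        f"POST {path} HTTP/1.1\r\n"
        f"Host: {host}:{port}\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n"  # ask the server to close, so EOF marks the end
        "\r\n"
    ).encode("ascii") + body

    reader, writer = await asyncio.open_connection(host, port)
    try:
        writer.write(request)
        await writer.drain()
        raw = await reader.read()  # read until EOF
    finally:
        writer.close()
        await writer.wait_closed()

    # Split headers from body, then take the status code from the status line.
    head, _, response_body = raw.partition(b"\r\n\r\n")
    status = int(head.split(b"\r\n", 1)[0].split()[1])
    return status, json.loads(response_body)
```

A call such as await raw_post("127.0.0.1", port, "/v1/completions", {"prompt": "Hello"}) would target the completions endpoint named above; the host and port here are placeholders.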

Usage

This class is not instantiated directly. Subclass it and implement the start_server() and monitor_health() abstract methods to create a concrete inference server implementation.

Code Reference

Source Location

Signature

import asyncio
from abc import ABC, abstractmethod

class InferenceServer(ABC):
    _requires_dependencies = ["httpx"]

    def __init__(self, config: "InferenceConfig", rank: int): ...

    # Abstract methods (must be implemented by subclasses)
    @abstractmethod
    async def start_server(self) -> asyncio.subprocess.Process | None:
        pass

    @abstractmethod
    async def monitor_health(self) -> None:
        pass

    # Public API (bodies elided)
    def get_base_url(self) -> str: ...
    async def is_ready(self) -> bool: ...
    async def make_request(self, payload: dict) -> dict: ...

    # Context manager (bodies elided)
    async def __aenter__(self): ...
    async def __aexit__(self, exc_type, exc_val, exc_tb): ...

Import

from datatrove.pipeline.inference.servers.base import InferenceServer

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| config | InferenceConfig | Yes | Inference configuration with server settings (model, use_chat, server_log_folder, etc.) |
| rank | int | Yes | Rank/ID of this server instance, used for port selection and logging |

Outputs

| Name | Type | Description |
|------|------|-------------|
| make_request response | dict | Parsed JSON response from the inference server |
| Server logs | Log files | Per-rank server log files (if server_log_folder configured) |

Usage Examples

Basic Usage

import asyncio
from datatrove.pipeline.inference.servers.base import InferenceServer

# Concrete implementation example
class MyServer(InferenceServer):
    async def start_server(self):
        # Launch the server binary on the port selected before start_server() runs.
        # Output sent to PIPE must be read; otherwise the child process can
        # block once the OS pipe buffer fills.
        process = await asyncio.create_subprocess_exec(
            "my-server", "--port", str(self._port),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        return process

    async def monitor_health(self):
        # Check readiness every 10 seconds and raise if the check fails.
        while True:
            if not await self.is_ready():
                raise RuntimeError("Server unhealthy")
            await asyncio.sleep(10)

# Usage via context manager
async def run_inference(config, rank):
    server = MyServer(config=config, rank=rank)
    async with server:
        # Payload in the /v1/completions style
        result = await server.make_request({
            "prompt": "Hello, world!",
            "max_tokens": 100,
        })
    return result
