Implementation:Huggingface Datatrove InferenceServer
| Knowledge Sources | |
|---|---|
| Domains | Inference, Distributed_Computing, Server Management |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Wrapper around vLLM, SGLang, and external endpoint inference servers in the datatrove pipeline, providing a unified async context manager interface for server lifecycle management.
Description
The inference server subsystem consists of an abstract base class (InferenceServer) and three concrete implementations:
- VLLMServer: Launches a local vLLM process through the vLLM command-line interface. Supports tensor, data, and pipeline parallelism, multi-node distributed serving via Ray, and file-based compile lock coordination to prevent corruption of the shared compilation cache. Monitors server health by parsing stdout/stderr for startup completion and for error indicators (CUDA OOM, runtime errors, GPU placement failures).
- SGLangServer: Launches a local SGLang process from the command line. Supports multi-node serving through SGLang's native, configurable multi-node options. Detects startup by watching for the "The server is fired up and ready to roll!" log message, and detects fatal errors such as sampling corruption and index errors.
- EndpointServer: Connects to an external OpenAI-compatible API endpoint. No local process is started. For external HTTPS endpoints, it uses a Python client library and classifies errors as retryable or non-retryable. For localhost HTTP endpoints, it falls back to a lightweight raw HTTP POST implementation to avoid unnecessary dependencies.
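The log-based health monitoring described for the local servers can be illustrated with a small classifier over log lines. This is a minimal sketch, not datatrove's implementation: the readiness message is the SGLang one quoted above, while the names LogEvent, classify_log_line, and scan_startup_log and the error patterns are hypothetical choices made for illustration.

```python
import re
from enum import Enum


class LogEvent(Enum):
    READY = "ready"
    FATAL = "fatal"
    OTHER = "other"


# The readiness message is quoted from the description above. The error
# patterns are illustrative assumptions, not datatrove's actual list.
READY_MARKERS = ("The server is fired up and ready to roll!",)
FATAL_PATTERNS = (
    re.compile(r"CUDA out of memory", re.IGNORECASE),
    re.compile(r"\bRuntimeError\b"),
    re.compile(r"\bIndexError\b"),
)


def classify_log_line(line: str) -> LogEvent:
    """Map one stdout/stderr line to a readiness or failure signal."""
    if any(marker in line for marker in READY_MARKERS):
        return LogEvent.READY
    if any(pattern.search(line) for pattern in FATAL_PATTERNS):
        return LogEvent.FATAL
    return LogEvent.OTHER


def scan_startup_log(lines) -> LogEvent:
    """Return the first decisive event, or OTHER if the log ends undecided."""
    for line in lines:
        event = classify_log_line(line)
        if event is not LogEvent.OTHER:
            return event
    return LogEvent.OTHER
```

A real monitor would read lines from the subprocess pipes asynchronously and stop the server on the first fatal event; the classification step stays the same.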
All three implementations share the same async context manager lifecycle: entering the context starts the server (or validates the endpoint), and exiting performs cleanup. The base class provides common functionality including port discovery, health polling, request routing, and server process management.
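The shared lifecycle can be sketched with a toy class that starts, polls for readiness, and cleans up on exit. This is an illustrative stand-in under stated assumptions: ToyServer, its startup_delay parameter, and the events list are hypothetical and no real process is launched; only the method names __aenter__, __aexit__, and is_ready mirror the interface listed on this page.

```python
import asyncio
from typing import List, Optional


class ToyServer:
    """Hypothetical stand-in for an inference server; launches nothing."""

    def __init__(self, startup_delay: float = 0.05):
        self.startup_delay = startup_delay
        self.ready = False
        self.events: List[str] = []
        self._boot_task: Optional[asyncio.Task] = None

    async def _boot(self) -> None:
        # Simulates a server that needs some time before it can serve.
        await asyncio.sleep(self.startup_delay)
        self.ready = True

    async def is_ready(self) -> bool:
        return self.ready

    async def _wait_until_ready(self, timeout: float, interval: float = 0.01) -> None:
        # Health polling: check readiness until it succeeds or time runs out.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while not await self.is_ready():
            if loop.time() >= deadline:
                raise TimeoutError("server did not become ready in time")
            await asyncio.sleep(interval)

    async def __aenter__(self):
        self.events.append("start")
        self._boot_task = asyncio.create_task(self._boot())
        try:
            await self._wait_until_ready(timeout=1.0)
        except BaseException:
            # Clean up even when startup fails, then re-raise.
            await self.__aexit__(None, None, None)
            raise
        self.events.append("ready")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._boot_task is not None and not self._boot_task.done():
            self._boot_task.cancel()
        self.ready = False
        self.events.append("cleanup")


async def demo() -> List[str]:
    server = ToyServer()
    async with server:
        server.events.append("request")
    return server.events
```

Running asyncio.run(demo()) records the order start, ready, request, cleanup, which is the sequence the context manager is meant to guarantee.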
Usage
Use InferenceServer implementations when:
- Synthetic data generation requires a local or remote LLM inference backend
- The pipeline needs automatic server lifecycle management with health monitoring
- Multi-GPU or multi-node inference is required for large models
Code Reference
Source Location
- Repository: huggingface/datatrove
- Base class: src/datatrove/pipeline/inference/servers/base.py:L121-585
- VLLMServer: src/datatrove/pipeline/inference/servers/vllm_server.py:L29-232
- SGLangServer: src/datatrove/pipeline/inference/servers/sglang_server.py:L19-141
- EndpointServer: src/datatrove/pipeline/inference/servers/endpoint_server.py:L12-151
- CompileLock: src/datatrove/pipeline/inference/servers/compile_lock.py:L1-146
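To illustrate the idea of file-based lock coordination mentioned in the Description, the following sketch uses atomic file creation as the lock. It is not the contents of compile_lock.py: the function name file_lock and its timeout and poll_interval parameters are assumptions made for this example.

```python
import os
import tempfile
import time
from contextlib import contextmanager


@contextmanager
def file_lock(path: str, timeout: float = 10.0, poll_interval: float = 0.05):
    """Hold an exclusive lock represented by the existence of `path`."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_EXCL makes creation atomic: only one process can succeed.
            fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"could not acquire lock {path}")
            time.sleep(poll_interval)
    try:
        # Record the owner's PID to help with manual debugging.
        os.write(fd, str(os.getpid()).encode())
        yield
    finally:
        os.close(fd)
        os.remove(path)
```

One limitation of this simple scheme is that a crashed owner leaves a stale lock file behind, so a production lock usually adds stale-lock detection or relies on OS-level advisory locks.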
Signature
class InferenceServer(ABC):
    """Abstract base class for inference servers."""

    _requires_dependencies = ["httpx"]

    def __init__(self, config: "InferenceConfig", rank: int):
        ...

    @abstractmethod
    async def start_server(self) -> asyncio.subprocess.Process | None:
        ...

    @abstractmethod
    async def monitor_health(self) -> None:
        ...

    async def make_request(self, payload: dict) -> dict:
        ...

    async def is_ready(self) -> bool:
        ...

    async def __aenter__(self):
        ...

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        ...


class VLLMServer(InferenceServer):
    def __init__(self, config: "InferenceConfig", rank: int):
        ...


class SGLangServer(InferenceServer):
    def __init__(self, config: "InferenceConfig", rank: int):
        ...


class EndpointServer(InferenceServer):
    def __init__(self, config: "InferenceConfig", rank: int):
        ...
Import
from datatrove.pipeline.inference.servers import VLLMServer, SGLangServer, EndpointServer
from datatrove.pipeline.inference.servers.base import InferenceServer
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | InferenceConfig | Yes | Configuration object containing server type, model path, parallelism settings, and model kwargs |
| rank | int | Yes | Rank of the server instance, used for port selection and logging |
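Since the rank is used for port selection, one way such a scheme could work is to give each rank its own window of candidate ports and take the first one that binds. This is a hedged sketch only: find_free_port, base_port, and max_tries are hypothetical names and values, and datatrove's actual port discovery may differ.

```python
import socket


def find_free_port(rank: int, base_port: int = 3000, max_tries: int = 100) -> int:
    """Return the first bindable port in a window reserved for this rank."""
    # Assumption for illustration: each rank scans a disjoint port window.
    start = base_port + rank * max_tries
    for port in range(start, start + max_tries):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind(("127.0.0.1", port))
            except OSError:
                continue  # Port is in use; try the next one.
            return port
    raise RuntimeError(f"no free port found for rank {rank}")
```

The probe socket is closed before the port is returned, so another process could still claim the port in between; callers should be prepared to retry if the server fails to bind.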
Outputs
| Name | Type | Description |
|---|---|---|
| Running HTTP server | HTTP API | OpenAI-compatible API served on a local port for local servers, or at the configured endpoint URL for external endpoints |
| make_request response | dict | Parsed JSON response in the OpenAI API format, including the choices field read in the usage examples |
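A response in this format can be unpacked with a small helper. The sketch below assumes the standard OpenAI chat completion layout; extract_completion is a hypothetical helper, and the presence of a usage block in datatrove's responses is an assumption rather than something this page confirms.

```python
from typing import Any, Dict, Optional, Tuple


def extract_completion(response: Dict[str, Any]) -> Tuple[str, Optional[int]]:
    """Return the first choice's text and the completion token count, if any."""
    choices = response.get("choices") or []
    if not choices:
        raise ValueError("response contains no choices")
    text = choices[0]["message"]["content"]
    # Assumption: token accounting follows the OpenAI `usage` convention.
    usage = response.get("usage") or {}
    return text, usage.get("completion_tokens")
```

For example, a response without a usage block yields the text together with None for the token count, so callers can treat token accounting as optional.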
Usage Examples
Example: Using VLLMServer as async context manager
import asyncio

from datatrove.pipeline.inference.run_inference import InferenceConfig
from datatrove.pipeline.inference.servers import VLLMServer

config = InferenceConfig(
    server_type="vllm",
    model_name_or_path="meta-llama/Llama-3.1-8B-Instruct",
    model_max_context=8192,
    tp=2,  # tensor parallelism across 2 GPUs
)

async def main():
    # Entering the context starts the vLLM server; exiting cleans it up.
    async with VLLMServer(config, rank=0) as server:
        payload = {
            "model": config.model_name_or_path,
            "messages": [{"role": "user", "content": "Hello!"}],
            "max_tokens": 256,
        }
        response = await server.make_request(payload)
        print(response["choices"][0]["message"]["content"])

asyncio.run(main())
Example: Using EndpointServer with external API
import asyncio

from datatrove.pipeline.inference.run_inference import InferenceConfig
from datatrove.pipeline.inference.servers import EndpointServer

config = InferenceConfig(
    server_type="endpoint",
    model_name_or_path="meta-llama/Llama-3.1-70B-Instruct",
    endpoint_url="https://api-inference.huggingface.co/v1",
    api_key="hf_...",
    request_timeout=60.0,
)

async def main():
    # No local process is started; entering the context validates the endpoint.
    async with EndpointServer(config, rank=0) as server:
        payload = {
            "model": config.model_name_or_path,
            "messages": [{"role": "user", "content": "Summarize this text..."}],
            "temperature": 0.7,
        }
        response = await server.make_request(payload)
        print(response["choices"][0]["message"]["content"])

asyncio.run(main())
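The distinction between retryable and non-retryable errors described for EndpointServer can be illustrated with a generic retry wrapper. This is a sketch under stated assumptions, not the library's logic: RequestError, RETRYABLE_STATUS, and request_with_retries are hypothetical names, and the set of retryable status codes is a common convention rather than datatrove's actual list.

```python
import asyncio

# Assumption: transient conditions worth retrying (timeouts, rate limits,
# server-side failures). Client errors such as 401 are not retried.
RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}


class RequestError(Exception):
    """Hypothetical error type carrying an HTTP status code."""

    def __init__(self, status: int):
        super().__init__(f"request failed with status {status}")
        self.status = status


def is_retryable(status: int) -> bool:
    return status in RETRYABLE_STATUS


async def request_with_retries(send, payload, max_attempts: int = 3, base_delay: float = 0.01):
    """Call `send(payload)`, retrying only retryable failures with backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await send(payload)
        except RequestError as err:
            if not is_retryable(err.status) or attempt == max_attempts:
                raise
            # Exponential backoff: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```

Here send stands for any coroutine function that performs the request, for instance a thin wrapper around server.make_request that converts transport failures into RequestError.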