Implementation:Pytorch Serve TorchModelServiceWorker
| Field | Value |
|---|---|
| Page Type | Implementation (API Doc) |
| Title | TorchModelServiceWorker |
| Implements | Principle:Pytorch_Serve_Distributed_Worker |
| Source | ts/model_service_worker.py |
| Repository | TorchServe |
| Last Updated | 2026-02-13 00:00 GMT |
Overview
TorchModelServiceWorker is the backend worker process that handles model loading and inference in TorchServe. For distributed large model inference, each torchrun-spawned process creates its own TorchModelServiceWorker instance with a rank-specific socket. The worker reads the LOCAL_RANK, WORLD_SIZE, RANK, and LOCAL_WORLD_SIZE environment variables to determine its position in the distributed topology. The critical design feature is that only the rank 0 worker sends inference responses back to the frontend, while all ranks participate in computation.
Description
The TorchModelServiceWorker class manages the lifecycle of a single backend worker process:
1. Initialization: The constructor accepts socket type, name, host, port, and metrics config. For distributed inference, the socket name or port is offset by LOCAL_RANK:
- Unix sockets: `s_name_parts[0] + "." + str(int(s_name_parts[1]) + LOCAL_RANK)`
- TCP sockets: `int(port_num) + LOCAL_RANK`
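The per-rank offset rule above can be illustrated with a small standalone sketch (function names are illustrative, not the exact TorchServe code):

```python
import os

# torchrun sets LOCAL_RANK for each spawned process; 0 if absent
LOCAL_RANK = int(os.getenv("LOCAL_RANK", 0))

def derive_unix_socket_name(s_name: str, local_rank: int) -> str:
    # "/tmp/ts_worker.0" -> "/tmp/ts_worker.<0 + local_rank>"
    base, index = s_name.rsplit(".", 1)
    return base + "." + str(int(index) + local_rank)

def derive_tcp_port(port_num: int, local_rank: int) -> int:
    # base port 9000 -> 9000 + local_rank
    return int(port_num) + local_rank
```

Each torchrun-spawned process therefore listens on its own socket, so the frontend can address every rank individually.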
2. Model Loading: The load_model() method receives a load request from the frontend containing model path, name, handler, GPU ID, batch size, and other parameters. It uses ModelLoaderFactory to load the model and returns a service object.
3. Connection Handling: The handle_connection() method runs a loop that receives commands from the frontend:
- Command `b"I"`: Inference request. Calls `service.predict(msg)` and sends the response only if `LOCAL_RANK == 0`.
- Command `b"L"`: Load model request. Calls `self.load_model(msg)` and sends the result to the frontend.
4. Async Mode: The handle_connection_async() method provides an alternative connection mode that loads the model and then delegates to AsyncService for asynchronous request handling.
5. Server Loop: The run_server() method binds the socket, listens for connections, and dispatches to the appropriate connection handler.
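The server loop can be illustrated with a minimal TCP-only sketch (simplified and hypothetical; the real `run_server()` also supports Unix domain sockets and keeps accepting connections):

```python
import socket

SOCKET_ACCEPT_TIMEOUT = 30.0  # mirrors the module-level constant

def run_server_once(host: str, port: int, handle_connection):
    """Bind a rank-specific TCP port, accept one frontend connection,
    and hand it to the connection handler (illustrative sketch only)."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, port))
    listener.listen(1)
    listener.settimeout(SOCKET_ACCEPT_TIMEOUT)
    cl_socket, _ = listener.accept()
    try:
        handle_connection(cl_socket)
    finally:
        cl_socket.close()
        listener.close()
```

In the real worker, the handler chosen here is either `handle_connection()` or `handle_connection_async()`, depending on the `async_comm` flag.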
Usage
Code Reference
Source Location: ts/model_service_worker.py (lines 30-268)
Signature:
class TorchModelServiceWorker(object):
    """
    Backend worker to handle Model Server's python service code
    """

    def __init__(
        self,
        s_type: Optional[str] = None,
        s_name: Optional[str] = None,
        host_addr: Optional[str] = None,
        port_num: Optional[int] = None,
        metrics_config: Optional[str] = None,
        async_comm: Optional[bool] = False,
    ):
Environment Variables (module-level constants):
LOCAL_RANK = int(os.getenv("LOCAL_RANK", 0))
WORLD_SIZE = int(os.getenv("WORLD_SIZE", 0))
WORLD_RANK = int(os.getenv("RANK", 0))
LOCAL_WORLD_SIZE = int(os.getenv("LOCAL_WORLD_SIZE", 0))
Socket Name Derivation (Unix, line 56):
s_name_new = s_name_parts[0] + "." + str(int(s_name_parts[1]) + LOCAL_RANK)
Port Offset (TCP, line 70):
self.port = int(port_num) + LOCAL_RANK
Rank 0 Response Guard (handle_connection, line 186):
if cmd == b"I":
    if service is not None:
        resp = service.predict(msg)
        if LOCAL_RANK == 0:
            cl_socket.sendall(resp)
        else:
            logging.info("skip sending response at rank %d", LOCAL_RANK)
Import:
from ts.model_service_worker import TorchModelServiceWorker
Module-level Constants:
MAX_FAILURE_THRESHOLD = 5
SOCKET_ACCEPT_TIMEOUT = 30.0
DEBUG = False
BENCHMARK = os.getenv("TS_BENCHMARK") in ["True", "true", "TRUE"]
I/O Contract
Constructor Inputs:
| Parameter | Type | Description |
|---|---|---|
| `s_type` | `Optional[str]` | Socket type: "unix" or "tcp" |
| `s_name` | `Optional[str]` | Socket name (for Unix sockets), e.g., "/tmp/ts_worker.0" |
| `host_addr` | `Optional[str]` | Host address (for TCP sockets), defaults to "127.0.0.1" |
| `port_num` | `Optional[int]` | Base port number (for TCP sockets), offset by LOCAL_RANK |
| `metrics_config` | `Optional[str]` | Path to metrics configuration YAML file |
| `async_comm` | `Optional[bool]` | Whether to use async communication mode (default False) |
load_model() Input (binary protocol message):
| Field | Type | Description |
|---|---|---|
| `command` | `str` | "load" |
| `modelPath` | `str` | Path to model files |
| `modelName` | `str` | Model name |
| `gpu` | `int` or `None` | GPU ID (None for CPU) |
| `handler` | `str` | Handler entry point |
| `envelope` | `str` | Request data wrapper name (optional) |
| `batchSize` | `int` | Batch size |
| `limitMaxImagePixels` | `bool` | Whether to limit PIL max image pixels |
load_model() Output:
- Tuple of `(service, message, code)`:
  - `service`: The loaded model service object (or None on failure)
  - `message`: Status message string
  - `code`: HTTP status code (200 for success, 507 for OOM, 500 for other errors)
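The 200/507/500 convention can be sketched as a simple classifier (illustrative only; the worker's actual error handling lives inside `load_model()`):

```python
from typing import Optional

def load_status_code(exc: Optional[Exception]) -> int:
    # 200 on success; 507 when loading failed for lack of memory;
    # 500 for any other failure (the convention described above)
    if exc is None:
        return 200
    if isinstance(exc, MemoryError) or "out of memory" in str(exc).lower():
        return 507
    return 500
```

Returning 507 (Insufficient Storage) lets the frontend distinguish an out-of-memory failure, which may be retried on a different device, from a generic 500.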
handle_connection() Protocol:
- Receives binary commands via socket: `b"I"` (inference) or `b"L"` (load)
- For inference: calls `service.predict(msg)`, sends response only from rank 0
- For load: calls `self.load_model(msg)`, sends load response from all ranks
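Putting the protocol together, one dispatch step looks roughly like this (a simplified sketch with hypothetical parameter names; the real `handle_connection()` loop also handles timeouts and errors):

```python
import logging

def dispatch(cmd, msg, service, load_model, cl_socket, local_rank):
    """Handle one decoded frontend message (illustrative sketch)."""
    if cmd == b"I":
        resp = service.predict(msg)
        if local_rank == 0:
            cl_socket.sendall(resp)  # only rank 0 replies to the frontend
        else:
            logging.info("skip sending response at rank %d", local_rank)
    elif cmd == b"L":
        service, resp, code = load_model(msg)
        cl_socket.sendall(resp)  # load responses are sent from every rank
    return service
```

Note the asymmetry: load acknowledgements come from every rank so the frontend knows all processes are ready, while inference responses are rank-0 only to avoid duplicate replies.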
Usage Examples
Worker startup (from __main__):
# This is how TorchServe (via torchrun) starts each worker process;
# sock_type, socket_name, host, port, metrics_config, and async_comm
# are unpacked from the parsed arguments
args = ArgParser.model_service_worker_args().parse_args()
worker = TorchModelServiceWorker(
sock_type, # "unix" or "tcp"
socket_name, # e.g., "/tmp/ts_worker.0"
host, # e.g., "127.0.0.1"
port, # e.g., 9000 (will become 9000+LOCAL_RANK)
metrics_config, # path to metrics YAML
async_comm, # True/False
)
worker.run_server()
Example: 4-GPU distributed inference socket layout:
For a model with `nproc-per-node: 4` and base socket name `/tmp/ts_worker.0`:
| Process | LOCAL_RANK | Socket Name | Sends Response? |
|---|---|---|---|
| Worker-rank0 | 0 | /tmp/ts_worker.0 | Yes |
| Worker-rank1 | 1 | /tmp/ts_worker.1 | No (skipped) |
| Worker-rank2 | 2 | /tmp/ts_worker.2 | No (skipped) |
| Worker-rank3 | 3 | /tmp/ts_worker.3 | No (skipped) |
Example: TCP socket layout with base port 9000:
| Process | LOCAL_RANK | Address:Port | Sends Response? |
|---|---|---|---|
| Worker-rank0 | 0 | 127.0.0.1:9000 | Yes |
| Worker-rank1 | 1 | 127.0.0.1:9001 | No (skipped) |
| Worker-rank2 | 2 | 127.0.0.1:9002 | No (skipped) |
| Worker-rank3 | 3 | 127.0.0.1:9003 | No (skipped) |
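Both layouts above follow directly from the LOCAL_RANK offset rule; a quick sketch (hypothetical helper, not TorchServe code) reproduces them:

```python
def socket_layout(nprocs, base_name="/tmp/ts_worker.0", base_port=9000):
    """Per-rank socket layout: (rank, unix_name, tcp_addr, sends_response)."""
    rows = []
    for rank in range(nprocs):
        base, idx = base_name.rsplit(".", 1)
        unix_name = f"{base}.{int(idx) + rank}"
        tcp_addr = f"127.0.0.1:{base_port + rank}"
        sends_response = rank == 0  # only LOCAL_RANK 0 replies to the frontend
        rows.append((rank, unix_name, tcp_addr, sends_response))
    return rows
```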
Related Pages
- Principle:Pytorch_Serve_Distributed_Worker - Theory of distributed worker coordination
- Pytorch_Serve_ParallelType_Config - How parallelType triggers torchrun
- Pytorch_Serve_Parallelism_Model_Config - Torchrun nproc-per-node configuration
- Pytorch_Serve_Send_Intermediate_Response - Streaming with rank-aware sending
- Environment:Pytorch_Serve_Python_PyTorch_Runtime - Core Python/PyTorch runtime for worker processes
- Environment:Pytorch_Serve_Distributed_Training_Environment - Distributed env vars (LOCAL_RANK, WORLD_SIZE, etc.)
- Infrastructure
- Distributed_Computing