Implementation:Mit han lab Llm awq Serve Controller
| Knowledge Sources | |
|---|---|
| Domains | Serving, Infrastructure |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
The Controller class manages distributed model workers, routing inference requests via load balancing and monitoring worker health through periodic heartbeats.
Description
This module implements the centralized controller for the TinyChat distributed serving architecture. The Controller class maintains a registry of active WorkerInfo instances, each recording a worker's model names, speed, queue length, whether its heartbeat is monitored, and the time of its last heartbeat.
Worker dispatch is governed by the DispatchMethod enum, which supports two strategies: LOTTERY (weighted random selection proportional to worker speed) and SHORTEST_QUEUE (selection of the worker with the lowest normalized queue length). A background thread periodically removes stale workers whose last heartbeat is older than the configured CONTROLLER_HEART_BEAT_EXPIRATION threshold.
The controller also exposes a FastAPI application with the following POST endpoints:
- /register_worker: worker registration
- /receive_heart_beat: heartbeat reception
- /list_models: model listing
- /get_worker_address: worker address retrieval
- /worker_generate_stream: streaming generation proxying
- /worker_get_status: aggregated status reporting
The controller can itself act as a worker, which allows hierarchical management of isolated sub-networks.
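The two dispatch strategies described above can be illustrated with a minimal, standalone sketch. It is not the TinyChat implementation: the `Worker` dataclass, `pick_lottery`, and `pick_shortest_queue` are hypothetical names, and the sketch assumes that "normalized queue length" means queue length divided by speed and that the chosen worker's queue is incremented after routing.

```python
import random
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Worker:
    # Hypothetical stand-in for WorkerInfo, reduced to the fields dispatch needs.
    model_names: List[str]
    speed: int
    queue_length: int


def pick_lottery(workers: Dict[str, Worker], model: str,
                 rng: Optional[random.Random] = None) -> str:
    """Weighted random choice: selection probability is proportional to speed."""
    rng = rng or random.Random()
    names = [n for n, w in workers.items() if model in w.model_names]
    weights = [workers[n].speed for n in names]
    if not names or sum(weights) <= 0:
        return ""  # no eligible worker
    return rng.choices(names, weights=weights, k=1)[0]


def pick_shortest_queue(workers: Dict[str, Worker], model: str) -> str:
    """Pick the worker with the smallest queue_length / speed (assumed normalization)."""
    candidates = {n: w for n, w in workers.items()
                  if model in w.model_names and w.speed > 0}
    if not candidates:
        return ""  # no eligible worker
    name = min(candidates,
               key=lambda n: candidates[n].queue_length / candidates[n].speed)
    # Assumption: count the request just routed so later picks see the load.
    candidates[name].queue_length += 1
    return name


workers = {
    "http://localhost:21002": Worker(["model-a"], speed=1, queue_length=4),
    "http://localhost:21003": Worker(["model-a"], speed=2, queue_length=6),
}
chosen = pick_shortest_queue(workers, "model-a")  # 6 / 2 = 3 beats 4 / 1 = 4
```

In this sketch the faster worker wins under SHORTEST_QUEUE even though its raw queue is longer, because the comparison is made on the normalized value. Both functions return an empty string when no worker serves the requested model, mirroring the documented behavior of get_worker_address.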
Usage
Use this module to orchestrate multiple model worker processes, providing a single entry point for clients to discover workers and route inference requests. It is typically launched as a standalone service before any model workers start.
Code Reference
Source Location
- Repository: Mit_han_lab_Llm_awq
- File: tinychat/serve/controller.py
- Lines: 1-326
Signature
class DispatchMethod(Enum):
    LOTTERY = auto()
    SHORTEST_QUEUE = auto()

    @classmethod
    def from_str(cls, name): ...

@dataclasses.dataclass
class WorkerInfo:
    model_names: List[str]
    speed: int
    queue_length: int
    check_heart_beat: bool
    last_heart_beat: str

class Controller:
    def __init__(self, dispatch_method: str): ...
    def register_worker(self, worker_name: str, check_heart_beat: bool, worker_status: dict) -> bool: ...
    def get_worker_status(self, worker_name: str) -> dict: ...
    def remove_worker(self, worker_name: str) -> None: ...
    def refresh_all_workers(self) -> None: ...
    def list_models(self) -> List[str]: ...
    def get_worker_address(self, model_name: str) -> str: ...
    def receive_heart_beat(self, worker_name: str, queue_length: int) -> bool: ...
    def remove_stable_workers_by_expiration(self) -> None: ...
    def worker_api_generate_stream(self, params: dict) -> Generator: ...
    def worker_api_get_status(self) -> dict: ...
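The heartbeat bookkeeping behind receive_heart_beat and the expiration cleanup can be sketched as follows. This is an illustrative approximation under stated assumptions, not the module's code: the `Worker` dataclass and both function names are hypothetical, the timestamp is stored as a Unix float, and the 30-second threshold is a placeholder for the real CONTROLLER_HEART_BEAT_EXPIRATION value.

```python
import time
from dataclasses import dataclass
from typing import Dict, List, Optional

# Placeholder value for illustration only; the real threshold is the
# CONTROLLER_HEART_BEAT_EXPIRATION constant in the TinyChat sources.
HEART_BEAT_EXPIRATION_S = 30.0


@dataclass
class Worker:
    # Hypothetical stand-in for WorkerInfo.
    model_names: List[str]
    check_heart_beat: bool
    last_heart_beat: float  # Unix timestamp (assumption for this sketch)
    queue_length: int = 0


def receive_heart_beat(workers: Dict[str, Worker], name: str,
                       queue_length: int, now: Optional[float] = None) -> bool:
    """Record a heartbeat; return False if the worker is unknown."""
    if name not in workers:
        return False  # the worker would need to register again
    worker = workers[name]
    worker.queue_length = queue_length
    worker.last_heart_beat = time.time() if now is None else now
    return True


def remove_expired(workers: Dict[str, Worker], now: Optional[float] = None,
                   expiration: float = HEART_BEAT_EXPIRATION_S) -> List[str]:
    """Delete monitored workers whose last heartbeat is older than the cutoff."""
    now = time.time() if now is None else now
    cutoff = now - expiration
    expired = [n for n, w in workers.items()
               if w.check_heart_beat and w.last_heart_beat < cutoff]
    for n in expired:
        del workers[n]
    return expired


registry = {
    "w1": Worker(["model-a"], check_heart_beat=True, last_heart_beat=100.0),
    "w2": Worker(["model-a"], check_heart_beat=True, last_heart_beat=100.0),
    "w3": Worker(["model-a"], check_heart_beat=False, last_heart_beat=0.0),
}
known = receive_heart_beat(registry, "w1", queue_length=3, now=150.0)
removed = remove_expired(registry, now=150.0)  # cutoff is 120.0
```

Here only "w2" is removed: "w1" has just reported, and "w3" is exempt because its heartbeat is not monitored. In the real controller this cleanup runs periodically on a background thread.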
Import
from tinychat.serve.controller import Controller
# Or run as a standalone service:
# python -m tinychat.serve.controller --host localhost --port 21001 --dispatch-method shortest_queue
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| dispatch_method | str | Yes | Load balancing strategy: "lottery" or "shortest_queue" |
| worker_name | str | Yes | URL address of the worker (e.g., "http://localhost:21002") |
| check_heart_beat | bool | Yes | Whether the controller should monitor this worker's heartbeat |
| worker_status | dict | No (may be empty) | Dict with keys "model_names", "speed", "queue_length"; when not provided, the controller fetches it from the worker |
| model_name | str | Yes (for get_worker_address) | Name of the model to route a request to |
| queue_length | int | Yes (for receive_heart_beat) | Current queue length reported by the worker |
Outputs
| Name | Type | Description |
|---|---|---|
| worker_address | str | URL of the selected worker, or empty string if none available |
| models | List[str] | Aggregated list of unique model names across all registered workers |
| exist | bool | Whether the worker sending the heartbeat is known to the controller |
| streaming_response | StreamingResponse | Proxied streaming generation output from a worker |
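For clients that talk to the controller over HTTP rather than in-process, the requests for registration and heartbeats might be built as in the sketch below. The JSON field names are an assumption: they simply mirror the method parameters listed in the Inputs table, and the actual request schema should be checked against the FastAPI handlers in the source file. The helper `build_post` is hypothetical, and nothing is sent over the network.

```python
import json
import urllib.request


def build_post(base_url: str, endpoint: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for a controller endpoint."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


CONTROLLER_URL = "http://localhost:21001"  # host and port from the launch example
WORKER_URL = "http://localhost:21002"

# Assumed payload shape: keys mirror register_worker's parameters.
register = build_post(CONTROLLER_URL, "/register_worker", {
    "worker_name": WORKER_URL,
    "check_heart_beat": True,
    "worker_status": {"model_names": ["llava-v1.5-7b-4bit-AWQ"],
                      "speed": 1, "queue_length": 0},
})

# Assumed payload shape: keys mirror receive_heart_beat's parameters.
heartbeat = build_post(CONTROLLER_URL, "/receive_heart_beat", {
    "worker_name": WORKER_URL,
    "queue_length": 0,
})

# To actually send a request once a controller is running:
# urllib.request.urlopen(register, timeout=5)
```

Building the request separately from sending it keeps the payloads easy to inspect and test without a running controller.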
Usage Examples
Launching the Controller
# From the command line:
# python -m tinychat.serve.controller --host localhost --port 21001 --dispatch-method shortest_queue
Programmatic Usage
from tinychat.serve.controller import Controller

controller = Controller(dispatch_method="shortest_queue")

# Register a worker; supplying worker_status means the controller
# does not have to fetch it from the worker
controller.register_worker(
    worker_name="http://localhost:21002",
    check_heart_beat=True,
    worker_status={"model_names": ["llava-v1.5-7b-4bit-AWQ"], "speed": 1, "queue_length": 0},
)

# Get a worker address for a model
addr = controller.get_worker_address("llava-v1.5-7b-4bit-AWQ")
print(addr)  # http://localhost:21002

# List all available models
models = controller.list_models()
print(models)  # ['llava-v1.5-7b-4bit-AWQ']