Implementation:Mlc ai Mlc llm JSON FFI Engine Py
Overview
The JSON FFI Engine module provides a synchronous Python interface to the MLC LLM inference engine via JSON-based Foreign Function Interface (FFI) calls. Located at python/mlc_llm/json_ffi/engine.py, this file defines the JSONFFIEngine class along with several supporting classes: EngineState, BackgroundLoops, Completions, and Chat. The engine implements an OpenAI-compatible chat completions API that communicates with the underlying TVM-based C++ engine through JSON-serialized messages.
Purpose
While the AsyncMLCEngine provides an asynchronous interface for high-concurrency serving, the JSONFFIEngine offers a synchronous, streaming-only interface suitable for simpler use cases, testing, and integration scenarios. All communication with the C++ engine backend happens through JSON strings, hence the "JSON FFI" naming. The module bridges Python's synchronous iterator protocol with the engine's background processing threads.
File Location
python/mlc_llm/json_ffi/engine.py
Imports and Dependencies
import json
import queue
import threading
from typing import Any, Callable, Dict, Iterator, List, Literal, Optional, Union
import tvm
from mlc_llm.protocol import debug_protocol, openai_api_protocol
from mlc_llm.serve import engine_utils
from mlc_llm.serve.engine_base import (
    EngineConfig,
    EngineMetrics,
    _check_engine_config,
    _parse_models,
    _process_model_args,
    _query_engine_metrics,
    detect_device,
)
from mlc_llm.tokenizers import Tokenizer
Key dependencies:
- queue / threading -- Used for synchronous communication between the background engine threads and the Python caller.
- tvm -- The TVM runtime, used to load and invoke the C++ JSON FFI engine module.
- openai_api_protocol -- Defines the OpenAI-compatible request and response data models.
- engine_base -- Provides shared engine initialization utilities.
EngineState Class
The EngineState class manages the synchronous communication channel between the background engine threads and the calling thread.
class EngineState:
    sync_queue: queue.Queue
    def get_request_stream_callback(self) -> Callable[[str], None]:
        def _callback(chat_completion_stream_responses_json_str: str) -> None:
            self._sync_request_stream_callback(chat_completion_stream_responses_json_str)
        return _callback
    def _sync_request_stream_callback(self, chat_completion_stream_responses_json_str: str) -> None:
        self.sync_queue.put_nowait(chat_completion_stream_responses_json_str)
Request Stream Callback
The get_request_stream_callback method returns a callback function that the C++ engine invokes whenever it produces output. The callback places the JSON response string into a queue.Queue using put_nowait (non-blocking).
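The hand-off described above can be sketched without the real engine. In the following minimal example, QueueState and fake_producer are hypothetical stand-ins (they are not part of mlc_llm): a worker thread plays the role of the C++ stream-back loop and pushes JSON strings through the callback, while the main thread reads them from the queue.

```python
import json
import queue
import threading
from typing import Callable


class QueueState:
    """Hypothetical stand-in for EngineState: owns the hand-off queue."""

    def __init__(self) -> None:
        self.sync_queue: queue.Queue = queue.Queue()

    def get_callback(self) -> Callable[[str], None]:
        def _callback(payload_json: str) -> None:
            # put_nowait never blocks on an unbounded queue, so the
            # producer thread is not held up by a slow consumer.
            self.sync_queue.put_nowait(payload_json)

        return _callback


def fake_producer(callback: Callable[[str], None]) -> None:
    """Plays the role of the engine's stream-back thread (an assumption)."""
    for text in ["Hel", "lo"]:
        callback(json.dumps([{"delta": text}]))


state = QueueState()
producer = threading.Thread(target=fake_producer, args=(state.get_callback(),))
producer.start()
producer.join()

# The consumer side: each queue item is one JSON-encoded list of chunks.
received = [json.loads(state.sync_queue.get()) for _ in range(2)]
```

Because the queue is unbounded, the producer side never waits; only the consumer blocks, inside get().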
Chat Completion Handler
The handle_chat_completion method drives a single chat completion request through the engine:
def handle_chat_completion(
    self, ffi: dict, request_json_str: str, include_usage: bool, request_id: str
) -> Iterator[openai_api_protocol.ChatCompletionStreamResponse]:
    self.sync_queue = queue.Queue()
    success = bool(ffi["chat_completion"](request_json_str, request_id))
    try:
        last_chunk_arrived = False
        while not last_chunk_arrived:
            chat_completion_responses_json_str = self.sync_queue.get()
            chat_completion_responses_list = json.loads(chat_completion_responses_json_str)
            for chat_completion_response_json_dict in chat_completion_responses_list:
                chat_completion_response = (
                    openai_api_protocol.ChatCompletionStreamResponse.model_validate(
                        chat_completion_response_json_dict
                    )
                )
                if chat_completion_response.usage is not None:
                    if include_usage:
                        yield chat_completion_response
                    last_chunk_arrived = True
                    break
                yield chat_completion_response
    except Exception as exception:
        ffi["abort"](request_id)
        raise exception
The flow:
- A fresh queue.Queue is created for this request.
- The chat_completion FFI function is called with the serialized request JSON and request ID.
- The method enters a blocking loop, reading chunks from the queue.
- Each JSON string from the queue may contain multiple response objects, which are individually validated and yielded.
- The loop terminates when a response chunk containing usage data arrives (the final chunk).
- If include_usage is True, the final usage chunk is also yielded.
- On exception, the request is aborted via the abort FFI call.
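The termination rule in the flow above can be illustrated in isolation. This is a simplified sketch, not the library's code: drain_until_usage and make_queue are hypothetical names, plain dicts stand in for ChatCompletionStreamResponse objects, and the FFI submit and abort calls are left out.

```python
import json
import queue
from typing import Any, Dict, Iterator


def drain_until_usage(
    sync_queue: "queue.Queue[str]", include_usage: bool
) -> Iterator[Dict[str, Any]]:
    """Yield response dicts until one with a non-null "usage" field arrives."""
    last_chunk_arrived = False
    while not last_chunk_arrived:
        batch = json.loads(sync_queue.get())  # blocks until a batch is queued
        for response in batch:
            if response.get("usage") is not None:
                # The usage chunk ends the stream; it is only surfaced
                # to the caller when include_usage is set.
                if include_usage:
                    yield response
                last_chunk_arrived = True
                break
            yield response


def make_queue() -> "queue.Queue[str]":
    """Build a queue holding two batches; the second ends with a usage chunk."""
    q: "queue.Queue[str]" = queue.Queue()
    q.put(json.dumps([{"delta": "Hel"}, {"delta": "lo"}]))
    q.put(json.dumps([{"delta": "!"}, {"usage": {"completion_tokens": 3}}]))
    return q


without_usage = list(drain_until_usage(make_queue(), include_usage=False))
with_usage = list(drain_until_usage(make_queue(), include_usage=True))
```

With include_usage=False the caller sees only the three content chunks; with include_usage=True the usage chunk is appended as a fourth item.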
BackgroundLoops Class
This class manages two background threads that drive the engine's processing loops.
class BackgroundLoops:
    def __init__(self, ffi: dict):
        self._ffi = ffi
        background_loop = self._ffi["run_background_loop"]
        background_stream_back_loop = self._ffi["run_background_stream_back_loop"]
        self._background_loop_thread: threading.Thread = threading.Thread(target=background_loop)
        self._background_stream_back_loop_thread: threading.Thread = threading.Thread(
            target=background_stream_back_loop
        )
        self._background_loop_thread.start()
        self._background_stream_back_loop_thread.start()
        self._terminated = False
Two threads are started:
- background_loop -- The main engine processing loop that schedules and executes inference steps.
- background_stream_back_loop -- A secondary loop that streams results back to the Python side via the registered callback.
The terminate method signals the C++ engine to exit its loops and joins both threads:
def terminate(self):
    if self._terminated:
        return
    self._terminated = True
    self._ffi["exit_background_loop"]()
    self._background_loop_thread.join()
    self._background_stream_back_loop_thread.join()
The __del__ destructor calls terminate() to ensure cleanup even if the user does not call it explicitly.
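The start, signal, and join lifecycle can be reproduced with pure-Python threads. In this sketch, LoopPair is a hypothetical analogue of BackgroundLoops: a threading.Event stands in for the exit_background_loop FFI call, and each thread simply waits on that event instead of running an engine loop.

```python
import threading


class LoopPair:
    """Hypothetical analogue of BackgroundLoops using pure-Python loops."""

    def __init__(self) -> None:
        self._terminated = False
        self.shutdown_count = 0  # counts shutdowns that actually ran
        self._exit = threading.Event()
        # Each thread blocks until the exit event is set, mimicking a
        # loop that runs until it is told to stop.
        self._threads = [
            threading.Thread(target=self._exit.wait),
            threading.Thread(target=self._exit.wait),
        ]
        for thread in self._threads:
            thread.start()

    def terminate(self) -> None:
        if self._terminated:
            return  # idempotent: later calls are no-ops
        self._terminated = True
        self.shutdown_count += 1
        self._exit.set()  # stand-in for ffi["exit_background_loop"]()
        for thread in self._threads:
            thread.join()

    def __del__(self) -> None:
        self.terminate()


loops = LoopPair()
loops.terminate()
loops.terminate()  # second call returns immediately
alive = [thread.is_alive() for thread in loops._threads]
```

The _terminated guard is what makes it safe for both an explicit terminate() call and the destructor to run on the same object.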
Completions Class
The Completions class provides the create method, implementing the OpenAI-compatible chat completions interface.
class Completions:
    """Completions class to be compatible with OpenAI API"""
    def create(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str = None,
        frequency_penalty: Optional[float] = None,
        presence_penalty: Optional[float] = None,
        logprobs: bool = False,
        top_logprobs: int = 0,
        logit_bias: Optional[Dict[int, float]] = None,
        max_tokens: Optional[int] = None,
        n: int = 1,
        seed: Optional[int] = None,
        stop: Optional[Union[str, List[str]]] = None,
        stream: bool = True,
        stream_options: Optional[Dict[str, Any]] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
        tool_choice: Optional[Union[Literal["none", "auto"], Dict]] = None,
        user: Optional[str] = None,
        response_format: Optional[Dict[str, Any]] = None,
        request_id: Optional[str] = None,
        extra_body: Optional[Dict[str, Any]] = None,
    ) -> Iterator[openai_api_protocol.ChatCompletionStreamResponse]:
Key behavior:
- Stream-only: The stream parameter must be True. If False is passed, a ValueError is raised with the message "JSONFFIEngine only support stream=True".
- Request ID generation: If no request_id is provided, one is generated using engine_utils.random_uuid() with the prefix "chatcmpl-".
- Message validation: Each message dict is validated against openai_api_protocol.ChatCompletionMessage.
- Tool support: The method accepts tools and tool_choice parameters, validated via ChatTool.
- Debug config: An optional debug_config can be passed through extra_body.
- JSON serialization: The request is serialized via model_dump_json(by_alias=True) before being passed to the FFI layer.
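The first two behaviors can be sketched as a small standalone function. prepare_request_id is a hypothetical helper, not part of mlc_llm, and uuid.uuid4().hex is used here only as a stand-in for engine_utils.random_uuid(), whose exact output format is not shown in this document.

```python
import uuid
from typing import Optional


def prepare_request_id(stream: bool, request_id: Optional[str] = None) -> str:
    """Reject non-streaming calls and fill in a request ID when none is given."""
    if not stream:
        # Same message the document quotes for the stream-only check.
        raise ValueError("JSONFFIEngine only support stream=True")
    if request_id is None:
        # Assumption: uuid4 hex stands in for engine_utils.random_uuid().
        request_id = f"chatcmpl-{uuid.uuid4().hex}"
    return request_id


explicit_id = prepare_request_id(True, "my-request")
generated_id = prepare_request_id(True)
```

A caller-supplied ID is passed through unchanged, which makes it possible to correlate a request with a later abort.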
Chat Class
A simple wrapper that exposes the completions attribute, mirroring the OpenAI client structure (client.chat.completions.create(...)).
class Chat:
    """Chat class to be compatible with OpenAI API"""
    completions: Completions
    def __init__(self, ffi: dict, state: EngineState, background_loops: BackgroundLoops):
        self.completions = Completions(ffi, state, background_loops)
JSONFFIEngine Class
The main engine class that initializes the C++ backend and exposes the chat completions API.
Initialization
class JSONFFIEngine:
    chat: Chat
    def __init__(
        self,
        model: str,
        device: Union[str, tvm.runtime.Device] = "auto",
        *,
        model_lib: Optional[str] = None,
        mode: Literal["local", "interactive", "server"] = "local",
        engine_config: Optional[EngineConfig] = None,
    ) -> None:
The initialization process:
- Validate engine config: Calls _check_engine_config to verify configuration consistency.
- Parse models: Uses _parse_models to resolve model paths and library paths, including any additional models (for speculative decoding).
- Detect device: Converts string device specifications (e.g., "auto", "cuda:0") to a tvm.runtime.Device.
- Process model args: Calls _process_model_args to prepare final model arguments.
- Create FFI module: Loads the C++ engine via tvm.get_global_func("mlc.json_ffi.CreateJSONFFIEngine").
- Extract FFI functions: Stores references to nine FFI functions in a dictionary:
self._ffi = {
    key: module[key]
    for key in [
        "init_background_engine",
        "reload",
        "unload",
        "reset",
        "chat_completion",
        "abort",
        "run_background_loop",
        "run_background_stream_back_loop",
        "exit_background_loop",
    ]
}
| FFI Function | Purpose |
|---|---|
| init_background_engine | Initialize the engine with device type, device index, and stream callback |
| reload | Load/reload model configuration (takes JSON config string) |
| unload | Unload the current model |
| reset | Reset engine state |
| chat_completion | Submit a chat completion request |
| abort | Abort a specific request by ID |
| run_background_loop | Main engine processing loop (run in background thread) |
| run_background_stream_back_loop | Stream-back loop (run in background thread) |
| exit_background_loop | Signal both background loops to exit |
After extracting FFI functions, the constructor:
- Creates a Tokenizer from the model path.
- Starts the BackgroundLoops.
- Configures and stores the EngineConfig.
- Calls init_background_engine with the device type, device index, and stream callback.
- Calls reload with the JSON-serialized engine configuration.
- Creates the Chat object.
Public Methods
def metrics(self) -> EngineMetrics:
    """Get the engine metrics."""
    return _query_engine_metrics(self)
def terminate(self):
    """Explicitly terminate the engine"""
    self._background_loops.terminate()
Internal/Test Methods
def _raw_chat_completion(
    self, request_json_str: str, include_usage: bool, request_id: str
) -> Iterator[openai_api_protocol.ChatCompletionStreamResponse]:
    """Raw chat completion API"""
    return self._state.handle_chat_completion(
        self._ffi, request_json_str, include_usage, request_id
    )
def _test_reload(self):
    self._ffi["reload"](self.engine_config.asjson())
def _test_reset(self):
    self._ffi["reset"]()
def _test_unload(self):
    self._ffi["unload"]()
The _raw_chat_completion method provides a lower-level API that accepts a pre-serialized JSON string. The _test_* methods expose engine lifecycle operations for testing purposes.
Architecture Diagram
The following describes the data flow during a chat completion request:
- User calls engine.chat.completions.create(messages=[...])
- Completions.create validates parameters, builds a ChatCompletionRequest, serializes to JSON
- EngineState.handle_chat_completion submits the request via ffi["chat_completion"]
- C++ Engine (running in background thread) processes the request
- Stream-back thread invokes the Python callback, placing JSON chunks into EngineState.sync_queue
- EngineState.handle_chat_completion reads chunks from the queue, deserializes, and yields ChatCompletionStreamResponse objects
- User iterates over the yielded responses
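From the caller's side, the whole flow reduces to iterating over the stream. The sketch below shows one way to collect the streamed text. collect_text and the fake engine are hypothetical; the code assumes each response exposes choices[i].delta.content in the OpenAI streaming layout, which this document does not spell out. The fake engine exists only so that the example runs without a model.

```python
from types import SimpleNamespace
from typing import Any, Iterator, List


def collect_text(engine: Any, prompt: str) -> str:
    """Concatenate the text deltas of one streamed chat completion."""
    pieces: List[str] = []
    for response in engine.chat.completions.create(
        messages=[{"role": "user", "content": prompt}],
        stream=True,  # the engine is stream-only
    ):
        for choice in response.choices:
            # Assumption: text arrives as a string in choice.delta.content.
            if isinstance(choice.delta.content, str):
                pieces.append(choice.delta.content)
    return "".join(pieces)


def _fake_create(*, messages: Any, stream: bool = True) -> Iterator[Any]:
    """Stand-in for Completions.create that yields three canned chunks."""
    for text in ["Hello", ", ", "world"]:
        delta = SimpleNamespace(content=text)
        yield SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


fake_engine = SimpleNamespace(
    chat=SimpleNamespace(completions=SimpleNamespace(create=_fake_create))
)
reply = collect_text(fake_engine, "Say hello")
```

With a real engine, the same function would receive a JSONFFIEngine instance, and the caller would invoke engine.terminate() once finished so that both background threads are joined.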
Relationship to Other Modules
- engine_base (mlc_llm.serve.engine_base) -- Provides shared initialization utilities (_check_engine_config, _parse_models, _process_model_args, detect_device) and the EngineConfig class.
- openai_api_protocol (mlc_llm.protocol.openai_api_protocol) -- Defines the Pydantic models for chat completion requests and streaming responses.
- debug_protocol (mlc_llm.protocol.debug_protocol) -- Provides the DebugConfig model for optional debug configuration.
- engine_utils (mlc_llm.serve.engine_utils) -- Provides random_uuid() for request ID generation.
- Tokenizer (mlc_llm.tokenizers) -- Loaded from the model path and stored as self.tokenizer.
- TVM Runtime -- The C++ JSON FFI engine module is loaded via tvm.get_global_func.