Implementation:Mlc ai Mlc llm JSON FFI Engine Py

From Leeroopedia
Revision as of 15:50, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Mlc_ai_Mlc_llm_JSON_FFI_Engine_Py.md)


Overview

The JSON FFI Engine module provides a synchronous Python interface to the MLC LLM inference engine via JSON-based Foreign Function Interface (FFI) calls. Located at python/mlc_llm/json_ffi/engine.py, this file defines the JSONFFIEngine class along with several supporting classes: EngineState, BackgroundLoops, Completions, and Chat. The engine implements an OpenAI-compatible chat completions API that communicates with the underlying TVM-based C++ engine through JSON-serialized messages.

Purpose

While the AsyncMLCEngine provides an asynchronous interface for high-concurrency serving, the JSONFFIEngine offers a synchronous, streaming-only interface suitable for simpler use cases, testing, and integration scenarios. All communication with the C++ engine backend happens through JSON strings, hence the "JSON FFI" naming. The module bridges Python's synchronous iterator protocol with the engine's background processing threads.

File Location

python/mlc_llm/json_ffi/engine.py

Imports and Dependencies

import json
import queue
import threading
from typing import Any, Callable, Dict, Iterator, List, Literal, Optional, Union

import tvm

from mlc_llm.protocol import debug_protocol, openai_api_protocol
from mlc_llm.serve import engine_utils
from mlc_llm.serve.engine_base import (
    EngineConfig,
    EngineMetrics,
    _check_engine_config,
    _parse_models,
    _process_model_args,
    _query_engine_metrics,
    detect_device,
)
from mlc_llm.tokenizers import Tokenizer

Key dependencies:

  • queue / threading -- Used for synchronous communication between the background engine threads and the Python caller.
  • tvm -- The TVM runtime, used to load and invoke the C++ JSON FFI engine module.
  • openai_api_protocol -- Defines the OpenAI-compatible request and response data models.
  • engine_base -- Provides shared engine initialization utilities.

EngineState Class

The EngineState class manages the synchronous communication channel between the background engine threads and the calling thread.

class EngineState:
    sync_queue: queue.Queue

    def get_request_stream_callback(self) -> Callable[[str], None]:
        def _callback(chat_completion_stream_responses_json_str: str) -> None:
            self._sync_request_stream_callback(chat_completion_stream_responses_json_str)
        return _callback

    def _sync_request_stream_callback(self, chat_completion_stream_responses_json_str: str) -> None:
        self.sync_queue.put_nowait(chat_completion_stream_responses_json_str)

Request Stream Callback

The get_request_stream_callback method returns a callback function that the C++ engine invokes whenever it produces output. The callback places the JSON response string into a queue.Queue using put_nowait, which never blocks; because the queue is created without a size limit, the call also cannot raise queue.Full.
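The callback-to-queue handoff can be tried without the engine. The following is a minimal sketch, assuming a hypothetical producer thread (fake_engine) in place of the C++ stream-back loop and a stand-in class (MiniState) in place of EngineState; only the use of queue.Queue and put_nowait is taken from the source.

```python
import json
import queue
import threading
from typing import Callable


class MiniState:
    """Stand-in for EngineState: owns the queue and hands out a callback."""

    def __init__(self) -> None:
        # Unbounded queue, so put_nowait() never raises queue.Full.
        self.sync_queue: queue.Queue = queue.Queue()

    def get_request_stream_callback(self) -> Callable[[str], None]:
        def _callback(responses_json_str: str) -> None:
            self.sync_queue.put_nowait(responses_json_str)

        return _callback


def fake_engine(callback: Callable[[str], None], n_chunks: int) -> None:
    """Hypothetical producer playing the role of the stream-back thread."""
    for i in range(n_chunks):
        callback(json.dumps([{"chunk": i}]))


state = MiniState()
producer = threading.Thread(
    target=fake_engine, args=(state.get_request_stream_callback(), 3)
)
producer.start()
producer.join()

# The caller's thread drains the queue in FIFO order.
received = [json.loads(state.sync_queue.get()) for _ in range(3)]
```

The producer never waits on the consumer here, which is the property the non-blocking put is meant to give the engine thread.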

Chat Completion Handler

The handle_chat_completion method drives a single chat completion request through the engine:

def handle_chat_completion(
    self, ffi: dict, request_json_str: str, include_usage: bool, request_id: str
) -> Iterator[openai_api_protocol.ChatCompletionStreamResponse]:
    self.sync_queue = queue.Queue()
    success = bool(ffi["chat_completion"](request_json_str, request_id))
    try:
        last_chunk_arrived = False
        while not last_chunk_arrived:
            chat_completion_responses_json_str = self.sync_queue.get()
            chat_completion_responses_list = json.loads(chat_completion_responses_json_str)
            for chat_completion_response_json_dict in chat_completion_responses_list:
                chat_completion_response = (
                    openai_api_protocol.ChatCompletionStreamResponse.model_validate(
                        chat_completion_response_json_dict
                    )
                )
                if chat_completion_response.usage is not None:
                    if include_usage:
                        yield chat_completion_response
                    last_chunk_arrived = True
                    break
                yield chat_completion_response
    except Exception as exception:
        ffi["abort"](request_id)
        raise exception

The flow:

  1. A fresh queue.Queue is created for this request.
  2. The chat_completion FFI function is called with the serialized request JSON and request ID.
  3. The method enters a blocking loop, reading chunks from the queue.
  4. Each JSON string from the queue may contain multiple response objects, which are individually validated and yielded.
  5. The loop terminates when a response chunk containing usage data arrives (the final chunk).
  6. If include_usage is True, the final usage chunk is also yielded.
  7. On any exception, the request is aborted via the abort FFI call and the exception is re-raised to the caller.
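The termination rule in steps 3 to 6 can be sketched in isolation. This is a simplified illustration, assuming plain dicts instead of the validated ChatCompletionStreamResponse objects and a pre-filled queue instead of a live engine; the "text" field and the helper names are hypothetical.

```python
import json
import queue
from typing import Any, Dict, Iterator


def stream_until_usage(
    sync_queue: queue.Queue, include_usage: bool
) -> Iterator[Dict[str, Any]]:
    """Yield response dicts until one carrying usage data is seen."""
    last_chunk_arrived = False
    while not last_chunk_arrived:
        # Blocks until the producer has put another JSON batch.
        batch = json.loads(sync_queue.get())
        for response in batch:
            if response.get("usage") is not None:
                if include_usage:
                    yield response
                last_chunk_arrived = True
                break
            yield response


def make_queue() -> queue.Queue:
    """Pre-fill a queue with two batches; the last item carries usage."""
    q: queue.Queue = queue.Queue()
    q.put(json.dumps([{"text": "Hel"}, {"text": "lo"}]))
    q.put(json.dumps([{"text": "!"}, {"usage": {"completion_tokens": 3}}]))
    return q


without_usage = list(stream_until_usage(make_queue(), include_usage=False))
with_usage = list(stream_until_usage(make_queue(), include_usage=True))
```

With include_usage=False the caller sees only the three content chunks; with include_usage=True the usage chunk is delivered as a fourth and final item.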

BackgroundLoops Class

This class manages two background threads that drive the engine's processing loops.

class BackgroundLoops:
    def __init__(self, ffi: dict):
        self._ffi = ffi
        background_loop = self._ffi["run_background_loop"]
        background_stream_back_loop = self._ffi["run_background_stream_back_loop"]

        self._background_loop_thread: threading.Thread = threading.Thread(target=background_loop)
        self._background_stream_back_loop_thread: threading.Thread = threading.Thread(
            target=background_stream_back_loop
        )
        self._background_loop_thread.start()
        self._background_stream_back_loop_thread.start()
        self._terminated = False

Two threads are started:

  • background_loop -- The main engine processing loop that schedules and executes inference steps.
  • background_stream_back_loop -- A secondary loop that streams results back to the Python side via the registered callback.

The terminate method signals the C++ engine to exit its loops and joins both threads:

def terminate(self):
    if self._terminated:
        return
    self._terminated = True
    self._ffi["exit_background_loop"]()
    self._background_loop_thread.join()
    self._background_stream_back_loop_thread.join()

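The __del__ destructor calls terminate() as a fallback for when the user does not call it explicitly. Because Python does not guarantee when (or, at interpreter exit, whether) __del__ runs, calling terminate() directly is the more dependable way to shut the threads down.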
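The start, signal, and join lifecycle can be reproduced with standard-library pieces only. The sketch below is an assumption-laden stand-in: a threading.Event takes the place of the exit_background_loop FFI call, and both "loops" simply wait on that event rather than doing any work.

```python
import threading


class MiniLoops:
    """Stand-in for BackgroundLoops using a threading.Event as exit signal."""

    def __init__(self) -> None:
        self._exit = threading.Event()
        # Both loops simply wait for the exit signal in this sketch.
        self._threads = [
            threading.Thread(target=self._exit.wait, name="background_loop"),
            threading.Thread(target=self._exit.wait, name="stream_back_loop"),
        ]
        for thread in self._threads:
            thread.start()
        self._terminated = False

    def terminate(self) -> None:
        if self._terminated:
            return  # Second and later calls are no-ops.
        self._terminated = True
        self._exit.set()  # Plays the role of ffi["exit_background_loop"]().
        for thread in self._threads:
            thread.join()

    def __del__(self) -> None:
        self.terminate()


loops = MiniLoops()
alive_before = [t.is_alive() for t in loops._threads]
loops.terminate()
loops.terminate()  # Safe to call twice.
alive_after = [t.is_alive() for t in loops._threads]
```

The _terminated flag is what makes the explicit call and the destructor safe to combine: whichever runs second returns immediately.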

Completions Class

The Completions class provides the create method, implementing the OpenAI-compatible chat completions interface.

class Completions:
    """Completions class to be compatible with OpenAI API"""

    def create(
        self,
        *,
        messages: List[Dict[str, Any]],
        model: str = None,
        frequency_penalty: Optional[float] = None,
        presence_penalty: Optional[float] = None,
        logprobs: bool = False,
        top_logprobs: int = 0,
        logit_bias: Optional[Dict[int, float]] = None,
        max_tokens: Optional[int] = None,
        n: int = 1,
        seed: Optional[int] = None,
        stop: Optional[Union[str, List[str]]] = None,
        stream: bool = True,
        stream_options: Optional[Dict[str, Any]] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
        tool_choice: Optional[Union[Literal["none", "auto"], Dict]] = None,
        user: Optional[str] = None,
        response_format: Optional[Dict[str, Any]] = None,
        request_id: Optional[str] = None,
        extra_body: Optional[Dict[str, Any]] = None,
    ) -> Iterator[openai_api_protocol.ChatCompletionStreamResponse]:

Key behavior:

  • Stream-only: The stream parameter must be True. If False is passed, a ValueError is raised with the message "JSONFFIEngine only support stream=True".
  • Request ID generation: If no request_id is provided, one is generated using engine_utils.random_uuid() with the prefix "chatcmpl-".
  • Message validation: Each message dict is validated against openai_api_protocol.ChatCompletionMessage.
  • Tool support: The method accepts tools and tool_choice parameters, validated via ChatTool.
  • Debug config: An optional debug_config can be passed through extra_body.
  • JSON serialization: The request is serialized via model_dump_json(by_alias=True) before being passed to the FFI layer.
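The first two behaviors in the list above can be sketched as a small guard function. This is an illustration only: prepare_request_id is a hypothetical helper, and uuid.uuid4() stands in for engine_utils.random_uuid(), whose exact output format is not shown in this page.

```python
import uuid
from typing import Optional


def prepare_request_id(stream: bool, request_id: Optional[str] = None) -> str:
    """Hypothetical helper mirroring the stream check and ID defaulting."""
    if not stream:
        raise ValueError("JSONFFIEngine only support stream=True")
    if request_id is None:
        # uuid4 stands in for engine_utils.random_uuid().
        request_id = f"chatcmpl-{uuid.uuid4().hex}"
    return request_id


generated = prepare_request_id(stream=True)
explicit = prepare_request_id(stream=True, request_id="my-id")

try:
    prepare_request_id(stream=False)
    rejected = False
except ValueError:
    rejected = True
```

A caller-supplied request_id is passed through unchanged, which is useful when the same ID must later be handed to abort.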

Chat Class

The Chat class is a simple wrapper that exposes the completions attribute, mirroring the OpenAI client structure (client.chat.completions.create(...)).

class Chat:
    """Chat class to be compatible with OpenAI API"""
    completions: Completions

    def __init__(self, ffi: dict, state: EngineState, background_loops: BackgroundLoops):
        self.completions = Completions(ffi, state, background_loops)

JSONFFIEngine Class

JSONFFIEngine is the main engine class: it initializes the C++ backend and exposes the chat completions API.

Initialization

class JSONFFIEngine:
    chat: Chat

    def __init__(
        self,
        model: str,
        device: Union[str, tvm.runtime.Device] = "auto",
        *,
        model_lib: Optional[str] = None,
        mode: Literal["local", "interactive", "server"] = "local",
        engine_config: Optional[EngineConfig] = None,
    ) -> None:

The initialization process:

  1. Validate engine config: Calls _check_engine_config to verify configuration consistency.
  2. Parse models: Uses _parse_models to resolve model paths and library paths, including any additional models (for speculative decoding).
  3. Detect device: Converts string device specifications (e.g., "auto", "cuda:0") to a tvm.runtime.Device.
  4. Process model args: Calls _process_model_args to prepare final model arguments.
  5. Create FFI module: Loads the C++ engine via tvm.get_global_func("mlc.json_ffi.CreateJSONFFIEngine").
  6. Extract FFI functions: Stores references to nine FFI functions in a dictionary:
self._ffi = {
    key: module[key]
    for key in [
        "init_background_engine",
        "reload",
        "unload",
        "reset",
        "chat_completion",
        "abort",
        "run_background_loop",
        "run_background_stream_back_loop",
        "exit_background_loop",
    ]
}

FFI Function | Purpose
--- | ---
init_background_engine | Initialize the engine with device type, device index, and stream callback
reload | Load/reload model configuration (takes JSON config string)
unload | Unload the current model
reset | Reset engine state
chat_completion | Submit a chat completion request
abort | Abort a specific request by ID
run_background_loop | Main engine processing loop (run in background thread)
run_background_stream_back_loop | Stream-back loop (run in background thread)
exit_background_loop | Signal both background loops to exit

After extracting FFI functions, the constructor:

  • Creates a Tokenizer from the model path.
  • Starts the BackgroundLoops.
  • Configures and stores the EngineConfig.
  • Calls init_background_engine with the device type, device index, and stream callback.
  • Calls reload with the JSON-serialized engine configuration.
  • Creates the Chat object.

Public Methods

def metrics(self) -> EngineMetrics:
    """Get the engine metrics."""
    return _query_engine_metrics(self)

def terminate(self):
    """Explicitly terminate the engine"""
    self._background_loops.terminate()

Internal/Test Methods

def _raw_chat_completion(
    self, request_json_str: str, include_usage: bool, request_id: str
) -> Iterator[openai_api_protocol.ChatCompletionStreamResponse]:
    """Raw chat completion API"""
    return self._state.handle_chat_completion(
        self._ffi, request_json_str, include_usage, request_id
    )

def _test_reload(self):
    self._ffi["reload"](self.engine_config.asjson())

def _test_reset(self):
    self._ffi["reset"]()

def _test_unload(self):
    self._ffi["unload"]()

The _raw_chat_completion method provides a lower-level API that accepts a pre-serialized JSON string. The _test_* methods expose engine lifecycle operations for testing purposes.

Architecture Diagram

The following describes the data flow during a chat completion request:

  1. User calls engine.chat.completions.create(messages=[...])
  2. Completions.create validates parameters, builds a ChatCompletionRequest, serializes to JSON
  3. EngineState.handle_chat_completion submits the request via ffi["chat_completion"]
  4. C++ Engine (running in background thread) processes the request
  5. Stream-back thread invokes the Python callback, placing JSON chunks into EngineState.sync_queue
  6. EngineState.handle_chat_completion reads chunks from the queue, deserializes, and yields ChatCompletionStreamResponse objects
  7. User iterates over the yielded responses
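Step 7 usually amounts to concatenating the text deltas from each yielded response. The sketch below assumes the responses follow the OpenAI streaming layout (response.choices[i].delta.content); the dataclasses are hypothetical stand-ins for ChatCompletionStreamResponse, and fake_stream takes the place of engine.chat.completions.create(...).

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Optional


@dataclass
class Delta:
    content: Optional[str] = None


@dataclass
class Choice:
    delta: Delta = field(default_factory=Delta)


@dataclass
class StreamResponse:
    """Hypothetical stand-in for ChatCompletionStreamResponse."""

    choices: List[Choice] = field(default_factory=list)


def fake_stream() -> Iterator[StreamResponse]:
    """Plays the role of engine.chat.completions.create(...)."""
    for piece in ["The ", "answer ", "is 42."]:
        yield StreamResponse(choices=[Choice(delta=Delta(content=piece))])
    # A trailing chunk without choices, like a usage-only chunk.
    yield StreamResponse(choices=[])


def collect_text(stream: Iterator[StreamResponse]) -> str:
    """Join every non-empty content delta into a single string."""
    parts: List[str] = []
    for response in stream:
        for choice in response.choices:
            if choice.delta.content is not None:
                parts.append(choice.delta.content)
    return "".join(parts)


text = collect_text(fake_stream())
```

Iterating over response.choices rather than indexing choices[0] keeps the loop safe for chunks that carry no choices at all.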

Relationship to Other Modules

  • engine_base (mlc_llm.serve.engine_base) -- Provides shared initialization utilities (_check_engine_config, _parse_models, _process_model_args, detect_device) and the EngineConfig class.
  • openai_api_protocol (mlc_llm.protocol.openai_api_protocol) -- Defines the Pydantic models for chat completion requests and streaming responses.
  • debug_protocol (mlc_llm.protocol.debug_protocol) -- Provides the DebugConfig model for optional debug configuration.
  • engine_utils (mlc_llm.serve.engine_utils) -- Provides random_uuid() for request ID generation.
  • Tokenizer (mlc_llm.tokenizers) -- Loaded from the model path and stored as self.tokenizer.
  • TVM Runtime -- The C++ JSON FFI engine module is loaded via tvm.get_global_func.
