Implementation:Mlc ai Mlc llm Threaded Engine
| Knowledge Sources | |
|---|---|
| Domains | LLM Serving, Concurrency, Engine Architecture |
| Last Updated | 2026-02-09 19:00 GMT |
Overview
A thread-safe engine wrapper that runs the MLC LLM serving engine in a background thread, providing asynchronous request processing and stream callback delivery.
Description
The ThreadedEngine is a concurrency layer that wraps the core MLC LLM engine to enable asynchronous, non-blocking serving. It separates request submission from request processing using a background loop architecture with two dedicated threads.
InstructionKind is an enum defining the types of operations that can be queued: kAddRequest, kAbortRequest, kUnloadEngine, kReloadEngine, kResetEngine, and kDebugCallFuncOnAllAllWorker.
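A minimal C++ sketch of how such an enum might drive dispatch in the background loop. The enumerator names are taken from the list above. The `Instruction` struct, its string payload, and the `Describe()` function are hypothetical stand-ins, not the engine's real queue element type or handler.

```cpp
#include <cassert>
#include <string>

// The enumerator names mirror the instruction kinds listed above.
enum class InstructionKind : int {
  kAddRequest,
  kAbortRequest,
  kUnloadEngine,
  kReloadEngine,
  kResetEngine,
  kDebugCallFuncOnAllAllWorker,
};

// Hypothetical queued instruction: a kind plus a string payload whose meaning
// depends on the kind (request ID, engine config JSON, or function name).
struct Instruction {
  InstructionKind kind;
  std::string payload;
};

// Hypothetical dispatcher: describes the action the background loop would
// take for each kind, with every kind handled in a single switch.
std::string Describe(const Instruction& inst) {
  switch (inst.kind) {
    case InstructionKind::kAddRequest:
      return "add request " + inst.payload;
    case InstructionKind::kAbortRequest:
      return "abort request " + inst.payload;
    case InstructionKind::kUnloadEngine:
      return "unload engine";
    case InstructionKind::kReloadEngine:
      return "reload engine with " + inst.payload;
    case InstructionKind::kResetEngine:
      return "reset engine";
    case InstructionKind::kDebugCallFuncOnAllAllWorker:
      return "debug call " + inst.payload;
  }
  return "unknown instruction";
}
```

Keeping every kind in one `switch` without a `default` case lets GCC and Clang (`-Wswitch`, enabled by `-Wall`) warn when a new kind is added but not handled.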
ThreadedEngineImpl is the main implementation class. It maintains:
- An instruction queue where operations are submitted from the foreground thread.
- A background engine pointer that performs the actual inference work.
- Multiple mutexes and condition variables for thread synchronization.
- Atomic counters for pending operations.
The threading model uses two background loops:
RunBackgroundLoop is the main processing loop. It waits on a condition variable until the engine has work to do (a non-empty request queue), new instructions arrive, or an exit is requested. When woken, it copies the instruction queue under the lock, clears it, releases the lock, and then processes each instruction. After processing the instructions, it calls background_engine_->Step() to advance the engine by one step. Because the lock is held only long enough to take the whole batch, foreground callers submitting instructions are rarely blocked by instruction processing or by engine steps.
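The wait, swap, process, step cycle can be sketched with standard C++ primitives. This is a simplified model under stated assumptions: instructions are plain `std::function<void()>` callables, an integer `pending_steps_` stands in for the engine's non-empty request queue, and all class and member names are hypothetical. The source does not say whether the real loop drains pending work on exit; this sketch does, so that its behavior is deterministic.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical model of RunBackgroundLoop.
class BackgroundLoopSketch {
 public:
  // Foreground side: enqueue one instruction and wake the loop.
  void Submit(std::function<void()> instruction) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(std::move(instruction));
    }
    cv_.notify_one();
  }

  // Request exit. This sketch drains queued work before leaving the loop.
  void Exit() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      exit_now_ = true;
    }
    cv_.notify_one();
  }

  // Called by instructions (on the loop thread) to simulate new engine work.
  void AddEngineWork(int steps) { pending_steps_ += steps; }

  void RunBackgroundLoop() {
    while (true) {
      std::vector<std::function<void()>> local_queue;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        // Sleep until the engine has work, instructions arrive, or exit is requested.
        cv_.wait(lock, [this] { return pending_steps_ > 0 || !queue_.empty() || exit_now_; });
        if (exit_now_ && queue_.empty() && pending_steps_ == 0) return;
        // Take the whole batch with one swap so the lock is held only briefly.
        local_queue.swap(queue_);
      }
      // Process the batch with the lock released.
      for (auto& instruction : local_queue) instruction();
      // Advance the engine by one step (the analogue of Step()).
      if (pending_steps_ > 0) {
        --pending_steps_;
        ++steps_taken_;
      }
    }
  }

  int steps_taken() const { return steps_taken_; }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::vector<std::function<void()>> queue_;
  bool exit_now_ = false;
  int pending_steps_ = 0;  // only modified on the loop thread
  int steps_taken_ = 0;    // only modified on the loop thread
};
```

In real use the loop would run on its own `std::thread`. Swapping the whole queue out under the lock is what keeps the critical section short: instruction handling and the `Step()` analogue both run with the mutex released.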
RunBackgroundStreamBackLoop is a separate thread that handles streaming output callbacks. When the background engine produces output tokens, they are queued into request_stream_callback_inputs_. This loop wakes up, flattens the queued outputs, and invokes the user-provided stream callback function. This separation ensures that slow callbacks do not block the main engine processing loop.
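A similar sketch for the stream-back side, assuming `std::string` in place of `RequestStreamOutput` and a hypothetical `StreamBackSketch` class. The engine thread calls `Push()` with one batch of outputs; the loop swaps out every queued batch, flattens them into a single vector, and invokes the callback only after releasing the lock.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of RunBackgroundStreamBackLoop.
class StreamBackSketch {
 public:
  using Callback = std::function<void(const std::vector<std::string>&)>;

  explicit StreamBackSketch(Callback callback) : callback_(std::move(callback)) {}

  // Engine-thread side: queue one batch of delta outputs.
  void Push(std::vector<std::string> batch) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push_back(std::move(batch));
    }
    cv_.notify_one();
  }

  void Exit() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      exit_now_ = true;
    }
    cv_.notify_one();
  }

  void RunStreamBackLoop() {
    while (true) {
      std::vector<std::vector<std::string>> local;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !pending_.empty() || exit_now_; });
        if (pending_.empty()) return;  // exit requested and nothing left to deliver
        local.swap(pending_);
      }
      // Flatten all queued batches into one array.
      std::vector<std::string> flat;
      for (auto& batch : local)
        for (auto& item : batch) flat.push_back(std::move(item));
      // Runs without the lock, so a slow callback never blocks Push().
      callback_(flat);
    }
  }

 private:
  Callback callback_;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::vector<std::vector<std::string>> pending_;
  bool exit_now_ = false;
};
```

Because `Push()` holds the mutex only long enough to append a batch, a slow callback delays only the stream-back thread, which matches the motivation given above for keeping this loop separate.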
Request submission methods (AddRequest, AbortRequest, Reload, Unload, Reset) all follow the same pattern: acquire the background loop mutex, append an instruction to the queue, increment the pending counter, and notify the background loop when it may be waiting. The Reload and Unload operations additionally use a separate mutex and condition variable to block the caller until the background loop has completed the operation, preventing race conditions during engine lifecycle transitions. Because Reload and Unload wait on the background loop, RunBackgroundLoop must already be running on another thread before either is called.
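The submission pattern, including the blocking handshake used by Reload, can be modeled as follows. This is a hypothetical sketch: string instructions, the `engine_waiting_` flag that decides whether to notify, and the `reload_finished_` flag are illustrative names, and the pending counter is a plain `int` guarded by the mutex rather than an atomic.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>  // callers host RunBackgroundLoop() on a std::thread
#include <vector>

// Hypothetical model of the submission side and the Reload handshake.
class SubmitSketch {
 public:
  // Non-blocking: enqueue, bump the counter, notify only if the loop is asleep.
  void AddRequest(const std::string& request_id) {
    bool need_notify = false;
    {
      std::lock_guard<std::mutex> lock(loop_mutex_);
      queue_.push_back("add:" + request_id);
      ++pending_cnt_;
      need_notify = engine_waiting_;
    }
    if (need_notify) loop_cv_.notify_one();
  }

  // Blocking: enqueue a reload, then wait on a separate mutex/condition
  // variable until the background loop reports the reload as done.
  void Reload(const std::string& config_json) {
    {
      std::lock_guard<std::mutex> lock(reload_mutex_);
      reload_finished_ = false;
    }
    {
      std::lock_guard<std::mutex> lock(loop_mutex_);
      queue_.push_back("reload:" + config_json);
      ++pending_cnt_;
    }
    loop_cv_.notify_one();
    std::unique_lock<std::mutex> lock(reload_mutex_);
    reload_cv_.wait(lock, [this] { return reload_finished_; });
  }

  void Exit() {
    {
      std::lock_guard<std::mutex> lock(loop_mutex_);
      exit_now_ = true;
    }
    loop_cv_.notify_one();
  }

  void RunBackgroundLoop() {
    while (true) {
      std::vector<std::string> local;
      {
        std::unique_lock<std::mutex> lock(loop_mutex_);
        engine_waiting_ = true;
        loop_cv_.wait(lock, [this] { return pending_cnt_ > 0 || exit_now_; });
        engine_waiting_ = false;
        if (pending_cnt_ == 0) return;  // nothing pending, so Exit() woke us
        local.swap(queue_);
        pending_cnt_ = 0;
      }
      for (const std::string& inst : local) {
        processed_.push_back(inst);
        if (inst.rfind("reload:", 0) == 0) {
          {
            std::lock_guard<std::mutex> lock(reload_mutex_);
            reload_finished_ = true;
          }
          reload_cv_.notify_all();  // unblock the caller waiting in Reload()
        }
      }
    }
  }

  const std::vector<std::string>& processed() const { return processed_; }

 private:
  std::mutex loop_mutex_;
  std::condition_variable loop_cv_;
  std::vector<std::string> queue_;
  int pending_cnt_ = 0;
  bool engine_waiting_ = false;
  bool exit_now_ = false;
  std::mutex reload_mutex_;
  std::condition_variable reload_cv_;
  bool reload_finished_ = false;
  std::vector<std::string> processed_;  // written only by the loop thread
};
```

In this sketch, `Reload()` returns only after `RunBackgroundLoop()` has processed the reload instruction, so calling it before the loop thread starts would block forever. Notifying only when `engine_waiting_` is set is safe because the flag is read and written under the same mutex as the queue: a busy loop sees `pending_cnt_ > 0` the next time it evaluates the wait predicate.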
ThreadedEngineModule wraps the implementation as a TVM Module, exposing all methods through TVM's module vtable system for use from Python via the FFI boundary.
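The idea behind exposing methods by name can be illustrated with a plain C++ lookup table. This is not the TVM Module API: `ModuleSketch`, `Register()`, and the `"add_request"` name are hypothetical, and real TVM modules resolve and invoke functions through their own packed-function machinery. The sketch only shows the pattern a Python FFI caller relies on: look a method up by string, then call it.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical name-to-function table standing in for a module's method lookup.
class ModuleSketch {
 public:
  using Packed = std::function<std::string(const std::string&)>;

  void Register(const std::string& name, Packed fn) { table_[name] = std::move(fn); }

  // Returns an empty std::function when the name is unknown.
  Packed GetFunction(const std::string& name) const {
    auto it = table_.find(name);
    return it == table_.end() ? Packed() : it->second;
  }

 private:
  std::unordered_map<std::string, Packed> table_;
};
```

A caller that only knows method names (as a foreign-language binding does) can then check whether a lookup succeeded before invoking it.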
Usage
The ThreadedEngine is the primary entry point used by the Python-side serving infrastructure. It allows the Python layer to submit requests and receive streaming results without blocking, while all heavy computation occurs on a dedicated background thread. It is created via ThreadedEngine::Create() or the TVM FFI function mlc.serve.create_threaded_engine.
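The `Create()` factory follows a common C++ pattern: callers see only an abstract interface and receive a `std::unique_ptr` to it, while the concrete subclass (the analogue of `ThreadedEngineImpl`) stays private to the implementation file. The sketch below uses hypothetical `EngineInterface` and `EngineImpl` classes to show the shape of that pattern; it is not the MLC LLM code.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Abstract interface visible to callers (hypothetical, minimal).
class EngineInterface {
 public:
  virtual ~EngineInterface() = default;
  virtual std::string Name() const = 0;
  // The factory hides the concrete implementation type from callers.
  static std::unique_ptr<EngineInterface> Create();
};

// Concrete implementation; in a real project this would live only in the .cc file.
class EngineImpl : public EngineInterface {
 public:
  std::string Name() const final { return "EngineImpl"; }
};

std::unique_ptr<EngineInterface> EngineInterface::Create() {
  return std::make_unique<EngineImpl>();
}
```

Returning `std::unique_ptr` makes ownership explicit, and the virtual destructor ensures the implementation is destroyed correctly through the interface pointer.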
Code Reference
Source Location
- Repository: Mlc_ai_Mlc_llm
- File: cpp/serve/threaded_engine.cc
Signature
```cpp
class ThreadedEngineImpl : public ThreadedEngine {
 public:
  void InitThreadedEngine(Device device, Optional<Function> request_stream_callback,
                          Optional<EventTraceRecorder> trace_recorder) final;
  void Reload(String engine_config_json_str) final;
  void Unload() final;
  void Reset() final;
  void AddRequest(Request request) final;
  void AbortRequest(const String& request_id) final;
  void RunBackgroundLoop() final;
  void RunBackgroundStreamBackLoop() final;
  void ExitBackgroundLoop() final;
  GenerationConfig GetDefaultGenerationConfig() const final;
  EngineConfig GetCompleteEngineConfig() const final;
  void DebugCallFuncOnAllAllWorker(const String& func_name, Optional<String> func_args) final;
};

// Factory function, declared inside class ThreadedEngine as:
//   static std::unique_ptr<ThreadedEngine> Create();
// Also reachable through the TVM FFI global "mlc.serve.create_threaded_engine".
```
Import
```cpp
#include "threaded_engine.h"
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| device | Device | Yes | The TVM device (e.g., GPU) to run models on. |
| request_stream_callback | Optional<Function> | Yes | Callback function invoked when streaming output is available. |
| trace_recorder | Optional<EventTraceRecorder> | No | Optional event trace recorder for profiling. |
| engine_config_json_str | String | For Reload | JSON string containing the engine configuration. |
| request | Request | For AddRequest | A user request to process. |
| request_id | String | For AbortRequest | The ID of the request to abort. |
Outputs
| Name | Type | Description |
|---|---|---|
| Stream callback invocation | Array<RequestStreamOutput> | Delta outputs streamed to the user via the callback function. |
| GetDefaultGenerationConfig() | GenerationConfig | The default generation configuration after engine reload. |
| GetCompleteEngineConfig() | EngineConfig | The complete engine configuration after engine reload. |
Usage Examples
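```cpp
#include <memory>
#include <thread>

#include "threaded_engine.h"

// `device`, `request_stream_callback`, `trace_recorder`, `engine_config_json`,
// and `request` are assumed to be constructed elsewhere.

// Create a threaded engine
std::unique_ptr<ThreadedEngine> engine = ThreadedEngine::Create();
// Initialize with device and callback
engine->InitThreadedEngine(device, request_stream_callback, trace_recorder);
// Start both background loops on dedicated threads before calling Reload:
// Reload blocks until the background loop has processed it.
std::thread bg_thread([&engine] { engine->RunBackgroundLoop(); });
std::thread cb_thread([&engine] { engine->RunBackgroundStreamBackLoop(); });
// Load the engine with configuration (blocks until the reload completes)
engine->Reload(engine_config_json);
// Submit a request (non-blocking)
engine->AddRequest(request);
// Abort a request by its ID
engine->AbortRequest("request_123");
// When done, signal the loops to exit and wait for both threads
engine->ExitBackgroundLoop();
bg_thread.join();
cb_thread.join();
```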