Implementation:Turboderp org Exllamav2 ExLlamaV2DynamicGeneratorAsync
| Knowledge Sources | |
|---|---|
| Domains | Text_Generation, Async |
| Last Updated | 2026-02-15 00:00 GMT |
Overview
ExLlamaV2DynamicGeneratorAsync and ExLlamaV2DynamicJobAsync provide asyncio wrappers around the synchronous dynamic generator and job classes, enabling non-blocking text generation in async server contexts.
Description
The ExLlamaV2DynamicGeneratorAsync class wraps ExLlamaV2DynamicGenerator with an asyncio-based iteration loop. It manages a dictionary mapping synchronous jobs to their async counterparts, an asyncio.Condition variable for coordinating the iteration loop, and a background asyncio.Task that continuously calls the underlying generator's iterate() method.
The iteration task (_run_iteration) waits on the condition variable until at least one job is enqueued, then calls generator.iterate() to process all active jobs. Results are dispatched to the corresponding ExLlamaV2DynamicJobAsync instances via their result queues. When a result signals EOS, the job is removed from the active tracking dictionary. If the generator raises an exception, the error is pushed to all active async jobs.
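The loop described above can be approximated with a small self-contained sketch. It is not the library's code: `FakeSyncGenerator`, `LoopSketch`, `submit` and the string job IDs are hypothetical stand-ins. Only the overall pattern follows the description: wait on a condition until work exists, call `iterate()`, route each result to a per-job queue, drop the job on EOS, and broadcast exceptions to all active jobs.

```python
import asyncio


class FakeSyncGenerator:
    """Hypothetical stand-in for the synchronous generator. Each iterate()
    call emits one result per active job and flags EOS after `steps` calls."""

    def __init__(self, steps: int = 3):
        self.steps = steps
        self.remaining = {}

    def enqueue(self, job_id):
        self.remaining[job_id] = self.steps

    def iterate(self):
        results = []
        for job_id in list(self.remaining):
            self.remaining[job_id] -= 1
            eos = self.remaining[job_id] == 0
            results.append({"job": job_id, "text": "x", "eos": eos})
            if eos:
                del self.remaining[job_id]
        return results


class LoopSketch:
    """Illustrative wrapper; must be constructed inside a running event loop
    because asyncio.create_task() needs one."""

    def __init__(self):
        self.generator = FakeSyncGenerator()
        self.queues = {}  # job id -> asyncio.Queue of results
        self.condition = asyncio.Condition()
        self.task = asyncio.create_task(self._run())

    async def _run(self):
        try:
            while True:
                # Sleep until at least one job is being tracked.
                async with self.condition:
                    await self.condition.wait_for(lambda: len(self.queues) > 0)
                for result in self.generator.iterate():
                    job_id = result["job"]
                    await self.queues[job_id].put(result)
                    if result["eos"]:
                        del self.queues[job_id]
                # Yield to the event loop so consumers can drain their queues.
                await asyncio.sleep(0)
        except asyncio.CancelledError:
            return
        except Exception as exc:
            # A generator failure is not tied to one job, so notify all of them.
            for queue in self.queues.values():
                await queue.put(exc)

    async def submit(self, job_id):
        queue = asyncio.Queue()
        self.queues[job_id] = queue
        self.generator.enqueue(job_id)
        async with self.condition:
            self.condition.notify_all()
        return queue

    async def close(self):
        self.task.cancel()
        await asyncio.gather(self.task, return_exceptions=True)
```

The `await asyncio.sleep(0)` between iterations is a design choice in this sketch: `iterate()` is synchronous, so without an explicit yield point the consumers would never get a chance to run.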
ExLlamaV2DynamicJobAsync wraps ExLlamaV2DynamicJob and supports async iteration through __aiter__. Each instance maintains an asyncio.Queue that receives results from the generator loop, and enqueues itself on the generator during construction. Iteration yields result dictionaries until EOS is received or the job is cancelled.
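A queue-backed job of this kind might look roughly like the sketch below. `QueueBackedJob` is a hypothetical name, and re-raising an exception object found in the queue is an assumption about how pushed errors would surface to the consumer; the page only states that errors are pushed to the jobs.

```python
import asyncio


class QueueBackedJob:
    """Hypothetical sketch of a job consumed with `async for`."""

    def __init__(self):
        self.queue = asyncio.Queue()
        self.cancelled = False

    async def put_result(self, result):
        # Called by the producer (the generator loop in the real wrapper).
        await self.queue.put(result)

    async def __aiter__(self):
        # Declaring __aiter__ as an async generator makes the object usable
        # directly in `async for`.
        while not self.cancelled:
            result = await self.queue.get()
            if isinstance(result, Exception):
                # Assumption: errors pushed into the queue are re-raised here.
                raise result
            yield result
            if result["eos"]:
                break

    async def cancel(self):
        self.cancelled = True
```

Because the consumer awaits `queue.get()`, it suspends without blocking the event loop while no result is available.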
Usage
Use these async wrappers when integrating ExLlamaV2 text generation into an asyncio-based server (e.g., FastAPI, aiohttp). Create an ExLlamaV2DynamicGeneratorAsync at startup, then create ExLlamaV2DynamicJobAsync instances for each generation request and iterate over them with async for.
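As a rough illustration of the per-request pattern, the sketch below consumes several jobs concurrently. `handle_request`, `handle_many` and the `make_job` factory are hypothetical names. In a real server `make_job` would presumably build an ExLlamaV2DynamicJobAsync from the prompt; here it is injected so the example does not depend on a loaded model.

```python
import asyncio


async def handle_request(make_job, prompt: str) -> str:
    """Consume one job to completion and return the concatenated text."""
    job = make_job(prompt)  # hypothetical factory returning an async-iterable job
    chunks = []
    async for result in job:
        # Not every result is assumed to carry a text chunk.
        chunks.append(result.get("text", ""))
    return "".join(chunks)


async def handle_many(make_job, prompts):
    """Serve several prompts concurrently; results keep the order of `prompts`."""
    return await asyncio.gather(*(handle_request(make_job, p) for p in prompts))
```

Each request only awaits its own job, so slow generations do not block handlers for other requests.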
Code Reference
Source Location
- Repository: Turboderp_org_Exllamav2
- File: exllamav2/generator/dynamic_async.py
- Lines: 1-104
Signature
class ExLlamaV2DynamicGeneratorAsync:
    generator: ExLlamaV2DynamicGenerator
    jobs: dict[ExLlamaV2DynamicJob: ExLlamaV2DynamicJobAsync]
    condition: asyncio.Condition
    iteration_task: asyncio.Task

    def __init__(self, *args, **kwargs): ...
    async def _run_iteration(self): ...
    def enqueue(self, job: ExLlamaV2DynamicJobAsync): ...
    async def close(self): ...
    async def cancel(self, job: ExLlamaV2DynamicJobAsync): ...

class ExLlamaV2DynamicJobAsync:
    job: ExLlamaV2DynamicJob
    queue: asyncio.Queue
    generator: ExLlamaV2DynamicGeneratorAsync
    cancelled: bool = False

    def __init__(
        self,
        generator: ExLlamaV2DynamicGeneratorAsync,
        *args: object,
        **kwargs: object,
    ): ...
    async def put_result(self, result): ...
    async def __aiter__(self): ...
    async def cancel(self): ...
Import
from exllamav2.generator.dynamic_async import ExLlamaV2DynamicGeneratorAsync, ExLlamaV2DynamicJobAsync
I/O Contract
ExLlamaV2DynamicGeneratorAsync.__init__()
| Parameter | Type | Description |
|---|---|---|
| *args, **kwargs | any | All arguments are forwarded directly to the ExLlamaV2DynamicGenerator constructor (model, cache, tokenizer, etc.) |
ExLlamaV2DynamicGeneratorAsync.enqueue()
| Parameter | Type | Description |
|---|---|---|
| job | ExLlamaV2DynamicJobAsync | Async job wrapper to enqueue; must not already be enqueued |
ExLlamaV2DynamicJobAsync.__init__()
| Parameter | Type | Description |
|---|---|---|
| generator | ExLlamaV2DynamicGeneratorAsync | The async generator to attach this job to |
| *args, **kwargs | any | Forwarded to the ExLlamaV2DynamicJob constructor (input_ids, gen_settings, etc.) |
ExLlamaV2DynamicJobAsync.__aiter__() yields
| Field | Type | Description |
|---|---|---|
| result | dict | Result dictionary from the generator containing text chunks, EOS status, and other metadata |
| result["eos"] | bool | True when generation is complete; iteration stops after this |
Usage Examples
import asyncio

from exllamav2.generator.dynamic_async import (
    ExLlamaV2DynamicGeneratorAsync,
    ExLlamaV2DynamicJobAsync,
)

# model, cache, tokenizer and gen_settings are assumed to be loaded elsewhere.

# Create and iterate over an async generation job
async def generate(async_gen, prompt: str):
    input_ids = tokenizer.encode(prompt)
    job = ExLlamaV2DynamicJobAsync(
        async_gen,
        input_ids=input_ids,
        max_new_tokens=200,
        gen_settings=gen_settings,
    )
    text = ""
    async for result in job:
        if "text" in result:
            text += result["text"]
            print(result["text"], end="", flush=True)
    return text

async def main():
    # Initialize the async generator (wraps ExLlamaV2DynamicGenerator).
    # It is constructed inside a coroutine in case the background iteration
    # task is started on construction, which requires a running event loop.
    async_gen = ExLlamaV2DynamicGeneratorAsync(
        model=model,
        cache=cache,
        tokenizer=tokenizer,
    )
    try:
        return await generate(async_gen, "Once upon a time")
    finally:
        # Shut down the generator cleanly
        await async_gen.close()

# Run the generation
result = asyncio.run(main())

# Cancel a running job (from a coroutine that holds a reference to it)
# await job.cancel()
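One way to use cancellation is to stop consuming early and then cancel the job, for example when a client only wants a bounded number of chunks. The helper below is a hedged sketch: `consume_up_to` and `max_chunks` are hypothetical, and it relies only on the job being async-iterable and exposing an awaitable `cancel()`. Since this page does not say whether cancelling an already finished job is safe, the sketch cancels only jobs that have not reported EOS.

```python
import asyncio


async def consume_up_to(job, max_chunks: int) -> str:
    """Read at most `max_chunks` results, then cancel the job if unfinished."""
    chunks = []
    finished = False
    async for result in job:
        chunks.append(result.get("text", ""))
        if result.get("eos"):
            finished = True
            break
        if len(chunks) >= max_chunks:
            break
    if not finished:
        # Assumption: cancel() is only called while the job is still active.
        await job.cancel()
    return "".join(chunks)
```

Cancelling the abandoned job matters in this pattern because otherwise the generator would presumably keep producing tokens that nobody reads.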
Related Pages
- Turboderp_org_Exllamav2_ExLlamaV2WebSocketServer - WebSocket server that can use async generation for streaming
- Turboderp_org_Exllamav2_WebSocket_Actions - Action handlers that implement streaming inference