Implementation:Infiniflow Ragflow Canvas Run
| Knowledge Sources | |
|---|---|
| Domains | RAG, Agent_Systems |
| Last Updated | 2026-02-12 06:00 GMT |
Overview
Canvas.run, a method of RAGFlow's Canvas class, executes an agent workflow as an async generator and streams the results as SSE events.
Description
Canvas.run is the main execution method. It traverses the DSL graph, instantiates component objects, invokes each component's _run method, handles streaming output from LLM components, manages branching and iteration control flow, and yields SSE events. The Canvas class extends the Graph base class, which handles DSL parsing and component management.
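The general pattern described above (walk the graph, run each component, yield an event per lifecycle step) can be illustrated with a toy async generator. This is only a sketch under simplifying assumptions, not RAGFlow's implementation: `ToyComponent`, `toy_run`, `collect`, and the `invoke` method are hypothetical names, the graph is reduced to a linear path, and branching, iteration, and LLM streaming are left out. The `{"event": ..., "data": ...}` envelope follows the shape used in the usage example on this page.

```python
import asyncio
from typing import AsyncGenerator, Callable


class ToyComponent:
    """Hypothetical stand-in for a workflow component (not a RAGFlow class)."""

    def __init__(self, component_id: str, fn: Callable[[dict], dict]):
        self.component_id = component_id
        self.fn = fn

    async def invoke(self, inputs: dict) -> dict:
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        return self.fn(inputs)


async def toy_run(path: list, **kwargs) -> AsyncGenerator[dict, None]:
    """Walk a linear path of components, yielding one event per lifecycle step."""
    yield {"event": "workflow_started", "data": {"inputs": kwargs}}
    state = dict(kwargs)
    for comp in path:
        cid = comp.component_id
        yield {"event": "node_started", "data": {"component_id": cid}}
        try:
            outputs = await comp.invoke(state)
        except Exception as exc:
            # Report the failure as an event and stop the workflow.
            yield {"event": "node_error", "data": {"component_id": cid, "error": str(exc)}}
            return
        state.update(outputs)
        yield {"event": "node_finished", "data": {"component_id": cid, "outputs": outputs}}
    yield {"event": "workflow_finished", "data": {"inputs": kwargs, "outputs": state}}


async def collect(path: list, **kwargs) -> list:
    """Drain the generator into a list (convenient for tests and debugging)."""
    return [event async for event in toy_run(path, **kwargs)]
```

Because the generator yields as it goes, a caller sees `node_started` before the component has finished, which is what makes progressive streaming to a client possible.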
Usage
Called from the POST /canvas/completion endpoint. Also used by POST /canvas/debug for single-component testing.
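For orientation, an HTTP endpoint that streams these events typically serializes each dict as one Server-Sent Events frame. The sketch below shows that framing using only the standard library. The helper names `to_sse_frame` and `sse_stream` are hypothetical, and the exact framing used by RAGFlow's endpoints is not documented here, so treat this as an assumption about the general SSE wire format rather than a description of the actual handler.

```python
import json
from typing import AsyncIterator


def to_sse_frame(event: dict) -> str:
    """Serialize one event dict as a single SSE 'data:' frame.

    json.dumps escapes newlines inside strings, so the payload always
    fits on one 'data:' line; the blank line terminates the frame.
    """
    payload = json.dumps(event, ensure_ascii=False)
    return f"data: {payload}\n\n"


async def sse_stream(events: AsyncIterator[dict]) -> AsyncIterator[str]:
    """Adapt a stream of event dicts into a stream of SSE text frames."""
    async for event in events:
        yield to_sse_frame(event)
```

A web framework's streaming response can then consume `sse_stream(...)` directly, with the response content type set to `text/event-stream`.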
Code Reference
Source Location
- Repository: ragflow
- File: agent/canvas.py
- Lines: L369-649
Signature
class Canvas(Graph):
    async def run(self, **kwargs) -> AsyncGenerator[dict, None]:
        """Execute the agent workflow.
        Args:
            query: str - User question.
            files: list - Uploaded files.
            inputs: dict - Custom input variables.
            user_id: str - User ID.
        Yields:
            dict - SSE events:
                workflow_started: {inputs}
                node_started: {component_id, label, name}
                node_finished: {component_id, outputs}
                message: {content, audio_binary?}
                message_end: {reference?, status?, attachment?}
                user_inputs: {inputs, tips}
                workflow_finished: {inputs, outputs}
                node_error: {component_id, error}
        """
Import
from agent.canvas import Canvas, Graph
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| query | str | No | User question |
| files | list | No | Uploaded files |
| inputs | dict | No | Custom variables |
| user_id | str | No | User ID |
Outputs
| Name | Type | Description |
|---|---|---|
| SSE events | AsyncGenerator[dict, None] | Stream of workflow execution events |
Usage Examples
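import asyncio
import json
from agent.canvas import Canvas

async def main():
    # dsl_json, tenant_id, and canvas_id are assumed to be supplied by the caller.
    canvas = Canvas(dsl_json, tenant_id, canvas_id=canvas_id)
    # run() is an async generator, so it must be consumed inside a coroutine.
    async for event in canvas.run(query="Analyze this data", user_id="user-123"):
        print(f"Event: {event['event']} - {json.dumps(event['data'])[:100]}")

asyncio.run(main())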