Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Infiniflow Ragflow Canvas Run

From Leeroopedia
Knowledge Sources
Domains RAG, Agent_Systems
Last Updated 2026-02-12 06:00 GMT

Overview

Concrete tool for executing agent workflows as async generators with SSE event streaming provided by RAGFlow's Canvas class.

Description

Canvas.run is the main execution method that traverses the DSL graph, instantiates component objects, invokes each component's _run method, handles streaming from LLM components, manages branching/iteration control flow, and yields SSE events. It extends the Graph base class which handles DSL parsing and component management.

Usage

Called from the POST /canvas/completion endpoint. Also used by POST /canvas/debug for single-component testing.

Code Reference

Source Location

  • Repository: ragflow
  • File: agent/canvas.py
  • Lines: L369-649

Signature

class Canvas(Graph):
    async def run(self, **kwargs) -> AsyncGenerator[dict, None]:
        """Execute the agent workflow.

        Args:
            query: str - User question.
            files: list - Uploaded files.
            inputs: dict - Custom input variables.
            user_id: str - User ID.

        Yields:
            dict - SSE events:
                workflow_started: {inputs}
                node_started: {component_id, label, name}
                node_finished: {component_id, outputs}
                message: {content, audio_binary?}
                message_end: {reference?, status?, attachment?}
                user_inputs: {inputs, tips}
                workflow_finished: {inputs, outputs}
                node_error: {component_id, error}
        """

Import

from agent.canvas import Canvas, Graph

I/O Contract

Inputs

Name Type Required Description
query str No User question
files list No Uploaded files
inputs dict No Custom variables
user_id str No User ID

Outputs

Name Type Description
SSE events AsyncGenerator[dict] Stream of workflow execution events

Usage Examples

from agent.canvas import Canvas
import json

canvas = Canvas(dsl_json, tenant_id, canvas_id=canvas_id)

async for event in canvas.run(query="Analyze this data", user_id="user-123"):
    print(f"Event: {event['event']} - {json.dumps(event['data'])[:100]}")

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment