Implementation:CrewAIInc CrewAI Flow Kickoff And Plot

From Leeroopedia

Overview

Flow Kickoff and Plot provides the concrete methods for executing flows (kickoff(), kickoff_async()) and generating interactive visualizations (plot()) in the CrewAI Flow engine. These are instance methods on the Flow base class that serve as the primary API for running and inspecting flow workflows.

Source Reference

Component | File | Lines
kickoff() sync entry point | src/crewai/flow/flow.py | L1557-1616
kickoff_async() async execution | src/crewai/flow/flow.py | L1618-1636
plot() visualization | src/crewai/flow/flow.py | L2690-2708
build_flow_structure() graph builder | src/crewai/flow/visualization/builder.py | L178-439
render_interactive() HTML renderer | src/crewai/flow/visualization/renderers/interactive.py | (full file)
calculate_execution_paths() path counter | src/crewai/flow/visualization/builder.py | L442-511

Signatures

def kickoff(
    self,
    inputs: dict[str, Any] | None = None,
    input_files: dict[str, FileInput] | None = None,
) -> Any | FlowStreamingOutput:
    """Start the flow execution in a synchronous context.

    Args:
        inputs: Optional dictionary containing input values and/or a state ID.
        input_files: Optional dict of named file inputs for the flow.

    Returns:
        The final output from the flow or FlowStreamingOutput if streaming.
    """

async def kickoff_async(
    self,
    inputs: dict[str, Any] | None = None,
    input_files: dict[str, FileInput] | None = None,
) -> Any | FlowStreamingOutput:
    """Start the flow execution asynchronously.

    Args:
        inputs: Optional dictionary with input values and/or a state ID for restoration.
        input_files: Optional dict of named file inputs for the flow.

    Returns:
        The final output from the flow, which is the result of the last executed method.
    """

def plot(self, filename: str = "crewai_flow.html", show: bool = True) -> str:
    """Create interactive HTML visualization of Flow structure.

    Args:
        filename: Output HTML filename (default: "crewai_flow.html").
        show: Whether to open in browser (default: True).

    Returns:
        Absolute path to generated HTML file.
    """

Import

from crewai.flow.flow import Flow
# kickoff(), kickoff_async(), and plot() are instance methods on Flow

I/O Contract

Method | Input | Output | Side Effects
kickoff() | Optional inputs dict, optional input_files dict | Return value of last executed method, or FlowStreamingOutput | Executes all flow methods, emits events, optionally persists state
kickoff_async() | Same as kickoff() | Same as kickoff() | Same as kickoff(), but runs in async context
plot() | Optional filename (str), optional show (bool) | Absolute path to generated HTML file (str) | Writes HTML file to disk, optionally opens browser

Detailed Behavior

kickoff() Execution Flow

The synchronous kickoff() method handles two modes:

Streaming mode (when self.stream is True):

  1. Creates a FlowStreamingOutput with a sync iterator
  2. Launches the flow execution in a background thread
  3. Returns the streaming output immediately; results are yielded as chunks

Standard mode:

  1. Runs kickoff_async() to completion via asyncio.run()
  2. Blocks until the flow completes
  3. Returns the final method output
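
The two modes can be illustrated with a minimal, self-contained sketch. It does not use CrewAI; MiniFlow, its on_chunk callback, and the queue-based iterator are hypothetical stand-ins chosen only to show the pattern of blocking on a coroutine versus running it in a background thread and returning an iterator immediately.

```python
import asyncio
import queue
import threading
from typing import Any, Callable, Iterator, Optional


class MiniFlow:
    """Hypothetical stand-in for a flow; this is not the CrewAI Flow class."""

    def __init__(self, stream: bool = False) -> None:
        self.stream = stream

    async def kickoff_async(self, on_chunk: Optional[Callable[[str], None]] = None) -> Any:
        # Pretend each step of the flow produces one chunk of output.
        last = None
        for step in ("load", "analyze", "publish"):
            await asyncio.sleep(0)
            last = step
            if on_chunk is not None:
                on_chunk(step)
        return last

    def kickoff(self) -> Any:
        if not self.stream:
            # Standard mode: block until the coroutine finishes.
            return asyncio.run(self.kickoff_async())

        # Streaming mode: run the flow in a background thread and
        # hand an iterator back to the caller straight away.
        chunks: queue.Queue = queue.Queue()
        done = object()  # sentinel marking the end of the stream

        def worker() -> None:
            try:
                asyncio.run(self.kickoff_async(on_chunk=chunks.put))
            finally:
                chunks.put(done)

        threading.Thread(target=worker, daemon=True).start()

        def iterate() -> Iterator[str]:
            while (item := chunks.get()) is not done:
                yield item

        return iterate()


final_output = MiniFlow().kickoff()
streamed = list(MiniFlow(stream=True).kickoff())
```

In this sketch the caller of the streaming variant receives the iterator before any step has run, which mirrors the "returns immediately" behavior described above; the real FlowStreamingOutput type may expose a richer interface.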

kickoff_async() Execution Flow

The async entry point performs the complete execution lifecycle:

  1. State restoration -- If inputs contains an "id" key and persistence is configured, calls persistence.load_state() to restore previous state
  2. State initialization -- Applies any additional inputs to the state object via _initialize_state()
  3. Event emission -- Emits FlowStartedEvent to the event bus
  4. Start method execution -- Launches all @start() methods as concurrent asyncio tasks
  5. Listener propagation -- As each method completes, calls _execute_listeners() which:
    • Finds all listeners whose trigger conditions reference the completed method
    • Evaluates OR conditions (fire immediately, at most once)
    • Evaluates AND conditions (accumulate completions, fire when all met)
    • Evaluates nested conditions recursively
    • Launches matching listeners as concurrent tasks
  6. Router handling -- Router method return values are treated as trigger strings for downstream listener matching
  7. Output collection -- Method outputs are appended to _method_outputs; the last output becomes the flow result
  8. Event emission -- Emits FlowFinishedEvent with the final result
  9. Human feedback handling -- If a HumanFeedbackPending exception is raised during execution, saves the pending context to persistence and re-raises the exception
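
The listener propagation and router steps above can be sketched with plain asyncio. This is a simplified illustration under stated assumptions, not the CrewAI implementation: MiniEngine, its add() method, and the step() helper are hypothetical, nested conditions and persistence are omitted, and each listener is allowed to fire at most once per run.

```python
import asyncio
from typing import Any, Awaitable, Callable

Method = Callable[[], Awaitable[Any]]


class MiniEngine:
    """Hypothetical scheduler for illustration only; not CrewAI code."""

    def __init__(self) -> None:
        self.methods: dict[str, Method] = {}
        self.starts: list[str] = []
        self.routers: set[str] = set()
        # listener name -> ("OR" or "AND", trigger names)
        self.listeners: dict[str, tuple[str, set[str]]] = {}
        self.pending_and: dict[str, set[str]] = {}
        self.fired: set[str] = set()
        self.outputs: list[Any] = []

    def add(self, name: str, fn: Method, *, start: bool = False,
            router: bool = False, kind: str = "OR",
            triggers: tuple[str, ...] = ()) -> None:
        self.methods[name] = fn
        if start:
            self.starts.append(name)
        else:
            self.listeners[name] = (kind, set(triggers))
        if router:
            self.routers.add(name)

    async def _run(self, name: str) -> None:
        result = await self.methods[name]()
        self.outputs.append(result)
        await self._execute_listeners(name)
        if name in self.routers:
            # A router's return value doubles as a trigger string.
            await self._execute_listeners(str(result))

    async def _execute_listeners(self, completed: str) -> None:
        ready = []
        for name, (kind, triggers) in self.listeners.items():
            if completed not in triggers or name in self.fired:
                continue
            if kind == "AND":
                # Accumulate completions; fire only once every trigger is done.
                remaining = self.pending_and.setdefault(name, set(triggers))
                remaining.discard(completed)
                if remaining:
                    continue
            self.fired.add(name)
            ready.append(name)
        # Launch all matching listeners concurrently.
        await asyncio.gather(*(self._run(n) for n in ready))

    async def kickoff_async(self) -> Any:
        await asyncio.gather(*(self._run(n) for n in self.starts))
        # The last recorded output stands in for the flow result.
        return self.outputs[-1] if self.outputs else None


def step(label: str) -> Method:
    async def _fn() -> str:
        await asyncio.sleep(0)
        return label
    return _fn


engine = MiniEngine()
engine.add("a", step("a"), start=True)
engine.add("b", step("b"), start=True)
engine.add("either", step("either"), kind="OR", triggers=("a", "b"))
engine.add("both", step("both"), kind="AND", triggers=("a", "b"))
engine.add("check", step("PASS"), router=True, triggers=("both",))
engine.add("publish", step("publish"), triggers=("PASS",))
engine.add("retry", step("retry"), triggers=("FAIL",))
result = asyncio.run(engine.kickoff_async())
print(engine.outputs)
```

Here "either" runs once even though both of its triggers complete, "both" waits for the two start methods, and only the listener matching the router's returned string is launched.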

plot() Visualization Flow

The plot() method:

  1. Emits a FlowPlotEvent to the event bus
  2. Calls build_flow_structure(self) to extract the graph structure
  3. Calls render_interactive(structure, filename, show) to generate HTML

build_flow_structure() produces a FlowStructure TypedDict containing:

  • nodes -- Dict mapping method names to NodeMetadata (type, trigger methods, condition type, router paths, source code, signatures)
  • edges -- List of StructureEdge objects (source, target, condition_type, is_router_path, router_path_label)
  • start_methods -- List of start method names
  • router_methods -- List of router method names
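
As a rough sketch of that shape, the types below mirror the top-level keys and edge fields listed above. The NodeMetadata key names, the condition_type values, and the count_paths() helper are assumptions made for illustration; they are not taken from builder.py, and count_paths() is not the algorithm used by calculate_execution_paths().

```python
from typing import Optional, TypedDict


class StructureEdge(TypedDict):
    source: str
    target: str
    condition_type: Optional[str]
    is_router_path: bool
    router_path_label: Optional[str]


class NodeMetadata(TypedDict, total=False):
    # Assumed key names; the real NodeMetadata carries more fields.
    type: str
    trigger_methods: list[str]
    router_paths: list[str]


class FlowStructure(TypedDict):
    nodes: dict[str, NodeMetadata]
    edges: list[StructureEdge]
    start_methods: list[str]
    router_methods: list[str]


def make_edge(source: str, target: str, label: Optional[str] = None) -> StructureEdge:
    # "OR" for plain listener edges is an assumed value.
    return {
        "source": source,
        "target": target,
        "condition_type": None if label else "OR",
        "is_router_path": label is not None,
        "router_path_label": label,
    }


# Hand-written structure for a load -> analyze -> router -> PASS/FAIL flow.
structure: FlowStructure = {
    "nodes": {
        "load_data": {"type": "start"},
        "analyze": {"type": "listen", "trigger_methods": ["load_data"]},
        "check_quality": {"type": "router", "trigger_methods": ["analyze"],
                          "router_paths": ["PASS", "FAIL"]},
        "publish": {"type": "listen", "trigger_methods": ["PASS"]},
        "retry": {"type": "listen", "trigger_methods": ["FAIL"]},
    },
    "edges": [
        make_edge("load_data", "analyze"),
        make_edge("analyze", "check_quality"),
        make_edge("check_quality", "publish", label="PASS"),
        make_edge("check_quality", "retry", label="FAIL"),
    ],
    "start_methods": ["load_data"],
    "router_methods": ["check_quality"],
}


def count_paths(flow: FlowStructure) -> int:
    """Count start-to-leaf paths in an acyclic structure (illustrative helper)."""
    children: dict[str, list[str]] = {}
    for e in flow["edges"]:
        children.setdefault(e["source"], []).append(e["target"])

    def walk(node: str) -> int:
        targets = children.get(node, [])
        return 1 if not targets else sum(walk(t) for t in targets)

    return sum(walk(s) for s in flow["start_methods"])


path_count = count_paths(structure)
```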

The renderer generates a self-contained HTML file with interactive graph visualization that can be opened in any browser.
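
The file-writing contract (write one standalone HTML file, optionally open it, return the absolute path) can be sketched as follows. render_sketch() is a hypothetical helper, not render_interactive(); it simply embeds the structure as escaped JSON rather than drawing an interactive graph.

```python
import html
import json
import tempfile
import webbrowser
from pathlib import Path


def render_sketch(structure: dict, filename: str = "flow_sketch.html",
                  show: bool = False) -> str:
    """Hypothetical renderer: writes one standalone HTML file and returns its path."""
    payload = html.escape(json.dumps(structure, indent=2))
    page = (
        "<!DOCTYPE html>\n"
        "<html><head><meta charset=\"utf-8\"><title>Flow sketch</title></head>\n"
        f"<body><pre id=\"flow-data\">{payload}</pre></body></html>\n"
    )
    path = Path(filename).resolve()
    path.write_text(page, encoding="utf-8")
    if show:
        # Open the generated file in the default browser.
        webbrowser.open(path.as_uri())
    return str(path)


# Write into a temporary directory and skip the browser.
out_dir = tempfile.mkdtemp()
saved = render_sketch(
    {"start_methods": ["load_data"], "router_methods": ["check_quality"]},
    str(Path(out_dir) / "flow_sketch.html"),
    show=False,
)
```

Passing show=False, as in the sketch, is the usual choice for scripts and CI jobs where no browser is available.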

Example

from crewai.flow.flow import Flow, FlowState, start, listen, router
from typing import Literal


class AnalysisState(FlowState):
    data: str = ""
    result: str = ""


class AnalysisFlow(Flow[AnalysisState]):

    @start()
    def load_data(self):
        # Keep a value supplied via kickoff(inputs=...); otherwise use a default
        if not self.state.data:
            self.state.data = "sample dataset"
        return self.state.data

    @listen(load_data)
    def analyze(self):
        self.state.result = f"Analyzed: {self.state.data}"
        return self.state.result

    @router(analyze)
    def check_quality(self) -> Literal["PASS", "FAIL"]:
        if len(self.state.result) > 5:
            return "PASS"
        return "FAIL"

    @listen("PASS")
    def publish(self):
        return f"Published: {self.state.result}"

    @listen("FAIL")
    def retry(self):
        return "Retrying analysis..."


# Standard execution
flow = AnalysisFlow()
result = flow.kickoff(inputs={"data": "real dataset"})
print(result)

# Async execution
import asyncio

async def run_async():
    flow = AnalysisFlow()
    result = await flow.kickoff_async(inputs={"data": "real dataset"})
    return result

print(asyncio.run(run_async()))

# Generate visualization
flow = AnalysisFlow()
html_path = flow.plot(filename="analysis_flow.html", show=True)
print(f"Visualization saved to: {html_path}")

# Generate without opening browser
html_path = flow.plot(filename="analysis_flow.html", show=False)
