Implementation:CrewAIInc CrewAI Flow Kickoff And Plot
Overview
Flow Kickoff and Plot provides the concrete methods for executing flows (kickoff(), kickoff_async()) and generating interactive visualizations (plot()) in the CrewAI Flow engine. These are instance methods on the Flow base class that serve as the primary API for running and inspecting flow workflows.
Source Reference
| Component | File | Lines |
|---|---|---|
| kickoff() sync entry point | src/crewai/flow/flow.py | L1557-1616 |
| kickoff_async() async execution | src/crewai/flow/flow.py | L1618-1636 |
| plot() visualization | src/crewai/flow/flow.py | L2690-2708 |
| build_flow_structure() graph builder | src/crewai/flow/visualization/builder.py | L178-439 |
| render_interactive() HTML renderer | src/crewai/flow/visualization/renderers/interactive.py | (full file) |
| calculate_execution_paths() path counter | src/crewai/flow/visualization/builder.py | L442-511 |
Signatures
def kickoff(
    self,
    inputs: dict[str, Any] | None = None,
    input_files: dict[str, FileInput] | None = None,
) -> Any | FlowStreamingOutput:
    """Start the flow execution in a synchronous context.

    Args:
        inputs: Optional dictionary containing input values and/or a state ID.
        input_files: Optional dict of named file inputs for the flow.

    Returns:
        The final output from the flow or FlowStreamingOutput if streaming.
    """

async def kickoff_async(
    self,
    inputs: dict[str, Any] | None = None,
    input_files: dict[str, FileInput] | None = None,
) -> Any | FlowStreamingOutput:
    """Start the flow execution asynchronously.

    Args:
        inputs: Optional dictionary with input values and/or a state ID for restoration.
        input_files: Optional dict of named file inputs for the flow.

    Returns:
        The final output from the flow, which is the result of the last executed method.
    """

def plot(self, filename: str = "crewai_flow.html", show: bool = True) -> str:
    """Create interactive HTML visualization of Flow structure.

    Args:
        filename: Output HTML filename (default: "crewai_flow.html").
        show: Whether to open in browser (default: True).

    Returns:
        Absolute path to generated HTML file.
    """
Import
from crewai.flow.flow import Flow
# kickoff(), kickoff_async(), and plot() are instance methods on Flow
I/O Contract
| Method | Input | Output | Side Effects |
|---|---|---|---|
| kickoff() | Optional inputs dict, optional input_files dict | Return value of last executed method, or FlowStreamingOutput | Executes all flow methods, emits events, optionally persists state |
| kickoff_async() | Same as kickoff() | Same as kickoff() | Same as kickoff(), but runs in async context |
| plot() | Optional filename (str), optional show (bool) | Absolute path to generated HTML file (str) | Writes HTML file to disk, optionally opens browser |
Detailed Behavior
kickoff() Execution Flow
The synchronous kickoff() method handles two modes:
Streaming mode (when self.stream = True):
- Creates a FlowStreamingOutput with a sync iterator
- Launches the flow execution in a background thread
- Returns the streaming output immediately; results are yielded as chunks
Standard mode:
- Wraps kickoff_async() with asyncio.run()
- Blocks until the flow completes
- Returns the final method output
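The two modes can be illustrated with a toy class. This is a minimal sketch, not the CrewAI implementation: ToyFlow, its on_chunk callback, and the queue-based iterator are all hypothetical stand-ins, assumed here only to show how a synchronous entry point might wrap an async one and how a background thread might feed a sync iterator.

```python
import asyncio
import queue
import threading
from typing import Any


class ToyFlow:
    """Hypothetical stand-in for a flow; not the CrewAI Flow class."""

    def __init__(self, stream: bool = False) -> None:
        self.stream = stream

    async def kickoff_async(self, on_chunk=None) -> Any:
        # Pretend each step is a flow method that produces one output.
        result = None
        for step in ("load", "analyze", "publish"):
            await asyncio.sleep(0)
            result = f"{step} done"
            if on_chunk is not None:
                on_chunk(result)
        return result

    def kickoff(self) -> Any:
        if not self.stream:
            # Standard mode: block until the coroutine finishes.
            return asyncio.run(self.kickoff_async())

        # Streaming mode: run in a background thread and return an
        # iterator right away; chunks arrive through a thread-safe queue.
        chunks: queue.Queue = queue.Queue()
        done = object()  # sentinel marking the end of the stream

        def worker() -> None:
            try:
                asyncio.run(self.kickoff_async(on_chunk=chunks.put))
            finally:
                chunks.put(done)

        threading.Thread(target=worker, daemon=True).start()
        return iter(chunks.get, done)
```

In this sketch, ToyFlow().kickoff() blocks and returns the last step's output, while ToyFlow(stream=True).kickoff() returns immediately with an iterator that yields one chunk per step. Because standard mode relies on asyncio.run(), it cannot be called from code that is already running inside an event loop; in that situation the async entry point is the one to await.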
kickoff_async() Execution Flow
The async entry point performs the complete execution lifecycle:
- State restoration -- If inputs contains an "id" key and persistence is configured, calls persistence.load_state() to restore previous state
- State initialization -- Applies any additional inputs to the state object via _initialize_state()
- Event emission -- Emits FlowStartedEvent to the event bus
- Start method execution -- Launches all @start() methods as concurrent asyncio tasks
- Listener propagation -- As each method completes, calls _execute_listeners(), which:
  - Finds all listeners whose trigger conditions reference the completed method
  - Evaluates OR conditions (fire immediately, at most once)
  - Evaluates AND conditions (accumulate completions, fire when all met)
  - Evaluates nested conditions recursively
  - Launches matching listeners as concurrent tasks
- Router handling -- Router method return values are treated as trigger strings for downstream listener matching
- Output collection -- Method outputs are appended to _method_outputs; the last output becomes the flow result
- Event emission -- Emits FlowFinishedEvent with the final result
- Human feedback handling -- If a HumanFeedbackPending exception is raised, saves pending context to persistence and re-raises
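The OR and AND rules in the listener propagation step can be sketched in isolation. The Listener dataclass and ready_listeners() function below are hypothetical names used only for illustration; they assume flat (non-nested) conditions and do not reproduce the internals of _execute_listeners().

```python
from dataclasses import dataclass, field


@dataclass
class Listener:
    """Hypothetical record of one listener and its trigger condition."""

    name: str
    condition: str  # "OR" or "AND"
    triggers: set[str]
    seen: set[str] = field(default_factory=set)
    fired: bool = False


def ready_listeners(listeners: list[Listener], completed: str) -> list[str]:
    """Return the names of listeners that should fire after `completed` ends."""
    ready = []
    for listener in listeners:
        # Skip listeners that already fired or do not reference this method.
        if listener.fired or completed not in listener.triggers:
            continue
        if listener.condition == "OR":
            # OR: the first matching completion is enough.
            listener.fired = True
        else:
            # AND: accumulate completions until every trigger has been seen.
            listener.seen.add(completed)
            listener.fired = listener.seen == listener.triggers
        if listener.fired:
            ready.append(listener.name)
    return ready


listeners = [
    Listener("notify", "OR", {"fetch_a", "fetch_b"}),
    Listener("merge", "AND", {"fetch_a", "fetch_b"}),
]
first = ready_listeners(listeners, "fetch_a")
second = ready_listeners(listeners, "fetch_b")
```

Here the OR listener becomes ready after the first completion and is not returned again, while the AND listener becomes ready only once both triggers have completed. A router fits the same model if its return value is passed as the completed name, which is how a string such as "PASS" can match a listener.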
plot() Visualization Flow
The plot() method:
- Emits a FlowPlotEvent to the event bus
- Calls build_flow_structure(self) to extract the graph structure
- Calls render_interactive(structure, filename, show) to generate HTML
build_flow_structure() produces a FlowStructure TypedDict containing:
- nodes -- Dict mapping method names to NodeMetadata (type, trigger methods, condition type, router paths, source code, signatures)
- edges -- List of StructureEdge objects (source, target, condition_type, is_router_path, router_path_label)
- start_methods -- List of start method names
- router_methods -- List of router method names
The renderer generates a self-contained HTML file with interactive graph visualization that can be opened in any browser.
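To make the shape of this structure concrete, the sketch below models a reduced version of it and counts start-to-leaf paths. ToyEdge, ToyStructure, and count_paths() are hypothetical; they reuse a subset of the field names listed above, assume an acyclic graph, and are not the algorithm used by calculate_execution_paths(), which may count differently.

```python
from typing import TypedDict


class ToyEdge(TypedDict):
    source: str
    target: str
    is_router_path: bool


class ToyStructure(TypedDict):
    nodes: dict[str, dict]
    edges: list[ToyEdge]
    start_methods: list[str]
    router_methods: list[str]


def count_paths(structure: ToyStructure) -> int:
    """Count start-to-leaf paths; assumes the graph has no cycles."""
    children: dict[str, list[str]] = {name: [] for name in structure["nodes"]}
    for edge in structure["edges"]:
        children[edge["source"]].append(edge["target"])

    def walk(node: str) -> int:
        # A node with no outgoing edges ends exactly one path.
        if not children[node]:
            return 1
        return sum(walk(child) for child in children[node])

    return sum(walk(start) for start in structure["start_methods"])


# A linear chain that ends in a router with two outgoing branches.
structure: ToyStructure = {
    "nodes": {n: {} for n in ("load_data", "analyze", "check_quality", "publish", "retry")},
    "edges": [
        {"source": "load_data", "target": "analyze", "is_router_path": False},
        {"source": "analyze", "target": "check_quality", "is_router_path": False},
        {"source": "check_quality", "target": "publish", "is_router_path": True},
        {"source": "check_quality", "target": "retry", "is_router_path": True},
    ],
    "start_methods": ["load_data"],
    "router_methods": ["check_quality"],
}
path_count = count_paths(structure)
```

Keeping edges as a flat list, separate from the node metadata, means a renderer or path counter can walk the graph without inspecting the flow class itself.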
Example
from crewai.flow.flow import Flow, FlowState, start, listen, router
from typing import Literal


class AnalysisState(FlowState):
    data: str = ""
    result: str = ""


class AnalysisFlow(Flow[AnalysisState]):
    @start()
    def load_data(self):
        self.state.data = "sample dataset"
        return self.state.data

    @listen(load_data)
    def analyze(self):
        self.state.result = f"Analyzed: {self.state.data}"
        return self.state.result

    @router(analyze)
    def check_quality(self) -> Literal["PASS", "FAIL"]:
        if len(self.state.result) > 5:
            return "PASS"
        return "FAIL"

    @listen("PASS")
    def publish(self):
        return f"Published: {self.state.result}"

    @listen("FAIL")
    def retry(self):
        return "Retrying analysis..."


# Standard execution
flow = AnalysisFlow()
result = flow.kickoff(inputs={"data": "real dataset"})
print(result)

# Async execution
import asyncio


async def run_async():
    flow = AnalysisFlow()
    result = await flow.kickoff_async(inputs={"data": "real dataset"})
    return result


asyncio.run(run_async())

# Generate visualization
flow = AnalysisFlow()
html_path = flow.plot(filename="analysis_flow.html", show=True)
print(f"Visualization saved to: {html_path}")

# Generate without opening browser
html_path = flow.plot(filename="analysis_flow.html", show=False)
Related Pages
- Principle:CrewAIInc_CrewAI_Flow_Execution_And_Visualization
- Implementation:CrewAIInc_CrewAI_Flow_Decorators -- Decorators that define the execution graph traversed by kickoff
- Implementation:CrewAIInc_CrewAI_Flow_State_Model -- State initialized and managed during kickoff
- Implementation:CrewAIInc_CrewAI_Flow_Routing_Primitives -- Routing conditions evaluated during execution
- Implementation:CrewAIInc_CrewAI_SQLite_Flow_Persistence -- Persistence backend used for state restoration in kickoff
- Implementation:CrewAIInc_CrewAI_Crew_Kickoff_In_Flow -- Crew executions triggered within flow methods during kickoff