Implementation:CrewAIInc CrewAI Flow Decorators
Overview
The Flow Decorators component provides the decorator functions (@start, @listen, @router) and their corresponding wrapper classes (StartMethod, ListenMethod, RouterMethod), which declare entry points, listeners, and routers in the CrewAI Flow engine.
Source Reference
| Component | File | Lines |
|---|---|---|
| start() decorator | src/crewai/flow/flow.py | L131-203 |
| listen() decorator | src/crewai/flow/flow.py | L206-269 |
| router() decorator | src/crewai/flow/flow.py | L272-343 |
| StartMethod wrapper | src/crewai/flow/flow_wrappers.py | L143-149 |
| ListenMethod wrapper | src/crewai/flow/flow_wrappers.py | L152-157 |
| RouterMethod wrapper | src/crewai/flow/flow_wrappers.py | L160-166 |
| FlowMethod base wrapper | src/crewai/flow/flow_wrappers.py | L41-140 |
| FlowMeta metaclass | src/crewai/flow/flow.py | L607-687 |
Signatures
def start(
    condition: str | FlowCondition | Callable[..., Any] | None = None,
) -> Callable[[Callable[P, R]], StartMethod[P, R]]:
    """Marks a method as a flow's starting point."""

def listen(
    condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], ListenMethod[P, R]]:
    """Creates a listener that executes when specified conditions are met."""

def router(
    condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
    """Creates a routing method that directs flow execution based on conditions."""
Import
from crewai.flow.flow import Flow, start, listen, router
I/O Contract
| Decorator | Input (condition parameter) | Output (wrapper) | Metadata Set |
|---|---|---|---|
| @start() | None (unconditional) or method/string/FlowCondition | StartMethod | __is_start_method__ = True |
| @start(condition) | Method reference, string, or or_()/and_() result | StartMethod | __is_start_method__, __trigger_methods__, __condition_type__ |
| @listen(condition) | Method reference, string, or or_()/and_() result | ListenMethod | __trigger_methods__, __condition_type__ |
| @router(condition) | Method reference, string, or or_()/and_() result | RouterMethod | __is_router__ = True, __trigger_methods__, __condition_type__ |
Detailed Behavior
Decorator Mechanics
Each decorator is a factory function that returns a decorator. The inner decorator wraps the target function in the appropriate FlowMethod subclass:
- @start() wraps the function in StartMethod, setting __is_start_method__ = True. If a condition is provided, it also sets __trigger_methods__ and __condition_type__.
- @listen(condition) wraps the function in ListenMethod, always setting __trigger_methods__ and __condition_type__.
- @router(condition) wraps the function in RouterMethod, setting __is_router__ = True along with trigger metadata. The FlowMeta metaclass also inspects the router's return type annotation to extract possible return constants (via get_possible_return_constants()).
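The factory-plus-wrapper pattern described above can be illustrated with a minimal, standard-library-only sketch. It is not the CrewAI source: SketchStartMethod and sketch_start are hypothetical stand-ins, and the "OR" value used for __condition_type__ is an assumption about how a single trigger is recorded.

```python
from typing import Any, Callable, Union


class SketchStartMethod:
    """Hypothetical stand-in for StartMethod: holds the wrapped function."""

    def __init__(self, meth: Callable[..., Any]) -> None:
        self._meth = meth
        self.__name__ = meth.__name__

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self._meth(*args, **kwargs)


def sketch_start(condition: Union[str, Callable[..., Any], None] = None):
    """Decorator factory: calling it returns the decorator that wraps."""

    def decorator(func: Callable[..., Any]) -> SketchStartMethod:
        wrapper = SketchStartMethod(func)
        wrapper.__is_start_method__ = True
        if condition is not None:
            # A method reference is reduced to its name; a string passes through.
            name = condition if isinstance(condition, str) else condition.__name__
            wrapper.__trigger_methods__ = [name]
            wrapper.__condition_type__ = "OR"  # assumed marker for a single trigger
        return wrapper

    return decorator


@sketch_start()
def begin():
    return "started"


@sketch_start("begin")
def resume():
    return "resumed"
```

Here begin carries only the start flag, while resume also carries trigger metadata. A listener or router factory would follow the same shape with a different wrapper class and flag.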
FlowMethod Wrapper Classes
The FlowMethod base class implements the descriptor protocol (__get__) so wrappers bind correctly when accessed as instance attributes. It preserves the original function's signature, docstring, and coroutine status. Key attributes:
class FlowMethod(Generic[P, R]):
    _meth: Callable[P, R]          # Original unwrapped function
    _instance: Any                 # Bound instance (None for unbound)
    __name__: FlowMethodName       # Method name used for event routing
    __signature__: inspect.Signature
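To show why the descriptor protocol matters here, the following is a simplified sketch of a callable wrapper that binds to an instance on attribute access. SketchFlowMethod and Demo are hypothetical names; the real FlowMethod does more (coroutine handling, for example, is left out here).

```python
import functools
import inspect
from typing import Any, Callable


class SketchFlowMethod:
    """Hypothetical, simplified stand-in for FlowMethod."""

    def __init__(self, meth: Callable[..., Any], instance: Any = None) -> None:
        self._meth = meth
        self._instance = instance
        # Copy __name__, __doc__, and related attributes from the function.
        functools.update_wrapper(self, meth)
        self.__signature__ = inspect.signature(meth)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # A bound wrapper supplies the instance as the first argument.
        if self._instance is not None:
            return self._meth(self._instance, *args, **kwargs)
        return self._meth(*args, **kwargs)

    def __get__(self, instance: Any, owner: Any = None) -> "SketchFlowMethod":
        # Class-level access returns the unbound wrapper unchanged.
        if instance is None:
            return self
        # Instance-level access returns a new wrapper bound to that instance.
        return type(self)(self._meth, instance)


class Demo:
    @SketchFlowMethod
    def greet(self, name):
        """Say hello."""
        return f"hello {name}"


d = Demo()
greeting = d.greet("flow")
```

Without __get__, d.greet would return the raw wrapper object and the call would be missing self. Returning a fresh bound wrapper keeps the class-level wrapper stateless.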
FlowMeta Metaclass
At class creation time, FlowMeta.__new__ scans the namespace for attributes with flow-related metadata and builds four class-level registries:
- _start_methods: list[FlowMethodName] -- All methods with __is_start_method__
- _listeners: dict[FlowMethodName, SimpleFlowCondition | FlowCondition] -- All methods with __trigger_methods__
- _routers: set[FlowMethodName] -- All methods with __is_router__
- _router_paths: dict[FlowMethodName, list[FlowMethodName]] -- Possible return values for each router
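The registry-building step can be sketched with a small metaclass that scans the class namespace for the metadata attributes. This is an illustration under stated assumptions, not the FlowMeta source: SketchFlowMeta and the mark helper are hypothetical, listeners are stored as an assumed (condition type, trigger list) tuple, and router paths are read from a Literal return annotation as one possible way to obtain return constants.

```python
from typing import Any, Literal, get_args, get_origin


def mark(**metadata: Any):
    """Hypothetical helper: attach flow metadata to a plain function."""

    def decorator(func):
        for key, value in metadata.items():
            setattr(func, key, value)
        return func

    return decorator


class SketchFlowMeta(type):
    """Hypothetical, simplified stand-in for FlowMeta."""

    def __new__(mcs, name, bases, namespace):
        cls = super().__new__(mcs, name, bases, namespace)
        start_methods, listeners, routers, router_paths = [], {}, set(), {}
        for attr_name, attr in namespace.items():
            if getattr(attr, "__is_start_method__", False):
                start_methods.append(attr_name)
            if hasattr(attr, "__trigger_methods__"):
                # Assumed storage shape: (condition type, trigger names).
                listeners[attr_name] = (attr.__condition_type__, attr.__trigger_methods__)
            if getattr(attr, "__is_router__", False):
                routers.add(attr_name)
                returns = getattr(attr, "__annotations__", {}).get("return")
                if get_origin(returns) is Literal:
                    router_paths[attr_name] = list(get_args(returns))
        cls._start_methods = start_methods
        cls._listeners = listeners
        cls._routers = routers
        cls._router_paths = router_paths
        return cls


class Pipeline(metaclass=SketchFlowMeta):
    @mark(__is_start_method__=True)
    def ingest(self): ...

    @mark(__trigger_methods__=["ingest"], __condition_type__="OR")
    def validate(self): ...

    @mark(__is_router__=True, __trigger_methods__=["validate"], __condition_type__="OR")
    def route(self) -> Literal["HIGH_QUALITY", "LOW_QUALITY"]: ...
```

Because the scan runs once at class creation, the registries are available before any instance exists. A router appears in both the listener and router registries, since it is triggered like a listener.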
Example
from crewai.flow.flow import Flow, FlowState, start, listen, router


class PipelineState(FlowState):
    raw_data: str = ""
    processed_data: str = ""
    route: str = ""


class DataPipeline(Flow[PipelineState]):
    @start()
    def ingest_data(self):
        """Entry point: load raw data."""
        self.state.raw_data = "raw sensor readings..."
        return self.state.raw_data

    @listen(ingest_data)
    def validate_data(self, raw_data):
        """Listener: validate the ingested data."""
        # raw_data is the value returned by ingest_data.
        self.state.processed_data = raw_data.strip()
        return self.state.processed_data

    @router(validate_data)
    def route_by_quality(self) -> str:
        """Router: direct flow based on data quality."""
        # The returned string selects which listener runs next.
        if len(self.state.processed_data) > 10:
            return "HIGH_QUALITY"
        return "LOW_QUALITY"

    @listen("HIGH_QUALITY")
    def analyze_data(self):
        """Triggered when router returns 'HIGH_QUALITY'."""
        return f"Analysis of: {self.state.processed_data}"

    @listen("LOW_QUALITY")
    def flag_for_review(self):
        """Triggered when router returns 'LOW_QUALITY'."""
        return f"Flagged: {self.state.processed_data}"


flow = DataPipeline()
result = flow.kickoff()
Related Pages
- Principle:CrewAIInc_CrewAI_Flow_Class_Definition
- Implementation:CrewAIInc_CrewAI_Flow_State_Model -- State model accessed by decorated methods
- Implementation:CrewAIInc_CrewAI_Flow_Routing_Primitives -- or_() and and_() combinators used in decorator conditions
- Implementation:CrewAIInc_CrewAI_Flow_Kickoff_And_Plot -- Runtime execution of the decorator-defined graph
- Environment:CrewAIInc_CrewAI_Python_Runtime_Environment