Implementation:CrewAIInc CrewAI Flow Decorators

From Leeroopedia

Overview

Flow Decorators provides the concrete decorator functions (@start, @listen, @router) and their corresponding wrapper classes (StartMethod, ListenMethod, RouterMethod) that declare flow entry points, listeners, and routers in the CrewAI Flow engine.

Source Reference

Component | File | Lines
start() decorator | src/crewai/flow/flow.py | L131-203
listen() decorator | src/crewai/flow/flow.py | L206-269
router() decorator | src/crewai/flow/flow.py | L272-343
StartMethod wrapper | src/crewai/flow/flow_wrappers.py | L143-149
ListenMethod wrapper | src/crewai/flow/flow_wrappers.py | L152-157
RouterMethod wrapper | src/crewai/flow/flow_wrappers.py | L160-166
FlowMethod base wrapper | src/crewai/flow/flow_wrappers.py | L41-140
FlowMeta metaclass | src/crewai/flow/flow.py | L607-687

Signatures

def start(
    condition: str | FlowCondition | Callable[..., Any] | None = None,
) -> Callable[[Callable[P, R]], StartMethod[P, R]]:
    """Marks a method as a flow's starting point."""

def listen(
    condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], ListenMethod[P, R]]:
    """Creates a listener that executes when specified conditions are met."""

def router(
    condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
    """Creates a routing method that directs flow execution based on conditions."""

Import

from crewai.flow.flow import Flow, start, listen, router

I/O Contract

Decorator | Input (condition parameter) | Output (wrapper) | Metadata Set
@start() | None (unconditional) | StartMethod | __is_start_method__ = True
@start(condition) | Method reference, string, or or_()/and_() result | StartMethod | __is_start_method__ = True, __trigger_methods__, __condition_type__
@listen(condition) | Method reference, string, or or_()/and_() result | ListenMethod | __trigger_methods__, __condition_type__
@router(condition) | Method reference, string, or or_()/and_() result | RouterMethod | __is_router__ = True, __trigger_methods__, __condition_type__

Detailed Behavior

Decorator Mechanics

Each decorator is a factory function that returns a decorator. The inner decorator wraps the target function in the appropriate FlowMethod subclass:

  1. @start() wraps the function in StartMethod, setting __is_start_method__ = True. If a condition is provided, it also sets __trigger_methods__ and __condition_type__.
  2. @listen(condition) wraps the function in ListenMethod, always setting __trigger_methods__ and __condition_type__.
  3. @router(condition) wraps the function in RouterMethod, setting __is_router__ = True along with trigger metadata. The FlowMeta metaclass also inspects the router's return type annotation to extract possible return constants (via get_possible_return_constants()).
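
The factory pattern described above can be sketched in plain Python without importing CrewAI. This is a simplified illustration, not the library's code: the names SketchFlowMethod, SketchStartMethod, SketchListenMethod, sketch_start, sketch_listen, and _apply_condition are hypothetical, only string and method-reference conditions are handled (no or_()/and_() results), and the "OR" value for __condition_type__ is an assumption.

```python
from typing import Any, Callable


class SketchFlowMethod:
    """Hypothetical stand-in for FlowMethod: wraps a function and carries metadata."""

    def __init__(self, meth: Callable[..., Any]) -> None:
        self._meth = meth
        self.__name__ = meth.__name__
        self.__doc__ = meth.__doc__

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self._meth(*args, **kwargs)


class SketchStartMethod(SketchFlowMethod):
    """Hypothetical stand-in for StartMethod."""


class SketchListenMethod(SketchFlowMethod):
    """Hypothetical stand-in for ListenMethod."""


def _apply_condition(wrapper: SketchFlowMethod, condition: Any) -> None:
    """Record the trigger name; accepts a string or anything with __name__."""
    name = condition if isinstance(condition, str) else condition.__name__
    wrapper.__trigger_methods__ = [name]
    wrapper.__condition_type__ = "OR"  # assumed value for a single trigger


def sketch_start(condition: Any = None) -> Callable[[Callable[..., Any]], SketchStartMethod]:
    """Factory: returns the decorator that does the actual wrapping."""
    def decorator(func: Callable[..., Any]) -> SketchStartMethod:
        wrapper = SketchStartMethod(func)
        wrapper.__is_start_method__ = True
        if condition is not None:
            # Trigger metadata is only set for a conditional start.
            _apply_condition(wrapper, condition)
        return wrapper
    return decorator


def sketch_listen(condition: Any) -> Callable[[Callable[..., Any]], SketchListenMethod]:
    """Factory: a listener always records trigger metadata."""
    def decorator(func: Callable[..., Any]) -> SketchListenMethod:
        wrapper = SketchListenMethod(func)
        _apply_condition(wrapper, condition)
        return wrapper
    return decorator


@sketch_start()
def ingest() -> str:
    return "raw"


@sketch_listen(ingest)  # a method reference is resolved to its name
def validate(data: str) -> str:
    return data.upper()
```

In this sketch the metadata lives on the wrapper object rather than on the original function, which is what lets a later pass (such as a metaclass) find flow methods with simple attribute checks.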

FlowMethod Wrapper Classes

The FlowMethod base class implements the descriptor protocol (__get__) so wrappers bind correctly when accessed as instance attributes. It preserves the original function's signature, docstring, and coroutine status. Key attributes:

class FlowMethod(Generic[P, R]):
    _meth: Callable[P, R]         # Original unwrapped function
    _instance: Any                 # Bound instance (None for unbound)
    __name__: FlowMethodName       # Method name used for event routing
    __signature__: inspect.Signature
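
To illustrate how a wrapper with these attributes can bind to an instance through the descriptor protocol, here is a minimal sketch. The class names SketchFlowMethod and Demo are hypothetical, and the sketch omits the coroutine handling and generic typing mentioned above; the actual FlowMethod may bind differently.

```python
import inspect
from typing import Any, Callable, Optional


class SketchFlowMethod:
    """Hypothetical wrapper that binds like a method via __get__."""

    def __init__(self, meth: Callable[..., Any], instance: Any = None) -> None:
        self._meth = meth              # original unwrapped function
        self._instance = instance      # bound instance, or None when unbound
        self.__name__ = meth.__name__
        self.__doc__ = meth.__doc__
        self.__signature__ = inspect.signature(meth)

    def __get__(self, instance: Any, owner: Optional[type] = None) -> "SketchFlowMethod":
        if instance is None:
            # Accessed on the class: return the unbound wrapper itself.
            return self
        # Accessed on an instance: return a copy that remembers the instance.
        return type(self)(self._meth, instance)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        if self._instance is not None:
            return self._meth(self._instance, *args, **kwargs)
        return self._meth(*args, **kwargs)


class Demo:
    def __init__(self) -> None:
        self.prefix = "hello"

    @SketchFlowMethod
    def greet(self, name: str) -> str:
        """Return a greeting."""
        return f"{self.prefix}, {name}"


demo = Demo()
greeting = demo.greet("world")  # __get__ supplies demo as self
```

Returning a fresh bound copy from __get__ keeps the class-level wrapper stateless, so two instances never share a bound reference.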

FlowMeta Metaclass

At class creation time, FlowMeta.__new__ scans the namespace for attributes with flow-related metadata and builds four class-level registries:

  • _start_methods: list[FlowMethodName] -- All methods with __is_start_method__
  • _listeners: dict[FlowMethodName, SimpleFlowCondition | FlowCondition] -- All methods with __trigger_methods__
  • _routers: set[FlowMethodName] -- All methods with __is_router__
  • _router_paths: dict[FlowMethodName, list[FlowMethodName]] -- Possible return values for each router
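
The registry-building step listed above can be sketched with a small metaclass. Everything here is illustrative: SketchFlowMeta and the mark() helper are hypothetical, listener conditions are stored as plain tuples rather than FlowCondition objects, and router paths are read from a Literal return annotation as a simplifying assumption, which may differ from what get_possible_return_constants() does.

```python
from typing import Any, Callable, Literal, get_args, get_origin


def mark(**metadata: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical helper: attach flow-style metadata to a plain function."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        for key, value in metadata.items():
            setattr(func, key, value)
        return func
    return decorator


class SketchFlowMeta(type):
    """Hypothetical metaclass that scans the class namespace once, at creation."""

    def __new__(mcs, name: str, bases: tuple, namespace: dict) -> "SketchFlowMeta":
        cls = super().__new__(mcs, name, bases, namespace)
        start_methods: list = []
        listeners: dict = {}
        routers: set = set()
        router_paths: dict = {}

        for attr_name, attr in namespace.items():
            if getattr(attr, "__is_start_method__", False):
                start_methods.append(attr_name)
            if hasattr(attr, "__trigger_methods__"):
                listeners[attr_name] = (attr.__condition_type__, attr.__trigger_methods__)
            if getattr(attr, "__is_router__", False):
                routers.add(attr_name)
                # Assumption: possible routes are declared as Literal[...] values.
                hint = getattr(attr, "__annotations__", {}).get("return")
                if get_origin(hint) is Literal:
                    router_paths[attr_name] = list(get_args(hint))

        cls._start_methods = start_methods
        cls._listeners = listeners
        cls._routers = routers
        cls._router_paths = router_paths
        return cls


class Pipeline(metaclass=SketchFlowMeta):
    @mark(__is_start_method__=True)
    def ingest(self) -> str:
        return "raw"

    @mark(__trigger_methods__=["ingest"], __condition_type__="OR")
    def validate(self) -> str:
        return "ok"

    @mark(__is_router__=True, __trigger_methods__=["validate"], __condition_type__="OR")
    def route(self) -> Literal["HIGH_QUALITY", "LOW_QUALITY"]:
        return "HIGH_QUALITY"
```

Because the scan runs when the class is created, the registries are available on the class itself before any instance exists. Note that the router appears in both _listeners and _routers, since it carries trigger metadata as well as the router flag.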

Example

from crewai.flow.flow import Flow, FlowState, start, listen, router


class PipelineState(FlowState):
    raw_data: str = ""
    processed_data: str = ""
    route: str = ""


class DataPipeline(Flow[PipelineState]):

    @start()
    def ingest_data(self):
        """Entry point: load raw data."""
        self.state.raw_data = "raw sensor readings..."
        return self.state.raw_data

    @listen(ingest_data)
    def validate_data(self, raw_data):
        """Listener: validate the ingested data."""
        self.state.processed_data = raw_data.strip()
        return self.state.processed_data

    @router(validate_data)
    def route_by_quality(self) -> str:
        """Router: direct flow based on data quality."""
        if len(self.state.processed_data) > 10:
            return "HIGH_QUALITY"
        return "LOW_QUALITY"

    @listen("HIGH_QUALITY")
    def analyze_data(self):
        """Triggered when router returns 'HIGH_QUALITY'."""
        return f"Analysis of: {self.state.processed_data}"

    @listen("LOW_QUALITY")
    def flag_for_review(self):
        """Triggered when router returns 'LOW_QUALITY'."""
        return f"Flagged: {self.state.processed_data}"


flow = DataPipeline()
result = flow.kickoff()
