Implementation:CrewAIInc CrewAI Flow Routing Primitives

From Leeroopedia

Overview

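Flow Routing Primitives are the concrete functions behind conditional routing and fan-in in flow execution: the or_() and and_() combinators build trigger conditions for listeners, and the @router decorator selects downstream listeners by return value. Together with multiple @listen methods on the same trigger (fan-out), they enable complex control flow patterns within the CrewAI Flow engine.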

Source Reference

Component | File | Lines
or_() combinator | src/crewai/flow/flow.py | L346-379
and_() combinator | src/crewai/flow/flow.py | L382-416
router() decorator | src/crewai/flow/flow.py | L272-343
RouterMethod wrapper | src/crewai/flow/flow_wrappers.py | L160-166
FlowCondition TypedDict | src/crewai/flow/flow_wrappers.py | L22-36
Racing group logic | src/crewai/flow/flow.py | L806-898

Signatures

def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
    """Combines multiple conditions with OR logic for flow control.

    Args:
        *conditions: Variable number of conditions (method names, condition dicts,
                     or method references).

    Returns:
        A condition dictionary: {"type": "OR", "conditions": [...]}
    """

def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
    """Combines multiple conditions with AND logic for flow control.

    Args:
        *conditions: Variable number of conditions (method names, condition dicts,
                     or method references).

    Returns:
        A condition dictionary: {"type": "AND", "conditions": [...]}
    """

def router(
    condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
    """Creates a routing method that directs flow execution based on conditions."""

Import

from crewai.flow.flow import Flow, start, listen, router, or_, and_

I/O Contract

Primitive | Input | Output | Effect
or_(*conditions) | Method references, strings, or nested FlowConditions | FlowCondition dict with type="OR" | Listener fires when any condition is met
and_(*conditions) | Method references, strings, or nested FlowConditions | FlowCondition dict with type="AND" | Listener fires when all conditions are met
@router(condition) | Trigger condition | RouterMethod wrapper | Return value selects downstream @listen targets

Detailed Behavior

FlowCondition Data Structure

Both or_() and and_() produce a FlowCondition TypedDict:

class FlowCondition(TypedDict, total=False):
    type: Required[FlowConditionType]          # "OR" or "AND"
    conditions: Sequence[FlowMethodName | FlowCondition]  # Nested conditions
    methods: list[FlowMethodName]              # Flat method list (simple case)

When conditions are nested (e.g., or_(and_(a, b), c)), the conditions field contains a mix of strings and nested FlowCondition dicts. The framework recursively evaluates these at runtime.
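The recursive evaluation described above can be illustrated with a minimal sketch. It is not the framework's code: is_satisfied is a hypothetical helper, and the plain dict only mirrors the {"type": ..., "conditions": [...]} shape documented in the signatures.

```python
from __future__ import annotations


def is_satisfied(condition: str | dict, completed: set[str]) -> bool:
    """Hypothetical helper: evaluate a condition against completed method names."""
    if isinstance(condition, str):
        # Leaf: a plain method name is satisfied once that method has completed.
        return condition in completed
    # Branch: evaluate each nested condition, then combine with OR or AND.
    results = (is_satisfied(child, completed) for child in condition["conditions"])
    return any(results) if condition["type"] == "OR" else all(results)


# Same shape as the result of or_(and_("a", "b"), "c").
nested = {
    "type": "OR",
    "conditions": [{"type": "AND", "conditions": ["a", "b"]}, "c"],
}

only_a = is_satisfied(nested, {"a"})        # AND branch incomplete, "c" missing
a_and_b = is_satisfied(nested, {"a", "b"})  # AND branch complete
only_c = is_satisfied(nested, {"c"})        # OR satisfied by "c" alone
```

Here only_a is False while a_and_b and only_c are both True, which is the behaviour one would expect from an OR over an AND group and a single method.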

or_() Evaluation

At runtime, the flow engine tracks which trigger methods have completed. For an OR condition:

  1. When any listed method completes, the listener is eligible to fire
  2. The framework uses _fired_or_listeners (guarded by _or_listeners_lock) to ensure each OR listener fires at most once per execution cycle
  3. The _mark_or_listener_fired() method atomically checks and marks the listener, preventing duplicate execution from parallel triggers
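The check-and-mark step can be sketched as a set guarded by a lock. The class and method names below are hypothetical stand-ins for _fired_or_listeners, _or_listeners_lock, and _mark_or_listener_fired(); the real implementation may differ in detail.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class OrListenerGuard:
    """Hypothetical model of a fire-at-most-once guard for OR listeners."""

    def __init__(self):
        self._fired = set()            # names of OR listeners that already fired
        self._lock = threading.Lock()  # makes check-and-mark a single atomic step

    def mark_fired(self, listener_name: str) -> bool:
        """Return True only for the first caller; later callers get False."""
        with self._lock:
            if listener_name in self._fired:
                return False
            self._fired.add(listener_name)
            return True


guard = OrListenerGuard()
# Eight parallel triggers race to fire the same OR listener.
with ThreadPoolExecutor(max_workers=8) as pool:
    outcomes = list(pool.map(lambda _: guard.mark_fired("notify"), range(8)))

winners = outcomes.count(True)
```

Because the membership test and the insertion happen under the same lock, exactly one of the eight callers sees True, regardless of thread scheduling.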

and_() Evaluation

For an AND condition:

  1. The framework maintains _pending_and_listeners, a dict mapping each AND listener to the set of methods that have completed
  2. When a trigger method completes, it is added to the pending set
  3. When the pending set contains all required methods, the listener fires
  4. After firing, the pending set is cleared (supporting cyclic flows)
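The four steps above can be sketched as follows. AndListenerTracker and on_method_completed are hypothetical names used for illustration; only the idea of a per-listener pending set that is cleared after firing comes from the description of _pending_and_listeners.

```python
from __future__ import annotations


class AndListenerTracker:
    """Hypothetical model of AND fan-in bookkeeping."""

    def __init__(self, required: dict[str, set[str]]) -> None:
        self._required = required                 # listener -> methods it waits for
        self._pending: dict[str, set[str]] = {}   # listener -> methods completed so far

    def on_method_completed(self, listener: str, method: str) -> bool:
        """Record a completion; return True when the listener should fire."""
        done = self._pending.setdefault(listener, set())
        done.add(method)
        if done >= self._required[listener]:
            # All triggers seen: reset so a cyclic flow can fire the listener again.
            done.clear()
            return True
        return False


tracker = AndListenerTracker({"quality_gate": {"run_tests", "code_review"}})
first = tracker.on_method_completed("quality_gate", "run_tests")     # still waiting
second = tracker.on_method_completed("quality_gate", "code_review")  # fires
third = tracker.on_method_completed("quality_gate", "run_tests")     # new cycle, waiting
```

The third call returns False because the pending set was cleared after firing, so a second pass through the flow again needs both triggers.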

Router Path Discovery

The FlowMeta metaclass calls get_possible_return_constants() on each @router method to extract possible return values from:

  • Literal type annotations (e.g., -> Literal["SUCCESS", "FAILURE"])
  • Enum return type annotations
  • String constants in return statements (via AST inspection)

These paths are stored in _router_paths and used for:

  • Visualization (drawing edges from router to downstream listeners)
  • Validation (warning about orphaned triggers)
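To make the Literal and AST lookups concrete, here is a rough sketch using only the standard library. The helper names paths_from_annotation and paths_from_source are hypothetical, and this is not the code of get_possible_return_constants(); Enum handling is omitted.

```python
import ast
import textwrap
from typing import Literal, get_args, get_origin, get_type_hints


def paths_from_annotation(func) -> list:
    """Hypothetical helper: read candidate paths from a Literal return annotation."""
    hint = get_type_hints(func).get("return")
    if get_origin(hint) is Literal:
        return [str(value) for value in get_args(hint)]
    return []


def paths_from_source(source: str) -> list:
    """Hypothetical helper: collect string constants used in return statements."""
    tree = ast.parse(textwrap.dedent(source))
    found = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Return) and isinstance(node.value, ast.Constant):
            if isinstance(node.value.value, str):
                found.add(node.value.value)
    return sorted(found)


def decide_release(ok: bool) -> Literal["RELEASE", "FIX"]:
    return "RELEASE" if ok else "FIX"


UNANNOTATED_ROUTER = '''
def decide(ok):
    if ok:
        return "RELEASE"
    return "FIX"
'''

annotated_paths = paths_from_annotation(decide_release)
inspected_paths = paths_from_source(UNANNOTATED_ROUTER)
```

The annotation route yields ["RELEASE", "FIX"] in declaration order, while the AST route here returns the sorted list ["FIX", "RELEASE"]. Returns that are computed at runtime (for example, a variable) are invisible to this kind of static inspection, which is one reason explicit Literal annotations are useful on routers.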

Racing Groups

When multiple methods are sources for the same or_() listener and are triggered in parallel, the framework identifies them as a racing group. The _execute_racing_listeners() method uses asyncio.as_completed() to implement first-wins semantics: the first racing task to complete triggers the OR listener, and remaining racing tasks are cancelled.
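The first-wins pattern can be sketched with asyncio.as_completed() alone. The branch and race coroutines below are hypothetical and do not reproduce _execute_racing_listeners(); they only show how the first finished task wins and the rest are cancelled.

```python
from __future__ import annotations

import asyncio


async def branch(name: str, delay: float) -> str:
    """Hypothetical racing source method that finishes after `delay` seconds."""
    await asyncio.sleep(delay)
    return name


async def race(delays: dict[str, float]) -> tuple[str, list[str]]:
    """Return the winning branch name and the names of the cancelled branches."""
    tasks = {asyncio.create_task(branch(n, d)): n for n, d in delays.items()}
    winner = ""
    for finished in asyncio.as_completed(list(tasks)):
        winner = await finished  # first task to complete wins
        break
    for task in tasks:
        if not task.done():
            task.cancel()        # losers are cancelled, not awaited to completion
    # Let the cancellations settle without raising CancelledError here.
    await asyncio.gather(*tasks, return_exceptions=True)
    cancelled = [name for task, name in tasks.items() if task.cancelled()]
    return winner, cancelled


winner, cancelled = asyncio.run(race({"fast": 0.01, "slow": 1.0}))
```

With these delays, winner is "fast" and cancelled is ["slow"]. A practical consequence of this design is that a losing branch may be interrupted mid-work, so side effects inside racing methods should tolerate cancellation.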

Example

from crewai.flow.flow import Flow, FlowState, start, listen, router, or_, and_
from typing import Literal


class QAState(FlowState):
    code: str = ""
    tests_passed: bool = False
    review_passed: bool = False
    result: str = ""


class QAFlow(Flow[QAState]):

    @start()
    def write_code(self):
        self.state.code = "def hello(): return 'world'"
        return self.state.code

    @listen(write_code)
    def run_tests(self):
        """Parallel branch 1: run automated tests."""
        self.state.tests_passed = True
        return "tests_done"

    @listen(write_code)
    def code_review(self):
        """Parallel branch 2: perform code review."""
        self.state.review_passed = True
        return "review_done"

    @listen(and_(run_tests, code_review))
    def quality_gate(self):
        """Fan-in: fires only when BOTH tests and review complete."""
        return "gate_passed"

    @router(quality_gate)
    def decide_release(self) -> Literal["RELEASE", "FIX"]:
        """Route based on combined quality results."""
        if self.state.tests_passed and self.state.review_passed:
            return "RELEASE"
        return "FIX"

    @listen("RELEASE")
    def deploy(self):
        self.state.result = "Deployed successfully"
        return self.state.result

    @listen("FIX")
    def send_back(self):
        self.state.result = "Sent back for fixes"
        return self.state.result

    @listen(or_(deploy, send_back))
    def notify(self):
        """OR fan-in: fires when EITHER deploy or send_back completes."""
        return f"Notification: {self.state.result}"


flow = QAFlow()
result = flow.kickoff()
