Implementation:CrewAIInc CrewAI Flow Routing Primitives
Overview
Flow Routing Primitives provides the concrete functions for conditional routing, fan-in, and fan-out in flow execution: or_(), and_(), and @router. These primitives enable complex control flow patterns within the CrewAI Flow engine.
Source Reference
| Component | File | Lines |
|---|---|---|
| or_() combinator | src/crewai/flow/flow.py | L346-379 |
| and_() combinator | src/crewai/flow/flow.py | L382-416 |
| router() decorator | src/crewai/flow/flow.py | L272-343 |
| RouterMethod wrapper | src/crewai/flow/flow_wrappers.py | L160-166 |
| FlowCondition TypedDict | src/crewai/flow/flow_wrappers.py | L22-36 |
| Racing group logic | src/crewai/flow/flow.py | L806-898 |
Signatures
def or_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
    """Combines multiple conditions with OR logic for flow control.

    Args:
        conditions: Variable number of conditions (method names, condition dicts,
            or method references).

    Returns:
        A condition dictionary: {"type": "OR", "conditions": [...]}
    """

def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition:
    """Combines multiple conditions with AND logic for flow control.

    Args:
        *conditions: Variable number of conditions (method names, condition dicts,
            or method references).

    Returns:
        A condition dictionary: {"type": "AND", "conditions": [...]}
    """

def router(
    condition: str | FlowCondition | Callable[..., Any],
) -> Callable[[Callable[P, R]], RouterMethod[P, R]]:
    """Creates a routing method that directs flow execution based on conditions."""
Import
from crewai.flow.flow import Flow, start, listen, router, or_, and_
I/O Contract
| Primitive | Input | Output | Effect |
|---|---|---|---|
| or_(*methods) | Method references, strings, or nested FlowConditions | FlowCondition dict with type="OR" | Listener fires when any condition is met |
| and_(*methods) | Method references, strings, or nested FlowConditions | FlowCondition dict with type="AND" | Listener fires when all conditions are met |
| @router(cond) | Trigger condition | RouterMethod wrapper | Return value selects downstream @listen targets |
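As a rough illustration of the contract in the table, the sketch below builds the same dictionary shape in plain Python. The helpers sketch_or, sketch_and, and _normalize are hypothetical names, and identifying a method reference by its __name__ is an assumption; CrewAI's own or_() and and_() may normalize their inputs differently.

```python
from typing import Any, Callable, Union

# A condition input: a method name, a nested condition dict, or a callable.
Condition = Union[str, dict, Callable[..., Any]]


def _normalize(cond: Condition) -> Union[str, dict]:
    """Turn one input into a method name or a nested condition dict."""
    if isinstance(cond, str):
        return cond
    if isinstance(cond, dict):
        return cond  # already a condition dict, so nest it unchanged
    if callable(cond):
        return cond.__name__  # assumption: methods are identified by name
    raise ValueError(f"Unsupported condition: {cond!r}")


def sketch_or(*conditions: Condition) -> dict:
    """Hypothetical stand-in that mirrors the documented OR output shape."""
    return {"type": "OR", "conditions": [_normalize(c) for c in conditions]}


def sketch_and(*conditions: Condition) -> dict:
    """Hypothetical stand-in that mirrors the documented AND output shape."""
    return {"type": "AND", "conditions": [_normalize(c) for c in conditions]}


def run_tests(): ...
def code_review(): ...


# Nesting: (run_tests AND code_review) OR manual_override
nested = sketch_or(sketch_and(run_tests, code_review), "manual_override")
```

Because the combinators only return data, they can be composed freely before being handed to @listen or @router.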
Detailed Behavior
FlowCondition Data Structure
Both or_() and and_() produce a FlowCondition TypedDict:
class FlowCondition(TypedDict, total=False):
    type: Required[FlowConditionType]  # "OR" or "AND"
    conditions: Sequence[FlowMethodName | FlowCondition]  # Nested conditions
    methods: list[FlowMethodName]  # Flat method list (simple case)
When conditions are nested (e.g., or_(and_(a, b), c)), the conditions field contains a mix of strings and nested FlowCondition dicts. The framework recursively evaluates these at runtime.
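The recursive evaluation of a nested condition can be sketched as follows. This is a simplified, stateless illustration rather than the framework's code: is_satisfied is a hypothetical helper, and it checks a condition tree against a plain set of completed method names, whereas the real engine tracks completion incrementally.

```python
from typing import Union

# A node is either a method name or a condition dict.
ConditionNode = Union[str, dict]


def is_satisfied(node: ConditionNode, completed: set[str]) -> bool:
    """Recursively evaluate a condition tree against completed method names."""
    if isinstance(node, str):
        # Leaf: a plain method name is satisfied once that method has completed.
        return node in completed
    # Branch: read nested conditions, falling back to the flat method list.
    children = node.get("conditions") or node.get("methods") or []
    results = (is_satisfied(child, completed) for child in children)
    return any(results) if node["type"] == "OR" else all(results)


# Equivalent in shape to or_(and_("a", "b"), "c")
condition = {
    "type": "OR",
    "conditions": [{"type": "AND", "conditions": ["a", "b"]}, "c"],
}

only_a = is_satisfied(condition, {"a"})        # AND branch incomplete, "c" missing
a_and_b = is_satisfied(condition, {"a", "b"})  # AND branch complete
only_c = is_satisfied(condition, {"c"})        # OR short-circuits on "c"
```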
or_() Evaluation
At runtime, the flow engine tracks which trigger methods have completed. For an OR condition:
- When any listed method completes, the listener is eligible to fire
- The framework uses _fired_or_listeners (guarded by _or_listeners_lock) to ensure each OR listener fires at most once per execution cycle
- The _mark_or_listener_fired() method atomically checks and marks the listener, preventing duplicate execution from parallel triggers
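The atomic check-and-mark step can be illustrated with a small standalone class. This is a minimal sketch, assuming a threading.Lock around a set of fired listener names; OrListenerTracker, mark_fired, and reset are hypothetical names, and the reset method is an assumption about how a new execution cycle might begin.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class OrListenerTracker:
    """Hypothetical tracker: lets each OR listener fire at most once per cycle."""

    def __init__(self) -> None:
        self._fired: set[str] = set()
        self._lock = threading.Lock()

    def mark_fired(self, listener_name: str) -> bool:
        """Return True only for the first caller that marks this listener."""
        with self._lock:  # check and mark happen under one lock acquisition
            if listener_name in self._fired:
                return False
            self._fired.add(listener_name)
            return True

    def reset(self) -> None:
        """Assumed hook for starting a new execution cycle."""
        with self._lock:
            self._fired.clear()


tracker = OrListenerTracker()

# Eight parallel triggers compete to fire the same OR listener.
with ThreadPoolExecutor(max_workers=8) as pool:
    outcomes = list(pool.map(lambda _: tracker.mark_fired("notify"), range(8)))

wins = sum(outcomes)  # exactly one trigger wins the right to run the listener
```

Doing the membership test and the insertion under the same lock is what prevents two parallel triggers from both seeing the listener as unfired.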
and_() Evaluation
For an AND condition:
- The framework maintains _pending_and_listeners, a dict mapping each AND listener to the set of methods that have completed
- When a trigger method completes, it is added to the pending set
- When the pending set contains all required methods, the listener fires
- After firing, the pending set is cleared (supporting cyclic flows)
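The bookkeeping steps above can be sketched in plain Python. AndListenerTracker and on_method_completed are hypothetical names, and the structure only approximates what the text describes for _pending_and_listeners; it is not taken from the CrewAI source.

```python
class AndListenerTracker:
    """Hypothetical tracker: fires an AND listener once all its triggers are done."""

    def __init__(self, required: dict[str, set[str]]) -> None:
        # listener name -> every method that must complete before it fires
        self._required = required
        # listener name -> methods seen so far in the current cycle
        self._pending: dict[str, set[str]] = {}

    def on_method_completed(self, method_name: str) -> list[str]:
        """Record a completion and return the listeners that are now ready."""
        ready = []
        for listener, needed in self._required.items():
            if method_name not in needed:
                continue
            seen = self._pending.setdefault(listener, set())
            seen.add(method_name)
            if seen >= needed:  # all required methods have completed
                ready.append(listener)
                self._pending.pop(listener)  # clear so a later cycle starts fresh
        return ready


tracker = AndListenerTracker({"quality_gate": {"run_tests", "code_review"}})

first = tracker.on_method_completed("run_tests")     # still waiting on code_review
second = tracker.on_method_completed("code_review")  # both done, listener is ready
third = tracker.on_method_completed("run_tests")     # new cycle, waiting again
```

Clearing the pending set after firing is what allows the same AND listener to fire again when a cyclic flow revisits its triggers.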
Router Path Discovery
The FlowMeta metaclass calls get_possible_return_constants() on each @router method to extract possible return values from:
- Literal type annotations (e.g., -> Literal["SUCCESS", "FAILURE"])
- Enum return type annotations
- String constants in return statements (via AST inspection)
These paths are stored in _router_paths and used for:
- Visualization (drawing edges from router to downstream listeners)
- Validation (warning about orphaned triggers)
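Two of the discovery sources listed above, Literal annotations and string constants in return statements, can be approximated with the standard library alone. The helpers paths_from_literal and paths_from_source are hypothetical and are not the body of get_possible_return_constants(); Enum handling is omitted, and the AST helper takes source text directly rather than inspecting a live method.

```python
import ast
import textwrap
from typing import Literal, get_args, get_origin, get_type_hints


def paths_from_literal(func) -> set[str]:
    """Collect string values from a Literal[...] return annotation, if any."""
    hint = get_type_hints(func).get("return")
    if get_origin(hint) is Literal:
        return {value for value in get_args(hint) if isinstance(value, str)}
    return set()


def paths_from_source(source: str) -> set[str]:
    """Collect string constants that appear directly in return statements."""
    tree = ast.parse(textwrap.dedent(source))
    found: set[str] = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Return) and isinstance(node.value, ast.Constant):
            if isinstance(node.value.value, str):
                found.add(node.value.value)
    return found


def decide_release(passed: bool) -> Literal["RELEASE", "FIX"]:
    return "RELEASE" if passed else "FIX"


# Source text for a router that has no return annotation.
unannotated_source = """
    def decide(self):
        if self.state.tests_passed:
            return "RELEASE"
        return "FIX"
"""

annotated_paths = paths_from_literal(decide_release)
inferred_paths = paths_from_source(unannotated_source)
```

The AST helper only sees literal strings written directly after return, so a router that returns a variable or a computed value would yield no paths under this sketch.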
Racing Groups
When multiple methods are sources for the same or_() listener and are triggered in parallel, the framework identifies them as a racing group. The _execute_racing_listeners() method uses asyncio.as_completed() to implement first-wins semantics: the first racing task to complete triggers the OR listener, and remaining racing tasks are cancelled.
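The first-wins pattern can be sketched with asyncio.as_completed() as below. The function first_wins and the branch coroutine are hypothetical, the delays are arbitrary, and the sketch shows only the general technique; it does not reproduce _execute_racing_listeners().

```python
import asyncio


async def first_wins(coros):
    """Return the first result to arrive and cancel every task still running."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    try:
        for finished in asyncio.as_completed(tasks):
            return await finished  # first task to complete wins the race
        return None  # no coroutines were supplied
    finally:
        losers = [task for task in tasks if not task.done()]
        for task in losers:
            task.cancel()
        # Wait for the cancellations to be processed before returning.
        await asyncio.gather(*losers, return_exceptions=True)


completed: list[str] = []


async def branch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    completed.append(name)  # only reached if the branch was not cancelled
    return name


async def main() -> str:
    winner = await first_wins([branch("run_tests", 0.2), branch("code_review", 0.01)])
    await asyncio.sleep(0.3)  # long enough for the slow branch, had it survived
    return winner


winner = asyncio.run(main())
```

After the run, only the faster branch has recorded itself in completed, which shows that the slower task was cancelled and did not finish in the background.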
Example
from crewai.flow.flow import Flow, FlowState, start, listen, router, or_, and_
from typing import Literal


class QAState(FlowState):
    code: str = ""
    tests_passed: bool = False
    review_passed: bool = False
    result: str = ""


class QAFlow(Flow[QAState]):
    @start()
    def write_code(self):
        self.state.code = "def hello(): return 'world'"
        return self.state.code

    @listen(write_code)
    def run_tests(self):
        """Parallel branch 1: run automated tests."""
        self.state.tests_passed = True
        return "tests_done"

    @listen(write_code)
    def code_review(self):
        """Parallel branch 2: perform code review."""
        self.state.review_passed = True
        return "review_done"

    @listen(and_(run_tests, code_review))
    def quality_gate(self):
        """Fan-in: fires only when BOTH tests and review complete."""
        return "gate_passed"

    @router(quality_gate)
    def decide_release(self) -> Literal["RELEASE", "FIX"]:
        """Route based on combined quality results."""
        if self.state.tests_passed and self.state.review_passed:
            return "RELEASE"
        return "FIX"

    @listen("RELEASE")
    def deploy(self):
        self.state.result = "Deployed successfully"
        return self.state.result

    @listen("FIX")
    def send_back(self):
        self.state.result = "Sent back for fixes"
        return self.state.result

    @listen(or_(deploy, send_back))
    def notify(self):
        """Fan-out consumer: fires when EITHER deploy or send_back completes."""
        return f"Notification: {self.state.result}"


flow = QAFlow()
result = flow.kickoff()
Related Pages
- Principle:CrewAIInc_CrewAI_Conditional_Routing
- Implementation:CrewAIInc_CrewAI_Flow_Decorators -- @start, @listen, @router decorators that use these primitives
- Implementation:CrewAIInc_CrewAI_Flow_State_Model -- State read by routing decisions
- Implementation:CrewAIInc_CrewAI_Flow_Kickoff_And_Plot -- Runtime execution of routing graphs