Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:CrewAIInc CrewAI Flow State Model

From Leeroopedia

Overview

Flow State Model provides the concrete classes for defining flow state models with UUID tracking and type validation in the CrewAI Flow engine. It includes the FlowState base class, the Flow[T] generic parameterization, and the StateProxy thread-safe wrapper that mediates all state access within flow methods.

Source Reference

Component File Lines
FlowState base class src/crewai/flow/flow.py L116-122
Flow[T] generic class src/crewai/flow/flow.py L690-778
StateProxy thread-safe wrapper src/crewai/flow/flow.py L545-604
LockedListProxy / LockedDictProxy src/crewai/flow/flow.py L419-542

Signature

class FlowState(BaseModel):
    """Base model for all flow states, ensuring each state has a unique ID."""
    id: str = Field(
        default_factory=lambda: str(uuid4()),
        description="Unique identifier for the flow state",
    )

T = TypeVar("T", bound=dict[str, Any] | BaseModel)

class Flow(Generic[T], metaclass=FlowMeta):
    """Base class for all flows.
    type parameter T must be either dict[str, Any] or a subclass of BaseModel."""
    initial_state: type[T] | T | None = None

Import

from crewai.flow.flow import Flow, FlowState

I/O Contract

Direction Type Description
Input (typed) Pydantic BaseModel subclass extending FlowState Custom state model with validated fields and auto-generated UUID
Input (untyped) dict[str, Any] Flexible key-value state; framework injects id if missing
Output State object via self.state StateProxy[T] wrapping the underlying state with thread-safe access
Persistence key str The id field (UUID) used as primary key for state persistence

Detailed Behavior

FlowState Base Class

FlowState is a minimal Pydantic BaseModel that adds a single field:

id: str = Field(default_factory=lambda: str(uuid4()))

This UUID is generated once at instantiation and remains stable throughout the flow's lifetime. It serves as the persistence primary key in SQLiteFlowPersistence and as the flow identifier in tracing events.

Flow Generic Parameterization

The Flow class is parameterized with Generic[T] where T is bound to dict[str, Any] | BaseModel. When subclassing, developers specify the state type:

class MyFlow(Flow[MyState]):  # Typed state
    ...

class FlexFlow(Flow[dict]):  # Untyped state
    ...

The __class_getitem__ method creates a dynamic subclass storing the type parameter as _initial_state_t, which the constructor uses to instantiate the correct state type.

StateProxy Thread Safety

When flow methods access self.state, they receive a StateProxy instance rather than the raw state object. The proxy:

  1. Intercepts __setattr__ calls and wraps them in a threading.Lock
  2. Returns LockedListProxy for list-typed attributes (guarding append, extend, pop, etc.)
  3. Returns LockedDictProxy for dict-typed attributes (guarding __setitem__, update, pop, etc.)
  4. Passes through read operations without locking for performance

The _unwrap() method on StateProxy returns the underlying raw state, used internally by the persistence layer for serialization.

Example

from crewai.flow.flow import Flow, FlowState, start, listen
from pydantic import BaseModel, Field


class ResearchState(FlowState):
    """Typed state for a research flow."""
    topic: str = ""
    sources: list[str] = Field(default_factory=list)
    summary: str = ""
    word_count: int = 0


class ResearchFlow(Flow[ResearchState]):
    @start()
    def gather_sources(self):
        self.state.topic = "AI Safety"
        self.state.sources.append("arxiv.org/abs/2301.00001")
        self.state.sources.append("arxiv.org/abs/2301.00002")
        return self.state.sources

    @listen(gather_sources)
    def summarize(self):
        self.state.summary = f"Summary of {len(self.state.sources)} sources on {self.state.topic}"
        self.state.word_count = len(self.state.summary.split())
        return self.state.summary


# Untyped state alternative
class QuickFlow(Flow[dict]):
    @start()
    def begin(self):
        self.state["topic"] = "AI Safety"
        self.state["results"] = []
        return "started"


# Execute
flow = ResearchFlow()
result = flow.kickoff()
print(flow.state.id)       # UUID like "a1b2c3d4-..."
print(flow.state.summary)  # "Summary of 2 sources on AI Safety"

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment