Implementation:CrewAIInc CrewAI Flow State Model
Overview
Flow State Model provides the concrete classes for defining flow state models with UUID tracking and type validation in the CrewAI Flow engine. It includes the FlowState base class, the Flow[T] generic parameterization, and the StateProxy thread-safe wrapper that mediates all state access within flow methods.
Source Reference
| Component | File | Lines |
|---|---|---|
FlowState base class |
src/crewai/flow/flow.py |
L116-122 |
Flow[T] generic class |
src/crewai/flow/flow.py |
L690-778 |
StateProxy thread-safe wrapper |
src/crewai/flow/flow.py |
L545-604 |
LockedListProxy / LockedDictProxy |
src/crewai/flow/flow.py |
L419-542 |
Signature
class FlowState(BaseModel):
"""Base model for all flow states, ensuring each state has a unique ID."""
id: str = Field(
default_factory=lambda: str(uuid4()),
description="Unique identifier for the flow state",
)
T = TypeVar("T", bound=dict[str, Any] | BaseModel)
class Flow(Generic[T], metaclass=FlowMeta):
"""Base class for all flows.
type parameter T must be either dict[str, Any] or a subclass of BaseModel."""
initial_state: type[T] | T | None = None
Import
from crewai.flow.flow import Flow, FlowState
I/O Contract
| Direction | Type | Description |
|---|---|---|
| Input (typed) | Pydantic BaseModel subclass extending FlowState |
Custom state model with validated fields and auto-generated UUID |
| Input (untyped) | dict[str, Any] |
Flexible key-value state; framework injects id if missing
|
| Output | State object via self.state |
StateProxy[T] wrapping the underlying state with thread-safe access
|
| Persistence key | str |
The id field (UUID) used as primary key for state persistence
|
Detailed Behavior
FlowState Base Class
FlowState is a minimal Pydantic BaseModel that adds a single field:
id: str = Field(default_factory=lambda: str(uuid4()))
This UUID is generated once at instantiation and remains stable throughout the flow's lifetime. It serves as the persistence primary key in SQLiteFlowPersistence and as the flow identifier in tracing events.
Flow Generic Parameterization
The Flow class is parameterized with Generic[T] where T is bound to dict[str, Any] | BaseModel. When subclassing, developers specify the state type:
class MyFlow(Flow[MyState]): # Typed state
...
class FlexFlow(Flow[dict]): # Untyped state
...
The __class_getitem__ method creates a dynamic subclass storing the type parameter as _initial_state_t, which the constructor uses to instantiate the correct state type.
StateProxy Thread Safety
When flow methods access self.state, they receive a StateProxy instance rather than the raw state object. The proxy:
- Intercepts
__setattr__calls and wraps them in athreading.Lock - Returns
LockedListProxyfor list-typed attributes (guardingappend,extend,pop, etc.) - Returns
LockedDictProxyfor dict-typed attributes (guarding__setitem__,update,pop, etc.) - Passes through read operations without locking for performance
The _unwrap() method on StateProxy returns the underlying raw state, used internally by the persistence layer for serialization.
Example
from crewai.flow.flow import Flow, FlowState, start, listen
from pydantic import BaseModel, Field
class ResearchState(FlowState):
"""Typed state for a research flow."""
topic: str = ""
sources: list[str] = Field(default_factory=list)
summary: str = ""
word_count: int = 0
class ResearchFlow(Flow[ResearchState]):
@start()
def gather_sources(self):
self.state.topic = "AI Safety"
self.state.sources.append("arxiv.org/abs/2301.00001")
self.state.sources.append("arxiv.org/abs/2301.00002")
return self.state.sources
@listen(gather_sources)
def summarize(self):
self.state.summary = f"Summary of {len(self.state.sources)} sources on {self.state.topic}"
self.state.word_count = len(self.state.summary.split())
return self.state.summary
# Untyped state alternative
class QuickFlow(Flow[dict]):
@start()
def begin(self):
self.state["topic"] = "AI Safety"
self.state["results"] = []
return "started"
# Execute
flow = ResearchFlow()
result = flow.kickoff()
print(flow.state.id) # UUID like "a1b2c3d4-..."
print(flow.state.summary) # "Summary of 2 sources on AI Safety"
Related Pages
- Principle:CrewAIInc_CrewAI_State_Model_Design
- Implementation:CrewAIInc_CrewAI_Flow_Decorators -- Decorators that define how methods interact with state
- Implementation:CrewAIInc_CrewAI_SQLite_Flow_Persistence -- Persists state using the UUID
idfield - Implementation:CrewAIInc_CrewAI_Flow_Kickoff_And_Plot -- Kickoff initializes and manages the state lifecycle