Implementation:CrewAIInc CrewAI SQLite Flow Persistence
Overview
SQLite Flow Persistence provides the concrete SQLite-based persistence backend (SQLiteFlowPersistence), the @persist decorator for automatic state saving, and the FlowPersistence abstract base class that defines the persistence interface in the CrewAI Flow engine.
Source Reference
| Component | File | Lines |
|---|---|---|
FlowPersistence ABC |
src/crewai/flow/persistence/base.py |
L1-107 |
SQLiteFlowPersistence |
src/crewai/flow/persistence/sqlite.py |
L1-283 |
@persist decorator |
src/crewai/flow/persistence/decorators.py |
L139-315 |
PersistenceDecorator helper |
src/crewai/flow/persistence/decorators.py |
L56-136 |
| Module exports | src/crewai/flow/persistence/__init__.py |
L1-19 |
Signatures
FlowPersistence ABC
class FlowPersistence(ABC):
"""Abstract base class for flow state persistence."""
@abstractmethod
def init_db(self) -> None:
"""Initialize the persistence backend."""
@abstractmethod
def save_state(
self, flow_uuid: str, method_name: str, state_data: dict[str, Any] | BaseModel
) -> None:
"""Persist the flow state after method completion."""
@abstractmethod
def load_state(self, flow_uuid: str) -> dict[str, Any] | None:
"""Load the most recent state for a given flow UUID."""
def save_pending_feedback(
self, flow_uuid: str, context: PendingFeedbackContext,
state_data: dict[str, Any] | BaseModel,
) -> None:
"""Save state with a pending feedback marker (optional override)."""
def load_pending_feedback(
self, flow_uuid: str,
) -> tuple[dict[str, Any], PendingFeedbackContext] | None:
"""Load state and pending feedback context (optional override)."""
def clear_pending_feedback(self, flow_uuid: str) -> None:
"""Clear the pending feedback marker after resume (optional override)."""
SQLiteFlowPersistence
class SQLiteFlowPersistence(FlowPersistence):
"""SQLite-based implementation of flow state persistence."""
def __init__(self, db_path: str | None = None) -> None:
"""Initialize SQLite persistence.
Args:
db_path: Path to the SQLite database file. If not provided,
uses db_storage_path() / "flow_states.db".
"""
@persist Decorator
def persist(
persistence: FlowPersistence | None = None, verbose: bool = False
) -> Callable[[type | Callable[..., T]], type | Callable[..., T]]:
"""Decorator to persist flow state.
Can be applied at class level or method level.
Args:
persistence: Optional FlowPersistence implementation. Defaults to SQLiteFlowPersistence.
verbose: Whether to log persistence operations. Defaults to False.
"""
Import
from crewai.flow.persistence import SQLiteFlowPersistence, persist
from crewai.flow.persistence.base import FlowPersistence # for custom backends
I/O Contract
| Operation | Input | Output | Storage Effect |
|---|---|---|---|
save_state() |
flow_uuid (str), method_name (str), state_data (dict or BaseModel) | None | Inserts row into flow_states table
|
load_state() |
flow_uuid (str) | None | Reads most recent row for UUID |
save_pending_feedback() |
flow_uuid, PendingFeedbackContext, state_data | None | Inserts/replaces row in pending_feedback table
|
load_pending_feedback() |
flow_uuid (str) | None | Reads pending feedback row |
clear_pending_feedback() |
flow_uuid (str) | None | Deletes pending feedback row |
Detailed Behavior
SQLite Schema
SQLiteFlowPersistence.init_db() creates two tables:
CREATE TABLE IF NOT EXISTS flow_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_uuid TEXT NOT NULL,
method_name TEXT NOT NULL,
timestamp DATETIME NOT NULL,
state_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid ON flow_states(flow_uuid);
CREATE TABLE IF NOT EXISTS pending_feedback (
id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_uuid TEXT NOT NULL UNIQUE,
context_json TEXT NOT NULL,
state_json TEXT NOT NULL,
created_at DATETIME NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_pending_feedback_uuid ON pending_feedback(flow_uuid);
The flow_states table stores a new row for each checkpoint (append-only log). The pending_feedback table uses INSERT OR REPLACE to maintain at most one pending feedback per flow UUID.
@persist Decorator Behavior
The @persist decorator can be applied at two levels:
Class-level: Wraps all flow methods (those with __is_start_method__, __trigger_methods__, or __is_router__) to persist state after each completion. It also injects the persistence instance into the constructor.
Method-level: Wraps only the decorated method to persist state after completion.
In both cases, the decorator:
- Calls the original method
- Extracts the flow UUID from
self.state.id - Serializes state via
model_dump()(Pydantic) or directly (dict) - Calls
persistence.save_state(flow_uuid, method_name, state_data)
The PersistenceDecorator.persist_state() classmethod handles the unwrapping of StateProxy objects, UUID extraction, error handling, and optional verbose logging.
State Restoration
When a flow is created with an existing UUID (via inputs={"id": "existing-uuid"}), the kickoff_async() method calls persistence.load_state(flow_uuid) and hydrates self.state with the loaded data. The _completed_methods set tracks which methods have already run to prevent re-execution.
Example
from crewai.flow.flow import Flow, FlowState, start, listen
from crewai.flow.persistence import persist, SQLiteFlowPersistence
# Class-level persistence: all methods auto-persist
@persist(verbose=True)
class ResearchFlow(Flow[FlowState]):
@start()
def gather_data(self):
self.state.data = "collected research data"
return self.state.data
@listen(gather_data)
def analyze(self):
self.state.analysis = f"Analysis of: {self.state.data}"
return self.state.analysis
# Method-level persistence: only specific methods persist
class SelectivePersistence(Flow[FlowState]):
def __init__(self):
super().__init__(persistence=SQLiteFlowPersistence("my_flows.db"))
@start()
def fast_step(self):
"""No persistence - lightweight step."""
return "quick result"
@listen(fast_step)
@persist(SQLiteFlowPersistence("my_flows.db"))
def expensive_step(self):
"""Persisted - expensive step worth checkpointing."""
self.state.result = "expensive computation done"
return self.state.result
# Resume a crashed flow
flow = ResearchFlow()
result = flow.kickoff()
flow_id = flow.state.id # Save this UUID
# Later, after crash:
restored_flow = ResearchFlow()
result = restored_flow.kickoff(inputs={"id": flow_id})
# State is restored from SQLite; completed methods are skipped
Related Pages
- Principle:CrewAIInc_CrewAI_State_Persistence
- Implementation:CrewAIInc_CrewAI_Flow_State_Model -- The
FlowStatebase class with the UUIDidfield - Implementation:CrewAIInc_CrewAI_Flow_Decorators -- Decorators whose completion triggers persistence
- Implementation:CrewAIInc_CrewAI_Flow_Kickoff_And_Plot -- Kickoff integrates with persistence for state restoration