Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:CrewAIInc CrewAI SQLite Flow Persistence

From Leeroopedia

Overview

SQLite Flow Persistence provides the concrete SQLite-based persistence backend (SQLiteFlowPersistence), the @persist decorator for automatic state saving, and the FlowPersistence abstract base class that defines the persistence interface in the CrewAI Flow engine.

Source Reference

Component File Lines
FlowPersistence ABC src/crewai/flow/persistence/base.py L1-107
SQLiteFlowPersistence src/crewai/flow/persistence/sqlite.py L1-283
@persist decorator src/crewai/flow/persistence/decorators.py L139-315
PersistenceDecorator helper src/crewai/flow/persistence/decorators.py L56-136
Module exports src/crewai/flow/persistence/__init__.py L1-19

Signatures

FlowPersistence ABC

class FlowPersistence(ABC):
    """Abstract base class for flow state persistence."""

    @abstractmethod
    def init_db(self) -> None:
        """Initialize the persistence backend."""

    @abstractmethod
    def save_state(
        self, flow_uuid: str, method_name: str, state_data: dict[str, Any] | BaseModel
    ) -> None:
        """Persist the flow state after method completion."""

    @abstractmethod
    def load_state(self, flow_uuid: str) -> dict[str, Any] | None:
        """Load the most recent state for a given flow UUID."""

    def save_pending_feedback(
        self, flow_uuid: str, context: PendingFeedbackContext,
        state_data: dict[str, Any] | BaseModel,
    ) -> None:
        """Save state with a pending feedback marker (optional override)."""

    def load_pending_feedback(
        self, flow_uuid: str,
    ) -> tuple[dict[str, Any], PendingFeedbackContext] | None:
        """Load state and pending feedback context (optional override)."""

    def clear_pending_feedback(self, flow_uuid: str) -> None:
        """Clear the pending feedback marker after resume (optional override)."""

SQLiteFlowPersistence

class SQLiteFlowPersistence(FlowPersistence):
    """SQLite-based implementation of flow state persistence."""

    def __init__(self, db_path: str | None = None) -> None:
        """Initialize SQLite persistence.

        Args:
            db_path: Path to the SQLite database file. If not provided,
                     uses db_storage_path() / "flow_states.db".
        """

@persist Decorator

def persist(
    persistence: FlowPersistence | None = None, verbose: bool = False
) -> Callable[[type | Callable[..., T]], type | Callable[..., T]]:
    """Decorator to persist flow state.

    Can be applied at class level or method level.

    Args:
        persistence: Optional FlowPersistence implementation. Defaults to SQLiteFlowPersistence.
        verbose: Whether to log persistence operations. Defaults to False.
    """

Import

from crewai.flow.persistence import SQLiteFlowPersistence, persist
from crewai.flow.persistence.base import FlowPersistence  # for custom backends

I/O Contract

Operation Input Output Storage Effect
save_state() flow_uuid (str), method_name (str), state_data (dict or BaseModel) None Inserts row into flow_states table
load_state() flow_uuid (str) None Reads most recent row for UUID
save_pending_feedback() flow_uuid, PendingFeedbackContext, state_data None Inserts/replaces row in pending_feedback table
load_pending_feedback() flow_uuid (str) None Reads pending feedback row
clear_pending_feedback() flow_uuid (str) None Deletes pending feedback row

Detailed Behavior

SQLite Schema

SQLiteFlowPersistence.init_db() creates two tables:

CREATE TABLE IF NOT EXISTS flow_states (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    flow_uuid TEXT NOT NULL,
    method_name TEXT NOT NULL,
    timestamp DATETIME NOT NULL,
    state_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid ON flow_states(flow_uuid);

CREATE TABLE IF NOT EXISTS pending_feedback (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    flow_uuid TEXT NOT NULL UNIQUE,
    context_json TEXT NOT NULL,
    state_json TEXT NOT NULL,
    created_at DATETIME NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_pending_feedback_uuid ON pending_feedback(flow_uuid);

The flow_states table stores a new row for each checkpoint (append-only log). The pending_feedback table uses INSERT OR REPLACE to maintain at most one pending feedback per flow UUID.

@persist Decorator Behavior

The @persist decorator can be applied at two levels:

Class-level: Wraps all flow methods (those with __is_start_method__, __trigger_methods__, or __is_router__) to persist state after each completion. It also injects the persistence instance into the constructor.

Method-level: Wraps only the decorated method to persist state after completion.

In both cases, the decorator:

  1. Calls the original method
  2. Extracts the flow UUID from self.state.id
  3. Serializes state via model_dump() (Pydantic) or directly (dict)
  4. Calls persistence.save_state(flow_uuid, method_name, state_data)

The PersistenceDecorator.persist_state() classmethod handles the unwrapping of StateProxy objects, UUID extraction, error handling, and optional verbose logging.

State Restoration

When a flow is created with an existing UUID (via inputs={"id": "existing-uuid"}), the kickoff_async() method calls persistence.load_state(flow_uuid) and hydrates self.state with the loaded data. The _completed_methods set tracks which methods have already run to prevent re-execution.

Example

from crewai.flow.flow import Flow, FlowState, start, listen
from crewai.flow.persistence import persist, SQLiteFlowPersistence


# Class-level persistence: all methods auto-persist
@persist(verbose=True)
class ResearchFlow(Flow[FlowState]):

    @start()
    def gather_data(self):
        self.state.data = "collected research data"
        return self.state.data

    @listen(gather_data)
    def analyze(self):
        self.state.analysis = f"Analysis of: {self.state.data}"
        return self.state.analysis


# Method-level persistence: only specific methods persist
class SelectivePersistence(Flow[FlowState]):
    def __init__(self):
        super().__init__(persistence=SQLiteFlowPersistence("my_flows.db"))

    @start()
    def fast_step(self):
        """No persistence - lightweight step."""
        return "quick result"

    @listen(fast_step)
    @persist(SQLiteFlowPersistence("my_flows.db"))
    def expensive_step(self):
        """Persisted - expensive step worth checkpointing."""
        self.state.result = "expensive computation done"
        return self.state.result


# Resume a crashed flow
flow = ResearchFlow()
result = flow.kickoff()
flow_id = flow.state.id  # Save this UUID

# Later, after crash:
restored_flow = ResearchFlow()
result = restored_flow.kickoff(inputs={"id": flow_id})
# State is restored from SQLite; completed methods are skipped

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment