Implementation: PrefectHQ Prefect Flow Decorator
| Metadata | |
|---|---|
| Sources | |
| Domains | |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete decorator for defining Prefect flows provided by the prefect library. The @flow decorator transforms a standard Python function into a tracked, retryable, parameterized workflow entry point.
Description
The @flow decorator from prefect transforms a Python function into a Prefect flow. When a decorated function is called, Prefect creates a flow run with full state tracking, logging, and orchestration capabilities.
Key parameters:
- name -- flow name displayed in the Prefect UI
- log_prints -- when True, captures print() statements as structured log entries
- retries -- number of times to retry the entire flow on failure
- retry_delay_seconds -- delay between retries (supports fixed, list, or exponential backoff)
- timeout_seconds -- maximum execution time before the flow is cancelled
- description -- human-readable description of the flow
- on_completion, on_failure, on_cancellation, on_crashed, on_running -- lifecycle hook callbacks
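The retry-related parameters above can be understood through a plain-Python sketch of their semantics. This is a toy illustration only, not Prefect's implementation (which also tracks state transitions in its backend); `toy_flow` and `flaky` are hypothetical names:

```python
import time
from functools import wraps

def toy_flow(name=None, retries=0, retry_delay_seconds=0):
    """Toy stand-in for @flow, illustrating retry semantics only."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            attempts = retries + 1  # the first call, plus `retries` retries
            for attempt in range(attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts - 1:
                        raise  # retries exhausted; surface the failure
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator

calls = []

@toy_flow(name="flaky", retries=2, retry_delay_seconds=0)
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(flaky(), len(calls))  # succeeds on the third attempt: ok 3
```

With `retries=2`, the function body may execute up to three times before the failure propagates, which matches the "retry the entire flow on failure" behavior described above.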
Code Reference
- Repository: https://github.com/PrefectHQ/prefect
- File: src/prefect/flows.py (L2032-2237 for the decorator function)
- Import: from prefect import flow
Signature:
def flow(
__fn: Optional[Callable[P, R]] = None,
*,
name: Optional[str] = None,
version: Optional[str] = None,
flow_run_name: Optional[str] = None,
retries: Optional[int] = None,
retry_delay_seconds: Optional[Union[int, float, List[float]]] = None,
timeout_seconds: Optional[Union[int, float]] = None,
validate_parameters: bool = True,
persist_result: Optional[bool] = None,
result_storage: Optional[Any] = None,
result_serializer: Optional[Any] = None,
cache_result_in_memory: bool = True,
log_prints: Optional[bool] = None,
on_completion: Optional[List[Callable]] = None,
on_failure: Optional[List[Callable]] = None,
on_cancellation: Optional[List[Callable]] = None,
on_crashed: Optional[List[Callable]] = None,
on_running: Optional[List[Callable]] = None,
) -> Union[Flow[P, R], Callable[[Callable[P, R]], Flow[P, R]]]:
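The optional positional `__fn` parameter and the `Union` return type reflect the common dual-mode decorator pattern: `@flow` works both bare and with keyword arguments. A generic sketch of that pattern (illustrative only, not Prefect's actual code; `flow_like` is a hypothetical name):

```python
from functools import partial, wraps

def flow_like(__fn=None, *, name=None):
    """Dual-mode decorator: usable as @flow_like or @flow_like(name=...)."""
    if __fn is None:
        # Called with arguments: return a decorator that awaits the function.
        return partial(flow_like, name=name)
    # Called bare: wrap the function directly.
    @wraps(__fn)
    def wrapper(*args, **kwargs):
        return __fn(*args, **kwargs)
    wrapper.flow_name = name or __fn.__name__
    return wrapper

@flow_like
def a():
    return 1

@flow_like(name="custom")
def b():
    return 2

print(a(), a.flow_name)  # 1 a
print(b(), b.flow_name)  # 2 custom
```

When `__fn` is `None`, the call returns a decorator (`Callable[[Callable[P, R]], Flow[P, R]]`); when the function is supplied directly, a `Flow[P, R]` is returned immediately.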
I/O Contract
| Direction | Parameter | Type | Description |
|---|---|---|---|
| Input | __fn | Callable, optional | The function to decorate. When provided, the decorator is used without parentheses. |
| Input | name | str, optional | Display name for the flow in the Prefect UI. |
| Input | log_prints | bool, optional | When True, captures print() as structured logs. |
| Input | retries | int, optional | Number of times to retry the flow on failure. |
| Input | retry_delay_seconds | Union[int, float, List[float]], optional | Delay between retries. |
| Input | timeout_seconds | Union[int, float], optional | Maximum execution time. |
| Output | -- | Flow[P, R] | A Flow object that wraps the original function with orchestration capabilities. |
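Because `retry_delay_seconds` also accepts a list, a per-retry delay schedule such as exponential backoff can be computed up front. A plain-Python sketch of building such a schedule (`backoff_schedule` is a hypothetical helper, not part of Prefect):

```python
def backoff_schedule(base: float, retries: int, factor: float = 2.0) -> list[float]:
    """Delay before each retry: base, base*factor, base*factor**2, ..."""
    return [base * factor**i for i in range(retries)]

delays = backoff_schedule(base=10, retries=3)
print(delays)  # [10.0, 20.0, 40.0]

# The list could then be passed directly, e.g.:
# @flow(retries=3, retry_delay_seconds=delays)
```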
Usage Examples
Example 1: Basic ETL Flow
from pathlib import Path

from prefect import flow
@flow(name="devto_etl", log_prints=True)
def etl(api_base: str, pages: int, per_page: int, output_file: Path) -> None:
"""Run the end-to-end ETL for pages of articles."""
raw_pages = []
for page_number in range(1, pages + 1):
raw_pages.append(fetch_page(page_number, api_base, per_page))
df = to_dataframe(raw_pages)
save_csv(df, output_file)
This flow orchestrates a complete ETL pipeline: it iterates through API pages, transforms results into a DataFrame, and saves to CSV. The log_prints=True parameter ensures all print() calls within the flow and its tasks are captured as structured log entries.
Example 2: Async Flow with Deployment
from prefect import flow
@flow(name="ai-data-analyst", log_prints=True)
async def analyze_dataset_with_ai() -> DataAnalysis:
df = create_sample_dataset()
agent = create_data_analyst_agent()
result = await agent.run("Analyze this dataset.", deps=df)
return result.output
This async flow demonstrates integration with AI agents. Prefect handles the async execution, state tracking, and result persistence transparently.
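Invoking an async flow from synchronous code follows normal asyncio rules. The sketch below stands in for the decorated flow with a plain coroutine so it carries no Prefect dependency; a real flow run would additionally record state in the Prefect backend:

```python
import asyncio

async def analyze_dataset_with_ai() -> str:
    # Stand-in for the decorated async flow above.
    await asyncio.sleep(0)  # yield control, as real I/O would
    return "analysis complete"

# From synchronous code, drive the coroutine with an event loop.
result = asyncio.run(analyze_dataset_with_ai())
print(result)  # analysis complete
```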