Implementation: PrefectHQ Prefect Task Decorator
| Metadata | |
|---|---|
| Sources | |
| Domains | |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete decorator, provided by the prefect library, for defining retryable, observable pipeline steps. The @task decorator transforms a Python function into a Prefect task with independent state tracking, retry logic, caching, and observability.
Description
The @task decorator from prefect transforms a Python function into a Prefect task. Tasks run within flows, and each task run is independently logged and tracked through its own state lifecycle.
Key parameters:
- name -- task name displayed in the Prefect UI
- retries -- number of retry attempts on failure
- retry_delay_seconds -- delay between retries (fixed, list, or exponential)
- retry_condition_fn -- optional callable that determines whether to retry based on the exception
- timeout_seconds -- maximum execution time before the task is cancelled
- log_prints -- capture print() output as structured logs
- cache_key_fn -- function to generate cache keys for result caching
- cache_expiration -- how long cached results remain valid
- tags -- tags for filtering and organizing tasks in the UI
- on_completion, on_failure -- lifecycle hook callbacks
Code Reference
- Repository: https://github.com/PrefectHQ/prefect
- File: src/prefect/tasks.py (L1967 for the decorator)
- Import: from prefect import task
Signature:
```python
def task(
    __fn: Optional[Callable] = None,
    *,
    name: Optional[str] = None,
    retries: Optional[int] = None,
    retry_delay_seconds: Optional[Union[int, float, List[float]]] = None,
    retry_condition_fn: Optional[Callable] = None,
    timeout_seconds: Optional[Union[int, float]] = None,
    log_prints: Optional[bool] = None,
    cache_key_fn: Optional[Callable] = None,
    cache_expiration: Optional[timedelta] = None,
    tags: Optional[Iterable[str]] = None,
    on_completion: Optional[List[Callable]] = None,
    on_failure: Optional[List[Callable]] = None,
) -> Union[Task, Callable]:
```
I/O Contract
| Direction | Parameter | Type | Description |
|---|---|---|---|
| Input | __fn | Callable | The function to decorate. |
| Input | retries | int | Number of retry attempts on failure. |
| Input | retry_delay_seconds | Union[int, float, List[float]] | Delay between retries. |
| Input | timeout_seconds | Union[int, float] | Maximum execution time. |
| Input | log_prints | bool | Capture print() as structured logs. |
| Input | cache_key_fn | Callable | Function to generate cache keys. |
| Input | tags | Iterable[str] | Tags for organizing tasks. |
| Output | -- | Task | A Task object wrapping the original function with orchestration capabilities. |
Usage Examples
Example 1: HTTP Fetch with Retries and Backoff
```python
from prefect import task
import httpx

@task(retries=3, retry_delay_seconds=[2, 5, 15])
def fetch_page(page: int, api_base: str, per_page: int) -> list[dict]:
    url = f"{api_base}/articles"
    params = {"page": page, "per_page": per_page}
    response = httpx.get(url, params=params, timeout=30)
    response.raise_for_status()
    return response.json()
```
This task fetches a single page of results from a REST API. The retries=3 with retry_delay_seconds=[2, 5, 15] provides escalating backoff -- if the first attempt fails, it waits 2 seconds; if the second fails, 5 seconds; if the third fails, 15 seconds before the final attempt.
Example 2: Simple Task Without Retries
```python
from prefect import task
import pandas as pd

@task
def to_dataframe(raw_articles: list[list[dict]]) -> pd.DataFrame:
    records = [article for page in raw_articles for article in page]
    df = pd.json_normalize(records)[["id", "title", "published_at", "url"]]
    return df
```
This task performs a pure data transformation with no external dependencies. Since it cannot fail due to transient issues, no retries are configured. The @task decorator still provides state tracking and logging.