Implementation:PrefectHQ Prefect Task Decorator

From Leeroopedia


Metadata
Last Updated 2026-02-09 00:00 GMT

Overview

The @task decorator, provided by the prefect library, is the concrete mechanism for defining retryable, observable pipeline steps. It transforms a Python function into a Prefect task with independent state tracking, retry logic, caching, and observability.

Description

Tasks are typically called from within flows. Each task run is logged and tracked independently through its own state lifecycle, so a failure or retry in one task does not hide the outcome of the others.

Key parameters:

  • name -- task name displayed in the Prefect UI
  • retries -- number of retry attempts on failure
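  • retry_delay_seconds -- delay between retries: a single number, a list of per-retry delays, or a callable that computes the delays (for example, exponential backoff)
  • retry_condition_fn -- optional callable that receives the task, the task run, and the resulting state, and returns whether the task should be retried
  • timeout_seconds -- maximum execution time; a task run that exceeds it is marked as failed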
  • log_prints -- capture print() as structured logs
  • cache_key_fn -- function to generate cache keys for result caching
  • cache_expiration -- how long cached results remain valid
  • tags -- tags for filtering and organizing tasks in the UI
  • on_completion, on_failure -- lifecycle hook callbacks
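
The retry_condition_fn entry in the list above can be illustrated with a plain function. This is a minimal sketch, assuming the callable is invoked with the task, the task run, and the failed state, and that calling state.result() re-raises the exception captured from the run. The names retry_on_transient_error and FakeState are hypothetical and exist only for this illustration.

```python
def retry_on_transient_error(task, task_run, state) -> bool:
    """Return True only when the failure looks transient."""
    try:
        # Assumption: result() on a failed state re-raises the original exception.
        state.result()
    except (ConnectionError, TimeoutError):
        return True   # network-style failures are worth another attempt
    except Exception:
        return False  # anything else (e.g. bad input) would fail again
    return False      # no exception, so there is nothing to retry


class FakeState:
    """Hypothetical stand-in for a Prefect state, used only to exercise the function."""

    def __init__(self, exc=None):
        self.exc = exc

    def result(self):
        if self.exc is not None:
            raise self.exc
        return "ok"


retry_network = retry_on_transient_error(None, None, FakeState(ConnectionError("reset")))
retry_bad_input = retry_on_transient_error(None, None, FakeState(ValueError("bad page")))
```

Under these assumptions the function would be wired in as @task(retries=3, retry_condition_fn=retry_on_transient_error), so that retries are spent only on failures that might succeed on a later attempt.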

Code Reference

Signature (abridged; the full decorator accepts additional keyword arguments):

def task(
    __fn: Optional[Callable] = None,
    *,
    name: Optional[str] = None,
    retries: Optional[int] = None,
    retry_delay_seconds: Optional[Union[int, float, List[float], Callable[[int], List[float]]]] = None,
    retry_condition_fn: Optional[Callable] = None,
    timeout_seconds: Optional[Union[int, float]] = None,
    log_prints: Optional[bool] = None,
    cache_key_fn: Optional[Callable] = None,
    cache_expiration: Optional[timedelta] = None,
    tags: Optional[Iterable[str]] = None,
    on_completion: Optional[List[Callable]] = None,
    on_failure: Optional[List[Callable]] = None,
) -> Union[Task, Callable]:
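
The cache_key_fn parameter in the signature above takes a callable that receives the task run context and a dictionary of the call parameters and returns a string key. The following is a sketch of one such function, not Prefect's own implementation; cache_by_arguments is a hypothetical name, and the context argument is ignored here.

```python
import hashlib
import json


def cache_by_arguments(context, parameters: dict) -> str:
    """Build a deterministic cache key from the task's call arguments."""
    # sort_keys makes the key independent of argument order;
    # default=str is a fallback for values json cannot serialize directly.
    payload = json.dumps(parameters, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# The same arguments in a different order produce the same key.
key_a = cache_by_arguments(None, {"page": 1, "per_page": 50})
key_b = cache_by_arguments(None, {"per_page": 50, "page": 1})
# Different arguments produce a different key.
key_c = cache_by_arguments(None, {"page": 2, "per_page": 50})
```

A function like this would be passed as @task(cache_key_fn=cache_by_arguments, cache_expiration=timedelta(hours=1)). Prefect also ships a ready-made task_input_hash helper in prefect.tasks that serves a similar purpose.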

I/O Contract

Direction  Parameter            Type                            Description
Input      __fn                 Callable                        The function to decorate.
Input      retries              int                             Number of retry attempts on failure.
Input      retry_delay_seconds  Union[int, float, List[float]]  Delay between retries.
Input      timeout_seconds      Union[int, float]               Maximum execution time.
Input      log_prints           bool                            Capture print() as structured logs.
Input      cache_key_fn         Callable                        Function to generate cache keys.
Input      tags                 Iterable[str]                   Tags for organizing tasks.
Output     --                   Task                            A Task object wrapping the original function with orchestration capabilities.
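
Because __fn is optional, the decorator can be applied bare (@task) or with keyword arguments (@task(retries=3)), which is why the signature's return type is a union of Task and Callable. The sketch below shows that general pattern in plain Python. It is a simplified stand-in, not Prefect's implementation; MiniTask and mini_task are hypothetical names.

```python
from typing import Callable, Optional


class MiniTask:
    """Hypothetical stand-in for a task object; not Prefect's Task class."""

    def __init__(self, fn: Callable, retries: int = 0):
        self.fn = fn
        self.name = fn.__name__
        self.retries = retries

    def __call__(self, *args, **kwargs):
        # A real task would add state tracking and retries around this call.
        return self.fn(*args, **kwargs)


def mini_task(__fn: Optional[Callable] = None, *, retries: int = 0):
    if __fn is not None:
        # Bare usage (@mini_task): the function arrives directly.
        return MiniTask(__fn, retries=retries)

    # Configured usage (@mini_task(retries=3)): return a decorator
    # that will receive the function in a second call.
    def decorator(fn: Callable) -> MiniTask:
        return MiniTask(fn, retries=retries)

    return decorator


@mini_task
def double(x: int) -> int:
    return 2 * x


@mini_task(retries=3)
def triple(x: int) -> int:
    return 3 * x
```

In both forms the decorated name ends up bound to a task-like object that wraps the original function, matching the Output row of the table above.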

Usage Examples

Example 1: HTTP Fetch with Retries and Backoff

from prefect import task
import httpx

@task(retries=3, retry_delay_seconds=[2, 5, 15])
def fetch_page(page: int, api_base: str, per_page: int) -> list[dict]:
    url = f"{api_base}/articles"
    params = {"page": page, "per_page": per_page}
    response = httpx.get(url, params=params, timeout=30)
    response.raise_for_status()
    return response.json()

This task fetches a single page of results from a REST API. Setting retries=3 with retry_delay_seconds=[2, 5, 15] gives escalating backoff across up to four attempts in total: Prefect waits 2 seconds after the first failure, 5 seconds after the second, and 15 seconds after the third before making the final attempt.
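
The pairing of attempts and delays described above can be worked through in plain Python. This is an illustrative sketch of the schedule only, not Prefect's retry engine; attempt_schedule and doubling_delays are hypothetical helpers, and the sketch assumes one delay per retry.

```python
from typing import List, Sequence, Tuple, Union


def attempt_schedule(
    retries: int, delays: Union[int, float, Sequence[float]]
) -> List[Tuple[int, float]]:
    """Return (attempt_number, seconds_waited_before_it) pairs."""
    if isinstance(delays, (int, float)):
        delays = [delays] * retries  # a fixed delay applies to every retry
    if len(delays) != retries:
        raise ValueError("this sketch expects exactly one delay per retry")
    schedule = [(1, 0)]  # the first attempt starts immediately
    for index, delay in enumerate(delays):
        schedule.append((index + 2, delay))
    return schedule


def doubling_delays(base: float, retries: int) -> List[float]:
    """Delays that double after each failure: base, 2*base, 4*base, ..."""
    return [base * 2**i for i in range(retries)]


list_schedule = attempt_schedule(3, [2, 5, 15])
fixed_schedule = attempt_schedule(3, 10)
doubled = doubling_delays(2, 3)
```

Here list_schedule contains four entries, one for the initial attempt and one for each retry. For delays that grow automatically, Prefect provides an exponential_backoff helper in prefect.tasks that can be passed as retry_delay_seconds; doubling_delays only mimics the idea.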

Example 2: Simple Task Without Retries

from prefect import task
import pandas as pd

@task
def to_dataframe(raw_articles: list[list[dict]]) -> pd.DataFrame:
    records = [article for page in raw_articles for article in page]
    df = pd.json_normalize(records)[["id", "title", "published_at", "url"]]
    return df

This task performs a pure, in-memory data transformation with no network or other external-service calls. Any failure it can hit, such as a missing column raising KeyError, is deterministic and would recur on every attempt, so no retries are configured. The @task decorator still provides state tracking and logging.
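
To make the reshaping concrete, the sketch below reproduces the flatten-and-select step on plain lists, without pandas. The sample records and the flatten_and_project name are hypothetical; it is meant only to show why a missing field fails the same way on every attempt.

```python
from typing import Dict, List

COLUMNS = ["id", "title", "published_at", "url"]


def flatten_and_project(raw_articles: List[List[dict]]) -> List[Dict[str, object]]:
    """Flatten pages of articles and keep only the selected columns."""
    # Same comprehension as in the task: one flat list across all pages.
    records = [article for page in raw_articles for article in page]
    # Indexing with rec[col] raises KeyError if a field is missing.
    return [{col: rec[col] for col in COLUMNS} for rec in records]


# Hypothetical sample input: two pages, three articles in total.
pages = [
    [
        {"id": 1, "title": "A", "published_at": "2024-01-01", "url": "https://example.com/1", "extra": "x"},
        {"id": 2, "title": "B", "published_at": "2024-01-02", "url": "https://example.com/2"},
    ],
    [
        {"id": 3, "title": "C", "published_at": "2024-01-03", "url": "https://example.com/3"},
    ],
]

rows = flatten_and_project(pages)
```

Calling flatten_and_project with a record that lacks one of the four fields raises KeyError every time, which is the kind of failure that retries cannot fix.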
