Implementation: PrefectHQ Prefect Fetch Page Task
| Metadata | |
|---|---|
| Sources | |
| Domains | |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A concrete task from the Prefect ETL example for fetching paginated JSON data from REST APIs. The fetch_page task wraps an httpx.get call with Prefect's retry mechanism for resilient HTTP data extraction.
Description
The fetch_page task wraps an httpx.get call with Prefect's @task(retries=3, retry_delay_seconds=[2, 5, 15]) for resilient HTTP data extraction. It fetches a single page of results from a REST API and returns the parsed JSON response.
Key characteristics:
- 3 retries with escalating backoff (2s, 5s, 15s) for transient HTTP failures
- 30-second timeout on the HTTP request itself via httpx
- raise_for_status() to convert HTTP error codes (4xx, 5xx) into exceptions that trigger retries
- Independent execution -- each page fetch is a separate task run, so failures are isolated
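The escalating retry schedule can be illustrated with a plain-Python sketch. This is a simplified stand-in for the behavior the @task(retries=3, retry_delay_seconds=[2, 5, 15]) decorator provides, not Prefect's actual implementation:

```python
import time


def retry_with_backoff(fn, retry_delay_seconds, sleep=time.sleep):
    """Call fn; on failure, sleep for the next delay in
    retry_delay_seconds and try again. With N delays there are
    N retries, i.e. N + 1 attempts total; the last failure
    propagates to the caller."""
    for delay in retry_delay_seconds:
        try:
            return fn()
        except Exception:
            sleep(delay)
    return fn()  # final attempt; its exception, if any, propagates
```

With retry_delay_seconds=[2, 5, 15], a call that fails twice and then succeeds waits 2 s, then 5 s, and returns on the third attempt without ever sleeping 15 s.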
Code Reference
- Repository: https://github.com/PrefectHQ/prefect
- File: examples/run_api_sourced_etl.py (L72-80)
- Import: from prefect import task; import httpx
Signature:
@task(retries=3, retry_delay_seconds=[2, 5, 15])
def fetch_page(page: int, api_base: str, per_page: int) -> list[dict[str, Any]]:
    """Return a list of article dicts for a given page number."""
    url = f"{api_base}/articles"
    params = {"page": page, "per_page": per_page}
    print(f"Fetching page {page} …")
    response = httpx.get(url, params=params, timeout=30)
    response.raise_for_status()
    return response.json()
I/O Contract
| Direction | Parameter | Type | Description |
|---|---|---|---|
| Input | page | int, required | The page number to fetch. |
| Input | api_base | str, required | The base URL of the REST API (e.g., https://dev.to/api). |
| Input | per_page | int, required | Number of results to request per page. |
| Output | -- | list[dict[str, Any]] | List of article dictionaries parsed from the JSON API response. |
Error behavior:
| Scenario | Behavior |
|---|---|
| HTTP 4xx/5xx response | raise_for_status() raises httpx.HTTPStatusError, triggering a retry |
| Network timeout | httpx raises httpx.TimeoutException after 30 seconds, triggering a retry |
| Connection error | httpx raises httpx.ConnectError, triggering a retry |
| All retries exhausted | The task transitions to Failed state and the exception propagates to the parent flow |
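The first row hinges on raise_for_status() turning a 4xx/5xx status into an exception instead of a silently returned error body. A minimal stand-in for that check (illustrative only, not httpx's implementation):

```python
class HTTPStatusError(Exception):
    """Stand-in for httpx.HTTPStatusError, raised for 4xx/5xx responses."""

    def __init__(self, status_code: int):
        super().__init__(f"HTTP error {status_code}")
        self.status_code = status_code


def raise_for_status(status_code: int) -> None:
    """Raise HTTPStatusError for client (4xx) or server (5xx) codes.
    Success (2xx) and redirect (3xx) codes pass through silently."""
    if 400 <= status_code < 600:
        raise HTTPStatusError(status_code)
```

Because the exception escapes the task body, Prefect's retry machinery sees the run as failed and schedules the next attempt; without this call, an error page's JSON body would be returned as if it were data.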
Usage Example
from pathlib import Path

import httpx
from prefect import flow, task

@task(retries=3, retry_delay_seconds=[2, 5, 15])
def fetch_page(page: int, api_base: str, per_page: int) -> list[dict]:
    url = f"{api_base}/articles"
    params = {"page": page, "per_page": per_page}
    response = httpx.get(url, params=params, timeout=30)
    response.raise_for_status()
    return response.json()

@flow(name="devto_etl", log_prints=True)
def etl(api_base: str, pages: int, per_page: int, output_file: Path) -> None:
    raw_pages = []
    for page_number in range(1, pages + 1):
        raw_pages.append(fetch_page(page_number, api_base, per_page))
    # ... transform and load
In this example, the flow iterates through the requested number of pages. Each call to fetch_page creates an independent task run. If page 3 fails due to a network timeout, Prefect automatically retries it (waiting 2s, then 5s, then 15s) without re-fetching pages 1 and 2.
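The elided transform step typically begins by merging the per-page lists that fetch_page returns into one flat list of articles. A sketch of that merge (flatten_pages is a hypothetical helper, not part of the Prefect example):

```python
from typing import Any


def flatten_pages(raw_pages: list[list[dict[str, Any]]]) -> list[dict[str, Any]]:
    """Merge per-page article lists (one list per fetch_page call)
    into a single flat list, preserving page order."""
    return [article for page in raw_pages for article in page]
```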