# Implementation: Tenacity Retry Pattern (dagster-io/dagster)
| Field | Value |
|---|---|
| Implementation Name | Tenacity Retry Pattern |
| Source (pattern) | examples/docs_projects/project_atproto_dashboard/src/project_atproto_dashboard/defs/atproto.py:L12-47 |
| Source (dagster config) | examples/docs_projects/project_atproto_dashboard/dagster.yaml |
| Repository | dagster-io/dagster |
| Domains | Data_Engineering, API_Integration, Resilience |
## Overview
Concrete implementation of rate limiting using tenacity retry decorators and Dagster concurrency controls for API-heavy pipelines.
## Description
This implementation demonstrates a multi-layered rate limiting strategy used in the AT Protocol (Bluesky) dashboard example project. At the request level, the `tenacity` library retries individual API calls with a fixed 150-second wait between attempts (up to 5 attempts). At the asset level, Dagster's `dagster/concurrency_key` op tag ensures that only one ingestion asset runs at a time. At the run level, the `QueuedRunCoordinator` with `default_op_concurrency_limit: 1` provides a global concurrency ceiling.
## Usage
Use this pattern when building pipelines that call rate-limited APIs. The tenacity retry handles transient rate limit errors at the call level, while Dagster's concurrency controls prevent multiple assets or runs from overwhelming the API simultaneously.
## Code Reference

### Source Location

- Retry pattern: examples/docs_projects/project_atproto_dashboard/src/project_atproto_dashboard/defs/atproto.py:L12-47
- Dagster configuration: examples/docs_projects/project_atproto_dashboard/dagster.yaml
### Retry Pattern

```python
import math

import tenacity


@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_fixed(math.ceil(60 * 2.5)),  # 150-second fixed wait
)
def _get_feed_with_retries(client, actor, cursor):
    """Fetch feed with automatic retry on rate limit errors."""
    return client.get_author_feed(actor=actor, cursor=cursor, limit=100)
```
### Asset with Concurrency Key

```python
import dagster as dg


@dg.asset(
    partitions_def=actor_partition,
    op_tags={"dagster/concurrency_key": "ingestion"},
)
def actor_feed_snapshot(context, atproto: ATProtoResource, s3: S3Resource):
    client = atproto.get_client()
    feed = _get_feed_with_retries(client, context.partition_key, None)
    ...
```
### Dagster Configuration (dagster.yaml)

```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

concurrency:
  default_op_concurrency_limit: 1
```
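If you also want a hard cap on how many runs execute at once (not just ops within runs), the `QueuedRunCoordinator` accepts a `max_concurrent_runs` setting. A sketch of that variant, as an assumption layered on the project's config rather than something the example project itself sets:

```yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 1  # queue runs instead of executing them in parallel
```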
### Import

```python
from tenacity import retry, stop_after_attempt, wait_fixed
```
## I/O Contract

| Direction | Name | Type | Description |
|---|---|---|---|
| Input | client | API client | The API client instance to make calls with (e.g., AT Protocol `Client`) |
| Input | retry parameters | `stop_after_attempt(5)`, `wait_fixed(150)` | Tenacity retry configuration: maximum attempts and wait time between retries |
| Input | concurrency_key | str | Dagster op tag value that groups assets for serial execution |
| Output | API response | varies | The API response, retried transparently on failure |
| Output | execution order | implicit | Sequential execution enforced via concurrency key and run coordinator |
## Usage Examples

### Full Ingestion Pattern

```python
import math
from typing import Optional

import dagster as dg
import tenacity
from atproto import Client


@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_fixed(math.ceil(60 * 2.5)),  # 150-second fixed wait
)
def _get_feed_with_retries(client: Client, actor: str, cursor: Optional[str]):
    return client.get_author_feed(actor=actor, cursor=cursor, limit=100)


def get_all_feed_items(client: Client, actor: str):
    """Paginate through an actor's full feed, retrying each page fetch."""
    feed = []
    cursor = None
    while True:
        data = _get_feed_with_retries(client, actor, cursor)
        feed.extend(data.feed)
        cursor = data.cursor
        if not cursor:
            break
    return feed


actor_partition = dg.StaticPartitionsDefinition(["user1.bsky.social", "user2.bsky.social"])


@dg.asset(
    partitions_def=actor_partition,
    op_tags={"dagster/concurrency_key": "ingestion"},
)
def actor_feed_snapshot(context: dg.AssetExecutionContext, atproto: ATProtoResource):
    # ATProtoResource is the example project's custom Dagster resource
    client = atproto.get_client()
    feed_items = get_all_feed_items(client, context.partition_key)
    context.log.info(f"Retrieved {len(feed_items)} feed items for {context.partition_key}")
```
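The cursor-driven pagination loop above can be exercised on its own against a hypothetical stub client; `FakeClient` below is illustrative only, and the retry decorator is omitted to keep the sketch dependency-free:

```python
from types import SimpleNamespace


class FakeClient:
    """Hypothetical stub standing in for the AT Protocol client."""

    def __init__(self):
        # Two pages: the first returns a cursor, the second ends pagination.
        self.pages = {
            None: SimpleNamespace(feed=[1, 2], cursor="page2"),
            "page2": SimpleNamespace(feed=[3], cursor=None),
        }
        self.calls = 0

    def get_author_feed(self, actor, cursor, limit):
        self.calls += 1
        return self.pages[cursor]


def get_all_feed_items(client, actor):
    """Same loop as the ingestion pattern, calling the client directly."""
    feed = []
    cursor = None
    while True:
        data = client.get_author_feed(actor=actor, cursor=cursor, limit=100)
        feed.extend(data.feed)
        cursor = data.cursor
        if not cursor:
            break
    return feed


client = FakeClient()
items = get_all_feed_items(client, "user1.bsky.social")
# items is [1, 2, 3] after two calls: the loop stops once cursor is None
```

Stubbing the client this way is also a practical unit-test seam: the pagination logic can be verified without touching the rate-limited API at all.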