Implementation:Dagster io Dagster Tenacity Retry Pattern

From Leeroopedia


Field | Value
Implementation Name | Tenacity Retry Pattern
Source (pattern) | examples/docs_projects/project_atproto_dashboard/src/project_atproto_dashboard/defs/atproto.py:L12-47
Source (dagster config) | examples/docs_projects/project_atproto_dashboard/dagster.yaml
Repository | dagster-io/dagster
Domains | Data_Engineering, API_Integration, Resilience

Overview

Concrete implementation of rate limiting using tenacity retry decorators and Dagster concurrency controls for API-heavy pipelines.

Description

This implementation demonstrates a multi-layered rate limiting strategy used in the AT Protocol (Bluesky) dashboard example project. At the request level, the tenacity library retries each API call for up to 5 attempts in total, with a fixed 150-second wait between attempts. Because the decorator specifies no retry condition, tenacity retries on any exception, not only on rate limit errors. At the asset level, the dagster/concurrency_key op tag places the ingestion assets in a shared concurrency group named "ingestion". At the instance level, dagster.yaml selects the QueuedRunCoordinator and sets default_op_concurrency_limit: 1, which by default allows one executing op per concurrency key, so only one ingestion asset runs at a time.
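
The retry budget described above implies a worst-case delay that is worth knowing before adopting the pattern. The arithmetic below is a small sketch, assuming every attempt fails immediately; the constant names are illustrative and do not come from the example project.

```python
import math

MAX_ATTEMPTS = 5                    # stop_after_attempt(5)
WAIT_SECONDS = math.ceil(60 * 2.5)  # wait_fixed(...) argument

# A fixed wait happens only between attempts, so 5 attempts mean 4 waits.
worst_case_wait = (MAX_ATTEMPTS - 1) * WAIT_SECONDS

print(WAIT_SECONDS)     # 150
print(worst_case_wait)  # 600
```

In other words, a single call can block for roughly ten minutes of waiting, plus the time spent in the failed requests themselves, before it gives up.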

Usage

Use this pattern when building pipelines that call rate-limited APIs. The tenacity retry handles transient rate limit errors at the call level, while Dagster's concurrency controls prevent multiple assets or runs from overwhelming the API simultaneously.

Code Reference

Source Location

  • Retry pattern: examples/docs_projects/project_atproto_dashboard/src/project_atproto_dashboard/defs/atproto.py:L12-47
  • Dagster configuration: examples/docs_projects/project_atproto_dashboard/dagster.yaml

Retry Pattern

import math

import tenacity


# With no `retry=` argument, tenacity retries on any exception.
@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),  # give up after 5 attempts in total
    wait=tenacity.wait_fixed(math.ceil(60 * 2.5)),  # fixed 150 s between attempts
)
def _get_feed_with_retries(client, actor, cursor):
    """Fetch one page of an actor's feed, retrying if the call raises."""
    return client.get_author_feed(actor=actor, cursor=cursor, limit=100)
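
To make the semantics of a fixed-wait, bounded-attempt retry concrete, here is a standard-library sketch of the same idea. It is a hand-written stand-in, not tenacity's implementation: `retry_fixed`, its `sleep` parameter, and `flaky_fetch` are hypothetical names introduced for illustration. One known difference is that tenacity, by default, raises `tenacity.RetryError` once attempts are exhausted (unless `reraise=True` is passed), whereas this sketch re-raises the last exception directly.

```python
import functools
import time


def retry_fixed(max_attempts, wait_seconds, sleep=time.sleep):
    """Hypothetical stand-in for stop_after_attempt + wait_fixed."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    sleep(wait_seconds)  # fixed pause before the next attempt
        return wrapper
    return decorator


# Demo: a fake endpoint that fails twice, then succeeds.
sleeps = []         # records requested waits instead of actually sleeping
calls = {"n": 0}


@retry_fixed(max_attempts=5, wait_seconds=150, sleep=sleeps.append)
def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limited")
    return "ok"


result = flaky_fetch()
```

Passing `sleeps.append` as the sleep function keeps the demo instant while still recording that two 150-second waits would have occurred.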

Asset with Concurrency Key

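import dagster as dg

# actor_partition, ATProtoResource, and S3Resource are not defined in this
# snippet; they are assumed to be defined or imported elsewhere in the project.
@dg.asset(
    partitions_def=actor_partition,
    # Assets that share this key are subject to the same concurrency limit.
    op_tags={"dagster/concurrency_key": "ingestion"},
)
def actor_feed_snapshot(context, atproto: ATProtoResource, s3: S3Resource):
    client = atproto.get_client()
    feed = _get_feed_with_retries(client, context.partition_key, None)
    ...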

Dagster Configuration (dagster.yaml)

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

concurrency:
  default_op_concurrency_limit: 1
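
The effect of a limit of 1 can be pictured with an in-process analogy. The sketch below uses a semaphore so that several workers sharing one "key" never overlap. It is only an analogy under stated assumptions: Dagster coordinates its limits across runs and processes, which a `threading` semaphore does not do, and `ingest`, `slot`, and the actor names are hypothetical.

```python
import threading
import time

LIMIT = 1  # mirrors default_op_concurrency_limit: 1
slot = threading.BoundedSemaphore(LIMIT)
lock = threading.Lock()
running = 0
max_running = 0


def ingest(actor):
    """Pretend ingestion step; holds the single slot while it 'calls the API'."""
    global running, max_running
    with slot:
        with lock:
            running += 1
            max_running = max(max_running, running)
        time.sleep(0.01)  # stand-in for the API call
        with lock:
            running -= 1


threads = [threading.Thread(target=ingest, args=(f"user{i}",)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(max_running)  # 1
```

Raising `LIMIT` would let that many workers proceed at once, which is the trade-off between throughput and pressure on the rate-limited API.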

Import

import tenacity  # the snippets on this page use the tenacity.<name> form

I/O Contract

Direction | Name | Type | Description
Input | client | API client | The API client instance to make calls with (e.g., AT Protocol Client)
Input | retry parameters | stop_after_attempt(5), wait_fixed(150) | Tenacity retry configuration: max attempts and wait time between retries
Input | concurrency_key | str | Dagster op tag value that groups assets for serial execution
Output | API response | varies | The API response, retried transparently on failure
Output | execution order | implicit | Sequential execution enforced via concurrency key and run coordinator

Usage Examples

Full Ingestion Pattern

import math
from typing import Optional

import dagster as dg
import tenacity
from atproto import Client


@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_fixed(math.ceil(60 * 2.5)),
)
def _get_feed_with_retries(client: Client, actor: str, cursor: Optional[str]):
    return client.get_author_feed(actor=actor, cursor=cursor, limit=100)


def get_all_feed_items(client: Client, actor: str):
    feed = []
    cursor = None
    while True:
        data = _get_feed_with_retries(client, actor, cursor)
        feed.extend(data.feed)
        cursor = data.cursor
        if not cursor:
            break
    return feed


actor_partition = dg.StaticPartitionsDefinition(["user1.bsky.social", "user2.bsky.social"])


# ATProtoResource is not defined in this snippet; it is assumed to be defined
# elsewhere in the example project.
@dg.asset(
    partitions_def=actor_partition,
    op_tags={"dagster/concurrency_key": "ingestion"},
)
def actor_feed_snapshot(context: dg.AssetExecutionContext, atproto: ATProtoResource):
    client = atproto.get_client()
    # One partition per actor: the partition key is the actor handle.
    feed_items = get_all_feed_items(client, context.partition_key)
    context.log.info(f"Retrieved {len(feed_items)} feed items for {context.partition_key}")
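
The cursor-based pagination loop can be exercised without network access or the atproto package. The following is a minimal sketch, assuming a response object that exposes `feed` and `cursor` attributes as in the example; `FakeClient` and its canned pages are hypothetical, and the retry decorator is left out so the block depends only on the standard library.

```python
from types import SimpleNamespace


class FakeClient:
    """Hypothetical stand-in for the AT Protocol client; serves canned pages."""

    def __init__(self, pages):
        self._pages = pages  # maps cursor -> (items, next_cursor)
        self.calls = []      # cursors requested, in order

    def get_author_feed(self, actor, cursor=None, limit=100):
        self.calls.append(cursor)
        items, next_cursor = self._pages[cursor]
        return SimpleNamespace(feed=items, cursor=next_cursor)


def get_all_feed_items(client, actor):
    """Same loop as above, calling the client directly (no retry wrapper)."""
    feed = []
    cursor = None
    while True:
        data = client.get_author_feed(actor=actor, cursor=cursor, limit=100)
        feed.extend(data.feed)
        cursor = data.cursor
        if not cursor:  # an empty or missing cursor marks the last page
            break
    return feed


pages = {
    None: (["post1", "post2"], "c1"),
    "c1": (["post3"], "c2"),
    "c2": (["post4"], None),
}
client = FakeClient(pages)
items = get_all_feed_items(client, "user1.bsky.social")
```

A fake client like this also makes it easy to check that each page is requested exactly once and in cursor order.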
