Implementation:Dagster io Dagster DynamicPartitionsDefinition API
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Event_Driven |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete API in the Dagster core library for defining partitions whose keys are created at runtime.
Description
DynamicPartitionsDefinition is a subclass of PartitionsDefinition that maintains a mutable registry of partition keys. Unlike time-based or static partition definitions, the set of valid keys starts empty and grows through explicit add requests -- typically issued from sensor evaluations via SensorResult.dynamic_partitions_requests.
Each dynamic partition definition is identified by a unique name string. This name serves as the key for the partition registry in the Dagster instance storage. Multiple assets can reference the same dynamic partition definition by name, ensuring they share a consistent view of the available partitions.
Usage
Import and use when you need a partition set that grows at runtime. Define the partition set with a unique name, then use sensors to register new partition keys as they are discovered. Attach the definition to assets via the partitions_def parameter.
Code Reference
Source Location
- Repository: dagster
- File: python_modules/dagster/dagster/_core/definitions/partitions/definition/dynamic.py
Signature
class DynamicPartitionsDefinition(PartitionsDefinition):
    def __init__(self, name: str):
        ...
Import
from dagster import DynamicPartitionsDefinition
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Unique identifier for the dynamic partition set. Used to store and retrieve partition keys from instance storage. |
Outputs
| Name | Type | Description |
|---|---|---|
| instance | DynamicPartitionsDefinition | A partition definition whose keys are managed at runtime. New keys are added via build_add_request() and removed via build_delete_request(). |
Usage Examples
Sensor-Driven Podcast Episode Partitions
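import dagster as dg

# Partition set shared by the sensor and any assets that reference it.
rss_entry_partition = dg.DynamicPartitionsDefinition(name="podcast_episodes")

# transcription_job and check_rss_feed are assumed to be defined elsewhere.
@dg.sensor(job=transcription_job, minimum_interval_seconds=600)
def new_episode_sensor(context: dg.SensorEvaluationContext):
    # Fetch the episodes published since the last cursor value.
    new_episodes = check_rss_feed(context.cursor)
    return dg.SensorResult(
        # One run per newly discovered episode.
        run_requests=[
            dg.RunRequest(partition_key=ep.id) for ep in new_episodes
        ],
        # Register the new keys so the runs target valid partitions.
        dynamic_partitions_requests=[
            rss_entry_partition.build_add_request(
                partition_keys=[ep.id for ep in new_episodes]
            )
        ],
        # Advance the cursor only when something new was found.
        cursor=new_episodes[-1].etag if new_episodes else context.cursor,
    )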