Implementation:Dagster io Dagster Sensor Decorator
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Event_Driven |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete tool for defining event-driven sensors provided by the Dagster core library.
Description
The @sensor decorator transforms a Python function into a SensorDefinition that the Dagster daemon evaluates at regular intervals. The decorated function receives a SensorEvaluationContext and returns a SensorResult containing run requests, dynamic partition modifications, and an updated cursor.
The decorator accepts configuration for the target job or asset selection, polling interval, default status (running or stopped), and metadata. Sensors can target a single job, multiple jobs, or an asset selection for materialization.
Usage
Import and use when you need to poll an external system for changes and trigger pipeline runs in response. The sensor function should check the external system using the cursor from context.cursor, determine what has changed, and return appropriate run requests.
Code Reference
Source Location
- Repository: dagster
- File: python_modules/dagster/dagster/_core/definitions/decorators/sensor_decorator.py:L37-127
Signature
def sensor(
job_name: Optional[str] = None,
*,
name: Optional[str] = None,
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
job: Optional[ExecutableDefinition] = None,
jobs: Optional[Sequence[ExecutableDefinition]] = None,
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
asset_selection: Optional[CoercibleToAssetSelection] = None,
required_resource_keys: Optional[set[str]] = None,
tags: Optional[Mapping[str, str]] = None,
metadata: Optional[RawMetadataMapping] = None,
target: Optional[
Union[
CoercibleToAssetSelection,
AssetsDefinition,
JobDefinition,
UnresolvedAssetJobDefinition,
]
] = None,
owners: Optional[Sequence[str]] = None,
) -> Callable[[RawSensorEvaluationFunction], SensorDefinition]
Import
from dagster import sensor, SensorResult, RunRequest, SensorEvaluationContext, DefaultSensorStatus
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| job_name | Optional[str] |
No | Name of the job to trigger. Mutually exclusive with job, jobs, and target.
|
| name | Optional[str] |
No | Name for the sensor. Defaults to the decorated function name. |
| minimum_interval_seconds | Optional[int] |
No | Minimum seconds between sensor evaluations. |
| description | Optional[str] |
No | Human-readable description of the sensor. |
| job | Optional[ExecutableDefinition] |
No | Job definition to trigger. Mutually exclusive with job_name, jobs, and target.
|
| jobs | Optional[Sequence[ExecutableDefinition]] |
No | Multiple job definitions the sensor can trigger. |
| default_status | DefaultSensorStatus |
No | Whether the sensor starts in RUNNING or STOPPED state (default STOPPED). |
| asset_selection | Optional[CoercibleToAssetSelection] |
No | Asset selection to materialize instead of a job. |
| target | Optional[Union[CoercibleToAssetSelection, AssetsDefinition, JobDefinition, UnresolvedAssetJobDefinition]] |
No | Unified target parameter accepting jobs or asset selections. |
| owners | Optional[Sequence[str]] |
No | List of owners for the sensor (e.g., team email addresses). |
Outputs
| Name | Type | Description |
|---|---|---|
| return value | SensorResult |
Contains run_requests (list of RunRequest), dynamic_partitions_requests (list of partition add/delete requests), and cursor (updated cursor string).
|
| skip | SkipReason |
Returned instead of SensorResult when no action is needed, with a human-readable explanation. |
Usage Examples
RSS Feed Sensor
import dagster as dg
import feedparser
@dg.sensor(
name="rss_feed_sensor",
minimum_interval_seconds=600,
default_status=dg.DefaultSensorStatus.RUNNING,
job=process_job,
)
def rss_feed_sensor(context: dg.SensorEvaluationContext):
feed = feedparser.parse("https://example.com/feed.xml", etag=context.cursor)
if not feed.entries:
return dg.SkipReason("No new entries")
return dg.SensorResult(
run_requests=[
dg.RunRequest(partition_key=entry.id) for entry in feed.entries
],
cursor=feed.etag,
)
Related Pages
Implements Principle
Requires Environment
- Environment:Dagster_io_Dagster_Python_3_10_Runtime
- Environment:Dagster_io_Dagster_DAGSTER_HOME_Configuration
- Environment:Dagster_io_Dagster_GRPC_Communication