Implementation:Dagster io Dagster Sensor Decorator

From Leeroopedia


Knowledge Sources
Domains: Data_Engineering, Event_Driven
Last Updated: 2026-02-10 00:00 GMT

Overview

Concrete tool for defining event-driven sensors provided by the Dagster core library.

Description

The @sensor decorator transforms a Python function into a SensorDefinition that the Dagster daemon evaluates at regular intervals. The decorated function receives a SensorEvaluationContext and typically returns a SensorResult containing run requests, dynamic partition requests, and an updated cursor. It can also return or yield RunRequest objects directly, or return a SkipReason when there is nothing to do.

The decorator accepts configuration for the target job or asset selection, polling interval, default status (running or stopped), and metadata. Sensors can target a single job, multiple jobs, or an asset selection for materialization.

Usage

Use @sensor when you need to poll an external system for changes and trigger runs in response. The sensor function should read its previous position from context.cursor, query the external system for anything newer, and return the corresponding run requests together with an updated cursor.

Code Reference

Source Location

  • Repository: dagster
  • File: python_modules/dagster/dagster/_core/definitions/decorators/sensor_decorator.py:L37-127

Signature

def sensor(
    job_name: Optional[str] = None,
    *,
    name: Optional[str] = None,
    minimum_interval_seconds: Optional[int] = None,
    description: Optional[str] = None,
    job: Optional[ExecutableDefinition] = None,
    jobs: Optional[Sequence[ExecutableDefinition]] = None,
    default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
    asset_selection: Optional[CoercibleToAssetSelection] = None,
    required_resource_keys: Optional[set[str]] = None,
    tags: Optional[Mapping[str, str]] = None,
    metadata: Optional[RawMetadataMapping] = None,
    target: Optional[
        Union[
            CoercibleToAssetSelection,
            AssetsDefinition,
            JobDefinition,
            UnresolvedAssetJobDefinition,
        ]
    ] = None,
    owners: Optional[Sequence[str]] = None,
) -> Callable[[RawSensorEvaluationFunction], SensorDefinition]

Import

from dagster import sensor, SensorResult, RunRequest, SensorEvaluationContext, DefaultSensorStatus

I/O Contract

Inputs

Name Type Required Description
job_name Optional[str] No Name of the job to trigger. Mutually exclusive with job, jobs, and target.
name Optional[str] No Name for the sensor. Defaults to the decorated function name.
minimum_interval_seconds Optional[int] No Minimum number of seconds between sensor evaluations. Defaults to 30 when unset; the actual gap can be longer, since this is a lower bound.
description Optional[str] No Human-readable description of the sensor.
job Optional[ExecutableDefinition] No Job definition to trigger. Mutually exclusive with job_name, jobs, and target.
jobs Optional[Sequence[ExecutableDefinition]] No Multiple job definitions the sensor can trigger.
default_status DefaultSensorStatus No Whether the sensor starts in RUNNING or STOPPED state (default STOPPED).
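asset_selection Optional[CoercibleToAssetSelection] No Asset selection to materialize instead of a job.
required_resource_keys Optional[set[str]] No Keys of the resources that must be available when the sensor function is evaluated.
tags Optional[Mapping[str, str]] No Key-value tags that annotate the sensor.
metadata Optional[RawMetadataMapping] No Metadata entries that annotate the sensor.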
target Optional[Union[CoercibleToAssetSelection, AssetsDefinition, JobDefinition, UnresolvedAssetJobDefinition]] No Unified target parameter accepting jobs or asset selections.
owners Optional[Sequence[str]] No List of owners for the sensor (e.g., team email addresses).

Outputs

Name Type Description
return value SensorResult Contains run_requests (list of RunRequest), dynamic_partitions_requests (list of partition add/delete requests), and cursor (updated cursor string).
skip SkipReason Returned instead of SensorResult when no action is needed, with a human-readable explanation.

Usage Examples

RSS Feed Sensor

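import dagster as dg
import feedparser

# process_job is assumed to be a partitioned job defined elsewhere in the project,
# with each entry id already registered as a partition key.
@dg.sensor(
    name="rss_feed_sensor",
    minimum_interval_seconds=600,
    default_status=dg.DefaultSensorStatus.RUNNING,
    job=process_job,
)
def rss_feed_sensor(context: dg.SensorEvaluationContext):
    # Send the stored ETag so an unchanged feed comes back with no entries.
    feed = feedparser.parse("https://example.com/feed.xml", etag=context.cursor)
    if not feed.entries:
        return dg.SkipReason("No new entries")
    return dg.SensorResult(
        run_requests=[
            dg.RunRequest(partition_key=entry.id) for entry in feed.entries
        ],
        # Not every server sends an ETag, so read it defensively.
        cursor=feed.get("etag"),
    )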

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
