Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Apache Airflow BaseOperator Interface

From Leeroopedia


Knowledge Sources
Domains Workflow_Orchestration, Python_API
Last Updated 2026-02-08 00:00 GMT

Overview

Concrete tool for defining custom task types provided by the BaseOperator and BaseSensorOperator classes.

Description

BaseOperator is the abstract base class for all operators with extensive configuration attributes. BaseSensorOperator extends it with poke/reschedule semantics for condition-waiting tasks. Both use Python dataclass-style field definitions with defaults from Airflow configuration.

Usage

Subclass BaseOperator for action tasks and BaseSensorOperator for condition-waiting tasks. Implement execute() or poke() respectively.

Code Reference

Source Location

  • Repository: Apache Airflow
  • File: task-sdk/src/airflow/sdk/bases/operator.py
  • Lines: L641-1993

Signature

@dataclass(repr=False)
class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
    task_id: str
    owner: str = DEFAULT_OWNER
    retries: int | None = DEFAULT_RETRIES
    retry_delay: timedelta = DEFAULT_RETRY_DELAY
    pool: str = DEFAULT_POOL_NAME
    pool_slots: int = DEFAULT_POOL_SLOTS
    queue: str = DEFAULT_QUEUE
    priority_weight: int = DEFAULT_PRIORITY_WEIGHT
    execution_timeout: timedelta | None = DEFAULT_TASK_EXECUTION_TIMEOUT
    trigger_rule: TriggerRule = DEFAULT_TRIGGER_RULE
    do_xcom_push: bool = True
    template_fields: Collection[str] = ()
    template_ext: Sequence[str] = ()
    ui_color: str = "#fff"
    ui_fgcolor: str = "#000"

    def execute(self, context: Context) -> Any:
        """Main task execution logic. Must be implemented by subclasses."""
        raise NotImplementedError()

BaseSensorOperator:

class BaseSensorOperator(BaseOperator):
    ui_color: str = "#e6f1f2"

    def __init__(
        self,
        *,
        poke_interval: timedelta | float = 60,
        timeout: timedelta | float = conf.getfloat("sensors", "default_timeout"),
        soft_fail: bool = False,
        mode: str = "poke",      # "poke" or "reschedule"
        exponential_backoff: bool = False,
        max_wait: timedelta | float | None = None,
        silent_fail: bool = False,
        never_fail: bool = False,
        **kwargs,
    ): ...

    def poke(self, context: Context) -> bool:
        """Check condition. Return True when condition is met."""
        raise NotImplementedError()

Import

from airflow.sdk.bases.operator import BaseOperator
from airflow.sdk.bases.sensor import BaseSensorOperator

I/O Contract

Inputs

Name Type Required Description
task_id str Yes Unique task identifier within DAG
context Context Yes (execute) Execution context with task instance, DAG run info
pool str No Resource pool for concurrency management
retries int No Number of retry attempts on failure

Outputs

Name Type Description
Return value Any Auto-pushed as XCom if do_xcom_push=True
poke result bool True if condition met (sensors)
State change str success, failed, up_for_retry, skipped

Usage Examples

Custom Operator

from airflow.sdk.bases.operator import BaseOperator

class MyOperator(BaseOperator):
    template_fields = ("query",)

    def __init__(self, query: str, conn_id: str, **kwargs):
        super().__init__(**kwargs)
        self.query = query
        self.conn_id = conn_id

    def execute(self, context):
        hook = MyHook(self.conn_id)
        return hook.run_query(self.query)

Custom Sensor

from airflow.sdk.bases.sensor import BaseSensorOperator

class MyFileSensor(BaseSensorOperator):
    def __init__(self, filepath: str, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context) -> bool:
        import os
        return os.path.exists(self.filepath)

Related Pages

Implements Principle

Requires Environment

Uses Heuristic

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment