Implementation: Apache Airflow BaseOperator Interface
| Knowledge Sources | |
|---|---|
| Domains | Workflow_Orchestration, Python_API |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
The BaseOperator and BaseSensorOperator classes provide the concrete interface for defining custom task types.
Description
BaseOperator is the abstract base class for all operators with extensive configuration attributes. BaseSensorOperator extends it with poke/reschedule semantics for condition-waiting tasks. Both use Python dataclass-style field definitions with defaults from Airflow configuration.
Usage
Subclass BaseOperator for action tasks and BaseSensorOperator for condition-waiting tasks. Implement execute() or poke() respectively.
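The poke/reschedule contract can be sketched as a loop: the framework calls poke() repeatedly until it returns True or the timeout expires, and soft_fail maps a timeout to the skipped state instead of failed. The run_sensor function below is an illustrative stand-in, not Airflow's actual executor logic.

```python
import time

def run_sensor(poke, poke_interval=1.0, timeout=5.0, soft_fail=False, sleep=time.sleep):
    """Illustrative poke loop (not Airflow's real scheduler code).

    Calls poke() until it returns True or the timeout elapses. In real
    "poke" mode the worker slot stays occupied between checks; in
    "reschedule" mode the task instead frees the slot and is re-queued.
    """
    started = time.monotonic()
    while not poke():
        if time.monotonic() - started > timeout:
            # soft_fail turns a sensor timeout into "skipped" rather than "failed"
            return "skipped" if soft_fail else "failed"
        sleep(poke_interval)
    return "success"

# A condition that becomes true on the third check:
checks = iter([False, False, True])
state = run_sensor(lambda: next(checks), poke_interval=0.01, timeout=2.0)
print(state)  # success
```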
Code Reference
Source Location
- Repository: Apache Airflow
- File: task-sdk/src/airflow/sdk/bases/operator.py
- Lines: L641-1993
Signature
@dataclass(repr=False)
class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
    task_id: str
    owner: str = DEFAULT_OWNER
    retries: int | None = DEFAULT_RETRIES
    retry_delay: timedelta = DEFAULT_RETRY_DELAY
    pool: str = DEFAULT_POOL_NAME
    pool_slots: int = DEFAULT_POOL_SLOTS
    queue: str = DEFAULT_QUEUE
    priority_weight: int = DEFAULT_PRIORITY_WEIGHT
    execution_timeout: timedelta | None = DEFAULT_TASK_EXECUTION_TIMEOUT
    trigger_rule: TriggerRule = DEFAULT_TRIGGER_RULE
    do_xcom_push: bool = True
    template_fields: Collection[str] = ()
    template_ext: Sequence[str] = ()
    ui_color: str = "#fff"
    ui_fgcolor: str = "#000"

    def execute(self, context: Context) -> Any:
        """Main task execution logic. Must be implemented by subclasses."""
        raise NotImplementedError()
BaseSensorOperator:
class BaseSensorOperator(BaseOperator):
    ui_color: str = "#e6f1f2"

    def __init__(
        self,
        *,
        poke_interval: timedelta | float = 60,
        timeout: timedelta | float = conf.getfloat("sensors", "default_timeout"),
        soft_fail: bool = False,
        mode: str = "poke",  # "poke" or "reschedule"
        exponential_backoff: bool = False,
        max_wait: timedelta | float | None = None,
        silent_fail: bool = False,
        never_fail: bool = False,
        **kwargs,
    ): ...

    def poke(self, context: Context) -> bool:
        """Check condition. Return True when condition is met."""
        raise NotImplementedError()
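When exponential_backoff is enabled, the wait between pokes grows with each attempt and max_wait caps it. The arithmetic below is a simplified sketch: Airflow's actual implementation also adds a deterministic hash-based jitter, which this stand-in omits.

```python
def next_poke_interval(base_interval, poke_number, max_wait=None):
    """Simplified exponential backoff for sensor pokes.

    Doubles the base interval on each poke and caps it at max_wait.
    Airflow's real version additionally applies hash-based jitter;
    this sketch keeps only the doubling-and-cap behaviour.
    """
    interval = base_interval * (2 ** poke_number)
    if max_wait is not None:
        interval = min(interval, max_wait)
    return interval

for n in range(5):
    print(next_poke_interval(60, n, max_wait=600))
# 60, 120, 240, 480, 600
```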
Import
from airflow.sdk.bases.operator import BaseOperator
from airflow.sdk.bases.sensor import BaseSensorOperator
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| task_id | str | Yes | Unique task identifier within DAG |
| context | Context | Yes (execute) | Execution context with task instance, DAG run info |
| pool | str | No | Resource pool for concurrency management |
| retries | int | No | Number of retry attempts on failure |
Outputs
| Name | Type | Description |
|---|---|---|
| Return value | Any | Auto-pushed as XCom if do_xcom_push=True |
| poke result | bool | True if condition met (sensors) |
| State change | str | success, failed, up_for_retry, skipped |
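The auto-push behaviour in the outputs table can be illustrated with a toy XCom store: the value returned by execute() is stored under the "return_value" key, keyed by task_id, unless do_xcom_push is False. The store and run_task helper here are stubs for illustration, not Airflow's API.

```python
# Toy XCom store (illustration only). XCOM_RETURN_KEY echoes Airflow's
# "return_value" key; the store itself is a plain dict stand-in.
XCOM_RETURN_KEY = "return_value"

class XComStore:
    def __init__(self):
        self._data = {}

    def push(self, task_id, key, value):
        self._data[(task_id, key)] = value

    def pull(self, task_id, key=XCOM_RETURN_KEY):
        return self._data.get((task_id, key))

def run_task(store, task_id, execute, do_xcom_push=True):
    # Mirrors the contract: a non-None return value is auto-pushed
    # as an XCom when do_xcom_push is True.
    result = execute()
    if do_xcom_push and result is not None:
        store.push(task_id, XCOM_RETURN_KEY, result)
    return result

store = XComStore()
run_task(store, "extract", lambda: {"rows": 42})
print(store.pull("extract"))  # {'rows': 42}
```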
Usage Examples
Custom Operator
from airflow.sdk.bases.operator import BaseOperator

class MyOperator(BaseOperator):
    template_fields = ("query",)

    def __init__(self, query: str, conn_id: str, **kwargs):
        super().__init__(**kwargs)
        self.query = query
        self.conn_id = conn_id

    def execute(self, context):
        hook = MyHook(self.conn_id)
        return hook.run_query(self.query)
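Before execute() runs, Airflow renders every attribute named in template_fields against the task context, which is why the query attribute above can contain template expressions. Airflow uses Jinja2 for this; the sketch below substitutes stdlib string.Template to stay dependency-free, and the Task class is a hypothetical stand-in.

```python
from string import Template

def render_template_fields(task, template_fields, context):
    """Sketch of template-field rendering (Airflow really uses Jinja2,
    e.g. {{ ds }}; string.Template's $ds syntax stands in here)."""
    for name in template_fields:
        value = getattr(task, name)
        if isinstance(value, str):
            setattr(task, name, Template(value).substitute(context))

class Task:
    # Hypothetical minimal task, standing in for a BaseOperator subclass.
    template_fields = ("query",)

    def __init__(self, query):
        self.query = query

task = Task("SELECT * FROM events WHERE ds = '$ds'")
render_template_fields(task, Task.template_fields, {"ds": "2026-02-08"})
print(task.query)  # SELECT * FROM events WHERE ds = '2026-02-08'
```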
Custom Sensor
import os

from airflow.sdk.bases.sensor import BaseSensorOperator

class MyFileSensor(BaseSensorOperator):
    def __init__(self, filepath: str, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context) -> bool:
        return os.path.exists(self.filepath)
Related Pages
Implements Principle
Requires Environment
Uses Heuristic