Implementation:Apache Airflow OtelTrace
| Knowledge Sources | |
|---|---|
| Domains | Observability, Tracing, OpenTelemetry |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
OpenTelemetry tracing integration for Apache Airflow, providing span lifecycle management (root, child, current), context propagation via W3C TraceContext, and deterministic trace/span ID generation through a custom AirflowOtelIdGenerator.
Description
The OtelTrace class is the central tracing facade used when OpenTelemetry tracing is enabled in Airflow. It encapsulates:
- Span creation: three distinct span creation methods: `start_span` (general purpose, with an optional parent), `start_root_span` (creates a new trace root by using an invalid parent context), and `start_child_span` (explicitly links to a parent span context).
- Context propagation: `inject()` serializes the current span context into a carrier dict using the W3C TraceContext format; `extract()` deserializes a carrier back into an OTel `Context`.
- Processor selection: supports both `BatchSpanProcessor` (the default, for long-running components such as the scheduler) and `SimpleSpanProcessor` (for tasks that may finish before a batch export interval elapses).
- Debug mode: optionally attaches a `ConsoleSpanExporter` for local debugging.
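The carrier written by `inject()` and read by `extract()` follows the W3C TraceContext format. Its core is the `traceparent` header: `version-trace_id-parent_id-flags` in lowercase hex. The standard-library sketch below illustrates that round trip. `inject_ids` and `extract_ids` are hypothetical helpers, not Airflow or OpenTelemetry APIs, and they ignore the optional `tracestate` header that a real propagator may also write.

```python
from __future__ import annotations

import re

# traceparent (version 00): version-trace_id-parent_id-flags, lowercase hex only.
_TRACEPARENT = re.compile(r"([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


def inject_ids(trace_id: int, span_id: int, sampled: bool = True) -> dict[str, str]:
    """Hypothetical: serialize IDs into a W3C TraceContext carrier dict."""
    flags = 0x01 if sampled else 0x00  # bit 0 is the "sampled" flag
    return {"traceparent": f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"}


def extract_ids(carrier: dict[str, str]) -> tuple[int, int, bool] | None:
    """Hypothetical: parse a carrier into (trace_id, span_id, sampled), or None if invalid."""
    match = _TRACEPARENT.fullmatch(carrier.get("traceparent", "").strip())
    if match is None:
        return None
    version, trace_hex, span_hex, flags_hex = match.groups()
    if version == "ff":  # version ff is forbidden by the spec
        return None
    trace_id, span_id = int(trace_hex, 16), int(span_hex, 16)
    if trace_id == 0 or span_id == 0:  # all-zero IDs are invalid
        return None
    return trace_id, span_id, bool(int(flags_hex, 16) & 0x01)
```

For example, `inject_ids(0x0AF7651916CD43DD8448EB211C80319C, 0xB7AD6B7169203331)` produces the carrier `{"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}`, and `extract_ids` recovers the same two integers plus the sampled flag.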
The AirflowOtelIdGenerator extends OTel's IdGenerator to produce a deterministic span ID on the first call to generate_span_id(), then falls back to random generation for subsequent spans. The trace ID, once set, is returned consistently across all calls.
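A minimal sketch of the ID-generation behavior described above, using only the standard library. `OneShotIdGenerator` is a hypothetical stand-in, not the Airflow source: it does not subclass OpenTelemetry's `IdGenerator`, and its random fallback when no trace ID is provided is an assumption.

```python
from __future__ import annotations

import random


class OneShotIdGenerator:
    """Hypothetical stand-in mirroring the documented AirflowOtelIdGenerator behavior."""

    def __init__(self, span_id: int | None = None, trace_id: int | None = None) -> None:
        self._pending_span_id = span_id  # consumed by the first generate_span_id() call
        self._trace_id = trace_id  # returned by every generate_trace_id() call

    def generate_span_id(self) -> int:
        # Hand out the preset span ID exactly once.
        if self._pending_span_id is not None:
            span_id, self._pending_span_id = self._pending_span_id, None
            return span_id
        # Afterwards, draw random 64-bit IDs; 0 is an invalid span ID, so redraw.
        span_id = random.getrandbits(64)
        while span_id == 0:
            span_id = random.getrandbits(64)
        return span_id

    def generate_trace_id(self) -> int:
        # The preset trace ID never changes between calls.
        if self._trace_id is not None:
            return self._trace_id
        # Assumption: with no preset, fall back to a random non-zero 128-bit ID.
        trace_id = random.getrandbits(128)
        while trace_id == 0:
            trace_id = random.getrandbits(128)
        return trace_id
```

Because the preset span ID is consumed on first use, only the first span that asks this generator for an ID receives it. Every later span gets a fresh random ID while sharing the preset trace ID.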
Supporting utility functions:
- `gen_context()`: creates a remote `SpanContext` from raw trace/span ID integers.
- `gen_links_from_kv_list()`: converts a list of `{trace_id, span_id}` dicts into OTel `Link` objects.
- `gen_link_from_traceparent()`: parses a W3C `traceparent` string into an OTel `Link`.
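To make the data shapes behind the first two helpers concrete, here is a dependency-free sketch. `RemoteContext` and `LinkRecord` are hypothetical stand-ins for OTel's `SpanContext` and `Link`, and the function names are illustrative only. The real helpers build OpenTelemetry objects and may set trace flags or link attributes that this sketch omits.

```python
from typing import NamedTuple


class RemoteContext(NamedTuple):
    """Hypothetical stand-in for opentelemetry.trace.SpanContext."""

    trace_id: int
    span_id: int
    is_remote: bool


class LinkRecord(NamedTuple):
    """Hypothetical stand-in for opentelemetry.trace.Link."""

    context: RemoteContext


def make_remote_context(trace_id: int, span_id: int) -> RemoteContext:
    # Like gen_context(): IDs received from another process are flagged as remote.
    return RemoteContext(trace_id=trace_id, span_id=span_id, is_remote=True)


def links_from_kv_list(kv_list: list[dict[str, int]]) -> list[LinkRecord]:
    # Like gen_links_from_kv_list(): one link per {"trace_id": ..., "span_id": ...} dict.
    return [LinkRecord(make_remote_context(kv["trace_id"], kv["span_id"])) for kv in kv_list]
```

For example, `links_from_kv_list([{"trace_id": 1, "span_id": 2}])` yields a single `LinkRecord` whose context has `trace_id=1`, `span_id=2`, and `is_remote=True`.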
Usage
```python
from airflow_shared.observability.traces.otel_tracer import OtelTrace, get_otel_tracer

# Initialize the tracer from configuration
tracer: OtelTrace = get_otel_tracer(
    cls=my_config_class,  # placeholder for the `cls` argument
    host="otel-collector.svc",
    port=4318,
    otel_service="airflow-scheduler",
)

# Start a root span for a DAG run
with tracer.start_root_span("dag_run.my_dag", component="scheduler") as span:
    span.set_attribute("dag_id", "my_dag")
    # ... schedule tasks ...

    # Start a child span for a specific task while the root span is current
    with tracer.start_child_span("task.extract", component="worker") as child:
        child.set_attribute("task_id", "extract")
        # ... execute task ...
```
Code Reference
Source Location
- Repository: Apache_Airflow
- File: `shared/observability/src/airflow_shared/observability/traces/otel_tracer.py`
Key Classes
OtelTrace (lines 52-296)
```python
class OtelTrace:
    """Handle all tracing requirements such as getting the tracer, and starting a new span."""

    def __init__(
        self,
        span_exporter: OTLPSpanExporter,
        use_simple_processor: bool,
        tag_string: str | None = None,
        otel_service: str | None = None,
        debug: bool = False,
    ): ...

    def get_otel_tracer_provider(
        self, trace_id: int | None = None, span_id: int | None = None,
    ) -> TracerProvider: ...

    def get_tracer(
        self, component: str, trace_id: int | None = None, span_id: int | None = None,
    ) -> OpenTelemetryTracer | Tracer: ...

    def get_current_span(self): ...

    def use_span(self, span: Span): ...

    def start_span(
        self, span_name: str, component: str | None = None,
        parent_sc: SpanContext | None = None, span_id=None,
        links=None, start_time=None,
    ): ...

    def start_root_span(
        self, span_name: str, component: str | None = None,
        links=None, start_time=None, start_as_current: bool = True,
    ): ...

    def start_child_span(
        self, span_name: str, parent_context: Context | None = None,
        component: str | None = None, links=None, start_time=None,
        start_as_current: bool = True,
    ): ...

    def inject(self) -> dict: ...

    def extract(self, carrier: dict) -> Context: ...
```
AirflowOtelIdGenerator (lines 361-389)
```python
class AirflowOtelIdGenerator(IdGenerator):
    """
    ID Generator for span id and trace id.
    The specific purpose of this ID generator is to generate a given span_id
    when the generate_span_id is called for the FIRST time. Any subsequent
    calls to the generate_span_id() will then fall back into producing random
    ones. As for the trace_id, the class is designed to produce the provided
    trace id (and not anything random).
    """

    def __init__(self, span_id=None, trace_id=None): ...

    def generate_span_id(self) -> int: ...

    def generate_trace_id(self) -> int: ...
```
Key Functions
```python
def get_otel_tracer(
    cls,
    use_simple_processor: bool = False,
    *,
    host: str | None = None,
    port: int | None = None,
    ssl_active: bool = False,
    otel_service: str | None = None,
    debug: bool = False,
) -> OtelTrace: ...


def gen_context(trace_id: int, span_id: int):
    """Generate a remote span context for given trace and span id."""


def gen_links_from_kv_list(kv_list):
    """Convert list of kv dict of trace_id and span_id and generate list of SpanContext."""
    # Per the description above, the result is a list of OTel Link objects.


def gen_link_from_traceparent(traceparent: str):
    """Generate Link object from provided traceparent string."""
```
Import
```python
from airflow_shared.observability.traces.otel_tracer import OtelTrace, get_otel_tracer
from airflow_shared.observability.traces.otel_tracer import (
    AirflowOtelIdGenerator,
    gen_context,
    gen_links_from_kv_list,
    gen_link_from_traceparent,
)
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| span_exporter | OTLPSpanExporter | Yes | OTLP HTTP span exporter connected to the collector |
| use_simple_processor | bool | Yes | If True, uses SimpleSpanProcessor for immediate export (suitable for short-lived tasks); get_otel_tracer() defaults it to False |
| tag_string | str or None | No | Tracestate-formatted tags to attach as span attributes |
| otel_service | str or None | No | Service name for the OTel resource (default: "airflow") |
| span_name | str | Yes (per span method) | Human-readable name for the span |
| component | str or None | No | Component name used to obtain the tracer (default: otel_service) |
| parent_sc / parent_context | SpanContext / Context | No | Parent span context for child span creation |
| links | list[dict] or list[Link] | No | Span links to related traces |
| start_time | datetime | No | Explicit start time; converted to nanoseconds internally |
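
Two of the inputs above need normalizing before they reach OpenTelemetry. `tag_string` is tracestate-formatted (`key1=value1,key2=value2`), and the OpenTelemetry Python API takes `start_time` as integer nanoseconds since the Unix epoch. The sketch below shows one way to handle both with the standard library. `parse_tag_string` and `to_unix_nanos` are hypothetical names, and the exact parsing and conversion inside `OtelTrace` may differ.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_tag_string(tag_string: str | None) -> dict[str, str]:
    """Hypothetical: parse 'key1=value1,key2=value2' into a dict of span attributes."""
    tags: dict[str, str] = {}
    if not tag_string:
        return tags
    for member in tag_string.split(","):
        member = member.strip()
        if not member:
            continue  # tolerate empty members, e.g. a trailing comma
        key, sep, value = member.partition("=")
        if sep:  # skip malformed members that have no '='
            tags[key.strip()] = value.strip()
    return tags


def to_unix_nanos(start_time: datetime) -> int:
    """Hypothetical: convert a timezone-aware datetime to nanoseconds since the epoch."""
    # Integer timedelta division avoids the float rounding of timestamp() * 1e9.
    return (start_time - _EPOCH) // timedelta(microseconds=1) * 1_000
```

Note that `to_unix_nanos` requires a timezone-aware datetime; subtracting the aware epoch from a naive datetime raises `TypeError`.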
Outputs
| Name | Type | Description |
|---|---|---|
| Span context manager | AbstractContextManager[Span] | Context manager wrapping an OTel span (from start_as_current_span) |
| Span | trace.span.Span | Raw span object when start_as_current=False |
| Carrier dict | dict[str, str] | W3C TraceContext carrier from inject() |
| Context | opentelemetry.context.Context | Extracted context from extract() |
| OtelTrace | OtelTrace | Fully configured tracer returned by get_otel_tracer() |
Usage Examples
Context Propagation Across Process Boundaries
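```python
# In the scheduler process: inject the current span context into a message
with tracer.start_root_span("dag_run.my_dag") as span:
    carrier = tracer.inject()  # e.g. {"traceparent": "00-<trace_id>-<span_id>-01"}
    send_to_worker(task_id="extract", carrier=carrier)  # hypothetical transport helper

# In the worker process (with its own OtelTrace instance): extract the context
# and start a child span under the scheduler's span
ctx = tracer.extract(carrier)
with tracer.start_child_span("task.extract", parent_context=ctx) as child:
    execute_task()  # hypothetical task body
```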
Using Deterministic IDs
```python
from airflow_shared.observability.traces.otel_tracer import AirflowOtelIdGenerator

# Create an ID generator with a known trace and span ID
id_gen = AirflowOtelIdGenerator(span_id=0xABCD, trace_id=0x1234567890ABCDEF)
first_span_id = id_gen.generate_span_id()   # Returns 0xABCD
second_span_id = id_gen.generate_span_id()  # Returns a random 64-bit int
trace_id = id_gen.generate_trace_id()       # Always returns 0x1234567890ABCDEF
```
Generating Links from Traceparent
```python
from airflow_shared.observability.traces.otel_tracer import gen_link_from_traceparent

traceparent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
link = gen_link_from_traceparent(traceparent)
# link.context contains the SpanContext parsed from the traceparent header
```