Implementation:Apache Airflow OtelTrace

From Leeroopedia


Knowledge Sources
Domains: Observability, Tracing, OpenTelemetry
Last Updated: 2026-02-08 21:00 GMT

Overview

OpenTelemetry tracing integration for Apache Airflow, providing span lifecycle management (root, child, current), context propagation via W3C TraceContext, and deterministic trace/span ID generation through a custom AirflowOtelIdGenerator.

Description

The OtelTrace class is the central tracing facade used when OpenTelemetry tracing is enabled in Airflow. It encapsulates:

  • Span creation: Three distinct span creation methods -- start_span (general purpose with optional parent), start_root_span (creates a new trace root with an invalid parent context), and start_child_span (explicitly links to a parent span context).
  • Context propagation: inject() serializes the current span context into a carrier dict using the W3C TraceContext format; extract() deserializes a carrier back into an OpenTelemetry Context that can be passed as the parent of a new span.
  • Processor selection: Supports both BatchSpanProcessor (default, for long-running components like the scheduler) and SimpleSpanProcessor (for tasks that may finish before a batch export interval).
  • Debug mode: Optionally attaches a ConsoleSpanExporter for local debugging.
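
To make the carrier format concrete, here is a minimal standard-library sketch of the `traceparent` value that a W3C TraceContext carrier holds. The helper name `format_traceparent` is hypothetical and is not part of OtelTrace; in practice inject() delegates this work to the OpenTelemetry propagator.

```python
def format_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """Render a version-00 W3C traceparent value from integer IDs.

    Hypothetical helper for illustration only; not part of OtelTrace.
    """
    if not 0 < trace_id < 2**128:
        raise ValueError("trace_id must be a non-zero 128-bit integer")
    if not 0 < span_id < 2**64:
        raise ValueError("span_id must be a non-zero 64-bit integer")
    flags = 0x01 if sampled else 0x00
    # Layout: version - 32 hex chars of trace ID - 16 hex chars of span ID - flags
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


# A carrier is a plain dict of string headers, so it can travel in any message.
carrier = {
    "traceparent": format_traceparent(
        0x0AF7651916CD43DD8448EB211C80319C, 0xB7AD6B7169203331
    )
}
print(carrier["traceparent"])
```

Because the carrier contains only strings, it can be attached to task messages or HTTP headers without extra encoding.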

The AirflowOtelIdGenerator extends OTel's IdGenerator to produce a deterministic span ID on the first call to generate_span_id(), then falls back to random generation for subsequent spans. The trace ID, once set, is returned consistently across all calls.
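
The "fixed first, random afterwards" behavior can be sketched without the OpenTelemetry SDK. The class below is a standalone illustration with a hypothetical name (`FirstSpanIdGenerator`); the real class subclasses OTel's IdGenerator. Its fallback to a random trace ID when none is supplied is an assumption, not something this page confirms.

```python
import random


class FirstSpanIdGenerator:
    """Standalone sketch of the described ID strategy (not the Airflow class)."""

    def __init__(self, span_id=None, trace_id=None):
        self._pending_span_id = span_id  # handed out once, then cleared
        self._trace_id = trace_id        # returned on every call when set

    def generate_span_id(self) -> int:
        if self._pending_span_id is not None:
            span_id, self._pending_span_id = self._pending_span_id, None
            return span_id
        return self._random_nonzero(64)

    def generate_trace_id(self) -> int:
        if self._trace_id is not None:
            return self._trace_id
        # Assumption: without a preset trace ID, fall back to a random one.
        return self._random_nonzero(128)

    @staticmethod
    def _random_nonzero(bits: int) -> int:
        # Zero is the invalid ID in OpenTelemetry, so retry until non-zero.
        value = random.getrandbits(bits)
        while value == 0:
            value = random.getrandbits(bits)
        return value


gen = FirstSpanIdGenerator(span_id=0xABCD, trace_id=0x1234)
first = gen.generate_span_id()   # the preset value
second = gen.generate_span_id()  # a random 64-bit value
```

Pinning only the first span ID lets a component recreate a span whose identity was decided elsewhere, while every span created after it still receives a unique ID.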

Supporting utility functions:

  • gen_context() -- Creates a remote SpanContext from raw trace/span ID integers.
  • gen_links_from_kv_list() -- Converts a list of {trace_id, span_id} dicts into OTel Link objects.
  • gen_link_from_traceparent() -- Parses a W3C traceparent string into an OTel Link.
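
As a rough illustration of what parsing a traceparent string involves, the sketch below splits a version-00 header into integer IDs using only the standard library. The names `ParsedTraceparent` and `parse_traceparent` are hypothetical; how gen_link_from_traceparent() does this internally is not shown on this page.

```python
import re
from typing import NamedTuple

# version (2 hex) - trace ID (32 hex) - parent/span ID (16 hex) - flags (2 hex)
_TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


class ParsedTraceparent(NamedTuple):
    version: int
    trace_id: int
    span_id: int
    sampled: bool


def parse_traceparent(value: str) -> ParsedTraceparent:
    """Parse a version-00 traceparent value (hypothetical helper)."""
    match = _TRACEPARENT_RE.match(value.strip())
    if match is None:
        raise ValueError(f"malformed traceparent: {value!r}")
    version_hex, trace_hex, span_hex, flags_hex = match.groups()
    trace_id = int(trace_hex, 16)
    span_id = int(span_hex, 16)
    if trace_id == 0 or span_id == 0:
        raise ValueError("all-zero trace or span ID is invalid")
    sampled = bool(int(flags_hex, 16) & 0x01)  # lowest flag bit means "sampled"
    return ParsedTraceparent(int(version_hex, 16), trace_id, span_id, sampled)


parsed = parse_traceparent(
    "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
)
```

The resulting integers are the same kind of raw trace/span ID values that gen_context() accepts.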

Usage

from airflow_shared.observability.traces.otel_tracer import OtelTrace, get_otel_tracer

# Initialize the tracer from configuration
tracer = get_otel_tracer(
    cls=my_config_class,
    host="otel-collector.svc",
    port=4318,
    otel_service="airflow-scheduler",
)

# Start a root span for a DAG run
with tracer.start_root_span("dag_run.my_dag", component="scheduler") as span:
    span.set_attribute("dag_id", "my_dag")
    # ... schedule tasks ...

    # Start a child span for a specific task
    with tracer.start_child_span("task.extract", component="worker") as child:
        child.set_attribute("task_id", "extract")
        # ... execute task ...

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/observability/src/airflow_shared/observability/traces/otel_tracer.py

Key Classes

OtelTrace (lines 52-296)

class OtelTrace:
    """Handle all tracing requirements such as getting the tracer, and starting a new span."""

    def __init__(
        self,
        span_exporter: OTLPSpanExporter,
        use_simple_processor: bool,
        tag_string: str | None = None,
        otel_service: str | None = None,
        debug: bool = False,
    ): ...

    def get_otel_tracer_provider(
        self, trace_id: int | None = None, span_id: int | None = None,
    ) -> TracerProvider: ...

    def get_tracer(
        self, component: str, trace_id: int | None = None, span_id: int | None = None,
    ) -> OpenTelemetryTracer | Tracer: ...

    def get_current_span(self): ...
    def use_span(self, span: Span): ...

    def start_span(
        self, span_name: str, component: str | None = None,
        parent_sc: SpanContext | None = None, span_id=None,
        links=None, start_time=None,
    ): ...

    def start_root_span(
        self, span_name: str, component: str | None = None,
        links=None, start_time=None, start_as_current: bool = True,
    ): ...

    def start_child_span(
        self, span_name: str, parent_context: Context | None = None,
        component: str | None = None, links=None, start_time=None,
        start_as_current: bool = True,
    ): ...

    def inject(self) -> dict: ...
    def extract(self, carrier: dict) -> Context: ...

AirflowOtelIdGenerator (lines 361-389)

class AirflowOtelIdGenerator(IdGenerator):
    """
    ID Generator for span id and trace id.

    The specific purpose of this ID generator is to generate a given span_id
    when the generate_span_id is called for the FIRST time. Any subsequent
    calls to the generate_span_id() will then fall back into producing random
    ones. As for the trace_id, the class is designed to produce the provided
    trace id (and not anything random).
    """

    def __init__(self, span_id=None, trace_id=None): ...
    def generate_span_id(self) -> int: ...
    def generate_trace_id(self) -> int: ...

Key Functions

def get_otel_tracer(
    cls,
    use_simple_processor: bool = False,
    *,
    host: str | None = None,
    port: int | None = None,
    ssl_active: bool = False,
    otel_service: str | None = None,
    debug: bool = False,
) -> OtelTrace: ...

def gen_context(trace_id: int, span_id: int):
    """Generate a remote span context for given trace and span id."""

def gen_links_from_kv_list(kv_list):
    """Convert list of kv dict of trace_id and span_id and generate list of SpanContext."""

def gen_link_from_traceparent(traceparent: str):
    """Generate Link object from provided traceparent string."""

Import

from airflow_shared.observability.traces.otel_tracer import (
    AirflowOtelIdGenerator,
    OtelTrace,
    gen_context,
    gen_link_from_traceparent,
    gen_links_from_kv_list,
    get_otel_tracer,
)

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| span_exporter | OTLPSpanExporter | Yes | OTLP HTTP span exporter connected to the collector |
| use_simple_processor | bool | No | If True, uses SimpleSpanProcessor for immediate export (suitable for short-lived tasks) |
| tag_string | str or None | No | Tracestate-formatted tags to attach as span attributes |
| otel_service | str or None | No | Service name for the OTel resource (default: "airflow") |
| span_name | str | Yes (per span method) | Human-readable name for the span |
| component | str or None | No | Component name used to obtain the tracer (default: otel_service) |
| parent_sc / parent_context | SpanContext / Context | No | Parent span context for child span creation |
| links | list[dict] or list[Link] | No | Span links to related traces |
| start_time | datetime | No | Explicit start time; converted to nanoseconds internally |

Outputs

| Name | Type | Description |
|------|------|-------------|
| Span context manager | AbstractContextManager[Span] | Context manager wrapping an OTel span (from start_as_current_span) |
| Span | trace.span.Span | Raw span object when start_as_current=False |
| Carrier dict | dict[str, str] | W3C TraceContext carrier from inject() |
| Context | opentelemetry.context.Context | Extracted context from extract() |
| OtelTrace | OtelTrace | Fully configured tracer returned by get_otel_tracer() |

Usage Examples

Context Propagation Across Process Boundaries

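# In the scheduler process: inject the current span context into a message.
# send_to_worker() stands in for whatever transport delivers work to the worker.
with tracer.start_root_span("dag_run.my_dag") as span:
    carrier = tracer.inject()
    send_to_worker(task_id="extract", carrier=carrier)

# In the worker process: `carrier` is the dict received from the scheduler.
# Extract the context from it and create a child span under the remote parent.
ctx = tracer.extract(carrier)
with tracer.start_child_span("task.extract", parent_context=ctx) as child:
    execute_task()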
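
The carrier is a dict[str, str], so it survives any text-based transport. As a hedged sketch of the hand-off step, the helpers below wrap it in a JSON envelope; `pack_task_message` and `unpack_task_message` are hypothetical names, and the envelope layout is an assumption, not Airflow's actual message format.

```python
import json


def pack_task_message(task_id: str, carrier: dict) -> bytes:
    """Serialize a task ID and its trace carrier (hypothetical envelope)."""
    return json.dumps({"task_id": task_id, "carrier": carrier}).encode("utf-8")


def unpack_task_message(raw: bytes) -> tuple:
    """Return (task_id, carrier); a missing carrier becomes an empty dict."""
    payload = json.loads(raw.decode("utf-8"))
    return payload["task_id"], payload.get("carrier", {})


# Scheduler side: the carrier would come from tracer.inject().
wire = pack_task_message(
    "extract",
    {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"},
)

# Worker side: the recovered carrier would be passed to tracer.extract().
task_id, received_carrier = unpack_task_message(wire)
```

Defaulting to an empty carrier keeps the worker functional when a message arrives without trace context.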

Using Deterministic IDs

from airflow_shared.observability.traces.otel_tracer import AirflowOtelIdGenerator

# Create an ID generator with a known trace ID and first span ID
id_gen = AirflowOtelIdGenerator(span_id=0xABCD, trace_id=0x1234567890ABCDEF)
first_span_id = id_gen.generate_span_id()   # Returns 0xABCD
second_span_id = id_gen.generate_span_id()   # Returns random 64-bit int
trace_id = id_gen.generate_trace_id()        # Always returns 0x1234567890ABCDEF

Generating Links from Traceparent

from airflow_shared.observability.traces.otel_tracer import gen_link_from_traceparent

traceparent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
link = gen_link_from_traceparent(traceparent)
# link.context contains the SpanContext parsed from the traceparent header
