
Implementation:Apache Airflow Structlog Configuration

From Leeroopedia


Knowledge Sources
Domains: Logging, Core_Infrastructure
Last Updated: 2026-02-08 21:00 GMT

Overview

Structlog-based logging configuration module for Apache Airflow. It sets up a unified logging pipeline with JSON or console output, per-logger level overrides stored in a trie, JWT token redaction, and integration with stdlib logging.

Description

The shared/logging/src/airflow_shared/logging/structlog.py file (659 lines) implements Airflow's structured logging system built on the structlog library. It provides a complete logging pipeline that unifies structlog-native and stdlib logging module output through a common set of processors.
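The idea of a common set of processors can be illustrated with a stdlib-only sketch of structlog's processor protocol. Each processor is a callable taking (logger, method_name, event_dict) and returning the updated event dict, and the final renderer returns the formatted line. The functions stamp_time, tag_level, render_json_line and run_chain are hypothetical stand-ins, not Airflow's or structlog's actual processors.

```python
import json
from datetime import datetime, timezone
from typing import Any, Callable

EventDict = dict[str, Any]
# Each processor gets (wrapped logger, method name, event dict) and returns the
# updated event dict; the final "renderer" returns the formatted line instead.
Processor = Callable[[Any, str, EventDict], Any]


def stamp_time(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    # Hypothetical stand-in for an ISO-format timestamper.
    event_dict.setdefault("timestamp", datetime.now(timezone.utc).isoformat())
    return event_dict


def tag_level(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    # Use the name of the logging method ("info", "debug", ...) as the level.
    event_dict["level"] = method_name
    return event_dict


def render_json_line(logger: Any, method_name: str, event_dict: EventDict) -> str:
    # Renderer: serialize the whole event dict as one JSON object per line.
    return json.dumps(event_dict, default=str)


def run_chain(processors: list[Processor], method_name: str, event: str, **kw: Any) -> Any:
    # Feed the event dict through each processor in order.
    result: Any = {"event": event, **kw}
    for processor in processors:
        result = processor(None, method_name, result)
    return result


shared_processors: list[Processor] = [stamp_time, tag_level]
line = run_chain([*shared_processors, render_json_line], "info", "DAG parsing complete", dag_count=42)
print(line)
```

Because stdlib records pass through the same shared processors and differ only in the final renderer, both sources end up producing identically shaped entries.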

Key components:

  • configure_logging() -- Main entry point that configures both structlog and stdlib logging. Accepts parameters for output format (JSON vs console), log level, custom format strings, per-namespace log levels, color control, and additional processors. It:
    • Configures the structlog processor chain
    • Sets up stdlib's logging.config.dictConfig with a structlog-based formatter
    • Manages per-logger level overrides via a trie (pygtrie.StringTrie)
    • Handles color support detection (respects NO_COLOR and FORCE_COLOR env vars)
  • structlog_processors() -- Cached function that builds the processor chain based on output configuration. Returns a tuple of (shared_processors, stdlib_renderer, structlog_renderer). The shared processor chain includes:
    • MaybeTimeStamper (ISO format)
    • merge_contextvars for context propagation
    • add_log_level
    • PositionalArgumentsFormatter
    • logger_name for named logger identification
    • redact_jwt for JWT token redaction
    • StackInfoRenderer
    • Optional CallsiteParameterAdder for the requested callsite parameters (e.g., filename, lineno)
  • NamedBytesLogger / NamedWriteLogger -- Custom logger classes extending structlog's BytesLogger and WriteLogger with name support and non-caching I/O wrappers.
  • LoggerFactory -- Generic factory class that creates named loggers with optional output stream configuration.
  • init_log_file() -- Creates log files and their parent directories with explicit permissions (defaults: 0o775 for new folders, 0o664 for new files), as needed for multi-user impersonation scenarios.
  • reconfigure_logger() -- Allows removing specific processor types from an existing logger and optionally overriding the log level.
  • Per-logger level trie -- Uses pygtrie.StringTrie (dot-separated) for efficient longest-prefix matching of logger names to log levels, mirroring stdlib's hierarchical level cascade.
  • JWT redaction -- Pattern-based detection of JWTs (strings beginning with eyJ, which is how a base64url-encoded JWT header starting with '{"' begins), with automatic redaction in all log output.
  • Pre-built filtering loggers -- Pre-creates AirflowBoundLoggerFilteringAt* classes for each standard log level (Notset, Debug, Info, Warning, Error, Critical) to avoid runtime class generation overhead.
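The JWT redaction described above can be sketched as a structlog-style processor. This assumes a JWT looks like three base64url segments separated by dots, with the header segment starting with eyJ. The regex, the "***" placeholder and the name redact_jwt_sketch are illustrative, not Airflow's actual pattern or implementation.

```python
import re
from typing import Any

# Assumed pattern: header starting with "eyJ", then payload and signature,
# each made of base64url characters and separated by dots.
JWT_RE = re.compile(r"eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]*")
REDACTED = "***"  # hypothetical placeholder text


def _redact(value: Any) -> Any:
    # Only strings are scanned; other values pass through untouched.
    if isinstance(value, str):
        return JWT_RE.sub(REDACTED, value)
    return value


def redact_jwt_sketch(logger: Any, method_name: str, event_dict: dict[str, Any]) -> dict[str, Any]:
    # Scan every top-level value in the event dict, including "event" itself.
    return {key: _redact(val) for key, val in event_dict.items()}


token = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIn0.abc-DEF_123"
print(redact_jwt_sketch(None, "info", {"event": f"Bearer {token}", "user": "alice"}))
# {'event': 'Bearer ***', 'user': 'alice'}
```

This sketch only inspects top-level string values. Nested structures would need a recursive walk if tokens can appear there.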

Usage

The module is initialized early in Airflow's startup process. For JSON structured logging (e.g., production or cloud deployments), pass json_output=True. For development, the console renderer with colors and Rich tracebacks is used when the DEV environment variable is set.

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/logging/src/airflow_shared/logging/structlog.py
  • Lines: 659

Signature

def configure_logging(
    *,
    json_output: bool = False,
    log_level: str = "DEBUG",
    log_format: str = "",
    stdlib_config: dict | None = None,
    extra_processors: Sequence[Processor] | None = None,
    callsite_parameters: Iterable[CallsiteParameter] | None = None,
    colors: bool = True,
    output: LogOutputType | None = None,
    namespace_log_levels: str | dict[str, str] | None = None,
    cache_logger_on_first_use: bool = True,
):
    """
    Configure structlog (and stdlib's logging to send via structlog processors too).

    :param json_output: Set to true to write all logs as JSON (one per line)
    :param log_level: The default log level to use for most logs
    :param log_format: A percent-style log format to write non JSON logs with
    :param output: Where to write the logs to
    :param colors: Whether to use colors for non-JSON logs
    :param callsite_parameters: Parameters about the callsite to include in logs
    :param namespace_log_levels: Per-logger level overrides (e.g., "sqlalchemy=INFO sqlalchemy.engine=DEBUG")
    """

@cache
def structlog_processors(
    json_output: bool,
    log_format: str = "",
    colors: bool = True,
    callsite_parameters: tuple[CallsiteParameter, ...] = (),
) -> tuple[list[Processor], Processor, Processor]:
    """Create the correct list of structlog processors for the given config."""

def init_log_file(
    base_log_folder: str | os.PathLike[str],
    local_relative_path: str | os.PathLike[str],
    *,
    new_folder_permissions: int = 0o775,
    new_file_permissions: int = 0o664,
) -> Path:
    """Ensure log file and parent directories are created with correct permissions."""

def reconfigure_logger(
    logger: WrappedLogger,
    without_processor_type: type,
    level_override: int | None = None,
):
    """Remove specific processor types from a logger and optionally override level."""

class NamedBytesLogger(structlog.BytesLogger):
    """BytesLogger with name attribute and non-caching I/O."""
    def __init__(self, name: str | None = None, file: BinaryIO | None = None): ...

class NamedWriteLogger(structlog.WriteLogger):
    """WriteLogger with name attribute and non-caching I/O."""
    def __init__(self, name: str | None = None, file: TextIO | None = None): ...

class LoggerFactory(Generic[LogOutputType]):
    """Generic factory creating named loggers with optional output stream."""
    def __init__(self, cls: type[WrappedLogger], io: LogOutputType | None = None): ...
    def __call__(self, logger_name: str | None = None, *args: Any) -> WrappedLogger: ...
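The core idea behind reconfigure_logger() can be sketched on a plain data structure: drop every processor of a given type from an existing chain and optionally change the level. ToyLogger, AddHostname, UppercaseEvent and reconfigure_sketch are hypothetical. The real function operates on structlog loggers, and this sketch does not claim to match its internals.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass, replace
from typing import Any, Callable


class AddHostname:
    """Hypothetical processor class, used only for this example."""

    def __call__(self, logger: Any, method_name: str, event_dict: dict) -> dict:
        event_dict["host"] = "worker-1"
        return event_dict


class UppercaseEvent:
    """Another hypothetical processor class."""

    def __call__(self, logger: Any, method_name: str, event_dict: dict) -> dict:
        event_dict["event"] = str(event_dict.get("event", "")).upper()
        return event_dict


@dataclass(frozen=True)
class ToyLogger:
    """Hypothetical stand-in for a wrapped logger: a processor chain plus a level."""

    processors: tuple[Callable[..., Any], ...]
    level: int = logging.INFO


def reconfigure_sketch(
    logger: ToyLogger,
    without_processor_type: type,
    level_override: int | None = None,
) -> ToyLogger:
    # Drop every processor that is an instance of the unwanted type.
    kept = tuple(p for p in logger.processors if not isinstance(p, without_processor_type))
    # Keep the current level unless an override was given.
    new_level = logger.level if level_override is None else level_override
    return replace(logger, processors=kept, level=new_level)


original = ToyLogger(processors=(AddHostname(), UppercaseEvent()))
quieter = reconfigure_sketch(original, UppercaseEvent, level_override=logging.WARNING)
print([type(p).__name__ for p in quieter.processors], quieter.level)  # ['AddHostname'] 30
```

Filtering by type (isinstance) rather than by identity removes every instance of the processor class, even if several were added.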

Import

from airflow_shared.logging.structlog import configure_logging

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| json_output | bool | No (default: False) | When True, all logs are rendered as JSON (one object per line) |
| log_level | str | No (default: "DEBUG") | Default log level for the root logger and the Airflow namespace |
| log_format | str | No (default: "") | Percent-style format string for non-JSON output (e.g., "%(asctime)s %(levelname)s %(message)s") |
| stdlib_config | dict or None | No | Additional stdlib logging.config.dictConfig entries to merge |
| extra_processors | Sequence[Processor] or None | No | Additional structlog processors appended to the shared chain |
| callsite_parameters | Iterable[CallsiteParameter] or None | No | Callsite information to include (filename, lineno, function name, etc.) |
| colors | bool | No (default: True) | Enable colored console output (respects NO_COLOR/FORCE_COLOR) |
| output | LogOutputType or None (a binary or text stream) | No | Output stream (defaults to stdout/stderr) |
| namespace_log_levels | str or dict[str, str] or None | No | Per-logger level overrides, as a string (e.g., "sqlalchemy=INFO httpx=WARN") or a dict |
| cache_logger_on_first_use | bool | No (default: True) | Whether loggers are cached on first use |

Outputs

| Name | Type | Description |
|---|---|---|
| Configured logging pipeline | Side effect | Both structlog and stdlib logging are configured to use the unified processor chain |
| Per-logger trie | pygtrie.StringTrie | Global trie mapping logger-name prefixes to log levels |
| Log output | JSON or console text | Formatted log entries written to the configured output stream |

Usage Examples

JSON Output for Production

from airflow_shared.logging.structlog import configure_logging

# Configure JSON structured logging for production/cloud environments
configure_logging(
    json_output=True,
    log_level="INFO",
    namespace_log_levels="sqlalchemy.engine=WARN httpx=WARN",
)

# Example output (illustrative):
# {"timestamp":"2026-02-08T21:00:00Z","level":"info","event":"DAG parsing complete","logger":"airflow.dag_processing","dag_count":42}

Console Output for Development

from airflow_shared.logging.structlog import configure_logging
from structlog.processors import CallsiteParameter

# Configure colored console output with callsite info
configure_logging(
    json_output=False,
    log_level="DEBUG",
    colors=True,
    callsite_parameters=[CallsiteParameter.FILENAME, CallsiteParameter.LINENO],
)
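Passing colors=True does not force color, because the NO_COLOR / FORCE_COLOR handling described under configure_logging() still applies. The sketch below shows one way such detection can work. The precedence it uses (NO_COLOR wins, then FORCE_COLOR, then a TTY check) follows the common community convention and is an assumption. should_use_colors is a hypothetical helper, and Airflow's exact rules may differ.

```python
import io
import os
from typing import Mapping, TextIO


def should_use_colors(requested: bool, stream: TextIO, env: Mapping[str, str] = os.environ) -> bool:
    """Hypothetical helper: decide whether console output should be colored."""
    if not requested:
        return False
    if env.get("NO_COLOR"):  # any non-empty value disables color (no-color.org convention)
        return False
    if env.get("FORCE_COLOR"):  # assumed: any non-empty value forces color on
        return True
    # Otherwise, color only when the stream is an interactive terminal.
    isatty = getattr(stream, "isatty", None)
    return bool(isatty and isatty())


# An in-memory buffer is not a TTY, so color is off unless forced.
buf = io.StringIO()
print(should_use_colors(True, buf, env={}))  # False
print(should_use_colors(True, buf, env={"FORCE_COLOR": "1"}))  # True
```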

Custom Format String

from airflow_shared.logging.structlog import configure_logging

# Use a percent-style format string similar to stdlib logging
configure_logging(
    log_format="[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s",
    log_level="INFO",
)
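Percent-style format strings are rendered against a mapping, so the event dict only has to supply the named keys. The minimal sketch below assumes the event dict has already been translated to stdlib-style keys (asctime, filename, lineno, levelname, message). The to_stdlib_keys mapping is hypothetical and not how Airflow's formatter is necessarily wired.

```python
from typing import Any

LOG_FORMAT = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"


def to_stdlib_keys(event_dict: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical mapping from structlog-style keys to the names that
    # stdlib percent-style format strings expect.
    return {
        "asctime": event_dict.get("timestamp", ""),
        "filename": event_dict.get("filename", "?"),
        "lineno": event_dict.get("lineno", 0),
        "levelname": str(event_dict.get("level", "")).upper(),
        "message": event_dict.get("event", ""),
    }


event = {
    "timestamp": "2026-02-08T21:00:00Z",
    "filename": "dag.py",
    "lineno": 12,
    "level": "info",
    "event": "Task started",
}
line = LOG_FORMAT % to_stdlib_keys(event)
print(line)  # [2026-02-08T21:00:00Z] {dag.py:12} INFO - Task started
```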

Per-Namespace Log Levels

from airflow_shared.logging.structlog import configure_logging

# Set different log levels for different logger namespaces
configure_logging(
    log_level="INFO",
    namespace_log_levels={
        "sqlalchemy.engine": "DEBUG",   # Verbose SQL logging
        "httpx": "WARN",                # Suppress HTTP client noise
        "airflow.dag_processing": "DEBUG",  # Verbose DAG processing
    },
)
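namespace_log_levels accepts either a dict or a space-separated string such as "sqlalchemy=INFO sqlalchemy.engine=DEBUG". Resolving a logger's level is then a longest-prefix match on dot-separated name segments. To stay stdlib-only, this sketch substitutes a plain dict plus a longest-to-shortest prefix walk for pygtrie.StringTrie. The parsing rules (whitespace between pairs, "=" between name and level) are inferred from the examples on this page, and both function names are hypothetical.

```python
from __future__ import annotations

import logging


def parse_namespace_levels(spec: str | dict[str, str]) -> dict[str, int]:
    """Turn "a=INFO a.b=DEBUG" (or an equivalent dict) into {logger name: numeric level}."""
    if isinstance(spec, str):
        # Assumed syntax: whitespace-separated "name=LEVEL" pairs.
        pairs = dict(item.split("=", 1) for item in spec.split())
    else:
        pairs = dict(spec)
    # getLevelName maps "DEBUG" -> 10 and accepts the "WARN" alias (-> 30).
    # Unknown names come back as strings; a real implementation should validate them.
    return {name: logging.getLevelName(level.upper()) for name, level in pairs.items()}


def level_for(logger_name: str, levels: dict[str, int], default: int) -> int:
    """Longest-prefix match on whole dot-separated segments, like stdlib's cascade."""
    parts = logger_name.split(".")
    for i in range(len(parts), 0, -1):
        prefix = ".".join(parts[:i])
        if prefix in levels:
            return levels[prefix]
    return default


levels = parse_namespace_levels("sqlalchemy=INFO sqlalchemy.engine=DEBUG httpx=WARN")
print(level_for("sqlalchemy.engine.Engine", levels, logging.INFO))  # 10
print(level_for("sqlalchemy.pool", levels, logging.INFO))  # 20
print(level_for("httpx._client", levels, logging.INFO))  # 30
print(level_for("sqlalchemyx", levels, logging.ERROR))  # 40: whole segments only
```

A trie answers the same question without trying every prefix, which matters mainly when there are many overrides. The resolved result is identical.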

Initializing Log Files With Permissions

from airflow_shared.logging.structlog import init_log_file

# Create a log file with proper permissions for impersonation
log_path = init_log_file(
    base_log_folder="/var/log/airflow",
    local_relative_path="dag_id/run_id/task_id/attempt_1.log",
    new_folder_permissions=0o775,
    new_file_permissions=0o664,
)
# Returns Path("/var/log/airflow/dag_id/run_id/task_id/attempt_1.log")
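Explicit permission handling matters because Path.mkdir() and os.makedirs() filter the requested mode through the process umask. A request for 0o775 can therefore end up as 0o755. The minimal POSIX sketch below assumes each newly created directory and the new file should be chmod-ed explicitly. init_log_file_sketch is hypothetical, and Airflow's version may differ (for example, in how it treats existing paths or ownership).

```python
from __future__ import annotations

import os
import stat
import tempfile
from pathlib import Path


def init_log_file_sketch(
    base_log_folder: str | os.PathLike[str],
    local_relative_path: str | os.PathLike[str],
    *,
    new_folder_permissions: int = 0o775,
    new_file_permissions: int = 0o664,
) -> Path:
    """Hypothetical re-creation: create missing dirs and the file, then chmod them."""
    full_path = Path(base_log_folder) / local_relative_path
    # Collect missing directories top-down so each parent exists before its child.
    missing = [p for p in reversed(full_path.parents) if not p.exists()]
    for directory in missing:
        directory.mkdir(exist_ok=True)
        # mkdir's mode is filtered by the umask, so set the final mode explicitly.
        directory.chmod(new_folder_permissions)
    if not full_path.exists():
        full_path.touch()
        full_path.chmod(new_file_permissions)
    return full_path


with tempfile.TemporaryDirectory() as tmp:
    log_path = init_log_file_sketch(tmp, "dag_id/run_id/task_id/attempt_1.log")
    relative = log_path.relative_to(tmp).as_posix()
    file_mode = stat.S_IMODE(log_path.stat().st_mode)
    dir_mode = stat.S_IMODE(log_path.parent.stat().st_mode)
print(relative, oct(file_mode), oct(dir_mode))  # dag_id/run_id/task_id/attempt_1.log 0o664 0o775
```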
