Implementation:Apache Airflow Structlog Configuration
| Knowledge Sources | |
|---|---|
| Domains | Logging, Core_Infrastructure |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Structlog-based logging configuration and integration module for Apache Airflow that configures a unified logging pipeline supporting JSON and console output formats, per-logger level overrides via a trie structure, JWT token redaction, and stdlib logging integration.
Description
The shared/logging/src/airflow_shared/logging/structlog.py file (659 lines) implements Airflow's structured logging system built on the structlog library. It provides a complete logging pipeline that unifies structlog-native and stdlib logging module output through a common set of processors.
Key components:
- `configure_logging()` -- Main entry point that configures both structlog and stdlib logging. Accepts parameters for output format (JSON vs. console), log level, custom format strings, per-namespace log levels, color control, and additional processors. It:
  - Configures the structlog processor chain
  - Sets up stdlib's `logging.config.dictConfig` with a structlog-based formatter
  - Manages per-logger level overrides via a trie (`pygtrie.StringTrie`)
  - Handles color support detection (respects the `NO_COLOR` and `FORCE_COLOR` environment variables)
- `structlog_processors()` -- Cached function that builds the processor chain for a given output configuration. Returns a tuple of `(shared_processors, stdlib_renderer, structlog_renderer)`. The shared processor chain includes:
  - `MaybeTimeStamper` (ISO format)
  - `merge_contextvars` for context propagation
  - `add_log_level`
  - `PositionalArgumentsFormatter`
  - `logger_name` for named logger identification
  - `redact_jwt` for JWT token redaction
  - `StackInfoRenderer`
  - An optional `CallsiteParameterAdder` for filename/line number
- `NamedBytesLogger` / `NamedWriteLogger` -- Custom logger classes extending structlog's `BytesLogger` and `WriteLogger` with name support and non-caching I/O wrappers.
- `LoggerFactory` -- Generic factory class that creates named loggers with optional output stream configuration.
- `init_log_file()` -- Creates log files and parent directories with correct permissions for multi-user impersonation scenarios.
- `reconfigure_logger()` -- Removes specific processor types from an existing logger and optionally overrides its log level.
- Per-logger level trie -- Uses `pygtrie.StringTrie` (dot-separated) for efficient longest-prefix matching of logger names to log levels, mirroring stdlib's hierarchical level cascade.
- JWT redaction -- Pattern-based detection of JWT tokens (matching the `eyJ` prefix, which is `{"` encoded in base64url), with automatic redaction in all log output.
- Pre-built filtering loggers -- Pre-creates `AirflowBoundLoggerFilteringAt*` classes for each standard log level (Notset, Debug, Info, Warning, Error, Critical) to avoid runtime class generation overhead.
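The JWT redaction bullet can be illustrated with a minimal sketch. It assumes a structlog-style processor (a callable taking `logger`, `method_name`, and `event_dict`) and a hypothetical regular expression for three dot-separated base64url segments whose header starts with `eyJ`. The name `redact_jwt_sketch` and the `***` mask are illustrative assumptions, not the module's actual `redact_jwt`.

```python
import re
from typing import Any, Dict

# Hypothetical pattern: a JWT is three base64url segments joined by dots, and
# its header segment starts with "eyJ" (base64url for the characters '{"').
JWT_PATTERN = re.compile(r"eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]*")


def redact_jwt_sketch(logger: Any, method_name: str, event_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Processor sketch: mask JWT-looking substrings in every top-level string value."""
    for key, value in list(event_dict.items()):
        if isinstance(value, str):
            # "***" is an assumed mask; the real module may use a different marker
            event_dict[key] = JWT_PATTERN.sub("***", value)
    return event_dict
```

Only top-level string values are scanned in this sketch; nested dicts or lists would need a recursive walk.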
Usage
The module is initialized early in Airflow's startup process. For JSON structured logging (e.g., production or cloud deployments), pass json_output=True. For development, the console renderer with colors and Rich tracebacks is used when the DEV environment variable is set.
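To show how one event flows through shared processors and then into either a JSON or a console renderer, here is a dependency-free sketch. It mimics structlog's processor protocol (each processor receives `logger`, `method_name`, and `event_dict`; the final one returns the rendered string). All function names are hypothetical stand-ins, not the module's real processors.

```python
import json
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List

EventDict = Dict[str, Any]
Processor = Callable[[Any, str, EventDict], Any]


def add_timestamp(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    # Only stamp events that do not already carry a timestamp
    event_dict.setdefault("timestamp", datetime.now(timezone.utc).isoformat())
    return event_dict


def add_level(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    # The logging method name ("info", "warning", ...) becomes the level
    event_dict["level"] = method_name
    return event_dict


def render_json(logger: Any, method_name: str, event_dict: EventDict) -> str:
    return json.dumps(event_dict)  # one JSON object per line


def render_console(logger: Any, method_name: str, event_dict: EventDict) -> str:
    ts = event_dict.pop("timestamp", "")
    level = event_dict.pop("level", "")
    event = event_dict.pop("event", "")
    extras = " ".join(f"{k}={v}" for k, v in sorted(event_dict.items()))
    return f"{ts} [{level}] {event} {extras}".rstrip()


def build_chain(json_output: bool) -> List[Processor]:
    shared: List[Processor] = [add_timestamp, add_level]
    # Same shared prefix for both formats; only the final renderer differs
    return shared + [render_json if json_output else render_console]


def emit(chain: List[Processor], method_name: str, event_dict: EventDict) -> str:
    result: Any = event_dict
    for processor in chain:
        result = processor(None, method_name, result)
    return result
```

Keeping the shared prefix identical and swapping only the last element is what lets a single chain definition serve both output formats.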
Code Reference
Source Location
- Repository: Apache_Airflow
- File: `shared/logging/src/airflow_shared/logging/structlog.py`
- Lines: 659
Signature
```python
def configure_logging(
    *,
    json_output: bool = False,
    log_level: str = "DEBUG",
    log_format: str = "",
    stdlib_config: dict | None = None,
    extra_processors: Sequence[Processor] | None = None,
    callsite_parameters: Iterable[CallsiteParameter] | None = None,
    colors: bool = True,
    output: LogOutputType | None = None,
    namespace_log_levels: str | dict[str, str] | None = None,
    cache_logger_on_first_use: bool = True,
):
    """
    Configure structlog (and stdlib's logging to send via structlog processors too).

    :param json_output: Set to true to write all logs as JSON (one per line)
    :param log_level: The default log level to use for most logs
    :param log_format: A percent-style log format to write non JSON logs with
    :param output: Where to write the logs to
    :param colors: Whether to use colors for non-JSON logs
    :param callsite_parameters: Parameters about the callsite to include in logs
    :param namespace_log_levels: Per-logger level overrides (e.g., "sqlalchemy=INFO sqlalchemy.engine=DEBUG")
    """


@cache
def structlog_processors(
    json_output: bool,
    log_format: str = "",
    colors: bool = True,
    callsite_parameters: tuple[CallsiteParameter, ...] = (),
) -> tuple[list[Processor], Processor, Processor]:
    """Create the correct list of structlog processors for the given config."""


def init_log_file(
    base_log_folder: str | os.PathLike[str],
    local_relative_path: str | os.PathLike[str],
    *,
    new_folder_permissions: int = 0o775,
    new_file_permissions: int = 0o664,
) -> Path:
    """Ensure log file and parent directories are created with correct permissions."""


def reconfigure_logger(
    logger: WrappedLogger,
    without_processor_type: type,
    level_override: int | None = None,
):
    """Remove specific processor types from a logger and optionally override level."""


class NamedBytesLogger(structlog.BytesLogger):
    """BytesLogger with name attribute and non-caching I/O."""

    def __init__(self, name: str | None = None, file: BinaryIO | None = None): ...


class NamedWriteLogger(structlog.WriteLogger):
    """WriteLogger with name attribute and non-caching I/O."""

    def __init__(self, name: str | None = None, file: TextIO | None = None): ...


class LoggerFactory(Generic[LogOutputType]):
    """Generic factory creating named loggers with optional output stream."""

    def __init__(self, cls: type[WrappedLogger], io: LogOutputType | None = None): ...

    def __call__(self, logger_name: str | None = None, *args: Any) -> WrappedLogger: ...
```
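The `reconfigure_logger()` signature suggests filtering a processor chain by type. A minimal sketch of that idea, assuming the chain is a plain list of processor objects and that matching is done with `isinstance`; `without_processor_type`, `ConsoleRendererStub`, and `add_level` are hypothetical names, and `level_override` handling is not shown.

```python
from typing import Any, Callable, List, Sequence


class ConsoleRendererStub:
    """Hypothetical stand-in for a renderer class (e.g. a console renderer)."""

    def __call__(self, logger: Any, method_name: str, event_dict: dict) -> str:
        return str(event_dict)


def add_level(logger: Any, method_name: str, event_dict: dict) -> dict:
    event_dict["level"] = method_name
    return event_dict


def without_processor_type(processors: Sequence[Callable], processor_type: type) -> List[Callable]:
    """Return a new chain with every instance of processor_type removed."""
    return [p for p in processors if not isinstance(p, processor_type)]


chain = [add_level, ConsoleRendererStub()]
trimmed = without_processor_type(chain, ConsoleRendererStub)  # -> [add_level]
```

Returning a new list rather than mutating in place leaves any other logger that shares the original chain unaffected.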
Import
```python
from airflow_shared.logging.structlog import configure_logging
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| json_output | bool | No (default: False) | When True, all logs are rendered as JSON (one object per line) |
| log_level | str | No (default: "DEBUG") | Default log level for the root logger and Airflow namespace |
| log_format | str | No (default: "") | Percent-style format string (e.g., "%(asctime)s %(levelname)s %(message)s") |
| stdlib_config | dict | No | Additional stdlib logging.config.dictConfig entries to merge |
| extra_processors | Sequence[Processor] | No | Additional structlog processors appended to the shared chain |
| callsite_parameters | Iterable[CallsiteParameter] | No | Callsite info to include (filename, lineno, function name, etc.) |
| colors | bool | No (default: True) | Enable colored console output (respects NO_COLOR/FORCE_COLOR) |
| output | LogOutputType (binary or text stream) | No | Output stream (defaults to stdout/stderr) |
| namespace_log_levels | str or dict[str, str] | No | Per-logger level overrides, as a string (e.g., "sqlalchemy=INFO httpx=WARN") or a dict |
| cache_logger_on_first_use | bool | No (default: True) | Whether structlog caches each logger on first use |
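The `colors` row notes that `NO_COLOR` and `FORCE_COLOR` are respected. The precedence below is an assumption for illustration (an explicit `colors=False` wins, then `NO_COLOR`, then `FORCE_COLOR`, then a TTY check); `use_colors` is a hypothetical helper, not part of the module.

```python
import io
import os
from typing import Mapping, TextIO


def use_colors(colors: bool, stream: TextIO, env: Mapping[str, str] = os.environ) -> bool:
    """Decide whether console output should be colored (assumed precedence)."""
    if not colors:
        return False  # the caller explicitly disabled colors
    if env.get("NO_COLOR"):
        return False  # per no-color.org, any non-empty value disables color
    if env.get("FORCE_COLOR"):
        return True  # forced on even when not writing to a terminal
    isatty = getattr(stream, "isatty", None)
    return bool(isatty and isatty())  # otherwise color only real terminals


# An in-memory stream is not a TTY, so it stays uncolored unless forced
buffer = io.StringIO()
```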
Outputs
| Name | Type | Description |
|---|---|---|
| Configured logging pipeline | Side effect | Both structlog and stdlib logging are configured to use the unified processor chain |
| Per-logger trie | pygtrie.StringTrie | Global trie mapping logger name prefixes to log levels |
| Log output | JSON or console text | Formatted log entries written to the configured output stream |
Usage Examples
JSON Output for Production
```python
from airflow_shared.logging.structlog import configure_logging

# Configure JSON structured logging for production/cloud environments
configure_logging(
    json_output=True,
    log_level="INFO",
    namespace_log_levels="sqlalchemy.engine=WARN httpx=WARN",
)
# Illustrative output line (exact keys and timestamp precision depend on the configured processors):
# {"timestamp":"2026-02-08T21:00:00Z","level":"info","event":"DAG parsing complete","logger":"airflow.dag_processing","dag_count":42}
```
Console Output for Development
```python
from airflow_shared.logging.structlog import configure_logging
from structlog.processors import CallsiteParameter

# Configure colored console output with callsite info
configure_logging(
    json_output=False,
    log_level="DEBUG",
    colors=True,
    callsite_parameters=[CallsiteParameter.FILENAME, CallsiteParameter.LINENO],
)
```
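`configure_logging()` accepts `callsite_parameters` as any iterable (the example passes a list), while `structlog_processors()` is wrapped in `functools.cache` and types the same argument as a tuple. `functools.cache` keys on its arguments, so passing a list would raise `TypeError: unhashable type: 'list'`. A sketch of the normalization this implies, using strings in place of `CallsiteParameter` members; both function names are hypothetical.

```python
from functools import cache
from typing import Iterable, Optional, Tuple


@cache
def build_chain_cached(json_output: bool, callsite_parameters: Tuple[str, ...] = ()) -> Tuple[str, ...]:
    """Stand-in for an expensive chain builder; results are memoized per argument set."""
    names = ["timestamper", "add_log_level"]
    if callsite_parameters:
        names.append("callsite:" + ",".join(callsite_parameters))
    names.append("json" if json_output else "console")
    return tuple(names)


def configure_sketch(json_output: bool, callsite_parameters: Optional[Iterable[str]] = None) -> Tuple[str, ...]:
    # Normalize any iterable (list, generator, ...) to a hashable tuple first
    params = tuple(callsite_parameters or ())
    return build_chain_cached(json_output, params)
```

A list and an equal tuple normalize to the same cache key, so repeated configuration with equivalent settings reuses the cached result.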
Custom Format String
```python
from airflow_shared.logging.structlog import configure_logging

# Use a percent-style format string similar to stdlib logging
configure_logging(
    log_format="[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s",
    log_level="INFO",
)
```
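Percent-style placeholders are filled from named record attributes. The sketch below uses stdlib's own `logging.Formatter` purely to show what the placeholders in the example expand to; that Airflow's structlog-based formatter expands them identically is an assumption. `%(asctime)s` is left out so the output is deterministic.

```python
import logging

# Build a record by hand; normally the logging machinery fills these attributes
record = logging.makeLogRecord(
    {"levelname": "INFO", "msg": "DAG parsing complete", "filename": "processor.py", "lineno": 42}
)
formatter = logging.Formatter("{%(filename)s:%(lineno)d} %(levelname)s - %(message)s")
line = formatter.format(record)
print(line)  # {processor.py:42} INFO - DAG parsing complete
```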
Per-Namespace Log Levels
```python
from airflow_shared.logging.structlog import configure_logging

# Set different log levels for different logger namespaces
configure_logging(
    log_level="INFO",
    namespace_log_levels={
        "sqlalchemy.engine": "DEBUG",  # Verbose SQL logging
        "httpx": "WARN",  # Suppress HTTP client noise
        "airflow.dag_processing": "DEBUG",  # Verbose DAG processing
    },
)
```
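Per-namespace overrides resolve by the longest matching dotted prefix. The module itself uses `pygtrie.StringTrie` for this; the sketch below walks prefixes with a plain dict to stay dependency-free, and assumes the string form is whitespace-separated `name=LEVEL` pairs as in the docstring example. `parse_namespace_levels` and `effective_level` are hypothetical helpers.

```python
import logging
from typing import Dict, Union

LEVELS = {
    "CRITICAL": logging.CRITICAL,
    "ERROR": logging.ERROR,
    "WARNING": logging.WARNING,
    "WARN": logging.WARNING,
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
    "NOTSET": logging.NOTSET,
}


def parse_namespace_levels(spec: Union[str, Dict[str, str]]) -> Dict[str, int]:
    """Accept "a=INFO a.b=DEBUG" or {"a": "INFO"} and map each name to a numeric level."""
    if isinstance(spec, str):
        spec = dict(item.split("=", 1) for item in spec.split())
    return {name: LEVELS[level.upper()] for name, level in spec.items()}


def effective_level(logger_name: str, overrides: Dict[str, int], default: int) -> int:
    """Longest dotted-prefix match, mirroring stdlib's logger hierarchy."""
    parts = logger_name.split(".")
    for end in range(len(parts), 0, -1):
        prefix = ".".join(parts[:end])
        if prefix in overrides:
            return overrides[prefix]
    return default  # no namespace override applies
```

Because matching is done on whole dot-separated segments, an override for `sqlalchemy` covers `sqlalchemy.pool` but not an unrelated logger named `sqlalchemyx`. With the dict from the example above and `log_level="INFO"`, this sketch resolves `sqlalchemy.engine.Engine` to DEBUG.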
Initializing Log Files With Permissions
```python
from airflow_shared.logging.structlog import init_log_file

# Create a log file with proper permissions for impersonation
log_path = init_log_file(
    base_log_folder="/var/log/airflow",
    local_relative_path="dag_id/run_id/task_id/attempt_1.log",
    new_folder_permissions=0o775,
    new_file_permissions=0o664,
)
# log_path == Path("/var/log/airflow/dag_id/run_id/task_id/attempt_1.log")
```
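A sketch of what creating a log file "with correct permissions" can involve. It assumes that only directories newly created below `base_log_folder` are chmod-ed. The explicit `os.chmod` calls matter because the mode given to `mkdir()` is masked by the process umask, whereas `chmod` sets the bits exactly, which is what lets a different impersonated user write to the file later. `init_log_file_sketch` is hypothetical and is not the module's implementation.

```python
import os
import tempfile
from pathlib import Path
from typing import Union


def init_log_file_sketch(
    base_log_folder: Union[str, os.PathLike],
    local_relative_path: Union[str, os.PathLike],
    *,
    new_folder_permissions: int = 0o775,
    new_file_permissions: int = 0o664,
) -> Path:
    """Create missing parent directories and the log file, forcing explicit modes."""
    directory = Path(base_log_folder)
    directory.mkdir(parents=True, exist_ok=True)
    for part in Path(local_relative_path).parent.parts:
        directory = directory / part
        if not directory.exists():
            directory.mkdir()
            # mkdir's mode is filtered by the umask; chmod is not
            os.chmod(directory, new_folder_permissions)
    full_path = Path(base_log_folder, local_relative_path)
    if not full_path.exists():
        full_path.touch()
        os.chmod(full_path, new_file_permissions)
    return full_path


# Demonstration in a throwaway directory
tmp = tempfile.mkdtemp()
log_path = init_log_file_sketch(tmp, "dag_id/run_id/task_id/attempt_1.log")
```

Calling the sketch again with the same arguments is a no-op that returns the same path, since existing directories and files are left untouched.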