Implementation:Apache Airflow Configuration Parser
| Knowledge Sources | |
|---|---|
| Domains | Configuration, Core_Infrastructure |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Core configuration parser extending Python's ConfigParser with multi-source value resolution from config files, environment variables (AIRFLOW__SECTION__KEY), command execution, and secrets backends, plus type coercion methods for boolean, integer, float, JSON, timedelta, enum, and import values.
Description
The AirflowConfigParser class in shared/configuration/src/airflow_shared/configuration/parser.py (1,778 lines) is the foundation of Airflow's configuration system. It extends configparser.ConfigParser with a multi-source lookup chain and typed accessors.
Lookup sequence (in priority order):
- Environment variables: AIRFLOW__SECTION__KEY pattern (double underscores as delimiters)
- Config file: Values from airflow.cfg (INI format)
- Command execution: For sensitive values, key_cmd entries that execute shell commands to retrieve values
- Secrets backend: For sensitive values, key_secret entries that fetch from configured secrets backends (e.g., AWS Secrets Manager, HashiCorp Vault)
- Default values: Built-in defaults from configuration_description (derived from config.yml)
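The priority order above can be illustrated with a minimal sketch built only on the standard library. It is not the parser's actual code: the resolve and env_var_name helpers are hypothetical names, and the command and secrets steps are left out to keep the example short.

```python
import os
from configparser import ConfigParser


def env_var_name(section: str, key: str) -> str:
    """Build the AIRFLOW__SECTION__KEY name for a section/key pair."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def resolve(section, key, file_cfg, defaults, environ=None):
    """Return (value, source) from the first source that defines the option.

    Hypothetical helper: checks environment, then config file, then defaults.
    """
    environ = os.environ if environ is None else environ
    name = env_var_name(section, key)
    if name in environ:
        return environ[name], "env var"
    if file_cfg.has_option(section, key):
        return file_cfg.get(section, key), "config file"
    if defaults.has_option(section, key):
        return defaults.get(section, key), "default"
    return None, "not found"


# Illustrative data only; a dict stands in for the process environment.
defaults = ConfigParser()
defaults.read_dict({"core": {"executor": "LocalExecutor", "parallelism": "32"}})
file_cfg = ConfigParser()
file_cfg.read_dict({"core": {"parallelism": "16"}})
fake_env = {"AIRFLOW__CORE__EXECUTOR": "CeleryExecutor"}

print(resolve("core", "executor", file_cfg, defaults, fake_env))
print(resolve("core", "parallelism", file_cfg, defaults, fake_env))
```

Returning the source alongside the value is a design choice of this sketch; it makes it easy to see which layer won for a given option.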
Key capabilities:
- Type coercion methods: getboolean(), getint(), getfloat(), getlist(), getenum(), getimport(), getjson(), gettimedelta()
- Deprecation handling: Maps deprecated option names to new names via deprecated_options and deprecated_sections dictionaries, issuing DeprecationWarning when old names are used
- Sensitive value masking: Identifies sensitive config values (passwords, secrets) for log redaction
- Template support: Config values can contain format variables that are interpolated at read time
- Validation: Extensible validator system via validate() method
- Section/team support: getsection() supports team-specific configuration overrides
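The deprecation handling described above could work roughly as in the following sketch. It assumes a mapping shaped like deprecated_options (new name to old name plus version); the get_with_deprecation helper and the flat values dictionary are hypothetical and only illustrate the fallback-with-warning idea.

```python
import warnings

# Same shape as deprecated_options:
# (new_section, new_key) -> (old_section, old_key, version of deprecation)
DEPRECATED = {("api", "base_url"): ("webserver", "base_url", "3.0")}


def get_with_deprecation(values, section, key):
    """Look up the new name first, then fall back to the deprecated one.

    `values` is a hypothetical flat dict keyed by (section, key) tuples.
    """
    if (section, key) in values:
        return values[(section, key)]
    old = DEPRECATED.get((section, key))
    if old is not None:
        old_section, old_key, since = old
        if (old_section, old_key) in values:
            warnings.warn(
                f"[{old_section}] {old_key} is deprecated since {since}; "
                f"use [{section}] {key} instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            return values[(old_section, old_key)]
    return None


# Only the old name is set, so the lookup succeeds and emits a warning.
legacy = {("webserver", "base_url"): "http://localhost:8080"}
print(get_with_deprecation(legacy, "api", "base_url"))
```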
Usage
The parser is typically accessed through Airflow's conf singleton, but can be instantiated directly for testing or custom configurations.
Code Reference
Source Location
- Repository: Apache_Airflow
- File: shared/configuration/src/airflow_shared/configuration/parser.py
- Lines: 1,778
Signature
class AirflowConfigParser(ConfigParser):
    """
    Base configuration parser with pure parsing logic.

    This class provides the core parsing methods that work with:
    - configuration_description: dict describing config options (required in __init__)
    - _default_values: ConfigParser with default values (required in __init__)
    - deprecated_options: class attribute mapping new -> old options
    - deprecated_sections: class attribute mapping new -> old sections
    """

    deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {}
    deprecated_options: dict[tuple[str, str], tuple[str, str, str]] = {
        ("dag_processor", "dag_file_processor_timeout"): ("core", "dag_file_processor_timeout", "3.0"),
        ("api", "base_url"): ("webserver", "base_url", "3.0"),
        ("api", "host"): ("webserver", "web_server_host", "3.0"),
        # ... 30+ deprecated option mappings
    }
    deprecated_sections: dict[str, tuple[str, str]] = {}

    def __init__(
        self,
        configuration_description: dict[str, dict[str, Any]],
        _default_values: ConfigParser,
        *args,
        **kwargs,
    ): ...

    # Lookup chain
    @property
    def _lookup_sequence(self) -> list[Callable]:
        """Define the sequence of lookup methods for get()."""
        return [
            self._get_environment_variables,
            self._get_option_from_config_file,
            self._get_option_from_commands,
            self._get_option_from_secrets,
            self._get_option_from_defaults,
        ]

    # Core accessor methods
    def get(self, section: str, key: str, **kwargs) -> str | None: ...
    def getboolean(self, section: str, key: str, **kwargs) -> bool: ...
    def getint(self, section: str, key: str, **kwargs) -> int: ...
    def getfloat(self, section: str, key: str, **kwargs) -> float: ...
    def getlist(self, section: str, key: str, delimiter=",", **kwargs): ...
    def getenum(self, section: str, key: str, enum_class: type[E], **kwargs) -> E: ...
    def getimport(self, section: str, key: str, **kwargs) -> Any: ...
    def getjson(self, section: str, key: str, **kwargs) -> Any: ...
    def gettimedelta(self, section: str, key: str, **kwargs) -> datetime.timedelta | None: ...

    # Utility methods
    def get_mandatory_value(self, section: str, key: str, **kwargs) -> str: ...
    def has_option(self, section: str, option: str, lookup_from_deprecated: bool = True, **kwargs) -> bool: ...
    def as_dict(self, ...) -> ConfigSourcesType | dict[str, dict[str, str]]: ...
    def getsection(self, section: str, team_name: str | None = None) -> ConfigOptionsDictType | None: ...
    def write(self, fp: IO, include_sources: bool = False, include_env: bool = True, ...) -> None: ...
    def read_dict(self, dictionary: dict[str, dict[str, Any]], source: str = "dict") -> None: ...
    def validate(self) -> None: ...
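The typed accessors listed above all start from the raw string returned by get() and convert it. The following sketch shows what such conversions might look like in isolation. The helper names are hypothetical, and the accepted boolean spellings and the seconds-based timedelta are assumptions of this sketch rather than confirmed parser behaviour.

```python
from __future__ import annotations

import datetime


def to_bool(raw: str) -> bool:
    """Convert a config string to bool (accepted spellings are an assumption)."""
    val = raw.strip().lower()
    if val in ("t", "true", "1"):
        return True
    if val in ("f", "false", "0"):
        return False
    raise ValueError(f"Cannot convert {raw!r} to bool")


def to_list(raw: str, delimiter: str = ",") -> list[str]:
    """Split a delimited string, dropping surrounding whitespace and empty items."""
    return [item.strip() for item in raw.split(delimiter) if item.strip()]


def to_timedelta(raw: str) -> datetime.timedelta | None:
    """Interpret a non-empty string as a whole number of seconds."""
    if not raw:
        return None
    return datetime.timedelta(seconds=int(raw))


print(to_bool("True"), to_list("a, b,,c"), to_timedelta("90"))
```

Raising on unrecognised input, instead of silently returning a default, keeps misconfiguration visible at the point where the value is read.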
Import
from airflow_shared.configuration.parser import AirflowConfigParser
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| configuration_description | dict[str, dict[str, Any]] | Yes | Configuration schema describing all sections, options, defaults, and metadata (from config.yml) |
| _default_values | ConfigParser | Yes | Pre-populated ConfigParser with default values for all options |
| Config file (airflow.cfg) | INI file | No | User-provided configuration file in INI format |
| Environment variables | AIRFLOW__SECTION__KEY | No | Environment variables following the double-underscore naming convention |
| Command values | key_cmd entries | No | Shell commands that return config values (for sensitive values only) |
| Secrets backend | Backend instance | No | External secrets manager (e.g., AWS Secrets Manager, Vault) configured via [secrets] section |
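As an illustration of the command values input, the sketch below resolves an option by running the command stored under a key with a _cmd suffix. The run_command and get_from_cmd helpers are hypothetical, and running the command without a shell via shlex.split is an assumption of this sketch, not a statement about the parser's implementation.

```python
from __future__ import annotations

import shlex
import subprocess
import sys


def run_command(command: str) -> str:
    """Run a command (no shell) and return its stdout without trailing whitespace."""
    result = subprocess.run(
        shlex.split(command), capture_output=True, text=True, check=True
    )
    return result.stdout.strip()


def get_from_cmd(options: dict[str, str], key: str) -> str | None:
    """Resolve `key` by running the command stored under `<key>_cmd`, if present."""
    command = options.get(f"{key}_cmd")
    return run_command(command) if command else None


# Demo command: the current Python interpreter prints a fixed value.
demo_cmd = f"{shlex.quote(sys.executable)} -c {shlex.quote('print(42)')}"
options = {"sql_alchemy_conn_cmd": demo_cmd}
print(get_from_cmd(options, "sql_alchemy_conn"))
```

Using check=True means a failing command raises instead of yielding an empty value, which is usually the safer behaviour for credentials.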
Outputs
| Name | Type | Description |
|---|---|---|
| Typed config values | str, bool, int, float, list, Enum, timedelta, Any | Resolved and type-coerced configuration values |
| Config dict | dict[str, dict[str, str]] | Full configuration as nested dictionary via as_dict() |
| Sensitive values set | set[tuple[str, str]] | Set of (section, key) pairs marked as sensitive for log masking |
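To show how a set of (section, key) pairs can drive redaction of a nested config dictionary, here is a small sketch. The mask_sensitive helper, the example sensitive set, and the placeholder text are all assumptions made for illustration.

```python
from __future__ import annotations

# Hypothetical sensitive set in the documented shape: set[tuple[str, str]]
SENSITIVE = {("database", "sql_alchemy_conn"), ("secrets", "backend_kwargs")}


def mask_sensitive(
    config: dict[str, dict[str, str]],
    sensitive: set[tuple[str, str]],
    placeholder: str = "< hidden >",
) -> dict[str, dict[str, str]]:
    """Return a copy of `config` with sensitive values replaced by a placeholder."""
    return {
        section: {
            key: placeholder if (section, key) in sensitive else value
            for key, value in options.items()
        }
        for section, options in config.items()
    }


config = {
    "core": {"executor": "LocalExecutor"},
    "database": {"sql_alchemy_conn": "postgresql+psycopg2://user:pass@host/db"},
}
print(mask_sensitive(config, SENSITIVE))
```

The function builds a new dictionary, so the original values remain available to the code that actually needs them.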
Usage Examples
Getting Typed Configuration Values
from airflow.configuration import conf
# Get a string value
executor = conf.get("core", "executor")
# Get a boolean value
load_examples = conf.getboolean("core", "load_examples")
# Get an integer value
parallelism = conf.getint("core", "parallelism")
# Get a float value
dag_file_processor_timeout = conf.getfloat("dag_processor", "dag_file_processor_timeout")
# Get a list value (comma-delimited)
allowed_deserialization_classes = conf.getlist("core", "allowed_deserialization_classes")
# Get a timedelta value (the option must hold a whole number of seconds)
default_task_execution_timeout = conf.gettimedelta("core", "default_task_execution_timeout")
# Get a JSON value
backend_kwargs = conf.getjson("secrets", "backend_kwargs")
# Get an import (dynamically loads the object named by a dotted import path)
hostname_callable = conf.getimport("core", "hostname_callable")
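# Get an enum value (enum_class must be an Enum subclass)
# ExampleMode and the [example] mode option are hypothetical, for illustration only
from enum import Enum

class ExampleMode(Enum):
    FAST = "fast"
    SAFE = "safe"

mode = conf.getenum("example", "mode", enum_class=ExampleMode)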
Environment Variable Override
# Override any config value via environment variable
# Pattern: AIRFLOW__SECTION__KEY (double underscores)
export AIRFLOW__CORE__EXECUTOR=LocalExecutor
export AIRFLOW__CORE__PARALLELISM=32
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://user:pass@host/db
export AIRFLOW__CORE__LOAD_EXAMPLES=False
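Going the other way, a variable name following this pattern can be split back into its section and key. The sketch below is a hypothetical helper, not the parser's code; it assumes that the first double underscore after the prefix separates section from key and that both are lower-cased.

```python
from __future__ import annotations


def parse_env_name(name: str, prefix: str = "AIRFLOW__") -> tuple[str, str] | None:
    """Split AIRFLOW__SECTION__KEY into (section, key); return None if it does not match."""
    if not name.startswith(prefix):
        return None
    # Split on the first double underscore only, so keys may contain single underscores.
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        return None
    return section.lower(), key.lower()


print(parse_env_name("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"))
print(parse_env_name("AIRFLOW_HOME"))
```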
Exporting Configuration
from airflow.configuration import conf
# Get full config as dictionary (with source tracking)
config_dict = conf.as_dict(display_source=True, display_sensitive=False)
# Write config to file
with open("/tmp/airflow.cfg", "w") as f:
    conf.write(f, include_sources=False, include_env=True)