Implementation:Apache Airflow SecretsMasker
| Knowledge Sources | |
|---|---|
| Domains | Security, Logging |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Secret redaction engine that keeps sensitive values out of Airflow logs. It is implemented as a logging.Filter subclass that recursively replaces registered secrets with *** in strings, dicts, lists, tuples, enums, and Kubernetes V1EnvVar objects.
Description
The SecretsMasker class is a singleton logging.Filter that intercepts log records and redacts any values matching registered secret patterns. Its design supports multiple layers of secret detection:
- Pattern-based masking: Secrets are registered via add_mask(), which compiles them into a combined regex replacer. Any string passing through redact() has matching substrings replaced with ***.
- Key-based masking: Field names matching entries in DEFAULT_SENSITIVE_FIELDS (e.g., password, api_key, secret, token) trigger full redaction of their associated values regardless of pattern matching.
- Recursive redaction: The _redact() method walks nested data structures (dicts, lists, tuples, sets, enums, K8s V1EnvVar) up to MAX_RECURSION_DEPTH (5) levels deep.
- Merge support: The merge() method allows restoring original values from a redacted copy -- if a user-modified redacted value still contains ***, the original unredacted value is restored; otherwise the new value is preserved.
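The first three mechanisms can be illustrated with a minimal sketch that uses only the standard library. It is not the Airflow implementation: build_replacer, redact_sketch, SENSITIVE_KEYS, and MAX_DEPTH are hypothetical names, the key check is a simplified exact match, and everything stored under a sensitive key is replaced wholesale.

```python
import re

# Hypothetical subset of sensitive field names, for illustration only.
SENSITIVE_KEYS = {"password", "api_key", "secret", "token"}
MAX_DEPTH = 5  # mirrors the documented MAX_RECURSION_DEPTH


def build_replacer(secrets):
    """Compile all registered secrets into a single alternation regex."""
    # Longest first, so a secret that contains another one is matched whole.
    escaped = sorted((re.escape(s) for s in secrets), key=len, reverse=True)
    return re.compile("|".join(escaped)) if escaped else None


def redact_sketch(item, replacer, name=None, depth=0):
    """Recursively redact item (simplified stand-in, not the real _redact())."""
    if depth > MAX_DEPTH:
        return item  # this sketch stops descending past the depth limit
    if isinstance(name, str) and name.lower() in SENSITIVE_KEYS:
        return "***"  # key-based masking: hide the whole value
    if isinstance(item, str):
        # Pattern-based masking: replace every registered secret substring.
        return replacer.sub("***", item) if replacer is not None else item
    if isinstance(item, dict):
        return {
            key: redact_sketch(value, replacer, name=key, depth=depth + 1)
            for key, value in item.items()
        }
    if isinstance(item, (list, tuple)):
        return type(item)(
            redact_sketch(value, replacer, depth=depth + 1) for value in item
        )
    return item  # ints, None, and other scalars pass through unchanged


replacer = build_replacer({"my-super-secret-password"})
payload = {
    "conn": "user:my-super-secret-password@host",
    "password": "hunter2",
    "nested": ["my-super-secret-password", 5],
}
result = redact_sketch(payload, replacer)
```

In this sketch the string under conn is masked by pattern, the value under password is masked by key even though "hunter2" was never registered, and the list is handled by recursion.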
The RedactedIO class is a TextIO implementation that wraps sys.stdout and automatically redacts any text written through it. It is intended for use with contextlib.redirect_stdout().
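The wrapping idea can be sketched with the standard library alone. RedactingWriter below is a hypothetical stand-in, not the real RedactedIO: it takes its target stream and secrets as constructor arguments (the real class takes none), and an io.StringIO buffer plays the role of sys.stdout so the result can be inspected.

```python
import contextlib
import io
import re


class RedactingWriter(io.TextIOBase):
    """Hypothetical stand-in for RedactedIO: redact text, then forward it."""

    def __init__(self, target, secrets):
        self._target = target
        # One alternation pattern covering every registered secret.
        self._pattern = re.compile("|".join(re.escape(s) for s in secrets))

    def writable(self):
        return True

    def write(self, s):
        # Redact before the text reaches the underlying stream.
        return self._target.write(self._pattern.sub("***", s))

    def flush(self):
        self._target.flush()


buffer = io.StringIO()  # stands in for the real sys.stdout
with contextlib.redirect_stdout(RedactingWriter(buffer, ["sensitive-token-value"])):
    print("Using token: sensitive-token-value")
captured = buffer.getvalue()
```

Because this sketch redacts each write() call independently, a secret split across two separate writes would not be caught.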
Key safety features:
- Secrets shorter than min_length_to_mask (default: 5 characters) are not masked to avoid false positives.
- The common term "airflow" is excluded from masking via SECRETS_TO_SKIP_MASKING.
- An ALREADY_FILTERED_FLAG prevents duplicate processing when filters are attached to multiple handlers.
- Failed redaction attempts return "<redaction-failed>" rather than exposing sensitive data.
Usage
from airflow_shared.secrets_masker.secrets_masker import SecretsMasker, mask_secret, redact
# Register a secret to be masked
mask_secret("my-super-secret-password")
# Redact a string containing the secret
result = redact("Connection string: user:my-super-secret-password@host")
# Result: "Connection string: user:***@host"
# Redact a dict with sensitive keys
data = {"password": "hunter2", "host": "db.example.com"}
result = redact(data)
# Result: {"password": "***", "host": "db.example.com"}
Code Reference
Source Location
- Repository: Apache_Airflow
- File:
shared/secrets_masker/src/airflow_shared/secrets_masker/secrets_masker.py
Key Classes
SecretsMasker (lines 184-585)
class SecretsMasker(logging.Filter):
    """Redact secrets from logs."""

    replacer: Pattern | None = None
    patterns: set[str]
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    MAX_RECURSION_DEPTH = 5
    min_length_to_mask = 5

    def __init__(self): ...

    def filter(self, record) -> bool:
        """logging.Filter interface -- redacts all record attributes."""

    def redact(
        self, item: Redactable, name: str | None = None,
        max_depth: int | None = None, replacement: str = "***",
    ) -> Redacted:
        """Redact any secrets found in item, if it is a string."""

    def merge(
        self, new_item: Redacted, old_item: Redactable,
        name: str | None = None, max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """Merge a redacted item with its original unredacted counterpart."""

    def add_mask(self, secret: JsonValue, name: str | None = None):
        """Add a new secret to be masked to this filter instance."""

    def should_hide_value_for_key(self, name) -> bool:
        """Return if the value for this given name should be hidden."""

    def reset_masker(self):
        """Reset the patterns and the replacer in the masker instance."""

    @classmethod
    def enable_log_masking(cls) -> None: ...

    @classmethod
    def disable_log_masking(cls) -> None: ...

    @classmethod
    def is_log_masking_enabled(cls) -> bool: ...
RedactedIO (lines 588-658)
class RedactedIO(TextIO):
    """
    IO class that redacts values going into stdout.

    Expected usage::

        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self): ...

    def write(self, s: str) -> int:
        """Redacts the string before writing to the underlying stdout."""
Key Module-Level Functions
def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """Mask a secret from appearing in the logs."""

def redact(
    value: Redactable, name: str | None = None,
    max_depth: int | None = None, replacement: str = "***",
) -> Redacted:
    """Redact any secrets found in value with the given replacement."""

def merge(
    new_value: Redacted, old_value: Redactable,
    name: str | None = None, max_depth: int | None = None,
) -> Redacted:
    """Merge a redacted value with its original unredacted counterpart."""

def should_hide_value_for_key(name) -> bool:
    """Return if the value for this given name should be hidden."""

def reset_secrets_masker() -> None:
    """Reset the secrets masker to clear existing patterns and replacer."""
Constants
DEFAULT_SENSITIVE_FIELDS = frozenset({
    "access_key", "access_token", "api_key", "apikey", "authorization",
    "connection_string", "passphrase", "passwd", "password", "private_key",
    "proxy", "proxy_password", "proxies", "secret", "token",
    "keyfile_dict", "service_account",
})
SECRETS_TO_SKIP_MASKING = {"airflow"}
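This page does not spell out how a field name is compared against these constants. The sketch below assumes a case-insensitive substring match, which is an assumption rather than something the page confirms. looks_sensitive and SENSITIVE_FIELDS are hypothetical names, and the set is a shortened copy of DEFAULT_SENSITIVE_FIELDS.

```python
# Shortened, hypothetical copy of DEFAULT_SENSITIVE_FIELDS for illustration.
SENSITIVE_FIELDS = frozenset({"password", "api_key", "secret", "token"})


def looks_sensitive(name):
    """Return True if name appears to refer to a sensitive field.

    Assumption: matching is case-insensitive and substring-based, so
    "DB_PASSWORD" is treated like "password".
    """
    if not isinstance(name, str):
        return False  # non-string keys (ints, None) are never sensitive here
    normalized = name.strip().lower()
    return any(field in normalized for field in SENSITIVE_FIELDS)


checks = {key: looks_sensitive(key) for key in ("DB_PASSWORD", " Api_Key ", "host")}
```

Under this assumption a substring rule is deliberately broad: it catches prefixed names such as DB_PASSWORD, at the cost of also flagging unrelated names that happen to contain a sensitive word.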
Import
from airflow_shared.secrets_masker.secrets_masker import SecretsMasker, mask_secret, redact
from airflow_shared.secrets_masker.secrets_masker import (
    merge, should_hide_value_for_key, reset_secrets_masker, RedactedIO,
    DEFAULT_SENSITIVE_FIELDS,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| item / value | Redactable (str, dict, list, tuple, V1EnvVar) | Yes | The value to redact |
| name | str or None | No | Field name; if it matches a sensitive field, the entire value is redacted |
| max_depth | int or None | No | Maximum recursion depth for nested structures (default: 5) |
| replacement | str | No | Replacement string (default: "***") |
| secret | JsonValue | Yes (for add_mask) | The secret value to register for pattern-based masking |
Outputs
| Name | Type | Description |
|---|---|---|
| Redacted value | Redacted (str, dict, list, tuple) | The input with all detected secrets replaced by *** |
| Merged value | Redacted | Result of merging user-modified redacted values with originals |
| Filter result | bool | Always True (the record is never suppressed, only redacted) |
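The filter contract above and the safety features listed in the Description (minimum length, skip list, already-filtered flag, failure fallback) can be sketched together as a small logging.Filter. This is a simplified illustration under stated assumptions, not the SecretsMasker source: MaskingFilter, MIN_LENGTH, SKIP, and FILTERED_FLAG are hypothetical names, and only the formatted message is redacted rather than every record attribute.

```python
import logging
import re

MIN_LENGTH = 5                      # mirrors the documented min_length_to_mask default
SKIP = {"airflow"}                  # mirrors SECRETS_TO_SKIP_MASKING
FILTERED_FLAG = "_sketch_filtered"  # hypothetical analogue of ALREADY_FILTERED_FLAG


class MaskingFilter(logging.Filter):
    """Hypothetical, simplified analogue of SecretsMasker."""

    def __init__(self):
        super().__init__()
        self.patterns = set()
        self.replacer = None

    def add_mask(self, secret):
        # Short or very common strings would cause false positives, so skip them.
        if len(secret) < MIN_LENGTH or secret.lower() in SKIP:
            return
        self.patterns.add(re.escape(secret))
        self.replacer = re.compile("|".join(sorted(self.patterns)))

    def filter(self, record):
        # A record that reaches this filter via several handlers is redacted once.
        if getattr(record, FILTERED_FLAG, False):
            return True
        if self.replacer is not None:
            try:
                message = self.replacer.sub("***", record.getMessage())
            except Exception:
                message = "<redaction-failed>"  # never fall back to the raw text
            record.msg, record.args = message, None
        setattr(record, FILTERED_FLAG, True)
        return True  # the record is redacted, never suppressed


masker = MaskingFilter()
for candidate in ("abc", "airflow", "s3cr3t-p4ssw0rd"):
    masker.add_mask(candidate)

record = logging.LogRecord(
    "sketch", logging.INFO, "sketch.py", 1,
    "airflow login with %s", ("s3cr3t-p4ssw0rd",), None,
)
first = masker.filter(record)
second = masker.filter(record)  # second pass sees the flag and does nothing
```

Only the third candidate is registered: "abc" is too short and "airflow" is on the skip list, so the word "airflow" in the message is left intact.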
Usage Examples
Registering and Redacting Secrets
from airflow_shared.secrets_masker.secrets_masker import mask_secret, redact
# Register secrets from a connection
mask_secret("s3cr3t-p4ssw0rd")
mask_secret({"password": "db-pass-123", "api_key": "key-abc-xyz"})
# Redact a log message
msg = "Connecting with password s3cr3t-p4ssw0rd to host"
print(redact(msg))
# Output: "Connecting with password *** to host"
Key-Based Redaction
from airflow_shared.secrets_masker.secrets_masker import redact
# Keys matching DEFAULT_SENSITIVE_FIELDS trigger full value redaction
config = {
    "host": "db.example.com",
    "password": "hunter2",
    "api_key": "sk-live-abc123",
    "port": 5432,
}
result = redact(config)
# Result: {"host": "db.example.com", "password": "***", "api_key": "***", "port": 5432}
Redirecting Stdout with Redaction
import contextlib
from airflow_shared.secrets_masker.secrets_masker import RedactedIO, mask_secret
mask_secret("sensitive-token-value")
with contextlib.redirect_stdout(RedactedIO()):
    print("Using token: sensitive-token-value")
    # Actually prints: "Using token: ***"
Merging Redacted Values
from airflow_shared.secrets_masker.secrets_masker import redact, merge
original = {"password": "hunter2", "host": "db.example.com", "port": 5432}
redacted = redact(original)
# redacted = {"password": "***", "host": "db.example.com", "port": 5432}
# User modifies the host but leaves password as ***
user_modified = {"password": "***", "host": "new-db.example.com", "port": 5432}
merged = merge(user_modified, original)
# merged = {"password": "hunter2", "host": "new-db.example.com", "port": 5432}
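The merge rule described earlier (keep the original wherever the placeholder survives, otherwise accept the new value) can be sketched as a small recursive function. merge_sketch is a hypothetical name and a simplification: it ignores the name and max_depth parameters of the real merge() and handles only strings, dicts, lists, and tuples.

```python
def merge_sketch(new_value, old_value, replacement="***"):
    """Restore originals wherever the redaction placeholder is still present."""
    if isinstance(new_value, str):
        # A surviving placeholder means the user did not supply a new value.
        return old_value if replacement in new_value else new_value
    if isinstance(new_value, dict) and isinstance(old_value, dict):
        return {
            key: merge_sketch(value, old_value[key], replacement)
            if key in old_value else value
            for key, value in new_value.items()
        }
    if (
        isinstance(new_value, (list, tuple))
        and isinstance(old_value, (list, tuple))
        and len(new_value) == len(old_value)
    ):
        return type(new_value)(
            merge_sketch(new, old, replacement)
            for new, old in zip(new_value, old_value)
        )
    return new_value  # scalars and mismatched shapes: the new value wins


original = {"password": "hunter2", "host": "db.example.com", "port": 5432}
user_modified = {"password": "***", "host": "new-db.example.com", "port": 5432}
merged = merge_sketch(user_modified, original)

# If the user types a new password, it replaces the old one.
rotated = merge_sketch({"password": "n3w-p4ss"}, original)
```

This reflects the typical round trip: a redacted configuration is shown to a user, edited, and then merged back so that untouched secrets are not overwritten with the literal placeholder.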