Implementation:Apache Airflow SecretsMasker

From Leeroopedia


Knowledge Sources
Domains: Security, Logging
Last Updated: 2026-02-08 21:00 GMT

Overview

Secret redaction engine that prevents sensitive values from appearing in Airflow logs. It implements a logging.Filter subclass that recursively replaces registered secrets with *** across strings, dicts, lists, tuples, sets, enums, and Kubernetes V1EnvVar objects.

Description

The SecretsMasker class is a singleton logging.Filter that intercepts log records and redacts any values matching registered secret patterns. Its design supports multiple layers of secret detection:

  • Pattern-based masking: Secrets are registered via add_mask() which compiles them into a combined regex replacer. Any string passing through redact() has matching substrings replaced with ***.
  • Key-based masking: Field names matching entries in DEFAULT_SENSITIVE_FIELDS (e.g., password, api_key, secret, token) trigger full redaction of their associated values regardless of pattern matching.
  • Recursive redaction: The _redact() method walks nested data structures (dicts, lists, tuples, sets, enums, K8s V1EnvVar) up to MAX_RECURSION_DEPTH (5) levels deep.
  • Merge support: The merge() method restores original values into a user-edited copy of redacted data. If a value in the edited copy still contains ***, the original unredacted value is restored; otherwise the new value is kept.
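
The first three layers above can be sketched with the standard library alone. This is a minimal illustration, not the Airflow implementation: ToyMasker, SENSITIVE_KEYS, MAX_DEPTH, and MIN_LENGTH are hypothetical names, the sensitive-key check is an exact lowercase match, and a sensitive key hides its whole value whatever its type. The real class may differ on each of these points.

```python
import re

# Hypothetical stand-ins for illustration only.
SENSITIVE_KEYS = frozenset({"password", "api_key", "secret", "token"})
MAX_DEPTH = 5
MIN_LENGTH = 5


class ToyMasker:
    """Toy masker: combined-regex masking, key-based masking, bounded recursion."""

    def __init__(self):
        self.patterns = set()
        self.replacer = None

    def add_mask(self, secret):
        # Short secrets are skipped to avoid false positives.
        if len(secret) < MIN_LENGTH:
            return
        self.patterns.add(re.escape(secret))
        # Recompile one alternation covering every registered secret,
        # longest first so a longer secret wins over its own prefix.
        ordered = sorted(self.patterns, key=len, reverse=True)
        self.replacer = re.compile("|".join(ordered))

    def redact(self, item, name=None, depth=0):
        # Key-based masking: a sensitive field name hides the whole value.
        if isinstance(name, str) and name.lower() in SENSITIVE_KEYS:
            return "***"
        # Stop descending once the depth limit is exceeded.
        if depth > MAX_DEPTH:
            return item
        if isinstance(item, str):
            return self.replacer.sub("***", item) if self.replacer else item
        if isinstance(item, dict):
            return {k: self.redact(v, name=k, depth=depth + 1) for k, v in item.items()}
        if isinstance(item, (list, tuple)):
            return type(item)(self.redact(v, depth=depth + 1) for v in item)
        return item


masker = ToyMasker()
masker.add_mask("s3cr3t-p4ssw0rd")
masker.add_mask("abc")  # too short, ignored

text_result = masker.redact("pw s3cr3t-p4ssw0rd abc")
dict_result = masker.redact({"password": "hunter2", "hosts": ["a", "s3cr3t-p4ssw0rd"]})
```

In this sketch text_result keeps "abc" because it is below the length threshold, while dict_result hides "hunter2" purely because of its key, even though that value was never registered as a secret.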

The RedactedIO class is a TextIO implementation that wraps sys.stdout and redacts any text written through it. It is intended for use with contextlib.redirect_stdout().
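
The wrapping idea can be illustrated without Airflow. The sketch below is an assumption-laden stand-in, not RedactedIO itself: ToyRedactedStream and SECRET are hypothetical names, the secret is hard-coded, and an io.StringIO buffer replaces the real stdout so the result can be inspected.

```python
import contextlib
import io
import re

# Hypothetical registered secret, compiled once.
SECRET = re.compile(re.escape("sensitive-token-value"))


class ToyRedactedStream(io.TextIOBase):
    """Wraps a text stream and masks the secret on every write."""

    def __init__(self, target):
        self._target = target

    def writable(self):
        return True

    def write(self, s):
        # Redact first, then forward to the wrapped stream.
        return self._target.write(SECRET.sub("***", s))

    def flush(self):
        self._target.flush()


buffer = io.StringIO()  # stands in for the real stdout
with contextlib.redirect_stdout(ToyRedactedStream(buffer)):
    print("Using token: sensitive-token-value")

captured = buffer.getvalue()
```

One limitation of this sketch is that a secret split across two separate write() calls would not be matched, since each call is redacted independently.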

Key safety features:

  • Secrets shorter than min_length_to_mask (default: 5 characters) are not masked to avoid false positives.
  • The common term "airflow" is excluded from masking via SECRETS_TO_SKIP_MASKING.
  • An ALREADY_FILTERED_FLAG prevents duplicate processing when filters are attached to multiple handlers.
  • Failed redaction attempts return "<redaction-failed>" rather than exposing sensitive data.
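
The safety features listed above can be sketched as a small logging.Filter. Everything here is a simplified assumption rather than the Airflow code: ToyFilter, the flag name, and the redactions counter are hypothetical, and only record.msg is redacted.

```python
import logging
import re

# Hypothetical names and values for illustration only.
ALREADY_FILTERED_FLAG = "__toy_filtered"
SECRETS_TO_SKIP = {"airflow"}
MIN_LENGTH = 5


class ToyFilter(logging.Filter):
    def __init__(self):
        super().__init__()
        self.patterns = set()
        self.replacer = None
        self.redactions = 0  # how many records were actually processed

    def add_mask(self, secret):
        # Ignore short secrets and explicitly skipped common terms.
        if len(secret) < MIN_LENGTH or secret.lower() in SECRETS_TO_SKIP:
            return
        self.patterns.add(re.escape(secret))
        ordered = sorted(self.patterns, key=len, reverse=True)
        self.replacer = re.compile("|".join(ordered))

    def filter(self, record):
        # A record already handled by another handler's filter is left alone.
        if getattr(record, ALREADY_FILTERED_FLAG, False):
            return True
        self.redactions += 1
        if self.replacer is not None:
            try:
                record.msg = self.replacer.sub("***", str(record.msg))
            except Exception:
                # Fail closed: never fall back to the unredacted message.
                record.msg = "<redaction-failed>"
        setattr(record, ALREADY_FILTERED_FLAG, True)
        return True  # the record is redacted, never suppressed


toy = ToyFilter()
for candidate in ("airflow", "abc", "hunter2-long"):
    toy.add_mask(candidate)

record = logging.LogRecord("demo", logging.INFO, "demo.py", 1, "pw=hunter2-long", None, None)
first = toy.filter(record)
second = toy.filter(record)  # second pass is a no-op thanks to the flag
```

After both calls the record has been processed once, and only "hunter2-long" was ever registered as a pattern.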

Usage

from airflow_shared.secrets_masker.secrets_masker import SecretsMasker, mask_secret, redact

# Register a secret to be masked
mask_secret("my-super-secret-password")

# Redact a string containing the secret
result = redact("Connection string: user:my-super-secret-password@host")
# Result: "Connection string: user:***@host"

# Redact a dict with sensitive keys
data = {"password": "hunter2", "host": "db.example.com"}
result = redact(data)
# Result: {"password": "***", "host": "db.example.com"}

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/secrets_masker/src/airflow_shared/secrets_masker/secrets_masker.py

Key Classes

SecretsMasker (lines 184-585)

class SecretsMasker(logging.Filter):
    """Redact secrets from logs."""

    replacer: Pattern | None = None
    patterns: set[str]
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    MAX_RECURSION_DEPTH = 5
    min_length_to_mask = 5

    def __init__(self): ...

    def filter(self, record) -> bool:
        """logging.Filter interface -- redacts all record attributes."""

    def redact(
        self, item: Redactable, name: str | None = None,
        max_depth: int | None = None, replacement: str = "***",
    ) -> Redacted:
        """Redact any secrets found in item, if it is a string."""

    def merge(
        self, new_item: Redacted, old_item: Redactable,
        name: str | None = None, max_depth: int | None = None,
        replacement: str = "***",
    ) -> Redacted:
        """Merge a redacted item with its original unredacted counterpart."""

    def add_mask(self, secret: JsonValue, name: str | None = None):
        """Add a new secret to be masked to this filter instance."""

    def should_hide_value_for_key(self, name) -> bool:
        """Return if the value for this given name should be hidden."""

    def reset_masker(self):
        """Reset the patterns and the replacer in the masker instance."""

    @classmethod
    def enable_log_masking(cls) -> None: ...

    @classmethod
    def disable_log_masking(cls) -> None: ...

    @classmethod
    def is_log_masking_enabled(cls) -> bool: ...

RedactedIO (lines 588-658)

class RedactedIO(TextIO):
    """
    IO class that redacts values going into stdout.

    Expected usage::
        with contextlib.redirect_stdout(RedactedIO()):
            ...  # Writes to stdout will be redacted.
    """

    def __init__(self): ...
    def write(self, s: str) -> int:
        """Redacts the string before writing to the underlying stdout."""

Key Module-Level Functions

def mask_secret(secret: JsonValue, name: str | None = None) -> None:
    """Mask a secret from appearing in the logs."""

def redact(
    value: Redactable, name: str | None = None,
    max_depth: int | None = None, replacement: str = "***",
) -> Redacted:
    """Redact any secrets found in value with the given replacement."""

def merge(
    new_value: Redacted, old_value: Redactable,
    name: str | None = None, max_depth: int | None = None,
) -> Redacted:
    """Merge a redacted value with its original unredacted counterpart."""

def should_hide_value_for_key(name) -> bool:
    """Return if the value for this given name should be hidden."""

def reset_secrets_masker() -> None:
    """Reset the secrets masker to clear existing patterns and replacer."""

Constants

DEFAULT_SENSITIVE_FIELDS = frozenset({
    "access_key", "access_token", "api_key", "apikey", "authorization",
    "connection_string", "passphrase", "passwd", "password", "private_key",
    "proxy", "proxy_password", "proxies", "secret", "token",
    "keyfile_dict", "service_account",
})

SECRETS_TO_SKIP_MASKING = {"airflow"}

Import

from airflow_shared.secrets_masker.secrets_masker import (
    DEFAULT_SENSITIVE_FIELDS, RedactedIO, SecretsMasker,
    mask_secret, merge, redact, reset_secrets_masker, should_hide_value_for_key,
)

I/O Contract

Inputs

Name | Type | Required | Description
item / value | Redactable (str, dict, list, tuple, V1EnvVar) | Yes | The value to redact
name | str or None | No | Field name; if it matches a sensitive field, the entire value is redacted
max_depth | int or None | No | Maximum recursion depth for nested structures (default: MAX_RECURSION_DEPTH, 5)
replacement | str | No | Replacement string (default: "***")
secret | JsonValue | Yes (for add_mask) | The secret value to register for pattern-based masking

Outputs

Name | Type | Description
Redacted value | Redacted (str, dict, list, tuple) | The input with all detected secrets replaced by ***
Merged value | Redacted | Result of merging user-modified redacted values with originals
Filter result | bool | Always True (the record is never suppressed, only redacted)

Usage Examples

Registering and Redacting Secrets

from airflow_shared.secrets_masker.secrets_masker import mask_secret, redact

# Register secrets from a connection
mask_secret("s3cr3t-p4ssw0rd")
mask_secret({"password": "db-pass-123", "api_key": "key-abc-xyz"})

# Redact a log message
msg = "Connecting with password s3cr3t-p4ssw0rd to host"
print(redact(msg))
# Output: "Connecting with password *** to host"

Key-Based Redaction

from airflow_shared.secrets_masker.secrets_masker import redact

# Keys matching DEFAULT_SENSITIVE_FIELDS trigger full value redaction
config = {
    "host": "db.example.com",
    "password": "hunter2",
    "api_key": "sk-live-abc123",
    "port": 5432,
}
result = redact(config)
# Result: {"host": "db.example.com", "password": "***", "api_key": "***", "port": 5432}

Redirecting Stdout with Redaction

import contextlib
from airflow_shared.secrets_masker.secrets_masker import RedactedIO, mask_secret

mask_secret("sensitive-token-value")

with contextlib.redirect_stdout(RedactedIO()):
    print("Using token: sensitive-token-value")
    # Actually prints: "Using token: ***"

Merging Redacted Values

from airflow_shared.secrets_masker.secrets_masker import redact, merge

original = {"password": "hunter2", "host": "db.example.com", "port": 5432}
redacted = redact(original)
# redacted = {"password": "***", "host": "db.example.com", "port": 5432}

# User modifies the host but leaves password as ***
user_modified = {"password": "***", "host": "new-db.example.com", "port": 5432}
merged = merge(user_modified, original)
# merged = {"password": "hunter2", "host": "new-db.example.com", "port": 5432}
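
The merge rule used in this example can be approximated in a few lines. The function below is a hypothetical toy_merge, written only to show the "restore where *** remains, otherwise keep the edit" rule; the actual merge() may handle more types and edge cases differently.

```python
def toy_merge(new_item, old_item, replacement="***"):
    """Restore originals wherever the edited copy still shows the replacement."""
    if isinstance(new_item, dict) and isinstance(old_item, dict):
        merged = {}
        for key, value in new_item.items():
            # Keys absent from the original are new and kept as-is.
            merged[key] = toy_merge(value, old_item[key], replacement) if key in old_item else value
        return merged
    if (
        isinstance(new_item, (list, tuple))
        and isinstance(old_item, type(new_item))
        and len(new_item) == len(old_item)
    ):
        return type(new_item)(toy_merge(n, o, replacement) for n, o in zip(new_item, old_item))
    if isinstance(new_item, str) and replacement in new_item:
        # The user left the masked value untouched: restore the original.
        return old_item
    # Anything else is treated as a deliberate edit and preserved.
    return new_item


original = {"password": "hunter2", "host": "db.example.com", "port": 5432}
user_modified = {"password": "***", "host": "new-db.example.com", "port": 5432}
toy_merged = toy_merge(user_modified, original)
```

Here toy_merged carries the original password together with the edited host, matching the behavior described for merge().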
