Implementation:Apache Airflow RemoteLogIO Protocol

From Leeroopedia


Knowledge Sources
Domains: Logging, Remote_Storage
Last Updated: 2026-02-08 21:00 GMT

Overview

Defines the protocol interfaces for remote task log handlers in Apache Airflow, providing a standardized contract for uploading, reading, and streaming task logs to and from remote storage backends.

Description

The remote.py module establishes two core protocols and a factory function for remote log handling:

  • RemoteLogIO -- The base protocol that all remote log handlers must implement. It declares three members:
    • processors -- A property returning a tuple of structlog.typing.Processor instances to be installed in the task write path. This allows remote logging providers to transform structured log messages during writing or upload messages as they are generated.
    • upload(path, ti) -- Uploads a local log file at the given path to remote storage for a specific task instance.
    • read(relative_path, ti) -- Reads logs from a remote log path, returning a LogResponse tuple of source info and log messages.
  • RemoteLogStreamIO -- Extends RemoteLogIO and adds the stream() method for stream-based log reading. This protocol is decorated with @runtime_checkable to enable isinstance() checks at runtime.
  • discover_remote_log_handler() -- A factory function that discovers and loads a remote log handler from a logging configuration module. It imports the logging config dict and reads the REMOTE_TASK_LOG and DEFAULT_REMOTE_CONN_ID attributes from the module, returning them as a (handler, connection ID) tuple in which either element may be None.
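
The effect of @runtime_checkable can be illustrated with a minimal sketch. PlainIO, StreamIO, and the two handler classes below are hypothetical stand-ins, not the Airflow protocols themselves; they only mirror the arrangement in which the base protocol is undecorated and the extending protocol is decorated.

```python
from typing import Protocol, runtime_checkable


class PlainIO(Protocol):
    """Stand-in for a protocol without @runtime_checkable."""

    def read(self, relative_path: str) -> list[str]: ...


@runtime_checkable
class StreamIO(PlainIO, Protocol):
    """Stand-in for a protocol that adds stream() and allows isinstance()."""

    def stream(self, relative_path: str) -> list[str]: ...


class ReadOnlyHandler:
    def read(self, relative_path: str) -> list[str]:
        return ["line"]


class StreamingHandler(ReadOnlyHandler):
    def stream(self, relative_path: str) -> list[str]:
        return ["line"]


# Structural check: only the presence of the members is verified,
# not their signatures or return types.
supports_stream = isinstance(StreamingHandler(), StreamIO)
read_only_supports_stream = isinstance(ReadOnlyHandler(), StreamIO)

# A protocol without @runtime_checkable rejects isinstance() outright.
try:
    isinstance(ReadOnlyHandler(), PlainIO)
    plain_check_raises = False
except TypeError:
    plain_check_raises = True
```

Because the check is purely structural, a handler does not need to inherit from the protocol class; defining the required members is enough.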

The module also defines several type aliases for log response formats:

  • LogMessages -- list[str], the legacy format before 3.0.4
  • LogSourceInfo -- list[str], information about the log fetching process
  • RawLogStream -- Generator[str, None, None], raw unparsed log lines
  • LogResponse -- tuple[LogSourceInfo, LogMessages | None]
  • StreamingLogResponse -- tuple[LogSourceInfo, list[RawLogStream]]

Usage

Remote logging providers (such as S3, GCS, or Azure Blob) implement the RemoteLogIO or RemoteLogStreamIO protocols. The Airflow core uses discover_remote_log_handler() at startup to locate and instantiate the configured remote log handler.
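
The discovery step can be sketched as follows. This is not the Airflow implementation: discover_sketch only mirrors the documented signature, and two behaviours are assumptions made for illustration, namely that an empty logging_class_path falls back to fallback_path and that the handler attributes live on the module containing the config dict. The in-memory importer and module are hypothetical.

```python
from types import SimpleNamespace
from typing import Any, Callable, Optional


def discover_sketch(
    logging_class_path: str,
    fallback_path: str,
    import_string: Callable[[str], Any],
) -> tuple[Optional[Any], Optional[str]]:
    """Illustrative only: mirrors the documented signature, not the real body."""
    # Assumption: an empty configured path falls back to the default path.
    path = logging_class_path or fallback_path
    try:
        import_string(path)  # confirm the logging config dict is importable
        # Assumption: the handler attributes live on the module that contains
        # the config dict, i.e. the dotted path minus its last component.
        module = import_string(path.rsplit(".", 1)[0])
    except Exception:
        return None, None
    handler = getattr(module, "REMOTE_TASK_LOG", None)
    conn_id = getattr(module, "DEFAULT_REMOTE_CONN_ID", None)
    return handler, conn_id


# Hypothetical in-memory "module" and importer, standing in for real imports.
fake_module = SimpleNamespace(
    LOGGING_CONFIG={"version": 1},
    REMOTE_TASK_LOG=object(),
    DEFAULT_REMOTE_CONN_ID="my_conn",
)
registry = {
    "my_module": fake_module,
    "my_module.LOGGING_CONFIG": fake_module.LOGGING_CONFIG,
}


def fake_import_string(dotted_path: str) -> Any:
    try:
        return registry[dotted_path]
    except KeyError:
        raise ImportError(dotted_path) from None


handler, conn_id = discover_sketch("", "my_module.LOGGING_CONFIG", fake_import_string)
missing = discover_sketch("nope.CONFIG", "my_module.LOGGING_CONFIG", fake_import_string)
```

Passing import_string in as a parameter keeps the function free of a hard dependency on any particular import helper, which also makes it straightforward to exercise with a fake importer as done here.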

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/logging/src/airflow_shared/logging/remote.py (104 lines)

Signature

class RemoteLogIO(Protocol):
    """Interface for remote task loggers."""

    @property
    def processors(self) -> tuple[structlog.typing.Processor, ...]: ...

    def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None: ...

    def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse: ...


@runtime_checkable
class RemoteLogStreamIO(RemoteLogIO, Protocol):
    """Interface for remote task loggers with stream-based read support."""

    def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse: ...


def discover_remote_log_handler(
    logging_class_path: str,
    fallback_path: str,
    import_string: Callable[[str], Any],
) -> tuple[RemoteLogIO | None, str | None]: ...

Import

from airflow_shared.logging.remote import RemoteLogIO, RemoteLogStreamIO, discover_remote_log_handler

I/O Contract

  • RemoteLogIO.processors
    • Input: (none)
    • Output: tuple[structlog.typing.Processor, ...]
    • Side effects: None
  • RemoteLogIO.upload
    • Input: path: os.PathLike | str, ti: RuntimeTI
    • Output: None
    • Side effects: Uploads log file to remote storage
  • RemoteLogIO.read
    • Input: relative_path: str, ti: RuntimeTI
    • Output: LogResponse (tuple of source info and messages)
    • Side effects: Reads from remote storage
  • RemoteLogStreamIO.stream
    • Input: relative_path: str, ti: RuntimeTI
    • Output: StreamingLogResponse (tuple of source info and list of raw log streams)
    • Side effects: Reads from remote storage in streaming fashion
  • discover_remote_log_handler
    • Input: logging_class_path: str, fallback_path: str, import_string: Callable
    • Output: tuple[RemoteLogIO | None, str | None]
    • Side effects: May import modules; logs on error

Usage Examples

Implementing the RemoteLogIO Protocol

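import os

import structlog
from airflow_shared.logging.remote import RemoteLogIO, LogResponse

# `my_storage_client` is a placeholder for a storage SDK client. It is not
# defined here, and its upload()/download() methods are hypothetical.


class MyRemoteLogHandler:
    """Custom remote log handler for a hypothetical storage backend."""

    @property
    def processors(self) -> tuple[structlog.typing.Processor, ...]:
        # No extra processors are installed in this example.
        return ()

    def upload(self, path: os.PathLike | str, ti) -> None:
        # Upload the local file at `path` to remote storage.
        with open(path) as f:
            content = f.read()
        my_storage_client.upload(content, key=f"{ti.dag_id}/{ti.task_id}/{ti.run_id}")

    def read(self, relative_path: str, ti) -> LogResponse:
        # Read logs from remote storage and return (source info, messages).
        # download() is assumed to return a list of log lines.
        source_info = ["Retrieved from MyStorage"]
        messages = my_storage_client.download(relative_path)
        return (source_info, messages)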
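
The handler above returns an empty processors tuple. As a rough sketch of the other option, a handler can expose a processor that sees each structured event as it is written. BufferingProcessor and BufferingHandler are hypothetical names. The callable follows structlog's processor calling convention of (logger, method_name, event_dict), and structlog itself is not imported so that the sketch stays self-contained.

```python
from typing import Any, MutableMapping

EventDict = MutableMapping[str, Any]


class BufferingProcessor:
    """Hypothetical processor that records each event before passing it on."""

    def __init__(self) -> None:
        self.buffer: list[dict[str, Any]] = []

    def __call__(self, logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
        # Copy so later processors cannot mutate what was recorded.
        self.buffer.append(dict(event_dict))
        return event_dict  # returned unchanged for the next processor


class BufferingHandler:
    """Hypothetical handler exposing the processor through `processors`."""

    def __init__(self) -> None:
        self._processor = BufferingProcessor()

    @property
    def processors(self) -> tuple[BufferingProcessor, ...]:
        return (self._processor,)


# Simulate the write path calling each installed processor in order.
handler = BufferingHandler()
event: EventDict = {"event": "task started", "level": "info"}
for processor in handler.processors:
    event = processor(None, "info", event)
captured = handler._processor.buffer
```

A real provider would presumably forward the buffered events to its backend rather than keep them in a list; the list here only makes the data flow visible.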

Discovering a Remote Log Handler

from airflow_shared.logging.remote import discover_remote_log_handler
from airflow_shared.module_loading import import_string

handler, conn_id = discover_remote_log_handler(
    logging_class_path="my_module.LOGGING_CONFIG",
    fallback_path="airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG",
    import_string=import_string,
)
if handler is not None:
    # `ti` is the runtime task instance whose log is being uploaded (not defined here).
    handler.upload("/tmp/task.log", ti)

Checking for Stream Support

from airflow_shared.logging.remote import RemoteLogStreamIO

# `handler` and `ti` are assumed to exist already, e.g. from the discovery example.
if isinstance(handler, RemoteLogStreamIO):
    source_info, streams = handler.stream("path/to/log", ti)
    for stream in streams:
        for line in stream:
            print(line)
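
Building on the check above, a caller may want a single code path that works with both kinds of handler. The sketch below is one possible way to do this, not something the module provides: SupportsStream is a reduced stand-in for RemoteLogStreamIO, and iter_log_lines, ReadOnly, and Streaming are hypothetical names.

```python
from typing import Any, Iterator, Protocol, runtime_checkable


@runtime_checkable
class SupportsStream(Protocol):
    """Stand-in for RemoteLogStreamIO, reduced to the method checked here."""

    def stream(self, relative_path: str, ti: Any) -> tuple[list[str], list[Iterator[str]]]: ...


def iter_log_lines(handler: Any, relative_path: str, ti: Any) -> Iterator[str]:
    """Yield log lines, preferring stream() and falling back to read()."""
    if isinstance(handler, SupportsStream):
        _source_info, streams = handler.stream(relative_path, ti)
        for stream in streams:
            yield from stream
    else:
        _source_info, messages = handler.read(relative_path, ti)
        # The messages part of a LogResponse may be None.
        yield from messages or []


class ReadOnly:
    def read(self, relative_path, ti):
        return (["read"], ["a", "b"])


class Streaming(ReadOnly):
    def stream(self, relative_path, ti):
        return (["stream"], [iter(["a"]), iter(["b"])])


from_read = list(iter_log_lines(ReadOnly(), "path/to/log", None))
from_stream = list(iter_log_lines(Streaming(), "path/to/log", None))
```

Both calls produce the same lines, so downstream code does not need to know which protocol the configured handler implements.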
