Implementation:Apache Airflow RemoteLogIO Protocol
| Knowledge Sources | |
|---|---|
| Domains | Logging, Remote_Storage |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Defines the protocol interfaces for remote task log handlers in Apache Airflow, providing a standardized contract for uploading, reading, and streaming task logs to and from remote storage backends.
Description
The remote.py module establishes two core protocols and a factory function for remote log handling:
- RemoteLogIO -- The base protocol that all remote log handlers must implement. It declares three members:
  - processors -- A property returning a tuple of structlog.typing.Processor instances to be installed in the task write path. This lets remote logging providers transform structured log messages as they are written, or upload messages as they are generated.
  - upload(path, ti) -- Uploads the local log file at path to remote storage for a specific task instance.
  - read(relative_path, ti) -- Reads logs from a remote log path and returns a LogResponse tuple of source info and log messages.
- RemoteLogStreamIO -- Extends RemoteLogIO and adds the stream() method for stream-based log reading. This protocol is decorated with @runtime_checkable, which enables isinstance() checks at runtime; such checks verify only that the protocol's members exist, not their signatures.
- discover_remote_log_handler() -- A factory function that discovers and loads a remote log handler from a logging configuration module. It attempts to import a logging config dict and then extracts the REMOTE_TASK_LOG and DEFAULT_REMOTE_CONN_ID attributes from that module.
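Because these are typing.Protocol classes, a handler satisfies them structurally and does not need to inherit from either protocol. The sketch below re-declares simplified local copies of the two protocols (ti left untyped, return types loosened) so it runs without Airflow installed; ReadOnlyHandler, StreamingHandler, and supports_streaming are hypothetical names. It shows that an isinstance() check against the @runtime_checkable RemoteLogStreamIO only confirms that the members are present.

```python
from typing import Protocol, runtime_checkable


# Simplified local mirrors of the two protocols (ti untyped, return types
# loosened) so this sketch runs without Airflow installed.
class RemoteLogIO(Protocol):
    @property
    def processors(self) -> tuple: ...

    def upload(self, path, ti) -> None: ...

    def read(self, relative_path: str, ti) -> tuple: ...


@runtime_checkable
class RemoteLogStreamIO(RemoteLogIO, Protocol):
    def stream(self, relative_path: str, ti) -> tuple: ...


class ReadOnlyHandler:
    """Hypothetical handler with RemoteLogIO's members only; no inheritance."""

    @property
    def processors(self) -> tuple:
        return ()

    def upload(self, path, ti) -> None:
        pass  # a real handler would copy the file to remote storage

    def read(self, relative_path, ti):
        return (["example source"], ["a log line"])


class StreamingHandler(ReadOnlyHandler):
    """Adding stream() is enough to match RemoteLogStreamIO at runtime."""

    def stream(self, relative_path, ti):
        return (["example source"], [(line for line in ["a log line"])])


def supports_streaming(handler: object) -> bool:
    # The runtime check only confirms that processors, upload, read and stream
    # exist on the object; it does not validate their signatures.
    return isinstance(handler, RemoteLogStreamIO)
```

Since the runtime check ignores signatures, a static type checker such as mypy is what actually verifies argument and return types against the protocol.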
The module also defines several type aliases for log response formats:
- LogMessages -- list[str], the legacy format used before 3.0.4
- LogSourceInfo -- list[str], information about the log fetching process
- RawLogStream -- Generator[str, None, None], raw unparsed log lines
- LogResponse -- tuple[LogSourceInfo, LogMessages | None]
- StreamingLogResponse -- tuple[LogSourceInfo, list[RawLogStream]]
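To show how these aliases compose, the sketch below re-declares them locally (assumed to match the module's definitions; Optional stands in for | so it also runs on Python 3.9). It then adds a hypothetical drain_streaming_response helper that collapses a StreamingLogResponse into a LogResponse by consuming every RawLogStream.

```python
from collections.abc import Generator
from typing import Optional

# Local copies of the aliases listed above (assumed to match the module).
LogMessages = list[str]
LogSourceInfo = list[str]
RawLogStream = Generator[str, None, None]
LogResponse = tuple[LogSourceInfo, Optional[LogMessages]]
StreamingLogResponse = tuple[LogSourceInfo, list[RawLogStream]]


def drain_streaming_response(response: StreamingLogResponse) -> LogResponse:
    """Hypothetical helper: consume every raw stream into one LogMessages list."""
    source_info, streams = response
    messages: LogMessages = [line for stream in streams for line in stream]
    return source_info, messages


def lines(*items: str) -> RawLogStream:
    # A tiny RawLogStream: a generator yielding unparsed log lines.
    yield from items


info, messages = drain_streaming_response((["example source"], [lines("a", "b"), lines("c")]))
```

Each RawLogStream is a generator, so it can be consumed only once, and materializing it gives up the memory advantage of streaming. A helper like this is mainly useful for small logs or tests.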
Usage
Remote logging providers (such as S3, GCS, or Azure Blob) implement the RemoteLogIO or RemoteLogStreamIO protocols. The Airflow core uses discover_remote_log_handler() at startup to locate and instantiate the configured remote log handler.
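The discovery step can be approximated as follows. This is an illustrative sketch, not Airflow's implementation. Several details are assumptions: trying logging_class_path before fallback_path, logging a warning on failure, and returning (None, None) when neither path resolves. The names discover_remote_log_handler_sketch, simple_import_string, and example_log_settings are hypothetical and exist only to keep the example self-contained.

```python
import importlib
import logging
import sys
import types
from typing import Any, Callable, Optional, Tuple

log = logging.getLogger(__name__)


def simple_import_string(dotted_path: str) -> Any:
    """Minimal stand-in for an import_string callable: 'pkg.mod.ATTR' -> ATTR."""
    module_path, _, attr = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr)
    except AttributeError as err:
        raise ImportError(f"{module_path!r} has no attribute {attr!r}") from err


def discover_remote_log_handler_sketch(
    logging_class_path: str,
    fallback_path: str,
    import_string: Callable[[str], Any],
) -> Tuple[Any, Optional[str]]:
    """Illustrative approximation of the discovery flow, not Airflow's code."""
    for config_path in (logging_class_path, fallback_path):
        try:
            import_string(config_path)  # the logging config dict must be importable
        except ImportError:
            log.warning("Unable to load logging config from %s", config_path)
            continue
        # Read the two attributes from the module that holds the config dict.
        module = importlib.import_module(config_path.rpartition(".")[0])
        return (
            getattr(module, "REMOTE_TASK_LOG", None),
            getattr(module, "DEFAULT_REMOTE_CONN_ID", None),
        )
    return None, None


# Register a throwaway settings module so the example needs no files on disk.
settings = types.ModuleType("example_log_settings")
settings.LOGGING_CONFIG = {"version": 1}
settings.REMOTE_TASK_LOG = object()  # stands in for a RemoteLogIO instance
settings.DEFAULT_REMOTE_CONN_ID = "my_remote_conn"
sys.modules["example_log_settings"] = settings

handler, conn_id = discover_remote_log_handler_sketch(
    "nonexistent_settings.LOGGING_CONFIG",  # fails to import, so the fallback is used
    "example_log_settings.LOGGING_CONFIG",
    simple_import_string,
)
```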
Code Reference
Source Location
- Repository: Apache_Airflow
- File: shared/logging/src/airflow_shared/logging/remote.py (104 lines)
Signature
```python
class RemoteLogIO(Protocol):
    """Interface for remote task loggers."""

    @property
    def processors(self) -> tuple[structlog.typing.Processor, ...]: ...

    def upload(self, path: os.PathLike | str, ti: RuntimeTI) -> None: ...

    def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse: ...


@runtime_checkable
class RemoteLogStreamIO(RemoteLogIO, Protocol):
    """Interface for remote task loggers with stream-based read support."""

    def stream(self, relative_path: str, ti: RuntimeTI) -> StreamingLogResponse: ...


def discover_remote_log_handler(
    logging_class_path: str,
    fallback_path: str,
    import_string: Callable[[str], Any],
) -> tuple[RemoteLogIO | None, str | None]: ...
```
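structlog processors are callables that take (logger, method_name, event_dict) and return the (possibly modified) event dict, so a handler's processors can observe each event as the task writes it. The sketch below is a hypothetical BufferingRemoteHandler whose single processor copies every event into an in-memory list standing in for a remote sink. The Processor alias is a simplified local stand-in for structlog.typing.Processor, and run_chain only mimics how structlog threads an event through its processor chain, so the example runs without structlog installed.

```python
from typing import Any, Callable, Dict, List, MutableMapping, Tuple

# Simplified stand-in for structlog.typing.Processor:
# (logger, method_name, event_dict) -> event_dict
EventDict = MutableMapping[str, Any]
Processor = Callable[[Any, str, EventDict], EventDict]


class BufferingRemoteHandler:
    """Hypothetical handler whose processor forwards events as they are written."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, Any]] = []  # stands in for a remote sink

    def _forward(self, logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
        # Ship a copy of the event, then return it unchanged so that later
        # processors (such as the final renderer) still receive it.
        self.sent.append(dict(event_dict))
        return event_dict

    @property
    def processors(self) -> Tuple[Processor, ...]:
        return (self._forward,)


def run_chain(processors: Tuple[Processor, ...], event_dict: EventDict) -> EventDict:
    # Mimics how structlog passes each event through its processors in order.
    for processor in processors:
        event_dict = processor(None, "info", event_dict)
    return event_dict


handler = BufferingRemoteHandler()
result = run_chain(handler.processors, {"event": "task started"})
```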
Import
```python
from airflow_shared.logging.remote import RemoteLogIO, RemoteLogStreamIO, discover_remote_log_handler
```
I/O Contract
| Component | Input | Output | Side Effects |
|---|---|---|---|
| RemoteLogIO.processors | (none) | tuple[structlog.typing.Processor, ...] | None |
| RemoteLogIO.upload | path: os.PathLike \| str, ti: RuntimeTI | None | Uploads the log file to remote storage |
| RemoteLogIO.read | relative_path: str, ti: RuntimeTI | LogResponse (tuple of source info and messages) | Reads from remote storage |
| RemoteLogStreamIO.stream | relative_path: str, ti: RuntimeTI | StreamingLogResponse (tuple of source info and a list of raw log streams) | Reads from remote storage as a stream |
| discover_remote_log_handler | logging_class_path: str, fallback_path: str, import_string: Callable | tuple[RemoteLogIO \| None, str \| None] | May import modules; logs on error |
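As a concrete reading of the table, here is a hypothetical in-memory test double that follows each row. upload() copies a local file into a dict standing in for remote storage, read() returns a LogResponse, and stream() returns a StreamingLogResponse holding one lazily evaluated generator. Keying objects by their path relative to a base_log_folder is an assumption made so that read() and stream() can find what upload() stored; InMemoryRemoteLogIO is not part of Airflow.

```python
import os
import tempfile
from collections.abc import Generator
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union


class InMemoryRemoteLogIO:
    """Hypothetical test double; self.store plays the role of remote storage."""

    def __init__(self, base_log_folder: Union[str, os.PathLike]) -> None:
        self.base_log_folder = Path(base_log_folder)
        self.store: Dict[str, str] = {}  # relative path -> log text

    @property
    def processors(self) -> tuple:
        return ()  # no side effects

    def upload(self, path: Union[str, os.PathLike], ti: object) -> None:
        # Side effect: copy the local file's contents into "remote" storage.
        local = Path(path)
        key = local.relative_to(self.base_log_folder).as_posix()
        self.store[key] = local.read_text()

    def read(self, relative_path: str, ti: object) -> Tuple[List[str], Optional[List[str]]]:
        # LogResponse: (source info, messages), with None when nothing is stored.
        if relative_path not in self.store:
            return ([f"No remote log at {relative_path}"], None)
        return ([f"Read remote log at {relative_path}"], self.store[relative_path].splitlines())

    def stream(
        self, relative_path: str, ti: object
    ) -> Tuple[List[str], List[Generator[str, None, None]]]:
        # StreamingLogResponse: (source info, list of lazily evaluated line streams).
        if relative_path not in self.store:
            return ([f"No remote log at {relative_path}"], [])
        text = self.store[relative_path]
        return ([f"Streaming remote log at {relative_path}"], [(line for line in text.splitlines())])


with tempfile.TemporaryDirectory() as base:
    log_file = Path(base, "dag_id=demo", "attempt=1.log")
    log_file.parent.mkdir(parents=True)
    log_file.write_text("line one\nline two\n")

    handler = InMemoryRemoteLogIO(base)
    handler.upload(log_file, ti=None)

info, messages = handler.read("dag_id=demo/attempt=1.log", ti=None)
```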
Usage Examples
Implementing the RemoteLogIO Protocol
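```python
from __future__ import annotations

import os
from pathlib import Path

import structlog
from airflow_shared.logging.remote import LogResponse


class MyRemoteLogHandler:
    """Custom remote log handler for a hypothetical storage backend."""

    def __init__(self, storage_client, base_log_folder: str):
        # storage_client is hypothetical: upload(content, key=...) and download(key) -> str | None
        self.storage_client = storage_client
        self.base_log_folder = Path(base_log_folder)

    @property
    def processors(self) -> tuple[structlog.typing.Processor, ...]:
        return ()  # no extra processors in the task write path

    def upload(self, path: os.PathLike | str, ti) -> None:
        # Key by the path relative to the local log folder so that read()
        # can fetch the log back with the same relative_path.
        local_path = Path(path)
        key = local_path.relative_to(self.base_log_folder).as_posix()
        self.storage_client.upload(local_path.read_text(), key=key)

    def read(self, relative_path: str, ti) -> LogResponse:
        # LogResponse is (LogSourceInfo, LogMessages | None).
        content = self.storage_client.download(relative_path)
        if content is None:
            return ([f"No log found in MyStorage at {relative_path}"], None)
        return (["Retrieved from MyStorage"], content.splitlines())
```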
Discovering a Remote Log Handler
```python
from airflow_shared.logging.remote import discover_remote_log_handler
from airflow_shared.module_loading import import_string

handler, conn_id = discover_remote_log_handler(
    logging_class_path="my_module.LOGGING_CONFIG",
    fallback_path="airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG",
    import_string=import_string,
)
# handler is None when no remote log handler is configured.
if handler is not None:
    # `ti` is the RuntimeTI of the task whose log file is being uploaded.
    handler.upload("/tmp/task.log", ti)
```
Checking for Stream Support
```python
from airflow_shared.logging.remote import RemoteLogStreamIO

# `handler` and `ti` as in the previous example.
if isinstance(handler, RemoteLogStreamIO):
    source_info, streams = handler.stream("path/to/log", ti)
    for stream in streams:
        for line in stream:
            print(line)
```
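Callers that must handle both kinds of handler can prefer stream() and fall back to read(). The sketch below is a hypothetical helper, iter_remote_log_lines. It uses a duck-typed check for a callable stream attribute so it runs without Airflow; code with Airflow installed would use the isinstance(handler, RemoteLogStreamIO) check shown above. ReadOnly, Streaming, and Missing are throwaway doubles for the demonstration.

```python
from typing import Any, Iterator


def iter_remote_log_lines(handler: Any, relative_path: str, ti: Any) -> Iterator[str]:
    """Hypothetical helper: yield log lines, preferring stream() over read()."""
    stream = getattr(handler, "stream", None)
    if callable(stream):
        _, raw_streams = stream(relative_path, ti)
        for raw_stream in raw_streams:
            yield from raw_stream  # lines arrive lazily, one stream after another
        return
    _, messages = handler.read(relative_path, ti)
    yield from messages or []  # LogMessages may be None when no log was found


# Minimal hypothetical handlers to exercise both paths.
class ReadOnly:
    def read(self, relative_path, ti):
        return (["example source"], ["a", "b"])


class Streaming(ReadOnly):
    def stream(self, relative_path, ti):
        return (["example source"], [(x for x in ["s1", "s2"]), (x for x in ["s3"])])


class Missing:
    def read(self, relative_path, ti):
        return (["nothing found"], None)
```

Yielding lines lazily keeps the streaming path memory-friendly, while the read() fallback still works for handlers that implement only RemoteLogIO.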