Implementation:Astronomer Astronomer cosmos MemoryTracker
| Property | Value |
|---|---|
| Knowledge Sources | |
| Domains | Debugging, Memory_Tracking |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
The MemoryTracker class and its companion functions start_memory_tracking and stop_memory_tracking provide a debug utility that monitors peak memory usage during Airflow task execution and publishes the result via XCom.
Description
This 122-line module implements a background memory monitoring system designed for diagnosing memory-related issues in dbt task execution within Airflow.
The MemoryTracker class accepts a process ID (pid) and a poll_interval that controls how often memory is sampled. The start method launches a background thread that runs _run. That method loops until stopped: at each interval it reads the resident set size (RSS) of the target process and records the maximum value observed. The stop method signals the thread to terminate and returns the peak memory consumption in megabytes.
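The polling design described above can be sketched with the standard library alone. This is a minimal sketch, not the Cosmos source. The class name `PollingMemoryTracker`, the helper `read_rss_bytes_linux` and the injectable `read_rss` parameter are hypothetical. The default reader parses `/proc/<pid>/status`, so it works only on Linux. Megabytes are computed as 1024 × 1024 bytes, which may differ from the real module.

```python
import threading
from typing import Callable, Optional


def read_rss_bytes_linux(pid: int) -> int:
    """Hypothetical helper: return the resident set size of `pid` in bytes (Linux only)."""
    with open(f"/proc/{pid}/status") as status:
        for line in status:
            if line.startswith("VmRSS:"):
                # The line looks like "VmRSS:     123456 kB"
                return int(line.split()[1]) * 1024
    raise RuntimeError(f"VmRSS not reported for pid {pid}")


class PollingMemoryTracker:
    """Sketch of a background thread that records the peak RSS of one process."""

    def __init__(
        self,
        pid: int,
        poll_interval: float = 0.1,
        read_rss: Callable[[int], int] = read_rss_bytes_linux,  # injectable for testing
    ) -> None:
        self.pid = pid
        self.poll_interval = poll_interval
        self._read_rss = read_rss
        self._max_rss_bytes = 0
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        self._stop_event.clear()
        # daemon=True so a forgotten tracker never blocks interpreter shutdown
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        while True:
            self._max_rss_bytes = max(self._max_rss_bytes, self._read_rss(self.pid))
            # wait() sleeps for poll_interval but returns True as soon as stop() is called
            if self._stop_event.wait(self.poll_interval):
                break

    def stop(self) -> float:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
        return self._max_rss_bytes / (1024 * 1024)
```

Waiting on a `threading.Event` instead of calling `time.sleep` lets `stop()` return promptly. It does not have to wait out a full poll interval.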
start_memory_tracking is an Airflow callback function intended to be attached to a task's pre_execute hook or invoked at the beginning of an operator's execute method. It creates a MemoryTracker instance targeting the current process, starts the monitoring thread, and stashes the tracker object in the Airflow task context so it can be retrieved later.
stop_memory_tracking is the complementary callback for post_execute or the end of execute. It retrieves the MemoryTracker from the context, stops the monitoring thread, and pushes the max_memory_mb value to XCom. This value can then be inspected in the Airflow UI, queried via the Airflow API, or consumed by downstream tasks for alerting.
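The stash-and-retrieve pattern described in the two paragraphs above can be sketched as follows. This is illustrative only. The function names, the context key `_memory_tracker` and the `make_tracker` parameter are hypothetical, and the Cosmos functions take only `context` and construct their own `MemoryTracker`. The XCom push uses the task instance's `xcom_push(key=..., value=...)` method, which is the standard way an Airflow task publishes an XCom value.

```python
import os
from typing import Any, Callable, MutableMapping, Protocol

# Hypothetical context key; the key used by cosmos.debug is not documented on this page.
TRACKER_CONTEXT_KEY = "_memory_tracker"


class Tracker(Protocol):
    def start(self) -> None: ...

    def stop(self) -> float: ...


def start_tracking_sketch(
    context: MutableMapping[str, Any], make_tracker: Callable[[int], Tracker]
) -> None:
    # Monitor the current process and keep the tracker in the context for later.
    tracker = make_tracker(os.getpid())
    tracker.start()
    context[TRACKER_CONTEXT_KEY] = tracker


def stop_tracking_sketch(context: MutableMapping[str, Any]) -> None:
    tracker = context.pop(TRACKER_CONTEXT_KEY, None)
    if tracker is None:
        return  # tracking was never started for this task
    max_memory_mb = tracker.stop()
    # Publish the peak under the key listed in the Outputs table.
    context["ti"].xcom_push(key="max_memory_mb", value=max_memory_mb)
```

Popping the tracker out of the context makes a second stop call a harmless no-op.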
Memory tracking is gated behind Cosmos's debug mode feature flag. It therefore adds no monitoring thread or XCom traffic unless debug mode is explicitly enabled.
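The effect of the flag can be shown with a small wrapper. When the flag is off, the callbacks are never invoked and no thread is started. This is a sketch under assumptions: this page does not show how Cosmos reads its debug mode setting, so the flag is passed in as a plain boolean. `run_with_optional_tracking` is a hypothetical name.

```python
from typing import Any, Callable, MutableMapping, TypeVar

T = TypeVar("T")
Context = MutableMapping[str, Any]


def run_with_optional_tracking(
    context: Context,
    work: Callable[[], T],
    debug_mode_enabled: bool,
    start: Callable[[Context], None],
    stop: Callable[[Context], None],
) -> T:
    if not debug_mode_enabled:
        return work()  # flag off: no tracker, no thread, no XCom
    start(context)
    try:
        return work()
    finally:
        stop(context)  # runs even if work() raises
```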
Usage
Use the memory tracking functions when diagnosing memory spikes, out-of-memory errors, or unexpected memory growth in dbt tasks. Enable debug mode in the Cosmos configuration to activate tracking automatically across all Cosmos operators, or attach start_memory_tracking and stop_memory_tracking manually to specific tasks for targeted investigation.
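The callbacks can be attached manually through an operator's `pre_execute` and `post_execute` arguments. Airflow 2.x calls the post-execute hook with both the context and the task's return value. The documented signature of `stop_memory_tracking` takes only `context`, so a thin adapter may be needed. The helper name `as_post_execute_hook` and the commented wiring are hypothetical; check the hook calling convention against your Airflow version. The tracker monitors the current process, so the sketch wires it to an in-process `PythonOperator` rather than to a task that runs dbt in a child process.

```python
from typing import Any, Callable, Optional


def as_post_execute_hook(callback: Callable[[Any], None]) -> Callable[..., None]:
    """Adapt a context-only callback to the (context, result) post_execute call shape."""

    def hook(context: Any, result: Optional[Any] = None) -> None:
        callback(context)  # the task's return value is not needed for memory tracking

    return hook


# Hypothetical wiring on a single task (Airflow 2.x import path; requires Cosmos):
# from airflow.operators.python import PythonOperator
# from cosmos.debug import start_memory_tracking, stop_memory_tracking
# PythonOperator(
#     task_id="build_report",
#     python_callable=build_report,  # hypothetical in-process function
#     pre_execute=start_memory_tracking,
#     post_execute=as_post_execute_hook(stop_memory_tracking),
# )
```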
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/debug.py
- Lines: Full module (122 lines)
Signature
```python
class MemoryTracker:
    def __init__(self, pid: int, poll_interval: float = 0.1) -> None:
        ...

    def start(self) -> None:
        ...

    def stop(self) -> float:
        ...

    def _run(self) -> None:
        ...


def start_memory_tracking(context: dict) -> None:
    ...


def stop_memory_tracking(context: dict) -> None:
    ...
```
Import
```python
from cosmos.debug import start_memory_tracking
from cosmos.debug import stop_memory_tracking
from cosmos.debug import MemoryTracker
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| pid | int | Yes | (MemoryTracker) Process ID to monitor for memory usage |
| poll_interval | float | No | (MemoryTracker) Interval in seconds between memory samples (defaults to 0.1) |
| context | dict | Yes | (start/stop_memory_tracking) The Airflow task instance context dictionary, used to stash and retrieve the MemoryTracker instance |
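The tracker samples memory rather than observing every allocation, so `poll_interval` limits how short a spike it can catch. The deterministic simulation below is a hypothetical illustration, not part of Cosmos. It samples a synthetic memory trace that contains a 40 ms spike. At the default 0.1 s interval every sample lands outside the spike, while a 0.01 s interval catches it.

```python
from typing import Callable


def sampled_peak_mb(trace_mb: Callable[[float], float], poll_interval: float, duration: float) -> float:
    """Peak seen by sampling trace_mb at t = 0, poll_interval, 2*poll_interval, ..."""
    n_samples = round(duration / poll_interval) + 1
    # Compute each t from the index to avoid accumulating floating-point error.
    return max(trace_mb(i * poll_interval) for i in range(n_samples))


def synthetic_trace(t: float) -> float:
    """Hypothetical workload: 100 MB baseline with a 900 MB spike from t=0.23 s to t=0.27 s."""
    return 900.0 if 0.23 <= t < 0.27 else 100.0


print(sampled_peak_mb(synthetic_trace, poll_interval=0.1, duration=1.0))   # 100.0
print(sampled_peak_mb(synthetic_trace, poll_interval=0.01, duration=1.0))  # 900.0
```

Smaller intervals catch shorter spikes, but each tick costs one more memory read. Memory allocated and released entirely between two samples is never seen.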
Outputs
| Name | Type | Description |
|---|---|---|
| max_memory_mb | float | (MemoryTracker.stop) Peak memory usage in megabytes observed during the monitoring period |
| XCom: max_memory_mb | float | (stop_memory_tracking) The peak memory value pushed to Airflow XCom for downstream consumption or UI inspection |
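A downstream consumer of the XCom value only needs a float comparison. This is a hypothetical sketch, and the function name, budget and warning ratio are illustrative. A `None` value is treated as "not tracked", because an XCom pull returns `None` when no value was pushed, for example when debug mode was off.

```python
from typing import Optional


def classify_peak_memory(max_memory_mb: Optional[float], budget_mb: float, warn_ratio: float = 0.8) -> str:
    """Return 'untracked', 'ok', 'warn', or 'over' for a peak-memory reading."""
    if max_memory_mb is None:
        return "untracked"
    if max_memory_mb > budget_mb:
        return "over"
    if max_memory_mb >= warn_ratio * budget_mb:
        return "warn"
    return "ok"


print(classify_peak_memory(850.0, budget_mb=1024.0))  # warn
```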
Usage Examples
```python
import os

from cosmos.debug import MemoryTracker

# Direct usage of MemoryTracker on the current process
tracker = MemoryTracker(pid=os.getpid(), poll_interval=0.5)
tracker.start()
# ... perform memory-intensive work ...
peak_mb = tracker.stop()
print(f"Peak memory usage: {peak_mb:.2f} MB")
```
```python
from airflow.models import BaseOperator

from cosmos.debug import start_memory_tracking, stop_memory_tracking


# Usage as Airflow task callbacks inside a custom operator
class MyCustomOperator(BaseOperator):
    def execute(self, context):
        start_memory_tracking(context)
        try:
            # ... execute dbt command ...
            pass
        finally:
            # Stop the tracker even if the command fails
            stop_memory_tracking(context)
        # max_memory_mb is now available in XCom
```
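```python
# Reading the result from a downstream task in the same run of the dbt_pipeline DAG
from airflow.decorators import task


@task
def report_peak_memory(**context):
    max_mem = context["ti"].xcom_pull(task_ids="run_stg_orders", key="max_memory_mb")
    if max_mem is not None:  # None when tracking did not run
        print(f"Task used {max_mem:.2f} MB peak memory")
```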
Related Pages
- Environment:Astronomer_Astronomer_cosmos_Python_Airflow_Runtime
- Astronomer_Astronomer_cosmos_DbtRunner_Wrapper -- dbt execution that may be monitored by the MemoryTracker
- Astronomer_Astronomer_cosmos_FullOutputSubprocessHook -- subprocess execution that may be monitored by the MemoryTracker