Implementation:Astronomer Astronomer cosmos MemoryTracker

From Leeroopedia


Knowledge Sources
Domains: Debugging, Memory_Tracking
Last Updated: 2026-02-07 17:00 GMT

Overview

The MemoryTracker class and its companion functions start_memory_tracking and stop_memory_tracking provide a debug utility that monitors peak memory usage during Airflow task execution and publishes the result via XCom.

Description

This 122-line module implements a background memory monitoring system designed for diagnosing memory-related issues in dbt task execution within Airflow.

The MemoryTracker class accepts a process ID (pid) and a poll_interval that controls how frequently memory is sampled. The start method launches a background thread that calls _run in a loop, reading the resident set size (RSS) of the target process at each interval and tracking the maximum observed value. The stop method signals the background thread to terminate and returns the peak memory consumption in megabytes.
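The polling design described above can be sketched as follows. This is a minimal illustration, not the Cosmos source. It assumes RSS is read from Linux's /proc/<pid>/status (the real module may use another mechanism, such as psutil), converts bytes to megabytes as 1024 × 1024, and adds a hypothetical rss_reader parameter so the sampler can be swapped out in tests. Taking one final sample in stop() is a design choice of this sketch.

```python
import threading
from typing import Callable, Optional


def read_rss_bytes(pid: int) -> int:
    """Resident set size of `pid` in bytes, read from Linux's /proc (0 if unavailable)."""
    try:
        with open(f"/proc/{pid}/status") as status:
            for line in status:
                if line.startswith("VmRSS:"):
                    # The line looks like "VmRSS:     123456 kB"
                    return int(line.split()[1]) * 1024
    except OSError:
        pass  # no /proc (non-Linux) or the process is gone
    return 0


class MemoryTrackerSketch:
    """Polls a process's RSS on a background thread and remembers the peak."""

    def __init__(
        self,
        pid: int,
        poll_interval: float = 0.1,
        rss_reader: Callable[[int], int] = read_rss_bytes,  # hypothetical, for testing
    ) -> None:
        self.pid = pid
        self.poll_interval = poll_interval
        self._rss_reader = rss_reader
        self._max_rss_bytes = 0
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def _sample(self) -> None:
        self._max_rss_bytes = max(self._max_rss_bytes, self._rss_reader(self.pid))

    def _run(self) -> None:
        # Event.wait() doubles as the sleep and returns early once stop() is called.
        while not self._stop_event.is_set():
            self._sample()
            self._stop_event.wait(self.poll_interval)

    def start(self) -> None:
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> float:
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
        # One last sample after the thread exits, so very short runs still report a value.
        self._sample()
        return self._max_rss_bytes / (1024 * 1024)
```

Using Event.wait() instead of time.sleep() means stop() does not have to wait out a full poll_interval before the thread notices the request.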

start_memory_tracking is an Airflow callback function intended to be attached to a task's pre_execute hook or invoked at the beginning of an operator's execute method. It creates a MemoryTracker instance targeting the current process, starts the monitoring thread, and stashes the tracker object in the Airflow task context so it can be retrieved later.

stop_memory_tracking is the complementary callback for post_execute or the end of execute. It retrieves the MemoryTracker from the context, stops the monitoring thread, and pushes the max_memory_mb value to XCom. This value can then be inspected in the Airflow UI, queried via the Airflow API, or consumed by downstream tasks for alerting.

The module is gated behind Cosmos's debug-mode feature flag, so no tracker is created and no monitoring thread runs unless debug mode is explicitly enabled. Production runs without the flag pay no tracking overhead.
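The gating idea can be illustrated with a small wrapper that only calls the tracking callbacks when a flag is set. This is a minimal sketch: the environment variable COSMOS_DEBUG_MODE and the helper run_with_optional_tracking are hypothetical, and Cosmos's real debug-mode setting may be named and read differently.

```python
import os
from typing import Callable, TypeVar

T = TypeVar("T")

# Hypothetical flag name for this sketch; not the actual Cosmos setting.
DEBUG_FLAG_ENV_VAR = "COSMOS_DEBUG_MODE"


def debug_mode_enabled() -> bool:
    """Treat "1", "true" or "yes" (any case) as enabled; anything else as disabled."""
    return os.environ.get(DEBUG_FLAG_ENV_VAR, "").strip().lower() in {"1", "true", "yes"}


def run_with_optional_tracking(
    context: dict,
    work: Callable[[], T],
    start: Callable[[dict], None],
    stop: Callable[[dict], None],
) -> T:
    """Run `work`, wrapping it in start/stop tracking callbacks only when the flag is on."""
    if not debug_mode_enabled():
        # Flag off: the callbacks are never called, so no tracker or thread is created.
        return work()
    start(context)
    try:
        return work()
    finally:
        stop(context)  # runs even if `work` raises
```

With the real module this might be called as run_with_optional_tracking(context, run_dbt, start_memory_tracking, stop_memory_tracking), where run_dbt is a placeholder for the task's actual work.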

Usage

Use the memory tracking functions when diagnosing memory spikes, out-of-memory errors, or unexpected memory growth in dbt tasks. Enable debug mode in the Cosmos configuration to activate tracking automatically across all Cosmos operators, or attach start_memory_tracking and stop_memory_tracking manually to specific tasks for targeted investigation.

Code Reference

Source Location

Signature

class MemoryTracker:
    def __init__(self, pid: int, poll_interval: float = 0.1) -> None:
        ...

    def start(self) -> None:
        ...

    def stop(self) -> float:
        ...

    def _run(self) -> None:
        ...


def start_memory_tracking(context: dict) -> None:
    ...

def stop_memory_tracking(context: dict) -> None:
    ...

Import

from cosmos.debug import start_memory_tracking
from cosmos.debug import stop_memory_tracking
from cosmos.debug import MemoryTracker

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| pid | int | Yes (MemoryTracker) | Process ID to monitor for memory usage |
| poll_interval | float | No (MemoryTracker) | Interval in seconds between memory samples (defaults to 0.1) |
| context | dict | Yes (start/stop_memory_tracking) | The Airflow task instance context dictionary, used to stash and retrieve the MemoryTracker instance |

Outputs

| Name | Type | Source | Description |
|---|---|---|---|
| max_memory_mb | float | MemoryTracker.stop | Peak memory usage in megabytes observed during the monitoring period |
| XCom: max_memory_mb | float | stop_memory_tracking | The peak memory value pushed to Airflow XCom for downstream consumption or UI inspection |

Usage Examples

import os
from cosmos.debug import MemoryTracker

# Direct usage of MemoryTracker
tracker = MemoryTracker(pid=os.getpid(), poll_interval=0.5)
tracker.start()

# ... perform memory-intensive work ...

peak_mb = tracker.stop()
print(f"Peak memory usage: {peak_mb:.2f} MB")

from airflow.models import BaseOperator
from cosmos.debug import start_memory_tracking, stop_memory_tracking

# Usage inside a custom operator's execute method
class MyCustomOperator(BaseOperator):
    def execute(self, context):
        start_memory_tracking(context)
        try:
            # ... execute dbt command ...
            pass
        finally:
            # Stops the tracker and pushes max_memory_mb to XCom, even if the command fails
            stop_memory_tracking(context)
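
# Querying memory tracking results from XCom in a downstream task of the same DAG ("dbt_pipeline")
def report_peak_memory(**context):
    max_mem = context["ti"].xcom_pull(task_ids="run_stg_orders", key="max_memory_mb")
    if max_mem is not None:  # None if tracking was not enabled for that task
        print(f"Task used {max_mem:.2f} MB peak memory")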
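Because the peak value lands in XCom, a downstream task can turn it into an alert. The following is a minimal sketch of that idea. The helper check_peak_memory and its threshold_mb parameter are hypothetical. The ti argument only needs an xcom_pull(task_ids=..., key=...) method, which Airflow's TaskInstance provides.

```python
from typing import Optional, Protocol


class XComPuller(Protocol):
    """Anything with an Airflow-style xcom_pull, e.g. context["ti"]."""

    def xcom_pull(self, task_ids: str, key: str) -> Optional[float]: ...


def check_peak_memory(ti: XComPuller, task_id: str, threshold_mb: float) -> Optional[float]:
    """Return the tracked peak for `task_id`, raising if it exceeds `threshold_mb`."""
    peak_mb = ti.xcom_pull(task_ids=task_id, key="max_memory_mb")
    if peak_mb is None:
        return None  # tracking was not enabled, or the task has not run yet
    if peak_mb > threshold_mb:
        raise RuntimeError(
            f"{task_id} peaked at {peak_mb:.2f} MB (limit {threshold_mb:.2f} MB)"
        )
    return peak_mb
```

Inside a downstream task this might be called as check_peak_memory(context["ti"], "run_stg_orders", threshold_mb=2048). The 2048 MB limit is purely illustrative.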
