Implementation:Astronomer Astronomer cosmos Telemetry

From Leeroopedia


Knowledge Sources
Domains Telemetry, Analytics
Last Updated 2026-02-07 17:00 GMT

Overview

The Telemetry module is the central hub for collecting and emitting anonymous usage metrics from Cosmos to a remote telemetry endpoint.

Description

This module provides the core telemetry pipeline for Cosmos. It checks whether telemetry is enabled, gathers standard environment metrics, merges them with event-specific data, and sends the combined payload to a remote endpoint via an HTTP GET request.
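The four stages described above can be pictured with a small, network-free sketch. Every name in it (emit_if_enabled_sketch, fake_send, the callables passed in) is a hypothetical stand-in rather than part of cosmos.telemetry. The merge order, in which event-specific values overwrite standard ones, is an assumption that this page does not confirm.

```python
from typing import Callable


def emit_if_enabled_sketch(
    event_type: str,
    additional_metrics: dict[str, object],
    *,
    is_enabled: Callable[[], bool],
    collect_standard: Callable[[], dict[str, object]],
    send: Callable[[dict[str, object]], bool],
) -> bool:
    """Hypothetical stand-in for the pipeline: gate, gather, merge, send."""
    if not is_enabled():
        return False  # telemetry disabled: nothing is collected or sent
    metrics: dict[str, object] = {}
    metrics.update(collect_standard())   # standard environment metrics
    metrics.update(additional_metrics)   # assumption: event data wins on key clashes
    metrics["event_type"] = event_type
    return send(metrics)


# Record payloads in memory instead of performing an HTTP request.
sent: list[dict[str, object]] = []


def fake_send(metrics: dict[str, object]) -> bool:
    sent.append(metrics)
    return True


result = emit_if_enabled_sketch(
    "dag_run",
    {"status": "success"},
    is_enabled=lambda: True,
    collect_standard=lambda: {"python_version": "3.11"},
    send=fake_send,
)
```

Passing the gate, collector, and sender as callables is only a convenience for illustration; it makes the flow testable without touching settings or the network.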

The module exposes the following functions:

  • should_emit evaluates three settings -- enable_telemetry, do_not_track, and no_analytics -- to determine whether telemetry collection is active. All three must be in the permissive state for telemetry to emit.
  • collect_standard_usage_metrics assembles a dictionary of standard environment attributes: Cosmos version, Airflow version (URL-encoded), Python version, platform system, and platform machine architecture.
  • emit_usage_metrics takes a fully composed metrics dictionary, extracts the event_type field, constructs a URL from constants.TELEMETRY_URL with the remaining fields as query parameters, and performs an HTTP GET request using httpx with a configurable timeout. It returns a boolean indicating whether the request succeeded.
  • emit_usage_metrics_if_enabled is the primary entry point used by listeners and other Cosmos components. It checks should_emit(), collects standard metrics, merges with the provided additional_metrics, sets the event_type, and calls emit_usage_metrics.
  • _compress_telemetry_metadata compresses a metadata dictionary using zlib (level 9) and encodes the result as a base64 ASCII string. This allows telemetry metadata to be stored in DAG params while keeping the serialized DAG small.
  • _decompress_telemetry_metadata reverses the compression, decoding from base64 and decompressing via zlib back to the original dictionary.
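The compression helpers described in the last two bullets can be approximated with the standard library alone. The following is a minimal sketch, assuming the dictionary is serialized as JSON (as the I/O Contract suggests) before compression. The function names and the compact JSON separators are assumptions for illustration, not the Cosmos implementation.

```python
import base64
import json
import zlib
from typing import Any


def compress_metadata_sketch(metadata: dict[str, Any]) -> str:
    """Serialize to JSON, compress with zlib level 9, encode as base64 ASCII."""
    raw = json.dumps(metadata, separators=(",", ":")).encode("utf-8")
    packed = zlib.compress(raw, level=9)
    return base64.b64encode(packed).decode("ascii")


def decompress_metadata_sketch(compressed_data: str) -> dict[str, Any]:
    """Reverse the steps: base64-decode, zlib-decompress, parse JSON."""
    packed = base64.b64decode(compressed_data.encode("ascii"))
    return json.loads(zlib.decompress(packed).decode("utf-8"))


metadata = {"cosmos_task_count": 7, "execution_modes": "local"}
encoded = compress_metadata_sketch(metadata)
restored = decompress_metadata_sketch(encoded)
```

Because the output is plain ASCII text, it can be carried as an ordinary string value. Note that very small dictionaries may not shrink at all once the zlib and base64 overhead is included.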

Usage

Use emit_usage_metrics_if_enabled when adding telemetry to new Cosmos features. It is the recommended entry point because it performs the enablement check, collects the standard metrics, and handles errors. The compression and decompression helpers are used internally by the DAG converter and the DAG run listener to safely transport metadata through Airflow's serialization layer.
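The enablement check that this entry point relies on combines the three settings listed for should_emit. A minimal sketch of that gate follows, assuming the "permissive state" means enable_telemetry is true while do_not_track and no_analytics are false. TelemetrySettings and should_emit_sketch are hypothetical names, and the defaults shown are assumptions rather than the values Cosmos ships with.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class TelemetrySettings:
    """Hypothetical container; the real values come from Cosmos settings."""

    enable_telemetry: bool = True
    do_not_track: bool = False
    no_analytics: bool = False


def should_emit_sketch(settings: TelemetrySettings) -> bool:
    """Emit only when all three settings are in the permissive state."""
    return (
        settings.enable_telemetry
        and not settings.do_not_track
        and not settings.no_analytics
    )


permissive = TelemetrySettings()
opted_out = replace(permissive, do_not_track=True)
```

Any single restrictive setting is enough to suppress emission, which matches the "all three must be permissive" rule stated in the Description.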

Code Reference

Source Location

Signature

from typing import Any

def should_emit() -> bool:
    ...

def collect_standard_usage_metrics() -> dict[str, object]:
    ...

def emit_usage_metrics(metrics: dict[str, object]) -> bool:
    ...

def emit_usage_metrics_if_enabled(
    event_type: str,
    additional_metrics: dict[str, object],
) -> bool:
    ...

def _compress_telemetry_metadata(metadata: dict[str, Any]) -> str:
    ...

def _decompress_telemetry_metadata(compressed_data: str) -> dict[str, Any]:
    ...

Import

from cosmos.telemetry import emit_usage_metrics_if_enabled
from cosmos.telemetry import should_emit, collect_standard_usage_metrics
from cosmos.telemetry import _compress_telemetry_metadata, _decompress_telemetry_metadata

I/O Contract

Inputs

emit_usage_metrics_if_enabled

Name                Type               Required  Description
event_type          str                Yes       The type of telemetry event (e.g., "dag_run", "task_instance")
additional_metrics  dict[str, object]  Yes       Event-specific metrics to merge with standard environment metrics

emit_usage_metrics

Name     Type               Required  Description
metrics  dict[str, object]  Yes       Complete metrics dictionary including event_type key and all fields to transmit
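To make the shape of this input concrete, here is a hedged sketch of how the metrics dictionary could be turned into a request URL: event_type is pulled out and the remaining fields become query parameters. TELEMETRY_URL_SKETCH is a made-up template on a reserved test domain; the real format of constants.TELEMETRY_URL is not shown on this page. The sketch stops before any network call, whereas the actual function goes on to issue the GET request with httpx.

```python
from urllib.parse import urlencode

# Hypothetical template; the real value is constants.TELEMETRY_URL in Cosmos.
TELEMETRY_URL_SKETCH = "https://telemetry.example.invalid/{event_type}?{query}"


def build_telemetry_url_sketch(metrics: dict[str, object]) -> str:
    """Extract event_type and encode the remaining fields as a query string."""
    remaining = dict(metrics)  # copy so the caller's dictionary is not mutated
    event_type = remaining.pop("event_type")
    query = urlencode(remaining)  # non-string values are converted with str()
    return TELEMETRY_URL_SKETCH.format(event_type=event_type, query=query)


url = build_telemetry_url_sketch(
    {"event_type": "dag_run", "status": "success", "task_count": 10}
)
# url == "https://telemetry.example.invalid/dag_run?status=success&task_count=10"
```

Copying the dictionary before popping event_type is a choice made in this sketch; whether the real function mutates its argument is not stated here.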

_compress_telemetry_metadata

Name      Type            Required  Description
metadata  dict[str, Any]  Yes       The telemetry metadata dictionary to compress

_decompress_telemetry_metadata

Name             Type  Required  Description
compressed_data  str   Yes       Base64-encoded zlib-compressed JSON string to decompress

Outputs

Name                            Type               Description
should_emit                     bool               True if telemetry is enabled and tracking is not suppressed
collect_standard_usage_metrics  dict[str, object]  Dictionary with cosmos_version, airflow_version, python_version, platform_system, platform_machine
emit_usage_metrics              bool               True if the HTTP request to the telemetry endpoint succeeded
emit_usage_metrics_if_enabled   bool               True if telemetry was emitted successfully; False if disabled or failed
_compress_telemetry_metadata    str                Base64-encoded zlib-compressed JSON string
_decompress_telemetry_metadata  dict[str, Any]     The original metadata dictionary
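As an illustration of the collect_standard_usage_metrics output listed above, the sketch below builds a dictionary with the same five keys. It assumes the Python and platform values come from the standard platform module and that the Airflow version is URL-encoded with urllib.parse.quote; neither detail is confirmed on this page. The version strings passed in are placeholders, not real releases.

```python
import platform
from urllib.parse import quote


def collect_standard_usage_metrics_sketch(
    cosmos_version: str, airflow_version: str
) -> dict[str, object]:
    """Hypothetical stand-in that returns the five documented keys."""
    return {
        "cosmos_version": cosmos_version,
        # URL-encode so characters such as "+" are safe inside a URL
        "airflow_version": quote(airflow_version, safe=""),
        "python_version": platform.python_version(),
        "platform_system": platform.system(),
        "platform_machine": platform.machine(),
    }


# Placeholder version strings, chosen only to show the effect of encoding.
standard = collect_standard_usage_metrics_sketch("1.0.0", "2.9.0+astro.1")
# standard["airflow_version"] == "2.9.0%2Bastro.1"
```

The encoding step matters mainly for version strings with build suffixes, where a literal "+" would otherwise be read as a space in a query string.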

Usage Examples

from cosmos.telemetry import emit_usage_metrics_if_enabled

# Emit a custom telemetry event with additional metrics
additional_metrics = {
    "dag_hash": "abc123def456",
    "status": "success",
    "task_count": 10,
    "cosmos_task_count": 7,
    "execution_modes": "local",
}
was_emitted = emit_usage_metrics_if_enabled("dag_run", additional_metrics)

# Check whether telemetry is enabled before performing expensive metric collection
from cosmos.telemetry import should_emit

if should_emit():
    # gather_detailed_metrics is a placeholder for your own (costly) collection logic
    expensive_metrics = gather_detailed_metrics()
    emit_usage_metrics_if_enabled("custom_event", expensive_metrics)
