Implementation:Astronomer Astronomer cosmos Telemetry
| Knowledge Sources | |
|---|---|
| Domains | Telemetry, Analytics |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
The Telemetry module is the central hub for collecting and emitting anonymous usage metrics from Cosmos to a remote telemetry endpoint.
Description
This module provides the core telemetry pipeline for Cosmos. It checks whether telemetry is enabled, gathers standard environment metrics, merges them with event-specific data, and sends the combined payload to a remote endpoint via HTTP GET request.
The module exposes the following functions:
- should_emit evaluates three settings --
enable_telemetry,do_not_track, andno_analytics-- to determine whether telemetry collection is active. All three must be in the permissive state for telemetry to emit. - collect_standard_usage_metrics assembles a dictionary of standard environment attributes: Cosmos version, Airflow version (URL-encoded), Python version, platform system, and platform machine architecture.
- emit_usage_metrics takes a fully composed metrics dictionary, extracts the
event_typefield, constructs a URL usingconstants.TELEMETRY_URLwith the remaining fields as query parameters, and performs an HTTP GET request usinghttpxwith a configurable timeout. Returns a boolean indicating success. - emit_usage_metrics_if_enabled is the primary entry point used by listeners and other Cosmos components. It checks
should_emit(), collects standard metrics, merges with the providedadditional_metrics, sets theevent_type, and callsemit_usage_metrics. - _compress_telemetry_metadata compresses a metadata dictionary using zlib (level 9) and encodes it as a base64 ASCII string, used to store telemetry metadata in DAG params while minimizing serialized DAG size.
- _decompress_telemetry_metadata reverses the compression, decoding from base64 and decompressing via zlib back to the original dictionary.
Usage
Use emit_usage_metrics_if_enabled when adding telemetry to new Cosmos features. This is the recommended entry point as it handles all checks, standard metric collection, and error handling. The compression/decompression helpers are used internally by the DAG converter and the DAG run listener to safely transport metadata through Airflow's serialization layer.
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/telemetry.py
Signature
def should_emit() -> bool:
...
def collect_standard_usage_metrics() -> dict[str, object]:
...
def emit_usage_metrics(metrics: dict[str, object]) -> bool:
...
def emit_usage_metrics_if_enabled(
event_type: str,
additional_metrics: dict[str, object],
) -> bool:
...
def _compress_telemetry_metadata(metadata: dict[str, Any]) -> str:
...
def _decompress_telemetry_metadata(compressed_data: str) -> dict[str, Any]:
...
Import
from cosmos.telemetry import emit_usage_metrics_if_enabled
from cosmos.telemetry import should_emit, collect_standard_usage_metrics
from cosmos.telemetry import _compress_telemetry_metadata, _decompress_telemetry_metadata
I/O Contract
Inputs
emit_usage_metrics_if_enabled
| Name | Type | Required | Description |
|---|---|---|---|
| event_type | str |
Yes | The type of telemetry event (e.g., "dag_run", "task_instance")
|
| additional_metrics | dict[str, object] |
Yes | Event-specific metrics to merge with standard environment metrics |
emit_usage_metrics
| Name | Type | Required | Description |
|---|---|---|---|
| metrics | dict[str, object] |
Yes | Complete metrics dictionary including event_type key and all fields to transmit
|
_compress_telemetry_metadata
| Name | Type | Required | Description |
|---|---|---|---|
| metadata | dict[str, Any] |
Yes | The telemetry metadata dictionary to compress |
_decompress_telemetry_metadata
| Name | Type | Required | Description |
|---|---|---|---|
| compressed_data | str |
Yes | Base64-encoded zlib-compressed JSON string to decompress |
Outputs
| Name | Type | Description |
|---|---|---|
| should_emit | bool |
True if telemetry is enabled and tracking is not suppressed |
| collect_standard_usage_metrics | dict[str, object] |
Dictionary with cosmos_version, airflow_version, python_version, platform_system, platform_machine |
| emit_usage_metrics | bool |
True if the HTTP request to the telemetry endpoint succeeded |
| emit_usage_metrics_if_enabled | bool |
True if telemetry was emitted successfully; False if disabled or failed |
| _compress_telemetry_metadata | str |
Base64-encoded zlib-compressed JSON string |
| _decompress_telemetry_metadata | dict[str, Any] |
The original metadata dictionary |
Usage Examples
from cosmos.telemetry import emit_usage_metrics_if_enabled
# Emit a custom telemetry event with additional metrics
additional_metrics = {
"dag_hash": "abc123def456",
"status": "success",
"task_count": 10,
"cosmos_task_count": 7,
"execution_modes": "local",
}
was_emitted = emit_usage_metrics_if_enabled("dag_run", additional_metrics)
# Check if telemetry is enabled before performing expensive metric collection
from cosmos.telemetry import should_emit
if should_emit():
expensive_metrics = gather_detailed_metrics()
emit_usage_metrics_if_enabled("custom_event", expensive_metrics)