Implementation:Mage ai Mage ai Datadog Streams
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Datadog, Stream |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Stream class definitions for the Mage Datadog source connector, providing a base stream class and concrete implementations for Dashboards, Downtimes, Synthetic Tests, Audit Logs, Logs, Metrics, Incidents, Incident Teams, and Users.
Description
This module defines a DatadogStream base class and multiple concrete stream subclasses organized into a class hierarchy. The base class provides common data loading logic with bookmark-based time windowing, pagination support, and record transformation. The hierarchy includes:
- DatadogStream - Base class with GET-based data loading, time-window params, and pagination hooks.
- IncrementalSearchableStream - Extends base for POST-based search endpoints (Audit Logs, Logs) with cursor-based pagination and configurable query payloads.
- BasedListStream - Extends base for simple list endpoints (Metrics) that return data under a data key.
- PaginatedBasedListStream - Extends BasedListStream with offset-based pagination (Incidents, Incident Teams).
- Users - Extends PaginatedBasedListStream with page-number-based pagination.
Each concrete stream defines TABLE, ENTITY, API_METHOD, SCHEMA, URL_PATH, and optionally parse_response_root.
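As a rough illustration of how a concrete stream might declare these attributes, here is a minimal sketch. SketchStream and SketchDashboards are stand-ins invented for this example and are not the classes in the module; the url helper, the ENTITY value, the parse_response_root value, and the SCHEMA placeholder are all assumptions.

```python
from typing import Any, Dict, Optional


class SketchStream:
    """Stand-in for the base class; NOT the real DatadogStream."""

    primary_key: Optional[str] = None
    parse_response_root: Optional[str] = None
    BASE_PATH = 'https://api.datadoghq.com/api/'
    URL_PATH = ''
    API_METHOD = 'GET'

    @property
    def url(self) -> str:
        # Hypothetical helper: join the shared base path and the stream path.
        return f'{self.BASE_PATH}{self.URL_PATH}'


class SketchDashboards(SketchStream):
    """Illustrative concrete stream that only overrides class attributes."""

    TABLE = 'dashboards'
    ENTITY = 'dashboard'                 # assumed value
    API_METHOD = 'GET'
    URL_PATH = 'v1/dashboard'
    parse_response_root = 'dashboards'   # assumed value
    SCHEMA: Dict[str, Any] = {}          # placeholder; the real schema is not shown here


stream = SketchDashboards()
full_url = stream.url
```

Under these assumptions, a new endpoint would need only a handful of attribute overrides, with request and pagination behaviour inherited from the parent class.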
Usage
Stream classes are instantiated by the Datadog source connector with config, state, catalog, client, and logger. The load_data method yields batches of records.
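The batch-yielding behaviour can be pictured with the generator below. It is a simplified sketch, not the module's load_data: load_batches, fetch_page, fake_fetch, the start and end parameter names, and the next_page response key are all hypothetical.

```python
from typing import Any, Callable, Dict, Iterator, List

Page = Dict[str, Any]


def load_batches(
    fetch_page: Callable[[Dict[str, Any]], Page],
    start: str,
    end: str,
) -> Iterator[List[Dict[str, Any]]]:
    """Yield one batch of records per page inside a fixed time window."""
    params: Dict[str, Any] = {'start': start, 'end': end}
    while True:
        response = fetch_page(params)
        records = response.get('data', [])
        if records:
            yield records
        # An empty token means there are no further pages.
        token = response.get('next_page', {})
        if not token:
            break
        # Merge the token into the params for the next request.
        params = {**params, **token}


def fake_fetch(params: Dict[str, Any]) -> Page:
    """Hypothetical in-memory API with two pages, keyed by cursor."""
    pages = {
        None: {'data': [{'id': 1}, {'id': 2}], 'next_page': {'cursor': 'abc'}},
        'abc': {'data': [{'id': 3}], 'next_page': {}},
    }
    return pages[params.get('cursor')]


batches = list(
    load_batches(fake_fetch, '2024-01-01T00:00:00Z', '2024-01-02T00:00:00Z')
)
```

Writing the loader as a generator lets the caller process each page as it arrives instead of holding the whole result set in memory.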
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/datadog/streams/__init__.py
- Lines: 1-276
Signature
class DatadogStream:
    primary_key: Optional[str] = None
    parse_response_root: Optional[str] = None
    BASE_PATH = 'https://api.datadoghq.com/api/'
    URL_PATH = ''

    def __init__(self, config, state, catalog, client: DatadogClient, logger):
Import
from mage_integrations.sources.datadog.streams import (
    DatadogStream, Dashboards, Downtimes, SyntheticTests,
    AuditLogs, Logs, Metrics, Incidents, IncidentTeams, Users,
)
Stream Classes
| Stream Class | TABLE | API Method | URL Path | Replication |
|---|---|---|---|---|
| Dashboards | dashboards | GET | v1/dashboard | Full table |
| Downtimes | downtimes | GET | v1/downtime | Full table |
| SyntheticTests | synthetic_tests | GET | v1/synthetics/tests | Full table |
| AuditLogs | audit_logs | POST | v2/audit/events/search | Incremental (cursor) |
| Logs | logs | POST | v2/logs/events/search | Incremental (cursor) |
| Metrics | metrics | GET | v2/metrics | List-based |
| Incidents | incidents | GET | v2/incidents | Paginated offset |
| IncidentTeams | incident_teams | GET | v2/teams | Paginated offset |
| Users | users | GET | v2/users | Paginated page number |
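The three pagination styles named in the table (cursor, offset, page number) could each produce a next-page token along the following lines. This is a sketch under assumptions: the function names are invented, and the meta.page.after field and the page[offset], page[number], and page[size] parameter names are taken to follow Datadog's public API conventions rather than read from this module.

```python
from typing import Any, Dict


def cursor_token(response: Dict[str, Any]) -> Dict[str, Any]:
    """Cursor style: reuse the 'after' cursor if the response carries one."""
    after = response.get('meta', {}).get('page', {}).get('after')
    return {'cursor': after} if after else {}


def offset_token(
    response: Dict[str, Any], current_offset: int, page_size: int
) -> Dict[str, Any]:
    """Offset style: advance by page_size until a short page is returned."""
    if len(response.get('data', [])) < page_size:
        return {}
    return {'page[offset]': current_offset + page_size, 'page[size]': page_size}


def page_number_token(
    response: Dict[str, Any], current_page: int, page_size: int
) -> Dict[str, Any]:
    """Page-number style: increment the page index until a short page."""
    if len(response.get('data', [])) < page_size:
        return {}
    return {'page[number]': current_page + 1, 'page[size]': page_size}
```

All three return an empty dict when no further page exists, so a caller can use the same stop condition whichever strategy a stream uses.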
Key Methods
| Method | Class | Description |
|---|---|---|
| load_data(bookmarks, bookmark_properties, to_date) | DatadogStream | Generator fetching records with time-window params and pagination. |
| make_request(params) | DatadogStream | Delegates to client's make_request with URL and API method. |
| next_page_token(response) | Various | Returns next page token or empty dict. Overridden per pagination strategy. |
| parse_response(response) | DatadogStream | Extracts records from response using parse_response_root and applies transformation. |
| get_payload(cursor, **kwargs) | IncrementalSearchableStream | Builds POST body with filter, pagination, and optional query from config. |
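To make the last two rows more concrete, the sketch below shows one plausible shape for a search payload builder and a response parser. The names sketch_get_payload and sketch_parse_response, their signatures, the payload field names, the query config key, and the transform callback are all assumptions made for illustration; the real methods may differ.

```python
from typing import Any, Callable, Dict, List, Optional, Union

Record = Dict[str, Any]


def sketch_get_payload(
    config: Dict[str, Any],
    start: str,
    end: str,
    cursor: Optional[str] = None,
    limit: int = 1000,
) -> Dict[str, Any]:
    """Build a POST body with a time filter, page settings, optional query."""
    payload: Dict[str, Any] = {
        'filter': {'from': start, 'to': end},
        'page': {'limit': limit},
    }
    if cursor:
        payload['page']['cursor'] = cursor
    query = config.get('query')  # assumed config key
    if query:
        payload['filter']['query'] = query
    return payload


def sketch_parse_response(
    response: Union[Dict[str, Any], List[Record]],
    parse_response_root: Optional[str] = None,
    transform: Callable[[Record], Record] = lambda record: record,
) -> List[Record]:
    """Pick records from the root key (if set) and transform each one."""
    if parse_response_root and isinstance(response, dict):
        records = response.get(parse_response_root, [])
    else:
        records = response
    return [transform(record) for record in records]
```

For example, sketch_parse_response({'data': [{'id': 1}]}, 'data') returns the list stored under the data key, while a stream with no root key would pass the response list through unchanged.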
Usage Examples
from mage_integrations.sources.datadog.streams import Dashboards
stream = Dashboards(config, state, catalog, client, logger)
for batch in stream.load_data(bookmarks={'date': '2024-01-01T00:00:00Z'}):
    for record in batch:
        process(record)