Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Mage ai Mage ai Datadog Streams

From Leeroopedia


Knowledge Sources
Domains Data_Integration, Datadog, Stream
Last Updated 2026-02-09 00:00 GMT

Overview

Stream class definitions for the Mage Datadog source connector, providing a base stream class and concrete implementations for Dashboards, Downtimes, Synthetic Tests, Audit Logs, Logs, Metrics, Incidents, Incident Teams, and Users.

Description

This module defines a DatadogStream base class and multiple concrete stream subclasses organized into a class hierarchy. The base class provides common data loading logic with bookmark-based time windowing, pagination support, and record transformation. The hierarchy includes:

  • DatadogStream - Base class with GET-based data loading, time-window params, and pagination hooks.
  • IncrementalSearchableStream - Extends base for POST-based search endpoints (Audit Logs, Logs) with cursor-based pagination and configurable query payloads.
  • BasedListStream - Extends base for simple list endpoints (Metrics) that return data under a data key.
  • PaginatedBasedListStream - Extends BasedListStream with offset-based pagination (Incidents, Incident Teams).
  • Users - Extends PaginatedBasedListStream with page-number-based pagination.

Each concrete stream defines TABLE, ENTITY, API_METHOD, SCHEMA, URL_PATH, and optionally parse_response_root.

Usage

Stream classes are instantiated by the Datadog source connector with config, state, catalog, client, and logger. The load_data method yields batches of records.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/datadog/streams/__init__.py
  • Lines: 1-276

Signature

class DatadogStream:
    primary_key: Optional[str] = None
    parse_response_root: Optional[str] = None
    BASE_PATH = 'https://api.datadoghq.com/api/'
    URL_PATH = ''

    def __init__(self, config, state, catalog, client: DatadogClient, logger):

Import

from mage_integrations.sources.datadog.streams import (
    DatadogStream, Dashboards, Downtimes, SyntheticTests,
    AuditLogs, Logs, Metrics, Incidents, IncidentTeams, Users,
)

Stream Classes

Stream Class TABLE API Method URL Path Replication
Dashboards dashboards GET v1/dashboard Full table
Downtimes downtimes GET v1/downtime Full table
SyntheticTests synthetic_tests GET v1/synthetics/tests Full table
AuditLogs audit_logs POST v2/audit/events/search Incremental (cursor)
Logs logs POST v2/logs/events/search Incremental (cursor)
Metrics metrics GET v2/metrics List-based
Incidents incidents GET v2/incidents Paginated offset
IncidentTeams incident_teams GET v2/teams Paginated offset
Users users GET v2/users Paginated page number

Key Methods

Method Class Description
load_data(bookmarks, bookmark_properties, to_date) DatadogStream Generator fetching records with time-window params and pagination.
make_request(params) DatadogStream Delegates to client's make_request with URL and API method.
next_page_token(response) Various Returns next page token or empty dict. Overridden per pagination strategy.
parse_response(response) DatadogStream Extracts records from response using parse_response_root and applies transformation.
get_payload(cursor, **kwargs) IncrementalSearchableStream Builds POST body with filter, pagination, and optional query from config.

Usage Examples

from mage_integrations.sources.datadog.streams import Dashboards

stream = Dashboards(config, state, catalog, client, logger)
for batch in stream.load_data(bookmarks={'date': '2024-01-01T00:00:00Z'}):
    for record in batch:
        process(record)

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment