
Implementation:Mage ai Mage ai Google Ads Streams

From Leeroopedia
Revision as of 15:36, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Mage_ai_Mage_ai_Google_Ads_Streams.md)


Knowledge Sources
Domains Data_Integration, Google_Ads, API
Last Updated 2026-02-09 00:00 GMT

Overview

Defines stream classes and GAQL query generation for extracting data from the Google Ads API within the Mage Google Ads source connector.

Description

This module implements the data extraction logic for Google Ads streams. It provides three stream classes: BaseStream, which holds shared configuration and sync behavior; UserInterestStream, which syncs user interest data with custom pagination; and ReportStream, which syncs date-segmented report data with conversion window support. The module also handles GAQL (Google Ads Query Language) query construction with WHERE and ORDER BY clauses, pagination over composite primary keys, schema generation for nested resources, SHA-256 record hashing for deduplication, and retries with exponential backoff for Google Ads API errors (quota exhaustion, transient failures, timeouts). It targets Google Ads API version v15 and supports a configurable request timeout and a configurable conversion window of 1 to 30, 60, or 90 days.

Usage

Used internally by the Google Ads source connector to sync both core resource streams (accounts, campaigns, ad groups) and performance report streams (with daily date segmentation).

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/google_ads/tap_google_ads/streams.py
  • Lines: 1-1170

Signature

class BaseStream:
    def __init__(self, fields, google_ads_resource_names, ...):
        ...
    def sync(self, sdk_client, customer, stream, config, state, ...):
        ...
    def get_query(self, selected_fields, last_pk_fetched):
        ...

class UserInterestStream(BaseStream):
    def sync(self, sdk_client, customer, stream, config, state, ...):
        ...

class ReportStream(BaseStream):
    def sync(self, sdk_client, customer, stream, config, state, ...):
        ...
    def get_query(self, selected_fields, query_date):
        ...

def make_request(gas, query, customer_id, config=None) -> response:
    ...
def create_core_stream_query(resource_name, selected_fields, ...) -> str:
    ...
def create_report_query(resource_name, selected_fields, query_date) -> str:
    ...
def generate_hash(record, metadata) -> str:
    ...

Import

from mage_integrations.sources.google_ads.tap_google_ads.streams import (
    BaseStream,
    ReportStream,
    make_request,
)

I/O Contract

Inputs

Name | Type | Required | Description
sdk_client | GoogleAdsClient | Yes | Authenticated Google Ads SDK client
customer | dict | Yes | Customer/account configuration with customer_id
stream | dict | Yes | Singer stream catalog entry with schema and metadata
config | dict | Yes | Tap configuration with conversion_window, request_timeout, start_date, end_date
state | dict | Yes | Singer state with bookmark values

Outputs

Name | Type | Description
records | list[dict] | Transformed and hashed records emitted via Singer write_record
state | dict | Updated state with new bookmark values after sync

Key Behaviors

GAQL Query Construction

  • create_core_stream_query builds SELECT queries with optional WHERE/ORDER BY clauses for primary-key-based pagination and LIMIT support.
  • create_report_query builds date-filtered queries using segments.date for daily report extraction.
  • build_parameters appends PARAMETERS omit_unselected_resource_names=true to all queries.
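The clause ordering described above can be illustrated with a minimal, self-contained sketch. The helper names sketch_core_query and sketch_report_query are hypothetical stand-ins and not the module's functions; the sketch assumes a single-column primary key and an equality filter on segments.date, and it does not cover composite keys.

```python
from datetime import date
from typing import Optional, Sequence

# Clause that the page says build_parameters appends to every query.
PARAMETERS_CLAUSE = "PARAMETERS omit_unselected_resource_names=true"


def sketch_core_query(
    resource_name: str,
    selected_fields: Sequence[str],
    last_pk_fetched: Optional[int] = None,
    filter_param: Optional[str] = None,
    limit: Optional[int] = None,
) -> str:
    """Hypothetical sketch: SELECT ... FROM ... [WHERE] [ORDER BY] [LIMIT] PARAMETERS."""
    parts = [f"SELECT {','.join(selected_fields)} FROM {resource_name}"]
    if filter_param is not None:
        # Resume after the last primary key seen on the previous page.
        if last_pk_fetched is not None:
            parts.append(f"WHERE {filter_param} > {last_pk_fetched}")
        # A stable ordering is what makes key-based pagination safe.
        parts.append(f"ORDER BY {filter_param} ASC")
    if limit is not None:
        parts.append(f"LIMIT {limit}")
    parts.append(PARAMETERS_CLAUSE)
    return " ".join(parts)


def sketch_report_query(
    resource_name: str, selected_fields: Sequence[str], query_date: date
) -> str:
    """Hypothetical sketch: one query per day, filtered on segments.date."""
    day = query_date.strftime("%Y-%m-%d")
    return (
        f"SELECT {','.join(selected_fields)} FROM {resource_name} "
        f"WHERE segments.date = '{day}' {PARAMETERS_CLAUSE}"
    )
```

Ordering by the same column used in the WHERE filter lets each page pick up exactly where the previous one stopped, which is why the two clauses are emitted together in this sketch.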

Retry Logic

The make_request function is wrapped in backoff.on_exception with exponential backoff (at most 5 tries) and retries on GoogleAdsException, ServerError, TooManyRequests, ReadTimeout, and AttributeError. The should_give_up function inspects Google Ads error codes to separate retryable errors (QuotaError.RESOURCE_EXHAUSTED, InternalError.TRANSIENT_ERROR, etc.) from fatal ones.
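The retry pattern can be sketched without any third-party dependency. The following is a hand-rolled loop rather than the backoff library the module uses, so it only approximates the behavior: RetryableError, call_with_backoff, and the give_up callback are hypothetical names, and the delays are deterministic here with no jitter applied.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical stand-in for a transient API error such as quota exhaustion."""


def call_with_backoff(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (RetryableError,),
    give_up: Callable[[BaseException], bool] = lambda exc: False,
    max_tries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying with exponentially growing delays (1s, 2s, 4s, ...)."""
    if max_tries < 1:
        raise ValueError("max_tries must be at least 1")
    attempt = 1
    while True:
        try:
            return func()
        except retry_on as exc:
            # Stop when tries are exhausted or the error is judged fatal,
            # which plays the role the page assigns to should_give_up.
            if attempt >= max_tries or give_up(exc):
                raise
            sleep(base_delay * 2 ** (attempt - 1))
            attempt += 1
```

Passing the sleep function as a parameter keeps the sketch testable without real waiting; in production code the default time.sleep would be used.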

Record Hashing

generate_hash creates a SHA-256 hash of non-metric fields for report records, stored as _sdc_record_hash, enabling deduplication of report rows.
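A minimal sketch of this kind of hashing follows. It is not the module's generate_hash: the function name sketch_record_hash is hypothetical, and it assumes that metric fields can be recognized by a "metrics." key prefix, whereas the real function receives stream metadata as a second argument.

```python
import hashlib
import json
from typing import Any, Dict


def sketch_record_hash(record: Dict[str, Any]) -> str:
    """Hash only the non-metric fields so re-fetched rows keep the same identity."""
    # Assumption: metric fields are identified by their "metrics." prefix.
    non_metric = {k: v for k, v in record.items() if not k.startswith("metrics.")}
    # sort_keys makes the serialization independent of dict insertion order.
    payload = json.dumps(non_metric, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


row = {"campaign.id": 1, "segments.date": "2024-06-15", "metrics.clicks": 10}
updated = {**row, "metrics.clicks": 12}  # same row after a retroactive update
assert sketch_record_hash(row) == sketch_record_hash(updated)
```

Because metrics are excluded, a row whose click count changes after re-attribution still maps to the same hash, so a downstream destination can overwrite it instead of storing a duplicate.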

Conversion Window

Report streams subtract the configured conversion window (default 30 days) from the bookmark date to re-fetch recent data that may have been retroactively updated by Google Ads attribution.
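The date arithmetic can be sketched as below. The function names are hypothetical, the set of valid windows is taken from the Description section, and the sketch follows this page's wording (subtracting the window from the bookmark) without claiming to match the module's exact logic.

```python
from datetime import date, timedelta
from typing import List

# Allowed values per the Description: 1-30, 60, or 90 days.
VALID_WINDOWS = set(range(1, 31)) | {60, 90}
DEFAULT_WINDOW = 30


def window_start(bookmark: date, conversion_window: int = DEFAULT_WINDOW) -> date:
    """Move the bookmark back by the conversion window to re-fetch recent days."""
    if conversion_window not in VALID_WINDOWS:
        raise ValueError(f"unsupported conversion window: {conversion_window}")
    return bookmark - timedelta(days=conversion_window)


def daily_dates(start: date, end: date) -> List[date]:
    """List every day from start to end inclusive, one report query per day."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


start = window_start(date(2024, 6, 15))
assert start == date(2024, 5, 16)
assert len(daily_dates(start, date(2024, 6, 15))) == 31
```

With the default window, a sync whose bookmark is 2024-06-15 would re-query every day from 2024-05-16 onward, and the record hash lets the re-fetched rows replace their earlier versions.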

Usage Examples

from singer import utils

from mage_integrations.sources.google_ads.tap_google_ads.streams import (
    create_core_stream_query,
    create_report_query,
)

# Core stream query with pagination: resume after campaign.id 12345,
# ordered by campaign.id, at most 10000 rows per page
query = create_core_stream_query(
    resource_name="campaign",
    selected_fields=["campaign.id", "campaign.name"],
    last_pk_fetched=12345,
    filter_param="campaign.id",
    composite_pks=False,
    limit=10000,
)
# "SELECT campaign.id,campaign.name FROM campaign WHERE campaign.id > 12345 ORDER BY campaign.id ASC LIMIT 10000 PARAMETERS omit_unselected_resource_names=true"

# Report stream query for a single day
query_date = utils.strptime_to_utc("2024-06-15")
query = create_report_query(
    "campaign",
    ["campaign.id", "metrics.clicks", "segments.date"],
    query_date,
)

Related Pages

Implements Principle

Requires Environment
