Implementation:Mage ai Mage ai Google Ads Streams
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Google_Ads, API |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Defines stream classes and GAQL query generation for extracting data from the Google Ads API within the Mage Google Ads source connector.
Description
This module implements the data extraction logic for Google Ads streams. It provides three main stream types: BaseStream for shared configuration and sync behavior, UserInterestStream for user interest data with custom pagination, and ReportStream for date-segmented report data with conversion window support. The module handles GAQL (Google Ads Query Language) query construction with WHERE/ORDER BY clauses, composite primary key pagination, nested resource schema generation, SHA-256 record hashing for deduplication, and exponential backoff retry logic for Google Ads API errors (quota exhaustion, transient failures, timeouts). It uses API version v15 and supports configurable conversion windows (1-30, 60, or 90 days) and request timeouts.
Usage
Used internally by the Google Ads source connector to sync both core resource streams (accounts, campaigns, ad groups) and performance report streams (with daily date segmentation).
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/google_ads/tap_google_ads/streams.py
- Lines: 1-1170
Signature
class BaseStream:
    def __init__(self, fields, google_ads_resource_names, ...):
        ...
    def sync(self, sdk_client, customer, stream, config, state, ...):
        ...
    def get_query(self, selected_fields, last_pk_fetched):
        ...

class UserInterestStream(BaseStream):
    def sync(self, sdk_client, customer, stream, config, state, ...):
        ...

class ReportStream(BaseStream):
    def sync(self, sdk_client, customer, stream, config, state, ...):
        ...
    def get_query(self, selected_fields, query_date):
        ...

def make_request(gas, query, customer_id, config=None) -> response:
    ...

def create_core_stream_query(resource_name, selected_fields, ...) -> str:
    ...

def create_report_query(resource_name, selected_fields, query_date) -> str:
    ...

def generate_hash(record, metadata) -> str:
    ...
Import
from mage_integrations.sources.google_ads.tap_google_ads.streams import (
    BaseStream,
    ReportStream,
    make_request,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| sdk_client | GoogleAdsClient | Yes | Authenticated Google Ads SDK client |
| customer | dict | Yes | Customer/account configuration with customer_id |
| stream | dict | Yes | Singer stream catalog entry with schema and metadata |
| config | dict | Yes | Tap configuration with conversion_window, request_timeout, start_date, end_date |
| state | dict | Yes | Singer state with bookmark values |
Outputs
| Name | Type | Description |
|---|---|---|
| records | list[dict] | Transformed and hashed records emitted via Singer write_record |
| state | dict | Updated state with new bookmark values after sync |
Key Behaviors
GAQL Query Construction
- create_core_stream_query builds SELECT queries with optional WHERE and ORDER BY clauses for primary-key-based pagination, plus an optional LIMIT clause.
- create_report_query builds date-filtered queries using segments.date for daily report extraction.
- build_parameters appends PARAMETERS omit_unselected_resource_names=true to all queries.
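The query-building behavior listed above can be sketched with plain string assembly. This is a minimal illustration, not the module's actual code: the names sketch_core_query and sketch_report_query are hypothetical, the exact form of the date filter (segments.date = '...') is an assumption, and so is the choice to keep ORDER BY when no last primary key has been fetched yet.

```python
from datetime import date
from typing import Optional, Sequence

# Clause that, per the description above, is appended to every query.
PARAMETERS_CLAUSE = "PARAMETERS omit_unselected_resource_names=true"


def sketch_core_query(
    resource_name: str,
    selected_fields: Sequence[str],
    filter_param: Optional[str] = None,
    last_pk_fetched: Optional[int] = None,
    limit: Optional[int] = None,
) -> str:
    """Hypothetical stand-in for a core-stream query builder."""
    parts = [f"SELECT {','.join(selected_fields)}", f"FROM {resource_name}"]
    if filter_param is not None:
        # Resume after the last primary key seen, if there is one (assumption).
        if last_pk_fetched is not None:
            parts.append(f"WHERE {filter_param} > {last_pk_fetched}")
        # A stable ordering is what makes primary-key pagination repeatable.
        parts.append(f"ORDER BY {filter_param} ASC")
    if limit is not None:
        parts.append(f"LIMIT {limit}")
    parts.append(PARAMETERS_CLAUSE)
    return " ".join(parts)


def sketch_report_query(
    resource_name: str, selected_fields: Sequence[str], query_date: date
) -> str:
    """Hypothetical stand-in for a single-day report query builder."""
    day = query_date.strftime("%Y-%m-%d")
    return (
        f"SELECT {','.join(selected_fields)} FROM {resource_name} "
        f"WHERE segments.date = '{day}' {PARAMETERS_CLAUSE}"
    )
```

Ordering by the same field used in the WHERE filter lets each page resume exactly where the previous one stopped, without relying on server-side cursors.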
Retry Logic
The make_request function uses backoff.on_exception with exponential backoff (max 5 tries) for GoogleAdsException, ServerError, TooManyRequests, ReadTimeout, and AttributeError. The should_give_up function inspects Google Ads error codes to determine retryable errors (QuotaError.RESOURCE_EXHAUSTED, InternalError.TRANSIENT_ERROR, etc.) versus fatal errors.
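The retry pattern described above can be illustrated without the backoff dependency. The sketch below is a stdlib-only stand-in, not the module's implementation: retry_with_backoff, TransientError, and FatalError are hypothetical names, and the base delay and growth factor are assumed values. Only the maximum of 5 tries and the idea of a give-up predicate come from the description.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical retryable error (e.g. quota exhaustion)."""


class FatalError(Exception):
    """Hypothetical non-retryable error (e.g. an invalid query)."""


def retry_with_backoff(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    give_up: Callable[[BaseException], bool],
    max_tries: int = 5,
    base_delay: float = 1.0,  # assumed value
    factor: float = 2.0,  # assumed value
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying listed exceptions with exponentially growing delays."""
    if max_tries < 1:
        raise ValueError("max_tries must be at least 1")
    for attempt in range(1, max_tries + 1):
        try:
            return func()
        except retry_on as exc:
            # Re-raise at once for fatal errors or when attempts are exhausted.
            if give_up(exc) or attempt == max_tries:
                raise
            sleep(base_delay * factor ** (attempt - 1))
    raise AssertionError("unreachable")
```

Passing the sleep function as a parameter keeps the sketch testable without real waiting; the give_up predicate plays the role that should_give_up plays in the description.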
Record Hashing
generate_hash creates a SHA-256 hash of non-metric fields for report records, stored as _sdc_record_hash, enabling deduplication of report rows.
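A minimal sketch of this kind of hashing follows, assuming the metric fields are known by name. The function sketch_record_hash and its metric_fields parameter are hypothetical; the real generate_hash takes stream metadata instead, and its exact serialization is not shown in this document.

```python
import hashlib
import json
from typing import Any, Dict, Set


def sketch_record_hash(record: Dict[str, Any], metric_fields: Set[str]) -> str:
    """Hash only the non-metric fields so metric updates keep the same key."""
    fields_to_hash = {k: v for k, v in record.items() if k not in metric_fields}
    # Sort the items so the hash does not depend on dictionary ordering.
    payload = json.dumps(sorted(fields_to_hash.items()), default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Because metrics are excluded, a row re-fetched later with updated click or conversion counts hashes to the same value, which is what lets a downstream loader treat it as an update rather than a new row.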
Conversion Window
Report streams subtract the configured conversion window (default 30 days) from the bookmark date to re-fetch recent data that may have been retroactively updated by Google Ads attribution.
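The date arithmetic can be sketched as follows. This is an illustration under stated assumptions: the helper names are hypothetical, and how the real module clamps the result against start_date or end_date is not covered here. The allowed window values (1-30, 60, or 90 days) and the default of 30 come from this document.

```python
from datetime import date, timedelta
from typing import List

# Allowed conversion windows per the module description: 1-30, 60, or 90 days.
VALID_CONVERSION_WINDOWS = set(range(1, 31)) | {60, 90}


def sketch_window_start(bookmark: date, conversion_window_days: int = 30) -> date:
    """Return the first date to re-fetch, given the last bookmarked date."""
    if conversion_window_days not in VALID_CONVERSION_WINDOWS:
        raise ValueError(f"unsupported conversion window: {conversion_window_days}")
    return bookmark - timedelta(days=conversion_window_days)


def sketch_daily_dates(first: date, last: date) -> List[date]:
    """List every date from first to last inclusive, one per daily query."""
    return [first + timedelta(days=n) for n in range((last - first).days + 1)]
```

For example, a bookmark of 2024-06-15 with the default window gives a re-fetch start of 2024-05-16, and each date in the resulting range would drive one date-filtered report query.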
Usage Examples
from singer import utils

from mage_integrations.sources.google_ads.tap_google_ads.streams import (
    create_core_stream_query,
    create_report_query,
)

# Core stream query with pagination
query = create_core_stream_query(
    resource_name="campaign",
    selected_fields=["campaign.id", "campaign.name"],
    last_pk_fetched=12345,
    filter_param="campaign.id",
    composite_pks=False,
    limit=10000,
)
# "SELECT campaign.id,campaign.name FROM campaign WHERE campaign.id > 12345 ORDER BY campaign.id ASC LIMIT 10000 PARAMETERS omit_unselected_resource_names=true"

# Report stream query
query_date = utils.strptime_to_utc("2024-06-15")
query = create_report_query(
    "campaign",
    ["campaign.id", "metrics.clicks", "segments.date"],
    query_date,
)