Implementation:Mage ai Mage ai Google Ads Sync
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Google_Ads, Sync |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Sync orchestration module for the Mage Google Ads source connector, responsible for coordinating the extraction of data across selected streams and customer accounts.
Description
This module implements the sync phase of the Google Ads tap. The do_sync function orchestrates the extraction process by:
1. Parsing customer accounts - Reads login_customer_ids from config, either as a JSON string or as a raw (already-decoded) value, and sorts the accounts by customerId.
2. Filtering selected streams - Keeps only the catalog streams whose metadata has the selected flag set, and sorts them by tap_stream_id.
3. Resumption support - Calls get_currently_syncing to detect whether a previous sync was interrupted. If it was, the shuffle function reorders streams and customers so the sync resumes from the interruption point.
4. Stream iteration - For each selected stream and customer combination, creates an SDK client, writes the stream schema, and delegates to the appropriate stream object's sync method (either core stream or report stream).
5. Query limit management - get_query_limit reads and validates a configurable query limit (default: 1,000,000 rows).
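Steps 1 and 2 can be sketched in plain Python. This is a minimal illustration, not the module's code. The helper names parse_customers, is_selected, and select_streams are hypothetical. The sketch also assumes that login_customer_ids holds a list of objects with a customerId key, and that stream selection follows the standard Singer metadata layout, where an entry with an empty breadcrumb carries selected: true.

```python
from __future__ import annotations

import json


def parse_customers(config: dict) -> list[dict]:
    """Hypothetical helper: accept login_customer_ids as a JSON string or an already-decoded list."""
    raw = config["login_customer_ids"]
    customers = json.loads(raw) if isinstance(raw, str) else raw
    # Sort by customerId so the iteration order is deterministic across runs.
    return sorted(customers, key=lambda c: c["customerId"])


def is_selected(stream: dict) -> bool:
    """Hypothetical helper: read the stream-level (empty breadcrumb) Singer metadata."""
    for entry in stream.get("metadata", []):
        if not entry.get("breadcrumb"):
            return bool(entry.get("metadata", {}).get("selected"))
    return False


def select_streams(catalog: dict) -> list[dict]:
    """Hypothetical helper: keep selected streams, sorted by tap_stream_id."""
    selected = [s for s in catalog["streams"] if is_selected(s)]
    return sorted(selected, key=lambda s: s["tap_stream_id"])
```

Sorting both lists up front gives every run the same baseline order. That consistency is what lets a resumed run locate its position again.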
The shuffle function reorders a sorted list so that the currently-syncing item comes first, while the remaining items keep a deterministic order. It also handles the case where that item no longer exists in the list, for example a stream that has since been deselected or a customer removed from the config.
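One way to get that behavior is sketched below. The sketch assumes the reordering is a rotation of the sorted list, so the actual module may differ in detail. When current_value is missing, a placeholder is inserted and the list is re-sorted with sort_function to find where the value would have been. The list is then rotated, and the placeholder is dropped so that iteration starts at the item that would have followed it.

```python
from __future__ import annotations

from typing import Any, Callable


def shuffle(shuffle_list: list[dict], shuffle_key: str, current_value: Any,
            sort_function: Callable[[list[dict]], list[dict]]) -> list[dict]:
    """Sketch: rotate a sorted list so the item with shuffle_key == current_value comes first."""
    items = list(shuffle_list)  # copy so the caller's list is not mutated
    missing = current_value not in [item[shuffle_key] for item in items]
    if missing:
        # Insert a placeholder and re-sort to find where current_value would sit.
        items = sort_function(items + [{shuffle_key: current_value}])
    index = next(i for i, item in enumerate(items) if item[shuffle_key] == current_value)
    rotated = items[index:] + items[:index]
    # If we inserted a placeholder, drop it so iteration starts at its successor.
    return rotated[1:] if missing else rotated
```

With items sorted as a, b, c, d, resuming at "c" yields c, d, a, b. A missing value "bb" also yields c, d, a, b, because "c" is the item that would have followed it.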
Usage
Called by the main tap entry point after discovery to execute the data extraction.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/google_ads/tap_google_ads/sync.py
- Lines: 1-145
Signature
```python
def get_currently_syncing(state): ...
def sort_customers(customers): ...
def sort_selected_streams(sort_list): ...
def shuffle(shuffle_list, shuffle_key, current_value, sort_function): ...
def get_query_limit(config): ...
def do_sync(config, catalog, resource_schema, state, logger=None): ...
```
Import
```python
from mage_integrations.sources.google_ads.tap_google_ads.sync import do_sync
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | dict | Yes | Configuration with login_customer_ids, query_limit, and API credentials |
| catalog | dict | Yes | Singer catalog with selected streams |
| resource_schema | dict | Yes | Resource schema from discovery phase |
| state | dict | Yes | Singer state for bookmark and resumption tracking |
| logger | Logger | No | Optional logger (defaults to Singer LOGGER) |
Outputs
| Name | Type | Description |
|---|---|---|
| state | dict | Updated Singer state with bookmarks and cleared currently_syncing |
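The resumption bookkeeping behind the output state can be sketched as follows. The sketch assumes that state stores currently_syncing as a [stream_name, customer_id] pair, as the return value of get_currently_syncing suggests. run_sync and its sync_one callback are hypothetical stand-ins for the per-stream work that do_sync delegates to stream objects. The stream and customer lists are assumed to be already reordered for resumption, for example by shuffle. A real Singer tap would also emit the state at each checkpoint, for example with singer.write_state.

```python
from __future__ import annotations

import logging
from typing import Callable, Optional, Tuple

LOGGER = logging.getLogger(__name__)


def get_currently_syncing(state: dict) -> Tuple[Optional[str], Optional[str]]:
    """Return (stream_name, customer_id) from state, or (None, None) on a fresh run."""
    stream_name, customer_id = state.get("currently_syncing") or (None, None)
    return stream_name, customer_id


def run_sync(state: dict, stream_names: list[str], customer_ids: list[str],
             sync_one: Callable[[str, str], None]) -> dict:
    """Hypothetical skeleton of the stream x customer loop with resumption bookkeeping."""
    resume_stream, resume_customer = get_currently_syncing(state)
    if resume_stream:
        LOGGER.info("Resuming at stream %s, customer %s", resume_stream, resume_customer)
    for stream_name in stream_names:
        for customer_id in customer_ids:
            # Checkpoint before doing the work so an interruption is resumable here.
            state["currently_syncing"] = [stream_name, customer_id]
            sync_one(stream_name, customer_id)
    # A completed run clears the marker, matching the output described above.
    state.pop("currently_syncing", None)
    return state
```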
Key Functions
| Function | Description |
|---|---|
| do_sync(config, catalog, resource_schema, state, logger) | Main sync orchestrator. Iterates over each selected stream and customer pair, writes schemas, and calls the stream sync methods. |
| get_currently_syncing(state) | Returns a (stream_name, customer_id) tuple from state for resumption. |
| sort_customers(customers) | Sorts customer accounts by customerId. |
| sort_selected_streams(sort_list) | Sorts selected streams by tap_stream_id. |
| shuffle(shuffle_list, shuffle_key, current_value, sort_function) | Reorders a sorted list to place current_value first for sync resumption. |
| get_query_limit(config) | Reads and validates query_limit from config (default: 1,000,000). |
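The following sketches get_query_limit. It assumes that "validates" means coercing the value to an integer and rejecting values below 1. The exact checks and error messages in sync.py may differ, and DEFAULT_QUERY_LIMIT is a hypothetical constant name.

```python
DEFAULT_QUERY_LIMIT = 1_000_000  # hypothetical name for the documented default


def get_query_limit(config: dict) -> int:
    """Sketch: return config["query_limit"] as a positive int, or the default if unset."""
    raw = config.get("query_limit")
    if raw is None or raw == "":
        return DEFAULT_QUERY_LIMIT
    try:
        # int() accepts ints and integer strings such as "50000"; floats are truncated.
        limit = int(raw)
    except (TypeError, ValueError):
        raise ValueError(f"query_limit must be an integer, got {raw!r}") from None
    if limit < 1:
        raise ValueError(f"query_limit must be at least 1, got {limit}")
    return limit
```

Rejecting bad values up front means a typo in the config fails fast, before any API requests are issued.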
Usage Examples
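```python
import logging

from mage_integrations.sources.google_ads.tap_google_ads.discover import create_resource_schema
from mage_integrations.sources.google_ads.tap_google_ads.sync import do_sync

# config, catalog, and state are assumed to be dicts already loaded by the tap entry point
resource_schema = create_resource_schema(config)  # built from config during discovery
do_sync(config, catalog, resource_schema, state, logger=logging.getLogger(__name__))
```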