Implementation:Mage ai Mage ai Google Ads Sync

From Leeroopedia


Knowledge Sources
Domains Data_Integration, Google_Ads, Sync
Last Updated 2026-02-09 00:00 GMT

Overview

Sync orchestration module for the Mage Google Ads source connector, responsible for coordinating the extraction of data across selected streams and customer accounts.

Description

This module implements the sync phase of the Google Ads tap. The do_sync function orchestrates the extraction process by:

1. Parsing customer accounts - Reads login_customer_ids from config (JSON or raw), sorts them by customerId.
2. Filtering selected streams - Extracts selected streams from the catalog based on metadata selected flag, sorts them by tap_stream_id.
3. Resumption support - Uses get_currently_syncing to determine if a previous sync was interrupted, and uses the shuffle function to reorder streams and customers to resume from the interruption point.
4. Stream iteration - For each selected stream and customer combination, creates an SDK client, writes the stream schema, and delegates to the appropriate stream object's sync method (either core stream or report stream).
5. Query limit management - get_query_limit reads and validates a configurable query limit (default: 1,000,000 rows).
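
The first two steps above can be sketched as follows. This is a minimal illustration rather than the module's actual code: the helper names parse_customers and select_streams are hypothetical, and the catalog layout (a "streams" list whose stream-level metadata entry has an empty breadcrumb) is assumed from the usual Singer convention.

```python
import json


def parse_customers(config):
    """Return customer dicts sorted by customerId (hypothetical helper)."""
    raw = config["login_customer_ids"]
    # Accept either a JSON-encoded string or an already-parsed list.
    customers = json.loads(raw) if isinstance(raw, str) else list(raw)
    return sorted(customers, key=lambda c: c["customerId"])


def select_streams(catalog):
    """Return selected catalog entries sorted by tap_stream_id (hypothetical helper)."""
    selected = []
    for stream in catalog["streams"]:
        for entry in stream.get("metadata", []):
            # Assumption: an empty breadcrumb marks stream-level metadata.
            is_stream_level = entry.get("breadcrumb") == []
            if is_stream_level and entry.get("metadata", {}).get("selected"):
                selected.append(stream)
                break
    return sorted(selected, key=lambda s: s["tap_stream_id"])


config = {"login_customer_ids": '[{"customerId": "222"}, {"customerId": "111"}]'}
catalog = {
    "streams": [
        {"tap_stream_id": "campaigns",
         "metadata": [{"breadcrumb": [], "metadata": {"selected": True}}]},
        {"tap_stream_id": "ad_groups",
         "metadata": [{"breadcrumb": [], "metadata": {"selected": True}}]},
        {"tap_stream_id": "labels",
         "metadata": [{"breadcrumb": [], "metadata": {"selected": False}}]},
    ]
}

customers = parse_customers(config)
streams = select_streams(catalog)
```

Sorting both lists gives every run the same iteration order, which is what allows an interrupted sync to be resumed at a known position.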

The shuffle function reorders an already sorted list so that the currently syncing item comes first, while the remaining items keep a consistent order. It also handles the case where that item no longer exists in the list.
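
One way to get this behavior is to rotate the sorted list around the current item. The sketch below is an illustration under that assumption, not a copy of the module's code: rotate_to_current is a hypothetical name, and the placeholder approach for a missing item (insert it, sort, rotate, then drop it) is one plausible reading of the description.

```python
def rotate_to_current(items, key, current_value, sort_function):
    """Rotate a sorted list of dicts so the item matching current_value is first.

    Hypothetical helper. If no item matches, a placeholder is inserted to find
    the resume position and is removed again before returning.
    """
    items = list(items)  # work on a copy so the caller's list is untouched
    placeholder_added = False
    if all(item[key] != current_value for item in items):
        items.append({key: current_value})
        items = sort_function(items)
        placeholder_added = True

    index = next(i for i, item in enumerate(items) if item[key] == current_value)
    rotated = items[index:] + items[:index]
    # Drop the placeholder, which is now at the front, if one was inserted.
    return rotated[1:] if placeholder_added else rotated


def sort_by_stream_id(entries):
    return sorted(entries, key=lambda e: e["tap_stream_id"])


streams = [{"tap_stream_id": name} for name in ("accounts", "campaigns", "labels")]

resumed = rotate_to_current(streams, "tap_stream_id", "campaigns", sort_by_stream_id)
missing = rotate_to_current(streams, "tap_stream_id", "bidding", sort_by_stream_id)
```

In the second call, "bidding" is no longer in the list, so the rotation starts at the item that would have followed it in sorted order.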

Usage

Called by the main tap entry point after discovery to execute the data extraction.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/google_ads/tap_google_ads/sync.py
  • Lines: 1-145

Signature

def get_currently_syncing(state):
def sort_customers(customers):
def sort_selected_streams(sort_list):
def shuffle(shuffle_list, shuffle_key, current_value, sort_function):
def get_query_limit(config):
def do_sync(config, catalog, resource_schema, state, logger=None):

Import

from mage_integrations.sources.google_ads.tap_google_ads.sync import do_sync

I/O Contract

Inputs

Name             Type    Required  Description
config           dict    Yes       Configuration with login_customer_ids, query_limit, and API credentials
catalog          dict    Yes       Singer catalog with selected streams
resource_schema  dict    Yes       Resource schema from discovery phase
state            dict    Yes       Singer state for bookmark and resumption tracking
logger           Logger  No        Optional logger (defaults to Singer LOGGER)

Outputs

Name   Type  Description
state  dict  Updated Singer state with bookmarks and cleared currently_syncing

Key Functions

Function                                                          Description
do_sync(config, catalog, resource_schema, state, logger)          Main sync orchestrator. Iterates streams x customers, writes schemas, and calls stream sync methods.
get_currently_syncing(state)                                      Returns (stream_name, customer_id) tuple from state for resumption.
shuffle(shuffle_list, shuffle_key, current_value, sort_function)  Reorders a sorted list to place current_value first for sync resumption.
get_query_limit(config)                                           Reads and validates query_limit from config (default: 1,000,000).
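
The two state and config readers in the table can be sketched as below. The names read_currently_syncing and read_query_limit are hypothetical stand-ins. The "currently_syncing" state key, and the choice to fall back to the default when the configured limit is invalid, are assumptions, since the page only says the value is read and validated.

```python
DEFAULT_QUERY_LIMIT = 1_000_000  # default stated in the table above


def read_currently_syncing(state):
    """Return (stream_name, customer_id), or (None, None) when nothing is in progress."""
    # Assumption: the pair is stored under the "currently_syncing" state key.
    stream_name, customer_id = state.get("currently_syncing") or (None, None)
    return stream_name, customer_id


def read_query_limit(config):
    """Return a positive integer limit, falling back to the default (assumed policy)."""
    raw = config.get("query_limit", DEFAULT_QUERY_LIMIT)
    try:
        limit = int(float(raw))  # tolerate values such as "500" or 500.0
    except (TypeError, ValueError, OverflowError):
        return DEFAULT_QUERY_LIMIT
    return limit if limit > 0 else DEFAULT_QUERY_LIMIT


interrupted_state = {"currently_syncing": ["campaigns", "1234567890"]}
resume_point = read_currently_syncing(interrupted_state)
fresh_start = read_currently_syncing({})
```

A stricter variant could raise an error on an invalid limit instead of silently using the default; which of the two the real module does is not stated here.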

Usage Examples

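from mage_integrations.sources.google_ads.tap_google_ads.sync import do_sync
from mage_integrations.sources.google_ads.tap_google_ads.discover import create_resource_schema

# config, catalog, state, and logger are assumed to be prepared by the caller.
# Build the resource schema from the discovery phase, then run the sync.
resource_schema = create_resource_schema(config)
do_sync(config, catalog, resource_schema, state, logger=logger)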

Related Pages

Implements Principle

Requires Environment
