Implementation:Mage ai Mage ai Twitter Ads Streams

From Leeroopedia
Revision as of 15:37, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Mage_ai_Mage_ai_Twitter_Ads_Streams.md)


Knowledge Sources
Domains Data_Integration, Twitter_Ads, API
Last Updated 2026-02-09 00:00 GMT

Overview

Defines all Twitter Ads API stream classes, sync logic, and report generation for the Mage Twitter Ads source connector.

Description

This module provides the stream classes used to extract data from the Twitter Ads API.

The TwitterAds base class defines the shared attributes (tap_stream_id, replication_method, replication_key, key_properties, path, params, data_key, bookmark_query_field_from/to, pagination, parent_path, parent_id_field) and the utility methods for writing schemas, writing records, managing bookmarks per account, converting cursor objects to dictionaries, and making requests through the API SDK.

Two categories of subclass exist:

  • Reports, which handles asynchronous analytics report generation with placement-based segmentation, date-chunked queries, and job polling.
  • 33 concrete resource stream classes for entities such as Accounts, Campaigns, LineItems, Cards, PromotedTweets, TailoredAudiences, Tweets, and various targeting dimension lookups.

The module also includes update_currently_syncing for resumable sync state, configurable page_size validation, and a retry_pattern decorator that patches the Twitter SDK Request.perform method to back off on ConnectionError (5 retries, 60-second intervals).

Usage

Imported by the Twitter Ads source connector to discover available streams, configure sync parameters, and execute data extraction against the Twitter Ads API using the official Python SDK.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/twitter_ads/tap_twitter_ads/streams.py
  • Lines: 1-1875

Signature

class TwitterAds:
    tap_stream_id = None
    replication_method = None
    replication_key = []
    key_properties = []
    path = None
    data_key = None
    PLACEMENTS = ['ALL_ON_TWITTER', 'PUBLISHER_NETWORK']

    def write_schema(self, catalog, stream_name) -> None: ...
    def write_record(self, stream_name, record, time_extracted) -> None: ...
    def get_bookmark(self, state, stream, default, account_id) -> str: ...
    def write_bookmark(self, state, stream, value, account_id, sub_type=None) -> None: ...
    def obj_to_dict(self, obj) -> dict: ...

class Reports(TwitterAds):
    def sync(self, client, config, catalog, state, ...): ...

class Accounts(TwitterAds): ...
class Campaigns(TwitterAds): ...
class LineItems(TwitterAds): ...
class Tweets(TwitterAds): ...
# ... (33 resource stream classes total)

Import

from mage_integrations.sources.twitter_ads.tap_twitter_ads.streams import STREAMS, TwitterAds

I/O Contract

Inputs

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| client | twitter_ads.Client | Yes | Authenticated Twitter Ads SDK client |
| config | dict | Yes | Tap configuration with account_ids, start_date, page_size, etc. |
| catalog | Catalog | Yes | Singer catalog with selected streams and schemas |
| state | dict | Yes | Singer state with per-account bookmark values |
| account_id | str | Yes | Twitter Ads account ID for account-scoped operations |

Outputs

| Name | Type | Description |
| --- | --- | --- |
| records | Singer messages | Records emitted via singer.messages.write_record per stream |
| state | dict | Updated state with per-account bookmark values |

Stream Classes

Report Stream

| Class | Description | Replication |
| --- | --- | --- |
| Reports | Asynchronous analytics reports with placement segmentation | INCREMENTAL |

Resource Streams (33 classes)

| Stream Name | Class | Replication | Key Properties |
| --- | --- | --- | --- |
| accounts | Accounts | INCREMENTAL | [id] |
| account_media | AccountMedia | INCREMENTAL | [id] |
| advertiser_business_categories | AdvertiserBusinessCategories | FULL_TABLE | [id] |
| tracking_tags | TrackingTags | INCREMENTAL | [id] |
| campaigns | Campaigns | INCREMENTAL | [id] |
| cards | Cards | INCREMENTAL | [id] |
| cards_poll | CardsPoll | INCREMENTAL | [id] |
| cards_image_conversation | CardsImageConversation | INCREMENTAL | [id] |
| cards_video_conversation | CardsVideoConversation | INCREMENTAL | [id] |
| content_categories | ContentCategories | FULL_TABLE | [id] |
| funding_instruments | FundingInstruments | INCREMENTAL | [id] |
| iab_categories | IabCategories | FULL_TABLE | [id] |
| targeting_criteria | TargetingCriteria | INCREMENTAL | [id] |
| line_items | LineItems | INCREMENTAL | [id] |
| media_creatives | MediaCreatives | INCREMENTAL | [id] |
| preroll_call_to_actions | PrerollCallToActions | INCREMENTAL | [id] |
| promoted_accounts | PromotedAccounts | INCREMENTAL | [id] |
| promoted_tweets | PromotedTweets | INCREMENTAL | [id] |
| promotable_users | PromotableUsers | INCREMENTAL | [id] |
| scheduled_promoted_tweets | ScheduledPromotedTweets | INCREMENTAL | [id] |
| tailored_audiences | TailoredAudiences | INCREMENTAL | [id] |
| targeting_app_store_categories | TargetingAppStoreCategories | FULL_TABLE | [targeting_value] |
| targeting_conversations | TargetingConversations | FULL_TABLE | [targeting_value] |
| targeting_devices | TargetingDevices | FULL_TABLE | [targeting_value] |
| targeting_events | TargetingEvents | FULL_TABLE | [targeting_value] |
| targeting_interests | TargetingInterests | FULL_TABLE | [targeting_value] |
| targeting_languages | TargetingLanguages | FULL_TABLE | [targeting_value] |
| targeting_locations | TargetingLocations | FULL_TABLE | [targeting_value] |
| targeting_network_operators | TargetingNetworkOperators | FULL_TABLE | [targeting_value] |
| targeting_platform_versions | TargetingPlatformVersions | FULL_TABLE | [targeting_value] |
| targeting_platforms | TargetingPlatforms | FULL_TABLE | [targeting_value] |
| targeting_tv_markets | TargetingTvMarkets | FULL_TABLE | [targeting_value] |
| targeting_tv_shows | TargetingTVShows | FULL_TABLE | [targeting_value] |
| tweets | Tweets | INCREMENTAL | [id] |

Key Behaviors

Account-Scoped Bookmarks

Bookmark values are stored per account within the state, at state["bookmarks"][stream_name][account_id]. For the Tweets stream, bookmarks are nested one level further, by sub_type, within the account.
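The nesting described above can be illustrated with a minimal sketch. These are standalone functions, not the TwitterAds methods themselves: the sub_type parameter on get_bookmark, the account ID "acct_1", and the sub_type value "PUBLISHED" are assumptions made for the example.

```python
from typing import Optional


def get_bookmark(state: dict, stream: str, default: str, account_id: str,
                 sub_type: Optional[str] = None) -> str:
    """Return the stored bookmark for one account, or `default` if absent."""
    value = state.get("bookmarks", {}).get(stream, {}).get(account_id)
    if sub_type is not None:
        # Sub-typed streams keep a dict of sub_type -> value per account.
        value = value.get(sub_type) if isinstance(value, dict) else None
    return default if value is None else value


def write_bookmark(state: dict, stream: str, value: str, account_id: str,
                   sub_type: Optional[str] = None) -> None:
    """Store a bookmark under state["bookmarks"][stream][account_id]."""
    per_stream = state.setdefault("bookmarks", {}).setdefault(stream, {})
    if sub_type is None:
        per_stream[account_id] = value
        return
    nested = per_stream.get(account_id)
    if not isinstance(nested, dict):
        # Replace a missing or flat entry with a sub_type mapping.
        nested = {}
        per_stream[account_id] = nested
    nested[sub_type] = value


# Hypothetical usage: one flat bookmark and one nested by sub_type.
example_state: dict = {}
write_bookmark(example_state, "campaigns", "2026-01-01T00:00:00Z", "acct_1")
write_bookmark(example_state, "tweets", "2026-01-02T00:00:00Z", "acct_1",
               sub_type="PUBLISHED")
```

Keeping the account ID as a dictionary level means a sync for one account can resume without disturbing the bookmarks of another.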

Asynchronous Report Generation

The Reports class uses the Twitter Ads async analytics endpoint. It submits report jobs with date ranges and placement types (ALL_ON_TWITTER, PUBLISHER_NETWORK), polls for completion, and then retrieves the results. Reports are segmented by date and transformed using transform_report.
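The submit, poll, retrieve flow can be sketched without the SDK. The code below is illustrative only: date_chunks, plan_jobs, and wait_for_job are hypothetical helpers, the 7-day chunk size is an arbitrary choice, and the "PROCESSING" status string is a placeholder rather than a confirmed API value. The caller supplies get_status, which would wrap whatever job-status request the connector actually makes.

```python
import time
from datetime import datetime, timedelta
from typing import Callable, Iterator, List, Tuple

PLACEMENTS = ["ALL_ON_TWITTER", "PUBLISHER_NETWORK"]


def date_chunks(start: datetime, end: datetime,
                max_days: int = 7) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive [chunk_start, chunk_end) windows covering start..end."""
    step = timedelta(days=max_days)
    cursor = start
    while cursor < end:
        chunk_end = min(cursor + step, end)
        yield cursor, chunk_end
        cursor = chunk_end


def plan_jobs(start: datetime, end: datetime,
              max_days: int = 7) -> List[Tuple[str, datetime, datetime]]:
    """One job per (placement, date window) pair."""
    return [(placement, s, e)
            for placement in PLACEMENTS
            for s, e in date_chunks(start, end, max_days)]


def wait_for_job(get_status: Callable[[], str], poll_seconds: float = 5.0,
                 max_polls: int = 60,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status until it leaves PROCESSING or max_polls is exhausted."""
    for _ in range(max_polls):
        status = get_status()
        if status != "PROCESSING":
            return status
        sleep(poll_seconds)
    raise TimeoutError(f"job still processing after {max_polls} polls")
```

Bounding the number of polls keeps a stuck job from blocking the sync indefinitely; the real module may handle that case differently.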

SDK Request Retry

The module patches twitter_ads.http.Request.perform at import time with a retry_pattern decorator that retries on ConnectionError up to 5 times with 60-second constant backoff intervals.
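A constant-interval retry of this kind might look like the sketch below. It is a standard-library approximation, not the module's retry_pattern: it treats "5 times" as five total attempts, catches Python's built-in ConnectionError (the real code may catch a different exception class), and patches a stand-in FakeRequest class instead of twitter_ads.http.Request.

```python
import functools
import time
from typing import Callable


def retry_on_connection_error(max_tries: int = 5, interval: float = 60.0,
                              sleep: Callable[[float], None] = time.sleep):
    """Retry the wrapped call on ConnectionError with a constant wait."""
    if max_tries < 1:
        raise ValueError("max_tries must be at least 1")

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except ConnectionError:
                    if attempt == max_tries:
                        raise  # out of attempts: surface the last error
                    sleep(interval)
        return wrapper
    return decorator


class FakeRequest:
    """Stand-in for an SDK request class; not the real twitter_ads type."""
    calls = 0

    def perform(self):
        FakeRequest.calls += 1
        if FakeRequest.calls < 3:
            raise ConnectionError("transient failure")
        return "ok"


# Patch the method on the class so every caller gets the retry behaviour.
# The no-op sleep keeps this demonstration from actually waiting.
FakeRequest.perform = retry_on_connection_error(
    sleep=lambda seconds: None)(FakeRequest.perform)
```

Patching the class attribute, as opposed to wrapping individual call sites, is what lets SDK-internal calls pick up the retry without changes to the SDK.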

Configurable Page Size

The get_page_size function validates the page size from config and returns it, raising an exception for invalid values (non-integer values such as floats, or negative numbers).
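As a rough illustration of that validation, the sketch below re-implements the idea from scratch. The default of 1000, the choice of ValueError, the rejection of zero, and the acceptance of integer strings such as "50" are all assumptions; the real function may differ on each point.

```python
def get_page_size(config: dict, default: int = 1000) -> int:
    """Return a validated page size from config, or `default` if unset."""
    raw = config.get("page_size")
    if raw is None:
        return default
    # Reject floats outright; bool is excluded because it subclasses int.
    if isinstance(raw, (bool, float)):
        raise ValueError(f"page_size must be an integer, got {raw!r}")
    try:
        size = int(raw)
    except (TypeError, ValueError):
        raise ValueError(f"page_size must be an integer, got {raw!r}") from None
    if size <= 0:
        raise ValueError(f"page_size must be positive, got {size}")
    return size
```

Failing fast here surfaces a bad configuration before any API request is made.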

Usage Examples

from mage_integrations.sources.twitter_ads.tap_twitter_ads.streams import STREAMS

# List available streams
for stream_name, stream_cls in STREAMS.items():
    stream = stream_cls()
    print(f"{stream_name}: replication={stream.replication_method}, keys={stream.key_properties}")

# Access a specific stream class
campaigns_cls = STREAMS["campaigns"]
campaigns = campaigns_cls()
print(campaigns.tap_stream_id)     # "campaigns"
print(campaigns.replication_method) # "INCREMENTAL"
print(campaigns.path)               # API path for campaigns endpoint

Related Pages

Implements Principle

Requires Environment
