Implementation:Mage ai Mage ai Twitter Ads Streams
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Twitter_Ads, API |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Defines all Twitter Ads API stream classes, sync logic, and report generation for the Mage Twitter Ads source connector.
Description
This module provides the stream classes used to extract data from the Twitter Ads API. The TwitterAds base class defines the shared attributes (tap_stream_id, replication_method, replication_key, key_properties, path, params, data_key, bookmark_query_field_from/to, pagination, parent_path, parent_id_field) and utility methods for writing schemas, writing records, managing bookmarks per account, converting cursor objects to dictionaries, and making requests through the API SDK.

Two categories of subclass exist. The Reports class handles asynchronous analytics report generation with placement-based segmentation, date-chunked queries, and job polling. The 34 concrete resource stream classes listed under Resource Streams cover entities such as Accounts, Campaigns, LineItems, Cards, PromotedTweets, TailoredAudiences, and Tweets, along with the targeting dimension lookups.

The module also includes update_currently_syncing for resumable sync state, page_size validation from the configuration, and a retry_pattern decorator that patches the Twitter SDK Request.perform method to retry on ConnectionError (up to 5 times at 60-second intervals).
Usage
Imported by the Twitter Ads source connector to discover available streams, configure sync parameters, and execute data extraction against the Twitter Ads API using the official Python SDK.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/twitter_ads/tap_twitter_ads/streams.py
- Lines: 1-1875
Signature
class TwitterAds:
    tap_stream_id = None
    replication_method = None
    replication_key = []
    key_properties = []
    path = None
    data_key = None
    PLACEMENTS = ['ALL_ON_TWITTER', 'PUBLISHER_NETWORK']

    def write_schema(self, catalog, stream_name) -> None: ...
    def write_record(self, stream_name, record, time_extracted) -> None: ...
    def get_bookmark(self, state, stream, default, account_id) -> str: ...
    def write_bookmark(self, state, stream, value, account_id, sub_type=None) -> None: ...
    def obj_to_dict(self, obj) -> dict: ...

class Reports(TwitterAds):
    def sync(self, client, config, catalog, state, ...): ...

class Accounts(TwitterAds): ...
class Campaigns(TwitterAds): ...
class LineItems(TwitterAds): ...
class Tweets(TwitterAds): ...
# ... (34 resource stream classes total, one per row of the Resource Streams table)
Import
from mage_integrations.sources.twitter_ads.tap_twitter_ads.streams import STREAMS, TwitterAds
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| client | twitter_ads.Client | Yes | Authenticated Twitter Ads SDK client |
| config | dict | Yes | Tap configuration with account_ids, start_date, page_size, etc. |
| catalog | Catalog | Yes | Singer catalog with selected streams and schemas |
| state | dict | Yes | Singer state with per-account bookmark values |
| account_id | str | Yes | Twitter Ads account ID for account-scoped operations |
Outputs
| Name | Type | Description |
|---|---|---|
| records | Singer messages | Records emitted via singer.messages.write_record per stream |
| state | dict | Updated state with per-account bookmark values |
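The outputs above follow the Singer message format, in which each record and each state update is written as one JSON object per line. The following is a minimal sketch of that wire format only; the helper names format_record and format_state are hypothetical and are not taken from this module or from the singer library.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def format_record(stream: str, record: dict,
                  time_extracted: Optional[datetime] = None) -> str:
    """Serialize one Singer RECORD message (hypothetical helper)."""
    message = {"type": "RECORD", "stream": stream, "record": record}
    if time_extracted is not None:
        # Singer expects an RFC 3339 timestamp; isoformat() on an aware
        # datetime produces one.
        message["time_extracted"] = time_extracted.isoformat()
    return json.dumps(message)


def format_state(state: dict) -> str:
    """Serialize one Singer STATE message (hypothetical helper)."""
    return json.dumps({"type": "STATE", "value": state})


if __name__ == "__main__":
    now = datetime(2024, 1, 1, tzinfo=timezone.utc)
    print(format_record("campaigns", {"id": "abc"}, now))
    print(format_state({"bookmarks": {"campaigns": {"acct1": now.isoformat()}}}))
```

A downstream target reads these lines in order, so a STATE message is normally emitted only after the records it covers.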
Stream Classes
Report Stream
| Class | Description | Replication |
|---|---|---|
| Reports | Asynchronous analytics reports with placement segmentation | INCREMENTAL |
Resource Streams (34 classes)
| Stream Name | Class | Replication | Key Properties |
|---|---|---|---|
| accounts | Accounts | INCREMENTAL | [id] |
| account_media | AccountMedia | INCREMENTAL | [id] |
| advertiser_business_categories | AdvertiserBusinessCategories | FULL_TABLE | [id] |
| tracking_tags | TrackingTags | INCREMENTAL | [id] |
| campaigns | Campaigns | INCREMENTAL | [id] |
| cards | Cards | INCREMENTAL | [id] |
| cards_poll | CardsPoll | INCREMENTAL | [id] |
| cards_image_conversation | CardsImageConversation | INCREMENTAL | [id] |
| cards_video_conversation | CardsVideoConversation | INCREMENTAL | [id] |
| content_categories | ContentCategories | FULL_TABLE | [id] |
| funding_instruments | FundingInstruments | INCREMENTAL | [id] |
| iab_categories | IabCategories | FULL_TABLE | [id] |
| targeting_criteria | TargetingCriteria | INCREMENTAL | [id] |
| line_items | LineItems | INCREMENTAL | [id] |
| media_creatives | MediaCreatives | INCREMENTAL | [id] |
| preroll_call_to_actions | PrerollCallToActions | INCREMENTAL | [id] |
| promoted_accounts | PromotedAccounts | INCREMENTAL | [id] |
| promoted_tweets | PromotedTweets | INCREMENTAL | [id] |
| promotable_users | PromotableUsers | INCREMENTAL | [id] |
| scheduled_promoted_tweets | ScheduledPromotedTweets | INCREMENTAL | [id] |
| tailored_audiences | TailoredAudiences | INCREMENTAL | [id] |
| targeting_app_store_categories | TargetingAppStoreCategories | FULL_TABLE | [targeting_value] |
| targeting_conversations | TargetingConversations | FULL_TABLE | [targeting_value] |
| targeting_devices | TargetingDevices | FULL_TABLE | [targeting_value] |
| targeting_events | TargetingEvents | FULL_TABLE | [targeting_value] |
| targeting_interests | TargetingInterests | FULL_TABLE | [targeting_value] |
| targeting_languages | TargetingLanguages | FULL_TABLE | [targeting_value] |
| targeting_locations | TargetingLocations | FULL_TABLE | [targeting_value] |
| targeting_network_operators | TargetingNetworkOperators | FULL_TABLE | [targeting_value] |
| targeting_platform_versions | TargetingPlatformVersions | FULL_TABLE | [targeting_value] |
| targeting_platforms | TargetingPlatforms | FULL_TABLE | [targeting_value] |
| targeting_tv_markets | TargetingTvMarkets | FULL_TABLE | [targeting_value] |
| targeting_tv_shows | TargetingTVShows | FULL_TABLE | [targeting_value] |
| tweets | Tweets | INCREMENTAL | [id] |
Key Behaviors
Account-Scoped Bookmarks
Bookmark values are stored per-account within the state: state["bookmarks"][stream_name][account_id]. For the Tweets stream, bookmarks are further nested by sub_type within the account.
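The nested layout described above can be sketched with plain dictionaries. This is an illustration under stated assumptions, not the module's code: the functions are standalone rather than methods, the sub_type parameter on get_bookmark is an addition for the example, and the sub_type values used below are placeholders.

```python
from typing import Any, Optional


def get_bookmark(state: dict, stream: str, default: Any, account_id: str,
                 sub_type: Optional[str] = None) -> Any:
    """Read state["bookmarks"][stream][account_id], optionally by sub_type."""
    value = state.get("bookmarks", {}).get(stream, {}).get(account_id)
    if value is None:
        return default
    if sub_type is None:
        return value
    # Sub-typed streams keep one bookmark per sub_type under the account.
    if isinstance(value, dict):
        return value.get(sub_type, default)
    return default


def write_bookmark(state: dict, stream: str, value: Any, account_id: str,
                   sub_type: Optional[str] = None) -> dict:
    """Store a bookmark under the account, nesting by sub_type if given."""
    stream_marks = state.setdefault("bookmarks", {}).setdefault(stream, {})
    if sub_type is None:
        stream_marks[account_id] = value
    else:
        account_marks = stream_marks.get(account_id)
        if not isinstance(account_marks, dict):
            account_marks = {}
            stream_marks[account_id] = account_marks
        account_marks[sub_type] = value
    return state


if __name__ == "__main__":
    state: dict = {}
    write_bookmark(state, "campaigns", "2024-01-01T00:00:00Z", "acct1")
    # "PUBLISHED" is a placeholder sub_type for illustration.
    write_bookmark(state, "tweets", "2024-02-01T00:00:00Z", "acct1", "PUBLISHED")
    print(state)
```

Keying by account keeps one account's progress from advancing the bookmark of another when several account IDs are configured.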
Asynchronous Report Generation
The Reports class uses the Twitter Ads async analytics endpoint. It submits report jobs with date ranges and placement types (ALL_ON_TWITTER, PUBLISHER_NETWORK), polls for completion, and then retrieves the results. Reports are segmented by date and transformed using transform_report.
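The submit, poll, and retrieve flow described above might be sketched as follows. Everything here is hypothetical scaffolding: date_chunks, plan_jobs, and wait_for_job are illustrative names, the chunk length and polling limits are arbitrary, and the status strings are placeholders rather than confirmed API values. The get_status callable stands in for whatever SDK call reports job status.

```python
import time
from datetime import date, timedelta
from typing import Callable, Iterator, List, Tuple

PLACEMENTS = ["ALL_ON_TWITTER", "PUBLISHER_NETWORK"]


def date_chunks(start: date, end: date,
                max_days: int = 7) -> Iterator[Tuple[date, date]]:
    """Yield consecutive [chunk_start, chunk_end) windows of at most max_days."""
    if max_days < 1:
        raise ValueError("max_days must be at least 1")
    chunk_start = start
    while chunk_start < end:
        chunk_end = min(chunk_start + timedelta(days=max_days), end)
        yield chunk_start, chunk_end
        chunk_start = chunk_end


def plan_jobs(start: date, end: date,
              max_days: int = 7) -> List[Tuple[str, date, date]]:
    """One report job per placement per date window."""
    return [(placement, s, e)
            for placement in PLACEMENTS
            for s, e in date_chunks(start, end, max_days)]


def wait_for_job(get_status: Callable[[], str], interval_seconds: float = 5.0,
                 max_polls: int = 60,
                 sleep: Callable[[float], None] = time.sleep) -> None:
    """Poll until the job succeeds; raise on failure or after max_polls."""
    for _ in range(max_polls):
        status = get_status()
        if status == "SUCCESS":      # placeholder status value
            return
        if status == "FAILED":       # placeholder status value
            raise RuntimeError("report job failed")
        sleep(interval_seconds)      # any other status: still in progress
    raise TimeoutError("report job did not finish in time")


if __name__ == "__main__":
    for job in plan_jobs(date(2024, 1, 1), date(2024, 1, 10), max_days=4):
        print(job)
```

Passing sleep as a parameter keeps the polling loop testable without real waiting.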
SDK Request Retry
The module patches twitter_ads.http.Request.perform at import time with a retry_pattern decorator that retries on ConnectionError up to 5 times with 60-second constant backoff intervals.
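The patching approach described above can be illustrated with a standard-library decorator. This is a sketch, not the module's retry_pattern: it assumes Python's built-in ConnectionError, treats the limit as five total attempts, and patches a stand-in FakeRequest class instead of twitter_ads.http.Request. The sleep function is injected so the example runs without waiting.

```python
import functools
import time
from typing import Callable, List


def retry_on_connection_error(max_tries: int = 5, interval_seconds: float = 60,
                              sleep: Callable[[float], None] = time.sleep):
    """Retry the wrapped callable on ConnectionError at a constant interval."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_tries + 1):
                try:
                    return func(*args, **kwargs)
                except ConnectionError:
                    if attempt == max_tries:
                        raise            # out of attempts: surface the error
                    sleep(interval_seconds)
        return wrapper
    return decorator


class FakeRequest:
    """Stand-in for an SDK request; fails a fixed number of times."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    def perform(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("simulated network failure")
        return "ok"


# Record the waits instead of sleeping, then patch the method on the class,
# which mirrors replacing Request.perform at import time.
waits: List[float] = []
FakeRequest.perform = retry_on_connection_error(
    max_tries=5, interval_seconds=60, sleep=waits.append
)(FakeRequest.perform)

if __name__ == "__main__":
    request = FakeRequest(failures=2)
    print(request.perform(), request.calls, waits)
```

Because the method is replaced on the class, every instance created afterwards gets the retry behavior without any change at the call sites.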
Configurable Page Size
The get_page_size function reads the page size from config, validates it, and returns it. It raises an exception for invalid values: anything that is not an integer (including floats) and negative numbers.
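A validation routine of this kind might look like the sketch below. It is not the module's implementation: the default of 1000, the use of ValueError, and the rejection of zero are assumptions made for the example.

```python
from typing import Any, Mapping

DEFAULT_PAGE_SIZE = 1000  # hypothetical default, not taken from the module


def get_page_size(config: Mapping[str, Any],
                  default: int = DEFAULT_PAGE_SIZE) -> int:
    """Return a validated positive integer page size from config."""
    raw = config.get("page_size")
    if raw is None:
        return default
    # bool is a subclass of int, so reject it explicitly along with floats.
    if isinstance(raw, (bool, float)):
        raise ValueError(f"page_size must be an integer, got {raw!r}")
    try:
        size = int(raw)  # accepts 200 and "200"; rejects "2.5" and "abc"
    except (TypeError, ValueError):
        raise ValueError(f"page_size must be an integer, got {raw!r}") from None
    if size <= 0:
        raise ValueError(f"page_size must be positive, got {size}")
    return size


if __name__ == "__main__":
    print(get_page_size({"page_size": "200"}))
    print(get_page_size({}))
```

Validating once at startup means a bad setting fails before any API request is made.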
Usage Examples
from mage_integrations.sources.twitter_ads.tap_twitter_ads.streams import STREAMS

# List available streams
for stream_name, stream_cls in STREAMS.items():
    stream = stream_cls()
    print(f"{stream_name}: replication={stream.replication_method}, keys={stream.key_properties}")

# Access a specific stream class
campaigns_cls = STREAMS["campaigns"]
campaigns = campaigns_cls()
print(campaigns.tap_stream_id)       # "campaigns"
print(campaigns.replication_method)  # "INCREMENTAL"
print(campaigns.path)                # API path for campaigns endpoint