Overview
Defines all Intercom API stream classes and their sync logic for the Mage Intercom source connector.
Description
This module contains the stream class hierarchy for the Intercom source connector. The BaseStream class provides shared attributes (tap_stream_id, replication_method, replication_key, key_properties, path, params, data_key) and abstract methods for retrieving records and accessing parent data.
Two intermediate classes define replication behavior: IncrementalStream, whose sync method manages bookmarks, writes schemas, transforms records, and emits Singer messages; and FullTableStream, which syncs all records without bookmark tracking.
Twelve concrete stream classes represent Intercom resources: AdminList, Admins, Companies, CompanyAttributes, CompanySegments, Conversations, ConversationParts, ContactAttributes, Contacts, Segments, Tags, and Teams. Some streams are related as parent and child; for example, Conversations is the parent of ConversationParts. The module supports epoch-millisecond timestamp parsing via the Singer UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING transform.
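The difference between the two replication modes can be illustrated with a minimal, standard-library sketch. It is not the module's implementation: `ms_to_datetime`, `incremental_sync`, and `full_table_sync` are hypothetical helpers, the sample records are invented, and the `>=` comparison against the bookmark is an assumption about how boundary records are treated.

```python
from datetime import datetime, timezone


def ms_to_datetime(epoch_ms):
    """Convert an epoch-millisecond integer to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)


def incremental_sync(records, replication_key, bookmark):
    """Hypothetical stand-in for an incremental sync.

    Keeps records whose replication key is at or after `bookmark` and
    returns them together with the advanced bookmark.
    """
    emitted = []
    max_seen = bookmark
    for record in records:
        record_dt = ms_to_datetime(record[replication_key])
        if record_dt >= bookmark:  # assumption: boundary records are re-emitted
            emitted.append({**record, replication_key: record_dt.isoformat()})
            max_seen = max(max_seen, record_dt)
    return emitted, max_seen


def full_table_sync(records):
    """Hypothetical stand-in for a full-table sync: no bookmark, every record."""
    return list(records)


# Invented sample data with epoch-millisecond replication keys.
bookmark = datetime(2024, 1, 1, tzinfo=timezone.utc)
records = [
    {"id": "a", "updated_at": 1701388800000},  # 2023-12-01T00:00:00Z
    {"id": "b", "updated_at": 1706745600000},  # 2024-02-01T00:00:00Z
]

emitted, new_bookmark = incremental_sync(records, "updated_at", bookmark)
everything = full_table_sync(records)
```

Here only record `b` passes the bookmark filter, while the full-table path returns both records regardless of their timestamps.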
Usage
Imported by the Intercom source connector to build stream objects, discover schemas, and execute sync operations against the Intercom API.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/intercom/streams.py
- Lines: 1-739
Signature
class BaseStream:
    tap_stream_id = None
    replication_method = None
    replication_key = None
    key_properties = []
    path = None
    data_key = None
    parent = None

    def __init__(self, client: IntercomClient, logger=LOGGER): ...
    def get_records(self, bookmark_datetime=None, is_parent=False) -> list: ...
    def get_parent_data(self, bookmark_datetime=None) -> list: ...

class IncrementalStream(BaseStream):
    replication_method = 'INCREMENTAL'

    def sync(self, state, stream_schema, stream_metadata, config, transformer, ...): ...

class FullTableStream(BaseStream):
    replication_method = 'FULL_TABLE'

    def sync(self, state, stream_schema, stream_metadata, config, transformer, ...): ...
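As a rough illustration of how a concrete stream might configure itself through these class attributes, the sketch below uses simplified stand-in base classes rather than the real module. `ExampleTags`, `FakeClient`, its `get` method, the `"tags"` path value, and `EXAMPLE_STREAMS` are all hypothetical names introduced for this example.

```python
class BaseStream:
    """Simplified stand-in for the module's BaseStream (not the real class)."""
    tap_stream_id = None
    replication_method = None
    replication_key = None
    key_properties = []
    path = None
    data_key = None
    parent = None

    def __init__(self, client):
        self.client = client

    def get_records(self, bookmark_datetime=None, is_parent=False):
        raise NotImplementedError


class FullTableStream(BaseStream):
    replication_method = "FULL_TABLE"


class ExampleTags(FullTableStream):
    """Hypothetical concrete stream configured only through class attributes."""
    tap_stream_id = "tags"
    key_properties = ["id"]
    path = "tags"      # assumption: endpoint path is not given in this page
    data_key = "data"

    def get_records(self, bookmark_datetime=None, is_parent=False):
        # Assumption: the client exposes a get(path) method returning a dict.
        response = self.client.get(self.path)
        return response.get(self.data_key, [])


class FakeClient:
    """Hypothetical client returning canned data instead of calling Intercom."""

    def get(self, path):
        return {"data": [{"id": "1", "name": "vip"}]}


# A name-to-class registry, similar in spirit to the STREAMS mapping.
EXAMPLE_STREAMS = {ExampleTags.tap_stream_id: ExampleTags}

stream = EXAMPLE_STREAMS["tags"](FakeClient())
tag_records = stream.get_records()
```

Keeping the per-resource details in class attributes means a new stream usually needs only a small subclass and a registry entry.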
Import
from mage_integrations.sources.intercom.streams import STREAMS, BaseStream
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| client | IntercomClient | Yes | Authenticated Intercom API client for making HTTP requests |
| state | dict | Yes | Singer state containing bookmark values |
| stream_schema | dict | Yes | JSON schema for the stream |
| stream_metadata | list | Yes | Singer metadata entries for the stream |
| config | dict | Yes | Tap configuration with start_date and access_token |
Outputs
| Name | Type | Description |
|---|---|---|
| records | list[dict] | Transformed records emitted via Singer write_record |
| state | dict | Updated state with new bookmark values |
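To make the state input and output more concrete, here is a small sketch of the conventional Singer state layout, in which bookmarks are nested under a `bookmarks` key by stream name. The two helper functions are local stand-ins written for this example (they mirror the idea of Singer's bookmark helpers but are not imported from any library), and the fallback to `start_date` is an assumption about how a first run is seeded.

```python
def get_bookmark(state, stream_name, key, default):
    """Read a bookmark from a Singer-style state dict, or return `default`."""
    return state.get("bookmarks", {}).get(stream_name, {}).get(key, default)


def write_bookmark(state, stream_name, key, value):
    """Store a bookmark in a Singer-style state dict and return the state."""
    state.setdefault("bookmarks", {}).setdefault(stream_name, {})[key] = value
    return state


# Invented configuration and an empty state, as on a first run.
config = {"start_date": "2024-01-01T00:00:00Z", "access_token": "tok_xxxx"}
state = {}

# Assumption: with no stored bookmark, the sync starts from start_date.
start = get_bookmark(state, "contacts", "updated_at", config["start_date"])

# After a sync, the newest replication-key value is written back.
state = write_bookmark(state, "contacts", "updated_at", "2024-03-15T12:00:00Z")
resumed = get_bookmark(state, "contacts", "updated_at", config["start_date"])
```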
Stream Classes
Full Table Streams
| Stream Name | Class | Key Properties | Data Key |
|---|---|---|---|
| admin_list | AdminList | [id] | admins |
| admins | Admins | [id] | admins |
| company_attributes | CompanyAttributes | [name] | data_attributes |
| contact_attributes | ContactAttributes | [name] | data_attributes |
| tags | Tags | [id] | data |
| teams | Teams | [id] | teams |
Incremental Streams
| Stream Name | Class | Replication Key | Data Key |
|---|---|---|---|
| companies | Companies | updated_at | data |
| company_segments | CompanySegments | updated_at | segments |
| conversations | Conversations | updated_at | conversations |
| contacts | Contacts | updated_at | data |
| segments | Segments | updated_at | segments |
Child Streams
| Stream Name | Class | Parent | Data Key |
|---|---|---|---|
| conversation_parts | ConversationParts | Conversations | conversation_parts |
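A parent-child sync of this kind can be sketched as a loop over parent records that fetches the children of each one. This is only an illustration under stated assumptions: `sync_children`, `fake_fetch_parts`, the response shape, and the added `conversation_id` field are hypothetical and are not taken from the module.

```python
def sync_children(parent_records, fetch_children, data_key):
    """Yield child records for each parent, tagged with the parent's id.

    `fetch_children` is a hypothetical callable that takes a parent id and
    returns a dict holding the children under `data_key`.
    """
    for parent in parent_records:
        response = fetch_children(parent["id"])
        for child in response.get(data_key, []):
            # Assumption: children carry a reference back to their parent.
            yield {**child, "conversation_id": parent["id"]}


# Invented parent records and a fake fetcher standing in for API calls.
conversations = [{"id": "c1"}, {"id": "c2"}]
parts_by_conversation = {
    "c1": {"conversation_parts": [{"id": "p1"}, {"id": "p2"}]},
    "c2": {"conversation_parts": []},
}


def fake_fetch_parts(conversation_id):
    return parts_by_conversation[conversation_id]


parts = list(sync_children(conversations, fake_fetch_parts, "conversation_parts"))
```

In this sample run, both parts of conversation `c1` are emitted and `c2` contributes none.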
Usage Examples
from mage_integrations.sources.intercom.streams import STREAMS
from mage_integrations.sources.intercom.client import IntercomClient

client = IntercomClient(config={"access_token": "tok_xxxx"})

# Instantiate a stream
contacts_cls = STREAMS["contacts"]
contacts_stream = contacts_cls(client)

# Retrieve records
records = contacts_stream.get_records(bookmark_datetime="2024-01-01T00:00:00Z")