Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Mage ai Mage ai Intercom Streams

From Leeroopedia


Knowledge Sources
Domains Data_Integration, Intercom, API
Last Updated 2026-02-09 00:00 GMT

Overview

Defines all Intercom API stream classes and their sync logic for the Mage Intercom source connector.

Description

This module contains the stream class hierarchy for the Intercom source connector. The BaseStream base class provides shared attributes (tap_stream_id, replication_method, replication_key, key_properties, path, params, data_key) and abstract methods for record retrieval and parent data access. Two intermediate classes define replication behavior: IncrementalStream (with a full sync method that manages bookmarks, writes schemas, transforms records, and emits Singer messages) and FullTableStream (which syncs all records without bookmark tracking). Twelve concrete stream classes represent Intercom resources: AdminList, Admins, Companies, CompanyAttributes, CompanySegments, Conversations, ConversationParts, ContactAttributes, Contacts, Segments, Tags, and Teams. Parent-child relationships are used (e.g., Conversations is the parent of ConversationParts). The module supports epoch-millisecond timestamp parsing via the Singer UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING transform.

Usage

Imported by the Intercom source connector to build stream objects, discover schemas, and execute sync operations against the Intercom API.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/intercom/streams.py
  • Lines: 1-739

Signature

class BaseStream:
    tap_stream_id = None
    replication_method = None
    replication_key = None
    key_properties = []
    path = None
    data_key = None
    parent = None
    def __init__(self, client: IntercomClient, logger=LOGGER): ...
    def get_records(self, bookmark_datetime=None, is_parent=False) -> list: ...
    def get_parent_data(self, bookmark_datetime=None) -> list: ...

class IncrementalStream(BaseStream):
    replication_method = 'INCREMENTAL'
    def sync(self, state, stream_schema, stream_metadata, config, transformer, ...): ...

class FullTableStream(BaseStream):
    replication_method = 'FULL_TABLE'
    def sync(self, state, stream_schema, stream_metadata, config, transformer, ...): ...

Import

from mage_integrations.sources.intercom.streams import STREAMS, BaseStream

I/O Contract

Inputs

Name Type Required Description
client IntercomClient Yes Authenticated Intercom API client for making HTTP requests
state dict Yes Singer state containing bookmark values
stream_schema dict Yes JSON schema for the stream
stream_metadata list Yes Singer metadata entries for the stream
config dict Yes Tap configuration with start_date and access_token

Outputs

Name Type Description
records list[dict] Transformed records emitted via Singer write_record
state dict Updated state with new bookmark values

Stream Classes

Full Table Streams

Stream Name Class Key Properties Data Key
admin_list AdminList [id] admins
admins Admins [id] admins
company_attributes CompanyAttributes [name] data_attributes
contact_attributes ContactAttributes [name] data_attributes
tags Tags [id] data
teams Teams [id] teams

Incremental Streams

Stream Name Class Replication Key Data Key
companies Companies updated_at data
company_segments CompnaySegments updated_at segments
conversations Conversations updated_at conversations
contacts Contacts updated_at data
segments Segments updated_at segments

Child Streams

Stream Name Class Parent Data Key
conversation_parts ConversationParts Conversations conversation_parts

Usage Examples

from mage_integrations.sources.intercom.streams import STREAMS
from mage_integrations.sources.intercom.client import IntercomClient

client = IntercomClient(config={"access_token": "tok_xxxx"})

# Instantiate a stream
contacts_cls = STREAMS["contacts"]
contacts_stream = contacts_cls(client)

# Retrieve records
records = contacts_stream.get_records(bookmark_datetime="2024-01-01T00:00:00Z")

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment