Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Mage ai Mage ai Chargebee Base Stream

From Leeroopedia


Knowledge Sources
Domains Data_Integration, Chargebee, Stream
Last Updated 2026-02-09 00:00 GMT

Overview

Base stream class for all Chargebee data streams, providing common logic for schema writing, data loading with pagination, record transformation, and bookmark-based incremental syncing.

Description

The BaseChargebeeStream class serves as the foundation for all Chargebee entity streams. It implements the full data extraction lifecycle: schema emission via Singer's write_schema, incremental data loading with POSIX timestamp-based bookmark windows, offset-based pagination, and record transformation using Singer's Transformer with Unix seconds datetime parsing. The class handles custom field extraction (fields prefixed with cf_) by aggregating them into a serialized JSON custom_fields attribute. It supports tracking deleted records for plans, addons, and coupons via the events endpoint using the Util helper. Subclasses must define TABLE, ENTITY, SCHEMA, REPLICATION_KEY, BOOKMARK_PROPERTIES, and SORT_BY class properties.

Usage

Extended by concrete Chargebee stream classes (e.g., subscriptions, customers, events). The load_data method is a generator that yields batches of transformed records.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/chargebee/streams/base.py
  • Lines: 1-245

Signature

class BaseChargebeeStream():
    TABLE = None
    KEY_PROPERTIES = []
    API_METHOD = 'GET'
    REQUIRES = []

    def __init__(self, config, state, catalog, client, logger=None):

Import

from mage_integrations.sources.chargebee.streams.base import BaseChargebeeStream

I/O Contract

Inputs

Name Type Required Description
config dict Yes Configuration with start_date, include_deleted, and API credentials
state dict Yes Singer state for bookmark tracking
catalog CatalogEntry Yes Stream catalog entry with schema, metadata, and replication method
client ChargebeeClient Yes Chargebee API client instance
logger Logger No Optional logger (defaults to Singer LOGGER)

Outputs

Name Type Description
records Generator[list] Generator yielding batches of transformed record dictionaries

Key Methods

Method Description
load_data(logger) Generator that fetches records using bookmark-based time windows with offset pagination. Yields batches of transformed records.
write_schema() Emits the Singer schema message for this stream with key properties and bookmark properties.
transform_record(record) Transforms a single record using Singer Transformer with Unix-seconds datetime parsing and custom field extraction.
get_stream_data(data) Extracts and transforms entity records from the API response list.
append_custom_fields(record) Extracts fields prefixed with cf_ and aggregates them into a JSON custom_fields attribute.
get_url() Abstract method that subclasses must implement to return the API endpoint URL.
get_schema() Loads the JSON schema file by stream name from the schemas directory.

Subclass Requirements

Concrete stream subclasses must define the following class-level properties:

Property Type Description
TABLE str Stream/table name used for state bookmarking
ENTITY str Entity key used to extract records from API response items
SCHEMA str Name of the JSON schema file (without extension)
REPLICATION_KEY str Field name used for incremental replication bookmarking
BOOKMARK_PROPERTIES list List of bookmark property names
SORT_BY str/None Sort field for API requests (or None to skip sorting)

Usage Examples

from mage_integrations.sources.chargebee.streams.base import BaseChargebeeStream

class SubscriptionsStream(BaseChargebeeStream):
    TABLE = 'subscriptions'
    ENTITY = 'subscription'
    SCHEMA = 'subscriptions'
    KEY_PROPERTIES = ['id']
    REPLICATION_KEY = 'updated_at'
    BOOKMARK_PROPERTIES = ['updated_at']
    SORT_BY = 'updated_at'

    def get_url(self):
        return 'https://{}.chargebee.com/api/v2/subscriptions'.format(
            self.config.get('site'))

# Usage in sync:
stream = SubscriptionsStream(config, state, catalog, client)
stream.write_schema()
for batch in stream.load_data():
    for record in batch:
        singer.write_record(stream.TABLE, record)

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment