Implementation:Mage ai Mage ai Chargebee Base Stream
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Chargebee, Stream |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Base stream class for all Chargebee data streams, providing common logic for schema writing, data loading with pagination, record transformation, and bookmark-based incremental syncing.
Description
The BaseChargebeeStream class is the foundation for all Chargebee entity streams. It implements the full extraction lifecycle: schema emission via Singer's write_schema, incremental data loading over bookmark windows expressed as POSIX timestamps, offset-based pagination, and record transformation using Singer's Transformer with Unix-seconds datetime parsing. The class handles custom fields (fields prefixed with cf_) by aggregating them into a serialized JSON custom_fields attribute. It can also track deleted records for plans, addons, and coupons via the events endpoint, using the Util helper. Subclasses must define the TABLE, ENTITY, SCHEMA, REPLICATION_KEY, BOOKMARK_PROPERTIES, and SORT_BY class-level properties.
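The windowed, offset-paginated loading described above can be sketched roughly as follows. This is a minimal illustration and not the code in base.py: the names iter_windows, load_batches, and fetch_page are hypothetical, the window size is arbitrary, and the response keys list and next_offset are assumed to follow the shape of Chargebee list responses.

```python
from datetime import datetime, timedelta, timezone


def iter_windows(start, end, window):
    """Yield (lower, upper) POSIX-second bounds covering [start, end)."""
    cursor = start
    while cursor < end:
        upper = min(cursor + window, end)
        yield int(cursor.timestamp()), int(upper.timestamp())
        cursor = upper


def load_batches(fetch_page, start, end, window=timedelta(days=30)):
    """Yield one batch of records per API page, window by window.

    fetch_page is a hypothetical callable (lower, upper, offset) -> dict
    standing in for the real client request.
    """
    for lower, upper in iter_windows(start, end, window):
        offset = None  # the first page of each window has no offset
        while True:
            response = fetch_page(lower, upper, offset)
            records = response.get("list", [])
            if records:
                yield records
            # Assumed: the response carries next_offset while pages remain.
            offset = response.get("next_offset")
            if not offset:
                break
```

In a sketch like this, the bookmark would be advanced to the upper bound of each window once its pages are exhausted, so an interrupted sync can resume from the last completed window.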
Usage
Extended by concrete Chargebee stream classes (e.g., subscriptions, customers, events). The load_data method is a generator that yields batches of transformed records.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/chargebee/streams/base.py
- Lines: 1-245
Signature
class BaseChargebeeStream():
    TABLE = None
    KEY_PROPERTIES = []
    API_METHOD = 'GET'
    REQUIRES = []

    def __init__(self, config, state, catalog, client, logger=None):
Import
from mage_integrations.sources.chargebee.streams.base import BaseChargebeeStream
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | dict | Yes | Configuration with start_date, include_deleted, and API credentials |
| state | dict | Yes | Singer state for bookmark tracking |
| catalog | CatalogEntry | Yes | Stream catalog entry with schema, metadata, and replication method |
| client | ChargebeeClient | Yes | Chargebee API client instance |
| logger | Logger | No | Optional logger (defaults to Singer LOGGER) |
Outputs
| Name | Type | Description |
|---|---|---|
| records | Generator[list] | Generator yielding batches of transformed record dictionaries |
Key Methods
| Method | Description |
|---|---|
| load_data(logger) | Generator that fetches records using bookmark-based time windows with offset pagination. Yields batches of transformed records. |
| write_schema() | Emits the Singer schema message for this stream with key properties and bookmark properties. |
| transform_record(record) | Transforms a single record using Singer Transformer with Unix-seconds datetime parsing and custom field extraction. |
| get_stream_data(data) | Extracts and transforms entity records from the API response list. |
| append_custom_fields(record) | Extracts fields prefixed with cf_ and aggregates them into a JSON custom_fields attribute. |
| get_url() | Abstract method that subclasses must implement to return the API endpoint URL. |
| get_schema() | Loads the JSON schema file by stream name from the schemas directory. |
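To make the transformation steps in the table more concrete, here is a rough standard-library sketch of Unix-seconds datetime conversion followed by custom field aggregation. It is an approximation under stated assumptions: the real class delegates datetime parsing to Singer's Transformer, the datetime_fields parameter and the unix_seconds_to_iso helper are hypothetical, and leaving the original cf_ keys on the record is an assumption rather than confirmed behavior.

```python
import json
from datetime import datetime, timezone


def unix_seconds_to_iso(value):
    """Convert integer Unix seconds to a UTC ISO-8601 string."""
    moment = datetime.fromtimestamp(value, tz=timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


def append_custom_fields(record):
    """Collect cf_-prefixed fields into a JSON string under custom_fields."""
    custom_fields = {
        key: value for key, value in record.items() if key.startswith("cf_")
    }
    result = dict(record)  # copy so the caller's record is not mutated
    if custom_fields:
        # Assumption: the cf_ keys also stay on the record itself.
        result["custom_fields"] = json.dumps(custom_fields)
    return result


def transform_record(record, datetime_fields):
    """Apply datetime conversion, then custom field aggregation.

    datetime_fields is a hypothetical stand-in for the date-time
    properties that the stream's JSON schema would declare.
    """
    out = dict(record)
    for field in datetime_fields:
        if isinstance(out.get(field), int):
            out[field] = unix_seconds_to_iso(out[field])
    return append_custom_fields(out)
```

Records without any cf_ fields pass through without a custom_fields key in this sketch, which keeps the output free of empty JSON objects.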
Subclass Requirements
Concrete stream subclasses must define the following class-level properties:
| Property | Type | Description |
|---|---|---|
| TABLE | str | Stream/table name used for state bookmarking |
| ENTITY | str | Entity key used to extract records from API response items |
| SCHEMA | str | Name of the JSON schema file (without extension) |
| REPLICATION_KEY | str | Field name used for incremental replication bookmarking |
| BOOKMARK_PROPERTIES | list | List of bookmark property names |
| SORT_BY | str/None | Sort field for API requests (or None to skip sorting) |
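One way to catch an incomplete subclass early is a small check over these properties. The helper below is a hypothetical sketch and is not part of the base class: missing_properties and REQUIRED_PROPERTIES are illustrative names, and SORT_BY is only checked for presence because None is a valid value for it.

```python
# Properties that must be set to a non-None value on a concrete stream.
REQUIRED_PROPERTIES = (
    "TABLE",
    "ENTITY",
    "SCHEMA",
    "REPLICATION_KEY",
    "BOOKMARK_PROPERTIES",
)


def missing_properties(stream_cls):
    """Return the names of required class properties a stream lacks."""
    missing = [
        name
        for name in REQUIRED_PROPERTIES
        if getattr(stream_cls, name, None) is None
    ]
    # SORT_BY may legitimately be None, so only its presence is checked.
    if not hasattr(stream_cls, "SORT_BY"):
        missing.append("SORT_BY")
    return missing
```

A check like this could run in a unit test over every concrete stream class, so a missing property surfaces before a sync starts.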
Usage Examples
import singer

from mage_integrations.sources.chargebee.streams.base import BaseChargebeeStream


class SubscriptionsStream(BaseChargebeeStream):
    TABLE = 'subscriptions'
    ENTITY = 'subscription'
    SCHEMA = 'subscriptions'
    KEY_PROPERTIES = ['id']
    REPLICATION_KEY = 'updated_at'
    BOOKMARK_PROPERTIES = ['updated_at']
    SORT_BY = 'updated_at'

    def get_url(self):
        # Build the endpoint URL from the configured Chargebee site name.
        return 'https://{}.chargebee.com/api/v2/subscriptions'.format(
            self.config.get('site'))


# Usage in sync (config, state, catalog, and client are supplied by the caller):
stream = SubscriptionsStream(config, state, catalog, client)
stream.write_schema()
for batch in stream.load_data():
    # Each batch is a list of transformed record dictionaries.
    for record in batch:
        singer.write_record(stream.TABLE, record)