Implementation:Mage ai Mage ai Source Process
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, ETL, Orchestration |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool, provided by the Mage integrations Source base class, for orchestrating the source connector lifecycle across multiple execution modes.
Description
Source.process is the main entry point; it dispatches to the appropriate execution mode. It checks the CLI flags in priority order: test_connection, load_sample_data, discover_mode, count_records, show_templates. If none of these is set, it falls back to the default sync. In sync mode, it handles auto_add_new_fields by re-discovering the streams that have this flag enabled, filters the catalog by selected_streams, and calls sync(). sync() iterates over the selected streams and, for each one, calls process_stream (SCHEMA emission) and sync_stream (data extraction plus RECORD and STATE emission). The companion main function instantiates the source and calls process under Singer's handle_top_exception error handler.
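The priority-ordered dispatch described above can be sketched as follows. This is a minimal illustration, not the code in base.py: the function `select_mode` and the `MODE_PRIORITY` list are hypothetical names, and the flags are modeled as a plain dict rather than attributes set from the CLI. Only the flag names and their order come from the description.

```python
from typing import Dict, List

# Flag names in the priority order given in the description.
# "sync" is not a flag; it is the fallback when no flag is set.
MODE_PRIORITY: List[str] = [
    "test_connection",
    "load_sample_data",
    "discover_mode",
    "count_records",
    "show_templates",
]


def select_mode(flags: Dict[str, bool]) -> str:
    """Return the first enabled mode in priority order, or "sync" if none is set.

    Hypothetical helper: the real Source.process branches on its own attributes.
    """
    for name in MODE_PRIORITY:
        if flags.get(name, False):
            return name
    return "sync"


if __name__ == "__main__":
    # Two flags set at once: the earlier one in the priority list is chosen.
    print(select_mode({"count_records": True, "discover_mode": True}))
    # No flags set: the default sync path is taken.
    print(select_mode({}))
```

Modeling the order as a list keeps the precedence explicit: when two flags are set together, only the earlier mode runs.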
Usage
Called as the entry point via: if __name__ == "__main__": main(MySource)
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/base.py
- Lines: 190-284 (process), 517-544 (sync), 378-452 (sync_stream), 729-732 (main)
Signature
class Source:
    def process(self) -> None:
        """Main method to fetch data from the source.

        Routes to: test_connection, discover, count_records,
        show_templates, load_sample_data, or sync.
        """

    def sync(self, catalog: Catalog, properties: Dict = None) -> None:
        """Sync data for all selected streams in the catalog."""

    def sync_stream(self, stream, properties: Dict = None) -> int:
        """Sync a single stream: load_data -> write_records -> write_state.

        Returns:
            Number of records synced.
        """


@utils.handle_top_exception(LOGGER)
def main(source_class, **kwargs):
    """Entry point: instantiate source and call process()."""
    source = source_class(**kwargs)
    source.process()
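To illustrate what wrapping main in a top-level exception handler means, the sketch below uses a stand-in decorator. It assumes the handler logs the exception at critical level and then re-raises it. `handle_top_exception_sketch` and `FailingSource` are hypothetical names introduced for this example; they are not part of Mage or Singer.

```python
import functools
import logging
from typing import Any, Callable

LOGGER = logging.getLogger("source_sketch")


def handle_top_exception_sketch(logger: logging.Logger) -> Callable:
    """Hypothetical stand-in: log any uncaught exception, then re-raise it."""

    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapped(*args: Any, **kwargs: Any) -> Any:
            try:
                return fn(*args, **kwargs)
            except Exception as exc:
                # Log one line per line of the error message.
                for line in str(exc).splitlines():
                    logger.critical(line)
                raise

        return wrapped

    return decorator


class FailingSource:
    """Hypothetical source whose process() always fails."""

    def process(self) -> None:
        raise RuntimeError("connection refused")


@handle_top_exception_sketch(LOGGER)
def main(source_class: type, **kwargs: Any) -> None:
    """Same shape as the documented main: instantiate, then call process()."""
    source = source_class(**kwargs)
    source.process()


if __name__ == "__main__":
    try:
        main(FailingSource)
    except RuntimeError:
        # The error was logged by the decorator and still propagates.
        pass
```

Because the exception is re-raised, the process still fails visibly; the decorator only guarantees that the failure is logged first.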
Import
from mage_integrations.sources.base import Source, main
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| self (initialized Source) | Source | Yes | Source with config, state, catalog, mode flags set |
Outputs
| Name | Type | Description |
|---|---|---|
| stdout | JSON | Catalog (discover), records (sync), counts, or templates depending on mode |
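For sync mode, the stdout contract can be pictured as a sequence of JSON lines: a SCHEMA message, then RECORD messages, then a STATE message. The sketch below follows the message shapes of the Singer specification. `emit_stream` and its parameters are hypothetical, and the actual output of a Mage source may carry additional fields.

```python
import io
import json
from typing import Any, Dict, List, TextIO


def emit_stream(
    stream: str,
    schema: Dict[str, Any],
    key_properties: List[str],
    rows: List[Dict[str, Any]],
    state: Dict[str, Any],
    out: TextIO,
) -> int:
    """Write SCHEMA, one RECORD per row, then STATE; return the record count.

    Hypothetical helper mirroring the process_stream / sync_stream split.
    """
    # SCHEMA emission (the process_stream step in the description).
    schema_msg = {
        "type": "SCHEMA",
        "stream": stream,
        "schema": schema,
        "key_properties": key_properties,
    }
    out.write(json.dumps(schema_msg) + "\n")

    # RECORD emission (the data extraction part of sync_stream).
    count = 0
    for row in rows:
        out.write(json.dumps({"type": "RECORD", "stream": stream, "record": row}) + "\n")
        count += 1

    # STATE emission, so a later run can resume from the bookmark.
    out.write(json.dumps({"type": "STATE", "value": state}) + "\n")
    return count


if __name__ == "__main__":
    buffer = io.StringIO()
    emit_stream(
        stream="users",
        schema={"type": "object", "properties": {"id": {"type": "integer"}}},
        key_properties=["id"],
        rows=[{"id": 1}, {"id": 2}],
        state={"bookmarks": {"users": {"id": 2}}},
        out=buffer,
    )
    print(buffer.getvalue(), end="")
```

Writing to a TextIO argument instead of sys.stdout directly keeps the sketch easy to test; passing sys.stdout gives the behavior a downstream destination would consume.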
Usage Examples
from mage_integrations.sources.base import Source, main


class MySource(Source):
    def load_data(self, stream, bookmarks=None, query=None, **kwargs):
        # Yield rows in batches; each batch is a list of dicts
        yield [{"id": 1, "name": "test"}]

    def test_connection(self):
        # Validate we can reach the data source
        pass


if __name__ == "__main__":
    main(MySource)

# CLI usage:
# python my_source.py -c config.json -d                        # Discover mode
# python my_source.py -c config.json --test_connection         # Test mode
# python my_source.py -c config.json --catalog catalog.json    # Sync mode