Implementation:Mage ai Mage ai Source Process

From Leeroopedia


Knowledge Sources
Domains Data_Integration, ETL, Orchestration
Last Updated 2026-02-09 00:00 GMT

Overview

A concrete tool, provided by the Mage integrations Source base class, for orchestrating the source connector lifecycle across multiple execution modes.

Description

Source.process is the main entry point and dispatches to the appropriate execution mode. It checks the CLI flags in priority order: test_connection, load_sample_data, discover_mode, count_records, and show_templates. If none of these is set, it falls back to the default sync mode. In sync mode, it handles auto_add_new_fields by re-discovering the streams that have this flag enabled, filters the catalog by selected_streams, and calls sync(). sync() iterates over the selected streams, calling process_stream (SCHEMA emission) and sync_stream (data extraction plus RECORD and STATE emission). The companion main function, which instantiates the source and calls process, is wrapped with Singer's handle_top_exception error handler.
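
The priority-ordered dispatch described above can be illustrated with a minimal, standalone sketch. This is not the Mage implementation: ModeFlags and choose_mode are hypothetical names introduced here, and the only thing taken from the description is the order in which the flags are checked.

```python
from dataclasses import dataclass


@dataclass
class ModeFlags:
    """Hypothetical stand-in for the mode flags parsed from the CLI."""

    test_connection: bool = False
    load_sample_data: bool = False
    discover_mode: bool = False
    count_records: bool = False
    show_templates: bool = False


# Flags in the priority order given in the Description section.
PRIORITY = (
    "test_connection",
    "load_sample_data",
    "discover_mode",
    "count_records",
    "show_templates",
)


def choose_mode(flags: ModeFlags) -> str:
    """Return the first enabled mode in priority order, or 'sync' if none is set."""
    for name in PRIORITY:
        if getattr(flags, name):
            return name
    return "sync"
```

With this ordering, a run that sets both discover_mode and count_records resolves to discover_mode, and a run with no flags set resolves to sync.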

Usage

Called as the entry point via: if __name__ == "__main__": main(MySource)

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/base.py
  • Lines: 190-284 (process), 517-544 (sync), 378-452 (sync_stream), 729-732 (main)

Signature

class Source:
    def process(self) -> None:
        """Main method to fetch data from the source.

        Routes to: test_connection, discover, count_records,
        show_templates, load_sample_data, or sync.
        """

    def sync(self, catalog: Catalog, properties: Dict = None) -> None:
        """Sync data for all selected streams in the catalog."""

    def sync_stream(self, stream, properties: Dict = None) -> int:
        """Sync a single stream: load_data -> write_records -> write_state.

        Returns:
            Number of records synced.
        """

@utils.handle_top_exception(LOGGER)
def main(source_class, **kwargs):
    """Entry point: instantiate source and call process()."""
    source = source_class(**kwargs)
    source.process()
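
As a rough illustration of the flow the docstrings describe (SCHEMA emission per stream, then load_data -> write_records -> write_state), the sketch below writes Singer-style JSON messages to a text stream. It is a simplified approximation under stated assumptions, not the code in base.py: emit, sync_stream_sketch, sync_sketch, and fake_load_data are hypothetical helpers, streams are plain dicts rather than catalog entries, and the STATE payload is a placeholder.

```python
import io
import json
from typing import Callable, Dict, Iterable, List, TextIO


def emit(message: Dict, out: TextIO) -> None:
    """Write one JSON message per line, as Singer-style taps do."""
    out.write(json.dumps(message) + "\n")


def sync_stream_sketch(stream: Dict, batches: Iterable[List[Dict]], out: TextIO) -> int:
    """Emit a RECORD per row, then a STATE message; return the record count."""
    stream_id = stream["tap_stream_id"]
    count = 0
    for batch in batches:  # load_data yields lists of rows
        for row in batch:
            emit({"type": "RECORD", "stream": stream_id, "record": row}, out)
            count += 1
    # Placeholder state: a real connector would store bookmark values here.
    emit({"type": "STATE", "value": {"bookmarks": {stream_id: {}}}}, out)
    return count


def sync_sketch(
    streams: List[Dict],
    load_data: Callable[[Dict], Iterable[List[Dict]]],
    out: TextIO,
) -> Dict[str, int]:
    """Emit SCHEMA for each stream, then sync it; return record counts per stream."""
    counts = {}
    for stream in streams:
        emit(
            {
                "type": "SCHEMA",
                "stream": stream["tap_stream_id"],
                "schema": stream.get("schema", {}),
                "key_properties": stream.get("key_properties", []),
            },
            out,
        )
        counts[stream["tap_stream_id"]] = sync_stream_sketch(stream, load_data(stream), out)
    return counts


def fake_load_data(stream: Dict) -> Iterable[List[Dict]]:
    """Hypothetical data loader yielding a single batch of two rows."""
    yield [{"id": 1}, {"id": 2}]


# Small demonstration that captures the output in memory instead of stdout.
buffer = io.StringIO()
counts = sync_sketch([{"tap_stream_id": "users"}], fake_load_data, buffer)
message_types = [json.loads(line)["type"] for line in buffer.getvalue().splitlines()]
```

In the demonstration, the buffer receives one SCHEMA message, two RECORD messages, and one STATE message for the single "users" stream.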

Import

from mage_integrations.sources.base import Source, main

I/O Contract

Inputs

Name | Type | Required | Description
self (initialized Source) | Source | Yes | Source with config, state, catalog, mode flags set

Outputs

Name | Type | Description
stdout | JSON | Catalog (discover), records (sync), counts, or templates depending on mode
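
A downstream consumer might summarize the sync output along the following lines. This is a sketch that assumes sync mode writes newline-delimited Singer-style messages, each a JSON object with a "type" key. count_messages is a hypothetical helper, and lines that are not JSON objects with a "type" (for example log output or a discover-mode catalog) are simply skipped.

```python
import json
from collections import Counter
from typing import Iterable


def count_messages(lines: Iterable[str]) -> Counter:
    """Count messages by their 'type' field, ignoring anything unparseable."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # not JSON, e.g. an interleaved log line
        if isinstance(message, dict) and "type" in message:
            counts[message["type"]] += 1
    return counts
```

The function accepts any iterable of strings, so it can be fed sys.stdin or the lines of a captured output file.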

Usage Examples

from mage_integrations.sources.base import Source, main

class MySource(Source):
    def load_data(self, stream, bookmarks=None, query=None, **kwargs):
        yield [{"id": 1, "name": "test"}]

    def test_connection(self):
        # Validate we can reach the data source
        pass

if __name__ == "__main__":
    main(MySource)

# CLI usage:
# python my_source.py -c config.json -d              # Discover mode
# python my_source.py -c config.json --test_connection # Test mode
# python my_source.py -c config.json --catalog catalog.json  # Sync mode

Related Pages

Implements Principle

Requires Environment