Implementation:Mage ai Mage ai Source Discover

From Leeroopedia


Knowledge Sources
Domains: Data_Integration, API, Schema_Management
Last Updated: 2026-02-09 00:00 GMT

Overview

A concrete tool for file-based schema discovery, provided by the Mage integrations Source base class, that builds Singer catalogs from JSON Schema files.

Description

Source.discover loads the JSON Schema files in the connector's schemas/ folder via load_schemas_from_folder, then calls build_catalog_entry to build one CatalogEntry per schema. Each catalog entry carries stream metadata (key_properties, replication_method, valid_replication_keys) taken from the connector's override methods. Subclasses can also override discover() entirely for custom discovery logic; for example, the Chargebee connector checks the product catalog type to determine which streams are available.
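
The load-then-build flow described above can be sketched with the standard library alone. This is a simplified illustration, not the Mage implementation: the helper names (load_schemas, build_entry), the KEY_PROPERTIES and REPLICATION_KEYS tables, the plain-dict entry layout, and the assumption that a file's stem is its stream_id are all hypothetical stand-ins.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List, Optional

# Hypothetical per-stream settings standing in for the connector's override methods.
KEY_PROPERTIES = {"users": ["id"], "events": ["event_id"]}
REPLICATION_KEYS = {"events": ["created_at"]}


def load_schemas(folder: Path) -> Dict[str, dict]:
    """Parse every *.json file in folder; the file stem is assumed to be the stream_id."""
    return {p.stem: json.loads(p.read_text()) for p in sorted(folder.glob("*.json"))}


def build_entry(stream_id: str, schema: dict) -> dict:
    """Build a plain-dict stand-in for a CatalogEntry."""
    replication_keys = REPLICATION_KEYS.get(stream_id, [])
    return {
        "tap_stream_id": stream_id,
        "schema": schema,
        "key_properties": KEY_PROPERTIES.get(stream_id, []),
        "replication_method": "INCREMENTAL" if replication_keys else "FULL_TABLE",
        "valid_replication_keys": replication_keys,
    }


def discover(folder: Path, streams: Optional[List[str]] = None) -> List[dict]:
    """Load all schemas, optionally filter by stream name, and build one entry each."""
    schemas = load_schemas(folder)
    return [
        build_entry(stream_id, schema)
        for stream_id, schema in schemas.items()
        if not streams or stream_id in streams
    ]


# Demo against a throwaway schemas/ folder.
with tempfile.TemporaryDirectory() as tmp:
    schemas_dir = Path(tmp) / "schemas"
    schemas_dir.mkdir()
    for name in ("users", "events"):
        (schemas_dir / f"{name}.json").write_text(
            json.dumps({"type": "object", "properties": {}})
        )
    all_entries = discover(schemas_dir)
    only_events = discover(schemas_dir, streams=["events"])

print([entry["tap_stream_id"] for entry in all_entries])
```

Passing streams narrows the result to the named streams; omitting it returns an entry for every schema file found.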

Usage

Source.discover is called when the connector is run with the --discover flag. API connector developers should define one JSON Schema file per stream in schemas/ and can optionally override get_table_key_properties, get_forced_replication_method, and get_valid_replication_keys to supply stream metadata.
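
As a rough illustration of what a schema file might contain, the sketch below writes a small Draft 4 compatible schema for a hypothetical users stream. The field names, the users.json file name, and the temporary directory standing in for the connector package are assumptions made for the example, not taken from any real connector.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical schema for a "users" stream; nullable fields list "null" as a type.
users_schema = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": ["null", "string"]},
        "created_at": {"type": ["null", "string"], "format": "date-time"},
    },
}

# A temporary directory stands in for the connector's package directory.
connector_dir = Path(tempfile.mkdtemp())
schemas_dir = connector_dir / "schemas"
schemas_dir.mkdir()

# One file per stream; the file name (minus .json) is assumed to name the stream.
schema_path = schemas_dir / "users.json"
schema_path.write_text(json.dumps(users_schema, indent=2))

# Read the file back to confirm it parses as JSON.
loaded = json.loads(schema_path.read_text())
print(sorted(loaded["properties"]))
```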

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/base.py
  • Lines: 133-148 (discover), 546-606 (build_catalog_entry), 687-705 (load_schemas_from_folder)

Signature

class Source:
    def discover(self, streams: List[str] = None) -> Catalog:
        """Discover streams and build catalog entries from schemas/ folder.

        Args:
            streams: Optional filter for specific stream names.

        Returns:
            Catalog with CatalogEntry per discovered stream.
        """

    def build_catalog_entry(
        self,
        stream_id: str,
        schema,
        bookmark_properties: List[str] = None,
        key_properties: List[str] = None,
        replication_key: str = None,
        replication_method: str = None,
        **kwargs,
    ) -> CatalogEntry:
        """Build a single catalog entry for a stream."""

    def load_schemas_from_folder(self) -> Dict:
        """Load JSON Schema files from schemas/ directory.

        Returns:
            Dict mapping stream_id to Schema object.
        """

Import

from mage_integrations.sources.base import Source

I/O Contract

Inputs

Name | Type | Required | Description
streams | List[str] | No | Optional filter for specific stream names
schemas/ directory | JSON files | Yes | JSON Schema Draft 4 files, one per stream

Outputs

Name | Type | Description
return | Catalog | Catalog object with one CatalogEntry per stream
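
For orientation, a catalog in the general Singer layout looks roughly like the dictionary below. The metadata keys (table-key-properties, forced-replication-method, valid-replication-keys) come from the Singer specification; whether Mage serializes exactly these fields, or adds others, is an assumption here, and stream_metadata is a hypothetical helper written for this example.

```python
import json
from typing import Any, Dict

# Hand-written example of a Singer-style catalog with a single "events" stream.
catalog: Dict[str, Any] = {
    "streams": [
        {
            "tap_stream_id": "events",
            "stream": "events",
            "schema": {
                "type": "object",
                "properties": {
                    "event_id": {"type": "integer"},
                    "created_at": {"type": ["null", "string"], "format": "date-time"},
                },
            },
            "metadata": [
                {
                    # An empty breadcrumb marks stream-level (not field-level) metadata.
                    "breadcrumb": [],
                    "metadata": {
                        "table-key-properties": ["event_id"],
                        "forced-replication-method": "INCREMENTAL",
                        "valid-replication-keys": ["created_at"],
                    },
                }
            ],
        }
    ]
}


def stream_metadata(entry: Dict[str, Any]) -> Dict[str, Any]:
    """Return the stream-level metadata block of a catalog entry, or {} if absent."""
    for item in entry["metadata"]:
        if item["breadcrumb"] == []:
            return item["metadata"]
    return {}


events_meta = stream_metadata(catalog["streams"][0])
print(json.dumps(events_meta, indent=2))
```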

Usage Examples

from mage_integrations.sources.base import Source, main

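class MyAPISource(Source):
    def get_table_key_properties(self, stream_id):
        # Primary-key columns per stream; unknown streams get no key
        return {"users": ["id"], "events": ["event_id"]}.get(stream_id, [])

    def get_forced_replication_method(self, stream_id):
        # Only "events" is synced incrementally; every other stream is reloaded in full
        if stream_id == "events":
            return "INCREMENTAL"
        return "FULL_TABLE"

    def get_valid_replication_keys(self, stream_id):
        # Columns that may serve as the incremental bookmark for each stream
        return {"events": ["created_at"]}.get(stream_id, [])

    def load_data(self, stream, bookmarks=None, query=None, **kwargs):
        # Implement API extraction here; this placeholder yields one batch of records
        yield [{"id": 1, "name": "test"}]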

if __name__ == "__main__":
    main(MyAPISource)
