Implementation:Mage ai Mage ai Source Discover
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, API, Schema_Management |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool, provided by the Mage integrations Source base class, for file-based schema discovery. It builds Singer catalogs from JSON Schema files.
Description
Source.discover loads JSON Schema files from the connector's schemas/ folder via load_schemas_from_folder, then builds a CatalogEntry for each schema using build_catalog_entry. The catalog entry includes stream metadata (key_properties, replication_method, valid_replication_keys) derived from the connector's override methods. Subclasses can override discover() entirely for custom discovery logic (e.g., Chargebee checks the product catalog type to determine available streams).
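The flow described above can be sketched in isolation. This is a minimal illustration and not the Mage implementation: `SketchSource`, `SketchCatalogEntry`, and `EventsSource` are hypothetical stand-ins, the schemas are passed in as plain dicts instead of being read from disk, and the sketch assumes that `streams` acts as a name filter, as the signature's docstring suggests.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class SketchCatalogEntry:
    """Hypothetical stand-in for CatalogEntry; not the Mage class."""
    stream_id: str
    schema: dict
    key_properties: List[str] = field(default_factory=list)
    replication_method: Optional[str] = None
    valid_replication_keys: List[str] = field(default_factory=list)


class SketchSource:
    """Hypothetical, simplified mirror of the discovery flow."""

    def __init__(self, schemas: Dict[str, dict]):
        # Stands in for the parsed contents of the schemas/ folder.
        self._schemas = schemas

    def load_schemas_from_folder(self) -> Dict[str, dict]:
        return dict(self._schemas)

    # Override hooks; the defaults here are assumptions for the sketch.
    def get_table_key_properties(self, stream_id: str) -> List[str]:
        return []

    def get_forced_replication_method(self, stream_id: str) -> Optional[str]:
        return None

    def get_valid_replication_keys(self, stream_id: str) -> List[str]:
        return []

    def build_catalog_entry(self, stream_id: str, schema: dict) -> SketchCatalogEntry:
        # Stream metadata comes from the override hooks.
        return SketchCatalogEntry(
            stream_id=stream_id,
            schema=schema,
            key_properties=self.get_table_key_properties(stream_id),
            replication_method=self.get_forced_replication_method(stream_id),
            valid_replication_keys=self.get_valid_replication_keys(stream_id),
        )

    def discover(self, streams: Optional[List[str]] = None) -> List[SketchCatalogEntry]:
        entries = []
        for stream_id, schema in self.load_schemas_from_folder().items():
            # Assumed behavior: skip streams outside the optional filter.
            if streams and stream_id not in streams:
                continue
            entries.append(self.build_catalog_entry(stream_id, schema))
        return entries


class EventsSource(SketchSource):
    def get_table_key_properties(self, stream_id: str) -> List[str]:
        return {"events": ["event_id"]}.get(stream_id, [])


source = EventsSource({"users": {"type": "object"}, "events": {"type": "object"}})
all_entries = source.discover()
only_events = source.discover(streams=["events"])
```

The point of the sketch is the division of labor: the schema files decide which streams exist, while the override hooks decide each stream's keys and replication settings.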
Usage
Source.discover is called when the connector is run with the --discover flag. API connector developers should define one JSON Schema file per stream in schemas/ and can optionally override get_table_key_properties, get_forced_replication_method, and get_valid_replication_keys.
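As a rough illustration of the one-file-per-stream convention, the sketch below writes a small schema file and reads a folder back into a mapping keyed by file name. The `load_schemas` helper and the `users` schema are hypothetical. The real `load_schemas_from_folder` returns Schema objects, whereas this sketch returns plain dicts, and it assumes the stream id is the file name without its `.json` extension.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict


def load_schemas(folder: Path) -> Dict[str, dict]:
    """Hypothetical helper: map each *.json file's stem to its parsed schema."""
    schemas = {}
    for path in sorted(folder.glob("*.json")):
        with path.open() as f:
            schemas[path.stem] = json.load(f)
    return schemas


# Illustrative schema for a made-up "users" stream.
users_schema = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": ["null", "string"]},
    },
}

with tempfile.TemporaryDirectory() as tmp:
    schemas_dir = Path(tmp) / "schemas"
    schemas_dir.mkdir()
    # One file per stream: schemas/users.json describes the "users" stream.
    (schemas_dir / "users.json").write_text(json.dumps(users_schema))
    loaded = load_schemas(schemas_dir)
```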
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/base.py
- Lines: 133-148 (discover), 546-606 (build_catalog_entry), 687-705 (load_schemas_from_folder)
Signature
class Source:
    def discover(self, streams: List[str] = None) -> Catalog:
        """Discover streams and build catalog entries from schemas/ folder.

        Args:
            streams: Optional filter for specific stream names.

        Returns:
            Catalog with CatalogEntry per discovered stream.
        """

    def build_catalog_entry(
        self,
        stream_id: str,
        schema,
        bookmark_properties: List[str] = None,
        key_properties: List[str] = None,
        replication_key: str = None,
        replication_method: str = None,
        **kwargs,
    ) -> CatalogEntry:
        """Build a single catalog entry for a stream."""

    def load_schemas_from_folder(self) -> Dict:
        """Load JSON Schema files from schemas/ directory.

        Returns:
            Dict mapping stream_id to Schema object.
        """
Import
from mage_integrations.sources.base import Source
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| streams | List[str] | No | Optional filter for specific stream names |
| schemas/ directory | JSON files | Yes | JSON Schema Draft 4 files, one per stream |
Outputs
| Name | Type | Description |
|---|---|---|
| return | Catalog | Catalog object with CatalogEntry per stream |
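For orientation, the sketch below shows roughly what one stream looks like once a catalog is serialized, assuming the standard Singer catalog layout (`tap_stream_id`, `stream`, `schema`, and a `metadata` list whose empty breadcrumb holds stream-level settings). The stream, its values, and the `stream_metadata` helper are illustrative, and the exact set of fields Mage emits may differ.

```python
import json
from typing import Optional

# Illustrative catalog with a single made-up "events" stream.
catalog = {
    "streams": [
        {
            "tap_stream_id": "events",
            "stream": "events",
            "schema": {
                "type": "object",
                "properties": {
                    "event_id": {"type": "string"},
                    "created_at": {"type": "string", "format": "date-time"},
                },
            },
            "metadata": [
                {
                    # An empty breadcrumb marks stream-level metadata.
                    "breadcrumb": [],
                    "metadata": {
                        "table-key-properties": ["event_id"],
                        "forced-replication-method": "INCREMENTAL",
                        "valid-replication-keys": ["created_at"],
                    },
                }
            ],
        }
    ]
}


def stream_metadata(catalog: dict, stream_id: str) -> Optional[dict]:
    """Hypothetical helper: return the stream-level metadata for one stream."""
    for stream in catalog["streams"]:
        if stream["tap_stream_id"] != stream_id:
            continue
        for item in stream["metadata"]:
            if item["breadcrumb"] == []:
                return item["metadata"]
    return None


events_meta = stream_metadata(catalog, "events")
serialized = json.dumps(catalog, indent=2)
```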
Usage Examples
from mage_integrations.sources.base import Source, main


class MyAPISource(Source):
    def get_table_key_properties(self, stream_id):
        return {"users": ["id"], "events": ["event_id"]}.get(stream_id, [])

    def get_forced_replication_method(self, stream_id):
        if stream_id == "events":
            return "INCREMENTAL"
        return "FULL_TABLE"

    def get_valid_replication_keys(self, stream_id):
        return {"events": ["created_at"]}.get(stream_id, [])

    def load_data(self, stream, bookmarks=None, query=None, **kwargs):
        # Implement API extraction here
        yield [{"id": 1, "name": "test"}]


if __name__ == "__main__":
    main(MyAPISource)