Implementation:Mage ai Mage ai SQLSource Discover
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, SQL, Schema_Management |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool for automatic SQL database schema discovery via information_schema queries provided by the Mage integrations SQL source base class.
Description
SQLSource.discover queries the database's information_schema.columns to build a Singer Catalog. It maps SQL column types to JSON Schema types, identifies primary keys and unique constraints from COLUMN_KEY, and attaches standard Singer metadata. The companion method build_discover_query constructs the SQL query filtered by schema name and optionally by table names.
Usage
Import this when building SQL-based source connectors. Called automatically by Source.process() when the --discover flag is set. Override build_discover_query in subclasses to customize the introspection query for specific databases.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/sql/base.py
- Lines: 57-151 (discover), 235-250 (build_discover_query)
Signature
class Source(BaseSource): # SQLSource
def discover(self, streams: List[str] = None) -> Catalog:
"""Discover streams via information_schema and build catalog.
Args:
streams: Optional filter for specific table names.
Returns:
Catalog with CatalogEntry for each discovered table.
"""
def build_discover_query(self, schema: str, streams: List[str] = None) -> str:
"""Build SQL query for information_schema.columns.
Args:
schema: Database schema name.
streams: Optional table name filter.
Returns:
SQL query string.
"""
Import
from mage_integrations.sources.sql.base import Source as SQLSource
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| streams | List[str] | No | Optional filter for specific table/stream names |
| self.build_connection() | Connection | Yes | Database connection with .load(query) method |
Outputs
| Name | Type | Description |
|---|---|---|
| return | Catalog | Catalog object with CatalogEntry per table |
| CatalogEntry.schema | Schema | JSON Schema with properties typed from SQL column types |
| CatalogEntry.key_properties | List[str] | Columns with PRI or UNIQUE COLUMN_KEY |
| CatalogEntry.metadata | List[Dict] | Standard Singer metadata with inclusion flags |
Usage Examples
Basic Discovery
from mage_integrations.sources.sql.base import Source as SQLSource
class PostgresSource(SQLSource):
def build_connection(self):
return PostgresConnection(self.config)
def build_discover_query(self, schema="public", streams=None):
# Override for PostgreSQL-specific query
return super().build_discover_query(schema, streams)
# Run discovery
source = PostgresSource()
catalog = source.discover() # Queries information_schema.columns
for entry in catalog.streams:
print(entry.tap_stream_id, entry.schema.to_dict())