Implementation:Mage ai Mage ai SQLSource Discover

From Leeroopedia


Knowledge Sources
Domains: Data_Integration, SQL, Schema_Management
Last Updated: 2026-02-09 00:00 GMT

Overview

A concrete tool, provided by the Mage integrations SQL source base class, for automatically discovering SQL database schemas through information_schema queries.

Description

SQLSource.discover queries the database's information_schema.columns to build a Singer Catalog. It maps SQL column types to JSON Schema types, identifies primary keys and unique constraints from COLUMN_KEY, and attaches standard Singer metadata. The companion method build_discover_query constructs the SQL query filtered by schema name and optionally by table names.
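The mapping described above can be sketched in a few lines. This is a simplified illustration, not the Mage implementation: the type table, the helper name rows_to_streams, and the tuple layout of the rows are all assumptions made for the example. The key values 'PRI' and 'UNI' are the ones MySQL reports in COLUMN_KEY; other databases expose key information differently.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical, simplified type table; the real mapping in Mage may differ.
SQL_TO_JSON_TYPE = {
    'bigint': 'integer',
    'int': 'integer',
    'integer': 'integer',
    'smallint': 'integer',
    'decimal': 'number',
    'double': 'number',
    'float': 'number',
    'numeric': 'number',
    'bool': 'boolean',
    'boolean': 'boolean',
}


def rows_to_streams(rows: List[Tuple[str, str, str, str]]) -> Dict[str, dict]:
    """Group (table, column, data_type, column_key) rows into per-table schemas."""
    streams: Dict[str, dict] = defaultdict(
        lambda: {'properties': {}, 'key_properties': []},
    )
    for table, column, data_type, column_key in rows:
        # Any SQL type missing from the table falls back to a JSON string.
        json_type = SQL_TO_JSON_TYPE.get(data_type.lower(), 'string')
        stream = streams[table]
        stream['properties'][column] = {'type': json_type}
        # Assumption: primary and unique keys both count as key properties.
        if column_key in ('PRI', 'UNI'):
            stream['key_properties'].append(column)
    return dict(streams)


example_rows = [
    ('users', 'id', 'int', 'PRI'),
    ('users', 'email', 'varchar', 'UNI'),
    ('users', 'score', 'float', ''),
]
streams = rows_to_streams(example_rows)
```

Falling back to a string type for unknown SQL types keeps discovery from failing on vendor-specific columns, at the cost of losing type precision for those columns.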

Usage

Import this class when building SQL-based source connectors. Source.process() calls discover automatically when the --discover flag is set. Override build_discover_query in a subclass to customize the introspection query for a specific database.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/sql/base.py
  • Lines: 57-151 (discover), 235-250 (build_discover_query)

Signature

class Source(BaseSource):  # SQLSource
    def discover(self, streams: List[str] = None) -> Catalog:
        """Discover streams via information_schema and build catalog.

        Args:
            streams: Optional filter for specific table names.

        Returns:
            Catalog with CatalogEntry for each discovered table.
        """

    def build_discover_query(self, schema: str, streams: List[str] = None) -> str:
        """Build SQL query for information_schema.columns.

        Args:
            schema: Database schema name.
            streams: Optional table name filter.

        Returns:
            SQL query string.
        """

Import

from mage_integrations.sources.sql.base import Source as SQLSource

I/O Contract

Inputs

Name | Type | Required | Description
streams | List[str] | No | Optional filter for specific table/stream names
self.build_connection() | Connection | Yes | Database connection with .load(query) method

Outputs

Name | Type | Description
return | Catalog | Catalog object with CatalogEntry per table
CatalogEntry.schema | Schema | JSON Schema with properties typed from SQL column types
CatalogEntry.key_properties | List[str] | Columns with PRI or UNIQUE COLUMN_KEY
CatalogEntry.metadata | List[Dict] | Standard Singer metadata with inclusion flags
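To make the output fields above more concrete, the sketch below assembles one catalog entry as a plain dictionary in the shape used by Singer catalogs. The helper name standard_metadata_sketch and the sample users table are assumptions for illustration. The inclusion rule shown (key columns are "automatic", all others "available") follows the usual Singer convention, and the exact metadata Mage emits may contain more fields.

```python
import json
from typing import Dict, List


def standard_metadata_sketch(
    properties: Dict[str, dict],
    key_properties: List[str],
) -> List[dict]:
    """Build Singer-style metadata: one table-level entry plus one per column."""
    entries = [
        {'breadcrumb': [], 'metadata': {'table-key-properties': key_properties}},
    ]
    for column in properties:
        # Key columns are always selected; the rest are optional.
        inclusion = 'automatic' if column in key_properties else 'available'
        entries.append({
            'breadcrumb': ['properties', column],
            'metadata': {'inclusion': inclusion},
        })
    return entries


# Hypothetical sample table used only for this example.
properties = {'id': {'type': 'integer'}, 'email': {'type': 'string'}}
entry = {
    'tap_stream_id': 'users',
    'stream': 'users',
    'schema': {'type': 'object', 'properties': properties},
    'key_properties': ['id'],
    'metadata': standard_metadata_sketch(properties, ['id']),
}
catalog_json = json.dumps({'streams': [entry]}, indent=2)
```

Serializing the dictionary with json.dumps gives a document with a top-level streams list, which is the layout Singer-compatible tools read as a catalog.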

Usage Examples

Basic Discovery

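from mage_integrations.sources.sql.base import Source as SQLSource

# PostgresConnection is a placeholder for a connection class that exposes
# a .load(query) method; import or define it before running this example.


class PostgresSource(SQLSource):
    def build_connection(self):
        # Connection that discover() uses to run the introspection query
        return PostgresConnection(self.config)

    def build_discover_query(self, schema="public", streams=None):
        # Hook for a PostgreSQL-specific query; here it delegates to the base class
        return super().build_discover_query(schema, streams)


# Run discovery
source = PostgresSource()
catalog = source.discover()  # Queries information_schema.columns
for entry in catalog.streams:
    # Print each discovered table and its JSON Schema
    print(entry.tap_stream_id, entry.schema.to_dict())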

Related Pages

Implements Principle

Requires Environment
