
Implementation:Mage ai SQLSource Load Data

From Leeroopedia


Knowledge Sources
Domains Data_Integration, SQL, Pagination
Last Updated 2026-02-09 00:00 GMT

Overview

A concrete tool for paginated SQL data extraction using LIMIT/OFFSET and bookmark filtering, provided by the Mage integrations SQL source base class.

Description

SQLSource.load_data is a generator that yields batches of row dictionaries. It delegates to the private __fetch_rows method which constructs a SELECT query with selected columns, optional WHERE clauses from bookmarks and query parameters, ORDER BY on bookmark/key/unique columns, and LIMIT/OFFSET for pagination. For LOG_BASED replication, it delegates to load_data_from_logs instead.
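The query assembly described above can be sketched as follows. This is a simplified illustration of the pattern, not the actual __fetch_rows code; the function name build_paginated_query and its exact signature are hypothetical:

```python
# Hypothetical sketch of how a SELECT with bookmark filtering, ordering, and
# LIMIT/OFFSET pagination might be assembled. The real logic lives in
# mage_integrations/sources/sql/base.py (__fetch_rows) and handles quoting,
# type conversion, and dialect differences that are omitted here.
from typing import Dict, List, Optional


def build_paginated_query(
    table: str,
    columns: List[str],
    bookmarks: Optional[Dict] = None,
    order_by: Optional[List[str]] = None,
    limit: int = 1000,
    offset: int = 0,
) -> str:
    """Assemble SELECT ... WHERE ... ORDER BY ... LIMIT ... OFFSET ..."""
    parts = [f"SELECT {', '.join(columns)} FROM {table}"]
    if bookmarks:
        # Incremental sync: only fetch rows at or past the last bookmark value.
        clauses = [f"{col} >= '{val}'" for col, val in bookmarks.items()]
        parts.append("WHERE " + " AND ".join(clauses))
    if order_by:
        # Deterministic ordering is required for OFFSET pagination to be stable.
        parts.append("ORDER BY " + ", ".join(order_by))
    parts.append(f"LIMIT {limit} OFFSET {offset}")
    return " ".join(parts)
```

Successive batches are fetched by re-issuing the query with an increasing offset until a batch comes back short or empty.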

Usage

Called automatically by Source.sync_stream during the sync loop. Override in subclasses only if the database requires custom pagination (e.g., cursor-based instead of offset-based).
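The override pattern for cursor-based (keyset) pagination can be sketched as below. This is a self-contained toy, not a real Mage subclass: the class name, the in-memory row store, and the _fetch_after helper are all hypothetical; only the load_data signature mirrors the documented one.

```python
# Hypothetical sketch: replacing OFFSET pagination with keyset (cursor-based)
# pagination by overriding load_data. A real subclass would issue SQL like
#   SELECT ... WHERE key > :cursor ORDER BY key LIMIT :batch_size
# instead of filtering an in-memory list.
from typing import Dict, Generator, List, Optional


class CursorPaginatedSource:
    def __init__(self, rows: List[Dict], key: str, batch_size: int = 2):
        self._rows = sorted(rows, key=lambda r: r[key])
        self._key = key
        self._batch_size = batch_size

    def _fetch_after(self, cursor) -> List[Dict]:
        # Stand-in for the keyset query: rows strictly past the cursor value.
        matching = [
            r for r in self._rows
            if cursor is None or r[self._key] > cursor
        ]
        return matching[: self._batch_size]

    def load_data(
        self,
        stream=None,
        bookmarks: Optional[Dict] = None,
        query: Optional[Dict] = None,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        cursor = (bookmarks or {}).get(self._key)
        while True:
            batch = self._fetch_after(cursor)
            if not batch:
                break
            yield batch
            # Advance the cursor from the last row, instead of an offset.
            cursor = batch[-1][self._key]
```

Keyset pagination avoids the O(offset) scan cost of large OFFSET values and stays consistent when rows are inserted mid-sync, which is why some databases warrant this override.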

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/sql/base.py
  • Lines: 174-221 (load_data), 290-428 (__fetch_rows)

Signature

class Source(BaseSource):  # SQLSource
    def load_data(
        self,
        stream,
        bookmarks: Dict = None,
        query: Dict = None,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        """Load data from SQL source with pagination.

        Args:
            stream: CatalogEntry stream object.
            bookmarks: Current bookmark values for WHERE filtering.
            query: Additional query filters (_limit, _offset).

        Yields:
            List[Dict]: Batches of row dicts (column_name -> value).
        """

Import

from mage_integrations.sources.sql.base import Source as SQLSource

I/O Contract

Inputs

Name      | Type         | Required | Description
stream    | CatalogEntry | Yes      | Stream object with schema, key_properties, bookmark_properties
bookmarks | Dict         | No       | Bookmark values for incremental WHERE clause
query     | Dict         | No       | Additional filters; _limit and _offset for custom pagination

Outputs

Name  | Type       | Description
yield | List[Dict] | Batch of row dicts, up to fetch_limit rows per batch

Usage Examples

# Called internally by sync_stream:
for rows in source.load_data(
    stream=catalog_entry,
    bookmarks={"updated_at": "2024-01-01"},
    query={},
):
    # Each 'rows' is a list of dicts
    result = source.write_records(stream, rows)
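How the _limit and _offset query keys might influence batching can be sketched with a toy generator. This is an assumption-laden illustration: the function load_rows is hypothetical, and the exact key handling inside Mage's __fetch_rows may differ.

```python
# Hypothetical sketch: a batch generator that honors _limit (batch size) and
# _offset (starting row) overrides from the query dict, mimicking the
# documented I/O contract. Operates on an in-memory list instead of SQL.
from typing import Dict, Generator, List, Optional


def load_rows(
    all_rows: List[Dict],
    query: Optional[Dict] = None,
    fetch_limit: int = 1000,
) -> Generator[List[Dict], None, None]:
    query = query or {}
    limit = query.get("_limit", fetch_limit)  # batch-size override
    offset = query.get("_offset", 0)          # starting-row override
    while True:
        batch = all_rows[offset: offset + limit]
        if not batch:
            break
        yield batch
        offset += limit
```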

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
