Implementation: Mage AI SQLSource.load_data
| Knowledge Sources | Details |
|---|---|
| Domains | Data_Integration, SQL, Pagination |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A concrete method for paginated SQL data extraction using LIMIT/OFFSET and bookmark filtering, provided by the Mage integrations SQL source base class.
Description
SQLSource.load_data is a generator that yields batches of row dictionaries. It delegates to the private __fetch_rows method, which constructs a SELECT query with the selected columns, optional WHERE clauses derived from bookmarks and query parameters, an ORDER BY on bookmark/key/unique columns, and LIMIT/OFFSET for pagination. For LOG_BASED replication it delegates to load_data_from_logs instead.
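The query shape described above can be sketched as follows. This is an illustrative helper, not Mage code; `build_select` and its parameters are hypothetical names chosen for the example:

```python
from typing import Dict, List, Optional

def build_select(
    table: str,
    columns: List[str],
    bookmarks: Optional[Dict] = None,
    order_by: Optional[List[str]] = None,
    limit: int = 1000,
    offset: int = 0,
) -> str:
    """Assemble a paginated SELECT in the style described above."""
    sql = f"SELECT {', '.join(columns)} FROM {table}"
    if bookmarks:
        # Bookmark columns filter incrementally: only rows at or past
        # the last-seen bookmark value are fetched.
        clauses = [f"{col} >= :{col}" for col in bookmarks]
        sql += " WHERE " + " AND ".join(clauses)
    if order_by:
        # Deterministic ordering is required for stable OFFSET paging.
        sql += " ORDER BY " + ", ".join(order_by)
    sql += f" LIMIT {limit} OFFSET {offset}"
    return sql

print(build_select(
    "users",
    ["id", "updated_at"],
    bookmarks={"updated_at": "2024-01-01"},
    order_by=["updated_at", "id"],
    limit=100,
    offset=200,
))
# SELECT id, updated_at FROM users WHERE updated_at >= :updated_at
#   ORDER BY updated_at, id LIMIT 100 OFFSET 200
```

The real __fetch_rows additionally handles column quoting, type casting, and dialect differences, which this sketch omits.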
Usage
Called automatically by Source.sync_stream during the sync loop. Override it in subclasses only if the database requires custom pagination (e.g., cursor-based rather than offset-based).
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/sql/base.py
- Lines: 174-221 (load_data), 290-428 (__fetch_rows)
Signature
```python
class Source(BaseSource):  # SQLSource
    def load_data(
        self,
        stream,
        bookmarks: Dict = None,
        query: Dict = None,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        """Load data from SQL source with pagination.

        Args:
            stream: CatalogEntry stream object.
            bookmarks: Current bookmark values for WHERE filtering.
            query: Additional query filters (_limit, _offset).

        Yields:
            List[Dict]: Batches of row dicts (column_name -> value).
        """
```
Import
```python
from mage_integrations.sources.sql.base import Source as SQLSource
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| stream | CatalogEntry | Yes | Stream object with schema, key_properties, bookmark_properties |
| bookmarks | Dict | No | Bookmark values for incremental WHERE clause |
| query | Dict | No | Additional filters; _limit and _offset for custom pagination |
Outputs
| Name | Type | Description |
|---|---|---|
| yield | List[Dict] | Batch of row dicts, up to fetch_limit rows per batch |
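The batching contract above follows from an offset-advancing loop: fetch up to fetch_limit rows, yield them, and stop when a short page signals the end. A minimal sketch of that loop, with `fetch_page` as a hypothetical stand-in for the real database call:

```python
from typing import Callable, Dict, Generator, List

def paginate(
    fetch_page: Callable[[int, int], List[Dict]],
    fetch_limit: int = 3,
    start_offset: int = 0,
) -> Generator[List[Dict], None, None]:
    """Advance OFFSET by fetch_limit until a short page ends the loop."""
    offset = start_offset
    while True:
        rows = fetch_page(fetch_limit, offset)
        if rows:
            yield rows
        if len(rows) < fetch_limit:
            # A page smaller than fetch_limit means the table is exhausted.
            break
        offset += fetch_limit

# Demo: 7 rows paginated in pages of 3
data = [{"id": i} for i in range(7)]
pages = list(paginate(lambda limit, off: data[off:off + limit]))
print([len(p) for p in pages])  # → [3, 3, 1]
```

A query dict with _offset would seed start_offset, and _limit would cap the total rows fetched across all batches.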
Usage Examples
```python
# Called internally by sync_stream:
for rows in source.load_data(
    stream=catalog_entry,
    bookmarks={"updated_at": "2024-01-01"},
    query={},
):
    # Each 'rows' is a list of dicts
    result = source.write_records(stream, rows)
```