Implementation:Mage ai Mage ai Source Write Records
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, ETL, Serialization |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool, provided by the Mage integrations Source base class, for writing Singer RECORD and STATE messages to stdout during data extraction.
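For reference, the Singer specification defines RECORD and STATE messages as single-line JSON objects. The sketch below only illustrates their basic shape. The helper names are hypothetical, and the `{"bookmarks": {...}}` layout of the state value is a common Singer convention, not something confirmed for Mage.

```python
import json
import sys
from typing import Any, Dict


def format_record_message(stream: str, record: Dict[str, Any]) -> str:
    # A Singer RECORD message wraps one row together with its stream name.
    return json.dumps({"type": "RECORD", "stream": stream, "record": record})


def format_state_message(value: Dict[str, Any]) -> str:
    # A Singer STATE message carries an opaque state object, such as bookmarks.
    return json.dumps({"type": "STATE", "value": value})


if __name__ == "__main__":
    # Each message is written to stdout as one JSON line.
    sys.stdout.write(format_record_message("users", {"id": 1, "updated_at": "2024-01-01"}) + "\n")
    # Hypothetical state layout: {"bookmarks": {<stream>: {<column>: <value>}}}.
    sys.stdout.write(format_state_message({"bookmarks": {"users": {"updated_at": "2024-01-01"}}}) + "\n")
```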
Description
Source.write_records iterates through a batch of row dictionaries, filters each row to the columns selected in the stream metadata, and calls write_records (from messages.py) to emit it as a RECORD message on stdout. For INCREMENTAL and LOG_BASED streams it also tracks bookmark values. If the data is sorted (is_sorted=True), it emits a STATE message after each record; if the data is unsorted, it accumulates the maximum bookmark value (max_bookmark) so that a STATE message can be emitted once all records have been written.
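The control flow described above can be sketched as a standalone function. This is a hypothetical simplification, not Mage's code: the selected columns, bookmark properties, replication method and sort flag are passed in explicitly instead of being read from the CatalogEntry, bookmark values are assumed to be non-null and mutually comparable, the output stream is injectable for testing, and the state layout is an assumed Singer convention.

```python
import json
import sys
from typing import Any, Dict, List, Optional, TextIO


def write_records_sketch(
    stream_id: str,
    rows: List[Dict[str, Any]],
    selected_columns: List[str],
    bookmark_properties: List[str],
    replication_method: str,
    is_sorted: bool,
    out: TextIO = sys.stdout,
) -> Dict[str, Any]:
    """Hypothetical simplification of Source.write_records."""
    tracks_bookmarks = (
        replication_method in ("INCREMENTAL", "LOG_BASED") and bool(bookmark_properties)
    )
    final_record: Optional[Dict[str, Any]] = None
    max_bookmark: Optional[List[Any]] = None

    for row in rows:
        # Keep only the columns selected in the stream metadata.
        record = {col: row.get(col) for col in selected_columns}
        out.write(json.dumps({"type": "RECORD", "stream": stream_id, "record": record}) + "\n")
        final_record = record

        if not tracks_bookmarks:
            continue
        bookmark = [record.get(col) for col in bookmark_properties]
        if is_sorted:
            # Sorted input: each record advances the bookmark, so checkpoint immediately.
            state = {"bookmarks": {stream_id: dict(zip(bookmark_properties, bookmark))}}
            out.write(json.dumps({"type": "STATE", "value": state}) + "\n")
        elif max_bookmark is None or bookmark > max_bookmark:
            # Unsorted input: remember the largest bookmark; STATE is written later.
            max_bookmark = bookmark

    return {"final_record": final_record, "max_bookmark": max_bookmark}
```

The sorted branch trades extra STATE messages for finer-grained checkpoints: if extraction stops midway, the last STATE already points at the last written record. Unsorted data cannot be checkpointed per record, because a later row may carry a smaller bookmark value.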
Usage
Called automatically by Source.sync_stream for each batch yielded by load_data. Do not call it directly unless you are implementing a custom sync loop.
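Because each call only sees one batch, a custom sync loop that calls write_records directly would need to combine the max_bookmark values returned for every batch before writing a final STATE message. A minimal sketch, assuming max_bookmark is either None or a list of comparable values (Python compares lists element by element); merge_batch_bookmarks is a hypothetical helper, not part of Mage.

```python
from typing import Any, Dict, Iterable, List, Optional


def merge_batch_bookmarks(results: Iterable[Dict[str, Any]]) -> Optional[List[Any]]:
    """Hypothetical helper: largest max_bookmark across write_records results."""
    best: Optional[List[Any]] = None
    for result in results:
        candidate = result.get("max_bookmark")
        # Skip batches that produced no bookmark (e.g. empty or non-incremental).
        if candidate is not None and (best is None or candidate > best):
            best = candidate
    return best
```

In a custom loop this could be fed directly, for example `merge_batch_bookmarks(self.write_records(stream, rows) for rows in self.load_data(stream=stream, bookmarks=bookmarks, query=query))`, with the caller then building and writing the STATE message from the result.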
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/base.py
- Lines: 454-515
Signature
```python
from typing import Dict, List


class Source:
    def write_records(
        self,
        stream,
        rows: List[Dict],
        properties: Dict = None,
    ) -> Dict:
        """Write RECORD messages and track bookmarks.

        Args:
            stream: CatalogEntry stream object.
            rows: Records to write.
            properties: Optional column override.

        Returns:
            Dict with 'final_record' and 'max_bookmark'.
        """
```
Import
```python
from mage_integrations.sources.base import Source
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| stream | CatalogEntry | Yes | Stream with metadata, replication_method, bookmark_properties |
| rows | List[Dict] | Yes | Batch of record dicts from load_data |
| properties | Dict | No | Optional column dict override (bypasses metadata selection) |
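The interplay between the properties override and metadata-based selection can be pictured as follows. This sketch assumes standard Singer catalog metadata, where each column's entry has the breadcrumb ("properties", <column>) and is emitted when "selected" is true or "inclusion" is "automatic". selected_columns is a hypothetical helper, and Mage's own selection logic may differ in detail.

```python
from typing import Any, Dict, List, Optional


def selected_columns(
    metadata: List[Dict[str, Any]],
    properties: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """Hypothetical helper: choose which columns to emit for a stream."""
    if properties is not None:
        # An explicit properties dict bypasses metadata-based selection.
        return list(properties.keys())
    columns = []
    for entry in metadata:
        breadcrumb = entry.get("breadcrumb", [])
        meta = entry.get("metadata", {})
        # Column-level metadata sits under the breadcrumb ("properties", <column>);
        # the empty breadcrumb holds stream-level metadata and is skipped here.
        if len(breadcrumb) == 2 and breadcrumb[0] == "properties":
            if meta.get("selected") or meta.get("inclusion") == "automatic":
                columns.append(breadcrumb[1])
    return columns
```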
Outputs
| Name | Type | Description |
|---|---|---|
| stdout | JSON lines | RECORD messages (one per row) and STATE messages |
| return | Dict | {"final_record": Dict, "max_bookmark": List} |
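To check the stdout side of this contract in a test, captured JSON lines can be split back into records and state values. A minimal sketch with a hypothetical helper, assuming one JSON message per line as the table describes; other Singer message types such as SCHEMA are ignored.

```python
import json
from typing import Any, Dict, List, Tuple


def split_singer_output(stdout_text: str) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Hypothetical helper: separate RECORD rows and STATE values from JSON lines."""
    records: List[Dict[str, Any]] = []
    states: List[Dict[str, Any]] = []
    for line in stdout_text.splitlines():
        if not line.strip():
            continue  # tolerate blank lines in captured output
        message = json.loads(line)
        if message.get("type") == "RECORD":
            records.append(message["record"])
        elif message.get("type") == "STATE":
            states.append(message["value"])
    return records, states
```

One way to capture the output is `contextlib.redirect_stdout` with an `io.StringIO` buffer. That only works if the message writer in messages.py looks up `sys.stdout` at call time, which is an assumption worth verifying.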
Usage Examples
```python
# Internal usage within sync_stream:
for rows in self.load_data(stream=stream, bookmarks=bookmarks, query=query):
    result = self.write_records(stream, rows)
    # result["final_record"] -> last emitted record
    # result["max_bookmark"] -> max bookmark values seen
```