

Implementation:Mage ai Mage ai Source Write Records

From Leeroopedia


Knowledge Sources
Domains: Data_Integration, ETL, Serialization
Last Updated: 2026-02-09 00:00 GMT

Overview

Concrete tool, provided by the Mage integrations Source base class, for writing Singer RECORD and STATE messages to stdout during data extraction.
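Singer messages are JSON objects written one per line to stdout. The minimal sketch below is based on the public Singer specification, not on Mage's messages.py, and shows the general shape of the two message types named above. The helper names format_record and format_state, and the bookmark layout used in the example, are hypothetical.

```python
import json
import sys
from typing import Any, Dict


def format_record(stream_name: str, record: Dict[str, Any]) -> str:
    """Serialize one row as a Singer RECORD message (a single JSON line)."""
    return json.dumps({"type": "RECORD", "stream": stream_name, "record": record})


def format_state(value: Dict[str, Any]) -> str:
    """Serialize a bookmark payload as a Singer STATE message."""
    return json.dumps({"type": "STATE", "value": value})


if __name__ == "__main__":
    # Each message goes to stdout as its own line (JSON lines).
    sys.stdout.write(format_record("users", {"id": 1, "name": "Ada"}) + "\n")
    # Hypothetical bookmark layout, used here for illustration only.
    sys.stdout.write(format_state({"bookmarks": {"users": {"updated_at": "2024-01-01"}}}) + "\n")
```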

Description

Source.write_records iterates through a batch of row dictionaries, filters each row's columns to those selected in the stream metadata, and calls the write_records helper from messages.py to emit each row as a RECORD message on stdout. For INCREMENTAL and LOG_BASED streams, it also tracks bookmark values for STATE messages. If the data is sorted (is_sorted=True), a STATE message is emitted after each record; if it is unsorted, the method instead accumulates max_bookmark so that state is emitted only after all records have been written.
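The control flow described above can be sketched in plain Python. This is a simplified illustration under stated assumptions, not Mage's implementation: the stream is modeled as a hypothetical StreamInfo dataclass, bookmarks are lists of values in bookmark_properties order compared with Python's `>` operator, STATE payloads use an assumed {"bookmarks": {stream_id: {...}}} layout, the deferred STATE for unsorted data is emitted at the end of the batch for simplicity, and messages are appended to a list instead of printed so the behavior is easy to inspect.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class StreamInfo:
    """Hypothetical stand-in for a CatalogEntry stream."""
    tap_stream_id: str
    replication_method: str  # e.g. "FULL_TABLE", "INCREMENTAL", "LOG_BASED"
    bookmark_properties: List[str]
    selected_columns: List[str]
    is_sorted: bool = True


def state_message(stream: StreamInfo, bookmark: List[Any]) -> Dict[str, Any]:
    # Assumed STATE layout: {"bookmarks": {stream_id: {property: value}}}.
    values = dict(zip(stream.bookmark_properties, bookmark))
    return {"type": "STATE", "value": {"bookmarks": {stream.tap_stream_id: values}}}


def write_records_sketch(
    stream: StreamInfo, rows: List[Dict[str, Any]], out: List[Dict[str, Any]]
) -> Dict[str, Any]:
    track = stream.replication_method in ("INCREMENTAL", "LOG_BASED")
    final_record: Optional[Dict[str, Any]] = None
    max_bookmark: Optional[List[Any]] = None

    for row in rows:
        # Keep only the columns selected in the stream metadata.
        record = {c: row[c] for c in stream.selected_columns if c in row}
        out.append({"type": "RECORD", "stream": stream.tap_stream_id, "record": record})
        final_record = record

        if not track:
            continue
        # Bookmark values are read from the raw row, in bookmark_properties order.
        bookmark = [row.get(p) for p in stream.bookmark_properties]
        if stream.is_sorted:
            # Sorted input: each row's bookmark is the newest seen so far,
            # so state can be checkpointed after every record.
            max_bookmark = bookmark
            out.append(state_message(stream, bookmark))
        elif max_bookmark is None or bookmark > max_bookmark:
            # Unsorted input: only remember the largest bookmark (lists compare
            # element by element, so the values must be mutually comparable).
            max_bookmark = bookmark

    if track and not stream.is_sorted and max_bookmark is not None:
        # Single checkpoint once the whole batch has been written.
        out.append(state_message(stream, max_bookmark))

    return {"final_record": final_record, "max_bookmark": max_bookmark}
```

The split follows the usual incremental-replication reasoning: with sorted data, a per-record checkpoint lets an interrupted sync resume close to where it stopped, while with unsorted data an intermediate bookmark could exceed the values of rows not yet emitted, causing them to be skipped on resume, so checkpointing is deferred until the batch is complete.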

Usage

Called automatically by Source.sync_stream for each batch yielded by load_data. Do not call directly unless implementing a custom sync loop.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/base.py
  • Lines: 454-515

Signature

from typing import Dict, List


class Source:
    def write_records(
        self,
        stream,
        rows: List[Dict],
        properties: Dict = None,
    ) -> Dict:
        """Write RECORD messages and track bookmarks.

        Args:
            stream: CatalogEntry stream object.
            rows: Records to write.
            properties: Optional column dict that overrides metadata-based
                column selection.

        Returns:
            Dict with 'final_record' and 'max_bookmark'.
        """
        # Body omitted here; see base.py, lines 454-515.

Import

from mage_integrations.sources.base import Source

I/O Contract

Inputs

  • stream (CatalogEntry, required): stream with metadata, replication_method, and bookmark_properties.
  • rows (List[Dict], required): batch of record dicts yielded by load_data.
  • properties (Dict, optional): column dict override that bypasses metadata-based column selection.
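In the Singer catalog format, column selection is recorded in the stream's metadata list: each entry has a breadcrumb such as ["properties", "<column>"] and a metadata dict that may carry "selected" and "inclusion" flags. The sketch below shows one plausible way to resolve the column set, with the properties argument taking precedence. The function name resolve_columns and its exact rules (automatic columns always kept, unsupported columns never kept) are assumptions based on common Singer conventions, not taken from Mage's source.

```python
from typing import Any, Dict, List, Optional


def resolve_columns(
    metadata: List[Dict[str, Any]],
    properties: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """Hypothetical helper: choose the columns a RECORD message should contain."""
    if properties is not None:
        # Explicit override: use these columns and skip metadata selection.
        return list(properties.keys())

    selected: List[str] = []
    for entry in metadata:
        breadcrumb = entry.get("breadcrumb", [])
        # Column-level entries look like ["properties", "<column>"];
        # the stream-level entry has an empty breadcrumb and is skipped.
        if len(breadcrumb) != 2 or breadcrumb[0] != "properties":
            continue
        md = entry.get("metadata", {})
        inclusion = md.get("inclusion")
        if inclusion == "unsupported":
            continue
        if inclusion == "automatic" or md.get("selected"):
            selected.append(breadcrumb[1])
    return selected
```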

Outputs

  • stdout (JSON lines): one RECORD message per row, plus STATE messages.
  • return value (Dict): {"final_record": Dict, "max_bookmark": List}.
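Because the stdout output is newline-delimited JSON, a downstream process can consume it line by line. The minimal sketch below counts RECORD messages per stream and keeps the most recent STATE value; the function name read_singer_output is hypothetical, and other message types (for example SCHEMA) are simply ignored.

```python
import io
import json
from typing import Any, Dict, Iterable, Optional, Tuple


def read_singer_output(
    lines: Iterable[str],
) -> Tuple[Dict[str, int], Optional[Dict[str, Any]]]:
    """Count RECORD messages per stream and return the last STATE value."""
    counts: Dict[str, int] = {}
    last_state: Optional[Dict[str, Any]] = None
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        message = json.loads(line)
        if message.get("type") == "RECORD":
            stream = message["stream"]
            counts[stream] = counts.get(stream, 0) + 1
        elif message.get("type") == "STATE":
            # A later STATE message supersedes earlier ones.
            last_state = message["value"]
        # Other message types (e.g. SCHEMA) are ignored in this sketch.
    return counts, last_state


if __name__ == "__main__":
    sample = io.StringIO(
        '{"type": "RECORD", "stream": "orders", "record": {"id": 1}}\n'
        '{"type": "STATE", "value": {"bookmarks": {"orders": {"updated_at": 5}}}}\n'
    )
    print(read_singer_output(sample))
```

In a real pipeline the input would typically be sys.stdin, with the source's stdout piped into the consuming process.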

Usage Examples

# Internal usage within Source.sync_stream (stream, bookmarks and query are
# defined by the enclosing method):
for rows in self.load_data(stream=stream, bookmarks=bookmarks, query=query):
    result = self.write_records(stream, rows)
    # result["final_record"] -> last emitted record
    # result["max_bookmark"] -> max bookmark values seen

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
