Implementation:Mage ai Mage ai Destination Process Record

From Leeroopedia
Revision as of 15:36, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Mage_ai_Mage_ai_Destination_Process_Record.md)


Knowledge Sources
Domains Data_Integration, Data_Quality, Batch_Processing
Last Updated 2026-02-09 00:00 GMT

Overview

Concrete tool, provided by the Mage integrations Destination base class, for validating, preparing, and batch-accumulating Singer RECORD messages.

Description

Destination.process_record handles one record at a time. It validates and prepares the record via __validate_and_prepare_record, which first calls __prepare_record for type parsing and then checks the result against the stream's schema with jsonschema's Draft4Validator. The prepared record is then exported immediately via export_data. In batch mode, _process instead accumulates records in batches_by_stream and calls process_record_data, which exports the whole batch via export_batch_data.
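
The prepare-then-validate order described above can be pictured with a minimal, standard-library-only sketch. It is not the mage_integrations code: prepare_record, validate_record, and validate_and_prepare are hypothetical names, the string-to-integer coercion is only an assumed example of type parsing, and a hand-rolled type check stands in for Draft4Validator.

```python
from typing import Dict, List

# Assumed mapping from JSON Schema type names to Python types (illustrative only).
JSON_TYPES = {
    "integer": int,
    "number": (int, float),
    "string": str,
    "boolean": bool,
    "object": dict,
    "array": list,
    "null": type(None),
}


def _declared_types(schema: Dict, key: str) -> List[str]:
    """Return the declared type names for a property as a list."""
    types = schema.get("properties", {}).get(key, {}).get("type", [])
    return [types] if isinstance(types, str) else list(types)


def prepare_record(schema: Dict, record: Dict) -> Dict:
    """Type parsing stand-in: turn numeric strings into ints for integer columns."""
    prepared = {}
    for key, value in record.items():
        types = _declared_types(schema, key)
        if isinstance(value, str) and "integer" in types and "string" not in types:
            try:
                value = int(value)
            except ValueError:
                pass  # leave the value as-is; validation rejects it below
        prepared[key] = value
    return prepared


def validate_record(schema: Dict, record: Dict) -> None:
    """Validation stand-in: raise ValueError on a type mismatch."""
    for key, value in record.items():
        types = _declared_types(schema, key)
        if not types:
            continue  # no declared type, nothing to check in this sketch
        if isinstance(value, bool):
            ok = "boolean" in types  # bool is an int subclass in Python
        else:
            ok = any(isinstance(value, JSON_TYPES[t]) for t in types if t in JSON_TYPES)
        if not ok:
            raise ValueError(f"{key}={value!r} does not match type {types}")


def validate_and_prepare(schema: Dict, row: Dict) -> Dict:
    """Prepare first, then validate the prepared record."""
    record = prepare_record(schema, row["record"])
    validate_record(schema, record)
    return record
```

Preparing before validating matters in this sketch: a value such as "1" in an integer column would fail validation if the order were reversed.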

Usage

_process calls process_record automatically for each RECORD message in immediate mode; in batch mode it accumulates the records and passes them to process_record_data. Subclasses do not override process_record; they override export_batch_data instead.
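
As a rough illustration of that division of labour, the sketch below uses a hypothetical stand-in base class, MiniDestination, rather than the real Destination. The export_batch_data parameter list is assumed to mirror process_record_data in the Signature section, and the shape of each record_data item (a dict with 'record' and 'stream' keys) is likewise an assumption.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class MiniDestination(ABC):
    """Hypothetical stand-in for the Destination base class (not the real one)."""

    def process_record(
        self, stream: str, schema: Dict, row: Dict, tags: Optional[Dict] = None
    ) -> None:
        # The real method validates and prepares the record first; omitted here.
        record_data = dict(record=row["record"], stream=stream)
        self.export_data(record_data, stream, tags=tags)

    def export_data(
        self, record_data: Dict, stream: str, tags: Optional[Dict] = None
    ) -> None:
        # Immediate mode: a single record becomes a one-element batch.
        self.export_batch_data([record_data], stream, tags=tags)

    @abstractmethod
    def export_batch_data(
        self, record_data: List[Dict], stream: str, tags: Optional[Dict] = None
    ) -> None:
        """The only method a concrete destination implements in this sketch."""


class InMemoryDestination(MiniDestination):
    """Toy subclass that collects exported rows in a dict keyed by stream."""

    def __init__(self) -> None:
        self.rows_by_stream: Dict[str, List[Dict]] = {}

    def export_batch_data(
        self, record_data: List[Dict], stream: str, tags: Optional[Dict] = None
    ) -> None:
        rows = self.rows_by_stream.setdefault(stream, [])
        rows.extend(item["record"] for item in record_data)
```

Because both the immediate path and the batch path end in export_batch_data, a subclass written this way needs no knowledge of which mode is active.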

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/destinations/base.py
  • Lines: 213-237 (process_record), 239-281 (process_record_data), 706-732 (__validate_and_prepare_record), 609-689 (__prepare_record)

Signature

class Destination(ABC):
    def process_record(
        self,
        stream: str,
        schema: Dict,
        row: Dict,
        tags: Dict = None,
    ) -> None:
        """Validate and export a single record (immediate mode)."""

    def process_record_data(
        self,
        record_data: List[Dict],
        stream: str,
        tags: Dict = None,
    ) -> None:
        """Validate and export a batch of records."""

Import

from mage_integrations.destinations.base import Destination

I/O Contract

Inputs

Name    Type  Required  Description
stream  str   Yes       Stream name
schema  Dict  Yes       Registered JSON Schema
row     Dict  Yes       RECORD message with 'record' key

Outputs

Name         Type  Description
Side effect  None  Calls export_data (immediate) or export_batch_data (batch)

Usage Examples

# `destination` is assumed to be an instance of a concrete Destination subclass.
# In immediate mode, each RECORD triggers an export:
destination.process_record(
    stream="users",
    schema={"properties": {"id": {"type": ["integer"]}, "name": {"type": ["string"]}}},
    row={"record": {"id": 1, "name": "Alice"}},
)
# Internally calls: export_data -> export_batch_data with single record

# In batch mode, records accumulate:
# batches_by_stream["users"]["record_data"].append(record_data)
# Flushed when byte size > maximum_batch_size_mb
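
The batch-mode comments above can be expanded into a small, self-contained sketch. BatchAccumulator and its on_flush callback are hypothetical names; measuring a record by the length of its JSON serialization and treating one megabyte as 1024 * 1024 bytes are assumptions, since the source does not say how the byte size is computed.

```python
import json
from typing import Callable, Dict, List


class BatchAccumulator:
    """Hypothetical helper: collect records per stream, flush past a size limit."""

    def __init__(
        self,
        maximum_batch_size_mb: float,
        on_flush: Callable[[str, List[Dict]], None],
    ) -> None:
        self.max_bytes = maximum_batch_size_mb * 1024 * 1024
        self.on_flush = on_flush
        self.batches_by_stream: Dict[str, Dict] = {}

    def add(self, stream: str, record_data: Dict) -> None:
        batch = self.batches_by_stream.setdefault(
            stream, {"record_data": [], "byte_size": 0}
        )
        batch["record_data"].append(record_data)
        # Assumed size measure: UTF-8 length of the JSON-serialized record.
        batch["byte_size"] += len(json.dumps(record_data).encode("utf-8"))
        if batch["byte_size"] > self.max_bytes:
            self.flush_stream(stream)

    def flush_stream(self, stream: str) -> None:
        batch = self.batches_by_stream.pop(stream, None)
        if batch and batch["record_data"]:
            self.on_flush(stream, batch["record_data"])

    def flush_all(self) -> None:
        # Export whatever remains once the input is exhausted.
        for stream in list(self.batches_by_stream):
            self.flush_stream(stream)
```

In this sketch the on_flush callback plays the role that process_record_data plays in the description, and flush_all covers records left below the threshold when the input ends.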

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
