Implementation:Mage ai Mage ai Destination Process Record
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Data_Quality, Batch_Processing |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete method, provided by the Mage integrations Destination base class, for validating, preparing, and batch-accumulating Singer RECORD messages.
Description
Destination.process_record validates and prepares a single record via __validate_and_prepare_record (which calls __prepare_record for type parsing and then runs Draft4Validator), then exports it immediately via export_data. In batch mode, _process accumulates records in batches_by_stream and calls process_record_data to batch-export via export_batch_data.
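The validate-then-export flow above can be sketched as follows. This is an illustrative stand-in, not the actual Mage source: prepare_record and validate_record are simplified placeholders for __prepare_record and the Draft4Validator check.

```python
# Illustrative sketch (NOT the actual Mage implementation) of the
# prepare -> validate -> export flow in Destination.process_record.
from typing import Dict


def prepare_record(schema: Dict, record: Dict) -> Dict:
    """Simplified stand-in for __prepare_record: coerce values to schema types."""
    out = dict(record)
    for col, col_schema in schema.get('properties', {}).items():
        if col in out and 'integer' in col_schema.get('type', []):
            out[col] = int(out[col])
    return out


def validate_record(schema: Dict, record: Dict) -> None:
    """Simplified stand-in for Draft4Validator: check required keys exist."""
    missing = [k for k in schema.get('required', []) if k not in record]
    if missing:
        raise ValueError(f'Record missing required keys: {missing}')


def process_record_sketch(stream: str, schema: Dict, row: Dict) -> Dict:
    record = prepare_record(schema, row['record'])  # data lives under 'record'
    validate_record(schema, record)
    return record  # the real method hands the prepared record to export_data


prepared = process_record_sketch(
    stream='users',
    schema={'properties': {'id': {'type': ['integer']}}, 'required': ['id']},
    row={'record': {'id': '1'}},
)
print(prepared)  # {'id': 1}
```

Note that type preparation happens before validation, so a string "1" arriving for an integer-typed column is coerced rather than rejected.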
Usage
_process calls process_record automatically for each RECORD message in immediate mode; in batch mode, records are instead accumulated and later flushed through process_record_data. Subclasses do not override these methods; they override export_batch_data instead.
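The override contract can be shown with a minimal sketch. DestinationStub and ListDestination below are hypothetical stand-ins, not Mage classes; they only illustrate that a connector implements export_batch_data and leaves process_record alone.

```python
# Hypothetical sketch of the subclassing contract: connectors override
# export_batch_data, never process_record. Class names are illustrative.
from typing import Dict, List


class DestinationStub:
    """Minimal stand-in for the Destination base class."""

    def process_record(self, stream: str, schema: Dict, row: Dict,
                       tags: Dict = None) -> None:
        # Immediate mode: validation elided; export the single record.
        self.export_batch_data([dict(record=row['record'], stream=stream)], stream)

    def export_batch_data(self, record_data: List[Dict], stream: str) -> None:
        raise NotImplementedError  # subclasses implement the actual write


class ListDestination(DestinationStub):
    """Toy connector that 'writes' records to an in-memory list."""

    def __init__(self):
        self.exported: List[Dict] = []

    def export_batch_data(self, record_data: List[Dict], stream: str) -> None:
        self.exported.extend(record_data)


dest = ListDestination()
dest.process_record(stream='users', schema={'properties': {}},
                    row={'record': {'id': 1}})
print(dest.exported)  # [{'record': {'id': 1}, 'stream': 'users'}]
```

Because process_record funnels everything through export_batch_data, a connector gets both immediate and batch modes from one override.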
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/destinations/base.py
- Lines: 213-237 (process_record), 239-281 (process_record_data), 706-732 (__validate_and_prepare_record), 609-689 (__prepare_record)
Signature
class Destination(ABC):
    def process_record(
        self,
        stream: str,
        schema: Dict,
        row: Dict,
        tags: Dict = None,
    ) -> None:
        """Validate and export a single record (immediate mode)."""

    def process_record_data(
        self,
        record_data: List[Dict],
        stream: str,
        tags: Dict = None,
    ) -> None:
        """Validate and export a batch of records."""
Import
from mage_integrations.destinations.base import Destination
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| stream | str | Yes | Stream name |
| schema | Dict | Yes | Registered JSON Schema |
| row | Dict | Yes | RECORD message with 'record' key |
Outputs
| Name | Type | Description |
|---|---|---|
| Side effect | None | Calls export_data (immediate) or export_batch_data (batch) |
Usage Examples
# In immediate mode, each RECORD triggers export:
destination.process_record(
    stream="users",
    schema={"properties": {"id": {"type": ["integer"]}, "name": {"type": ["string"]}}},
    row={"record": {"id": 1, "name": "Alice"}},
)
# Internally calls: export_data -> export_batch_data with single record

# In batch mode, records accumulate:
# batches_by_stream["users"]["record_data"].append(record_data)
# Flushed when byte size > maximum_batch_size_mb
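The batch-mode accumulation and size-based flush can be sketched as below. The names batches_by_stream and maximum_batch_size_mb mirror the source; the flush trigger here (JSON-serialized size of the batch) is a simplified assumption, not the exact Mage bookkeeping.

```python
# Simplified sketch (assumptions noted in lead-in) of batch accumulation:
# records build up per stream and flush once a byte-size threshold is crossed.
import json
from typing import Callable, Dict, List

MAXIMUM_BATCH_SIZE_MB = 10

batches_by_stream: Dict[str, Dict[str, List]] = {}


def accumulate(stream: str, row: Dict, flush: Callable[[str, List[Dict]], None]) -> None:
    batch = batches_by_stream.setdefault(stream, {'record_data': []})
    batch['record_data'].append(dict(record=row['record'], stream=stream))
    # Approximate the batch's byte size via its JSON serialization.
    size_mb = len(json.dumps(batch['record_data']).encode()) / (1024 * 1024)
    if size_mb > MAXIMUM_BATCH_SIZE_MB:
        flush(stream, batch['record_data'])  # e.g. process_record_data
        batch['record_data'] = []


accumulate('users', {'record': {'id': 1}}, flush=lambda s, d: None)
print(len(batches_by_stream['users']['record_data']))  # 1 (below threshold)
```

Small batches stay buffered; only once the accumulated payload exceeds the configured megabyte limit does the flush callback (process_record_data in the real class) run and clear the buffer.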
Related Pages
Implements Principle
Requires Environment
- Environment:Mage_ai_Mage_ai_Python_3_9_Runtime
- Environment:Mage_ai_Mage_ai_Singer_SDK_And_Joblib_Runtime