Implementation:Mage ai Mage ai Destination Process
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, ETL, Stream_Processing |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Concrete tool for processing Singer JSON line messages from stdin with batch accumulation and size-based flushing provided by the Mage integrations Destination base class.
Description
Destination._process reads lines from stdin (via __text_input generator), parses each as JSON, routes by message type to handlers (process_schema, process_record, process_state), and in batch mode accumulates records per-stream in batches_by_stream dict. When accumulated byte size exceeds maximum_batch_size_mb (default 100MB from config), it flushes via __process_batch_set which calls process_record_data -> export_batch_data for each stream. The orchestrator Destination.process wraps _process with before_process/after_process hooks and test_connection/show_templates modes.
Usage
Called by Destination.process(input_buffer). Not called directly by subclasses.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/destinations/base.py
- Lines: 335-371 (process), 376-521 (_process), 523-596 (__process_batch_set)
Signature
class Destination(ABC):
def process(self, input_buffer) -> None:
"""Main entry point. Orchestrates test_connection, templates, or _process."""
def _process(self, input_buffer) -> None:
"""Read Singer messages from input, route to handlers, manage batches."""
Import
from mage_integrations.destinations.base import Destination
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| input_buffer | IO | Yes | stdin buffer or file input with Singer JSON lines |
Outputs
| Name | Type | Description |
|---|---|---|
| Side effects | None | Calls export_batch_data for each stream batch, emits state |
Usage Examples
import sys
from mage_integrations.destinations.base import Destination
class MyDestination(Destination):
def test_connection(self):
pass
def export_batch_data(self, record_data, stream, tags=None):
for record in record_data:
print(f"Loading record to {stream}: {record['record']}")
dest = MyDestination(config={"host": "localhost"}, batch_processing=True)
dest.process(sys.stdin.buffer)
Related Pages
Implements Principle
Requires Environment
- Environment:Mage_ai_Mage_ai_Python_3_9_Runtime
- Environment:Mage_ai_Mage_ai_Singer_SDK_And_Joblib_Runtime