Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Mage ai Mage ai Destination Process

From Leeroopedia


Knowledge Sources
Domains Data_Integration, ETL, Stream_Processing
Last Updated 2026-02-09 00:00 GMT

Overview

Concrete tool for processing Singer JSON line messages from stdin with batch accumulation and size-based flushing provided by the Mage integrations Destination base class.

Description

Destination._process reads lines from stdin (via __text_input generator), parses each as JSON, routes by message type to handlers (process_schema, process_record, process_state), and in batch mode accumulates records per-stream in batches_by_stream dict. When accumulated byte size exceeds maximum_batch_size_mb (default 100MB from config), it flushes via __process_batch_set which calls process_record_data -> export_batch_data for each stream. The orchestrator Destination.process wraps _process with before_process/after_process hooks and test_connection/show_templates modes.

Usage

Called by Destination.process(input_buffer). Not called directly by subclasses.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/destinations/base.py
  • Lines: 335-371 (process), 376-521 (_process), 523-596 (__process_batch_set)

Signature

class Destination(ABC):
    def process(self, input_buffer) -> None:
        """Main entry point. Orchestrates test_connection, templates, or _process."""

    def _process(self, input_buffer) -> None:
        """Read Singer messages from input, route to handlers, manage batches."""

Import

from mage_integrations.destinations.base import Destination

I/O Contract

Inputs

Name Type Required Description
input_buffer IO Yes stdin buffer or file input with Singer JSON lines

Outputs

Name Type Description
Side effects None Calls export_batch_data for each stream batch, emits state

Usage Examples

import sys
from mage_integrations.destinations.base import Destination

class MyDestination(Destination):
    def test_connection(self):
        pass

    def export_batch_data(self, record_data, stream, tags=None):
        for record in record_data:
            print(f"Loading record to {stream}: {record['record']}")

dest = MyDestination(config={"host": "localhost"}, batch_processing=True)
dest.process(sys.stdin.buffer)

Related Pages

Implements Principle

Requires Environment

Uses Heuristic

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment