
Implementation:Mage ai Destination Export Batch Data

From Leeroopedia


Knowledge Sources
Domains Data_Integration, ETL, Batch_Processing
Last Updated 2026-02-09 00:00 GMT

Overview

Abstract batch export method that destination connectors must implement to write validated records to their target storage system, provided by the Mage integrations Destination base class.

Description

Destination.export_batch_data is the abstract method that every Mage destination connector must implement. It receives a list of record dicts (each containing 'record', 'stream', and optionally 'schema' and 'tags') and a stream name. The implementing connector writes these records to its target system (database, warehouse, cloud storage, API). The companion process_record_data method calls export_batch_data after validating and preparing all records in the batch.
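The record-dict shape described above can be illustrated with a small sketch. The field values here are made up for illustration; only the 'record' and 'stream' keys (with 'schema' and 'tags' optional) come from the description:

```python
# Illustrative shape of one record_data entry. The 'record' and 'stream'
# keys are required; 'schema' and 'tags' may also be present.
entry = {
    'record': {'id': 1, 'email': 'a@example.com'},  # the data row itself
    'stream': 'users',                              # source stream name
}

# export_batch_data receives a list of such entries for one stream.
batch = [entry]
```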

Usage

Override this method in your destination subclass. It is called automatically by the batch processing loop.
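A minimal sketch of such an override, using a hypothetical in-memory destination and a stand-in for the base class so the snippet runs without a Mage installation (the real base class lives in mage_integrations.destinations.base):

```python
from typing import Dict, List

# Stand-in for mage_integrations.destinations.base.Destination,
# reduced to the abstract method relevant here.
class Destination:
    def export_batch_data(
        self, record_data: List[Dict], stream: str, tags: Dict = None,
    ) -> None:
        raise NotImplementedError(
            'Subclasses must implement the export_batch_data method.'
        )

class InMemoryDestination(Destination):
    """Hypothetical destination that collects records per stream."""

    def __init__(self):
        self.storage = {}

    def export_batch_data(self, record_data, stream, tags=None):
        # Each record_data entry carries the record plus its stream name;
        # append the bare records to the bucket for this stream.
        bucket = self.storage.setdefault(stream, [])
        bucket.extend(rd['record'] for rd in record_data)

dest = InMemoryDestination()
dest.export_batch_data(
    [
        {'record': {'id': 1}, 'stream': 'users'},
        {'record': {'id': 2}, 'stream': 'users'},
    ],
    stream='users',
)
```

In the real connector, the batch processing loop supplies record_data and stream, so only the override itself needs to be written.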

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/destinations/base.py
  • Lines: 192-193 (abstract), 239-281 (process_record_data caller)

Signature

class Destination(ABC):
    def export_batch_data(
        self,
        record_data: List[Dict],
        stream: str,
        tags: Dict = None,
    ) -> None:
        """Export a batch of records to the target.

        Args:
            record_data: List of dicts, each with keys:
                'record' (Dict) - the data record
                'stream' (str) - stream name
            stream: Stream name.
            tags: Optional logging tags.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(
            'Subclasses must implement the export_batch_data method.'
        )

Import

from mage_integrations.destinations.base import Destination

I/O Contract

Inputs

Name Type Required Description
record_data List[Dict] Yes List of {"record": Dict, "stream": str} entries
stream str Yes Stream name for routing to correct table/target
tags Dict No Logging tags for observability

Outputs

Name Type Description
Side effect None Records written to target storage system

Usage Examples

import psycopg2

from mage_integrations.destinations.base import Destination

class PostgresDestination(Destination):
    def test_connection(self):
        self.connection = psycopg2.connect(**self.config)

    def export_batch_data(self, record_data, stream, tags=None):
        """Write a batch of records to PostgreSQL."""
        if not record_data:
            return  # nothing to export

        records = [rd["record"] for rd in record_data]
        columns = list(records[0].keys())

        # Build a parameterized INSERT statement
        placeholders = ", ".join(["%s"] * len(columns))
        cols = ", ".join(columns)
        sql = f"INSERT INTO {stream} ({cols}) VALUES ({placeholders})"

        # Insert the whole batch in one executemany call, then commit
        with self.connection.cursor() as cursor:
            cursor.executemany(
                sql,
                [tuple(r[c] for c in columns) for r in records],
            )
        self.connection.commit()

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
