Implementation:Mage ai Mage ai Destination Export Batch Data
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, ETL, Batch_Processing |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
export_batch_data is the abstract batch export method on the Mage integrations Destination base class. Every destination connector must implement it to write validated records to its target storage system.
Description
Destination.export_batch_data is the abstract method that every Mage destination connector must implement. It receives a list of record dicts (each containing 'record', 'stream', and optionally 'schema' and 'tags') and a stream name. The implementing connector writes these records to its target system (database, warehouse, cloud storage, API). The companion process_record_data method calls export_batch_data after validating and preparing all records in the batch.
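To make the payload shape concrete, here is a minimal sketch rather than Mage's actual code. The `Destination` class below is a stand-in that declares only the method discussed here, so the snippet runs without `mage_integrations` installed. `InMemoryDestination` and its `written` attribute are hypothetical names used for illustration.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class Destination(ABC):
    """Stand-in for the Mage base class (assumption: only this method is modeled)."""

    @abstractmethod
    def export_batch_data(
        self,
        record_data: List[Dict],
        stream: str,
        tags: Optional[Dict] = None,
    ) -> None:
        ...


class InMemoryDestination(Destination):
    """Hypothetical connector that 'writes' records into a dict keyed by stream."""

    def __init__(self) -> None:
        self.written: Dict[str, List[Dict]] = {}

    def export_batch_data(self, record_data, stream, tags=None):
        # Each entry wraps the actual row under the 'record' key.
        rows = [entry["record"] for entry in record_data]
        self.written.setdefault(stream, []).extend(rows)


# Example batch in the shape described above.
batch = [
    {"record": {"id": 1, "name": "Ada"}, "stream": "users"},
    {"record": {"id": 2, "name": "Grace"}, "stream": "users"},
]

destination = InMemoryDestination()
destination.export_batch_data(batch, stream="users")
```

After the call, `destination.written["users"]` holds the two unwrapped rows in their original order.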
Usage
Override this method in your destination subclass. Connector code does not call it directly: process_record_data invokes it automatically as part of the batch processing loop.
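The calling pattern can be pictured with a simplified loop. This is an illustrative sketch only and does not reproduce Mage's process_record_data: the names `chunked`, `run_batches`, `RecordingDestination`, and the `batch_size` parameter are all assumptions made for this example.

```python
from typing import Dict, Iterable, Iterator, List


def chunked(items: Iterable[Dict], size: int) -> Iterator[List[Dict]]:
    """Yield successive lists of at most `size` items."""
    batch: List[Dict] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


class RecordingDestination:
    """Hypothetical destination that only records how it was called."""

    def __init__(self) -> None:
        self.calls = []

    def export_batch_data(self, record_data, stream, tags=None):
        self.calls.append((stream, len(record_data)))


def run_batches(destination, records, stream, batch_size=2):
    """Hypothetical driver: wrap each row, then hand one batch at a time to the connector."""
    for batch in chunked(records, batch_size):
        record_data = [{"record": row, "stream": stream} for row in batch]
        destination.export_batch_data(record_data, stream, tags={"stream": stream})


dest = RecordingDestination()
run_batches(dest, [{"id": i} for i in range(5)], "events", batch_size=2)
```

The point of the sketch is the division of labor: the loop owns batching and wrapping, while the connector only needs to know how to persist one batch.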
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/destinations/base.py
- Lines: 192-193 (abstract), 239-281 (process_record_data caller)
Signature
class Destination(ABC):
    def export_batch_data(
        self,
        record_data: List[Dict],
        stream: str,
        tags: Dict = None,
    ) -> None:
        """Export a batch of records to the target.

        Args:
            record_data: List of dicts, each with keys:
                'record' (Dict) - the data record
                'stream' (str) - stream name
            stream: Stream name.
            tags: Optional logging tags.

        Raises:
            NotImplementedError: Must be implemented by subclasses.
        """
        raise NotImplementedError(
            'Subclasses must implement the export_batch_data method.'
        )
Import
from mage_integrations.destinations.base import Destination
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| record_data | List[Dict] | Yes | List of {"record": Dict, "stream": str} entries |
| stream | str | Yes | Stream name for routing to correct table/target |
| tags | Dict | No | Logging tags for observability |
Outputs
| Name | Type | Description |
|---|---|---|
| Return value | None | Nothing is returned; as a side effect, the records are written to the target storage system |
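Because `record_data` is a list of loosely typed dicts, records in one batch may not all carry the same keys. One possible way to flatten a batch into a column list plus row tuples is sketched below. The helper name `to_rows` is hypothetical, and filling missing keys with `None` is an assumption, not documented Mage behavior.

```python
from typing import Any, Dict, List, Tuple


def to_rows(record_data: List[Dict]) -> Tuple[List[str], List[Tuple[Any, ...]]]:
    """Collect the union of record keys (first-seen order) and build aligned rows."""
    columns: List[str] = []
    for entry in record_data:
        for key in entry["record"]:
            if key not in columns:
                columns.append(key)
    # Missing keys become None so every row has the same width.
    rows = [
        tuple(entry["record"].get(column) for column in columns)
        for entry in record_data
    ]
    return columns, rows


columns, rows = to_rows([
    {"record": {"id": 1, "name": "Ada"}, "stream": "users"},
    {"record": {"id": 2, "email": "g@example.com"}, "stream": "users"},
])
```

Here `columns` is `["id", "name", "email"]` and each row tuple lines up with that order, which is the shape most bulk-insert APIs expect.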
Usage Examples
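import psycopg2
from psycopg2 import sql

from mage_integrations.destinations.base import Destination


class PostgresDestination(Destination):
    def test_connection(self):
        # Assumes self.config holds keyword arguments for psycopg2.connect.
        self.connection = psycopg2.connect(**self.config)

    def export_batch_data(self, record_data, stream, tags=None):
        """Write a batch of records to PostgreSQL."""
        records = [rd["record"] for rd in record_data]
        if not records:
            return  # Nothing to write for an empty batch.
        # Fix the column order once so every row matches the statement.
        columns = list(records[0].keys())

        # Build the INSERT statement; sql.Identifier quotes table and column names.
        query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
            sql.Identifier(stream),
            sql.SQL(", ").join(map(sql.Identifier, columns)),
            sql.SQL(", ").join(sql.Placeholder() * len(columns)),
        )

        # Execute the batch insert, then commit once for the whole batch.
        with self.connection.cursor() as cursor:
            cursor.executemany(query, [tuple(r[c] for c in columns) for r in records])
        self.connection.commit()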
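The same pattern can be tried without a PostgreSQL server by targeting an in-memory SQLite database from the Python standard library. This is a sketch under stated assumptions: `SQLiteDestination` is a hypothetical class that deliberately does not inherit from Mage's Destination so that it runs standalone, and the target table is assumed to exist already.

```python
import sqlite3


def quote_identifier(name: str) -> str:
    """Quote a table or column name for SQLite by doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


class SQLiteDestination:
    """Hypothetical connector writing batches to an existing SQLite table."""

    def __init__(self, connection: sqlite3.Connection) -> None:
        self.connection = connection

    def export_batch_data(self, record_data, stream, tags=None):
        records = [entry["record"] for entry in record_data]
        if not records:
            return  # Nothing to write for an empty batch.
        columns = list(records[0].keys())
        # Identifiers cannot be bound as parameters, so they are quoted by hand.
        statement = "INSERT INTO {} ({}) VALUES ({})".format(
            quote_identifier(stream),
            ", ".join(quote_identifier(c) for c in columns),
            ", ".join("?" for _ in columns),
        )
        # Using the connection as a context manager commits on success
        # and rolls back if an exception is raised.
        with self.connection:
            self.connection.executemany(
                statement, [tuple(r[c] for c in columns) for r in records]
            )


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE "users" (id INTEGER, name TEXT)')

dest = SQLiteDestination(conn)
dest.export_batch_data(
    [
        {"record": {"id": 1, "name": "Ada"}, "stream": "users"},
        {"record": {"id": 2, "name": "Grace"}, "stream": "users"},
    ],
    stream="users",
)
stored = conn.execute('SELECT id, name FROM "users" ORDER BY id').fetchall()
```

Wrapping the insert in a single transaction means a failure partway through the batch leaves the table unchanged, which is usually the behavior a batch export wants.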
Related Pages
Implements Principle
Requires Environment
- Environment:Mage_ai_Mage_ai_Python_3_9_Runtime
- Environment:Mage_ai_Mage_ai_Singer_SDK_And_Joblib_Runtime