Workflow:Mage ai Mage ai Building a New Destination Connector

From Leeroopedia


Knowledge Sources
Domains: Data_Engineering, ETL, Connector_Development
Last Updated: 2026-02-09 07:00 GMT

Overview

End-to-end process for creating a new Singer-compatible destination connector (target) within the Mage integrations framework, from class scaffolding through batch export implementation to state handling.

Description

This workflow guides developers through building a new data destination connector that integrates with the Mage ETL framework. Destinations consume Singer-protocol messages (SCHEMA, RECORD, STATE) and load data into target systems. The framework provides two base architectures: the Mage-native Destination base class with batch accumulation and byte-size-based draining, and the Singer SDK Target/Sink architecture with parallel drain operations and stream mapping. For SQL-based targets, the SQLSink base provides SQLAlchemy-powered schema management, table creation, and bulk INSERT/UPSERT operations. The resulting connector will support JSON schema validation, configurable batch processing, upsert operations, and state-based checkpoint recovery.
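As a rough illustration of the three message types a destination consumes, the sketch below parses one SCHEMA, one RECORD, and one STATE message and routes each by type. The message shapes follow the Singer specification; the stream name, field names, bookmark values, and the dispatch() helper are made up for this example and are not part of the Mage framework.

```python
import json

# Three sample Singer messages, one JSON object per line, as a tap would emit them.
# The stream name, fields, and bookmark values are invented for illustration.
SAMPLE_LINES = [
    '{"type": "SCHEMA", "stream": "users", "schema": {"properties": '
    '{"id": {"type": "integer"}, "name": {"type": ["null", "string"]}}}, '
    '"key_properties": ["id"]}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Ada"}}',
    '{"type": "STATE", "value": {"bookmarks": {"users": {"id": 1}}}}',
]


def dispatch(lines):
    """Route each message by its type and collect what a destination would keep."""
    schemas, records, state = {}, [], None
    for line in lines:
        message = json.loads(line)
        kind = message["type"]
        if kind == "SCHEMA":
            schemas[message["stream"]] = message
        elif kind == "RECORD":
            records.append((message["stream"], message["record"]))
        elif kind == "STATE":
            state = message["value"]
        else:
            raise ValueError(f"Unsupported Singer message type: {kind}")
    return schemas, records, state


schemas, records, state = dispatch(SAMPLE_LINES)
```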

Usage

Execute this workflow when you need to add support for a new data destination that is not already covered by existing connectors. Common triggers include:

  • You need to load data into a database, warehouse, or service that does not have an existing Mage destination connector
  • You need custom loading logic beyond what existing connectors provide (e.g., specialized upsert strategies)
  • You are contributing a new destination integration to the Mage open-source project

Execution Steps

Step 1: Architecture Selection

Choose one of the two destination architectures based on the target system type and requirements. The Mage-native Destination base provides simpler batch processing, while the Singer SDK Target/Sink architecture provides parallel drain and stream mapping capabilities. SQLSink is a SQL-specific specialization of the Sink, not a separate third architecture.

Architecture options:

  • Mage Destination base (destinations/base.py): Override export_data() and/or export_batch_data(). Best for simple targets and non-SQL destinations
  • Singer SDK Target + Sink (destinations/target.py + sink.py): Implement a Target subclass with get_sink_class() and a Sink subclass with process_record() and process_batch(). Best for complex targets needing parallel processing
  • Singer SDK SQLSink (destinations/sqlsink.py): Extends Sink with SQLAlchemy support for SQL targets. Provides automatic table creation, schema evolution, and bulk operations

Key considerations:

  • SQL-based destinations should use the SQLSink for maximum code reuse
  • API-based destinations typically use the Mage Destination base with export_batch_data()
  • The Singer SDK architecture supports parallel drain of multiple sinks (streams)

Step 2: Connector Class Implementation

Implement the destination connector class by extending the chosen base class. This involves implementing the data export methods that write records to the target system.

For Mage Destination base:

  • Override export_batch_data(record_data, stream, **kwargs) to write a batch of records
  • Optionally override export_data(stream, schema, record, **kwargs) for single-record export
  • Implement connection setup and teardown logic
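A minimal sketch of the Mage Destination approach might look like the following. To keep it runnable on its own, the Destination class here is a stand-in rather than the real base from destinations/base.py, and the connect()/close() hooks, the InMemoryDestination class, and the assumption that each item in record_data carries its row under a "record" key are all hypothetical; check the actual base class for the real hook names and payload shape.

```python
from typing import Any, Dict, List


class Destination:
    """Stand-in for the Mage base class so this sketch runs on its own."""

    def export_batch_data(
        self, record_data: List[Dict[str, Any]], stream: str, **kwargs
    ) -> None:
        raise NotImplementedError


class InMemoryDestination(Destination):
    """Hypothetical connector that 'loads' rows into a dict keyed by stream."""

    def __init__(self) -> None:
        self.tables: Dict[str, List[Dict[str, Any]]] = {}
        self.connected = False

    def connect(self) -> None:  # hypothetical setup hook
        self.connected = True

    def close(self) -> None:  # hypothetical teardown hook
        self.connected = False

    def export_batch_data(self, record_data, stream, **kwargs):
        if not self.connected:
            raise RuntimeError("connect() must be called before exporting")
        # Assumption: each item wraps the row under a "record" key.
        rows = [item["record"] for item in record_data]
        self.tables.setdefault(stream, []).extend(rows)


destination = InMemoryDestination()
destination.connect()
try:
    destination.export_batch_data(
        [{"record": {"id": 1}}, {"record": {"id": 2}}], stream="users"
    )
finally:
    destination.close()  # teardown runs even if the export raises
```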

For Singer SDK Target/Sink:

  • Create a Target subclass that returns the Sink class via get_sink_class()
  • Create a Sink subclass implementing process_record(record, context) and process_batch(context)
  • The setup() method handles connection initialization and table preparation
  • The clean_up() method handles resource release
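The Target/Sink division of labor can be sketched as follows. The Sink and Target classes below are simplified stand-ins so the example is self-contained; the stream_name argument to get_sink_class(), the use of a "records" key in the context, and the ListSink/ListTarget names are assumptions made for illustration, not the framework's confirmed signatures.

```python
from typing import Any, Dict, List, Type


class Sink:
    """Stand-in for the framework's Sink base so the sketch runs by itself."""

    def __init__(self, stream_name: str) -> None:
        self.stream_name = stream_name

    def setup(self) -> None:
        pass

    def clean_up(self) -> None:
        pass

    def process_record(self, record: Dict[str, Any], context: Dict[str, Any]) -> None:
        raise NotImplementedError

    def process_batch(self, context: Dict[str, Any]) -> None:
        raise NotImplementedError


class Target:
    """Stand-in for the framework's Target base."""

    def get_sink_class(self, stream_name: str) -> Type[Sink]:
        raise NotImplementedError


class ListSink(Sink):
    """Hypothetical sink that buffers records in the context, then 'writes' them."""

    def __init__(self, stream_name: str) -> None:
        super().__init__(stream_name)
        self.written: List[Dict[str, Any]] = []
        self.events: List[str] = []

    def setup(self) -> None:
        self.events.append("setup")  # open connections, prepare tables

    def process_record(self, record, context):
        context.setdefault("records", []).append(record)  # buffer only

    def process_batch(self, context):
        batch = context.get("records", [])
        self.written.extend(batch)  # one bulk write per drain
        self.events.append(f"batch:{len(batch)}")

    def clean_up(self) -> None:
        self.events.append("clean_up")  # release connections


class ListTarget(Target):
    def get_sink_class(self, stream_name: str) -> Type[Sink]:
        return ListSink


target = ListTarget()
sink = target.get_sink_class("users")("users")
sink.setup()
context: Dict[str, Any] = {}
for record in [{"id": 1}, {"id": 2}]:
    sink.process_record(record, context)
sink.process_batch(context)
sink.clean_up()
```

Keeping process_record() limited to buffering and doing the actual write in process_batch() is what lets a drain issue one bulk operation per batch.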

For SQLSink targets:

  • Override the connector_class property to return a SQLAlchemy connector
  • Configure table creation and schema migration behavior
  • Implement upsert logic if needed via bulk_insert_records() or merge_upsert_records()

Step 3: Schema Handling and Table Management

Implement logic to translate incoming Singer schemas into target system structures. For SQL targets, this means creating or altering tables. For API targets, this means mapping fields to the target format.

What happens:

  • SCHEMA messages trigger the schema registration process
  • For SQL targets: DDL statements create tables if they do not exist, or alter them to add new columns
  • Data type mapping converts JSON Schema types to target-native types
  • Key properties determine primary keys for upsert operations
  • The unique_conflict_method setting controls how duplicates are handled (UPDATE, IGNORE, etc.)
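The schema-handling steps above can be sketched with Python's built-in sqlite3 module standing in for a real target. The type mapping, the sync_table() and write_rows() helpers, and the translation of unique_conflict_method into SQLite's INSERT OR REPLACE / INSERT OR IGNORE are assumptions chosen for this example; the framework's SQLSink uses SQLAlchemy and its own method names. Note that INSERT OR REPLACE deletes and re-inserts the conflicting row, which is a coarser form of update than a column-level merge.

```python
import sqlite3

# Assumed mapping from JSON Schema types to SQLite column types.
TYPE_MAP = {"integer": "INTEGER", "number": "REAL", "boolean": "INTEGER", "string": "TEXT"}


def column_type(prop):
    """Pick the first non-null JSON Schema type and map it to a SQL type."""
    types = prop.get("type", "string")
    if isinstance(types, str):
        types = [types]
    non_null = [t for t in types if t != "null"]
    return TYPE_MAP.get(non_null[0] if non_null else "string", "TEXT")


def sync_table(conn, table, schema, key_properties):
    """Create the table if missing, otherwise add any columns it lacks."""
    columns = {name: column_type(prop) for name, prop in schema["properties"].items()}
    existing = [row[1] for row in conn.execute(f'PRAGMA table_info("{table}")')]
    if not existing:
        defs = [f'"{name}" {sql_type}' for name, sql_type in columns.items()]
        if key_properties:
            keys = ", ".join(f'"{key}"' for key in key_properties)
            defs.append(f"PRIMARY KEY ({keys})")
        conn.execute(f'CREATE TABLE "{table}" ({", ".join(defs)})')
    else:
        for name, sql_type in columns.items():
            if name not in existing:
                conn.execute(f'ALTER TABLE "{table}" ADD COLUMN "{name}" {sql_type}')


def write_rows(conn, table, rows, unique_conflict_method="UPDATE"):
    """Bulk-write rows; duplicates on the primary key are replaced or ignored."""
    verb = {"UPDATE": "INSERT OR REPLACE", "IGNORE": "INSERT OR IGNORE"}[unique_conflict_method]
    names = list(rows[0])
    cols = ", ".join(f'"{name}"' for name in names)
    marks = ", ".join("?" for _ in names)
    conn.executemany(
        f'{verb} INTO "{table}" ({cols}) VALUES ({marks})',
        [tuple(row[name] for name in names) for row in rows],
    )


conn = sqlite3.connect(":memory:")
schema_v1 = {"properties": {"id": {"type": "integer"}, "name": {"type": ["null", "string"]}}}
sync_table(conn, "users", schema_v1, ["id"])
write_rows(conn, "users", [{"id": 1, "name": "Ada"}])

# A later SCHEMA message adds a column; the table is altered, not recreated.
schema_v2 = {"properties": {**schema_v1["properties"], "active": {"type": "boolean"}}}
sync_table(conn, "users", schema_v2, ["id"])
write_rows(conn, "users", [{"id": 1, "name": "Ada L.", "active": True}], "UPDATE")
write_rows(conn, "users", [{"id": 1, "name": "ignored", "active": False}], "IGNORE")

result = conn.execute("SELECT id, name, active FROM users").fetchall()
```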

Step 4: Batch Processing and Export Logic

Implement the core data export logic that writes accumulated records to the target system. This includes batch optimization, error handling, and retry logic.

Key considerations:

  • Batch sizes are configurable: the Mage base uses byte-size limits (default 100 MB), the Singer SDK Sink uses record count (default 10000)
  • Implement efficient bulk operations (multi-row INSERT, COPY, bulk API calls) rather than single-record writes
  • Handle transient errors with retry logic and proper error reporting
  • Track record counts for monitoring (tally_record_read, tally_record_written, tally_duplicate_merged)
  • For the Singer SDK architecture, process_batch() is called during drain operations
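Two of the considerations above, byte-size-based batching and retrying transient failures, can be sketched in isolation. The helper names batch_by_bytes() and with_retries(), the tiny 20-byte limit, and the use of ConnectionError as the "transient" error class are assumptions for this example; a real connector would size batches in megabytes and catch whatever exceptions its client library raises.

```python
import json
import time


def batch_by_bytes(records, max_bytes):
    """Yield lists of records whose serialized size stays within max_bytes."""
    batch, size = [], 0
    for record in records:
        record_size = len(json.dumps(record).encode("utf-8"))
        # Flush before adding a record that would push the batch over the limit.
        if batch and size + record_size > max_bytes:
            yield batch
            batch, size = [], 0
        batch.append(record)
        size += record_size
    if batch:
        yield batch


def with_retries(operation, attempts=3, base_delay=0.0):
    """Run operation, retrying ConnectionError with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except ConnectionError:
            if attempt == attempts:
                raise  # out of attempts: surface the error to the caller
            time.sleep(base_delay * 2 ** (attempt - 1))


records = [{"id": i} for i in range(5)]  # each serializes to 9 bytes
written, calls = [], {"count": 0}


def flaky_write(batch):
    """Simulated bulk write that fails once, then succeeds."""
    calls["count"] += 1
    if calls["count"] == 1:
        raise ConnectionError("simulated transient failure")
    written.extend(batch)


batches = list(batch_by_bytes(records, max_bytes=20))
for batch in batches:
    with_retries(lambda: flaky_write(batch))
```

A single record larger than max_bytes still goes out as its own batch here, since dropping it silently would lose data.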

Step 5: State and Checkpoint Handling

Implement state management to support checkpoint recovery. The destination captures STATE messages from the source and emits them after successful batch processing.

What happens:

  • STATE messages are captured during message processing
  • After each successful batch drain, the current state is emitted
  • State is written to a file (state_file_path) or stdout
  • Bookmarks are merged from multiple state messages during batch accumulation
  • State persistence enables the source to resume from the last successfully loaded checkpoint
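The bookmark merging and state emission described above might be sketched like this. The merge_state() and emit_state() helpers, the sample bookmark values, and the choice to write the state as a single JSON line are assumptions for illustration; only state_file_path is a name taken from the workflow itself.

```python
import io
import json
import sys


def merge_state(current, incoming):
    """Merge two state values; later bookmarks override earlier ones per stream."""
    merged = {**current, **incoming}
    merged["bookmarks"] = {
        **current.get("bookmarks", {}),
        **incoming.get("bookmarks", {}),
    }
    return merged


def emit_state(state, state_file_path=None, out=sys.stdout):
    """Write the state as one JSON line to a file if a path is given, else to out."""
    line = json.dumps(state) + "\n"
    if state_file_path:
        with open(state_file_path, "w", encoding="utf-8") as handle:
            handle.write(line)
    else:
        out.write(line)
        out.flush()


# STATE values seen while one batch was accumulating (sample data).
incoming_states = [
    {"bookmarks": {"users": {"id": 2}}},
    {"bookmarks": {"orders": {"id": 7}}},
    {"bookmarks": {"users": {"id": 5}}},
]

pending = {}
for value in incoming_states:
    pending = merge_state(pending, value)

# Emit only after the batch has been written successfully.
buffer = io.StringIO()
emit_state(pending, out=buffer)
```

Emitting after the write, not before, is what makes the checkpoint safe: a crash mid-batch leaves the previous state in place, so the source re-sends the unconfirmed records.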

Step 6: Testing and Validation

Test the new destination through the complete target lifecycle: configuration validation, schema processing, record loading, batch draining, and state emission.

Key considerations:

  • Verify that SCHEMA messages correctly create or update target structures
  • Test batch loading with various batch sizes and record counts
  • Test upsert behavior with duplicate records based on key properties
  • Verify state emission occurs after successful batch processing
  • Test error handling for connection failures, schema mismatches, and invalid data
  • Validate that the destination correctly handles all Singer message types
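One way to check the state-emission rule is a small unittest case against a test double. RecordingDestination below is a hypothetical stand-in, not a Mage class; in a real test suite the same assertions would be made against the connector under test with its target system mocked or pointed at a disposable instance.

```python
import json
import unittest


class RecordingDestination:
    """Hypothetical test double: buffers records, drains once, then emits state."""

    def __init__(self, fail_on_write=False):
        self.fail_on_write = fail_on_write
        self.rows, self.emitted_states = [], []
        self._pending, self._state = [], None

    def process(self, lines):
        for line in lines:
            message = json.loads(line)
            if message["type"] == "RECORD":
                self._pending.append(message["record"])
            elif message["type"] == "STATE":
                self._state = message["value"]
        self.drain()

    def drain(self):
        if self.fail_on_write:
            raise ConnectionError("simulated write failure")
        self.rows.extend(self._pending)
        self._pending = []
        if self._state is not None:
            self.emitted_states.append(self._state)  # only after a good write


LINES = [
    '{"type": "RECORD", "stream": "users", "record": {"id": 1}}',
    '{"type": "STATE", "value": {"bookmarks": {"users": {"id": 1}}}}',
]


class DestinationLifecycleTest(unittest.TestCase):
    def test_state_emitted_after_successful_drain(self):
        destination = RecordingDestination()
        destination.process(LINES)
        self.assertEqual(destination.rows, [{"id": 1}])
        self.assertEqual(
            destination.emitted_states, [{"bookmarks": {"users": {"id": 1}}}]
        )

    def test_no_state_when_drain_fails(self):
        destination = RecordingDestination(fail_on_write=True)
        with self.assertRaises(ConnectionError):
            destination.process(LINES)
        self.assertEqual(destination.emitted_states, [])


suite = unittest.defaultTestLoader.loadTestsFromTestCase(DestinationLifecycleTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```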
