Workflow: Mage ai Building a New Destination Connector
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, ETL, Connector_Development |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
End-to-end process for creating a new Singer-compatible destination connector (target) within the Mage integrations framework, from class scaffolding through batch export implementation to state handling.
Description
This workflow guides developers through building a new data destination connector that integrates with the Mage ETL framework. Destinations consume Singer-protocol messages (SCHEMA, RECORD, STATE) and load data into target systems. The framework provides two base architectures: the Mage-native Destination base class with batch accumulation and byte-size-based draining, and the Singer SDK Target/Sink architecture with parallel drain operations and stream mapping. For SQL-based targets, the SQLSink base provides SQLAlchemy-powered schema management, table creation, and bulk INSERT/UPSERT operations. The resulting connector will support JSON schema validation, configurable batch processing, upsert operations, and state-based checkpoint recovery.
Usage
Execute this workflow when you need to add support for a new data destination that is not already covered by existing connectors. Common triggers include:
- You need to load data into a database, warehouse, or service that does not have an existing Mage destination connector
- You need custom loading logic beyond what existing connectors provide (e.g., specialized upsert strategies)
- You are contributing a new destination integration to the Mage open-source project
Execution Steps
Step 1: Architecture Selection
Choose between the two available destination architectures based on the target system type and requirements. The Mage-native Destination base provides simpler batch processing, while the Singer SDK Target/Sink architecture provides parallel drain and stream mapping capabilities.
Architecture options:
- Mage Destination base (destinations/base.py): Override export_data() and/or export_batch_data(). Best for simple targets and non-SQL destinations.
- Singer SDK Target + Sink (destinations/target.py + sink.py): Implement a Target subclass with get_sink_class() and a Sink subclass with process_record() and process_batch(). Best for complex targets needing parallel processing.
- Singer SDK SQLSink (destinations/sqlsink.py): Extends Sink with SQLAlchemy support for SQL targets. Provides automatic table creation, schema evolution, and bulk operations.
Key considerations:
- SQL-based destinations should use the SQLSink for maximum code reuse
- API-based destinations typically use the Mage Destination base with export_batch_data()
- The Singer SDK architecture supports parallel drain of multiple sinks (streams)
Step 2: Connector Class Implementation
Implement the destination connector class by extending the chosen base class. This involves implementing the data export methods that write records to the target system.
For Mage Destination base:
- Override export_batch_data(record_data, stream, **kwargs) to write a batch of records
- Optionally override export_data(stream, schema, record, **kwargs) for single-record export
- Implement connection setup and teardown logic
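The override pattern for the Mage Destination base can be sketched as follows. This is a minimal illustration, not the framework's implementation: `DestinationStub` is a hypothetical stand-in for the real base class in destinations/base.py so the example runs on its own, `connect()` and `close()` are illustrative names for setup and teardown, and the assumption that each batch item wraps its row under a `"record"` key should be checked against the actual base class.

```python
from typing import Any, Dict, List


class DestinationStub:
    """Hypothetical stand-in for the Mage Destination base class."""

    def export_batch_data(
        self, record_data: List[Dict[str, Any]], stream: str, **kwargs
    ) -> None:
        raise NotImplementedError


class InMemoryDestination(DestinationStub):
    """Example destination that 'loads' rows into a dict keyed by stream."""

    def __init__(self) -> None:
        self.tables: Dict[str, List[Dict[str, Any]]] = {}
        self.connected = False

    def connect(self) -> None:
        # Connection setup; a real connector would open a client or session here.
        self.connected = True

    def close(self) -> None:
        # Teardown; a real connector would release the client or session here.
        self.connected = False

    def export_batch_data(
        self, record_data: List[Dict[str, Any]], stream: str, **kwargs
    ) -> None:
        if not self.connected:
            raise RuntimeError("connect() must be called before exporting")
        # Assumption: each batch item wraps the row under a "record" key.
        rows = [item["record"] for item in record_data]
        self.tables.setdefault(stream, []).extend(rows)


destination = InMemoryDestination()
destination.connect()
destination.export_batch_data(
    [{"record": {"id": 1}}, {"record": {"id": 2}}],
    stream="users",
)
destination.close()
```

Writing the whole batch in one call keeps the connector compatible with bulk APIs; a per-record export_data() override is only worth adding when the target has no bulk endpoint.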
For Singer SDK Target/Sink:
- Create a Target subclass that returns the Sink class via get_sink_class()
- Create a Sink subclass implementing process_record(record, context) and process_batch(context)
- The setup() method handles connection initialization and table preparation
- The clean_up() method handles resource release
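The Target/Sink split can be sketched as below. `SinkStub`, `ListSink`, and `ListTarget` are hypothetical classes written for this example; the real base classes live in destinations/target.py and sink.py, and their constructors and the contents of `context` may differ. Storing pending rows under `context["records"]` is an assumption made for the sketch.

```python
from typing import Any, Dict, List, Type


class SinkStub:
    """Hypothetical stand-in for the Sink base class."""

    def __init__(self, stream_name: str) -> None:
        self.stream_name = stream_name

    def setup(self) -> None: ...

    def clean_up(self) -> None: ...


class ListSink(SinkStub):
    """Example sink that writes drained batches to an in-memory list."""

    def __init__(self, stream_name: str) -> None:
        super().__init__(stream_name)
        self.ready = False
        self.written: List[Dict[str, Any]] = []

    def setup(self) -> None:
        # Connection initialization and table preparation would happen here.
        self.ready = True

    def process_record(self, record: Dict[str, Any], context: Dict[str, Any]) -> None:
        # Accumulate the record; nothing is written to the target yet.
        context.setdefault("records", []).append(record)

    def process_batch(self, context: Dict[str, Any]) -> None:
        # Called on drain: write all accumulated records in one operation.
        self.written.extend(context.get("records", []))
        context["records"] = []

    def clean_up(self) -> None:
        # Resource release would happen here.
        self.ready = False


class ListTarget:
    """Example target that maps every stream to ListSink."""

    def get_sink_class(self, stream_name: str) -> Type[ListSink]:
        return ListSink


target = ListTarget()
sink = target.get_sink_class("orders")("orders")
sink.setup()
context: Dict[str, Any] = {}
for row in [{"id": 1}, {"id": 2}]:
    sink.process_record(row, context)
sink.process_batch(context)
sink.clean_up()
```

Because each stream gets its own sink instance, the framework can drain several sinks independently, which is what makes parallel drain possible.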
For SQLSink targets:
- Override the connector_class property to return a SQLAlchemy connector
- Configure table creation and schema migration behavior
- Implement upsert logic if needed via bulk_insert_records() or merge_upsert_records()
Step 3: Schema Handling and Table Management
Implement logic to translate incoming Singer schemas into target system structures. For SQL targets, this means creating or altering tables. For API targets, this means mapping fields to the target format.
What happens:
- SCHEMA messages trigger the schema registration process
- For SQL targets: DDL statements create tables if they do not exist, or alter them to add new columns
- Data type mapping converts JSON Schema types to target-native types
- Key properties determine primary keys for upsert operations
- The unique_conflict_method setting controls how duplicates are handled (UPDATE, IGNORE, etc.)
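For a SQL target, the schema-to-table translation and the conflict handling can be sketched with the standard-library `sqlite3` module. This is an illustration under stated assumptions, not SQLSink's actual behavior: the type mapping `JSON_TO_SQL`, the helper names, and the treatment of any non-`UPDATE` value of `unique_conflict_method` as "ignore" are all choices made for this example. It relies on SQLite's `ON CONFLICT` upsert clause (SQLite 3.24 or newer).

```python
import sqlite3
from typing import Any, Dict, List

# Assumed mapping from JSON Schema types to SQLite column types.
JSON_TO_SQL = {
    "integer": "INTEGER",
    "number": "REAL",
    "boolean": "INTEGER",
    "string": "TEXT",
}


def column_type(prop: Dict[str, Any]) -> str:
    """Pick a column type from a JSON Schema property, ignoring 'null'."""
    types = prop.get("type", "string")
    if isinstance(types, str):
        types = [types]
    non_null = [t for t in types if t != "null"]
    return JSON_TO_SQL.get(non_null[0] if non_null else "string", "TEXT")


def create_table_sql(table: str, schema: Dict[str, Any], key_properties: List[str]) -> str:
    """Build CREATE TABLE IF NOT EXISTS with key properties as the primary key."""
    cols = [f'"{name}" {column_type(prop)}' for name, prop in schema["properties"].items()]
    if key_properties:
        keys = ", ".join(f'"{k}"' for k in key_properties)
        cols.append(f"PRIMARY KEY ({keys})")
    return f'CREATE TABLE IF NOT EXISTS "{table}" ({", ".join(cols)})'


def upsert_sql(
    table: str,
    columns: List[str],
    key_properties: List[str],
    unique_conflict_method: str = "UPDATE",
) -> str:
    """Build an INSERT that updates or ignores rows with duplicate keys."""
    col_list = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    keys = ", ".join(f'"{k}"' for k in key_properties)
    sql = f'INSERT INTO "{table}" ({col_list}) VALUES ({placeholders}) ON CONFLICT ({keys}) '
    non_keys = [c for c in columns if c not in key_properties]
    if unique_conflict_method == "UPDATE" and non_keys:
        updates = ", ".join(f'"{c}" = excluded."{c}"' for c in non_keys)
        return sql + f"DO UPDATE SET {updates}"
    return sql + "DO NOTHING"


schema = {"properties": {"id": {"type": "integer"}, "name": {"type": ["null", "string"]}}}
conn = sqlite3.connect(":memory:")
conn.execute(create_table_sql("users", schema, ["id"]))
statement = upsert_sql("users", ["id", "name"], ["id"])
# The third row repeats key 1, so it updates the first row instead of failing.
conn.executemany(statement, [(1, "Ada"), (2, "Grace"), (1, "Ada L.")])
rows = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
```

Adding columns for schema evolution would follow the same pattern: compare the incoming properties with the existing columns and emit `ALTER TABLE ... ADD COLUMN` for the difference.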
Step 4: Batch Processing and Export Logic
Implement the core data export logic that writes accumulated records to the target system. This includes batch optimization, error handling, and retry logic.
Key considerations:
- Batch sizes are configurable: the Mage base drains on a byte-size limit (default 100 MB), while the Singer SDK Sink drains on a record count (default 10,000)
- Implement efficient bulk operations (multi-row INSERT, COPY, bulk API calls) rather than single-record writes
- Handle transient errors with retry logic and proper error reporting
- Track record counts for monitoring (tally_record_read, tally_record_written, tally_duplicate_merged)
- For the Singer SDK architecture, process_batch() is called during drain operations
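The batching and retry considerations above can be sketched as a small buffer plus a retry wrapper. `BatchBuffer` and `with_retry` are hypothetical helpers written for this example, not framework classes. For brevity the buffer checks both a byte limit and a record limit, whereas each real architecture uses only one of them, and treating `ConnectionError` as the only transient error is an assumption.

```python
import json
import time
from typing import Any, Callable, Dict, List


class BatchBuffer:
    """Accumulates records and drains when a byte or record limit is reached."""

    def __init__(
        self,
        flush: Callable[[List[Dict[str, Any]]], None],
        max_bytes: int = 100 * 1024 * 1024,
        max_records: int = 10000,
    ) -> None:
        self.flush = flush
        self.max_bytes = max_bytes
        self.max_records = max_records
        self.records: List[Dict[str, Any]] = []
        self.size_bytes = 0

    def add(self, record: Dict[str, Any]) -> None:
        self.records.append(record)
        # Approximate the record's size by its JSON encoding.
        self.size_bytes += len(json.dumps(record).encode("utf-8"))
        if self.size_bytes >= self.max_bytes or len(self.records) >= self.max_records:
            self.drain()

    def drain(self) -> None:
        if not self.records:
            return
        # Flush first; the buffer is cleared only if the write succeeds.
        self.flush(self.records)
        self.records = []
        self.size_bytes = 0


def with_retry(func: Callable, attempts: int = 3, base_delay: float = 0.01) -> Callable:
    """Retry func on ConnectionError with exponential backoff."""

    def wrapper(*args, **kwargs):
        for attempt in range(1, attempts + 1):
            try:
                return func(*args, **kwargs)
            except ConnectionError:
                if attempt == attempts:
                    raise
                time.sleep(base_delay * 2 ** (attempt - 1))

    return wrapper


batches: List[List[Dict[str, Any]]] = []
calls = {"n": 0}


def flaky_write(batch: List[Dict[str, Any]]) -> None:
    # Simulated bulk write that fails once, then succeeds.
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionError("transient failure")
    batches.append(list(batch))


buffer = BatchBuffer(with_retry(flaky_write), max_records=2)
for i in range(5):
    buffer.add({"id": i})
buffer.drain()  # flush the final partial batch
```

Clearing the buffer only after a successful flush means a batch that exhausts its retries is still available for error reporting rather than silently dropped.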
Step 5: State and Checkpoint Handling
Implement state management to support checkpoint recovery. The destination captures STATE messages from the source and emits them after successful batch processing.
What happens:
- STATE messages are captured during message processing
- After each successful batch drain, the current state is emitted
- State is written to a file (state_file_path) or stdout
- Bookmarks are merged from multiple state messages during batch accumulation
- State persistence enables the source to resume from the last successfully loaded checkpoint
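The capture, merge, and emit sequence can be sketched as follows. `merge_state` and `StateTracker` are hypothetical names for this example, and the per-stream merge rule (later bookmark values override earlier ones) is an assumption about how bookmarks combine. Emitting only the state value as one JSON line follows the common Singer target convention; the framework's exact output format should be confirmed against its source.

```python
import io
import json
from typing import Any, Dict, TextIO


def merge_state(current: Dict[str, Any], incoming: Dict[str, Any]) -> Dict[str, Any]:
    """Merge two state values, combining bookmarks stream by stream."""
    merged = dict(current)
    merged.update({k: v for k, v in incoming.items() if k != "bookmarks"})
    bookmarks = dict(current.get("bookmarks", {}))
    for stream, bookmark in incoming.get("bookmarks", {}).items():
        bookmarks[stream] = {**bookmarks.get(stream, {}), **bookmark}
    if bookmarks:
        merged["bookmarks"] = bookmarks
    return merged


class StateTracker:
    """Holds the latest merged state until a batch has been loaded."""

    def __init__(self, out: TextIO) -> None:
        self.out = out
        self.pending: Dict[str, Any] = {}

    def capture(self, message: Dict[str, Any]) -> None:
        # Called for every STATE message seen while records accumulate.
        self.pending = merge_state(self.pending, message["value"])

    def emit(self) -> None:
        # Called only after a batch drain has succeeded.
        if self.pending:
            self.out.write(json.dumps(self.pending) + "\n")
            self.out.flush()


out = io.StringIO()  # stands in for stdout or a state file
tracker = StateTracker(out)
tracker.capture({"type": "STATE", "value": {"bookmarks": {"users": {"updated_at": "2024-01-01"}}}})
tracker.capture({"type": "STATE", "value": {"bookmarks": {"orders": {"id": 42}}}})
# ... the batch is written to the target here ...
tracker.emit()
emitted = json.loads(out.getvalue())
```

Emitting before the batch is written would let the source skip records that never reached the target, so the ordering of drain and emit is the part that matters most.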
Step 6: Testing and Validation
Test the new destination through the complete target lifecycle: configuration validation, schema processing, record loading, batch draining, and state emission.
Key considerations:
- Verify that SCHEMA messages correctly create or update target structures
- Test batch loading with various batch sizes and record counts
- Test upsert behavior with duplicate records based on key properties
- Verify state emission occurs after successful batch processing
- Test error handling for connection failures, schema mismatches, and invalid data
- Validate that the destination correctly handles all Singer message types
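A lifecycle test along these lines might look like the sketch below. `FakeTarget` is a hypothetical in-memory target that exists only to show the shape of the test; a real test would drive the actual connector with the same message sequence and assert against the target system. The message fields used (`stream`, `key_properties`, `record`, `value`) follow the Singer message format.

```python
import json
from typing import Any, Dict, List, Optional, Tuple


class FakeTarget:
    """Hypothetical in-memory target used to illustrate a lifecycle test."""

    def __init__(self, batch_size: int = 2) -> None:
        self.batch_size = batch_size
        self.schemas: Dict[str, Dict[str, Any]] = {}
        self.buffer: List[Dict[str, Any]] = []
        self.loaded: Dict[str, Dict[Tuple, Dict[str, Any]]] = {}
        self.last_state: Optional[Dict[str, Any]] = None
        self.emitted_states: List[Dict[str, Any]] = []

    def process_line(self, line: str) -> None:
        message = json.loads(line)
        kind = message["type"]
        if kind == "SCHEMA":
            self.schemas[message["stream"]] = message
        elif kind == "RECORD":
            if message["stream"] not in self.schemas:
                raise ValueError("RECORD received before SCHEMA")
            self.buffer.append(message)
            if len(self.buffer) >= self.batch_size:
                self.drain()
        elif kind == "STATE":
            self.last_state = message["value"]
        else:
            raise ValueError(f"unknown message type: {kind}")

    def drain(self) -> None:
        for message in self.buffer:
            keys = self.schemas[message["stream"]]["key_properties"]
            key = tuple(message["record"][k] for k in keys)
            # Upsert: a later record with the same key replaces the earlier one.
            self.loaded.setdefault(message["stream"], {})[key] = message["record"]
        self.buffer = []
        # State is emitted only after the batch has been loaded.
        if self.last_state is not None:
            self.emitted_states.append(self.last_state)


def run_lifecycle() -> FakeTarget:
    state = {"bookmarks": {"users": {"id": 1}}}
    messages = [
        {"type": "SCHEMA", "stream": "users", "key_properties": ["id"],
         "schema": {"properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}},
        {"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "a"}},
        {"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "b"}},
        {"type": "STATE", "value": state},
        {"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "c"}},
    ]
    target = FakeTarget(batch_size=2)
    for message in messages:
        target.process_line(json.dumps(message))
    target.drain()  # flush the final partial batch
    return target


target = run_lifecycle()
```

The sequence deliberately includes a duplicate key and a STATE message between batches, so a single run checks upsert behavior, partial-batch draining, and state emission ordering.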