Workflow:Mage ai Mage ai Destination Data Loading

From Leeroopedia
Revision as of 11:05, 16 February 2026 by Admin (talk | contribs) (Auto-imported from workflows/Mage_ai_Mage_ai_Destination_Data_Loading.md)


Knowledge Sources
Domains: Data_Engineering, ETL, Data_Loading
Last Updated: 2026-02-09 07:00 GMT

Overview

End-to-end process for loading data into target systems (databases, data warehouses, cloud storage) by consuming Singer-protocol messages with batch processing, schema validation, and state management.

Description

This workflow covers the complete lifecycle of a Singer-based destination (target) connector in the Mage integrations framework. Destinations consume Singer-protocol messages (SCHEMA, RECORD, STATE) from a source tap and load the data into a target system. The framework provides two destination architectures: a Mage-native Destination base class with batch accumulation and byte-size limits, and a Singer SDK Target/Sink architecture with parallel drain operations and stream mapping. Both support JSON schema validation, configurable batch sizes, upsert operations, and state persistence for checkpoint recovery.

Usage

Execute this workflow when you need to load extracted data into a target system. Common triggers include:

  • You have Singer-formatted data (from a source tap) that needs to be loaded into a database or warehouse
  • You need batch-optimized loading with configurable batch sizes for performance
  • You need schema validation to ensure data quality before loading
  • You need upsert capability to handle duplicate records based on key properties
  • You need state persistence to enable checkpoint recovery for long-running loads

Execution Steps

Step 1: Target Configuration

Prepare the destination connector configuration including target system credentials, connection parameters, and processing options. Configuration includes batch processing settings, schema handling preferences, and target-specific parameters.

Key considerations:

  • Configuration is loaded from a JSON file or YAML settings dictionary
  • Batch processing can be enabled with a configurable maximum batch size (default 100 MB)
  • The target validates its connection before beginning to process messages
  • Target-specific settings include schema names, table prefixes, and conflict resolution strategies
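
The configuration step can be sketched as a merge of user settings over defaults. This is a minimal illustration, not the framework's loader: the key names (batch_processing, maximum_batch_size_mb, schema, table_prefix) and the load_config helper are hypothetical; only the 100 MB default comes from the text above.

```python
import json

# Hypothetical defaults; key names are illustrative, not Mage's actual settings.
DEFAULTS = {
    "batch_processing": True,
    "maximum_batch_size_mb": 100,  # default stated in the workflow description
}


def load_config(raw_json: str) -> dict:
    """Merge user-supplied settings over the defaults and sanity-check them."""
    config = {**DEFAULTS, **json.loads(raw_json)}
    if config["maximum_batch_size_mb"] <= 0:
        raise ValueError("maximum_batch_size_mb must be positive")
    return config


# Target-specific settings (schema name, table prefix) sit next to the
# processing options in the same dictionary.
config = load_config('{"schema": "analytics", "table_prefix": "raw_"}')
max_batch_bytes = config["maximum_batch_size_mb"] * 1024 * 1024
```

Converting the limit to bytes once, up front, keeps the later per-record size check to a single integer comparison.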

Step 2: Singer Message Ingestion

Read Singer-protocol messages from standard input or a file buffer. Each line is a JSON object with a type field (SCHEMA, RECORD, STATE, ACTIVATE_VERSION) that determines how the message is processed.

What happens:

  • The destination reads JSONL (newline-delimited JSON) from stdin or an input buffer
  • Each line is parsed and dispatched based on its message type
  • The message processing loop handles messages in the order they arrive
  • The system tracks the current schema, validators, and key properties per stream
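
A rough sketch of the ingestion loop described above, assuming one JSON object per line and a dictionary of handler callables keyed by message type. The process_messages function and the handlers mapping are hypothetical names for illustration; the message shapes follow the Singer convention of a type field plus stream, schema, record, or value.

```python
import io
import json


def process_messages(lines, handlers):
    """Parse JSONL Singer messages in arrival order and dispatch on `type`."""
    for line_number, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between messages
        message = json.loads(line)
        handler = handlers.get(message.get("type"))
        if handler is None:
            raise ValueError(
                f"line {line_number}: unsupported message type {message.get('type')!r}"
            )
        handler(message)


# Example input: an in-memory buffer standing in for stdin.
messages = [
    {"type": "SCHEMA", "stream": "users", "schema": {"type": "object"}, "key_properties": ["id"]},
    {"type": "RECORD", "stream": "users", "record": {"id": 1}},
    {"type": "RECORD", "stream": "users", "record": {"id": 2}},
    {"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}},
]
buffer = io.StringIO("\n".join(json.dumps(m) for m in messages) + "\n")

seen = []
handlers = {
    msg_type: (lambda m: seen.append(m["type"]))
    for msg_type in ("SCHEMA", "RECORD", "STATE", "ACTIVATE_VERSION")
}
process_messages(buffer, handlers)
```

Passing sys.stdin instead of the StringIO buffer would read from standard input with no other change, since both yield one line per iteration.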

Step 3: Schema Registration and Validation Setup

Process SCHEMA messages to register stream schemas, build JSON validators, and prepare the target for incoming records. This establishes the data contract between the source and destination.

What happens:

  • SCHEMA messages contain the stream name, JSON Schema definition, key properties, and bookmark properties
  • A Draft4Validator is constructed from the schema for record validation
  • Key properties (primary keys) and bookmark properties (replication keys) are stored per stream
  • The replication method is recorded to determine loading strategy (append, upsert, or replace)
  • For SQL targets, this step may trigger table creation or schema migration
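
As an illustration of schema registration, the sketch below stores the contract from one SCHEMA message in a per-stream registry. The streams dictionary and register_schema function are hypothetical; Draft4Validator is the real class from the third-party jsonschema package, imported optionally here so the example still runs where that package is not installed.

```python
try:
    # Third-party package; treated as optional so the sketch runs without it.
    from jsonschema import Draft4Validator
except ImportError:
    Draft4Validator = None

streams = {}  # hypothetical per-stream registry


def register_schema(message: dict) -> dict:
    """Store the data contract carried by one SCHEMA message."""
    schema = message["schema"]
    entry = {
        "schema": schema,
        "key_properties": message.get("key_properties") or [],
        "bookmark_properties": message.get("bookmark_properties") or [],
        "validator": Draft4Validator(schema) if Draft4Validator else None,
    }
    streams[message["stream"]] = entry
    return entry


entry = register_schema({
    "type": "SCHEMA",
    "stream": "users",
    "schema": {
        "type": "object",
        "properties": {
            "id": {"type": "integer"},
            "email": {"type": ["string", "null"]},
        },
        "required": ["id"],
    },
    "key_properties": ["id"],
    "bookmark_properties": ["updated_at"],
})

if entry["validator"] is not None:
    # Raises jsonschema.ValidationError if the record breaks the contract.
    entry["validator"].validate({"id": 1, "email": None})
```

Building the validator once per SCHEMA message, rather than once per record, avoids re-processing the schema on every RECORD.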

Step 4: Record Validation and Batch Accumulation

Process RECORD messages by validating each record against the registered schema, preparing it for loading, and accumulating records into batches for efficient bulk operations.

What happens:

  • Each record is validated against the stream's JSON Schema using the Draft4Validator
  • Records are prepared by extracting the selected columns and parsing string values that encode JSON objects or arrays
  • In batch mode, validated records are accumulated in memory organized by stream
  • Batch byte size is tracked; when the configured maximum is reached, the batch is drained
  • In non-batch mode, each record is immediately exported to the target system
  • Type coercion is applied for mismatched types (string to JSON, string to array)
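
The preparation and accumulation steps above might look roughly like the following. This is a simplified sketch under stated assumptions: prepare_record and BatchBuffer are hypothetical names, record size is approximated by the length of the UTF-8 encoded JSON serialization, and schema validation is omitted to keep the example dependency-free.

```python
import json
from collections import defaultdict


def prepare_record(record: dict, schema: dict) -> dict:
    """Keep schema-declared columns; parse strings for object/array columns."""
    prepared = {}
    for column, spec in schema.get("properties", {}).items():
        if column not in record:
            continue
        value = record[column]
        types = spec.get("type", [])
        if isinstance(types, str):
            types = [types]
        wants_json = ("object" in types or "array" in types) and "string" not in types
        if isinstance(value, str) and wants_json:
            try:
                value = json.loads(value)
            except json.JSONDecodeError:
                pass  # leave the raw string for validation to reject
        prepared[column] = value
    return prepared


class BatchBuffer:
    """Accumulate records per stream and drain when a byte limit is reached."""

    def __init__(self, max_bytes, export):
        self.max_bytes = max_bytes
        self.export = export  # callable(stream, rows), supplied by the target
        self.records = defaultdict(list)
        self.byte_size = 0

    def add(self, stream, record):
        self.records[stream].append(record)
        self.byte_size += len(json.dumps(record).encode("utf-8"))
        if self.byte_size >= self.max_bytes:
            self.drain()

    def drain(self):
        for stream, rows in self.records.items():
            if rows:
                self.export(stream, rows)
        self.records.clear()
        self.byte_size = 0


schema = {"properties": {"id": {"type": "integer"}, "tags": {"type": "array"}}}
exported = []
buffer = BatchBuffer(max_bytes=40, export=lambda s, rows: exported.append((s, len(rows))))
for i in (1, 2, 3):
    raw = {"id": i, "tags": '["a"]', "unselected": True}
    buffer.add("users", prepare_record(raw, schema))
```

With the deliberately tiny 40-byte limit, the second record pushes the buffer over the threshold and triggers a drain; the third record stays in memory until a final drain.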

Step 5: Batch Export to Target

Drain accumulated record batches to the target system. The export method handles the actual data writing, which varies by target type (SQL INSERT/UPSERT, API calls, file writes).

What happens:

  • When a batch reaches the size limit or the stream ends, export is triggered
  • For the Mage Destination base: export_batch_data() is called with all accumulated records per stream
  • For the Singer SDK Target: sink.process_batch() is called with parallel drain support (up to 8 concurrent sinks)
  • SQL sinks use SQLAlchemy for schema-aware INSERT or UPSERT operations
  • Batch numbers are tracked for monitoring and logging
  • Failed records may trigger retry logic depending on the target implementation
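
To make the upsert behavior concrete, here is a small sketch of a batch export keyed on the stream's key properties. It deliberately swaps in the standard-library sqlite3 module instead of SQLAlchemy so that it runs without extra dependencies; the export_batch function, table name, and columns are hypothetical, and the ON CONFLICT clause assumes SQLite 3.24 or newer.

```python
import sqlite3


def export_batch(conn, table, key_properties, rows):
    """Upsert one batch of rows, matching existing rows on the key properties."""
    if not rows:
        return
    columns = list(rows[0].keys())
    placeholders = ", ".join("?" for _ in columns)
    updates = ", ".join(
        f"{c} = excluded.{c}" for c in columns if c not in key_properties
    )
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    # Identifiers are interpolated directly, so they must come from a trusted
    # schema; values always go through bound parameters.
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(key_properties)}) {action}"
    )
    with conn:  # one transaction per batch: commit on success, roll back on error
        conn.executemany(sql, [tuple(r[c] for c in columns) for r in rows])


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

export_batch(conn, "users", ["id"], [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Bob"}])
export_batch(conn, "users", ["id"], [{"id": 2, "name": "Robert"}])  # same key: update

result = conn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
```

Wrapping each batch in a single transaction means a failed batch leaves the table unchanged, which fits the later rule that state is only emitted after a successful drain.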

Step 6: State Management and Checkpointing

Process STATE messages to track sync progress and enable checkpoint recovery. State is emitted to a file or stdout after each successfully processed batch, allowing the source to resume from the last checkpoint.

What happens:

  • STATE messages contain bookmark values that mark the source's extraction progress
  • The destination captures state messages and merges bookmarks across multiple state emissions
  • After each successful batch drain, the current state is emitted to a state file or stdout
  • State persistence enables resumable loads: if a failure occurs, the next run starts from the last checkpoint
  • The drained state is tracked separately from the latest state to prevent premature advancement
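
The distinction between the latest state and the drained state can be sketched as follows. StateTracker and its method names are hypothetical; the example assumes STATE values carry a top-level bookmarks dictionary keyed by stream.

```python
import copy
import json


class StateTracker:
    """Keep the newest STATE seen separate from the state that is safe to emit."""

    def __init__(self):
        self.latest = {"bookmarks": {}}
        self.drained = None  # nothing is safe to emit until a batch is written

    def on_state(self, value: dict) -> None:
        # Merge bookmarks so a STATE for one stream does not erase another's.
        bookmarks = {**self.latest.get("bookmarks", {}), **value.get("bookmarks", {})}
        self.latest = {**self.latest, **value, "bookmarks": bookmarks}

    def on_drain(self) -> str:
        # Only after a successful batch write does the latest state become durable.
        self.drained = copy.deepcopy(self.latest)
        return json.dumps(self.drained)


tracker = StateTracker()
tracker.on_state({"bookmarks": {"users": {"updated_at": "2026-01-01"}}})
emitted = tracker.on_drain()  # batch written: safe to checkpoint

tracker.on_state({"bookmarks": {"orders": {"id": 42}}})  # records not yet written
```

At this point tracker.drained still contains only the users bookmark, so a crash before the next drain would resume from a checkpoint that matches what was actually loaded.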

Step 7: Cleanup and Finalization

After all messages are processed, drain any remaining records, emit the final state, and clean up resources. This ensures all data is written and the sync state is fully persisted.

Key considerations:

  • A final batch drain processes any remaining accumulated records
  • The final state message is emitted to capture the complete sync progress
  • Sink cleanup operations close database connections, flush buffers, and release resources
  • For the Singer SDK Target, clean_up() is called on all active sinks
  • The process exits with success status if all records were loaded without fatal errors
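
One possible ordering of the finalization steps is sketched below. The finalize function and RecordingSink class are hypothetical stand-ins; running cleanup in a finally block is a design assumption of this sketch, not something the workflow description specifies.

```python
def finalize(drain, emit_state, sinks) -> int:
    """Flush remaining records, persist state, then always release resources."""
    try:
        drain()        # write records still held in memory
        emit_state()   # persist progress only after the flush succeeded
    finally:
        for sink in sinks:
            sink.clean_up()  # runs even if the drain or state emission failed
    return 0  # reached only when no exception propagated


class RecordingSink:
    """Stand-in sink that records when its cleanup hook is called."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def clean_up(self):
        self.log.append(f"clean_up:{self.name}")


log = []
exit_code = finalize(
    drain=lambda: log.append("drain"),
    emit_state=lambda: log.append("state"),
    sinks=[RecordingSink("users", log), RecordingSink("orders", log)],
)
```

Emitting state strictly after the final drain keeps the persisted checkpoint from running ahead of the data that was actually written.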
