Principle:Mage ai Mage ai Singer Message Ingestion
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, ETL, Stream_Processing |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A stdin-based message processing loop that reads Singer JSON lines, routes them to type-specific handlers, and manages batch accumulation with size-based flushing.
Description
Singer Message Ingestion is the core event loop of a destination connector. It reads JSON lines from stdin (or a file), parses each line to determine its message type (SCHEMA, RECORD, STATE, ACTIVATE_VERSION, or LOG), and routes the message to the appropriate handler. In batch processing mode, RECORD messages are accumulated per stream in memory until the total byte size reaches or exceeds a configurable threshold (default 100 MB), at which point the accumulated batch is flushed to the export layer.
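The read, parse, and route steps described above can be sketched as follows. This is a minimal illustration, not Mage's implementation: the `ingest` function, the `Handler` type, and the handler dictionary are hypothetical names introduced here, and the sample messages follow the general Singer message shape.

```python
import io
import json
from typing import Callable, Dict, IO, List

# Hypothetical handler signature, assumed for illustration only.
Handler = Callable[[dict], None]


def ingest(stream: IO[str], handlers: Dict[str, Handler]) -> List[str]:
    """Read JSON lines from `stream` and dispatch each one by its "type" field.

    Returns the message types that had no registered handler.
    """
    unhandled: List[str] = []
    for line in stream:
        line = line.strip()
        if not line:
            continue  # ignore blank lines between messages
        message = json.loads(line)
        message_type = message.get("type")
        handler = handlers.get(message_type)
        if handler is None:
            unhandled.append(message_type)
            continue
        handler(message)
    return unhandled


# Small demonstration using an in-memory stream in place of sys.stdin.
seen: List[tuple] = []
demo_handlers: Dict[str, Handler] = {
    "SCHEMA": lambda m: seen.append(("schema", m["stream"])),
    "RECORD": lambda m: seen.append(("record", m["record"])),
    "STATE": lambda m: seen.append(("state", m["value"])),
}
demo_input = io.StringIO(
    '{"type": "SCHEMA", "stream": "users", "schema": {}, "key_properties": ["id"]}\n'
    '{"type": "RECORD", "stream": "users", "record": {"id": 1}}\n'
    '{"type": "STATE", "value": {"users": 1}}\n'
    '{"type": "ACTIVATE_VERSION", "stream": "users", "version": 1}\n'
)
demo_unhandled = ingest(demo_input, demo_handlers)
```

Passing `sys.stdin` instead of the `io.StringIO` object would read from standard input. Keeping the handlers in a dictionary means a new message type can be supported by registering one more entry rather than editing the loop.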
Usage
This principle describes the main processing loop shared by all destination connectors. The loop is triggered by Destination.process() and runs until all input is consumed.
Theoretical Basis
Each message is routed according to its type:
- SCHEMA -> process_schema(): Register stream schema and create validator
- RECORD -> process_record() (immediate) or batch accumulation (batch mode)
- STATE -> process_state() (immediate) or batch accumulation (batch mode)
- ACTIVATE_VERSION -> Store version number for the stream
- LOG -> Forward to logger at the specified level
Batch flush trigger: current_byte_size >= maximum_batch_size_mb * 1024 * 1024
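The flush condition can be illustrated with a small accumulator. This is a sketch under stated assumptions, not the connector's actual code: the `BatchAccumulator` class and its `flush_fn` callback are hypothetical, and measuring size as the UTF-8 length of each raw input line is an assumption, since the source does not say how `current_byte_size` is computed.

```python
import json
from collections import defaultdict
from typing import Callable, Dict, List


class BatchAccumulator:
    """Collects RECORD payloads per stream and flushes on a byte-size threshold."""

    def __init__(
        self,
        maximum_batch_size_mb: float,
        flush_fn: Callable[[Dict[str, List[dict]]], None],
    ) -> None:
        # Threshold in bytes, matching the trigger formula above.
        self.max_bytes = maximum_batch_size_mb * 1024 * 1024
        self.flush_fn = flush_fn  # hypothetical stand-in for the export layer
        self.batches: Dict[str, List[dict]] = defaultdict(list)
        self.current_byte_size = 0

    def add(self, line: str) -> None:
        """Add one raw RECORD line and flush if the threshold is reached."""
        message = json.loads(line)
        self.batches[message["stream"]].append(message["record"])
        # Assumption: size is counted as the UTF-8 length of the raw line.
        self.current_byte_size += len(line.encode("utf-8"))
        if self.current_byte_size >= self.max_bytes:
            self.flush()

    def flush(self) -> None:
        """Hand the accumulated records to flush_fn and reset the counters."""
        if not self.batches:
            return
        self.flush_fn(dict(self.batches))
        self.batches = defaultdict(list)
        self.current_byte_size = 0


# Demonstration with a tiny threshold (about 105 bytes) so a flush happens quickly.
flushed: List[Dict[str, List[dict]]] = []
accumulator = BatchAccumulator(maximum_batch_size_mb=0.0001, flush_fn=flushed.append)
for record_id in (1, 2, 3):
    accumulator.add(
        json.dumps({"type": "RECORD", "stream": "users", "record": {"id": record_id}})
    )
accumulator.flush()  # end of input: flush whatever remains
```

The explicit `flush()` at the end matters because the last partial batch never reaches the threshold on its own; without it, records that arrive after the final size-triggered flush would be lost when input ends.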