Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Principle:Mage ai Mage ai Singer Message Ingestion

From Leeroopedia


Knowledge Sources
Domains Data_Integration, ETL, Stream_Processing
Last Updated 2026-02-09 00:00 GMT

Overview

A stdin-based message processing loop that reads Singer JSON lines, routes them to type-specific handlers, and manages batch accumulation with size-based flushing.

Description

Singer Message Ingestion is the core event loop of a destination connector. It reads JSON lines from stdin (or a file), parses each line to determine its message type (SCHEMA, RECORD, STATE, ACTIVATE_VERSION, LOG), and routes to the appropriate handler. In batch processing mode, RECORD messages are accumulated per-stream in memory until the total byte size exceeds a configurable threshold (default 100MB), at which point the accumulated batch is flushed to the export layer.

Usage

This principle is the main processing loop for all destination connectors. It is triggered by Destination.process() and runs until all input is consumed.

Theoretical Basis

The message routing state machine:

  • SCHEMA -> process_schema(): Register stream schema and create validator
  • RECORD -> process_record() (immediate) or batch accumulation (batch mode)
  • STATE -> process_state() (immediate) or batch accumulation (batch mode)
  • ACTIVATE_VERSION -> Store version number for the stream
  • LOG -> Forward to logger at the specified level

Batch flush trigger: current_byte_size >= maximum_batch_size_mb * 1024 * 1024

Related Pages

Implemented By

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment