Implementation:Mage ai Mage ai Target Drain All



Knowledge Sources
Domains: Data_Integration, Parallel_Processing, Performance
Last Updated: 2026-02-09 00:00 GMT

Overview

Wrapper documentation for the Singer SDK Target.drain_all method, which orchestrates parallel sink draining with joblib threading, as used in the Mage integrations framework.

Description

This page documents Target.drain_all, a method of the Singer SDK's Target class as customized in the Mage integrations framework. It drains every sink in two passes: cleared sinks sequentially, then active sinks in parallel via joblib threading (max_parallelism=8). At end-of-pipe it also calls clean_up, and it finishes by writing state. The companion drain_one drains a single sink in three steps: start_drain() -> process_batch(context) -> mark_drained(). The private _drain_all chooses sequential or parallel execution based on the parallelism value it is given.
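
The ordering described above can be sketched with toy classes. This is a minimal illustration, not the Mage source: ToySink, the events list, and the free functions drain_one and drain_all are hypothetical stand-ins, the active sinks are drained sequentially here for readability, and calling clean_up on each sink is an assumption about where that call lands.

```python
from dataclasses import dataclass, field


@dataclass
class ToySink:
    """Hypothetical stand-in for a Sink; not the Mage or Singer SDK class."""

    name: str
    pending: list = field(default_factory=list)
    log: list = field(default_factory=list)

    def start_drain(self) -> dict:
        # Hand the buffered records to the caller and reset the buffer.
        context = {"records": self.pending}
        self.pending = []
        return context

    def process_batch(self, context: dict) -> None:
        self.log.append(f"wrote {len(context['records'])}")

    def mark_drained(self) -> None:
        self.log.append("drained")

    def clean_up(self) -> None:
        self.log.append("clean_up")


def drain_one(sink: ToySink) -> None:
    # The three-step lifecycle named in the description.
    context = sink.start_drain()
    sink.process_batch(context)
    sink.mark_drained()


def drain_all(sinks_to_clear, sinks_active, events, *, is_endofpipe=False):
    # 1. Cleared sinks first, one at a time.
    for sink in sinks_to_clear:
        drain_one(sink)
        events.append(f"cleared:{sink.name}")
    # 2. Active sinks next (sequential in this sketch; the documented
    #    method drains them in parallel threads).
    for sink in sinks_active.values():
        drain_one(sink)
        events.append(f"active:{sink.name}")
    # 3. Cleanup only at end of pipe (assumed to be a per-sink call).
    if is_endofpipe:
        for sink in [*sinks_to_clear, *sinks_active.values()]:
            sink.clean_up()
    # 4. State is written last; a list entry stands in for the state message.
    events.append("state")


old = ToySink("users_v1", pending=[{"id": 1}])
new = ToySink("users", pending=[{"id": 2}, {"id": 3}])
events = []
drain_all([old], {"users": new}, events, is_endofpipe=True)
```

After the call, events records the cleared sink before the active one and the state write last, and each sink's log ends with clean_up because is_endofpipe was set.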

Usage

Draining is triggered automatically by the Target message-processing loop when a sink is full, when end of input is reached, or when the maximum record age is exceeded. It does not normally need to be called directly.
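
The three trigger conditions can be expressed as a small decision function. This is a sketch under stated assumptions: drain_reason and its parameter names are hypothetical, and the order in which the conditions are checked is an illustrative choice rather than something the source specifies.

```python
from typing import Optional


def drain_reason(
    *,
    sink_is_full: bool,
    end_of_input: bool,
    record_age_s: float,
    max_record_age_s: float,
) -> Optional[str]:
    """Return why a drain should happen now, or None if no trigger applies.

    Hypothetical helper; the check order below is an assumption.
    """
    if end_of_input:
        return "end_of_input"
    if sink_is_full:
        return "sink_full"
    if record_age_s > max_record_age_s:
        return "max_record_age"
    return None


# No trigger: buffer has room, input continues, records are fresh.
idle = drain_reason(
    sink_is_full=False, end_of_input=False, record_age_s=1.0, max_record_age_s=5.0
)
# Oldest buffered record has waited longer than the allowed age.
stale = drain_reason(
    sink_is_full=False, end_of_input=False, record_age_s=9.0, max_record_age_s=5.0
)
```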

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/destinations/target.py
  • Lines: 446-468 (drain_all), 471-484 (drain_one), 486-496 (_drain_all)

Signature

class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta):
    @final
    def drain_all(self, *, is_endofpipe: bool = False) -> None:
        """Drain all sinks, cleared first then active in parallel.

        Args:
            is_endofpipe: True when all stdin input has been read.
        """

    @final
    def drain_one(self, sink: Sink) -> None:
        """Drain a specific sink.

        Args:
            sink: Sink to be drained.
        """

    def _drain_all(self, sink_list: list[Sink], parallelism: int) -> None:
        """Drain sink list with given parallelism. Uses joblib threading."""
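
The sequential-versus-parallel dispatch in _drain_all might look roughly like the sketch below. It deliberately swaps joblib for the standard library's ThreadPoolExecutor so the example runs without third-party packages; the documented method uses joblib threading. The names drain_sinks and record are hypothetical, and plain strings stand in for sinks.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def drain_sinks(sink_list, parallelism, drain_one):
    """Drain every sink, sequentially when parallelism is 1, else in threads.

    Stdlib stand-in for the joblib-based dispatch; not the Mage implementation.
    """
    if parallelism <= 1:
        for sink in sink_list:
            drain_one(sink)
        return
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        # list() consumes the iterator so worker exceptions surface here.
        list(pool.map(drain_one, sink_list))


drained = []
lock = threading.Lock()


def record(sink_name):
    # Stand-in for drain_one: just note which sink was drained.
    with lock:
        drained.append(sink_name)


drain_sinks(["a", "b", "c"], 1, record)
sequential_order = list(drained)  # order is preserved when sequential

drained.clear()
drain_sinks(["a", "b", "c"], 8, record)
parallel_names = sorted(drained)  # completion order is not guaranteed in threads
```

Threads suit this step when process_batch is mostly I/O-bound (network or disk writes), since the workers spend their time waiting rather than competing for the interpreter.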

Import

from mage_integrations.destinations.target import Target

I/O Contract

Inputs

Name                  Type             Required  Description
is_endofpipe          bool             No        True when all input has been read (triggers cleanup)
self._sinks_active    dict[str, Sink]  Yes       Active sinks keyed by stream name
self._sinks_to_clear  list[Sink]       Yes       Sinks cleared due to schema changes

Outputs

Name          Type  Description
Side effects  None  Sinks drained (process_batch called), state written, cleanup on endofpipe

Usage Examples

from mage_integrations.destinations.target import Target
from mage_integrations.destinations.sink import BatchSink

class MyBatchSink(BatchSink):
    def process_batch(self, context):
        records = context.get("records", [])
        # Write records to target
        print(f"Writing {len(records)} records for {self.stream_name}")

class MyTarget(Target):
    default_sink_class = MyBatchSink

# Target automatically creates sinks per stream and drains them
# When a sink reaches max_size (10000 records), drain_one is called
# At end-of-pipe, drain_all drains all remaining sinks in parallel
