Implementation:Mage ai Mage ai Target Drain All
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Parallel_Processing, Performance |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Wrapper documentation for the Singer SDK Target.drain_all method, which orchestrates parallel sink draining with joblib threading, as used in the Mage integrations framework.
Description
This page documents Target.drain_all, a method of the Singer SDK's Target class as customized in the Mage integrations framework. It drains cleared sinks sequentially, then drains active sinks in parallel through joblib threading (max_parallelism=8), calls clean_up on the sinks when is_endofpipe is true, and writes state. The companion drain_one drains a single sink in three steps: start_drain() -> process_batch(context) -> mark_drained(). The private _drain_all runs a list of sinks sequentially or in parallel, depending on the parallelism value it is given.
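The flow described above can be sketched with toy objects. This is a minimal illustration, not the code in target.py: ToySink, drain_sinks, and the write_state callback are hypothetical names, and the standard library's ThreadPoolExecutor is swapped in for joblib threading so the sketch runs without extra dependencies.

```python
from concurrent.futures import ThreadPoolExecutor


class ToySink:
    """Hypothetical stand-in for a sink; not the Singer SDK Sink class."""

    def __init__(self, stream_name, records):
        self.stream_name = stream_name
        self.records = list(records)
        self.written = []
        self.cleaned_up = False

    def start_drain(self):
        # Hand the pending records over as the batch context.
        context = {"records": self.records}
        self.records = []
        return context

    def process_batch(self, context):
        # A real sink would write to the destination here.
        self.written.extend(context["records"])

    def mark_drained(self):
        pass  # Placeholder: nothing to reset in this toy sink.

    def clean_up(self):
        self.cleaned_up = True


def drain_one(sink):
    # Lifecycle from the description: start_drain -> process_batch -> mark_drained.
    context = sink.start_drain()
    sink.process_batch(context)
    sink.mark_drained()


def drain_sinks(sink_list, parallelism):
    # Assumption: a thread pool stands in for joblib threading.
    if not sink_list:
        return
    with ThreadPoolExecutor(max_workers=max(1, parallelism)) as pool:
        list(pool.map(drain_one, sink_list))  # list() surfaces worker errors


def drain_all(sinks_to_clear, sinks_active, write_state,
              is_endofpipe=False, max_parallelism=8):
    drain_sinks(sinks_to_clear, 1)  # cleared sinks, one at a time
    drain_sinks(list(sinks_active.values()), max_parallelism)  # active sinks
    if is_endofpipe:
        for sink in [*sinks_to_clear, *sinks_active.values()]:
            sink.clean_up()
    sinks_to_clear.clear()
    write_state()  # state is written only after the sinks are drained
```

Writing state after the drains complete means a recorded bookmark never runs ahead of the data that has actually been handed to process_batch.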
Usage
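Called automatically by the Target message processing loop; pipeline code does not call it directly. A sink that becomes full is drained on its own through drain_one, while drain_all runs when the max record age is exceeded and, with is_endofpipe=True, when the end of input is reached.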
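The trigger conditions can be illustrated with a toy loop that only records which drain call each condition would produce. It is a sketch under stated assumptions: run_loop, its single shared record counter, and the 300-second age limit are hypothetical and do not reproduce the real message loop.

```python
import time


def run_loop(messages, max_size=3, max_record_age=300.0, clock=time.monotonic):
    """Toy message loop: return the drain calls each condition would trigger."""
    calls = []
    pending = 0    # records buffered since the last drain
    oldest = None  # arrival time of the oldest buffered record
    for _ in messages:
        if oldest is None:
            oldest = clock()
        pending += 1
        if pending >= max_size:
            # Sink is full: drain just that sink.
            calls.append("drain_one")
            pending, oldest = 0, None
        elif clock() - oldest > max_record_age:
            # Buffered records are too old: drain everything.
            calls.append("drain_all")
            pending, oldest = 0, None
    # End of input: final drain, which also triggers cleanup.
    calls.append("drain_all(is_endofpipe=True)")
    return calls
```

For example, seven messages with max_size=3 fill the sink twice, and the one leftover record is flushed by the end-of-pipe drain.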
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/destinations/target.py
- Lines: 446-468 (drain_all), 471-484 (drain_one), 486-496 (_drain_all)
Signature
class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta):

    @final
    def drain_all(self, *, is_endofpipe: bool = False) -> None:
        """Drain all sinks, cleared first then active in parallel.

        Args:
            is_endofpipe: True when all stdin input has been read.
        """

    @final
    def drain_one(self, sink: Sink) -> None:
        """Drain a specific sink.

        Args:
            sink: Sink to be drained.
        """

    def _drain_all(self, sink_list: list[Sink], parallelism: int) -> None:
        """Drain sink list with given parallelism. Uses joblib threading."""
Import
from mage_integrations.destinations.target import Target
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| is_endofpipe | bool | No | True when all input has been read (triggers cleanup) |
| self._sinks_active | dict[str, Sink] | Yes | Active sinks keyed by stream name |
| self._sinks_to_clear | list[Sink] | Yes | Sinks cleared due to schema changes |
Outputs
| Name | Type | Description |
|---|---|---|
| Return value | None | Nothing is returned; the method works through side effects: sinks are drained (process_batch called), state is written, and clean_up runs at end of pipe |
Usage Examples
from mage_integrations.destinations.target import Target
from mage_integrations.destinations.sink import BatchSink


class MyBatchSink(BatchSink):
    def process_batch(self, context):
        records = context.get("records", [])
        # Write records to target
        print(f"Writing {len(records)} records for {self.stream_name}")


class MyTarget(Target):
    default_sink_class = MyBatchSink


# Target automatically creates sinks per stream and drains them
# When a sink reaches max_size (10000 records), drain_one is called
# At end-of-pipe, drain_all drains all remaining sinks in parallel
Related Pages
Implements Principle
Requires Environment
- Environment:Mage_ai_Mage_ai_Python_3_9_Runtime
- Environment:Mage_ai_Mage_ai_Singer_SDK_And_Joblib_Runtime