Heuristic:Mage ai Mage ai Parallel Sink Concurrency Limit
| Knowledge Sources | |
|---|---|
| Domains | Optimization, Data_Integration |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Default cap of 8 parallel sink drains, combined with a 5-minute maximum record age that forces buffered records to be flushed before they go stale.
Description
The `Target` class limits concurrent sink draining to 8 threads by default, using joblib's threading backend. This keeps connection and memory usage bounded when many streams are processed at once. Additionally, a 5-minute maximum record age forces all sinks to drain even if their batch buffers are not full, which keeps data fresh and prevents indefinite buffering in low-volume scenarios.
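A minimal sketch of bounded parallel draining. It substitutes the standard library's `concurrent.futures.ThreadPoolExecutor` for the joblib threading backend the source imports, so it runs without extra dependencies. `FakeSink`, `ConcurrencyTracker`, and `drain_all` are hypothetical stand-ins, not Mage's API; the sketch only shows that a pool of 8 worker threads never runs more than 8 drains at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_PARALLELISM = 8  # mirrors _MAX_PARALLELISM in the source


class FakeSink:
    """Hypothetical sink whose drain is a simulated I/O wait."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.drained = False

    def drain(self) -> None:
        time.sleep(0.05)  # stand-in for a database write or API call
        self.drained = True


class ConcurrencyTracker:
    """Records the peak number of drains running at the same time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def run(self, sink: FakeSink) -> str:
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        try:
            sink.drain()
        finally:
            with self._lock:
                self.active -= 1
        return sink.name


def drain_all(sinks, tracker, max_parallelism=MAX_PARALLELISM):
    # Only max_parallelism worker threads exist, so at most that many drains
    # are in flight; the remaining sinks wait in the executor's work queue.
    with ThreadPoolExecutor(max_workers=max_parallelism) as pool:
        return list(pool.map(tracker.run, sinks))


sinks = [FakeSink(f"stream_{i}") for i in range(20)]
tracker = ConcurrencyTracker()
names = drain_all(sinks, tracker)
print(f"drained {len(names)} sinks, peak concurrency {tracker.peak}")
```

With 20 sinks, the drains run in waves of up to 8, so total wall time is roughly three drain durations rather than twenty.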
Usage
Apply this heuristic when:
- Running destinations with many streams: The 8-thread cap limits how many database connections or API requests are in flight at once, reducing the risk of connection exhaustion or rate-limit violations.
- Processing low-volume streams: The 5-minute timer ensures records are flushed even when batch size thresholds are not reached.
- Tuning destination performance: Override `max_parallelism` on custom Target subclasses to increase or decrease concurrency.
The Insight (Rule of Thumb)
- Action: Accept the default of 8 parallel sinks. Override the `max_parallelism` property on a Target subclass only if you have specific resource constraints.
- Value: `_MAX_PARALLELISM = 8` threads, `_MAX_RECORD_AGE_IN_MINUTES = 5.0` minutes.
- Trade-off: Higher parallelism = faster multi-stream processing but more database connections and memory. Lower parallelism = safer resource usage but slower throughput.
- Timer trade-off: The 5-minute age check forces drains before batches are full, so low-volume streams are written in smaller, less efficient batches; in exchange, records do not sit in buffers indefinitely.
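A minimal sketch of the override described in the Action above. `BaseTarget` is a hypothetical stand-in that reproduces the fallback logic of `Target.max_parallelism`; passing `_max_parallelism` through `__init__` is an assumption, since the excerpt does not show where that attribute is set. The two subclasses are hypothetical destinations.

```python
from typing import Optional

_MAX_PARALLELISM = 8


class BaseTarget:
    """Hypothetical stand-in for the fallback logic in Target.max_parallelism."""

    def __init__(self, max_parallelism: Optional[int] = None) -> None:
        # Assumption: _max_parallelism is an optional per-instance override.
        self._max_parallelism = max_parallelism

    @property
    def max_parallelism(self) -> int:
        if self._max_parallelism is not None:
            return self._max_parallelism
        return _MAX_PARALLELISM


class RateLimitedApiTarget(BaseTarget):
    """Hypothetical destination whose API tolerates only 2 concurrent writers."""

    @property
    def max_parallelism(self) -> int:
        return 2


class BulkWarehouseTarget(BaseTarget):
    """Hypothetical destination that can absorb more concurrent loads."""

    @property
    def max_parallelism(self) -> int:
        return 16


print(BaseTarget().max_parallelism)                  # 8 (module default)
print(RateLimitedApiTarget().max_parallelism)        # 2
print(BulkWarehouseTarget().max_parallelism)         # 16
print(BaseTarget(max_parallelism=4).max_parallelism) # 4
```

Overriding the property in a subclass fixes the limit per destination type, while the instance attribute allows a per-run value without subclassing.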
Reasoning
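Eight threads is a conservative default for most destination types. Database connections are typically limited (PostgreSQL's default `max_connections` is 100; MySQL's is 151), so if each draining sink holds one connection, 8 parallel drains use at most 8% of PostgreSQL's default and about 5% of MySQL's. The threading backend (rather than multiprocessing) suits I/O-bound work such as database writes and API calls: in CPython, blocking I/O releases the GIL, so threads overlap their waits, and because threads share memory, sinks and their buffered records never need to be pickled and sent to worker processes.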
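To illustrate why threads suit I/O-bound drains, this sketch simulates eight writes with `time.sleep`, which, like blocking network I/O, releases the GIL. The stdlib `ThreadPoolExecutor` again stands in for joblib, and `fake_write` is hypothetical. The sketch shows only the overlap of waiting time; it does not measure serialization cost.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_write(batch_id: int) -> int:
    # time.sleep releases the GIL, much like waiting on a database round trip.
    time.sleep(0.1)
    return batch_id


def timed(fn):
    """Run fn() and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start


batches = list(range(8))

serial_result, serial_s = timed(lambda: [fake_write(b) for b in batches])
with ThreadPoolExecutor(max_workers=8) as pool:
    threaded_result, threaded_s = timed(lambda: list(pool.map(fake_write, batches)))

print(f"serial:   {serial_s:.2f}s")
print(f"threaded: {threaded_s:.2f}s")
```

The serial loop takes about eight wait periods while the threaded version takes about one. CPU-bound pure-Python work (for example, heavy per-record transformation) would not speed up this way under the GIL, which is why the threading choice assumes drains are dominated by I/O.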
The 5-minute record age limit complements batch-size-based flushing. Without it, a stream receiving one record per hour would hold its records in the buffer until the batch size threshold was finally reached, leaving data in the destination hours or days stale.
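A minimal sketch of combining a size threshold with a maximum record age, assuming a per-sink check that runs whenever a record arrives. `BufferedSink`, its methods, and the injectable `clock` are hypothetical. The source describes the age limit draining all sinks at once, and the excerpt does not show whether Mage checks the age on arrival or with a background timer.

```python
import time
from typing import Callable, List, Optional

MAX_RECORD_AGE_IN_MINUTES = 5.0  # mirrors _MAX_RECORD_AGE_IN_MINUTES


class BufferedSink:
    """Hypothetical sink that flushes when full or when its oldest record is too old."""

    def __init__(self, max_size: int, clock: Callable[[], float] = time.monotonic) -> None:
        self.max_size = max_size
        self.clock = clock  # injectable so tests can simulate elapsed time
        self.buffer: List[dict] = []
        self.oldest_at: Optional[float] = None
        self.flushed_batches: List[List[dict]] = []

    def add(self, record: dict) -> None:
        if not self.buffer:
            self.oldest_at = self.clock()  # start the age clock on the first record
        self.buffer.append(record)
        if len(self.buffer) >= self.max_size or self._too_old():
            self.flush()

    def _too_old(self) -> bool:
        if self.oldest_at is None:
            return False
        age_minutes = (self.clock() - self.oldest_at) / 60
        return age_minutes > MAX_RECORD_AGE_IN_MINUTES

    def flush(self) -> None:
        if self.buffer:
            self.flushed_batches.append(self.buffer)
        self.buffer = []
        self.oldest_at = None


# Usage with a fake clock: one record, then another an hour later.
now = [0.0]
sink = BufferedSink(max_size=10_000, clock=lambda: now[0])
sink.add({"id": 1})  # t = 0 s: buffered, not flushed
now[0] = 3600.0
sink.add({"id": 2})  # oldest record is 60 minutes old, so both are flushed
print(len(sink.flushed_batches), len(sink.buffer))  # 1 0
```

Because this sketch only checks age when a record arrives, a lone record followed by silence waits until the next arrival or an explicit `flush()`. For scale: with a hypothetical batch size of 10,000 records and one record per hour, the size threshold alone would take roughly 10,000 hours (over 400 days) to trigger.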
Code Evidence
Parallelism cap and record age limit from `destinations/target.py:32,45`:
```python
_MAX_PARALLELISM = 8


class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta):
    _MAX_RECORD_AGE_IN_MINUTES: float = 5.0
```
Max parallelism property from `destinations/target.py:103-115`:
```python
    @property
    def max_parallelism(self) -> int:
        """Get max parallel sinks.

        The default is 8 if not overridden.

        Returns:
            Max number of sinks that can be drained in parallel.
        """
        if self._max_parallelism is not None:
            return self._max_parallelism
        return _MAX_PARALLELISM
```
Joblib threading backend import from `destinations/target.py:12`:
```python
from joblib import Parallel, delayed, parallel_backend
```