Environment:Mage ai Mage ai Singer SDK And Joblib Runtime
| Knowledge Sources | |
|---|---|
| Domains | Infrastructure, Data_Integration |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Singer SDK 0.34.x and Joblib runtime environment for target/destination parallel processing.
Description
This environment defines the Singer SDK and Joblib dependencies required by the destination (target) layer of the mage-integrations framework. The `singer_sdk` package provides the base classes for targets, sinks, and the Singer message protocol. Joblib is used for parallel sink draining with configurable concurrency. Together they form the core runtime for all destination connectors.
Usage
Use this environment when running any Destination or Target connector. It is required for the `Target.drain_all()` parallel processing, the `Sink` record batching, and the `Destination.process()` message ingestion pipeline.
System Requirements
| Category | Requirement | Notes |
|---|---|---|
| OS | Linux, macOS, or Windows | Cross-platform |
| Python | >= 3.9 | Same as Python_3_9_Runtime |
| CPU | Multi-core recommended | Joblib threading backend benefits from multiple cores for parallel sink draining |
Dependencies
Python Packages
- `singer_sdk` ~= 0.34.1 — Singer message protocol, Target/Sink base classes, schema validation
- `joblib` (transitive via singer_sdk) — Parallel sink draining via threading backend
- `jsonschema` (transitive via singer_sdk) — Draft4Validator for record validation
- `requests` ~= 2.31.0 — HTTP client for API connectors
- `singer` (singer-python) — Legacy Singer utilities (`singer.write_bookmark`, `singer.get_logger`)
- `pyyaml` — Config file parsing (YAML format support)
Destination-Specific Packages
Depending on the target database:
- BigQuery: `google-cloud-bigquery` ~= 3.0
- PostgreSQL: `psycopg2` == 2.9.3 (or the prebuilt `psycopg2-binary` wheel)
- MySQL: `mysql-connector-python` (version depends on the Python version)
- Snowflake: `snowflake-connector-python` == 3.7.1
- MongoDB: `pymongo` == 4.3.3
- Elasticsearch: `elasticsearch` == 8.15.1
- S3/Delta Lake: `deltalake` == 0.20.2, `pyarrow` >= 14.0.1
- Redshift: `redshift-connector` ~= 2.0.915
Credentials
Credentials are connector-specific and passed via `config.json`:
- BigQuery: Google service account JSON or Application Default Credentials
- PostgreSQL/MySQL: `host`, `port`, `username`, `password`, `database`
- Snowflake: `account`, `user`, `password`, `warehouse`, `database`, `schema`
- S3: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` (environment variables or config)
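For example, a PostgreSQL target's `config.json` might look like the following. All values are placeholders; the exact keys accepted depend on the specific connector.

```json
{
  "host": "db.example.com",
  "port": 5432,
  "username": "loader",
  "password": "********",
  "database": "analytics"
}
```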
Quick Install
```shell
# Core destination dependencies
pip install 'singer_sdk~=0.34.1' joblib requests pyyaml jsonschema

# For specific destinations, add the relevant driver:
pip install 'google-cloud-bigquery~=3.0'         # BigQuery
pip install 'psycopg2-binary==2.9.3'             # PostgreSQL
pip install 'snowflake-connector-python==3.7.1'  # Snowflake
```
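After installing, the core dependencies can be verified with a small check script. This is a hypothetical helper (not part of the framework) built on the standard library's `importlib.metadata`; note that lookups use PyPI distribution names.

```python
# Hypothetical helper: report installed versions of the core destination dependencies.
from importlib import metadata

CORE = ("singer-sdk", "joblib", "jsonschema", "requests", "PyYAML")


def check(packages):
    """Return {package: version string or None} without raising on missing packages."""
    found = {}
    for pkg in packages:
        try:
            found[pkg] = metadata.version(pkg)
        except metadata.PackageNotFoundError:
            found[pkg] = None
    return found


for pkg, version in check(CORE).items():
    print(f"{pkg}: {version or 'NOT INSTALLED'}")
```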
Code Evidence
Singer SDK import in Target class from `destinations/target.py:12-25`:
```python
from joblib import Parallel, delayed, parallel_backend
from singer_sdk.exceptions import RecordsWithoutSchemaException
from singer_sdk.helpers._batch import BaseBatchFileEncoding
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._compat import final
from singer_sdk.helpers.capabilities import (
    TARGET_SCHEMA_CONFIG,
    CapabilitiesEnum,
    PluginCapabilities,
    TargetCapabilities,
)
from singer_sdk.io_base import SingerMessageType, SingerReader
from singer_sdk.mapper import PluginMapper
from singer_sdk.plugin_base import PluginBase
```
Draft4Validator usage in `destinations/base.py:14`:
```python
from jsonschema.validators import Draft4Validator
```
Parallelism cap used by the Joblib sink draining, from `destinations/target.py:32`:

```python
_MAX_PARALLELISM = 8
```
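The constant above caps the number of concurrent jobs. A rough sketch of the draining pattern with joblib's threading backend follows; the sink names and the `drain` function are hypothetical stand-ins for the framework's sink-draining logic.

```python
from joblib import Parallel, delayed, parallel_backend

_MAX_PARALLELISM = 8


def drain(sink_name: str) -> str:
    # Stand-in for flushing one sink's buffered records to the destination
    return f"drained:{sink_name}"


sinks = ["users", "orders", "events"]

# Threading backend: cheap to start and no pickling overhead,
# well suited to I/O-bound sink flushes
with parallel_backend("threading", n_jobs=min(len(sinks), _MAX_PARALLELISM)):
    results = Parallel()(delayed(drain)(s) for s in sinks)

print(results)
```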
Common Errors
| Error Message | Cause | Solution |
|---|---|---|
| `ImportError: No module named 'singer_sdk'` | singer_sdk not installed | `pip install 'singer_sdk~=0.34.1'` |
| `ImportError: No module named 'joblib'` | joblib not installed | `pip install joblib` (usually transitive via singer_sdk) |
| `RecordsWithoutSchemaException` | RECORD message received before SCHEMA | Ensure source emits SCHEMA before RECORD for each stream |
| `jsonschema.exceptions.ValidationError` | Record does not match stream schema | Check source data against the declared JSON schema |
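To avoid the `RecordsWithoutSchemaException` above, a well-formed Singer message stream must emit SCHEMA before any RECORD for that stream. A minimal illustration, using a hypothetical `users` stream:

```python
import json

# A valid ordering: SCHEMA first, then RECORDs, then STATE
messages = [
    {"type": "SCHEMA", "stream": "users",
     "schema": {"type": "object",
                "properties": {"id": {"type": "integer"}}},
     "key_properties": ["id"]},
    {"type": "RECORD", "stream": "users", "record": {"id": 1}},
    {"type": "STATE", "value": {"bookmarks": {"users": {"id": 1}}}},
]

# Targets consume these as newline-delimited JSON on stdin
for message in messages:
    print(json.dumps(message))
```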
Compatibility Notes
- singer_sdk version: Constrained to `~=0.34.1`, which permits 0.34.x patch releases only. Breaking changes between 0.x minor versions are possible; do not move to 0.35+ without testing.
- joblib threading vs multiprocessing: The framework uses `parallel_backend("threading")`, not multiprocessing. This avoids serialization overhead but means CPU-bound sinks do not benefit from true parallelism.
- Draft4Validator: The framework uses JSON Schema Draft 4 validation. Draft 7+ schemas may not validate correctly.
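The Draft 4 behavior can be checked directly. A small sketch using `Draft4Validator` (the same class the framework imports), with a hypothetical one-field schema:

```python
from jsonschema.validators import Draft4Validator

# Draft 4 syntax: "required" is a top-level array of property names
schema = {
    "type": "object",
    "properties": {"id": {"type": "integer"}},
    "required": ["id"],
}
validator = Draft4Validator(schema)

print(validator.is_valid({"id": 1}))      # conforming record
print(validator.is_valid({"id": "one"}))  # type mismatch -> invalid
```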
Related Pages
- Implementation:Mage_ai_Mage_ai_Target_Drain_All
- Implementation:Mage_ai_Mage_ai_Destination_Init
- Implementation:Mage_ai_Mage_ai_Destination_Process
- Implementation:Mage_ai_Mage_ai_Destination_Process_Schema
- Implementation:Mage_ai_Mage_ai_Destination_Process_Record
- Implementation:Mage_ai_Mage_ai_Destination_Export_Batch_Data
- Implementation:Mage_ai_Mage_ai_Destination_Emit_State