
Environment: Mage AI Singer SDK and Joblib Runtime

From Leeroopedia


Knowledge Sources

Domains: Infrastructure, Data_Integration
Last Updated: 2026-02-09 07:00 GMT

Overview

Singer SDK 0.34.x and Joblib runtime environment for target/destination parallel processing.

Description

This environment defines the Singer SDK and Joblib dependencies required by the destination (target) layer of the mage-integrations framework. The `singer_sdk` package provides the base classes for targets, sinks, and the Singer message protocol. Joblib is used for parallel sink draining with configurable concurrency. Together they form the core runtime for all destination connectors.

Usage

Use this environment when running any Destination or Target connector. It is required for the `Target.drain_all()` parallel processing, the `Sink` record batching, and the `Destination.process()` message ingestion pipeline.
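
The message ingestion pipeline consumes Singer-protocol JSON Lines on stdin, where a SCHEMA message must precede the first RECORD for each stream. A minimal, stdlib-only sketch of that wire format and ordering rule (the stream and field names are illustrative, not from the framework):

```python
import json

# Illustrative Singer message stream: SCHEMA, then RECORD, then STATE.
messages = [
    {"type": "SCHEMA", "stream": "users",
     "schema": {"properties": {"id": {"type": "integer"},
                               "email": {"type": "string"}}},
     "key_properties": ["id"]},
    {"type": "RECORD", "stream": "users",
     "record": {"id": 1, "email": "a@example.com"}},
    {"type": "STATE", "value": {"bookmarks": {"users": {"id": 1}}}},
]

# Serialize as JSON Lines, the format piped into a target's stdin.
stdin_payload = "\n".join(json.dumps(m) for m in messages)

def check_schema_first(lines):
    """Return True if every RECORD is preceded by a SCHEMA for its stream."""
    seen_schemas = set()
    for line in lines.splitlines():
        msg = json.loads(line)
        if msg["type"] == "SCHEMA":
            seen_schemas.add(msg["stream"])
        elif msg["type"] == "RECORD" and msg["stream"] not in seen_schemas:
            return False
    return True

print(check_schema_first(stdin_payload))
```

Violating this ordering is exactly what raises `RecordsWithoutSchemaException` (see Common Errors below).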

System Requirements

  • OS: Linux, macOS, or Windows (cross-platform)
  • Python: >= 3.9 (same as Python_3_9_Runtime)
  • CPU: multi-core recommended; the Joblib threading backend benefits from multiple cores during parallel sink draining

Dependencies

Python Packages

  • `singer_sdk` ~= 0.34.1 — Singer message protocol, Target/Sink base classes, schema validation
  • `joblib` (transitive via singer_sdk) — Parallel sink draining via threading backend
  • `jsonschema` (transitive via singer_sdk) — Draft4Validator for record validation
  • `requests` ~= 2.31.0 — HTTP client for API connectors
  • `singer` (singer-python) — Legacy Singer utilities (`singer.write_bookmark`, `singer.get_logger`)
  • `pyyaml` — Config file parsing (YAML format support)
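
The legacy `singer.write_bookmark` utility maintains a nested `state["bookmarks"][stream][key]` structure. A stdlib sketch that mirrors those semantics for illustration (these helpers are reimplemented here, not imported from singer-python):

```python
# Reimplementation of singer-python's bookmark-state shape, for illustration.
def write_bookmark(state, stream_id, key, value):
    """Set state['bookmarks'][stream_id][key] = value, creating levels as needed."""
    state.setdefault("bookmarks", {}).setdefault(stream_id, {})[key] = value
    return state

def get_bookmark(state, stream_id, key, default=None):
    """Read a bookmark value back out, returning default when absent."""
    return state.get("bookmarks", {}).get(stream_id, {}).get(key, default)

state = {}
write_bookmark(state, "users", "replication_key_value", "2026-02-09T07:00:00Z")
print(get_bookmark(state, "users", "replication_key_value"))
```

The nested shape matches the STATE messages that targets emit back to the orchestrator after draining.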

Destination-Specific Packages

Depending on the target database:

  • BigQuery: `google-cloud-bigquery` ~= 3.0
  • PostgreSQL: `psycopg2` == 2.9.3
  • MySQL: `mysql-connector-python` (pinned version depends on the Python version)
  • Snowflake: `snowflake-connector-python` == 3.7.1
  • MongoDB: `pymongo` == 4.3.3
  • Elasticsearch: `elasticsearch` == 8.15.1
  • S3/Delta Lake: `deltalake` == 0.20.2, `pyarrow` >= 14.0.1
  • Redshift: `redshift-connector` ~= 2.0.915

Credentials

Credentials are connector-specific and passed via `config.json`:

  • BigQuery: Google service account JSON or Application Default Credentials
  • PostgreSQL/MySQL: `host`, `port`, `username`, `password`, `database`
  • Snowflake: `account`, `user`, `password`, `warehouse`, `database`, `schema`
  • S3: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` (environment variables or config)
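
A sketch of assembling a `config.json` for a PostgreSQL destination; the key names follow the host/port/username/password/database convention above, and the values shown are placeholders:

```python
import json
import os
import tempfile

# Hypothetical PostgreSQL destination config with placeholder values.
config = {
    "host": "localhost",
    "port": 5432,
    "username": "loader",
    "password": os.environ.get("PG_PASSWORD", ""),  # avoid hard-coding secrets
    "database": "analytics",
    "schema": "public",
}

# Write the config file the connector is pointed at via --config.
path = os.path.join(tempfile.mkdtemp(), "config.json")
with open(path, "w") as f:
    json.dump(config, f, indent=2)
```

Reading the password from an environment variable keeps the secret out of the checked-in config file.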

Quick Install

# Core destination dependencies
pip install 'singer_sdk~=0.34.1' joblib requests pyyaml jsonschema

# For specific destinations, add the relevant driver:
pip install 'google-cloud-bigquery~=3.0'       # BigQuery
pip install 'psycopg2-binary==2.9.3'           # PostgreSQL
pip install 'snowflake-connector-python==3.7.1' # Snowflake

Code Evidence

Singer SDK import in Target class from `destinations/target.py:12-25`:

from joblib import Parallel, delayed, parallel_backend
from singer_sdk.exceptions import RecordsWithoutSchemaException
from singer_sdk.helpers._batch import BaseBatchFileEncoding
from singer_sdk.helpers._classproperty import classproperty
from singer_sdk.helpers._compat import final
from singer_sdk.helpers.capabilities import (
    TARGET_SCHEMA_CONFIG,
    CapabilitiesEnum,
    PluginCapabilities,
    TargetCapabilities,
)
from singer_sdk.io_base import SingerMessageType, SingerReader
from singer_sdk.mapper import PluginMapper
from singer_sdk.plugin_base import PluginBase

Draft4Validator usage in `destinations/base.py:14`:

from jsonschema.validators import Draft4Validator
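
A sketch of how a destination base class can use `Draft4Validator` to check records against a stream schema before loading (the schema and records here are illustrative):

```python
from jsonschema.validators import Draft4Validator

# Illustrative stream schema, JSON Schema Draft 4.
schema = {
    "type": "object",
    "properties": {"id": {"type": "integer"}, "email": {"type": "string"}},
    "required": ["id"],
}
validator = Draft4Validator(schema)

ok_record = {"id": 1, "email": "a@example.com"}
bad_record = {"email": "missing-id@example.com"}  # violates required: ["id"]

print(validator.is_valid(ok_record))
print(validator.is_valid(bad_record))
```

`is_valid()` returns a boolean; calling `validator.validate(record)` instead raises the `jsonschema.exceptions.ValidationError` listed under Common Errors.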

Joblib parallelism cap for sink draining, from `destinations/target.py:32`:

_MAX_PARALLELISM = 8
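
A sketch of how draining can fan out over the threading backend under that cap. The `Sink` class and its `drain()` method are simplified stand-ins for the framework's sinks, where the flush is I/O-bound (network writes), which is why threads help despite the GIL:

```python
import time

from joblib import Parallel, delayed, parallel_backend

_MAX_PARALLELISM = 8  # cap mirroring destinations/target.py

class Sink:
    """Stand-in for a framework sink holding a pending record batch."""
    def __init__(self, stream):
        self.stream = stream
        self.drained = False

    def drain(self):
        time.sleep(0.01)  # simulate an I/O-bound flush to the destination
        self.drained = True
        return self.stream

sinks = [Sink(f"stream_{i}") for i in range(4)]

# Drain all sinks concurrently on the threading backend, as the framework
# does with parallel_backend("threading") rather than multiprocessing.
with parallel_backend("threading", n_jobs=min(len(sinks), _MAX_PARALLELISM)):
    results = Parallel()(delayed(s.drain)() for s in sinks)

print(sorted(results))
```

Because threads share memory, each sink's `drained` flag is visible after the call; a multiprocessing backend would mutate copies instead, which is one reason the framework stays on threading.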

Common Errors

  • `ImportError: No module named 'singer_sdk'` — cause: singer_sdk not installed; fix: `pip install 'singer_sdk~=0.34.1'`
  • `ImportError: No module named 'joblib'` — cause: joblib not installed; fix: `pip install joblib` (usually pulled in transitively by singer_sdk)
  • `RecordsWithoutSchemaException` — cause: a RECORD message arrived before its stream's SCHEMA; fix: ensure the source emits SCHEMA before RECORD for each stream
  • `jsonschema.exceptions.ValidationError` — cause: record does not match the stream schema; fix: check source data against the declared JSON schema

Compatibility Notes

  • singer_sdk version: Pinned to ~= 0.34.1. Breaking changes between 0.x minor versions are possible; do not use 0.35+ without testing.
  • joblib threading vs multiprocessing: The framework uses `parallel_backend("threading")`, not multiprocessing. This avoids serialization overhead but means CPU-bound sinks do not benefit from true parallelism.
  • Draft4Validator: The framework uses JSON Schema Draft 4 validation. Draft 7+ schemas may not validate correctly.
