Workflow:Mage ai Mage ai Building a New Source Connector
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, ETL, Connector_Development |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
End-to-end process for creating a new Singer-compatible source connector (tap) within the Mage integrations framework, from directory scaffolding through schema definition to implementing extraction logic.
Description
This workflow guides developers through building a new data source connector that integrates with the Mage ETL framework. The process follows the Singer specification and leverages Mage's base class hierarchy: the generic Source base for all connectors, the SQL base for relational databases, and the HTTP client base for REST APIs. The resulting connector will support schema discovery, catalog-based stream selection, multiple replication methods (FULL_TABLE, INCREMENTAL, LOG_BASED), and state-based incremental extraction. Connectors follow a standardized directory structure with __init__.py (main class), README.md (configuration docs), schemas/ (JSON schema files), and templates/ (config templates).
Usage
Execute this workflow when you need to add support for a new data source that is not already covered by the 40+ existing connectors. Common triggers include:
- You need to extract data from a database or API that does not have an existing Mage connector
- You need a custom connector with specialized extraction logic beyond what the generic API source provides
- You are contributing a new integration to the Mage open-source project
Execution Steps
Step 1: Directory Structure Scaffolding
Create the standardized directory structure for the new source connector following the established convention used by all existing integrations.
What to create:
- A new directory under mage_integrations/mage_integrations/sources/{source_name}/
- __init__.py for the main connector class
- README.md documenting configuration requirements
- schemas/ directory for JSON schema definitions
- templates/config.json with a sample configuration template
- Optionally: client.py for HTTP API communication, streams/ for multi-stream sources
Key considerations:
- Follow the naming convention used by existing connectors (lowercase with underscores)
- Study a similar existing connector as a template (e.g., postgresql for SQL, chargebee for API)
- The README should document all required and optional configuration parameters
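The layout above can be scaffolded with a short script. This is a minimal sketch, not a tool shipped with Mage: the function name `scaffold_source`, its parameters, and the placeholder file contents are assumptions made for illustration. Only the directory and file names come from the list above.

```python
import json
import tempfile
from pathlib import Path


def scaffold_source(root: Path, source_name: str, with_client: bool = False) -> Path:
    """Create the connector skeleton under `root` and return the connector directory."""
    # Enforce the lowercase-with-underscores naming convention.
    if source_name != source_name.lower() or not source_name.replace("_", "").isalnum():
        raise ValueError("source_name should be lowercase letters, digits, and underscores")

    base = root / "mage_integrations" / "mage_integrations" / "sources" / source_name
    (base / "schemas").mkdir(parents=True, exist_ok=True)
    (base / "templates").mkdir(exist_ok=True)

    # Empty placeholders; the real content is written in the later steps.
    (base / "__init__.py").touch()
    (base / "README.md").write_text(f"# {source_name}\n\nDocument configuration here.\n")
    (base / "templates" / "config.json").write_text(json.dumps({}, indent=2) + "\n")
    if with_client:
        (base / "client.py").touch()  # optional HTTP client module
    return base


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        created = scaffold_source(Path(tmp), "my_api", with_client=True)
        for path in sorted(created.rglob("*")):
            print(path.relative_to(created))
```

Running the script against a temporary directory first makes it easy to inspect the result before creating files in a real checkout.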
Step 2: JSON Schema Definition
Define JSON schemas for each data stream (table or API endpoint) that the connector will support. Schemas declare the structure, types, and constraints of the data records.
What happens:
- Create one JSON file per stream in the schemas/ directory (e.g., schemas/users.json, schemas/orders.json)
- Each schema follows the JSON Schema Draft 4 specification
- Properties define field names with their types (string, integer, number, boolean, object, array)
- Key properties (primary keys) are identified for deduplication
- Date and timestamp fields use the string type with format: date-time
- Nullable fields use type arrays (e.g., ["string", "null"])
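As an illustration of these conventions, the sketch below builds a schema for a hypothetical `users` stream and writes it out as JSON. The stream and its field names are invented for the example. Keeping the key properties in a separate `KEY_PROPERTIES` list is also an assumption, so check an existing connector to see how it declares them.

```python
import json

# Hypothetical "users" stream; field names are illustrative only.
USERS_SCHEMA = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "email": {"type": ["string", "null"]},          # nullable via a type array
        "is_active": {"type": ["boolean", "null"]},
        "created_at": {"type": ["string", "null"], "format": "date-time"},
    },
}

# Primary key used for deduplication (assumed to be declared alongside the schema).
KEY_PROPERTIES = ["id"]


def nullable_fields(schema: dict) -> list:
    """Return the sorted names of properties whose type array includes "null"."""
    return sorted(
        name
        for name, spec in schema["properties"].items()
        if isinstance(spec.get("type"), list) and "null" in spec["type"]
    )


if __name__ == "__main__":
    # This text is what would be saved as schemas/users.json.
    print(json.dumps(USERS_SCHEMA, indent=2))
    print("nullable:", nullable_fields(USERS_SCHEMA))
```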
Step 3: Main Connector Class Implementation
Implement the main connector class by extending the appropriate base class: Source for generic connectors, SQL Source for database connectors, or the HTTP base for API connectors.
What happens:
- The class extends Source, the SQL Source base, or the HTTP client base, depending on the data source type
- For SQL sources: override build_connection() to return a connection object, and optionally override column_type_mapping() for custom type mappings
- For API sources: implement the HTTP client with authentication, and override load_data() to make API calls and yield record batches
- The discover() method can be inherited (for SQL) or overridden (for API) to build the catalog
- The test_connection() method validates that credentials and connectivity are correct
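The override pattern for a SQL source can be sketched as follows. `SQLSourceStandIn` is a local stand-in written for this example; it is not the real Mage SQL base class, whose constructor and method signatures should be checked in the repository. SQLite is used only because it ships with Python.

```python
import sqlite3


class SQLSourceStandIn:
    """Stand-in for the framework's SQL base class (hypothetical, not the Mage class)."""

    def __init__(self, config: dict):
        self.config = config

    def build_connection(self):
        # Subclasses return a DB-API style connection object.
        raise NotImplementedError

    def test_connection(self) -> None:
        # Open a connection and run a trivial query; any failure raises.
        connection = self.build_connection()
        try:
            connection.execute("SELECT 1")
        finally:
            connection.close()


class SqliteSource(SQLSourceStandIn):
    """Example connector: the only database-specific part is build_connection()."""

    def build_connection(self):
        return sqlite3.connect(self.config["database"])


if __name__ == "__main__":
    SqliteSource({"database": ":memory:"}).test_connection()
    print("connection ok")
```

The design point is that connection handling is the only piece the subclass supplies, while the base class reuses it for connectivity checks.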
Key considerations:
- The load_data() method must be a generator that yields batches of records as lists of dictionaries
- Each record dictionary must have keys matching the JSON schema property names
- For incremental sources, load_data() receives bookmark values and should use them to skip records that were already extracted in a previous run
- Handle pagination, rate limiting, and retries for transient errors within load_data()
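The generator contract described above can be sketched in isolation. The example below is a standalone function, not a method on the Mage base class: `fetch_page`, `FAKE_PAGES`, and the `updated_at` bookmark column are hypothetical stand-ins for a real paginated API, and the actual load_data() signature should be taken from the base class.

```python
from typing import Any, Dict, Generator, List, Optional

# Stand-in for a paginated API; a real connector would call its HTTP client here.
FAKE_PAGES = [
    [
        {"id": 1, "updated_at": "2024-01-01T00:00:00Z"},
        {"id": 2, "updated_at": "2024-01-02T00:00:00Z"},
    ],
    [{"id": 3, "updated_at": "2024-01-03T00:00:00Z"}],
]


def fetch_page(page: int) -> List[Dict[str, Any]]:
    """Return one page of records, or an empty list when there are no more pages."""
    return FAKE_PAGES[page] if page < len(FAKE_PAGES) else []


def load_data(
    bookmarks: Optional[Dict[str, str]] = None,
) -> Generator[List[Dict[str, Any]], None, None]:
    """Yield batches (lists of dicts), skipping records at or before the bookmark."""
    since = (bookmarks or {}).get("updated_at")
    page = 0
    while True:
        rows = fetch_page(page)
        if not rows:
            break
        if since is not None:
            # UTC timestamps in one fixed ISO 8601 format sort chronologically,
            # so plain string comparison is enough for this sketch.
            rows = [row for row in rows if row["updated_at"] > since]
        if rows:
            yield rows
        page += 1


if __name__ == "__main__":
    for batch in load_data({"updated_at": "2024-01-01T00:00:00Z"}):
        print(batch)
```

Whether the bookmark comparison should be exclusive, as here, or inclusive with deduplication on the key properties depends on the source; rate limiting and retries are left out to keep the example short.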
Step 4: Configuration Template and Documentation
Create the configuration template and write comprehensive README documentation covering all connection parameters, authentication methods, and setup instructions.
What to document:
- All required configuration keys with descriptions and example values
- Authentication setup steps (API key generation, OAuth flow, database user creation)
- Optional parameters with their default values
- Any prerequisites or service-specific setup steps
- The templates/config.json file should contain all config keys with placeholder values
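One way to keep the template and the README consistent is to check a config against the documented keys. The sketch below assumes a hypothetical API connector: the key names (`api_key`, `base_url`, `page_size`) and the `missing_required` helper are invented for illustration and are not part of Mage.

```python
import json

# Hypothetical template: every key present, with placeholder or default values.
CONFIG_TEMPLATE = {
    "api_key": "",                          # required, left blank in the template
    "base_url": "https://api.example.com",  # required
    "page_size": 100,                       # optional, shown with its default
}

REQUIRED_KEYS = ("api_key", "base_url")


def missing_required(config: dict) -> list:
    """Return the required keys that are absent or empty in `config`."""
    return [key for key in REQUIRED_KEYS if not config.get(key)]


if __name__ == "__main__":
    # This text is what would be saved as templates/config.json.
    print(json.dumps(CONFIG_TEMPLATE, indent=2))
    print("still to fill in:", missing_required(CONFIG_TEMPLATE))
```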
Step 5: Testing and Validation
Test the new connector through the complete tap lifecycle: connection testing, schema discovery, full-table extraction, and incremental extraction with bookmark state.
Key considerations:
- Run the connector in discover mode to verify schema discovery produces a valid catalog
- Run a full-table sync to verify records are correctly extracted and emitted as Singer messages
- Run an incremental sync with state to verify bookmark filtering works
- Validate that SCHEMA, RECORD, and STATE messages conform to the Singer specification
- Test error handling for authentication failures, network errors, and invalid data
- Use the test scripts in scripts/ as reference for integration testing patterns
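The message-level checks can be partly automated. The sketch below inspects one line of tap output and reports missing top-level fields for SCHEMA, RECORD, and STATE messages, following the required fields in the Singer specification. The helper name `check_message` is an assumption, and the check does not validate records against their schemas.

```python
import json
from typing import List

# Top-level fields the Singer specification requires for each message type.
REQUIRED_FIELDS = {
    "SCHEMA": ("stream", "schema", "key_properties"),
    "RECORD": ("stream", "record"),
    "STATE": ("value",),
}


def check_message(line: str) -> List[str]:
    """Return a list of problems found in one line of tap output (empty if none)."""
    try:
        message = json.loads(line)
    except json.JSONDecodeError as error:
        return [f"not valid JSON: {error}"]
    if not isinstance(message, dict):
        return ["message is not a JSON object"]

    message_type = message.get("type")
    if message_type not in REQUIRED_FIELDS:
        return [f"unknown message type: {message_type!r}"]

    return [
        f"{message_type} message is missing {field!r}"
        for field in REQUIRED_FIELDS[message_type]
        if field not in message
    ]


if __name__ == "__main__":
    sample = '{"type": "RECORD", "stream": "users", "record": {"id": 1}}'
    print(check_message(sample))
```

Feeding each line of a captured sync through this function gives a quick first pass before deeper integration tests.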