Workflow: Mage AI API Source Extraction
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, ETL, API_Integration |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
End-to-end process for extracting data from SaaS and REST APIs (Stripe, HubSpot, GitHub, Salesforce, and 20+ others) using Singer-compatible HTTP source connectors with authentication, pagination, rate limiting, and incremental sync.
Description
This workflow covers the complete lifecycle of an HTTP/API-based Singer source connector in the Mage integrations framework. API sources either extend the base Source class directly or build on the HTTP client base class, which provides standardized authentication, pagination, retry logic, and error handling. Each API connector defines its own streams (data endpoints), JSON schemas, and extraction logic tailored to the specific service's API. The process handles OAuth and API key authentication, cursor-based and offset-based pagination, rate limit compliance, and incremental sync via bookmark tracking on timestamp or ID fields.
Usage
Execute this workflow when you need to extract data from a SaaS platform or REST API into a Mage data pipeline. Common triggers include:
- You need to extract data from a SaaS service (Stripe, HubSpot, GitHub, Salesforce, etc.) for analytics
- You need to replicate API data into a data warehouse on a recurring schedule
- You need incremental extraction that only fetches records created or modified since the last sync
- You need to handle complex API pagination patterns (cursor-based, offset-based, page-based)
Execution Steps
Step 1: Authentication and Client Setup
Configure API credentials and initialize the HTTP client for the target service. Each connector implements the authentication method its service requires: API keys, OAuth 2.0 tokens, or basic authentication, sent via service-specific headers.
Key considerations:
- Authentication methods vary by service (e.g., Stripe uses an API key, HubSpot uses OAuth, GitHub uses personal access tokens)
- The HTTP client base class provides standardized error handling with retry logic and exponential backoff
- Rate limiting is handled per service with configurable request intervals
- Configuration is loaded from a JSON config file with service-specific keys
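As a rough illustration of the client setup above, the sketch below uses only the Python standard library. It is not the Mage HTTP client base class. `load_config`, `build_request`, `Throttle`, `call_with_retries` and `fetch_json` are hypothetical names. Bearer-token auth stands in for whatever scheme the target service actually uses. Treating 429 and 5xx responses as retryable is an assumption.

```python
import json
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Callable, Dict, Optional

# Assumption: status codes treated as transient; real connectors vary per service.
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def load_config(path: str) -> Dict[str, Any]:
    """Load the JSON config file; keys such as "api_key" are service-specific."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def build_request(base_url: str, path: str, api_key: str,
                  params: Optional[Dict[str, Any]] = None) -> urllib.request.Request:
    """Build an authenticated GET request (bearer-token header is an assumption)."""
    url = base_url.rstrip("/") + "/" + path.lstrip("/")
    if params:
        url += "?" + urllib.parse.urlencode(params)
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {api_key}"})


class Throttle:
    """Enforce a minimum interval (in seconds) between consecutive requests."""

    def __init__(self, min_interval: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.min_interval = min_interval
        self.clock = clock
        self.sleep = sleep
        self._last: Optional[float] = None

    def wait(self) -> None:
        if self._last is not None:
            remaining = self.min_interval - (self.clock() - self._last)
            if remaining > 0:
                self.sleep(remaining)
        self._last = self.clock()


def call_with_retries(request_fn: Callable[[], Any], max_retries: int = 5,
                      base_delay: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> Any:
    """Call request_fn, retrying retryable HTTP errors with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return request_fn()
        except urllib.error.HTTPError as err:
            if err.code not in RETRYABLE_STATUS or attempt == max_retries:
                raise
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ... when base_delay=1


def fetch_json(req: urllib.request.Request, timeout: float = 30.0) -> Any:
    """Perform the request and decode the JSON response body."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

In use, call `throttle.wait()` inside the function passed to `call_with_retries`. This makes retried requests respect the configured interval too. Injecting `clock` and `sleep` keeps the timing logic testable without real delays.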
Step 2: Stream Discovery
Discover available data streams (API endpoints) and their schemas. API sources define their streams via JSON schema files stored in a schemas directory, with each file describing the structure of records returned by a specific API endpoint.
What happens:
- Each stream corresponds to an API resource (e.g., customers, invoices, events, subscriptions)
- JSON schemas define the expected field names, types, and nesting structure
- Stream metadata includes replication method, valid replication keys, and key properties
- Discovery returns a Singer catalog with all available streams and their schemas
- Complex APIs may have parent-child stream relationships (e.g., reports contain queries, which contain charts)
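In one way to picture discovery, each `<stream>.json` file in the schemas directory becomes one catalog entry with stream-level and field-level metadata. The metadata keys (`table-key-properties`, `valid-replication-keys`, `inclusion`) follow Singer catalog conventions. The `discover` function and its `settings` mapping are hypothetical, and Mage connectors may declare key properties and replication keys differently.

```python
import json
import os
from typing import Any, Dict, List


def discover(schemas_dir: str,
             settings: Dict[str, Dict[str, List[str]]]) -> Dict[str, List[Dict[str, Any]]]:
    """Build a Singer-style catalog from <stream>.json schema files in schemas_dir."""
    streams = []
    for filename in sorted(os.listdir(schemas_dir)):
        if not filename.endswith(".json"):
            continue
        name = filename[: -len(".json")]
        with open(os.path.join(schemas_dir, filename), encoding="utf-8") as f:
            schema = json.load(f)
        key_props = settings.get(name, {}).get("key_properties", [])
        repl_keys = settings.get(name, {}).get("replication_keys", [])
        # Stream-level metadata lives at the empty breadcrumb.
        metadata = [{
            "breadcrumb": [],
            "metadata": {
                "table-key-properties": key_props,
                "valid-replication-keys": repl_keys,
            },
        }]
        # Field-level metadata: key properties and replication keys are always emitted.
        for prop in schema.get("properties", {}):
            required = prop in key_props or prop in repl_keys
            metadata.append({
                "breadcrumb": ["properties", prop],
                "metadata": {"inclusion": "automatic" if required else "available"},
            })
        streams.append({
            "tap_stream_id": name,
            "stream": name,
            "schema": schema,
            "key_properties": key_props,
            "metadata": metadata,
        })
    return {"streams": streams}
```

Non-JSON files in the directory are skipped. Discovery is therefore driven entirely by which schema files exist.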
Step 3: Catalog Selection
Select which API streams to extract and configure replication settings. Each stream can be set to FULL_TABLE or INCREMENTAL replication, with bookmark properties defined for incremental sync.
Key considerations:
- Select only the streams you need to minimize API calls and extraction time
- Some streams support incremental sync via timestamp fields (e.g., updated_at, created_at)
- Parent-child stream dependencies must be respected (selecting a child stream requires selecting its parent)
- Replication keys determine which records are new since the last sync
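Selection can be expressed as edits to the stream-level metadata of the discovered catalog. This sketch assumes the Singer catalog keys `selected`, `replication-method` and `replication-key` at the empty breadcrumb. `select_streams` and its `parents` mapping from child to parent stream are hypothetical helpers, not Mage APIs.

```python
from typing import Any, Dict, Optional


def select_streams(catalog: Dict[str, Any],
                   selections: Dict[str, Optional[str]],
                   parents: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Mark streams selected in place.

    selections maps stream name -> replication key, or None for FULL_TABLE.
    parents maps a child stream -> the parent stream that must also be selected.
    """
    missing = [f"{child} requires {parent}"
               for child, parent in (parents or {}).items()
               if child in selections and parent not in selections]
    if missing:
        raise ValueError("parent stream not selected: " + "; ".join(missing))

    for entry in catalog["streams"]:
        name = entry["tap_stream_id"]
        top = next(m["metadata"] for m in entry["metadata"] if m["breadcrumb"] == [])
        top["selected"] = name in selections
        if name not in selections:
            continue
        key = selections[name]
        if key is None:
            top["replication-method"] = "FULL_TABLE"
            top.pop("replication-key", None)
        elif key in top.get("valid-replication-keys", []):
            top["replication-method"] = "INCREMENTAL"
            top["replication-key"] = key
        else:
            raise ValueError(f"{key!r} is not a valid replication key for {name}")
    return catalog
```

Validating the replication key against `valid-replication-keys` catches misconfigured incremental streams before any API calls are made.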
Step 4: State and Bookmark Loading
Load state from a previous sync run to enable incremental extraction. The state contains per-stream bookmark values (typically timestamps or cursor positions) that indicate where the last extraction stopped.
What happens:
- State JSON is loaded from a file when one is provided via a CLI argument
- Each stream has its own bookmark entry tracking the last extracted position
- For timestamp-based incrementals, the bookmark stores the maximum timestamp seen
- For cursor-based APIs, the bookmark stores the last cursor or offset value
- First-time runs start with empty state and extract all available data
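Below is a minimal sketch of state loading. It assumes the common Singer state layout `{"bookmarks": {stream: {key: value}}}` and a state file path passed on the command line; Singer taps conventionally use `--state`. The helper names are hypothetical. A missing path yields empty state, so a first run falls back to a full extraction.

```python
import json
from typing import Any, Dict, Optional


def load_state(path: Optional[str]) -> Dict[str, Any]:
    """Return saved state, or {} on a first run with no state file."""
    if not path:
        return {}
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def get_bookmark(state: Dict[str, Any], stream: str, key: str,
                 default: Any = None) -> Any:
    """Read one stream's bookmark (a timestamp, cursor, or offset)."""
    return state.get("bookmarks", {}).get(stream, {}).get(key, default)


def write_bookmark(state: Dict[str, Any], stream: str, key: str,
                   value: Any) -> Dict[str, Any]:
    """Set one stream's bookmark in place, creating nested dicts as needed."""
    state.setdefault("bookmarks", {}).setdefault(stream, {})[key] = value
    return state
```

A common pattern is to fall back to a configured start date when a stream has no bookmark yet, e.g. `get_bookmark(state, "customers", "updated_at", config.get("start_date"))`, where `start_date` is a hypothetical config key.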
Step 5: Paginated API Data Extraction
Extract data from each selected stream by making authenticated HTTP requests to the API, handling pagination to retrieve all records across multiple pages, and filtering for incremental updates when bookmark state is available.
What happens:
- For each selected stream, the connector constructs API requests with appropriate parameters
- Pagination is handled according to the service's pattern (cursor, offset, page number, or link-based)
- For incremental streams, request parameters include filters for records modified since the bookmark value
- Rate limits are respected via configurable delays between requests
- API responses are parsed and records are extracted from the response body
- Error responses trigger retries with exponential backoff (configurable retry count)
- Records are yielded in batches for memory-efficient processing
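The pagination loop can be sketched independently of any one API by injecting a hypothetical `fetch_page` function that performs the authenticated request. Two of the patterns listed above are shown:

- cursor-based, where each response returns the next cursor (`starting_after` is an assumed parameter name modeled on Stripe-style APIs)
- offset-based with `limit`/`offset`

The incremental filter is assumed to be a request parameter built from the bookmark. `updated_since` is a placeholder, since the real name varies by service.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

Record = Dict[str, Any]


def paginate_cursor(fetch_page: Callable[[Dict[str, Any]], Tuple[List[Record], Optional[str]]],
                    params: Dict[str, Any],
                    cursor_param: str = "starting_after",
                    batch_size: int = 500) -> Iterator[List[Record]]:
    """Follow next-page cursors until none is returned, yielding fixed-size batches."""
    buffer: List[Record] = []
    cursor: Optional[str] = None
    while True:
        request_params = dict(params)
        if cursor is not None:
            request_params[cursor_param] = cursor
        records, cursor = fetch_page(request_params)
        buffer.extend(records)
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]
        if cursor is None:
            break
    if buffer:
        yield buffer  # final partial batch


def paginate_offset(fetch_page: Callable[[Dict[str, Any]], List[Record]],
                    params: Dict[str, Any],
                    page_size: int = 100) -> Iterator[List[Record]]:
    """Advance limit/offset until a short or empty page signals the end."""
    offset = 0
    while True:
        page = fetch_page({**params, "limit": page_size, "offset": offset})
        if page:
            yield page
        if len(page) < page_size:
            break
        offset += len(page)


def incremental_params(bookmark: Optional[str],
                       filter_param: str = "updated_since") -> Dict[str, Any]:
    """Request filter for records modified since the bookmark (name is hypothetical)."""
    return {filter_param: bookmark} if bookmark is not None else {}
```

Because both paginators are generators, only one page plus one partial batch is held in memory at a time. In a connector, `fetch_page` would wrap the authenticated, rate-limited, retried HTTP call.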
Step 6: Record Transformation and Emission
Transform raw API response data into Singer-protocol messages. This includes type conversion, nested object flattening, timestamp normalization, and emission of SCHEMA, RECORD, and STATE messages.
What happens:
- Raw API records are transformed to match the declared JSON schema
- Nested objects and arrays may be flattened or serialized depending on the connector
- Timestamps are normalized to ISO 8601 format
- SCHEMA messages are emitted first for each stream with the full JSON schema
- RECORD messages are emitted for each individual data record
- STATE messages are emitted periodically with updated bookmark values
- Bookmark values are updated as records are processed (tracking the maximum replication key value)
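Here is a compact sketch of this step. It first coerces each record to its declared schema:

- unknown fields are dropped
- `date-time` fields are normalized to UTC ISO 8601
- nested objects and arrays are serialized to JSON strings where the schema declares a string

It then writes SCHEMA, RECORD and STATE messages as JSON lines. The message fields follow the Singer specification. `transform`, `to_iso8601` and `sync_stream` are hypothetical, and accepting epoch seconds as timestamps is an assumption. Each real connector chooses its own flattening rules.

```python
import json
import sys
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, TextIO


def to_iso8601(value: Any) -> Any:
    """Normalize epoch seconds or ISO 8601 strings to UTC ISO 8601."""
    if isinstance(value, (int, float)):
        return datetime.fromtimestamp(value, tz=timezone.utc).isoformat()
    if isinstance(value, str):
        dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive values are UTC
        return dt.astimezone(timezone.utc).isoformat()
    return value


def transform(raw: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
    """Keep declared fields only and coerce them toward the declared schema."""
    out = {}
    for name, prop in schema.get("properties", {}).items():
        if name not in raw:
            continue
        value = raw[name]
        types = prop.get("type", [])
        types = [types] if isinstance(types, str) else types
        if prop.get("format") == "date-time" and value is not None:
            value = to_iso8601(value)
        elif isinstance(value, (dict, list)) and "string" in types:
            value = json.dumps(value, sort_keys=True)  # serialize nested data
        out[name] = value
    return out


def emit(message: Dict[str, Any], out: TextIO) -> None:
    out.write(json.dumps(message) + "\n")


def sync_stream(stream: str, schema: Dict[str, Any], key_properties: List[str],
                replication_key: Optional[str], batches: Iterable[List[Dict[str, Any]]],
                state: Dict[str, Any], out: TextIO = sys.stdout) -> Dict[str, Any]:
    emit({"type": "SCHEMA", "stream": stream, "schema": schema,
          "key_properties": key_properties,
          "bookmark_properties": [replication_key] if replication_key else []}, out)
    bookmarks = state.setdefault("bookmarks", {})
    bookmark = bookmarks.get(stream, {}).get(replication_key) if replication_key else None
    for batch in batches:
        for raw in batch:
            record = transform(raw, schema)
            emit({"type": "RECORD", "stream": stream, "record": record}, out)
            value = record.get(replication_key) if replication_key else None
            # UTC ISO 8601 strings produced by to_iso8601 sort chronologically.
            if value is not None and (bookmark is None or value > bookmark):
                bookmark = value
        if bookmark is not None:
            bookmarks.setdefault(stream, {})[replication_key] = bookmark
        emit({"type": "STATE", "value": state}, out)  # checkpoint after each batch
    return state
```

Checkpointing the running maximum after each batch is only safe when the API returns records in ascending replication-key order. Otherwise, a crash mid-stream could skip older records, and the bookmark should be emitted only once the stream finishes.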
Step 7: State Persistence
Persist the final sync state so subsequent runs can perform incremental extraction. The state captures per-stream bookmark positions enabling efficient delta syncs.
Key considerations:
- State is emitted as Singer STATE messages and captured by the downstream target
- The target persists state to a file for the next source run
- Bookmark values enable the next run to only fetch new or modified records
- Given API rate limits, incremental sync is significantly more efficient than repeated full extraction
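On the target side, persistence can be as simple as remembering the last STATE message seen and writing it out when the input ends. This is a sketch assuming Singer JSON-lines input, and `persist_final_state` is hypothetical. The write-then-rename via `os.replace` is a design choice so that an interrupted run does not leave a half-written state file.

```python
import json
import os
from typing import Any, Dict, Iterable, Optional


def persist_final_state(lines: Iterable[str], state_path: str) -> Optional[Dict[str, Any]]:
    """Write the value of the last STATE message to state_path; return it (or None)."""
    last_state = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        if message.get("type") == "STATE":
            last_state = message["value"]
        # A real target would load RECORD messages into its destination here and
        # only advance state after those records are durably stored.
    if last_state is not None:
        tmp_path = state_path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(last_state, f)
        os.replace(tmp_path, state_path)  # atomic rename on the same filesystem
    return last_state
```

The saved file is then handed back to the source on its next run, closing the loop with Step 4.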