Implementation:Mage ai Mage ai Pipedrive Tap
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Pipedrive, API |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Main tap class for the Pipedrive source connector, orchestrating stream registration, discovery, sync execution, and API request handling.
Description
The PipedriveTap class is the central orchestrator for the Pipedrive source connector. It manages a registry of 15 stream instances (CurrenciesStream, ActivityTypesStream, StagesStream, FiltersStream, PipelinesStream, RecentNotesStream, RecentUsersStream, RecentActivitiesStream, RecentDealsStream, RecentFilesStream, RecentOrganizationsStream, RecentPersonsStream, RecentProductsStream, DealStageChangeStream, DealsProductsStream), handles configuration with defaults, and provides do_discover and do_sync methods. The module also includes comprehensive HTTP error handling with a mapping of Pipedrive-specific error codes (400-503) to custom exception classes, rate limit handling via X-RateLimit-Reset header with backoff, and a custom PipedriveNull200Error for null response bodies. The do_sync method supports resumable syncing through currently_syncing state tracking and handles deal-ID-based pagination for child streams.
Usage
Instantiated by the Mage Pipedrive source connector with config and state dictionaries. The do_discover method returns a Singer catalog, and do_sync executes the data extraction pipeline.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/pipedrive/tap_pipedrive/tap.py
- Lines: 1-426
Signature
class PipedriveTap(object):
streams = [
CurrenciesStream(), ActivityTypesStream(), StagesStream(),
FiltersStream(), PipelinesStream(), RecentNotesStream(),
RecentUsersStream(), RecentActivitiesStream(), RecentDealsStream(),
RecentFilesStream(), RecentOrganizationsStream(), RecentPersonsStream(),
RecentProductsStream(), DealStageChangeStream(), DealsProductsStream(),
]
def __init__(self, config: dict, state: dict):
...
def do_discover(self) -> Catalog:
...
def do_sync(self, catalog: Catalog) -> None:
...
def get_selected_streams(self, catalog: Catalog) -> list:
...
@staticmethod
def get_default_config() -> dict:
...
Import
from mage_integrations.sources.pipedrive.tap_pipedrive.tap import PipedriveTap
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | dict | Yes | Pipedrive configuration with api_token, start_date, and optional settings |
| state | dict | Yes | Singer state with bookmarks and currently_syncing marker |
| catalog | Catalog | Yes (for sync) | Singer catalog with selected streams and schemas |
Outputs
| Name | Type | Description |
|---|---|---|
| catalog | Catalog | Singer catalog with stream schemas, key properties, and metadata |
| records | Singer messages | Records, schemas, and state messages emitted via Singer protocol |
Key Behaviors
Stream Registration
The tap pre-instantiates all 15 stream objects at class level. During do_discover and do_sync, each stream's tap attribute is set to the PipedriveTap instance, giving streams access to configuration and HTTP request methods.
Discovery
do_discover iterates over all registered streams, loads their JSON schemas, generates standard Singer metadata with replication method and valid replication keys, marks state_field properties as automatic inclusion, and returns a complete Catalog.
Sync Execution
do_sync supports resumable syncing by tracking currently_syncing in state. For each selected stream:
- Sets the stream's initial state from bookmarks or start_date.
- Writes the stream schema.
- For streams with id_list (like DealsProductsStream), iterates over deal IDs with pagination.
- For standard streams, fetches pages of records, transforms them, and writes Singer messages.
- Updates bookmarks and state after each page.
Error Handling
HTTP error codes 400-503 are mapped to Pipedrive-specific exception classes. Rate limiting is handled by reading X-RateLimit-Reset from response headers and sleeping accordingly. A PipedriveNull200Error is raised when the API returns a 200 status with a null body.
Registered Streams
| Stream Class | Resource Type |
|---|---|
| CurrenciesStream | Reference data |
| ActivityTypesStream | Reference data |
| StagesStream | Reference data |
| FiltersStream | Reference data |
| PipelinesStream | Reference data |
| RecentNotesStream | Incremental (recent items) |
| RecentUsersStream | Incremental (recent items) |
| RecentActivitiesStream | Incremental (recent items) |
| RecentDealsStream | Incremental (recent items) |
| RecentFilesStream | Incremental (recent items) |
| RecentOrganizationsStream | Incremental (recent items) |
| RecentPersonsStream | Incremental (recent items) |
| RecentProductsStream | Incremental (recent items) |
| DealStageChangeStream | Deal stage changes |
| DealsProductsStream | Deal-product associations |
Usage Examples
from mage_integrations.sources.pipedrive.tap_pipedrive.tap import PipedriveTap
config = {
"api_token": "xxxxxxxxxxxx",
"start_date": "2024-01-01T00:00:00Z",
}
state = {}
tap = PipedriveTap(config, state)
# Discover streams
catalog = tap.do_discover()
# Execute sync
tap.do_sync(catalog)