Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Mage ai Mage ai Pipedrive Tap

From Leeroopedia


Knowledge Sources
Domains Data_Integration, Pipedrive, API
Last Updated 2026-02-09 00:00 GMT

Overview

Main tap class for the Pipedrive source connector, orchestrating stream registration, discovery, sync execution, and API request handling.

Description

The PipedriveTap class is the central orchestrator for the Pipedrive source connector. It manages a registry of 15 stream instances (CurrenciesStream, ActivityTypesStream, StagesStream, FiltersStream, PipelinesStream, RecentNotesStream, RecentUsersStream, RecentActivitiesStream, RecentDealsStream, RecentFilesStream, RecentOrganizationsStream, RecentPersonsStream, RecentProductsStream, DealStageChangeStream, DealsProductsStream), handles configuration with defaults, and provides do_discover and do_sync methods. The module also includes comprehensive HTTP error handling with a mapping of Pipedrive-specific error codes (400-503) to custom exception classes, rate limit handling via X-RateLimit-Reset header with backoff, and a custom PipedriveNull200Error for null response bodies. The do_sync method supports resumable syncing through currently_syncing state tracking and handles deal-ID-based pagination for child streams.

Usage

Instantiated by the Mage Pipedrive source connector with config and state dictionaries. The do_discover method returns a Singer catalog, and do_sync executes the data extraction pipeline.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/pipedrive/tap_pipedrive/tap.py
  • Lines: 1-426

Signature

class PipedriveTap(object):
    streams = [
        CurrenciesStream(), ActivityTypesStream(), StagesStream(),
        FiltersStream(), PipelinesStream(), RecentNotesStream(),
        RecentUsersStream(), RecentActivitiesStream(), RecentDealsStream(),
        RecentFilesStream(), RecentOrganizationsStream(), RecentPersonsStream(),
        RecentProductsStream(), DealStageChangeStream(), DealsProductsStream(),
    ]

    def __init__(self, config: dict, state: dict):
        ...
    def do_discover(self) -> Catalog:
        ...
    def do_sync(self, catalog: Catalog) -> None:
        ...
    def get_selected_streams(self, catalog: Catalog) -> list:
        ...
    @staticmethod
    def get_default_config() -> dict:
        ...

Import

from mage_integrations.sources.pipedrive.tap_pipedrive.tap import PipedriveTap

I/O Contract

Inputs

Name Type Required Description
config dict Yes Pipedrive configuration with api_token, start_date, and optional settings
state dict Yes Singer state with bookmarks and currently_syncing marker
catalog Catalog Yes (for sync) Singer catalog with selected streams and schemas

Outputs

Name Type Description
catalog Catalog Singer catalog with stream schemas, key properties, and metadata
records Singer messages Records, schemas, and state messages emitted via Singer protocol

Key Behaviors

Stream Registration

The tap pre-instantiates all 15 stream objects at class level. During do_discover and do_sync, each stream's tap attribute is set to the PipedriveTap instance, giving streams access to configuration and HTTP request methods.

Discovery

do_discover iterates over all registered streams, loads their JSON schemas, generates standard Singer metadata with replication method and valid replication keys, marks state_field properties as automatic inclusion, and returns a complete Catalog.

Sync Execution

do_sync supports resumable syncing by tracking currently_syncing in state. For each selected stream:

  1. Sets the stream's initial state from bookmarks or start_date.
  2. Writes the stream schema.
  3. For streams with id_list (like DealsProductsStream), iterates over deal IDs with pagination.
  4. For standard streams, fetches pages of records, transforms them, and writes Singer messages.
  5. Updates bookmarks and state after each page.

Error Handling

HTTP error codes 400-503 are mapped to Pipedrive-specific exception classes. Rate limiting is handled by reading X-RateLimit-Reset from response headers and sleeping accordingly. A PipedriveNull200Error is raised when the API returns a 200 status with a null body.

Registered Streams

Stream Class Resource Type
CurrenciesStream Reference data
ActivityTypesStream Reference data
StagesStream Reference data
FiltersStream Reference data
PipelinesStream Reference data
RecentNotesStream Incremental (recent items)
RecentUsersStream Incremental (recent items)
RecentActivitiesStream Incremental (recent items)
RecentDealsStream Incremental (recent items)
RecentFilesStream Incremental (recent items)
RecentOrganizationsStream Incremental (recent items)
RecentPersonsStream Incremental (recent items)
RecentProductsStream Incremental (recent items)
DealStageChangeStream Deal stage changes
DealsProductsStream Deal-product associations

Usage Examples

from mage_integrations.sources.pipedrive.tap_pipedrive.tap import PipedriveTap

config = {
    "api_token": "xxxxxxxxxxxx",
    "start_date": "2024-01-01T00:00:00Z",
}
state = {}

tap = PipedriveTap(config, state)

# Discover streams
catalog = tap.do_discover()

# Execute sync
tap.do_sync(catalog)

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment