Implementation:Mage ai Mage ai Pipedrive Base Stream



Knowledge Sources
Domains: Data_Integration, Pipedrive, Stream
Last Updated: 2026-02-09 00:00 GMT

Overview

Base stream classes for the Mage Pipedrive source connector, providing common logic for schema loading, pagination, state management, and incremental record filtering.

Description

This module defines two stream classes:

  • PipedriveStream - The base class for all Pipedrive streams. It loads schemas from JSON files and emits Singer schema messages; paginates by offset, reading additional_data.pagination from each API response and using start/limit parameters (limit defaults to 100); manages bookmark state through the state_field, initial_state, and earliest_state attributes, parsing datetimes with pendulum; and filters records via record_is_newer_equal_null.
  • PipedriveIterStream - An extended base class for streams that must iterate over deal IDs. Its get_deal_ids method paginates through deals and yields those that were added, or that changed stage, within the sync window between initial_state and stream_start. Filtering on add_time and stage_change_time is delegated to find_deal_ids.
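The record filter described for PipedriveStream can be sketched as follows. This is a minimal illustration rather than the module's code: it uses the standard library's datetime.fromisoformat in place of pendulum, the helper name is_newer_equal_or_null and its parameters are hypothetical, and treating both a missing initial state and a null timestamp on the row as "null" is an assumption about what the description means.

```python
from datetime import datetime, timezone
from typing import Optional


def is_newer_equal_or_null(row: dict, state_field: Optional[str],
                           initial_state: Optional[datetime]) -> bool:
    """Return True when the row should be emitted (hypothetical helper)."""
    # No bookmark field configured, or no initial state: emit everything.
    if not state_field or initial_state is None:
        return True
    raw = row.get(state_field)
    # Assumption: a null timestamp cannot be compared, so the row is kept.
    if raw is None:
        return True
    # Keep rows whose timestamp is at or after the initial state.
    return datetime.fromisoformat(raw) >= initial_state


initial = datetime(2024, 1, 1, tzinfo=timezone.utc)
rows = [
    {"id": 1, "update_time": "2023-12-31T23:59:59+00:00"},  # older: dropped
    {"id": 2, "update_time": "2024-01-01T00:00:00+00:00"},  # equal: kept
    {"id": 3, "update_time": None},                          # null: kept
]
kept = [r["id"] for r in rows if is_newer_equal_or_null(r, "update_time", initial)]
print(kept)
```

Keeping rows that are equal to the bookmark means a resumed sync may re-emit the boundary record, which is usually preferable to silently skipping records that share the bookmark's timestamp.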

Usage

Concrete Pipedrive stream classes extend these base classes. The tap orchestrator calls has_data() to check whether more pages remain and paginate(response) to advance to the next page.
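The has_data() / paginate(response) loop might look roughly like the sketch below. It does not import the real classes: PagedStream, FakeResponse, and fake_fetch are hypothetical stand-ins, and the more_items_in_collection and next_start keys inside additional_data.pagination are assumed to follow Pipedrive's pagination payload, since this page names only the additional_data.pagination container.

```python
class FakeResponse:
    """Stand-in for an HTTP response object exposing .json() (hypothetical)."""

    def __init__(self, payload: dict):
        self._payload = payload

    def json(self) -> dict:
        return self._payload


class PagedStream:
    """Sketch of offset pagination state; not the real PipedriveStream."""

    start = 0
    limit = 100
    more_items_in_collection = True

    def has_data(self) -> bool:
        return self.more_items_in_collection

    def paginate(self, response) -> None:
        pagination = response.json().get("additional_data", {}).get("pagination")
        if not pagination:
            # No pagination block: assume this was the last page.
            self.more_items_in_collection = False
            return
        self.more_items_in_collection = pagination.get("more_items_in_collection", False)
        if "next_start" in pagination:
            self.start = pagination["next_start"]


def fake_fetch(start: int, limit: int, total: int = 250) -> FakeResponse:
    """Simulate an API holding `total` items (hypothetical helper)."""
    end = min(start + limit, total)
    pagination = {"start": start, "limit": limit,
                  "more_items_in_collection": end < total}
    if end < total:
        pagination["next_start"] = end
    return FakeResponse({"data": list(range(start, end)),
                         "additional_data": {"pagination": pagination}})


stream = PagedStream()
offsets = []
while stream.has_data():
    offsets.append(stream.start)          # offset requested for this page
    response = fake_fetch(stream.start, stream.limit)
    stream.paginate(response)             # advance the offset or stop
print(offsets)
```

With 250 simulated items and the default limit of 100, the loop requests three pages and stops once the payload reports that no more items remain.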

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/pipedrive/tap_pipedrive/stream.py
  • Lines: 1-196

Signature

class PipedriveStream(object):
    tap = None
    endpoint = ''
    key_properties = []
    state_field = None
    initial_state = None
    earliest_state = None
    schema = ''
    replication_method = 'FULL_TABLE'
    start = 0
    limit = 100

class PipedriveIterStream(PipedriveStream):
    id_list = True

Import

from mage_integrations.sources.pipedrive.tap_pipedrive.stream import PipedriveStream, PipedriveIterStream

I/O Contract

Inputs

Name | Type | Required | Description
state | dict | Yes | Singer state dictionary with bookmarks per stream
start_date | pendulum.DateTime | Yes | Fallback start date when no bookmark exists
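To make the two inputs concrete, the sketch below shows one plausible shape for the state dictionary and the fallback to start_date. It assumes the conventional Singer layout state["bookmarks"][stream][field]; the helper resolve_initial_state is hypothetical, and standard-library datetimes stand in for pendulum.DateTime.

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_initial_state(state: Optional[dict], stream_name: str,
                          state_field: str, start_date: datetime) -> datetime:
    """Pick the stored bookmark if there is one, else start_date (hypothetical helper)."""
    try:
        raw = state["bookmarks"][stream_name][state_field]
    except (TypeError, KeyError):
        raw = None  # no state, no bookmarks, or no entry for this stream
    return datetime.fromisoformat(raw) if raw is not None else start_date


start_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
state = {"bookmarks": {"deals": {"update_time": "2024-03-05T12:00:00+00:00"}}}

# A stored bookmark wins over the configured start date.
print(resolve_initial_state(state, "deals", "update_time", start_date))
# An empty state falls back to start_date.
print(resolve_initial_state({}, "deals", "update_time", start_date))
```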

Outputs

Name | Type | Description
records | generator | Individual records emitted through write_record, or deal IDs yielded by get_deal_ids

Key Methods

Method | Class | Description
get_schema() | PipedriveStream | Returns cached schema or loads from JSON file.
load_schema() | PipedriveStream | Loads JSON schema from schemas/{schema}.json relative to the stream module.
write_schema() | PipedriveStream | Emits Singer schema message with key properties and replication method.
paginate(response) | PipedriveStream | Parses additional_data.pagination from response to advance offset or stop.
has_data() | PipedriveStream | Returns whether more items exist in the paginated collection.
update_state(row) | PipedriveStream | Updates earliest_state bookmark if row's state field is newer.
set_initial_state(state, start_date) | PipedriveStream | Initializes state from existing bookmark or config start_date.
write_record(row) | PipedriveStream | Writes a Singer record if it passes the record_is_newer_equal_null filter.
record_is_newer_equal_null(row) | PipedriveStream | Returns True if record is newer than or equal to initial state, or if state is null.
get_deal_ids(tap) | PipedriveIterStream | Generator yielding deal IDs that were added or had stage changes within the sync window.
find_deal_ids(data, start, stop) | PipedriveIterStream | Filters data for deals added or with stage changes between start and stop datetimes.
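The window filter performed by find_deal_ids can be illustrated with a standalone sketch. The function name find_deal_ids_sketch, the half-open [start, stop) bounds, the de-duplication of deals that match both conditions, and the sample timestamp format are all assumptions made for the example; the standard library's datetime.fromisoformat is used instead of pendulum.

```python
from datetime import datetime
from typing import Optional


def _in_window(raw: Optional[str], start: datetime, stop: datetime) -> bool:
    # Null timestamps never match; the half-open bounds are an assumption.
    return raw is not None and start <= datetime.fromisoformat(raw) < stop


def find_deal_ids_sketch(data: list, start: datetime, stop: datetime) -> list:
    """Return IDs of deals added or stage-changed within [start, stop)."""
    added = [d["id"] for d in data
             if _in_window(d.get("add_time"), start, stop)]
    # Skip deals already picked up as "added" so each ID appears once.
    changed = [d["id"] for d in data
               if d["id"] not in added
               and _in_window(d.get("stage_change_time"), start, stop)]
    return added + changed


deals = [
    {"id": 1, "add_time": "2024-01-10 09:00:00", "stage_change_time": None},
    {"id": 2, "add_time": "2023-06-01 09:00:00", "stage_change_time": "2024-01-15 10:30:00"},
    {"id": 3, "add_time": "2023-06-01 09:00:00", "stage_change_time": None},
    {"id": 4, "add_time": "2024-01-12 08:00:00", "stage_change_time": "2024-01-20 08:00:00"},
]
window_start = datetime(2024, 1, 1)
window_stop = datetime(2024, 2, 1)
print(find_deal_ids_sketch(deals, window_start, window_stop))
```

Deal 3 is excluded because it was neither added nor moved between stages inside the window, and deal 4 is reported only once even though both of its timestamps fall inside it.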

Usage Examples

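import pendulum

from mage_integrations.sources.pipedrive.tap_pipedrive.stream import PipedriveStream


class DealsStream(PipedriveStream):
    endpoint = 'deals'
    schema = 'deals'
    key_properties = ['id']
    state_field = 'update_time'
    replication_method = 'INCREMENTAL'


# Illustrative inputs: an empty Singer state, so the stream falls back to start_date.
state = {'bookmarks': {}}
start_date = pendulum.parse('2024-01-01T00:00:00Z')

stream = DealsStream()
stream.set_initial_state(state, start_date)  # uses the bookmark if present, else start_date
stream.write_schema()  # emits the Singer schema message loaded from schemas/deals.json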
