Implementation:Mage ai Mage ai Pipedrive Base Stream
Appearance
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Pipedrive, Stream |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Base stream classes for the Mage Pipedrive source connector, providing common logic for schema loading, pagination, state management, and incremental record filtering.
Description
This module defines two stream classes:
- PipedriveStream - The base class for all Pipedrive streams. It provides schema loading from JSON files, Singer schema emission, offset-based pagination using
additional_data.paginationfrom API responses, bookmark-based state management usingpendulumfor datetime parsing, and record filtering viarecord_is_newer_equal_null. The pagination supportsstart/limitoffset parameters (default limit of 100). State is tracked viastate_field,initial_state, andearliest_stateproperties.
- PipedriveIterStream - An extended base class for streams that require iterating through deal IDs. It provides
get_deal_idswhich paginates through deals, identifying those added or with stage changes within a time window (betweeninitial_stateandstream_start). Usesfind_deal_idsto filter byadd_timeandstage_change_time.
Usage
Extended by concrete Pipedrive stream classes. The tap orchestrator calls has_data() to check for more pages and paginate(response) to advance.
Code Reference
Source Location
- Repository: mage-ai
- File:
mage_integrations/mage_integrations/sources/pipedrive/tap_pipedrive/stream.py - Lines: 1-196
Signature
class PipedriveStream(object):
tap = None
endpoint = ''
key_properties = []
state_field = None
initial_state = None
earliest_state = None
schema = ''
replication_method = 'FULL_TABLE'
start = 0
limit = 100
class PipedriveIterStream(PipedriveStream):
id_list = True
Import
from mage_integrations.sources.pipedrive.tap_pipedrive.stream import PipedriveStream, PipedriveIterStream
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| state | dict | Yes | Singer state dictionary with bookmarks per stream |
| start_date | pendulum.DateTime | Yes | Fallback start date when no bookmark exists |
Outputs
| Name | Type | Description |
|---|---|---|
| records | generator | Individual records yielded through write_record or deal IDs via get_deal_ids
|
Key Methods
| Method | Class | Description |
|---|---|---|
get_schema() |
PipedriveStream | Returns cached schema or loads from JSON file. |
load_schema() |
PipedriveStream | Loads JSON schema from schemas/{schema}.json relative to the stream module.
|
write_schema() |
PipedriveStream | Emits Singer schema message with key properties and replication method. |
paginate(response) |
PipedriveStream | Parses additional_data.pagination from response to advance offset or stop.
|
has_data() |
PipedriveStream | Returns whether more items exist in the paginated collection. |
update_state(row) |
PipedriveStream | Updates earliest_state bookmark if row's state field is newer.
|
set_initial_state(state, start_date) |
PipedriveStream | Initializes state from existing bookmark or config start_date. |
write_record(row) |
PipedriveStream | Writes a Singer record if it passes the record_is_newer_equal_null filter.
|
record_is_newer_equal_null(row) |
PipedriveStream | Returns True if record is newer than or equal to initial state, or if state is null. |
get_deal_ids(tap) |
PipedriveIterStream | Generator yielding deal IDs that were added or had stage changes within the sync window. |
find_deal_ids(data, start, stop) |
PipedriveIterStream | Filters data for deals added or with stage changes between start and stop datetimes. |
Usage Examples
from mage_integrations.sources.pipedrive.tap_pipedrive.stream import PipedriveStream
class DealsStream(PipedriveStream):
endpoint = 'deals'
schema = 'deals'
key_properties = ['id']
state_field = 'update_time'
replication_method = 'INCREMENTAL'
stream = DealsStream()
stream.set_initial_state(state, start_date)
stream.write_schema()
Related Pages
Implements Principle
Requires Environment
Page Connections
Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment