Implementation:Mage ai Mage ai Knowi Streams
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Knowi, Stream |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Stream class definitions for the Mage Knowi source connector, providing base, incremental, and full-table stream abstractions along with concrete implementations for Dashboards and Widgets.
Description
This module defines the Singer stream classes for the Knowi integration. The class hierarchy includes:
- BaseStream - Abstract base class with common stream properties (tap_stream_id, replication_method, replication_key, key_properties, path), parent data fetching, and epoch time conversion utilities. Uses default_data_key = "list", which is unique to Knowi's API response format.
- IncrementalStream - Sets replication_method = "INCREMENTAL".
- FullTableStream - Sets replication_method = "FULL_TABLE".
- DashboardList - Internal stream that lists dashboard IDs from /dashboards. Not replicated directly (to_replicate = False).
- Dashboards - Fetches full dashboard details using DashboardList as parent. Iterates dashboard IDs to fetch individual records.
- WidgetList - Internal stream that lists widget IDs from /widgets. Not replicated directly.
- Widgets - Fetches full widget details using WidgetList as parent.
The module exports a STREAMS dictionary mapping stream names to their classes.
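The hierarchy described above can be sketched roughly as follows. This is a minimal illustration, not the module's actual source: the parent attribute name, the inclusion of the internal list streams in the registry, and the replicable_streams helper are assumptions made for the example. Only the class names, tap_stream_id values, paths, replication methods, and the to_replicate flag come from this page.

```python
class BaseStream:
    tap_stream_id = None
    replication_method = None
    replication_key = None
    key_properties = []
    path = None
    parent = None            # assumed attribute name for the parent stream class
    to_replicate = True      # internal list streams override this with False
    default_data_key = "list"


class IncrementalStream(BaseStream):
    replication_method = "INCREMENTAL"


class FullTableStream(BaseStream):
    replication_method = "FULL_TABLE"


class DashboardList(FullTableStream):
    tap_stream_id = "dashboard_list"
    path = "dashboards"
    to_replicate = False


class Dashboards(FullTableStream):
    tap_stream_id = "dashboards"
    path = "dashboards/{}"
    parent = DashboardList


class WidgetList(FullTableStream):
    tap_stream_id = "widget_list"
    path = "widgets"
    to_replicate = False


class Widgets(FullTableStream):
    tap_stream_id = "widgets"
    path = "widgets/{}"
    parent = WidgetList


# Assumption: the registry is keyed by tap_stream_id and lists every stream.
STREAMS = {
    cls.tap_stream_id: cls
    for cls in (DashboardList, Dashboards, WidgetList, Widgets)
}


def replicable_streams(streams):
    """Hypothetical helper: names of streams that are emitted directly."""
    return sorted(name for name, cls in streams.items() if cls.to_replicate)
```

Keeping the list streams in the same registry but flagging them with to_replicate = False would let discovery code skip them while the detail streams can still instantiate them as parents.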
Usage
Stream classes are instantiated with a KnowiClient instance. The get_records method yields records for each stream.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/knowi/streams.py
- Lines: 1-191
Signature
class BaseStream:
    tap_stream_id = None
    replication_method = None
    replication_key = None
    key_properties = []
    default_data_key = "list"

    def __init__(self, client: KnowiClient, logger=LOGGER):
Import
from mage_integrations.sources.knowi.streams import STREAMS, Dashboards, Widgets
Stream Classes
| Stream Class | tap_stream_id | Replication | Parent | Path |
|---|---|---|---|---|
| DashboardList | dashboard_list | FULL_TABLE | None | dashboards |
| Dashboards | dashboards | FULL_TABLE | DashboardList | dashboards/{} |
| WidgetList | widget_list | FULL_TABLE | None | widgets |
| Widgets | widgets | FULL_TABLE | WidgetList | widgets/{} |
Key Methods
| Method | Class | Description |
|---|---|---|
| get_records(bookmark_datetime, is_parent) | BaseStream | Abstract method; subclasses implement to yield records. |
| get_parent_data(bookmark_datetime) | BaseStream | Instantiates parent stream class and calls get_records with is_parent=True. |
| epoch_milliseconds_to_dt_str(timestamp) | BaseStream | Static method converting epoch milliseconds to UTC datetime string. |
| dt_to_epoch_seconds(dt_object) | BaseStream | Static method converting datetime to epoch seconds. |
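The two time conversion helpers can be approximated with the standard library. The sketch below is written as free functions rather than static methods, and the output format string is an assumption, since this page does not state which format the module emits.

```python
from datetime import datetime, timezone


def epoch_milliseconds_to_dt_str(timestamp: float) -> str:
    """Convert epoch milliseconds to a UTC datetime string.

    The ISO-like format used here is an assumption for illustration.
    """
    dt = datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


def dt_to_epoch_seconds(dt_object: datetime) -> int:
    """Convert a datetime to whole epoch seconds.

    Note: a naive datetime is interpreted in local time by timestamp(),
    so passing timezone-aware values avoids surprises.
    """
    return int(dt_object.timestamp())
```

For example, epoch_milliseconds_to_dt_str(0) gives "1970-01-01T00:00:00Z" under this format, and dt_to_epoch_seconds(datetime(1970, 1, 1, 0, 1, tzinfo=timezone.utc)) gives 60.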
Usage Examples
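from mage_integrations.sources.knowi.streams import STREAMS

# knowi_client is a configured KnowiClient; logger is any logging.Logger.
stream_cls = STREAMS['dashboards']
stream = stream_cls(client=knowi_client, logger=logger)
for record in stream.get_records():
    process(record)  # placeholder for downstream handling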
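To make the parent/child flow concrete without a live Knowi account, the following self-contained sketch mimics it with a stub client. Everything about the client is hypothetical: the FakeKnowiClient class, its get(path) method, the "id" field, and the response payloads are assumptions for illustration. Only the "list" data key, the paths, and the get_parent_data / get_records(is_parent=True) pattern are taken from this page.

```python
class FakeKnowiClient:
    """Hypothetical stand-in for KnowiClient, backed by canned responses."""

    def __init__(self, responses):
        self._responses = responses

    def get(self, path):
        return self._responses[path]


class DashboardList:
    path = "dashboards"
    default_data_key = "list"

    def __init__(self, client):
        self.client = client

    def get_records(self, bookmark_datetime=None, is_parent=False):
        response = self.client.get(self.path)
        for item in response.get(self.default_data_key, []):
            # As a parent, yield only the IDs the child stream needs.
            yield item["id"] if is_parent else item


class Dashboards:
    path = "dashboards/{}"
    parent = DashboardList

    def __init__(self, client):
        self.client = client

    def get_parent_data(self, bookmark_datetime=None):
        parent_stream = self.parent(self.client)
        return parent_stream.get_records(bookmark_datetime, is_parent=True)

    def get_records(self, bookmark_datetime=None, is_parent=False):
        # One detail request per dashboard ID returned by the parent.
        for dashboard_id in self.get_parent_data(bookmark_datetime):
            yield self.client.get(self.path.format(dashboard_id))


responses = {
    "dashboards": {"list": [{"id": 1}, {"id": 2}]},
    "dashboards/1": {"id": 1, "name": "Sales"},
    "dashboards/2": {"id": 2, "name": "Ops"},
}
records = list(Dashboards(FakeKnowiClient(responses)).get_records())
```

With the canned responses above, records holds the two detail payloads in the order the parent listed their IDs. The same shape would apply to WidgetList and Widgets.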
Related Pages
Implements Principle
Requires Environment