Implementation:Mage ai Mage ai Knowi Streams

From Leeroopedia


Knowledge Sources
Domains Data_Integration, Knowi, Stream
Last Updated 2026-02-09 00:00 GMT

Overview

Stream class definitions for the Mage Knowi source connector, providing base, incremental, and full-table stream abstractions along with concrete implementations for Dashboards and Widgets.

Description

This module defines the Singer stream classes for the Knowi integration. The class hierarchy includes:

  • BaseStream - Abstract base class that holds the common stream properties (tap_stream_id, replication_method, replication_key, key_properties, path) and provides parent data fetching and epoch time conversion utilities. Sets default_data_key = "list", a value specific to Knowi's API response format.
  • IncrementalStream - Sets replication_method = "INCREMENTAL".
  • FullTableStream - Sets replication_method = "FULL_TABLE".
  • DashboardList - Internal stream that lists dashboard IDs from /dashboards. Not replicated directly (to_replicate = False).
  • Dashboards - Fetches full dashboard details using DashboardList as parent. Iterates dashboard IDs to fetch individual records.
  • WidgetList - Internal stream that lists widget IDs from /widgets. Not replicated directly.
  • Widgets - Fetches full widget details using WidgetList as parent.

The module exports a STREAMS dictionary mapping stream names to their classes.
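The hierarchy described above can be sketched with stand-in classes. This is a minimal illustration, not the mage-ai source: the to_replicate default on the base class, the parent attribute name, and the exact keys of the registry are assumptions made for the example.

```python
class BaseStream:
    """Illustrative stand-in for the base class; not the mage-ai code."""
    tap_stream_id = None
    replication_method = None
    replication_key = None
    key_properties = []
    path = None
    to_replicate = True  # assumed default; internal streams override it


class IncrementalStream(BaseStream):
    replication_method = "INCREMENTAL"


class FullTableStream(BaseStream):
    replication_method = "FULL_TABLE"


class DashboardList(FullTableStream):
    tap_stream_id = "dashboard_list"
    path = "dashboards"
    to_replicate = False  # internal helper, used only as a parent


class Dashboards(FullTableStream):
    tap_stream_id = "dashboards"
    path = "dashboards/{}"
    parent = DashboardList  # hypothetical attribute name


# Registry keyed by stream name; the real STREAMS dict may differ.
STREAMS = {
    "dashboard_list": DashboardList,
    "dashboards": Dashboards,
}

# Select only the streams that are meant to be replicated.
replicated = [name for name, cls in STREAMS.items() if cls.to_replicate]
print(replicated)
```

Keeping the replication method on an intermediate class means each concrete stream only has to declare its identifier, path, and parent.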

Usage

Stream classes are instantiated with a KnowiClient instance. The get_records method yields records for each stream.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/knowi/streams.py
  • Lines: 1-191

Signature

class BaseStream:
    tap_stream_id = None
    replication_method = None
    replication_key = None
    key_properties = []
    default_data_key = "list"

    def __init__(self, client: KnowiClient, logger=LOGGER):

Import

from mage_integrations.sources.knowi.streams import STREAMS, Dashboards, Widgets

Stream Classes

Stream Class  | tap_stream_id  | Replication | Parent        | Path
DashboardList | dashboard_list | FULL_TABLE  | None          | dashboards
Dashboards    | dashboards     | FULL_TABLE  | DashboardList | dashboards/{}
WidgetList    | widget_list    | FULL_TABLE  | None          | widgets
Widgets       | widgets        | FULL_TABLE  | WidgetList    | widgets/{}
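The parent and path columns suggest a two-step fetch: list the IDs from the parent stream, then fill the child's path template once per ID. The sketch below illustrates that pattern under stated assumptions. FakeClient, its get method, the "id" field, and the response shapes are all hypothetical and do not reflect the real KnowiClient interface.

```python
from typing import Any, Dict, Iterator, List


class FakeClient:
    """Hypothetical stand-in for KnowiClient that returns canned responses."""

    def __init__(self, responses: Dict[str, Any]):
        self.responses = responses

    def get(self, path: str) -> Any:
        return self.responses[path]


class ListStream:
    path = "dashboards"
    default_data_key = "list"

    def __init__(self, client: FakeClient):
        self.client = client

    def get_records(self, bookmark_datetime=None, is_parent=False) -> Iterator[Any]:
        response = self.client.get(self.path)
        for item in response.get(self.default_data_key, []):
            # As a parent, yield only the ID the child stream needs.
            yield item["id"] if is_parent else item


class DetailStream:
    path = "dashboards/{}"
    parent = ListStream

    def __init__(self, client: FakeClient):
        self.client = client

    def get_parent_data(self, bookmark_datetime=None) -> List[Any]:
        # Instantiate the parent stream and collect its IDs.
        parent = self.parent(self.client)
        return list(parent.get_records(bookmark_datetime, is_parent=True))

    def get_records(self, bookmark_datetime=None, is_parent=False) -> Iterator[Any]:
        for record_id in self.get_parent_data(bookmark_datetime):
            # Fill the "{}" placeholder in the path with each parent ID.
            yield self.client.get(self.path.format(record_id))


client = FakeClient({
    "dashboards": {"list": [{"id": 1}, {"id": 2}]},
    "dashboards/1": {"id": 1, "name": "Sales"},
    "dashboards/2": {"id": 2, "name": "Ops"},
})
records = list(DetailStream(client).get_records())
print(records)
```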

Key Methods

Method                                    | Class      | Description
get_records(bookmark_datetime, is_parent) | BaseStream | Abstract method; subclasses implement to yield records.
get_parent_data(bookmark_datetime)        | BaseStream | Instantiates parent stream class and calls get_records with is_parent=True.
epoch_milliseconds_to_dt_str(timestamp)   | BaseStream | Static method converting epoch milliseconds to UTC datetime string.
dt_to_epoch_seconds(dt_object)            | BaseStream | Static method converting datetime to epoch seconds.
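The two time helpers can be approximated with the standard library. This is a sketch under assumptions rather than the module's code: the output string format and the handling of naive datetimes are choices made here for illustration, and the real static methods may behave differently.

```python
from datetime import datetime, timezone


def epoch_milliseconds_to_dt_str(timestamp: float) -> str:
    # Assumed output format (ISO 8601 with a trailing "Z").
    dt = datetime.fromtimestamp(timestamp / 1000.0, tz=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


def dt_to_epoch_seconds(dt_object: datetime) -> int:
    # Naive datetimes are treated as UTC; this is a choice of the sketch.
    if dt_object.tzinfo is None:
        dt_object = dt_object.replace(tzinfo=timezone.utc)
    return int(dt_object.timestamp())


example_str = epoch_milliseconds_to_dt_str(1_700_000_000_000)
example_secs = dt_to_epoch_seconds(datetime(2023, 11, 14, 22, 13, 20))
print(example_str, example_secs)
```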

Usage Examples

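from mage_integrations.sources.knowi.streams import STREAMS

# knowi_client is an already-configured KnowiClient; logger and process()
# are placeholders supplied by the caller.
stream_cls = STREAMS['dashboards']
stream = stream_cls(client=knowi_client, logger=logger)
for record in stream.get_records():
    process(record)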
