Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Mage ai Mage ai GitHub Streams

From Leeroopedia


Knowledge Sources
Domains Data_Integration, GitHub, API
Last Updated 2026-02-09 00:00 GMT

Overview

Defines all GitHub API stream classes and their replication strategies for the Mage GitHub source connector.

Description

This module provides the complete set of stream class definitions used by the GitHub source connector to extract data from the GitHub REST API. It contains a Stream base class with common URL building, bookmark management, and child record retrieval logic, along with three intermediate classes (FullTableStream, IncrementalStream, IncrementalOrderedStream) that set the replication method. Twenty concrete stream classes represent GitHub resources, each specifying its tap_stream_id, replication_method, replication_keys, key_properties, path, and parent-child relationships. A module-level STREAMS dictionary maps stream names to their classes for dynamic lookup.

Usage

Used by the GitHub sync orchestrator (sync.py) and client to dynamically instantiate stream objects, build API URLs, and manage bookmark state during data extraction.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/github/tap_github/streams.py
  • Lines: 1-952

Signature

class Stream:
    tap_stream_id = None
    replication_method = None
    replication_keys = None
    key_properties = []
    path = None
    children = []
    parent = None
    def build_url(self, base_url, repo_path, bookmark) -> str: ...
    def get_min_bookmark(self, stream, selected_streams, bookmark, repo_path, start_date, state) -> str: ...
    def write_bookmarks(self, stream, selected_streams, bookmark_value, repo_path, state) -> None: ...
    def get_child_records(self, client, catalog, child_stream, ...) -> None: ...

class FullTableStream(Stream): ...
class IncrementalStream(Stream): ...
class IncrementalOrderedStream(Stream): ...

Import

from mage_integrations.sources.github.tap_github.streams import STREAMS, Stream

I/O Contract

Inputs

Name Type Required Description
state dict Yes Singer state containing bookmark values per repository and stream
catalog list Yes Singer catalog with stream schemas and metadata
repo_path str Yes Repository path in owner/repo format
start_date str Yes ISO 8601 start date for initial sync

Outputs

Name Type Description
records list[dict] Transformed records written via Singer protocol (write_record)

Stream Classes

Full Table Streams

Stream Name Class Key Properties Path
assignees Assignees [id] assignees
releases Releases [id] releases
issue_labels IssueLabels [id] labels
collaborators Collaborators [id] collaborators
stargazers StarGazers [user_id] stargazers
teams Teams [id] orgs/{}/teams
team_members TeamMembers [id] orgs/{}/teams/{}/members
team_memberships TeamMemberships [url] orgs/{}/teams/{}/memberships/{}

Incremental Streams

Stream Name Class Replication Key Path
commits Commits since commits
events Events since events
commit_comments CommitComments since comments
reviews Reviews since pulls/{}/reviews
pr_commits PRCommits since pulls/{}/commits
project_cards ProjectCards since columns/{}/cards

Incremental Ordered Streams

Stream Name Class Replication Key Path
comments Comments since comments
issues Issues since issues
pull_requests PullRequests since pulls
issue_events IssueEvents since issues/events
issue_milestones IssueMilestones since milestones
review_comments ReviewComments since pulls/comments

Usage Examples

from mage_integrations.sources.github.tap_github.streams import STREAMS

# Instantiate a stream
commits_stream = STREAMS["commits"]()
url = commits_stream.build_url("https://api.github.com", "mage-ai/mage-ai", "2024-01-01T00:00:00Z")

# Access stream metadata
print(commits_stream.tap_stream_id)       # "commits"
print(commits_stream.replication_method)   # "INCREMENTAL"
print(commits_stream.replication_keys)     # "since"

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment