Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave Streaming API Functions

From Leeroopedia


Property Value
Module Streaming API Functions
File dashboard/lib/api/streaming.ts
Language TypeScript
Lines 286
Type API Function Module
Dependencies api.ts, class-transformer, lodash, protobuf types (catalog, meta, plan_common, user)

Overview

streaming.ts provides API functions and type definitions for fetching all streaming-related catalog data (relations, fragments, dependencies) from the RisingWave meta node. It defines the core Relation interface and StreamingJob class (with class-transformer decorators for JSON deserialization). The module exports fetch functions for materialized views, tables, indexes, sinks, sources, views, functions, subscriptions, users, databases, schemas, and object dependencies. It also provides utility functions like relationType, relationTypeTitleCase, and relationIsStreamingJob for relation classification. This is the central data layer for the dashboard; virtually every page depends on this module to fetch and classify catalog objects.

Code Reference

Source Location

dashboard/lib/api/streaming.ts

Signature

// Core types
export interface Relation {
  id: number
  name: string
  owner: number
  schemaId: number
  databaseId: number
  streamingJob?: StreamingJob
  columns?: (ColumnCatalog | Field)[]
  ownerName?: string
  schemaName?: string
  databaseName?: string
  totalSizeBytes?: number
}

export class StreamingJob {
  id: number
  name: string
  jobStatus: string
  maxParallelism: number
  configOverride: string
  get parallelism(): string
  get type(): string
}

export type RelationType = ReturnType<typeof relationType>

// Utility functions
export function relationType(x: Relation): string
export function relationTypeTitleCase(x: Relation): string
export function relationIsStreamingJob(x: Relation): boolean

// Fragment API
export async function getFragmentsByJobId(jobId: number): Promise<TableFragments>
export async function getRelationIdInfos(): Promise<RelationIdInfos>
export async function getFragmentToRelationMap(): Promise<FragmentToRelationMap>

// Catalog fetch functions
export async function getStreamingJobs(): Promise<StreamingJob[]>
export async function getRelations(): Promise<Relation[]>
export async function getRelationDependencies(): Promise<Map<number, number[]>>
export async function getMaterializedViews(): Promise<ExtendedTable[]>
export async function getTables(): Promise<ExtendedTable[]>
export async function getIndexes(): Promise<(Table & Index)[]>
export async function getInternalTables(): Promise<ExtendedTable[]>
export async function getSinks(): Promise<Sink[]>
export async function getSources(): Promise<Source[]>
export async function getViews(): Promise<View[]>
export async function getFunctions(): Promise<Function[]>
export async function getSubscriptions(): Promise<Subscription[]>
export async function getUsers(): Promise<UserInfo[]>
export async function getDatabases(): Promise<Database[]>
export async function getSchemas(): Promise<Schema[]>
export async function getObjectDependencies(): Promise<Map<number, number[]>>

Import

import {
  Relation,
  StreamingJob,
  getRelations,
  getRelationDependencies,
  relationType,
  relationIsStreamingJob,
  getFragmentsByJobId,
  getFragmentToRelationMap,
} from "../lib/api/streaming"

I/O Contract

Relation Interface

Field Type Description
id number Unique relation identifier
name string Relation name
owner number Owner user ID
schemaId number Schema identifier
databaseId number Database identifier
streamingJob StreamingJob? Associated streaming job, if any
columns Field)[]? Column definitions for display
totalSizeBytes number? Total storage size for tables

relationType Classification

Detection Logic Returns
tableType property exists (Table) The table type string (e.g., "TABLE", "MATERIALIZED VIEW")
sinkFromName property exists "SINK"
info property exists (Source) "SOURCE"
dependentTableId property exists "SUBSCRIPTION"
language property exists "FUNCTION"
None of the above "UNKNOWN"

StreamingJob.parallelism

The parallelism getter handles three cases:

  • String value (e.g., "Adaptive") - returned as-is
  • Object value (e.g., { "Fixed": 64 }) - formatted as "Fixed (64)"
  • Other - JSON stringified as fallback

API Endpoints Used

Function Endpoint Deserialization
getFragmentsByJobId /fragments/job_id/{id} TableFragments.fromJSON
getRelationIdInfos /relation_id_infos Direct JSON
getFragmentToRelationMap /fragment_to_relation_map FragmentToRelationMap.fromJSON
getStreamingJobs /streaming_jobs plainToInstance(StreamingJob, ...)
getMaterializedViews /materialized_views extendedTableFromJSON
getTables /tables extendedTableFromJSON
getIndexes /indexes + /index_items extendedTableFromJSON + Index.fromJSON, joined by ID
getSinks /sinks Sink.fromJSON
getSources /sources Source.fromJSON
getObjectDependencies /object_dependencies ObjectDependencies.fromJSON, grouped into Map<number, number[]>

Usage Examples

Fetching All Relations

import { getRelations, relationType } from "../lib/api/streaming"

const relations = await getRelations()
// Returns sorted concat of: MVs, tables, indexes, sinks, sources, subscriptions

relations.forEach((r) => {
  console.log(`${r.name} (${relationType(r)}) - ID: ${r.id}`)
})

Checking If Relation Is a Streaming Job

import { relationIsStreamingJob, relationType } from "../lib/api/streaming"

// Returns true for all types except "UNKNOWN", "SOURCE", and "INTERNAL"
if (relationIsStreamingJob(relation)) {
  // This relation has a streaming execution plan
}

Fetching Fragment Data

import { getFragmentsByJobId } from "../lib/api/streaming"

const fragments = await getFragmentsByJobId(42)
// Returns TableFragments protobuf object with fragment topology

Building Dependency Map

import { getObjectDependencies } from "../lib/api/streaming"

const deps = await getObjectDependencies()
// Map<number, number[]> where key is object ID and value is list of referenced object IDs

const dependsOn = deps.get(relationId) // [id1, id2, ...]

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment