Implementation:Risingwavelabs Risingwave Streaming API Functions
| Property | Value |
|---|---|
| Module | Streaming API Functions |
| File | dashboard/lib/api/streaming.ts |
| Language | TypeScript |
| Lines | 286 |
| Type | API Function Module |
| Dependencies | api.ts, class-transformer, lodash, protobuf types (catalog, meta, plan_common, user) |
Overview
streaming.ts provides API functions and type definitions for fetching all streaming-related catalog data (relations, fragments, dependencies) from the RisingWave meta node. It defines the core Relation interface and StreamingJob class (with class-transformer decorators for JSON deserialization). The module exports fetch functions for materialized views, tables, indexes, sinks, sources, views, functions, subscriptions, users, databases, schemas, and object dependencies. It also provides utility functions like relationType, relationTypeTitleCase, and relationIsStreamingJob for relation classification. This is the central data layer for the dashboard; virtually every page depends on this module to fetch and classify catalog objects.
Code Reference
Source Location
dashboard/lib/api/streaming.ts
Signature
// Core types
export interface Relation {
  id: number
  name: string
  owner: number
  schemaId: number
  databaseId: number
  streamingJob?: StreamingJob
  columns?: (ColumnCatalog | Field)[]
  ownerName?: string
  schemaName?: string
  databaseName?: string
  totalSizeBytes?: number
}
export class StreamingJob {
  id: number
  name: string
  jobStatus: string
  maxParallelism: number
  configOverride: string
  get parallelism(): string
  get type(): string
}
export type RelationType = ReturnType<typeof relationType>
// Utility functions
export function relationType(x: Relation): string
export function relationTypeTitleCase(x: Relation): string
export function relationIsStreamingJob(x: Relation): boolean
// Fragment API
export async function getFragmentsByJobId(jobId: number): Promise<TableFragments>
export async function getRelationIdInfos(): Promise<RelationIdInfos>
export async function getFragmentToRelationMap(): Promise<FragmentToRelationMap>
// Catalog fetch functions
export async function getStreamingJobs(): Promise<StreamingJob[]>
export async function getRelations(): Promise<Relation[]>
export async function getRelationDependencies(): Promise<Map<number, number[]>>
export async function getMaterializedViews(): Promise<ExtendedTable[]>
export async function getTables(): Promise<ExtendedTable[]>
export async function getIndexes(): Promise<(Table & Index)[]>
export async function getInternalTables(): Promise<ExtendedTable[]>
export async function getSinks(): Promise<Sink[]>
export async function getSources(): Promise<Source[]>
export async function getViews(): Promise<View[]>
export async function getFunctions(): Promise<Function[]>
export async function getSubscriptions(): Promise<Subscription[]>
export async function getUsers(): Promise<UserInfo[]>
export async function getDatabases(): Promise<Database[]>
export async function getSchemas(): Promise<Schema[]>
export async function getObjectDependencies(): Promise<Map<number, number[]>>
Import
import {
  Relation,
  StreamingJob,
  getRelations,
  getRelationDependencies,
  relationType,
  relationIsStreamingJob,
  getFragmentsByJobId,
  getFragmentToRelationMap,
} from "../lib/api/streaming"
I/O Contract
Relation Interface
| Field | Type | Description |
|---|---|---|
| `id` | `number` | Unique relation identifier |
| `name` | `string` | Relation name |
| `owner` | `number` | Owner user ID |
| `schemaId` | `number` | Schema identifier |
| `databaseId` | `number` | Database identifier |
| `streamingJob` | `StreamingJob?` | Associated streaming job, if any |
| `columns` | `(ColumnCatalog \| Field)[]?` | Column definitions for display |
| `totalSizeBytes` | `number?` | Total storage size for tables |
relationType Classification
| Detection Logic | Returns |
|---|---|
| `tableType` property exists (Table) | The table type string (e.g., "TABLE", "MATERIALIZED VIEW") |
| `sinkFromName` property exists | "SINK" |
| `info` property exists (Source) | "SOURCE" |
| `dependentTableId` property exists | "SUBSCRIPTION" |
| `language` property exists | "FUNCTION" |
| None of the above | "UNKNOWN" |
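
The property checks in the table can be sketched as a chain of guards evaluated in the listed order. This is a minimal illustration, not the module's actual code: `RelationLike` and `classifyRelation` are hypothetical names, and the sketch assumes the `tableType` value is already the display string.

```typescript
// Hypothetical loose shape; the real Relation interface carries more fields.
type RelationLike = Record<string, unknown>

// Checks each distinguishing property in the order given in the table
// and returns the matching type label.
function classifyRelation(x: RelationLike): string {
  if (x.tableType !== undefined) {
    // Assumption: tableType is already a display string such as "TABLE".
    return String(x.tableType)
  }
  if (x.sinkFromName !== undefined) return "SINK"
  if (x.info !== undefined) return "SOURCE"
  if (x.dependentTableId !== undefined) return "SUBSCRIPTION"
  if (x.language !== undefined) return "FUNCTION"
  return "UNKNOWN"
}
```

Because the checks run in order, an object that happened to carry more than one of these properties would be labeled by the first match.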
StreamingJob.parallelism
The parallelism getter handles three cases:
- String value (e.g., "Adaptive"): returned as-is
- Object value (e.g., { "Fixed": 64 }): formatted as "Fixed (64)"
- Other: JSON stringified as fallback
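
The three cases can be sketched as a standalone function. `formatParallelism` is a hypothetical name used only for illustration, and the sketch assumes the object form has exactly one key; the real getter lives on `StreamingJob` and may differ in detail.

```typescript
// Formats a raw parallelism value following the three cases listed above.
function formatParallelism(p: unknown): string {
  // Case 1: plain strings such as "Adaptive" pass through unchanged.
  if (typeof p === "string") return p
  // Case 2: a single-key object such as { Fixed: 64 } becomes "Fixed (64)".
  if (typeof p === "object" && p !== null) {
    const entries = Object.entries(p)
    if (entries.length === 1) {
      const [key, value] = entries[0]
      return `${key} (${String(value)})`
    }
  }
  // Case 3: anything else falls back to its JSON representation.
  return JSON.stringify(p)
}
```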
API Endpoints Used
| Function | Endpoint | Deserialization |
|---|---|---|
| `getFragmentsByJobId` | `/fragments/job_id/{id}` | `TableFragments.fromJSON` |
| `getRelationIdInfos` | `/relation_id_infos` | Direct JSON |
| `getFragmentToRelationMap` | `/fragment_to_relation_map` | `FragmentToRelationMap.fromJSON` |
| `getStreamingJobs` | `/streaming_jobs` | `plainToInstance(StreamingJob, ...)` |
| `getMaterializedViews` | `/materialized_views` | `extendedTableFromJSON` |
| `getTables` | `/tables` | `extendedTableFromJSON` |
| `getIndexes` | `/indexes` + `/index_items` | `extendedTableFromJSON` + `Index.fromJSON`, joined by ID |
| `getSinks` | `/sinks` | `Sink.fromJSON` |
| `getSources` | `/sources` | `Source.fromJSON` |
| `getObjectDependencies` | `/object_dependencies` | `ObjectDependencies.fromJSON`, grouped into `Map<number, number[]>` |
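
Two rows in the table involve post-processing beyond a plain `fromJSON` call: the grouping of dependency edges into a map and the join of the two index payloads. The sketch below illustrates both steps on simplified data. All type and field names here (`DependencyEdge`, `IndexTable`, `IndexItem`, `indexTableId`, and so on) are assumptions made for illustration and may not match the real protobuf types.

```typescript
// Hypothetical edge shape; the real message may name these fields differently.
interface DependencyEdge {
  objectId: number
  referencedObjectId: number
}

// Groups a flat edge list into objectId -> list of referenced object IDs.
function groupDependencies(edges: DependencyEdge[]): Map<number, number[]> {
  const map = new Map<number, number[]>()
  for (const e of edges) {
    const list = map.get(e.objectId)
    if (list) list.push(e.referencedObjectId)
    else map.set(e.objectId, [e.referencedObjectId])
  }
  return map
}

// Hypothetical shapes for the two index payloads.
interface IndexTable {
  id: number
  name: string
}
interface IndexItem {
  indexTableId: number
  primaryTableId: number
}

// Joins each index table with the index item that points at it by ID.
// Tables without a matching item are dropped in this sketch.
function joinIndexes(
  tables: IndexTable[],
  items: IndexItem[]
): (IndexTable & IndexItem)[] {
  const byTableId = new Map<number, IndexItem>()
  for (const item of items) byTableId.set(item.indexTableId, item)
  const joined: (IndexTable & IndexItem)[] = []
  for (const t of tables) {
    const item = byTableId.get(t.id)
    if (item) joined.push({ ...t, ...item })
  }
  return joined
}
```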
Usage Examples
Fetching All Relations
import { getRelations, relationType } from "../lib/api/streaming"
const relations = await getRelations()
// Returns sorted concat of: MVs, tables, indexes, sinks, sources, subscriptions
relations.forEach((r) => {
  console.log(`${r.name} (${relationType(r)}) - ID: ${r.id}`)
})
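
The "sorted concat" behavior can be sketched as follows. The sort key is not stated here, so this sketch assumes ordering by `id`; `RelationLike` and `mergeRelations` are hypothetical names.

```typescript
// Hypothetical minimal shape shared by all relation kinds.
interface RelationLike {
  id: number
  name: string
}

// Concatenates the per-kind lists and sorts the result.
// Assumption: relations are ordered by ascending id.
function mergeRelations(...lists: RelationLike[][]): RelationLike[] {
  const all = ([] as RelationLike[]).concat(...lists)
  return all.sort((a, b) => a.id - b.id)
}
```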
Checking If Relation Is a Streaming Job
import { relationIsStreamingJob } from "../lib/api/streaming"
// Returns true for all types except "UNKNOWN", "SOURCE", and "INTERNAL"
if (relationIsStreamingJob(relation)) {
  // This relation has a streaming execution plan
}
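
The rule in the comment above amounts to an exclusion check on the type label. A minimal sketch, assuming the label has already been computed; `isStreamingJobType` is a hypothetical helper, not an export of the module:

```typescript
// Type labels that do not correspond to a streaming job, per the rule above.
const NON_STREAMING_TYPES = ["UNKNOWN", "SOURCE", "INTERNAL"]

// Returns true when the given type label denotes a streaming job.
function isStreamingJobType(type: string): boolean {
  return NON_STREAMING_TYPES.indexOf(type) === -1
}
```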
Fetching Fragment Data
import { getFragmentsByJobId } from "../lib/api/streaming"
const fragments = await getFragmentsByJobId(42)
// Returns TableFragments protobuf object with fragment topology
Building Dependency Map
import { getObjectDependencies } from "../lib/api/streaming"
const deps = await getObjectDependencies()
// Map<number, number[]> where key is object ID and value is list of referenced object IDs
const dependsOn = deps.get(relationId) // [id1, id2, ...]
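
A consumer that draws dependency edges in both directions may also want the reverse lookup: which objects reference a given ID. The sketch below inverts a map of the shape returned above; `invertDependencies` is a hypothetical helper and is not part of the module.

```typescript
// Builds referencedId -> list of object IDs that depend on it.
function invertDependencies(
  deps: Map<number, number[]>
): Map<number, number[]> {
  const dependents = new Map<number, number[]>()
  deps.forEach((referenced, objectId) => {
    for (const ref of referenced) {
      const list = dependents.get(ref) ?? []
      list.push(objectId)
      dependents.set(ref, list)
    }
  })
  return dependents
}
```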
Related Pages
- Risingwavelabs_Risingwave_Dashboard_API_Client - The underlying HTTP client used by all fetch functions
- Risingwavelabs_Risingwave_UseFetch_Hook - The React hook that wraps these async functions for component usage
- Risingwavelabs_Risingwave_Relation_Graph_Page - Primary consumer that visualizes relations and their dependencies
- Risingwavelabs_Risingwave_StreamingStats_Callbacks - Provides streaming stats that overlay on the data from this module
- Risingwavelabs_Risingwave_Cluster_API - Complementary API module for cluster topology data