Implementation:Risingwavelabs Risingwave StreamingStats Callbacks
| Property | Value |
|---|---|
| Module | Streaming Stats Callbacks |
| File | dashboard/lib/api/streamingStats.ts |
| Language | TypeScript |
| Lines | 143 |
| Type | Factory Function Module |
| Dependencies | api.ts, proto/gen/monitor_service, fragment_graph (ChannelStatsSnapshot) |
Overview
streamingStats.ts provides a factory function for creating streaming statistics refresh callbacks that fetch back-pressure, throughput, and latency data. The createStreamingStatsRefresh function returns a refresh function that first tries fetching from the Prometheus-backed endpoint (/metrics/streaming_stats_prometheus), and falls back to the embedded dashboard endpoint (/metrics/streaming_stats) on failure. It supports time-based queries via at and time_offset URL parameters and dispatches relation or fragment stats based on the statsType parameter. This module powers the real-time statistics overlay on both the Relation Graph and Fragment Graph pages.
Code Reference
Source Location
dashboard/lib/api/streamingStats.ts
Signature
export type StatsType = "relation" | "fragment"

export interface StreamingStatsCallbacks {
  setChannelStats: (stats: Map<string, ChannelDeltaStats> | undefined) => void
  setRelationStats?: (
    stats: { [key: number]: RelationStats } | undefined
  ) => void
  setFragmentStats?: (
    stats: { [key: number]: FragmentStats } | undefined
  ) => void
  toast: (
    error: any,
    status?: "info" | "warning" | "success" | "error" | "loading"
  ) => void
}

export interface TimeParams {
  at?: number // Unix timestamp in seconds
  timeOffset?: number // Time offset in seconds
}

export function createStreamingStatsRefresh(
  callbacks: StreamingStatsCallbacks,
  initialSnapshot: ChannelStatsSnapshot | undefined,
  statsType: StatsType,
  timeParams?: TimeParams
): () => void
Import
import {
  createStreamingStatsRefresh,
  StreamingStatsCallbacks,
  TimeParams,
  StatsType,
} from "../lib/api/streamingStats"
I/O Contract
createStreamingStatsRefresh Parameters
| Parameter | Type | Description |
|---|---|---|
| callbacks | StreamingStatsCallbacks | Object containing state setter functions and a toast function for error display |
| initialSnapshot | ChannelStatsSnapshot \| undefined | Initial snapshot for computing delta rates in the embedded fallback path |
| statsType | StatsType | Either "relation" or "fragment"; determines which stats setter is called |
| timeParams | TimeParams? | Optional time-travel parameters (at timestamp and timeOffset) |
| Returns | () => void | A parameterless refresh function to be called directly or via setInterval |
StreamingStatsCallbacks Interface
| Callback | Type | Description |
|---|---|---|
| setChannelStats | (Map<string, ChannelDeltaStats>) => void | Required. Sets the channel-level stats map keyed by sourceFragment_targetFragment |
| setRelationStats | ({ [key: number]: RelationStats }) => void | Optional. Called when statsType === "relation" |
| setFragmentStats | ({ [key: number]: FragmentStats }) => void | Optional. Called when statsType === "fragment" |
| toast | (error, status?) => void | Required. Error display callback |
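The dispatch rule in the table above can be sketched as follows. This is a minimal illustration, not the module's actual code: dispatchStats and StatsSetters are hypothetical names, and the generic parameters R and F stand in for RelationStats and FragmentStats. It assumes only what the table states, namely that the relation setter is used for "relation", the fragment setter for "fragment", and that both setters are optional.

```typescript
type StatsType = "relation" | "fragment"

// Hypothetical subset of StreamingStatsCallbacks, generic over the stats payloads.
interface StatsSetters<R, F> {
  setRelationStats?: (stats: { [key: number]: R } | undefined) => void
  setFragmentStats?: (stats: { [key: number]: F } | undefined) => void
}

// Calls exactly one setter, chosen by statsType. Optional chaining makes a
// missing setter a no-op instead of a runtime error.
function dispatchStats<R, F>(
  statsType: StatsType,
  setters: StatsSetters<R, F>,
  relationStats: { [key: number]: R } | undefined,
  fragmentStats: { [key: number]: F } | undefined
): void {
  if (statsType === "relation") {
    setters.setRelationStats?.(relationStats)
  } else {
    setters.setFragmentStats?.(fragmentStats)
  }
}
```

Because both setters are optional, a page that only shows relation stats can omit setFragmentStats entirely.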
ChannelDeltaStats Fields
| Field | Type | Description |
|---|---|---|
| actorCount | number | Number of actors in the channel |
| backpressureRate | number | Back-pressure rate (0 to 1) |
| recvThroughput | number | Receive throughput |
| sendThroughput | number | Send throughput |
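To make the fields concrete, the sketch below shows one plausible way such rates could be derived from two cumulative samples. It is an illustration under stated assumptions, not the ChannelStatsSnapshot implementation: CumulativeChannelSample, its field names, the units (seconds, rows), and the per-actor normalization of back-pressure are all hypothetical.

```typescript
interface ChannelDeltaStats {
  actorCount: number
  backpressureRate: number
  recvThroughput: number
  sendThroughput: number
}

// Hypothetical cumulative counters for one channel (assumed shape and units).
interface CumulativeChannelSample {
  actorCount: number
  blockedSeconds: number // total blocked time, summed across actors
  recvRows: number
  sendRows: number
}

// Derives per-second rates from the difference between two samples.
// Returns undefined when no meaningful rate can be computed.
function computeDeltaStats(
  initial: CumulativeChannelSample,
  current: CumulativeChannelSample,
  elapsedSeconds: number
): ChannelDeltaStats | undefined {
  if (elapsedSeconds <= 0 || current.actorCount <= 0) {
    return undefined
  }
  const blockedDelta = current.blockedSeconds - initial.blockedSeconds
  return {
    actorCount: current.actorCount,
    // Assumed normalization: average blocked fraction per actor, in [0, 1].
    backpressureRate: blockedDelta / current.actorCount / elapsedSeconds,
    recvThroughput: (current.recvRows - initial.recvRows) / elapsedSeconds,
    sendThroughput: (current.sendRows - initial.sendRows) / elapsedSeconds,
  }
}
```

Under these assumptions, two actors blocked for a combined 5 seconds over a 10 second window give a back-pressure rate of 0.25.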
Data Flow
The refresh function follows a two-tier fetching strategy:
- Primary: Fetch from /metrics/streaming_stats_prometheus (Prometheus-backed)
  - Deserializes via GetStreamingPrometheusStatsResponse.fromJSON
  - Directly maps channel stats to ChannelDeltaStats
- Fallback: On primary failure, fetch from /metrics/streaming_stats (embedded dashboard)
  - Deserializes via GetStreamingStatsResponse.fromJSON
  - Uses ChannelStatsSnapshot to compute delta rates from the initial snapshot
- Error: If both fail, calls callbacks.toast with the error
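The two-tier strategy above can be sketched as a small generic helper. This is a simplified illustration rather than the module's code: fetchWithFallback and Fetcher are hypothetical names, the fetchers are injected so the sketch stays independent of the real API client, and reporting the fallback's error (rather than the primary's) when both fail is an assumption.

```typescript
// A fetcher is any async function producing a value; in the real module the
// two fetchers would hit the Prometheus-backed and embedded endpoints.
type Fetcher<T> = () => Promise<T>

async function fetchWithFallback<T>(
  primary: Fetcher<T>,
  fallback: Fetcher<T>,
  onError: (error: unknown) => void
): Promise<T | undefined> {
  try {
    // Tier 1: try the primary source first.
    return await primary()
  } catch {
    try {
      // Tier 2: the primary failed, so try the fallback source.
      return await fallback()
    } catch (fallbackError) {
      // Both tiers failed: report once (assumed: the fallback's error).
      onError(fallbackError)
      return undefined
    }
  }
}
```

In this shape, onError plays the role of callbacks.toast, and a primary failure stays silent as long as the fallback succeeds.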
Usage Examples
Setting Up Periodic Stats Refresh
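import { useEffect } from "react"
import { createStreamingStatsRefresh } from "../lib/api/streamingStats"
// Import path assumed; ChannelStatsSnapshot comes from the fragment_graph dependency.
import { ChannelStatsSnapshot } from "./fragment_graph"

useEffect(() => {
  // No baseline snapshot is available yet, so this is passed as undefined.
  let initialSnapshot: ChannelStatsSnapshot | undefined
  const refresh = createStreamingStatsRefresh(
    {
      setChannelStats,
      setRelationStats,
      toast,
    },
    initialSnapshot,
    "relation",
    timeParams
  )
  refresh() // run once immediately
  const interval = setInterval(refresh, 5000) // then every 5s
  return () => clearInterval(interval) // stop polling on unmount or dependency change
}, [toast, timeParams])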
With Time Parameters
const timeParams: TimeParams = {
  at: 1705312200, // Unix timestamp (seconds)
  timeOffset: 3600, // 1 hour offset
}
const refresh = createStreamingStatsRefresh(
  callbacks,
  undefined,
  "fragment",
  timeParams
)
// Calling refresh() fetches: /metrics/streaming_stats_prometheus?at=1705312200&time_offset=3600
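The mapping from TimeParams to the at and time_offset URL parameters can be sketched as below. buildStatsUrl is a hypothetical helper written for illustration, not a function exported by streamingStats.ts; it assumes that unset fields are simply omitted from the query string.

```typescript
interface TimeParams {
  at?: number // Unix timestamp in seconds
  timeOffset?: number // Time offset in seconds
}

// Hypothetical helper: appends only the time parameters that are set.
// Note the camelCase field timeOffset becomes the snake_case key time_offset.
function buildStatsUrl(basePath: string, timeParams?: TimeParams): string {
  const parts: string[] = []
  if (timeParams?.at !== undefined) {
    parts.push(`at=${encodeURIComponent(timeParams.at)}`)
  }
  if (timeParams?.timeOffset !== undefined) {
    parts.push(`time_offset=${encodeURIComponent(timeParams.timeOffset)}`)
  }
  return parts.length > 0 ? `${basePath}?${parts.join("&")}` : basePath
}
```

With the values from the example above, this yields the same URL as shown in the comment; with no time parameters it returns the bare endpoint path.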
Related Pages
- Risingwavelabs_Risingwave_Dashboard_API_Client - The HTTP client used to fetch metrics endpoints
- Risingwavelabs_Risingwave_Relation_Graph_Page - Uses this module with statsType: "relation" for relation-level back-pressure overlay
- Risingwavelabs_Risingwave_BackPressure_Utils - Visualization functions that consume the stats data produced by this module
- Risingwavelabs_Risingwave_TimeControls - UI component that produces the TimeParams consumed by this module
- Risingwavelabs_Risingwave_TimeUtils - Time parsing utilities used by TimeControls to generate timestamp/offset values