Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Risingwavelabs Risingwave StreamingStats Callbacks

From Leeroopedia


Property Value
Module Streaming Stats Callbacks
File dashboard/lib/api/streamingStats.ts
Language TypeScript
Lines 143
Type Factory Function Module
Dependencies api.ts, proto/gen/monitor_service, fragment_graph (ChannelStatsSnapshot)

Overview

streamingStats.ts provides a factory function for creating streaming statistics refresh callbacks that fetch back-pressure, throughput, and latency data. The createStreamingStatsRefresh function returns a refresh function that first tries fetching from the Prometheus-backed endpoint (/metrics/streaming_stats_prometheus), and falls back to the embedded dashboard endpoint (/metrics/streaming_stats) on failure. It supports time-based queries via at and time_offset URL parameters and dispatches relation or fragment stats based on the statsType parameter. This module powers the real-time statistics overlay on both the Relation Graph and Fragment Graph pages.

Code Reference

Source Location

dashboard/lib/api/streamingStats.ts

Signature

export type StatsType = "relation" | "fragment"

export interface StreamingStatsCallbacks {
  setChannelStats: (stats: Map<string, ChannelDeltaStats> | undefined) => void
  setRelationStats?: (
    stats: { [key: number]: RelationStats } | undefined
  ) => void
  setFragmentStats?: (
    stats: { [key: number]: FragmentStats } | undefined
  ) => void
  toast: (
    error: any,
    status?: "info" | "warning" | "success" | "error" | "loading"
  ) => void
}

export interface TimeParams {
  at?: number       // Unix timestamp in seconds
  timeOffset?: number // Time offset in seconds
}

export function createStreamingStatsRefresh(
  callbacks: StreamingStatsCallbacks,
  initialSnapshot: ChannelStatsSnapshot | undefined,
  statsType: StatsType,
  timeParams?: TimeParams
): () => void

Import

import {
  createStreamingStatsRefresh,
  StreamingStatsCallbacks,
  TimeParams,
  StatsType,
} from "../lib/api/streamingStats"

I/O Contract

createStreamingStatsRefresh Parameters

Parameter Type Description
callbacks StreamingStatsCallbacks Object containing state setter functions and a toast function for error display
initialSnapshot undefined Initial snapshot for computing delta rates in the embedded fallback path
statsType StatsType Either "relation" or "fragment", determines which stats setter is called
timeParams TimeParams? Optional time-travel parameters (at timestamp and timeOffset)
Returns () => void A parameterless refresh function to be called directly or via setInterval

StreamingStatsCallbacks Interface

Callback Type Description
setChannelStats (Map<string, ChannelDeltaStats>) => void Required. Sets the channel-level stats map keyed by sourceFragment_targetFragment
setRelationStats ({ [key: number]: RelationStats }) => void Optional. Called when statsType === "relation"
setFragmentStats ({ [key: number]: FragmentStats }) => void Optional. Called when statsType === "fragment"
toast (error, status?) => void Required. Error display callback

ChannelDeltaStats Fields

Field Type Description
actorCount number Number of actors in the channel
backpressureRate number Back-pressure rate (0 to 1)
recvThroughput number Receive throughput
sendThroughput number Send throughput

Data Flow

The refresh function follows a two-tier fetching strategy:

  1. Primary: Fetch from /metrics/streaming_stats_prometheus (Prometheus-backed)
    • Deserializes via GetStreamingPrometheusStatsResponse.fromJSON
    • Directly maps channel stats to ChannelDeltaStats
  2. Fallback: On primary failure, fetch from /metrics/streaming_stats (embedded dashboard)
    • Deserializes via GetStreamingStatsResponse.fromJSON
    • Uses ChannelStatsSnapshot to compute delta rates from the initial snapshot
  3. Error: If both fail, calls callbacks.toast with the error

Usage Examples

Setting Up Periodic Stats Refresh

import { createStreamingStatsRefresh } from "../lib/api/streamingStats"

useEffect(() => {
  let initialSnapshot: ChannelStatsSnapshot | undefined

  const refresh = createStreamingStatsRefresh(
    {
      setChannelStats,
      setRelationStats,
      toast,
    },
    initialSnapshot,
    "relation",
    timeParams
  )

  refresh() // run once immediately
  const interval = setInterval(refresh, 5000) // then every 5s
  return () => clearInterval(interval)
}, [toast, timeParams])

With Time Parameters

const timeParams: TimeParams = {
  at: 1705312200,    // Unix timestamp (seconds)
  timeOffset: 3600,  // 1 hour offset
}

const refresh = createStreamingStatsRefresh(
  callbacks,
  undefined,
  "fragment",
  timeParams
)
// This will fetch: /metrics/streaming_stats_prometheus?at=1705312200&time_offset=3600

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment