Implementation:Risingwavelabs Risingwave Fragment Graph Page
| Property | Value |
|---|---|
| Page Component | fragment_graph (default export) |
| Source File | dashboard/pages/fragment_graph.tsx |
| Language | TypeScript / TSX |
| Lines | 519 |
| Category | Dashboard Page |
| Framework | React, Next.js, D3.js, d3-dag, Chakra UI |
Overview
The fragment_graph page is a core debugging and monitoring page in the RisingWave dashboard that visualizes the internal structure of streaming job execution pipelines. It fetches streaming jobs and their fragments from the API, builds a D3 hierarchy tree of the plan nodes within each fragment, and constructs a directed acyclic graph (DAG) of fragment dependencies. It also polls channel and fragment statistics every 5 seconds to display near-real-time backpressure and throughput metrics.
The page provides a sidebar for selecting streaming jobs and searching by fragment ID or actor ID, and renders two graph visualizations: a FragmentDependencyGraph for the high-level fragment DAG, and a detailed FragmentGraph component showing plan nodes within fragments.
Code Reference
Source Location
dashboard/pages/fragment_graph.tsx
Signature
The page exports a default Next.js page component (no props) and the following named exports:
// Main page component
export default function FragmentGraphPage(): JSX.Element
// Data type for plan nodes used by FragmentGraph component
export interface PlanNodeDatum {
  name: string
  children?: PlanNodeDatum[]
  operatorId: string | number
  node: StreamNode | DispatcherNode
  actorIds?: string[]
}
// Channel stats snapshot class for computing deltas between polling intervals
export class ChannelStatsSnapshot {
  metrics: Map<string, ChannelStats>
  time: number
  constructor(metrics: Map<string, ChannelStats>, time: number)
}
Imports
import {
  Box, Button, Flex, FormControl, FormLabel, HStack, Input, Select,
  Table, TableContainer, Tbody, Td, Text, Tr, VStack,
} from "@chakra-ui/react"
import * as d3 from "d3"
import { dagStratify } from "d3-dag"
import _ from "lodash"
import Head from "next/head"
import { parseAsInteger, useQueryState } from "nuqs"
import { Fragment, useCallback, useEffect, useMemo, useState } from "react"
import FragmentDependencyGraph from "../components/FragmentDependencyGraph"
import FragmentGraph from "../components/FragmentGraph"
import TimeControls from "../components/TimeControls"
import Title from "../components/Title"
import useErrorToast from "../hook/useErrorToast"
import useFetch from "../lib/api/fetch"
import {
  getFragmentsByJobId, getRelationIdInfos, getStreamingJobs,
} from "../lib/api/streaming"
import { TimeParams, createStreamingStatsRefresh } from "../lib/api/streamingStats"
import { FragmentBox } from "../lib/layout"
import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta"
import {
  ChannelDeltaStats, ChannelStats, FragmentStats,
} from "../proto/gen/monitor_service"
import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan"
I/O Contract
Inputs (Page-level)
The page reads the streaming job ID from the URL query parameter id via useQueryState from the nuqs library.
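As a rough illustration of what reading an integer id from the query string involves, the sketch below parses the parameter by hand with the standard URLSearchParams API. The helper name parseJobId is hypothetical and is not part of the page; the page itself relies on useQueryState from nuqs rather than on code like this.

```typescript
// Hypothetical helper (not from the source file): extracts an integer `id`
// from a query string, returning null when it is absent or not a number.
function parseJobId(search: string): number | null {
  const raw = new URLSearchParams(search).get("id")
  if (raw === null) {
    return null
  }
  const parsed = Number.parseInt(raw, 10)
  return Number.isNaN(parsed) ? null : parsed
}

const selectedJobId = parseJobId("?id=42")
const missingJobId = parseJobId("?other=1")
```

A null result corresponds to the case where no streaming job has been selected yet.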
API Dependencies
| API Function | Purpose |
|---|---|
| getStreamingJobs() | Fetches the list of all streaming jobs for the sidebar selector |
| getFragmentsByJobId(jobId) | Fetches fragment details for the selected streaming job |
| getRelationIdInfos() | Fetches relation ID mappings for fragment cross-references |
| createStreamingStatsRefresh() | Creates a polling function that periodically fetches channel and fragment stats |
Outputs
The page renders:
- A sidebar (225px wide) with a streaming job selector dropdown, fragment ID search, actor ID search, time controls, and a stats table
- A FragmentDependencyGraph showing the high-level fragment DAG
- A FragmentGraph component showing the detailed plan node tree within each fragment
Internal Types
interface DispatcherNode {
  [actorId: number]: Dispatcher[]
  fragment: TableFragments_Fragment
}
Key Functions
buildPlanNodeDependency
function buildPlanNodeDependency(
  fragment: TableFragments_Fragment
): d3.HierarchyNode<PlanNodeDatum>
Builds a D3 hierarchy tree from a fragment's stream plan. Takes the first actor's node tree, recursively converts StreamNode instances into PlanNodeDatum objects, and creates a root dispatcher node with actor IDs. The dispatcher name is derived from the dispatcher type (e.g., "hashDispatcher", "multipleDispatchers", "noDispatcher").
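The conversion described above can be sketched without D3 or the generated protobuf types. Everything in this block is an assumption made for illustration: SketchStreamNode and SketchPlanNodeDatum are simplified stand-ins, and the naming rule in dispatcherName is a guess that merely reproduces the three example names; it is not the page's actual implementation.

```typescript
// Simplified stand-ins for the generated types (assumptions, not the real API).
interface SketchStreamNode {
  operatorId: number
  kind: string // e.g. "merge", "project", "materialize"
  input: SketchStreamNode[]
}

interface SketchPlanNodeDatum {
  name: string
  operatorId: string | number
  children?: SketchPlanNodeDatum[]
  actorIds?: string[]
}

// Recursively converts a stream node and its inputs into plan node data.
function toPlanNodeDatum(node: SketchStreamNode): SketchPlanNodeDatum {
  return {
    name: node.kind,
    operatorId: node.operatorId,
    children: node.input.map((child) => toPlanNodeDatum(child)),
  }
}

// Guessed naming rule that yields the example names from the text.
function dispatcherName(dispatcherTypes: string[]): string {
  if (dispatcherTypes.length === 0) {
    return "noDispatcher"
  }
  if (dispatcherTypes.length > 1) {
    return "multipleDispatchers"
  }
  return `${dispatcherTypes[0]}Dispatcher`
}

// Wraps the converted plan tree in a root dispatcher node carrying actor IDs.
function buildPlanTree(
  root: SketchStreamNode,
  dispatcherTypes: string[],
  actorIds: string[]
): SketchPlanNodeDatum {
  return {
    name: dispatcherName(dispatcherTypes),
    operatorId: "dispatcher",
    children: [toPlanNodeDatum(root)],
    actorIds,
  }
}

const samplePlan: SketchStreamNode = {
  operatorId: 2,
  kind: "materialize",
  input: [{ operatorId: 1, kind: "merge", input: [] }],
}
const sampleTree = buildPlanTree(samplePlan, ["hash"], ["101", "102"])
```

Because the result uses a children array, a structure like sampleTree could be handed to d3.hierarchy to obtain a d3.HierarchyNode.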
findMergeNodes
function findMergeNodes(root: StreamNode): MergeNode[]
Recursively searches a stream plan tree for merge nodes, which represent the input end of inter-fragment channels.
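A minimal sketch of such a recursive search follows. The SketchNode shape, with an optional merge body and an input array, is a simplified assumption; the generated StreamNode type is more involved, and the function name collectMergeNodes is hypothetical.

```typescript
// Simplified stand-ins (assumptions): a node optionally carries a merge body.
interface SketchMergeBody {
  upstreamFragmentId: number
}

interface SketchNode {
  merge?: SketchMergeBody // present only when the node is a merge node
  input: SketchNode[]
}

// Depth-first traversal that collects every merge body in the plan tree.
function collectMergeNodes(root: SketchNode): SketchMergeBody[] {
  const found: SketchMergeBody[] = []
  const visit = (node: SketchNode): void => {
    if (node.merge !== undefined) {
      found.push(node.merge)
    }
    for (const child of node.input) {
      visit(child)
    }
  }
  visit(root)
  return found
}

const joinPlan: SketchNode = {
  input: [
    { merge: { upstreamFragmentId: 1 }, input: [] },
    { input: [{ merge: { upstreamFragmentId: 2 }, input: [] }] },
  ],
}
const upstreamIds = collectMergeNodes(joinPlan).map((m) => m.upstreamFragmentId)
```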
buildFragmentDependencyAsEdges
function buildFragmentDependencyAsEdges(
  fragments: TableFragments
): FragmentBox[]
Constructs the fragment dependency DAG as an array of FragmentBox objects. For each fragment, it identifies upstream fragment IDs from the fragment metadata, distinguishing between internal parent IDs (within the same streaming job) and external parent IDs (cross-job references).
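The internal versus external split can be illustrated with a small sketch. The types below are simplified assumptions (the real FragmentBox has additional layout fields), and the rule used here, treating an upstream ID as internal when it belongs to a fragment in the same list, is one plausible reading of the description rather than a confirmed detail of the source.

```typescript
// Simplified stand-ins (assumptions, not the real generated types).
interface SketchFragment {
  fragmentId: number
  upstreamFragmentIds: number[]
}

interface SketchFragmentBox {
  id: string
  parentIds: string[] // upstream fragments within the same streaming job
  externalParentIds: string[] // upstream fragments owned by other jobs
}

function buildDependencyBoxes(fragments: SketchFragment[]): SketchFragmentBox[] {
  const localIds = new Set(fragments.map((f) => f.fragmentId))
  return fragments.map((f) => {
    const parentIds: string[] = []
    const externalParentIds: string[] = []
    for (const upstreamId of f.upstreamFragmentIds) {
      if (localIds.has(upstreamId)) {
        parentIds.push(String(upstreamId))
      } else {
        externalParentIds.push(String(upstreamId))
      }
    }
    return { id: String(f.fragmentId), parentIds, externalParentIds }
  })
}

const sampleBoxes = buildDependencyBoxes([
  { fragmentId: 1, upstreamFragmentIds: [] },
  { fragmentId: 2, upstreamFragmentIds: [1, 99] },
])
```

In this example fragment 2 depends on fragment 1 from the same job and on fragment 99, which is not in the list and is therefore treated as external.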
ChannelStatsSnapshot
export class ChannelStatsSnapshot {
  metrics: Map<string, ChannelStats>
  time: number
}
Stores a snapshot of channel statistics at a point in time. Used to compute delta statistics (backpressure rate, throughput) between consecutive polling intervals.
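One way to turn two consecutive snapshots into rates is sketched below. The field names blockedNs and sendRowCount, the assumption that the counters are cumulative, and the millisecond timestamp unit are all illustrative assumptions; they are not taken from the generated ChannelStats type or from the page's actual delta computation.

```typescript
// Simplified stand-ins (assumptions): cumulative counters per channel.
interface SketchChannelStats {
  blockedNs: number // total time the sender was blocked, in nanoseconds
  sendRowCount: number // total rows sent
}

class SketchSnapshot {
  metrics: Map<string, SketchChannelStats>
  time: number // capture time in milliseconds

  constructor(metrics: Map<string, SketchChannelStats>, time: number) {
    this.metrics = metrics
    this.time = time
  }
}

interface SketchDelta {
  backpressureRate: number // fraction of the interval spent blocked (0 to 1)
  sendThroughput: number // rows per second
}

// Computes per-channel rates for channels present in both snapshots.
function computeDeltas(
  prev: SketchSnapshot,
  curr: SketchSnapshot
): Map<string, SketchDelta> {
  const deltas = new Map<string, SketchDelta>()
  const elapsedSec = (curr.time - prev.time) / 1000
  if (elapsedSec <= 0) {
    return deltas
  }
  curr.metrics.forEach((now, key) => {
    const before = prev.metrics.get(key)
    if (before === undefined) {
      return // channel is new in this snapshot, so no delta yet
    }
    deltas.set(key, {
      backpressureRate: (now.blockedNs - before.blockedNs) / (elapsedSec * 1e9),
      sendThroughput: (now.sendRowCount - before.sendRowCount) / elapsedSec,
    })
  })
  return deltas
}

const prevSnapshot = new SketchSnapshot(
  new Map([["1_2", { blockedNs: 0, sendRowCount: 0 }]]),
  0
)
const currSnapshot = new SketchSnapshot(
  new Map([["1_2", { blockedNs: 2.5e9, sendRowCount: 1000 }]]),
  5000
)
const sampleDeltas = computeDeltas(prevSnapshot, currSnapshot)
```

With snapshots taken 5 seconds apart, a channel blocked for 2.5 seconds of that window yields a backpressure rate of 0.5, and 1000 rows sent yields 200 rows per second.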
Configuration Constants
const INTERVAL_MS = 5000 // Refresh interval for back pressure stats
const SIDEBAR_WIDTH = 225 // Width of the sidebar in pixels
Usage Examples
The page is accessible at /fragment_graph?id=<streaming_job_id> in the dashboard. Links from the Relations component's fragments column button (F) navigate directly to this page with the appropriate job ID.
/fragment_graph/?id=42
Related Pages
- Risingwavelabs_Risingwave_FragmentGraph_Component -- The main fragment graph visualization component rendered by this page
- Risingwavelabs_Risingwave_Relations_Component -- Provides the navigation links to this page via the fragments column
- Risingwavelabs_Risingwave_RelationGraph_Component -- Sibling graph visualization for relation-level dependencies
- Risingwavelabs_Risingwave_Dashboard_Streaming_API -- API layer used for fetching streaming jobs and fragments