Implementation:Risingwavelabs Risingwave ConnectorNodeMetrics
| Knowledge Sources | |
|---|---|
| Domains | Connectors, Observability, Metrics, Prometheus |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Observability component that defines, registers, and exposes Prometheus metrics for the Java connector node, covering source/sink connections, row throughput, CPU usage, memory usage, and error counts.
Description
The ConnectorNodeMetrics class manages all Prometheus metrics for the Java connector node. It defines the following static metric collectors:
Connection Metrics:
- active_source_connections (Counter) -- Tracks active source connections, labeled by source_type and ip.
- active_sink_connections (Counter) -- Tracks active sink connections, labeled by connector_type and ip.
- total_sink_connections (Counter) -- Tracks total sink connections over time, labeled by connector_type and ip.
Throughput Metrics:
- connector_source_rows_received (Counter) -- Number of rows received by source connectors, labeled by source_type and source_id.
- connector_sink_rows_received (Counter) -- Number of rows received by sink connectors, labeled by connector_type and sink_id.
System Metrics:
- process_cpu_seconds_total (Counter) -- Total CPU time spent in seconds, labeled by job.
- process_resident_memory_bytes (Gauge) -- Current RAM usage in bytes, labeled by job.
Error Metrics:
- error_count (Counter) -- Number of errors encountered, labeled by sink_type and ip.
The class includes a PeriodicMetricsCollector inner class that runs as a daemon thread, polling the JVM OperatingSystemMXBean every second to update CPU and memory metrics. The startHTTPServer method registers all metrics with a CollectorRegistry and starts a Prometheus HTTP exporter on the specified host and port.
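The polling pattern described above can be sketched with JDK classes only. This is an illustrative stand-in, not the RisingWave implementation: the class name `PeriodicSampler`, its fields, and the choice to read memory from `Runtime` and CPU load from `getSystemLoadAverage()` are assumptions made for the example. The real class feeds its readings into Prometheus collectors rather than plain fields.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a periodic collector thread (not the RisingWave class).
class PeriodicSampler extends Thread {
    private final long intervalMillis;
    private final OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
    // Latest readings; a real collector would push these into metric objects.
    private final AtomicLong usedHeapBytes = new AtomicLong();
    private final AtomicLong samples = new AtomicLong();
    private volatile double loadAverage = Double.NaN;

    PeriodicSampler(long intervalMillis) {
        super("periodic-sampler");
        this.intervalMillis = intervalMillis;
        setDaemon(true); // a daemon thread does not keep the JVM alive on shutdown
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            sampleOnce();
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag so the loop exits
            }
        }
    }

    // Takes one reading of heap usage and system load.
    void sampleOnce() {
        Runtime rt = Runtime.getRuntime();
        usedHeapBytes.set(rt.totalMemory() - rt.freeMemory());
        loadAverage = osBean.getSystemLoadAverage(); // negative when the platform cannot report it
        samples.incrementAndGet();
    }

    long usedHeapBytes() { return usedHeapBytes.get(); }
    double loadAverage() { return loadAverage; }
    long sampleCount() { return samples.get(); }
}
```

Calling `new PeriodicSampler(1000).start()` would mirror the one-second interval mentioned above. Marking the thread as a daemon means it never blocks process exit, which suits a background sampler.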
Usage
The static helper methods are called from several places in the connector node service. incSourceRowsReceived is called by DbzSourceHandler when CDC events are emitted. incActiveSinkConnections and decActiveSinkConnections are called during sink writer lifecycle management. The HTTP server is started once, during connector node initialization.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/metrics/ConnectorNodeMetrics.java
- Lines: 172
Signature
public class ConnectorNodeMetrics {
    public static void startHTTPServer(String host, int port) { ... }
    public static void incActiveSourceConnections(String sourceType, String ip) { ... }
    public static void decActiveSourceConnections(String sourceType, String ip) { ... }
    public static void incActiveSinkConnections(String connectorName, String ip) { ... }
    public static void decActiveSinkConnections(String connectorName, String ip) { ... }
    public static void incSourceRowsReceived(String sourceType, String sourceId, double amt) { ... }
    public static void incSinkRowsReceived(String connectorName, String sinkId, double amt) { ... }
    public static void incTotalConnections(String sinkType, String ip) { ... }
    public static void incErrorCount(String sinkType, String ip) { ... }
    public static void setRamUsage(String ip, long usedRamInBytes) { ... }
}
Import
import com.risingwave.metrics.ConnectorNodeMetrics;
I/O Contract
Metric Definitions
| Metric Name | Type | Labels | Description |
|---|---|---|---|
| active_source_connections | Counter | source_type, ip | Active source connection count |
| active_sink_connections | Counter | connector_type, ip | Active sink connection count |
| total_sink_connections | Counter | connector_type, ip | Total sink connections over time |
| connector_source_rows_received | Counter | source_type, source_id | Rows received by source connectors |
| connector_sink_rows_received | Counter | connector_type, sink_id | Rows received by sink connectors |
| process_cpu_seconds_total | Counter | job | Total CPU time in seconds |
| process_resident_memory_bytes | Gauge | job | Current RAM usage in bytes |
| error_count | Counter | sink_type, ip | Error count by sink type |
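To make the name, type, and label columns concrete, the following sketch renders one labeled counter in the general Prometheus text exposition layout (`# HELP`, `# TYPE`, then `name{label="value"} value`). The class `ExpositionSketch` and its methods are hypothetical and written only for illustration. The exact sample names and number formatting that the real exporter emits depend on the Prometheus client library version and are not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that formats a counter in Prometheus text exposition style.
class ExpositionSketch {
    // Escapes backslash, double quote, and newline inside a label value.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    // Renders a single sample line: name{k="v",...} value
    static String renderSample(String name, Map<String, String> labels, double value) {
        StringJoiner joined = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : labels.entrySet()) {
            joined.add(e.getKey() + "=\"" + escape(e.getValue()) + "\"");
        }
        String labelPart = labels.isEmpty() ? "" : joined.toString();
        return name + labelPart + " " + value;
    }

    // Renders the HELP and TYPE comment lines followed by one sample.
    static String renderCounter(String name, String help, Map<String, String> labels, double value) {
        return "# HELP " + name + " " + help + "\n"
                + "# TYPE " + name + " counter\n"
                + renderSample(name, labels, value) + "\n";
    }

    // Convenience builder that keeps label insertion order stable.
    static Map<String, String> labels(String k1, String v1, String k2, String v2) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put(k1, v1);
        m.put(k2, v2);
        return m;
    }
}
```

For example, `ExpositionSketch.renderSample("connector_source_rows_received", ExpositionSketch.labels("source_type", "mysql", "source_id", "1"), 42.0)` yields `connector_source_rows_received{source_type="mysql",source_id="1"} 42.0`. The label values `mysql` and `1` are made up for the example.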
startHTTPServer
| Name | Type | Required | Description |
|---|---|---|---|
| host | String | Yes | Host address to bind the Prometheus HTTP exporter |
| port | int | Yes | Port number for the Prometheus HTTP exporter |
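As a rough picture of what binding an exporter to a host and port involves, the sketch below serves a text payload over HTTP using the JDK's built-in `com.sun.net.httpserver.HttpServer`. It is a stand-in under stated assumptions, not the code behind startHTTPServer: the class name `MetricsEndpointSketch`, the `/metrics` path, and the `Supplier<String>` body source are all hypothetical choices for this example.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical JDK-only exporter endpoint (not the Prometheus client HTTPServer).
class MetricsEndpointSketch {
    // Binds to host:port and serves whatever the supplier returns at /metrics.
    static HttpServer start(String host, int port, Supplier<String> body) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(host, port), 0);
        server.createContext("/metrics", exchange -> {
            byte[] payload = body.get().getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders()
                    .set("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
            exchange.sendResponseHeaders(200, payload.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(payload);
            }
        });
        server.start();
        return server;
    }
}
```

Passing port `0` asks the operating system for a free port, which is convenient in tests; the chosen port is then available through `server.getAddress().getPort()`. Binding to `0.0.0.0` exposes the endpoint on all interfaces, while `127.0.0.1` restricts it to the local machine.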
Usage Examples
Starting the Metrics Server
// Start Prometheus metrics exporter on port 50052
ConnectorNodeMetrics.startHTTPServer("0.0.0.0", 50052);
Recording Source Rows
// Called by DbzSourceHandler when CDC events are emitted
ConnectorNodeMetrics.incSourceRowsReceived(
    config.getSourceType().toString(),
    String.valueOf(config.getSourceId()),
    resp.getEventsCount()
);
Tracking Sink Connections
// On sink connection open
ConnectorNodeMetrics.incActiveSinkConnections("jdbc", clientIp);
ConnectorNodeMetrics.incTotalConnections("jdbc", clientIp);
// On sink connection close
ConnectorNodeMetrics.decActiveSinkConnections("jdbc", clientIp);
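One way to keep the open and close calls paired is to wrap the sink work in try/finally, so the active count is released even when the write fails. The sketch below demonstrates the pattern with an in-memory stand-in; `SinkLifecycleSketch`, its maps, and `withSinkConnection` are hypothetical names for this example and are not part of ConnectorNodeMetrics.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory stand-in for the labeled active/total connection metrics.
class SinkLifecycleSketch {
    static final Map<String, AtomicLong> active = new ConcurrentHashMap<>();
    static final Map<String, AtomicLong> total = new ConcurrentHashMap<>();

    // Reads the current value for a (connector, ip) label pair; 0 if never touched.
    static long get(Map<String, AtomicLong> metric, String connector, String ip) {
        AtomicLong v = metric.get(connector + "|" + ip);
        return v == null ? 0 : v.get();
    }

    private static void add(Map<String, AtomicLong> metric, String connector, String ip, long delta) {
        metric.computeIfAbsent(connector + "|" + ip, k -> new AtomicLong()).addAndGet(delta);
    }

    // Marks the connection open, runs the work, and always marks it closed afterwards.
    static void withSinkConnection(String connector, String ip, Runnable work) {
        add(active, connector, ip, 1);
        add(total, connector, ip, 1);
        try {
            work.run();
        } finally {
            add(active, connector, ip, -1); // runs on both success and failure
        }
    }
}
```

With the real class, the same shape would place incActiveSinkConnections and incTotalConnections before the `try` and decActiveSinkConnections in the `finally` block.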