Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Risingwavelabs Risingwave ConnectorNodeMetrics

From Leeroopedia


Knowledge Sources
Domains Connectors, Observability, Metrics, Prometheus
Last Updated 2026-02-09 07:00 GMT

Overview

Observability component that defines, registers, and exposes Prometheus metrics for the Java connector node, covering source/sink connections, row throughput, CPU usage, memory usage, and error counts.

Description

The ConnectorNodeMetrics class manages all Prometheus metrics for the Java connector node. It defines the following static metric collectors:

Connection Metrics:

  • active_source_connections (Counter) -- Tracks active source connections, labeled by source_type and ip.
  • active_sink_connections (Counter) -- Tracks active sink connections, labeled by connector_type and ip.
  • total_sink_connections (Counter) -- Tracks total sink connections over time, labeled by connector_type and ip.

Throughput Metrics:

  • connector_source_rows_received (Counter) -- Number of rows received by source connectors, labeled by source_type and source_id.
  • connector_sink_rows_received (Counter) -- Number of rows received by sink connectors, labeled by connector_type and sink_id.

System Metrics:

  • process_cpu_seconds_total (Counter) -- Total CPU time spent in seconds, labeled by job.
  • process_resident_memory_bytes (Gauge) -- Current RAM usage in bytes, labeled by job.

Error Metrics:

  • error_count (Counter) -- Number of errors encountered, labeled by sink_type and ip.

The class includes a PeriodicMetricsCollector inner class that runs as a daemon thread, polling the JVM OperatingSystemMXBean every second to update CPU and memory metrics. The startHTTPServer method registers all metrics with a CollectorRegistry and starts a Prometheus HTTP exporter on the specified host and port.

Usage

The metrics are used throughout the connector node service. incSourceRowsReceived is called by DbzSourceHandler when CDC events are emitted. incActiveSinkConnections and decActiveSinkConnections are called during sink writer lifecycle management. The HTTP server is started during connector node initialization.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/metrics/ConnectorNodeMetrics.java
  • Lines: 172

Signature

public class ConnectorNodeMetrics {

    public static void startHTTPServer(String host, int port) { ... }

    public static void incActiveSourceConnections(String sourceType, String ip) { ... }
    public static void decActiveSourceConnections(String sourceType, String ip) { ... }

    public static void incActiveSinkConnections(String connectorName, String ip) { ... }
    public static void decActiveSinkConnections(String connectorName, String ip) { ... }

    public static void incSourceRowsReceived(String sourceType, String sourceId, double amt) { ... }
    public static void incSinkRowsReceived(String connectorName, String sinkId, double amt) { ... }

    public static void incTotalConnections(String sinkType, String ip) { ... }
    public static void incErrorCount(String sinkType, String ip) { ... }
    public static void setRamUsage(String ip, long usedRamInBytes) { ... }
}

Import

import com.risingwave.metrics.ConnectorNodeMetrics;

I/O Contract

Metric Definitions

Metric Name Type Labels Description
active_source_connections Counter source_type, ip Active source connection count
active_sink_connections Counter connector_type, ip Active sink connection count
total_sink_connections Counter connector_type, ip Total sink connections over time
connector_source_rows_received Counter source_type, source_id Rows received by source connectors
connector_sink_rows_received Counter connector_type, sink_id Rows received by sink connectors
process_cpu_seconds_total Counter job Total CPU time in seconds
process_resident_memory_bytes Gauge job Current RAM usage in bytes
error_count Counter sink_type, ip Error count by sink type

startHTTPServer

Name Type Required Description
host String Yes Host address to bind the Prometheus HTTP exporter
port int Yes Port number for the Prometheus HTTP exporter

Usage Examples

Starting the Metrics Server

// Start Prometheus metrics exporter on port 50052
ConnectorNodeMetrics.startHTTPServer("0.0.0.0", 50052);

Recording Source Rows

// Called by DbzSourceHandler when CDC events are emitted
ConnectorNodeMetrics.incSourceRowsReceived(
    config.getSourceType().toString(),
    String.valueOf(config.getSourceId()),
    resp.getEventsCount()
);

Tracking Sink Connections

// On sink connection open
ConnectorNodeMetrics.incActiveSinkConnections("jdbc", clientIp);
ConnectorNodeMetrics.incTotalConnections("jdbc", clientIp);

// On sink connection close
ConnectorNodeMetrics.decActiveSinkConnections("jdbc", clientIp);

Related Pages

Implements Principle

Requires Environment

Related Implementations

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment