Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Risingwavelabs Risingwave DbzSourceUtils

From Leeroopedia


Knowledge Sources
Domains Connectors, CDC, Debezium, PostgreSQL
Last Updated 2026-02-09 07:00 GMT

Overview

Utility class providing operational support functions for Debezium-based CDC sources, including PostgreSQL publication management and streaming readiness detection.

Description

The DbzSourceUtils class offers three major areas of functionality:

PostgreSQL Publication Management: Methods createPostgresPublicationInValidate and createPostgresPublicationInSourceExecutor handle automatic creation of PostgreSQL publications needed for logical replication. The inner method createPostgresPublicationInner connects to PostgreSQL via JDBC, checks if the publication already exists using a SQL query, and creates it if missing. For PostgreSQL 13+, it automatically includes the publish_via_partition_root = true option to support CDC on partitioned tables. Table-specific publications are created when schema and table names are provided; otherwise, a general publication is created.

Streaming Readiness Detection: The waitForStreamingRunning method polls Debezium JMX MBeans to detect when the streaming phase of the CDC engine has started. It supports MySQL, PostgreSQL, and SQL Server connectors. The polling loop checks the Connected attribute on the Debezium metrics MBean with a configurable timeout, sleeping one second between polls.

JMX Metrics Object Names: The getStreamingMetricsObjectName method constructs the appropriate JMX ObjectName for each connector type. SQL Server uses a task-based naming convention while MySQL and PostgreSQL use a simpler server-based pattern.

Usage

These utilities are called by the CDC engine runners and source handlers during source connector lifecycle management. Publication creation is invoked both during validation and during actual source execution. Streaming readiness detection is used to confirm the Debezium engine has successfully connected to the upstream database before proceeding with backfill operations.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
  • Lines: 223

Signature

public class DbzSourceUtils {

    public static void createPostgresPublicationInValidate(Map<String, String> properties)
            throws SQLException { ... }

    public static void createPostgresPublicationInSourceExecutor(
            Map<String, String> properties, long sourceId) throws SQLException { ... }

    public static boolean waitForStreamingRunning(
            SourceTypeE sourceType, String dbServerName, int waitStreamingStartTimeout) { ... }
}

Import

import com.risingwave.connector.source.common.DbzSourceUtils;

I/O Contract

createPostgresPublicationInValidate / createPostgresPublicationInSourceExecutor

Name Type Required Description
properties Map<String, String> Yes Must contain host, port, database.name, user, password, publication.name, publication.autocreate.mode, and optionally schema.name and table.name
sourceId long Only for SourceExecutor variant The source ID for logging context

waitForStreamingRunning

Name Type Required Description
sourceType SourceTypeE Yes The type of CDC source (MYSQL, POSTGRES, SQL_SERVER)
dbServerName String Yes The Debezium server name used in JMX bean naming
waitStreamingStartTimeout int Yes Maximum number of seconds to wait before timing out

Outputs

Method Return Type Description
createPostgresPublication* void Creates publication as a side effect; throws SQLException on failure
waitForStreamingRunning boolean Returns true if streaming started within timeout, false otherwise

Usage Examples

PostgreSQL Publication Creation

// During validation, auto-create publication if configured
Map<String, String> props = request.getPropertiesMap();
DbzSourceUtils.createPostgresPublicationInValidate(props);

Waiting for Streaming to Start

// Wait up to 60 seconds for the Debezium streaming phase to begin
boolean started = DbzSourceUtils.waitForStreamingRunning(
    SourceTypeE.POSTGRES,
    "rw_source_1",
    60
);
if (!started) {
    LOG.error("Streaming source failed to start within timeout");
}

Related Pages

Implements Principle

Requires Environment

Related Implementations

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment