Implementation:Risingwavelabs Risingwave DbzSourceUtils
| Knowledge Sources | |
|---|---|
| Domains | Connectors, CDC, Debezium, PostgreSQL |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Utility class providing operational support functions for Debezium-based CDC sources, including PostgreSQL publication management and streaming readiness detection.
Description
The DbzSourceUtils class offers three major areas of functionality:
PostgreSQL Publication Management: Methods createPostgresPublicationInValidate and createPostgresPublicationInSourceExecutor handle automatic creation of PostgreSQL publications needed for logical replication. The inner method createPostgresPublicationInner connects to PostgreSQL via JDBC, checks if the publication already exists using a SQL query, and creates it if missing. For PostgreSQL 13+, it automatically includes the publish_via_partition_root = true option to support CDC on partitioned tables. Table-specific publications are created when schema and table names are provided; otherwise, a general publication is created.
Streaming Readiness Detection: The waitForStreamingRunning method polls Debezium JMX MBeans to detect when the streaming phase of the CDC engine has started. It supports MySQL, PostgreSQL, and SQL Server connectors. The polling loop checks the Connected attribute on the Debezium metrics MBean with a configurable timeout, sleeping one second between polls.
JMX Metrics Object Names: The getStreamingMetricsObjectName method constructs the appropriate JMX ObjectName for each connector type. SQL Server uses a task-based naming convention while MySQL and PostgreSQL use a simpler server-based pattern.
Usage
These utilities are called by the CDC engine runners and source handlers during source connector lifecycle management. Publication creation is invoked both during validation and during actual source execution. Streaming readiness detection is used to confirm the Debezium engine has successfully connected to the upstream database before proceeding with backfill operations.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java
- Lines: 223
Signature
public class DbzSourceUtils {
public static void createPostgresPublicationInValidate(Map<String, String> properties)
throws SQLException { ... }
public static void createPostgresPublicationInSourceExecutor(
Map<String, String> properties, long sourceId) throws SQLException { ... }
public static boolean waitForStreamingRunning(
SourceTypeE sourceType, String dbServerName, int waitStreamingStartTimeout) { ... }
}
Import
import com.risingwave.connector.source.common.DbzSourceUtils;
I/O Contract
createPostgresPublicationInValidate / createPostgresPublicationInSourceExecutor
| Name | Type | Required | Description |
|---|---|---|---|
| properties | Map<String, String> | Yes | Must contain host, port, database.name, user, password, publication.name, publication.autocreate.mode, and optionally schema.name and table.name |
| sourceId | long | Only for SourceExecutor variant | The source ID for logging context |
waitForStreamingRunning
| Name | Type | Required | Description |
|---|---|---|---|
| sourceType | SourceTypeE | Yes | The type of CDC source (MYSQL, POSTGRES, SQL_SERVER) |
| dbServerName | String | Yes | The Debezium server name used in JMX bean naming |
| waitStreamingStartTimeout | int | Yes | Maximum number of seconds to wait before timing out |
Outputs
| Method | Return Type | Description |
|---|---|---|
| createPostgresPublication* | void | Creates publication as a side effect; throws SQLException on failure |
| waitForStreamingRunning | boolean | Returns true if streaming started within timeout, false otherwise |
Usage Examples
PostgreSQL Publication Creation
// During validation, auto-create publication if configured
Map<String, String> props = request.getPropertiesMap();
DbzSourceUtils.createPostgresPublicationInValidate(props);
Waiting for Streaming to Start
// Wait up to 60 seconds for the Debezium streaming phase to begin
boolean started = DbzSourceUtils.waitForStreamingRunning(
SourceTypeE.POSTGRES,
"rw_source_1",
60
);
if (!started) {
LOG.error("Streaming source failed to start within timeout");
}