Implementation:Risingwavelabs Risingwave PgOutputMessageDecoder
| Knowledge Sources | |
|---|---|
| Domains | CDC, Connectors, PostgreSQL, Java |
| Last Updated | 2026-02-09 07:00 GMT |
Overview
Patched Debezium pgoutput message decoder for PostgreSQL that parses binary replication messages from the pgoutput logical decoding plugin.
Description
PgOutputMessageDecoder extends Debezium's AbstractMessageDecoder to decode messages from PostgreSQL's built-in pgoutput logical decoding plugin. It is the core decoder that lets RisingWave process PostgreSQL logical replication messages. The wire format is specified in the "Logical Replication Message Formats" section of the PostgreSQL protocol documentation.
The class defines a MessageType enum mapping single-character type codes to message types:
- R = RELATION, B = BEGIN, C = COMMIT
- I = INSERT, U = UPDATE, D = DELETE
- Y = TYPE, O = ORIGIN, T = TRUNCATE
- M = LOGICAL_DECODING_MESSAGE
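The mapping above can be sketched as a small enum with a lookup method. This is a minimal illustration, not the decoder's actual source: the name PgOutputTypeSketch, the code field, and the loop-based lookup are assumptions made for the example; only the ten type codes and the forType(char) signature come from this page.

```java
// Illustrative stand-in for the decoder's MessageType enum (hypothetical name).
enum PgOutputTypeSketch {
    RELATION('R'), BEGIN('B'), COMMIT('C'),
    INSERT('I'), UPDATE('U'), DELETE('D'),
    TYPE('Y'), ORIGIN('O'), TRUNCATE('T'),
    LOGICAL_DECODING_MESSAGE('M');

    private final char code;

    PgOutputTypeSketch(char code) {
        this.code = code;
    }

    char code() {
        return code;
    }

    // Resolve a single-character type code to its message type.
    // Rejecting unknown codes with an exception is an assumption of this sketch.
    static PgOutputTypeSketch forType(char type) {
        for (PgOutputTypeSketch candidate : values()) {
            if (candidate.code == type) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unsupported message type: " + type);
    }
}
```

In a decoder, the type code would come from the first byte of the replication message, for example `forType((char) buffer.get())`.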
Message Filtering (shouldMessageBeSkipped): TYPE and ORIGIN messages are always skipped. TRUNCATE messages are skipped unless truncate events are explicitly included. BEGIN, COMMIT, and RELATION messages are never skipped (they maintain stream state). INSERT, UPDATE, DELETE, and LOGICAL_DECODING_MESSAGE messages defer to the default skip logic based on LSN position.
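The filtering rules can be condensed into one decision function. The sketch below is a simplified model under stated assumptions: SkipRuleSketch, Kind, and the defaultLsnSkip supplier are hypothetical names standing in for the real method's parameters and the inherited LSN-based check. It also assumes that a TRUNCATE message, when truncate events are included, falls through to that same LSN-based check, which this page does not state explicitly.

```java
import java.util.function.BooleanSupplier;

// Simplified model of the skip decision; not the decoder's actual code.
final class SkipRuleSketch {
    enum Kind {
        RELATION, BEGIN, COMMIT, INSERT, UPDATE, DELETE,
        TYPE, ORIGIN, TRUNCATE, LOGICAL_DECODING_MESSAGE
    }

    static boolean shouldSkip(Kind kind, boolean includeTruncate, BooleanSupplier defaultLsnSkip) {
        switch (kind) {
            case TYPE:
            case ORIGIN:
                // Never needed downstream, so always skipped.
                return true;
            case BEGIN:
            case COMMIT:
            case RELATION:
                // Required to keep transaction and schema state consistent.
                return false;
            case TRUNCATE:
                if (!includeTruncate) {
                    return true;
                }
                // Assumption: included truncates use the LSN-based check.
                return defaultLsnSkip.getAsBoolean();
            default:
                // INSERT, UPDATE, DELETE, LOGICAL_DECODING_MESSAGE.
                return defaultLsnSkip.getAsBoolean();
        }
    }
}
```

Passing the LSN check as a supplier keeps it lazy: it is only evaluated for message types whose outcome actually depends on stream position.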
Message Processing (processNotEmptyMessage): The main dispatch method reads the message type byte and delegates to specialized handlers:
- handleBeginMessage: Parses the final LSN, commit timestamp (using the PG epoch of 2000-01-01), and transaction ID.
- handleCommitMessage: Parses flags, commit LSN, end LSN, and commit timestamp.
- handleRelationMessage: Parses relation metadata (relation ID, schema, table, replica identity, columns) and runs out-of-band database metadata queries to resolve column defaults, optionality, and primary keys. In the RisingWave patch it also dispatches a SchemaChangeEvent when schema change recording is enabled. Because those metadata queries see the current catalog rather than the catalog as of the message's WAL position, the primary key metadata may already reflect later DDL changes; the handler accounts for this ordering mismatch.
- decodeInsert/decodeUpdate/decodeDelete: Parse DML messages with column values.
- decodeTruncate: Parses truncate messages for affected relations.
- handleLogicalDecodingMessage: Processes custom logical decoding messages.
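To make the BEGIN handler concrete, here is a minimal sketch of parsing a protocol version 1 BEGIN message: a type byte 'B', an 8-byte final LSN, an 8-byte commit timestamp in microseconds since 2000-01-01 UTC, and a 4-byte transaction ID. BeginMessageSketch and its fields are hypothetical names. Unlike the real decoder, where the dispatch method consumes the type byte before calling the handler, this sketch reads the type byte itself so that it stays self-contained.

```java
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical holder for the fields of a pgoutput BEGIN message.
final class BeginMessageSketch {
    // PostgreSQL timestamps count microseconds from this epoch.
    static final Instant PG_EPOCH = Instant.parse("2000-01-01T00:00:00Z");

    final long finalLsn;
    final Instant commitTimestamp;
    final long transactionId;

    private BeginMessageSketch(long finalLsn, Instant commitTimestamp, long transactionId) {
        this.finalLsn = finalLsn;
        this.commitTimestamp = commitTimestamp;
        this.transactionId = transactionId;
    }

    // ByteBuffer defaults to big-endian, which matches the wire format.
    static BeginMessageSketch parse(ByteBuffer buffer) {
        byte type = buffer.get();
        if (type != 'B') {
            throw new IllegalArgumentException("Not a BEGIN message: " + (char) type);
        }
        long lsn = buffer.getLong();
        long microsSincePgEpoch = buffer.getLong();
        // Transaction IDs are unsigned 32-bit values on the wire.
        long xid = Integer.toUnsignedLong(buffer.getInt());
        Instant commitTs = PG_EPOCH.plus(microsSincePgEpoch, ChronoUnit.MICROS);
        return new BeginMessageSketch(lsn, commitTs, xid);
    }
}
```

The epoch shift is the step that is easy to miss: interpreting the raw value as a Unix timestamp would place every commit thirty years too early.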
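Stream Options (defaultOptions): Sets the pgoutput options used when starting the replication stream: proto_version=1 and the configured publication name. On PostgreSQL 14 and later, it also enables the messages option so that logical decoding messages are included in the stream.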
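The version-gated option selection can be sketched with a plain map in place of Debezium's ChainedLogicalStreamBuilder. StreamOptionsSketch is a hypothetical name, and the sketch assumes the version predicate receives PostgreSQL's numeric server version (140000 for 14.0) and that the publication is passed through the pgoutput publication_names option.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model of defaultOptions; a map stands in for the stream builder.
final class StreamOptionsSketch {
    // Assumption: versions use PostgreSQL's server_version_num encoding.
    static final int POSTGRES_14 = 140000;

    static Map<String, String> defaultOptions(
            String publicationName,
            Function<Integer, Boolean> hasMinimumServerVersion) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("proto_version", "1");
        options.put("publication_names", publicationName);
        // The messages option only exists on PostgreSQL 14 and later.
        if (hasMinimumServerVersion.apply(POSTGRES_14)) {
            options.put("messages", "true");
        }
        return options;
    }
}
```

For example, calling `defaultOptions("rw_publication", min -> serverVersion >= min)` with a hypothetical serverVersion of 130005 would leave the messages option out, since sending an option the server does not recognize would fail when the stream starts.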
Usage
This decoder is instantiated through the LogicalDecoder.PGOUTPUT enum constant in PostgresConnectorConfig and is used by ReplicationConnection to decode binary messages received from a pgoutput replication slot.
Code Reference
Source Location
- Repository: risingwave
- File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java (L80-961)
Signature
public class PgOutputMessageDecoder extends AbstractMessageDecoder {

    public enum MessageType {
        RELATION, BEGIN, COMMIT, INSERT, UPDATE, DELETE,
        TYPE, ORIGIN, TRUNCATE, LOGICAL_DECODING_MESSAGE;

        public static MessageType forType(char type);
    }

    public PgOutputMessageDecoder(
            MessageDecoderContext decoderContext, PostgresConnection connection);

    @Override
    public boolean shouldMessageBeSkipped(ByteBuffer buffer,
            Lsn lastReceivedLsn, Lsn startLsn, WalPositionLocator walPosition);

    @Override
    public void processNotEmptyMessage(ByteBuffer buffer,
            ReplicationMessageProcessor processor,
            TypeRegistry typeRegistry) throws SQLException, InterruptedException;

    @Override
    public ChainedLogicalStreamBuilder defaultOptions(
            ChainedLogicalStreamBuilder builder,
            Function<Integer, Boolean> hasMinimumServerVersion);
}
Import
import io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder;
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.WalPositionLocator;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| decoderContext | MessageDecoderContext | Yes | Provides access to connector config, schema, and table filters |
| connection | PostgresConnection | Yes | JDBC connection for out-of-band metadata queries during RELATION message processing |
| buffer | ByteBuffer | Yes (for processing) | Raw binary message from the pgoutput replication stream |
| processor | ReplicationMessageProcessor | Yes (for processing) | Callback processor that receives decoded messages and dispatches events |
| typeRegistry | TypeRegistry | Yes (for processing) | Registry mapping PostgreSQL type OIDs to type metadata |
Outputs
| Name | Type / Delivery | Description |
|---|---|---|
| TransactionMessage | via processor.process() | BEGIN and COMMIT messages with transaction ID and commit timestamp |
| DML messages | via processor (decodeInsert/Update/Delete) | Decoded INSERT, UPDATE, DELETE messages with column values |
| SchemaChangeEvent | via dispatcher | RELATION messages dispatched as schema change events when schema change recording is enabled |
| Table | Table | Resolved table definition from RELATION messages, applied to the schema |
| shouldSkip | boolean | Whether a message should be skipped based on type and LSN position |
Usage Examples
Decoder Instantiation via LogicalDecoder Enum
// The decoder is typically created via the LogicalDecoder enum
LogicalDecoder decoder = PostgresConnectorConfig.LogicalDecoder.PGOUTPUT;
MessageDecoder messageDecoder = decoder.messageDecoder(decoderContext, connection);
// The decoder is then used by ReplicationConnection to process messages
stream.readPending(messageProcessor);
// Internally calls: messageDecoder.processNotEmptyMessage(buffer, processor, typeRegistry)
Message Type Dispatch
pgoutput binary stream
        |
        v
Read first byte --> MessageType.forType(char)
        |
        +-- 'B' (BEGIN)    --> handleBeginMessage()
        +-- 'C' (COMMIT)   --> handleCommitMessage()
        +-- 'R' (RELATION) --> handleRelationMessage() + dispatchSchemaChangeEvent
        +-- 'I' (INSERT)   --> decodeInsert()
        +-- 'U' (UPDATE)   --> decodeUpdate()
        +-- 'D' (DELETE)   --> decodeDelete()
        +-- 'T' (TRUNCATE) --> decodeTruncate() (if included)
        +-- 'M' (MESSAGE)  --> handleLogicalDecodingMessage()
        +-- 'Y'/'O'        --> skipped (TYPE/ORIGIN)