Implementation:Risingwavelabs Risingwave PgOutputMessageDecoder

From Leeroopedia


Knowledge Sources
Domains CDC, Connectors, PostgreSQL, Java
Last Updated 2026-02-09 07:00 GMT

Overview

Patched Debezium pgoutput message decoder for PostgreSQL that parses binary replication messages from the pgoutput logical decoding plugin.

Description

PgOutputMessageDecoder extends Debezium's AbstractMessageDecoder to decode messages from PostgreSQL's built-in pgoutput logical decoding plugin. It is the core decoder enabling RisingWave to process PostgreSQL logical replication messages. The binary message formats are specified in the PostgreSQL protocol documentation (logical replication message formats).

The class defines a MessageType enum mapping single-character type codes to message types:

  • R = RELATION, B = BEGIN, C = COMMIT
  • I = INSERT, U = UPDATE, D = DELETE
  • Y = TYPE, O = ORIGIN, T = TRUNCATE
  • M = LOGICAL_DECODING_MESSAGE
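
The mapping above can be sketched as an enum with a lookup method. This is a minimal illustration, not the decoder's source: the name PgOutputMessageTypeSketch, the code field, and the exception thrown for unknown codes are assumptions made for the example.

```java
/** Hypothetical stand-in for the decoder's MessageType enum (illustration only). */
enum PgOutputMessageTypeSketch {
    RELATION('R'), BEGIN('B'), COMMIT('C'),
    INSERT('I'), UPDATE('U'), DELETE('D'),
    TYPE('Y'), ORIGIN('O'), TRUNCATE('T'),
    LOGICAL_DECODING_MESSAGE('M');

    // Single-character code that leads each pgoutput message.
    private final char code;

    PgOutputMessageTypeSketch(char code) {
        this.code = code;
    }

    /** Resolves a type code to its enum constant; rejecting unknown codes is an assumption. */
    static PgOutputMessageTypeSketch forType(char type) {
        for (PgOutputMessageTypeSketch candidate : values()) {
            if (candidate.code == type) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unsupported message type: " + type);
    }
}
```

For example, PgOutputMessageTypeSketch.forType('Y') resolves to TYPE, while an unmapped character such as 'X' is rejected.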

Message Filtering (shouldMessageBeSkipped): TYPE and ORIGIN messages are always skipped. TRUNCATE messages are skipped unless truncate events are explicitly included. BEGIN, COMMIT, and RELATION messages are never skipped (they maintain stream state). INSERT, UPDATE, DELETE, and LOGICAL_DECODING_MESSAGE messages defer to the default skip logic based on LSN position.
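
The filtering rules can be condensed into a small decision function. The sketch below is hypothetical: SkipRuleSketch and its parameters are invented for illustration, the result of the default LSN-based check is passed in as a boolean rather than computed, and it assumes that a TRUNCATE message which is included falls through to that default check (the description above does not spell this out).

```java
/** Hypothetical condensation of the skip rules (not the actual decoder code). */
final class SkipRuleSketch {
    enum Type {
        RELATION, BEGIN, COMMIT, INSERT, UPDATE, DELETE,
        TYPE, ORIGIN, TRUNCATE, LOGICAL_DECODING_MESSAGE
    }

    /**
     * @param type             decoded message type
     * @param includeTruncate  whether truncate events are explicitly included
     * @param defaultLsnSkip   outcome of the default LSN-position check (assumed input)
     */
    static boolean shouldSkip(Type type, boolean includeTruncate, boolean defaultLsnSkip) {
        switch (type) {
            case TYPE:
            case ORIGIN:
                return true; // always skipped
            case BEGIN:
            case COMMIT:
            case RELATION:
                return false; // never skipped: they maintain stream state
            case TRUNCATE:
                // Assumption: an included TRUNCATE defers to the default LSN check.
                return !includeTruncate || defaultLsnSkip;
            default:
                // INSERT, UPDATE, DELETE, LOGICAL_DECODING_MESSAGE
                return defaultLsnSkip;
        }
    }
}
```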

Message Processing (processNotEmptyMessage): The main dispatch method reads the message type byte and delegates to specialized handlers:

  • handleBeginMessage: Parses the final LSN, commit timestamp (using the PG epoch of 2000-01-01), and transaction ID.
  • handleCommitMessage: Parses flags, commit LSN, end LSN, and commit timestamp.
  • handleRelationMessage: Parses relation metadata (relation ID, schema, table, replica identity, columns) and performs out-of-band database metadata queries to resolve column defaults, optionality, and primary keys. For RisingWave, it also dispatches a SchemaChangeEvent if schema change recording is enabled. Because the metadata queries see the database's current state rather than its state at the WAL position being decoded, the primary key metadata may reflect DDL changes that appear later in the stream; the handler accounts for this temporal ordering issue.
  • decodeInsert/decodeUpdate/decodeDelete: Parse DML messages with column values.
  • decodeTruncate: Parses truncate messages for affected relations.
  • handleLogicalDecodingMessage: Processes custom logical decoding messages.

Stream Options (defaultOptions): Configures the replication stream with proto_version=1 and the configured publication name. For PostgreSQL 14+, it also enables the messages option, which makes the server emit logical decoding messages (the M type above).
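
As a rough illustration of these options, the sketch below collects them into a plain map instead of using Debezium's ChainedLogicalStreamBuilder. StreamOptionsSketch is hypothetical, and so is the convention that the version callback receives the major version number (14); the actual decoder may encode the server version differently. The option names proto_version, publication_names, and messages are the ones pgoutput itself accepts.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of the pgoutput stream options (not the Debezium builder API). */
final class StreamOptionsSketch {
    static Map<String, String> defaultOptions(
            String publicationName,
            Function<Integer, Boolean> hasMinimumServerVersion) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("proto_version", "1");
        options.put("publication_names", publicationName);
        // Assumption: the callback is asked about major version 14.
        if (hasMinimumServerVersion.apply(14)) {
            // Supported by pgoutput from PostgreSQL 14 onward.
            options.put("messages", "true");
        }
        return options;
    }
}
```

Calling StreamOptionsSketch.defaultOptions("my_publication", v -> false), with a made-up publication name, yields only the first two options; a callback that returns true adds messages=true.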

Usage

This decoder is instantiated by the LogicalDecoder.PGOUTPUT enum constant in PostgresConnectorConfig and is used by the ReplicationConnection to decode binary messages received from the replication slot through the pgoutput plugin.

Code Reference

Source Location

  • Repository: risingwave
  • File: java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java (L80-961)

Signature

public class PgOutputMessageDecoder extends AbstractMessageDecoder {

    public enum MessageType {
        RELATION, BEGIN, COMMIT, INSERT, UPDATE, DELETE,
        TYPE, ORIGIN, TRUNCATE, LOGICAL_DECODING_MESSAGE;
        public static MessageType forType(char type);
    }

    public PgOutputMessageDecoder(
            MessageDecoderContext decoderContext, PostgresConnection connection);

    @Override
    public boolean shouldMessageBeSkipped(ByteBuffer buffer,
            Lsn lastReceivedLsn, Lsn startLsn, WalPositionLocator walPosition);

    @Override
    public void processNotEmptyMessage(ByteBuffer buffer,
            ReplicationMessageProcessor processor,
            TypeRegistry typeRegistry) throws SQLException, InterruptedException;

    @Override
    public ChainedLogicalStreamBuilder defaultOptions(
            ChainedLogicalStreamBuilder builder,
            Function<Integer, Boolean> hasMinimumServerVersion);
}

Import

import io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder;
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.WalPositionLocator;

I/O Contract

Inputs

Name | Type | Required | Description
--- | --- | --- | ---
decoderContext | MessageDecoderContext | Yes | Provides access to connector config, schema, and table filters
connection | PostgresConnection | Yes | JDBC connection for out-of-band metadata queries during RELATION message processing
buffer | ByteBuffer | Yes (for processing) | Raw binary message from the pgoutput replication stream
processor | ReplicationMessageProcessor | Yes (for processing) | Callback processor that receives decoded messages and dispatches events
typeRegistry | TypeRegistry | Yes (for processing) | Registry mapping PostgreSQL type OIDs to type metadata

Outputs

Name | Type | Description
--- | --- | ---
TransactionMessage | via processor.process() | BEGIN and COMMIT messages with transaction ID and commit timestamp
DML messages | via processor (decodeInsert/Update/Delete) | Decoded INSERT, UPDATE, DELETE messages with column values
SchemaChangeEvent | via dispatcher | RELATION messages dispatched as schema change events when schema change recording is enabled
Table | Table | Resolved table definition from RELATION messages, applied to the schema
shouldSkip | boolean | Whether a message should be skipped based on type and LSN position

Usage Examples

Decoder Instantiation via LogicalDecoder Enum

// The decoder is typically created via the LogicalDecoder enum
LogicalDecoder decoder = PostgresConnectorConfig.LogicalDecoder.PGOUTPUT;
MessageDecoder messageDecoder = decoder.messageDecoder(decoderContext, connection);

// The decoder is then used by ReplicationConnection to process messages
stream.readPending(messageProcessor);
// Internally calls: messageDecoder.processNotEmptyMessage(buffer, processor, typeRegistry)

Message Type Dispatch

pgoutput binary stream
    |
    v
Read first byte --> MessageType.forType(char)
    |
    +-- 'B' (BEGIN)     --> handleBeginMessage()
    +-- 'C' (COMMIT)    --> handleCommitMessage()
    +-- 'R' (RELATION)  --> handleRelationMessage() + dispatchSchemaChangeEvent
    +-- 'I' (INSERT)    --> decodeInsert()
    +-- 'U' (UPDATE)    --> decodeUpdate()
    +-- 'D' (DELETE)    --> decodeDelete()
    +-- 'T' (TRUNCATE)  --> decodeTruncate() (if included)
    +-- 'M' (MESSAGE)   --> handleLogicalDecodingMessage()
    +-- 'Y'/'O'         --> skipped (TYPE/ORIGIN)
