

Implementation:Mage ai Mage ai PostgreSQL Decoders

From Leeroopedia
Revision as of 15:36, 16 February 2026 by Admin (auto-imported from implementations/Mage_ai_Mage_ai_PostgreSQL_Decoders.md)


Knowledge Sources
Domains: Data_Integration, PostgreSQL, CDC
Last Updated: 2026-02-09 00:00 GMT

Overview

Binary protocol decoders for PostgreSQL logical replication (pgoutput) messages, used by the Mage PostgreSQL source connector for Change Data Capture (CDC).

Description

This module implements decoders for the PostgreSQL pgoutput logical replication protocol. The abstract base class PgoutputMessage provides binary buffer reading primitives (read_int8, read_int16, read_int32, read_int64, read_utf8, read_timestamp, read_string, read_tuple_data) and defines the decode_buffer interface. Concrete message classes decode specific replication events: Begin (transaction start, with the final LSN and commit timestamp), Commit (transaction end), Relation (table schema metadata: column names, type OIDs, and part-of-key flags), Insert (new row with tuple data), Update (modified row with an optional old tuple), Delete (removed row), and Truncate (table truncation). The Origin and Type messages are not implemented. The helper dataclasses ColumnData and TupleData represent decoded column values in three categories: NULL, unchanged TOASTed values, and text-formatted data. Among the utility functions, convert_pg_ts converts microsecond timestamps counted from the PostgreSQL epoch (2000-01-01) into datetime objects, while convert_bytes_to_int and convert_bytes_to_utf8 convert raw bytes into integers and strings.
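The timestamp arithmetic behind convert_pg_ts, and the two byte conversions next to it, can be sketched with the standard library alone. This is an illustration, not Mage's source: the function bodies, the module-level PG_EPOCH constant, and the choice to return a timezone-aware UTC datetime are assumptions made here.

```python
from datetime import datetime, timedelta, timezone
from typing import Union

# Assumption: PostgreSQL timestamps count microseconds from 2000-01-01 00:00:00 UTC,
# and this sketch returns timezone-aware UTC datetimes.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def convert_pg_ts(_ts_in_microseconds: int) -> datetime:
    """Shift a PostgreSQL microsecond timestamp onto Python's datetime scale."""
    return PG_EPOCH + timedelta(microseconds=_ts_in_microseconds)


def convert_bytes_to_int(_in_bytes: bytes) -> int:
    """Interpret bytes as a big-endian (network order) signed integer."""
    return int.from_bytes(_in_bytes, byteorder="big", signed=True)


def convert_bytes_to_utf8(_in_bytes: Union[bytes, bytearray]) -> str:
    """Decode raw bytes as UTF-8 text."""
    return bytes(_in_bytes).decode("utf-8")
```

Because the epoch is a fixed offset, a raw value of 0 maps to 2000-01-01 00:00:00 UTC and every 86,400,000,000 microseconds adds one day.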

Usage

Used internally by the PostgreSQL source connector's CDC replication slot consumer to decode binary WAL messages into structured Python objects for downstream processing and record emission.

Code Reference

Source Location

  • Repository: mage-ai
  • File: mage_integrations/mage_integrations/sources/postgresql/decoders.py
  • Lines: 1-396

Signature

# Utility functions
def convert_pg_ts(_ts_in_microseconds: int) -> datetime: ...
def convert_bytes_to_int(_in_bytes: bytes) -> int: ...
def convert_bytes_to_utf8(_in_bytes: Union[bytes, bytearray]) -> str: ...

# Data classes
@dataclass(frozen=True)
class ColumnData:
    col_data_category: Optional[str]
    col_data_length: Optional[int] = None
    col_data: Optional[str] = None

@dataclass(frozen=True)
class TupleData:
    n_columns: Optional[int]
    column_data: Optional[List[ColumnData]]

# Abstract base class
class PgoutputMessage(ABC):
    def __init__(self, buffer: bytes): ...
    def decode_buffer(self) -> None: ...
    def read_int8(self) -> int: ...
    def read_int16(self) -> int: ...
    def read_int32(self) -> int: ...
    def read_int64(self) -> int: ...
    def read_utf8(self, n: int = 1) -> str: ...
    def read_timestamp(self) -> datetime: ...
    def read_string(self) -> str: ...
    def read_tuple_data(self) -> TupleData: ...

# Concrete message decoders
class Begin(PgoutputMessage): ...
class Commit(PgoutputMessage): ...
class Relation(PgoutputMessage): ...
class Insert(PgoutputMessage): ...
class Update(PgoutputMessage): ...
class Delete(PgoutputMessage): ...
class Truncate(PgoutputMessage): ...

Import

from mage_integrations.sources.postgresql.decoders import (
    Begin, Commit, Relation, Insert, Update, Delete, Truncate,
)

I/O Contract

Inputs

Name | Type | Required | Description
buffer | bytes | Yes | Raw binary pgoutput message (including its leading type byte) read from a PostgreSQL logical replication slot

Outputs

Name | Type | Description
message | PgoutputMessage | Decoded message object with structured fields (LSN, timestamp, relation info, tuple data)

Message Classes

Class | Byte Marker | Key Fields | Description
Begin | B | final_tx_lsn, commit_tx_ts, tx_xid | Transaction begin with final LSN and commit timestamp
Commit | C | flags, lsn_commit, final_tx_lsn, commit_tx_ts | Transaction commit with LSN and timestamp
Origin | O | (not implemented) | Replication origin marker
Relation | R | relation_id, namespace, relation_name, columns | Table schema with column names, types, and PK flags
PgType | Y | (not implemented) | Custom type definition
Insert | I | relation_id, new_tuple | New row insert with full tuple data
Update | U | relation_id, old_tuple, new_tuple | Row update with optional old values and new values
Delete | D | relation_id, old_tuple | Row deletion with identifying tuple data
Truncate | T | n_relations, options, relation_ids | Table truncation event
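The fixed-size Begin and Commit layouts in the table map directly onto struct format strings. The sketch below is a hypothetical stand-alone decoder (BeginSketch, CommitSketch, decode, and format_lsn are names invented for illustration, not Mage's API). It follows the pgoutput field order, treats every integer as big-endian and signed as described under Key Behaviors, and leaves timestamps as raw microseconds.

```python
import struct
from dataclasses import dataclass

# Field layouts after the one-byte marker, big-endian with no padding.
# Begin:  Int64 final LSN, Int64 commit timestamp, Int32 xid
# Commit: Int8 flags, Int64 commit LSN, Int64 end-of-transaction LSN, Int64 commit timestamp
BEGIN_LAYOUT = struct.Struct(">qqi")
COMMIT_LAYOUT = struct.Struct(">bqqq")


@dataclass
class BeginSketch:
    final_tx_lsn: int
    commit_tx_ts: int  # raw microseconds since 2000-01-01
    tx_xid: int


@dataclass
class CommitSketch:
    flags: int
    lsn_commit: int
    final_tx_lsn: int
    commit_tx_ts: int


def decode(buffer: bytes):
    """Dispatch on the marker byte and unpack the fixed-size body."""
    marker, body = chr(buffer[0]), buffer[1:]
    if marker == "B":
        return BeginSketch(*BEGIN_LAYOUT.unpack(body))
    if marker == "C":
        return CommitSketch(*COMMIT_LAYOUT.unpack(body))
    raise ValueError(f"unsupported message type {marker!r}")


def format_lsn(lsn: int) -> str:
    """Render an LSN in PostgreSQL's 'X/Y' notation (high/low 32 bits in hex)."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"
```

format_lsn prints an LSN the way PostgreSQL displays pg_lsn values, which makes decoded positions easy to compare with server-side views.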

Key Behaviors

Binary Buffer Reading

All integer values are read as big-endian (network byte order) signed integers. Strings are null-terminated UTF-8 byte sequences. Timestamps are 64-bit integer counts of microseconds since the PostgreSQL epoch (2000-01-01 00:00:00 UTC) and are converted to Python datetime objects.
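A cursor over the message bytes is enough to implement these reading rules. The following is a minimal sketch assuming an io.BytesIO stream and struct format codes; BufferReader is a hypothetical name, and its method names only mirror the PgoutputMessage primitives listed above rather than reproducing Mage's code.

```python
import io
import struct


class BufferReader:
    """Sequential reader over pgoutput message bytes (hypothetical helper)."""

    def __init__(self, buffer: bytes):
        self.stream = io.BytesIO(buffer)

    def _unpack(self, fmt: str) -> int:
        # Read exactly as many bytes as the big-endian format needs.
        size = struct.calcsize(fmt)
        chunk = self.stream.read(size)
        if len(chunk) != size:
            raise EOFError("message ended before the expected field")
        return struct.unpack(fmt, chunk)[0]

    def read_int8(self) -> int:
        return self._unpack(">b")

    def read_int16(self) -> int:
        return self._unpack(">h")

    def read_int32(self) -> int:
        return self._unpack(">i")

    def read_int64(self) -> int:
        return self._unpack(">q")

    def read_string(self) -> str:
        """Read bytes up to the NUL terminator and decode them as UTF-8."""
        raw = bytearray()
        while True:
            byte = self.stream.read(1)
            if not byte:
                raise EOFError("unterminated string")
            if byte == b"\x00":
                return raw.decode("utf-8")
            raw += byte
```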

Tuple Data Decoding

A tuple starts with a 2-byte column count; each column then begins with a category byte: n (NULL), u (unchanged TOASTed value whose contents are not sent), or t (text-formatted data: a 4-byte length prefix followed by that many bytes of UTF-8 text).
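The category-byte scheme reduces to a short loop. ColumnValue and read_tuple_data below are hypothetical stand-ins for ColumnData and the read_tuple_data primitive, written for illustration only. PostgreSQL 14 and later can also send a b (binary) category when binary output is requested; this sketch, like the description above, handles only n, u, and t and rejects anything else.

```python
import io
import struct
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class ColumnValue:
    category: str                  # 'n', 'u', or 't'
    length: Optional[int] = None   # byte length, only for 't'
    data: Optional[str] = None     # decoded text, only for 't'


def read_tuple_data(stream: io.BytesIO) -> List[ColumnValue]:
    """Parse one TupleData block: Int16 column count, then one entry per column."""
    (n_columns,) = struct.unpack(">h", stream.read(2))
    columns = []
    for _ in range(n_columns):
        category = stream.read(1).decode("ascii")
        if category in ("n", "u"):
            # NULL, or an unchanged TOASTed value that was not transmitted.
            columns.append(ColumnValue(category))
        elif category == "t":
            (length,) = struct.unpack(">i", stream.read(4))
            text = stream.read(length).decode("utf-8")
            columns.append(ColumnValue(category, length, text))
        else:
            raise ValueError(f"unexpected column category {category!r}")
    return columns
```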

Relation Metadata

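The Relation message decodes table schema information including the relation ID, namespace (schema), table name, replica identity setting, and a list of columns. Each column carries a flags byte (1 marks the column as part of the replica identity key, which is the primary key under the default REPLICA IDENTITY setting), its name, its data type OID, and its type modifier.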
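Combining the reading rules, a Relation message can be decoded field by field. The sketch assumes protocol version 1, where Relation messages carry no transaction ID (later protocol versions add one for streamed transactions). decode_relation and read_cstring are hypothetical helpers, and the column tuples follow the (pkey_flag, col_name, type_oid, modifier) order shown in the usage example.

```python
import io
import struct
from typing import List, Tuple


def read_cstring(stream: io.BytesIO) -> str:
    """Read a NUL-terminated UTF-8 string (stops early at end of buffer)."""
    raw = bytearray()
    while (byte := stream.read(1)) not in (b"", b"\x00"):
        raw += byte
    return raw.decode("utf-8")


def decode_relation(buffer: bytes):
    """Decode a protocol-version-1 Relation ('R') message into plain values."""
    stream = io.BytesIO(buffer)
    if stream.read(1) != b"R":
        raise ValueError("not a Relation message")
    (relation_id,) = struct.unpack(">i", stream.read(4))
    namespace = read_cstring(stream)
    relation_name = read_cstring(stream)
    # Same codes as pg_class.relreplident: 'd', 'n', 'f', or 'i'.
    replica_identity = stream.read(1).decode("ascii")
    (n_columns,) = struct.unpack(">h", stream.read(2))
    columns: List[Tuple[int, str, int, int]] = []
    for _ in range(n_columns):
        (flags,) = struct.unpack(">b", stream.read(1))  # 1 = part of the key
        name = read_cstring(stream)
        type_oid, type_modifier = struct.unpack(">ii", stream.read(8))
        columns.append((flags, name, type_oid, type_modifier))
    return relation_id, namespace, relation_name, replica_identity, columns
```

For a table public.users with an int4 primary key id (type OID 23) and a text column email (type OID 25), the result would be (relation_id, "public", "users", "d", [(1, "id", 23, -1), (0, "email", 25, -1)]).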

Usage Examples

from mage_integrations.sources.postgresql.decoders import Begin, Insert, Relation

# Decode a Begin message from raw WAL bytes
raw_begin = b'B\x00\x00\x00\x00...'  # actual WAL bytes
begin_msg = Begin(raw_begin)
print(begin_msg.tx_xid)         # Transaction ID
print(begin_msg.commit_tx_ts)   # Commit timestamp as datetime

# Decode a Relation message
raw_relation = b'R\x00\x00...'  # actual WAL bytes
rel_msg = Relation(raw_relation)
print(rel_msg.relation_name)    # Table name
print(rel_msg.columns)          # [(pkey_flag, col_name, type_oid, modifier), ...]

# Decode an Insert message
raw_insert = b'I\x00\x00...'   # actual WAL bytes
ins_msg = Insert(raw_insert)
for col in ins_msg.new_tuple.column_data:
    print(col.col_data)         # Column value as text; None for NULL or unchanged TOASTed columns

Related Pages

Implements Principle

Requires Environment
