Implementation:Mage ai Mage ai PostgreSQL Decoders
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, PostgreSQL, CDC |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Binary protocol decoders for PostgreSQL logical replication (pgoutput) messages, used by the Mage PostgreSQL source connector for Change Data Capture (CDC).
Description
This module implements decoders for the PostgreSQL pgoutput logical replication protocol. The abstract base class PgoutputMessage provides binary buffer-reading primitives (read_int8, read_int16, read_int32, read_int64, read_utf8, read_timestamp, read_string, read_tuple_data) and defines the decode_buffer interface that each subclass implements. Concrete message classes decode specific replication events: Begin (transaction start, with final LSN and commit timestamp), Commit (transaction end), Relation (table schema metadata: column names, types, and primary key flags), Insert (new row with tuple data), Update (modified row with an optional old tuple), Delete (removed row), and Truncate (table truncation). The helper dataclasses ColumnData and TupleData represent decoded column values and distinguish three categories: NULL, unchanged TOASTed, and text-formatted data. The utility functions handle low-level conversions: convert_pg_ts turns microseconds since the PostgreSQL epoch (2000-01-01) into a datetime, while convert_bytes_to_int and convert_bytes_to_utf8 turn raw bytes into integers and UTF-8 strings.
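The base-class pattern described above can be sketched in a self-contained way: the constructor wraps the buffer in a stream, reads the one-byte message marker, and delegates the rest to decode_buffer, which each subclass implements by calling the read primitives in protocol order. SketchMessage and SketchBegin are hypothetical names and this is not the Mage source; the Begin layout (Int64 final LSN, Int64 commit timestamp, Int32 xid) follows the PostgreSQL logical replication protocol documentation.

```python
import io
import struct
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone

# PostgreSQL timestamps count microseconds from this epoch, not from 1970-01-01.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


class SketchMessage(ABC):
    """Hypothetical base class mirroring the PgoutputMessage pattern."""

    def __init__(self, buffer: bytes):
        self.buffer = io.BytesIO(buffer)
        self.byte1 = self.read_utf8(1)  # one-byte message-type marker, e.g. 'B'
        self.decode_buffer()            # subclass consumes the remaining fields in order

    @abstractmethod
    def decode_buffer(self) -> None:
        ...

    def _read_signed(self, size: int) -> int:
        # pgoutput integers are big-endian (network byte order) and signed
        return int.from_bytes(self.buffer.read(size), byteorder="big", signed=True)

    def read_int8(self) -> int:
        return self._read_signed(1)

    def read_int16(self) -> int:
        return self._read_signed(2)

    def read_int32(self) -> int:
        return self._read_signed(4)

    def read_int64(self) -> int:
        return self._read_signed(8)

    def read_utf8(self, n: int = 1) -> str:
        return self.buffer.read(n).decode("utf-8")

    def read_timestamp(self) -> datetime:
        return PG_EPOCH + timedelta(microseconds=self.read_int64())

    def read_string(self) -> str:
        # Null-terminated string; also stops at end of buffer to avoid looping forever
        out = bytearray()
        while (ch := self.buffer.read(1)) not in (b"\x00", b""):
            out += ch
        return out.decode("utf-8")


class SketchBegin(SketchMessage):
    """Begin: Int64 final LSN, Int64 commit timestamp, Int32 xid."""

    def decode_buffer(self) -> None:
        if self.byte1 != "B":
            raise ValueError(f"expected 'B', got {self.byte1!r}")
        self.final_tx_lsn = self.read_int64()
        self.commit_tx_ts = self.read_timestamp()
        self.tx_xid = self.read_int32()


# Build a synthetic Begin payload and decode it.
payload = struct.pack(">cqqi", b"B", 0x16B3748, 1_000_000, 742)
msg = SketchBegin(payload)
print(msg.final_tx_lsn, msg.commit_tx_ts, msg.tx_xid)
```

Calling decode_buffer from the constructor means a message object is fully populated as soon as it exists, which keeps consumer code to a single `SketchBegin(payload)` call.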
Usage
Used internally by the PostgreSQL source connector's CDC replication slot consumer to decode binary WAL messages into structured Python objects for downstream processing and record emission.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/postgresql/decoders.py
- Lines: 1-396
Signature
```python
# Imports used by the signatures below
from abc import ABC
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Union

# Utility functions
def convert_pg_ts(_ts_in_microseconds: int) -> datetime: ...
def convert_bytes_to_int(_in_bytes: bytes) -> int: ...
def convert_bytes_to_utf8(_in_bytes: Union[bytes, bytearray]) -> str: ...

# Data classes
@dataclass(frozen=True)
class ColumnData:
    col_data_category: Optional[str]
    col_data_length: Optional[int] = None
    col_data: Optional[str] = None

@dataclass(frozen=True)
class TupleData:
    n_columns: Optional[int]
    column_data: Optional[List[ColumnData]]

# Abstract base class
class PgoutputMessage(ABC):
    def __init__(self, buffer: bytes): ...
    def decode_buffer(self) -> None: ...
    def read_int8(self) -> int: ...
    def read_int16(self) -> int: ...
    def read_int32(self) -> int: ...
    def read_int64(self) -> int: ...
    def read_utf8(self, n: int = 1) -> str: ...
    def read_timestamp(self) -> datetime: ...
    def read_string(self) -> str: ...
    def read_tuple_data(self) -> TupleData: ...

# Concrete message decoders
class Begin(PgoutputMessage): ...
class Commit(PgoutputMessage): ...
class Relation(PgoutputMessage): ...
class Insert(PgoutputMessage): ...
class Update(PgoutputMessage): ...
class Delete(PgoutputMessage): ...
class Truncate(PgoutputMessage): ...
```
Import
```python
from mage_integrations.sources.postgresql.decoders import (
    Begin, Commit, Relation, Insert, Update, Delete, Truncate,
)
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| buffer | bytes | Yes | Raw binary WAL message from PostgreSQL logical replication slot |
Outputs
| Name | Type | Description |
|---|---|---|
| message | PgoutputMessage | Instance of the concrete subclass (Begin, Insert, ...) whose fields depend on the message type, e.g. LSN, timestamp, relation info, or tuple data |
Message Classes
| Class | Byte Marker | Key Fields | Description |
|---|---|---|---|
| Begin | B | final_tx_lsn, commit_tx_ts, tx_xid | Transaction begin with final LSN and commit timestamp |
| Commit | C | flags, lsn_commit, final_tx_lsn, commit_tx_ts | Transaction commit with LSN and timestamp |
| Origin | O | (not implemented) | Replication origin marker |
| Relation | R | relation_id, namespace, relation_name, columns | Table schema with column names, types, and PK flags |
| PgType | Y | (not implemented) | Custom type definition |
| Insert | I | relation_id, new_tuple | New row insert with full tuple data |
| Update | U | relation_id, old_tuple, new_tuple | Row update with optional old values and new values |
| Delete | D | relation_id, old_tuple | Row deletion with identifying tuple data |
| Truncate | T | n_relations, options, relation_ids | Table truncation event |
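Because every pgoutput payload starts with the one-byte marker listed in the table, a consumer can route each payload to the matching decoder by inspecting that byte. The sketch below assumes, as the Signature section shows, that each decoder class takes the raw buffer in its constructor; the MARKERS/UNSUPPORTED registries and the route function are hypothetical helpers, not part of decoders.py.

```python
from typing import Callable, Dict

# Hypothetical registry: marker byte -> decoder class name from the table.
MARKERS: Dict[bytes, str] = {
    b"B": "Begin",
    b"C": "Commit",
    b"R": "Relation",
    b"I": "Insert",
    b"U": "Update",
    b"D": "Delete",
    b"T": "Truncate",
}

# Markers the module recognizes but does not decode.
UNSUPPORTED: Dict[bytes, str] = {b"O": "Origin", b"Y": "PgType"}


def route(payload: bytes, decoders: Dict[str, Callable[[bytes], object]]) -> object:
    """Pick a decoder by the first byte of the payload and apply it."""
    marker = payload[:1]
    if marker in UNSUPPORTED:
        raise NotImplementedError(f"{UNSUPPORTED[marker]} messages are not decoded")
    try:
        name = MARKERS[marker]
    except KeyError:
        raise ValueError(f"unknown pgoutput marker {marker!r}") from None
    return decoders[name](payload)
```

With the real module, the mapping would be something like `{"Begin": Begin, "Commit": Commit, "Insert": Insert, ...}`; passing the mapping in keeps the router testable with stand-in callables.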
Key Behaviors
Binary Buffer Reading
All integer values are read as big-endian signed integers. Strings are null-terminated UTF-8 sequences. Timestamps are microseconds since the PostgreSQL epoch (2000-01-01 00:00:00 UTC), converted to Python datetime objects.
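The conversions described above can be sketched with the standard library alone. These bodies are an assumption reconstructed from the described behavior and the signatures listed under Signature; the actual implementations in decoders.py may differ in detail, for example in whether the returned datetime is timezone-aware.

```python
from datetime import datetime, timedelta, timezone
from typing import Union

# 2000-01-01 00:00:00 UTC, the origin of PostgreSQL's internal timestamps
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def convert_pg_ts(_ts_in_microseconds: int) -> datetime:
    # Offset from the PostgreSQL epoch, not the Unix epoch
    return PG_EPOCH + timedelta(microseconds=_ts_in_microseconds)


def convert_bytes_to_int(_in_bytes: bytes) -> int:
    # Network byte order, two's-complement signed
    return int.from_bytes(_in_bytes, byteorder="big", signed=True)


def convert_bytes_to_utf8(_in_bytes: Union[bytes, bytearray]) -> str:
    # bytes and bytearray both provide decode()
    return _in_bytes.decode("utf-8")
```

Note that the PostgreSQL epoch is 946684800 seconds after the Unix epoch, so treating a pgoutput timestamp as Unix microseconds would shift every value by 30 years.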
Tuple Data Decoding
Each column in a tuple is identified by a category byte: n (NULL), u (unchanged TOASTed value not sent), or t (text-formatted data with a 4-byte length prefix followed by the UTF-8 value).
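A tuple begins with an Int16 column count, followed by one category-prefixed entry per column. The sketch below copies the ColumnData/TupleData field layout from the Signature section, but read_tuple_data here is a hypothetical standalone function over an io.BytesIO rather than the module's method; any category byte other than n, u, or t raises ValueError in this sketch.

```python
import io
import struct
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class ColumnData:
    col_data_category: Optional[str]
    col_data_length: Optional[int] = None
    col_data: Optional[str] = None


@dataclass(frozen=True)
class TupleData:
    n_columns: Optional[int]
    column_data: Optional[List[ColumnData]]


def read_tuple_data(buf: io.BytesIO) -> TupleData:
    n_columns = int.from_bytes(buf.read(2), "big", signed=True)
    columns: List[ColumnData] = []
    for _ in range(n_columns):
        category = buf.read(1).decode("utf-8")
        if category in ("n", "u"):
            # NULL, or an unchanged TOASTed value the server did not resend
            columns.append(ColumnData(category))
        elif category == "t":
            # Int32 length prefix, then that many bytes of UTF-8 text
            length = int.from_bytes(buf.read(4), "big", signed=True)
            value = buf.read(length).decode("utf-8")
            columns.append(ColumnData(category, length, value))
        else:
            raise ValueError(f"unsupported column category {category!r}")
    return TupleData(n_columns, columns)


# Three columns: text '42', NULL, unchanged TOAST
raw = struct.pack(">h", 3) + b"t" + struct.pack(">i", 2) + b"42" + b"n" + b"u"
tup = read_tuple_data(io.BytesIO(raw))
```

Keeping the category on each ColumnData lets downstream code tell a real NULL apart from a TOASTed value that was simply not sent, which matters when emitting update records.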
Relation Metadata
The Relation message decodes table schema information: the relation ID, namespace (schema), table name, replica identity setting, and a list of columns, each with its part-of-key flag (with the default replica identity, the key is the primary key), name, data type OID, and type modifier.
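The field order above can be sketched as a standalone parser. It assumes protocol version 1: in streamed transactions under later protocol versions, an Int32 xid precedes the relation ID, which this sketch does not handle. decode_relation and its helpers are hypothetical; the column tuple order matches the (pkey_flag, col_name, type_oid, modifier) shape shown in the Usage Examples.

```python
import io
import struct
from typing import Dict, List, Tuple


def _read_int(buf: io.BytesIO, size: int) -> int:
    return int.from_bytes(buf.read(size), "big", signed=True)


def _read_cstring(buf: io.BytesIO) -> str:
    # Null-terminated UTF-8 string
    out = bytearray()
    while (ch := buf.read(1)) not in (b"\x00", b""):
        out += ch
    return out.decode("utf-8")


def decode_relation(payload: bytes) -> Dict[str, object]:
    buf = io.BytesIO(payload)
    if buf.read(1) != b"R":
        raise ValueError("not a Relation message")
    relation_id = _read_int(buf, 4)
    namespace = _read_cstring(buf)
    relation_name = _read_cstring(buf)
    replica_identity = chr(_read_int(buf, 1))  # relreplident, e.g. 'd' = default
    n_columns = _read_int(buf, 2)
    columns: List[Tuple[int, str, int, int]] = []
    for _ in range(n_columns):
        flags = _read_int(buf, 1)      # 1 = column is part of the key
        name = _read_cstring(buf)
        type_oid = _read_int(buf, 4)   # e.g. 23 = int4, 25 = text
        type_modifier = _read_int(buf, 4)  # -1 when the type has no modifier
        columns.append((flags, name, type_oid, type_modifier))
    return {
        "relation_id": relation_id,
        "namespace": namespace,
        "relation_name": relation_name,
        "replica_identity": replica_identity,
        "columns": columns,
    }


def cstr(s: str) -> bytes:
    return s.encode("utf-8") + b"\x00"


# Synthetic payload for public.users(id int4 key, email text)
payload = (
    b"R" + struct.pack(">i", 16385) + cstr("public") + cstr("users") + b"d"
    + struct.pack(">h", 2)
    + b"\x01" + cstr("id") + struct.pack(">ii", 23, -1)
    + b"\x00" + cstr("email") + struct.pack(">ii", 25, -1)
)
rel = decode_relation(payload)
```

A consumer typically caches these results by relation_id, because Insert, Update, and Delete messages carry only the ID and rely on the most recent Relation message for column names and types.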
Usage Examples
```python
from mage_integrations.sources.postgresql.decoders import Begin, Insert, Relation

# Decode a Begin message from raw WAL bytes
raw_begin = b'B\x00\x00\x00\x00...'  # placeholder: substitute a real pgoutput payload
begin_msg = Begin(raw_begin)
print(begin_msg.tx_xid)        # Transaction ID
print(begin_msg.commit_tx_ts)  # Commit timestamp as datetime

# Decode a Relation message
raw_relation = b'R\x00\x00...'  # placeholder: substitute a real pgoutput payload
rel_msg = Relation(raw_relation)
print(rel_msg.relation_name)  # Table name
print(rel_msg.columns)        # [(pkey_flag, col_name, type_oid, modifier), ...]

# Decode an Insert message
raw_insert = b'I\x00\x00...'  # placeholder: substitute a real pgoutput payload
ins_msg = Insert(raw_insert)
for col in ins_msg.new_tuple.column_data:
    print(col.col_data)  # Column value as string; NULL and unchanged TOAST columns carry no text
```
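Update messages need one extra check compared with Insert, because the old tuple is optional: after the relation ID comes either K (old key columns) or O (old full row) followed by a tuple, or directly N followed by the new tuple. The sketch below illustrates that branching with hypothetical helpers (decode_update, tuple_bytes); it is not the module's Update class, and like the Relation note it assumes protocol version 1 without a streamed-transaction xid. The tuple_bytes builder also shows one way to produce well-formed synthetic payloads for tests in place of the placeholder byte literals above.

```python
import io
import struct
from typing import List, Optional, Tuple

Column = Tuple[str, Optional[str]]  # (category, text value or None)


def _int(buf: io.BytesIO, size: int) -> int:
    return int.from_bytes(buf.read(size), "big", signed=True)


def _read_tuple(buf: io.BytesIO) -> List[Column]:
    values: List[Column] = []
    for _ in range(_int(buf, 2)):
        kind = buf.read(1).decode("utf-8")
        if kind == "t":
            values.append((kind, buf.read(_int(buf, 4)).decode("utf-8")))
        elif kind in ("n", "u"):
            values.append((kind, None))  # no value bytes follow these markers
        else:
            raise ValueError(f"unsupported column category {kind!r}")
    return values


def decode_update(payload: bytes) -> Tuple[int, Optional[List[Column]], List[Column]]:
    buf = io.BytesIO(payload)
    if buf.read(1) != b"U":
        raise ValueError("not an Update message")
    relation_id = _int(buf, 4)
    marker = buf.read(1)
    old_tuple = None
    if marker in (b"K", b"O"):
        # Old key (K) or full old row (O) precedes the new tuple
        old_tuple = _read_tuple(buf)
        marker = buf.read(1)
    if marker != b"N":
        raise ValueError(f"expected new-tuple marker 'N', got {marker!r}")
    return relation_id, old_tuple, _read_tuple(buf)


def tuple_bytes(*values: Optional[str]) -> bytes:
    """Encode text values (None -> NULL) as pgoutput TupleData."""
    out = struct.pack(">h", len(values))
    for v in values:
        if v is None:
            out += b"n"
        else:
            enc = v.encode("utf-8")
            out += b"t" + struct.pack(">i", len(enc)) + enc
    return out


with_old = (b"U" + struct.pack(">i", 16385) + b"K" + tuple_bytes("1")
            + b"N" + tuple_bytes("1", "a@b.c"))
rid, old, new = decode_update(with_old)
```

Whether the server sends an old tuple at all depends on the table's replica identity and on whether key columns changed, so downstream code should treat old_tuple as optional rather than assume it is present.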