PostgreSQL Logical Replication support.
Provides types and procedures for consuming a logical replication stream via the PostgreSQL streaming replication protocol. The streaming API is plugin-agnostic (raw WAL bytes are delivered to a callback). A built-in decoder for the pgoutput logical decoding plugin is included.
Quick start
  let conn = await connectReplication("postgresql://user:pass@host/db")
  defer: await conn.close()

  let slot = await conn.createReplicationSlot("my_slot", "pgoutput", temporary = true)
  await conn.startReplication("my_slot", slot.consistentPoint,
    options = @{"proto_version": "'1'", "publication_names": "'my_pub'"},
    callback = myCallback)
Types
BeginMessage = object
  finalLsn*: Lsn      ## LSN of the commit record
  commitTime*: int64  ## Commit timestamp (microseconds since PG epoch)
  xid*: int32         ## Transaction ID
- Transaction begin.
CommitMessage = object
  flags*: byte
  commitLsn*: Lsn
  endLsn*: Lsn
  commitTime*: int64
- Transaction commit.
DeleteMessage = object
  relationId*: int32
  oldTuple*: seq[TupleField]
- Row deletion.
InsertMessage = object
  relationId*: int32
  newTuple*: seq[TupleField]
- Row insertion.
LogicalMessage = object
  flags*: byte         ## Bit 0: transactional
  lsn*: Lsn
  prefix*: string
  content*: seq[byte]
- Generic logical decoding message (via pg_logical_emit_message).
Lsn = distinct uint64
- PostgreSQL Log Sequence Number (LSN). Displayed as "X/Y", where X and Y are the hex-encoded upper and lower 32-bit halves.
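The "X/Y" notation can be illustrated with a small, self-contained sketch. The helper names formatLsn and parseLsn are hypothetical and are not part of the documented API; the Lsn type is redeclared locally so the snippet compiles on its own, and the output drops leading zeros in each half, as PostgreSQL does when printing LSNs.

```nim
import std/strutils

type Lsn = distinct uint64  # local mirror of the documented definition

proc hexNoPad(x: uint32): string =
  ## Uppercase hex without leading zeros ("0" for zero).
  result = toHex(x).strip(leading = true, trailing = false, chars = {'0'})
  if result.len == 0:
    result = "0"

proc formatLsn(lsn: Lsn): string =
  ## Hypothetical helper: upper and lower 32-bit halves joined by '/'.
  let v = uint64(lsn)
  hexNoPad(uint32(v shr 32)) & "/" & hexNoPad(uint32(v and 0xFFFF_FFFF'u64))

proc parseLsn(s: string): Lsn =
  ## Hypothetical helper: inverse of formatLsn; raises ValueError on bad input.
  let parts = s.split('/')
  if parts.len != 2:
    raise newException(ValueError, "expected X/Y, got: " & s)
  let hi = uint64(fromHex[uint32](parts[0]))
  let lo = uint64(fromHex[uint32](parts[1]))
  Lsn((hi shl 32) or lo)
```

Because Lsn is a distinct type, comparisons in this sketch go through an explicit uint64 conversion.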
OriginMessage = object
  originLsn*: Lsn
  originName*: string
- Replication origin.
PgOutputMessage = object
  case kind*: PgOutputMessageKind
  of pomkBegin:
    begin*: BeginMessage
  of pomkCommit:
    commit*: CommitMessage
  of pomkOrigin:
    origin*: OriginMessage
  of pomkRelation:
    relation*: RelationInfo
  of pomkType:
    typeMsg*: TypeMessage
  of pomkInsert:
    insert*: InsertMessage
  of pomkUpdate:
    update*: UpdateMessage
  of pomkDelete:
    delete*: DeleteMessage
  of pomkTruncate:
    truncate*: TruncateMessage
  of pomkMessage:
    message*: LogicalMessage
- A decoded pgoutput plugin message.
PgOutputMessageKind = enum
  pomkBegin, pomkCommit, pomkOrigin, pomkRelation, pomkType,
  pomkInsert, pomkUpdate, pomkDelete, pomkTruncate, pomkMessage
- Message types within the pgoutput logical decoding plugin.
PrimaryKeepalive = object
  walEnd*: Lsn           ## Current end of WAL on the server
  sendTime*: int64       ## Server send time (microseconds since PG epoch)
  replyRequested*: bool  ## Whether the server wants an immediate status reply
- Keepalive message from the server.
RelationCache = Table[int32, RelationInfo]
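- Cache of relation metadata received during replication, keyed by relation ID. The server sends a Relation message before the first DML message for a given relation, and again if the relation's definition has changed since it was last sent; clients must cache these messages to interpret later Insert, Update, and Delete messages.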
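As an illustration of the caching requirement, the following sketch stores relation metadata by ID and looks it up when a DML message arrives. RelationInfo is reduced to three fields here so the snippet stands alone, and the procs remember and qualifiedName are hypothetical helpers, not library functions.

```nim
import std/tables

type
  RelationInfo = object          # reduced local mirror of the documented type
    relationId: int32
    namespace: string
    name: string
  RelationCache = Table[int32, RelationInfo]

proc remember(cache: var RelationCache; rel: RelationInfo) =
  ## Store (or overwrite) the metadata carried by a Relation message.
  cache[rel.relationId] = rel

proc qualifiedName(cache: RelationCache; relationId: int32): string =
  ## Resolve the relation ID found in an Insert/Update/Delete message.
  if relationId in cache:
    let rel = cache[relationId]
    result = rel.namespace & "." & rel.name
  else:
    result = "<unknown relation " & $relationId & ">"
```

Overwriting on every Relation message keeps the cache current when the server resends metadata.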
RelationColumn = object
  flags*: byte     ## Bit 0: part of replica identity key
  name*: string
  typeOid*: int32
  typeMod*: int32
- A single column in a relation definition.
RelationInfo = object
  relationId*: int32
  namespace*: string             ## Schema name
  name*: string                  ## Table name
  replicaIdentity*: char         ## 'd' (default), 'n' (nothing), 'f' (full), 'i' (index)
  columns*: seq[RelationColumn]
- Relation (table) metadata sent by pgoutput before DML events.
ReplicationCallback = proc (msg: ReplicationMessage): Future[void] {.gcsafe.}
- Callback invoked for each replication message during streaming.
ReplicationMessage = object
  case kind*: ReplicationMessageKind
  of rmkXLogData:
    xlogData*: XLogData
  of rmkPrimaryKeepalive:
    keepalive*: PrimaryKeepalive
- A single message received during replication streaming.
ReplicationMessageKind = enum rmkXLogData, rmkPrimaryKeepalive
- Replication message types (decoded from CopyData during streaming).
ReplicationMode = enum rmDatabase, rmPhysical
- Replication mode selected at connection time. rmDatabase sends replication=database (logical replication + ability to run SQL on the chosen database). rmPhysical sends replication=true (physical replication; no SQL on user databases).
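The mapping from mode to startup parameter described above can be written out as a tiny sketch. The proc replicationParam is a hypothetical name used only for illustration; how the library builds its startup packet internally is not shown in this reference.

```nim
type ReplicationMode = enum rmDatabase, rmPhysical  # local mirror

proc replicationParam(mode: ReplicationMode): string =
  ## Value of the `replication` startup parameter for each mode.
  case mode
  of rmDatabase: "database"  # logical replication plus SQL on the chosen database
  of rmPhysical: "true"      # physical replication only
```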
ReplicationSlotInfo = object
  slotName*: string
  consistentPoint*: Lsn  ## confirmed_flush_lsn (logical) or restart_lsn (physical)
  snapshotName*: string  ## Snapshot name (only available at CREATE time)
  outputPlugin*: string
- Information about a replication slot.
SystemInfo = object
  systemId*: string
  timeline*: int32
  xLogPos*: Lsn
  dbName*: string
- Result of IDENTIFY_SYSTEM command.
TimelineHistory = object
  filename*: string    ## Timeline history file name (e.g. "00000002.history").
  content*: seq[byte]  ## Raw history file content.
- Result of TIMELINE_HISTORY command.
TruncateMessage = object
  options*: byte             ## Bit 0: CASCADE, bit 1: RESTART IDENTITY
  relationIds*: seq[int32]
- Table truncation.
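The options bit field can be decoded with plain bit masks. The sketch below assumes only the bit assignments stated in the field comment (bit 0 for CASCADE, bit 1 for RESTART IDENTITY); the constant and proc names are hypothetical.

```nim
const
  truncCascade = 0x01'u8          # bit 0
  truncRestartIdentity = 0x02'u8  # bit 1

proc describeTruncate(options: byte): string =
  ## Render the option bits as the equivalent TRUNCATE clauses.
  result = "TRUNCATE"
  if (options and truncRestartIdentity) != 0:
    result.add " RESTART IDENTITY"
  if (options and truncCascade) != 0:
    result.add " CASCADE"
```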
TupleDataKind = enum
  tdkNull = 110,      ## NULL value (ASCII 'n')
  tdkText = 116,      ## Text-formatted value (ASCII 't')
  tdkBinary = 98,     ## Binary-formatted value (ASCII 'b'; protocol_version >= 2)
  tdkUnchanged = 117  ## TOAST value unchanged (ASCII 'u')
- Kind of a single field value in a pgoutput tuple.
TupleField = object
  kind*: TupleDataKind
  data*: seq[byte]      ## Empty for null/unchanged
- A single field value in a pgoutput tuple.
TypeMessage = object
  typeId*: int32
  namespace*: string
  name*: string
- Custom type definition.
UpdateMessage = object
  relationId*: int32
  hasOldTuple*: bool           ## True if old key/full row is included
  oldTuple*: seq[TupleField]
  newTuple*: seq[TupleField]
- Row update.
XLogData = object
  startLsn*: Lsn    ## Start LSN of the WAL data in this message
  walEnd*: Lsn      ## Current end of WAL on the server at the time this message was sent.
                    ## This is *not* the end of the WAL data contained in this message; it
                    ## reflects how far WAL has advanced on the server and is informational.
                    ## To acknowledge what was actually received, use ``receivedEndLsn``
                    ## (``startLsn + data.len``), never ``walEnd``; ``walEnd`` may be ahead
                    ## of what this message contains.
  sendTime*: int64  ## Server send time (microseconds since PG epoch)
  data*: seq[byte]  ## Raw WAL data (plugin-dependent format)
- WAL data payload from the server.
Consts
InvalidLsn = 0'u
- Sentinel value representing an invalid or unset LSN.
pgEpochOffset = 946684800'i64
- Seconds between Unix epoch (1970-01-01) and PostgreSQL epoch (2000-01-01).
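To show how the offset is applied, here is a minimal sketch that converts a PostgreSQL timestamp (microseconds since 2000-01-01) into a std/times Time value. The proc name pgMicrosToTime is hypothetical, and the constant is redeclared locally so the snippet compiles by itself. Floor division keeps timestamps before the PostgreSQL epoch correct.

```nim
import std/[math, times]

const pgEpochOffset = 946684800'i64  # local copy of the documented constant

proc pgMicrosToTime(us: int64): Time =
  ## Hypothetical helper: PG-epoch microseconds -> Unix-epoch Time.
  let secs = floorDiv(us, 1_000_000'i64) + pgEpochOffset
  let nanos = floorMod(us, 1_000_000'i64) * 1000  # always in 0 ..< 1e9
  initTime(secs, NanosecondRange(nanos))
```

The same conversion applies to the commitTime and sendTime fields documented in the Types section.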
Procs
proc connectReplication(config: ConnConfig; mode: ReplicationMode = rmDatabase): Future[PgConnection] {.
    raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Connect to PostgreSQL with the replication startup parameter set. rmDatabase enables logical replication commands (IDENTIFY_SYSTEM, CREATE_REPLICATION_SLOT, START_REPLICATION ... LOGICAL, etc.) against the chosen database. rmPhysical opens a physical replication connection (replication=true); only replication commands work — no SQL on user databases is permitted.
proc connectReplication(dsn: string; mode: ReplicationMode = rmDatabase): Future[PgConnection] {.
    raises: [PgError, Exception, ValueError, CatchableError],
    tags: [ReadIOEffect, RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- DSN-string variant of connectReplication. See the ConnConfig overload for the meaning of mode.
proc createReplicationSlot(conn: PgConnection; slotName: string;
                           plugin: string = "pgoutput"; temporary: bool = false;
                           timeout: async_backend.Duration = ZeroDuration): Future[ReplicationSlotInfo] {.
    stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgConnectionError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
-
Create a logical replication slot. Returns slot info including the consistent point LSN.
On timeout, the connection is marked csClosed (protocol out of sync).
proc currentPgTimestamp(): int64 {.raises: [], tags: [TimeEffect], forbids: [].}
- Current time as microseconds since the PostgreSQL epoch (2000-01-01 UTC).
proc decodePgOutput(msg: XLogData): PgOutputMessage {.raises: [ProtocolError], tags: [], forbids: [].}
- Convenience: decode the pgoutput message from an XLogData's data field.
proc dropReplicationSlot(conn: PgConnection; slotName: string; wait: bool = false;
                         timeout: async_backend.Duration = ZeroDuration): Future[void] {.
    stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
-
Drop a replication slot.
On timeout, the connection is marked csClosed (protocol out of sync).
proc identifySystem(conn: PgConnection;
                    timeout: async_backend.Duration = ZeroDuration): Future[SystemInfo] {.
    stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgConnectionError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
-
Execute IDENTIFY_SYSTEM and return system identification info.
On timeout, the connection is marked csClosed (protocol out of sync).
proc parsePgOutputMessage(data: openArray[byte]): PgOutputMessage {.
    raises: [ProtocolError], tags: [], forbids: [].}
- Decode a pgoutput logical decoding message from raw WAL bytes.
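To give a feel for what decoding involves, the sketch below parses one message type, Begin, using the wire layout from the PostgreSQL logical replication message formats: a 'B' tag byte, an 8-byte final LSN, an 8-byte commit timestamp, and a 4-byte transaction ID, all big-endian. It is a standalone illustration, not the library's implementation; BeginFields, readBE, and parseBegin are hypothetical names, and errors are reported with ValueError rather than the library's ProtocolError.

```nim
type BeginFields = object  # simplified stand-in for BeginMessage
  finalLsn: uint64
  commitTime: int64
  xid: int32

proc readBE(buf: openArray[byte]; pos, n: int): uint64 =
  ## Read an n-byte big-endian unsigned integer starting at pos.
  for i in 0 ..< n:
    result = (result shl 8) or uint64(buf[pos + i])

proc parseBegin(buf: openArray[byte]): BeginFields =
  ## Tag (1 byte) + final LSN (8) + commit time (8) + xid (4) = 21 bytes.
  if buf.len < 21 or char(buf[0]) != 'B':
    raise newException(ValueError, "not a pgoutput Begin message")
  BeginFields(
    finalLsn: readBE(buf, 1, 8),
    commitTime: cast[int64](readBE(buf, 9, 8)),
    xid: cast[int32](uint32(readBE(buf, 17, 4))))
```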
proc parseReplicationMessage(copyData: seq[byte]): ReplicationMessage {.
    raises: [ProtocolError], tags: [], forbids: [].}
- Parse a CopyData payload into a ReplicationMessage.
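The two payload layouts involved are defined by the PostgreSQL streaming replication protocol: XLogData is 'w' followed by three 8-byte big-endian integers (start LSN, server WAL end, send time) and the WAL bytes; a primary keepalive is 'k' followed by WAL end, send time, and a one-byte reply flag. The sketch below is a simplified standalone parser written under those assumptions. Frame and parseFrame are hypothetical names, LSNs are kept as plain uint64, and ValueError stands in for the library's ProtocolError.

```nim
type
  FrameKind = enum fkXLogData, fkKeepalive
  Frame = object
    walEnd: uint64     # present in both message types
    sendTime: int64    # microseconds since the PostgreSQL epoch
    case kind: FrameKind
    of fkXLogData:
      startLsn: uint64
      data: seq[byte]
    of fkKeepalive:
      replyRequested: bool

proc readBE64(buf: openArray[byte]; pos: int): uint64 =
  ## Read an 8-byte big-endian unsigned integer starting at pos.
  for i in 0 ..< 8:
    result = (result shl 8) or uint64(buf[pos + i])

proc parseFrame(buf: seq[byte]): Frame =
  if buf.len == 0:
    raise newException(ValueError, "empty CopyData payload")
  case char(buf[0])
  of 'w':
    if buf.len < 25:  # tag + 3 * 8 header bytes
      raise newException(ValueError, "truncated XLogData header")
    result = Frame(kind: fkXLogData,
                   startLsn: readBE64(buf, 1),
                   walEnd: readBE64(buf, 9),
                   sendTime: cast[int64](readBE64(buf, 17)),
                   data: buf[25 .. ^1])
  of 'k':
    if buf.len < 18:  # tag + 2 * 8 bytes + reply flag
      raise newException(ValueError, "truncated keepalive")
    result = Frame(kind: fkKeepalive,
                   walEnd: readBE64(buf, 1),
                   sendTime: cast[int64](readBE64(buf, 9)),
                   replyRequested: buf[17] != 0)
  else:
    raise newException(ValueError, "unknown replication message type")
```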
proc readReplicationSlot(conn: PgConnection; slotName: string;
                         timeout: async_backend.Duration = ZeroDuration): Future[ReplicationSlotInfo] {.
    stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgConnectionError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
-
Read information about an existing replication slot.
On timeout, the connection is marked csClosed (protocol out of sync).
proc receivedEndLsn(msg: XLogData): Lsn {.raises: [], tags: [], forbids: [].}
- End LSN of the WAL data actually contained in this message (startLsn + len(data)). Use this when acknowledging received data via sendStandbyStatus; do not use walEnd, which is the server's current WAL position and may point past data this message does not carry.
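The arithmetic is simple enough to show directly. The sketch below redeclares minimal Lsn and XLogData types so it can run on its own, and names its proc receivedEnd to make clear it is an illustration rather than the library's receivedEndLsn.

```nim
type
  Lsn = distinct uint64
  XLogData = object     # local mirror with the documented fields
    startLsn: Lsn
    walEnd: Lsn
    sendTime: int64
    data: seq[byte]

proc receivedEnd(msg: XLogData): Lsn =
  ## startLsn + len(data): the first byte position NOT covered by this message.
  Lsn(uint64(msg.startLsn) + uint64(msg.data.len))
```

For a message starting at position 1000 with 24 bytes of data, the value to acknowledge is 1024, even if walEnd already reports 5000.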
proc sendCopyData(conn: PgConnection; data: openArray[byte]): Future[void] {.
    raises: [PgConnectionError, Exception, SslError, ValueError],
    tags: [RootEffect], forbids: [].}
-
Send a raw CopyData frame to the server during a CopyBoth stream (i.e. while the connection is in csReplicating). Useful for protocols layered on top of CopyBoth — for example, physical replication acknowledgements or custom replication plugins that exchange messages the library does not know about. For Standby Status Updates, prefer sendStandbyStatus which builds the payload for you.
data is encoded into a CopyData frame synchronously before the first async suspension, so the caller's buffer does not need to outlive the returned Future.
proc sendStandbyStatus(conn: PgConnection; receiveLsn: Lsn; flushLsn: Lsn = InvalidLsn;
                       applyLsn: Lsn = InvalidLsn; replyRequested: bool = false): Future[void] {.
    stackTrace: false, raises: [Exception, PgConnectionError, SslError, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}
- Send a Standby Status Update to the server during replication streaming. Must be called while the connection is in csReplicating state.
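For readers curious about what travels on the wire, the PostgreSQL protocol defines the Standby Status Update payload as 'r' followed by four 8-byte big-endian integers (written, flushed, and applied LSNs, then the client clock in microseconds since 2000-01-01) and a one-byte reply-request flag, 34 bytes in total. The sketch below builds that payload; buildStandbyStatus and putBE64 are hypothetical names, and whether the library constructs its payload this way internally is an assumption.

```nim
proc putBE64(buf: var seq[byte]; v: uint64) =
  ## Append v as 8 big-endian bytes.
  for shift in countdown(56, 0, 8):
    buf.add byte((v shr shift) and 0xFF'u64)

proc buildStandbyStatus(receiveLsn, flushLsn, applyLsn: uint64;
                        clientTime: int64; replyRequested: bool): seq[byte] =
  ## Hypothetical helper: payload of a Standby Status Update (without CopyData framing).
  result = newSeqOfCap[byte](34)
  result.add byte('r')
  putBE64(result, receiveLsn)
  putBE64(result, flushLsn)
  putBE64(result, applyLsn)
  putBE64(result, cast[uint64](clientTime))
  result.add byte(ord(replyRequested))
```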
proc startPhysicalReplication(conn: PgConnection; startLsn: Lsn; slotName: string = "";
                              timeline: int32 = 0; autoKeepaliveReply: bool = true;
                              callback: ReplicationCallback): Future[void] {.
    stackTrace: false,
    raises: [Exception, PgConnectionError, SslError, ValueError, ProtocolError,
             PgQueryError, AsyncTimeoutError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
-
Begin physical replication streaming.
slotName is optional; pass "" for a slot-less stream. timeline is appended as TIMELINE n when non-zero — useful when the standby is following a specific timeline and must abort if the primary advanced past it.
The callback contract matches startReplication: each XLogData or PrimaryKeepalive is delivered as a ReplicationMessage. The raw WAL bytes inside XLogData.data are the physical WAL stream; no pgoutput decoding applies.
autoKeepaliveReply behaves identically to startReplication: when true, PrimaryKeepalive(replyRequested=true) is acknowledged with the highest observed receivedEndLsn before the callback is invoked.
On a timeline switch the server may send a final result set describing the next timeline (RowDescription + DataRow + CommandComplete) between CopyDone and ReadyForQuery. This proc drains and discards those messages; callers that need the next-timeline information should re-issue IDENTIFY_SYSTEM after this proc returns.
proc startReplication(conn: PgConnection; slotName: string; startLsn: Lsn = InvalidLsn;
                      options: seq[(string, string)] = @[];
                      autoKeepaliveReply: bool = true;
                      callback: ReplicationCallback): Future[void] {.
    stackTrace: false,
    raises: [Exception, PgConnectionError, ValueError, SslError, ProtocolError,
             PgQueryError, AsyncTimeoutError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
-
Begin logical replication streaming from the given slot.
The callback is invoked for each XLogData or PrimaryKeepalive message received. The callback is awaited, providing natural TCP backpressure. Within the callback, use sendStandbyStatus to acknowledge received data.
When autoKeepaliveReply is true (the default), the library responds automatically to PrimaryKeepalive messages with replyRequested = true before invoking the callback, sending the highest receivedEndLsn (startLsn + data.len) observed so far across received XLogData messages — or the caller-supplied startLsn if no XLogData has arrived yet — as receive/flush/apply LSN. This prevents silent disconnects from wal_sender_timeout when the callback is slow. The keepalive is still delivered to the callback. Set autoKeepaliveReply = false to manage replies manually — for example, when the flush/apply LSN must reflect callback-side progress (durable writes) rather than what has merely been received from the wire.
If no XLogData has arrived and startLsn was left at its default InvalidLsn (0/0), the auto-reply will carry 0/0 for receive/flush/apply. PostgreSQL treats this as "position unknown" and will not move confirmed_flush_lsn backwards, so the reply is still useful for resetting wal_sender_timeout without risking data loss.
If the auto-reply itself fails (for example, the connection is lost between receiving the keepalive and writing the Standby Status Update), the exception is propagated out of startReplication and the callback is not invoked for that keepalive.
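The LSN selection rule for automatic replies (highest received end so far, falling back to the caller-supplied startLsn) can be modelled with a small standalone tracker. This is a sketch of the rule as documented, not the library's code; AckTracker and its procs are hypothetical, and LSNs are plain uint64 values for brevity.

```nim
type AckTracker = object
  highest: uint64  # highest receivedEndLsn seen, or the initial startLsn

proc initAckTracker(startLsn: uint64 = 0): AckTracker =
  ## startLsn = 0 corresponds to InvalidLsn (0/0).
  AckTracker(highest: startLsn)

proc observe(t: var AckTracker; startLsn: uint64; dataLen: int) =
  ## Record one XLogData message; the position never moves backwards.
  t.highest = max(t.highest, startLsn + uint64(dataLen))

proc replyLsn(t: AckTracker): uint64 =
  ## Value that would be reported as receive/flush/apply LSN.
  t.highest
```

A caller that disables autoKeepaliveReply could keep a similar tracker per durability stage and report each position separately.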
The proc returns when the server sends CopyDone or the connection closes. To stop replication from the client side, call stopReplication from within the callback (or from a concurrent task).
proc stopReplication(conn: PgConnection): Future[void] {.
    stackTrace: false, raises: [Exception, PgConnectionError, SslError, ValueError],
    tags: [RootEffect], forbids: [].}
- Send CopyDone to gracefully terminate the replication stream. The server will respond with CopyDone and ReadyForQuery, which will be handled by the startReplication recv loop.
proc timelineHistory(conn: PgConnection; timeline: int32;
                     timeout: async_backend.Duration = ZeroDuration): Future[TimelineHistory] {.
    stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgConnectionError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
-
Execute TIMELINE_HISTORY <tli> and return the history file metadata plus its raw contents. Required when a physical standby needs to follow a timeline switch on the primary.
On timeout, the connection is marked csClosed (protocol out of sync).
proc toString(field: TupleField): string {.raises: [], tags: [], forbids: [].}
- Convert a TupleField's data to a string by copying the bytes.
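Since null and unchanged-TOAST fields carry no data, callers often want to distinguish them from an empty text value. The sketch below shows one way to do that with Option[string]. It redeclares the tuple types locally (with the enum values listed in ascending order, as Nim requires for explicit values); textValue and bytesToString are hypothetical helpers and not part of the documented API.

```nim
import std/options

type
  TupleDataKind = enum  # local mirror; values are the ASCII tags b, n, t, u
    tdkBinary = 98, tdkNull = 110, tdkText = 116, tdkUnchanged = 117
  TupleField = object
    kind: TupleDataKind
    data: seq[byte]

proc bytesToString(data: openArray[byte]): string =
  ## Copy raw bytes into a string without interpreting them.
  result = newString(data.len)
  for i, b in data:
    result[i] = char(b)

proc textValue(field: TupleField): Option[string] =
  ## some(text) for text-formatted fields, none for null, unchanged, or binary.
  if field.kind == tdkText:
    some(bytesToString(field.data))
  else:
    none(string)
```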
Templates
template makeReplicationCallback(body: untyped): ReplicationCallback
- Create a ReplicationCallback that works with both asyncdispatch and chronos. Inside body, the current message is available as msg: ReplicationMessage.