PostgreSQL Logical Replication support.
Provides types and procedures for consuming a logical replication stream via the PostgreSQL streaming replication protocol. The streaming API is plugin-agnostic (raw WAL bytes are delivered to a callback). A built-in decoder for the pgoutput logical decoding plugin is included.
Quick start
let conn = await connectReplication("postgresql://user:pass@host/db")
defer: await conn.close()
let slot = await conn.createReplicationSlot("my_slot", "pgoutput", temporary = true)
await conn.startReplication("my_slot", slot.consistentPoint,
  options = @{"proto_version": "'1'", "publication_names": "'my_pub'"},
  callback = myCallback)
Types
BeginMessage = object
  finalLsn*: Lsn       ## LSN of the commit record
  commitTime*: int64   ## Commit timestamp (microseconds since PG epoch)
  xid*: int32          ## Transaction ID
- Transaction begin.
CommitMessage = object
  flags*: byte
  commitLsn*: Lsn
  endLsn*: Lsn
  commitTime*: int64
- Transaction commit.
DeleteMessage = object
  relationId*: int32
  keyKind*: char   ## 'K' if oldTuple holds only the replica identity key,
                   ## 'O' if it holds the full old row (REPLICA IDENTITY FULL).
  oldTuple*: seq[TupleField]
- Row deletion.
InsertMessage = object
  relationId*: int32
  newTuple*: seq[TupleField]
- Row insertion.
LogicalMessage = object
  flags*: byte   ## Bit 0: transactional
  lsn*: Lsn
  prefix*: string
  content*: seq[byte]
- Generic logical decoding message (via pg_logical_emit_message).
Lsn = distinct uint64
- PostgreSQL Log Sequence Number (LSN). Displayed as "X/Y", where X and Y are the hex-encoded upper and lower 32-bit halves.
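The "X/Y" rendering can be illustrated with a minimal, self-contained sketch. The `Lsn` type is redefined locally so the snippet compiles on its own, and `lsnToText` / `textToLsn` are hypothetical helper names, not procs documented by this module. The sketch assumes the unpadded uppercase hex form PostgreSQL prints (for example `0/16B3748`).

```nim
import std/[strformat, strutils]

type Lsn = distinct uint64  # local mirror of the documented type (assumption: same layout)

proc lsnToText(lsn: Lsn): string =
  ## Hypothetical helper: render as "X/Y" (upper/lower 32 bits, hex, no padding).
  let raw = uint64(lsn)
  let hi = raw shr 32
  let lo = raw and 0xFFFF_FFFF'u64
  &"{hi:X}/{lo:X}"

proc textToLsn(s: string): Lsn =
  ## Hypothetical helper: parse "X/Y" back into an LSN; raises ValueError on bad input.
  let parts = s.split('/')
  if parts.len != 2:
    raise newException(ValueError, "expected X/Y, got: " & s)
  let hi = fromHex[uint64](parts[0])
  let lo = fromHex[uint64](parts[1])
  if hi > 0xFFFF_FFFF'u64 or lo > 0xFFFF_FFFF'u64:
    raise newException(ValueError, "LSN half exceeds 32 bits: " & s)
  Lsn((hi shl 32) or lo)

echo lsnToText(textToLsn("16/B374D848"))
```

The two halves are formatted separately because the textual form is not a single 64-bit hex number: the slash marks the boundary between the upper and lower 32 bits.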
OriginMessage = object
  originLsn*: Lsn
  originName*: string
- Replication origin.
PgOutputMessage = object
  case kind*: PgOutputMessageKind
  of pomkBegin:
    begin*: BeginMessage
  of pomkCommit:
    commit*: CommitMessage
  of pomkOrigin:
    origin*: OriginMessage
  of pomkRelation:
    relation*: RelationInfo
  of pomkType:
    typeMsg*: TypeMessage
  of pomkInsert:
    insert*: InsertMessage
  of pomkUpdate:
    update*: UpdateMessage
  of pomkDelete:
    delete*: DeleteMessage
  of pomkTruncate:
    truncate*: TruncateMessage
  of pomkMessage:
    message*: LogicalMessage
- A decoded pgoutput plugin message.
PgOutputMessageKind = enum pomkBegin, pomkCommit, pomkOrigin, pomkRelation, pomkType, pomkInsert, pomkUpdate, pomkDelete, pomkTruncate, pomkMessage
- Message types within the pgoutput logical decoding plugin.
PrimaryKeepalive = object
  walEnd*: Lsn            ## Current end of WAL on the server
  sendTime*: int64        ## Server send time (microseconds since PG epoch)
  replyRequested*: bool   ## Whether the server wants an immediate status reply
- Keepalive message from the server.
RelationCache = Table[int32, RelationInfo]
- Cache of relation metadata received during replication. The server sends a Relation message before the first DML message for each relation, and again if the relation's definition changes; clients must cache them.
RelationColumn = object
  flags*: byte   ## Bit 0: part of replica identity key
  name*: string
  typeOid*: int32
  typeMod*: int32
- A single column in a relation definition.
RelationInfo = object
  relationId*: int32
  namespace*: string       ## Schema name
  name*: string            ## Table name
  replicaIdentity*: char   ## 'd' (default), 'n' (nothing), 'f' (full), 'i' (index)
  columns*: seq[RelationColumn]
- Relation (table) metadata sent by pgoutput before DML events.
ReplicationCallback = proc (msg: ReplicationMessage): Future[void] {....gcsafe.}
- Callback invoked for each replication message during streaming.
ReplicationMessage = object
  case kind*: ReplicationMessageKind
  of rmkXLogData:
    xlogData*: XLogData
  of rmkPrimaryKeepalive:
    keepalive*: PrimaryKeepalive
- A single message received during replication streaming.
ReplicationMessageKind = enum rmkXLogData, rmkPrimaryKeepalive
- Replication message types (decoded from CopyData during streaming)
ReplicationMode = enum rmDatabase, rmPhysical
- Replication mode selected at connection time. rmDatabase sends replication=database (logical replication + ability to run SQL on the chosen database). rmPhysical sends replication=true (physical replication; no SQL on user databases).
ReplicationSlotInfo = object
  slotName*: string
  consistentPoint*: Lsn   ## confirmed_flush_lsn (logical) or restart_lsn (physical)
  snapshotName*: string   ## Snapshot name (only available at CREATE time)
  outputPlugin*: string
- Information about a replication slot.
SystemInfo = object
  systemId*: string
  timeline*: int32
  xLogPos*: Lsn
  dbName*: string
- Result of IDENTIFY_SYSTEM command.
TimelineHistory = object
  filename*: string     ## Timeline history file name (e.g. "00000002.history").
  content*: seq[byte]   ## Raw history file content.
- Result of TIMELINE_HISTORY command.
TruncateMessage = object
  options*: byte   ## Bit 0: CASCADE, bit 1: RESTART IDENTITY
  relationIds*: seq[int32]
- Table truncation.
TupleDataKind = enum
  tdkNull = 110,       ## NULL value
  tdkText = 116,       ## Text-formatted value
  tdkBinary = 98,      ## Binary-formatted value (protocol_version >= 2)
  tdkUnchanged = 117   ## TOAST value unchanged
- Kind of a single field value in a pgoutput tuple.
TupleField = object
  kind*: TupleDataKind
  data*: seq[byte]   ## Empty for null/unchanged
- A single field value in a pgoutput tuple.
TypeMessage = object
  typeId*: int32
  namespace*: string
  name*: string
- Custom type definition.
UpdateMessage = object
  relationId*: int32
  hasOldTuple*: bool   ## True if old key/full row is included
  keyKind*: char       ## When hasOldTuple is true: 'K' if oldTuple holds only the replica
                       ## identity key, 'O' if it holds the full old row (REPLICA IDENTITY FULL).
                       ## '\0' when no old tuple is present.
  oldTuple*: seq[TupleField]
  newTuple*: seq[TupleField]
- Row update.
XLogData = object
  startLsn*: Lsn      ## Start LSN of the WAL data in this message
  walEnd*: Lsn        ## Current end of WAL on the server at the time this message was sent.
                      ## This is *not* the end of the WAL data contained in this message; it
                      ## reflects how far WAL has advanced on the server and is informational.
                      ## To acknowledge what was actually received, use ``receivedEndLsn``
                      ## (``startLsn + data.len``), never ``walEnd``: ``walEnd`` may be ahead
                      ## of what this message contains.
  sendTime*: int64    ## Server send time (microseconds since PG epoch)
  data*: seq[byte]    ## Raw WAL data (plugin-dependent format)
- WAL data payload from the server.
Consts
InvalidLsn = 0'u
- Sentinel value representing an invalid or unset LSN.
pgEpochOffset = 946684800'i64
- Seconds between Unix epoch (1970-01-01) and PostgreSQL epoch (2000-01-01).
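As an illustration of how this offset is typically applied, the sketch below converts a PostgreSQL-epoch microsecond timestamp (as found in `sendTime` or `commitTime`) to Unix time. The constant is redefined locally so the snippet stands alone, and `pgMicrosToUnixMicros` / `pgMicrosToTime` are hypothetical helper names, not part of the documented API.

```nim
import std/[math, times]

const pgEpochOffsetSecs = 946_684_800'i64  # same value as the documented pgEpochOffset

proc pgMicrosToUnixMicros(pgMicros: int64): int64 =
  ## Hypothetical helper: shift a PostgreSQL-epoch timestamp to the Unix epoch.
  pgMicros + pgEpochOffsetSecs * 1_000_000'i64

proc pgMicrosToTime(pgMicros: int64): Time =
  ## Hypothetical helper: convert to std/times.Time, keeping microsecond precision.
  let unixMicros = pgMicrosToUnixMicros(pgMicros)
  # floorDiv/floorMod keep the sub-second part non-negative for pre-1970 values.
  let secs = floorDiv(unixMicros, 1_000_000'i64)
  let micros = floorMod(unixMicros, 1_000_000'i64)
  initTime(secs, NanosecondRange(micros * 1_000'i64))

echo pgMicrosToTime(0).utc  # the PostgreSQL epoch itself
```

The offset is in seconds while the wire timestamps are in microseconds, so the multiplication by one million happens before the addition.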
Procs
proc confirmedFlushLsn(conn: PgConnection): Lsn {.inline, ...raises: [], tags: [], forbids: [].}
-
Highest LSN the application has confirmed durably flushed for the current replication stream via confirmFlushed. Initialised to the stream's startLsn by startReplication / startPhysicalReplication; this is the flush/apply position carried by automatic keepalive replies.
Only meaningful during an active stream: outside csReplicating (before a stream starts or after it ends) this returns InvalidLsn (0/0) rather than a stale value left over from a previous stream.
proc confirmFlushed(conn: PgConnection; lsn: Lsn): bool {. ...raises: [PgConnectionError], tags: [], forbids: [].}
-
Record that received WAL up to and including lsn has been durably persisted by the application, so automatic keepalive replies (see autoKeepaliveReply on startReplication) report it as the flush/apply position and let the server advance confirmed_flush_lsn and recycle WAL.
Call this from the replication callback after the received changes are durable. Until you do, the automatic reply acknowledges only receipt (the receive LSN), never flush — so a crash re-streams the unprocessed WAL, giving at-least-once delivery. Calls that would move the confirmed position backwards are ignored, so duplicate or out-of-order confirmations are safe.
lsn is clamped to the WAL actually received: you cannot have durably persisted WAL you have not yet received, so an lsn beyond the highest XLogData.receivedEndLsn observed confirms only up to that received position. Passing walEnd, which runs ahead of the data a message carries, therefore confirms received WAL rather than over-advancing. Because of this clamp the confirmed position can never exceed received WAL, so automatic replies never emit a flush ahead of receive, and the call never raises on an out-of-range LSN (an uncaught raise from the callback would strand the stream).
Must be called while the connection is csReplicating (i.e. from the replication callback); calling it outside an active stream raises PgConnectionError.
Returns true when the confirmed-flush position actually moved forward (after clamping and the monotonic guard). false means the request was ignored because it was behind the current confirmed position.
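The clamp and the monotonic guard described above can be modelled in a few lines. This is only a sketch of the documented behaviour, not the library's implementation: `FlushTracker` and `confirm` are hypothetical names, and plain `uint64` stands in for `Lsn`.

```nim
type
  FlushTracker = object   # hypothetical stand-in for the connection's internal state
    received: uint64      # highest receivedEndLsn observed so far
    confirmed: uint64     # last confirmed-flush position

proc confirm(t: var FlushTracker; lsn: uint64): bool =
  ## Clamp to received WAL first, then refuse to move backwards.
  let clamped = min(lsn, t.received)
  if clamped <= t.confirmed:
    return false          # not ahead of the current position: ignored
  t.confirmed = clamped
  true

var t = FlushTracker(received: 100'u64, confirmed: 10'u64)
echo t.confirm(50'u64)    # moves forward
echo t.confirm(40'u64)    # behind the confirmed position, ignored
echo t.confirm(500'u64)   # beyond received WAL, clamped to 100
```

Clamping before the monotonic check is what makes an over-large argument (such as `walEnd`) safe: it degrades to "confirm everything received" instead of raising or over-advancing.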
proc connectReplication(config: ConnConfig; mode: ReplicationMode = rmDatabase): Future[ PgConnection] {....raises: [Exception, ValueError, PgConnectionError, CatchableError], tags: [RootEffect, WriteIOEffect, TimeEffect], forbids: [].}
- Connect to PostgreSQL with the replication startup parameter set. rmDatabase enables logical replication commands (IDENTIFY_SYSTEM, CREATE_REPLICATION_SLOT, START_REPLICATION ... LOGICAL, etc.) against the chosen database. rmPhysical opens a physical replication connection (replication=true); only replication commands work — no SQL on user databases is permitted.
proc connectReplication(dsn: string; mode: ReplicationMode = rmDatabase): Future[ PgConnection] {....raises: [PgError, Exception, ValueError, PgConnectionError, CatchableError], tags: [ReadIOEffect, RootEffect, WriteIOEffect, TimeEffect], forbids: [].}
- DSN-string variant of connectReplication. See the ConnConfig overload for the meaning of mode.
proc createReplicationSlot(conn: PgConnection; slotName: string; plugin: string = "pgoutput"; temporary: bool = false; timeout: async_backend.Duration = ZeroDuration): Future[ ReplicationSlotInfo] {....stackTrace: false, raises: [Exception, ValueError, CatchableError, PgConnectionError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Create a logical replication slot. Returns slot info including the consistent point LSN.
On timeout, the connection is marked csClosed (protocol out of sync).
proc currentPgTimestamp(): int64 {....raises: [], tags: [TimeEffect], forbids: [].}
- Current time as microseconds since the PostgreSQL epoch (2000-01-01 UTC).
proc decodePgOutput(msg: XLogData): PgOutputMessage {....raises: [PgProtocolError], tags: [], forbids: [].}
- Convenience: decode the pgoutput message from an XLogData's data field.
proc dropReplicationSlot(conn: PgConnection; slotName: string; wait: bool = false; timeout: async_backend.Duration = ZeroDuration): Future[ void] {....stackTrace: false, raises: [Exception, ValueError, CatchableError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Drop a replication slot.
On timeout, the connection is marked csClosed (protocol out of sync).
proc identifySystem(conn: PgConnection; timeout: async_backend.Duration = ZeroDuration): Future[ SystemInfo] {....stackTrace: false, raises: [Exception, ValueError, CatchableError, PgConnectionError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Execute IDENTIFY_SYSTEM and return system identification info.
On timeout, the connection is marked csClosed (protocol out of sync).
proc parsePgOutputMessage(data: openArray[byte]): PgOutputMessage {. ...raises: [PgProtocolError], tags: [], forbids: [].}
- Decode a pgoutput logical decoding message from raw WAL bytes.
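To give a feel for what decoding involves, here is a minimal sketch that parses only the Begin message, whose wire layout in the PostgreSQL logical replication protocol is a `'B'` tag followed by a big-endian Int64 final LSN, Int64 commit timestamp, and Int32 transaction ID. `BeginFields`, `readBE`, and `parseBegin` are hypothetical local names, and `ValueError` stands in for the library's `PgProtocolError`.

```nim
type BeginFields = object     # hypothetical local mirror of BeginMessage
  finalLsn: uint64
  commitTime: int64
  xid: int32

proc readBE(data: openArray[byte]; pos: var int; width: int): uint64 =
  ## Read `width` big-endian bytes starting at `pos` and advance `pos`.
  if pos + width > data.len:
    raise newException(ValueError, "truncated message")
  for i in 0 ..< width:
    result = (result shl 8) or uint64(data[pos + i])
  inc pos, width

proc parseBegin(data: openArray[byte]): BeginFields =
  ## Decode a Begin message; raises ValueError on a wrong tag or short input.
  if data.len == 0 or data[0] != byte('B'):
    raise newException(ValueError, "not a Begin message")
  var pos = 1
  result.finalLsn = readBE(data, pos, 8)
  result.commitTime = cast[int64](readBE(data, pos, 8))
  result.xid = cast[int32](uint32(readBE(data, pos, 4)))

let raw = @[byte('B'),
            0, 0, 0, 0, 0, 0, 1, 0,     # final LSN = 0x100
            0, 0, 0, 0, 0, 0, 0, 5,     # commit timestamp = 5
            0, 0, 2, 0xE7]              # xid = 0x2E7
let m = parseBegin(raw)
echo m.finalLsn, " ", m.commitTime, " ", m.xid
```

The other message kinds follow the same pattern (a one-byte tag, then fixed-width big-endian fields and length-prefixed data), which is why a single bounds-checked reader is the core of the sketch.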
proc parseReplicationMessage(copyData: openArray[byte]): ReplicationMessage {. ...raises: [PgProtocolError], tags: [], forbids: [].}
- Parse a CopyData payload into a ReplicationMessage.
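As a rough illustration of the two payload shapes involved, the sketch below splits a CopyData payload by its tag byte, following the PostgreSQL streaming replication protocol: XLogData is `'w'` plus three Int64 fields (start LSN, WAL end, send time) followed by the WAL bytes, and a primary keepalive is `'k'` plus two Int64 fields (WAL end, send time) and a one-byte reply flag. All type and proc names here are hypothetical, and `ValueError` stands in for `PgProtocolError`.

```nim
type
  FrameKind = enum fkXLogData, fkKeepalive   # hypothetical local names
  Frame = object
    kind: FrameKind
    startLsn, walEnd: uint64
    sendTime: int64
    replyRequested: bool
    payloadOffset: int        # where the WAL bytes begin (XLogData only)

proc be64(data: openArray[byte]; pos: int): uint64 =
  ## Read 8 big-endian bytes at `pos`; the caller has already checked the length.
  for i in 0 ..< 8:
    result = (result shl 8) or uint64(data[pos + i])

proc splitFrame(copyData: openArray[byte]): Frame =
  if copyData.len == 0:
    raise newException(ValueError, "empty CopyData payload")
  case char(copyData[0])
  of 'w':
    if copyData.len < 25:
      raise newException(ValueError, "short XLogData header")
    result = Frame(kind: fkXLogData,
                   startLsn: be64(copyData, 1), walEnd: be64(copyData, 9),
                   sendTime: cast[int64](be64(copyData, 17)), payloadOffset: 25)
  of 'k':
    if copyData.len < 18:
      raise newException(ValueError, "short keepalive")
    result = Frame(kind: fkKeepalive, walEnd: be64(copyData, 1),
                   sendTime: cast[int64](be64(copyData, 9)),
                   replyRequested: copyData[17] != 0)
  else:
    raise newException(ValueError, "unknown replication message tag")

var ka = newSeq[byte](18)
ka[0] = byte('k'); ka[8] = 0x10; ka[17] = 1
echo splitFrame(ka).kind
```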
proc parseTimelineId(s: string): int32 {....raises: [PgTypeError], tags: [], forbids: [].}
- Parse the timeline id from an IDENTIFY_SYSTEM result row (text format). Both a non-numeric value and a value outside the int32 range raise PgTypeError, so callers stay under the except PgError contract. The range is checked before narrowing, because a bare parseInt(...).int32 would raise RangeDefect (a Defect, outside PgError) on an out-of-range value.
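The range-check-before-narrowing pattern can be sketched as follows. This is an illustration under stated assumptions rather than the library's code: `TimelineParseError` is a hypothetical stand-in for `PgTypeError`, and `parseTimelineSketch` is a made-up name.

```nim
import std/strutils

type TimelineParseError = object of CatchableError   # hypothetical stand-in for PgTypeError

proc parseTimelineSketch(s: string): int32 =
  ## Parse into a wide integer first, then range-check, then narrow.
  var wide: BiggestInt
  try:
    wide = parseBiggestInt(s)
  except ValueError:
    raise newException(TimelineParseError, "timeline id is not numeric: " & s)
  if wide < BiggestInt(low(int32)) or wide > BiggestInt(high(int32)):
    raise newException(TimelineParseError, "timeline id out of int32 range: " & s)
  int32(wide)   # safe: the value is known to fit

echo parseTimelineSketch("3")
```

Because the explicit check runs before the `int32(...)` conversion, an oversized value surfaces as a catchable error instead of a `RangeDefect`.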
proc readReplicationSlot(conn: PgConnection; slotName: string; timeout: async_backend.Duration = ZeroDuration): Future[ ReplicationSlotInfo] {....stackTrace: false, raises: [Exception, ValueError, CatchableError, PgConnectionError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Read information about an existing replication slot.
On timeout, the connection is marked csClosed (protocol out of sync).
proc receivedEndLsn(msg: XLogData): Lsn {....raises: [], tags: [], forbids: [].}
- End LSN of the WAL data actually contained in this message (startLsn + len(data)). Use this when acknowledging received data via sendStandbyStatus; do not use walEnd, which is the server's current WAL position and may point past data this message does not carry.
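A small worked example of the arithmetic, using a hypothetical local mirror of `XLogData` with plain `uint64` in place of `Lsn`:

```nim
type XLogChunk = object   # hypothetical local mirror of XLogData
  startLsn: uint64
  walEnd: uint64
  data: seq[byte]

proc endOfChunk(c: XLogChunk): uint64 =
  ## End of the WAL actually carried by this message.
  c.startLsn + uint64(c.data.len)

let chunk = XLogChunk(startLsn: 1000'u64, walEnd: 5000'u64, data: newSeq[byte](24))
echo endOfChunk(chunk)   # 1000 + 24; walEnd (5000) is further ahead
```

Acknowledging 5000 here would claim receipt of WAL that has not arrived yet, which is why the received end, not `walEnd`, is the value to report.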
proc sendCopyData(conn: PgConnection; data: openArray[byte]): Future[void] {. ...raises: [PgConnectionError, ValueError, Exception, SslError], tags: [RootEffect], forbids: [].}
-
Send a raw CopyData frame to the server during a CopyBoth stream (i.e. while the connection is in csReplicating). Useful for protocols layered on top of CopyBoth — for example, physical replication acknowledgements or custom replication plugins that exchange messages the library does not know about. For Standby Status Updates, prefer sendStandbyStatus which builds the payload for you.
data is encoded into a CopyData frame synchronously before the first async suspension, so the caller's buffer does not need to outlive the returned Future.
proc sendStandbyStatus(conn: PgConnection; receiveLsn: Lsn; flushLsn: Lsn = InvalidLsn; applyLsn: Lsn = InvalidLsn; replyRequested: bool = false): Future[void] {. ...stackTrace: false, raises: [Exception, PgConnectionError, SslError, ValueError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Send a Standby Status Update to the server during replication streaming. Must be called while the connection is in csReplicating state.
When flushLsn/applyLsn are left at InvalidLsn (0/0) they default to receiveLsn, which is convenient for callers that ACK received data eagerly. Pass an explicit flushLsn/applyLsn to report a position behind receiveLsn (e.g. only what the callback has durably flushed). The automatic keepalive reply does not use this proc; it sends the confirmed-flush position verbatim via an internal path so it never inflates flush to merely-received WAL.
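For orientation, the sketch below builds the 34-byte Standby Status Update payload as laid out in the PostgreSQL streaming replication protocol (`'r'`, then Int64 write, flush, and apply positions, an Int64 client clock, and a one-byte reply flag), including the defaulting rule described above. `standbyStatusPayload` and `putBE64` are hypothetical names; the library builds this payload for you, so this is purely illustrative.

```nim
proc putBE64(buf: var seq[byte]; v: uint64) =
  ## Append `v` as 8 big-endian bytes.
  for shift in countdown(56, 0, 8):
    buf.add byte((v shr shift) and 0xFF'u64)

proc standbyStatusPayload(receive: uint64; flush = 0'u64; apply = 0'u64;
                          clockMicros = 0'i64; replyRequested = false): seq[byte] =
  # 0 plays the role of InvalidLsn: unset flush/apply fall back to receive.
  let f = if flush == 0'u64: receive else: flush
  let a = if apply == 0'u64: receive else: apply
  result = newSeqOfCap[byte](34)
  result.add byte('r')
  putBE64(result, receive)
  putBE64(result, f)
  putBE64(result, a)
  putBE64(result, cast[uint64](clockMicros))
  result.add(if replyRequested: 1'u8 else: 0'u8)

echo standbyStatusPayload(0x0102'u64).len
```

Passing an explicit `flush` smaller than `receive` is how a caller reports "received but not yet durable", which is the distinction the automatic reply path preserves.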
proc startPhysicalReplication(conn: PgConnection; startLsn: Lsn; slotName: string = ""; timeline: int32 = 0; autoKeepaliveReply: bool = true; statusInterval: async_backend.Duration = ZeroDuration; callback: ReplicationCallback): Future[void] {. ...stackTrace: false, raises: [Exception, PgConnectionError, PgStateError, SslError, ValueError, PgProtocolError, PgQueryError, AsyncTimeoutError, CatchableError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Begin physical replication streaming.
slotName is optional; pass "" for a slot-less stream. timeline is appended as TIMELINE n when non-zero — useful when the standby is following a specific timeline and must abort if the primary advanced past it.
The callback contract matches startReplication: each XLogData or PrimaryKeepalive is delivered as a ReplicationMessage. The raw WAL bytes inside XLogData.data are the physical WAL stream; no pgoutput decoding applies.
autoKeepaliveReply behaves identically to startReplication: when true, PrimaryKeepalive(replyRequested=true) is answered before the callback runs, reporting the highest observed receivedEndLsn as the receive LSN and the confirmFlushed position (initially startLsn) as flush/apply. For physical replication the flush LSN governs how much WAL the primary may recycle, so call confirmFlushed only once that WAL is safely on durable storage. A physical standby listed in synchronous_standby_names that relies on the auto-reply must likewise call confirmFlushed (or reply manually), or the primary's synchronous COMMITs will block waiting for a flush position that never advances.
statusInterval behaves as documented on startReplication: a positive value makes the standby send a proactive Standby Status Update at least that often (receive = highest received, flush/apply = confirmFlushed) so the primary can recycle WAL even when it never requests a reply (e.g. wal_sender_timeout = 0); it is honoured only with autoKeepaliveReply, and on asyncdispatch only fires while messages are flowing.
Error handling matches startReplication: a callback exception or any other mid-stream failure poisons the connection (marked closed) and propagates, so reconnect and resume from the last LSN you tracked durable.
On a timeline switch the server may send a final result set describing the next timeline (RowDescription + DataRow + CommandComplete) between CopyDone and ReadyForQuery. This proc drains and discards those messages; callers that need the next-timeline information should re-issue IDENTIFY_SYSTEM after this proc returns.
proc startReplication(conn: PgConnection; slotName: string; startLsn: Lsn = InvalidLsn; options: seq[(string, string)] = @[]; autoKeepaliveReply: bool = true; statusInterval: async_backend.Duration = ZeroDuration; callback: ReplicationCallback): Future[void] {. ...stackTrace: false, raises: [Exception, ValueError, PgConnectionError, PgStateError, SslError, PgProtocolError, PgQueryError, AsyncTimeoutError, CatchableError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Begin logical replication streaming from the given slot.
The callback is invoked for each XLogData or PrimaryKeepalive message received. The callback is awaited, providing natural TCP backpressure. From the callback, acknowledge durable progress with confirmFlushed (the default path; see below). With autoKeepaliveReply = false you instead drive replies yourself with sendStandbyStatus; do not mix the two, since the auto-reply would report a flush position behind your manual ACKs.
When autoKeepaliveReply is true (the default), the library responds automatically to PrimaryKeepalive messages with replyRequested = true before invoking the callback. The reply reports the highest receivedEndLsn (startLsn + data.len) observed so far across received XLogData messages — or the caller-supplied startLsn if no XLogData has arrived yet — as the receive LSN, which resets wal_sender_timeout and prevents silent disconnects when the callback is slow. The flush/apply LSN, however, carries only what you have confirmed durable via confirmFlushed (initially startLsn), not the receive LSN. This keeps confirmed_flush_lsn from advancing past WAL the callback has not yet persisted, so a crash re-streams unprocessed changes (at-least-once delivery). The keepalive is still delivered to the callback.
To advance the slot, call confirmFlushed(conn, lsn) from the callback once the received changes are durable. The confirmed position reaches the server on the next reply-requested keepalive and on stopReplication (a clean stop flushes it), not on the confirmFlushed call itself. Set autoKeepaliveReply = false to manage replies entirely by hand with sendStandbyStatus instead — for example, to batch acknowledgements or report apply separately from flush.
Until confirmFlushed is called and while startLsn is at its default InvalidLsn (0/0), the auto-reply carries 0/0 for flush/apply. PostgreSQL treats this as "position unknown" and will not move confirmed_flush_lsn backwards, so the reply is still useful for resetting wal_sender_timeout without risking data loss.
Synchronous standbys: because the auto-reply reports receive and flush/apply separately, a consumer listed in synchronous_standby_names (with synchronous_commit at on/remote_write/remote_apply) that never calls confirmFlushed keeps wal_sender_timeout reset via the receive field yet never advances flush — so the primary's COMMITs block indefinitely waiting for a flush confirmation that never arrives. Call confirmFlushed promptly (or manage replies manually) in that setup.
Proactive status interval: statusInterval (ZeroDuration = off, the default) makes the library send a Standby Status Update on its own at least that often, in addition to answering reply-requested keepalives. The proactive update reports the highest received LSN as receive and the confirmFlushed position as flush/apply — same as the auto-reply — so it advances confirmed_flush_lsn (letting the server recycle WAL) without ever flushing past unconfirmed WAL. Set it when the server uses wal_sender_timeout = 0 (or a long timeout): such a server never asks for a reply, so without a proactive interval the slot only advances on stopReplication and WAL accumulates meanwhile. It is honoured only when autoKeepaliveReply is true; under manual reply management drive the cadence yourself with sendStandbyStatus. Under asyncdispatch the interval only fires while messages are flowing (it cannot safely interrupt a blocked read, so a fully idle stream sends nothing until the next message); chronos honours it even on a completely idle stream.
If the auto-reply itself fails (for example, the connection is lost between receiving the keepalive and writing the Standby Status Update), the exception is propagated out of startReplication and the callback is not invoked for that keepalive.
Errors invalidate the connection. If the callback raises — or the stream fails for any other reason mid-flight — the exception propagates out of this proc and the connection is poisoned (marked closed): the CopyBoth exchange is left half-open and the protocol stream is out of sync, so the connection cannot be reused. There is no built-in reconnect; treat the connection as dead, close it (a pool discards it automatically), and resume on a fresh connection. Because confirmFlushed / confirmedFlushLsn reset once the stream ends, track the last LSN you confirmed durable yourself in the callback so you know the restart point, then pass it as startLsn on the new stream. See examples/replication.nim for a reconnect-and-resume loop.
The proc returns when the server sends CopyDone or the connection closes. To stop replication from the client side, call stopReplication from within the callback (or from a concurrent task).
The bundled pgoutput decoder (parsePgOutputMessage / decodePgOutput) supports protocol version 1 only. Passing a proto_version other than 1 in options raises ValueError, because a v2/v3 stream reshapes and adds messages the decoder cannot parse.
proc stopReplication(conn: PgConnection): Future[void] {....stackTrace: false, raises: [Exception, PgConnectionError, SslError, ValueError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Gracefully terminate the replication stream.
Before sending CopyDone, this flushes the latest confirmFlushed position to the server (receive = highest WAL received, flush/apply = confirmed) so a clean shutdown does not lose the final acknowledgement. confirmFlushed only records locally; without this flush the confirmed position would reach the server only on the next PrimaryKeepalive(replyRequested), which may never arrive before stop — leaving the slot behind and re-streaming the last batch on restart. When nothing has been confirmed the flush is the stream's startLsn (0/0 only when startLsn was left at its default InvalidLsn, which PostgreSQL reads as "position unknown" and will not move the slot backwards), so manual-ACK callers are unaffected.
The server responds with CopyDone and ReadyForQuery, which are handled by the startReplication recv loop.
If flushing the confirmed position fails (for example because the connection is already lost), the exception propagates and CopyDone is not sent. In that situation the server has already dropped the connection, so the missing CopyDone does not change the outcome.
proc timelineHistory(conn: PgConnection; timeline: int32; timeout: async_backend.Duration = ZeroDuration): Future[ TimelineHistory] {....stackTrace: false, raises: [Exception, ValueError, CatchableError, PgConnectionError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Execute TIMELINE_HISTORY <tli> and return the history file metadata plus its raw contents. Required when a physical standby needs to follow a timeline switch on the primary.
On timeout, the connection is marked csClosed (protocol out of sync).
proc toString(field: TupleField): string {....raises: [], tags: [], forbids: [].}
- Convert a TupleField's data to a string by copying the bytes.
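A byte-for-byte copy of this kind might look like the sketch below; `bytesToString` is a hypothetical name operating on a bare byte sequence rather than on `TupleField`. The result is most meaningful for text-format fields, since binary-format bytes are copied as-is without interpretation.

```nim
proc bytesToString(data: openArray[byte]): string =
  ## Copy each byte into a new string of the same length.
  result = newString(data.len)
  for i, b in data:
    result[i] = char(b)

echo bytesToString(@[0x34'u8, 0x32'u8])   # the ASCII bytes for "42"
```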
Templates
template makeReplicationCallback(body: untyped): ReplicationCallback
- Create a ReplicationCallback that works with both asyncdispatch and chronos. Inside body, the current message is available as msg: ReplicationMessage.