async_postgres/pg_replication

PostgreSQL Logical Replication support.

Provides types and procedures for consuming a logical replication stream via the PostgreSQL streaming replication protocol. The streaming API is plugin-agnostic (raw WAL bytes are delivered to a callback). A built-in decoder for the pgoutput logical decoding plugin is included.

Quick start

# Inside an async proc; myCallback is a ReplicationCallback
# (for example, one created with makeReplicationCallback).
let conn = await connectReplication("postgresql://user:pass@host/db")
defer: await conn.close()
let slot = await conn.createReplicationSlot("my_slot", "pgoutput", temporary = true)
await conn.startReplication("my_slot", slot.consistentPoint,
    options = @{"proto_version": "'1'", "publication_names": "'my_pub'"},
    callback = myCallback)

Types

BeginMessage = object
  finalLsn*: Lsn             ## LSN of the commit record
  commitTime*: int64         ## Commit timestamp (microseconds since PG epoch)
  xid*: int32                ## Transaction ID
Transaction begin.
CommitMessage = object
  flags*: byte
  commitLsn*: Lsn
  endLsn*: Lsn
  commitTime*: int64
Transaction commit.
DeleteMessage = object
  relationId*: int32
  keyKind*: char ## 'K' if oldTuple holds only the replica identity key,
                 ## 'O' if it holds the full old row (REPLICA IDENTITY FULL).
  oldTuple*: seq[TupleField]
Row deletion.
InsertMessage = object
  relationId*: int32
  newTuple*: seq[TupleField]
Row insertion.
LogicalMessage = object
  flags*: byte               ## Bit 0: transactional
  lsn*: Lsn
  prefix*: string
  content*: seq[byte]
Generic logical decoding message (via pg_logical_emit_message).
Lsn = distinct uint64
PostgreSQL Log Sequence Number (LSN), a byte position in the WAL. Displayed as "X/Y", where X and Y are the upper and lower 32-bit halves in hexadecimal.
OriginMessage = object
  originLsn*: Lsn
  originName*: string
Replication origin.
PgOutputMessage = object
  case kind*: PgOutputMessageKind
  of pomkBegin:
    begin*: BeginMessage
  of pomkCommit:
    commit*: CommitMessage
  of pomkOrigin:
    origin*: OriginMessage
  of pomkRelation:
    relation*: RelationInfo
  of pomkType:
    typeMsg*: TypeMessage
  of pomkInsert:
    insert*: InsertMessage
  of pomkUpdate:
    update*: UpdateMessage
  of pomkDelete:
    delete*: DeleteMessage
  of pomkTruncate:
    truncate*: TruncateMessage
  of pomkMessage:
    message*: LogicalMessage
A decoded pgoutput plugin message.
PgOutputMessageKind = enum
  pomkBegin, pomkCommit, pomkOrigin, pomkRelation, pomkType, pomkInsert,
  pomkUpdate, pomkDelete, pomkTruncate, pomkMessage
Message types within the pgoutput logical decoding plugin.
PrimaryKeepalive = object
  walEnd*: Lsn               ## Current end of WAL on the server
  sendTime*: int64           ## Server send time (microseconds since PG epoch)
  replyRequested*: bool      ## Whether the server wants an immediate status reply
Keepalive message from the server.
RelationCache = Table[int32, RelationInfo]
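Cache of relation metadata received during replication. pgoutput sends a Relation message before the first DML message for a given relation, and again whenever that relation's definition changes; later DML messages identify the table only by relationId, so clients must cache the metadata.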
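
The caching pattern can be sketched as follows. This is a minimal, self-contained illustration using a hypothetical reduced type RelInfo and hypothetical helpers rememberRelation and lookupRelation; it only mirrors the idea behind RelationCache (a Table[int32, RelationInfo]) and is not this module's API.

```nim
import std/tables

type
  # Hypothetical, reduced stand-in for RelationInfo.
  RelInfo = object
    relationId: int32
    namespace, name: string
    columnNames: seq[string]

proc rememberRelation(cache: var Table[int32, RelInfo]; rel: RelInfo) =
  ## Insert or overwrite: a repeated Relation message for the same ID means
  ## the table definition changed, so the newest one wins.
  cache[rel.relationId] = rel

proc lookupRelation(cache: Table[int32, RelInfo]; relationId: int32): RelInfo =
  ## DML messages carry only the relation ID; resolve it from the cache.
  ## A miss means an earlier Relation message was never recorded.
  if relationId notin cache:
    raise newException(KeyError, "no Relation message seen for id " & $relationId)
  cache[relationId]
```

Overwriting on every Relation message (rather than inserting once) keeps the cache correct after an ALTER TABLE on the publisher.
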
RelationColumn = object
  flags*: byte               ## Bit 0: part of replica identity key
  name*: string
  typeOid*: int32
  typeMod*: int32
A single column in a relation definition.
RelationInfo = object
  relationId*: int32
  namespace*: string         ## Schema name
  name*: string              ## Table name
  replicaIdentity*: char     ## 'd' (default), 'n' (nothing), 'f' (full), 'i' (index)
  columns*: seq[RelationColumn]
Relation (table) metadata sent by pgoutput before DML events.
ReplicationCallback = proc (msg: ReplicationMessage): Future[void] {.gcsafe.}
Callback invoked for each replication message during streaming.
ReplicationMessage = object
  case kind*: ReplicationMessageKind
  of rmkXLogData:
    xlogData*: XLogData
  of rmkPrimaryKeepalive:
    keepalive*: PrimaryKeepalive
A single message received during replication streaming.
ReplicationMessageKind = enum
  rmkXLogData, rmkPrimaryKeepalive
Replication message types (decoded from CopyData during streaming).
ReplicationMode = enum
  rmDatabase, rmPhysical
Replication mode selected at connection time. rmDatabase sends replication=database (logical replication + ability to run SQL on the chosen database). rmPhysical sends replication=true (physical replication; no SQL on user databases).
ReplicationSlotInfo = object
  slotName*: string
  consistentPoint*: Lsn      ## confirmed_flush_lsn (logical) or restart_lsn (physical)
  snapshotName*: string      ## Snapshot name (only available at CREATE time)
  outputPlugin*: string
Information about a replication slot.
SystemInfo = object
  systemId*: string
  timeline*: int32
  xLogPos*: Lsn
  dbName*: string
Result of IDENTIFY_SYSTEM command.
TimelineHistory = object
  filename*: string          ## Timeline history file name (e.g. "00000002.history").
  content*: seq[byte]        ## Raw history file content.
Result of TIMELINE_HISTORY command.
TruncateMessage = object
  options*: byte             ## Bit 0: CASCADE, bit 1: RESTART IDENTITY
  relationIds*: seq[int32]
Table truncation.
TupleDataKind = enum
  tdkBinary = 98,           ## Binary-formatted value (protocol_version >= 2)
  tdkNull = 110,            ## NULL value
  tdkText = 116,            ## Text-formatted value
  tdkUnchanged = 117        ## TOAST value unchanged
Kind of a single field value in a pgoutput tuple. The ordinals are the ASCII codes of the wire tags 'b', 'n', 't' and 'u'.
TupleField = object
  kind*: TupleDataKind
  data*: seq[byte]           ## Empty for null/unchanged
A single field value in a pgoutput tuple.
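
Because each tag is a single ASCII byte, interpreting a field mostly means branching on kind. The sketch below redeclares simplified local copies of TupleDataKind and TupleField and adds two hypothetical helpers, kindFromTag and textValue; it keeps NULL distinct from an empty string and refuses to guess a text value for unchanged-TOAST or binary fields.

```nim
import std/options

type
  # Local copies for illustration; the ordinals are the ASCII wire tags.
  TupleDataKind = enum
    tdkBinary = 98,     # 'b'
    tdkNull = 110,      # 'n'
    tdkText = 116,      # 't'
    tdkUnchanged = 117  # 'u'
  TupleField = object
    kind: TupleDataKind
    data: seq[byte]

proc kindFromTag(tag: char): TupleDataKind =
  ## Map a wire tag byte to the enum, rejecting unknown tags.
  case tag
  of 'b': tdkBinary
  of 'n': tdkNull
  of 't': tdkText
  of 'u': tdkUnchanged
  else: raise newException(ValueError, "unknown tuple tag: " & tag)

proc textValue(f: TupleField): Option[string] =
  ## Text for 't' fields, none for NULL. Unchanged TOAST and binary fields
  ## have no text form here, so they are rejected instead of being treated
  ## as NULL or as an empty string.
  case f.kind
  of tdkNull: none(string)
  of tdkText:
    var s = newString(f.data.len)
    for i, b in f.data: s[i] = char(b)
    some(s)
  of tdkUnchanged, tdkBinary:
    raise newException(ValueError, "no text value for field kind " & $f.kind)
```

An unchanged-TOAST field on an UPDATE usually means "keep the previous value", which a consumer has to resolve from its own state.
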
TypeMessage = object
  typeId*: int32
  namespace*: string
  name*: string
Custom type definition.
UpdateMessage = object
  relationId*: int32
  hasOldTuple*: bool         ## True if old key/full row is included
  keyKind*: char ## When hasOldTuple is true: 'K' if oldTuple holds only the replica
                 ## identity key, 'O' if it holds the full old row (REPLICA IDENTITY FULL).
                 ## '\0' when no old tuple is present.
  oldTuple*: seq[TupleField]
  newTuple*: seq[TupleField]
Row update.
XLogData = object
  startLsn*: Lsn             ## Start LSN of the WAL data in this message
  walEnd*: Lsn ## Current end of WAL on the server at the time this message was sent.
               ## This is *not* the end of the WAL data contained in this message; it
               ## reflects how far WAL has advanced on the server and is informational.
               ## To acknowledge what was actually received, use ``receivedEndLsn``
               ## (``startLsn + data.len``), never ``walEnd`` — ``walEnd`` may be ahead
               ## of what this message contains.
  sendTime*: int64           ## Server send time (microseconds since PG epoch)
  data*: seq[byte]           ## Raw WAL data (plugin-dependent format)
WAL data payload from the server.

Consts

InvalidLsn = 0'u
Sentinel value representing an invalid or unset LSN.
pgEpochOffset = 946684800'i64
Seconds between Unix epoch (1970-01-01) and PostgreSQL epoch (2000-01-01).
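
As a sketch, converting the microsecond timestamps carried by BeginMessage.commitTime, CommitMessage.commitTime, XLogData.sendTime and PrimaryKeepalive.sendTime into std/times values needs only this offset. The helper names pgMicrosToTime and timeToPgMicros are hypothetical, and the constant is redefined locally so the snippet stands alone.

```nim
import std/times

const pgEpochOffset = 946684800'i64  # local copy of the constant above

proc pgMicrosToTime(pgMicros: int64): Time =
  ## PostgreSQL timestamp (microseconds since 2000-01-01 UTC) -> Time.
  let unixMicros = pgMicros + pgEpochOffset * 1_000_000
  fromUnix(unixMicros div 1_000_000) +
    initDuration(microseconds = unixMicros mod 1_000_000)

proc timeToPgMicros(t: Time): int64 =
  ## Time -> microseconds since the PostgreSQL epoch (sub-microsecond part dropped).
  (t.toUnix - pgEpochOffset) * 1_000_000 + int64(t.nanosecond div 1_000)
```
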

Procs

proc `$`(lsn: Lsn): string {.raises: [], tags: [], forbids: [].}
Format an LSN as an "X/Y" hex string.
proc `<`(a, b: Lsn): bool {.borrow, raises: [], tags: [], forbids: [].}
proc `<=`(a, b: Lsn): bool {.borrow, raises: [], tags: [], forbids: [].}
proc `==`(a, b: Lsn): bool {.borrow, raises: [], tags: [], forbids: [].}
proc `>`(a, b: Lsn): bool {.inline, raises: [], tags: [], forbids: [].}
proc `>=`(a, b: Lsn): bool {.inline, raises: [], tags: [], forbids: [].}
proc confirmedFlushLsn(conn: PgConnection): Lsn {.inline, raises: [], tags: [],
    forbids: [].}

Highest LSN the application has confirmed durably flushed for the current replication stream via confirmFlushed. Initialised to the stream's startLsn by startReplication / startPhysicalReplication; this is the flush/apply position carried by automatic keepalive replies.

Only meaningful during an active stream: outside csReplicating (before a stream starts or after it ends) this returns InvalidLsn (0/0) rather than a stale value left over from a previous stream.

proc confirmFlushed(conn: PgConnection; lsn: Lsn): bool {.
    raises: [PgConnectionError], tags: [], forbids: [].}

Record that received WAL up to and including lsn has been durably persisted by the application, so automatic keepalive replies (see autoKeepaliveReply on startReplication) report it as the flush/apply position and let the server advance confirmed_flush_lsn and recycle WAL.

Call this from the replication callback after the received changes are durable. Until you do, the automatic reply acknowledges only receipt (the receive LSN), never flush — so a crash re-streams the unprocessed WAL, giving at-least-once delivery. Calls that would move the confirmed position backwards are ignored, so duplicate or out-of-order confirmations are safe.

lsn is clamped to the WAL actually received: you cannot have durably persisted WAL you have not yet received, so an lsn beyond the highest XLogData.receivedEndLsn observed confirms only up to that received position. Passing walEnd, which runs ahead of the data its message carries, therefore confirms received WAL rather than over-advancing. Because of this clamp the confirmed position never exceeds received WAL, so automatic replies never report flush ahead of receive, and the call never raises on an out-of-range LSN (an exception escaping the callback would poison the connection and end the stream). confirmFlushed must be called while the connection is csReplicating (i.e. from the replication callback); calling it outside an active stream raises PgConnectionError.

Returns true when the confirmed-flush position actually moved forward (after clamping and the monotonic guard). false means the position did not move: after clamping, the requested lsn was not ahead of the current confirmed position.
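
The clamp-then-monotonic rule can be modelled in isolation. The sketch below uses a hypothetical FlushTracker object (not the library's internals) that tracks the highest receivedEndLsn and the confirmed position, both starting at startLsn, and reports what an automatic keepalive reply would carry.

```nim
type
  Lsn = distinct uint64

proc `<`(a, b: Lsn): bool {.borrow.}
proc `==`(a, b: Lsn): bool {.borrow.}

type
  # Hypothetical model of the per-stream bookkeeping described above.
  FlushTracker = object
    highestReceived: Lsn   # max receivedEndLsn seen so far (starts at startLsn)
    confirmed: Lsn         # confirmFlushed position (starts at startLsn)

proc initFlushTracker(startLsn: Lsn): FlushTracker =
  FlushTracker(highestReceived: startLsn, confirmed: startLsn)

proc onXLogData(t: var FlushTracker; startLsn: Lsn; dataLen: int) =
  ## receivedEndLsn = startLsn + data.len; keep the highest one seen.
  let endLsn = Lsn(uint64(startLsn) + uint64(dataLen))
  if t.highestReceived < endLsn:
    t.highestReceived = endLsn

proc confirm(t: var FlushTracker; lsn: Lsn): bool =
  ## Clamp to received WAL first, then apply the monotonic guard.
  ## Returns true only if the confirmed position moved forward.
  var target = lsn
  if t.highestReceived < target:
    target = t.highestReceived
  if t.confirmed < target:
    t.confirmed = target
    result = true

proc autoReply(t: FlushTracker): tuple[receive, flush: Lsn] =
  ## receive = highest received; flush/apply = confirmed (never ahead of receive).
  (t.highestReceived, t.confirmed)
```
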

proc connectReplication(config: ConnConfig; mode: ReplicationMode = rmDatabase): Future[
    PgConnection] {.raises: [Exception, ValueError, PgConnectionError,
                             CatchableError],
                    tags: [RootEffect, WriteIOEffect, TimeEffect], forbids: [].}
Connect to PostgreSQL with the replication startup parameter set. rmDatabase enables logical replication commands (IDENTIFY_SYSTEM, CREATE_REPLICATION_SLOT, START_REPLICATION ... LOGICAL, etc.) against the chosen database. rmPhysical opens a physical replication connection (replication=true); only replication commands work — no SQL on user databases is permitted.
proc connectReplication(dsn: string; mode: ReplicationMode = rmDatabase): Future[
    PgConnection] {.raises: [PgError, Exception, ValueError, PgConnectionError,
                             CatchableError], tags: [ReadIOEffect, RootEffect,
    WriteIOEffect, TimeEffect], forbids: [].}
DSN-string variant of connectReplication. See the ConnConfig overload for the meaning of mode.
proc createReplicationSlot(conn: PgConnection; slotName: string;
                           plugin: string = "pgoutput"; temporary: bool = false;
                           timeout: async_backend.Duration = ZeroDuration): Future[
    ReplicationSlotInfo] {.stackTrace: false, raises: [Exception, ValueError,
    CatchableError, PgConnectionError, PgTypeError],
                           tags: [RootEffect, TimeEffect], forbids: [].}

Create a logical replication slot. Returns slot info including the consistent point LSN.

On timeout, the connection is marked csClosed (protocol out of sync).

proc currentPgTimestamp(): int64 {.raises: [], tags: [TimeEffect], forbids: [].}
Current time as microseconds since the PostgreSQL epoch (2000-01-01 UTC).
proc decodePgOutput(msg: XLogData): PgOutputMessage {.raises: [PgProtocolError],
    tags: [], forbids: [].}
Convenience: decode the pgoutput message from an XLogData's data field.
proc dropReplicationSlot(conn: PgConnection; slotName: string;
                         wait: bool = false;
                         timeout: async_backend.Duration = ZeroDuration): Future[
    void] {.stackTrace: false, raises: [Exception, ValueError, CatchableError],
            tags: [RootEffect, TimeEffect], forbids: [].}

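Drop a replication slot. When wait is true, the command (DROP_REPLICATION_SLOT ... WAIT) waits for an active slot to become inactive instead of failing immediately.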

On timeout, the connection is marked csClosed (protocol out of sync).

proc identifySystem(conn: PgConnection;
                    timeout: async_backend.Duration = ZeroDuration): Future[
    SystemInfo] {.stackTrace: false, raises: [Exception, ValueError,
    CatchableError, PgConnectionError, PgTypeError],
                  tags: [RootEffect, TimeEffect], forbids: [].}

Execute IDENTIFY_SYSTEM and return system identification info.

On timeout, the connection is marked csClosed (protocol out of sync).

proc parseLsn(s: string): Lsn {.raises: [PgTypeError], tags: [],
                                forbids: [].}
Parse an LSN from "X/Y" hex string. Converts a malformed value (wrong shape or non-hex halves) into PgTypeError so callers stay under the except PgError contract, mirroring parseTimelineId.
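
A stdlib-only sketch of the "X/Y" round trip, assuming unpadded uppercase hex halves as in "16/B374D848". formatLsn, parseLsnSketch and LsnFormatError are hypothetical names (the error type stands in for PgTypeError); the real parseLsn and `$` may differ in details such as padding or which inputs are accepted.

```nim
import std/strutils

type
  Lsn = distinct uint64
  LsnFormatError = object of ValueError  # hypothetical stand-in for PgTypeError

proc hexNoPad(x: uint64): string =
  ## Uppercase hex without leading zeros ("0" for zero).
  result = toHex(x).strip(trailing = false, chars = {'0'})
  if result.len == 0:
    result = "0"

proc formatLsn(lsn: Lsn): string =
  ## Upper 32 bits, "/", lower 32 bits.
  let v = uint64(lsn)
  hexNoPad(v shr 32) & "/" & hexNoPad(v and 0xFFFF_FFFF'u64)

proc parseLsnSketch(s: string): Lsn =
  ## Accept exactly "X/Y" with 1..8 hex digits per half; every malformed
  ## shape becomes one error type instead of a mix of ValueError and Defects.
  let parts = s.split('/')
  if parts.len != 2:
    raise newException(LsnFormatError, "malformed LSN: " & s)
  var halves: array[2, uint64]
  for i, part in parts:
    if part.len == 0 or part.len > 8 or not part.allCharsInSet(HexDigits):
      raise newException(LsnFormatError, "malformed LSN: " & s)
    halves[i] = uint64(parseHexInt(part))
  Lsn((halves[0] shl 32) or halves[1])
```
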
proc parsePgOutputMessage(data: openArray[byte]): PgOutputMessage {.
    raises: [PgProtocolError], tags: [], forbids: [].}
Decode a pgoutput logical decoding message from raw WAL bytes.
proc parseReplicationMessage(copyData: openArray[byte]): ReplicationMessage {.
    raises: [PgProtocolError], tags: [], forbids: [].}
Parse a CopyData payload into a ReplicationMessage.
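
For orientation, the two CopyData payloads handled here have a fixed big-endian layout in the streaming replication protocol: XLogData is 'w', start LSN, current server WAL end, send time, then the WAL bytes; a primary keepalive is 'k', WAL end, send time, and a one-byte reply-requested flag. The sketch below decodes both into a hypothetical ParsedMsg variant (not this module's ReplicationMessage) and computes the received end the way receivedEndLsn is described, which is why it can differ from walEnd.

```nim
type
  MsgKind = enum mkXLogData, mkKeepalive
  # Hypothetical, simplified mirror of XLogData / PrimaryKeepalive.
  ParsedMsg = object
    case kind: MsgKind
    of mkXLogData:
      startLsn, walEnd: uint64
      sendTime: int64
      data: seq[byte]
    of mkKeepalive:
      kaWalEnd: uint64
      kaSendTime: int64
      replyRequested: bool
  ProtocolErr = object of CatchableError

proc readU64BE(buf: openArray[byte]; pos: int): uint64 =
  ## Big-endian (network order) 64-bit read.
  for i in 0 ..< 8:
    result = (result shl 8) or uint64(buf[pos + i])

proc parseCopyDataSketch(buf: openArray[byte]): ParsedMsg =
  if buf.len == 0:
    raise newException(ProtocolErr, "empty CopyData payload")
  case char(buf[0])
  of 'w':  # 'w', start LSN, server WAL end, send time, WAL bytes
    if buf.len < 25:
      raise newException(ProtocolErr, "truncated XLogData")
    var data = newSeq[byte](buf.len - 25)
    for i in 0 ..< data.len: data[i] = buf[25 + i]
    result = ParsedMsg(kind: mkXLogData, startLsn: readU64BE(buf, 1),
                       walEnd: readU64BE(buf, 9),
                       sendTime: cast[int64](readU64BE(buf, 17)), data: data)
  of 'k':  # 'k', WAL end, send time, reply-requested flag
    if buf.len < 18:
      raise newException(ProtocolErr, "truncated keepalive")
    result = ParsedMsg(kind: mkKeepalive, kaWalEnd: readU64BE(buf, 1),
                       kaSendTime: cast[int64](readU64BE(buf, 9)),
                       replyRequested: buf[17] == 1)
  else:
    raise newException(ProtocolErr, "unknown message type " & $char(buf[0]))

proc receivedEnd(m: ParsedMsg): uint64 =
  ## Mirrors receivedEndLsn: startLsn + len(data), not walEnd.
  m.startLsn + uint64(m.data.len)
```
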
proc parseTimelineId(s: string): int32 {.raises: [PgTypeError], tags: [],
    forbids: [].}
Parse the timeline id from an IDENTIFY_SYSTEM result row (text format). Converts a non-numeric value and an out-of-int32-range value into PgTypeError so callers stay under the except PgError contract. Range-check before narrowing: a bare parseInt(...).int32 would raise RangeDefect (a Defect, outside PgError) on an out-of-range value.
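
The parse, range-check, narrow ordering described above looks roughly like this. parseTimelineIdSketch and TimelineParseError are hypothetical stand-ins (the latter for PgTypeError), not this module's code.

```nim
import std/strutils

type TimelineParseError = object of ValueError  # hypothetical stand-in for PgTypeError

proc parseTimelineIdSketch(s: string): int32 =
  ## Parse into a wide int, range-check, and only then narrow. A bare
  ## parseInt(s).int32 would raise RangeDefect (a Defect) when out of range.
  var wide: int
  try:
    wide = parseInt(s)
  except ValueError:
    raise newException(TimelineParseError, "non-numeric timeline id: " & s)
  if wide < int(int32.low) or wide > int(int32.high):
    raise newException(TimelineParseError, "timeline id out of int32 range: " & s)
  int32(wide)
```
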
proc readReplicationSlot(conn: PgConnection; slotName: string;
                         timeout: async_backend.Duration = ZeroDuration): Future[
    ReplicationSlotInfo] {.stackTrace: false, raises: [Exception, ValueError,
    CatchableError, PgConnectionError, PgTypeError],
                           tags: [RootEffect, TimeEffect], forbids: [].}

Read information about an existing replication slot.

On timeout, the connection is marked csClosed (protocol out of sync).

proc receivedEndLsn(msg: XLogData): Lsn {.raises: [], tags: [], forbids: [].}
End LSN of the WAL data actually contained in this message (startLsn + len(data)). Use this when acknowledging received data via sendStandbyStatus; do not use walEnd, which is the server's current WAL position and may point past data this message does not carry.
proc sendCopyData(conn: PgConnection; data: openArray[byte]): Future[void] {.
    raises: [PgConnectionError, ValueError, Exception, SslError],
    tags: [RootEffect], forbids: [].}

Send a raw CopyData frame to the server during a CopyBoth stream (i.e. while the connection is in csReplicating). Useful for protocols layered on top of CopyBoth — for example, physical replication acknowledgements or custom replication plugins that exchange messages the library does not know about. For Standby Status Updates, prefer sendStandbyStatus which builds the payload for you.

data is encoded into a CopyData frame synchronously before the first async suspension, so the caller's buffer does not need to outlive the returned Future.

proc sendStandbyStatus(conn: PgConnection; receiveLsn: Lsn;
                       flushLsn: Lsn = InvalidLsn; applyLsn: Lsn = InvalidLsn;
                       replyRequested: bool = false): Future[void] {.
    stackTrace: false,
    raises: [Exception, PgConnectionError, SslError, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Send a Standby Status Update to the server during replication streaming. Must be called while the connection is in csReplicating state.

When flushLsn/applyLsn are left at InvalidLsn (0/0) they default up to receiveLsn — convenient for callers that ACK received data eagerly. Pass an explicit flushLsn/applyLsn to report a position behind receiveLsn (e.g. only what the callback has durably flushed). The automatic keepalive reply does not use this proc; it sends the confirmed-flush position verbatim via an internal path so it never inflates flush to merely-received WAL.
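
In the streaming replication protocol a Standby Status Update is a fixed 34-byte CopyData payload: 'r', then the receive (write), flush and apply LSNs and the client clock as big-endian Int64 values (the clock in microseconds since the PostgreSQL epoch), then a one-byte reply-requested flag. This stdlib sketch uses a hypothetical standbyStatusPayload helper on raw uint64 LSNs and mirrors the documented defaulting of 0/0 flush/apply up to receiveLsn; it is an illustration, not the library's own encoder.

```nim
proc putU64BE(buf: var seq[byte]; v: uint64) =
  ## Append v in big-endian (network) byte order.
  for shift in countdown(56, 0, 8):
    buf.add byte((v shr shift) and 0xFF'u64)

proc standbyStatusPayload(receiveLsn: uint64; clockPgMicros: int64;
                          flushLsn = 0'u64; applyLsn = 0'u64;
                          replyRequested = false): seq[byte] =
  ## 0 (InvalidLsn) for flush/apply falls back to receiveLsn, as documented above.
  let flush = if flushLsn == 0'u64: receiveLsn else: flushLsn
  let apply = if applyLsn == 0'u64: receiveLsn else: applyLsn
  result = @[byte('r')]
  result.putU64BE(receiveLsn)
  result.putU64BE(flush)
  result.putU64BE(apply)
  result.putU64BE(cast[uint64](clockPgMicros))
  result.add(if replyRequested: 1'u8 else: 0'u8)
```

Reporting an explicit flushLsn behind receiveLsn is what keeps the server from recycling WAL that has arrived but is not yet durable on the client.
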

proc startPhysicalReplication(conn: PgConnection; startLsn: Lsn;
                              slotName: string = ""; timeline: int32 = 0;
                              autoKeepaliveReply: bool = true; statusInterval: async_backend.Duration = ZeroDuration;
                              callback: ReplicationCallback): Future[void] {.
    stackTrace: false, raises: [Exception, PgConnectionError, PgStateError,
                                SslError, ValueError, PgProtocolError,
                                PgQueryError, AsyncTimeoutError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Begin physical replication streaming.

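slotName is optional; pass "" for a slot-less stream. When timeline is non-zero it is appended as TIMELINE n, so the server streams that specific timeline. This matters when a standby follows a timeline the primary has since left: the server then streams WAL only up to that timeline's switch point and ends the stream instead of continuing onto the new timeline.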
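
How the optional slot and timeline pieces fit into the command can be sketched against the protocol grammar START_REPLICATION [SLOT slot_name] [PHYSICAL] XXX/XXX [TIMELINE tli]. physicalStartCommand is hypothetical, takes an LSN already rendered as "X/Y" (for example with `$`), and does not quote the slot name.

```nim
proc physicalStartCommand(startLsnText: string; slotName = "";
                          timeline: int32 = 0): string =
  ## Hypothetical builder for
  ##   START_REPLICATION [SLOT slot_name] [PHYSICAL] XXX/XXX [TIMELINE tli]
  result = "START_REPLICATION"
  if slotName.len > 0:
    result.add " SLOT " & slotName        # slot name is not quoted here
  result.add " PHYSICAL " & startLsnText
  if timeline != 0:                       # 0 means "server's current timeline"
    result.add " TIMELINE " & $timeline
```
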

The callback contract matches startReplication: each XLogData or PrimaryKeepalive is delivered as a ReplicationMessage. The raw WAL bytes inside XLogData.data are the physical WAL stream; no pgoutput decoding applies.

autoKeepaliveReply behaves identically to startReplication: when true, PrimaryKeepalive(replyRequested=true) is answered before the callback runs, reporting the highest observed receivedEndLsn as the receive LSN and the confirmFlushed position (initially startLsn) as flush/apply. For physical replication the flush LSN governs how much WAL the primary may recycle, so call confirmFlushed only once that WAL is safely on durable storage. A physical standby listed in synchronous_standby_names that relies on the auto-reply must likewise call confirmFlushed (or reply manually), or the primary's synchronous COMMITs will block waiting for a flush position that never advances.

statusInterval behaves as documented on startReplication: a positive value makes the standby send a proactive Standby Status Update at least that often (receive = highest received, flush/apply = confirmFlushed) so the primary can recycle WAL even when it never requests a reply (e.g. wal_sender_timeout = 0); it is honoured only with autoKeepaliveReply, and on asyncdispatch only fires while messages are flowing.

Error handling matches startReplication: a callback exception or any other mid-stream failure poisons the connection (marked closed) and propagates, so reconnect and resume from the last LSN you tracked durable.

On a timeline switch the server may send a final result set describing the next timeline (RowDescription + DataRow + CommandComplete) between CopyDone and ReadyForQuery. This proc drains and discards those messages; callers that need the next-timeline information should re-issue IDENTIFY_SYSTEM after this proc returns.

proc startReplication(conn: PgConnection; slotName: string;
                      startLsn: Lsn = InvalidLsn;
                      options: seq[(string, string)] = @[];
                      autoKeepaliveReply: bool = true;
                      statusInterval: async_backend.Duration = ZeroDuration;
                      callback: ReplicationCallback): Future[void] {.
    stackTrace: false, raises: [Exception, ValueError, PgConnectionError,
                                PgStateError, SslError, PgProtocolError,
                                PgQueryError, AsyncTimeoutError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Begin logical replication streaming from the given slot.

The callback is invoked for each XLogData or PrimaryKeepalive message received. The callback is awaited, providing natural TCP backpressure. From the callback, acknowledge durable progress with confirmFlushed (the default path; see below). With autoKeepaliveReply = false you instead drive replies yourself with sendStandbyStatus; do not mix the two, since the auto-reply would report a flush position behind your manual ACKs.

When autoKeepaliveReply is true (the default), the library responds automatically to PrimaryKeepalive messages with replyRequested = true before invoking the callback. The reply reports the highest receivedEndLsn (startLsn + data.len) observed so far across received XLogData messages — or the caller-supplied startLsn if no XLogData has arrived yet — as the receive LSN, which resets wal_sender_timeout and prevents silent disconnects when the callback is slow. The flush/apply LSN, however, carries only what you have confirmed durable via confirmFlushed (initially startLsn), not the receive LSN. This keeps confirmed_flush_lsn from advancing past WAL the callback has not yet persisted, so a crash re-streams unprocessed changes (at-least-once delivery). The keepalive is still delivered to the callback.

To advance the slot, call confirmFlushed(conn, lsn) from the callback once the received changes are durable. The confirmed position reaches the server on the next reply-requested keepalive and on stopReplication (a clean stop flushes it), not on the confirmFlushed call itself. Set autoKeepaliveReply = false to manage replies entirely by hand with sendStandbyStatus instead — for example, to batch acknowledgements or report apply separately from flush.

Until confirmFlushed is called and while startLsn is at its default InvalidLsn (0/0), the auto-reply carries 0/0 for flush/apply. PostgreSQL treats this as "position unknown" and will not move confirmed_flush_lsn backwards, so the reply is still useful for resetting wal_sender_timeout without risking data loss.

Synchronous standbys: because the auto-reply reports receive and flush/apply separately, a consumer listed in synchronous_standby_names (with synchronous_commit at on/remote_write/remote_apply) that never calls confirmFlushed keeps wal_sender_timeout reset via the receive field yet never advances flush — so the primary's COMMITs block indefinitely waiting for a flush confirmation that never arrives. Call confirmFlushed promptly (or manage replies manually) in that setup.

Proactive status interval: statusInterval (ZeroDuration = off, the default) makes the library send a Standby Status Update on its own at least that often, in addition to answering reply-requested keepalives. The proactive update reports the highest received LSN as receive and the confirmFlushed position as flush/apply — same as the auto-reply — so it advances confirmed_flush_lsn (letting the server recycle WAL) without ever flushing past unconfirmed WAL. Set it when the server uses wal_sender_timeout = 0 (or a long timeout): such a server never asks for a reply, so without a proactive interval the slot only advances on stopReplication and WAL accumulates meanwhile. It is honoured only when autoKeepaliveReply is true; under manual reply management drive the cadence yourself with sendStandbyStatus. Under asyncdispatch the interval only fires while messages are flowing (it cannot safely interrupt a blocked read, so a fully idle stream sends nothing until the next message); chronos honours it even on a completely idle stream.

If the auto-reply itself fails (for example, the connection is lost between receiving the keepalive and writing the Standby Status Update), the exception is propagated out of startReplication and the callback is not invoked for that keepalive.

Errors invalidate the connection. If the callback raises — or the stream fails for any other reason mid-flight — the exception propagates out of this proc and the connection is poisoned (marked closed): the CopyBoth exchange is left half-open and the protocol stream is out of sync, so the connection cannot be reused. There is no built-in reconnect; treat the connection as dead, close it (a pool discards it automatically), and resume on a fresh connection. Because confirmFlushed / confirmedFlushLsn reset once the stream ends, track the last LSN you confirmed durable yourself in the callback so you know the restart point, then pass it as startLsn on the new stream. See examples/replication.nim for a reconnect-and-resume loop.

The proc returns when the server sends CopyDone or the connection closes. To stop replication from the client side, call stopReplication from within the callback (or from a concurrent task).

The bundled pgoutput decoder (parsePgOutputMessage / decodePgOutput) supports protocol version 1 only. Passing a proto_version other than 1 in options raises ValueError, because a v2/v3 stream reshapes and adds messages the decoder cannot parse.
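
A sketch of the logical command shape, START_REPLICATION SLOT slot_name LOGICAL XXX/XXX [(option_name [option_value] [, ...])], together with a proto_version guard like the one described. logicalStartCommand is hypothetical; it emits option values verbatim (so string values carry their own single quotes, as in the quick start) and accepts proto_version as either 1 or '1', which is an assumption about how the real check treats quoting.

```nim
import std/strutils

proc logicalStartCommand(slotName, startLsnText: string;
                         options: openArray[(string, string)]): string =
  # Reject anything but protocol version 1 before building the command.
  for opt in options:
    let (name, value) = opt
    if name == "proto_version" and value.strip(chars = {'\''}) != "1":
      raise newException(ValueError,
        "bundled pgoutput decoder supports proto_version 1 only, got " & value)
  result = "START_REPLICATION SLOT " & slotName & " LOGICAL " & startLsnText
  if options.len > 0:
    var parts: seq[string]
    for opt in options:
      let (name, value) = opt
      parts.add(if value.len > 0: name & " " & value else: name)
    result.add " (" & parts.join(", ") & ")"
```
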

proc stopReplication(conn: PgConnection): Future[void] {.stackTrace: false,
    raises: [Exception, PgConnectionError, SslError, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Gracefully terminate the replication stream.

Before sending CopyDone, this flushes the latest confirmFlushed position to the server (receive = highest WAL received, flush/apply = confirmed) so a clean shutdown does not lose the final acknowledgement. confirmFlushed only records locally; without this flush the confirmed position would reach the server only on the next PrimaryKeepalive(replyRequested), which may never arrive before stop — leaving the slot behind and re-streaming the last batch on restart. When nothing has been confirmed the flush is the stream's startLsn (0/0 only when startLsn was left at its default InvalidLsn, which PostgreSQL reads as "position unknown" and will not move the slot backwards), so manual-ACK callers are unaffected.

The server responds with CopyDone and ReadyForQuery, which are handled by the startReplication recv loop.

If flushing the confirmed position fails (for example because the connection is already lost), the exception propagates and CopyDone is not sent. In that situation the server has already dropped the connection, so the missing CopyDone does not change the outcome.

proc timelineHistory(conn: PgConnection; timeline: int32;
                     timeout: async_backend.Duration = ZeroDuration): Future[
    TimelineHistory] {.stackTrace: false, raises: [Exception, ValueError,
    CatchableError, PgConnectionError, PgTypeError],
                       tags: [RootEffect, TimeEffect], forbids: [].}

Execute TIMELINE_HISTORY <tli> and return the history file metadata plus its raw contents. Required when a physical standby needs to follow a timeline switch on the primary.

On timeout, the connection is marked csClosed (protocol out of sync).

proc toString(field: TupleField): string {.raises: [], tags: [], forbids: [].}
Convert a TupleField's data to a string by copying the bytes.

Templates

template makeReplicationCallback(body: untyped): ReplicationCallback
Create a ReplicationCallback that works with both asyncdispatch and chronos. Inside body, the current message is available as msg: ReplicationMessage.
template toInt64(lsn: Lsn): int64
Get the LSN as int64 (for wire protocol encoding).
template toUInt64(lsn: Lsn): uint64
Get the raw uint64 value of an LSN.