async_postgres/pg_replication

PostgreSQL Logical Replication support.

Provides types and procedures for consuming a logical replication stream via the PostgreSQL streaming replication protocol. The streaming API is plugin-agnostic (raw WAL bytes are delivered to a callback). A built-in decoder for the pgoutput logical decoding plugin is included.

Quick start

let conn = await connectReplication("postgresql://user:pass@host/db")
defer: await conn.close()
let slot = await conn.createReplicationSlot("my_slot", "pgoutput", temporary = true)
# options is a seq[(string, string)], so the pair literal needs the `@` prefix.
await conn.startReplication("my_slot", slot.consistentPoint,
    options = @{"proto_version": "'1'", "publication_names": "'my_pub'"},
    callback = myCallback)

Types

BeginMessage = object
  finalLsn*: Lsn             ## LSN of the commit record
  commitTime*: int64         ## Commit timestamp (microseconds since PG epoch)
  xid*: int32                ## Transaction ID
Transaction begin.
CommitMessage = object
  flags*: byte
  commitLsn*: Lsn
  endLsn*: Lsn
  commitTime*: int64
Transaction commit.
DeleteMessage = object
  relationId*: int32
  oldTuple*: seq[TupleField]
Row deletion.
InsertMessage = object
  relationId*: int32
  newTuple*: seq[TupleField]
Row insertion.
LogicalMessage = object
  flags*: byte               ## Bit 0: transactional
  lsn*: Lsn
  prefix*: string
  content*: seq[byte]
Generic logical decoding message (via pg_logical_emit_message).
Lsn = distinct uint64
PostgreSQL Log Sequence Number (LSN). Displayed as "X/Y", where X and Y are the hex-encoded upper and lower 32-bit halves.
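
The "X/Y" notation can be illustrated with a small standalone sketch. It does not use this module: SketchLsn, formatLsn and parseLsnSketch are hypothetical stand-ins, and the choice to omit leading zeros follows PostgreSQL's own %X/%X display convention, which is an assumption about how the module's `$` behaves.

```nim
import std/strutils

type SketchLsn = distinct uint64   # hypothetical stand-in for the module's Lsn

proc trimHex(v: uint32): string =
  ## Uppercase hex without leading zeros ("0" for zero).
  result = toHex(v).strip(leading = true, trailing = false, chars = {'0'})
  if result.len == 0:
    result = "0"

proc formatLsn(lsn: SketchLsn): string =
  ## Upper 32 bits, a slash, then the lower 32 bits.
  let raw = uint64(lsn)
  result = trimHex(uint32(raw shr 32)) & "/" &
           trimHex(uint32(raw and 0xFFFF_FFFF'u64))

proc parseLsnSketch(s: string): SketchLsn =
  ## Inverse of formatLsn; raises ValueError on malformed input.
  let parts = s.split('/')
  if parts.len != 2:
    raise newException(ValueError, "expected X/Y, got: " & s)
  for p in parts:
    if p.len == 0 or p.len > 8 or not p.allCharsInSet(HexDigits):
      raise newException(ValueError, "invalid LSN half: " & p)
  let hi = fromHex[uint64](parts[0])
  let lo = fromHex[uint64](parts[1])
  result = SketchLsn((hi shl 32) or lo)
```

In real code, the module's own parseLsn and `$` should be preferred; the sketch only shows how the two halves map onto the 64-bit value.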
OriginMessage = object
  originLsn*: Lsn
  originName*: string
Replication origin.
PgOutputMessage = object
  case kind*: PgOutputMessageKind
  of pomkBegin:
    begin*: BeginMessage
  of pomkCommit:
    commit*: CommitMessage
  of pomkOrigin:
    origin*: OriginMessage
  of pomkRelation:
    relation*: RelationInfo
  of pomkType:
    typeMsg*: TypeMessage
  of pomkInsert:
    insert*: InsertMessage
  of pomkUpdate:
    update*: UpdateMessage
  of pomkDelete:
    delete*: DeleteMessage
  of pomkTruncate:
    truncate*: TruncateMessage
  of pomkMessage:
    message*: LogicalMessage
A decoded pgoutput plugin message.
PgOutputMessageKind = enum
  pomkBegin, pomkCommit, pomkOrigin, pomkRelation, pomkType, pomkInsert,
  pomkUpdate, pomkDelete, pomkTruncate, pomkMessage
Message types within the pgoutput logical decoding plugin.
PrimaryKeepalive = object
  walEnd*: Lsn               ## Current end of WAL on the server
  sendTime*: int64           ## Server send time (microseconds since PG epoch)
  replyRequested*: bool      ## Whether the server wants an immediate status reply
Keepalive message from the server.
RelationCache = Table[int32, RelationInfo]
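Cache of relation metadata received during replication. The server sends a Relation message before the first DML that references a table, and again if the table's definition changes. Row messages carry only the relation ID, so clients must cache the most recent Relation message for each ID.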
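
The caching pattern can be sketched as follows. This is a standalone model rather than this module's code: SketchRelation, SketchCache, remember and describeInsert are hypothetical names, and column values are simplified to strings.

```nim
import std/tables

type
  SketchRelation = object        # simplified stand-in for RelationInfo
    relationId: int32
    namespace, name: string
    columns: seq[string]

  SketchCache = Table[int32, SketchRelation]

proc remember(cache: var SketchCache; rel: SketchRelation) =
  ## Store (or overwrite) the metadata announced by a Relation message.
  cache[rel.relationId] = rel

proc describeInsert(cache: SketchCache; relationId: int32;
                    values: seq[string]): string =
  ## Resolve a row message against the cached relation metadata.
  if relationId notin cache:
    raise newException(KeyError, "no Relation message seen for id " & $relationId)
  let rel = cache[relationId]
  result = rel.namespace & "." & rel.name & ":"
  for i, v in values:
    let col = if i < rel.columns.len: rel.columns[i] else: "?"
    result.add " " & col & "=" & v
```

Overwriting on every Relation message keeps the cache correct when a table definition changes mid-stream.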
RelationColumn = object
  flags*: byte               ## Bit 0: part of replica identity key
  name*: string
  typeOid*: int32
  typeMod*: int32
A single column in a relation definition.
RelationInfo = object
  relationId*: int32
  namespace*: string         ## Schema name
  name*: string              ## Table name
  replicaIdentity*: char     ## 'd' (default), 'n' (nothing), 'f' (full), 'i' (index)
  columns*: seq[RelationColumn]
Relation (table) metadata sent by pgoutput before DML events.
ReplicationCallback = proc (msg: ReplicationMessage): Future[void] {.gcsafe.}
Callback invoked for each replication message during streaming.
ReplicationMessage = object
  case kind*: ReplicationMessageKind
  of rmkXLogData:
    xlogData*: XLogData
  of rmkPrimaryKeepalive:
    keepalive*: PrimaryKeepalive
A single message received during replication streaming.
ReplicationMessageKind = enum
  rmkXLogData, rmkPrimaryKeepalive
Replication message types (decoded from CopyData during streaming).
ReplicationMode = enum
  rmDatabase, rmPhysical
Replication mode selected at connection time. rmDatabase sends replication=database (logical replication + ability to run SQL on the chosen database). rmPhysical sends replication=true (physical replication; no SQL on user databases).
ReplicationSlotInfo = object
  slotName*: string
  consistentPoint*: Lsn      ## confirmed_flush_lsn (logical) or restart_lsn (physical)
  snapshotName*: string      ## Snapshot name (only available at CREATE time)
  outputPlugin*: string
Information about a replication slot.
SystemInfo = object
  systemId*: string
  timeline*: int32
  xLogPos*: Lsn
  dbName*: string
Result of IDENTIFY_SYSTEM command.
TimelineHistory = object
  filename*: string          ## Timeline history file name (e.g. "00000002.history").
  content*: seq[byte]        ## Raw history file content.
Result of TIMELINE_HISTORY command.
TruncateMessage = object
  options*: byte             ## Bit 0: CASCADE, bit 1: RESTART IDENTITY
  relationIds*: seq[int32]
Table truncation.
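
The option bits documented above can be unpacked into a Nim set. This is a minimal sketch: TruncateOption and decodeTruncateOptions are hypothetical names, not part of this module.

```nim
type TruncateOption = enum
  toCascade          # bit 0
  toRestartIdentity  # bit 1

proc decodeTruncateOptions(options: byte): set[TruncateOption] =
  ## Translate the raw option byte into a set of named flags.
  if (options and 0x01'u8) != 0:
    result.incl toCascade
  if (options and 0x02'u8) != 0:
    result.incl toRestartIdentity
```

The same masking approach applies to the other single-bit flag fields, such as LogicalMessage.flags and RelationColumn.flags.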
TupleDataKind = enum
  tdkNull = 110,            ## NULL value
  tdkText = 116,            ## Text-formatted value
  tdkBinary = 98,           ## Binary-formatted value (protocol_version >= 2)
  tdkUnchanged = 117         ## TOAST value unchanged
Kind of a single field value in a pgoutput tuple.
TupleField = object
  kind*: TupleDataKind
  data*: seq[byte]           ## Empty for null/unchanged
A single field value in a pgoutput tuple.
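
A consumer usually branches on the field kind before touching the data. The standalone sketch below mirrors the documented kinds with hypothetical types (SketchKind, SketchField, render). The enum values are listed in ascending order here because Nim requires that, and the placeholder strings are illustrative choices.

```nim
type
  SketchKind = enum
    skBinary = 98       # 'b'
    skNull = 110        # 'n'
    skText = 116        # 't'
    skUnchanged = 117   # 'u'

  SketchField = object
    kind: SketchKind
    data: seq[byte]     # empty for null/unchanged

proc render(f: SketchField): string =
  ## Produce a printable form of one tuple field.
  case f.kind
  of skNull:
    result = "NULL"
  of skUnchanged:
    result = "<unchanged TOAST value>"
  of skText:
    # Text-format values are the column's textual representation.
    result = newString(f.data.len)
    for i, b in f.data:
      result[i] = char(b)
  of skBinary:
    result = "<" & $f.data.len & " binary bytes>"
```

Treating an unchanged TOAST value as distinct from NULL matters on updates, where the previous column value must be retained.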
TypeMessage = object
  typeId*: int32
  namespace*: string
  name*: string
Custom type definition.
UpdateMessage = object
  relationId*: int32
  hasOldTuple*: bool         ## True if old key/full row is included
  oldTuple*: seq[TupleField]
  newTuple*: seq[TupleField]
Row update.
XLogData = object
  startLsn*: Lsn             ## Start LSN of the WAL data in this message
  walEnd*: Lsn ## Current end of WAL on the server at the time this message was sent.
               ## This is *not* the end of the WAL data contained in this message; it
               ## reflects how far WAL has advanced on the server and is informational.
               ## To acknowledge what was actually received, use ``receivedEndLsn``
               ## (``startLsn + data.len``), never ``walEnd`` — ``walEnd`` may be ahead
               ## of what this message contains.
  sendTime*: int64           ## Server send time (microseconds since PG epoch)
  data*: seq[byte]           ## Raw WAL data (plugin-dependent format)
WAL data payload from the server.

Consts

InvalidLsn = 0'u
Sentinel value representing an invalid or unset LSN.
pgEpochOffset = 946684800'i64
Seconds between Unix epoch (1970-01-01) and PostgreSQL epoch (2000-01-01).
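
Timestamps such as commitTime and sendTime are microseconds since the PostgreSQL epoch, so converting them to std/times values means adding this offset. A standalone sketch, in which pgEpochOffsetSec, pgMicrosToTime and timeToPgMicros are hypothetical names that mirror the constant above:

```nim
import std/[math, times]

const pgEpochOffsetSec = 946_684_800'i64   # same value as pgEpochOffset

proc pgMicrosToTime(us: int64): Time =
  ## Microseconds since 2000-01-01 UTC -> std/times Time.
  ## floorDiv/floorMod keep the sub-second part non-negative for
  ## timestamps before the PostgreSQL epoch.
  let secs = floorDiv(us, 1_000_000'i64)
  let micros = floorMod(us, 1_000_000'i64)
  result = initTime(secs + pgEpochOffsetSec, int(micros) * 1000)

proc timeToPgMicros(t: Time): int64 =
  ## Inverse conversion, truncating to microsecond precision.
  result = (t.toUnix - pgEpochOffsetSec) * 1_000_000 +
           int64(t.nanosecond div 1000)
```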

Procs

proc `$`(lsn: Lsn): string {.raises: [], tags: [], forbids: [].}
Format an LSN as "X/Y" hex string.
proc `<`(a, b: Lsn): bool {.borrow, raises: [], tags: [], forbids: [].}
proc `<=`(a, b: Lsn): bool {.borrow, raises: [], tags: [], forbids: [].}
proc `==`(a, b: Lsn): bool {.borrow, raises: [], tags: [], forbids: [].}
proc `>`(a, b: Lsn): bool {.inline, raises: [], tags: [], forbids: [].}
proc `>=`(a, b: Lsn): bool {.inline, raises: [], tags: [], forbids: [].}
proc connectReplication(config: ConnConfig; mode: ReplicationMode = rmDatabase): Future[
    PgConnection] {.raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Connect to PostgreSQL with the replication startup parameter set. rmDatabase enables logical replication commands (IDENTIFY_SYSTEM, CREATE_REPLICATION_SLOT, START_REPLICATION ... LOGICAL, etc.) against the chosen database. rmPhysical opens a physical replication connection (replication=true); only replication commands work — no SQL on user databases is permitted.
proc connectReplication(dsn: string; mode: ReplicationMode = rmDatabase): Future[
    PgConnection] {.raises: [PgError, Exception, ValueError, CatchableError],
    tags: [ReadIOEffect, RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
DSN-string variant of connectReplication. See the ConnConfig overload for the meaning of mode.
proc createReplicationSlot(conn: PgConnection; slotName: string;
                           plugin: string = "pgoutput"; temporary: bool = false;
                           timeout: async_backend.Duration = ZeroDuration): Future[
    ReplicationSlotInfo] {.stackTrace: false, raises: [Exception, ValueError,
    CatchableError, PgConnectionError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Create a logical replication slot. Returns slot info including the consistent point LSN.

On timeout, the connection is marked csClosed (protocol out of sync).

proc currentPgTimestamp(): int64 {.raises: [], tags: [TimeEffect], forbids: [].}
Current time as microseconds since the PostgreSQL epoch (2000-01-01 UTC).
proc decodePgOutput(msg: XLogData): PgOutputMessage {.raises: [ProtocolError],
    tags: [], forbids: [].}
Convenience: decode the pgoutput message from an XLogData's data field.
proc dropReplicationSlot(conn: PgConnection; slotName: string;
                         wait: bool = false;
                         timeout: async_backend.Duration = ZeroDuration): Future[
    void] {.stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Drop a replication slot.

On timeout, the connection is marked csClosed (protocol out of sync).

proc identifySystem(conn: PgConnection;
                    timeout: async_backend.Duration = ZeroDuration): Future[
    SystemInfo] {.stackTrace: false, raises: [Exception, ValueError,
    CatchableError, PgConnectionError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Execute IDENTIFY_SYSTEM and return system identification info.

On timeout, the connection is marked csClosed (protocol out of sync).

proc parseLsn(s: string): Lsn {.raises: [ValueError], tags: [], forbids: [].}
Parse an LSN from "X/Y" hex string. Raises ValueError on invalid format.
proc parsePgOutputMessage(data: openArray[byte]): PgOutputMessage {.
    raises: [ProtocolError], tags: [], forbids: [].}
Decode a pgoutput logical decoding message from raw WAL bytes.
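
To give a feel for what decoding involves, the sketch below parses only the pgoutput Begin message, whose wire layout in protocol version 1 is a 'B' tag byte followed by a big-endian Int64 final LSN, Int64 commit timestamp and Int32 transaction ID. It is a standalone illustration, not this module's implementation: SketchBegin, readBe and parseBeginSketch are hypothetical names, and it raises ValueError where the module raises ProtocolError.

```nim
type SketchBegin = object
  finalLsn: uint64
  commitTime: int64   # microseconds since the PostgreSQL epoch
  xid: int32

proc readBe(buf: openArray[byte]; pos, width: int): uint64 =
  ## Read `width` bytes starting at `pos` as a big-endian unsigned integer.
  for i in 0 ..< width:
    result = (result shl 8) or uint64(buf[pos + i])

proc parseBeginSketch(data: openArray[byte]): SketchBegin =
  ## 1 tag byte + 8 + 8 + 4 payload bytes = 21 bytes.
  if data.len < 21 or data[0] != byte('B'):
    raise newException(ValueError, "not a pgoutput Begin message")
  result.finalLsn = readBe(data, 1, 8)
  result.commitTime = cast[int64](readBe(data, 9, 8))
  result.xid = cast[int32](uint32(readBe(data, 17, 4)))
```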
proc parseReplicationMessage(copyData: seq[byte]): ReplicationMessage {.
    raises: [ProtocolError], tags: [], forbids: [].}
Parse a CopyData payload into a ReplicationMessage.
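
In the streaming replication protocol, a CopyData payload starts with a tag byte: 'w' for XLogData (three big-endian Int64 header fields, then the WAL bytes) and 'k' for a primary keepalive (two Int64 fields and a reply-requested byte). The standalone sketch below models that split. SketchKind, SketchMessage, readBe64 and parseCopyDataSketch are hypothetical names, and ValueError stands in for the module's ProtocolError.

```nim
type
  SketchKind = enum skXLogData, skKeepalive
  SketchMessage = object
    case kind: SketchKind
    of skXLogData:
      startLsn, walEnd: uint64
      sendTime: int64
      data: seq[byte]
    of skKeepalive:
      serverWalEnd: uint64
      serverTime: int64
      replyRequested: bool

proc readBe64(buf: openArray[byte]; pos: int): uint64 =
  ## Read 8 bytes at `pos` as a big-endian unsigned integer.
  for i in 0 ..< 8:
    result = (result shl 8) or uint64(buf[pos + i])

proc parseCopyDataSketch(payload: openArray[byte]): SketchMessage =
  if payload.len == 0:
    raise newException(ValueError, "empty CopyData payload")
  case char(payload[0])
  of 'w':
    if payload.len < 25:
      raise newException(ValueError, "truncated XLogData header")
    # Everything after the 25-byte header is plugin-dependent WAL data.
    var body = newSeq[byte](payload.len - 25)
    for i in 0 ..< body.len:
      body[i] = payload[25 + i]
    result = SketchMessage(kind: skXLogData,
                           startLsn: readBe64(payload, 1),
                           walEnd: readBe64(payload, 9),
                           sendTime: cast[int64](readBe64(payload, 17)),
                           data: body)
  of 'k':
    if payload.len < 18:
      raise newException(ValueError, "truncated keepalive")
    result = SketchMessage(kind: skKeepalive,
                           serverWalEnd: readBe64(payload, 1),
                           serverTime: cast[int64](readBe64(payload, 9)),
                           replyRequested: payload[17] != 0)
  else:
    raise newException(ValueError, "unknown replication message tag")
```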
proc readReplicationSlot(conn: PgConnection; slotName: string;
                         timeout: async_backend.Duration = ZeroDuration): Future[
    ReplicationSlotInfo] {.stackTrace: false, raises: [Exception, ValueError,
    CatchableError, PgConnectionError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Read information about an existing replication slot.

On timeout, the connection is marked csClosed (protocol out of sync).

proc receivedEndLsn(msg: XLogData): Lsn {.raises: [], tags: [], forbids: [].}
End LSN of the WAL data actually contained in this message (startLsn + len(data)). Use this when acknowledging received data via sendStandbyStatus; do not use walEnd, which is the server's current WAL position and may point past data this message does not carry.
proc sendCopyData(conn: PgConnection; data: openArray[byte]): Future[void] {.
    raises: [PgConnectionError, Exception, SslError, ValueError],
    tags: [RootEffect], forbids: [].}

Send a raw CopyData frame to the server during a CopyBoth stream (i.e. while the connection is in csReplicating). Useful for protocols layered on top of CopyBoth — for example, physical replication acknowledgements or custom replication plugins that exchange messages the library does not know about. For Standby Status Updates, prefer sendStandbyStatus which builds the payload for you.

data is encoded into a CopyData frame synchronously before the first async suspension, so the caller's buffer does not need to outlive the returned Future.

proc sendStandbyStatus(conn: PgConnection; receiveLsn: Lsn;
                       flushLsn: Lsn = InvalidLsn; applyLsn: Lsn = InvalidLsn;
                       replyRequested: bool = false): Future[void] {.
    stackTrace: false,
    raises: [Exception, PgConnectionError, SslError, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Send a Standby Status Update to the server during replication streaming. Must be called while the connection is in csReplicating state.
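
For reference, the Standby Status Update defined by the streaming replication protocol is a 34-byte payload: an 'r' tag, three big-endian Int64 LSNs (written, flushed, applied), an Int64 client clock in microseconds since the PostgreSQL epoch, and a reply-requested byte. The standalone sketch below builds such a payload. putBe64 and buildStandbyStatus are hypothetical names, and how this module fills in flushLsn/applyLsn when they are left at InvalidLsn is not modelled.

```nim
proc putBe64(buf: var seq[byte]; v: uint64) =
  ## Append `v` as 8 big-endian bytes.
  for shift in countdown(56, 0, 8):
    buf.add byte((v shr shift) and 0xFF'u64)

proc buildStandbyStatus(receiveLsn, flushLsn, applyLsn: uint64;
                        clockMicros: int64;
                        replyRequested: bool): seq[byte] =
  ## Encode the payload that would be wrapped in a CopyData frame.
  result = newSeqOfCap[byte](34)
  result.add byte('r')
  putBe64(result, receiveLsn)
  putBe64(result, flushLsn)
  putBe64(result, applyLsn)
  putBe64(result, cast[uint64](clockMicros))
  result.add(if replyRequested: 1'u8 else: 0'u8)
```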
proc startPhysicalReplication(conn: PgConnection; startLsn: Lsn;
                              slotName: string = ""; timeline: int32 = 0;
                              autoKeepaliveReply: bool = true;
                              callback: ReplicationCallback): Future[void] {.
    stackTrace: false, raises: [Exception, PgConnectionError, SslError,
                                ValueError, ProtocolError, PgQueryError,
                                AsyncTimeoutError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Begin physical replication streaming.

slotName is optional; pass "" for a slot-less stream. timeline is appended as TIMELINE n when non-zero — useful when the standby is following a specific timeline and must abort if the primary advanced past it.

The callback contract matches startReplication: each XLogData or PrimaryKeepalive is delivered as a ReplicationMessage. The raw WAL bytes inside XLogData.data are the physical WAL stream; no pgoutput decoding applies.

autoKeepaliveReply behaves identically to startReplication: when true, PrimaryKeepalive(replyRequested=true) is acknowledged with the highest observed receivedEndLsn before the callback is invoked.

On a timeline switch the server may send a final result set describing the next timeline (RowDescription + DataRow + CommandComplete) between CopyDone and ReadyForQuery. This proc drains and discards those messages; callers that need the next-timeline information should re-issue IDENTIFY_SYSTEM after this proc returns.

proc startReplication(conn: PgConnection; slotName: string;
                      startLsn: Lsn = InvalidLsn;
                      options: seq[(string, string)] = @[];
                      autoKeepaliveReply: bool = true;
                      callback: ReplicationCallback): Future[void] {.
    stackTrace: false, raises: [Exception, PgConnectionError, ValueError,
                                SslError, ProtocolError, PgQueryError,
                                AsyncTimeoutError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Begin logical replication streaming from the given slot.

The callback is invoked for each XLogData or PrimaryKeepalive message received. The callback is awaited, providing natural TCP backpressure. Within the callback, use sendStandbyStatus to acknowledge received data.

When autoKeepaliveReply is true (the default), the library responds automatically to PrimaryKeepalive messages with replyRequested = true before invoking the callback, sending the highest receivedEndLsn (startLsn + data.len) observed so far across received XLogData messages — or the caller-supplied startLsn if no XLogData has arrived yet — as receive/flush/apply LSN. This prevents silent disconnects from wal_sender_timeout when the callback is slow. The keepalive is still delivered to the callback. Set autoKeepaliveReply = false to manage replies manually — for example, when the flush/apply LSN must reflect callback-side progress (durable writes) rather than what has merely been received from the wire.

If no XLogData has arrived and startLsn was left at its default InvalidLsn (0/0), the auto-reply will carry 0/0 for receive/flush/apply. PostgreSQL treats this as "position unknown" and will not move confirmed_flush_lsn backwards, so the reply is still useful for resetting wal_sender_timeout without risking data loss.

If the auto-reply itself fails (for example, the connection is lost between receiving the keepalive and writing the Standby Status Update), the exception is propagated out of startReplication and the callback is not invoked for that keepalive.

The proc returns when the server sends CopyDone or the connection closes. To stop replication from the client side, call stopReplication from within the callback (or from a concurrent task).
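
The LSN selection rule used by the automatic keepalive reply can be modelled in a few lines. This is a standalone sketch of the behaviour described above, not the module's code: AckTracker, initAckTracker, onXLogData and replyLsn are hypothetical names, and plain uint64 values stand in for Lsn.

```nim
type AckTracker = object
  highestReceived: uint64   # LSN to report as receive/flush/apply
  sawData: bool             # whether any XLogData has arrived yet

proc initAckTracker(startLsn: uint64): AckTracker =
  ## Until data arrives, the reply carries the caller-supplied start LSN
  ## (0 when it was left at InvalidLsn).
  result = AckTracker(highestReceived: startLsn)

proc onXLogData(t: var AckTracker; startLsn: uint64; dataLen: int) =
  ## Track the highest end of data actually received (startLsn + data.len).
  ## The server's walEnd is deliberately ignored.
  let endLsn = startLsn + uint64(dataLen)
  if not t.sawData or endLsn > t.highestReceived:
    t.highestReceived = endLsn
    t.sawData = true

proc replyLsn(t: AckTracker): uint64 =
  ## Value to send when a keepalive asks for a reply.
  result = t.highestReceived
```

A consumer that sets autoKeepaliveReply = false would keep a similar tracker, but advance the flush and apply positions only after its own writes are durable.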

proc stopReplication(conn: PgConnection): Future[void] {.stackTrace: false,
    raises: [Exception, PgConnectionError, SslError, ValueError],
    tags: [RootEffect], forbids: [].}
Send CopyDone to gracefully terminate the replication stream. The server will respond with CopyDone and ReadyForQuery, which will be handled by the startReplication recv loop.
proc timelineHistory(conn: PgConnection; timeline: int32;
                     timeout: async_backend.Duration = ZeroDuration): Future[
    TimelineHistory] {.stackTrace: false, raises: [Exception, ValueError,
    CatchableError, PgConnectionError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Execute TIMELINE_HISTORY <tli> and return the history file metadata plus its raw contents. Required when a physical standby needs to follow a timeline switch on the primary.

On timeout, the connection is marked csClosed (protocol out of sync).

proc toString(field: TupleField): string {.raises: [], tags: [], forbids: [].}
Convert a TupleField's data to a string by copying the bytes.

Templates

template makeReplicationCallback(body: untyped): ReplicationCallback
Create a ReplicationCallback that works with both asyncdispatch and chronos. Inside body, the current message is available as msg: ReplicationMessage.
template toInt64(lsn: Lsn): int64
Get the LSN as int64 (for wire protocol encoding).
template toUInt64(lsn: Lsn): uint64
Get the raw uint64 value of an LSN.