async_postgres/pg_replication

Search:
Group by:

PostgreSQL Logical Replication support.

Provides types and procedures for consuming a logical replication stream via the PostgreSQL streaming replication protocol. The streaming API is plugin-agnostic (raw WAL bytes are delivered to a callback). A built-in decoder for the pgoutput logical decoding plugin is included.

Quick start

let conn = await connectReplication("postgresql://user:pass@host/db")
defer: await conn.close()
let slot = await conn.createReplicationSlot("my_slot", "pgoutput", temporary = true)
await conn.startReplication("my_slot", slot.consistentPoint,
    options = {"proto_version": "'1'", "publication_names": "'my_pub'"},
    callback = myCallback)

Types

BeginMessage = object
  finalLsn*: Lsn             ## LSN of the commit record
  commitTime*: int64         ## Commit timestamp (microseconds since PG epoch)
  xid*: int32                ## Transaction ID
Transaction begin.
CommitMessage = object
  flags*: byte
  commitLsn*: Lsn
  endLsn*: Lsn
  commitTime*: int64
Transaction commit.
DeleteMessage = object
  relationId*: int32
  oldTuple*: seq[TupleField]
Row deletion.
InsertMessage = object
  relationId*: int32
  newTuple*: seq[TupleField]
Row insertion.
LogicalMessage = object
  flags*: byte               ## Bit 0: transactional
  lsn*: Lsn
  prefix*: string
  content*: seq[byte]
Generic logical decoding message (via pg_logical_emit_message).
Lsn = distinct uint64
LSN (Log Sequence Number) PostgreSQL Log Sequence Number. Displayed as "X/Y" where X and Y are hex-encoded upper and lower 32-bit halves.
OriginMessage = object
  originLsn*: Lsn
  originName*: string
Replication origin.
PgOutputMessage = object
  case kind*: PgOutputMessageKind
  of pomkBegin:
    begin*: BeginMessage
  of pomkCommit:
    commit*: CommitMessage
  of pomkOrigin:
    origin*: OriginMessage
  of pomkRelation:
    relation*: RelationInfo
  of pomkType:
    typeMsg*: TypeMessage
  of pomkInsert:
    insert*: InsertMessage
  of pomkUpdate:
    update*: UpdateMessage
  of pomkDelete:
    delete*: DeleteMessage
  of pomkTruncate:
    truncate*: TruncateMessage
  of pomkMessage:
    message*: LogicalMessage
A decoded pgoutput plugin message.
PgOutputMessageKind = enum
  pomkBegin, pomkCommit, pomkOrigin, pomkRelation, pomkType, pomkInsert,
  pomkUpdate, pomkDelete, pomkTruncate, pomkMessage
Message types within the pgoutput logical decoding plugin.
PrimaryKeepalive = object
  walEnd*: Lsn               ## Current end of WAL on the server
  sendTime*: int64           ## Server send time (microseconds since PG epoch)
  replyRequested*: bool      ## Whether the server wants an immediate status reply
Keepalive message from the server.
RelationCache = Table[int32, RelationInfo]
Cache of relation metadata received during replication. The server sends a Relation message before the first DML for each table in a transaction; clients must cache them.
RelationColumn = object
  flags*: byte               ## Bit 0: part of replica identity key
  name*: string
  typeOid*: int32
  typeMod*: int32
A single column in a relation definition.
RelationInfo = object
  relationId*: int32
  namespace*: string         ## Schema name
  name*: string              ## Table name
  replicaIdentity*: char     ## 'd' (default), 'n' (nothing), 'f' (full), 'i' (index)
  columns*: seq[RelationColumn]
Relation (table) metadata sent by pgoutput before DML events.
ReplicationCallback = proc (msg: ReplicationMessage): Future[void] {....gcsafe.}
Callback invoked for each replication message during streaming.
ReplicationMessage = object
  case kind*: ReplicationMessageKind
  of rmkXLogData:
    xlogData*: XLogData
  of rmkPrimaryKeepalive:
    keepalive*: PrimaryKeepalive
A single message received during replication streaming.
ReplicationMessageKind = enum
  rmkXLogData, rmkPrimaryKeepalive
Replication message types (decoded from CopyData during streaming)
ReplicationSlotInfo = object
  slotName*: string
  consistentPoint*: Lsn      ## confirmed_flush_lsn (logical) or restart_lsn (physical)
  snapshotName*: string      ## Snapshot name (only available at CREATE time)
  outputPlugin*: string
Information about a replication slot.
SystemInfo = object
  systemId*: string
  timeline*: int32
  xLogPos*: Lsn
  dbName*: string
Result of IDENTIFY_SYSTEM command.
TruncateMessage = object
  options*: byte             ## Bit 0: CASCADE, bit 1: RESTART IDENTITY
  relationIds*: seq[int32]
Table truncation.
TupleDataKind = enum
  tdkNull = 110,            ## NULL value
  tdkText = 116,            ## Text-formatted value
  tdkBinary = 98,           ## Binary-formatted value (protocol_version >= 2)
  tdkUnchanged = 117         ## TOAST value unchanged
Kind of a single field value in a pgoutput tuple.
TupleField = object
  kind*: TupleDataKind
  data*: seq[byte]           ## Empty for null/unchanged
A single field value in a pgoutput tuple.
TypeMessage = object
  typeId*: int32
  namespace*: string
  name*: string
Custom type definition.
UpdateMessage = object
  relationId*: int32
  hasOldTuple*: bool         ## True if old key/full row is included
  oldTuple*: seq[TupleField]
  newTuple*: seq[TupleField]
Row update.
XLogData = object
  startLsn*: Lsn             ## Start of the WAL data in this message
  endLsn*: Lsn               ## Current end of WAL on the server
  sendTime*: int64           ## Server send time (microseconds since PG epoch)
  data*: seq[byte]           ## Raw WAL data (plugin-dependent format)
WAL data payload from the server.

Consts

InvalidLsn = 0'u
Sentinel value representing an invalid or unset LSN.
pgEpochOffset = 946684800'i64
Seconds between Unix epoch (1970-01-01) and PostgreSQL epoch (2000-01-01).

Procs

proc `$`(lsn: Lsn): string {....raises: [], tags: [], forbids: [].}
Format an LSN as "X/Y" hex string.
proc `<`(a, b: Lsn): bool {.borrow, ...raises: [], tags: [], forbids: [].}
proc `<=`(a, b: Lsn): bool {.borrow, ...raises: [], tags: [], forbids: [].}
proc `==`(a, b: Lsn): bool {.borrow, ...raises: [], tags: [], forbids: [].}
proc connectReplication(config: ConnConfig): Future[PgConnection] {....raises: [
    Exception, ValueError, AsyncTimeoutError, CancelledError, PgConnectionError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Connect to PostgreSQL with replication=database in startup parameters. This enables replication commands (IDENTIFY_SYSTEM, CREATE_REPLICATION_SLOT, etc.).
proc connectReplication(dsn: string): Future[PgConnection] {....raises: [PgError,
    Exception, ValueError, AsyncTimeoutError, CancelledError, PgConnectionError],
    tags: [ReadIOEffect, RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Connect to PostgreSQL with replication=database using a DSN string.
proc createReplicationSlot(conn: PgConnection; slotName: string;
                           plugin: string = "pgoutput"; temporary: bool = false): Future[
    ReplicationSlotInfo] {....stackTrace: false, raises: [Exception, ValueError,
    PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError,
    PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
Create a logical replication slot. Returns slot info including the consistent point LSN.
proc currentPgTimestamp(): int64 {....raises: [], tags: [TimeEffect], forbids: [].}
Current time as microseconds since the PostgreSQL epoch (2000-01-01 UTC).
proc decodePgOutput(msg: XLogData): PgOutputMessage {....raises: [ProtocolError],
    tags: [], forbids: [].}
Convenience: decode the pgoutput message from an XLogData's data field.
proc dropReplicationSlot(conn: PgConnection; slotName: string;
                         wait: bool = false): Future[void] {....stackTrace: false, raises: [
    Exception, ValueError, PgConnectionError, SslError, ProtocolError,
    PgQueryError, AsyncTimeoutError], tags: [RootEffect, TimeEffect],
    forbids: [].}
Drop a replication slot.
proc identifySystem(conn: PgConnection): Future[SystemInfo] {....stackTrace: false, raises: [
    Exception, ValueError, PgConnectionError, SslError, ProtocolError,
    PgQueryError, AsyncTimeoutError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute IDENTIFY_SYSTEM and return system identification info.
proc parseLsn(s: string): Lsn {....raises: [ValueError], tags: [], forbids: [].}
Parse an LSN from "X/Y" hex string. Raises ValueError on invalid format.
proc parsePgOutputMessage(data: openArray[byte]): PgOutputMessage {.
    ...raises: [ProtocolError], tags: [], forbids: [].}
Decode a pgoutput logical decoding message from raw WAL bytes.
proc parseReplicationMessage(copyData: seq[byte]): ReplicationMessage {.
    ...raises: [ProtocolError], tags: [], forbids: [].}
Parse a CopyData payload into a ReplicationMessage.
proc readReplicationSlot(conn: PgConnection; slotName: string): Future[
    ReplicationSlotInfo] {....stackTrace: false, raises: [Exception, ValueError,
    PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError,
    PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
Read information about an existing replication slot.
proc sendStandbyStatus(conn: PgConnection; receiveLsn: Lsn;
                       flushLsn: Lsn = InvalidLsn; applyLsn: Lsn = InvalidLsn;
                       replyRequested: bool = false): Future[void] {.
    ...stackTrace: false,
    raises: [Exception, PgConnectionError, SslError, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Send a Standby Status Update to the server during replication streaming. Must be called while the connection is in csReplicating state.
proc startReplication(conn: PgConnection; slotName: string;
                      startLsn: Lsn = InvalidLsn;
                      options: seq[(string, string)] = @[];
                      callback: ReplicationCallback): Future[void] {.
    ...stackTrace: false, raises: [Exception, PgConnectionError, ValueError,
                                SslError, ProtocolError, PgQueryError,
                                AsyncTimeoutError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Begin logical replication streaming from the given slot.

The callback is invoked for each XLogData or PrimaryKeepalive message received. The callback is awaited, providing natural TCP backpressure. Within the callback, use sendStandbyStatus to acknowledge received data.

The proc returns when the server sends CopyDone or the connection closes. To stop replication from the client side, call stopReplication from within the callback (or from a concurrent task).

proc stopReplication(conn: PgConnection): Future[void] {....stackTrace: false,
    raises: [Exception, PgConnectionError, SslError, ValueError],
    tags: [RootEffect], forbids: [].}
Send CopyDone to gracefully terminate the replication stream. The server will respond with CopyDone and ReadyForQuery, which will be handled by the startReplication recv loop.
proc toString(field: TupleField): string {....raises: [], tags: [], forbids: [].}
Convert a TupleField's data to a string by copying the bytes.

Templates

template makeReplicationCallback(body: untyped): ReplicationCallback
Create a ReplicationCallback that works with both asyncdispatch and chronos. Inside body, the current message is available as msg: ReplicationMessage.
template toInt64(lsn: Lsn): int64
Get the LSN as int64 (for wire protocol encoding).
template toUInt64(lsn: Lsn): uint64
Get the raw uint64 value of an LSN.