PostgreSQL Logical Replication support.
Provides types and procedures for consuming a logical replication stream via the PostgreSQL streaming replication protocol. The streaming API is plugin-agnostic (raw WAL bytes are delivered to a callback). A built-in decoder for the pgoutput logical decoding plugin is included.
Quick start
let conn = await connectReplication("postgresql://user:pass@host/db")
defer: await conn.close()
let slot = await conn.createReplicationSlot("my_slot", "pgoutput", temporary = true)
# options is a seq[(string, string)], so the table-constructor literal needs the @ prefix
await conn.startReplication("my_slot", slot.consistentPoint,
  options = @{"proto_version": "'1'", "publication_names": "'my_pub'"},
  callback = myCallback)
Types
BeginMessage = object
  finalLsn*: Lsn      ## LSN of the commit record
  commitTime*: int64  ## Commit timestamp (microseconds since PG epoch)
  xid*: int32         ## Transaction ID
- Transaction begin.
CommitMessage = object
  flags*: byte
  commitLsn*: Lsn
  endLsn*: Lsn
  commitTime*: int64
- Transaction commit.
DeleteMessage = object
  relationId*: int32
  oldTuple*: seq[TupleField]
- Row deletion.
InsertMessage = object
  relationId*: int32
  newTuple*: seq[TupleField]
- Row insertion.
LogicalMessage = object
  flags*: byte  ## Bit 0: transactional
  lsn*: Lsn
  prefix*: string
  content*: seq[byte]
- Generic logical decoding message (via pg_logical_emit_message).
Lsn = distinct uint64
- PostgreSQL Log Sequence Number (LSN). Displayed as "X/Y", where X and Y are the hex-encoded upper and lower 32-bit halves.
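The "X/Y" notation can be illustrated with a minimal sketch. `formatLsn`, `parseLsn`, and `hexNoPad` are hypothetical helper names, not procs documented by this module, and `Lsn` is redeclared locally so the snippet compiles on its own. The sketch assumes PostgreSQL's usual unpadded uppercase hex rendering.

```nim
import std/strutils

type Lsn = distinct uint64  # local stand-in mirroring the documented type

proc hexNoPad(x: uint32): string =
  ## Uppercase hex without leading zeros ("0" for zero).
  result = toHex(x).strip(trailing = false, chars = {'0'})
  if result.len == 0:
    result = "0"

proc formatLsn(lsn: Lsn): string =
  ## Render the upper and lower 32-bit halves as "X/Y".
  let v = uint64(lsn)
  hexNoPad(uint32(v shr 32)) & "/" & hexNoPad(uint32(v and 0xFFFF_FFFF'u64))

proc parseLsn(s: string): Lsn =
  ## Inverse of formatLsn; raises ValueError on malformed input.
  let parts = s.split('/')
  if parts.len != 2:
    raise newException(ValueError, "expected X/Y, got: " & s)
  Lsn((fromHex[uint64](parts[0]) shl 32) or fromHex[uint64](parts[1]))

echo formatLsn(parseLsn("16/B374D848"))
```

Keeping `Lsn` distinct from `uint64` means the two halves cannot be mixed up with ordinary integers by accident; the conversion is explicit at the boundary.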
OriginMessage = object
  originLsn*: Lsn
  originName*: string
- Replication origin.
PgOutputMessage = object
  case kind*: PgOutputMessageKind
  of pomkBegin:
    begin*: BeginMessage
  of pomkCommit:
    commit*: CommitMessage
  of pomkOrigin:
    origin*: OriginMessage
  of pomkRelation:
    relation*: RelationInfo
  of pomkType:
    typeMsg*: TypeMessage
  of pomkInsert:
    insert*: InsertMessage
  of pomkUpdate:
    update*: UpdateMessage
  of pomkDelete:
    delete*: DeleteMessage
  of pomkTruncate:
    truncate*: TruncateMessage
  of pomkMessage:
    message*: LogicalMessage
- A decoded pgoutput plugin message.
PgOutputMessageKind = enum
  pomkBegin, pomkCommit, pomkOrigin, pomkRelation, pomkType,
  pomkInsert, pomkUpdate, pomkDelete, pomkTruncate, pomkMessage
- Message types within the pgoutput logical decoding plugin.
PrimaryKeepalive = object
  walEnd*: Lsn           ## Current end of WAL on the server
  sendTime*: int64       ## Server send time (microseconds since PG epoch)
  replyRequested*: bool  ## Whether the server wants an immediate status reply
- Keepalive message from the server.
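As a rough illustration of where these three fields come from, the sketch below decodes a keepalive payload using the layout given in the PostgreSQL streaming replication protocol: a `'k'` tag byte, two big-endian 64-bit integers, and a one-byte reply flag. `decodeKeepalive` and `readInt64BE` are hypothetical names, the object is a local stand-in that uses `uint64` in place of `Lsn`, and none of this is taken from the module's source.

```nim
type
  PrimaryKeepalive = object  # local stand-in for the documented type
    walEnd: uint64           # stand-in for Lsn
    sendTime: int64
    replyRequested: bool

proc readInt64BE(data: openArray[byte]; offset: int): uint64 =
  ## Read 8 bytes starting at offset as a big-endian unsigned integer.
  for i in 0 ..< 8:
    result = (result shl 8) or uint64(data[offset + i])

proc decodeKeepalive(payload: openArray[byte]): PrimaryKeepalive =
  ## Layout: 'k' (1 byte), walEnd (8), sendTime (8), replyRequested (1).
  if payload.len < 18 or payload[0] != byte(ord('k')):
    raise newException(ValueError, "not a primary keepalive payload")
  result = PrimaryKeepalive(
    walEnd: readInt64BE(payload, 1),
    sendTime: cast[int64](readInt64BE(payload, 9)),
    replyRequested: payload[17] != 0)
```

When `replyRequested` is true, the server expects a status update promptly; otherwise it may eventually drop the connection as timed out.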
RelationCache = Table[int32, RelationInfo]
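- Cache of relation metadata received during replication. The server sends a Relation message before the first DML event that references a table, and sends it again if the table definition changes; clients must cache these messages to interpret later row events, which carry only the relation ID.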
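A minimal sketch of how such a cache might be maintained and consulted. `remember` and `qualifiedName` are hypothetical helper names, and `RelationInfo` is redeclared locally with only a subset of the documented fields so the snippet stands alone.

```nim
import std/tables

type
  RelationInfo = object  # local stand-in: subset of the documented fields
    relationId: int32
    namespace: string
    name: string
  RelationCache = Table[int32, RelationInfo]

proc remember(cache: var RelationCache; rel: RelationInfo) =
  ## Store or overwrite the entry; a later Relation message for the same
  ## ID replaces the earlier definition.
  cache[rel.relationId] = rel

proc qualifiedName(cache: RelationCache; relationId: int32): string =
  ## Resolve the relation ID carried by a row event to "schema.table".
  if relationId in cache:
    let rel = cache[relationId]
    result = rel.namespace & "." & rel.name
  else:
    result = "<unknown relation " & $relationId & ">"

var cache = initTable[int32, RelationInfo]()
cache.remember(RelationInfo(relationId: 16384, namespace: "public", name: "users"))
echo cache.qualifiedName(16384)
```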
RelationColumn = object
  flags*: byte  ## Bit 0: part of replica identity key
  name*: string
  typeOid*: int32
  typeMod*: int32
- A single column in a relation definition.
RelationInfo = object
  relationId*: int32
  namespace*: string      ## Schema name
  name*: string           ## Table name
  replicaIdentity*: char  ## 'd' (default), 'n' (nothing), 'f' (full), 'i' (index)
  columns*: seq[RelationColumn]
- Relation (table) metadata sent by pgoutput before DML events.
ReplicationCallback = proc (msg: ReplicationMessage): Future[void] {.gcsafe.}
- Callback invoked for each replication message during streaming.
ReplicationMessage = object
  case kind*: ReplicationMessageKind
  of rmkXLogData:
    xlogData*: XLogData
  of rmkPrimaryKeepalive:
    keepalive*: PrimaryKeepalive
- A single message received during replication streaming.
ReplicationMessageKind = enum
  rmkXLogData, rmkPrimaryKeepalive
- Replication message types (decoded from CopyData during streaming).
ReplicationSlotInfo = object
  slotName*: string
  consistentPoint*: Lsn  ## confirmed_flush_lsn (logical) or restart_lsn (physical)
  snapshotName*: string  ## Snapshot name (only available at CREATE time)
  outputPlugin*: string
- Information about a replication slot.
SystemInfo = object
  systemId*: string
  timeline*: int32
  xLogPos*: Lsn
  dbName*: string
- Result of IDENTIFY_SYSTEM command.
TruncateMessage = object
  options*: byte  ## Bit 0: CASCADE, bit 1: RESTART IDENTITY
  relationIds*: seq[int32]
- Table truncation.
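The `options` bit layout can be unpacked with a short sketch. `truncateFlags` is a hypothetical helper name and is not part of the documented API.

```nim
proc truncateFlags(options: byte): tuple[cascade, restartIdentity: bool] =
  ## Bit 0 set means CASCADE; bit 1 set means RESTART IDENTITY.
  (cascade: (options and 0x01'u8) != 0,
   restartIdentity: (options and 0x02'u8) != 0)

let flags = truncateFlags(0x03'u8)
echo "cascade=", flags.cascade, " restartIdentity=", flags.restartIdentity
```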
TupleDataKind = enum
  tdkNull = 110,      ## NULL value
  tdkText = 116,      ## Text-formatted value
  tdkBinary = 98,     ## Binary-formatted value (protocol_version >= 2)
  tdkUnchanged = 117  ## TOAST value unchanged
- Kind of a single field value in a pgoutput tuple.
TupleField = object
  kind*: TupleDataKind
  data*: seq[byte]  ## Empty for null/unchanged
- A single field value in a pgoutput tuple.
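The numeric values of `TupleDataKind` are the ASCII codes of the tag bytes pgoutput places before each column (`'n'`, `'t'`, `'b'`, `'u'`). The sketch below shows one way a consumer might render a field for logging. `render` is a hypothetical helper name, and both types are local stand-ins, with the enum members listed in ascending numeric order in this copy.

```nim
type
  TupleDataKind = enum  # local stand-in, members in ascending numeric order
    tdkBinary = 98,     # 'b'
    tdkNull = 110,      # 'n'
    tdkText = 116,      # 't'
    tdkUnchanged = 117  # 'u'
  TupleField = object   # local stand-in for the documented type
    kind: TupleDataKind
    data: seq[byte]

proc render(field: TupleField): string =
  ## Human-readable form of a field; only text values carry printable data.
  case field.kind
  of tdkNull:
    result = "NULL"
  of tdkUnchanged:
    result = "<unchanged TOAST value>"
  of tdkBinary:
    result = "<" & $field.data.len & " binary bytes>"
  of tdkText:
    result = newString(field.data.len)
    for i, b in field.data:
      result[i] = char(b)

echo render(TupleField(kind: tdkText, data: @[104'u8, 105'u8]))
```

Treating `tdkUnchanged` separately from `tdkNull` matters for updates: an unchanged TOAST column means "keep the previous value", not "set to NULL".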
TypeMessage = object
  typeId*: int32
  namespace*: string
  name*: string
- Custom type definition.
UpdateMessage = object
  relationId*: int32
  hasOldTuple*: bool  ## True if old key/full row is included
  oldTuple*: seq[TupleField]
  newTuple*: seq[TupleField]
- Row update.
Consts
InvalidLsn = 0'u
- Sentinel value representing an invalid or unset LSN.
pgEpochOffset = 946684800'i64
- Seconds between Unix epoch (1970-01-01) and PostgreSQL epoch (2000-01-01).
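Timestamps such as `commitTime` and `sendTime` are microseconds since the PostgreSQL epoch, so converting them to a standard-library `Time` means rescaling and adding this offset. The sketch below shows one way to do that. `pgTimestampToTime` is a hypothetical helper name, and the constant is redeclared locally with the documented value.

```nim
import std/[math, times]

const pgEpochOffset = 946684800'i64  # same value as the documented constant

proc pgTimestampToTime(pgMicros: int64): Time =
  ## Split into whole seconds and a non-negative sub-second remainder,
  ## then shift the seconds onto the Unix epoch.
  let secs = floorDiv(pgMicros, 1_000_000'i64) + pgEpochOffset
  let nanos = floorMod(pgMicros, 1_000_000'i64) * 1_000
  initTime(secs, int(nanos))

echo pgTimestampToTime(0).utc.format("yyyy-MM-dd HH:mm:ss")
```

Floor division is used instead of `div`/`mod` so that timestamps before 2000-01-01 still yield a valid, non-negative nanosecond component.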
Procs
proc connectReplication(config: ConnConfig): Future[PgConnection] {.
    raises: [Exception, ValueError, AsyncTimeoutError, CancelledError, PgConnectionError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Connect to PostgreSQL with replication=database in startup parameters. This enables replication commands (IDENTIFY_SYSTEM, CREATE_REPLICATION_SLOT, etc.).
proc connectReplication(dsn: string): Future[PgConnection] {.
    raises: [PgError, Exception, ValueError, AsyncTimeoutError, CancelledError, PgConnectionError],
    tags: [ReadIOEffect, RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Connect to PostgreSQL with replication=database using a DSN string.
proc createReplicationSlot(conn: PgConnection; slotName: string;
    plugin: string = "pgoutput"; temporary: bool = false): Future[ReplicationSlotInfo] {.
    stackTrace: false,
    raises: [Exception, ValueError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
- Create a logical replication slot. Returns slot info including the consistent point LSN.
proc currentPgTimestamp(): int64 {.raises: [], tags: [TimeEffect], forbids: [].}
- Current time as microseconds since the PostgreSQL epoch (2000-01-01 UTC).
proc decodePgOutput(msg: XLogData): PgOutputMessage {.raises: [ProtocolError], tags: [], forbids: [].}
- Convenience: decode the pgoutput message from an XLogData's data field.
proc dropReplicationSlot(conn: PgConnection; slotName: string; wait: bool = false): Future[void] {.
    stackTrace: false,
    raises: [Exception, ValueError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect], forbids: [].}
- Drop a replication slot.
proc identifySystem(conn: PgConnection): Future[SystemInfo] {.
    stackTrace: false,
    raises: [Exception, ValueError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
- Execute IDENTIFY_SYSTEM and return system identification info.
proc parsePgOutputMessage(data: openArray[byte]): PgOutputMessage {.raises: [ProtocolError], tags: [], forbids: [].}
- Decode a pgoutput logical decoding message from raw WAL bytes.
proc parseReplicationMessage(copyData: seq[byte]): ReplicationMessage {.raises: [ProtocolError], tags: [], forbids: [].}
- Parse a CopyData payload into a ReplicationMessage.
proc readReplicationSlot(conn: PgConnection; slotName: string): Future[ReplicationSlotInfo] {.
    stackTrace: false,
    raises: [Exception, ValueError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
- Read information about an existing replication slot.
proc sendStandbyStatus(conn: PgConnection; receiveLsn: Lsn; flushLsn: Lsn = InvalidLsn;
    applyLsn: Lsn = InvalidLsn; replyRequested: bool = false): Future[void] {.
    stackTrace: false,
    raises: [Exception, PgConnectionError, SslError, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}
- Send a Standby Status Update to the server during replication streaming. Must be called while the connection is in csReplicating state.
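For orientation, the sketch below encodes a Standby Status Update payload as described in the PostgreSQL streaming replication protocol: an `'r'` tag byte, three big-endian 64-bit LSNs (written, flushed, applied), the client clock in microseconds since the PostgreSQL epoch, and a one-byte reply flag. `encodeStandbyStatus` and `putInt64BE` are hypothetical names, plain `uint64` stands in for `Lsn`, and this is an assumption about the wire format rather than a copy of what `sendStandbyStatus` does internally.

```nim
proc putInt64BE(buf: var seq[byte]; v: uint64) =
  ## Append v as 8 big-endian bytes.
  for shift in countdown(56, 0, 8):
    buf.add byte((v shr shift) and 0xFF'u64)

proc encodeStandbyStatus(receiveLsn, flushLsn, applyLsn: uint64;
                         clockMicros: int64; replyRequested: bool): seq[byte] =
  ## Build the 34-byte CopyData payload for a status update.
  result = @[byte(ord('r'))]
  result.putInt64BE(receiveLsn)
  result.putInt64BE(flushLsn)
  result.putInt64BE(applyLsn)
  result.putInt64BE(cast[uint64](clockMicros))
  result.add(if replyRequested: 1'u8 else: 0'u8)

let payload = encodeStandbyStatus(0x16B374D848'u64, 0, 0, 0, false)
echo payload.len
```

The flushed position is the one that lets the server advance the slot and recycle WAL, so a consumer would normally report it only after the corresponding changes are durably stored on its side.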
proc startReplication(conn: PgConnection; slotName: string; startLsn: Lsn = InvalidLsn;
    options: seq[(string, string)] = @[]; callback: ReplicationCallback): Future[void] {.
    stackTrace: false,
    raises: [Exception, PgConnectionError, ValueError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect], forbids: [].}
- Begin logical replication streaming from the given slot.
The callback is invoked for each XLogData or PrimaryKeepalive message received. The callback is awaited, providing natural TCP backpressure. Within the callback, use sendStandbyStatus to acknowledge received data.
The proc returns when the server sends CopyDone or the connection closes. To stop replication from the client side, call stopReplication from within the callback (or from a concurrent task).
proc stopReplication(conn: PgConnection): Future[void] {.
    stackTrace: false,
    raises: [Exception, PgConnectionError, SslError, ValueError],
    tags: [RootEffect], forbids: [].}
- Send CopyDone to gracefully terminate the replication stream. The server will respond with CopyDone and ReadyForQuery, which will be handled by the startReplication recv loop.
proc toString(field: TupleField): string {.raises: [], tags: [], forbids: [].}
- Convert a TupleField's data to a string by copying the bytes.
Templates
template makeReplicationCallback(body: untyped): ReplicationCallback
- Create a ReplicationCallback that works with both asyncdispatch and chronos. Inside body, the current message is available as msg: ReplicationMessage.