async_postgres/pg_connection/notify

LISTEN / NOTIFY plumbing.

  • onNotify / onListenError / listen / unlisten — channel subscription API.
  • startListening / stopListening / listenPump — background pump that converts incoming NotificationResponse messages into queue/callback dispatch, with auto-reconnect on transport failure.
  • reconnectInPlace — replace the dead transport on the existing PgConnection object and re-LISTEN every subscribed channel so external references survive the reconnect.
  • waitNotification — async pull entry point with timeout and overflow detection.

Imports lifecycle.connect for reconnectInPlace and simple_query for the LISTEN/UNLISTEN round trips. Re-exported through pg_connection.nim.

Procs

proc listen(conn: PgConnection; channel: string): Future[void] {.
    ...stackTrace: false, raises: [Exception, CancelledError, ValueError,
                                PgConnectionError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Subscribe to a LISTEN channel and start the background notification pump.
proc listenPump(conn: PgConnection): owned(Future[void]) {....stackTrace: false,
    raises: [Exception], tags: [RootEffect, TimeEffect, WriteIOEffect],
    forbids: [].}
Background loop that repeatedly receives messages and dispatches notifications. Non-notification messages are discarded (recvMessage handles dispatch). On connection failure, the pump attempts automatic reconnection with exponential backoff (up to listenReconnectMaxAttempts attempts; 0 or negative means unlimited) and re-subscribes to all channels. It exits cleanly once the connection state leaves csListening (stopListening triggers this by sending an empty query) and then drains messages until ReadyForQuery.
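The retry policy described above (exponential backoff, an attempt cap where 0 or negative means unlimited) can be sketched in isolation. This is a minimal model built only on std/asyncdispatch, not the module's actual implementation: reconnectWithBackoff, ReconnectError, attemptConnect, baseDelayMs and maxDelayMs are hypothetical names, and the delay values are assumptions chosen for illustration.

```nim
import std/asyncdispatch

type ReconnectError = object of CatchableError  # hypothetical error type

proc reconnectWithBackoff(attemptConnect: proc (): Future[void] {.closure.};
                          maxAttempts: int;
                          baseDelayMs = 10;
                          maxDelayMs = 1000): Future[int] {.async.} =
  ## Calls `attemptConnect` until it succeeds and returns the number of
  ## attempts used. `maxAttempts <= 0` means "retry forever".
  var attempt = 0
  var delayMs = baseDelayMs
  while true:
    inc attempt
    try:
      await attemptConnect()
      return attempt
    except CatchableError:
      # Give up only when a positive cap has been reached.
      if maxAttempts > 0 and attempt >= maxAttempts:
        raise newException(ReconnectError,
                           "gave up after " & $attempt & " attempts")
    await sleepAsync(delayMs)
    delayMs = min(delayMs * 2, maxDelayMs)  # double the delay, capped

when isMainModule:
  var calls = 0
  proc flaky(): Future[void] {.async.} =
    # Simulated transport: fails twice, then succeeds.
    inc calls
    if calls < 3:
      raise newException(IOError, "transport down")

  let used = waitFor reconnectWithBackoff(flaky, maxAttempts = 5)
  echo "reconnected after ", used, " attempts"
```

In the real pump, a successful reconnect is followed by re-subscribing to every channel, and exhausting the attempts is what makes the pump die permanently.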
proc onListenError(conn: PgConnection; callback: proc (err: ref PgListenError) {.
    ...gcsafe, raises: [].}) {....raises: [], tags: [], forbids: [].}
Set a callback invoked when the listen pump dies permanently (reconnection failed, or the connection was lost with no channels left to re-subscribe). Push API (onNotify) users have no other way to learn the pump is gone; pull API users see the same failure raised from waitNotification.
proc onNotify(conn: PgConnection; callback: NotifyCallback) {....raises: [],
    tags: [], forbids: [].}
Set a callback invoked for each incoming NOTIFY message.
proc reconnectInPlace(conn: PgConnection): owned(Future[void]) {.
    ...stackTrace: false,
    raises: [Exception, LibraryError, SslError, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Reconnect using stored config, re-LISTENing on all channels.
proc startListening(conn: PgConnection) {....raises: [Exception],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc stopListening(conn: PgConnection): owned(Future[void]) {....stackTrace: false,
    raises: [Exception, CancelledError, ValueError], tags: [RootEffect],
    forbids: [].}
Stop the background listen pump and return the connection to csReady.
proc unlisten(conn: PgConnection; channel: string): Future[void] {.
    ...stackTrace: false, raises: [Exception, CancelledError, ValueError,
                                PgConnectionError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Unsubscribe from a LISTEN channel. Stops the pump if no channels remain.
proc waitNotification(conn: PgConnection; timeout: Duration = ZeroDuration): Future[
    Notification] {....stackTrace: false, raises: [Exception, ValueError,
    PgNotifyOverflowError, PgListenError, PgConnectionError, PgError,
    PgTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
Wait for the next notification from the buffer. If the buffer is empty, the returned future completes when a notification arrives or fails when the timeout expires. Raises PgNotifyOverflowError if notifications were dropped due to queue overflow. Raises PgListenError if the listen pump has died (e.g. reconnection failed).
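The pull semantics above (buffered delivery, overflow reporting, optional timeout) can be modelled with a small bounded queue. The sketch below is illustrative only and does not use the module's types: NoteBuffer, Note, push, waitNext, OverflowError and WaitTimeoutError are hypothetical names. It also assumes that the newest notification is the one dropped on overflow and that a timeout of 0 means "wait indefinitely"; the source states neither.

```nim
import std/[asyncdispatch, deques]

type
  Note = object                              # stand-in for Notification
    channel, payload: string
  OverflowError = object of CatchableError   # stand-in for PgNotifyOverflowError
  WaitTimeoutError = object of CatchableError
  NoteBuffer = ref object
    items: Deque[Note]
    capacity: int
    overflowed: bool       # set when a notification had to be dropped
    waiter: Future[void]   # completed when an item arrives for a waiting reader

proc newNoteBuffer(capacity: int): NoteBuffer =
  NoteBuffer(items: initDeque[Note](), capacity: capacity)

proc push(buf: NoteBuffer; n: Note) =
  ## Called by the (simulated) pump for each incoming notification.
  if buf.items.len >= buf.capacity:
    buf.overflowed = true  # assumption: drop the newest item, remember the loss
    return
  buf.items.addLast(n)
  if buf.waiter != nil and not buf.waiter.finished:
    buf.waiter.complete()

proc waitNext(buf: NoteBuffer; timeoutMs = 0): Future[Note] {.async.} =
  ## Returns the next buffered item, reporting any earlier overflow first.
  if buf.overflowed:
    buf.overflowed = false
    raise newException(OverflowError, "notifications were dropped")
  if buf.items.len == 0:
    buf.waiter = newFuture[void]("waitNext")
    if timeoutMs > 0:
      let arrived = await withTimeout(buf.waiter, timeoutMs)
      if not arrived:
        raise newException(WaitTimeoutError, "no notification within timeout")
    else:
      await buf.waiter
  return buf.items.popFirst()

when isMainModule:
  let buf = newNoteBuffer(capacity = 2)
  buf.push(Note(channel: "jobs", payload: "a"))
  buf.push(Note(channel: "jobs", payload: "b"))
  buf.push(Note(channel: "jobs", payload: "c"))  # dropped, sets the overflow flag
  try:
    discard waitFor buf.waitNext()
  except OverflowError:
    echo "overflow detected"
  echo (waitFor buf.waitNext()).payload
```

Reporting the overflow before handing out further items lets a consumer learn that it missed data and, for example, resynchronize from the database rather than trust the notification stream alone.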