LISTEN / NOTIFY plumbing.
- onNotify / onListenError / listen / unlisten — channel subscription API.
- startListening / stopListening / listenPump — background pump that converts incoming NotificationResponse messages into queue/ callback dispatch, with auto-reconnect on transport failure.
- reconnectInPlace — replace the dead transport on the existing PgConnection object and re-LISTEN every subscribed channel so external references survive the reconnect.
- waitNotification — async pull entry point with timeout and overflow detection.
Imports lifecycle.connect for reconnectInPlace and simple_query for the LISTEN/UNLISTEN round trips. Re-exported through pg_connection.nim.
Procs
proc listen(conn: PgConnection; channel: string): Future[void] {. ...stackTrace: false, raises: [Exception, CancelledError, ValueError, PgConnectionError, CatchableError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Subscribe to a LISTEN channel and start the background notification pump.
proc listenPump(conn: PgConnection): owned(Future[void]) {....stackTrace: false, raises: [Exception], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Background loop: repeatedly receives messages, dispatching notifications. Non-notification messages are discarded (recvMessage handles dispatch). On connection failure, attempts automatic reconnection with exponential backoff (up to listenReconnectMaxAttempts attempts; 0 or negative = unlimited) and re-subscribes to all channels. Exits cleanly when state changes from csListening (via stopListening sending an empty query), then drains until ReadyForQuery.
proc onListenError(conn: PgConnection; callback: proc (err: ref PgListenError) {. ...gcsafe, raises: [].}) {....raises: [], tags: [], forbids: [].}
- Set a callback invoked when the listen pump dies permanently (reconnection failed, or the connection was lost with no channels left to re-subscribe). Push API (onNotify) users have no other way to learn the pump is gone; pull API users see the same failure raised from waitNotification.
proc onNotify(conn: PgConnection; callback: NotifyCallback) {....raises: [], tags: [], forbids: [].}
- Set a callback invoked for each incoming NOTIFY message.
proc reconnectInPlace(conn: PgConnection): owned(Future[void]) {. ...stackTrace: false, raises: [Exception, LibraryError, SslError, ValueError, CatchableError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Reconnect using stored config, re-LISTENing on all channels.
proc startListening(conn: PgConnection) {....raises: [Exception], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc stopListening(conn: PgConnection): owned(Future[void]) {....stackTrace: false, raises: [Exception, CancelledError, ValueError], tags: [RootEffect], forbids: [].}
- Stop the background listen pump and return the connection to csReady.
proc unlisten(conn: PgConnection; channel: string): Future[void] {. ...stackTrace: false, raises: [Exception, CancelledError, ValueError, PgConnectionError, CatchableError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Unsubscribe from a LISTEN channel. Stops the pump if no channels remain.
proc waitNotification(conn: PgConnection; timeout: Duration = ZeroDuration): Future[ Notification] {....stackTrace: false, raises: [Exception, ValueError, PgNotifyOverflowError, PgListenError, PgConnectionError, PgError, PgTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Wait for the next notification from the buffer. If the buffer is empty, blocks until a notification arrives or timeout expires. Raises PgNotifyOverflowError if notifications were dropped due to queue overflow. Raises PgListenError if the listen pump has died (e.g. reconnection failed).