Transport-layer buffering and message I/O.
- recvBuf/sendBuf management (compact, fill, send)
- Synchronous backend-message parsing (nextMessage) and the async wrapper recvMessage
- Notification/Notice dispatch (called from nextMessage)
- Transport teardown (closeTransport)
- TCP keepalive / TCP_NODELAY socket options
- Host helpers (isUnixSocket, unixSocketPath, getHosts)
- makeCopyOutCallback / makeCopyInCallback cross-backend templates
Re-exported through pg_connection.nim; depends only on types.nim and the protocol/error/backend abstraction modules.
Types
RecvWatch = ref object
-
A single in-flight background socket read used to watch for an unsolicited backend message while the client is busy sending.
A ref so it can be passed into and mutated by async helpers (a var of a value type cannot be captured across an await).
Contract: at most one background read per connection is in flight at a time. The read carries no per-read timeout — bound the whole operation with an outer wait. Before reusing the normal recv path (fillRecvBuf / nextMessage on freshly read bytes) the watch must be settled: either consume it via take + await, or drop it with cancel immediately before raising.
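The contract above can be modelled as a small state holder. The following is a minimal sketch on top of std/asyncdispatch only; `Watch`, `startWatch`, `ready`, `take`, and `rearm` here are hypothetical stand-ins that mirror the names used in the text, not the module's actual signatures, and the "read" is just a `Future[int]` supplied by the caller.

```nim
import std/asyncdispatch

type
  Watch = ref object        # hypothetical stand-in for RecvWatch
    fut: Future[int]        # the single in-flight read; nil once taken

proc startWatch(read: Future[int]): Watch =
  ## Begin watching: remember the one background read.
  Watch(fut: read)

proc ready(w: Watch): bool =
  ## True once the background read has completed and can be taken.
  w.fut != nil and w.fut.finished

proc take(w: Watch): Future[int] =
  ## Hand the read to the caller; the watch is settled afterwards.
  doAssert w.fut != nil, "no read in flight"
  result = w.fut
  w.fut = nil

proc rearm(w: Watch; read: Future[int]) =
  ## Only legal after the previous read was consumed via take.
  doAssert w.fut == nil, "previous read not consumed"
  w.fut = read

when isMainModule:
  let pending = newFuture[int]("demo.read")
  let w = startWatch(pending)
  doAssert not w.ready
  pending.complete(5)           # simulate the socket read finishing
  doAssert w.ready
  doAssert waitFor(w.take()) == 5
  w.rearm(newFuture[int]("demo.read2"))
```

The `doAssert` in `rearm` encodes the "at most one background read in flight" rule; cancellation is deliberately left out because its behavior differs between chronos and asyncdispatch.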
Vars
MSG_DONTWAIT {.importc, header: "<sys/socket.h>".}: cint
TCP_NODELAY {.importc, header: "<netinet/tcp.h>".}: cint
Procs
proc cancel(w: RecvWatch) {....raises: [Exception], tags: [RootEffect], forbids: [].}
- Abandon any in-flight read. Must be followed immediately by raising/exit: on chronos the read is cancelled asynchronously (cancelSoon), so starting a new read before unwinding would race the cancellation against the shared recvBuf. On asyncdispatch (no cancellation) the read keeps running; its eventual result is swallowed so it never surfaces as an unhandled future error.
proc closeTransport(conn: PgConnection): owned(Future[void]) {. ...stackTrace: false, raises: [Exception, LibraryError, SslError], tags: [RootEffect], forbids: [].}
- Close transport resources without sending Terminate.
proc compactRecvBuf(conn: PgConnection) {.inline, ...raises: [], tags: [], forbids: [].}
- Shift unconsumed data to the front of recvBuf, reclaiming space consumed by the read pointer. Called only before reading new data from the socket.
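As an illustration of the compaction step, here is a minimal, self-contained sketch. `RecvBuffer`, its `data`/`pos` fields, and `compact` are hypothetical names chosen for the example; the real connection object may lay out its buffer differently.

```nim
type
  RecvBuffer = object   # hypothetical stand-in for the connection's buffer state
    data: seq[byte]     # bytes received from the socket
    pos: int            # read pointer: data[0 ..< pos] is already consumed

proc compact(b: var RecvBuffer) =
  ## Move data[pos ..< len] to the front and reset the read pointer.
  if b.pos == 0:
    return                          # nothing consumed, nothing to reclaim
  let remaining = b.data.len - b.pos
  if remaining > 0:
    # moveMem tolerates overlapping source and destination ranges.
    moveMem(addr b.data[0], addr b.data[b.pos], remaining)
  b.data.setLen(remaining)
  b.pos = 0

when isMainModule:
  var b = RecvBuffer(data: @[1'u8, 2, 3, 4, 5], pos: 3)
  b.compact()
  doAssert b.data == @[4'u8, 5]
  doAssert b.pos == 0
```

Compacting only right before a socket read keeps the parse path free of copies: the parser just advances `pos`, and the shift is paid once per read.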
proc configureKeepalive(fd: posix.SocketHandle; config: ConnConfig) {. ...raises: [PgConnectionError], tags: [], forbids: [].}
- Set TCP keepalive options on the socket.
proc configureTcpNoDelay(fd: posix.SocketHandle) {....raises: [], tags: [], forbids: [].}
- Disable Nagle's algorithm for low-latency sends.
proc dispatchNotice(conn: PgConnection; msg: BackendMessage) {....raises: [], tags: [RootEffect], forbids: [].}
proc dispatchNotification(conn: PgConnection; msg: BackendMessage) {....raises: [], tags: [RootEffect], forbids: [].}
proc fillRecvBuf(conn: PgConnection; timeout: Duration = ZeroDuration): Future[void] {....stackTrace: false, raises: [Exception, AsyncTimeoutError, CatchableError, PgConnectionError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Read data from socket into buffer. The only await point for message reception.
On AsyncTimeoutError the caller (typically invalidateOnTimeout) is responsible for the state transition. On any other CatchableError (transport failure, cancellation, etc.) the connection is marked csClosed before re-raising, since the read may have consumed an indeterminate number of bytes from the socket and the stream is no longer parseable.
proc getHosts(config: ConnConfig): seq[HostEntry] {....raises: [], tags: [], forbids: [].}
- Return the list of hosts to try. If hosts is populated, return it; otherwise synthesize a single entry from host/port.
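The fallback rule can be sketched as follows. The `HostEntry` and `ConnConfig` types below are hypothetical reductions to the three fields named in the description (`hosts`, `host`, `port`); the real types likely carry more fields, and `hostsToTry` is an illustrative name.

```nim
type
  HostEntry = object    # hypothetical shape
    host: string
    port: int
  ConnConfig = object   # hypothetical subset of the real config
    host: string
    port: int
    hosts: seq[HostEntry]

proc hostsToTry(config: ConnConfig): seq[HostEntry] =
  ## Prefer the explicit multi-host list; otherwise fall back to host/port.
  if config.hosts.len > 0:
    config.hosts
  else:
    @[HostEntry(host: config.host, port: config.port)]

when isMainModule:
  let single = ConnConfig(host: "db.internal", port: 5432)
  doAssert hostsToTry(single) == @[HostEntry(host: "db.internal", port: 5432)]
```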
proc isConnected(conn: PgConnection): bool {....raises: [], tags: [], forbids: [].}
-
Whether the underlying transport is present and the OS has not yet observed a peer-side close.
Cheap, non-blocking (no round trip): checks that the connection object holds a transport handle, and on POSIX also issues a single recv(MSG_PEEK | MSG_DONTWAIT) via socketHasFin to catch FIN/RST already sitting in the kernel buffer (half-open detection). On non-POSIX platforms the check falls back to handle presence only.
Pair with state == csReady to decide whether a connection is usable before issuing a query. Use ping for a full server round trip when the OS-level probe is insufficient (e.g. TLS-layer state, application liveness rather than transport liveness).
proc isUnixSocket(host: string): bool {.inline, ...raises: [], tags: [], forbids: [].}
- True if host names a Unix socket directory, i.e. it starts with '/'. This matches libpq's behavior.
proc nextMessage(conn: PgConnection; rowData: RowData = nil; rowCount: ptr int32 = nil): Option[BackendMessage] {. ...raises: [PgProtocolError], tags: [RootEffect], forbids: [].}
-
Synchronously parse the next message from the receive buffer. Returns none if the buffer doesn't contain a complete message. Notification/Notice messages are dispatched internally. ParameterStatus messages are recorded into conn.serverParams and consumed, so callers never see them. DataRow messages are counted (if rowCount != nil) and consumed.
On PgProtocolError the protocol stream is desynchronised — the connection is transitioned to csClosed before re-raising so that it is never reused (in particular, by the connection pool).
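The "returns none on an incomplete message" behavior follows from PostgreSQL's framing: one type byte, then a big-endian Int32 length that counts itself but not the type byte. Below is a minimal sketch of just that framing step; `RawMessage`, `FrameError`, and `tryParseFrame` are hypothetical names, and the real nextMessage additionally decodes and dispatches message bodies.

```nim
import std/options

type
  RawMessage = object                     # hypothetical: undecoded frame
    kind: char
    payload: seq[byte]
  FrameError = object of CatchableError   # stand-in for PgProtocolError

proc tryParseFrame(buf: openArray[byte]; pos: var int): Option[RawMessage] =
  ## Returns none and leaves pos untouched when buf[pos ..] does not yet
  ## hold a whole message; advances pos past the message otherwise.
  const headerLen = 5                     # 1 type byte + 4 length bytes
  if buf.len - pos < headerLen:
    return none(RawMessage)
  let length = (int(buf[pos + 1]) shl 24) or (int(buf[pos + 2]) shl 16) or
               (int(buf[pos + 3]) shl 8) or int(buf[pos + 4])
  if length < 4:
    # The length field includes its own 4 bytes, so anything smaller
    # means the stream is desynchronised.
    raise newException(FrameError, "invalid message length: " & $length)
  let total = 1 + length
  if buf.len - pos < total:
    return none(RawMessage)
  var msg = RawMessage(kind: char(buf[pos]))
  for i in pos + headerLen ..< pos + total:
    msg.payload.add buf[i]
  pos += total
  some(msg)

when isMainModule:
  # ReadyForQuery: 'Z', length 5, one status byte 'I'.
  let full = @[byte('Z'), 0, 0, 0, 5, byte('I')]
  var pos = 0
  doAssert tryParseFrame(full[0 .. 3], pos).isNone and pos == 0
  let m = tryParseFrame(full, pos)
  doAssert m.isSome and m.get.kind == 'Z' and pos == 6
```

Because a short buffer leaves `pos` unchanged, the caller can read more bytes and retry the same parse without any rollback logic.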
proc rearm(w: RecvWatch; conn: PgConnection) {. ...raises: [Exception, AsyncTimeoutError, CatchableError, PgConnectionError], tags: [RootEffect, TimeEffect], forbids: [].}
- Resume watching with a fresh background read. Only call once the previous read has been consumed (take + await), never while one is still in flight.
proc recvMessage(conn: PgConnection; timeout = ZeroDuration; rowData: RowData = nil; rowCount: ptr int32 = nil): Future[BackendMessage] {....stackTrace: false, raises: [Exception, ValueError, PgProtocolError, AsyncTimeoutError, CatchableError, PgConnectionError], tags: [RootEffect, TimeEffect], forbids: [].}
- Receive a single backend message from the connection. Thin wrapper around nextMessage + fillRecvBuf for backward compatibility.
proc sendBufMsg(conn: PgConnection): Future[void] {....stackTrace: false, raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
- Send conn.sendBuf to the server. The transport receives its own copy of the buffer, so conn.sendBuf is safe to mutate while the returned Future is still pending.
proc sendMsg(conn: PgConnection; data: seq[byte]): Future[void] {. ...stackTrace: false, raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
- Send raw bytes to the PostgreSQL server over the connection.
proc sendRawBytes(socket: AsyncSocket; data: seq[byte]): Future[void] {. ...raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
- Send seq[byte] via asyncdispatch socket.
proc sendRawData(socket: AsyncSocket; p: pointer; len: int): Future[void] {. ...raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
- Send raw bytes via asyncdispatch socket. Copies data into a string once.
proc socketHasFin(conn: PgConnection): bool {....raises: [], tags: [], forbids: [].}
-
Non-blocking OS-level half-open probe (POSIX only).
Returns true when the kernel has already observed a peer-side FIN/RST on this connection's underlying socket. Returns false when the socket is alive and idle, when there is pending data (which the next operation will handle), when the probe hits transient kernel resource exhaustion (ENOMEM/ENOBUFS, which says nothing about peer state), or when there is no transport handle to probe (e.g. mock connections, or after close).
A single recv(MSG_PEEK | MSG_DONTWAIT) syscall — no round trip. For TLS connections this still detects TCP-level FIN/RST, but not TLS-layer errors that haven't been read yet; use ping for that.
On non-POSIX platforms this always returns false (no probe available).
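A minimal POSIX-only sketch of such a probe is shown below. `peerHasClosed` is a hypothetical name, and two points are assumptions rather than documented behavior: the sketch operates on a bare `SocketHandle` instead of a connection, and it treats every errno other than EAGAIN/EWOULDBLOCK/ENOMEM/ENOBUFS as a dead peer.

```nim
import std/posix

# Declared by hand, as in the Vars section above.
var MSG_DONTWAIT {.importc, header: "<sys/socket.h>".}: cint

proc peerHasClosed(fd: SocketHandle): bool =
  ## One non-blocking peek; consumes nothing from the socket.
  var b: byte
  let n = recv(fd, addr b, 1, MSG_PEEK or MSG_DONTWAIT)
  if n == 0:
    return true      # orderly shutdown: FIN already observed
  if n > 0:
    return false     # pending data; the next read will handle it
  let e = errno
  if e == EAGAIN or e == EWOULDBLOCK or e == ENOMEM or e == ENOBUFS:
    return false     # idle socket, or transient kernel exhaustion
  true               # assumption: e.g. ECONNRESET means the peer is gone

when isMainModule:
  var fds: array[0..1, cint]
  doAssert socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0
  doAssert not peerHasClosed(SocketHandle(fds[0]))   # alive and idle
  discard close(fds[1])                              # peer goes away
  doAssert peerHasClosed(SocketHandle(fds[0]))
  discard close(fds[0])
```

The demo uses a Unix-domain socket pair purely so it runs without a server; the same recv flags apply to a TCP socket.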
proc socketHasPendingData(conn: PgConnection): bool {....raises: [], tags: [], forbids: [].}
-
Non-blocking OS-level check: does the kernel currently hold readable bytes on this connection's socket? (POSIX only.)
Used by SSL negotiation to detect pre-TLS plaintext injection (CVE-2021-23214 / CVE-2021-23222 family): after a server answers the SSLRequest with 'S' it must stay silent until the client sends the TLS ClientHello, so any byte already readable was injected by a man-in-the-middle to be smuggled ahead of the encrypted stream.
A single recv(MSG_PEEK | MSG_DONTWAIT) syscall — no round trip. Only a positive read of buffered bytes yields true. Returns false when the socket is idle (EAGAIN), when the peer has closed (FIN: nothing was injected), on EINTR/other transient errors, and where the probe is unavailable (non-POSIX, or no transport handle).
Note: this sees only bytes still in the kernel buffer. Data the higher-level transport has already drained into its own buffer (the chronos StreamTransport may do this) is invisible here and must be detected by the caller reading more than the single response byte.
Fail open: any non-data outcome (idle, FIN, transient or other error) yields false so a probe error never rejects a legitimate connection.
proc startRecvWatch(conn: PgConnection): RecvWatch {. ...raises: [Exception, AsyncTimeoutError, CatchableError, PgConnectionError], tags: [RootEffect, TimeEffect], forbids: [].}
- Begin watching for an unsolicited backend message. The bytes are committed to recvBuf when the read completes; poll with ready, then take + await (immediate once ready) and parse with nextMessage.
proc unixSocketPath(host: string; port: int): string {....raises: [], tags: [], forbids: [].}
- Build the libpq-compatible Unix socket file path: {dir}/.s.PGSQL.{port}.
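The two Unix-socket helpers (isUnixSocket and unixSocketPath) amount to a prefix check and a string join. A minimal sketch under the documented `{dir}/.s.PGSQL.{port}` format; `looksLikeUnixSocket` and `socketPathFor` are hypothetical names, and how the real helper treats a trailing slash in the directory is not specified here.

```nim
proc looksLikeUnixSocket(host: string): bool =
  ## A host starting with '/' is taken to be a socket directory.
  host.len > 0 and host[0] == '/'

proc socketPathFor(dir: string; port: int): string =
  ## Join directory and port into the socket file path.
  dir & "/.s.PGSQL." & $port

when isMainModule:
  doAssert looksLikeUnixSocket("/var/run/postgresql")
  doAssert not looksLikeUnixSocket("localhost")
  doAssert socketPathFor("/var/run/postgresql", 5432) ==
    "/var/run/postgresql/.s.PGSQL.5432"
```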
Templates
template makeCopyInCallback(body: untyped): CopyInCallback
-
Create a CopyInCallback that works with both asyncdispatch and chronos. body must evaluate to seq[byte]. Return an empty seq to signal completion.
With asyncdispatch, anonymous async procs cannot return non-void types, so this template wraps the body in manual Future construction.
    var idx = 0
    let rows = @["1\tAlice\n".toBytes(), "2\tBob\n".toBytes()]
    let cb = makeCopyInCallback:
      if idx < rows.len:
        let chunk = rows[idx]
        inc idx
        chunk
      else:
        newSeq[byte]()
template makeCopyOutCallback(body: untyped): CopyOutCallback
-
Create a CopyOutCallback that works with both asyncdispatch and chronos. Inside body, the current chunk is available as data: seq[byte].
    var chunks: seq[seq[byte]]
    let cb = makeCopyOutCallback:
      chunks.add(data)