Transport-layer buffering and message I/O.
- recvBuf/sendBuf management (compact, fill, send)
- Synchronous backend-message parsing (nextMessage) and the async wrapper recvMessage
- Notification/Notice dispatch (called from nextMessage)
- Transport teardown (closeTransport)
- TCP keepalive / TCP_NODELAY socket options
- Host helpers (isUnixSocket, unixSocketPath, getHosts)
- makeCopyOutCallback / makeCopyInCallback cross-backend templates
Re-exported through pg_connection.nim; depends only on types.nim and the protocol/error/backend abstraction modules.
Vars
MSG_DONTWAIT {.importc, header: "<sys/socket.h>".}: cint
TCP_NODELAY {.importc, header: "<netinet/tcp.h>".}: cint
Procs
proc closeTransport(conn: PgConnection): owned(Future[void]) {. ...stackTrace: false, raises: [Exception, LibraryError, SslError], tags: [RootEffect], forbids: [].}
- Close transport resources without sending Terminate.
proc compactRecvBuf(conn: PgConnection) {.inline, ...raises: [], tags: [], forbids: [].}
- Shift unconsumed data to the front of recvBuf, reclaiming space consumed by the read pointer. Called only before reading new data from the socket.
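The compaction step described above can be sketched as follows. This is a minimal illustration, not the module's implementation: `RecvState` and its `recvBufStart` field are hypothetical stand-ins for the receive-side fields of PgConnection, whose real names are not shown in this reference.

```nim
# Hypothetical stand-in for the receive-side fields of PgConnection.
type
  RecvState = object
    recvBuf: seq[byte]   # bytes read from the socket so far
    recvBufStart: int    # read pointer: index of the first unconsumed byte

proc compact(s: var RecvState) =
  ## Move the unconsumed tail to index 0 and reset the read pointer.
  if s.recvBufStart == 0:
    return               # nothing consumed yet, nothing to reclaim
  let remaining = s.recvBuf.len - s.recvBufStart
  if remaining > 0:
    # moveMem tolerates overlapping source and destination ranges.
    moveMem(addr s.recvBuf[0], addr s.recvBuf[s.recvBufStart], remaining)
  s.recvBuf.setLen(remaining)
  s.recvBufStart = 0
```

Compacting only right before a socket read keeps the copy off the parsing path: while messages are being parsed, advancing the read pointer is enough.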
proc configureKeepalive(fd: posix.SocketHandle; config: ConnConfig) {. ...raises: [PgConnectionError], tags: [], forbids: [].}
- Set TCP keepalive options on the socket.
proc configureTcpNoDelay(fd: posix.SocketHandle) {....raises: [], tags: [], forbids: [].}
- Disable Nagle's algorithm for low-latency sends.
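For orientation, disabling Nagle's algorithm on a POSIX socket usually comes down to one `setsockopt` call. The sketch below assumes a POSIX platform and uses a hypothetical helper name, `disableNagle`; unlike configureTcpNoDelay it reports failure through its return value, and how the module itself treats a failed call is not stated here.

```nim
import std/posix

# Declared locally, mirroring the TCP_NODELAY import listed under Vars.
var TCP_NODELAY {.importc, header: "<netinet/tcp.h>".}: cint

proc disableNagle(fd: SocketHandle): bool =
  ## Hypothetical helper: returns true if TCP_NODELAY was set on fd.
  var on: cint = 1
  setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, addr on, SockLen(sizeof(on))) == 0
```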
proc dispatchNotice(conn: PgConnection; msg: BackendMessage) {....raises: [], tags: [RootEffect], forbids: [].}
proc dispatchNotification(conn: PgConnection; msg: BackendMessage) {....raises: [], tags: [RootEffect], forbids: [].}
proc fillRecvBuf(conn: PgConnection; timeout: Duration = ZeroDuration): Future[void] {....stackTrace: false, raises: [Exception, AsyncTimeoutError, CatchableError, PgConnectionError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Read data from socket into buffer. The only await point for message reception.
On AsyncTimeoutError the caller (typically invalidateOnTimeout) is responsible for the state transition. On any other CatchableError (transport failure, cancellation, etc.) the connection is marked csClosed before re-raising, since the read may have consumed an indeterminate number of bytes from the socket and the stream is no longer parseable.
proc getHosts(config: ConnConfig): seq[HostEntry] {....raises: [], tags: [], forbids: [].}
- Return the list of hosts to try. If hosts is populated, return it; otherwise synthesize a single entry from host/port.
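The fallback rule above can be sketched like this. The types are simplified stand-ins that contain only the fields named in the description (`hosts`, `host`, `port`); the real ConnConfig and HostEntry may carry more fields, and `hostsToTry` is a hypothetical name.

```nim
type
  HostEntry = object     # simplified stand-in
    host: string
    port: int
  ConnConfig = object    # simplified stand-in
    host: string
    port: int
    hosts: seq[HostEntry]

proc hostsToTry(config: ConnConfig): seq[HostEntry] =
  ## Prefer the explicit multi-host list; otherwise fall back to host/port.
  if config.hosts.len > 0:
    result = config.hosts
  else:
    result = @[HostEntry(host: config.host, port: config.port)]
```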
proc isConnected(conn: PgConnection): bool {....raises: [], tags: [], forbids: [].}
-
Whether the underlying transport is present and the OS has not yet observed a peer-side close.
Cheap, non-blocking (no round trip): checks that the connection object holds a transport handle, and on POSIX also issues a single recv(MSG_PEEK | MSG_DONTWAIT) via socketHasFin to catch FIN/RST already sitting in the kernel buffer (half-open detection). On non-POSIX platforms the check falls back to handle presence only.
Pair with state == csReady to decide whether a connection is usable before issuing a query. Use ping for a full server round trip when the OS-level probe is insufficient (e.g. TLS-layer state, application liveness rather than transport liveness).
proc isUnixSocket(host: string): bool {.inline, ...raises: [], tags: [], forbids: [].}
- True if host represents a Unix socket directory (starts with '/'). Compatible with libpq behavior.
proc nextMessage(conn: PgConnection; rowData: RowData = nil; rowCount: ptr int32 = nil): Option[BackendMessage] {. ...raises: [ProtocolError], tags: [RootEffect], forbids: [].}
-
Synchronously parse the next message from the receive buffer. Returns none if the buffer doesn't contain a complete message. Notification/Notice messages are dispatched internally. DataRow messages are counted (if rowCount != nil) and consumed.
On ProtocolError the protocol stream is desynchronised — the connection is transitioned to csClosed before re-raising so that it is never reused (in particular, by the connection pool).
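To make the "returns none on an incomplete message" behaviour concrete, here is a minimal sketch of framing-level parsing. It relies on the PostgreSQL wire format, in which each backend message is a one-byte type followed by a 4-byte big-endian length that counts itself but not the type byte. `RawMessage` and `tryParse` are hypothetical names, and ValueError stands in for the module's ProtocolError; the real nextMessage additionally decodes payloads, dispatches notices and notifications, and handles DataRow counting.

```nim
import std/options

type
  RawMessage = object    # hypothetical; stands in for BackendMessage
    kind: char
    payload: seq[byte]

proc tryParse(buf: openArray[byte]; pos: var int): Option[RawMessage] =
  ## Parse one framed message starting at pos, or return none if the buffer
  ## does not yet hold a complete message. pos advances only on success.
  const headerLen = 5                       # type byte + 4-byte length
  if buf.len - pos < headerLen:
    return none(RawMessage)
  let length = (int(buf[pos + 1]) shl 24) or (int(buf[pos + 2]) shl 16) or
               (int(buf[pos + 3]) shl 8) or int(buf[pos + 4])
  if length < 4:
    # The length field includes its own 4 bytes, so smaller values are invalid.
    raise newException(ValueError, "invalid message length")
  let total = 1 + length                    # type byte + length field + payload
  if buf.len - pos < total:
    return none(RawMessage)
  var msg = RawMessage(kind: char(buf[pos]))
  for i in pos + headerLen ..< pos + total:
    msg.payload.add buf[i]
  pos += total
  result = some(msg)
```

Leaving `pos` untouched on a partial message is what lets the caller read more bytes and retry the same parse.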
proc recvMessage(conn: PgConnection; timeout = ZeroDuration; rowData: RowData = nil; rowCount: ptr int32 = nil): Future[BackendMessage] {....stackTrace: false, raises: [Exception, ValueError, ProtocolError, AsyncTimeoutError, CatchableError, PgConnectionError], tags: [RootEffect, TimeEffect], forbids: [].}
- Receive a single backend message from the connection. Thin wrapper around nextMessage + fillRecvBuf for backward compatibility.
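The parse-then-fill loop implied by "nextMessage + fillRecvBuf" might look roughly like the sketch below. Everything here is a stand-in: `FakeConn`, `fill`, `nextLine` and `recvLine` are hypothetical, the "socket" is a list of pre-canned chunks, and a message is simply a newline-terminated line. The sketch uses asyncdispatch and shows only the control flow, with the fill step as the single await point.

```nim
import std/[asyncdispatch, options, strutils]

type
  FakeConn = ref object      # hypothetical stand-in for PgConnection
    buf: string              # receive buffer
    incoming: seq[string]    # chunks the fake socket will deliver

proc fill(c: FakeConn) {.async.} =
  ## Stand-in for fillRecvBuf: append the next chunk to the buffer.
  if c.incoming.len == 0:
    raise newException(IOError, "connection closed")
  c.buf.add c.incoming[0]
  c.incoming.delete(0)

proc nextLine(c: FakeConn): Option[string] =
  ## Stand-in for nextMessage: synchronous, returns none when incomplete.
  let nl = c.buf.find('\n')
  if nl < 0:
    return none(string)
  result = some(c.buf[0 ..< nl])
  c.buf = c.buf[nl + 1 .. ^1]

proc recvLine(c: FakeConn): Future[string] {.async.} =
  ## Try to parse first; only await the socket when the buffer runs dry.
  while true:
    let msg = c.nextLine()
    if msg.isSome:
      return msg.get
    await c.fill()
```

With chunks `"hel"`, `"lo\nwor"`, `"ld\n"` queued, two calls to `recvLine` yield `"hello"` and then `"world"`; the second call needs only one fill because `"wor"` is already buffered.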
proc sendBufMsg(conn: PgConnection): Future[void] {....stackTrace: false, raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
- Send conn.sendBuf to the server. The transport receives its own copy of the buffer, so conn.sendBuf is safe to mutate while the returned Future is still pending.
proc sendMsg(conn: PgConnection; data: seq[byte]): Future[void] {. ...stackTrace: false, raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
- Send raw bytes to the PostgreSQL server over the connection.
proc sendRawBytes(socket: AsyncSocket; data: seq[byte]): Future[void] {. ...raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
- Send seq[byte] via asyncdispatch socket.
proc sendRawData(socket: AsyncSocket; p: pointer; len: int): Future[void] {. ...raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
- Send raw bytes via asyncdispatch socket. Copies data into a string once.
proc socketHasFin(conn: PgConnection): bool {....raises: [], tags: [], forbids: [].}
-
Non-blocking OS-level half-open probe (POSIX only).
Returns true when the kernel has already observed a peer-side FIN/RST on this connection's underlying socket. Returns false when the socket is alive and idle, when there is pending data (which the next operation will handle), or when there is no transport handle to probe (e.g. mock connections, or after close).
A single recv(MSG_PEEK | MSG_DONTWAIT) syscall — no round trip. For TLS connections this still detects TCP-level FIN/RST, but not TLS-layer errors that haven't been read yet; use ping for that.
On non-POSIX platforms this always returns false (no probe available).
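The probe described above can be approximated on POSIX as follows. This is a sketch under assumptions, not the module's code: `peerClosed` is a hypothetical name, it takes a raw descriptor instead of a PgConnection, and its treatment of errno values (EAGAIN, EWOULDBLOCK and EINTR mean "still alive", anything else means "dead") is one plausible reading of the documented behaviour.

```nim
import std/posix

# Declared locally, mirroring the MSG_DONTWAIT import listed under Vars.
var MSG_DONTWAIT {.importc, header: "<sys/socket.h>".}: cint

proc peerClosed(fd: SocketHandle): bool =
  ## Peek one byte without blocking and without consuming it.
  var b: byte
  let n = recv(fd, addr b, 1, MSG_PEEK or MSG_DONTWAIT)
  if n == 0:
    return true            # orderly shutdown: FIN seen, no data pending
  if n > 0:
    return false           # data pending; the next read will handle it
  let err = errno
  if err == EAGAIN or err == EWOULDBLOCK or err == EINTR:
    return false           # alive and idle (or the probe was interrupted)
  result = true            # e.g. ECONNRESET: the peer reset the connection
```

Because of MSG_PEEK, a positive result leaves the pending byte in the kernel buffer, so the probe never disturbs the protocol stream.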
proc unixSocketPath(host: string; port: int): string {....raises: [], tags: [], forbids: [].}
- Build the libpq-compatible Unix socket file path: {dir}/.s.PGSQL.{port}.
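The two Unix-socket helpers can be illustrated with a short sketch. The names `looksLikeUnixSocket` and `socketPath` are hypothetical, and details such as how the real unixSocketPath treats a trailing slash in the directory are not covered by this reference.

```nim
proc looksLikeUnixSocket(host: string): bool =
  ## libpq convention: a host beginning with '/' names a socket directory.
  host.len > 0 and host[0] == '/'

proc socketPath(dir: string; port: int): string =
  ## Socket file inside dir, following the {dir}/.s.PGSQL.{port} pattern.
  dir & "/.s.PGSQL." & $port
```

For example, directory `/var/run/postgresql` with port 5432 gives `/var/run/postgresql/.s.PGSQL.5432`.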
Templates
template makeCopyInCallback(body: untyped): CopyInCallback
-
Create a CopyInCallback that works with both asyncdispatch and chronos. body must evaluate to seq[byte]. Return an empty seq to signal completion.
With asyncdispatch, anonymous async procs cannot return non-void types, so this template wraps the body in manual Future construction.
    var idx = 0
    let rows = @["1\tAlice\n".toBytes(), "2\tBob\n".toBytes()]
    let cb = makeCopyInCallback:
      if idx < rows.len:
        let chunk = rows[idx]
        inc idx
        chunk
      else:
        newSeq[byte]()
template makeCopyOutCallback(body: untyped): CopyOutCallback
-
Create a CopyOutCallback that works with both asyncdispatch and chronos. Inside body, the current chunk is available as data: seq[byte].
    var chunks: seq[seq[byte]]
    let cb = makeCopyOutCallback:
      chunks.add(data)