Types
CachedStmt = object name*: string ## Server-side statement name ("_sc_1", "_sc_2", ...) fields*: seq[FieldDescription] ## From Describe(Statement), formatCode=0 resultFormats*: seq[int16] ## Cached buildResultFormats() output colFmts*: seq[int16] ## Per-column format codes for RowData colOids*: seq[int32] ## Per-column type OIDs for RowData lruNode*: DoublyLinkedNode[string] ## Embedded LRU list node
- A cached prepared statement in the LRU statement cache.
ConnConfig = object host*: string port*: int user*: string password*: string database*: string sslMode*: SslMode sslRootCert*: string ## PEM-encoded CA certificate(s) for sslVerifyCa/sslVerifyFull applicationName*: string connectTimeout*: Duration ## TCP connect timeout (default: no timeout) keepAlive*: bool ## Enable TCP keepalive (default true via parseDsn) keepAliveIdle*: int ## Seconds before first probe (0 = OS default) keepAliveInterval*: int ## Seconds between probes (0 = OS default) keepAliveCount*: int ## Number of probes before giving up (0 = OS default) hosts*: seq[HostEntry] ## Multiple hosts for failover (empty = use host/port) targetSessionAttrs*: TargetSessionAttrs ## Target server type (default tsaAny) extraParams*: seq[(string, string)] ## Additional startup parameters
- Connection configuration. Construct via parseDsn or set fields directly.
CopyInCallback = proc (): Future[seq[byte]] {....gcsafe.}
- Callback supplying data chunks during streaming COPY IN. Return empty seq to finish.
CopyInInfo = object format*: CopyFormat columnFormats*: seq[int16] commandTag*: string
- Metadata returned when a streaming COPY IN begins.
CopyOutCallback = proc (data: seq[byte]): Future[void] {....gcsafe.}
- Callback receiving each chunk during streaming COPY OUT.
CopyOutInfo = object format*: CopyFormat columnFormats*: seq[int16] commandTag*: string
- Metadata returned when a streaming COPY OUT begins.
CopyResult = object format*: CopyFormat columnFormats*: seq[int16] data*: seq[seq[byte]] commandTag*: string
- Result of a buffered COPY OUT operation: all rows collected in memory.
HostEntry = object host*: string port*: int
- A single host:port entry for multi-host connection.
Notice = object fields*: seq[ErrorField]
- A notice or warning message from the server (not an error).
NoticeCallback = proc (notice: Notice) {....gcsafe, raises: [].}
- Callback invoked when a notice/warning message arrives.
Notification = object pid*: int32 channel*: string payload*: string
- A NOTIFY message received from PostgreSQL.
NotifyCallback = proc (notification: Notification) {....gcsafe, raises: [].}
- Callback invoked when a NOTIFY message arrives.
PgConnection = ref object when hasChronos: transport*: StreamTransport elif hasAsyncDispatch: socket*: AsyncSocket sslEnabled*: bool recvBuf*: seq[byte] recvBufStart*: int ## Read pointer into recvBuf; bytes before this are consumed state*: PgConnState pid*: int32 secretKey*: int32 serverParams*: Table[string, string] txStatus*: TransactionStatus notifyCallback*: NotifyCallback noticeCallback*: NoticeCallback listenChannels*: HashSet[string] listenTask*: Future[void] createdAt*: Moment portalCounter*: int config*: ConnConfig notifyQueue*: Deque[Notification] notifyMaxQueue*: int sendBuf*: seq[byte] ## Reusable send buffer for COPY IN batching notifyDropped*: int ## Count of notifications dropped due to queue overflow reconnectCallback*: proc () {....gcsafe, raises: [].} notifyOverflowCallback*: proc (dropped: int) {....gcsafe, raises: [].} stmtCache*: Table[string, CachedStmt] stmtCounter*: int stmtCacheCapacity*: int ## 0=disabled, default 256 rowDataBuf*: RowData ## Reusable RowData buffer to avoid per-query allocation hstoreOid*: int32 ## Dynamic OID for hstore extension type; 0 if not available
- A single PostgreSQL connection with buffered I/O and statement caching.
PgConnectionError = object of PgError
- Connection failures, disconnections, SSL/auth errors.
PgConnState = enum csConnecting, csAuthentication, csReady, csBusy, csListening, csReplicating, csClosed
- Connection lifecycle state.
PgNotifyOverflowError = object of PgError dropped*: int ## Number of notifications dropped due to queue overflow
PgPoolError = object of PgError
- Pool exhaustion, pool closed, or acquire timeout.
PgQueryError = object of PgError sqlState*: string ## 5-char SQLSTATE code (e.g. "42P01"), empty if unavailable. severity*: string ## e.g. "ERROR", "FATAL" detail*: string ## DETAIL field, empty if not present. hint*: string ## HINT field, empty if not present.
- SQL execution errors from the server (ErrorResponse).
PgTimeoutError = object of PgError
- Operation timed out.
QueryResult = object fields*: seq[FieldDescription] data*: RowData rowCount*: int32 commandTag*: string
- Result of a query: field descriptions, row data, and command tag.
RowCallback = proc (row: Row) {....gcsafe.}
- Callback invoked once per row during queryEach. The Row is only valid inside the callback — its backing buffer is reused for the next row.
SslMode = enum sslDisable, ## Disable SSL (default) sslAllow, ## Try plaintext; fall back to SSL if refused sslPrefer, ## Try SSL; fall back to plaintext if refused sslRequire, ## Require SSL (no certificate verification) sslVerifyCa, ## Require SSL + verify CA chain (no hostname verification) sslVerifyFull ## Require SSL + verify CA chain and hostname
- SSL/TLS negotiation mode for the connection.
TargetSessionAttrs = enum tsaAny, ## Connect to any server (default) tsaReadWrite, ## Read-write server (primary) tsaReadOnly, ## Read-only server (standby) tsaPrimary, ## Primary server tsaStandby, ## Standby server tsaPreferStandby ## Prefer standby, fall back to any
- Target server type for multi-host failover (libpq compatible).
Procs
proc addStmtCache(conn: PgConnection; sql: string; cached: CachedStmt) {. ...raises: [], tags: [], forbids: [].}
- Add a prepared statement to the cache with auto-computed result formats.
proc cancel(conn: PgConnection): Future[void] {....stackTrace: false, raises: [Exception, OSError, ValueError, IOError, SslError, LibraryError], tags: [RootEffect], forbids: [].}
- Send a CancelRequest over a separate connection to abort the running query.
proc cancelNoWait(conn: PgConnection) {....raises: [Exception], tags: [RootEffect], forbids: [].}
- Schedule a best-effort CancelRequest without waiting. For use in timeout handlers.
proc checkReady(conn: PgConnection) {....raises: [PgConnectionError], tags: [], forbids: [].}
- Assert that the connection is in csReady state. Raises PgConnectionError otherwise.
proc clearStmtCache(conn: PgConnection) {....raises: [], tags: [], forbids: [].}
- Clear the client-side statement cache. Does not close server-side statements.
proc close(conn: PgConnection): Future[void] {....stackTrace: false, raises: [Exception, ValueError, LibraryError, SslError], tags: [RootEffect], forbids: [].}
- Close the connection. Idempotent: safe to call multiple times.
proc columnIndex(qr: QueryResult; name: string): int {....raises: [PgTypeError], tags: [], forbids: [].}
- Find the index of a column by name in a query result.
proc connect(config: ConnConfig): Future[PgConnection] {....raises: [Exception, ValueError, AsyncTimeoutError, CancelledError, PgConnectionError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Establish a new connection to a PostgreSQL server. Supports multi-host failover: tries each host in order. Respects targetSessionAttrs to select the appropriate server type. The connectTimeout wraps the entire multi-host connection attempt.
proc connect(dsn: string): Future[PgConnection] {....raises: [Exception, ValueError, AsyncTimeoutError, CancelledError, PgConnectionError, PgError], tags: [RootEffect, TimeEffect, WriteIOEffect, ReadIOEffect], forbids: [].}
- Shorthand for connect(parseDsn(dsn)).
proc dispatchNotice(conn: PgConnection; msg: BackendMessage) {....raises: [], tags: [RootEffect], forbids: [].}
proc dispatchNotification(conn: PgConnection; msg: BackendMessage) {. ...raises: [Exception], tags: [RootEffect], forbids: [].}
proc evictStmtCache(conn: PgConnection): CachedStmt {....raises: [KeyError], tags: [], forbids: [].}
- Evict the least recently used entry from the cache. Returns the evicted entry.
proc fillRecvBuf(conn: PgConnection; timeout: Duration = ZeroDuration): Future[ void] {....stackTrace: false, raises: [Exception, ValueError, SslError, AsyncTimeoutError, PgConnectionError], tags: [RootEffect, TimeEffect], forbids: [].}
- Read data from socket into buffer. The only await point for message reception.
proc getHosts(config: ConnConfig): seq[HostEntry] {....raises: [], tags: [], forbids: [].}
- Return the list of hosts to try. If hosts is populated, return it; otherwise synthesize a single entry from host/port.
proc initConnConfig(host = "127.0.0.1"; port = 5432; user = ""; password = ""; database = ""; sslMode = sslDisable; sslRootCert = ""; applicationName = ""; connectTimeout = ZeroDuration; keepAlive = true; keepAliveIdle = 0; keepAliveInterval = 0; keepAliveCount = 0; hosts: seq[HostEntry] = @[]; targetSessionAttrs = tsaAny; extraParams: seq[(string, string)] = @[]): ConnConfig {. ...raises: [], tags: [], forbids: [].}
- Create a connection configuration with sensible defaults. For DSN-based configuration, use parseDsn instead.
proc isUnixSocket(host: string): bool {.inline, ...raises: [], tags: [], forbids: [].}
- True if host represents a Unix socket directory (starts with '/'). Compatible with libpq behavior.
proc len(qr: QueryResult): int {.inline, ...raises: [], tags: [], forbids: [].}
- Return the number of rows in the query result.
proc listen(conn: PgConnection; channel: string): Future[void] {. ...stackTrace: false, raises: [Exception, CancelledError, ValueError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Subscribe to a LISTEN channel and start the background notification pump.
proc lookupStmtCache(conn: PgConnection; sql: string): ptr CachedStmt {. ...raises: [], tags: [], forbids: [].}
- Look up a cached prepared statement by SQL text, updating LRU order on hit. Returns nil on miss. The returned pointer is valid until the next cache mutation.
proc newPgQueryError(fields: seq[ErrorField]): ref PgQueryError {....raises: [], tags: [], forbids: [].}
- Create a PgQueryError from server ErrorResponse fields.
proc nextMessage(conn: PgConnection; rowData: RowData = nil; rowCount: ptr int32 = nil): Option[BackendMessage] {. ...raises: [ProtocolError, Exception], tags: [RootEffect], forbids: [].}
- Synchronously parse the next message from the receive buffer. Returns none if the buffer doesn't contain a complete message. Notification/Notice messages are dispatched internally. DataRow messages are counted (if rowCount != nil) and consumed.
proc nextStmtName(conn: PgConnection): string {....raises: [], tags: [], forbids: [].}
- Generate the next unique prepared statement name for the statement cache.
proc onNotify(conn: PgConnection; callback: NotifyCallback) {....raises: [], tags: [], forbids: [].}
- Set a callback invoked for each incoming NOTIFY message.
proc parseDsn(dsn: string): ConnConfig {....raises: [PgError], tags: [ReadIOEffect], forbids: [].}
-
Parse a PostgreSQL connection string into a ConnConfig.
Supports two formats:
- URI: postgresql://[user[:password]@][host[:port]][/database][?param=value&...]
- keyword=value: host=localhost port=5432 dbname=test (libpq compatible)
Both postgresql:// and postgres:// schemes are accepted for URI format.
proc ping(conn: PgConnection; timeout = ZeroDuration): Future[void] {....raises: [ Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Lightweight health check using an empty simple query. Sends Query("") -> expects EmptyQueryResponse + ReadyForQuery. On timeout, the connection is marked csClosed (protocol out of sync).
proc quoteIdentifier(s: string): string {....raises: [], tags: [], forbids: [].}
- Quote a SQL identifier (e.g. table/channel name) with double quotes, escaping embedded quotes.
proc recvMessage(conn: PgConnection; timeout = ZeroDuration; rowData: RowData = nil; rowCount: ptr int32 = nil): Future[ BackendMessage] {....stackTrace: false, raises: [Exception, ValueError, ProtocolError, SslError, AsyncTimeoutError, PgConnectionError], tags: [RootEffect, TimeEffect], forbids: [].}
- Receive a single backend message from the connection. Thin wrapper around nextMessage + fillRecvBuf for backward compatibility.
proc removeStmtCache(conn: PgConnection; sql: string) {....raises: [], tags: [], forbids: [].}
- Remove a statement from the cache by its SQL text.
proc rows(qr: QueryResult): seq[Row] {....raises: [], tags: [], forbids: [].}
- Return all rows as lightweight Row views into the flat buffer.
proc sendBufMsg(conn: PgConnection): Future[void] {....stackTrace: false, raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
- Send conn.sendBuf to the server without copying the seq. Safe because conn.state == csBusy prevents concurrent access to sendBuf.
proc sendMsg(conn: PgConnection; data: seq[byte]): Future[void] {. ...stackTrace: false, raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
- Send raw bytes to the PostgreSQL server over the connection.
proc simpleExec(conn: PgConnection; sql: string; timeout: Duration = ZeroDuration): Future[CommandResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a SQL statement via simple query protocol, returning the command result. Lighter than exec for parameter-less commands (no Parse/Bind/Describe overhead). On timeout, the connection is marked csClosed (protocol out of sync).
proc simpleQuery(conn: PgConnection; sql: string): Future[seq[QueryResult]] {. ...stackTrace: false, raises: [Exception, ValueError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute one or more SQL statements via simple query protocol. Returns one QueryResult per statement. Supports multiple statements separated by semicolons.
proc stopListening(conn: PgConnection): owned(Future[void]) {....stackTrace: false, raises: [Exception, CancelledError, ValueError], tags: [RootEffect], forbids: [].}
- Stop the background listen pump and return the connection to csReady.
proc unixSocketPath(host: string; port: int): string {....raises: [], tags: [], forbids: [].}
- Build the libpq-compatible Unix socket file path: {dir}/.s.PGSQL.{port}.
proc unlisten(conn: PgConnection; channel: string): Future[void] {. ...stackTrace: false, raises: [Exception, CancelledError, ValueError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Unsubscribe from a LISTEN channel. Stops the pump if no channels remain.
proc waitNotification(conn: PgConnection; timeout: Duration = ZeroDuration): Future[ Notification] {....stackTrace: false, raises: [Exception, ValueError, PgNotifyOverflowError, PgConnectionError, PgError, PgTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Wait for the next notification from the buffer. If the buffer is empty, blocks until a notification arrives or timeout expires. Raises PgNotifyOverflowError if notifications were dropped due to queue overflow. Raises PgError if the listen pump has died (e.g. reconnection failed).
Iterators
iterator items(qr: QueryResult): Row {....raises: [], tags: [], forbids: [].}
- Iterate over all rows in the query result.
Templates
template makeCopyInCallback(body: untyped): CopyInCallback
-
Create a CopyInCallback that works with both asyncdispatch and chronos. body must evaluate to seq[byte]. Return an empty seq to signal completion.
With asyncdispatch, anonymous async procs cannot return non-void types, so this template wraps the body in manual Future construction.
var idx = 0 let rows = @["1\tAlice\n".toBytes(), "2\tBob\n".toBytes()] let cb = makeCopyInCallback: if idx < rows.len: let chunk = rows[idx] inc idx chunk else: newSeq[byte]()
template makeCopyOutCallback(body: untyped): CopyOutCallback
-
Create a CopyOutCallback that works with both asyncdispatch and chronos. Inside body, the current chunk is available as data: seq[byte].
var chunks: seq[seq[byte]] let cb = makeCopyOutCallback: chunks.add(data)