async_postgres/pg_connection

Search:
Group by:

Types

CachedStmt = object
  name*: string              ## Server-side statement name ("_sc_1", "_sc_2", ...)
  fields*: seq[FieldDescription] ## From Describe(Statement), formatCode=0
  resultFormats*: seq[int16] ## Cached buildResultFormats() output
  colFmts*: seq[int16]       ## Per-column format codes for RowData
  colOids*: seq[int32]       ## Per-column type OIDs for RowData
  lruNode*: DoublyLinkedNode[string] ## Embedded LRU list node
A cached prepared statement in the LRU statement cache.
ConnConfig = object
  host*: string
  port*: int
  user*: string
  password*: string
  database*: string
  sslMode*: SslMode
  sslRootCert*: string       ## PEM-encoded CA certificate(s) for sslVerifyCa/sslVerifyFull
  applicationName*: string
  connectTimeout*: Duration  ## TCP connect timeout (default: no timeout)
  keepAlive*: bool           ## Enable TCP keepalive (default true via parseDsn)
  keepAliveIdle*: int        ## Seconds before first probe (0 = OS default)
  keepAliveInterval*: int    ## Seconds between probes (0 = OS default)
  keepAliveCount*: int       ## Number of probes before giving up (0 = OS default)
  hosts*: seq[HostEntry]     ## Multiple hosts for failover (empty = use host/port)
  targetSessionAttrs*: TargetSessionAttrs ## Target server type (default tsaAny)
  extraParams*: seq[(string, string)] ## Additional startup parameters
Connection configuration. Construct via parseDsn or set fields directly.
CopyInCallback = proc (): Future[seq[byte]] {....gcsafe.}
Callback supplying data chunks during streaming COPY IN. Return empty seq to finish.
CopyInInfo = object
  format*: CopyFormat
  columnFormats*: seq[int16]
  commandTag*: string
Metadata returned when a streaming COPY IN begins.
CopyOutCallback = proc (data: seq[byte]): Future[void] {....gcsafe.}
Callback receiving each chunk during streaming COPY OUT.
CopyOutInfo = object
  format*: CopyFormat
  columnFormats*: seq[int16]
  commandTag*: string
Metadata returned when a streaming COPY OUT begins.
CopyResult = object
  format*: CopyFormat
  columnFormats*: seq[int16]
  data*: seq[seq[byte]]
  commandTag*: string
Result of a buffered COPY OUT operation: all rows collected in memory.
HostEntry = object
  host*: string
  port*: int
A single host:port entry for multi-host connection.
Notice = object
  fields*: seq[ErrorField]
A notice or warning message from the server (not an error).
NoticeCallback = proc (notice: Notice) {....gcsafe, raises: [].}
Callback invoked when a notice/warning message arrives.
Notification = object
  pid*: int32
  channel*: string
  payload*: string
A NOTIFY message received from PostgreSQL.
NotifyCallback = proc (notification: Notification) {....gcsafe, raises: [].}
Callback invoked when a NOTIFY message arrives.
PgConnection = ref object
  when hasChronos:
    transport*: StreamTransport
  elif hasAsyncDispatch:
    socket*: AsyncSocket
  sslEnabled*: bool
  recvBuf*: seq[byte]
  recvBufStart*: int         ## Read pointer into recvBuf; bytes before this are consumed
  state*: PgConnState
  pid*: int32
  secretKey*: int32
  serverParams*: Table[string, string]
  txStatus*: TransactionStatus
  notifyCallback*: NotifyCallback
  noticeCallback*: NoticeCallback
  listenChannels*: HashSet[string]
  listenTask*: Future[void]
  createdAt*: Moment
  portalCounter*: int
  config*: ConnConfig
  notifyQueue*: Deque[Notification]
  notifyMaxQueue*: int
  sendBuf*: seq[byte]        ## Reusable send buffer for COPY IN batching
  notifyDropped*: int        ## Count of notifications dropped due to queue overflow
  reconnectCallback*: proc () {....gcsafe, raises: [].}
  notifyOverflowCallback*: proc (dropped: int) {....gcsafe, raises: [].}
  stmtCache*: Table[string, CachedStmt]
  stmtCounter*: int
  stmtCacheCapacity*: int    ## 0=disabled, default 256
  rowDataBuf*: RowData       ## Reusable RowData buffer to avoid per-query allocation
  hstoreOid*: int32          ## Dynamic OID for hstore extension type; 0 if not available
A single PostgreSQL connection with buffered I/O and statement caching.
PgConnectionError = object of PgError
Connection failures, disconnections, SSL/auth errors.
PgConnState = enum
  csConnecting, csAuthentication, csReady, csBusy, csListening, csReplicating,
  csClosed
Connection lifecycle state.
PgNotifyOverflowError = object of PgError
  dropped*: int              ## Number of notifications dropped due to queue overflow
PgPoolError = object of PgError
Pool exhaustion, pool closed, or acquire timeout.
PgQueryError = object of PgError
  sqlState*: string          ## 5-char SQLSTATE code (e.g. "42P01"), empty if unavailable.
  severity*: string          ## e.g. "ERROR", "FATAL"
  detail*: string            ## DETAIL field, empty if not present.
  hint*: string              ## HINT field, empty if not present.
SQL execution errors from the server (ErrorResponse).
PgTimeoutError = object of PgError
Operation timed out.
QueryResult = object
  fields*: seq[FieldDescription]
  data*: RowData
  rowCount*: int32
  commandTag*: string
Result of a query: field descriptions, row data, and command tag.
RowCallback = proc (row: Row) {....gcsafe.}
Callback invoked once per row during queryEach. The Row is only valid inside the callback — its backing buffer is reused for the next row.
SslMode = enum
  sslDisable,               ## Disable SSL (default)
  sslAllow,                 ## Try plaintext; fall back to SSL if refused
  sslPrefer,                ## Try SSL; fall back to plaintext if refused
  sslRequire,               ## Require SSL (no certificate verification)
  sslVerifyCa,              ## Require SSL + verify CA chain (no hostname verification)
  sslVerifyFull              ## Require SSL + verify CA chain and hostname
SSL/TLS negotiation mode for the connection.
TargetSessionAttrs = enum
  tsaAny,                   ## Connect to any server (default)
  tsaReadWrite,             ## Read-write server (primary)
  tsaReadOnly,              ## Read-only server (standby)
  tsaPrimary,               ## Primary server
  tsaStandby,               ## Standby server
  tsaPreferStandby           ## Prefer standby, fall back to any
Target server type for multi-host failover (libpq compatible).

Procs

proc addStmtCache(conn: PgConnection; sql: string; cached: CachedStmt) {.
    ...raises: [], tags: [], forbids: [].}
Add a prepared statement to the cache with auto-computed result formats.
proc cancel(conn: PgConnection): Future[void] {....stackTrace: false,
    raises: [Exception, OSError, ValueError, IOError, SslError, LibraryError],
    tags: [RootEffect], forbids: [].}
Send a CancelRequest over a separate connection to abort the running query.
proc cancelNoWait(conn: PgConnection) {....raises: [Exception], tags: [RootEffect],
                                        forbids: [].}
Schedule a best-effort CancelRequest without waiting. For use in timeout handlers.
proc checkReady(conn: PgConnection) {....raises: [PgConnectionError], tags: [],
                                      forbids: [].}
Assert that the connection is in csReady state. Raises PgConnectionError otherwise.
proc clearStmtCache(conn: PgConnection) {....raises: [], tags: [], forbids: [].}
Clear the client-side statement cache. Does not close server-side statements.
proc close(conn: PgConnection): Future[void] {....stackTrace: false,
    raises: [Exception, ValueError, LibraryError, SslError], tags: [RootEffect],
    forbids: [].}
Close the connection. Idempotent: safe to call multiple times.
proc columnIndex(qr: QueryResult; name: string): int {....raises: [PgTypeError],
    tags: [], forbids: [].}
Find the index of a column by name in a query result.
proc connect(config: ConnConfig): Future[PgConnection] {....raises: [Exception,
    ValueError, AsyncTimeoutError, CancelledError, PgConnectionError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Establish a new connection to a PostgreSQL server. Supports multi-host failover: tries each host in order. Respects targetSessionAttrs to select the appropriate server type. The connectTimeout wraps the entire multi-host connection attempt.
proc connect(dsn: string): Future[PgConnection] {....raises: [Exception,
    ValueError, AsyncTimeoutError, CancelledError, PgConnectionError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect, ReadIOEffect], forbids: [].}
Shorthand for connect(parseDsn(dsn)).
proc dispatchNotice(conn: PgConnection; msg: BackendMessage) {....raises: [],
    tags: [RootEffect], forbids: [].}
proc dispatchNotification(conn: PgConnection; msg: BackendMessage) {.
    ...raises: [Exception], tags: [RootEffect], forbids: [].}
proc evictStmtCache(conn: PgConnection): CachedStmt {....raises: [KeyError],
    tags: [], forbids: [].}
Evict the least recently used entry from the cache. Returns the evicted entry.
proc fillRecvBuf(conn: PgConnection; timeout: Duration = ZeroDuration): Future[
    void] {....stackTrace: false, raises: [Exception, ValueError, SslError,
                                        AsyncTimeoutError, PgConnectionError],
            tags: [RootEffect, TimeEffect], forbids: [].}
Read data from socket into buffer. The only await point for message reception.
proc getHosts(config: ConnConfig): seq[HostEntry] {....raises: [], tags: [],
    forbids: [].}
Return the list of hosts to try. If hosts is populated, return it; otherwise synthesize a single entry from host/port.
proc initConnConfig(host = "127.0.0.1"; port = 5432; user = ""; password = "";
                    database = ""; sslMode = sslDisable; sslRootCert = "";
                    applicationName = ""; connectTimeout = ZeroDuration;
                    keepAlive = true; keepAliveIdle = 0; keepAliveInterval = 0;
                    keepAliveCount = 0; hosts: seq[HostEntry] = @[];
                    targetSessionAttrs = tsaAny;
                    extraParams: seq[(string, string)] = @[]): ConnConfig {.
    ...raises: [], tags: [], forbids: [].}
Create a connection configuration with sensible defaults. For DSN-based configuration, use parseDsn instead.
proc isUnixSocket(host: string): bool {.inline, ...raises: [], tags: [],
                                        forbids: [].}
True if host represents a Unix socket directory (starts with '/'). Compatible with libpq behavior.
proc len(qr: QueryResult): int {.inline, ...raises: [], tags: [], forbids: [].}
Return the number of rows in the query result.
proc listen(conn: PgConnection; channel: string): Future[void] {.
    ...stackTrace: false, raises: [Exception, CancelledError, ValueError,
                                PgConnectionError, SslError, ProtocolError,
                                PgQueryError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Subscribe to a LISTEN channel and start the background notification pump.
proc lookupStmtCache(conn: PgConnection; sql: string): ptr CachedStmt {.
    ...raises: [], tags: [], forbids: [].}
Look up a cached prepared statement by SQL text, updating LRU order on hit. Returns nil on miss. The returned pointer is valid until the next cache mutation.
proc newPgQueryError(fields: seq[ErrorField]): ref PgQueryError {....raises: [],
    tags: [], forbids: [].}
Create a PgQueryError from server ErrorResponse fields.
proc nextMessage(conn: PgConnection; rowData: RowData = nil;
                 rowCount: ptr int32 = nil): Option[BackendMessage] {.
    ...raises: [ProtocolError, Exception], tags: [RootEffect], forbids: [].}
Synchronously parse the next message from the receive buffer. Returns none if the buffer doesn't contain a complete message. Notification/Notice messages are dispatched internally. DataRow messages are counted (if rowCount != nil) and consumed.
proc nextStmtName(conn: PgConnection): string {....raises: [], tags: [],
    forbids: [].}
Generate the next unique prepared statement name for the statement cache.
proc onNotify(conn: PgConnection; callback: NotifyCallback) {....raises: [],
    tags: [], forbids: [].}
Set a callback invoked for each incoming NOTIFY message.
proc parseDsn(dsn: string): ConnConfig {....raises: [PgError],
    tags: [ReadIOEffect], forbids: [].}

Parse a PostgreSQL connection string into a ConnConfig.

Supports two formats:

  • URI: postgresql://[user[:password]@][host[:port]][/database][?param=value&...]
  • keyword=value: host=localhost port=5432 dbname=test (libpq compatible)

Both postgresql:// and postgres:// schemes are accepted for URI format.

proc ping(conn: PgConnection; timeout = ZeroDuration): Future[void] {....raises: [
    Exception, ValueError, PgQueryError, PgConnectionError, SslError,
    ProtocolError, PgTimeoutError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Lightweight health check using an empty simple query. Sends Query("") -> expects EmptyQueryResponse + ReadyForQuery. On timeout, the connection is marked csClosed (protocol out of sync).
proc quoteIdentifier(s: string): string {....raises: [], tags: [], forbids: [].}
Quote a SQL identifier (e.g. table/channel name) with double quotes, escaping embedded quotes.
proc recvMessage(conn: PgConnection; timeout = ZeroDuration;
                 rowData: RowData = nil; rowCount: ptr int32 = nil): Future[
    BackendMessage] {....stackTrace: false, raises: [Exception, ValueError,
    ProtocolError, SslError, AsyncTimeoutError, PgConnectionError],
                      tags: [RootEffect, TimeEffect], forbids: [].}
Receive a single backend message from the connection. Thin wrapper around nextMessage + fillRecvBuf for backward compatibility.
proc removeStmtCache(conn: PgConnection; sql: string) {....raises: [], tags: [],
    forbids: [].}
Remove a statement from the cache by its SQL text.
proc rows(qr: QueryResult): seq[Row] {....raises: [], tags: [], forbids: [].}
Return all rows as lightweight Row views into the flat buffer.
proc sendBufMsg(conn: PgConnection): Future[void] {....stackTrace: false,
    raises: [Exception, SslError, ValueError], tags: [RootEffect], forbids: [].}
Send conn.sendBuf to the server without copying the seq. Safe because conn.state == csBusy prevents concurrent access to sendBuf.
proc sendMsg(conn: PgConnection; data: seq[byte]): Future[void] {.
    ...stackTrace: false, raises: [Exception, SslError, ValueError],
    tags: [RootEffect], forbids: [].}
Send raw bytes to the PostgreSQL server over the connection.
proc simpleExec(conn: PgConnection; sql: string;
                timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, ProtocolError,
                                PgTimeoutError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a SQL statement via simple query protocol, returning the command result. Lighter than exec for parameter-less commands (no Parse/Bind/Describe overhead). On timeout, the connection is marked csClosed (protocol out of sync).
proc simpleQuery(conn: PgConnection; sql: string): Future[seq[QueryResult]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgConnectionError,
                                SslError, ProtocolError, PgQueryError,
                                AsyncTimeoutError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute one or more SQL statements via simple query protocol. Returns one QueryResult per statement. Supports multiple statements separated by semicolons.
proc stopListening(conn: PgConnection): owned(Future[void]) {....stackTrace: false,
    raises: [Exception, CancelledError, ValueError], tags: [RootEffect],
    forbids: [].}
Stop the background listen pump and return the connection to csReady.
proc unixSocketPath(host: string; port: int): string {....raises: [], tags: [],
    forbids: [].}
Build the libpq-compatible Unix socket file path: {dir}/.s.PGSQL.{port}.
proc unlisten(conn: PgConnection; channel: string): Future[void] {.
    ...stackTrace: false, raises: [Exception, CancelledError, ValueError,
                                PgConnectionError, SslError, ProtocolError,
                                PgQueryError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Unsubscribe from a LISTEN channel. Stops the pump if no channels remain.
proc waitNotification(conn: PgConnection; timeout: Duration = ZeroDuration): Future[
    Notification] {....stackTrace: false, raises: [Exception, ValueError,
    PgNotifyOverflowError, PgConnectionError, PgError, PgTimeoutError],
                    tags: [RootEffect, TimeEffect], forbids: [].}
Wait for the next notification from the buffer. If the buffer is empty, blocks until a notification arrives or timeout expires. Raises PgNotifyOverflowError if notifications were dropped due to queue overflow. Raises PgError if the listen pump has died (e.g. reconnection failed).

Iterators

iterator items(qr: QueryResult): Row {....raises: [], tags: [], forbids: [].}
Iterate over all rows in the query result.

Templates

template makeCopyInCallback(body: untyped): CopyInCallback

Create a CopyInCallback that works with both asyncdispatch and chronos. body must evaluate to seq[byte]. Return an empty seq to signal completion.

With asyncdispatch, anonymous async procs cannot return non-void types, so this template wraps the body in manual Future construction.

var idx = 0
let rows = @["1\tAlice\n".toBytes(), "2\tBob\n".toBytes()]
let cb = makeCopyInCallback:
  if idx < rows.len:
    let chunk = rows[idx]
    inc idx
    chunk
  else:
    newSeq[byte]()
template makeCopyOutCallback(body: untyped): CopyOutCallback
Create a CopyOutCallback that works with both asyncdispatch and chronos. Inside body, the current chunk is available as data: seq[byte].
var chunks: seq[seq[byte]]
let cb = makeCopyOutCallback:
  chunks.add(data)