async_postgres/pg_connection/types

Internal building blocks shared by every pg_connection/ submodule: the PgConnection ref type, ConnConfig, tracing data types, public read-only/read-write accessors, internal accessors for cross-module use within the library, and the tracing helper templates.

Re-exported through pg_connection.nim; submodules import this module directly.

Types

AuthMethod = enum
  amNone,                   ## AuthenticationOk with no challenge (trust/peer/ident)
  amPassword,               ## cleartext password (libpq: "password")
  amMd5,                    ## MD5 challenge (libpq: "md5")
  amScramSha256,            ## SASL SCRAM-SHA-256 (libpq: "scram-sha-256")
  amScramSha256Plus          ## SASL SCRAM-SHA-256-PLUS (libpq: "scram-sha-256-plus")
Individual authentication methods for ConnConfig.requireAuth allowlisting (libpq require_auth parity).
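
The allowlist semantics described for ConnConfig.requireAuth (an empty set accepts any method, a non-empty set accepts only its members) can be sketched as follows. This is a minimal illustration, not the library's code: the enum is a local copy of the one above, and `isAuthAllowed` is a hypothetical helper name.

```nim
type
  AuthMethod = enum  # local copy of the enum documented above
    amNone, amPassword, amMd5, amScramSha256, amScramSha256Plus

func isAuthAllowed(allowed: set[AuthMethod]; requested: AuthMethod): bool =
  ## Hypothetical helper: an empty allowlist means "allow any",
  ## mirroring libpq when require_auth is unset.
  allowed.card == 0 or requested in allowed

# An allowlist that accepts only the two SCRAM variants.
let scramOnly = {amScramSha256, amScramSha256Plus}
echo isAuthAllowed(scramOnly, amMd5)
```

With `scramOnly`, a server that requests MD5 or cleartext password falls outside the set, which is the case where the documentation says connect fails with `PgConnectionError`.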
CachedStmt = ref object
  name*: string              ## Server-side statement name ("_sc_1", "_sc_2", ...)
  fields*: seq[FieldDescription] ## From Describe(Statement), formatCode=0
  paramOids*: seq[int32] ## Input parameter type OIDs from ParameterDescription. Used to detect
                         ## type-mismatch on cache hit: if a later call binds the same SQL with
                         ## different parameter OIDs, the server would interpret the bytes using
                         ## the original parse-time types, silently corrupting results. The
                         ## cache-hit path checks these against the caller's OIDs and falls back
                         ## to a re-parse when they diverge. Empty for parameter-less SQL —
                         ## an empty-vs-empty comparison matches trivially and the cache entry
                         ## is reused.
  resultFormats*: seq[int16] ## Cached buildResultFormats() output
  colFmts*: seq[int16]       ## Per-column format codes for RowData
  colOids*: seq[int32]       ## Per-column type OIDs for RowData
  lruNode*: DoublyLinkedNode[string] ## Embedded LRU list node
A cached prepared statement in the LRU statement cache.
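
The paramOids check on a cache hit can be illustrated with a small sketch. `CachedStmtSketch` and `canReuse` are hypothetical stand-ins that keep only the fields relevant to the comparison; the OIDs used (23 for int4, 25 for text) are the standard PostgreSQL built-in type OIDs.

```nim
type
  CachedStmtSketch = ref object  # simplified stand-in for CachedStmt
    name: string
    paramOids: seq[int32]

func canReuse(cached: CachedStmtSketch; callerOids: seq[int32]): bool =
  ## Hypothetical helper: the entry is reusable only when the caller binds
  ## exactly the parameter OIDs recorded at parse time. Two empty lists
  ## compare equal, so parameter-less SQL always matches.
  cached.paramOids == callerOids

let entry = CachedStmtSketch(name: "_sc_1", paramOids: @[23'i32])
echo canReuse(entry, @[23'i32])  # same OIDs: reuse the cached statement
echo canReuse(entry, @[25'i32])  # different OIDs: a re-parse would be needed
```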
ChannelBindingMode = enum
  cbPrefer,                 ## Use SCRAM-SHA-256-PLUS when SSL and server support it (default).
  cbDisable,                ## Never use SCRAM-SHA-256-PLUS; only SCRAM-SHA-256.
  cbRequire                  ## Require SCRAM-SHA-256-PLUS; fail if unavailable.
SCRAM channel binding policy (libpq-compatible).
CleanupKind = enum
  ckTxRollback,             ## Outer `ROLLBACK` (withTransaction / withTransactionDeadline)
  ckSavepointRollback        ## `ROLLBACK TO SAVEPOINT` (withSavepoint / withSavepointDeadline)
Which automatic cleanup operation was skipped or whose failure was swallowed. Reported through onCleanupSkipped so operators can distinguish outer-transaction cleanup from savepoint cleanup.
CleanupSkipReason = enum
  csrConnInvalidated, ## The connection was already `csClosed` (typically because a per-call
                       ## timeout invalidated it via `invalidateOnTimeout`). The cleanup SQL
                       ## was *never dispatched* — `err` on the event is nil.
  csrCleanupFailed ## The cleanup SQL was dispatched but raised. The failure was
                   ## swallowed so it cannot mask the original body/COMMIT error;
                   ## `err` carries the cleanup failure.
Why an automatic cleanup operation did not run to completion.
ConnConfig = object
  host*: string
  port*: int
  user*: string
  password*: string
  database*: string
  sslMode*: SslMode
  sslRootCert*: string       ## PEM-encoded CA certificate(s) for sslVerifyCa/sslVerifyFull
  channelBinding*: ChannelBindingMode ## SCRAM channel binding policy (default cbPrefer). `cbRequire` fails the
                                      ## connection if SCRAM-SHA-256-PLUS cannot actually be used (libpq parity).
  requireAuth*: set[AuthMethod] ## Allowlist of auth methods the client will accept. An empty set
                                ## (default) means "allow any" — matching libpq when `require_auth` is
                                ## unset. If the server requests a method outside this set, connect
                                ## fails with `PgConnectionError`. For SASL, advertised mechanisms are
                                ## filtered and the selected mechanism is validated.
                                ## 
                                ## Note: libpq's `!`-prefix negation syntax (e.g. `!password`) is not
                                ## yet supported by `parseRequireAuth` — specify the allowed methods
                                ## positively instead.
  applicationName*: string
  connectTimeout*: Duration  ## TCP connect timeout (default: no timeout)
  keepAlive*: bool           ## Enable TCP keepalive (default true via parseDsn)
  keepAliveIdle*: int        ## Seconds before first probe (0 = OS default)
  keepAliveInterval*: int    ## Seconds between probes (0 = OS default)
  keepAliveCount*: int       ## Number of probes before giving up (0 = OS default)
  hosts*: seq[HostEntry]     ## Multiple hosts for failover (empty = use host/port)
  targetSessionAttrs*: TargetSessionAttrs ## Target server type (default tsaAny)
  extraParams*: seq[(string, string)] ## Additional startup parameters
  maxMessageSize*: int ## Upper bound (in bytes) on a single backend message including
                       ## its 1-byte type and 4-byte length header. A server claiming a
                       ## larger message is rejected with `ProtocolError` before any
                       ## further recv-buffer growth, capping memory exposure to a
                       ## misbehaving or malicious peer. ``0`` (default) selects
                       ## `DefaultMaxBackendMessageLen` (1 GiB).
  tracer*: PgTracer          ## Optional tracer for connection-level hooks
Connection configuration. Construct via parseDsn or set fields directly.
CopyInCallback = proc (): Future[seq[byte]] {.gcsafe.}
Callback supplying data chunks during streaming COPY IN. Return an empty seq to finish.
CopyInInfo = object
  format*: CopyFormat
  columnFormats*: seq[int16]
  commandTag*: string
Metadata returned when a streaming COPY IN begins.
CopyOutCallback = proc (data: seq[byte]): Future[void] {.gcsafe.}
Callback receiving each chunk during streaming COPY OUT.
CopyOutInfo = object
  format*: CopyFormat
  columnFormats*: seq[int16]
  commandTag*: string
Metadata returned when a streaming COPY OUT begins.
CopyResult = object
  format*: CopyFormat
  columnFormats*: seq[int16]
  data*: seq[seq[byte]]
  commandTag*: string
Result of a buffered COPY OUT operation: all rows collected in memory.
HostEntry = object
  host*: string
  port*: int
A single host:port entry for multi-host connection.
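
ConnConfig documents that an empty `hosts` list means the single `host`/`port` pair is used. A minimal sketch of that resolution, assuming a hypothetical helper named `effectiveHosts` and a local copy of HostEntry:

```nim
type
  HostEntry = object  # local copy of the type documented above
    host: string
    port: int

func effectiveHosts(host: string; port: int;
                    hosts: seq[HostEntry]): seq[HostEntry] =
  ## Hypothetical helper: prefer the multi-host list when it is non-empty,
  ## otherwise fall back to the single host/port pair.
  if hosts.len > 0: hosts
  else: @[HostEntry(host: host, port: port)]

let single = effectiveHosts("db.internal", 5432, @[])
echo single.len
```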
Notice = object
  fields*: seq[ErrorField]
A notice or warning message from the server (not an error).
NoticeCallback = proc (notice: Notice) {.gcsafe, raises: [].}
Callback invoked when a notice/warning message arrives.
Notification = object
  pid*: int32
  channel*: string
  payload*: string
A NOTIFY message received from PostgreSQL.
NotifyCallback = proc (notification: Notification) {.gcsafe, raises: [].}
Callback invoked when a NOTIFY message arrives.
PgConnection = ref object
  when hasChronos:
    transport*: StreamTransport
    baseReader*: AsyncStreamReader
    baseWriter*: AsyncStreamWriter
    reader*: AsyncStreamReader
    writer*: AsyncStreamWriter
    tlsStream*: TLSAsyncStream
    trustAnchorBufs*: seq[seq[byte]] ## Backing memory for custom trust anchor pointers
    x509Capture*: X509CertCaptureContext ## X509 wrapper for cert capture
  elif hasAsyncDispatch:
    socket*: AsyncSocket
  serverCertDer*: seq[byte]  ## DER-encoded server certificate for SCRAM channel binding
  sslEnabled*: bool
  recvBuf*: seq[byte]
  recvBufStart*: int         ## Read pointer into recvBuf; bytes before this are consumed
  state*: PgConnState
  pid*: int32
  secretKey*: int32
  serverParams*: Table[string, string]
  txStatus*: TransactionStatus
  notifyCallback*: NotifyCallback
  noticeCallback*: NoticeCallback
  listenChannels*: HashSet[string]
  listenTask*: Future[void]
  host*: string
  port*: int
  createdAt*: Moment
  portalCounter*: int
  config*: ConnConfig
  notifyQueue*: Deque[Notification]
  notifyMaxQueue*: int
  notifyWaiter*: Future[void]
  sendBuf*: seq[byte]        ## Reusable send buffer for COPY IN batching
  notifyDropped*: int        ## Count of notifications dropped due to queue overflow
  listenError*: ref PgListenError ## Set when listen pump fails permanently
  listenReconnectMaxAttempts*: int ## Max reconnect attempts on listen pump failure. Default 10.
                                   ## 0 or negative = unlimited retries (retry until close()).
  listenReconnectMaxBackoff*: int ## Max seconds between reconnect attempts (backoff cap). Default 30.
  reconnectCallback*: proc () {.gcsafe, raises: [].}
  notifyOverflowCallback*: proc (dropped: int) {.gcsafe, raises: [].}
  listenErrorCallback*: proc (err: ref PgListenError) {.gcsafe, raises: [].}
    ## Invoked when the listen pump dies permanently (reconnection failed or
    ## the connection was lost with nothing left to re-subscribe). Lets push
    ## API (`onNotify`) users learn the pump is gone; the pull API surfaces
    ## the same failure through `waitNotification`.
  stmtCache*: Table[string, CachedStmt]
  stmtCacheLru*: DoublyLinkedList[string] ## LRU order: oldest at head, newest at tail
  stmtCounter*: int
  stmtCacheCapacity*: int    ## 0=disabled, default 256
  pendingStmtCloses*: seq[string] ## Server-side prepared statement names whose ``Close`` was not bundled
                                  ## with the operation that evicted them. Populated when the defensive
                                  ## eviction loop in ``addStmtCache`` fires (caller skipped the
                                  ## pre-eviction step, or ``stmtCacheCapacity`` was shrunk below the
                                  ## current cache size). Flushed by ``flushPendingStmtCloses`` at the
                                  ## start of the next Extended Query send phase so the leak is bounded
                                  ## to the gap until the next operation.
  heldSessionLocks*: int ## Count of session-level `pg_advisory_lock` acquires through the typed
                         ## API. The pool releases or discards connections with a non-zero count
                         ## so that locks never leak to subsequent borrowers. Raw-SQL acquires
                         ## (`conn.exec("SELECT pg_advisory_lock(...)")`) bypass this counter.
  tracer*: PgTracer          ## Inherited from ConnConfig on connect
  ownerPool*: PgPoolOwner ## Owning pool back-reference. Set when this connection is managed by
                          ## a `PgPool` (or a pool inside `PgPoolCluster`); `nil` for standalone
                          ## connections created via `connect`. Used by `release(conn)` to route
                          ## the connection back to the correct pool.
A single PostgreSQL connection with buffered I/O and statement caching.
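
The pairing of `stmtCache` (a table) with `stmtCacheLru` (a doubly linked list, oldest at head, newest at tail) is a common LRU layout. The sketch below shows one way such a structure can behave, using only std/lists and std/tables. `LruSketch` and `touch` are hypothetical names, and the real cache stores CachedStmt entries rather than bare keys.

```nim
import std/[lists, tables]

type
  LruSketch = object
    capacity: int
    order: DoublyLinkedList[string]  # oldest at head, newest at tail
    nodes: Table[string, DoublyLinkedNode[string]]

proc touch(c: var LruSketch; key: string): seq[string] =
  ## Marks `key` as most recently used and returns the keys evicted
  ## to bring the cache back within capacity.
  if key in c.nodes:
    let node = c.nodes[key]
    c.order.remove(node)   # unlink from its current position
    c.order.append(node)   # re-link at the tail (newest)
  else:
    let node = newDoublyLinkedNode[string](key)
    c.order.append(node)
    c.nodes[key] = node
  while c.nodes.len > c.capacity:
    let oldest = c.order.head
    c.order.remove(oldest)
    c.nodes.del(oldest.value)
    result.add oldest.value

var demo = LruSketch(capacity: 2)
discard demo.touch("_sc_1")
discard demo.touch("_sc_2")
echo demo.touch("_sc_3")  # "_sc_1" is the oldest entry and is evicted
```

Keeping the list node reachable from the table entry is what makes the move-to-tail step constant time, which is presumably why CachedStmt embeds its `lruNode`.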
PgConnState = enum
  csConnecting, csAuthentication, csReady, csBusy, csListening, csReplicating,
  csClosed
Connection lifecycle state.
PgPoolOwner = ref object of RootObj
Opaque base for pool-ownership back-references on PgConnection. The concrete type is PgPool (defined in pg_pool); this base lives here to avoid a circular import. Consumers should not subclass this.
PgTracer = ref object
  onConnectStart*: proc (data: TraceConnectStartData): TraceContext {.gcsafe,
      raises: [].}
  onConnectEnd*: proc (ctx: TraceContext; data: TraceConnectEndData) {.gcsafe,
      raises: [].}
  onQueryStart*: proc (conn: PgConnection; data: TraceQueryStartData): TraceContext {.
      gcsafe, raises: [].}
  onQueryEnd*: proc (ctx: TraceContext; conn: PgConnection;
                     data: TraceQueryEndData) {.gcsafe, raises: [].}
  onPrepareStart*: proc (conn: PgConnection; data: TracePrepareStartData): TraceContext {.
      gcsafe, raises: [].}
  onPrepareEnd*: proc (ctx: TraceContext; conn: PgConnection;
                       data: TracePrepareEndData) {.gcsafe, raises: [].}
  onPipelineStart*: proc (conn: PgConnection; data: TracePipelineStartData): TraceContext {.
      gcsafe, raises: [].}
  onPipelineEnd*: proc (ctx: TraceContext; conn: PgConnection;
                        data: TracePipelineEndData) {.gcsafe, raises: [].}
  onCopyStart*: proc (conn: PgConnection; data: TraceCopyStartData): TraceContext {.
      gcsafe, raises: [].}
  onCopyEnd*: proc (ctx: TraceContext; conn: PgConnection;
                    data: TraceCopyEndData) {.gcsafe, raises: [].}
  onPoolAcquireStart*: proc (data: TracePoolAcquireStartData): TraceContext {.
      gcsafe, raises: [].}
  onPoolAcquireEnd*: proc (ctx: TraceContext; data: TracePoolAcquireEndData) {.
      gcsafe, raises: [].}
  onPoolReleaseStart*: proc (data: TracePoolReleaseStartData): TraceContext {.
      gcsafe, raises: [].}
  onPoolReleaseEnd*: proc (ctx: TraceContext; data: TracePoolReleaseEndData) {.
      gcsafe, raises: [].}
  onPoolCloseError*: proc (data: TracePoolCloseErrorData) {.gcsafe, raises: [].}
  onTransportCloseError*: proc (data: TraceTransportCloseErrorData) {.gcsafe,
      raises: [].}
    ## Fires when a transport ``closeWait()`` raises during teardown.
    ## Advisory only: ``closeTransport`` continues releasing the remaining
    ## resources regardless. Use this to surface half-closed TLS sessions
    ## or peer RSTs that would otherwise be invisible.
  onLeakedSessionLocks*: proc (data: TraceLeakedSessionLocksData) {.gcsafe,
      raises: [].}
    ## Fires when a pool connection returns holding session-level advisory
    ## locks acquired through the typed API. Advisory only: the pool
    ## handles cleanup as described in `TraceLeakedSessionLocksData`. Use
    ## this to surface missing ``advisoryUnlock`` calls at the borrow site.
  onCleanupSkipped*: proc (data: TraceCleanupSkippedData) {.gcsafe, raises: [].}
    ## Fires from `withTransaction*` / `withSavepoint*` error paths when
    ## an automatic ROLLBACK is either skipped (connection already
    ## `csClosed`, e.g. after a per-call timeout) or attempted but failed
    ## (failure swallowed to keep the original error). Advisory only:
    ## the macro's behaviour is unchanged. Use this to close the
    ## diagnostic gap between the timeout path (silent) and the
    ## body-error path (visible ROLLBACK simpleExec event).
    ##
    ## **Nested macros may fire this hook more than once per failure.**
    ## When `withSavepoint*` is nested inside `withTransaction*` and the
    ## connection becomes `csClosed`, the savepoint's error handler
    ## fires `ckSavepointRollback` first, then the original exception
    ## propagates to the outer transaction's handler which sees the same
    ## `csClosed` state and fires `ckTxRollback`. Both events refer to
    ## the same underlying cause; observers that aggregate by failure
    ## (not by cleanup attempt) should dedupe.
  onInsecureAuth*: proc (data: TraceInsecureAuthData) {.gcsafe, raises: [].}
    ## Fires when an auth method is used over an insecure transport
    ## (currently: cleartext password without SSL). Advisory only; does
    ## not abort the connection. Use `ConnConfig.requireAuth` to enforce.
  onDeprecatedAuth*: proc (data: TraceDeprecatedAuthData) {.gcsafe, raises: [].}
    ## Fires when a server-requested auth method is cryptographically
    ## weak / deprecated regardless of transport (currently: MD5).
    ## Advisory only; does not abort the connection. Use
    ## `ConnConfig.requireAuth` to enforce.

Tracing hooks for async-postgres operations. Set only the callbacks you need; nil callbacks are skipped with zero overhead.

Start hooks return a TraceContext (an opaque RootRef token) that is passed to the corresponding End hook for correlation (e.g. timing, span linking). Return nil from a Start hook if you don't need correlation.

QueryResult = object
  fields*: seq[FieldDescription]
  data*: RowData
  rowCount*: int32
  commandTag*: string
Result of a query: field descriptions, row data, and command tag.
RowCallback = proc (row: Row) {.gcsafe.}
Callback invoked once per row during queryEach. The Row is only valid inside the callback — its backing buffer is reused for the next row.
SslMode = enum
  sslDisable,               ## Disable SSL (default)
  sslAllow,                 ## Try plaintext; fall back to SSL if refused
  sslPrefer,                ## Try SSL; fall back to plaintext if refused
  sslRequire,               ## Require SSL (no certificate verification)
  sslVerifyCa,              ## Require SSL + verify CA chain (no hostname verification)
  sslVerifyFull              ## Require SSL + verify CA chain and hostname
SSL/TLS negotiation mode for the connection.
TargetSessionAttrs = enum
  tsaAny,                   ## Connect to any server (default)
  tsaReadWrite,             ## Read-write server (primary)
  tsaReadOnly,              ## Read-only server (standby)
  tsaPrimary,               ## Primary server
  tsaStandby,               ## Standby server
  tsaPreferStandby           ## Prefer standby, fall back to any
Target server type for multi-host failover (libpq compatible).
TraceCleanupSkippedData = object
  conn*: PgConnection
  kind*: CleanupKind
  reason*: CleanupSkipReason
  err*: ref CatchableError ## Cleanup-SQL failure when `reason == csrCleanupFailed`; nil when
                           ## `reason == csrConnInvalidated` (nothing was dispatched).
Advisory notification fired from withTransaction* / withSavepoint* error-cleanup paths when ROLLBACK is either skipped (connection already invalidated) or attempted but failed (failure swallowed to preserve the original error). Useful for surfacing the diagnostic asymmetry between the timeout path (silent skip) and the body-error path (visible ROLLBACK simpleExec event).
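
Because nested `withSavepoint*` / `withTransaction*` handlers can report the same invalidated connection twice, an observer that counts failures may want to dedupe. The sketch below is one possible approach, not something the library prescribes. `CleanupEventSketch` is a simplified stand-in for TraceCleanupSkippedData, and `connId` is a hypothetical stable identifier for the connection (the real event carries a `conn` reference instead).

```nim
import std/sets

type
  CleanupKind = enum        # local copies of the enums documented above
    ckTxRollback, ckSavepointRollback
  CleanupSkipReason = enum
    csrConnInvalidated, csrCleanupFailed
  CleanupEventSketch = object
    connId: int             # hypothetical identifier, see lead-in
    kind: CleanupKind
    reason: CleanupSkipReason
  FailureCounter = ref object
    seenInvalidated: HashSet[int]
    failures: int

proc observe(c: FailureCounter; ev: CleanupEventSketch) =
  ## Counts one failure per invalidated connection, however many
  ## nested handlers report it. Cleanup failures are always counted.
  if ev.reason == csrConnInvalidated:
    if ev.connId in c.seenInvalidated:
      return
    c.seenInvalidated.incl ev.connId
  inc c.failures

let counter = FailureCounter()
# Savepoint handler fires first, then the outer transaction handler.
counter.observe(CleanupEventSketch(connId: 7, kind: ckSavepointRollback,
                                   reason: csrConnInvalidated))
counter.observe(CleanupEventSketch(connId: 7, kind: ckTxRollback,
                                   reason: csrConnInvalidated))
echo counter.failures
```

Holding the state in a ref object rather than in module-level variables keeps a closure that wraps `observe` compatible with the `gcsafe` requirement on the hook.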
TraceConnectEndData = object
  conn*: PgConnection
  err*: ref CatchableError
Data passed to the connect end hook.
TraceConnectStartData = object
  hosts*: seq[HostEntry]
Data passed to the connect start hook.
TraceContext = RootRef
Opaque correlation token returned by trace Start hooks and passed to End hooks. Users subtype RootObj (e.g. type Span = ref object of RootObj) and return it from Start hooks; End hooks downcast via Span(ctx).
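
The Start/End correlation pattern can be sketched without the library. `Span`, `startSpan`, and `endSpan` are hypothetical names; real hooks would be assigned to PgTracer fields and take the data objects listed on this page.

```nim
import std/[monotimes, times]

type
  TraceContext = RootRef        # same alias as documented above
  Span = ref object of RootObj  # hypothetical user-defined span type
    label: string
    startedAt: MonoTime

proc startSpan(label: string): TraceContext {.gcsafe, raises: [].} =
  ## Start-hook shape: allocate a span and return it as the opaque token.
  Span(label: label, startedAt: getMonoTime())

proc endSpan(ctx: TraceContext): Duration {.gcsafe, raises: [].} =
  ## End-hook shape: downcast the token and compute the elapsed time.
  if ctx.isNil:
    return                      # Start hook opted out of correlation
  let span = Span(ctx)
  getMonoTime() - span.startedAt

let ctx = startSpan("SELECT 1")
echo endSpan(ctx) >= DurationZero
```

The nil check matters because a Start hook may return nil, in which case the End hook has nothing to downcast.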
TraceCopyDirection = enum
  tcdIn, tcdOut
Direction of a traced COPY operation: tcdIn for COPY IN, tcdOut for COPY OUT.
TraceCopyEndData = object
  commandTag*: string
  err*: ref CatchableError
Data passed to the copy end hook.
TraceCopyStartData = object
  sql*: string
  direction*: TraceCopyDirection
Data passed to the copy start hook.
TraceDeprecatedAuthData = object
  conn*: PgConnection
  authMethod*: AuthMethod    ## The method the server requested
Advisory notification that a server-requested auth method is considered cryptographically weak / deprecated regardless of transport. Currently fires for MD5 (PostgreSQL recommends SCRAM-SHA-256 since v10). The connection is NOT aborted — use ConnConfig.requireAuth for actual enforcement.
TraceInsecureAuthData = object
  conn*: PgConnection
  authMethod*: AuthMethod    ## The method the server requested
  sslEnabled*: bool          ## Transport state at the time of the auth step
Advisory notification that a server-requested auth method is considered insecure in the current transport context. Currently fires for cleartext password over a non-SSL connection. The connection is NOT aborted — use ConnConfig.requireAuth for actual enforcement.
TraceLeakedSessionLocksData = object
  conn*: PgConnection
  count*: int                ## Value of ``heldSessionLocks`` at detection time
Advisory notification that a pool connection returned while still holding session-level advisory locks acquired through the typed API. The pool handles cleanup itself — either running pg_advisory_unlock_all from resetSession and reusing the connection, or discarding it on release when resetSession was bypassed. Use this hook to detect missing advisoryUnlock / advisoryUnlockAll calls at the borrow site, since silent cleanup would otherwise mask the leak.
TracePipelineEndData = object
  err*: ref CatchableError
Data passed to the pipeline end hook.
TracePipelineStartData = object
  opCount*: int
Data passed to the pipeline start hook.
TracePoolAcquireEndData = object
  conn*: PgConnection
  err*: ref CatchableError
  wasCreated*: bool          ## true if a new connection was created
Data passed to the pool acquire end hook.
TracePoolAcquireStartData = object
  idleCount*: int
  activeCount*: int
  maxSize*: int
Data passed to the pool acquire start hook.
TracePoolCloseErrorData = object
  conn*: PgConnection
  err*: ref CatchableError
Data passed to the pool close-error hook. Fired when a pool-initiated conn.close() raises — these errors are otherwise swallowed because close runs from non-async cleanup paths and fire-and-forget tasks, making leaks hard to observe without tracing.
TracePoolReleaseEndData = object
  wasClosed*: bool           ## true if connection was closed instead of returned to pool
  handedToWaiter*: bool      ## true if connection was given directly to a waiting acquirer
Data passed to the pool release end hook.
TracePoolReleaseStartData = object
  conn*: PgConnection
Data passed to the pool release start hook.
TracePrepareEndData = object
  err*: ref CatchableError
Data passed to the prepare end hook.
TracePrepareStartData = object
  name*: string
  sql*: string
Data passed to the prepare start hook.
TraceQueryEndData = object
  commandTag*: string
  rowCount*: int64
  err*: ref CatchableError
Data passed to the query/exec end hook.
TraceQueryStartData = object
  sql*: string
  params*: seq[PgParam] ## Populated when the caller used a `seq[PgParam]` overload. Mutually
                        ## exclusive with `paramsInline`: exactly one of the two is non-empty
                        ## per call (or both are empty if the query has no bound parameters).
  paramsInline*: seq[PgParamInline] ## Populated when the caller used a `PgParamInline` overload. Mutually
                                    ## exclusive with `params` (see above). Tracers that want a single view
                                    ## should branch on whichever field is non-empty.
  isExec*: bool              ## true for exec, false for query
Data passed to the query/exec start hook.
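
Since at most one of `params` and `paramsInline` is non-empty, a tracer that wants a single view can branch on whichever is populated. A minimal sketch with placeholder types (`ParamA` and `ParamB` stand in for PgParam and PgParamInline, whose real definitions live elsewhere in the library):

```nim
type
  ParamA = object           # placeholder for PgParam
  ParamB = object           # placeholder for PgParamInline
  QueryStartSketch = object # simplified stand-in for TraceQueryStartData
    sql: string
    params: seq[ParamA]
    paramsInline: seq[ParamB]

func boundParamCount(d: QueryStartSketch): int =
  ## Number of bound parameters, whichever overload the caller used.
  ## Returns 0 when both sequences are empty.
  if d.params.len > 0: d.params.len
  else: d.paramsInline.len

let viaInline = QueryStartSketch(sql: "SELECT $1, $2",
                                 paramsInline: @[ParamB(), ParamB()])
echo boundParamCount(viaInline)
```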
TraceTransportCloseErrorData = object
  conn*: PgConnection
  stage*: TransportCloseStage
  err*: ref CatchableError
Data passed to the transport close-error hook. Fired when a chronos closeWait() call raises while closeTransport is releasing connection resources. These errors are otherwise swallowed because teardown must release every transport resource regardless of individual failures, leaving operators with no signal for half-closed TLS sessions, BearSSL close_notify mismatches, or peer RSTs.
TransportCloseStage = enum
  tcsTlsReader, tcsTlsWriter, tcsBaseReader, tcsBaseWriter, tcsTransport
Which transport resource raised during connection teardown.

Vars

TCP_KEEPCNT {.importc, header: "<netinet/tcp.h>".}: cint
TCP_KEEPIDLE {.importc, header: "<netinet/tcp.h>".}: cint
TCP_KEEPINTVL {.importc, header: "<netinet/tcp.h>".}: cint

Consts

RecvBufSize = 131072
Size, in bytes, of the temporary read buffer for recv operations (128 KiB).

Procs

func effectiveMaxMessageSize(conn: PgConnection): int {.inline, raises: [],
    tags: [], forbids: [].}
Effective per-message recv cap for this connection. Resolves the ConnConfig.maxMessageSize default (0) to DefaultMaxBackendMessageLen.
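
The default resolution can be pictured as follows. This is a sketch operating on the raw config value rather than on a PgConnection. `resolveMaxMessageSize` and `exceedsLimit` are hypothetical names, the constant is a local copy using the documented 1 GiB value, and how the library treats negative values is not covered here.

```nim
const DefaultMaxBackendMessageLen = 1 shl 30  # 1 GiB, the documented default

func resolveMaxMessageSize(configured: int): int =
  ## Maps the "use default" sentinel 0 to the default cap.
  if configured == 0: DefaultMaxBackendMessageLen
  else: configured

func exceedsLimit(claimedLen, limit: int): bool =
  ## True when a backend message's claimed total length is over the cap,
  ## which is the case the docs say is rejected with `ProtocolError`.
  claimedLen > limit

echo resolveMaxMessageSize(0)
echo exceedsLimit(4097, resolveMaxMessageSize(4096))
```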
proc fireCleanupSkipped(conn: PgConnection; kind: CleanupKind;
                        reason: CleanupSkipReason; err: ref CatchableError = nil) {.
    raises: [], tags: [RootEffect], forbids: [].}
Route a withTransaction* / withSavepoint* ROLLBACK skip-or-swallow event to the tracer. Reads from conn.config.tracer so events fire regardless of the runtime conn.tracer alias. Nil hook is a no-op.
proc fireDeprecatedAuth(conn: PgConnection; authMethod: AuthMethod) {.
    raises: [], tags: [RootEffect], forbids: [].}
proc fireInsecureAuth(conn: PgConnection; authMethod: AuthMethod) {.raises: [],
    tags: [RootEffect], forbids: [].}
proc newPgQueryError(fields: seq[ErrorField]): ref PgQueryError {.raises: [],
    tags: [], forbids: [].}
Create a PgQueryError from server ErrorResponse fields.

Templates

template withConnTracing(conn: PgConnection; startHook, endHook: untyped;
                         startData: typed; EndDataType: typedesc;
                         endDataExpr: typed; body: untyped)
Wrap an operation with connection-scoped tracing hooks.
template withTracing(tracer: PgTracer; startHook, endHook: untyped;
                     startData: typed; EndDataType: typedesc;
                     endDataExpr: typed; body: untyped)
Wrap an operation with non-connection tracing hooks (connect, pool).