async_postgres/pg_client/core

Search:
Group by:

Internal building blocks shared by every pg_client/ submodule.

Contains types/constants for transaction options, the inline-parameter encoder, and the receive-loop templates that the extended-query path (exec, query, queryEach, queryDirect, …) reuses. Re-exported through pg_client.nim; submodules import this module directly via ./core.

Types

AccessMode = enum
  amDefault, amReadWrite, amReadOnly
PostgreSQL transaction access mode (read-write or read-only).
DeferrableMode = enum
  dmDefault, dmDeferrable, dmNotDeferrable
PostgreSQL transaction deferrable mode (for serializable read-only transactions).
IsolationLevel = enum
  ilDefault, ilReadCommitted, ilRepeatableRead, ilSerializable,
  ilReadUncommitted
PostgreSQL transaction isolation level.
RetryOptions = object
  maxAttempts*: int = 3 ## Total attempts including the first. Values ``<= 1`` run the body exactly
                        ## once with no retry (the body always runs at least once).
  baseDelayMs*: int = 20     ## Backoff before the first retry, in milliseconds.
  maxDelayMs*: int = 1000    ## Upper bound on the backoff delay, in milliseconds.
  multiplier*: float = 2.0   ## Exponential growth factor between attempts.
  jitter*: bool = true ## Full jitter: pick a random delay in ``0 .. computed``. Uses the
                       ## `std/random` global RNG; to de-correlate retries *across processes*
                       ## the application must call ``randomize()`` once at startup — otherwise
                       ## every process replays the same (default-seeded) jitter sequence.
  retryableStates*: seq[string] = ["40001", "40P01"] ## SQLSTATE codes that trigger a retry. Defaults to serialization_failure
                                                     ## (40001) and deadlock_detected (40P01) — the transaction-level conflicts
                                                     ## PostgreSQL recommends resolving by re-running the whole transaction.
Controls how withTransactionRetry re-runs a transaction after a retryable failure. Relies on Nim object field defaults, so partial construction (e.g. RetryOptions(maxAttempts: 5)) leaves the unset fields at their defaults below.
TransactionOptions = object
  isolation*: IsolationLevel
  access*: AccessMode
  deferrable*: DeferrableMode
Options for BEGIN: isolation level, access mode, and deferrable mode.

Consts

copyBatchSize = 262144
256KB batch threshold for COPY IN buffering
StmtCacheInvalidatingStates = ["26000", "0A000"]
SQLSTATEs that mean a cache-hit prepared statement can no longer be reused as cached and must be evicted so the next call re-parses it:
  • 26000 invalid_sql_statement_name — the server no longer has the prepared statement (e.g. DISCARD ALL / DEALLOCATE ran on the session, or a pooled backend was reset). Re-parse recreates it.
  • 0A000 feature_not_supported — chiefly "cached plan must not change result type": DDL altered the statement's result columns, so the server rejects the cached (fixed-result) plan on Execute. Because the cache-hit path skips Describe, only a re-parse picks up the new schema; without eviction every subsequent hit would re-raise 0A000 forever. Other 0A000 causes simply re-parse once more (the error still propagates).

42P18 (indeterminate_datatype) is intentionally absent: it is a Parse-phase error and cannot arise on a cache hit (no Parse is sent), and a cache miss that fails to Parse never reaches addStmtCache.

Procs

proc backoffDelayMs(opts: RetryOptions; attempt: int): int {....raises: [],
    tags: [], forbids: [].}
Backoff (milliseconds) to wait after the attempt-th failure (1-based). Exponential baseDelayMs * multiplier^(attempt-1) capped at maxDelayMs; with jitter the result is randomized within 0 .. computed to spread out retries from many contending clients. Jitter draws from the std/random global RNG — see RetryOptions.jitter on calling randomize() for cross-process de-correlation.
proc buildBeginSql(opts: TransactionOptions): string {....raises: [], tags: [],
    forbids: [].}
Build a BEGIN SQL statement with the specified transaction options (isolation level, access mode, deferrable mode).
proc extractParams(params: openArray[PgParam]): tuple[oids: seq[int32],
    formats: seq[int16], values: seq[Option[seq[byte]]]] {....raises: [], tags: [],
    forbids: [].}
proc flattenInline(params: openArray[PgParamInline]): tuple[data: seq[byte],
    ranges: seq[tuple[off: int32, len: int32]], oids: seq[int32],
    formats: seq[int16]] {....raises: [], tags: [], forbids: [].}
proc invalidateIfOidMismatch(conn: PgConnection; sql: string;
                             cached: CachedStmt; currentOids: openArray[int32];
                             cacheHit: var bool) {....raises: [], tags: [],
    forbids: [].}

If the caller is about to bind currentOids to a cached prepared statement whose parse-time OIDs do not match, evict the cache entry (queue the server-side Close via pendingStmtCloses, remove the local entry) and set cacheHit to false so the caller's cache-miss path runs and re-parses under the new OIDs.

No-op when cacheHit is already false — cached is only dereferenced under the cacheHit guard, so passing nil is safe as long as cacheHit is false.

proc invalidateIfOidMismatch(conn: PgConnection; sql: string;
                             cached: CachedStmt; params: openArray[PgParam];
                             cacheHit: var bool) {....raises: [], tags: [],
    forbids: [].}
PgParam overload for the query/exec call paths. Avoids the seq[int32] allocation a separate OID-projection step would require — the per-parameter .oid reads happen inside paramOidsMatch.
proc isRetryableTxError(e: ref CatchableError; states: openArray[string]): bool {.
    ...raises: [], tags: [], forbids: [].}
Whether e is a PgQueryError whose SQLSTATE is in states. Non-PgQueryError failures (connection drops, timeouts) are never retryable here: they leave the connection unusable for a fresh attempt.
proc paramOidsMatch(cachedOids, currentOids: openArray[int32]): bool {.
    ...raises: [], tags: [], forbids: [].}

Whether a cached prepared statement's parse-time parameter OIDs are compatible with the OIDs a caller wants to bind now.

OID 0 (unknown) on either side is treated as a wildcard: the server inferred or will infer the type, so we cannot pre-judge a mismatch. A length mismatch is treated as incompatible.

Empty-vs-empty (parameter-less SQL) matches trivially: the loop body does not execute and the length check passes.

Callers use a false result to invalidate the cache entry and re-parse the statement with the new OIDs, preventing the server from interpreting bind payloads under the statement's original (and now wrong) parse-time type assumptions.

proc paramOidsMatch(cachedOids: openArray[int32]; params: openArray[PgParam]): bool {.
    ...raises: [], tags: [], forbids: [].}
PgParam overload that reads each parameter's oid field directly, avoiding a temporary seq[int32] projection on the query/exec cache-hit path. Semantics match the openArray[int32] overload.
func toFormatCodes(rf: ResultFormat): seq[int16] {....raises: [], tags: [],
    forbids: [].}
Convert a high-level ResultFormat to wire-protocol format codes.

Templates

template appendInlineParam(data: var seq[byte];
                           ranges: var seq[tuple[off: int32, len: int32]];
                           oids: var seq[int32]; formats: var seq[int16];
                           p: PgParamInline)
Shared encoder for a single PgParamInline into SoA buffers. Used by both flattenInline (per-call temporaries) and Pipeline.appendInline (pipeline-wide SoA). Keeping the NULL / empty / inline / overflow branching in one place means wire-format semantics cannot drift between the two code paths.
template execRecvLoop(conn: PgConnection; sql: string;
                      cacheHit, cacheMiss: bool; stmtName: string;
                      commandTag: var string; timeout: Duration = ZeroDuration)
Receive-loop counterpart of queryRecvLoop for the extended-query exec path: discards DataRows (exec callers don't need rows) and exposes only the CommandComplete tag via the commandTag out-parameter. Shared by execImpl (both overloads), execInlineImpl, and execDirectRunImpl.
template queryEachRecvLoop(conn: PgConnection; sql: string;
                           resultFormats: openArray[int16];
                           cacheHit, cacheMiss: bool; stmtName: string;
                           cachedFields: var seq[FieldDescription];
                           cachedColFmts: seq[int16]; cachedColOids: seq[int32];
                           callback: RowCallback; rowCount: var int64;
                           timeout: Duration = ZeroDuration)
template queryRecvLoop(conn: PgConnection; sql: string;
                       resultFormats: openArray[int16];
                       cacheHit, cacheMiss: bool; stmtName: string;
                       cachedFields: var seq[FieldDescription];
                       cachedColFmts: seq[int16]; cachedColOids: seq[int32];
                       qr: var QueryResult; timeout: Duration = ZeroDuration)
template sendExtendedExec(conn: PgConnection; cached: CachedStmt;
                          cacheHit, cacheMiss: var bool; stmtName: var string;
                          parseStep, bindStep: untyped)

exec-shaped counterpart of sendExtendedQuery: same 3-branch send sequence, but no Describe(Portal) on the cache-disabled path and no result-format / cached-column bookkeeping (exec callers discard rows). The cache-miss path still issues Describe(Statement) so the recv loop can stash parameter OIDs and field info for future cache hits.

Precondition: cached may be nil iff cacheHit == false.

template sendExtendedQuery(conn: PgConnection; resultFormats: seq[int16];
                           cached: CachedStmt; cacheHit, cacheMiss: var bool;
                           stmtName: var string;
                           cachedFields: var seq[FieldDescription];
                           cachedColFmts: var seq[int16];
                           cachedColOids: var seq[int32];
                           effectiveResultFormats: var seq[int16];
                           parseStep, bindStep: untyped)
Emit the extended-query wire sequence (Parse/Bind/Describe/Execute/Sync) for a query-shaped round-trip into conn.sendBuf, branching on the prepared-statement cache state:
  • cache hit → Bind, Execute, Sync. Pulls fields/colFmts/colOids/ resultFormats out of cached so the recv loop can reuse them.
  • cache miss → optional Close (eviction), Parse, Describe(Statement), Bind, Execute, Sync. cachedFields/cachedColFmts/cachedColOids are left for the recv loop to populate on RowDescription.
  • cache disabled (stmtCacheCapacity == 0) → Parse, Bind, Describe(Portal), Execute, Sync. Describe(Portal) is required so the recv loop sees a RowDescription for QueryResult.fields.

parseStep and bindStep are untyped blocks expanded inline so each caller can pick the right addParse / addBind / addBindRaw overload (PgParam, raw Option[seq[byte]] + OIDs, or inline raw data + ranges) without going through a proc-pointer indirection. Both blocks reference stmtName from the outer scope; the template sets it before expansion ("" on the cache-disabled path, the cache-named or freshly-generated name otherwise).

Precondition: cached may be nil iff cacheHit == false; the cache-miss and cache-disabled branches never read cached.