Internal building blocks shared by every pg_client/ submodule.
Contains types/constants for transaction options, the inline-parameter encoder, and the receive-loop templates that the extended-query path (exec, query, queryEach, queryDirect, …) reuses. Re-exported through pg_client.nim; submodules import this module directly via ./core.
Types
AccessMode = enum amDefault, amReadWrite, amReadOnly
- PostgreSQL transaction access mode (read-write or read-only).
DeferrableMode = enum dmDefault, dmDeferrable, dmNotDeferrable
- PostgreSQL transaction deferrable mode. DEFERRABLE only has an effect on transactions that are both SERIALIZABLE and READ ONLY.
IsolationLevel = enum ilDefault, ilReadCommitted, ilRepeatableRead, ilSerializable, ilReadUncommitted
- PostgreSQL transaction isolation level.
RetryOptions = object
  maxAttempts*: int = 3
    ## Total attempts including the first. Values ``<= 1`` run the body exactly
    ## once with no retry (the body always runs at least once).
  baseDelayMs*: int = 20
    ## Backoff before the first retry, in milliseconds.
  maxDelayMs*: int = 1000
    ## Upper bound on the backoff delay, in milliseconds.
  multiplier*: float = 2.0
    ## Exponential growth factor between attempts.
  jitter*: bool = true
    ## Full jitter: pick a random delay in ``0 .. computed``. Uses the
    ## `std/random` global RNG; to de-correlate retries *across processes*
    ## the application must call ``randomize()`` once at startup; otherwise
    ## every process replays the same (default-seeded) jitter sequence.
  retryableStates*: seq[string] = ["40001", "40P01"]
    ## SQLSTATE codes that trigger a retry. Defaults to serialization_failure
    ## (40001) and deadlock_detected (40P01): the transaction-level conflicts
    ## PostgreSQL recommends resolving by re-running the whole transaction.
- Controls how withTransactionRetry re-runs a transaction after a retryable failure. Relies on Nim object field defaults, so partial construction (e.g. RetryOptions(maxAttempts: 5)) leaves every unset field at its declared default.
TransactionOptions = object
  isolation*: IsolationLevel
  access*: AccessMode
  deferrable*: DeferrableMode
- Options for BEGIN: isolation level, access mode, and deferrable mode.
Consts
copyBatchSize = 262144
- Batch threshold for COPY IN buffering: 262144 bytes (256 KiB).
StmtCacheInvalidatingStates = ["26000", "0A000"]
- SQLSTATEs that mean a cache-hit prepared statement can no longer be reused as cached and must be evicted so the next call re-parses it:
- 26000 invalid_sql_statement_name — the server no longer has the prepared statement (e.g. DISCARD ALL / DEALLOCATE ran on the session, or a pooled backend was reset). Re-parse recreates it.
- 0A000 feature_not_supported — chiefly "cached plan must not change result type": DDL altered the statement's result columns, so the server rejects the cached (fixed-result) plan on Execute. Because the cache-hit path skips Describe, only a re-parse picks up the new schema; without eviction every subsequent hit would re-raise 0A000 forever. Other 0A000 causes simply re-parse once more (the error still propagates).
42P18 (indeterminate_datatype) is intentionally absent: it is a Parse-phase error and cannot arise on a cache hit (no Parse is sent), and a cache miss that fails to Parse never reaches addStmtCache.
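The eviction rule above can be read as a small predicate over the SQLSTATE of a failed execution. The following is a minimal sketch, not the module's code: shouldEvictSketch and its parameters are hypothetical names, and the constant is re-declared locally (with a Sketch suffix) so the snippet compiles on its own.

```nim
# Local copy of the documented constant, so the sketch is self-contained.
const StmtCacheInvalidatingStatesSketch = ["26000", "0A000"]

func shouldEvictSketch(sqlState: string; cacheHit: bool): bool =
  ## Hypothetical helper: true when an error seen on a cache-hit execution
  ## means the cached statement should be dropped so the next call re-parses.
  cacheHit and (sqlState in StmtCacheInvalidatingStatesSketch)

doAssert shouldEvictSketch("26000", cacheHit = true)
doAssert shouldEvictSketch("0A000", cacheHit = true)
# 42P18 is a Parse-phase error and is deliberately not in the list.
doAssert not shouldEvictSketch("42P18", cacheHit = true)
```

Gating on cacheHit reflects the wording above: the listed states only matter when a previously cached statement was reused.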
Procs
proc backoffDelayMs(opts: RetryOptions; attempt: int): int {.raises: [], tags: [], forbids: [].}
- Backoff (milliseconds) to wait after the attempt-th failure (1-based). Exponential baseDelayMs * multiplier^(attempt-1) capped at maxDelayMs; with jitter the result is randomized within 0 .. computed to spread out retries from many contending clients. Jitter draws from the std/random global RNG — see RetryOptions.jitter on calling randomize() for cross-process de-correlation.
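The formula described above, baseDelayMs * multiplier^(attempt-1) capped at maxDelayMs with optional full jitter, might look roughly like the sketch below. It is an illustration under assumptions, not the module's implementation: RetryOptionsSketch and backoffDelaySketch are hypothetical stand-ins that mirror only the documented fields, and the overflow handling is a choice made here.

```nim
import std/[math, random]

type
  RetryOptionsSketch = object
    ## Hypothetical stand-in mirroring the documented RetryOptions fields.
    baseDelayMs: int
    maxDelayMs: int
    multiplier: float
    jitter: bool

proc backoffDelaySketch(opts: RetryOptionsSketch; attempt: int): int =
  ## Delay in milliseconds after the `attempt`-th failure (1-based).
  let exponent = float(max(attempt - 1, 0))
  let raw = float(opts.baseDelayMs) * pow(opts.multiplier, exponent)
  # Cap while still in float space so a large exponent cannot overflow int.
  let capped = int(min(raw, float(opts.maxDelayMs)))
  if opts.jitter and capped > 0:
    result = rand(capped)  # full jitter: uniform in 0 .. capped (inclusive)
  else:
    result = capped

let noJitter = RetryOptionsSketch(baseDelayMs: 20, maxDelayMs: 1000,
                                  multiplier: 2.0, jitter: false)
doAssert backoffDelaySketch(noJitter, 1) == 20    # 20 * 2^0
doAssert backoffDelaySketch(noJitter, 3) == 80    # 20 * 2^2
doAssert backoffDelaySketch(noJitter, 10) == 1000 # 20 * 2^9 = 10240, capped
```

With jitter enabled the result is only bounded, not fixed, so a caller that wants different sequences per process would call randomize() once at startup, as noted for RetryOptions.jitter.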
proc buildBeginSql(opts: TransactionOptions): string {.raises: [], tags: [], forbids: [].}
- Build a BEGIN SQL statement with the specified transaction options (isolation level, access mode, deferrable mode).
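To make the mapping from options to SQL concrete, here is a minimal sketch of how such a statement could be assembled. The exact text the real buildBeginSql emits is not shown in this reference, so the output format below (comma-separated transaction modes after BEGIN, nothing emitted for the Default values) is an assumption; beginSqlSketch is a hypothetical name and the types are re-declared locally so the snippet stands alone.

```nim
import std/strutils

type
  IsolationLevel = enum
    ilDefault, ilReadCommitted, ilRepeatableRead, ilSerializable, ilReadUncommitted
  AccessMode = enum
    amDefault, amReadWrite, amReadOnly
  DeferrableMode = enum
    dmDefault, dmDeferrable, dmNotDeferrable
  TransactionOptions = object
    isolation: IsolationLevel
    access: AccessMode
    deferrable: DeferrableMode

func beginSqlSketch(opts: TransactionOptions): string =
  ## Assumed behaviour: a Default value contributes no clause, so the
  ## server-side default for that setting applies.
  var modes: seq[string]
  case opts.isolation
  of ilDefault: discard
  of ilReadCommitted: modes.add("ISOLATION LEVEL READ COMMITTED")
  of ilRepeatableRead: modes.add("ISOLATION LEVEL REPEATABLE READ")
  of ilSerializable: modes.add("ISOLATION LEVEL SERIALIZABLE")
  of ilReadUncommitted: modes.add("ISOLATION LEVEL READ UNCOMMITTED")
  case opts.access
  of amDefault: discard
  of amReadWrite: modes.add("READ WRITE")
  of amReadOnly: modes.add("READ ONLY")
  case opts.deferrable
  of dmDefault: discard
  of dmDeferrable: modes.add("DEFERRABLE")
  of dmNotDeferrable: modes.add("NOT DEFERRABLE")
  result = "BEGIN"
  if modes.len > 0:
    result.add(" " & modes.join(", "))

doAssert beginSqlSketch(TransactionOptions()) == "BEGIN"
doAssert beginSqlSketch(TransactionOptions(isolation: ilSerializable,
    access: amReadOnly, deferrable: dmDeferrable)) ==
  "BEGIN ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE"
```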
func cacheHitColFmts(resultFormats: openArray[int16]; cachedColFmts: seq[int16]; numCols: int): seq[int16] {.raises: [], tags: [], forbids: [].}
- Per-column decode formats for a statement-cache HIT. Use the formats this Bind actually requested (deriveColFmts of resultFormats, which on a cache hit is effectiveResultFormats), not the formats negotiated when the statement was first cached: the same SQL can be re-issued with a different resultFormat (e.g. cached as rfAuto/rfBinary, now rfText) and the server returns rows in the format this Bind asked for; reusing the stale cached format would reinterpret the bytes and silently corrupt values (text "42" decoded as a big-endian int, etc.). The cachedColFmts fallback covers the caller-didn't-override / zero-column cases, where resultFormats is empty. Shared by all four cache-hit Extended Query paths so they cannot drift.
func deriveColFmts(resultFormats: openArray[int16]; numCols: int): seq[int16] {.raises: [], tags: [], forbids: [].}
- Expand wire-level Bind result-format codes to one code per column. A single code broadcasts to every column (Bind's "apply to all" form); a per-column array is applied positionally; any column past the end of a multi-element array defaults to text (0). Shared by every Extended Query path that has to decode rows under the formats this Bind actually requested (cache hit and cache miss alike).
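The expansion rule for deriveColFmts, and the way cacheHitColFmts prefers the formats of the current Bind over the cached ones, can be illustrated as follows. This is a sketch under assumptions rather than the module's code: the Sketch-suffixed names are hypothetical, an empty resultFormats is assumed to mean "all text" in the first function, and "resultFormats is empty" is assumed to be the only trigger for the cached fallback in the second.

```nim
func deriveColFmtsSketch(resultFormats: openArray[int16];
                         numCols: int): seq[int16] =
  ## One format code per column; 0 = text, 1 = binary.
  result = newSeq[int16](max(numCols, 0))  # zero-filled: text by default
  if resultFormats.len == 1:
    # A single code is broadcast to every column.
    for i in 0 ..< result.len:
      result[i] = resultFormats[0]
  else:
    # Per-column codes apply positionally; columns past the end stay text.
    for i in 0 ..< min(result.len, resultFormats.len):
      result[i] = resultFormats[i]

func cacheHitColFmtsSketch(resultFormats: openArray[int16];
                           cachedColFmts: seq[int16];
                           numCols: int): seq[int16] =
  ## Prefer what this Bind requested; fall back to the cached formats only
  ## when the caller supplied no result formats (assumed condition).
  if resultFormats.len > 0:
    result = deriveColFmtsSketch(resultFormats, numCols)
  else:
    result = cachedColFmts

doAssert deriveColFmtsSketch([1'i16], 3) == @[1'i16, 1'i16, 1'i16]
doAssert deriveColFmtsSketch([1'i16, 0'i16], 3) == @[1'i16, 0'i16, 0'i16]
# Cached as binary, re-issued as text: the current request wins.
doAssert cacheHitColFmtsSketch([0'i16], @[1'i16, 1'i16], 2) == @[0'i16, 0'i16]
```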
proc extractParams(params: openArray[PgParam]): tuple[oids: seq[int32], formats: seq[int16], values: seq[Option[seq[byte]]]] {.raises: [], tags: [], forbids: [].}
proc flattenInline(params: openArray[PgParamInline]): tuple[data: seq[byte], ranges: seq[tuple[off: int32, len: int32]], oids: seq[int32], formats: seq[int16]] {.raises: [], tags: [], forbids: [].}
proc invalidateIfOidMismatch(conn: PgConnection; sql: string; cached: CachedStmt; currentOids: openArray[int32]; cacheHit: var bool) {.raises: [], tags: [], forbids: [].}
- If the caller is about to bind currentOids to a cached prepared statement whose parse-time OIDs do not match, evict the cache entry (queue the server-side Close via pendingStmtCloses, remove the local entry) and set cacheHit to false so the caller's cache-miss path runs and re-parses under the new OIDs.
No-op when cacheHit is already false — cached is only dereferenced under the cacheHit guard, so passing nil is safe as long as cacheHit is false.
proc invalidateIfOidMismatch(conn: PgConnection; sql: string; cached: CachedStmt; params: openArray[PgParam]; cacheHit: var bool) {.raises: [], tags: [], forbids: [].}
- PgParam overload for the query/exec call paths. Avoids the seq[int32] allocation a separate OID-projection step would require — the per-parameter .oid reads happen inside paramOidsMatch.
proc isRetryableTxError(e: ref CatchableError; states: openArray[string]): bool {.raises: [], tags: [], forbids: [].}
- Whether e is a PgQueryError whose SQLSTATE is in states. Non-PgQueryError failures (connection drops, timeouts) are never retryable here: they leave the connection unusable for a fresh attempt.
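The classification described above amounts to a runtime type test followed by a membership check. The sketch below assumes a PgQueryError-like exception that carries its SQLSTATE in a string field; the type name PgQueryErrorSketch, the field name sqlState, and the function name are all hypothetical, since the real type's layout is not shown in this reference.

```nim
type
  PgQueryErrorSketch = object of CatchableError
    sqlState: string  # hypothetical field holding the five-character SQLSTATE

func isRetryableSketch(e: ref CatchableError;
                       states: openArray[string]): bool =
  ## Only query errors with a listed SQLSTATE count as retryable; any other
  ## failure (I/O errors, timeouts, nil) is reported as not retryable.
  if (e != nil) and (e of PgQueryErrorSketch):
    result = (ref PgQueryErrorSketch)(e).sqlState in states

let conflict = (ref PgQueryErrorSketch)(msg: "could not serialize access",
                                        sqlState: "40001")
let dropped = newException(IOError, "connection reset")

doAssert isRetryableSketch(conflict, ["40001", "40P01"])
doAssert not isRetryableSketch(dropped, ["40001", "40P01"])
```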
proc paramOidsMatch(cachedOids, currentOids: openArray[int32]): bool {.raises: [], tags: [], forbids: [].}
- Whether a cached prepared statement's parse-time parameter OIDs are compatible with the OIDs a caller wants to bind now.
OID 0 (unknown) on either side is treated as a wildcard: the server inferred or will infer the type, so we cannot pre-judge a mismatch. A length mismatch is treated as incompatible.
Empty-vs-empty (parameter-less SQL) matches trivially: the loop body does not execute and the length check passes.
Callers use a false result to invalidate the cache entry and re-parse the statement with the new OIDs, preventing the server from interpreting bind payloads under the statement's original (and now wrong) parse-time type assumptions.
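The matching rules above (length check, OID 0 as a wildcard on either side, trivial match for empty inputs) can be sketched in a few lines. paramOidsMatchSketch is a hypothetical re-implementation for illustration only; the OID values used in the checks (23 for int4, 20 for int8, 25 for text) are the standard PostgreSQL type OIDs.

```nim
func paramOidsMatchSketch(cachedOids, currentOids: openArray[int32]): bool =
  ## True when every position is compatible between the two OID lists.
  if cachedOids.len != currentOids.len:
    return false               # different parameter counts never match
  for i in 0 ..< cachedOids.len:
    let a = cachedOids[i]
    let b = currentOids[i]
    # OID 0 ("unknown") on either side is a wildcard.
    if a != 0 and b != 0 and a != b:
      return false
  result = true                # also reached when both lists are empty

doAssert paramOidsMatchSketch([23'i32, 25'i32], [23'i32, 0'i32])  # wildcard
doAssert not paramOidsMatchSketch([23'i32], [20'i32])             # int4 vs int8
doAssert not paramOidsMatchSketch([23'i32], [23'i32, 25'i32])     # length differs
```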
proc paramOidsMatch(cachedOids: openArray[int32]; params: openArray[PgParam]): bool {.raises: [], tags: [], forbids: [].}
- PgParam overload that reads each parameter's oid field directly, avoiding a temporary seq[int32] projection on the query/exec cache-hit path. Semantics match the openArray[int32] overload.
func toFormatCodes(rf: ResultFormat): seq[int16] {.raises: [], tags: [], forbids: [].}
- Convert a high-level ResultFormat to wire-protocol format codes.
Templates
template appendInlineParam(data: var seq[byte]; ranges: var seq[tuple[off: int32, len: int32]]; oids: var seq[int32]; formats: var seq[int16]; p: PgParamInline)
- Shared encoder for a single PgParamInline into SoA buffers. Used by both flattenInline (per-call temporaries) and Pipeline.appendInline (pipeline-wide SoA). Keeping the NULL / empty / inline / overflow branching in one place means wire-format semantics cannot drift between the two code paths.
template execRecvLoop(conn: PgConnection; sql: string; cacheHit, cacheMiss: bool; stmtName: string; commandTag: var string)
- Receive-loop counterpart of queryRecvLoop for the extended-query exec path: discards DataRows (exec callers don't need rows) and exposes only the CommandComplete tag via the commandTag out-parameter. Shared by execImpl (both overloads), execInlineImpl, and execDirectRunImpl.
template queryEachRecvLoop(conn: PgConnection; sql: string; resultFormats: openArray[int16]; cacheHit, cacheMiss: bool; stmtName: string; cachedFields: var seq[FieldDescription]; cachedColFmts: seq[int16]; cachedColOids: seq[int32]; callback: RowCallback; rowCount: var int64)
template queryRecvLoop(conn: PgConnection; sql: string; resultFormats: openArray[int16]; cacheHit, cacheMiss: bool; stmtName: string; cachedFields: var seq[FieldDescription]; cachedColFmts: seq[int16]; cachedColOids: seq[int32]; qr: var QueryResult)
template sendExtendedExec(conn: PgConnection; cached: CachedStmt; cacheHit, cacheMiss: var bool; stmtName: var string; parseStep, bindStep: untyped)
- exec-shaped counterpart of sendExtendedQuery: same 3-branch send sequence, but no Describe(Portal) on the cache-disabled path and no result-format / cached-column bookkeeping (exec callers discard rows). The cache-miss path still issues Describe(Statement) so the recv loop can stash parameter OIDs and field info for future cache hits.
Precondition: cached may be nil iff cacheHit == false.
template sendExtendedQuery(conn: PgConnection; resultFormats: seq[int16]; cached: CachedStmt; cacheHit, cacheMiss: var bool; stmtName: var string; cachedFields: var seq[FieldDescription]; cachedColFmts: var seq[int16]; cachedColOids: var seq[int32]; effectiveResultFormats: var seq[int16]; parseStep, bindStep: untyped)
- Emit the extended-query wire sequence (Parse/Bind/Describe/Execute/Sync) for a query-shaped round-trip into conn.sendBuf, branching on the prepared-statement cache state:
- cache hit → Bind, Execute, Sync. Pulls fields/colFmts/colOids/resultFormats out of cached so the recv loop can reuse them.
- cache miss → optional Close (eviction), Parse, Describe(Statement), Bind, Execute, Sync. cachedFields/cachedColFmts/cachedColOids are left for the recv loop to populate on RowDescription.
- cache disabled (stmtCacheCapacity == 0) → Parse, Bind, Describe(Portal), Execute, Sync. Describe(Portal) is required so the recv loop sees a RowDescription for QueryResult.fields.
parseStep and bindStep are untyped blocks expanded inline so each caller can pick the right addParse / addBind / addBindRaw overload (PgParam, raw Option[seq[byte]] + OIDs, or inline raw data + ranges) without going through a proc-pointer indirection. Both blocks reference stmtName from the outer scope; the template sets it before expansion ("" on the cache-disabled path, the cache-named or freshly-generated name otherwise).
Precondition: cached may be nil iff cacheHit == false; the cache-miss and cache-disabled branches never read cached.
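As a compact summary of the three branches listed for sendExtendedQuery, the message order per cache state can be written down as data. This is purely illustrative: CacheStateSketch and wireSequenceSketch are hypothetical names, the messages are represented as plain strings rather than encoded protocol frames, and the evicting flag stands in for "a Close for an evicted statement is pending".

```nim
type
  CacheStateSketch = enum
    csHit, csMiss, csDisabled

func wireSequenceSketch(state: CacheStateSketch;
                        evicting = false): seq[string] =
  ## Frontend message order for a query-shaped round-trip, per cache state.
  case state
  of csHit:
    result = @["Bind", "Execute", "Sync"]
  of csMiss:
    if evicting:
      result.add("Close")  # optional: drop the evicted statement first
    result.add(@["Parse", "Describe(Statement)", "Bind", "Execute", "Sync"])
  of csDisabled:
    # Describe(Portal) follows Bind so a RowDescription is still received.
    result = @["Parse", "Bind", "Describe(Portal)", "Execute", "Sync"]

doAssert wireSequenceSketch(csHit) == @["Bind", "Execute", "Sync"]
doAssert wireSequenceSketch(csMiss, evicting = true)[0] == "Close"
doAssert wireSequenceSketch(csDisabled)[2] == "Describe(Portal)"
```

Per the description of sendExtendedExec, the exec-shaped variant would differ only on the cache-disabled path, where Describe(Portal) is omitted.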