Internal building blocks shared by every pg_client/ submodule.
Contains the transaction and retry option types, shared constants, the inline-parameter encoder, and the receive-loop templates reused by the extended-query path (exec, query, queryEach, queryDirect, …). Re-exported through pg_client.nim; submodules import this module directly via ./core.
Types
AccessMode = enum amDefault, amReadWrite, amReadOnly
- PostgreSQL transaction access mode (read-write or read-only).
DeferrableMode = enum dmDefault, dmDeferrable, dmNotDeferrable
- PostgreSQL transaction deferrable mode. DEFERRABLE only has an effect when the transaction is also SERIALIZABLE and READ ONLY.
IsolationLevel = enum ilDefault, ilReadCommitted, ilRepeatableRead, ilSerializable, ilReadUncommitted
- PostgreSQL transaction isolation level.
RetryOptions = object
  maxAttempts*: int = 3
    ## Total attempts including the first. Values ``<= 1`` run the body exactly
    ## once with no retry (the body always runs at least once).
  baseDelayMs*: int = 20
    ## Backoff before the first retry, in milliseconds.
  maxDelayMs*: int = 1000
    ## Upper bound on the backoff delay, in milliseconds.
  multiplier*: float = 2.0
    ## Exponential growth factor between attempts.
  jitter*: bool = true
    ## Full jitter: pick a random delay in ``0 .. computed``. Uses the
    ## `std/random` global RNG; to de-correlate retries *across processes*
    ## the application must call ``randomize()`` once at startup; otherwise
    ## every process replays the same (default-seeded) jitter sequence.
  retryableStates*: seq[string] = ["40001", "40P01"]
    ## SQLSTATE codes that trigger a retry. Defaults to serialization_failure
    ## (40001) and deadlock_detected (40P01), the transaction-level conflicts
    ## PostgreSQL recommends resolving by re-running the whole transaction.
- Controls how withTransactionRetry re-runs a transaction after a retryable failure. Relies on Nim object field defaults, so partial construction (e.g. RetryOptions(maxAttempts: 5)) leaves the unset fields at the defaults shown in the declaration above.
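The partial-construction behaviour can be illustrated with a small stand-in type. This is a minimal sketch, assuming Nim 2.0 or later (where object field defaults are available); RetrySketch is a hypothetical, trimmed mirror of RetryOptions and not part of the module.

```nim
# Hypothetical, trimmed mirror of RetryOptions (illustration only).
# Default field values require Nim 2.0 or later.
type
  RetrySketch = object
    maxAttempts: int = 3
    baseDelayMs: int = 20
    maxDelayMs: int = 1000

# Only maxAttempts is set explicitly; the other fields keep their
# declared defaults instead of being zero-initialised.
let opts = RetrySketch(maxAttempts: 5)
echo opts.maxAttempts, " ", opts.baseDelayMs, " ", opts.maxDelayMs
```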
TransactionOptions = object
  isolation*: IsolationLevel
  access*: AccessMode
  deferrable*: DeferrableMode
- Options for BEGIN: isolation level, access mode, and deferrable mode.
Consts
copyBatchSize = 262144
- Batch threshold for COPY IN buffering: 256 KiB (262144 bytes).
StmtCacheInvalidatingStates = ["26000", "0A000"]
- SQLSTATEs that mean a cache-hit prepared statement can no longer be reused as cached and must be evicted so the next call re-parses it:
- 26000 invalid_sql_statement_name — the server no longer has the prepared statement (e.g. DISCARD ALL / DEALLOCATE ran on the session, or a pooled backend was reset). Re-parse recreates it.
- 0A000 feature_not_supported — chiefly "cached plan must not change result type": DDL altered the statement's result columns, so the server rejects the cached (fixed-result) plan on Execute. Because the cache-hit path skips Describe, only a re-parse picks up the new schema; without eviction every subsequent hit would re-raise 0A000 forever. Other 0A000 causes simply re-parse once more (the error still propagates).
42P18 (indeterminate_datatype) is intentionally absent: it is a Parse-phase error and cannot arise on a cache hit (no Parse is sent), and a cache miss that fails to Parse never reaches addStmtCache.
Procs
proc backoffDelayMs(opts: RetryOptions; attempt: int): int {....raises: [], tags: [], forbids: [].}
- Backoff (milliseconds) to wait after the attempt-th failure (1-based). Exponential baseDelayMs * multiplier^(attempt-1) capped at maxDelayMs; with jitter the result is randomized within 0 .. computed to spread out retries from many contending clients. Jitter draws from the std/random global RNG — see RetryOptions.jitter on calling randomize() for cross-process de-correlation.
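The formula above can be sketched as follows. This is an illustrative reimplementation rather than the module's actual code: backoffSketch and its flat parameter list are hypothetical, and the rounding behaviour (truncation toward zero) is an assumption.

```nim
import std/[math, random]

proc backoffSketch(baseDelayMs, maxDelayMs: int; multiplier: float;
                   jitter: bool; attempt: int): int =
  ## Delay in milliseconds after the `attempt`-th failure (1-based).
  # Exponential growth: baseDelayMs * multiplier^(attempt - 1).
  let exponent = float(max(attempt - 1, 0))
  let raw = float(baseDelayMs) * pow(multiplier, exponent)
  # Cap before converting back to int so very large values cannot overflow.
  let capped = int(min(raw, float(maxDelayMs)))
  if jitter and capped > 0:
    # Full jitter: uniform in 0 .. capped, drawn from the global RNG.
    # Call randomize() once at startup to vary the sequence per process.
    rand(capped)
  else:
    capped
```

With jitter disabled and the default options (20 ms base, 1000 ms cap, multiplier 2.0), the first three delays under this sketch are 20, 40, and 80 ms, and the cap is reached from the seventh failure onward.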
proc buildBeginSql(opts: TransactionOptions): string {....raises: [], tags: [], forbids: [].}
- Build a BEGIN SQL statement with the specified transaction options (isolation level, access mode, deferrable mode).
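One way such a builder could look is sketched below. The enum and object declarations are local copies of the ones documented above; beginSqlSketch is a hypothetical name, and the exact string the real buildBeginSql emits (for example, whether modes are comma-separated) is not stated here, so the separator is an assumption.

```nim
import std/strutils

type
  IsolationLevel = enum
    ilDefault, ilReadCommitted, ilRepeatableRead, ilSerializable,
    ilReadUncommitted
  AccessMode = enum
    amDefault, amReadWrite, amReadOnly
  DeferrableMode = enum
    dmDefault, dmDeferrable, dmNotDeferrable
  TransactionOptions = object
    isolation: IsolationLevel
    access: AccessMode
    deferrable: DeferrableMode

proc beginSqlSketch(opts: TransactionOptions): string =
  ## Builds "BEGIN" plus any non-default transaction modes.
  var modes: seq[string]
  case opts.isolation
  of ilDefault: discard
  of ilReadCommitted: modes.add("ISOLATION LEVEL READ COMMITTED")
  of ilRepeatableRead: modes.add("ISOLATION LEVEL REPEATABLE READ")
  of ilSerializable: modes.add("ISOLATION LEVEL SERIALIZABLE")
  of ilReadUncommitted: modes.add("ISOLATION LEVEL READ UNCOMMITTED")
  case opts.access
  of amDefault: discard
  of amReadWrite: modes.add("READ WRITE")
  of amReadOnly: modes.add("READ ONLY")
  case opts.deferrable
  of dmDefault: discard
  of dmDeferrable: modes.add("DEFERRABLE")
  of dmNotDeferrable: modes.add("NOT DEFERRABLE")
  result = "BEGIN"
  if modes.len > 0:
    # Assumption: comma-separated modes, which PostgreSQL accepts.
    result.add(' ')
    result.add(modes.join(", "))
```

The *Default members leave the corresponding clause out entirely, so the server-side defaults apply.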
proc extractParams(params: openArray[PgParam]): tuple[oids: seq[int32], formats: seq[int16], values: seq[Option[seq[byte]]]] {....raises: [], tags: [], forbids: [].}
proc flattenInline(params: openArray[PgParamInline]): tuple[data: seq[byte], ranges: seq[tuple[off: int32, len: int32]], oids: seq[int32], formats: seq[int16]] {....raises: [], tags: [], forbids: [].}
proc invalidateIfOidMismatch(conn: PgConnection; sql: string; cached: CachedStmt; currentOids: openArray[int32]; cacheHit: var bool) {....raises: [], tags: [], forbids: [].}
- If the caller is about to bind currentOids to a cached prepared statement whose parse-time OIDs do not match, evict the cache entry (queue the server-side Close via pendingStmtCloses, remove the local entry) and set cacheHit to false so the caller's cache-miss path runs and re-parses under the new OIDs.
No-op when cacheHit is already false — cached is only dereferenced under the cacheHit guard, so passing nil is safe as long as cacheHit is false.
proc invalidateIfOidMismatch(conn: PgConnection; sql: string; cached: CachedStmt; params: openArray[PgParam]; cacheHit: var bool) {....raises: [], tags: [], forbids: [].}
- PgParam overload for the query/exec call paths. Avoids the seq[int32] allocation a separate OID-projection step would require — the per-parameter .oid reads happen inside paramOidsMatch.
proc isRetryableTxError(e: ref CatchableError; states: openArray[string]): bool {. ...raises: [], tags: [], forbids: [].}
- Whether e is a PgQueryError whose SQLSTATE is in states. Non-PgQueryError failures (connection drops, timeouts) are never retryable here: they leave the connection unusable for a fresh attempt.
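The classification rule can be sketched with a stand-in error type. SketchQueryError and its sqlState field are hypothetical; the real PgQueryError field name is not shown in this section.

```nim
type
  SketchQueryError = object of CatchableError
    sqlState: string  # hypothetical field name, for illustration only

proc isRetryableSketch(e: ref CatchableError;
                       states: openArray[string]): bool =
  ## True only for query errors whose SQLSTATE is listed in `states`.
  if e of SketchQueryError:
    # Safe downcast: the `of` check above guarantees the dynamic type.
    result = (ref SketchQueryError)(e).sqlState in states
  else:
    # Connection drops, timeouts, and similar failures are not retried.
    result = false
```

Any other CatchableError, including I/O failures, falls through to false, which matches the rule that only SQLSTATE-classified conflicts justify re-running the transaction.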
proc paramOidsMatch(cachedOids, currentOids: openArray[int32]): bool {. ...raises: [], tags: [], forbids: [].}
- Whether a cached prepared statement's parse-time parameter OIDs are compatible with the OIDs a caller wants to bind now.
OID 0 (unknown) on either side is treated as a wildcard: the server inferred or will infer the type, so we cannot pre-judge a mismatch. A length mismatch is treated as incompatible.
Empty-vs-empty (parameter-less SQL) matches trivially: the loop body does not execute and the length check passes.
Callers use a false result to invalidate the cache entry and re-parse the statement with the new OIDs, preventing the server from interpreting bind payloads under the statement's original (and now wrong) parse-time type assumptions.
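The matching rules described above (length check, OID 0 as wildcard on either side) can be sketched as below. oidsMatchSketch is a hypothetical name; the OIDs used in the usage note (23 for int4, 25 for text) are the standard PostgreSQL type OIDs.

```nim
proc oidsMatchSketch(cachedOids, currentOids: openArray[int32]): bool =
  ## Compatible when lengths agree and every pair either matches
  ## or has OID 0 (unknown) on at least one side.
  if cachedOids.len != currentOids.len:
    return false
  for i in 0 ..< cachedOids.len:
    let a = cachedOids[i]
    let b = currentOids[i]
    if a != 0 and b != 0 and a != b:
      return false
  # Also reached for two empty inputs: the loop body never runs.
  true
```

For instance, a statement parsed with OIDs [23, 25] remains compatible with a bind using [0, 25], but not with one using [25, 25] or with a different parameter count.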
proc paramOidsMatch(cachedOids: openArray[int32]; params: openArray[PgParam]): bool {. ...raises: [], tags: [], forbids: [].}
- PgParam overload that reads each parameter's oid field directly, avoiding a temporary seq[int32] projection on the query/exec cache-hit path. Semantics match the openArray[int32] overload.
func toFormatCodes(rf: ResultFormat): seq[int16] {....raises: [], tags: [], forbids: [].}
- Convert a high-level ResultFormat to wire-protocol format codes.
Templates
template appendInlineParam(data: var seq[byte]; ranges: var seq[tuple[off: int32, len: int32]]; oids: var seq[int32]; formats: var seq[int16]; p: PgParamInline)
- Shared encoder for a single PgParamInline into SoA buffers. Used by both flattenInline (per-call temporaries) and Pipeline.appendInline (pipeline-wide SoA). Keeping the NULL / empty / inline / overflow branching in one place means wire-format semantics cannot drift between the two code paths.
template execRecvLoop(conn: PgConnection; sql: string; cacheHit, cacheMiss: bool; stmtName: string; commandTag: var string; timeout: Duration = ZeroDuration)
- Receive-loop counterpart of queryRecvLoop for the extended-query exec path: discards DataRows (exec callers don't need rows) and exposes only the CommandComplete tag via the commandTag out-parameter. Shared by execImpl (both overloads), execInlineImpl, and execDirectRunImpl.
template queryEachRecvLoop(conn: PgConnection; sql: string; resultFormats: openArray[int16]; cacheHit, cacheMiss: bool; stmtName: string; cachedFields: var seq[FieldDescription]; cachedColFmts: seq[int16]; cachedColOids: seq[int32]; callback: RowCallback; rowCount: var int64; timeout: Duration = ZeroDuration)
template queryRecvLoop(conn: PgConnection; sql: string; resultFormats: openArray[int16]; cacheHit, cacheMiss: bool; stmtName: string; cachedFields: var seq[FieldDescription]; cachedColFmts: seq[int16]; cachedColOids: seq[int32]; qr: var QueryResult; timeout: Duration = ZeroDuration)
template sendExtendedExec(conn: PgConnection; cached: CachedStmt; cacheHit, cacheMiss: var bool; stmtName: var string; parseStep, bindStep: untyped)
- exec-shaped counterpart of sendExtendedQuery: same 3-branch send sequence, but no Describe(Portal) on the cache-disabled path and no result-format / cached-column bookkeeping (exec callers discard rows). The cache-miss path still issues Describe(Statement) so the recv loop can stash parameter OIDs and field info for future cache hits.
Precondition: cached may be nil iff cacheHit == false.
template sendExtendedQuery(conn: PgConnection; resultFormats: seq[int16]; cached: CachedStmt; cacheHit, cacheMiss: var bool; stmtName: var string; cachedFields: var seq[FieldDescription]; cachedColFmts: var seq[int16]; cachedColOids: var seq[int32]; effectiveResultFormats: var seq[int16]; parseStep, bindStep: untyped)
- Emit the extended-query wire sequence (Parse/Bind/Describe/Execute/Sync) for a query-shaped round-trip into conn.sendBuf, branching on the prepared-statement cache state:
- cache hit → Bind, Execute, Sync. Pulls fields/colFmts/colOids/resultFormats out of cached so the recv loop can reuse them.
- cache miss → optional Close (eviction), Parse, Describe(Statement), Bind, Execute, Sync. cachedFields/cachedColFmts/cachedColOids are left for the recv loop to populate on RowDescription.
- cache disabled (stmtCacheCapacity == 0) → Parse, Bind, Describe(Portal), Execute, Sync. Describe(Portal) is required so the recv loop sees a RowDescription for QueryResult.fields.
parseStep and bindStep are untyped blocks expanded inline so each caller can pick the right addParse / addBind / addBindRaw overload (PgParam, raw Option[seq[byte]] + OIDs, or inline raw data + ranges) without going through a proc-pointer indirection. Both blocks reference stmtName from the outer scope; the template sets it before expansion ("" on the cache-disabled path, the cache-named or freshly-generated name otherwise).
Precondition: cached may be nil iff cacheHit == false; the cache-miss and cache-disabled branches never read cached.
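The three branches of sendExtendedQuery can be summarised as message sequences. The sketch below only models the ordering listed above; CacheState, querySequenceSketch, and the evict flag are hypothetical names, and nothing is written to a real send buffer.

```nim
type
  CacheState = enum  # hypothetical labels for the three branches
    csHit, csMiss, csDisabled

proc querySequenceSketch(state: CacheState; evict = false): seq[string] =
  ## Frontend messages, in send order, for one query-shaped round-trip.
  case state
  of csHit:
    # Statement already parsed and described on the server.
    result = @["Bind", "Execute", "Sync"]
  of csMiss:
    # Optional Close frees an evicted statement before the new Parse.
    if evict:
      result.add("Close")
    result.add(["Parse", "Describe(Statement)", "Bind", "Execute", "Sync"])
  of csDisabled:
    # No cache: Describe(Portal) supplies the RowDescription.
    result = @["Parse", "Bind", "Describe(Portal)", "Execute", "Sync"]
```

Note the different Describe placement: on a cache miss the statement is described before Bind so the result can be cached, while on the cache-disabled path the portal is described after Bind.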