Types
PgPool = ref object of PgPoolOwner
- Connection pool that manages a set of PostgreSQL connections.
PoolConfig = object connConfig*: ConnConfig minSize*: int ## Minimum idle connections (default 1) maxSize*: int ## Maximum total connections (default 10) idleTimeout*: Duration ## Close idle connections after this duration (default 10min, ZeroDuration=disabled) maxLifetime*: Duration ## Max connection lifetime (default 1hr, ZeroDuration=disabled) maintenanceInterval*: Duration ## Maintenance loop interval (default 30s) healthCheckTimeout*: Duration ## Ping idle connections older than this before returning (default 5s, ZeroDuration=disabled). ## Applies to plaintext connections. For TLS connections, see `tlsHealthCheckTimeout`. tlsHealthCheckTimeout*: Duration ## Same as `healthCheckTimeout` but for TLS connections (default 500ms, ## ZeroDuration=disabled). ## MSG_PEEK-based liveness detection is blind to TLS alerts and to any ## ErrorResponse already encrypted into the TCP buffer, so TLS pools ## need a much shorter idle window than plaintext to stay correct. pingTimeout*: Duration ## Max time to wait for a health check ping response (default 5s, ZeroDuration=no timeout) acquireTimeout*: Duration ## Max time to wait for an available connection (default 30s, ZeroDuration=no timeout) maxWaiters*: int = -1 ## Max queued acquire waiters (default -1=unlimited, 0=no waiting). Rejects with PgPoolError when full. resetQuery*: string ## SQL to execute when returning a connection to the pool (default ""=disabled). ## Common values: "DISCARD ALL" (full reset, recommended for PgBouncer), ## "DEALLOCATE ALL" (clear prepared statements only), ## "RESET ALL" (reset session parameters only). ## On failure, the connection is discarded. tracer*: PgTracer ## Optional tracer for pool-level hooks (acquire/release) pipelined*: bool ## Enable implicit query batching for pool.exec/query (default false). ## When enabled, concurrent calls within the same event loop tick are ## batched into a single TCP write per connection using per-query SYNC ## for error isolation. maxPipelineSize*: int ## Max operations per pipeline batch per connection (default 0=unlimited). ## Only used when `pipelined` is true. connectBackoffInitial*: Duration ## First backoff after a maintenance-loop connect failure (default 1s, ## ZeroDuration=disabled, falls back to fixed `maintenanceInterval` retries). connectBackoffMax*: Duration ## Cap for exponential backoff growth (default 60s). Doubles each failure ## until reaching this value.
- Configuration for the connection pool. Create via initPoolConfig.
PooledConnHandle = ref object conn*: PgConnection pool*: PgPool
-
A pool-borrowed connection paired with the pool it came from.
Returned by PgPool.acquireHandle and PgPoolCluster.readConnection / writeConnection. The handle must be released with release(h) to return the connection to the pool — typically via defer: h.release(). Forgetting to release leaks the connection until the pool is closed.
No session reset: unlike withConnection / withReadConnection / withWriteConnection, release(h) does not call resetSession, so a configured resetQuery will not run and any session-level advisory locks acquired through the typed API will not be released via pg_advisory_unlock_all. Use the with*Connection templates when you want automatic session cleanup, or call pool.resetSession(h.conn) yourself before release(h).
pool is the pool the connection was actually borrowed from. For PgPoolCluster.readConnection with fallbackPrimary, this can be either the replica or the primary depending on which served the acquire.
PoolMetrics = object acquireCount*: int64 ## Total successful acquires acquireDuration*: Duration ## Total time spent waiting in acquire timeoutCount*: int64 ## Number of acquire timeouts createCount*: int64 ## Number of new connections created closeCount*: int64 ## Number of connections closed/discarded
- Cumulative pool statistics.
Procs
proc acquire(pool: PgPool): Future[PgConnection] {....stackTrace: false, raises: [ Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Acquire a connection from the pool. Tries idle connections first (with health checks), creates a new one if under maxSize, or waits for a release. Raises PgPoolError on timeout or if the pool is closed.
proc acquireHandle(pool: PgPool): Future[PooledConnHandle] {....stackTrace: false, raises: [ Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
-
Acquire a connection wrapped in a PooledConnHandle. Equivalent to acquire, but the returned handle pairs the connection with its owning pool and provides an idempotent release(h).
The caller is responsible for releasing — typically via defer: h.release(). Forgetting to release leaks the connection. release(h) does not run resetSession; prefer withConnection when automatic session cleanup is desired.
proc activeCount(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
- Number of connections currently checked out from the pool.
proc close(pool: PgPool; timeout = ZeroDuration): Future[void] {. ...stackTrace: false, raises: [Exception, ValueError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Close the pool: stop the maintenance loop, cancel all waiters, and close all idle and active connections.
When timeout > ZeroDuration, waits up to timeout for active connections to be released. Unreleased connections are closed when they are eventually returned to the pool. Without a timeout (or ZeroDuration), active connections are closed on release.
proc computeConnectBackoff(initial, maxDelay: Duration; failures: int): Duration {. ...raises: [], tags: [], forbids: [].}
- Exponential backoff for repeated connect failures: returns initial * 2^(failures-1) capped at maxDelay. Returns ZeroDuration when backoff is disabled (initial == ZeroDuration) or failures <= 0.
proc exec(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[CommandResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a statement with typed parameters using a pooled connection. When pipelined is enabled, the operation is batched with other concurrent calls and sent in a single TCP write.
proc execInTransaction(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[CommandResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a statement inside a pipelined transaction with typed parameters.
proc initPoolConfig(connConfig: ConnConfig; minSize = 1; maxSize = 10; idleTimeout = minutes(10); maxLifetime = hours(1); maintenanceInterval = seconds(30); healthCheckTimeout = seconds(5); tlsHealthCheckTimeout = milliseconds(500); pingTimeout = seconds(5); acquireTimeout = seconds(30); maxWaiters = -1; resetQuery = ""; pipelined = false; maxPipelineSize = 0; connectBackoffInitial = seconds(1); connectBackoffMax = seconds(60)): PoolConfig {. ...raises: [ValueError], tags: [], forbids: [].}
-
Create a pool configuration with sensible defaults. minSize idle connections are maintained; up to maxSize total. Set resetQuery to clean session state on release (e.g. "DISCARD ALL" for PgBouncer). Set pipelined to true to enable implicit query batching for pool.exec/pool.query. When the maintenance loop fails to open a connection, subsequent retries use exponential backoff starting at connectBackoffInitial, doubling up to connectBackoffMax. Set connectBackoffInitial = ZeroDuration to disable backoff and fall back to fixed-interval retries.
Raises ValueError if parameters are invalid.
proc metrics(pool: PgPool): PoolMetrics {....raises: [], tags: [], forbids: [].}
- Cumulative pool metrics.
proc newPool(config: PoolConfig): Future[PgPool] {....stackTrace: false, raises: [Exception, ValueError, CatchableError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Create a new connection pool and establish minSize initial connections. Raises if any initial connection fails (all opened connections are closed on error).
proc notify(pool: PgPool; channel: string; payload: string = ""; timeout: Duration = ZeroDuration): Future[void] {....stackTrace: false, raises: [ Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Send a NOTIFY on channel with optional payload using a pooled connection.
proc pendingAcquires(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
- Number of non-cancelled waiters queued for a connection.
proc poolConfig(pool: PgPool): PoolConfig {....raises: [], tags: [], forbids: [].}
- The pool configuration.
proc query(pool: PgPool; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[ QueryResult] {....stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query with typed parameters using a pooled connection. When pipelined is enabled, the operation is batched with other concurrent calls and sent in a single TCP write.
proc queryColumn(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[seq[string]] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError, PgNullError, PgTypeError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first column of all rows as strings. Raises PgNullError if any value is NULL.
proc queryEach(pool: PgPool; sql: string; params: seq[PgParam] = @[]; callback: RowCallback; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[int64] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
-
Execute a query with typed parameters using a pooled connection, invoking callback once per row.
Row lifetime: the Row passed to callback is only valid for the duration of that single invocation. To retain a row beyond the callback, call row.clone() to get a detached copy.
proc queryExists(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[bool] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return whether any rows exist.
proc queryInTransaction(pool: PgPool; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[QueryResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query inside a pipelined transaction with typed parameters.
proc queryRow(pool: PgPool; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[Row] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError, PgNoRowsError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first row. Raises PgNoRowsError if no rows are returned.
proc queryRowOpt(pool: PgPool; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[Option[Row]] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first row, or none if no rows.
proc queryValue(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[string] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError, PgNoRowsError, PgNullError, PgTypeError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first column of the first row as a string. Raises PgNoRowsError if no rows are returned, or PgNullError if the value is NULL.
proc queryValue[T](pool: PgPool; _: typedesc[T]; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[ T] {....stackTrace: false.}
- Execute a query and return the first column of the first row as T. Raises PgNoRowsError if no rows are returned, or PgNullError if the value is NULL.
proc queryValueOpt(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[Option[string]] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError, PgTypeError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first column of the first row as a string. Returns none if no rows or the value is NULL.
proc queryValueOpt[T](pool: PgPool; _: typedesc[T]; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[Option[T]] {. ...stackTrace: false.}
- Execute a query and return the first column of the first row as T. Returns none if no rows or the value is NULL.
proc queryValueOrDefault(pool: PgPool; sql: string; params: seq[PgParam] = @[]; default: string = ""; timeout: Duration = ZeroDuration): Future[ string] {....stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError, PgTypeError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first column of the first row as a string. Returns default if no rows or the value is NULL.
proc queryValueOrDefault[T](pool: PgPool; _: typedesc[T]; sql: string; params: seq[PgParam] = @[]; default: T; timeout: Duration = ZeroDuration): Future[T] {. ...stackTrace: false.}
- Execute a query and return the first column of the first row as T. Returns default if no rows or the value is NULL.
proc queryValueOrDefault[T](pool: PgPool; sql: string; params: seq[PgParam] = @[]; default: T; timeout: Duration = ZeroDuration): Future[T] {. ...stackTrace: false.}
- Execute a query and return the first column of the first row as T, inferring T from default. Returns default if no rows or the value is NULL.
proc release(conn: PgConnection) {....raises: [PgError, Exception, ValueError], tags: [ RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
-
Return a connection to its owning pool. If the connection is broken or in a transaction, it is closed instead; if waiters are queued, it is handed directly to the next waiter.
The owning pool is tracked on conn.ownerPool, set automatically when the connection is acquired from a PgPool (including pools inside a PgPoolCluster). For standalone connections created with connect this field is nil and calling release raises PgError — use conn.close() instead.
withConnection, withReadConnection, withWriteConnection, withPipeline, and withTransaction call this automatically; direct callers only need it when they manage acquire/release manually.
proc release(h: PooledConnHandle) {....raises: [PgError, Exception, ValueError], tags: [ RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
-
Return the borrowed connection to its pool. Idempotent — safe to call twice (e.g. once explicitly and once via defer).
Does not run `resetSession`. Session state (SET/SET LOCAL outside a transaction, prepared statements, advisory locks acquired via the typed API, etc.) on the connection is not cleared before it returns to the pool, so subsequent borrowers may observe it. If that matters, use withConnection / withReadConnection / withWriteConnection instead, or call await h.pool.resetSession(h.conn) yourself before release(h).
proc resetSession(pool: PgPool; conn: PgConnection): owned(Future[void]) {. ...stackTrace: false, raises: [Exception], tags: [RootEffect, TimeEffect], forbids: [].}
-
Reset session-affecting state on a connection before returning it to the pool. Releases any session-level advisory locks acquired through the typed API, then runs the configured resetQuery (if any). On failure, closes the connection so that release() will discard it.
Always safe to call: returns immediately when the connection is unusable (broken / mid-transaction) or has nothing to clean up (no resetQuery and no advisory locks held). Callers don't need to gate on the pool config.
Never propagates CatchableError: this is invoked from finally blocks in the with* helpers (and the per-call cleanup path of exec / query etc.), where a raised reset error would mask the body's original exception. Cleanup errors — including any raised from the close path's tracer hook — are swallowed.
proc simpleExec(pool: PgPool; sql: string; timeout: Duration = ZeroDuration): Future[ CommandResult] {....stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a side-effect SQL command via the simple query protocol using a pooled connection. See PgConnection.simpleExec for semantics — no parameters, no plan cache, last command tag returned.
proc simpleQuery(pool: PgPool; sql: string; timeout: Duration = ZeroDuration): Future[ seq[QueryResult]] {....stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, CancelledError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute one or more SQL statements via the simple query protocol using a pooled connection. See PgConnection.simpleQuery for semantics — multi-statement, no parameters, no plan cache.
Macros
macro withTransaction(pool: PgPool; args: varargs[untyped]): untyped
-
Execute body inside a BEGIN/COMMIT transaction using a pooled connection. On exception, ROLLBACK is issued automatically. Using return inside the body is a compile-time error.
Usage: pool.withTransaction(conn): conn.exec(...) pool.withTransaction(conn, seconds(5)): conn.exec(...) pool.withTransaction(conn, TransactionOptions(isolation: ilSerializable)): conn.exec(...) pool.withTransaction(conn, opts, seconds(5)): conn.exec(...)
Warning: Inside the body, use conn.exec(...) / conn.query(...) directly — not pool.exec(...) / pool.query(...). Pool methods acquire a separate connection, so those statements would run outside this transaction.
Timeout semantics: The timeout argument applies per-call to BEGIN, COMMIT, and ROLLBACK only — it does not bound body operations or pool.acquire(). Worst-case wall-clock = acquire(unbounded) + BEGIN(≤timeout) + body(unbounded) + COMMIT(≤timeout) [+ ROLLBACK(≤timeout) on failure]. Use withTransactionDeadline for a single wall-clock deadline covering acquire, BEGIN, body, and COMMIT.
macro withTransactionDeadline(pool: PgPool; args: varargs[untyped]): untyped
-
Execute body inside a BEGIN/COMMIT transaction bounded by a single wall-clock deadline that covers pool.acquire(), BEGIN, the body, and COMMIT together.
Usage: pool.withTransactionDeadline(conn, seconds(5)): await conn.exec(...) pool.withTransactionDeadline(conn, TransactionOptions(...), seconds(5)): await conn.exec(...)
On deadline exceeded: if a connection was already acquired, it is invalidated via invalidateOnTimeout (marked csClosed) and the pool drops it on release. ROLLBACK is not attempted. PgTimeoutError is raised. If the timeout fires while still waiting for acquire(), the waiter remains queued (cancelled best-effort) until the underlying acquire future settles; this is unavoidable under asyncdispatch.
Edge case — acquire-completion race: under asyncdispatch the only preemption point is await, but the outer wait may still fire its timeout on the same tick the body finishes. To avoid a false-positive PgTimeoutError in that window, the timeout handler checks bodyFut.completed() (success only) and, when true, returns normally instead of reporting a timeout. A still-running or failed body falls through to the standard invalidate-and-raise path. This narrows but does not eliminate the race — a PgTimeoutError from this macro still does not guarantee the transaction was rolled back if the body was mid-flight when the timer won; it only guarantees the caller gave up waiting.
On other body exceptions: ROLLBACK is issued with rollbackGrace per-call timeout.
Warning: Inside the body, use conn.exec(...) / conn.query(...) directly — not pool.exec(...) / pool.query(...). Pool methods acquire a separate connection, so those statements would run outside this transaction.
macro withTransactionRetry(pool: PgPool; retryOpts: RetryOptions; args: varargs[untyped]): untyped
-
Execute body inside a BEGIN/COMMIT transaction on a pooled connection, re-running the whole transaction when it fails with a retryable error (by default the serialization_failure / deadlock_detected SQLSTATEs — see RetryOptions). The pooled connection is acquired once and reused across attempts; a ROLLBACK between attempts returns it to a clean tsIdle state. On a non-retryable error, or once maxAttempts is exhausted, the last exception propagates. Using return inside the body is a compile-time error.
Usage: pool.withTransactionRetry(RetryOptions(maxAttempts: 3), conn): await conn.exec(...) pool.withTransactionRetry(RetryOptions(...), conn, seconds(5)): await conn.exec(...) pool.withTransactionRetry(RetryOptions(...), conn, TransactionOptions(isolation: ilSerializable)): await conn.exec(...) pool.withTransactionRetry(RetryOptions(...), conn, opts, seconds(5)): await conn.exec(...)
Idempotency: body runs once per attempt, so it must be safe to re-run; non-database side effects are repeated on every retry. See withTransaction for the timeout semantics and the in-body conn.exec(...) warning.
macro withTransactionRetryDeadline(pool: PgPool; retryOpts: RetryOptions; args: varargs[untyped]): untyped
-
Execute body inside a BEGIN/COMMIT transaction on a pooled connection, bounded by a single wall-clock deadline that is shared across all retry attempts (covering acquire(), BEGIN, body, and COMMIT of every attempt), re-running the whole transaction on a retryable error while budget remains.
Usage: pool.withTransactionRetryDeadline(RetryOptions(maxAttempts: 3), conn, seconds(5)): await conn.exec(...) pool.withTransactionRetryDeadline(RetryOptions(...), conn, TransactionOptions(...), seconds(5)): await conn.exec(...)
Each attempt acquires a fresh connection (the previous one is released by the per-attempt finally), so a failed/poisoned connection is dropped by the pool's health check rather than reused. Worst-case wall-clock is deadline, not maxAttempts * deadline.
On deadline exceeded: the in-flight connection (if any) is invalidated and PgTimeoutError is raised — never retried. On a retryable error: ROLLBACK runs with rollbackGrace and the transaction is retried if budget remains. See withTransactionDeadline for the acquire-race / completed() rationale and the in-body conn.exec(...) warning. Idempotency: body runs once per attempt; non-database side effects repeat. Using return inside the body is a compile-time error.
Templates
template withConnection(pool: PgPool; conn, body: untyped)
- Acquire a connection, execute body, then release it back to the pool. The connection is available as conn inside the body. resetSession runs before release, so a configured resetQuery is applied and any session-level advisory locks acquired through the typed API are released via pg_advisory_unlock_all.
template withPipeline(pool: PgPool; pipeline, body: untyped)
- Acquire a connection, create a Pipeline, execute body, then release. The pipeline identifier is a Pipeline available in body.