Types
PgPool = ref object of PgPoolOwner
- Connection pool that manages a set of PostgreSQL connections.
PoolConfig = object connConfig*: ConnConfig minSize*: int ## Minimum idle connections (default 1) maxSize*: int ## Maximum total connections (default 10) idleTimeout*: Duration ## Close idle connections after this duration (default 10min, ZeroDuration=disabled) maxLifetime*: Duration ## Max connection lifetime (default 1hr, ZeroDuration=disabled) maintenanceInterval*: Duration ## Maintenance loop interval (default 30s) healthCheckTimeout*: Duration ## Ping idle connections older than this before returning (default 5s, ZeroDuration=disabled) pingTimeout*: Duration ## Max time to wait for a health check ping response (default 5s, ZeroDuration=no timeout) acquireTimeout*: Duration ## Max time to wait for an available connection (default 30s, ZeroDuration=no timeout) maxWaiters*: int = -1 ## Max queued acquire waiters (default -1=unlimited, 0=no waiting). Rejects with PgPoolError when full. resetQuery*: string ## SQL to execute when returning a connection to the pool (default ""=disabled). ## Common values: "DISCARD ALL" (full reset, recommended for PgBouncer), ## "DEALLOCATE ALL" (clear prepared statements only), ## "RESET ALL" (reset session parameters only). ## On failure, the connection is discarded. tracer*: PgTracer ## Optional tracer for pool-level hooks (acquire/release) pipelined*: bool ## Enable implicit query batching for pool.exec/query (default false). ## When enabled, concurrent calls within the same event loop tick are ## batched into a single TCP write per connection using per-query SYNC ## for error isolation. maxPipelineSize*: int ## Max operations per pipeline batch per connection (default 0=unlimited). ## Only used when `pipelined` is true. connectBackoffInitial*: Duration ## First backoff after a maintenance-loop connect failure (default 1s, ## ZeroDuration=disabled, falls back to fixed `maintenanceInterval` retries). connectBackoffMax*: Duration ## Cap for exponential backoff growth (default 60s). Doubles each failure ## until reaching this value.
- Configuration for the connection pool. Create via initPoolConfig.
PoolMetrics = object acquireCount*: int64 ## Total successful acquires acquireDuration*: Duration ## Total time spent waiting in acquire timeoutCount*: int64 ## Number of acquire timeouts createCount*: int64 ## Number of new connections created closeCount*: int64 ## Number of connections closed/discarded
- Cumulative pool statistics.
Procs
proc acquire(pool: PgPool): Future[PgConnection] {....stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Acquire a connection from the pool. Tries idle connections first (with health checks), creates a new one if under maxSize, or waits for a release. Raises PgPoolError on timeout or if the pool is closed.
proc activeCount(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
- Number of connections currently checked out from the pool.
proc close(pool: PgPool; timeout = ZeroDuration): Future[void] {. ...stackTrace: false, raises: [Exception, ValueError], tags: [RootEffect, TimeEffect], forbids: [].}
-
Close the pool: stop the maintenance loop, cancel all waiters, and close all idle and active connections.
When timeout > ZeroDuration, waits up to timeout for active connections to be released. Unreleased connections are closed when they are eventually returned to the pool. Without a timeout (or ZeroDuration), active connections are closed on release.
proc computeConnectBackoff(initial, maxDelay: Duration; failures: int): Duration {. ...raises: [], tags: [], forbids: [].}
- Exponential backoff for repeated connect failures: returns initial * 2^(failures-1) capped at maxDelay. Returns ZeroDuration when backoff is disabled (initial == ZeroDuration) or failures <= 0.
proc exec(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[CommandResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a statement with typed parameters using a pooled connection. When pipelined is enabled, the operation is batched with other concurrent calls and sent in a single TCP write.
proc execInTransaction(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[CommandResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a statement inside a pipelined transaction with typed parameters.
proc initPoolConfig(connConfig: ConnConfig; minSize = 1; maxSize = 10; idleTimeout = minutes(10); maxLifetime = hours(1); maintenanceInterval = seconds(30); healthCheckTimeout = seconds(5); pingTimeout = seconds(5); acquireTimeout = seconds(30); maxWaiters = -1; resetQuery = ""; pipelined = false; maxPipelineSize = 0; connectBackoffInitial = seconds(1); connectBackoffMax = seconds(60)): PoolConfig {. ...raises: [ValueError], tags: [], forbids: [].}
-
Create a pool configuration with sensible defaults. minSize idle connections are maintained; up to maxSize total. Set resetQuery to clean session state on release (e.g. "DISCARD ALL" for PgBouncer). Set pipelined to true to enable implicit query batching for pool.exec/pool.query. When the maintenance loop fails to open a connection, subsequent retries use exponential backoff starting at connectBackoffInitial, doubling up to connectBackoffMax. Set connectBackoffInitial = ZeroDuration to disable backoff and fall back to fixed-interval retries.
Raises ValueError if parameters are invalid.
proc metrics(pool: PgPool): PoolMetrics {....raises: [], tags: [], forbids: [].}
- Cumulative pool metrics.
proc newPool(config: PoolConfig): Future[PgPool] {....stackTrace: false, raises: [Exception, ValueError, CatchableError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Create a new connection pool and establish minSize initial connections. Raises if any initial connection fails (all opened connections are closed on error).
proc notify(pool: PgPool; channel: string; payload: string = ""; timeout: Duration = ZeroDuration): Future[void] {....stackTrace: false, raises: [ Exception, ValueError, PgPoolError, CatchableError, PgError, PgQueryError, PgConnectionError, ProtocolError, SslError, KeyError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Send a NOTIFY on channel with optional payload using a pooled connection.
proc pendingAcquires(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
- Number of non-cancelled waiters queued for a connection.
proc poolConfig(pool: PgPool): PoolConfig {....raises: [], tags: [], forbids: [].}
- The pool configuration.
proc query(pool: PgPool; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[ QueryResult] {....stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query with typed parameters using a pooled connection. When pipelined is enabled, the operation is batched with other concurrent calls and sent in a single TCP write.
proc queryColumn(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[seq[string]] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, PgNullError, PgTypeError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first column of all rows as strings. Raises PgNullError if any value is NULL.
proc queryEach(pool: PgPool; sql: string; params: seq[PgParam] = @[]; callback: RowCallback; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[int64] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
-
Execute a query with typed parameters using a pooled connection, invoking callback once per row.
Row lifetime: the Row passed to callback is only valid for the duration of that single invocation. To retain a row beyond the callback, call row.clone() to get a detached copy.
proc queryExists(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[bool] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return whether any rows exist.
proc queryInTransaction(pool: PgPool; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[QueryResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query inside a pipelined transaction with typed parameters.
proc queryRow(pool: PgPool; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[Row] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, PgNoRowsError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first row. Raises PgNoRowsError if no rows are returned.
proc queryRowOpt(pool: PgPool; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[Option[Row]] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first row, or none if no rows.
proc queryValue(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[string] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, PgNoRowsError, PgNullError, PgTypeError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first column of the first row as a string. Raises PgNoRowsError if no rows are returned, or PgNullError if the value is NULL.
proc queryValue[T](pool: PgPool; _: typedesc[T]; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[ T] {....stackTrace: false.}
- Execute a query and return the first column of the first row as T. Raises PgNoRowsError if no rows are returned, or PgNullError if the value is NULL.
proc queryValueOpt(pool: PgPool; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[Option[string]] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, PgTypeError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first column of the first row as a string. Returns none if no rows or the value is NULL.
proc queryValueOpt[T](pool: PgPool; _: typedesc[T]; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[Option[T]] {. ...stackTrace: false.}
- Execute a query and return the first column of the first row as T. Returns none if no rows or the value is NULL.
proc queryValueOrDefault(pool: PgPool; sql: string; params: seq[PgParam] = @[]; default: string = ""; timeout: Duration = ZeroDuration): Future[ string] {....stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, PgTypeError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a query and return the first column of the first row as a string. Returns default if no rows or the value is NULL.
proc queryValueOrDefault[T](pool: PgPool; _: typedesc[T]; sql: string; params: seq[PgParam] = @[]; default: T; timeout: Duration = ZeroDuration): Future[T] {. ...stackTrace: false.}
- Execute a query and return the first column of the first row as T. Returns default if no rows or the value is NULL.
proc queryValueOrDefault[T](pool: PgPool; sql: string; params: seq[PgParam] = @[]; default: T; timeout: Duration = ZeroDuration): Future[T] {. ...stackTrace: false.}
- Execute a query and return the first column of the first row as T, inferring T from default. Returns default if no rows or the value is NULL.
proc release(conn: PgConnection) {....raises: [PgError, Exception, ValueError], tags: [RootEffect], forbids: [].}
-
Return a connection to its owning pool. If the connection is broken or in a transaction, it is closed instead; if waiters are queued, it is handed directly to the next waiter.
The owning pool is tracked on conn.ownerPool, set automatically when the connection is acquired from a PgPool (including pools inside a PgPoolCluster). For standalone connections created with connect this field is nil and calling release raises PgError — use conn.close() instead.
withConnection, withReadConnection, withWriteConnection, withPipeline, and withTransaction call this automatically; direct callers only need it when they manage acquire/release manually.
proc resetSession(pool: PgPool; conn: PgConnection): owned(Future[void]) {. ...stackTrace: false, raises: [Exception], tags: [RootEffect, TimeEffect], forbids: [].}
-
Execute the configured reset query on a connection before returning it to the pool. On failure, closes the connection so that release() will discard it.
Never propagates CatchableError: this is invoked from finally blocks in the with* helpers (and the per-call cleanup path of exec / query etc.), where a raised reset error would mask the body's original exception. Cleanup errors — including any raised from the close path's tracer hook — are swallowed.
proc simpleExec(pool: PgPool; sql: string; timeout: Duration = ZeroDuration): Future[ CommandResult] {....stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute a side-effect SQL command via the simple query protocol using a pooled connection. See PgConnection.simpleExec for semantics — no parameters, no plan cache, last command tag returned.
proc simpleQuery(pool: PgPool; sql: string): Future[seq[QueryResult]] {. ...stackTrace: false, raises: [Exception, ValueError, PgPoolError, CatchableError, PgError, PgConnectionError], tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
- Execute one or more SQL statements via the simple query protocol using a pooled connection. See PgConnection.simpleQuery for semantics — multi-statement, no parameters, no plan cache.
Macros
macro withTransaction(pool: PgPool; args: varargs[untyped]): untyped
-
Execute body inside a BEGIN/COMMIT transaction using a pooled connection. On exception, ROLLBACK is issued automatically. Using return inside the body is a compile-time error.
Usage: pool.withTransaction(conn): conn.exec(...) pool.withTransaction(conn, seconds(5)): conn.exec(...) pool.withTransaction(conn, TransactionOptions(isolation: ilSerializable)): conn.exec(...) pool.withTransaction(conn, opts, seconds(5)): conn.exec(...)
Warning: Inside the body, use conn.exec(...) / conn.query(...) directly — not pool.exec(...) / pool.query(...). Pool methods acquire a separate connection, so those statements would run outside this transaction.
Templates
template withConnection(pool: PgPool; conn, body: untyped)
- Acquire a connection, execute body, then release it back to the pool. The connection is available as conn inside the body. If resetQuery is configured, session state is reset before release.
template withPipeline(pool: PgPool; pipeline, body: untyped)
- Acquire a connection, create a Pipeline, execute body, then release. The pipeline identifier is a Pipeline available in body.