async_postgres/pg_pool

Search:
Group by:

Types

PgPool = ref object of PgPoolOwner
Connection pool that manages a set of PostgreSQL connections.
PoolConfig = object
  connConfig*: ConnConfig
  minSize*: int              ## Minimum idle connections (default 1)
  maxSize*: int              ## Maximum total connections (default 10)
  idleTimeout*: Duration     ## Close idle connections after this duration (default 10min, ZeroDuration=disabled)
  maxLifetime*: Duration     ## Max connection lifetime (default 1hr, ZeroDuration=disabled)
  maintenanceInterval*: Duration ## Maintenance loop interval (default 30s)
  healthCheckTimeout*: Duration ## Ping idle connections older than this before returning (default 5s, ZeroDuration=disabled)
  pingTimeout*: Duration     ## Max time to wait for a health check ping response (default 5s, ZeroDuration=no timeout)
  acquireTimeout*: Duration  ## Max time to wait for an available connection (default 30s, ZeroDuration=no timeout)
  maxWaiters*: int = -1      ## Max queued acquire waiters (default -1=unlimited, 0=no waiting). Rejects with PgPoolError when full.
  resetQuery*: string ## SQL to execute when returning a connection to the pool (default ""=disabled).
                      ## Common values: "DISCARD ALL" (full reset, recommended for PgBouncer),
                      ## "DEALLOCATE ALL" (clear prepared statements only),
                      ## "RESET ALL" (reset session parameters only).
                      ## On failure, the connection is discarded.
  tracer*: PgTracer          ## Optional tracer for pool-level hooks (acquire/release)
  pipelined*: bool ## Enable implicit query batching for pool.exec/query (default false).
                   ## When enabled, concurrent calls within the same event loop tick are
                   ## batched into a single TCP write per connection using per-query SYNC
                   ## for error isolation.
  maxPipelineSize*: int ## Max operations per pipeline batch per connection (default 0=unlimited).
                        ## Only used when `pipelined` is true.
  connectBackoffInitial*: Duration ## First backoff after a maintenance-loop connect failure (default 1s,
                                   ## ZeroDuration=disabled, falls back to fixed `maintenanceInterval` retries).
  connectBackoffMax*: Duration ## Cap for exponential backoff growth (default 60s). Doubles each failure
                               ## until reaching this value.
Configuration for the connection pool. Create via initPoolConfig.
PoolMetrics = object
  acquireCount*: int64       ## Total successful acquires
  acquireDuration*: Duration ## Total time spent waiting in acquire
  timeoutCount*: int64       ## Number of acquire timeouts
  createCount*: int64        ## Number of new connections created
  closeCount*: int64         ## Number of connections closed/discarded
Cumulative pool statistics.

Procs

proc acquire(pool: PgPool): Future[PgConnection] {....stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Acquire a connection from the pool. Tries idle connections first (with health checks), creates a new one if under maxSize, or waits for a release. Raises PgPoolError on timeout or if the pool is closed.
proc activeCount(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
Number of connections currently checked out from the pool.
proc close(pool: PgPool; timeout = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Close the pool: stop the maintenance loop, cancel all waiters, and close all idle and active connections.

When timeout > ZeroDuration, waits up to timeout for active connections to be released. Unreleased connections are closed when they are eventually returned to the pool. Without a timeout (or ZeroDuration), active connections are closed on release.

proc computeConnectBackoff(initial, maxDelay: Duration; failures: int): Duration {.
    ...raises: [], tags: [], forbids: [].}
Exponential backoff for repeated connect failures: returns initial * 2^(failures-1) capped at maxDelay. Returns ZeroDuration when backoff is disabled (initial == ZeroDuration) or failures <= 0.
proc exec(pool: PgPool; sql: string; params: seq[PgParam] = @[];
          timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a statement with typed parameters using a pooled connection. When pipelined is enabled, the operation is batched with other concurrent calls and sent in a single TCP write.
proc execInTransaction(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                       timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a statement inside a pipelined transaction with typed parameters.
proc idleCount(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
Number of idle connections currently in the pool.
proc initPoolConfig(connConfig: ConnConfig; minSize = 1; maxSize = 10;
                    idleTimeout = minutes(10); maxLifetime = hours(1);
                    maintenanceInterval = seconds(30);
                    healthCheckTimeout = seconds(5); pingTimeout = seconds(5);
                    acquireTimeout = seconds(30); maxWaiters = -1;
                    resetQuery = ""; pipelined = false; maxPipelineSize = 0;
                    connectBackoffInitial = seconds(1);
                    connectBackoffMax = seconds(60)): PoolConfig {.
    ...raises: [ValueError], tags: [], forbids: [].}

Create a pool configuration with sensible defaults. minSize idle connections are maintained; up to maxSize total. Set resetQuery to clean session state on release (e.g. "DISCARD ALL" for PgBouncer). Set pipelined to true to enable implicit query batching for pool.exec/pool.query. When the maintenance loop fails to open a connection, subsequent retries use exponential backoff starting at connectBackoffInitial, doubling up to connectBackoffMax. Set connectBackoffInitial = ZeroDuration to disable backoff and fall back to fixed-interval retries.

Raises ValueError if parameters are invalid.

proc isClosed(pool: PgPool): bool {....raises: [], tags: [], forbids: [].}
Whether the pool has been closed.
proc metrics(pool: PgPool): PoolMetrics {....raises: [], tags: [], forbids: [].}
Cumulative pool metrics.
proc newPool(config: PoolConfig): Future[PgPool] {....stackTrace: false,
    raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Create a new connection pool and establish minSize initial connections. Raises if any initial connection fails (all opened connections are closed on error).
proc notify(pool: PgPool; channel: string; payload: string = "";
            timeout: Duration = ZeroDuration): Future[void] {....stackTrace: false, raises: [
    Exception, ValueError, PgPoolError, CatchableError, PgError, PgQueryError,
    PgConnectionError, ProtocolError, SslError, KeyError, PgTimeoutError,
    AsyncTimeoutError], tags: [RootEffect, TimeEffect, WriteIOEffect],
    forbids: [].}
Send a NOTIFY on channel with optional payload using a pooled connection.
proc pendingAcquires(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
Number of non-cancelled waiters queued for a connection.
proc poolConfig(pool: PgPool): PoolConfig {....raises: [], tags: [], forbids: [].}
The pool configuration.
proc query(pool: PgPool; sql: string; params: seq[PgParam] = @[];
           resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[
    QueryResult] {....stackTrace: false, raises: [Exception, ValueError,
    PgPoolError, CatchableError, PgError],
                   tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query with typed parameters using a pooled connection. When pipelined is enabled, the operation is batched with other concurrent calls and sent in a single TCP write.
proc queryColumn(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                 timeout: Duration = ZeroDuration): Future[seq[string]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgNullError,
                                PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first column of all rows as strings. Raises PgNullError if any value is NULL.
proc queryEach(pool: PgPool; sql: string; params: seq[PgParam] = @[];
               callback: RowCallback; resultFormat: ResultFormat = rfAuto;
               timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}

Execute a query with typed parameters using a pooled connection, invoking callback once per row.

Row lifetime: the Row passed to callback is only valid for the duration of that single invocation. To retain a row beyond the callback, call row.clone() to get a detached copy.

proc queryExists(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                 timeout: Duration = ZeroDuration): Future[bool] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return whether any rows exist.
proc queryInTransaction(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                        resultFormat: ResultFormat = rfAuto;
                        timeout: Duration = ZeroDuration): Future[QueryResult] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query inside a pipelined transaction with typed parameters.
proc queryRow(pool: PgPool; sql: string; params: seq[PgParam] = @[];
              resultFormat: ResultFormat = rfAuto;
              timeout: Duration = ZeroDuration): Future[Row] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgNoRowsError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first row. Raises PgNoRowsError if no rows are returned.
proc queryRowOpt(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                 resultFormat: ResultFormat = rfAuto;
                 timeout: Duration = ZeroDuration): Future[Option[Row]] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first row, or none if no rows.
proc queryValue(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                timeout: Duration = ZeroDuration): Future[string] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgNoRowsError,
                                PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first column of the first row as a string. Raises PgNoRowsError if no rows are returned, or PgNullError if the value is NULL.
proc queryValue[T](pool: PgPool; _: typedesc[T]; sql: string;
                   params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[
    T] {....stackTrace: false.}
Execute a query and return the first column of the first row as T. Raises PgNoRowsError if no rows are returned, or PgNullError if the value is NULL.
proc queryValueOpt(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                   timeout: Duration = ZeroDuration): Future[Option[string]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first column of the first row as a string. Returns none if no rows or the value is NULL.
proc queryValueOpt[T](pool: PgPool; _: typedesc[T]; sql: string;
                      params: seq[PgParam] = @[];
                      timeout: Duration = ZeroDuration): Future[Option[T]] {.
    ...stackTrace: false.}
Execute a query and return the first column of the first row as T. Returns none if no rows or the value is NULL.
proc queryValueOrDefault(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                         default: string = ""; timeout: Duration = ZeroDuration): Future[
    string] {....stackTrace: false, raises: [Exception, ValueError, PgPoolError,
    CatchableError, PgError, PgTypeError],
              tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first column of the first row as a string. Returns default if no rows or the value is NULL.
proc queryValueOrDefault[T](pool: PgPool; _: typedesc[T]; sql: string;
                            params: seq[PgParam] = @[]; default: T;
                            timeout: Duration = ZeroDuration): Future[T] {.
    ...stackTrace: false.}
Execute a query and return the first column of the first row as T. Returns default if no rows or the value is NULL.
proc queryValueOrDefault[T](pool: PgPool; sql: string;
                            params: seq[PgParam] = @[]; default: T;
                            timeout: Duration = ZeroDuration): Future[T] {.
    ...stackTrace: false.}
Execute a query and return the first column of the first row as T, inferring T from default. Returns default if no rows or the value is NULL.
proc release(conn: PgConnection) {....raises: [PgError, Exception, ValueError],
                                   tags: [RootEffect], forbids: [].}

Return a connection to its owning pool. If the connection is broken or in a transaction, it is closed instead; if waiters are queued, it is handed directly to the next waiter.

The owning pool is tracked on conn.ownerPool, set automatically when the connection is acquired from a PgPool (including pools inside a PgPoolCluster). For standalone connections created with connect this field is nil and calling release raises PgError — use conn.close() instead.

withConnection, withReadConnection, withWriteConnection, withPipeline, and withTransaction call this automatically; direct callers only need it when they manage acquire/release manually.

proc resetSession(pool: PgPool; conn: PgConnection): owned(Future[void]) {.
    ...stackTrace: false, raises: [Exception], tags: [RootEffect, TimeEffect],
    forbids: [].}

Execute the configured reset query on a connection before returning it to the pool. On failure, closes the connection so that release() will discard it.

Never propagates CatchableError: this is invoked from finally blocks in the with* helpers (and the per-call cleanup path of exec / query etc.), where a raised reset error would mask the body's original exception. Cleanup errors — including any raised from the close path's tracer hook — are swallowed.

proc simpleExec(pool: PgPool; sql: string; timeout: Duration = ZeroDuration): Future[
    CommandResult] {....stackTrace: false, raises: [Exception, ValueError,
    PgPoolError, CatchableError, PgError],
                     tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a side-effect SQL command via the simple query protocol using a pooled connection. See PgConnection.simpleExec for semantics — no parameters, no plan cache, last command tag returned.
proc simpleQuery(pool: PgPool; sql: string): Future[seq[QueryResult]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgConnectionError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute one or more SQL statements via the simple query protocol using a pooled connection. See PgConnection.simpleQuery for semantics — multi-statement, no parameters, no plan cache.
proc size(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
Total number of connections (idle + active).

Macros

macro withTransaction(pool: PgPool; args: varargs[untyped]): untyped

Execute body inside a BEGIN/COMMIT transaction using a pooled connection. On exception, ROLLBACK is issued automatically. Using return inside the body is a compile-time error.

Usage: pool.withTransaction(conn): conn.exec(...) pool.withTransaction(conn, seconds(5)): conn.exec(...) pool.withTransaction(conn, TransactionOptions(isolation: ilSerializable)): conn.exec(...) pool.withTransaction(conn, opts, seconds(5)): conn.exec(...)

Warning: Inside the body, use conn.exec(...) / conn.query(...) directly — not pool.exec(...) / pool.query(...). Pool methods acquire a separate connection, so those statements would run outside this transaction.

Templates

template withConnection(pool: PgPool; conn, body: untyped)
Acquire a connection, execute body, then release it back to the pool. The connection is available as conn inside the body. If resetQuery is configured, session state is reset before release.
template withPipeline(pool: PgPool; pipeline, body: untyped)
Acquire a connection, create a Pipeline, execute body, then release. The pipeline identifier is a Pipeline available in body.