async_postgres/pg_pool

Search:
Group by:

Types

PgPool = ref object
Connection pool that manages a set of PostgreSQL connections.
PoolConfig = object
  connConfig*: ConnConfig
  minSize*: int              ## Minimum idle connections (default 1)
  maxSize*: int              ## Maximum total connections (default 10)
  idleTimeout*: Duration     ## Close idle connections after this duration (default 10min, ZeroDuration=disabled)
  maxLifetime*: Duration     ## Max connection lifetime (default 1hr, ZeroDuration=disabled)
  maintenanceInterval*: Duration ## Maintenance loop interval (default 30s)
  healthCheckTimeout*: Duration ## Ping idle connections older than this before returning (default 5s, ZeroDuration=disabled)
  pingTimeout*: Duration     ## Max time to wait for a health check ping response (default 5s, ZeroDuration=no timeout)
  acquireTimeout*: Duration  ## Max time to wait for an available connection (default 30s, ZeroDuration=no timeout)
  maxWaiters*: int = -1      ## Max queued acquire waiters (default -1=unlimited, 0=no waiting). Rejects with PgPoolError when full.
  resetQuery*: string ## SQL to execute when returning a connection to the pool (default ""=disabled).
                      ## Common values: "DISCARD ALL" (full reset, recommended for PgBouncer),
                      ## "DEALLOCATE ALL" (clear prepared statements only),
                      ## "RESET ALL" (reset session parameters only).
                      ## On failure, the connection is discarded.
Configuration for the connection pool. Create via initPoolConfig.
PoolMetrics = object
  acquireCount*: int64       ## Total successful acquires
  acquireDuration*: Duration ## Total time spent waiting in acquire
  timeoutCount*: int64       ## Number of acquire timeouts
  createCount*: int64        ## Number of new connections created
  closeCount*: int64         ## Number of connections closed/discarded
Cumulative pool statistics.

Procs

proc acquire(pool: PgPool): Future[PgConnection] {....stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Acquire a connection from the pool. Tries idle connections first (with health checks), creates a new one if under maxSize, or waits for a release. Raises PgPoolError on timeout or if the pool is closed.
proc activeCount(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
Number of connections currently checked out from the pool.
proc close(pool: PgPool; timeout = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Close the pool: stop the maintenance loop, cancel all waiters, and close all idle and active connections.

When timeout > ZeroDuration, waits up to timeout for active connections to be released. Connections that are still checked out after the timeout are closed when they are eventually returned to the pool. With the default ZeroDuration, the pool does not wait: active connections are closed as they are released.

proc exec(pool: PgPool; sql: string; params: seq[PgParam] = @[];
          timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgQueryError, PgConnectionError,
                                SslError, KeyError, ProtocolError,
                                PgTimeoutError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a statement with typed parameters using a pooled connection.
proc execInTransaction(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                       timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgTimeoutError,
                                PgConnectionError, SslError, ProtocolError,
                                PgQueryError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a statement inside a pipelined transaction with typed parameters.
proc idleCount(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
Number of idle connections currently in the pool.
proc initPoolConfig(connConfig: ConnConfig; minSize = 1; maxSize = 10;
                    idleTimeout = minutes(10); maxLifetime = hours(1);
                    maintenanceInterval = seconds(30);
                    healthCheckTimeout = seconds(5); pingTimeout = seconds(5);
                    acquireTimeout = seconds(30); maxWaiters = -1;
                    resetQuery = ""): PoolConfig {....raises: [ValueError],
    tags: [], forbids: [].}

Create a pool configuration with sensible defaults. The pool maintains minSize idle connections and allows up to maxSize connections in total. Set resetQuery to clean session state on release (e.g. "DISCARD ALL" for PgBouncer).

Raises ValueError if parameters are invalid.

proc isClosed(pool: PgPool): bool {....raises: [], tags: [], forbids: [].}
Whether the pool has been closed.
proc metrics(pool: PgPool): PoolMetrics {....raises: [], tags: [], forbids: [].}
Cumulative pool metrics.
proc newPool(config: PoolConfig): Future[PgPool] {....stackTrace: false,
    raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Create a new connection pool and establish minSize initial connections. Raises if any initial connection fails (all opened connections are closed on error).
proc notify(pool: PgPool; channel: string; payload: string = "";
            timeout: Duration = ZeroDuration): Future[void] {....stackTrace: false, raises: [
    Exception, ValueError, PgPoolError, CatchableError, PgQueryError,
    PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError,
    AsyncTimeoutError], tags: [RootEffect, TimeEffect, WriteIOEffect],
    forbids: [].}
Send a NOTIFY on channel with optional payload using a pooled connection.
proc pendingAcquires(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
Number of non-cancelled waiters queued for a connection.
proc poolConfig(pool: PgPool): PoolConfig {....raises: [], tags: [], forbids: [].}
The pool configuration.
proc query(pool: PgPool; sql: string; params: seq[PgParam] = @[];
           resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[
    QueryResult] {....stackTrace: false, raises: [Exception, ValueError,
    PgPoolError, CatchableError, PgQueryError, PgConnectionError, SslError,
    KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError],
                   tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query with typed parameters using a pooled connection.
proc queryColumn(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                 timeout: Duration = ZeroDuration): Future[seq[string]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgQueryError, PgConnectionError,
                                SslError, KeyError, ProtocolError,
                                PgTimeoutError, AsyncTimeoutError, PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first column of all rows as strings.
proc queryEach(pool: PgPool; sql: string; params: seq[PgParam] = @[];
               callback: RowCallback; resultFormat: ResultFormat = rfAuto;
               timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgQueryError, PgConnectionError,
                                SslError, KeyError, ProtocolError,
                                PgTimeoutError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query with typed parameters using a pooled connection, invoking callback once per row.
proc queryExists(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                 timeout: Duration = ZeroDuration): Future[bool] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgQueryError, PgConnectionError,
                                SslError, KeyError, ProtocolError,
                                PgTimeoutError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return whether any rows exist.
proc queryInTransaction(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                        resultFormat: ResultFormat = rfAuto;
                        timeout: Duration = ZeroDuration): Future[QueryResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgTimeoutError,
                                PgConnectionError, SslError, ProtocolError,
                                PgQueryError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query inside a pipelined transaction with typed parameters.
proc queryOne(pool: PgPool; sql: string; params: seq[PgParam] = @[];
              resultFormat: ResultFormat = rfAuto;
              timeout: Duration = ZeroDuration): Future[Option[Row]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgQueryError, PgConnectionError,
                                SslError, KeyError, ProtocolError,
                                PgTimeoutError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first row, or none if no rows.
proc queryValue(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                timeout: Duration = ZeroDuration): Future[string] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgQueryError, PgConnectionError,
                                SslError, KeyError, ProtocolError,
                                PgTimeoutError, AsyncTimeoutError, PgError,
                                PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first column of the first row as a string. Raises PgError if no rows or the value is NULL.
proc queryValue[T](pool: PgPool; _: typedesc[T]; sql: string;
                   params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[
    T] {....stackTrace: false.}
Execute a query and return the first column of the first row as T. Raises PgError if no rows or the value is NULL.
proc queryValueOpt(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                   timeout: Duration = ZeroDuration): Future[Option[string]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgQueryError, PgConnectionError,
                                SslError, KeyError, ProtocolError,
                                PgTimeoutError, AsyncTimeoutError, PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first column of the first row as a string. Returns none if no rows or the value is NULL.
proc queryValueOpt[T](pool: PgPool; _: typedesc[T]; sql: string;
                      params: seq[PgParam] = @[];
                      timeout: Duration = ZeroDuration): Future[Option[T]] {.
    ...stackTrace: false.}
Execute a query and return the first column of the first row as T. Returns none if no rows or the value is NULL.
proc queryValueOrDefault(pool: PgPool; sql: string; params: seq[PgParam] = @[];
                         default: string = ""; timeout: Duration = ZeroDuration): Future[
    string] {....stackTrace: false, raises: [Exception, ValueError, PgPoolError,
    CatchableError, PgQueryError, PgConnectionError, SslError, KeyError,
    ProtocolError, PgTimeoutError, AsyncTimeoutError, PgTypeError],
              tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a query and return the first column of the first row as a string. Returns default if no rows or the value is NULL.
proc queryValueOrDefault[T](pool: PgPool; _: typedesc[T]; sql: string;
                            params: seq[PgParam] = @[]; default: T;
                            timeout: Duration = ZeroDuration): Future[T] {.
    ...stackTrace: false.}
Execute a query and return the first column of the first row as T. Returns default if no rows or the value is NULL.
proc release(pool: PgPool; conn: PgConnection) {.
    ...raises: [Exception, ValueError], tags: [RootEffect], forbids: [].}
Return a connection to the pool. If the connection is broken or in a transaction, it is closed instead. If waiters are queued, the connection is handed directly to the next waiter.
proc resetSession(pool: PgPool; conn: PgConnection): owned(Future[void]) {.
    ...stackTrace: false, raises: [Exception], tags: [RootEffect, TimeEffect],
    forbids: [].}
Execute the configured reset query on a connection before returning it to the pool. On failure, closes the connection so that release() will discard it.
proc simpleExec(pool: PgPool; sql: string; timeout: Duration = ZeroDuration): Future[
    CommandResult] {....stackTrace: false, raises: [Exception, ValueError,
    PgPoolError, CatchableError, PgQueryError, PgConnectionError, SslError,
    ProtocolError, PgTimeoutError, AsyncTimeoutError],
                     tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute a SQL statement via simple query protocol using a pooled connection. Returns the command result.
proc simpleQuery(pool: PgPool; sql: string): Future[seq[QueryResult]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgConnectionError, SslError,
                                ProtocolError, PgQueryError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Execute one or more SQL statements via simple query protocol using a pooled connection.
proc size(pool: PgPool): int {....raises: [], tags: [], forbids: [].}
Total number of connections (idle + active).

Macros

macro withTransaction(pool: PgPool; args: varargs[untyped]): untyped

Execute body inside a BEGIN/COMMIT transaction using a pooled connection. On exception, ROLLBACK is issued automatically. Using return inside the body is a compile-time error.

Usage:

  pool.withTransaction(conn):
    conn.exec(...)
  pool.withTransaction(conn, seconds(5)):
    conn.exec(...)
  pool.withTransaction(conn, TransactionOptions(isolation: ilSerializable)):
    conn.exec(...)
  pool.withTransaction(conn, opts, seconds(5)):
    conn.exec(...)

Warning: Inside the body, use conn.exec(...) / conn.query(...) directly — not pool.exec(...) / pool.query(...). Pool methods acquire a separate connection, so those statements would run outside this transaction.

Templates

template withConnection(pool: PgPool; conn, body: untyped)
Acquire a connection, execute body, then release it back to the pool. The connection is available as conn inside the body. If resetQuery is configured, session state is reset before release.
template withPipeline(pool: PgPool; pipeline, body: untyped)
Acquire a connection, create a Pipeline, execute body, then release the connection. The Pipeline is available as pipeline inside the body.