async_postgres/pg_pool_cluster

Search:
Group by:

Types

PgPoolCluster = ref object
Connection pool cluster with explicit read/write routing.
  • read* methods route to the replica pool (read-only queries).
  • write* methods route to the primary pool (writes, SELECT FOR UPDATE, etc.).

For transactions, use cluster.withTransaction.

ReplicaFallback = enum
  fallbackNone,             ## Error when replica is unavailable
  fallbackPrimary            ## Fall back to primary when replica is unavailable

Procs

proc close(cluster: PgPoolCluster; timeout = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Close both primary and replica pools.
proc fallbackTimeout(cluster: PgPoolCluster): Duration {....raises: [], tags: [],
    forbids: [].}
The configured fallback timeout for read operations. When fallbackPrimary is set and the replica acquire fails, this limits how long the fallback primary acquire may wait. ZeroDuration means the primary pool's own acquireTimeout is used as-is.
proc isClosed(cluster: PgPoolCluster): bool {....raises: [], tags: [], forbids: [].}
Whether the cluster has been closed.
proc newPoolCluster(primaryConfig: PoolConfig; replicaConfig: PoolConfig;
                    fallback = fallbackNone; fallbackTimeout = ZeroDuration): Future[
    PgPoolCluster] {....stackTrace: false,
                     raises: [Exception, ValueError, CatchableError],
                     tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
Create a new pool cluster with separate primary and replica pools. If connConfig.targetSessionAttrs is tsaAny (the default), it is automatically set to tsaReadWrite for primary and tsaPreferStandby for replica.
proc primaryPool(cluster: PgPoolCluster): PgPool {....raises: [], tags: [],
    forbids: [].}
The primary (read-write) pool.
proc readQuery(cluster: PgPoolCluster; sql: string; params: seq[PgParam] = @[];
               resultFormat: ResultFormat = rfAuto;
               timeout: Duration = ZeroDuration): Future[QueryResult] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgError, PgPoolError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc readQueryColumn(cluster: PgPoolCluster; sql: string;
                     params: seq[PgParam] = @[];
                     timeout: Duration = ZeroDuration): Future[seq[string]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgError, PgPoolError,
                                CatchableError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc readQueryEach(cluster: PgPoolCluster; sql: string;
                   params: seq[PgParam] = @[]; callback: RowCallback;
                   resultFormat: ResultFormat = rfAuto;
                   timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgError, PgPoolError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc readQueryExists(cluster: PgPoolCluster; sql: string;
                     params: seq[PgParam] = @[];
                     timeout: Duration = ZeroDuration): Future[bool] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgError, PgPoolError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc readQueryRow(cluster: PgPoolCluster; sql: string;
                  params: seq[PgParam] = @[];
                  resultFormat: ResultFormat = rfAuto;
                  timeout: Duration = ZeroDuration): Future[Row] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgError, PgPoolError,
                                CatchableError, PgNoRowsError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc readQueryRowOpt(cluster: PgPoolCluster; sql: string;
                     params: seq[PgParam] = @[];
                     resultFormat: ResultFormat = rfAuto;
                     timeout: Duration = ZeroDuration): Future[Option[Row]] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgError, PgPoolError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc readQueryValue(cluster: PgPoolCluster; sql: string;
                    params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[
    string] {....stackTrace: false, raises: [Exception, ValueError, PgError,
    PgPoolError, CatchableError, PgNoRowsError, PgNullError, PgTypeError],
              tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc readQueryValue[T](cluster: PgPoolCluster; _: typedesc[T]; sql: string;
                       params: seq[PgParam] = @[];
                       timeout: Duration = ZeroDuration): Future[T] {.
    ...stackTrace: false.}
proc readQueryValueOpt(cluster: PgPoolCluster; sql: string;
                       params: seq[PgParam] = @[];
                       timeout: Duration = ZeroDuration): Future[Option[string]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgError, PgPoolError,
                                CatchableError, PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc readQueryValueOpt[T](cluster: PgPoolCluster; _: typedesc[T]; sql: string;
                          params: seq[PgParam] = @[];
                          timeout: Duration = ZeroDuration): Future[Option[T]] {.
    ...stackTrace: false.}
proc readQueryValueOrDefault(cluster: PgPoolCluster; sql: string;
                             params: seq[PgParam] = @[]; default: string = "";
                             timeout: Duration = ZeroDuration): Future[string] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgError, PgPoolError,
                                CatchableError, PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc readQueryValueOrDefault[T](cluster: PgPoolCluster; _: typedesc[T];
                                sql: string; params: seq[PgParam] = @[];
                                default: T; timeout: Duration = ZeroDuration): Future[
    T] {....stackTrace: false.}
proc readQueryValueOrDefault[T](cluster: PgPoolCluster; sql: string;
                                params: seq[PgParam] = @[]; default: T;
                                timeout: Duration = ZeroDuration): Future[T] {.
    ...stackTrace: false.}
proc readSimpleExec(cluster: PgPoolCluster; sql: string;
                    timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgError, PgPoolError, CatchableError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc readSimpleQuery(cluster: PgPoolCluster; sql: string): Future[
    seq[QueryResult]] {....stackTrace: false, raises: [Exception, ValueError,
    PgError, PgPoolError, CatchableError, PgConnectionError],
                        tags: [RootEffect, TimeEffect, WriteIOEffect],
                        forbids: [].}
proc replicaFallback(cluster: PgPoolCluster): ReplicaFallback {....raises: [],
    tags: [], forbids: [].}
The configured replica fallback behavior.
proc replicaPool(cluster: PgPoolCluster): PgPool {....raises: [], tags: [],
    forbids: [].}
The replica (read-only) pool.
proc writeExec(cluster: PgPoolCluster; sql: string; params: seq[PgParam] = @[];
               timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeExecInTransaction(cluster: PgPoolCluster; sql: string;
                            params: seq[PgParam] = @[];
                            timeout: Duration = ZeroDuration): Future[
    CommandResult] {....stackTrace: false, raises: [Exception, ValueError,
    PgPoolError, CatchableError, PgError],
                     tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeNotify(cluster: PgPoolCluster; channel: string; payload: string = "";
                 timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgQueryError,
                                PgConnectionError, ProtocolError, SslError,
                                KeyError, PgTimeoutError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQuery(cluster: PgPoolCluster; sql: string;
                params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto;
                timeout: Duration = ZeroDuration): Future[QueryResult] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQueryColumn(cluster: PgPoolCluster; sql: string;
                      params: seq[PgParam] = @[];
                      timeout: Duration = ZeroDuration): Future[seq[string]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgNullError,
                                PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQueryEach(cluster: PgPoolCluster; sql: string;
                    params: seq[PgParam] = @[]; callback: RowCallback;
                    resultFormat: ResultFormat = rfAuto;
                    timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQueryExists(cluster: PgPoolCluster; sql: string;
                      params: seq[PgParam] = @[];
                      timeout: Duration = ZeroDuration): Future[bool] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQueryInTransaction(cluster: PgPoolCluster; sql: string;
                             params: seq[PgParam] = @[];
                             resultFormat: ResultFormat = rfAuto;
                             timeout: Duration = ZeroDuration): Future[
    QueryResult] {....stackTrace: false, raises: [Exception, ValueError,
    PgPoolError, CatchableError, PgError],
                   tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQueryRow(cluster: PgPoolCluster; sql: string;
                   params: seq[PgParam] = @[];
                   resultFormat: ResultFormat = rfAuto;
                   timeout: Duration = ZeroDuration): Future[Row] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgNoRowsError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQueryRowOpt(cluster: PgPoolCluster; sql: string;
                      params: seq[PgParam] = @[];
                      resultFormat: ResultFormat = rfAuto;
                      timeout: Duration = ZeroDuration): Future[Option[Row]] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQueryValue(cluster: PgPoolCluster; sql: string;
                     params: seq[PgParam] = @[];
                     timeout: Duration = ZeroDuration): Future[string] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgNoRowsError,
                                PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQueryValue[T](cluster: PgPoolCluster; _: typedesc[T]; sql: string;
                        params: seq[PgParam] = @[];
                        timeout: Duration = ZeroDuration): Future[T] {.
    ...stackTrace: false.}
proc writeQueryValueOpt(cluster: PgPoolCluster; sql: string;
                        params: seq[PgParam] = @[];
                        timeout: Duration = ZeroDuration): Future[Option[string]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQueryValueOpt[T](cluster: PgPoolCluster; _: typedesc[T]; sql: string;
                           params: seq[PgParam] = @[];
                           timeout: Duration = ZeroDuration): Future[Option[T]] {.
    ...stackTrace: false.}
proc writeQueryValueOrDefault(cluster: PgPoolCluster; sql: string;
                              params: seq[PgParam] = @[]; default: string = "";
                              timeout: Duration = ZeroDuration): Future[string] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgPoolError,
                                CatchableError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeQueryValueOrDefault[T](cluster: PgPoolCluster; _: typedesc[T];
                                 sql: string; params: seq[PgParam] = @[];
                                 default: T; timeout: Duration = ZeroDuration): Future[
    T] {....stackTrace: false.}
proc writeQueryValueOrDefault[T](cluster: PgPoolCluster; sql: string;
                                 params: seq[PgParam] = @[]; default: T;
                                 timeout: Duration = ZeroDuration): Future[T] {.
    ...stackTrace: false.}
proc writeSimpleExec(cluster: PgPoolCluster; sql: string;
                     timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, PgPoolError, CatchableError, PgError],
    tags: [RootEffect, TimeEffect, WriteIOEffect], forbids: [].}
proc writeSimpleQuery(cluster: PgPoolCluster; sql: string): Future[
    seq[QueryResult]] {....stackTrace: false, raises: [Exception, ValueError,
    PgPoolError, CatchableError, PgError, PgConnectionError],
                        tags: [RootEffect, TimeEffect, WriteIOEffect],
                        forbids: [].}

Macros

macro withTransaction(cluster: PgPoolCluster; args: varargs[untyped]): untyped

Execute body inside a BEGIN/COMMIT transaction on the primary pool. On exception, ROLLBACK is issued automatically. Using return inside the body is a compile-time error.

Usage: cluster.withTransaction(conn): conn.exec(...) cluster.withTransaction(conn, seconds(5)): conn.exec(...) cluster.withTransaction(conn, TransactionOptions(isolation: ilSerializable)): conn.exec(...) cluster.withTransaction(conn, opts, seconds(5)): conn.exec(...)

Warning: Inside the body, use conn.exec(...) / conn.query(...) directly — not cluster.writeExec(...) / cluster.writeQuery(...). Cluster methods acquire a separate connection, so those statements would run outside this transaction.

Templates

template withPipeline(cluster: PgPoolCluster; pipeline, body: untyped)
Create a pipeline on the primary pool.
template withReadConnection(cluster: PgPoolCluster; conn, body: untyped)
Acquire a read connection (from replica, with optional primary fallback), execute body, then release.
template withWriteConnection(cluster: PgPoolCluster; conn, body: untyped)
Acquire a write connection from the primary pool, execute body, then release.