async_postgres/pg_client

Search:
Group by:

Query execution API.

Choosing between extended- and simple-protocol entry points

exec / query use the extended query protocol (Parse / Bind / Describe / Execute). They are the default choice for application queries:

  • Exactly one statement per call.
  • Typed parameters via seq[PgParam] or openArray[PgParamInline] — values are bound out-of-band, so no string escaping is required.
  • Reuses server-side prepared statements across calls with identical SQL text (bounded by stmtCacheCapacity); the statement is parsed once and rebound on subsequent calls.
  • Result rows may use the binary wire format when resultFormat = rfBinary is passed, or on paths that build per-column format codes via buildResultFormats. The default rfAuto returns text rows.

simpleExec / simpleQuery use the simple query protocol (a single Query message, text-only rows). Prefer them only when the extended protocol cannot express what you need:

  • No parameters. The SQL string is sent verbatim — only use with trusted input, or quote identifiers/literals yourself (e.g. via quoteIdentifier).
  • No prepared statement reuse. Each call re-parses on the server; appropriate for one-off session commands (BEGIN, SET, VACUUM …) where a cached statement would be wasted. For LISTEN / UNLISTEN / NOTIFY prefer the dedicated listen, unlisten, and notify helpers — they quote the channel name for you.
  • simpleQuery accepts multiple ;-separated statements and returns one QueryResult per statement — the one case the extended protocol cannot cover in a single round trip.
  • simpleExec expects a side-effect command; the returned tag is the last CommandComplete seen, so multi-statement input is accepted but per-statement results are not surfaced — use simpleQuery when you need them.

Quick reference

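| API          | Protocol | Multi-stmt | Parameters | Plan cache |
|--------------|----------|------------|------------|------------|
| query / exec | extended | no         | yes        | yes        |
| simpleQuery  | simple   | yes        | no         | no         |
| simpleExec   | simple   | last-wins  | no         | no         |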

Timeout behaviour is shared by all four: when a timeout is exceeded the connection is marked csClosed (the protocol may be mid-exchange) and a pooled connection is discarded on release.

Types

AccessMode = enum
  amDefault, amReadWrite, amReadOnly
PostgreSQL transaction access mode (read-write or read-only).
Cursor = ref object
  conn*: PgConnection
  fields*: seq[FieldDescription]
  exhausted*: bool
A server-side portal for incremental row fetching via openCursor/fetchNext.
DeferrableMode = enum
  dmDefault, dmDeferrable, dmNotDeferrable
PostgreSQL transaction deferrable mode (for serializable read-only transactions).
IsolatedPipelineResults = object
  results*: seq[PipelineResult]
  errors*: seq[ref CatchableError] ## errors[i] is nil if ops[i] succeeded
Results from executeIsolated: per-op error isolation via per-query SYNC.
IsolationLevel = enum
  ilDefault, ilReadCommitted, ilRepeatableRead, ilSerializable,
  ilReadUncommitted
PostgreSQL transaction isolation level.
Pipeline = ref object
  autoReset*: bool ## When true, `execute`/`executeIsolated` call `reset()` in a `finally`
                   ## block so the Pipeline can be safely reused without leaking state from
                   ## the previous run. Default: false (backward-compatible).
Batch of queries/execs sent through the PostgreSQL pipeline protocol.
PipelineResult = object
  case kind*: PipelineResultKind
  of prkExec:
    commandResult*: CommandResult
  of prkQuery:
    queryResult*: QueryResult
Result of a single operation within a pipeline.
PipelineResultKind = enum
  prkExec, prkQuery
Discriminator for pipeline result variants.
PreparedStatement = object
  conn*: PgConnection
  name*: string
  sql*: string
  fields*: seq[FieldDescription]
  paramOids*: seq[int32]
A server-side prepared statement returned by prepare.
TransactionOptions = object
  isolation*: IsolationLevel
  access*: AccessMode
  deferrable*: DeferrableMode
Options for BEGIN: isolation level, access mode, and deferrable mode.

Procs

proc addExec(p: Pipeline; sql: string; params: openArray[PgParamInline]) {.
    ...raises: [], tags: [], forbids: [].}
Add an exec operation using the heap-alloc-free PgParamInline path.
proc addExec(p: Pipeline; sql: string; params: seq[PgParam] = @[]) {....raises: [],
    tags: [], forbids: [].}
Add an exec operation to the pipeline with typed parameters.
proc addQuery(p: Pipeline; sql: string; params: openArray[PgParamInline];
              resultFormat: ResultFormat = rfAuto) {....raises: [], tags: [],
    forbids: [].}
Add a query operation using the heap-alloc-free PgParamInline path.
proc addQuery(p: Pipeline; sql: string; params: seq[PgParam] = @[];
              resultFormat: ResultFormat = rfAuto) {....raises: [], tags: [],
    forbids: [].}
Add a query operation to the pipeline with typed parameters.
proc buildBeginSql(opts: TransactionOptions): string {....raises: [], tags: [],
    forbids: [].}
Build a BEGIN SQL statement with the specified transaction options (isolation level, access mode, deferrable mode).
proc buildTxBeginAndTimeout(arg: NimNode): tuple[beginSql, txTimeout: NimNode] {.
    ...raises: [], tags: [], forbids: [].}
Shared helper for the withTransaction macros. Uses `when ... is` to dispatch on the argument type at compile time.
proc close(cursor: Cursor): Future[void] {....stackTrace: false, raises: [
    Exception, ValueError, PgConnectionError, ProtocolError, SslError,
    PgQueryError, PgTimeoutError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Close the cursor and return the connection to ready state. On timeout, the connection is marked csClosed (protocol out of sync).
proc close(stmt: PreparedStatement; timeout: Duration = ZeroDuration): Future[
    void] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                        PgConnectionError, ProtocolError,
                                        SslError, PgTimeoutError,
                                        AsyncTimeoutError],
            tags: [RootEffect, TimeEffect], forbids: [].}
Close a prepared statement. On timeout, the connection is marked csClosed (protocol out of sync).
proc columnIndex(cursor: Cursor; name: string): int {....raises: [PgTypeError],
    tags: [], forbids: [].}
Find the index of a column by name in a cursor.
proc columnIndex(stmt: PreparedStatement; name: string): int {.
    ...raises: [PgTypeError], tags: [], forbids: [].}
Find the index of a column by name in a prepared statement.
proc copyIn(conn: PgConnection; sql: string; data: openArray[byte];
            timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute COPY ... FROM STDIN with a single contiguous buffer. Slices data into CopyData messages internally. Returns the command result (e.g. "COPY 5").
proc copyIn(conn: PgConnection; sql: string; data: seq[byte];
            timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute COPY ... FROM STDIN with a single contiguous seq[byte]. Avoids the copy that the openArray[byte] overload performs.
proc copyIn(conn: PgConnection; sql: string; data: seq[seq[byte]];
            timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute COPY ... FROM STDIN via simple query protocol. Concatenates chunks and delegates to the seq[byte] overload. Returns the command result (e.g. "COPY 5").
proc copyIn(conn: PgConnection; sql: string; data: string;
            timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute COPY ... FROM STDIN with text data as a string. Converts to bytes internally; avoids manual toOpenArrayByte.
proc copyInStream(conn: PgConnection; sql: string; callback: CopyInCallback;
                  timeout: Duration = ZeroDuration): Future[CopyInInfo] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute COPY ... FROM STDIN via simple query protocol, streaming data from callback. The callback is called repeatedly; returning an empty seq[byte] signals EOF. If the callback raises, CopyFail is sent and the connection returns to csReady. On timeout, the connection is marked csClosed (protocol out of sync).
proc copyOut(conn: PgConnection; sql: string; timeout: Duration = ZeroDuration): Future[
    CopyResult] {....stackTrace: false,
                  raises: [Exception, ValueError, CatchableError],
                  tags: [RootEffect, TimeEffect], forbids: [].}
Execute COPY ... TO STDOUT via simple query protocol. Collects all CopyData messages and returns them in a CopyResult. On timeout, the connection is marked csClosed (protocol out of sync).
proc copyOutStream(conn: PgConnection; sql: string; callback: CopyOutCallback;
                   timeout: Duration = ZeroDuration): Future[CopyOutInfo] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute COPY ... TO STDOUT via simple query protocol, streaming each CopyData chunk through callback. The callback is awaited, providing natural TCP backpressure. If the callback raises, the connection is marked csClosed (protocol cannot be resynchronized).
proc exec(conn: PgConnection; sql: string; params: seq[PgParam] = @[];
          timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Execute a statement with typed parameters via the extended query protocol.

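Single statement only; the plan is cached per-connection. Use simpleExec for parameter-less session commands (BEGIN, SET, VACUUM …) or simpleQuery when you need multi-statement execution in one round trip. For LISTEN / UNLISTEN / NOTIFY prefer the dedicated listen, unlisten, and notify helpers, which quote the channel name for you.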

On timeout the connection is marked closed (protocol desync) and cannot be reused; pooled connections are discarded automatically.

proc exec(conn: PgConnection; sql: string; params: seq[PgParamInline];
          timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a statement with heap-alloc-free inline parameters. Prefer this overload for scalar-heavy workloads (e.g. bulk INSERT of numeric columns) where seq[PgParam] would heap-allocate per parameter.
proc execInTransaction(conn: PgConnection; sql: string;
                       params: seq[PgParam] = @[]; opts: TransactionOptions;
                       timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a statement inside a pipelined transaction with options.
proc execInTransaction(conn: PgConnection; sql: string;
                       params: seq[PgParam] = @[];
                       timeout: Duration = ZeroDuration): Future[CommandResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a statement inside a pipelined transaction with typed parameters.
proc execute(p: Pipeline; timeout: Duration = ZeroDuration): Future[
    seq[PipelineResult]] {....stackTrace: false,
                           raises: [Exception, ValueError, CatchableError],
                           tags: [RootEffect, TimeEffect], forbids: [].}
Execute all queued pipeline operations in a single round trip. On timeout, the connection is marked csClosed (protocol out of sync). When p.autoReset is true, the pipeline is reset on exit (including on raise) so it can be safely reused.
proc execute(stmt: PreparedStatement; params: seq[PgParam] = @[];
             resultFormat: ResultFormat = rfAuto;
             timeout: Duration = ZeroDuration): Future[QueryResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a prepared statement with typed parameters.
proc executeIsolated(p: Pipeline; timeout: Duration = ZeroDuration): Future[
    IsolatedPipelineResults] {....stackTrace: false,
                               raises: [Exception, ValueError, CatchableError],
                               tags: [RootEffect, TimeEffect], forbids: [].}
Execute all queued pipeline operations with per-query error isolation. Each operation gets its own SYNC message, so a failed operation does not abort subsequent ones. Returns results and per-op errors. On timeout, the connection is marked csClosed (protocol out of sync). When p.autoReset is true, the pipeline is reset on exit (including on raise) so it can be safely reused.
proc fetchNext(cursor: Cursor): Future[seq[Row]] {....stackTrace: false, raises: [
    Exception, ValueError, PgQueryError, ProtocolError, SslError,
    PgConnectionError, PgTimeoutError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Fetch the next chunk of rows from the cursor. Returns an empty seq when the cursor is exhausted. On timeout, the connection is marked csClosed (protocol out of sync).
proc hasReturnStmt(n: NimNode): bool {....raises: [], tags: [], forbids: [].}
Check whether an AST contains a return statement (excluding nested proc/func/method/iterator definitions where return is valid).
proc newPipeline(conn: PgConnection; autoReset: bool = false): Pipeline {.
    ...raises: [], tags: [], forbids: [].}
Create a new pipeline for batching multiple operations into a single round trip. When autoReset is true, the pipeline's queued ops and inline buffers are cleared automatically after each execute/executeIsolated call, making it safe to reuse the same Pipeline instance.
proc notify(conn: PgConnection; channel: string; payload: string = "";
            timeout: Duration = ZeroDuration): Future[void] {....stackTrace: false, raises: [
    Exception, ValueError, PgQueryError, PgConnectionError, ProtocolError,
    SslError, KeyError, PgTimeoutError, AsyncTimeoutError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Send a NOTIFY on channel with optional payload. Uses NOTIFY for empty payloads, pg_notify() otherwise.
proc openCursor(conn: PgConnection; sql: string; params: seq[PgParam] = @[];
                resultFormat: ResultFormat = rfAuto; chunkSize: int32 = 100;
                timeout: Duration = ZeroDuration): Future[Cursor] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, ProtocolError, SslError,
                                PgTimeoutError, AsyncTimeoutError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Open a cursor with typed parameters.
proc prepare(conn: PgConnection; name: string; sql: string;
             timeout: Duration = ZeroDuration): Future[PreparedStatement] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Prepare a named statement, returning metadata. On timeout, the connection is marked csClosed (protocol out of sync).
proc query(conn: PgConnection; sql: string; params: seq[PgParam] = @[];
           resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[
    QueryResult] {....stackTrace: false,
                   raises: [Exception, ValueError, CatchableError],
                   tags: [RootEffect, TimeEffect], forbids: [].}

Execute a query with typed parameters via the extended query protocol.

Single statement only; the plan is cached per-connection. Use simpleQuery when you need multiple ;-separated statements to run in one round trip (no parameters, text-only rows).

On timeout the connection is marked closed (protocol desync) and cannot be reused; pooled connections are discarded automatically.

proc query(conn: PgConnection; sql: string; params: seq[PgParamInline];
           resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[
    QueryResult] {....stackTrace: false,
                   raises: [Exception, ValueError, CatchableError],
                   tags: [RootEffect, TimeEffect], forbids: [].}
Execute a query with heap-alloc-free inline parameters. Prefer this overload for scalar-heavy workloads where seq[PgParam] would heap-allocate per parameter.
proc queryColumn(conn: PgConnection; sql: string; params: seq[PgParam] = @[];
                 timeout: Duration = ZeroDuration): Future[seq[string]] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a query and return the first column of all rows as strings. Raises PgNullError if any value is NULL.
proc queryEach(conn: PgConnection; sql: string; params: seq[PgParam] = @[];
               callback: RowCallback; resultFormat: ResultFormat = rfAuto;
               timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Execute a query with typed parameters, invoking callback once per row. Returns the number of rows processed.

The Row passed to callback is only valid for the duration of that single invocation: its backing buffer is reused for the next row as soon as the callback returns. To retain a row beyond the callback, call row.clone() to get a detached copy, or extract the column values you need into your own types before returning.

proc queryExists(conn: PgConnection; sql: string; params: seq[PgParam] = @[];
                 timeout: Duration = ZeroDuration): Future[bool] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a query and return whether any rows exist.
proc queryInTransaction(conn: PgConnection; sql: string;
                        params: seq[PgParam] = @[]; opts: TransactionOptions;
                        resultFormat: ResultFormat = rfAuto;
                        timeout: Duration = ZeroDuration): Future[QueryResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a query inside a pipelined transaction with options.
proc queryInTransaction(conn: PgConnection; sql: string;
                        params: seq[PgParam] = @[];
                        resultFormat: ResultFormat = rfAuto;
                        timeout: Duration = ZeroDuration): Future[QueryResult] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a query inside a pipelined transaction with typed parameters.
proc queryRow(conn: PgConnection; sql: string; params: seq[PgParam] = @[];
              resultFormat: ResultFormat = rfAuto;
              timeout: Duration = ZeroDuration): Future[Row] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgNoRowsError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a query and return the first row. Raises PgNoRowsError if no rows are returned.
proc queryRowOpt(conn: PgConnection; sql: string; params: seq[PgParam] = @[];
                 resultFormat: ResultFormat = rfAuto;
                 timeout: Duration = ZeroDuration): Future[Option[Row]] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a query and return the first row, or none if no rows.
proc queryValue(conn: PgConnection; sql: string; params: seq[PgParam] = @[];
                timeout: Duration = ZeroDuration): Future[string] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a query and return the first column of the first row as a string. Raises PgNoRowsError if no rows are returned, or PgNullError if the value is NULL.
proc queryValue[T](conn: PgConnection; _: typedesc[T]; sql: string;
                   params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[
    T] {....stackTrace: false.}
Execute a query and return the first column of the first row as T. Raises PgNoRowsError if no rows are returned, or PgNullError if the value is NULL. Supported types: int32, int64, float64, bool, string.
proc queryValueOpt(conn: PgConnection; sql: string; params: seq[PgParam] = @[];
                   timeout: Duration = ZeroDuration): Future[Option[string]] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a query and return the first column of the first row as a string. Returns none if no rows are returned or the value is NULL.
proc queryValueOpt[T](conn: PgConnection; _: typedesc[T]; sql: string;
                      params: seq[PgParam] = @[];
                      timeout: Duration = ZeroDuration): Future[Option[T]] {.
    ...stackTrace: false.}
Execute a query and return the first column of the first row as T. Returns none if no rows are returned or the value is NULL. Supported types: int32, int64, float64, bool, string.
proc queryValueOrDefault(conn: PgConnection; sql: string;
                         params: seq[PgParam] = @[]; default: string = "";
                         timeout: Duration = ZeroDuration): Future[string] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Execute a query and return the first column of the first row as a string. Returns default if no rows or the value is NULL.
proc queryValueOrDefault[T](conn: PgConnection; _: typedesc[T]; sql: string;
                            params: seq[PgParam] = @[]; default: T;
                            timeout: Duration = ZeroDuration): Future[T] {.
    ...stackTrace: false.}
Execute a query and return the first column of the first row as T. Returns default if no rows or the value is NULL. Supported types: int32, int64, float64, bool, string.
proc queryValueOrDefault[T](conn: PgConnection; sql: string;
                            params: seq[PgParam] = @[]; default: T;
                            timeout: Duration = ZeroDuration): Future[T] {.
    ...stackTrace: false.}
Execute a query and return the first column of the first row as T, inferring T from default. Returns default if no rows or the value is NULL. Supported types: int32, int64, float64, bool, string.
proc reset(p: Pipeline) {....raises: [], tags: [], forbids: [].}
Clear all queued ops and inline SoA buffers. Safe to call at any time, including while the pipeline is empty. Does not affect the underlying connection or its statement cache. When p.autoReset is true, execute/executeIsolated call this automatically (including on raise), so manual calls are only needed when autoReset is false.

Macros

macro execDirect(conn: PgConnection; sql: string; args: varargs[untyped]): untyped

Zero-allocation exec: the macro expands at compile time into code that encodes the parameters directly into the send buffer, avoiding seq[PgParam] and intermediate seq[byte] allocs.

Usage: discard await conn.execDirect("UPDATE ... WHERE id = $1", myId)

macro queryDirect(conn: PgConnection; sql: string; args: varargs[untyped]): untyped

Zero-allocation query: the macro expands at compile time into code that encodes the parameters directly into the send buffer, avoiding seq[PgParam] and intermediate seq[byte] allocs.

Usage: let qr = await conn.queryDirect("SELECT ... WHERE id = $1", myId)

macro withSavepoint(conn: PgConnection; args: varargs[untyped]): untyped

Execute body inside a SAVEPOINT. On exception, ROLLBACK TO SAVEPOINT is issued automatically. Using return inside the body is a compile-time error.

Usage:

  conn.withSavepoint:
    await conn.exec(...)
  conn.withSavepoint("my_sp"):
    await conn.exec(...)
  conn.withSavepoint(seconds(5)):
    await conn.exec(...)
  conn.withSavepoint("my_sp", seconds(5)):
    await conn.exec(...)

Note: The savepoint name must be a string literal, not a variable (the macro uses AST node kind to distinguish name from timeout).

macro withTransaction(conn: PgConnection; args: varargs[untyped]): untyped

Execute body inside a BEGIN/COMMIT transaction. On exception, ROLLBACK is issued automatically. Using return inside the body is a compile-time error.

Usage:

  conn.withTransaction:
    await conn.exec(...)
  conn.withTransaction(seconds(5)):
    await conn.exec(...)
  conn.withTransaction(TransactionOptions(isolation: ilSerializable)):
    await conn.exec(...)
  conn.withTransaction(TransactionOptions(...), seconds(5)):
    await conn.exec(...)

Templates

template withCursor(conn: PgConnection; sql: string; chunks: int32;
                    cursorName, body: untyped;
                    cursorTimeout: Duration = ZeroDuration)
Open a cursor, execute body, then close the cursor automatically. The cursor is available as cursorName inside the body.