async_postgres/pg_client/pipeline

Pipelined batch execution of addExec/addQuery operations over the PostgreSQL extended-query protocol. Two execution variants are provided: execute, which sends a single Sync for the whole batch, and executeIsolated, which sends one Sync per operation so that errors are isolated per operation.

Types

IsolatedPipelineResults = object
  results*: seq[PipelineResult]
  errors*: seq[ref CatchableError] ## errors[i] is nil if ops[i] succeeded
Results from executeIsolated. Each operation is followed by its own Sync message, so errors are isolated per operation: errors[i] is nil when the i-th operation succeeded.
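As a rough illustration of how the errors slots might be consumed, the sketch below walks them by index. The types are local stand-ins so that the snippet compiles without the library: only the field names results and errors come from the definition above, while label, failedOps, summarize, and sample are hypothetical names introduced for this example. Because this section does not say what results[i] holds for a failed operation, the sketch reads it only when errors[i] is nil.

```nim
# Stand-in types: only `results` and `errors` mirror the documented object.
type
  PipelineResult = object
    label: string                      # hypothetical field, illustration only
  IsolatedPipelineResults = object
    results: seq[PipelineResult]
    errors: seq[ref CatchableError]    # errors[i] is nil if op i succeeded

proc failedOps(r: IsolatedPipelineResults): seq[int] =
  ## Indices of operations whose error slot is non-nil.
  for i, e in r.errors:
    if not e.isNil:
      result.add i

proc summarize(r: IsolatedPipelineResults): seq[string] =
  ## One line per operation; results[i] is read only when errors[i] is nil.
  for i, e in r.errors:
    if e.isNil:
      result.add "op " & $i & ": ok (" & r.results[i].label & ")"
    else:
      result.add "op " & $i & ": failed (" & e.msg & ")"

proc sample(): IsolatedPipelineResults =
  ## Hand-built value standing in for what executeIsolated would return.
  result.results = @[PipelineResult(label: "insert"),
                     PipelineResult(),
                     PipelineResult(label: "select")]
  result.errors = newSeq[ref CatchableError](3)   # all slots start as nil
  result.errors[1] = newException(CatchableError, "duplicate key")

for line in summarize(sample()):
  echo line
```

Checking the error slot first keeps the caller independent of whatever placeholder the library stores for a failed operation.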
Pipeline = ref object
  autoReset*: bool ## When true, `execute`/`executeIsolated` call `reset()` in a `finally`
                   ## block so the Pipeline can be safely reused without leaking state from
                   ## the previous run. Default: false (backward-compatible).
Batch of queries/execs sent through the PostgreSQL pipeline protocol.
PipelineOp = object
PipelineOpKind = enum
  pokExec, pokQuery
PipelineResult = object
  case kind*: PipelineResultKind
  of prkExec:
    commandResult*: CommandResult
  of prkQuery:
    queryResult*: QueryResult
Result of a single operation within a pipeline.
PipelineResultKind = enum
  prkExec, prkQuery
Discriminator for pipeline result variants.
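Since PipelineResult is an object variant, callers would normally branch on kind before touching a payload field. The sketch below shows that pattern with local stand-in types: the enum values and the field names kind, commandResult, and queryResult follow the definitions above, but the contents of CommandResult and QueryResult are not shown in this section, so tag and rowCount are hypothetical, as is the describe helper.

```nim
type
  CommandResult = object      # stand-in; real fields are not shown here
    tag: string               # hypothetical
  QueryResult = object        # stand-in; real fields are not shown here
    rowCount: int             # hypothetical
  PipelineResultKind = enum
    prkExec, prkQuery
  PipelineResult = object
    case kind: PipelineResultKind
    of prkExec:
      commandResult: CommandResult
    of prkQuery:
      queryResult: QueryResult

proc describe(r: PipelineResult): string =
  ## Branch on the discriminator, then read only the matching field.
  case r.kind
  of prkExec: "exec: " & r.commandResult.tag
  of prkQuery: "query: " & $r.queryResult.rowCount & " row(s)"

let batch = @[
  PipelineResult(kind: prkExec, commandResult: CommandResult(tag: "INSERT 0 1")),
  PipelineResult(kind: prkQuery, queryResult: QueryResult(rowCount: 2))]

for r in batch:
  echo describe(r)
```

With Nim's default field checks enabled, reading queryResult on a prkExec value (or the reverse) raises a FieldDefect at run time, which is why the case statement comes first.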

Procs

proc addExec(p: Pipeline; sql: string; params: openArray[PgParamInline]) {.
    raises: [], tags: [], forbids: [].}
Add an exec operation using the heap-alloc-free PgParamInline path.
proc addExec(p: Pipeline; sql: string; params: seq[PgParam] = @[]) {.raises: [],
    tags: [], forbids: [].}
Add an exec operation to the pipeline with typed parameters.
proc addQuery(p: Pipeline; sql: string; params: openArray[PgParamInline];
              resultFormat: ResultFormat = rfAuto) {.raises: [], tags: [],
    forbids: [].}
Add a query operation using the heap-alloc-free PgParamInline path.
proc addQuery(p: Pipeline; sql: string; params: seq[PgParam] = @[];
              resultFormat: ResultFormat = rfAuto) {.raises: [], tags: [],
    forbids: [].}
Add a query operation to the pipeline with typed parameters.
proc execute(p: Pipeline; timeout: Duration = ZeroDuration): Future[
    seq[PipelineResult]] {.stackTrace: false,
                           raises: [Exception, ValueError, CatchableError],
                           tags: [RootEffect, TimeEffect], forbids: [].}
Execute all queued pipeline operations in a single round trip, with one Sync for the whole batch. On timeout, the connection is marked csClosed because the protocol is left out of sync. When p.autoReset is true, the pipeline is reset on exit (including on raise) so it can be safely reused.
proc executeIsolated(p: Pipeline; timeout: Duration = ZeroDuration): Future[
    IsolatedPipelineResults] {.stackTrace: false,
                               raises: [Exception, ValueError, CatchableError],
                               tags: [RootEffect, TimeEffect], forbids: [].}
Execute all queued pipeline operations with per-operation error isolation. Each operation gets its own Sync message, so a failed operation does not abort subsequent ones. Returns the results together with the per-operation errors. On timeout, the connection is marked csClosed because the protocol is left out of sync. When p.autoReset is true, the pipeline is reset on exit (including on raise) so it can be safely reused.
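The difference between one Sync per batch and one Sync per operation can be pictured with a toy model of the server side. In the PostgreSQL extended-query protocol, after an error the backend discards incoming messages until it reaches a Sync. The sketch below simulates only that rule; it is not the library's implementation, it ignores transaction effects (such as rollback of earlier statements sharing the same implicit transaction), and it says nothing about how execute reports the failure to the caller. Outcome and simulate are hypothetical names.

```nim
type
  Outcome = enum
    oProcessed,   # the server ran the operation
    oFailed,      # the operation raised an error on the server
    oSkipped      # discarded while the server waited for a Sync

proc simulate(opFails: seq[bool]; syncAfterEachOp: bool): seq[Outcome] =
  ## opFails[i] states whether op i would fail if the server ran it.
  var discarding = false          # true between an error and the next Sync
  for fails in opFails:
    if discarding:
      result.add oSkipped
    elif fails:
      result.add oFailed
      discarding = true
    else:
      result.add oProcessed
    if syncAfterEachOp:
      discarding = false          # a Sync ends the discard state

let ops = @[false, true, false]   # the middle operation fails

echo "single Sync: ", simulate(ops, syncAfterEachOp = false)
echo "per-op Sync: ", simulate(ops, syncAfterEachOp = true)
```

In the model, the third operation is skipped when only a trailing Sync is sent and processed when each operation carries its own Sync, which matches the isolation described for executeIsolated.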
proc newPipeline(conn: PgConnection; autoReset: bool = false): Pipeline {.
    raises: [], tags: [], forbids: [].}
Create a new pipeline for batching multiple operations into a single round trip. When autoReset is true, the pipeline's queued ops and inline buffers are cleared automatically after each execute/executeIsolated call, making it safe to reuse the same Pipeline instance.
proc reset(p: Pipeline) {.raises: [], tags: [], forbids: [].}
Clear all queued ops and inline SoA buffers. Safe to call at any time, including while the pipeline is empty. Does not affect the underlying connection or its statement cache. When p.autoReset is true, execute/executeIsolated call this automatically (including on raise), so manual calls are only needed when autoReset is false.
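The reset-in-finally behaviour described for autoReset can be sketched with a small stand-in type. ToyPipeline, its ops field, and run are hypothetical and only imitate the documented contract (queued ops are cleared on exit, including on raise, when autoReset is true); the real Pipeline internals are not shown in this section.

```nim
type
  ToyPipeline = ref object
    autoReset: bool
    ops: seq[string]          # hypothetical queue of pending SQL strings

proc reset(p: ToyPipeline) =
  ## Clear the queue; harmless when it is already empty.
  p.ops.setLen(0)

proc run(p: ToyPipeline; fail = false): int =
  ## Pretend to execute the queue and return how many ops were sent.
  try:
    if fail:
      raise newException(ValueError, "simulated failure")
    result = p.ops.len
  finally:
    if p.autoReset:
      p.reset()               # runs on normal exit and on raise

let reusable = ToyPipeline(autoReset: true, ops: @["INSERT ...", "SELECT ..."])
try:
  discard reusable.run(fail = true)
except ValueError:
  discard
echo "autoReset=true, queued after failure: ", reusable.ops.len

let manual = ToyPipeline(autoReset: false, ops: @["INSERT ..."])
discard manual.run()
echo "autoReset=false, queued after run: ", manual.ops.len
manual.reset()                # the explicit call needed before reuse
```

Without autoReset, the old operations stay queued after a run, so a second run would send them again unless reset is called first.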