Types
AccessMode = enum amDefault, amReadWrite, amReadOnly
- PostgreSQL transaction access mode (read-write or read-only).
Cursor = ref object conn*: PgConnection fields*: seq[FieldDescription] exhausted*: bool
- A server-side portal for incremental row fetching via declareCursor/fetch.
DeferrableMode = enum dmDefault, dmDeferrable, dmNotDeferrable
- PostgreSQL transaction deferrable mode (for serializable read-only transactions).
IsolationLevel = enum ilDefault, ilReadCommitted, ilRepeatableRead, ilSerializable, ilReadUncommitted
- PostgreSQL transaction isolation level.
Pipeline = ref object
- Batch of queries/execs sent through the PostgreSQL pipeline protocol.
PipelineResult = object case kind*: PipelineResultKind of prkExec: commandResult*: CommandResult of prkQuery: queryResult*: QueryResult
- Result of a single operation within a pipeline.
PipelineResultKind = enum prkExec, prkQuery
- Discriminator for pipeline result variants.
PreparedStatement = object conn*: PgConnection name*: string fields*: seq[FieldDescription] paramOids*: seq[int32]
- A server-side prepared statement returned by prepare.
TransactionOptions = object isolation*: IsolationLevel access*: AccessMode deferrable*: DeferrableMode
- Options for BEGIN: isolation level, access mode, and deferrable mode.
Procs
proc addQuery(p: Pipeline; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto) {....raises: [], tags: [], forbids: [].}
- Add a query operation to the pipeline with typed parameters.
proc buildBeginSql(opts: TransactionOptions): string {....raises: [], tags: [], forbids: [].}
- Build a BEGIN SQL statement with the specified transaction options (isolation level, access mode, deferrable mode).
proc buildTxBeginAndTimeout(arg: NimNode): tuple[beginSql, txTimeout: NimNode] {. ...raises: [], tags: [], forbids: [].}
- Shared helper for withTransaction macros. Uses when ... is to dispatch on the argument type at compile time.
proc close(cursor: Cursor): Future[void] {....stackTrace: false, raises: [ Exception, ValueError, PgConnectionError, SslError, ProtocolError, PgQueryError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Close the cursor and return the connection to ready state. On timeout, the connection is marked csClosed (protocol out of sync).
proc close(stmt: PreparedStatement; timeout: Duration = ZeroDuration): Future[ void] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Close a prepared statement. On timeout, the connection is marked csClosed (protocol out of sync).
proc columnIndex(cursor: Cursor; name: string): int {....raises: [PgTypeError], tags: [], forbids: [].}
- Find the index of a column by name in a cursor.
proc columnIndex(stmt: PreparedStatement; name: string): int {. ...raises: [PgTypeError], tags: [], forbids: [].}
- Find the index of a column by name in a prepared statement.
proc copyIn(conn: PgConnection; sql: string; data: openArray[byte]; timeout: Duration = ZeroDuration): Future[CommandResult] {....raises: [ Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute COPY ... FROM STDIN with a single contiguous buffer. Slices data into CopyData messages internally. Returns the command result (e.g. "COPY 5").
proc copyIn(conn: PgConnection; sql: string; data: seq[byte]; timeout: Duration = ZeroDuration): Future[CommandResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute COPY ... FROM STDIN with a single contiguous seq[byte]. Avoids the copy that the openArray[byte] overload performs.
proc copyIn(conn: PgConnection; sql: string; data: seq[seq[byte]]; timeout: Duration = ZeroDuration): Future[CommandResult] {....raises: [ Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute COPY ... FROM STDIN via simple query protocol. Concatenates chunks and delegates to the seq[byte] overload. Returns the command result (e.g. "COPY 5").
proc copyIn(conn: PgConnection; sql: string; data: string; timeout: Duration = ZeroDuration): Future[CommandResult] {....raises: [ Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute COPY ... FROM STDIN with text data as a string. Converts to bytes internally; avoids manual toOpenArrayByte.
proc copyInStream(conn: PgConnection; sql: string; callback: CopyInCallback; timeout: Duration = ZeroDuration): Future[CopyInInfo] {. ...stackTrace: false, raises: [Exception, ValueError, CatchableError, PgConnectionError, SslError, ProtocolError, PgQueryError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute COPY ... FROM STDIN via simple query protocol, streaming data from callback. The callback is called repeatedly; returning an empty seq[byte] signals EOF. If the callback raises, CopyFail is sent and the connection returns to csReady. On timeout, the connection is marked csClosed (protocol out of sync).
proc copyOut(conn: PgConnection; sql: string; timeout: Duration = ZeroDuration): Future[ CopyResult] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute COPY ... TO STDOUT via simple query protocol. Collects all CopyData messages and returns them in a CopyResult. On timeout, the connection is marked csClosed (protocol out of sync).
proc copyOutStream(conn: PgConnection; sql: string; callback: CopyOutCallback; timeout: Duration = ZeroDuration): Future[CopyOutInfo] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, CatchableError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute COPY ... TO STDOUT via simple query protocol, streaming each CopyData chunk through callback. The callback is awaited, providing natural TCP backpressure. If the callback raises, the connection is marked csClosed (protocol cannot be resynchronized).
proc exec(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[CommandResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a statement with typed parameters. On timeout the connection is marked closed (protocol desync) and cannot be reused; pooled connections are discarded automatically.
proc execInTransaction(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; opts: TransactionOptions; timeout: Duration = ZeroDuration): Future[CommandResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgTimeoutError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a statement inside a pipelined transaction with options.
proc execInTransaction(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[CommandResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgTimeoutError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a statement inside a pipelined transaction with typed parameters.
proc execute(p: Pipeline; timeout: Duration = ZeroDuration): Future[ seq[PipelineResult]] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, KeyError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute all queued pipeline operations in a single round trip. On timeout, the connection is marked csClosed (protocol out of sync).
proc execute(stmt: PreparedStatement; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[QueryResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, PgTypeError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a prepared statement with typed parameters.
proc fetchNext(cursor: Cursor): Future[seq[Row]] {....stackTrace: false, raises: [ Exception, ValueError, PgQueryError, SslError, ProtocolError, PgConnectionError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Fetch the next chunk of rows from the cursor. Returns an empty seq when the cursor is exhausted. On timeout, the connection is marked csClosed (protocol out of sync).
proc hasReturnStmt(n: NimNode): bool {....raises: [], tags: [], forbids: [].}
- Check whether an AST contains a return statement (excluding nested proc/func/method/iterator definitions where return is valid).
proc newPipeline(conn: PgConnection): Pipeline {....raises: [], tags: [], forbids: [].}
- Create a new pipeline for batching multiple operations into a single round trip.
proc notify(conn: PgConnection; channel: string; payload: string = ""; timeout: Duration = ZeroDuration): Future[void] {....stackTrace: false, raises: [ Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Send a NOTIFY on channel with optional payload. Uses NOTIFY for empty payloads, pg_notify() otherwise.
proc openCursor(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; chunkSize: int32 = 100; timeout: Duration = ZeroDuration): Future[Cursor] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Open a cursor with typed parameters.
proc prepare(conn: PgConnection; name: string; sql: string; timeout: Duration = ZeroDuration): Future[PreparedStatement] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Prepare a named statement, returning metadata. On timeout, the connection is marked csClosed (protocol out of sync).
proc query(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[ QueryResult] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a query with typed parameters. On timeout the connection is marked closed (protocol desync) and cannot be reused; pooled connections are discarded automatically.
proc queryColumn(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[seq[string]] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a query and return the first column of all rows as strings. Raises PgTypeError if any value is NULL.
proc queryEach(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; callback: RowCallback; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[int64] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, CatchableError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a query with typed parameters, invoking callback once per row. Returns the number of rows processed.
proc queryExists(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[bool] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a query and return whether any rows exist.
proc queryInTransaction(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; opts: TransactionOptions; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[QueryResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgTimeoutError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a query inside a pipelined transaction with options.
proc queryInTransaction(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[QueryResult] {. ...stackTrace: false, raises: [Exception, ValueError, PgTimeoutError, PgConnectionError, SslError, ProtocolError, PgQueryError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a query inside a pipelined transaction with typed parameters.
proc queryOne(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; resultFormat: ResultFormat = rfAuto; timeout: Duration = ZeroDuration): Future[Option[Row]] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a query and return the first row, or none if no rows.
proc queryValue(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[string] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a query and return the first column of the first row as a string. Raises PgError if no rows are returned or the value is NULL.
proc queryValue[T](conn: PgConnection; _: typedesc[T]; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[ T] {....stackTrace: false.}
- Execute a query and return the first column of the first row as T. Raises PgError if no rows are returned or the value is NULL. Supported types: int32, int64, float64, bool, string.
proc queryValueOpt(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[Option[string]] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a query and return the first column of the first row as a string. Returns none if no rows are returned or the value is NULL.
proc queryValueOpt[T](conn: PgConnection; _: typedesc[T]; sql: string; params: seq[PgParam] = @[]; timeout: Duration = ZeroDuration): Future[Option[T]] {. ...stackTrace: false.}
- Execute a query and return the first column of the first row as T. Returns none if no rows are returned or the value is NULL. Supported types: int32, int64, float64, bool, string.
proc queryValueOrDefault(conn: PgConnection; sql: string; params: seq[PgParam] = @[]; default: string = ""; timeout: Duration = ZeroDuration): Future[string] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Execute a query and return the first column of the first row as a string. Returns default if no rows or the value is NULL.
proc queryValueOrDefault[T](conn: PgConnection; _: typedesc[T]; sql: string; params: seq[PgParam] = @[]; default: T; timeout: Duration = ZeroDuration): Future[T] {. ...stackTrace: false.}
- Execute a query and return the first column of the first row as T. Returns default if no rows or the value is NULL. Supported types: int32, int64, float64, bool, string.
Macros
macro execDirect(conn: PgConnection; sql: string; args: varargs[untyped]): untyped
-
Zero-allocation exec: encodes parameters directly into the send buffer at compile time, avoiding seq[PgParam] and intermediate seq[byte] allocs.
Usage: discard await conn.execDirect("UPDATE ... WHERE id = $1", myId)
macro queryDirect(conn: PgConnection; sql: string; args: varargs[untyped]): untyped
-
Zero-allocation query: encodes parameters directly into the send buffer at compile time, avoiding seq[PgParam] and intermediate seq[byte] allocs.
Usage: let qr = await conn.queryDirect("SELECT ... WHERE id = $1", myId)
macro withSavepoint(conn: PgConnection; args: varargs[untyped]): untyped
-
Execute body inside a SAVEPOINT. On exception, ROLLBACK TO SAVEPOINT is issued automatically. Using return inside the body is a compile-time error.
Usage: conn.withSavepoint: await conn.exec(...) conn.withSavepoint("my_sp"): await conn.exec(...) conn.withSavepoint(seconds(5)): await conn.exec(...) conn.withSavepoint("my_sp", seconds(5)): await conn.exec(...)
Note: The savepoint name must be a string literal, not a variable (the macro uses AST node kind to distinguish name from timeout).
macro withTransaction(conn: PgConnection; args: varargs[untyped]): untyped
-
Execute body inside a BEGIN/COMMIT transaction. On exception, ROLLBACK is issued automatically. Using return inside the body is a compile-time error.
Usage: conn.withTransaction: await conn.exec(...) conn.withTransaction(seconds(5)): await conn.exec(...) conn.withTransaction(TransactionOptions(isolation: ilSerializable)): await conn.exec(...) conn.withTransaction(TransactionOptions(...), seconds(5)): await conn.exec(...)
Templates
template withCursor(conn: PgConnection; sql: string; chunks: int32; cursorName, body: untyped; cursorTimeout: Duration = ZeroDuration)
- Open a cursor, execute body, then close the cursor automatically. The cursor is available as cursorName inside the body.