async_postgres/pg_largeobject

Search:
Group by:

PostgreSQL Large Object API

Provides an async interface to PostgreSQL's Large Object facility for storing and streaming binary data larger than what fits comfortably in a bytea column.

All Large Object operations must be performed inside a transaction (use withTransaction).

Example

conn.withTransaction:
  let oid = await conn.loCreate()
  conn.withLargeObject(lo, oid, INV_READWRITE):
    let written = await lo.loWrite(data)
    discard await lo.loSeek(0, SEEK_SET)  # loSeek returns the new position
    let readBack = await lo.loReadAll()

Types

LargeObject = object
  conn*: PgConnection
  fd*: int32
  oid*: Oid
LoReadCallback = proc (data: seq[byte]): Future[void] {....gcsafe.}
LoWriteCallback = proc (): Future[seq[byte]] {....gcsafe.}
Oid = uint32

Consts

INV_READ = 0x00040000'i32
INV_READWRITE = 393216'i32
INV_WRITE = 0x00020000'i32
loDefaultChunkSize = 262144
256 KiB (262144 bytes)
SEEK_CUR = 1'i32
SEEK_END = 2'i32
SEEK_SET = 0'i32

Procs

proc loClose(lo: LargeObject; timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Close an open Large Object handle.
proc loCreate(conn: PgConnection; requestedOid: Oid = 0;
              timeout: Duration = ZeroDuration): Future[Oid] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Create a new Large Object, returning its OID. Pass requestedOid = 0 to let the server assign an OID.
proc loExport(conn: PgConnection; oid: Oid; filename: string;
              timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Export a Large Object to a server-side file.
proc loImport(conn: PgConnection; filename: string;
              timeout: Duration = ZeroDuration): Future[Oid] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Import a server-side file into a new Large Object, returning its OID.
proc loOpen(conn: PgConnection; oid: Oid; mode: int32 = INV_READWRITE;
            timeout: Duration = ZeroDuration): Future[LargeObject] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Open a Large Object for reading/writing. Returns a LargeObject handle.
proc loRead(lo: LargeObject; length: int32; timeout: Duration = ZeroDuration): Future[
    seq[byte]] {....stackTrace: false,
                 raises: [Exception, ValueError, CatchableError, PgTypeError],
                 tags: [RootEffect, TimeEffect], forbids: [].}
Read up to length bytes from the current position. Returns the bytes read (may be fewer than length at EOF).
proc loReadAll(lo: LargeObject; chunkSize: int32 = loDefaultChunkSize;
               timeout: Duration = ZeroDuration): Future[seq[byte]] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Read the remainder of the Large Object, from the current position to EOF. Seek to offset 0 first to read the whole object.

Timeout semantics: timeout applies per chunk. Total wall-clock can reach N × timeout for N chunks. Use loReadAllDeadline for a single wall-clock deadline covering the whole read.

proc loReadAllDeadline(lo: LargeObject; deadline: Duration;
                       chunkSize: int32 = loDefaultChunkSize): Future[seq[byte]] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Like loReadAll but deadline bounds total wall-clock across all chunks. See the "Best-effort" note at the top of the Deadline-bounded API section.
proc loReadStream(lo: LargeObject; callback: LoReadCallback;
                  chunkSize: int32 = loDefaultChunkSize;
                  timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Read the Large Object in chunks, calling callback for each chunk.

Timeout semantics: timeout applies per chunk. Total wall-clock can reach N × timeout plus the cumulative callback runtime. Use loReadStreamDeadline for a single wall-clock deadline.

proc loReadStreamDeadline(lo: LargeObject; callback: LoReadCallback;
                          deadline: Duration;
                          chunkSize: int32 = loDefaultChunkSize): Future[void] {.
    ...stackTrace: false,
    raises: [Exception, ValueError, CatchableError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Like loReadStream but deadline bounds total wall-clock across all reads. Callback time is included in the deadline. See the "Best-effort" note at the top of the Deadline-bounded API section.
proc loSeek(lo: LargeObject; offset: int64; whence: int32 = SEEK_SET;
            timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Seek to a position. Returns the new absolute position.
proc loSize(lo: LargeObject; timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Return the total size of the Large Object in bytes.

Timeout semantics: timeout applies per call to each of the 3 internal lo_tell/lo_seek operations. Total wall-clock can reach 3 × timeout. Use loSizeDeadline for a single wall-clock deadline covering the operation as a whole.

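Implementation: the three operations are loTell (save the current position), loSeek(0, SEEK_END), and a final loSeek(savedPos, SEEK_SET) to restore the position. The return value of loSeek(0, SEEK_END) (the new absolute position) is used as the size directly, saving one round-trip over an equivalent loSeek + loTell pair.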

proc loSizeDeadline(lo: LargeObject; deadline: Duration): Future[int64] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Like loSize but deadline bounds total wall-clock across the 3 internal lo_tell/lo_seek operations (instead of 3 × per-call timeout). See the "Best-effort" note at the top of the Deadline-bounded API section.

Caveat: if the deadline is exhausted before the final loSeek(savedPos, SEEK_SET), that restore call will raise PgTimeoutError and the Large Object's file position will be left at the end of the object (from the internal SEEK_END). Subsequent reads via the same LargeObject handle will therefore return no data until the caller reseeks. The connection is also invalidated on timeout, so in practice the handle is unusable anyway.

proc loTell(lo: LargeObject; timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Return the current read/write position.
proc loTruncate(lo: LargeObject; length: int64; timeout: Duration = ZeroDuration): Future[
    void] {....stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                        PgNoRowsError, PgNullError, PgTypeError],
            tags: [RootEffect, TimeEffect], forbids: [].}
Truncate the Large Object to length bytes.
proc loWrite(lo: LargeObject; data: seq[byte]; timeout: Duration = ZeroDuration): Future[
    int32] {....stackTrace: false, raises: [Exception, ValueError, CatchableError,
    PgNoRowsError, PgNullError, PgTypeError], tags: [RootEffect, TimeEffect],
             forbids: [].}
Write data at the current position. Returns the number of bytes written.
proc loWriteAll(lo: LargeObject; data: seq[byte];
                chunkSize: int = loDefaultChunkSize;
                timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError, PgError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Write all of data, splitting into chunks.

Timeout semantics: timeout applies per chunk. Total wall-clock can reach N × timeout for N chunks. Use loWriteAllDeadline for a single wall-clock deadline covering the whole write.

proc loWriteAllDeadline(lo: LargeObject; data: seq[byte]; deadline: Duration;
                        chunkSize: int = loDefaultChunkSize): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError, PgError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Like loWriteAll but deadline bounds total wall-clock across all chunks. See the "Best-effort" note at the top of the Deadline-bounded API section.
proc loWriteStream(lo: LargeObject; callback: LoWriteCallback;
                   chunkSize: int = loDefaultChunkSize;
                   timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError, PgError],
    tags: [RootEffect, TimeEffect], forbids: [].}

Write to the Large Object by repeatedly calling callback until it returns an empty seq[byte].

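Timeout semantics: timeout applies per chunk within each loWriteAll invocation. Total wall-clock is unbounded, because the callback decides how many times data is written (the stream only ends when it returns an empty seq[byte]). Use loWriteStreamDeadline for a single wall-clock deadline.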

proc loWriteStreamDeadline(lo: LargeObject; callback: LoWriteCallback;
                           deadline: Duration;
                           chunkSize: int = loDefaultChunkSize): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, CatchableError,
                                PgNoRowsError, PgNullError, PgTypeError, PgError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Like loWriteStream but deadline bounds total wall-clock across all writes. Callback time is included in the deadline. See the "Best-effort" note at the top of the Deadline-bounded API section.

Templates

template makeLoReadCallback(body: untyped): LoReadCallback
Create a LoReadCallback that works with both asyncdispatch and chronos. Inside body, the current chunk is available as data: seq[byte].
var chunks: seq[seq[byte]]
let cb = makeLoReadCallback:
  chunks.add(data)
template makeLoWriteCallback(body: untyped): LoWriteCallback

Create a LoWriteCallback that works with both asyncdispatch and chronos. body must evaluate to a seq[byte]; an empty seq signals completion.

With asyncdispatch, anonymous async procs cannot return non-void types, so this template wraps the body in manual Future construction.

var idx = 0
let chunks = @[data1, data2]
let cb = makeLoWriteCallback:
  if idx < chunks.len:
    let chunk = chunks[idx]
    inc idx
    chunk
  else:
    newSeq[byte]()
template withLargeObject(conn: PgConnection; lo: untyped; oidVal: Oid;
                         mode: int32; body: untyped)
Open a Large Object, execute body, then close it. Must be used inside withTransaction.