async_postgres/pg_largeobject

Search:
Group by:

PostgreSQL Large Object API

Provides an async interface to PostgreSQL's Large Object facility for storing and streaming binary data larger than what fits comfortably in a bytea column.

All Large Object operations must be performed inside a transaction (use withTransaction).

Example

conn.withTransaction:
  let oid = await conn.loCreate()
  conn.withLargeObject(lo, oid, INV_READWRITE):
    let written = await lo.loWrite(data)
    await lo.loSeek(0, SEEK_SET)
    let readBack = await lo.loReadAll()

Types

LargeObject = object
  conn*: PgConnection
  fd*: int32
  oid*: Oid
LoReadCallback = proc (data: seq[byte]): Future[void] {....gcsafe.}
LoWriteCallback = proc (): Future[seq[byte]] {....gcsafe.}
Oid = uint32

Consts

INV_READ = 0x00040000'i32
INV_READWRITE = 393216'i32
INV_WRITE = 0x00020000'i32
loDefaultChunkSize = 262144
256KB
SEEK_CUR = 1'i32
SEEK_END = 2'i32
SEEK_SET = 0'i32

Procs

proc loClose(lo: LargeObject; timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Close an open Large Object handle.
proc loCreate(conn: PgConnection; requestedOid: Oid = 0;
              timeout: Duration = ZeroDuration): Future[Oid] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Create a new Large Object, returning its OID. Pass requestedOid = 0 to let the server assign an OID.
proc loExport(conn: PgConnection; oid: Oid; filename: string;
              timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Export a Large Object to a server-side file.
proc loImport(conn: PgConnection; filename: string;
              timeout: Duration = ZeroDuration): Future[Oid] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Import a server-side file into a new Large Object, returning its OID.
proc loOpen(conn: PgConnection; oid: Oid; mode: int32 = INV_READWRITE;
            timeout: Duration = ZeroDuration): Future[LargeObject] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Open a Large Object for reading/writing. Returns a LargeObject handle.
proc loRead(lo: LargeObject; length: int32; timeout: Duration = ZeroDuration): Future[
    seq[byte]] {....stackTrace: false, raises: [Exception, ValueError,
    PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError,
    PgTimeoutError, AsyncTimeoutError, PgTypeError],
                 tags: [RootEffect, TimeEffect], forbids: [].}
Read up to length bytes from the current position. Returns the bytes read (may be fewer than length at EOF).
proc loReadAll(lo: LargeObject; chunkSize: int32 = loDefaultChunkSize;
               timeout: Duration = ZeroDuration): Future[seq[byte]] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Read the entire Large Object from the current position to EOF.
proc loReadStream(lo: LargeObject; callback: LoReadCallback;
                  chunkSize: int32 = loDefaultChunkSize;
                  timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Read the Large Object in chunks, calling callback for each chunk.
proc loSeek(lo: LargeObject; offset: int64; whence: int32 = SEEK_SET;
            timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Seek to a position. Returns the new absolute position.
proc loSize(lo: LargeObject; timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Return the total size of the Large Object in bytes.
proc loTell(lo: LargeObject; timeout: Duration = ZeroDuration): Future[int64] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgError, PgTypeError],
    tags: [RootEffect, TimeEffect], forbids: [].}
Return the current read/write position.
proc loTruncate(lo: LargeObject; length: int64; timeout: Duration = ZeroDuration): Future[
    void] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                        PgConnectionError, SslError, KeyError,
                                        ProtocolError, PgTimeoutError,
                                        AsyncTimeoutError, PgError, PgTypeError],
            tags: [RootEffect, TimeEffect], forbids: [].}
Truncate the Large Object to length bytes.
proc loWrite(lo: LargeObject; data: seq[byte]; timeout: Duration = ZeroDuration): Future[
    int32] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError,
    PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError,
    AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect],
             forbids: [].}
Write data at the current position. Returns the number of bytes written.
proc loWriteAll(lo: LargeObject; data: seq[byte];
                chunkSize: int = loDefaultChunkSize;
                timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgError, PgTypeError,
                                CatchableError], tags: [RootEffect, TimeEffect],
    forbids: [].}
Write all of data, splitting into chunks.
proc loWriteStream(lo: LargeObject; callback: LoWriteCallback;
                   chunkSize: int = loDefaultChunkSize;
                   timeout: Duration = ZeroDuration): Future[void] {.
    ...stackTrace: false, raises: [Exception, ValueError, PgQueryError,
                                PgConnectionError, SslError, KeyError,
                                ProtocolError, PgTimeoutError,
                                AsyncTimeoutError, PgError, PgTypeError,
                                CatchableError], tags: [RootEffect, TimeEffect],
    forbids: [].}
Write to the Large Object by repeatedly calling callback until it returns an empty seq[byte].

Templates

template makeLoReadCallback(body: untyped): LoReadCallback
Create a LoReadCallback that works with both asyncdispatch and chronos. Inside body, the current chunk is available as data: seq[byte].
var chunks: seq[seq[byte]]
let cb = makeLoReadCallback:
  chunks.add(data)
template makeLoWriteCallback(body: untyped): LoWriteCallback

Create a LoWriteCallback that works with both asyncdispatch and chronos. body must evaluate to seq[byte]. Return an empty seq to signal completion.

With asyncdispatch, anonymous async procs cannot return non-void types, so this template wraps the body in manual Future construction.

var idx = 0
let chunks = @[data1, data2]
let cb = makeLoWriteCallback:
  if idx < chunks.len:
    let chunk = chunks[idx]
    inc idx
    chunk
  else:
    newSeq[byte]()
template withLargeObject(conn: PgConnection; lo: untyped; oidVal: Oid;
                         mode: int32; body: untyped)
Open a Large Object, execute body, then close it. Must be used inside withTransaction.