PostgreSQL Large Object API
Provides an async interface to PostgreSQL's Large Object facility for storing and streaming binary data larger than what fits comfortably in a bytea column.
All Large Object operations must be performed inside a transaction (use withTransaction).
Example
conn.withTransaction:
  let oid = await conn.loCreate()
  conn.withLargeObject(lo, oid, INV_READWRITE):
    let written = await lo.loWrite(data)
    discard await lo.loSeek(0, SEEK_SET)
    let readBack = await lo.loReadAll()
Types
LargeObject = object conn*: PgConnection fd*: int32 oid*: Oid
LoReadCallback = proc (data: seq[byte]): Future[void] {....gcsafe.}
LoWriteCallback = proc (): Future[seq[byte]] {....gcsafe.}
Oid = uint32
Consts
INV_READ = 0x00040000'i32
INV_READWRITE = 393216'i32
- 0x00060000, i.e. INV_READ or INV_WRITE
INV_WRITE = 0x00020000'i32
loDefaultChunkSize = 262144
- 256 KiB (262144 bytes)
SEEK_CUR = 1'i32
SEEK_END = 2'i32
SEEK_SET = 0'i32
Procs
proc loClose(lo: LargeObject; timeout: Duration = ZeroDuration): Future[void] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Close an open Large Object handle.
proc loCreate(conn: PgConnection; requestedOid: Oid = 0; timeout: Duration = ZeroDuration): Future[Oid] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Create a new Large Object, returning its OID. Pass requestedOid = 0 to let the server assign an OID.
proc loExport(conn: PgConnection; oid: Oid; filename: string; timeout: Duration = ZeroDuration): Future[void] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Export a Large Object to a server-side file.
proc loImport(conn: PgConnection; filename: string; timeout: Duration = ZeroDuration): Future[Oid] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Import a server-side file into a new Large Object, returning its OID.
proc loOpen(conn: PgConnection; oid: Oid; mode: int32 = INV_READWRITE; timeout: Duration = ZeroDuration): Future[LargeObject] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Open a Large Object for reading/writing. Returns a LargeObject handle.
proc loRead(lo: LargeObject; length: int32; timeout: Duration = ZeroDuration): Future[ seq[byte]] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Read up to length bytes from the current position. Returns the bytes read (may be fewer than length at EOF).
proc loReadAll(lo: LargeObject; chunkSize: int32 = loDefaultChunkSize; timeout: Duration = ZeroDuration): Future[seq[byte]] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Read all remaining data of the Large Object, from the current position to EOF.
proc loReadStream(lo: LargeObject; callback: LoReadCallback; chunkSize: int32 = loDefaultChunkSize; timeout: Duration = ZeroDuration): Future[void] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Read the Large Object in chunks, calling callback for each chunk.
proc loSeek(lo: LargeObject; offset: int64; whence: int32 = SEEK_SET; timeout: Duration = ZeroDuration): Future[int64] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Seek to a position. Returns the new absolute position.
proc loSize(lo: LargeObject; timeout: Duration = ZeroDuration): Future[int64] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Return the total size of the Large Object in bytes.
proc loTell(lo: LargeObject; timeout: Duration = ZeroDuration): Future[int64] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Return the current read/write position.
proc loTruncate(lo: LargeObject; length: int64; timeout: Duration = ZeroDuration): Future[ void] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Truncate the Large Object to length bytes.
proc loUnlink(conn: PgConnection; oid: Oid; timeout: Duration = ZeroDuration): Future[ void] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Delete a Large Object.
proc loWrite(lo: LargeObject; data: seq[byte]; timeout: Duration = ZeroDuration): Future[ int32] {....stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError], tags: [RootEffect, TimeEffect], forbids: [].}
- Write data at the current position. Returns the number of bytes written.
proc loWriteAll(lo: LargeObject; data: seq[byte]; chunkSize: int = loDefaultChunkSize; timeout: Duration = ZeroDuration): Future[void] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError, CatchableError], tags: [RootEffect, TimeEffect], forbids: [].}
- Write all of data, splitting it into chunks.
proc loWriteStream(lo: LargeObject; callback: LoWriteCallback; chunkSize: int = loDefaultChunkSize; timeout: Duration = ZeroDuration): Future[void] {. ...stackTrace: false, raises: [Exception, ValueError, PgQueryError, PgConnectionError, SslError, KeyError, ProtocolError, PgTimeoutError, AsyncTimeoutError, PgError, PgTypeError, CatchableError], tags: [RootEffect, TimeEffect], forbids: [].}
- Write to the Large Object by repeatedly calling callback until it returns an empty seq[byte].
Templates
template makeLoReadCallback(body: untyped): LoReadCallback
- Create a LoReadCallback that works with both asyncdispatch and chronos. Inside body, the current chunk is available as data: seq[byte].
var chunks: seq[seq[byte]]
let cb = makeLoReadCallback:
  chunks.add(data)
template makeLoWriteCallback(body: untyped): LoWriteCallback
- Create a LoWriteCallback that works with both asyncdispatch and chronos. body must evaluate to seq[byte]. Return an empty seq to signal completion.
With asyncdispatch, anonymous async procs cannot return non-void types, so this template wraps the body in manual Future construction.
var idx = 0
let chunks = @[data1, data2]
let cb = makeLoWriteCallback:
  if idx < chunks.len:
    let chunk = chunks[idx]
    inc idx
    chunk
  else:
    newSeq[byte]()
template withLargeObject(conn: PgConnection; lo: untyped; oidVal: Oid; mode: int32; body: untyped)
- Open a Large Object, execute body, then close it. Must be used inside withTransaction.