async_postgres/pg_protocol

Search:
Group by:

Types

BackendMessage = object
  case kind*: BackendMessageKind
  of bmkAuthenticationOk, bmkAuthenticationCleartextPassword:
    nil
  of bmkAuthenticationMD5Password:
    md5Salt*: array[4, byte]
  of bmkAuthenticationSASL:
    saslMechanisms*: seq[string]
  of bmkAuthenticationSASLContinue:
    saslData*: seq[byte]
  of bmkAuthenticationSASLFinal:
    saslFinalData*: seq[byte]
  of bmkBackendKeyData:
    backendPid*: int32
    backendSecretKey*: int32
  of bmkBindComplete, bmkCloseComplete, bmkNoData, bmkEmptyQueryResponse,
     bmkParseComplete, bmkPortalSuspended, bmkCopyDone:
    nil
  of bmkCommandComplete:
    commandTag*: string
  of bmkCopyInResponse, bmkCopyOutResponse, bmkCopyBothResponse:
    copyFormat*: CopyFormat
    copyColumnFormats*: seq[int16]
  of bmkCopyData:
    copyData*: seq[byte]
  of bmkDataRow:
    columns*: seq[Option[seq[byte]]]
  of bmkErrorResponse:
    errorFields*: seq[ErrorField]
  of bmkNoticeResponse:
    noticeFields*: seq[ErrorField]
  of bmkNotificationResponse:
    notifPid*: int32
    notifChannel*: string
    notifPayload*: string
  of bmkParameterDescription:
    paramTypeOids*: seq[int32]
  of bmkParameterStatus:
    paramName*: string
    paramValue*: string
  of bmkReadyForQuery:
    txStatus*: TransactionStatus
  of bmkRowDescription:
    fields*: seq[FieldDescription]
Parsed message from the PostgreSQL backend. Variant type keyed by kind.
BackendMessageKind = enum
  bmkAuthenticationOk, bmkAuthenticationCleartextPassword,
  bmkAuthenticationMD5Password, bmkAuthenticationSASL,
  bmkAuthenticationSASLContinue, bmkAuthenticationSASLFinal, bmkBackendKeyData,
  bmkBindComplete, bmkCloseComplete, bmkCommandComplete, bmkCopyInResponse,
  bmkCopyOutResponse, bmkCopyBothResponse, bmkCopyData, bmkCopyDone, bmkDataRow,
  bmkEmptyQueryResponse, bmkErrorResponse, bmkNoData, bmkNoticeResponse,
  bmkNotificationResponse, bmkParameterDescription, bmkParameterStatus,
  bmkParseComplete, bmkPortalSuspended, bmkReadyForQuery, bmkRowDescription
Message types sent from server to client.
CopyFormat = enum
  cfText = 0, cfBinary = 1
Wire format for COPY operations.
DescribeKind = enum
  dkPortal = 80, dkStatement = 83
Target of a Describe or Close message.
ErrorField = object
  code*: char
  value*: string
A single field from an ErrorResponse or NoticeResponse message.
FieldDescription = object
  name*: string
  tableOid*: int32
  columnAttrNum*: int16
  typeOid*: int32
  typeSize*: int16
  typeMod*: int32
  formatCode*: int16
Column metadata from a RowDescription message.
FrontendMessageKind = enum
  fmkStartup, fmkSSLRequest, fmkPassword, fmkSASLInitialResponse,
  fmkSASLResponse, fmkQuery, fmkParse, fmkBind, fmkDescribe, fmkExecute,
  fmkClose, fmkSync, fmkFlush, fmkTerminate, fmkCopyData, fmkCopyDone,
  fmkCopyFail
Message types sent from client to server.
ParseResult = object
  state*: ParseState
  message*: BackendMessage
Result of parsing bytes from the receive buffer.
ParseState = enum
  psComplete, psIncomplete, psDataRow ## DataRow parsed in-place into RowData; no BackendMessage constructed
ProtocolError = object of CatchableError
Raised on PostgreSQL wire protocol violations.
Row = object
  data*: RowData
  rowIdx*: int32
Lightweight view into a single row within a RowData buffer.
RowData = ref object
  buf*: seq[byte]            ## All column data concatenated
  cellIndex*: seq[int32]     ## [off, len, off, len, ...] per cell; len=-1 = NULL
  numCols*: int16
  colFormats*: seq[int16]    ## Per-column format codes (0=text, 1=binary)
  colTypeOids*: seq[int32]   ## Per-column type OIDs for binary→text conversion
  fields*: seq[FieldDescription] ## Column metadata for name-based access
  colMap*: Table[string, int] ## Cached name→index mapping (lazily built)
Flat buffer holding all row data for a query result.
TransactionStatus = enum
  tsInFailedTransaction = 69, tsIdle = 73, tsInTransaction = 84
Server transaction state reported in ReadyForQuery.

Consts

BinarySafeOids = [16'i32, 17, 20, 21, 23, 25, 600, 601, 602, 603, 604, 628, 700,
                  701, 718, 1043, 1560, 1561, 1562, 1563, 3904, 3905, 3906,
                  3907, 3908, 3909, 3910, 3911, 3912, 3913, 3926, 3927, 4451,
                  4532, 4533, 4534, 4535, 4536]
Type OIDs treated as safe to request in binary format (see isBinarySafeOid and buildResultFormats).
copyDoneMsg = [99'u, 0'u8, 0'u8, 0'u8, 4'u8]
Pre-built CopyDone message bytes.
flushMsg = [72'u, 0'u8, 0'u8, 0'u8, 4'u8]
Pre-built Flush message bytes.
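pgCopyBinaryHeader: array[19, byte] = [80'u, 71'u, 67'u, 79'u, 80'u, 89'u, 10'u,
                                       0xFF'u8, 13'u, 10'u, 0x00'u8, 0x00'u8,
                                       0x00'u8, 0x00'u8, 0x00'u8, 0x00'u8,
                                       0x00'u8, 0x00'u8, 0x00'u8]
PGCOPY binary format header: the 11-byte signature "PGCOPY\n\xFF\r\n\0" followed by a zeroed 4-byte flags field and a zeroed 4-byte header extension length.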
pgCopyBinaryTrailer: array[2, byte] = [0xFF'u8, 0xFF'u8]
PGCOPY binary format trailer (int16(-1) sentinel).
syncMsg = [83'u, 0'u8, 0'u8, 0'u8, 4'u8]
Pre-built Sync message bytes.

Procs

proc addBind(buf: var seq[byte]; portalName: string; stmtName: string;
             paramFormats: openArray[int16];
             paramValues: openArray[Option[seq[byte]]];
             resultFormats: openArray[int16] = []) {....raises: [], tags: [],
    forbids: [].}
Append a Bind message to the buffer (extended query protocol).
proc addClose(buf: var seq[byte]; kind: DescribeKind; name: string) {.
    ...raises: [], tags: [], forbids: [].}
Append a Close message to the buffer (portal or statement).
proc addCopyBinaryHeader(buf: var seq[byte]) {....raises: [], tags: [], forbids: [].}
Append the PostgreSQL binary COPY header (signature + flags + extension area).
proc addCopyBinaryTrailer(buf: var seq[byte]) {....raises: [], tags: [],
    forbids: [].}
Append the binary COPY trailer (int16 = -1).
proc addCopyDone(buf: var seq[byte]) {.inline, ...raises: [], tags: [], forbids: [].}
Append a CopyDone message to the buffer.
proc addCopyFieldBool(buf: var seq[byte]; val: bool) {....raises: [], tags: [],
    forbids: [].}
Append a boolean field in binary COPY format.
proc addCopyFieldFloat32(buf: var seq[byte]; val: float32) {....raises: [],
    tags: [], forbids: [].}
Append a float32 field in binary COPY format.
proc addCopyFieldFloat64(buf: var seq[byte]; val: float64) {....raises: [],
    tags: [], forbids: [].}
Append a float64 field in binary COPY format.
proc addCopyFieldInt16(buf: var seq[byte]; val: int16) {....raises: [], tags: [],
    forbids: [].}
Append an int16 field in binary COPY format.
proc addCopyFieldInt32(buf: var seq[byte]; val: int32) {....raises: [], tags: [],
    forbids: [].}
Append an int32 field in binary COPY format.
proc addCopyFieldInt64(buf: var seq[byte]; val: int64) {....raises: [], tags: [],
    forbids: [].}
Append an int64 field in binary COPY format.
proc addCopyFieldNull(buf: var seq[byte]) {....raises: [], tags: [], forbids: [].}
Append a NULL field in binary COPY format.
proc addCopyFieldString(buf: var seq[byte]; val: string) {....raises: [], tags: [],
    forbids: [].}
Append a string field in binary COPY format.
proc addCopyFieldText(buf: var seq[byte]; val: openArray[byte]) {....raises: [],
    tags: [], forbids: [].}
Append a raw byte field in binary COPY format.
proc addCopyTupleStart(buf: var seq[byte]; numCols: int16) {....raises: [],
    tags: [], forbids: [].}
Start a new tuple in binary COPY format with the given column count.
proc addCString(buf: var seq[byte]; s: string) {....raises: [], tags: [],
    forbids: [].}
Append a null-terminated C string to the buffer.
proc addDescribe(buf: var seq[byte]; kind: DescribeKind; name: string) {.
    ...raises: [], tags: [], forbids: [].}
Append a Describe message to the buffer (portal or statement).
proc addExecute(buf: var seq[byte]; portalName: string; maxRows: int32 = 0) {.
    ...raises: [], tags: [], forbids: [].}
Append an Execute message to the buffer. maxRows of 0 means unlimited.
proc addFlush(buf: var seq[byte]) {.inline, ...raises: [], tags: [], forbids: [].}
Append a Flush message to the buffer.
proc addInt16(buf: var seq[byte]; val: int16) {.inline, ...raises: [], tags: [],
    forbids: [].}
Append a 16-bit integer in big-endian format to the buffer.
proc addInt32(buf: var seq[byte]; val: int32) {.inline, ...raises: [], tags: [],
    forbids: [].}
Append a 32-bit integer in big-endian format to the buffer.
proc addInt64(buf: var seq[byte]; val: int64) {.inline, ...raises: [], tags: [],
    forbids: [].}
Append a 64-bit integer in big-endian format to the buffer.
proc addParse(buf: var seq[byte]; stmtName: string; sql: string;
              paramTypeOids: openArray[int32] = []) {....raises: [], tags: [],
    forbids: [].}
Append a Parse message to the buffer (extended query protocol).
proc addSync(buf: var seq[byte]) {.inline, ...raises: [], tags: [], forbids: [].}
Append a Sync message to the buffer.
proc buildResultFormats(fields: openArray[FieldDescription]): seq[int16] {.
    ...raises: [], tags: [], forbids: [].}
Build per-column binary format codes: 1 for known safe types, 0 for others.
proc buildResultFormats(fields: openArray[FieldDescription];
                        extraBinaryOids: openArray[int32]): seq[int16] {.
    ...raises: [], tags: [], forbids: [].}
Build per-column binary format codes with additional runtime-safe OIDs.
proc decodeCString(buf: openArray[byte]; offset: int): (string, int) {.
    ...raises: [ProtocolError], tags: [], forbids: [].}
Decode a null-terminated string at the given offset. Returns (string, bytes consumed).
proc decodeInt16(buf: openArray[byte]; offset: int): int16 {....raises: [],
    tags: [], forbids: [].}
Decode a 16-bit integer from big-endian bytes at the given offset.
proc decodeInt32(buf: openArray[byte]; offset: int): int32 {....raises: [],
    tags: [], forbids: [].}
Decode a 32-bit integer from big-endian bytes at the given offset.
proc decodeInt64(buf: openArray[byte]; offset: int): int64 {....raises: [],
    tags: [], forbids: [].}
Decode a 64-bit integer from big-endian bytes at the given offset.
proc encodeBind(portalName: string; stmtName: string;
                paramFormats: openArray[int16];
                paramValues: openArray[Option[seq[byte]]];
                resultFormats: openArray[int16] = []): seq[byte] {....raises: [],
    tags: [], forbids: [].}
Encode a standalone Bind message.
proc encodeCancelRequest(pid: int32; secretKey: int32): seq[byte] {....raises: [],
    tags: [], forbids: [].}
Encode a CancelRequest message to abort a running query.
proc encodeClose(kind: DescribeKind; name: string): seq[byte] {....raises: [],
    tags: [], forbids: [].}
Encode a standalone Close message.
proc encodeCopyData(buf: var seq[byte]; data: openArray[byte]) {....raises: [],
    tags: [], forbids: [].}
Encode a CopyData message, appending to buf. Uses a single setLen for header + payload to minimize bounds checks.
proc encodeCopyDone(): seq[byte] {....raises: [], tags: [], forbids: [].}
Encode a standalone CopyDone message.
proc encodeCopyFail(errorMsg: string): seq[byte] {....raises: [], tags: [],
    forbids: [].}
Encode a CopyFail message to abort a COPY operation with an error.
proc encodeDescribe(kind: DescribeKind; name: string): seq[byte] {....raises: [],
    tags: [], forbids: [].}
Encode a standalone Describe message.
proc encodeExecute(portalName: string; maxRows: int32 = 0): seq[byte] {.
    ...raises: [], tags: [], forbids: [].}
Encode a standalone Execute message.
proc encodeFlush(): seq[byte] {....raises: [], tags: [], forbids: [].}
Encode a standalone Flush message.
proc encodeInt16(val: int16): array[2, byte] {....raises: [], tags: [], forbids: [].}
Encode a 16-bit integer as big-endian bytes.
proc encodeInt32(val: int32): array[4, byte] {....raises: [], tags: [], forbids: [].}
Encode a 32-bit integer as big-endian bytes.
proc encodeParse(stmtName: string; sql: string;
                 paramTypeOids: openArray[int32] = []): seq[byte] {....raises: [],
    tags: [], forbids: [].}
Encode a standalone Parse message.
proc encodePassword(password: string): seq[byte] {....raises: [], tags: [],
    forbids: [].}
Encode a PasswordMessage for cleartext or MD5 authentication.
proc encodeQuery(sql: string): seq[byte] {....raises: [], tags: [], forbids: [].}
Encode a simple Query message.
proc encodeSASLInitialResponse(mechanism: string; data: seq[byte]): seq[byte] {.
    ...raises: [], tags: [], forbids: [].}
Encode a SASLInitialResponse message with the chosen mechanism and client-first data.
proc encodeSASLResponse(data: seq[byte]): seq[byte] {....raises: [], tags: [],
    forbids: [].}
Encode a SASLResponse message with client-final data.
proc encodeSSLRequest(): seq[byte] {....raises: [], tags: [], forbids: [].}
Encode an SSLRequest message (magic number 80877103).
proc encodeStandbyStatusUpdate(receiveLsn, flushLsn, applyLsn, sendTime: int64;
                               reply: byte): seq[byte] {....raises: [], tags: [],
    forbids: [].}
Encode a Standby Status Update as a CopyData message. The inner format is: byte 'r' + receiveLsn(8) + flushLsn(8) + applyLsn(8) + sendTime(8) + reply(1), i.e. a 34-byte payload.
proc encodeStartup(user: string; database: string;
                   extraParams: openArray[(string, string)] = []): seq[byte] {.
    ...raises: [], tags: [], forbids: [].}
Encode a StartupMessage (protocol v3.0) with user, database, and extra parameters.
proc encodeSync(): seq[byte] {....raises: [], tags: [], forbids: [].}
Encode a standalone Sync message.
proc encodeTerminate(): seq[byte] {....raises: [], tags: [], forbids: [].}
Encode a Terminate message to close the connection.
proc formatError(fields: seq[ErrorField]): string {....raises: [], tags: [],
    forbids: [].}
Format error fields into a human-readable error message with severity, SQLSTATE, detail, and hint.
proc getErrorField(fields: seq[ErrorField]; code: char): string {....raises: [],
    tags: [], forbids: [].}
Get the value of an error field by its single-char code (e.g. 'M' for message).
func isBinarySafeOid(oid: int32): bool {....raises: [], tags: [], forbids: [].}
Check if a type OID can be safely requested in binary format.
proc newRowData(numCols: int16; colFormats: seq[int16] = @[];
                colTypeOids: seq[int32] = @[]): RowData {....raises: [], tags: [],
    forbids: [].}
Create a new RowData flat buffer for accumulating DataRow messages.
proc parseBackendMessage(buf: openArray[byte]; consumed: var int;
                         rowData: RowData = nil): ParseResult {.
    ...raises: [ProtocolError], tags: [], forbids: [].}
Parse a single backend message from buf. On success, sets consumed to the number of bytes used. The caller is responsible for discarding those bytes from the buffer.
proc parseDataRowInto(body: openArray[byte]; rd: RowData) {.
    ...raises: [ProtocolError], tags: [], forbids: [].}
Parse a DataRow message body directly into a RowData flat buffer. Column data is appended to rd.buf and (offset, length) pairs to rd.cellIndex. Uses a single bulk copyMem for the entire row payload, then walks the copied buffer to build cellIndex entries.
proc patchLen(buf: var seq[byte]; offset: int = 1) {....raises: [], tags: [],
    forbids: [].}
Patch the length placeholder at offset with buf.len minus the tag byte.
proc patchMsgLen(buf: var seq[byte]; msgStart: int) {.inline, ...raises: [],
    tags: [], forbids: [].}
Patch the length field of a message starting at msgStart. Length = total message size minus the type byte.
proc reuseRowData(rd: RowData; numCols: int16): RowData {....raises: [], tags: [],
    forbids: [].}
Create a new RowData that takes over the old buffer's capacity via move, without format metadata.
proc reuseRowData(rd: RowData; numCols: int16; colFormats: sink seq[int16];
                  colTypeOids: sink seq[int32]): RowData {....raises: [], tags: [],
    forbids: [].}
Create a new RowData that takes over the old buffer's capacity via move. The old RowData (and any QueryResult still referencing it) is left intact.