Source code for dran.storage.sqlite_repository

# =========================================================================== #
# File: sqlite_repository.py                                                  #
# Author: Pfesesani V. van Zyl                                                #
# Email: pfesi24@gmail.com                                                    #
# =========================================================================== #


# Library imports
# --------------------------------------------------------------------------- #
import sqlite3
from typing import Any, Mapping, Optional
from dran.storage.sqlite_schema import _quote_ident
from dran.storage.sqlite_types import blob_to_array, normalize_for_storage
# =========================================================================== #


[docs] def insert_dict( conn: sqlite3.Connection, table: str, item: Mapping[str, Any], ) -> int: """ Insert a dict into table and return inserted row id. """ keys = list(item.keys()) placeholders = ", ".join("?" for _ in keys) col_list = ", ".join(_quote_ident(k) for k in keys) vals = [normalize_for_storage(item[k]) for k in keys] # cur=conn.execute(f'PRAGMA table_info("{table}")') # existing_keys=[row[1] for row in cur.fetchall()] # print(existing_keys, keys,len(existing_keys),len(keys)) cursor = conn.execute( f'INSERT INTO "{table}" ({col_list}) VALUES ({placeholders});', vals, ) return int(cursor.lastrowid)
[docs] def fetch_row( conn: sqlite3.Connection, table: str, row_id: int, ) -> dict[str, Any]: """ Fetch a row and reconstruct arrays from BLOBs where possible. """ cursor = conn.execute(f'SELECT * FROM "{table}" WHERE id = ?;', (row_id,)) row = cursor.fetchone() if row is None: raise RuntimeError("Row not found") col_names = [d[0] for d in cursor.description] data = dict(zip(col_names, row)) for key, value in data.items(): if isinstance(value, bytes): try: data[key] = blob_to_array(value) except Exception: pass return data
[docs] def get_existing_keys( conn: sqlite3.Connection, table: str, key: str, ) -> set[Any]: """ Load all existing values of a key into a set for fast membership checks. """ cursor = conn.execute(f'SELECT "{key}" FROM "{table}"') return {row[0] for row in cursor.fetchall()}
[docs] def save_record( conn: sqlite3.Connection, table: str, item: Mapping[str, Any], *, create_table_fn: Optional[callable] = None, ) -> int: """ Insert one record. Returns row id. create_table_fn is optional and lets callers ensure schema before insert. """ if create_table_fn is not None: create_table_fn(conn, table, item) return insert_dict(conn, table, item)