Source code for dran.storage.db_introspection

# =========================================================================== #
# File: db_introspection.py                                                   #
# Author: Pfesesani V. van Zyl                                                #
# Email: pfesi24@gmail.com                                                    #
# =========================================================================== #


# Library imports
# --------------------------------------------------------------------------- #
import re
import sqlite3
from pathlib import Path
from typing import Iterable
import logging
from typing import Any, Dict, List
from dran.utils.fs import ProjectPaths
from dran.storage.sqlite_schema import ensure_table_from_dict
from dran.storage.sqlite_connection import get_connection
from dran.storage.sqlite_repository import insert_dict
# =========================================================================== #


# _TABLE_FREQ_SUFFIX = re.compile(r"^(?P<prefix>.+)_(?P<freq>\d+)$")
_PROCESSED_FILES_TABLE = "processed_files"

# def get_table_names(database_path: str) -> List[str]:
#     """
#     Return a sorted list of user-defined table names
#     from the given SQLite database file.

#     :param database_path: Path to the SQLite .db file
#     :return: List of table names
#     """
#     query: str = """
#         SELECT name
#         FROM sqlite_master
#         WHERE type = 'table'
#         AND name NOT LIKE 'sqlite_%'
#         ORDER BY name;
#     """

#     with sqlite3.connect(database_path) as connection:
#         cursor = connection.cursor()
#         cursor.execute(query)
#         rows = cursor.fetchall()

#     # Extract table names from returned tuples
#     return [row[0] for row in rows]

# def list_tables_in_frequency_range(
#     db_path: Path,
#     start_mhz: int,
#     end_mhz: int,
# ) -> list[tuple[str, int]]:
#     """
#     Return (table_name, freq_mhz) for tables named <prefix>_<freq_mhz>
#     where freq_mhz is within [start_mhz, end_mhz].
#     """
#     conn = sqlite3.connect(str(db_path))
#     try:
#         return list_tables_in_frequency_range_conn(conn, start_mhz, end_mhz)
#     finally:
#         conn.close()


# def list_tables_in_frequency_range_conn(
#     conn: sqlite3.Connection,
#     start_mhz: int,
#     end_mhz: int,
# ) -> list[tuple[str, int]]:
#     rows = conn.execute(
#         "SELECT name FROM sqlite_schema WHERE type='table';"
#     ).fetchall()

#     matches: list[tuple[str, int]] = []
#     for (name,) in rows:
#         m = _TABLE_FREQ_SUFFIX.match(name)
#         if not m:
#             continue

#         freq_mhz = int(m.group("freq"))
#         if start_mhz <= freq_mhz <= end_mhz:
#             matches.append((name, freq_mhz))

#     matches.sort(key=lambda x: (x[1], x[0]))
#     return matches


# def fetch_existing_file_basenames_and_paths(
#         path_to_db:Path,
#         tables: Iterable[str],
#         log,
#         filepath_field: str = "FILEPATH",
# ) -> tuple[set[str], dict[str, str]]:
#     """
#     Return:
#     - basenames: set of file basenames found in FILEPATH columns
#     - basename_to_path: dict mapping basename -> first seen stored path

#     Notes:
#     - Identifier quoting is used for table and column names.
#     - NULL or empty values are skipped.
#     """
#     if filepath_field != "FILEPATH":
#         raise ValueError("Only filepath_field='FILEPATH' is supported.")

#     basenames: set[str] = set()
    
#     conn = get_connection(path_to_db, log)
#     try:
#         for table in tables:
#             table=table.upper()
#             try:
#                 cursor = conn.execute(
#                     f'SELECT "{filepath_field}" FROM "{table}";'
#                 )
#             except sqlite3.Error:
#                 continue

#             for (filepath,) in cursor:
#                 if not filepath:
#                     continue

#                 basename = Path(filepath).name
#                 basenames.add(basename)

#                 # if basename not in basename_to_path:
#                 #     basename_to_path[basename] = str(filepath)
#     finally:
#         conn.close()

#     return basenames#, basename_to_path


[docs] def record_exists( conn: sqlite3.Connection, table: str, key_field: str, key_value: Any ) -> bool: """ Fast existence check using an indexed lookup (UNIQUE field). Returns True if a record exists, else False. """ table=table.upper() try: cursor = conn.execute( f'SELECT 1 FROM "{table}" WHERE "{key_field}" = ? LIMIT 1;', (key_value,), ) # print('oi') return cursor.fetchone() except: print('>>>> No record of this observation') # check if table exists # names=get_table_names query: str = """ SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name; """ cursor= conn.execute(query) print(f"Table : {table} not found") return None
def _ensure_and_insert( table_name: str, row: Dict[str, Any], paths:ProjectPaths, log: logging.Logger, ) -> None: """Ensure a SQLite table exists and insert a row safely. Opens a database connection, creates the table schema from the row if needed, inserts the row, logs duplicates on integrity errors, and always closes the connection. """ # print('****',paths, paths.db_path,) table_name=table_name.upper() conn = get_connection(paths.db_path, log) ensure_table_from_dict(conn, table_name, row) row_id = insert_dict(conn, table_name, row) try: row_id = insert_dict(conn, table_name, row) log.debug("Inserted id=%s into %s", row_id, table_name) except sqlite3.IntegrityError: log.debug("Row already exists in %s. Skipping.", table_name) finally: conn.close()
[docs] def ensure_processed_files_table(conn: sqlite3.Connection) -> None: """ Ensure a small registry table exists for processed files. This enables fast de-duplication across path changes and symlinks. """ conn.execute( f""" CREATE TABLE IF NOT EXISTS "{_PROCESSED_FILES_TABLE}" ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_hash TEXT UNIQUE, file_size INTEGER, file_mtime REAL, filepath TEXT, filename TEXT, created_at TEXT DEFAULT (datetime('now')) ); """ ) conn.execute( f'CREATE INDEX IF NOT EXISTS idx_processed_files_size ON "{_PROCESSED_FILES_TABLE}" (file_size);' ) conn.execute( f'CREATE INDEX IF NOT EXISTS idx_processed_files_path ON "{_PROCESSED_FILES_TABLE}" (filepath);' )
[docs] def processed_file_exists_by_path(conn: sqlite3.Connection, filepath: str) -> bool: ensure_processed_files_table(conn) cursor = conn.execute( f'SELECT 1 FROM "{_PROCESSED_FILES_TABLE}" WHERE filepath = ? LIMIT 1;', (filepath,), ) return cursor.fetchone() is not None
[docs] def processed_file_hashes_by_size(conn: sqlite3.Connection, file_size: int) -> List[str]: ensure_processed_files_table(conn) cursor = conn.execute( f'SELECT file_hash FROM "{_PROCESSED_FILES_TABLE}" WHERE file_size = ?;', (file_size,), ) return [row[0] for row in cursor.fetchall() if row and row[0]]
[docs] def insert_processed_file( conn: sqlite3.Connection, *, file_hash: str, file_size: int, file_mtime: float, filepath: str, filename: str ) -> None: ensure_processed_files_table(conn) conn.execute( f""" INSERT OR IGNORE INTO "{_PROCESSED_FILES_TABLE}" (file_hash, file_size, file_mtime, filepath, filename) VALUES (?, ?, ?, ?, ?); """, (file_hash, file_size, file_mtime, filepath, filename), )