#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Timestamp: "2025-07-16 14:00:04 (ywatanabe)"
# File: /ssh:sp:/home/ywatanabe/proj/scitex_repo/src/scitex/db/_sqlite3/_delete_duplicates.py
# ----------------------------------------
import os
__FILE__ = __file__
__DIR__ = os.path.dirname(__FILE__)
# ----------------------------------------
# Time-stamp: "2024-11-11 14:16:58 (ywatanabe)"
import sqlite3
from typing import List, Optional, Tuple, Union
import pandas as pd
"""
Functionality:
- Deletes duplicate entries from an SQLite database table
Input:
- SQLite database file path, table name, columns to consider for duplicates
Output:
- Updated SQLite database with duplicates removed
Prerequisites:
- sqlite3, pandas, tqdm, scitex
"""
def _sort_db(cursor: sqlite3.Cursor, table_name: str, columns: List[str]) -> None:
    """
    Rewrite a table so that its rows are stored in sorted order.

    The table is copied into ``{table_name}_temp`` with an ``ORDER BY``
    clause, the original table is dropped, and the copy is renamed back to
    ``table_name``.

    Parameters
    ----------
    cursor : sqlite3.Cursor
        The cursor object for executing SQL commands. Nothing is committed
        here; committing is left to the caller.
    table_name : str
        The name of the table to be sorted. It is interpolated into the SQL
        text as-is, so it must be a trusted identifier.
    columns : List[str]
        The list of column names to sort by, in order of priority. A single
        column name given as a plain string is accepted as well. An empty
        list leaves the table untouched.

    Notes
    -----
    The copy is created with ``CREATE TABLE ... AS SELECT``. Per the SQLite
    documentation such a table has no PRIMARY KEY and no constraints, so
    only column names and data survive the rewrite.

    Example
    -------
    >>> conn = sqlite3.connect('example.db')
    >>> cursor = conn.cursor()
    >>> _sort_db(cursor, 'my_table', ['column1', 'column2'])
    >>> conn.commit()
    >>> conn.close()
    """
    # A bare string would otherwise be joined character by character.
    if isinstance(columns, str):
        columns = [columns]

    # "ORDER BY" with no terms is a syntax error; there is nothing to sort by.
    if not columns:
        return

    columns_str = ", ".join(columns)
    temp_table = f"{table_name}_temp"

    # A temp table left behind by an interrupted run would make CREATE fail.
    cursor.execute(f"DROP TABLE IF EXISTS {temp_table}")
    cursor.execute(
        f"CREATE TABLE {temp_table} AS SELECT * FROM {table_name} ORDER BY {columns_str}"
    )
    cursor.execute(f"DROP TABLE {table_name}")
    cursor.execute(f"ALTER TABLE {temp_table} RENAME TO {table_name}")
# def _determine_columns(
# cursor: sqlite3.Cursor,
# table_name: str,
# columns: Union[str, List[str]],
# include_blob: bool,
# ) -> List[str]:
# cursor.execute(f"PRAGMA table_info({table_name})")
# table_info = cursor.fetchall()
# all_columns = [col[1] for col in table_info]
# column_types = {col[1]: col[2] for col in table_info}
# if columns == "all":
# columns = (
# all_columns
# if include_blob
# else [
# col
# for col in all_columns
# if column_types[col].lower() != "blob"
# ]
# )
# elif isinstance(columns, str):
# columns = [columns]
# columns_str = ", ".join(columns)
# print(f"Columns considered for duplicates: {columns_str}")
# return columns
def _determine_columns(
    cursor: sqlite3.Cursor,
    table_name: str,
    columns: Union[str, List[str]],
    include_blob: bool,
) -> List[str]:
    """
    Resolve which columns are compared when looking for duplicate rows.

    Parameters
    ----------
    cursor : sqlite3.Cursor
        Cursor used to read the table schema via ``PRAGMA table_info``.
    table_name : str
        Name of the table whose schema is inspected.
    columns : Union[str, List[str]]
        ``"all"`` selects columns automatically (see below); any other
        string is treated as a single column name; a list is returned
        unchanged.
    include_blob : bool
        When ``columns == "all"``, keep columns declared as ``blob``
        instead of dropping them.

    Returns
    -------
    List[str]
        The column names to compare. For ``"all"`` this is every column in
        schema order except primary-key columns, blob columns (unless
        ``include_blob``) and columns whose name ends in ``_at``.
    """
    # Field positions inside a PRAGMA table_info row:
    # (cid, name, type, notnull, dflt_value, pk).
    name_pos, type_pos, pk_pos = 1, 2, 5
    schema_rows = cursor.execute(f"PRAGMA table_info({table_name})").fetchall()

    if columns == "all":
        selected = []
        for row in schema_rows:
            # Primary keys are unique by definition, so including them would
            # stop any two rows from ever comparing equal.
            if row[pk_pos]:
                continue
            # Blob columns are skipped unless the caller opted in.
            if not include_blob and row[type_pos].lower() == "blob":
                continue
            # Timestamp-style columns (created_at, updated_at, ...) are skipped.
            if row[name_pos].endswith("_at"):
                continue
            selected.append(row[name_pos])
        columns = selected
    elif isinstance(columns, str):
        columns = [columns]

    joined = ", ".join(columns)
    print(f"Columns considered for duplicates: {joined}")
    return columns
def _fetch_as_df(
    cursor: sqlite3.Cursor, columns: List[str], table_name: str
) -> pd.DataFrame:
    """
    Load the selected columns of a table into a DataFrame.

    Parameters
    ----------
    cursor : sqlite3.Cursor
        Cursor used to run the ``SELECT``.
    columns : List[str]
        Column names to read; they also become the DataFrame's column labels.
    table_name : str
        Name of the table to read from.

    Returns
    -------
    pd.DataFrame
        One row per table row, holding the requested columns in the given
        order.
    """
    print("\nFetching all database entries...")
    select_sql = "SELECT " + ", ".join(columns) + f" FROM {table_name}"
    # The whole result set is materialised in memory before building the frame.
    rows = cursor.execute(select_sql).fetchall()
    return pd.DataFrame(rows, columns=columns)
def _find_duplicated(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return the rows of ``df`` that repeat an earlier row.

    The first occurrence of each distinct row is treated as the original;
    every later occurrence is reported as a duplicate. A short summary
    (duplication rate plus the head of both frames) is printed.

    Parameters
    ----------
    df : pd.DataFrame
        Table contents to inspect.

    Returns
    -------
    pd.DataFrame
        A copy of the duplicated rows, keeping their original index labels.
    """
    df_duplicated = df[df.duplicated(keep="first")].copy()

    # The rate is expressed relative to the number of rows that are kept.
    # That count is zero only for an empty frame, where 0% is reported
    # instead of dividing by zero.
    n_kept = len(df) - len(df_duplicated)
    duplication_rate = len(df_duplicated) / n_kept if n_kept else 0.0

    print(f"\n{100 * duplication_rate:.2f}% of data was duplicated. Cleaning up...")
    print(f"\nOriginal entries:\n{df.head()}")
    print(f"\nDuplicated entries:\n{df_duplicated.head()}")
    return df_duplicated
def verify_duplicated_index(
    cursor: sqlite3.Cursor,
    duplicated_row: pd.Series,
    table_name: str,
    dry_run: bool,
) -> Tuple[str, bool]:
    """
    Check that the row about to be deleted is actually present in the table.

    Parameters
    ----------
    cursor : sqlite3.Cursor
        Cursor used to run the lookup.
    duplicated_row : pd.Series
        The row to look for; its index supplies the column names and its
        values are bound as query parameters.
    table_name : str
        Name of the table to search.
    dry_run : bool
        When true, print the expected values, the matching rows and the
        verification outcome.

    Returns
    -------
    Tuple[str, bool]
        The ``SELECT`` statement that was executed, and whether it matched
        at least one row.

    Notes
    -----
    Columns are compared with ``=``, so a NULL value in ``duplicated_row``
    never matches and such a row is reported as not verified.
    """
    bound_values = tuple(duplicated_row)
    col_names = list(duplicated_row.index)
    select_list = ", ".join(col_names)
    predicate = " AND ".join(f"{name} = ?" for name in col_names)

    select_query = (
        f"\nSELECT {select_list}\n"
        f"FROM {table_name}\n"
        f"WHERE {predicate}\n"
    )
    matches = cursor.execute(select_query, bound_values).fetchall()
    is_verified = bool(matches)

    if dry_run:
        outcome = "succeeded" if is_verified else "failed"
        print(f"Expected duplicate entry: {bound_values}")
        print(f"Found entries: {matches}")
        print(f"Verification {outcome}")

    return select_query, is_verified
def _delete_entry(
    cursor: sqlite3.Cursor,
    duplicated_row: pd.Series,
    table_name: str,
    dry_run: bool = True,
) -> None:
    """
    Delete a single table row matching ``duplicated_row``.

    The row is first looked up with ``verify_duplicated_index``; if nothing
    matches, the entry is skipped. In dry-run mode the deletion is only
    announced.

    Parameters
    ----------
    cursor : sqlite3.Cursor
        Cursor used for the lookup and the delete. Nothing is committed here.
    duplicated_row : pd.Series
        The row to delete; its index supplies the column names and its
        values are bound as query parameters.
    table_name : str
        Name of the table to delete from.
    dry_run : bool
        When true (the default), report what would be deleted without
        touching the table.
    """
    _, is_verified = verify_duplicated_index(
        cursor, duplicated_row, table_name, dry_run
    )

    if not is_verified:
        print(f"Skipping entry (not found or already deleted):\n{duplicated_row}")
        return

    if dry_run:
        print(f"[DRY RUN] Would delete entry:\n{duplicated_row}")
        return

    params = tuple(duplicated_row)
    match_clause = " AND ".join(f"{name} = ?" for name in duplicated_row.index)

    try:
        # The DELETE is built from scratch: rewriting the verification SELECT
        # into a DELETE would keep its column list, which SQLite rejects.
        cursor.execute(
            f"DELETE FROM {table_name} WHERE {match_clause} LIMIT 1", params
        )
    except sqlite3.OperationalError:
        # SQLite builds without DELETE ... LIMIT support reject the statement
        # above; pick one matching rowid instead so only one row is removed.
        cursor.execute(
            f"DELETE FROM {table_name} WHERE rowid = ("
            f"SELECT rowid FROM {table_name} WHERE {match_clause} "
            f"LIMIT 1)",
            params,
        )
    print(f"Deleted entry:\n{duplicated_row}")
[docs]
def delete_sqlite3_duplicates(
    lpath_db: str,
    table_name: str,
    columns: Union[str, List[str]] = "all",
    include_blob: bool = False,
    chunk_size: int = 10_000,
    dry_run: bool = True,
) -> Tuple[Optional[int], Optional[int]]:
    """
    Remove duplicate rows from a SQLite table, keeping the first of each group.

    Rows are grouped by the compared columns; within each group the row with
    the smallest ``rowid`` is kept and the others are deleted in place, so
    the table's schema (primary key, constraints, defaults, indexes) is left
    intact.

    Parameters
    ----------
    lpath_db : str
        Path to the SQLite database file.
    table_name : str
        Name of the table to deduplicate. It is interpolated into the SQL
        text as-is, so it must be a trusted identifier. The table must have
        a ``rowid``.
    columns : Union[str, List[str]]
        Columns that define a duplicate; resolved by ``_determine_columns``
        (``"all"`` by default).
    include_blob : bool
        Forwarded to ``_determine_columns``.
    chunk_size : int
        Not used by this function; kept so existing callers keep working.
    dry_run : bool
        When true (the default), only count and report duplicates; the
        database is not modified.

    Returns
    -------
    Tuple[Optional[int], Optional[int]]
        ``(total_rows, total_duplicates)`` on success. On any error the
        error is printed and ``(None, None)`` is returned.
    """
    # Bound before the try block so the finally clause is safe even when
    # sqlite3.connect itself fails.
    conn = None
    try:
        conn = sqlite3.connect(lpath_db)
        cursor = conn.cursor()

        # Vacuum the database to free up space
        if not dry_run:
            cursor.execute("VACUUM")
            conn.commit()

        columns = _determine_columns(cursor, table_name, columns, include_blob)
        if not columns:
            # An empty GROUP BY list would only surface as an opaque SQL
            # syntax error further down.
            raise ValueError("No columns left to compare for duplicates")
        columns_str = ", ".join(columns)

        # Get total row count
        total_rows = cursor.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
        print(f"Total rows in table: {total_rows}")

        # Count distinct groups with a read-only query so that a dry run
        # reports real numbers without touching the database.
        total_unique = cursor.execute(
            f"SELECT COUNT(*) FROM (SELECT 1 FROM {table_name} GROUP BY {columns_str})"
        ).fetchone()[0]
        total_duplicates = total_rows - total_unique

        if dry_run:
            print(f"[DRY RUN] Would execute deduplication based on: {columns_str}")
        else:
            # Keep the lowest rowid of every group and delete the rest.
            cursor.execute(
                f"DELETE FROM {table_name} WHERE rowid NOT IN ("
                f"SELECT MIN(rowid) FROM {table_name} GROUP BY {columns_str})"
            )
            # VACUUM cannot run inside a transaction, so commit the delete first.
            conn.commit()
            cursor.execute("VACUUM")
            conn.commit()

        print(f"Total rows processed: {total_rows}")
        print(f"Total unique rows: {total_unique}")
        print(f"Total duplicates removed: {total_duplicates}")
        return total_rows, total_duplicates
    except Exception as error:
        print(f"An error occurred: {error}")
        return None, None
    finally:
        if conn is not None:
            conn.close()
# EOF