Coverage for src / crump / database.py: 91%
586 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-10 18:00 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-10 18:00 +0000
1"""Database operations for crump."""
3from __future__ import annotations
5import logging
6import sqlite3
7from pathlib import Path
8from typing import Any, Protocol
10import psycopg
11from psycopg import sql
13from crump.config import ColumnMapping, CrumpJob, FailureMode, apply_row_transformations
14from crump.tabular_file import create_reader
def _detect_file_format(file_path: Path) -> Any:
    """Choose the tabular-reader format for *file_path* from its extension.

    There are only two possible results. Parquet files yield
    ``InputFileType.PARQUET``; every other type yields ``InputFileType.CSV``.
    This includes CDF, which is not directly syncable and must be extracted
    first. It also includes extensions ``InputFileType.from_path`` rejects
    with ``ValueError``.

    Args:
        file_path: Path to the file

    Returns:
        ``InputFileType.PARQUET`` or ``InputFileType.CSV``.
    """
    from crump.file_types import InputFileType

    try:
        detected = InputFileType.from_path(str(file_path))
    except ValueError:
        # Extension unknown to InputFileType: fall back to CSV.
        return InputFileType.CSV

    if detected == InputFileType.PARQUET:
        return InputFileType.PARQUET
    # Everything that is not Parquet (CDF included) is read as CSV.
    return InputFileType.CSV
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)
class DryRunSummary:
    """Record of what a dry-run sync would change, without applying anything.

    Every field starts out empty or zero.
    """

    def __init__(self) -> None:
        """Create an empty summary: no table name, no pending changes."""
        # Name of the target table being described.
        self.table_name: str = ""
        # Whether the target table already exists.
        self.table_exists: bool = False
        # Columns that would be added, as string pairs
        # (presumably (column name, SQL type); confirm with the dry-run caller).
        self.new_columns: list[tuple[str, str]] = []
        # Names of indexes that would be created.
        self.new_indexes: list[str] = []
        # Number of rows that would be synced.
        self.rows_to_sync: int = 0
        # Number of rows that would be deleted.
        self.rows_to_delete: int = 0
61class DatabaseBackend(Protocol):
62 """Protocol for database backend operations."""
64 def execute(self, query: str, params: tuple[Any, ...] | None = None) -> None:
65 """Execute a query."""
66 ...
68 def fetchall(self, query: str, params: tuple[Any, ...] | None = None) -> list[tuple[Any, ...]]:
69 """Fetch all results from a query."""
70 ...
72 def commit(self) -> None:
73 """Commit the current transaction."""
74 ...
76 def close(self) -> None:
77 """Close the connection."""
78 ...
80 def map_data_type(self, data_type: str | None) -> str:
81 """Map config data type to SQL database type."""
82 ...
84 def create_table_if_not_exists(
85 self, table_name: str, columns: dict[str, str], primary_keys: list[str] | None = None
86 ) -> None:
87 """Create table if it doesn't exist."""
88 ...
90 def get_existing_columns(self, table_name: str) -> set[str]:
91 """Get set of existing column names in a table."""
92 ...
94 def add_column(self, table_name: str, column_name: str, column_type: str) -> None:
95 """Add a new column to an existing table."""
96 ...
98 def upsert_row(
99 self, table_name: str, conflict_columns: list[str], row_data: dict[str, Any]
100 ) -> None:
101 """Upsert a row into the database."""
102 ...
104 def delete_stale_records_compound(
105 self,
106 table_name: str,
107 id_columns: list[str],
108 filter_columns: dict[str, str],
109 current_ids: set[tuple],
110 ) -> int:
111 """Delete records from database that aren't in current CSV using compound filter key."""
112 ...
114 def count_stale_records_compound(
115 self,
116 table_name: str,
117 id_columns: list[str],
118 filter_columns: dict[str, str],
119 current_ids: set[tuple],
120 ) -> int:
121 """Count records that would be deleted using compound filter key."""
122 ...
124 def get_existing_indexes(self, table_name: str) -> set[str]:
125 """Get set of existing index names for a table."""
126 ...
128 def create_index(
129 self, table_name: str, index_name: str, columns: list[tuple[str, str]]
130 ) -> None:
131 """Create an index on the specified columns.
133 Args:
134 table_name: Name of the table
135 index_name: Name of the index to create
136 columns: List of (column_name, order) tuples, e.g. [('email', 'ASC'), ('date', 'DESC')]
137 """
138 ...
140 def table_exists(self, table_name: str) -> bool:
141 """Check if a table exists in the database.
143 Args:
144 table_name: Name of the table to check
146 Returns:
147 True if table exists, False otherwise
148 """
149 ...
class PostgreSQLBackend:
    """PostgreSQL database backend (psycopg 3).

    Table, column and index names are composed with ``psycopg.sql.Identifier``,
    so they are always quoted. Values are bound as query parameters. Every
    writing or schema-changing method commits immediately.
    """

    # Lower-cased config data type -> PostgreSQL column type. Anything not listed
    # here, other than varchar(N), falls back to TEXT.
    _TYPE_MAPPING: dict[str, str] = {
        "integer": "INTEGER",
        "int": "INTEGER",
        "bigint": "BIGINT",
        "float": "DOUBLE PRECISION",
        "double": "DOUBLE PRECISION",
        "date": "DATE",
        "datetime": "TIMESTAMP",
        "timestamp": "TIMESTAMP",
        "text": "TEXT",
        "string": "TEXT",
    }

    def __init__(self, connection_string: str) -> None:
        """Open a PostgreSQL connection.

        Args:
            connection_string: Connection string/URI passed to ``psycopg.connect``.
        """
        self.conn = psycopg.connect(connection_string)

    def execute(self, query: str, params: tuple[Any, ...] | None = None) -> None:
        """Execute a query on a short-lived cursor.

        *params* is only passed to the driver when it is non-empty.
        """
        with self.conn.cursor() as cur:
            if params:
                cur.execute(query, params)
            else:
                cur.execute(query)

    def fetchall(self, query: str, params: tuple[Any, ...] | None = None) -> list[tuple[Any, ...]]:
        """Execute a query and return all result rows."""
        with self.conn.cursor() as cur:
            if params:
                cur.execute(query, params)
            else:
                cur.execute(query)
            return cur.fetchall()

    def commit(self) -> None:
        """Commit the current transaction."""
        self.conn.commit()

    def close(self) -> None:
        """Close the connection."""
        self.conn.close()

    def map_data_type(self, data_type: str | None) -> str:
        """Map a config data type to a PostgreSQL column type.

        ``None`` and unknown types map to ``TEXT``. ``varchar(N)`` is passed
        through upper-cased so that its length is kept.
        """
        if data_type is None:
            return "TEXT"

        data_type_lower = data_type.lower().strip()
        if data_type_lower.startswith("varchar"):
            return data_type.upper()  # VARCHAR(N)
        return self._TYPE_MAPPING.get(data_type_lower, "TEXT")

    def create_table_if_not_exists(
        self, table_name: str, columns: dict[str, str], primary_keys: list[str] | None = None
    ) -> None:
        """Create the table if it doesn't exist.

        Args:
            table_name: Table to create
            columns: Column name -> SQL type definition (inserted verbatim)
            primary_keys: Optional primary-key column names
        """
        column_defs = [
            sql.SQL("{} {}").format(sql.Identifier(col_name), sql.SQL(col_type))
            for col_name, col_type in columns.items()
        ]
        if primary_keys:
            column_defs.append(
                sql.SQL("PRIMARY KEY ({})").format(
                    sql.SQL(", ").join(sql.Identifier(pk) for pk in primary_keys)
                )
            )

        query = sql.SQL("CREATE TABLE IF NOT EXISTS {} ({})").format(
            sql.Identifier(table_name), sql.SQL(", ").join(column_defs)
        )
        self.execute(query.as_string(self.conn))
        self.commit()

    def get_existing_columns(self, table_name: str) -> set[str]:
        """Get the lower-cased column names of a table.

        Uses case-insensitive comparison to handle quoted identifiers that preserve case.
        NOTE(review): the lookup is not restricted to a schema, so a same-named
        table in another schema also contributes columns.
        """
        query = """
            SELECT column_name
            FROM information_schema.columns
            WHERE LOWER(table_name) = LOWER(%s)
        """
        results = self.fetchall(query, (table_name,))
        return {row[0].lower() for row in results}

    def add_column(self, table_name: str, column_name: str, column_type: str) -> None:
        """Add a new column to an existing table."""
        query = sql.SQL("ALTER TABLE {} ADD COLUMN {} {}").format(
            sql.Identifier(table_name),
            sql.Identifier(column_name),
            sql.SQL(column_type),
        )
        self.execute(query.as_string(self.conn))
        self.commit()

    def upsert_row(
        self, table_name: str, conflict_columns: list[str], row_data: dict[str, Any]
    ) -> None:
        """Insert a row, or update the non-conflict columns of the existing row.

        If every column in *row_data* is a conflict column, there is nothing to
        update. ``DO NOTHING`` is then used, because an empty ``SET`` list is a
        syntax error.
        """
        columns = list(row_data.keys())
        values = tuple(row_data.values())
        update_columns = [col for col in columns if col not in conflict_columns]

        if update_columns:
            conflict_action = sql.SQL("DO UPDATE SET {}").format(
                sql.SQL(", ").join(
                    sql.SQL("{} = EXCLUDED.{}").format(sql.Identifier(col), sql.Identifier(col))
                    for col in update_columns
                )
            )
        else:
            conflict_action = sql.SQL("DO NOTHING")

        insert_query = sql.SQL("INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) {}").format(
            sql.Identifier(table_name),
            sql.SQL(", ").join(sql.Identifier(col) for col in columns),
            sql.SQL(", ").join(sql.Placeholder() * len(values)),
            sql.SQL(", ").join(sql.Identifier(col) for col in conflict_columns),
            conflict_action,
        )
        self.execute(insert_query.as_string(self.conn), values)
        self.commit()

    def _build_stale_filter(
        self,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> tuple[sql.Composed, tuple[Any, ...]]:
        """Build the WHERE clause and parameters shared by stale-record count and delete.

        The clause reads ``f1 = %s AND ... AND <ids> NOT IN (...)``. Here
        ``<ids>`` is the single ID column, or a row constructor
        ``(id1, id2, ...)`` for compound keys. The filter values come first in
        the parameter tuple, followed by the flattened ID values.

        NOTE(review): if any ID value is None, ``NOT IN`` yields NULL for every
        row, so nothing is counted or deleted. Each ID value also uses one bind
        parameter, so very large ID sets may exceed the server's parameter limit.
        """
        filter_sql = sql.SQL(" AND ").join(
            sql.SQL("{} = %s").format(sql.Identifier(col)) for col in filter_columns
        )
        # Materialize once so placeholders and parameters follow the same order.
        ids = list(current_ids)

        if len(id_columns) == 1:
            id_values = [id_val[0] if isinstance(id_val, tuple) else id_val for id_val in ids]
            id_expr: sql.Composable = sql.Identifier(id_columns[0])
            id_list = sql.SQL(", ").join(sql.Placeholder() * len(id_values))
        else:
            # Compound key - use row value constructor
            id_expr = sql.SQL("({})").format(
                sql.SQL(", ").join(sql.Identifier(col) for col in id_columns)
            )
            row_placeholder = sql.SQL("({})").format(
                sql.SQL(", ").join(sql.Placeholder() * len(id_columns))
            )
            id_list = sql.SQL(", ").join(row_placeholder for _ in ids)
            id_values = [val for id_tuple in ids for val in id_tuple]

        where = sql.SQL("{} AND {} NOT IN ({})").format(filter_sql, id_expr, id_list)
        params = tuple(list(filter_columns.values()) + id_values)
        return where, params

    def count_stale_records_compound(
        self,
        table_name: str,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> int:
        """Count records that would be deleted using compound filter key.

        Args:
            table_name: Name of the table
            id_columns: List of ID column names (for compound keys)
            filter_columns: Dictionary of column_name -> value to filter by (compound key)
            current_ids: Set of ID tuples from the current CSV

        Returns:
            Count of records that would be deleted (0 if either
            *current_ids* or *filter_columns* is empty)
        """
        if not current_ids or not filter_columns:
            return 0

        where, params = self._build_stale_filter(id_columns, filter_columns, current_ids)
        count_query = sql.SQL("SELECT COUNT(*) FROM {} WHERE {}").format(
            sql.Identifier(table_name), where
        )
        count_result = self.fetchall(count_query.as_string(self.conn), params)
        return count_result[0][0] if count_result else 0

    def delete_stale_records_compound(
        self,
        table_name: str,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> int:
        """Delete records from database that aren't in current CSV using compound filter key.

        The matching rows are counted first and then deleted with the same
        WHERE clause, and the change is committed.

        Args:
            table_name: Name of the table
            id_columns: List of ID column names (for compound keys)
            filter_columns: Dictionary of column_name -> value to filter by (compound key)
            current_ids: Set of ID tuples from the current CSV

        Returns:
            Count of records deleted (as counted just before the DELETE)
        """
        if not current_ids or not filter_columns:
            return 0

        where, params = self._build_stale_filter(id_columns, filter_columns, current_ids)
        target = sql.Identifier(table_name)

        # Count first
        count_sql = sql.SQL("SELECT COUNT(*) FROM {} WHERE {}").format(target, where).as_string(
            self.conn
        )
        logger.debug("PostgreSQL count query: %s", count_sql)
        logger.debug("PostgreSQL count params: %s", params)
        count_result = self.fetchall(count_sql, params)
        deleted_count = count_result[0][0] if count_result else 0

        # Then delete
        delete_sql = sql.SQL("DELETE FROM {} WHERE {}").format(target, where).as_string(self.conn)
        logger.debug("PostgreSQL delete query: %s", delete_sql)
        logger.debug("PostgreSQL delete params: %s", params)
        logger.debug("PostgreSQL deleted count: %s", deleted_count)
        self.execute(delete_sql, params)
        self.commit()

        return deleted_count

    def get_existing_indexes(self, table_name: str) -> set[str]:
        """Get the lower-cased index names of a table.

        Uses case-insensitive comparison to handle quoted identifiers that preserve case.
        NOTE(review): not restricted to a schema (no ``schemaname`` filter).
        """
        query = """
            SELECT indexname
            FROM pg_indexes
            WHERE LOWER(tablename) = LOWER(%s)
        """
        results = self.fetchall(query, (table_name,))
        return {row[0].lower() for row in results}

    def create_index(
        self, table_name: str, index_name: str, columns: list[tuple[str, str]]
    ) -> None:
        """Create an index on the specified columns.

        Args:
            table_name: Name of the table
            index_name: Name of the index to create
            columns: ``(column_name, order)`` pairs; *order* is inserted verbatim
        """
        column_parts = [
            sql.SQL("{} {}").format(sql.Identifier(col_name), sql.SQL(order))
            for col_name, order in columns
        ]
        query = sql.SQL("CREATE INDEX IF NOT EXISTS {} ON {} ({})").format(
            sql.Identifier(index_name),
            sql.Identifier(table_name),
            sql.SQL(", ").join(column_parts),
        )
        self.execute(query.as_string(self.conn))
        self.commit()

    def table_exists(self, table_name: str) -> bool:
        """Check if a table exists in the database.

        Uses case-insensitive comparison to handle quoted identifiers that preserve case.

        Args:
            table_name: Name of the table to check

        Returns:
            True if table exists, False otherwise
        """
        query = """
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE LOWER(table_name) = LOWER(%s)
            )
        """
        result = self.fetchall(query, (table_name,))
        return result[0][0] if result else False
476class SQLiteBackend:
477 """SQLite database backend."""
479 def __init__(self, connection_string: str) -> None:
480 """Initialize SQLite connection."""
481 # Extract database path from connection string
482 # Supports: sqlite:///path/to/db.db or sqlite:///:memory:
483 if connection_string.startswith("sqlite:///"):
484 db_path = connection_string[10:] # Remove 'sqlite:///'
485 elif connection_string.startswith("sqlite://"):
486 db_path = connection_string[9:] # Remove 'sqlite://'
487 else:
488 db_path = connection_string
490 self.conn = sqlite3.connect(db_path)
491 self.cursor = self.conn.cursor()
493 def execute(self, query: str, params: tuple[Any, ...] | None = None) -> None:
494 """Execute a query."""
495 if params:
496 self.cursor.execute(query, params)
497 else:
498 self.cursor.execute(query)
500 def fetchall(self, query: str, params: tuple[Any, ...] | None = None) -> list[tuple[Any, ...]]:
501 """Fetch all results from a query."""
502 if params:
503 self.cursor.execute(query, params)
504 else:
505 self.cursor.execute(query)
506 return self.cursor.fetchall()
508 def commit(self) -> None:
509 """Commit the current transaction."""
510 self.conn.commit()
512 def close(self) -> None:
513 """Close the connection."""
514 self.cursor.close()
515 self.conn.close()
517 def map_data_type(self, data_type: str | None) -> str:
518 """Map config data type to SQLite type."""
519 if data_type is None:
520 return "TEXT"
522 data_type_lower = data_type.lower().strip()
524 # SQLite doesn't have VARCHAR, use TEXT
525 if data_type_lower.startswith("varchar"):
526 return "TEXT"
528 # Map other types
529 type_mapping = {
530 "integer": "INTEGER",
531 "int": "INTEGER",
532 "bigint": "INTEGER", # SQLite INTEGER is 8-byte signed, equivalent to BIGINT
533 "float": "REAL",
534 "double": "REAL",
535 "date": "TEXT",
536 "datetime": "TEXT",
537 "timestamp": "TEXT",
538 "text": "TEXT",
539 "string": "TEXT",
540 }
542 return type_mapping.get(data_type_lower, "TEXT")
544 def create_table_if_not_exists(
545 self, table_name: str, columns: dict[str, str], primary_keys: list[str] | None = None
546 ) -> None:
547 """Create table if it doesn't exist."""
548 column_defs_str = ", ".join(
549 f'"{col_name}" {col_type}' for col_name, col_type in columns.items()
550 )
552 # Add primary key constraint if specified
553 if primary_keys:
554 pk_columns = ", ".join(f'"{pk}"' for pk in primary_keys)
555 column_defs_str += f", PRIMARY KEY ({pk_columns})"
557 query = f'CREATE TABLE IF NOT EXISTS "{table_name}" ({column_defs_str})'
558 self.execute(query)
559 self.commit()
561 def get_existing_columns(self, table_name: str) -> set[str]:
562 """Get set of existing column names in a table."""
563 query = f'PRAGMA table_info("{table_name}")'
564 results = self.fetchall(query)
565 # PRAGMA table_info returns: (cid, name, type, notnull, dflt_value, pk)
566 return {row[1].lower() for row in results}
568 def add_column(self, table_name: str, column_name: str, column_type: str) -> None:
569 """Add a new column to an existing table."""
570 query = f'ALTER TABLE "{table_name}" ADD COLUMN "{column_name}" {column_type}'
571 self.execute(query)
572 self.commit()
574 def upsert_row(
575 self, table_name: str, conflict_columns: list[str], row_data: dict[str, Any]
576 ) -> None:
577 """Upsert a row into the database."""
578 columns = list(row_data.keys())
579 values = tuple(row_data.values())
581 columns_str = ", ".join(f'"{col}"' for col in columns)
582 placeholders = ", ".join("?" * len(values))
583 update_str = ", ".join(
584 f'"{col}" = excluded."{col}"' for col in columns if col not in conflict_columns
585 )
587 # SQLite ON CONFLICT clause with multiple columns
588 conflict_cols_str = ", ".join(f'"{col}"' for col in conflict_columns)
590 query = f'INSERT INTO "{table_name}" ({columns_str}) VALUES ({placeholders}) '
591 query += f"ON CONFLICT ({conflict_cols_str}) DO UPDATE SET {update_str}"
593 self.execute(query, values)
594 self.commit()
596 def get_existing_indexes(self, table_name: str) -> set[str]:
597 """Get set of existing index names for a table."""
598 query = "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name=?"
599 results = self.fetchall(query, (table_name,))
600 return {row[0].lower() for row in results}
602 def create_index(
603 self, table_name: str, index_name: str, columns: list[tuple[str, str]]
604 ) -> None:
605 """Create an index on the specified columns."""
606 # Build column list with order
607 column_parts = []
608 for col_name, order in columns:
609 column_parts.append(f'"{col_name}" {order}')
611 columns_str = ", ".join(column_parts)
612 query = f'CREATE INDEX IF NOT EXISTS "{index_name}" ON "{table_name}" ({columns_str})'
614 self.execute(query)
615 self.commit()
617 def table_exists(self, table_name: str) -> bool:
618 """Check if a table exists in the database.
620 Args:
621 table_name: Name of the table to check
623 Returns:
624 True if table exists, False otherwise
625 """
626 query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
627 result = self.fetchall(query, (table_name,))
628 return len(result) > 0
630 def delete_stale_records_compound(
631 self,
632 table_name: str,
633 id_columns: list[str],
634 filter_columns: dict[str, str],
635 current_ids: set[tuple],
636 ) -> int:
637 """Delete records from database that aren't in current CSV using compound filter key.
639 Args:
640 table_name: Name of the table
641 id_columns: List of ID column names (for compound keys)
642 filter_columns: Dictionary of column_name -> value to filter by (compound key)
643 current_ids: Set of ID tuples from the current CSV
645 Returns:
646 Count of records deleted
647 """
648 if not current_ids or not filter_columns:
649 return 0
651 # Build WHERE clause: WHERE col1 = ? AND col2 = ? AND (id1, id2) NOT IN (...)
652 filter_conditions = [f'"{col}" = ?' for col in filter_columns]
654 if len(id_columns) == 1:
655 # Single key - simpler query
656 current_ids_list = [
657 id_val[0] if isinstance(id_val, tuple) else id_val for id_val in current_ids
658 ]
659 placeholders = ", ".join("?" * len(current_ids_list))
660 count_query = f"""
661 SELECT COUNT(*) FROM "{table_name}"
662 WHERE {" AND ".join(filter_conditions)}
663 AND "{id_columns[0]}" NOT IN ({placeholders})
664 """
665 delete_query = f"""
666 DELETE FROM "{table_name}"
667 WHERE {" AND ".join(filter_conditions)}
668 AND "{id_columns[0]}" NOT IN ({placeholders})
669 """
670 params = tuple(list(filter_columns.values()) + current_ids_list)
671 else:
672 # Compound key - use row value constructor
673 quoted_cols = [f'"{col}"' for col in id_columns]
674 id_cols = f"({', '.join(quoted_cols)})"
675 placeholders = ", ".join(f"({', '.join('?' * len(id_columns))})" for _ in current_ids)
676 count_query = f"""
677 SELECT COUNT(*) FROM "{table_name}"
678 WHERE {" AND ".join(filter_conditions)}
679 AND {id_cols} NOT IN ({placeholders})
680 """
681 delete_query = f"""
682 DELETE FROM "{table_name}"
683 WHERE {" AND ".join(filter_conditions)}
684 AND {id_cols} NOT IN ({placeholders})
685 """
686 # Flatten the list of tuples for params
687 id_params = [val for id_tuple in current_ids for val in id_tuple]
688 params = tuple(list(filter_columns.values()) + id_params)
690 # Count first
691 logger.debug(f"SQLite count query: {count_query}")
692 logger.debug(f"SQLite count params: {params}")
693 count_result = self.fetchall(count_query, params)
694 deleted_count = count_result[0][0] if count_result else 0
696 # Delete stale records
697 logger.debug(f"SQLite delete query: {delete_query}")
698 logger.debug(f"SQLite delete params: {params}")
699 logger.debug(f"SQLite deleted count: {deleted_count}")
700 self.execute(delete_query, params)
701 self.commit()
703 return deleted_count
705 def count_stale_records_compound(
706 self,
707 table_name: str,
708 id_columns: list[str],
709 filter_columns: dict[str, str],
710 current_ids: set[tuple],
711 ) -> int:
712 """Count records that would be deleted using compound filter key.
714 Args:
715 table_name: Name of the table
716 id_columns: List of ID column names (for compound keys)
717 filter_columns: Dictionary of column_name -> value to filter by (compound key)
718 current_ids: Set of ID tuples from the current CSV
720 Returns:
721 Count of records that would be deleted
722 """
723 if not current_ids or not filter_columns:
724 return 0
726 # Build WHERE clause: WHERE col1 = ? AND col2 = ? AND (id1, id2) NOT IN (...)
727 filter_conditions = [f'"{col}" = ?' for col in filter_columns]
729 if len(id_columns) == 1:
730 # Single key - simpler query
731 current_ids_list = [
732 id_val[0] if isinstance(id_val, tuple) else id_val for id_val in current_ids
733 ]
734 placeholders = ", ".join("?" * len(current_ids_list))
735 count_query = f"""
736 SELECT COUNT(*) FROM "{table_name}"
737 WHERE {" AND ".join(filter_conditions)}
738 AND "{id_columns[0]}" NOT IN ({placeholders})
739 """
740 params = tuple(list(filter_columns.values()) + current_ids_list)
741 else:
742 # Compound key - use row value constructor
743 quoted_cols = [f'"{col}"' for col in id_columns]
744 id_cols = f"({', '.join(quoted_cols)})"
745 placeholders = ", ".join(f"({', '.join('?' * len(id_columns))})" for _ in current_ids)
746 count_query = f"""
747 SELECT COUNT(*) FROM "{table_name}"
748 WHERE {" AND ".join(filter_conditions)}
749 AND {id_cols} NOT IN ({placeholders})
750 """
751 # Flatten the list of tuples for params
752 id_params = [val for id_tuple in current_ids for val in id_tuple]
753 params = tuple(list(filter_columns.values()) + id_params)
755 count_result = self.fetchall(count_query, params)
756 return count_result[0][0] if count_result else 0
759class DatabaseConnection:
760 """Database connection handler supporting PostgreSQL and SQLite."""
762 def __init__(self, connection_string: str) -> None:
763 """Initialize database connection.
765 Args:
766 connection_string: Database connection string
767 - PostgreSQL: postgresql://user:pass@host:port/db
768 - SQLite: sqlite:///path/to/db.db or sqlite:///:memory:
769 """
770 self.connection_string = connection_string
771 self.backend: DatabaseBackend | None = None
773 def __enter__(self) -> DatabaseConnection:
774 """Enter context manager."""
775 if self.connection_string.startswith("sqlite"):
776 self.backend = SQLiteBackend(self.connection_string)
777 elif self.connection_string.startswith("postgres"):
778 self.backend = PostgreSQLBackend(self.connection_string)
779 else:
780 raise ValueError(
781 f"Unsupported database type in connection string: {self.connection_string}"
782 )
783 return self
785 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
786 """Exit context manager."""
787 if self.backend:
788 self.backend.close()
790 def create_table_if_not_exists(
791 self, table_name: str, columns: dict[str, str], primary_keys: list[str] | None = None
792 ) -> None:
793 """Create table if it doesn't exist."""
794 if not self.backend:
795 raise RuntimeError("Database connection not established")
796 self.backend.create_table_if_not_exists(table_name, columns, primary_keys)
798 def get_existing_columns(self, table_name: str) -> set[str]:
799 """Get set of existing column names in a table."""
800 if not self.backend:
801 raise RuntimeError("Database connection not established")
802 return self.backend.get_existing_columns(table_name)
804 def add_column(self, table_name: str, column_name: str, column_type: str) -> None:
805 """Add a new column to an existing table."""
806 if not self.backend:
807 raise RuntimeError("Database connection not established")
808 self.backend.add_column(table_name, column_name, column_type)
810 def upsert_row(
811 self, table_name: str, conflict_columns: list[str], row_data: dict[str, Any]
812 ) -> None:
813 """Upsert a row into the database."""
814 if not self.backend:
815 raise RuntimeError("Database connection not established")
816 self.backend.upsert_row(table_name, conflict_columns, row_data)
818 def delete_stale_records_compound(
819 self,
820 table_name: str,
821 id_columns: list[str],
822 filter_columns: dict[str, str],
823 current_ids: set[tuple],
824 ) -> int:
825 """Delete records from database that aren't in current CSV using compound filter key."""
826 if not self.backend:
827 raise RuntimeError("Database connection not established")
828 return self.backend.delete_stale_records_compound(
829 table_name, id_columns, filter_columns, current_ids
830 )
832 def count_stale_records_compound(
833 self,
834 table_name: str,
835 id_columns: list[str],
836 filter_columns: dict[str, str],
837 current_ids: set[tuple],
838 ) -> int:
839 """Count records that would be deleted using compound filter key."""
840 if not self.backend:
841 raise RuntimeError("Database connection not established")
842 return self.backend.count_stale_records_compound(
843 table_name, id_columns, filter_columns, current_ids
844 )
846 def get_existing_indexes(self, table_name: str) -> set[str]:
847 """Get set of existing index names for a table."""
848 if not self.backend:
849 raise RuntimeError("Database connection not established")
850 return self.backend.get_existing_indexes(table_name)
852 def create_index(
853 self, table_name: str, index_name: str, columns: list[tuple[str, str]]
854 ) -> None:
855 """Create an index on the specified columns."""
856 if not self.backend:
857 raise RuntimeError("Database connection not established")
858 self.backend.create_index(table_name, index_name, columns)
860 def table_exists(self, table_name: str) -> bool:
861 """Check if a table exists in the database.
863 Args:
864 table_name: Name of the table to check
866 Returns:
867 True if table exists, False otherwise
868 """
869 if not self.backend:
870 raise RuntimeError("Database connection not established")
871 return self.backend.table_exists(table_name)
873 def _validate_id_columns(self, job: CrumpJob, csv_columns: set[str]) -> set[str]:
874 """Validate that required ID columns exist in CSV.
876 Args:
877 job: CrumpJob configuration
878 csv_columns: Set of column names from CSV
880 Returns:
881 Set of ID column names from CSV
883 Raises:
884 ValueError: If any ID column is missing from CSV
885 """
886 id_csv_columns = set()
887 for id_col in job.id_mapping:
888 # Skip validation for custom functions (no csv_column)
889 if id_col.csv_column is None:
890 # Custom function - validate input columns instead
891 if id_col.input_columns:
892 for input_col in id_col.input_columns:
893 if input_col not in csv_columns:
894 raise ValueError(
895 f"Input column '{input_col}' for custom function "
896 f"'{id_col.db_column}' not found in CSV"
897 )
898 continue
900 if id_col.csv_column not in csv_columns:
901 raise ValueError(f"ID column '{id_col.csv_column}' not found in CSV")
902 id_csv_columns.add(id_col.csv_column)
903 return id_csv_columns
905 def _determine_sync_columns(
906 self, job: CrumpJob, csv_columns: set[str], id_csv_columns: set[str]
907 ) -> list[Any]:
908 """Determine which columns to sync based on job configuration.
910 When failure_mode is set, missing CSV columns for configured mappings are
911 tolerated (the column is kept so rows can receive default/null values).
912 Custom function input columns that are missing always raise ValueError.
914 Args:
915 job: CrumpJob configuration
916 csv_columns: Set of column names from CSV
917 id_csv_columns: Set of ID column names
919 Returns:
920 List of ColumnMapping objects for columns to sync
922 Raises:
923 ValueError: If a custom function input column is missing from CSV
924 """
925 if job.columns:
926 # Specific columns defined
927 sync_columns = list(job.id_mapping) + job.columns
928 for col_mapping in job.columns:
929 # Skip validation for custom functions (no csv_column)
930 if col_mapping.csv_column is None:
931 # Custom function - validate input columns instead
932 if col_mapping.input_columns:
933 for input_col in col_mapping.input_columns:
934 if input_col not in csv_columns:
935 raise ValueError(
936 f"Input column '{input_col}' for custom function "
937 f"'{col_mapping.db_column}' not found in CSV"
938 )
939 continue
941 if col_mapping.csv_column not in csv_columns:
942 # Column is missing from CSV - log warning but keep it
943 # Row validation will handle this per-row based on failure_mode
944 logger.warning(
945 f"Column '{col_mapping.csv_column}' defined in config "
946 f"but not found in CSV file"
947 )
948 else:
949 # Sync all columns
950 sync_columns = list(job.id_mapping)
951 for csv_col in csv_columns:
952 if csv_col not in id_csv_columns:
953 sync_columns.append(ColumnMapping(csv_col, csv_col))
955 return sync_columns
def _build_column_definitions(self, sync_columns: list[Any], job: CrumpJob) -> dict[str, str]:
    """Translate the sync columns into SQL column declarations.

    Each sync column is declared with the backend's SQL type for its
    ``data_type``. When ``nullable`` is explicitly set, ``" NULL"`` is appended
    for a truthy value and ``" NOT NULL"`` for a falsy one. Columns from
    ``job.filename_to_column`` (if configured) get the mapped type only, with
    no nullability suffix. They are added last, so they overwrite any sync
    column sharing the same ``db_column``.

    Args:
        sync_columns: ColumnMapping-like objects (``db_column``, ``data_type``,
            ``nullable``)
        job: CrumpJob configuration

    Returns:
        Dict of db column name -> SQL type declaration, in insertion order

    Raises:
        RuntimeError: If no backend is attached (``self.backend`` is falsy)
    """
    backend = self.backend
    if not backend:
        raise RuntimeError("Database connection not established")

    definitions: dict[str, str] = {}
    for mapping in sync_columns:
        declaration = backend.map_data_type(mapping.data_type)
        if mapping.nullable is not None:
            declaration += " NULL" if mapping.nullable else " NOT NULL"
        definitions[mapping.db_column] = declaration

    extra_columns = job.filename_to_column.columns.values() if job.filename_to_column else ()
    for mapping in extra_columns:
        definitions[mapping.db_column] = backend.map_data_type(mapping.data_type)

    return definitions
def _setup_table_schema(
    self, job: CrumpJob, columns_def: dict[str, str], primary_keys: list[str]
) -> bool:
    """Ensure the target table, its columns and its configured indexes exist.

    Creates the table when absent, adds any ``columns_def`` entry whose
    lower-cased name is not among the existing columns, and creates every
    ``job.indexes`` entry whose lower-cased name is not among the existing
    indexes.

    Args:
        job: CrumpJob configuration
        columns_def: Column name -> SQL type declaration
        primary_keys: Primary key column names

    Returns:
        True if the table was newly created or any column/index was added
    """
    table = job.target_table
    altered = not self.table_exists(table)

    self.create_table_if_not_exists(table, columns_def, primary_keys)

    # Schema evolution: add config columns the table does not have yet.
    present_columns = self.get_existing_columns(table)
    for name, sql_type in columns_def.items():
        if name.lower() in present_columns:
            continue
        self.add_column(table, name, sql_type)
        altered = True

    if not job.indexes:
        return altered

    present_indexes = self.get_existing_indexes(table)
    for index in job.indexes:
        if index.name.lower() in present_indexes:
            continue
        self.create_index(table, index.name, [(part.column, part.order) for part in index.columns])
        altered = True

    return altered
def _should_include_row(
    self, row_index: int, total_rows: int, sample_percentage: float | None
) -> bool:
    """Determine if a row should be included based on sampling percentage.

    The first and last rows are always kept. Every other row ``i`` is kept
    when ``floor(i * p / 100)`` is larger than ``floor((i - 1) * p / 100)``,
    i.e. whenever the running "expected number of kept rows" crosses an
    integer. For percentages that divide 100 evenly this keeps exactly every
    ``100 / p``-th row: 10% keeps rows 0, 10, 20, ... and 25% keeps rows
    0, 4, 8, .... Other percentages are tracked accurately. The previous
    truncated integer interval turned any percentage above 50% into 100%
    and 30% into roughly 33%.

    Args:
        row_index: Zero-based index of the current row
        total_rows: Total number of rows in the dataset
        sample_percentage: Optional percentage of rows to sample (0-100)

    Returns:
        True if row should be included, False otherwise
    """
    # No sampling (or 100%+): include every row.
    if sample_percentage is None or sample_percentage >= 100:
        return True

    # 0% or less: exclude every row (edge case).
    if sample_percentage <= 0:
        return False

    # Always keep the first and the last row.
    if row_index == 0 or row_index == total_rows - 1:
        return True

    # Keep the row when the cumulative kept-count target steps up by one.
    current = (row_index * sample_percentage) // 100
    previous = ((row_index - 1) * sample_percentage) // 100
    return current > previous
@staticmethod
def _get_varchar_limit(data_type: str | None) -> int | None:
    """Return N for a ``varchar(N)`` data type, otherwise None.

    Matching is case-insensitive and ignores surrounding whitespace. Only the
    start of the string is matched, so trailing text such as
    ``"varchar(12) not null"`` still yields 12.

    Args:
        data_type: Configured data type string, or None

    Returns:
        The character limit, or None when the type is not varchar(N)
    """
    import re

    if data_type is not None:
        found = re.match(r"varchar\((\d+)\)", data_type.lower().strip())
        if found is not None:
            return int(found.group(1))
    return None
@staticmethod
def _get_default_value(data_type: str | None) -> Any:
    """Return the placeholder used for a missing non-nullable column.

    The type name is compared case-insensitively after stripping whitespace:

    - ``"integer"``, ``"int"``, ``"bigint"`` -> ``0``
    - ``"float"``, ``"double"`` -> ``0.0``
    - anything else, including None and ``varchar(N)`` -> ``""``

    Args:
        data_type: The configured data type

    Returns:
        The default value to store for the column
    """
    normalized = "" if data_type is None else data_type.lower().strip()
    numeric_defaults: dict[str, Any] = {
        "integer": 0,
        "int": 0,
        "bigint": 0,
        "float": 0.0,
        "double": 0.0,
    }
    return numeric_defaults.get(normalized, "")
def _validate_and_fix_row(
    self,
    row_data: dict[str, Any],
    sync_columns: list[Any],
    job: CrumpJob,
    csv_row: dict[str, Any],
) -> dict[str, Any] | None:
    """Apply the job's failure_mode policy to one transformed row.

    For each sync column:

    - A missing value on a column that is not explicitly non-nullable
      becomes None.
    - A missing value on a ``nullable is False`` column rejects the row in
      STRICT mode. Otherwise it is replaced with ``_get_default_value``.
    - A ``varchar(N)`` value longer than N characters (after ``str()``)
      rejects the row in STRICT mode. Otherwise it is truncated to N
      characters.

    A value counts as missing when its key is absent, when it is None, or
    when it is ``""`` for a column whose ``csv_column`` does not appear in
    the original CSV row.

    Args:
        row_data: Transformed row (db_column -> value); modified in place
        sync_columns: ColumnMapping objects to check
        job: CrumpJob configuration (``failure_mode`` is read)
        csv_row: The original CSV row, used to tell real blanks from
            absent columns

    Returns:
        ``row_data`` after fixes, or None if the row must be skipped
    """
    mode = job.failure_mode

    for mapping in sync_columns:
        column = mapping.db_column
        current = row_data.get(column)

        missing = (
            column not in row_data
            or current is None
            or (
                current == ""
                and mapping.csv_column is not None
                and mapping.csv_column not in csv_row
            )
        )

        if missing and mapping.nullable is not False:
            # Nullable (or unspecified) column: store NULL.
            row_data[column] = None
        elif missing:
            # Explicitly non-nullable column with no value.
            if mode == FailureMode.STRICT:
                logger.warning(
                    f"STRICT mode: Skipping row - missing non-nullable field '{column}'"
                )
                return None
            fallback = self._get_default_value(mapping.data_type)
            logger.warning(
                f"PERMISSIVE mode: Using default value {fallback!r} "
                f"for missing non-nullable field '{column}'"
            )
            row_data[column] = fallback

        limit = self._get_varchar_limit(mapping.data_type)
        if limit is None or column not in row_data or row_data[column] is None:
            continue

        text = str(row_data[column])
        if len(text) <= limit:
            continue

        if mode == FailureMode.STRICT:
            logger.warning(
                f"STRICT mode: Skipping row - value for '{column}' "
                f"exceeds varchar({limit}) limit "
                f"(length {len(text)})"
            )
            return None

        logger.warning(
            f"PERMISSIVE mode: Truncating value for '{column}' "
            f"from {len(text)} to {limit} characters"
        )
        row_data[column] = text[:limit]

    return row_data
def _process_tabular_rows(
    self,
    reader: Any,
    job: CrumpJob,
    sync_columns: list[Any],
    primary_keys: list[str],
    filename_values: dict[str, str] | None = None,
) -> tuple[int, set[tuple]]:
    """Transform, validate and upsert rows from a tabular reader.

    When ``job.sample_percentage`` is set below 100, the reader is fully
    materialized so the total row count is known, and rows are filtered with
    ``_should_include_row``. Otherwise rows are streamed straight from the
    reader. Each selected row goes through ``apply_row_transformations`` and
    ``_validate_and_fix_row``. Rows rejected by validation are counted as
    skipped, and the rest are upserted.

    Args:
        reader: Tabular file reader (DictReader interface)
        job: CrumpJob configuration
        sync_columns: List of ColumnMapping objects
        primary_keys: List of primary key column names
        filename_values: Optional dict of values extracted from filename

    Returns:
        Tuple of (rows_synced, synced_ids) where synced_ids are tuples of ID values

    Raises:
        ValueError: In STRICT mode, if at least one row was rejected and none
            was synced
    """
    rows_synced = 0
    rows_skipped = 0
    synced_ids: set[tuple] = set()

    sample_percentage = job.sample_percentage
    if sample_percentage is not None and sample_percentage < 100:
        # Sampling needs the total row count (the last row is always kept),
        # so the whole file is read into memory here.
        all_rows = list(reader)
        total_rows = len(all_rows)
        rows: Any = (
            row
            for row_index, row in enumerate(all_rows)
            if self._should_include_row(row_index, total_rows, sample_percentage)
        )
    else:
        # No sampling: stream rows without loading the file into memory.
        rows = reader

    for row in rows:
        row_data = apply_row_transformations(
            row, sync_columns, job.filename_to_column, filename_values
        )

        validated = self._validate_and_fix_row(row_data, sync_columns, job, row)
        if validated is None:
            rows_skipped += 1
            continue

        self.upsert_row(job.target_table, primary_keys, validated)

        # Track synced IDs as tuples (compound key support).
        synced_ids.add(tuple(validated[id_col.db_column] for id_col in job.id_mapping))
        rows_synced += 1

    if rows_skipped > 0:
        logger.warning(f"Skipped {rows_skipped} rows due to validation failures")

    # In STRICT mode, if the file had rows but ALL were rejected, raise an error
    if job.failure_mode == FailureMode.STRICT and rows_skipped > 0 and rows_synced == 0:
        raise ValueError(
            f"STRICT mode: All {rows_skipped} row(s) were rejected due to "
            f"validation failures. No data was imported into '{job.target_table}'."
        )

    return rows_synced, synced_ids
def _count_and_track_tabular_rows(
    self,
    file_path: Path,
    job: CrumpJob,
    sync_columns: list[Any],
    filename_values: dict[str, str] | None = None,
) -> tuple[int, set[tuple]]:
    """Count the rows a sync would upsert and collect their IDs, without writing.

    The row selection mirrors ``_process_tabular_rows``: the same sampling
    (``_should_include_row``), the same ``apply_row_transformations``, and the
    same ``_validate_and_fix_row`` failure_mode checks. Rows that validation
    rejects are therefore neither counted nor treated as synced. This keeps
    dry-run row counts and stale-row counts consistent with a real sync.

    Args:
        file_path: Path to tabular file (CSV or Parquet)
        job: CrumpJob configuration
        sync_columns: List of ColumnMapping objects
        filename_values: Optional dict of values extracted from filename

    Returns:
        Tuple of (row_count, synced_ids) where synced_ids are tuples of ID values
    """
    row_count = 0
    synced_ids: set[tuple] = set()

    file_format = _detect_file_format(file_path)

    with create_reader(file_path, file_format=file_format) as reader:
        sample_percentage = job.sample_percentage
        if sample_percentage is not None and sample_percentage < 100:
            # Sampling needs the total row count, so read everything first.
            all_rows = list(reader)
            total_rows = len(all_rows)
            rows: Any = (
                row
                for row_index, row in enumerate(all_rows)
                if self._should_include_row(row_index, total_rows, sample_percentage)
            )
        else:
            rows = reader

        for row in rows:
            row_data = apply_row_transformations(
                row, sync_columns, job.filename_to_column, filename_values
            )

            # Apply the same validation as the real sync. A rejected row would
            # not be upserted, so its ID must not count as "synced" either
            # (otherwise stale-row deletion would be under-reported).
            validated = self._validate_and_fix_row(row_data, sync_columns, job, row)
            if validated is None:
                continue

            synced_ids.add(tuple(validated[id_col.db_column] for id_col in job.id_mapping))
            row_count += 1

    return row_count, synced_ids
def _prepare_sync(
    self, file_path: Path, job: CrumpJob
) -> tuple[set[str], list[Any], dict[str, str]]:
    """Read the file header, validate the job against it, and build the schema.

    Args:
        file_path: Path to tabular file (CSV or Parquet)
        job: CrumpJob configuration

    Returns:
        Tuple of (csv_columns, sync_columns, columns_def)

    Raises:
        FileNotFoundError: If ``file_path`` does not exist
        ValueError: If the file has no columns, or if the ID/column
            validation helpers reject it
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    with create_reader(file_path, file_format=_detect_file_format(file_path)) as reader:
        header = reader.fieldnames
        if not header:
            raise ValueError("File has no columns")
        csv_columns = set(header)

    id_csv_columns = self._validate_id_columns(job, csv_columns)
    sync_columns = self._determine_sync_columns(job, csv_columns, id_csv_columns)
    return csv_columns, sync_columns, self._build_column_definitions(sync_columns, job)
def sync_tabular_file_dry_run(
    self,
    file_path: Path,
    job: CrumpJob,
    filename_values: dict[str, str] | None = None,
) -> DryRunSummary:
    """Report what syncing a tabular file would change, without writing.

    Args:
        file_path: Path to tabular file (CSV or Parquet)
        job: CrumpJob configuration
        filename_values: Optional dict of values extracted from filename

    Returns:
        DryRunSummary with the table status, the columns and indexes that
        would be added, the number of rows to sync, and the number of stale
        rows to delete

    Raises:
        FileNotFoundError: If the file doesn't exist
        ValueError: If the file is invalid or columns don't match
    """
    table = job.target_table
    summary = DryRunSummary()
    summary.table_name = table

    _, sync_columns, columns_def = self._prepare_sync(file_path, job)

    summary.table_exists = self.table_exists(table)
    if summary.table_exists:
        known_columns = self.get_existing_columns(table)
        summary.new_columns.extend(
            (name, sql_type)
            for name, sql_type in columns_def.items()
            if name.lower() not in known_columns
        )
        if job.indexes:
            known_indexes = self.get_existing_indexes(table)
            summary.new_indexes.extend(
                index.name for index in job.indexes if index.name.lower() not in known_indexes
            )

    # Upper bound: every selected file row is counted, even rows whose data
    # already matches the table (comparing against stored data would be
    # expensive for large files).
    summary.rows_to_sync, synced_ids = self._count_and_track_tabular_rows(
        file_path, job, sync_columns, filename_values
    )

    filename_config = job.filename_to_column
    if not (filename_config and filename_values and summary.table_exists):
        return summary
    if not filename_config.get_delete_key_columns():
        return summary

    # Scope stale-row detection to the values extracted from this filename.
    delete_key_values = {
        mapping.db_column: filename_values[name]
        for name, mapping in filename_config.columns.items()
        if mapping.use_to_delete_old_rows and name in filename_values
    }
    summary.rows_to_delete = self.count_stale_records_compound(
        table,
        [id_col.db_column for id_col in job.id_mapping],
        delete_key_values,
        synced_ids,
    )
    return summary
def sync_tabular_file(
    self,
    file_path: Path,
    job: CrumpJob,
    filename_values: dict[str, str] | None = None,
    enable_history: bool = False,
) -> int:
    """Sync a tabular file into its target table using the job configuration.

    The steps are:

    1. Prepare the schema and create or evolve the table.
    2. Upsert the rows.
    3. Delete stale rows, if filename-based delete keys are configured.

    When ``enable_history`` is set and a backend is attached, a history
    entry is recorded in a ``finally`` block, for failures as well as
    successes. An error while recording history is logged and never
    propagated.

    Args:
        file_path: Path to tabular file (CSV or Parquet)
        job: CrumpJob configuration
        filename_values: Optional dict of values extracted from filename
        enable_history: Whether to record sync history

    Returns:
        Number of rows synced

    Raises:
        FileNotFoundError: If the file doesn't exist
        ValueError: If the file is invalid or columns don't match
    """
    from crump.history import get_utc_now, record_sync_history

    start_time = get_utc_now() if enable_history else None
    rows_deleted = 0
    schema_changed = False
    error_message: str | None = None
    success = False

    try:
        _, sync_columns, columns_def = self._prepare_sync(file_path, job)

        primary_keys = [id_col.db_column for id_col in job.id_mapping]
        logger.debug(f"Primary keys for table {job.target_table}: {primary_keys}")
        schema_changed = self._setup_table_schema(job, columns_def, primary_keys)

        with create_reader(file_path, file_format=_detect_file_format(file_path)) as reader:
            rows_synced, synced_ids = self._process_tabular_rows(
                reader, job, sync_columns, primary_keys, filename_values
            )

        filename_config = job.filename_to_column
        if (
            filename_config
            and filename_values
            and filename_config.get_delete_key_columns()
        ):
            # Only rows sharing this file's delete-key values are candidates.
            delete_key_values = {
                mapping.db_column: filename_values[name]
                for name, mapping in filename_config.columns.items()
                if mapping.use_to_delete_old_rows and name in filename_values
            }
            rows_deleted = self.delete_stale_records_compound(
                job.target_table,
                [id_col.db_column for id_col in job.id_mapping],
                delete_key_values,
                synced_ids,
            )

        success = True
        return rows_synced

    except Exception as exc:
        error_message = str(exc)
        raise

    finally:
        if enable_history and self.backend and start_time:
            end_time = get_utc_now()
            try:
                record_sync_history(
                    backend=self.backend,
                    file_path=file_path,
                    table_name=job.target_table,
                    # rows_synced is only bound once processing succeeded.
                    rows_upserted=rows_synced if success else 0,
                    rows_deleted=rows_deleted,
                    schema_changed=schema_changed,
                    start_time=start_time,
                    end_time=end_time,
                    success=success,
                    error=error_message,
                )
            except Exception as hist_error:
                # Don't fail the sync if history recording fails
                logger.warning(f"Failed to record sync history: {hist_error}")
def sync_file_to_db(
    file_path: Path,
    job: CrumpJob,
    db_connection_string: str,
    filename_values: dict[str, str] | None = None,
    enable_history: bool = False,
) -> int:
    """Open a database connection and sync one tabular file into it.

    Convenience wrapper around ``DatabaseConnection.sync_tabular_file``. The
    connection is managed by the ``with`` block.

    Args:
        file_path: Path to the tabular file (CSV or Parquet)
        job: CrumpJob configuration
        db_connection_string: Database connection string (PostgreSQL or SQLite)
        filename_values: Optional dict of values extracted from filename
        enable_history: Whether to record sync history

    Returns:
        Number of rows synced
    """
    with DatabaseConnection(db_connection_string) as connection:
        return connection.sync_tabular_file(file_path, job, filename_values, enable_history)
def sync_file_to_db_dry_run(
    file_path: Path,
    job: CrumpJob,
    db_connection_string: str,
    filename_values: dict[str, str] | None = None,
) -> DryRunSummary:
    """Open a database connection and report what a sync would change.

    Convenience wrapper around ``DatabaseConnection.sync_tabular_file_dry_run``.

    Args:
        file_path: Path to the tabular file (CSV or Parquet)
        job: CrumpJob configuration
        db_connection_string: Database connection string
        filename_values: Optional dict of values extracted from filename

    Returns:
        DryRunSummary with details of what would be changed
    """
    with DatabaseConnection(db_connection_string) as connection:
        return connection.sync_tabular_file_dry_run(file_path, job, filename_values)
1586# Backward compatibility aliases