Coverage for src / crump / database.py: 91%

586 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-10 18:00 +0000

1"""Database operations for crump.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import sqlite3 

7from pathlib import Path 

8from typing import Any, Protocol 

9 

10import psycopg 

11from psycopg import sql 

12 

13from crump.config import ColumnMapping, CrumpJob, FailureMode, apply_row_transformations 

14from crump.tabular_file import create_reader 

15 

16 

def _detect_file_format(file_path: Path) -> Any:
    """Pick the tabular reader format for ``file_path``.

    Only two outcomes are possible: Parquet when the path is recognised as
    Parquet, CSV for everything else.

    Args:
        file_path: Path to the file

    Returns:
        ``InputFileType.PARQUET`` or ``InputFileType.CSV``.  Any other detected
        type, and any path that ``InputFileType.from_path`` rejects with
        ``ValueError``, is reported as CSV.

    Note:
        CDF files are not directly syncable and must be extracted first, which
        is why they are not returned from here.
    """
    # Imported lazily inside the function, as in the original implementation.
    from crump.file_types import InputFileType

    try:
        if InputFileType.from_path(str(file_path)) == InputFileType.PARQUET:
            return InputFileType.PARQUET
    except ValueError:
        # Unknown extension: deliberately fall through to the CSV default.
        pass

    return InputFileType.CSV

43 

44 

# Module-level logger used by the backend and connection code in this file.
logger = logging.getLogger(__name__)

46 

47 

class DryRunSummary:
    """Mutable record of what a dry-run sync would change.

    Attributes:
        table_name: Name of the target table (empty until filled in).
        table_exists: Whether the target table already exists.
        new_columns: ``(name, type)`` pairs for columns that would be added.
        new_indexes: Names of indexes that would be created.
        rows_to_sync: Number of rows that would be upserted.
        rows_to_delete: Number of stale rows that would be removed.
    """

    def __init__(self) -> None:
        """Start with an empty summary: no table, no changes, zero counts."""
        # Target table
        self.table_name: str = ""
        self.table_exists: bool = False

        # Schema changes (fresh lists per instance)
        self.new_columns: list[tuple[str, str]] = []
        self.new_indexes: list[str] = []

        # Row-level changes
        self.rows_to_sync: int = 0
        self.rows_to_delete: int = 0

59 

60 

class DatabaseBackend(Protocol):
    """Protocol for database backend operations.

    Structural interface implemented by the PostgreSQL and SQLite backends in
    this file and consumed by ``DatabaseConnection``.  Every method body is
    ``...``; behaviour is defined by the implementing class.
    """

    def execute(self, query: str, params: tuple[Any, ...] | None = None) -> None:
        """Execute a query.

        Args:
            query: SQL text to run.
            params: Optional values bound to the query's placeholders.
        """
        ...

    def fetchall(self, query: str, params: tuple[Any, ...] | None = None) -> list[tuple[Any, ...]]:
        """Fetch all results from a query.

        Args:
            query: SQL text to run.
            params: Optional values bound to the query's placeholders.

        Returns:
            All result rows as a list of tuples.
        """
        ...

    def commit(self) -> None:
        """Commit the current transaction."""
        ...

    def close(self) -> None:
        """Close the connection."""
        ...

    def map_data_type(self, data_type: str | None) -> str:
        """Map config data type to SQL database type.

        Args:
            data_type: Type name from the job configuration, or None.

        Returns:
            The backend-specific SQL type name.
        """
        ...

    def create_table_if_not_exists(
        self, table_name: str, columns: dict[str, str], primary_keys: list[str] | None = None
    ) -> None:
        """Create table if it doesn't exist.

        Args:
            table_name: Name of the table to create.
            columns: Mapping of column name -> SQL type.
            primary_keys: Optional column names forming the primary key.
        """
        ...

    def get_existing_columns(self, table_name: str) -> set[str]:
        """Get set of existing column names in a table.

        Args:
            table_name: Name of the table to inspect.
        """
        ...

    def add_column(self, table_name: str, column_name: str, column_type: str) -> None:
        """Add a new column to an existing table.

        Args:
            table_name: Name of the table to alter.
            column_name: Name of the column to add.
            column_type: SQL type of the new column.
        """
        ...

    def upsert_row(
        self, table_name: str, conflict_columns: list[str], row_data: dict[str, Any]
    ) -> None:
        """Upsert a row into the database.

        Args:
            table_name: Name of the target table.
            conflict_columns: Columns that identify an existing row.
            row_data: Mapping of column name -> value for the row.
        """
        ...

    def delete_stale_records_compound(
        self,
        table_name: str,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> int:
        """Delete records from database that aren't in current CSV using compound filter key.

        Args:
            table_name: Name of the table.
            id_columns: ID column names (one, or several for compound keys).
            filter_columns: Mapping of column name -> value restricting which
                rows are candidates for deletion.
            current_ids: ID tuples present in the current input; matching rows
                are kept.

        Returns:
            Count of records deleted.
        """
        ...

    def count_stale_records_compound(
        self,
        table_name: str,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> int:
        """Count records that would be deleted using compound filter key.

        Takes the same arguments as ``delete_stale_records_compound``.

        Returns:
            Count of records that would be deleted.
        """
        ...

    def get_existing_indexes(self, table_name: str) -> set[str]:
        """Get set of existing index names for a table.

        Args:
            table_name: Name of the table to inspect.
        """
        ...

    def create_index(
        self, table_name: str, index_name: str, columns: list[tuple[str, str]]
    ) -> None:
        """Create an index on the specified columns.

        Args:
            table_name: Name of the table
            index_name: Name of the index to create
            columns: List of (column_name, order) tuples, e.g. [('email', 'ASC'), ('date', 'DESC')]
        """
        ...

    def table_exists(self, table_name: str) -> bool:
        """Check if a table exists in the database.

        Args:
            table_name: Name of the table to check

        Returns:
            True if table exists, False otherwise
        """
        ...

150 

151 

class PostgreSQLBackend:
    """PostgreSQL database backend built on psycopg.

    Table, column and index names are composed with ``psycopg.sql.Identifier``
    and row values are bound as query parameters.  Column types and index sort
    orders are spliced in as raw SQL (``sql.SQL``), so they must come from
    trusted configuration.
    """

    # Config type name (lower-case) -> PostgreSQL type; anything else is TEXT.
    _TYPE_MAPPING = {
        "integer": "INTEGER",
        "int": "INTEGER",
        "bigint": "BIGINT",
        "float": "DOUBLE PRECISION",
        "double": "DOUBLE PRECISION",
        "date": "DATE",
        "datetime": "TIMESTAMP",
        "timestamp": "TIMESTAMP",
        "text": "TEXT",
        "string": "TEXT",
    }

    def __init__(self, connection_string: str) -> None:
        """Open a psycopg connection for ``connection_string``."""
        self.conn = psycopg.connect(connection_string)

    def execute(self, query: str, params: tuple[Any, ...] | None = None) -> None:
        """Execute a query on a short-lived cursor, discarding any result.

        Args:
            query: SQL text to run.
            params: Optional values for the query's ``%s`` placeholders.
        """
        with self.conn.cursor() as cur:
            if params:
                cur.execute(query, params)
            else:
                cur.execute(query)

    def fetchall(self, query: str, params: tuple[Any, ...] | None = None) -> list[tuple[Any, ...]]:
        """Execute a query and return every result row.

        Args:
            query: SQL text to run.
            params: Optional values for the query's ``%s`` placeholders.
        """
        with self.conn.cursor() as cur:
            if params:
                cur.execute(query, params)
            else:
                cur.execute(query)
            return cur.fetchall()

    def commit(self) -> None:
        """Commit the current transaction."""
        self.conn.commit()

    def close(self) -> None:
        """Close the connection."""
        self.conn.close()

    def map_data_type(self, data_type: str | None) -> str:
        """Map config data type to PostgreSQL type.

        Args:
            data_type: Type name from the configuration (case and surrounding
                whitespace are ignored), or None.

        Returns:
            ``TEXT`` for None and for unknown names; the upper-cased, stripped
            input for anything starting with ``varchar`` (e.g. ``VARCHAR(50)``);
            otherwise the mapped PostgreSQL type.
        """
        if data_type is None:
            return "TEXT"

        normalized = data_type.strip()
        lowered = normalized.lower()

        # varchar(N) is passed through; use the stripped form so stray
        # whitespace from the config does not leak into the DDL.
        if lowered.startswith("varchar"):
            return normalized.upper()

        return self._TYPE_MAPPING.get(lowered, "TEXT")

    def create_table_if_not_exists(
        self, table_name: str, columns: dict[str, str], primary_keys: list[str] | None = None
    ) -> None:
        """Create the table if it doesn't exist, then commit.

        Args:
            table_name: Name of the table to create.
            columns: Mapping of column name -> SQL type (type is inserted raw).
            primary_keys: Optional column names for a PRIMARY KEY constraint.
        """
        column_defs = [
            sql.SQL("{} {}").format(sql.Identifier(col_name), sql.SQL(col_type))
            for col_name, col_type in columns.items()
        ]

        if primary_keys:
            column_defs.append(
                sql.SQL("PRIMARY KEY ({})").format(
                    sql.SQL(", ").join(sql.Identifier(pk) for pk in primary_keys)
                )
            )

        query = sql.SQL("CREATE TABLE IF NOT EXISTS {} ({})").format(
            sql.Identifier(table_name), sql.SQL(", ").join(column_defs)
        )
        self.execute(query.as_string(self.conn))
        self.commit()

    def get_existing_columns(self, table_name: str) -> set[str]:
        """Get the lower-cased column names of a table.

        Uses case-insensitive comparison to handle quoted identifiers that preserve case.
        """
        # NOTE(review): there is no table_schema filter, so a same-named table
        # in another schema would presumably contribute columns too - confirm.
        query = """
            SELECT column_name
            FROM information_schema.columns
            WHERE LOWER(table_name) = LOWER(%s)
        """
        results = self.fetchall(query, (table_name,))
        return {row[0].lower() for row in results}

    def add_column(self, table_name: str, column_name: str, column_type: str) -> None:
        """Add a new column to an existing table, then commit.

        Args:
            table_name: Name of the table to alter.
            column_name: Name of the column to add.
            column_type: SQL type of the new column (inserted raw).
        """
        query = sql.SQL("ALTER TABLE {} ADD COLUMN {} {}").format(
            sql.Identifier(table_name),
            sql.Identifier(column_name),
            sql.SQL(column_type),
        )
        self.execute(query.as_string(self.conn))
        self.commit()

    def upsert_row(
        self, table_name: str, conflict_columns: list[str], row_data: dict[str, Any]
    ) -> None:
        """Insert a row, updating the non-key columns on conflict, then commit.

        Args:
            table_name: Name of the target table.
            conflict_columns: Columns named in the ON CONFLICT target.
            row_data: Mapping of column name -> value for the row.
        """
        columns = list(row_data)
        values = tuple(row_data.values())
        update_columns = [col for col in columns if col not in conflict_columns]

        if update_columns:
            conflict_action = sql.SQL("DO UPDATE SET {}").format(
                sql.SQL(", ").join(
                    sql.SQL("{} = EXCLUDED.{}").format(sql.Identifier(col), sql.Identifier(col))
                    for col in update_columns
                )
            )
        else:
            # Every column is part of the key: an empty SET list would be a
            # syntax error, and there is nothing to update anyway.
            conflict_action = sql.SQL("DO NOTHING")

        insert_query = sql.SQL("INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) {}").format(
            sql.Identifier(table_name),
            sql.SQL(", ").join(sql.Identifier(col) for col in columns),
            sql.SQL(", ").join(sql.Placeholder() * len(values)),
            sql.SQL(", ").join(sql.Identifier(col) for col in conflict_columns),
            conflict_action,
        )
        self.execute(insert_query.as_string(self.conn), values)
        self.commit()

    def _stale_predicate(
        self,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> tuple[Any, tuple[Any, ...]]:
        """Build the shared stale-row WHERE body and its parameters.

        Produces ``col1 = %s AND ... AND <id> NOT IN (...)`` where ``<id>`` is
        a single column or a row constructor ``(id1, id2)``.

        Returns:
            ``(composable_predicate, params)`` with params ordered as the
            filter values followed by the ID values.
        """
        filter_sql = sql.SQL(" AND ").join(
            sql.SQL("{} = %s").format(sql.Identifier(col)) for col in filter_columns
        )

        if len(id_columns) == 1:
            # Single key: unwrap 1-tuples into plain values.
            id_values = [
                id_val[0] if isinstance(id_val, tuple) else id_val for id_val in current_ids
            ]
            id_target = sql.Identifier(id_columns[0])
            id_list = sql.SQL(", ").join(sql.Placeholder() * len(id_values))
        else:
            # Compound key: compare a row constructor against a list of rows.
            id_target = sql.SQL("({})").format(
                sql.SQL(", ").join(sql.Identifier(col) for col in id_columns)
            )
            row_placeholder = sql.SQL("({})").format(
                sql.SQL(", ").join(sql.Placeholder() * len(id_columns))
            )
            id_list = sql.SQL(", ").join([row_placeholder] * len(current_ids))
            # Flatten the ID tuples to line up with the placeholders.
            id_values = [val for id_tuple in current_ids for val in id_tuple]

        predicate = sql.SQL("{} AND {} NOT IN ({})").format(filter_sql, id_target, id_list)
        return predicate, tuple([*filter_columns.values(), *id_values])

    def count_stale_records_compound(
        self,
        table_name: str,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> int:
        """Count records that would be deleted using compound filter key.

        Args:
            table_name: Name of the table
            id_columns: List of ID column names (for compound keys)
            filter_columns: Dictionary of column_name -> value to filter by (compound key)
            current_ids: Set of ID tuples from the current CSV

        Returns:
            Count of records that would be deleted; 0 when ``current_ids`` or
            ``filter_columns`` is empty (no query is issued).
        """
        if not current_ids or not filter_columns:
            return 0

        predicate, params = self._stale_predicate(id_columns, filter_columns, current_ids)
        count_query = sql.SQL("SELECT COUNT(*) FROM {} WHERE {}").format(
            sql.Identifier(table_name), predicate
        )
        count_result = self.fetchall(count_query.as_string(self.conn), params)
        return count_result[0][0] if count_result else 0

    def delete_stale_records_compound(
        self,
        table_name: str,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> int:
        """Delete records from database that aren't in current CSV using compound filter key.

        Args:
            table_name: Name of the table
            id_columns: List of ID column names (for compound keys)
            filter_columns: Dictionary of column_name -> value to filter by (compound key)
            current_ids: Set of ID tuples from the current CSV

        Returns:
            Count of records deleted, taken from the DELETE statement's row
            count; 0 when ``current_ids`` or ``filter_columns`` is empty (no
            query is issued).
        """
        if not current_ids or not filter_columns:
            return 0

        predicate, params = self._stale_predicate(id_columns, filter_columns, current_ids)
        delete_sql = (
            sql.SQL("DELETE FROM {} WHERE {}")
            .format(sql.Identifier(table_name), predicate)
            .as_string(self.conn)
        )
        logger.debug("PostgreSQL delete query: %s", delete_sql)
        logger.debug("PostgreSQL delete params: %s", params)

        # Read the affected-row count from the DELETE itself instead of running
        # a separate COUNT first: one round trip, and the number always matches
        # what was actually removed.
        with self.conn.cursor() as cur:
            cur.execute(delete_sql, params)
            deleted_count = cur.rowcount
        self.commit()

        logger.debug("PostgreSQL deleted count: %s", deleted_count)
        return deleted_count

    def get_existing_indexes(self, table_name: str) -> set[str]:
        """Get the lower-cased index names for a table.

        Uses case-insensitive comparison to handle quoted identifiers that preserve case.
        """
        query = """
            SELECT indexname
            FROM pg_indexes
            WHERE LOWER(tablename) = LOWER(%s)
        """
        results = self.fetchall(query, (table_name,))
        return {row[0].lower() for row in results}

    def create_index(
        self, table_name: str, index_name: str, columns: list[tuple[str, str]]
    ) -> None:
        """Create an index on the specified columns if it doesn't exist, then commit.

        Args:
            table_name: Name of the table.
            index_name: Name of the index to create.
            columns: List of ``(column_name, order)`` tuples; ``order`` is
                inserted into the statement as raw SQL.
        """
        column_parts = [
            sql.SQL("{} {}").format(sql.Identifier(col_name), sql.SQL(order))
            for col_name, order in columns
        ]

        query = sql.SQL("CREATE INDEX IF NOT EXISTS {} ON {} ({})").format(
            sql.Identifier(index_name),
            sql.Identifier(table_name),
            sql.SQL(", ").join(column_parts),
        )

        self.execute(query.as_string(self.conn))
        self.commit()

    def table_exists(self, table_name: str) -> bool:
        """Check if a table exists in the database.

        Uses case-insensitive comparison to handle quoted identifiers that preserve case.

        Args:
            table_name: Name of the table to check

        Returns:
            True if table exists, False otherwise
        """
        query = """
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE LOWER(table_name) = LOWER(%s)
            )
        """
        result = self.fetchall(query, (table_name,))
        return result[0][0] if result else False

474 

475 

class SQLiteBackend:
    """SQLite database backend built on the standard-library ``sqlite3`` module.

    All statements run on one shared cursor.  Identifiers are double-quoted
    (with embedded quotes escaped) and row values are bound as ``?``
    parameters.  Column types and index sort orders are inserted as raw SQL,
    so they must come from trusted configuration.
    """

    # Config type name (lower-case) -> SQLite type; anything else is TEXT.
    _TYPE_MAPPING = {
        "integer": "INTEGER",
        "int": "INTEGER",
        "bigint": "INTEGER",  # SQLite INTEGER is 8-byte signed, equivalent to BIGINT
        "float": "REAL",
        "double": "REAL",
        "date": "TEXT",
        "datetime": "TEXT",
        "timestamp": "TEXT",
        "text": "TEXT",
        "string": "TEXT",
    }

    def __init__(self, connection_string: str) -> None:
        """Open the SQLite database named by ``connection_string``.

        Supports ``sqlite:///path/to/db.db`` and ``sqlite:///:memory:``; a
        ``sqlite://`` prefix is also stripped, and a string without either
        prefix is used as the path unchanged.
        """
        db_path = connection_string
        # Longest prefix first so "sqlite:///" is not cut as "sqlite://".
        for prefix in ("sqlite:///", "sqlite://"):
            if connection_string.startswith(prefix):
                db_path = connection_string[len(prefix):]
                break

        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()

    @staticmethod
    def _quote(identifier: str) -> str:
        """Return ``identifier`` as a double-quoted SQL identifier.

        Embedded double quotes are doubled so the name cannot terminate the
        quoting early.
        """
        return '"' + identifier.replace('"', '""') + '"'

    def execute(self, query: str, params: tuple[Any, ...] | None = None) -> None:
        """Execute a query on the shared cursor.

        Args:
            query: SQL text to run.
            params: Optional values for the query's ``?`` placeholders.
        """
        if params:
            self.cursor.execute(query, params)
        else:
            self.cursor.execute(query)

    def fetchall(self, query: str, params: tuple[Any, ...] | None = None) -> list[tuple[Any, ...]]:
        """Execute a query on the shared cursor and return every result row.

        Args:
            query: SQL text to run.
            params: Optional values for the query's ``?`` placeholders.
        """
        if params:
            self.cursor.execute(query, params)
        else:
            self.cursor.execute(query)
        return self.cursor.fetchall()

    def commit(self) -> None:
        """Commit the current transaction."""
        self.conn.commit()

    def close(self) -> None:
        """Close the shared cursor and then the connection."""
        self.cursor.close()
        self.conn.close()

    def map_data_type(self, data_type: str | None) -> str:
        """Map config data type to SQLite type.

        Args:
            data_type: Type name from the configuration (case and surrounding
                whitespace are ignored), or None.

        Returns:
            ``TEXT`` for None, for unknown names and for anything starting with
            ``varchar``; otherwise the mapped SQLite type.
        """
        if data_type is None:
            return "TEXT"

        lowered = data_type.lower().strip()

        # SQLite doesn't have VARCHAR, use TEXT
        if lowered.startswith("varchar"):
            return "TEXT"

        return self._TYPE_MAPPING.get(lowered, "TEXT")

    def create_table_if_not_exists(
        self, table_name: str, columns: dict[str, str], primary_keys: list[str] | None = None
    ) -> None:
        """Create the table if it doesn't exist, then commit.

        Args:
            table_name: Name of the table to create.
            columns: Mapping of column name -> SQL type (type is inserted raw).
            primary_keys: Optional column names for a PRIMARY KEY constraint.
        """
        definitions = [
            f"{self._quote(col_name)} {col_type}" for col_name, col_type in columns.items()
        ]

        if primary_keys:
            pk_columns = ", ".join(self._quote(pk) for pk in primary_keys)
            definitions.append(f"PRIMARY KEY ({pk_columns})")

        query = (
            f"CREATE TABLE IF NOT EXISTS {self._quote(table_name)} "
            f"({', '.join(definitions)})"
        )
        self.execute(query)
        self.commit()

    def get_existing_columns(self, table_name: str) -> set[str]:
        """Get the lower-cased column names of a table."""
        results = self.fetchall(f"PRAGMA table_info({self._quote(table_name)})")
        # PRAGMA table_info returns: (cid, name, type, notnull, dflt_value, pk)
        return {row[1].lower() for row in results}

    def add_column(self, table_name: str, column_name: str, column_type: str) -> None:
        """Add a new column to an existing table, then commit.

        Args:
            table_name: Name of the table to alter.
            column_name: Name of the column to add.
            column_type: SQL type of the new column (inserted raw).
        """
        query = (
            f"ALTER TABLE {self._quote(table_name)} "
            f"ADD COLUMN {self._quote(column_name)} {column_type}"
        )
        self.execute(query)
        self.commit()

    def upsert_row(
        self, table_name: str, conflict_columns: list[str], row_data: dict[str, Any]
    ) -> None:
        """Insert a row, updating the non-key columns on conflict, then commit.

        Args:
            table_name: Name of the target table.
            conflict_columns: Columns named in the ON CONFLICT target.
            row_data: Mapping of column name -> value for the row.
        """
        columns = list(row_data)
        values = tuple(row_data.values())

        columns_sql = ", ".join(self._quote(col) for col in columns)
        placeholders = ", ".join("?" * len(values))
        conflict_sql = ", ".join(self._quote(col) for col in conflict_columns)

        assignments = ", ".join(
            f"{self._quote(col)} = excluded.{self._quote(col)}"
            for col in columns
            if col not in conflict_columns
        )
        # With only key columns there is nothing to update, and an empty SET
        # list would be a syntax error.
        conflict_action = f"DO UPDATE SET {assignments}" if assignments else "DO NOTHING"

        query = (
            f"INSERT INTO {self._quote(table_name)} ({columns_sql}) VALUES ({placeholders}) "
            f"ON CONFLICT ({conflict_sql}) {conflict_action}"
        )

        self.execute(query, values)
        self.commit()

    def get_existing_indexes(self, table_name: str) -> set[str]:
        """Get the lower-cased index names for a table.

        The table name is matched case-insensitively, as SQLite itself does.
        """
        query = (
            "SELECT name FROM sqlite_master "
            "WHERE type='index' AND tbl_name = ? COLLATE NOCASE"
        )
        results = self.fetchall(query, (table_name,))
        return {row[0].lower() for row in results}

    def create_index(
        self, table_name: str, index_name: str, columns: list[tuple[str, str]]
    ) -> None:
        """Create an index on the specified columns if it doesn't exist, then commit.

        Args:
            table_name: Name of the table.
            index_name: Name of the index to create.
            columns: List of ``(column_name, order)`` tuples; ``order`` is
                inserted into the statement as raw SQL.
        """
        columns_sql = ", ".join(
            f"{self._quote(col_name)} {order}" for col_name, order in columns
        )
        query = (
            f"CREATE INDEX IF NOT EXISTS {self._quote(index_name)} "
            f"ON {self._quote(table_name)} ({columns_sql})"
        )

        self.execute(query)
        self.commit()

    def table_exists(self, table_name: str) -> bool:
        """Check if a table exists in the database.

        The name is matched case-insensitively, as SQLite itself does.

        Args:
            table_name: Name of the table to check

        Returns:
            True if table exists, False otherwise
        """
        query = (
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name = ? COLLATE NOCASE"
        )
        return len(self.fetchall(query, (table_name,))) > 0

    def _stale_predicate(
        self,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> tuple[str, tuple[Any, ...]]:
        """Build the shared stale-row WHERE body and its parameters.

        Produces ``"col1" = ? AND ... AND <id> NOT IN (...)`` where ``<id>`` is
        a single column or a row value ``("id1", "id2")``.

        Returns:
            ``(predicate_sql, params)`` with params ordered as the filter
            values followed by the ID values.
        """
        # NOTE(review): one bound parameter is used per ID value, so very large
        # ID sets may hit SQLite's bound-parameter limit - confirm expected sizes.
        filter_sql = " AND ".join(f"{self._quote(col)} = ?" for col in filter_columns)

        if len(id_columns) == 1:
            # Single key: unwrap 1-tuples into plain values.
            id_values = [
                id_val[0] if isinstance(id_val, tuple) else id_val for id_val in current_ids
            ]
            id_target = self._quote(id_columns[0])
            id_list = ", ".join("?" * len(id_values))
        else:
            # Compound key: SQLite requires a subquery on the right-hand side
            # of a row-value IN, so the ID rows are supplied through VALUES.
            id_target = "(" + ", ".join(self._quote(col) for col in id_columns) + ")"
            row_placeholder = "(" + ", ".join("?" * len(id_columns)) + ")"
            id_list = "VALUES " + ", ".join([row_placeholder] * len(current_ids))
            # Flatten the ID tuples to line up with the placeholders.
            id_values = [val for id_tuple in current_ids for val in id_tuple]

        predicate = f"{filter_sql} AND {id_target} NOT IN ({id_list})"
        return predicate, tuple([*filter_columns.values(), *id_values])

    def delete_stale_records_compound(
        self,
        table_name: str,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> int:
        """Delete records from database that aren't in current CSV using compound filter key.

        Args:
            table_name: Name of the table
            id_columns: List of ID column names (for compound keys)
            filter_columns: Dictionary of column_name -> value to filter by (compound key)
            current_ids: Set of ID tuples from the current CSV

        Returns:
            Count of records deleted, taken from the DELETE statement's row
            count; 0 when ``current_ids`` or ``filter_columns`` is empty (no
            query is issued).
        """
        if not current_ids or not filter_columns:
            return 0

        predicate, params = self._stale_predicate(id_columns, filter_columns, current_ids)
        delete_query = f"DELETE FROM {self._quote(table_name)} WHERE {predicate}"
        logger.debug("SQLite delete query: %s", delete_query)
        logger.debug("SQLite delete params: %s", params)

        # The cursor reports how many rows the DELETE removed, so no separate
        # COUNT query is needed.
        self.execute(delete_query, params)
        deleted_count = self.cursor.rowcount
        self.commit()

        logger.debug("SQLite deleted count: %s", deleted_count)
        return deleted_count

    def count_stale_records_compound(
        self,
        table_name: str,
        id_columns: list[str],
        filter_columns: dict[str, str],
        current_ids: set[tuple],
    ) -> int:
        """Count records that would be deleted using compound filter key.

        Args:
            table_name: Name of the table
            id_columns: List of ID column names (for compound keys)
            filter_columns: Dictionary of column_name -> value to filter by (compound key)
            current_ids: Set of ID tuples from the current CSV

        Returns:
            Count of records that would be deleted; 0 when ``current_ids`` or
            ``filter_columns`` is empty (no query is issued).
        """
        if not current_ids or not filter_columns:
            return 0

        predicate, params = self._stale_predicate(id_columns, filter_columns, current_ids)
        count_query = f"SELECT COUNT(*) FROM {self._quote(table_name)} WHERE {predicate}"
        count_result = self.fetchall(count_query, params)
        return count_result[0][0] if count_result else 0

757 

758 

759class DatabaseConnection: 

760 """Database connection handler supporting PostgreSQL and SQLite.""" 

761 

762 def __init__(self, connection_string: str) -> None: 

763 """Initialize database connection. 

764 

765 Args: 

766 connection_string: Database connection string 

767 - PostgreSQL: postgresql://user:pass@host:port/db 

768 - SQLite: sqlite:///path/to/db.db or sqlite:///:memory: 

769 """ 

770 self.connection_string = connection_string 

771 self.backend: DatabaseBackend | None = None 

772 

773 def __enter__(self) -> DatabaseConnection: 

774 """Enter context manager.""" 

775 if self.connection_string.startswith("sqlite"): 

776 self.backend = SQLiteBackend(self.connection_string) 

777 elif self.connection_string.startswith("postgres"): 

778 self.backend = PostgreSQLBackend(self.connection_string) 

779 else: 

780 raise ValueError( 

781 f"Unsupported database type in connection string: {self.connection_string}" 

782 ) 

783 return self 

784 

785 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: 

786 """Exit context manager.""" 

787 if self.backend: 

788 self.backend.close() 

789 

def create_table_if_not_exists(
    self, table_name: str, columns: dict[str, str], primary_keys: list[str] | None = None
) -> None:
    """Forward table creation to the active backend.

    Raises:
        RuntimeError: If no backend has been set up.
    """
    backend = self.backend
    if backend:
        backend.create_table_if_not_exists(table_name, columns, primary_keys)
        return
    raise RuntimeError("Database connection not established")

797 

def get_existing_columns(self, table_name: str) -> set[str]:
    """Return the backend's set of existing column names for ``table_name``.

    Raises:
        RuntimeError: If no backend has been set up.
    """
    backend = self.backend
    if backend:
        return backend.get_existing_columns(table_name)
    raise RuntimeError("Database connection not established")

803 

def add_column(self, table_name: str, column_name: str, column_type: str) -> None:
    """Forward an add-column request to the active backend.

    Raises:
        RuntimeError: If no backend has been set up.
    """
    backend = self.backend
    if backend:
        backend.add_column(table_name, column_name, column_type)
        return
    raise RuntimeError("Database connection not established")

809 

def upsert_row(
    self, table_name: str, conflict_columns: list[str], row_data: dict[str, Any]
) -> None:
    """Forward a row upsert to the active backend.

    Raises:
        RuntimeError: If no backend has been set up.
    """
    backend = self.backend
    if backend:
        backend.upsert_row(table_name, conflict_columns, row_data)
        return
    raise RuntimeError("Database connection not established")

817 

def delete_stale_records_compound(
    self,
    table_name: str,
    id_columns: list[str],
    filter_columns: dict[str, str],
    current_ids: set[tuple],
) -> int:
    """Forward stale-record deletion to the active backend.

    Returns:
        Whatever count the backend reports.

    Raises:
        RuntimeError: If no backend has been set up.
    """
    backend = self.backend
    if backend:
        return backend.delete_stale_records_compound(
            table_name, id_columns, filter_columns, current_ids
        )
    raise RuntimeError("Database connection not established")

831 

def count_stale_records_compound(
    self,
    table_name: str,
    id_columns: list[str],
    filter_columns: dict[str, str],
    current_ids: set[tuple],
) -> int:
    """Forward the stale-record count to the active backend.

    Returns:
        Whatever count the backend reports.

    Raises:
        RuntimeError: If no backend has been set up.
    """
    backend = self.backend
    if backend:
        return backend.count_stale_records_compound(
            table_name, id_columns, filter_columns, current_ids
        )
    raise RuntimeError("Database connection not established")

845 

def get_existing_indexes(self, table_name: str) -> set[str]:
    """Return the backend's set of existing index names for ``table_name``.

    Raises:
        RuntimeError: If no backend has been set up.
    """
    backend = self.backend
    if backend:
        return backend.get_existing_indexes(table_name)
    raise RuntimeError("Database connection not established")

851 

def create_index(
    self, table_name: str, index_name: str, columns: list[tuple[str, str]]
) -> None:
    """Forward index creation to the active backend.

    Raises:
        RuntimeError: If no backend has been set up.
    """
    backend = self.backend
    if backend:
        backend.create_index(table_name, index_name, columns)
        return
    raise RuntimeError("Database connection not established")

859 

def table_exists(self, table_name: str) -> bool:
    """Ask the active backend whether ``table_name`` exists.

    Args:
        table_name: Name of the table to check

    Returns:
        The backend's answer: True if table exists, False otherwise

    Raises:
        RuntimeError: If no backend has been set up.
    """
    backend = self.backend
    if backend:
        return backend.table_exists(table_name)
    raise RuntimeError("Database connection not established")

872 

873 def _validate_id_columns(self, job: CrumpJob, csv_columns: set[str]) -> set[str]: 

874 """Validate that required ID columns exist in CSV. 

875 

876 Args: 

877 job: CrumpJob configuration 

878 csv_columns: Set of column names from CSV 

879 

880 Returns: 

881 Set of ID column names from CSV 

882 

883 Raises: 

884 ValueError: If any ID column is missing from CSV 

885 """ 

886 id_csv_columns = set() 

887 for id_col in job.id_mapping: 

888 # Skip validation for custom functions (no csv_column) 

889 if id_col.csv_column is None: 

890 # Custom function - validate input columns instead 

891 if id_col.input_columns: 

892 for input_col in id_col.input_columns: 

893 if input_col not in csv_columns: 

894 raise ValueError( 

895 f"Input column '{input_col}' for custom function " 

896 f"'{id_col.db_column}' not found in CSV" 

897 ) 

898 continue 

899 

900 if id_col.csv_column not in csv_columns: 

901 raise ValueError(f"ID column '{id_col.csv_column}' not found in CSV") 

902 id_csv_columns.add(id_col.csv_column) 

903 return id_csv_columns 

904 

905 def _determine_sync_columns( 

906 self, job: CrumpJob, csv_columns: set[str], id_csv_columns: set[str] 

907 ) -> list[Any]: 

908 """Determine which columns to sync based on job configuration. 

909 

910 When failure_mode is set, missing CSV columns for configured mappings are 

911 tolerated (the column is kept so rows can receive default/null values). 

912 Custom function input columns that are missing always raise ValueError. 

913 

914 Args: 

915 job: CrumpJob configuration 

916 csv_columns: Set of column names from CSV 

917 id_csv_columns: Set of ID column names 

918 

919 Returns: 

920 List of ColumnMapping objects for columns to sync 

921 

922 Raises: 

923 ValueError: If a custom function input column is missing from CSV 

924 """ 

925 if job.columns: 

926 # Specific columns defined 

927 sync_columns = list(job.id_mapping) + job.columns 

928 for col_mapping in job.columns: 

929 # Skip validation for custom functions (no csv_column) 

930 if col_mapping.csv_column is None: 

931 # Custom function - validate input columns instead 

932 if col_mapping.input_columns: 

933 for input_col in col_mapping.input_columns: 

934 if input_col not in csv_columns: 

935 raise ValueError( 

936 f"Input column '{input_col}' for custom function " 

937 f"'{col_mapping.db_column}' not found in CSV" 

938 ) 

939 continue 

940 

941 if col_mapping.csv_column not in csv_columns: 

942 # Column is missing from CSV - log warning but keep it 

943 # Row validation will handle this per-row based on failure_mode 

944 logger.warning( 

945 f"Column '{col_mapping.csv_column}' defined in config " 

946 f"but not found in CSV file" 

947 ) 

948 else: 

949 # Sync all columns 

950 sync_columns = list(job.id_mapping) 

951 for csv_col in csv_columns: 

952 if csv_col not in id_csv_columns: 

953 sync_columns.append(ColumnMapping(csv_col, csv_col)) 

954 

955 return sync_columns 

956 

957 def _build_column_definitions(self, sync_columns: list[Any], job: CrumpJob) -> dict[str, str]: 

958 """Build column definitions with SQL types and nullable constraints. 

959 

960 Args: 

961 sync_columns: List of ColumnMapping objects 

962 job: CrumpJob configuration 

963 

964 Returns: 

965 Dictionary mapping column names to SQL type definitions (including NULL/NOT NULL) 

966 """ 

967 if not self.backend: 

968 raise RuntimeError("Database connection not established") 

969 columns_def = {} 

970 for col_mapping in sync_columns: 

971 sql_type = self.backend.map_data_type(col_mapping.data_type) 

972 

973 # Add nullable constraint if specified 

974 if col_mapping.nullable is not None: 

975 if col_mapping.nullable: 

976 sql_type += " NULL" 

977 else: 

978 sql_type += " NOT NULL" 

979 

980 columns_def[col_mapping.db_column] = sql_type 

981 

982 # Add filename_to_column columns if configured 

983 if job.filename_to_column: 

984 for col_mapping in job.filename_to_column.columns.values(): 

985 sql_type = self.backend.map_data_type(col_mapping.data_type) 

986 columns_def[col_mapping.db_column] = sql_type 

987 

988 return columns_def 

989 

990 def _setup_table_schema( 

991 self, job: CrumpJob, columns_def: dict[str, str], primary_keys: list[str] 

992 ) -> bool: 

993 """Create table and add missing columns/indexes. 

994 

995 Args: 

996 job: CrumpJob configuration 

997 columns_def: Dictionary mapping column names to SQL types 

998 primary_keys: List of primary key column names 

999 

1000 Returns: 

1001 True if schema changes were made (table created, columns added, or indexes created) 

1002 """ 

1003 schema_changed = False 

1004 

1005 # Check if table exists before creating 

1006 table_existed = self.table_exists(job.target_table) 

1007 

1008 # Create table if it doesn't exist 

1009 self.create_table_if_not_exists(job.target_table, columns_def, primary_keys) 

1010 

1011 if not table_existed: 

1012 schema_changed = True 

1013 

1014 # Check for schema evolution: add missing columns from config 

1015 existing_columns = self.get_existing_columns(job.target_table) 

1016 for col_name, col_type in columns_def.items(): 

1017 if col_name.lower() not in existing_columns: 

1018 self.add_column(job.target_table, col_name, col_type) 

1019 schema_changed = True 

1020 

1021 # Create indexes that don't already exist 

1022 if job.indexes: 

1023 existing_indexes = self.get_existing_indexes(job.target_table) 

1024 for index in job.indexes: 

1025 if index.name.lower() not in existing_indexes: 

1026 index_columns = [(col.column, col.order) for col in index.columns] 

1027 self.create_index(job.target_table, index.name, index_columns) 

1028 schema_changed = True 

1029 

1030 return schema_changed 

1031 

1032 def _should_include_row( 

1033 self, row_index: int, total_rows: int, sample_percentage: float | None 

1034 ) -> bool: 

1035 """Determine if a row should be included based on sampling percentage. 

1036 

1037 Args: 

1038 row_index: Zero-based index of the current row 

1039 total_rows: Total number of rows in the dataset 

1040 sample_percentage: Optional percentage of rows to sample (0-100) 

1041 

1042 Returns: 

1043 True if row should be included, False otherwise 

1044 """ 

1045 # If no sampling or 100%, include all rows 

1046 if sample_percentage is None or sample_percentage >= 100: 

1047 return True 

1048 

1049 # If 0%, exclude all rows (edge case) 

1050 if sample_percentage <= 0: 

1051 return False 

1052 

1053 # Always include first row 

1054 if row_index == 0: 

1055 return True 

1056 

1057 # Always include last row 

1058 if row_index == total_rows - 1: 

1059 return True 

1060 

1061 # Sample other rows based on percentage 

1062 # For 10%, interval = 10, so include rows 0, 10, 20, 30... 

1063 # For 25%, interval = 4, so include rows 0, 4, 8, 12... 

1064 interval = int(100 / sample_percentage) 

1065 return row_index % interval == 0 

1066 

1067 @staticmethod 

1068 def _get_varchar_limit(data_type: str | None) -> int | None: 

1069 """Extract the character limit from a varchar(N) type string. 

1070 

1071 Args: 

1072 data_type: Data type string, e.g. 'varchar(50)' 

1073 

1074 Returns: 

1075 The limit N, or None if not a varchar type 

1076 """ 

1077 if data_type is None: 

1078 return None 

1079 import re as _re 

1080 

1081 match = _re.match(r"varchar\((\d+)\)", data_type.lower().strip()) 

1082 if match: 

1083 return int(match.group(1)) 

1084 return None 

1085 

1086 @staticmethod 

1087 def _get_default_value(data_type: str | None) -> Any: 

1088 """Get the permissive default value for a non-nullable column. 

1089 

1090 Args: 

1091 data_type: The configured data type 

1092 

1093 Returns: 

1094 0 for integer/numeric types, empty string for text/string types 

1095 """ 

1096 if data_type is None: 

1097 return "" 

1098 dt_lower = data_type.lower().strip() 

1099 if dt_lower in ("integer", "int", "bigint"): 

1100 return 0 

1101 if dt_lower in ("float", "double"): 

1102 return 0.0 

1103 return "" 

1104 

1105 def _validate_and_fix_row( 

1106 self, 

1107 row_data: dict[str, Any], 

1108 sync_columns: list[Any], 

1109 job: CrumpJob, 

1110 csv_row: dict[str, Any], 

1111 ) -> dict[str, Any] | None: 

1112 """Validate a transformed row and apply failure_mode rules. 

1113 

1114 Handles: 

1115 - Missing nullable fields → NULL (both modes) 

1116 - Missing non-nullable fields → skip row (STRICT), default value (PERMISSIVE) 

1117 - String exceeding varchar limit → skip row (STRICT), truncate (PERMISSIVE) 

1118 

1119 Args: 

1120 row_data: The transformed row data (db_column → value) 

1121 sync_columns: List of ColumnMapping objects 

1122 job: CrumpJob configuration 

1123 csv_row: The original CSV row (for context in logging) 

1124 

1125 Returns: 

1126 The validated/fixed row_data dict, or None if the row should be skipped 

1127 """ 

1128 failure_mode = job.failure_mode 

1129 

1130 for col_mapping in sync_columns: 

1131 db_col = col_mapping.db_column 

1132 

1133 # Determine if this column's value is missing from the CSV 

1134 # A value is "missing" if: 

1135 # - The db_col key is absent from row_data, OR 

1136 # - The value is None (set by apply_row_transformations for missing CSV cols), OR 

1137 # - The CSV column was not present in the original row (empty string artifact) 

1138 value = row_data.get(db_col) 

1139 is_missing = ( 

1140 db_col not in row_data 

1141 or value is None 

1142 or ( 

1143 value == "" 

1144 and col_mapping.csv_column is not None 

1145 and col_mapping.csv_column not in csv_row 

1146 ) 

1147 ) 

1148 

1149 if is_missing: 

1150 if col_mapping.nullable is False: 

1151 # Non-nullable field missing 

1152 if failure_mode == FailureMode.STRICT: 

1153 logger.warning( 

1154 f"STRICT mode: Skipping row - missing non-nullable field '{db_col}'" 

1155 ) 

1156 return None 

1157 else: 

1158 # PERMISSIVE: use default value 

1159 default = self._get_default_value(col_mapping.data_type) 

1160 logger.warning( 

1161 f"PERMISSIVE mode: Using default value {default!r} " 

1162 f"for missing non-nullable field '{db_col}'" 

1163 ) 

1164 row_data[db_col] = default 

1165 else: 

1166 # Nullable or unspecified → NULL 

1167 row_data[db_col] = None 

1168 

1169 # Check varchar limit 

1170 varchar_limit = self._get_varchar_limit(col_mapping.data_type) 

1171 if varchar_limit is not None and db_col in row_data and row_data[db_col] is not None: 

1172 value = str(row_data[db_col]) 

1173 if len(value) > varchar_limit: 

1174 if failure_mode == FailureMode.STRICT: 

1175 logger.warning( 

1176 f"STRICT mode: Skipping row - value for '{db_col}' " 

1177 f"exceeds varchar({varchar_limit}) limit " 

1178 f"(length {len(value)})" 

1179 ) 

1180 return None 

1181 else: 

1182 # PERMISSIVE: truncate 

1183 logger.warning( 

1184 f"PERMISSIVE mode: Truncating value for '{db_col}' " 

1185 f"from {len(value)} to {varchar_limit} characters" 

1186 ) 

1187 row_data[db_col] = value[:varchar_limit] 

1188 

1189 return row_data 

1190 

def _process_tabular_rows(
    self,
    reader: Any,
    job: CrumpJob,
    sync_columns: list[Any],
    primary_keys: list[str],
    filename_values: dict[str, str] | None = None,
) -> tuple[int, set[tuple]]:
    """Process and upsert tabular file rows into database.

    Each selected row is transformed with ``apply_row_transformations``,
    checked by ``_validate_and_fix_row`` and, unless rejected, upserted. When
    ``job.sample_percentage`` is set below 100 the whole file is read into
    memory first, because sampling needs the total row count; otherwise rows
    are streamed from ``reader``.

    Args:
        reader: Tabular file reader (DictReader interface)
        job: CrumpJob configuration
        sync_columns: List of ColumnMapping objects
        primary_keys: List of primary key column names
        filename_values: Optional dict of values extracted from filename

    Returns:
        Tuple of (rows_synced, synced_ids) where synced_ids are tuples of ID values

    Raises:
        ValueError: In STRICT mode, if at least one row was rejected and no
            row was synced
    """
    rows_synced = 0
    rows_skipped = 0
    synced_ids: set[tuple] = set()

    sample_percentage = job.sample_percentage
    if sample_percentage is not None and sample_percentage < 100:
        # Sampling needs the total count, so materialize the file once.
        all_rows = list(reader)
        total_rows = len(all_rows)
        rows = (
            row
            for row_index, row in enumerate(all_rows)
            if self._should_include_row(row_index, total_rows, sample_percentage)
        )
    else:
        # No sampling - stream rows without loading them into memory.
        rows = reader

    for row in rows:
        # Apply column transformations
        row_data = apply_row_transformations(
            row, sync_columns, job.filename_to_column, filename_values
        )

        # Validate and fix row based on failure_mode
        validated = self._validate_and_fix_row(row_data, sync_columns, job, row)
        if validated is None:
            rows_skipped += 1
            continue

        self.upsert_row(job.target_table, primary_keys, validated)

        # Track synced IDs as tuples (for compound key support)
        synced_ids.add(tuple(validated[id_col.db_column] for id_col in job.id_mapping))
        rows_synced += 1

    if rows_skipped > 0:
        logger.warning(f"Skipped {rows_skipped} rows due to validation failures")

    # In STRICT mode, if the file had rows but ALL were rejected, raise an error
    if job.failure_mode == FailureMode.STRICT and rows_skipped > 0 and rows_synced == 0:
        raise ValueError(
            f"STRICT mode: All {rows_skipped} row(s) were rejected due to "
            f"validation failures. No data was imported into '{job.target_table}'."
        )

    return rows_synced, synced_ids

1275 

def _count_and_track_tabular_rows(
    self,
    file_path: Path,
    job: CrumpJob,
    sync_columns: list[Any],
    filename_values: dict[str, str] | None = None,
) -> tuple[int, set[tuple]]:
    """Count file rows and collect their ID tuples without touching the database.

    The file is opened with the reader for its detected format and each
    selected row is passed through ``apply_row_transformations``. Sampling
    follows ``job.sample_percentage`` via ``_should_include_row``. Rows are
    NOT run through ``_validate_and_fix_row``, so the count is an upper bound:
    rows that validation would reject during a real sync are still counted.

    Args:
        file_path: Path to tabular file (CSV or Parquet)
        job: CrumpJob configuration
        sync_columns: List of ColumnMapping objects
        filename_values: Optional dict of values extracted from filename

    Returns:
        Tuple of (row_count, synced_ids) where synced_ids are tuples of ID values
    """
    row_count = 0
    synced_ids: set[tuple] = set()

    file_format = _detect_file_format(file_path)
    sample_percentage = job.sample_percentage

    with create_reader(file_path, file_format=file_format) as reader:
        if sample_percentage is not None and sample_percentage < 100:
            # Sampling needs the total count, so materialize the file once.
            all_rows = list(reader)
            total_rows = len(all_rows)
            rows = (
                row
                for row_index, row in enumerate(all_rows)
                if self._should_include_row(row_index, total_rows, sample_percentage)
            )
        else:
            # No sampling - stream rows straight from the reader.
            rows = reader

        # Consume inside the `with` so the reader is still open.
        for row in rows:
            row_data = apply_row_transformations(
                row, sync_columns, job.filename_to_column, filename_values
            )

            # Track synced IDs as tuples (for compound key support)
            synced_ids.add(tuple(row_data[id_col.db_column] for id_col in job.id_mapping))
            row_count += 1

    return row_count, synced_ids

1337 

1338 def _prepare_sync( 

1339 self, file_path: Path, job: CrumpJob 

1340 ) -> tuple[set[str], list[Any], dict[str, str]]: 

1341 """Prepare for sync by validating CSV and building schema definitions. 

1342 

1343 Args: 

1344 file_path: Path to tabular file (CSV or Parquet) 

1345 job: CrumpJob configuration 

1346 

1347 Returns: 

1348 Tuple of (csv_columns, sync_columns, columns_def) 

1349 

1350 Raises: 

1351 FileNotFoundError: If CSV file doesn't exist 

1352 ValueError: If CSV is invalid or columns don't match 

1353 """ 

1354 if not file_path.exists(): 

1355 raise FileNotFoundError(f"File not found: {file_path}") 

1356 

1357 file_format = _detect_file_format(file_path) 

1358 

1359 with create_reader(file_path, file_format=file_format) as reader: 

1360 if not reader.fieldnames: 

1361 raise ValueError("File has no columns") 

1362 csv_columns = set(reader.fieldnames) 

1363 

1364 # Validate and determine columns to sync 

1365 id_csv_columns = self._validate_id_columns(job, csv_columns) 

1366 sync_columns = self._determine_sync_columns(job, csv_columns, id_csv_columns) 

1367 

1368 # Build schema definitions 

1369 columns_def = self._build_column_definitions(sync_columns, job) 

1370 

1371 return csv_columns, sync_columns, columns_def 

1372 

def sync_tabular_file_dry_run(
    self,
    file_path: Path,
    job: CrumpJob,
    filename_values: dict[str, str] | None = None,
) -> DryRunSummary:
    """Report what a sync of this file would change, without writing anything.

    Args:
        file_path: Path to tabular file (CSV or Parquet)
        job: CrumpJob configuration
        filename_values: Optional dict of values extracted from filename

    Returns:
        DryRunSummary holding: whether the table exists, the columns and
        indexes that are configured but absent from an existing table, the
        number of file rows that would be processed, and the number of stale
        rows that would be deleted

    Raises:
        FileNotFoundError: If the input file doesn't exist
        ValueError: If the file is invalid or columns don't match
    """
    summary = DryRunSummary()
    summary.table_name = job.target_table

    # Validates the file and builds the schema; the raw column set is unused.
    _, sync_columns, columns_def = self._prepare_sync(file_path, job)

    summary.table_exists = self.table_exists(job.target_table)

    if summary.table_exists:
        # Columns the config defines but the table does not have yet
        known_columns = self.get_existing_columns(job.target_table)
        summary.new_columns.extend(
            (name, sql_type)
            for name, sql_type in columns_def.items()
            if name.lower() not in known_columns
        )

        # Indexes the config defines but the table does not have yet
        if job.indexes:
            known_indexes = self.get_existing_indexes(job.target_table)
            summary.new_indexes.extend(
                index.name for index in job.indexes if index.name.lower() not in known_indexes
            )

    # NOTE: every selected file row is counted, whether or not it differs from
    # what is already stored. Comparing against existing data would be
    # expensive for large datasets, so this is an upper bound.
    summary.rows_to_sync, synced_ids = self._count_and_track_tabular_rows(
        file_path, job, sync_columns, filename_values
    )

    # Stale rows can only exist if the table does and delete keys are configured.
    filename_config = job.filename_to_column
    if (
        filename_config
        and filename_values
        and summary.table_exists
        and filename_config.get_delete_key_columns()
    ):
        # Compound delete key: db column -> value extracted from the filename
        delete_key_values = {
            mapping.db_column: filename_values[key]
            for key, mapping in filename_config.columns.items()
            if mapping.use_to_delete_old_rows and key in filename_values
        }
        summary.rows_to_delete = self.count_stale_records_compound(
            job.target_table,
            [id_col.db_column for id_col in job.id_mapping],
            delete_key_values,
            synced_ids,
        )

    return summary

1445 

def sync_tabular_file(
    self,
    file_path: Path,
    job: CrumpJob,
    filename_values: dict[str, str] | None = None,
    enable_history: bool = False,
) -> int:
    """Sync a tabular file to the database using job configuration.

    Validates the file, creates or evolves the target table, upserts the
    rows, and then removes stale records when delete-key columns are
    configured under ``job.filename_to_column``. With ``enable_history`` a
    history record is written in all cases, success or failure; a failure to
    write that record is logged and never masks the sync result.

    Args:
        file_path: Path to tabular file (CSV or Parquet)
        job: CrumpJob configuration
        filename_values: Optional dict of values extracted from filename
        enable_history: Whether to record sync history

    Returns:
        Number of rows synced

    Raises:
        FileNotFoundError: If the input file doesn't exist
        ValueError: If the file is invalid or columns don't match
    """
    from crump.history import get_utc_now, record_sync_history

    # Track timing if history is enabled
    start_time = get_utc_now() if enable_history else None
    rows_synced = 0
    rows_deleted = 0
    schema_changed = False
    error_message: str | None = None
    success = False

    try:
        # Prepare sync (validates file and builds schema); the raw column
        # set is not needed here.
        _, sync_columns, columns_def = self._prepare_sync(file_path, job)

        # Build schema and setup table
        primary_keys = [id_col.db_column for id_col in job.id_mapping]
        logger.debug(f"Primary keys for table {job.target_table}: {primary_keys}")
        schema_changed = self._setup_table_schema(job, columns_def, primary_keys)

        # Process rows
        file_format = _detect_file_format(file_path)
        with create_reader(file_path, file_format=file_format) as reader:
            rows_synced, synced_ids = self._process_tabular_rows(
                reader, job, sync_columns, primary_keys, filename_values
            )

        # Clean up stale records
        filename_config = job.filename_to_column
        if filename_config and filename_values and filename_config.get_delete_key_columns():
            # Build compound key values from filename_values
            delete_key_values = {
                col_mapping.db_column: filename_values[col_name]
                for col_name, col_mapping in filename_config.columns.items()
                if col_mapping.use_to_delete_old_rows and col_name in filename_values
            }

            if delete_key_values:
                rows_deleted = self.delete_stale_records_compound(
                    job.target_table,
                    primary_keys,
                    delete_key_values,
                    synced_ids,
                )
            else:
                # Without any resolved key value the delete would not be
                # scoped to this file's slice of the table, so skip it.
                logger.warning(
                    f"Skipping stale-record cleanup for '{job.target_table}': "
                    f"no delete-key values could be resolved from the filename"
                )

        success = True
        return rows_synced

    except Exception as e:
        error_message = str(e)
        raise

    finally:
        # Record history if enabled and we have a backend
        if enable_history and self.backend and start_time:
            end_time = get_utc_now()
            try:
                record_sync_history(
                    backend=self.backend,
                    file_path=file_path,
                    table_name=job.target_table,
                    # A failed sync is reported as zero upserted rows.
                    rows_upserted=rows_synced if success else 0,
                    rows_deleted=rows_deleted,
                    schema_changed=schema_changed,
                    start_time=start_time,
                    end_time=end_time,
                    success=success,
                    error=error_message,
                )
            except Exception as hist_error:
                # Don't fail the sync if history recording fails
                logger.warning(f"Failed to record sync history: {hist_error}")

1540 

1541 

def sync_file_to_db(
    file_path: Path,
    job: CrumpJob,
    db_connection_string: str,
    filename_values: dict[str, str] | None = None,
    enable_history: bool = False,
) -> int:
    """Open a database connection and sync one tabular file through it.

    The connection is used as a context manager and is therefore released
    when the sync finishes or raises.

    Args:
        file_path: Path to the tabular file (CSV or Parquet)
        job: CrumpJob configuration
        db_connection_string: Database connection string (PostgreSQL or SQLite)
        filename_values: Optional dict of values extracted from filename
        enable_history: Whether to record sync history

    Returns:
        Number of rows synced
    """
    with DatabaseConnection(db_connection_string) as connection:
        rows_synced = connection.sync_tabular_file(
            file_path, job, filename_values, enable_history
        )
    return rows_synced

1563 

1564 

def sync_file_to_db_dry_run(
    file_path: Path,
    job: CrumpJob,
    db_connection_string: str,
    filename_values: dict[str, str] | None = None,
) -> DryRunSummary:
    """Open a database connection and run a dry-run sync of one tabular file.

    The connection is used as a context manager and is therefore released
    when the dry run finishes or raises.

    Args:
        file_path: Path to the tabular file (CSV or Parquet)
        job: CrumpJob configuration
        db_connection_string: Database connection string
        filename_values: Optional dict of values extracted from filename

    Returns:
        DryRunSummary with details of what would be changed
    """
    with DatabaseConnection(db_connection_string) as connection:
        summary = connection.sync_tabular_file_dry_run(file_path, job, filename_values)
    return summary

1584 

1585 

1586# Backward compatibility aliases