Coverage for nlp_manager/input_field_config.py: 72%

218 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-27 10:34 -0500

1""" 

2crate_anon/nlp_manager/input_field_config.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Class to define input fields for NLP.** 

27 

28""" 

29 

30import logging 

31 

32# import sys 

33from typing import Any, Dict, Generator, List, Optional, Tuple 

34 

35from cardinal_pythonlib.datetimefunc import get_now_utc_notz_datetime 

36from cardinal_pythonlib.hash import hash64 

37from cardinal_pythonlib.sql.validation import ( 

38 ensure_valid_field_name, 

39 ensure_valid_table_name, 

40) 

41from cardinal_pythonlib.sqlalchemy.core_query import count_star 

42from cardinal_pythonlib.sqlalchemy.schema import ( 

43 is_sqlatype_integer, 

44 get_column_type, 

45 table_or_view_exists, 

46) 

47from cardinal_pythonlib.timing import MultiTimerContext, timer 

48from sqlalchemy import ( 

49 BigInteger, 

50 Column, 

51 DateTime, 

52 Index, 

53 Integer, 

54 String, 

55 Table, 

56) 

57from sqlalchemy.engine.base import Engine 

58from sqlalchemy.orm.session import Session 

59from sqlalchemy.sql import and_, column, exists, null, or_, select, table 

60from sqlalchemy.sql.schema import MetaData 

61 

62from crate_anon.common.parallel import is_my_job_by_hash_prehashed 

63from crate_anon.common.sql import decorate_index_name 

64from crate_anon.common.stringfunc import relevant_for_nlp 

65from crate_anon.nlp_manager.constants import ( 

66 FN_CRATE_VERSION_FIELD, 

67 FN_WHEN_FETCHED, 

68 FN_NLPDEF, 

69 FN_PK, 

70 FN_SRCDATETIMEFIELD, 

71 FN_SRCDATETIMEVAL, 

72 FN_SRCDB, 

73 FN_SRCTABLE, 

74 FN_SRCPKFIELD, 

75 FN_SRCPKVAL, 

76 FN_SRCPKSTR, 

77 FN_SRCFIELD, 

78 TRUNCATED_FLAG, 

79 InputFieldConfigKeys, 

80 MAX_SEMANTIC_VERSION_STRING_LENGTH, 

81 MAX_STRING_PK_LENGTH, 

82) 

83from crate_anon.nlp_manager.constants import ( 

84 full_sectionname, 

85 SqlTypeDbIdentifier, 

86) 

87from crate_anon.nlp_manager.models import NlpRecord 

88from crate_anon.nlp_manager.nlp_definition import ( 

89 NlpConfigPrefixes, 

90 NlpDefinition, 

91) 

92from crate_anon.version import CRATE_VERSION 

93 

# Module-level logger for this file.
log = logging.getLogger(name=__name__)

# Timer names, used with cardinal_pythonlib's MultiTimerContext below:
# - the source-database SELECT performed by InputFieldConfig.gen_text;
TIMING_GEN_TEXT_SQL_SELECT = "gen_text_sql_select"
# - per-row processing of fetched source rows within gen_text;
TIMING_PROCESS_GEN_TEXT = "process_generated_text"
# - progress-database lookups (InputFieldConfig.get_progress_record);
TIMING_PROGRESS_DB_SELECT = "progress_db_select"
# - progress-database deletions.
TIMING_PROGRESS_DB_DELETE = "progress_db_delete"

100 

101 

102# ============================================================================= 

103# Input field definition 

104# ============================================================================= 

105 

106 

class InputFieldConfig:
    """
    Class defining an input field for NLP (containing text).

    See the documentation for the :ref:`NLP config file <nlp_config>`.
    """

    def __init__(self, nlpdef: NlpDefinition, cfg_input_name: str) -> None:
        """
        Read config from a configparser section, and also associate with a
        specific NLP definition.

        Args:
            nlpdef:
                :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`,
                the master NLP definition, referring to the master config file
                etc.
            cfg_input_name:
                config section name for the input field definition

        Raises:
            ValueError:
                if ``copyfields`` or ``indexed_copyfields`` contain
                duplicates, or if a field appears in ``indexed_copyfields``
                but not in ``copyfields``. Table/field names are also checked
                by the ``ensure_valid_*`` validators from cardinal_pythonlib.
        """
        self.name = cfg_input_name
        cfg = nlpdef.get_config_section(
            full_sectionname(NlpConfigPrefixes.INPUT, cfg_input_name)
        )

        self._nlpdef = nlpdef

        self._srcdb = cfg.opt_str(InputFieldConfigKeys.SRCDB)
        self._srctable = cfg.opt_str(InputFieldConfigKeys.SRCTABLE)
        self._srcpkfield = cfg.opt_str(InputFieldConfigKeys.SRCPKFIELD)
        self._srcfield = cfg.opt_str(InputFieldConfigKeys.SRCFIELD)
        # Optional date/time field (new in v0.18.52):
        self._srcdatetimefield = cfg.opt_str(
            InputFieldConfigKeys.SRCDATETIMEFIELD, required=False
        )
        # Copy fields are case-sensitive, to avoid our failure in renaming
        # SQLA Column objects to be lower-case (see get_copy_columns).
        self._copyfields = cfg.opt_multiline(
            InputFieldConfigKeys.COPYFIELDS
        )  # fieldnames
        self._indexed_copyfields = cfg.opt_multiline(
            InputFieldConfigKeys.INDEXED_COPYFIELDS
        )
        # A value of 0 (the default) means "no limit"; see gen_text.
        self._debug_row_limit = cfg.opt_int(
            InputFieldConfigKeys.DEBUG_ROW_LIMIT, default=0
        )

        ensure_valid_table_name(self._srctable)
        ensure_valid_field_name(self._srcpkfield)
        ensure_valid_field_name(self._srcfield)
        if self._srcdatetimefield:
            ensure_valid_field_name(self._srcdatetimefield)

        if len(set(self._indexed_copyfields)) != len(self._indexed_copyfields):
            raise ValueError(
                f"Redundant indexed_copyfields: {self._indexed_copyfields}"
            )

        if len(set(self._copyfields)) != len(self._copyfields):
            raise ValueError(f"Redundant copyfields: {self._copyfields}")

        indexed_not_copied = set(self._indexed_copyfields) - set(
            self._copyfields
        )
        if indexed_not_copied:
            raise ValueError(
                f"Fields in indexed_copyfields but not in "
                f"copyfields: {indexed_not_copied}"
            )

        # Overlap between the PK/text fields and the copy fields is
        # deliberately permitted: it's fine in SQL to say
        # "SELECT a, a FROM mytable", so users may copy source text fields
        # etc. if they wish.

        self._db = nlpdef.get_database(self._srcdb)

    # -------------------------------------------------------------------------
    # Properties
    # -------------------------------------------------------------------------

    @property
    def srcdb(self) -> str:
        """
        Returns the name of the source database.
        """
        return self._srcdb

    @property
    def srctable(self) -> str:
        """
        Returns the name of the source table.
        """
        return self._srctable

    @property
    def srcpkfield(self) -> str:
        """
        Returns the name of the primary key (PK) field (column) in the source
        table.
        """
        return self._srcpkfield

    @property
    def srcfield(self) -> str:
        """
        Returns the name of the text field (column) in the source table.
        """
        return self._srcfield

    @property
    def srcdatetimefield(self) -> Optional[str]:  # new in v0.18.52
        """
        Returns the name of the field (column) in the source table that defines
        the date/time of the source text. This field is optional in the config
        (read with ``required=False``), so it may be unset.
        """
        return self._srcdatetimefield

    @property
    def source_session(self) -> Session:
        """
        Returns the SQLAlchemy ORM :class:`Session` for the source database.
        """
        return self._db.session

    @property
    def _source_metadata(self) -> MetaData:
        """
        Returns the SQLAlchemy :class:`MetaData` for the source database,
        used for reflection (inspection) of the source database structure.
        """
        return self._db.metadata

    @property
    def _source_engine(self) -> Engine:
        """
        Returns the SQLAlchemy Core :class:`Engine` for the source database.
        """
        return self._db.engine

    @property
    def _progress_session(self) -> Session:
        """
        Returns the SQLAlchemy ORM :class:`Session` for the progress database.
        """
        return self._nlpdef.progressdb_session

    # -------------------------------------------------------------------------
    # Destination table structure
    # -------------------------------------------------------------------------

    @staticmethod
    def get_core_columns_for_dest() -> List[Column]:
        """
        Returns the columns used NLP destination tables, primarily describing
        the source. See :ref:`Standard NLP output columns
        <standard_nlp_output_columns>`.

        Returns:
            a list of SQLAlchemy :class:`Column` objects
        """
        return [
            Column(
                FN_PK,
                # Autoincrement under SQLite needs a trick:
                # https://docs.sqlalchemy.org/en/20/dialects/sqlite.html
                BigInteger().with_variant(Integer, "sqlite"),
                primary_key=True,
                autoincrement=True,
                comment="Arbitrary primary key (PK) of output record",
            ),
            Column(
                FN_NLPDEF,
                SqlTypeDbIdentifier,
                comment="Name of the NLP definition producing this row",
            ),
            Column(
                FN_SRCDB,
                SqlTypeDbIdentifier,
                comment="Source database name (from CRATE NLP config)",
            ),
            Column(
                FN_SRCTABLE, SqlTypeDbIdentifier, comment="Source table name"
            ),
            Column(
                FN_SRCPKFIELD,
                SqlTypeDbIdentifier,
                comment="PK field (column) name in source table",
            ),
            Column(
                FN_SRCPKVAL,
                BigInteger,
                comment="PK of source record (or integer hash of PK if the "
                "PK is a string)",
            ),
            Column(
                FN_SRCPKSTR,
                String(MAX_STRING_PK_LENGTH),
                comment=f"NULL if the table has an integer PK, but the PK "
                f"if the PK was a string, to deal with hash "
                f"collisions. Max length: {MAX_STRING_PK_LENGTH}",
            ),
            Column(
                FN_SRCFIELD,
                SqlTypeDbIdentifier,
                comment="Field (column) name of source text",
            ),
            Column(
                FN_SRCDATETIMEFIELD,
                SqlTypeDbIdentifier,
                comment="Date/time field (column) name in source table",
            ),
            Column(
                FN_SRCDATETIMEVAL,
                DateTime,
                nullable=True,
                comment="Date/time of source field",
            ),
            Column(
                FN_CRATE_VERSION_FIELD,
                String(MAX_SEMANTIC_VERSION_STRING_LENGTH),
                nullable=True,
                comment="Version of CRATE that generated this NLP record.",
            ),
            Column(
                FN_WHEN_FETCHED,
                DateTime,
                nullable=True,
                comment="Date/time that the NLP processor fetched the "
                "record from the source database (in UTC).",
            ),
        ]

    @staticmethod
    def get_core_indexes_for_dest(
        tablename: str, engine: Engine
    ) -> List[Index]:
        """
        Returns the core indexes to be applied to the destination tables.
        Primarily, these are for columns that refer to the source.

        Args:
            tablename:
                The name of the table to be used in the destination.
            engine:
                The destination database SQLAlchemy Engine.

        Returns:
            a list of SQLAlchemy :class:`Index` objects

        See
        - https://stackoverflow.com/questions/179085/multiple-indexes-vs-multi-column-indexes
        """  # noqa: E501
        return [
            Index(
                decorate_index_name("_idx_srcref", tablename, engine),
                # Remember, order matters; more to less specific
                # See also BaseNlpParser.delete_dest_record
                FN_SRCPKVAL,
                FN_NLPDEF,
                FN_SRCFIELD,
                FN_SRCTABLE,
                FN_SRCDB,
                FN_SRCPKSTR,
            ),
            Index(
                decorate_index_name("_idx_srcdate", tablename, engine),
                FN_SRCDATETIMEVAL,
            ),
            Index(
                decorate_index_name("_idx_deletion", tablename, engine),
                # We sometimes delete just using the following; see
                # BaseNlpParser.delete_where_srcpk_not
                FN_NLPDEF,
                FN_SRCFIELD,
                FN_SRCTABLE,
                FN_SRCDB,
            ),
        ]

    # -------------------------------------------------------------------------
    # Source table inspection
    # -------------------------------------------------------------------------

    def _require_table_or_view_exists(self) -> None:
        """
        Ensure that the source table exists, or raise :exc:`RuntimeError`.
        """
        if not table_or_view_exists(self._source_engine, self._srctable):
            raise RuntimeError(
                f"Missing source table: {self._srcdb}.{self._srctable}"
            )

    def _reflect_source_table(self) -> Table:
        """
        Check that the source table/view exists, then return a SQLAlchemy
        :class:`Table` for it, reflected from the source database via the
        source metadata.

        Raises:
            RuntimeError: if the source table/view does not exist.
        """
        self._require_table_or_view_exists()
        return Table(
            self._srctable,
            self._source_metadata,
            autoload_with=self._source_engine,
        )

    def get_copy_columns(self) -> List[Column]:
        """
        Returns the columns that the user has requested to be copied from the
        source table to the NLP destination table. Column types are read from
        the source database.

        Returns:
            a list of SQLAlchemy :class:`Column` objects

        Raises:
            RuntimeError:
                if the source table is missing, or if any requested copy field
                is absent from it (matching is case-sensitive).
        """
        source_table = self._reflect_source_table()
        wanted = set(self._copyfields)
        copy_columns = []  # type: List[Column]
        found = set()
        for c in source_table.columns:
            # Case-sensitive match. We used to force the copied column name
            # to lower case (e.g. quoted_name(copied.name.lower(), None)), but
            # that kept producing "Unconsumed column names" errors with e.g. a
            # source field of "Text"; copyfields are case-sensitive instead.
            if c.name in wanted:
                # NOTE(review): Column.copy() is deprecated in SQLAlchemy 1.4+.
                copy_columns.append(c.copy())
                found.add(c.name)
        missing = wanted - found
        if missing:
            raise RuntimeError(
                f"The following fields were requested to be copied but are "
                f"absent from the source (NB case-sensitive): {missing}"
            )
        return copy_columns

    def get_copy_indexes(self) -> List[Index]:
        """
        Returns indexes that should be made in the destination table for
        columns that the user has requested to be copied from the source.

        Returns:
            a list of SQLAlchemy :class:`Index` objects, one per indexed copy
            field, each named ``idx_<fieldname>``

        Raises:
            RuntimeError: if the source table is missing.
            ValueError:
                if any requested indexed field is absent from the source
                (matching is case-sensitive).
        """
        source_table = self._reflect_source_table()
        wanted = set(self._indexed_copyfields)
        copy_indexes = []  # type: List[Index]
        found = set()
        for c in source_table.columns:
            # Case-sensitive match; see get_copy_columns.
            if c.name in wanted:
                idx_name = f"idx_{c.name}"
                copy_indexes.append(Index(idx_name, c.copy()))
                found.add(c.name)
        missing = wanted - found
        if missing:
            raise ValueError(
                f"The following fields were requested to be copied/indexed but"
                f" are absent from the source (NB case-sensitive): {missing}"
            )
        return copy_indexes

    def is_pk_integer(self) -> bool:
        """
        Is the primary key (PK) of the source table an integer?

        Raises:
            ValueError: if the PK column's type cannot be determined.
        """
        pkcoltype = get_column_type(
            self._source_engine, self._srctable, self._srcpkfield
        )
        if not pkcoltype:
            raise ValueError(
                f"Unable to get column type for column "
                f"{self._srctable}.{self._srcpkfield}"
            )
        return is_sqlatype_integer(pkcoltype)

    # -------------------------------------------------------------------------
    # Fetching source data
    # -------------------------------------------------------------------------

    def gen_text(
        self, tasknum: int = 0, ntasks: int = 1
    ) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
        """
        Generate text strings from the source database, for NLP. Text fields
        that are NULL, empty, or contain only whitespace, are skipped.

        Args:
            tasknum:
                zero-based index of this task, when work is divided between
                ``ntasks`` parallel tasks
            ntasks:
                number of parallel tasks; if this is 1 (or less), every row is
                considered by this task

        Yields:
            tuple: ``text, dict``, where ``text`` is the source text and
            ``dict`` is a column-to-value mapping for all other fields (source
            reference fields, copy fields).

        Raises:
            RuntimeError:
                if ``ntasks > 1`` and ``tasknum`` is outside the range
                ``0`` to ``ntasks - 1``.
        """
        if ntasks > 1 and not 0 <= tasknum < ntasks:
            raise RuntimeError(
                f"Invalid tasknum {tasknum}; must be >=0 and <{ntasks}"
            )

        # ---------------------------------------------------------------------
        # Values that are constant to all items we will generate
        # (i.e. database/field *names*, plus CRATE version info)
        # ---------------------------------------------------------------------
        base_dict = {
            FN_SRCDB: self._srcdb,
            FN_SRCTABLE: self._srctable,
            FN_SRCPKFIELD: self._srcpkfield,
            FN_SRCFIELD: self._srcfield,
            FN_SRCDATETIMEFIELD: self._srcdatetimefield,
            FN_CRATE_VERSION_FIELD: CRATE_VERSION,
        }

        # ---------------------------------------------------------------------
        # Build a query
        # ---------------------------------------------------------------------
        session = self.source_session
        pkcol = column(self._srcpkfield)
        # ... don't use is_sqlatype_integer with this; it's a column clause,
        # not a full column definition.
        pk_is_integer = self.is_pk_integer()

        # Core columns occupy fixed positions in each result row; the
        # user-specified copy fields follow, in the order of self._copyfields.
        colindex_pk = 0
        colindex_text = 1
        colindex_datetime = 2
        colindex_remainder_start = 3
        selectcols = [
            pkcol,
            column(self._srcfield),
            (
                column(self._srcdatetimefield)
                if self._srcdatetimefield
                else null()
            ),
        ]
        selectcols.extend(column(extracol) for extracol in self._copyfields)

        # Rows are deliberately not ordered.
        query = select(*selectcols).select_from(table(self._srctable))

        # ---------------------------------------------------------------------
        # Plan our parallel-processing approach
        # ---------------------------------------------------------------------
        distribute_by_hash = False
        if ntasks > 1:
            if pk_is_integer:
                # Integer PK, so we can be efficient and bake the parallel
                # processing work division into the SQL:
                query = query.where(pkcol % ntasks == tasknum)
            else:
                # Otherwise, fetch all rows and select ours by PK hash below.
                distribute_by_hash = True

        # ---------------------------------------------------------------------
        # Execute the query
        # ---------------------------------------------------------------------
        nrows_returned = 0
        with MultiTimerContext(timer, TIMING_GEN_TEXT_SQL_SELECT):
            when_fetched = get_now_utc_notz_datetime()
            result = session.execute(query)
            try:
                for row in result:
                    with MultiTimerContext(timer, TIMING_PROCESS_GEN_TEXT):
                        pkval = row[colindex_pk]

                        # Deal with non-integer PKs
                        if pk_is_integer:
                            hashed_pk = None
                        else:
                            hashed_pk = hash64(pkval)
                            if (
                                distribute_by_hash
                                and not is_my_job_by_hash_prehashed(
                                    hashed_pk, tasknum, ntasks
                                )
                            ):
                                continue

                        # Optional debug limit on the number of rows
                        if 0 < self._debug_row_limit <= nrows_returned:
                            log.warning(
                                f"Table {self._srcdb}.{self._srctable}: not "
                                f"fetching more than {self._debug_row_limit} "
                                f"rows (in total for this process) due to "
                                f"debugging limits"
                            )
                            # The "finally" clause below closes the result.
                            return

                        text = row[colindex_text]

                        # Skip text that is absent/empty/whitespace-only.
                        if not relevant_for_nlp(text):
                            continue
                        # We don't modify (e.g. strip) the text, because our
                        # NLP processor may return relevant character
                        # positions, so we want those to be correct with
                        # respect to the source.

                        # Get everything else
                        other_values = dict(
                            zip(
                                self._copyfields,
                                row[colindex_remainder_start:],
                            )
                        )
                        if pk_is_integer:
                            other_values[FN_SRCPKVAL] = pkval  # an integer
                            other_values[FN_SRCPKSTR] = None
                        else:
                            other_values[FN_SRCPKVAL] = hashed_pk  # an integer
                            other_values[FN_SRCPKSTR] = pkval  # a string etc.
                        other_values[FN_SRCDATETIMEVAL] = row[
                            colindex_datetime
                        ]
                        other_values[FN_WHEN_FETCHED] = when_fetched
                        other_values.update(base_dict)
                        if (
                            self._nlpdef.truncate_text_at
                            and len(text) > self._nlpdef.truncate_text_at
                        ):
                            text = text[: self._nlpdef.truncate_text_at]
                            other_values[TRUNCATED_FLAG] = True
                        else:
                            other_values[TRUNCATED_FLAG] = False

                        yield text, other_values
                        nrows_returned += 1
            finally:
                # Release the database cursor even if the consumer abandons
                # this generator early (GeneratorExit) or an error occurs.
                # http://docs.sqlalchemy.org/en/latest/core/connections.html
                result.close()

    def get_count(self) -> int:
        """
        Counts records in the source table.

        Used for progress monitoring.
        """
        return count_star(
            session=self.source_session, tablename=self._srctable
        )

    def gen_src_pks(self) -> Generator[Tuple[int, Optional[str]], None, None]:
        """
        Generate integer PKs from the source table.

        For tables with an integer PK, yields tuples: ``pk_value, None``.

        For tables with a string PK, yields tuples: ``pk_hash, pk_value``.

        - Timing is subsumed under the timer named
          ``TIMING_DELETE_WHERE_NO_SOURCE``.
        """
        pk_is_integer = self.is_pk_integer()
        session = self.source_session
        query = select(column(self._srcpkfield)).select_from(
            table(self._srctable)
        )
        result = session.execute(query)
        try:
            for row in result:
                pkval = row[0]
                if pk_is_integer:
                    yield pkval, None
                else:
                    yield hash64(pkval), pkval
        finally:
            # Release the cursor even if iteration stops early.
            result.close()

    # -------------------------------------------------------------------------
    # Progress database
    # -------------------------------------------------------------------------

    def get_progress_record(
        self, srcpkval: int, srcpkstr: Optional[str] = None
    ) -> Optional[NlpRecord]:
        """
        Fetch a progress record for the given source record, if one exists.

        Args:
            srcpkval:
                the source PK value (or, for non-integer PKs, its integer
                hash, as produced by :meth:`gen_text`)
            srcpkstr:
                for non-integer PKs, the original PK value; if ``None``, the
                query is not filtered on the string PK

        Returns:
            :class:`crate_anon.nlp_manager.models.NlpRecord`, or ``None``
        """
        session = self._progress_session
        query = (
            session.query(NlpRecord)
            .filter(NlpRecord.srcdb == self._srcdb)
            .filter(NlpRecord.srctable == self._srctable)
            .filter(NlpRecord.srcpkval == srcpkval)
            .filter(NlpRecord.srcfield == self._srcfield)
            .filter(NlpRecord.nlpdef == self._nlpdef.name)
            # Order not important (though the order of the index certainly
            # is; see NlpRecord.__table_args__).
            # https://stackoverflow.com/questions/11436469/does-order-of-where-clauses-matter-in-sql  # noqa: E501
        )
        if srcpkstr is not None:
            query = query.filter(NlpRecord.srcpkstr == srcpkstr)
        with MultiTimerContext(timer, TIMING_PROGRESS_DB_SELECT):
            # This was surprisingly slow under SQL Server testing.
            return query.one_or_none()

    def delete_progress_records_where_srcpk_not(
        self, temptable: Optional[Table]
    ) -> None:
        """
        Delete progress records for this source table and NLP definition.

        Records are selected by source database, source table and NLP
        definition name. Note that they are *not* restricted by source text
        field, so records for every text field of this table are affected.

        If ``temptable`` is None, all such progress records are deleted.

        If ``temptable`` is a table, only those records whose source PK (the
        ``srcpkval``/``srcpkstr`` pair) is not present in the temporary table
        are deleted. (Used for deleting NLP records when the source has
        subsequently been deleted.)

        The progress session is committed afterwards.
        """
        progsession = self._progress_session
        log.debug(
            f"delete_progress_records_where_srcpk_not... "
            f"{self._srcdb}.{self._srctable} -> progressdb"
        )
        # Filtering on NlpRecord.srcpkfield was deemed unnecessary.
        prog_deletion_query = (
            progsession.query(NlpRecord)
            .filter(NlpRecord.srcdb == self._srcdb)
            .filter(NlpRecord.srctable == self._srctable)
            .filter(NlpRecord.nlpdef == self._nlpdef.name)
        )
        if temptable is not None:
            log.debug("... deleting selectively")
            temptable_pkvalcol = temptable.columns[FN_SRCPKVAL]
            temptable_pkstrcol = temptable.columns[FN_SRCPKSTR]
            # Keep a record if the temporary table has a row with the same
            # integer PK (or hash) AND the same string PK, treating
            # NULL == NULL as a match (integer-PK tables have NULL srcpkstr).
            prog_deletion_query = prog_deletion_query.filter(
                ~exists().where(
                    and_(
                        NlpRecord.srcpkval == temptable_pkvalcol,
                        or_(
                            NlpRecord.srcpkstr == temptable_pkstrcol,
                            and_(
                                NlpRecord.srcpkstr.is_(None),
                                temptable_pkstrcol.is_(None),
                            ),
                        ),
                    )
                )
            )
        else:
            log.debug("... deleting all")
        with MultiTimerContext(timer, TIMING_PROGRESS_DB_DELETE):
            prog_deletion_query.delete(synchronize_session=False)
            # http://docs.sqlalchemy.org/en/latest/orm/query.html#sqlalchemy.orm.query.Query.delete  # noqa: E501
        self._nlpdef.commit(progsession)

    def delete_all_progress_records(self) -> None:
        """
        Deletes **all** records from the progress database for this NLP
        definition (across all source tables/columns), then commits the
        progress session.
        """
        progsession = self._progress_session
        prog_deletion_query = progsession.query(NlpRecord).filter(
            NlpRecord.nlpdef == self._nlpdef.name
        )
        log.debug(
            f"delete_all_progress_records for NLP definition: "
            f"{self._nlpdef.name}"
        )
        with MultiTimerContext(timer, TIMING_PROGRESS_DB_DELETE):
            prog_deletion_query.delete(synchronize_session=False)
        self._nlpdef.commit(progsession)