Coverage for nlp_manager/nlp_manager.py: 20%

517 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-27 10:34 -0500

1#!/usr/bin/env python 

2 

3""" 

4crate_anon/nlp_manager/nlp_manager.py 

5 

6=============================================================================== 

7 

8 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

9 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

10 

11 This file is part of CRATE. 

12 

13 CRATE is free software: you can redistribute it and/or modify 

14 it under the terms of the GNU General Public License as published by 

15 the Free Software Foundation, either version 3 of the License, or 

16 (at your option) any later version. 

17 

18 CRATE is distributed in the hope that it will be useful, 

19 but WITHOUT ANY WARRANTY; without even the implied warranty of 

20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

21 GNU General Public License for more details. 

22 

23 You should have received a copy of the GNU General Public License 

24 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

25 

26=============================================================================== 

27 

28**Manage natural-language processing (NLP) via internal and external tools.** 

29 

30Speed testing: 

31 

32- 8 processes, extracting person, location from a mostly text database 

33- commit off during full (non-incremental) processing (much faster) 

34- needs lots of RAM; e.g. Java subprocess uses 1.4 Gb per process as an 

35 average (rises from ~250Mb to ~1.4Gb and falls; steady rise means memory 

36 leak!); tested on a 16 Gb machine. See also the ``max_external_prog_uses`` 

37 parameter. 

38 

39.. code-block:: python 

40 

41 from __future__ import division 

42 test_size_mb = 1887 

43 n_person_tags_found = 

44 n_locations_tags_found = 

45 time_s = 10333 # 10333 s for main bit; 10465 including indexing; is 2.9 hours 

46 speed_mb_per_s = test_size_mb / time_s 

47 

48... gives 0.18 Mb/s, and note that's 1.9 Gb of *text*, not of attachments. 

49 

50- With incremental option, and nothing to do: same run took 18 s. 

51- During the main run, snapshot CPU usage: 

52 

53 .. code-block:: none 

54 

55 java about 81% across all processes, everything else close to 0 

56 (using about 12 Gb RAM total) 

57 ... or 75-85% * 8 [from top] 

58 mysqld about 18% [from top] 

59 nlp_manager.py about 4-5% * 8 [from top] 

60 

61""" # noqa: E501 

62 

63# ============================================================================= 

64# Imports 

65# ============================================================================= 

66 

67import argparse 

68import csv 

69import json 

70import logging 

71import os 

72import sys 

73from typing import ( 

74 Any, 

75 Dict, 

76 List, 

77 Optional, 

78 Set, 

79 Tuple, 

80 TYPE_CHECKING, 

81) 

82 

83from cardinal_pythonlib.datetimefunc import get_now_utc_pendulum 

84from cardinal_pythonlib.fileops import purge 

85from cardinal_pythonlib.logs import configure_logger_for_colour 

86from cardinal_pythonlib.sqlalchemy.core_query import count_star 

87from cardinal_pythonlib.timing import MultiTimerContext, timer 

88from sqlalchemy.engine.base import Engine 

89from sqlalchemy.orm.session import Session 

90from sqlalchemy.schema import Column, Index, MetaData, Table 

91from sqlalchemy.types import BigInteger, String 

92 

93from crate_anon.anonymise.constants import ( 

94 DEFAULT_CHUNKSIZE, 

95 DEFAULT_REPORT_EVERY, 

96 TABLE_KWARGS, 

97 SEP, 

98) 

99from crate_anon.anonymise.dbholder import DatabaseHolder 

100from crate_anon.common.argparse_assist import ( 

101 RawDescriptionArgumentDefaultsRichHelpFormatter, 

102) 

103from crate_anon.common.constants import JSON_INDENT 

104from crate_anon.common.exceptions import call_main_with_exception_reporting 

105from crate_anon.common.formatting import print_record_counts 

106from crate_anon.common.inputfunc import gen_chunks_from_files 

107from crate_anon.common.stringfunc import relevant_for_nlp 

108from crate_anon.nlp_manager.all_processors import ( 

109 possible_processor_names_including_cloud, 

110 possible_processor_table, 

111) 

112from crate_anon.nlp_manager.base_nlp_parser import ( 

113 BaseNlpParser, 

114 TextProcessingFailed, 

115) 

116from crate_anon.nlp_manager.cloud_parser import Cloud 

117from crate_anon.nlp_manager.cloud_request import ( 

118 CloudRequest, 

119 CloudRequestListProcessors, 

120 CloudRequestProcess, 

121 CloudRequestQueueManagement, 

122 extract_nlprp_top_level_results, 

123 parse_nlprp_docresult_metadata, 

124) 

125from crate_anon.nlp_manager.cloud_request_sender import CloudRequestSender 

126from crate_anon.nlp_manager.cloud_run_info import CloudRunInfo 

127from crate_anon.nlp_manager.constants import ( 

128 DEFAULT_REPORT_EVERY_NLP, 

129 FN_SRCDB, 

130 FN_SRCFIELD, 

131 FN_SRCPKFIELD, 

132 FN_SRCPKSTR, 

133 FN_SRCPKVAL, 

134 FN_SRCTABLE, 

135 MAX_STRING_PK_LENGTH, 

136 NLP_CONFIG_ENV_VAR, 

137 NlpDefConfigKeys, 

138 TRUNCATED_FLAG, 

139) 

140from crate_anon.nlp_manager.input_field_config import InputFieldConfig 

141from crate_anon.nlp_manager.models import NlpRecord 

142from crate_anon.nlp_manager.nlp_definition import ( 

143 NlpDefinition, 

144 demo_nlp_config, 

145) 

146from crate_anon.version import CRATE_VERSION, CRATE_VERSION_DATE 

147 

148# from crate_anon.common.profiling import do_cprofile 

149 

150if TYPE_CHECKING: 

151 from http.cookiejar import CookieJar 

152 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Labels for timed sections. Those used in this part of the file are passed
# to MultiTimerContext(timer, <label>) to accumulate time under that label.
TIMING_DROP_REMAKE = "drop_remake"
TIMING_DELETE_WHERE_NO_SOURCE = "delete_where_no_source"
TIMING_PROGRESS_DB_ADD = "progress_db_add"
TIMING_PROGREC_TOTAL = "progrec_total"
TIMING_PROGREC_CREATE = "create_progrec"

160 

161 

162# ============================================================================= 

163# Simple information classes 

164# ============================================================================= 

165 

166 

class DbInfo:
    """
    Simple object carrying information about a database.
    Used by :func:`delete_where_no_source`.

    All attributes default to ``None``; :func:`delete_where_no_source`
    creates instances without a temporary table and assigns ``temptable``
    afterwards.
    """

    def __init__(
        self,
        session: Optional[Session] = None,
        engine: Optional[Engine] = None,
        metadata: Optional[MetaData] = None,
        db: Optional[DatabaseHolder] = None,
        temptable: Optional[Table] = None,
    ) -> None:
        """
        Args:
            session:
                SQLAlchemy session for the database
            engine:
                SQLAlchemy engine for the database
            metadata:
                SQLAlchemy metadata for the database
            db:
                the :class:`crate_anon.anonymise.dbholder.DatabaseHolder`
                for the database
            temptable:
                temporary table created in this database, if any
        """
        self.session = session
        self.engine = engine
        self.metadata = metadata
        self.db = db
        self.temptable = temptable

186 

187 

188# ============================================================================= 

189# Database operations 

190# ============================================================================= 

191 

192 

def delete_where_no_source(
    nlpdef: NlpDefinition,
    ifconfig: InputFieldConfig,
    report_every: int = DEFAULT_REPORT_EVERY,
    chunksize: int = DEFAULT_CHUNKSIZE,
) -> None:
    """
    Delete destination records where source records no longer exist.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
        ifconfig:
            :class:`crate_anon.nlp_manager.input_field_config.InputFieldConfig`
        report_every:
            report to the log every *n* source rows
        chunksize:
            insert into the SQLAlchemy session every *n* records

    Development thoughts:

    - Can't do this in a single SQL command, since the engine can't necessarily
      see both databases.
    - Can't use a single temporary table, since the progress database isn't
      necessarily the same as any of the destination database(s).
    - Can't do this in a multiprocess way, because we're trying to do a
      ``DELETE WHERE NOT IN``.
    - So my first attempt was: fetch all source PKs (which, by definition, do
      exist), stash them in memory, and do a ``DELETE WHERE NOT IN`` based on
      those specified values (or, if there are no PKs in the source, delete
      everything from the destination).

    Problems with that:

    - This is IMPERFECT if we have string source PKs and there are hash
      collisions (e.g. PKs for records X and Y both hash to the same thing;
      record X is deleted; then its processed version might not be).
    - With massive tables, we might run out of memory or (much more likely)
      SQL parameter slots. -- This is now happening; error looks like:
      pyodbc.ProgrammingError: ('The SQL contains 30807 parameter markers, but
      2717783 parameters were supplied', 'HY000')

    A better way might be:

    - for each table, make a temporary table in the same database
    - populate that table with (source PK integer/hash, source PK string) pairs
    - delete where pairs don't match -- is that portable SQL?
      https://stackoverflow.com/questions/7356108/sql-query-for-deleting-rows-with-not-in-using-2-columns  # noqa: E501

    More efficient would be to make one table per destination database.

    On the "delete where multiple fields don't match":

    - Single field syntax is

      .. code-block:: sql

        DELETE FROM a WHERE a1 NOT IN (SELECT b1 FROM b)

    - Multiple field syntax is

      .. code-block:: sql

        DELETE FROM a WHERE NOT EXISTS (
            SELECT 1 FROM b
            WHERE a.a1 = b.b1
            AND a.a2 = b.b2
        )

    - In SQLAlchemy, :func:`exists`:

      - https://stackoverflow.com/questions/14600619
      - https://docs.sqlalchemy.org/en/latest/core/selectable.html

    - Furthermore, in SQL ``NULL = NULL`` is false (it's null), and ``NULL <>
      NULL`` is also false (it's null), so we have to do an explicit null
      check. You do that with ``field == None``. See
      https://stackoverflow.com/questions/21668606. We're aiming, therefore,
      for:

      .. code-block:: sql

        DELETE FROM a WHERE NOT EXISTS (
            SELECT 1 FROM b
            WHERE a.a1 = b.b1
            AND (
                a.a2 = b.b2
                OR (a.a2 IS NULL AND b.b2 IS NULL)
            )
        )

    """

    def _is_skipped(proc: Any) -> bool:
        # Cloud processors that are not available remotely take no part.
        return isinstance(proc, Cloud) and not proc.available_remotely

    def _flush_chunk(chunk: List[Dict[str, Any]]) -> None:
        # Write one chunk of PK rows into every temporary table.
        n_rows = len(chunk)
        log.debug(f"... inserting {n_rows} records")
        for info in dbinfos:
            info.session.execute(info.temptable.insert(), chunk)
            nlpdef.notify_transaction(
                info.session, n_rows=n_rows, n_bytes=sys.getsizeof(chunk)
            )

    def _commit_everywhere() -> None:
        # Commit the session of every database involved.
        for info in dbinfos:
            nlpdef.commit(info.session)

    # Use info log level, otherwise it looks like our code hangs with very
    # large databases.
    log.info(
        f"delete_where_no_source: examining source table "
        f"{ifconfig.srcdb}.{ifconfig.srctable}; MAY BE SLOW"
    )

    # The progress database always comes first in the list...
    dbinfos = [
        DbInfo(
            session=nlpdef.progressdb_session,
            engine=nlpdef.progressdb_engine,
            metadata=nlpdef.progressdb_metadata,
            db=nlpdef.progdb,
            temptable=None,
        )
    ]

    # ... followed by each distinct destination database of the processors.
    for processor in nlpdef.processors:  # of type TableMaker
        if _is_skipped(processor):
            continue
        dest_session = processor.dest_session
        already_listed = any(
            info.session == dest_session for info in dbinfos
        )
        if already_listed:
            continue
        dbinfos.append(
            DbInfo(
                session=dest_session,
                engine=processor.dest_engine,
                metadata=processor.dest_metadata,
                db=processor.destdb,
            )
        )

    # One temporary table per database (a separate Table object for each
    # metadata/engine).
    log.info(f"... using {len(dbinfos)} destination database(s)")
    log.info("... dropping (if exists) and creating temporary table(s)")
    for info in dbinfos:
        fresh_table = Table(
            nlpdef.temporary_tablename,
            info.metadata,
            Column(FN_SRCPKVAL, BigInteger),  # not PK, as may be a hash
            Column(FN_SRCPKSTR, String(MAX_STRING_PK_LENGTH)),
            **TABLE_KWARGS,
        )
        fresh_table.drop(info.engine, checkfirst=True)
        fresh_table.create(info.engine, checkfirst=True)
        info.temptable = fresh_table

    # Copy all source PKs into the temporary tables, chunk by chunk.
    n_source = count_star(ifconfig.source_session, ifconfig.srctable)
    log.info(
        f"... populating temporary table(s): {n_source} records to go; "
        f"working in chunks of {chunksize}"
    )
    pending = []  # type: List[Dict[str, Any]]
    for rownum, (pkval, pkstr) in enumerate(ifconfig.gen_src_pks(), start=1):
        if report_every and rownum % report_every == 0:
            log.info(f"... src row# {rownum} / {n_source}")
        pending.append({FN_SRCPKVAL: pkval, FN_SRCPKSTR: pkstr})
        if rownum % chunksize == 0:
            _flush_chunk(pending)
            pending = []
    if pending:  # remainder
        _flush_chunk(pending)

    _commit_everywhere()

    # Index, for speed
    log.info("... creating index(es) on temporary table(s)")
    for info in dbinfos:
        pk_index = Index("_temptable_idx", info.temptable.columns[FN_SRCPKVAL])
        pk_index.create(info.engine)

    # DELETE FROM desttable WHERE destpk NOT IN (SELECT srcpk FROM temptable)
    log.info("... deleting from progress/destination DBs where appropriate")

    # Progress database: the first entry in our list.
    ifconfig.delete_progress_records_where_srcpk_not(dbinfos[0].temptable)

    # Destination databases, via each processor.
    for processor in nlpdef.processors:
        if _is_skipped(processor):
            continue
        matching = [
            info for info in dbinfos if info.session == processor.dest_session
        ]
        processor.delete_where_srcpk_not(ifconfig, matching[0].temptable)

    log.info("... dropping temporary table(s)")
    for info in dbinfos:
        info.temptable.drop(info.engine, checkfirst=True)

    _commit_everywhere()

    # Update metadata to reflect the fact that the temporary tables have been
    # dropped
    for info in dbinfos:
        info.db.update_metadata()

417 

418 

def drop_remake(
    nlpdef: NlpDefinition,
    incremental: bool = False,
    skipdelete: bool = False,
    report_every: int = DEFAULT_REPORT_EVERY,
    chunksize: int = DEFAULT_CHUNKSIZE,
) -> None:
    """
    Drop output tables and recreate them.

    In incremental mode nothing is dropped: tables are created only if
    absent, and (unless ``skipdelete`` is set) destination/progress records
    whose source record has vanished are deleted. In full mode the tables
    are dropped and remade, and all progress records for each input field
    are deleted.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
        incremental:
            incremental processing mode?
        skipdelete:
            For incremental updates, skip deletion of rows present in the
            destination but not the source
        report_every:
            report to the log every *n* source rows
        chunksize:
            insert into the SQLAlchemy session every *n* records
    """
    # Not parallel.
    full = not incremental

    # -------------------------------------------------------------------------
    # 1. Progress database
    # -------------------------------------------------------------------------
    engine = nlpdef.progressdb_engine
    if full:
        log.debug("Dropping progress tables")
        # noinspection PyUnresolvedReferences
        NlpRecord.__table__.drop(engine, checkfirst=True)
    log.info("Creating progress table (with index)")
    # noinspection PyUnresolvedReferences
    NlpRecord.__table__.create(engine, checkfirst=True)

    # -------------------------------------------------------------------------
    # 2. Output database(s)
    # -------------------------------------------------------------------------
    tables_made = []  # type: List[str]
    for processor in nlpdef.processors:
        if isinstance(processor, Cloud) and not processor.available_remotely:
            continue
        just_made = processor.make_tables(drop_first=full)
        for tablename in just_made:
            if tablename not in tables_made:
                continue
            log.warning(
                f"An NLP processor has tried to re-make a table "
                f"made by one of its colleagues: {tablename}"
            )
        tables_made.extend(just_made)

    # -------------------------------------------------------------------------
    # 3. Delete WHERE NOT IN for incremental
    # -------------------------------------------------------------------------
    for ifconfig in nlpdef.inputfieldconfigs:
        with MultiTimerContext(timer, TIMING_DELETE_WHERE_NO_SOURCE):
            if full:
                ifconfig.delete_all_progress_records()
            elif not skipdelete:
                delete_where_no_source(
                    nlpdef,
                    ifconfig,
                    report_every=report_every,
                    chunksize=chunksize,
                )

    # -------------------------------------------------------------------------
    # 4. Overall commit (superfluous)
    # -------------------------------------------------------------------------
    nlpdef.commit_all()

489 

490 

491# ============================================================================= 

492# Core functions: local NLP 

493# ============================================================================= 

494 

495 

def process_nlp(
    nlpdef: NlpDefinition,
    incremental: bool = False,
    report_every: int = DEFAULT_REPORT_EVERY_NLP,
    tasknum: int = 0,
    ntasks: int = 1,
) -> None:
    """
    Main NLP processing function. Fetch text, send it to the local
    (non-cloud) NLP processor(s), which store the results, and make a note
    in the progress database.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
        incremental:
            Incremental processing (skip previously processed records).
        report_every:
            Report to the log every *n* source rows.
        tasknum:
            Which task number am I?
        ntasks:
            How many tasks are there in total?

    Raises:
        ValueError:
            if the NLP definition has no non-cloud processors
    """
    # NOTE(review): block nesting here was reconstructed from a listing whose
    # indentation had been flattened -- confirm against the repository copy,
    # in particular the placement of session.add() and notify_transaction().
    log.info(SEP + "NLP")
    session = nlpdef.progressdb_session
    if not nlpdef.noncloud_processors:
        errmsg = (
            f"Can't use NLP definition {nlpdef.name!r} as it has no "
            f"local processors (e.g. only has cloud processors). Specify the "
            f"cloud option to process via the cloud."
        )
        log.critical(errmsg)
        raise ValueError(errmsg)

    for ifconfig in nlpdef.inputfieldconfigs:
        i = 0  # record count within this process
        recnum = tasknum  # record count overall
        totalcount = ifconfig.get_count()  # total number of records in table
        for text, other_values in ifconfig.gen_text(
            tasknum=tasknum, ntasks=ntasks
        ):
            log.debug(len(text))
            i += 1
            pkval = other_values[FN_SRCPKVAL]
            pkstr = other_values[FN_SRCPKSTR]
            if report_every and i % report_every == 0:
                log.info(
                    "Processing {db}.{t}.{c}, PK: {pkf}={pkv} "
                    "({overall}record {approx}{recnum}/{totalcount})"
                    "{thisproc}".format(
                        db=other_values[FN_SRCDB],
                        t=other_values[FN_SRCTABLE],
                        c=other_values[FN_SRCFIELD],
                        pkf=other_values[FN_SRCPKFIELD],
                        pkv=pkstr if pkstr else pkval,
                        overall="overall " if ntasks > 1 else "",
                        approx="~" if pkstr and ntasks > 1 else "",
                        # ... string hashing means approx. distribution
                        recnum=recnum + 1,
                        totalcount=totalcount,
                        thisproc=(
                            " ({i}/~{proccount} this process)".format(
                                i=i, proccount=totalcount // ntasks
                            )
                            if ntasks > 1
                            else ""
                        ),
                    )
                )
            # Each record seen by this task advances the overall position by
            # the number of tasks.
            recnum += ntasks
            # log.debug("other_values={}".format(repr(other_values)))
            srchash = nlpdef.hash(text)

            # In incremental mode, skip records whose stored hash matches the
            # hash of the current text.
            progrec = None
            if incremental:
                progrec = ifconfig.get_progress_record(pkval, pkstr)
                if progrec is not None:
                    if progrec.srchash == srchash:
                        log.debug("Record previously processed; skipping")
                        continue
                    else:
                        log.debug("Record has changed")
                else:
                    log.debug("Record is new")

            # Run every local processor on the text; in incremental mode,
            # delete that processor's previous output for this record first.
            processor_failure = False
            for processor in nlpdef.noncloud_processors:
                if incremental:
                    processor.delete_dest_record(
                        ifconfig, pkval, pkstr, commit=incremental
                    )
                try:
                    processor.process(text, other_values)
                except TextProcessingFailed:
                    processor_failure = True

            # If at least one processor failed, don't tell the progress
            # database that this record has been handled. That means that if
            # the administrator fixes this problem, this record will be
            # re-processed. (There may be some stray output from other,
            # successful, processors, but that will be deleted before
            # reprocessing in a subsequent incremental update.)
            if processor_failure:
                log.error(
                    f"At least one processor failed for this record "
                    f"(srctable={ifconfig.srctable!r}, "
                    f"pkfield={ifconfig.srcpkfield!r}, "
                    f"pkval={pkval!r}, pkstr={pkstr!r}); "
                    f"not marking it as processed."
                )
                continue

            # Make a note in the progress database that we've processed a
            # source record. Truncated text is only recorded as processed if
            # the NLP definition says so.
            truncated = other_values[TRUNCATED_FLAG]
            if not truncated or nlpdef.record_truncated_values:
                if progrec:  # modifying an existing record
                    progrec.whenprocessedutc = nlpdef.now
                    progrec.srchash = srchash
                else:  # creating a new record
                    progrec = NlpRecord(
                        # Quasi-key fields:
                        srcdb=ifconfig.srcdb,
                        srctable=ifconfig.srctable,
                        srcpkval=pkval,
                        srcpkstr=pkstr,
                        srcfield=ifconfig.srcfield,
                        nlpdef=nlpdef.name,
                        # Other fields:
                        srcpkfield=ifconfig.srcpkfield,
                        whenprocessedutc=nlpdef.now,
                        srchash=srchash,
                    )
                    with MultiTimerContext(timer, TIMING_PROGRESS_DB_ADD):
                        session.add(progrec)

                # In incremental mode, do we commit immediately, because other
                # processes may need this table promptly... ?

                # force_commit = False  # definitely wrong; crashes as below
                # force_commit = incremental
                force_commit = ntasks > 1

                # - A single source record should not be processed by >1 CRATE
                #   process. So in theory there should be no conflicts.
                # - However, databases can lock in various ways. Can we
                #   guarantee it'll do something sensible?
                # - See also
                #   https://en.wikipedia.org/wiki/Isolation_(database_systems)
                #   http://skien.cc/blog/2014/02/06/sqlalchemy-and-race-conditions-follow-up/  # noqa: E501
                #   http://docs.sqlalchemy.org/en/latest/core/connections.html?highlight=execution_options#sqlalchemy.engine.Connection.execution_options  # noqa: E501
                # - However, empirically, setting this to False gives
                #   "Transaction (Process ID xx) was deadlocked on lock
                #   resources with another process and has been chosen as the
                #   deadlock victim. Rerun the transaction." -- with a SELECT
                #   query.
                # - SQL Server uses READ COMMITTED as the default isolation
                #   level.
                # - https://technet.microsoft.com/en-us/library/jj856598(v=sql.110).aspx  # noqa: E501

                nlpdef.notify_transaction(
                    session=session,
                    n_rows=1,
                    n_bytes=sys.getsizeof(progrec),  # approx
                    force_commit=force_commit,
                )

    nlpdef.commit_all()

664 

665 

666# ============================================================================= 

667# Core functions: cloud NLP 

668# ============================================================================= 

669 

670 

def process_cloud_nlp(
    crinfo: CloudRunInfo,
    incremental: bool = False,
    report_every: int = DEFAULT_REPORT_EVERY_NLP,
) -> None:
    """
    Process text by sending it off to the cloud processors in queued mode.

    For every request that comes back with a queue ID, a line
    ``<input field config name>,<queue_id>`` is written to the cloud
    configuration's data file, so that results can be fetched (or the
    requests cancelled) later.

    Args:
        crinfo:
            :class:`crate_anon.nlp_manager.cloud_run_info.CloudRunInfo`
        incremental:
            incremental processing mode? (passed to the request sender)
        report_every:
            report to the log every *n* records (passed to the request
            sender)
    """
    log.info(SEP + "NLP")
    nlpdef = crinfo.nlpdef
    filename = crinfo.cloudcfg.data_filename
    # Start with blank file
    with open(filename, "w"):
        pass
    # Use append so that, if there's a problem part-way through, we don't lose
    # all data
    with open(filename, "a") as request_data:
        for ifconfig in nlpdef.inputfieldconfigs:
            generated_text = ifconfig.gen_text()
            global_recnum = 0  # Global record number within this ifconfig
            sender = CloudRequestSender(
                text_generator=generated_text,
                crinfo=crinfo,
                ifconfig=ifconfig,
                incremental=incremental,
                report_every=report_every,
            )

            records_left = True
            while records_left:
                (
                    cloud_requests,
                    records_left,
                    global_recnum,
                ) = sender.send_requests(global_recnum)
                for cloud_request in cloud_requests:
                    if cloud_request.queue_id:
                        request_data.write(
                            f"{ifconfig.name},{cloud_request.queue_id}\n"
                        )
                    else:
                        log.warning("Sent request does not contain queue_id.")
                # Push this batch's queue IDs out of the file buffer now, so
                # that they are not sitting only in memory if the process
                # dies before the file is closed.
                request_data.flush()

712 

713 

def retrieve_nlp_data(crinfo: CloudRunInfo, incremental: bool = False) -> None:
    """
    Try to retrieve the data from the cloud processors.

    Reads the ``<input field config name>,<queue_id>`` lines from the cloud
    configuration's data file and asks the server about each queued request.
    Results that are ready are processed and noted in the progress database.
    Requests that are not yet ready are written back to the file. The file is
    removed once every request has been dealt with.

    Args:
        crinfo:
            :class:`crate_anon.nlp_manager.cloud_run_info.CloudRunInfo`
        incremental:
            incremental processing mode? If so, existing destination records
            for each returned source record are deleted before the new
            results are stored.

    Raises:
        FileNotFoundError:
            if the request data file does not exist
    """
    # NOTE(review): block nesting here was reconstructed from a listing whose
    # indentation had been flattened -- confirm against the repository copy.
    nlpdef = crinfo.nlpdef
    session = nlpdef.progressdb_session
    cloudcfg = crinfo.cloudcfg
    filename = cloudcfg.data_filename
    if not os.path.exists(filename):
        log.error(
            f"File {filename!r} does not exist. "
            f"Request may not have been sent."
        )
        raise FileNotFoundError(filename)
    with open(filename, "r") as request_data:
        reqdata = request_data.readlines()
    i = 1  # number of requests
    cookies = None  # type: Optional[CookieJar]
    remaining_data = []  # type: List[str]
    ifconfig_cache = {}  # type: Dict[str, InputFieldConfig]
    all_ready = True  # not necessarily true, but need for later
    count = 0  # count of records before a write to the database
    uncommitted_data = False
    for line in reqdata:
        line = line.strip()
        if not line:
            # Tolerate blank lines (e.g. a stray trailing newline) rather
            # than crashing on the tuple unpacking below.
            continue
        # Are there any records (whether ready or not) associated with
        # the queue_id?
        records_exist = False
        if_section, queue_id = line.split(",")
        if if_section in ifconfig_cache:
            ifconfig = ifconfig_cache[if_section]
        else:
            ifconfig = InputFieldConfig(
                nlpdef=nlpdef, cfg_input_name=if_section
            )
            ifconfig_cache[if_section] = ifconfig
        # Source hashes already met within this request; a set, since it is
        # only used for membership tests.
        seen_srchashs = set()  # type: Set[str]
        cloud_request = CloudRequestProcess(crinfo=crinfo)
        cloud_request.set_queue_id(queue_id)
        log.info(f"Attempting to retrieve data from request #{i} ...")
        i += 1
        ready = cloud_request.check_if_ready(cookies)
        # Carry any cookies from this request over to the next one.
        if cloud_request.cookies:
            cookies = cloud_request.cookies

        if not ready:
            # If results are not ready for this particular queue_id, put
            # back in file
            remaining_data.append(f"{if_section},{queue_id}\n")
            all_ready = False
        else:
            docresultlist = extract_nlprp_top_level_results(
                cloud_request.nlp_data
            )
            for result in docresultlist:
                # There are records associated with the given queue_id
                records_exist = True
                uncommitted_data = True
                # 'metadata' is just 'other_values' from before
                _, pkval, pkstr, srchash = parse_nlprp_docresult_metadata(
                    result
                )
                progrec = None
                if incremental:
                    progrec = ifconfig.get_progress_record(pkval, pkstr)
                    crinfo.delete_dest_records(
                        ifconfig, pkval, pkstr, commit=True
                    )
                elif srchash in seen_srchashs:
                    progrec = ifconfig.get_progress_record(pkval, pkstr)
                seen_srchashs.add(srchash)
                # Make a note in the progress database that we've processed
                # a source record
                if progrec:  # modifying an existing record
                    progrec.whenprocessedutc = nlpdef.now
                    progrec.srchash = srchash
                else:  # creating a new record
                    progrec = NlpRecord(
                        # Quasi-key fields:
                        srcdb=ifconfig.srcdb,
                        srctable=ifconfig.srctable,
                        srcpkval=pkval,
                        srcpkstr=pkstr,
                        srcfield=ifconfig.srcfield,
                        nlpdef=nlpdef.name,
                        # Other fields:
                        srcpkfield=ifconfig.srcpkfield,
                        whenprocessedutc=nlpdef.now,
                        srchash=srchash,
                    )
                    with MultiTimerContext(timer, TIMING_PROGRESS_DB_ADD):
                        session.add(progrec)
                count += 1
            if records_exist:
                log.info("Request ready.")
                cloud_request.process_all()
                # Commit in batches, once enough records have accumulated.
                if count >= cloudcfg.limit_before_commit:
                    nlpdef.commit_all()
                    count = 0
                    uncommitted_data = False
            else:
                log.warning(f"No records found for queue_id {queue_id}.")
    if uncommitted_data:
        nlpdef.commit_all()
    if all_ready:
        os.remove(filename)
    else:
        # Put this here to avoid losing the queue_ids if something goes wrong
        with open(filename, "w") as request_data:
            for data in remaining_data:
                request_data.write(data)
        log.info(
            "There are still results to be processed. Re-run this "
            "command later to retrieve them."
        )

829 

830 

831# @do_cprofile 

def process_cloud_now(
    crinfo: CloudRunInfo,
    incremental: bool = False,
    report_every: int = DEFAULT_REPORT_EVERY_NLP,
) -> None:
    """
    Process text by sending it off to the cloud processors in non-queued mode.

    Results are handled as each batch of requests returns, and progress
    records are saved to the progress database batch by batch.

    Args:
        crinfo:
            :class:`crate_anon.nlp_manager.cloud_run_info.CloudRunInfo`
        incremental:
            incremental processing mode? If so, existing destination records
            for each returned source record are deleted, and an existing
            progress record is updated rather than a new one created.
        report_every:
            report to the log every *n* records (passed to the request
            sender)
    """
    # NOTE(review): block nesting here was reconstructed from a listing whose
    # indentation had been flattened -- confirm against the repository copy,
    # in particular whether progrecs.add() belongs to the "new record" branch.
    nlpdef = crinfo.nlpdef
    session = nlpdef.progressdb_session
    for ifconfig in nlpdef.inputfieldconfigs:
        global_recnum = 0  # Global record number within this ifconfig
        generated_text = ifconfig.gen_text()
        sender = CloudRequestSender(
            text_generator=generated_text,
            crinfo=crinfo,
            ifconfig=ifconfig,
            incremental=incremental,
            report_every=report_every,
            queue=False,
        )

        records_left = True
        while records_left:
            (
                cloud_requests,
                records_left,
                global_recnum,
            ) = sender.send_requests(global_recnum)
            # Progress records to be saved for this batch of requests.
            progrecs = set()  # type: Set[NlpRecord]
            for cloud_request in cloud_requests:
                # Failed requests are skipped: no results are stored and no
                # progress is recorded for them.
                if cloud_request.request_failed:
                    continue
                # (a) handle the actual data
                cloud_request.process_all()
                # (b) handle the progress records
                docresultlist = extract_nlprp_top_level_results(
                    cloud_request.nlp_data
                )
                for result in docresultlist:
                    # 'metadata' is just 'other_values' from before
                    _, pkval, pkstr, srchash = parse_nlprp_docresult_metadata(
                        result
                    )
                    progrec = None
                    if incremental:
                        # A word of explanation: to get here, the record must
                        # have already been found worthy of updating. This is
                        # now ensured by the CloudRequestSender, which will
                        # skip relevant unchanged records.
                        crinfo.delete_dest_records(
                            ifconfig, pkval, pkstr, commit=True
                        )
                        # Record progress in progress database
                        progrec = ifconfig.get_progress_record(pkval, pkstr)
                    # Check that we haven't already done the progrec for this
                    # record to avoid clashes - it's possible as each processor
                    # may contain results for each record and a set of results
                    # is a list of processors and their results
                    #
                    # if srchash in seen_srchashs:
                    #     progrec = ifconfig.get_progress_record(pkval, pkstr)
                    #
                    # Make a note in the progress database that we've processed
                    # a source record
                    if progrec:  # modifying an existing record
                        progrec.whenprocessedutc = nlpdef.now
                        progrec.srchash = srchash
                    else:  # creating a new record
                        progrec = NlpRecord(
                            # Quasi-key fields:
                            srcdb=ifconfig.srcdb,
                            srctable=ifconfig.srctable,
                            srcpkval=pkval,
                            srcpkstr=pkstr,
                            srcfield=ifconfig.srcfield,
                            nlpdef=nlpdef.name,
                            # Other fields:
                            srcpkfield=ifconfig.srcpkfield,
                            whenprocessedutc=nlpdef.now,
                            srchash=srchash,
                        )
                        progrecs.add(progrec)
            # Save this batch's progress records and commit before asking the
            # sender for the next batch.
            with MultiTimerContext(timer, TIMING_PROGRESS_DB_ADD):
                log.info("Adding to database...")
                session.bulk_save_objects(progrecs)
            session.commit()

    nlpdef.commit_all()

921 

922 

def cancel_request(nlpdef: NlpDefinition, cancel_all: bool = False) -> None:
    """
    Delete pending requests from the server's queue.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
        cancel_all:
            If true, ask the server to delete everything in the queue and
            then call ``purge`` on the cloud config's request data directory.
            If false, cancel only the queue IDs listed in the cloud config's
            request data file, then remove that file.

    Raises:
        FileNotFoundError:
            if ``cancel_all`` is false and the request data file does not
            exist (in which case the request may never have been sent).
    """
    nlpname = nlpdef.name
    cloudcfg = nlpdef.get_cloud_config_or_raise()
    cloud_request = CloudRequestQueueManagement(nlpdef=nlpdef)

    if cancel_all:
        # Deleting all from queue!
        cloud_request.delete_all_from_queue()
        log.info("All cloud requests cancelled.")
        # Should the files be deleted in the program or is that dangerous?
        # ... OK now we guarantee that CRATE will create and use a specific
        # directory.
        purge(cloudcfg.req_data_dir, "*")
        return
    # Otherwise:

    filename = cloudcfg.data_filename
    if not os.path.exists(filename):
        msg = (
            f"File {filename!r} does not exist. "
            f"Request may not have been sent."
        )
        log.error(msg)
        # Carry the explanation in the exception too, so that callers that
        # only see the exception still know which file was missing.
        raise FileNotFoundError(msg)
    queue_ids = []  # type: List[str]
    with open(filename, "r") as request_data:
        for line in request_data:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline) rather than
                # failing to unpack and cancelling nothing.
                continue
            # Each line is "<input field section>,<queue ID>"; only the
            # queue ID is needed here.
            _, queue_id = line.split(",")
            queue_ids.append(queue_id)
    cloud_request.delete_from_queue(queue_ids)
    # Remove the file with the request info
    os.remove(filename)
    log.info(f"Cloud request for nlp definition {nlpname} cancelled.")

959 

960 

def show_cloud_queue(nlpdef: NlpDefinition) -> None:
    """
    Fetch the user's queued cloud requests and write them to stdout as CSV.

    The CSV header is taken from the keys of the first queue entry. If the
    queue is empty, a log message is emitted and nothing is written.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
    """
    queue_manager = CloudRequestQueueManagement(nlpdef=nlpdef)
    queued = queue_manager.show_queue()
    if not queued:
        log.info("No requests in queue.")
        return
    csv_writer = None
    for rownum, row in enumerate(queued):
        if rownum == 0:
            # The first entry defines the columns.
            csv_writer = csv.DictWriter(sys.stdout, fieldnames=row.keys())
            csv_writer.writeheader()
        csv_writer.writerow(row)

976 

977 

def print_cloud_processors(
    nlpdef: NlpDefinition, indent: int = 4, sort_keys: bool = True
) -> None:
    """
    Ask the cloud server for its processors and print them to stdout as JSON.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
        indent:
            indentation passed to :func:`json.dumps`
        sort_keys:
            whether :func:`json.dumps` should sort dictionary keys
    """
    lister = CloudRequestListProcessors(nlpdef=nlpdef)
    # One info dictionary per remote processor.
    info_dicts = []
    for remote_proc in lister.get_remote_processors():
        info_dicts.append(remote_proc.infodict)
    print(json.dumps(info_dicts, indent=indent, sort_keys=sort_keys))

989 

990 

991# ============================================================================= 

992# Database info 

993# ============================================================================= 

994 

995 

def show_source_counts(nlpdef: NlpDefinition) -> None:
    """
    Print (to stdout) a heading, then the record count of the source table
    of every input field configuration in the NLP definition.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
    """
    print("SOURCE TABLE RECORD COUNTS:")
    table_counts = []  # type: List[Tuple[str, int]]
    for ifconfig in nlpdef.inputfieldconfigs:
        src_session = ifconfig.source_session
        src_db = ifconfig.srcdb
        src_table = ifconfig.srctable
        table_counts.append(
            (f"{src_db}.{src_table}", count_star(src_session, src_table))
        )
    print_record_counts(table_counts)

1013 

1014 

def show_dest_counts(nlpdef: NlpDefinition) -> None:
    """
    Print (to stdout) a heading, then the record count of every destination
    table of every processor in the NLP definition.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
    """
    print("DESTINATION TABLE RECORD COUNTS:")
    table_counts = []  # type: List[Tuple[str, int]]
    for processor in nlpdef.processors:
        dest_session = processor.dest_session
        dest_db = processor.dest_dbname
        # One (label, count) pair per destination table of this processor.
        table_counts.extend(
            (
                f"DESTINATION: {dest_db}.{dest_table}",
                count_star(dest_session, dest_table),
            )
            for dest_table in processor.get_tablenames()
        )
    print_record_counts(table_counts)

1032 

1033 

1034# ============================================================================= 

1035# NLP testing 

1036# ============================================================================= 

1037 

1038 

def test_nlp_stdin(nlpdef: NlpDefinition) -> None:
    """
    Interactively test the NLP definition's processor(s): read chunks of text
    from stdin, pass each relevant chunk to every processor, and log the
    results.

    Cloud processors are sent an immediate (non-queued) request with
    request/response debugging switched on; local processors are called via
    their ``parse()`` method.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
    """
    processors = nlpdef.processors
    processor_names = ", ".join(
        [p.friendly_name_with_section for p in processors]
    )
    log.info(f"Testing NLP processors: {processor_names}")

    # Cloud run information is only needed if a cloud processor is involved.
    crinfo = (
        CloudRunInfo(nlpdef, debug_post_request=True, debug_post_response=True)
        if nlpdef.uses_cloud_processors
        else None
    )

    text_chunks = gen_chunks_from_files(
        filenames=["-"],
        stdin_prompt="Please type lines of text to be processed. "
        "End with a blank line.",
        chunk_terminator_line="",
    )
    for text in text_chunks:
        if not relevant_for_nlp(text):
            log.info("Ignoring irrelevant line.")
            continue

        log.info(f"INPUT: {text!r}")
        result_found = False
        for p in processors:  # type: BaseNlpParser

            if not p.is_cloud_processor():
                # Local (non-cloud) NLP processor.
                assert isinstance(p, BaseNlpParser)
                for tablename, valuedict in p.parse(text):
                    result_found = True
                    log.info(f"RESULT: {tablename}: {valuedict}")
                continue

            # Cloud processor.
            assert crinfo is not None
            assert isinstance(p, Cloud)
            procreq = CloudRequestProcess(
                crinfo=crinfo,
                nlpdef=nlpdef,
                debug_post_request=True,
                debug_post_response=True,
            )
            procreq.add_text(text, metadata={})
            procreq.send_process_request(queue=False)
            results = extract_nlprp_top_level_results(procreq.nlp_data)
            # Flagged as found even though the reply may hold no actual
            # hits: there is at least a server response to show.
            result_found = True
            formatted_results = json.dumps(results, indent=JSON_INDENT)
            log.info(f"RESULTS:\n{formatted_results}")

        if not result_found:
            log.info("[No results.]")

1098 

1099 

1100# ============================================================================= 

1101# Main 

1102# ============================================================================= 

1103 

1104 

def inner_main() -> None:
    """
    Indirect command-line entry point. See command-line help.

    Builds the argument parser, validates the arguments, configures logging,
    and then does one of the following:

    - performs a single information/test action (demo config, processor
      listings, record counts, processor definitions, stdin test) and
      returns;
    - performs a single cloud queue action (cancel request(s), show queue)
      and returns;
    - otherwise runs the main work: (1) drop/remake of destination tables
      and/or (2) NLP processing (local, cloud, or cloud retrieval),
      followed by a report of the time taken.

    Raises:
        ValueError: if the process numbering arguments are inconsistent, if
            ``--cloud`` and ``--retrieve`` are combined, or if no NLP
            definition is given for an action that needs one.
    """
    version = f"Version {CRATE_VERSION} ({CRATE_VERSION_DATE})"
    description = f"NLP manager. {version}. Created by Rudolf Cardinal."

    # todo: better with a subcommand parser?

    # noinspection PyTypeChecker
    parser = argparse.ArgumentParser(
        description=description,
        formatter_class=RawDescriptionArgumentDefaultsRichHelpFormatter,
    )

    config_options = parser.add_argument_group("Config options")
    config_options.add_argument(
        "--config",
        help=f"Config file (overriding environment variable "
        f"{NLP_CONFIG_ENV_VAR})",
    )
    config_options.add_argument(
        "--nlpdef", help="NLP definition name (from config file)"
    )

    # --incremental and --full are mutually exclusive and both write to
    # args.incremental.
    # NOTE(review): the two options declare different defaults (True/False)
    # for the same destination; presumably argparse applies the default of
    # the first-registered option (i.e. incremental=True) -- confirm.
    mode_group = config_options.add_mutually_exclusive_group()
    mode_group.add_argument(
        "-i",
        "--incremental",
        dest="incremental",
        action="store_true",
        help="Process only new/changed information, where possible",
        default=True,
    )
    mode_group.add_argument(
        "-f",
        "--full",
        dest="incremental",
        action="store_false",
        help="Drop and remake everything",
        default=False,
    )

    config_options.add_argument(
        "--dropremake",
        action="store_true",
        help="Drop/remake destination tables only",
    )
    config_options.add_argument(
        "--skipdelete",
        dest="skipdelete",
        action="store_true",
        help="For incremental updates, skip deletion of rows "
        "present in the destination but not the source",
    )
    config_options.add_argument(
        "--nlp", action="store_true", help="Perform NLP processing only"
    )

    config_options.add_argument(
        "--chunksize",
        type=int,
        default=DEFAULT_CHUNKSIZE,
        help="Number of records copied in a chunk when copying PKs from one "
        "database to another",
    )

    reporting_options = parser.add_argument_group("Reporting options")
    # NOTE(review): this is a boolean ("store_true") flag and is only used
    # below to choose between DEBUG and INFO, so repeating it does not add
    # verbosity, despite the help text.
    reporting_options.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Be verbose (use twice for extra verbosity)",
    )
    reporting_options.add_argument(
        "--report_every_fast",
        type=int,
        default=DEFAULT_REPORT_EVERY,
        help="Report insert progress (for fast operations) every n rows in "
        "verbose mode",
    )
    reporting_options.add_argument(
        "--report_every_nlp",
        type=int,
        default=DEFAULT_REPORT_EVERY_NLP,
        help="Report progress for NLP every n rows in verbose mode",
    )
    reporting_options.add_argument(
        "--echo", action="store_true", help="Echo SQL"
    )
    reporting_options.add_argument(
        "--timing", action="store_true", help="Show detailed timing breakdown"
    )

    multiproc_options = parser.add_argument_group("Multiprocessing options")
    multiproc_options.add_argument(
        "--process",
        type=int,
        default=0,
        help="For multiprocess mode: specify process number",
    )
    multiproc_options.add_argument(
        "--nprocesses",
        type=int,
        default=1,
        help="For multiprocess mode: specify total number of processes "
        "(launched somehow, of which this is to be one)",
    )
    multiproc_options.add_argument(
        "--processcluster", default="", help="Process cluster name"
    )

    info_actions = parser.add_argument_group("Info actions")
    info_actions.add_argument("--version", action="version", version=version)
    info_actions.add_argument(
        "--democonfig", action="store_true", help="Print a demo config file"
    )
    info_actions.add_argument(
        "--listprocessors",
        action="store_true",
        help="Show all possible built-in NLP processor names",
    )
    info_actions.add_argument(
        "--describeprocessors",
        action="store_true",
        help="Show details of all built-in NLP processors",
    )
    info_actions.add_argument(
        "--test_nlp",
        action="store_true",
        help="Test the NLP processor(s) for the selected definition, "
        "by sending text from stdin to them",
    )
    info_actions.add_argument(
        "--print_local_processors",
        action="store_true",
        help="For the chosen NLP definition, establish which local NLP "
        "processors are involved (if any). Show detailed information "
        "about these processors (as NLPRP JSON), then stop",
    )
    info_actions.add_argument(
        "--print_cloud_processors",
        action="store_true",
        help=f"For the chosen NLP definition, establish the relevant cloud "
        f"server, if applicable (from the "
        f"{NlpDefConfigKeys.CLOUD_CONFIG!r} parameter). Ask that remote "
        f"server about its available NLP processors. Show detailed "
        f"information about these remote processors (as NLPRP JSON), "
        f"then stop",
    )
    info_actions.add_argument(
        "--count",
        action="store_true",
        help="Count records in source/destination databases, then stop",
    )

    cloud_options = parser.add_argument_group("Cloud options")
    cloud_options.add_argument(
        "--cloud",
        action="store_true",
        help="Use cloud-based NLP processing tools. Queued mode by default.",
    )
    cloud_options.add_argument(
        "--immediate",
        action="store_true",
        help="To be used with 'cloud'. Process immediately.",
    )
    cloud_options.add_argument(
        "--retrieve", action="store_true", help="Retrieve NLP data from cloud"
    )
    cloud_options.add_argument(
        "--cancelrequest",
        action="store_true",
        help="Cancel pending requests for the nlpdef specified",
    )
    cloud_options.add_argument(
        "--cancelall",
        action="store_true",
        help="Cancel all pending cloud requests. WARNING: this option "
        "cancels all pending requests - not just those for the nlp "
        "definition specified",
    )
    cloud_options.add_argument(
        "--showqueue",
        action="store_true",
        help="Shows all pending cloud requests.",
    )

    args = parser.parse_args()

    # Validate args
    if args.nprocesses < 1:
        raise ValueError("--nprocesses must be >=1")
    if args.process < 0 or args.process >= args.nprocesses:
        raise ValueError(
            "--process argument must be from 0 to (nprocesses - 1) inclusive"
        )
    if args.config:
        # A config file given on the command line overrides the environment
        # variable, by overwriting it for this process.
        os.environ[NLP_CONFIG_ENV_VAR] = args.config
    if args.cloud and args.retrieve:
        raise ValueError("--cloud and --retrieve cannot be used together")

    # Verbosity and logging
    # "mynames" tags the log output (and, below, the NLP definition's log
    # tag) with the cluster name and, in multiprocess mode, the process
    # number.
    mynames = []  # type: List[str]
    if args.processcluster:
        mynames.append(args.processcluster)
    if args.nprocesses > 1:
        mynames.append(f"proc{args.process}")
    loglevel = logging.DEBUG if args.verbose else logging.INFO
    rootlogger = logging.getLogger()
    configure_logger_for_colour(rootlogger, level=loglevel, extranames=mynames)

    # -------------------------------------------------------------------------
    # Information/test options
    # -------------------------------------------------------------------------

    # Demo config?
    if args.democonfig:
        print(demo_nlp_config())
        return

    # List or describe processors?
    if args.listprocessors:
        print("\n".join(possible_processor_names_including_cloud()))
        return
    if args.describeprocessors:
        print(possible_processor_table())
        return

    # Otherwise, we need a valid NLP definition.
    if args.nlpdef is None:
        raise ValueError(
            "Must specify nlpdef parameter (unless --democonfig, "
            "--listprocessors, or --describeprocessors used)"
        )

    # If neither --dropremake nor --nlp was requested, do both stages.
    everything = not any([args.dropremake, args.nlp])

    # Report args
    log.debug(f"arguments: {args}")

    # Load/validate config
    nlpdef = NlpDefinition(
        args.nlpdef, logtag="_".join(mynames).replace(" ", "_")
    )
    nlpdef.set_echo(args.echo)

    # Count only?
    if args.count:
        show_source_counts(nlpdef)
        show_dest_counts(nlpdef)
        return

    # Show configured processor definitions only?
    if args.print_local_processors:
        print(nlpdef.nlprp_local_processors_json())
        return
    if args.print_cloud_processors:
        print_cloud_processors(nlpdef)
        return

    # Test NLP processor via stdin?
    if args.test_nlp:
        test_nlp_stdin(nlpdef)
        return

    # -------------------------------------------------------------------------
    # Cloud queue manipulation options
    # -------------------------------------------------------------------------

    # Delete from queue - do this before Drop/Remake and return so we don't
    # drop all the tables just to cancel the request
    # Same for 'showqueue'. All of these need config as they require url etc.
    if args.cancelrequest:
        cancel_request(nlpdef)
        return
    if args.cancelall:
        cancel_request(nlpdef, cancel_all=args.cancelall)
        return
    if args.showqueue:
        show_cloud_queue(nlpdef)
        return

    # -------------------------------------------------------------------------
    # Main NLP options
    # -------------------------------------------------------------------------

    # crinfo remains None unless --cloud or --retrieve was given; only the
    # cloud/retrieve branches below use it.
    crinfo = None  # type: Optional[CloudRunInfo]  # for type checker!
    if args.cloud or args.retrieve:
        # Set appropriate things for cloud - need to do this before
        # drop_remake, but after cancel_request or show_cloud_queue to avoid
        # unnecessary requests
        cloudcfg = nlpdef.get_cloud_config_or_raise()
        CloudRequest.set_rate_limit(cloudcfg.rate_limit_hz)
        crinfo = CloudRunInfo(nlpdef)

    log.info(f"Starting: incremental={args.incremental}")
    start = get_now_utc_pendulum()
    timer.set_timing(args.timing, reset=True)

    # 1. Drop/remake tables. Single-tasking only.
    with MultiTimerContext(timer, TIMING_DROP_REMAKE):
        if args.dropremake or everything:
            drop_remake(
                nlpdef,
                incremental=args.incremental,
                skipdelete=args.skipdelete,
                report_every=args.report_every_fast,
                chunksize=args.chunksize,
            )

    # From here, in a multiprocessing environment, trap any errors simply so
    # we can report the process number clearly.

    # 2. NLP
    # --cloud takes precedence (queued unless --immediate), then --retrieve,
    # then local processing. --immediate is only consulted when --cloud is
    # set.
    if args.nlp or everything:
        if args.cloud:
            if args.immediate:
                process_cloud_now(
                    crinfo,
                    incremental=args.incremental,
                    report_every=args.report_every_nlp,
                )
            else:
                process_cloud_nlp(
                    crinfo,
                    incremental=args.incremental,
                    report_every=args.report_every_nlp,
                )
        elif args.retrieve:
            retrieve_nlp_data(crinfo, incremental=args.incremental)
        else:
            process_nlp(
                nlpdef,
                incremental=args.incremental,
                report_every=args.report_every_nlp,
                tasknum=args.process,
                ntasks=args.nprocesses,
            )

    log.info("Finished")
    end = get_now_utc_pendulum()
    time_taken = end - start
    log.info(f"Time taken: {time_taken.total_seconds():.3f} seconds")

    if args.timing:
        timer.report()

1452 

1453 

def main() -> None:
    """
    Command-line entry point.

    Hands :func:`inner_main` to ``call_main_with_exception_reporting``, which
    is responsible for running it.
    """
    call_main_with_exception_reporting(inner_main)

1459 

1460 

1461# ============================================================================= 

1462# Command-line entry point 

1463# ============================================================================= 

1464 

if __name__ == "__main__":
    # Run the command-line interface only when this file is executed as a
    # script, not when it is imported as a module.
    main()