Coverage for nlp_manager/nlp_manager.py: 20%
517 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
1#!/usr/bin/env python
3"""
4crate_anon/nlp_manager/nlp_manager.py
6===============================================================================
8 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
9 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
11 This file is part of CRATE.
13 CRATE is free software: you can redistribute it and/or modify
14 it under the terms of the GNU General Public License as published by
15 the Free Software Foundation, either version 3 of the License, or
16 (at your option) any later version.
18 CRATE is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 GNU General Public License for more details.
23 You should have received a copy of the GNU General Public License
24 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
26===============================================================================
28**Manage natural-language processing (NLP) via internal and external tools.**
30Speed testing:
32- 8 processes, extracting person, location from a mostly text database
33- commit off during full (non-incremental) processing (much faster)
34- needs lots of RAM; e.g. Java subprocess uses 1.4 Gb per process as an
35 average (rises from ~250Mb to ~1.4Gb and falls; steady rise means memory
36 leak!); tested on a 16 Gb machine. See also the ``max_external_prog_uses``
37 parameter.
39.. code-block:: python
41 from __future__ import division
42 test_size_mb = 1887
43 n_person_tags_found =
44 n_locations_tags_found =
45 time_s = 10333 # 10333 s for main bit; 10465 including indexing; is 2.9 hours
46 speed_mb_per_s = test_size_mb / time_s
48... gives 0.18 Mb/s, and note that's 1.9 Gb of *text*, not of attachments.
50- With incremental option, and nothing to do: same run took 18 s.
51- During the main run, snapshot CPU usage:
53 .. code-block:: none
55 java about 81% across all processes, everything else close to 0
56 (using about 12 Gb RAM total)
57 ... or 75-85% * 8 [from top]
58 mysqld about 18% [from top]
59 nlp_manager.py about 4-5% * 8 [from top]
61""" # noqa: E501
63# =============================================================================
64# Imports
65# =============================================================================
67import argparse
68import csv
69import json
70import logging
71import os
72import sys
73from typing import (
74 Any,
75 Dict,
76 List,
77 Optional,
78 Set,
79 Tuple,
80 TYPE_CHECKING,
81)
83from cardinal_pythonlib.datetimefunc import get_now_utc_pendulum
84from cardinal_pythonlib.fileops import purge
85from cardinal_pythonlib.logs import configure_logger_for_colour
86from cardinal_pythonlib.sqlalchemy.core_query import count_star
87from cardinal_pythonlib.timing import MultiTimerContext, timer
88from sqlalchemy.engine.base import Engine
89from sqlalchemy.orm.session import Session
90from sqlalchemy.schema import Column, Index, MetaData, Table
91from sqlalchemy.types import BigInteger, String
93from crate_anon.anonymise.constants import (
94 DEFAULT_CHUNKSIZE,
95 DEFAULT_REPORT_EVERY,
96 TABLE_KWARGS,
97 SEP,
98)
99from crate_anon.anonymise.dbholder import DatabaseHolder
100from crate_anon.common.argparse_assist import (
101 RawDescriptionArgumentDefaultsRichHelpFormatter,
102)
103from crate_anon.common.constants import JSON_INDENT
104from crate_anon.common.exceptions import call_main_with_exception_reporting
105from crate_anon.common.formatting import print_record_counts
106from crate_anon.common.inputfunc import gen_chunks_from_files
107from crate_anon.common.stringfunc import relevant_for_nlp
108from crate_anon.nlp_manager.all_processors import (
109 possible_processor_names_including_cloud,
110 possible_processor_table,
111)
112from crate_anon.nlp_manager.base_nlp_parser import (
113 BaseNlpParser,
114 TextProcessingFailed,
115)
116from crate_anon.nlp_manager.cloud_parser import Cloud
117from crate_anon.nlp_manager.cloud_request import (
118 CloudRequest,
119 CloudRequestListProcessors,
120 CloudRequestProcess,
121 CloudRequestQueueManagement,
122 extract_nlprp_top_level_results,
123 parse_nlprp_docresult_metadata,
124)
125from crate_anon.nlp_manager.cloud_request_sender import CloudRequestSender
126from crate_anon.nlp_manager.cloud_run_info import CloudRunInfo
127from crate_anon.nlp_manager.constants import (
128 DEFAULT_REPORT_EVERY_NLP,
129 FN_SRCDB,
130 FN_SRCFIELD,
131 FN_SRCPKFIELD,
132 FN_SRCPKSTR,
133 FN_SRCPKVAL,
134 FN_SRCTABLE,
135 MAX_STRING_PK_LENGTH,
136 NLP_CONFIG_ENV_VAR,
137 NlpDefConfigKeys,
138 TRUNCATED_FLAG,
139)
140from crate_anon.nlp_manager.input_field_config import InputFieldConfig
141from crate_anon.nlp_manager.models import NlpRecord
142from crate_anon.nlp_manager.nlp_definition import (
143 NlpDefinition,
144 demo_nlp_config,
145)
146from crate_anon.version import CRATE_VERSION, CRATE_VERSION_DATE
148# from crate_anon.common.profiling import do_cprofile
150if TYPE_CHECKING:
151 from http.cookiejar import CookieJar
153log = logging.getLogger(__name__)
155TIMING_DROP_REMAKE = "drop_remake"
156TIMING_DELETE_WHERE_NO_SOURCE = "delete_where_no_source"
157TIMING_PROGRESS_DB_ADD = "progress_db_add"
158TIMING_PROGREC_TOTAL = "progrec_total"
159TIMING_PROGREC_CREATE = "create_progrec"
162# =============================================================================
163# Simple information classes
164# =============================================================================
class DbInfo:
    """
    Simple object carrying information about a database.
    Used by :func:`delete_where_no_source`.

    Every attribute defaults to ``None``; callers fill in what they have, and
    ``temptable`` is typically assigned later, once the temporary table has
    been created.
    """

    def __init__(
        self,
        session: Optional[Session] = None,
        engine: Optional[Engine] = None,
        metadata: Optional[MetaData] = None,
        db: Optional[DatabaseHolder] = None,
        temptable: Optional[Table] = None,
    ) -> None:
        """
        Args:
            session: SQLAlchemy session for the database
            engine: SQLAlchemy engine for the database
            metadata: SQLAlchemy metadata for the database
            db: the :class:`DatabaseHolder` wrapping the database
            temptable: temporary table created in this database, if any
        """
        # The annotations are explicitly Optional because every default is
        # None (PEP 484 deprecates the implicit-Optional shorthand).
        self.session = session
        self.engine = engine
        self.metadata = metadata
        self.db = db
        self.temptable = temptable
188# =============================================================================
189# Database operations
190# =============================================================================
def delete_where_no_source(
    nlpdef: NlpDefinition,
    ifconfig: InputFieldConfig,
    report_every: int = DEFAULT_REPORT_EVERY,
    chunksize: int = DEFAULT_CHUNKSIZE,
) -> None:
    """
    Remove destination (and progress) records whose source record has gone.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
        ifconfig:
            :class:`crate_anon.nlp_manager.input_field_config.InputFieldConfig`
        report_every:
            report to the log every *n* source rows
        chunksize:
            insert into the SQLAlchemy session every *n* records

    How it works:

    - A temporary table of (source PK integer/hash, source PK string) pairs is
      built in the progress database and in every distinct destination
      database.
    - Each database then deletes its rows that have no matching pair in its
      own temporary table.
    - The temporary tables are dropped and the metadata refreshed.

    Why it works this way:

    - A single SQL command is not possible, since the engine can't
      necessarily see both databases.
    - A single temporary table is not possible, since the progress database
      isn't necessarily the same as any of the destination database(s).
    - A multiprocess approach is not possible, because the operation is a
      ``DELETE WHERE NOT IN``.
    - The first attempt fetched all source PKs into memory and issued a
      ``DELETE WHERE NOT IN`` with those literal values (or deleted
      everything if the source had no PKs). Problems with that:

      - It is IMPERFECT with string source PKs and hash collisions (e.g. PKs
        for records X and Y both hash to the same thing; record X is deleted;
        then its processed version might not be).
      - With massive tables, one can run out of memory or (much more likely)
        SQL parameter slots. That did happen; the error looked like:
        pyodbc.ProgrammingError: ('The SQL contains 30807 parameter markers,
        but 2717783 parameters were supplied', 'HY000')

    - Hence the per-database temporary table of pairs. See also
      https://stackoverflow.com/questions/7356108/sql-query-for-deleting-rows-with-not-in-using-2-columns  # noqa: E501
    - More efficient would be to make one table per destination database.

    On the "delete where multiple fields don't match":

    - Single field syntax is

      .. code-block:: sql

        DELETE FROM a WHERE a1 NOT IN (SELECT b1 FROM b)

    - Multiple field syntax is

      .. code-block:: sql

        DELETE FROM a WHERE NOT EXISTS (
            SELECT 1 FROM b
            WHERE a.a1 = b.b1
            AND a.a2 = b.b2
        )

    - In SQLAlchemy, :func:`exists`:

      - https://stackoverflow.com/questions/14600619
      - https://docs.sqlalchemy.org/en/latest/core/selectable.html

    - Furthermore, in SQL ``NULL = NULL`` is false (it's null), and ``NULL <>
      NULL`` is also false (it's null), so an explicit null check is needed.
      You do that with ``field == None``. See
      https://stackoverflow.com/questions/21668606. The target is therefore:

      .. code-block:: sql

        DELETE FROM a WHERE NOT EXISTS (
            SELECT 1 FROM b
            WHERE a.a1 = b.b1
            AND (
                a.a2 = b.b2
                OR (a.a2 IS NULL AND b.b2 IS NULL)
            )
        )

    """

    # -------------------------------------------------------------------------
    # Helpers (closures over "targets", which is built below)
    # -------------------------------------------------------------------------

    def _insert_everywhere(batch: List[Dict[str, Any]]) -> None:
        # Write one batch of PK pairs to the temporary table of every target.
        n_rows = len(batch)
        log.debug(f"... inserting {n_rows} records")
        for target_ in targets:
            target_.session.execute(target_.temptable.insert(), batch)
            nlpdef.notify_transaction(
                target_.session, n_rows=n_rows, n_bytes=sys.getsizeof(batch)
            )

    def _commit_everywhere() -> None:
        for target_ in targets:
            nlpdef.commit(target_.session)

    # -------------------------------------------------------------------------
    # Main code
    # -------------------------------------------------------------------------
    # Logged at info level; otherwise the program appears to hang on very
    # large databases.
    log.info(
        f"delete_where_no_source: examining source table "
        f"{ifconfig.srcdb}.{ifconfig.srctable}; MAY BE SLOW"
    )

    # The progress database always comes first (it is looked up by position
    # later on).
    targets = [
        DbInfo(
            session=nlpdef.progressdb_session,
            engine=nlpdef.progressdb_engine,
            metadata=nlpdef.progressdb_metadata,
            db=nlpdef.progdb,
        )
    ]

    # Then one entry per distinct destination session used by the processors.
    for processor in nlpdef.processors:  # of type TableMaker
        if isinstance(processor, Cloud) and not processor.available_remotely:
            continue
        dest_session = processor.dest_session
        already_listed = any(t.session == dest_session for t in targets)
        if not already_listed:
            targets.append(
                DbInfo(
                    session=dest_session,
                    engine=processor.dest_engine,
                    metadata=processor.dest_metadata,
                    db=processor.destdb,
                )
            )

    # One temporary table per database (a separate Table object each, since
    # each is bound to its own metadata).
    log.info(f"... using {len(targets)} destination database(s)")
    log.info("... dropping (if exists) and creating temporary table(s)")
    for target in targets:
        scratch = Table(
            nlpdef.temporary_tablename,
            target.metadata,
            Column(FN_SRCPKVAL, BigInteger),  # not PK, as may be a hash
            Column(FN_SRCPKSTR, String(MAX_STRING_PK_LENGTH)),
            **TABLE_KWARGS,
        )
        scratch.drop(target.engine, checkfirst=True)
        scratch.create(target.engine, checkfirst=True)
        target.temptable = scratch

    # Copy the source PKs into the temporary tables, in chunks.
    n_source = count_star(ifconfig.source_session, ifconfig.srctable)
    log.info(
        f"... populating temporary table(s): {n_source} records to go; "
        f"working in chunks of {chunksize}"
    )
    pending = []  # type: List[Dict[str, Any]]
    for rownum, (pkval, pkstr) in enumerate(ifconfig.gen_src_pks(), start=1):
        if report_every and rownum % report_every == 0:
            log.info(f"... src row# {rownum} / {n_source}")
        pending.append({FN_SRCPKVAL: pkval, FN_SRCPKSTR: pkstr})
        if rownum % chunksize == 0:
            _insert_everywhere(pending)
            pending = []
    if pending:  # final partial chunk
        _insert_everywhere(pending)

    _commit_everywhere()

    # Index the temporary tables, for speed.
    log.info("... creating index(es) on temporary table(s)")
    for target in targets:
        pk_column = target.temptable.columns[FN_SRCPKVAL]
        Index("_temptable_idx", pk_column).create(target.engine)

    # DELETE FROM desttable WHERE destpk NOT IN (SELECT srcpk FROM temptable)
    log.info("... deleting from progress/destination DBs where appropriate")

    # Progress database: always the first target.
    ifconfig.delete_progress_records_where_srcpk_not(targets[0].temptable)

    # Destination databases: each processor uses the temporary table living
    # in its own destination database.
    for processor in nlpdef.processors:
        if isinstance(processor, Cloud) and not processor.available_remotely:
            continue
        matching = [t for t in targets if t.session == processor.dest_session]
        processor.delete_where_srcpk_not(ifconfig, matching[0].temptable)

    # Tidy up.
    log.info("... dropping temporary table(s)")
    for target in targets:
        target.temptable.drop(target.engine, checkfirst=True)

    _commit_everywhere()

    # The temporary tables are gone; make the metadata reflect that.
    for target in targets:
        target.db.update_metadata()
def drop_remake(
    nlpdef: NlpDefinition,
    incremental: bool = False,
    skipdelete: bool = False,
    report_every: int = DEFAULT_REPORT_EVERY,
    chunksize: int = DEFAULT_CHUNKSIZE,
) -> None:
    """
    Drop output tables and recreate them.

    In full (non-incremental) mode the progress table and the processors'
    tables are dropped first; in incremental mode they are only created if
    missing, and stale rows are (optionally) deleted instead.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
        incremental: incremental processing mode?
        skipdelete:
            For incremental updates, skip deletion of rows present in the
            destination but not the source
        report_every: report to the log every *n* source rows
        chunksize: insert into the SQLAlchemy session every *n* records
    """
    # Not parallel.
    full = not incremental

    # -------------------------------------------------------------------------
    # 1. Progress database
    # -------------------------------------------------------------------------
    engine = nlpdef.progressdb_engine
    # noinspection PyUnresolvedReferences
    progress_table = NlpRecord.__table__
    if full:
        log.debug("Dropping progress tables")
        progress_table.drop(engine, checkfirst=True)
    log.info("Creating progress table (with index)")
    progress_table.create(engine, checkfirst=True)

    # -------------------------------------------------------------------------
    # 2. Output database(s)
    # -------------------------------------------------------------------------
    made_so_far = []  # type: List[str]
    for processor in nlpdef.processors:
        if isinstance(processor, Cloud) and not processor.available_remotely:
            continue
        just_made = processor.make_tables(drop_first=full)
        # Warn about tables that an earlier processor has already made (but
        # not about repeats within this processor's own list).
        clashes = [name for name in just_made if name in made_so_far]
        for name in clashes:
            log.warning(
                f"An NLP processor has tried to re-make a table "
                f"made by one of its colleagues: {name}"
            )
        made_so_far.extend(just_made)

    # -------------------------------------------------------------------------
    # 3. Delete WHERE NOT IN for incremental
    # -------------------------------------------------------------------------
    for ifconfig in nlpdef.inputfieldconfigs:
        with MultiTimerContext(timer, TIMING_DELETE_WHERE_NO_SOURCE):
            if full:
                ifconfig.delete_all_progress_records()
            elif not skipdelete:
                delete_where_no_source(
                    nlpdef,
                    ifconfig,
                    report_every=report_every,
                    chunksize=chunksize,
                )

    # -------------------------------------------------------------------------
    # 4. Overall commit (superfluous)
    # -------------------------------------------------------------------------
    nlpdef.commit_all()
491# =============================================================================
492# Core functions: local NLP
493# =============================================================================
def process_nlp(
    nlpdef: NlpDefinition,
    incremental: bool = False,
    report_every: int = DEFAULT_REPORT_EVERY_NLP,
    tasknum: int = 0,
    ntasks: int = 1,
) -> None:
    """
    Run the local (non-cloud) NLP processors over every input field.

    For each source text: optionally skip it if unchanged (incremental mode),
    pass it to each local processor, and then record it in the progress
    database unless a processor failed.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
        incremental:
            Incremental processing (skip previously processed records).
        report_every:
            Report to the log every *n* source rows.
        tasknum:
            Which task number am I?
        ntasks:
            How many tasks are there in total?

    Raises:
        ValueError: if the NLP definition has no local processors
    """
    log.info(SEP + "NLP")
    session = nlpdef.progressdb_session
    if not nlpdef.noncloud_processors:
        errmsg = (
            f"Can't use NLP definition {nlpdef.name!r} as it has no "
            f"local processors (e.g. only has cloud processors). Specify the "
            f"cloud option to process via the cloud."
        )
        log.critical(errmsg)
        raise ValueError(errmsg)

    multitask = ntasks > 1
    progress_template = (
        "Processing {db}.{t}.{c}, PK: {pkf}={pkv} "
        "({overall}record {approx}{recnum}/{totalcount})"
        "{thisproc}"
    )

    for ifconfig in nlpdef.inputfieldconfigs:
        n_this_process = 0  # record count within this process
        recnum = tasknum  # record count overall
        totalcount = ifconfig.get_count()  # total number of records in table
        text_source = ifconfig.gen_text(tasknum=tasknum, ntasks=ntasks)
        for text, other_values in text_source:
            log.debug(len(text))
            n_this_process += 1
            pkval = other_values[FN_SRCPKVAL]
            pkstr = other_values[FN_SRCPKSTR]

            if report_every and n_this_process % report_every == 0:
                if multitask:
                    thisproc = " ({i}/~{proccount} this process)".format(
                        i=n_this_process, proccount=totalcount // ntasks
                    )
                else:
                    thisproc = ""
                log.info(
                    progress_template.format(
                        db=other_values[FN_SRCDB],
                        t=other_values[FN_SRCTABLE],
                        c=other_values[FN_SRCFIELD],
                        pkf=other_values[FN_SRCPKFIELD],
                        pkv=pkstr if pkstr else pkval,
                        overall="overall " if multitask else "",
                        # String hashing means approx. distribution:
                        approx="~" if pkstr and multitask else "",
                        recnum=recnum + 1,
                        totalcount=totalcount,
                        thisproc=thisproc,
                    )
                )
            recnum += ntasks
            # log.debug("other_values={}".format(repr(other_values)))
            srchash = nlpdef.hash(text)

            # In incremental mode, look for a progress record; an unchanged
            # source hash means there is nothing to do for this record.
            progrec = None
            if incremental:
                progrec = ifconfig.get_progress_record(pkval, pkstr)
                if progrec is None:
                    log.debug("Record is new")
                elif progrec.srchash == srchash:
                    log.debug("Record previously processed; skipping")
                    continue
                else:
                    log.debug("Record has changed")

            # Run every local processor, remembering (but not stopping at)
            # any failure.
            processor_failure = False
            for processor in nlpdef.noncloud_processors:
                if incremental:
                    processor.delete_dest_record(
                        ifconfig, pkval, pkstr, commit=incremental
                    )
                try:
                    processor.process(text, other_values)
                except TextProcessingFailed:
                    processor_failure = True

            # If at least one processor failed, don't tell the progress
            # database that this record has been handled. That means that if
            # the administrator fixes this problem, this record will be
            # re-processed. (There may be some stray output from other,
            # successful, processors, but that will be deleted before
            # reprocessing in a subsequent incremental update.)
            if processor_failure:
                log.error(
                    f"At least one processor failed for this record "
                    f"(srctable={ifconfig.srctable!r}, "
                    f"pkfield={ifconfig.srcpkfield!r}, "
                    f"pkval={pkval!r}, pkstr={pkstr!r}); "
                    f"not marking it as processed."
                )
                continue

            # Note in the progress database that this source record has been
            # processed (truncated values only if the definition says so).
            truncated = other_values[TRUNCATED_FLAG]
            if not truncated or nlpdef.record_truncated_values:
                if progrec:  # modifying an existing record
                    progrec.whenprocessedutc = nlpdef.now
                    progrec.srchash = srchash
                else:  # creating a new record
                    progrec = NlpRecord(
                        # Quasi-key fields:
                        srcdb=ifconfig.srcdb,
                        srctable=ifconfig.srctable,
                        srcpkval=pkval,
                        srcpkstr=pkstr,
                        srcfield=ifconfig.srcfield,
                        nlpdef=nlpdef.name,
                        # Other fields:
                        srcpkfield=ifconfig.srcpkfield,
                        whenprocessedutc=nlpdef.now,
                        srchash=srchash,
                    )
                    with MultiTimerContext(timer, TIMING_PROGRESS_DB_ADD):
                        session.add(progrec)

                # In incremental mode, do we commit immediately, because other
                # processes may need this table promptly... ?
                #
                # Rejected alternatives:
                #   force_commit = False  # definitely wrong; crashes as below
                #   force_commit = incremental
                #
                # - A single source record should not be processed by >1 CRATE
                #   process. So in theory there should be no conflicts.
                # - However, databases can lock in various ways. Can we
                #   guarantee it'll do something sensible?
                # - See also
                #   https://en.wikipedia.org/wiki/Isolation_(database_systems)
                #   http://skien.cc/blog/2014/02/06/sqlalchemy-and-race-conditions-follow-up/  # noqa: E501
                #   http://docs.sqlalchemy.org/en/latest/core/connections.html?highlight=execution_options#sqlalchemy.engine.Connection.execution_options  # noqa: E501
                # - However, empirically, setting this to False gives
                #   "Transaction (Process ID xx) was deadlocked on lock
                #   resources with another process and has been chosen as the
                #   deadlock victim. Rerun the transaction." -- with a SELECT
                #   query.
                # - SQL Server uses READ COMMITTED as the default isolation
                #   level.
                # - https://technet.microsoft.com/en-us/library/jj856598(v=sql.110).aspx  # noqa: E501
                nlpdef.notify_transaction(
                    session=session,
                    n_rows=1,
                    n_bytes=sys.getsizeof(progrec),  # approx
                    force_commit=multitask,
                )

    nlpdef.commit_all()
666# =============================================================================
667# Core functions: cloud NLP
668# =============================================================================
def process_cloud_nlp(
    crinfo: CloudRunInfo,
    incremental: bool = False,
    report_every: int = DEFAULT_REPORT_EVERY_NLP,
) -> None:
    """
    Process text by sending it off to the cloud processors in queued mode.

    For every request that comes back with a queue ID, one line of the form
    ``<input field config name>,<queue_id>`` is written to the cloud
    configuration's data file, so that results can be fetched later.

    Args:
        crinfo: the cloud run information (NLP definition + cloud config)
        incremental: passed through to the request sender
        report_every: passed through to the request sender
    """
    log.info(SEP + "NLP")
    nlpdef = crinfo.nlpdef
    filename = crinfo.cloudcfg.data_filename

    # Begin with an empty file...
    with open(filename, "w"):
        pass

    # ... then append, so that if there's a problem part-way through, we
    # don't lose all data.
    with open(filename, "a") as request_data:
        for ifconfig in nlpdef.inputfieldconfigs:
            sender = CloudRequestSender(
                text_generator=ifconfig.gen_text(),
                crinfo=crinfo,
                ifconfig=ifconfig,
                incremental=incremental,
                report_every=report_every,
            )
            global_recnum = 0  # Global record number within this ifconfig
            more_to_send = True
            while more_to_send:
                sent, more_to_send, global_recnum = sender.send_requests(
                    global_recnum
                )
                for cloud_request in sent:
                    if not cloud_request.queue_id:
                        log.warning("Sent request does not contain queue_id.")
                        continue
                    request_data.write(
                        f"{ifconfig.name},{cloud_request.queue_id}\n"
                    )
def retrieve_nlp_data(crinfo: CloudRunInfo, incremental: bool = False) -> None:
    """
    Try to retrieve the data from the cloud processors.

    Reads the queue file written when the requests were sent (one
    ``<input field config name>,<queue_id>`` pair per line), asks the server
    whether each request is ready, and for each ready request stores the
    results and updates the progress database. Requests that are not yet
    ready are written back to the file; if all were ready, the file is
    removed.

    Args:
        crinfo: the cloud run information (NLP definition + cloud config)
        incremental:
            incremental mode: look up existing progress records and delete
            old destination records before storing the new results

    Raises:
        FileNotFoundError: if the queue file does not exist
    """
    nlpdef = crinfo.nlpdef
    session = nlpdef.progressdb_session
    cloudcfg = crinfo.cloudcfg
    filename = cloudcfg.data_filename
    if not os.path.exists(filename):
        errmsg = (
            f"File {filename!r} does not exist. "
            f"Request may not have been sent."
        )
        log.error(errmsg)
        # Carry the explanation in the exception too, not just in the log.
        raise FileNotFoundError(errmsg)
    with open(filename, "r") as request_data:
        reqdata = request_data.readlines()

    i = 1  # number of requests
    cookies = None  # type: Optional[CookieJar]
    remaining_data = []  # type: List[str]
    ifconfig_cache = {}  # type: Dict[str, InputFieldConfig]
    all_ready = True  # not necessarily true, but need for later
    count = 0  # count of records before a write to the database
    uncommitted_data = False
    for line in reqdata:
        stripped = line.strip()
        if not stripped:
            # Tolerate blank lines (e.g. a trailing newline) rather than
            # failing on the tuple unpacking below.
            continue
        # Are there any records associated with the queue_id?
        records_exist = False
        if_section, queue_id = stripped.split(",")
        if if_section in ifconfig_cache:
            ifconfig = ifconfig_cache[if_section]
        else:
            ifconfig = InputFieldConfig(
                nlpdef=nlpdef, cfg_input_name=if_section
            )
            ifconfig_cache[if_section] = ifconfig
        # Source hashes seen within this request; only used for membership
        # tests, hence a set.
        seen_srchashs = set()  # type: Set[str]
        cloud_request = CloudRequestProcess(crinfo=crinfo)
        cloud_request.set_queue_id(queue_id)
        log.info(f"Attempting to retrieve data from request #{i} ...")
        i += 1
        ready = cloud_request.check_if_ready(cookies)
        if cloud_request.cookies:
            # Re-use the server's cookies for subsequent requests.
            cookies = cloud_request.cookies

        if not ready:
            # If results are not ready for this particular queue_id, put
            # back in file
            remaining_data.append(f"{if_section},{queue_id}\n")
            all_ready = False
            continue

        docresultlist = extract_nlprp_top_level_results(
            cloud_request.nlp_data
        )
        for result in docresultlist:
            # There are records associated with the given queue_id
            records_exist = True
            uncommitted_data = True
            # 'metadata' is just 'other_values' from before
            _, pkval, pkstr, srchash = parse_nlprp_docresult_metadata(result)
            progrec = None
            if incremental:
                progrec = ifconfig.get_progress_record(pkval, pkstr)
                crinfo.delete_dest_records(
                    ifconfig, pkval, pkstr, commit=True
                )
            elif srchash in seen_srchashs:
                # Already handled in this request: fetch the progress record
                # made earlier rather than creating a duplicate.
                progrec = ifconfig.get_progress_record(pkval, pkstr)
            seen_srchashs.add(srchash)
            # Make a note in the progress database that we've processed
            # a source record
            if progrec:  # modifying an existing record
                progrec.whenprocessedutc = nlpdef.now
                progrec.srchash = srchash
            else:  # creating a new record
                progrec = NlpRecord(
                    # Quasi-key fields:
                    srcdb=ifconfig.srcdb,
                    srctable=ifconfig.srctable,
                    srcpkval=pkval,
                    srcpkstr=pkstr,
                    srcfield=ifconfig.srcfield,
                    nlpdef=nlpdef.name,
                    # Other fields:
                    srcpkfield=ifconfig.srcpkfield,
                    whenprocessedutc=nlpdef.now,
                    srchash=srchash,
                )
                with MultiTimerContext(timer, TIMING_PROGRESS_DB_ADD):
                    session.add(progrec)
            count += 1

        if records_exist:
            log.info("Request ready.")
            cloud_request.process_all()
            if count >= cloudcfg.limit_before_commit:
                nlpdef.commit_all()
                count = 0
                uncommitted_data = False
        else:
            log.warning(f"No records found for queue_id {queue_id}.")

    if uncommitted_data:
        nlpdef.commit_all()
    if all_ready:
        os.remove(filename)
    else:
        # The file is only rewritten here, at the end, to avoid losing the
        # queue_ids if something goes wrong part-way through.
        with open(filename, "w") as request_data:
            request_data.writelines(remaining_data)
        log.info(
            "There are still results to be processed. Re-run this "
            "command later to retrieve them."
        )
831# @do_cprofile
def process_cloud_now(
    crinfo: CloudRunInfo,
    incremental: bool = False,
    report_every: int = DEFAULT_REPORT_EVERY_NLP,
) -> None:
    """
    Process text by sending it off to the cloud processors in non-queued mode.

    Each batch of requests is sent, its results are stored straight away, and
    the corresponding progress records are saved and committed before the
    next batch is sent.

    Args:
        crinfo: the cloud run information (NLP definition + cloud config)
        incremental:
            incremental mode: delete old destination records and re-use any
            existing progress record
        report_every: passed through to the request sender
    """
    nlpdef = crinfo.nlpdef
    session = nlpdef.progressdb_session
    for ifconfig in nlpdef.inputfieldconfigs:
        global_recnum = 0  # Global record number within this ifconfig
        sender = CloudRequestSender(
            text_generator=ifconfig.gen_text(),
            crinfo=crinfo,
            ifconfig=ifconfig,
            incremental=incremental,
            report_every=report_every,
            queue=False,
        )

        more_to_send = True
        while more_to_send:
            batch, more_to_send, global_recnum = sender.send_requests(
                global_recnum
            )
            progrecs = set()  # type: Set[NlpRecord]
            for cloud_request in batch:
                if cloud_request.request_failed:
                    continue
                # (a) handle the actual data
                cloud_request.process_all()
                # (b) handle the progress records
                for result in extract_nlprp_top_level_results(
                    cloud_request.nlp_data
                ):
                    # 'metadata' is just 'other_values' from before
                    (
                        _,
                        pkval,
                        pkstr,
                        srchash,
                    ) = parse_nlprp_docresult_metadata(result)
                    if incremental:
                        # A word of explanation: to get here, the record must
                        # have already been found worthy of updating. This is
                        # now ensured by the CloudRequestSender, which will
                        # skip relevant unchanged records.
                        crinfo.delete_dest_records(
                            ifconfig, pkval, pkstr, commit=True
                        )
                        # Any existing progress record gets updated below.
                        progrec = ifconfig.get_progress_record(pkval, pkstr)
                    else:
                        progrec = None
                    # Check that we haven't already done the progrec for this
                    # record to avoid clashes - it's possible as each processor
                    # may contain results for each record and a set of results
                    # is a list of processors and their results
                    #
                    # if srchash in seen_srchashs:
                    #     progrec = ifconfig.get_progress_record(pkval, pkstr)
                    #
                    # Make a note in the progress database that we've processed
                    # a source record
                    if progrec:  # modifying an existing record
                        progrec.whenprocessedutc = nlpdef.now
                        progrec.srchash = srchash
                    else:  # creating a new record
                        progrec = NlpRecord(
                            # Quasi-key fields:
                            srcdb=ifconfig.srcdb,
                            srctable=ifconfig.srctable,
                            srcpkval=pkval,
                            srcpkstr=pkstr,
                            srcfield=ifconfig.srcfield,
                            nlpdef=nlpdef.name,
                            # Other fields:
                            srcpkfield=ifconfig.srcpkfield,
                            whenprocessedutc=nlpdef.now,
                            srchash=srchash,
                        )
                    progrecs.add(progrec)

            # Save this batch's progress records in one go, then commit.
            with MultiTimerContext(timer, TIMING_PROGRESS_DB_ADD):
                log.info("Adding to database...")
                session.bulk_save_objects(progrecs)
            session.commit()

    nlpdef.commit_all()
def cancel_request(nlpdef: NlpDefinition, cancel_all: bool = False) -> None:
    """
    Delete pending requests from the server's queue.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
        cancel_all:
            if true, delete everything from the queue and purge the request
            data directory; otherwise, cancel only the requests listed in
            this NLP definition's queue file, and then remove that file

    Raises:
        FileNotFoundError:
            if ``cancel_all`` is false and the queue file does not exist
    """
    nlpname = nlpdef.name
    cloudcfg = nlpdef.get_cloud_config_or_raise()
    cloud_request = CloudRequestQueueManagement(nlpdef=nlpdef)

    if cancel_all:
        # Deleting all from queue!
        cloud_request.delete_all_from_queue()
        log.info("All cloud requests cancelled.")
        # Should the files be deleted in the program or is that dangerous?
        # ... OK now we guarantee that CRATE will create and use a specific
        # directory.
        purge(cloudcfg.req_data_dir, "*")
        return

    # Otherwise: cancel just the requests recorded for this NLP definition.
    filename = cloudcfg.data_filename
    if not os.path.exists(filename):
        errmsg = (
            f"File {filename!r} does not exist. "
            f"Request may not have been sent."
        )
        log.error(errmsg)
        # Carry the explanation in the exception too, not just in the log.
        raise FileNotFoundError(errmsg)

    with open(filename, "r") as request_data:
        reqdata = request_data.readlines()

    queue_ids = []  # type: List[str]
    for line in reqdata:
        stripped = line.strip()
        if not stripped:
            # Tolerate blank lines (e.g. a trailing newline) rather than
            # failing on the tuple unpacking below.
            continue
        # Each line is "<input field config name>,<queue_id>"; only the
        # queue ID matters here.
        _if_section, queue_id = stripped.split(",")
        queue_ids.append(queue_id)
    cloud_request.delete_from_queue(queue_ids)

    # Remove the file with the request info
    os.remove(filename)
    log.info(f"Cloud request for nlp definition {nlpname} cancelled.")
def show_cloud_queue(nlpdef: NlpDefinition) -> None:
    """
    Get list of the user's queued requests and print to screen.

    The queue is written to stdout as CSV; the header row is taken from the
    keys of the first entry. If the queue is empty, that is logged instead.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
    """
    queue = CloudRequestQueueManagement(nlpdef=nlpdef).show_queue()
    if not queue:
        log.info("No requests in queue.")
        return
    writer = None
    for position, entry in enumerate(queue):
        if position == 0:
            # The first entry defines the columns.
            writer = csv.DictWriter(sys.stdout, fieldnames=entry.keys())
            writer.writeheader()
        writer.writerow(entry)
def print_cloud_processors(
    nlpdef: NlpDefinition, indent: int = 4, sort_keys: bool = True
) -> None:
    """
    Ask the cloud server for its processors and print their definitions
    (each processor's ``infodict``) to stdout as a JSON list.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
        indent:
            indentation passed to :func:`json.dumps`
        sort_keys:
            passed to :func:`json.dumps`; sort dictionary keys in the
            output?
    """
    remote_processors = CloudRequestListProcessors(
        nlpdef=nlpdef
    ).get_remote_processors()
    print(
        json.dumps(
            [proc.infodict for proc in remote_processors],
            indent=indent,
            sort_keys=sort_keys,
        )
    )
991# =============================================================================
992# Database info
993# =============================================================================
def show_source_counts(nlpdef: NlpDefinition) -> None:
    """
    Print (to stdout) a heading followed by the record count of the source
    table of every input field config in the NLP definition.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
    """
    print("SOURCE TABLE RECORD COUNTS:")
    # One (label, count) pair per input field config, labelled "db.table".
    counts = [
        (
            f"{ifconfig.srcdb}.{ifconfig.srctable}",
            count_star(ifconfig.source_session, ifconfig.srctable),
        )
        for ifconfig in nlpdef.inputfieldconfigs
    ]  # type: List[Tuple[str, int]]
    print_record_counts(counts)
def show_dest_counts(nlpdef: NlpDefinition) -> None:
    """
    Print (to stdout) a heading followed by the record count of every
    destination table of every processor in the NLP definition.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
    """
    print("DESTINATION TABLE RECORD COUNTS:")
    counts = []  # type: List[Tuple[str, int]]
    for processor in nlpdef.processors:
        session = processor.dest_session
        dbname = processor.dest_dbname
        # One (label, count) pair per table that this processor writes to.
        counts.extend(
            (
                f"DESTINATION: {dbname}.{tablename}",
                count_star(session, tablename),
            )
            for tablename in processor.get_tablenames()
        )
    print_record_counts(counts)
1034# =============================================================================
1035# NLP testing
1036# =============================================================================
def test_nlp_stdin(nlpdef: NlpDefinition) -> None:
    """
    Interactively test the NLP definition's processor(s): read chunks of
    text from stdin, pass each relevant chunk to every processor, and log
    whatever comes back.

    Cloud processors are sent an immediate (non-queued) request, with
    request/response debugging switched on; local processors are asked to
    parse the text directly.

    Args:
        nlpdef:
            :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
    """
    processors = nlpdef.processors
    names = ", ".join(p.friendly_name_with_section for p in processors)
    log.info(f"Testing NLP processors: {names}")

    # Cloud run information is only needed if a cloud processor is in use.
    crinfo = (
        CloudRunInfo(
            nlpdef, debug_post_request=True, debug_post_response=True
        )
        if nlpdef.uses_cloud_processors
        else None
    )

    chunks = gen_chunks_from_files(
        filenames=["-"],
        stdin_prompt="Please type lines of text to be processed. "
        "End with a blank line.",
        chunk_terminator_line="",
    )
    for text in chunks:
        if not relevant_for_nlp(text):
            log.info("Ignoring irrelevant line.")
            continue

        log.info(f"INPUT: {text!r}")
        result_found = False
        for p in processors:  # type: BaseNlpParser

            if not p.is_cloud_processor():
                # Local (non-cloud) NLP processor.
                assert isinstance(p, BaseNlpParser)
                for tablename, valuedict in p.parse(text):
                    result_found = True
                    log.info(f"RESULT: {tablename}: {valuedict}")
                continue

            # Cloud processor.
            assert crinfo is not None
            assert isinstance(p, Cloud)
            procreq = CloudRequestProcess(
                crinfo=crinfo,
                nlpdef=nlpdef,
                debug_post_request=True,
                debug_post_response=True,
            )
            procreq.add_text(text, metadata={})
            procreq.send_process_request(queue=False)
            results = extract_nlprp_top_level_results(procreq.nlp_data)
            result_found = True
            # ... may not really be true, but we have something to show
            formatted_results = json.dumps(results, indent=JSON_INDENT)
            log.info(f"RESULTS:\n{formatted_results}")

        if not result_found:
            log.info("[No results.]")
1100# =============================================================================
1101# Main
1102# =============================================================================
def _build_arg_parser(
    version: str, description: str
) -> argparse.ArgumentParser:
    """
    Build the command-line argument parser for the NLP manager.

    Args:
        version:
            version string, reported by ``--version``
        description:
            program description shown in the help text

    Returns:
        the configured :class:`argparse.ArgumentParser`
    """
    # todo: better with a subcommand parser?

    # noinspection PyTypeChecker
    parser = argparse.ArgumentParser(
        description=description,
        formatter_class=RawDescriptionArgumentDefaultsRichHelpFormatter,
    )

    config_options = parser.add_argument_group("Config options")
    config_options.add_argument(
        "--config",
        help=f"Config file (overriding environment variable "
        f"{NLP_CONFIG_ENV_VAR})",
    )
    config_options.add_argument(
        "--nlpdef", help="NLP definition name (from config file)"
    )

    # "--incremental" and "--full" both write to "incremental". argparse
    # applies the default of the first action registered for a given dest,
    # so the effective default is True (incremental).
    mode_group = config_options.add_mutually_exclusive_group()
    mode_group.add_argument(
        "-i",
        "--incremental",
        dest="incremental",
        action="store_true",
        help="Process only new/changed information, where possible",
        default=True,
    )
    mode_group.add_argument(
        "-f",
        "--full",
        dest="incremental",
        action="store_false",
        help="Drop and remake everything",
        default=False,
    )

    config_options.add_argument(
        "--dropremake",
        action="store_true",
        help="Drop/remake destination tables only",
    )
    config_options.add_argument(
        "--skipdelete",
        dest="skipdelete",
        action="store_true",
        help="For incremental updates, skip deletion of rows "
        "present in the destination but not the source",
    )
    config_options.add_argument(
        "--nlp", action="store_true", help="Perform NLP processing only"
    )

    config_options.add_argument(
        "--chunksize",
        type=int,
        default=DEFAULT_CHUNKSIZE,
        help="Number of records copied in a chunk when copying PKs from one "
        "database to another",
    )

    reporting_options = parser.add_argument_group("Reporting options")
    reporting_options.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        # This is a plain on/off flag: repeating it has no further effect,
        # so the help text must not promise a second verbosity level.
        help="Be verbose",
    )
    reporting_options.add_argument(
        "--report_every_fast",
        type=int,
        default=DEFAULT_REPORT_EVERY,
        help="Report insert progress (for fast operations) every n rows in "
        "verbose mode",
    )
    reporting_options.add_argument(
        "--report_every_nlp",
        type=int,
        default=DEFAULT_REPORT_EVERY_NLP,
        help="Report progress for NLP every n rows in verbose mode",
    )
    reporting_options.add_argument(
        "--echo", action="store_true", help="Echo SQL"
    )
    reporting_options.add_argument(
        "--timing", action="store_true", help="Show detailed timing breakdown"
    )

    multiproc_options = parser.add_argument_group("Multiprocessing options")
    multiproc_options.add_argument(
        "--process",
        type=int,
        default=0,
        help="For multiprocess mode: specify process number",
    )
    multiproc_options.add_argument(
        "--nprocesses",
        type=int,
        default=1,
        help="For multiprocess mode: specify total number of processes "
        "(launched somehow, of which this is to be one)",
    )
    multiproc_options.add_argument(
        "--processcluster", default="", help="Process cluster name"
    )

    info_actions = parser.add_argument_group("Info actions")
    info_actions.add_argument("--version", action="version", version=version)
    info_actions.add_argument(
        "--democonfig", action="store_true", help="Print a demo config file"
    )
    info_actions.add_argument(
        "--listprocessors",
        action="store_true",
        help="Show all possible built-in NLP processor names",
    )
    info_actions.add_argument(
        "--describeprocessors",
        action="store_true",
        help="Show details of all built-in NLP processors",
    )
    info_actions.add_argument(
        "--test_nlp",
        action="store_true",
        help="Test the NLP processor(s) for the selected definition, "
        "by sending text from stdin to them",
    )
    info_actions.add_argument(
        "--print_local_processors",
        action="store_true",
        help="For the chosen NLP definition, establish which local NLP "
        "processors are involved (if any). Show detailed information "
        "about these processors (as NLPRP JSON), then stop",
    )
    info_actions.add_argument(
        "--print_cloud_processors",
        action="store_true",
        help=f"For the chosen NLP definition, establish the relevant cloud "
        f"server, if applicable (from the "
        f"{NlpDefConfigKeys.CLOUD_CONFIG!r} parameter). Ask that remote "
        f"server about its available NLP processors. Show detailed "
        f"information about these remote processors (as NLPRP JSON), "
        f"then stop",
    )
    info_actions.add_argument(
        "--count",
        action="store_true",
        help="Count records in source/destination databases, then stop",
    )

    cloud_options = parser.add_argument_group("Cloud options")
    cloud_options.add_argument(
        "--cloud",
        action="store_true",
        help="Use cloud-based NLP processing tools. Queued mode by default.",
    )
    cloud_options.add_argument(
        "--immediate",
        action="store_true",
        help="To be used with 'cloud'. Process immediately.",
    )
    cloud_options.add_argument(
        "--retrieve", action="store_true", help="Retrieve NLP data from cloud"
    )
    cloud_options.add_argument(
        "--cancelrequest",
        action="store_true",
        help="Cancel pending requests for the nlpdef specified",
    )
    cloud_options.add_argument(
        "--cancelall",
        action="store_true",
        help="Cancel all pending cloud requests. WARNING: this option "
        "cancels all pending requests - not just those for the nlp "
        "definition specified",
    )
    cloud_options.add_argument(
        "--showqueue",
        action="store_true",
        help="Shows all pending cloud requests.",
    )

    return parser


def inner_main() -> None:
    """
    Indirect command-line entry point. See command-line help.

    In order: parses and validates the command-line arguments; configures
    logging; handles the "information" actions that need no NLP definition
    (``--democonfig``, ``--listprocessors``, ``--describeprocessors``);
    loads the NLP definition; handles the actions that need it and then
    stop (``--count``, ``--print_local_processors``,
    ``--print_cloud_processors``, ``--test_nlp``, ``--cancelrequest``,
    ``--cancelall``, ``--showqueue``); and otherwise drops/remakes tables
    and/or runs NLP (locally, via the cloud, or retrieving cloud results).

    Raises:
        ValueError:
            if the arguments are inconsistent (bad ``--nprocesses`` or
            ``--process``; ``--cloud`` together with ``--retrieve``; or no
            ``--nlpdef`` when one is required)
    """
    version = f"Version {CRATE_VERSION} ({CRATE_VERSION_DATE})"
    description = f"NLP manager. {version}. Created by Rudolf Cardinal."

    parser = _build_arg_parser(version=version, description=description)
    args = parser.parse_args()

    # Validate args
    if args.nprocesses < 1:
        raise ValueError("--nprocesses must be >=1")
    if args.process < 0 or args.process >= args.nprocesses:
        raise ValueError(
            "--process argument must be from 0 to (nprocesses - 1) inclusive"
        )
    if args.config:
        # The command-line option overrides the environment variable.
        os.environ[NLP_CONFIG_ENV_VAR] = args.config
    if args.cloud and args.retrieve:
        raise ValueError("--cloud and --retrieve cannot be used together")

    # Verbosity and logging
    mynames = []  # type: List[str]
    if args.processcluster:
        mynames.append(args.processcluster)
    if args.nprocesses > 1:
        mynames.append(f"proc{args.process}")
    loglevel = logging.DEBUG if args.verbose else logging.INFO
    rootlogger = logging.getLogger()
    configure_logger_for_colour(rootlogger, level=loglevel, extranames=mynames)

    # -------------------------------------------------------------------------
    # Information/test options
    # -------------------------------------------------------------------------

    # Demo config?
    if args.democonfig:
        print(demo_nlp_config())
        return

    # List or describe processors?
    if args.listprocessors:
        print("\n".join(possible_processor_names_including_cloud()))
        return
    if args.describeprocessors:
        print(possible_processor_table())
        return

    # Otherwise, we need a valid NLP definition.
    if args.nlpdef is None:
        raise ValueError(
            "Must specify nlpdef parameter (unless --democonfig, "
            "--listprocessors, or --describeprocessors used)"
        )

    # If neither stage was requested specifically, do both.
    everything = not any([args.dropremake, args.nlp])

    # Report args
    log.debug(f"arguments: {args}")

    # Load/validate config
    nlpdef = NlpDefinition(
        args.nlpdef, logtag="_".join(mynames).replace(" ", "_")
    )
    nlpdef.set_echo(args.echo)

    # Count only?
    if args.count:
        show_source_counts(nlpdef)
        show_dest_counts(nlpdef)
        return

    # Show configured processor definitions only?
    if args.print_local_processors:
        print(nlpdef.nlprp_local_processors_json())
        return
    if args.print_cloud_processors:
        print_cloud_processors(nlpdef)
        return

    # Test NLP processor via stdin?
    if args.test_nlp:
        test_nlp_stdin(nlpdef)
        return

    # -------------------------------------------------------------------------
    # Cloud queue manipulation options
    # -------------------------------------------------------------------------

    # Delete from queue - do this before Drop/Remake and return so we don't
    # drop all the tables just to cancel the request
    # Same for 'showqueue'. All of these need config as they require url etc.
    if args.cancelrequest:
        cancel_request(nlpdef)
        return
    if args.cancelall:
        cancel_request(nlpdef, cancel_all=True)
        return
    if args.showqueue:
        show_cloud_queue(nlpdef)
        return

    # -------------------------------------------------------------------------
    # Main NLP options
    # -------------------------------------------------------------------------

    crinfo = None  # type: Optional[CloudRunInfo]  # for type checker!
    if args.cloud or args.retrieve:
        # Set appropriate things for cloud - need to do this before
        # drop_remake, but after cancel_request or show_cloud_queue to avoid
        # unnecessary requests
        cloudcfg = nlpdef.get_cloud_config_or_raise()
        CloudRequest.set_rate_limit(cloudcfg.rate_limit_hz)
        crinfo = CloudRunInfo(nlpdef)

    log.info(f"Starting: incremental={args.incremental}")
    start = get_now_utc_pendulum()
    timer.set_timing(args.timing, reset=True)

    # 1. Drop/remake tables. Single-tasking only.
    with MultiTimerContext(timer, TIMING_DROP_REMAKE):
        if args.dropremake or everything:
            drop_remake(
                nlpdef,
                incremental=args.incremental,
                skipdelete=args.skipdelete,
                report_every=args.report_every_fast,
                chunksize=args.chunksize,
            )

    # NOTE(review): no errors are trapped in this function; presumably
    # call_main_with_exception_reporting (see main()) is what reports
    # failures in a multiprocessing environment - confirm.

    # 2. NLP
    if args.nlp or everything:
        if args.cloud:
            if args.immediate:
                process_cloud_now(
                    crinfo,
                    incremental=args.incremental,
                    report_every=args.report_every_nlp,
                )
            else:
                process_cloud_nlp(
                    crinfo,
                    incremental=args.incremental,
                    report_every=args.report_every_nlp,
                )
        elif args.retrieve:
            retrieve_nlp_data(crinfo, incremental=args.incremental)
        else:
            process_nlp(
                nlpdef,
                incremental=args.incremental,
                report_every=args.report_every_nlp,
                tasknum=args.process,
                ntasks=args.nprocesses,
            )

    log.info("Finished")
    end = get_now_utc_pendulum()
    time_taken = end - start
    log.info(f"Time taken: {time_taken.total_seconds():.3f} seconds")

    if args.timing:
        timer.report()
def main() -> None:
    """
    Command-line entry point.

    Delegates to :func:`inner_main`, run via
    ``call_main_with_exception_reporting``.
    """
    call_main_with_exception_reporting(inner_main)
1461# =============================================================================
1462# Command-line entry point
1463# =============================================================================
# Run the NLP manager only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()