Coverage for nlp_manager/parse_gate.py: 43%
141 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
"""
crate_anon/nlp_manager/parse_gate.py

===============================================================================

    Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CRATE.

    CRATE is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CRATE is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CRATE. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**NLP handler for external GATE NLP tools.**

The pipe encoding (Python -> Java stdin, Java stdout -> Python) is fixed to be
UTF-8, here and in the Java code.

"""
33import logging
34import os
35import shlex
36import subprocess
37from typing import Any, Dict, Generator, List, Tuple
39from cardinal_pythonlib.cmdline import cmdline_quote
40from cardinal_pythonlib.dicts import (
41 rename_keys_in_dict,
42 set_null_values_in_dict,
43)
44from cardinal_pythonlib.lists import chunks
45from cardinal_pythonlib.tsv import tsv_pairs_to_dict
46from sqlalchemy import Column, Index
48from crate_anon.nlp_manager.base_nlp_parser import (
49 BaseNlpParser,
50 TextProcessingFailed,
51)
52from crate_anon.nlp_manager.constants import (
53 MAX_SQL_FIELD_LEN,
54 ProcessorConfigKeys,
55 GateFieldNames,
56)
57from crate_anon.nlp_manager.nlp_definition import (
58 NlpDefinition,
59)
60from crate_anon.nlprp.constants import NlprpKeys, NlprpValues
61from crate_anon.nlp_manager.output_user_config import OutputUserConfig
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# =============================================================================
# Process handling
# =============================================================================
# Have Python host the client process, communicating with stdin/stdout?
#   http://eyalarubas.com/python-subproc-nonblock.html
#   https://stackoverflow.com/questions/2715847/python-read-streaming-input-from-subprocess-communicate  # noqa: E501
# Java process could be a network server.
#   http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html
#   http://www.tutorialspoint.com/java/java_networking.htm
# OK, first one works; that's easier.
class Gate(BaseNlpParser):
    """
    EXTERNAL.

    Abstract NLP processor controlling an external process, typically our Java
    interface to GATE programs, ``CrateGatePipeline.java`` (but it could be any
    external program).

    We send text to it, it parses the text, and it sends us back results, which
    we return as dictionaries. The specific text sought depends on the
    configuration file and the specific GATE program used.

    For details of GATE, see https://www.gate.ac.uk/.
    """

    _ = """

    Notes:

    - PROBLEM when attempting to use KConnect (Bio-YODIE): its source code is
      full of direct calls to ``System.out.println()``.

      POTENTIAL SOLUTIONS:

      - named pipes:

        - ``os.mkfifo()`` - Unix only.
        - ``win32pipe`` - https://stackoverflow.com/questions/286614

      - ZeroMQ with some sort of security

        - ``pip install zmq``
        - some sort of Java binding (``jzmq``, ``jeromq``...)

      - redirect ``stdout`` in our Java handler

        - ``System.setOut()``... yes, that works.
        - Implemented and exposed as ``--suppress_gate_stdout``.

    """

    uses_external_tool = True

    def __init__(
        self,
        nlpdef: NlpDefinition,
        cfg_processor_name: str,
        commit: bool = False,
    ) -> None:
        """
        Args:
            nlpdef:
                a :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
            cfg_processor_name:
                the name of a CRATE NLP config file section (from which we may
                choose to get extra config information)
            commit:
                force a COMMIT whenever we insert data? You should specify this
                in multiprocess mode, or you may get database deadlocks.

        Raises:
            ValueError:
                if the config specifies a destination table, or if the output
                type map does not consist of (annotation type, output
                section) pairs
            AssertionError:
                if a destination table name is longer than
                ``MAX_SQL_FIELD_LEN``
        """
        super().__init__(
            nlpdef=nlpdef,
            cfg_processor_name=cfg_processor_name,
            commit=commit,
            friendly_name="GATE",
        )

        if not nlpdef and not cfg_processor_name:
            # Debugging only
            self._debug_mode = True
            self._max_external_prog_uses = 0
            self._input_terminator = "input_terminator"
            self._output_terminator = "output_terminator"
            typepairs = []  # type: List[str]
            self._progenvsection = ""
            progargs = ""
            logtag = ""
        else:
            self._debug_mode = False
            self._max_external_prog_uses = self._cfgsection.opt_int_positive(
                ProcessorConfigKeys.MAX_EXTERNAL_PROG_USES, default=0
            )
            self._input_terminator = self._cfgsection.opt_str(
                ProcessorConfigKeys.INPUT_TERMINATOR, required=True
            )
            self._output_terminator = self._cfgsection.opt_str(
                ProcessorConfigKeys.OUTPUT_TERMINATOR, required=True
            )
            typepairs = self._cfgsection.opt_strlist(
                ProcessorConfigKeys.OUTPUTTYPEMAP, required=True, lower=False
            )
            self._progenvsection = self._cfgsection.opt_str(
                ProcessorConfigKeys.PROGENVSECTION
            )
            progargs = self._cfgsection.opt_str(
                ProcessorConfigKeys.PROGARGS, required=True
            )
            logtag = nlpdef.logtag or "."
            # Also, ensure the user doesn't specify desttable (would be
            # confusing).
            if self._cfgsection.opt_str(ProcessorConfigKeys.DESTTABLE):
                raise ValueError(
                    f"For GATE processors, don't specify "
                    f"{ProcessorConfigKeys.DESTTABLE!r}; table information is "
                    f"in {ProcessorConfigKeys.OUTPUTTYPEMAP!r}"
                )

        # The type map is a flat list read two items at a time. Fail with a
        # clear message (rather than an opaque tuple-unpacking error below)
        # if the final pair is incomplete.
        if len(typepairs) % 2 != 0:
            raise ValueError(
                f"{ProcessorConfigKeys.OUTPUTTYPEMAP!r} must contain "
                f"(annotation type, output section) pairs, but has an odd "
                f"number of items: {typepairs!r}"
            )

        # Map each (lower-case) annotation type to its output config and to
        # its destination table name.
        self._outputtypemap = {}  # type: Dict[str, OutputUserConfig]
        self._type_to_tablename = {}  # type: Dict[str, str]
        for annottype, outputsection in chunks(typepairs, 2):
            annottype = annottype.lower()
            c = OutputUserConfig(
                config_parser=nlpdef.parser,
                cfg_output_name=outputsection,
            )
            self._outputtypemap[annottype] = c
            self._type_to_tablename[annottype] = c.dest_tablename

        if self._progenvsection:
            # noinspection PyTypeChecker
            self._env = nlpdef.get_env_dict(self._progenvsection, os.environ)
        else:
            self._env = os.environ.copy()
        self._env["NLPLOGTAG"] = logtag
        # ... We have ensured that this is not empty for real use, because
        # passing a "-lt" switch with no parameter will make
        # CrateGatePipeline.java complain and stop. The environment variable
        # is read via the "progargs" config argument, as follows.

        formatted_progargs = progargs.format(**self._env)
        self._progargs = shlex.split(formatted_progargs)

        self._n_uses = 0
        self._pipe_encoding = "utf8"
        self._p = None  # the subprocess
        self._started = False

        # Sanity checks
        for tn in self._type_to_tablename.values():
            assert (
                len(tn) <= MAX_SQL_FIELD_LEN
            ), f"Table name too long (max {MAX_SQL_FIELD_LEN} characters)"

    # -------------------------------------------------------------------------
    # External process control
    # -------------------------------------------------------------------------

    def _start(self) -> None:
        """
        Launch the external process, with stdin/stdout connections to it.
        Does nothing if the process has already been started.
        """
        if self._started:
            return
        args = self._progargs
        log.info(f"Launching command: {cmdline_quote(args)}")
        # The pipes are binary (we encode/decode ourselves), and we flush
        # stdin explicitly after each request. We therefore use the default
        # buffer size: line buffering (bufsize=1) is only meaningful for
        # text-mode pipes, and requesting it for binary pipes makes recent
        # Python versions emit a RuntimeWarning and fall back to the default
        # buffer size anyway.
        self._p = subprocess.Popen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            # stderr=subprocess.PIPE,
            shell=False,
        )
        # ... don't ask for stderr to be piped if you don't want it; firstly,
        # there's a risk that if you don't consume it, something hangs, and
        # secondly if you don't consume it, you see it on the console, which is
        # helpful.
        self._started = True

    def _encode_to_subproc_stdin(self, text: str) -> None:
        """
        Send text to the external program (via its stdin), encoding it in the
        process (typically to UTF-8).
        """
        log.debug("SENDING: " + text)
        bytes_ = text.encode(self._pipe_encoding)
        self._p.stdin.write(bytes_)

    def _flush_subproc_stdin(self) -> None:
        """
        Flushes what we're sending to the external program via its stdin.
        """
        self._p.stdin.flush()

    def _decode_from_subproc_stdout(self) -> str:
        """
        Read one line from the external program's stdout and decode it from
        its specific encoding (usually UTF-8) to a Python string.

        Returns:
            the decoded line, including any line terminator; an empty string
            means end-of-file (the external program has closed its stdout).
        """
        bytes_ = self._p.stdout.readline()
        text = bytes_.decode(self._pipe_encoding)
        log.debug("RECEIVING: " + repr(text))
        return text

    def _finish(self) -> None:
        """
        Close down the external process. Does nothing if it is not running.
        """
        if not self._started:
            return
        # close p.stdout, wait for the subprocess to exit
        self._p.communicate()
        self._started = False
        self._n_uses = 0

    def _restart(self) -> None:
        """
        Close down the external process and restart it.
        """
        self._finish()
        self._start()

    # -------------------------------------------------------------------------
    # Input processing
    # -------------------------------------------------------------------------

    def parse(
        self, text: str
    ) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
        """
        - Send text to the external process, and receive the result.
        - Note that associated data is not passed into this function, and is
          kept in the Python environment, so we can't run into any problems
          with the transfer to/from the Java program garbling important data.
          All we send to the subprocess is the text (and an input_terminator).
          Then, we may receive MULTIPLE sets of data back ("your text contains
          the following 7 people/drug references/whatever"), followed
          eventually by the output_terminator, at which point this set is
          complete.

        Yields:
            tuples ``tablename, valuedict``, one per recognized annotation
            received from the external process.

        Raises:
            ValueError:
                if a line received from the external process has no
                annotation type information
            TextProcessingFailed:
                if the pipe to the external process breaks, or the external
                process closes its stdout before sending the output
                terminator; the process is relaunched first
        """
        self._start()  # ensure started

        try:
            # Send
            log.debug("writing: " + text)
            self._encode_to_subproc_stdin(text)
            self._encode_to_subproc_stdin(os.linesep)
            self._encode_to_subproc_stdin(self._input_terminator + os.linesep)
            self._flush_subproc_stdin()  # required in the Python 3 system

            # Receive
            for line in iter(
                self._decode_from_subproc_stdout,
                self._output_terminator + os.linesep,
            ):
                # ... iterate until the sentinel output_terminator is received
                if not line:
                    # readline() gives an empty result only at end-of-file
                    # (even a blank line carries its terminator), so the
                    # external process has gone away without sending the
                    # output terminator. Treat this like a broken pipe,
                    # rather than trying to parse "no data" as a result.
                    raise BrokenPipeError(
                        "EOF on stdout of external process"
                    )
                line = line.rstrip("\n")
                # ... remove trailing newline, but NOT TABS
                # ... if you strip tabs, you get superfluous
                #     "Bad chunk, not of length 2" messages.
                log.debug("stdout received: " + line)
                d = tsv_pairs_to_dict(line)
                log.debug(f"dictionary received: {d}")
                try:
                    annottype = d[GateFieldNames.TYPE].lower()
                except KeyError:
                    raise ValueError("_type information not in data received")
                if annottype not in self._type_to_tablename:
                    log.warning(
                        f"Unknown annotation type, skipping: {annottype}"
                    )
                    continue
                c = self._outputtypemap[annottype]
                rename_keys_in_dict(d, c.renames)
                set_null_values_in_dict(d, c.null_literals)
                yield self._type_to_tablename[annottype], d

            self._n_uses += 1
            # Restart subprocess?
            if 0 < self._max_external_prog_uses <= self._n_uses:
                log.info(f"relaunching app after {self._n_uses} uses")
                self._restart()

        except BrokenPipeError:
            log.error("Broken pipe; relaunching app")
            self._restart()
            raise TextProcessingFailed()

    # -------------------------------------------------------------------------
    # Test
    # -------------------------------------------------------------------------

    def test(self, verbose: bool = False) -> None:
        """
        Run a brief self-test by passing two sample sentences to
        :meth:`test_parser`. Does nothing in debug mode.

        Args:
            verbose: not used by this implementation
        """
        if self._debug_mode:
            return
        self.test_parser(
            ["Bob Hope visited Seattle.", "James Joyce wrote Ulysses."]
        )

    # -------------------------------------------------------------------------
    # Database structure
    # -------------------------------------------------------------------------

    def dest_tables_columns(self) -> Dict[str, List[Column]]:
        # docstring in superclass
        tables = {}  # type: Dict[str, List[Column]]
        for otconfig in self._outputtypemap.values():
            # Standard GATE columns first, then the user-configured ones.
            tables[otconfig.dest_tablename] = (
                self._standard_gate_columns()
                + otconfig.get_columns(self.dest_engine)
            )
        return tables

    def dest_tables_indexes(self) -> Dict[str, List[Index]]:
        # docstring in superclass
        tables = {}  # type: Dict[str, List[Index]]
        for otconfig in self._outputtypemap.values():
            dest_tablename = otconfig.dest_tablename
            tables[dest_tablename] = (
                self._standard_gate_indexes(dest_tablename) + otconfig.indexes
            )
        return tables

    def nlprp_name(self) -> str:
        # docstring in superclass
        if self._debug_mode:
            return super().nlprp_name()
        else:
            return self._nlpdef.name

    def nlprp_schema_info(self, sql_dialect: str = None) -> Dict[str, Any]:
        # We do not absolutely need to override nlprp_schema_info(). Although
        # CRATE's GATE processor doesn't automatically know its schema, it is
        # told by the config (i.e. by the user) and can pass on that
        # information. However, for debug mode, it's helpful to override.
        if self._debug_mode:
            return {NlprpKeys.SCHEMA_TYPE: NlprpValues.UNKNOWN}
        else:
            return super().nlprp_schema_info(sql_dialect)