Coverage for nlp_manager/parse_gate.py: 43%

141 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-08-27 10:34 -0500

1""" 

2crate_anon/nlp_manager/parse_gate.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CRATE. 

10 

11 CRATE is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CRATE is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CRATE. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**NLP handler for external GATE NLP tools.** 

27 

28The pipe encoding (Python -> Java stdin, Java stdout -> Python) is fixed to be 

29UTF-8, here and in the Java code. 

30 

31""" 

32 

33import logging 

34import os 

35import shlex 

36import subprocess 

37from typing import Any, Dict, Generator, List, Tuple 

38 

39from cardinal_pythonlib.cmdline import cmdline_quote 

40from cardinal_pythonlib.dicts import ( 

41 rename_keys_in_dict, 

42 set_null_values_in_dict, 

43) 

44from cardinal_pythonlib.lists import chunks 

45from cardinal_pythonlib.tsv import tsv_pairs_to_dict 

46from sqlalchemy import Column, Index 

47 

48from crate_anon.nlp_manager.base_nlp_parser import ( 

49 BaseNlpParser, 

50 TextProcessingFailed, 

51) 

52from crate_anon.nlp_manager.constants import ( 

53 MAX_SQL_FIELD_LEN, 

54 ProcessorConfigKeys, 

55 GateFieldNames, 

56) 

57from crate_anon.nlp_manager.nlp_definition import ( 

58 NlpDefinition, 

59) 

60from crate_anon.nlprp.constants import NlprpKeys, NlprpValues 

61from crate_anon.nlp_manager.output_user_config import OutputUserConfig 

62 

63log = logging.getLogger(__name__) 

64 

65 

66# ============================================================================= 

67# Process handling 

68# ============================================================================= 

69# Have Python host the client process, communicating with stdin/stdout? 

70# http://eyalarubas.com/python-subproc-nonblock.html 

71# https://stackoverflow.com/questions/2715847/python-read-streaming-input-from-subprocess-communicate # noqa: E501 

72# Java process could be a network server. 

73# http://docs.oracle.com/javase/tutorial/networking/sockets/clientServer.html 

74# http://www.tutorialspoint.com/java/java_networking.htm 

75# OK, first one works; that's easier. 

76 

77 

78class Gate(BaseNlpParser): 

79 """ 

80 EXTERNAL. 

81 

82 Abstract NLP processor controlling an external process, typically our Java 

83 interface to GATE programs, ``CrateGatePipeline.java`` (but it could be any 

84 external program). 

85 

86 We send text to it, it parses the text, and it sends us back results, which 

87 we return as dictionaries. The specific text sought depends on the 

88 configuration file and the specific GATE program used. 

89 

90 For details of GATE, see https://www.gate.ac.uk/. 

91 """ 

92 

93 _ = """ 

94 

95 Notes: 

96 

97 - PROBLEM when attempting to use KConnect (Bio-YODIE): its source code is 

98 full of direct calls to ``System.out.println()``. 

99 

100 POTENTIAL SOLUTIONS: 

101 

102 - named pipes: 

103 

104 - ``os.mkfifo()`` - Unix only. 

105 - ``win32pipe`` - https://stackoverflow.com/questions/286614 

106 

107 - ZeroMQ with some sort of security 

108 

109 - ``pip install zmq`` 

110 - some sort of Java binding (``jzmq``, ``jeromq``...) 

111 

112 - redirect ``stdout`` in our Java handler 

113 

114 - ``System.setOut()``... yes, that works. 

115 - Implemented and exposed as ``--suppress_gate_stdout``. 

116 

117 """ 

118 

119 uses_external_tool = True 

120 

121 def __init__( 

122 self, 

123 nlpdef: NlpDefinition, 

124 cfg_processor_name: str, 

125 commit: bool = False, 

126 ) -> None: 

127 """ 

128 Args: 

129 nlpdef: 

130 a :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition` 

131 cfg_processor_name: 

132 the name of a CRATE NLP config file section (from which we may 

133 choose to get extra config information) 

134 commit: 

135 force a COMMIT whenever we insert data? You should specify this 

136 in multiprocess mode, or you may get database deadlocks. 

137 """ 

138 super().__init__( 

139 nlpdef=nlpdef, 

140 cfg_processor_name=cfg_processor_name, 

141 commit=commit, 

142 friendly_name="GATE", 

143 ) 

144 

145 if not nlpdef and not cfg_processor_name: 

146 # Debugging only 

147 self._debug_mode = True 

148 self._max_external_prog_uses = 0 

149 self._input_terminator = "input_terminator" 

150 self._output_terminator = "output_terminator" 

151 typepairs = [] # type: List[str] 

152 self._progenvsection = "" 

153 progargs = "" 

154 logtag = "" 

155 else: 

156 self._debug_mode = False 

157 self._max_external_prog_uses = self._cfgsection.opt_int_positive( 

158 ProcessorConfigKeys.MAX_EXTERNAL_PROG_USES, default=0 

159 ) 

160 self._input_terminator = self._cfgsection.opt_str( 

161 ProcessorConfigKeys.INPUT_TERMINATOR, required=True 

162 ) 

163 self._output_terminator = self._cfgsection.opt_str( 

164 ProcessorConfigKeys.OUTPUT_TERMINATOR, required=True 

165 ) 

166 typepairs = self._cfgsection.opt_strlist( 

167 ProcessorConfigKeys.OUTPUTTYPEMAP, required=True, lower=False 

168 ) 

169 self._progenvsection = self._cfgsection.opt_str( 

170 ProcessorConfigKeys.PROGENVSECTION 

171 ) 

172 progargs = self._cfgsection.opt_str( 

173 ProcessorConfigKeys.PROGARGS, required=True 

174 ) 

175 logtag = nlpdef.logtag or "." 

176 # Also, ensure the user doesn't specify desttable (would be 

177 # confusing). 

178 if self._cfgsection.opt_str(ProcessorConfigKeys.DESTTABLE): 

179 raise ValueError( 

180 f"For GATE processors, don't specify " 

181 f"{ProcessorConfigKeys.DESTTABLE!r}; table information is " 

182 f"in {ProcessorConfigKeys.OUTPUTTYPEMAP!r}" 

183 ) 

184 

185 self._outputtypemap = {} # type: Dict[str, OutputUserConfig] 

186 self._type_to_tablename = {} # type: Dict[str, str] 

187 for annottype, outputsection in chunks(typepairs, 2): 

188 annottype = annottype.lower() 

189 c = OutputUserConfig( 

190 config_parser=nlpdef.parser, 

191 cfg_output_name=outputsection, 

192 ) 

193 self._outputtypemap[annottype] = c 

194 self._type_to_tablename[annottype] = c.dest_tablename 

195 

196 if self._progenvsection: 

197 # noinspection PyTypeChecker 

198 self._env = nlpdef.get_env_dict(self._progenvsection, os.environ) 

199 else: 

200 self._env = os.environ.copy() 

201 self._env["NLPLOGTAG"] = logtag 

202 # ... We have ensured that this is not empty for real use, because 

203 # passing a "-lt" switch with no parameter will make 

204 # CrateGatePipeline.java complain and stop. The environment variable 

205 # is read via the "progargs" config argument, as follows. 

206 

207 formatted_progargs = progargs.format(**self._env) 

208 self._progargs = shlex.split(formatted_progargs) 

209 

210 self._n_uses = 0 

211 self._pipe_encoding = "utf8" 

212 self._p = None # the subprocess 

213 self._started = False 

214 

215 # Sanity checks 

216 for ty, tn in self._type_to_tablename.items(): 

217 assert ( 

218 len(tn) <= MAX_SQL_FIELD_LEN 

219 ), f"Table name too long (max {MAX_SQL_FIELD_LEN} characters)" 

220 

221 # ------------------------------------------------------------------------- 

222 # External process control 

223 # ------------------------------------------------------------------------- 

224 

225 def _start(self) -> None: 

226 """ 

227 Launch the external process, with stdin/stdout connections to it. 

228 """ 

229 if self._started: 

230 return 

231 args = self._progargs 

232 log.info(f"Launching command: {cmdline_quote(args)}") 

233 self._p = subprocess.Popen( 

234 args, 

235 stdin=subprocess.PIPE, 

236 stdout=subprocess.PIPE, 

237 # stderr=subprocess.PIPE, 

238 shell=False, 

239 bufsize=1, 

240 ) 

241 # ... don't ask for stderr to be piped if you don't want it; firstly, 

242 # there's a risk that if you don't consume it, something hangs, and 

243 # secondly if you don't consume it, you see it on the console, which is 

244 # helpful. 

245 self._started = True 

246 

247 def _encode_to_subproc_stdin(self, text: str) -> None: 

248 """ 

249 Send text to the external program (via its stdin), encoding it in the 

250 process (typically to UTF-8). 

251 """ 

252 log.debug("SENDING: " + text) 

253 bytes_ = text.encode(self._pipe_encoding) 

254 self._p.stdin.write(bytes_) 

255 

256 def _flush_subproc_stdin(self) -> None: 

257 """ 

258 Flushes what we're sending to the external program via its stdin. 

259 """ 

260 self._p.stdin.flush() 

261 

262 def _decode_from_subproc_stdout(self) -> str: 

263 """ 

264 Decode what we've received from the external program's stdout, from its 

265 specific encoding (usually UTF-8) to a Python string. 

266 """ 

267 bytes_ = self._p.stdout.readline() 

268 text = bytes_.decode(self._pipe_encoding) 

269 log.debug("RECEIVING: " + repr(text)) 

270 return text 

271 

272 def _finish(self) -> None: 

273 """ 

274 Close down the external process. 

275 """ 

276 if not self._started: 

277 return 

278 # close p.stdout, wait for the subprocess to exit 

279 self._p.communicate() 

280 self._started = False 

281 self._n_uses = 0 

282 

283 def _restart(self) -> None: 

284 """ 

285 Close down the external process and restart it. 

286 """ 

287 self._finish() 

288 self._start() 

289 

290 # ------------------------------------------------------------------------- 

291 # Input processing 

292 # ------------------------------------------------------------------------- 

293 

294 def parse( 

295 self, text: str 

296 ) -> Generator[Tuple[str, Dict[str, Any]], None, None]: 

297 """ 

298 - Send text to the external process, and receive the result. 

299 - Note that associated data is not passed into this function, and is 

300 kept in the Python environment, so we can't run into any problems 

301 with the transfer to/from the Java program garbling important data. 

302 All we send to the subprocess is the text (and an input_terminator). 

303 Then, we may receive MULTIPLE sets of data back ("your text contains 

304 the following 7 people/drug references/whatever"), followed 

305 eventually by the output_terminator, at which point this set is 

306 complete. 

307 """ 

308 self._start() # ensure started 

309 

310 try: 

311 # Send 

312 log.debug("writing: " + text) 

313 self._encode_to_subproc_stdin(text) 

314 self._encode_to_subproc_stdin(os.linesep) 

315 self._encode_to_subproc_stdin(self._input_terminator + os.linesep) 

316 self._flush_subproc_stdin() # required in the Python 3 system 

317 

318 # Receive 

319 for line in iter( 

320 self._decode_from_subproc_stdout, 

321 self._output_terminator + os.linesep, 

322 ): 

323 # ... iterate until the sentinel output_terminator is received 

324 line = line.rstrip("\n") 

325 # ... remove trailing newline, but NOT TABS 

326 # ... if you strip tabs, you get superfluous 

327 # "Bad chunk, not of length 2" messages. 

328 log.debug("stdout received: " + line) 

329 d = tsv_pairs_to_dict(line) 

330 log.debug(f"dictionary received: {d}") 

331 try: 

332 annottype = d[GateFieldNames.TYPE].lower() 

333 except KeyError: 

334 raise ValueError("_type information not in data received") 

335 if annottype not in self._type_to_tablename: 

336 log.warning( 

337 f"Unknown annotation type, skipping: {annottype}" 

338 ) 

339 continue 

340 c = self._outputtypemap[annottype] 

341 rename_keys_in_dict(d, c.renames) 

342 set_null_values_in_dict(d, c.null_literals) 

343 yield self._type_to_tablename[annottype], d 

344 

345 self._n_uses += 1 

346 # Restart subprocess? 

347 if 0 < self._max_external_prog_uses <= self._n_uses: 

348 log.info(f"relaunching app after {self._n_uses} uses") 

349 self._restart() 

350 

351 except BrokenPipeError: 

352 log.error("Broken pipe; relaunching app") 

353 self._restart() 

354 raise TextProcessingFailed() 

355 

356 # ------------------------------------------------------------------------- 

357 # Test 

358 # ------------------------------------------------------------------------- 

359 

360 def test(self, verbose: bool = False) -> None: 

361 """ 

362 Test the :func:`send` function. 

363 """ 

364 if self._debug_mode: 

365 return 

366 self.test_parser( 

367 ["Bob Hope visited Seattle.", "James Joyce wrote Ulysses."] 

368 ) 

369 

370 # ------------------------------------------------------------------------- 

371 # Database structure 

372 # ------------------------------------------------------------------------- 

373 

374 def dest_tables_columns(self) -> Dict[str, List[Column]]: 

375 # docstring in superclass 

376 tables = {} # type: Dict[str, List[Column]] 

377 for anottype, otconfig in self._outputtypemap.items(): 

378 tables[ 

379 otconfig.dest_tablename 

380 ] = self._standard_gate_columns() + otconfig.get_columns( 

381 self.dest_engine 

382 ) 

383 return tables 

384 

385 def dest_tables_indexes(self) -> Dict[str, List[Index]]: 

386 # docstring in superclass 

387 tables = {} # type: Dict[str, List[Index]] 

388 for anottype, otconfig in self._outputtypemap.items(): 

389 dest_tablename = otconfig.dest_tablename 

390 tables[dest_tablename] = ( 

391 self._standard_gate_indexes(dest_tablename) + otconfig.indexes 

392 ) 

393 return tables 

394 

395 def nlprp_name(self) -> str: 

396 # docstring in superclass 

397 if self._debug_mode: 

398 return super().nlprp_name() 

399 else: 

400 return self._nlpdef.name 

401 

402 def nlprp_schema_info(self, sql_dialect: str = None) -> Dict[str, Any]: 

403 # We do not absolutely need to override nlprp_schema_info(). Although 

404 # CRATE's GATE processor doesn't automatically know its schema, it is 

405 # told by the config (i.e. by the user) and can pass on that 

406 # information. However, for debug mode, it's helpful to override. 

407 if self._debug_mode: 

408 return {NlprpKeys.SCHEMA_TYPE: NlprpValues.UNKNOWN} 

409 else: 

410 return super().nlprp_schema_info(sql_dialect)