Coverage for nlp_manager/cloud_parser.py: 85%
131 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
1"""
2crate_anon/nlp_manager/cloud_parser.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26Send text to a cloud-based NLPRP server for processing.
28"""
30import logging
31from typing import Any, Dict, List, Optional, Tuple, Type, Union
33from cardinal_pythonlib.lists import chunks
34from sqlalchemy.schema import Column, Index
35from sqlalchemy import types as sqlatypes
37from crate_anon.nlp_manager.nlp_definition import NlpDefinition
38from crate_anon.nlp_manager.constants import ProcessorConfigKeys, NlpDefValues
39from crate_anon.nlp_manager.output_user_config import OutputUserConfig
40from crate_anon.nlprp.constants import NlprpKeys, NlprpValues
41from crate_anon.nlp_manager.base_nlp_parser import TableMaker
42from crate_anon.nlp_webserver.server_processor import ServerProcessor
44log = logging.getLogger(__name__)
47# =============================================================================
48# Cloud class for cloud-based processors
49# =============================================================================
class Cloud(TableMaker):
    """
    EXTERNAL.

    Abstract NLP processor that passes information to a remote (cloud-based)
    NLP system via the NLPRP protocol. The processor at the other end might be
    of any kind.
    """

    _is_cloud_processor = True

    def __init__(
        self,
        nlpdef: Optional[NlpDefinition],
        cfg_processor_name: Optional[str],
        commit: bool = False,
    ) -> None:
        """
        Args:
            nlpdef:
                :class:`crate_anon.nlp_manager.nlp_definition.NlpDefinition`
            cfg_processor_name:
                the config section for the processor
            commit:
                force a COMMIT whenever we insert data? You should specify this
                in multiprocess mode, or you may get database deadlocks.

        Raises:
            ValueError: if the config section specifies a destination table
                directly (table information belongs in the output type map).
        """
        super().__init__(
            nlpdef, cfg_processor_name, commit, friendly_name="Cloud"
        )
        # Information about the remote processor; filled in later by
        # _set_processor_info() once a matching remote processor is found.
        self.remote_processor_info = None  # type: Optional[ServerProcessor]
        self.schema_type = None
        self.sql_dialect = None
        self.schema = None  # type: Optional[Dict[str, Any]]
        self.available_remotely = False  # update later if available
        # Output section: both maps are keyed by lower-case output type.
        self._outputtypemap = {}  # type: Dict[str, OutputUserConfig]
        self._type_to_tablename = {}  # type: Dict[str, str]

        if not nlpdef and not cfg_processor_name:
            # Debugging only
            self.procname = ""
            self.procversion = ""
            self.format = ""
        else:
            self.procname = self._cfgsection.opt_str(
                ProcessorConfigKeys.PROCESSOR_NAME, required=True
            )
            self.procversion = self._cfgsection.opt_str(
                ProcessorConfigKeys.PROCESSOR_VERSION, default=None
            )
            # Made format required so people are less likely to make mistakes
            self.format = self._cfgsection.opt_str(
                ProcessorConfigKeys.PROCESSOR_FORMAT, required=True
            )
            # Output section - bit of repetition from the 'Gate' parser.
            # The config value is a flat list read in consecutive pairs:
            # (output type, output config section name).
            typepairs = self._cfgsection.opt_strlist(
                ProcessorConfigKeys.OUTPUTTYPEMAP, required=True, lower=False
            )
            for output_type, outputsection in chunks(typepairs, 2):
                output_type = output_type.lower()
                c = OutputUserConfig(
                    config_parser=nlpdef.parser,
                    cfg_output_name=outputsection,
                    schema_required=False,
                )
                self._outputtypemap[output_type] = c
                self._type_to_tablename[output_type] = c.dest_tablename
            # Also, ensure the user doesn't specify desttable (would be
            # confusing).
            if self._cfgsection.opt_str(ProcessorConfigKeys.DESTTABLE):
                raise ValueError(
                    f"For cloud processors, don't specify "
                    f"{ProcessorConfigKeys.DESTTABLE!r}; table information is "
                    f"in {ProcessorConfigKeys.OUTPUTTYPEMAP!r}"
                )

    @staticmethod
    def get_coltype_parts(coltype_str: str) -> Tuple[str, Union[str, int]]:
        """
        Get root column type and parameter, i.e. for VARCHAR(50)
        root column type is VARCHAR and parameter is 50.

        Args:
            coltype_str: column type string, e.g. ``"VARCHAR(50)"``

        Returns:
            tuple: ``(root_type, parameter)``. The parameter is an ``int`` if
            it can be converted to one; otherwise it is returned as a string
            (the empty string if there is no parameter).

        Raises:
            ValueError: if the string contains more than one ``(``.
        """
        # Drop every ")" and split on "(": "VARCHAR(50)" -> ["VARCHAR", "50"]
        parts = [x.strip() for x in coltype_str.replace(")", "").split("(")]
        if len(parts) == 1:
            col_str = parts[0]
            parameter = ""
        else:
            try:
                col_str, parameter = parts
            except ValueError:  # e.g. "too many values to unpack"
                log.error(f"Invalid column type in response: {coltype_str}")
                raise
        try:
            # Turn the parameter into an integer if it's supposed to be one
            parameter = int(parameter)
        except ValueError:
            # Not an integer (or empty): leave it as the original string.
            pass
        return col_str, parameter

    @staticmethod
    def data_type_str_to_coltype(
        data_type_str: str,
    ) -> Type[sqlatypes.TypeEngine]:
        """
        Get the SQLAlchemy column type class which fits with the data type
        specified. Currently we IGNORE self.sql_dialect.

        Args:
            data_type_str: name of an attribute of ``sqlalchemy.types``

        Raises:
            NotImplementedError: if the name is not an attribute of
                ``sqlalchemy.types``, or is not a ``TypeEngine`` subclass.
        """
        coltype = getattr(sqlatypes, data_type_str, None)
        # Check if 'coltype' is really an sqlalchemy column type. The
        # isinstance() guard matters because issubclass() raises TypeError
        # when its first argument is not a class.
        if isinstance(coltype, type) and issubclass(
            coltype, sqlatypes.TypeEngine
        ):
            return coltype
        raise NotImplementedError(
            f"Don't know the SQLAlchemy column type corresponding to "
            f"data type: {data_type_str!r}"
        )

    def is_tabular(self) -> bool:
        """
        Is the format of the schema information given by the remote processor
        tabular?
        """
        return self.schema_type == NlprpValues.TABULAR

    def get_tabular_schema_tablenames(self) -> List[str]:
        """
        Returns the names of the tables in the tabular schema (or an empty list
        if we do not have a tabular schema).
        """
        if not self.is_tabular():
            return []
        return list(self.schema.keys())

    def get_local_from_remote_tablename(self, remote_tablename: str) -> str:
        """
        When the remote server specifies a table name, we need to map it to
        a local database table name.

        Raises KeyError on failure.
        """
        try:
            return self.get_tablename_from_type(remote_tablename)
        except KeyError:
            # Replace the bare lookup error with a more informative one.
            raise KeyError(
                "No local table name defined for remote table "
                f"{remote_tablename!r}"
            ) from None

    def get_first_local_tablename(self) -> str:
        """
        Used in some circumstances when the remote processor doesn't specify
        a table.

        Returns the destination table name of the first output type that was
        registered in the output type map.
        """
        assert len(self._type_to_tablename) > 0
        # The mapping is keyed by output type (a string), so it cannot be
        # indexed by position; take the first value in insertion order.
        return next(iter(self._type_to_tablename.values()))

    def get_tablename_from_type(self, output_type: str) -> str:
        """
        For simple remote GATE processors, or cloud processors: for a given
        annotation type (GATE) or remote table name (cloud), return the
        destination table name.

        Enforces lower-case lookup.

        Will raise KeyError if this fails.
        """
        return self._type_to_tablename[output_type.lower()]

    def get_otconf_from_type(self, output_type: str) -> OutputUserConfig:
        """
        For a GATE annotation type, or cloud remote table name, return the
        corresponding OutputUserConfig.

        Enforces lower-case lookup.

        Will raise KeyError if this fails.
        """
        return self._outputtypemap[output_type.lower()]

    def _standard_columns_if_gate(self) -> List[Column]:
        """
        Returns standard columns for GATE output if ``self.format`` is GATE.
        Returns an empty list otherwise.
        """
        if self.format == NlpDefValues.FORMAT_GATE:
            return self._standard_gate_columns()
        return []

    def _standard_indexes_if_gate(self, dest_tablename: str) -> List[Index]:
        """
        Returns standard indexes for GATE output if ``self.format`` is GATE.
        Returns an empty list otherwise.
        """
        if self.format == NlpDefValues.FORMAT_GATE:
            return self._standard_gate_indexes(dest_tablename)
        return []

    def _confirm_available(self, available: bool = True) -> None:
        """
        Set the attribute 'available_remotely', which indicates whether
        a requested processor is actually available from the specified server.
        """
        self.available_remotely = available

    def set_procinfo_if_correct(
        self, remote_processor: ServerProcessor
    ) -> None:
        """
        Checks if a processor dictionary, with all the NLPRP-specified info
        a processor should have, belongs to this processor. If it does, then
        we add the information from the processor dictionary.

        A remote processor matches if its name equals ours and either (a) we
        have no version configured and it is the remote default version, or
        (b) its version equals our configured version.
        """
        if self.procname != remote_processor.name:
            return
        wants_default = (
            remote_processor.is_default_version and not self.procversion
        )
        if wants_default or self.procversion == remote_processor.version:
            self._set_processor_info(remote_processor)

    def _set_processor_info(self, remote_processor: ServerProcessor) -> None:
        """
        Add the information from a processor dictionary. If it contains
        table information, this allows us to create the correct tables when
        the time comes.
        """
        # This won't be called unless the remote processor is available
        self._confirm_available()
        self.remote_processor_info = remote_processor
        self.schema_type = remote_processor.schema_type
        if remote_processor.is_tabular():
            self.schema = remote_processor.tabular_schema
            self.sql_dialect = remote_processor.sql_dialect
        # Check that, by this stage, we either have a tabular schema from
        # the processor, or we have user-specified destfields
        assert self.is_tabular() or all(
            x.destfields for x in self._outputtypemap.values()
        ), (
            "You haven't specified a table structure and the processor hasn't "
            "provided one."
        )

    def dest_tables_columns(self) -> Dict[str, List[Column]]:
        """
        Describes the destination table(s) that this NLP processor wants to
        write to.

        Returns:
             dict: a dictionary of ``{tablename: destination_columns}``, where
             ``destination_columns`` is a list of SQLAlchemy :class:`Column`
             objects.

        If there is an NLPRP remote table specification (tabular_schema
        method), we start with that.

        Then we add any user-defined tables. If there is both a remote
        definition and a local definition, the local definition overrides the
        remote definition. If the destination table info has no columns,
        however, it is not used for table creation.

        There may in principle be other tables too in the local config that are
        absent in the remote info (unusual!).

        Raises:
            ValueError: if the remote server declares a table without columns,
                or a local table has neither a remote nor a local definition.
            KeyError: if a remote table has no local table configured for it.
        """
        table_columns = {}  # type: Dict[str, List[Column]]

        # 1. NLPRP remote specification.
        if self.is_tabular():
            for remote_tablename, columndefs in self.schema.items():
                # Build the column list for this table purely from the
                # remote column definitions.
                column_objects = []  # type: List[Column]
                dest_tname = self.get_local_from_remote_tablename(
                    remote_tablename
                )
                column_renames = self.get_otconf_from_type(
                    remote_tablename
                ).renames
                for column_info in columndefs:
                    colname = column_info[NlprpKeys.COLUMN_NAME]
                    # Rename (or keep the same if no applicable rename):
                    colname = column_renames.get(colname, colname)
                    # Only the parameter (e.g. a length) is used from the
                    # column type string; the root type is ignored here.
                    _, parameter = self.get_coltype_parts(
                        column_info[NlprpKeys.COLUMN_TYPE]
                    )
                    data_type_str = column_info[NlprpKeys.DATA_TYPE]
                    # We could use the root column type or data_type_str here.
                    coltype = self.data_type_str_to_coltype(data_type_str)
                    column_objects.append(
                        Column(
                            name=colname,
                            type_=coltype(parameter) if parameter else coltype,
                            comment=column_info.get(NlprpKeys.COLUMN_COMMENT),
                            nullable=column_info[NlprpKeys.IS_NULLABLE],
                        )
                    )
                if not column_objects:
                    raise ValueError(
                        "Remote error: NLPRP server declares table "
                        f"{remote_tablename!r} but provides no column "
                        "information for it."
                    )
                table_columns[dest_tname] = column_objects

        # 2. User specification.
        for otconfig in self._outputtypemap.values():
            if otconfig.destfields:
                # The user has specified columns; these replace any remote
                # definition for the same destination table.
                table_columns[otconfig.dest_tablename] = (
                    self._standard_columns_if_gate()
                    + otconfig.get_columns(self.dest_engine)
                )
            elif otconfig.dest_tablename not in table_columns:
                # The user has noted the existence of the table, but hasn't
                # specified columns, and there is no remote definition.
                raise ValueError(
                    f"Local table {otconfig.dest_tablename!r} has no "
                    "remote definition, and no columns are defined for it "
                    "in the config file either."
                )
            # Otherwise: defined remotely, with no local detail; that's OK.

        # Done.
        return table_columns

    def dest_tables_indexes(self) -> Dict[str, List[Index]]:
        """
        Describes indexes that this NLP processor suggests for its destination
        table(s).

        Returns:
             dict: a dictionary of ``{tablename: indexes}``, where ``indexes``
             is a list of SQLAlchemy :class:`Index` objects.

        The NLPRP remote table specification doesn't include indexing. So all
        indexing information is from our config file, whether for GATE or
        cloud processors.
        """
        table_indexes = {}  # type: Dict[str, List[Index]]
        for otconfig in self._outputtypemap.values():
            dest_tablename = otconfig.dest_tablename
            table_indexes[dest_tablename] = (
                self._standard_indexes_if_gate(dest_tablename)
                + otconfig.indexes
            )
        return table_indexes