Coverage for crateweb/research/views.py: 12%
1338 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
« prev ^ index » next coverage.py v7.8.0, created at 2025-08-27 10:34 -0500
1"""
2crate_anon/crateweb/research/views.py
4===============================================================================
6 Copyright (C) 2015, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CRATE.
11 CRATE is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CRATE is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CRATE. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**CRATE views on the research database.**
28"""
30import copy
31import datetime
32import json
33import logging
34from os.path import basename
35from typing import Any, Dict, Iterable, List, Sequence, Type, Union, Optional
37from cardinal_pythonlib.typing_helpers import Pep249DatabaseCursorType
38from cardinal_pythonlib.dbfunc import get_fieldnames_from_cursor
39from cardinal_pythonlib.django.function_cache import django_cache_function
40from cardinal_pythonlib.django.serve import file_response, serve_file
41from cardinal_pythonlib.exceptions import recover_info_from_exception
42from cardinal_pythonlib.hash import hash64
43from cardinal_pythonlib.httpconst import ContentType
44from cardinal_pythonlib.logs import BraceStyleAdapter
45from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
46from cardinal_pythonlib.psychiatry.drugs import Drug, all_drugs_where
47from django import forms
48from django.conf import settings
49from django.contrib.auth.decorators import user_passes_test
50from django.core.exceptions import (
51 ObjectDoesNotExist,
52 ValidationError,
53)
54from django.db import DatabaseError, ProgrammingError
55from django.db.models import Q, QuerySet
56from django.http.response import (
57 HttpResponse,
58 HttpResponseBase,
59 HttpResponseBadRequest,
60 HttpResponseRedirect,
61)
62from django.http.request import HttpRequest
63from django.shortcuts import get_object_or_404, redirect, render
64from django.template.loader import render_to_string
65from django.urls import reverse
66from django.utils.html import escape
67from django.views.decorators.cache import cache_control
68from mako.exceptions import TemplateLookupException
69from pyparsing import ParseException
71from crate_anon.common.constants import JSON_SEPARATORS_COMPACT
73from crate_anon.common.sql import (
74 ColumnId,
75 escape_sql_string_literal,
76 escape_sql_string_or_int_literal,
77 SQL_OPS_MULTIPLE_VALUES,
78 SQL_OPS_VALUE_UNNECESSARY,
79 TableId,
80 toggle_distinct,
81 WhereCondition,
82)
83from crate_anon.crateweb.config.constants import UrlNames, UrlKeys
84from crate_anon.crateweb.core.utils import (
85 guess_mimetype,
86 is_clinician,
87 is_superuser,
88 paginate,
89)
90from crate_anon.crateweb.research.archive_backend import (
91 archive_attachment_url,
92 ARCHIVE_CONTEXT,
93 ARCHIVE_IS_CONFIGURED,
94 archive_mako_lookup,
95 archive_misconfigured_response,
96 archive_root_url,
97 archive_static_url,
98 archive_template_url,
99 ArchiveContextKeys,
100 audit_archive_attachment,
101 audit_archive_template,
102 CACHE_CONTROL_MAX_AGE_ARCHIVE_ATTACHMENTS,
103 CACHE_CONTROL_MAX_AGE_ARCHIVE_STATIC,
104 CACHE_CONTROL_MAX_AGE_ARCHIVE_TEMPLATES,
105 DEFAULT_GUESS_CONTENT_TYPE,
106 get_archive_attachment_filepath,
107 get_archive_static_filepath,
108)
109from crate_anon.crateweb.research.errors import DatabaseStructureNotUnderstood
110from crate_anon.crateweb.research.forms import (
111 AddHighlightForm,
112 AddQueryForm,
113 ClinicianAllTextFromPidForm,
114 DatabasePickerForm,
115 DEFAULT_MIN_TEXT_FIELD_LENGTH,
116 FieldPickerInfo,
117 ManualPeQueryForm,
118 PidLookupForm,
119 QueryBuilderForm,
120 RidLookupForm,
121 SQLHelperTextAnywhereForm,
122 SQLHelperFindAnywhereForm,
123 SQLHelperDrugTypeForm,
124)
125from crate_anon.crateweb.research.html_functions import (
126 highlight_text,
127 HtmlElementCounter,
128 make_result_element,
129 make_collapsible_sql_query,
130 N_CSS_HIGHLIGHT_CLASSES,
131 prettify_sql_css,
132 prettify_sql_html,
133 prettify_sql_and_args,
134)
135from crate_anon.crateweb.research.models import (
136 get_executed_researchdb_cursor,
137 get_executed_researchdb_cursor_qmark_placeholders,
138 Highlight,
139 PatientExplorer,
140 PatientMultiQuery,
141 PidLookup,
142 Query,
143 SitewideQuery,
144)
145from crate_anon.crateweb.research.research_db_info import (
146 PatientFieldPythonTypes,
147 get_research_db_info,
148 SingleResearchDatabase,
149)
150from crate_anon.crateweb.research.sql_writer import (
151 add_to_select,
152 SelectElement,
153)
154from crate_anon.crateweb.userprofile.models import (
155 get_patients_per_page,
156 UserProfile,
157)
158from crate_anon.nlp_manager.constants import (
159 # Fieldnames for CRATE NLP table
160 FN_NLPDEF,
161 FN_SRCDB,
162 FN_SRCFIELD,
163 FN_SRCPKFIELD,
164 FN_SRCPKSTR,
165 FN_SRCPKVAL,
166 FN_SRCTABLE,
167)
# Module-level logger, wrapped in cardinal_pythonlib's BraceStyleAdapter.
log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Constants
# =============================================================================

# Length threshold (in characters) for showing a query's SQL in HTML.
# query_edit_select() marks any query whose original SQL is at least this long
# as truncated and keeps only a short plain-text prefix for display.
MAX_LEN_SHOW = 20000


# Prefixes for inline pid and mpid conversion. A whitespace-delimited token of
# the form "~pid:dbname" or "~mpid:dbname" in a clinician's SQL is replaced by
# a condition on the RID field of that database; see parse_privileged_sql().
PID_PREFIX = "~pid"
MPID_PREFIX = "~mpid"
185# =============================================================================
186# Helper functions
187# =============================================================================
def validate_blank_form(request: HttpRequest) -> None:
    """
    Checks that the request is (a) an HTTP POST request, and (b) passes
    validation when its POST data is bound to an otherwise empty Django form.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Raises:
        :exc:`django.core.exceptions.ValidationError` if either check fails

    """
    if request.method == "POST":
        blank_form = forms.Form(request.POST)
        # NOTE(review): the original code describes this step as the CSRF
        # check -- confirm whether CSRF is enforced here or by middleware.
        if blank_form.is_valid():
            return
        raise ValidationError("Form failed validation")
    raise ValidationError("Use HTTP POST, not HTTP GET or other methods")
def query_context(request: HttpRequest) -> Dict[str, Any]:
    """
    Builds the context dictionary that is merged into (nearly?) *every*
    research view's template context.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        dict: core information about the request: the ID of the user's
        currently active query and Patient Explorer (or ``None``), plus
        boolean flags saying whether each is set.

    Notes:

    - Try to minimize SQL here, as these calls will be used for EVERY
      request.

    - This problem can be circumvented with a per-request cache; see
      https://stackoverflow.com/questions/3151469/per-request-cache-in-django

    """
    active_query_id = Query.get_active_query_id_or_none(request)
    active_pe_id = PatientExplorer.get_active_pe_id_or_none(request)
    return dict(
        query_selected=active_query_id is not None,
        current_query_id=active_query_id,
        pe_selected=active_pe_id is not None,
        current_pe_id=active_pe_id,
    )
def datetime_iso_for_filename() -> str:
    """
    Returns the current local date/time as a filename-friendly string of the
    form ``YYYYmmdd_HHMMSS``.
    """
    return f"{datetime.datetime.now():%Y%m%d_%H%M%S}"
247# =============================================================================
248# Errors
249# =============================================================================
def generic_error(request: HttpRequest, error: str) -> HttpResponse:
    """
    Renders the generic error page with the supplied message.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        error: the error text, passed to the template as ``error``

    Returns:
        a :class:`django.http.response.HttpResponse`

    """
    return render(request, "generic_error.html", {"error": error})
270# =============================================================================
271# Queries
272# =============================================================================
@django_cache_function(timeout=None)
# @lru_cache(maxsize=None)
def get_db_structure_json() -> str:
    """
    Returns the research database structure in JSON format.

    The result is a compact JSON list with one entry per schema that is
    eligible for the query builder and has at least one usable table::

        [
            {
                "database": ...,
                "schema": ...,
                "tables": [
                    {
                        "table": ...,
                        "columns": [
                            {"colname": ..., "coltype": ...,
                             "rawtype": ..., "comment": ...},
                            ...
                        ],
                    },
                    ...
                ],
            },
            ...
        ]

    Tables are included only if they contain both the schema's TRID field and
    its RID field. Tables and columns are sorted by name.

    The function is wrapped in ``django_cache_function(timeout=None)``
    (presumably meaning the result is cached without expiry -- confirm against
    cardinal_pythonlib).
    """
    log.debug("get_db_structure_json")
    research_database_info = get_research_db_info()
    colinfolist = research_database_info.get_colinfolist()
    if not colinfolist:
        log.warning("get_db_structure_json(): colinfolist is empty")
    info = []  # type: List[Dict[str, Any]]
    for dbinfo in research_database_info.dbinfolist:
        log.info(f"get_db_structure_json: schema {dbinfo.schema_identifier}")
        if not dbinfo.eligible_for_query_builder:
            log.debug(
                f"Skipping schema={dbinfo.schema_identifier}: "
                f"not eligible for query builder"
            )
            continue

        # Group this schema's columns by table in a single pass, rather than
        # re-scanning the whole column list once per table.
        columns_by_table = {}  # type: Dict[str, List[Any]]
        for ci in colinfolist:
            if (
                ci.table_catalog == dbinfo.database
                and ci.table_schema == dbinfo.schema_name
            ):
                columns_by_table.setdefault(ci.table_name, []).append(ci)

        table_info = []  # type: List[Dict[str, Any]]
        for table in sorted(columns_by_table):
            table_cil = columns_by_table[table]
            # Test the match condition itself (not the truthiness of the
            # column-info object) when looking for the key fields.
            if not any(
                ci.column_name == dbinfo.trid_field for ci in table_cil
            ):
                # This table doesn't contain a TRID, so we will skip it.
                log.debug(
                    f"... skipping table {table}: "
                    f"no TRID [{dbinfo.trid_field}]"
                )
                continue
            if not any(
                ci.column_name == dbinfo.rid_field for ci in table_cil
            ):
                # This table doesn't contain a RID, so we will skip it.
                log.debug(
                    f"... skipping table {table}: "
                    f"no RID [{dbinfo.rid_field}]"
                )
                continue
            column_info = [
                {
                    "colname": ci.column_name,
                    "coltype": ci.querybuilder_type,
                    "rawtype": ci.column_type,
                    "comment": ci.column_comment or "",
                }
                for ci in sorted(table_cil, key=lambda x: x.column_name)
            ]  # type: List[Dict[str, str]]
            if column_info:
                table_info.append(
                    {
                        "table": table,
                        "columns": column_info,
                    }
                )
            log.debug(f"... using table {table}: {len(column_info)} columns")
        if table_info:
            info.append(
                {
                    "database": dbinfo.database,
                    "schema": dbinfo.schema_name,
                    "tables": table_info,
                }
            )
    json_result = json.dumps(info, separators=JSON_SEPARATORS_COMPACT)
    log.debug(
        f"... get_db_structure_json returning string of size "
        f"{len(json_result)}"
    )
    return json_result
def query_build(request: HttpRequest) -> HttpResponse:
    """
    Assisted query builder, based on the data structure read from the research
    database.

    On POST, either performs a "global" action on the user's SQL scratchpad
    (clear, toggle DISTINCT, save, run) or uses the submitted
    :class:`QueryBuilderForm` to add a SELECT column, all columns of a table,
    or a WHERE condition to the scratchpad. Parse/structure errors are shown
    on the page rather than raised.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    # NOTES FOR FIRST METHOD, with lots (and lots) of forms.
    # - In what follows, we want a normal template but we want to include a
    #   large chunk of raw HTML. I was doing this with
    #   {{ builder_html | safe }} within the template, but it was very slow
    #   (e.g. 500ms on my machine; 50s on the CPFT "sandpit" server,
    #   2016-06-28). The delay was genuinely in the template rendering, it
    #   seems, based on profiling and manual log calls.
    # - A simple string replacement, as below, was about 7% of the total time
    #   (e.g. 3300ms instead of 50s).
    # - Other alternatives might include the Jinja2 template system, which is
    #   apparently faster than the Django default, but we may not need further
    #   optimization.
    # - Another, potentially better, solution, is not to send dozens or
    #   hundreds of forms, but to write some Javascript to make this happen
    #   mostly on the client side. Might look better, too. (Yes, it does.)

    # NB: first "submit" button takes the Enter key, so place WHERE
    # before SELECT so users can hit enter in the WHERE value fields.

    # - If you provide the "request=request" argument to
    #   render_to_string it gives you the CSRF token.
    # - Another way is to ignore "request" and use render_to_string
    #   with a manually crafted context including 'csrf_token'.
    #   (This avoids the global context processors.)
    # - Note that the CSRF token prevents simple caching of the forms.
    # - But we can't cache anyway if we're going to have some forms
    #   (differentially) non-collapsed at the start, e.g. on form POST.
    # - Also harder work to do this HTML manually (rather than with
    #   template rendering), because the csrf_token ends up like:
    #   <input type='hidden' name='csrfmiddlewaretoken' value='RGN5UZnTVkLFAVNtXRpJwn5CclBRAdLr' />  # noqa: E501

    # noinspection PyUnresolvedReferences
    profile = request.user.profile  # type: UserProfile
    research_database_info = get_research_db_info()
    default_database = research_database_info.get_default_database_name()
    default_schema = research_database_info.get_default_schema_name()
    with_database = research_database_info.uses_database_level()
    parse_error = ""
    form = None

    if request.method == "POST":
        grammar = research_database_info.grammar
        posted = request.POST
        try:
            if "global_clear" in posted:
                profile.sql_scratchpad = ""
                profile.save()

            elif "global_toggle_distinct" in posted:
                profile.sql_scratchpad = toggle_distinct(
                    profile.sql_scratchpad, grammar=grammar
                )
                profile.save()

            elif "global_save" in posted:
                return query_submit(request, profile.sql_scratchpad, run=False)

            elif "global_run" in posted:
                return query_submit(request, profile.sql_scratchpad, run=True)

            else:
                form = QueryBuilderForm(request.POST, request.FILES)
                if form.is_valid():
                    cleaned = form.cleaned_data
                    column_id = ColumnId(
                        db=cleaned["database"] if with_database else "",
                        schema=cleaned["schema"],
                        table=cleaned["table"],
                        column=cleaned["column"],
                    )
                    table_id = column_id.table_id

                    if "submit_select" in posted:
                        # Add the single chosen column to the SELECT.
                        profile.sql_scratchpad = add_to_select(
                            profile.sql_scratchpad,
                            select_elements=[
                                SelectElement(column_id=column_id)
                            ],
                            magic_join=True,
                            grammar=grammar,
                        )

                    elif "submit_select_star" in posted:
                        # Add every column of the chosen table.
                        whole_table = [
                            SelectElement(column_id=c.column_id)
                            for c in research_database_info.all_columns(
                                table_id
                            )
                        ]
                        profile.sql_scratchpad = add_to_select(
                            profile.sql_scratchpad,
                            select_elements=whole_table,
                            magic_join=True,
                            grammar=grammar,
                        )

                    elif "submit_where" in posted:
                        datatype = cleaned["datatype"]
                        op = cleaned["where_op"]
                        # The value depends on the operator: a list of values
                        # (from the form's file), no value, or a single value.
                        if op in SQL_OPS_MULTIPLE_VALUES:
                            value = form.file_values_list
                        elif op in SQL_OPS_VALUE_UNNECESSARY:
                            value = None
                        else:
                            value = form.get_cleaned_where_value()
                        condition = WhereCondition(
                            column_id=column_id,
                            op=op,
                            datatype=datatype,
                            value_or_values=value,
                        )
                        profile.sql_scratchpad = add_to_select(
                            profile.sql_scratchpad,
                            where_type="AND",
                            where_conditions=[condition],
                            magic_join=True,
                            grammar=grammar,
                        )

                    else:
                        raise ValueError("Bad form command!")

                    profile.save()
                # An invalid form falls through; its errors are reported via
                # "form_errors" below.

        except (ParseException, DatabaseStructureNotUnderstood) as e:
            parse_error = str(e)

    if form is None:
        form = QueryBuilderForm()

    submitted = form.data
    starting_values_dict = {
        "database": submitted.get("database", "") if with_database else "",
        "schema": submitted.get("schema", ""),
        "table": submitted.get("table", ""),
        "column": submitted.get("column", ""),
        "op": submitted.get("where_op", ""),
        "date_value": submitted.get("date_value", ""),
        # Impossible to set file_value programmatically. (See querybuilder.js.)
        "float_value": submitted.get("float_value", ""),
        "int_value": submitted.get("int_value", ""),
        "string_value": submitted.get("string_value", ""),
        "offer_where": bool(profile.sql_scratchpad),  # existing SELECT?
        "form_errors": "<br>".join(
            f"{k}: {v}" for k, v in form.errors.items()
        ),
        "default_database": default_database,
        "default_schema": default_schema,
        "with_database": with_database,
    }
    dialect = settings.RESEARCH_DB_DIALECT
    context = {
        "nav_on_querybuilder": True,
        "sql": prettify_sql_html(profile.sql_scratchpad),
        "parse_error": parse_error,
        "database_structure": get_db_structure_json(),
        "starting_values": json.dumps(
            starting_values_dict, separators=JSON_SEPARATORS_COMPACT
        ),
        "sql_dialect": dialect,
        "dialect_mysql": dialect == SqlaDialectName.MYSQL,
        "dialect_mssql": dialect == SqlaDialectName.MSSQL,
        "sql_highlight_css": prettify_sql_css(),
    }
    context.update(query_context(request))
    return render(request, "query_build.html", context)
def get_all_queries(request: HttpRequest) -> QuerySet:
    """
    Return all non-deleted database queries belonging to the current user,
    with active queries first and then newest first.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.db.models.QuerySet` for
        :class:`crate_anon.crateweb.research.models.Query` objects

    """
    own_queries = Query.objects.filter(user=request.user, deleted=False)
    return own_queries.order_by("-active", "-created")
def get_all_sitewide_queries() -> QuerySet:
    """
    Returns all non-deleted site-wide queries, newest first.

    Returns:
        a :class:`django.db.models.QuerySet` for
        :class:`crate_anon.crateweb.research.models.SitewideQuery` objects

    """
    live_queries = SitewideQuery.objects.filter(deleted=False)
    return live_queries.order_by("-created")
def get_identical_queries(
    request: HttpRequest, sql: str, sitewide: bool = False
) -> List[Query]:
    """
    Returns all existing queries whose SQL is identical to the SQL provided.

    This saves us creating a new query when one exists already that's
    identical.

    Candidates are found by hash (``sql_hash``) and then confirmed by comparing
    the SQL text in Python.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        sql: SQL text
        sitewide: check sitewide, rather than user-specific, queries?

    Returns:
        list: matching query objects (drawn from the site-wide queries if
        ``sitewide`` is true, otherwise from the current user's queries)

    """
    candidates = (
        get_all_sitewide_queries() if sitewide else get_all_queries(request)
    )

    # Why not simply: identical_queries = candidates.filter(sql=sql) ?
    #
    # - 2017-02-03: we had a problem here, in which the parameter was sent to
    #   SQL Server as type NTEXT, but the field "sql" is NVARCHAR(MAX), leading
    #   to "The data types nvarchar(max) and ntext are incompatible in the
    #   equal to operator."
    # - The Django field type TextField is converted to NVARCHAR(MAX) by
    #   django-pyodbc-azure, in sql_server/pyodbc/base.py, also at [1].
    # - That seems fine; NVARCHAR(MAX) seems more capable than NTEXT.
    #   NTEXT is deprecated.
    # - Error is reproducible with
    #       ... WHERE sql = CAST('hello' AS NTEXT) ...
    # - The order of the types in the error message matches the order in the
    #   SQL statement.
    # - A solution would be to cast the parameter as
    #   CAST(some_parameter AS NVARCHAR(MAX))
    # - Fixed by upgrading pyodbc from 3.1.1 to 4.0.3
    # - Added to FAQ
    # - WARNING: the problem came back with pyodbc==4.0.6, but not fixed again
    #   by downgrading to 4.0.3
    # - See also [2].
    # - An alternative solution would not be to compare on the long text, but
    #   store and compare on a hash of it.
    # - The problem is that either pyodbc or ODBC itself, somehow, is sending
    #   the string parameter as NTEXT.
    #   Similar Perl problem: [3].
    #
    # - In pyodbc, the key functions are:
    #       cursor.cpp: static PyObject* execute(...)
    #       -> params.cpp: bool PrepareAndBind(...)
    #           -> GetParameterInfo  // THIS ONE
    #               Parameter will be of type str.
    #               This will fail for PyBytes_Check [4].
    #               This will match for PyUnicode_Check [5].
    #               Thus:
    #               -> GetUnicodeInfo
    #               ... and depending on the string length of the
    #                   parameter, this returns either
    #                   SQL_WVARCHAR -> NVARCHAR on SQL Server [6], for short strings  # noqa: E501
    #                   SQL_WLONGVARCHAR -> NTEXT on SQL Server [6], for long strings  # noqa: E501
    #               ... and the length depends on
    #                   -> connection.h: cur->cnxn->GetMaxLength(info.ValueType);  # noqa: E501
    #           -> BindParameter
    #       in cursor.cpp
    #
    # - Now we also have pyodbc docs: [7].
    #
    # - Anyway, the upshot is that there is some unpredictability in sending
    #   very long parameters... the intermittency would be explained by some
    #   dependency on string length.
    # - Empirically, it fails somewhere around 1,900 characters.
    #
    # - Could switch away from pyodbc, e.g. to Django-mssql [8, 9].
    #   But, as per the CRATE manual, there were version incompatibilities
    #   here. Tried again with v1.8, but it gave configuration errors
    #   (ADODB.Connection; Provider cannot be found. It may not be properly
    #   installed.) Anyway, pyodbc is good enough for SQLAlchemy.
    #
    # [1] https://github.com/michiya/django-pyodbc-azure/blob/azure-1.10/sql_server/pyodbc/base.py  # noqa: E501
    # [2] https://github.com/mkleehammer/pyodbc/blob/master/tests2/informixtests.py  # noqa: E501
    # [3] https://stackoverflow.com/questions/13090907
    # [4] https://docs.python.org/3/c-api/bytes.html
    # [5] https://docs.python.org/3/c-api/unicode.html
    # [6] https://documentation.progress.com/output/DataDirect/DataDirectCloud/index.html#page/queries/microsoft-sql-server-data-types.html  # noqa: E501
    # [7] https://github.com/mkleehammer/pyodbc/wiki/Data-Types
    # [8] https://docs.djangoproject.com/en/1.10/ref/databases/#using-a-3rd-party-database-backend  # noqa: E501
    # [9] https://django-mssql.readthedocs.io/en/latest/

    # So we filter on a hash instead, using our hash64() function (the
    # original notes pair this with a Django BigIntegerField).
    same_hash = candidates.filter(sql_hash=hash64(sql))

    # Rule out hash collisions by comparing the actual SQL text in Python.
    matches = []  # type: List[Query]
    for candidate in same_hash:
        if candidate.sql == sql:
            matches.append(candidate)
    return matches
# noinspection PyUnusedLocal
@user_passes_test(is_clinician)
def parse_privileged_sql(request: HttpRequest, sql: str) -> List[Any]:
    """
    Parses clinicians' queries, replacing inline PID/MPID conditions with the
    equivalent RID conditions.

    SQL should be, e.g.:
        'SELECT * FROM tablename WHERE ~pid:dbname = pidnumber' or
        'SELECT * FROM tablename WHERE ~mpid:dbname IN (value1, value2, ...)'
    where dbname has the secret lookup table the user wants to use.

    The SQL is tokenized on whitespace, so the special token, the operator
    and the value(s) must be separated by whitespace, and the returned SQL has
    its tokens joined by single spaces. Only the operators ``=`` and ``IN``
    (the latter case-insensitively) are supported after a special token.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        sql: SQL text

    Returns:
        ``[status, text]`` where ``status`` is 0 for success or 1 for
        failure. On success, ``text`` is the rewritten SQL; on failure, it
        is the error message.
    """
    sql_components = sql.split()
    n_components = len(sql_components)
    pieces = []  # type: List[str]
    research_database_info = get_research_db_info()
    i = 0
    while i < n_components:
        split_component = sql_components[i].split(":")
        is_special = len(split_component) == 2 and split_component[0] in (
            PID_PREFIX,
            MPID_PREFIX,
        )
        if not is_special:
            # Ordinary token: copy through unchanged.
            pieces.append(sql_components[i])
            i += 1
            continue

        id_type, dbname = split_component
        try:
            dbinfo = research_database_info.get_dbinfo_by_name(dbname)
        except ValueError:
            return [1, f"No such database with name '{dbname}'"]
        if not dbinfo.secret_lookup_db:
            return [1, f"Database '{dbname}' has no pid to rid lookup"]
        rid_field = dbinfo.rid_field
        # Which PidLookup field the user's values refer to.
        lookup_field = "mpid" if id_type == MPID_PREFIX else "pid"
        lookup_table = PidLookup.objects.using(dbinfo.secret_lookup_db)

        i += 1
        try:
            operator = sql_components[i]
        except IndexError:
            return [1, "No operator given"]

        if operator.upper() == "IN":
            i += 1
            try:
                if not sql_components[i].startswith("("):
                    return [
                        1,
                        "When using the operator 'IN', values "
                        "must be enclosed with brackets",
                    ]
            except IndexError:
                return [1, "Missing values in clause"]
            # Consume tokens up to and including the one containing the
            # closing bracket, collecting the comma-separated values.
            values = []  # type: List[str]
            while True:
                try:
                    current = sql_components[i]
                except IndexError:
                    return [1, "Final bracket missing in list of values"]
                values.extend(
                    x
                    for x in current.replace("(", "")
                    .replace(")", "")
                    .split(",")
                    if x
                )
                i += 1
                if ")" in current:
                    break
            lookups = lookup_table.filter(
                Q(**{f"{lookup_field}__in": values})
            )
            rids = [f"'{lk.rid}'" for lk in lookups]
            if rids:
                extra_sql = ",".join(rids)
                pieces.append(f"{rid_field} IN ({extra_sql})")
            else:
                # No matching patients: emit a condition that matches an
                # empty RID. (This is added as its own piece so that the
                # next SQL token is separated from it by a space.)
                pieces.append(f"{rid_field} = ''")

        elif operator == "=":
            i += 1
            try:
                value = sql_components[i]
            except IndexError:
                return [1, "Missing value in clause"]
            i += 1
            lookup = lookup_table.filter(**{lookup_field: value}).first()
            rid = "" if not lookup else lookup.rid
            pieces.append(f"{rid_field} {operator} '{rid}'")

        else:
            return [
                1,
                "pid and mpid conversion does not work with "
                f"operator '{operator}', only with operators '=' "
                "and 'IN'.",
            ]

    new_sql = " ".join(pieces).strip()
    return [0, new_sql]
def query_submit(
    request: HttpRequest,
    sql: str,
    run: bool = False,
    filter_display: bool = False,
) -> HttpResponse:
    """
    Ancillary function to add a query (or re-activate an identical existing
    one), and redirect to the editing, results or display-filter page.

    For clinicians, inline PID/MPID conditions in the SQL are first converted
    via :func:`parse_privileged_sql`; for other users, SQL containing the
    PID/MPID prefixes is rejected with an error page.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        sql: SQL text
        run: execute the query and show the results? Otherwise, save the
            query and return to the editing page
        filter_display: after saving the query, redirect to the filter page?
            (Only consulted if ``run`` is false.)

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    if is_clinician(request.user):
        outcome = parse_privileged_sql(request, sql)
        if outcome[0]:
            # Non-zero status: outcome[1] is the error message.
            return generic_error(request, outcome[1])
        sql = outcome[1]
    elif PID_PREFIX in sql or MPID_PREFIX in sql:
        return generic_error(
            request,
            "Only clinicians are authorised to use pid to rid conversion",
        )

    # Re-use an identical existing query if there is one; else create one.
    existing = get_identical_queries(request, sql)
    if existing:
        existing[0].activate()
        query_id = existing[0].id
    else:
        new_query = Query(sql=sql, raw=True, user=request.user, active=True)
        new_query.save()
        query_id = new_query.id

    # Redirect to a new URL:
    if run:
        return redirect(UrlNames.RESULTS, query_id)
    if filter_display:
        return redirect(UrlNames.EDIT_DISPLAY, query_id)
    return redirect(UrlNames.QUERY)
def show_query(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    View to show a single query belonging to the current user.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse` (HTTP 404 if there is no
        such query for this user)
    """
    # Restrict the lookup to the requesting user, so that one user cannot
    # read another user's SQL by guessing query IDs.
    query = get_object_or_404(Query, id=query_id, user=request.user)
    context = {
        "query": query,
        "sql_highlight_css": prettify_sql_css(),
    }
    return render(request, "query_show.html", context)
# @do_cprofile
def query_edit_select(request: HttpRequest) -> HttpResponse:
    """
    View to edit SQL for the current ``SELECT`` query (and/or run it), and to
    list the user's existing queries.

    A valid POST is dispatched on its submit button: add/run the query, send
    the SQL to the query builder scratchpad, or add the query and go to the
    display filter page. Anything else (GET, or an invalid form) shows the
    editing page, pre-filled with the active query's SQL if there is one.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    # log.debug("query")
    if request.method == "POST":
        # Bind a form to the submitted data and act on it if it's valid.
        form = AddQueryForm(request.POST)
        if form.is_valid():
            sql = form.cleaned_data["sql"]
            cmd_run = "submit_run" in request.POST
            cmd_add = "submit_add" in request.POST
            cmd_builder = "submit_builder" in request.POST
            cmd_filter = "submit_filter" in request.POST
            if cmd_add or cmd_run:
                return query_submit(request, sql, cmd_run)
            if cmd_builder:
                # noinspection PyUnresolvedReferences
                profile = request.user.profile  # type: UserProfile
                profile.sql_scratchpad = sql
                profile.save()
                return redirect(UrlNames.BUILD_QUERY)
            if cmd_filter:
                # If filtering, also add the query
                return query_submit(request, sql, filter_display=True)
            raise ValueError("Bad command!")

    # For a GET (or any other method, or an invalid POST), build a fresh form,
    # pre-populated from the active query if there is one.
    all_queries = get_all_queries(request)
    active_queries = all_queries.filter(active=True)
    values = {}  # type: Dict[str, Any]
    if active_queries:
        values["sql"] = active_queries[0].get_original_sql()
    form = AddQueryForm(values)

    queries = paginate(request, all_queries)
    # noinspection PyUnresolvedReferences
    profile = request.user.profile  # type: UserProfile
    element_counter = HtmlElementCounter()
    for q in queries:
        # Format sql only if it hasn't been done already
        if not q.get_formatted_sql():
            q.save()  # calls 'set_formatted_sql'
        q.formatted_query_safe = make_collapsible_sql_query(
            q.get_formatted_sql(),
            element_counter=element_counter,
            collapse_at_n_lines=profile.collapse_at_n_lines,
        )
        original_sql = q.get_original_sql()
        # For very long queries, keep a short prefix of the plain (not
        # coloured) SQL, in case truncation would otherwise cut off after an
        # html start tag but before the end tag.
        q.truncated_sql = (
            None if len(original_sql) < MAX_LEN_SHOW else original_sql[:50]
        )

    research_database_info = get_research_db_info()
    dialect = settings.RESEARCH_DB_DIALECT
    context = {
        "form": form,
        "queries": queries,
        "nav_on_query": True,
        "dialect_mysql": dialect == SqlaDialectName.MYSQL,
        "dialect_mssql": dialect == SqlaDialectName.MSSQL,
        "sql_highlight_css": prettify_sql_css(),
        # Only clinicians get the database list.
        "dbinfolist": (
            research_database_info.dbinfolist
            if is_clinician(request.user)
            else None
        ),
    }
    context.update(query_context(request))
    return render(request, "query_edit_select.html", context)
@user_passes_test(is_superuser)
def query_add_sitewide(request: HttpRequest) -> HttpResponse:
    """
    Superuser view to add or edit sitewide queries and their descriptions.

    - If the POST data contains ``submit_add``: creates a new sitewide query
      from the posted ``sql`` and ``description``, or, if an identical query
      exists already, updates the description of the first such query (unless
      one of the identical queries already has that description).
    - If the POST data contains ``edit``: pre-populates the page with the SQL
      and description of the sitewide query given by ``query_id``.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse` (HTTP 404 if an edit is
        requested for a sitewide query that does not exist)
    """
    if "submit_add" in request.POST:
        sql = request.POST["sql"]
        description = request.POST["description"]
        identical_queries = get_identical_queries(request, sql, sitewide=True)
        if not identical_queries:
            query = SitewideQuery(sql=sql, description=description, raw=True)
            query.save()
        else:
            # noinspection PyUnresolvedReferences
            descriptions = [q.description for q in identical_queries]
            if description not in descriptions:
                # noinspection PyUnresolvedReferences
                identical_queries[0].description = description
                identical_queries[0].save()

    all_queries = get_all_sitewide_queries()
    queries = paginate(request, all_queries)
    # noinspection PyUnresolvedReferences
    profile = request.user.profile  # type: UserProfile
    element_counter = HtmlElementCounter()
    for q in queries:
        # Format sql only if it hasn't been done already
        if not q.get_formatted_sql():
            q.save()
        q.formatted_query_safe = make_collapsible_sql_query(
            q.get_formatted_sql(),
            element_counter=element_counter,
            collapse_at_n_lines=profile.collapse_at_n_lines,
        )

    if "edit" in request.POST:
        query_id = request.POST["query_id"]
        # A stale or mistyped ID should give "not found", not an unhandled
        # DoesNotExist (server error).
        query = get_object_or_404(SitewideQuery, id=query_id)
        selected_sql = query.sql
        selected_description = query.description
    else:
        selected_sql = ""
        selected_description = ""

    context = {
        "queries": queries,
        "selected_sql": selected_sql,
        "selected_description": selected_description,
        "sql_highlight_css": prettify_sql_css(),
    }
    return render(request, "query_add_sitewide.html", context)
def show_sitewide_queries(request: HttpRequest) -> HttpResponse:
    """
    View listing every site-wide query.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`

    """
    return render(
        request,
        "show_sitewide_queries.html",
        {
            "queries": get_all_sitewide_queries(),
            "sql_highlight_css": prettify_sql_css(),
        },
    )
def query_activate(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    Activate the specified query for the current user.

    The query is looked up among the requesting user's own queries only, so
    that a user cannot activate somebody else's query by supplying its ID;
    an unknown ID, or one belonging to another user, gives a 404.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    validate_blank_form(request)
    query = get_object_or_404(
        Query, id=query_id, user=request.user
    )  # type: Query
    query.activate()
    return redirect(UrlNames.QUERY)
def query_delete(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    Delete (or hide if required for audit purposes) the specified query for the
    current user.

    The query is looked up among the requesting user's own queries only, so
    that a user cannot delete somebody else's query by supplying its ID; an
    unknown ID, or one belonging to another user, gives a 404.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    validate_blank_form(request)
    query = get_object_or_404(
        Query, id=query_id, user=request.user
    )  # type: Query
    query.delete_if_permitted()
    return redirect(UrlNames.QUERY)
@user_passes_test(is_superuser)
def sitewide_query_delete(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    Superuser-only view that deletes a site-wide query.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.SitewideQuery`

    Returns:
        a :class:`django.http.response.HttpResponse`

    Note:

    - When sitewide queries are used, their SQL is added to the user's
      personal libraries. All auditing therefore relates to users' personal
      query libraries. Sitewide queries cannot be executed "standalone".
    - As a result, we use a raw
      :meth:`crate_anon.crateweb.research.models.SitewideQuery.delete`, rather
      than the system used by
      :meth:`crate_anon.crateweb.research.models.Query.delete_if_permitted`.

    """
    validate_blank_form(request)
    sitewide_query = get_object_or_404(
        SitewideQuery, id=query_id
    )  # type: SitewideQuery
    sitewide_query.delete()
    return redirect(UrlNames.SITEWIDE_QUERIES)
def sitewide_query_process(
    request: HttpRequest, query_id: str
) -> HttpResponse:
    """
    Fills in the placeholders of a sitewide query with values received via
    ``POST``, then either adds the resulting SQL to the user's personal
    library, or adds it and runs it.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.SitewideQuery`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    validate_blank_form(request)
    post = request.POST
    add_requested = "submit_add" in post
    run_requested = "submit_run" in post
    if not (add_requested or run_requested):
        # Nothing to do; go back to the list of standard queries.
        return redirect(UrlNames.STANDARD_QUERIES)

    sitewide_query = get_object_or_404(SitewideQuery, id=query_id)
    pieces = []  # type: List[str]
    for index, chunk in enumerate(sitewide_query.sql_chunks):
        if index % 2 == 1:
            # Odd-numbered chunks are placeholders: substitute whatever the
            # user supplied (as POST key "chunk<index + 1>"), or nothing.
            post_key = f"chunk{index + 1}"
            pieces.append(post[post_key] if post_key in post else "")
        else:
            # Even-numbered chunks are the original, literal SQL.
            pieces.append(chunk)
    return query_submit(request, "".join(pieces), run=run_requested)
def no_query_selected(request: HttpRequest) -> HttpResponse:
    """
    View reporting that no query is selected, when one was required.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    context = query_context(request)
    return render(request, "query_none_selected.html", context)
def query_count(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    View showing ``COUNT(*)`` for the specified query.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    if query_id is None:
        return no_query_selected(request)
    try:
        # int() could in principle raise TypeError/ValueError for garbage
        # input; those are not handled here, on the expectation that the URL
        # parser has already filtered such values out.
        pk = int(query_id)
        # Restricted to the requesting user's own queries. A missing query
        # raises something derived from ObjectDoesNotExist.
        query = Query.objects.get(id=pk, user=request.user)
    except ObjectDoesNotExist:
        return render_bad_query_id(request, pk)
    else:
        return render_resultcount(request, query)
def query_count_current(request: HttpRequest) -> HttpResponse:
    """
    View showing ``COUNT(*)`` for the user's currently active query.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    active_query = Query.get_active_query_or_none(request)
    if active_query is not None:
        return render_resultcount(request, active_query)
    return no_query_selected(request)
class NlpSourceResult:
    """
    Simple container used as the return value of :func:`get_source_results`:
    either a result set (field names, rows, and the SQL that produced them)
    or an error message.
    """

    def __init__(
        self,
        fieldnames: List[str] = None,
        results: Sequence[Sequence[Any]] = None,
        sql: str = None,
        error: str = None,
    ) -> None:
        """
        Args:
            fieldnames: fieldnames in source record
            results: source record result
            sql: SQL used in getting source text
            error: error message
        """
        # Missing (or empty) collections are stored as fresh empty lists.
        self.fieldnames = (
            fieldnames if fieldnames else []
        )  # type: List[str]
        self.results = results if results else []  # type: List[List[Any]]
        self.sql = sql
        self.error = error
@django_cache_function(timeout=None)
def get_source_results(
    srcdb: str,
    srctable: str,
    srcfield: str,
    srcpkfield: str,
    srcpk: Union[str, int],
) -> NlpSourceResult:
    """
    Fetch the source text for a CRATE NLP table record.

    Failures to resolve the database, or a ``ProgrammingError`` when running
    the SQL, are reported through the ``error`` attribute of the result
    rather than raised.

    Args:
        srcdb: source database as given by the nlp record
        srctable: source table
        srcfield: source field
        srcpkfield: the fieldname in the source table which contains the
            primary key
        srcpk: source primary key value

    Returns:
        a :class:`NlpSourceResult`
    """
    rdb_info = get_research_db_info()

    # Step 1: NLP source database name -> research database name.
    try:
        dbname = rdb_info.nlp_sourcedb_map[srcdb]
    except KeyError:
        msg = f"No source database in settings.NLP_SOURCEDB_MAP named {srcdb}"
        return NlpSourceResult(error=msg)

    # Step 2: research database name -> database information object.
    try:
        dbinfo = rdb_info.get_dbinfo_by_name(dbname)
    except ValueError:
        msg = (
            f"No source database in settings.RESEARCH_DB_INFO "
            f"named {dbname}, for source database {srcdb}"
        )
        return NlpSourceResult(error=msg)

    # Step 3: build and run the SQL.
    # NOTE(review): the identifiers and the PK value are interpolated
    # directly into the SQL text, so they are neither quoted nor escaped; a
    # non-numeric PK will not yield a valid comparison. Consider bound
    # parameters/identifier validation -- confirm what the cursor helper
    # supports.
    qualified_table = ".".join((dbinfo.schema_identifier, srctable))
    sql = (
        f"SELECT {srcfield} FROM {qualified_table} "
        f"WHERE {srcpkfield}={srcpk}"
    )
    try:
        # noinspection PyTypeChecker
        cursor = get_executed_researchdb_cursor(
            sql
        )  # type: Pep249DatabaseCursorType
    except ProgrammingError:
        msg = f"Table or fieldname incorrect in: {srctable}.{srcfield}"
        return NlpSourceResult(error=msg)

    return NlpSourceResult(
        fieldnames=get_fieldnames_from_cursor(cursor),
        results=cursor.fetchall(),
        sql=sql,
    )
def source_info(
    request: HttpRequest,
    srcdb: str,
    srctable: str,
    srcfield: str,
    srcpkfield: str,
    srcpkval: Optional[str],
    srcpkstr: Optional[str],
) -> HttpResponse:
    """
    View showing the source information for a record in a CRATE NLP table.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        srcdb: source database as given by the nlp record
        srctable: source table
        srcfield: source field
        srcpkfield: the fieldname in the source table which contains the
            primary key
        srcpkval: primary key value in source table - may be 'None'
        srcpkstr: primary key string in source table - may be 'None'

    Returns:
        a :class:`django.http.response.HttpResponse`

    Only one of srcpkval and srcpkstr should be 'None', and this will be given
    as a string because it's through a URL link.
    """
    # Prefer the string PK when one was really supplied (the literal text
    # "None" means "absent"); otherwise fall back to the PK value.
    have_pkstr = bool(srcpkstr) and srcpkstr != "None"
    srcpk = srcpkstr if have_pkstr else srcpkval

    lookup = get_source_results(srcdb, srctable, srcfield, srcpkfield, srcpk)
    if lookup.error:
        return generic_error(
            request, f"Source info lookup failed: {lookup.error}"
        )

    found = lookup.results
    if not found:
        log.warning(f"No source data found. SQL: {lookup.sql}")
    elif len(found) > 1:
        log.warning(
            f"More than one source record found. " f"SQL: {lookup.sql}"
        )

    table_html = resultset_html_table(
        fieldnames=lookup.fieldnames,
        rows=found,
        element_counter=HtmlElementCounter(),
    )
    # Not currently merged in: query_context(request)
    return render(
        request,
        "source_information.html",
        {
            "table_html": table_html,
            "sql": prettify_sql_html(lookup.sql),
            "sql_highlight_css": prettify_sql_css(),
        },
    )
def query_results(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    View showing the results of the chosen query in conventional tabular
    format.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    if query_id is None:
        return no_query_selected(request)
    try:
        pk = int(query_id)
        # Only the requesting user's own queries are eligible.
        query = Query.objects.get(id=pk, user=request.user)
    except ObjectDoesNotExist:
        return render_bad_query_id(request, pk)
    # The user's profile supplies the display preferences.
    # noinspection PyUnresolvedReferences
    profile = request.user.profile  # type: UserProfile
    active_highlights = Highlight.get_active_highlights(request)
    display_options = dict(
        collapse_at_len=profile.collapse_at_len,
        collapse_at_n_lines=profile.collapse_at_n_lines,
        line_length=profile.line_length,
    )
    return render_resultset(
        request, query, active_highlights, **display_options
    )
def query_results_recordwise(
    request: HttpRequest, query_id: str
) -> HttpResponse:
    """
    View showing the results of the chosen query in recordwise tabular
    format.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    if query_id is None:
        return no_query_selected(request)
    try:
        pk = int(query_id)
        # Only the requesting user's own queries are eligible.
        query = Query.objects.get(id=pk, user=request.user)
    except ObjectDoesNotExist:
        return render_bad_query_id(request, pk)
    # The user's profile supplies the display preferences.
    # noinspection PyUnresolvedReferences
    profile = request.user.profile  # type: UserProfile
    active_highlights = Highlight.get_active_highlights(request)
    display_options = dict(
        collapse_at_len=profile.collapse_at_len,
        collapse_at_n_lines=profile.collapse_at_n_lines,
        line_length=profile.line_length,
    )
    return render_resultset_recordwise(
        request, query, active_highlights, **display_options
    )
def query_tsv(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    Download TSV of the specified query.

    The query is looked up among the requesting user's own queries only, so
    that a user cannot download the results of somebody else's query by
    supplying its ID; an unknown ID, or one belonging to another user, gives
    a 404.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse`: the file download, or
        the "bad query" page if the database rejects the query
    """
    query = get_object_or_404(
        Query, id=query_id, user=request.user
    )  # type: Query
    try:
        return file_response(
            query.make_tsv(),
            content_type=ContentType.TSV,
            filename="crate_results_{num}_{datetime}.tsv".format(
                num=query.id,
                datetime=datetime_iso_for_filename(),
            ),
        )
    except DatabaseError as exception:
        return render_bad_query(request, query, exception)
def query_excel(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    Serves an XLSX (Excel) file with the results of the specified query.

    The query is looked up among the requesting user's own queries only, so
    that a user cannot download the results of somebody else's query by
    supplying its ID; an unknown ID, or one belonging to another user, gives
    a 404.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse`: the file download, or
        the "bad query" page if the database rejects the query
    """
    query = get_object_or_404(
        Query, id=query_id, user=request.user
    )  # type: Query
    try:
        return file_response(
            query.make_excel(),
            content_type=ContentType.XLSX,
            filename="crate_query_{}_{}.xlsx".format(
                query_id, datetime_iso_for_filename()
            ),
        )
    except DatabaseError as exception:
        return render_bad_query(request, query, exception)
def edit_display(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    View for editing the 'display' attribute of the selected query, i.e. for
    choosing which columns are shown when its results are displayed.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    query = get_object_or_404(Query, user=request.user, id=query_id)
    currently_displayed = query.get_display_list()
    try:
        all_fieldnames = query.get_column_names()
    except DatabaseError as exception:
        # Record the failure against the query, then tell the user.
        query.audit(failed=True, fail_msg=str(exception))
        return render_bad_query(request, query, exception)
    return render(
        request,
        "edit_display.html",
        {
            "query": query,
            "display_fields": currently_displayed,
            "fieldnames": all_fieldnames,
        },
    )
def save_display(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    Save changes to the 'display' attribute of the selected query, and to its
    'no_null' flag; then show the query editing page.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: string form of the integer PK of the
            :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    query = get_object_or_404(Query, user=request.user, id=query_id)
    if request.method == "POST":
        try:
            valid_fieldnames = query.get_column_names()
        except DatabaseError as exception:
            query.audit(failed=True, fail_msg=str(exception))
            return render_bad_query(request, query, exception)
        # Keep only those requested fields that are real columns of the
        # query, in the order in which they were submitted.
        # noinspection PyArgumentList,PyCallByClass
        requested = request.POST.getlist("include_field")
        chosen = [
            name for name in requested if name in valid_fieldnames
        ]  # type: List[str]
        query.set_display_list(chosen)
        # 'no_null' is on only if the form sent exactly "true".
        # noinspection PyArgumentList,PyCallByClass
        no_null_value = request.POST.get("no_null")
        query.no_null = no_null_value == "true"
        query.save()
    return query_edit_select(request)
1475# @user_passes_test(is_superuser)
1476# def audit(request):
1477# """
1478# View audit log
1479# """
1480# all_audits = QueryAudit.objects.all()\
1481# .select_related('query', 'query__user')\
1482# .order_by('-id')
1483# audits = paginate(request, all_audits)
1484# context = {'audits': audits}
1485# return render(request, 'audit.html', context)
1488# =============================================================================
1489# Internal functions for views on queries
1490# =============================================================================
1492# def make_demo_query_unless_exists(request):
1493# DEMOQUERY = Query(
1494# pk=1,
1495# sql="SELECT * FROM notes\nWHERE note LIKE '%Adam%'\nLIMIT 20",
1496# raw=True,
1497# user=request.user,
1498# )
1499# DEMOQUERY.save()
1500# H1 = Highlight(pk=1, text="Aaron", colour=0, user=request.user)
1501# H1.save()
1502# H2 = Highlight(pk=2, text="Adam", colour=0, user=request.user)
1503# H2.save()
1504# H3 = Highlight(pk=3, text="October", colour=1, user=request.user)
1505# H3.save()
1507# EXCEPTIONS FOR HOMEBREW SQL.
1508# You can see:
1509# - django.db.ProgrammingError
1510# - django.db.OperationalError
1511# - InternalError (?django.db.utils.InternalError)
1512# ... but I think all are subclasses of django.db.utils.DatabaseError
def render_resultcount(request: HttpRequest, query: Query) -> HttpResponse:
    """
    Shows how many rows a given query will fetch, auditing the attempt.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query: a :class:`crate_anon.crateweb.research.models.Query`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    if query is None:
        return render_missing_query(request)
    try:
        n_rows = query.get_rowcount()
        query.audit(count_only=True, n_records=n_rows)
        context = dict(
            rowcount=n_rows,
            sql=query.get_original_sql(),
            nav_on_count=True,
        )
        context.update(query_context(request))
        return render(request, "query_count.html", context)
    except DatabaseError as exception:
        # DatabaseError is taken to be the common base class of the errors
        # that user-written SQL can provoke.
        failure_message = str(exception)
        query.audit(count_only=True, failed=True, fail_msg=failure_message)
        return render_bad_query(request, query, exception)
def resultset_html_table(
    fieldnames: List[str],
    rows: List[List[Any]],
    element_counter: HtmlElementCounter,
    start_index: int = 0,
    highlight_dict: Dict[int, List[Highlight]] = None,
    collapse_at_len: int = None,
    collapse_at_n_lines: int = None,
    line_length: int = None,
    ditto: bool = True,
    ditto_html: str = "″",
    no_ditto_cols: List[int] = None,
    null: str = "<i>NULL</i>",
) -> str:
    """
    Returns an HTML table representing a set of results from a query. Its
    columns are the database columns; its rows are the database rows.

    If the result set is from an NLP table (it has the NLP definition column
    and all six source-reference columns), an extra column is added with a
    link to the source information for each row.

    Args:
        fieldnames:
            list of column names
        rows:
            list of rows (each row being a list of values in the same order as
            ``fieldnames``)
        element_counter:
            a :class:`crate_anon.crateweb.research.html_functions.HtmlElementCounter`,
            which will be modified
        start_index:
            the zero-based index of the first row in this table (used for
            pagination, when the second and subsequent tables don't start with
            the first row of the result set)
        highlight_dict:
            an optional dictionary mapping highlight colour to all the
            :class:`crate_anon.crateweb.research.models.Highlight` objects that
            use it (e.g.: ``2`` maps to highlight objects for all the separate
            pieces of text to be highlighted in colour 2)
        collapse_at_len:
            if specified, the string length beyond which the cell will be
            collapsed
        collapse_at_n_lines:
            if specified, the number of lines beyond which the cell will be
            collapsed
        line_length:
            if specified, the line length to word-wrap at
        ditto:
            whether to replace cells that are identical to the cell immediately
            above with ditto marks
        ditto_html:
            the HTML string to use as a ditto mark
        no_ditto_cols:
            column indexes (zero-based) for which ditto marks should never be
            used
        null:
            the HTML string to use for database ``NULL`` (Python ``None``)
            values

    Returns:
        str: HTML

    """  # noqa: E501
    # Considered but not implemented: hiding table columns
    # ... see esp "tr > *:nth-child(n)" at
    # https://stackoverflow.com/questions/5440657/how-to-hide-columns-in-html-table  # noqa: E501

    # Map each NLP source-reference column to the URL keyword it supplies.
    field_to_kwarg = {
        FN_SRCDB: "srcdb",
        FN_SRCTABLE: "srctable",
        FN_SRCFIELD: "srcfield",
        FN_SRCPKFIELD: "srcpkfield",
        FN_SRCPKVAL: "srcpkval",
        FN_SRCPKSTR: "srcpkstr",
    }
    # URL keyword -> column index, for the source-reference columns present.
    src_indexes = {}  # type: Dict[str, int]
    if FN_NLPDEF in fieldnames:
        for i, field in enumerate(fieldnames):
            kwarg = field_to_kwarg.get(field)
            if kwarg is not None:
                src_indexes[kwarg] = i
    # It's an NLP table only if every source-reference column was found.
    # (Count what was found rather than testing the indexes for truthiness:
    # column index 0 is a perfectly valid position.)
    nlptable = len(src_indexes) == len(field_to_kwarg)

    no_ditto_cols = no_ditto_cols or []
    ditto_cell = f' <td class="queryresult ditto">{ditto_html}</td>\n'

    # Accumulate fragments and join once at the end.
    parts = ["<table>\n", " <tr>\n", " <th><i>#</i></th>\n"]
    for field in fieldnames:
        parts.append(f" <th>{escape(field)}</th>\n")
    if nlptable:
        parts.append(" <th>Link to source</th>\n")
    parts.append(" </tr>\n")

    for row_index, row in enumerate(rows):
        # row_index is zero-based within this table
        parts.append(
            ' <tr class="{}">\n'.format(
                "stripy_even" if row_index % 2 == 0 else "stripy_odd"
            )
        )
        # Row number (one-based, across the whole result set)
        parts.append(
            " <td><b><i>{}</i></b></td>\n".format(
                row_index + start_index + 1
            )
        )
        # Values
        for col_index, value in enumerate(row):
            if (
                row_index > 0
                and ditto
                and col_index not in no_ditto_cols
                and value == rows[row_index - 1][col_index]
            ):
                # Same as the cell above: show a ditto mark instead.
                parts.append(ditto_cell)
            else:
                parts.append(
                    ' <td class="queryresult">{}</td>\n'.format(
                        make_result_element(
                            value,
                            element_counter=element_counter,
                            highlight_dict=highlight_dict,
                            collapse_at_len=collapse_at_len,
                            collapse_at_n_lines=collapse_at_n_lines,
                            line_length=line_length,
                            null=null,
                        )
                    )
                )
        # If it's an NLP table, add link to source info
        if nlptable:
            source_url = reverse(
                UrlNames.SRCINFO,
                kwargs={
                    kwarg: row[index] for kwarg, index in src_indexes.items()
                },
            )
            parts.append(
                f' <td><a href="{source_url}">Source info</a></td>\n'
            )
        parts.append(" </tr>\n")
    parts.append("</table>\n")
    return "".join(parts)
def single_record_html_table(
    fieldnames: List[str],
    record: List[Any],
    element_counter: HtmlElementCounter,
    highlight_dict: Dict[int, List[Highlight]] = None,
    collapse_at_len: int = None,
    collapse_at_n_lines: int = None,
    line_length: int = None,
) -> str:
    """
    Returns an HTML table representing a set of results from a query, in
    recordwise format. It has two columns, effectively "database column" and
    "value"; its rows are the database columns; it displays a single database
    result row.

    If the record is from an NLP table (it has the NLP definition column and
    all six source-reference columns), a link to the source information is
    placed above the table.

    Args:
        fieldnames:
            list of column names
        record:
            a single result row, i.e. a list of values in the same order as
            ``fieldnames``
        element_counter:
            a :class:`crate_anon.crateweb.research.html_functions.HtmlElementCounter`,
            which will be modified
        highlight_dict:
            an optional dictionary mapping highlight colour to all the
            :class:`crate_anon.crateweb.research.models.Highlight` objects that
            use it (e.g.: ``2`` maps to highlight objects for all the separate
            pieces of text to be highlighted in colour 2)
        collapse_at_len:
            if specified, the string length beyond which the cell will be
            collapsed
        collapse_at_n_lines:
            if specified, the number of lines beyond which the cell will be
            collapsed
        line_length:
            if specified, the line length to word-wrap at

    Returns:
        str: HTML

    """  # noqa: E501
    # Accumulate fragments and join once at the end.
    parts = []  # type: List[str]

    if FN_NLPDEF in fieldnames:
        # Map each NLP source-reference column to the URL keyword it
        # supplies.
        field_to_kwarg = {
            FN_SRCDB: "srcdb",
            FN_SRCTABLE: "srctable",
            FN_SRCFIELD: "srcfield",
            FN_SRCPKFIELD: "srcpkfield",
            FN_SRCPKVAL: "srcpkval",
            FN_SRCPKSTR: "srcpkstr",
        }
        # URL keyword -> column index, for the columns actually present.
        src_indexes = {}  # type: Dict[str, int]
        for i, field in enumerate(fieldnames):
            kwarg = field_to_kwarg.get(field)
            if kwarg is not None:
                src_indexes[kwarg] = i
        # Count what was found rather than testing the indexes for
        # truthiness: column index 0 is a perfectly valid position.
        if len(src_indexes) == len(field_to_kwarg):
            # If it's an NLP table, add link to source info above the results
            source_url = reverse(
                UrlNames.SRCINFO,
                kwargs={
                    kwarg: record[index]
                    for kwarg, index in src_indexes.items()
                },
            )
            parts.append(
                f'<b><a href="{source_url}">See NLP source info</a></b>\n'
            )

    parts.append("<table>\n")
    for col_index, value in enumerate(record):
        fieldname = fieldnames[col_index]
        parts.append(
            ' <tr class="{}">\n'.format(
                "stripy_even" if col_index % 2 == 0 else "stripy_odd"
            )
        )
        parts.append(f" <th>{escape(fieldname)}</th>")
        parts.append(
            ' <td class="queryresult">{}</td>\n'.format(
                make_result_element(
                    value,
                    element_counter=element_counter,
                    highlight_dict=highlight_dict,
                    collapse_at_len=collapse_at_len,
                    collapse_at_n_lines=collapse_at_n_lines,
                    line_length=line_length,
                    collapsed=False,
                )
            )
        )
        parts.append(" </tr>\n")
    parts.append("</table>\n")
    return "".join(parts)
def render_resultset(
    request: HttpRequest,
    query: Query,
    highlights: Iterable[Highlight],
    collapse_at_len: int = None,
    collapse_at_n_lines: int = None,
    line_length: int = None,
    ditto: bool = True,
    ditto_html: str = "″",
) -> HttpResponse:
    """
    Runs a user's query and shows its results as a paginated table, auditing
    the attempt.

    Args:
        request:
            the :class:`django.http.request.HttpRequest`
        query:
            a :class:`crate_anon.crateweb.research.models.Query` to execute
        highlights:
            an iterable of
            :class:`crate_anon.crateweb.research.models.Highlight` objects to
            apply colourful highlighting to the results
        collapse_at_len:
            if specified, the string length beyond which the cell will be
            collapsed
        collapse_at_n_lines:
            if specified, the number of lines beyond which the cell will be
            collapsed
        line_length:
            if specified, the line length to word-wrap at
        ditto:
            whether to replace cells that are identical to the cell immediately
            above with ditto marks
        ditto_html:
            the HTML string to use as a ditto mark

    Returns:
        a :class:`django.http.response.HttpResponse`

    """
    if query is None:
        return render_missing_query(request)

    # Run the query, and audit success or failure.
    try:
        rows = query.get_display_rows()
        fieldnames = query.get_display_column_names()
        rowcount = query.get_rowcount()
        query.audit(n_records=rowcount)
    except DatabaseError as exception:
        query.audit(failed=True, fail_msg=str(exception))
        return render_bad_query(request, query, exception)

    # Paginate over row indexes, so that only the rows on the current page
    # need to be converted to HTML.
    page = paginate(request, list(range(len(rows))))
    first = page.start_index() - 1  # zero-based, inclusive
    beyond_last = page.end_index()  # zero-based, exclusive
    rows_on_page = rows[first:beyond_last]

    highlight_dict = Highlight.as_ordered_dict(highlights)
    counter = HtmlElementCounter()
    table_html = resultset_html_table(
        fieldnames=fieldnames,
        rows=rows_on_page,
        element_counter=counter,
        start_index=first,
        highlight_dict=highlight_dict,
        collapse_at_len=collapse_at_len,
        collapse_at_n_lines=collapse_at_n_lines,
        line_length=line_length,
        ditto=ditto,
        ditto_html=ditto_html,
    )

    # Which columns are displayed, and which are being omitted
    display_columns = query.get_display_column_names()
    omit_columns = [
        column
        for column in query.get_column_names()
        if column not in display_columns
    ]

    # Set last_run of the query to now
    query.update_last_run()

    context = dict(
        table_html=table_html,
        page=page,
        rowcount=rowcount,
        sql=prettify_sql_html(query.get_original_sql()),
        nav_on_results=True,
        sql_highlight_css=prettify_sql_css(),
        display_columns=display_columns,
        omit_columns=omit_columns,
        no_null=query.no_null,
        query_id=query.id,
    )
    context.update(query_context(request))
    return render(request, "query_result.html", context)
def render_resultset_recordwise(
    request: HttpRequest,
    query: Query,
    highlights: Iterable[Highlight],
    collapse_at_len: int = None,
    collapse_at_n_lines: int = None,
    line_length: int = None,
) -> HttpResponse:
    """
    Runs a user's query and shows its results one record per page, auditing
    the attempt.

    Args:
        request:
            the :class:`django.http.request.HttpRequest`
        query:
            a :class:`crate_anon.crateweb.research.models.Query` to execute
        highlights:
            an iterable of
            :class:`crate_anon.crateweb.research.models.Highlight` objects to
            apply colourful highlighting to the results
        collapse_at_len:
            if specified, the string length beyond which the cell will be
            collapsed
        collapse_at_n_lines:
            if specified, the number of lines beyond which the cell will be
            collapsed
        line_length:
            if specified, the line length to word-wrap at

    Returns:
        a :class:`django.http.response.HttpResponse`

    """
    if query is None:
        return render_missing_query(request)

    # Run the query, and audit success or failure.
    try:
        rows = query.get_display_rows()
        fieldnames = query.get_display_column_names()
        rowcount = query.get_rowcount()
        query.audit(n_records=rowcount)
    except DatabaseError as exception:
        query.audit(failed=True, fail_msg=str(exception))
        return render_bad_query(request, query, exception)

    # Paginate over row indexes, one record per page, so that only the
    # current record needs to be converted to HTML.
    page = paginate(request, list(range(len(rows))), per_page=1)
    highlight_dict = Highlight.as_ordered_dict(highlights)

    if not rows:
        table_html = "<b>No rows returned.</b>"
    else:
        current_record = rows[page.start_index() - 1]
        counter = HtmlElementCounter()
        heading = f"<p><i>Record {page.start_index()}</i></p>\n"
        table_html = heading + single_record_html_table(
            fieldnames=fieldnames,
            record=current_record,
            element_counter=counter,
            highlight_dict=highlight_dict,
            collapse_at_len=collapse_at_len,
            collapse_at_n_lines=collapse_at_n_lines,
            line_length=line_length,
        )

    # Which columns are displayed, and which are being omitted
    display_columns = query.get_display_column_names()
    omit_columns = [
        column
        for column in query.get_column_names()
        if column not in display_columns
    ]

    # Set last_run of the query to now
    query.update_last_run()

    context = dict(
        table_html=table_html,
        page=page,
        rowcount=rowcount,
        sql=prettify_sql_html(query.get_original_sql()),
        nav_on_results_recordwise=True,
        sql_highlight_css=prettify_sql_css(),
        display_columns=display_columns,
        omit_columns=omit_columns,
        no_null=query.no_null,
        query_id=query.id,
    )
    context.update(query_context(request))
    return render(request, "query_result.html", context)
def render_missing_query(request: HttpRequest) -> HttpResponse:
    """
    Renders the "missing query" page (template ``query_missing.html``).

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    context = query_context(request)
    return render(request, "query_missing.html", context)
def render_bad_query(
    request: HttpRequest, query: Query, exception: Exception
) -> HttpResponse:
    """
    Renders the "your query failed" page. This is what the user normally sees
    after entering bad SQL.

    Args:
        request:
            the :class:`django.http.request.HttpRequest`
        query:
            the :class:`crate_anon.crateweb.research.models.Query` that went
            wrong
        exception:
            the Python exception that resulted, which may have had extra
            information attached via
            :func:`cardinal_pythonlib.exceptions.add_info_to_exception`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    # Any extra details attached to the exception; "sql" and "args" are the
    # keys we look for, with empty defaults if they are absent.
    exception_info = recover_info_from_exception(exception)
    context = {
        "original_sql": prettify_sql_html(query.get_original_sql()),
        "final_sql": prettify_sql_and_args(
            exception_info.get("sql", ""), exception_info.get("args", [])
        ),
        "exception": repr(exception),
        "sql_highlight_css": prettify_sql_css(),
        # Generic query context last, so that it takes precedence on clashes.
        **query_context(request),
    }
    return render(request, "query_bad.html", context)
def render_bad_query_id(request: HttpRequest, query_id: str) -> HttpResponse:
    """
    Renders the "bad query ID" page (template ``query_bad_id.html``).

    Args:
        request: the :class:`django.http.request.HttpRequest`
        query_id: the query ID that was bad

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    # Generic query context is merged last, so it wins on any key clash.
    context = {"query_id": query_id, **query_context(request)}
    return render(request, "query_bad_id.html", context)
2051# =============================================================================
2052# Highlights
2053# =============================================================================
def highlight_edit_select(request: HttpRequest) -> HttpResponse:
    """
    View to add, list and (de)activate the current user's highlights (which
    will apply to any queries that the user runs).

    On a valid POST, either re-activates an identical existing highlight
    (same colour and text) or creates a new active one, then redirects back
    to this view. Otherwise, shows a fresh form plus the user's highlights.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    user_highlights = Highlight.objects.filter(user=request.user).order_by(
        "text", "colour"
    )
    if request.method == "POST":
        posted_form = AddHighlightForm(request.POST)
        if posted_form.is_valid():
            colour = posted_form.cleaned_data["colour"]
            text = posted_form.cleaned_data["text"]
            existing = user_highlights.filter(colour=colour, text=text).first()
            if existing is not None:
                # Don't duplicate; just switch the existing one back on.
                existing.activate()
            else:
                Highlight(
                    colour=colour, text=text, user=request.user, active=True
                ).save()
            return redirect(UrlNames.HIGHLIGHT)

    # GET, or a POST that failed validation: offer a fresh form.
    blank_form = AddHighlightForm({"colour": 0})
    active = user_highlights.filter(active=True)
    descriptions = get_highlight_descriptions(
        Highlight.as_ordered_dict(active)
    )
    context = {
        "form": blank_form,
        "highlights": paginate(request, user_highlights),
        "nav_on_highlight": True,
        "N_CSS_HIGHLIGHT_CLASSES": N_CSS_HIGHLIGHT_CLASSES,
        "highlight_descriptions": descriptions,
        "colourlist": list(range(N_CSS_HIGHLIGHT_CLASSES)),
        **query_context(request),
    }
    return render(request, "highlight_edit_select.html", context)
def highlight_activate(
    request: HttpRequest, highlight_id: str
) -> HttpResponse:
    """
    Activate one of the requesting user's highlights.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        highlight_id: string form of the integer PK for
            :class:`crate_anon.crateweb.research.models.Highlight`

    Returns:
        a :class:`django.http.response.HttpResponse` (a redirect to the
        highlight view); HTTP 404 is raised if the user has no highlight
        with that ID
    """
    validate_blank_form(request)
    # Restrict the lookup to the requesting user's own highlights, so that a
    # guessed or altered ID cannot be used to change someone else's highlight.
    highlight = get_object_or_404(
        Highlight, id=highlight_id, user=request.user
    )  # type: Highlight
    highlight.activate()
    return redirect(UrlNames.HIGHLIGHT)
def highlight_deactivate(
    request: HttpRequest, highlight_id: str
) -> HttpResponse:
    """
    Deactivate one of the requesting user's highlights.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        highlight_id: string form of the integer PK for
            :class:`crate_anon.crateweb.research.models.Highlight`

    Returns:
        a :class:`django.http.response.HttpResponse` (a redirect to the
        highlight view); HTTP 404 is raised if the user has no highlight
        with that ID
    """
    validate_blank_form(request)
    # Restrict the lookup to the requesting user's own highlights, so that a
    # guessed or altered ID cannot be used to change someone else's highlight.
    highlight = get_object_or_404(
        Highlight, id=highlight_id, user=request.user
    )  # type: Highlight
    highlight.deactivate()
    return redirect(UrlNames.HIGHLIGHT)
def highlight_delete(request: HttpRequest, highlight_id: str) -> HttpResponse:
    """
    Delete one of the requesting user's highlights.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        highlight_id: string form of the integer PK for
            :class:`crate_anon.crateweb.research.models.Highlight`

    Returns:
        a :class:`django.http.response.HttpResponse` (a redirect to the
        highlight view); HTTP 404 is raised if the user has no highlight
        with that ID
    """
    validate_blank_form(request)
    # Restrict the lookup to the requesting user's own highlights, so that a
    # guessed or altered ID cannot be used to delete someone else's highlight.
    highlight = get_object_or_404(
        Highlight, id=highlight_id, user=request.user
    )  # type: Highlight
    highlight.delete()
    return redirect(UrlNames.HIGHLIGHT)
2167# def render_bad_highlight_id(request, highlight_id):
2168# context = {'highlight_id': highlight_id}
2169# context.update(query_context(request))
2170# return render(request, 'highlight_bad_id.html', context)
def get_highlight_descriptions(
    highlight_dict: Dict[int, List[Highlight]]
) -> List[str]:
    """
    Returns a list of length up to ``N_CSS_HIGHLIGHT_CLASSES`` of HTML
    elements illustrating the highlights.

    Args:
        highlight_dict:
            a dictionary mapping highlight colour to all the
            :class:`crate_anon.crateweb.research.models.Highlight` objects that
            use it (e.g.: ``2`` maps to highlight objects for all the separate
            pieces of text to be highlighted in colour 2)

    Returns:
        list of HTML strings, one per colour number (in ascending order) that
        is present in ``highlight_dict``; each is a comma-separated run of
        that colour's highlight texts, rendered via :func:`highlight_text`
    """
    return [
        ", ".join(
            [highlight_text(h.text, colour) for h in highlight_dict[colour]]
        )
        for colour in range(N_CSS_HIGHLIGHT_CLASSES)
        if colour in highlight_dict
    ]
2200# =============================================================================
2201# PID lookup
2202# =============================================================================
2203# In general with these database-choosing functions, don't redirect between
2204# the "generic" and "database-specific" views using POST, because we can't then
2205# add default values to a new form (since the request.POST object is
2206# populated and immutable). Use a dbname query parameter as well.
2207# (That doesn't make it HTTP GET; it makes it HTTP POST with query parameters.)
def pid_rid_lookup(
    request: HttpRequest, with_db_url_name: str, html_filename: str
) -> HttpResponse:
    """
    Common functionality for :func:`pidlookup`, :func:`ridlookup`.

    Provides a view/form allowing the user to choose a database, if more than
    one is possible, and then redirect to another view once we have that
    database choice.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        with_db_url_name: URL name to redirect to, passed as a parameter to
            :func:`django.urls.reverse`
        html_filename: Django HTML template filename

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    dbinfolist = get_research_db_info().dbs_with_secret_map
    n_dbs = len(dbinfolist)
    if n_dbs == 0:
        return generic_error(request, "No databases with lookup map!")
    if n_dbs == 1:
        # Only one candidate, so there is nothing to pick.
        only_dbname = dbinfolist[0].name
        return HttpResponseRedirect(
            reverse(with_db_url_name, args=[only_dbname])
        )
    # Several candidates: ask the user which one.
    form = DatabasePickerForm(request.POST or None, dbinfolist=dbinfolist)
    if not form.is_valid():
        return render(request, html_filename, {"form": form})
    chosen_dbname = form.cleaned_data["database"]
    return HttpResponseRedirect(
        reverse(with_db_url_name, args=[chosen_dbname])
    )
def pid_rid_lookup_with_db(
    request: HttpRequest,
    dbname: str,
    form_html_filename: str,
    formclass: Any,
    result_html_filename: str,
) -> HttpResponse:
    """
    Common functionality for :func:`pidlookup_with_db`,
    :func:`ridlookup_with_db`.

    Shows the ID-entry form for the named database; once the form validates,
    hands the submitted IDs to :func:`render_lookup`.

    Args:
        request:
            the :class:`django.http.request.HttpRequest`
        dbname:
            name of the research database to use
        form_html_filename:
            Django HTML template filename to ask for PID/RID/etc. details
        formclass:
            form class to use for requesting PID/RID/etc.
        result_html_filename:
            Django HTML template filename to display results

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    # "formclass" is typed as Any because of a bug in the Python 3.5 typing
    # module; we can't use Union[Type[PidLookupForm], Type[RidLookupForm]]:
    #   TypeError: descriptor '__subclasses__' of 'type' object needs an
    #   argument
    # ... see https://github.com/python/typing/issues/266
    research_database_info = get_research_db_info()
    try:
        dbinfo = research_database_info.get_dbinfo_by_name(dbname)
    except ValueError:
        return generic_error(request, f"No research database named {dbname!r}")

    form = formclass(
        request.POST or None, dbinfo=dbinfo
    )  # type: Union[PidLookupForm, RidLookupForm]
    if not form.is_valid():
        # Nothing (valid) submitted yet: show the form.
        return render(
            request,
            form_html_filename,
            {
                "db_name": dbinfo.name,
                "db_description": dbinfo.description,
                "form": form,
            },
        )

    # Any ID list that the form lacks, or leaves empty, becomes [].
    cleaned = form.cleaned_data
    id_lists = {
        key: cleaned.get(key) or []
        for key in ("pids", "mpids", "trids", "rids", "mrids")
    }
    return render_lookup(
        request=request,
        dbinfo=dbinfo,
        result_html_filename=result_html_filename,
        **id_lists,
    )
@user_passes_test(is_superuser)
def pidlookup(request: HttpRequest) -> HttpResponse:
    """
    Look up PID information from RID information. Superusers only.

    Delegates to :func:`pid_rid_lookup`, which selects the database and
    redirects to the ``pidlookup_with_db`` URL.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    return pid_rid_lookup(
        request, "pidlookup_with_db", "pid_lookup_choose_db.html"
    )
@user_passes_test(is_superuser)
def pidlookup_with_db(request: HttpRequest, dbname: str) -> HttpResponse:
    """
    Look up PID information from RID information, for a specific database.
    Superusers only.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        dbname: name of the research database to use

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    return pid_rid_lookup_with_db(
        request,
        dbname,
        "pid_lookup_form.html",
        PidLookupForm,
        "pid_lookup_result.html",
    )
@user_passes_test(is_clinician)
def ridlookup(request: HttpRequest) -> HttpResponse:
    """
    Look up RID information from PID information. Clinicians only.

    Delegates to :func:`pid_rid_lookup`, which selects the database and
    redirects to the ``ridlookup_with_db`` URL.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    return pid_rid_lookup(
        request, "ridlookup_with_db", "rid_lookup_choose_db.html"
    )
@user_passes_test(is_clinician)
def ridlookup_with_db(request: HttpRequest, dbname: str) -> HttpResponse:
    """
    Look up RID information from PID information, for a specific database.
    Clinicians only.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        dbname: name of the research database to use

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    return pid_rid_lookup_with_db(
        request,
        dbname,
        "rid_lookup_form.html",
        RidLookupForm,
        "rid_lookup_result.html",
    )
def render_lookup(
    request: HttpRequest,
    dbinfo: SingleResearchDatabase,
    result_html_filename: str,
    trids: List[int] = None,
    rids: List[str] = None,
    mrids: List[str] = None,
    pids: List[int] = None,
    mpids: List[int] = None,
) -> HttpResponse:
    """
    Shows the output of a PID/RID lookup: every row of the secret lookup
    table matching ANY of the supplied identifiers, ordered by PID.

    Args:
        request:
            the :class:`django.http.request.HttpRequest`
        dbinfo:
            a :class:`crate_anon.crateweb.research.research_db_info.SingleResearchDatabase`
            detailing the research database to use
        result_html_filename:
            Django HTML template filename to display results
        trids:
            list of TRIDs to look up from
        rids:
            list of RIDs to look up from
        mrids:
            list of MRIDs to look up from
        pids:
            list of PIDs to look up from
        mpids:
            list of MPIDs to look up from

    Returns:
        a :class:`django.http.response.HttpResponse`
    """  # noqa: E501
    # if not request.user.superuser:
    #    return HttpResponse('Forbidden', status=403)
    #    # https://stackoverflow.com/questions/3297048/403-forbidden-vs-401-unauthorized-http-responses  # noqa: E501

    # Treat "not supplied" as "no IDs of this kind".
    if trids is None:
        trids = []
    if rids is None:
        rids = []
    if mrids is None:
        mrids = []
    if pids is None:
        pids = []
    if mpids is None:
        mpids = []

    assert dbinfo.secret_lookup_db
    matches_any_id = (
        Q(trid__in=trids)
        | Q(rid__in=rids)
        | Q(mrid__in=mrids)
        | Q(pid__in=pids)
        | Q(mpid__in=mpids)
    )
    lookup_table = PidLookup.objects.using(dbinfo.secret_lookup_db)
    context = {
        "lookups": lookup_table.filter(matches_any_id).order_by("pid"),
    }
    # Field names/descriptions for the template, copied straight from dbinfo.
    for attr_name in (
        "trid_field",
        "trid_description",
        "rid_field",
        "rid_description",
        "mrid_field",
        "mrid_description",
        "pid_description",
        "mpid_description",
    ):
        context[attr_name] = getattr(dbinfo, attr_name)
    return render(request, result_html_filename, context)
2457# =============================================================================
2458# Research database structure
2459# =============================================================================
def structure_table_long(request: HttpRequest) -> HttpResponse:
    """
    Shows the table structure of the research database(s) in long
    (unpaginated) format.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    rdi = get_research_db_info()
    all_columns = rdi.get_colinfolist()
    return render(
        request,
        "database_structure.html",
        {
            "paginated": False,
            "colinfolist": all_columns,
            "rowcount": len(all_columns),
            "default_database": rdi.get_default_database_name(),
            "default_schema": rdi.get_default_schema_name(),
            "with_database": rdi.uses_database_level(),
        },
    )
def structure_table_paginated(request: HttpRequest) -> HttpResponse:
    """
    Shows the table structure of the research database(s) in paginated format.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    rdi = get_research_db_info()
    all_columns = rdi.get_colinfolist()
    # Count before paginating: "rowcount" is the total, not the page size.
    total_columns = len(all_columns)
    current_page = paginate(request, all_columns)
    return render(
        request,
        "database_structure.html",
        {
            "paginated": True,
            "colinfolist": current_page,
            "rowcount": total_columns,
            "default_database": rdi.get_default_database_name(),
            "default_schema": rdi.get_default_schema_name(),
            "with_database": rdi.uses_database_level(),
        },
    )
@django_cache_function(timeout=None)
# @lru_cache(maxsize=None)
def get_structure_tree_html() -> str:
    """
    Returns HTML for an expand-and-collapse tree showing the table structure of
    the research database(s).

    For each table, emits a title ``<div>`` (schema, table name, and a
    show/hide button) followed by a collapsible ``<div>`` holding that
    table's columns, rendered via ``database_structure_table.html``. The
    result is cached via ``django_cache_function`` with ``timeout=None``.

    Returns:
        str: HTML
    """
    rdi = get_research_db_info()
    columns_by_table = rdi.get_colinfolist_by_tables()
    counter = HtmlElementCounter()
    grammar = rdi.grammar
    html_parts = []  # type: List[str]
    for table_id, colinfolist in columns_by_table.items():
        template_context = {
            "colinfolist": colinfolist,
            "default_database": rdi.get_default_database_name(),
            "default_schema": rdi.get_default_schema_name(),
            "with_database": rdi.uses_database_level(),
        }
        table_html = render_to_string(
            "database_structure_table.html", template_context
        )
        # Ask for the button before the content div, in that order, as the
        # counter is shared between the two calls.
        toggle_button = counter.visibility_div_spanbutton()
        collapsible_div = counter.visibility_div_contentdiv(
            contents=table_html
        )
        html_parts.append(
            '<div class="titlecolour">{db_schema}.<b>{table}</b>{button}</div>'
            "{cd}".format(
                db_schema=table_id.database_schema_part(grammar),
                table=table_id.table_part(grammar),
                button=toggle_button,
                cd=collapsible_div,
            )
        )
    return "".join(html_parts)
def structure_tree(request: HttpRequest) -> HttpResponse:
    """
    Shows an expand-and-collapse tree view of the table structure of the
    research database(s). The tree HTML itself comes from
    :func:`get_structure_tree_html`.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    rdi = get_research_db_info()
    return render(
        request,
        "database_structure_tree.html",
        {
            "content": get_structure_tree_html(),
            "default_database": rdi.get_default_database_name(),
            "default_schema": rdi.get_default_schema_name(),
        },
    )
# noinspection PyUnusedLocal
def structure_tsv(request: HttpRequest) -> HttpResponse:
    """
    Serves the table structure of the research database(s) as TSV, as a file
    named ``structure.tsv``.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    tsv_data = get_research_db_info().get_tsv()
    return file_response(
        tsv_data, content_type=ContentType.TSV, filename="structure.tsv"
    )
# noinspection PyUnusedLocal
def structure_excel(request: HttpRequest) -> HttpResponse:
    """
    Serves the table structure of the research database(s) as an Excel XLSX
    file, named ``structure.xlsx``.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    research_database_info = get_research_db_info()
    return file_response(
        research_database_info.get_excel(),
        # The payload is XLSX, so advertise it as such (not as TSV), so that
        # the declared content type matches the file actually served.
        content_type=ContentType.XLSX,
        filename="structure.xlsx",
    )
2611# =============================================================================
2612# Local help on structure
2613# =============================================================================
def local_structure_help(request: HttpRequest) -> HttpResponse:
    """
    Serves a locally specified help page.

    If ``settings.DATABASE_HELP_HTML_FILENAME`` is set, that file's contents
    are served directly (encoded as UTF-8); otherwise, a "no local help
    available" message is rendered via ``local_structure_help.html``.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    help_filename = settings.DATABASE_HELP_HTML_FILENAME
    if not help_filename:
        return render(
            request,
            "local_structure_help.html",
            {"content": "<p>No local help available.</p>"},
        )
    with open(help_filename, "r") as infile:
        local_html = infile.read()
    return HttpResponse(local_html.encode("utf8"))
2636# =============================================================================
2637# SQL helpers
2638# =============================================================================
def textmatch(
    column_name: str, fragment: str, as_fulltext: bool, dialect: str = "mysql"
) -> str:
    """
    Returns SQL to check for the presence of text anywhere in a field.

    Both ``column_name`` and ``fragment`` are inserted into the SQL verbatim
    (no quoting or escaping is applied here).

    Args:
        column_name: name of the column
        fragment: piece of text to look for
        as_fulltext: use a FULLTEXT search if the database dialect supports it
        dialect: dialect name (``mysql``, ``mssql`` are known)

    Returns:
        str: SQL fragment like:

        - ``column LIKE '%fragment%'`` (ANSI SQL)
        - ``MATCH(column) AGAINST ('fragment')`` (MySQL full-text)
        - ``CONTAINS(column, 'fragment')`` (Microsoft SQL Server full-text)

    """
    if as_fulltext:
        if dialect == "mysql":
            return f"MATCH({column_name}) AGAINST ('{fragment}')"
        if dialect == "mssql":
            return f"CONTAINS({column_name}, '{fragment}')"
    # Full-text search not wanted, or not known for this dialect.
    return f"{column_name} LIKE '%{fragment}%'"
def drugmatch(drug_type: str, colname: str) -> str:
    """
    Returns SQL to check for the presence of any drug of type 'drug_type'
    anywhere in a field.

    Args:
        drug_type: drug type to look for; passed to ``all_drugs_where`` as
            the keyword ``<drug_type>=True``
        colname: name of the column

    Returns:
        str: a single SQL boolean expression: the per-drug conditions joined
        with ``OR``. When there is more than one drug, the whole expression
        is wrapped in parentheses so that it can safely be combined with
        other conditions via ``AND``. If no drugs match, returns ``""``.
    """
    criteria = {drug_type: True}
    drugs = all_drugs_where(**criteria)  # type: List[Drug]
    drugs_sql_parts = [drug.sql_column_like_drug(colname) for drug in drugs]
    if len(drugs_sql_parts) <= 1:
        # Nothing to join: return the lone condition (or "") unchanged.
        return "".join(drugs_sql_parts)
    # AND binds more tightly than OR in SQL, so an unbracketed "a OR b"
    # appended to "pid = x AND ..." would match rows for other patients.
    return "({})".format(" OR ".join(drugs_sql_parts))
def textfinder_sql(
    patient_id_fieldname: str,
    min_length: int,
    use_fulltext_index: bool,
    include_content: bool,
    include_datetime: bool,
    fragment: str = "",
    drug_type: str = "",
    patient_id_value: Union[int, str] = None,
    extra_fieldname: str = None,
    extra_value: Union[int, str] = None,
) -> str:
    """
    Returns SQL to find the text in ``fragment`` (or, failing that, any drug
    of type ``drug_type``) across all tables that contain the field indicated
    by ``patient_id_fieldname``, where the length of the text field is at
    least ``min_length``.

    The result is a ``UNION`` of one ``SELECT`` per text column (if
    ``include_content``) or one ``SELECT`` per table (otherwise), followed by
    a single ``ORDER BY``.

    Args:
        patient_id_fieldname:
            field (column) name across all tables that contains the patient ID;
            any tables that don't contain this column will be ignored
        fragment:
            fragment of text to find (e.g. "paracetamol"); takes priority over
            ``drug_type`` if both are given
        drug_type:
            type of drug to find any example of; used only if ``fragment`` is
            empty
        min_length:
            text fields must be at least this large to bother searching; use
            this option to exclude e.g. ``VARCHAR(1)`` columns from the search
        use_fulltext_index:
            use database full-text indexing? (Applied only to columns that
            are flagged as full-text indexed.)
        include_content:
            include the text fields in the output? If so, the output also
            includes ``_table_name``, ``_column_name`` and ``_content``
            columns.
        include_datetime:
            include the date/time of each record, if known (see
            :meth:`crate_anon.crateweb.research.research_db_info.ResearchDatabaseInfo.get_default_date_column`),
            as column ``_datetime``; ``NULL`` for tables without a default
            date column
        patient_id_value:
            specify this to restrict to a single patient; the value of the
            patient ID column (see ``patient_id_fieldname``) to restrict to.
            ``None`` and ``""`` both mean "no restriction".
        extra_fieldname:
            name for an extra, constant output column; used only if
            ``extra_value`` is also supplied (is not ``None``)
        extra_value:
            literal value for that extra column; it is selected as the first
            column of every sub-query and is the first ``ORDER BY`` key

    Returns:
        str: SQL query; an empty string if no candidate table has any
        suitable text columns

    Raises:
        :exc:`ValueError` if neither ``fragment`` nor ``drug_type`` is
        supplied, or if no tables match the request
    """  # noqa: E501
    if not fragment and not drug_type:
        raise ValueError(
            "Must supply either 'fragment' or 'drug_type' to 'textfinder_sql'"
        )
    research_database_info = get_research_db_info()
    grammar = research_database_info.grammar
    tables = research_database_info.tables_containing_field(
        patient_id_fieldname
    )
    if not tables:
        raise ValueError(
            f"No tables containing fieldname: {patient_id_fieldname}"
        )
    have_pid_value = patient_id_value is not None and patient_id_value != ""
    if have_pid_value:
        pidclause = "{patient_id_fieldname} = {value}".format(
            patient_id_fieldname=patient_id_fieldname,
            value=escape_sql_string_or_int_literal(patient_id_value),
        )
    else:
        pidclause = ""
    using_extra = extra_fieldname and extra_value is not None
    # Output column aliases (also used in the final ORDER BY):
    table_heading = "_table_name"
    contents_colname_heading = "_column_name"
    datetime_heading = "_datetime"

    # One SELECT statement per entry; combined with UNION at the end.
    queries = []  # type: List[str]

    def add_query(
        table_ident: str,
        extra_cols: List[str],
        date_value_select: str,
        extra_conditions: List[str],
    ) -> None:
        # Builds one SELECT ... FROM ... WHERE ... statement and appends it to
        # "queries". All conditions (the patient ID clause, if any, then
        # "extra_conditions") are joined with AND.
        selectcols = []  # type: List[str]
        # Patient ID(s); date
        if using_extra:
            selectcols.append(
                "{lit} AS {ef}".format(
                    lit=escape_sql_string_or_int_literal(extra_value),
                    ef=extra_fieldname,
                )
            )
        selectcols.append(patient_id_fieldname)
        if include_datetime:
            selectcols.append(f"{date_value_select} AS {datetime_heading}")
        # +/- table/column/content
        selectcols += extra_cols
        # Build query
        query = f"SELECT {', '.join(selectcols)}\n" f"FROM {table_ident}"
        conditions = []  # type: List[str]
        if have_pid_value:
            conditions.append(pidclause)
        conditions.extend(extra_conditions)
        query += "\nWHERE " + " AND ".join(conditions)
        queries.append(query)

    for table_id in tables:
        columns = research_database_info.text_columns(
            table_id=table_id, min_length=min_length
        )
        if not columns:
            # No sufficiently long text columns in this table; skip it.
            continue
        table_identifier = table_id.identifier(grammar)
        date_col = research_database_info.get_default_date_column(
            table=table_id
        )
        if date_col:
            date_identifier = date_col.identifier(grammar)
        else:
            # Keeps the column count consistent across the UNION.
            date_identifier = "NULL"

        if include_content:
            # Content required; therefore, one query per text column.
            table_select = "'{}' AS {}".format(
                escape_sql_string_literal(table_identifier), table_heading
            )
            for columninfo in columns:
                column_identifier = columninfo.column_id.identifier(grammar)
                # 'extra_conditions' will be the sql fragment finding either
                # the fragment of text supplied or all drugs of the given type
                if fragment:
                    extra = textmatch(
                        column_name=column_identifier,
                        fragment=fragment,
                        as_fulltext=(
                            columninfo.indexed_fulltext and use_fulltext_index
                        ),
                        dialect=settings.RESEARCH_DB_DIALECT,
                    )
                else:
                    extra = drugmatch(
                        colname=column_identifier, drug_type=drug_type
                    )
                # NOTE(review): unlike the table identifier above, the column
                # identifier is not passed through escape_sql_string_literal()
                # before being quoted as a literal -- confirm that's intended.
                contentcol_name_select = (
                    f"'{column_identifier}' AS {contents_colname_heading}"
                )
                content_select = f"{column_identifier} AS _content"
                # NOTE(review): "extra" is ANDed with the patient ID clause
                # inside add_query(), without added brackets; this relies on
                # "extra" being a self-contained boolean expression -- confirm
                # for the drugmatch() case.
                add_query(
                    table_ident=table_identifier,
                    extra_cols=[
                        table_select,
                        contentcol_name_select,
                        content_select,
                    ],
                    date_value_select=date_identifier,
                    extra_conditions=[extra],
                )

        else:
            # Content not required; therefore, one query per table.
            elements = []  # type: List[str]
            for columninfo in columns:
                if fragment:
                    elmnt = textmatch(
                        column_name=columninfo.column_id.identifier(grammar),
                        fragment=fragment,
                        as_fulltext=(
                            columninfo.indexed_fulltext and use_fulltext_index
                        ),
                        dialect=settings.RESEARCH_DB_DIALECT,
                    )
                else:
                    elmnt = drugmatch(
                        colname=columninfo.column_id.identifier(grammar),
                        drug_type=drug_type,
                    )
                elements.append(elmnt)
            # A match in ANY text column will do; the OR-chain is bracketed
            # so that it combines correctly with the patient ID clause.
            add_query(
                table_ident=table_identifier,
                extra_cols=[],
                date_value_select=date_identifier,
                extra_conditions=[
                    "(\n {}\n)".format("\n OR ".join(elements))
                ],
            )

    sql = "\nUNION\n".join(queries)
    if sql:
        # A single ORDER BY for the whole UNION, using the output column names.
        order_by_cols = []  # type: List[str]
        if using_extra:
            order_by_cols.append(extra_fieldname)
        order_by_cols.append(patient_id_fieldname)
        if include_datetime:
            order_by_cols.append(datetime_heading + " DESC")
        if include_content:
            order_by_cols.extend([table_heading, contents_colname_heading])
        sql += "\nORDER BY " + ", ".join(order_by_cols)
    return sql
2883def common_find_text(
2884 request: HttpRequest,
2885 dbinfo: SingleResearchDatabase,
2886 form_class: Type[SQLHelperFindAnywhereForm],
2887 default_values: Dict[str, Any],
2888 permit_pid_search: bool,
2889 html_filename: str,
2890) -> HttpResponse:
2891 """
2892 Finds and displays text anywhere in the database(s), via a ``UNION`` query.
2894 Args:
2895 request:
2896 the :class:`django.http.request.HttpRequest`
2897 dbinfo:
2898 a :class:`crate_anon.crateweb.research.research_db_info.SingleResearchDatabase`
2899 detailing the research database to use
2900 form_class:
2901 form class to use to specify search options;
2902 :class:`crate_anon.crateweb.research.forms.SQLHelperTextAnywhereForm`
2903 or a subclass of it (like
2904 `crate_anon.crateweb.research.forms.ClinicianAllTextFromPidForm`)
2905 default_values:
2906 default values to be passed to the form (see ``form_class``)
2907 permit_pid_search:
2908 allow the user to search by PID/MPID (for clinicians)?
2909 html_filename:
2910 Django HTML template filename to capture search options
2912 Returns:
2913 a :class:`django.http.response.HttpResponse`
2914 """ # noqa: E501
2915 # When you forget about Django forms, go back to:
2916 # http://www.slideshare.net/pydanny/advanced-django-forms-usage
2918 # -------------------------------------------------------------------------
2919 # What may the user use to look up patients?
2920 # -------------------------------------------------------------------------
2921 fk_options = [] # type: List[FieldPickerInfo]
2922 if permit_pid_search:
2923 fk_options.append(
2924 FieldPickerInfo(
2925 value=dbinfo.pid_pseudo_field,
2926 description=(
2927 f"{dbinfo.pid_pseudo_field}: {dbinfo.pid_description}"
2928 ),
2929 type_=PatientFieldPythonTypes.PID,
2930 permits_empty_id=False,
2931 )
2932 )
2933 fk_options.append(
2934 FieldPickerInfo(
2935 value=dbinfo.mpid_pseudo_field,
2936 description=(
2937 f"{dbinfo.mpid_pseudo_field}: {dbinfo.mpid_description}"
2938 ),
2939 type_=PatientFieldPythonTypes.MPID,
2940 permits_empty_id=False,
2941 )
2942 )
2943 assert dbinfo.secret_lookup_db
2944 default_values["fkname"] = dbinfo.pid_pseudo_field
2945 fk_options.append(
2946 FieldPickerInfo(
2947 value=dbinfo.rid_field,
2948 description=f"{dbinfo.rid_field}: {dbinfo.rid_description}",
2949 type_=PatientFieldPythonTypes.RID,
2950 permits_empty_id=True,
2951 ),
2952 )
2953 if dbinfo.secret_lookup_db:
2954 fk_options.append(
2955 FieldPickerInfo(
2956 value=dbinfo.mrid_field,
2957 description=f"{dbinfo.mrid_field}: {dbinfo.mrid_description}",
2958 type_=PatientFieldPythonTypes.MRID,
2959 permits_empty_id=False,
2960 )
2961 )
2963 # We don't want to make too much of the TRID. Let's not offer it as
2964 # a lookup option. If performance becomes a major problem with these
2965 # queries, we could always say "if dbinfo.secret_lookup_db, then
2966 # look up the TRID from the RID (or whatever we're using)".
2967 #
2968 # FieldPickerInfo(value=dbinfo.trid_field,
2969 # description="{}: {}".format(dbinfo.trid_field,
2970 # dbinfo.trid_description),
2971 # type_=PatientFieldPythonTypes.TRID),
2973 form = form_class(request.POST or default_values, fk_options=fk_options)
2974 if form.is_valid():
2975 patient_id_fieldname = form.cleaned_data["fkname"]
2976 pidvalue = form.cleaned_data["patient_id"]
2977 min_length = form.cleaned_data["min_length"]
2979 # ---------------------------------------------------------------------
2980 # What are we going to use internally for the lookup?
2981 # ---------------------------------------------------------------------
2982 # For patient lookups, a TRID is quick but not so helpful for
2983 # clinicians. Use the RID.
2984 if patient_id_fieldname == dbinfo.pid_pseudo_field:
2985 lookup = (
2986 PidLookup.objects.using(dbinfo.secret_lookup_db)
2987 .filter(pid=pidvalue)
2988 .first()
2989 ) # type: PidLookup
2990 if lookup is None:
2991 return generic_error(
2992 request, f"No patient with PID {pidvalue!r}"
2993 )
2994 # Replace:
2995 extra_fieldname = patient_id_fieldname
2996 extra_value = pidvalue
2997 patient_id_fieldname = dbinfo.rid_field
2998 pidvalue = lookup.rid # string
2999 elif patient_id_fieldname == dbinfo.mpid_pseudo_field:
3000 lookup = (
3001 PidLookup.objects.using(dbinfo.secret_lookup_db)
3002 .filter(mpid=pidvalue)
3003 .first()
3004 ) # type: PidLookup
3005 if lookup is None:
3006 return generic_error(
3007 request, f"No patient with MPID {pidvalue!r}"
3008 )
3009 # Replace:
3010 extra_fieldname = patient_id_fieldname
3011 extra_value = pidvalue
3012 patient_id_fieldname = dbinfo.rid_field
3013 pidvalue = lookup.rid # string
3015 elif patient_id_fieldname == dbinfo.mrid_field:
3016 # Using MRID. This is not stored in each table. Rather than have
3017 # an absolutely enormous query (SELECT stuff FROM texttable INNER
3018 # JOIN mridtable ON patient_id_stuff WHERE texttable.contents
3019 # LIKE something AND mridtable.mrid = ? UNION SELECT morestuff...)
3020 # let's look up the RID from the MRID. Consequently, we only offer
3021 # MRID lookup if we have a secret lookup table.
3022 lookup = (
3023 PidLookup.objects.using(dbinfo.secret_lookup_db)
3024 .filter(mrid=pidvalue)
3025 .first()
3026 )
3027 if lookup is None:
3028 return generic_error(
3029 request, f"No patient with RID {pidvalue!r}"
3030 )
3031 # Replace:
3032 extra_fieldname = patient_id_fieldname
3033 extra_value = pidvalue
3034 patient_id_fieldname = dbinfo.rid_field
3035 pidvalue = lookup.rid # string
3037 else:
3038 # Using RID directly (or, if we wanted to support it, TRID).
3039 extra_fieldname = None
3040 extra_value = None
3042 # ---------------------------------------------------------------------
3043 # Generate the query
3044 # ---------------------------------------------------------------------
3045 if form_class == SQLHelperDrugTypeForm:
3046 fragment = ""
3047 drug_type = escape_sql_string_literal(
3048 form.cleaned_data["drug_type"]
3049 )
3050 else:
3051 fragment = escape_sql_string_literal(form.cleaned_data["fragment"])
3052 drug_type = ""
3053 try:
3054 sql = textfinder_sql(
3055 patient_id_fieldname=patient_id_fieldname,
3056 fragment=fragment,
3057 drug_type=drug_type,
3058 min_length=min_length,
3059 use_fulltext_index=form.cleaned_data["use_fulltext_index"],
3060 include_content=form.cleaned_data["include_content"],
3061 include_datetime=form.cleaned_data["include_datetime"],
3062 patient_id_value=pidvalue,
3063 extra_fieldname=extra_fieldname,
3064 extra_value=extra_value,
3065 )
3066 # This SQL will link across all available research databases
3067 # where the fieldname conditions are met.
3068 if not sql:
3069 raise ValueError(
3070 f"No fields matched your criteria (text columns of "
3071 f"minimum length {min_length} in tables containing "
3072 f"field {patient_id_fieldname!r})"
3073 )
3074 except ValueError as e:
3075 return generic_error(request, str(e))
3077 # ---------------------------------------------------------------------
3078 # Run, save, or display the query
3079 # ---------------------------------------------------------------------
3080 if "submit_save" in request.POST:
3081 return query_submit(request, sql, run=False)
3082 elif "submit_run" in request.POST:
3083 return query_submit(request, sql, run=True)
3084 else:
3085 return render(request, "sql_fragment.html", {"sql": sql})
3087 # -------------------------------------------------------------------------
3088 # Offer the starting choices
3089 # -------------------------------------------------------------------------
3090 return render(
3091 request,
3092 html_filename,
3093 {
3094 "db_name": dbinfo.name,
3095 "db_description": dbinfo.description,
3096 "form": form,
3097 },
3098 )
def sqlhelper_text_anywhere(request: HttpRequest) -> HttpResponse:
    """
    Chooses the research database for the "text anywhere" SQL helper, then
    redirects to :func:`sqlhelper_text_anywhere_with_db`.

    With a single research database, the redirect is immediate. Otherwise a
    database picker form is shown, and the redirect follows once that form
    validates.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    rdb_info = get_research_db_info()
    target_url_name = UrlNames.SQLHELPER_TEXT_ANYWHERE_WITH_DB

    # Only one database: there is nothing for the user to choose.
    if rdb_info.single_research_db:
        only_db = rdb_info.first_dbinfo.name
        return HttpResponseRedirect(reverse(target_url_name, args=[only_db]))

    picker = DatabasePickerForm(
        request.POST or None, dbinfolist=rdb_info.dbinfolist
    )
    if not picker.is_valid():
        # Nothing (valid) submitted yet: (re)display the picker.
        return render(
            request,
            "sqlhelper_form_text_anywhere_choose_db.html",
            {"form": picker},
        )
    chosen_db = picker.cleaned_data["database"]
    return HttpResponseRedirect(reverse(target_url_name, args=[chosen_db]))
def sqlhelper_text_anywhere_with_db(
    request: HttpRequest, dbname: str
) -> HttpResponse:
    """
    Finds text anywhere in the database(s) via a ``UNION`` query.

    Looks up the named research database and hands over to
    :func:`common_find_text` with the "text anywhere" form, with PID-based
    searching disabled.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        dbname: name of the research database to use

    Returns:
        a :class:`django.http.response.HttpResponse`; an error page if no
        research database has the given name
    """
    rdb_info = get_research_db_info()
    try:
        dbinfo = rdb_info.get_dbinfo_by_name(dbname)
    except ValueError:
        return generic_error(request, f"No research database named {dbname!r}")

    # Starting values for the form when nothing has been POSTed.
    initial_values = dict(
        fkname=dbinfo.rid_field,
        min_length=DEFAULT_MIN_TEXT_FIELD_LENGTH,
        use_fulltext_index=True,
        include_content=False,
        include_datetime=False,
    )
    return common_find_text(
        request=request,
        dbinfo=dbinfo,
        form_class=SQLHelperTextAnywhereForm,
        default_values=initial_values,
        permit_pid_search=False,
        html_filename="sqlhelper_form_text_anywhere.html",
    )
def sqlhelper_drug_type(request: HttpRequest) -> HttpResponse:
    """
    Chooses the research database for the "drug type" SQL helper, then
    redirects to :func:`sqlhelper_drug_type_with_db`.

    With a single research database, the redirect is immediate. Otherwise a
    database picker form is shown, and the redirect follows once that form
    validates.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    rdb_info = get_research_db_info()
    target_url_name = UrlNames.SQLHELPER_DRUG_TYPE_WITH_DB

    # Only one database: there is nothing for the user to choose.
    if rdb_info.single_research_db:
        only_db = rdb_info.first_dbinfo.name
        return HttpResponseRedirect(reverse(target_url_name, args=[only_db]))

    picker = DatabasePickerForm(
        request.POST or None, dbinfolist=rdb_info.dbinfolist
    )
    if not picker.is_valid():
        # Nothing (valid) submitted yet: (re)display the picker.
        return render(
            request, "sqlhelper_form_drug_type_choose_db.html", {"form": picker}
        )
    chosen_db = picker.cleaned_data["database"]
    return HttpResponseRedirect(reverse(target_url_name, args=[chosen_db]))
def sqlhelper_drug_type_with_db(
    request: HttpRequest, dbname: str
) -> HttpResponse:
    """
    Finds drugs of a given type anywhere in the database(s) via a ``UNION``
    query.

    Looks up the named research database and hands over to
    :func:`common_find_text` with the drug-type form, with PID-based
    searching disabled.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        dbname: name of the research database to use

    Returns:
        a :class:`django.http.response.HttpResponse`; an error page if no
        research database has the given name
    """
    rdb_info = get_research_db_info()
    try:
        dbinfo = rdb_info.get_dbinfo_by_name(dbname)
    except ValueError:
        return generic_error(request, f"No research database named {dbname!r}")

    # Starting values for the form when nothing has been POSTed.
    initial_values = dict(
        fkname=dbinfo.rid_field,
        min_length=DEFAULT_MIN_TEXT_FIELD_LENGTH,
        use_fulltext_index=True,
        include_content=False,
        include_datetime=False,
    )
    return common_find_text(
        request=request,
        dbinfo=dbinfo,
        form_class=SQLHelperDrugTypeForm,
        default_values=initial_values,
        permit_pid_search=False,
        html_filename="sqlhelper_form_drugtype.html",
    )
@user_passes_test(is_clinician)
def all_text_from_pid(request: HttpRequest) -> HttpResponse:
    """
    Chooses a research database, then redirects to
    :func:`all_text_from_pid_with_db`.

    Only databases reported by ``dbs_with_secret_map`` are candidates. With
    none, an error page is shown; with exactly one, the redirect is
    immediate; with several, the user picks one from a form first.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    candidates = get_research_db_info().dbs_with_secret_map
    n_candidates = len(candidates)
    target_url_name = UrlNames.ALL_TEXT_FROM_PID_WITH_DB

    if n_candidates == 0:
        return generic_error(request, "No databases with lookup map!")

    if n_candidates == 1:
        sole_db = candidates[0].name
        return HttpResponseRedirect(reverse(target_url_name, args=[sole_db]))

    # Several candidates: ask the user.
    picker = DatabasePickerForm(request.POST or None, dbinfolist=candidates)
    if not picker.is_valid():
        return render(
            request,
            "clinician_form_all_text_from_pid_choose_db.html",
            {"form": picker},
        )
    chosen_db = picker.cleaned_data["database"]
    return HttpResponseRedirect(reverse(target_url_name, args=[chosen_db]))
@user_passes_test(is_clinician)
def all_text_from_pid_with_db(
    request: HttpRequest, dbname: str
) -> HttpResponse:
    """
    Clinician view to look up a patient's RID from their PID and display
    text from any field.

    Looks up the named research database and hands over to
    :func:`common_find_text` with the clinician form, with PID-based
    searching enabled and with content and date/time included by default.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        dbname: name of the research database to use

    Returns:
        a :class:`django.http.response.HttpResponse`; an error page if no
        research database has the given name
    """
    rdb_info = get_research_db_info()
    try:
        dbinfo = rdb_info.get_dbinfo_by_name(dbname)
    except ValueError:
        return generic_error(request, f"No research database named {dbname!r}")

    # Starting values for the form when nothing has been POSTed. No "fkname"
    # default is supplied from here.
    initial_values = dict(
        min_length=DEFAULT_MIN_TEXT_FIELD_LENGTH,
        use_fulltext_index=True,
        include_content=True,
        include_datetime=True,
    )
    return common_find_text(
        request=request,
        dbinfo=dbinfo,
        form_class=ClinicianAllTextFromPidForm,
        default_values=initial_values,
        permit_pid_search=True,
        html_filename="clinician_form_all_text_from_pid.html",
    )
3309# =============================================================================
3310# Per-patient views: Patient Explorer
3311# =============================================================================
def pe_build(request: HttpRequest) -> HttpResponse:
    """
    View to build/edit a Patient Explorer (see
    :class:`crate_anon.crateweb.research.models.PatientExplorer`).

    Operates on the ``patient_multiquery_scratchpad`` stored in the user's
    profile (creating an empty one if absent). A POST either carries one of
    the "global" commands (clear output columns / patient conditions /
    everything; save; run; set or clear the manual patient query) or is
    treated as a query-builder form submission that adds an output column,
    all columns of a table, or a patient ``WHERE`` condition. Save and run
    hand over to :func:`pe_submit` when the query is OK to run; in all other
    cases the builder page is rendered.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`

    Raises:
        ValueError: if a valid query-builder form is POSTed without a
            recognized submit command
    """
    rdb_info = get_research_db_info()
    # noinspection PyUnresolvedReferences
    profile = request.user.profile  # type: UserProfile
    default_database = rdb_info.get_default_database_name()
    default_schema = rdb_info.get_default_schema_name()
    with_database = rdb_info.uses_database_level()
    manual_form = None
    form = None

    if not profile.patient_multiquery_scratchpad:
        profile.patient_multiquery_scratchpad = PatientMultiQuery()
    pmq = profile.patient_multiquery_scratchpad

    # -------------------------------------------------------------------------
    # Act on whatever was submitted
    # -------------------------------------------------------------------------
    if request.method == "POST":
        posted = request.POST

        if "global_clear_select" in posted:
            pmq.clear_output_columns()
            profile.save()

        elif "global_clear_where" in posted:
            pmq.clear_patient_conditions()
            profile.save()

        elif "global_clear_everything" in posted:
            pmq.clear_output_columns()
            pmq.clear_patient_conditions()
            pmq.set_override_query("")
            profile.save()

        elif "global_save" in posted:
            # If not OK to run, fall through and redisplay the builder.
            if pmq.ok_to_run:
                return pe_submit(request, pmq, run=False)

        elif "global_run" in posted:
            if pmq.ok_to_run:
                return pe_submit(request, pmq, run=True)

        elif "global_manual_set" in posted:
            manual_form = ManualPeQueryForm(posted)
            if manual_form.is_valid():
                pmq.set_override_query(manual_form.cleaned_data["sql"])
                profile.save()

        elif "global_manual_clear" in posted:
            pmq.set_override_query("")
            profile.save()

        else:
            # Not a global command: a query-builder submission.
            form = QueryBuilderForm(posted, request.FILES)
            if form.is_valid():
                cleaned = form.cleaned_data
                column_id = ColumnId(
                    db=cleaned["database"] if with_database else "",
                    schema=cleaned["schema"],
                    table=cleaned["table"],
                    column=cleaned["column"],
                )

                if "submit_select" in posted:
                    pmq.add_output_column(column_id)

                elif "submit_select_star" in posted:
                    # Add every column of the chosen column's table.
                    table_columns = [
                        colinfo.column_id
                        for colinfo in rdb_info.all_columns(column_id.table_id)
                    ]
                    for table_column in table_columns:
                        pmq.add_output_column(table_column)

                elif "submit_where" in posted:
                    datatype = cleaned["datatype"]
                    op = cleaned["where_op"]
                    # Where the value comes from depends on the operator.
                    if op in SQL_OPS_MULTIPLE_VALUES:
                        value = form.file_values_list
                    elif op in SQL_OPS_VALUE_UNNECESSARY:
                        value = None
                    else:
                        value = form.get_cleaned_where_value()
                    pmq.add_patient_condition(
                        WhereCondition(
                            column_id=column_id,
                            op=op,
                            datatype=datatype,
                            value_or_values=value,
                        )
                    )

                else:
                    raise ValueError("Bad form command!")

                profile.save()
            # An invalid form is redisplayed below, with its errors.

    # -------------------------------------------------------------------------
    # Build the page
    # -------------------------------------------------------------------------
    manual_query = pmq.manual_patient_id_query

    if form is None:
        form = QueryBuilderForm()
    if manual_form is None:
        manual_form = ManualPeQueryForm({"sql": manual_query})

    form_data = form.data
    starting_values_dict = {
        "database": form_data.get("database", "") if with_database else "",
        "schema": form_data.get("schema", ""),
        "table": form_data.get("table", ""),
        "column": form_data.get("column", ""),
        "op": form_data.get("where_op", ""),
        "date_value": form_data.get("date_value", ""),
        # Impossible to set file_value programmatically. (See querybuilder.js.)
        "float_value": form_data.get("float_value", ""),
        "int_value": form_data.get("int_value", ""),
        "string_value": form_data.get("string_value", ""),
        "offer_where": True,
        "form_errors": "<br>".join(
            f"{k}: {v}" for k, v in form.errors.items()
        ),
        "default_database": default_database,
        "default_schema": default_schema,
        "with_database": with_database,
    }

    if manual_query:
        # A manual query takes the place of the built-up conditions.
        pmq_patient_conditions = (
            "<div><i>Overridden by manual query.</i></div>"
        )
        pmq_manual_patient_query = prettify_sql_html(
            pmq.manual_patient_id_query
        )
    else:
        pmq_patient_conditions = pmq.pt_conditions_html
        pmq_manual_patient_query = "<div><i>None</i></div>"
    pmq_final_patient_query = prettify_sql_html(
        pmq.patient_id_query(with_order_by=True)
    )

    warning_divs = []  # type: List[str]
    if not pmq.has_patient_id_query:
        warning_divs.append(
            '<div class="warning">No patient criteria yet</div>'
        )
    if not pmq.has_output_columns:
        warning_divs.append(
            '<div class="warning">No output columns yet</div>'
        )
    warnings = "".join(warning_divs)

    dialect = settings.RESEARCH_DB_DIALECT
    context = {
        "nav_on_pe_build": True,
        "pmq_output_columns": pmq.output_cols_html,
        "pmq_patient_conditions": pmq_patient_conditions,
        "pmq_manual_patient_query": pmq_manual_patient_query,
        "pmq_final_patient_query": pmq_final_patient_query,
        "warnings": warnings,
        "database_structure": get_db_structure_json(),
        "starting_values": json.dumps(
            starting_values_dict, separators=JSON_SEPARATORS_COMPACT
        ),
        "sql_dialect": dialect,
        "dialect_mysql": dialect == SqlaDialectName.MYSQL,
        "dialect_mssql": dialect == SqlaDialectName.MSSQL,
        "sql_highlight_css": prettify_sql_css(),
        "manual_form": manual_form,
    }
    context.update(query_context(request))
    return render(request, "pe_build.html", context)
def pe_choose(request: HttpRequest) -> HttpResponse:
    """
    Show a paginated list of the user's Patient Explorers (see
    :class:`crate_anon.crateweb.research.models.PatientExplorer`), from
    which one can be chosen.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    explorers_page = paginate(request, get_all_pes(request))
    context = dict(
        nav_on_pe_choose=True,
        patient_explorers=explorers_page,
        sql_highlight_css=prettify_sql_css(),
    )
    context.update(query_context(request))
    return render(request, "pe_choose.html", context)
def pe_activate(request: HttpRequest, pe_id: str) -> HttpResponse:
    """
    Activate a Patient Explorer, then return to the chooser view.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`

    Returns:
        a :class:`django.http.response.HttpResponse` (a 404 results if no
        such Patient Explorer exists)
    """
    validate_blank_form(request)
    explorer = get_object_or_404(
        PatientExplorer, id=pe_id
    )  # type: PatientExplorer
    explorer.activate()
    return redirect(UrlNames.PE_CHOOSE)
def pe_delete(request: HttpRequest, pe_id: str) -> HttpResponse:
    """
    Delete a Patient Explorer (if that is permitted), then return to the
    chooser view.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`

    Returns:
        a :class:`django.http.response.HttpResponse` (a 404 results if no
        such Patient Explorer exists)
    """
    validate_blank_form(request)
    explorer = get_object_or_404(
        PatientExplorer, id=pe_id
    )  # type: PatientExplorer
    explorer.delete_if_permitted()
    return redirect(UrlNames.PE_CHOOSE)
def pe_edit(request: HttpRequest, pe_id: str) -> HttpResponse:
    """
    Edit a Patient Explorer: copy its multiquery into the user's scratchpad
    and go to the builder view.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`

    Returns:
        a :class:`django.http.response.HttpResponse` (a 404 results if no
        such Patient Explorer exists)
    """
    validate_blank_form(request)
    explorer = get_object_or_404(
        PatientExplorer, id=pe_id
    )  # type: PatientExplorer
    # noinspection PyUnresolvedReferences
    user_profile = request.user.profile  # type: UserProfile
    # The builder view works on the scratchpad, so load it from this PE.
    user_profile.patient_multiquery_scratchpad = explorer.patient_multiquery
    user_profile.save()
    return redirect(UrlNames.PE_BUILD)
def pe_results(request: HttpRequest, pe_id: str) -> HttpResponse:
    """
    Show the results of a Patient Explorer, in paginated tabular form.

    Pagination is by patient: the patient MRIDs are paginated, and each of
    the Patient Explorer's queries is then run for the patients on the
    current page, giving one HTML table (plus its SQL) per query. The access
    is audited, whether it succeeds or fails.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`

    Returns:
        a :class:`django.http.response.HttpResponse`; the "bad Patient
        Explorer" page if a database error occurs
    """
    pe = get_object_or_404(PatientExplorer, id=pe_id)  # type: PatientExplorer
    grammar = get_research_db_info().grammar
    # noinspection PyUnresolvedReferences
    profile = request.user.profile  # type: UserProfile
    highlight_dict = Highlight.as_ordered_dict(
        Highlight.get_active_highlights(request)
    )
    element_counter = HtmlElementCounter()
    patient_id_query_html = prettify_sql_html(pe.get_patient_id_query())
    patients_per_page = get_patients_per_page(request)

    try:
        mrids = pe.get_patient_mrids()
        page = paginate(request, mrids, per_page=patients_per_page)
        active_mrids = list(page)  # type: List[str]
        results = []  # type: List[Dict[str, Any]]

        # With no patients on this page, there is nothing to query.
        if active_mrids:
            for tsa in pe.all_queries(mrids=active_mrids):
                sql, args = tsa.sql, tsa.args
                with pe.get_executed_cursor(sql, args) as cursor:
                    colnames = get_fieldnames_from_cursor(cursor)
                    fetched = cursor.fetchall()
                    table_html = resultset_html_table(
                        fieldnames=colnames,
                        rows=fetched,
                        element_counter=element_counter,
                        highlight_dict=highlight_dict,
                        collapse_at_len=profile.collapse_at_len,
                        collapse_at_n_lines=profile.collapse_at_n_lines,
                        line_length=profile.line_length,
                    )
                    query_html = element_counter.visibility_div_with_divbutton(
                        contents=prettify_sql_and_args(sql, args),
                        title_html="SQL",
                    )
                    results.append(
                        dict(
                            tablename=tsa.table_id.identifier(grammar),
                            table_html=table_html,
                            query_html=query_html,
                        )
                    )

        # The "row count" reported is the total number of patients.
        n_records = len(mrids)
        context = dict(
            nav_on_pe_results=True,
            results=results,
            page=page,
            rowcount=n_records,
            patient_id_query_html=patient_id_query_html,
            patients_per_page=patients_per_page,
            sql_highlight_css=prettify_sql_css(),
        )
        context.update(query_context(request))
        pe.audit(n_records=n_records)
        return render(request, "pe_result.html", context)

    except DatabaseError as db_error:
        pe.audit(failed=True, fail_msg=str(db_error))
        return render_bad_pe(request, pe, db_error)
def render_missing_pe(request: HttpRequest) -> HttpResponse:
    """
    Render the page telling the user that no Patient Explorer is selected,
    when one should have been.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    context = query_context(request)
    return render(request, "pe_missing.html", context)
# noinspection PyUnusedLocal
def render_bad_pe(
    request: HttpRequest, pe: PatientExplorer, exception: Exception
) -> HttpResponse:
    """
    A view saying "your Patient Explorer failed".

    Shows the exception together with the SQL and arguments recovered from
    it, where present.

    Args:
        request:
            the :class:`django.http.request.HttpRequest`
        pe:
            the :class:`crate_anon.crateweb.research.models.PatientExplorer`
            that went wrong (not used within this function)
        exception:
            the Python exception that resulted, which may have had extra
            information attached via
            :func:`cardinal_pythonlib.exceptions.add_info_to_exception`

    Returns:
        a :class:`django.http.response.HttpResponse`
    """
    extra_info = recover_info_from_exception(exception)
    # Fall back to empty SQL/arguments if none were attached.
    failed_query_html = prettify_sql_and_args(
        extra_info.get("sql", ""), extra_info.get("args", [])
    )
    context = dict(
        exception=repr(exception),
        query=failed_query_html,
        sql_highlight_css=prettify_sql_css(),
    )
    context.update(query_context(request))
    return render(request, "pe_bad.html", context)
3693# def render_bad_pe_id(request: HttpRequest, pe_id: int) -> HttpResponse:
3694# context = {'pe_id': pe_id}
3695# context.update(query_context(request))
3696# return render(request, 'pe_bad_id.html', context)
def get_all_pes(request: HttpRequest) -> QuerySet:
    """
    Return all non-deleted Patient Explorers for the current user, active
    ones first and then most recently created first.

    Args:
        request: the :class:`django.http.request.HttpRequest`

    Returns:
        a :class:`django.db.models.QuerySet` for
        :class:`crate_anon.crateweb.research.models.PatientExplorer` objects

    """
    own_undeleted = PatientExplorer.objects.filter(
        user=request.user, deleted=False
    )
    return own_undeleted.order_by("-active", "-created")
def get_identical_pes(
    request: HttpRequest, pmq: PatientMultiQuery
) -> List[PatientExplorer]:
    """
    Return all Patient Explorers for the current user whose query is identical
    to the query specified.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pmq: a :class:`crate_anon.crateweb.research.models.PatientMultiQuery`

    Returns:
        a list of :class:`crate_anon.crateweb.research.models.PatientExplorer`
        objects

    """
    # We could filter on the multiquery itself:
    #
    #   all_pes.filter(patient_multiquery=pmq)
    #
    # ... this works, but does so by converting the parameter (pmq) to its
    # JSON representation, presumably via JsonClassField.get_prep_value().
    # Accordingly, we can predict problems under SQL Server with very long
    # strings; see the problem in query_submit().
    # So, we filter on a hash instead.
    #
    # Beware: Python's hash() function will downconvert to 32 bits on 32-bit
    # machines; use pmq.hash64 directly, not hash(pmq).
    same_hash = get_all_pes(request).filter(pmq_hash=pmq.hash64)

    # Double-check in Python in case of hash collision:
    matches = []  # type: List[PatientExplorer]
    for candidate in same_hash:
        if candidate.patient_multiquery == pmq:
            matches.append(candidate)
    return matches
def pe_submit(
    request: HttpRequest, pmq: PatientMultiQuery, run: bool = False
) -> HttpResponse:
    """
    Save a :class:`crate_anon.crateweb.research.models.PatientMultiQuery` as a
    :class:`crate_anon.crateweb.research.models.PatientExplorer` for the
    current user.

    If the user already has a Patient Explorer with an identical query, that
    one is activated and reused rather than a duplicate being saved.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pmq: a :class:`crate_anon.crateweb.research.models.PatientMultiQuery`
        run: run and show results? Otherwise, save and return to the choice
            view

    Returns:
        a :class:`django.http.response.HttpResponse`

    """
    existing = get_identical_pes(request, pmq)
    if existing:
        reused = existing[0]
        reused.activate()
        pe_id = reused.id
    else:
        created = PatientExplorer(
            patient_multiquery=pmq, user=request.user, active=True
        )
        created.save()
        pe_id = created.id
    # log.critical(pprint.pformat(connection.queries))  # show all queries

    # Redirect to a new URL:
    if not run:
        return redirect(UrlNames.PE_CHOOSE)
    return redirect(UrlNames.PE_RESULTS, pe_id)
def pe_tsv_zip(request: HttpRequest, pe_id: str) -> HttpResponse:
    """
    Return the results of a
    :class:`crate_anon.crateweb.research.models.PatientExplorer` as a ZIP file
    of TSV files. The access is audited, whether it succeeds or fails.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`

    Returns:
        a :class:`django.http.response.HttpResponse`; the "bad Patient
        Explorer" page if a database error occurs
    """
    # https://stackoverflow.com/questions/12881294/django-create-a-zip-of-multiple-files-and-make-it-downloadable  # noqa: E501
    pe = get_object_or_404(PatientExplorer, id=pe_id)  # type: PatientExplorer
    try:
        zip_contents = pe.get_zipped_tsv_binary()
        download_name = "crate_pe_{num}_{datetime}.zip".format(
            num=pe.id,
            datetime=datetime_iso_for_filename(),
        )
        response = file_response(
            zip_contents,
            content_type=ContentType.ZIP,
            filename=download_name,
        )
        pe.audit()
        return response
    except DatabaseError as db_error:
        pe.audit(failed=True, fail_msg=str(db_error))
        return render_bad_pe(request, pe, db_error)
def pe_excel(request: HttpRequest, pe_id: str) -> HttpResponse:
    """
    Return the results of a
    :class:`crate_anon.crateweb.research.models.PatientExplorer` as an Excel
    XLSX file. The access is audited, whether it succeeds or fails.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`

    Returns:
        a :class:`django.http.response.HttpResponse`; the "bad Patient
        Explorer" page if a database error occurs
    """
    pe = get_object_or_404(PatientExplorer, id=pe_id)  # type: PatientExplorer
    try:
        xlsx_contents = pe.get_xlsx_binary()
        download_name = "crate_pe_{num}_{datetime}.xlsx".format(
            num=pe.id,
            datetime=datetime_iso_for_filename(),
        )
        response = file_response(
            xlsx_contents,
            content_type=ContentType.XLSX,
            filename=download_name,
        )
        pe.audit()
        return response
    except DatabaseError as db_error:
        pe.audit(failed=True, fail_msg=str(db_error))
        return render_bad_pe(request, pe, db_error)
def pe_data_finder_results(request: HttpRequest, pe_id: str) -> HttpResponse:
    """
    Shows the **data finder** view of a
    :class:`crate_anon.crateweb.research.models.PatientExplorer`. This counts
    records for each table (by patient), without showing all the data.

    Pagination is by patient. For the patients on the current page, one
    data-finder query is run per table; the rows from all of those queries
    are combined into a single results table, and the SQL for each query is
    shown alongside. The access is audited, whether it succeeds or fails.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`

    Returns:
        a :class:`django.http.response.HttpResponse`; the "bad Patient
        Explorer" page if a database error occurs
    """
    pe = get_object_or_404(PatientExplorer, id=pe_id)  # type: PatientExplorer
    # noinspection PyUnresolvedReferences
    profile = request.user.profile  # type: UserProfile
    patients_per_page = get_patients_per_page(request)
    element_counter = HtmlElementCounter()
    patient_id_query_html = prettify_sql_html(pe.get_patient_id_query())
    # If this query is done as a UNION, it's massive, e.g. ~410 characters
    # * number of tables (e.g. 1448 in one RiO database), for 0.5 Mb of query.
    # So do it more sensibly:
    try:
        mrids = pe.get_patient_mrids()
        page = paginate(request, mrids, per_page=patients_per_page)
        active_mrids = list(page)  # type: List[str]
        results_table_html = ""
        query_html = ""
        if active_mrids:
            fieldnames = []  # type: List[str]
            rows = []  # type: List[Sequence[Any]]
            for tsa in pe.patient_multiquery.gen_data_finder_queries(
                mrids=active_mrids
            ):
                table_identifier = tsa.table_id
                sql = tsa.sql
                args = tsa.args
                with pe.get_executed_cursor(sql, args) as cursor:
                    # Take the column names from the first query only; they
                    # head the one combined table built below.
                    if not fieldnames:
                        fieldnames = get_fieldnames_from_cursor(cursor)
                    # Accumulate across tables. (Assigning here, rather than
                    # extending, would discard every table's rows but the
                    # last.)
                    rows.extend(cursor.fetchall())
                    query_html += (
                        element_counter.visibility_div_with_divbutton(
                            contents=prettify_sql_and_args(sql, args),
                            title_html=f"SQL for {table_identifier}",
                        )
                    )
            results_table_html = resultset_html_table(
                fieldnames=fieldnames,
                rows=rows,
                element_counter=element_counter,
                collapse_at_len=profile.collapse_at_len,
                collapse_at_n_lines=profile.collapse_at_n_lines,
                line_length=profile.line_length,
                no_ditto_cols=[2, 3, 4],
                null="",
            )
        # The "row count" reported is the total number of patients.
        n_records = len(mrids)
        context = {
            "nav_on_pe_df_results": True,
            "some_patients": len(active_mrids) > 0,
            "results_table_html": results_table_html,
            "query_html": query_html,
            "page": page,
            "rowcount": n_records,
            "patient_id_query_html": patient_id_query_html,
            "patients_per_page": patients_per_page,
            "sql_highlight_css": prettify_sql_css(),
        }
        context.update(query_context(request))
        pe.audit(count_only=True, n_records=n_records)
        return render(request, "pe_df_result.html", context)

    except DatabaseError as exception:
        pe.audit(failed=True, fail_msg=str(exception))
        return render_bad_pe(request, pe, exception)
def pe_data_finder_excel(request: HttpRequest, pe_id: str) -> HttpResponse:
    """
    Serves the data finder view of a
    :class:`crate_anon.crateweb.research.models.PatientExplorer` (see
    :func:`pe_data_finder_results`) as an Excel XLSX file.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`

    Returns:
        a :class:`django.http.response.HttpResponse`; the "bad Patient
        Explorer" page if a database error occurs

    """
    pe = get_object_or_404(PatientExplorer, id=pe_id)  # type: PatientExplorer
    try:
        xlsx_contents = pe.data_finder_excel
        download_name = "crate_pe_df_{num}_{datetime}.xlsx".format(
            num=pe.id,
            datetime=datetime_iso_for_filename(),
        )
        return file_response(
            xlsx_contents,
            content_type=ContentType.XLSX,
            filename=download_name,
        )
    except DatabaseError as db_error:
        return render_bad_pe(request, pe, db_error)
def pe_monster_results(request: HttpRequest, pe_id: str) -> HttpResponse:
    """
    Shows the **monster data** view of a
    :class:`crate_anon.crateweb.research.models.PatientExplorer`: for the
    patients on the current page, every "monster" query generated by the
    Patient Explorer's multiquery is executed (a ``SELECT *`` per table) and
    the results are shown table by table.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`

    Returns:
        a :class:`django.http.response.HttpResponse`: the rendered
        ``pe_monster_result.html`` page, or the "bad Patient Explorer" page
        if a database error occurs

    """
    pe = get_object_or_404(PatientExplorer, id=pe_id)  # type: PatientExplorer
    grammar = get_research_db_info().grammar
    # noinspection PyUnresolvedReferences
    profile = request.user.profile  # type: UserProfile
    highlight_dict = Highlight.as_ordered_dict(
        Highlight.get_active_highlights(request)
    )
    element_counter = HtmlElementCounter()
    patient_id_query_html = prettify_sql_html(pe.get_patient_id_query())
    patients_per_page = get_patients_per_page(request)
    try:
        all_mrids = pe.get_patient_mrids()
        page = paginate(request, all_mrids, per_page=patients_per_page)
        page_mrids = list(page)
        results = []  # type: List[Dict[str, Any]]
        multiquery = pe.patient_multiquery
        # Only generate queries when this page actually contains patients.
        monster_queries = (
            multiquery.gen_monster_queries(mrids=page_mrids)
            if page_mrids
            else ()
        )
        for table_query in monster_queries:
            sql = table_query.sql
            args = table_query.args
            with pe.get_executed_cursor(sql, args) as cursor:
                fieldnames = get_fieldnames_from_cursor(cursor)
                rows = cursor.fetchall()
                if not rows:
                    table_html = "<div><i>No data</i></div>"
                else:
                    table_html = resultset_html_table(
                        fieldnames=fieldnames,
                        rows=rows,
                        element_counter=element_counter,
                        highlight_dict=highlight_dict,
                        collapse_at_len=profile.collapse_at_len,
                        collapse_at_n_lines=profile.collapse_at_n_lines,
                        line_length=profile.line_length,
                    )
                # The element counter is shared, so the table is built before
                # the collapsible SQL div (same order for every table).
                query_html = element_counter.visibility_div_with_divbutton(
                    contents=prettify_sql_and_args(sql, args),
                    title_html="SQL",
                )
                results.append(
                    {
                        "tablename": table_query.table_id.identifier(grammar),
                        "table_html": table_html,
                        "query_html": query_html,
                    }
                )
        context = {
            "nav_on_pe_monster_results": True,
            "results": results,
            "page": page,
            "rowcount": len(all_mrids),
            "patient_id_query_html": patient_id_query_html,
            "patients_per_page": patients_per_page,
            "sql_highlight_css": prettify_sql_css(),
        }
        context.update(query_context(request))
        return render(request, "pe_monster_result.html", context)

    except DatabaseError as db_error:
        return render_bad_pe(request, pe, db_error)
def pe_table_browser(request: HttpRequest, pe_id: str) -> HttpResponse:
    """
    Shows the **table browser** view of a
    :class:`crate_anon.crateweb.research.models.PatientExplorer`: a list of
    all tables in the research database, from which the user can reach a
    single-table Patient Explorer view for each.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`

    Returns:
        a :class:`django.http.response.HttpResponse`: the rendered
        ``pe_table_browser.html`` page, or the "bad Patient Explorer" page
        if a database error occurs while rendering

    """
    pe = get_object_or_404(PatientExplorer, id=pe_id)  # type: PatientExplorer
    db_info = get_research_db_info()
    # These lookups are deliberately made before the "try" block below.
    all_tables = db_info.get_tables()
    show_database_level = db_info.uses_database_level()
    try:
        context = {
            "nav_on_pe_table_browser": True,
            "pe_id": pe_id,
            "tables": all_tables,
            "with_database": show_database_level,
        }
        context.update(query_context(request))
        response = render(request, "pe_table_browser.html", context)
    except DatabaseError as db_error:
        return render_bad_pe(request, pe, db_error)
    return response
def pe_one_table(
    request: HttpRequest, pe_id: str, schema: str, table: str, db: str = ""
) -> HttpResponse:
    """
    Shows the **single table** view of a
    :class:`crate_anon.crateweb.research.models.PatientExplorer`. This shows
    results from a single table, restricted to the patients on the current
    page.

    Args:
        request: the :class:`django.http.request.HttpRequest`
        pe_id: string form of the integer PK of a
            :class:`crate_anon.crateweb.research.models.PatientExplorer`
        schema: name of the table's schema
        table: name of the table
        db: name of the table's database (above the schema level), if
            applicable

    Returns:
        a :class:`django.http.response.HttpResponse`: the rendered
        ``query_result.html`` page, or the "bad Patient Explorer" page if a
        database error occurs

    .. todo:: pe_one_table

        Might it be better to feed the resulting query back into the main
        Query system, allowing users to turn columns on/off, etc.?

        At present it forces ``query_id`` to ``None`` and this is detected by
        ``query_result.html``.

    """
    pe = get_object_or_404(PatientExplorer, id=pe_id)  # type: PatientExplorer
    research_database_info = get_research_db_info()
    table_id = TableId(db=db, schema=schema, table=table)
    grammar = research_database_info.grammar
    highlights = Highlight.get_active_highlights(request)
    highlight_dict = Highlight.as_ordered_dict(highlights)
    element_counter = HtmlElementCounter()
    # noinspection PyUnresolvedReferences
    profile = request.user.profile  # type: UserProfile
    patients_per_page = get_patients_per_page(request)
    try:
        mrids = pe.get_patient_mrids()
        page = paginate(request, mrids, per_page=patients_per_page)
        active_mrids = list(page)
        # Defaults, used when there are no patients on this page (or no rows).
        table_html = "<div><i>No data</i></div>"
        sql = ""
        args = []  # type: List[Any]
        rowcount = 0
        if active_mrids:
            mrid_column = research_database_info.get_mrid_column_from_table(
                table_id
            )
            where_clause = "{mrid} IN ({in_clause})".format(
                mrid=mrid_column.identifier(grammar),
                in_clause=",".join(["?"] * len(active_mrids)),
            )  # ... see notes for translate_sql_qmark_to_percent()
            args = active_mrids
            sql = add_to_select(
                sql="",
                select_elements=[
                    SelectElement(
                        raw_select="*", from_table_for_raw_select=table_id
                    )
                ],
                grammar=grammar,
                where_conditions=[
                    WhereCondition(
                        raw_sql=where_clause,
                        from_table_for_raw_sql=mrid_column.table_id,
                    )
                ],
                magic_join=True,
                formatted=True,
            )
            with pe.get_executed_cursor(sql, args) as cursor:
                fieldnames = get_fieldnames_from_cursor(cursor)
                rows = cursor.fetchall()
                # Count the rows actually fetched. PEP 249 permits
                # cursor.rowcount to be -1 for SELECT statements when the
                # driver cannot determine the count, so it is not reliable
                # here.
                rowcount = len(rows)
                if rows:
                    table_html = resultset_html_table(
                        fieldnames=fieldnames,
                        rows=rows,
                        element_counter=element_counter,
                        highlight_dict=highlight_dict,
                        collapse_at_len=profile.collapse_at_len,
                        collapse_at_n_lines=profile.collapse_at_n_lines,
                        line_length=profile.line_length,
                    )
        # Render
        context = {
            "table_html": table_html,
            "page": page,
            "query_id": None,
            "rowcount": rowcount,
            "sql": prettify_sql_and_args(sql=sql, args=args),
            "sql_highlight_css": prettify_sql_css(),
        }
        context.update(query_context(request))
        return render(request, "query_result.html", context)

    except DatabaseError as exception:
        return render_bad_pe(request, pe, exception)
4170# =============================================================================
4171# Archive and visualization zone system
4172# =============================================================================
4174# -----------------------------------------------------------------------------
4175# Archive views
4176# -----------------------------------------------------------------------------
def launch_archive(request: HttpRequest) -> HttpResponse:
    """
    Redirects to the archive's root page, or reports that the archive is
    misconfigured if it has not been set up.

    Args:
        request:
            the Django :class:`HttpRequest` object (not inspected here)

    Returns:
        a redirect to :func:`archive_root_url`, or the "archive misconfigured"
        response
    """
    if ARCHIVE_IS_CONFIGURED:
        return redirect(archive_root_url())
    return archive_misconfigured_response()
@cache_control(private=True, max_age=CACHE_CONTROL_MAX_AGE_ARCHIVE_TEMPLATES)
def archive_template(request: HttpRequest) -> HttpResponse:
    """
    Renders a Mako template from the configurable "archive" system.

    Args:
        request:
            the Django :class:`HttpRequest` object

    Returns:
        a Django :class:`HttpResponse`: the rendered template, or a "bad
        request" response if the template name is missing or unknown, or the
        "archive misconfigured" response

    Note:

    - The archive template name is in ``request.GET``; this allows the use of
      special (e.g. filename) characters.

    - The patient ID is also in ``request.GET``; this allows the optional use
      of more complex strings, e.g. JSON, to represent multiple ID numbers for
      use with several databases.

    - Additional arguments are also in ``request.GET``.

    - To create a URL within a Django template, one would use e.g.

      .. code-block:: none

        <a href="{% url 'archive' patient_id template_name %}">link text</a>

      (the ``'archive'`` bit being configured in
      :mod:`crate_anon.crateweb.config.urls`).

    - But to create a URL within a Mako template, there are several ways...
      for example, in CamCOPS via Pyramid, we use ``request.route_url(...)``.
      We can make this simple by passing patient-specific function to the
      context; see :ref:`the Python Mako context <archive_mako_context>`.

    - If we use DMP, it will add a ``request.dmp`` object; see
      https://doconix.github.io/django-mako-plus/topics_variables.html.
      (We won't use DMP.)

    """
    if not ARCHIVE_IS_CONFIGURED:
        return archive_misconfigured_response()

    query_params = request.GET
    # noinspection PyCallByClass,PyArgumentList
    template_name = query_params.get(UrlKeys.TEMPLATE)
    if not template_name:
        return HttpResponseBadRequest(
            f"URL arguments must include the key {UrlKeys.TEMPLATE!r}"
        )
    # noinspection PyCallByClass,PyArgumentList
    patient_id = query_params.get(UrlKeys.PATIENT_ID, "")

    # -------------------------------------------------------------------------
    # URL builders handed to the template; both are bound to this patient
    # -------------------------------------------------------------------------
    def same_patient_template_url(_template: str, **kw) -> str:
        """
        Builds the URL of another archive template, for the same patient.
        """
        return archive_template_url(_template, patient_id=patient_id, **kw)

    def same_patient_attachment_url(_filename: str, **kw) -> str:
        """
        Builds the URL of an attachment, labelled with the same patient.
        """
        return archive_attachment_url(_filename, patient_id=patient_id, **kw)

    # -------------------------------------------------------------------------
    # Context: a shallow copy of the shared archive context, plus the
    # per-request entries
    # -------------------------------------------------------------------------
    keys = ArchiveContextKeys
    context = copy.copy(ARCHIVE_CONTEXT)
    context[keys.get_patient_template_url] = same_patient_template_url
    context[keys.get_template_url] = archive_template_url
    context[keys.get_attachment_url] = same_patient_attachment_url
    context[keys.CRATE_HOME_URL] = reverse("home")
    context[keys.execute] = get_executed_researchdb_cursor_qmark_placeholders
    context[keys.patient_id] = patient_id
    context[keys.query_params] = query_params
    context[keys.request] = request
    context[keys.get_static_url] = archive_static_url

    # -------------------------------------------------------------------------
    # Look up and render the template, then audit the access
    # -------------------------------------------------------------------------
    try:
        template = archive_mako_lookup.get_template(template_name)
    except TemplateLookupException:
        return HttpResponseBadRequest(
            f"No such archive template: {template_name!r}"
        )
    html = template.render(**context)

    # noinspection PyArgumentList
    audit_archive_template(request, patient_id, query_params.urlencode())

    return HttpResponse(html)
@cache_control(private=True, max_age=CACHE_CONTROL_MAX_AGE_ARCHIVE_ATTACHMENTS)
def archive_attachment(request: HttpRequest) -> HttpResponseBase:
    """
    Serves a binary file (attachment) from the archive.

    The patient_id is not needed to locate the file; it is required only so
    that the access can be audited. (An end user could therefore look up a
    file by name via a faked patient_id, but that would be visible in the
    audit log.)

    Args:
        request:
            the Django :class:`HttpRequest` object

    Returns:
        the file response, or a "bad request" response if a required URL
        argument is missing or the filename is invalid, or the "archive
        misconfigured" response
    """
    if not ARCHIVE_IS_CONFIGURED:
        return archive_misconfigured_response()

    params = request.GET

    # Mandatory argument: patient ID (for the audit trail).
    # noinspection PyCallByClass,PyArgumentList
    patient_id = params.get(UrlKeys.PATIENT_ID)
    if not patient_id:
        return HttpResponseBadRequest(
            f"URL arguments must include the key {UrlKeys.PATIENT_ID!r}"
        )

    # noinspection PyCallByClass,PyArgumentList,PyTypeChecker
    content_type = params.get(UrlKeys.CONTENT_TYPE, None)

    # Mandatory argument: filename.
    # noinspection PyCallByClass,PyArgumentList
    filename = params.get(UrlKeys.FILENAME)
    if not filename:
        return HttpResponseBadRequest(
            f"URL arguments must include the key {UrlKeys.FILENAME!r}"
        )

    # Optional integer flag; absent or non-numeric values mean "use default".
    try:
        # noinspection PyArgumentList,PyCallByClass
        guess_content_type = bool(int(params.get(UrlKeys.GUESS_CONTENT_TYPE)))
    except (TypeError, ValueError):
        guess_content_type = DEFAULT_GUESS_CONTENT_TYPE

    # noinspection PyCallByClass,PyTypeChecker
    requested_offered_filename = params.get(UrlKeys.OFFERED_FILENAME, None)

    full_filename = get_archive_attachment_filepath(filename)
    if not full_filename:
        return HttpResponseBadRequest(
            f"Invalid archive attachment filename: {filename!r}"
        )

    # An explicit content type wins; otherwise guess if permitted; otherwise
    # None (never ""). With None we will end up with
    # "application/force-download", which is OK.
    final_content_type = content_type or (
        guess_mimetype(filename) if guess_content_type else None
    )
    # Show inline only when we know what kind of content we are serving.
    prefer_inline = bool(final_content_type)

    audit_archive_attachment(request, patient_id, filename)

    return serve_file(
        path_to_file=full_filename,
        offered_filename=requested_offered_filename or basename(filename),
        content_type=final_content_type,
        as_attachment=not prefer_inline,
        as_inline=prefer_inline,
    )
@cache_control(private=True, max_age=CACHE_CONTROL_MAX_AGE_ARCHIVE_STATIC)
def archive_static(request: HttpRequest) -> HttpResponseBase:
    """
    Serve a static file from the archive.

    Args:
        request:
            the Django :class:`HttpRequest` object; the filename is taken
            from ``request.GET``

    Returns:
        the file response, or a "bad request" response if the filename is
        missing or invalid, or the "archive misconfigured" response
    """
    if not ARCHIVE_IS_CONFIGURED:
        return archive_misconfigured_response()

    # noinspection PyCallByClass,PyArgumentList
    filename = request.GET.get(UrlKeys.FILENAME)
    # Reject a missing/empty filename explicitly (as archive_attachment()
    # does), rather than passing None on to the path lookup.
    if not filename:
        return HttpResponseBadRequest(
            f"URL arguments must include the key {UrlKeys.FILENAME!r}"
        )
    # log.debug(f"Archive static request: {filename!r}")
    full_filename = get_archive_static_filepath(filename)
    if not full_filename:
        return HttpResponseBadRequest(
            f"Invalid archive static filename: {filename!r}"
        )

    return serve_file(
        path_to_file=full_filename,
        as_attachment=False,
        as_inline=False,
        default_content_type=None,
    )