Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ utils \ converters.py: 0%
246 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1"""
Converters that turn the old JSON metadata standards into JSON Schema files
and then into pydantic BaseModel source code with typed fields.
5"""
7import json
9# =====================================================
10# Imports
11# =====================================================
12from pathlib import Path
13from typing import Any, Dict, Union
15import black
16import isort
17from loguru import logger
20# try:
21# from datamodel_code_generator import DataModelType, PythonVersion
22# from datamodel_code_generator.model import get_data_model_types
23# from datamodel_code_generator.parser.jsonschema import JsonSchemaParser
24# except ImportError:
25# logger.warning(
26# "datamodel-codegen is not installed. Please install it using 'pip install datamodel-codegen'."
27# )
29# =====================================================
30# Constants
31# =====================================================
# Two directories above this file, and the ``standards`` directory inside it.
# They are the default output locations of the filename helpers in this module.
MTMETADATA_SAVEPATH = Path(__file__).parent.parent
STANDARDS_SAVEPATH = MTMETADATA_SAVEPATH.joinpath("standards")

# JSON Schema type name -> Python annotation written into generated models.
TYPE_MAPPING = dict(
    string="str",
    integer="int",
    number="float",
    boolean="bool",
    bool="bool",
    array="List[Any]",
    object="Dict[str, Any]",
)

# Old-standard type name -> JSON Schema type name. Every name maps to itself
# except ``float``, which JSON Schema calls ``number``.
JSON_TYPE_MAPPING = {
    type_name: ("number" if type_name == "float" else type_name)
    for type_name in (
        "string",
        "integer",
        "float",
        "boolean",
        "array",
        "object",
        "null",
    )
}

# One indentation level (four spaces) used when emitting source code.
TAB = "    "
56# =====================================================
def load_json(filename: Union[str, Path]) -> Dict[str, Any]:
    """
    Load a JSON file and return its contents as a dictionary.

    The file is always decoded as UTF-8 so the result does not depend on the
    platform's default text encoding.

    Args:
        filename (Union[str, Path]): The path to the JSON file.

    Returns:
        Dict[str, Any]: The contents of the JSON file as a dictionary.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    with open(filename, "r", encoding="utf-8") as f:
        return json.load(f)
def write_json(filename: Union[str, Path], data: Dict[str, Any]) -> None:
    """
    Serialize ``data`` as JSON into ``filename``.

    The output is indented by four spaces. An existing file is overwritten.

    Args:
        filename (Union[str, Path]): Destination path of the JSON file.
        data (Dict[str, Any]): The mapping to serialize.
    """
    with open(filename, "w") as json_stream:
        json.dump(obj=data, fp=json_stream, indent=4)
def get_default_value(
    data_type: str, default_value: Any = None, required: bool = False
) -> Any:
    """
    Get default value based on information provided.

    Both the old standard type names (``int``, ``float``) and the JSON Schema
    names (``integer``, ``number``, ``boolean``) are accepted, because this
    function is called with either vocabulary.

    Parameters
    ----------
    data_type : str
        data type name
    default_value : Any, optional
        given default value, by default None
    required : bool, optional
        is required, by default False

    Returns
    -------
    Any
        default value.  ``None`` when the attribute is not required or the
        data type is not one of the handled scalar types.

    Raises
    ------
    ValueError
        if an integer default cannot be converted with ``int``
    """
    # Optional attributes never receive a concrete default.
    if not required:
        return None

    if data_type in ["string"]:
        if default_value is None:
            return ""
        # Wrapped in single quotes: the result is interpolated verbatim into
        # generated source code as ``default=...``.
        return f"'{str(default_value)}'"

    if data_type in ["int", "integer"]:
        if default_value is None:
            return 0
        return int(default_value)

    if data_type in ["float", "number"]:
        if default_value is None:
            return 0.0
        if isinstance(default_value, str):
            # Unparseable strings fall back to 0.0 instead of raising.
            try:
                return float(default_value)
            except ValueError:
                return 0.0
        if isinstance(default_value, (list, tuple)):
            return []
        return float(default_value)

    if data_type in ["boolean", "bool"]:
        return bool(default_value)

    # Any other type (array, object, null, ...) has no scalar default.
    return None
def get_alias_name(alias_name: str) -> str:
    """
    Normalize an alias, mapping every "empty" spelling to None.

    Parameters
    ----------
    alias_name : str
        alias name

    Returns
    -------
    str
        the alias unchanged, or None when it is an empty list, None, an
        empty string, or the text "None" / "none"
    """
    empty_markers = [[], None, "", "None", "none"]
    return None if alias_name in empty_markers else alias_name
def get_new_basemodel_filename(
    filename: Path | str, save_path: Path = MTMETADATA_SAVEPATH
) -> Path:
    """
    Get new file name for new BaseModel.

    Will place into `mt_metadata/mt_metadata/...`

    The directories found between the ``standards`` component of `filename`
    and the file itself are recreated below `save_path`.  The target
    directory is created if it does not exist.

    Parameters
    ----------
    filename : Path | str
        json schema standards file name
    save_path : Path, optional
        default path to save to, by default MTMETADATA_SAVEPATH

    Returns
    -------
    Path
        new file path to new BaseModel object.

    Raises
    ------
    ValueError
        if `filename` has no ``standards`` path component
    """
    filename = Path(filename)
    parts = filename.parts
    # Everything after "standards" and before the file name is the sub-package.
    index = parts.index("standards") + 1
    # Join component-wise so the result is correct on every platform; a
    # hard-coded "\\" separator only yields nested directories on Windows.
    new_file_directory = Path(save_path).joinpath(*parts[index:-1])
    new_file_directory.mkdir(parents=True, exist_ok=True)
    return new_file_directory.joinpath(f"{filename.stem}_basemodel.py")
def get_new_schema_filename(
    filename: str | Path, save_path: Path = STANDARDS_SAVEPATH
) -> Path:
    """
    Get new file path to a JSON schema file.  Will be placed into
    `mt_metadata/mt_metadata/standards/...`

    The target directory is created if it does not exist.

    Parameters
    ----------
    filename : str | Path
        old JSON file
    save_path : Path, optional
        default directory to save to, by default STANDARDS_SAVEPATH

    Returns
    -------
    Path
        new file path to JSON Schema file.

    Raises
    ------
    ValueError
        if `filename` has no ``mt_metadata`` path component
    """
    # Accept plain strings as the signature promises; ``.name`` below needs a Path.
    filename = Path(filename)
    parts = filename.parts
    # Skip the first "mt_metadata" component and the component after it ...
    index = parts.index("mt_metadata") + 2
    # ... and drop the last two components (the file name and its parent
    # directory).  Join component-wise so the result is correct on every
    # platform rather than only where "\\" is the path separator.
    new_file_directory = Path(save_path).joinpath(*parts[index:-2])
    new_file_directory.mkdir(parents=True, exist_ok=True)
    return new_file_directory.joinpath(filename.name)
def to_json_schema(filename: str | Path) -> Path:
    """
    Translate an old-style JSON standards file into a JSON Schema file.

    Each top-level entry of the old file becomes one property of an
    ``object`` schema titled after the file stem.  The schema is written to
    the path given by `get_new_schema_filename`.

    Parameters
    ----------
    filename : Union[str, Path]
        file path to old JSON file

    Returns
    -------
    Path
        File path to new JSON Schema file

    Raises
    ------
    KeyError
        if an entry's `type` is missing or not in `JSON_TYPE_MAPPING`, or if
        another expected key is missing from an entry
    """
    source_path = Path(filename)
    legacy = load_json(source_path)
    title = source_path.stem

    schema = {
        "title": title,
        "type": "object",
        "properties": {},
        "required": [],
        "description": title,
    }

    for name, spec in legacy.items():
        entry = {}
        schema["properties"][name] = entry

        # Translate the old type name into its JSON Schema equivalent.
        try:
            json_type = JSON_TYPE_MAPPING[spec["type"]]
        except KeyError:
            raise KeyError(f"Could not find the type {spec['type']} in the type dict.")

        style = spec["style"]
        if "list" in style:
            # List styles become arrays whose items carry the scalar type.
            entry["type"] = "array"
            entry["default"] = []
            entry["items"] = {"type": json_type}
        else:
            entry["default"] = get_default_value(
                spec["type"],
                default_value=spec["default"],
                required=spec["required"],
            )

        entry["description"] = spec["description"]
        entry["title"] = name
        entry["examples"] = spec["example"]
        # NOTE(review): this also replaces the "array" type set above for
        # list-style entries -- confirm whether that is intended.
        entry["type"] = json_type
        entry["alias"] = get_alias_name(spec["alias"])
        entry["units"] = spec["units"]
        if spec["required"]:
            schema["required"].append(name)

        # Express the old "style" as JSON Schema string constraints.
        if style == "controlled vocabulary":
            entry["enum"] = spec["options"]
        elif style == "alpha numeric":
            entry["pattern"] = "^[a-zA-Z0-9]*$"
        elif style in ("date time", "date", "time"):
            entry["format"] = "date-time"
        elif style == "email":
            entry["format"] = "email"
        elif style == "url":
            entry["format"] = "uri"

    # Persist the schema at its new location.
    schema_path = get_new_schema_filename(source_path)
    write_json(schema_path, schema)

    return schema_path
291# def from_jsonschema_to_pydantic_basemodel(filename: Union[str, Path], **kwargs) -> Path:
292# """
293# make basemodel from json schema
295# Parameters
296# ----------
297# filename : _type_
298# _description_
299# """
300# filename = Path(filename)
301# new_filename = get_new_basemodel_filename(filename, MTMETADATA_SAVEPATH)
303# data_model_types = get_data_model_types(
304# DataModelType.PydanticV2BaseModel,
305# target_python_version=PythonVersion.PY_311,
306# target_datetime_class=MTime,
307# )
309# parser = JsonSchemaParser(
310# filename,
311# data_model_type=data_model_types.data_model,
312# data_model_root_type=data_model_types.root_model,
313# data_model_field_type=data_model_types.field_model,
314# data_type_manager_type=data_model_types.data_type_manager,
315# dump_resolve_reference_action=data_model_types.dump_resolve_reference_action,
316# field_extra_keys=["alias", "units", "default", "required"],
317# use_annotated=True,
318# use_union_operator=True,
319# field_constraints=True,
320# snake_case_field=True,
321# allow_extra_fields=True,
322# strip_default_none=False,
323# field_include_all_keys=True,
324# apply_default_values_for_required_fields=True,
325# )
327# result = parser.parse()
329# with open(new_filename, "w") as fid:
330# fid.write(result)
332# return new_filename
def snake_to_camel(snake_str: str) -> str:
    """
    Convert a snake_case name into a capitalized, underscore-free name.

    The string is split on underscores, each piece is title-cased with
    ``str.title``, and the pieces are concatenated.

    Parameters
    ----------
    snake_str : str
        name in snake case, e.g. ``my_field_name``

    Returns
    -------
    str
        converted name, e.g. ``MyFieldName``
    """
    return "".join(map(str.title, snake_str.split("_")))
# ``typing`` name -> import statement that brings that name into scope.
type_imports = {
    typing_name: f"from typing import {typing_name}"
    for typing_name in ("List", "Dict", "Any")
}
def generate_pydantic_basemodel(json_schema_filename: Union[str, Path]) -> str:
    """
    Generate a Pydantic model from a JSON schema file and save it to a Python file.
    The generated model will use `Annotated` and `Field` for type annotations.

    The output path comes from `get_new_basemodel_filename`; the source is
    cleaned, formatted and written by `clean_and_format_code`.

    Parameters
    ----------
    json_schema_filename : str | Path
        path to the JSON schema file

    Returns
    -------
    str
        the formatted source code of the generated model

    Raises
    ------
    FileNotFoundError
        if the file does not exist, or if its suffix is not ``.json``
    """
    json_schema_filename = Path(json_schema_filename)
    if not json_schema_filename.exists():
        raise FileNotFoundError(f"{json_schema_filename} does not exist.")
    if json_schema_filename.suffix != ".json":
        # Exception type kept as-is so existing callers keep working.
        raise FileNotFoundError(
            f"{json_schema_filename} is not a json file. Please provide a json file."
        )

    with open(json_schema_filename, "r", encoding="utf-8") as fid:
        schema = json.load(fid)

    new_filename = get_new_basemodel_filename(json_schema_filename, MTMETADATA_SAVEPATH)

    class_definitions = []
    class_name = snake_to_camel(schema.get("title", "GeneratedModel"))

    required_fields = schema.get("required", [])
    properties = schema.get("properties", {})

    imports = ["from typing import Annotated", "from pydantic import Field"]

    datetime_keys = []
    # Actual names of the comment / units fields, so the validators emitted
    # below are bound to fields that really exist in the model.
    comment_keys = []
    units_keys = []
    enum_lines = []

    # Create field definitions
    for field_name, field_attrs in properties.items():
        # A units field needs a validator but is otherwise typed like any
        # other field, so this check must not short-circuit the typing below.
        if field_name in ["units", "unit"]:
            units_keys.append(field_name)

        if field_name in ["comments", "comment"]:
            comment_keys.append(field_name)
            field_type = "Comment"
            imports.append("from mt_metadata.common import Comment")
            field_attrs["default_factory"] = "lambda: Comment()"
        else:
            # Fallback to Any if type is unknown
            field_type = TYPE_MAPPING.get(field_attrs.get("type", "string"), "Any")

        # get typing imports
        for type_key, import_line in type_imports.items():
            if type_key in field_type:
                imports.append(import_line)

        # if date time then use MTime as the object, need to add some types
        # and a default factory.
        field_format = field_attrs.get("format")
        if field_format == "date-time":
            field_type = "MTime | str | float | int | np.datetime64 | pd.Timestamp"
            imports.append("import numpy as np")
            imports.append("import pandas as pd")
            imports.append("from mt_metadata.common.mttime import MTime")
            field_attrs["default_factory"] = "lambda: MTime(time_stamp=None)"
            datetime_keys.append(field_name)
        # if email format then use EmailStr object and import
        elif field_format == "email":
            field_type = "EmailStr"
            imports.append("from pydantic import EmailStr")
        # if uri format then use HttpUrl object and import
        elif field_format == "uri":
            field_type = "HttpUrl"
            imports.append("from pydantic import HttpUrl")

        # enumerated types: emit a dedicated ``str`` Enum class per field
        if field_attrs.get("enum", None) is not None:
            enum_name = f"{snake_to_camel(field_name)}Enum"
            enum_lines.append(f"class {enum_name}(str, Enum):")
            for enum_value in field_attrs["enum"]:
                enum_lines.append(
                    f"{TAB}{enum_value.replace(' ', '_')} = '{enum_value}'"
                )
            imports.append("from enum import Enum")
            field_type = enum_name

        # check if required. Again required is a metadata standard not
        # a pydantic standard. If required in pydantic then the user
        # must supply a default value. Which is not the older way
        # mt-metadata was used, and not the desired way of using it.
        is_required = field_name in required_fields
        field_attrs["required"] = is_required
        if not is_required:
            if "Comment" in field_type:
                field_type = "Comment"
            else:
                field_type = f"{field_type} | None"

        # get the default value based on type
        field_default = get_default_value(
            field_attrs.get("type", "string"),
            default_value=field_attrs.get("default"),
            required=is_required,
        )
        # "" is skipped by pydantic need to set it at "''"
        if field_default in [""]:
            field_default = "''"
        elif isinstance(field_default, str) and "''" in field_default:
            field_default = field_default.replace("''", '"')
            if field_default == '"':
                field_default = '""'

        # Use Annotated with Field
        field_parts = [f"{TAB}{field_name}: Annotated[{field_type}, Field("]

        # Add attributes to Field
        if field_attrs.get("default_factory", None) is None:
            field_parts.append(f"{TAB}default={field_default},")
        else:
            field_parts.append(
                f"{TAB}default_factory={field_attrs['default_factory']},"
            )

        # need to add json_schema_extra attributes [units, required]
        json_schema_extra = {}
        for attr_name, attr_value in field_attrs.items():
            if attr_name in [
                "default",
                "title",
                "format",
                "enum",
                "type",
                "default_factory",
            ]:
                continue
            elif attr_name in ["examples"]:
                # newer versions of pydantic use examples in json_schema_extra
                json_schema_extra["examples"] = repr([attr_value])
            elif attr_name in ["units", "required"]:
                json_schema_extra[attr_name] = attr_value
            else:
                field_parts.append(f"{TAB}{attr_name}={repr(attr_value)},")

        # Add json_schema_extra as a dictionary
        if json_schema_extra:
            json_extra_line = f"{TAB}json_schema_extra=" + "{"
            for jkey, jvalue in json_schema_extra.items():
                json_extra_line += f"'{jkey}':{repr(jvalue)},"
            json_extra_line += "},\n"
            field_parts.append(json_extra_line)

        field_parts.append(f"{TAB})]\n")

        class_definitions.append("\n".join(field_parts))

    if datetime_keys:
        imports.append("from pydantic import field_validator")
        for key in datetime_keys:
            class_definitions.append(
                f"{TAB}@field_validator('{key}', mode='before')\n"
                f"{TAB}@classmethod\n"
                f"{TAB}def validate_{key}(cls, field_value: MTime | float | int | np.datetime64 | pd.Timestamp | str):\n"
                f"{TAB*2}return MTime(time_stamp=field_value)\n"
            )

    if comment_keys:
        imports.append("from pydantic import field_validator, ValidationInfo")
        for key in comment_keys:
            class_definitions.append(
                f"{TAB}@field_validator('{key}', mode='before')\n"
                f"{TAB}@classmethod\n"
                f"{TAB}def validate_{key}(cls, value, info: ValidationInfo) -> Comment:\n"
                f"{TAB*2}if isinstance(value, str):\n"
                f"{TAB*3}return Comment(value=value)\n"
                f"{TAB*2}return value\n"
            )

    if units_keys:
        logger.info(f"adding units to {new_filename}")
        imports.append("from mt_metadata.common.units import get_unit_object")
        imports.append("from pydantic import field_validator, ValidationInfo")
        for key in units_keys:
            class_definitions.append(
                f"{TAB}@field_validator('{key}', mode='before')\n"
                f"{TAB}@classmethod\n"
                f"{TAB}def validate_{key}(cls, value: str) -> str:\n"
                f"{TAB*2}if value in [None, '']:\n"
                f"{TAB*3}return ''\n"
                f"{TAB*2}try:\n"
                f"{TAB*3}unit_object = get_unit_object(value)\n"
                f"{TAB*3}return unit_object.name\n"
                f"{TAB*2}except ValueError as error:\n"
                f"{TAB*3}raise KeyError(error)\n"
                f"{TAB*2}except KeyError as error:\n"
                f"{TAB*3}raise KeyError(error)\n"
            )

    # Generate the class definition, dont need config dict as that is
    # already initiated in MetadataBase.
    class_code = [
        f"class {class_name}(MetadataBase):",
        "\n".join(class_definitions) or f"{TAB}pass",
    ]

    # Drop repeated import lines while keeping first-seen order.
    import_block = "\n".join(dict.fromkeys(imports))
    lines = [
        "#=====================================================",
        "# Imports",
        "#=====================================================",
        import_block,
        "from mt_metadata.base import MetadataBase",
        "#=====================================================",
    ]

    lines += enum_lines
    lines += class_code

    return clean_and_format_code("\n".join(lines), new_filename)
def clean_and_format_code(code_str: str, filename: str | Path | None = None) -> str:
    """
    Clean and format Python code by removing unused imports and formatting with isort and black.

    Each stage is best effort: if a stage fails, a warning is logged and the
    code produced by the previous stage is carried forward.

    Parameters
    ----------
    code_str : str
        Python code as a string
    filename : str | Path, optional
        File the result is written to (UTF-8); also used as a prefix in
        warning messages.  Nothing is written when None, by default None

    Returns
    -------
    str
        Cleaned and formatted code
    """

    def _warn(message: str, error: Exception) -> None:
        # Prefix the warning with the file name when one was supplied.
        prefix = f"{filename} " if filename else ""
        logger.warning(f"{prefix}{message}: {error}")

    # First, remove unused imports using autoflake (optional dependency)
    try:
        import autoflake

        code_str = autoflake.fix_code(
            code_str,
            remove_all_unused_imports=True,
            remove_unused_variables=False,
            expand_star_imports=True,
        )
    except ImportError:
        logger.warning(
            "autoflake is not installed. Unused imports will not be removed. "
            "Install with 'pip install autoflake'."
        )
    except Exception as error:
        _warn("Error removing unused imports", error)

    # Then format using isort
    try:
        import_config = {
            "force_single_line": False,  # do not force one import per line
            "force_alphabetical_sort_within_sections": True,  # Sort alphabetically within sections
            "order_by_type": True,  # Order by import type
            "sections": ["FUTURE", "STDLIB", "THIRDPARTY", "FIRSTPARTY", "LOCALFOLDER"],
            "lines_after_imports": 2,  # Add 2 blank lines after imports
        }

        code_str = isort.code(code_str, **import_config)
    except Exception as error:
        _warn("Error formatting code using isort", error)

    # Finally format using black
    try:
        code_str = black.format_str(code_str, mode=black.FileMode())
    except Exception as error:
        _warn("Error formatting code using black", error)

    # Write the formatted code back to the file.  The encoding is explicit
    # so non-ASCII text does not depend on the platform default encoding.
    if filename is not None:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(code_str)

    return code_str
def reformat(filename: str | Path) -> str:
    """
    Reformat a Python file by removing unused imports and formatting with isort and black.

    The file is read as UTF-8 and overwritten in place by
    `clean_and_format_code`.

    Parameters
    ----------
    filename : str | Path
        Path to the Python file to be reformatted

    Returns
    -------
    str
        the cleaned and formatted source code

    Raises
    ------
    FileNotFoundError
        if `filename` does not exist
    """
    filename = Path(filename)
    if not filename.exists():
        raise FileNotFoundError(f"{filename} does not exist.")

    # Python source is UTF-8 by default; do not rely on the platform encoding.
    with open(filename, "r", encoding="utf-8") as f:
        code_str = f.read()

    # Clean and format the code
    return clean_and_format_code(code_str, filename)