Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ utils \ converters.py: 0%

246 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-10 00:11 -0800

1""" 

2Converters that turn old-style JSON metadata files into JSON Schema files 

3and then generate typed pydantic BaseModel source code from those schemas. 

4 

5""" 

6 

7import json 

8 

9# ===================================================== 

10# Imports 

11# ===================================================== 

12from pathlib import Path 

13from typing import Any, Dict, Union 

14 

15import black 

16import isort 

17from loguru import logger 

18 

19 

20# try: 

21# from datamodel_code_generator import DataModelType, PythonVersion 

22# from datamodel_code_generator.model import get_data_model_types 

23# from datamodel_code_generator.parser.jsonschema import JsonSchemaParser 

24# except ImportError: 

25# logger.warning( 

26# "datamodel-codegen is not installed. Please install it using 'pip install datamodel-codegen'." 

27# ) 

28 

29# ===================================================== 

30# Constants 

31# ===================================================== 

32# Define the path to the standards directory and the mt_metadata directory 

# Root of the package (two levels above this file) and its "standards" folder.
MTMETADATA_SAVEPATH = Path(__file__).parent.parent
STANDARDS_SAVEPATH = Path(__file__).parent.parent / "standards"

# JSON Schema type name -> text of the Python annotation written into
# generated models.
TYPE_MAPPING = dict(
    string="str",
    integer="int",
    number="float",
    boolean="bool",
    bool="bool",
    array="List[Any]",
    object="Dict[str, Any]",
)

# Type name used in the old JSON files -> JSON Schema type name.
JSON_TYPE_MAPPING = dict(
    string="string",
    integer="integer",
    float="number",
    boolean="boolean",
    array="array",
    object="object",
    null="null",
)

# One indentation level in generated source code.
TAB = 4 * " "

56# ===================================================== 

57 

58 

def load_json(filename: Union[str, Path]) -> Dict[str, Any]:
    """
    Load a JSON file and return its contents as a dictionary.

    The file is always decoded as UTF-8 so the result does not depend on the
    platform's locale encoding.

    Args:
        filename (Union[str, Path]): The path to the JSON file.

    Returns:
        Dict[str, Any]: The contents of the JSON file as a dictionary.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    with open(filename, "r", encoding="utf-8") as f:
        return json.load(f)

72 

73 

def write_json(filename: Union[str, Path], data: Dict[str, Any]) -> None:
    """
    Write a dictionary to a JSON file.

    The file is written as UTF-8 with a 4-space indent. Any existing file at
    ``filename`` is overwritten.

    Args:
        filename (Union[str, Path]): The path to the JSON file.
        data (Dict[str, Any]): The data to write to the file.

    Raises:
        OSError: If the file cannot be opened for writing.
        TypeError: If ``data`` contains values that are not JSON serializable.
    """
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)

84 

85 

def get_default_value(
    data_type: str, default_value: Any = None, required: bool = False
) -> Any:
    """
    Get default value based on information provided.

    Parameters
    ----------
    data_type : str
        data type name, one of "string", "int"/"integer", "float"/"number"
        or "boolean"; any other name yields None
    default_value : Any, optional
        given default value, by default None
    required : bool, optional
        is required, by default False

    Returns
    -------
    Any
        default value. None when the field is not required or the data type
        is not recognised. Strings are returned wrapped in single quotes
        (or as "" when no default is given).

    Raises
    ------
    ValueError
        if an integer default cannot be converted with ``int``
    """
    # Fields that are not required never get a concrete default.
    if not required:
        return None

    if data_type == "string":
        if default_value is None:
            return ""
        # The value is wrapped in single quotes, i.e. a quoted literal.
        return f"'{default_value!s}'"

    # "integer" is the JSON Schema spelling; "int" is kept for older callers.
    if data_type in ("int", "integer"):
        return 0 if default_value is None else int(default_value)

    if data_type in ("float", "number"):
        if default_value is None:
            return 0.0
        if isinstance(default_value, str):
            # Non-numeric text falls back to 0.0 instead of raising.
            try:
                return float(default_value)
            except ValueError:
                return 0.0
        if isinstance(default_value, (list, tuple)):
            return []
        return float(default_value)

    if data_type == "boolean":
        return bool(default_value)

    # Unknown data type: no default can be derived.
    return None

135 

136 

137def get_alias_name(alias_name: str) -> str: 

138 """ 

139 Get the alias name, and return None if empty 

140 

141 Parameters 

142 ---------- 

143 alias_name : str 

144 alias name 

145 """ 

146 if alias_name in [[], None, "", "None", "none"]: 

147 return None 

148 else: 

149 return alias_name 

150 

151 

def get_new_basemodel_filename(
    filename: Path | str, save_path: Path = MTMETADATA_SAVEPATH
) -> Path:
    """
    Get new file name for new BaseModel.

    Will place into `mt_metadata/mt_metadata/...`

    The directories that follow the first "standards" component of
    ``filename`` are recreated under ``save_path`` (they are created on disk
    if missing) and the file is named ``<stem>_basemodel.py``.

    Parameters
    ----------
    filename : Path | str
        json schema standards file name
    save_path : Path, optional
        default path to save to, by default MTMETADATA_SAVEPATH

    Returns
    -------
    Path
        new file path to new BaseModel object.

    Raises
    ------
    ValueError
        if "standards" is not one of the path components of ``filename``
    """
    filename = Path(filename)
    parts = filename.parts
    # Sub-directories between "standards" and the file itself.
    start = parts.index("standards") + 1
    # Pass the components separately so the platform's separator is used
    # instead of a hard-coded backslash.
    new_file_directory = Path(save_path).joinpath(*parts[start:-1])
    new_file_directory.mkdir(parents=True, exist_ok=True)
    return new_file_directory.joinpath(f"{filename.stem}_basemodel.py")

180 

181 

def get_new_schema_filename(
    filename: str | Path, save_path: Path = STANDARDS_SAVEPATH
) -> Path:
    """
    Get new file path to a JSON schema file. Will be placed into
    `mt_metadata/mt_metadata/standards/...`

    The path components starting two positions after the first "mt_metadata"
    component, and excluding the last two components (the file's parent
    directory and the file itself), are recreated under ``save_path``. The
    directory is created on disk if missing.

    Parameters
    ----------
    filename : str | Path
        old JSON file
    save_path : Path, optional
        default directory to save to, by default STANDARDS_SAVEPATH

    Returns
    -------
    Path
        new file path to JSON Schema file.

    Raises
    ------
    ValueError
        if "mt_metadata" is not one of the path components of ``filename``
    """
    # Accept plain strings as promised by the signature.
    filename = Path(filename)
    parts = filename.parts
    start = parts.index("mt_metadata") + 2
    # Pass the components separately so the platform's separator is used
    # instead of a hard-coded backslash.
    new_file_directory = Path(save_path).joinpath(*parts[start:-2])
    new_file_directory.mkdir(parents=True, exist_ok=True)
    return new_file_directory.joinpath(filename.name)

208 

209 

def to_json_schema(filename: str | Path) -> Path:
    """
    Convert old JSON files to a JSON Schema file.

    Every top-level key of the old file becomes a property of an "object"
    schema whose title and description are the file stem. The result is
    written to the path given by `get_new_schema_filename`.

    Parameters
    ----------
    filename : Union[str, Path]
        file path to old JSON file

    Returns
    -------
    Path
        File path to new JSON Schema file

    Raises
    ------
    KeyError
        if `type` is missing from an entry of the old JSON file or is not a
        known type, or if another expected key (`style`, `default`,
        `required`, `description`, `example`, `alias`, `units`) is missing
    """
    filename = Path(filename)
    old = load_json(filename)
    object_name = filename.stem

    new = {
        "title": object_name,
        "type": "object",
        "properties": {},
        "required": [],
        "description": object_name,
    }

    for key, value in old.items():
        prop = {}
        new["properties"][key] = prop

        # map type to JSON schema type
        old_type = value.get("type")
        try:
            json_type = JSON_TYPE_MAPPING[old_type]
        except KeyError as error:
            raise KeyError(
                f"Could not find the type {old_type} in the type dict."
            ) from error

        style = value["style"]

        # if the style is a list then use array
        if "list" in style:
            prop["type"] = "array"
            prop["default"] = []
            prop["items"] = {"type": json_type}
        else:
            prop["default"] = get_default_value(
                value["type"],
                default_value=value["default"],
                required=value["required"],
            )

        prop["description"] = value["description"]
        prop["title"] = key
        prop["examples"] = value["example"]
        # Keep "array" for list styles; scalar properties get the mapped type.
        prop.setdefault("type", json_type)
        prop["alias"] = get_alias_name(value["alias"])
        prop["units"] = value["units"]
        if value["required"]:
            new["required"].append(key)

        # need to sort out string formats
        if style == "controlled vocabulary":
            prop["enum"] = value["options"]
        elif style == "alpha numeric":
            prop["pattern"] = "^[a-zA-Z0-9]*$"
        elif style in ("date time", "date", "time"):
            prop["format"] = "date-time"
        elif style == "email":
            prop["format"] = "email"
        elif style == "url":
            prop["format"] = "uri"

    # write new file
    new_file = get_new_schema_filename(filename)
    write_json(new_file, new)

    return new_file

289 

290 

291# def from_jsonschema_to_pydantic_basemodel(filename: Union[str, Path], **kwargs) -> Path: 

292# """ 

293# make basemodel from json schema 

294 

295# Parameters 

296# ---------- 

297# filename : _type_ 

298# _description_ 

299# """ 

300# filename = Path(filename) 

301# new_filename = get_new_basemodel_filename(filename, MTMETADATA_SAVEPATH) 

302 

303# data_model_types = get_data_model_types( 

304# DataModelType.PydanticV2BaseModel, 

305# target_python_version=PythonVersion.PY_311, 

306# target_datetime_class=MTime, 

307# ) 

308 

309# parser = JsonSchemaParser( 

310# filename, 

311# data_model_type=data_model_types.data_model, 

312# data_model_root_type=data_model_types.root_model, 

313# data_model_field_type=data_model_types.field_model, 

314# data_type_manager_type=data_model_types.data_type_manager, 

315# dump_resolve_reference_action=data_model_types.dump_resolve_reference_action, 

316# field_extra_keys=["alias", "units", "default", "required"], 

317# use_annotated=True, 

318# use_union_operator=True, 

319# field_constraints=True, 

320# snake_case_field=True, 

321# allow_extra_fields=True, 

322# strip_default_none=False, 

323# field_include_all_keys=True, 

324# apply_default_values_for_required_fields=True, 

325# ) 

326 

327# result = parser.parse() 

328 

329# with open(new_filename, "w") as fid: 

330# fid.write(result) 

331 

332# return new_filename 

333 

334 

def snake_to_camel(snake_str: str) -> str:
    """
    Convert an underscore separated name to a capitalised, joined name.

    Parameters
    ----------
    snake_str : str
        name with words separated by underscores

    Returns
    -------
    str
        each underscore separated piece passed through ``str.title`` and
        concatenated without separators
    """
    return "".join(map(str.title, snake_str.split("_")))

339 

340 

# typing name that may appear in a generated annotation -> import line that
# brings that name into scope.
type_imports = {
    typing_name: f"from typing import {typing_name}"
    for typing_name in ("List", "Dict", "Any")
}

346 

347 

def _enum_member_name(enum_value: Any) -> str:
    """Return a valid Python identifier to use as the Enum member name for *enum_value*."""
    import keyword
    import re

    # Spaces (and any other non-word character) become underscores.
    name = re.sub(r"\W", "_", str(enum_value))
    if not name or name[0].isdigit():
        name = f"_{name}"
    if keyword.iskeyword(name):
        name = f"{name}_"
    return name


def generate_pydantic_basemodel(json_schema_filename: Union[str, Path]) -> str:
    """
    Generate a Pydantic model from a JSON schema file and save it to a Python file.
    The generated model will use `Annotated` and `Field` for type annotations.

    The source text is built as a string, passed through
    `clean_and_format_code`, and written to the path returned by
    `get_new_basemodel_filename`.

    Parameters
    ----------
    json_schema_filename : str | Path
        path to the JSON schema file

    Returns
    -------
    str
        the generated (cleaned and formatted) source code of the model

    Raises
    ------
    FileNotFoundError
        if the file does not exist or does not have a ``.json`` suffix
    """
    json_schema_filename = Path(json_schema_filename)
    if not json_schema_filename.exists():
        raise FileNotFoundError(f"{json_schema_filename} does not exist.")
    if json_schema_filename.suffix != ".json":
        # Exception type kept as-is for callers that already catch it.
        raise FileNotFoundError(
            f"{json_schema_filename} is not a json file. Please provide a json file."
        )

    with open(json_schema_filename, "r", encoding="utf-8") as fid:
        schema = json.load(fid)

    new_filename = get_new_basemodel_filename(json_schema_filename, MTMETADATA_SAVEPATH)

    class_definitions = []
    class_name = snake_to_camel(schema.get("title", "GeneratedModel"))

    required_fields = schema.get("required", [])
    properties = schema.get("properties", {})

    imports = ["from typing import Annotated", "from pydantic import Field"]

    datetime_keys = []
    enum_lines = []
    # Actual names of the comment/units fields, so the validators written
    # below target the field that really exists in the schema.
    comment_fields = []
    units_fields = []

    # Create field definitions
    for field_name, field_attrs in properties.items():
        if field_name in ("comments", "comment"):
            comment_fields.append(field_name)
            field_type = "Comment"
            imports.append("from mt_metadata.common import Comment")
            field_attrs["default_factory"] = "lambda: Comment()"
        else:
            if field_name in ("units", "unit"):
                units_fields.append(field_name)
            # Fallback to Any if type is unknown
            field_type = TYPE_MAPPING.get(field_attrs.get("type", "string"), "Any")
            # get typing imports
            imports.extend(
                import_line
                for type_key, import_line in type_imports.items()
                if type_key in field_type
            )

        field_format = field_attrs.get("format")
        # if date time then use MTime as the object, need to add some types
        # a default factory.
        if field_format == "date-time":
            field_type = "MTime | str | float | int | np.datetime64 | pd.Timestamp"
            imports.append("import numpy as np")
            imports.append("import pandas as pd")
            imports.append("from mt_metadata.common.mttime import MTime")
            field_attrs["default_factory"] = "lambda: MTime(time_stamp=None)"
            datetime_keys.append(field_name)
        # if email format then use EmailStr object and import
        elif field_format == "email":
            field_type = "EmailStr"
            imports.append("from pydantic import EmailStr")
        # if uri format then use HttpUrl object and import
        elif field_format == "uri":
            field_type = "HttpUrl"
            imports.append("from pydantic import HttpUrl")

        # enumerated types
        if field_attrs.get("enum", None) is not None:
            enum_class_name = f"{snake_to_camel(field_name)}Enum"
            enum_lines.append(f"class {enum_class_name}(str, Enum):")
            for enum_value in field_attrs["enum"]:
                # repr() quotes/escapes the value so it is always a valid literal.
                enum_lines.append(
                    f"{TAB}{_enum_member_name(enum_value)} = {str(enum_value)!r}"
                )
            imports.append("from enum import Enum")
            field_type = enum_class_name

        # check if required. Again required is a metadata standard not
        # a pydantic standard. If required in pydantic then the user
        # must supply a default value. Which is not the older way
        # mt-metadata was used, and not the desired way of using it.
        is_required = field_name in required_fields
        field_attrs["required"] = is_required
        if not is_required:
            if "Comment" in field_type:
                field_type = "Comment"
            else:
                field_type = f"{field_type} | None"

        # get the default value based on type
        field_default = get_default_value(
            field_attrs.get("type", "string"),
            default_value=field_attrs.get("default"),
            required=is_required,
        )
        # "" is skipped by pydantic need to set it at "''"
        if field_default in [""]:
            field_default = "''"
        elif isinstance(field_default, str) and "''" in field_default:
            field_default = field_default.replace("''", '"')
            if field_default == '"':
                field_default = '""'

        # Use Annotated with Field
        field_parts = [f"{TAB}{field_name}: Annotated[{field_type}, Field("]

        # Add attributes to Field
        if field_attrs.get("default_factory", None) is None:
            field_parts.append(f"{TAB}default={field_default},")
        else:
            field_parts.append(
                f"{TAB}default_factory={field_attrs['default_factory']},"
            )

        # need to add json_schema_extra attributes [units, required]
        json_schema_extra = {}
        for attr_name, attr_value in field_attrs.items():
            if attr_name in (
                "default",
                "title",
                "format",
                "enum",
                "type",
                "default_factory",
            ):
                continue
            if attr_name == "examples":
                # newer versions of pydantic use examples in json_schema_extra
                json_schema_extra["examples"] = repr([attr_value])
            elif attr_name in ("units", "required"):
                json_schema_extra[attr_name] = attr_value
            else:
                field_parts.append(f"{TAB}{attr_name}={repr(attr_value)},")

        # Add json_schema_extra as a dictionary
        if json_schema_extra:
            extra_items = "".join(
                f"'{jkey}':{repr(jvalue)}," for jkey, jvalue in json_schema_extra.items()
            )
            field_parts.append(f"{TAB}json_schema_extra=" + "{" + extra_items + "},\n")

        field_parts.append(f"{TAB})]\n")

        class_definitions.append("\n".join(field_parts))

    if datetime_keys:
        imports.append("from pydantic import field_validator")
        for key in datetime_keys:
            class_definitions.append(
                f"{TAB}@field_validator('{key}', mode='before')\n"
                f"{TAB}@classmethod\n"
                f"{TAB}def validate_{key}(cls, field_value: MTime | float | int | np.datetime64 | pd.Timestamp | str):\n"
                f"{TAB*2}return MTime(time_stamp=field_value)\n"
            )

    if comment_fields:
        imports.append("from pydantic import field_validator, ValidationInfo")
        for name in comment_fields:
            class_definitions.append(
                f"{TAB}@field_validator('{name}', mode='before')\n"
                f"{TAB}@classmethod\n"
                f"{TAB}def validate_{name}(cls, value, info: ValidationInfo) -> Comment:\n"
                f"{TAB*2}if isinstance(value, str):\n"
                f"{TAB*3}return Comment(value=value)\n"
                f"{TAB*2}return value\n"
            )

    if units_fields:
        logger.info(f"adding units to {new_filename}")
        imports.append("from mt_metadata.common.units import get_unit_object")
        imports.append("from pydantic import field_validator, ValidationInfo")
        for name in units_fields:
            class_definitions.append(
                f"{TAB}@field_validator('{name}', mode='before')\n"
                f"{TAB}@classmethod\n"
                f"{TAB}def validate_{name}(cls, value: str) -> str:\n"
                f"{TAB*2}if value in [None, '']:\n"
                f"{TAB*3}return ''\n"
                f"{TAB*2}try:\n"
                f"{TAB*3}unit_object = get_unit_object(value)\n"
                f"{TAB*3}return unit_object.name\n"
                f"{TAB*2}except ValueError as error:\n"
                f"{TAB*3}raise KeyError(error)\n"
                f"{TAB*2}except KeyError as error:\n"
                f"{TAB*3}raise KeyError(error)\n"
            )

    # Generate the class definition, dont need config dict as that is
    # already initiated in MetadataBase.
    class_code = [
        f"class {class_name}(MetadataBase):",
        "\n".join(class_definitions) or f"{TAB}pass",
    ]

    import_block = "\n".join(imports)
    lines = [
        "#=====================================================",
        "# Imports",
        "#=====================================================",
        f"{import_block}",
        "from mt_metadata.base import MetadataBase",
        "#=====================================================",
    ]

    lines += enum_lines
    lines += class_code

    return clean_and_format_code("\n".join(lines), new_filename)

578 

579 

def clean_and_format_code(code_str: str, filename: str | Path | None = None) -> str:
    """
    Clean and format Python code by removing unused imports and formatting with isort and black.

    Each of the three steps (autoflake, isort, black) is best effort: if a
    step fails, a warning is logged and the code from the previous step is
    carried forward. When ``filename`` is given the final code is written to
    that file as UTF-8.

    Parameters
    ----------
    code_str : str
        Python code as a string
    filename : str | Path, optional
        File to write the result to, also used as a prefix in warning
        messages, by default None

    Returns
    -------
    str
        Cleaned and formatted code
    """
    # Warning messages are prefixed with the file name when one is known.
    prefix = f"{filename} " if filename else ""

    # First, remove unused imports using autoflake
    try:
        import autoflake

        code_str = autoflake.fix_code(
            code_str,
            remove_all_unused_imports=True,
            remove_unused_variables=False,
            expand_star_imports=True,
        )
    except ImportError:
        logger.warning(
            "autoflake is not installed. Unused imports will not be removed. "
            "Install with 'pip install autoflake'."
        )
    except Exception as error:
        logger.warning(f"{prefix}Error removing unused imports: {error}")

    # Then format using isort
    try:
        import_config = {
            "force_single_line": False,  # Allow several names per import line
            "force_alphabetical_sort_within_sections": True,  # Sort alphabetically within sections
            "order_by_type": True,  # Order by import type
            "sections": ["FUTURE", "STDLIB", "THIRDPARTY", "FIRSTPARTY", "LOCALFOLDER"],
            "lines_after_imports": 2,  # Add 2 blank lines after imports
        }

        code_str = isort.code(code_str, **import_config)
    except Exception as error:
        logger.warning(f"{prefix}Error formatting code using isort: {error}")

    # Finally format using black
    try:
        code_str = black.format_str(code_str, mode=black.FileMode())
    except Exception as error:
        logger.warning(f"{prefix}Error formatting code using black: {error}")

    # Write the formatted code back to the file. Python source is UTF-8 by
    # default, so do not rely on the platform's locale encoding.
    if filename is not None:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(code_str)

    return code_str

649 

650 

def reformat(filename: str | Path) -> str:
    """
    Reformat a Python file by removing unused imports and formatting with isort and black.

    The file is read as UTF-8, passed to `clean_and_format_code`, and
    rewritten in place by that function.

    Parameters
    ----------
    filename : str | Path
        Path to the Python file to be reformatted

    Returns
    -------
    str
        The cleaned and formatted code

    Raises
    ------
    FileNotFoundError
        if ``filename`` does not exist
    """
    filename = Path(filename)
    if not filename.exists():
        raise FileNotFoundError(f"{filename} does not exist.")

    # Python source is UTF-8 by default; do not rely on the locale encoding.
    with open(filename, "r", encoding="utf-8") as f:
        code_str = f.read()

    # Clean and format the code
    return clean_and_format_code(code_str, filename)