amachine.am_fast.json_utils
1import os 2import orjson 3from typing import Any 4from pathlib import Path 5 6def load_json( filepath : str | Path ) -> None : 7 with open(filepath, 'rb') as f: 8 return orjson.loads(f.read()) 9 10def save_json( data : dict[str,Any], filepath : str | Path ) -> None : 11 temp_path = f"{filepath}.tmp" 12 try: 13 with open(temp_path, 'wb') as f: # Note: 'wb' - orjson.dumps returns bytes 14 f.write(orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY )) 15 f.flush() 16 os.fsync(f.fileno()) 17 os.replace(temp_path, filepath) 18 19 except Exception as e: 20 print(f"An error occurred while saving the file: {e}") 21 if os.path.exists(temp_path): 22 try: 23 os.remove(temp_path) 24 except OSError as cleanup_error: 25 print(f"Failed to clean up temporary file '{temp_path}': {cleanup_error}") 26 27def save_json_if_changed( data : dict[str,Any], filepath : str | Path ) -> None : 28 if os.path.exists( filepath ): 29 existing = load_json( filepath ) 30 existing_bytes = orjson.dumps(existing) 31 current_bytes = orjson.dumps(data) 32 if existing_bytes == current_bytes: 33 return 34 safe_save( data, filepath )
def
load_json(filepath: str | pathlib.Path) -> None:
def
save_json(data: dict[str, typing.Any], filepath: str | pathlib.Path) -> None:
11def save_json( data : dict[str,Any], filepath : str | Path ) -> None : 12 temp_path = f"{filepath}.tmp" 13 try: 14 with open(temp_path, 'wb') as f: # Note: 'wb' - orjson.dumps returns bytes 15 f.write(orjson.dumps(data, option=orjson.OPT_INDENT_2 | orjson.OPT_SERIALIZE_NUMPY )) 16 f.flush() 17 os.fsync(f.fileno()) 18 os.replace(temp_path, filepath) 19 20 except Exception as e: 21 print(f"An error occurred while saving the file: {e}") 22 if os.path.exists(temp_path): 23 try: 24 os.remove(temp_path) 25 except OSError as cleanup_error: 26 print(f"Failed to clean up temporary file '{temp_path}': {cleanup_error}")
def
save_json_if_changed(data: dict[str, typing.Any], filepath: str | pathlib.Path) -> None:
28def save_json_if_changed( data : dict[str,Any], filepath : str | Path ) -> None : 29 if os.path.exists( filepath ): 30 existing = load_json( filepath ) 31 existing_bytes = orjson.dumps(existing) 32 current_bytes = orjson.dumps(data) 33 if existing_bytes == current_bytes: 34 return 35 safe_save( data, filepath )