Module gamslib.sip.utils

Utility functions for GAMS SIP package creation and validation.

Provides helpers for validating object directories, extracting IDs, calculating hashes, counting files and bytes, and fetching JSON schemas for validation.

Features

  • Validates object directory structure and required files.
  • Extracts and validates object and datastream IDs.
  • Calculates MD5 and SHA512 hashes for files.
  • Counts files and bytes in a directory tree.
  • Fetches and parses JSON schemas from URLs, with error handling.

Usage

Use validate_object_dir(object_path) to check an object directory. Use extract_id(path) to extract and validate an object or datastream ID. Use md5hash(file) or sha512hash(file) for file checksums. Use count_bytes(root_dir) or count_files(root_dir) for directory statistics. Use fetch_json_schema(url) to retrieve a JSON schema from a remote URL.

Functions

def count_bytes(root_dir: pathlib.Path) ‑> int
def count_bytes(root_dir: Path) -> int:
    """
    Count the number of bytes of all files below root_dir.

    Args:
        root_dir (Path): Directory to count bytes in.

    Returns:
        int: Total number of bytes in all files.
    """
    total_bytes = 0
    for file in root_dir.rglob("*"):
        if file.is_file():
            total_bytes += file.stat().st_size
    return total_bytes

Count the number of bytes of all files below root_dir.

Args

root_dir : Path
Directory to count bytes in.

Returns

int
Total number of bytes in all files.
def count_files(root_dir: pathlib.Path) ‑> int
def count_files(root_dir: Path) -> int:
    """
    Count the number of all files below root_dir.

    Args:
        root_dir (Path): Directory to count files in.

    Returns:
        int: Total number of files.
    """
    total_files = 0
    for file in root_dir.rglob("*"):
        if file.is_file():
            total_files += 1
    return total_files

Count the number of all files below root_dir.

Args

root_dir : Path
Directory to count files in.

Returns

int
Total number of files.
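
count_bytes and count_files each walk the whole tree, so calling both traverses the directory twice. The sketch below shows how both figures could be gathered in one pass. The name tree_stats is hypothetical and is not part of gamslib; it only mirrors the logic of the two functions above.

```python
import tempfile
from pathlib import Path


def tree_stats(root_dir: Path) -> tuple[int, int]:
    """Return (file_count, total_bytes) for all regular files below root_dir.

    Hypothetical helper, not part of gamslib.
    """
    file_count = 0
    total_bytes = 0
    for entry in root_dir.rglob("*"):
        if entry.is_file():  # directories are skipped, as in count_files
            file_count += 1
            total_bytes += entry.stat().st_size
    return file_count, total_bytes


# Build a small throwaway tree to exercise the helper.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "data").mkdir()
    (root / "data" / "a.txt").write_bytes(b"hello")  # 5 bytes
    (root / "b.bin").write_bytes(b"\x00" * 11)       # 11 bytes
    stats = tree_stats(root)

print(stats)  # (2, 16)
```

For small object directories the difference is negligible, so the two separate functions remain the simpler choice.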
def fetch_json_schema(url: str) ‑> dict
@lru_cache()
def fetch_json_schema(url: str) -> dict:
    """
    Fetch a JSON schema from a URL.

    Args:
        url (str): URL to fetch the JSON schema from.

    Returns:
        dict: Parsed JSON schema.

    Raises:
        BagValidationError: If the schema cannot be fetched or is not valid JSON.
    """
    if url == GAMS_SIP_SCHEMA_URL:
        logger.debug("Using embedded GAMS SIP schema")
        return read_sip_schema_from_package()
    try:
        logger.debug("Fetching JSON schema from %s", url)
        response = requests.get(url, timeout=20)
        if not response.ok:
            raise BagValidationError(
                f"Failed to fetch JSON schema from '{url}': HTTP status code {response.status_code}"
            )
    except requests.RequestException as e:
        raise BagValidationError(
            f"Failed to fetch JSON schema from '{url}': {e}"
        ) from e

    try:
        return response.json()
    except (
        requests.JSONDecodeError,
        requests.exceptions.InvalidJSONError,
        TypeError,
    ) as e:
        raise BagValidationError(
            f"Schema referenced in 'sip.json' is not valid JSON: {e}"
        ) from e

Fetch a JSON schema from a URL.

Args

url : str
URL to fetch the JSON schema from.

Returns

dict
Parsed JSON schema.

Raises

BagValidationError
If the schema cannot be fetched or is not valid JSON.
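
Because fetch_json_schema is wrapped in lru_cache, repeated calls with the same URL return the same dict object rather than a fresh copy. The sketch below illustrates that behaviour with a hypothetical stand-in function (load_schema and the example URL are assumptions and perform no network access); it suggests copying the result before modifying it.

```python
import copy
from functools import lru_cache

calls = 0


@lru_cache()
def load_schema(url: str) -> dict:
    """Hypothetical stand-in for a cached schema fetch (no network access)."""
    global calls
    calls += 1
    return {"title": "example", "required": ["id"]}


url = "https://example.org/schema.json"  # placeholder URL
first = load_schema(url)
second = load_schema(url)

print(first is second)  # True: the cached object is handed out again
print(calls)            # 1: the function body ran only once

# Work on a deep copy so the cached schema stays untouched.
private = copy.deepcopy(first)
private["required"].append("extra")
print(load_schema(url)["required"])  # ['id']
```

lru_cache only stores successful return values, so a call that raises is attempted again on the next invocation.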
def is_bag(bag_path: pathlib.Path) ‑> bool
def is_bag(bag_path: Path) -> bool:
    """Check if the given path points to a Bag.

    It does not check the validity of the Bag, only if the structure indicates
    that it looks like a Bag.

    To check the validity of the Bag, unpack it using the unpack function
    and use the validate_object_dir function.

    bag_path can be either a directory or a file (zip).

    Args:
        bag_path (Path): The path to the directory or zip file to check.

    Returns:
        bool: True if the path points to a Bag, False otherwise.
    """
    expected_files = {
        "bagit.txt",
        "manifest-md5.txt",
        "manifest-sha512.txt",
        "data/meta/sip.json",
    }
    looks_like_a_bag = False
    all_files = set()
    if bag_path.is_dir():
        all_files = {
            file_path.relative_to(bag_path).as_posix()
            for file_path in bag_path.rglob("*")
        }
    elif bag_path.is_file() and bag_path.suffix == ".zip":
        with zipfile.ZipFile(bag_path, "r") as zip_ref:
            all_files = set(zip_ref.namelist())
    if expected_files.issubset(all_files):
        looks_like_a_bag = True
    else:
        missing_files = expected_files - all_files
        warnings.warn(
            f"Path {bag_path} is missing expected Bag files: "
            f"{', '.join(sorted(missing_files))}"
        )
        looks_like_a_bag = False
    return looks_like_a_bag

Check if the given path points to a Bag.

It does not check the validity of the Bag, only if the structure indicates that it looks like a Bag.

To check the validity of the Bag, unpack it using the unpack function and use the validate_object_dir function.

bag_path can be either a directory or a file (zip).

Args

bag_path : Path
The path to the directory or zip file to check.

Returns

bool
True if the path points to a Bag, False otherwise.
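
The check is a plain set comparison between the expected relative paths and the names found in the directory or archive. The sketch below reproduces that comparison for in-memory zip archives. The helpers missing_bag_files and zip_names and the folder name mybag are hypothetical; only the four expected file names come from the source above.

```python
import io
import zipfile

# The four paths is_bag looks for, relative to the bag root.
EXPECTED_FILES = {
    "bagit.txt",
    "manifest-md5.txt",
    "manifest-sha512.txt",
    "data/meta/sip.json",
}


def missing_bag_files(names) -> set[str]:
    """Return the expected files that do not appear in names (hypothetical helper)."""
    return EXPECTED_FILES - set(names)


def zip_names(names) -> list[str]:
    """Create an in-memory zip with empty members and return its name list."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        for name in names:
            archive.writestr(name, "")
    with zipfile.ZipFile(buffer, "r") as archive:
        return archive.namelist()


# Archive with files at the top level, one manifest left out.
flat = zip_names(["bagit.txt", "manifest-md5.txt", "data/meta/sip.json"])
missing_flat = missing_bag_files(flat)
print(sorted(missing_flat))  # ['manifest-sha512.txt']

# Archive that wraps the bag in a top-level folder: no name matches.
nested = zip_names(["mybag/" + name for name in sorted(EXPECTED_FILES)])
missing_nested = missing_bag_files(nested)
print(len(missing_nested))  # 4
```

The second case suggests that, given the comparison shown in the source, a zip whose contents sit inside a wrapping folder would not be recognised as a Bag.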
def md5hash(file: pathlib.Path) ‑> str
def md5hash(file: Path) -> str:
    """
    Calculate the MD5 hash of a file.

    Args:
        file (Path): Path to the file.

    Returns:
        str: MD5 hash as a hexadecimal string.
    """
    return hashlib.md5(file.read_bytes()).hexdigest()

Calculate the MD5 hash of a file.

Args

file : Path
Path to the file.

Returns

str
MD5 hash as a hexadecimal string.
def read_sip_schema_from_package()
def read_sip_schema_from_package():
    """
    Read the SIP JSON schema from the package data.

    The schema file is located in the sip subpackage under the resources directory.

    Returns:
        dict: Parsed JSON schema.
    """
    with SCHEMA_PATH.open() as f:
        return json.load(f)

Read the SIP JSON schema from the package data.

The schema file is located in the sip subpackage under the resources directory.

Returns

dict
Parsed JSON schema.
def sha512hash(file: pathlib.Path) ‑> str
def sha512hash(file: Path) -> str:
    """
    Calculate the SHA512 hash of a file.

    Args:
        file (Path): Path to the file.

    Returns:
        str: SHA512 hash as a hexadecimal string.
    """
    return hashlib.sha512(file.read_bytes()).hexdigest()

Calculate the SHA512 hash of a file.

Args

file : Path
Path to the file.

Returns

str
SHA512 hash as a hexadecimal string.
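
Both md5hash and sha512hash read the whole file into memory with read_bytes() before hashing. For very large datastreams, a chunked variant may be preferable. The sketch below is one possible approach, not the library's implementation; the name chunked_hash and its parameters are hypothetical.

```python
import hashlib
import tempfile
from pathlib import Path


def chunked_hash(file: Path, algorithm: str = "sha512", chunk_size: int = 1024 * 1024) -> str:
    """Hash a file in fixed-size chunks (hypothetical helper, not part of gamslib)."""
    digest = hashlib.new(algorithm)
    with file.open("rb") as handle:
        # read() returns b"" at end of file, which ends the loop.
        while chunk := handle.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()


# Compare the streamed digest with the read-everything approach.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.bin"
    sample.write_bytes(b"gams" * 1000)
    streamed = chunked_hash(sample, "md5", chunk_size=256)
    whole = hashlib.md5(sample.read_bytes()).hexdigest()

print(streamed == whole)  # True
```

The digest is identical either way; only peak memory use differs.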