# Source code for askemblaex.metadata
"""
askemblaex/metadata.py
Metadata building, reading, writing, and merging for extraction outputs.
Each processed document has a metadata file at::
<output_root>/<file_hash>/<file_hash>.metadata._.json
The metadata records source file information, extraction state, processing
steps, and schema versioning.
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
# Safe module-level logger — no handlers attached here.
# Call setup_main_logger() from your entrypoint.
logger = logging.getLogger("askemblaex")
def _utc_now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# [docs]
def build_metadata(
    src_path: Path,
    *,
    file_hash: str,
    hash_algo: str = "sha256",
) -> dict[str, Any]:
    """
    Build a fresh metadata dict for a source file.

    Reads page count from PDFs (via pypdf) and EXIF data from images (via
    Pillow). Both enrichment steps are best-effort: failures (missing
    library, unreadable file) are logged at DEBUG level and never raised.
    The returned dict conforms to the askemblaex metadata schema and is
    ready to be written with :func:`write_metadata`.

    Args:
        src_path: Path to the source file (PDF or image).
        file_hash: Pre-computed content hash (hex digest) of the file.
        hash_algo: Name of the hash algorithm used (default ``"sha256"``).
            Currently informational only — it is not recorded in the
            returned dict.

    Returns:
        Metadata dict with keys ``_key``, ``source``, ``raw``,
        ``extraction``, and ``version``.
    """
    now = _utc_now()
    meta: dict[str, Any] = {
        "_key": file_hash,
        "source": {
            "filename": src_path.name,
            "type": src_path.suffix.lower(),
            "title": src_path.stem,
            "created_utc": now,
            "local": True,
            "uris": [],
        },
        "processing": None,
        "raw": {
            "content": "",
            "content_type": "text/plain",
            "encoding": "utf-8",
            "checksum": None,
            "page_count": None,
            "process": {
                "ai": {
                    "openai": "gpt4o",
                }
            },
        },
        "extraction": {
            "started_utc": None,
            "completed_utc": None,
            "complete": False,
            "steps": {
                "azure_computer_vision": False,
                "pymupdf": False,
                "pdfplumber": False,
                "azure_docint": False,
                "reconciled": False,
                "embeddings": False,
            },
        },
        "version": {
            "schema": 1,
            "content_version": 1,
            "embedding_version": None,
            "is_active": True,
        },
    }

    # Best-effort enrichment from the file itself, dispatched on extension.
    ext = src_path.suffix.lower()
    if ext == ".pdf":
        _add_pdf_metadata(src_path, meta)
    elif ext in {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".webp"}:
        _add_image_metadata(src_path, meta)
    return meta


def _add_pdf_metadata(src_path: Path, meta: dict[str, Any]) -> None:
    """Record page count and native PDF metadata on *meta* (best-effort)."""
    try:
        try:
            from pypdf import PdfReader  # type: ignore
        except Exception:  # fall back to the legacy package name
            from PyPDF2 import PdfReader  # type: ignore
        reader = PdfReader(str(src_path))
        meta["raw"]["page_count"] = len(reader.pages)
        pdf_meta = reader.metadata
        if pdf_meta:
            native: dict[str, str] = {}
            for k, v in dict(pdf_meta).items():
                if v is None:
                    continue
                s = str(v).strip()
                if not s:
                    continue
                # Native PDF info keys look like "/Title" — drop the slash.
                key = str(k).lstrip("/").strip()
                if key:
                    native[key] = s
            if native:
                meta["source"]["pdf_meta"] = native
    except Exception as exc:
        # A missing reader library or corrupt PDF must not abort metadata
        # building — record why enrichment was skipped instead of passing.
        logger.debug("PDF metadata extraction failed for %s: %s", src_path, exc)


def _add_image_metadata(src_path: Path, meta: dict[str, Any]) -> None:
    """Record Pillow image info and EXIF tags on *meta* (best-effort)."""
    try:
        from PIL import Image as PILImage, ExifTags

        with PILImage.open(src_path) as img:
            image_info: dict[str, str] = {}
            for k, v in img.info.items():
                if v is None:
                    continue
                s = str(v).strip()
                if s:
                    image_info[f"image_{k}"] = s
            exif = img.getexif()
            if exif:
                for tag_id, value in exif.items():
                    if value is None:
                        continue
                    s = str(value).strip()
                    if not s:
                        continue
                    # Map numeric EXIF ids to readable names when known.
                    tag = ExifTags.TAGS.get(tag_id, str(tag_id))
                    image_info[f"exif_{tag}"] = s
            if image_info:
                meta["source"]["image_meta"] = image_info
    except Exception as exc:
        # Pillow missing or unreadable image: skip enrichment, leave a trace.
        logger.debug("Image metadata extraction failed for %s: %s", src_path, exc)
# [docs]
def write_metadata(out_dir: Path, file_hash: str, metadata: dict[str, Any]) -> None:
    """
    Write *metadata* to ``<out_dir>/<file_hash>.metadata._.json``.

    Creates *out_dir* (and any parents) if it does not exist.

    Args:
        out_dir: Output directory for this document.
        file_hash: Content hash of the source file — used as the filename prefix.
        metadata: Metadata dict, typically produced by :func:`build_metadata`.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(metadata, indent=2, ensure_ascii=False)
    target = out_dir / f"{file_hash}.metadata._.json"
    target.write_text(payload, encoding="utf-8")
# [docs]
def load_metadata(
    out_dir: Path,
    file_hash: str,
    *,
    logger: logging.Logger | None = None,
) -> Optional[dict[str, Any]]:
    """
    Load a metadata dict from ``<out_dir>/<file_hash>.metadata._.json``.

    Args:
        out_dir: Output directory for this document.
        file_hash: Content hash of the source file.
        logger: Optional logger; defaults to ``"askemblaex.metadata"``.

    Returns:
        Parsed metadata dict, or ``None`` if the file does not exist or
        cannot be parsed.
    """
    log = logger if logger is not None else logging.getLogger("askemblaex.metadata")
    meta_path = out_dir / f"{file_hash}.metadata._.json"
    # Path.is_file() is False for both missing paths and non-regular files.
    if not meta_path.exists() or not meta_path.is_file():
        log.debug("Metadata file not found: %s", meta_path)
        return None
    try:
        parsed = json.loads(meta_path.read_text(encoding="utf-8"))
    except Exception as exc:
        log.error("Failed to load metadata: %s (%s)", meta_path.name, exc)
        return None
    log.debug("Loaded metadata: %s", meta_path.name)
    return parsed
# [docs]
def metadata_file_exists(out_dir: Path, file_hash: str) -> bool:
    """
    Return ``True`` if a metadata file exists for *file_hash* in *out_dir*.

    Args:
        out_dir: Document output directory.
        file_hash: Content hash of the source file.

    Returns:
        ``True`` if ``<out_dir>/<file_hash>.metadata._.json`` exists.
    """
    candidate = out_dir / f"{file_hash}.metadata._.json"
    return candidate.is_file()
# [docs]
def get_metadata_path_if_exists(out_dir: Path, file_hash: str) -> Path | None:
    """
    Return the metadata file path if it exists, otherwise ``None``.

    Args:
        out_dir: Document output directory.
        file_hash: Content hash of the source file.

    Returns:
        :class:`pathlib.Path` to the metadata file, or ``None``.
    """
    candidate = out_dir / f"{file_hash}.metadata._.json"
    if candidate.is_file():
        return candidate
    return None
# [docs]
def merge_metadata(
    existing: dict[str, Any],
    new: dict[str, Any],
    *,
    overwrite: bool = False,
) -> dict[str, Any]:
    """
    Shallow-merge two metadata dicts and stamp ``last_updated_utc``.

    Args:
        existing: Base metadata dict (typically loaded from disk).
        new: New values to merge in. ``None`` values are ignored.
        overwrite: If ``True``, values in *new* replace values in *existing*.
            If ``False`` (default), existing values win.

    Returns:
        Merged metadata dict with ``last_updated_utc`` set to the current
        UTC time.
    """
    merged = dict(existing)
    for key, value in new.items():
        if value is None:
            # None in *new* never overwrites or introduces a key.
            continue
        if overwrite or key not in merged:
            merged[key] = value
    merged["last_updated_utc"] = _utc_now()
    return merged