Source code for scitex_clew._stamp

#!/usr/bin/env python3
# Timestamp: "2026-02-09 (ywatanabe)"
# File: /home/ywatanabe/proj/scitex-python/src/scitex/verify/_stamp.py
"""External hash timestamping for temporal integrity.

Provides independent temporal proof that a verification chain was consistent
at a specific point in time. Only hashes are transmitted — never actual data.

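Backends (increasing trust level):
  - file:    Local JSON file with timestamp (development/testing)
  - rfc3161: RFC 3161 Timestamping Authority (production standard)
  - zenodo:  Zenodo deposit with DOI (archival, citable; not yet implemented)

An additional ``scitex_cloud`` backend registers the root hash with the
SciTeX Cloud registry, which records a server-side timestamp.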
"""

from __future__ import annotations

import hashlib
import json
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional

from ._db import get_db

STAMP_BACKENDS = ("file", "rfc3161", "zenodo", "scitex_cloud")


@dataclass
class Stamp:
    """One timestamp proof for a root hash.

    The fields correspond one-to-one to the columns of the ``stamps`` table
    (apart from the table's autoincrement ``id``).
    """

    # Identifier of the stamp (unique in the ``stamps`` table).
    stamp_id: str
    # Hex digest that was stamped.
    root_hash: str
    # Time of stamping, stored as a string.
    timestamp: str
    # Name of the backend that produced the proof.
    backend: str
    # Location reported by the backend (may be absent).
    service_url: Optional[str]
    # Token returned by the backend, if any.
    response_token: Optional[str]
    # Number of runs covered by ``root_hash``.
    run_count: int
    # Free-form extra information; JSON-encoded when persisted.
    metadata: Optional[Dict] = None

    def to_dict(self) -> Dict:
        """Return the stamp as a plain dict keyed by field name.

        The ``metadata`` value is the same object held by the instance,
        not a copy.
        """
        field_names = (
            "stamp_id",
            "root_hash",
            "timestamp",
            "backend",
            "service_url",
            "response_token",
            "run_count",
            "metadata",
        )
        return {name: getattr(self, name) for name in field_names}


def migrate_add_stamps_table(db_path: Path) -> None:
    """Ensure the ``stamps`` table and its root-hash index exist.

    Both statements use ``IF NOT EXISTS``, so calling this repeatedly on the
    same database is harmless.

    Parameters
    ----------
    db_path : Path
        Location of the SQLite database file.
    """
    schema_statements = (
        """
            CREATE TABLE IF NOT EXISTS stamps (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                stamp_id TEXT UNIQUE NOT NULL,
                root_hash TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                backend TEXT NOT NULL,
                service_url TEXT,
                response_token TEXT,
                run_count INTEGER,
                metadata TEXT
            )
            """,
        "CREATE INDEX IF NOT EXISTS idx_stamps_hash ON stamps(root_hash)",
    )

    connection = sqlite3.connect(str(db_path))
    try:
        for statement in schema_statements:
            connection.execute(statement)
        connection.commit()
    finally:
        # Always release the handle, even if a statement fails.
        connection.close()


def compute_root_hash(session_ids: Optional[List[str]] = None) -> Dict:
    """Fold the hashes of the selected runs into one SHA-256 fingerprint.

    Rows are read from the ``runs`` table ordered by ``session_id``; for each
    row the session id and then its ``combined_hash`` are fed into a single
    SHA-256 hasher, so the result is deterministic for a given set of rows.

    Parameters
    ----------
    session_ids : list of str, optional
        Sessions to include. When given (and non-empty), every matching row
        is used regardless of status, and a NULL ``combined_hash`` contributes
        an empty string. When omitted or empty, only rows with
        ``status = 'success'`` and a non-NULL ``combined_hash`` are used.

    Returns
    -------
    dict
        ``{"root_hash", "run_count", "session_ids"}``. If no rows match,
        ``root_hash`` is ``None``, ``run_count`` is 0 and ``session_ids``
        is an empty list.
    """
    database = get_db()
    connection = sqlite3.connect(str(database.db_path))
    connection.row_factory = sqlite3.Row
    try:
        if session_ids:
            # One "?" per requested id keeps the query fully parameterized.
            marks = ",".join("?" * len(session_ids))
            query = (
                f"SELECT session_id, combined_hash FROM runs "
                f"WHERE session_id IN ({marks}) "
                f"ORDER BY session_id"
            )
            cursor = connection.execute(query, session_ids)
        else:
            query = (
                "SELECT session_id, combined_hash FROM runs "
                "WHERE status = 'success' AND combined_hash IS NOT NULL "
                "ORDER BY session_id"
            )
            cursor = connection.execute(query)
        records = cursor.fetchall()

        if not records:
            return {"root_hash": None, "run_count": 0, "session_ids": []}

        digest = hashlib.sha256()
        included = []
        for record in records:
            sid = record["session_id"]
            digest.update(sid.encode())
            # NULL hashes (possible only with explicit session_ids) hash as "".
            digest.update((record["combined_hash"] or "").encode())
            included.append(sid)

        return {
            "root_hash": digest.hexdigest(),
            "run_count": len(included),
            "session_ids": included,
        }
    finally:
        connection.close()


def stamp(
    backend: str = "file",
    service_url: Optional[str] = None,
    session_ids: Optional[List[str]] = None,
    output_dir: Optional[str] = None,
) -> Stamp:
    """Stamp the current root hash with a backend and persist the proof.

    Parameters
    ----------
    backend : str
        One of ``STAMP_BACKENDS``: file, rfc3161, zenodo, scitex_cloud.
    service_url : str, optional
        Endpoint passed to the rfc3161, zenodo and scitex_cloud backends.
        Ignored by the file backend.
    session_ids : list of str, optional
        Specific sessions to stamp. If None, stamps all successful runs.
    output_dir : str, optional
        Directory for file-based stamps (default: a ``stamps`` directory
        next to the database file). Only used by the file backend.

    Returns
    -------
    Stamp
        The timestamp proof record, which has also been inserted into the
        ``stamps`` table.

    Raises
    ------
    ValueError
        If ``backend`` is not in ``STAMP_BACKENDS`` or there are no runs
        to stamp.
    """
    if backend not in STAMP_BACKENDS:
        raise ValueError(
            f"Invalid backend '{backend}'. Must be one of: {STAMP_BACKENDS}"
        )

    root = compute_root_hash(session_ids)
    if not root["root_hash"]:
        raise ValueError("No runs to stamp (no successful runs with combined hashes)")

    now = datetime.now(timezone.utc).isoformat()
    # The id is derived from hash + time, so re-stamping the same state later
    # yields a different id.
    id_digest = hashlib.sha256(f"{root['root_hash']}:{now}".encode()).hexdigest()
    stamp_id = f"stamp_{id_digest[:12]}"

    if backend == "file":
        outcome = _stamp_file(stamp_id, root, now, output_dir)
    else:
        remote_backends = {
            "rfc3161": _stamp_rfc3161,
            "zenodo": _stamp_zenodo,
            "scitex_cloud": _stamp_scitex_cloud,
        }
        handler = remote_backends.get(backend)
        if handler is None:
            raise ValueError(f"Unsupported backend: {backend}")
        outcome = handler(stamp_id, root, now, service_url)

    stamp_obj = Stamp(
        stamp_id=stamp_id,
        root_hash=root["root_hash"],
        timestamp=now,
        backend=backend,
        service_url=outcome.get("service_url"),
        response_token=outcome.get("response_token"),
        run_count=root["run_count"],
        metadata={"session_ids": root["session_ids"]},
    )

    # Store in database
    db = get_db()
    _ensure_stamps_table(db)
    row_values = (
        stamp_obj.stamp_id,
        stamp_obj.root_hash,
        stamp_obj.timestamp,
        stamp_obj.backend,
        stamp_obj.service_url,
        stamp_obj.response_token,
        stamp_obj.run_count,
        json.dumps(stamp_obj.metadata),
    )
    connection = sqlite3.connect(str(db.db_path))
    try:
        connection.execute(
            """
            INSERT INTO stamps
            (stamp_id, root_hash, timestamp, backend,
             service_url, response_token, run_count, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            row_values,
        )
        connection.commit()
    finally:
        connection.close()

    return stamp_obj
def check_stamp(stamp_id: Optional[str] = None) -> Dict:
    """Verify a stamp against current verification state.

    The root hash is recomputed over the session ids recorded in the stamp's
    metadata (or over all successful runs when none were recorded) and
    compared with the stamped hash.

    Parameters
    ----------
    stamp_id : str, optional
        Specific stamp to check. If None, checks the latest stamp.

    Returns
    -------
    dict
        ``{stamp, current_root_hash, matches, details}`` when a stamp was
        found. ``current_root_hash`` is ``None`` when none of the stamped
        runs can be found any more. If no stamp exists, returns
        ``{"status": "not_found", "message": ...}`` instead.
    """
    db = get_db()
    _ensure_stamps_table(db)
    conn = sqlite3.connect(str(db.db_path))
    conn.row_factory = sqlite3.Row
    try:
        if stamp_id:
            row = conn.execute(
                "SELECT * FROM stamps WHERE stamp_id = ?", (stamp_id,)
            ).fetchone()
        else:
            row = conn.execute(
                "SELECT * FROM stamps ORDER BY id DESC LIMIT 1"
            ).fetchone()

        if not row:
            return {"status": "not_found", "message": "No stamps found"}

        stored_stamp = Stamp(
            stamp_id=row["stamp_id"],
            root_hash=row["root_hash"],
            timestamp=row["timestamp"],
            backend=row["backend"],
            service_url=row["service_url"],
            response_token=row["response_token"],
            run_count=row["run_count"],
            metadata=json.loads(row["metadata"]) if row["metadata"] else None,
        )

        # Recompute root hash from the same sessions
        session_ids = (
            stored_stamp.metadata.get("session_ids") if stored_stamp.metadata else None
        )
        current = compute_root_hash(session_ids)
        current_hash = current["root_hash"]

        matches = current_hash == stored_stamp.root_hash
        details = []
        if matches:
            details.append(f"Root hash matches stamp from {stored_stamp.timestamp}")
        else:
            details.append(f"Root hash CHANGED since stamp at {stored_stamp.timestamp}")
            details.append(f" Stamped: {stored_stamp.root_hash[:32]}...")
            if current_hash is None:
                # compute_root_hash yields None when no rows match (e.g. the
                # stamped runs were deleted); slicing None would raise.
                details.append(" Current: (none - no matching runs found)")
            else:
                details.append(f" Current: {current_hash[:32]}...")
            if current["run_count"] != stored_stamp.run_count:
                details.append(
                    f" Run count changed: "
                    f"{stored_stamp.run_count} -> {current['run_count']}"
                )

        return {
            "stamp": stored_stamp.to_dict(),
            "current_root_hash": current_hash,
            "matches": matches,
            "details": details,
        }
    finally:
        conn.close()
def list_stamps(limit: int = 20) -> List[Stamp]:
    """Return stored stamps, newest first.

    Parameters
    ----------
    limit : int
        Maximum number of stamps to return (default 20).

    Returns
    -------
    list of Stamp
        Rows of the ``stamps`` table ordered by descending ``id``, with the
        JSON ``metadata`` column decoded (``None`` when empty).
    """
    db = get_db()
    _ensure_stamps_table(db)
    connection = sqlite3.connect(str(db.db_path))
    connection.row_factory = sqlite3.Row
    try:
        cursor = connection.execute(
            "SELECT * FROM stamps ORDER BY id DESC LIMIT ?", (limit,)
        )
        stamps = []
        for record in cursor.fetchall():
            raw_metadata = record["metadata"]
            if raw_metadata:
                decoded_metadata = json.loads(raw_metadata)
            else:
                decoded_metadata = None
            stamps.append(
                Stamp(
                    stamp_id=record["stamp_id"],
                    root_hash=record["root_hash"],
                    timestamp=record["timestamp"],
                    backend=record["backend"],
                    service_url=record["service_url"],
                    response_token=record["response_token"],
                    run_count=record["run_count"],
                    metadata=decoded_metadata,
                )
            )
        return stamps
    finally:
        connection.close()
# ── Backend implementations ──


def _stamp_file(stamp_id, root, timestamp, output_dir=None):
    """File-based stamping: write JSON proof to local directory.

    The proof is written to ``<output_dir>/<stamp_id>.json``; when
    ``output_dir`` is not given, a ``stamps`` directory next to the database
    file is used. The directory is created if missing.

    Returns a dict with ``service_url`` set to the proof file path and
    ``response_token`` set to None.
    """
    if output_dir:
        stamp_dir = Path(output_dir)
    else:
        db = get_db()
        stamp_dir = db.db_path.parent / "stamps"

    stamp_dir.mkdir(parents=True, exist_ok=True)
    stamp_path = stamp_dir / f"{stamp_id}.json"

    proof = {
        "stamp_id": stamp_id,
        "root_hash": root["root_hash"],
        "timestamp": timestamp,
        "run_count": root["run_count"],
        "backend": "file",
    }
    stamp_path.write_text(json.dumps(proof, indent=2), encoding="utf-8")

    return {"service_url": str(stamp_path), "response_token": None}


def _stamp_rfc3161(stamp_id, root, timestamp, service_url=None):
    """RFC 3161 Timestamping Authority.

    Sends the raw bytes of the root hash to the TSA at ``service_url``
    (default ``http://zeitstempel.dfn.de``) via the optional ``rfc3161ng``
    package and returns the TSA's reply hex-encoded.

    Raises
    ------
    ImportError
        If ``rfc3161ng`` is not installed.
    """
    try:
        import rfc3161ng
    except ImportError as err:
        raise ImportError(
            "RFC 3161 stamping requires 'rfc3161ng' package. "
            "Install with: pip install rfc3161ng"
        ) from err

    url = service_url or "http://zeitstempel.dfn.de"
    timestamper = rfc3161ng.RemoteTimestamper(url)

    hash_bytes = bytes.fromhex(root["root_hash"])
    tst = timestamper.timestamp(data=hash_bytes)

    # Keep the whole token: a truncated token can no longer be verified
    # against the TSA's signature, which would defeat the point of the stamp.
    token_hex = tst.hex() if isinstance(tst, bytes) else str(tst)
    return {"service_url": url, "response_token": token_hex}


def _stamp_zenodo(stamp_id, root, timestamp, service_url=None):
    """Zenodo deposit: create a record with the root hash.

    Not implemented yet; always raises NotImplementedError.
    """
    raise NotImplementedError(
        "Zenodo stamping is planned for a future release. "
        "Use 'file' or 'rfc3161' backend instead."
    )


def _stamp_scitex_cloud(stamp_id, root, timestamp, service_url=None):
    """SciTeX Cloud registry: register root hash with server-side timestamp.

    Returns a dict whose ``response_token`` is the ``registered_at`` value
    from the registry response when the call reports success, else None.
    """
    from ._registry import get_registry

    registry = get_registry(base_url=service_url)
    result = registry.register(
        root["root_hash"],
        source_type="stamp",
        metadata={
            "stamp_id": stamp_id,
            "run_count": root["run_count"],
            "timestamp": timestamp,
        },
    )

    url = service_url or registry.base_url
    if result.get("success"):
        # Tolerate a response whose "data" is present but null.
        token = (result.get("data") or {}).get("registered_at")
    else:
        token = None
    return {"service_url": url, "response_token": token}


def _ensure_stamps_table(db) -> None:
    """Ensure the stamps table exists."""
    migrate_add_stamps_table(db.db_path)


__all__ = [
    "STAMP_BACKENDS",
    "Stamp",
    "check_stamp",
    "compute_root_hash",
    "list_stamps",
    "migrate_add_stamps_table",
    "stamp",
]