Source code for scitex_clew._claim

#!/usr/bin/env python3
# Timestamp: "2026-02-09 (ywatanabe)"
# File: /home/ywatanabe/proj/scitex-python/src/scitex/verify/_claim.py
"""Claim layer — link paper assertions to verification chain.

Claims represent specific assertions in manuscripts (statistics, figures,
tables) that can be traced back through the verification chain to source data.

Five claim types:
  - statistic: A numerical result (p-value, effect size, etc.)
  - figure:    A figure reference linked to a recipe/image
  - table:     A table reference linked to source CSV
  - text:      A textual assertion linked to computational output
  - value:     A specific computed value (count, percentage, etc.)
"""

from __future__ import annotations

import re
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

from ._db import get_db

# Canonical claim types
CLAIM_TYPES = ("statistic", "figure", "table", "text", "value")


@dataclass
class Claim:
    """A traceable assertion in a manuscript."""

    claim_id: str
    file_path: str
    line_number: Optional[int]
    claim_type: str
    claim_value: Optional[str]
    source_session: Optional[str]
    source_file: Optional[str]
    source_hash: Optional[str]
    registered_at: Optional[str] = None
    verified_at: Optional[str] = None
    status: str = "registered"

    @property
    def location(self) -> str:
        """Human-readable location string."""
        if self.line_number:
            return f"{self.file_path}:L{self.line_number}"
        return self.file_path

    def to_dict(self) -> Dict:
        return {
            "claim_id": self.claim_id,
            "file_path": self.file_path,
            "line_number": self.line_number,
            "claim_type": self.claim_type,
            "claim_value": self.claim_value,
            "source_session": self.source_session,
            "source_file": self.source_file,
            "source_hash": self.source_hash,
            "registered_at": self.registered_at,
            "verified_at": self.verified_at,
            "status": self.status,
        }


def migrate_add_claims_table(db_path: Path) -> None:
    """Create claims table if not present. Safe to call multiple times."""
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS claims (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                claim_id TEXT UNIQUE NOT NULL,
                file_path TEXT NOT NULL,
                line_number INTEGER,
                claim_type TEXT NOT NULL,
                claim_value TEXT,
                source_session TEXT,
                source_file TEXT,
                source_hash TEXT,
                registered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                verified_at TIMESTAMP,
                status TEXT DEFAULT 'registered'
            )
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_claims_file ON claims(file_path)")
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_claims_source ON claims(source_file)"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_claims_session ON claims(source_session)"
        )
        conn.commit()
    finally:
        conn.close()


def _generate_claim_id(
    file_path: str, line_number: Optional[int], claim_type: str
) -> str:
    """Generate a deterministic claim ID."""
    loc = f"{file_path}:L{line_number}" if line_number else file_path
    import hashlib

    h = hashlib.sha256(f"{loc}:{claim_type}".encode()).hexdigest()[:12]
    return f"claim_{h}"


[docs] def add_claim( file_path: str, claim_type: str, line_number: Optional[int] = None, claim_value: Optional[str] = None, source_file: Optional[str] = None, source_session: Optional[str] = None, ) -> Claim: """Register a claim linking a manuscript assertion to the verification chain. Parameters ---------- file_path : str Path to the manuscript file (e.g., paper.tex). claim_type : str One of: statistic, figure, table, text, value. line_number : int, optional Line number in the manuscript. claim_value : str, optional The asserted value (e.g., "p = 0.003"). source_file : str, optional Path to the source file that produced this claim. source_session : str, optional Session ID that produced the source. Returns ------- Claim The registered claim object. """ if claim_type not in CLAIM_TYPES: raise ValueError( f"Invalid claim_type '{claim_type}'. Must be one of: {CLAIM_TYPES}" ) file_path = str(Path(file_path).resolve()) claim_id = _generate_claim_id(file_path, line_number, claim_type) # Compute source hash if source_file exists source_hash = None if source_file: source_file = str(Path(source_file).resolve()) source_path = Path(source_file) if source_path.exists(): from ._hash import hash_file source_hash = hash_file(source_path) # Auto-detect source session if not provided if source_file and not source_session: db = get_db() sessions = db.find_session_by_file(source_file, role="output") if sessions: source_session = sessions[0] claim = Claim( claim_id=claim_id, file_path=file_path, line_number=line_number, claim_type=claim_type, claim_value=claim_value, source_session=source_session, source_file=source_file, source_hash=source_hash, ) # Store in database db = get_db() _ensure_claims_table(db) conn = sqlite3.connect(str(db.db_path)) try: conn.execute( """ INSERT OR REPLACE INTO claims (claim_id, file_path, line_number, claim_type, claim_value, source_session, source_file, source_hash, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'registered') """, ( claim.claim_id, claim.file_path, claim.line_number, claim.claim_type, claim.claim_value, claim.source_session, claim.source_file, claim.source_hash, ), ) conn.commit() finally: conn.close() return claim
[docs] def list_claims( file_path: Optional[str] = None, claim_type: Optional[str] = None, status: Optional[str] = None, limit: int = 100, ) -> List[Claim]: """List registered claims with optional filters. Parameters ---------- file_path : str, optional Filter by manuscript file path. claim_type : str, optional Filter by claim type. status : str, optional Filter by verification status. limit : int Maximum number of claims to return. Returns ------- list of Claim """ db = get_db() _ensure_claims_table(db) query = "SELECT * FROM claims WHERE 1=1" params = [] if file_path: file_path = str(Path(file_path).resolve()) query += " AND file_path = ?" params.append(file_path) if claim_type: query += " AND claim_type = ?" params.append(claim_type) if status: query += " AND status = ?" params.append(status) query += " ORDER BY file_path, line_number LIMIT ?" params.append(limit) conn = sqlite3.connect(str(db.db_path)) conn.row_factory = sqlite3.Row try: rows = conn.execute(query, params).fetchall() return [ Claim( claim_id=row["claim_id"], file_path=row["file_path"], line_number=row["line_number"], claim_type=row["claim_type"], claim_value=row["claim_value"], source_session=row["source_session"], source_file=row["source_file"], source_hash=row["source_hash"], registered_at=row["registered_at"], verified_at=row["verified_at"], status=row["status"], ) for row in rows ] finally: conn.close()
[docs] def verify_claim(claim_id_or_location: str) -> Dict: """Verify a specific claim by checking its source against the verification chain. Parameters ---------- claim_id_or_location : str Either a claim_id or a location string like "paper.tex:L42". Returns ------- dict Verification result with claim details and chain status. """ db = get_db() _ensure_claims_table(db) claim = _resolve_claim(claim_id_or_location, db) if not claim: return { "status": "not_found", "message": f"No claim found for '{claim_id_or_location}'", } result = { "claim": claim.to_dict(), "source_verified": False, "chain_verified": False, "details": [], } # Check source file exists and hash matches if claim.source_file: source_path = Path(claim.source_file) if not source_path.exists(): result["details"].append(f"Source file missing: {claim.source_file}") _update_claim_status(claim.claim_id, "missing", db) result["claim"]["status"] = "missing" return result from ._hash import hash_file current_hash = hash_file(source_path) if ( claim.source_hash and current_hash[: len(claim.source_hash)] == claim.source_hash[: len(current_hash)] ): result["source_verified"] = True result["details"].append("Source file hash matches") else: result["details"].append( f"Source hash mismatch: stored={claim.source_hash}, current={current_hash}" ) _update_claim_status(claim.claim_id, "mismatch", db) result["claim"]["status"] = "mismatch" return result # Verify the chain if we have a source file if claim.source_file: from ._chain import verify_chain try: chain = verify_chain(claim.source_file) result["chain_verified"] = chain.is_verified if chain.is_verified: result["details"].append(f"Chain verified ({len(chain.runs)} runs)") else: result["details"].append( f"Chain verification failed ({len(chain.failed_runs)} failed runs)" ) except Exception as e: result["details"].append(f"Chain verification error: {e}") # Update status if result["source_verified"] and result["chain_verified"]: _update_claim_status(claim.claim_id, "verified", db) result["claim"]["status"] = "verified" elif result["source_verified"]: _update_claim_status(claim.claim_id, "partial", db) result["claim"]["status"] = "partial" return result
def verify_claims_dag( file_path: Optional[str] = None, claim_type: Optional[str] = None, ) -> DAGVerification: """Build a unified DAG from all claims, tracing each back to its source. Parameters ---------- file_path : str, optional Filter claims by manuscript file path. claim_type : str, optional Filter claims by type. Returns ------- DAGVerification Unified verification result covering all claim source chains merged. """ from ._chain import DAGVerification, VerificationStatus from ._dag import verify_dag claims = list_claims(file_path=file_path, claim_type=claim_type) # Collect unique source files from claims source_files = [] for c in claims: if c.source_file and c.source_file not in source_files: source_files.append(c.source_file) if not source_files: return DAGVerification( target_files=[], runs=[], edges=[], status=VerificationStatus.UNKNOWN, topological_order=[], ) return verify_dag(source_files) def _resolve_claim(identifier: str, db) -> Optional[Claim]: """Resolve a claim by ID or location string.""" conn = sqlite3.connect(str(db.db_path)) conn.row_factory = sqlite3.Row try: # Try claim_id first row = conn.execute( "SELECT * FROM claims WHERE claim_id = ?", (identifier,) ).fetchone() if not row: # Try location format: file.tex:L42 match = re.match(r"^(.+):L(\d+)$", identifier) if match: fpath = str(Path(match.group(1)).resolve()) line = int(match.group(2)) row = conn.execute( "SELECT * FROM claims WHERE file_path = ? AND line_number = ?", (fpath, line), ).fetchone() if not row: # Try file path only (returns first match) fpath = str(Path(identifier).resolve()) row = conn.execute( "SELECT * FROM claims WHERE file_path = ? ORDER BY line_number LIMIT 1", (fpath,), ).fetchone() if row: return Claim( claim_id=row["claim_id"], file_path=row["file_path"], line_number=row["line_number"], claim_type=row["claim_type"], claim_value=row["claim_value"], source_session=row["source_session"], source_file=row["source_file"], source_hash=row["source_hash"], registered_at=row["registered_at"], verified_at=row["verified_at"], status=row["status"], ) return None finally: conn.close() def _update_claim_status(claim_id: str, status: str, db) -> None: """Update claim verification status.""" conn = sqlite3.connect(str(db.db_path)) try: conn.execute( "UPDATE claims SET status = ?, verified_at = ? WHERE claim_id = ?", (status, datetime.now().isoformat(), claim_id), ) conn.commit() finally: conn.close() def _ensure_claims_table(db) -> None: """Ensure the claims table exists (run migration).""" migrate_add_claims_table(db.db_path) def format_claims(claims: List[Claim], verbose: bool = False) -> str: """Format claims list for terminal display.""" if not claims: return "No claims registered." lines = [] status_icons = { "registered": "\u25cb", # ○ "verified": "\u2713", # ✓ "mismatch": "\u2717", # ✗ "missing": "?", "partial": "~", } for c in claims: icon = status_icons.get(c.status, "?") loc = c.location val = f" = {c.claim_value}" if c.claim_value else "" lines.append(f" {icon} [{c.claim_type}] {loc}{val}") if verbose and c.source_file: src = Path(c.source_file).name lines.append( f" source: {src} (session: {c.source_session or 'unknown'})" ) return "\n".join(lines) __all__ = [ "CLAIM_TYPES", "Claim", "add_claim", "list_claims", "verify_claim", "verify_claims_dag", "format_claims", "migrate_add_claims_table", ]