Metadata-Version: 2.4
Name: cjm-graph-storage-adapter-interface
Version: 0.0.4
Summary: Typed graph-storage task adapter interface: the repository-style contract (CRUD, typed query execution, introspection) between graph-storage tool capabilities and the substrate.
Author-email: "Christian J. Mills" <9126128+cj-mills@users.noreply.github.com>
License: Apache-2.0
Project-URL: Repository, https://github.com/cj-mills/cjm-graph-storage-adapter-interface
Project-URL: Documentation, https://cj-mills.github.io/cjm-graph-storage-adapter-interface/
Keywords: nbdev
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: cjm_plugin_system>=0.0.44
Requires-Dist: cjm_context_graph_primitives>=0.0.8
Dynamic: license-file

# cjm-graph-storage-adapter-interface


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm_graph_storage_adapter_interface
```

## Project Structure

    nbs/
    ├── adapter.ipynb # The graph-storage task contract — `GraphStorageToolProtocol` (BORN REAL) + the `GraphStorageAdapter` ABC. Pass-2 Thread 5: graph storage is a tool capability (sqlite-graph / future postgres-graph) plus **ONE multi-method, repository-style adapter** (CRUD + typed query + introspection) — the canonical multi-op adapter case, NOT 14 separate adapters. The adapter is **domain-neutral** (stores generic `GraphNode`/`GraphEdge`; `Document`/`Segment`/`Correction` mapping stays consumer-side, lock 4) and its implementations run **in-worker beside their tool** (Thread 3).
    └── generic.ipynb # `GenericGraphStorageAdapter` — the generic (backend-agnostic) implementation of the graph-storage task contract. Thread-3 packaging: the GENERIC adapter impl is the dominant case and lives in the dep-light interface lib, reused across all compatible backend tools. Its job is **wire-input normalization + forwarding**: accept wire dicts or typed objects at the task boundary, hand the tool purely-typed arguments, pass typed results through (they wire-encode at the worker boundary).

Total: 2 notebooks

## Module Dependencies

``` mermaid
graph LR
    adapter["adapter<br/>adapter"]
    generic["generic<br/>generic"]

    generic --> adapter
```

*1 cross-module dependencies detected*

## CLI Reference

No CLI commands found in this project.

## Module Overview

Detailed documentation for each module in the project:

### adapter (`adapter.ipynb`)

> The graph-storage task contract — `GraphStorageToolProtocol` (BORN
> REAL) + the `GraphStorageAdapter` ABC. Pass-2 Thread 5: graph storage
> is a tool capability (sqlite-graph / future postgres-graph) plus **ONE
> multi-method, repository-style adapter** (CRUD + typed query +
> introspection) — the canonical multi-op adapter case, NOT 14 separate
> adapters. The adapter is **domain-neutral** (stores generic
> `GraphNode`/`GraphEdge`; `Document`/`Segment`/`Correction` mapping
> stays consumer-side, lock 4) and its implementations run **in-worker
> beside their tool** (Thread 3).

#### Import

``` python
from cjm_graph_storage_adapter_interface.adapter import (
    GraphStorageToolProtocol,
    GraphStorageAdapter
)
```

#### Classes

``` python
@runtime_checkable
class GraphStorageToolProtocol(Protocol):
    """
    Structural contract for graph-storage tool capabilities (BORN REAL —
    derived from the stage-4 typed surface, not a fused-era mirror).
    
    Each backend tool owns its per-backend translation of the typed query
    expressions and its raw escape (refusing `RawQuery.backend` mismatches).
    The surface is purely typed: graph nouns + query expressions in, typed
    results out.
    """
    
    def add_nodes(self, nodes: List[GraphNode]) -> List[str]: ...
        def add_edges(self, edges: List[GraphEdge]) -> List[str]: ...
    
    def add_edges(self, edges: List[GraphEdge]) -> List[str]: ...
    
        # -- point reads + neighborhood + reverse provenance index
        def get_node(self, node_id: str) -> Optional[GraphNode]: ...
    
    def get_node(self, node_id: str) -> Optional[GraphNode]: ...
        def get_edge(self, edge_id: str) -> Optional[GraphEdge]: ...
    
    def get_edge(self, edge_id: str) -> Optional[GraphEdge]: ...
        def get_context(self, node_id: str, depth: int = 1,
                        filter_labels: Optional[List[str]] = None) -> GraphContext: ...
    
    def get_context(self, node_id: str, depth: int = 1,
                        filter_labels: Optional[List[str]] = None) -> GraphContext: ...
    
    def find_nodes_by_source(self, source_ref: SourceRef) -> List[GraphNode]: ...
        def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]: ...
    
    def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]: ...
    
        # -- the typed query surface (stage 4; scale-shaped, portable)
        def query_nodes(self, query: NodeQuery) -> NodeQueryResult: ...
    
    def query_nodes(self, query: NodeQuery) -> NodeQueryResult: ...
        def query_edges(self, query: EdgeQuery) -> EdgeQueryResult: ...
    
    def query_edges(self, query: EdgeQuery) -> EdgeQueryResult: ...
        def raw_query(self, query: RawQuery) -> RawQueryResult: ...
    
    def raw_query(self, query: RawQuery) -> RawQueryResult: ...
    
        # -- update / delete
        def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool: ...
    
    def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool: ...
        def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool: ...
    
    def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool: ...
        def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int: ...
    
    def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int: ...
        def delete_edges(self, edge_ids: List[str]) -> int: ...
    
    def delete_edges(self, edge_ids: List[str]) -> int: ...
    
        # -- introspection / interchange
        def get_schema(self) -> Dict[str, Any]: ...
    
    def get_schema(self) -> Dict[str, Any]: ...
        def integrity_check(self) -> Dict[str, Any]: ...
    
    def integrity_check(self) -> Dict[str, Any]: ...
        def import_graph(self, graph_data: GraphContext,
                         merge_strategy: str = "overwrite") -> Dict[str, int]: ...
    
    def import_graph(self, graph_data: GraphContext,
                         merge_strategy: str = "overwrite") -> Dict[str, int]: ...
    
    def export_graph(self, filter_query: Optional[NodeQuery] = None) -> GraphContext: ...
```

``` python
class GraphStorageAdapter:
    def __init__(
        self,
        tool: GraphStorageToolProtocol,  # The bound tool capability instance (worker-side binding)
    )
    """
    The graph-storage task adapter — ONE multi-method, repository-style
    typed contract (pass-2 Thread 5 lock 1).
    
    Domain-neutral by lock 4: it stores generic `GraphNode`/`GraphEdge`;
    domain node construction (`Document`/`Segment`/`Correction`) stays in the
    consumer (or CR-18 sugar later). A domain-specific op such as
    `verify_spine` is deliberately OFF this surface — verification composes
    from the neutral query aggregates.
    
    Implementations run in-worker beside their tool capability and are
    constructed with the bound tool instance: `AdapterClass(tool)`. The
    adapter is the typed boundary — methods accept wire dicts or typed
    objects for DTO/expression arguments and normalize before touching the
    tool (whose protocol surface is purely typed).
    
    `integrity_check` is the typed introspection op institutionalizing the
    G3 corruption find: loop-backs assert storage health cheaply
    (sqlite -> `PRAGMA quick_check`).
    """
    
    def __init__(
            self,
            tool: GraphStorageToolProtocol,  # The bound tool capability instance (worker-side binding)
        )
    
    def add_nodes(
            self,
            nodes: List[Any],  # GraphNodes or their wire dicts
        ) -> List[str]:  # Created node ids
        "Bulk-create nodes."
    
    def add_edges(
            self,
            edges: List[Any],  # GraphEdges or their wire dicts
        ) -> List[str]:  # Created edge ids
        "Bulk-create edges."
    
    def get_node(
            self,
            node_id: str,  # Node UUID
        ) -> Optional[GraphNode]:  # The node, or None
        "Fetch a single node by id."
    
    def get_edge(
            self,
            edge_id: str,  # Edge UUID
        ) -> Optional[GraphEdge]:  # The edge, or None
        "Fetch a single edge by id."
    
    def get_context(
            self,
            node_id: str,  # Center node UUID
            depth: int = 1,  # Traversal depth (whole-neighborhood reads; scale-shaped reads use query_*)
            filter_labels: Optional[List[str]] = None,  # Restrict returned nodes to these labels
        ) -> GraphContext:  # The neighborhood subgraph
        "Fetch a node's neighborhood subgraph."
    
    def find_nodes_by_source(
            self,
            source_ref: Any,  # SourceRef or its wire dict
        ) -> List[GraphNode]:  # Nodes whose sources match (content-hash-primary)
        "Reverse provenance lookup (content-hash-primary, CR-19)."
    
    def find_nodes_by_label(
            self,
            label: str,  # Node label
            limit: int = 100,  # Max nodes returned
        ) -> List[GraphNode]:  # Matching nodes
        "Fetch nodes by label."
    
    def query_nodes(
            self,
            query: Any,  # NodeQuery or its tagged wire dict
        ) -> NodeQueryResult:  # Typed result (nodes / rows / count per query mode)
        "Execute a typed node query (server-side filter/order/page/count)."
    
    def query_edges(
            self,
            query: Any,  # EdgeQuery or its tagged wire dict
        ) -> EdgeQueryResult:  # Typed result (edges / rows / count per query mode)
        "Execute a typed edge query (server-side filter/order/page/count)."
    
    def raw_query(
            self,
            query: Any,  # RawQuery or its tagged wire dict (backend REQUIRED)
        ) -> RawQueryResult:  # Tabular backend-shaped result
        "Execute the marked, backend-coupled raw escape (the promotion forcing function)."
    
    def update_node(
            self,
            node_id: str,  # Node UUID
            properties: Dict[str, Any],  # Properties to merge
        ) -> bool:  # True if the node existed
        "Merge properties into a node."
    
    def update_edge(
            self,
            edge_id: str,  # Edge UUID
            properties: Dict[str, Any],  # Properties to merge
        ) -> bool:  # True if the edge existed
        "Merge properties into an edge."
    
    def delete_nodes(
            self,
            node_ids: List[str],  # Node UUIDs
            cascade: bool = True,  # Also delete connected edges
        ) -> int:  # Number of nodes deleted
        "Bulk-delete nodes."
    
    def delete_edges(
            self,
            edge_ids: List[str],  # Edge UUIDs
        ) -> int:  # Number of edges deleted
        "Bulk-delete edges."
    
    def get_schema(self) -> Dict[str, Any]:  # Backend-reported schema/ontology summary
            """Report the stored graph's schema (labels, relation types, counts)."""
            ...
    
        @abstractmethod
        def integrity_check(self) -> Dict[str, Any]:  # {"ok": bool, "errors": List[str], "backend": str}
        "Report the stored graph's schema (labels, relation types, counts)."
    
    def integrity_check(self) -> Dict[str, Any]:  # {"ok": bool, "errors": List[str], "backend": str}
            """Backend self-check (G3 institutionalized; sqlite -> PRAGMA quick_check)."""
            ...
    
        @abstractmethod
        def import_graph(
            self,
            graph_data: Any,  # GraphContext or its wire dict
            merge_strategy: str = "overwrite",  # skip | overwrite | merge
        ) -> Dict[str, int]:  # Import counts per entity kind
        "Backend self-check (G3 institutionalized; sqlite -> PRAGMA quick_check)."
    
    def import_graph(
            self,
            graph_data: Any,  # GraphContext or its wire dict
            merge_strategy: str = "overwrite",  # skip | overwrite | merge
        ) -> Dict[str, int]:  # Import counts per entity kind
        "Bulk-import a subgraph."
    
    def export_graph(
            self,
            filter_query: Optional[Any] = None,  # NodeQuery (or wire dict) selecting nodes; None = whole graph
        ) -> GraphContext:  # The exported subgraph (matching nodes + edges among them)
        "Export the graph (optionally filtered by a typed node query)."
```

### generic (`generic.ipynb`)

> `GenericGraphStorageAdapter` — the generic (backend-agnostic)
> implementation of the graph-storage task contract. Thread-3 packaging:
> the GENERIC adapter impl is the dominant case and lives in the
> dep-light interface lib, reused across all compatible backend tools.
> Its job is **wire-input normalization + forwarding**: accept wire
> dicts or typed objects at the task boundary, hand the tool
> purely-typed arguments, pass typed results through (they wire-encode
> at the worker boundary).

#### Import

``` python
from cjm_graph_storage_adapter_interface.generic import (
    ensure_node,
    ensure_edge,
    ensure_context,
    ensure_source_ref,
    ensure_node_query,
    ensure_edge_query,
    ensure_raw_query,
    GenericGraphStorageAdapter
)
```

#### Functions

``` python
def ensure_node(
    value: Any,  # GraphNode or its wire dict
) -> GraphNode:  # Typed node
    "Normalize a wire dict to a `GraphNode` (typed values pass through)."
```

``` python
def ensure_edge(
    value: Any,  # GraphEdge or its wire dict
) -> GraphEdge:  # Typed edge
    "Normalize a wire dict to a `GraphEdge` (typed values pass through)."
```

``` python
def ensure_context(
    value: Any,  # GraphContext or its wire dict
) -> GraphContext:  # Typed context
    "Normalize a wire dict to a `GraphContext` (typed values pass through)."
```

``` python
def ensure_source_ref(
    value: Any,  # SourceRef or its wire dict
) -> SourceRef:  # Typed source ref
    "Normalize a wire dict to a `SourceRef` (typed values pass through)."
```

``` python
def ensure_node_query(
    value: Any,  # NodeQuery or its tagged wire dict
) -> NodeQuery:  # Typed query
    "Normalize a tagged wire dict to a `NodeQuery` (typed values pass through)."
```

``` python
def ensure_edge_query(
    value: Any,  # EdgeQuery or its tagged wire dict
) -> EdgeQuery:  # Typed query
    "Normalize a tagged wire dict to an `EdgeQuery` (typed values pass through)."
```

``` python
def ensure_raw_query(
    value: Any,  # RawQuery or its tagged wire dict
) -> RawQuery:  # Typed query
    "Normalize a tagged wire dict to a `RawQuery` (typed values pass through)."
```

#### Classes

``` python
class GenericGraphStorageAdapter(GraphStorageAdapter):
    """
    Generic graph-storage adapter: normalize wire inputs, forward to the
    bound tool, pass typed results through.
    
    Works against ANY tool satisfying `GraphStorageToolProtocol` — the
    backend owns translation, so nothing here is backend-specific.
    """
    
    def add_nodes(self, nodes: List[Any]) -> List[str]:
            """Bulk-create nodes."""
            return self.tool.add_nodes([ensure_node(n) for n in nodes])
    
        def add_edges(self, edges: List[Any]) -> List[str]
        "Bulk-create nodes."
    
    def add_edges(self, edges: List[Any]) -> List[str]:
            """Bulk-create edges."""
            return self.tool.add_edges([ensure_edge(e) for e in edges])
    
        def get_node(self, node_id: str) -> Optional[GraphNode]
        "Bulk-create edges."
    
    def get_node(self, node_id: str) -> Optional[GraphNode]:
            """Fetch a single node by id."""
            return self.tool.get_node(node_id)
    
        def get_edge(self, edge_id: str) -> Optional[GraphEdge]
        "Fetch a single node by id."
    
    def get_edge(self, edge_id: str) -> Optional[GraphEdge]:
            """Fetch a single edge by id."""
            return self.tool.get_edge(edge_id)
    
        def get_context(self, node_id: str, depth: int = 1,
                        filter_labels: Optional[List[str]] = None) -> GraphContext
        "Fetch a single edge by id."
    
    def get_context(self, node_id: str, depth: int = 1,
                        filter_labels: Optional[List[str]] = None) -> GraphContext
        "Fetch a node's neighborhood subgraph."
    
    def find_nodes_by_source(self, source_ref: Any) -> List[GraphNode]:
            """Reverse provenance lookup (content-hash-primary)."""
            return self.tool.find_nodes_by_source(ensure_source_ref(source_ref))
    
        def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]
        "Reverse provenance lookup (content-hash-primary)."
    
    def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]:
            """Fetch nodes by label."""
            return self.tool.find_nodes_by_label(label, limit=limit)
    
        def query_nodes(self, query: Any) -> NodeQueryResult
        "Fetch nodes by label."
    
    def query_nodes(self, query: Any) -> NodeQueryResult:
            """Execute a typed node query."""
            return self.tool.query_nodes(ensure_node_query(query))
    
        def query_edges(self, query: Any) -> EdgeQueryResult
        "Execute a typed node query."
    
    def query_edges(self, query: Any) -> EdgeQueryResult:
            """Execute a typed edge query."""
            return self.tool.query_edges(ensure_edge_query(query))
    
        def raw_query(self, query: Any) -> RawQueryResult
        "Execute a typed edge query."
    
    def raw_query(self, query: Any) -> RawQueryResult:
            """Execute the marked, backend-coupled raw escape."""
            return self.tool.raw_query(ensure_raw_query(query))
    
        def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool
        "Execute the marked, backend-coupled raw escape."
    
    def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool:
            """Merge properties into a node."""
            return self.tool.update_node(node_id, properties)
    
        def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool
        "Merge properties into a node."
    
    def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool:
            """Merge properties into an edge."""
            return self.tool.update_edge(edge_id, properties)
    
        def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int
        "Merge properties into an edge."
    
    def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int:
            """Bulk-delete nodes."""
            return self.tool.delete_nodes(node_ids, cascade=cascade)
    
        def delete_edges(self, edge_ids: List[str]) -> int
        "Bulk-delete nodes."
    
    def delete_edges(self, edge_ids: List[str]) -> int:
            """Bulk-delete edges."""
            return self.tool.delete_edges(edge_ids)
    
        def get_schema(self) -> Dict[str, Any]
        "Bulk-delete edges."
    
    def get_schema(self) -> Dict[str, Any]:
            """Report the stored graph's schema."""
            return self.tool.get_schema()
    
        def integrity_check(self) -> Dict[str, Any]
        "Report the stored graph's schema."
    
    def integrity_check(self) -> Dict[str, Any]:
            """Backend self-check (G3 institutionalized)."""
            return self.tool.integrity_check()
    
        def import_graph(self, graph_data: Any, merge_strategy: str = "overwrite") -> Dict[str, int]
        "Backend self-check (G3 institutionalized)."
    
    def import_graph(self, graph_data: Any, merge_strategy: str = "overwrite") -> Dict[str, int]:
            """Bulk-import a subgraph."""
            return self.tool.import_graph(ensure_context(graph_data), merge_strategy=merge_strategy)
    
        def export_graph(self, filter_query: Optional[Any] = None) -> GraphContext
        "Bulk-import a subgraph."
    
    def export_graph(self, filter_query: Optional[Any] = None) -> GraphContext
        "Export the graph (optionally filtered by a typed node query)."
```
