Coverage for src / dataknobs_data / backends / sqlite_mixins.py: 25%
65 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
1"""SQLite-specific mixins for vector support and other functionality."""
3from __future__ import annotations
5import json
6import logging
8import numpy as np
10from typing import TYPE_CHECKING
11from ..fields import VectorField
12from ..vector.types import DistanceMetric
14if TYPE_CHECKING:
15 from ..records import Record
18logger = logging.getLogger(__name__)
class SQLiteVectorSupport:
    """Vector support for SQLite using JSON storage and Python-based similarity.

    Vectors are serialized to JSON text for storage and compared in Python
    with NumPy. The mixin also remembers, per field name, the dimensions and
    model metadata of the vector fields it has been shown.
    """

    def __init__(self):
        """Initialize vector support tracking."""
        # Field name -> number of elements in the vectors seen for that field.
        self._vector_dimensions: dict[str, int] = {}
        # Field name -> metadata dict (dimensions, source field, model info).
        self._vector_fields: dict[str, dict[str, object]] = {}

    def _has_vector_fields(self, record: Record) -> bool:
        """Check if record has vector fields.

        Args:
            record: Record to check

        Returns:
            True if at least one of the record's fields is a ``VectorField``
        """
        return any(
            isinstance(field, VectorField) for field in record.fields.values()
        )

    def _extract_vector_dimensions(self, record: Record) -> dict[str, int]:
        """Extract dimensions from vector fields in a record.

        The length of the stored value wins when it can be measured (NumPy
        array with at least one axis, list, or tuple). Otherwise the field's
        declared ``dimensions`` is used when it is set. Vector fields that
        provide neither are left out of the result.

        Args:
            record: Record containing potential vector fields

        Returns:
            Dictionary mapping field names to dimensions
        """
        dimensions: dict[str, int] = {}
        for name, field in record.fields.items():
            if not isinstance(field, VectorField):
                continue

            value = field.value
            if isinstance(value, np.ndarray):
                # A 0-d array has an empty shape, so shape[0] would raise
                # IndexError; treat it as "no measurable length".
                length = value.shape[0] if value.ndim else None
            elif isinstance(value, (list, tuple)):
                length = len(value)
            else:
                length = None

            if length is not None:
                dimensions[name] = length
            elif field.dimensions:
                # No usable value: fall back to the declared dimension count.
                dimensions[name] = field.dimensions
        return dimensions

    def _update_vector_dimensions(self, record: Record) -> None:
        """Update tracked vector dimensions from a record.

        Merges the record's vector dimensions into ``self._vector_dimensions``
        and (re)writes one metadata entry per vector field in
        ``self._vector_fields``.

        Args:
            record: Record containing vector fields
        """
        dimensions = self._extract_vector_dimensions(record)
        self._vector_dimensions.update(dimensions)

        # Track which fields are vectors, together with their model metadata.
        for name, field in record.fields.items():
            if isinstance(field, VectorField):
                self._vector_fields[name] = {
                    # None when no dimension could be determined for the field.
                    "dimensions": dimensions.get(name),
                    "source_field": field.source_field,
                    "model_name": field.model_name,
                    "model_version": field.model_version,
                }

    def _serialize_vector(self, vector: np.ndarray | list) -> str:
        """Serialize a vector to JSON string for storage.

        Args:
            vector: Vector as numpy array or list. Lists may contain NumPy
                scalars (e.g. ``np.float32``) or nested arrays.

        Returns:
            JSON string representation

        Raises:
            TypeError: If the vector contains an object that is neither JSON
                serializable nor a NumPy scalar/array.
        """

        def _to_builtin(obj):
            # Only invoked by json for objects it cannot encode natively, so
            # plain Python lists serialize exactly as before.
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            if isinstance(obj, np.generic):
                return obj.item()
            raise TypeError(
                f"Object of type {type(obj).__name__} is not JSON serializable"
            )

        if isinstance(vector, np.ndarray):
            vector = vector.tolist()
        return json.dumps(vector, default=_to_builtin)

    def _deserialize_vector(self, vector_str: str) -> np.ndarray | None:
        """Deserialize a vector from JSON string.

        Args:
            vector_str: JSON string representation

        Returns:
            ``float32`` numpy array, or None when the input is empty, is not
            valid JSON, is not a JSON array, or cannot be converted to floats
        """
        if not vector_str:
            return None
        try:
            decoded = json.loads(vector_str)
            if not isinstance(decoded, list):
                # Valid JSON but not an array (e.g. "null" or "3"): converting
                # it would yield a 0-d array rather than a vector.
                logger.debug(
                    "Stored vector is not a JSON array (got %s)",
                    type(decoded).__name__,
                )
                return None
            return np.array(decoded, dtype=np.float32)
        except (json.JSONDecodeError, TypeError, ValueError) as exc:
            # Best-effort by design: unreadable payloads yield None, not a crash.
            logger.debug("Failed to deserialize stored vector: %s", exc)
            return None

    def _compute_similarity(
        self,
        vec1: np.ndarray | None,
        vec2: np.ndarray | None,
        metric: DistanceMetric = DistanceMetric.COSINE
    ) -> float:
        """Compute similarity between two vectors.

        Args:
            vec1: First vector
            vec2: Second vector
            metric: Distance metric to use

        Returns:
            Similarity score (higher is more similar). 0.0 when either vector
            is None, or for cosine similarity when either vector has zero norm.

        Raises:
            ValueError: If the vectors' shapes differ or the metric is not
                one of COSINE, EUCLIDEAN or DOT_PRODUCT.
        """
        if vec1 is None or vec2 is None:
            return 0.0

        # Ensure vectors are numpy arrays
        if not isinstance(vec1, np.ndarray):
            vec1 = np.array(vec1, dtype=np.float32)  # type: ignore[unreachable]
        if not isinstance(vec2, np.ndarray):
            vec2 = np.array(vec2, dtype=np.float32)  # type: ignore[unreachable]

        # Check dimensions match
        if vec1.shape != vec2.shape:
            raise ValueError(f"Vector dimensions don't match: {vec1.shape} vs {vec2.shape}")

        if metric == DistanceMetric.COSINE:
            # Cosine similarity; a zero vector has no direction, so score it 0.
            norm1 = np.linalg.norm(vec1)
            norm2 = np.linalg.norm(vec2)
            if norm1 == 0 or norm2 == 0:
                return 0.0
            return float(np.dot(vec1, vec2) / (norm1 * norm2))

        elif metric == DistanceMetric.EUCLIDEAN:
            # Convert Euclidean distance to similarity (inverse): identical
            # vectors score 1.0 and the score decays toward 0 with distance.
            distance = float(np.linalg.norm(vec1 - vec2))
            return 1.0 / (1.0 + distance)

        elif metric == DistanceMetric.DOT_PRODUCT:
            # Dot product similarity
            return float(np.dot(vec1, vec2))

        else:
            raise ValueError(f"Unsupported metric: {metric}")