Coverage for src/dataknobs_data/backends/file.py: 15%
634 statements
1"""File-based database backend implementation."""
3from __future__ import annotations
5import asyncio
6import csv
7import gzip
8import json
9import os
10import platform
11import tempfile
12import threading
13import time
14import uuid
15from pathlib import Path
16from typing import Any, TYPE_CHECKING
18from dataknobs_config import ConfigurableBase
20from ..database import AsyncDatabase, SyncDatabase
21from ..query import Query
22from ..records import Record
23from ..streaming import AsyncStreamingMixin, StreamConfig, StreamingMixin, StreamResult
24from ..vector import VectorOperationsMixin
25from ..vector.bulk_embed_mixin import BulkEmbedMixin
26from ..vector.python_vector_search import PythonVectorSearchMixin
27from .sqlite_mixins import SQLiteVectorSupport
28from .vector_config_mixin import VectorConfigMixin
30if TYPE_CHECKING:
31 from collections.abc import AsyncIterator, Iterator
34class FileLock:
35 """Cross-platform file locking."""
37 def __init__(self, filepath: str):
38 self.filepath = filepath
39 self.lockfile = filepath + ".lock"
40 self.lock_handle = None
42 def acquire(self):
43 """Acquire the file lock."""
44 if platform.system() == "Windows":
45 import msvcrt
47 while True:
48 try:
49 self.lock_handle = open(self.lockfile, "wb")
50 msvcrt.locking(self.lock_handle.fileno(), msvcrt.LK_NBLCK, 1)
51 break
52 except OSError:
53 if self.lock_handle:
54 self.lock_handle.close() # type: ignore[unreachable]
55 import time
57 time.sleep(0.01)
58 else:
59 import fcntl
61 self.lock_handle = open(self.lockfile, "wb")
62 fcntl.lockf(self.lock_handle, fcntl.LOCK_EX)
64 def release(self):
65 """Release the file lock."""
66 if self.lock_handle:
67 if platform.system() == "Windows": # type: ignore[unreachable]
68 import msvcrt
70 try:
71 msvcrt.locking(self.lock_handle.fileno(), msvcrt.LK_UNLCK, 1)
72 except OSError:
73 pass
74 self.lock_handle.close()
75 try:
76 os.remove(self.lockfile)
77 except (OSError, FileNotFoundError):
78 pass
80 def __enter__(self):
81 self.acquire()
82 return self
84 def __exit__(self, exc_type, exc_val, exc_tb):
85 self.release()
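
# Usage sketch (illustrative, not part of the original module): FileLock is a
# context manager, so callers can serialize cross-process access to a data file.
# The ".lock" sibling file is created on acquire and removed on release.
#
#     with FileLock("/tmp/records.json"):
#         ...  # read or rewrite /tmp/records.json while holding the lock
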
class FileFormat:
    """Base class for file format handlers."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from file."""
        raise NotImplementedError

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to file."""
        raise NotImplementedError


class JSONFormat(FileFormat):
    """JSON file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from JSON file."""
        if not os.path.exists(filepath):
            return {}

        # Check if file is empty
        if os.path.getsize(filepath) == 0:
            return {}

        try:
            if filepath.endswith(".gz"):
                try:
                    with gzip.open(filepath, "rt", encoding="utf-8") as f:
                        content = f.read()
                        if not content.strip():
                            return {}
                        data = json.loads(content)
                except (gzip.BadGzipFile, OSError):
                    # File has .gz extension but isn't gzipped, treat as regular file
                    with open(filepath, encoding="utf-8") as f:
                        content = f.read()
                        if not content.strip():
                            return {}
                        data = json.loads(content)
            else:
                with open(filepath, encoding="utf-8") as f:
                    content = f.read()
                    if not content.strip():
                        return {}
                    data = json.loads(content)

            return data
        except json.JSONDecodeError:
            return {}

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to JSON file."""
        if filepath.endswith(".gz"):
            with gzip.open(filepath, "wt", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        else:
            with open(filepath, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)

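
# On-disk shape exchanged by the format handlers (illustrative): a mapping from
# record ID to a dict that carries a "fields" entry, e.g.
#
#     {
#         "a1b2c3": {"fields": {"name": "Ada", "tags": ["x", "y"]}}
#     }
#
# JSONFormat writes this mapping as-is; the tabular formats below flatten it.
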
class CSVFormat(FileFormat):
    """CSV file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from CSV file."""
        if not os.path.exists(filepath):
            return {}

        # Check if file is empty
        if os.path.getsize(filepath) == 0:
            return {}

        data = {}
        try:
            if filepath.endswith(".gz"):
                with gzip.open(filepath, "rt", encoding="utf-8") as f:
                    reader = csv.DictReader(f)
                    for row in reader:
                        if "__id__" in row:
                            record_id = row.pop("__id__")
                            # Try to deserialize JSON strings back to objects
                            fields = {}
                            for key, value in row.items():
                                if value and isinstance(value, str):
                                    # Try to parse as JSON if it looks like JSON
                                    if (value.startswith('{') and value.endswith('}')) or \
                                       (value.startswith('[') and value.endswith(']')):
                                        try:
                                            fields[key] = json.loads(value)
                                        except json.JSONDecodeError:
                                            fields[key] = value
                                    else:
                                        fields[key] = value
                                else:
                                    fields[key] = value
                            data[record_id] = {"fields": fields}
            else:
                with open(filepath, encoding="utf-8") as f:
                    reader = csv.DictReader(f)
                    for row in reader:
                        if "__id__" in row:
                            record_id = row.pop("__id__")
                            # Try to deserialize JSON strings back to objects
                            fields = {}
                            for key, value in row.items():
                                if value and isinstance(value, str):
                                    # Try to parse as JSON if it looks like JSON
                                    if (value.startswith('{') and value.endswith('}')) or \
                                       (value.startswith('[') and value.endswith(']')):
                                        try:
                                            fields[key] = json.loads(value)
                                        except json.JSONDecodeError:
                                            fields[key] = value
                                    else:
                                        fields[key] = value
                                else:
                                    fields[key] = value
                            data[record_id] = {"fields": fields}
        except (OSError, csv.Error):
            return {}

        return data

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to CSV file."""
        if not data:
            if filepath.endswith(".gz"):
                with gzip.open(filepath, "wt", encoding="utf-8") as f:
                    f.write("")
            else:
                with open(filepath, "w", encoding="utf-8") as f:
                    f.write("")
            return

        # Extract all field names and prepare flattened data
        all_fields = set()
        flattened_data = {}
        for record_id, record_data in data.items():
            if "fields" in record_data:
                # Flatten field values for CSV format
                flat_fields = {}
                for field_name, field_data in record_data["fields"].items():
                    # Handle both full field dicts and simple values
                    if isinstance(field_data, dict) and "value" in field_data:
                        value = field_data["value"]
                    else:
                        value = field_data

                    # Serialize complex types as JSON strings
                    if isinstance(value, (dict, list)):
                        flat_fields[field_name] = json.dumps(value)
                    else:
                        flat_fields[field_name] = value
                    all_fields.add(field_name)
                flattened_data[record_id] = flat_fields

        fieldnames = ["__id__"] + sorted(list(all_fields))

        if filepath.endswith(".gz"):
            with gzip.open(filepath, "wt", encoding="utf-8", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                for record_id, fields in flattened_data.items():
                    row = {"__id__": record_id}
                    row.update(fields)
                    writer.writerow(row)
        else:
            with open(filepath, "w", encoding="utf-8", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                for record_id, fields in flattened_data.items():
                    row = {"__id__": record_id}
                    row.update(fields)
                    writer.writerow(row)

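
# CSV layout sketch (illustrative): the record ID goes into an "__id__" column
# and dict/list field values are round-tripped as JSON strings, e.g.
#
#     __id__,name,tags
#     a1b2c3,Ada,"[""x"", ""y""]"
#
# On load, values that look like JSON objects or arrays are parsed back.
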
class ParquetFormat(FileFormat):
    """Parquet file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from Parquet file."""
        if not os.path.exists(filepath):
            return {}

        try:
            import pandas as pd

            df = pd.read_parquet(filepath)
            data = {}

            for idx, row in df.iterrows():
                row_dict = row.to_dict()
                if "__id__" in row_dict:
                    record_id = row_dict.pop("__id__")
                else:
                    record_id = str(idx)

                # Remove NaN values
                fields = {k: v for k, v in row_dict.items() if pd.notna(v)}
                data[record_id] = {"fields": fields}

            return data
        except ImportError as e:
            raise ImportError("Parquet support requires pandas and pyarrow packages") from e

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to Parquet file."""
        try:
            import pandas as pd

            if not data:
                # Create empty DataFrame
                df = pd.DataFrame()
            else:
                rows = []
                for record_id, record_data in data.items():
                    row = {"__id__": record_id}
                    if "fields" in record_data:
                        # Flatten field values for Parquet format
                        for field_name, field_data in record_data["fields"].items():
                            # Handle both full field dicts and simple values
                            if isinstance(field_data, dict) and "value" in field_data:
                                row[field_name] = field_data["value"]
                            else:
                                row[field_name] = field_data
                    rows.append(row)

                df = pd.DataFrame(rows)

            df.to_parquet(filepath, index=False, compression="snappy")
        except ImportError as e:
            raise ImportError("Parquet support requires pandas and pyarrow packages") from e

class AsyncFileDatabase(  # type: ignore[misc]
    AsyncDatabase,
    AsyncStreamingMixin,
    ConfigurableBase,
    VectorConfigMixin,
    SQLiteVectorSupport,
    PythonVectorSearchMixin,
    BulkEmbedMixin,
    VectorOperationsMixin
):
    """Async file-based database implementation."""

    FORMAT_HANDLERS = {
        ".json": JSONFormat,
        ".csv": CSVFormat,
        ".tsv": CSVFormat,
        ".parquet": ParquetFormat,
        ".pq": ParquetFormat,
    }

    def __init__(self, config: dict[str, Any] | None = None):
        super().__init__(config)

        # If no path specified, use a temporary file instead of polluting CWD
        if "path" not in self.config:
            # Create a unique temporary file that won't conflict
            temp_file = tempfile.NamedTemporaryFile(
                prefix="dataknobs_async_db_",
                suffix=".json",
                delete=False
            )
            self.filepath = temp_file.name
            temp_file.close()
            self._is_temp_file = True
        else:
            self.filepath = self.config["path"]
            self._is_temp_file = False

        self.format = self.config.get("format")
        self.compression = self.config.get("compression", None)
        self._lock = asyncio.Lock()
        self._file_lock = FileLock(self.filepath)

        # Detect format from file extension if not specified
        if not self.format:
            path = Path(self.filepath)
            # Check for compression
            if path.suffix == ".gz":
                self.compression = "gzip"
                path = Path(path.stem)

            ext = path.suffix.lower()
            if ext in self.FORMAT_HANDLERS:
                self.format = ext.lstrip(".")
            else:
                self.format = "json"  # Default to JSON

        # Apply compression to filepath if specified
        if self.compression == "gzip" and not self.filepath.endswith(".gz"):
            self.filepath += ".gz"

        # Get the appropriate format handler
        ext = f".{self.format}"
        self.handler = self.FORMAT_HANDLERS.get(ext, JSONFormat)

        # Initialize vector support
        self._parse_vector_config(config or {})
        self._init_vector_state()

    @classmethod
    def from_config(cls, config: dict) -> AsyncFileDatabase:
        """Create from config dictionary."""
        return cls(config)

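    # Config sketch (illustrative; the keys mirror the ones read in __init__):
    #
    #     db = AsyncFileDatabase.from_config({
    #         "path": "data/records.csv",   # omit to use a throwaway temp file
    #         "format": "csv",              # optional; inferred from the extension
    #         "compression": "gzip",        # optional; appends ".gz" to the path
    #     })
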
    def _generate_id(self) -> str:
        """Generate a unique ID for a record."""
        return str(uuid.uuid4())

    async def _load_data(self) -> dict[str, Record]:
        """Load all data from file."""
        with self._file_lock:
            raw_data = self.handler.load(self.filepath)
            data = {}
            for record_id, record_dict in raw_data.items():
                data[record_id] = Record.from_dict(record_dict)
            return data

    async def _save_data(self, data: dict[str, Record]):
        """Save all data to file atomically."""
        # Convert records to dictionaries
        raw_data = {}
        for record_id, record in data.items():
            raw_data[record_id] = record.to_dict(include_metadata=True, flatten=False)

        # Write to temporary file first; keep the ".gz" suffix so the format
        # handler applies the same compression it would for the real path
        temp_fd, temp_path = tempfile.mkstemp(
            dir=os.path.dirname(self.filepath) or ".",
            suffix=".gz" if self.filepath.endswith(".gz") else "",
        )
        os.close(temp_fd)

        try:
            with self._file_lock:
                self.handler.save(temp_path, raw_data)
                # Atomic rename
                os.replace(temp_path, self.filepath)
        except Exception:
            # Clean up temp file on error
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise

    async def create(self, record: Record) -> str:
        """Create a new record in the file."""
        async with self._lock:
            data = await self._load_data()
            # Use centralized method to prepare record
            record_copy, storage_id = self._prepare_record_for_storage(record)
            data[storage_id] = record_copy
            await self._save_data(data)
            return storage_id

    async def read(self, id: str) -> Record | None:
        """Read a record from the file."""
        async with self._lock:
            data = await self._load_data()
            record = data.get(id)
            # Use centralized method to prepare record
            return self._prepare_record_from_storage(record, id)

    async def update(self, id: str, record: Record) -> bool:
        """Update a record in the file."""
        async with self._lock:
            data = await self._load_data()
            if id in data:
                data[id] = record.copy(deep=True)
                await self._save_data(data)
                return True
            return False

    async def delete(self, id: str) -> bool:
        """Delete a record from the file."""
        async with self._lock:
            data = await self._load_data()
            if id in data:
                del data[id]
                await self._save_data(data)
                return True
            return False

    async def exists(self, id: str) -> bool:
        """Check if a record exists in the file."""
        async with self._lock:
            data = await self._load_data()
            return id in data

    async def upsert(self, id_or_record: str | Record, record: Record | None = None) -> str:
        """Update or insert a record.

        Can be called as:
        - upsert(id, record) - explicit ID and record
        - upsert(record) - extract ID from record using Record's built-in logic
        """
        # Determine ID and record based on arguments
        if isinstance(id_or_record, str):
            id = id_or_record
            if record is None:
                raise ValueError("Record required when ID is provided")
        else:
            record = id_or_record
            id = record.id
            if id is None:
                import uuid  # type: ignore[unreachable]

                id = str(uuid.uuid4())
                record.storage_id = id

        async with self._lock:
            data = await self._load_data()
            data[id] = record.copy(deep=True)
            await self._save_data(data)
            return id

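    # Both upsert forms, sketched (illustrative; "rec" is any Record instance):
    #
    #     await db.upsert("user-1", rec)   # explicit ID plus record
    #     await db.upsert(rec)             # ID taken from rec.id, generated if missing
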
    async def search(self, query: Query) -> list[Record]:
        """Search for records matching the query."""
        async with self._lock:
            data = await self._load_data()
            results = []

            for record_id, record in data.items():
                # Apply filters
                matches = True
                for filter in query.filters:
                    # Special handling for 'id' field
                    if filter.field == 'id':
                        field_value = record_id
                    else:
                        field_value = record.get_value(filter.field)
                    if not filter.matches(field_value):
                        matches = False
                        break

                if matches:
                    results.append((record_id, record))

            # Use the helper method from base class
            return self._process_search_results(results, query, deep_copy=True)

    async def _count_all(self) -> int:
        """Count all records in the file."""
        async with self._lock:
            data = await self._load_data()
            return len(data)

    async def clear(self) -> int:
        """Clear all records from the file."""
        async with self._lock:
            data = await self._load_data()
            count = len(data)
            await self._save_data({})
            return count

    async def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            ids = []
            for record in records:
                record_id = self._generate_id()
                data[record_id] = record.copy(deep=True)
                ids.append(record_id)
            await self._save_data(data)
            return ids

    async def read_batch(self, ids: list[str]) -> list[Record | None]:
        """Read multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            results = []
            for record_id in ids:
                record = data.get(record_id)
                results.append(record.copy(deep=True) if record else None)
            return results

    async def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            results = []
            modified = False
            for record_id in ids:
                if record_id in data:
                    del data[record_id]
                    results.append(True)
                    modified = True
                else:
                    results.append(False)

            if modified:
                await self._save_data(data)

            return results

    async def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> AsyncIterator[Record]:
        """Stream records from file."""
        # For file backend, we can use the default implementation
        # since we need to load all data anyway
        config = config or StreamConfig()

        # Use search to get all matching records
        if query:
            records = await self.search(query)
        else:
            records = await self.search(Query())

        # Yield records in batches for consistency
        for i in range(0, len(records), config.batch_size):
            batch = records[i:i + config.batch_size]
            for record in batch:
                yield record

    async def stream_write(
        self,
        records: AsyncIterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into file."""
        # Use the default implementation from mixin
        return await self._default_stream_write(records, config)

    async def vector_search(
        self,
        query_vector,
        vector_field: str = "embedding",
        k: int = 10,
        filter=None,
        metric=None,
        **kwargs
    ):
        """Perform vector similarity search using Python calculations.

        Note: This implementation reads all records from disk to perform
        the search locally. For better performance with large datasets,
        consider using SQLite or a dedicated vector database.
        """
        return await self.python_vector_search_async(
            query_vector=query_vector,
            vector_field=vector_field,
            k=k,
            filter=filter,
            metric=metric,
            **kwargs
        )

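    # Call sketch (illustrative): find the 5 records whose "embedding" field is
    # nearest to the query vector, delegating to PythonVectorSearchMixin.
    #
    #     hits = await db.vector_search([0.1, 0.2, 0.3], vector_field="embedding", k=5)
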
    async def close(self) -> None:
        """Close the database and clean up temporary files if needed."""
        # Clean up temporary file if it was created
        if getattr(self, '_is_temp_file', False) and self.filepath:
            try:
                if os.path.exists(self.filepath):
                    Path(self.filepath).unlink()
                # Also remove lock file if it exists
                lock_file = self.filepath + ".lock"
                if os.path.exists(lock_file):
                    Path(lock_file).unlink()
            except OSError:
                pass  # Best effort cleanup

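
# End-to-end sketch for the async backend (illustrative; assumes a Record
# instance "rec" built elsewhere):
#
#     async def demo():
#         db = AsyncFileDatabase({"path": "demo.json"})
#         rid = await db.create(rec)
#         assert await db.exists(rid)
#         print(await db.read(rid))
#         await db.close()
#
#     asyncio.run(demo())
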
class SyncFileDatabase(  # type: ignore[misc]
    SyncDatabase,
    StreamingMixin,
    ConfigurableBase,
    VectorConfigMixin,
    SQLiteVectorSupport,
    PythonVectorSearchMixin,
    BulkEmbedMixin,
    VectorOperationsMixin
):
    """Synchronous file-based database implementation."""

    FORMAT_HANDLERS = {
        ".json": JSONFormat,
        ".csv": CSVFormat,
        ".tsv": CSVFormat,
        ".parquet": ParquetFormat,
        ".pq": ParquetFormat,
    }

    def __init__(self, config: dict[str, Any] | None = None):
        super().__init__(config)

        # If no path specified, use a temporary file instead of polluting CWD
        if "path" not in self.config:
            # Create a unique temporary file that won't conflict
            temp_file = tempfile.NamedTemporaryFile(
                prefix="dataknobs_sync_db_",
                suffix=".json",
                delete=False
            )
            self.filepath = temp_file.name
            temp_file.close()
            self._is_temp_file = True
        else:
            self.filepath = self.config["path"]
            self._is_temp_file = False

        self.format = self.config.get("format")
        self.compression = self.config.get("compression", None)
        self._lock = threading.RLock()
        self._file_lock = FileLock(self.filepath)

        # Detect format from file extension if not specified
        if not self.format:
            path = Path(self.filepath)
            # Check for compression
            if path.suffix == ".gz":
                self.compression = "gzip"
                path = Path(path.stem)

            ext = path.suffix.lower()
            if ext in self.FORMAT_HANDLERS:
                self.format = ext.lstrip(".")
            else:
                self.format = "json"  # Default to JSON

        # Apply compression to filepath if specified
        if self.compression == "gzip" and not self.filepath.endswith(".gz"):
            self.filepath += ".gz"

        # Get the appropriate format handler
        ext = f".{self.format}"
        self.handler = self.FORMAT_HANDLERS.get(ext, JSONFormat)

        # Initialize vector support
        self._parse_vector_config(config or {})
        self._init_vector_state()

    @classmethod
    def from_config(cls, config: dict) -> SyncFileDatabase:
        """Create from config dictionary."""
        return cls(config)

    def _generate_id(self) -> str:
        """Generate a unique ID for a record."""
        return str(uuid.uuid4())

    def _load_data(self) -> dict[str, Record]:
        """Load all data from file."""
        with self._file_lock:
            raw_data = self.handler.load(self.filepath)
            data = {}
            for record_id, record_dict in raw_data.items():
                data[record_id] = Record.from_dict(record_dict)
            return data

    def _save_data(self, data: dict[str, Record]):
        """Save all data to file atomically."""
        # Convert records to dictionaries
        raw_data = {}
        for record_id, record in data.items():
            raw_data[record_id] = record.to_dict(include_metadata=True, flatten=False)

        # Write to temporary file first; keep the ".gz" suffix so the format
        # handler applies the same compression it would for the real path
        temp_fd, temp_path = tempfile.mkstemp(
            dir=os.path.dirname(self.filepath) or ".",
            suffix=".gz" if self.filepath.endswith(".gz") else "",
        )
        os.close(temp_fd)

        try:
            with self._file_lock:
                self.handler.save(temp_path, raw_data)
                # Atomic rename
                os.replace(temp_path, self.filepath)
        except Exception:
            # Clean up temp file on error
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise

    def _do_set_data(self, data: dict[str, Record], record: Record) -> str:
        """Ensure record has a storage ID, set data[id]=record.copy() and return the ID"""
        # Use centralized method to prepare record
        record_copy, storage_id = self._prepare_record_for_storage(record)

        # Store the record
        data[storage_id] = record_copy
        return storage_id

    def create(self, record: Record) -> str:
        """Create a new record in the file."""
        with self._lock:
            data = self._load_data()
            # Use record's ID if it has one, otherwise generate a new one
            record_id = self._do_set_data(data, record)
            self._save_data(data)
            return record_id

    def read(self, id: str) -> Record | None:
        """Read a record from the file."""
        with self._lock:
            data = self._load_data()
            record = data.get(id)
            # Use centralized method to prepare record
            return self._prepare_record_from_storage(record, id)

    def update(self, id: str, record: Record) -> bool:
        """Update a record in the file."""
        with self._lock:
            data = self._load_data()
            if id in data:
                data[id] = record.copy(deep=True)
                self._save_data(data)
                return True
            return False

    def delete(self, id: str) -> bool:
        """Delete a record from the file."""
        with self._lock:
            data = self._load_data()
            if id in data:
                del data[id]
                self._save_data(data)
                return True
            return False

    def exists(self, id: str) -> bool:
        """Check if a record exists in the file."""
        with self._lock:
            data = self._load_data()
            return id in data

    def upsert(self, id_or_record: str | Record, record: Record | None = None) -> str:
        """Update or insert a record.

        Can be called as:
        - upsert(id, record) - explicit ID and record
        - upsert(record) - extract ID from record using Record's built-in logic
        """
        # Determine ID and record based on arguments
        if isinstance(id_or_record, str):
            id = id_or_record
            if record is None:
                raise ValueError("Record required when ID is provided")
        else:
            record = id_or_record
            id = record.id
            if id is None:
                import uuid  # type: ignore[unreachable]

                id = str(uuid.uuid4())
                record.storage_id = id

        with self._lock:
            data = self._load_data()
            data[id] = record.copy(deep=True)
            self._save_data(data)
            return id

    def search(self, query: Query) -> list[Record]:
        """Search for records matching the query."""
        with self._lock:
            data = self._load_data()
            results = []

            for record_id, record in data.items():
                # Apply filters
                matches = True
                for filter in query.filters:
                    # Special handling for 'id' field
                    if filter.field == 'id':
                        field_value = record_id
                    else:
                        field_value = record.get_value(filter.field)
                    if not filter.matches(field_value):
                        matches = False
                        break

                if matches:
                    results.append((record_id, record))

            # Use the helper method from base class
            return self._process_search_results(results, query, deep_copy=True)

    def _count_all(self) -> int:
        """Count all records in the file."""
        with self._lock:
            data = self._load_data()
            return len(data)

    def clear(self) -> int:
        """Clear all records from the file."""
        with self._lock:
            data = self._load_data()
            count = len(data)
            self._save_data({})
            return count

    def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            ids = []
            for record in records:
                record_id = self._do_set_data(data, record)
                ids.append(record_id)
            self._save_data(data)
            return ids

    def read_batch(self, ids: list[str]) -> list[Record | None]:
        """Read multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            results = []
            for record_id in ids:
                record = data.get(record_id)
                results.append(record.copy(deep=True) if record else None)
            return results

    def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            results = []
            modified = False
            for record_id in ids:
                if record_id in data:
                    del data[record_id]
                    results.append(True)
                    modified = True
                else:
                    results.append(False)

            if modified:
                self._save_data(data)

            return results

    def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> Iterator[Record]:
        """Stream records from file."""
        # For file backend, we can use the default implementation
        # since we need to load all data anyway
        config = config or StreamConfig()

        # Use search to get all matching records
        if query:
            records = self.search(query)
        else:
            records = self.search(Query())

        # Yield records in batches for consistency
        for i in range(0, len(records), config.batch_size):
            batch = records[i:i + config.batch_size]
            for record in batch:
                yield record

    def stream_write(
        self,
        records: Iterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into file."""
        # Batch incoming records and write them via create_batch, retrying
        # failed batches one record at a time
        config = config or StreamConfig()
        result = StreamResult()
        start_time = time.time()
        quitting = False

        def do_write_batch(batch: list) -> bool:
            """Write batch with individual retries, return False to quit"""
            retval = True
            try:
                ids = self.create_batch(batch)
                result.successful += len(ids)
                result.total_processed += len(batch)
            except Exception:
                # Try creating each item again and catch specific error items
                for rec in batch:
                    result.total_processed += 1
                    try:
                        self.create(rec)
                        result.successful += 1
                    except Exception as e:
                        # This item failed again
                        result.failed += 1
                        result.add_error(None, e)
                        if config.on_error:
                            if not config.on_error(e, rec):
                                retval = False
                                break
                        else:
                            # Without "on_error", quit streaming
                            retval = False
                            break
            return retval

        batch = []
        for record in records:
            batch.append(record)

            if len(batch) >= config.batch_size:
                # Write batch
                quitting = not do_write_batch(batch)
                if quitting:
                    # Got signal to quit
                    break
                batch = []

        # Write remaining batch
        if batch and not quitting:
            do_write_batch(batch)

        result.duration = time.time() - start_time
        return result

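    # stream_write sketch (illustrative; assumes StreamConfig accepts batch_size
    # and on_error as constructor arguments): returning False from on_error stops
    # the stream, otherwise failed records are counted and skipped.
    #
    #     result = db.stream_write(
    #         iter(records),
    #         StreamConfig(batch_size=100, on_error=lambda exc, rec: True),
    #     )
    #     print(result.successful, result.failed)
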
    def vector_search(
        self,
        query_vector,
        vector_field: str = "embedding",
        k: int = 10,
        filter=None,
        metric=None,
        **kwargs
    ):
        """Perform vector similarity search using Python calculations.

        Note: This implementation reads all records from disk to perform
        the search locally. For better performance with large datasets,
        consider using SQLite or a dedicated vector database.
        """
        return self.python_vector_search_sync(
            query_vector=query_vector,
            vector_field=vector_field,
            k=k,
            filter=filter,
            metric=metric,
            **kwargs
        )

    def close(self) -> None:
        """Close the database and clean up temporary files if needed."""
        # Clean up temporary file if it was created
        if getattr(self, '_is_temp_file', False) and self.filepath:
            try:
                if os.path.exists(self.filepath):
                    Path(self.filepath).unlink()
                # Also remove lock file if it exists
                lock_file = self.filepath + ".lock"
                if os.path.exists(lock_file):
                    Path(lock_file).unlink()
            except OSError:
                pass  # Best effort cleanup

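
# Sync usage sketch (illustrative; assumes a Record instance "rec" built elsewhere):
#
#     db = SyncFileDatabase({"path": "demo.csv", "compression": "gzip"})
#     rid = db.create(rec)
#     print(db.read(rid))
#     print(db.search(Query()))   # no filters -> all records
#     db.close()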