Coverage for src/dataknobs_data/backends/file.py: 17%
578 statements
coverage.py v7.10.3, created at 2025-08-31 15:06 -0600

1"""File-based database backend implementation."""
3from __future__ import annotations
5import asyncio
6import csv
7import gzip
8import json
9import os
10import platform
11import tempfile
12import threading
13import time
14import uuid
15from pathlib import Path
16from typing import Any, TYPE_CHECKING
18from dataknobs_config import ConfigurableBase
20from ..database import AsyncDatabase, SyncDatabase
21from ..query import Query
22from ..records import Record
23from ..streaming import AsyncStreamingMixin, StreamConfig, StreamingMixin, StreamResult
24from ..vector import VectorOperationsMixin
25from ..vector.bulk_embed_mixin import BulkEmbedMixin
26from ..vector.python_vector_search import PythonVectorSearchMixin
27from .sqlite_mixins import SQLiteVectorSupport
28from .vector_config_mixin import VectorConfigMixin
30if TYPE_CHECKING:
31 from collections.abc import AsyncIterator, Iterator
34class FileLock:
35 """Cross-platform file locking."""
37 def __init__(self, filepath: str):
38 self.filepath = filepath
39 self.lockfile = filepath + ".lock"
40 self.lock_handle = None
42 def acquire(self):
43 """Acquire the file lock."""
44 if platform.system() == "Windows":
45 import msvcrt
47 while True:
48 try:
49 self.lock_handle = open(self.lockfile, "wb")
50 msvcrt.locking(self.lock_handle.fileno(), msvcrt.LK_NBLCK, 1)
51 break
52 except OSError:
53 if self.lock_handle:
54 self.lock_handle.close() # type: ignore[unreachable]
55 import time
57 time.sleep(0.01)
58 else:
59 import fcntl
61 self.lock_handle = open(self.lockfile, "wb")
62 fcntl.lockf(self.lock_handle, fcntl.LOCK_EX)
64 def release(self):
65 """Release the file lock."""
66 if self.lock_handle:
67 if platform.system() == "Windows": # type: ignore[unreachable]
68 import msvcrt
70 try:
71 msvcrt.locking(self.lock_handle.fileno(), msvcrt.LK_UNLCK, 1)
72 except OSError:
73 pass
74 self.lock_handle.close()
75 try:
76 os.remove(self.lockfile)
77 except (OSError, FileNotFoundError):
78 pass
80 def __enter__(self):
81 self.acquire()
82 return self
84 def __exit__(self, exc_type, exc_val, exc_tb):
85 self.release()
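

# A minimal usage sketch (illustrative, not part of the original module;
# the path is hypothetical): FileLock serializes access to a shared file
# across processes via its context-manager protocol.
def _example_filelock_usage() -> None:
    lock = FileLock("example_data.json")
    with lock:  # acquire() on entry, release() on exit
        pass  # read or write example_data.json while holding the lock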


class FileFormat:
    """Base class for file format handlers."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from file."""
        raise NotImplementedError

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to file."""
        raise NotImplementedError


class JSONFormat(FileFormat):
    """JSON file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from JSON file."""
        if not os.path.exists(filepath):
            return {}

        # Check if file is empty
        if os.path.getsize(filepath) == 0:
            return {}

        try:
            if filepath.endswith(".gz"):
                try:
                    with gzip.open(filepath, "rt", encoding="utf-8") as f:
                        content = f.read()
                except (gzip.BadGzipFile, OSError):
                    # File has .gz extension but isn't gzipped; treat as a regular file
                    with open(filepath, encoding="utf-8") as f:
                        content = f.read()
            else:
                with open(filepath, encoding="utf-8") as f:
                    content = f.read()

            if not content.strip():
                return {}
            return json.loads(content)
        except json.JSONDecodeError:
            return {}

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to JSON file."""
        if filepath.endswith(".gz"):
            with gzip.open(filepath, "wt", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        else:
            with open(filepath, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
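

# A round-trip sketch (illustrative; the filename is hypothetical).
# JSONFormat stores the whole database as a single JSON object keyed by
# record ID; a ".gz" suffix switches save/load to gzip transparently.
def _example_jsonformat_roundtrip() -> None:
    records = {"abc-123": {"fields": {"name": "Ada", "age": 36}}}
    JSONFormat.save("example.json.gz", records)
    assert JSONFormat.load("example.json.gz") == records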


class CSVFormat(FileFormat):
    """CSV file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from CSV file."""
        if not os.path.exists(filepath):
            return {}

        # Check if file is empty
        if os.path.getsize(filepath) == 0:
            return {}

        data = {}
        try:
            if filepath.endswith(".gz"):
                f = gzip.open(filepath, "rt", encoding="utf-8")
            else:
                f = open(filepath, encoding="utf-8")
            with f:
                reader = csv.DictReader(f)
                for row in reader:
                    if "__id__" not in row:
                        continue
                    record_id = row.pop("__id__")
                    # Deserialize JSON strings back to objects where possible
                    fields = {}
                    for key, value in row.items():
                        if value and isinstance(value, str) and (
                            (value.startswith("{") and value.endswith("}"))
                            or (value.startswith("[") and value.endswith("]"))
                        ):
                            # Looks like JSON; fall back to the raw string on parse errors
                            try:
                                fields[key] = json.loads(value)
                            except json.JSONDecodeError:
                                fields[key] = value
                        else:
                            fields[key] = value
                    data[record_id] = {"fields": fields}
        except (OSError, csv.Error):
            return {}

        return data

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to CSV file."""
        is_gzip = filepath.endswith(".gz")

        if not data:
            if is_gzip:
                with gzip.open(filepath, "wt", encoding="utf-8") as f:
                    f.write("")
            else:
                with open(filepath, "w", encoding="utf-8") as f:
                    f.write("")
            return

        # Extract all field names and prepare flattened data
        all_fields = set()
        flattened_data = {}
        for record_id, record_data in data.items():
            if "fields" in record_data:
                # Flatten field values for CSV format
                flat_fields = {}
                for field_name, field_data in record_data["fields"].items():
                    # Handle both full field dicts and simple values
                    if isinstance(field_data, dict) and "value" in field_data:
                        value = field_data["value"]
                    else:
                        value = field_data

                    # Serialize complex types as JSON strings
                    if isinstance(value, (dict, list)):
                        flat_fields[field_name] = json.dumps(value)
                    else:
                        flat_fields[field_name] = value
                    all_fields.add(field_name)
                flattened_data[record_id] = flat_fields

        fieldnames = ["__id__"] + sorted(all_fields)

        if is_gzip:
            f = gzip.open(filepath, "wt", encoding="utf-8", newline="")
        else:
            f = open(filepath, "w", encoding="utf-8", newline="")
        with f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for record_id, fields in flattened_data.items():
                row = {"__id__": record_id}
                row.update(fields)
                writer.writerow(row)
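

# A round-trip sketch (illustrative; the filename is hypothetical). Nested
# dicts and lists are stored as JSON strings inside CSV cells and parsed
# back into objects on load; plain strings pass through unchanged.
def _example_csvformat_roundtrip() -> None:
    records = {"r1": {"fields": {"name": "Ada", "tags": ["math", "code"]}}}
    CSVFormat.save("example.csv", records)
    loaded = CSVFormat.load("example.csv")
    assert loaded["r1"]["fields"]["tags"] == ["math", "code"]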


class ParquetFormat(FileFormat):
    """Parquet file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from Parquet file."""
        if not os.path.exists(filepath):
            return {}

        try:
            import pandas as pd

            df = pd.read_parquet(filepath)
            data = {}

            for idx, row in df.iterrows():
                row_dict = row.to_dict()
                if "__id__" in row_dict:
                    record_id = row_dict.pop("__id__")
                else:
                    record_id = str(idx)

                # Remove NaN values
                fields = {k: v for k, v in row_dict.items() if pd.notna(v)}
                data[record_id] = {"fields": fields}

            return data
        except ImportError as e:
            raise ImportError("Parquet support requires pandas and pyarrow packages") from e

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to Parquet file."""
        try:
            import pandas as pd

            if not data:
                # Create empty DataFrame
                df = pd.DataFrame()
            else:
                rows = []
                for record_id, record_data in data.items():
                    row = {"__id__": record_id}
                    if "fields" in record_data:
                        # Flatten field values for Parquet format
                        for field_name, field_data in record_data["fields"].items():
                            # Handle both full field dicts and simple values
                            if isinstance(field_data, dict) and "value" in field_data:
                                row[field_name] = field_data["value"]
                            else:
                                row[field_name] = field_data
                    rows.append(row)

                df = pd.DataFrame(rows)

            df.to_parquet(filepath, index=False, compression="snappy")
        except ImportError as e:
            raise ImportError("Parquet support requires pandas and pyarrow packages") from e
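

# An optional-dependency sketch (illustrative; paths are hypothetical).
# ParquetFormat re-raises ImportError when pandas/pyarrow are missing, so
# callers can catch it and fall back to another handler.
def _example_parquet_save(records: dict[str, dict[str, Any]]) -> None:
    try:
        ParquetFormat.save("example.parquet", records)
    except ImportError:
        # pandas/pyarrow not installed; fall back to JSON
        JSONFormat.save("example.json", records)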


class AsyncFileDatabase(  # type: ignore[misc]
    AsyncDatabase,
    AsyncStreamingMixin,
    ConfigurableBase,
    VectorConfigMixin,
    SQLiteVectorSupport,
    PythonVectorSearchMixin,
    BulkEmbedMixin,
    VectorOperationsMixin
):
    """Async file-based database implementation."""

    FORMAT_HANDLERS = {
        ".json": JSONFormat,
        ".csv": CSVFormat,
        ".tsv": CSVFormat,
        ".parquet": ParquetFormat,
        ".pq": ParquetFormat,
    }

    def __init__(self, config: dict[str, Any] | None = None):
        super().__init__(config)
        self.filepath = self.config.get("path", "data.json")
        self.format = self.config.get("format")
        self.compression = self.config.get("compression", None)
        self._lock = asyncio.Lock()

        # Detect format from file extension if not specified
        if not self.format:
            path = Path(self.filepath)
            # Check for compression
            if path.suffix == ".gz":
                self.compression = "gzip"
                path = Path(path.stem)

            ext = path.suffix.lower()
            if ext in self.FORMAT_HANDLERS:
                self.format = ext.lstrip(".")
            else:
                self.format = "json"  # Default to JSON

        # Apply compression to filepath if specified
        if self.compression == "gzip" and not self.filepath.endswith(".gz"):
            self.filepath += ".gz"

        # Lock against the final path (after any ".gz" suffix is applied)
        self._file_lock = FileLock(self.filepath)

        # Get the appropriate format handler
        ext = f".{self.format}"
        self.handler = self.FORMAT_HANDLERS.get(ext, JSONFormat)

        # Initialize vector support
        self._parse_vector_config(config or {})
        self._init_vector_state()

    @classmethod
    def from_config(cls, config: dict) -> AsyncFileDatabase:
        """Create from config dictionary."""
        return cls(config)

    def _generate_id(self) -> str:
        """Generate a unique ID for a record."""
        return str(uuid.uuid4())

    async def _load_data(self) -> dict[str, Record]:
        """Load all data from file."""
        with self._file_lock:
            raw_data = self.handler.load(self.filepath)
        data = {}
        for record_id, record_dict in raw_data.items():
            data[record_id] = Record.from_dict(record_dict)
        return data

    async def _save_data(self, data: dict[str, Record]):
        """Save all data to file atomically."""
        # Convert records to dictionaries
        raw_data = {}
        for record_id, record in data.items():
            raw_data[record_id] = record.to_dict(include_metadata=True, flatten=False)

        # Write to a temporary file first; match the target's suffix so the
        # format handler applies gzip compression when the target ends in ".gz"
        suffix = ".gz" if self.filepath.endswith(".gz") else ""
        temp_fd, temp_path = tempfile.mkstemp(
            dir=os.path.dirname(self.filepath) or ".", suffix=suffix
        )
        os.close(temp_fd)

        try:
            with self._file_lock:
                self.handler.save(temp_path, raw_data)
                # Atomic rename
                os.replace(temp_path, self.filepath)
        except Exception:
            # Clean up temp file on error
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise

    async def create(self, record: Record) -> str:
        """Create a new record in the file."""
        async with self._lock:
            data = await self._load_data()
            # Use centralized method to prepare record
            record_copy, storage_id = self._prepare_record_for_storage(record)
            data[storage_id] = record_copy
            await self._save_data(data)
            return storage_id

    async def read(self, id: str) -> Record | None:
        """Read a record from the file."""
        async with self._lock:
            data = await self._load_data()
            record = data.get(id)
            # Use centralized method to prepare record
            return self._prepare_record_from_storage(record, id)

    async def update(self, id: str, record: Record) -> bool:
        """Update a record in the file."""
        async with self._lock:
            data = await self._load_data()
            if id in data:
                data[id] = record.copy(deep=True)
                await self._save_data(data)
                return True
            return False

    async def delete(self, id: str) -> bool:
        """Delete a record from the file."""
        async with self._lock:
            data = await self._load_data()
            if id in data:
                del data[id]
                await self._save_data(data)
                return True
            return False

    async def exists(self, id: str) -> bool:
        """Check if a record exists in the file."""
        async with self._lock:
            data = await self._load_data()
            return id in data

    async def upsert(self, id: str, record: Record) -> str:
        """Update or insert a record with the specified ID."""
        async with self._lock:
            data = await self._load_data()
            data[id] = record.copy(deep=True)
            await self._save_data(data)
            return id

    async def search(self, query: Query) -> list[Record]:
        """Search for records matching the query."""
        async with self._lock:
            data = await self._load_data()
            results = []

            for record_id, record in data.items():
                # Apply filters
                matches = True
                for filter in query.filters:
                    field_value = record.get_value(filter.field)
                    if not filter.matches(field_value):
                        matches = False
                        break

                if matches:
                    results.append((record_id, record))

            # Use the helper method from base class
            return self._process_search_results(results, query, deep_copy=True)

    async def _count_all(self) -> int:
        """Count all records in the file."""
        async with self._lock:
            data = await self._load_data()
            return len(data)

    async def clear(self) -> int:
        """Clear all records from the file."""
        async with self._lock:
            data = await self._load_data()
            count = len(data)
            await self._save_data({})
            return count

    async def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            ids = []
            for record in records:
                record_id = self._generate_id()
                data[record_id] = record.copy(deep=True)
                ids.append(record_id)
            await self._save_data(data)
            return ids

    async def read_batch(self, ids: list[str]) -> list[Record | None]:
        """Read multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            results = []
            for record_id in ids:
                record = data.get(record_id)
                results.append(record.copy(deep=True) if record else None)
            return results

    async def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            results = []
            modified = False
            for record_id in ids:
                if record_id in data:
                    del data[record_id]
                    results.append(True)
                    modified = True
                else:
                    results.append(False)

            if modified:
                await self._save_data(data)

            return results

    async def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> AsyncIterator[Record]:
        """Stream records from file."""
        # For the file backend the default implementation is fine,
        # since we need to load all data anyway
        config = config or StreamConfig()

        # Use search to get all matching records
        if query:
            records = await self.search(query)
        else:
            records = await self.search(Query())

        # Yield records in batches for consistency
        for i in range(0, len(records), config.batch_size):
            batch = records[i:i + config.batch_size]
            for record in batch:
                yield record

    async def stream_write(
        self,
        records: AsyncIterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into file."""
        # Use the default implementation from the mixin
        return await self._default_stream_write(records, config)

    async def vector_search(
        self,
        query_vector,
        vector_field: str = "embedding",
        k: int = 10,
        filter=None,
        metric=None,
        **kwargs
    ):
        """Perform vector similarity search using Python calculations.

        Note: This implementation reads all records from disk and performs
        the search locally. For better performance with large datasets,
        consider using SQLite or a dedicated vector database.
        """
        return await self.python_vector_search_async(
            query_vector=query_vector,
            vector_field=vector_field,
            k=k,
            filter=filter,
            metric=metric,
            **kwargs
        )
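

# A usage sketch (illustrative; the config values and record fields are
# hypothetical, and Record.from_dict is assumed to accept the same
# {"fields": ...} shape the format handlers produce):
async def _example_async_usage() -> None:
    db = AsyncFileDatabase({"path": "people.json", "compression": "gzip"})
    record_id = await db.create(Record.from_dict({"fields": {"name": "Ada"}}))
    assert await db.exists(record_id)
    await db.delete(record_id)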


class SyncFileDatabase(  # type: ignore[misc]
    SyncDatabase,
    StreamingMixin,
    ConfigurableBase,
    VectorConfigMixin,
    SQLiteVectorSupport,
    PythonVectorSearchMixin,
    BulkEmbedMixin,
    VectorOperationsMixin
):
    """Synchronous file-based database implementation."""

    FORMAT_HANDLERS = {
        ".json": JSONFormat,
        ".csv": CSVFormat,
        ".tsv": CSVFormat,
        ".parquet": ParquetFormat,
        ".pq": ParquetFormat,
    }

    def __init__(self, config: dict[str, Any] | None = None):
        super().__init__(config)
        self.filepath = self.config.get("path", "data.json")
        self.format = self.config.get("format")
        self.compression = self.config.get("compression", None)
        self._lock = threading.RLock()

        # Detect format from file extension if not specified
        if not self.format:
            path = Path(self.filepath)
            # Check for compression
            if path.suffix == ".gz":
                self.compression = "gzip"
                path = Path(path.stem)

            ext = path.suffix.lower()
            if ext in self.FORMAT_HANDLERS:
                self.format = ext.lstrip(".")
            else:
                self.format = "json"  # Default to JSON

        # Apply compression to filepath if specified
        if self.compression == "gzip" and not self.filepath.endswith(".gz"):
            self.filepath += ".gz"

        # Lock against the final path (after any ".gz" suffix is applied)
        self._file_lock = FileLock(self.filepath)

        # Get the appropriate format handler
        ext = f".{self.format}"
        self.handler = self.FORMAT_HANDLERS.get(ext, JSONFormat)

        # Initialize vector support
        self._parse_vector_config(config or {})
        self._init_vector_state()

    @classmethod
    def from_config(cls, config: dict) -> SyncFileDatabase:
        """Create from config dictionary."""
        return cls(config)

    def _generate_id(self) -> str:
        """Generate a unique ID for a record."""
        return str(uuid.uuid4())

    def _load_data(self) -> dict[str, Record]:
        """Load all data from file."""
        with self._file_lock:
            raw_data = self.handler.load(self.filepath)
        data = {}
        for record_id, record_dict in raw_data.items():
            data[record_id] = Record.from_dict(record_dict)
        return data

    def _save_data(self, data: dict[str, Record]):
        """Save all data to file atomically."""
        # Convert records to dictionaries
        raw_data = {}
        for record_id, record in data.items():
            raw_data[record_id] = record.to_dict(include_metadata=True, flatten=False)

        # Write to a temporary file first; match the target's suffix so the
        # format handler applies gzip compression when the target ends in ".gz"
        suffix = ".gz" if self.filepath.endswith(".gz") else ""
        temp_fd, temp_path = tempfile.mkstemp(
            dir=os.path.dirname(self.filepath) or ".", suffix=suffix
        )
        os.close(temp_fd)

        try:
            with self._file_lock:
                self.handler.save(temp_path, raw_data)
                # Atomic rename
                os.replace(temp_path, self.filepath)
        except Exception:
            # Clean up temp file on error
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise

    def _do_set_data(self, data: dict[str, Record], record: Record) -> str:
        """Ensure record has a storage ID, set data[id] to a record copy, and return the ID."""
        # Use centralized method to prepare record
        record_copy, storage_id = self._prepare_record_for_storage(record)

        # Store the record
        data[storage_id] = record_copy
        return storage_id

    def create(self, record: Record) -> str:
        """Create a new record in the file."""
        with self._lock:
            data = self._load_data()
            # Use record's ID if it has one, otherwise generate a new one
            record_id = self._do_set_data(data, record)
            self._save_data(data)
            return record_id

    def read(self, id: str) -> Record | None:
        """Read a record from the file."""
        with self._lock:
            data = self._load_data()
            record = data.get(id)
            # Use centralized method to prepare record
            return self._prepare_record_from_storage(record, id)

    def update(self, id: str, record: Record) -> bool:
        """Update a record in the file."""
        with self._lock:
            data = self._load_data()
            if id in data:
                data[id] = record.copy(deep=True)
                self._save_data(data)
                return True
            return False

    def delete(self, id: str) -> bool:
        """Delete a record from the file."""
        with self._lock:
            data = self._load_data()
            if id in data:
                del data[id]
                self._save_data(data)
                return True
            return False

    def exists(self, id: str) -> bool:
        """Check if a record exists in the file."""
        with self._lock:
            data = self._load_data()
            return id in data

    def upsert(self, id: str, record: Record) -> str:
        """Update or insert a record with the specified ID."""
        with self._lock:
            data = self._load_data()
            data[id] = record.copy(deep=True)
            self._save_data(data)
            return id

    def search(self, query: Query) -> list[Record]:
        """Search for records matching the query."""
        with self._lock:
            data = self._load_data()
            results = []

            for record_id, record in data.items():
                # Apply filters
                matches = True
                for filter in query.filters:
                    field_value = record.get_value(filter.field)
                    if not filter.matches(field_value):
                        matches = False
                        break

                if matches:
                    results.append((record_id, record))

            # Use the helper method from base class
            return self._process_search_results(results, query, deep_copy=True)

    def _count_all(self) -> int:
        """Count all records in the file."""
        with self._lock:
            data = self._load_data()
            return len(data)

    def clear(self) -> int:
        """Clear all records from the file."""
        with self._lock:
            data = self._load_data()
            count = len(data)
            self._save_data({})
            return count

    def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            ids = []
            for record in records:
                record_id = self._do_set_data(data, record)
                ids.append(record_id)
            self._save_data(data)
            return ids

    def read_batch(self, ids: list[str]) -> list[Record | None]:
        """Read multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            results = []
            for record_id in ids:
                record = data.get(record_id)
                results.append(record.copy(deep=True) if record else None)
            return results

    def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            results = []
            modified = False
            for record_id in ids:
                if record_id in data:
                    del data[record_id]
                    results.append(True)
                    modified = True
                else:
                    results.append(False)

            if modified:
                self._save_data(data)

            return results

    def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> Iterator[Record]:
        """Stream records from file."""
        # For the file backend the default implementation is fine,
        # since we need to load all data anyway
        config = config or StreamConfig()

        # Use search to get all matching records
        if query:
            records = self.search(query)
        else:
            records = self.search(Query())

        # Yield records in batches for consistency
        for i in range(0, len(records), config.batch_size):
            batch = records[i:i + config.batch_size]
            for record in batch:
                yield record

    def stream_write(
        self,
        records: Iterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into file."""
        # Inline implementation: write in batches, retrying a failed batch
        # one record at a time so a single bad record doesn't lose the batch
        config = config or StreamConfig()
        result = StreamResult()
        start_time = time.time()
        quitting = False

        def do_write_batch(batch: list) -> bool:
            """Write batch with individual retries; return False to quit."""
            retval = True
            try:
                ids = self.create_batch(batch)
                result.successful += len(ids)
                result.total_processed += len(batch)
            except Exception:
                # Try creating each item individually and catch specific failures
                for rec in batch:
                    result.total_processed += 1
                    try:
                        self.create(rec)
                        result.successful += 1
                    except Exception as e:
                        # This item failed again
                        result.failed += 1
                        result.add_error(None, e)
                        if config.on_error:
                            if not config.on_error(e, rec):
                                retval = False
                                break
                        else:
                            # Without "on_error", quit streaming
                            retval = False
                            break
            return retval

        batch = []
        for record in records:
            batch.append(record)

            if len(batch) >= config.batch_size:
                # Write batch
                quitting = not do_write_batch(batch)
                if quitting:
                    # Got signal to quit
                    break
                batch = []

        # Write remaining batch
        if batch and not quitting:
            do_write_batch(batch)

        result.duration = time.time() - start_time
        return result

    def vector_search(
        self,
        query_vector,
        vector_field: str = "embedding",
        k: int = 10,
        filter=None,
        metric=None,
        **kwargs
    ):
        """Perform vector similarity search using Python calculations.

        Note: This implementation reads all records from disk and performs
        the search locally. For better performance with large datasets,
        consider using SQLite or a dedicated vector database.
        """
        return self.python_vector_search_sync(
            query_vector=query_vector,
            vector_field=vector_field,
            k=k,
            filter=filter,
            metric=metric,
            **kwargs
        )
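

# A streaming sketch (illustrative; the path and field names are
# hypothetical, and StreamConfig is assumed to accept batch_size as a
# constructor argument). stream_write batches the iterator into the file,
# retrying failed batches one record at a time, and reports counts in the
# returned StreamResult.
def _example_sync_stream() -> None:
    db = SyncFileDatabase({"path": "events.csv"})
    records = (Record.from_dict({"fields": {"n": i}}) for i in range(100))
    result = db.stream_write(records, StreamConfig(batch_size=25))
    print(result.successful, result.failed, result.duration)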