Coverage for src/dataknobs_data/backends/file.py: 17%

578 statements  

coverage.py v7.10.3, created at 2025-08-31 15:06 -0600

1"""File-based database backend implementation.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import csv 

7import gzip 

8import json 

9import os 

10import platform 

11import tempfile 

12import threading 

13import time 

14import uuid 

15from pathlib import Path 

16from typing import Any, TYPE_CHECKING 

17 

18from dataknobs_config import ConfigurableBase 

19 

20from ..database import AsyncDatabase, SyncDatabase 

21from ..query import Query 

22from ..records import Record 

23from ..streaming import AsyncStreamingMixin, StreamConfig, StreamingMixin, StreamResult 

24from ..vector import VectorOperationsMixin 

25from ..vector.bulk_embed_mixin import BulkEmbedMixin 

26from ..vector.python_vector_search import PythonVectorSearchMixin 

27from .sqlite_mixins import SQLiteVectorSupport 

28from .vector_config_mixin import VectorConfigMixin 

29 

30if TYPE_CHECKING: 

31 from collections.abc import AsyncIterator, Iterator 

32 

33 

class FileLock:
    """Cross-platform file locking."""

    def __init__(self, filepath: str):
        self.filepath = filepath
        self.lockfile = filepath + ".lock"
        self.lock_handle = None

    def acquire(self):
        """Acquire the file lock."""
        if platform.system() == "Windows":
            import msvcrt

            while True:
                try:
                    self.lock_handle = open(self.lockfile, "wb")
                    msvcrt.locking(self.lock_handle.fileno(), msvcrt.LK_NBLCK, 1)
                    break
                except OSError:
                    if self.lock_handle:
                        self.lock_handle.close()  # type: ignore[unreachable]

                    time.sleep(0.01)

        else:
            import fcntl

            self.lock_handle = open(self.lockfile, "wb")
            fcntl.lockf(self.lock_handle, fcntl.LOCK_EX)

    def release(self):
        """Release the file lock."""
        if self.lock_handle:
            if platform.system() == "Windows":  # type: ignore[unreachable]
                import msvcrt

                try:
                    msvcrt.locking(self.lock_handle.fileno(), msvcrt.LK_UNLCK, 1)
                except OSError:
                    pass
            self.lock_handle.close()
            try:
                os.remove(self.lockfile)
            except (OSError, FileNotFoundError):
                pass

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
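# Usage sketch (editor's illustration, not part of the original module): FileLock is
# intended to wrap any read or write of the backing file as a context manager, blocking
# until the sidecar ".lock" file can be acquired. The path below is hypothetical.
#
#     with FileLock("data.json"):
#         ...  # read or rewrite data.json while other processes are locked out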

class FileFormat:
    """Base class for file format handlers."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from file."""
        raise NotImplementedError

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to file."""
        raise NotImplementedError
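# Editor's note on the data shape: the handlers exchange a mapping of record IDs to
# record dictionaries, and the CSV and Parquet handlers read and write field values
# nested under a "fields" key. A minimal illustrative payload (names and values are
# hypothetical) looks like:
#
#     {"a1b2": {"fields": {"name": "Ada", "tags": ["math"]}}}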

class JSONFormat(FileFormat):
    """JSON file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from JSON file."""
        if not os.path.exists(filepath):
            return {}

        # Check if file is empty
        if os.path.getsize(filepath) == 0:
            return {}

        try:
            if filepath.endswith(".gz"):
                try:
                    with gzip.open(filepath, "rt", encoding="utf-8") as f:
                        content = f.read()
                        if not content.strip():
                            return {}
                        data = json.loads(content)
                except (gzip.BadGzipFile, OSError):
                    # File has .gz extension but isn't gzipped, treat as regular file
                    with open(filepath, encoding="utf-8") as f:
                        content = f.read()
                        if not content.strip():
                            return {}
                        data = json.loads(content)
            else:
                with open(filepath, encoding="utf-8") as f:
                    content = f.read()
                    if not content.strip():
                        return {}
                    data = json.loads(content)

            return data
        except json.JSONDecodeError:
            return {}

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to JSON file."""
        if filepath.endswith(".gz"):
            with gzip.open(filepath, "wt", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        else:
            with open(filepath, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
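# Round-trip sketch (editor's illustration, not part of the original module): JSONFormat
# reads and writes the whole mapping at once; the path and payload below are hypothetical.
#
#     JSONFormat.save("example.json", {"id-1": {"fields": {"name": "Ada"}}})
#     assert JSONFormat.load("example.json")["id-1"]["fields"]["name"] == "Ada"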

class CSVFormat(FileFormat):
    """CSV file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from CSV file."""
        if not os.path.exists(filepath):
            return {}

        # Check if file is empty
        if os.path.getsize(filepath) == 0:
            return {}

        data = {}
        try:
            if filepath.endswith(".gz"):
                with gzip.open(filepath, "rt", encoding="utf-8") as f:
                    reader = csv.DictReader(f)
                    for row in reader:
                        if "__id__" in row:
                            record_id = row.pop("__id__")
                            # Try to deserialize JSON strings back to objects
                            fields = {}
                            for key, value in row.items():
                                if value and isinstance(value, str):
                                    # Try to parse as JSON if it looks like JSON
                                    if (value.startswith('{') and value.endswith('}')) or \
                                       (value.startswith('[') and value.endswith(']')):
                                        try:
                                            fields[key] = json.loads(value)
                                        except json.JSONDecodeError:
                                            fields[key] = value
                                    else:
                                        fields[key] = value
                                else:
                                    fields[key] = value
                            data[record_id] = {"fields": fields}
            else:
                with open(filepath, encoding="utf-8") as f:
                    reader = csv.DictReader(f)
                    for row in reader:
                        if "__id__" in row:
                            record_id = row.pop("__id__")
                            # Try to deserialize JSON strings back to objects
                            fields = {}
                            for key, value in row.items():
                                if value and isinstance(value, str):
                                    # Try to parse as JSON if it looks like JSON
                                    if (value.startswith('{') and value.endswith('}')) or \
                                       (value.startswith('[') and value.endswith(']')):
                                        try:
                                            fields[key] = json.loads(value)
                                        except json.JSONDecodeError:
                                            fields[key] = value
                                    else:
                                        fields[key] = value
                                else:
                                    fields[key] = value
                            data[record_id] = {"fields": fields}
        except (OSError, csv.Error):
            return {}

        return data

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to CSV file."""
        if not data:
            if filepath.endswith(".gz"):
                with gzip.open(filepath, "wt", encoding="utf-8") as f:
                    f.write("")
            else:
                with open(filepath, "w", encoding="utf-8") as f:
                    f.write("")
            return

        # Extract all field names and prepare flattened data
        all_fields = set()
        flattened_data = {}
        for record_id, record_data in data.items():
            if "fields" in record_data:
                # Flatten field values for CSV format
                flat_fields = {}
                for field_name, field_data in record_data["fields"].items():
                    # Handle both full field dicts and simple values
                    if isinstance(field_data, dict) and "value" in field_data:
                        value = field_data["value"]
                    else:
                        value = field_data

                    # Serialize complex types as JSON strings
                    if isinstance(value, (dict, list)):
                        flat_fields[field_name] = json.dumps(value)
                    else:
                        flat_fields[field_name] = value
                    all_fields.add(field_name)
                flattened_data[record_id] = flat_fields

        fieldnames = ["__id__"] + sorted(list(all_fields))

        if filepath.endswith(".gz"):
            with gzip.open(filepath, "wt", encoding="utf-8", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                for record_id, fields in flattened_data.items():
                    row = {"__id__": record_id}
                    row.update(fields)
                    writer.writerow(row)
        else:
            with open(filepath, "w", encoding="utf-8", newline="") as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                for record_id, fields in flattened_data.items():
                    row = {"__id__": record_id}
                    row.update(fields)
                    writer.writerow(row)
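# Sketch of the CSV layout (editor's illustration, not part of the original module).
# Each record becomes one row keyed by the reserved "__id__" column, and dict/list
# field values are stored as JSON strings inside their cells, so a saved file might
# look like the following; on load, cells that look like JSON objects or arrays are
# parsed back into Python values.
#
#     __id__,name,tags
#     id-1,Ada,"[""math"", ""logic""]"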

class ParquetFormat(FileFormat):
    """Parquet file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from Parquet file."""
        if not os.path.exists(filepath):
            return {}

        try:
            import pandas as pd

            df = pd.read_parquet(filepath)
            data = {}

            for idx, row in df.iterrows():
                row_dict = row.to_dict()
                if "__id__" in row_dict:
                    record_id = row_dict.pop("__id__")
                else:
                    record_id = str(idx)

                # Remove NaN values
                fields = {k: v for k, v in row_dict.items() if pd.notna(v)}
                data[record_id] = {"fields": fields}

            return data
        except ImportError as e:
            raise ImportError("Parquet support requires pandas and pyarrow packages") from e

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to Parquet file."""
        try:
            import pandas as pd

            if not data:
                # Create empty DataFrame
                df = pd.DataFrame()
            else:
                rows = []
                for record_id, record_data in data.items():
                    row = {"__id__": record_id}
                    if "fields" in record_data:
                        # Flatten field values for Parquet format
                        for field_name, field_data in record_data["fields"].items():
                            # Handle both full field dicts and simple values
                            if isinstance(field_data, dict) and "value" in field_data:
                                row[field_name] = field_data["value"]
                            else:
                                row[field_name] = field_data
                    rows.append(row)

                df = pd.DataFrame(rows)

            df.to_parquet(filepath, index=False, compression="snappy")
        except ImportError as e:
            raise ImportError("Parquet support requires pandas and pyarrow packages") from e
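# Sketch (editor's illustration, not part of the original module): ParquetFormat imports
# pandas lazily, so pandas and pyarrow are optional dependencies needed only for this
# handler. The round trip mirrors the other handlers; the path below is hypothetical.
#
#     ParquetFormat.save("example.parquet", {"id-1": {"fields": {"name": "Ada"}}})
#     ParquetFormat.load("example.parquet")  # -> {"id-1": {"fields": {"name": "Ada"}}}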

class AsyncFileDatabase(  # type: ignore[misc]
    AsyncDatabase,
    AsyncStreamingMixin,
    ConfigurableBase,
    VectorConfigMixin,
    SQLiteVectorSupport,
    PythonVectorSearchMixin,
    BulkEmbedMixin,
    VectorOperationsMixin
):
    """Async file-based database implementation."""

    FORMAT_HANDLERS = {
        ".json": JSONFormat,
        ".csv": CSVFormat,
        ".tsv": CSVFormat,
        ".parquet": ParquetFormat,
        ".pq": ParquetFormat,
    }

    def __init__(self, config: dict[str, Any] | None = None):
        super().__init__(config)
        self.filepath = self.config.get("path", "data.json")
        self.format = self.config.get("format")
        self.compression = self.config.get("compression", None)
        self._lock = asyncio.Lock()
        self._file_lock = FileLock(self.filepath)

        # Detect format from file extension if not specified
        if not self.format:
            path = Path(self.filepath)
            # Check for compression
            if path.suffix == ".gz":
                self.compression = "gzip"
                path = Path(path.stem)

            ext = path.suffix.lower()
            if ext in self.FORMAT_HANDLERS:
                self.format = ext.lstrip(".")
            else:
                self.format = "json"  # Default to JSON

        # Apply compression to filepath if specified
        if self.compression == "gzip" and not self.filepath.endswith(".gz"):
            self.filepath += ".gz"

        # Get the appropriate format handler
        ext = f".{self.format}"
        self.handler = self.FORMAT_HANDLERS.get(ext, JSONFormat)

        # Initialize vector support
        self._parse_vector_config(config or {})
        self._init_vector_state()
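    # Example configuration (editor's illustration; only keys read in __init__ above are
    # shown, and the path value is hypothetical). Format and compression are inferred
    # from the extension when omitted; vector-related keys are handled separately by
    # _parse_vector_config.
    #
    #     {"path": "records.csv.gz", "format": "csv", "compression": "gzip"}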

    @classmethod
    def from_config(cls, config: dict) -> AsyncFileDatabase:
        """Create from config dictionary."""
        return cls(config)

    def _generate_id(self) -> str:
        """Generate a unique ID for a record."""
        return str(uuid.uuid4())

    async def _load_data(self) -> dict[str, Record]:
        """Load all data from file."""
        with self._file_lock:
            raw_data = self.handler.load(self.filepath)
            data = {}
            for record_id, record_dict in raw_data.items():
                data[record_id] = Record.from_dict(record_dict)
            return data

    async def _save_data(self, data: dict[str, Record]):
        """Save all data to file atomically."""
        # Convert records to dictionaries
        raw_data = {}
        for record_id, record in data.items():
            raw_data[record_id] = record.to_dict(include_metadata=True, flatten=False)

        # Write to temporary file first
        temp_fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(self.filepath) or ".")
        os.close(temp_fd)

        try:
            with self._file_lock:
                self.handler.save(temp_path, raw_data)
                # Atomic rename
                os.replace(temp_path, self.filepath)
        except Exception:
            # Clean up temp file on error
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise

    async def create(self, record: Record) -> str:
        """Create a new record in the file."""
        async with self._lock:
            data = await self._load_data()
            # Use centralized method to prepare record
            record_copy, storage_id = self._prepare_record_for_storage(record)
            data[storage_id] = record_copy
            await self._save_data(data)
            return storage_id

    async def read(self, id: str) -> Record | None:
        """Read a record from the file."""
        async with self._lock:
            data = await self._load_data()
            record = data.get(id)
            # Use centralized method to prepare record
            return self._prepare_record_from_storage(record, id)

    async def update(self, id: str, record: Record) -> bool:
        """Update a record in the file."""
        async with self._lock:
            data = await self._load_data()
            if id in data:
                data[id] = record.copy(deep=True)
                await self._save_data(data)
                return True
            return False

    async def delete(self, id: str) -> bool:
        """Delete a record from the file."""
        async with self._lock:
            data = await self._load_data()
            if id in data:
                del data[id]
                await self._save_data(data)
                return True
            return False

    async def exists(self, id: str) -> bool:
        """Check if a record exists in the file."""
        async with self._lock:
            data = await self._load_data()
            return id in data

    async def upsert(self, id: str, record: Record) -> str:
        """Update or insert a record with the specified ID."""
        async with self._lock:
            data = await self._load_data()
            data[id] = record.copy(deep=True)
            await self._save_data(data)
            return id

    async def search(self, query: Query) -> list[Record]:
        """Search for records matching the query."""
        async with self._lock:
            data = await self._load_data()
            results = []

            for record_id, record in data.items():
                # Apply filters
                matches = True
                for filter in query.filters:
                    field_value = record.get_value(filter.field)
                    if not filter.matches(field_value):
                        matches = False
                        break

                if matches:
                    results.append((record_id, record))

            # Use the helper method from base class
            return self._process_search_results(results, query, deep_copy=True)

    async def _count_all(self) -> int:
        """Count all records in the file."""
        async with self._lock:
            data = await self._load_data()
            return len(data)

    async def clear(self) -> int:
        """Clear all records from the file."""
        async with self._lock:
            data = await self._load_data()
            count = len(data)
            await self._save_data({})
            return count

    async def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            ids = []
            for record in records:
                record_id = self._generate_id()
                data[record_id] = record.copy(deep=True)
                ids.append(record_id)
            await self._save_data(data)
            return ids

    async def read_batch(self, ids: list[str]) -> list[Record | None]:
        """Read multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            results = []
            for record_id in ids:
                record = data.get(record_id)
                results.append(record.copy(deep=True) if record else None)
            return results

    async def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            results = []
            modified = False
            for record_id in ids:
                if record_id in data:
                    del data[record_id]
                    results.append(True)
                    modified = True
                else:
                    results.append(False)

            if modified:
                await self._save_data(data)

            return results

    async def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> AsyncIterator[Record]:
        """Stream records from file."""
        # For file backend, we can use the default implementation
        # since we need to load all data anyway
        config = config or StreamConfig()

        # Use search to get all matching records
        if query:
            records = await self.search(query)
        else:
            records = await self.search(Query())

        # Yield records in batches for consistency
        for i in range(0, len(records), config.batch_size):
            batch = records[i:i + config.batch_size]
            for record in batch:
                yield record

    async def stream_write(
        self,
        records: AsyncIterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into file."""
        # Use the default implementation from mixin
        return await self._default_stream_write(records, config)

    async def vector_search(
        self,
        query_vector,
        vector_field: str = "embedding",
        k: int = 10,
        filter=None,
        metric=None,
        **kwargs
    ):
        """Perform vector similarity search using Python calculations.

        Note: This implementation reads all records from disk to perform
        the search locally. For better performance with large datasets,
        consider using SQLite or a dedicated vector database.
        """
        return await self.python_vector_search_async(
            query_vector=query_vector,
            vector_field=vector_field,
            k=k,
            filter=filter,
            metric=metric,
            **kwargs
        )
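# Usage sketch for the async backend (editor's illustration, not part of the original
# module). The path is hypothetical, and the Record payload only shows the "fields"
# shape used elsewhere in this file; the real Record API lives in ..records.
#
#     import asyncio
#
#     async def demo():
#         db = AsyncFileDatabase({"path": "demo.json"})
#         record_id = await db.create(Record.from_dict({"fields": {"name": "Ada"}}))
#         print(await db.read(record_id))
#         print(await db.search(Query()))
#
#     asyncio.run(demo())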

class SyncFileDatabase(  # type: ignore[misc]
    SyncDatabase,
    StreamingMixin,
    ConfigurableBase,
    VectorConfigMixin,
    SQLiteVectorSupport,
    PythonVectorSearchMixin,
    BulkEmbedMixin,
    VectorOperationsMixin
):
    """Synchronous file-based database implementation."""

    FORMAT_HANDLERS = {
        ".json": JSONFormat,
        ".csv": CSVFormat,
        ".tsv": CSVFormat,
        ".parquet": ParquetFormat,
        ".pq": ParquetFormat,
    }

    def __init__(self, config: dict[str, Any] | None = None):
        super().__init__(config)
        self.filepath = self.config.get("path", "data.json")
        self.format = self.config.get("format")
        self.compression = self.config.get("compression", None)
        self._lock = threading.RLock()
        self._file_lock = FileLock(self.filepath)

        # Detect format from file extension if not specified
        if not self.format:
            path = Path(self.filepath)
            # Check for compression
            if path.suffix == ".gz":
                self.compression = "gzip"
                path = Path(path.stem)

            ext = path.suffix.lower()
            if ext in self.FORMAT_HANDLERS:
                self.format = ext.lstrip(".")
            else:
                self.format = "json"  # Default to JSON

        # Apply compression to filepath if specified
        if self.compression == "gzip" and not self.filepath.endswith(".gz"):
            self.filepath += ".gz"

        # Get the appropriate format handler
        ext = f".{self.format}"
        self.handler = self.FORMAT_HANDLERS.get(ext, JSONFormat)

        # Initialize vector support
        self._parse_vector_config(config or {})
        self._init_vector_state()

    @classmethod
    def from_config(cls, config: dict) -> SyncFileDatabase:
        """Create from config dictionary."""
        return cls(config)

    def _generate_id(self) -> str:
        """Generate a unique ID for a record."""
        return str(uuid.uuid4())

    def _load_data(self) -> dict[str, Record]:
        """Load all data from file."""
        with self._file_lock:
            raw_data = self.handler.load(self.filepath)
            data = {}
            for record_id, record_dict in raw_data.items():
                data[record_id] = Record.from_dict(record_dict)
            return data

    def _save_data(self, data: dict[str, Record]):
        """Save all data to file atomically."""
        # Convert records to dictionaries
        raw_data = {}
        for record_id, record in data.items():
            raw_data[record_id] = record.to_dict(include_metadata=True, flatten=False)

        # Write to temporary file first
        temp_fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(self.filepath) or ".")
        os.close(temp_fd)

        try:
            with self._file_lock:
                self.handler.save(temp_path, raw_data)
                # Atomic rename
                os.replace(temp_path, self.filepath)
        except Exception:
            # Clean up temp file on error
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise

    def _do_set_data(self, data: dict[str, Record], record: Record) -> str:
        """Ensure record has a storage ID, set data[id]=record.copy() and return the ID"""
        # Use centralized method to prepare record
        record_copy, storage_id = self._prepare_record_for_storage(record)

        # Store the record
        data[storage_id] = record_copy
        return storage_id

    def create(self, record: Record) -> str:
        """Create a new record in the file."""
        with self._lock:
            data = self._load_data()
            # Use record's ID if it has one, otherwise generate a new one
            record_id = self._do_set_data(data, record)
            self._save_data(data)
            return record_id

    def read(self, id: str) -> Record | None:
        """Read a record from the file."""
        with self._lock:
            data = self._load_data()
            record = data.get(id)
            # Use centralized method to prepare record
            return self._prepare_record_from_storage(record, id)

    def update(self, id: str, record: Record) -> bool:
        """Update a record in the file."""
        with self._lock:
            data = self._load_data()
            if id in data:
                data[id] = record.copy(deep=True)
                self._save_data(data)
                return True
            return False

    def delete(self, id: str) -> bool:
        """Delete a record from the file."""
        with self._lock:
            data = self._load_data()
            if id in data:
                del data[id]
                self._save_data(data)
                return True
            return False

    def exists(self, id: str) -> bool:
        """Check if a record exists in the file."""
        with self._lock:
            data = self._load_data()
            return id in data

    def upsert(self, id: str, record: Record) -> str:
        """Update or insert a record with the specified ID."""
        with self._lock:
            data = self._load_data()
            data[id] = record.copy(deep=True)
            self._save_data(data)
            return id

    def search(self, query: Query) -> list[Record]:
        """Search for records matching the query."""
        with self._lock:
            data = self._load_data()
            results = []

            for record_id, record in data.items():
                # Apply filters
                matches = True
                for filter in query.filters:
                    field_value = record.get_value(filter.field)
                    if not filter.matches(field_value):
                        matches = False
                        break

                if matches:
                    results.append((record_id, record))

            # Use the helper method from base class
            return self._process_search_results(results, query, deep_copy=True)

    def _count_all(self) -> int:
        """Count all records in the file."""
        with self._lock:
            data = self._load_data()
            return len(data)

    def clear(self) -> int:
        """Clear all records from the file."""
        with self._lock:
            data = self._load_data()
            count = len(data)
            self._save_data({})
            return count

    def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            ids = []
            for record in records:
                record_id = self._do_set_data(data, record)
                ids.append(record_id)
            self._save_data(data)
            return ids

    def read_batch(self, ids: list[str]) -> list[Record | None]:
        """Read multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            results = []
            for record_id in ids:
                record = data.get(record_id)
                results.append(record.copy(deep=True) if record else None)
            return results

    def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            results = []
            modified = False
            for record_id in ids:
                if record_id in data:
                    del data[record_id]
                    results.append(True)
                    modified = True
                else:
                    results.append(False)

            if modified:
                self._save_data(data)

            return results

    def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> Iterator[Record]:
        """Stream records from file."""
        # For file backend, we can use the default implementation
        # since we need to load all data anyway
        config = config or StreamConfig()

        # Use search to get all matching records
        if query:
            records = self.search(query)
        else:
            records = self.search(Query())

        # Yield records in batches for consistency
        for i in range(0, len(records), config.batch_size):
            batch = records[i:i + config.batch_size]
            for record in batch:
                yield record

    def stream_write(
        self,
        records: Iterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into file."""
        # Use the default implementation
        config = config or StreamConfig()
        result = StreamResult()
        start_time = time.time()
        quitting = False

        def do_write_batch(batch: list) -> bool:
            """Write batch with individual retries, return False to quit"""
            retval = True
            try:
                ids = self.create_batch(batch)
                result.successful += len(ids)
                result.total_processed += len(batch)
            except Exception:
                # Try creating each item again and catch specific error items
                for rec in batch:
                    result.total_processed += 1
                    try:
                        self.create(rec)
                        result.successful += 1
                    except Exception as e:
                        # This item failed again
                        result.failed += 1
                        result.add_error(None, e)
                        if config.on_error:
                            if not config.on_error(e, rec):
                                retval = False
                                break
                        else:
                            # Without "on_error", quit streaming
                            retval = False
                            break
            return retval

        batch = []
        for record in records:
            batch.append(record)

            if len(batch) >= config.batch_size:
                # Write batch
                quitting = not do_write_batch(batch)
                if quitting:
                    # Got signal to quit
                    break
                batch = []

        # Write remaining batch
        if batch and not quitting:
            do_write_batch(batch)

        result.duration = time.time() - start_time
        return result

    def vector_search(
        self,
        query_vector,
        vector_field: str = "embedding",
        k: int = 10,
        filter=None,
        metric=None,
        **kwargs
    ):
        """Perform vector similarity search using Python calculations.

        Note: This implementation reads all records from disk to perform
        the search locally. For better performance with large datasets,
        consider using SQLite or a dedicated vector database.
        """
        return self.python_vector_search_sync(
            query_vector=query_vector,
            vector_field=vector_field,
            k=k,
            filter=filter,
            metric=metric,
            **kwargs
        )
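# Usage sketch for the synchronous backend (editor's illustration, not part of the
# original module). The path is hypothetical; stream_read reads batch_size from the
# StreamConfig defaults, and vector_search only returns hits for records that carry
# the named embedding field.
#
#     db = SyncFileDatabase({"path": "demo.csv"})
#     ids = db.create_batch([Record.from_dict({"fields": {"name": n}}) for n in ("Ada", "Alan")])
#     for rec in db.stream_read(config=StreamConfig()):
#         print(rec)
#     print(db.vector_search([0.1, 0.2, 0.3], k=2))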