Coverage for src/dataknobs_data/backends/file.py: 15% (634 statements)


1"""File-based database backend implementation.""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import csv 

7import gzip 

8import json 

9import os 

10import platform 

11import tempfile 

12import threading 

13import time 

14import uuid 

15from pathlib import Path 

16from typing import Any, TYPE_CHECKING 

17 

18from dataknobs_config import ConfigurableBase 

19 

20from ..database import AsyncDatabase, SyncDatabase 

21from ..query import Query 

22from ..records import Record 

23from ..streaming import AsyncStreamingMixin, StreamConfig, StreamingMixin, StreamResult 

24from ..vector import VectorOperationsMixin 

25from ..vector.bulk_embed_mixin import BulkEmbedMixin 

26from ..vector.python_vector_search import PythonVectorSearchMixin 

27from .sqlite_mixins import SQLiteVectorSupport 

28from .vector_config_mixin import VectorConfigMixin 

29 

30if TYPE_CHECKING: 

31 from collections.abc import AsyncIterator, Iterator 

32 

33 


class FileLock:
    """Cross-platform file locking."""

    def __init__(self, filepath: str):
        self.filepath = filepath
        self.lockfile = filepath + ".lock"
        self.lock_handle = None

    def acquire(self):
        """Acquire the file lock."""
        if platform.system() == "Windows":
            import msvcrt

            while True:
                try:
                    self.lock_handle = open(self.lockfile, "wb")
                    msvcrt.locking(self.lock_handle.fileno(), msvcrt.LK_NBLCK, 1)
                    break
                except OSError:
                    if self.lock_handle:
                        self.lock_handle.close()  # type: ignore[unreachable]
                    time.sleep(0.01)
        else:
            import fcntl

            self.lock_handle = open(self.lockfile, "wb")
            fcntl.lockf(self.lock_handle, fcntl.LOCK_EX)

    def release(self):
        """Release the file lock."""
        if self.lock_handle:
            if platform.system() == "Windows":  # type: ignore[unreachable]
                import msvcrt

                try:
                    msvcrt.locking(self.lock_handle.fileno(), msvcrt.LK_UNLCK, 1)
                except OSError:
                    pass
            self.lock_handle.close()
            try:
                os.remove(self.lockfile)
            except OSError:
                pass

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
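
# Usage sketch for FileLock (illustrative; "records.json" stands in for any
# data file the backends manage):
#
#     with FileLock("records.json"):
#         ...  # read or modify records.json while holding the lock
#
# The lock coordinates only processes that also use FileLock, since it is
# taken on a separate "<filepath>.lock" file rather than the data file itself.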


class FileFormat:
    """Base class for file format handlers."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from file."""
        raise NotImplementedError

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to file."""
        raise NotImplementedError


class JSONFormat(FileFormat):
    """JSON file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from JSON file."""
        if not os.path.exists(filepath):
            return {}

        # Check if file is empty
        if os.path.getsize(filepath) == 0:
            return {}

        try:
            if filepath.endswith(".gz"):
                try:
                    with gzip.open(filepath, "rt", encoding="utf-8") as f:
                        content = f.read()
                except (gzip.BadGzipFile, OSError):
                    # File has .gz extension but isn't gzipped; treat as a regular file
                    with open(filepath, encoding="utf-8") as f:
                        content = f.read()
            else:
                with open(filepath, encoding="utf-8") as f:
                    content = f.read()

            if not content.strip():
                return {}
            return json.loads(content)
        except json.JSONDecodeError:
            return {}

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to JSON file."""
        opener = gzip.open if filepath.endswith(".gz") else open
        with opener(filepath, "wt", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
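
# Round-trip sketch for JSONFormat (illustrative; the record shape mirrors
# what the database classes below pass in):
#
#     JSONFormat.save("db.json", {"r1": {"fields": {"name": "alice"}}})
#     JSONFormat.load("db.json")  # -> {"r1": {"fields": {"name": "alice"}}}
#
# A ".gz" suffix on the path switches both methods to gzip transparently.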


class CSVFormat(FileFormat):
    """CSV file format handler."""

    @staticmethod
    def _parse_row(row: dict[str, Any]) -> dict[str, Any]:
        """Deserialize JSON-encoded strings in a CSV row back to objects."""
        fields = {}
        for key, value in row.items():
            # Try to parse as JSON if it looks like JSON
            if value and isinstance(value, str) and (
                (value.startswith("{") and value.endswith("}"))
                or (value.startswith("[") and value.endswith("]"))
            ):
                try:
                    fields[key] = json.loads(value)
                except json.JSONDecodeError:
                    fields[key] = value
            else:
                fields[key] = value
        return fields

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from CSV file."""
        if not os.path.exists(filepath):
            return {}

        # Check if file is empty
        if os.path.getsize(filepath) == 0:
            return {}

        data = {}
        try:
            opener = gzip.open if filepath.endswith(".gz") else open
            with opener(filepath, "rt", encoding="utf-8") as f:
                reader = csv.DictReader(f)
                for row in reader:
                    if "__id__" in row:
                        record_id = row.pop("__id__")
                        data[record_id] = {"fields": CSVFormat._parse_row(row)}
        except (OSError, csv.Error):
            return {}

        return data

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to CSV file."""
        opener = gzip.open if filepath.endswith(".gz") else open

        if not data:
            with opener(filepath, "wt", encoding="utf-8") as f:
                f.write("")
            return

        # Extract all field names and prepare flattened data
        all_fields = set()
        flattened_data = {}
        for record_id, record_data in data.items():
            if "fields" in record_data:
                # Flatten field values for CSV format
                flat_fields = {}
                for field_name, field_data in record_data["fields"].items():
                    # Handle both full field dicts and simple values
                    if isinstance(field_data, dict) and "value" in field_data:
                        value = field_data["value"]
                    else:
                        value = field_data

                    # Serialize complex types as JSON strings
                    if isinstance(value, (dict, list)):
                        flat_fields[field_name] = json.dumps(value)
                    else:
                        flat_fields[field_name] = value
                    all_fields.add(field_name)
                flattened_data[record_id] = flat_fields

        fieldnames = ["__id__"] + sorted(all_fields)

        with opener(filepath, "wt", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for record_id, fields in flattened_data.items():
                row = {"__id__": record_id}
                row.update(fields)
                writer.writerow(row)
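
# Round-trip sketch for CSVFormat (illustrative): nested values survive the
# flat CSV layout because save() JSON-encodes them and load() decodes them.
#
#     CSVFormat.save("db.csv", {"r1": {"fields": {"tags": ["a", "b"]}}})
#     CSVFormat.load("db.csv")  # -> {"r1": {"fields": {"tags": ["a", "b"]}}}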


class ParquetFormat(FileFormat):
    """Parquet file format handler."""

    @staticmethod
    def load(filepath: str) -> dict[str, dict[str, Any]]:
        """Load data from Parquet file."""
        if not os.path.exists(filepath):
            return {}

        try:
            import pandas as pd

            df = pd.read_parquet(filepath)
            data = {}

            for idx, row in df.iterrows():
                row_dict = row.to_dict()
                if "__id__" in row_dict:
                    record_id = row_dict.pop("__id__")
                else:
                    record_id = str(idx)

                # Remove NaN values
                fields = {k: v for k, v in row_dict.items() if pd.notna(v)}
                data[record_id] = {"fields": fields}

            return data
        except ImportError as e:
            raise ImportError("Parquet support requires pandas and pyarrow packages") from e

    @staticmethod
    def save(filepath: str, data: dict[str, dict[str, Any]]):
        """Save data to Parquet file."""
        try:
            import pandas as pd

            if not data:
                # Create empty DataFrame
                df = pd.DataFrame()
            else:
                rows = []
                for record_id, record_data in data.items():
                    row = {"__id__": record_id}
                    if "fields" in record_data:
                        # Flatten field values for Parquet format
                        for field_name, field_data in record_data["fields"].items():
                            # Handle both full field dicts and simple values
                            if isinstance(field_data, dict) and "value" in field_data:
                                row[field_name] = field_data["value"]
                            else:
                                row[field_name] = field_data
                    rows.append(row)

                df = pd.DataFrame(rows)

            df.to_parquet(filepath, index=False, compression="snappy")
        except ImportError as e:
            raise ImportError("Parquet support requires pandas and pyarrow packages") from e
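
# Sketch for ParquetFormat (illustrative; needs the optional pandas and
# pyarrow dependencies installed):
#
#     ParquetFormat.save("db.parquet", {"r1": {"fields": {"n": 1}}})
#     ParquetFormat.load("db.parquet")  # -> {"r1": {"fields": {"n": 1}}}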


class AsyncFileDatabase(  # type: ignore[misc]
    AsyncDatabase,
    AsyncStreamingMixin,
    ConfigurableBase,
    VectorConfigMixin,
    SQLiteVectorSupport,
    PythonVectorSearchMixin,
    BulkEmbedMixin,
    VectorOperationsMixin
):
    """Async file-based database implementation."""

    FORMAT_HANDLERS = {
        ".json": JSONFormat,
        ".csv": CSVFormat,
        ".tsv": CSVFormat,
        ".parquet": ParquetFormat,
        ".pq": ParquetFormat,
    }

    def __init__(self, config: dict[str, Any] | None = None):
        super().__init__(config)

        # If no path is specified, use a temporary file instead of polluting the CWD
        if "path" not in self.config:
            # Create a unique temporary file that won't conflict
            temp_file = tempfile.NamedTemporaryFile(
                prefix="dataknobs_async_db_",
                suffix=".json",
                delete=False
            )
            self.filepath = temp_file.name
            temp_file.close()
            self._is_temp_file = True
        else:
            self.filepath = self.config["path"]
            self._is_temp_file = False

        self.format = self.config.get("format")
        self.compression = self.config.get("compression", None)
        self._lock = asyncio.Lock()

        # Detect format from the file extension if not specified
        if not self.format:
            path = Path(self.filepath)
            # Check for compression
            if path.suffix == ".gz":
                self.compression = "gzip"
                path = Path(path.stem)

            ext = path.suffix.lower()
            if ext in self.FORMAT_HANDLERS:
                self.format = ext.lstrip(".")
            else:
                self.format = "json"  # Default to JSON

        # Apply compression to filepath if specified
        if self.compression == "gzip" and not self.filepath.endswith(".gz"):
            self.filepath += ".gz"

        # Create the file lock only after the final path (including any ".gz"
        # suffix) is known, so the lock file always matches the data file
        self._file_lock = FileLock(self.filepath)

        # Get the appropriate format handler
        ext = f".{self.format}"
        self.handler = self.FORMAT_HANDLERS.get(ext, JSONFormat)

        # Initialize vector support
        self._parse_vector_config(config or {})
        self._init_vector_state()

    @classmethod
    def from_config(cls, config: dict) -> AsyncFileDatabase:
        """Create from config dictionary."""
        return cls(config)

    def _generate_id(self) -> str:
        """Generate a unique ID for a record."""
        return str(uuid.uuid4())

    async def _load_data(self) -> dict[str, Record]:
        """Load all data from file."""
        with self._file_lock:
            raw_data = self.handler.load(self.filepath)
            data = {}
            for record_id, record_dict in raw_data.items():
                data[record_id] = Record.from_dict(record_dict)
            return data

    async def _save_data(self, data: dict[str, Record]):
        """Save all data to file atomically."""
        # Convert records to dictionaries
        raw_data = {}
        for record_id, record in data.items():
            raw_data[record_id] = record.to_dict(include_metadata=True, flatten=False)

        # Write to a temporary file first
        temp_fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(self.filepath) or ".")
        os.close(temp_fd)

        try:
            with self._file_lock:
                self.handler.save(temp_path, raw_data)
                # Atomic rename
                os.replace(temp_path, self.filepath)
        except Exception:
            # Clean up the temp file on error
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise

    async def create(self, record: Record) -> str:
        """Create a new record in the file."""
        async with self._lock:
            data = await self._load_data()
            # Use centralized method to prepare record
            record_copy, storage_id = self._prepare_record_for_storage(record)
            data[storage_id] = record_copy
            await self._save_data(data)
            return storage_id

    async def read(self, id: str) -> Record | None:
        """Read a record from the file."""
        async with self._lock:
            data = await self._load_data()
            record = data.get(id)
            # Use centralized method to prepare record
            return self._prepare_record_from_storage(record, id)

    async def update(self, id: str, record: Record) -> bool:
        """Update a record in the file."""
        async with self._lock:
            data = await self._load_data()
            if id in data:
                data[id] = record.copy(deep=True)
                await self._save_data(data)
                return True
            return False

    async def delete(self, id: str) -> bool:
        """Delete a record from the file."""
        async with self._lock:
            data = await self._load_data()
            if id in data:
                del data[id]
                await self._save_data(data)
                return True
            return False

    async def exists(self, id: str) -> bool:
        """Check if a record exists in the file."""
        async with self._lock:
            data = await self._load_data()
            return id in data

    async def upsert(self, id_or_record: str | Record, record: Record | None = None) -> str:
        """Update or insert a record.

        Can be called as:
        - upsert(id, record) - explicit ID and record
        - upsert(record) - extract ID from record using Record's built-in logic
        """
        # Determine ID and record based on arguments
        if isinstance(id_or_record, str):
            id = id_or_record
            if record is None:
                raise ValueError("Record required when ID is provided")
        else:
            record = id_or_record
            id = record.id
            if id is None:
                id = str(uuid.uuid4())  # type: ignore[unreachable]
                record.storage_id = id

        async with self._lock:
            data = await self._load_data()
            data[id] = record.copy(deep=True)
            await self._save_data(data)
            return id

    async def search(self, query: Query) -> list[Record]:
        """Search for records matching the query."""
        async with self._lock:
            data = await self._load_data()
            results = []

            for record_id, record in data.items():
                # Apply filters
                matches = True
                for filter in query.filters:
                    # Special handling for the 'id' field
                    if filter.field == 'id':
                        field_value = record_id
                    else:
                        field_value = record.get_value(filter.field)
                    if not filter.matches(field_value):
                        matches = False
                        break

                if matches:
                    results.append((record_id, record))

            # Use the helper method from the base class
            return self._process_search_results(results, query, deep_copy=True)

    async def _count_all(self) -> int:
        """Count all records in the file."""
        async with self._lock:
            data = await self._load_data()
            return len(data)

    async def clear(self) -> int:
        """Clear all records from the file."""
        async with self._lock:
            data = await self._load_data()
            count = len(data)
            await self._save_data({})
            return count

    async def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            ids = []
            for record in records:
                record_id = self._generate_id()
                data[record_id] = record.copy(deep=True)
                ids.append(record_id)
            await self._save_data(data)
            return ids

    async def read_batch(self, ids: list[str]) -> list[Record | None]:
        """Read multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            results = []
            for record_id in ids:
                record = data.get(record_id)
                results.append(record.copy(deep=True) if record else None)
            return results

    async def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently."""
        async with self._lock:
            data = await self._load_data()
            results = []
            modified = False
            for record_id in ids:
                if record_id in data:
                    del data[record_id]
                    results.append(True)
                    modified = True
                else:
                    results.append(False)

            if modified:
                await self._save_data(data)

            return results

    async def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> AsyncIterator[Record]:
        """Stream records from the file."""
        # The file backend can use the default implementation
        # since it needs to load all data anyway
        config = config or StreamConfig()

        # Use search to get all matching records
        if query:
            records = await self.search(query)
        else:
            records = await self.search(Query())

        # Yield records in batches for consistency
        for i in range(0, len(records), config.batch_size):
            batch = records[i:i + config.batch_size]
            for record in batch:
                yield record

    async def stream_write(
        self,
        records: AsyncIterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into the file."""
        # Use the default implementation from the mixin
        return await self._default_stream_write(records, config)

    async def vector_search(
        self,
        query_vector,
        vector_field: str = "embedding",
        k: int = 10,
        filter=None,
        metric=None,
        **kwargs
    ):
        """Perform vector similarity search using Python calculations.

        Note: This implementation reads all records from disk to perform
        the search locally. For better performance with large datasets,
        consider using SQLite or a dedicated vector database.
        """
        return await self.python_vector_search_async(
            query_vector=query_vector,
            vector_field=vector_field,
            k=k,
            filter=filter,
            metric=metric,
            **kwargs
        )

    async def close(self) -> None:
        """Close the database and clean up temporary files if needed."""
        # Clean up the temporary file if one was created
        if getattr(self, '_is_temp_file', False) and self.filepath:
            try:
                if os.path.exists(self.filepath):
                    Path(self.filepath).unlink()
                # Also remove the lock file if it exists
                lock_file = self.filepath + ".lock"
                if os.path.exists(lock_file):
                    Path(lock_file).unlink()
            except OSError:
                pass  # Best-effort cleanup
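
# End-to-end sketch for AsyncFileDatabase (illustrative; "demo.json" is a
# hypothetical path, and the record shape matches what Record.from_dict
# consumes elsewhere in this module):
#
#     async def demo() -> None:
#         db = AsyncFileDatabase({"path": "demo.json"})
#         rid = await db.create(Record.from_dict({"fields": {"name": "alice"}}))
#         print(await db.read(rid))
#         await db.close()
#
#     asyncio.run(demo())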


class SyncFileDatabase(  # type: ignore[misc]
    SyncDatabase,
    StreamingMixin,
    ConfigurableBase,
    VectorConfigMixin,
    SQLiteVectorSupport,
    PythonVectorSearchMixin,
    BulkEmbedMixin,
    VectorOperationsMixin
):
    """Synchronous file-based database implementation."""

    FORMAT_HANDLERS = {
        ".json": JSONFormat,
        ".csv": CSVFormat,
        ".tsv": CSVFormat,
        ".parquet": ParquetFormat,
        ".pq": ParquetFormat,
    }

    def __init__(self, config: dict[str, Any] | None = None):
        super().__init__(config)

        # If no path is specified, use a temporary file instead of polluting the CWD
        if "path" not in self.config:
            # Create a unique temporary file that won't conflict
            temp_file = tempfile.NamedTemporaryFile(
                prefix="dataknobs_sync_db_",
                suffix=".json",
                delete=False
            )
            self.filepath = temp_file.name
            temp_file.close()
            self._is_temp_file = True
        else:
            self.filepath = self.config["path"]
            self._is_temp_file = False

        self.format = self.config.get("format")
        self.compression = self.config.get("compression", None)
        self._lock = threading.RLock()

        # Detect format from the file extension if not specified
        if not self.format:
            path = Path(self.filepath)
            # Check for compression
            if path.suffix == ".gz":
                self.compression = "gzip"
                path = Path(path.stem)

            ext = path.suffix.lower()
            if ext in self.FORMAT_HANDLERS:
                self.format = ext.lstrip(".")
            else:
                self.format = "json"  # Default to JSON

        # Apply compression to filepath if specified
        if self.compression == "gzip" and not self.filepath.endswith(".gz"):
            self.filepath += ".gz"

        # Create the file lock only after the final path (including any ".gz"
        # suffix) is known, so the lock file always matches the data file
        self._file_lock = FileLock(self.filepath)

        # Get the appropriate format handler
        ext = f".{self.format}"
        self.handler = self.FORMAT_HANDLERS.get(ext, JSONFormat)

        # Initialize vector support
        self._parse_vector_config(config or {})
        self._init_vector_state()

    @classmethod
    def from_config(cls, config: dict) -> SyncFileDatabase:
        """Create from config dictionary."""
        return cls(config)

    def _generate_id(self) -> str:
        """Generate a unique ID for a record."""
        return str(uuid.uuid4())

    def _load_data(self) -> dict[str, Record]:
        """Load all data from file."""
        with self._file_lock:
            raw_data = self.handler.load(self.filepath)
            data = {}
            for record_id, record_dict in raw_data.items():
                data[record_id] = Record.from_dict(record_dict)
            return data

    def _save_data(self, data: dict[str, Record]):
        """Save all data to file atomically."""
        # Convert records to dictionaries
        raw_data = {}
        for record_id, record in data.items():
            raw_data[record_id] = record.to_dict(include_metadata=True, flatten=False)

        # Write to a temporary file first
        temp_fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(self.filepath) or ".")
        os.close(temp_fd)

        try:
            with self._file_lock:
                self.handler.save(temp_path, raw_data)
                # Atomic rename
                os.replace(temp_path, self.filepath)
        except Exception:
            # Clean up the temp file on error
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise

    def _do_set_data(self, data: dict[str, Record], record: Record) -> str:
        """Ensure the record has a storage ID, set data[id] = record.copy(), and return the ID."""
        # Use centralized method to prepare record
        record_copy, storage_id = self._prepare_record_for_storage(record)

        # Store the record
        data[storage_id] = record_copy
        return storage_id

    def create(self, record: Record) -> str:
        """Create a new record in the file."""
        with self._lock:
            data = self._load_data()
            # Use the record's ID if it has one, otherwise generate a new one
            record_id = self._do_set_data(data, record)
            self._save_data(data)
            return record_id

    def read(self, id: str) -> Record | None:
        """Read a record from the file."""
        with self._lock:
            data = self._load_data()
            record = data.get(id)
            # Use centralized method to prepare record
            return self._prepare_record_from_storage(record, id)

    def update(self, id: str, record: Record) -> bool:
        """Update a record in the file."""
        with self._lock:
            data = self._load_data()
            if id in data:
                data[id] = record.copy(deep=True)
                self._save_data(data)
                return True
            return False

    def delete(self, id: str) -> bool:
        """Delete a record from the file."""
        with self._lock:
            data = self._load_data()
            if id in data:
                del data[id]
                self._save_data(data)
                return True
            return False

    def exists(self, id: str) -> bool:
        """Check if a record exists in the file."""
        with self._lock:
            data = self._load_data()
            return id in data

    def upsert(self, id_or_record: str | Record, record: Record | None = None) -> str:
        """Update or insert a record.

        Can be called as:
        - upsert(id, record) - explicit ID and record
        - upsert(record) - extract ID from record using Record's built-in logic
        """
        # Determine ID and record based on arguments
        if isinstance(id_or_record, str):
            id = id_or_record
            if record is None:
                raise ValueError("Record required when ID is provided")
        else:
            record = id_or_record
            id = record.id
            if id is None:
                id = str(uuid.uuid4())  # type: ignore[unreachable]
                record.storage_id = id

        with self._lock:
            data = self._load_data()
            data[id] = record.copy(deep=True)
            self._save_data(data)
            return id

    def search(self, query: Query) -> list[Record]:
        """Search for records matching the query."""
        with self._lock:
            data = self._load_data()
            results = []

            for record_id, record in data.items():
                # Apply filters
                matches = True
                for filter in query.filters:
                    # Special handling for the 'id' field
                    if filter.field == 'id':
                        field_value = record_id
                    else:
                        field_value = record.get_value(filter.field)
                    if not filter.matches(field_value):
                        matches = False
                        break

                if matches:
                    results.append((record_id, record))

            # Use the helper method from the base class
            return self._process_search_results(results, query, deep_copy=True)

    def _count_all(self) -> int:
        """Count all records in the file."""
        with self._lock:
            data = self._load_data()
            return len(data)

    def clear(self) -> int:
        """Clear all records from the file."""
        with self._lock:
            data = self._load_data()
            count = len(data)
            self._save_data({})
            return count

    def create_batch(self, records: list[Record]) -> list[str]:
        """Create multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            ids = []
            for record in records:
                record_id = self._do_set_data(data, record)
                ids.append(record_id)
            self._save_data(data)
            return ids

    def read_batch(self, ids: list[str]) -> list[Record | None]:
        """Read multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            results = []
            for record_id in ids:
                record = data.get(record_id)
                results.append(record.copy(deep=True) if record else None)
            return results

    def delete_batch(self, ids: list[str]) -> list[bool]:
        """Delete multiple records efficiently."""
        with self._lock:
            data = self._load_data()
            results = []
            modified = False
            for record_id in ids:
                if record_id in data:
                    del data[record_id]
                    results.append(True)
                    modified = True
                else:
                    results.append(False)

            if modified:
                self._save_data(data)

            return results

    def stream_read(
        self,
        query: Query | None = None,
        config: StreamConfig | None = None
    ) -> Iterator[Record]:
        """Stream records from the file."""
        # The file backend can use the default implementation
        # since it needs to load all data anyway
        config = config or StreamConfig()

        # Use search to get all matching records
        if query:
            records = self.search(query)
        else:
            records = self.search(Query())

        # Yield records in batches for consistency
        for i in range(0, len(records), config.batch_size):
            batch = records[i:i + config.batch_size]
            for record in batch:
                yield record

    def stream_write(
        self,
        records: Iterator[Record],
        config: StreamConfig | None = None
    ) -> StreamResult:
        """Stream records into the file."""
        # Use the default implementation
        config = config or StreamConfig()
        result = StreamResult()
        start_time = time.time()
        quitting = False

        def do_write_batch(batch: list) -> bool:
            """Write a batch with individual retries; return False to quit."""
            retval = True
            try:
                ids = self.create_batch(batch)
                result.successful += len(ids)
                result.total_processed += len(batch)
            except Exception:
                # Retry each item individually to isolate the failing records
                for rec in batch:
                    result.total_processed += 1
                    try:
                        self.create(rec)
                        result.successful += 1
                    except Exception as e:
                        # This item failed again
                        result.failed += 1
                        result.add_error(None, e)
                        if config.on_error:
                            if not config.on_error(e, rec):
                                retval = False
                                break
                        else:
                            # Without "on_error", quit streaming
                            retval = False
                            break
            return retval

        batch = []
        for record in records:
            batch.append(record)

            if len(batch) >= config.batch_size:
                # Write the batch
                quitting = not do_write_batch(batch)
                if quitting:
                    # Got the signal to quit
                    break
                batch = []

        # Write the remaining batch
        if batch and not quitting:
            do_write_batch(batch)

        result.duration = time.time() - start_time
        return result
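
    # Usage sketch for stream_write (illustrative; it assumes StreamConfig
    # accepts batch_size and on_error as constructor arguments, which this
    # module only ever reads back as attributes):
    #
    #     config = StreamConfig(batch_size=100, on_error=lambda exc, rec: True)
    #     result = db.stream_write(iter(records), config)
    #     print(result.successful, result.failed)
    #
    # Returning a truthy value from on_error skips the failed record and keeps
    # streaming; a falsy value stops the stream.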

    def vector_search(
        self,
        query_vector,
        vector_field: str = "embedding",
        k: int = 10,
        filter=None,
        metric=None,
        **kwargs
    ):
        """Perform vector similarity search using Python calculations.

        Note: This implementation reads all records from disk to perform
        the search locally. For better performance with large datasets,
        consider using SQLite or a dedicated vector database.
        """
        return self.python_vector_search_sync(
            query_vector=query_vector,
            vector_field=vector_field,
            k=k,
            filter=filter,
            metric=metric,
            **kwargs
        )

    def close(self) -> None:
        """Close the database and clean up temporary files if needed."""
        # Clean up the temporary file if one was created
        if getattr(self, '_is_temp_file', False) and self.filepath:
            try:
                if os.path.exists(self.filepath):
                    Path(self.filepath).unlink()
                # Also remove the lock file if it exists
                lock_file = self.filepath + ".lock"
                if os.path.exists(lock_file):
                    Path(lock_file).unlink()
            except OSError:
                pass  # Best-effort cleanup
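

# End-to-end sketch for SyncFileDatabase (illustrative; "demo.csv" is a
# hypothetical path, and the record shape matches what Record.from_dict
# consumes elsewhere in this module):
#
#     db = SyncFileDatabase({"path": "demo.csv"})
#     rid = db.create(Record.from_dict({"fields": {"name": "alice"}}))
#     assert db.exists(rid)
#     db.clear()
#     db.close()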