Coverage for src/dataknobs_data/records.py: 43%

256 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:34 -0700

1"""Structured data records with typed fields and metadata. 

2 

3This module defines the Record class for representing structured data with 

4typed fields, validation, and conversion utilities for database operations. 

5""" 

6 

7from __future__ import annotations 

8 

9import uuid 

10from collections import OrderedDict 

11from dataclasses import dataclass, field 

12from typing import Any, TYPE_CHECKING 

13 

14from .fields import Field, FieldType 

15 

16if TYPE_CHECKING: 

17 from collections.abc import Iterator 

18 

19 

@dataclass
class Record:
    """Represents a structured data record with fields and metadata.

    The record ID can be accessed via the `id` property, which:
    - Returns the storage_id if set (database-assigned ID)
    - Falls back to the legacy ID, then a user-defined 'id' field,
      then metadata 'id', then a 'record_id' field
    - Returns None if no ID is available

    This separation allows records to have both:
    - A user-defined 'id' field as part of their data
    - A system-assigned storage_id for database operations

    Example:
        ```python
        from dataknobs_data import Record, Field, FieldType

        # Create record from dict
        record = Record({"name": "Alice", "age": 30, "email": "alice@example.com"})

        # Access field values
        print(record.get_value("name"))  # "Alice"
        print(record["age"])  # 30
        print(record.name)  # "Alice" (attribute access)

        # Set field values
        record.set_value("age", 31)
        record["city"] = "New York"

        # Work with metadata
        record.metadata["source"] = "user_input"

        # Convert to dict
        data = record.to_dict()  # {"name": "Alice", "age": 31, "email": "...", "city": "..."}
        ```
    """

    fields: OrderedDict[str, Field] = field(default_factory=OrderedDict)
    metadata: dict[str, Any] = field(default_factory=dict)
    _id: str | None = field(default=None, repr=False)  # Deprecated, use storage_id
    _storage_id: str | None = field(default=None, repr=False)

    def __init__(
        self,
        data: dict[str, Any] | OrderedDict[str, Field] | None = None,
        metadata: dict[str, Any] | None = None,
        id: str | None = None,
        storage_id: str | None = None,
    ):
        """Initialize a record from various data formats.

        Args:
            data: Can be a dict of field names to values, or an OrderedDict of Field objects
            metadata: Optional metadata for the record
            id: Optional unique identifier for the record (deprecated, use storage_id)
            storage_id: Optional storage system identifier for the record

        Example:
            ```python
            # From simple dict
            record = Record({"name": "Alice", "age": 30})

            # With metadata
            record = Record(
                data={"name": "Bob"},
                metadata={"source": "api", "timestamp": "2024-01-01"}
            )

            # With storage_id
            record = Record(
                data={"name": "Charlie"},
                storage_id="550e8400-e29b-41d4-a716-446655440000"
            )
            ```
        """
        self.metadata = metadata or {}
        self.fields = OrderedDict()
        self._id = id  # Deprecated
        # Use storage_id if provided, fall back to the legacy id argument.
        self._storage_id = storage_id or id

        # Populate fields first so ID discovery below can inspect them.
        if data:
            if isinstance(data, OrderedDict) and all(
                isinstance(item, Field) for item in data.values()
            ):
                # An OrderedDict made only of Field objects is adopted as-is
                # (not copied).
                self.fields = data
            else:
                for key, value in data.items():
                    if isinstance(value, Field):
                        # Ensure the field has the correct name
                        if value.name is None or value.name == "embedding":
                            value.name = key
                        self.fields[key] = value
                    else:
                        self.fields[key] = Field(name=key, value=value)

        # Discover a legacy ID from other sources if none was passed in.
        if self._id is None:
            if "id" in self.metadata:
                self._id = str(self.metadata["id"])
            else:
                # 'id' takes precedence over 'record_id'; only the first of
                # the two that exists as a field is consulted.
                for candidate in ("id", "record_id"):
                    if candidate in self.fields:
                        value = self.get_value(candidate)
                        if value is not None:
                            self._id = str(value)
                            # Sync to metadata
                            self.metadata["id"] = self._id
                        break

    @property
    def storage_id(self) -> str | None:
        """Get the storage system ID (database-assigned ID)."""
        return self._storage_id

    @storage_id.setter
    def storage_id(self, value: str | None) -> None:
        """Set the storage system ID."""
        self._storage_id = value
        # Also update _id for backwards compatibility
        self._id = value

    @property
    def id(self) -> str | None:
        """Get the record ID.

        Priority order:
        1. Storage ID (database-assigned) if set
        2. Legacy ID (`_id`) if set
        3. User-defined 'id' field value
        4. Metadata 'id' (for backwards compatibility)
        5. 'record_id' field (common in DataFrames)

        Returns the first ID found, or None if no ID is present.
        """
        # 1. Prefer storage ID (database-assigned)
        if self._storage_id is not None:
            return self._storage_id

        # 2. Fall back to legacy _id if set
        if self._id is not None:
            return self._id

        # 3. Check for 'id' field in user data
        if "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                return str(value)

        # 4. Check metadata (backwards compatibility)
        if "id" in self.metadata:
            return str(self.metadata["id"])

        # 5. Check for 'record_id' field (common in DataFrames)
        if "record_id" in self.fields:
            value = self.get_value("record_id")
            if value is not None:
                return str(value)

        return None

    @id.setter
    def id(self, value: str | None) -> None:
        """Set the record ID.

        This sets the storage_id, which is the database-assigned ID.
        It does NOT modify user data fields.
        """
        self._storage_id = value
        self._id = value  # Backwards compatibility

        # Mirror the ID into metadata for backward compatibility
        if value is not None:
            self.metadata["id"] = value
        elif "id" in self.metadata:
            del self.metadata["id"]

    def generate_id(self) -> str:
        """Generate and set a new UUID for this record.

        Returns:
            The generated UUID string
        """
        new_id = str(uuid.uuid4())
        self.id = new_id
        return new_id

    def get_user_id(self) -> str | None:
        """Get the user-defined ID field value (not the storage ID).

        This explicitly returns the value of the 'id' field in the record's data,
        ignoring any storage_id that may be set.

        Returns:
            The value of the 'id' field (as a string) if present and not None,
            None otherwise
        """
        if "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                return str(value)
        return None

    def has_storage_id(self) -> bool:
        """Check if this record has a storage system ID assigned.

        Returns:
            True if storage_id is set, False otherwise
        """
        return self._storage_id is not None

    def get_field(self, name: str) -> Field | None:
        """Get a field by name, or None if it does not exist."""
        return self.fields.get(name)

    def get_value(self, name: str, default: Any = None) -> Any:
        """Get a field's value by name, supporting dot-notation for nested paths.

        An exact field-name match always wins, so a field whose name itself
        contains a dot is still reachable; only when no such field exists is
        the name interpreted as a dot-notation path.

        Args:
            name: Field name or dot-notation path (e.g., "metadata.type")
            default: Default value if field not found

        Returns:
            The field value or default

        Example:
            ```python
            record = Record({
                "name": "Alice",
                "config": {"timeout": 30, "retries": 3}
            })

            # Simple field access
            name = record.get_value("name")  # "Alice"

            # Nested path access
            timeout = record.get_value("config.timeout")  # 30

            # With default
            missing = record.get_value("missing_field", "default")  # "default"
            ```
        """
        found = self.get_field(name)
        # Compare against None rather than relying on truthiness so that a
        # field object that happens to be falsy is not mistaken for "missing".
        if found is not None:
            return found.value

        if "." in name:
            return self.get_nested_value(name, default)

        return default

    def get_nested_value(self, path: str, default: Any = None) -> Any:
        """Get a value from a nested path using dot notation.

        Supports paths like:
        - "metadata.type" - access metadata field (if exists) or metadata dict attribute
        - "fields.temperature" - access field values
        - "metadata.config.timeout" - nested dict access

        Args:
            path: Dot-notation path to the value
            default: Default value if path not found

        Returns:
            The value at the path or default
        """
        root, dot, remaining = path.partition(".")
        if not dot:
            # No nesting at all: plain field lookup.
            return self.get_value(path, default)

        if root == "metadata":
            # A field literally named "metadata" shadows the record's
            # metadata attribute.
            if root in self.fields:
                container = self.get_value(root, None)
            else:
                container = self.metadata
            if isinstance(container, dict):
                return self._traverse_dict(container, remaining, default)
            return default

        if root == "fields":
            # An exact field name (possibly containing dots) wins over
            # splitting the remainder into field name + inner path.
            if remaining in self.fields:
                return self.get_value(remaining, default)
            field_name, inner_dot, field_path = remaining.partition(".")
            if not inner_dot:
                # Simple field access
                return self.get_value(remaining, default)
            container = self.get_value(field_name, None)
            if isinstance(container, dict):
                return self._traverse_dict(container, field_path, default)
            return default

        # Otherwise the root must be a field whose value is a dict.
        container = self.get_value(root, None)
        if isinstance(container, dict):
            return self._traverse_dict(container, remaining, default)
        return default

    def _traverse_dict(self, data: dict, path: str, default: Any = None) -> Any:
        """Traverse a dictionary using dot notation.

        Args:
            data: Dictionary to traverse
            path: Dot-notation path
            default: Default value if path not found

        Returns:
            Value at path or default
        """
        current = data
        for part in path.split("."):
            # Stop as soon as we hit a non-dict or a missing key.
            if not isinstance(current, dict) or part not in current:
                return default
            current = current[part]
        return current

    def set_field(
        self,
        name: str,
        value: Any,
        field_type: FieldType | None = None,
        field_metadata: dict[str, Any] | None = None,
    ) -> None:
        """Set or update a field, replacing any existing Field of that name."""
        self.fields[name] = Field(
            name=name, value=value, type=field_type, metadata=field_metadata or {}
        )

    def set_value(self, name: str, value: Any) -> None:
        """Set a field's value by name.

        Convenience method that creates the field if it doesn't exist.
        """
        if name in self.fields:
            self.fields[name].value = value
        else:
            self.set_field(name, value)

    @property
    def data(self) -> dict[str, Any]:
        """Get all field values as a dictionary.

        Provides a simple dict-like view of the record's data.
        """
        return {name: item.value for name, item in self.fields.items()}

    def remove_field(self, name: str) -> bool:
        """Remove a field by name. Returns True if field was removed."""
        if name in self.fields:
            del self.fields[name]
            return True
        return False

    def has_field(self, name: str) -> bool:
        """Check if a field exists."""
        return name in self.fields

    def field_names(self) -> list[str]:
        """Get list of field names."""
        return list(self.fields)

    def field_count(self) -> int:
        """Get the number of fields."""
        return len(self.fields)

    def __getitem__(self, key: str | int) -> Any:
        """Get field value by name or field by index.

        For string keys, returns the field value directly (dict-like access).
        For integer keys, returns the Field object at that index for backward compatibility.

        Raises:
            KeyError: If a string key does not name a field
            IndexError: If an integer key is negative or past the last field
            TypeError: If the key is neither str nor int
        """
        if isinstance(key, str):
            if key not in self.fields:
                raise KeyError(f"Field '{key}' not found")
            return self.fields[key].value
        if isinstance(key, int):
            field_list = list(self.fields.values())
            # Negative indices are rejected rather than wrapping around.
            if key < 0 or key >= len(field_list):
                raise IndexError(f"Field index {key} out of range")
            return field_list[key]
        raise TypeError(f"Key must be str or int, got {type(key)}")

    def __setitem__(self, key: str, value: Field | Any) -> None:
        """Set field by name.

        Can accept either a Field object or a raw value.
        When given a raw value, creates a new Field automatically.
        """
        if isinstance(value, Field):
            self.fields[key] = value
        else:
            self.set_field(key, value)

    def __delitem__(self, key: str) -> None:
        """Delete field by name.

        Raises:
            KeyError: If field not found
        """
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        del self.fields[key]

    def __contains__(self, key: str) -> bool:
        """Check if field exists."""
        return key in self.fields

    def __iter__(self) -> Iterator[str]:
        """Iterate over field names."""
        return iter(self.fields)

    def __len__(self) -> int:
        """Get number of fields."""
        return len(self.fields)

    def validate(self) -> bool:
        """Validate all fields in the record.

        Returns:
            True if every field's validate() is truthy (also True when the
            record has no fields)
        """
        return all(item.validate() for item in self.fields.values())

    def get_field_object(self, key: str) -> Field:
        """Get the Field object by name.

        Use this method when you need access to the Field object itself,
        not just its value.

        Args:
            key: Field name

        Returns:
            The Field object

        Raises:
            KeyError: If field not found
        """
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        return self.fields[key]

    def __getattr__(self, name: str) -> Any:
        """Get field value by attribute access.

        Only invoked when normal attribute lookup fails, so real attributes,
        properties and methods take precedence over fields.

        Args:
            name: Attribute/field name

        Returns:
            Field value if field exists

        Raises:
            AttributeError: If no field with that name exists
        """
        # Avoid infinite recursion for special attributes
        if name.startswith("_") or name in ("fields", "metadata", "id"):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )

        # hasattr guards against access before __init__ has set `fields`.
        if hasattr(self, "fields") and name in self.fields:
            return self.fields[name].value

        raise AttributeError(f"'{type(self).__name__}' object has no field '{name}'")

    def __setattr__(self, name: str, value: Any) -> None:
        """Set field value by attribute access.

        Allows setting field values using attribute syntax.
        Special attributes (fields, metadata, _id, _storage_id) are handled normally.
        Properties (id, storage_id) are also handled specially.

        Args:
            name: Attribute/field name
            value: Value to set
        """
        if name.startswith("_") or name in ("fields", "metadata"):
            # Real instance attributes and private names: normal assignment.
            super().__setattr__(name, value)
        elif name in ("id", "storage_id"):
            # Route through the property setter
            object.__setattr__(self, name, value)
        elif not hasattr(self, "fields"):
            # `fields` is not set yet (e.g. during construction): fall back
            # to normal attribute setting.
            super().__setattr__(name, value)
        elif name in self.fields:
            # Update existing field value
            self.fields[name].value = value
        else:
            # Any other name becomes a new field.
            self.set_field(name, value)

    def to_dict(
        self,
        include_metadata: bool = False,
        flatten: bool = True,
        include_field_objects: bool = True,
    ) -> dict[str, Any]:
        """Convert record to dictionary.

        Args:
            include_metadata: Whether to include metadata in the output
            flatten: If True (default), return just field values; if False, return structured format
            include_field_objects: If True and not flattened, return full Field objects

        Returns:
            Dictionary representation of the record
        """
        # `id` is a computed property; resolve it once.
        record_id = self.id

        if flatten:
            # Simple dict with just values (default behavior for ergonomics)
            result: dict[str, Any] = {}
            for name, item in self.fields.items():
                # Fields exposing a callable to_list() (VectorField) are
                # serialized through it to keep the output JSON-friendly.
                to_list = getattr(item, "to_list", None)
                result[name] = to_list() if callable(to_list) else item.value
            if record_id:
                result["_id"] = record_id
            if include_metadata and self.metadata:
                result["_metadata"] = self.metadata
            return result

        # Structured format for serialization
        if include_field_objects:
            serialized = {name: item.to_dict() for name, item in self.fields.items()}
        else:
            serialized = {name: item.value for name, item in self.fields.items()}
        result = {"fields": serialized}
        if record_id:
            result["id"] = record_id
        if include_metadata:
            result["metadata"] = self.metadata
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Record:
        """Create a record from a dictionary representation.

        The input dictionary is never modified.

        Args:
            data: Dictionary containing record data

        Returns:
            A new Record instance

        Example:
            ```python
            # From simple dict
            data = {"name": "Alice", "age": 30}
            record = Record.from_dict(data)

            # From structured format
            data = {
                "fields": {
                    "name": {"value": "Alice", "type": "string"},
                    "age": {"value": 30, "type": "integer"}
                },
                "metadata": {"source": "api"}
            }
            record = Record.from_dict(data)
            ```
        """
        if "fields" in data:
            fields = OrderedDict()
            for name, field_data in data["fields"].items():
                if isinstance(field_data, dict) and "value" in field_data:
                    # Add name to field_data for Field.from_dict
                    fields[name] = Field.from_dict({"name": name, **field_data})
                else:
                    fields[name] = Field(name=name, value=field_data)
            # Copy so the record's ID syncing cannot write into the caller's
            # metadata dict.
            metadata = dict(data.get("metadata") or {})
            record_id = data.get("id") or data.get("_id")
            return cls(data=fields, metadata=metadata, id=record_id)

        # Flattened format: "_id" carries the record ID, everything else is
        # field data. Work on a copy so the caller's dict keeps its "_id".
        record_id = None
        if "_id" in data:
            data = data.copy()
            record_id = data.pop("_id")
        return cls(data=data, id=record_id)

    def copy(self, deep: bool = True) -> Record:
        """Create a copy of the record.

        Args:
            deep: If True, create deep copies of fields and metadata; if
                False, the new record shares the same Field objects

        Returns:
            A new Record
        """
        if deep:
            import copy

            new_fields = OrderedDict()
            for name, source in self.fields.items():
                # Preserve the actual field type (Field or VectorField)
                if source.__class__.__name__ == "VectorField":
                    # Imported lazily, only when a VectorField is present.
                    from dataknobs_data.fields import VectorField

                    new_fields[name] = VectorField(
                        name=source.name,
                        value=copy.deepcopy(source.value),
                        dimensions=getattr(source, "dimensions", None),
                        source_field=getattr(source, "source_field", None),
                        model_name=getattr(source, "model_name", None),
                        model_version=getattr(source, "model_version", None),
                        metadata=copy.deepcopy(source.metadata),
                    )
                else:
                    new_fields[name] = Field(
                        name=source.name,
                        value=copy.deepcopy(source.value),
                        type=source.type,
                        metadata=copy.deepcopy(source.metadata),
                    )
            new_metadata = copy.deepcopy(self.metadata)
        else:
            new_fields = OrderedDict(self.fields)  # type: ignore[arg-type]
            new_metadata = self.metadata.copy()

        # NOTE(review): passing self.id means a fallback ID (from a field or
        # metadata) becomes the copy's storage_id -- confirm this is intended.
        return Record(data=new_fields, metadata=new_metadata, id=self.id)

    def project(self, field_names: list[str]) -> Record:
        """Create a new record with only specified fields.

        Names that are not fields of this record are silently skipped. The
        projected record shares the same Field objects as this one.

        Args:
            field_names: List of field names to include in the projection

        Returns:
            A new Record containing only the specified fields

        Example:
            ```python
            record = Record({"name": "Alice", "age": 30, "email": "alice@example.com"})

            # Project to specific fields
            subset = record.project(["name", "age"])
            print(subset.field_names())  # ["name", "age"]
            ```
        """
        projected_fields = OrderedDict(
            (name, self.fields[name]) for name in field_names if name in self.fields
        )
        return Record(data=projected_fields, metadata=self.metadata.copy(), id=self.id)

    def merge(self, other: Record, overwrite: bool = True) -> Record:
        """Merge another record into this one.

        Neither input record is modified.

        Args:
            other: The record to merge
            overwrite: If True, overwrite existing fields; if False, keep existing

        Returns:
            A new merged record
        """
        merged_fields = OrderedDict(self.fields)
        for name, field_obj in other.fields.items():
            if overwrite or name not in merged_fields:
                merged_fields[name] = field_obj

        merged_metadata = self.metadata.copy()
        if overwrite:
            merged_metadata.update(other.metadata)

        # Use the ID from this record, or from other if this doesn't have one
        merged_id = self.id or other.id

        return Record(data=merged_fields, metadata=merged_metadata, id=merged_id)

708 merged_id = self.id if self.id else other.id 

709 

710 return Record(data=merged_fields, metadata=merged_metadata, id=merged_id)