Coverage for src/dataknobs_data/records.py: 40%

256 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-29 14:15 -0600

1from __future__ import annotations 

2 

3import uuid 

4from collections import OrderedDict 

5from dataclasses import dataclass, field 

6from typing import Any, TYPE_CHECKING 

7 

8from .fields import Field, FieldType 

9 

10if TYPE_CHECKING: 

11 from collections.abc import Iterator 

12 

13 

@dataclass
class Record:
    """Represents a structured data record with fields and metadata.

    The record ID can be accessed via the `id` property, which resolves, in
    order: the storage ID, the legacy ``_id``, the user-defined 'id' field,
    the metadata 'id' entry, and finally a 'record_id' field. It returns
    None if no ID is available.

    This separation allows records to have both:
    - A user-defined 'id' field as part of their data
    - A system-assigned storage_id for database operations

    Field values are also reachable with dict-style (``record["name"]``) and
    attribute-style (``record.name``) access; see ``__getitem__``,
    ``__getattr__`` and ``__setattr__``.
    """

    # Ordered mapping of field name -> Field object.
    fields: OrderedDict[str, Field] = field(default_factory=OrderedDict)
    # Free-form record-level metadata; also mirrors the ID under the "id" key.
    metadata: dict[str, Any] = field(default_factory=dict)
    # Deprecated, use storage_id.
    _id: str | None = field(default=None, repr=False)
    # Storage-system (database-assigned) identifier.
    _storage_id: str | None = field(default=None, repr=False)

    def __init__(
        self,
        data: dict[str, Any] | OrderedDict[str, Field] | None = None,
        metadata: dict[str, Any] | None = None,
        id: str | None = None,
        storage_id: str | None = None,
    ):
        """Initialize a record from various data formats.

        Args:
            data: Can be a dict of field names to values, or an OrderedDict of Field objects
            metadata: Optional metadata for the record
            id: Optional unique identifier for the record (deprecated, use storage_id)
            storage_id: Optional storage system identifier for the record
        """
        self.metadata = metadata or {}
        self.fields = OrderedDict()
        self._id = id  # Deprecated
        # Use storage_id if provided, fall back to id.
        self._storage_id = storage_id or id

        # Process data first to populate fields.
        if data:
            if isinstance(data, OrderedDict) and all(
                isinstance(v, Field) for v in data.values()
            ):
                # NOTE(review): the mapping is adopted by reference, so later
                # changes to the caller's OrderedDict are visible here.
                self.fields = data
            else:
                for key, value in data.items():
                    if isinstance(value, Field):
                        # Ensure the field has the correct name.
                        if value.name is None or value.name == "embedding":
                            value.name = key
                        self.fields[key] = value
                    else:
                        self.fields[key] = Field(name=key, value=value)

        # Now look for an ID in other sources if none was passed explicitly.
        if self._id is None:
            if "id" in self.metadata:
                self._id = str(self.metadata["id"])
            else:
                # Only the first candidate field that exists is consulted,
                # even when its value turns out to be None.
                source = next(
                    (n for n in ("id", "record_id") if n in self.fields), None
                )
                if source is not None:
                    value = self.get_value(source)
                    if value is not None:
                        self._id = str(value)
                        # Sync to metadata.
                        self.metadata["id"] = self._id

    @property
    def storage_id(self) -> str | None:
        """Get the storage system ID (database-assigned ID)."""
        return self._storage_id

    @storage_id.setter
    def storage_id(self, value: str | None) -> None:
        """Set the storage system ID."""
        self._storage_id = value
        # Also update _id for backwards compatibility.
        self._id = value

    @property
    def id(self) -> str | None:
        """Get the record ID.

        Priority order:
        1. Storage ID (database-assigned) if set
        2. Legacy ``_id`` if set
        3. User-defined 'id' field value
        4. Metadata 'id' (for backwards compatibility)
        5. 'record_id' field value (common in DataFrames)

        Values found in steps 3-5 are converted with ``str()``.

        Returns the first ID found, or None if no ID is present.
        """
        # 1. Prefer storage ID (database-assigned).
        if self._storage_id is not None:
            return self._storage_id

        # 2. Fall back to legacy _id if set.
        if self._id is not None:
            return self._id

        # 3. Check for 'id' field in user data.
        if "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                return str(value)

        # 4. Check metadata (backwards compatibility).
        if "id" in self.metadata:
            return str(self.metadata["id"])

        # 5. Check for 'record_id' field (common in DataFrames).
        if "record_id" in self.fields:
            value = self.get_value("record_id")
            if value is not None:
                return str(value)

        return None

    @id.setter
    def id(self, value: str | None) -> None:
        """Set the record ID.

        This sets the storage_id, which is the database-assigned ID.
        It does NOT modify user data fields.
        """
        self._storage_id = value
        self._id = value  # Backwards compatibility

        # Keep metadata["id"] in step for backward compatibility.
        if value is not None:
            self.metadata["id"] = value
        elif "id" in self.metadata:
            del self.metadata["id"]

    def generate_id(self) -> str:
        """Generate and set a new UUID for this record.

        Returns:
            The generated UUID string
        """
        new_id = str(uuid.uuid4())
        self.id = new_id
        return new_id

    def get_user_id(self) -> str | None:
        """Get the user-defined ID field value (not the storage ID).

        This explicitly returns the value of the 'id' field in the record's data,
        ignoring any storage_id that may be set.

        Returns:
            The value of the 'id' field (as a string) if present and not None,
            None otherwise
        """
        if "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                return str(value)
        return None

    def has_storage_id(self) -> bool:
        """Check if this record has a storage system ID assigned.

        Returns:
            True if storage_id is set, False otherwise
        """
        return self._storage_id is not None

    def get_field(self, name: str) -> Field | None:
        """Get a field by name, or None if there is no such field."""
        return self.fields.get(name)

    def get_value(self, name: str, default: Any = None) -> Any:
        """Get a field's value by name, supporting dot-notation for nested paths.

        Args:
            name: Field name or dot-notation path (e.g., "metadata.type")
            default: Default value if field not found

        Returns:
            The field value or default
        """
        # Any name containing a dot is treated as a nested path.
        if "." in name:
            return self.get_nested_value(name, default)

        # Simple field lookup.
        found = self.get_field(name)
        return found.value if found else default

    def get_nested_value(self, path: str, default: Any = None) -> Any:
        """Get a value from a nested path using dot notation.

        Supports paths like:
        - "metadata.type" - access metadata field (if exists) or metadata dict attribute
        - "fields.temperature" - access field values
        - "metadata.config.timeout" - nested dict access

        Args:
            path: Dot-notation path to the value
            default: Default value if path not found

        Returns:
            The value at the path or default
        """
        parts = path.split(".", 1)
        if len(parts) == 1:
            # No nesting at all; plain field lookup.
            return self.get_value(parts[0], default)

        root, remaining = parts

        if root == "metadata":
            # A field literally named "metadata" takes precedence over the
            # record's metadata attribute.
            if root in self.fields:
                field_value = self.get_value(root, None)
                if isinstance(field_value, dict):
                    return self._traverse_dict(field_value, remaining, default)
                return default
            if self.metadata:
                return self._traverse_dict(self.metadata, remaining, default)
            return default

        if root == "fields":
            if "." in remaining:
                # Nested path within a field value (if it's a dict).
                field_name, field_path = remaining.split(".", 1)
                field_value = self.get_value(field_name, None)
                if isinstance(field_value, dict):
                    return self._traverse_dict(field_value, field_path, default)
                return default
            # Simple field access.
            return self.get_value(remaining, default)

        # Otherwise the root must be a field whose value is a dict.
        field_value = self.get_value(root, None)
        if isinstance(field_value, dict):
            return self._traverse_dict(field_value, remaining, default)
        return default

    def _traverse_dict(self, data: dict, path: str, default: Any = None) -> Any:
        """Traverse a dictionary using dot notation.

        Args:
            data: Dictionary to traverse
            path: Dot-notation path
            default: Default value if path not found

        Returns:
            Value at path or default
        """
        current = data
        for part in path.split("."):
            # Stop as soon as a level is not a dict or lacks the key.
            if not (isinstance(current, dict) and part in current):
                return default
            current = current[part]
        return current

    def set_field(
        self,
        name: str,
        value: Any,
        field_type: FieldType | None = None,
        field_metadata: dict[str, Any] | None = None,
    ) -> None:
        """Set or update a field, replacing any existing Field of that name."""
        self.fields[name] = Field(
            name=name, value=value, type=field_type, metadata=field_metadata or {}
        )

    def set_value(self, name: str, value: Any) -> None:
        """Set a field's value by name.

        Convenience method that creates the field if it doesn't exist.
        An existing Field object is kept and only its value is updated.
        """
        if name in self.fields:
            self.fields[name].value = value
        else:
            self.set_field(name, value)

    @property
    def data(self) -> dict[str, Any]:
        """Get all field values as a dictionary.

        Provides a simple dict-like view of the record's data. A new dict is
        built on every access.
        """
        return {name: fld.value for name, fld in self.fields.items()}

    def remove_field(self, name: str) -> bool:
        """Remove a field by name. Returns True if field was removed."""
        if name in self.fields:
            del self.fields[name]
            return True
        return False

    def has_field(self, name: str) -> bool:
        """Check if a field exists."""
        return name in self.fields

    def field_names(self) -> list[str]:
        """Get list of field names."""
        return list(self.fields.keys())

    def field_count(self) -> int:
        """Get the number of fields."""
        return len(self.fields)

    def __getitem__(self, key: str | int) -> Any:
        """Get field value by name or field by index.

        For string keys, returns the field value directly (dict-like access).
        For integer keys, returns the Field object at that index for backward
        compatibility. Negative indices are rejected.

        Raises:
            KeyError: If a string key names no field
            IndexError: If an integer key is negative or past the last field
            TypeError: If the key is neither str nor int
        """
        if isinstance(key, str):
            if key not in self.fields:
                raise KeyError(f"Field '{key}' not found")
            return self.fields[key].value
        elif isinstance(key, int):
            field_list = list(self.fields.values())
            if key < 0 or key >= len(field_list):
                raise IndexError(f"Field index {key} out of range")
            return field_list[key]
        else:
            raise TypeError(f"Key must be str or int, got {type(key)}")

    def __setitem__(self, key: str, value: Field | Any) -> None:
        """Set field by name.

        Can accept either a Field object or a raw value.
        When given a raw value, creates a new Field automatically.
        """
        if isinstance(value, Field):
            self.fields[key] = value
        else:
            self.set_field(key, value)

    def __delitem__(self, key: str) -> None:
        """Delete field by name.

        Raises:
            KeyError: If field not found
        """
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        del self.fields[key]

    def __contains__(self, key: str) -> bool:
        """Check if field exists."""
        return key in self.fields

    def __iter__(self) -> Iterator[str]:
        """Iterate over field names."""
        return iter(self.fields)

    def __len__(self) -> int:
        """Get number of fields."""
        return len(self.fields)

    def validate(self) -> bool:
        """Validate all fields in the record (True for a record with no fields)."""
        return all(fld.validate() for fld in self.fields.values())

    def get_field_object(self, key: str) -> Field:
        """Get the Field object by name.

        Use this method when you need access to the Field object itself,
        not just its value.

        Args:
            key: Field name

        Returns:
            The Field object

        Raises:
            KeyError: If field not found
        """
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        return self.fields[key]

    def __getattr__(self, name: str) -> Any:
        """Get field value by attribute access.

        Python only calls this after normal attribute lookup has failed, so
        real attributes, properties and methods are never routed through here.

        Args:
            name: Attribute/field name

        Returns:
            Field value if field exists, otherwise raises AttributeError
        """
        # Avoid infinite recursion for special attributes: looking up
        # self.fields below would re-enter __getattr__ if it were missing.
        if name.startswith("_") or name in ("fields", "metadata", "id"):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )

        # Check if it's a field.
        if hasattr(self, "fields") and name in self.fields:
            return self.fields[name].value

        raise AttributeError(f"'{type(self).__name__}' object has no field '{name}'")

    def __setattr__(self, name: str, value: Any) -> None:
        """Set field value by attribute access.

        Allows setting field values using attribute syntax.
        Special attributes (fields, metadata, _id, _storage_id) are handled normally.
        Properties (id, storage_id) are also handled specially.

        Args:
            name: Attribute/field name
            value: Value to set
        """
        # Handle special attributes and private attributes normally.
        if name in ("fields", "metadata", "_id", "_storage_id") or name.startswith("_"):
            super().__setattr__(name, value)
        # Handle properties that have setters.
        elif name in ("id", "storage_id"):
            # Normal attribute setting invokes the property setter.
            object.__setattr__(self, name, value)
        elif hasattr(self, "fields") and name in self.fields:
            # Update existing field value in place.
            self.fields[name].value = value
        else:
            # Once `fields` exists, unknown names become new fields; before
            # that (early in __init__) fall back to normal attribute setting.
            if hasattr(self, "fields"):
                self.set_field(name, value)
            else:
                super().__setattr__(name, value)

    def to_dict(
        self,
        include_metadata: bool = False,
        flatten: bool = True,
        include_field_objects: bool = True,
    ) -> dict[str, Any]:
        """Convert record to dictionary.

        Args:
            include_metadata: Whether to include metadata in the output
            flatten: If True (default), return just field values; if False, return structured format
            include_field_objects: If True and not flattened, return full Field objects

        Returns:
            Dictionary representation of the record. The flattened form stores
            the ID under "_id" and metadata under "_metadata"; the structured
            form uses "fields", "id" and "metadata".
        """
        result: dict[str, Any]
        if flatten:
            # Simple dict with just values (default behavior for ergonomics).
            result = {}
            for name, fld in self.fields.items():
                # Fields exposing a callable to_list() (e.g. VectorField) are
                # serialized through it so the output stays JSON-friendly.
                if hasattr(fld, 'to_list') and callable(fld.to_list):
                    result[name] = fld.to_list()
                else:
                    result[name] = fld.value
            if self.id:
                result["_id"] = self.id
            # Empty metadata is omitted in the flattened form.
            if include_metadata and self.metadata:
                result["_metadata"] = self.metadata
        else:
            # Structured format for serialization.
            if include_field_objects:
                result = {
                    "fields": {
                        name: fld.to_dict() for name, fld in self.fields.items()
                    }
                }
            else:
                result = {
                    "fields": {name: fld.value for name, fld in self.fields.items()}
                }
            if self.id:
                result["id"] = self.id
            if include_metadata:
                result["metadata"] = self.metadata
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Record:
        """Create a record from a dictionary representation.

        A dict with a "fields" key is read as the structured format produced
        by ``to_dict(flatten=False)``; anything else is read as a flat mapping
        of field names to values, with an optional "_id" entry.

        The input dictionary is not modified.

        NOTE(review): a flattened dict carrying "_metadata" is not restored as
        metadata here; it becomes an ordinary field. Confirm whether that
        round-trip is intended.

        Args:
            data: Structured or flattened dictionary representation

        Returns:
            The reconstructed record
        """
        if "fields" in data:
            fields = OrderedDict()
            for name, field_data in data["fields"].items():
                if isinstance(field_data, dict) and "value" in field_data:
                    # Add name to field_data for Field.from_dict.
                    field_data_with_name = {"name": name, **field_data}
                    fields[name] = Field.from_dict(field_data_with_name)
                else:
                    fields[name] = Field(name=name, value=field_data)
            metadata = data.get("metadata", {})
            record_id = data.get("id") or data.get("_id")
            return cls(data=fields, metadata=metadata, id=record_id)

        # Flattened format: pull out "_id" without touching the caller's dict.
        record_id = None
        if "_id" in data:
            data = data.copy()
            record_id = data.pop("_id")
        return cls(data=data, id=record_id)

    def copy(self, deep: bool = True) -> Record:
        """Create a copy of the record.

        NOTE(review): the copy is built with ``id=self.id``, so an ID that only
        came from a field or metadata becomes the copy's storage ID. Confirm
        this is intended.

        Args:
            deep: If True, create deep copies of fields and metadata

        Returns:
            The new record
        """
        if deep:
            import copy

            new_fields = OrderedDict()
            for name, fld in self.fields.items():
                # Preserve the actual field type (Field or VectorField).
                if fld.__class__.__name__ == 'VectorField':
                    # Imported lazily, only when a vector field is present.
                    from dataknobs_data.fields import VectorField
                    new_fields[name] = VectorField(
                        name=fld.name,
                        value=copy.deepcopy(fld.value),
                        dimensions=getattr(fld, 'dimensions', None),
                        source_field=getattr(fld, 'source_field', None),
                        model_name=getattr(fld, 'model_name', None),
                        model_version=getattr(fld, 'model_version', None),
                        metadata=copy.deepcopy(fld.metadata),
                    )
                else:
                    new_fields[name] = Field(
                        name=fld.name,
                        value=copy.deepcopy(fld.value),
                        type=fld.type,
                        metadata=copy.deepcopy(fld.metadata),
                    )
            new_metadata = copy.deepcopy(self.metadata)
        else:
            # Shallow: new containers, shared Field objects and metadata values.
            new_fields = OrderedDict(self.fields)  # type: ignore[arg-type]
            new_metadata = self.metadata.copy()

        return Record(data=new_fields, metadata=new_metadata, id=self.id)

    def project(self, field_names: list[str]) -> Record:
        """Create a new record with only specified fields.

        Names that are not fields of this record are skipped. Field objects
        are shared with this record; metadata is shallow-copied.
        """
        projected_fields = OrderedDict()
        for name in field_names:
            if name in self.fields:
                projected_fields[name] = self.fields[name]
        return Record(data=projected_fields, metadata=self.metadata.copy(), id=self.id)

    def merge(self, other: Record, overwrite: bool = True) -> Record:
        """Merge another record into this one.

        Args:
            other: The record to merge
            overwrite: If True, overwrite existing fields; if False, keep existing

        Returns:
            A new merged record
        """
        merged_fields = OrderedDict(self.fields)
        for name, field_obj in other.fields.items():
            if overwrite or name not in merged_fields:
                merged_fields[name] = field_obj

        # Metadata from `other` is only merged in when overwriting.
        merged_metadata = self.metadata.copy()
        if overwrite:
            merged_metadata.update(other.metadata)

        # Use the ID from this record, or from other if this doesn't have one.
        merged_id = self.id if self.id else other.id

        return Record(data=merged_fields, metadata=merged_metadata, id=merged_id)

605 

606 return Record(data=merged_fields, metadata=merged_metadata, id=merged_id)