Coverage for src/dataknobs_data/records.py: 37%

251 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-31 15:06 -0600

1from __future__ import annotations 

2 

3import uuid 

4from collections import OrderedDict 

5from dataclasses import dataclass, field 

6from typing import Any, TYPE_CHECKING 

7 

8from .fields import Field, FieldType 

9 

10if TYPE_CHECKING: 

11 from collections.abc import Iterator 

12 

13 

@dataclass
class Record:
    """Represents a structured data record with fields and metadata.

    The record ID can be accessed via the `id` property, which resolves in
    this order:
    - Returns the storage_id if set (database-assigned ID)
    - Falls back to the legacy ``_id`` value if set
    - Falls back to the user-defined 'id' field if present and not None
    - Falls back to the 'id' entry in metadata
    - Falls back to the 'record_id' field if present and not None
    - Returns None if no ID is available

    This separation allows records to have both:
    - A user-defined 'id' field as part of their data
    - A system-assigned storage_id for database operations
    """

    # Field objects keyed by field name, in insertion order.
    fields: OrderedDict[str, Field] = field(default_factory=OrderedDict)
    # Free-form metadata attached to the record.
    metadata: dict[str, Any] = field(default_factory=dict)
    _id: str | None = field(default=None, repr=False)  # Deprecated, use storage_id
    # Storage-system identifier, exposed through the storage_id property.
    _storage_id: str | None = field(default=None, repr=False)

32 

def __init__(
    self,
    data: dict[str, Any] | OrderedDict[str, Field] | None = None,
    metadata: dict[str, Any] | None = None,
    id: str | None = None,
    storage_id: str | None = None,
):
    """Build a record from raw values and/or ready-made Field objects.

    Args:
        data: Mapping of field names to raw values or Field objects. An
            OrderedDict whose values are all Field objects is adopted as the
            field mapping as-is (it is not copied).
        metadata: Optional metadata for the record
        id: Optional unique identifier for the record (deprecated, use storage_id)
        storage_id: Optional storage system identifier for the record
    """
    self.metadata = metadata or {}
    self.fields = OrderedDict()
    self._id = id  # Deprecated
    # An explicit storage_id wins; otherwise the legacy id doubles as it.
    self._storage_id = storage_id or id

    # Fields are populated first so the ID lookup below can inspect them.
    if data:
        already_fields = isinstance(data, OrderedDict) and all(
            isinstance(item, Field) for item in data.values()
        )
        if already_fields:
            self.fields = data
        else:
            for key, item in data.items():
                if not isinstance(item, Field):
                    self.fields[key] = Field(name=key, value=item)
                    continue
                # A Field without a name (or named "embedding") takes the key as its name.
                if item.name is None or item.name == "embedding":
                    item.name = key
                self.fields[key] = item

    # An explicitly supplied id needs no further lookup.
    if self._id is not None:
        return

    # Metadata is consulted before any field.
    if "id" in self.metadata:
        self._id = str(self.metadata["id"])
        return

    for candidate in ("id", "record_id"):
        if candidate not in self.fields:
            continue
        found = self.get_value(candidate)
        if found is not None:
            self._id = str(found)
            # Sync to metadata
            self.metadata["id"] = self._id
        # Only the first candidate field that exists is consulted, even when
        # its value is None.
        break

88 

@property
def storage_id(self) -> str | None:
    """Return the storage system ID (database-assigned ID), or None if unset."""
    return self._storage_id


@storage_id.setter
def storage_id(self, value: str | None) -> None:
    """Assign the storage system ID.

    The deprecated ``_id`` attribute is set to the same value for
    backwards compatibility.
    """
    # The two writes are independent of each other.
    self._id = value
    self._storage_id = value

100 

@property
def id(self) -> str | None:
    """Resolve the record ID.

    Priority order:
    1. Storage ID (database-assigned) if set
    2. Legacy ``_id`` value if set
    3. User-defined 'id' field value, if not None
    4. Metadata 'id' (for backwards compatibility)
    5. 'record_id' field value, if not None (common in DataFrames)

    Values taken from fields or metadata are converted with ``str``.

    Returns the first ID found, or None if no ID is present.
    """
    if self._storage_id is not None:
        return self._storage_id
    if self._id is not None:
        return self._id

    user_value = self.get_value("id") if "id" in self.fields else None
    if user_value is not None:
        return str(user_value)

    if "id" in self.metadata:
        return str(self.metadata["id"])

    frame_value = self.get_value("record_id") if "record_id" in self.fields else None
    return None if frame_value is None else str(frame_value)


@id.setter
def id(self, value: str | None) -> None:
    """Set the record ID.

    This sets the storage_id (and the legacy ``_id``) and mirrors the value
    into ``metadata["id"]``; assigning None removes that metadata entry.
    It does NOT modify user data fields.
    """
    self._id = value  # Backwards compatibility
    self._storage_id = value

    # Keep metadata in step for backward compatibility.
    if value is None:
        self.metadata.pop("id", None)
    else:
        self.metadata["id"] = value

154 

def generate_id(self) -> str:
    """Create a random UUID (version 4) and assign it as this record's ID.

    The value is assigned through the ``id`` attribute.

    Returns:
        The generated UUID string
    """
    generated = str(uuid.uuid4())
    self.id = generated
    return generated

164 

def get_user_id(self) -> str | None:
    """Return the user-defined 'id' field value (not the storage ID).

    Only the 'id' field in the record's data is consulted; any storage_id
    that may be set is ignored.

    Returns:
        The 'id' field value converted with ``str`` if the field is present
        and its value is not None, None otherwise
    """
    if "id" not in self.fields:
        return None
    raw = self.get_value("id")
    return None if raw is None else str(raw)

179 

def has_storage_id(self) -> bool:
    """Report whether a storage system ID has been assigned to this record.

    Returns:
        True if storage_id is set (not None), False otherwise
    """
    assigned = self._storage_id
    return assigned is not None

187 

def get_field(self, name: str) -> Field | None:
    """Look up a Field object by name; returns None when no such field exists."""
    return self.fields.get(name, None)

191 

def get_value(self, name: str, default: Any = None) -> Any:
    """Get a field's value by name, supporting dot-notation for nested paths.

    Args:
        name: Field name or dot-notation path (e.g., "metadata.type"). Any
            name containing a "." is resolved through ``get_nested_value``.
        default: Default value if field not found

    Returns:
        The field value or default
    """
    # Dotted names are delegated to the nested-path resolver.
    if "." in name:
        return self.get_nested_value(name, default)

    found = self.get_field(name)
    # Compare against None rather than relying on truthiness: a Field object
    # that happens to evaluate falsy must still yield its value.
    return default if found is None else found.value

209 

def get_nested_value(self, path: str, default: Any = None) -> Any:
    """Resolve a dot-notation path against the record.

    Supports paths like:
    - "metadata.type" - access metadata dict
    - "fields.temperature" - access field values
    - "metadata.config.timeout" - nested dict access
    - "<field>.<key>..." - descend into a field whose value is a dict

    Args:
        path: Dot-notation path to the value
        default: Default value if path not found

    Returns:
        The value at the path or default
    """
    root, dot, remaining = path.partition(".")
    if not dot:
        # No nesting at all: plain field lookup.
        return self.get_value(root, default)

    if root == "metadata":
        # Empty metadata can never satisfy the path.
        if not self.metadata:
            return default
        return self._traverse_dict(self.metadata, remaining, default)

    if root == "fields":
        field_name, inner_dot, field_path = remaining.partition(".")
        if not inner_dot:
            # "fields.<name>": simple field access
            return self.get_value(remaining, default)
        # "fields.<name>.<path>": only dict-valued fields can be descended into.
        container = self.get_value(field_name, None)
        if isinstance(container, dict):
            return self._traverse_dict(container, field_path, default)
        return default

    # Any other root is treated as the name of a field holding a dict.
    container = self.get_value(root, None)
    if isinstance(container, dict):
        return self._traverse_dict(container, remaining, default)
    return default

256 

def _traverse_dict(self, data: dict, path: str, default: Any = None) -> Any:
    """Walk a dictionary along a dot-separated path.

    Args:
        data: Dictionary to traverse
        path: Dot-notation path; each segment is used as a key
        default: Default value if path not found

    Returns:
        Value at path, or default as soon as a segment is missing or the
        current value is not a dict
    """
    node = data
    for segment in path.split("."):
        if not isinstance(node, dict) or segment not in node:
            return default
        node = node[segment]
    return node

278 

def set_field(
    self,
    name: str,
    value: Any,
    field_type: FieldType | None = None,
    field_metadata: dict[str, Any] | None = None,
) -> None:
    """Create a Field and store it under ``name``, replacing any existing one.

    Args:
        name: Field name
        value: Field value
        field_type: Optional type passed to the Field
        field_metadata: Optional field metadata; an empty dict is used when
            none (or an empty one) is given
    """
    attributes = field_metadata or {}
    replacement = Field(name=name, value=value, type=field_type, metadata=attributes)
    self.fields[name] = replacement

290 

def set_value(self, name: str, value: Any) -> None:
    """Assign a value to the named field.

    An existing field keeps its Field object and only has its value
    replaced; a missing field is created through ``set_field``.
    """
    if name not in self.fields:
        self.set_field(name, value)
        return
    self.fields[name].value = value

300 

@property
def data(self) -> dict[str, Any]:
    """Return all field values as a new dictionary keyed by field name.

    Provides a simple dict-like view of the record's data; the dictionary is
    rebuilt on every access.
    """
    snapshot = {}
    for key, entry in self.fields.items():
        snapshot[key] = entry.value
    return snapshot

308 

def remove_field(self, name: str) -> bool:
    """Delete the named field. Returns True if a field was removed, False if it did not exist."""
    try:
        del self.fields[name]
    except KeyError:
        return False
    return True

315 

def has_field(self, name: str) -> bool:
    """Return True when a field with this name is present in the record."""
    present = name in self.fields
    return present

319 

def field_names(self) -> list[str]:
    """Return the field names as a new list, in the field mapping's order."""
    return list(self.fields)

323 

def field_count(self) -> int:
    """Return how many fields the record currently holds."""
    total = len(self.fields)
    return total

327 

def __getitem__(self, key: str | int) -> Any:
    """Look up a field value by name, or a Field object by position.

    For string keys, returns the field value directly (dict-like access).
    For integer keys, returns the Field object at that index for backward
    compatibility; negative indexes are rejected.

    Raises:
        KeyError: If a string key names no field
        IndexError: If an integer key is negative or past the last field
        TypeError: If the key is neither str nor int
    """
    if isinstance(key, str):
        if key in self.fields:
            return self.fields[key].value
        raise KeyError(f"Field '{key}' not found")

    if isinstance(key, int):
        ordered = list(self.fields.values())
        if 0 <= key < len(ordered):
            return ordered[key]
        raise IndexError(f"Field index {key} out of range")

    raise TypeError(f"Key must be str or int, got {type(key)}")

345 

def __setitem__(self, key: str, value: Field | Any) -> None:
    """Store a field under ``key``.

    Can accept either a Field object or a raw value. A Field object is
    stored as given; a raw value is wrapped in a new Field via ``set_field``.
    """
    if not isinstance(value, Field):
        self.set_field(key, value)
        return
    self.fields[key] = value

356 

def __delitem__(self, key: str) -> None:
    """Delete the field stored under ``key``.

    Raises:
        KeyError: If no field has that name
    """
    if key in self.fields:
        del self.fields[key]
        return
    raise KeyError(f"Field '{key}' not found")

362 

def __contains__(self, key: str) -> bool:
    """Support ``key in record``: True when a field with that name exists."""
    found = key in self.fields
    return found

366 

def __iter__(self) -> Iterator[str]:
    """Support iteration over the record, yielding field names."""
    names = iter(self.fields)
    return names

370 

def __len__(self) -> int:
    """Support ``len(record)``: the number of fields."""
    size = len(self.fields)
    return size

374 

def validate(self) -> bool:
    """Validate every field; True only if each field's ``validate()`` passes.

    Checking stops at the first field that fails. A record without fields
    is valid.
    """
    for entry in self.fields.values():
        if not entry.validate():
            return False
    return True

378 

def get_field_object(self, key: str) -> Field:
    """Return the Field object stored under ``key``.

    Use this method when you need access to the Field object itself,
    not just its value.

    Args:
        key: Field name

    Returns:
        The Field object

    Raises:
        KeyError: If field not found
    """
    if key in self.fields:
        return self.fields[key]
    raise KeyError(f"Field '{key}' not found")

397 

def __getattr__(self, name: str) -> Any:
    """Resolve attribute access that normal lookup could not satisfy.

    Provides convenient attribute-style access to field values.

    Args:
        name: Attribute/field name

    Returns:
        The value of the field called ``name``

    Raises:
        AttributeError: If the name starts with an underscore, is one of
            "fields", "metadata" or "id", or does not match any field
    """
    # Private and core names are never looked up in the fields, which also
    # keeps the ``hasattr(self, "fields")`` probe below from recursing.
    reserved = name.startswith("_") or name in ("fields", "metadata", "id")
    if reserved:
        raise AttributeError(
            f"'{type(self).__name__}' object has no attribute '{name}'"
        )

    # ``fields`` may not be set yet, hence the hasattr guard.
    if hasattr(self, "fields") and name in self.fields:
        return self.fields[name].value

    raise AttributeError(f"'{type(self).__name__}' object has no field '{name}'")

421 

def __setattr__(self, name: str, value: Any) -> None:
    """Route attribute assignment to real attributes, properties or fields.

    - "fields", "metadata" and any name starting with an underscore are
      stored as ordinary attributes.
    - "id" and "storage_id" go through normal attribute assignment so their
      property setters run.
    - Any other name updates the value of the field with that name, or
      creates the field if it does not exist yet.

    Args:
        name: Attribute/field name
        value: Value to set
    """
    # Core and private attributes are set normally.
    if name.startswith("_") or name in ("fields", "metadata"):
        super().__setattr__(name, value)
        return

    # Properties that have setters.
    if name in ("id", "storage_id"):
        object.__setattr__(self, name, value)
        return

    # Until ``fields`` exists there is nowhere to put a field value, so fall
    # back to normal attribute setting.
    if not hasattr(self, "fields"):
        super().__setattr__(name, value)
        return

    if name in self.fields:
        # Existing field: keep the Field object, replace only its value.
        self.fields[name].value = value
    else:
        self.set_field(name, value)

450 

def to_dict(
    self,
    include_metadata: bool = False,
    flatten: bool = True,
    include_field_objects: bool = True,
) -> dict[str, Any]:
    """Convert record to dictionary.

    Flattened output maps field names to values, plus "_id" when the record
    has a truthy ID and "_metadata" when requested and non-empty. Structured
    output has a "fields" entry, plus "id" when the record has a truthy ID
    and "metadata" when requested (even if empty).

    Args:
        include_metadata: Whether to include metadata in the output
        flatten: If True (default), return just field values; if False, return structured format
        include_field_objects: If True and not flattened, each field is
            rendered with its ``to_dict()``; otherwise only its value is used

    Returns:
        Dictionary representation of the record
    """
    if not flatten:
        # Structured format for serialization
        if include_field_objects:
            rendered = {key: entry.to_dict() for key, entry in self.fields.items()}
        else:
            rendered = {key: entry.value for key, entry in self.fields.items()}
        result = {"fields": rendered}
        if self.id:
            result["id"] = self.id
        if include_metadata:
            result["metadata"] = self.metadata
        return result

    # Simple dict with just values (default behavior for ergonomics)
    result = {}
    for key, entry in self.fields.items():
        # Fields offering a callable to_list() (VectorField) are exported
        # through it to ensure JSON serialization.
        exporter = getattr(entry, "to_list", None)
        result[key] = exporter() if callable(exporter) else entry.value
    if self.id:
        result["_id"] = self.id
    if include_metadata and self.metadata:
        result["_metadata"] = self.metadata
    return result

498 

@classmethod
def from_dict(cls, data: dict[str, Any]) -> Record:
    """Create a record from a dictionary representation.

    Two layouts are understood:

    - Structured: a "fields" mapping, with optional "metadata" and an ID
      under "id" (or "_id"). Field entries that are dicts containing a
      "value" key are rebuilt with ``Field.from_dict``; anything else is
      treated as a raw value.
    - Flattened: field names mapped directly to values, with an optional
      "_id" entry holding the record ID.

    The input dictionary is never modified.

    Args:
        data: Dictionary representation of a record

    Returns:
        The reconstructed record
    """
    if "fields" in data:
        fields = OrderedDict()
        for name, field_data in data["fields"].items():
            if isinstance(field_data, dict) and "value" in field_data:
                # Add name to field_data for Field.from_dict
                fields[name] = Field.from_dict({"name": name, **field_data})
            else:
                fields[name] = Field(name=name, value=field_data)
        # Copy the metadata: the record may write an "id" entry into it, and
        # that must not leak back into the caller's dictionary.
        metadata = dict(data.get("metadata") or {})
        record_id = data.get("id") or data.get("_id")
        return cls(data=fields, metadata=metadata, id=record_id)

    # Flattened format: split "_id" off a copy so the caller's dict is left
    # untouched.
    record_id = None
    if "_id" in data:
        data = data.copy()
        record_id = data.pop("_id")
    return cls(data=data, id=record_id)

518 

def copy(self, deep: bool = True) -> Record:
    """Create a copy of the record.

    Args:
        deep: If True, every field is rebuilt from deep copies of its value
            and metadata, and the record metadata is deep-copied. If False,
            the copy gets its own field mapping and metadata dict, but they
            reference the same Field objects and metadata values.

    Returns:
        A new Record with the same ``id`` and the same storage ID state
        (``storage_id`` / ``has_storage_id()``) as this one.
    """
    if deep:
        import copy

        new_fields = OrderedDict()
        for name, source in self.fields.items():
            # Preserve the actual field type (Field or VectorField); vector
            # fields are matched by class name.
            if source.__class__.__name__ == "VectorField":
                from dataknobs_data.fields import VectorField

                new_fields[name] = VectorField(
                    name=source.name,
                    value=copy.deepcopy(source.value),
                    dimensions=getattr(source, "dimensions", None),
                    source_field=getattr(source, "source_field", None),
                    model_name=getattr(source, "model_name", None),
                    model_version=getattr(source, "model_version", None),
                    metadata=copy.deepcopy(source.metadata),
                )
            else:
                new_fields[name] = Field(
                    name=source.name,
                    value=copy.deepcopy(source.value),
                    type=source.type,
                    metadata=copy.deepcopy(source.metadata),
                )
        new_metadata = copy.deepcopy(self.metadata)
    else:
        new_fields = OrderedDict(self.fields)  # type: ignore[arg-type]
        new_metadata = self.metadata.copy()

    duplicate = Record(data=new_fields, metadata=new_metadata, id=self.id)
    # The constructor also uses an explicit id as the storage ID. Restore the
    # real storage ID so an ID that only came from user data or metadata is
    # not turned into a storage ID on the copy.
    duplicate._storage_id = self._storage_id
    return duplicate

567 

def project(self, field_names: list[str]) -> Record:
    """Create a new record with only specified fields.

    Names that do not match a field are ignored. The new record references
    the same Field objects, gets a shallow copy of the metadata, and keeps
    this record's ``id`` and storage ID state.

    Args:
        field_names: Names of the fields to keep, in the desired order

    Returns:
        The projected record
    """
    projected_fields = OrderedDict(
        (name, self.fields[name]) for name in field_names if name in self.fields
    )
    projected = Record(
        data=projected_fields, metadata=self.metadata.copy(), id=self.id
    )
    # The constructor also uses an explicit id as the storage ID. Restore the
    # real storage ID so an ID that only came from user data or metadata is
    # not turned into a storage ID on the projection.
    projected._storage_id = self._storage_id
    return projected

575 

def merge(self, other: Record, overwrite: bool = True) -> Record:
    """Merge another record into this one, producing a new record.

    Neither input record is modified. The merged record references the
    same Field objects as its sources.

    Args:
        other: The record to merge
        overwrite: If True, fields and metadata entries from ``other``
            replace those of this record. If False, existing fields are
            kept, only fields missing here are added, and the metadata of
            ``other`` is not merged.

    Returns:
        A new merged record. Its ID comes from this record, or from
        ``other`` if this record has none, and it carries the storage ID
        state of whichever record supplied the ID.
    """
    merged_fields = OrderedDict(self.fields)
    for name, field_obj in other.fields.items():
        if overwrite or name not in merged_fields:
            merged_fields[name] = field_obj

    merged_metadata = self.metadata.copy()
    if overwrite:
        merged_metadata.update(other.metadata)

    # Use the ID from this record, or from other if this doesn't have one
    id_source = self if self.id else other
    merged = Record(data=merged_fields, metadata=merged_metadata, id=id_source.id)
    # The constructor also uses an explicit id as the storage ID. Restore the
    # source's real storage ID so an ID that only came from user data or
    # metadata is not turned into a storage ID on the merged record.
    merged._storage_id = id_source._storage_id
    return merged