Coverage for src/dataknobs_data/records.py: 40%
256 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 14:15 -0600
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 14:15 -0600
1from __future__ import annotations
3import uuid
4from collections import OrderedDict
5from dataclasses import dataclass, field
6from typing import Any, TYPE_CHECKING
8from .fields import Field, FieldType
10if TYPE_CHECKING:
11 from collections.abc import Iterator
@dataclass
class Record:
    """Represents a structured data record with fields and metadata.

    The record ID can be accessed via the `id` property, which:
    - Returns the storage_id if set (database-assigned ID)
    - Falls back to user-defined 'id' field if present
    - Returns None if no ID is available

    This separation allows records to have both:
    - A user-defined 'id' field as part of their data
    - A system-assigned storage_id for database operations
    """

    fields: OrderedDict[str, Field] = field(default_factory=OrderedDict)
    metadata: dict[str, Any] = field(default_factory=dict)
    _id: str | None = field(default=None, repr=False)  # Deprecated, use storage_id
    _storage_id: str | None = field(default=None, repr=False)

    def __init__(
        self,
        data: dict[str, Any] | OrderedDict[str, Field] | None = None,
        metadata: dict[str, Any] | None = None,
        id: str | None = None,
        storage_id: str | None = None,
    ):
        """Initialize a record from various data formats.

        Args:
            data: Can be a dict of field names to values, or an OrderedDict of
                Field objects. An OrderedDict made up entirely of Field objects
                is adopted as-is (not copied).
            metadata: Optional metadata for the record. A non-empty dict is
                adopted as-is (not copied) and may receive an ``"id"`` entry
                when the ID is discovered from the fields.
            id: Optional unique identifier for the record (deprecated, use storage_id)
            storage_id: Optional storage system identifier for the record
        """
        self.metadata = metadata or {}
        self.fields = OrderedDict()
        self._id = id  # Deprecated
        self._storage_id = storage_id or id  # Use storage_id if provided, fall back to id

        # Process data first to populate fields
        if data:
            if isinstance(data, OrderedDict) and all(
                isinstance(v, Field) for v in data.values()
            ):
                self.fields = data
            else:
                for key, value in data.items():
                    if isinstance(value, Field):
                        # Ensure the field has the correct name
                        if value.name is None or value.name == "embedding":
                            value.name = key
                        self.fields[key] = value
                    else:
                        self.fields[key] = Field(name=key, value=value)

        # Now check for ID from various sources if not explicitly provided
        if self._id is None:
            if "id" in self.metadata:
                self._id = str(self.metadata["id"])
            else:
                # Only the first of these fields that is *present* is consulted,
                # even when its value is None (mirrors an if/elif chain).
                for candidate in ("id", "record_id"):
                    if candidate in self.fields:
                        value = self.get_value(candidate)
                        if value is not None:
                            self._id = str(value)
                            # Sync to metadata
                            self.metadata["id"] = self._id
                        break

    @property
    def storage_id(self) -> str | None:
        """Get the storage system ID (database-assigned ID)."""
        return self._storage_id

    @storage_id.setter
    def storage_id(self, value: str | None) -> None:
        """Set the storage system ID."""
        self._storage_id = value
        # Also update _id for backwards compatibility
        self._id = value

    @property
    def id(self) -> str | None:
        """Get the record ID.

        Priority order:
        1. Storage ID (database-assigned) if set
        2. Legacy ``_id`` (explicit ``id`` argument, or the ID discovered
           from metadata/fields at construction time)
        3. User-defined 'id' field value
        4. Metadata 'id' (for backwards compatibility)
        5. record_id field (common in DataFrames)

        Returns the first ID found, or None if no ID is present.
        """
        # 1. Prefer storage ID (database-assigned)
        if self._storage_id is not None:
            return self._storage_id

        # 2. Fall back to legacy _id if set
        if self._id is not None:
            return self._id

        # 3. Check for 'id' field in user data
        if "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                return str(value)

        # 4. Check metadata (backwards compatibility)
        if "id" in self.metadata:
            return str(self.metadata["id"])

        # 5. Check for 'record_id' field (common in DataFrames)
        if "record_id" in self.fields:
            value = self.get_value("record_id")
            if value is not None:
                return str(value)

        return None

    @id.setter
    def id(self, value: str | None) -> None:
        """Set the record ID.

        This sets the storage_id, which is the database-assigned ID.
        It does NOT modify user data fields.
        """
        self._storage_id = value
        self._id = value  # Backwards compatibility

        # Update metadata for backward compatibility
        if value is not None:
            self.metadata["id"] = value
        elif "id" in self.metadata:
            del self.metadata["id"]

    def generate_id(self) -> str:
        """Generate and set a new UUID for this record.

        Returns:
            The generated UUID string
        """
        new_id = str(uuid.uuid4())
        self.id = new_id
        return new_id

    def get_user_id(self) -> str | None:
        """Get the user-defined ID field value (not the storage ID).

        This explicitly returns the value of the 'id' field in the record's data,
        ignoring any storage_id that may be set.

        Returns:
            The value of the 'id' field as a string if present and not None,
            None otherwise
        """
        if "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                return str(value)
        return None

    def has_storage_id(self) -> bool:
        """Check if this record has a storage system ID assigned.

        Returns:
            True if storage_id is set, False otherwise
        """
        return self._storage_id is not None

    def get_field(self, name: str) -> Field | None:
        """Get a field by name, or None if no such field exists."""
        return self.fields.get(name)

    def get_value(self, name: str, default: Any = None) -> Any:
        """Get a field's value by name, supporting dot-notation for nested paths.

        Args:
            name: Field name or dot-notation path (e.g., "metadata.type")
            default: Default value if field not found

        Returns:
            The field value or default
        """
        # Check if this is a nested path
        if "." in name:
            return self.get_nested_value(name, default)

        # Simple field lookup. Compare against None explicitly so that a field
        # object which happens to be falsy is still treated as present.
        found = self.get_field(name)
        return found.value if found is not None else default

    def get_nested_value(self, path: str, default: Any = None) -> Any:
        """Get a value from a nested path using dot notation.

        Supports paths like:
        - "metadata.type" - access metadata field (if exists) or metadata dict attribute
        - "fields.temperature" - access field values
        - "metadata.config.timeout" - nested dict access

        Args:
            path: Dot-notation path to the value
            default: Default value if path not found

        Returns:
            The value at the path or default
        """
        parts = path.split(".", 1)
        if len(parts) == 1:
            # No more nesting, get the value
            return self.get_value(parts[0], default)

        root, remaining = parts

        if root == "metadata":
            # A field literally named "metadata" takes precedence over the
            # record's metadata attribute.
            if root in self.fields:
                return self._traverse_field(root, remaining, default)
            if self.metadata:
                return self._traverse_dict(self.metadata, remaining, default)
            return default

        if root == "fields":
            if "." in remaining:
                # Nested path within field value (if it's a dict)
                field_name, field_path = remaining.split(".", 1)
                return self._traverse_field(field_name, field_path, default)
            # Simple field access
            return self.get_value(remaining, default)

        # Otherwise treat the root as a field that may contain a dict
        return self._traverse_field(root, remaining, default)

    def _traverse_field(self, field_name: str, path: str, default: Any = None) -> Any:
        """Follow ``path`` inside the dict value of ``field_name``.

        Returns ``default`` when the field is missing or its value is not a dict.
        """
        field_value = self.get_value(field_name, None)
        if isinstance(field_value, dict):
            return self._traverse_dict(field_value, path, default)
        return default

    def _traverse_dict(self, data: dict, path: str, default: Any = None) -> Any:
        """Traverse a dictionary using dot notation.

        Args:
            data: Dictionary to traverse
            path: Dot-notation path
            default: Default value if path not found

        Returns:
            Value at path or default
        """
        current = data
        for part in path.split("."):
            if isinstance(current, dict) and part in current:
                current = current[part]
            else:
                return default
        return current

    def set_field(
        self,
        name: str,
        value: Any,
        field_type: FieldType | None = None,
        field_metadata: dict[str, Any] | None = None,
    ) -> None:
        """Set or update a field, replacing any existing Field object of that name."""
        self.fields[name] = Field(
            name=name, value=value, type=field_type, metadata=field_metadata or {}
        )

    def set_value(self, name: str, value: Any) -> None:
        """Set a field's value by name.

        Convenience method that creates the field if it doesn't exist.
        An existing Field object is kept and only its value is updated.
        """
        if name in self.fields:
            self.fields[name].value = value
        else:
            self.set_field(name, value)

    @property
    def data(self) -> dict[str, Any]:
        """Get all field values as a dictionary.

        Provides a simple dict-like view of the record's data. A new dict is
        built on every access.
        """
        return {name: item.value for name, item in self.fields.items()}

    def remove_field(self, name: str) -> bool:
        """Remove a field by name. Returns True if field was removed."""
        if name in self.fields:
            del self.fields[name]
            return True
        return False

    def has_field(self, name: str) -> bool:
        """Check if a field exists."""
        return name in self.fields

    def field_names(self) -> list[str]:
        """Get list of field names."""
        return list(self.fields.keys())

    def field_count(self) -> int:
        """Get the number of fields."""
        return len(self.fields)

    def __getitem__(self, key: str | int) -> Any:
        """Get field value by name or field by index.

        For string keys, returns the field value directly (dict-like access).
        For integer keys, returns the Field object at that index for backward compatibility.

        Raises:
            KeyError: If a string key names no field
            IndexError: If an integer key is negative or past the last field
            TypeError: If the key is neither str nor int
        """
        if isinstance(key, str):
            if key not in self.fields:
                raise KeyError(f"Field '{key}' not found")
            return self.fields[key].value
        elif isinstance(key, int):
            field_list = list(self.fields.values())
            # Negative indices are rejected rather than wrapping around.
            if key < 0 or key >= len(field_list):
                raise IndexError(f"Field index {key} out of range")
            return field_list[key]
        else:
            raise TypeError(f"Key must be str or int, got {type(key)}")

    def __setitem__(self, key: str, value: Field | Any) -> None:
        """Set field by name.

        Can accept either a Field object or a raw value.
        When given a raw value, creates a new Field automatically.
        """
        if isinstance(value, Field):
            self.fields[key] = value
        else:
            self.set_field(key, value)

    def __delitem__(self, key: str) -> None:
        """Delete field by name.

        Raises:
            KeyError: If field not found
        """
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        del self.fields[key]

    def __contains__(self, key: str) -> bool:
        """Check if field exists."""
        return key in self.fields

    def __iter__(self) -> Iterator[str]:
        """Iterate over field names."""
        return iter(self.fields)

    def __len__(self) -> int:
        """Get number of fields."""
        return len(self.fields)

    def validate(self) -> bool:
        """Validate all fields in the record.

        Returns:
            True if every field's ``validate()`` is truthy (also True when
            the record has no fields)
        """
        return all(item.validate() for item in self.fields.values())

    def get_field_object(self, key: str) -> Field:
        """Get the Field object by name.

        Use this method when you need access to the Field object itself,
        not just its value.

        Args:
            key: Field name

        Returns:
            The Field object

        Raises:
            KeyError: If field not found
        """
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        return self.fields[key]

    def __getattr__(self, name: str) -> Any:
        """Get field value by attribute access.

        Provides convenient attribute-style access to field values.
        Only invoked when normal attribute lookup has already failed.

        Args:
            name: Attribute/field name

        Returns:
            Field value if field exists, otherwise raises AttributeError
        """
        # Avoid infinite recursion for special attributes
        if name.startswith("_") or name in ("fields", "metadata", "id"):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )

        # Check if it's a field
        if hasattr(self, "fields") and name in self.fields:
            return self.fields[name].value

        raise AttributeError(f"'{type(self).__name__}' object has no field '{name}'")

    def __setattr__(self, name: str, value: Any) -> None:
        """Set field value by attribute access.

        Allows setting field values using attribute syntax.
        Special attributes (fields, metadata, and anything starting with an
        underscore such as _id, _storage_id) are handled normally.
        Properties (id, storage_id) are routed to their setters.

        Args:
            name: Attribute/field name
            value: Value to set
        """
        # Handle special attributes and private attributes normally
        if name.startswith("_") or name in ("fields", "metadata"):
            super().__setattr__(name, value)
        # Handle properties that have setters
        elif name in ("id", "storage_id"):
            # Use the property setter
            object.__setattr__(self, name, value)
        elif hasattr(self, "fields"):
            if name in self.fields:
                # Update existing field value
                self.fields[name].value = value
            else:
                # For new fields during normal operation, create them
                self.set_field(name, value)
        else:
            # During __init__ (before ``fields`` exists) use normal attribute setting
            super().__setattr__(name, value)

    def to_dict(
        self,
        include_metadata: bool = False,
        flatten: bool = True,
        include_field_objects: bool = True,
    ) -> dict[str, Any]:
        """Convert record to dictionary.

        Args:
            include_metadata: Whether to include metadata in the output
            flatten: If True (default), return just field values; if False, return structured format
            include_field_objects: If True and not flattened, return full Field objects

        Returns:
            Dictionary representation of the record. The flattened form stores
            the ID under ``"_id"`` and metadata under ``"_metadata"``; the
            structured form uses ``"id"`` and ``"metadata"``. The metadata
            entry is the record's own dict, not a copy.
        """
        record_id = self.id
        result: dict[str, Any]

        if flatten:
            # Simple dict with just values (default behavior for ergonomics)
            result = {}
            for name, item in self.fields.items():
                # Fields exposing a callable to_list() (e.g. VectorField) are
                # serialized through it to keep the output JSON-friendly.
                to_list = getattr(item, "to_list", None)
                result[name] = to_list() if callable(to_list) else item.value
            if record_id:
                result["_id"] = record_id
            if include_metadata and self.metadata:
                result["_metadata"] = self.metadata
            return result

        # Structured format for serialization
        if include_field_objects:
            result = {
                "fields": {name: item.to_dict() for name, item in self.fields.items()}
            }
        else:
            result = {
                "fields": {name: item.value for name, item in self.fields.items()}
            }
        if record_id:
            result["id"] = record_id
        if include_metadata:
            result["metadata"] = self.metadata
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Record:
        """Create a record from a dictionary representation.

        Accepts either the structured format (a ``"fields"`` mapping plus
        optional ``"metadata"`` and ``"id"``/``"_id"``) or the flattened
        format (field names mapped to values, with an optional ``"_id"``).

        The input dictionary is never modified, and the new record does not
        share its metadata dict with the input.
        """
        if "fields" in data:
            fields = OrderedDict()
            for name, field_data in data["fields"].items():
                if isinstance(field_data, dict) and "value" in field_data:
                    # Add name to field_data for Field.from_dict
                    field_data_with_name = {"name": name, **field_data}
                    fields[name] = Field.from_dict(field_data_with_name)
                else:
                    fields[name] = Field(name=name, value=field_data)
            # Copy so later ID syncing on the record cannot leak into ``data``.
            metadata = dict(data.get("metadata") or {})
            record_id = data.get("id") or data.get("_id")
            return cls(data=fields, metadata=metadata, id=record_id)

        # Flattened format: pull _id out of a shallow copy rather than out of
        # the caller's dictionary.
        flat = data.copy()
        record_id = flat.pop("_id", None)
        return cls(data=flat, id=record_id)

    def copy(self, deep: bool = True) -> Record:
        """Create a copy of the record.

        Args:
            deep: If True, create deep copies of fields and metadata. If
                False, the new record gets its own field mapping and metadata
                dict but shares the Field objects and metadata values.
        """
        if deep:
            import copy

            new_fields = OrderedDict()
            for name, item in self.fields.items():
                # Preserve the actual field type (Field or VectorField)
                if item.__class__.__name__ == 'VectorField':
                    # Import VectorField if needed
                    from dataknobs_data.fields import VectorField
                    new_fields[name] = VectorField(
                        name=item.name,
                        value=copy.deepcopy(item.value),
                        dimensions=getattr(item, 'dimensions', None),
                        source_field=getattr(item, 'source_field', None),
                        model_name=getattr(item, 'model_name', None),
                        model_version=getattr(item, 'model_version', None),
                        metadata=copy.deepcopy(item.metadata),
                    )
                else:
                    new_fields[name] = Field(
                        name=item.name,
                        value=copy.deepcopy(item.value),
                        type=item.type,
                        metadata=copy.deepcopy(item.metadata),
                    )
            new_metadata = copy.deepcopy(self.metadata)
        else:
            new_fields = OrderedDict(self.fields)  # type: ignore[arg-type]
            new_metadata = self.metadata.copy()

        # NOTE(review): passing ``id=self.id`` also populates the copy's
        # storage ID, even when this record's ID was only derived from its
        # data or metadata - confirm that is intended.
        return Record(data=new_fields, metadata=new_metadata, id=self.id)

    def project(self, field_names: list[str]) -> Record:
        """Create a new record with only specified fields.

        Names that are not fields of this record are skipped. The Field
        objects are shared with this record; the metadata dict is a shallow copy.
        """
        projected_fields = OrderedDict(
            (name, self.fields[name]) for name in field_names if name in self.fields
        )
        return Record(data=projected_fields, metadata=self.metadata.copy(), id=self.id)

    def merge(self, other: Record, overwrite: bool = True) -> Record:
        """Merge another record into this one.

        Neither input record is modified.

        Args:
            other: The record to merge
            overwrite: If True, overwrite existing fields and metadata keys
                with those from ``other``; if False, keep existing fields and
                leave this record's metadata untouched

        Returns:
            A new merged record
        """
        merged_fields = OrderedDict(self.fields)
        for name, field_obj in other.fields.items():
            if overwrite or name not in merged_fields:
                merged_fields[name] = field_obj

        merged_metadata = self.metadata.copy()
        if overwrite:
            merged_metadata.update(other.metadata)

        # Use the ID from this record, or from other if this doesn't have one
        merged_id = self.id or other.id

        return Record(data=merged_fields, metadata=merged_metadata, id=merged_id)