Coverage for src/dataknobs_data/records.py: 37%
251 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-31 15:06 -0600
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-31 15:06 -0600
1from __future__ import annotations
3import uuid
4from collections import OrderedDict
5from dataclasses import dataclass, field
6from typing import Any, TYPE_CHECKING
8from .fields import Field, FieldType
10if TYPE_CHECKING:
11 from collections.abc import Iterator
@dataclass
class Record:
    """Represents a structured data record with fields and metadata.

    A record is an ordered mapping of field names to ``Field`` objects plus a
    free-form metadata dict. It supports dict-style access (``rec["name"]``),
    attribute-style access (``rec.name``) and dot-notation lookups through
    ``get_value``.

    The record ID can be accessed via the ``id`` property, which:
    - Returns the storage_id if set (database-assigned ID)
    - Falls back to the legacy ``_id`` resolved at construction time
    - Falls back to a user-defined 'id' field, metadata 'id', or 'record_id' field
    - Returns None if no ID is available

    This separation allows records to have both:
    - A user-defined 'id' field as part of their data
    - A system-assigned storage_id for database operations
    """

    fields: OrderedDict[str, Field] = field(default_factory=OrderedDict)
    metadata: dict[str, Any] = field(default_factory=dict)
    _id: str | None = field(default=None, repr=False)  # Deprecated, use storage_id
    _storage_id: str | None = field(default=None, repr=False)

    def __init__(
        self,
        data: dict[str, Any] | OrderedDict[str, Field] | None = None,
        metadata: dict[str, Any] | None = None,
        id: str | None = None,
        storage_id: str | None = None,
    ):
        """Initialize a record from various data formats.

        Args:
            data: Either a dict of field names to raw values (each wrapped in a
                new Field), or an OrderedDict whose values are all Field
                objects (adopted as-is, without copying).
            metadata: Optional metadata for the record. A non-empty dict is
                stored by reference (not copied) and may receive an "id" key
                when the ID is derived from a field.
            id: Optional unique identifier for the record (deprecated, use storage_id)
            storage_id: Optional storage system identifier for the record
        """
        self.metadata = metadata or {}
        self.fields = OrderedDict()
        self._id = id  # Deprecated
        self._storage_id = storage_id or id  # Use storage_id if provided, fall back to id

        # Process data first to populate fields
        if data:
            if isinstance(data, OrderedDict) and all(
                isinstance(v, Field) for v in data.values()
            ):
                self.fields = data
            else:
                for key, value in data.items():
                    if isinstance(value, Field):
                        # Ensure the field has the correct name
                        if value.name is None or value.name == "embedding":
                            value.name = key
                        self.fields[key] = value
                    else:
                        self.fields[key] = Field(name=key, value=value)

        # Now check for ID from various sources if not explicitly provided.
        # Only the first matching source is consulted (elif chain): a present
        # but None-valued 'id' field does not fall through to 'record_id'.
        if self._id is None:
            if "id" in self.metadata:
                self._id = str(self.metadata["id"])
            elif "id" in self.fields:
                value = self.get_value("id")
                if value is not None:
                    self._id = str(value)
                    # Sync to metadata
                    self.metadata["id"] = self._id
            elif "record_id" in self.fields:
                value = self.get_value("record_id")
                if value is not None:
                    self._id = str(value)
                    # Sync to metadata
                    self.metadata["id"] = self._id

    @property
    def storage_id(self) -> str | None:
        """Get the storage system ID (database-assigned ID)."""
        return self._storage_id

    @storage_id.setter
    def storage_id(self, value: str | None) -> None:
        """Set the storage system ID (also mirrored into the legacy ``_id``)."""
        self._storage_id = value
        # Also update _id for backwards compatibility
        self._id = value

    @property
    def id(self) -> str | None:
        """Get the record ID.

        Priority order:
        1. Storage ID (database-assigned) if set
        2. Legacy ``_id`` (explicit ``id`` argument or value resolved in ``__init__``)
        3. User-defined 'id' field value
        4. Metadata 'id' (for backwards compatibility)
        5. 'record_id' field (common in DataFrames)

        Returns the first ID found, or None if no ID is present. Values taken
        from fields or metadata are converted with ``str``.
        """
        # 1. Prefer storage ID (database-assigned)
        if self._storage_id is not None:
            return self._storage_id

        # 2. Fall back to legacy _id if set
        if self._id is not None:
            return self._id

        # 3. Check for 'id' field in user data
        if "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                return str(value)

        # 4. Check metadata (backwards compatibility)
        if "id" in self.metadata:
            return str(self.metadata["id"])

        # 5. Check for 'record_id' field (common in DataFrames)
        if "record_id" in self.fields:
            value = self.get_value("record_id")
            if value is not None:
                return str(value)

        return None

    @id.setter
    def id(self, value: str | None) -> None:
        """Set the record ID.

        This sets the storage_id, which is the database-assigned ID.
        It does NOT modify user data fields. The metadata "id" entry is kept
        in sync: set when ``value`` is not None, removed otherwise.
        """
        self._storage_id = value
        self._id = value  # Backwards compatibility

        # Update metadata for backward compatibility
        if value is not None:
            self.metadata["id"] = value
        elif "id" in self.metadata:
            del self.metadata["id"]

    def generate_id(self) -> str:
        """Generate and set a new UUID for this record.

        Returns:
            The generated UUID string
        """
        new_id = str(uuid.uuid4())
        self.id = new_id
        return new_id

    def get_user_id(self) -> str | None:
        """Get the user-defined ID field value (not the storage ID).

        This explicitly returns the value of the 'id' field in the record's data,
        ignoring any storage_id that may be set.

        Returns:
            The 'id' field value converted to ``str`` if present and not None,
            None otherwise
        """
        if "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                return str(value)
        return None

    def has_storage_id(self) -> bool:
        """Check if this record has a storage system ID assigned.

        Returns:
            True if storage_id is set, False otherwise
        """
        return self._storage_id is not None

    def get_field(self, name: str) -> Field | None:
        """Get a field by name, or None if no such field exists."""
        return self.fields.get(name)

    def get_value(self, name: str, default: Any = None) -> Any:
        """Get a field's value by name, supporting dot-notation for nested paths.

        Args:
            name: Field name or dot-notation path (e.g., "metadata.type")
            default: Default value if field not found

        Returns:
            The field value or default
        """
        # Check if this is a nested path
        if "." in name:
            return self.get_nested_value(name, default)

        # Simple field lookup. Test for presence rather than truthiness so a
        # Field type that happens to be falsy still yields its value.
        field_obj = self.get_field(name)
        return field_obj.value if field_obj is not None else default

    def get_nested_value(self, path: str, default: Any = None) -> Any:
        """Get a value from a nested path using dot notation.

        Supports paths like:
        - "metadata.type" - access metadata dict
        - "fields.temperature" - access field values
        - "metadata.config.timeout" - nested dict access
        - "<field>.<key>" - descend into a field whose value is a dict

        Args:
            path: Dot-notation path to the value
            default: Default value if path not found

        Returns:
            The value at the path or default
        """
        parts = path.split(".", 1)
        if len(parts) == 1:
            # No more nesting, get the value
            return self.get_value(parts[0], default)

        root, remaining = parts

        # Handle special root paths
        if root == "metadata":
            # Navigate through metadata dict
            if not self.metadata:
                return default
            return self._traverse_dict(self.metadata, remaining, default)

        if root == "fields":
            if "." in remaining:
                # Nested path within field value (if it's a dict)
                field_name, field_path = remaining.split(".", 1)
                field_value = self.get_value(field_name, None)
                if isinstance(field_value, dict):
                    return self._traverse_dict(field_value, field_path, default)
                return default
            # Simple field access
            return self.get_value(remaining, default)

        # Otherwise treat the root as a field containing a dict
        field_value = self.get_value(root, None)
        if isinstance(field_value, dict):
            return self._traverse_dict(field_value, remaining, default)
        return default

    def _traverse_dict(self, data: dict, path: str, default: Any = None) -> Any:
        """Traverse a dictionary using dot notation.

        Args:
            data: Dictionary to traverse
            path: Dot-notation path
            default: Default value if path not found

        Returns:
            Value at path, or default as soon as a segment is missing or a
            non-dict value is reached before the path is exhausted
        """
        current = data
        for part in path.split("."):
            if isinstance(current, dict) and part in current:
                current = current[part]
            else:
                return default
        return current

    def set_field(
        self,
        name: str,
        value: Any,
        field_type: FieldType | None = None,
        field_metadata: dict[str, Any] | None = None,
    ) -> None:
        """Set or update a field, replacing any existing Field of that name."""
        self.fields[name] = Field(
            name=name, value=value, type=field_type, metadata=field_metadata or {}
        )

    def set_value(self, name: str, value: Any) -> None:
        """Set a field's value by name.

        Convenience method that creates the field if it doesn't exist; an
        existing Field object is kept and only its value is updated.
        """
        if name in self.fields:
            self.fields[name].value = value
        else:
            self.set_field(name, value)

    @property
    def data(self) -> dict[str, Any]:
        """Get all field values as a dictionary.

        Provides a simple dict-like view of the record's data. A new dict is
        built on every access.
        """
        return {name: field_obj.value for name, field_obj in self.fields.items()}

    def remove_field(self, name: str) -> bool:
        """Remove a field by name. Returns True if field was removed."""
        if name in self.fields:
            del self.fields[name]
            return True
        return False

    def has_field(self, name: str) -> bool:
        """Check if a field exists."""
        return name in self.fields

    def field_names(self) -> list[str]:
        """Get list of field names."""
        return list(self.fields.keys())

    def field_count(self) -> int:
        """Get the number of fields."""
        return len(self.fields)

    def __getitem__(self, key: str | int) -> Any:
        """Get field value by name or field by index.

        For string keys, returns the field value directly (dict-like access).
        For integer keys, returns the Field object at that index for backward compatibility.

        Raises:
            KeyError: If a string key names no field
            IndexError: If an integer key is negative or past the last field
            TypeError: If the key is neither str nor int
        """
        if isinstance(key, str):
            if key not in self.fields:
                raise KeyError(f"Field '{key}' not found")
            return self.fields[key].value
        elif isinstance(key, int):
            field_list = list(self.fields.values())
            # Negative indices are rejected rather than wrapping around.
            if key < 0 or key >= len(field_list):
                raise IndexError(f"Field index {key} out of range")
            return field_list[key]
        else:
            raise TypeError(f"Key must be str or int, got {type(key)}")

    def __setitem__(self, key: str, value: Field | Any) -> None:
        """Set field by name.

        Can accept either a Field object or a raw value.
        When given a raw value, creates a new Field automatically.
        """
        if isinstance(value, Field):
            self.fields[key] = value
        else:
            self.set_field(key, value)

    def __delitem__(self, key: str) -> None:
        """Delete field by name.

        Raises:
            KeyError: If field not found
        """
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        del self.fields[key]

    def __contains__(self, key: str) -> bool:
        """Check if field exists."""
        return key in self.fields

    def __iter__(self) -> Iterator[str]:
        """Iterate over field names."""
        return iter(self.fields)

    def __len__(self) -> int:
        """Get number of fields."""
        return len(self.fields)

    def validate(self) -> bool:
        """Validate all fields in the record (True for a record with no fields)."""
        return all(field_obj.validate() for field_obj in self.fields.values())

    def get_field_object(self, key: str) -> Field:
        """Get the Field object by name.

        Use this method when you need access to the Field object itself,
        not just its value.

        Args:
            key: Field name

        Returns:
            The Field object

        Raises:
            KeyError: If field not found
        """
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        return self.fields[key]

    def __getattr__(self, name: str) -> Any:
        """Get field value by attribute access.

        Provides convenient attribute-style access to field values. Python
        only calls this after normal attribute lookup has failed.

        Args:
            name: Attribute/field name

        Returns:
            Field value if field exists, otherwise raises AttributeError
        """
        # Avoid infinite recursion for special attributes
        if name.startswith("_") or name in ("fields", "metadata", "id"):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )

        # Check if it's a field
        if hasattr(self, "fields") and name in self.fields:
            return self.fields[name].value

        raise AttributeError(f"'{type(self).__name__}' object has no field '{name}'")

    def __setattr__(self, name: str, value: Any) -> None:
        """Set field value by attribute access.

        Allows setting field values using attribute syntax.
        Special attributes (fields, metadata) and any underscore-prefixed name
        (including _id and _storage_id) are stored as ordinary attributes.
        Properties (id, storage_id) are routed to their setters.
        Any other name updates the matching field, or creates a new one.

        Args:
            name: Attribute/field name
            value: Value to set
        """
        # Handle special attributes and private attributes normally
        if name in ("fields", "metadata") or name.startswith("_"):
            super().__setattr__(name, value)
        # Handle properties that have setters
        elif name in ("id", "storage_id"):
            object.__setattr__(self, name, value)
        elif hasattr(self, "fields") and name in self.fields:
            # Update existing field value
            self.fields[name].value = value
        elif hasattr(self, "fields"):
            # For new fields during normal operation, create them
            self.set_field(name, value)
        else:
            # `fields` does not exist yet (e.g. during __init__): plain attribute
            super().__setattr__(name, value)

    def to_dict(
        self,
        include_metadata: bool = False,
        flatten: bool = True,
        include_field_objects: bool = True,
    ) -> dict[str, Any]:
        """Convert record to dictionary.

        Args:
            include_metadata: Whether to include metadata in the output. In
                flattened form it is added under "_metadata" only when
                non-empty; in structured form it is always added under
                "metadata". The live metadata dict is used, not a copy.
            flatten: If True (default), return just field values; if False, return structured format
            include_field_objects: If True and not flattened, return full Field objects

        Returns:
            Dictionary representation of the record. The ID is included
            (as "_id" when flattened, "id" otherwise) only when it is truthy.
        """
        record_id = self.id
        if flatten:
            # Simple dict with just values (default behavior for ergonomics)
            result = {}
            for name, field_obj in self.fields.items():
                # Handle VectorField specially to ensure JSON serialization
                if hasattr(field_obj, 'to_list') and callable(field_obj.to_list):
                    # VectorField has a to_list() method for serialization
                    result[name] = field_obj.to_list()
                else:
                    result[name] = field_obj.value
            if record_id:
                result["_id"] = record_id
            if include_metadata and self.metadata:
                result["_metadata"] = self.metadata
        else:
            # Structured format for serialization
            if include_field_objects:
                result = {
                    "fields": {
                        name: field_obj.to_dict()
                        for name, field_obj in self.fields.items()
                    }
                }
            else:
                result = {
                    "fields": {
                        name: field_obj.value
                        for name, field_obj in self.fields.items()
                    }
                }
            if record_id:
                result["id"] = record_id
            if include_metadata:
                result["metadata"] = self.metadata
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Record:
        """Create a record from a dictionary representation.

        Two layouts are accepted:
        - Structured: a "fields" mapping (values are either Field dicts that
          contain a "value" key, or raw values), plus optional "metadata" and
          "id"/"_id" entries.
        - Flattened: field names mapped to values, with an optional "_id".

        The input dictionary (and its "metadata" dict) is never modified.

        Args:
            data: Dictionary in structured or flattened layout

        Returns:
            The reconstructed record
        """
        if "fields" in data:
            fields = OrderedDict()
            for name, field_data in data["fields"].items():
                if isinstance(field_data, dict) and "value" in field_data:
                    # Add name to field_data for Field.from_dict
                    fields[name] = Field.from_dict({"name": name, **field_data})
                else:
                    fields[name] = Field(name=name, value=field_data)
            # Copy so that ID syncing in __init__ cannot write into the caller's dict
            metadata = dict(data.get("metadata") or {})
            record_id = data.get("id") or data.get("_id")
            return cls(data=fields, metadata=metadata, id=record_id)

        # Flattened format: strip "_id" from a shallow copy, not from the input
        flat = data.copy()
        record_id = flat.pop("_id", None)
        return cls(data=flat, id=record_id)

    def copy(self, deep: bool = True) -> Record:
        """Create a copy of the record.

        Args:
            deep: If True, create deep copies of fields and metadata. If
                False, the new record gets its own field mapping and metadata
                dict but shares the Field objects with this record.

        Returns:
            A new Record carrying this record's ``id``
        """
        if deep:
            import copy

            new_fields = OrderedDict()
            for name, field_obj in self.fields.items():
                # Preserve the actual field type (Field or VectorField)
                if field_obj.__class__.__name__ == 'VectorField':
                    # Import VectorField if needed
                    from dataknobs_data.fields import VectorField

                    new_fields[name] = VectorField(
                        name=field_obj.name,
                        value=copy.deepcopy(field_obj.value),
                        dimensions=getattr(field_obj, 'dimensions', None),
                        source_field=getattr(field_obj, 'source_field', None),
                        model_name=getattr(field_obj, 'model_name', None),
                        model_version=getattr(field_obj, 'model_version', None),
                        metadata=copy.deepcopy(field_obj.metadata),
                    )
                else:
                    new_fields[name] = Field(
                        name=field_obj.name,
                        value=copy.deepcopy(field_obj.value),
                        type=field_obj.type,
                        metadata=copy.deepcopy(field_obj.metadata),
                    )
            new_metadata = copy.deepcopy(self.metadata)
        else:
            new_fields = OrderedDict(self.fields)  # type: ignore[arg-type]
            new_metadata = self.metadata.copy()

        return Record(data=new_fields, metadata=new_metadata, id=self.id)

    def project(self, field_names: list[str]) -> Record:
        """Create a new record with only specified fields.

        Names that are not present are skipped. The Field objects are shared
        with this record; metadata is shallow-copied.
        """
        projected_fields = OrderedDict(
            (name, self.fields[name]) for name in field_names if name in self.fields
        )
        return Record(data=projected_fields, metadata=self.metadata.copy(), id=self.id)

    def merge(self, other: Record, overwrite: bool = True) -> Record:
        """Merge another record into this one.

        Neither input record is modified; Field objects are shared with the result.

        Args:
            other: The record to merge
            overwrite: If True, overwrite existing fields and update metadata
                from ``other``; if False, keep existing fields and this
                record's metadata

        Returns:
            A new merged record
        """
        merged_fields = OrderedDict(self.fields)
        for name, field_obj in other.fields.items():
            if overwrite or name not in merged_fields:
                merged_fields[name] = field_obj

        merged_metadata = self.metadata.copy()
        if overwrite:
            merged_metadata.update(other.metadata)

        # Use the ID from this record, or from other if this doesn't have one
        merged_id = self.id or other.id

        return Record(data=merged_fields, metadata=merged_metadata, id=merged_id)