Coverage for src / dataknobs_data / records.py: 20%
256 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 15:45 -0700
1"""Structured data records with typed fields and metadata.
3This module defines the Record class for representing structured data with
4typed fields, validation, and conversion utilities for database operations.
5"""
7from __future__ import annotations
9import uuid
10from collections import OrderedDict
11from dataclasses import dataclass, field
12from typing import Any, TYPE_CHECKING
14from .fields import Field, FieldType
16if TYPE_CHECKING:
17 from collections.abc import Iterator
@dataclass
class Record:
    """Represents a structured data record with fields and metadata.

    The record ID can be accessed via the `id` property, which:
    - Returns the storage_id if set (database-assigned ID)
    - Falls back to the legacy/derived ID, the user-defined 'id' field,
      the metadata 'id' entry, or the 'record_id' field (in that order)
    - Returns None if no ID is available

    This separation allows records to have both:
    - A user-defined 'id' field as part of their data
    - A system-assigned storage_id for database operations

    Example:
        ```python
        from dataknobs_data import Record, Field, FieldType

        # Create record from dict
        record = Record({"name": "Alice", "age": 30, "email": "alice@example.com"})

        # Access field values
        print(record.get_value("name"))  # "Alice"
        print(record["age"])  # 30
        print(record.name)  # "Alice" (attribute access)

        # Set field values
        record.set_value("age", 31)
        record["city"] = "New York"

        # Work with metadata
        record.metadata["source"] = "user_input"

        # Convert to dict
        data = record.to_dict()  # {"name": "Alice", "age": 31, "email": "...", "city": "..."}
        ```
    """

    fields: OrderedDict[str, Field] = field(default_factory=OrderedDict)
    metadata: dict[str, Any] = field(default_factory=dict)
    _id: str | None = field(default=None, repr=False)  # Deprecated, use storage_id
    _storage_id: str | None = field(default=None, repr=False)

    def __init__(
        self,
        data: dict[str, Any] | OrderedDict[str, Field] | None = None,
        metadata: dict[str, Any] | None = None,
        id: str | None = None,
        storage_id: str | None = None,
    ):
        """Initialize a record from various data formats.

        Args:
            data: Can be a dict of field names to values, or an OrderedDict of Field objects
            metadata: Optional metadata for the record
            id: Optional unique identifier for the record (deprecated, use storage_id)
            storage_id: Optional storage system identifier for the record

        Example:
            ```python
            # From simple dict
            record = Record({"name": "Alice", "age": 30})

            # With metadata
            record = Record(
                data={"name": "Bob"},
                metadata={"source": "api", "timestamp": "2024-01-01"}
            )

            # With storage_id
            record = Record(
                data={"name": "Charlie"},
                storage_id="550e8400-e29b-41d4-a716-446655440000"
            )
            ```
        """
        self.metadata = metadata or {}
        self.fields = OrderedDict()
        self._id = id  # Deprecated
        self._storage_id = storage_id or id  # Use storage_id if provided, fall back to id

        # Process data first to populate fields
        if data:
            if isinstance(data, OrderedDict) and all(
                isinstance(v, Field) for v in data.values()
            ):
                # An OrderedDict made only of Field objects is adopted as-is
                # (no copy is made, so the caller's mapping is shared).
                self.fields = data
            else:
                for key, value in data.items():
                    if isinstance(value, Field):
                        # Ensure the field has the correct name
                        if value.name is None or value.name == "embedding":
                            value.name = key
                        self.fields[key] = value
                    else:
                        self.fields[key] = Field(name=key, value=value)

        # Now check for ID from various sources if not explicitly provided
        if self._id is None:
            if "id" in self.metadata:
                self._id = str(self.metadata["id"])
            elif "id" in self.fields:
                value = self.get_value("id")
                if value is not None:
                    self._id = str(value)
                    # Sync to metadata
                    self.metadata["id"] = self._id
            elif "record_id" in self.fields:
                value = self.get_value("record_id")
                if value is not None:
                    self._id = str(value)
                    # Sync to metadata
                    self.metadata["id"] = self._id

    @property
    def storage_id(self) -> str | None:
        """Get the storage system ID (database-assigned ID)."""
        return self._storage_id

    @storage_id.setter
    def storage_id(self, value: str | None) -> None:
        """Set the storage system ID."""
        self._storage_id = value
        # Also update _id for backwards compatibility
        self._id = value

    @property
    def id(self) -> str | None:
        """Get the record ID.

        Priority order:
        1. Storage ID (database-assigned) if set
        2. Legacy ``_id`` (the constructor's ``id`` argument, or the value
           derived at construction time from metadata / 'id' / 'record_id')
        3. User-defined 'id' field value
        4. Metadata 'id' (for backwards compatibility)
        5. record_id field (common in DataFrames)

        Returns the first ID found, or None if no ID is present.
        """
        # 1. Prefer storage ID (database-assigned)
        if self._storage_id is not None:
            return self._storage_id

        # 2. Fall back to legacy _id if set
        if self._id is not None:
            return self._id

        # 3. Check for 'id' field in user data
        if "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                return str(value)

        # 4. Check metadata (backwards compatibility)
        if "id" in self.metadata:
            return str(self.metadata["id"])

        # 5. Check for 'record_id' field (common in DataFrames)
        if "record_id" in self.fields:
            value = self.get_value("record_id")
            if value is not None:
                return str(value)

        return None

    @id.setter
    def id(self, value: str | None) -> None:
        """Set the record ID.

        This sets the storage_id, which is the database-assigned ID.
        It does NOT modify user data fields.
        """
        self._storage_id = value
        self._id = value  # Backwards compatibility

        # Update metadata for backward compatibility
        if value is not None:
            self.metadata["id"] = value
        elif "id" in self.metadata:
            del self.metadata["id"]

    def generate_id(self) -> str:
        """Generate and set a new UUID for this record.

        Returns:
            The generated UUID string
        """
        new_id = str(uuid.uuid4())
        self.id = new_id
        return new_id

    def get_user_id(self) -> str | None:
        """Get the user-defined ID field value (not the storage ID).

        This explicitly returns the value of the 'id' field in the record's data,
        ignoring any storage_id that may be set.

        Returns:
            The value of the 'id' field (as a string) if present, None otherwise
        """
        if "id" in self.fields:
            value = self.get_value("id")
            if value is not None:
                return str(value)
        return None

    def has_storage_id(self) -> bool:
        """Check if this record has a storage system ID assigned.

        Returns:
            True if storage_id is set, False otherwise
        """
        return self._storage_id is not None

    def get_field(self, name: str) -> Field | None:
        """Get a field by name, or None if it does not exist."""
        return self.fields.get(name)

    def get_value(self, name: str, default: Any = None) -> Any:
        """Get a field's value by name, supporting dot-notation for nested paths.

        Args:
            name: Field name or dot-notation path (e.g., "metadata.type")
            default: Default value if field not found

        Returns:
            The field value or default

        Example:
            ```python
            record = Record({
                "name": "Alice",
                "config": {"timeout": 30, "retries": 3}
            })

            # Simple field access
            name = record.get_value("name")  # "Alice"

            # Nested path access
            timeout = record.get_value("config.timeout")  # 30

            # With default
            missing = record.get_value("missing_field", "default")  # "default"
            ```
        """
        # Check if this is a nested path
        if "." in name:
            return self.get_nested_value(name, default)

        # Simple field lookup. Compare against None explicitly so that a
        # present-but-falsy field object is never mistaken for a missing one.
        field_obj = self.get_field(name)
        return field_obj.value if field_obj is not None else default

    def get_nested_value(self, path: str, default: Any = None) -> Any:
        """Get a value from a nested path using dot notation.

        Supports paths like:
        - "metadata.type" - access metadata field (if exists) or metadata dict attribute
        - "fields.temperature" - access field values
        - "metadata.config.timeout" - nested dict access

        Args:
            path: Dot-notation path to the value
            default: Default value if path not found

        Returns:
            The value at the path or default
        """
        parts = path.split(".", 1)
        if len(parts) == 1:
            # No more nesting, get the value
            return self.get_value(parts[0], default)

        root, remaining = parts

        # Handle special root paths
        if root == "metadata":
            # A field literally named "metadata" takes precedence over the
            # record's metadata attribute.
            if root in self.fields:
                field_value = self.get_value(root, None)
                if isinstance(field_value, dict):
                    return self._traverse_dict(field_value, remaining, default)
                return default
            if self.metadata:
                # Fall back to record's metadata attribute
                return self._traverse_dict(self.metadata, remaining, default)
            return default

        if root == "fields":
            if "." in remaining:
                # Nested path within field value (if it's a dict)
                field_name, field_path = remaining.split(".", 1)
                field_value = self.get_value(field_name, None)
                if isinstance(field_value, dict):
                    return self._traverse_dict(field_value, field_path, default)
                return default
            # Simple field access
            return self.get_value(remaining, default)

        # Otherwise the root must be a field containing a dict
        field_value = self.get_value(root, None)
        if isinstance(field_value, dict):
            return self._traverse_dict(field_value, remaining, default)
        return default

    def _traverse_dict(self, data: dict, path: str, default: Any = None) -> Any:
        """Traverse a dictionary using dot notation.

        Args:
            data: Dictionary to traverse
            path: Dot-notation path
            default: Default value if path not found

        Returns:
            Value at path or default
        """
        current = data
        for part in path.split("."):
            if isinstance(current, dict) and part in current:
                current = current[part]
            else:
                return default
        return current

    def set_field(
        self,
        name: str,
        value: Any,
        field_type: FieldType | None = None,
        field_metadata: dict[str, Any] | None = None,
    ) -> None:
        """Set or update a field, replacing any existing Field of that name."""
        self.fields[name] = Field(
            name=name, value=value, type=field_type, metadata=field_metadata or {}
        )

    def set_value(self, name: str, value: Any) -> None:
        """Set a field's value by name.

        Convenience method that creates the field if it doesn't exist.
        """
        if name in self.fields:
            self.fields[name].value = value
        else:
            self.set_field(name, value)

    @property
    def data(self) -> dict[str, Any]:
        """Get all field values as a dictionary.

        Provides a simple dict-like view of the record's data.
        """
        return {name: field_obj.value for name, field_obj in self.fields.items()}

    def remove_field(self, name: str) -> bool:
        """Remove a field by name. Returns True if field was removed."""
        if name in self.fields:
            del self.fields[name]
            return True
        return False

    def has_field(self, name: str) -> bool:
        """Check if a field exists."""
        return name in self.fields

    def field_names(self) -> list[str]:
        """Get list of field names."""
        return list(self.fields.keys())

    def field_count(self) -> int:
        """Get the number of fields."""
        return len(self.fields)

    def __getitem__(self, key: str | int) -> Any:
        """Get field value by name or field by index.

        For string keys, returns the field value directly (dict-like access).
        For integer keys, returns the Field object at that index for backward compatibility.

        Raises:
            KeyError: If a string key names no field
            IndexError: If an integer key is negative or past the last field
            TypeError: If the key is neither str nor int
        """
        if isinstance(key, str):
            if key not in self.fields:
                raise KeyError(f"Field '{key}' not found")
            return self.fields[key].value
        elif isinstance(key, int):
            field_list = list(self.fields.values())
            if key < 0 or key >= len(field_list):
                raise IndexError(f"Field index {key} out of range")
            return field_list[key]
        else:
            raise TypeError(f"Key must be str or int, got {type(key)}")

    def __setitem__(self, key: str, value: Field | Any) -> None:
        """Set field by name.

        Can accept either a Field object or a raw value.
        When given a raw value, creates a new Field automatically.
        """
        if isinstance(value, Field):
            self.fields[key] = value
        else:
            self.set_field(key, value)

    def __delitem__(self, key: str) -> None:
        """Delete field by name.

        Raises:
            KeyError: If field not found
        """
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        del self.fields[key]

    def __contains__(self, key: str) -> bool:
        """Check if field exists."""
        return key in self.fields

    def __iter__(self) -> Iterator[str]:
        """Iterate over field names."""
        return iter(self.fields)

    def __len__(self) -> int:
        """Get number of fields."""
        return len(self.fields)

    def validate(self) -> bool:
        """Validate all fields in the record."""
        return all(field_obj.validate() for field_obj in self.fields.values())

    def get_field_object(self, key: str) -> Field:
        """Get the Field object by name.

        Use this method when you need access to the Field object itself,
        not just its value.

        Args:
            key: Field name

        Returns:
            The Field object

        Raises:
            KeyError: If field not found
        """
        if key not in self.fields:
            raise KeyError(f"Field '{key}' not found")
        return self.fields[key]

    def __getattr__(self, name: str) -> Any:
        """Get field value by attribute access.

        Only invoked when normal attribute lookup fails, so real attributes,
        properties and methods always win over fields of the same name.

        Args:
            name: Attribute/field name

        Returns:
            Field value if field exists, otherwise raises AttributeError
        """
        # Avoid infinite recursion for special attributes
        if name.startswith("_") or name in ("fields", "metadata", "id"):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )

        # Check if it's a field
        if hasattr(self, "fields") and name in self.fields:
            return self.fields[name].value

        raise AttributeError(f"'{type(self).__name__}' object has no field '{name}'")

    def __setattr__(self, name: str, value: Any) -> None:
        """Set field value by attribute access.

        Allows setting field values using attribute syntax.
        Special attributes (fields, metadata, _id, _storage_id) are handled normally.
        Properties (id, storage_id) are also handled specially.

        Args:
            name: Attribute/field name
            value: Value to set
        """
        # Special and private attributes are stored as ordinary attributes
        # ("_id" and "_storage_id" are covered by the underscore check).
        if name in ("fields", "metadata") or name.startswith("_"):
            super().__setattr__(name, value)
        elif name in ("id", "storage_id"):
            # Route through the property setter
            object.__setattr__(self, name, value)
        elif hasattr(self, "fields") and name in self.fields:
            # Update existing field value
            self.fields[name].value = value
        elif hasattr(self, "fields"):
            # Unknown name during normal operation: create a new field
            self.set_field(name, value)
        else:
            # Before `fields` exists (early in __init__), set a plain attribute
            super().__setattr__(name, value)

    def to_dict(
        self,
        include_metadata: bool = False,
        flatten: bool = True,
        include_field_objects: bool = True,
    ) -> dict[str, Any]:
        """Convert record to dictionary.

        Args:
            include_metadata: Whether to include metadata in the output
            flatten: If True (default), return just field values; if False, return structured format
            include_field_objects: If True and not flattened, return full Field objects

        Returns:
            Dictionary representation of the record
        """
        if flatten:
            # Simple dict with just values (default behavior for ergonomics)
            result = {}
            for name, field_obj in self.fields.items():
                # Fields exposing a callable to_list() (e.g. VectorField) are
                # serialized through it so the output stays JSON-friendly.
                if hasattr(field_obj, 'to_list') and callable(field_obj.to_list):
                    result[name] = field_obj.to_list()
                else:
                    result[name] = field_obj.value
            if self.id:
                result["_id"] = self.id
            if include_metadata and self.metadata:
                result["_metadata"] = self.metadata
        else:
            # Structured format for serialization
            if include_field_objects:
                result = {
                    "fields": {
                        name: field_obj.to_dict()
                        for name, field_obj in self.fields.items()
                    }
                }
            else:
                result = {
                    "fields": {
                        name: field_obj.value
                        for name, field_obj in self.fields.items()
                    }
                }
            if self.id:
                result["id"] = self.id
            if include_metadata:
                result["metadata"] = self.metadata
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Record:
        """Create a record from a dictionary representation.

        The input dictionary is never modified.

        Args:
            data: Dictionary containing record data

        Returns:
            A new Record instance

        Example:
            ```python
            # From simple dict
            data = {"name": "Alice", "age": 30}
            record = Record.from_dict(data)

            # From structured format
            data = {
                "fields": {
                    "name": {"value": "Alice", "type": "string"},
                    "age": {"value": 30, "type": "integer"}
                },
                "metadata": {"source": "api"}
            }
            record = Record.from_dict(data)
            ```
        """
        if "fields" in data:
            fields = OrderedDict()
            for name, field_data in data["fields"].items():
                if isinstance(field_data, dict) and "value" in field_data:
                    # Add name to field_data for Field.from_dict
                    field_data_with_name = {"name": name, **field_data}
                    fields[name] = Field.from_dict(field_data_with_name)
                else:
                    fields[name] = Field(name=name, value=field_data)
            metadata = data.get("metadata", {})
            record_id = data.get("id") or data.get("_id")
            return cls(data=fields, metadata=metadata, id=record_id)

        # Flattened format: "_id" carries the record ID, everything else is
        # field data.
        if "_id" not in data:
            return cls(data=data)
        # Strip "_id" from a shallow copy so the caller's dict is left intact.
        flat = data.copy()
        record_id = flat.pop("_id")
        return cls(data=flat, id=record_id)

    def copy(self, deep: bool = True) -> Record:
        """Create a copy of the record.

        The copy is constructed with ``id=self.id``, i.e. it carries the
        resolved ID of this record.

        Args:
            deep: If True, create deep copies of fields and metadata;
                if False, the new record shares the Field objects and
                metadata values with this one

        Returns:
            A new Record
        """
        if deep:
            import copy as copy_module

            new_fields = OrderedDict()
            for name, field_obj in self.fields.items():
                # Preserve the actual field type (Field or VectorField)
                if field_obj.__class__.__name__ == 'VectorField':
                    # Imported lazily; only needed when a vector field is present
                    from dataknobs_data.fields import VectorField

                    new_fields[name] = VectorField(
                        name=field_obj.name,
                        value=copy_module.deepcopy(field_obj.value),
                        dimensions=getattr(field_obj, 'dimensions', None),
                        source_field=getattr(field_obj, 'source_field', None),
                        model_name=getattr(field_obj, 'model_name', None),
                        model_version=getattr(field_obj, 'model_version', None),
                        metadata=copy_module.deepcopy(field_obj.metadata),
                    )
                else:
                    new_fields[name] = Field(
                        name=field_obj.name,
                        value=copy_module.deepcopy(field_obj.value),
                        type=field_obj.type,
                        metadata=copy_module.deepcopy(field_obj.metadata),
                    )
            new_metadata = copy_module.deepcopy(self.metadata)
        else:
            new_fields = OrderedDict(self.fields)  # type: ignore[arg-type]
            new_metadata = self.metadata.copy()

        return Record(data=new_fields, metadata=new_metadata, id=self.id)

    def project(self, field_names: list[str]) -> Record:
        """Create a new record with only specified fields.

        Names that are not fields of this record are silently skipped. The
        projected record shares its Field objects with this record.

        Args:
            field_names: List of field names to include in the projection

        Returns:
            A new Record containing only the specified fields

        Example:
            ```python
            record = Record({"name": "Alice", "age": 30, "email": "alice@example.com"})

            # Project to specific fields
            subset = record.project(["name", "age"])
            print(subset.field_names())  # ["name", "age"]
            ```
        """
        projected_fields = OrderedDict()
        for name in field_names:
            if name in self.fields:
                projected_fields[name] = self.fields[name]
        return Record(data=projected_fields, metadata=self.metadata.copy(), id=self.id)

    def merge(self, other: Record, overwrite: bool = True) -> Record:
        """Combine this record with another into a new record.

        Neither this record nor ``other`` gets fields added or removed; the
        result shares Field objects with both inputs.

        Args:
            other: The record to merge
            overwrite: If True, fields and metadata from ``other`` win on
                conflict; if False, existing fields are kept and the metadata
                of ``other`` is ignored

        Returns:
            A new merged record
        """
        merged_fields = OrderedDict(self.fields)
        for name, field_obj in other.fields.items():
            if overwrite or name not in merged_fields:
                merged_fields[name] = field_obj

        merged_metadata = self.metadata.copy()
        if overwrite:
            merged_metadata.update(other.metadata)

        # Use the ID from this record, or from other if this doesn't have one
        merged_id = self.id or other.id

        return Record(data=merged_fields, metadata=merged_metadata, id=merged_id)