Coverage for src/dataknobs_data/pandas/converter.py: 0%
184 statements
coverage.py v7.11.0, created at 2025-10-29 14:14 -0600

1"""Core converter between DataKnobs Records and Pandas DataFrames."""
3from __future__ import annotations
5from dataclasses import dataclass
6from typing import Any
8import pandas as pd
10from dataknobs_data.fields import Field
11from dataknobs_data.records import Record
13from .metadata import MetadataStrategy
14from .type_mapper import TypeMapper
17@dataclass
18class ConversionOptions:
19 """Options for conversion between Records and DataFrames."""
20 include_metadata: bool = False
21 metadata_columns: list[str] | None = None # Columns to treat as metadata
22 flatten_nested: bool = False # Flatten nested structures
23 preserve_index: bool = True
24 use_index_as_id: bool = False # Use DataFrame index as record ID
25 type_mapping: dict[str, str] | None = None # Custom type mappings
26 null_handling: str = "preserve" # "preserve", "drop", "fill"
27 datetime_format: str | None = None # Format for datetime conversion
28 timezone: str | None = None # Timezone for datetime conversion
30 # Keep these for backward compatibility
31 preserve_types: bool = True
32 index_column: str | None = None # Use specific field as index
33 flatten_json: bool = False
34 metadata_strategy: MetadataStrategy = MetadataStrategy.ATTRS
35 handle_missing: str = "preserve" # "preserve", "drop", "fill"
36 fill_value: Any = None
38 def __post_init__(self):
39 """Initialize default values for mutable parameters."""
40 if self.metadata_columns is None:
41 self.metadata_columns = []
42 if self.type_mapping is None:
43 self.type_mapping = {}
45 def merge_metadata(self, meta1: dict[str, Any], meta2: dict[str, Any]) -> dict[str, Any]:
46 """Merge two metadata dictionaries.
48 Args:
49 meta1: First metadata dict
50 meta2: Second metadata dict (overwrites meta1 on conflicts)
52 Returns:
53 Merged metadata dictionary
54 """
55 result = meta1.copy()
56 for key, value in meta2.items():
57 if key in result and isinstance(result[key], dict) and isinstance(value, dict):
58 # Recursively merge nested dicts
59 result[key] = self.merge_metadata(result[key], value)
60 else:
61 result[key] = value
62 return result
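
# Illustrative sketch of merge_metadata (added for clarity; not part of the
# original module): nested dicts are merged recursively, and the second
# argument wins on scalar conflicts. Assuming default options:
#
#     opts = ConversionOptions()
#     opts.merge_metadata({"a": {"x": 1}, "b": 1}, {"a": {"y": 2}, "b": 2})
#     # -> {"a": {"x": 1, "y": 2}, "b": 2}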


class DataFrameConverter:
    """Converts between DataKnobs Records and Pandas DataFrames."""

    def __init__(self, type_mapper: TypeMapper | None = None):
        """Initialize converter.

        Args:
            type_mapper: Custom type mapper (uses default if None)
        """
        self.type_mapper = type_mapper or TypeMapper()

    def records_to_dataframe(
        self,
        records: list[Record],
        options: ConversionOptions | None = None
    ) -> pd.DataFrame:
        """Convert list of Records to DataFrame.

        Args:
            records: List of Records to convert
            options: Conversion options

        Returns:
            Pandas DataFrame
        """
        options = options or ConversionOptions()

        if not records:
            return pd.DataFrame()

        # Extract data from records
        data_rows = []
        for record in records:
            row = {}

            # Add field values
            for field_name, field in record.fields.items():
                if options.flatten_nested and isinstance(field.value, dict):
                    # Flatten nested dictionaries
                    for nested_key, nested_val in field.value.items():
                        row[f"{field_name}.{nested_key}"] = nested_val
                else:
                    row[field_name] = field.value

            # Add metadata as a column if requested
            if options.include_metadata and record.metadata:
                row["_metadata"] = record.metadata

            data_rows.append(row)

        # Create DataFrame
        df = pd.DataFrame(data_rows)

        # Preserve column order from the first record for consistency.
        # This maintains the order fields were added to records.
        if not df.empty and records:
            # Get column order from the first record's fields
            first_record = records[0]
            column_order = []

            # Add field columns in the order they appear in the record
            for field_name in first_record.fields.keys():
                if options.flatten_nested and isinstance(first_record.fields[field_name].value, dict):
                    # Add flattened columns
                    for nested_key in first_record.fields[field_name].value.keys():
                        col_name = f"{field_name}.{nested_key}"
                        if col_name in df.columns:
                            column_order.append(col_name)
                elif field_name in df.columns:
                    column_order.append(field_name)

            # Add any remaining columns (like _metadata) at the end
            for col in df.columns:
                if col not in column_order:
                    column_order.append(col)

            # Reorder DataFrame columns
            df = df[column_order]

        # Set index if specified
        if options.index_column and options.index_column in df.columns:
            df = df.set_index(options.index_column)
        elif options.preserve_index:
            # Only use record IDs as index if:
            # 1. They exist
            # 2. They're not coming from a data field (id or record_id columns)
            # 3. They don't look like auto-generated UUIDs
            record_ids = [r.id for r in records]

            # Check if the IDs are from data fields
            ids_from_fields = any(
                'id' in r.fields or 'record_id' in r.fields
                for r in records
            )

            # Only set index if IDs exist, aren't from fields, and aren't UUIDs
            if (any(record_ids) and not ids_from_fields and not all(
                id and len(id) == 36 and id.count('-') == 4
                for id in record_ids if id
            )):
                df.index = record_ids
                df.index.name = "record_id"

        return df
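
    # Illustrative usage sketch (added for clarity; not part of the original
    # file). Assuming Record(data={...}) populates record.fields with plain
    # values, two records with the same fields become a two-row DataFrame:
    #
    #     converter = DataFrameConverter()
    #     df = converter.records_to_dataframe(
    #         [Record(data={"name": "a", "n": 1}), Record(data={"name": "b", "n": 2})]
    #     )
    #     # df has columns ["name", "n"] in field order and two rows.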

    def dataframe_to_records(
        self,
        df: pd.DataFrame,
        options: ConversionOptions | None = None
    ) -> list[Record]:
        """Convert DataFrame to list of Records.

        Args:
            df: DataFrame to convert
            options: Conversion options

        Returns:
            List of Records
        """
        options = options or ConversionOptions()

        records = []

        # Convert each row to a Record
        for idx, row in df.iterrows():
            # Extract metadata for this row from metadata columns
            row_metadata = {}
            if options.metadata_columns:
                for col in options.metadata_columns:
                    if col in row.index:
                        col_value = row[col]
                        # If the column value is a dict, merge it into metadata
                        if isinstance(col_value, dict):
                            row_metadata.update(col_value)
                        else:
                            # Otherwise store it with the column name (without leading underscore)
                            row_metadata[col.lstrip('_')] = col_value

            # Prepare row data (excluding metadata columns)
            row_data = {}
            for col in row.index:
                if options.metadata_columns is None or col not in options.metadata_columns:
                    row_data[col] = row[col]

            # Determine record ID
            record_id = None
            if options.use_index_as_id:
                if isinstance(idx, str):
                    record_id = idx
                elif idx is not None and not pd.isna(idx):  # type: ignore[call-overload]
                    record_id = str(idx)

            # Create record
            record = Record(data=row_data, metadata=row_metadata, id=record_id)
            records.append(record)

        return records
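
    # Illustrative sketch (hypothetical column names; not part of the original
    # file): with ConversionOptions(metadata_columns=["_metadata"],
    # use_index_as_id=True), a "_metadata" column holding dicts is folded back
    # into each Record's metadata, excluded from its data fields, and the
    # DataFrame index becomes the record ID:
    #
    #     records = converter.dataframe_to_records(df, options)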

    def record_to_series(self, record: Record) -> pd.Series:
        """Convert a single Record to a Pandas Series.

        Args:
            record: Record to convert

        Returns:
            Pandas Series
        """
        data = {}
        for field_name, field in record.fields.items():
            if field.type is not None:
                value = self.type_mapper.convert_value_to_pandas(field.value, field.type)
            else:
                value = field.value
            data[field_name] = value

        series = pd.Series(data)
        if record.id:
            series.name = record.id

        return series

    def series_to_record(
        self,
        series: pd.Series,
        record_id: str | None = None
    ) -> Record:
        """Convert a Pandas Series to a Record.

        Args:
            series: Series to convert
            record_id: Optional record ID

        Returns:
            Record
        """
        # Get ID - series.name is Hashable, we need str | None
        id_value = record_id
        if not id_value and hasattr(series, 'name'):
            name = series.name
            id_value = str(name) if name is not None else None
        record = Record(id=id_value)

        for column, value in series.items():
            # Skip metadata columns
            if isinstance(column, str) and column.startswith("_meta_"):
                continue

            # Infer field type
            field_type = self.type_mapper.infer_field_type_from_value(value)

            # Convert value
            field_value = self.type_mapper.convert_value_from_pandas(value, field_type)

            # Create field
            field = Field(
                name=str(column),
                value=field_value,
                type=field_type
            )
            record.fields[str(column)] = field

        return record
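
    # Illustrative round trip (added for clarity; assumes the TypeMapper can
    # infer and convert the scalar types involved): record_to_series and
    # series_to_record are intended to be near-inverses for flat records:
    #
    #     series = converter.record_to_series(record)
    #     restored = converter.series_to_record(series, record_id=record.id)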

    def _series_to_record(
        self,
        row: pd.Series,
        idx: Any,
        options: ConversionOptions
    ) -> Record:
        """Convert a DataFrame row to a Record.

        Args:
            row: DataFrame row as Series
            idx: Row index
            options: Conversion options

        Returns:
            Record
        """
        # Determine record ID
        record_id = None
        if options.preserve_index:
            if isinstance(idx, str):
                record_id = idx
            elif idx is not None and not pd.isna(idx):
                record_id = str(idx)

        record = Record(id=record_id)

        for original_column, value in row.items():
            # Skip metadata columns
            if isinstance(original_column, str) and original_column.startswith("_meta_"):
                continue

            # Handle multi-index columns
            column = original_column
            if isinstance(column, tuple):
                column = column[0]  # Use first level

            # Infer field type
            field_type = self.type_mapper.infer_field_type_from_value(value)

            # Convert value
            field_value = self.type_mapper.convert_value_from_pandas(value, field_type)

            # Create field
            field = Field(
                name=str(column),
                value=field_value,
                type=field_type
            )
            record.fields[str(column)] = field

        return record

    def _flatten_json_value(self, value: Any) -> Any:
        """Flatten JSON value for DataFrame insertion.

        Args:
            value: JSON value (dict or list)

        Returns:
            Flattened value or string representation
        """
        # Check for None explicitly first
        if value is None:
            return value

        # Check for pandas NA types
        try:
            if pd.isna(value):
                return value
        except (TypeError, ValueError):
            # pd.isna doesn't work with lists/dicts
            pass

        if isinstance(value, dict):
            # A dict could be expanded to multiple columns;
            # for now, convert it to a string
            return str(value)
        elif isinstance(value, list):
            # For list, convert to string
            return str(value)

        return value

    def validate_conversion(
        self,
        records: list[Record],
        df: pd.DataFrame,
        options: ConversionOptions | None = None
    ) -> dict[str, Any]:
        """Validate conversion accuracy.

        Args:
            records: Original records
            df: Converted DataFrame
            options: Conversion options used

        Returns:
            Validation report
        """
        options = options or ConversionOptions()

        report: dict[str, Any] = {
            "record_count_match": len(records) == len(df),
            "original_record_count": len(records),
            "dataframe_row_count": len(df),
            "field_preservation": {},
            "type_preservation": {},
            "value_accuracy": {}
        }

        # Check field preservation
        original_fields = set()
        for record in records:
            original_fields.update(record.fields.keys())

        df_columns = set(df.columns)
        if options.metadata_strategy == MetadataStrategy.COLUMNS:
            df_columns = {col for col in df_columns if not col.startswith("_meta_")}

        report["field_preservation"] = {
            "original_fields": sorted(original_fields),
            "dataframe_columns": sorted(df_columns),
            "missing_fields": sorted(original_fields - df_columns),
            "extra_columns": sorted(df_columns - original_fields)
        }

        # Check type preservation if enabled
        if options.preserve_types:
            for record in records[:10]:  # Sample first 10 records
                for field_name, field in record.fields.items():
                    if field_name in df.columns and field.type is not None:
                        df_dtype = str(df[field_name].dtype)
                        expected_dtype = str(self.type_mapper.field_type_to_pandas(field.type))
                        if df_dtype != expected_dtype:
                            type_preservation = report["type_preservation"]
                            if not isinstance(type_preservation, dict):
                                raise TypeError("type_preservation should be a dict")
                            type_preservation[field_name] = {
                                "expected": expected_dtype,
                                "actual": df_dtype
                            }

        return report
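

if __name__ == "__main__":
    # Minimal demonstration sketch, not part of the original module. It assumes
    # Record(data={...}, metadata={}, id=None) builds fields from plain values
    # (the same constructor pattern dataframe_to_records uses above) and that
    # the default TypeMapper handles str/int values.
    demo_records = [
        Record(data={"name": "alpha", "count": 1}, metadata={}, id=None),
        Record(data={"name": "beta", "count": 2}, metadata={}, id=None),
    ]
    converter = DataFrameConverter()
    frame = converter.records_to_dataframe(demo_records)
    print(frame)
    print(converter.validate_conversion(demo_records, frame))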