Coverage for src/dataknobs_data/pandas/converter.py: 0%

184 statements  

coverage.py v7.11.0, created at 2025-10-29 14:14 -0600

1"""Core converter between DataKnobs Records and Pandas DataFrames.""" 

2 

3from __future__ import annotations 

4 

5from dataclasses import dataclass 

6from typing import Any 

7 

8import pandas as pd 

9 

10from dataknobs_data.fields import Field 

11from dataknobs_data.records import Record 

12 

13from .metadata import MetadataStrategy 

14from .type_mapper import TypeMapper 

15 

16 

@dataclass
class ConversionOptions:
    """Options for conversion between Records and DataFrames."""
    include_metadata: bool = False
    metadata_columns: list[str] | None = None  # Columns to treat as metadata
    flatten_nested: bool = False  # Flatten nested structures
    preserve_index: bool = True
    use_index_as_id: bool = False  # Use DataFrame index as record ID
    type_mapping: dict[str, str] | None = None  # Custom type mappings
    null_handling: str = "preserve"  # "preserve", "drop", "fill"
    datetime_format: str | None = None  # Format for datetime conversion
    timezone: str | None = None  # Timezone for datetime conversion

    # Keep these for backward compatibility
    preserve_types: bool = True
    index_column: str | None = None  # Use specific field as index
    flatten_json: bool = False
    metadata_strategy: MetadataStrategy = MetadataStrategy.ATTRS
    handle_missing: str = "preserve"  # "preserve", "drop", "fill"
    fill_value: Any = None

    def __post_init__(self):
        """Initialize default values for mutable parameters."""
        if self.metadata_columns is None:
            self.metadata_columns = []
        if self.type_mapping is None:
            self.type_mapping = {}

    def merge_metadata(self, meta1: dict[str, Any], meta2: dict[str, Any]) -> dict[str, Any]:
        """Merge two metadata dictionaries.

        Args:
            meta1: First metadata dict
            meta2: Second metadata dict (overwrites meta1 on conflicts)

        Returns:
            Merged metadata dictionary
        """
        result = meta1.copy()
        for key, value in meta2.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                # Recursively merge nested dicts
                result[key] = self.merge_metadata(result[key], value)
            else:
                result[key] = value
        return result
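# --- Illustrative sketch (added for this report, not part of the original
# module): how ConversionOptions and its recursive merge_metadata behave.
# The option values and metadata keys shown here are hypothetical.
def _example_conversion_options() -> None:
    opts = ConversionOptions(include_metadata=True, metadata_columns=["_metadata"])
    merged = opts.merge_metadata(
        {"source": "api", "tags": {"env": "dev"}},
        {"tags": {"region": "us"}},
    )
    # Nested dicts are merged recursively rather than overwritten:
    assert merged == {"source": "api", "tags": {"env": "dev", "region": "us"}}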

class DataFrameConverter:
    """Converts between DataKnobs Records and Pandas DataFrames."""

    def __init__(self, type_mapper: TypeMapper | None = None):
        """Initialize converter.

        Args:
            type_mapper: Custom type mapper (uses default if None)
        """
        self.type_mapper = type_mapper or TypeMapper()

    def records_to_dataframe(
        self,
        records: list[Record],
        options: ConversionOptions | None = None
    ) -> pd.DataFrame:
        """Convert list of Records to DataFrame.

        Args:
            records: List of Records to convert
            options: Conversion options

        Returns:
            Pandas DataFrame
        """
        options = options or ConversionOptions()

        if not records:
            return pd.DataFrame()

        # Extract data from records
        data_rows = []
        for record in records:
            row = {}

            # Add field values
            for field_name, field in record.fields.items():
                if options.flatten_nested and isinstance(field.value, dict):
                    # Flatten nested dictionaries
                    for nested_key, nested_val in field.value.items():
                        row[f"{field_name}.{nested_key}"] = nested_val
                else:
                    row[field_name] = field.value

            # Add metadata as a column if requested
            if options.include_metadata and record.metadata:
                row["_metadata"] = record.metadata

            data_rows.append(row)

        # Create DataFrame
        df = pd.DataFrame(data_rows)

        # Preserve column order from the first record for consistency
        # This maintains the order fields were added to records
        if not df.empty and records:
            # Get column order from first record's fields
            first_record = records[0]
            column_order = []

            # Add field columns in the order they appear in the record
            for field_name in first_record.fields.keys():
                if options.flatten_nested and isinstance(first_record.fields[field_name].value, dict):
                    # Add flattened columns
                    for nested_key in first_record.fields[field_name].value.keys():
                        col_name = f"{field_name}.{nested_key}"
                        if col_name in df.columns:
                            column_order.append(col_name)
                elif field_name in df.columns:
                    column_order.append(field_name)

            # Add any remaining columns (like _metadata) at the end
            for col in df.columns:
                if col not in column_order:
                    column_order.append(col)

            # Reorder DataFrame columns
            df = df[column_order]

        # Set index if specified
        if options.index_column and options.index_column in df.columns:
            df = df.set_index(options.index_column)
        elif options.preserve_index:
            # Only use record IDs as index if:
            # 1. They exist
            # 2. They're not coming from a data field (id or record_id columns)
            # 3. They don't look like auto-generated UUIDs
            record_ids = [r.id for r in records]

            # Check if the IDs are from data fields
            ids_from_fields = any(
                'id' in r.fields or 'record_id' in r.fields
                for r in records
            )

            # Only set index if IDs exist, aren't from fields, and aren't UUIDs
            if (any(record_ids) and not ids_from_fields and not all(
                id and len(id) == 36 and id.count('-') == 4
                for id in record_ids if id
            )):
                df.index = record_ids
                df.index.name = "record_id"

        return df
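    # --- Illustrative sketch (added for this report, not part of the original
    # module): converting Records to a DataFrame. Field names and values are
    # hypothetical, and Record(data=...) is assumed to populate .fields, mirroring
    # the constructor call used in dataframe_to_records below.
    def _example_records_to_dataframe(self) -> pd.DataFrame:
        records = [
            Record(data={"name": "alpha", "score": 1.5}),
            Record(data={"name": "beta", "score": 2.0}),
        ]
        opts = ConversionOptions(flatten_nested=True, include_metadata=False)
        return self.records_to_dataframe(records, opts)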

    def dataframe_to_records(
        self,
        df: pd.DataFrame,
        options: ConversionOptions | None = None
    ) -> list[Record]:
        """Convert DataFrame to list of Records.

        Args:
            df: DataFrame to convert
            options: Conversion options

        Returns:
            List of Records
        """
        options = options or ConversionOptions()

        records = []

        # Convert each row to a Record
        for idx, row in df.iterrows():
            # Extract metadata for this row from metadata columns
            row_metadata = {}
            if options.metadata_columns:
                for col in options.metadata_columns:
                    if col in row.index:
                        col_value = row[col]
                        # If the column value is a dict, merge it into metadata
                        if isinstance(col_value, dict):
                            row_metadata.update(col_value)
                        else:
                            # Otherwise store it with the column name (without leading underscore)
                            row_metadata[col.lstrip('_')] = col_value

            # Prepare row data (excluding metadata columns)
            row_data = {}
            for col in row.index:
                if options.metadata_columns is None or col not in options.metadata_columns:
                    row_data[col] = row[col]

            # Determine record ID
            record_id = None
            if options.use_index_as_id:
                if isinstance(idx, str):
                    record_id = idx
                elif idx is not None and not pd.isna(idx):  # type: ignore[call-overload]
                    record_id = str(idx)

            # Create record
            record = Record(data=row_data, metadata=row_metadata, id=record_id)
            records.append(record)

        return records
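    # --- Illustrative sketch (added for this report, not part of the original
    # module): rebuilding Records from a DataFrame, treating "_metadata" as a
    # metadata column and reusing the index as the record ID. Column names and
    # values are hypothetical.
    def _example_dataframe_to_records(self) -> list[Record]:
        df = pd.DataFrame(
            {"name": ["alpha", "beta"], "_metadata": [{"source": "csv"}, {"source": "csv"}]},
            index=["rec-1", "rec-2"],
        )
        opts = ConversionOptions(metadata_columns=["_metadata"], use_index_as_id=True)
        return self.dataframe_to_records(df, opts)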

    def record_to_series(self, record: Record) -> pd.Series:
        """Convert a single Record to a Pandas Series.

        Args:
            record: Record to convert

        Returns:
            Pandas Series
        """
        data = {}
        for field_name, field in record.fields.items():
            if field.type is not None:
                value = self.type_mapper.convert_value_to_pandas(field.value, field.type)
            else:
                value = field.value
            data[field_name] = value

        series = pd.Series(data)
        if record.id:
            series.name = record.id

        return series

    def series_to_record(
        self,
        series: pd.Series,
        record_id: str | None = None
    ) -> Record:
        """Convert a Pandas Series to a Record.

        Args:
            series: Series to convert
            record_id: Optional record ID

        Returns:
            Record
        """
        # Get ID - series.name is Hashable, we need str | None
        id_value = record_id
        if not id_value and hasattr(series, 'name'):
            name = series.name
            id_value = str(name) if name is not None else None
        record = Record(id=id_value)

        for column, value in series.items():
            # Skip metadata columns
            if isinstance(column, str) and column.startswith("_meta_"):
                continue

            # Infer field type
            field_type = self.type_mapper.infer_field_type_from_value(value)

            # Convert value
            field_value = self.type_mapper.convert_value_from_pandas(value, field_type)

            # Create field
            field = Field(
                name=str(column),
                value=field_value,
                type=field_type
            )
            record.fields[str(column)] = field

        return record
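    # --- Illustrative sketch (added for this report, not part of the original
    # module): round-tripping a single row through a Series and back. Field names
    # are hypothetical, and Record(data=..., id=...) is assumed to populate
    # .fields as elsewhere in this module.
    def _example_series_roundtrip(self) -> Record:
        original = Record(data={"name": "alpha", "score": 1.5}, id="rec-1")
        series = self.record_to_series(original)  # Series named "rec-1"
        return self.series_to_record(series)      # ID recovered from series.name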

    def _series_to_record(
        self,
        row: pd.Series,
        idx: Any,
        options: ConversionOptions
    ) -> Record:
        """Convert a DataFrame row to a Record.

        Args:
            row: DataFrame row as Series
            idx: Row index
            options: Conversion options

        Returns:
            Record
        """
        # Determine record ID
        record_id = None
        if options.preserve_index:
            if isinstance(idx, str):
                record_id = idx
            elif idx is not None and not pd.isna(idx):
                record_id = str(idx)

        record = Record(id=record_id)

        for original_column, value in row.items():
            # Skip metadata columns
            if isinstance(original_column, str) and original_column.startswith("_meta_"):
                continue

            # Handle multi-index columns
            column = original_column
            if isinstance(column, tuple):
                column = column[0]  # Use first level

            # Infer field type
            field_type = self.type_mapper.infer_field_type_from_value(value)

            # Convert value
            field_value = self.type_mapper.convert_value_from_pandas(value, field_type)

            # Create field
            field = Field(
                name=str(column),
                value=field_value,
                type=field_type
            )
            record.fields[str(column)] = field

        return record

    def _flatten_json_value(self, value: Any) -> Any:
        """Flatten JSON value for DataFrame insertion.

        Args:
            value: JSON value (dict or list)

        Returns:
            Flattened value or string representation
        """
        # Check for None explicitly first
        if value is None:
            return value

        # Check for pandas NA types
        try:
            if pd.isna(value):
                return value
        except (TypeError, ValueError):
            # pd.isna doesn't work with lists/dicts
            pass

        if isinstance(value, dict):
            # For dict, could expand to multiple columns
            # For now, convert to string
            return str(value)
        elif isinstance(value, list):
            # For list, convert to string
            return str(value)

        return value
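    # --- Illustrative sketch (added for this report, not part of the original
    # module): _flatten_json_value keeps scalars and missing values untouched
    # and stringifies containers.
    def _example_flatten_json_value(self) -> None:
        assert self._flatten_json_value(None) is None
        assert self._flatten_json_value(42) == 42
        assert self._flatten_json_value({"a": 1}) == "{'a': 1}"
        assert self._flatten_json_value([1, 2]) == "[1, 2]"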

    def validate_conversion(
        self,
        records: list[Record],
        df: pd.DataFrame,
        options: ConversionOptions | None = None
    ) -> dict[str, Any]:
        """Validate conversion accuracy.

        Args:
            records: Original records
            df: Converted DataFrame
            options: Conversion options used

        Returns:
            Validation report
        """
        options = options or ConversionOptions()

        report: dict[str, Any] = {
            "record_count_match": len(records) == len(df),
            "original_record_count": len(records),
            "dataframe_row_count": len(df),
            "field_preservation": {},
            "type_preservation": {},
            "value_accuracy": {}
        }

        # Check field preservation
        original_fields = set()
        for record in records:
            original_fields.update(record.fields.keys())

        df_columns = set(df.columns)
        if options.metadata_strategy == MetadataStrategy.COLUMNS:
            df_columns = {col for col in df_columns if not col.startswith("_meta_")}

        report["field_preservation"] = {
            "original_fields": sorted(original_fields),
            "dataframe_columns": sorted(df_columns),
            "missing_fields": sorted(original_fields - df_columns),
            "extra_columns": sorted(df_columns - original_fields)
        }

        # Check type preservation if enabled
        if options.preserve_types:
            for record in records[:10]:  # Sample first 10 records
                for field_name, field in record.fields.items():
                    if field_name in df.columns and field.type is not None:
                        df_dtype = str(df[field_name].dtype)
                        expected_dtype = str(self.type_mapper.field_type_to_pandas(field.type))
                        if df_dtype != expected_dtype:
                            type_preservation = report["type_preservation"]
                            if not isinstance(type_preservation, dict):
                                raise TypeError("type_preservation should be a dict")
                            type_preservation[field_name] = {
                                "expected": expected_dtype,
                                "actual": df_dtype
                            }

        return report
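# --- Illustrative sketch (added for this report, not part of the original
# module): a minimal validation pass over a freshly converted DataFrame.
# Field names and values are hypothetical.
def _example_validate_conversion() -> dict[str, Any]:
    converter = DataFrameConverter()
    records = [Record(data={"name": "alpha", "score": 1.5})]
    df = converter.records_to_dataframe(records)
    report = converter.validate_conversion(records, df)
    # report["record_count_match"] is True when no rows were dropped
    return report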