Coverage for src/dataknobs_data/pandas/type_mapper.py: 0%

247 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-29 14:14 -0600

1"""Type mapping between DataKnobs Field types and Pandas dtypes.""" 

2 

3from __future__ import annotations 

4 

5import json 

6from dataclasses import dataclass 

7from datetime import datetime 

8from typing import Any, TYPE_CHECKING 

9 

10import numpy as np 

11import pandas as pd 

12from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_numeric_dtype 

13 

14from dataknobs_data.fields import FieldType 

15 

16if TYPE_CHECKING: 

17 from collections.abc import Callable 

18 

19 

@dataclass
class PandasTypeMapping:
    """Mapping configuration for type conversion.

    Pairs a DataKnobs FieldType with the pandas dtype it should be stored
    as, plus optional per-value converters for each direction.
    """
    # The DataKnobs field type this mapping applies to.
    field_type: FieldType
    # Target pandas dtype: a dtype-string alias, a Python type, or a numpy dtype.
    pandas_dtype: str | type | np.dtype
    # Whether the pandas dtype can represent missing values (pd.NA).
    nullable: bool = True
    # Optional converter applied to each value when writing INTO pandas.
    converter: Callable[[Any], Any] | None = None
    # Optional converter applied to each value when reading OUT of pandas.
    reverse_converter: Callable[[Any], Any] | None = None

28 

29 

class TypeMapper:
    """Handles type mapping between DataKnobs Field types and Pandas dtypes.

    Conversion rules live in per-FieldType PandasTypeMapping entries that
    are built once, at construction time, by ``_init_mappings``.
    """

    def __init__(self) -> None:
        """Initialize type mapper with default mappings."""
        self._init_mappings()

36 

    def _init_mappings(self) -> None:
        """Initialize type mappings.

        Builds two tables:

        * ``_field_to_pandas`` — forward map from FieldType to a
          PandasTypeMapping.  Nullable pandas extension dtypes ("string",
          "Int64", "Float64", "boolean") are used so missing values
          round-trip as ``pd.NA`` instead of forcing dtype widening.
        * ``_pandas_to_field`` — reverse map keyed by lowercased dtype
          string (see ``pandas_to_field_type``).
        """
        self._field_to_pandas: dict[FieldType, PandasTypeMapping] = {
            FieldType.STRING: PandasTypeMapping(
                field_type=FieldType.STRING,
                pandas_dtype="string",  # pd.StringDtype()
                nullable=True
            ),
            FieldType.INTEGER: PandasTypeMapping(
                field_type=FieldType.INTEGER,
                pandas_dtype="Int64",  # pd.Int64Dtype()
                nullable=True
            ),
            FieldType.FLOAT: PandasTypeMapping(
                field_type=FieldType.FLOAT,
                pandas_dtype="Float64",  # pd.Float64Dtype()
                nullable=True
            ),
            FieldType.BOOLEAN: PandasTypeMapping(
                field_type=FieldType.BOOLEAN,
                pandas_dtype="boolean",  # pd.BooleanDtype()
                nullable=True
            ),
            # Datetimes additionally need value-level conversion (epoch
            # seconds / ISO strings -> Timestamp and back).
            FieldType.DATETIME: PandasTypeMapping(
                field_type=FieldType.DATETIME,
                pandas_dtype="datetime64[ns]",
                nullable=True,
                converter=self._to_datetime,
                reverse_converter=self._from_datetime
            ),
            # JSON stays as Python objects in an object column; converters
            # parse JSON text into dict/list structures.
            FieldType.JSON: PandasTypeMapping(
                field_type=FieldType.JSON,
                pandas_dtype="object",
                nullable=True,
                converter=self._to_json_object,
                reverse_converter=self._from_json_object
            ),
            FieldType.BINARY: PandasTypeMapping(
                field_type=FieldType.BINARY,
                pandas_dtype="object",
                nullable=True
            ),
            FieldType.TEXT: PandasTypeMapping(
                field_type=FieldType.TEXT,
                pandas_dtype="string",
                nullable=True
            ),
        }

        # Reverse mapping from pandas to field types
        self._pandas_to_field: dict[str, FieldType] = {
            "string": FieldType.STRING,
            "int64": FieldType.INTEGER,
            "float64": FieldType.FLOAT,
            "boolean": FieldType.BOOLEAN,
            "datetime64[ns]": FieldType.DATETIME,
            "object": FieldType.STRING,  # Default object to STRING, not JSON
        }

95 

96 def field_type_to_pandas(self, field_type: FieldType) -> str | type | np.dtype: 

97 """Convert FieldType to pandas dtype. 

98  

99 Args: 

100 field_type: DataKnobs FieldType 

101  

102 Returns: 

103 Corresponding pandas dtype 

104 """ 

105 mapping = self._field_to_pandas.get(field_type) 

106 if mapping: 

107 return mapping.pandas_dtype 

108 return "object" # Default fallback 

109 

110 def pandas_to_field_type(self, dtype: str | np.dtype | type) -> FieldType: 

111 """Infer FieldType from pandas dtype. 

112  

113 Args: 

114 dtype: Pandas dtype 

115  

116 Returns: 

117 Corresponding FieldType 

118 """ 

119 dtype_str = str(dtype).lower() 

120 

121 # Direct mapping 

122 if dtype_str in self._pandas_to_field: 

123 return self._pandas_to_field[dtype_str] 

124 

125 # Infer from dtype categories 

126 if "int" in dtype_str: 

127 return FieldType.INTEGER 

128 elif "float" in dtype_str: 

129 return FieldType.FLOAT 

130 elif "bool" in dtype_str: 

131 return FieldType.BOOLEAN 

132 elif "datetime" in dtype_str or "timestamp" in dtype_str: 

133 return FieldType.DATETIME 

134 elif dtype_str == "string" or dtype_str == "object": 

135 return FieldType.STRING 

136 elif "bytes" in dtype_str: 

137 return FieldType.BINARY 

138 

139 return FieldType.STRING # Default fallback 

140 

141 def convert_value_to_pandas(self, value: Any, field_type: FieldType) -> Any: 

142 """Convert a field value to pandas-compatible format. 

143  

144 Args: 

145 value: Value to convert 

146 field_type: Source field type 

147  

148 Returns: 

149 Pandas-compatible value 

150 """ 

151 if value is None: 

152 return pd.NA 

153 

154 mapping = self._field_to_pandas.get(field_type) 

155 if mapping and mapping.converter: 

156 return mapping.converter(value) 

157 

158 return value 

159 

160 def convert_value_from_pandas(self, value: Any, field_type: FieldType) -> Any: 

161 """Convert a pandas value to field-compatible format. 

162  

163 Args: 

164 value: Pandas value 

165 field_type: Target field type 

166  

167 Returns: 

168 Field-compatible value 

169 """ 

170 # Handle pandas NA/NaN/None 

171 # Use try-except to handle arrays and other special cases 

172 try: 

173 if pd.isna(value): 

174 return None 

175 except (TypeError, ValueError): 

176 # pd.isna can fail on arrays/lists 

177 pass 

178 

179 mapping = self._field_to_pandas.get(field_type) 

180 if mapping and mapping.reverse_converter: 

181 return mapping.reverse_converter(value) 

182 

183 # Handle numpy types 

184 if isinstance(value, (np.integer, np.floating, np.bool_)): 

185 return value.item() 

186 

187 return value 

188 

189 def infer_field_type_from_value(self, value: Any) -> FieldType: 

190 """Infer FieldType from a Python value. 

191  

192 Args: 

193 value: Value to analyze 

194  

195 Returns: 

196 Inferred FieldType 

197 """ 

198 if value is None: 

199 return FieldType.STRING # Default for null 

200 

201 # Check for pandas NA separately to avoid array ambiguity 

202 try: 

203 if pd.isna(value): 

204 return FieldType.STRING 

205 except (TypeError, ValueError): 

206 # pd.isna can fail on some types like lists 

207 pass 

208 

209 if isinstance(value, (bool, np.bool_)): 

210 return FieldType.BOOLEAN 

211 elif isinstance(value, (int, np.integer)): 

212 return FieldType.INTEGER 

213 elif isinstance(value, (float, np.floating)): 

214 return FieldType.FLOAT 

215 elif isinstance(value, (datetime, pd.Timestamp)): 

216 return FieldType.DATETIME 

217 elif isinstance(value, bytes): # type: ignore[unreachable] 

218 return FieldType.BINARY 

219 elif isinstance(value, (dict, list)): 

220 return FieldType.JSON 

221 elif isinstance(value, str): 

222 if len(value) > 1000: 

223 return FieldType.TEXT 

224 return FieldType.STRING 

225 else: 

226 # Complex objects as JSON 

227 return FieldType.JSON 

228 

229 def cast_series(self, series: pd.Series, field_type: FieldType) -> pd.Series: 

230 """Cast a pandas Series to the appropriate dtype for a FieldType. 

231  

232 Args: 

233 series: Series to cast 

234 field_type: Target field type 

235  

236 Returns: 

237 Casted Series 

238 """ 

239 target_dtype = self.field_type_to_pandas(field_type) 

240 

241 try: 

242 # Special handling for datetime 

243 if field_type == FieldType.DATETIME: 

244 return pd.to_datetime(series, errors='coerce') 

245 

246 # Special handling for JSON 

247 if field_type == FieldType.JSON: 

248 return series.apply(self._ensure_json_serializable) 

249 

250 # Standard casting 

251 return series.astype(target_dtype) # type: ignore[arg-type] 

252 except (TypeError, ValueError): 

253 # If casting fails, return as object dtype 

254 return series.astype("object") 

255 

256 @staticmethod 

257 def _to_datetime(value: Any) -> pd.Timestamp: 

258 """Convert value to pandas Timestamp.""" 

259 if isinstance(value, (str, datetime)): 

260 return pd.Timestamp(value) 

261 elif isinstance(value, (int, float)): 

262 # Assume Unix timestamp 

263 return pd.Timestamp(value, unit='s') 

264 return value 

265 

266 @staticmethod 

267 def _from_datetime(value: Any) -> datetime: 

268 """Convert pandas Timestamp to datetime.""" 

269 if isinstance(value, pd.Timestamp): 

270 return value.to_pydatetime() 

271 elif isinstance(value, str): 

272 return pd.Timestamp(value).to_pydatetime() 

273 return value 

274 

275 @staticmethod 

276 def _to_json_object(value: Any) -> Any: 

277 """Ensure value is JSON-serializable object.""" 

278 if isinstance(value, str): 

279 try: 

280 return json.loads(value) 

281 except (json.JSONDecodeError, TypeError): 

282 return value 

283 return value 

284 

285 @staticmethod 

286 def _from_json_object(value: Any) -> Any: 

287 """Convert object to JSON-compatible format.""" 

288 if isinstance(value, (dict, list)): 

289 return value 

290 elif isinstance(value, str): 

291 try: 

292 return json.loads(value) 

293 except (json.JSONDecodeError, TypeError): 

294 return value 

295 return value 

296 

297 @staticmethod 

298 def _ensure_json_serializable(value: Any) -> Any: 

299 """Ensure value is JSON-serializable.""" 

300 if pd.isna(value): 

301 return None 

302 if isinstance(value, (dict, list)): 

303 return value 

304 if isinstance(value, str): 

305 try: 

306 return json.loads(value) 

307 except (json.JSONDecodeError, TypeError): 

308 return value 

309 # Convert other types to string representation 

310 return str(value) 

311 

312 def infer_field_type(self, series: pd.Series) -> str: 

313 """Infer field type from a pandas Series. 

314  

315 Args: 

316 series: Series to analyze 

317  

318 Returns: 

319 Field type string 

320 """ 

321 # Remove nulls for analysis 

322 non_null = series.dropna() 

323 

324 if len(non_null) == 0: 

325 return "string" # Default for empty 

326 

327 # Check dtypes 

328 if is_bool_dtype(non_null): 

329 return "boolean" 

330 elif is_datetime64_any_dtype(non_null): 

331 return "datetime" 

332 elif is_numeric_dtype(non_null): 

333 # Check if all values are integers 

334 if non_null.apply(lambda x: isinstance(x, (int, np.integer)) or (isinstance(x, float) and x.is_integer())).all(): 

335 return "integer" 

336 else: 

337 return "number" 

338 else: 

339 # Check values for special types 

340 sample = non_null.iloc[0] if len(non_null) > 0 else None 

341 if sample is not None: 

342 if isinstance(sample, (datetime, pd.Timestamp, pd._libs.tslibs.timestamps.Timestamp, pd._libs.tslibs.nattype.NaTType)): 

343 return "datetime" 

344 elif hasattr(sample, '__class__') and 'date' in sample.__class__.__name__.lower(): # type: ignore[unreachable] 

345 return "date" 

346 elif hasattr(sample, '__class__') and 'time' in sample.__class__.__name__.lower(): 

347 return "time" 

348 else: 

349 # Other object types 

350 return "string" 

351 else: 

352 # No sample available 

353 return "string" 

354 

355 def get_pandas_dtype(self, field_type: str) -> str: 

356 """Get pandas dtype for a field type string. 

357  

358 Args: 

359 field_type: Field type string 

360  

361 Returns: 

362 Pandas dtype string 

363 """ 

364 dtype_map = { 

365 "string": "object", 

366 "integer": "int64", 

367 "number": "float64", 

368 "float": "float64", 

369 "boolean": "bool", 

370 "datetime": "datetime64[ns]", 

371 "date": "object", 

372 "time": "object", 

373 "json": "object", 

374 "binary": "object", 

375 "text": "object", 

376 } 

377 return dtype_map.get(field_type.lower(), "object") 

378 

379 def convert_value(self, value: Any, target_type: str) -> Any: 

380 """Convert a value to target type. 

381  

382 Args: 

383 value: Value to convert 

384 target_type: Target type string 

385  

386 Returns: 

387 Converted value 

388 """ 

389 if value is None or pd.isna(value): 

390 return None 

391 

392 target_type = target_type.lower() 

393 

394 if target_type == "integer": 

395 if isinstance(value, str): 

396 return int(float(value)) 

397 return int(value) 

398 elif target_type == "number" or target_type == "float": 

399 return float(value) 

400 elif target_type == "string": 

401 return str(value) 

402 elif target_type == "boolean": 

403 if isinstance(value, str): 

404 return value.lower() in ('true', '1', 'yes') 

405 return bool(value) 

406 elif target_type == "datetime": 

407 if isinstance(value, str): 

408 return pd.Timestamp(value) 

409 return value 

410 elif target_type == "date": 

411 if isinstance(value, str): 

412 return pd.Timestamp(value).date() 

413 elif hasattr(value, 'date'): 

414 return value.date() 

415 return value 

416 elif target_type == "time": 

417 if isinstance(value, str): 

418 return pd.Timestamp(value).time() 

419 elif hasattr(value, 'time'): 

420 return value.time() 

421 return value 

422 

423 return value 

424 

425 def cast_dataframe_dtypes(self, df: pd.DataFrame, dtype_map: dict[str, str]) -> pd.DataFrame: 

426 """Cast DataFrame columns to specified dtypes. 

427  

428 Args: 

429 df: DataFrame to cast 

430 dtype_map: Dictionary of column: dtype 

431  

432 Returns: 

433 DataFrame with casted dtypes 

434 """ 

435 result_df = df.copy() 

436 

437 for col, dtype in dtype_map.items(): 

438 if col in result_df.columns: 

439 try: 

440 if dtype == "string": 

441 # Use string dtype for nullable strings 

442 result_df[col] = result_df[col].astype("string") 

443 else: 

444 result_df[col] = result_df[col].astype(dtype) # type: ignore[call-overload] 

445 except (TypeError, ValueError): 

446 # If casting fails, leave as is 

447 pass 

448 

449 return result_df 

450 

451 def normalize_timezone(self, series: pd.Series, target_tz: str) -> pd.Series: 

452 """Normalize timezone for datetime series. 

453  

454 Args: 

455 series: Datetime series 

456 target_tz: Target timezone 

457  

458 Returns: 

459 Series with normalized timezone 

460 """ 

461 if not is_datetime64_any_dtype(series): 

462 # Try to convert to datetime first 

463 series = pd.to_datetime(series, errors='coerce') 

464 

465 # If series is timezone-naive, localize it 

466 if series.dt.tz is None: 

467 return series.dt.tz_localize(target_tz) 

468 else: 

469 # If timezone-aware, convert to target timezone 

470 return series.dt.tz_convert(target_tz) 

471 

472 def get_optimal_dtype(self, series: pd.Series) -> str: 

473 """Determine optimal dtype for a Series based on its values. 

474  

475 Args: 

476 series: Series to analyze 

477  

478 Returns: 

479 Optimal dtype string 

480 """ 

481 # Remove nulls for analysis 

482 non_null = series.dropna() 

483 

484 if len(non_null) == 0: 

485 return "string" # Default for empty 

486 

487 # Try to infer the best dtype 

488 try: 

489 # Check for boolean 

490 if non_null.apply(lambda x: isinstance(x, bool)).all(): 

491 return "bool" 

492 

493 # Check for integer 

494 if non_null.apply(lambda x: isinstance(x, (int, np.integer)) or (isinstance(x, float) and x.is_integer())).all(): 

495 # Determine the smallest int type that can hold the values 

496 min_val = non_null.min() 

497 max_val = non_null.max() 

498 

499 if min_val >= -128 and max_val <= 127: 

500 return "int8" 

501 elif min_val >= -32768 and max_val <= 32767: 

502 return "int16" 

503 elif min_val >= -2147483648 and max_val <= 2147483647: 

504 return "int32" 

505 else: 

506 return "int64" 

507 

508 # Check for float 

509 if non_null.apply(lambda x: isinstance(x, (int, float, np.number))).all(): 

510 # For floats, prefer float32 for small ranges 

511 max_val = non_null.abs().max() 

512 if max_val <= 3.4e38: 

513 return "float32" 

514 else: 

515 return "float64" 

516 

517 # Check for datetime 

518 try: 

519 # Use infer_datetime_format to suppress the warning 

520 pd.to_datetime(non_null, format='mixed') 

521 return "datetime64[ns]" 

522 except (ValueError, TypeError): 

523 pass 

524 

525 # Default to object for strings and mixed types 

526 return "object" 

527 except Exception: 

528 return "object"