Coverage for src/dataknobs_data/pandas/type_mapper.py: 0%
247 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-29 14:14 -0600
1"""Type mapping between DataKnobs Field types and Pandas dtypes."""
3from __future__ import annotations
5import json
6from dataclasses import dataclass
7from datetime import datetime
8from typing import Any, TYPE_CHECKING
10import numpy as np
11import pandas as pd
12from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_numeric_dtype
14from dataknobs_data.fields import FieldType
16if TYPE_CHECKING:
17 from collections.abc import Callable
@dataclass
class PandasTypeMapping:
    """Mapping configuration for type conversion.

    One entry per FieldType, consumed by TypeMapper when translating
    values between DataKnobs fields and pandas columns.
    """

    field_type: FieldType  # the DataKnobs field type this entry describes
    pandas_dtype: str | type | np.dtype  # target pandas dtype for this field
    nullable: bool = True  # whether the pandas dtype admits missing values
    converter: Callable | None = None  # optional hook: field value -> pandas value
    reverse_converter: Callable | None = None  # optional hook: pandas value -> field value
class TypeMapper:
    """Handles type mapping between DataKnobs Field types and Pandas dtypes."""

    def __init__(self):
        """Initialize type mapper with default mappings."""
        # Mapping tables are built once up front; see _init_mappings.
        self._init_mappings()
    def _init_mappings(self):
        """Initialize type mappings.

        Builds two lookup tables:
          * ``_field_to_pandas`` — full per-FieldType conversion config,
            including optional value converters for DATETIME and JSON.
          * ``_pandas_to_field`` — reverse lookup from a pandas dtype
            string back to a FieldType.
        """
        # Nullable pandas extension dtypes are preferred so missing values
        # round-trip as pd.NA rather than forcing dtype widening.
        self._field_to_pandas: dict[FieldType, PandasTypeMapping] = {
            FieldType.STRING: PandasTypeMapping(
                field_type=FieldType.STRING,
                pandas_dtype="string",  # pd.StringDtype()
                nullable=True
            ),
            FieldType.INTEGER: PandasTypeMapping(
                field_type=FieldType.INTEGER,
                pandas_dtype="Int64",  # pd.Int64Dtype()
                nullable=True
            ),
            FieldType.FLOAT: PandasTypeMapping(
                field_type=FieldType.FLOAT,
                pandas_dtype="Float64",  # pd.Float64Dtype()
                nullable=True
            ),
            FieldType.BOOLEAN: PandasTypeMapping(
                field_type=FieldType.BOOLEAN,
                pandas_dtype="boolean",  # pd.BooleanDtype()
                nullable=True
            ),
            FieldType.DATETIME: PandasTypeMapping(
                field_type=FieldType.DATETIME,
                pandas_dtype="datetime64[ns]",
                nullable=True,
                converter=self._to_datetime,
                reverse_converter=self._from_datetime
            ),
            FieldType.JSON: PandasTypeMapping(
                field_type=FieldType.JSON,
                pandas_dtype="object",
                nullable=True,
                converter=self._to_json_object,
                reverse_converter=self._from_json_object
            ),
            FieldType.BINARY: PandasTypeMapping(
                field_type=FieldType.BINARY,
                pandas_dtype="object",
                nullable=True
            ),
            FieldType.TEXT: PandasTypeMapping(
                field_type=FieldType.TEXT,
                pandas_dtype="string",
                nullable=True
            ),
        }

        # Reverse mapping from pandas to field types
        self._pandas_to_field: dict[str, FieldType] = {
            "string": FieldType.STRING,
            "int64": FieldType.INTEGER,
            "float64": FieldType.FLOAT,
            "boolean": FieldType.BOOLEAN,
            "datetime64[ns]": FieldType.DATETIME,
            "object": FieldType.STRING,  # Default object to STRING, not JSON
        }
96 def field_type_to_pandas(self, field_type: FieldType) -> str | type | np.dtype:
97 """Convert FieldType to pandas dtype.
99 Args:
100 field_type: DataKnobs FieldType
102 Returns:
103 Corresponding pandas dtype
104 """
105 mapping = self._field_to_pandas.get(field_type)
106 if mapping:
107 return mapping.pandas_dtype
108 return "object" # Default fallback
110 def pandas_to_field_type(self, dtype: str | np.dtype | type) -> FieldType:
111 """Infer FieldType from pandas dtype.
113 Args:
114 dtype: Pandas dtype
116 Returns:
117 Corresponding FieldType
118 """
119 dtype_str = str(dtype).lower()
121 # Direct mapping
122 if dtype_str in self._pandas_to_field:
123 return self._pandas_to_field[dtype_str]
125 # Infer from dtype categories
126 if "int" in dtype_str:
127 return FieldType.INTEGER
128 elif "float" in dtype_str:
129 return FieldType.FLOAT
130 elif "bool" in dtype_str:
131 return FieldType.BOOLEAN
132 elif "datetime" in dtype_str or "timestamp" in dtype_str:
133 return FieldType.DATETIME
134 elif dtype_str == "string" or dtype_str == "object":
135 return FieldType.STRING
136 elif "bytes" in dtype_str:
137 return FieldType.BINARY
139 return FieldType.STRING # Default fallback
141 def convert_value_to_pandas(self, value: Any, field_type: FieldType) -> Any:
142 """Convert a field value to pandas-compatible format.
144 Args:
145 value: Value to convert
146 field_type: Source field type
148 Returns:
149 Pandas-compatible value
150 """
151 if value is None:
152 return pd.NA
154 mapping = self._field_to_pandas.get(field_type)
155 if mapping and mapping.converter:
156 return mapping.converter(value)
158 return value
160 def convert_value_from_pandas(self, value: Any, field_type: FieldType) -> Any:
161 """Convert a pandas value to field-compatible format.
163 Args:
164 value: Pandas value
165 field_type: Target field type
167 Returns:
168 Field-compatible value
169 """
170 # Handle pandas NA/NaN/None
171 # Use try-except to handle arrays and other special cases
172 try:
173 if pd.isna(value):
174 return None
175 except (TypeError, ValueError):
176 # pd.isna can fail on arrays/lists
177 pass
179 mapping = self._field_to_pandas.get(field_type)
180 if mapping and mapping.reverse_converter:
181 return mapping.reverse_converter(value)
183 # Handle numpy types
184 if isinstance(value, (np.integer, np.floating, np.bool_)):
185 return value.item()
187 return value
189 def infer_field_type_from_value(self, value: Any) -> FieldType:
190 """Infer FieldType from a Python value.
192 Args:
193 value: Value to analyze
195 Returns:
196 Inferred FieldType
197 """
198 if value is None:
199 return FieldType.STRING # Default for null
201 # Check for pandas NA separately to avoid array ambiguity
202 try:
203 if pd.isna(value):
204 return FieldType.STRING
205 except (TypeError, ValueError):
206 # pd.isna can fail on some types like lists
207 pass
209 if isinstance(value, (bool, np.bool_)):
210 return FieldType.BOOLEAN
211 elif isinstance(value, (int, np.integer)):
212 return FieldType.INTEGER
213 elif isinstance(value, (float, np.floating)):
214 return FieldType.FLOAT
215 elif isinstance(value, (datetime, pd.Timestamp)):
216 return FieldType.DATETIME
217 elif isinstance(value, bytes): # type: ignore[unreachable]
218 return FieldType.BINARY
219 elif isinstance(value, (dict, list)):
220 return FieldType.JSON
221 elif isinstance(value, str):
222 if len(value) > 1000:
223 return FieldType.TEXT
224 return FieldType.STRING
225 else:
226 # Complex objects as JSON
227 return FieldType.JSON
229 def cast_series(self, series: pd.Series, field_type: FieldType) -> pd.Series:
230 """Cast a pandas Series to the appropriate dtype for a FieldType.
232 Args:
233 series: Series to cast
234 field_type: Target field type
236 Returns:
237 Casted Series
238 """
239 target_dtype = self.field_type_to_pandas(field_type)
241 try:
242 # Special handling for datetime
243 if field_type == FieldType.DATETIME:
244 return pd.to_datetime(series, errors='coerce')
246 # Special handling for JSON
247 if field_type == FieldType.JSON:
248 return series.apply(self._ensure_json_serializable)
250 # Standard casting
251 return series.astype(target_dtype) # type: ignore[arg-type]
252 except (TypeError, ValueError):
253 # If casting fails, return as object dtype
254 return series.astype("object")
256 @staticmethod
257 def _to_datetime(value: Any) -> pd.Timestamp:
258 """Convert value to pandas Timestamp."""
259 if isinstance(value, (str, datetime)):
260 return pd.Timestamp(value)
261 elif isinstance(value, (int, float)):
262 # Assume Unix timestamp
263 return pd.Timestamp(value, unit='s')
264 return value
266 @staticmethod
267 def _from_datetime(value: Any) -> datetime:
268 """Convert pandas Timestamp to datetime."""
269 if isinstance(value, pd.Timestamp):
270 return value.to_pydatetime()
271 elif isinstance(value, str):
272 return pd.Timestamp(value).to_pydatetime()
273 return value
275 @staticmethod
276 def _to_json_object(value: Any) -> Any:
277 """Ensure value is JSON-serializable object."""
278 if isinstance(value, str):
279 try:
280 return json.loads(value)
281 except (json.JSONDecodeError, TypeError):
282 return value
283 return value
285 @staticmethod
286 def _from_json_object(value: Any) -> Any:
287 """Convert object to JSON-compatible format."""
288 if isinstance(value, (dict, list)):
289 return value
290 elif isinstance(value, str):
291 try:
292 return json.loads(value)
293 except (json.JSONDecodeError, TypeError):
294 return value
295 return value
297 @staticmethod
298 def _ensure_json_serializable(value: Any) -> Any:
299 """Ensure value is JSON-serializable."""
300 if pd.isna(value):
301 return None
302 if isinstance(value, (dict, list)):
303 return value
304 if isinstance(value, str):
305 try:
306 return json.loads(value)
307 except (json.JSONDecodeError, TypeError):
308 return value
309 # Convert other types to string representation
310 return str(value)
312 def infer_field_type(self, series: pd.Series) -> str:
313 """Infer field type from a pandas Series.
315 Args:
316 series: Series to analyze
318 Returns:
319 Field type string
320 """
321 # Remove nulls for analysis
322 non_null = series.dropna()
324 if len(non_null) == 0:
325 return "string" # Default for empty
327 # Check dtypes
328 if is_bool_dtype(non_null):
329 return "boolean"
330 elif is_datetime64_any_dtype(non_null):
331 return "datetime"
332 elif is_numeric_dtype(non_null):
333 # Check if all values are integers
334 if non_null.apply(lambda x: isinstance(x, (int, np.integer)) or (isinstance(x, float) and x.is_integer())).all():
335 return "integer"
336 else:
337 return "number"
338 else:
339 # Check values for special types
340 sample = non_null.iloc[0] if len(non_null) > 0 else None
341 if sample is not None:
342 if isinstance(sample, (datetime, pd.Timestamp, pd._libs.tslibs.timestamps.Timestamp, pd._libs.tslibs.nattype.NaTType)):
343 return "datetime"
344 elif hasattr(sample, '__class__') and 'date' in sample.__class__.__name__.lower(): # type: ignore[unreachable]
345 return "date"
346 elif hasattr(sample, '__class__') and 'time' in sample.__class__.__name__.lower():
347 return "time"
348 else:
349 # Other object types
350 return "string"
351 else:
352 # No sample available
353 return "string"
355 def get_pandas_dtype(self, field_type: str) -> str:
356 """Get pandas dtype for a field type string.
358 Args:
359 field_type: Field type string
361 Returns:
362 Pandas dtype string
363 """
364 dtype_map = {
365 "string": "object",
366 "integer": "int64",
367 "number": "float64",
368 "float": "float64",
369 "boolean": "bool",
370 "datetime": "datetime64[ns]",
371 "date": "object",
372 "time": "object",
373 "json": "object",
374 "binary": "object",
375 "text": "object",
376 }
377 return dtype_map.get(field_type.lower(), "object")
379 def convert_value(self, value: Any, target_type: str) -> Any:
380 """Convert a value to target type.
382 Args:
383 value: Value to convert
384 target_type: Target type string
386 Returns:
387 Converted value
388 """
389 if value is None or pd.isna(value):
390 return None
392 target_type = target_type.lower()
394 if target_type == "integer":
395 if isinstance(value, str):
396 return int(float(value))
397 return int(value)
398 elif target_type == "number" or target_type == "float":
399 return float(value)
400 elif target_type == "string":
401 return str(value)
402 elif target_type == "boolean":
403 if isinstance(value, str):
404 return value.lower() in ('true', '1', 'yes')
405 return bool(value)
406 elif target_type == "datetime":
407 if isinstance(value, str):
408 return pd.Timestamp(value)
409 return value
410 elif target_type == "date":
411 if isinstance(value, str):
412 return pd.Timestamp(value).date()
413 elif hasattr(value, 'date'):
414 return value.date()
415 return value
416 elif target_type == "time":
417 if isinstance(value, str):
418 return pd.Timestamp(value).time()
419 elif hasattr(value, 'time'):
420 return value.time()
421 return value
423 return value
425 def cast_dataframe_dtypes(self, df: pd.DataFrame, dtype_map: dict[str, str]) -> pd.DataFrame:
426 """Cast DataFrame columns to specified dtypes.
428 Args:
429 df: DataFrame to cast
430 dtype_map: Dictionary of column: dtype
432 Returns:
433 DataFrame with casted dtypes
434 """
435 result_df = df.copy()
437 for col, dtype in dtype_map.items():
438 if col in result_df.columns:
439 try:
440 if dtype == "string":
441 # Use string dtype for nullable strings
442 result_df[col] = result_df[col].astype("string")
443 else:
444 result_df[col] = result_df[col].astype(dtype) # type: ignore[call-overload]
445 except (TypeError, ValueError):
446 # If casting fails, leave as is
447 pass
449 return result_df
451 def normalize_timezone(self, series: pd.Series, target_tz: str) -> pd.Series:
452 """Normalize timezone for datetime series.
454 Args:
455 series: Datetime series
456 target_tz: Target timezone
458 Returns:
459 Series with normalized timezone
460 """
461 if not is_datetime64_any_dtype(series):
462 # Try to convert to datetime first
463 series = pd.to_datetime(series, errors='coerce')
465 # If series is timezone-naive, localize it
466 if series.dt.tz is None:
467 return series.dt.tz_localize(target_tz)
468 else:
469 # If timezone-aware, convert to target timezone
470 return series.dt.tz_convert(target_tz)
472 def get_optimal_dtype(self, series: pd.Series) -> str:
473 """Determine optimal dtype for a Series based on its values.
475 Args:
476 series: Series to analyze
478 Returns:
479 Optimal dtype string
480 """
481 # Remove nulls for analysis
482 non_null = series.dropna()
484 if len(non_null) == 0:
485 return "string" # Default for empty
487 # Try to infer the best dtype
488 try:
489 # Check for boolean
490 if non_null.apply(lambda x: isinstance(x, bool)).all():
491 return "bool"
493 # Check for integer
494 if non_null.apply(lambda x: isinstance(x, (int, np.integer)) or (isinstance(x, float) and x.is_integer())).all():
495 # Determine the smallest int type that can hold the values
496 min_val = non_null.min()
497 max_val = non_null.max()
499 if min_val >= -128 and max_val <= 127:
500 return "int8"
501 elif min_val >= -32768 and max_val <= 32767:
502 return "int16"
503 elif min_val >= -2147483648 and max_val <= 2147483647:
504 return "int32"
505 else:
506 return "int64"
508 # Check for float
509 if non_null.apply(lambda x: isinstance(x, (int, float, np.number))).all():
510 # For floats, prefer float32 for small ranges
511 max_val = non_null.abs().max()
512 if max_val <= 3.4e38:
513 return "float32"
514 else:
515 return "float64"
517 # Check for datetime
518 try:
519 # Use infer_datetime_format to suppress the warning
520 pd.to_datetime(non_null, format='mixed')
521 return "datetime64[ns]"
522 except (ValueError, TypeError):
523 pass
525 # Default to object for strings and mixed types
526 return "object"
527 except Exception:
528 return "object"