Coverage for polypandas/io.py: 77%
104 statements
« prev ^ index » next coverage.py v7.6.1, created at 2026-02-24 14:21 -0500
1"""Data export and import utilities for polypandas."""
3from pathlib import Path
4from typing import Any, Dict, List
6from polypandas.exceptions import PolypandasError
7from polypandas.protocols import is_pandas_available
class DataIOError(PolypandasError):
    """Raised when a data import or export operation fails."""
def save_as_parquet(df: Any, path: str, **kwargs: Any) -> None:
    """Write a DataFrame to a Parquet file, creating parent dirs as needed.

    Args:
        df: pandas DataFrame to write.
        path: Destination path for the Parquet file.
        **kwargs: Forwarded to ``df.to_parquet()`` (e.g. ``index=False``).

    Raises:
        DataIOError: If pandas is unavailable, ``df`` is not a DataFrame,
            or the write fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for Parquet operations")

    import pandas as pd

    # Fail early with a clear message rather than an opaque attribute error.
    if not isinstance(df, pd.DataFrame):
        raise DataIOError("Expected a pandas DataFrame")

    target = Path(path)
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(path, **kwargs)
    except Exception as exc:
        raise DataIOError(f"Failed to save Parquet file: {exc}") from exc
def save_as_json(df: Any, path: str, **kwargs: Any) -> None:
    """Save DataFrame as JSON file.

    Args:
        df: pandas DataFrame to save.
        path: Output path for JSON file.
        **kwargs: Additional arguments for df.to_json().

    Raises:
        DataIOError: If pandas is unavailable, ``df`` is not a DataFrame,
            or the save operation fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for JSON operations")

    import pandas as pd

    # Validate the input type up front, consistent with save_as_parquet;
    # otherwise a non-DataFrame produces an opaque AttributeError below.
    if not isinstance(df, pd.DataFrame):
        raise DataIOError("Expected a pandas DataFrame")

    try:
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        df.to_json(path, **kwargs)
    except Exception as e:
        raise DataIOError(f"Failed to save JSON file: {e}") from e
def save_as_csv(
    df: Any,
    path: str,
    header: bool = True,
    **kwargs: Any,
) -> None:
    """Save DataFrame as CSV file.

    Args:
        df: pandas DataFrame to save.
        path: Output path for CSV file.
        header: Whether to write column names (default True).
        **kwargs: Additional arguments for df.to_csv().

    Raises:
        DataIOError: If pandas is unavailable, ``df`` is not a DataFrame,
            or the save operation fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for CSV operations")

    import pandas as pd

    # Validate the input type up front, consistent with save_as_parquet;
    # otherwise a non-DataFrame produces an opaque AttributeError below.
    if not isinstance(df, pd.DataFrame):
        raise DataIOError("Expected a pandas DataFrame")

    try:
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(path, header=header, **kwargs)
    except Exception as e:
        raise DataIOError(f"Failed to save CSV file: {e}") from e
def load_parquet(path: str, **kwargs: Any) -> Any:
    """Read a Parquet file into a pandas DataFrame.

    Args:
        path: Path of the Parquet file to read.
        **kwargs: Forwarded to ``pd.read_parquet()``.

    Returns:
        The loaded pandas DataFrame.

    Raises:
        DataIOError: If pandas is unavailable or the read fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for Parquet operations")

    import pandas as pd

    try:
        frame = pd.read_parquet(path, **kwargs)
    except Exception as exc:
        raise DataIOError(f"Failed to load Parquet file: {exc}") from exc
    return frame
def load_json(path: str, **kwargs: Any) -> Any:
    """Read a JSON file into a pandas DataFrame.

    Args:
        path: Path of the JSON file to read.
        **kwargs: Forwarded to ``pd.read_json()``.

    Returns:
        The loaded pandas DataFrame.

    Raises:
        DataIOError: If pandas is unavailable or the read fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for JSON operations")

    import pandas as pd

    try:
        frame = pd.read_json(path, **kwargs)
    except Exception as exc:
        raise DataIOError(f"Failed to load JSON file: {exc}") from exc
    return frame
def load_csv(
    path: str,
    header: int = 0,
    **kwargs: Any,
) -> Any:
    """Read a CSV file into a pandas DataFrame.

    Args:
        path: Path of the CSV file to read.
        header: Row number to use as column names (default 0).
        **kwargs: Forwarded to ``pd.read_csv()``.

    Returns:
        The loaded pandas DataFrame.

    Raises:
        DataIOError: If pandas is unavailable or the read fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for CSV operations")

    import pandas as pd

    try:
        frame = pd.read_csv(path, header=header, **kwargs)
    except Exception as exc:
        raise DataIOError(f"Failed to load CSV file: {exc}") from exc
    return frame
def load_and_validate(
    path: str,
    expected_schema: Any = None,
    validate_schema: bool = True,
) -> Any:
    """Load data file and optionally validate against expected dtype dict.

    The file format is inferred from the path suffix.

    Args:
        path: Path to data file (.parquet, .json, .csv, or .txt).
        expected_schema: Optional dict of column name -> expected dtype (string).
        validate_schema: Whether to validate that columns and dtypes match.

    Returns:
        Loaded pandas DataFrame.

    Raises:
        DataIOError: If pandas is unavailable, the format is unsupported,
            loading fails, or a column is missing / has the wrong dtype.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for data loading")

    suffix = Path(path).suffix.lower()

    if suffix == ".parquet":
        df = load_parquet(path)
    elif suffix == ".json":
        df = load_json(path)
    elif suffix in (".csv", ".txt"):
        df = load_csv(path)
    else:
        raise DataIOError(f"Unsupported file format: {suffix}. Supported: .parquet, .json, .csv")

    if validate_schema and expected_schema is not None:
        # Imported lazily to avoid a circular import at module load time.
        from polypandas.testing import assert_column_exists

        # Single pass: existence first, then dtype. Dtypes are compared as
        # strings so e.g. "int64" matches numpy.dtype("int64").
        for col, dtype in expected_schema.items():
            assert_column_exists(df, col)
            if str(df[col].dtype) != str(dtype):
                raise DataIOError(f"Column '{col}' has dtype {df[col].dtype}, expected {dtype}")

    return df
def save_dicts_as_json(data: List[Dict[str, Any]], path: str) -> None:
    """Save list of dictionaries as JSON lines file. Works without pandas.

    Parent directories are created as needed. Non-JSON-serializable values
    are stringified via ``default=str`` (deliberate best-effort behavior).

    Args:
        data: Records to write, one JSON object per line.
        path: Output path for the JSON lines file.

    Raises:
        DataIOError: If the write fails.
    """
    import json

    try:
        file_path = Path(path)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so output is UTF-8 regardless of platform locale.
        with open(file_path, "w", encoding="utf-8") as f:
            for record in data:
                json.dump(record, f, default=str)
                f.write("\n")
    except Exception as e:
        raise DataIOError(f"Failed to save JSON file: {e}") from e
def load_dicts_from_json(path: str) -> List[Dict[str, Any]]:
    """Load list of dictionaries from JSON lines file. Works without pandas.

    Blank lines are skipped.

    Args:
        path: Path of the JSON lines file to read.

    Returns:
        One parsed dict per non-blank line, in file order.

    Raises:
        DataIOError: If the file does not exist or parsing fails.
    """
    import json

    try:
        file_path = Path(path)
        if not file_path.exists():
            raise DataIOError(f"File not found: {path}")
        data = []
        # Explicit encoding so input is read as UTF-8 regardless of locale,
        # matching what save_dicts_as_json writes.
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    data.append(json.loads(line))
        return data
    except DataIOError:
        # Re-raise our own errors unchanged instead of double-wrapping them.
        raise
    except Exception as e:
        raise DataIOError(f"Failed to load JSON file: {e}") from e