Coverage for polypandas/io.py: 77%

104 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2026-02-24 14:21 -0500

1"""Data export and import utilities for polypandas.""" 

2 

3from pathlib import Path 

4from typing import Any, Dict, List 

5 

6from polypandas.exceptions import PolypandasError 

7from polypandas.protocols import is_pandas_available 

8 

9 

class DataIOError(PolypandasError):
    """Raised when data I/O operations fail."""
    # NOTE: the redundant `pass` was removed — a docstring alone is a
    # sufficient class body.

14 

15 

def save_as_parquet(df: Any, path: str, **kwargs: Any) -> None:
    """Write a pandas DataFrame to disk in Parquet format.

    Args:
        df: pandas DataFrame to write.
        path: Destination Parquet file; parent directories are created
            as needed.
        **kwargs: Forwarded to ``DataFrame.to_parquet`` (e.g. index=False).

    Raises:
        DataIOError: If pandas is unavailable, ``df`` is not a DataFrame,
            or the write fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for Parquet operations")

    import pandas as pd

    if not isinstance(df, pd.DataFrame):
        raise DataIOError("Expected a pandas DataFrame")

    try:
        destination = Path(path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(path, **kwargs)
    except Exception as e:
        raise DataIOError(f"Failed to save Parquet file: {e}") from e

40 

41 

def save_as_json(df: Any, path: str, **kwargs: Any) -> None:
    """Save DataFrame as JSON file.

    Args:
        df: pandas DataFrame to save.
        path: Output path for JSON file; parent directories are created
            as needed.
        **kwargs: Additional arguments for df.to_json() (e.g. orient="records").

    Raises:
        DataIOError: If pandas is unavailable, df is not a DataFrame,
            or the save operation fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for JSON operations")

    import pandas as pd

    # Consistency fix: save_as_parquet validates its input, this saver did
    # not — fail fast with a clear error instead of an AttributeError.
    if not isinstance(df, pd.DataFrame):
        raise DataIOError("Expected a pandas DataFrame")

    try:
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        df.to_json(path, **kwargs)
    except Exception as e:
        raise DataIOError(f"Failed to save JSON file: {e}") from e

52 

53 

def save_as_csv(
    df: Any,
    path: str,
    header: bool = True,
    **kwargs: Any,
) -> None:
    """Save DataFrame as CSV file.

    Args:
        df: pandas DataFrame to save.
        path: Output path for CSV file; parent directories are created
            as needed.
        header: Whether to write the column names as the first row.
        **kwargs: Additional arguments for df.to_csv() (e.g. index=False).

    Raises:
        DataIOError: If pandas is unavailable, df is not a DataFrame,
            or the save operation fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for CSV operations")

    import pandas as pd

    # Consistency fix: save_as_parquet validates its input, this saver did
    # not — fail fast with a clear error instead of an AttributeError.
    if not isinstance(df, pd.DataFrame):
        raise DataIOError("Expected a pandas DataFrame")

    try:
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(path, header=header, **kwargs)
    except Exception as e:
        raise DataIOError(f"Failed to save CSV file: {e}") from e

69 

70 

def load_parquet(path: str, **kwargs: Any) -> Any:
    """Read a Parquet file into a pandas DataFrame.

    Args:
        path: Path of the Parquet file to read.
        **kwargs: Forwarded to ``pandas.read_parquet``.

    Returns:
        The loaded pandas DataFrame.

    Raises:
        DataIOError: If pandas is unavailable or reading fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for Parquet operations")

    import pandas as pd

    try:
        frame = pd.read_parquet(path, **kwargs)
    except Exception as exc:
        raise DataIOError(f"Failed to load Parquet file: {exc}") from exc
    return frame

82 

83 

def load_json(path: str, **kwargs: Any) -> Any:
    """Read a JSON file into a pandas DataFrame.

    Args:
        path: Path of the JSON file to read.
        **kwargs: Forwarded to ``pandas.read_json``.

    Returns:
        The loaded pandas DataFrame.

    Raises:
        DataIOError: If pandas is unavailable or reading fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for JSON operations")

    import pandas as pd

    try:
        frame = pd.read_json(path, **kwargs)
    except Exception as exc:
        raise DataIOError(f"Failed to load JSON file: {exc}") from exc
    return frame

95 

96 

def load_csv(
    path: str,
    header: int = 0,
    **kwargs: Any,
) -> Any:
    """Read a CSV file into a pandas DataFrame.

    Args:
        path: Path of the CSV file to read.
        header: Row number to use as the column names (pandas default: 0).
        **kwargs: Forwarded to ``pandas.read_csv``.

    Returns:
        The loaded pandas DataFrame.

    Raises:
        DataIOError: If pandas is unavailable or reading fails.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for CSV operations")

    import pandas as pd

    try:
        frame = pd.read_csv(path, header=header, **kwargs)
    except Exception as exc:
        raise DataIOError(f"Failed to load CSV file: {exc}") from exc
    return frame

112 

113 

def load_and_validate(
    path: str,
    expected_schema: Any = None,
    validate_schema: bool = True,
) -> Any:
    """Load data file and optionally validate against expected dtype dict.

    Args:
        path: Path to data file (.parquet, .json, .csv, or .txt — .txt is
            parsed as CSV).
        expected_schema: Optional dict of column name -> expected dtype (string).
        validate_schema: Whether to validate that columns and dtypes match.

    Returns:
        Loaded pandas DataFrame.

    Raises:
        DataIOError: If pandas is unavailable, the extension is unsupported,
            loading fails, or a column's dtype does not match the schema.
    """
    if not is_pandas_available():
        raise DataIOError("pandas is required for data loading")

    suffix = Path(path).suffix.lower()

    if suffix == ".parquet":
        df = load_parquet(path)
    elif suffix == ".json":
        df = load_json(path)
    elif suffix in (".csv", ".txt"):
        df = load_csv(path)
    else:
        # Bug fix: the original message omitted .txt even though .txt files
        # are accepted above.
        raise DataIOError(
            f"Unsupported file format: {suffix}. "
            "Supported: .parquet, .json, .csv, .txt"
        )

    if validate_schema and expected_schema is not None:
        from polypandas.testing import assert_column_exists

        # Single pass per column: existence first, then dtype. (Assumes
        # assert_column_exists raises on a missing column — TODO confirm;
        # the original looped twice and re-checked membership redundantly.)
        for col, dtype in expected_schema.items():
            assert_column_exists(df, col)
            if str(df[col].dtype) != str(dtype):
                raise DataIOError(
                    f"Column '{col}' has dtype {df[col].dtype}, expected {dtype}"
                )

    return df

155 

156 

def save_dicts_as_json(data: List[Dict[str, Any]], path: str) -> None:
    """Save list of dictionaries as JSON lines file. Works without pandas.

    Args:
        data: Records to write, one JSON object per line.
        path: Output file path; parent directories are created as needed.

    Raises:
        DataIOError: If the file cannot be written.
    """
    import json

    try:
        file_path = Path(path)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        # Robustness fix: explicit UTF-8 so output does not depend on the
        # platform's default locale encoding.
        with open(file_path, "w", encoding="utf-8") as f:
            for record in data:
                # default=str deliberately stringifies non-JSON-native
                # values (dates, Paths, ...) instead of failing.
                json.dump(record, f, default=str)
                f.write("\n")
    except Exception as e:
        raise DataIOError(f"Failed to save JSON file: {e}") from e

170 

171 

def load_dicts_from_json(path: str) -> List[Dict[str, Any]]:
    """Load list of dictionaries from JSON lines file. Works without pandas.

    Args:
        path: Path of the JSON lines file to read. Blank lines are skipped.

    Returns:
        One dict per non-blank line, in file order.

    Raises:
        DataIOError: If the file does not exist or cannot be parsed.
    """
    import json

    try:
        file_path = Path(path)
        if not file_path.exists():
            raise DataIOError(f"File not found: {path}")
        data = []
        # Robustness fix: explicit UTF-8 to mirror the writer instead of
        # relying on the platform's default locale encoding.
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    data.append(json.loads(line))
        return data
    except DataIOError:
        # Re-raise our own error untouched so it is not double-wrapped below.
        raise
    except Exception as e:
        raise DataIOError(f"Failed to load JSON file: {e}") from e