
1""" parquet compat """ 

2 

3from typing import Any, Dict, Optional 

4from warnings import catch_warnings 

5 

6from pandas.compat._optional import import_optional_dependency 

7from pandas.errors import AbstractMethodError 

8 

9from pandas import DataFrame, get_option 

10 

11from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url 

12 

13 

14def get_engine(engine: str) -> "BaseImpl": 

15 """ return our implementation """ 

16 

17 if engine == "auto": 

18 engine = get_option("io.parquet.engine") 

19 

20 if engine == "auto": 

21 # try engines in this order 

22 try: 

23 return PyArrowImpl() 

24 except ImportError: 

25 pass 

26 

27 try: 

28 return FastParquetImpl() 

29 except ImportError: 

30 pass 

31 

32 raise ImportError( 

33 "Unable to find a usable engine; " 

34 "tried using: 'pyarrow', 'fastparquet'.\n" 

35 "pyarrow or fastparquet is required for parquet " 

36 "support" 

37 ) 

38 

39 if engine == "pyarrow": 

40 return PyArrowImpl() 

41 elif engine == "fastparquet": 

42 return FastParquetImpl() 

43 

44 raise ValueError("engine must be one of 'pyarrow', 'fastparquet'") 

45 

46 
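# Illustrative sketch (not part of the original module): how engine resolution
# above behaves. With the default ``io.parquet.engine`` option ("auto"),
# pyarrow is tried first and fastparquet is the fallback; an explicit engine
# name skips the fallback chain entirely.
#
#   get_engine("auto")         # PyArrowImpl() if pyarrow imports, else
#                              # FastParquetImpl(), else ImportError
#   get_engine("fastparquet")  # FastParquetImpl(), or ImportError if missing
#   get_engine("csv")          # ValueError: engine must be one of ...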

class BaseImpl:
    @staticmethod
    def validate_dataframe(df: DataFrame):

        if not isinstance(df, DataFrame):
            raise ValueError("to_parquet only supports IO with DataFrames")

        # column names must be strings
        if df.columns.inferred_type not in {"string", "unicode", "empty"}:
            raise ValueError("parquet must have string column names")

        # index level names must be strings
        valid_names = all(
            isinstance(name, str) for name in df.index.names if name is not None
        )
        if not valid_names:
            raise ValueError("Index level names must be strings")

    def write(self, df: DataFrame, path, compression, **kwargs):
        raise AbstractMethodError(self)

    def read(self, path, columns=None, **kwargs):
        raise AbstractMethodError(self)


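# Illustrative sketch (not part of the original module): what the validation
# above accepts and rejects, assuming ``import pandas as pd``.
#
#   BaseImpl.validate_dataframe(pd.DataFrame({"a": [1]}))  # passes
#   BaseImpl.validate_dataframe(pd.DataFrame({0: [1]}))    # ValueError: string column names
#   BaseImpl.validate_dataframe(pd.Series([1]))            # ValueError: only DataFrames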

class PyArrowImpl(BaseImpl):
    def __init__(self):
        import_optional_dependency(
            "pyarrow", extra="pyarrow is required for parquet support."
        )
        import pyarrow.parquet

        # import utils to register the pyarrow extension types
        import pandas.core.arrays._arrow_utils  # noqa

        self.api = pyarrow

    def write(
        self,
        df: DataFrame,
        path,
        compression="snappy",
        coerce_timestamps="ms",
        index: Optional[bool] = None,
        partition_cols=None,
        **kwargs,
    ):
        self.validate_dataframe(df)
        path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")

        from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
        if partition_cols is not None:
            self.api.parquet.write_to_dataset(
                table,
                path,
                compression=compression,
                coerce_timestamps=coerce_timestamps,
                partition_cols=partition_cols,
                **kwargs,
            )
        else:
            self.api.parquet.write_table(
                table,
                path,
                compression=compression,
                coerce_timestamps=coerce_timestamps,
                **kwargs,
            )
        if should_close:
            path.close()

    def read(self, path, columns=None, **kwargs):
        path, _, _, should_close = get_filepath_or_buffer(path)

        kwargs["use_pandas_metadata"] = True
        result = self.api.parquet.read_table(
            path, columns=columns, **kwargs
        ).to_pandas()
        if should_close:
            path.close()

        return result


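# Illustrative sketch (not part of the original module): the two pyarrow write
# paths above, assuming ``import pandas as pd`` and pyarrow installed. Passing
# partition_cols writes a partitioned dataset (one directory per value);
# otherwise a single file is written.
#
#   df = pd.DataFrame({"year": [2019, 2020], "value": [1.0, 2.0]})
#   PyArrowImpl().write(df, "out_dir", partition_cols=["year"])  # out_dir/year=2019/...
#   PyArrowImpl().write(df, "single.parquet")                    # one file
#   PyArrowImpl().read("single.parquet", columns=["value"])      # column subset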

class FastParquetImpl(BaseImpl):
    def __init__(self):
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        fastparquet = import_optional_dependency(
            "fastparquet", extra="fastparquet is required for parquet support."
        )
        self.api = fastparquet

    def write(
        self,
        df: DataFrame,
        path,
        compression="snappy",
        index=None,
        partition_cols=None,
        **kwargs,
    ):
        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.

        if "partition_on" in kwargs and partition_cols is not None:
            raise ValueError(
                "Cannot use both partition_on and "
                "partition_cols. Use partition_cols for "
                "partitioning data"
            )
        elif "partition_on" in kwargs:
            partition_cols = kwargs.pop("partition_on")

        if partition_cols is not None:
            kwargs["file_scheme"] = "hive"

        if is_s3_url(path) or is_gcs_url(path):
            # if path is s3:// or gs:// we need to open the file in 'wb' mode.
            # TODO: Support 'ab'

            path, _, _, _ = get_filepath_or_buffer(path, mode="wb")
            # And pass the opened file to the fastparquet internal impl.
            kwargs["open_with"] = lambda path, _: path
        else:
            path, _, _, _ = get_filepath_or_buffer(path)

        with catch_warnings(record=True):
            self.api.write(
                path,
                df,
                compression=compression,
                write_index=index,
                partition_on=partition_cols,
                **kwargs,
            )


    def read(self, path, columns=None, **kwargs):
        if is_s3_url(path):
            from pandas.io.s3 import get_file_and_filesystem

            # When path is s3:// an S3File is returned.
            # We need to retain the original path (a str) while also
            # passing the S3File().open function to the fastparquet impl.
            s3, filesystem = get_file_and_filesystem(path)
            try:
                parquet_file = self.api.ParquetFile(path, open_with=filesystem.open)
            finally:
                s3.close()
        else:
            path, _, _, _ = get_filepath_or_buffer(path)
            parquet_file = self.api.ParquetFile(path)

        return parquet_file.to_pandas(columns=columns, **kwargs)


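# Illustrative sketch (not part of the original module): fastparquet's
# partition handling above, assuming ``import pandas as pd`` and fastparquet
# installed. partition_cols (or the legacy partition_on kwarg) switches the
# writer to the "hive" file scheme; supplying both raises ValueError.
#
#   df = pd.DataFrame({"year": [2019, 2020], "value": [1.0, 2.0]})
#   FastParquetImpl().write(df, "out_dir", partition_cols=["year"])  # hive layout
#   FastParquetImpl().write(df, "out_dir", partition_on=["year"])    # same effect
#   FastParquetImpl().write(df, "out_dir", partition_cols=["year"],
#                           partition_on=["year"])                   # ValueError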

def to_parquet(
    df: DataFrame,
    path,
    engine: str = "auto",
    compression="snappy",
    index: Optional[bool] = None,
    partition_cols=None,
    **kwargs,
):
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str
        File path or Root Directory path. Will be used as Root Directory path
        while writing a partitioned dataset.

        .. versionchanged:: 0.24.0

    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {'snappy', 'gzip', 'brotli', None}, default 'snappy'
        Name of the compression to use. Use ``None`` for no compression.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, the behavior is similar to ``True``, except that a
        ``RangeIndex`` is stored as a range in the metadata rather than as
        values, which requires less space and is faster. Other indexes will
        be included as columns in the file output.

        .. versionadded:: 0.24.0

    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.

        .. versionadded:: 0.24.0

    **kwargs
        Additional keyword arguments passed to the engine.
    """
    if isinstance(partition_cols, str):
        partition_cols = [partition_cols]
    impl = get_engine(engine)
    return impl.write(
        df,
        path,
        compression=compression,
        index=index,
        partition_cols=partition_cols,
        **kwargs,
    )


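# Illustrative sketch (not part of the original module): typical calls into
# to_parquet, assuming ``import pandas as pd``. The public entry point is
# ``DataFrame.to_parquet``, which delegates to this function.
#
#   df = pd.DataFrame({"year": [2019, 2020], "value": [1.0, 2.0]})
#   to_parquet(df, "single.parquet", engine="pyarrow", compression="gzip")
#   to_parquet(df, "out_dir", partition_cols="year")  # a str is wrapped in a list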

def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
    """
    Load a parquet object from the file path, returning a DataFrame.

    .. versionadded:: 0.21.0

    Parameters
    ----------
    path : str, path object or file-like object
        Any valid string path is acceptable. The string could be a URL. Valid
        URL schemes include http, ftp, s3, and file. For file URLs, a host is
        expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables``.

        If you want to pass in a path object, pandas accepts any
        ``os.PathLike``.

        By file-like object, we refer to objects with a ``read()`` method,
        such as a file handle (e.g. via the builtin ``open`` function)
        or ``StringIO``.
    engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    columns : list, default None
        If not None, only these columns will be read from the file.

        .. versionadded:: 0.21.1
    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame
    """

    impl = get_engine(engine)
    return impl.read(path, columns=columns, **kwargs)
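

# Illustrative sketch (not part of the original module): typical calls into
# read_parquet, mirroring the public ``pandas.read_parquet``. Paths, open file
# objects, and directory paths for partitioned datasets are all accepted.
#
#   df = read_parquet("single.parquet", columns=["value"])
#   df = read_parquet("out_dir", engine="fastparquet")  # partitioned dataset
#   with open("single.parquet", "rb") as f:
#       df = read_parquet(f, engine="pyarrow")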