Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Common IO api utilities""" 

2 

3import bz2 

4from collections import abc 

5import gzip 

6from io import BufferedIOBase, BytesIO, RawIOBase 

7import mmap 

8import os 

9import pathlib 

10from typing import IO, Any, AnyStr, Dict, List, Mapping, Optional, Tuple, Union 

11from urllib.parse import ( # noqa 

12 urlencode, 

13 urljoin, 

14 urlparse as parse_url, 

15 uses_netloc, 

16 uses_params, 

17 uses_relative, 

18) 

19import zipfile 

20 

21from pandas._typing import FilePathOrBuffer 

22from pandas.compat import _get_lzma_file, _import_lzma 

23from pandas.errors import ( # noqa 

24 AbstractMethodError, 

25 DtypeWarning, 

26 EmptyDataError, 

27 ParserError, 

28 ParserWarning, 

29) 

30 

31from pandas.core.dtypes.common import is_file_like 

32 

# Resolve the lzma module through pandas.compat so environments whose Python
# build lacks lzma support do not fail at import time.
# NOTE(review): presumably returns None/a stub when lzma is unavailable and
# _get_lzma_file raises at first use -- confirm against pandas.compat.
lzma = _import_lzma()

34 

35 

# URL schemes the stdlib URL machinery recognises; the empty scheme is
# removed so bare local paths never count as URLs.
_VALID_URLS = set(uses_relative + uses_netloc + uses_params)
_VALID_URLS.discard("")


def is_url(url) -> bool:
    """
    Check whether `url` is a string carrying a recognised URL scheme.

    Parameters
    ----------
    url : str or unicode

    Returns
    -------
    isurl : bool
        True when `url` is a string whose scheme is a valid protocol,
        False otherwise.
    """
    # Non-strings can never be URLs; otherwise defer to the parsed scheme.
    return isinstance(url, str) and parse_url(url).scheme in _VALID_URLS

56 

57 

58def _expand_user( 

59 filepath_or_buffer: FilePathOrBuffer[AnyStr], 

60) -> FilePathOrBuffer[AnyStr]: 

61 """Return the argument with an initial component of ~ or ~user 

62 replaced by that user's home directory. 

63 

64 Parameters 

65 ---------- 

66 filepath_or_buffer : object to be converted if possible 

67 

68 Returns 

69 ------- 

70 expanded_filepath_or_buffer : an expanded filepath or the 

71 input if not expandable 

72 """ 

73 if isinstance(filepath_or_buffer, str): 

74 return os.path.expanduser(filepath_or_buffer) 

75 return filepath_or_buffer 

76 

77 

def validate_header_arg(header) -> None:
    """Reject boolean ``header`` arguments.

    ``header`` must be None, an int, or a list-like of ints; a bool is a
    common mistake (e.g. ``header=True``) and raises TypeError.
    """
    if not isinstance(header, bool):
        return
    raise TypeError(
        "Passing a bool to header is invalid. "
        "Use header=None for no header or "
        "header=int or list-like of ints to specify "
        "the row(s) making up the column names"
    )

86 

87 

def stringify_path(
    filepath_or_buffer: FilePathOrBuffer[AnyStr],
) -> FilePathOrBuffer[AnyStr]:
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath_or_buffer : object to be converted

    Returns
    -------
    str_filepath_or_buffer : maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol (python 3.6+) are coerced
    according to its __fspath__ method.

    For backwards compatibility with older pythons, pathlib.Path and
    py.path objects are specially coerced.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    # Preferred route: anything implementing the os.fspath protocol.
    # https://github.com/python/mypy/issues/1424
    fspath = getattr(filepath_or_buffer, "__fspath__", None)
    if fspath is not None:
        return fspath()  # type: ignore
    # Fallback for path objects that predate the fspath protocol.
    if isinstance(filepath_or_buffer, pathlib.Path):
        return str(filepath_or_buffer)
    # Not path-like: expand ~ in plain strings, pass buffers through.
    return _expand_user(filepath_or_buffer)

118 

119 

def is_s3_url(url) -> bool:
    """Return True when ``url`` is a string with an s3, s3n, or s3a scheme."""
    if not isinstance(url, str):
        return False
    scheme = parse_url(url).scheme
    return scheme in ["s3", "s3n", "s3a"]

125 

126 

def is_gcs_url(url) -> bool:
    """Return True when ``url`` is a string with a gcs or gs scheme."""
    if not isinstance(url, str):
        return False
    scheme = parse_url(url).scheme
    return scheme in ["gcs", "gs"]

132 

133 

def urlopen(*args, **kwargs):
    """
    Thin pass-through to ``urllib.request.urlopen``.

    The import is deferred to call time because ``urllib.request`` pulls in
    a sizeable slice of the stdlib that most pandas sessions never need.
    """
    from urllib.request import urlopen as _urlopen

    return _urlopen(*args, **kwargs)

142 

143 

def get_filepath_or_buffer(
    filepath_or_buffer: FilePathOrBuffer,
    encoding: Optional[str] = None,
    compression: Optional[str] = None,
    mode: Optional[str] = None,
):
    """
    If the filepath_or_buffer is a url, translate and return the buffer.
    Otherwise passthrough.

    Parameters
    ----------
    filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
                         or buffer
    compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional
    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional

    Returns
    -------
    tuple of ({a filepath_ or buffer or S3File instance},
              encoding, str,
              compression, str,
              should_close, bool)
    """
    # Normalise path-like objects (pathlib.Path etc.) to plain strings first.
    filepath_or_buffer = stringify_path(filepath_or_buffer)

    if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
        # Remote URL: download the whole payload into an in-memory buffer.
        req = urlopen(filepath_or_buffer)
        if req.headers.get("Content-Encoding", None) == "gzip":
            # The server compressed the response; override the caller's
            # compression setting based on the Content-Encoding header.
            compression = "gzip"
        buffer = BytesIO(req.read())
        req.close()
        # should_close=True: we own the buffer we just created.
        return buffer, encoding, compression, True

    if is_s3_url(filepath_or_buffer):
        # Delegate to the s3 backend (lazy import keeps s3fs optional).
        from pandas.io import s3

        return s3.get_filepath_or_buffer(
            filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
        )

    if is_gcs_url(filepath_or_buffer):
        # Delegate to the gcs backend (lazy import keeps gcsfs optional).
        from pandas.io import gcs

        return gcs.get_filepath_or_buffer(
            filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
        )

    if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
        # Local path or raw buffer: just expand ~ and hand it back.
        return _expand_user(filepath_or_buffer), None, compression, False

    if not is_file_like(filepath_or_buffer):
        raise ValueError(
            f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
        )

    # Caller-supplied open file-like object: passed through, caller closes it.
    return filepath_or_buffer, None, compression, False

203 

204 

def file_path_to_url(path: str) -> str:
    """
    converts an absolute native path to a FILE URL.

    Parameters
    ----------
    path : a path in native format

    Returns
    -------
    a valid FILE URL
    """
    # urllib.request is expensive to import (~30ms), so defer it until
    # this function is actually called.
    from urllib.request import pathname2url

    url_path = pathname2url(path)
    return urljoin("file:", url_path)

221 

222 

# Supported compression methods mapped to their canonical file extensions;
# consulted both to infer compression from a filename and to validate an
# explicitly requested method.
_compression_to_extension = {"gzip": ".gz", "bz2": ".bz2", "zip": ".zip", "xz": ".xz"}

224 

225 

def get_compression_method(
    compression: Optional[Union[str, Mapping[str, str]]]
) -> Tuple[Optional[str], Dict[str, str]]:
    """
    Simplifies a compression argument to a compression method string and
    a mapping containing additional arguments.

    Parameters
    ----------
    compression : str or mapping
        If string, specifies the compression method. If mapping, value at key
        'method' specifies compression method.

    Returns
    -------
    tuple of ({compression method}, Optional[str]
              {compression arguments}, Dict[str, str])

    Raises
    ------
    ValueError on mapping missing 'method' key
    """
    if isinstance(compression, Mapping):
        # Copy first so popping 'method' does not mutate the caller's mapping.
        compression_args = dict(compression)
        try:
            compression = compression_args.pop("method")
        except KeyError as err:
            # Chain the original KeyError (PEP 3134) so the underlying
            # lookup failure stays visible in tracebacks.
            raise ValueError("If mapping, compression must have key 'method'") from err
    else:
        compression_args = {}
    return compression, compression_args

257 

258 

def infer_compression(
    filepath_or_buffer: FilePathOrBuffer, compression: Optional[str]
) -> Optional[str]:
    """
    Get the compression method for filepath_or_buffer. If compression='infer',
    the inferred compression method is returned. Otherwise, the input
    compression method is returned unchanged, unless it's invalid, in which
    case an error is raised.

    Parameters
    ----------
    filepath_or_buffer : str or file handle
        File path or object.
    compression : {'infer', 'gzip', 'bz2', 'zip', 'xz', None}
        If 'infer' and `filepath_or_buffer` is path-like, then detect
        compression from the following extensions: '.gz', '.bz2', '.zip',
        or '.xz' (otherwise no compression).

    Returns
    -------
    string or None

    Raises
    ------
    ValueError on invalid compression specified.
    """
    # Caller explicitly asked for no compression.
    if compression is None:
        return None

    if compression == "infer":
        # Convert all path types (e.g. pathlib.Path) to strings.
        filepath_or_buffer = stringify_path(filepath_or_buffer)
        if not isinstance(filepath_or_buffer, str):
            # Cannot infer compression of a buffer; assume no compression.
            return None

        # Match the filename/URL suffix against the known extensions.
        for method, extension in _compression_to_extension.items():
            if filepath_or_buffer.endswith(extension):
                return method
        return None

    # A concrete method was requested; pass it through if recognised.
    if compression in _compression_to_extension:
        return compression

    valid = ["infer", None] + sorted(_compression_to_extension)
    raise ValueError(
        f"Unrecognized compression type: {compression}"
        f"\nValid compression types are {valid}"
    )

312 

313 

def get_handle(
    path_or_buf,
    mode: str,
    encoding=None,
    compression: Optional[Union[str, Mapping[str, Any]]] = None,
    memory_map: bool = False,
    is_text: bool = True,
):
    """
    Get file handle for given path/buffer and mode.

    Parameters
    ----------
    path_or_buf : str or file handle
        File path or object.
    mode : str
        Mode to open path_or_buf with.
    encoding : str or None
        Encoding to use.
    compression : str or dict, default None
        If string, specifies compression mode. If dict, value at key 'method'
        specifies compression mode. Compression mode must be one of {'infer',
        'gzip', 'bz2', 'zip', 'xz', None}. If compression mode is 'infer'
        and `filepath_or_buffer` is path-like, then detect compression from
        the following extensions: '.gz', '.bz2', '.zip', or '.xz' (otherwise
        no compression). If dict and compression mode is 'zip' or inferred as
        'zip', other entries passed as additional compression options.

        .. versionchanged:: 1.0.0

           May now be a dict with key 'method' as compression mode
           and other keys as compression options if compression
           mode is 'zip'.

    memory_map : boolean, default False
        See parsers._parser_params for more information.
    is_text : boolean, default True
        whether file/buffer is in text format (csv, json, etc.), or in binary
        mode (pickle, etc.).

    Returns
    -------
    f : file-like
        A file-like object.
    handles : list of file-like objects
        A list of file-like object that were opened in this function.
    """
    # s3fs is optional; when present, its file objects also need text
    # wrapping just like raw/buffered binary streams.
    try:
        from s3fs import S3File

        need_text_wrapping = (BufferedIOBase, RawIOBase, S3File)
    except ImportError:
        need_text_wrapping = (BufferedIOBase, RawIOBase)  # type: ignore

    # Every file object opened here is recorded so the caller can close
    # them all (inner handle plus any containers/wrappers).
    handles: List[IO] = list()
    f = path_or_buf

    # Convert pathlib.Path/py.path.local or string
    path_or_buf = stringify_path(path_or_buf)
    is_path = isinstance(path_or_buf, str)

    # Split a possible dict compression argument into (method, extra args);
    # only with a real path can the method be inferred from the extension.
    compression, compression_args = get_compression_method(compression)
    if is_path:
        compression = infer_compression(path_or_buf, compression)

    if compression:

        # GZ Compression
        if compression == "gzip":
            if is_path:
                f = gzip.open(path_or_buf, mode)
            else:
                # NOTE(review): the buffer branch does not forward `mode`;
                # GzipFile then uses the fileobj's mode -- confirm intended.
                f = gzip.GzipFile(fileobj=path_or_buf)

        # BZ Compression
        elif compression == "bz2":
            if is_path:
                f = bz2.BZ2File(path_or_buf, mode)
            else:
                # NOTE(review): `mode` is likewise dropped for buffers here.
                f = bz2.BZ2File(path_or_buf)

        # ZIP Compression
        elif compression == "zip":
            zf = _BytesZipFile(path_or_buf, mode, **compression_args)
            # Ensure the container is closed as well.
            handles.append(zf)
            if zf.mode == "w":
                f = zf
            elif zf.mode == "r":
                # Reading: the archive must contain exactly one member,
                # which becomes the handle returned to the caller.
                zip_names = zf.namelist()
                if len(zip_names) == 1:
                    f = zf.open(zip_names.pop())
                elif len(zip_names) == 0:
                    raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in ZIP file."
                        f" Only one file per ZIP: {zip_names}"
                    )

        # XZ Compression
        elif compression == "xz":
            # lzma is resolved through pandas.compat (may be missing from
            # some Python builds); _get_lzma_file raises if unavailable.
            f = _get_lzma_file(lzma)(path_or_buf, mode)

        # Unrecognized Compression
        else:
            msg = f"Unrecognized compression type: {compression}"
            raise ValueError(msg)

        handles.append(f)

    elif is_path:
        # Plain (uncompressed) path: choose the open() flavour based on
        # whether an encoding was given and whether text output is wanted.
        if encoding:
            # Encoding
            f = open(path_or_buf, mode, encoding=encoding, newline="")
        elif is_text:
            # No explicit encoding
            f = open(path_or_buf, mode, errors="replace", newline="")
        else:
            # Binary mode
            f = open(path_or_buf, mode)
        handles.append(f)

    # Convert BytesIO or file objects passed with an encoding
    if is_text and (compression or isinstance(f, need_text_wrapping)):
        from io import TextIOWrapper

        # newline="" preserves the file's own line endings (csv-friendly).
        g = TextIOWrapper(f, encoding=encoding, newline="")
        if not isinstance(f, (BufferedIOBase, RawIOBase)):
            handles.append(g)
        f = g

    if memory_map and hasattr(f, "fileno"):
        # Best-effort memory mapping; the original handle is closed once
        # the mmap wrapper owns the underlying data.
        try:
            wrapped = _MMapWrapper(f)
            f.close()
            f = wrapped
        except Exception:
            # we catch any errors that may have occurred
            # because that is consistent with the lower-level
            # functionality of the C engine (pd.read_csv), so
            # leave the file handler as is then
            pass

    return f, handles

459 

460 

class _BytesZipFile(zipfile.ZipFile, BytesIO):  # type: ignore
    """
    Wrapper for standard library class ZipFile and allow the returned file-like
    handle to accept byte strings via `write` method.

    BytesIO provides attributes of file-like object and ZipFile.writestr writes
    bytes strings into a member of the archive.
    """

    # GH 17778
    def __init__(
        self,
        file: FilePathOrBuffer,
        mode: str,
        archive_name: Optional[str] = None,
        **kwargs,
    ):
        # ZipFile only understands text modes ("r"/"w"), so strip the
        # binary marker that callers like get_handle pass through.
        if mode in ["wb", "rb"]:
            mode = mode.replace("b", "")
        # Member name used by `write`; when None, `write` falls back to the
        # zip file's own filename.
        self.archive_name = archive_name
        super().__init__(file, mode, zipfile.ZIP_DEFLATED, **kwargs)

    def write(self, data):
        # Each call stores `data` as one archive member named archive_name
        # (or the zip's own filename when archive_name is unset).
        # NOTE(review): self.filename is None when writing to an in-memory
        # buffer and no archive_name was given -- confirm callers supply one.
        archive_name = self.filename
        if self.archive_name is not None:
            archive_name = self.archive_name
        super().writestr(archive_name, data)

    @property
    def closed(self):
        # ZipFile sets self.fp to None on close(); expose that through the
        # standard file-object `closed` attribute.
        return self.fp is None

492 

493 

494class _MMapWrapper(abc.Iterator): 

495 """ 

496 Wrapper for the Python's mmap class so that it can be properly read in 

497 by Python's csv.reader class. 

498 

499 Parameters 

500 ---------- 

501 f : file object 

502 File object to be mapped onto memory. Must support the 'fileno' 

503 method or have an equivalent attribute 

504 

505 """ 

506 

507 def __init__(self, f: IO): 

508 self.mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) 

509 

510 def __getattr__(self, name: str): 

511 return getattr(self.mmap, name) 

512 

513 def __iter__(self) -> "_MMapWrapper": 

514 return self 

515 

516 def __next__(self) -> str: 

517 newbytes = self.mmap.readline() 

518 

519 # readline returns bytes, not str, but Python's CSV reader 

520 # expects str, so convert the output to str before continuing 

521 newline = newbytes.decode("utf-8") 

522 

523 # mmap doesn't raise if reading past the allocated 

524 # data but instead returns an empty string, so raise 

525 # if that is returned 

526 if newline == "": 

527 raise StopIteration 

528 return newline