# TODO: Needs a better name; too many modules are already called "concat"
from collections import defaultdict
import copy

import numpy as np

from pandas._libs import internals as libinternals, tslibs
from pandas.util._decorators import cache_readonly

from pandas.core.dtypes.cast import maybe_promote
from pandas.core.dtypes.common import (
    _get_dtype,
    is_categorical_dtype,
    is_datetime64_dtype,
    is_datetime64tz_dtype,
    is_extension_array_dtype,
    is_float_dtype,
    is_numeric_dtype,
    is_sparse,
    is_timedelta64_dtype,
)
from pandas.core.dtypes.concat import concat_compat
from pandas.core.dtypes.missing import isna

import pandas.core.algorithms as algos
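
# These helpers back the block-manager concatenation path used by operations
# such as pd.concat and merge: first plan how Blocks from several
# BlockManagers line up, then fill and combine their values.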


def get_mgr_concatenation_plan(mgr, indexers):
    """
    Construct concatenation plan for given block manager and indexers.

    Parameters
    ----------
    mgr : BlockManager
    indexers : dict of {axis: indexer}

    Returns
    -------
    plan : list of (BlockPlacement, JoinUnit) tuples
    """

    # Calculate the post-reindex shape, save for the item axis, which will be
    # separate for each block anyway.
    mgr_shape = list(mgr.shape)
    for ax, indexer in indexers.items():
        mgr_shape[ax] = len(indexer)
    mgr_shape = tuple(mgr_shape)
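    # mgr._blknos[i] is the number of the block holding item i, and
    # mgr._blklocs[i] is that item's position within its block.  Reindexing
    # axis 0 maps both through the indexer; -1 marks requested items that do
    # not exist in `mgr` and must be materialized as all-NA.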

    if 0 in indexers:
        ax0_indexer = indexers.pop(0)
        blknos = algos.take_1d(mgr._blknos, ax0_indexer, fill_value=-1)
        blklocs = algos.take_1d(mgr._blklocs, ax0_indexer, fill_value=-1)
    else:

        if mgr._is_single_block:
            blk = mgr.blocks[0]
            return [(blk.mgr_locs, JoinUnit(blk, mgr_shape, indexers))]

        ax0_indexer = None
        blknos = mgr._blknos
        blklocs = mgr._blklocs

    plan = []
    for blkno, placements in libinternals.get_blkno_placements(blknos, group=False):

        assert placements.is_slice_like

        join_unit_indexers = indexers.copy()

        shape = list(mgr_shape)
        shape[0] = len(placements)
        shape = tuple(shape)

        if blkno == -1:
            unit = JoinUnit(None, shape)
        else:
            blk = mgr.blocks[blkno]
            ax0_blk_indexer = blklocs[placements.indexer]

            unit_no_ax0_reindexing = (
                len(placements) == len(blk.mgr_locs)
                and
                # Fastpath detection of a join unit not needing to reindex
                # its block: no ax0 reindexing took place and the block
                # placement was sequential before.
                (
                    (
                        ax0_indexer is None
                        and blk.mgr_locs.is_slice_like
                        and blk.mgr_locs.as_slice.step == 1
                    )
                    or
                    # Slow-ish detection: all indexer locs are sequential
                    # (the length match is checked above).
                    (np.diff(ax0_blk_indexer) == 1).all()
                )
            )

            # Omit the indexer if no item reindexing is required.
            if unit_no_ax0_reindexing:
                join_unit_indexers.pop(0, None)
            else:
                join_unit_indexers[0] = ax0_blk_indexer

            unit = JoinUnit(blk, shape, join_unit_indexers)

        plan.append((placements, unit))

    return plan


class JoinUnit:
    def __init__(self, block, shape, indexers=None):
        # Passing shape explicitly is required for cases when block is None.
        if indexers is None:
            indexers = {}
        self.block = block
        self.indexers = indexers
        self.shape = shape
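        # block is the source Block (None for an all-NA placeholder unit),
        # indexers maps {axis: indexer} reindexing still to be applied, and
        # shape is the post-reindex shape this unit contributes.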

    def __repr__(self) -> str:
        return f"{type(self).__name__}({repr(self.block)}, {self.indexers})"

    @cache_readonly
    def needs_filling(self):
        for indexer in self.indexers.values():
            # FIXME: cache results of indexer == -1 checks.
            if (indexer == -1).any():
                return True

        return False

    @cache_readonly
    def dtype(self):
        if self.block is None:
            raise AssertionError("Block is None, no dtype")

        if not self.needs_filling:
            return self.block.dtype
        else:
            return _get_dtype(maybe_promote(self.block.dtype, self.block.fill_value)[0])

    @cache_readonly
    def is_na(self):
        if self.block is None:
            return True

        if not self.block._can_hold_na:
            return False

        # Usually it is enough to check only a small fraction of the values to
        # see that a block is NOT null; scanning in chunks lets us bail out
        # early in such cases.  The chunk size of 1000 was chosen rather
        # arbitrarily.
        values = self.block.values
        if self.block.is_categorical:
            values_flat = values.categories
        elif is_sparse(self.block.values.dtype):
            return False
        elif self.block.is_extension:
            values_flat = values
        else:
            values_flat = values.ravel(order="K")
        total_len = values_flat.shape[0]
        chunk_len = max(total_len // 40, 1000)
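        # e.g. total_len=100_000 gives chunk_len=2_500: at most 40 isna()
        # calls below, each of which can stop the scan at the first non-null.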

        for i in range(0, total_len, chunk_len):
            if not isna(values_flat[i : i + chunk_len]).all():
                return False

        return True

    def get_reindexed_values(self, empty_dtype, upcasted_na):
        if upcasted_na is None:
            # No upcasting is necessary
            fill_value = self.block.fill_value
            values = self.block.get_values()
        else:
            fill_value = upcasted_na
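            # upcasted_na comes from _get_empty_dtype_and_na and is the NA
            # value matching the common result dtype (e.g. np.nan or NaT).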

        if self.is_na:
            if getattr(self.block, "is_object", False):
                # We want to avoid filling with np.nan if we are using None;
                # we already know that we are all nulls.
                values = self.block.values.ravel(order="K")
                if len(values) and values[0] is None:
                    fill_value = None

            if getattr(self.block, "is_datetimetz", False) or is_datetime64tz_dtype(
                empty_dtype
            ):
                if self.block is None:
                    array = empty_dtype.construct_array_type()
                    return array(
                        np.full(self.shape[1], fill_value.value), dtype=empty_dtype
                    )
            elif getattr(self.block, "is_categorical", False):
                pass
            elif getattr(self.block, "is_extension", False):
                pass
            else:
                missing_arr = np.empty(self.shape, dtype=empty_dtype)
                missing_arr.fill(fill_value)
                return missing_arr

        if not self.indexers:
            if not self.block._can_consolidate:
                # preserve these for validation in concat_compat
                return self.block.values

        if self.block.is_bool and not self.block.is_categorical:
            # External code requested filling/upcasting; bool values must be
            # upcast to object to avoid being upcast to numeric.
            values = self.block.astype(np.object_).values
        elif self.block.is_extension:
            values = self.block.values
        else:
            # No dtype upcasting is done here; it will be performed during
            # concatenation itself.
            values = self.block.get_values()

        if not self.indexers:
            # If there's no indexing to be done, we want to signal outside
            # code that this array must be copied explicitly. This is done
            # by returning a view and checking `retval.base`.
            values = values.view()

        else:
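            # Reindex via take_nd, which also writes fill_value at the -1
            # positions; the result is a freshly allocated array.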

            for ax, indexer in self.indexers.items():
                values = algos.take_nd(values, indexer, axis=ax, fill_value=fill_value)

        return values


def concatenate_join_units(join_units, concat_axis, copy):
    """
    Concatenate values from several join units along selected axis.
    """
    if concat_axis == 0 and len(join_units) > 1:
        # Concatenating join units along ax0 is handled in _merge_blocks.
        raise AssertionError("Concatenating join units along axis0")

    empty_dtype, upcasted_na = _get_empty_dtype_and_na(join_units)
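    # A single common dtype and matching NA value are chosen up front so that
    # every unit reindexes to compatible values before the actual concatenate.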

    to_concat = [
        ju.get_reindexed_values(empty_dtype=empty_dtype, upcasted_na=upcasted_na)
        for ju in join_units
    ]

    if len(to_concat) == 1:
        # Only one block, nothing to concatenate.
        concat_values = to_concat[0]
        if copy:
            if isinstance(concat_values, np.ndarray):
                # Non-reindexed (= not yet copied) arrays are made into a view
                # in JoinUnit.get_reindexed_values.
                if concat_values.base is not None:
                    concat_values = concat_values.copy()
            else:
                concat_values = concat_values.copy()
    else:
        concat_values = concat_compat(to_concat, axis=concat_axis)

    return concat_values


def _get_empty_dtype_and_na(join_units):
    """
    Return dtype and N/A value to use when concatenating specified units.

    The returned N/A value may be None, which means there was no casting
    involved.

    Returns
    -------
    dtype
    na
    """
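    # For example, combining an int64 unit with an all-NA unit (block=None)
    # falls through to the common-type branch below and, since
    # has_none_blocks is True there, yields (np.float64, np.nan).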

    if len(join_units) == 1:
        blk = join_units[0].block
        if blk is None:
            return np.float64, np.nan

    if _is_uniform_reindex(join_units):
        # FIXME: integrate property
        empty_dtype = join_units[0].block.dtype
        upcasted_na = join_units[0].block.fill_value
        return empty_dtype, upcasted_na

    has_none_blocks = False
    dtypes = [None] * len(join_units)
    for i, unit in enumerate(join_units):
        if unit.block is None:
            has_none_blocks = True
        else:
            dtypes[i] = unit.dtype

    upcast_classes = defaultdict(list)
    null_upcast_classes = defaultdict(list)
    for dtype, unit in zip(dtypes, join_units):
        if dtype is None:
            continue

        if is_categorical_dtype(dtype):
            upcast_cls = "category"
        elif is_datetime64tz_dtype(dtype):
            upcast_cls = "datetimetz"
        elif issubclass(dtype.type, np.bool_):
            upcast_cls = "bool"
        elif issubclass(dtype.type, np.object_):
            upcast_cls = "object"
        elif is_datetime64_dtype(dtype):
            upcast_cls = "datetime"
        elif is_timedelta64_dtype(dtype):
            upcast_cls = "timedelta"
        elif is_sparse(dtype):
            upcast_cls = dtype.subtype.name
        elif is_extension_array_dtype(dtype):
            upcast_cls = "object"
        elif is_float_dtype(dtype) or is_numeric_dtype(dtype):
            upcast_cls = dtype.name
        else:
            upcast_cls = "float"

        # Null blocks should not influence the upcast class selection, unless
        # there are only null blocks, in which case the same upcasting rules
        # are applied to the null upcast classes.
        if unit.is_na:
            null_upcast_classes[upcast_cls].append(dtype)
        else:
            upcast_classes[upcast_cls].append(dtype)

    if not upcast_classes:
        upcast_classes = null_upcast_classes

    # TODO: de-duplicate with maybe_promote?
    # create the result
    if "object" in upcast_classes:
        return np.dtype(np.object_), np.nan
    elif "bool" in upcast_classes:
        if has_none_blocks:
            return np.dtype(np.object_), np.nan
        else:
            return np.dtype(np.bool_), None
    elif "category" in upcast_classes:
        return np.dtype(np.object_), np.nan
    elif "datetimetz" in upcast_classes:
        # GH-25014. We use NaT instead of iNaT, since this eventually
        # ends up in DatetimeArray.take, which does not allow iNaT.
        dtype = upcast_classes["datetimetz"]
        return dtype[0], tslibs.NaT
    elif "datetime" in upcast_classes:
        return np.dtype("M8[ns]"), np.datetime64("NaT", "ns")
    elif "timedelta" in upcast_classes:
        return np.dtype("m8[ns]"), np.timedelta64("NaT", "ns")
    else:  # pragma
        try:
            g = np.find_common_type(upcast_classes, [])
        except TypeError:
            # At least one is an ExtensionArray
            return np.dtype(np.object_), np.nan
        else:
            if is_float_dtype(g):
                return g, g.type(np.nan)
            elif is_numeric_dtype(g):
                if has_none_blocks:
                    return np.float64, np.nan
                else:
                    return g, None

    msg = "invalid dtype determination in get_concat_dtype"
    raise AssertionError(msg)


def is_uniform_join_units(join_units):
    """
    Check if the join units consist of blocks of uniform type that can
    be concatenated using Block.concat_same_type instead of the generic
    concatenate_join_units (which uses `concat_compat`).
    """
    return (
        # all blocks need to have the same type
        all(type(ju.block) is type(join_units[0].block) for ju in join_units)
        and  # noqa
        # no blocks that would get missing values (can lead to type upcasts),
        # unless we're an extension dtype.
        all(not ju.is_na or ju.block.is_extension for ju in join_units)
        and
        # no blocks with indexers (as then the dimensions do not fit)
        all(not ju.indexers for ju in join_units)
        and
        # only use this path when there is something to concatenate
        len(join_units) > 1
    )


def _is_uniform_reindex(join_units) -> bool:
    return (
        # TODO: should this be ju.block._can_hold_na?
        all(ju.block and ju.block.is_extension for ju in join_units)
        and len({ju.block.dtype.name for ju in join_units}) == 1
    )


def _trim_join_unit(join_unit, length):
    """
    Reduce join_unit's shape along the item axis to length.

    Extra items that didn't fit are returned as a separate JoinUnit.
    """
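    # Illustrative sketch: trimming a unit of shape (5, 4) to length=2 leaves
    # it with shape (2, 4) and returns a new JoinUnit of shape (3, 4) for the
    # remaining items, sliced either from the block itself or from the axis-0
    # indexer, depending on which the unit carries.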

    if 0 not in join_unit.indexers:
        extra_indexers = join_unit.indexers

        if join_unit.block is None:
            extra_block = None
        else:
            extra_block = join_unit.block.getitem_block(slice(length, None))
            join_unit.block = join_unit.block.getitem_block(slice(length))
    else:
        extra_block = join_unit.block

        extra_indexers = copy.copy(join_unit.indexers)
        extra_indexers[0] = extra_indexers[0][length:]
        join_unit.indexers[0] = join_unit.indexers[0][:length]

    extra_shape = (join_unit.shape[0] - length,) + join_unit.shape[1:]
    join_unit.shape = (length,) + join_unit.shape[1:]

    return JoinUnit(block=extra_block, indexers=extra_indexers, shape=extra_shape)


def combine_concat_plans(plans, concat_axis):
    """
    Combine multiple concatenation plans into one.

    The input units may be trimmed in place; results are yielded as
    (placement, [units]) pairs.
    """
    if len(plans) == 1:
        for p in plans[0]:
            yield p[0], [p[1]]

    elif concat_axis == 0:
        offset = 0
        for plan in plans:
            last_plc = None

            for plc, unit in plan:
                yield plc.add(offset), [unit]
                last_plc = plc

            if last_plc is not None:
                offset += last_plc.as_slice.stop

    else:
        num_ended = [0]
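        # A one-element list rather than a plain int so that the nested
        # _next_or_none helper can mutate the shared counter.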

        def _next_or_none(seq):
            retval = next(seq, None)
            if retval is None:
                num_ended[0] += 1
            return retval

        plans = list(map(iter, plans))
        next_items = list(map(_next_or_none, plans))

        while num_ended[0] != len(next_items):
            if num_ended[0] > 0:
                raise ValueError("Plan shapes are not aligned")

            placements, units = zip(*next_items)

            lengths = list(map(len, placements))
            min_len, max_len = min(lengths), max(lengths)
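            # All plans must agree on where each run of output rows ends; the
            # shortest current placement sets the split point, and any longer
            # unit is trimmed below so every plan advances in lockstep.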

            if min_len == max_len:
                yield placements[0], units
                next_items[:] = map(_next_or_none, plans)
            else:
                yielded_placement = None
                yielded_units = [None] * len(next_items)
                for i, (plc, unit) in enumerate(next_items):
                    yielded_units[i] = unit
                    if len(plc) > min_len:
                        # _trim_join_unit updates unit in place, so only
                        # placement needs to be sliced to skip min_len.
                        next_items[i] = (plc[min_len:], _trim_join_unit(unit, min_len))
                    else:
                        yielded_placement = plc
                        next_items[i] = _next_or_none(plans[i])

                yield yielded_placement, yielded_units