
API Reference

This section provides detailed documentation for the main classes and methods of the indexed-parquet-dataset library.

IndexedParquetDataset

The main class for indexing and data access. Inherits from torch.utils.data.Dataset (if PyTorch is installed).

indexed_parquet.dataset.IndexedParquetDataset

Bases: Dataset

High-performance Parquet dataset with O(1) random access and Schema Evolution support.
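
A minimal usage sketch (the data/ directory and the indexing calls below are illustrative; adjust paths and column names to your own files):

from indexed_parquet.dataset import IndexedParquetDataset

# Index every Parquet file found under data/
ds = IndexedParquetDataset.from_folder("data/", pattern="*.parquet", recursive=True)

print(len(ds))            # total number of visible rows
print(ds.schema)          # column names after mapping
row = ds[0]               # O(1) random access; returns a dict {column: value}
batch = ds[[3, 7, 42]]    # fancy indexing returns a list of dicts
subset = ds[:100]         # slicing returns a new IndexedParquetDataset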

Source code in src\indexed_parquet\dataset.py
class IndexedParquetDataset(Dataset):
    """High-performance Parquet dataset with O(1) random access and Schema Evolution support."""

    def __init__(
        self,
        index: BaseIndex,
        indices: Optional[np.ndarray] = None,
        mapper: Optional[SchemaMapper] = None,
        include_source_column: bool = False,
        source_column_name: str = "__source_file__",
        default_fill_value: Any = None,
        fill_values_by_type: Optional[Dict[str, Any]] = None,
        fill_values_by_column: Optional[Dict[str, Any]] = None,
        auto_fill: bool = False,
        max_open_files: int = 128,
        _type_casts: Optional[Dict[str, type]] = None,
        selected_columns: Optional[List[str]] = None,
    ):
        """Initializes the dataset.

        Args:
            index: A BaseIndex object containing dataset metadata.
            indices: Array of global indices to expose (for subsetting/shuffling).
            mapper: SchemaMapper for column renaming.
            include_source_column: If True, adds a virtual column with the source file path.
            source_column_name: Name of the virtual source column.
            default_fill_value: Value to use for missing data if no specific rule matches.
            fill_values_by_type: Dict mapping PyArrow types to default values.
            fill_values_by_column: Dict mapping column names to default values.
            auto_fill: If True, automatically populates fill_values_by_type with defaults.
                Note: auto_fill does NOT overwrite values already present in fill_values_by_type.
            max_open_files: Maximum number of simultaneously open Parquet file handles (LRU cache).
            _type_casts: Internal. Per-column cast functions used by concat upcasting.
            selected_columns: Optional list of column names to expose; restricts the visible schema.
        """
        self.index = index
        self.mapper = mapper or SchemaMapper()
        self.include_source_column = include_source_column
        self.source_column_name = source_column_name

        self.default_fill_value = default_fill_value
        self.fill_values_by_type = fill_values_by_type or {}

        if auto_fill:
            self._apply_auto_fill()

        self.fill_values_by_column = fill_values_by_column or {}
        self._type_casts: Dict[str, type] = _type_casts or {}  # Internal: concat upcasting
        self.selected_columns = selected_columns

        # indices allows for shuffling, filtering, and subsets without modifying the base index
        if indices is not None:
            self.indices = indices
        else:
            self.indices = np.arange(self.index.total_rows)

        # Cumulative row counts for fast lookups
        self.file_offsets = np.cumsum([0] + [f.num_rows for f in self.index.files])

        # LRU cache for open file handles (Lazy Loading)
        self.max_open_files = max_open_files
        self._file_handles: OrderedDict[int, pq.ParquetFile] = OrderedDict()

    def __getstate__(self):
        """Returns the state for pickling, excluding non-picklable file handles."""
        state = self.__dict__.copy()
        state['_file_handles'] = OrderedDict()  # Don't pickle open handles
        return state

    def __setstate__(self, state):
        """Restores the state after unpickling."""
        self.__dict__.update(state)
        self._file_handles = OrderedDict()  # Re-initialize empty LRU cache

    @classmethod
    def from_folder(
        cls, 
        directory: str, 
        pattern: str = "*.parquet", 
        recursive: bool = True, 
        strict_schema: bool = False,
        auto_fill: bool = False,
        **kwargs
    ) -> 'IndexedParquetDataset':
        """Creates an IndexedParquetDataset by scanning a directory."""
        index = scan_directory(directory, pattern, recursive, strict_schema)
        return cls(index, auto_fill=auto_fill, **kwargs)

    @property
    def schema(self) -> List[str]:
        """Returns the list of column names available in the dataset (after mapping)."""
        if self.selected_columns is not None:
            return self.selected_columns

        all_cols = set()

        # 1. Base columns from original files
        for col in self.index.all_columns:
            target = self.mapper.mapping.get(col, col)
            if target != col:
                # Global mapping shadows the original name
                all_cols.add(target)
            else:
                # Local mapping check: is it shadowed in ALL files it appears in?
                is_visible_as_original = False
                for f_info in self.index.files:
                    if col in f_info.columns:
                        f_map = self.mapper.file_mappings.get(f_info.path, {})
                        if col not in f_map:
                            is_visible_as_original = True
                            break
                if is_visible_as_original:
                    all_cols.add(col)

        # 2. Add file-specific mapping targets
        for f_map in self.mapper.file_mappings.values():
            for target_col in f_map.values():
                all_cols.add(target_col)

        # 3. Add computed columns
        for col in self.mapper.transforms.keys():
            all_cols.add(col)

        if self.include_source_column:
            all_cols.add(self.source_column_name)

        return sorted(list(all_cols))

    def __len__(self) -> int:
        return len(self.indices)

    def __repr__(self) -> str:
        return (
            f"IndexedParquetDataset("
            f"rows={len(self):,}, "
            f"files={len(self.index.files)}, "
            f"columns={len(self.schema)}"
            f")"
        )

    def _get_file_and_local_idx(self, global_idx: int) -> tuple[int, int]:
        actual_idx = self.indices[global_idx]
        file_idx = np.searchsorted(self.file_offsets, actual_idx, side='right') - 1
        local_idx = actual_idx - self.file_offsets[file_idx]
        return int(file_idx), int(local_idx)

    def _get_file_handle(self, file_idx: int) -> pq.ParquetFile:
        if file_idx in self._file_handles:
            self._file_handles.move_to_end(file_idx)  # LRU touch
        else:
            self._file_handles[file_idx] = pq.ParquetFile(self.index.files[file_idx].path)
            if len(self._file_handles) > self.max_open_files:
                self._file_handles.popitem(last=False)  # evict least recently used
        return self._file_handles[file_idx]

    def _get_fill_value(self, column_name: str) -> Any:
        """Determines the fill value for a missing column based on hierarchy."""
        if column_name in self.fill_values_by_column:
            return self.fill_values_by_column[column_name]

        # Find original name (best effort)
        orig_name = None
        for k, v in self.mapper.mapping.items():
            if v == column_name:
                orig_name = k
                break
        if orig_name is None: orig_name = column_name

        col_type = self.index.column_types.get(orig_name)
        if col_type in self.fill_values_by_type:
            return self.fill_values_by_type[col_type]

        return self.default_fill_value

    def _deep_fill_nones(self, value: Any, fill: Any) -> Any:
        """Recursively replaces None values inside nested dicts and lists.

        PyArrow struct columns with null-typed fields (e.g. ``seed: null``)
        yield Python dicts containing ``None`` values at arbitrary depth.
        ``default_collate`` cannot handle ``NoneType`` anywhere in a batch,
        so we must sanitize the entire nested structure.

        Args:
            value: The value returned by PyArrow (may be dict, list, scalar or None).
            fill:  The replacement value to use wherever None is found.

        Returns:
            A sanitized copy of *value* with all Nones replaced by *fill*.
        """
        if value is None:
            return fill
        if isinstance(value, dict):
            return {k: self._deep_fill_nones(v, fill) for k, v in value.items()}
        if isinstance(value, list):
            return [self._deep_fill_nones(v, fill) for v in value]
        return value

    def _apply_auto_fill(self):
        """Populates fill_values_by_type with default values for common types."""
        defaults = {
            # Integers
            'int8': 0, 'int16': 0, 'int32': 0, 'int64': 0,
            'uint8': 0, 'uint16': 0, 'uint32': 0, 'uint64': 0,
            # Floats
            'float16': 0.0, 'float32': 0.0, 'float64': 0.0, 'double': 0.0, 'halffloat': 0.0,
            # Strings
            'string': "", 'large_string': "", 'utf8': "", 'large_utf8': "",
            # Bool
            'bool': False,
            # Dictionary/Categorical (best effort)
            'dictionary': ""
        }
        for t in set(self.index.column_types.values()):
            clean_t = t.lower().split('[')[0] # Handle complex types like list[int64]
            if clean_t in defaults and t not in self.fill_values_by_type:
                self.fill_values_by_type[t] = defaults[clean_t]

    def _read_rows_from_file(self, file_idx: int, local_indices: List[int]) -> List[Dict[str, Any]]:
        """Reads multiple rows from a single file efficiently."""
        pf = self._get_file_handle(file_idx)
        file_info = self.index.files[file_idx]
        file_path = file_info.path

        rg_to_indices = {}
        cumulative_rg_rows = 0
        for i, rg_rows in enumerate(file_info.row_groups):
            mask = (np.array(local_indices) >= cumulative_rg_rows) & (np.array(local_indices) < cumulative_rg_rows + rg_rows)
            rg_indices = np.array(local_indices)[mask]
            if len(rg_indices) > 0:
                rg_to_indices[i] = rg_indices - cumulative_rg_rows
            cumulative_rg_rows += rg_rows

        local_idx_to_result = {}
        target_schema = self.schema

        # Performance optimization: determine which source columns to read from disk.
        # The read can only be restricted when an explicit column selection is active
        # and no computed transforms exist (transforms may depend on any column), in
        # which case we map the target schema back to the original source columns.
        requested_source_cols = None
        if self.selected_columns is not None and not self.mapper.transforms:
            # We can optimize only if there are no arbitrary row transforms
            requested_source_cols = []
            for col in target_schema:
                if col == self.source_column_name:
                    continue
                src_col = self.mapper.get_source_column(col, file_path)
                if src_col in file_info.columns:
                    requested_source_cols.append(src_col)

            # If the mapping produced no readable columns (e.g. only virtual columns
            # are requested), fall back to reading all columns so the row count and
            # fill logic stay correct.
            if not requested_source_cols and target_schema:
                requested_source_cols = None

        for rg_idx, rg_local_indices in rg_to_indices.items():
            table = pf.read_row_group(rg_idx, columns=requested_source_cols)
            rg_start_offset = sum(file_info.row_groups[:rg_idx])

            for l_idx_in_rg in rg_local_indices:
                row_dict = table.slice(l_idx_in_rg, 1).to_pydict()
                item = {k: v[0] for k, v in row_dict.items()}

                # 0. Ensure all columns from index are present (as None/Fill)
                for col in self.index.all_columns:
                    if col not in item:
                        item[col] = self._get_fill_value(self.mapper.mapping.get(col, col))

                # 1. Inject virtual source column if needed
                if self.include_source_column:
                    item[self.source_column_name] = file_path

                # 2. Apply global then file-specific mapping logic & Computed Columns (via Mapper)
                item = self.mapper.map_columns(item, file_path)

                # Ensure all columns in final schema are present
                mapped_item = {}
                for col in target_schema:
                    if col not in item:
                        val = self._get_fill_value(col)
                    else:
                        val = item[col]

                    # Handle None if still None and default is set
                    if val is None:
                        val = self._get_fill_value(col)

                    if col in self._type_casts and val is not None:
                        try:
                            val = self._type_casts[col](val)
                        except (ValueError, TypeError):
                            pass

                    # Recursively sanitize nested dicts/lists (e.g. struct columns
                    # with null-typed fields like `seed: null` in PyArrow schemas).
                    # default_collate cannot handle NoneType anywhere in the tree.
                    if isinstance(val, (dict, list)):
                        fill = self._get_fill_value(col)
                        val = self._deep_fill_nones(val, fill)

                    mapped_item[col] = val

                # 3. Apply row-level transformations (new)
                # These are applied AFTER schema mapping to allow adding new columns
                for row_fn in self.mapper.row_transforms:
                    mapped_item = row_fn(mapped_item)

                local_idx_to_result[l_idx_in_rg + rg_start_offset] = mapped_item

        return [local_idx_to_result[l_idx] for l_idx in local_indices]

    def __getitem__(self, idx: Union[int, List[int], slice, np.ndarray]) -> Any:
        if isinstance(idx, (int, np.integer)):
            if idx < 0: idx += len(self)
            if idx < 0 or idx >= len(self): raise IndexError("Index out of range")
            return self.__getitems__([int(idx)])[0]
        elif isinstance(idx, (list, np.ndarray)):
            return self.__getitems__(list(idx))
        elif isinstance(idx, slice):
            return self.select(idx)
        else:
            raise TypeError(f"Invalid index type: {type(idx)}")

    def __getitems__(self, indices: List[int]) -> List[Dict[str, Any]]:
        file_to_local_indices = {}
        for i, global_idx in enumerate(indices):
            f_idx, l_idx = self._get_file_and_local_idx(global_idx)
            if f_idx not in file_to_local_indices: file_to_local_indices[f_idx] = []
            file_to_local_indices[f_idx].append((i, l_idx))

        results = [None] * len(indices)
        for f_idx, indexed_l_indices in file_to_local_indices.items():
            original_positions = [x[0] for x in indexed_l_indices]
            l_indices = [x[1] for x in indexed_l_indices]
            file_results = self._read_rows_from_file(f_idx, l_indices)
            for pos, res in zip(original_positions, file_results):
                results[pos] = res
        return results # type: ignore

    def shuffle(self, seed: Optional[int] = None) -> 'IndexedParquetDataset':
        rng = np.random.default_rng(seed)
        new_indices = self.indices.copy()
        rng.shuffle(new_indices)
        return self._clone_with_indices(new_indices)

    def select(self, range_or_indices: Union[slice, List[int], np.ndarray]) -> 'IndexedParquetDataset':
        new_indices = self.indices[range_or_indices]
        return self._clone_with_indices(new_indices)

    def limit(self, n: int) -> 'IndexedParquetDataset':
        return self.select(slice(0, n))

    def _clone_with_indices(self, new_indices: np.ndarray) -> 'IndexedParquetDataset':
        return IndexedParquetDataset(
            self.index, new_indices, self.mapper,
            self.include_source_column, self.source_column_name,
            self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column,
            max_open_files=self.max_open_files,
            _type_casts=self._type_casts.copy(),
            selected_columns=self.selected_columns,
        )

    def _clone_with_mapper(self, new_mapper: SchemaMapper) -> 'IndexedParquetDataset':
        return IndexedParquetDataset(
            self.index, self.indices.copy(), new_mapper,
            self.include_source_column, self.source_column_name,
            self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column,
            max_open_files=self.max_open_files,
            _type_casts=self._type_casts.copy(),
            selected_columns=self.selected_columns,
        )

    def map(
        self, 
        fn: Callable[[dict], dict], 
        *, 
        remove_columns: Optional[List[str]] = None,
        output_schema: Optional[List[str]] = None
    ) -> 'IndexedParquetDataset':
        """Applies a row-level transformation to the dataset.

        Args:
            fn: A function that takes a row (dict) and returns a transformed row (dict).
            remove_columns: Optional list of columns to remove from the result.
            output_schema: Optional explicit list of columns for the new schema.

        Returns:
            A new IndexedParquetDataset instance.
        """
        new_mapper = SchemaMapper(
            mapping=self.mapper.mapping.copy(),
            file_mappings=self.mapper.file_mappings.copy(),
            transforms=self.mapper.transforms.copy(),
            row_transforms=self.mapper.row_transforms.copy()
        )
        effective_fn = fn
        if remove_columns:
            def _wrapped_fn(row, _fn=fn, _cols=remove_columns):
                result = _fn(row)
                for c in _cols:
                    result.pop(c, None)
                return result
            effective_fn = _wrapped_fn

        new_mapper.row_transforms.append(effective_fn)
        ds = self._clone_with_mapper(new_mapper)
        if output_schema:
            ds.selected_columns = output_schema
        return ds


    def train_test_split(
        self, 
        test_size: Union[float, int], 
        shuffle: bool = True, 
        seed: Optional[int] = None, 
        stratify_by: Optional[str] = None
    ) -> tuple['IndexedParquetDataset', 'IndexedParquetDataset']:
        """Splits the dataset into train and test sets."""
        n = len(self)
        if isinstance(test_size, float):
            n_test = int(n * test_size)
        else:
            n_test = test_size
        n_train = n - n_test

        if stratify_by:
            # Read labels for all indices (required for stratification)
            labels = []
            for i in range(len(self)):
                labels.append(self[i][stratify_by])
            labels = np.array(labels)

            unique_labels, inverse = np.unique(labels, return_inverse=True)
            train_indices_list = []
            test_indices_list = []

            rng = np.random.default_rng(seed)

            for i in range(len(unique_labels)):
                idx_in_group = np.where(inverse == i)[0]
                if shuffle:
                    rng.shuffle(idx_in_group)

                group_n_test = int(len(idx_in_group) * (n_test / n))
                test_indices_list.extend(idx_in_group[:group_n_test])
                train_indices_list.extend(idx_in_group[group_n_test:])

            train_indices = self.indices[np.array(train_indices_list)]
            test_indices = self.indices[np.array(test_indices_list)]
        else:
            indices = self.indices.copy()
            if shuffle:
                rng = np.random.default_rng(seed)
                rng.shuffle(indices)

            train_indices = indices[:n_train]
            test_indices = indices[n_train:]

        return self._clone_with_indices(train_indices), self._clone_with_indices(test_indices)

    def copy(self) -> 'IndexedParquetDataset':
        """Returns a copy of the dataset."""
        return self._clone_with_indices(self.indices.copy())

    def select_columns(self, columns: List[str]) -> 'IndexedParquetDataset':
        """Selects a subset of columns to be returned.

        Args:
            columns: List of column names to keep.

        Returns:
            A new IndexedParquetDataset instance with updated schema.
        """
        # Validate columns exist in current schema
        current_schema = self.schema
        for col in columns:
            if col not in current_schema:
                warnings.warn(f"Column '{col}' not found in current schema.")

        ds = self.copy()
        ds.selected_columns = columns
        return ds

    def sample(self, n: int, seed: Optional[int] = None) -> 'IndexedParquetDataset':
        """Returns a random sample of n rows from the dataset.

        Args:
            n: Number of rows to sample.
            seed: Random seed for reproducibility.

        Returns:
            A new IndexedParquetDataset instance.
        """
        if n > len(self):
            n = len(self)

        rng = np.random.default_rng(seed)
        indices = rng.choice(len(self), size=n, replace=False)
        return self.select(indices)

    def iter_batches(self, batch_size: int, shuffle: bool = False, seed: Optional[int] = None):
        """Yields batches of data from the dataset.

        Args:
            batch_size: Number of rows per batch.
            shuffle: Whether to shuffle before iterating.
            seed: Random seed for shuffling.
        """
        ds = self.shuffle(seed) if shuffle else self
        n = len(ds)
        for i in range(0, n, batch_size):
            end = min(i + batch_size, n)
            yield ds[i:end]

    def alias(self, name: str, source: Union[str, Callable]) -> 'IndexedParquetDataset':
        """Creates a new alias for a column or a new computed column.

        Args:
            name: The target name of the column.
            source: Either an original column name (string) or a callable of the form fn(row) -> value.

        Returns:
            A new IndexedParquetDataset instance.
        """
        new_mapper = SchemaMapper(
            mapping=self.mapper.mapping.copy(),
            file_mappings=self.mapper.file_mappings.copy(),
            transforms=self.mapper.transforms.copy()
        )
        if isinstance(source, str):
            new_mapper.mapping[source] = name
            # Remove transform if we are re-aliasing to a source column
            if name in new_mapper.transforms:
                del new_mapper.transforms[name]
        elif callable(source):
            new_mapper.transforms[name] = source
        else:
            raise TypeError("Alias source must be a string or a callable.")

        return self._clone_with_mapper(new_mapper)

    def set_file_mapping(self, file_path: str, mapping: Dict[str, str]) -> 'IndexedParquetDataset':
        """Sets a file-specific column mapping.

        Args:
            file_path: Absolute path to the file.
            mapping: Dict mapping source column names to target names.
        """
        new_mapper = SchemaMapper(
            mapping=self.mapper.mapping.copy(),
            file_mappings=self.mapper.file_mappings.copy(),
            transforms=self.mapper.transforms.copy()
        )
        # Ensure absolute path
        abs_path = os.path.abspath(file_path)
        new_mapper.file_mappings[abs_path] = mapping
        return self._clone_with_mapper(new_mapper)

    def rename(self, old_name: str, new_name: str) -> 'IndexedParquetDataset':
        """Renames a column."""
        return self.alias(new_name, old_name)


    def cast(self, column: str, target_type: Union[type, str, Callable]) -> 'IndexedParquetDataset':
        """Changes the type of a column using an alias transformation.

        Args:
            column: The name of the column to cast.
            target_type: The target type (int, float, str, etc.) or a callable.
        """
        if isinstance(target_type, str):
            if target_type == 'int': cast_fn = int
            elif target_type in ('float', 'double'): cast_fn = float
            elif target_type in ('str', 'string'): cast_fn = str
            else: raise ValueError(f"Unsupported type string: {target_type}")
        elif callable(target_type):
            cast_fn = target_type # type: ignore
        else:
            raise TypeError("target_type must be a string (int, float, str) or a callable.")

        def transform(row):
            val = row.get(column)
            if val is None:
                return None
            try:
                return cast_fn(val)
            except (ValueError, TypeError):
                return val

        return self.alias(column, transform)

    def to_parquet(self, path: str, chunk_size: int = 1024, shard_size: Optional[int] = None):
        """Materializes the dataset to one or more Parquet files.

        Args:
            path: Output file path or directory (if sharding).
            chunk_size: Cache size for intermediate batch collection.
            shard_size: If set, splits the dataset into multiple files of this many rows.
        """
        if shard_size:
            os.makedirs(path, exist_ok=True)

        writer = None
        rows_in_current_shard = 0
        shard_idx = 0

        def get_shard_path():
            if shard_size:
                return os.path.join(path, f"part_{shard_idx:04d}.parquet")
            return path

        effective_chunk_size = min(chunk_size, shard_size) if shard_size else chunk_size

        for i in range(0, len(self), effective_chunk_size):
            batch_indices = list(range(i, min(i + effective_chunk_size, len(self))))
            batch_data = self[batch_indices]

            if not batch_data: continue

            table = pa.Table.from_pylist(batch_data)

            # Sharding logic: if we are at the start of a new shard, close old and open new
            if shard_size and rows_in_current_shard >= shard_size:
                if writer:
                    writer.close()
                    writer = None
                shard_idx += 1
                rows_in_current_shard = 0

            if writer is None:
                writer = pq.ParquetWriter(get_shard_path(), table.schema)

            writer.write_table(table)
            rows_in_current_shard += len(batch_data)

        if writer:
            writer.close()

    def clone(self, path: str) -> 'IndexedParquetDataset':
        """Materializes all computations and returns a new dataset instance."""
        self.to_parquet(path)
        return IndexedParquetDataset.from_folder(os.path.dirname(path), pattern=os.path.basename(path))

    def filter(
        self, 
        path_pattern: Optional[Union[str, Callable]] = None,
        path_filter: Optional[Union[str, List[str]]] = None,
        column_conditions: Optional[Dict[str, Any]] = None,
        predicate: Optional[Callable[[Dict[str, Any]], bool]] = None
    ) -> 'IndexedParquetDataset':
        if callable(path_pattern): predicate = path_pattern; path_pattern = None
        current_indices = self.indices.copy()

        if path_pattern or path_filter:
            valid_file_indices = []
            filters = [path_filter] if isinstance(path_filter, str) else (path_filter or [])
            for i, f in enumerate(self.index.files):
                match = (path_pattern and isinstance(path_pattern, str) and path_pattern in f.path)
                if not match:
                    for pattern in filters:
                        if fnmatch.fnmatch(f.path, pattern): match = True; break
                if match: valid_file_indices.append(i)

            mask = np.zeros(len(current_indices), dtype=bool)
            for f_idx in valid_file_indices:
                start, end = self.file_offsets[f_idx], self.file_offsets[f_idx + 1]
                mask |= (current_indices >= start) & (current_indices < end)
            current_indices = current_indices[mask]

        if len(current_indices) > 0 and column_conditions:
            file_to_indices = {}
            for idx in current_indices:
                f_idx = np.searchsorted(self.file_offsets, idx, side='right') - 1
                if f_idx not in file_to_indices: file_to_indices[f_idx] = []
                file_to_indices[f_idx].append(idx)

            new_indices_list = []
            for f_idx, f_global_indices in file_to_indices.items():
                f_info, pf = self.index.files[f_idx], self._get_file_handle(f_idx)
                # Simplified condition check via pyarrow
                table = pf.read(columns=[self.mapper.get_source_column(c) for c in column_conditions.keys() if self.mapper.get_source_column(c) in f_info.columns])
                file_mask = None
                for col, cond in column_conditions.items():
                    src_col = self.mapper.get_source_column(col, f_info.path)
                    if src_col not in table.column_names:
                        c_mask = pa.scalar(None, type=pa.bool_())
                    else:
                        arr = table.column(src_col)
                        if isinstance(cond, tuple):
                            op, val = cond
                            if op == '==': c_mask = pc.equal(arr, val)
                            elif op == '>': c_mask = pc.greater(arr, val)
                            elif op == '>=': c_mask = pc.greater_equal(arr, val)
                            elif op == '<': c_mask = pc.less(arr, val)
                            elif op == '<=': c_mask = pc.less_equal(arr, val)
                            else: c_mask = None
                        else:
                            c_mask = pc.equal(arr, cond)

                    if c_mask is not None:
                        if file_mask is None: file_mask = c_mask
                        else: file_mask = pc.and_(file_mask, c_mask)

                if file_mask is None:
                    file_mask_np = np.ones(len(table), dtype=bool)
                else:
                    file_mask_np = pc.fill_null(file_mask, False).to_numpy().astype(bool)

                f_local_indices = (np.array(f_global_indices) - self.file_offsets[f_idx]).astype(int)
                new_indices_list.append(np.array(f_global_indices)[file_mask_np[f_local_indices]])
            current_indices = np.concatenate(new_indices_list) if new_indices_list else np.array([], dtype=int)

        if len(current_indices) > 0 and predicate:
            temp_ds = self._clone_with_indices(current_indices)
            mask = [predicate(temp_ds[i]) for i in range(len(temp_ds))]
            current_indices = current_indices[np.array(mask)]

        return self._clone_with_indices(current_indices)

    def merge(self, other: 'IndexedParquetDataset') -> 'IndexedParquetDataset':
        """Merges this dataset with another one, deduplicating identical rows.

        A row is considered identical if it comes from the same file and has
        the same local row index.
        """
        # 1. Unified unique files
        self_paths = {f.path: i for i, f in enumerate(self.index.files)}
        other_paths = {f.path: i for i, f in enumerate(other.index.files)}

        all_unique_paths = sorted(list(set(self_paths.keys()) | set(other_paths.keys())))
        path_to_new_idx = {path: i for i, path in enumerate(all_unique_paths)}

        new_files_info = []
        for path in all_unique_paths:
            if path in self_paths:
                new_files_info.append(self.index.files[self_paths[path]])
            else:
                new_files_info.append(other.index.files[other_paths[path]])

        # 2. Map indices to row identity (new_file_idx, local_idx)
        def get_row_identities(ds, path_map):
            ids = []
            for i in range(len(ds)):
                f_idx, l_idx = ds._get_file_and_local_idx(i)
                f_path = ds.index.files[f_idx].path
                new_f_idx = path_map[f_path]
                ids.append((new_f_idx, l_idx))
            return ids

        self_ids = get_row_identities(self, path_to_new_idx)
        other_ids = get_row_identities(other, path_to_new_idx)

        # 3. Deduplicate preserving order (self first, then new from other)
        # We use a dictionary as an ordered set
        seen = {}
        unified_ids = []
        for row_id in (self_ids + other_ids):
            if row_id not in seen:
                seen[row_id] = True
                unified_ids.append(row_id)

        # 4. Create new BaseIndex metadata
        new_total_rows_meta = sum(f.num_rows for f in new_files_info)
        new_all_columns = sorted(list(set(self.index.all_columns) | set(other.index.all_columns)))

        # 5. Type merging logic (upcasting conflicts)
        new_column_types = self.index.column_types.copy()
        type_casts = self._type_casts.copy()
        for col, o_type in other.index.column_types.items():
            if col in new_column_types:
                s_type = new_column_types[col]
                if s_type != o_type:
                    common_type = 'string'
                    s_is_float = 'float' in s_type or 'double' in s_type
                    o_is_float = 'float' in o_type or 'double' in o_type
                    s_is_int = 'int' in s_type
                    o_is_int = 'int' in o_type

                    if (s_is_int and o_is_float) or (s_is_float and o_is_int) or (s_is_float and o_is_float):
                        common_type = 'double'

                    warnings.warn(f"Type mismatch for column '{col}': {s_type} vs {o_type}. Upcasting to {common_type}.")
                    new_column_types[col] = common_type
                    cast_fn = str if common_type == 'string' else (float if common_type == 'double' else None)
                    if cast_fn: type_casts[col] = cast_fn
            else:
                new_column_types[col] = o_type

        new_index = BaseIndex(new_files_info, new_total_rows_meta, new_all_columns, new_column_types)

        # 6. Re-calculate global indices based on new file sequence
        new_file_offsets = np.zeros(len(new_files_info) + 1, dtype=int)
        current_offset = 0
        for i, f in enumerate(new_files_info):
            new_file_offsets[i] = current_offset
            current_offset += f.num_rows
        new_file_offsets[-1] = current_offset

        new_indices = np.array([new_file_offsets[f_idx] + l_idx for f_idx, l_idx in unified_ids], dtype=int)

        # 7. Merge mappers
        new_mapper = self.mapper.merge(
            other.mapper,
            list(self_paths.keys()),
            list(other_paths.keys())
        )

        return IndexedParquetDataset(
            new_index, new_indices, new_mapper,
            self.include_source_column, self.source_column_name,
            self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column,
            max_open_files=self.max_open_files,
            _type_casts=type_casts,
            selected_columns=self.selected_columns
        )

    def get_supported_types(self) -> Dict[str, Any]:
        """Returns types and their current defaults."""
        res = {}
        for col, t in self.index.column_types.items():
            # Resolve defaults without treating falsy fills (0, "", False) as missing
            default = self.fill_values_by_column.get(col, self.fill_values_by_type.get(t, self.default_fill_value))
            res[t] = {"example_column": col, "default": default}
        return res

    def generate_collate_fn(self, on_none: str = 'raise') -> Callable:
        """Returns a DataLoader-compatible collate function with robust None handling.

        Args:
            on_none: Strategy for handling None values.
                'raise' (default): Raises a descriptive TypeError.
                'drop': Drops samples containing None from the batch.
                'fill': Replaces None with 0/"" based on fill_values configuration.

        Raises:
            ImportError: If PyTorch is not installed.
        """
        if not _TORCH_AVAILABLE:
            raise ImportError(
                "PyTorch is required for generate_collate_fn. "
                "Install it with: pip install torch"
            )
        # Pre-compute fill map: {column_name -> fill_value} for all known columns.
        # This avoids storing a reference to the full dataset inside CollateHandler.
        fill_map = {col: self._get_fill_value(col) for col in self.schema}
        return CollateHandler(fill_map, on_none)

    def info(self) -> None:
        """Prints summary statistics and metadata for the dataset."""
        total_indexed_rows = self.index.total_rows
        visible_rows = len(self)
        num_indexed_files = len(self.index.files)

        # Calculate visible count per file
        if visible_rows > 0:
            file_indices = np.searchsorted(self.file_offsets, self.indices, side='right') - 1
            unique_f_idx, counts = np.unique(file_indices, return_counts=True)
            f_idx_to_visible_count = dict(zip(unique_f_idx, counts))
        else:
            f_idx_to_visible_count = {}

        active_files = len(f_idx_to_visible_count)

        # Calculate storage size
        total_bytes = 0
        for f in self.index.files:
            if os.path.exists(f.path):
                total_bytes += os.path.getsize(f.path)

        if total_bytes < 1024**2:
            storage_str = f"{total_bytes / 1024:.2f} KB"
        elif total_bytes < 1024**3:
            storage_str = f"{total_bytes / (1024**2):.2f} MB"
        else:
            storage_str = f"{total_bytes / (1024**3):.2f} GB"

        print(f"\n{'='*95}")
        print(f" IndexedParquetDataset Summary")
        print(f"{'='*95}")
        print(f" Files (Active/Indexed):  {active_files}/{num_indexed_files:<6}  |  Storage Size:  {storage_str}")
        print(f" Rows (Visible/Total):    {visible_rows:,}/{total_indexed_rows:,} ({visible_rows/total_indexed_rows:.1%})")
        print(f"{'-'*95}")

        # Files Table
        print("\nFiles in Index:")
        file_header = f"{'#':<3} | {'Visible':>12} | {'Total':>12} | {'%':>6} | {'Groups':>6} | {'Path':<}"
        print(file_header)
        print("-" * 95)
        for i, f in enumerate(self.index.files):
            vis_count = f_idx_to_visible_count.get(i, 0)
            vis_ratio = (vis_count / f.num_rows) if f.num_rows > 0 else 0
            display_path = (f"...{f.path[-45:]}" if len(f.path) > 45 else f.path)
            print(f"{i:<3} | {vis_count:>12,} | {f.num_rows:>12,} | {vis_ratio:>6.1%} | {len(f.row_groups):>6} | {display_path}")

        # Columns Table
        print("\nColumn Statistics (Active State):")
        col_header = f"{'Column':<25} | {'Type':<12} | {'Files':>6} | {'Presence':>8} | {'Visible Rows':>12} | {'Coverage'}"
        print(col_header)
        print("-" * 95)

        visible_schema = self.schema
        for col in visible_schema:
            files_present = 0
            visible_rows_with_col = 0
            orig_name = next((k for k, v in self.mapper.mapping.items() if v == col), col)
            ctype = self.index.column_types.get(orig_name, "n/a")

            for f_idx, f in enumerate(self.index.files):
                src_col = self.mapper.get_source_column(col, f.path)
                if src_col in f.columns:
                    files_present += 1
                    visible_rows_with_col += f_idx_to_visible_count.get(f_idx, 0)

            presence_pct = files_present / num_indexed_files
            coverage_pct = visible_rows_with_col / visible_rows if visible_rows > 0 else 0

            # Truncate column name if too long
            col_display = (col[:22] + "...") if len(col) > 25 else col
            print(f"{col_display:<25} | {ctype:<12} | {files_present:>6} | {presence_pct:>8.1%} | {visible_rows_with_col:>12,} | {coverage_pct:>8.1%}")

        # Mappings
        if self.mapper.mapping or self.mapper.file_mappings or self.mapper.transforms:
            print("\nActive Mappings & Transforms:")
            if self.mapper.mapping:
                print(f"  Aliases: {self.mapper.mapping}")
            if self.mapper.transforms:
                print(f"  Computed Columns: {list(self.mapper.transforms.keys())}")
            if self.mapper.file_mappings:
                print(f"  File-specific overrides active for {len(self.mapper.file_mappings)} files")

        print(f"{'='*95}\n")

    def save_index(self, path: str):
        import pickle
        with open(path, "wb") as f:
            pickle.dump({
                "index": self.index, 
                "indices": self.indices, 
                "mapper": self.mapper.to_dict(), 
                "source": (self.include_source_column, self.source_column_name),
                "fill": (self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column)
            }, f)

    @classmethod
    def load_index(cls, path: str):
        import pickle
        with open(path, "rb") as f:
            d = pickle.load(f)

        inc_source, source_name = d.get("source", (False, "__source_file__"))

        return cls(
            d['index'], 
            d['indices'], 
            SchemaMapper.from_dict(d['mapper']), 
            include_source_column=inc_source,
            source_column_name=source_name,
            default_fill_value=d['fill'][0], 
            fill_values_by_type=d['fill'][1], 
            fill_values_by_column=d['fill'][2]
        )

Attributes

schema property

Returns the list of column names available in the dataset (after mapping).

Functions

from_folder(directory, pattern='*.parquet', recursive=True, strict_schema=False, auto_fill=False, **kwargs) classmethod

Creates an IndexedParquetDataset by scanning a directory.

Source code in src\indexed_parquet\dataset.py
@classmethod
def from_folder(
    cls, 
    directory: str, 
    pattern: str = "*.parquet", 
    recursive: bool = True, 
    strict_schema: bool = False,
    auto_fill: bool = False,
    **kwargs
) -> 'IndexedParquetDataset':
    """Creates an IndexedParquetDataset by scanning a directory."""
    index = scan_directory(directory, pattern, recursive, strict_schema)
    return cls(index, auto_fill=auto_fill, **kwargs)
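
A short sketch (the directory and keyword arguments are illustrative; extra **kwargs are forwarded to the IndexedParquetDataset constructor):

from indexed_parquet.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder(
    "data/",
    pattern="*.parquet",
    recursive=True,
    auto_fill=True,                # fill missing columns with type-based defaults
    include_source_column=True,    # constructor kwarg forwarded via **kwargs
)
print(ds)   # IndexedParquetDataset(rows=..., files=..., columns=...)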

load_index(path) classmethod

Source code in src\indexed_parquet\dataset.py
@classmethod
def load_index(cls, path: str):
    import pickle
    with open(path, "rb") as f:
        d = pickle.load(f)

    inc_source, source_name = d.get("source", (False, "__source_file__"))

    return cls(
        d['index'], 
        d['indices'], 
        SchemaMapper.from_dict(d['mapper']), 
        include_source_column=inc_source,
        source_column_name=source_name,
        default_fill_value=d['fill'][0], 
        fill_values_by_type=d['fill'][1], 
        fill_values_by_column=d['fill'][2]
    )

save_index(path)

Source code in src\indexed_parquet\dataset.py
def save_index(self, path: str):
    import pickle
    with open(path, "wb") as f:
        pickle.dump({
            "index": self.index, 
            "indices": self.indices, 
            "mapper": self.mapper.to_dict(), 
            "source": (self.include_source_column, self.source_column_name),
            "fill": (self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column)
        }, f)
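
save_index pickles the index metadata, row selection, mapper, and fill configuration so the dataset can be reopened later without rescanning the directory; load_index restores it (the underlying Parquet files must still exist at their original paths). A round-trip sketch:

from indexed_parquet.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")
ds.save_index("dataset.idx")

# Later, or in another process:
ds2 = IndexedParquetDataset.load_index("dataset.idx")
assert len(ds2) == len(ds)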

info()

Prints summary statistics and metadata for the dataset.

Source code in src\indexed_parquet\dataset.py
def info(self) -> None:
    """Prints summary statistics and metadata for the dataset."""
    total_indexed_rows = self.index.total_rows
    visible_rows = len(self)
    num_indexed_files = len(self.index.files)

    # Calculate visible count per file
    if visible_rows > 0:
        file_indices = np.searchsorted(self.file_offsets, self.indices, side='right') - 1
        unique_f_idx, counts = np.unique(file_indices, return_counts=True)
        f_idx_to_visible_count = dict(zip(unique_f_idx, counts))
    else:
        f_idx_to_visible_count = {}

    active_files = len(f_idx_to_visible_count)

    # Calculate storage size
    total_bytes = 0
    for f in self.index.files:
        if os.path.exists(f.path):
            total_bytes += os.path.getsize(f.path)

    if total_bytes < 1024**2:
        storage_str = f"{total_bytes / 1024:.2f} KB"
    elif total_bytes < 1024**3:
        storage_str = f"{total_bytes / (1024**2):.2f} MB"
    else:
        storage_str = f"{total_bytes / (1024**3):.2f} GB"

    print(f"\n{'='*95}")
    print(f" IndexedParquetDataset Summary")
    print(f"{'='*95}")
    print(f" Files (Active/Indexed):  {active_files}/{num_indexed_files:<6}  |  Storage Size:  {storage_str}")
    print(f" Rows (Visible/Total):    {visible_rows:,}/{total_indexed_rows:,} ({visible_rows/total_indexed_rows:.1%})")
    print(f"{'-'*95}")

    # Files Table
    print("\nFiles in Index:")
    file_header = f"{'#':<3} | {'Visible':>12} | {'Total':>12} | {'%':>6} | {'Groups':>6} | {'Path':<}"
    print(file_header)
    print("-" * 95)
    for i, f in enumerate(self.index.files):
        vis_count = f_idx_to_visible_count.get(i, 0)
        vis_ratio = (vis_count / f.num_rows) if f.num_rows > 0 else 0
        display_path = (f"...{f.path[-45:]}" if len(f.path) > 45 else f.path)
        print(f"{i:<3} | {vis_count:>12,} | {f.num_rows:>12,} | {vis_ratio:>6.1%} | {len(f.row_groups):>6} | {display_path}")

    # Columns Table
    print("\nColumn Statistics (Active State):")
    col_header = f"{'Column':<25} | {'Type':<12} | {'Files':>6} | {'Presence':>8} | {'Visible Rows':>12} | {'Coverage'}"
    print(col_header)
    print("-" * 95)

    visible_schema = self.schema
    for col in visible_schema:
        files_present = 0
        visible_rows_with_col = 0
        orig_name = next((k for k, v in self.mapper.mapping.items() if v == col), col)
        ctype = self.index.column_types.get(orig_name, "n/a")

        for f_idx, f in enumerate(self.index.files):
            src_col = self.mapper.get_source_column(col, f.path)
            if src_col in f.columns:
                files_present += 1
                visible_rows_with_col += f_idx_to_visible_count.get(f_idx, 0)

        presence_pct = files_present / num_indexed_files
        coverage_pct = visible_rows_with_col / visible_rows if visible_rows > 0 else 0

        # Truncate column name if too long
        col_display = (col[:22] + "...") if len(col) > 25 else col
        print(f"{col_display:<25} | {ctype:<12} | {files_present:>6} | {presence_pct:>8.1%} | {visible_rows_with_col:>12,} | {coverage_pct:>8.1%}")

    # Mappings
    if self.mapper.mapping or self.mapper.file_mappings or self.mapper.transforms:
        print("\nActive Mappings & Transforms:")
        if self.mapper.mapping:
            print(f"  Aliases: {self.mapper.mapping}")
        if self.mapper.transforms:
            print(f"  Computed Columns: {list(self.mapper.transforms.keys())}")
        if self.mapper.file_mappings:
            print(f"  File-specific overrides active for {len(self.mapper.file_mappings)} files")

    print(f"{'='*95}\n")

shuffle(seed=None)

Source code in src\indexed_parquet\dataset.py
def shuffle(self, seed: Optional[int] = None) -> 'IndexedParquetDataset':
    rng = np.random.default_rng(seed)
    new_indices = self.indices.copy()
    rng.shuffle(new_indices)
    return self._clone_with_indices(new_indices)
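
Returns a new dataset with a permuted row order; the underlying files are untouched. A short sketch:

ds_shuffled = ds.shuffle(seed=42)    # reproducible with a fixed seed
assert len(ds_shuffled) == len(ds)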

train_test_split(test_size, shuffle=True, seed=None, stratify_by=None)

Splits the dataset into train and test sets.

Source code in src\indexed_parquet\dataset.py
def train_test_split(
    self, 
    test_size: Union[float, int], 
    shuffle: bool = True, 
    seed: Optional[int] = None, 
    stratify_by: Optional[str] = None
) -> tuple['IndexedParquetDataset', 'IndexedParquetDataset']:
    """Splits the dataset into train and test sets."""
    n = len(self)
    if isinstance(test_size, float):
        n_test = int(n * test_size)
    else:
        n_test = test_size
    n_train = n - n_test

    if stratify_by:
        # Read labels for all indices (required for stratification)
        labels = []
        for i in range(len(self)):
            labels.append(self[i][stratify_by])
        labels = np.array(labels)

        unique_labels, inverse = np.unique(labels, return_inverse=True)
        train_indices_list = []
        test_indices_list = []

        rng = np.random.default_rng(seed)

        for i in range(len(unique_labels)):
            idx_in_group = np.where(inverse == i)[0]
            if shuffle:
                rng.shuffle(idx_in_group)

            group_n_test = int(len(idx_in_group) * (n_test / n))
            test_indices_list.extend(idx_in_group[:group_n_test])
            train_indices_list.extend(idx_in_group[group_n_test:])

        train_indices = self.indices[np.array(train_indices_list)]
        test_indices = self.indices[np.array(test_indices_list)]
    else:
        indices = self.indices.copy()
        if shuffle:
            rng = np.random.default_rng(seed)
            rng.shuffle(indices)

        train_indices = indices[:n_train]
        test_indices = indices[n_train:]

    return self._clone_with_indices(train_indices), self._clone_with_indices(test_indices)
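
For example (a sketch; stratify_by assumes the dataset has a column named "label"):

train_ds, test_ds = ds.train_test_split(test_size=0.2, shuffle=True, seed=0)

# Stratified split: keeps the class proportions of the "label" column.
# Note that stratification reads the label value of every visible row first.
train_strat, test_strat = ds.train_test_split(test_size=0.2, seed=0, stratify_by="label")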

select(range_or_indices)

Source code in src\indexed_parquet\dataset.py
def select(self, range_or_indices: Union[slice, List[int], np.ndarray]) -> 'IndexedParquetDataset':
    new_indices = self.indices[range_or_indices]
    return self._clone_with_indices(new_indices)

limit(n)

Source code in src\indexed_parquet\dataset.py
def limit(self, n: int) -> 'IndexedParquetDataset':
    return self.select(slice(0, n))
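
select restricts the dataset to a slice or an explicit list of positions, and limit(n) is shorthand for the first n rows. A short sketch:

first_1000 = ds.limit(1000)                             # same as ds.select(slice(0, 1000))
every_other = ds.select(list(range(0, len(ds), 2)))     # explicit index list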

sample(n, seed=None)

Returns a random sample of n rows from the dataset.

Parameters:

Name    Type             Description                        Default
n       int              Number of rows to sample.          required
seed    Optional[int]    Random seed for reproducibility.   None

Returns:

Type                       Description
'IndexedParquetDataset'    A new IndexedParquetDataset instance.

Source code in src\indexed_parquet\dataset.py
def sample(self, n: int, seed: Optional[int] = None) -> 'IndexedParquetDataset':
    """Returns a random sample of n rows from the dataset.

    Args:
        n: Number of rows to sample.
        seed: Random seed for reproducibility.

    Returns:
        A new IndexedParquetDataset instance.
    """
    if n > len(self):
        n = len(self)

    rng = np.random.default_rng(seed)
    indices = rng.choice(len(self), size=n, replace=False)
    return self.select(indices)
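
A short sketch (if n exceeds the dataset size it is capped at len(ds)):

small = ds.sample(n=10_000, seed=7)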

select_columns(columns)

Selects a subset of columns to be returned.

Parameters:

Name       Type         Description                      Default
columns    List[str]    List of column names to keep.    required

Returns:

Type                       Description
'IndexedParquetDataset'    A new IndexedParquetDataset instance with updated schema.

Source code in src\indexed_parquet\dataset.py
def select_columns(self, columns: List[str]) -> 'IndexedParquetDataset':
    """Selects a subset of columns to be returned.

    Args:
        columns: List of column names to keep.

    Returns:
        A new IndexedParquetDataset instance with updated schema.
    """
    # Validate columns exist in current schema
    current_schema = self.schema
    for col in columns:
        if col not in current_schema:
            warnings.warn(f"Column '{col}' not found in current schema.")

    ds = self.copy()
    ds.selected_columns = columns
    return ds
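
A short sketch (the column names are illustrative):

features = ds.select_columns(["text", "label"])
print(features.schema)   # ['text', 'label']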

map(fn, *, remove_columns=None, output_schema=None)

Applies a row-level transformation to the dataset.

Parameters:

Name              Type                      Description                                                                  Default
fn                Callable[[dict], dict]    A function that takes a row (dict) and returns a transformed row (dict).    required
remove_columns    Optional[List[str]]       Optional list of columns to remove from the result.                         None
output_schema     Optional[List[str]]       Optional explicit list of columns for the new schema.                       None

Returns:

Type                       Description
'IndexedParquetDataset'    A new IndexedParquetDataset instance.

Source code in src\indexed_parquet\dataset.py
def map(
    self, 
    fn: Callable[[dict], dict], 
    *, 
    remove_columns: Optional[List[str]] = None,
    output_schema: Optional[List[str]] = None
) -> 'IndexedParquetDataset':
    """Applies a row-level transformation to the dataset.

    Args:
        fn: A function that takes a row (dict) and returns a transformed row (dict).
        remove_columns: Optional list of columns to remove from the result.
        output_schema: Optional explicit list of columns for the new schema.

    Returns:
        A new IndexedParquetDataset instance.
    """
    new_mapper = SchemaMapper(
        mapping=self.mapper.mapping.copy(),
        file_mappings=self.mapper.file_mappings.copy(),
        transforms=self.mapper.transforms.copy(),
        row_transforms=self.mapper.row_transforms.copy()
    )
    effective_fn = fn
    if remove_columns:
        def _wrapped_fn(row, _fn=fn, _cols=remove_columns):
            result = _fn(row)
            for c in _cols:
                result.pop(c, None)
            return result
        effective_fn = _wrapped_fn

    new_mapper.row_transforms.append(effective_fn)
    ds = self._clone_with_mapper(new_mapper)
    if output_schema:
        ds.selected_columns = output_schema
    return ds
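
The transform is stored on the mapper and applied as rows are read, so the call itself does not touch any data. A sketch with invented column names:

```python
# Hypothetical usage; column names are placeholders.
def add_length(row):
    row = dict(row)
    row["text_len"] = len(row["text"])
    return row

ds2 = ds.map(add_length, remove_columns=["raw_html"])
# If the added column should appear in the reported schema, pass it explicitly:
ds3 = ds.map(add_length, output_schema=["text", "text_len", "label"])
```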

alias(name, source)

Creates a new alias for a column or a new computed column.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| name | str | The target name of the column. | required |
| source | Union[str, Callable] | Either an original column name (string) or a callable with signature function(row) -> value. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| 'IndexedParquetDataset' | A new IndexedParquetDataset instance. |

Source code in src\indexed_parquet\dataset.py
def alias(self, name: str, source: Union[str, Callable]) -> 'IndexedParquetDataset':
    """Creates a new alias for a column or a new computed column.

    Args:
        name: The target name of the column.
        source: Either an original column name (string) or a function function(row) -> value.

    Returns:
        A new IndexedParquetDataset instance.
    """
    new_mapper = SchemaMapper(
        mapping=self.mapper.mapping.copy(),
        file_mappings=self.mapper.file_mappings.copy(),
        transforms=self.mapper.transforms.copy()
    )
    if isinstance(source, str):
        new_mapper.mapping[source] = name
        # Remove transform if we are re-aliasing to a source column
        if name in new_mapper.transforms:
            del new_mapper.transforms[name]
    elif callable(source):
        new_mapper.transforms[name] = source
    else:
        raise TypeError("Alias source must be a string or a callable.")

    return self._clone_with_mapper(new_mapper)

rename(old_name, new_name)

Renames a column.

Source code in src\indexed_parquet\dataset.py
def rename(self, old_name: str, new_name: str) -> 'IndexedParquetDataset':
    """Renames a column."""
    return self.alias(new_name, old_name)

cast(column, target_type)

Changes the type of a column using an alias transformation.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| column | str | The name of the column to cast. | required |
| target_type | Union[type, str, Callable] | The target type (int, float, str, etc.) or a callable. | required |

Source code in src\indexed_parquet\dataset.py
def cast(self, column: str, target_type: Union[type, str, Callable]) -> 'IndexedParquetDataset':
    """Changes the type of a column using an alias transformation.

    Args:
        column: The name of the column to cast.
        target_type: The target type (int, float, str, etc.) or a callable.
    """
    if isinstance(target_type, str):
        if target_type == 'int': cast_fn = int
        elif target_type in ('float', 'double'): cast_fn = float
        elif target_type in ('str', 'string'): cast_fn = str
        else: raise ValueError(f"Unsupported type string: {target_type}")
    elif callable(target_type):
        cast_fn = target_type # type: ignore
    else:
        raise TypeError("target_type must be a string (int, float, str) or a callable.")

    def transform(row):
        val = row.get(column)
        if val is None:
            return None
        try:
            return cast_fn(val)
        except (ValueError, TypeError):
            return val

    return self.alias(column, transform)
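
Example (sketch): alias, rename, and cast compose naturally, since rename and cast are both implemented on top of alias. All column names below are placeholders.

```python
# Hypothetical usage; column names are placeholders.
ds2 = ds.rename("lbl", "label")                                    # lbl -> label everywhere
ds3 = ds2.alias("n_tokens", lambda row: len(row["text"].split()))  # computed column
ds4 = ds3.cast("label", "int")                                     # best-effort cast; value kept on failure
```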

filter(path_pattern=None, path_filter=None, column_conditions=None, predicate=None)

Filters rows by file path (substring or glob patterns), by per-column conditions evaluated with PyArrow, and/or by a row-level Python predicate. If the first positional argument is callable, it is treated as the predicate.

Source code in src\indexed_parquet\dataset.py
def filter(
    self, 
    path_pattern: Optional[Union[str, Callable]] = None,
    path_filter: Optional[Union[str, List[str]]] = None,
    column_conditions: Optional[Dict[str, Any]] = None,
    predicate: Optional[Callable[[Dict[str, Any]], bool]] = None
) -> 'IndexedParquetDataset':
    if callable(path_pattern): predicate = path_pattern; path_pattern = None
    current_indices = self.indices.copy()

    if path_pattern or path_filter:
        valid_file_indices = []
        filters = [path_filter] if isinstance(path_filter, str) else (path_filter or [])
        for i, f in enumerate(self.index.files):
            match = (path_pattern and isinstance(path_pattern, str) and path_pattern in f.path)
            if not match:
                for pattern in filters:
                    if fnmatch.fnmatch(f.path, pattern): match = True; break
            if match: valid_file_indices.append(i)

        mask = np.zeros(len(current_indices), dtype=bool)
        for f_idx in valid_file_indices:
            start, end = self.file_offsets[f_idx], self.file_offsets[f_idx + 1]
            mask |= (current_indices >= start) & (current_indices < end)
        current_indices = current_indices[mask]

    if len(current_indices) > 0 and column_conditions:
        file_to_indices = {}
        for idx in current_indices:
            f_idx = np.searchsorted(self.file_offsets, idx, side='right') - 1
            if f_idx not in file_to_indices: file_to_indices[f_idx] = []
            file_to_indices[f_idx].append(idx)

        new_indices_list = []
        for f_idx, f_global_indices in file_to_indices.items():
            f_info, pf = self.index.files[f_idx], self._get_file_handle(f_idx)
            # Simplified condition check via pyarrow
            table = pf.read(columns=[self.mapper.get_source_column(c) for c in column_conditions.keys() if self.mapper.get_source_column(c) in f_info.columns])
            file_mask = None
            for col, cond in column_conditions.items():
                src_col = self.mapper.get_source_column(col, f_info.path)
                if src_col not in table.column_names:
                    c_mask = pa.scalar(None, type=pa.bool_())
                else:
                    arr = table.column(src_col)
                    if isinstance(cond, tuple):
                        op, val = cond
                        if op == '==': c_mask = pc.equal(arr, val)
                        elif op == '>': c_mask = pc.greater(arr, val)
                        elif op == '>=': c_mask = pc.greater_equal(arr, val)
                        elif op == '<': c_mask = pc.less(arr, val)
                        elif op == '<=': c_mask = pc.less_equal(arr, val)
                        else: c_mask = None
                    else:
                        c_mask = pc.equal(arr, cond)

                if c_mask is not None:
                    if file_mask is None: file_mask = c_mask
                    else: file_mask = pc.and_(file_mask, c_mask)

            if file_mask is None:
                file_mask_np = np.ones(len(table), dtype=bool)
            else:
                file_mask_np = pc.fill_null(file_mask, False).to_numpy().astype(bool)

            f_local_indices = (np.array(f_global_indices) - self.file_offsets[f_idx]).astype(int)
            new_indices_list.append(np.array(f_global_indices)[file_mask_np[f_local_indices]])
        current_indices = np.concatenate(new_indices_list) if new_indices_list else np.array([], dtype=int)

    if len(current_indices) > 0 and predicate:
        temp_ds = self._clone_with_indices(current_indices)
        mask = [predicate(temp_ds[i]) for i in range(len(temp_ds))]
        current_indices = current_indices[np.array(mask)]

    return self._clone_with_indices(current_indices)
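
The three filtering modes differ in cost: path filtering only touches the index, column_conditions are evaluated per file with PyArrow compute, and a Python predicate has to read every remaining row. A sketch with invented paths and columns:

```python
# Hypothetical usage; paths and column names are placeholders.
train   = ds.filter(path_filter="*train*.parquet")              # glob match on file paths
recent  = ds.filter(column_conditions={"year": (">=", 2020)})   # PyArrow-evaluated condition
longish = ds.filter(lambda row: len(row["text"]) > 10)          # row-level Python predicate
```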

merge(other)

Merges this dataset with another one, deduplicating identical rows.

A row is considered identical if it comes from the same file and has the same local row index.

Source code in src\indexed_parquet\dataset.py
def merge(self, other: 'IndexedParquetDataset') -> 'IndexedParquetDataset':
    """Merges this dataset with another one, deduplicating identical rows.

    A row is considered identical if it comes from the same file and has
    the same local row index.
    """
    # 1. Unified unique files
    self_paths = {f.path: i for i, f in enumerate(self.index.files)}
    other_paths = {f.path: i for i, f in enumerate(other.index.files)}

    all_unique_paths = sorted(list(set(self_paths.keys()) | set(other_paths.keys())))
    path_to_new_idx = {path: i for i, path in enumerate(all_unique_paths)}

    new_files_info = []
    for path in all_unique_paths:
        if path in self_paths:
            new_files_info.append(self.index.files[self_paths[path]])
        else:
            new_files_info.append(other.index.files[other_paths[path]])

    # 2. Map indices to row identity (new_file_idx, local_idx)
    def get_row_identities(ds, path_map):
        ids = []
        for i in range(len(ds)):
            f_idx, l_idx = ds._get_file_and_local_idx(i)
            f_path = ds.index.files[f_idx].path
            new_f_idx = path_map[f_path]
            ids.append((new_f_idx, l_idx))
        return ids

    self_ids = get_row_identities(self, path_to_new_idx)
    other_ids = get_row_identities(other, path_to_new_idx)

    # 3. Deduplicate preserving order (self first, then new from other)
    # We use a dictionary as an ordered set
    seen = {}
    unified_ids = []
    for row_id in (self_ids + other_ids):
        if row_id not in seen:
            seen[row_id] = True
            unified_ids.append(row_id)

    # 4. Create new BaseIndex metadata
    new_total_rows_meta = sum(f.num_rows for f in new_files_info)
    new_all_columns = sorted(list(set(self.index.all_columns) | set(other.index.all_columns)))

    # 5. Type merging logic (upcasting conflicts)
    new_column_types = self.index.column_types.copy()
    type_casts = self._type_casts.copy()
    for col, o_type in other.index.column_types.items():
        if col in new_column_types:
            s_type = new_column_types[col]
            if s_type != o_type:
                common_type = 'string'
                s_is_float = 'float' in s_type or 'double' in s_type
                o_is_float = 'float' in o_type or 'double' in o_type
                s_is_int = 'int' in s_type
                o_is_int = 'int' in o_type

                if (s_is_int and o_is_float) or (s_is_float and o_is_int) or (s_is_float and o_is_float):
                    common_type = 'double'

                warnings.warn(f"Type mismatch for column '{col}': {s_type} vs {o_type}. Upcasting to {common_type}.")
                new_column_types[col] = common_type
                cast_fn = str if common_type == 'string' else (float if common_type == 'double' else None)
                if cast_fn: type_casts[col] = cast_fn
        else:
            new_column_types[col] = o_type

    new_index = BaseIndex(new_files_info, new_total_rows_meta, new_all_columns, new_column_types)

    # 6. Re-calculate global indices based on new file sequence
    new_file_offsets = np.zeros(len(new_files_info) + 1, dtype=int)
    current_offset = 0
    for i, f in enumerate(new_files_info):
        new_file_offsets[i] = current_offset
        current_offset += f.num_rows
    new_file_offsets[-1] = current_offset

    new_indices = np.array([new_file_offsets[f_idx] + l_idx for f_idx, l_idx in unified_ids], dtype=int)

    # 7. Merge mappers
    new_mapper = self.mapper.merge(
        other.mapper,
        list(self_paths.keys()),
        list(other_paths.keys())
    )

    return IndexedParquetDataset(
        new_index, new_indices, new_mapper,
        self.include_source_column, self.source_column_name,
        self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column,
        max_open_files=self.max_open_files,
        _type_casts=type_casts,
        selected_columns=self.selected_columns
    )
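
Example (sketch): combining two overlapping views built over a shared pool of files; ds_a and ds_b are assumed to exist.

```python
# Hypothetical usage; ds_a and ds_b are assumed to be IndexedParquetDataset instances.
merged = ds_a.merge(ds_b)
# Rows that point at the same (file, local row index) appear only once, in
# ds_a-first order. Conflicting column types are upcast (e.g. int vs. float
# becomes double, otherwise string) and a warning is emitted.
```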

copy()

Returns a copy of the dataset.

Source code in src\indexed_parquet\dataset.py
def copy(self) -> 'IndexedParquetDataset':
    """Returns a copy of the dataset."""
    return self._clone_with_indices(self.indices.copy())

clone(path)

Materializes all computations and returns a new dataset instance.

Source code in src\indexed_parquet\dataset.py
def clone(self, path: str) -> 'IndexedParquetDataset':
    """Materializes all computations and returns a new dataset instance."""
    self.to_parquet(path)
    return IndexedParquetDataset.from_folder(os.path.dirname(path), pattern=os.path.basename(path))

to_parquet(path, chunk_size=1024, shard_size=None)

Materializes the dataset to one or more Parquet files.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| path | str | Output file path or directory (if sharding). | required |
| chunk_size | int | Number of rows collected and written per batch. | 1024 |
| shard_size | Optional[int] | If set, splits the dataset into multiple files of this many rows. | None |

Source code in src\indexed_parquet\dataset.py
def to_parquet(self, path: str, chunk_size: int = 1024, shard_size: Optional[int] = None):
    """Materializes the dataset to one or more Parquet files.

    Args:
        path: Output file path or directory (if sharding).
        chunk_size: Cache size for intermediate batch collection.
        shard_size: If set, splits the dataset into multiple files of this many rows.
    """
    if shard_size:
        os.makedirs(path, exist_ok=True)

    writer = None
    rows_in_current_shard = 0
    shard_idx = 0

    def get_shard_path():
        if shard_size:
            return os.path.join(path, f"part_{shard_idx:04d}.parquet")
        return path

    effective_chunk_size = min(chunk_size, shard_size) if shard_size else chunk_size

    for i in range(0, len(self), effective_chunk_size):
        batch_indices = list(range(i, min(i + effective_chunk_size, len(self))))
        batch_data = self[batch_indices]

        if not batch_data: continue

        table = pa.Table.from_pylist(batch_data)

        # Sharding logic: if we are at the start of a new shard, close old and open new
        if shard_size and rows_in_current_shard >= shard_size:
            if writer:
                writer.close()
                writer = None
            shard_idx += 1
            rows_in_current_shard = 0

        if writer is None:
            writer = pq.ParquetWriter(get_shard_path(), table.schema)

        writer.write_table(table)
        rows_in_current_shard += len(batch_data)

    if writer:
        writer.close()
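
Example (sketch): materializing a derived view back to Parquet. The output paths are placeholders, and ds2 stands for any filtered or transformed dataset.

```python
# Hypothetical usage; output paths are placeholders.
ds2.to_parquet("out/clean.parquet")                # single output file
ds2.to_parquet("out/shards", shard_size=100_000)   # out/shards/part_0000.parquet, part_0001.parquet, ...
fresh = ds2.clone("out/clean.parquet")             # write, then reopen the result as a new dataset
```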

iter_batches(batch_size, shuffle=False, seed=None)

Yields batches of data from the dataset.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| batch_size | int | Number of rows per batch. | required |
| shuffle | bool | Whether to shuffle before iterating. | False |
| seed | Optional[int] | Random seed for shuffling. | None |

Source code in src\indexed_parquet\dataset.py
def iter_batches(self, batch_size: int, shuffle: bool = False, seed: Optional[int] = None):
    """Yields batches of data from the dataset.

    Args:
        batch_size: Number of rows per batch.
        shuffle: Whether to shuffle before iterating.
        seed: Random seed for shuffling.
    """
    ds = self.shuffle(seed) if shuffle else self
    n = len(ds)
    for i in range(0, n, batch_size):
        end = min(i + batch_size, n)
        yield ds[i:end]
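
Example (sketch): streaming fixed-size batches without a DataLoader. Each yielded batch has the same structure that slicing the dataset (ds[i:end]) returns.

```python
# Hypothetical usage; `ds` is assumed to be an existing IndexedParquetDataset.
for batch in ds.iter_batches(batch_size=256, shuffle=True, seed=0):
    ...  # `batch` is whatever ds[i:end] returns for a slice of rows
```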

generate_collate_fn(on_none='raise')

Returns a DataLoader-compatible collate function with robust None handling.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| on_none | str | Strategy for handling None values. 'raise' (default): raises a descriptive TypeError. 'drop': drops samples containing None from the batch. 'fill': replaces None with 0/"" based on the fill_values configuration. | 'raise' |

Raises:

| Type | Description |
| ---- | ----------- |
| ImportError | If PyTorch is not installed. |

Source code in src\indexed_parquet\dataset.py
def generate_collate_fn(self, on_none: str = 'raise') -> Callable:
    """Returns a DataLoader-compatible collate function with robust None handling.

    Args:
        on_none: Strategy for handling None values.
            'raise' (default): Raises a descriptive TypeError.
            'drop': Drops samples containing None from the batch.
            'fill': Replaces None with 0/"" based on fill_values configuration.

    Raises:
        ImportError: If PyTorch is not installed.
    """
    if not _TORCH_AVAILABLE:
        raise ImportError(
            "PyTorch is required for generate_collate_fn. "
            "Install it with: pip install torch"
        )
    # Pre-compute fill map: {column_name -> fill_value} for all known columns.
    # This avoids storing a reference to the full dataset inside CollateHandler.
    fill_map = {col: self._get_fill_value(col) for col in self.schema}
    return CollateHandler(fill_map, on_none)
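
Example (sketch): wiring the collate function into a PyTorch DataLoader. The batch size and worker count are illustrative, and torch must be installed.

```python
# Hypothetical usage; requires PyTorch to be installed.
from torch.utils.data import DataLoader

loader = DataLoader(
    ds,                                                    # the dataset instance
    batch_size=64,
    num_workers=4,
    collate_fn=ds.generate_collate_fn(on_none="fill"),     # or 'raise' / 'drop'
)
```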

CollateHandler

A helper class for correct batch collation in a DataLoader. Use it via the ds.generate_collate_fn() method.

indexed_parquet.dataset.CollateHandler

Picklable helper for batch collation.

Stores only a pre-computed fill_map dict instead of a reference to the full dataset, so DataLoader workers (num_workers > 0) receive a minimal copy rather than the entire dataset index.

Source code in src\indexed_parquet\dataset.py
class CollateHandler:
    """Picklable helper for batch collation.

    Stores only a pre-computed fill_map dict instead of a reference to the
    full dataset, so DataLoader workers (num_workers > 0) receive a minimal
    copy rather than the entire dataset index.
    """

    def __init__(self, fill_map: Dict[str, Any], on_none: str):
        self.fill_map = fill_map  # {column_name -> fill_value}
        self.on_none = on_none

    def __call__(self, batch):
        from torch.utils.data._utils.collate import default_collate

        clean_batch = batch
        if self.on_none in ('drop', 'fill'):
            new_batch = []
            for item in batch:
                has_none = any(v is None for v in item.values())
                if has_none:
                    if self.on_none == 'drop':
                        continue
                    else:  # fill
                        item = item.copy()
                        for k, v in item.items():
                            if v is None:
                                item[k] = self.fill_map.get(k)
                new_batch.append(item)
            clean_batch = new_batch

        if not clean_batch:
            return {}

        try:
            return default_collate(clean_batch)
        except TypeError as e:
            if 'NoneType' in str(e):
                for i, item in enumerate(batch):
                    for k, v in item.items():
                        if v is None:
                            raise TypeError(
                                f"Batch collation failed: column '{k}' contains None at batch index {i}.\n"
                                f"PyTorch DataLoader cannot handle None values.\n"
                                f"Fix: Set 'auto_fill=True' or provide 'fill_values' when initializing IndexedParquetDataset."
                            ) from None
            raise e

SchemaMapper

The internal class responsible for schema transformations and computed columns.

indexed_parquet.schema.SchemaMapper

Handles column mapping and aliasing for the dataset.

Source code in src\indexed_parquet\schema.py
class SchemaMapper:
    """Handles column mapping and aliasing for the dataset."""

    def __init__(
        self, 
        mapping: Optional[Dict[str, str]] = None, 
        file_mappings: Optional[Dict[str, Dict[str, str]]] = None,
        transforms: Optional[Dict[str, Callable]] = None,
        row_transforms: Optional[List[Callable[[dict], dict]]] = None
    ):
        """Initializes the SchemaMapper.

        Args:
            mapping: Global mapping (original name -> target name).
            file_mappings: File-specific mappings (file path -> {original -> target}).
            transforms: Global transformations (target name -> function(row)).
            row_transforms: Row-level transformations (list of functions function(row) -> row).
        """
        self.mapping = mapping if mapping is not None else {}
        self.file_mappings = file_mappings if file_mappings is not None else {}
        self.transforms = transforms if transforms is not None else {}
        self.row_transforms = row_transforms if row_transforms is not None else []
        self._rebuild_reverse_mapping()

    def _rebuild_reverse_mapping(self) -> None:
        """Rebuilds the reverse mapping for fast lookups."""
        self.reverse_mapping = {v: k for k, v in self.mapping.items()}

    def map_columns(self, data: Dict[str, Any], file_path: Optional[str] = None) -> Dict[str, Any]:
        """Renames columns in the input data according to global and file-specific mappings.

        Args:
            data: Raw dictionary of column values.
            file_path: Optional path to the file from which data was read.

        Returns:
            A dictionary with mapped column names.
        """
        # Apply file-specific mapping first if available
        current_data = data
        if file_path and file_path in self.file_mappings:
            f_map = self.file_mappings[file_path]
            current_data = {f_map.get(k, k): v for k, v in current_data.items()}

        if not self.mapping:
            mapped_data = current_data.copy()
        else:
            mapped_data = {}
            for col, val in current_data.items():
                target_name = self.mapping.get(col, col)
                mapped_data[target_name] = val

        # Apply transforms (computed columns)
        if not self.transforms:
            return mapped_data

        for target_name, transform in self.transforms.items():
            try:
                mapped_data[target_name] = transform(mapped_data)
            except Exception:
                pass

        return mapped_data

    def get_source_column(self, target_column: str, file_path: Optional[str] = None) -> str:
        """Returns the original column name for a given target name.

        Args:
            target_column: The mapped name of the column.
            file_path: Optional path to the file.

        Returns:
            The original column name.
        """
        return self.reverse_mapping.get(target_column, target_column)

    def select_source_columns(self, target_columns: List[str]) -> List[str]:
        """Returns a list of original column names required for the requested target columns."""
        return [self.get_source_column(col) for col in target_columns]

    def to_dict(self) -> Dict[str, Any]:
        """Converts the mapper state to a dictionary."""
        return {
            "mapping": self.mapping,
            "file_mappings": self.file_mappings,
            "transforms": self.transforms,
            "row_transforms": self.row_transforms
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SchemaMapper':
        """Creates a SchemaMapper from a dictionary."""
        return cls(
            mapping=data.get("mapping"),
            file_mappings=data.get("file_mappings"),
            transforms=data.get("transforms"),
            row_transforms=data.get("row_transforms")
        )

    def merge(self, other: 'SchemaMapper', self_files: List[str], other_files: List[str]) -> 'SchemaMapper':
        """Merges this mapper with another one, preserving conflicting aliases via file-specific mappings.

        Args:
            other: The other SchemaMapper to merge.
            self_files: List of absolute file paths belonging to the current dataset.
            other_files: List of absolute file paths belonging to the other dataset.

        Returns:
            A new merged SchemaMapper.
        """
        new_file_mappings = self.file_mappings.copy()
        new_file_mappings.update(other.file_mappings)

        new_global_mapping = self.mapping.copy()

        for src_col, target_col in other.mapping.items():
            if src_col in new_global_mapping:
                if new_global_mapping[src_col] != target_col:
                    # Conflict: same source column, different targets.
                    # We preserve the alias for 'other' files by moving it to file_mappings.
                    for f_path in other_files:
                        if f_path not in new_file_mappings:
                            new_file_mappings[f_path] = {}
                        # Only set if not already present in file_mappings
                        if src_col not in new_file_mappings[f_path]:
                            new_file_mappings[f_path][src_col] = target_col
                # If the targets match, no conflict at global level.
            else:
                # No conflict with current global mapping, can add safely.
                new_global_mapping[src_col] = target_col

        new_transforms = self.transforms.copy()
        new_transforms.update(other.transforms)

        new_row_transforms = self.row_transforms + other.row_transforms

        return SchemaMapper(new_global_mapping, new_file_mappings, new_transforms, new_row_transforms)

    def __repr__(self) -> str:
        return f"SchemaMapper(mapping={self.mapping}, file_mappings={self.file_mappings})"
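
Although SchemaMapper is primarily internal, its behavior is easy to see in isolation. A sketch with invented column names:

```python
# Illustrative only; column names are placeholders.
mapper = SchemaMapper(
    mapping={"lbl": "label"},                               # rename lbl -> label
    transforms={"n_chars": lambda row: len(row["text"])},   # computed column
)
mapper.map_columns({"lbl": 1, "text": "hello"})
# -> {"label": 1, "text": "hello", "n_chars": 5}
```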

Functions

__init__(mapping=None, file_mappings=None, transforms=None, row_transforms=None)

Initializes the SchemaMapper.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| mapping | Optional[Dict[str, str]] | Global mapping (original name -> target name). | None |
| file_mappings | Optional[Dict[str, Dict[str, str]]] | File-specific mappings (file path -> {original -> target}). | None |
| transforms | Optional[Dict[str, Callable]] | Global transformations (target name -> function(row)). | None |
| row_transforms | Optional[List[Callable[[dict], dict]]] | Row-level transformations: a list of callables with signature function(row) -> row. | None |

Source code in src\indexed_parquet\schema.py
def __init__(
    self, 
    mapping: Optional[Dict[str, str]] = None, 
    file_mappings: Optional[Dict[str, Dict[str, str]]] = None,
    transforms: Optional[Dict[str, Callable]] = None,
    row_transforms: Optional[List[Callable[[dict], dict]]] = None
):
    """Initializes the SchemaMapper.

    Args:
        mapping: Global mapping (original name -> target name).
        file_mappings: File-specific mappings (file path -> {original -> target}).
        transforms: Global transformations (target name -> function(row)).
        row_transforms: Row-level transformations (list of functions function(row) -> row).
    """
    self.mapping = mapping if mapping is not None else {}
    self.file_mappings = file_mappings if file_mappings is not None else {}
    self.transforms = transforms if transforms is not None else {}
    self.row_transforms = row_transforms if row_transforms is not None else []
    self._rebuild_reverse_mapping()

from_dict(data) classmethod

Creates a SchemaMapper from a dictionary.

Source code in src\indexed_parquet\schema.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'SchemaMapper':
    """Creates a SchemaMapper from a dictionary."""
    return cls(
        mapping=data.get("mapping"),
        file_mappings=data.get("file_mappings"),
        transforms=data.get("transforms"),
        row_transforms=data.get("row_transforms")
    )

get_source_column(target_column, file_path=None)

Returns the original column name for a given target name.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| target_column | str | The mapped name of the column. | required |
| file_path | Optional[str] | Optional path to the file. | None |

Returns:

| Type | Description |
| ---- | ----------- |
| str | The original column name. |

Source code in src\indexed_parquet\schema.py
def get_source_column(self, target_column: str, file_path: Optional[str] = None) -> str:
    """Returns the original column name for a given target name.

    Args:
        target_column: The mapped name of the column.
        file_path: Optional path to the file.

    Returns:
        The original column name.
    """
    return self.reverse_mapping.get(target_column, target_column)

map_columns(data, file_path=None)

Renames columns in the input data according to global and file-specific mappings.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| data | Dict[str, Any] | Raw dictionary of column values. | required |
| file_path | Optional[str] | Optional path to the file from which data was read. | None |

Returns:

| Type | Description |
| ---- | ----------- |
| Dict[str, Any] | A dictionary with mapped column names. |

Source code in src\indexed_parquet\schema.py
def map_columns(self, data: Dict[str, Any], file_path: Optional[str] = None) -> Dict[str, Any]:
    """Renames columns in the input data according to global and file-specific mappings.

    Args:
        data: Raw dictionary of column values.
        file_path: Optional path to the file from which data was read.

    Returns:
        A dictionary with mapped column names.
    """
    # Apply file-specific mapping first if available
    current_data = data
    if file_path and file_path in self.file_mappings:
        f_map = self.file_mappings[file_path]
        current_data = {f_map.get(k, k): v for k, v in current_data.items()}

    if not self.mapping:
        mapped_data = current_data.copy()
    else:
        mapped_data = {}
        for col, val in current_data.items():
            target_name = self.mapping.get(col, col)
            mapped_data[target_name] = val

    # Apply transforms (computed columns)
    if not self.transforms:
        return mapped_data

    for target_name, transform in self.transforms.items():
        try:
            mapped_data[target_name] = transform(mapped_data)
        except Exception:
            pass

    return mapped_data

merge(other, self_files, other_files)

Merges this mapper with another one, preserving conflicting aliases via file-specific mappings.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| other | 'SchemaMapper' | The other SchemaMapper to merge. | required |
| self_files | List[str] | List of absolute file paths belonging to the current dataset. | required |
| other_files | List[str] | List of absolute file paths belonging to the other dataset. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| 'SchemaMapper' | A new merged SchemaMapper. |

Source code in src\indexed_parquet\schema.py
def merge(self, other: 'SchemaMapper', self_files: List[str], other_files: List[str]) -> 'SchemaMapper':
    """Merges this mapper with another one, preserving conflicting aliases via file-specific mappings.

    Args:
        other: The other SchemaMapper to merge.
        self_files: List of absolute file paths belonging to the current dataset.
        other_files: List of absolute file paths belonging to the other dataset.

    Returns:
        A new merged SchemaMapper.
    """
    new_file_mappings = self.file_mappings.copy()
    new_file_mappings.update(other.file_mappings)

    new_global_mapping = self.mapping.copy()

    for src_col, target_col in other.mapping.items():
        if src_col in new_global_mapping:
            if new_global_mapping[src_col] != target_col:
                # Conflict: same source column, different targets.
                # We preserve the alias for 'other' files by moving it to file_mappings.
                for f_path in other_files:
                    if f_path not in new_file_mappings:
                        new_file_mappings[f_path] = {}
                    # Only set if not already present in file_mappings
                    if src_col not in new_file_mappings[f_path]:
                        new_file_mappings[f_path][src_col] = target_col
            # If the targets match, no conflict at global level.
        else:
            # No conflict with current global mapping, can add safely.
            new_global_mapping[src_col] = target_col

    new_transforms = self.transforms.copy()
    new_transforms.update(other.transforms)

    new_row_transforms = self.row_transforms + other.row_transforms

    return SchemaMapper(new_global_mapping, new_file_mappings, new_transforms, new_row_transforms)
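
A sketch of the conflict-resolution behavior described above; the file paths are placeholders:

```python
# Illustrative only; file paths are placeholders.
a = SchemaMapper(mapping={"lbl": "label"})
b = SchemaMapper(mapping={"lbl": "target"})
m = a.merge(b, self_files=["a.parquet"], other_files=["b.parquet"])
# m.mapping       == {"lbl": "label"}
# m.file_mappings == {"b.parquet": {"lbl": "target"}}
```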

select_source_columns(target_columns)

Returns a list of original column names required for the requested target columns.

Source code in src\indexed_parquet\schema.py
def select_source_columns(self, target_columns: List[str]) -> List[str]:
    """Returns a list of original column names required for the requested target columns."""
    return [self.get_source_column(col) for col in target_columns]

to_dict()

Converts the mapper state to a dictionary.

Source code in src\indexed_parquet\schema.py
def to_dict(self) -> Dict[str, Any]:
    """Converts the mapper state to a dictionary."""
    return {
        "mapping": self.mapping,
        "file_mappings": self.file_mappings,
        "transforms": self.transforms,
        "row_transforms": self.row_transforms
    }

BaseIndex

The dataclass that stores dataset-level index metadata: per-file information, total row count, the unified column list, and column types.

indexed_parquet.indexer.BaseIndex dataclass

Metadata about the entire dataset.

Attributes:

| Name | Type | Description |
| ---- | ---- | ----------- |
| files | List[FileInfo] | List of FileInfo objects for all files in the dataset. |
| total_rows | int | Combined number of rows across all files. |
| all_columns | List[str] | Sorted list of all unique columns found across all files. |
| column_types | Dict[str, str] | Dict mapping column names to their PyArrow type as a string. |

Source code in src\indexed_parquet\indexer.py
@dataclass
class BaseIndex:
    """Metadata about the entire dataset.

    Attributes:
        files: List of FileInfo objects for all files in the dataset.
        total_rows: Combined number of rows across all files.
        all_columns: Sorted list of all unique columns found across all files.
        column_types: Dict mapping column names to their PyArrow type as a string.
    """
    files: List[FileInfo]
    total_rows: int
    all_columns: List[str]
    column_types: Dict[str, str]