Coverage for src / autoencodix / utils / _bulkreader.py: 12%
98 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 10:09 +0200
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-21 10:09 +0200
1import os
2import warnings
3from typing import Dict, Set, Tuple, Optional, Union
5import pandas as pd
7from autoencodix.configs.default_config import DefaultConfig
class BulkDataReader:
    """Reads bulk (non-image, non-single-cell) tabular data based on configuration.

    Supports both paired and unpaired data reading strategies.

    Attributes:
        config: Configuration object. The reader accesses
            ``config.requires_paired`` and ``config.data_config.data_info``
            (a mapping of modality name to an entry exposing ``data_type``,
            ``file_path``, ``sep``, ``is_single_cell`` and, optionally,
            ``extra_anno_file``).
    """

    def __init__(self, config: "DefaultConfig"):
        """Initialize the BulkDataReader with a configuration.

        Args:
            config: Configuration object containing data paths and specifications.
        """
        self.config = config

    def read_data(self) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
        """Read all data according to the configuration.

        Returns:
            A tuple containing (bulk_dataframes, annotation_dataframes).
        """
        requires_paired = self.config.requires_paired
        # ``None`` (unspecified) is treated like ``True``: paired reading is
        # the default strategy.
        if requires_paired or requires_paired is None:
            return self.read_paired_data()
        return self.read_unpaired_data()

    def read_paired_data(
        self,
    ) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
        """Read numeric data and align every modality to the shared samples.

        Entries with ``data_type == "IMG"`` are skipped. NUMERIC entries that
        are not single-cell become bulk tables; an ANNOTATION entry becomes the
        common annotation (if several are configured, the last one wins).
        Files are read with ``info.sep``, falling back to a tab separator.

        The shared samples keep the row order of the first NUMERIC table, so
        the result is reproducible across runs.

        Returns:
            Tuple containing two Dicts:
                1. with name of the data as key and pandas DataFrame as value
                2. with str 'paired' as key and a common annotation/metadata
                   DataFrame as value

        Warns:
            UserWarning: If the modalities share no sample (or no NUMERIC bulk
                table was read). In that case the tables are returned
                unfiltered.
        """
        common_samples: Optional[Set[str]] = None
        sample_order: Optional[pd.Index] = None  # index of the first NUMERIC table
        bulk_dfs: Dict[str, pd.DataFrame] = {}
        annotation_df = pd.DataFrame()
        has_annotation = False

        # First pass: read all data files and track common samples
        for key, info in self.config.data_config.data_info.items():
            if info.data_type == "IMG":
                continue

            file_path = os.fspath(info.file_path)
            df = self._read_tabular_data(file_path, info.sep or "\t")

            # Defensive: the reader below raises instead of returning None,
            # but an overriding implementation might not.
            if df is None:
                continue

            if info.data_type == "NUMERIC" and not info.is_single_cell:
                current_samples = set(df.index)
                if common_samples is None:
                    common_samples = current_samples
                    sample_order = df.index
                else:
                    common_samples &= current_samples
                bulk_dfs[key] = df
            elif info.data_type == "ANNOTATION":
                has_annotation = True
                annotation_df = df

        # Second pass: filter to common samples
        if common_samples:
            # Iterating a set of strings gives a run-dependent order, so walk
            # the first table's index instead; dict.fromkeys drops duplicate
            # labels while keeping first-seen order.
            common_samples_list = [
                sample
                for sample in dict.fromkeys(sample_order)
                if sample in common_samples
            ]

            # Reindex bulk dataframes to common samples
            for key in bulk_dfs:
                bulk_dfs[key] = bulk_dfs[key].reindex(common_samples_list)

            if has_annotation:
                annotation = annotation_df.reindex(common_samples_list)
            else:
                # Create empty annotation with common sample indices
                annotation = pd.DataFrame(index=common_samples_list)
        else:
            warnings.warn("No common samples found across datasets")
            annotation = annotation_df

        return bulk_dfs, {"paired": annotation}

    def read_unpaired_data(
        self,
    ) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
        """Read data without enforcing sample alignment across modalities.

        Image and single-cell entries are skipped. For a NUMERIC entry, an
        optional ``extra_anno_file`` is read as that modality's annotation.
        The collected tables are then passed through
        ``_validate_and_filter_unpaired``.

        Returns:
            A tuple containing (bulk_dataframes, annotation_dataframes).
        """
        bulk_dfs: Dict[str, pd.DataFrame] = {}
        annotations: Dict[str, pd.DataFrame] = {}

        for key, info in self.config.data_config.data_info.items():
            if info.data_type == "IMG" or info.is_single_cell:
                continue  # Skip image and single-cell data

            # NOTE(review): ``info.sep`` is passed through unchanged here,
            # whereas read_paired_data falls back to "\t" when it is unset.
            df = self._read_tabular_data(
                file_path=os.fspath(info.file_path), sep=info.sep
            )
            if df is None:
                continue

            if info.data_type == "NUMERIC":
                bulk_dfs[key] = df

                extra_anno_file = getattr(info, "extra_anno_file", None)
                if extra_anno_file:
                    extra_anno_df = self._read_tabular_data(
                        file_path=os.fspath(extra_anno_file), sep=info.sep
                    )
                    if extra_anno_df is not None:
                        annotations[key] = extra_anno_df
            elif info.data_type == "ANNOTATION":
                annotations[key] = df

        return self._validate_and_filter_unpaired(bulk_dfs, annotations)

    def _validate_and_filter_unpaired(
        self,
        bulk_dfs: Dict[str, pd.DataFrame],
        annotations: Dict[str, pd.DataFrame],
    ) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
        """Validates that all samples in bulk data have a corresponding annotation.

        If a single global annotation file is provided (exactly one annotation
        whose key is not a bulk key), it creates a matched annotation dataframe
        for each bulk dataframe and drops, with a warning, the samples that
        have no annotation. In every other case the inputs are returned as-is.

        Args:
            bulk_dfs: Dictionary of bulk data modalities and their dataframes.
            annotations: Dictionary of annotation dataframes, possibly one global one.

        Returns:
            A tuple of two dictionaries:
                1. The filtered bulk dataframes.
                2. The synchronized annotation dataframes, with keys matching
                   the bulk dataframes (empty if no annotation was provided).
        """
        if not annotations:
            warnings.warn(
                "No annotation files were provided. Cannot validate sample annotations."
            )
            return bulk_dfs, {}

        # Heuristic for "one annotation file is meant for all bulk files":
        # there is exactly one annotation and its key is not a bulk key.
        is_global = len(annotations) == 1 and not (
            set(annotations) & set(bulk_dfs)
        )
        if not is_global:
            # Annotations are assumed to be paired by key already
            # (or it is a more complex case that is not handled yet).
            warnings.warn(
                "Proceeding without global annotation synchronization. Assuming annotations are pre-aligned by key."
            )
            return bulk_dfs, annotations

        global_annotation_df = annotations[next(iter(annotations))]
        annotation_samples = global_annotation_df.index

        filtered_bulk_dfs: Dict[str, pd.DataFrame] = {}
        synchronized_annotations: Dict[str, pd.DataFrame] = {}

        for key, data_df in bulk_dfs.items():
            data_samples = data_df.index

            # Find the intersection of valid sample IDs
            valid_ids = data_samples.intersection(annotation_samples)

            # Warn only about samples that are really missing; comparing
            # lengths alone would also trigger on duplicated sample IDs.
            missing_ids = sorted(set(data_samples) - set(valid_ids))
            if missing_ids:
                warnings.warn(
                    f"For data modality '{key}', {len(missing_ids)} sample(s) "
                    f"were found without a corresponding annotation and will be dropped: {missing_ids}"
                )

            # Filter both the data and the annotation to the valid IDs
            filtered_bulk_dfs[key] = data_df.loc[valid_ids]
            synchronized_annotations[key] = global_annotation_df.loc[valid_ids]

        return filtered_bulk_dfs, synchronized_annotations

    def _read_tabular_data(
        self, file_path: Union[str, os.PathLike], sep: Union[str, None] = None
    ) -> pd.DataFrame:
        """Read tabular data from a file.

        Args:
            file_path: Path to the data file (string or path-like object).
                The format is chosen from the extension, case-insensitively.
            sep: Separator passed to ``pandas.read_csv`` for .csv/.txt/.tsv
                files. Not used for parquet files.

        Returns:
            The loaded DataFrame. For delimited text files the first column is
            used as the index.

        Raises:
            ValueError: If the file extension is not supported.
        """
        file_path = os.fspath(file_path)
        lowered = file_path.lower()

        if lowered.endswith(".parquet"):
            print(f"reading parquet: {file_path}")
            return pd.read_parquet(file_path)
        if lowered.endswith((".csv", ".txt", ".tsv")):
            return pd.read_csv(file_path, sep=sep, index_col=0)
        raise ValueError(
            f"Unsupported file type for {file_path}. Supported formats: .parquet, .csv, .txt, .tsv"
        )