Coverage for src / autoencodix / utils / _bulkreader.py: 12%

98 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-21 10:09 +0200

1import os 

2import warnings 

3from typing import Dict, Set, Tuple, Optional, Union 

4 

5import pandas as pd 

6 

7from autoencodix.configs.default_config import DefaultConfig 

8 

9 

class BulkDataReader:
    """Reads bulk data from files based on configuration.

    Supports both paired and unpaired data reading strategies.

    Attributes:
        config: Configuration object
    """

    # File suffixes (lower-case) that are parsed as delimited text.
    _TEXT_SUFFIXES = (".csv", ".txt", ".tsv")

    def __init__(self, config: DefaultConfig):
        """Initialize the BulkDataReader with a configuration.

        Args:
            config: Configuration object containing data paths and specifications.
        """
        self.config = config

    def read_data(self) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
        """Read all data according to the configuration.

        The paired strategy is used when ``config.requires_paired`` is truthy
        or ``None``; otherwise the unpaired strategy is used.

        Returns:
            A tuple containing (bulk_dataframes, annotation_dataframes)
        """
        if self.config.requires_paired or self.config.requires_paired is None:
            return self.read_paired_data()
        return self.read_unpaired_data()

    def read_paired_data(
        self,
    ) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
        """Read numeric paired data and align every table to the shared samples.

        Entries with data type "IMG" are skipped. Every non-single-cell
        "NUMERIC" entry contributes to the set of common samples; an
        "ANNOTATION" entry supplies the metadata (the last one read wins).

        The common samples are ordered as they appear in the first numeric
        table, so the row order of the result is reproducible between runs.

        Returns:
            Tuple containing two Dicts:
                1. with name of the data as key and pandas DataFrame as value
                2. with str 'paired' as key and a common annotation/metadata
                   as DataFrame

        Raises:
            ValueError: If a configured file has an unsupported extension.
        """
        common_samples: Optional[Set[str]] = None
        bulk_dfs: Dict[str, pd.DataFrame] = {}
        annotation_df = pd.DataFrame()
        has_annotation = False

        # First pass: read all data files and track common samples
        for key, info in self.config.data_config.data_info.items():
            if info.data_type == "IMG":
                continue

            df = self._read_tabular_data(info.file_path, info.sep or "\t")

            if info.data_type == "NUMERIC" and not info.is_single_cell:
                current_samples = set(df.index)
                if common_samples is None:
                    common_samples = current_samples
                else:
                    common_samples &= current_samples
                bulk_dfs[key] = df
            elif info.data_type == "ANNOTATION":
                has_annotation = True
                annotation_df = df

        if not common_samples:
            # Best effort: hand back whatever was read, unaligned.
            warnings.warn("No common samples found across datasets")
            return bulk_dfs, {"paired": annotation_df}

        # Second pass: filter to common samples. Iterating a set would give a
        # hash-dependent order, so follow the first numeric table instead.
        reference_index = next(iter(bulk_dfs.values())).index
        common_samples_list = list(
            dict.fromkeys(
                sample for sample in reference_index if sample in common_samples
            )
        )

        for key in bulk_dfs:
            bulk_dfs[key] = bulk_dfs[key].reindex(common_samples_list)

        if has_annotation:
            annotation = annotation_df.reindex(common_samples_list)
        else:
            # Create empty annotation with common sample indices
            annotation = pd.DataFrame(index=common_samples_list)

        return bulk_dfs, {"paired": annotation}

    def read_unpaired_data(
        self,
    ) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
        """Read data without enforcing sample alignment across modalities.

        Image and single-cell entries are skipped. A "NUMERIC" entry may carry
        its own ``extra_anno_file``, which is stored under the same key as the
        data. The collected tables are then passed through
        ``_validate_and_filter_unpaired``.

        Returns:
            A tuple containing (bulk_dataframes, annotation_dataframes)

        Raises:
            ValueError: If a configured file has an unsupported extension.
        """
        bulk_dfs: Dict[str, pd.DataFrame] = {}
        annotations: Dict[str, pd.DataFrame] = {}

        for key, info in self.config.data_config.data_info.items():
            if info.data_type == "IMG" or info.is_single_cell:
                continue  # Skip image and single-cell data

            df = self._read_tabular_data(file_path=info.file_path, sep=info.sep)

            if info.data_type == "NUMERIC":
                bulk_dfs[key] = df

                extra_anno_file = getattr(info, "extra_anno_file", None)
                if extra_anno_file:
                    annotations[key] = self._read_tabular_data(
                        file_path=extra_anno_file, sep=info.sep
                    )
            elif info.data_type == "ANNOTATION":
                annotations[key] = df

        return self._validate_and_filter_unpaired(bulk_dfs, annotations)

    def _validate_and_filter_unpaired(
        self,
        bulk_dfs: Dict[str, pd.DataFrame],
        annotations: Dict[str, pd.DataFrame],
    ) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
        """Validates that all samples in bulk data have a corresponding annotation.

        If a single global annotation file is provided, it creates a perfectly
        matched annotation dataframe for each bulk dataframe.

        Warns and drops samples that do not have a corresponding annotation.

        Args:
            bulk_dfs: Dictionary of bulk data modalities and their dataframes.
            annotations: Dictionary of annotation dataframes, possibly one global one.

        Returns:
            A tuple of two dictionaries:
                1. The filtered bulk dataframes.
                2. The new, synchronized annotation dataframes, with keys matching the bulk dataframes.
        """
        if not annotations:
            warnings.warn(
                "No annotation files were provided. Cannot validate sample annotations."
            )
            return bulk_dfs, {}

        # Heuristic for the "one annotation for all modalities" case: exactly
        # one annotation table whose key does not name a bulk table.
        is_global = len(annotations) == 1 and not (
            set(annotations) & set(bulk_dfs)
        )
        if not is_global:
            # Annotations are keyed per modality (or the layout is one we do
            # not handle yet); pass everything through untouched.
            warnings.warn(
                "Proceeding without global annotation synchronization. Assuming annotations are pre-aligned by key."
            )
            return bulk_dfs, annotations

        global_annotation_df = next(iter(annotations.values()))
        annotation_samples = global_annotation_df.index

        filtered_bulk_dfs: Dict[str, pd.DataFrame] = {}
        synchronized_annotations: Dict[str, pd.DataFrame] = {}

        for key, data_df in bulk_dfs.items():
            data_samples = data_df.index

            # Find the intersection of valid sample IDs
            valid_ids = data_samples.intersection(annotation_samples)

            # Check for and warn about dropped samples
            if len(valid_ids) < len(data_samples):
                missing_ids = sorted(set(data_samples) - set(valid_ids))
                warnings.warn(
                    f"For data modality '{key}', {len(missing_ids)} sample(s) "
                    f"were found without a corresponding annotation and will be dropped: {missing_ids}"
                )

            # Filter both the data and the annotation to the valid IDs
            filtered_bulk_dfs[key] = data_df.loc[valid_ids]
            synchronized_annotations[key] = global_annotation_df.loc[valid_ids]

        return filtered_bulk_dfs, synchronized_annotations

    def _read_tabular_data(
        self, file_path: str, sep: Union[str, None] = None
    ) -> pd.DataFrame:
        """Read tabular data from a parquet or delimited text file.

        Args:
            file_path: Path to the data file (a string or path-like object).
                The format is chosen from the extension, case-insensitively.
            sep: Separator character for CSV/TSV files. ``None`` lets pandas
                detect the separator; ignored for parquet files.

        Returns:
            The loaded DataFrame. Delimited text files use their first column
            as the index.

        Raises:
            ValueError: If the file extension is not .parquet, .csv, .txt or .tsv.
        """
        file_path = os.fspath(file_path)
        suffix_probe = file_path.lower()

        if suffix_probe.endswith(".parquet"):
            print(f"reading parquet: {file_path}")
            return pd.read_parquet(file_path)

        if suffix_probe.endswith(self._TEXT_SUFFIXES):
            if sep is None:
                # Separator detection is only implemented by the python
                # engine; asking for it explicitly avoids pandas' fallback
                # ParserWarning while parsing the same way.
                return pd.read_csv(file_path, sep=None, engine="python", index_col=0)
            return pd.read_csv(file_path, sep=sep, index_col=0)

        raise ValueError(
            f"Unsupported file type for {file_path}. Supported formats: .parquet, .csv, .txt, .tsv"
        )