Coverage for src/instawell/processing/step_01_ingest_data.py: 94%

86 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-07 15:47 -0600

1import json 

2import logging 

3from collections import defaultdict 

4 

5import pandas as pd 

6 

7from instawell.core.data_models import Replicate, UniqueCondition 

8from instawell.core.exp_context import ExperimentContext 

9from instawell.core.parser import condition_from_string 

10from instawell.core.steps import StepFiles 

11from instawell.utils.logging_util import setup_experiment_logging 

12 

13logger = logging.getLogger(__name__) 

14 

15 

def _parse_layout(
    layout_df: pd.DataFrame,
    replicates: set[str],
    *,
    well_prefix: str = "well",
) -> None:
    """Collect the distinct cell values of every non-well layout column.

    ``replicates`` is modified in place: the unique, non-missing values of each
    column whose label does not start with ``well_prefix`` are added to it.

    Args:
        layout_df: Layout table; columns whose label starts with
            ``well_prefix`` are skipped.
        replicates: Set that receives the collected values (mutated in place).
        well_prefix: Prefix identifying the columns to skip. Compared
            case-insensitively after stripping surrounding whitespace.
    """
    prefix = well_prefix.strip().lower()
    for col in layout_df.columns:
        # str() keeps non-string labels (e.g. integer plate columns) from
        # raising; lower() makes "well", "Well" and "WELL" behave the same,
        # matching the case-insensitive check in _parse_conditions.
        if str(col).lower().startswith(prefix):
            continue
        # Missing cells are dropped so NaN never lands in a set of strings.
        replicates.update(layout_df[col].dropna().unique())

24 

25 

def _parse_conditions(
    layout_df: pd.DataFrame,
    replicates: set[str],
    experiment_info: dict[str, UniqueCondition],
    ctx: ExperimentContext,
) -> None:
    """Parse each layout cell into a condition and record its well as a replicate.

    Every non-well column of every row is read. Cells that are missing, empty,
    or equal to ``ctx.empty_condition_mask`` are skipped. Each remaining cell is
    parsed with ``condition_from_string``; cells that raise ``ValueError`` are
    logged and skipped. The parsed condition is stored in ``experiment_info``
    under its ``full_name`` the first time it is seen, and a ``Replicate`` for
    the well is appended to that entry.

    Args:
        layout_df: Layout table with one well-identifier column and one column
            per plate column.
        replicates: Values collected from the layout; entries that get parsed
            are removed in place so that leftovers can be reported.
        experiment_info: Mapping from condition full name to condition object,
            filled in place.
        ctx: Experiment configuration (well identifier, separator, fields,
            empty-cell markers).

    Raises:
        KeyError: If a condition is found but the layout has no column matching
            ``ctx.well_col_identifier``.
    """
    well_prefix = ctx.well_col_identifier.strip().lower()

    def _is_well_column(label) -> bool:
        # Case-insensitive prefix match; str() tolerates non-string labels.
        return str(label).lower().startswith(well_prefix)

    # Loop-invariant: decide once which columns hold conditions.
    condition_columns = [col for col in layout_df.columns if not _is_well_column(col)]

    # Resolve the column holding the row label with the same normalisation that
    # is used to skip it, so a "Well" header still works with a "well"
    # identifier. An exact match wins when present.
    if ctx.well_col_identifier in layout_df.columns:
        row_label_column = ctx.well_col_identifier
    else:
        row_label_column = next(
            (col for col in layout_df.columns if _is_well_column(col)), None
        )

    skipped_markers = {ctx.empty_condition_mask, ""}

    for index, row in layout_df.iterrows():
        for col in condition_columns:
            condition_str = row[col]

            # Skip missing, empty or placeholder cells
            if pd.isna(condition_str) or condition_str in skipped_markers:
                continue

            try:
                condition_obj = condition_from_string(
                    condition_str,
                    delimiter=ctx.condition_separator,
                    fields=ctx.fields,
                    include_replicates=False,  # replicates are attached below
                )
            except ValueError as e:
                logger.warning(
                    f"Failed to parse condition '{condition_str}' in row {index}, column {col}: {e}"
                )
                continue

            if row_label_column is None:
                raise KeyError(ctx.well_col_identifier)

            # Create a replicate for this well
            well_row = row[row_label_column]
            replicate = Replicate(
                well_row=well_row,
                well_column=str(col),
                well_name=well_row + str(col),
            )

            full_name = condition_obj.full_name

            # Register on first sight regardless of how the raw cell text
            # compares to full_name, so the parsed fields are never replaced by
            # an implicitly created empty entry.
            if full_name not in experiment_info:
                experiment_info[full_name] = condition_obj
            experiment_info[full_name].replicates.append(replicate)

            # Mark both spellings of this condition as accounted for.
            replicates.discard(condition_str)
            replicates.discard(full_name)

    # Check for conditions with only one replicate
    for condition, info in experiment_info.items():
        if len(info.replicates) == 1:
            logger.warning(f"Condition {condition} has only one replicate. Check for typos!")

    # Check if all replicates are accounted for.
    # NOTE(review): cells are skipped using ctx.empty_condition_mask, but this
    # check tolerates ctx.empty_condition_placeholder - confirm these are meant
    # to be two distinct settings.
    if len(replicates) == 0 or (
        len(replicates) == 1 and ctx.empty_condition_placeholder in replicates
    ):
        logger.info("All replicates accounted for in the layout data.")
    else:
        logger.warning(
            f"There may be an error with layout data: {replicates}. Please check the layout data."
        )

94 

95 

def _save_experiment_info(
    experiment_info: dict[str, UniqueCondition], ctx: ExperimentContext
) -> None:
    """Write the parsed conditions to ``experiment_info.json`` in the experiment directory.

    The experiment directory is created if needed. A ``.gitignore`` containing
    ``*`` is placed in it when none exists yet.

    Args:
        experiment_info: Mapping from condition name to condition object; each
            value is serialised through its ``model_dump()`` method.
        ctx: Experiment configuration providing ``experiment_dir``.
    """
    experiment_dir = ctx.experiment_dir
    experiment_dir.mkdir(parents=True, exist_ok=True)

    # Keep generated experiment output out of version control, but do not
    # overwrite a .gitignore that is already there.
    gitignore_path = experiment_dir / ".gitignore"
    if not gitignore_path.exists():
        gitignore_path.write_text("*\n", encoding="utf-8")

    # Serialise before opening the target so a failing model_dump() cannot
    # leave a truncated JSON file behind.
    serializable = {name: condition.model_dump() for name, condition in experiment_info.items()}

    info_path = experiment_dir / "experiment_info.json"
    with open(info_path, "w", encoding="utf-8") as f:
        json.dump(serializable, f, indent=4)

110 

111 

def _get_unique_conditions(
    layout_df: pd.DataFrame,
    ctx: ExperimentContext,
) -> dict[str, UniqueCondition]:
    """Build, persist and return the conditions described by a layout table.

    Runs three steps in order: collect the layout's cell values, parse them
    into conditions with their replicates, and save the result through
    ``_save_experiment_info``.

    Returns:
        The mapping from condition name to condition object. It is a
        ``defaultdict``, so looking up a missing key creates a new entry.
    """
    pending_values: set[str] = set()
    conditions = defaultdict(UniqueCondition)

    _parse_layout(layout_df, pending_values)
    _parse_conditions(layout_df, pending_values, conditions, ctx)
    _save_experiment_info(conditions, ctx)

    return conditions

122 

123 

def _initial_raw_data_organize(
    initial_raw_data: pd.DataFrame,
    experiment_info: dict[str, UniqueCondition],
    ctx: ExperimentContext,
) -> pd.DataFrame:
    """Reshape the raw data to long format and annotate each well with its condition.

    The raw table is melted so that every row holds one temperature, one well
    and one value. The temperature column is renamed to ``"Temperature"`` when
    ``ctx.temperature_column`` is something else. Rows whose well belongs to a
    condition in ``experiment_info`` get that condition's ligand, protein,
    buffer and concentration. Rows for wells in no condition keep missing
    values in those columns.

    Args:
        initial_raw_data: Wide table with ``ctx.temperature_column`` plus one
            column per well.
        experiment_info: Mapping from condition name to condition object with
            its replicates.
        ctx: Experiment configuration (temperature column, separator).

    Returns:
        The long-format table with the added ``ligand``, ``protein``,
        ``buffer``, ``concentration`` and ``well_unqcond`` columns.
    """
    raw_data_long = initial_raw_data.melt(
        id_vars=[ctx.temperature_column], var_name="well", value_name="value"
    )
    if ctx.temperature_column != "Temperature":
        raw_data_long = raw_data_long.rename(columns={ctx.temperature_column: "Temperature"})

    # Create the field columns up front, filled with missing values, so they
    # exist even when experiment_info is empty; otherwise building
    # "well_unqcond" below would fail on a missing column.
    for column in ("ligand", "protein", "buffer", "concentration"):
        raw_data_long[column] = pd.Series(index=raw_data_long.index, dtype=object)

    for info in experiment_info.values():
        # Rows belonging to any replicate well of this condition
        replicate_wells = [rep.well_name for rep in info.replicates]
        mask = raw_data_long["well"].isin(replicate_wells)

        raw_data_long.loc[mask, "ligand"] = info.ligand_name
        raw_data_long.loc[mask, "protein"] = info.protein_name
        raw_data_long.loc[mask, "buffer"] = info.buffer_condition
        raw_data_long.loc[mask, "concentration"] = info.concentration

    # Per-row identifier: well followed by the condition fields, in a fixed
    # order, joined by the configured separator.
    sep = ctx.condition_separator
    raw_data_long["well_unqcond"] = (
        raw_data_long["well"]
        + sep
        + raw_data_long["concentration"]
        + sep
        + raw_data_long["ligand"]
        + sep
        + raw_data_long["protein"]
        + sep
        + raw_data_long["buffer"]
    )
    return raw_data_long

175 

176 

def ingest_data(
    ctx: ExperimentContext,
) -> None:
    """Run the first step of the data processing pipeline.

    Reads the raw data and layout CSV files named by ``ctx``, extracts the
    unique conditions from the layout, annotates the raw data with them, and
    writes the result to ``StepFiles.INGESTED_DATA`` inside the experiment
    directory.

    Args:
        ctx: ExperimentContext containing experiment configuration, can be from
            setup_experiment() or from load_experiment_context()

    Example (illustrative; how ``ctx`` is built depends on the project API)::

        ctx = load_experiment_context(...)
        ingest_data(ctx)
    """
    # Make sure the output directory exists before anything is written to it,
    # rather than relying on a helper to create it as a side effect.
    ctx.experiment_dir.mkdir(parents=True, exist_ok=True)

    if ctx.log_to_file:
        setup_experiment_logging(
            experiment_dir=ctx.experiment_dir, filename="experiment.log", level=ctx.log_level
        )

    initial_raw_data = pd.read_csv(ctx.raw_data_path)
    layout_data = pd.read_csv(ctx.layout_data_path)

    # Get unique conditions from the layout data
    experiment_info = _get_unique_conditions(layout_data, ctx)

    # Organize the raw data based on the layout data
    raw_organized_data = _initial_raw_data_organize(initial_raw_data, experiment_info, ctx)

    # Write the organized data to a CSV file in the experiment directory
    organized_data_path = ctx.experiment_dir / StepFiles.INGESTED_DATA
    raw_organized_data.to_csv(organized_data_path, index=False)

    logger.info("Organized data saved to %s", organized_data_path)