Coverage for src/instawell/main.py: 56%

402 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-28 21:17 -0500

1import json 

2import logging 

3from collections import defaultdict 

4from pathlib import Path 

5from typing import Optional 

6 

7import numpy as np 

8import pandas as pd 

9import plotly.express as px 

10from pydantic import FilePath 

11 

12from instawell.data_models import Replicate, UniqueCondition 

13from instawell.parser import condition_from_string 

14 

15# set logging level to INFO 

16logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") 

17 

18 

19# class Replicate(BaseModel): 

20# well_row: str 

21# well_column: str 

22# well_name: str 

23# # temp_data: Optional[pd.DataFrame] = None 

24 

25 

26# class UniqueCondition(BaseModel): 

27# full_name: str = "" 

28# concentration: str = "" 

29# ligand_name: str = "" 

30# protein_name: str = "" 

31# buffer_condition: str = "" 

32# replicates: List[Replicate] = Field(default_factory=list) 

33 

34 

def get_unique_conditions_old(
    layout_df: pd.DataFrame, experiment_name: str
) -> dict[str, UniqueCondition]:
    """
    OLD VERSION - kept for reference only; use get_unique_conditions() instead.

    Each layout cell is split on "_" and its last four pieces are read as
    concentration, ligand, protein and buffer, so component names that
    themselves contain underscores are mis-parsed.

    Args:
        layout_df: Plate layout. Row labels are read from the "Well" column;
            every column whose name does not start with "well"/"Well" is
            treated as a plate column holding condition strings.
        experiment_name: Directory that receives "experiment_info.json" and a
            catch-all ".gitignore".

    Returns:
        Mapping from condition string to its UniqueCondition, carrying one
        Replicate per well in which that condition appears.
    """
    plate_columns = [
        name for name in layout_df.columns if not name.startswith(("well", "Well"))
    ]

    # Every distinct cell value starts out "pending"; a value is cleared the
    # first time a well carrying it is parsed successfully.
    pending = set()
    for name in plate_columns:
        pending.update(layout_df[name].unique())

    experiment_info = defaultdict(UniqueCondition)
    for index, row in layout_df.iterrows():
        for name in plate_columns:
            cell = row[name]
            if pd.isna(cell) or cell == "" or cell == "0_0_0_0":
                continue

            pieces = cell.split("_")
            if len(pieces) < 4:
                logging.warning(
                    f"Condition '{cell}' in row {index}, column {name} does not have enough parts to be valid. Check for typos!"
                )
                continue
            concentration, ligand_name, protein_name, buffer_condition = pieces[-4:]

            well = Replicate(
                well_row=row["Well"],
                well_column=str(name),
                well_name=row["Well"] + str(name),
            )
            parsed = UniqueCondition(
                full_name=cell,
                concentration=concentration,
                ligand_name=ligand_name,
                protein_name=protein_name,
                buffer_condition=buffer_condition,
            )

            # First sighting registers the condition; every sighting adds a well.
            if cell in pending:
                experiment_info[cell] = parsed
                pending.remove(cell)
            experiment_info[cell].replicates.append(well)

    # A condition seen in a single well is usually a misspelt duplicate.
    for condition_name, info in experiment_info.items():
        if len(info.replicates) == 1:
            logging.warning(
                f"Condition {condition_name} has only one replicate. Check for Typos!"
            )

    # Anything still pending (other than the placeholder) never parsed.
    if not pending or pending == {"0_0_0_0"}:
        logging.info("All replicates accounted for in the layout data.")
    else:
        logging.warning(
            f"There may be an error with layout data: {pending}. Please check the layout data."
        )

    # Persist the result next to a .gitignore that hides the whole directory.
    info_path = Path(experiment_name) / "experiment_info.json"
    output_dir = info_path.parent
    output_dir.mkdir(parents=True, exist_ok=True)
    with open(output_dir / ".gitignore", "w") as handle:
        handle.write("*\n")
    serialised = {key: value.model_dump() for key, value in experiment_info.items()}
    with open(info_path, "w") as handle:
        json.dump(serialised, handle, indent=4)
    logging.info(f"Experiment info saved to {info_path}")
    return experiment_info

121 

122 

def get_unique_conditions(
    layout_df: pd.DataFrame,
    experiment_name: str,
    fields: tuple[str, ...] = ("concentration", "ligand", "protein", "buffer"),
) -> dict[str, UniqueCondition]:
    """
    Collect the unique experimental conditions of a plate layout and the wells
    (replicates) in which each one occurs.

    Condition strings are parsed with condition_from_string() from the parser
    module rather than by manual splitting.

    Args:
        layout_df: Plate layout. Row labels are read from the 'Well' column;
            every column whose name does not start with 'well'/'Well' is
            treated as a plate column holding condition strings.
        experiment_name: Directory that receives "experiment_info.json" and a
            catch-all ".gitignore"; created if missing.
        fields: Ordered field names handed to the parser.
            Default: ("concentration", "ligand", "protein", "buffer")

    Returns:
        Dictionary mapping condition names to UniqueCondition objects.

    Examples:
        >>> layout_df = pd.read_csv("layout.csv")
        >>> conditions = get_unique_conditions(layout_df, "exp1")
        >>> conditions = get_unique_conditions(layout_df, "exp1",
        ...     fields=("ligand", "protein", "concentration", "buffer"))
    """
    plate_columns = [
        name for name in layout_df.columns if not name.startswith(("well", "Well"))
    ]

    # Distinct cell values not yet matched to a successfully parsed condition.
    pending = set()
    for name in plate_columns:
        pending.update(layout_df[name].unique())

    experiment_info = defaultdict(UniqueCondition)
    for index, row in layout_df.iterrows():
        for name in plate_columns:
            condition_str = row[name]

            # Empty cells and the "0_0_0_0" placeholder carry no condition.
            if pd.isna(condition_str) or condition_str == "" or condition_str == "0_0_0_0":
                continue

            try:
                condition_obj = condition_from_string(
                    condition_str,
                    fields=fields,
                    include_replicates=False,  # wells are attached below
                )
            except ValueError as error:
                logging.warning(
                    f"Failed to parse condition '{condition_str}' in row {index}, column {name}: {error}"
                )
                continue

            well = Replicate(
                well_row=row["Well"],
                well_column=str(name),
                well_name=row["Well"] + str(name),
            )

            # First sighting registers the condition; every sighting adds a well.
            full_name = condition_obj.full_name
            if full_name in pending:
                experiment_info[full_name] = condition_obj
                pending.remove(full_name)
            experiment_info[full_name].replicates.append(well)

    # A condition seen in a single well is usually a misspelt duplicate.
    for condition_name, info in experiment_info.items():
        if len(info.replicates) == 1:
            logging.warning(
                f"Condition {condition_name} has only one replicate. Check for typos!"
            )

    # Anything still pending (other than the placeholder) never parsed.
    if not pending or pending == {"0_0_0_0"}:
        logging.info("All replicates accounted for in the layout data.")
    else:
        logging.warning(
            f"There may be an error with layout data: {pending}. Please check the layout data."
        )

    # Persist the result next to a .gitignore that hides the whole directory.
    info_path = Path(experiment_name) / "experiment_info.json"
    output_dir = info_path.parent
    output_dir.mkdir(parents=True, exist_ok=True)
    with open(output_dir / ".gitignore", "w") as handle:
        handle.write("*\n")
    serialised = {key: value.model_dump() for key, value in experiment_info.items()}
    with open(info_path, "w") as handle:
        json.dump(serialised, handle, indent=4)

    logging.info(f"Experiment info saved to {info_path}")
    return experiment_info

230 

231 

def initial_raw_data_organize(
    initial_raw_data: pd.DataFrame,
    experiment_info: "dict[str, UniqueCondition]",
) -> pd.DataFrame:
    """
    Reshape the wide raw data into long form and annotate every row with the
    condition of its well.

    Args:
        initial_raw_data: Wide table with a "Temperature" column and one
            column per well.
        experiment_info: Conditions keyed by name. Only the attributes
            ligand_name, protein_name, buffer_condition, concentration and
            replicates[*].well_name are read.

    Returns:
        Long-format frame with columns Temperature, well, value, ligand,
        protein, buffer, concentration and well_unqcond
        ("<well>_<concentration>_<ligand>_<protein>_<buffer>"). Rows whose well
        belongs to no condition hold NaN in the annotation columns and in
        well_unqcond. The annotation columns are always present, even when
        experiment_info is empty.
    """
    raw_data_long = initial_raw_data.melt(
        id_vars=["Temperature"], var_name="well", value_name="value"
    )

    # One lookup per well. Conditions are visited in insertion order, so a
    # well listed under several conditions ends up with the last one.
    well_to_condition = {
        replicate.well_name: info
        for info in experiment_info.values()
        for replicate in info.replicates
    }

    annotations = (
        ("ligand", "ligand_name"),
        ("protein", "protein_name"),
        ("buffer", "buffer_condition"),
        ("concentration", "concentration"),
    )
    for column, attribute in annotations:
        lookup = {
            well: getattr(info, attribute) for well, info in well_to_condition.items()
        }
        # object dtype keeps the column string-compatible even when every
        # entry is NaN (e.g. no conditions at all).
        raw_data_long[column] = raw_data_long["well"].map(lookup).astype(object)

    raw_data_long["well_unqcond"] = (
        raw_data_long["well"].astype(object)
        + "_"
        + raw_data_long["concentration"]
        + "_"
        + raw_data_long["ligand"]
        + "_"
        + raw_data_long["protein"]
        + "_"
        + raw_data_long["buffer"]
    )
    return raw_data_long

272 

273 

def first_step(
    raw_data_path: FilePath,
    layout_data_path: FilePath,
    experiment_name: str = "experiment_1",
    fields: tuple[str, ...] = ("concentration", "ligand", "protein", "buffer"),
) -> None:
    """
    Run the first pipeline step: load the raw and layout CSV files, derive the
    unique conditions from the layout, annotate the raw data with them and
    write the result to "<experiment_name>/raw_organized_data.csv".

    Args:
        raw_data_path: Path to the raw data CSV file.
        layout_data_path: Path to the layout CSV file.
        experiment_name: Output directory; created if missing and given a
            ".gitignore" that ignores everything inside it.
        fields: Ordered field names forwarded to get_unique_conditions().
            Default: ("concentration", "ligand", "protein", "buffer")

    Raises:
        FileNotFoundError: If either input file does not exist.

    Examples:
        >>> first_step("raw.csv", "layout.csv", "exp1")
        >>> first_step("raw.csv", "layout.csv", "exp1",
        ...     fields=("ligand", "protein", "concentration", "buffer"))
    """
    try:
        raw_frame = pd.read_csv(raw_data_path)
        layout_frame = pd.read_csv(layout_data_path)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"File not found: {e.filename}. Please check the file path.")

    # Prepare the output directory and keep its contents out of version control.
    output_dir = Path(experiment_name)
    output_dir.mkdir(parents=True, exist_ok=True)
    with open(output_dir / ".gitignore", "w") as handle:
        handle.write("*\n")

    conditions = get_unique_conditions(layout_frame, experiment_name, fields)
    organized = initial_raw_data_organize(raw_frame, conditions)

    destination = output_dir / "raw_organized_data.csv"
    organized.to_csv(destination, index=False)
    logging.info(f"Organized data saved to {destination}")

325 

326 

def create_figures_generator(experiment_name: str):
    """
    Yield one Plotly line figure per (ligand, protein, buffer) group of the
    organized raw data.

    The data is read from "<experiment_name>/raw_organized_data.csv". Because
    this is a generator, the file is read - and any error raised - only once
    iteration starts.

    Args:
        experiment_name: Directory containing "raw_organized_data.csv".

    Yields:
        A Plotly figure plotting 'value' against 'Temperature', coloured by
        'well_unqcond', for each group.

    Raises:
        FileNotFoundError: If the organized data file does not exist.
        ValueError: If a required column is missing from the file.
    """
    organized_data_path = Path(experiment_name) / "raw_organized_data.csv"
    try:
        data_df = pd.read_csv(organized_data_path)
    except FileNotFoundError:
        raise FileNotFoundError(f"Organized data file not found: {organized_data_path}")

    expected = (
        "Temperature",
        "well",
        "value",
        "ligand",
        "protein",
        "buffer",
        "concentration",
        "well_unqcond",
    )
    # Report the first missing column, in the order listed above.
    absent = next((name for name in expected if name not in data_df.columns), None)
    if absent is not None:
        raise ValueError(f"Missing required column: {absent} in the data.")

    for group_key, group in data_df.groupby(["ligand", "protein", "buffer"]):
        ligand, protein, buffer = group_key
        yield px.line(
            group,
            x="Temperature",
            y="value",
            color="well_unqcond",
            title=f"Raw Data for {ligand} and {protein} in {buffer} Buffer",
        )

374 

375 

def filter_organized_data(
    experiment_name: str,
    wells_to_filter: list[str],
) -> None:
    """
    Remove the given wells from the organized data and save the result.

    Reads "<experiment_name>/raw_organized_data.csv" and writes
    "<experiment_name>/filtered_organized_data.csv" plus
    "<experiment_name>/filtered_wells.txt" (one removed well per line, or
    "No wells filtered." when the list is empty).

    Args:
        experiment_name: Directory holding the organized data.
        wells_to_filter: Well names to drop; an empty list copies the data
            through unchanged.

    Raises:
        FileNotFoundError: If the organized data file does not exist.
        ValueError: If a required column is missing, or a requested well does
            not occur in the data (nothing is written in that case).
    """
    experiment_dir = Path(experiment_name)
    organized_data_path = experiment_dir / "raw_organized_data.csv"
    try:
        organized_data = pd.read_csv(organized_data_path)
    except FileNotFoundError:
        raise FileNotFoundError(f"Organized data file not found: {organized_data_path}")
    filtered_data_path = experiment_dir / "filtered_organized_data.csv"

    required_columns = [
        "Temperature",
        "well",
        "value",
        "ligand",
        "protein",
        "buffer",
        "concentration",
        "well_unqcond",
    ]
    for col in required_columns:
        if col not in organized_data.columns:
            raise ValueError(f"Missing required column: {col} in the data.")

    if len(wells_to_filter) == 0:
        logging.info("No wells to filter. Returning the original data.")
        wells_report = "No wells filtered."
    else:
        # Validate every request before touching the data; the set of known
        # wells is computed once rather than once per requested well.
        known_wells = set(organized_data["well"].unique())
        for well in wells_to_filter:
            if well not in known_wells:
                raise ValueError(
                    f"Well {well} not found in organized data. Please check the well names."
                )
        # Drop all requested wells in a single pass.
        organized_data = organized_data[~organized_data["well"].isin(wells_to_filter)]
        for well in wells_to_filter:
            logging.info(f"Filtered out well: {well}")
        wells_report = "\n".join(wells_to_filter)

    organized_data.to_csv(filtered_data_path, index=False)
    logging.info(f"Filtered data saved to {filtered_data_path}")

    # Record which wells were removed alongside the filtered data.
    with open(experiment_dir / "filtered_wells.txt", "w") as f:
        f.write(wells_report)

430 

431 

def split_unqcon_column_old(data: pd.DataFrame) -> pd.DataFrame:
    """
    OLD VERSION - kept for reference only; use split_unqcon_column() instead.

    Splits 'unqcond' on "_" and stores the first four pieces, in order, as the
    'concentration', 'ligand', 'protein' and 'buffer' columns. Assumes the
    names themselves contain no underscores. The input frame is modified in
    place and returned.
    """
    pieces = data["unqcond"].str.split("_", expand=True)
    target_columns = ("concentration", "ligand", "protein", "buffer")
    for position, column in enumerate(target_columns):
        data[column] = pieces[position]
    return data

444 

445 

def split_unqcon_column(
    data: pd.DataFrame,
    fields: tuple[str, ...] = ("concentration", "ligand", "protein", "buffer"),
) -> pd.DataFrame:
    """
    Split the 'unqcond' column into 'concentration', 'ligand', 'protein' and
    'buffer' columns using condition_from_string() from the parser module.

    Args:
        data: DataFrame with an 'unqcond' column containing condition strings.
            It is modified in place.
        fields: Ordered field names handed to the parser.
            Default: ("concentration", "ligand", "protein", "buffer")

    Returns:
        The same DataFrame with the four columns added. Rows whose condition
        string fails to parse (ValueError) get empty strings and a warning is
        logged. An empty input simply gains the four (empty) columns.

    Examples:
        >>> df = pd.DataFrame({"unqcond": ["500uM_ATP_Fic_buffer1"]})
        >>> df = split_unqcon_column(df)
        >>> df = split_unqcon_column(df, fields=("ligand", "protein", "concentration", "buffer"))
    """
    column_names = ("concentration", "ligand", "protein", "buffer")

    parsed_conditions = []
    for condition_str in data["unqcond"]:
        try:
            condition_obj = condition_from_string(
                condition_str,
                fields=fields,
                include_replicates=False,
            )
        except ValueError as e:
            # Keep the row, but leave its components blank.
            logging.warning(f"Failed to parse condition '{condition_str}': {e}")
            parsed_conditions.append(dict.fromkeys(column_names, ""))
        else:
            parsed_conditions.append(
                {
                    "concentration": condition_obj.concentration,
                    "ligand": condition_obj.ligand_name,
                    "protein": condition_obj.protein_name,
                    "buffer": condition_obj.buffer_condition,
                }
            )

    # Reuse the caller's index so the column assignment below lines up row by
    # row (pandas aligns on index labels, not on position), and name the
    # columns explicitly so an empty input still yields all four of them.
    parsed_df = pd.DataFrame(
        parsed_conditions, columns=list(column_names), index=data.index
    )
    for field in column_names:
        data[field] = parsed_df[field]

    return data

505 

506 

def avg_across_replicates(
    organized_data: pd.DataFrame,
) -> pd.DataFrame:
    """
    Average 'value' over all wells that share a condition and temperature.

    Args:
        organized_data: Long-format data with 'Temperature', 'value',
            'concentration', 'ligand', 'protein' and 'buffer' columns. The
            frame is not modified.

    Returns:
        Wide DataFrame indexed by 'Temperature' with one column per unique
        condition, named "<concentration>_<ligand>_<protein>_<buffer>", holding
        the mean value of that condition's replicates.
    """
    unqcond = (
        organized_data["concentration"]
        + "_"
        + organized_data["ligand"]
        + "_"
        + organized_data["protein"]
        + "_"
        + organized_data["buffer"]
    )
    # assign() works on a copy, so the caller's frame does not silently gain
    # an 'unqcond' column.
    labelled = organized_data.assign(unqcond=unqcond)

    averaged_data = (
        labelled.groupby(["Temperature", "unqcond"]).agg({"value": "mean"}).reset_index()
    )
    return averaged_data.pivot(index="Temperature", columns="unqcond", values="value")

539 

540 

def convert_concentration_to_float(concentration: str) -> float:
    """
    Convert a concentration label such as "500uM" into a number of micromolar.

    Recognised unit markers are uM (also written with the micro sign or Greek
    mu), mM, nM, pM and M. A label without any marker is taken to be a plain
    number and returned unchanged.

    Args:
        concentration: Label to convert; surrounding whitespace and whitespace
            between number and unit are ignored.

    Returns:
        The concentration in micromolar.

    Raises:
        ValueError: If the numeric part cannot be parsed as a float.
    """
    # (unit marker, conversion to micromolar). Order matters: the bare "M" is
    # tested last because every other marker contains it.
    conversions = (
        ("uM", lambda value: value),
        ("mM", lambda value: value * 1000),
        ("nM", lambda value: value / 1000),
        ("\u00b5M", lambda value: value),  # micro sign
        ("\u03bcM", lambda value: value),  # Greek small letter mu
        ("pM", lambda value: value / 1_000_000),
        ("M", lambda value: value * 1_000_000),
    )
    for marker, to_micromolar in conversions:
        if marker in concentration:
            return to_micromolar(float(concentration.replace(marker, "").strip()))
    return float(concentration.strip())

554 

555 

def average_accross_replicates(experiment_name: str) -> None:
    """
    Average the filtered data across replicates and save it as a wide table.

    Reads "<experiment_name>/filtered_organized_data.csv", averages it with
    avg_across_replicates() and writes "<experiment_name>/averaged_data.csv"
    with 'Temperature' as the first column followed by one column per
    condition. Condition columns are ordered by ligand, protein and buffer,
    and by increasing concentration within each such group.

    Args:
        experiment_name: Directory holding the filtered data.

    Raises:
        FileNotFoundError: If the filtered data file does not exist.
        ValueError: If a required column is missing, or a concentration label
            cannot be converted to a number.
    """
    filtered_data_path = Path(experiment_name) / "filtered_organized_data.csv"
    if not filtered_data_path.exists():
        raise FileNotFoundError(f"Filtered data file not found: {filtered_data_path}")
    filtered_data = pd.read_csv(filtered_data_path)

    required_columns = [
        "Temperature",
        "well",
        "value",
        "ligand",
        "protein",
        "buffer",
        "concentration",
        "well_unqcond",
    ]
    for col in required_columns:
        if col not in filtered_data.columns:
            raise ValueError(f"Missing required column: {col} in the data.")

    averaged_data = avg_across_replicates(organized_data=filtered_data)

    # Build each column's sort key from the component columns themselves
    # rather than by splitting the joined name on "_", which breaks as soon as
    # a ligand/protein/buffer name contains an underscore. Rows with a missing
    # component never form a condition column, so they are skipped.
    components = (
        filtered_data[["concentration", "ligand", "protein", "buffer"]]
        .dropna()
        .drop_duplicates()
    )
    sort_keys = {}
    for concentration, ligand, protein, buffer in components.itertuples(
        index=False, name=None
    ):
        sort_keys[f"{concentration}_{ligand}_{protein}_{buffer}"] = (
            ligand,
            protein,
            buffer,
            convert_concentration_to_float(concentration),
        )

    # 'Temperature' is the index of the averaged table, not a column, so
    # every column is a condition and all of them must be kept.
    sorted_columns = sorted(averaged_data.columns, key=sort_keys.__getitem__)

    # reset_index() turns 'Temperature' back into the first column.
    averaged_data_sorted = averaged_data[sorted_columns].reset_index()

    averaged_data_path = Path(experiment_name) / "averaged_data.csv"
    averaged_data_sorted.to_csv(averaged_data_path, index=False)
    logging.info(f"Averaged data saved to {averaged_data_path}")

610 

611 

def create_averaged_figures_generator(experiment_name: str):
    """
    Yield one Plotly line figure per (ligand, protein, buffer) group of the
    replicate-averaged data.

    The data is read from "<experiment_name>/averaged_data.csv", melted to long
    form and split into condition components with split_unqcon_column().
    Because this is a generator, the file is read - and any error raised -
    only once iteration starts.

    Args:
        experiment_name: Directory containing "averaged_data.csv".

    Yields:
        A Plotly figure plotting 'value' against 'Temperature', coloured by
        'unqcond', for each group.

    Raises:
        FileNotFoundError: If the averaged data file does not exist.
        ValueError: If the first column is not 'Temperature' or a required
            column is missing after splitting.
    """
    averaged_data_path = Path(experiment_name) / "averaged_data.csv"
    try:
        data_df = pd.read_csv(averaged_data_path)
    except FileNotFoundError:
        raise FileNotFoundError(f"Averaged data file not found: {averaged_data_path}")

    if data_df.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    # Long form: one row per (Temperature, condition) pair.
    long_df = data_df.melt(id_vars=["Temperature"], var_name="unqcond", value_name="value")
    long_df = split_unqcon_column(long_df)

    expected = (
        "Temperature",
        "unqcond",
        "value",
        "concentration",
        "ligand",
        "protein",
        "buffer",
    )
    # Report the first missing column, in the order listed above.
    absent = next((name for name in expected if name not in long_df.columns), None)
    if absent is not None:
        raise ValueError(f"Missing required column: {absent} in the data.")

    for group_key, group in long_df.groupby(["ligand", "protein", "buffer"]):
        ligand, protein, buffer = group_key
        yield px.line(
            group,
            x="Temperature",
            y="value",
            color="unqcond",
            title=f"Avg Raw Data for {ligand} and {protein} in {buffer} Buffer",
        )

665 

666 

def find_background_column(
    data: pd.DataFrame, concentration: str, ligand: str, protein: str, buffer: str
) -> Optional[str]:
    """
    Find the no-protein-control (NPC) column that serves as background for a
    condition.

    The background column is named "<concentration>_<ligand>_NPC_<buffer>". An
    exact match is preferred; otherwise the first column that starts with that
    name followed by "_" is used, which covers buffer names that continue past
    the part supplied here. A plain substring test is deliberately avoided: it
    would let "0uM_..." match "10uM_..." and buffer "b1" match "b10".

    Args:
        data: Frame whose column labels are searched.
        concentration: Concentration component of the condition.
        ligand: Ligand component of the condition.
        protein: Protein component; "NPC" conditions have no background.
        buffer: Buffer component of the condition.

    Returns:
        The matching column label, or None when the condition is itself an NPC
        or no background column exists.
    """
    if protein == "NPC":
        return None  # Don't remove background from the background itself

    target = f"{concentration}_{ligand}_NPC_{buffer}"
    if target in data.columns:
        return target

    prefix = target + "_"
    for col in data.columns:
        if isinstance(col, str) and col.startswith(prefix):
            return col
    return None

677 

678 

def subtract_background(experiment_name: str) -> None:
    """
    Subtract the matching NPC background from every condition column of the
    averaged data and save the result.

    Reads "<experiment_name>/averaged_data.csv". Each column name is split on
    "_" and its first four pieces are taken as concentration, ligand, protein
    and buffer; the background is looked up with find_background_column() and
    should be named 'concentration_ligand_NPC_buffer'. Columns whose name
    contains "NPC" are dropped before the result is written to
    "<experiment_name>/background_subtracted_data.csv".

    Raises:
        FileNotFoundError: If the averaged data file does not exist.
    """
    averaged_data_path = Path(experiment_name) / "averaged_data.csv"
    if not averaged_data_path.exists():
        raise FileNotFoundError(f"Averaged data file not found: {averaged_data_path}")
    data = pd.read_csv(averaged_data_path)

    # Tracks, per data column, whether it took part in a subtraction
    # (either as the corrected column or as the background).
    handled = {name: False for name in data.columns if name != "Temperature"}

    for name in data.columns:
        pieces = name.split("_")
        if len(pieces) < 4:
            continue
        concentration, ligand, protein, buffer = pieces[:4]

        background_col = find_background_column(data, concentration, ligand, protein, buffer)
        if not background_col or background_col not in data.columns:
            continue

        data[name] = data[name] - data[background_col]
        handled[background_col] = True
        handled[name] = True
        logging.info(f"Background subtracted for {name} using {background_col} as background.")

    for name, done in handled.items():
        if done:
            continue
        logging.warning(
            f"Warning: Column {name} was not processed for background subtraction. Check if the background column exists."
        )

    # The background columns are no longer needed once subtracted.
    data = data.loc[:, ~data.columns.str.contains("NPC")]

    background_subtracted_path = Path(experiment_name) / "background_subtracted_data.csv"
    data.to_csv(background_subtracted_path, index=False)
    logging.info(f"Background subtracted data saved to {background_subtracted_path}")

726 

727 

def create_bgsubtracted_figures_generator(experiment_name: str):
    """
    Yield one Plotly line figure per (ligand, protein, buffer) group of the
    background-subtracted data.

    The data is read from "<experiment_name>/background_subtracted_data.csv",
    melted to long form and split into condition components with
    split_unqcon_column(). Because this is a generator, the file is read - and
    any error raised - only once iteration starts.

    Args:
        experiment_name: Directory containing "background_subtracted_data.csv".

    Yields:
        A Plotly figure plotting 'value' against 'Temperature', coloured by
        'unqcond', for each group.

    Raises:
        FileNotFoundError: If the background-subtracted file does not exist.
        ValueError: If the first column is not 'Temperature' or a required
            column is missing after splitting.
    """
    bg_data_path = Path(experiment_name) / "background_subtracted_data.csv"
    try:
        data_df = pd.read_csv(bg_data_path)
    except FileNotFoundError:
        raise FileNotFoundError(f"BG subtracted data file not found: {bg_data_path}")

    if data_df.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    # Long form: one row per (Temperature, condition) pair.
    long_df = data_df.melt(id_vars=["Temperature"], var_name="unqcond", value_name="value")
    long_df = split_unqcon_column(long_df)

    expected = (
        "Temperature",
        "unqcond",
        "value",
        "concentration",
        "ligand",
        "protein",
        "buffer",
    )
    # Report the first missing column, in the order listed above.
    absent = next((name for name in expected if name not in long_df.columns), None)
    if absent is not None:
        raise ValueError(f"Missing required column: {absent} in the data.")

    for group_key, group in long_df.groupby(["ligand", "protein", "buffer"]):
        ligand, protein, buffer = group_key
        yield px.line(
            group,
            x="Temperature",
            y="value",
            color="unqcond",
            title=f"BG subtracted Data for {ligand} and {protein} in {buffer} Buffer",
        )

781 

782 

def min_max_scale(experiment_name: str) -> None:
    """
    Min-max scale every data column of the background-subtracted data.

    Reads "<experiment_name>/background_subtracted_data.csv" and writes
    "<experiment_name>/min_max_scaled_data.csv". Columns whose name starts with
    "Temperature" are left alone, as are constant columns (zero range), which
    cannot be scaled.

    Raises:
        FileNotFoundError: If the background-subtracted file does not exist.
        ValueError: If the first column is not 'Temperature'.
    """
    bg_data_path = Path(experiment_name) / "background_subtracted_data.csv"
    if not bg_data_path.exists():
        raise FileNotFoundError(f"BG subtracted data file not found: {bg_data_path}")
    data = pd.read_csv(bg_data_path)

    if data.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    for name in data.columns:
        if name.startswith("Temperature"):
            continue
        series = data[name]
        low = series.min()
        span = series.max() - low
        if span == 0:
            continue  # Avoid division by zero
        data[name] = (series - low) / span

    scaled_data_path = Path(experiment_name) / "min_max_scaled_data.csv"
    data.to_csv(scaled_data_path, index=False)
    logging.info(f"Min-max scaled data saved to {scaled_data_path}")

804 

805 

def create_bgsub_minmax_figures_generator(experiment_name: str):
    """
    Yield one Plotly line figure per (ligand, protein, buffer) group of the
    min-max scaled, background-subtracted data.

    The data is read from "<experiment_name>/min_max_scaled_data.csv", melted
    to long form and split into condition components with
    split_unqcon_column(). Because this is a generator, the file is read - and
    any error raised - only once iteration starts.

    Args:
        experiment_name: Directory containing "min_max_scaled_data.csv".

    Yields:
        A Plotly figure plotting 'value' against 'Temperature', coloured by
        'unqcond', for each group.

    Raises:
        FileNotFoundError: If the min-max scaled data file does not exist.
        ValueError: If the first column is not 'Temperature' or a required
            column is missing after splitting.
    """
    bg_min_max_data_path = Path(experiment_name) / "min_max_scaled_data.csv"
    try:
        data_df = pd.read_csv(bg_min_max_data_path)
    except FileNotFoundError:
        # Name the file that is actually missing (the min-max scaled output,
        # not the background-subtracted input).
        raise FileNotFoundError(
            f"Min-max scaled data file not found: {bg_min_max_data_path}"
        )

    # ensure the first column is 'Temperature'
    if data_df.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    # Long form: one row per (Temperature, condition) pair.
    plotting_df = data_df.melt(id_vars=["Temperature"], var_name="unqcond", value_name="value")
    plotting_df = split_unqcon_column(plotting_df)

    required_columns = [
        "Temperature",
        "unqcond",
        "value",
        "concentration",
        "ligand",
        "protein",
        "buffer",
    ]
    for col in required_columns:
        if col not in plotting_df.columns:
            raise ValueError(f"Missing required column: {col} in the data.")

    grouped_data = plotting_df.groupby(["ligand", "protein", "buffer"])
    for (ligand, protein, buffer), group in grouped_data:
        fig = px.line(
            group,
            x="Temperature",
            y="value",
            color="unqcond",
            title=f"BG subtracted min-max Data for {ligand} and {protein} in {buffer} Buffer",
        )
        yield fig

859 

860 

def calculate_derivative(experiment_name: str) -> None:
    """
    Compute the negated derivative of the background-subtracted data with
    respect to Temperature, then min-max scale each column.

    Reads "<experiment_name>/background_subtracted_data.csv" and writes
    "<experiment_name>/derivative_data.csv". The derivative is taken with
    np.gradient for every column after the first and its sign is flipped.
    Columns whose name starts with "Temperature", and columns whose derivative
    is constant (zero range), are not scaled.

    Raises:
        FileNotFoundError: If the background-subtracted file does not exist.
        ValueError: If the first column is not 'Temperature'.
    """
    bg_data_path = Path(experiment_name) / "background_subtracted_data.csv"
    if not bg_data_path.exists():
        raise FileNotFoundError(f"BG subtracted data file not found: {bg_data_path}")
    data = pd.read_csv(bg_data_path)

    if data.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    data_derivative = data.copy()
    temperature = data["Temperature"]
    for name in data.columns[1:]:  # every column after Temperature
        # Negated slope with respect to Temperature.
        data_derivative[name] = np.gradient(data[name], temperature) * -1

        if name.startswith("Temperature"):
            continue
        slope = data_derivative[name]
        low = slope.min()
        span = slope.max() - low
        if span == 0:
            continue  # Avoid division by zero
        data_derivative[name] = (slope - low) / span

    derivative_data_path = Path(experiment_name) / "derivative_data.csv"
    data_derivative.to_csv(derivative_data_path, index=False)
    logging.info(f"Derivative data saved to {derivative_data_path}")

893 

894 

def create_derivative_figures_generator(experiment_name: str):
    """
    Yield one Plotly line figure per (ligand, protein, buffer) group of the derivative data.

    Reads ``<experiment_name>/derivative_data.csv``, melts it into long form
    (``Temperature``, ``unqcond``, ``value``), passes the result through
    ``split_unqcon_column`` and then plots ``value`` against ``Temperature``
    for every group, coloured by ``unqcond``.

    This is a generator, so the file is only read and validated once the
    first figure is requested.

    Args:
        experiment_name: Directory that contains ``derivative_data.csv``.

    Yields:
        A Plotly figure object for each (ligand, protein, buffer) group.

    Raises:
        FileNotFoundError: If ``derivative_data.csv`` does not exist.
        ValueError: If the first column is not 'Temperature', or if a required
            column is absent after ``split_unqcon_column`` has run.
    """
    # Load the derivative data
    derivative_data_path = Path(experiment_name) / "derivative_data.csv"
    try:
        data_df = pd.read_csv(derivative_data_path)
    except FileNotFoundError as err:
        # Name the file that is actually missing and keep the original cause.
        raise FileNotFoundError(
            f"Derivative data file not found: {derivative_data_path}"
        ) from err

    # ensure the first column is 'Temperature'
    if data_df.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    # Long form: one row per (Temperature, unique condition) pair.
    plotting_df = data_df.melt(id_vars=["Temperature"], var_name="unqcond", value_name="value")
    # split_unqcon_column is expected to add the per-component columns checked below.
    plotting_df = split_unqcon_column(plotting_df)
    required_columns = [
        "Temperature",
        "unqcond",
        "value",
        "concentration",
        "ligand",
        "protein",
        "buffer",
    ]

    # ensure the necessary columns are present
    for col in required_columns:
        if col not in plotting_df.columns:
            raise ValueError(f"Missing required column: {col} in the data.")
    grouped_data = plotting_df.groupby(["ligand", "protein", "buffer"])

    for (ligand, protein, buffer), group in grouped_data:
        fig = px.line(
            group,
            x="Temperature",
            y="value",
            color="unqcond",
            title=f"Derivative Data for {ligand} and {protein} in {buffer} Buffer",
        )
        yield fig

948 

949 

def find_min_temperature(experiment_name: str) -> None:
    """
    Record, for every curve in the derivative data, the temperature at its minimum.

    Reads ``<experiment_name>/derivative_data.csv``, looks up the
    ``Temperature`` value at the row where each data column is smallest,
    passes the resulting table through ``split_unqcon_column`` and writes it
    to ``<experiment_name>/min_temperatures.csv``.

    Args:
        experiment_name: Directory that holds the experiment's CSV files.

    Raises:
        FileNotFoundError: If the derivative CSV does not exist.
        ValueError: If the first column of the CSV is not 'Temperature'.
    """
    experiment_dir = Path(experiment_name)
    source_path = experiment_dir / "derivative_data.csv"
    if not source_path.exists():
        raise FileNotFoundError(f"Derivative data file not found: {source_path}")

    curves = pd.read_csv(source_path)
    if curves.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    temperatures = curves["Temperature"]
    # 'Temperature' and 'index' are bookkeeping columns, not curves.
    non_curve_columns = ("Temperature", "index")
    lowest_by_condition = {
        name: temperatures.iloc[curves[name].idxmin()]
        for name in curves.columns
        if name not in non_curve_columns
    }

    # One row per unique condition, then expand the condition name into its parts.
    summary = pd.DataFrame(
        list(lowest_by_condition.items()), columns=["unqcond", "min_temperature"]
    )
    summary = split_unqcon_column(summary)

    target_path = experiment_dir / "min_temperatures.csv"
    summary.to_csv(target_path, index=False)
    logging.info(f"Min temperatures saved to {target_path}")

975 

976 

def create_mintemp_figures_generator(experiment_name: str):
    """
    Yield one Plotly scatter figure per (ligand, protein, buffer) group of min temperatures.

    Reads ``<experiment_name>/min_temperatures.csv``, adds a ``concentration2``
    column by applying ``convert_concentration_to_float`` to ``concentration``,
    and plots ``min_temperature`` against ``concentration2`` for every group.

    This is a generator, so the file is only read and validated once the
    first figure is requested.

    Args:
        experiment_name: Directory that contains ``min_temperatures.csv``.

    Yields:
        A Plotly figure object for each (ligand, protein, buffer) group.

    Raises:
        FileNotFoundError: If ``min_temperatures.csv`` does not exist.
        ValueError: If any of the required columns is missing from the file.
    """
    # Load the min temperature data
    min_temperatures_data_path = Path(experiment_name) / "min_temperatures.csv"
    try:
        data_df = pd.read_csv(min_temperatures_data_path)
    except FileNotFoundError as err:
        # Name the file that is actually missing and keep the original cause.
        raise FileNotFoundError(
            f"Min temperatures data file not found: {min_temperatures_data_path}"
        ) from err

    # ensure the necessary columns are present
    required_columns = [
        "unqcond",
        "min_temperature",
        "concentration",
        "ligand",
        "protein",
        "buffer",
    ]

    for col in required_columns:
        if col not in data_df.columns:
            raise ValueError(f"Missing required column: {col} in the data.")

    # Converted copy of the concentration column, used as the x axis.
    data_df["concentration2"] = data_df["concentration"].apply(convert_concentration_to_float)
    grouped_data = data_df.groupby(["ligand", "protein", "buffer"])

    for (ligand, protein, buffer), group in grouped_data:
        fig = px.scatter(
            group,
            x="concentration2",
            y="min_temperature",
            color="ligand",
            title=f"Min Temperature for {ligand} and {protein} in {buffer} Buffer",
        )
        yield fig