Coverage for src/instawell/main.py: 56%
402 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-28 21:17 -0500
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-28 21:17 -0500
1import json
2import logging
3from collections import defaultdict
4from pathlib import Path
5from typing import Optional
7import numpy as np
8import pandas as pd
9import plotly.express as px
10from pydantic import FilePath
12from instawell.data_models import Replicate, UniqueCondition
13from instawell.parser import condition_from_string
15# set logging level to INFO
16logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
19# class Replicate(BaseModel):
20# well_row: str
21# well_column: str
22# well_name: str
23# # temp_data: Optional[pd.DataFrame] = None
26# class UniqueCondition(BaseModel):
27# full_name: str = ""
28# concentration: str = ""
29# ligand_name: str = ""
30# protein_name: str = ""
31# buffer_condition: str = ""
32# replicates: List[Replicate] = Field(default_factory=list)
def get_unique_conditions_old(
    layout_df: pd.DataFrame, experiment_name: str
) -> dict[str, UniqueCondition]:
    """
    OLD VERSION - retained for reference only; use get_unique_conditions() instead.

    Every layout cell is split on "_" and its last four pieces are read as
    concentration, ligand, protein and buffer, so component names that contain
    underscores themselves are not handled.

    Side effects: creates the ``experiment_name`` directory, writes a
    ``.gitignore`` containing ``*`` into it and dumps the collected conditions
    to ``experiment_info.json``.

    Returns:
        Mapping of the raw condition string to its UniqueCondition (with the
        wells that carry it attached as replicates).
    """
    experiment_info = defaultdict(UniqueCondition)

    # Columns whose name starts with "well"/"Well" hold row labels, not conditions.
    condition_columns = [
        name for name in layout_df.columns if not name.startswith(("well", "Well"))
    ]

    # Distinct raw cell values; a value is removed once it has been registered.
    unseen = set()
    for name in condition_columns:
        unseen.update(layout_df[name].unique())

    for index, row in layout_df.iterrows():
        for col in condition_columns:
            cell = row[col]
            if pd.isna(cell) or cell == "" or cell == "0_0_0_0":
                continue

            pieces = cell.split("_")
            if len(pieces) < 4:
                logging.warning(
                    f"Condition '{cell}' in row {index}, column {col} does not have enough parts to be valid. Check for typos!"
                )
                continue

            concentration, ligand_name, protein_name, buffer_condition = pieces[-4:]
            well = Replicate(
                well_row=row["Well"],
                well_column=str(col),
                well_name=row["Well"] + str(col),
            )

            if cell in unseen:
                # First occurrence: store the parsed condition and tick it off.
                experiment_info[cell] = UniqueCondition(
                    full_name=cell,
                    concentration=concentration,
                    ligand_name=ligand_name,
                    protein_name=protein_name,
                    buffer_condition=buffer_condition,
                )
                unseen.remove(cell)
            experiment_info[cell].replicates.append(well)

    # A condition seen in a single well is only reported, never removed.
    for name, info in experiment_info.items():
        if len(info.replicates) == 1:
            logging.warning(f"Condition {name} has only one replicate. Check for Typos!")

    # Anything left over (apart from the "0_0_0_0" placeholder) was never registered.
    if not unseen or unseen == {"0_0_0_0"}:
        logging.info("All replicates accounted for in the layout data.")
    else:
        logging.warning(
            f"There may be an error with layout data: {unseen}. Please check the layout data."
        )

    info_path = Path(experiment_name) / "experiment_info.json"
    info_path.parent.mkdir(parents=True, exist_ok=True)
    # Keep generated experiment output out of version control.
    with open(info_path.parent / ".gitignore", "w") as ignore_file:
        ignore_file.write("*\n")
    serialisable = {key: value.model_dump() for key, value in experiment_info.items()}
    with open(info_path, "w") as info_file:
        json.dump(serialisable, info_file, indent=4)
    logging.info(f"Experiment info saved to {info_path}")
    return experiment_info
def get_unique_conditions(
    layout_df: pd.DataFrame,
    experiment_name: str,
    fields: tuple[str, ...] = ("concentration", "ligand", "protein", "buffer"),
) -> dict[str, UniqueCondition]:
    """
    Parse layout data to extract unique experimental conditions and their replicates.

    Each non-empty layout cell is handed to ``condition_from_string``; every
    well that carries the same condition is attached to it as a Replicate.

    Side effects: creates the ``experiment_name`` directory, writes a
    ``.gitignore`` containing ``*`` into it and dumps the collected conditions
    to ``experiment_info.json``.

    Args:
        layout_df: DataFrame with layout information. Must have a 'Well' column and
            condition columns (any column not starting with 'well'/'Well').
        experiment_name: Name of the experiment (used to create output directory).
        fields: Ordered tuple of field names to parse from condition strings.
            Default: ("concentration", "ligand", "protein", "buffer").
            Passed unchanged to ``condition_from_string``.

    Returns:
        Dictionary mapping condition names to UniqueCondition objects.

    Examples:
        >>> layout_df = pd.read_csv("layout.csv")
        >>> # Default parsing
        >>> conditions = get_unique_conditions(layout_df, "exp1")
        >>> # Custom field order
        >>> conditions = get_unique_conditions(layout_df, "exp1",
        ...     fields=("ligand", "protein", "concentration", "buffer"))
    """
    experiment_info = defaultdict(UniqueCondition)

    # Columns whose name starts with "well"/"Well" hold row labels, not conditions.
    condition_columns = [
        col for col in layout_df.columns if not col.startswith(("well", "Well"))
    ]

    # Distinct raw cell values; entries are ticked off as they get registered so
    # that whatever is left at the end can be reported as suspicious.
    replicates = set()
    for col in condition_columns:
        replicates.update(layout_df[col].unique())

    for index, row in layout_df.iterrows():
        for col in condition_columns:
            condition_str = row[col]

            # Skip empty or placeholder conditions
            if pd.isna(condition_str) or condition_str == "" or condition_str == "0_0_0_0":
                continue

            try:
                condition_obj = condition_from_string(
                    condition_str,
                    fields=fields,
                    include_replicates=False,  # replicates are attached below
                )
            except ValueError as e:
                logging.warning(
                    f"Failed to parse condition '{condition_str}' in row {index}, column {col}: {e}"
                )
                continue

            replicate = Replicate(
                well_row=row["Well"],
                well_column=str(col),
                well_name=row["Well"] + str(col),
            )

            full_name = condition_obj.full_name

            # Register on first sight of the parsed name. Keying this on the raw
            # cell text would silently replace the parsed object with an empty
            # default UniqueCondition should full_name ever differ from the cell.
            if full_name not in experiment_info:
                experiment_info[full_name] = condition_obj
            replicates.discard(condition_str)
            replicates.discard(full_name)
            experiment_info[full_name].replicates.append(replicate)

    # A condition seen in a single well is only reported, never removed.
    for condition, info in experiment_info.items():
        if len(info.replicates) == 1:
            logging.warning(f"Condition {condition} has only one replicate. Check for typos!")

    # Anything left over (apart from the "0_0_0_0" placeholder) was never registered.
    if len(replicates) == 0 or (len(replicates) == 1 and "0_0_0_0" in replicates):
        logging.info("All replicates accounted for in the layout data.")
    else:
        logging.warning(
            f"There may be an error with layout data: {replicates}. Please check the layout data."
        )

    info_path = Path(experiment_name) / "experiment_info.json"
    info_path.parent.mkdir(parents=True, exist_ok=True)

    # Keep generated experiment output out of version control.
    with open(info_path.parent / ".gitignore", "w") as f:
        f.write("*\n")

    with open(info_path, "w") as f:
        json.dump({k: v.model_dump() for k, v in experiment_info.items()}, f, indent=4)

    logging.info(f"Experiment info saved to {info_path}")
    return experiment_info
def initial_raw_data_organize(
    initial_raw_data: pd.DataFrame,
    experiment_info: dict[str, UniqueCondition],
) -> pd.DataFrame:
    """
    Reshape the wide raw data to long form and annotate each row with its condition.

    The input is melted on 'Temperature' so every row holds one
    (Temperature, well, value) reading. Each well is then looked up among the
    replicates in ``experiment_info`` and the columns 'ligand', 'protein',
    'buffer', 'concentration' and 'well_unqcond'
    (``well_concentration_ligand_protein_buffer``) are added. Wells that belong
    to no condition get NaN in all of these columns.

    Args:
        initial_raw_data: Wide frame with a 'Temperature' column and one column per well.
        experiment_info: Conditions keyed by name; each value exposes
            ``ligand_name``, ``protein_name``, ``buffer_condition``,
            ``concentration`` and ``replicates`` (items with ``well_name``).

    Returns:
        The long-form, annotated DataFrame.
    """
    raw_data_long = initial_raw_data.melt(
        id_vars=["Temperature"], var_name="well", value_name="value"
    )

    # One lookup table instead of one full-frame mask per condition. If a well
    # is listed under several conditions the last one iterated wins.
    well_to_info = {
        rep.well_name: info
        for info in experiment_info.values()
        for rep in info.replicates
    }
    matched = [well_to_info.get(well) for well in raw_data_long["well"]]

    column_attributes = (
        ("ligand", "ligand_name"),
        ("protein", "protein_name"),
        ("buffer", "buffer_condition"),
        ("concentration", "concentration"),
    )
    # The columns are always created, so an empty experiment_info yields NaN
    # columns rather than a KeyError when the label is assembled.
    for column, attribute in column_attributes:
        raw_data_long[column] = [
            np.nan if info is None else getattr(info, attribute) for info in matched
        ]

    raw_data_long["well_unqcond"] = [
        np.nan
        if info is None
        else "_".join(
            (
                well,
                info.concentration,
                info.ligand_name,
                info.protein_name,
                info.buffer_condition,
            )
        )
        for well, info in zip(raw_data_long["well"], matched)
    ]
    return raw_data_long
def first_step(
    raw_data_path: FilePath,
    layout_data_path: FilePath,
    experiment_name: str = "experiment_1",
    fields: tuple[str, ...] = ("concentration", "ligand", "protein", "buffer"),
) -> None:
    """
    The first step of the data processing pipeline.

    Reads the raw data and layout CSV files, extracts the unique conditions
    from the layout, annotates the raw data with them and writes the result to
    ``<experiment_name>/raw_organized_data.csv``.

    Args:
        raw_data_path: Path to the raw data CSV file
        layout_data_path: Path to the layout CSV file
        experiment_name: Name of the experiment (creates a directory with this name)
        fields: Ordered tuple of field names to parse from condition strings.
            Default: ("concentration", "ligand", "protein", "buffer")
            Passed unchanged to get_unique_conditions().

    Raises:
        FileNotFoundError: If either input file does not exist. Nothing is
            written to disk in that case.

    Examples:
        >>> # Default field order
        >>> first_step("raw.csv", "layout.csv", "exp1")
        >>> # Custom field order
        >>> first_step("raw.csv", "layout.csv", "exp1",
        ...     fields=("ligand", "protein", "concentration", "buffer"))
    """
    try:
        initial_raw_data = pd.read_csv(raw_data_path)
        layout_data = pd.read_csv(layout_data_path)
    except FileNotFoundError as e:
        # Chain explicitly so the original OS-level error stays attached as the cause.
        raise FileNotFoundError(
            f"File not found: {e.filename}. Please check the file path."
        ) from e

    # Create the experiment directory and keep its contents out of version control.
    experiment_dir = Path(experiment_name)
    experiment_dir.mkdir(parents=True, exist_ok=True)
    with open(experiment_dir / ".gitignore", "w") as f:
        f.write("*\n")

    # Get unique conditions from the layout data
    experiment_info = get_unique_conditions(layout_data, experiment_name, fields)

    # Organize the raw data based on the layout data
    raw_organized_data = initial_raw_data_organize(initial_raw_data, experiment_info)

    organized_data_path = experiment_dir / "raw_organized_data.csv"
    raw_organized_data.to_csv(organized_data_path, index=False)
    logging.info(f"Organized data saved to {organized_data_path}")
def create_figures_generator(experiment_name: str):
    """
    Yield one Plotly line figure of the raw curves per (ligand, protein, buffer) group.

    The data is read from ``<experiment_name>/raw_organized_data.csv``. Because
    this is a generator, the file is only read and validated once the first
    figure is requested.

    Args:
        experiment_name: Directory that contains ``raw_organized_data.csv``.

    Yields:
        A Plotly figure for each group, with one line per 'well_unqcond'.

    Raises:
        FileNotFoundError: If the organized data file does not exist.
        ValueError: If a required column is missing from the file.
    """
    source_csv = Path(experiment_name) / "raw_organized_data.csv"
    try:
        frame = pd.read_csv(source_csv)
    except FileNotFoundError:
        raise FileNotFoundError(f"Organized data file not found: {source_csv}")

    expected = (
        "Temperature",
        "well",
        "value",
        "ligand",
        "protein",
        "buffer",
        "concentration",
        "well_unqcond",
    )
    # Report the first missing column, in the order listed above.
    absent = next((name for name in expected if name not in frame.columns), None)
    if absent is not None:
        raise ValueError(f"Missing required column: {absent} in the data.")

    for (ligand, protein, buffer), subset in frame.groupby(["ligand", "protein", "buffer"]):
        yield px.line(
            subset,
            x="Temperature",
            y="value",
            color="well_unqcond",
            title=f"Raw Data for {ligand} and {protein} in {buffer} Buffer",
        )
def filter_organized_data(
    experiment_name: str,
    wells_to_filter: list[str],
) -> None:
    """
    Remove the given wells from the organized data and save the result.

    Reads ``<experiment_name>/raw_organized_data.csv`` and writes
    ``filtered_organized_data.csv`` plus ``filtered_wells.txt`` (one removed
    well per line, or "No wells filtered.") into the same directory.

    Args:
        experiment_name: Directory that contains ``raw_organized_data.csv``.
        wells_to_filter: Well names to drop; an empty list copies the data unchanged.

    Raises:
        FileNotFoundError: If the organized data file does not exist.
        ValueError: If a required column is missing, or a requested well is not
            present in the data (nothing is written in that case).
    """
    experiment_dir = Path(experiment_name)
    organized_data_path = experiment_dir / "raw_organized_data.csv"
    try:
        organized_data = pd.read_csv(organized_data_path)
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Organized data file not found: {organized_data_path}"
        ) from e
    filtered_data_path = experiment_dir / "filtered_organized_data.csv"

    required_columns = [
        "Temperature",
        "well",
        "value",
        "ligand",
        "protein",
        "buffer",
        "concentration",
        "well_unqcond",
    ]
    for col in required_columns:
        if col not in organized_data.columns:
            raise ValueError(f"Missing required column: {col} in the data.")

    if len(wells_to_filter) == 0:
        logging.info("No wells to filter. Returning the original data.")
        wells_note = "No wells filtered."
    else:
        # Validate every requested well before touching the data; the set of
        # known wells is built once instead of once per requested well.
        known_wells = set(organized_data["well"].unique())
        for well in wells_to_filter:
            if well not in known_wells:
                raise ValueError(
                    f"Well {well} not found in organized data. Please check the well names."
                )
        # Single pass over the frame rather than one copy per removed well.
        organized_data = organized_data[~organized_data["well"].isin(wells_to_filter)]
        for well in wells_to_filter:
            logging.info(f"Filtered out well: {well}")
        wells_note = "\n".join(wells_to_filter)

    organized_data.to_csv(filtered_data_path, index=False)
    logging.info(f"Filtered data saved to {filtered_data_path}")

    # Record which wells were removed alongside the filtered data.
    with open(experiment_dir / "filtered_wells.txt", "w") as f:
        f.write(wells_note)
def split_unqcon_column_old(data: pd.DataFrame) -> pd.DataFrame:
    """
    OLD VERSION - retained for reference only; use split_unqcon_column() instead.

    Splits 'unqcond' on every underscore and stores the first four pieces as
    'concentration', 'ligand', 'protein' and 'buffer'. The columns are added to
    ``data`` in place and the same frame is returned.
    """
    pieces = data["unqcond"].str.split("_", expand=True)
    for position, column in enumerate(("concentration", "ligand", "protein", "buffer")):
        data[column] = pieces[position]
    return data
def split_unqcon_column(
    data: pd.DataFrame,
    fields: tuple[str, ...] = ("concentration", "ligand", "protein", "buffer"),
) -> pd.DataFrame:
    """
    Split the 'unqcond' column into its component parts using the parser module.

    Each value of 'unqcond' is parsed with ``condition_from_string`` and the
    columns 'concentration', 'ligand', 'protein' and 'buffer' are added to
    ``data`` in place. Values that fail to parse get empty strings in all four
    columns and a warning is logged.

    Args:
        data: DataFrame with an 'unqcond' column containing condition strings
        fields: Ordered tuple of field names to parse from condition strings.
            Default: ("concentration", "ligand", "protein", "buffer")
            Passed unchanged to ``condition_from_string``.

    Returns:
        The same DataFrame, with the four component columns added.

    Examples:
        >>> df = pd.DataFrame({"unqcond": ["500uM_ATP_Fic_buffer1"]})
        >>> # Default parsing
        >>> df = split_unqcon_column(df)
        >>> print(df[["concentration", "ligand", "protein", "buffer"]])

        >>> # Custom field order
        >>> df = split_unqcon_column(df, fields=("ligand", "protein", "concentration", "buffer"))
    """
    output_columns = ["concentration", "ligand", "protein", "buffer"]

    parsed_conditions = []
    for condition_str in data["unqcond"]:
        try:
            condition_obj = condition_from_string(
                condition_str,
                fields=fields,
                include_replicates=False,
            )
        except ValueError as e:
            # If parsing fails, use empty strings for all fields
            logging.warning(f"Failed to parse condition '{condition_str}': {e}")
            parsed_conditions.append(dict.fromkeys(output_columns, ""))
            continue
        parsed_conditions.append(
            {
                "concentration": condition_obj.concentration,
                "ligand": condition_obj.ligand_name,
                "protein": condition_obj.protein_name,
                "buffer": condition_obj.buffer_condition,
            }
        )

    # Reuse data's own index so the assignment below lines up row by row even
    # when data is not indexed 0..n-1, and name the columns explicitly so an
    # empty input still produces all four columns.
    parsed_df = pd.DataFrame(parsed_conditions, columns=output_columns, index=data.index)
    for field in output_columns:
        data[field] = parsed_df[field]

    return data
def avg_across_replicates(
    organized_data: pd.DataFrame,
) -> pd.DataFrame:
    """
    Average 'value' over all rows that share a Temperature and condition.

    A 'unqcond' column (``concentration_ligand_protein_buffer``) is added to
    ``organized_data`` in place and used as the grouping key.

    Returns:
        A wide DataFrame indexed by 'Temperature' with one column per
        'unqcond', holding the mean value.
    """
    # Build the condition label left to right: concentration_ligand_protein_buffer.
    label = organized_data["concentration"]
    for component in ("ligand", "protein", "buffer"):
        label = label + "_" + organized_data[component]
    organized_data["unqcond"] = label

    means = (
        organized_data.groupby(["Temperature", "unqcond"])["value"].mean().reset_index()
    )
    return means.pivot(index="Temperature", columns="unqcond", values="value")
def convert_concentration_to_float(concentration: str) -> float:
    """
    Convert a concentration string to a float.

    Units are checked in the order "uM", "mM", "nM"; the first one found is
    removed from the string. "uM" values are returned unchanged, "mM" values
    are multiplied by 1000 and "nM" values are divided by 1000. A string with
    none of these units is converted as-is.

    Raises:
        ValueError: If the remaining text is not a valid float.
    """
    for unit in ("uM", "mM", "nM"):
        if unit not in concentration:
            continue
        magnitude = float(concentration.replace(unit, "").strip())
        if unit == "mM":
            return magnitude * 1000  # Convert mM to uM
        if unit == "nM":
            return magnitude / 1000
        return magnitude
    return float(concentration.strip())
def average_accross_replicates(experiment_name: str) -> None:
    """
    The second step of the data processing pipeline.

    Loads ``<experiment_name>/filtered_organized_data.csv``, averages the
    values across replicates and writes ``averaged_data.csv`` into the same
    directory. The output has 'Temperature' as its first column followed by one
    column per condition, ordered by ligand, protein and buffer and, within
    each of those groups, by increasing concentration.

    Raises:
        FileNotFoundError: If the filtered data file does not exist.
        ValueError: If a required column is missing from the file.
    """
    filtered_data_path = Path(experiment_name) / "filtered_organized_data.csv"
    if not filtered_data_path.exists():
        raise FileNotFoundError(f"Filtered data file not found: {filtered_data_path}")
    filtered_data = pd.read_csv(filtered_data_path)
    required_columns = [
        "Temperature",
        "well",
        "value",
        "ligand",
        "protein",
        "buffer",
        "concentration",
        "well_unqcond",
    ]
    for col in required_columns:
        if col not in filtered_data.columns:
            raise ValueError(f"Missing required column: {col} in the data.")

    averaged_data = avg_across_replicates(organized_data=filtered_data)

    def _sort_key(column_name: str):
        # Column names look like concentration_ligand_protein_buffer.
        parts = column_name.split("_")
        return (
            parts[1],  # ligand
            parts[2],  # protein
            parts[3],  # buffer
            convert_concentration_to_float(parts[0]),  # numeric, not lexical, order
        )

    # 'Temperature' is the index of the pivoted frame, not a column, so every
    # column is a condition. Skipping the first one would drop a condition
    # from the output.
    sorted_columns = sorted(averaged_data.columns, key=_sort_key)

    # Reorder, then turn the Temperature index back into the leading column.
    averaged_data_sorted = averaged_data[sorted_columns].reset_index()

    averaged_data_path = Path(experiment_name) / "averaged_data.csv"
    averaged_data_sorted.to_csv(averaged_data_path, index=False)
    logging.info(f"Averaged data saved to {averaged_data_path}")
def create_averaged_figures_generator(experiment_name: str):
    """
    Yield one Plotly line figure of the averaged curves per (ligand, protein, buffer) group.

    The data is read from ``<experiment_name>/averaged_data.csv``, reshaped to
    long form and split into condition components with split_unqcon_column().
    Because this is a generator, nothing is read until the first figure is requested.

    Args:
        experiment_name: Directory that contains ``averaged_data.csv``.

    Yields:
        A Plotly figure for each group, with one line per 'unqcond'.

    Raises:
        FileNotFoundError: If the averaged data file does not exist.
        ValueError: If the first column is not 'Temperature' or a required
            column is missing after splitting.
    """
    source_csv = Path(experiment_name) / "averaged_data.csv"
    try:
        wide = pd.read_csv(source_csv)
    except FileNotFoundError:
        raise FileNotFoundError(f"Averaged data file not found: {source_csv}")

    # Ensure the first column is 'Temperature'.
    if wide.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    long_form = wide.melt(id_vars=["Temperature"], var_name="unqcond", value_name="value")
    long_form = split_unqcon_column(long_form)

    expected = (
        "Temperature",
        "unqcond",
        "value",
        "concentration",
        "ligand",
        "protein",
        "buffer",
    )
    absent = next((name for name in expected if name not in long_form.columns), None)
    if absent is not None:
        raise ValueError(f"Missing required column: {absent} in the data.")

    for (ligand, protein, buffer), subset in long_form.groupby(["ligand", "protein", "buffer"]):
        yield px.line(
            subset,
            x="Temperature",
            y="value",
            color="unqcond",
            title=f"Avg Raw Data for {ligand} and {protein} in {buffer} Buffer",
        )
def find_background_column(
    data: pd.DataFrame, concentration: str, ligand: str, protein: str, buffer: str
) -> Optional[str]:
    """
    Find the no-protein-control (NPC) column that serves as background for a condition.

    The background column is expected to be named
    ``{concentration}_{ligand}_NPC_{buffer}``. A column with exactly that name
    is preferred; only if none exists is the first column whose name merely
    contains that text returned.

    Returns:
        The background column name, or None when ``protein`` is "NPC" (a
        background is never subtracted from itself) or no column matches.
    """
    if protein == "NPC":
        return None  # Don't remove background for NPC

    target = f"{concentration}_{ligand}_NPC_{buffer}"

    # Exact match first: a pure substring test would pair "5uM_..." with
    # "0.5uM_..." (or "buf" with "buf2") whenever that column comes earlier.
    if target in data.columns:
        return target

    # Fallback kept for column names that carry extra text around the target.
    for col in data.columns:
        if target in col:
            return col
    return None
def subtract_background(experiment_name: str) -> None:
    """
    Subtract the matching NPC background column from every condition column.

    Reads ``<experiment_name>/averaged_data.csv``. For each column named
    ``concentration_ligand_protein_buffer`` the background column is looked up
    with find_background_column() (expected format
    'concentration_ligand_NPC_buffer') and subtracted. Columns that were neither
    corrected nor used as a background are reported with a warning. All columns
    whose name contains "NPC" are then dropped and the result is written to
    ``background_subtracted_data.csv``.

    Raises:
        FileNotFoundError: If the averaged data file does not exist.
    """
    source_csv = Path(experiment_name) / "averaged_data.csv"
    if not source_csv.exists():
        raise FileNotFoundError(f"Averaged data file not found: {source_csv}")
    data = pd.read_csv(source_csv)

    # Tracks, per data column, whether it took part in a subtraction.
    handled = {name: False for name in data.columns if name != "Temperature"}

    for col in data.columns:
        pieces = col.split("_")
        if len(pieces) < 4:
            continue
        concentration, ligand, protein, buffer = pieces[:4]

        background_col = find_background_column(data, concentration, ligand, protein, buffer)
        if not background_col or background_col not in data.columns:
            continue

        data[col] = data[col] - data[background_col]
        handled[background_col] = True
        handled[col] = True
        logging.info(f"Background subtracted for {col} using {background_col} as background.")

    for name, done in handled.items():
        if done:
            continue
        logging.warning(
            f"Warning: Column {name} was not processed for background subtraction. Check if the background column exists."
        )

    # Remove the background columns
    keep = ~data.columns.str.contains("NPC")
    data = data.loc[:, keep]

    target_csv = Path(experiment_name) / "background_subtracted_data.csv"
    data.to_csv(target_csv, index=False)
    logging.info(f"Background subtracted data saved to {target_csv}")
def create_bgsubtracted_figures_generator(experiment_name: str):
    """
    Yield one Plotly line figure of the background-subtracted curves per
    (ligand, protein, buffer) group.

    The data is read from ``<experiment_name>/background_subtracted_data.csv``,
    reshaped to long form and split into condition components with
    split_unqcon_column(). Because this is a generator, nothing is read until
    the first figure is requested.

    Args:
        experiment_name: Directory that contains ``background_subtracted_data.csv``.

    Yields:
        A Plotly figure for each group, with one line per 'unqcond'.

    Raises:
        FileNotFoundError: If the background-subtracted data file does not exist.
        ValueError: If the first column is not 'Temperature' or a required
            column is missing after splitting.
    """
    source_csv = Path(experiment_name) / "background_subtracted_data.csv"
    try:
        wide = pd.read_csv(source_csv)
    except FileNotFoundError:
        raise FileNotFoundError(f"BG subtracted data file not found: {source_csv}")

    # Ensure the first column is 'Temperature'.
    if wide.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    long_form = wide.melt(id_vars=["Temperature"], var_name="unqcond", value_name="value")
    long_form = split_unqcon_column(long_form)

    expected = (
        "Temperature",
        "unqcond",
        "value",
        "concentration",
        "ligand",
        "protein",
        "buffer",
    )
    absent = next((name for name in expected if name not in long_form.columns), None)
    if absent is not None:
        raise ValueError(f"Missing required column: {absent} in the data.")

    for (ligand, protein, buffer), subset in long_form.groupby(["ligand", "protein", "buffer"]):
        yield px.line(
            subset,
            x="Temperature",
            y="value",
            color="unqcond",
            title=f"BG subtracted Data for {ligand} and {protein} in {buffer} Buffer",
        )
def min_max_scale(experiment_name: str) -> None:
    """
    Min-max scale every data column of the background-subtracted data.

    Reads ``<experiment_name>/background_subtracted_data.csv``, rescales each
    column whose name does not start with "Temperature" with
    ``(x - min) / (max - min)`` and writes the result to
    ``min_max_scaled_data.csv``. Columns whose max equals their min are left
    unchanged.

    Raises:
        FileNotFoundError: If the background-subtracted data file does not exist.
        ValueError: If the first column is not 'Temperature'.
    """
    source_csv = Path(experiment_name) / "background_subtracted_data.csv"
    if not source_csv.exists():
        raise FileNotFoundError(f"BG subtracted data file not found: {source_csv}")
    data = pd.read_csv(source_csv)
    if data.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    for col in data.columns:
        if col.startswith("Temperature"):
            continue
        lowest = data[col].min()
        highest = data[col].max()
        span = highest - lowest
        if span == 0:
            continue  # Avoid division by zero
        data[col] = (data[col] - lowest) / span

    target_csv = Path(experiment_name) / "min_max_scaled_data.csv"
    data.to_csv(target_csv, index=False)
    logging.info(f"Min-max scaled data saved to {target_csv}")
def create_bgsub_minmax_figures_generator(experiment_name: str):
    """
    Yield one Plotly line figure of the min-max scaled, background-subtracted
    curves per (ligand, protein, buffer) group.

    The data is read from ``<experiment_name>/min_max_scaled_data.csv``,
    reshaped to long form and split into condition components with
    split_unqcon_column(). Because this is a generator, nothing is read until
    the first figure is requested.

    Args:
        experiment_name: Directory that contains ``min_max_scaled_data.csv``.

    Yields:
        A Plotly figure for each group, with one line per 'unqcond'.

    Raises:
        FileNotFoundError: If the min-max scaled data file does not exist.
        ValueError: If the first column is not 'Temperature' or a required
            column is missing after splitting.
    """
    source_csv = Path(experiment_name) / "min_max_scaled_data.csv"
    try:
        wide = pd.read_csv(source_csv)
    except FileNotFoundError:
        raise FileNotFoundError(f"BG subtracted data file not found: {source_csv}")

    # Ensure the first column is 'Temperature'.
    if wide.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    long_form = wide.melt(id_vars=["Temperature"], var_name="unqcond", value_name="value")
    long_form = split_unqcon_column(long_form)

    expected = (
        "Temperature",
        "unqcond",
        "value",
        "concentration",
        "ligand",
        "protein",
        "buffer",
    )
    absent = next((name for name in expected if name not in long_form.columns), None)
    if absent is not None:
        raise ValueError(f"Missing required column: {absent} in the data.")

    for (ligand, protein, buffer), subset in long_form.groupby(["ligand", "protein", "buffer"]):
        yield px.line(
            subset,
            x="Temperature",
            y="value",
            color="unqcond",
            title=f"BG subtracted min-max Data for {ligand} and {protein} in {buffer} Buffer",
        )
def calculate_derivative(experiment_name: str) -> None:
    """
    Compute the negated, min-max scaled derivative of the background-subtracted data.

    Reads ``<experiment_name>/background_subtracted_data.csv``. For every column
    after the first, the gradient with respect to 'Temperature' is taken with
    ``np.gradient`` and multiplied by -1. Each resulting column whose name does
    not start with "Temperature" is then min-max scaled (columns whose max
    equals their min are left unscaled). The result is written to
    ``derivative_data.csv``.

    Raises:
        FileNotFoundError: If the background-subtracted data file does not exist.
        ValueError: If the first column is not 'Temperature'.
    """
    source_csv = Path(experiment_name) / "background_subtracted_data.csv"
    if not source_csv.exists():
        raise FileNotFoundError(f"BG subtracted data file not found: {source_csv}")
    data = pd.read_csv(source_csv)
    if data.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    data_derivative = data.copy()
    temperature = data["Temperature"]
    for col in data.columns[1:]:  # Skip the first column (Temperature)
        # Flip the sign of d(value)/d(Temperature).
        data_derivative[col] = np.gradient(data[col], temperature) * -1

        if col.startswith("Temperature"):
            continue
        lowest = data_derivative[col].min()
        highest = data_derivative[col].max()
        span = highest - lowest
        if span == 0:
            continue  # Avoid division by zero
        data_derivative[col] = (data_derivative[col] - lowest) / span

    target_csv = Path(experiment_name) / "derivative_data.csv"
    data_derivative.to_csv(target_csv, index=False)
    logging.info(f"Derivative data saved to {target_csv}")
def create_derivative_figures_generator(experiment_name: str):
    """
    Yield one Plotly line figure of the derivative curves per (ligand, protein, buffer) group.

    The data is read from ``<experiment_name>/derivative_data.csv``, reshaped to
    long form and split into condition components with split_unqcon_column().
    Because this is a generator, nothing is read until the first figure is requested.

    Args:
        experiment_name: Directory that contains ``derivative_data.csv``.

    Yields:
        A Plotly figure for each group, with one line per 'unqcond'.

    Raises:
        FileNotFoundError: If the derivative data file does not exist.
        ValueError: If the first column is not 'Temperature' or a required
            column is missing after splitting.
    """
    source_csv = Path(experiment_name) / "derivative_data.csv"
    try:
        wide = pd.read_csv(source_csv)
    except FileNotFoundError:
        raise FileNotFoundError(f"BG subtracted data file not found: {source_csv}")

    # Ensure the first column is 'Temperature'.
    if wide.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    long_form = wide.melt(id_vars=["Temperature"], var_name="unqcond", value_name="value")
    long_form = split_unqcon_column(long_form)

    expected = (
        "Temperature",
        "unqcond",
        "value",
        "concentration",
        "ligand",
        "protein",
        "buffer",
    )
    absent = next((name for name in expected if name not in long_form.columns), None)
    if absent is not None:
        raise ValueError(f"Missing required column: {absent} in the data.")

    for (ligand, protein, buffer), subset in long_form.groupby(["ligand", "protein", "buffer"]):
        yield px.line(
            subset,
            x="Temperature",
            y="value",
            color="unqcond",
            title=f"Derivative Data for {ligand} and {protein} in {buffer} Buffer",
        )
def find_min_temperature(experiment_name: str) -> None:
    """
    Record, for each condition, the Temperature at which its derivative column is smallest.

    Reads ``<experiment_name>/derivative_data.csv``, takes the Temperature at
    the minimum of every column other than 'Temperature' and 'index', splits the
    condition names into components with split_unqcon_column() and writes the
    table to ``min_temperatures.csv``.

    Raises:
        FileNotFoundError: If the derivative data file does not exist.
        ValueError: If the first column is not 'Temperature'.
    """
    source_csv = Path(experiment_name) / "derivative_data.csv"
    if not source_csv.exists():
        raise FileNotFoundError(f"Derivative data file not found: {source_csv}")
    data = pd.read_csv(source_csv)
    if data.columns[0] != "Temperature":
        raise ValueError("The first column must be 'Temperature'.")

    temperature = data["Temperature"]
    min_temps = {
        col: temperature.iloc[data[col].idxmin()]
        for col in data.columns
        if col not in ("Temperature", "index")
    }

    # One row per condition: its name and the temperature of its minimum.
    min_temps_df = pd.DataFrame(list(min_temps.items()), columns=["unqcond", "min_temperature"])
    min_temps_df = split_unqcon_column(min_temps_df)

    target_csv = Path(experiment_name) / "min_temperatures.csv"
    min_temps_df.to_csv(target_csv, index=False)
    logging.info(f"Min temperatures saved to {target_csv}")
def create_mintemp_figures_generator(experiment_name: str):
    """
    Yield one Plotly scatter figure of min temperature against concentration
    per (ligand, protein, buffer) group.

    The data is read from ``<experiment_name>/min_temperatures.csv``. A numeric
    'concentration2' column is derived from 'concentration' with
    convert_concentration_to_float() and used as the x axis. Because this is a
    generator, nothing is read until the first figure is requested.

    Args:
        experiment_name: Directory that contains ``min_temperatures.csv``.

    Yields:
        A Plotly figure for each group.

    Raises:
        FileNotFoundError: If the min temperatures file does not exist.
        ValueError: If a required column is missing from the file.
    """
    source_csv = Path(experiment_name) / "min_temperatures.csv"
    try:
        table = pd.read_csv(source_csv)
    except FileNotFoundError:
        raise FileNotFoundError(f"BG subtracted data file not found: {source_csv}")

    expected = (
        "unqcond",
        "min_temperature",
        "concentration",
        "ligand",
        "protein",
        "buffer",
    )
    absent = next((name for name in expected if name not in table.columns), None)
    if absent is not None:
        raise ValueError(f"Missing required column: {absent} in the data.")

    table["concentration2"] = table["concentration"].apply(convert_concentration_to_float)

    for (ligand, protein, buffer), subset in table.groupby(["ligand", "protein", "buffer"]):
        yield px.scatter(
            subset,
            x="concentration2",
            y="min_temperature",
            color="ligand",
            title=f"Min Temperature for {ligand} and {protein} in {buffer} Buffer",
        )