Coverage for src/instawell/processing/step_01_ingest_data.py: 94%
86 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-07 15:47 -0600
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-07 15:47 -0600
1import json
2import logging
3from collections import defaultdict
5import pandas as pd
7from instawell.core.data_models import Replicate, UniqueCondition
8from instawell.core.exp_context import ExperimentContext
9from instawell.core.parser import condition_from_string
10from instawell.core.steps import StepFiles
11from instawell.utils.logging_util import setup_experiment_logging
13logger = logging.getLogger(__name__)
def _parse_layout(layout_df: pd.DataFrame, replicates: set[str]) -> None:
    """
    Collect every distinct, non-missing layout cell value into ``replicates``.

    Modifies ``replicates`` in place and returns nothing.

    Columns whose label starts with "well" (in any letter case) are treated as
    well-identifier columns and skipped. Missing cells (NaN/None) are dropped so
    that a blank layout cell never ends up in the set.

    Args:
        layout_df: Layout table; every non-well column is scanned.
        replicates: Set that receives the distinct cell values.
    """
    for col in layout_df.columns:
        # str() keeps non-string labels (e.g. integer plate columns) from crashing.
        if str(col).lower().startswith("well"):
            continue
        replicates.update(layout_df[col].dropna().unique())
def _parse_conditions(
    layout_df: pd.DataFrame,
    replicates: set[str],
    experiment_info: dict[str, UniqueCondition],
    ctx: ExperimentContext,
) -> None:
    """
    Parse every layout cell into a condition and record the well that holds it.

    For each row and each non-well column, the cell text is parsed with
    ``condition_from_string`` (using ``ctx.condition_separator`` and
    ``ctx.fields``). A ``Replicate`` describing the well is appended to the
    condition stored under the parsed condition's ``full_name``.

    Both ``experiment_info`` and ``replicates`` are modified in place:
    conditions are added to ``experiment_info`` and each parsed ``full_name``
    is removed from ``replicates``. Whatever is left in ``replicates`` at the
    end is reported as a possible layout error.

    Cells that are missing, empty, or equal to ``ctx.empty_condition_mask`` are
    skipped. Cells that fail to parse (``ValueError``) are logged and skipped.

    Args:
        layout_df: Layout table with one well-identifier column and one column
            per plate column.
        replicates: Distinct cell values still waiting to be matched.
        experiment_info: Mapping from condition full name to its condition object.
        ctx: Experiment configuration (separator, fields, well column, masks).
    """
    well_prefix = ctx.well_col_identifier.strip().lower()

    # Decide once which columns hold conditions instead of re-testing per row.
    condition_columns = [
        col for col in layout_df.columns if not str(col).lower().startswith(well_prefix)
    ]

    # Columns are skipped case-insensitively, so read the well row the same way:
    # prefer the exact identifier, otherwise the first column matching the prefix.
    well_column = ctx.well_col_identifier
    if well_column not in layout_df.columns:
        well_column = next(
            (col for col in layout_df.columns if str(col).lower().startswith(well_prefix)),
            well_column,
        )

    for index, row in layout_df.iterrows():
        for col in condition_columns:
            condition_str = row[col]

            # Skip empty or placeholder conditions
            if condition_str in {ctx.empty_condition_mask, ""}:
                continue
            if pd.isna(condition_str):
                continue

            try:
                condition_obj = condition_from_string(
                    condition_str,
                    delimiter=ctx.condition_separator,
                    fields=ctx.fields,
                    include_replicates=False,  # We'll add replicates manually
                )
            except ValueError as e:
                logger.warning(
                    "Failed to parse condition '%s' in row %s, column %s: %s",
                    condition_str,
                    index,
                    col,
                    e,
                )
                continue

            # Create a replicate for this well
            well_row = row[well_column]
            replicate = Replicate(
                well_row=well_row,
                well_column=str(col),
                well_name=f"{well_row}{col}",
            )

            full_name = condition_obj.full_name

            # Register the condition the first time it is seen, even when its
            # full name differs from the raw cell text; otherwise the parsed
            # object would be lost and a blank entry (or KeyError) used instead.
            if full_name not in experiment_info:
                experiment_info[full_name] = condition_obj
            replicates.discard(full_name)
            experiment_info[full_name].replicates.append(replicate)

    # Check for conditions with only one replicate
    for condition, info in experiment_info.items():
        if len(info.replicates) == 1:
            logger.warning("Condition %s has only one replicate. Check for typos!", condition)

    # Check if all replicates are accounted for
    # NOTE(review): cells are skipped using ctx.empty_condition_mask above, but
    # the leftover check uses ctx.empty_condition_placeholder - confirm that both
    # attributes exist and are meant to differ.
    if not replicates or replicates == {ctx.empty_condition_placeholder}:
        logger.info("All replicates accounted for in the layout data.")
    else:
        logger.warning(
            "There may be an error with layout data: %s. Please check the layout data.",
            replicates,
        )
def _save_experiment_info(
    experiment_info: dict[str, UniqueCondition], ctx: ExperimentContext
) -> None:
    """
    Write the experiment info to ``experiment_info.json`` in the experiment directory.

    The experiment directory (``ctx.experiment_dir``) is created if needed. A
    ``.gitignore`` containing ``*`` is created there when none exists; an
    existing one is left untouched.

    Args:
        experiment_info: Mapping from condition name to an object exposing
            ``model_dump()``; the dumped values are written as JSON.
        ctx: Experiment configuration; only ``experiment_dir`` is read.
    """
    experiment_dir = ctx.experiment_dir
    experiment_dir.mkdir(parents=True, exist_ok=True)

    # Create .gitignore in experiment directory, but never clobber an edited one.
    gitignore_path = experiment_dir / ".gitignore"
    if not gitignore_path.exists():
        gitignore_path.write_text("*\n", encoding="utf-8")

    # Serialize fully before touching the file so that a failure here cannot
    # leave a truncated or half-written experiment_info.json behind.
    payload = json.dumps(
        {name: condition.model_dump() for name, condition in experiment_info.items()},
        indent=4,
    )
    info_path = experiment_dir / "experiment_info.json"
    info_path.write_text(payload, encoding="utf-8")
def _get_unique_conditions(
    layout_df: pd.DataFrame,
    ctx: ExperimentContext,
) -> dict[str, UniqueCondition]:
    """
    Build the condition table for a layout and save it to disk.

    Runs the helpers in order: ``_parse_layout`` gathers the distinct cell
    values, ``_parse_conditions`` turns them into conditions with their
    replicates, and ``_save_experiment_info`` writes the result out.

    Args:
        layout_df: Layout table to scan.
        ctx: Experiment configuration passed through to the helpers.

    Returns:
        The mapping from condition name to condition object that was saved.
    """
    pending_names: set[str] = set()
    conditions = defaultdict(UniqueCondition)

    _parse_layout(layout_df, pending_names)
    _parse_conditions(layout_df, pending_names, conditions, ctx)
    _save_experiment_info(conditions, ctx)

    return conditions
def _initial_raw_data_organize(
    initial_raw_data: pd.DataFrame,
    experiment_info: dict[str, UniqueCondition],
    ctx: ExperimentContext,
) -> pd.DataFrame:
    """
    Reshape the wide raw data into long form and annotate each well.

    The raw table is melted on ``ctx.temperature_column`` into ``well`` /
    ``value`` rows, and the temperature column is renamed to ``"Temperature"``.
    Every row then receives ``ligand``, ``protein``, ``buffer`` and
    ``concentration`` from the condition whose replicates include that well,
    plus a ``well_unqcond`` label joined with ``ctx.condition_separator`` in the
    order well, concentration, ligand, protein, buffer.

    Wells that belong to no condition get missing values in all of these
    columns. The columns are always present, even when ``experiment_info`` is
    empty.

    Args:
        initial_raw_data: Wide table with one temperature column and one
            column per well.
        experiment_info: Mapping from condition name to condition object; each
            is read for its field values and ``replicates[*].well_name``.
        ctx: Experiment configuration (temperature column, separator).

    Returns:
        The annotated long-form DataFrame.
    """
    raw_data_long = initial_raw_data.melt(
        id_vars=[ctx.temperature_column], var_name="well", value_name="value"
    )
    # The output always exposes the temperature column under the name "Temperature".
    if ctx.temperature_column != "Temperature":
        raw_data_long = raw_data_long.rename(columns={ctx.temperature_column: "Temperature"})

    # One well -> value lookup per output column. If a well is listed under
    # several conditions, the condition iterated last wins.
    lookups: dict[str, dict] = {
        "ligand": {},
        "protein": {},
        "buffer": {},
        "concentration": {},
    }
    for info in experiment_info.values():
        for rep in info.replicates:
            well_name = rep.well_name
            lookups["ligand"][well_name] = info.ligand_name
            lookups["protein"][well_name] = info.protein_name
            lookups["buffer"][well_name] = info.buffer_condition
            lookups["concentration"][well_name] = info.concentration

    # Mapping creates every column unconditionally; wells without a condition
    # become missing values. Object dtype keeps the all-missing case usable in
    # the string concatenation below.
    for column, lookup in lookups.items():
        raw_data_long[column] = raw_data_long["well"].map(lookup).astype(object)

    sep = ctx.condition_separator
    label = raw_data_long["well"]
    for column in ("concentration", "ligand", "protein", "buffer"):
        label = label + sep + raw_data_long[column]
    raw_data_long["well_unqcond"] = label
    return raw_data_long
def ingest_data(
    ctx: ExperimentContext,
) -> None:
    """
    Run the first step of the data processing pipeline.

    Reads the raw data and layout CSV files named by ``ctx``, derives the
    unique conditions from the layout, reshapes the raw data into long form
    annotated with those conditions, and writes the result to
    ``StepFiles.INGESTED_DATA`` inside the experiment directory.

    Args:
        ctx: ExperimentContext containing experiment configuration, can be from
            setup_experiment() or from load_experiment_context()

    Examples:
        >>> ingest_data(ctx)  # doctest: +SKIP
    """
    # Optional file logging goes to experiment.log in the experiment directory.
    if ctx.log_to_file:
        setup_experiment_logging(
            experiment_dir=ctx.experiment_dir, filename="experiment.log", level=ctx.log_level
        )

    raw_frame = pd.read_csv(ctx.raw_data_path)
    layout_frame = pd.read_csv(ctx.layout_data_path)

    # Conditions come from the layout; the raw data is then organized around them.
    conditions = _get_unique_conditions(layout_frame, ctx)
    long_frame = _initial_raw_data_organize(raw_frame, conditions, ctx)

    # Persist the organized table as CSV in the experiment directory.
    output_path = ctx.experiment_dir / StepFiles.INGESTED_DATA
    long_frame.to_csv(output_path, index=False)

    logger.info(f"Organized data saved to {output_path}")