Source code for spacr.core

import os, gc, torch, time, random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display
import warnings
from scipy import ndimage
warnings.filterwarnings("ignore", message="3D stack used, but stitch_threshold=0 and do_3D=False, so masks are made per plane only")

[docs] def preprocess_generate_masks(settings): #from .timelapse import _summarise_object_relationships from .object import generate_cellpose_masks, generate_organelle_masks from .io import preprocess_img_data, _load_and_concatenate_arrays, convert_to_yokogawa, convert_separate_files_to_yokogawa from .plot import plot_image_mask_overlay, plot_arrays from .utils import _pivot_counts_table, check_mask_folder, adjust_cell_masks, print_progress, save_settings, delete_intermedeate_files, format_path_for_system, normalize_src_path, generate_image_path_map, copy_images_to_consolidated from .settings import set_default_settings_preprocess_generate_masks, _set_organelle_defaults if 'src' in settings: if not isinstance(settings['src'], (str, list)): ValueError(f'src must be a string or a list of strings') return else: ValueError(f'src is a required parameter') return settings['src'] = normalize_src_path(settings['src']) if settings['consolidate']: image_map = generate_image_path_map(settings['src']) copy_images_to_consolidated(image_map, settings['src']) settings['src'] = os.path.join(settings['src'], 'consolidated') if isinstance(settings['src'], str): settings['src'] = [settings['src']] if isinstance(settings['src'], list): source_folders = settings['src'] for source_folder in source_folders: print(f'Processing folder: {source_folder}') source_folder = format_path_for_system(source_folder) settings['src'] = source_folder src = source_folder settings = set_default_settings_preprocess_generate_masks(settings) settings = _set_organelle_defaults(settings) if settings['metadata_type'] == 'auto': if settings['custom_regex'] != None: try: print(f"using regex: {settings['custom_regex']}") convert_separate_files_to_yokogawa(folder=source_folder, regex=settings['custom_regex']) except: try: convert_to_yokogawa(folder=source_folder) except Exception as e: print(f"Error: Tried to convert image files and image file name metadata with regex {settings['custom_regex']} then without regex but failed both.") print(f'Error: {e}') return else: try: convert_to_yokogawa(folder=source_folder) except Exception as e: print(f"Error: Tried to convert image files and image file name metadata without regex but failed.") print(f'Error: {e}') return if settings['cell_channel'] is None and settings['nucleus_channel'] is None and settings['pathogen_channel'] is None: print(f'Error: At least one of cell_channel, nucleus_channel or pathogen_channel must be defined') return save_settings(settings, name='gen_mask_settings') if not settings['pathogen_channel'] is None: custom_model_ls = ['toxo_pv_lumen','toxo_cyto'] if settings['pathogen_model'] not in custom_model_ls: ValueError(f'Pathogen model must be {custom_model_ls} or None') if settings['timelapse']: settings['randomize'] = False if settings['preprocess']: if not settings['masks']: print(f'WARNING: channels for mask generation are defined when preprocess = True') if isinstance(settings['save'], bool): settings['save'] = [settings['save']]*3 if settings['verbose']: settings_df = pd.DataFrame(list(settings.items()), columns=['setting_key', 'setting_value']) settings_df['setting_value'] = settings_df['setting_value'].apply(str) display(settings_df) if settings['test_mode']: print(f'Starting Test mode ...') if settings['preprocess']: settings, src = preprocess_img_data(settings) files_to_process = 3 files_processed = 0 if settings['masks']: mask_src = os.path.join(src, 'masks') if settings['cell_channel'] != None: time_ls=[] if check_mask_folder(src, 'cell_mask_stack'): start = time.time() generate_cellpose_masks(mask_src, settings, 'cell') stop = time.time() duration = (stop - start) time_ls.append(duration) files_processed += 1 print_progress(files_processed, files_to_process, n_jobs=1, time_ls=time_ls, batch_size=None, operation_type=f'cell_mask_gen') if settings['nucleus_channel'] != None: time_ls=[] if check_mask_folder(src, 'nucleus_mask_stack'): start = time.time() generate_cellpose_masks(mask_src, settings, 'nucleus') stop = time.time() duration = (stop - start) time_ls.append(duration) files_processed += 1 print_progress(files_processed, files_to_process, n_jobs=1, time_ls=time_ls, batch_size=None, operation_type=f'nucleus_mask_gen') if settings['pathogen_channel'] != None: time_ls=[] if check_mask_folder(src, 'pathogen_mask_stack'): start = time.time() generate_cellpose_masks(mask_src, settings, 'pathogen') stop = time.time() duration = (stop - start) time_ls.append(duration) files_processed += 1 print_progress(files_processed, files_to_process, n_jobs=1, time_ls=time_ls, batch_size=None, operation_type=f'pathogen_mask_gen') if settings['organelle'] != None: if check_mask_folder(src, 'organelle_mask_stack'): start = time.time() generate_organelle_masks(mask_src, settings, 'organelle') stop = time.time() duration = (stop - start) time_ls.append(duration) files_processed += 1 print_progress(files_processed, files_to_process, n_jobs=1, time_ls=time_ls, batch_size=None, operation_type=f'organelle_mask_gen') if settings['adjust_cells']: if not settings['timelapse']: if settings['pathogen_channel'] != None and settings['cell_channel'] != None and settings['nucleus_channel'] != None: start = time.time() cell_folder = os.path.join(mask_src, 'cell_mask_stack') nuclei_folder = os.path.join(mask_src, 'nucleus_mask_stack') parasite_folder = os.path.join(mask_src, 'pathogen_mask_stack') organelle_folder = os.path.join(mask_src, 'organelle_mask_stack') print(f'Adjusting cell masks with nuclei and pathogen masks') adjust_cell_masks(parasite_folder, cell_folder, nuclei_folder, organelle_folder, overlap_threshold=5, perimeter_threshold=30, n_jobs=settings['n_jobs']) stop = time.time() adjust_time = (stop-start)/60 print(f'Cell mask adjustment: {adjust_time} min.') if os.path.exists(os.path.join(src,'measurements')): _pivot_counts_table(db_path=os.path.join(src,'measurements', 'measurements.db')) #Concatenate stack with masks _load_and_concatenate_arrays(src, settings.get('channels'), settings.get('cell_channel'), settings.get('nucleus_channel'), settings.get('pathogen_channel'), settings.get('organelle_channel')) if settings['plot']: if not settings['timelapse']: if settings['test_mode'] == True: settings['examples_to_plot'] = len(os.path.join(src,'merged')) try: merged_src = os.path.join(src,'merged') files = os.listdir(merged_src) random.shuffle(files) time_ls = [] for i, file in enumerate(files): start = time.time() if i+1 <= settings['examples_to_plot']: file_path = os.path.join(merged_src, file) plot_image_mask_overlay(file_path, settings['channels'], settings['cell_channel'], settings['nucleus_channel'], settings['pathogen_channel'], figuresize=10, percentiles=(1,99), thickness=3, save_pdf=True) stop = time.time() duration = stop-start time_ls.append(duration) files_processed = i+1 files_to_process = settings['examples_to_plot'] print_progress(files_processed, files_to_process, n_jobs=1, time_ls=time_ls, batch_size=None, operation_type="Plot mask outlines") except Exception as e: print(f'Failed to plot image mask overly. Error: {e}') else: plot_arrays(src=os.path.join(src,'merged'), figuresize=settings['figuresize'], cmap=settings['cmap'], nr=settings['examples_to_plot'], normalize=settings['normalize'], q1=1, q2=99) torch.cuda.empty_cache() gc.collect() if settings['delete_intermediate']: print(f"deleting intermediate files") delete_intermedeate_files(settings) print("Successfully completed run") return
[docs] def generate_image_umap(settings={}, return_fig=False): """ Generate UMAP or tSNE embedding and visualize the data with clustering. Parameters: settings (dict): Dictionary containing the following keys: src (str): Source directory containing the data. row_limit (int): Limit the number of rows to process. tables (list): List of table names to read from the database. visualize (str): Visualization type. image_nr (int): Number of images to display. dot_size (int): Size of dots in the scatter plot. n_neighbors (int): Number of neighbors for UMAP. figuresize (int): Size of the figure. black_background (bool): Whether to use a black background. remove_image_canvas (bool): Whether to remove the image canvas. plot_outlines (bool): Whether to plot outlines. plot_points (bool): Whether to plot points. smooth_lines (bool): Whether to smooth lines. verbose (bool): Whether to print verbose output. embedding_by_controls (bool): Whether to use embedding from controls. col_to_compare (str): Column to compare for control-based embedding. pos (str): Positive control value. neg (str): Negative control value. clustering (str): Clustering method ('DBSCAN' or 'KMeans'). exclude (list): List of columns to exclude from the analysis. plot_images (bool): Whether to plot images. reduction_method (str): Dimensionality reduction method ('UMAP' or 'tSNE'). save_figure (bool): Whether to save the figure as a PDF. Returns: pd.DataFrame: DataFrame with the original data and an additional column 'cluster' containing the cluster identity. """ from .io import _read_and_join_tables from .utils import get_db_paths, preprocess_data, reduction_and_clustering, remove_noise, generate_colors, correct_paths, plot_embedding, plot_clusters_grid, cluster_feature_analysis, map_condition from .settings import set_default_umap_image_settings settings = set_default_umap_image_settings(settings) if isinstance(settings['src'], str): settings['src'] = [settings['src']] if settings['plot_images'] is False: settings['black_background'] = False if settings['color_by']: settings['remove_cluster_noise'] = False settings['plot_outlines'] = False settings['smooth_lines'] = False print(f'Generating Image UMAP ...') settings_df = pd.DataFrame(list(settings.items()), columns=['Key', 'Value']) settings_dir = os.path.join(settings['src'][0],'settings') settings_csv = os.path.join(settings_dir,'embedding_settings.csv') os.makedirs(settings_dir, exist_ok=True) settings_df.to_csv(settings_csv, index=False) display(settings_df) db_paths = get_db_paths(settings['src']) tables = settings['tables'] + ['png_list'] all_df = pd.DataFrame() for i,db_path in enumerate(db_paths): df = _read_and_join_tables(db_path, table_names=tables) df, image_paths_tmp = correct_paths(df, settings['src'][i]) all_df = pd.concat([all_df, df], axis=0) #image_paths.extend(image_paths_tmp) all_df['cond'] = all_df['columnID'].apply(map_condition, neg=settings['neg'], pos=settings['pos'], mix=settings['mix']) if settings['exclude_conditions']: if isinstance(settings['exclude_conditions'], str): settings['exclude_conditions'] = [settings['exclude_conditions']] row_count_before = len(all_df) all_df = all_df[~all_df['cond'].isin(settings['exclude_conditions'])] if settings['verbose']: print(f'Excluded {row_count_before - len(all_df)} rows after excluding: {settings["exclude_conditions"]}, rows left: {len(all_df)}') if settings['row_limit'] is not None: all_df = all_df.sample(n=settings['row_limit'], random_state=42) image_paths = all_df['png_path'].to_list() if settings['embedding_by_controls']: # Extract and reset the index for the column to compare col_to_compare = all_df[settings['col_to_compare']].reset_index(drop=True) print(col_to_compare) #if settings['only_top_features']: # column_list = None # Preprocess the data to obtain numeric data numeric_data = preprocess_data(all_df, settings['filter_by'], settings['remove_highly_correlated'], settings['log_data'], settings['exclude']) # Convert numeric_data back to a DataFrame to align with col_to_compare numeric_data_df = pd.DataFrame(numeric_data) # Ensure numeric_data_df and col_to_compare are properly aligned numeric_data_df = numeric_data_df.reset_index(drop=True) # Assign the column back to numeric_data_df numeric_data_df[settings['col_to_compare']] = col_to_compare # Subset the dataframe based on specified column values for controls positive_control_df = numeric_data_df[numeric_data_df[settings['col_to_compare']] == settings['pos']].copy() negative_control_df = numeric_data_df[numeric_data_df[settings['col_to_compare']] == settings['neg']].copy() control_numeric_data_df = pd.concat([positive_control_df, negative_control_df]) # Drop the comparison column from numeric_data_df and control_numeric_data_df numeric_data_df = numeric_data_df.drop(columns=[settings['col_to_compare']]) control_numeric_data_df = control_numeric_data_df.drop(columns=[settings['col_to_compare']]) # Convert numeric_data_df and control_numeric_data_df back to numpy arrays numeric_data = numeric_data_df.values control_numeric_data = control_numeric_data_df.values # Train the reducer on control data _, _, reducer = reduction_and_clustering(control_numeric_data, settings['n_neighbors'], settings['min_dist'], settings['metric'], settings['eps'], settings['min_samples'], settings['clustering'], settings['reduction_method'], settings['verbose'], n_jobs=settings['n_jobs'], mode='fit', model=False) # Apply the trained reducer to the entire dataset numeric_data = preprocess_data(all_df, settings['filter_by'], settings['remove_highly_correlated'], settings['log_data'], settings['exclude']) embedding, labels, _ = reduction_and_clustering(numeric_data, settings['n_neighbors'], settings['min_dist'], settings['metric'], settings['eps'], settings['min_samples'], settings['clustering'], settings['reduction_method'], settings['verbose'], n_jobs=settings['n_jobs'], mode=None, model=reducer) else: if settings['resnet_features']: # placeholder for resnet features, not implemented yet pass #numeric_data, embedding, labels = generate_umap_from_images(image_paths, settings['n_neighbors'], settings['min_dist'], settings['metric'], settings['clustering'], settings['eps'], settings['min_samples'], settings['n_jobs'], settings['verbose']) else: # Apply the trained reducer to the entire dataset numeric_data = preprocess_data(all_df, settings['filter_by'], settings['remove_highly_correlated'], settings['log_data'], settings['exclude']) embedding, labels, _ = reduction_and_clustering(numeric_data, settings['n_neighbors'], settings['min_dist'], settings['metric'], settings['eps'], settings['min_samples'], settings['clustering'], settings['reduction_method'], settings['verbose'], n_jobs=settings['n_jobs']) if settings['remove_cluster_noise']: # Remove noise from the clusters (removes -1 labels from DBSCAN) embedding, labels = remove_noise(embedding, labels) # Plot the results if settings['color_by']: if settings['embedding_by_controls']: labels = all_df[settings['color_by']] else: labels = all_df[settings['color_by']] # Generate colors for the clusters colors = generate_colors(len(np.unique(labels)), settings['black_background']) # Plot the embedding umap_plt = plot_embedding(embedding, image_paths, labels, settings['image_nr'], settings['img_zoom'], colors, settings['plot_by_cluster'], settings['plot_outlines'], settings['plot_points'], settings['plot_images'], settings['smooth_lines'], settings['black_background'], settings['figuresize'], settings['dot_size'], settings['remove_image_canvas'], settings['verbose']) if settings['plot_cluster_grids'] and settings['plot_images']: grid_plt = plot_clusters_grid(embedding, labels, settings['image_nr'], image_paths, colors, settings['figuresize'], settings['black_background'], settings['verbose']) # Save figure as PDF if required if settings['save_figure']: results_dir = os.path.join(settings['src'][0], 'results') os.makedirs(results_dir, exist_ok=True) reduction_method = settings['reduction_method'].upper() embedding_path = os.path.join(results_dir, f'{reduction_method}_embedding.pdf') umap_plt.savefig(embedding_path, format='pdf') print(f'Saved {reduction_method} embedding to {embedding_path} and grid to {embedding_path}') if settings['plot_cluster_grids'] and settings['plot_images']: grid_path = os.path.join(results_dir, f'{reduction_method}_grid.pdf') grid_plt.savefig(grid_path, format='pdf') print(f'Saved {reduction_method} embedding to {embedding_path} and grid to {grid_path}') # Add cluster labels to the dataframe if len(labels) > 0: all_df['cluster'] = labels else: all_df['cluster'] = 1 # Assign a default cluster label print("No clusters found. Consider reducing 'min_samples' or increasing 'eps' for DBSCAN.") # Save the results to a CSV file results_dir = os.path.join(settings['src'][0], 'results') results_csv = os.path.join(results_dir,'embedding_results.csv') os.makedirs(results_dir, exist_ok=True) all_df.to_csv(results_csv, index=False) print(f'Results saved to {results_csv}') if settings['analyze_clusters']: combined_results = cluster_feature_analysis(all_df) results_dir = os.path.join(settings['src'][0], 'results') cluster_results_csv = os.path.join(results_dir,'cluster_results.csv') os.makedirs(results_dir, exist_ok=True) combined_results.to_csv(cluster_results_csv, index=False) print(f'Cluster results saved to {cluster_results_csv}') fig = umap_plt.gcf() if hasattr(umap_plt, "gcf") else plt.gcf() # (saving CSVs etc. unchanged) if return_fig: return fig return all_df
[docs] def generate_screen_graphs(settings): """ Generate screen graphs for different measurements in a given source directory. Args: src (str or list): Path(s) to the source directory or directories. tables (list): List of tables to include in the analysis (default: ['cell', 'nucleus', 'pathogen', 'cytoplasm']). graph_type (str): Type of graph to generate (default: 'bar'). summary_func (str or function): Function to summarize data (default: 'mean'). y_axis_start (float): Starting value for the y-axis (default: 0). error_bar_type (str): Type of error bar to use ('std' or 'sem') (default: 'std'). theme (str): Theme for the graph (default: 'pastel'). representation (str): Representation for grouping (default: 'well'). Returns: figs (list): List of generated figures. results (list): List of corresponding result DataFrames. """ from .plot import spacrGraph from .io import _read_and_merge_data from.utils import annotate_conditions if isinstance(settings['src'], str): srcs = [settings['src']] else: srcs = settings['src'] all_df = pd.DataFrame() figs = [] results = [] for src in srcs: db_loc = [os.path.join(src, 'measurements', 'measurements.db')] # Read and merge data from the database df, _ = _read_and_merge_data(db_loc, settings['tables'], verbose=True, nuclei_limit=settings['nuclei_limit'], pathogen_limit=settings['pathogen_limit']) # Annotate the data df = annotate_conditions(df, cells=settings['cells'], cell_loc=None, pathogens=settings['controls'], pathogen_loc=settings['controls_loc'], treatments=None, treatment_loc=None) # Calculate recruitment metric df['recruitment'] = df['pathogen_channel_1_mean_intensity'] / df['cytoplasm_channel_1_mean_intensity'] # Combine with the overall DataFrame all_df = pd.concat([all_df, df], ignore_index=True) # Generate individual plot plotter = spacrGraph(df, grouping_column='pathogen', data_column='recruitment', graph_type=settings['graph_type'], summary_func=settings['summary_func'], y_axis_start=settings['y_axis_start'], error_bar_type=settings['error_bar_type'], theme=settings['theme'], representation=settings['representation']) plotter.create_plot() fig = plotter.get_figure() results_df = plotter.get_results() # Append to the lists figs.append(fig) results.append(results_df) # Generate plot for the combined data (all_df) plotter = spacrGraph(all_df, grouping_column='pathogen', data_column='recruitment', graph_type=settings['graph_type'], summary_func=settings['summary_func'], y_axis_start=settings['y_axis_start'], error_bar_type=settings['error_bar_type'], theme=settings['theme'], representation=settings['representation']) plotter.create_plot() fig = plotter.get_figure() results_df = plotter.get_results() figs.append(fig) results.append(results_df) # Save figures and results for i, fig in enumerate(figs): res = results[i] if i < len(srcs): source = srcs[i] else: source = srcs[0] # Ensure the destination folder exists dst = os.path.join(source, 'results') print(f"Savings results to {dst}") os.makedirs(dst, exist_ok=True) # Save the figure and results DataFrame fig.savefig(os.path.join(dst, f"figure_controls_{i}_{settings['representation']}_{settings['summary_func']}_{settings['graph_type']}.pdf"), format='pdf') res.to_csv(os.path.join(dst, f"results_controls_{i}_{settings['representation']}_{settings['summary_func']}_{settings['graph_type']}.csv"), index=False) return