Source code for adataviz.tools

import os
import numpy as np
import pandas as pd
import anndata
import scanpy as sc
import scanpy.external as sce
from .utils import normalize_mc_by_cell, parse_gtf
from concurrent.futures import ProcessPoolExecutor, as_completed
from loguru import logger

def merge_adata_regions(pseudobulk_adata_path, bin_size=5000, use_raw=True,
                        res=100000, filter_chroms=True, boundary=None,
                        exclude_chroms=['chrY', 'chrM', 'chrX']):
    """
    Aggregate a 5kb RNA or ATAC adata into 100kb (25kb, or 10kb) bins.

    Parameters
    ----------
    pseudobulk_adata_path : str or anndata.AnnData
        Path to a pseudobulk .h5ad file, or an AnnData object.
    bin_size : int
        Size of the input bins in bp (default 5000).
    use_raw : bool
        If True, aggregate adata.raw counts and convert the sums to log(CPM).
    res : int
        Target resolution: 100000, 25000, or 10000 (default 100000).
    filter_chroms : bool
        If True, drop bins on excluded or non-canonical chromosomes.
    boundary : bool or None
        If True, report the 10kb boundaries between 25kb bins instead of the
        bins themselves; defaults to True when res == 25000.
    exclude_chroms : list
        Chromosomes to exclude when filter_chroms is True.

    Returns
    -------
    pd.DataFrame
        Rows are cell types, columns are the aggregated bins.
    """

    def assign_boundary(indexes, names, list_x):
        """
        For example, the 1st 25kb bin covers 5kb bins 0,1,2,3,4 and the 2nd
        covers 0,1,2,3,4; the boundary between these two bins should be
        3,4,0,1.
        """
        name2index = {}
        boundary = []
        flag = True
        tmp_chrom = None
        tmp_name = ''
        for x, idx, name in zip(list_x, indexes, names):
            chrom = name.split('_')[0]
            if chrom != tmp_chrom:
                if len(boundary) > 0:
                    name2index[tmp_name] = tuple(boundary)
                boundary = []
                flag = True
            if x == 0 or x == 1:
                boundary.append(idx)
                if x == 1:
                    if flag:
                        name2index[name] = tuple(boundary)
                        flag = False
                    if len(boundary) == 4:
                        name2index[name] = tuple(boundary)
            if x == 2:
                boundary = []
                tmp_chrom = chrom
            if x == 3 or x == 4:
                boundary.append(idx)
            tmp_chrom = chrom
            tmp_name = name
        return name2index

    if isinstance(pseudobulk_adata_path, str):
        adata = anndata.read_h5ad(os.path.expanduser(pseudobulk_adata_path))
    else:
        adata = pseudobulk_adata_path
    if use_raw:
        if adata.raw is not None:
            adata = adata.raw.to_adata()
        else:
            logger.warning("adata.raw is None!!")
    data = adata.to_df().T
    fea = "100kb" if res == 100000 else '10kb' if res == 10000 else '25kb'
    if boundary is None:
        boundary = True if fea == '25kb' else False
    groups = data.columns.tolist()
    data['chrom'] = data.index.to_series().apply(lambda x: x.split(':')[0])
    data['start'] = data.index.to_series().apply(
        lambda x: x.split(':')[1].split('-')[0]).map(int)
    data['BinID'] = data['start'].apply(lambda x: np.floor(x / bin_size)).astype(int)
    if not boundary:  # merge 5kb into 100kb or 10kb
        # index, for example: chr1_0, chr1_0, chr1_0, ..., chr1_1
        data[fea] = data.apply(lambda x: x['chrom'] + '_' + str(x['start'] // res), axis=1)
        data = data.loc[:, groups + [fea]].groupby(fea).sum().T
    else:  # for 25kb, get the domain boundary (10kb), not the 25kb windows
        logger.info("Generating results for boundaries of 25kb bins")
        ids = data['BinID'] % 5  # [0,1],2,[3,4,0,1],2,[3,4,0,1],...
        data[fea] = data.apply(lambda x: x['chrom'] + '_' + str(x['BinID'] // 5), axis=1)
        name2index = assign_boundary(data.index.tolist(), data[fea].tolist(), ids.tolist())
        idx2name = {}
        for name in name2index:
            for idx in name2index[name]:
                idx2name[idx] = name
        data['NAME'] = data.index.to_series().map(idx2name)
        data = data.loc[data['NAME'].notna()]
        data = data.loc[data['NAME'].apply(lambda x: x.split('_')[0]) == data.chrom]
        data = data.loc[:, groups + ['NAME']].groupby('NAME').sum().T
    if use_raw:  # convert the sum of raw counts into CPM
        adata = anndata.AnnData(X=data)
        sc.pp.normalize_total(adata, target_sum=1e6)
        sc.pp.log1p(adata)  # log(CPM)
        data = adata.to_df()
    if filter_chroms:
        s_col = data.columns.to_frame()
        s_col['chrom'] = s_col.index.to_series().apply(lambda x: x.split('_')[0])
        keep_cols = s_col.loc[s_col['chrom'].apply(
            lambda x: x not in exclude_chroms and len(x) < 6)].index.tolist()
        data = data.loc[:, keep_cols]
    # rows are cell types, columns are the aggregated bins
    # TODO: subset rows (df_bin.index) and columns (cell-type order)
    return data
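# Usage sketch (the .h5ad path below is hypothetical):
#   df_100kb = merge_adata_regions("pseudobulk_5kb.h5ad", res=100000)
#   df_bound = merge_adata_regions("pseudobulk_5kb.h5ad", res=25000)  # 10kb boundaries of 25kb bins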
def cal_stats(adata_path, obs1, modality="RNA", expression_cutoff=0,
              use_raw=False, normalize_per_cell=True,
              clip_norm_value=10, sum_only=False):
    raw_adata = anndata.read_h5ad(os.path.expanduser(adata_path), backed='r')
    adata = raw_adata[obs1.index.tolist(), :].to_memory()
    raw_adata.file.close()
    if modality not in ['RNA', 'ATAC']:  # methylation
        adata = normalize_mc_by_cell(
            use_adata=adata, normalize_per_cell=normalize_per_cell,
            clip_norm_value=clip_norm_value, hypo_score=False, verbose=0)
    else:
        if use_raw and adata.raw is not None:
            adata_raw = adata.raw[:, adata.var_names.tolist()].to_adata()
            adata.X = adata_raw[adata.obs_names.tolist(), adata.var_names.tolist()].X.copy()  # type: ignore
            del adata_raw
    df_data = adata.to_df()  # rows are cells, columns are genes
    # Compute per-gene statistics (min, q25, q50, q75, max, sum) across cells.
    # NumPy's nanpercentile and nansum are fast (quickselect under the hood).
    sums = np.nansum(df_data.values, axis=0)  # one value per column (gene)
    # fraction of cells expressing (or hypomethylated for) each gene
    if modality not in ['RNA', 'ATAC']:  # methylation, cutoff = 1
        # vectorized: count values < 1 per column, divided by the number of cells
        frac = (df_data < 1).sum(axis=0) / float(df_data.shape[0])
    else:  # RNA or ATAC
        # vectorized: count values > cutoff per column, divided by the number of cells
        frac = (df_data > expression_cutoff).sum(axis=0) / float(df_data.shape[0])
    if sum_only:
        return sums, frac, df_data.columns.tolist()
    qs = np.nanpercentile(df_data.values, [0, 25, 50, 75, 100], axis=0)
    std = np.nanstd(df_data.values, axis=0)
    return qs, sums, std, frac, df_data.columns.tolist()
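# Return shapes for a chunk of cells (obs1) across G genes:
#   sum_only=True  -> (sums[G], frac[G], gene_names)
#   sum_only=False -> (qs[5, G] with min/q25/q50/q75/max per row, sums[G],
#                      std[G], frac[G], gene_names)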
def cal_tpm(adata, target_sum=1e6, length_fillna=1000):
    assert 'length' in adata.var.columns.tolist(), \
        "For TPM normalization, gene length information is required in adata.var['length']; please provide gene_meta"
    adata.var.length.fillna(length_fillna, inplace=True)
    counts = adata.to_df()  # rows are cell types, columns are genes
    # RPK (Reads Per Kilobase): divide each gene's counts by its length in kb
    lengths_kb = (adata.var['length'] / 1000).apply(lambda x: max(x, 1)).to_dict()  # keys are genes
    rpk = counts.apply(lambda x: x / lengths_kb.get(x.name, 1))  # per column (gene)
    # "per million" scaling factor: sum of RPK per cell type (row)
    rpk_sum = rpk.sum(axis=1).to_dict()  # keys are cell types
    # TPM = (RPK / per-row RPK sum) * 1e6
    tpm = rpk.apply(lambda x: (x / rpk_sum.get(x.name, 1)) * target_sum, axis=1)
    # store log(TPM) in adata.X
    adata.X = tpm.apply(np.log1p).values
    adata.uns['Normalization'] = 'log(TPM)'
    return adata
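# Worked example of the TPM arithmetic above (numbers are illustrative):
#   a 2,000 bp gene -> 2 kb; raw count 10 -> RPK = 10 / 2 = 5
#   if the row's total RPK is 50,000 -> TPM = 5 / 50000 * 1e6 = 100
#   adata.X then stores log1p(100) ~= 4.615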
def scrna2pseudobulk(adata_path, downsample=2000,
                     obs_path=None, groupby="Group", use_raw=True,
                     n_jobs=1, normalization=None, target_sum=1e6,
                     gtf=None, save=None):
    assert use_raw == True, "For normalization (CPM or TPM), please set use_raw=True"
    raw_adata = anndata.read_h5ad(os.path.expanduser(adata_path), backed='r')
    if obs_path is not None:
        if isinstance(obs_path, str):
            obs = pd.read_csv(os.path.expanduser(obs_path), sep='\t', index_col=0)
        else:
            obs = obs_path.copy()
        overlapped_cells = list(set(raw_adata.obs_names.tolist()) & set(obs.index.tolist()))
        obs = obs.loc[overlapped_cells]
    else:
        obs = raw_adata.obs.copy()
    raw_adata.file.close()
    obs = obs.loc[obs[groupby].notna()]
    if downsample is not None:
        all_cells = obs.groupby(groupby).apply(
            lambda x: x.sample(downsample).index.tolist()
            if x.shape[0] > downsample else x.index.tolist()).sum()
    else:
        all_cells = obs.index.tolist()
    obs = obs.loc[all_cells]
    data = {}
    if n_jobs == -1:
        n_jobs = os.cpu_count()
    with ProcessPoolExecutor(n_jobs) as executor:
        futures = {}
        for group in obs[groupby].unique():
            obs1 = obs.loc[obs[groupby] == group]
            if obs1.shape[0] == 0:
                continue
            future = executor.submit(
                cal_stats, adata_path=adata_path, obs1=obs1,
                use_raw=use_raw, sum_only=True)
            futures[future] = group
        logger.debug(f"Submitted {len(futures)} groups for pseudobulk calculation.")
        for future in as_completed(futures):
            group = futures[future]
            logger.debug(group)
            sums, frac, header = future.result()
            if 'sum' not in data:
                data['sum'] = []
            data['sum'].append(pd.Series(sums, name=group, index=header))
            frac.name = group
            if 'frac' not in data:
                data['frac'] = []
            data['frac'].append(pd.Series(frac, name=group, index=header))
    X = pd.concat(data['sum'], axis=1).T  # sum of raw counts per group
    # the backed file is already closed, so count cells from obs
    vc = obs[groupby].value_counts().to_frame(name='cell_count')
    adata = anndata.AnnData(X=X, obs=vc.loc[X.index.tolist()])  # put the sums into adata.X
    adata.layers['frac'] = pd.concat(objs=data['frac'], axis=1).T
    del data
    adata.raw = adata.copy()
    if normalization is not None and use_raw:
        # calculate CPM or TPM (only valid because adata.X holds summed raw counts)
        logger.info(f"Normalizing pseudobulk adata using {normalization} method.")
        if gtf is not None:
            df_gene = parse_gtf(gtf=gtf)  # ['chrom','beg','end','gene_name','gene_id','strand','gene_type']
            # for genes with duplicated records, only keep the longest record
            df_gene['length'] = df_gene.end - df_gene.beg
            df_gene.sort_values('length', ascending=False, inplace=True)  # type: ignore
            df_gene.drop_duplicates('gene_name', keep='first', inplace=True)  # type: ignore
            df_gene.set_index('gene_name', inplace=True)
            for col in ['chrom', 'beg', 'end', 'strand', 'gene_type', 'gene_id', 'length']:
                adata.var[col] = adata.var_names.map(df_gene[col].to_dict())
        if normalization == 'CPM':  # for the new scRNA-seq pipeline, is CPM equal to TPM?
            sc.pp.normalize_total(adata, target_sum=target_sum)
            sc.pp.log1p(adata)  # log(CPM)
            adata.uns['Normalization'] = 'log(CPM)'
        else:  # TPM
            assert gtf is not None, "For TPM normalization, please provide a gtf file."
            adata = cal_tpm(adata, target_sum=target_sum, length_fillna=1000)
    vc_dict = vc.to_dict()['cell_count']
    adata.layers['mean'] = adata.to_df().apply(lambda x: x / vc_dict[x.name], axis=1)
    if save is not None:
        outdir = os.path.dirname(os.path.abspath(os.path.expanduser(save)))
        if not os.path.exists(outdir):
            os.makedirs(outdir, exist_ok=True)
        adata.write_h5ad(os.path.expanduser(save))
    else:
        return adata
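# Usage sketch (path and gtf are hypothetical):
#   adata = scrna2pseudobulk("rna.h5ad", groupby="Group", downsample=2000,
#                            normalization="TPM", gtf="gencode.v38.gtf")
#   adata.X               # log(TPM) per group
#   adata.raw.X           # summed raw counts
#   adata.layers['frac']  # fraction of cells expressing each gene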
def stat_pseudobulk(adata_path, downsample=2000,
                    obs_path=None, groupby="Group", use_raw=False, expression_cutoff=0,
                    modality="RNA", n_jobs=1, normalize_per_cell=True, clip_norm_value=10,
                    save=None):
    if modality not in ['RNA', 'ATAC']:  # methylation
        assert normalize_per_cell == True, "For methylation, normalize_per_cell should be True"
    raw_adata = anndata.read_h5ad(os.path.expanduser(adata_path), backed='r')
    if obs_path is not None:
        if isinstance(obs_path, str):
            obs = pd.read_csv(os.path.expanduser(obs_path), sep='\t', index_col=0)
        else:
            obs = obs_path.copy()
        overlapped_cells = list(set(raw_adata.obs_names.tolist()) & set(obs.index.tolist()))
        obs = obs.loc[overlapped_cells]
    else:
        obs = raw_adata.obs.copy()
    raw_adata.file.close()
    obs = obs.loc[obs[groupby].notna()]
    if downsample is not None:
        all_cells = obs.groupby(groupby).apply(
            lambda x: x.sample(downsample).index.tolist()
            if x.shape[0] > downsample else x.index.tolist()).sum()
    else:
        all_cells = obs.index.tolist()
    obs = obs.loc[all_cells]
    data = {}
    if n_jobs == -1:
        n_jobs = os.cpu_count()
    with ProcessPoolExecutor(n_jobs) as executor:
        futures = {}
        for group in obs[groupby].unique():
            obs1 = obs.loc[obs[groupby] == group]
            if obs1.shape[0] == 0:
                continue
            future = executor.submit(
                cal_stats, adata_path=adata_path, obs1=obs1, modality=modality,
                expression_cutoff=expression_cutoff,
                use_raw=use_raw, normalize_per_cell=normalize_per_cell,
                clip_norm_value=clip_norm_value)
            futures[future] = group
        logger.debug(f"Submitted {len(futures)} groups for pseudobulk calculation.")
        for future in as_completed(futures):
            group = futures[future]
            logger.debug(group)
            qs, sums, std, frac, header = future.result()
            for k, v in zip(['min', 'q25', 'q50', 'q75', 'max', 'sum', 'std'],
                            qs.tolist() + [sums.tolist(), std.tolist()]):
                if k not in data:
                    data[k] = []
                data[k].append(pd.Series(v, name=group, index=header))
            frac.name = group
            if 'frac' not in data:
                data['frac'] = []
            data['frac'].append(pd.Series(frac, name=group, index=header))
    X = pd.concat(data['sum'], axis=1).T  # sum of raw counts or normalized methylation fraction
    vc = obs[groupby].value_counts().to_frame(name='cell_count')
    adata = anndata.AnnData(X=X, obs=vc.loc[X.index.tolist()])  # put the sums into adata.X
    for k in data:
        if k == 'sum':
            continue
        adata.layers[k] = pd.concat(objs=data[k], axis=1).T
    del data
    vc_dict = vc.to_dict()['cell_count']
    adata.layers['mean'] = adata.to_df().apply(lambda x: x / vc_dict[x.name], axis=1)
    if save is not None:
        outdir = os.path.dirname(os.path.abspath(os.path.expanduser(save)))
        if not os.path.exists(outdir):
            os.makedirs(outdir, exist_ok=True)
        adata.write_h5ad(os.path.expanduser(save))
    else:
        return adata
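# Usage sketch (path is hypothetical; any modality outside RNA/ATAC,
# e.g. "mCH", is treated as methylation):
#   adata = stat_pseudobulk("mch.h5ad", groupby="Group", modality="mCH")
#   sorted(adata.layers)  # 'frac','max','mean','min','q25','q50','q75','std'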
def normalize_adata(adata, embedding=True, outfile=None,
                    n_top_genes=5000, n_pcs=50, batch_col=None):
    adata = load_adata(adata, backed=None)
    sc.pp.filter_genes(adata, min_cells=5)
    sc.pp.normalize_total(adata, target_sum=1e4)
    sc.pp.log1p(adata)
    if embedding:
        sc.pp.highly_variable_genes(adata, n_top_genes=n_top_genes)
        sc.tl.pca(adata)
        sc.pl.pca_variance_ratio(adata, n_pcs=n_pcs)  # log=True
        if batch_col is not None:
            sce.pp.harmony_integrate(adata, key=batch_col, basis='X_pca', max_iter_harmony=50)
            use_rep = 'X_pca_harmony'
        else:
            use_rep = 'X_pca'
        sc.pp.neighbors(adata, use_rep=use_rep)
        sc.tl.leiden(adata, resolution=1)
        sc.tl.umap(adata)
    if outfile is not None:
        outfile = os.path.expanduser(outfile)
        outdir = os.path.dirname(os.path.abspath(outfile))
        if not os.path.exists(outdir):
            os.makedirs(outdir, exist_ok=True)
        adata.write_h5ad(outfile)
    else:
        return adata
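# Usage sketch ("donor" is a hypothetical batch column):
#   adata = normalize_adata(adata, embedding=True, batch_col="donor")
# Without batch_col, neighbors/leiden/umap run on the plain X_pca representation.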
def export_pseudobulk_adata(adata, outdir="pseudobulk.bed", use_raw=False):
    """
    Export a pseudobulk adata to BED files, one per sample column.
    """
    outdir = os.path.expanduser(outdir)
    if not os.path.exists(outdir):
        os.makedirs(outdir, exist_ok=True)
    if isinstance(adata, str):
        adata = anndata.read_h5ad(os.path.expanduser(adata))
    if use_raw:
        data = adata.raw.to_adata().to_df().T  # raw counts
    else:
        data = adata.to_df().T  # CPM, log(CPM), ...
    if "chrom" in adata.var.columns.tolist():
        data.insert(0, "chrom", adata.var.loc[data.index.tolist(), "chrom"].tolist())
    else:
        data.insert(0, "chrom", data.index.to_series().apply(lambda x: x.split(':')[0]))
    beg = "beg" if "beg" in adata.var.columns.tolist() else "start"
    if beg in adata.var.columns.tolist():
        data.insert(1, "start", adata.var.loc[data.index.tolist(), beg].tolist())
    else:
        data.insert(1, "start", data.index.to_series().apply(lambda x: x.split(':')[1].split('-')[0]))
    if "end" in adata.var.columns.tolist():
        data.insert(2, "end", adata.var.loc[data.index.tolist(), "end"].tolist())
    else:
        data.insert(2, "end", data.index.to_series().apply(lambda x: x.split(':')[1].split('-')[1]))
    data.insert(3, "features", data.index.tolist())
    if "strand" in adata.var.columns.tolist():
        data.insert(4, "strand", adata.var.loc[data.index.tolist(), "strand"].tolist())
    else:
        data.insert(4, "strand", "+")
    data = data.loc[(data.chrom.notna()) & (data.start.notna()) & (data.end.notna())]
    data.start = data.start.astype(int)
    data.end = data.end.astype(int)
    data.sort_values(['chrom', 'start', 'end'], ascending=True, inplace=True)
    for col in data.columns.tolist()[4:]:
        if col in ['chrom', 'start', 'beg', 'end', 'strand', 'features']:
            continue
        data.loc[:, ['chrom', 'start', 'end', 'features', col, 'strand']].to_csv(
            os.path.join(outdir, f"{col.replace(' ', '_')}.bed"),
            sep='\t', index=False, header=False)
        df = data.loc[:, ['features', col]]
        df.to_csv(os.path.join(outdir, f"{col.replace(' ', '_')}.txt"),
                  sep='\t', index=False, header=False)
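# Output layout (one pair of files per sample column; names are hypothetical):
#   pseudobulk.bed/L2-3_IT.bed  ->  chrom  start  end  features  value  strand
#   pseudobulk.bed/L2-3_IT.txt  ->  features  value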
def load_adata(adata, backed='r'):
    if isinstance(adata, str):
        adata = anndata.read_h5ad(os.path.expanduser(adata), backed=backed)
    else:
        assert isinstance(adata, anndata.AnnData)
    return adata
def load_obs(obs):
    if isinstance(obs, str) and not obs.endswith('.h5ad'):
        obs_path = os.path.abspath(os.path.expanduser(obs))
        sep = '\t' if obs_path.endswith('.tsv') or obs_path.endswith('.txt') else ','
        obs = pd.read_csv(obs_path, index_col=0, sep=sep)
    elif isinstance(obs, str) and obs.endswith('.h5ad'):
        adata = anndata.read_h5ad(os.path.expanduser(obs), backed='r')
        obs = adata.obs.copy()
        adata.file.close()
    else:
        assert isinstance(obs, pd.DataFrame)
    return obs
def load_color_palette(palette_path=None, adata=None, groups=[]):
    # read the color palette
    if isinstance(groups, str):
        groups = [groups]
    assert isinstance(groups, list)
    color_palette = {}
    if palette_path is not None and os.path.exists(os.path.expanduser(palette_path)):
        palette_path = os.path.abspath(os.path.expanduser(palette_path))
        D = pd.read_excel(palette_path, sheet_name=None, index_col=0)
        for group_col in groups:
            if group_col in D:
                color_palette[group_col] = D[group_col].Hex.to_dict()
            else:
                assert '+' in group_col, f"{group_col} not found in the palette file."
                for group in group_col.split('+'):
                    assert group in D, f"{group} not found in the palette file."
                    color_palette[group] = D[group].Hex.to_dict()
    else:
        # if palette_path is not provided, extract colors from adata.uns
        if adata is None:
            return None
        for group_col in groups:
            if f'{group_col}_colors' in adata.uns:
                group_series = adata.obs[group_col]
                if not pd.api.types.is_categorical_dtype(group_series):
                    group_series = group_series.astype('category')
                color_palette[group_col] = {
                    cluster: color for cluster, color in zip(
                        group_series.cat.categories.tolist(),
                        adata.uns[f'{group_col}_colors'])}
    if len(color_palette) == 0:
        return None
    if len(groups) == 1:
        return color_palette[groups[0]]
    return color_palette
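# Usage sketch: colors come either from an Excel palette (one sheet per
# annotation level with a "Hex" column) or from adata.uns['<col>_colors']:
#   palette = load_color_palette("palette.xlsx", groups=["Class", "Region"])
#   class2hex = load_color_palette(None, adata=adata, groups="Class")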
def get_obs(adata_path, add_coord=True, usecols=None, index_name='cell', outfile=None):
    adata = anndata.read_h5ad(os.path.expanduser(adata_path), backed='r')
    obs = adata.obs.copy()
    obs.index.name = index_name
    if add_coord:
        for coord in ["umap", 'tsne']:
            if f'X_{coord}' not in adata.obsm:
                continue
            df_coord = pd.DataFrame(adata.obsm[f'X_{coord}'],
                                    columns=[f'{coord}_0', f'{coord}_1'],
                                    index=adata.obs_names)
            obs[f'{coord}_0'] = obs.index.to_series().map(df_coord[f'{coord}_0'].to_dict())
            obs[f'{coord}_1'] = obs.index.to_series().map(df_coord[f'{coord}_1'].to_dict())
    adata.file.close()
    if usecols is not None:
        obs = obs.loc[:, usecols]
    if outfile is not None:
        obs.to_csv(os.path.expanduser(outfile), sep='\t')
        return outfile
    return obs.reset_index()
def downsample_adata(adata_path, groupby="Group", obs_path=None,
                     outfile="Group.downsample_1500.h5ad", downsample=1500):
    adata_path = os.path.expanduser(adata_path)
    outfile = os.path.expanduser(outfile)
    adata = anndata.read_h5ad(adata_path, backed='r')
    if obs_path is not None:
        if isinstance(obs_path, str):
            obs = pd.read_csv(os.path.expanduser(obs_path), sep='\t', index_col=0)
        else:
            obs = obs_path.copy()
        overlapped_cells = list(set(adata.obs_names.tolist()) & set(obs.index.tolist()))
        obs = obs.loc[overlapped_cells]
    else:
        obs = adata.obs.copy()
    keep_cells = obs.loc[obs[groupby].notna()].groupby(groupby).apply(
        lambda x: x.sample(downsample).index.tolist()
        if x.shape[0] > downsample else x.index.tolist()).sum()
    adata[keep_cells, :].write_h5ad(outfile, compression='gzip',
                                    convert_strings_to_categoricals=False)
    adata.file.close()
def prepare_color_palette(color_dict=None, outpath="palette.xlsx"):
    """
    Generate an .xlsx file containing all color palettes, one sheet per key.

    Parameters
    ----------
    color_dict : dict
        A dict of dicts; keys are categorical terms, values map category
        names to HEX color codes.
    outpath : str
        Path of the output .xlsx file.
    """
    outpath = os.path.expanduser(outpath)
    writer = pd.ExcelWriter(outpath)
    for key in color_dict:
        data = pd.DataFrame.from_dict(color_dict[key], orient='index', columns=['Hex'])
        data.to_excel(writer, sheet_name=key, index=True)
        workbook = writer.book
        worksheet = writer.sheets[key]
        colors = data.Hex.tolist()
        for i in range(data.shape[0]):
            # render each HEX code with its own color as the cell background
            color = colors[i]
            f = workbook.add_format({'bold': True, 'font_color': 'black', 'bg_color': color})
            worksheet.write(i + 1, 1, color, f)
        width = 20
        cell_fmt = workbook.add_format(
            {'bold': False, 'font_color': 'black',
             'align': 'center', 'valign': 'vcenter'})
        worksheet.set_column(0, 1, width, cell_fmt)
    writer.close()
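# Expected input structure (hypothetical values):
#   color_dict = {"Class": {"Excitatory": "#1f77b4", "Inhibitory": "#ff7f0e"}}
#   prepare_color_palette(color_dict, outpath="palette.xlsx")
# Each key becomes one worksheet, with the HEX codes rendered as cell colors.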
def get_color_palette(adata, groupby="Group"):
    adata = load_adata(adata)
    # collect the color palette from adata.obs
    color_dict = {}
    for col in ['Neighborhood', 'Class', 'Subclass', 'Group']:
        D = adata.obs.reset_index().loc[:, [col, f"color_hex_{col.lower()}"]].drop_duplicates().set_index(col)[
            f"color_hex_{col.lower()}"].to_dict()
        color_dict[col] = D
    prepare_color_palette(color_dict=color_dict, outpath="color_palette.xlsx")
def composition(obs, groupby, stratify_col=None, composition_col="Region",
                outname=None, parent_col=None,
                sort_cols=None, adata=None, color_palette=None):
    # For example: regional composition within each cell type
    # (for each cell type in each donor).
    from xlsxwriter.utility import xl_col_to_name
    obs = load_obs(obs)
    obs[groupby] = obs[groupby].astype(str)
    if sort_cols is not None:
        obs.sort_values(sort_cols, inplace=True)
    cell_type_order = obs[groupby].unique().tolist()
    if parent_col is not None:
        group2parent = obs.loc[:, [groupby, parent_col]].drop_duplicates().set_index(groupby)[parent_col].to_dict()
    if outname is None:
        outname = f"{composition_col}_composition.xlsx"
    if not outname.endswith('.xlsx'):
        outname = outname + '.xlsx'
    if adata is not None:
        adata = load_adata(adata)
    groups = [parent_col, groupby, composition_col] if parent_col is not None else [groupby, composition_col]
    if color_palette is not None:
        color_palette = load_color_palette(palette_path=color_palette, adata=adata, groups=groups)
    writer = pd.ExcelWriter(outname)
    if stratify_col is not None:  # such as donor
        stratify_order = obs[stratify_col].unique().tolist()
    else:
        stratify_order = []
    for stratify in ['All'] + stratify_order:
        if stratify == 'All':
            df = obs.groupby(groupby)[composition_col].value_counts(normalize=True).unstack()
        else:
            df = obs.loc[obs[stratify_col] == stratify].groupby(groupby)[
                composition_col].value_counts(normalize=True).unstack()
        rows = [ct for ct in cell_type_order if ct in df.index.tolist()]
        df = df.loc[rows]
        if parent_col is not None:
            df.reset_index(inplace=True)
            df.insert(0, parent_col, df[groupby].map(group2parent))
            df.set_index([parent_col, groupby], inplace=True)
        df = df.applymap(lambda x: 100 * x)  # type: ignore
        df.to_excel(writer, sheet_name=stratify)
        workbook = writer.book
        worksheet = writer.sheets[stratify]
        if parent_col is not None:
            parent_values = df.index.get_level_values(0).tolist()
            if color_palette is not None:
                parent_colors = [color_palette[parent_col][ct] for ct in parent_values]
            group_values = df.index.get_level_values(1).tolist()
        else:
            group_values = df.index.tolist()
        regions = df.columns.tolist()
        if color_palette is not None:
            group_colors = [color_palette[groupby][ct] for ct in group_values]
            composition_colors = [color_palette[composition_col][r] for r in regions]
        for i in range(df.shape[0]):
            group_color = group_colors[i] if color_palette is not None else 'black'
            f2 = workbook.add_format({'bold': True, 'font_color': group_color, 'bg_color': 'white'})
            if parent_col is not None:
                parent_color = parent_colors[i] if color_palette is not None else 'white'
                f1 = workbook.add_format({'bold': True, 'font_color': 'black', 'bg_color': parent_color})
                worksheet.write(i + 1, 0, parent_values[i], f1)
                worksheet.write(i + 1, 1, group_values[i], f2)
            else:
                worksheet.write(i + 1, 0, group_values[i], f2)
        for i in range(df.shape[1]):
            composition_color = composition_colors[i] if color_palette is not None else 'black'
            f1 = workbook.add_format({'bold': True, 'font_color': composition_color, 'bg_color': 'white'})
            if parent_col is not None:
                worksheet.write(0, i + 2, regions[i], f1)
            else:
                worksheet.write(0, i + 1, regions[i], f1)
        end = str(xl_col_to_name(df.shape[1])) + str(df.shape[0] + 1)
        start = "B2" if parent_col is None else "C2"
        mid_value = 1 / df.shape[1]
        worksheet.conditional_format(f'{start}:{end}', {
            'type': '3_color_scale',
            'min_color': "#3abf99", 'min_value': 0,
            'mid_color': "white", 'mid_value': mid_value,
            'max_color': "#c72228", 'max_value': 1})
    writer.close()
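# Usage sketch (column names are hypothetical):
#   composition(obs, groupby="Subclass", parent_col="Class",
#               composition_col="Region", stratify_col="Donor",
#               color_palette="palette.xlsx")
# Writes an "All" sheet plus one sheet per donor; cells hold the percentage of
# each region within each cell type, shaded with a 3-color scale.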
def taxonomy(obs, levels=['Neighborhood', 'Class', 'Subclass', 'Group'],
             groupby="Region", outfile=None):
    """
    Summarize the cell-type taxonomy together with the distribution of
    `groupby` (e.g., Region) within the finest level.

    Parameters
    ----------
    obs : path or dataframe
        Path to an obs file (tsv, csv, or h5ad) or an obs dataframe.
    levels : list
        Ordered cell-type annotation columns, from coarse to fine.
    groupby : str
        Column whose value counts are reported for each group at the
        finest level.
    outfile : str or None
        Output .xlsx path; defaults to "{level}.{groupby}_taxonomy.xlsx".
    """
    obs = load_obs(obs)
    level = levels[-1]
    D = obs.groupby(level).apply(lambda x: str(x[groupby].value_counts().to_dict())).to_dict()
    obs1 = obs.reset_index().loc[:, levels].drop_duplicates()
    obs1[f'{groupby}.Distribution'] = obs1[level].map(D)
    obs1.sort_values(levels, inplace=True)  # type: ignore
    if outfile is None:
        outfile = f"{level}.{groupby}_taxonomy.xlsx"
    obs1.to_excel(os.path.expanduser(outfile), index=False)
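# Usage sketch:
#   taxonomy(obs, levels=['Class', 'Subclass', 'Group'], groupby="Region")
# yields one row per unique (Class, Subclass, Group) path, with the region
# value counts of each Group serialized into the "Region.Distribution" column.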
def get_markers_worker(adata1, obs, level, df_gene, key, outdir, topn, downsample):
    if not os.path.exists(os.path.join(outdir, f"{key}.tsv")):
        use_cells = adata1.obs.groupby(level).apply(
            lambda x: x.sample(downsample).index.tolist()
            if x.shape[0] > downsample else x.index.tolist()).sum()
        use_adata = adata1[use_cells, :].to_memory()
        if obs is not None:
            use_adata.obs = obs.loc[use_cells, :].copy()
        vc = use_adata.obs[level].value_counts()
        groups = vc[vc >= 3].index.tolist()
        # The Wilcoxon rank-sum test is a non-parametric test that compares the
        # ranks of gene expression values between groups. It works best with raw
        # counts, because normalization can alter the rank distribution.
        sc.tl.rank_genes_groups(use_adata, groupby=level, groups=groups,
                                method="wilcoxon", use_raw=False, key_added=key)
        markers = sc.get.rank_genes_groups_df(use_adata, group=groups, key=key)
        markers = markers.loc[~markers.names.isna()]
        markers = markers.loc[(~markers.logfoldchanges.isna()) & (markers.scores > 0)
                              & (markers.pvals < 0.01) & (markers.logfoldchanges > 1)]
        if markers.shape[0] == 0:
            return {}
        markers.sort_values('logfoldchanges', ascending=False, inplace=True)
        markers.to_csv(os.path.join(outdir, f"{key}.tsv"), sep='\t', index=False)
    else:
        markers = pd.read_csv(os.path.join(outdir, f"{key}.tsv"), sep='\t')
    markers.names = markers.names.apply(lambda x: x.split('.')[0])
    markers = markers.loc[markers.names.isin(df_gene['gene_name'].tolist())].groupby('group').apply(
        lambda x: x.head(topn).names.tolist()).to_dict()
    return markers
def get_markers(adata_path, levels, gtf, obs=None, outdir='markers', topn=20, downsample=2000):
    outdir = os.path.expanduser(outdir)
    if not os.path.exists(outdir):
        os.makedirs(outdir, exist_ok=True)
    adata = load_adata(adata_path)
    if obs is not None:
        obs = load_obs(obs)
        overlapped_cells = list(set(adata.obs_names.tolist()) & set(obs.index.tolist()))
        obs = obs.loc[overlapped_cells]
        for level in levels:
            adata.obs[level] = adata.obs.index.to_series().map(obs[level].to_dict())
    else:
        obs = adata.obs.copy()
    df_gene = parse_gtf(gtf=gtf)  # ['chrom','beg','end','gene_name','gene_id','strand','gene_type']
    # differentially expressed genes (DEG) at each level of the hierarchy
    R = []
    for i in range(len(levels)):
        level = levels[i]
        logger.info(level)
        if i == 0:
            parent = ''
            markers = get_markers_worker(adata, obs, level, df_gene, level, outdir, topn, downsample)
            for k in markers:  # k is a cell type at this level
                R.append([level, parent, k, markers[k]])  # append topn marker genes
        else:  # i >= 1
            for clusters, df1 in adata.obs.groupby(levels[:i], observed=True):
                if df1.shape[0] == 0:
                    continue
                if df1[level].nunique() < 2:
                    continue
                logger.info(clusters)
                adata1 = adata[df1.index.tolist(), :].to_memory()
                parent = clusters[-1] if isinstance(clusters, tuple) else clusters
                key = parent + '|' + level
                markers = get_markers_worker(adata1, obs, level, df_gene, key, outdir, topn, downsample)
                if len(markers) == 0:
                    continue
                for k in markers:
                    R.append([level, parent, k, markers[k]])
    if adata.isbacked:
        adata.file.close()
    data = pd.DataFrame(R, columns=['Level', 'Parent', 'CellType', 'Markers'])
    data.to_excel(os.path.join(outdir, "markers.xlsx"), index=False, sheet_name='markers')
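# Usage sketch (hypothetical inputs): rank markers at each level, conditioned
# on the parent level (e.g. Group markers within each Subclass):
#   get_markers("rna.h5ad", levels=['Class', 'Subclass', 'Group'],
#               gtf="gencode.v38.gtf", outdir="markers", topn=20)
# Per-comparison "<parent>|<level>.tsv" tables are cached in outdir and reused.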