Coverage for /home/deng/Projects/metatree_drawer/treeprofiler_algo/pastml/pastml/visualisation/itol_manager.py: 17%

96 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2024-03-21 09:19 +0100

1import logging 

2import os 

3from pathlib import Path 

4 

5import pandas as pd 

6 

7from pastml.ml import is_marginal, MARGINAL_PROBABILITIES, MODEL 

8from pastml import METHOD, CHARACTER, STATES 

9from pastml.visualisation.colour_generator import get_enough_colours, parse_colours 

10from itolapi import Itol 

11 

12from pastml.tree import DATE 

13 

14STYLE_FILE_HEADER_TEMPLATE = """DATASET_STYLE 

15 

16SEPARATOR TAB 

17DATASET_LABEL {column} ({method}) 

18COLOR #ffffff 

19 

20LEGEND_COLORS {colours} 

21LEGEND_LABELS {states} 

22LEGEND_SHAPES {shapes} 

23LEGEND_TITLE {column} ({method}) 

24 

25DATA 

26#NODE_ID TYPE NODE COLOR SIZE_FACTOR LABEL_OR_STYLE 

27""" 

28 

29COLORSTRIP_FILE_HEADER_TEMPLATE = """DATASET_COLORSTRIP 

30 

31SEPARATOR TAB 

32BORDER_WIDTH 0.5 

33MARGIN 5 

34STRIP_WIDTH 25 

35COLOR #ffffff 

36DATASET_LABEL {column} 

37 

38LEGEND_COLORS {colours} 

39LEGEND_LABELS {states} 

40LEGEND_SHAPES {shapes} 

41LEGEND_TITLE {column} 

42 

43DATA 

44#NODE_ID COLOR LABEL_OR_STYLE 

45""" 

46 

47POPUP_FILE_HEADER = """POPUP_INFO 

48 

49SEPARATOR TAB 

50 

51DATA 

52#NODE_ID POPUP_TITLE POPUP_CONTENT 

53""" 

54 

55POPUP_CONTENT_TEMPLATE = "<b>{key}: </b>" \ 

56 "<div style='overflow:auto;max-width:50vw;'>" \ 

57 "<span style='white-space:nowrap;'>{value}</span></div>" 

58 

59DEFAULT_ITOL_PROJECT = 'Sample project' 

60 

61 

62def generate_itol_annotations(column2states, work_dir, acrs, state_df, date_col, 

63 tree_path, itol_id=None, itol_project=None, itol_tree_name=None, 

64 column2colours=None): 

65 annotation_files = [] 

66 popup_file = os.path.join(work_dir, 'iTOL_popup_info.txt') 

67 with open(popup_file, 'w+') as pf: 

68 pf.write(POPUP_FILE_HEADER) 

69 

70 state_df['itol_type'] = 'branch' 

71 state_df['itol_node'] = 'node' 

72 state_df['itol_style'] = 'normal' 

73 state_df['itol_size'] = 2 

74 

75 for acr_result in acrs: 

76 column = acr_result[CHARACTER] 

77 states = acr_result[STATES] 

78 num_unique_values = len(states) 

79 if column2colours and column in column2colours: 

80 colours = parse_colours(column2colours[column], states) 

81 else: 

82 colours = get_enough_colours(num_unique_values) 

83 value2colour = dict(zip(states, colours)) 

84 style_file = os.path.join(work_dir, 'iTOL_style-{}.txt'.format(column)) 

85 with open(style_file, 'w+') as sf: 

86 sf.write(STYLE_FILE_HEADER_TEMPLATE 

87 .format(column=column, colours='\t'.join(colours), states='\t'.join(states), 

88 shapes='\t'.join(['1'] * len(states)), 

89 method='{}{}'.format(acr_result[METHOD], 

90 ('+{}'.format(acr_result[MODEL].name) if MODEL in acr_result else '')))) 

91 col_df = state_df[state_df[column].apply(len) == 1] 

92 col_df['itol_label'] = col_df[column].apply(lambda _: next(iter(_))) 

93 col_df['itol_colour'] = col_df['itol_label'].apply(lambda _: value2colour[_]) 

94 col_df[['itol_type', 'itol_node', 'itol_colour', 'itol_size', 'itol_style']].to_csv(style_file, sep='\t', 

95 header=False, mode='a') 

96 annotation_files.append(style_file) 

97 logging.getLogger('pastml').debug('Generated iTol style file for {}: {}.'.format(column, style_file)) 

98 

99 colorstrip_file = os.path.join(work_dir, 'iTOL_colorstrip-{}.txt'.format(column)) 

100 with open(colorstrip_file, 'w+') as csf: 

101 csf.write(COLORSTRIP_FILE_HEADER_TEMPLATE.format(column=column, colours='\t'.join(colours), 

102 states='\t'.join(states), 

103 shapes='\t'.join(['1'] * len(states)))) 

104 col_df[['itol_colour', 'itol_label']].to_csv(colorstrip_file, sep='\t', header=False, mode='a') 

105 annotation_files.append(colorstrip_file) 

106 logging.getLogger('pastml').debug('Generated iTol colorstrip file for {}: {}.'.format(column, colorstrip_file)) 

107 

108 state_df = state_df[list(column2states.keys()) + ['dist', DATE]] 

109 for c in column2states.keys(): 

110 state_df[c] = state_df[c].apply(lambda _: ' or '.join(sorted(_))) 

111 state_df.columns = ['ACR {} predicted state'.format(c) for c in column2states.keys()] + ['Node dist', date_col] 

112 state_df['Node id'] = state_df.index 

113 

114 for acr_result in acrs: 

115 if is_marginal(acr_result[METHOD]): 

116 df = acr_result[MARGINAL_PROBABILITIES] 

117 state_df.loc[df.index.map(str), 

118 'ACR {character} marginal probabilities'.format(character=acr_result[CHARACTER])] = \ 

119 df.apply(lambda vs: ', '.join(('{}: {:g}'.format(c, mp) for (c, mp) in zip(df.columns, vs))), 

120 axis=1) 

121 cols = sorted(state_df.columns, reverse=True) 

122 state_df['popup_info'] = \ 

123 state_df[cols].apply(lambda vs: '<br>'.join(((POPUP_CONTENT_TEMPLATE 

124 if c.startswith('ACR ') else '<b>{key}: </b>: {value}') 

125 .format(key=c, value=v) for (c, v) in zip(cols, vs) 

126 if not pd.isna(v))), 

127 axis=1) 

128 state_df['label'] = 'ACR results' 

129 state_df[['label', 'popup_info']].to_csv(popup_file, sep='\t', mode='a', header=False) 

130 annotation_files.append(popup_file) 

131 logging.getLogger('pastml').debug('Generated iTol pop-up file: {}.'.format(popup_file)) 

132 

133 if itol_id: 

134 if not itol_project: 

135 logging.getLogger('pastml').debug('Trying "{}" as iTOL project, the tree will be uploaded to. ' 

136 'To upload to a different project, use itol_project argument.' 

137 .format(DEFAULT_ITOL_PROJECT)) 

138 if not itol_tree_name: 

139 itol_tree_name = os.path.splitext(os.path.basename(tree_path))[0] 

140 tree_id, web_page = upload_to_itol(tree_path, annotation_files, tree_name=itol_tree_name, 

141 tree_description=None, project_name=itol_project, upload_id=itol_id) 

142 if web_page: 

143 with open(os.path.join(work_dir, 'iTOL_url.txt'), 'w+') as f: 

144 f.write(web_page) 

145 if tree_id: 

146 with open(os.path.join(work_dir, 'iTOL_tree_id.txt'), 'w+') as f: 

147 f.write(tree_id) 

148 else: 

149 logging.getLogger('pastml').debug('To upload your tree(s) to iTOL, please specify the itol_id argument.') 

150 

151 

152def upload_to_itol(tree_path, dataset_paths, tree_name=None, tree_description=None, project_name=None, upload_id=None): 

153 try: 

154 itol_uploader = Itol() 

155 itol_uploader.add_file(Path(tree_path)) 

156 for annotation_file in dataset_paths: 

157 itol_uploader.add_file(Path(annotation_file)) 

158 if tree_name: 

159 itol_uploader.params['treeName'] = tree_name 

160 if tree_description: 

161 itol_uploader.params['treeDescription'] = tree_description 

162 if upload_id: 

163 itol_uploader.params['uploadID'] = upload_id 

164 if project_name: 

165 itol_uploader.params['projectName'] = project_name 

166 if itol_uploader.upload(): 

167 logging.getLogger('pastml').debug( 

168 'Successfully uploaded your tree ({}) to iTOL: {}.'.format(itol_uploader.comm.tree_id, 

169 itol_uploader.get_webpage())) 

170 return itol_uploader.comm.tree_id, itol_uploader.get_webpage() 

171 else: 

172 status = itol_uploader.comm.upload_output 

173 except Exception as e: 

174 status = e 

175 logging.getLogger('pastml').error( 

176 'Failed to upload your tree to iTOL because of "{}". Please check your internet connection and iTOL settings{}.' 

177 .format(status, 

178 (', e.g. your iTOL batch upload id ({}){}' 

179 .format(upload_id, 

180 (' and whether the project {} exists'.format(project_name) if project_name else ''))) 

181 if upload_id else '')) 

182 return None, None