Coverage for src/driada/gdrive/download.py: 39.81%

103 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-25 15:40 +0300

1from os.path import join 

2 

3from pydrive2.drive import GoogleDrive 

4import warnings 

5import wget 

6import gdown 

7import pandas as pd 

8import shutil 

9from pathlib import Path 

10 

11from .gdrive_utils import * 

12from ..utils.output import * 

13 

14 

def retrieve_relevant_ids(folder,
                          name_part,
                          prohibited_name_part='',
                          whitelist=(),
                          extensions=('.csv', '.xlsx')):
    """Recursively collect ``(id, name)`` pairs of matching files in a Drive folder.

    The folder page is fetched with ``client.get`` and parsed with
    ``parse_google_drive_file``; sub-folders (children whose type equals
    ``folder_type``) are searched recursively via ``folders_url + child_id``.

    A file is selected when its name is in ``whitelist``, or when all of the
    following hold:

    * ``name_part`` is a substring of the file name;
    * ``extensions`` is empty, or the file suffix is one of ``extensions``;
    * ``prohibited_name_part`` is empty/``None``, or is not a substring of
      the file name.

    Parameters
    ----------
    folder : str
        URL of the folder page to request.
    name_part : str
        Substring that a file name must contain.
    prohibited_name_part : str or None
        Substring that a file name must not contain. ``''`` and ``None``
        both disable this filter.
    whitelist : sequence of str
        File names that are always selected.
    extensions : sequence of str
        Allowed file suffixes (with the leading dot); empty means "any".

    Returns
    -------
    (bool, list or None)
        ``(False, None)`` if the folder page did not answer with HTTP 200.
        Otherwise ``(return_code, relevant)``; ``return_code`` is ``False``
        when a recursive call failed, in which case ``relevant`` holds only
        what was collected before the failure.

    Raises
    ------
    MemoryError
        If the folder lists more than ``MAX_NUMBER_FILES`` children.
    """
    folder_page = client.get(folder)
    if folder_page.status_code != 200:
        return False, None

    _, id_name_type_iter = parse_google_drive_file(
        folder,
        folder_page.text,
    )

    # Materialise exactly once: if the parser hands back a one-shot iterator,
    # measuring its length would otherwise exhaust it before the loop below.
    children = list(id_name_type_iter)
    if len(children) > MAX_NUMBER_FILES:
        raise MemoryError(
            f'The folder {folder} has {len(children)} elements while max allowed number of files is {MAX_NUMBER_FILES}')

    return_code = True
    relevant = []
    for child_id, child_name, child_type in children:
        if child_type == folder_type:
            return_code, rel_sublist = retrieve_relevant_ids(folders_url + child_id,
                                                             name_part,
                                                             prohibited_name_part=prohibited_name_part,
                                                             whitelist=whitelist,
                                                             extensions=extensions)
            if not return_code:
                print(f'recursive search broke on folder {child_id}')
                break
            relevant.extend(rel_sublist)
            continue

        if child_name in whitelist:
            relevant.append((child_id, child_name))
            continue

        if name_part not in child_name:
            continue
        if extensions and Path(child_name).suffix not in extensions:
            continue
        # An empty string is a substring of every name, so it must be treated
        # as "no prohibition"; otherwise the default value rejects everything.
        if prohibited_name_part and prohibited_name_part in child_name:
            continue

        relevant.append((child_id, child_name))

    return return_code, relevant

60 

61 

def download_part_of_folder(
        output,
        folder,
        key='',
        antikey=None,
        whitelist=(),
        extensions=('.csv', '.xlsx', '.npz'),
        via_pydrive=False,
        gauth=None,
        maxfiles=None):
    """Download the files of a Google Drive folder whose names match ``key``.

    Two back-ends are available:

    * ``via_pydrive=False`` (default): matching files are located with
      ``retrieve_relevant_ids`` (which applies ``key``, ``antikey``,
      ``whitelist`` and ``extensions``) and fetched with ``gdown.download``.
    * ``via_pydrive=True``: the direct children of the folder are listed
      through ``GoogleDrive(gauth)`` and every file whose title contains
      ``key`` is fetched. In this branch only ``key`` and ``maxfiles`` are
      applied; ``antikey``, ``whitelist`` and ``extensions`` are not used.

    Parameters
    ----------
    output : str
        Directory for the downloaded data; created if missing.
    folder : str
        Share link to the Google Drive folder.
    key : str
        Part of the file name to search for.
    antikey : str or None
        Part of the file name to suppress (non-pydrive branch only).
    whitelist : sequence of str
        File names downloaded regardless of ``key`` (non-pydrive branch only).
    extensions : sequence of str
        Allowed file extensions (non-pydrive branch only).
    via_pydrive : bool
        Use pydrive. It requires authorization, but can download a big
        number of files.
    gauth : object or None
        Authentication object handed to ``GoogleDrive``; required when
        ``via_pydrive`` is true.
    maxfiles : int or None
        Pydrive branch only: the file listing is truncated to this many
        entries *before* filtering by ``key``.

    Returns
    -------
    (bool, list, Capturing)
        ``return_code``, the list of ``(id, name)`` pairs that were
        downloaded, and the object bound by ``with Capturing()``.

    Raises
    ------
    ValueError
        If ``via_pydrive`` is true and ``gauth`` is ``None``.
    FileNotFoundError
        If ``retrieve_relevant_ids`` reports failure.
    """
    os.makedirs(output, exist_ok=True)

    with Capturing() as load_log:
        if via_pydrive:
            if gauth is None:
                raise ValueError('To use pydrive, you need to authenticate using one of the functions'
                                 ' in driada.gdrive.auth')
            drive = GoogleDrive(gauth)

            rel = []
            fid = id_from_link(folder)
            file_list = drive.ListFile({'q': f"'{fid}' in parents and trashed=false"}).GetList()
            if maxfiles is not None:
                file_list = file_list[:maxfiles]

            for f in file_list:
                if key in f['title']:
                    f.GetContentFile(join(output, f['title']))
                    rel.append((f['id'], f['title']))

            return_code = True

        else:
            return_code, rel = retrieve_relevant_ids(folder,
                                                     key,
                                                     prohibited_name_part=antikey,
                                                     whitelist=whitelist,
                                                     extensions=extensions)

            if not return_code:
                raise FileNotFoundError('Error in downloading procedure!')

            for idx, name in rel:
                gdown.download(id=idx, output=os.path.join(output, name))

    return return_code, rel, load_log

112 

113 

def download_gdrive_data(data_router,
                         expname,
                         whitelist=('Timing.xlsx',),
                         via_pydrive=False,
                         data_pieces=None,
                         tdir='DRIADA data',
                         gauth=None):
    """Download all data pieces of one experiment listed in the data router.

    The row of ``data_router`` whose ``'Эксперимент'`` column equals
    ``expname`` is looked up; every requested column of that row that holds
    a link (a string containing ``'http'``) is downloaded into
    ``<tdir>/<expname>/<column>`` with ``download_part_of_folder``, using
    ``expname`` as the file-name key.

    Parameters
    ----------
    data_router : pandas.DataFrame
        Router table; must contain the ``'Эксперимент'`` column.
    expname : str
        Experiment name to look up; also the file-name key.
    whitelist : sequence of str
        File names downloaded regardless of ``expname``.
    via_pydrive : bool
        Forwarded to ``download_part_of_folder``.
    data_pieces : list of str or None
        Router columns to download. ``None`` selects every column except
        the descriptive ones ('Эксперимент', 'Краткое описание', 'Video',
        'Aligned data', 'Computation results').
    tdir : str
        Root directory for downloaded data.
    gauth : object or None
        Forwarded to ``download_part_of_folder``.

    Returns
    -------
    (bool, Capturing)
        ``success`` is ``True`` if at least one data piece yielded files;
        the second element is the object bound by ``with Capturing()``,
        extended with the logs returned by ``download_part_of_folder``.
    """
    with Capturing() as load_log:
        print('-------------------------------------------------------------')
        print(f'Extracting data for {expname} from Google Drive')
        print('-------------------------------------------------------------')

        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=DeprecationWarning)

            success = False
            available_exp = data_router['Эксперимент'].values
            if expname not in available_exp:
                print(f'{expname} not found in available experiments: {available_exp}')
                return success, load_log

            row = data_router[data_router['Эксперимент'] == expname]
            links = dict(zip(row.columns, row.values[0]))

            os.makedirs(join(tdir, expname), exist_ok=True)
            if data_pieces is None:
                data_pieces = [d for d in list(data_router.columns.values) if d not in ['Эксперимент', 'Краткое описание', 'Video', 'Aligned data', 'Computation results']]

            for key in data_pieces:
                link = links[key]
                # Empty cells come through as NaN (a float); a substring test
                # on a non-string would raise TypeError, so skip them.
                if not (isinstance(link, str) and 'http' in link):
                    continue

                print(f'Loading data: {key}...')
                ddir = join(tdir, expname, key)
                os.makedirs(ddir, exist_ok=True)
                return_code, rel, folder_log = download_part_of_folder(ddir,
                                                                       link,
                                                                       key=expname,
                                                                       whitelist=whitelist,
                                                                       via_pydrive=via_pydrive,
                                                                       gauth=gauth)

                load_log.extend(folder_log)

                if len(rel) == 0:
                    try:
                        os.rmdir(ddir)
                    except OSError:
                        # The directory already holds files (it may predate
                        # this call, since it is created with exist_ok=True);
                        # keep it rather than abort the remaining pieces.
                        pass
                    print('No relevant data found at: ', link)

                else:
                    loaded_names = [r[1] for r in rel]
                    for n in loaded_names:
                        print(n)
                    success = True

                print('--------------------------')

    return success, load_log

171 

172 

def initialize_iabs_router(root='\\content'):
    """Download a fresh copy of the IABS data router and load it.

    The spreadsheet is exported from Google Sheets as xlsx into
    ``<root>/IABS data router.xlsx`` (an existing copy is removed first),
    read with ``pandas.read_excel``, and empty strings are replaced and the
    table forward-filled.

    NOTE(review): the default ``root`` is the literal string ``\\content``
    (backslash + "content"); presumably a Colab-style ``/content`` directory
    was intended - confirm before changing.

    Parameters
    ----------
    root : str
        Directory in which the router file is stored; created if missing.

    Returns
    -------
    (pandas.DataFrame, list)
        The router table and the list of its column names excluding
        'Эксперимент', 'Краткое описание', 'Video', 'Aligned data' and
        'Computation results'.
    """
    global_data_table_url = 'https://docs.google.com/spreadsheets/d/130DDFAoAbmm0jcKLBF6xsWsQLDr2Zsj4cPuOYivXoM8/export?format=xlsx'
    router_name = 'IABS data router.xlsx'

    os.makedirs(root, exist_ok=True)
    router_path = join(root, router_name)

    # Remove a stale copy so the download lands under the expected file name.
    existing_entries = os.listdir(root)
    if router_name in existing_entries:
        os.remove(router_path)

    wget.download(global_data_table_url, out=router_path)

    raw_table = pd.read_excel(router_path)
    without_blanks = raw_table.replace("", None)
    data_router = without_blanks.ffill()

    # Columns that describe the experiment rather than point to a data piece.
    descriptive_columns = ['Эксперимент',
                           'Краткое описание',
                           'Video',
                           'Aligned data',
                           'Computation results']
    data_pieces = []
    for column in list(data_router.columns.values):
        if column not in descriptive_columns:
            data_pieces.append(column)

    return data_router, data_pieces