Coverage for src/driada/gdrive/upload.py: 0.00%
37 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-25 15:40 +0300
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-25 15:40 +0300
1import os.path
2from datetime import datetime
3import pytz
4from pydrive2.drive import GoogleDrive
6from .gdrive_utils import *
7from .download import retrieve_relevant_ids
9def get_datetime():
10 tz = pytz.timezone('Europe/Moscow')
11 now = datetime.now(tz)
13 dt_string = now.strftime("%d-%m-%Y %H:%M:%S")
14 return dt_string
17def save_file_to_gdrive(data_router,
18 expname,
19 path_to_file,
20 link=None,
21 destination=None,
22 force_rewriting=False,
23 gauth=None):
25 drive = GoogleDrive(gauth)
27 if link is None:
28 row = data_router[data_router['Эксперимент'] == expname]
29 links = dict(zip(row.columns, row.values[0]))
30 if destination not in links:
31 raise ValueError(f'Wrong folder name: {destination}')
32 link = links[destination]
34 fid = id_from_link(link)
36 dataname = os.path.basename(path_to_file)
37 if force_rewriting:
38 return_code, rel = retrieve_relevant_ids(link,
39 dataname,
40 whitelist=[],
41 extensions=['.npz', '.csv', 'xlsx'])
42 if len(rel) != 0:
43 if len(rel) > 1:
44 print('More than one relevant file found, which is suspicious. Consider manual check')
46 existing_file_data = rel[0]
47 dat_id, dat_name = existing_file_data
48 f = drive.CreateFile({'id': dat_id})
50 else:
51 print(f'Data for {dataname} not found in folder, nothing to rewrite')
52 f = drive.CreateFile({'title': dataname, "parents": [{"kind": "drive#fileLink", "id": fid}]})
54 else:
55 date_time = get_datetime()
56 ext = os.path.splitext(path_to_file)[1]
57 dataname = dataname[:-len(ext)] + date_time + ext
58 f = drive.CreateFile({'title': dataname, "parents": [{"kind": "drive#fileLink", "id": fid}]})
60 f.SetContentFile(path_to_file)
61 f.Upload()