Coverage for src/driada/gdrive/upload.py: 0.00%

37 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-25 15:40 +0300

1import os.path 

2from datetime import datetime 

3import pytz 

4from pydrive2.drive import GoogleDrive 

5 

6from .gdrive_utils import * 

7from .download import retrieve_relevant_ids 

8 

9def get_datetime(): 

10 tz = pytz.timezone('Europe/Moscow') 

11 now = datetime.now(tz) 

12 

13 dt_string = now.strftime("%d-%m-%Y %H:%M:%S") 

14 return dt_string 

15 

16 

17def save_file_to_gdrive(data_router, 

18 expname, 

19 path_to_file, 

20 link=None, 

21 destination=None, 

22 force_rewriting=False, 

23 gauth=None): 

24 

25 drive = GoogleDrive(gauth) 

26 

27 if link is None: 

28 row = data_router[data_router['Эксперимент'] == expname] 

29 links = dict(zip(row.columns, row.values[0])) 

30 if destination not in links: 

31 raise ValueError(f'Wrong folder name: {destination}') 

32 link = links[destination] 

33 

34 fid = id_from_link(link) 

35 

36 dataname = os.path.basename(path_to_file) 

37 if force_rewriting: 

38 return_code, rel = retrieve_relevant_ids(link, 

39 dataname, 

40 whitelist=[], 

41 extensions=['.npz', '.csv', 'xlsx']) 

42 if len(rel) != 0: 

43 if len(rel) > 1: 

44 print('More than one relevant file found, which is suspicious. Consider manual check') 

45 

46 existing_file_data = rel[0] 

47 dat_id, dat_name = existing_file_data 

48 f = drive.CreateFile({'id': dat_id}) 

49 

50 else: 

51 print(f'Data for {dataname} not found in folder, nothing to rewrite') 

52 f = drive.CreateFile({'title': dataname, "parents": [{"kind": "drive#fileLink", "id": fid}]}) 

53 

54 else: 

55 date_time = get_datetime() 

56 ext = os.path.splitext(path_to_file)[1] 

57 dataname = dataname[:-len(ext)] + date_time + ext 

58 f = drive.CreateFile({'title': dataname, "parents": [{"kind": "drive#fileLink", "id": fid}]}) 

59 

60 f.SetContentFile(path_to_file) 

61 f.Upload()