Coverage for src/driada/gdrive/gdrive_utils.py: 52.70%
74 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-25 15:40 +0300
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-25 15:40 +0300
1import requests
2import regex
3import os
4from bs4 import BeautifulSoup
5from itertools import islice
6import json
class GoogleDriveFile:
    """Node of a Google Drive folder tree (a file or a folder).

    Attributes
    ----------
    id: str
        Unique id, used to build the download URL.
    name: str
        Actual name, used as file name.
    type: str
        MIME type, or application/vnd.google-apps.folder if it is a folder
    children: List[GoogleDriveFile]
        Entries contained in this node when it is a folder; an empty list
        is created when no list is supplied.
    """

    def __init__(self, id, name, type, children=None):
        # A fresh list per instance avoids sharing one default list
        # between unrelated nodes.
        if children is None:
            children = []
        self.id = id
        self.name = name
        self.type = type
        self.children = children

    def is_folder(self):
        """Return True when this node's type equals the folder MIME type."""
        return self.type == folder_type

    def __repr__(self):
        fields = {
            "id": self.id,
            "name": self.name,
            "type": self.type,
            "children": self.children,
        }
        body = "(id={id}, name={name}, type={type}, children={children})"
        return "GoogleDriveFile" + body.format(**fields)
def parse_google_drive_file(folder, content, use_cookies=True):
    """Extract information about the current folder page and its children.

    Parameters
    ----------
    folder: str
        URL of the Google Drive folder.
        Must be of the format 'https://drive.google.com/drive/folders/{url}'.
        A query string, fragment or trailing slash is ignored when the
        folder id is derived from it.
    content: str
        Google Drive's raw string
    use_cookies: bool, optional
        When False, the cookies stored on the module-level ``client``
        session are cleared. Default is True.

    Returns
    -------
    gdrive_file: GoogleDriveFile
        Current GoogleDriveFile, with empty children
    id_name_type_iter: list
        One (id, name, type) tuple per child entry

    Raises
    ------
    RuntimeError
        If no script containing '_DRIVE_ivd' is present in ``content``, or
        if that script does not hold a second quoted JS string.
    """
    folder_soup = BeautifulSoup(content, features="html.parser")

    if not use_cookies:
        client.cookies.clear()

    # finds the script tag with window['_DRIVE_ivd']
    encoded_data = None
    for script in folder_soup.select("script"):
        inner_html = script.decode_contents()
        if "_DRIVE_ivd" not in inner_html:
            continue

        # first js string is _DRIVE_ivd, the second one is the encoded arr
        js_strings = string_regex.finditer(inner_html)
        second_string = next(islice(js_strings, 1, None), None)
        if second_string is None:
            raise RuntimeError(
                "Couldn't find the folder encoded JS string"
            )
        encoded_data = second_string.group(1)
        break

    if encoded_data is None:
        raise RuntimeError(
            "Cannot retrieve the folder information from the link. "
            "You may need to change the permission to "
            "'Anyone with the link'."
        )

    # decodes the array and evaluates it as a python array
    decoded = encoded_data.encode("utf-8").decode("unicode_escape")
    folder_arr = json.loads(decoded)

    folder_contents = [] if folder_arr[0] is None else folder_arr[0]

    # Share links often carry '?usp=sharing', a fragment or a trailing
    # slash; drop them so the last path segment is the bare folder id.
    folder_path = folder.split("?", 1)[0].split("#", 1)[0].rstrip("/")

    gdrive_file = GoogleDriveFile(
        id=folder_path.split("/")[-1],
        # the page title ends with a " - "-separated suffix that is dropped
        name=" - ".join(folder_soup.title.contents[0].split(" - ")[:-1]),
        type=folder_type,
    )

    # 'unicode_escape' decoding reads the bytes as Latin-1, so non-ASCII
    # names come out garbled; re-encoding and decoding as UTF-8 repairs them.
    id_name_type_iter = [
        (e[0], e[2].encode("raw_unicode_escape").decode("utf-8"), e[3])
        for e in folder_contents
    ]

    return gdrive_file, id_name_type_iter
def download_and_parse_google_drive_link(
    folder,
    quiet=False,
    use_cookies=True,
    remaining_ok=False,
    name_part=''
):
    """Get folder structure of Google Drive folder URL.

    Parameters
    ----------
    folder: str
        URL of the Google Drive folder.
        Must be of the format 'https://drive.google.com/drive/folders/{url}'.
    quiet: bool, optional
        Suppress terminal output.
    use_cookies: bool, optional
        Flag to use cookies. Default is True.
    remaining_ok: bool, optional
        Flag that ensures that is ok to let some file to not be downloaded,
        since there is a limitation of how many items gdown can download,
        default is False. The flag also applies to sub-folders.
    name_part: str, optional
        Only direct children of ``folder`` whose name contains this
        substring are kept. The default '' keeps every child. It is not
        forwarded to sub-folders, so their contents are not filtered.

    Returns
    -------
    return_code: bool
        Returns False if the download completed unsuccessfully.
        May be due to invalid URLs, permission errors, rate limits, etc.
    gdrive_file: GoogleDriveFile
        Returns the folder structure of the Google Drive folder, or None
        when ``return_code`` is False.

    Raises
    ------
    RuntimeError
        If ``remaining_ok`` is False and the number of collected children
        equals MAX_NUMBER_FILES, or if the folder page cannot be parsed.
    """
    return_code = True

    folder_page = client.get(folder)

    if folder_page.status_code != 200:
        return False, None

    # Forward use_cookies; otherwise use_cookies=False is silently ignored.
    gdrive_file, id_name_type_iter = parse_google_drive_file(
        folder,
        folder_page.text,
        use_cookies=use_cookies,
    )

    for child_id, child_name, child_type in id_name_type_iter:
        if name_part not in child_name:
            continue

        if child_type != folder_type:
            if not quiet:
                print(
                    "Processing file",
                    child_id,
                    child_name,
                )
            gdrive_file.children.append(
                GoogleDriveFile(
                    id=child_id,
                    name=child_name,
                    type=child_type,
                )
            )
            continue

        if not quiet:
            print(
                "Retrieving folder",
                child_id,
                child_name,
            )
        # remaining_ok is propagated so that a full sub-folder does not
        # raise when the caller explicitly accepted truncated listings.
        return_code, child = download_and_parse_google_drive_link(
            folders_url + child_id,
            use_cookies=use_cookies,
            quiet=quiet,
            remaining_ok=remaining_ok,
        )
        if not return_code:
            return return_code, None
        gdrive_file.children.append(child)

    # NOTE(review): this counts children kept after the name_part filter,
    # not all entries listed on the page - confirm that is intended.
    has_at_least_max_files = len(gdrive_file.children) == MAX_NUMBER_FILES
    if not remaining_ok and has_at_least_max_files:
        err_msg = " ".join(
            [
                "The gdrive folder with url: {url}".format(url=folder),
                "has at least {max} files,".format(max=MAX_NUMBER_FILES),
                "gdrive can't download more than this limit,",
                "if you are ok with this,",
                "please run again with --remaining-ok flag.",
            ]
        )
        raise RuntimeError(err_msg)
    return return_code, gdrive_file
def id_from_link(link):
    """Extract the Google Drive id from a share link.

    Parameters
    ----------
    link: str
        Link holding the id as an 'id=' query parameter
        ('.../uc?id={id}', '.../open?id={id}'), after 'folders/'
        ('.../drive/folders/{id}') or after '/d/' ('.../file/d/{id}/view').

    Returns
    -------
    str
        The extracted id. If none of the known patterns is present, the
        link without its query string is returned.

    Raises
    ------
    ValueError
        If ``link`` does not contain 'http'.
    """
    if 'http' not in link:
        raise ValueError('Wrong link format')

    # Require a real query-parameter boundary so that parameters merely
    # ending in "id=" (e.g. "ouid=") are not mistaken for the id.
    for marker in ('?id=', '&id='):
        if marker in link:
            value = link.split(marker)[-1]
            return value.split('&')[0].split('#')[0]

    # Path-style links: the id is the path segment right after the marker.
    for marker in ('folders/', '/d/'):
        if marker in link:
            tail = link.split(marker)[-1]
            return tail.split('?')[0].split('#')[0].split('/')[0]

    # Unknown layout: keep the historical fallback.
    return link.split('?')[0]
# Prefix that a folder id is appended to when building a folder URL.
folders_url = "https://drive.google.com/drive/folders/"
# Prefix for file URLs; presumably a file id is appended - not used in this
# module's visible code, verify against callers.
files_url = "https://drive.google.com/uc?id="
# MIME type that marks a Drive entry as a folder.
folder_type = "application/vnd.google-apps.folder"

# Matches a single-quoted string with backslash escapes; group 1 is the
# text between the quotes.
string_regex = regex.compile(r"'((?:[^'\\]|\\.)*)'")
# Number of children at which a folder listing is treated as truncated.
MAX_NUMBER_FILES = 50

# Module-wide HTTP session shared by the functions above.
client = requests.Session()