Coverage for gcsfs/inventory_report.py: 88%
139 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
1from datetime import datetime
class InventoryReport:
    """
    A utility class for fetching and processing inventory reports from GCS.

    The 'InventoryReport' class provides the logic to fetch inventory
    reports and to process their content into a final snapshot of the
    objects listed in the latest inventory reports.

    High-Level Functionality:
    ------------------------
    1. Fetching Inventory Reports:
        - The class offers methods to fetch inventory report configurations and
          metadata from GCS.
        - It validates the inventory report information provided by the user.
        - Inventory report configurations include options for parsing CSV format
          and specifying the bucket and destination path.

    2. Parsing and Processing Inventory Report Content:
        - The class processes the raw content of inventory reports to extract
          object details such as name, size, etc.
        - It supports listing objects using a snapshot option or filtering
          based on a user-defined prefix.
        - The class handles CSV parsing, removes header (if specified), and
          fetches required object metadata.

    3. Constructing the Final Snapshot:
        - If the user wishes to use the snapshot to do listing directly, the
          snapshot will contain the relevant object details and subdirectory
          prefixes, filtered by the prefix.

        - If the user wishes to use the snapshot as a starting point for async
          listing, the snapshot will only contain a list of object names,
          filtered by the prefix.

    Note:
    -----
    - The class should only be internally used in the 'GCSFileSystem' as an
      optional configuration during listing.

    Example Usage:
    --------------
        # Should already be instantiated in 'core.py'
        gcs_file_system = GCSFileSystem(...)

        # User defines inventory report information
        inventory_report_info = {
            "use_snapshot_listing": True,
            "location": "us-east1",
            "id": "inventory_report_id"
        }

        # User defines a prefix for filtering objects
        prefix = "prefix/"

        # Fetch the snapshot based on inventory reports
        items, prefixes = await InventoryReport.fetch_snapshot(
            gcs_file_system, inventory_report_info, prefix)
    """

    # HTTP endpoint of the Storage Insights Service.
    BASE_URL = "https://storageinsights.googleapis.com/v1"

    @classmethod
    async def fetch_snapshot(cls, gcs_file_system, inventory_report_info, prefix):
        """
        Main entry point of the 'InventoryReport' class.

        Fetches the latest snapshot of objects based on inventory report
        configuration.

        Parameters:
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
                class (see 'core.py').
            inventory_report_info (dict): A client-configured dictionary
                containing inventory report information.
            prefix (str): Listing prefix specified by the client.

        Returns:
            tuple: A tuple containing two lists: the 'items' list representing
            object details for the snapshot, and the 'prefixes' list containing
            subdirectory prefixes.

            Note: when 'use_snapshot_listing' in 'inventory_report_info' is
            falsy, the 'prefixes' list will be empty, and the 'items' list
            will contain only the object names.

        Raises:
            ValueError: If the inventory report info or the inventory report
                configuration is invalid, or if no reports can be found.
        """
        # Validate the inventory report info that the user passes in.
        cls._validate_inventory_report_info(inventory_report_info)

        # Parse the inventory report info.
        use_snapshot_listing = inventory_report_info.get("use_snapshot_listing")
        inventory_report_location = inventory_report_info.get("location")
        inventory_report_id = inventory_report_info.get("id")

        # Fetch the inventory report configuration.
        raw_inventory_report_config = await cls._fetch_raw_inventory_report_config(
            gcs_file_system=gcs_file_system,
            location=inventory_report_location,
            id=inventory_report_id,
        )

        # Parse the inventory report configuration.
        inventory_report_config = cls._parse_raw_inventory_report_config(
            raw_inventory_report_config=raw_inventory_report_config,
            use_snapshot_listing=use_snapshot_listing,
        )

        # Use the config to fetch all inventory report metadata.
        unsorted_inventory_report_metadata = await cls._fetch_inventory_report_metadata(
            gcs_file_system=gcs_file_system,
            inventory_report_config=inventory_report_config,
        )

        # Sort the metadata based on reverse created time order.
        inventory_report_metadata = cls._sort_inventory_report_metadata(
            unsorted_inventory_report_metadata=unsorted_inventory_report_metadata
        )

        # Download the most recent inventory reports in raw form.
        bucket = inventory_report_config.bucket
        inventory_report_content = await cls._download_inventory_report_content(
            gcs_file_system=gcs_file_system,
            inventory_report_metadata=inventory_report_metadata,
            bucket=bucket,
        )

        # Parse the raw inventory reports into snapshot objects.
        objects = cls._parse_inventory_report_content(
            gcs_file_system=gcs_file_system,
            inventory_report_content=inventory_report_content,
            inventory_report_config=inventory_report_config,
            use_snapshot_listing=use_snapshot_listing,
            bucket=bucket,
        )

        # Construct and return the final snapshot based on the fetched objects.
        return cls._construct_final_snapshot(
            objects=objects, prefix=prefix, use_snapshot_listing=use_snapshot_listing
        )

    @staticmethod
    def _validate_inventory_report_info(inventory_report_info):
        """
        Validates the inventory report information dictionary that user
        passes in.

        Parameters:
            inventory_report_info (dict): A dictionary containing the inventory
                report information with the following keys:
                    - "use_snapshot_listing" (bool): A flag indicating whether
                      to use snapshot listing in the inventory report.
                    - "location" (str): The location of the inventory report in GCS.
                    - "id" (str): The ID of the inventory report in GCS.

        Raises:
            ValueError: If any required key (use_snapshot_listing, location, id)
                is missing from the inventory_report_info dictionary.
        """
        if "use_snapshot_listing" not in inventory_report_info:
            raise ValueError("Use snapshot listing is not configured.")
        if "location" not in inventory_report_info:
            raise ValueError("Inventory report location is not configured.")
        if "id" not in inventory_report_info:
            raise ValueError("Inventory report id is not configured.")

    @staticmethod
    async def _fetch_raw_inventory_report_config(gcs_file_system, location, id):
        """
        Fetches the raw inventory report configuration from GCS based on the
        specified location and ID.

        Parameters:
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
                class (see 'core.py').
            location (str): The location of the inventory report in GCS.
            id (str): The ID of the inventory report in GCS.

        Returns:
            dict: A dictionary containing the raw inventory report
                configuration retrieved from GCS.

        Raises:
            ValueError: If there is an error while fetching the inventory
                report configuration. The original exception is attached as
                the cause.
        """
        project = gcs_file_system.project
        url = "{}/projects/{}/locations/{}/reportConfigs/{}".format(
            InventoryReport.BASE_URL, project, location, id
        )
        try:
            return await gcs_file_system._call("GET", url, json_out=True)
        except Exception as e:
            # Any failure is surfaced as a ValueError; chain the cause so the
            # underlying error is not lost from the traceback.
            raise ValueError(
                f"Error encountered when fetching inventory report config: {e}."
            ) from e

    @staticmethod
    def _parse_raw_inventory_report_config(
        raw_inventory_report_config, use_snapshot_listing
    ):
        """
        Parses the raw inventory report configuration and validates its properties.

        Parameters:
            raw_inventory_report_config (dict): A dictionary containing the raw
                inventory report configuration retrieved from GCS.
            use_snapshot_listing (bool): A flag indicating whether to use snapshot
                listing in the inventory report.

        Returns:
            InventoryReportConfig: An instance of the InventoryReportConfig
                class representing the parsed inventory report configuration.

        Raises:
            ValueError: If the current date is outside the start and
                end range specified in the inventory report config.
            ValueError: If the "name" field is not present in the metadata
                fields of the report config.
            ValueError: If "size" field is not present in the metadata
                fields and use_snapshot_listing is truthy.
        """
        # Parse the report config.
        frequency_options = raw_inventory_report_config.get("frequencyOptions")
        start_date = InventoryReport._convert_obj_to_date(
            frequency_options.get("startDate")
        )
        end_date = InventoryReport._convert_obj_to_date(
            frequency_options.get("endDate")
        )
        object_metadata_report_options = raw_inventory_report_config.get(
            "objectMetadataReportOptions"
        )
        storage_destination_options = object_metadata_report_options.get(
            "storageDestinationOptions"
        )

        # Save relevant report config properties.
        csv_options = raw_inventory_report_config.get("csvOptions")
        bucket = storage_destination_options.get("bucket")
        destination_path = storage_destination_options.get("destinationPath")
        metadata_fields = object_metadata_report_options.get("metadataFields")

        # Validate date, making sure the current date is within the start and
        # end range.
        # NOTE(review): 'end_date' is midnight of the end day, so any time on
        # the end day itself is treated as out of range - confirm whether the
        # end date is meant to be inclusive.
        today = datetime.now()
        if today < start_date or today > end_date:
            raise ValueError(
                f"Current date {today} is outside the range "
                f"{start_date} and {end_date} specified by the inventory "
                "report config."
            )

        # Validate object name exists in the metadata fields; 'list.index'
        # raises ValueError when "name" is missing.
        obj_name_idx = metadata_fields.index("name")

        # If the user wants to do listing based on the snapshot, also
        # validate the report contains size metadata for each object.
        if use_snapshot_listing and "size" not in metadata_fields:
            raise ValueError(
                "If you want to use the snapshot for listing, the object size "
                "metadata has to be included in the inventory report."
            )

        # Finally, construct and return the inventory report config.
        return InventoryReportConfig(
            csv_options=csv_options,
            bucket=bucket,
            destination_path=destination_path,
            metadata_fields=metadata_fields,
            obj_name_idx=obj_name_idx,
        )

    @staticmethod
    async def _fetch_inventory_report_metadata(
        gcs_file_system, inventory_report_config
    ):
        """
        Fetches all inventory report metadata from GCS based on the specified
        inventory report config.

        Parameters:
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
                class (see 'core.py').
            inventory_report_config (InventoryReportConfig): An instance of
                the InventoryReportConfig class representing the inventory report
                configuration.

        Returns:
            list: A list containing dictionaries representing the metadata of
                objects from the inventory reports.

        Raises:
            ValueError: If the fetched inventory reports are empty.
        """
        # Extract out bucket and destination path of the inventory reports.
        bucket = inventory_report_config.bucket
        destination_path = inventory_report_config.destination_path

        # There might be multiple inventory reports in the bucket, possibly
        # spread over several result pages. The number of pages should be
        # minimal in practice: a million objects is split up into two reports,
        # so even a year of daily reports (~700) fits in a single page.
        inventory_report_metadata = []
        next_page_token = None
        while True:
            request_kwargs = {"prefix": destination_path, "json_out": True}
            if next_page_token is not None:
                # Only follow-up requests carry a page token.
                request_kwargs["pageToken"] = next_page_token

            page = await gcs_file_system._call(
                "GET", "b/{}/o", bucket, **request_kwargs
            )
            inventory_report_metadata.extend(page.get("items", []))

            next_page_token = page.get("nextPageToken", None)
            if next_page_token is None:
                break

        # If no reports are fetched, indicates there is an error.
        if not inventory_report_metadata:
            raise ValueError(
                "No inventory reports to fetch. Check if "
                "your inventory report is set up correctly."
            )

        return inventory_report_metadata

    @staticmethod
    def _sort_inventory_report_metadata(unsorted_inventory_report_metadata):
        """
        Sorts the inventory report metadata based on the 'timeCreated' field
        in reverse chronological order.

        Parameters:
            unsorted_inventory_report_metadata (list): A list of dictionaries
                representing the metadata of objects from the inventory reports.

        Returns:
            list: A new sorted list of dictionaries representing the inventory
                report metadata, sorted in reverse chronological order based
                on 'timeCreated'.
        """
        return sorted(
            unsorted_inventory_report_metadata,
            key=lambda ir: InventoryReport._convert_str_to_datetime(
                ir.get("timeCreated")
            ),
            reverse=True,
        )

    @staticmethod
    async def _download_inventory_report_content(
        gcs_file_system, inventory_report_metadata, bucket
    ):
        """
        Downloads the most recent inventory report content from GCS based on
        the inventory report metadata.

        Parameters:
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
                class (see 'core.py').
            inventory_report_metadata (list): A non-empty list of dictionaries
                representing the metadata of objects from the inventory
                reports, sorted with the most recent report first.
            bucket (str): The name of the GCS bucket containing
                the inventory reports.

        Returns:
            list: A list containing the content of the most recent inventory
                reports as strings.
        """
        to_datetime = InventoryReport._convert_str_to_datetime

        # The list is sorted newest-first, so the first entry carries the
        # most recent inventory report date.
        most_recent_date = to_datetime(
            inventory_report_metadata[0].get("timeCreated")
        ).date()

        inventory_report_content = []

        # Run a for loop here, since there might be multiple inventory reports
        # generated on the same day. For reference, 1 million objects will be
        # split into only 2 inventory reports, so it is very rare that there
        # will be many inventory reports on the same day. But including this
        # logic for robustness.
        for metadata in inventory_report_metadata:
            if to_datetime(metadata["timeCreated"]).date() != most_recent_date:
                continue

            # Download the raw inventory report if the date matches.
            # Header is not needed, we only need to process and store
            # the content.
            _header, encoded_content = await gcs_file_system._call(
                "GET", "b/{}/o/{}", bucket, metadata.get("name"), alt="media"
            )

            # Decode the binary content into string for the content.
            inventory_report_content.append(encoded_content.decode())

        return inventory_report_content

    @staticmethod
    def _parse_inventory_report_content(
        gcs_file_system,
        inventory_report_content,
        inventory_report_config,
        use_snapshot_listing,
        bucket,
    ):
        """
        Parses the raw inventory report content and extracts object details.

        Blank records (for example the empty string that follows a trailing
        record separator) are skipped rather than parsed into bogus objects.

        Parameters:
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
                class (see 'core.py').
            inventory_report_content (list): A list of strings containing the
                raw content of the inventory report.
            inventory_report_config (InventoryReportConfig): An instance of the
                InventoryReportConfig class representing the inventory report
                configuration.
            use_snapshot_listing (bool): A flag indicating whether to use snapshot
                listing in the inventory report.
            bucket (str): The name of the GCS bucket containing the inventory
                reports.

        Returns:
            list: A list of dictionaries representing object details parsed
                from the inventory report content.
        """
        # Get the csv configuration for each inventory report.
        csv_options = inventory_report_config.csv_options
        record_separator = csv_options.get("recordSeparator", "\n")
        delimiter = csv_options.get("delimiter", ",")
        header_required = csv_options.get("headerRequired", False)

        objects = []

        for content in inventory_report_content:
            # Split the content into lines based on the specified separator.
            lines = content.split(record_separator)

            # Remove the header, if present.
            if header_required:
                lines = lines[1:]

            # Parse each non-blank line of the inventory report.
            for line in lines:
                if not line.strip():
                    continue

                objects.append(
                    InventoryReport._parse_inventory_report_line(
                        inventory_report_line=line,
                        use_snapshot_listing=use_snapshot_listing,
                        gcs_file_system=gcs_file_system,
                        inventory_report_config=inventory_report_config,
                        delimiter=delimiter,
                        bucket=bucket,
                    )
                )

        return objects

    @staticmethod
    def _parse_inventory_report_line(
        inventory_report_line,
        use_snapshot_listing,
        gcs_file_system,
        inventory_report_config,
        delimiter,
        bucket,
    ):
        """
        Parses a single line of the inventory report and extracts object details.

        Parameters:
            inventory_report_line (str): A string representing a single line of
                the raw content from the inventory report.
            use_snapshot_listing (bool): A flag indicating whether to use snapshot
                listing in the inventory report.
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
                class (see 'core.py').
            inventory_report_config (InventoryReportConfig): An instance of the
                InventoryReportConfig class representing the inventory report
                configuration.
            delimiter (str): The delimiter used in the inventory report content
                to separate fields.
            bucket (str): The name of the GCS bucket containing the inventory
                reports.

        Returns:
            dict: A dictionary representing object details parsed from the
                inventory report line. When 'use_snapshot_listing' is falsy
                the dictionary only contains the "name" key.
        """
        # NOTE(review): fields are split on the bare delimiter, so quoted
        # values containing the delimiter are not handled.
        fields = inventory_report_line.strip().split(delimiter)

        # If the client wants to do listing from the snapshot, we need
        # to fetch all the metadata for each object. Otherwise, we only
        # need to fetch the name.
        if use_snapshot_listing:
            return gcs_file_system._process_object(
                dict(zip(inventory_report_config.metadata_fields, fields)),
                bucket,
            )

        return {"name": fields[inventory_report_config.obj_name_idx]}

    @staticmethod
    def _construct_final_snapshot(objects, prefix, use_snapshot_listing):
        """
        Constructs the final snapshot based on the retrieved objects and prefix.

        Parameters:
            objects (list): A list of dictionaries representing object details
                from the inventory report.
            prefix (str): A prefix used to filter objects in the snapshot based
                on their names. 'None' is treated as the empty prefix.
            use_snapshot_listing (bool): A flag indicating whether to use snapshot
                listing in the inventory report.

        Returns:
            tuple: A tuple containing two lists: the 'items' list representing
            object details for the snapshot, and the 'prefixes' list containing
            subdirectory prefixes in sorted order. If 'use_snapshot_listing'
            is falsy, 'prefixes' will be empty, and 'items' will contain every
            object whose name starts with the prefix.
        """
        if prefix is None:
            prefix = ""

        # Filter by the prefix and return the list if the user does not want
        # to use the snapshot for listing.
        if not use_snapshot_listing:
            return [obj for obj in objects if obj.get("name").startswith(prefix)], []

        # If the user wants to use the snapshot, generate both the items and
        # prefixes manually.
        items = []
        prefixes = set()

        for obj in objects:
            obj_name = obj.get("name")

            # Skip objects outside the prefix. An empty prefix matches
            # every object (which is the expected behavior).
            if not obj_name.startswith(prefix):
                continue

            # Look for the first "/" after the prefix to determine whether
            # the object lives in a subdirectory.
            delimiter_idx = obj_name.find("/", len(prefix))

            if delimiter_idx == -1:
                # No further delimiter: the object is a direct item.
                items.append(obj)
            else:
                # The subdirectory prefix is the object name up to and
                # including that delimiter. Slicing the real name keeps the
                # result correct even when 'prefix' does not end with "/"
                # (prefix "foo", object "foobar/x" -> "foobar/").
                prefixes.add(obj_name[: delimiter_idx + 1])

        return items, sorted(prefixes)

    @staticmethod
    def _convert_obj_to_date(obj):
        """
        Converts a dictionary representing a date object to a datetime object.

        Parameters:
            obj (dict): A dictionary representing a date object with keys "day",
                "month", and "year".

        Returns:
            datetime: A naive datetime object at midnight of the given date.
        """
        return datetime(obj["year"], obj["month"], obj["day"])

    @staticmethod
    def _convert_str_to_datetime(str):
        """
        Converts an ISO-formatted date string to a datetime object.

        Parameters:
            str (str): An ISO-formatted date string with or without
                timezone information (Z).

        Returns:
            datetime: A datetime object representing the converted date and time.
        """
        # A trailing "Z" is rewritten as an explicit UTC offset before parsing.
        return datetime.fromisoformat(str.replace("Z", "+00:00"))
class InventoryReportConfig:
    """
    Represents the configuration for fetching inventory reports.

    Attributes:
        csv_options (dict): A dictionary containing options for parsing CSV
            format in the inventory reports.
        bucket (str): The name of the GCS bucket from which to fetch the
            inventory reports.
        destination_path (str): The path within the GCS bucket where the
            inventory reports are stored.
        metadata_fields (list): A list of strings representing metadata
            fields to be extracted from the inventory reports.
        obj_name_idx (int): The index of the "name" field in the 'metadata_fields'
            list, used to identify object names.
    """

    def __init__(
        self, csv_options, bucket, destination_path, metadata_fields, obj_name_idx
    ):
        """Stores the given configuration values unchanged as attributes."""
        self.csv_options = csv_options
        self.bucket = bucket
        self.destination_path = destination_path
        self.metadata_fields = metadata_fields
        self.obj_name_idx = obj_name_idx

    def __repr__(self):
        """Returns a constructor-style representation to ease debugging."""
        return (
            f"{type(self).__name__}("
            f"csv_options={self.csv_options!r}, "
            f"bucket={self.bucket!r}, "
            f"destination_path={self.destination_path!r}, "
            f"metadata_fields={self.metadata_fields!r}, "
            f"obj_name_idx={self.obj_name_idx!r})"
        )