Coverage for gcsfs/inventory_report.py: 88%

139 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-04-20 18:41 -0400

1from datetime import datetime 

2 

3 

class InventoryReport:
    """
    A utility class for fetching and processing inventory reports from GCS.

    The 'InventoryReport' class provides the logic to fetch inventory reports
    and process their content to obtain a final snapshot of the objects in
    the latest inventory reports.

    High-Level Functionality:
    ------------------------
    1. Fetching Inventory Reports:
        - The class offers methods to fetch inventory report configurations and
          metadata from GCS.
        - It validates the inventory report information provided by the user.
        - Inventory report configurations include options for parsing CSV format
          and specifying the bucket and destination path.

    2. Parsing and Processing Inventory Report Content:
        - The class processes the raw content of inventory reports to extract
          object details such as name, size, etc.
        - It supports listing objects using a snapshot option or filtering
          based on a user-defined prefix.
        - The class handles CSV parsing, removes the header (if specified),
          skips blank records, and fetches required object metadata.

    3. Constructing the Final Snapshot:
        - If the user wishes to use the snapshot to do listing directly, the
          snapshot will contain the relevant object details and subdirectory
          prefixes, filtered by the prefix.

        - If the user wishes to use the snapshot as a starting point for async
          listing, the snapshot will only contain the object names (as
          ``{"name": ...}`` dicts), filtered by the prefix.

    Note:
    -----
        - The class should only be used internally by 'GCSFileSystem' as an
          optional configuration during listing.

    Example Usage:
    --------------
        # Should already be instantiated in 'core.py'
        gcs_file_system = GCSFileSystem(...)

        # User defines inventory report information
        inventory_report_info = {
            "use_snapshot_listing": True,
            "location": "us-east1",
            "id": "inventory_report_id"
        }

        # User defines a prefix for filtering objects
        prefix = "prefix/"

        # Fetch the snapshot based on inventory reports
        items, prefixes = await InventoryReport.fetch_snapshot(
            gcs_file_system, inventory_report_info, prefix)
    """

    # HTTP endpoint of the Storage Insights Service.
    BASE_URL = "https://storageinsights.googleapis.com/v1"

    @classmethod
    async def fetch_snapshot(cls, gcs_file_system, inventory_report_info, prefix):
        """
        Main entry point of the 'InventoryReport' class.
        Fetches the latest snapshot of objects based on inventory report configuration.

        Parameters:
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
            class (see 'core.py').
            inventory_report_info (dict): A client-configured dictionary
            containing inventory report information.
            prefix (str): Listing prefix specified by the client.

        Returns:
            tuple: A tuple containing two lists: the 'items' list representing
            object details for the snapshot, and the 'prefixes' list containing
            subdirectory prefixes.

            Note: when 'use_snapshot_listing' in 'inventory_report_info' is
            falsy, the 'prefixes' list will be empty, and the 'items' list
            will contain only ``{"name": ...}`` dicts.

        Raises:
            ValueError: If the report info, config, or reports are invalid or
            cannot be fetched (see the individual helpers).
        """
        # Validate the inventory report info that the user passes in.
        cls._validate_inventory_report_info(inventory_report_info)

        # Parse the inventory report info. The flag is normalised to a real
        # bool once so that every helper below interprets it the same way.
        use_snapshot_listing = bool(inventory_report_info.get("use_snapshot_listing"))
        inventory_report_location = inventory_report_info.get("location")
        inventory_report_id = inventory_report_info.get("id")

        # Fetch and parse the inventory report configuration.
        raw_inventory_report_config = await cls._fetch_raw_inventory_report_config(
            gcs_file_system=gcs_file_system,
            location=inventory_report_location,
            id=inventory_report_id,
        )
        inventory_report_config = cls._parse_raw_inventory_report_config(
            raw_inventory_report_config=raw_inventory_report_config,
            use_snapshot_listing=use_snapshot_listing,
        )

        # Use the config to fetch all inventory report metadata, newest first.
        unsorted_inventory_report_metadata = await cls._fetch_inventory_report_metadata(
            gcs_file_system=gcs_file_system,
            inventory_report_config=inventory_report_config,
        )
        inventory_report_metadata = cls._sort_inventory_report_metadata(
            unsorted_inventory_report_metadata=unsorted_inventory_report_metadata
        )

        # Download the most recent inventory reports in raw form.
        bucket = inventory_report_config.bucket
        inventory_report_content = await cls._download_inventory_report_content(
            gcs_file_system=gcs_file_system,
            inventory_report_metadata=inventory_report_metadata,
            bucket=bucket,
        )

        # Parse the raw inventory reports into snapshot objects.
        objects = cls._parse_inventory_report_content(
            gcs_file_system=gcs_file_system,
            inventory_report_content=inventory_report_content,
            inventory_report_config=inventory_report_config,
            use_snapshot_listing=use_snapshot_listing,
            bucket=bucket,
        )

        # Construct and return the final snapshot based on the fetched objects.
        return cls._construct_final_snapshot(
            objects=objects, prefix=prefix, use_snapshot_listing=use_snapshot_listing
        )

    @staticmethod
    def _validate_inventory_report_info(inventory_report_info):
        """
        Validates the inventory report information dictionary that the user
        passes in.

        Parameters:
            inventory_report_info (dict): A dictionary containing the inventory
            report information with the following keys:
                - "use_snapshot_listing" (bool): A flag indicating whether
                  to use snapshot listing in the inventory report.
                - "location" (str): The location of the inventory report in GCS.
                - "id" (str): The ID of the inventory report in GCS.

        Raises:
            ValueError: If any required key (use_snapshot_listing, location, id)
            is missing from the inventory_report_info dictionary. Keys are
            checked in that order and the first missing one is reported.
        """
        required_keys = (
            ("use_snapshot_listing", "Use snapshot listing is not configured."),
            ("location", "Inventory report location is not configured."),
            ("id", "Inventory report id is not configured."),
        )
        for key, message in required_keys:
            if key not in inventory_report_info:
                raise ValueError(message)

    @staticmethod
    async def _fetch_raw_inventory_report_config(gcs_file_system, location, id):
        """
        Fetches the raw inventory report configuration from the Storage
        Insights service based on the specified location and ID.

        Parameters:
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
            class (see 'core.py'); its 'project' attribute selects the project.
            location (str): The location of the inventory report in GCS.
            id (str): The ID of the inventory report in GCS.

        Returns:
            dict: A dictionary containing the raw inventory report
            configuration retrieved from GCS.

        Raises:
            ValueError: If any error occurs while fetching the inventory
            report configuration; the original error is chained as the cause.
        """
        project = gcs_file_system.project
        url = (
            f"{InventoryReport.BASE_URL}/projects/{project}"
            f"/locations/{location}/reportConfigs/{id}"
        )
        try:
            return await gcs_file_system._call("GET", url, json_out=True)
        except Exception as e:
            # Chain the cause so the underlying HTTP/auth error stays visible.
            raise ValueError(
                f"Error encountered when fetching inventory report config: {e}."
            ) from e

    @staticmethod
    def _parse_raw_inventory_report_config(
        raw_inventory_report_config, use_snapshot_listing
    ):
        """
        Parses the raw inventory report configuration and validates its properties.

        Parameters:
            raw_inventory_report_config (dict): A dictionary containing the raw
            inventory report configuration retrieved from GCS.
            use_snapshot_listing (bool): A flag indicating whether to use snapshot
            listing in the inventory report.

        Returns:
            InventoryReportConfig: An instance of the InventoryReportConfig
            class representing the parsed inventory report configuration.

        Raises:
            ValueError: If the current date is outside the (inclusive) start
            and end dates specified in the inventory report config.
            ValueError: If the "name" field is not present in the metadata
            fields of the report config.
            ValueError: If the "size" field is not present in the metadata
            fields and use_snapshot_listing is truthy.
        """
        # Parse the report config.
        frequency_options = raw_inventory_report_config.get("frequencyOptions")
        start_date = InventoryReport._convert_obj_to_date(
            frequency_options.get("startDate")
        )
        end_date = InventoryReport._convert_obj_to_date(
            frequency_options.get("endDate")
        )
        object_metadata_report_options = raw_inventory_report_config.get(
            "objectMetadataReportOptions"
        )
        storage_destination_options = object_metadata_report_options.get(
            "storageDestinationOptions"
        )

        # Save relevant report config properties.
        csv_options = raw_inventory_report_config.get("csvOptions")
        bucket = storage_destination_options.get("bucket")
        destination_path = storage_destination_options.get("destinationPath")
        metadata_fields = object_metadata_report_options.get("metadataFields")

        # Validate that the current calendar date lies within the start and
        # end dates (both inclusive). Dates are compared rather than
        # datetimes: the config dates convert to midnight, so comparing
        # against datetime.now() would reject the entire end date.
        today = datetime.now().date()
        first_day, last_day = start_date.date(), end_date.date()
        if not first_day <= today <= last_day:
            raise ValueError(
                f"Current date {today} is outside the range "
                f"{first_day} and {last_day} specified by the inventory report config."
            )

        # The object name is needed to identify every object in the report.
        if "name" not in metadata_fields:
            raise ValueError(
                "The object name metadata has to be included in the inventory report."
            )
        obj_name_idx = metadata_fields.index("name")

        # If the user wants to do listing based on the snapshot, also
        # validate the report contains size metadata for each object.
        if use_snapshot_listing and "size" not in metadata_fields:
            raise ValueError(
                "If you want to use the snapshot for listing, the object size "
                "metadata has to be included in the inventory report."
            )

        # Finally, construct and return the inventory report config.
        return InventoryReportConfig(
            csv_options=csv_options,
            bucket=bucket,
            destination_path=destination_path,
            metadata_fields=metadata_fields,
            obj_name_idx=obj_name_idx,
        )

    @staticmethod
    async def _fetch_inventory_report_metadata(
        gcs_file_system, inventory_report_config
    ):
        """
        Fetches all inventory report metadata from GCS based on the specified
        inventory report config, following pagination until exhausted.

        Parameters:
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
            class (see 'core.py').
            inventory_report_config (InventoryReportConfig): An instance of
            the InventoryReportConfig class representing the inventory report
            configuration.

        Returns:
            list: A list containing dictionaries representing the metadata of
            objects from the inventory reports.

        Raises:
            ValueError: If the fetched inventory reports are empty.
        """
        # Extract out bucket and destination path of the inventory reports.
        bucket = inventory_report_config.bucket
        destination_path = inventory_report_config.destination_path

        # There might be multiple inventory reports in the bucket. Note that
        # the number of pages should be minimal: a million objects are split
        # into two reports, so a year of daily reports (~700 objects) still
        # fits in a single page.
        inventory_report_metadata = []
        page_token = None
        while True:
            # Only send a page token once the service has returned one, so
            # the first request is a plain prefix listing.
            paging = {} if page_token is None else {"pageToken": page_token}
            page = await gcs_file_system._call(
                "GET",
                "b/{}/o",
                bucket,
                prefix=destination_path,
                json_out=True,
                **paging,
            )
            inventory_report_metadata.extend(page.get("items", []))
            page_token = page.get("nextPageToken")
            if page_token is None:
                break

        # If no reports are fetched, the inventory report is misconfigured.
        if not inventory_report_metadata:
            raise ValueError(
                "No inventory reports to fetch. Check if "
                "your inventory report is set up correctly."
            )

        return inventory_report_metadata

    @staticmethod
    def _sort_inventory_report_metadata(unsorted_inventory_report_metadata):
        """
        Sorts the inventory report metadata based on the 'timeCreated' field
        in reverse chronological order.

        Parameters:
            unsorted_inventory_report_metadata (list): A list of dictionaries
            representing the metadata of objects from the inventory reports.

        Returns:
            list: A new list of the inventory report metadata, sorted newest
            first by 'timeCreated'.
        """
        return sorted(
            unsorted_inventory_report_metadata,
            key=lambda ir: InventoryReport._convert_str_to_datetime(
                ir.get("timeCreated")
            ),
            reverse=True,
        )

    @staticmethod
    async def _download_inventory_report_content(
        gcs_file_system, inventory_report_metadata, bucket
    ):
        """
        Downloads every inventory report created on the same (UTC) day as the
        most recent one.

        Parameters:
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
            class (see 'core.py').
            inventory_report_metadata (list): A list of dictionaries
            representing the metadata of the inventory report objects,
            sorted newest first.
            bucket (str): The name of the GCS bucket containing
            the inventory reports.

        Returns:
            list: The decoded content of each matching report as a string;
            empty if 'inventory_report_metadata' is empty.
        """
        if not inventory_report_metadata:
            return []

        # Get the most recent inventory report date (the list is sorted).
        most_recent_date = InventoryReport._convert_str_to_datetime(
            inventory_report_metadata[0].get("timeCreated")
        ).date()

        inventory_report_content = []

        # Loop, since a large bucket's report may be split into several
        # report objects generated on the same day.
        for metadata in inventory_report_metadata:
            inventory_report_date = InventoryReport._convert_str_to_datetime(
                metadata["timeCreated"]
            ).date()

            if inventory_report_date == most_recent_date:
                # Only the body is needed; the response headers are ignored.
                _header, encoded_content = await gcs_file_system._call(
                    "GET", "b/{}/o/{}", bucket, metadata.get("name"), alt="media"
                )
                inventory_report_content.append(encoded_content.decode())

        return inventory_report_content

    @staticmethod
    def _parse_inventory_report_content(
        gcs_file_system,
        inventory_report_content,
        inventory_report_config,
        use_snapshot_listing,
        bucket,
    ):
        """
        Parses the raw inventory report content and extracts object details.

        Parameters:
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
            class (see 'core.py').
            inventory_report_content (list): A list of strings containing the
            raw content of the inventory reports.
            inventory_report_config (InventoryReportConfig): An instance of the
            InventoryReportConfig class representing the inventory report
            configuration.
            use_snapshot_listing (bool): A flag indicating whether to use snapshot
            listing in the inventory report.
            bucket (str): The name of the GCS bucket containing the inventory
            reports.

        Returns:
            list: A list of dictionaries representing object details parsed
            from the inventory report content. Blank records are skipped.
        """
        # Get the csv configuration for each inventory report.
        csv_options = inventory_report_config.csv_options
        record_separator = csv_options.get("recordSeparator", "\n")
        delimiter = csv_options.get("delimiter", ",")
        header_required = csv_options.get("headerRequired", False)

        objects = []

        for content in inventory_report_content:
            # Split the content into lines based on the specified separator.
            lines = content.split(record_separator)

            # Remove the header, if present.
            if header_required:
                lines = lines[1:]

            for line in lines:
                # A trailing record separator leaves an empty final record;
                # parsing it would yield a nameless object or an IndexError.
                if not line.strip():
                    continue

                objects.append(
                    InventoryReport._parse_inventory_report_line(
                        inventory_report_line=line,
                        use_snapshot_listing=use_snapshot_listing,
                        gcs_file_system=gcs_file_system,
                        inventory_report_config=inventory_report_config,
                        delimiter=delimiter,
                        bucket=bucket,
                    )
                )

        return objects

    @staticmethod
    def _parse_inventory_report_line(
        inventory_report_line,
        use_snapshot_listing,
        gcs_file_system,
        inventory_report_config,
        delimiter,
        bucket,
    ):
        """
        Parses a single line of the inventory report and extracts object details.

        Parameters:
            inventory_report_line (str): A string representing a single line of
            the raw content from the inventory report.
            use_snapshot_listing (bool): A flag indicating whether to use snapshot
            listing in the inventory report.
            gcs_file_system (GCSFileSystem): An instance of the 'GCSFileSystem'
            class (see 'core.py').
            inventory_report_config (InventoryReportConfig): An instance of the
            InventoryReportConfig class representing the inventory report
            configuration.
            delimiter (str): The delimiter used in the inventory report content
            to separate fields.
            bucket (str): The name of the GCS bucket containing the inventory
            reports.

        Returns:
            dict: With snapshot listing, the result of
            'gcs_file_system._process_object' for all metadata fields;
            otherwise ``{"name": <object name>}``.
        """
        # Surrounding whitespace (e.g. a stray "\r") is stripped before splitting.
        fields = inventory_report_line.strip().split(delimiter)

        # For snapshot listing all metadata is needed; otherwise only the name.
        if use_snapshot_listing:
            # NOTE(review): arguments are passed as (object_dict, bucket), and
            # 'bucket' is the bucket holding the reports. Confirm both against
            # GCSFileSystem._process_object in core.py.
            return gcs_file_system._process_object(
                dict(zip(inventory_report_config.metadata_fields, fields)),
                bucket,
            )

        return {"name": fields[inventory_report_config.obj_name_idx]}

    @staticmethod
    def _construct_final_snapshot(objects, prefix, use_snapshot_listing):
        """
        Constructs the final snapshot based on the retrieved objects and prefix.

        Parameters:
            objects (list): A list of dictionaries representing object details
            from the inventory report.
            prefix (str): A prefix used to filter objects in the snapshot based
            on their names. None is treated as "".
            use_snapshot_listing (bool): A flag indicating whether to use snapshot
            listing in the inventory report.

        Returns:
            tuple: A tuple containing two lists: the 'items' list representing
            object details for the snapshot, and the sorted 'prefixes' list
            containing subdirectory prefixes (mirroring a GCS listing with
            delimiter "/"). If 'use_snapshot_listing' is falsy, 'prefixes' is
            empty and 'items' holds every object whose name starts with the
            prefix.
        """
        if prefix is None:
            prefix = ""

        # Without snapshot listing, just filter by prefix.
        if not use_snapshot_listing:
            return [obj for obj in objects if obj.get("name").startswith(prefix)], []

        # With snapshot listing, generate both the items and prefixes manually.
        items = []
        prefixes = set()
        prefix_len = len(prefix)

        for obj in objects:
            obj_name = obj.get("name")

            # An empty prefix matches every object (the expected behavior).
            if not obj_name.startswith(prefix):
                continue

            # Look for the first "/" after the listing prefix.
            delimiter_idx = obj_name.find("/", prefix_len)
            if delimiter_idx == -1:
                items.append(obj)
            else:
                # Like GCS common prefixes, keep everything up to and including
                # that "/". Slicing the original name stays correct when the
                # prefix does not end with "/" (prefix "dir/ab" and object
                # "dir/abc/x" give "dir/abc/").
                prefixes.add(obj_name[: delimiter_idx + 1])

        return items, sorted(prefixes)

    @staticmethod
    def _convert_obj_to_date(obj):
        """
        Converts a dictionary representing a date object to a datetime object.

        Parameters:
            obj (dict): A dictionary representing a date object with keys "day",
            "month", and "year".

        Returns:
            datetime: A naive datetime at midnight of the given date.
        """
        day = obj["day"]
        month = obj["month"]
        year = obj["year"]
        return datetime(year, month, day)

    @staticmethod
    def _convert_str_to_datetime(str):
        """
        Converts an ISO-formatted date string to a datetime object.

        Parameters:
            str (str): An ISO-formatted date string, with or without a
            trailing "Z" UTC designator. "Z" is rewritten as "+00:00" so
            that older Python versions' fromisoformat accept it.

        Returns:
            datetime: A datetime object representing the converted date and time.
        """
        return datetime.fromisoformat(str.replace("Z", "+00:00"))

624 

625 

class InventoryReportConfig:
    """
    Represents the configuration for fetching inventory reports.

    Attributes:
        csv_options (dict): A dictionary containing options for parsing CSV
        format in the inventory reports.
        bucket (str): The name of the GCS bucket from which to fetch the
        inventory reports.
        destination_path (str): The path within the GCS bucket where the
        inventory reports are stored.
        metadata_fields (list): A list of strings representing metadata
        fields to be extracted from the inventory reports.
        obj_name_idx (int): The index of the "name" field in the 'metadata_fields'
        list, used to identify object names.
    """

    def __init__(
        self, csv_options, bucket, destination_path, metadata_fields, obj_name_idx
    ):
        # Values are stored as given; validation happens in the caller that
        # builds the config from the raw Storage Insights response.
        self.csv_options = csv_options
        self.bucket = bucket
        self.destination_path = destination_path
        self.metadata_fields = metadata_fields
        self.obj_name_idx = obj_name_idx

    def __repr__(self):
        """Return a constructor-style representation for debugging/logging."""
        return (
            f"{type(self).__name__}("
            f"csv_options={self.csv_options!r}, "
            f"bucket={self.bucket!r}, "
            f"destination_path={self.destination_path!r}, "
            f"metadata_fields={self.metadata_fields!r}, "
            f"obj_name_idx={self.obj_name_idx!r})"
        )