Coverage for gcsfs/caching.py: 97%

62 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-04-20 18:41 -0400

1from collections import deque 

2 

3from fsspec.caching import BaseCache, register_cache 

4 

5 

class ReadAheadChunked(BaseCache):
    """
    An optimized ReadAhead cache that fetches multiple chunks in a single
    backend request but keeps them as separate bytes objects to avoid
    expensive memory slicing.

    While this approach primarily optimizes for CPU and memory allocation
    overhead, it is meant to keep the same semantics as the existing readahead
    cache. For example, if a user requests 5MB and the cache fetches 10MB, it
    serves the requested 5MB but retains that data in memory to handle
    potential backward seeks. Served chunks are not eagerly discarded; the
    window is only replaced when a new backend fetch is required.

    The ``fetcher`` is called as ``fetcher(start=..., chunk_lengths=[...])``
    and is expected to return a sequence of bytes objects: the first entry is
    the data that was asked for, the optional second entry is the read-ahead
    data.
    """

    name = "readahead_chunked"

    def __init__(self, blocksize: int, fetcher, size: int) -> None:
        """
        Create an empty cache.

        Parameters
        ----------
        blocksize:
            Upper bound on the number of extra bytes requested past the end
            of a read on every backend fetch.
        fetcher:
            Callable invoked with ``start`` and ``chunk_lengths`` keyword
            arguments (see the class docstring).
        size:
            Total size of the underlying file; reads are clamped to it.
        """
        super().__init__(blocksize, fetcher, size)
        # Cached window as (start, end, data_bytes) entries. Entries are only
        # ever appended right after the deque is cleared, and each one starts
        # where the previous one ends, so the window is contiguous and sorted.
        self.chunks = deque()

    @property
    def cache(self):
        """
        Compatibility property for tests/legacy code that expects 'cache'
        to be a single bytestring.

        WARNING: Accessing this property forces a memory copy of the
        entire current buffer, negating the zero-copy optimization
        of ReadAheadChunked. Use for debugging/testing only.
        """
        # Joining an empty iterable already yields b"", so no special case
        # is needed for an empty window.
        return b"".join(data for _, _, data in self.chunks)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """
        Return the bytes in ``[start, end)``, served from the cached chunks
        where possible and from the backend otherwise.

        Parameters
        ----------
        start:
            First byte offset to read; ``None`` means 0.
        end:
            Offset one past the last byte to read; ``None`` or anything
            larger than the file size means "up to the end of the file".

        Returns
        -------
        bytes
            The requested data, or ``b""`` when the range is empty, inverted,
            or starts at/after the end of the file.

        Side effects: increments ``hit_count`` when the whole range was served
        from the cache; on a backend fetch increments ``miss_count``, adds the
        number of fetched bytes to ``total_requested_bytes`` and replaces the
        cached window with the newly fetched chunks.
        """
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        # Nothing to read. Returning here (instead of falling through) keeps
        # an empty or inverted range from clearing the cached window or from
        # being counted as a cache hit.
        if start >= self.size or start >= end:
            return b""

        # Handle backward seeks that go beyond the start of our cache window
        if self.chunks and self.chunks[0][0] > start:
            self.chunks.clear()

        parts = []
        current_pos = start  # next offset that still has to be produced

        # Satisfy as much as possible from the existing cache
        for c_start, c_end, c_data in self.chunks:
            if c_end <= start:
                continue  # chunk lies completely before the requested range

            if c_start >= end:
                break  # chunk (and everything after it) is past the range

            if c_end > current_pos:
                slice_start = max(0, current_pos - c_start)
                slice_end = min(len(c_data), end - c_start)

                if slice_start == 0 and slice_end == len(c_data):
                    # Zero-copy: hand out the stored object itself
                    parts.append(c_data)
                else:
                    # Slicing creates a copy, unavoidable for partial chunks
                    parts.append(c_data[slice_start:slice_end])

                current_pos += slice_end - slice_start

        should_fetch_backend = current_pos < end
        if should_fetch_backend:
            # On a cache miss we replace the entire window (standard
            # readahead behavior). Chunks already referenced from ``parts``
            # stay alive until this call returns.
            self.chunks.clear()

            # ``end`` was clamped to ``self.size`` above, so the missing span
            # is simply what is left of the request.
            missing_len = end - current_pos
            readahead_block = min(self.size - end, self.blocksize)

            self.miss_count += 1
            chunk_lengths = [missing_len]
            if readahead_block > 0:
                chunk_lengths.append(readahead_block)

            # Vector read call: one backend request, several result objects
            new_chunks = self.fetcher(start=current_pos, chunk_lengths=chunk_lengths)

            # Process the requested data. Offsets are derived from the actual
            # lengths returned, not from the lengths that were asked for.
            req_data = new_chunks[0]
            self.chunks.append((current_pos, current_pos + len(req_data), req_data))
            self.total_requested_bytes += len(req_data)
            parts.append(req_data)

            # Process the readahead data (if any); it is cached, not returned
            if len(new_chunks) > 1:
                ra_data = new_chunks[1]
                ra_start = current_pos + len(req_data)
                self.chunks.append((ra_start, ra_start + len(ra_data), ra_data))
                self.total_requested_bytes += len(ra_data)
        else:
            self.hit_count += 1

        # ``parts`` is never empty here: either the cache advanced
        # ``current_pos`` up to ``end`` or the backend result was appended.
        # Optimization: return the single object directly if possible
        if len(parts) == 1:
            return parts[0]

        return b"".join(parts)

119 

120 

# Register ReadAheadChunked with fsspec's cache registry when this module is
# imported.
# NOTE(review): clobber=True presumably lets this replace an already
# registered cache with the same name instead of raising — confirm against
# fsspec's register_cache documentation.
register_cache(ReadAheadChunked, clobber=True)