Coverage for gcsfs/caching.py: 97%
62 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
« prev ^ index » next coverage.py v7.9.1, created at 2026-04-20 18:41 -0400
1from collections import deque
3from fsspec.caching import BaseCache, register_cache
class ReadAheadChunked(BaseCache):
    """
    An optimized ReadAhead cache that fetches multiple chunks in a single
    HTTP request but manages them as separate bytes objects to avoid
    expensive memory slicing.

    While this approach primarily optimizes for CPU and memory allocation overhead,
    it strictly maintains the same semantics as the existing readahead cache.
    For example, if a user requests 5MB and the cache fetches 10MB, it serves the
    requested 5MB but retains that data in memory to handle potential backward seeks.
    This mirrors the standard readahead behavior, which does not eagerly discard served
    chunks until a new fetch is required. A seek to before the start of the
    cached window discards the window and is served by a fresh fetch.
    """

    name = "readahead_chunked"

    def __init__(self, blocksize: int, fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        # Cached window, ordered by offset and contiguous by construction.
        # Entries: (start, end, data_bytes) with end - start == len(data_bytes).
        self.chunks = deque()

    @property
    def cache(self):
        """
        Compatibility property for tests/legacy code that expects 'cache'
        to be a single bytestring.

        WARNING: Accessing this property forces a memory copy of the
        entire current buffer, negating the Zero-Copy optimization
        of ReadAheadChunked. Use for debugging/testing only.
        """
        if not self.chunks:
            return b""
        return b"".join(chunk[2] for chunk in self.chunks)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return the bytes in ``[start, end)``, using the cached window when possible.

        Parameters
        ----------
        start:
            First byte offset; ``None`` means 0.
        end:
            One past the last byte offset; ``None`` (or anything beyond the
            file size) means the end of the file.

        Returns
        -------
        bytes
            The requested data. The result is ``b""`` when the range is empty,
            inverted, or starts at/after EOF.

        On a miss or partial hit, the uncached remainder plus up to
        ``blocksize`` readahead bytes are requested in one vectored
        ``fetcher`` call. The new data then replaces the cached window.
        """
        if start is None:
            start = 0
        if end is None or end > self.size:
            end = self.size
        # Empty/inverted ranges and reads at or past EOF need no data. Returning
        # here avoids negative slice arithmetic below, spurious hit counts,
        # and needlessly discarding the cached window.
        if start >= self.size or start >= end:
            return b""

        # Data before the first cached byte cannot be served from the window.
        if self.chunks and self.chunks[0][0] > start:
            self.chunks.clear()

        parts = []
        current_pos = start

        # Satisfy as much as possible from the existing cache (Zero-Copy).
        for c_start, c_end, c_data in self.chunks:
            if current_pos >= end:
                break  # request fully satisfied
            if c_end <= current_pos:
                continue  # chunk lies entirely before the unread part
            if c_start > current_pos:
                # Defensive: the window is built contiguously, but never stitch
                # bytes across a gap. Fetch the remainder instead.
                break
            slice_start = current_pos - c_start
            slice_end = min(len(c_data), end - c_start)
            if slice_start == 0 and slice_end == len(c_data):
                # Zero-copy: direct reference to the full object.
                parts.append(c_data)
            else:
                # Slicing bytes copies, but it's unavoidable for partials.
                parts.append(c_data[slice_start:slice_end])
            current_pos = c_start + slice_end

        if current_pos < end:
            # On a cache miss, replace the entire window (standard readahead behavior).
            self.chunks.clear()
            self.miss_count += 1

            missing_len = end - current_pos  # end <= self.size was enforced above
            readahead_len = min(self.size - end, self.blocksize)
            chunk_lengths = [missing_len]
            if readahead_len > 0:
                chunk_lengths.append(readahead_len)

            # Vector read: one request, one bytes-like object per requested length.
            new_chunks = self.fetcher(start=current_pos, chunk_lengths=chunk_lengths)

            # The requested data. Its actual length may differ from the request,
            # so positions are derived from what was returned.
            req_data = new_chunks[0]
            self.chunks.append((current_pos, current_pos + len(req_data), req_data))
            self.total_requested_bytes += len(req_data)
            parts.append(req_data)

            # The readahead data (if any) continues directly after the requested data.
            if len(new_chunks) > 1:
                ra_data = new_chunks[1]
                ra_start = current_pos + len(req_data)
                self.chunks.append((ra_start, ra_start + len(ra_data), ra_data))
                self.total_requested_bytes += len(ra_data)
        else:
            # Served entirely from the cached window (parts is non-empty here).
            self.hit_count += 1

        # Optimization: return the single object directly if possible.
        if len(parts) == 1:
            return parts[0]
        return b"".join(parts)


# Expose the cache under its ``name`` ("readahead_chunked"). ``clobber=True``
# lets this registration replace an existing entry with the same name.
register_cache(ReadAheadChunked, clobber=True)