tensorblob
A lightweight, dynamically sized, memory-mapped tensor store with file-like APIs that also supports integer indexing and slicing, built on MemoryMappedTensor from tensordict.
Features
- 🔗 Memory-mapped storage: Efficient storage of large collections of same-shaped tensors
- 💾 File-like APIs: Read, write, and seek like a file, while also supporting integer indexing and slicing
- ⚡ Dynamic-sized: No need to specify the total number of tensors upfront
- 🔄 Extend and truncate: Extend the blob with another blob or truncate the blob to a specific position
- 🚀 LRU cache: Automatic management of memory-mapped blocks for scalability with large blobs
Installation
From PyPI:
pip install tensorblob
If you are interested in the experimental (i.e., unstable and undertested) version, you can install it from GitHub:
pip install git+https://github.com/Guest400123064/tensorblob.git
Core Use Cases
Quick Start
The example below creates a new blob for a collection of randomly generated fake embeddings and then accesses them by index. Because the storage is memory-mapped, there is no need to read all tensors into memory: indexing maps only the blocks that hold the requested tensors.
import torch
from tensorblob import TensorBlob
# Create a new storage for a collection of randomly generated fake embeddings;
# need to specify the data type and shape of each tensor for creation
with TensorBlob.open("embeddings.blob", "w", dtype="float32", shape=768) as blob:
    blob.write(torch.randn(100_000, 768))
    print(f"Wrote {len(blob)} embeddings")
# No need to specify the configurations again after creation
with TensorBlob.open("embeddings.blob", "r") as blob:
    e1 = blob[42]
    e2 = blob[-1:16384:-12345]
    print(f"Similarity: {torch.cosine_similarity(e1, e2)}")
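Both `blob[42]` and the strided slice above are resolved against fixed-size blocks. The sketch below mirrors the index arithmetic in the source's `__getitem__` using plain Python, without touching any files. An integer index becomes a (block, offset) pair via `divmod`. A slice is expanded with `slice.indices` and grouped by block with `itertools.groupby`. The names `locate` and `plan_slice` are hypothetical helpers for illustration, not part of the tensorblob API.

```python
from itertools import groupby


def locate(idx: int, length: int, block_size: int) -> tuple[int, int]:
    """Map a possibly negative integer index to (block number, offset in block)."""
    if idx >= length or idx < -length:
        raise IndexError(f"index {idx} out of range for length {length}")
    # Negative indices count from the end, as with Python sequences.
    return divmod(idx + length if idx < 0 else idx, block_size)


def plan_slice(s: slice, length: int, block_size: int) -> list[tuple[int, list[int]]]:
    """Group the positions selected by a slice into runs that share a block."""
    positions = range(*s.indices(length))
    # Consecutive positions that fall in the same block form one group; each
    # group keeps the offsets of its positions within that block.
    return [
        (block, [i % block_size for i in run])
        for block, run in groupby(positions, key=lambda i: i // block_size)
    ]


n, bs = 100_000, 8192  # the Quick Start blob with the default block_size
print(locate(42, n, bs))   # (0, 42)
print(locate(-1, n, bs))   # (12, 1695)
plan = plan_slice(slice(-1, 16384, -12345), n, bs)
print(len(plan), plan[0])  # 7 (12, [1695])
```

With a stride of 12,345 and 8,192 tensors per block, every selected tensor lands in a different block, so the slice above touches seven blocks.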
Processing Large Datasets
Storing preprocessed datasets that are larger than RAM in a memory-mapped blob can speed up training by reducing the time spent on data loading and transformation: preprocessing runs once, and training reads the results directly from disk.
# data_loader, preprocess, and model stand for your own pipeline components
with TensorBlob.open("data/images.blob", "w", dtype="float32", shape=(3, 224, 224)) as blob:
    for image_batch in data_loader:
        blob.write(preprocess(image_batch))
with TensorBlob.open("data/images.blob", "r") as blob:
    for image in blob:
        result = model(image)
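Each `blob.write(...)` call in a loop like this may start partway through a block and spill into new ones. The following sketch restates, as plain arithmetic, how the source's `write` loop splits a batch into per-block chunks. It also shows how the stored length grows only by the tensors written past the old end. `plan_write` is a hypothetical helper, not part of tensorblob.

```python
def plan_write(pos: int, n: int, length: int, block_size: int):
    """Split a write of n tensors at cursor pos into (block, offset, count) chunks.

    Returns the chunks together with the blob length after the write.
    """
    chunks = []
    written = 0
    while written < n:
        block, offset = divmod(pos, block_size)
        # Fill at most up to the end of the current block.
        count = min(block_size - offset, n - written)
        chunks.append((block, offset, count))
        pos += count
        written += count
        # Only tensors written past the old end increase the length.
        length += max(0, pos - length)
    return chunks, length


chunks, new_len = plan_write(pos=8000, n=500, length=8100, block_size=8192)
print(chunks)   # [(0, 8000, 192), (1, 0, 308)]
print(new_len)  # 8500
```

Here the first 100 tensors overwrite existing data and the remaining 400 extend the blob, so the length grows by 400 rather than 500.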
Incremental Data Collection
Appending new data to an existing blob is useful for streaming data collection.
with TensorBlob.open("positions.blob", "w", dtype="float32", shape=3) as blob:
    blob.write(initial_position)
# Later: append more data by opening the blob in append mode
with TensorBlob.open("positions.blob", "a") as blob:
    for pos in trajectory_queue.get():
        blob.write(pos)
    print(f"Total trajectory recorded: {len(blob)}")
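Append mode differs from write mode in whether the blob must already exist, whether its contents are discarded, and where writes go. The sketch below restates the mode rules from `TensorBlob.open` and its constructor as a small pure-Python function. It applies the same validity checks on the mode string and reports the resulting flags. `parse_mode` and the returned flag names are hypothetical; tensorblob keeps the equivalent state in private attributes.

```python
def parse_mode(mode: str) -> dict[str, bool]:
    """Validate a tensorblob-style mode string and return the implied flags."""
    chars = set(mode)
    # Only r/a/w/+ are allowed, and no character may repeat.
    if chars - set("raw+") or len(mode) > len(chars):
        raise ValueError(f"Invalid mode: {mode}")
    # Exactly one of r/w/a must be present.
    if sum(c in "raw" for c in mode) != 1:
        raise ValueError(f"Need exactly one of r/w/a: {mode}")
    base, plus = mode.replace("+", ""), "+" in mode
    return {
        "readable": base == "r" or plus,
        "writable": base in ("w", "a") or plus,
        "append": base == "a",      # cursor starts at the end; writes always go there
        "truncate": base == "w",    # existing contents are discarded
        "must_exist": base in ("r", "a"),
    }


print(parse_mode("a"))
print(parse_mode("r+")["writable"])  # True
for bad in ("rw", "r++", "x"):
    try:
        parse_mode(bad)
    except ValueError as exc:
        print(exc)
```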
Random Access and Updates with File-Like APIs
Read and modify tensors at specific positions with the file-like seek, tell, read, and write methods.
import io
with TensorBlob.open("data/features.blob", "r+") as blob:
    blob.seek(1000)
    print(f"Current position: {blob.tell()}")
    batch = blob.read(size=100)
    print(f"Read {len(batch)} tensors")
    # Update specific positions; whence is also supported
    blob.seek(-500, whence=io.SEEK_END)
    blob.write(updated_features)
    # Append new data
    blob.seek(len(blob))
    blob.write(additional_features)
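The cursor arithmetic behind `seek`, `tell`, and `read` can be modelled without any storage. In this sketch, as in the source's `seek`, the target position is clamped to the range [0, len(blob)]. Unlike a regular Python file, you cannot move past the end, and a negative target is clamped to 0 instead of raising an error. `read(size)` returns at most the remaining tensors, and like the source it treats a falsy `size` as "read to the end". `Cursor` is a hypothetical stand-in that tracks positions only.

```python
import io


class Cursor:
    """Position bookkeeping for a blob of `length` tensors (no data stored)."""

    def __init__(self, length: int) -> None:
        self.length = length
        self.pos = 0

    def seek(self, pos: int = 0, whence: int = io.SEEK_SET) -> int:
        if whence == io.SEEK_SET:
            target = pos
        elif whence == io.SEEK_CUR:
            target = self.pos + pos
        elif whence == io.SEEK_END:
            target = self.length + pos
        else:
            raise ValueError(f"Invalid whence: {whence!r}")
        # Clamp to [0, length], as the source's seek does.
        self.pos = max(min(target, self.length), 0)
        return self.pos

    def read(self, size=None) -> range:
        """Return the positions a read would cover and advance the cursor."""
        end = min(self.pos + (size or self.length), self.length)
        span = range(self.pos, end)
        self.pos = end
        return span


c = Cursor(length=2000)
c.seek(1000)
print(len(c.read(100)), c.pos)    # 100 1100
print(c.seek(-500, io.SEEK_END))  # 1500
print(c.seek(5000))               # 2000 (clamped to the end)
```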
Extend and Truncate
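Extend a blob with the contents of another blob, or truncate a blob at a specific position. Extension is useful for merging two blobs into one, e.g., results produced by two different processes. Extending does not delete or modify the other blob; its data is copied. Both blobs must have the same dtype and shape, and the faster non-order-preserving mode also requires the same block_size.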
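with TensorBlob.open("data/features.blob", "a") as blob:
    # Copy tensors one at a time, preserving their original order
    blob.extend(other_blob, maintain_order=True)
# The default, maintain_order=False, is faster: whole block files are copied and
# only the partial trailing blocks are rewritten, so the order may change
with TensorBlob.open("data/features.blob", "r+") as blob:
    blob.extend(other_blob, maintain_order=False)
with TensorBlob.open("data/features.blob", "r+") as blob:
    blob.truncate(1000)
    print(f"Truncated to {len(blob)} tensors")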
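The two extension strategies produce different orders. With `maintain_order=True` the other blob's tensors are appended one by one. With the default `maintain_order=False`, the source's `extend` keeps this blob's complete blocks and copies the other blob's complete block files after them. It then rewrites both partial tails at the end. `truncate(pos)` keeps ceil(pos / block_size) block files. The sketch below models both operations on plain lists; `extend_order` and `blocks_kept` are hypothetical helpers.

```python
def extend_order(a: list, b: list, block_size: int, maintain_order: bool) -> list:
    """Return the element order after extending blob `a` with blob `b`."""
    if maintain_order:
        return a + b
    # Index at which each blob's trailing partial block starts.
    a_brk = len(a) // block_size * block_size
    b_brk = len(b) // block_size * block_size
    # Full blocks of a, full blocks of b, then both partial tails rewritten.
    return a[:a_brk] + b[:b_brk] + a[a_brk:] + b[b_brk:]


def blocks_kept(pos: int, block_size: int) -> int:
    """Number of block files that survive truncate(pos): ceil(pos / block_size)."""
    return -(-pos // block_size)


a = [f"a{i}" for i in range(5)]
b = [f"b{i}" for i in range(7)]
print(extend_order(a, b, block_size=4, maintain_order=False))
# ['a0', 'a1', 'a2', 'a3', 'b0', 'b1', 'b2', 'b3', 'a4', 'b4', 'b5', 'b6']
print(blocks_kept(1000, 8192))  # 1
```

In the output, `a4` moves behind the other blob's full block. This is the reordering that `maintain_order=True` avoids at the cost of copying tensor by tensor.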
Performance and Scalability
Memory Management
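TensorBlob uses an LRU (Least Recently Used) cache to manage memory-mapped blocks. Memory mapping already avoids loading the whole blob into memory; the cache additionally caps how many blocks are mapped at the same time, so blobs with millions of tensors stay within the per-process limit on memory-mapped regions.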
Default behavior:
- Caches up to about 4,000 blocks by default (1/16 of the system's vm.max_map_count limit, and never fewer than 128)
- Blocks are loaded on demand when accessed
- The least recently used blocks are evicted automatically when the cache is full
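The default cache size can be reproduced with a few lines of standard-library Python. This sketch follows the source's `_getsyscachesize`: it reads `/proc/sys/vm/max_map_count` on Linux, falls back to 65536 when the file is missing or unreadable, divides by 16, and never returns less than 128. The `path` parameter is an addition of this sketch so the function can be tried against another file.

```python
def default_cache_size(path: str = "/proc/sys/vm/max_map_count") -> int:
    """Return 1/16 of the per-process mmap limit, with a floor of 128."""
    maxsize = 65536  # fallback when the limit cannot be read (e.g. outside Linux)
    try:
        with open(path, "r") as f:
            maxsize = int(f.read().strip())
    except (FileNotFoundError, ValueError, PermissionError):
        pass
    return max(maxsize // 16, 128)


print(default_cache_size())
```

With Linux's default limit of 65,530 mapped regions this gives 65530 // 16 = 4,095 blocks, which is the "about 4,000" quoted above.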
For large-scale workloads:
# Increase cache for better random access performance
with TensorBlob.open("large.blob", "r", max_cached_blocks=10_000) as blob:
    for idx in random_indices:
        tensor = blob[idx]  # Cached blocks reused efficiently
# Decrease cache for memory-constrained environments
with TensorBlob.open("data.blob", "r", max_cached_blocks=100) as blob:
    for tensor in blob:  # Sequential access works fine with small cache
        process(tensor)
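A toy LRU cache shows why sequential access tolerates a tiny cache while random access benefits from a large one. The sketch below uses `collections.OrderedDict` and counts how often a block has to be (re)loaded. The source stores its mapped blocks in an `LRUCache` object whose implementation is not shown here, so `BlockCache` and its `loads` counter are hypothetical.

```python
from collections import OrderedDict


class BlockCache:
    """Least-recently-used cache of loaded blocks, keyed by block number."""

    def __init__(self, maxsize: int) -> None:
        self.maxsize = maxsize
        self.loads = 0  # number of cache misses, i.e. blocks (re)mapped
        self._blocks = OrderedDict()

    def get(self, block: int) -> str:
        if block in self._blocks:
            self._blocks.move_to_end(block)  # mark as most recently used
            return self._blocks[block]
        self.loads += 1
        value = f"mapped block {block}"  # stand-in for mapping the block file
        self._blocks[block] = value
        if len(self._blocks) > self.maxsize:
            self._blocks.popitem(last=False)  # evict the least recently used
        return value


block_size = 4

# Sequential scan of 40 tensors (10 blocks) with room for only 2 blocks.
seq = BlockCache(maxsize=2)
for i in range(40):
    seq.get(i // block_size)
print(seq.loads)  # 10: each block is mapped once

# Cycling over 3 blocks with room for 2 evicts every block before its reuse.
cyc = BlockCache(maxsize=2)
for i in [0, 4, 8] * 5:
    cyc.get(i // block_size)
print(cyc.loads)  # 15: every access misses
```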
Performance tips:
- Sequential access patterns work well with any cache size
- Random access benefits from larger cache sizes
- Each cached block consumes ~200 bytes of kernel memory (VMA overhead)
- System limit: typically ~65,000 memory-mapped regions per process
- Increasing block_size reduces the total number of blocks, which also reduces cache evictions
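Choosing `block_size` trades the number of blocks (and thus mapped regions) against the size of each block file. The arithmetic is simple enough to check before creating a blob. The helper names below are hypothetical. The byte count assumes a block file holds exactly `block_size` tensors of the given shape and element size, matching how the source allocates blocks, and ignores any file metadata.

```python
from math import prod


def num_blocks(num_tensors: int, block_size: int) -> int:
    """Block files needed to hold num_tensors tensors (ceiling division)."""
    return -(-num_tensors // block_size)


def block_bytes(block_size: int, shape: tuple[int, ...], itemsize: int) -> int:
    """Raw data size of one block: block_size tensors of the given shape."""
    return block_size * prod(shape) * itemsize


# Quick Start: 100,000 float32 embeddings of size 768, default block_size=8192.
print(num_blocks(100_000, 8192))             # 13
print(block_bytes(8192, (768,), 4))          # 25165824 (24 MiB per block)

# 3x1024x1024 float32 images with block_size=256.
print(block_bytes(256, (3, 1024, 1024), 4))  # 3221225472 (3 GiB per block)
```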
Contributing
Contributions welcome! Please submit a Pull Request.
License
Apache License 2.0 - see LICENSE file for details.
class TensorBlob(ConfigMixin):
    _m_rd = False
    _m_wr = False
    _m_ap = False

    status_name = ".stat"
    config_name = ".conf"
    ignore_for_config = ["filename", "mode", "max_cached_blocks"]

    @classmethod
    def open(
        cls,
        filename,
        mode="r",
        *,
        dtype=None,
        shape=None,
        block_size=8192,
        max_cached_blocks=None,
    ):
        r"""Open a TensorBlob with file-like interface for tensor storage.

        TensorBlob provides persistent, memory-mapped storage for large collections
        of same-shaped tensors. It uses a block-based architecture where tensors are
        organized into fixed-size blocks for efficient I/O and memory management.

        The blob is stored as a directory containing:
        - ``.conf``: Configuration file (dtype, shape, block_size)
        - ``.stat``: State file (length, block list)
        - Block files: UUID-named memory-mapped tensor files

        Parameters
        ----------
        filename : str or Path
            Directory path for blob storage. Supports tilde expansion (~) and
            relative paths.
        mode : str, default="r"
            File access mode ('r', 'w', 'a', 'r+', 'w+', 'a+'). See below for details.
        dtype : str or torch.dtype, optional
            Data type for tensors. Required for new blobs (modes 'w', 'w+').
        shape : tuple of int or int, optional
            Shape of individual tensors. Required for new blobs (modes 'w', 'w+').
        block_size : int, default=8192
            Number of tensors per memory-mapped block file.
        max_cached_blocks : int, optional
            Maximum number of memory-mapped blocks to keep cached. When exceeded,
            least recently used blocks are unmapped. If None (default), uses 1/16
            of system's max_map_count limit (typically ~4000). This limits kernel
            VMA overhead for blobs with many blocks.

        Returns
        -------
        TensorBlob
            Opened blob object. Use with context manager for automatic cleanup.

        Raises
        ------
        FileNotFoundError
            If mode is 'r', 'r+', 'a', or 'a+' and blob doesn't exist.
        ValueError
            If creating new blob without dtype or shape, or if mode is invalid.
        TypeError
            If dtype is neither string nor torch.dtype.

        Examples
        --------
        Creating a new blob and writing data:

        >>> import torch
        >>> from tensorblob import TensorBlob
        >>>
        >>> with TensorBlob.open("data/embeddings", "w",
        ...                      dtype="float32", shape=(768,)) as blob:
        ...     embeddings = torch.randn(1000, 768)
        ...     blob.write(embeddings)
        ...     print(f"Wrote {len(blob)} tensors")
        1000
        Wrote 1000 tensors

        Reading from existing blob:

        >>> with TensorBlob.open("data/embeddings", "r") as blob:
        ...     all_data = blob.read()
        ...     print(all_data.shape)
        torch.Size([1000, 768])

        Appending to existing blob:

        >>> with TensorBlob.open("data/embeddings", "a") as blob:
        ...     new_data = torch.randn(100, 768)
        ...     blob.write(new_data)
        ...     print(f"Total: {len(blob)}")
        100
        Total: 1100

        Read and update with r+ mode:

        >>> with TensorBlob.open("data/embeddings", "r+") as blob:
        ...     first_10 = blob.read(size=10)
        ...     blob.seek(5)
        ...     blob.write(torch.ones(3, 768))  # Overwrite at position 5
        5
        3

        Custom block size for large tensors:

        >>> with TensorBlob.open("data/images", "w",
        ...                      dtype=torch.float32,
        ...                      shape=(3, 1024, 1024),
        ...                      block_size=256) as blob:
        ...     images = torch.randn(1000, 3, 1024, 1024)
        ...     blob.write(images)
        1000

        Custom cache size for large-scale random access:

        >>> # Increase cache for better random access performance
        >>> with TensorBlob.open("data/embeddings", "r",
        ...                      max_cached_blocks=10000) as blob:
        ...     for idx in random_indices:
        ...         embedding = blob[idx]  # Frequently accessed blocks stay cached

        >>> # Decrease cache for memory-constrained environments
        >>> with TensorBlob.open("data/features", "r",
        ...                      max_cached_blocks=100) as blob:
        ...     for feature in blob:  # Sequential access works fine
        ...         process(feature)

        File Access Modes
        -----------------
        Similar to Python's built-in open(), supports the following modes:

        Basic modes:
        - 'r' : Read-only. Blob must exist. Position starts at beginning.
        - 'w' : Write-only. Creates new or truncates existing. Position at start. **If the blob already exists,
          truncation will ignore any other parameters supplied and rely on existing configuration.**
        - 'a' : Append-only. Blob must exist. Position starts at end.
          All writes go to end regardless of seek position.

        Update modes (with '+'):
        - 'r+' : Read and write. Blob must exist. Position at start.
          Can overwrite existing data or extend at end.
        - 'w+' : Read and write. Creates new or truncates existing. Position at start.
        - 'a+' : Read and append. Blob must exist. Position at end.
          Reads allowed anywhere, writes always append to end.

        Data Type and Shape
        -------------------
        All tensors in a blob must have the same dtype and shape. These are
        specified when creating a new blob (modes 'w', 'w+') and stored in
        the configuration file. When opening existing blobs, dtype and shape
        are loaded automatically.

        Supported dtypes: "float32", "float64", "int32", "int64", "bool", etc.
        Can also use torch.dtype objects like torch.float32.

        Shape can be:
        - Single integer: shape=10 creates 1D tensors of shape (10,)
        - Tuple: shape=(3, 224, 224) creates 3D tensors
        """
        modes = set(mode)
        if modes - set("raw+") or len(mode) > len(modes):
            raise ValueError("Invalid mode: %s" % mode)
        if sum(c in "raw" for c in mode) != 1 or mode.count("+") > 1:
            raise ValueError(
                "Must have exactly one of read/write/append mode and at most one plus: %s"
                % mode
            )

        filename = Path(filename).expanduser().resolve()
        if not filename.exists():
            if "r" in modes or "a" in modes:
                raise FileNotFoundError("Blob not found: %r" % filename)
            if dtype is None or shape is None:
                raise ValueError(
                    "Arguments ``dtype`` and ``shape`` are required for new blob; got: %r and %r"
                    % (dtype, shape)
                )
            if isinstance(dtype, torch.dtype):
                dtype = str(dtype).split(".").pop()
            elif not isinstance(dtype, str):
                raise TypeError(
                    "dtype must be str or torch.dtype, got %r" % type(dtype).__name__
                )
            shape = (shape,) if isinstance(shape, int) else tuple(shape)
            return cls(
                os.fspath(filename), dtype, shape, block_size, mode, max_cached_blocks
            )

        return cls.from_config(
            save_directory=filename,
            runtime_kwargs={
                "mode": mode,
                "filename": os.fspath(filename),
                "max_cached_blocks": max_cached_blocks,
            },
        )

    @classmethod
    def unlink(cls, filename):
        filename = Path(filename).expanduser().resolve()
        if filename.exists():
            try:
                with cls.open(filename, "w") as _:
                    pass
                os.unlink(filename / cls.config_name)
                os.unlink(filename / cls.status_name)
                os.rmdir(os.fspath(filename))
            except Exception as exc:
                warnings.warn("Failed to unlink blob at %r: %s" % (filename, exc))
                return False
        return True

    @classmethod
    def apply_param_hooks(cls, d):
        d["shape"] = tuple(d["shape"])
        return d

    @classmethod
    def _getsyscachesize(cls) -> int:
        # Get default cache size for memory-mapped blocks. Returns 1/16 of system's
        # max_map_count to be conservative, typically ~4000, leaving room for other
        # VMAs in the process.
        maxsize = 65536
        try:
            with open("/proc/sys/vm/max_map_count", "r") as f:
                maxsize = int(f.read().strip())
        except (FileNotFoundError, ValueError, PermissionError):
            pass
        return max(maxsize // 16, 128)

    @register_to_config
    def __init__(
        self,
        filename: str,
        dtype: str,
        shape: tuple[int, ...],
        block_size: int,
        mode: str,
        max_cached_blocks: int | None = None,
    ) -> None:
        self.filename = filename
        self.dtype = dtype
        self.shape = shape
        self.block_size = block_size
        self.mode = mode
        self.max_cached_blocks = max_cached_blocks or self._getsyscachesize()

        self._pos = 0
        self._closed = False

        if "+" in mode:
            self._m_rd = True
            self._m_wr = True
        match mode.replace("+", ""):
            case "r":
                self._m_rd = True
            case "w":
                self._m_wr = True
                self._trunc()
            case "a":
                self._m_wr = True
                self._m_ap = True
                self._create()

        self._loadstatus()

    @property
    def configpath(self) -> str:
        return os.path.join(self.filename, self.config_name)

    @property
    def statuspath(self) -> str:
        return os.path.join(self.filename, self.status_name)

    @property
    def closed(self) -> bool:
        return self._closed

    def __enter__(self) -> TensorBlob:
        return self

    def __exit__(self, *_) -> None:
        self.close()

    def __len__(self) -> int:
        return self._status.len

    def __getitem__(self, idx: int | slice) -> torch.Tensor:
        if not isinstance(idx, (int, slice)):
            raise TypeError("Index must be int or slice, got %r!" % type(idx).__name__)
        if isinstance(idx, int):
            if idx >= len(self) or idx < -len(self):
                raise IndexError(
                    "Index out of bounds: %r (length: %d)" % (idx, len(self))
                )
            i, o = divmod(idx + len(self) if idx < 0 else idx, self.block_size)
            return self._getblock(i)[o].clone()

        # Although the current implementation may not be efficient, it is very easy to
        # understand and debug. More efficient implementation requires much more complex
        # edge case handling and is error prone. Also, I think the primary cost here is
        # still the I/O operations, not the Python code.
        ret = [
            self._getblock(bd)[[i % self.block_size for i in _is]]
            for bd, _is in groupby(
                range(*idx.indices(len(self))), key=lambda i: i // self.block_size
            )
        ]
        if not ret:
            return torch.empty(0, *self.shape, dtype=getattr(torch, self.dtype))
        return torch.cat(ret, dim=0)

    def __iter__(self) -> Iterator[torch.Tensor]:
        for i in range(self._pos, len(self)):
            self._pos += 1
            yield self[i]

    def _trunc(self) -> None:
        if os.path.exists(self.filename):
            try:
                st = TensorBlobStatus.load(self.statuspath)
            except FileNotFoundError as exc:
                raise FileNotFoundError(
                    "Status file missing for blob at %r; file corrupted!"
                    % self.statuspath
                ) from exc
            for bd in st.bds:
                os.remove(os.path.join(self.filename, bd))
        self.save_config(save_directory=self.filename, overwrite=True)
        TensorBlobStatus().dump(self.statuspath)

    def _create(self) -> None:
        if not os.path.exists(self.filename):
            self.save_config(save_directory=self.filename)
            TensorBlobStatus().dump(self.statuspath)

    def _getblock(self, bd: str | int = -1) -> MemoryMappedTensor:
        if not self._status.bds:
            self._addblock()
        if isinstance(bd, int):
            bd = self._status.bds[bd]
        if bd in self._memmap:
            return self._memmap[bd]

        # On a cache miss, the block is lazy-loaded into the cache. Return the local
        # reference instead of indexing the cache again, so the access is not counted
        # a second time.
        block = self._memmap[bd] = MemoryMappedTensor.from_filename(
            os.path.join(self.filename, bd),
            dtype=getattr(torch, self.dtype),
            shape=(self.block_size, *self.shape),
        )
        return block

    def _isfull(self) -> bool:
        return (not len(self) % self.block_size) and bool(len(self))

    def _addblock(self) -> MemoryMappedTensor:
        if self._status.bds and not self._isfull():
            raise RuntimeError(
                "Attempt to create a new block when working block "
                "is not full: length <%d> < capacity <%d>."
                % (len(self) % self.block_size, self.block_size)
            )
        name = str(uuid.uuid4())
        mmap = MemoryMappedTensor.empty(
            self.block_size,
            *self.shape,
            dtype=getattr(torch, self.dtype),
            filename=os.path.join(self.filename, name),
        )
        self._status.bds.append(name)
        self._memmap[name] = mmap
        return mmap

    def _loadstatus(self) -> None:
        try:
            self._status = TensorBlobStatus.load(self.statuspath)
            self._memmap = LRUCache(maxsize=self.max_cached_blocks)
            if self._m_ap:
                self._pos = len(self)
        except FileNotFoundError as exc:
            raise FileNotFoundError(
                "status file missing for blob at %r; file corrupted!" % self.statuspath
            ) from exc

    def _checkclosed(self) -> None:
        if self._closed:
            raise IOError("I/O operation on closed blob.")

    def _checkwritable(self) -> None:
        if not self._m_wr:
            raise IOError("Blob is not open for writing (mode='%s')" % self.mode)
        self._checkclosed()

    def _checkreadable(self) -> None:
        if not self._m_rd:
            raise IOError("Blob is not open for reading (mode='%s')" % self.mode)
        self._checkclosed()

    def tell(self) -> int:
        self._checkclosed()
        return self._pos

    def seek(self, pos: int = 0, whence: int = io.SEEK_SET) -> int:
        self._checkclosed()
        match whence:
            case io.SEEK_SET:
                _pos = pos
            case io.SEEK_CUR:
                _pos = self._pos + pos
            case io.SEEK_END:
                _pos = len(self) + pos
            case _:
                raise ValueError("Invalid whence: %r" % whence)
        self._pos = max(min(_pos, len(self)), 0)
        return self.tell()

    def close(self) -> None:
        if not self._closed and self._m_wr:
            self.flush()
        self._closed = True

    def flush(self) -> None:
        self._checkwritable()
        self._status.dump(self.statuspath)

    def read(self, size: int | None = None) -> torch.Tensor:
        self._checkreadable()
        end = min(self._pos + (size or len(self)), len(self))
        ret = self[self._pos : end]
        self.seek(end)
        return ret

    def write(self, ts: torch.Tensor) -> int:
        self._checkwritable()
        if self._m_ap:
            self.seek(whence=io.SEEK_END)
        ts = ts.view(-1, *self.shape)
        nt = ts.size(0)

        cnt = 0
        while cnt < nt:
            if self._isfull() and self._pos >= len(self):
                self._addblock()
            i, o = divmod(self._pos, self.block_size)
            incr = min(self.block_size - o, nt - cnt)
            self._getblock(i)[o : o + incr] = ts[cnt : cnt + incr]

            # Only tensors written past the original end increase the stored length:
            # the cursor is not always at EOF, so the change in length can be smaller
            # than the number of tensors written.
            self._pos += incr
            self._status.len += max(0, self._pos - len(self))

            cnt += incr

        assert cnt == nt, "Write incomplete: wrote %d of %d tensors!" % (cnt, nt)
        return cnt

    def truncate(self, pos: int | None = None) -> int:
        self._checkwritable()
        # Truncate at the current position when pos is None; pos=0 empties the blob.
        self.seek(self.tell() if pos is None else pos)
        brk = ceil(self.tell() / self.block_size)
        for bd in self._status.bds[brk:]:
            if bd in self._memmap:
                del self._memmap[bd]
            os.remove(os.path.join(self.filename, bd))
        self._status.bds = self._status.bds[:brk]
        self._status.len = self.tell()
        self.flush()
        return self.tell()

    def extend(self, other: TensorBlob, maintain_order: bool = False) -> None:
        if self.dtype != other.dtype or self.shape != other.shape:
            raise ValueError("Blob data types and shapes must match to extend blobs!")

        self._checkwritable()
        self.seek(whence=io.SEEK_END)

        # TODO: Honestly this is a bit inefficient but I think this is rarely used.
        if maintain_order:
            for i in range(len(other)):
                self.write(other[i])
            return

        # If order is not important, we can simply copy over the complete blocks from
        # the other blob and merge incomplete blocks.
        if self.block_size != other.block_size:
            raise ValueError(
                "Block sizes must match to extend blobs in non-order-preserving mode!"
            )

        comb = []
        sbrk = len(self) // self.block_size * self.block_size
        if sbrk < len(self):
            comb.append(self[sbrk:])
        obrk = len(other) // other.block_size * other.block_size
        if obrk < len(other):
            comb.append(other[obrk:])

        # TODO: We are directly accessing internal data structures of the other blob here.
        self.truncate(sbrk)
        for obd in other._status.bds[: len(other) // other.block_size]:
            sbd = str(uuid.uuid4())
            shutil.copy(
                os.path.join(other.filename, obd), os.path.join(self.filename, sbd)
            )
            self._status.bds.append(sbd)
            self._status.len += self.block_size
            self._memmap[sbd] = MemoryMappedTensor.from_filename(
                os.path.join(self.filename, sbd),
                dtype=getattr(torch, self.dtype),
                shape=(self.block_size, *self.shape),
            )

        self.seek(whence=io.SEEK_END)
        if comb:
            self.write(torch.cat(comb, dim=0))
        self.flush()
Mixin class for automated configuration registration and IO.
Attributes
- config_name (str, default=None): Class attribute that specifies the filename under which the config should be stored when calling save_config. Should be overridden by the subclass.
- ignore_for_config (list[str], default=[]): Class attribute that specifies a list of attributes that should not be saved in the config. Should be overridden by the subclass.
Examples
In this example, we have a model with 3 arguments:
- hidden_size: The hidden size of the model.
- _num_layers: The number of layers in the model.
- dropout: The dropout rate of the model.
Among the three arguments, _num_layers is implicitly ignored by the decorator because of its leading underscore, and dropout is explicitly ignored because it is listed in the ignore_for_config class variable. Only the hidden_size argument is registered to the config.
>>> class MyModel(ConfigMixin):
... config_name = "my_model_config.json"
... ignore_for_config = ["dropout"]
...
... @register_to_config
... def __init__(self, hidden_size: int = 768, _num_layers: int = 12, dropout: float = 0.1):
... self.hidden_size = hidden_size
... self.num_layers = _num_layers
... self.dropout = dropout # This will be ignored because of the specification in `ignore_for_config`
...
>>> model = MyModel(hidden_size=1024, _num_layers=20, dropout=0.2)
>>> model.config
mappingproxy({'__notes__': {'class_name': 'MyModel', 'using_default_values': [], 'args': (), 'kwargs': {}}, 'hidden_size': 1024})
>>> model.num_layers
20
>>> model.dropout
0.2
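The registration rule described above can be sketched with `inspect.signature`: skip parameters with a leading underscore and those listed in `ignore_for_config`, and record the rest. This is a hypothetical, simplified decorator for illustration, not the library's `register_to_config`. In particular, it omits the `__notes__` metadata shown in the output above and does no saving or loading.

```python
import functools
import inspect
from types import MappingProxyType


def register_to_config(init):
    """Record the bound __init__ arguments as a read-only ``config`` mapping."""
    signature = inspect.signature(init)

    @functools.wraps(init)
    def wrapper(self, *args, **kwargs):
        bound = signature.bind(self, *args, **kwargs)
        bound.apply_defaults()  # default values are recorded too
        ignored = set(getattr(type(self), "ignore_for_config", []))
        self.config = MappingProxyType({
            name: value
            for name, value in list(bound.arguments.items())[1:]  # skip self
            if not name.startswith("_") and name not in ignored
        })
        init(self, *args, **kwargs)

    return wrapper


class MyModel:
    ignore_for_config = ["dropout"]

    @register_to_config
    def __init__(self, hidden_size: int = 768, _num_layers: int = 12, dropout: float = 0.1):
        self.hidden_size = hidden_size
        self.num_layers = _num_layers
        self.dropout = dropout


model = MyModel(hidden_size=1024, _num_layers=20, dropout=0.2)
print(dict(model.config))  # {'hidden_size': 1024}
print(model.num_layers)    # 20
```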
Open a TensorBlob with file-like interface for tensor storage.
TensorBlob provides persistent, memory-mapped storage for large collections of same-shaped tensors. It uses a block-based architecture where tensors are organized into fixed-size blocks for efficient I/O and memory management.
The blob is stored as a directory containing:
- .conf: Configuration file (dtype, shape, block_size)
- .stat: State file (length, block list)
- Block files: UUID-named memory-mapped tensor files
Parameters
- filename (str or Path): Directory path for blob storage. Supports tilde expansion (~) and relative paths.
- mode (str, default="r"): File access mode ('r', 'w', 'a', 'r+', 'w+', 'a+'). See below for details.
- dtype (str or torch.dtype, optional): Data type for tensors. Required for new blobs (modes 'w', 'w+').
- shape (tuple of int or int, optional): Shape of individual tensors. Required for new blobs (modes 'w', 'w+').
- block_size (int, default=8192): Number of tensors per memory-mapped block file.
- max_cached_blocks (int, optional): Maximum number of memory-mapped blocks to keep cached. When exceeded, least recently used blocks are unmapped. If None (default), uses 1/16 of system's max_map_count limit (typically ~4000). This limits kernel VMA overhead for blobs with many blocks.
Returns
- TensorBlob: Opened blob object. Use with context manager for automatic cleanup.
Raises
- FileNotFoundError: If mode is 'r', 'r+', 'a', or 'a+' and the blob doesn't exist.
- ValueError: If creating a new blob without dtype or shape, or if mode is invalid.
- TypeError: If dtype is neither string nor torch.dtype.
Examples
Creating a new blob and writing data:
>>> import torch
>>> from tensorblob import TensorBlob
>>>
>>> with TensorBlob.open("data/embeddings", "w",
... dtype="float32", shape=(768,)) as blob:
... embeddings = torch.randn(1000, 768)
... blob.write(embeddings)
... print(f"Wrote {len(blob)} tensors")
Wrote 1000 tensors
Reading from existing blob:
>>> with TensorBlob.open("data/embeddings", "r") as blob:
... all_data = blob.read()
... print(all_data.shape)
torch.Size([1000, 768])
Appending to existing blob:
>>> with TensorBlob.open("data/embeddings", "a") as blob:
... new_data = torch.randn(100, 768)
... blob.write(new_data)
... print(f"Total: {len(blob)}")
Total: 1100
Read and update with r+ mode:
>>> with TensorBlob.open("data/embeddings", "r+") as blob:
... first_10 = blob.read(size=10)
... blob.seek(5)
... blob.write(torch.ones(3, 768)) # Overwrite at position 5
Custom block size for large tensors:
>>> with TensorBlob.open("data/images", "w",
... dtype=torch.float32,
... shape=(3, 1024, 1024),
... block_size=256) as blob:
... images = torch.randn(1000, 3, 1024, 1024)
... blob.write(images)
Custom cache size for large-scale random access:
>>> # Increase cache for better random access performance
>>> with TensorBlob.open("data/embeddings", "r",
... max_cached_blocks=10000) as blob:
... for idx in random_indices:
... embedding = blob[idx] # Frequently accessed blocks stay cached
>>> # Decrease cache for memory-constrained environments
>>> with TensorBlob.open("data/features", "r",
... max_cached_blocks=100) as blob:
... for feature in blob: # Sequential access works fine
... process(feature)
File Access Modes
Similar to Python's built-in open(), the following modes are supported:
Basic modes:
- 'r' : Read-only. Blob must exist. Position starts at beginning.
- 'w' : Write-only. Creates a new blob or truncates an existing one. Position at start. When an existing blob is truncated, the dtype, shape, and block_size arguments are ignored and the stored configuration is kept.
- 'a' : Append-only. Blob must exist. Position starts at end. All writes go to end regardless of seek position.
Update modes (with '+'):
- 'r+' : Read and write. Blob must exist. Position at start. Can overwrite existing data or extend at end.
- 'w+' : Read and write. Creates new or truncates existing. Position at start.
- 'a+' : Read and append. Blob must exist. Position at end. Reads allowed anywhere, writes always append to end.
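The mode list above can be summarized as a lookup table. The dictionary below is a hypothetical restatement of those documented rules for quick reference (`ModeRule`, `MODE_RULES`, and `initial_position` are not tensorblob names, and this is not how the library stores them). Each entry records whether the blob must exist, whether an existing blob is truncated, whether the cursor starts at the end, and whether reads, writes, and append-only writes apply.

```python
from typing import NamedTuple


class ModeRule(NamedTuple):
    must_exist: bool
    truncates: bool
    starts_at_end: bool
    readable: bool
    writable: bool
    append_only: bool


# Restatement of the documented rules for each mode
MODE_RULES = {
    "r": ModeRule(True, False, False, True, False, False),
    "w": ModeRule(False, True, False, False, True, False),
    "a": ModeRule(True, False, True, False, True, True),
    "r+": ModeRule(True, False, False, True, True, False),
    "w+": ModeRule(False, True, False, True, True, False),
    "a+": ModeRule(True, False, True, True, True, True),
}


def initial_position(mode: str, length: int) -> int:
    """Cursor position right after opening a blob that holds `length` tensors."""
    rule = MODE_RULES[mode]
    if rule.truncates:
        return 0  # the blob is emptied, so the only valid position is 0
    return length if rule.starts_at_end else 0
```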
Data Type and Shape
All tensors in a blob must have the same dtype and shape. These are specified when creating a new blob (modes 'w', 'w+') and stored in the configuration file. When opening existing blobs, dtype and shape are loaded automatically.
Supported dtypes: "float32", "float64", "int32", "int64", "bool", etc. Can also use torch.dtype objects like torch.float32.
Shape can be:
- Single integer: shape=10 creates 1D tensors of shape (10,)
- Tuple: shape=(3, 224, 224) creates 3D tensors
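The open listing normalizes both arguments before creating a blob. An integer shape becomes a one-element tuple, and a torch.dtype is reduced to its name by keeping the text after the last dot of str(dtype). For example, str(torch.float32) is "torch.float32", which becomes "float32", while dtype strings are passed through unchanged. The sketch below reproduces that logic on plain strings so it runs without PyTorch. `normalize_shape` and `dtype_name` are hypothetical helper names.

```python
from typing import Iterable, Union


def normalize_shape(shape: Union[int, Iterable[int]]) -> tuple:
    """Turn 10 into (10,) and any iterable of ints into a tuple."""
    return (shape,) if isinstance(shape, int) else tuple(shape)


def dtype_name(dtype_repr: str) -> str:
    """Mimic str(dtype).split(".").pop(): keep the part after the last dot."""
    return dtype_repr.split(".").pop()


print(normalize_shape(10))             # (10,)
print(normalize_shape([3, 224, 224]))  # (3, 224, 224)
print(dtype_name("torch.float32"))     # float32
```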
@classmethod
def unlink(cls, filename):
    filename = Path(filename).expanduser().resolve()
    if filename.exists():
        try:
            with cls.open(filename, "w") as _:
                pass
            os.unlink(filename / cls.config_name)
            os.unlink(filename / cls.status_name)
            os.rmdir(os.fspath(filename))
        except Exception as exc:
            warnings.warn("Failed to unlink blob at %r: %s" % (filename, exc))
            return False
    return True
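unlink relies on 'w' mode to clear out the block files, then removes the two metadata files and the now-empty directory. It reports failure with a warning and a False return instead of raising, and treats a missing path as success. The sketch below mimics that order of operations on an ordinary directory and is an approximation, not tensorblob's code. `unlink_blob_dir` is hypothetical, deleting every non-metadata file stands in for the 'w'-mode truncation, and the ".conf"/".stat" names are assumed from the directory layout described earlier in this section.

```python
import os
import warnings
from pathlib import Path

CONFIG_NAME = ".conf"  # assumed metadata file names, per the layout description
STATUS_NAME = ".stat"


def unlink_blob_dir(directory) -> bool:
    """Delete a blob-like directory: blocks first, then metadata, then the folder."""
    directory = Path(directory).expanduser().resolve()
    if directory.exists():
        try:
            # Stand-in for opening in "w" mode: remove every non-metadata file
            for entry in directory.iterdir():
                if entry.name not in (CONFIG_NAME, STATUS_NAME):
                    entry.unlink()
            os.unlink(directory / CONFIG_NAME)
            os.unlink(directory / STATUS_NAME)
            os.rmdir(directory)
        except Exception as exc:
            warnings.warn("Failed to unlink blob at %r: %s" % (directory, exc))
            return False
    return True
```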
Apply post-processing hooks to the JSON dictionary.
orjson.loads only decodes configs into primitive types, which may not be directly
consumable by the class initializer. For instance, a dataclass object will be
loaded as a dictionary. Therefore, this method is intended to be overridden by
subclasses to perform additional post-processing on the loaded config dictionary.
Note that abusing this method to deserialize complex objects is highly discouraged;
consider using the runtime_kwargs argument of from_config instead, which passes
complex objects to the class initializer explicitly.
By default, this method returns the input dictionary unchanged.
Parameters
- jdict (dict[str, Any]): The config dictionary after deserialization.
Returns
- dict[str, Any]: The config dictionary after post-processing.
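One plausible reason to override this hook is that JSON has no tuple type, so a shape saved as (3, 224, 224) comes back from the decoder as the list [3, 224, 224]. The sketch below uses the standard json module in place of orjson. It also uses a hypothetical base class and method name (`JsonConfigurable.post_load`, `from_json`), since the real hook's name and loader are not shown in this section. The subclass converts the list back to a tuple and leaves everything else unchanged.

```python
import json
from dataclasses import dataclass
from typing import Any


class JsonConfigurable:
    """Hypothetical stand-in for a config mixin with a post-load hook."""

    @classmethod
    def post_load(cls, jdict: dict[str, Any]) -> dict[str, Any]:
        return jdict  # default: return the decoded dictionary unchanged

    @classmethod
    def from_json(cls, text: str):
        return cls(**cls.post_load(json.loads(text)))


@dataclass
class BlobConfig(JsonConfigurable):
    dtype: str
    shape: tuple
    block_size: int

    @classmethod
    def post_load(cls, jdict: dict[str, Any]) -> dict[str, Any]:
        # JSON arrays decode to lists; the initializer expects a tuple shape
        return {**jdict, "shape": tuple(jdict["shape"])}


text = json.dumps({"dtype": "float32", "shape": (3, 224, 224), "block_size": 256})
cfg = BlobConfig.from_json(text)
print(cfg)  # BlobConfig(dtype='float32', shape=(3, 224, 224), block_size=256)
```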
def seek(self, pos: int = 0, whence: int = io.SEEK_SET) -> int:
    self._checkclosed()
    match whence:
        case io.SEEK_SET:
            _pos = pos
        case io.SEEK_CUR:
            _pos = self._pos + pos
        case io.SEEK_END:
            _pos = len(self) + pos
        case _:
            raise ValueError("Invalid whence: %r" % whence)
    self._pos = max(min(_pos, len(self)), 0)
    return self.tell()
def write(self, ts: torch.Tensor) -> int:
    self._checkwritable()
    if self._m_ap:
        self.seek(whence=io.SEEK_END)
    ts = ts.view(-1, *self.shape)
    nt = ts.size(0)

    cnt = 0
    while cnt < nt:
        if self._isfull() and self._pos >= len(self):
            self._addblock()
        i, o = divmod(self._pos, self.block_size)
        incr = min(self.block_size - o, nt - cnt)
        self._getblock(i)[o : o + incr] = ts[cnt : cnt + incr]

        # Grow the stored length only by the tensors written past the original end:
        # the cursor is not always at EOF, so the change in length can be smaller
        # than the number of tensors written
        self._pos += incr
        self._status.len += max(0, self._pos - len(self))

        cnt += incr

    assert cnt == nt, "Write incomplete: wrote %d of %d tensors!" % (cnt, nt)
    return cnt
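The write loop fills the block under the cursor. It allocates a new block only when every allocated slot is used and the cursor is at the end, and it grows the length only by the part of each chunk that lands past the old end. The pure-Python model below reproduces that bookkeeping so that overwrite-then-extend behavior can be checked by hand. `BlockList` is hypothetical, Python lists stand in for memory-mapped blocks, and `_isfull` is assumed to mean "length equals the capacity of the allocated blocks".

```python
class BlockList:
    """Pure-Python model of the block-wise write loop (lists replace memmaps)."""

    def __init__(self, block_size: int):
        self.block_size = block_size
        self.blocks: list[list] = []
        self.length = 0
        self.pos = 0

    def _isfull(self) -> bool:
        # Assumed meaning: no free slot left in the allocated blocks
        return self.length == len(self.blocks) * self.block_size

    def write(self, items: list) -> int:
        cnt = 0
        while cnt < len(items):
            if self._isfull() and self.pos >= self.length:
                self.blocks.append([None] * self.block_size)
            i, o = divmod(self.pos, self.block_size)
            incr = min(self.block_size - o, len(items) - cnt)
            self.blocks[i][o : o + incr] = items[cnt : cnt + incr]
            self.pos += incr
            # Only the part past the old end increases the length
            self.length += max(0, self.pos - self.length)
            cnt += incr
        return cnt

    def read_all(self) -> list:
        flat = [x for block in self.blocks for x in block]
        return flat[: self.length]


bl = BlockList(block_size=4)
bl.write(list(range(6)))
bl.pos = 3  # like seek(3) in r+ mode
bl.write(["a", "b", "c", "d", "e"])
print(bl.read_all())              # [0, 1, 2, 'a', 'b', 'c', 'd', 'e']
print(bl.length, len(bl.blocks))  # 8 2
```

Five items were written in the second call, but the length grew by only two, which is exactly the case the comment in the listing guards against.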
def truncate(self, pos: int | None = None) -> int:
    self._checkwritable()
    # Compare with None explicitly: ``pos or self.tell()`` would turn truncate(0)
    # into a truncation at the current cursor, because 0 is falsy
    self.seek(self.tell() if pos is None else pos)
    brk = ceil(self.tell() / self.block_size)
    for bd in self._status.bds[brk:]:
        if bd in self._memmap:
            del self._memmap[bd]
        os.remove(os.path.join(self.filename, bd))
    self._status.bds = self._status.bds[:brk]
    self._status.len = self.tell()
    self.flush()
    return self.tell()
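truncate keeps the first ceil(pos / block_size) blocks and deletes the rest. A partially filled block therefore survives, and its slots beyond the new length are simply ignored. The helper below (`truncate_plan`, hypothetical) computes that plan. The last lines show why comparing `pos` against None is safer than writing `pos or self.tell()`: 0 is falsy, so `0 or cursor` evaluates to the cursor, and truncate(0) would not empty the blob unless the cursor was already at 0.

```python
from math import ceil


def truncate_plan(block_names: list, pos: int, block_size: int) -> tuple[list, list, int]:
    """Return (blocks kept, blocks deleted, new length) for truncating at pos."""
    brk = ceil(pos / block_size)
    return block_names[:brk], block_names[brk:], pos


kept, dropped, new_len = truncate_plan(["b0", "b1", "b2"], pos=10, block_size=8)
print(kept, dropped, new_len)  # ['b0', 'b1'] ['b2'] 10

# The falsy-zero pitfall: with "or", the cursor wins when pos is 0
pos, cursor = 0, 17
print(pos or cursor)                      # 17
print(cursor if pos is None else pos)     # 0
```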
def extend(self, other: TensorBlob, maintain_order: bool = False) -> None:
    if self.dtype != other.dtype or self.shape != other.shape:
        raise ValueError("Blob data types and shapes must match to extend blobs!")

    self._checkwritable()
    self.seek(whence=io.SEEK_END)

    # TODO: Honestly this is a bit inefficient but I think this is rarely used.
    if maintain_order:
        for i in range(len(other)):
            self.write(other[i])
        return

    # If order is not important, we can simply copy over the complete blocks from
    # the other blob and merge incomplete blocks.
    if self.block_size != other.block_size:
        raise ValueError(
            "Block sizes must match to extend blobs in non-order-preserving mode!"
        )

    comb = []
    sbrk = len(self) // self.block_size * self.block_size
    if sbrk < len(self):
        comb.append(self[sbrk:])
    obrk = len(other) // other.block_size * other.block_size
    if obrk < len(other):
        comb.append(other[obrk:])

    # TODO: We are directly accessing internal data structures of the other blob here.
    self.truncate(sbrk)
    for obd in other._status.bds[: len(other) // other.block_size]:
        sbd = str(uuid.uuid4())
        shutil.copy(
            os.path.join(other.filename, obd), os.path.join(self.filename, sbd)
        )
        self._status.bds.append(sbd)
        self._status.len += self.block_size
        self._memmap[sbd] = MemoryMappedTensor.from_filename(
            os.path.join(self.filename, sbd),
            dtype=getattr(torch, self.dtype),
            shape=(self.block_size, *self.shape),
        )

    self.seek(whence=io.SEEK_END)
    if comb:
        self.write(torch.cat(comb, dim=0))
    self.flush()
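In non-order-preserving mode, extend copies every full block of `other` file-by-file and rewrites only the two partial tail blocks, concatenated, at the end. The pure-Python model below reproduces the resulting element order. `extend_unordered` is hypothetical, and lists stand in for blobs. It shows why this path is used only when maintain_order is False: the tail of `self` ends up after the full blocks of `other`, whereas maintain_order=True would produce plain `mine + other`.

```python
def extend_unordered(mine: list, other: list, block_size: int) -> list:
    """Model the element order produced by extend(other, maintain_order=False)."""
    sbrk = len(mine) // block_size * block_size   # end of self's full blocks
    obrk = len(other) // block_size * block_size  # end of other's full blocks
    tail = mine[sbrk:] + other[obrk:]             # partial blocks, merged
    return mine[:sbrk] + other[:obrk] + tail


print(extend_unordered(list("abcdef"), list("UVWXYZ"), block_size=4))
# ['a', 'b', 'c', 'd', 'U', 'V', 'W', 'X', 'e', 'f', 'Y', 'Z']
```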