tensorblob
tensorblob
A lightweight, dynamic-sized, memory-mapped tensor storage with file-like APIs, while also supporting integer indexing and slicing, built with MemoryMappedTensor from tensordict.
Features
- 🔗 Memory-mapped storage: Efficient storage of large collections of same-shaped tensors
- 💾 File-like APIs: Read, write, and seek like a file, while also supporting integer indexing and slicing
- ⚡ Dynamic-sized: No need to specify the total number of tensors upfront
- 🔄 Extend and truncate: Extend the blob with another blob or truncate the blob to a specific position
Installation
From PyPI:
pip install tensorblob
If you are interested in the experimental (i.e., unstable and undertested) version, you can install it from GitHub:
pip install git+https://github.com/Guest400123064/tensorblob.git
Core Use Cases
Quick Start
The example below shows how to create a new storage for a collection of randomly generated fake embeddings and how to access them by index. Since the storage is memory-mapped, there is no need to read all tensors into memory; just access them by index.
import torch
from tensorblob import TensorBlob
# Create a new storage for a collection of randomly generated fake embeddings;
# need to specify the data type and shape of each tensor for creation
with TensorBlob.open("embeddings.blob", "w", dtype="float32", shape=768) as blob:
blob.write(torch.randn(100_000, 768))
print(f"Wrote {len(blob)} embeddings")
# No need to specify the configurations again after creation
with TensorBlob.open("embeddings.blob", "r") as blob:
e1 = blob[42]
e2 = blob[-1:16384:-12345]
print(f"Similarity: {torch.cosine_similarity(e1, e2)}")
Processing Large Datasets
Storing and preprocessing datasets larger than RAM with memory mapping can accelerate training by reducing the time spent on data loading and transformation.
with TensorBlob.open("data/images.blob", "w", dtype="float32", shape=(3, 224, 224)) as blob:
for image_batch in data_loader:
blob.write(preprocess(image_batch))
with TensorBlob.open("data/images.blob", "r") as blob:
for image in blob:
result = model(image)
Incremental Data Collection
Appending new data to an existing blob is useful for streaming data collection.
with TensorBlob.open("positions.blob", "w", dtype="float32", shape=3) as blob:
blob.write(initial_position)
# Later: append more data by opening the blob in append mode
with TensorBlob.open("positions.blob", "a") as blob:
for pos in trajectory_queue.get():
blob.write(pos)
print(f"Total trajectory recorded: {len(blob)}")
Random Access and Updates with File-Like APIs
Read and modify specific tensors starting from a specific position.
import io
with TensorBlob.open("data/features.blob", "r+") as blob:
blob.seek(1000)
print(f"Current position: {blob.tell()}")
batch = blob.read(size=100)
print(f"Read {batch.shape} tensors")
# Update specific positions, whence is also supported
blob.seek(-500, whence=io.SEEK_END)
blob.write(updated_features)
# Append new data
blob.seek(len(blob))
blob.write(additional_features)
Extend and Truncate
Extend the blob with another blob or truncate the blob to a specific position. Extension is useful when merging two blobs into one, e.g., results from two different processes. Note that the extension operation does not delete the original data.
with TensorBlob.open("data/features.blob", "a") as blob:
blob.extend(other_blob, maintain_order=True)
# Extension without maintaining the order is faster
with TensorBlob.open("data/features.blob", "r+") as blob:
blob.extend(other_blob, maintain_order=False)
with TensorBlob.open("data/features.blob", "r+") as blob:
blob.truncate(1000)
print(f"Truncated to {len(blob)} tensors")
Contributing
Contributions welcome! Please submit a Pull Request.
License
Apache License 2.0 - see LICENSE file for details.
35class TensorBlob(ConfigMixin): 36 _m_rd = False 37 _m_wr = False 38 _m_ap = False 39 40 status_name = ".stat" 41 config_name = ".conf" 42 ignore_for_config = ["filename", "mode"] 43 44 @classmethod 45 def open(cls, filename, mode="r", *, dtype=None, shape=None, block_size=8192): 46 r"""Open a TensorBlob with file-like interface for tensor storage. 47 48 TensorBlob provides persistent, memory-mapped storage for large collections 49 of same-shaped tensors. It uses a block-based architecture where tensors are 50 organized into fixed-size blocks for efficient I/O and memory management. 51 52 The blob is stored as a directory containing: 53 - ``.conf``: Configuration file (dtype, shape, block_size) 54 - ``.stat``: State file (length, block list) 55 - Block files: UUID-named memory-mapped tensor files 56 57 Parameters 58 ---------- 59 filename : str or Path 60 Directory path for blob storage. Supports tilde expansion (~) and 61 relative paths. 62 mode : str, default="r" 63 File access mode ('r', 'w', 'a', 'r+', 'w+', 'a+'). See below for details. 64 dtype : str or torch.dtype, optional 65 Data type for tensors. Required for new blobs (modes 'w', 'w+'). 66 shape : tuple of int or int, optional 67 Shape of individual tensors. Required for new blobs (modes 'w', 'w+'). 68 block_size : int, default=8192 69 Number of tensors per memory-mapped block file. 70 71 Returns 72 ------- 73 TensorBlob 74 Opened blob object. Use with context manager for automatic cleanup. 75 76 Raises 77 ------ 78 FileNotFoundError 79 If mode is 'r', 'r+', 'a', or 'a+' and blob doesn't exist. 80 ValueError 81 If creating new blob without dtype or shape, or if mode is invalid. 82 TypeError 83 If dtype is neither string nor torch.dtype. 84 85 Examples 86 -------- 87 Creating a new blob and writing data: 88 89 >>> import torch 90 >>> from tensorblob import TensorBlob 91 >>> 92 >>> with TensorBlob.open("data/embeddings", "w", 93 ... dtype="float32", shape=(768,)) as blob: 94 ... 
embeddings = torch.randn(1000, 768) 95 ... blob.write(embeddings) 96 ... print(f"Wrote {len(blob)} tensors") 97 Wrote 1000 tensors 98 99 Reading from existing blob: 100 101 >>> with TensorBlob.open("data/embeddings", "r") as blob: 102 ... all_data = blob.read() 103 ... print(all_data.shape) 104 torch.Size([1000, 768]) 105 106 Appending to existing blob: 107 108 >>> with TensorBlob.open("data/embeddings", "a") as blob: 109 ... new_data = torch.randn(100, 768) 110 ... blob.write(new_data) 111 ... print(f"Total: {len(blob)}") 112 Total: 1100 113 114 Read and update with r+ mode: 115 116 >>> with TensorBlob.open("data/embeddings", "r+") as blob: 117 ... first_10 = blob.read(size=10) 118 ... blob.seek(5) 119 ... blob.write(torch.ones(3, 768)) # Overwrite at position 5 120 121 Custom block size for large tensors: 122 123 >>> with TensorBlob.open("data/images", "w", 124 ... dtype=torch.float32, 125 ... shape=(3, 1024, 1024), 126 ... block_size=256) as blob: 127 ... images = torch.randn(1000, 3, 1024, 1024) 128 ... blob.write(images) 129 130 File Access Modes 131 ----------------- 132 Similar to Python's built-in open(), supports the following modes: 133 134 Basic modes: 135 - 'r' : Read-only. Blob must exist. Position starts at beginning. 136 - 'w' : Write-only. Creates new or truncates existing. Position at start. **If the blob already exists, 137 truncation will ignore any other parameters supplied and rely on existing configuration.** 138 - 'a' : Append-only. Blob must exist. Position starts at end. 139 All writes go to end regardless of seek position. 140 141 Update modes (with '+'): 142 - 'r+' : Read and write. Blob must exist. Position at start. 143 Can overwrite existing data or extend at end. 144 - 'w+' : Read and write. Creates new or truncates existing. Position at start. 145 - 'a+' : Read and append. Blob must exist. Position at end. 146 Reads allowed anywhere, writes always append to end. 
147 148 Data Type and Shape 149 ------------------- 150 All tensors in a blob must have the same dtype and shape. These are 151 specified when creating a new blob (modes 'w', 'w+') and stored in 152 the configuration file. When opening existing blobs, dtype and shape 153 are loaded automatically. 154 155 Supported dtypes: "float32", "float64", "int32", "int64", "bool", etc. 156 Can also use torch.dtype objects like torch.float32. 157 158 Shape can be: 159 - Single integer: shape=10 creates 1D tensors of shape (10,) 160 - Tuple: shape=(3, 224, 224) creates 3D tensors 161 """ 162 modes = set(mode) 163 if modes - set("raw+") or len(mode) > len(modes): 164 raise ValueError("Invalid mode: %s" % mode) 165 if sum(c in "raw" for c in mode) != 1 or mode.count("+") > 1: 166 raise ValueError( 167 "Must have exactly one of read/write/append mode and at most one plus: %s" 168 % mode 169 ) 170 171 filename = Path(filename).expanduser().resolve() 172 if not filename.exists(): 173 if "r" in modes or "a" in modes: 174 raise FileNotFoundError("Blob not found: %r" % filename) 175 if dtype is None or shape is None: 176 raise ValueError( 177 "Arguments ``dtype`` and ``shape`` are required for new blob; got: %r and %r" 178 % (dtype, shape) 179 ) 180 if isinstance(dtype, torch.dtype): 181 dtype = str(dtype).split(".").pop() 182 elif not isinstance(dtype, str): 183 raise TypeError( 184 "dtype must be str or torch.dtype, got %r" % type(dtype).__name__ 185 ) 186 shape = (shape,) if isinstance(shape, int) else tuple(shape) 187 return cls(os.fspath(filename), dtype, shape, block_size, mode) 188 189 return cls.from_config( 190 save_directory=filename, 191 runtime_kwargs={"mode": mode, "filename": os.fspath(filename)}, 192 ) 193 194 @classmethod 195 def apply_param_hooks(cls, d): 196 d["shape"] = tuple(d["shape"]) 197 return d 198 199 @register_to_config 200 def __init__( 201 self, 202 filename: str, 203 dtype: str, 204 shape: tuple[int, ...], 205 block_size: int, 206 mode: str, 207 ) -> None: 
208 self.filename = filename 209 self.dtype = dtype 210 self.shape = shape 211 self.block_size = block_size 212 self.mode = mode 213 214 self._pos = 0 215 self._closed = False 216 217 if "+" in mode: 218 self._m_rd = True 219 self._m_wr = True 220 match mode.replace("+", ""): 221 case "r": 222 self._m_rd = True 223 case "w": 224 self._m_wr = True 225 self._trunc() 226 case "a": 227 self._m_wr = True 228 self._m_ap = True 229 self._create() 230 231 self._loadstatus() 232 233 @property 234 def configpath(self) -> str: 235 return os.path.join(self.filename, self.config_name) 236 237 @property 238 def statuspath(self) -> str: 239 return os.path.join(self.filename, self.status_name) 240 241 @property 242 def closed(self) -> bool: 243 return self._closed 244 245 def __enter__(self) -> TensorBlob: 246 return self 247 248 def __exit__(self, *_) -> None: 249 self.close() 250 251 def __len__(self) -> int: 252 return self._status.len 253 254 def __getitem__(self, idx: int | slice) -> torch.Tensor: 255 if not isinstance(idx, (int, slice)): 256 raise TypeError("Index must be int or slice, got %r!" % type(idx).__name__) 257 if isinstance(idx, int): 258 if idx >= len(self) or idx < -len(self): 259 raise IndexError( 260 "Index out of bounds: %r (length: %d)" % (idx, len(self)) 261 ) 262 i, o = divmod(idx + len(self) if idx < 0 else idx, self.block_size) 263 return self._getblock(i)[o].clone() 264 265 # Although the current implementation may not be efficient, it is very easy to 266 # understand and debug. More efficient implementation requires much more complex 267 # edge case handling and is error prone. Also, I think the primary cost here is 268 # still the I/O operations, not the Python code. 
269 ret = [ 270 self._getblock(bd)[[i % self.block_size for i in _is]] 271 for bd, _is in groupby( 272 range(*idx.indices(len(self))), key=lambda i: i // self.block_size 273 ) 274 ] 275 if not ret: 276 return torch.empty(0, *self.shape, dtype=getattr(torch, self.dtype)) 277 return torch.cat(ret, dim=0) 278 279 def __iter__(self) -> Iterator[torch.Tensor]: 280 for i in range(self._pos, len(self)): 281 self._pos += 1 282 yield self[i] 283 284 def _trunc(self) -> None: 285 if os.path.exists(self.filename): 286 try: 287 st = TensorBlobStatus.load(self.statuspath) 288 except FileNotFoundError as exc: 289 raise FileNotFoundError( 290 "Status file missing for blob at %r; file corrupted!" 291 % self.statuspath 292 ) from exc 293 for bd in st.bds: 294 os.remove(os.path.join(self.filename, bd)) 295 self.save_config(save_directory=self.filename, overwrite=True) 296 TensorBlobStatus().dump(self.statuspath) 297 298 def _create(self) -> None: 299 if not os.path.exists(self.filename): 300 self.save_config(save_directory=self.filename) 301 TensorBlobStatus().dump(self.statuspath) 302 303 def _getblock(self, bd: str | int = -1) -> MemoryMappedTensor: 304 if not self._status.bds: 305 self._addblock() 306 if isinstance(bd, int): 307 bd = self._status.bds[bd] 308 return self._memmap[bd] 309 310 def _isfull(self) -> bool: 311 return (not len(self) % self.block_size) and bool(len(self)) 312 313 def _addblock(self) -> MemoryMappedTensor: 314 if self._status.bds and not self._isfull(): 315 raise RuntimeError( 316 "Attempt to create a new block when working block " 317 "is not full: length <%d> < capacity <%d>." 
318 % (len(self) % self.block_size, self.block_size) 319 ) 320 name = str(uuid.uuid4()) 321 mmap = MemoryMappedTensor.empty( 322 self.block_size, 323 *self.shape, 324 dtype=getattr(torch, self.dtype), 325 filename=os.path.join(self.filename, name), 326 ) 327 self._status.bds.append(name) 328 self._memmap[name] = mmap 329 return mmap 330 331 def _loadstatus(self) -> None: 332 try: 333 self._status = TensorBlobStatus.load(self.statuspath) 334 self._memmap = { 335 name: MemoryMappedTensor.from_filename( 336 os.path.join(self.filename, name), 337 dtype=getattr(torch, self.dtype), 338 shape=(self.block_size, *self.shape), 339 ) 340 for name in self._status.bds 341 } 342 if self._m_ap: 343 self._pos = len(self) 344 except FileNotFoundError as exc: 345 raise FileNotFoundError( 346 "status file missing for blob at %r; file corrupted!" % self.statuspath 347 ) from exc 348 349 def _checkclosed(self) -> None: 350 if self._closed: 351 raise IOError("I/O operation on closed blob.") 352 353 def _checkwritable(self) -> None: 354 if not self._m_wr: 355 raise IOError("Blob is not open for writing (mode='%s')" % self.mode) 356 self._checkclosed() 357 358 def _checkreadable(self) -> None: 359 if not self._m_rd: 360 raise IOError("Blob is not open for reading (mode='%s')" % self.mode) 361 self._checkclosed() 362 363 def tell(self) -> int: 364 self._checkclosed() 365 return self._pos 366 367 def seek(self, pos: int = 0, whence: int = io.SEEK_SET) -> int: 368 self._checkclosed() 369 match whence: 370 case io.SEEK_SET: 371 _pos = pos 372 case io.SEEK_CUR: 373 _pos = self._pos + pos 374 case io.SEEK_END: 375 _pos = len(self) + pos 376 case _: 377 raise ValueError("Invalid whence: %r" % whence) 378 self._pos = max(min(_pos, len(self)), 0) 379 return self.tell() 380 381 def close(self) -> None: 382 if not self._closed and self._m_wr: 383 self.flush() 384 self._closed = True 385 386 def flush(self) -> None: 387 self._checkwritable() 388 self._status.dump(self.statuspath) 389 390 def 
read(self, size: int | None = None) -> torch.Tensor: 391 self._checkreadable() 392 end = min(self._pos + (size or len(self)), len(self)) 393 ret = self[self._pos : end] 394 self.seek(end) 395 return ret 396 397 def write(self, ts: torch.Tensor) -> int: 398 self._checkwritable() 399 if self._m_ap: 400 self.seek(whence=io.SEEK_END) 401 ts = ts.view(-1, *self.shape) 402 for t in ts: 403 if self._isfull() and self._pos >= len(self): 404 self._addblock() 405 i, o = divmod(self._pos, self.block_size) 406 self._getblock(i)[o] = t 407 self._status.len += self._pos >= len(self) 408 self._pos += 1 409 return len(ts) 410 411 def truncate(self, pos: int | None = None) -> int: 412 self._checkwritable() 413 self.seek(pos or self.tell()) 414 brk = ceil(self.tell() / self.block_size) 415 for bd in self._status.bds[brk:]: 416 os.remove(self._memmap.pop(bd).filename) 417 self._status.bds = self._status.bds[:brk] 418 self._status.len = self.tell() 419 self.flush() 420 return self.tell() 421 422 def extend(self, other: TensorBlob, maintain_order: bool = False) -> None: 423 if self.dtype != other.dtype or self.shape != other.shape: 424 raise ValueError("Blob data types and shapes must match to extend blobs!") 425 426 self._checkwritable() 427 self.seek(whence=io.SEEK_END) 428 if maintain_order: 429 for i in range(len(other)): 430 self.write(other[i]) 431 return 432 433 # If order is not important, we can simply copy over the complete blocks from 434 # the other blob and merge incomplete blocks. 435 if self.block_size != other.block_size: 436 raise ValueError( 437 "Block sizes must match to extend blobs in non-order-preserving mode!" 438 ) 439 440 comb = [] 441 sbrk = len(self) // self.block_size * self.block_size 442 if sbrk < len(self): 443 comb.append(self[sbrk:]) 444 obrk = len(other) // other.block_size * other.block_size 445 if obrk < len(other): 446 comb.append(other[obrk:]) 447 448 # TODO: We are directly accessing internal data structures of the other blob here. 
449 self.truncate(sbrk) 450 for obd in other._status.bds[: len(other) // other.block_size]: 451 sbd = str(uuid.uuid4()) 452 shutil.copy( 453 os.path.join(other.filename, obd), os.path.join(self.filename, sbd) 454 ) 455 self._status.bds.append(sbd) 456 self._status.len += self.block_size 457 self._memmap[sbd] = MemoryMappedTensor.from_filename( 458 os.path.join(self.filename, sbd), 459 dtype=getattr(torch, self.dtype), 460 shape=(self.block_size, *self.shape), 461 ) 462 463 self.seek(whence=io.SEEK_END) 464 if comb: 465 self.write(torch.cat(comb, dim=0)) 466 self.flush()
Mixin class for automated configuration registration and IO.
Attributes
- config_name (str, default=None):
Class attribute that specifies the filename under which the config should be stored when calling
save_config. Should be overridden by the subclass.
- ignore_for_config (list[str], default=[]): Class attribute that specifies a list of attributes that should not be saved in the config. Should be overridden by the subclass.
Examples
In this example, we have a model with 3 arguments:
- hidden_size: The hidden size of the model.
- _num_layers: The number of layers in the model.
- dropout: The dropout rate of the model.
Among the three arguments, the number of layers is implicitly ignored by the decorator because of the leading
underscore; the dropout argument is explicitly ignored based on the specification in the ignore_for_config class
variable. The hidden_size argument is registered to the config.
>>> class MyModel(ConfigMixin):
... config_name = "my_model_config.json"
... ignore_for_config = ["dropout"]
...
... @register_to_config
... def __init__(self, hidden_size: int = 768, _num_layers: int = 12, dropout: float = 0.1):
... self.hidden_size = hidden_size
... self.num_layers = _num_layers
... self.dropout = dropout # This will be ignored because of the specification in `ignore_for_config`
...
>>> model = MyModel(hidden_size=1024, _num_layers=20, dropout=0.2)
>>> model.config
mappingproxy({'__notes__': {'class_name': 'MyModel', 'using_default_values': [], 'args': (), 'kwargs': {}}, 'hidden_size': 1024})
>>> model.num_layers
20
>>> model.dropout
0.2
@register_to_config
def __init__(
    self,
    filename: str,
    dtype: str,
    shape: tuple[int, ...],
    block_size: int,
    mode: str,
) -> None:
    """Set up the blob for the given access mode and load its on-disk state.

    The constructor stores the arguments, derives the read/write/append
    flags from ``mode``, truncates (mode 'w') or creates (mode 'a') the
    on-disk files as required, and finally loads the status file.
    """
    self.filename = filename
    self.dtype = dtype
    self.shape = shape
    self.block_size = block_size
    self.mode = mode

    self._pos = 0
    self._closed = False

    # A '+' grants both directions regardless of the base mode.
    if "+" in mode:
        self._m_rd = True
        self._m_wr = True

    base_mode = mode.replace("+", "")
    if base_mode == "r":
        self._m_rd = True
    elif base_mode == "w":
        self._m_wr = True
        self._trunc()
    elif base_mode == "a":
        self._m_wr = True
        self._m_ap = True
        self._create()

    self._loadstatus()
@classmethod
def open(cls, filename, mode="r", *, dtype=None, shape=None, block_size=8192):
    r"""Open a TensorBlob with file-like interface for tensor storage.

    TensorBlob provides persistent, memory-mapped storage for large collections
    of same-shaped tensors. It uses a block-based architecture where tensors are
    organized into fixed-size blocks for efficient I/O and memory management.

    The blob is stored as a directory containing:
    - ``.conf``: Configuration file (dtype, shape, block_size)
    - ``.stat``: State file (length, block list)
    - Block files: UUID-named memory-mapped tensor files

    Parameters
    ----------
    filename : str or Path
        Directory path for blob storage. Supports tilde expansion (~) and
        relative paths.
    mode : str, default="r"
        File access mode ('r', 'w', 'a', 'r+', 'w+', 'a+'). See below for details.
    dtype : str or torch.dtype, optional
        Data type for tensors. Required for new blobs (modes 'w', 'w+').
    shape : tuple of int or int, optional
        Shape of individual tensors. Required for new blobs (modes 'w', 'w+').
    block_size : int, default=8192
        Number of tensors per memory-mapped block file. Must be a positive
        integer when a new blob is created.

    Returns
    -------
    TensorBlob
        Opened blob object. Use with context manager for automatic cleanup.

    Raises
    ------
    FileNotFoundError
        If mode is 'r', 'r+', 'a', or 'a+' and blob doesn't exist.
    ValueError
        If creating new blob without dtype or shape, if mode is invalid, if
        ``dtype`` does not name a torch dtype, or if ``block_size`` is not a
        positive integer.
    TypeError
        If dtype is neither string nor torch.dtype.

    Examples
    --------
    Creating a new blob and writing data:

    >>> import torch
    >>> from tensorblob import TensorBlob
    >>>
    >>> with TensorBlob.open("data/embeddings", "w",
    ...                      dtype="float32", shape=(768,)) as blob:
    ...     embeddings = torch.randn(1000, 768)
    ...     blob.write(embeddings)
    ...     print(f"Wrote {len(blob)} tensors")
    Wrote 1000 tensors

    Reading from existing blob:

    >>> with TensorBlob.open("data/embeddings", "r") as blob:
    ...     all_data = blob.read()
    ...     print(all_data.shape)
    torch.Size([1000, 768])

    Appending to existing blob:

    >>> with TensorBlob.open("data/embeddings", "a") as blob:
    ...     new_data = torch.randn(100, 768)
    ...     blob.write(new_data)
    ...     print(f"Total: {len(blob)}")
    Total: 1100

    Read and update with r+ mode:

    >>> with TensorBlob.open("data/embeddings", "r+") as blob:
    ...     first_10 = blob.read(size=10)
    ...     blob.seek(5)
    ...     blob.write(torch.ones(3, 768))  # Overwrite at position 5

    Custom block size for large tensors:

    >>> with TensorBlob.open("data/images", "w",
    ...                      dtype=torch.float32,
    ...                      shape=(3, 1024, 1024),
    ...                      block_size=256) as blob:
    ...     images = torch.randn(1000, 3, 1024, 1024)
    ...     blob.write(images)

    File Access Modes
    -----------------
    Similar to Python's built-in open(), supports the following modes:

    Basic modes:
    - 'r'  : Read-only. Blob must exist. Position starts at beginning.
    - 'w'  : Write-only. Creates new or truncates existing. Position at start. **If the blob already exists,
             truncation will ignore any other parameters supplied and rely on existing configuration.**
    - 'a'  : Append-only. Blob must exist. Position starts at end.
             All writes go to end regardless of seek position.

    Update modes (with '+'):
    - 'r+' : Read and write. Blob must exist. Position at start.
             Can overwrite existing data or extend at end.
    - 'w+' : Read and write. Creates new or truncates existing. Position at start.
    - 'a+' : Read and append. Blob must exist. Position at end.
             Reads allowed anywhere, writes always append to end.

    Data Type and Shape
    -------------------
    All tensors in a blob must have the same dtype and shape. These are
    specified when creating a new blob (modes 'w', 'w+') and stored in
    the configuration file. When opening existing blobs, dtype and shape
    are loaded automatically.

    Supported dtypes: "float32", "float64", "int32", "int64", "bool", etc.
    Can also use torch.dtype objects like torch.float32.

    Shape can be:
    - Single integer: shape=10 creates 1D tensors of shape (10,)
    - Tuple: shape=(3, 224, 224) creates 3D tensors
    """
    flags = set(mode)
    # Reject unknown characters and repeated characters.
    if flags - set("raw+") or len(mode) > len(flags):
        raise ValueError("Invalid mode: %s" % mode)
    if sum(c in "raw" for c in mode) != 1 or mode.count("+") > 1:
        raise ValueError(
            "Must have exactly one of read/write/append mode and at most one plus: %s"
            % mode
        )

    filename = Path(filename).expanduser().resolve()
    if filename.exists():
        # Existing blob: dtype, shape and block_size come from the stored config.
        return cls.from_config(
            save_directory=filename,
            runtime_kwargs={"mode": mode, "filename": os.fspath(filename)},
        )

    if "r" in flags or "a" in flags:
        raise FileNotFoundError("Blob not found: %r" % filename)
    if dtype is None or shape is None:
        raise ValueError(
            "Arguments ``dtype`` and ``shape`` are required for new blob; got: %r and %r"
            % (dtype, shape)
        )
    if isinstance(dtype, torch.dtype):
        dtype = str(dtype).split(".").pop()
    elif not isinstance(dtype, str):
        raise TypeError(
            "dtype must be str or torch.dtype, got %r" % type(dtype).__name__
        )
    # Validate up front so that no unusable blob directory is created.
    if not isinstance(getattr(torch, dtype, None), torch.dtype):
        raise ValueError("Unknown torch dtype: %r" % dtype)
    if not isinstance(block_size, int) or block_size < 1:
        raise ValueError(
            "block_size must be a positive integer, got %r" % (block_size,)
        )
    shape = (shape,) if isinstance(shape, int) else tuple(shape)
    return cls(os.fspath(filename), dtype, shape, block_size, mode)
Open a TensorBlob with file-like interface for tensor storage.
TensorBlob provides persistent, memory-mapped storage for large collections of same-shaped tensors. It uses a block-based architecture where tensors are organized into fixed-size blocks for efficient I/O and memory management.
The blob is stored as a directory containing:
- .conf: Configuration file (dtype, shape, block_size)
- .stat: State file (length, block list)
- Block files: UUID-named memory-mapped tensor files
Parameters
- filename (str or Path): Directory path for blob storage. Supports tilde expansion (~) and relative paths.
- mode (str, default="r"): File access mode ('r', 'w', 'a', 'r+', 'w+', 'a+'). See below for details.
- dtype (str or torch.dtype, optional): Data type for tensors. Required for new blobs (modes 'w', 'w+').
- shape (tuple of int or int, optional): Shape of individual tensors. Required for new blobs (modes 'w', 'w+').
- block_size (int, default=8192): Number of tensors per memory-mapped block file.
Returns
- TensorBlob: Opened blob object. Use with context manager for automatic cleanup.
Raises
- FileNotFoundError: If mode is 'r', 'r+', 'a', or 'a+' and blob doesn't exist.
- ValueError: If creating new blob without dtype or shape, or if mode is invalid.
- TypeError: If dtype is neither string nor torch.dtype.
Examples
Creating a new blob and writing data:
>>> import torch
>>> from tensorblob import TensorBlob
>>>
>>> with TensorBlob.open("data/embeddings", "w",
... dtype="float32", shape=(768,)) as blob:
... embeddings = torch.randn(1000, 768)
... blob.write(embeddings)
... print(f"Wrote {len(blob)} tensors")
Wrote 1000 tensors
Reading from existing blob:
>>> with TensorBlob.open("data/embeddings", "r") as blob:
... all_data = blob.read()
... print(all_data.shape)
torch.Size([1000, 768])
Appending to existing blob:
>>> with TensorBlob.open("data/embeddings", "a") as blob:
... new_data = torch.randn(100, 768)
... blob.write(new_data)
... print(f"Total: {len(blob)}")
Total: 1100
Read and update with r+ mode:
>>> with TensorBlob.open("data/embeddings", "r+") as blob:
... first_10 = blob.read(size=10)
... blob.seek(5)
... blob.write(torch.ones(3, 768)) # Overwrite at position 5
Custom block size for large tensors:
>>> with TensorBlob.open("data/images", "w",
... dtype=torch.float32,
... shape=(3, 1024, 1024),
... block_size=256) as blob:
... images = torch.randn(1000, 3, 1024, 1024)
... blob.write(images)
File Access Modes
Similar to Python's built-in open(), supports the following modes:
Basic modes:
- 'r' : Read-only. Blob must exist. Position starts at beginning.
- 'w' : Write-only. Creates new or truncates existing. Position at start. If the blob already exists, truncation will ignore any other parameters supplied and rely on existing configuration.
- 'a' : Append-only. Blob must exist. Position starts at end. All writes go to end regardless of seek position.
Update modes (with '+'):
- 'r+' : Read and write. Blob must exist. Position at start. Can overwrite existing data or extend at end.
- 'w+' : Read and write. Creates new or truncates existing. Position at start.
- 'a+' : Read and append. Blob must exist. Position at end. Reads allowed anywhere, writes always append to end.
Data Type and Shape
All tensors in a blob must have the same dtype and shape. These are specified when creating a new blob (modes 'w', 'w+') and stored in the configuration file. When opening existing blobs, dtype and shape are loaded automatically.
Supported dtypes: "float32", "float64", "int32", "int64", "bool", etc. Can also use torch.dtype objects like torch.float32.
Shape can be:
- Single integer: shape=10 creates 1D tensors of shape (10,)
- Tuple: shape=(3, 224, 224) creates 3D tensors
Apply post-processing hooks to the JSON dictionary.
orjson.loads only decodes configs to primitive types, which may not be directly
consumable by the class initializer. For instance, a dataclass object will be
loaded as a dictionary. Therefore, this method is intended to be overridden by the
subclass to perform additional post-processing on the loaded config dictionary.
Note that, it is highly discouraged to abuse this method to deserialize complex objects
and one should consider using runtime_kwargs argument of from_config instead,
to explicitly pass the complex objects to the class initializer.
By default, this method returns the input dictionary unchanged.
Parameters
- jdict (dict[str, Any]): The config dictionary after deserialization.
Returns
- dict[str, Any]: The config dictionary after post-processing.
def seek(self, pos: int = 0, whence: int = io.SEEK_SET) -> int:
    """Reposition the cursor and return the resulting position.

    ``whence`` selects the reference point: ``io.SEEK_SET`` (start),
    ``io.SEEK_CUR`` (current position) or ``io.SEEK_END`` (blob length).
    The target is clamped into ``[0, len(self)]``.

    Raises
    ------
    ValueError
        If ``whence`` is none of the three ``io`` constants.
    """
    self._checkclosed()
    if whence == io.SEEK_SET:
        target = pos
    elif whence == io.SEEK_CUR:
        target = pos + self._pos
    elif whence == io.SEEK_END:
        target = pos + len(self)
    else:
        raise ValueError("Invalid whence: %r" % whence)

    # Clamp: never before the first tensor, never past the end.
    upper = len(self)
    self._pos = 0 if target < 0 else min(target, upper)
    return self.tell()
def write(self, ts: torch.Tensor) -> int:
    """Write one or more tensors at the cursor and return how many were written.

    ``ts`` is reshaped to ``(-1, *self.shape)``, so a single tensor or a
    batch may be passed. Positions below the current length are
    overwritten; positions at the end grow the blob, allocating a new block
    whenever the last one is full. In append mode the cursor is first moved
    to the end.
    """
    self._checkwritable()
    if self._m_ap:
        self.seek(whence=io.SEEK_END)
    # ``reshape`` also accepts non-contiguous input (e.g. a transposed
    # tensor), for which ``view`` raises a RuntimeError.
    ts = ts.reshape(-1, *self.shape)
    for t in ts:
        if self._isfull() and self._pos >= len(self):
            self._addblock()
        i, o = divmod(self._pos, self.block_size)
        self._getblock(i)[o] = t
        # Only writes at the end extend the blob.
        self._status.len += self._pos >= len(self)
        self._pos += 1
    return len(ts)
def truncate(self, pos: int | None = None) -> int:
    """Cut the blob at ``pos`` and return the new length.

    Parameters
    ----------
    pos : int, optional
        Position to truncate at; defaults to the current cursor position.
        The value is clamped to ``[0, len(self)]`` by ``seek``.

    Block files lying entirely beyond the cut are deleted, the status is
    updated and flushed, and the cursor is left at the new end.
    """
    self._checkwritable()
    # Compare against None explicitly: ``pos or self.tell()`` treated an
    # explicit ``pos=0`` as "use the cursor" and so could never empty a blob.
    self.seek(self.tell() if pos is None else pos)
    brk = ceil(self.tell() / self.block_size)
    for bd in self._status.bds[brk:]:
        os.remove(self._memmap.pop(bd).filename)
    self._status.bds = self._status.bds[:brk]
    self._status.len = self.tell()
    self.flush()
    return self.tell()
def extend(self, other: TensorBlob, maintain_order: bool = False) -> None:
    """Append the contents of ``other`` to this blob; ``other`` is not modified.

    Parameters
    ----------
    other : TensorBlob
        Source blob; must have the same dtype and shape.
    maintain_order : bool, default=False
        If true, tensors are written one by one in their original order.
        Otherwise complete block files of ``other`` are copied wholesale and
        the partial tails of both blobs are merged afterwards, which does
        not preserve ordering and requires equal block sizes.

    Raises
    ------
    ValueError
        If dtypes or shapes differ, or (non-order-preserving mode only) if
        the block sizes differ.
    """
    if self.dtype != other.dtype or self.shape != other.shape:
        raise ValueError("Blob data types and shapes must match to extend blobs!")

    self._checkwritable()
    self.seek(whence=io.SEEK_END)
    if maintain_order:
        for i in range(len(other)):
            self.write(other[i])
        return

    # If order is not important, we can simply copy over the complete blocks from
    # the other blob and merge incomplete blocks.
    if self.block_size != other.block_size:
        raise ValueError(
            "Block sizes must match to extend blobs in non-order-preserving mode!"
        )

    comb = []
    sbrk = len(self) // self.block_size * self.block_size
    if sbrk < len(self):
        comb.append(self[sbrk:])
    obrk = len(other) // other.block_size * other.block_size
    if obrk < len(other):
        comb.append(other[obrk:])

    # Drop our partial tail so that only complete blocks precede the copied
    # ones. Seeking first and truncating at the cursor keeps this correct
    # when ``sbrk`` is 0; passing 0 directly to ``truncate`` is unsafe if it
    # treats a falsy position as "use the cursor".
    self.seek(sbrk)
    self.truncate()
    # TODO: We are directly accessing internal data structures of the other blob here.
    for obd in other._status.bds[: len(other) // other.block_size]:
        sbd = str(uuid.uuid4())
        shutil.copy(
            os.path.join(other.filename, obd), os.path.join(self.filename, sbd)
        )
        self._status.bds.append(sbd)
        self._status.len += self.block_size
        self._memmap[sbd] = MemoryMappedTensor.from_filename(
            os.path.join(self.filename, sbd),
            dtype=getattr(torch, self.dtype),
            shape=(self.block_size, *self.shape),
        )

    self.seek(whence=io.SEEK_END)
    if comb:
        self.write(torch.cat(comb, dim=0))
    self.flush()