tensorblob

Python 3.10 | License: Apache 2.0

A lightweight, dynamically sized, memory-mapped tensor store with file-like APIs that also supports integer indexing and slicing. It is built on MemoryMappedTensor from tensordict.

Features

  • 🔗 Memory-mapped storage: Efficient storage of large collections of same-shaped tensors
  • 💾 File-like APIs: Read, write, and seek like a file, while also supporting integer indexing and slicing
  • ⚡ Dynamic-sized: No need to specify the total number of tensors upfront
  • 🔄 Extend and truncate: Extend the blob with another blob or truncate the blob to a specific position
  • 🚀 LRU cache: Automatic management of memory-mapped blocks for scalability with large blobs

Installation

From PyPI:

pip install tensorblob

If you are interested in the experimental (i.e., unstable and undertested) version, you can install it from GitHub:

pip install git+https://github.com/Guest400123064/tensorblob.git

Core Use Cases

Quick Start

The example below creates a new blob holding a collection of randomly generated fake embeddings and then accesses them by index. Because the storage is memory-mapped, there is no need to read every tensor into memory. You can index directly into the blob instead.

import torch
from tensorblob import TensorBlob

# Create a new blob of 100,000 random 768-dim "embeddings"; the dtype and
# shape of each tensor are required when the blob is created
with TensorBlob.open("embeddings.blob", "w", dtype="float32", shape=768) as blob:
    blob.write(torch.randn(100_000, 768))
    print(f"Wrote {len(blob)} embeddings")

# Reopening an existing blob needs no configuration; it is read from disk
with TensorBlob.open("embeddings.blob", "r") as blob:
    e1 = blob[42]                # a single tensor of shape (768,)
    e2 = blob[-1:16384:-12345]   # negative-step slice, shape (7, 768)
    print(f"Similarity: {torch.cosine_similarity(e1.unsqueeze(0), e2, dim=-1)}")
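Integer indexing only touches the one block file that holds the requested tensor. The following is a minimal pure-Python sketch of the arithmetic used for integer indices in `TensorBlob.__getitem__`: normalize a negative index, then `divmod` by the block size. The helper name `locate` is hypothetical and is not part of the tensorblob API.

```python
def locate(idx: int, length: int, block_size: int) -> tuple[int, int]:
    """Map a (possibly negative) tensor index to (block number, offset in block)."""
    if idx >= length or idx < -length:
        raise IndexError(f"Index out of bounds: {idx} (length: {length})")
    if idx < 0:
        idx += length  # negative indices count from the end, as with Python sequences
    return divmod(idx, block_size)


# With the default block_size of 8192, tensor 42 sits in the first block and
# the last of 100,000 tensors sits in block 12.
print(locate(42, 100_000, 8192))   # (0, 42)
print(locate(-1, 100_000, 8192))   # (12, 1695)
```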

Processing Large Datasets

Storing preprocessed datasets larger than RAM in a memory-mapped blob can speed up training by reducing the time spent on data loading and transformation.

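# data_loader, preprocess, and model stand in for your own pipeline
with TensorBlob.open("data/images.blob", "w", dtype="float32", shape=(3, 224, 224)) as blob:
    for image_batch in data_loader:
        blob.write(preprocess(image_batch))  # batches of shape (N, 3, 224, 224)

with TensorBlob.open("data/images.blob", "r") as blob:
    for image in blob:  # yields one (3, 224, 224) tensor at a time
        result = model(image)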
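A batch passed to `write` may straddle block boundaries. The sketch below is a pure-Python model of the chunking loop in `TensorBlob.write` and assumes the blob's fixed `block_size`. Each chunk is written into one block, and the blob only grows when the cursor moves past the current end. The function `plan_write` is hypothetical and is not a tensorblob API.

```python
def plan_write(pos: int, length: int, n: int, block_size: int):
    """Return ((block, offset, count) chunks, new length) for writing n tensors at pos."""
    chunks = []
    written = 0
    while written < n:
        block, offset = divmod(pos, block_size)
        count = min(block_size - offset, n - written)  # stop at the block boundary
        chunks.append((block, offset, count))
        pos += count
        written += count
        length = max(length, pos)  # only writes past the end grow the blob
    return chunks, length


# Appending 500 tensors to a blob of 8000 fills block 0, then spills into block 1
print(plan_write(8000, 8000, 500, 8192))  # ([(0, 8000, 192), (1, 0, 308)], 8500)
# Overwriting 5 tensors in the middle does not change the length
print(plan_write(10, 8500, 5, 8192))      # ([(0, 10, 5)], 8500)
```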

Incremental Data Collection

Appending new data to an existing blob is useful for streaming data collection.

# initial_position and trajectory_queue are placeholders for your own data source
with TensorBlob.open("positions.blob", "w", dtype="float32", shape=3) as blob:
    blob.write(initial_position)  # a tensor of shape (3,) or (N, 3)

# Later: reopen in append mode; every write goes to the end of the blob
with TensorBlob.open("positions.blob", "a") as blob:
    for pos in trajectory_queue.get():
        blob.write(pos)
    print(f"Total positions recorded: {len(blob)}")
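Modes follow the rules of Python's built-in `open()`. The following is a minimal sketch of how a mode string could be validated and turned into access flags. It mirrors the checks in `TensorBlob.open` and the flag setup in `TensorBlob.__init__`, but `parse_mode` and its returned dictionary are hypothetical illustrations and are not tensorblob APIs.

```python
def parse_mode(mode: str) -> dict[str, bool]:
    """Validate a blob mode string and derive its access flags (illustrative only)."""
    chars = set(mode)
    # Only 'r', 'a', 'w', '+' are allowed, each at most once
    if chars - set("raw+") or len(mode) > len(chars):
        raise ValueError(f"Invalid mode: {mode!r}")
    # Exactly one of read/write/append
    if sum(c in "raw" for c in mode) != 1:
        raise ValueError(f"Must have exactly one of read/write/append: {mode!r}")
    base = mode.replace("+", "")
    plus = "+" in mode
    return {
        "readable": plus or base == "r",
        "writable": plus or base in ("w", "a"),
        "append": base == "a",             # writes always go to the end
        "must_exist": base in ("r", "a"),  # only 'w'/'w+' may create a new blob
    }


print(parse_mode("a"))
# {'readable': False, 'writable': True, 'append': True, 'must_exist': True}
print(parse_mode("a+")["readable"])  # True
```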

Random Access and Updates with File-Like APIs

Read and modify tensors starting at a given position, just as with a file.

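import io

from tensorblob import TensorBlob

# updated_features and additional_features are placeholders for tensors of the blob's shape
with TensorBlob.open("data/features.blob", "r+") as blob:
    blob.seek(1000)
    print(f"Current position: {blob.tell()}")

    batch = blob.read(size=100)  # reads up to 100 tensors and advances the cursor
    print(f"Read {batch.shape[0]} tensors")

    # Overwrite starting 500 tensors before the end; whence works as with regular files
    blob.seek(-500, whence=io.SEEK_END)
    blob.write(updated_features)

    # Append new data by moving the cursor to the end
    blob.seek(len(blob))
    blob.write(additional_features)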
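`seek` accepts the same `whence` constants as the `io` module, with one difference from a regular file: the new position is clamped to the range `[0, len(blob)]`, so seeking past the end stops at the end instead of leaving a gap. Below is a pure-Python sketch of that rule, modeled on `TensorBlob.seek`. The function `resolve_seek` is hypothetical.

```python
import io


def resolve_seek(cur: int, length: int, pos: int = 0, whence: int = io.SEEK_SET) -> int:
    """Compute the new cursor position the way TensorBlob.seek clamps it."""
    if whence == io.SEEK_SET:
        target = pos
    elif whence == io.SEEK_CUR:
        target = cur + pos
    elif whence == io.SEEK_END:
        target = length + pos
    else:
        raise ValueError(f"Invalid whence: {whence!r}")
    # Unlike a regular file, the cursor never leaves [0, length]
    return max(min(target, length), 0)


print(resolve_seek(0, 10_000, -500, io.SEEK_END))   # 9500
print(resolve_seek(9500, 10_000, 1000, io.SEEK_CUR))  # 10000 (clamped to the end)
print(resolve_seek(0, 10_000, -5))                    # 0 (clamped to the start)
```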

Extend and Truncate

Extend a blob with the contents of another blob, or truncate a blob to a given length. Extension is useful for merging two blobs into one, e.g., results produced by two different processes. Extending only reads from the other blob; it does not modify or delete it.

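# other_blob is another open TensorBlob with the same dtype and shape
# Order-preserving: tensors of other_blob are appended one by one after the existing ones
with TensorBlob.open("data/features.blob", "a") as blob:
    blob.extend(other_blob, maintain_order=True)

# The default (maintain_order=False) is faster: complete blocks of other_blob are
# copied as files and the partial tail blocks of both blobs are rewritten at the end,
# so both blobs must also share the same block_size
with TensorBlob.open("data/features.blob", "r+") as blob:
    blob.extend(other_blob)

with TensorBlob.open("data/features.blob", "r+") as blob:
    blob.truncate(1000)
    print(f"Truncated to {len(blob)} tensors")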
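Both operations work at block granularity. The following is a pure-Python sketch of the bookkeeping, based on `TensorBlob.truncate` and the non-order-preserving branch of `TensorBlob.extend`. Truncating to `pos` tensors keeps `ceil(pos / block_size)` block files. A fast extend keeps this blob's full blocks, copies the other blob's full blocks as files, and rewrites both partial tails at the end. The helper names are hypothetical.

```python
import math


def blocks_after_truncate(pos: int, block_size: int) -> int:
    """Number of block files kept when truncating to `pos` tensors."""
    return math.ceil(pos / block_size)


def plan_extend(self_len: int, other_len: int, block_size: int) -> dict[str, int]:
    """Work done by a non-order-preserving extend (maintain_order=False)."""
    self_full, self_tail = divmod(self_len, block_size)
    other_full, other_tail = divmod(other_len, block_size)
    return {
        "kept_blocks": self_full,                     # full blocks of this blob, untouched
        "copied_blocks": other_full,                  # full blocks of the other blob, copied as files
        "rewritten_tensors": self_tail + other_tail,  # both partial tails, written last
        "final_length": self_len + other_len,
    }


print(blocks_after_truncate(1000, 8192))  # 1
print(plan_extend(10_000, 20_000, 8192))
# {'kept_blocks': 1, 'copied_blocks': 2, 'rewritten_tensors': 5424, 'final_length': 30000}
```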

Performance and Scalability

Memory Management

TensorBlob uses an LRU (Least Recently Used) cache to bound how many block files are memory-mapped at the same time. This lets you work with blobs containing millions of tensors without loading everything into memory or exhausting the per-process limit on memory-mapped regions.

Default behavior:

  • Caches up to ~4,000 blocks by default (1/16 of the system's vm.max_map_count, with a minimum of 128)
  • Blocks are mapped on demand when accessed
  • Least recently used blocks are evicted automatically when the cache is full
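The default size comes from `TensorBlob._getsyscachesize`. On Linux, it reads `/proc/sys/vm/max_map_count`. Where that file is unavailable, for example on macOS, it falls back to 65536. It then keeps 1/16 of the limit, with a floor of 128. Below is a standalone sketch of the same computation. The `path` parameter is an addition for illustration and testing only.

```python
from pathlib import Path


def default_cache_size(path: str = "/proc/sys/vm/max_map_count") -> int:
    """Default max_cached_blocks: 1/16 of the VMA limit, at least 128."""
    max_map_count = 65536  # fallback when the sysctl file is unavailable
    try:
        max_map_count = int(Path(path).read_text().strip())
    except (FileNotFoundError, ValueError, PermissionError):
        pass
    return max(max_map_count // 16, 128)


# On a typical Linux system, max_map_count is 65530, giving 4095 cached blocks
print(default_cache_size())
```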

For large-scale workloads:

# Increase cache for better random access performance
with TensorBlob.open("large.blob", "r", max_cached_blocks=10_000) as blob:
    for idx in random_indices:
        tensor = blob[idx]  # Cached blocks reused efficiently

# Decrease cache for memory-constrained environments
with TensorBlob.open("data.blob", "r", max_cached_blocks=100) as blob:
    for tensor in blob:  # Sequential access works fine with small cache
        process(tensor)
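To see why sequential access needs very little cache while random access benefits from more, consider a minimal LRU cache. The library keeps blocks in an `LRUCache(maxsize=max_cached_blocks)`. This sketch substitutes `collections.OrderedDict`, and `BlockCache` with its `loader` callback is hypothetical. In TensorBlob, a miss memory-maps the block file.

```python
from collections import OrderedDict


class BlockCache:
    """Tiny LRU map from block name to a loaded block (any object here)."""

    def __init__(self, maxsize: int, loader):
        self.maxsize = maxsize
        self.loader = loader  # called on a miss, e.g. to memory-map a block file
        self._items = OrderedDict()
        self.misses = 0

    def get(self, name):
        if name in self._items:
            self._items.move_to_end(name)  # mark as most recently used
            return self._items[name]
        self.misses += 1
        value = self._items[name] = self.loader(name)
        if len(self._items) > self.maxsize:
            self._items.popitem(last=False)  # evict the least recently used block
        return value


# Sequential scans revisit the same block many times in a row: 2 misses with size 1
seq = BlockCache(maxsize=1, loader=str.upper)
for name in ["a", "a", "a", "b", "b"]:
    seq.get(name)
print(seq.misses)  # 2

# Scattered access evicts blocks that are needed again soon after
rnd = BlockCache(maxsize=2, loader=str.upper)
for name in ["a", "b", "a", "c", "b"]:
    rnd.get(name)
print(rnd.misses, list(rnd._items))  # 4 ['c', 'b']
```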

Performance tips:

  • Sequential access patterns work well with any cache size
  • Random access benefits from larger cache sizes
  • Each cached block consumes ~200 bytes of kernel memory (VMA overhead)
  • System limit: typically ~65,000 memory-mapped regions per process
  • To avoid frequent cache evictions, you can also increase block_size (fixed when the blob is created) to reduce the total number of blocks
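The tips above reduce to simple arithmetic. A blob needs `ceil(num_tensors / block_size)` block files, and at most `min(total_blocks, max_cached_blocks)` of them are mapped at once. The sketch below estimates kernel overhead using the ~200 bytes per VMA quoted above, which is an approximation rather than an exact kernel figure. `blob_footprint` is a hypothetical helper.

```python
def blob_footprint(num_tensors: int, block_size: int, cached_blocks: int, vma_bytes: int = 200):
    """Return (total blocks, blocks mapped at most, approx. kernel bytes for their VMAs)."""
    total_blocks = -(-num_tensors // block_size)  # ceiling division
    mapped = min(total_blocks, cached_blocks)
    return total_blocks, mapped, mapped * vma_bytes


# 100 million tensors with the default cache of 4095 blocks
print(blob_footprint(100_000_000, 8192, 4095))    # (12208, 4095, 819000)
# A larger block_size means fewer blocks, all of which now fit in the cache
print(blob_footprint(100_000_000, 65_536, 4095))  # (1526, 1526, 305200)
```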

Contributing

Contributions welcome! Please submit a Pull Request.

License

Apache License 2.0 - see LICENSE file for details.

"""
.. include:: ../../README.md
"""

from ._blob import TensorBlob

__version__ = "0.1.3"

__all__ = [
    "TensorBlob",
]
class TensorBlob(configmixin._core.ConfigMixin):
class TensorBlob(ConfigMixin):
    _m_rd = False
    _m_wr = False
    _m_ap = False

    status_name = ".stat"
    config_name = ".conf"
    ignore_for_config = ["filename", "mode", "max_cached_blocks"]

    @classmethod
    def open(
        cls,
        filename,
        mode="r",
        *,
        dtype=None,
        shape=None,
        block_size=8192,
        max_cached_blocks=None,
    ):
        r"""Open a TensorBlob with a file-like interface for tensor storage.

        TensorBlob provides persistent, memory-mapped storage for large collections
        of same-shaped tensors. It uses a block-based architecture where tensors are
        organized into fixed-size blocks for efficient I/O and memory management.

        The blob is stored as a directory containing:
        - ``.conf``: Configuration file (dtype, shape, block_size)
        - ``.stat``: State file (length, block list)
        - Block files: UUID-named memory-mapped tensor files

        Parameters
        ----------
        filename : str or Path
            Directory path for blob storage. Supports tilde expansion (~) and
            relative paths.
        mode : str, default="r"
            File access mode ('r', 'w', 'a', 'r+', 'w+', 'a+'). See below for details.
        dtype : str or torch.dtype, optional
            Data type for tensors. Required for new blobs (modes 'w', 'w+').
        shape : tuple of int or int, optional
            Shape of individual tensors. Required for new blobs (modes 'w', 'w+').
        block_size : int, default=8192
            Number of tensors per memory-mapped block file.
        max_cached_blocks : int, optional
            Maximum number of memory-mapped blocks to keep cached. When exceeded,
            least recently used blocks are unmapped. If None (default), uses 1/16
            of the system's max_map_count limit (typically ~4000). This limits kernel
            VMA overhead for blobs with many blocks.

        Returns
        -------
        TensorBlob
            Opened blob object. Use it as a context manager for automatic cleanup.

        Raises
        ------
        FileNotFoundError
            If mode is 'r', 'r+', 'a', or 'a+' and the blob doesn't exist.
        ValueError
            If creating a new blob without dtype or shape, or if mode is invalid.
        TypeError
            If dtype is neither a string nor a torch.dtype.

        Examples
        --------
        Creating a new blob and writing data:

        >>> import torch
        >>> from tensorblob import TensorBlob
        >>>
        >>> with TensorBlob.open("data/embeddings", "w",
        ...                       dtype="float32", shape=(768,)) as blob:
        ...     embeddings = torch.randn(1000, 768)
        ...     blob.write(embeddings)
        ...     print(f"Wrote {len(blob)} tensors")
        Wrote 1000 tensors

        Reading from an existing blob:

        >>> with TensorBlob.open("data/embeddings", "r") as blob:
        ...     all_data = blob.read()
        ...     print(all_data.shape)
        torch.Size([1000, 768])

        Appending to an existing blob:

        >>> with TensorBlob.open("data/embeddings", "a") as blob:
        ...     new_data = torch.randn(100, 768)
        ...     blob.write(new_data)
        ...     print(f"Total: {len(blob)}")
        Total: 1100

        Read and update with r+ mode:

        >>> with TensorBlob.open("data/embeddings", "r+") as blob:
        ...     first_10 = blob.read(size=10)
        ...     blob.seek(5)
        ...     blob.write(torch.ones(3, 768))  # Overwrite at position 5

        Custom block size for large tensors:

        >>> with TensorBlob.open("data/images", "w",
        ...                       dtype=torch.float32,
        ...                       shape=(3, 1024, 1024),
        ...                       block_size=256) as blob:
        ...     for _ in range(10):  # write in batches to bound peak memory
        ...         blob.write(torch.randn(100, 3, 1024, 1024))

        Custom cache size for large-scale random access:

        >>> # Increase cache for better random access performance
        >>> with TensorBlob.open("data/embeddings", "r",
        ...                       max_cached_blocks=10000) as blob:
        ...     for idx in random_indices:
        ...         embedding = blob[idx]  # Frequently accessed blocks stay cached

        >>> # Decrease cache for memory-constrained environments
        >>> with TensorBlob.open("data/features", "r",
        ...                       max_cached_blocks=100) as blob:
        ...     for feature in blob:  # Sequential access works fine
        ...         process(feature)

        File Access Modes
        -----------------
        Similar to Python's built-in open(), the following modes are supported:

        Basic modes:
        - 'r'  : Read-only. Blob must exist. Position starts at beginning.
        - 'w'  : Write-only. Creates new or truncates existing. Position at start.
                 **If the blob already exists, the dtype, shape, and block_size
                 arguments are ignored and the existing configuration is reused.**
        - 'a'  : Append-only. Blob must exist. Position starts at end.
                 All writes go to the end regardless of seek position.

        Update modes (with '+'):
        - 'r+' : Read and write. Blob must exist. Position at start.
                 Can overwrite existing data or extend at the end.
        - 'w+' : Read and write. Creates new or truncates existing. Position at start.
                 The same caveat as 'w' applies to existing blobs.
        - 'a+' : Read and append. Blob must exist. Position at end.
                 Reads allowed anywhere; writes always append to the end.

        Data Type and Shape
        -------------------
        All tensors in a blob must have the same dtype and shape. These are
        specified when creating a new blob (modes 'w', 'w+') and stored in
        the configuration file. When opening existing blobs, dtype and shape
        are loaded automatically.

        Supported dtypes: names of torch dtypes such as "float32", "float64",
        "int32", "int64", or "bool". torch.dtype objects such as torch.float32
        are also accepted.

        Shape can be:
        - Single integer: shape=10 creates 1D tensors of shape (10,)
        - Tuple: shape=(3, 224, 224) creates 3D tensors
        """
        modes = set(mode)
        if modes - set("raw+") or len(mode) > len(modes):
            raise ValueError("Invalid mode: %s" % mode)
        if sum(c in "raw" for c in mode) != 1 or mode.count("+") > 1:
            raise ValueError(
                "Must have exactly one of read/write/append mode and at most one plus: %s"
                % mode
            )

        filename = Path(filename).expanduser().resolve()
        if not filename.exists():
            if "r" in modes or "a" in modes:
                raise FileNotFoundError("Blob not found: %r" % filename)
            if dtype is None or shape is None:
                raise ValueError(
                    "Arguments ``dtype`` and ``shape`` are required for new blob; got: %r and %r"
                    % (dtype, shape)
                )
            if isinstance(dtype, torch.dtype):
                dtype = str(dtype).split(".").pop()
            elif not isinstance(dtype, str):
                raise TypeError(
                    "dtype must be str or torch.dtype, got %r" % type(dtype).__name__
                )
            shape = (shape,) if isinstance(shape, int) else tuple(shape)
            return cls(
                os.fspath(filename), dtype, shape, block_size, mode, max_cached_blocks
            )

        return cls.from_config(
            save_directory=filename,
            runtime_kwargs={
                "mode": mode,
                "filename": os.fspath(filename),
                "max_cached_blocks": max_cached_blocks,
            },
        )

    @classmethod
    def unlink(cls, filename):
        filename = Path(filename).expanduser().resolve()
        if filename.exists():
            try:
                with cls.open(filename, "w") as _:
                    pass
                os.unlink(filename / cls.config_name)
                os.unlink(filename / cls.status_name)
                os.rmdir(os.fspath(filename))
            except Exception as exc:
                warnings.warn("Failed to unlink blob at %r: %s" % (filename, exc))
                return False
        return True

    @classmethod
    def apply_param_hooks(cls, d):
        d["shape"] = tuple(d["shape"])
        return d

    @classmethod
    def _getsyscachesize(cls) -> int:
        # Get default cache size for memory-mapped blocks. Returns 1/16 of system's
        # max_map_count to be conservative, typically ~4000, leaving room for other
        # VMAs in the process.
        maxsize = 65536
        try:
            with open("/proc/sys/vm/max_map_count", "r") as f:
                maxsize = int(f.read().strip())
        except (FileNotFoundError, ValueError, PermissionError):
            pass
        return max(maxsize // 16, 128)

    @register_to_config
    def __init__(
        self,
        filename: str,
        dtype: str,
        shape: tuple[int, ...],
        block_size: int,
        mode: str,
        max_cached_blocks: int | None = None,
    ) -> None:
        self.filename = filename
        self.dtype = dtype
        self.shape = shape
        self.block_size = block_size
        self.mode = mode
        self.max_cached_blocks = max_cached_blocks or self._getsyscachesize()

        self._pos = 0
        self._closed = False

        if "+" in mode:
            self._m_rd = True
            self._m_wr = True
        match mode.replace("+", ""):
            case "r":
                self._m_rd = True
            case "w":
                self._m_wr = True
                self._trunc()
            case "a":
                self._m_wr = True
                self._m_ap = True
                self._create()

        self._loadstatus()

    @property
    def configpath(self) -> str:
        return os.path.join(self.filename, self.config_name)

    @property
    def statuspath(self) -> str:
        return os.path.join(self.filename, self.status_name)

    @property
    def closed(self) -> bool:
        return self._closed

    def __enter__(self) -> TensorBlob:
        return self

    def __exit__(self, *_) -> None:
        self.close()

    def __len__(self) -> int:
        return self._status.len

    def __getitem__(self, idx: int | slice) -> torch.Tensor:
        if not isinstance(idx, (int, slice)):
            raise TypeError("Index must be int or slice, got %r!" % type(idx).__name__)
        if isinstance(idx, int):
            if idx >= len(self) or idx < -len(self):
                raise IndexError(
                    "Index out of bounds: %r (length: %d)" % (idx, len(self))
                )
            i, o = divmod(idx + len(self) if idx < 0 else idx, self.block_size)
            return self._getblock(i)[o].clone()

        # Although the current implementation may not be efficient, it is very easy to
        # understand and debug. More efficient implementation requires much more complex
        # edge case handling and is error prone. Also, I think the primary cost here is
        # still the I/O operations, not the Python code.
        ret = [
            self._getblock(bd)[[i % self.block_size for i in _is]]
            for bd, _is in groupby(
                range(*idx.indices(len(self))), key=lambda i: i // self.block_size
            )
        ]
        if not ret:
            return torch.empty(0, *self.shape, dtype=getattr(torch, self.dtype))
        return torch.cat(ret, dim=0)

    def __iter__(self) -> Iterator[torch.Tensor]:
        for i in range(self._pos, len(self)):
            self._pos += 1
            yield self[i]

    def _trunc(self) -> None:
        if os.path.exists(self.filename):
            try:
                st = TensorBlobStatus.load(self.statuspath)
            except FileNotFoundError as exc:
                raise FileNotFoundError(
                    "Status file missing for blob at %r; file corrupted!"
                    % self.statuspath
                ) from exc
            for bd in st.bds:
                os.remove(os.path.join(self.filename, bd))
        self.save_config(save_directory=self.filename, overwrite=True)
        TensorBlobStatus().dump(self.statuspath)

    def _create(self) -> None:
        if not os.path.exists(self.filename):
            self.save_config(save_directory=self.filename)
            TensorBlobStatus().dump(self.statuspath)

    def _getblock(self, bd: str | int = -1) -> MemoryMappedTensor:
        if not self._status.bds:
            self._addblock()
        if isinstance(bd, int):
            bd = self._status.bds[bd]
        if bd in self._memmap:
            return self._memmap[bd]

        # On a cache miss, the block is lazily loaded into the cache. Return the
        # local reference instead of calling __getitem__ on the cache, so that this
        # access is not counted as a second cache hit.
        block = self._memmap[bd] = MemoryMappedTensor.from_filename(
            os.path.join(self.filename, bd),
            dtype=getattr(torch, self.dtype),
            shape=(self.block_size, *self.shape),
        )
        return block

    def _isfull(self) -> bool:
        return (not len(self) % self.block_size) and bool(len(self))

    def _addblock(self) -> MemoryMappedTensor:
        if self._status.bds and not self._isfull():
            raise RuntimeError(
                "Attempt to create a new block when working block "
                "is not full: length <%d> < capacity <%d>."
                % (len(self) % self.block_size, self.block_size)
            )
        name = str(uuid.uuid4())
        mmap = MemoryMappedTensor.empty(
            self.block_size,
            *self.shape,
            dtype=getattr(torch, self.dtype),
            filename=os.path.join(self.filename, name),
        )
        self._status.bds.append(name)
        self._memmap[name] = mmap
        return mmap

    def _loadstatus(self) -> None:
        try:
            self._status = TensorBlobStatus.load(self.statuspath)
            self._memmap = LRUCache(maxsize=self.max_cached_blocks)
            if self._m_ap:
                self._pos = len(self)
        except FileNotFoundError as exc:
            raise FileNotFoundError(
                "Status file missing for blob at %r; file corrupted!" % self.statuspath
            ) from exc

    def _checkclosed(self) -> None:
        if self._closed:
            raise IOError("I/O operation on closed blob.")

    def _checkwritable(self) -> None:
        if not self._m_wr:
            raise IOError("Blob is not open for writing (mode='%s')" % self.mode)
        self._checkclosed()

    def _checkreadable(self) -> None:
        if not self._m_rd:
            raise IOError("Blob is not open for reading (mode='%s')" % self.mode)
        self._checkclosed()

    def tell(self) -> int:
        self._checkclosed()
        return self._pos

    def seek(self, pos: int = 0, whence: int = io.SEEK_SET) -> int:
        self._checkclosed()
        match whence:
            case io.SEEK_SET:
                _pos = pos
            case io.SEEK_CUR:
                _pos = self._pos + pos
            case io.SEEK_END:
                _pos = len(self) + pos
            case _:
                raise ValueError("Invalid whence: %r" % whence)
        self._pos = max(min(_pos, len(self)), 0)
        return self.tell()

    def close(self) -> None:
        if not self._closed and self._m_wr:
            self.flush()
        self._closed = True

    def flush(self) -> None:
        self._checkwritable()
        self._status.dump(self.statuspath)

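    def read(self, size: int | None = None) -> torch.Tensor:
        self._checkreadable()
        # As with io objects, a missing or negative size reads to the end;
        # size=0 reads nothing instead of everything
        if size is None or size < 0:
            end = len(self)
        else:
            end = min(self._pos + size, len(self))
        ret = self[self._pos : end]
        self.seek(end)
        return ret
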
    def write(self, ts: torch.Tensor) -> int:
        self._checkwritable()
        if self._m_ap:
            self.seek(whence=io.SEEK_END)
        # reshape (rather than view) also accepts non-contiguous input tensors
        ts = ts.reshape(-1, *self.shape)
        nt = ts.size(0)

        cnt = 0
        while cnt < nt:
            if self._isfull() and self._pos >= len(self):
                self._addblock()
            i, o = divmod(self._pos, self.block_size)
            incr = min(self.block_size - o, nt - cnt)
            self._getblock(i)[o : o + incr] = ts[cnt : cnt + incr]

            # Grow the status length only by tensors written past the original end:
            # the cursor is not always at EOF, so the number of tensors written can
            # exceed the change in length
            self._pos += incr
            self._status.len += max(0, self._pos - len(self))

            cnt += incr

        assert cnt == nt, "Write incomplete: wrote %d of %d tensors!" % (cnt, nt)
        return cnt

    def truncate(self, pos: int | None = None) -> int:
        self._checkwritable()
        # Compare against None explicitly so that truncate(0) empties the blob
        self.seek(self.tell() if pos is None else pos)
        brk = ceil(self.tell() / self.block_size)
        for bd in self._status.bds[brk:]:
            if bd in self._memmap:
                del self._memmap[bd]
            os.remove(os.path.join(self.filename, bd))
        self._status.bds = self._status.bds[:brk]
        self._status.len = self.tell()
        self.flush()
        return self.tell()

    def extend(self, other: TensorBlob, maintain_order: bool = False) -> None:
        if self.dtype != other.dtype or self.shape != other.shape:
            raise ValueError("Blob data types and shapes must match to extend blobs!")

        self._checkwritable()
        self.seek(whence=io.SEEK_END)

        # TODO: Honestly this is a bit inefficient but I think this is rarely used.
        if maintain_order:
            for i in range(len(other)):
                self.write(other[i])
            return

        # If order is not important, we can simply copy over the complete blocks from
        # the other blob and merge incomplete blocks.
        if self.block_size != other.block_size:
            raise ValueError(
                "Block sizes must match to extend blobs in non-order-preserving mode!"
            )

        comb = []
        sbrk = len(self) // self.block_size * self.block_size
        if sbrk < len(self):
            comb.append(self[sbrk:])
        obrk = len(other) // other.block_size * other.block_size
        if obrk < len(other):
            comb.append(other[obrk:])

        # TODO: We are directly accessing internal data structures of the other blob here.
        self.truncate(sbrk)
        for obd in other._status.bds[: len(other) // other.block_size]:
            sbd = str(uuid.uuid4())
            shutil.copy(
                os.path.join(other.filename, obd), os.path.join(self.filename, sbd)
            )
            self._status.bds.append(sbd)
            self._status.len += self.block_size
            self._memmap[sbd] = MemoryMappedTensor.from_filename(
                os.path.join(self.filename, sbd),
                dtype=getattr(torch, self.dtype),
                shape=(self.block_size, *self.shape),
            )

        self.seek(whence=io.SEEK_END)
        if comb:
            self.write(torch.cat(comb, dim=0))
        self.flush()
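
For slices, `__getitem__` expands the slice into indices with `slice.indices`, groups consecutive indices by block with `itertools.groupby`, gathers each group in one fancy-indexing call, and concatenates the results. Below is a pure-Python sketch of the grouping step only. `group_slice` is a hypothetical name, and the actual tensor gathering and `torch.cat` are omitted. `groupby` only merges adjacent runs, which is sufficient here because a slice visits blocks in monotonic order, whether the step is positive or negative.

```python
from itertools import groupby


def group_slice(sl: slice, length: int, block_size: int) -> list[tuple[int, list[int]]]:
    """Group the indices selected by `sl` into (block, offsets-within-block) pairs."""
    indices = range(*sl.indices(length))
    return [
        (block, [i % block_size for i in members])
        for block, members in groupby(indices, key=lambda i: i // block_size)
    ]


print(group_slice(slice(5, 19, 4), 20, 8))        # [(0, [5]), (1, [1, 5]), (2, [1])]
print(group_slice(slice(None, None, -7), 20, 8))  # [(2, [3]), (1, [4]), (0, [5])]
```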

Mixin class for automated configuration registration and IO.

Attributes
  • config_name (str, default=None): Class attribute that specifies the filename under which the config should be stored when calling save_config. Should be overridden by the subclass.
  • ignore_for_config (list[str], default=[]): Class attribute that specifies a list of attributes that should not be saved in the config. Should be overridden by the subclass.
Examples

In this example, we have a model with 3 arguments:

  • hidden_size: The hidden size of the model.
  • _num_layers: The number of layers in the model.
  • dropout: The dropout rate of the model.

Among the three arguments, _num_layers is implicitly ignored by the decorator because of its leading underscore, and dropout is explicitly ignored because it is listed in the ignore_for_config class variable. Only hidden_size is registered to the config.

>>> class MyModel(ConfigMixin):
...     config_name = "my_model_config.json"
...     ignore_for_config = ["dropout"]
...
...     @register_to_config
...     def __init__(self, hidden_size: int = 768, _num_layers: int = 12, dropout: float = 0.1):
...         self.hidden_size = hidden_size
...         self.num_layers = _num_layers
...         self.dropout = dropout  # This will be ignored because of the specification in `ignore_for_config`
...
>>> model = MyModel(hidden_size=1024, _num_layers=20, dropout=0.2)
>>> model.config
mappingproxy({'__notes__': {'class_name': 'MyModel', 'using_default_values': [], 'args': (), 'kwargs': {}}, 'hidden_size': 1024})
>>> model.num_layers
20
>>> model.dropout
0.2
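The filtering rule above can be modeled with `inspect.signature`. The following is a simplified sketch, not configmixin's implementation. The decorator name `record_init_args` is hypothetical, the config is stored as a plain dict, and the `__notes__` metadata, read-only mapping, and save/load logic of the real `register_to_config` are omitted.

```python
import functools
import inspect


def record_init_args(init):
    """Hypothetical decorator: store bound __init__ arguments in self.config."""
    sig = inspect.signature(init)

    @functools.wraps(init)
    def wrapper(self, *args, **kwargs):
        bound = sig.bind(self, *args, **kwargs)
        bound.apply_defaults()  # include defaults for arguments not passed
        ignored = set(getattr(type(self), "ignore_for_config", []))
        self.config = {
            name: value
            for name, value in bound.arguments.items()
            # skip self, underscore-prefixed names, and explicitly ignored names
            if name != "self" and not name.startswith("_") and name not in ignored
        }
        init(self, *args, **kwargs)

    return wrapper


class MyModel:
    ignore_for_config = ["dropout"]

    @record_init_args
    def __init__(self, hidden_size: int = 768, _num_layers: int = 12, dropout: float = 0.1):
        self.hidden_size = hidden_size
        self.num_layers = _num_layers
        self.dropout = dropout


model = MyModel(hidden_size=1024, _num_layers=20, dropout=0.2)
print(model.config)  # {'hidden_size': 1024}
```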
@register_to_config
TensorBlob(filename: str, dtype: str, shape: tuple[int, ...], block_size: int, mode: str, max_cached_blocks: int | None = None)
status_name = '.stat'
config_name = '.conf'
ignore_for_config = ['filename', 'mode', 'max_cached_blocks']
@classmethod
def open(cls, filename, mode='r', *, dtype=None, shape=None, block_size=8192, max_cached_blocks=None):
158        ...     for feature in blob:  # Sequential access works fine
159        ...         process(feature)
160
161        File Access Modes
162        -----------------
163        Similar to Python's built-in open(), supports the following modes:
164
165        Basic modes:
166        - 'r'  : Read-only. Blob must exist. Position starts at beginning.
167        - 'w'  : Write-only. Creates new or truncates existing. Position at start. **If the blob already exists,
168                   truncation will ignore any other parameters supplied and rely on existing configuration.**
169        - 'a'  : Append-only. Blob must exist. Position starts at end.
170                All writes go to end regardless of seek position.
171
172        Update modes (with '+'):
173        - 'r+' : Read and write. Blob must exist. Position at start.
174                   Can overwrite existing data or extend at end.
175        - 'w+' : Read and write. Creates new or truncates existing. Position at start.
176        - 'a+' : Read and append. Blob must exist. Position at end.
177                   Reads allowed anywhere, writes always append to end.
178
179        Data Type and Shape
180        -------------------
181        All tensors in a blob must have the same dtype and shape. These are
182        specified when creating a new blob (modes 'w', 'w+') and stored in
183        the configuration file. When opening existing blobs, dtype and shape
184        are loaded automatically.
185
186        Supported dtypes: "float32", "float64", "int32", "int64", "bool", etc.
187        Can also use torch.dtype objects like torch.float32.
188
189        Shape can be:
190        - Single integer: shape=10 creates 1D tensors of shape (10,)
191        - Tuple: shape=(3, 224, 224) creates 3D tensors
192        """
193        modes = set(mode)
194        if modes - set("raw+") or len(mode) > len(modes):
195            raise ValueError("Invalid mode: %s" % mode)
196        if sum(c in "raw" for c in mode) != 1 or mode.count("+") > 1:
197            raise ValueError(
198                "Must have exactly one of read/write/append mode and at most one plus: %s"
199                % mode
200            )
201
202        filename = Path(filename).expanduser().resolve()
203        if not filename.exists():
204            if "r" in modes or "a" in modes:
205                raise FileNotFoundError("Blob not found: %r" % filename)
206            if dtype is None or shape is None:
207                raise ValueError(
208                    "Arguments ``dtype`` and ``shape`` are required for new blob; got: %r and %r"
209                    % (dtype, shape)
210                )
211            if isinstance(dtype, torch.dtype):
212                dtype = str(dtype).split(".").pop()
213            elif not isinstance(dtype, str):
214                raise TypeError(
215                    "dtype must be str or torch.dtype, got %r" % type(dtype).__name__
216                )
217            shape = (shape,) if isinstance(shape, int) else tuple(shape)
218            return cls(
219                os.fspath(filename), dtype, shape, block_size, mode, max_cached_blocks
220            )
221
222        return cls.from_config(
223            save_directory=filename,
224            runtime_kwargs={
225                "mode": mode,
226                "filename": os.fspath(filename),
227                "max_cached_blocks": max_cached_blocks,
228            },
229        )

Open a TensorBlob with a file-like interface for tensor storage.

TensorBlob provides persistent, memory-mapped storage for large collections of same-shaped tensors. It uses a block-based architecture where tensors are organized into fixed-size blocks for efficient I/O and memory management.

The blob is stored as a directory containing:

  • .conf: Configuration file (dtype, shape, block_size)
  • .stat: State file (length, block list)
  • Block files: UUID-named memory-mapped tensor files

Parameters

  • filename (str or Path): Directory path for blob storage. Supports tilde expansion (~) and relative paths.
  • mode (str, default="r"): File access mode ('r', 'w', 'a', 'r+', 'w+', 'a+'). See File Access Modes below for details.
  • dtype (str or torch.dtype, optional): Data type of each tensor. Required when creating a new blob (modes 'w', 'w+').
  • shape (tuple of int or int, optional): Shape of each individual tensor. Required when creating a new blob (modes 'w', 'w+').
  • block_size (int, default=8192): Number of tensors per memory-mapped block file.
  • max_cached_blocks (int, optional): Maximum number of memory-mapped blocks to keep cached. When the limit is exceeded, the least recently used blocks are unmapped. If None (default), 1/16 of the system's max_map_count limit is used (about 4,000 with the common Linux default of 65,530). This bounds the kernel's per-mapping (VMA) overhead for blobs with many blocks.

Returns

  • TensorBlob: The opened blob. Use it as a context manager for automatic cleanup.

Raises

  • FileNotFoundError: If mode is 'r', 'r+', 'a', or 'a+' and the blob does not exist.
  • ValueError: If the mode is invalid, or if a new blob is created without dtype or shape.
  • TypeError: If dtype is neither a string nor a torch.dtype.

Examples

Creating a new blob and writing data:

>>> import torch
>>> from tensorblob import TensorBlob
>>>
>>> with TensorBlob.open("data/embeddings", "w",
...                       dtype="float32", shape=(768,)) as blob:
...     embeddings = torch.randn(1000, 768)
...     blob.write(embeddings)
...     print(f"Wrote {len(blob)} tensors")
Wrote 1000 tensors

Reading from an existing blob:

>>> with TensorBlob.open("data/embeddings", "r") as blob:
...     all_data = blob.read()
...     print(all_data.shape)
torch.Size([1000, 768])

Appending to an existing blob:

>>> with TensorBlob.open("data/embeddings", "a") as blob:
...     new_data = torch.randn(100, 768)
...     blob.write(new_data)
...     print(f"Total: {len(blob)}")
Total: 1100

Read and update with r+ mode:

>>> with TensorBlob.open("data/embeddings", "r+") as blob:
...     first_10 = blob.read(size=10)
...     blob.seek(5)
...     blob.write(torch.ones(3, 768))  # Overwrite at position 5

Custom block size for large tensors:

>>> with TensorBlob.open("data/images", "w",
...                       dtype=torch.float32,
...                       shape=(3, 1024, 1024),
...                       block_size=256) as blob:
...     images = torch.randn(1000, 3, 1024, 1024)
...     blob.write(images)
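
Choosing block_size trades the size of each block file against the number of files. The back-of-the-envelope sketch below estimates one block's size, assuming dense storage with no per-file header; the ITEMSIZE table and block_bytes helper are hypothetical (PyTorch defines the real byte widths).

```python
from math import prod

# Hypothetical byte widths for a few dtype names (torch defines the real ones).
ITEMSIZE = {"float32": 4, "float64": 8, "int32": 4, "int64": 8, "bool": 1}


def block_bytes(dtype: str, shape: tuple, block_size: int) -> int:
    """Approximate size of one block file, assuming dense storage and no header."""
    return block_size * prod(shape) * ITEMSIZE[dtype]


GiB = 1024**3
print(block_bytes("float32", (3, 1024, 1024), 256) / GiB)   # 3.0
print(block_bytes("float32", (3, 1024, 1024), 8192) / GiB)  # 96.0
print(block_bytes("float32", (768,), 8192) / 2**20)         # 24.0 (MiB)
```

Under these assumptions, the default block_size=8192 would make each block of 3×1024×1024 float32 images about 96 GiB, which is presumably why the example above lowers it to 256, while 768-dimensional embeddings stay at roughly 24 MiB per block.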

Custom cache size for large-scale random access:

>>> # Increase cache for better random access performance
>>> with TensorBlob.open("data/embeddings", "r",
...                       max_cached_blocks=10000) as blob:
...     for idx in torch.randint(len(blob), (1000,)).tolist():
...         embedding = blob[idx]  # Frequently accessed blocks stay cached

>>> # Decrease cache for memory-constrained environments
>>> with TensorBlob.open("data/features", "r",
...                       max_cached_blocks=100) as blob:
...     for feature in blob:  # Sequential access works fine
...         process(feature)  # process() stands in for user code

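The max_cached_blocks behavior (unmap the least recently used block once the limit is exceeded) can be sketched with collections.OrderedDict. This is an illustration, not tensorblob's cache: BlockCache and its loader callback are hypothetical, and the default mirrors the documented "1/16 of max_map_count" rule, assuming Linux's /proc/sys/vm/max_map_count with 65,530 as a fallback.

```python
from __future__ import annotations

from collections import OrderedDict
from typing import Callable, Hashable


def default_max_cached_blocks() -> int:
    """1/16 of vm.max_map_count, assuming Linux; falls back to the common default."""
    try:
        with open("/proc/sys/vm/max_map_count") as f:
            max_map_count = int(f.read().strip())
    except (OSError, ValueError):
        max_map_count = 65530  # common Linux default (assumption for other systems)
    return max(1, max_map_count // 16)


class BlockCache:
    """Hypothetical LRU cache of mapped blocks keyed by block id."""

    def __init__(self, loader: Callable[[Hashable], object], capacity: int | None = None):
        self.loader = loader  # e.g. a function that memory-maps one block file
        self.capacity = default_max_cached_blocks() if capacity is None else capacity
        self._blocks: OrderedDict = OrderedDict()

    def get(self, key: Hashable) -> object:
        if key in self._blocks:
            self._blocks.move_to_end(key)  # mark as most recently used
            return self._blocks[key]
        block = self.loader(key)
        self._blocks[key] = block
        if len(self._blocks) > self.capacity:
            # Evict the least recently used block; a real cache would unmap it here.
            self._blocks.popitem(last=False)
        return block

    def keys(self) -> list:
        return list(self._blocks)


cache = BlockCache(loader=lambda k: f"mapped:{k}", capacity=2)
for key in ["a", "b", "a", "c"]:
    cache.get(key)
print(cache.keys())  # ['a', 'c']: 'b' was the least recently used
```

With random access, a larger capacity keeps more recently touched blocks mapped; with purely sequential iteration, each block is used once, so a small capacity costs little.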
File Access Modes

As with Python's built-in open(), the following modes are supported:

Basic modes:

  • 'r' : Read-only. The blob must exist. Position starts at the beginning.
  • 'w' : Write-only. Creates a new blob or truncates an existing one. Position starts at the beginning. If the blob already exists, it is truncated using its stored configuration; any dtype, shape, and block_size arguments are ignored.
  • 'a' : Append-only. The blob must exist. Position starts at the end. All writes go to the end regardless of the seek position.

Update modes (with '+'):

  • 'r+' : Read and write. The blob must exist. Position starts at the beginning. Can overwrite existing data or extend past the end.
  • 'w+' : Read and write. Creates a new blob or truncates an existing one (with the same caveat as 'w'). Position starts at the beginning.
  • 'a+' : Read and append. The blob must exist. Position starts at the end. Reads are allowed anywhere; writes always append to the end.

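The validation at the top of open() can be read as a small parser from a mode string to capabilities. The sketch below reproduces its checks and returns a hypothetical ModeFlags record that spells out what each mode permits according to the table above; the flag names are illustrative, not tensorblob attributes.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ModeFlags:
    """Hypothetical summary of what a mode string allows."""

    readable: bool
    writable: bool
    append: bool        # writes always go to the end
    create: bool        # a missing blob is created ('w' and 'w+')
    start_at_end: bool  # initial position is the end of the blob


def parse_mode(mode: str) -> ModeFlags:
    modes = set(mode)
    # Same checks as open(): only r/a/w/+, no repeated characters (so no '++'),
    # and exactly one of r/a/w.
    if modes - set("raw+") or len(mode) > len(modes):
        raise ValueError("Invalid mode: %s" % mode)
    if sum(c in "raw" for c in mode) != 1:
        raise ValueError("Must have exactly one of read/write/append mode: %s" % mode)
    plus = "+" in modes
    return ModeFlags(
        readable="r" in modes or plus,
        writable="r" not in modes or plus,
        append="a" in modes,
        create="w" in modes,
        start_at_end="a" in modes,
    )


print(parse_mode("r+"))
# ModeFlags(readable=True, writable=True, append=False, create=False, start_at_end=False)
```

Strings such as "rw", "r++", or "+" are rejected before any file system access, which matches the ValueError listed under Raises.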
Data Type and Shape

All tensors in a blob must have the same dtype and shape. These are specified when creating a new blob (modes 'w', 'w+') and stored in the configuration file. When opening existing blobs, dtype and shape are loaded automatically.

Supported dtypes are given by name, e.g. "float32", "float64", "int32", "int64", "bool". torch.dtype objects such as torch.float32 are also accepted and stored by name.

Shape can be:

  • Single integer: shape=10 creates 1D tensors of shape (10,)
  • Tuple: shape=(3, 224, 224) creates 3D tensors of shape (3, 224, 224)

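How open() normalizes these two arguments before creating a blob can be sketched without PyTorch. Assumption for runnability: instead of isinstance(dtype, torch.dtype), the sketch accepts any non-string whose str() starts with "torch." (str(torch.float32) is "torch.float32") and keeps the part after the last dot, as open() does. normalize_config and FakeTorchDtype are hypothetical names.

```python
from __future__ import annotations

from collections.abc import Iterable


def normalize_config(dtype: object, shape: int | Iterable[int]) -> tuple[str, tuple[int, ...]]:
    if isinstance(dtype, str):
        name = dtype
    elif str(dtype).startswith("torch."):   # stand-in for isinstance(dtype, torch.dtype)
        name = str(dtype).split(".").pop()  # "torch.float32" -> "float32"
    else:
        raise TypeError("dtype must be str or torch.dtype, got %r" % type(dtype).__name__)
    # An int becomes a 1-tuple; any other iterable is converted to a tuple.
    shape = (shape,) if isinstance(shape, int) else tuple(shape)
    return name, shape


class FakeTorchDtype:  # hypothetical stand-in so the sketch runs without torch
    def __str__(self) -> str:
        return "torch.float32"


print(normalize_config("int64", 10))                      # ('int64', (10,))
print(normalize_config(FakeTorchDtype(), [3, 224, 224]))  # ('float32', (3, 224, 224))
```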
@classmethod
def apply_param_hooks(cls, d):
    @classmethod
    def apply_param_hooks(cls, d):
        d["shape"] = tuple(d["shape"])
        return d

Apply post-processing hooks to the JSON dictionary.

orjson.loads only decodes configs into primitive types, which may not be directly consumable by the class initializer. For instance, a dataclass object is loaded as a dictionary. This method is therefore intended to be overridden by subclasses to post-process the loaded config dictionary.

Note that abusing this method to deserialize complex objects is strongly discouraged; instead, consider using the runtime_kwargs argument of from_config to pass complex objects to the class initializer explicitly.

The base implementation returns the input dictionary unchanged. TensorBlob's override converts the decoded shape list back into a tuple.

Parameters

  • d (dict[str, Any]): The config dictionary after deserialization.

Returns

  • dict[str, Any]: The config dictionary after post-processing.

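Why TensorBlob's override is needed: JSON has no tuple type, so a shape saved as a tuple comes back as a list. The sketch uses the standard-library json module in place of orjson (assumed to behave the same for tuples and lists) and a standalone copy of the hook:

```python
import json


def apply_param_hooks(d: dict) -> dict:
    # Same post-processing as TensorBlob.apply_param_hooks: list -> tuple.
    d["shape"] = tuple(d["shape"])
    return d


config = {"dtype": "float32", "shape": (3, 224, 224), "block_size": 8192}
loaded = json.loads(json.dumps(config))
print(loaded["shape"])                     # [3, 224, 224]
print(apply_param_hooks(loaded)["shape"])  # (3, 224, 224)
```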
filename
dtype
shape
block_size
mode
max_cached_blocks
configpath: str
    @property
    def configpath(self) -> str:
        return os.path.join(self.filename, self.config_name)
statuspath: str
    @property
    def statuspath(self) -> str:
        return os.path.join(self.filename, self.status_name)
closed: bool
    @property
    def closed(self) -> bool:
        return self._closed
def tell(self) -> int:
    def tell(self) -> int:
        self._checkclosed()
        return self._pos
def seek(self, pos: int = 0, whence: int = 0) -> int:
    def seek(self, pos: int = 0, whence: int = io.SEEK_SET) -> int:
        self._checkclosed()
        match whence:
            case io.SEEK_SET:
                _pos = pos
            case io.SEEK_CUR:
                _pos = self._pos + pos
            case io.SEEK_END:
                _pos = len(self) + pos
            case _:
                raise ValueError("Invalid whence: %r" % whence)
        self._pos = max(min(_pos, len(self)), 0)
        return self.tell()
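seek() uses io's whence constants but, unlike a regular file, clamps the resulting position to [0, len(self)], so seeking past the end cannot create a gap. A standalone sketch of that arithmetic (clamped_seek is a hypothetical name; it uses if/elif instead of match so it also runs on Python versions before 3.10):

```python
import io


def clamped_seek(cur: int, length: int, pos: int = 0, whence: int = io.SEEK_SET) -> int:
    if whence == io.SEEK_SET:
        target = pos
    elif whence == io.SEEK_CUR:
        target = cur + pos
    elif whence == io.SEEK_END:
        target = length + pos
    else:
        raise ValueError("Invalid whence: %r" % whence)
    return max(min(target, length), 0)  # clamp to [0, length]


print(clamped_seek(cur=5, length=10, pos=-3, whence=io.SEEK_END))  # 7
print(clamped_seek(cur=5, length=10, pos=100))                     # 10
print(clamped_seek(cur=5, length=10, pos=-8, whence=io.SEEK_CUR))  # 0
```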
def close(self) -> None:
    def close(self) -> None:
        if not self._closed and self._m_wr:
            self.flush()
        self._closed = True
def flush(self) -> None:
    def flush(self) -> None:
        self._checkwritable()
        self._status.dump(self.statuspath)
def read(self, size: int | None = None) -> torch.Tensor:
    def read(self, size: int | None = None) -> torch.Tensor:
        self._checkreadable()
        end = min(self._pos + (size or len(self)), len(self))
        ret = self[self._pos : end]
        self.seek(end)
        return ret
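read() returns the slice from the cursor up to size tensors, stopped at the end of the blob, and moves the cursor to the end of that slice. The sketch below reproduces that index arithmetic with the same expression; read_span is a hypothetical name. Because 0 is falsy, size=0 behaves like size=None under this expression and reads to the end.

```python
from __future__ import annotations


def read_span(pos: int, length: int, size: int | None = None) -> tuple[int, int]:
    """Return (start, end) of the slice read; the new cursor position is end."""
    end = min(pos + (size or length), length)  # same expression as read()
    return pos, end


print(read_span(pos=990, length=1000, size=32))  # (990, 1000): stops at end of blob
print(read_span(pos=10, length=1000))            # (10, 1000): size=None reads the rest
print(read_span(pos=10, length=1000, size=0))    # (10, 1000): 0 is falsy, so also the rest
```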
def write(self, ts: torch.Tensor) -> int:
    def write(self, ts: torch.Tensor) -> int:
        self._checkwritable()
        if self._m_ap:
            self.seek(whence=io.SEEK_END)
        ts = ts.view(-1, *self.shape)
        nt = ts.size(0)

        cnt = 0
        while cnt < nt:
            if self._isfull() and self._pos >= len(self):
                self._addblock()
            i, o = divmod(self._pos, self.block_size)
            incr = min(self.block_size - o, nt - cnt)
            self._getblock(i)[o : o + incr] = ts[cnt : cnt + incr]

            # Update the status length only for tensors written past the original end:
            # the cursor is not always at EOF, so the growth in length can be smaller
            # than the number of tensors written.
            self._pos += incr
            self._status.len += max(0, self._pos - len(self))

            cnt += incr

        assert cnt == nt, "Write incomplete: wrote %d of %d tensors!" % (cnt, nt)
        return cnt
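The loop in write() splits an incoming batch at block boundaries: divmod(pos, block_size) gives the block index and the offset within it, and each step copies at most block_size - offset tensors. The hypothetical plan_write below reproduces only that index arithmetic (no blocks are allocated), returning one (block, offset, count) triple per copy plus the final position and length.

```python
def plan_write(pos: int, n: int, length: int, block_size: int):
    """Return ([(block, offset, count), ...], new_pos, new_length)."""
    steps = []
    written = 0
    while written < n:
        block, offset = divmod(pos, block_size)
        incr = min(block_size - offset, n - written)
        steps.append((block, offset, incr))
        pos += incr
        length += max(0, pos - length)  # grow only when writing past the old end
        written += incr
    return steps, pos, length


# Appending 7 tensors to a 6-tensor blob with 4 tensors per block:
print(plan_write(pos=6, n=7, length=6, block_size=4))
# ([(1, 2, 2), (2, 0, 4), (3, 0, 1)], 13, 13)

# Overwriting 3 tensors in the middle leaves the length unchanged:
print(plan_write(pos=2, n=3, length=10, block_size=4))
# ([(0, 2, 2), (1, 0, 1)], 5, 10)
```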
def truncate(self, pos: int | None = None) -> int:
    def truncate(self, pos: int | None = None) -> int:
        self._checkwritable()
        self.seek(self.tell() if pos is None else pos)
        brk = ceil(self.tell() / self.block_size)
        for bd in self._status.bds[brk:]:
            if bd in self._memmap:
                del self._memmap[bd]
            os.remove(os.path.join(self.filename, bd))
        self._status.bds = self._status.bds[:brk]
        self._status.len = self.tell()
        self.flush()
        return self.tell()
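truncate() keeps ceil(pos / block_size) block files, because a partially filled last block must survive, and deletes the rest. A sketch of that count follows; blocks_to_keep is a hypothetical name. When defaulting an optional position like this, an explicit `is None` test matters, since `pos or default` treats position 0 as missing.

```python
from __future__ import annotations

from math import ceil


def blocks_to_keep(pos: int | None, cur: int, block_size: int) -> int:
    # Use the cursor only when no position is given; 0 is a valid position.
    target = cur if pos is None else pos
    return ceil(target / block_size)


print(blocks_to_keep(pos=None, cur=9, block_size=4))  # 3: positions 0-3, 4-7, and 8
print(blocks_to_keep(pos=8, cur=9, block_size=4))     # 2: position 8 would start a new block
print(blocks_to_keep(pos=0, cur=9, block_size=4))     # 0: every block is removed
print(0 or 9)                                         # 9: why `pos or cur` would be wrong
```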
def extend(self, other: TensorBlob, maintain_order: bool = False) -> None:
    def extend(self, other: TensorBlob, maintain_order: bool = False) -> None:
        if self.dtype != other.dtype or self.shape != other.shape:
            raise ValueError("Blob data types and shapes must match to extend blobs!")

        self._checkwritable()
        self.seek(whence=io.SEEK_END)

        # TODO: Honestly this is a bit inefficient but I think this is rarely used.
        if maintain_order:
            for i in range(len(other)):
                self.write(other[i])
            return

        # If order is not important, we can simply copy over the complete blocks from
        # the other blob and merge incomplete blocks.
        if self.block_size != other.block_size:
            raise ValueError(
                "Block sizes must match to extend blobs in non-order-preserving mode!"
            )

        comb = []
        sbrk = len(self) // self.block_size * self.block_size
        if sbrk < len(self):
            comb.append(self[sbrk:])
        obrk = len(other) // other.block_size * other.block_size
        if obrk < len(other):
            comb.append(other[obrk:])

        # TODO: We are directly accessing internal data structures of the other blob here.
        self.truncate(sbrk)
        for obd in other._status.bds[: len(other) // other.block_size]:
            sbd = str(uuid.uuid4())
            shutil.copy(
                os.path.join(other.filename, obd), os.path.join(self.filename, sbd)
            )
            self._status.bds.append(sbd)
            self._status.len += self.block_size
            self._memmap[sbd] = MemoryMappedTensor.from_filename(
                os.path.join(self.filename, sbd),
                dtype=getattr(torch, self.dtype),
                shape=(self.block_size, *self.shape),
            )

        self.seek(whence=io.SEEK_END)
        if comb:
            self.write(torch.cat(comb, dim=0))
        self.flush()
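In non-order-preserving mode, extend() copies the other blob's complete blocks file by file and rewrites only the two partial tails at the end, so the result is grouped by block rather than kept in the original order. The sketch below models both blobs as Python lists to show the resulting order; extend_unordered is a hypothetical helper that ignores file copying.

```python
def extend_unordered(mine: list, other: list, block_size: int) -> list:
    sbrk = len(mine) // block_size * block_size   # end of my complete blocks
    obrk = len(other) // block_size * block_size  # end of other's complete blocks
    tail = mine[sbrk:] + other[obrk:]             # partial tails, merged last
    return mine[:sbrk] + other[:obrk] + tail


a = ["a0", "a1", "a2", "a3", "a4"]
b = ["b0", "b1", "b2", "b3", "b4"]
print(extend_unordered(a, b, block_size=2))
# ['a0', 'a1', 'a2', 'a3', 'b0', 'b1', 'b2', 'b3', 'a4', 'b4']
```

Every element survives, but a4 now follows b3. When order matters, maintain_order=True avoids this at the cost of writing one tensor at a time, as the TODO in the source notes.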