tensorblob

Python 3.10 License: Apache 2.0 test codecov PyPI

tensorblob

A lightweight, dynamic-sized, memory-mapped tensor storage with file-like APIs, while also supporting integer indexing and slicing, built with MemoryMappedTensor from tensordict.

Features

  • 🔗 Memory-mapped storage: Efficient storage of large collections of same-shaped tensors
  • 💾 File-like APIs: Read, write, and seek like a file, while also supporting integer indexing and slicing
  • ⚡ Dynamic-sized: No need to specify the total number of tensors upfront
  • 🔄 Extend and truncate: Extend the blob with another blob or truncate the blob to a specific position

Installation

From PyPI:

pip install tensorblob

If you are interested in the experimental (i.e., unstable and undertested) version, you can install it from GitHub:

pip install git+https://github.com/Guest400123064/tensorblob.git

Core Use Cases

Quick Start

The example below shows how to create a new storage for a collection of randomly generated fake embeddings, and how to access them by index. Since the storage is memory-mapped, no need to read all tensors into memory; just access them by index.

import torch
from tensorblob import TensorBlob

# Create a new storage for a collection of randomly generated fake embeddings;
# need to specify the data type and shape of each tensor for creation
with TensorBlob.open("embeddings.blob", "w", dtype="float32", shape=768) as blob:
    blob.write(torch.randn(100_000, 768))
    print(f"Wrote {len(blob)} embeddings")

# No need to specify the configurations again after creation
with TensorBlob.open("embeddings.blob", "r") as blob:
    e1 = blob[42]
    e2 = blob[-1:16384:-12345]
    print(f"Similarity: {torch.cosine_similarity(e1, e2)}")

Processing Large Datasets

Storing and preprocessing datasets larger than RAM with memory mapping can accelerate training by reducing the time spent on data loading and transformation.

with TensorBlob.open("data/images.blob", "w", dtype="float32", shape=(3, 224, 224)) as blob:
    for image_batch in data_loader:
        blob.write(preprocess(image_batch))

with TensorBlob.open("data/images.blob", "r") as blob:
    for image in blob:
        result = model(image)

Incremental Data Collection

Appending new data to an existing blob is useful for streaming data collection.

with TensorBlob.open("positions.blob", "w", dtype="float32", shape=3) as blob:
    blob.write(initial_position)

# Later: append more data by opening the blob in append mode
with TensorBlob.open("positions.blob", "a") as blob:
    for pos in trajectory_queue.get():
        blob.write(pos)
    print(f"Total trajectory recorded: {len(blob)}")

Random Access and Updates with File-Like APIs

Read and modify specific tensors starting from a specific position.

import io

with TensorBlob.open("data/features.blob", "r+") as blob:
    blob.seek(1000)
    print(f"Current position: {blob.tell()}")

    batch = blob.read(size=100)
    print(f"Read {batch.shape} tensors")

    # Update specific positions, whence is also supported
    blob.seek(-500, whence=io.SEEK_END)
    blob.write(updated_features)

    # Append new data
    blob.seek(len(blob))
    blob.write(additional_features)

Extend and Truncate

Extend the blob with another blob or truncate the blob to a specific position. Extension is useful for merging two blobs into one, e.g., results from two different processes. Note that the extension operation copies the data and does not delete the data of the other blob.

# Order-preserving extension must be requested explicitly
with TensorBlob.open("data/features.blob", "a") as blob:
    blob.extend(other_blob, maintain_order=True)

# Extension without maintaining the order is faster (this is the default)
with TensorBlob.open("data/features.blob", "r+") as blob:
    blob.extend(other_blob, maintain_order=False)

with TensorBlob.open("data/features.blob", "r+") as blob:
    blob.truncate(1000)
    print(f"Truncated to {len(blob)} tensors")

Contributing

Contributions welcome! Please submit a Pull Request.

License

Apache License 2.0 - see LICENSE file for details.

 1"""
 2.. include:: ../../README.md
 3"""
 4
 5from ._blob import TensorBlob
 6
 7__version__ = "0.1.0"
 8
 9__all__ = [
10    "TensorBlob",
11]
class TensorBlob(configmixin._core.ConfigMixin):
 35class TensorBlob(ConfigMixin):
 36    _m_rd = False
 37    _m_wr = False
 38    _m_ap = False
 39
 40    status_name = ".stat"
 41    config_name = ".conf"
 42    ignore_for_config = ["filename", "mode"]
 43
 44    @classmethod
 45    def open(cls, filename, mode="r", *, dtype=None, shape=None, block_size=8192):
 46        r"""Open a TensorBlob with file-like interface for tensor storage.
 47
 48        TensorBlob provides persistent, memory-mapped storage for large collections
 49        of same-shaped tensors. It uses a block-based architecture where tensors are
 50        organized into fixed-size blocks for efficient I/O and memory management.
 51
 52        The blob is stored as a directory containing:
 53        - ``.conf``: Configuration file (dtype, shape, block_size)
 54        - ``.stat``: State file (length, block list)
 55        - Block files: UUID-named memory-mapped tensor files
 56
 57        Parameters
 58        ----------
 59        filename : str or Path
 60            Directory path for blob storage. Supports tilde expansion (~) and
 61            relative paths.
 62        mode : str, default="r"
 63            File access mode ('r', 'w', 'a', 'r+', 'w+', 'a+'). See below for details.
 64        dtype : str or torch.dtype, optional
 65            Data type for tensors. Required for new blobs (modes 'w', 'w+').
 66        shape : tuple of int or int, optional
 67            Shape of individual tensors. Required for new blobs (modes 'w', 'w+').
 68        block_size : int, default=8192
 69            Number of tensors per memory-mapped block file.
 70
 71        Returns
 72        -------
 73        TensorBlob
 74            Opened blob object. Use with context manager for automatic cleanup.
 75
 76        Raises
 77        ------
 78        FileNotFoundError
 79            If mode is 'r', 'r+', 'a', or 'a+' and blob doesn't exist.
 80        ValueError
 81            If creating new blob without dtype or shape, or if mode is invalid.
 82        TypeError
 83            If dtype is neither string nor torch.dtype.
 84
 85        Examples
 86        --------
 87        Creating a new blob and writing data:
 88
 89        >>> import torch
 90        >>> from tensorblob import TensorBlob
 91        >>>
 92        >>> with TensorBlob.open("data/embeddings", "w",
 93        ...                       dtype="float32", shape=(768,)) as blob:
 94        ...     embeddings = torch.randn(1000, 768)
 95        ...     blob.write(embeddings)
 96        ...     print(f"Wrote {len(blob)} tensors")
 97        Wrote 1000 tensors
 98
 99        Reading from existing blob:
100
101        >>> with TensorBlob.open("data/embeddings", "r") as blob:
102        ...     all_data = blob.read()
103        ...     print(all_data.shape)
104        torch.Size([1000, 768])
105
106        Appending to existing blob:
107
108        >>> with TensorBlob.open("data/embeddings", "a") as blob:
109        ...     new_data = torch.randn(100, 768)
110        ...     blob.write(new_data)
111        ...     print(f"Total: {len(blob)}")
112        Total: 1100
113
114        Read and update with r+ mode:
115
116        >>> with TensorBlob.open("data/embeddings", "r+") as blob:
117        ...     first_10 = blob.read(size=10)
118        ...     blob.seek(5)
119        ...     blob.write(torch.ones(3, 768))  # Overwrite at position 5
120
121        Custom block size for large tensors:
122
123        >>> with TensorBlob.open("data/images", "w",
124        ...                       dtype=torch.float32,
125        ...                       shape=(3, 1024, 1024),
126        ...                       block_size=256) as blob:
127        ...     images = torch.randn(1000, 3, 1024, 1024)
128        ...     blob.write(images)
129
130        File Access Modes
131        -----------------
132        Similar to Python's built-in open(), supports the following modes:
133
134        Basic modes:
135        - 'r'  : Read-only. Blob must exist. Position starts at beginning.
136        - 'w'  : Write-only. Creates new or truncates existing. Position at start. **If the blob already exists,
137                   truncation will ignore any other parameters supplied and rely on existing configuration.**
138        - 'a'  : Append-only. Blob must exist. Position starts at end.
139                All writes go to end regardless of seek position.
140
141        Update modes (with '+'):
142        - 'r+' : Read and write. Blob must exist. Position at start.
143                   Can overwrite existing data or extend at end.
144        - 'w+' : Read and write. Creates new or truncates existing. Position at start.
145        - 'a+' : Read and append. Blob must exist. Position at end.
146                   Reads allowed anywhere, writes always append to end.
147
148        Data Type and Shape
149        -------------------
150        All tensors in a blob must have the same dtype and shape. These are
151        specified when creating a new blob (modes 'w', 'w+') and stored in
152        the configuration file. When opening existing blobs, dtype and shape
153        are loaded automatically.
154
155        Supported dtypes: "float32", "float64", "int32", "int64", "bool", etc.
156        Can also use torch.dtype objects like torch.float32.
157
158        Shape can be:
159        - Single integer: shape=10 creates 1D tensors of shape (10,)
160        - Tuple: shape=(3, 224, 224) creates 3D tensors
161        """
162        modes = set(mode)
163        if modes - set("raw+") or len(mode) > len(modes):
164            raise ValueError("Invalid mode: %s" % mode)
165        if sum(c in "raw" for c in mode) != 1 or mode.count("+") > 1:
166            raise ValueError(
167                "Must have exactly one of read/write/append mode and at most one plus: %s"
168                % mode
169            )
170
171        filename = Path(filename).expanduser().resolve()
172        if not filename.exists():
173            if "r" in modes or "a" in modes:
174                raise FileNotFoundError("Blob not found: %r" % filename)
175            if dtype is None or shape is None:
176                raise ValueError(
177                    "Arguments ``dtype`` and ``shape`` are required for new blob; got: %r and %r"
178                    % (dtype, shape)
179                )
180            if isinstance(dtype, torch.dtype):
181                dtype = str(dtype).split(".").pop()
182            elif not isinstance(dtype, str):
183                raise TypeError(
184                    "dtype must be str or torch.dtype, got %r" % type(dtype).__name__
185                )
186            shape = (shape,) if isinstance(shape, int) else tuple(shape)
187            return cls(os.fspath(filename), dtype, shape, block_size, mode)
188
189        return cls.from_config(
190            save_directory=filename,
191            runtime_kwargs={"mode": mode, "filename": os.fspath(filename)},
192        )
193
194    @classmethod
195    def apply_param_hooks(cls, d):
196        d["shape"] = tuple(d["shape"])
197        return d
198
199    @register_to_config
200    def __init__(
201        self,
202        filename: str,
203        dtype: str,
204        shape: tuple[int, ...],
205        block_size: int,
206        mode: str,
207    ) -> None:
208        self.filename = filename
209        self.dtype = dtype
210        self.shape = shape
211        self.block_size = block_size
212        self.mode = mode
213
214        self._pos = 0
215        self._closed = False
216
217        if "+" in mode:
218            self._m_rd = True
219            self._m_wr = True
220        match mode.replace("+", ""):
221            case "r":
222                self._m_rd = True
223            case "w":
224                self._m_wr = True
225                self._trunc()
226            case "a":
227                self._m_wr = True
228                self._m_ap = True
229                self._create()
230
231        self._loadstatus()
232
233    @property
234    def configpath(self) -> str:
235        return os.path.join(self.filename, self.config_name)
236
237    @property
238    def statuspath(self) -> str:
239        return os.path.join(self.filename, self.status_name)
240
241    @property
242    def closed(self) -> bool:
243        return self._closed
244
245    def __enter__(self) -> TensorBlob:
246        return self
247
248    def __exit__(self, *_) -> None:
249        self.close()
250
251    def __len__(self) -> int:
252        return self._status.len
253
254    def __getitem__(self, idx: int | slice) -> torch.Tensor:
255        if not isinstance(idx, (int, slice)):
256            raise TypeError("Index must be int or slice, got %r!" % type(idx).__name__)
257        if isinstance(idx, int):
258            if idx >= len(self) or idx < -len(self):
259                raise IndexError(
260                    "Index out of bounds: %r (length: %d)" % (idx, len(self))
261                )
262            i, o = divmod(idx + len(self) if idx < 0 else idx, self.block_size)
263            return self._getblock(i)[o].clone()
264
265        # Although the current implementation may not be efficient, it is very easy to
266        # understand and debug. More efficient implementation requires much more complex
267        # edge case handling and is error prone. Also, I think the primary cost here is
268        # still the I/O operations, not the Python code.
269        ret = [
270            self._getblock(bd)[[i % self.block_size for i in _is]]
271            for bd, _is in groupby(
272                range(*idx.indices(len(self))), key=lambda i: i // self.block_size
273            )
274        ]
275        if not ret:
276            return torch.empty(0, *self.shape, dtype=getattr(torch, self.dtype))
277        return torch.cat(ret, dim=0)
278
279    def __iter__(self) -> Iterator[torch.Tensor]:
280        for i in range(self._pos, len(self)):
281            self._pos += 1
282            yield self[i]
283
284    def _trunc(self) -> None:
285        if os.path.exists(self.filename):
286            try:
287                st = TensorBlobStatus.load(self.statuspath)
288            except FileNotFoundError as exc:
289                raise FileNotFoundError(
290                    "Status file missing for blob at %r; file corrupted!"
291                    % self.statuspath
292                ) from exc
293            for bd in st.bds:
294                os.remove(os.path.join(self.filename, bd))
295        self.save_config(save_directory=self.filename, overwrite=True)
296        TensorBlobStatus().dump(self.statuspath)
297
298    def _create(self) -> None:
299        if not os.path.exists(self.filename):
300            self.save_config(save_directory=self.filename)
301            TensorBlobStatus().dump(self.statuspath)
302
303    def _getblock(self, bd: str | int = -1) -> MemoryMappedTensor:
304        if not self._status.bds:
305            self._addblock()
306        if isinstance(bd, int):
307            bd = self._status.bds[bd]
308        return self._memmap[bd]
309
310    def _isfull(self) -> bool:
311        return (not len(self) % self.block_size) and bool(len(self))
312
313    def _addblock(self) -> MemoryMappedTensor:
314        if self._status.bds and not self._isfull():
315            raise RuntimeError(
316                "Attempt to create a new block when working block "
317                "is not full: length <%d> < capacity <%d>."
318                % (len(self) % self.block_size, self.block_size)
319            )
320        name = str(uuid.uuid4())
321        mmap = MemoryMappedTensor.empty(
322            self.block_size,
323            *self.shape,
324            dtype=getattr(torch, self.dtype),
325            filename=os.path.join(self.filename, name),
326        )
327        self._status.bds.append(name)
328        self._memmap[name] = mmap
329        return mmap
330
331    def _loadstatus(self) -> None:
332        try:
333            self._status = TensorBlobStatus.load(self.statuspath)
334            self._memmap = {
335                name: MemoryMappedTensor.from_filename(
336                    os.path.join(self.filename, name),
337                    dtype=getattr(torch, self.dtype),
338                    shape=(self.block_size, *self.shape),
339                )
340                for name in self._status.bds
341            }
342            if self._m_ap:
343                self._pos = len(self)
344        except FileNotFoundError as exc:
345            raise FileNotFoundError(
346                "status file missing for blob at %r; file corrupted!" % self.statuspath
347            ) from exc
348
349    def _checkclosed(self) -> None:
350        if self._closed:
351            raise IOError("I/O operation on closed blob.")
352
353    def _checkwritable(self) -> None:
354        if not self._m_wr:
355            raise IOError("Blob is not open for writing (mode='%s')" % self.mode)
356        self._checkclosed()
357
358    def _checkreadable(self) -> None:
359        if not self._m_rd:
360            raise IOError("Blob is not open for reading (mode='%s')" % self.mode)
361        self._checkclosed()
362
363    def tell(self) -> int:
364        self._checkclosed()
365        return self._pos
366
367    def seek(self, pos: int = 0, whence: int = io.SEEK_SET) -> int:
368        self._checkclosed()
369        match whence:
370            case io.SEEK_SET:
371                _pos = pos
372            case io.SEEK_CUR:
373                _pos = self._pos + pos
374            case io.SEEK_END:
375                _pos = len(self) + pos
376            case _:
377                raise ValueError("Invalid whence: %r" % whence)
378        self._pos = max(min(_pos, len(self)), 0)
379        return self.tell()
380
381    def close(self) -> None:
382        if not self._closed and self._m_wr:
383            self.flush()
384        self._closed = True
385
386    def flush(self) -> None:
387        self._checkwritable()
388        self._status.dump(self.statuspath)
389
390    def read(self, size: int | None = None) -> torch.Tensor:
391        self._checkreadable()
392        end = min(self._pos + (size or len(self)), len(self))
393        ret = self[self._pos : end]
394        self.seek(end)
395        return ret
396
397    def write(self, ts: torch.Tensor) -> int:
398        self._checkwritable()
399        if self._m_ap:
400            self.seek(whence=io.SEEK_END)
401        ts = ts.view(-1, *self.shape)
402        for t in ts:
403            if self._isfull() and self._pos >= len(self):
404                self._addblock()
405            i, o = divmod(self._pos, self.block_size)
406            self._getblock(i)[o] = t
407            self._status.len += self._pos >= len(self)
408            self._pos += 1
409        return len(ts)
410
411    def truncate(self, pos: int | None = None) -> int:
412        self._checkwritable()
413        self.seek(pos or self.tell())
414        brk = ceil(self.tell() / self.block_size)
415        for bd in self._status.bds[brk:]:
416            os.remove(self._memmap.pop(bd).filename)
417        self._status.bds = self._status.bds[:brk]
418        self._status.len = self.tell()
419        self.flush()
420        return self.tell()
421
422    def extend(self, other: TensorBlob, maintain_order: bool = False) -> None:
423        if self.dtype != other.dtype or self.shape != other.shape:
424            raise ValueError("Blob data types and shapes must match to extend blobs!")
425
426        self._checkwritable()
427        self.seek(whence=io.SEEK_END)
428        if maintain_order:
429            for i in range(len(other)):
430                self.write(other[i])
431            return
432
433        # If order is not important, we can simply copy over the complete blocks from
434        # the other blob and merge incomplete blocks.
435        if self.block_size != other.block_size:
436            raise ValueError(
437                "Block sizes must match to extend blobs in non-order-preserving mode!"
438            )
439
440        comb = []
441        sbrk = len(self) // self.block_size * self.block_size
442        if sbrk < len(self):
443            comb.append(self[sbrk:])
444        obrk = len(other) // other.block_size * other.block_size
445        if obrk < len(other):
446            comb.append(other[obrk:])
447
448        # TODO: We are directly accessing internal data structures of the other blob here.
449        self.truncate(sbrk)
450        for obd in other._status.bds[: len(other) // other.block_size]:
451            sbd = str(uuid.uuid4())
452            shutil.copy(
453                os.path.join(other.filename, obd), os.path.join(self.filename, sbd)
454            )
455            self._status.bds.append(sbd)
456            self._status.len += self.block_size
457            self._memmap[sbd] = MemoryMappedTensor.from_filename(
458                os.path.join(self.filename, sbd),
459                dtype=getattr(torch, self.dtype),
460                shape=(self.block_size, *self.shape),
461            )
462
463        self.seek(whence=io.SEEK_END)
464        if comb:
465            self.write(torch.cat(comb, dim=0))
466        self.flush()

Mixin class for automated configuration registration and IO.

Attributes
  • config_name (str, default=None): Class attribute that specifies the filename under which the config should be stored when calling save_config. Should be overridden by the subclass.
  • ignore_for_config (list[str], default=[]): Class attribute that specifies a list of attributes that should not be saved in the config. Should be overridden by the subclass.
Examples

In this example, we have a model with 3 arguments:

  • hidden_size: The hidden size of the model.
  • _num_layers: The number of layers in the model.
  • dropout: The dropout rate of the model.

Among the three arguments, the number of layers is implicitly ignored by the decorator because of the leading underscore; the dropout argument is explicitly ignored based on the specification in the ignore_for_config class variable. The hidden_size argument is registered to the config.

>>> class MyModel(ConfigMixin):
...     config_name = "my_model_config.json"
...     ignore_for_config = ["dropout"]
...
...     @register_to_config
...     def __init__(self, hidden_size: int = 768, _num_layers: int = 12, dropout: float = 0.1):
...         self.hidden_size = hidden_size
...         self.num_layers = _num_layers
...         self.dropout = dropout  # This will be ignored because of the specification in `ignore_for_config`
...
>>> model = MyModel(hidden_size=1024, _num_layers=20, dropout=0.2)
>>> model.config
mappingproxy({'__notes__': {'class_name': 'MyModel', 'using_default_values': [], 'args': (), 'kwargs': {}}, 'hidden_size': 1024})
>>> model.num_layers
20
>>> model.dropout
0.2
@register_to_config
TensorBlob( filename: str, dtype: str, shape: tuple[int, ...], block_size: int, mode: str)
199    @register_to_config
200    def __init__(
201        self,
202        filename: str,
203        dtype: str,
204        shape: tuple[int, ...],
205        block_size: int,
206        mode: str,
207    ) -> None:
208        self.filename = filename
209        self.dtype = dtype
210        self.shape = shape
211        self.block_size = block_size
212        self.mode = mode
213
214        self._pos = 0
215        self._closed = False
216
217        if "+" in mode:
218            self._m_rd = True
219            self._m_wr = True
220        match mode.replace("+", ""):
221            case "r":
222                self._m_rd = True
223            case "w":
224                self._m_wr = True
225                self._trunc()
226            case "a":
227                self._m_wr = True
228                self._m_ap = True
229                self._create()
230
231        self._loadstatus()
status_name = '.stat'
config_name = '.conf'
ignore_for_config = ['filename', 'mode']
@classmethod
def open(cls, filename, mode='r', *, dtype=None, shape=None, block_size=8192):
 44    @classmethod
 45    def open(cls, filename, mode="r", *, dtype=None, shape=None, block_size=8192):
 46        r"""Open a TensorBlob with file-like interface for tensor storage.
 47
 48        TensorBlob provides persistent, memory-mapped storage for large collections
 49        of same-shaped tensors. It uses a block-based architecture where tensors are
 50        organized into fixed-size blocks for efficient I/O and memory management.
 51
 52        The blob is stored as a directory containing:
 53        - ``.conf``: Configuration file (dtype, shape, block_size)
 54        - ``.stat``: State file (length, block list)
 55        - Block files: UUID-named memory-mapped tensor files
 56
 57        Parameters
 58        ----------
 59        filename : str or Path
 60            Directory path for blob storage. Supports tilde expansion (~) and
 61            relative paths.
 62        mode : str, default="r"
 63            File access mode ('r', 'w', 'a', 'r+', 'w+', 'a+'). See below for details.
 64        dtype : str or torch.dtype, optional
 65            Data type for tensors. Required for new blobs (modes 'w', 'w+').
 66        shape : tuple of int or int, optional
 67            Shape of individual tensors. Required for new blobs (modes 'w', 'w+').
 68        block_size : int, default=8192
 69            Number of tensors per memory-mapped block file.
 70
 71        Returns
 72        -------
 73        TensorBlob
 74            Opened blob object. Use with context manager for automatic cleanup.
 75
 76        Raises
 77        ------
 78        FileNotFoundError
 79            If mode is 'r', 'r+', 'a', or 'a+' and blob doesn't exist.
 80        ValueError
 81            If creating new blob without dtype or shape, or if mode is invalid.
 82        TypeError
 83            If dtype is neither string nor torch.dtype.
 84
 85        Examples
 86        --------
 87        Creating a new blob and writing data:
 88
 89        >>> import torch
 90        >>> from tensorblob import TensorBlob
 91        >>>
 92        >>> with TensorBlob.open("data/embeddings", "w",
 93        ...                       dtype="float32", shape=(768,)) as blob:
 94        ...     embeddings = torch.randn(1000, 768)
 95        ...     blob.write(embeddings)
 96        ...     print(f"Wrote {len(blob)} tensors")
 97        Wrote 1000 tensors
 98
 99        Reading from existing blob:
100
101        >>> with TensorBlob.open("data/embeddings", "r") as blob:
102        ...     all_data = blob.read()
103        ...     print(all_data.shape)
104        torch.Size([1000, 768])
105
106        Appending to existing blob:
107
108        >>> with TensorBlob.open("data/embeddings", "a") as blob:
109        ...     new_data = torch.randn(100, 768)
110        ...     blob.write(new_data)
111        ...     print(f"Total: {len(blob)}")
112        Total: 1100
113
114        Read and update with r+ mode:
115
116        >>> with TensorBlob.open("data/embeddings", "r+") as blob:
117        ...     first_10 = blob.read(size=10)
118        ...     blob.seek(5)
119        ...     blob.write(torch.ones(3, 768))  # Overwrite at position 5
120
121        Custom block size for large tensors:
122
123        >>> with TensorBlob.open("data/images", "w",
124        ...                       dtype=torch.float32,
125        ...                       shape=(3, 1024, 1024),
126        ...                       block_size=256) as blob:
127        ...     images = torch.randn(1000, 3, 1024, 1024)
128        ...     blob.write(images)
129
130        File Access Modes
131        -----------------
132        Similar to Python's built-in open(), supports the following modes:
133
134        Basic modes:
135        - 'r'  : Read-only. Blob must exist. Position starts at beginning.
136        - 'w'  : Write-only. Creates new or truncates existing. Position at start. **If the blob already exists,
137                   truncation will ignore any other parameters supplied and rely on existing configuration.**
138        - 'a'  : Append-only. Blob must exist. Position starts at end.
139                All writes go to end regardless of seek position.
140
141        Update modes (with '+'):
142        - 'r+' : Read and write. Blob must exist. Position at start.
143                   Can overwrite existing data or extend at end.
144        - 'w+' : Read and write. Creates new or truncates existing. Position at start.
145        - 'a+' : Read and append. Blob must exist. Position at end.
146                   Reads allowed anywhere, writes always append to end.
147
148        Data Type and Shape
149        -------------------
150        All tensors in a blob must have the same dtype and shape. These are
151        specified when creating a new blob (modes 'w', 'w+') and stored in
152        the configuration file. When opening existing blobs, dtype and shape
153        are loaded automatically.
154
155        Supported dtypes: "float32", "float64", "int32", "int64", "bool", etc.
156        Can also use torch.dtype objects like torch.float32.
157
158        Shape can be:
159        - Single integer: shape=10 creates 1D tensors of shape (10,)
160        - Tuple: shape=(3, 224, 224) creates 3D tensors
161        """
162        modes = set(mode)
163        if modes - set("raw+") or len(mode) > len(modes):
164            raise ValueError("Invalid mode: %s" % mode)
165        if sum(c in "raw" for c in mode) != 1 or mode.count("+") > 1:
166            raise ValueError(
167                "Must have exactly one of read/write/append mode and at most one plus: %s"
168                % mode
169            )
170
171        filename = Path(filename).expanduser().resolve()
172        if not filename.exists():
173            if "r" in modes or "a" in modes:
174                raise FileNotFoundError("Blob not found: %r" % filename)
175            if dtype is None or shape is None:
176                raise ValueError(
177                    "Arguments ``dtype`` and ``shape`` are required for new blob; got: %r and %r"
178                    % (dtype, shape)
179                )
180            if isinstance(dtype, torch.dtype):
181                dtype = str(dtype).split(".").pop()
182            elif not isinstance(dtype, str):
183                raise TypeError(
184                    "dtype must be str or torch.dtype, got %r" % type(dtype).__name__
185                )
186            shape = (shape,) if isinstance(shape, int) else tuple(shape)
187            return cls(os.fspath(filename), dtype, shape, block_size, mode)
188
189        return cls.from_config(
190            save_directory=filename,
191            runtime_kwargs={"mode": mode, "filename": os.fspath(filename)},
192        )

Open a TensorBlob with file-like interface for tensor storage.

TensorBlob provides persistent, memory-mapped storage for large collections of same-shaped tensors. It uses a block-based architecture where tensors are organized into fixed-size blocks for efficient I/O and memory management.

The blob is stored as a directory containing:

  • .conf: Configuration file (dtype, shape, block_size)
  • .stat: State file (length, block list)
  • Block files: UUID-named memory-mapped tensor files
Parameters
  • filename (str or Path): Directory path for blob storage. Supports tilde expansion (~) and relative paths.
  • mode (str, default="r"): File access mode ('r', 'w', 'a', 'r+', 'w+', 'a+'). See below for details.
  • dtype (str or torch.dtype, optional): Data type for tensors. Required for new blobs (modes 'w', 'w+').
  • shape (tuple of int or int, optional): Shape of individual tensors. Required for new blobs (modes 'w', 'w+').
  • block_size (int, default=8192): Number of tensors per memory-mapped block file.
Returns
  • TensorBlob: Opened blob object. Use with context manager for automatic cleanup.
Raises
  • FileNotFoundError: If mode is 'r', 'r+', 'a', or 'a+' and blob doesn't exist.
  • ValueError: If creating new blob without dtype or shape, or if mode is invalid.
  • TypeError: If dtype is neither string nor torch.dtype.
Examples

Creating a new blob and writing data:

>>> import torch
>>> from tensorblob import TensorBlob
>>>
>>> with TensorBlob.open("data/embeddings", "w",
...                       dtype="float32", shape=(768,)) as blob:
...     embeddings = torch.randn(1000, 768)
...     blob.write(embeddings)
...     print(f"Wrote {len(blob)} tensors")
Wrote 1000 tensors

Reading from existing blob:

>>> with TensorBlob.open("data/embeddings", "r") as blob:
...     all_data = blob.read()
...     print(all_data.shape)
torch.Size([1000, 768])

Appending to existing blob:

>>> with TensorBlob.open("data/embeddings", "a") as blob:
...     new_data = torch.randn(100, 768)
...     blob.write(new_data)
...     print(f"Total: {len(blob)}")
Total: 1100

Read and update with r+ mode:

>>> with TensorBlob.open("data/embeddings", "r+") as blob:
...     first_10 = blob.read(size=10)
...     blob.seek(5)
...     blob.write(torch.ones(3, 768))  # Overwrite at position 5

Custom block size for large tensors:

>>> with TensorBlob.open("data/images", "w",
...                       dtype=torch.float32,
...                       shape=(3, 1024, 1024),
...                       block_size=256) as blob:
...     images = torch.randn(1000, 3, 1024, 1024)
...     blob.write(images)
File Access Modes

Similar to Python's built-in open(), supports the following modes:

Basic modes:

  • 'r' : Read-only. Blob must exist. Position starts at beginning.
  • 'w' : Write-only. Creates new or truncates existing. Position at start. If the blob already exists, truncation will ignore any other parameters supplied and rely on existing configuration.
  • 'a' : Append-only. Blob must exist. Position starts at end. All writes go to end regardless of seek position.

Update modes (with '+'):

  • 'r+' : Read and write. Blob must exist. Position at start. Can overwrite existing data or extend at end.
  • 'w+' : Read and write. Creates new or truncates existing. Position at start.
  • 'a+' : Read and append. Blob must exist. Position at end. Reads allowed anywhere, writes always append to end.
Data Type and Shape

All tensors in a blob must have the same dtype and shape. These are specified when creating a new blob (modes 'w', 'w+') and stored in the configuration file. When opening existing blobs, dtype and shape are loaded automatically.

Supported dtypes: "float32", "float64", "int32", "int64", "bool", etc. Can also use torch.dtype objects like torch.float32.

Shape can be:

  • Single integer: shape=10 creates 1D tensors of shape (10,)
  • Tuple: shape=(3, 224, 224) creates 3D tensors
@classmethod
def apply_param_hooks(cls, d):
194    @classmethod
195    def apply_param_hooks(cls, d):
196        d["shape"] = tuple(d["shape"])
197        return d

Apply post-processing hooks to the JSON dictionary.

orjson.loads only decodes configs into primitive types, which may not be directly consumable by the class initializer. For instance, a dataclass object will be loaded as a plain dictionary. Therefore, this method is intended to be overridden by subclasses to perform additional post-processing on the loaded config dictionary.

Note that abusing this method to deserialize complex objects is strongly discouraged; consider using the runtime_kwargs argument of from_config instead to pass complex objects explicitly to the class initializer.

By default, this method returns the input dictionary unchanged.

Parameters
  • jdict (dict[str, Any]): The config dictionary after deserialization.
Returns
  • dict[str, Any]: The config dictionary after post-processing.
filename
dtype
shape
block_size
mode
configpath: str
233    @property
234    def configpath(self) -> str:
235        return os.path.join(self.filename, self.config_name)
statuspath: str
237    @property
238    def statuspath(self) -> str:
239        return os.path.join(self.filename, self.status_name)
closed: bool
241    @property
242    def closed(self) -> bool:
243        return self._closed
def tell(self) -> int:
363    def tell(self) -> int:
364        self._checkclosed()
365        return self._pos
def seek(self, pos: int = 0, whence: int = 0) -> int:
367    def seek(self, pos: int = 0, whence: int = io.SEEK_SET) -> int:
368        self._checkclosed()
369        match whence:
370            case io.SEEK_SET:
371                _pos = pos
372            case io.SEEK_CUR:
373                _pos = self._pos + pos
374            case io.SEEK_END:
375                _pos = len(self) + pos
376            case _:
377                raise ValueError("Invalid whence: %r" % whence)
378        self._pos = max(min(_pos, len(self)), 0)
379        return self.tell()
def close(self) -> None:
381    def close(self) -> None:
382        if not self._closed and self._m_wr:
383            self.flush()
384        self._closed = True
def flush(self) -> None:
386    def flush(self) -> None:
387        self._checkwritable()
388        self._status.dump(self.statuspath)
def read(self, size: int | None = None) -> torch.Tensor:
390    def read(self, size: int | None = None) -> torch.Tensor:
391        self._checkreadable()
392        end = min(self._pos + (size or len(self)), len(self))
393        ret = self[self._pos : end]
394        self.seek(end)
395        return ret
def write(self, ts: torch.Tensor) -> int:
397    def write(self, ts: torch.Tensor) -> int:
398        self._checkwritable()
399        if self._m_ap:
400            self.seek(whence=io.SEEK_END)
401        ts = ts.view(-1, *self.shape)
402        for t in ts:
403            if self._isfull() and self._pos >= len(self):
404                self._addblock()
405            i, o = divmod(self._pos, self.block_size)
406            self._getblock(i)[o] = t
407            self._status.len += self._pos >= len(self)
408            self._pos += 1
409        return len(ts)
def truncate(self, pos: int | None = None) -> int:
411    def truncate(self, pos: int | None = None) -> int:
412        self._checkwritable()
413        self.seek(pos or self.tell())
414        brk = ceil(self.tell() / self.block_size)
415        for bd in self._status.bds[brk:]:
416            os.remove(self._memmap.pop(bd).filename)
417        self._status.bds = self._status.bds[:brk]
418        self._status.len = self.tell()
419        self.flush()
420        return self.tell()
def extend( self, other: TensorBlob, maintain_order: bool = False) -> None:
422    def extend(self, other: TensorBlob, maintain_order: bool = False) -> None:
423        if self.dtype != other.dtype or self.shape != other.shape:
424            raise ValueError("Blob data types and shapes must match to extend blobs!")
425
426        self._checkwritable()
427        self.seek(whence=io.SEEK_END)
428        if maintain_order:
429            for i in range(len(other)):
430                self.write(other[i])
431            return
432
433        # If order is not important, we can simply copy over the complete blocks from
434        # the other blob and merge incomplete blocks.
435        if self.block_size != other.block_size:
436            raise ValueError(
437                "Block sizes must match to extend blobs in non-order-preserving mode!"
438            )
439
440        comb = []
441        sbrk = len(self) // self.block_size * self.block_size
442        if sbrk < len(self):
443            comb.append(self[sbrk:])
444        obrk = len(other) // other.block_size * other.block_size
445        if obrk < len(other):
446            comb.append(other[obrk:])
447
448        # TODO: We are directly accessing internal data structures of the other blob here.
449        self.truncate(sbrk)
450        for obd in other._status.bds[: len(other) // other.block_size]:
451            sbd = str(uuid.uuid4())
452            shutil.copy(
453                os.path.join(other.filename, obd), os.path.join(self.filename, sbd)
454            )
455            self._status.bds.append(sbd)
456            self._status.len += self.block_size
457            self._memmap[sbd] = MemoryMappedTensor.from_filename(
458                os.path.join(self.filename, sbd),
459                dtype=getattr(torch, self.dtype),
460                shape=(self.block_size, *self.shape),
461            )
462
463        self.seek(whence=io.SEEK_END)
464        if comb:
465            self.write(torch.cat(comb, dim=0))
466        self.flush()