nrp_cmd.async_client.streams

Data sources and sinks.

 1#
 2# Copyright (C) 2024 CESNET z.s.p.o.
 3#
 4# invenio-nrp is free software; you can redistribute it and/or
 5# modify it under the terms of the MIT License; see LICENSE file for more
 6# details.
 7#
 8"""Data sources and sinks."""
 9
10from .base import DataSink, DataSource, InputStream, OutputStream, SinkState
11from .file import FileSink, FileSource
12from .memory import MemorySink, MemorySource
13from .stdin import StdInDataSource
14
# Public API of the streams package, grouped by the submodule that defines it.
__all__ = (
    # protocols and shared types (.base)
    "DataSink",
    "DataSource",
    "SinkState",
    "InputStream",
    "OutputStream",
    # in-memory implementations (.memory)
    "MemorySink",
    "MemorySource",
    # filesystem implementations (.file)
    "FileSink",
    "FileSource",
    # standard input (.stdin)
    "StdInDataSource",
)
class DataSink(typing.Protocol):
class DataSink(Protocol):
    """Protocol describing a data sink (a destination that bytes are written to)."""

    async def allocate(self, size: int) -> None:
        """Reserve room for the sink's content.

        :param size: Number of bytes to reserve.
        """
        ...

    async def open_chunk(self, offset: int = 0) -> OutputStream:
        """Return an output stream positioned at ``offset``.

        :param offset: Byte position, counted from the start of the sink,
            at which writing begins.
        :return: A writer for the sink.
        """
        ...

    async def close(self) -> None:
        """Close the sink together with any writers that are still open."""
        ...

    @property
    def state(self) -> SinkState:
        """The sink's current :class:`SinkState`."""
        ...

Protocol for data sinks.

DataSink(*args, **kwargs)
1767def _no_init_or_replace_init(self, *args, **kwargs):  # placeholder __init__ given to protocol subclasses (see comment at 1778)
1768    cls = type(self)
1769
1770    if cls._is_protocol:  # refuse to instantiate a protocol class directly
1771        raise TypeError('Protocols cannot be instantiated')
1772
1773    # Already using a custom `__init__`. No need to calculate correct
1774    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1775    if cls.__init__ is not _no_init_or_replace_init:
1776        return
1777
1778    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1779    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1780    # searches for a proper new `__init__` in the MRO. The new `__init__`
1781    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1782    # instantiation of the protocol subclass will thus use the new
1783    # `__init__` and no longer call `_no_init_or_replace_init`.
1784    for base in cls.__mro__:  # first class whose own __init__ is not this placeholder wins
1785        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1786        if init is not _no_init_or_replace_init:
1787            cls.__init__ = init  # cache on the subclass so later instantiations skip this search
1788            break
1789    else:
1790        # should not happen
1791        cls.__init__ = object.__init__
1792
1793    cls.__init__(self, *args, **kwargs)  # run the resolved initializer for this first instantiation
async def allocate(self, size: int) -> None:
async def allocate(self, size: int) -> None:
    """Reserve room for the sink's content.

    :param size: Number of bytes to reserve.
    """
    ...

Allocate space for the sink.

Parameters
  • size: The size of the sink in bytes.
async def open_chunk(self, offset: int = 0) -> OutputStream:
async def open_chunk(self, offset: int = 0) -> OutputStream:
    """Return an output stream positioned at ``offset``.

    :param offset: Byte position, counted from the start of the sink,
        at which writing begins.
    :return: A writer for the sink.
    """
    ...

Get a writer for the sink, starting at the given offset.

Parameters
  • offset: The offset in bytes from the start of the sink.
Returns

A writer for the sink.

async def close(self) -> None:
async def close(self) -> None:
    """Close the sink together with any writers that are still open."""
    ...

Close the sink and all unclosed writers.

state: SinkState
@property
def state(self) -> SinkState:
    """The sink's current :class:`SinkState`."""
    ...

Return the current state of the sink.

class DataSource(typing.Protocol):
 81class DataSource(Protocol):
 82    """Protocol for data sources."""
 83
 84    has_range_support: bool = False
 85
 86    async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
 87        """Open the data source for reading.
 88
 89        :param offset:      where to start reading from
 90        :param count:       how many bytes to read, if None, read until the end
 91        :return:            a reader for the data source
 92        """
 93        ...
 94
 95    async def size(self) -> int:
 96        """Return the length of the data source in bytes."""
 97        ...
 98
 99    async def content_type(self) -> str:
100        """Return the content type of the data source."""
101        ...
102
103    async def close(self) -> None:
104        """Close the data source."""
105        ...

Protocol for data sources.

DataSource(*args, **kwargs)
1767def _no_init_or_replace_init(self, *args, **kwargs):  # placeholder __init__ given to protocol subclasses (see comment at 1778)
1768    cls = type(self)
1769
1770    if cls._is_protocol:  # refuse to instantiate a protocol class directly
1771        raise TypeError('Protocols cannot be instantiated')
1772
1773    # Already using a custom `__init__`. No need to calculate correct
1774    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1775    if cls.__init__ is not _no_init_or_replace_init:
1776        return
1777
1778    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1779    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1780    # searches for a proper new `__init__` in the MRO. The new `__init__`
1781    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1782    # instantiation of the protocol subclass will thus use the new
1783    # `__init__` and no longer call `_no_init_or_replace_init`.
1784    for base in cls.__mro__:  # first class whose own __init__ is not this placeholder wins
1785        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1786        if init is not _no_init_or_replace_init:
1787            cls.__init__ = init  # cache on the subclass so later instantiations skip this search
1788            break
1789    else:
1790        # should not happen
1791        cls.__init__ = object.__init__
1792
1793    cls.__init__(self, *args, **kwargs)  # run the resolved initializer for this first instantiation
has_range_support: bool = False
async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
    """Open the source and return a reader.

    :param offset: byte position to start reading from
    :param count:  number of bytes to read; ``None`` reads to the end
    :return:       a reader for the data source
    """
    ...

Open the data source for reading.

Parameters
  • offset: where to start reading from
  • count: how many bytes to read, if None, read until the end
Returns

A reader for the data source.
async def size(self) -> int:
async def size(self) -> int:
    """Length of the source's content, in bytes."""
    ...

Return the length of the data source in bytes.

async def content_type(self) -> str:
 99    async def content_type(self) -> str:
100        """Return the content type of the data source."""
101        ...

Return the content type of the data source.

async def close(self) -> None:
async def close(self) -> None:
    """Release whatever the data source holds open."""
    ...

Close the data source.

class SinkState(enum.StrEnum):
42class SinkState(StrEnum):
43    """State of the sink."""
44
45    NOT_ALLOCATED = auto()
46    """Sink's space has not been allocated yet."""
47    ALLOCATED = auto()
48    """Sink's space has been allocated and reserved on the filesystem/memory."""
49    CLOSED = auto()
50    """Sink has been closed."""

State of the sink.

NOT_ALLOCATED = <SinkState.NOT_ALLOCATED: 'not_allocated'>

Sink's space has not been allocated yet.

ALLOCATED = <SinkState.ALLOCATED: 'allocated'>

Sink's space has been allocated and reserved on the filesystem/memory.

CLOSED = <SinkState.CLOSED: 'closed'>

Sink has been closed.

class InputStream(typing.Protocol):
class InputStream(Protocol):
    """Protocol for data readers.

    Bytes are obtained either with :meth:`read` or by asynchronous
    iteration; the stream is closed with :meth:`close`.
    """

    async def read(self, n: int = -1) -> bytes:
        """Read up to ``n`` bytes from the stream.

        ``n=-1`` (the default) presumably means "read everything that is
        left", following the usual file-object convention.
        """
        ...

    def __aiter__(self) -> AsyncIterator[bytes]:
        """Return an asynchronous iterator over chunks of the stream."""
        ...

    async def close(self) -> None:
        """Close the stream."""
        ...

    def __len__(self) -> int:
        """Return the length of the stream in bytes.

        This must be a plain (synchronous) method: ``len()`` invokes
        ``__len__`` synchronously and requires an ``int`` result, so an
        ``async def __len__`` makes ``len(stream)`` raise ``TypeError``.
        """
        ...

Protocol for data readers.

InputStream(*args, **kwargs)
1767def _no_init_or_replace_init(self, *args, **kwargs):  # placeholder __init__ given to protocol subclasses (see comment at 1778)
1768    cls = type(self)
1769
1770    if cls._is_protocol:  # refuse to instantiate a protocol class directly
1771        raise TypeError('Protocols cannot be instantiated')
1772
1773    # Already using a custom `__init__`. No need to calculate correct
1774    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1775    if cls.__init__ is not _no_init_or_replace_init:
1776        return
1777
1778    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1779    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1780    # searches for a proper new `__init__` in the MRO. The new `__init__`
1781    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1782    # instantiation of the protocol subclass will thus use the new
1783    # `__init__` and no longer call `_no_init_or_replace_init`.
1784    for base in cls.__mro__:  # first class whose own __init__ is not this placeholder wins
1785        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1786        if init is not _no_init_or_replace_init:
1787            cls.__init__ = init  # cache on the subclass so later instantiations skip this search
1788            break
1789    else:
1790        # should not happen
1791        cls.__init__ = object.__init__
1792
1793    cls.__init__(self, *args, **kwargs)  # run the resolved initializer for this first instantiation
async def read(self, n: int = -1) -> bytes:
async def read(self, n: int = -1) -> bytes:
    """Read up to ``n`` bytes (protocol stub)."""
    ...
async def close(self) -> None:
async def close(self) -> None:
    """Close the reader (protocol stub)."""
    ...
class OutputStream(typing.Protocol):
class OutputStream(Protocol):
    """Protocol for data writers."""

    async def write(self, data: bytes) -> int:
        """Write ``data``; the ``int`` result is presumably the number of bytes written."""
        ...

    async def close(self) -> None:
        """Close the writer."""
        ...

Protocol for data writers.

OutputStream(*args, **kwargs)
1767def _no_init_or_replace_init(self, *args, **kwargs):  # placeholder __init__ given to protocol subclasses (see comment at 1778)
1768    cls = type(self)
1769
1770    if cls._is_protocol:  # refuse to instantiate a protocol class directly
1771        raise TypeError('Protocols cannot be instantiated')
1772
1773    # Already using a custom `__init__`. No need to calculate correct
1774    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1775    if cls.__init__ is not _no_init_or_replace_init:
1776        return
1777
1778    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1779    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1780    # searches for a proper new `__init__` in the MRO. The new `__init__`
1781    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1782    # instantiation of the protocol subclass will thus use the new
1783    # `__init__` and no longer call `_no_init_or_replace_init`.
1784    for base in cls.__mro__:  # first class whose own __init__ is not this placeholder wins
1785        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1786        if init is not _no_init_or_replace_init:
1787            cls.__init__ = init  # cache on the subclass so later instantiations skip this search
1788            break
1789    else:
1790        # should not happen
1791        cls.__init__ = object.__init__
1792
1793    cls.__init__(self, *args, **kwargs)  # run the resolved initializer for this first instantiation
async def write(self, data: bytes) -> int:
async def write(self, data: bytes) -> int:
    """Write ``data`` to the stream (protocol stub)."""
    ...
async def close(self) -> None:
async def close(self) -> None:
    """Close the writer (protocol stub)."""
    ...
class MemorySink(nrp_cmd.async_client.streams.DataSink):
class MemorySink(DataSink):
    """Data sink that collects everything written to it in an in-memory buffer."""

    def __init__(self):
        """Create an empty sink; no buffer exists until ``allocate`` is called."""
        self._state = SinkState.NOT_ALLOCATED
        self._buffer = None

    async def allocate(self, size: int) -> None:
        """Create a zero-filled buffer of ``size`` bytes and mark the sink allocated."""
        buffer = bytearray(size)
        self._buffer = buffer
        self._state = SinkState.ALLOCATED

    async def open_chunk(self, offset: int = 0) -> OutputStream:  # type: ignore
        """Return a writer into the buffer, starting at byte ``offset``.

        :raises RuntimeError: if the sink is not in the ALLOCATED state.
        """
        if self._state == SinkState.ALLOCATED:
            return MemoryWriter(self._buffer, offset)  # noqa
        raise RuntimeError("Sink not allocated")

    async def close(self) -> None:
        """Mark the sink closed; the buffer itself is left in place."""
        self._state = SinkState.CLOSED

    @property
    def state(self) -> SinkState:
        """Current lifecycle state of the sink."""
        return self._state

Implementation of a sink that writes data to memory.

MemorySink()
def __init__(self):
    """Create an empty sink; no buffer exists until ``allocate`` is called."""
    self._state = SinkState.NOT_ALLOCATED
    self._buffer = None

Initialize the sink.

async def allocate(self, size: int) -> None:
async def allocate(self, size: int) -> None:
    """Create a zero-filled buffer of ``size`` bytes and mark the sink allocated."""
    buffer = bytearray(size)
    self._buffer = buffer
    self._state = SinkState.ALLOCATED

Allocate space for the sink.

async def open_chunk(self, offset: int = 0) -> OutputStream:
async def open_chunk(self, offset: int = 0) -> OutputStream:  # type: ignore
    """Return a writer into the buffer, starting at byte ``offset``.

    :raises RuntimeError: if the sink is not in the ALLOCATED state.
    """
    if self._state == SinkState.ALLOCATED:
        return MemoryWriter(self._buffer, offset)  # noqa
    raise RuntimeError("Sink not allocated")

Open a chunk of the sink for writing.

async def close(self) -> None:
async def close(self) -> None:
    """Mark the sink closed; the buffer itself is left in place."""
    self._state = SinkState.CLOSED

Close the sink.

state: SinkState
@property
def state(self) -> SinkState:
    """Current lifecycle state of the sink."""
    return self._state

Return the state of the sink.

class MemorySource(nrp_cmd.async_client.streams.DataSource):
class MemorySource(DataSource):
    """Data source backed by a ``bytes`` object held in memory."""

    has_range_support = True

    def __init__(self, data: bytes, content_type: str):
        """Wrap ``data`` so it can be served as a data source.

        :param data:                the data to be read
        :param content_type:        the content type of the data
        """
        self._data, self._content_type = data, content_type

    async def open(
        self, offset: int = 0, count: int | None = None
    ) -> InputStream:
        """Return a reader over ``data[offset:offset + count]``, or to the end when ``count`` is None."""
        end = None if count is None else offset + count
        return MemoryReader(self._data[offset:end])

    async def size(self) -> int:
        """Return the number of bytes held by the source."""
        return len(self._data)

    async def content_type(self) -> str:
        """Return the content type supplied to the constructor."""
        return self._content_type

    async def close(self) -> None:
        """Nothing to release for an in-memory source."""

A data source that reads data from memory.

MemorySource(data: bytes, content_type: str)
def __init__(self, data: bytes, content_type: str):
    """Wrap ``data`` so it can be served as a data source.

    :param data:                the data to be read
    :param content_type:        the content type of the data
    """
    self._data, self._content_type = data, content_type

Initialize the data source.

Parameters
  • data: the data to be read
  • content_type: the content type of the data
has_range_support = True
async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
async def open(
    self, offset: int = 0, count: int | None = None
) -> InputStream:
    """Return a reader over ``data[offset:offset + count]``, or to the end when ``count`` is None."""
    end = None if count is None else offset + count
    return MemoryReader(self._data[offset:end])

Open the data source for reading.

async def size(self) -> int:
async def size(self) -> int:
    """Return the number of bytes held by the source."""
    data = self._data
    return len(data)

Return the size of the data.

async def content_type(self) -> str:
async def content_type(self) -> str:
    """Return the content type supplied to the constructor."""
    mime = self._content_type
    return mime

Return the content type of the data.

async def close(self) -> None:
async def close(self) -> None:
    """Nothing to release for an in-memory source."""

Close the data source.

class FileSink(nrp_cmd.async_client.streams.DataSink):
class FileSink(DataSink):
    """Implementation of a sink that writes data to filesystem."""

    def __init__(self, fpath: Path):
        """Initialize the sink.

        :param fpath: The path to the file where the data will be written.
        """
        self._fpath = fpath
        self._state = SinkState.NOT_ALLOCATED
        # Handle opened by allocate(); kept open until close().
        self._file: FileOutputStream | None = None

    async def allocate(self, size: int) -> None:
        """Create (or truncate) the target file and resize it to ``size`` bytes.

        A handle left over from a previous ``allocate`` call is closed first.
        If resizing fails, the newly opened handle is closed before the error
        propagates, so no file handle is leaked.
        """
        if self._file is not None:
            with contextlib.suppress(Exception):
                await self._file.close()
            self._file = None

        file = await open_file(self._fpath, mode="wb")
        try:
            await file.truncate(size)
        except BaseException:
            with contextlib.suppress(Exception):
                await file.close()
            raise
        self._file = file
        self._state = SinkState.ALLOCATED

    async def open_chunk(self, offset: int = 0) -> OutputStream:  # type: ignore
        """Open a new read/write handle on the file, positioned at ``offset``.

        :raises RuntimeError: if the sink has not been allocated.
        """
        if self._state != SinkState.ALLOCATED:
            raise RuntimeError("Sink not allocated")

        chunk = await open_file(self._fpath, mode="r+b")
        await chunk.seek(offset)
        return chunk

    async def close(self) -> None:
        """Close the handle opened by ``allocate`` (errors are ignored) and mark the sink closed."""
        if self._file is not None:
            with contextlib.suppress(Exception):
                await self._file.close()
        self._file = None

        self._state = SinkState.CLOSED

    @property
    def state(self) -> SinkState:
        """Return the current state of the sink."""
        return self._state

    def __repr__(self):
        """Return a string representation of the sink."""
        return f"<{self.__class__.__name__} {self._fpath} {self._state}>"

Implementation of a sink that writes data to filesystem.

FileSink(fpath: pathlib.Path)
def __init__(self, fpath: Path):
    """Remember the target path; the file itself is opened later by ``allocate``.

    :param fpath: The path to the file where the data will be written.
    """
    self._fpath = fpath
    self._state = SinkState.NOT_ALLOCATED
    # Handle opened by allocate(); stays None until then.
    self._file: FileOutputStream | None = None

Initialize the sink.

Parameters
  • fpath: The path to the file where the data will be written.
async def allocate(self, size: int) -> None:
async def allocate(self, size: int) -> None:
    """Create (or truncate) the target file and resize it to ``size`` bytes.

    A handle left over from a previous ``allocate`` call is closed first.
    If resizing fails, the newly opened handle is closed before the error
    propagates, so no file handle is leaked.
    """
    if self._file is not None:
        with contextlib.suppress(Exception):
            await self._file.close()
        self._file = None

    file = await open_file(self._fpath, mode="wb")
    try:
        await file.truncate(size)
    except BaseException:
        with contextlib.suppress(Exception):
            await file.close()
        raise
    self._file = file
    self._state = SinkState.ALLOCATED

Allocate space for the sink.

async def open_chunk(self, offset: int = 0) -> OutputStream:
async def open_chunk(self, offset: int = 0) -> OutputStream:  # type: ignore
    """Open a new read/write handle on the file, positioned at ``offset``.

    :raises RuntimeError: if the sink has not been allocated.
    """
    if self._state == SinkState.ALLOCATED:
        handle = await open_file(self._fpath, mode="r+b")
        await handle.seek(offset)
        return handle
    raise RuntimeError("Sink not allocated")

Open a chunk of the sink for writing.

async def close(self) -> None:
async def close(self) -> None:
    """Close the handle opened by ``allocate`` (errors are ignored) and mark the sink closed."""
    handle = self._file
    if handle is not None:
        with contextlib.suppress(Exception):
            await handle.close()
    self._file = None
    self._state = SinkState.CLOSED

Close the sink.

state: SinkState
@property
def state(self) -> SinkState:
    """Current lifecycle state of the sink."""
    return self._state

Return the current state of the sink.

class FileSource(nrp_cmd.async_client.streams.DataSource):
 67class FileSource(DataSource):
 68    """A data source that reads data from a file."""
 69
 70    has_range_support = True
 71
 72    def __init__(self, file_name: Path | str):
 73        """Initialize the data source.
 74
 75        :param file_name: The name of the file to read from, must exist on the filesystem
 76        """
 77        if isinstance(file_name, str):
 78            file_name = Path(file_name)
 79        self._file_name = file_name
 80
 81    async def open(self, offset: int = 0, count: int | None = None) -> InputStream:  # type: ignore
 82        """Open the file for reading."""
 83        ret = await open_file(self._file_name, mode="rb")
 84        await ret.seek(offset)
 85        if not count:
 86            return BoundedStream(ret, await self.size())
 87        else:
 88            return BoundedStream(ret, count)
 89
 90    async def size(self) -> int:
 91        """Return the size of the file."""
 92        return (await file_stat(self._file_name)).st_size
 93
 94    async def content_type(self) -> str:
 95        """Return the content type of the file."""
 96        f = await open_file(self._file_name, mode="rb")
 97        try:
 98            data = await f.read(2048)
 99            return magic.from_buffer(data, mime=True)
100        finally:
101            await f.close()
102
103    async def close(self) -> None:
104        """Close the data source."""
105        pass

A data source that reads data from a file.

FileSource(file_name: pathlib.Path | str)
def __init__(self, file_name: Path | str):
    """Store the path of the file to read.

    :param file_name: The name of the file to read from, must exist on the filesystem
    """
    self._file_name = Path(file_name) if isinstance(file_name, str) else file_name

Initialize the data source.

Parameters
  • file_name: The name of the file to read from, must exist on the filesystem
has_range_support = True
async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
async def open(self, offset: int = 0, count: int | None = None) -> InputStream:  # type: ignore
    """Open the file for reading.

    :param offset: where to start reading from
    :param count:  how many bytes to read; if None, read until the end of
                   the file (``count=0`` reads nothing, as in MemorySource)
    :return:       a stream bounded to the requested byte range
    """
    stream = await open_file(self._file_name, mode="rb")
    try:
        await stream.seek(offset)
        if count is None:
            # Only the bytes after ``offset`` remain, not the whole file.
            count = max(await self.size() - offset, 0)
    except BaseException:
        # Do not leak the handle if seeking or stat-ing fails.
        with contextlib.suppress(Exception):
            await stream.close()
        raise
    return BoundedStream(stream, count)

Open the file for reading.

async def size(self) -> int:
async def size(self) -> int:
    """Return the file's size in bytes, as reported by a stat call."""
    stat_result = await file_stat(self._file_name)
    return stat_result.st_size

Return the size of the file.

async def content_type(self) -> str:
 94    async def content_type(self) -> str:
 95        """Return the content type of the file."""
 96        f = await open_file(self._file_name, mode="rb")
 97        try:
 98            data = await f.read(2048)
 99            return magic.from_buffer(data, mime=True)
100        finally:
101            await f.close()

Return the content type of the file.

async def close(self) -> None:
async def close(self) -> None:
    """Nothing to close: every ``open`` call creates its own file handle."""

Close the data source.

class StdInDataSource(nrp_cmd.async_client.streams.DataSource):
class StdInDataSource(DataSource):
    """A data source that reads data from standard input.

    Standard input can be consumed only once and cannot be seeked, so the
    source may be opened a single time, from offset 0, without a byte count.
    """

    def __init__(self) -> None:
        """Initialize the data source; it has not been opened yet."""
        super().__init__()
        self._opened = False

    async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
        """Open standard input for reading.

        :param offset: must be 0, because standard input cannot be seeked
        :param count:  must be None, because bounded reads are not supported
        :return:       a reader over standard input
        :raises RuntimeError: if the source has already been opened
        :raises ValueError: if ``offset`` or ``count`` request a byte range
        """
        if self._opened:
            raise RuntimeError("Cannot open the same data source multiple times.")
        # Validate before marking the source as opened, so that a rejected
        # call does not make the source unusable for a later valid call.
        if count is not None:
            raise ValueError("Cannot read a bounded stream from standard input.")
        if offset != 0:
            raise ValueError("Cannot seek in standard input.")
        self._opened = True
        # "/dev/stdin" is the device path of the process's standard input;
        # the previously used "/sys/stdin" does not exist.
        ret = await open_file(Path("/dev/stdin"), mode="rb")
        return ret

    async def size(self) -> int:
        """Return the size of the data - in this case -1 as unknown."""
        return -1

    async def content_type(self) -> str:
        """Return the content type of the data."""
        return "application/octet-stream"

    async def close(self) -> None:
        """Close the data source."""
        pass

A data source that reads data from standard input.

async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
    """Open standard input for reading.

    :param offset: must be 0, because standard input cannot be seeked
    :param count:  must be None, because bounded reads are not supported
    :return:       a reader over standard input
    :raises RuntimeError: if the source has already been opened
    :raises ValueError: if ``offset`` or ``count`` request a byte range
    """
    if self._opened:
        raise RuntimeError("Cannot open the same data source multiple times.")
    # Validate before marking the source as opened, so that a rejected
    # call does not make the source unusable for a later valid call.
    if count is not None:
        raise ValueError("Cannot read a bounded stream from standard input.")
    if offset != 0:
        raise ValueError("Cannot seek in standard input.")
    self._opened = True
    # "/dev/stdin" is the device path of the process's standard input;
    # the previously used "/sys/stdin" does not exist.
    ret = await open_file(Path("/dev/stdin"), mode="rb")
    return ret

Open the data source for reading.

async def size(self) -> int:
async def size(self) -> int:
    """Report -1: the amount of data on standard input is not known in advance."""
    unknown_size = -1
    return unknown_size

Return the size of the data: always -1, because the size of standard input is not known in advance.

async def content_type(self) -> str:
async def content_type(self) -> str:
    """Return the generic binary MIME type; standard input is not sniffed."""
    generic_binary = "application/octet-stream"
    return generic_binary

Return the content type of the data.

async def close(self) -> None:
async def close(self) -> None:
    """Nothing to release here; the stream returned by ``open`` is closed separately."""

Close the data source.