nrp_cmd.async_client.streams
Data sources and sinks.
1# 2# Copyright (C) 2024 CESNET z.s.p.o. 3# 4# invenio-nrp is free software; you can redistribute it and/or 5# modify it under the terms of the MIT License; see LICENSE file for more 6# details. 7# 8"""Data sources and sinks.""" 9 10from .base import DataSink, DataSource, InputStream, OutputStream, SinkState 11from .file import FileSink, FileSource 12from .memory import MemorySink, MemorySource 13from .stdin import StdInDataSource 14 15__all__ = ( 16 "DataSink", 17 "DataSource", 18 "SinkState", 19 "InputStream", 20 "OutputStream", 21 "MemorySink", 22 "MemorySource", 23 "FileSink", 24 "FileSource", 25 "StdInDataSource", 26)
class DataSink(Protocol):
    """Structural interface every data sink has to provide."""

    async def allocate(self, size: int) -> None:
        """Reserve room for the data that will be written.

        :param size: Total size of the sink in bytes.
        """

    async def open_chunk(self, offset: int = 0) -> OutputStream:
        """Return a writer positioned at ``offset``.

        :param offset: Distance in bytes from the beginning of the sink.
        :return: A writer for the sink.
        """

    async def close(self) -> None:
        """Close the sink together with any writers that are still open."""

    @property
    def state(self) -> SinkState:
        """Report the state the sink is currently in."""
Protocol for data sinks.
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` that rejects protocols or installs the real ``__init__``.

    Raises ``TypeError`` when the instance's class is flagged as a protocol.
    Otherwise it looks through the MRO for the first ``__init__`` that is not
    this placeholder, stores it on the class and calls it with the given
    arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Delegate to whichever `__init__` was just installed on the class.
    cls.__init__(self, *args, **kwargs)
async def allocate(self, size: int) -> None:
    """Reserve room for the data that will be written.

    :param size: Total size of the sink in bytes.
    """
Allocate space for the sink.
Parameters
- size: The size of the sink in bytes.
async def open_chunk(self, offset: int = 0) -> OutputStream:
    """Return a writer positioned at ``offset``.

    :param offset: Distance in bytes from the beginning of the sink.
    :return: A writer for the sink.
    """
Get a writer for the sink, starting at the given offset.
Parameters
- offset: The offset in bytes from the start of the sink.
Returns
A writer for the sink.
class DataSource(Protocol):
    """Structural interface every data source has to provide."""

    # Defaults to False; the in-memory and file sources in this package, which
    # honour ``offset``/``count`` in ``open``, override it with True.
    has_range_support: bool = False

    async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
        """Start reading from the data source.

        :param offset: position at which reading begins
        :param count: number of bytes to read; ``None`` means read to the end
        :return: a reader for the data source
        """

    async def size(self) -> int:
        """Report how many bytes the data source holds."""

    async def content_type(self) -> str:
        """Report the content type of the data source."""

    async def close(self) -> None:
        """Release the data source."""
Protocol for data sources.
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` that rejects protocols or installs the real ``__init__``.

    Raises ``TypeError`` when the instance's class is flagged as a protocol.
    Otherwise it looks through the MRO for the first ``__init__`` that is not
    this placeholder, stores it on the class and calls it with the given
    arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Delegate to whichever `__init__` was just installed on the class.
    cls.__init__(self, *args, **kwargs)
async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
    """Start reading from the data source.

    :param offset: position at which reading begins
    :param count: number of bytes to read; ``None`` means read to the end
    :return: a reader for the data source
    """
Open the data source for reading.
Parameters
- offset: where to start reading from
- count: how many bytes to read, if None, read until the end
Returns
a reader for the data source
42class SinkState(StrEnum): 43 """State of the sink.""" 44 45 NOT_ALLOCATED = auto() 46 """Sink's space has not been allocated yet.""" 47 ALLOCATED = auto() 48 """Sink's space has been allocated and reserved on the filesystem/memory.""" 49 CLOSED = auto() 50 """Sink has been closed."""
State of the sink.
Sink's space has been allocated and reserved on the filesystem/memory.
class InputStream(Protocol):
    """Structural interface of an asynchronous byte reader."""

    async def read(self, n: int = -1) -> bytes:
        """Read bytes from the stream; ``n`` defaults to -1."""

    def __aiter__(self) -> AsyncIterator[bytes]:
        """Return an asynchronous iterator yielding ``bytes`` objects."""

    async def close(self) -> None:
        """Close the reader."""

    async def __len__(self) -> int:
        """Return an integer length.

        Declared as a coroutine, so it has to be awaited explicitly; the
        builtin ``len()`` cannot consume it.
        """
Protocol for data readers.
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` that rejects protocols or installs the real ``__init__``.

    Raises ``TypeError`` when the instance's class is flagged as a protocol.
    Otherwise it looks through the MRO for the first ``__init__`` that is not
    this placeholder, stores it on the class and calls it with the given
    arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Delegate to whichever `__init__` was just installed on the class.
    cls.__init__(self, *args, **kwargs)
class OutputStream(Protocol):
    """Structural interface of an asynchronous byte writer."""

    async def write(self, data: bytes) -> int:
        """Write ``data`` to the stream and return an integer result."""

    async def close(self) -> None:
        """Close the writer."""
Protocol for data writers.
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` that rejects protocols or installs the real ``__init__``.

    Raises ``TypeError`` when the instance's class is flagged as a protocol.
    Otherwise it looks through the MRO for the first ``__init__`` that is not
    this placeholder, stores it on the class and calls it with the given
    arguments.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Delegate to whichever `__init__` was just installed on the class.
    cls.__init__(self, *args, **kwargs)
class MemorySink(DataSink):
    """Data sink that keeps everything written to it in a memory buffer."""

    def __init__(self):
        """Create a sink that has no buffer yet."""
        self._buffer = None
        self._state = SinkState.NOT_ALLOCATED

    async def allocate(self, size: int) -> None:
        """Create a zero-filled buffer of ``size`` bytes and mark the sink allocated."""
        self._buffer, self._state = bytearray(size), SinkState.ALLOCATED

    async def open_chunk(self, offset: int = 0) -> OutputStream:  # type: ignore
        """Return a writer over the buffer that starts at ``offset``.

        :raises RuntimeError: if the sink is not in the allocated state
        """
        if self._state == SinkState.ALLOCATED:
            return MemoryWriter(self._buffer, offset)  # noqa

        raise RuntimeError("Sink not allocated")

    async def close(self) -> None:
        """Mark the sink as closed; the buffer itself is left untouched."""
        self._state = SinkState.CLOSED

    @property
    def state(self) -> SinkState:
        """Report the state the sink is currently in."""
        return self._state
Implementation of a sink that writes data to memory.
def __init__(self):
    """Create a sink that has no buffer yet."""
    self._buffer = None
    self._state = SinkState.NOT_ALLOCATED
Initialize the sink.
async def allocate(self, size: int) -> None:
    """Create a zero-filled buffer of ``size`` bytes and mark the sink allocated."""
    self._buffer, self._state = bytearray(size), SinkState.ALLOCATED
Allocate space for the sink.
async def open_chunk(self, offset: int = 0) -> OutputStream:  # type: ignore
    """Return a writer over the buffer that starts at ``offset``.

    :raises RuntimeError: if the sink is not in the allocated state
    """
    if self._state == SinkState.ALLOCATED:
        return MemoryWriter(self._buffer, offset)  # noqa

    raise RuntimeError("Sink not allocated")
Open a chunk of the sink for writing.
class MemorySource(DataSource):
    """Data source serving bytes that are already held in memory."""

    has_range_support = True

    def __init__(self, data: bytes, content_type: str):
        """Remember the payload and its content type.

        :param data: the bytes to serve
        :param content_type: the content type reported for the bytes
        """
        self._content_type = content_type
        self._data = data

    async def open(
        self, offset: int = 0, count: int | None = None
    ) -> InputStream:
        """Return a reader over the requested slice of the data.

        :param offset: index of the first byte to serve
        :param count: number of bytes to serve; ``None`` serves everything after ``offset``
        """
        # A ``None`` upper bound makes the slice run to the end of the data.
        end = None if count is None else offset + count
        return MemoryReader(self._data[offset:end])

    async def size(self) -> int:
        """Report the number of bytes held."""
        return len(self._data)

    async def content_type(self) -> str:
        """Report the content type given at construction."""
        return self._content_type

    async def close(self) -> None:
        """Do nothing; there is no resource to release."""
A data source that reads data from memory.
def __init__(self, data: bytes, content_type: str):
    """Remember the payload and its content type.

    :param data: the bytes to serve
    :param content_type: the content type reported for the bytes
    """
    self._content_type = content_type
    self._data = data
Initialize the data source.
Parameters
- data: the data to be read
- content_type: the content type of the data
async def open(
    self, offset: int = 0, count: int | None = None
) -> InputStream:
    """Return a reader over the requested slice of the data.

    :param offset: index of the first byte to serve
    :param count: number of bytes to serve; ``None`` serves everything after ``offset``
    """
    # A ``None`` upper bound makes the slice run to the end of the data.
    end = None if count is None else offset + count
    return MemoryReader(self._data[offset:end])
Open the data source for reading.
class FileSink(DataSink):
    """Data sink that stores the written data in a file."""

    def __init__(self, fpath: Path):
        """Set up a sink for the given path without touching the filesystem.

        :param fpath: Path of the file the data will be written to.
        """
        self._file: FileOutputStream | None = None
        self._state = SinkState.NOT_ALLOCATED
        self._fpath = fpath

    async def allocate(self, size: int) -> None:
        """Open the target file for writing and resize it to ``size`` bytes."""
        handle = await open_file(self._fpath, mode="wb")
        self._file = handle
        await handle.truncate(size)
        self._state = SinkState.ALLOCATED

    async def open_chunk(self, offset: int = 0) -> OutputStream:  # type: ignore
        """Return a separate file handle positioned at ``offset``.

        :raises RuntimeError: if the sink is not in the allocated state
        """
        if self._state == SinkState.ALLOCATED:
            writer = await open_file(self._fpath, mode="r+b")
            await writer.seek(offset)
            return writer

        raise RuntimeError("Sink not allocated")

    async def close(self) -> None:
        """Close the handle opened by ``allocate`` (errors ignored) and mark the sink closed."""
        handle = self._file
        if handle is not None:
            # Best effort: a failing close must not prevent the state change.
            with contextlib.suppress(Exception):
                await handle.close()
            self._file = None

        self._state = SinkState.CLOSED

    @property
    def state(self) -> SinkState:
        """Report the state the sink is currently in."""
        return self._state

    def __repr__(self):
        """Describe the sink by class name, path and state."""
        return f"<{self.__class__.__name__} {self._fpath} {self._state}>"
Implementation of a sink that writes data to filesystem.
def __init__(self, fpath: Path):
    """Set up a sink for the given path without touching the filesystem.

    :param fpath: Path of the file the data will be written to.
    """
    self._file: FileOutputStream | None = None
    self._state = SinkState.NOT_ALLOCATED
    self._fpath = fpath
Initialize the sink.
Parameters
- fpath: The path to the file where the data will be written.
async def allocate(self, size: int) -> None:
    """Open the target file for writing and resize it to ``size`` bytes."""
    handle = await open_file(self._fpath, mode="wb")
    self._file = handle
    await handle.truncate(size)
    self._state = SinkState.ALLOCATED
Allocate space for the sink.
async def open_chunk(self, offset: int = 0) -> OutputStream:  # type: ignore
    """Return a separate file handle positioned at ``offset``.

    :raises RuntimeError: if the sink is not in the allocated state
    """
    if self._state == SinkState.ALLOCATED:
        writer = await open_file(self._fpath, mode="r+b")
        await writer.seek(offset)
        return writer

    raise RuntimeError("Sink not allocated")
Open a chunk of the sink for writing.
class FileSource(DataSource):
    """A data source that reads data from a file."""

    has_range_support = True

    def __init__(self, file_name: Path | str):
        """Initialize the data source.

        :param file_name: The name of the file to read from, must exist on the filesystem
        """
        if isinstance(file_name, str):
            file_name = Path(file_name)
        self._file_name = file_name

    async def open(self, offset: int = 0, count: int | None = None) -> InputStream:  # type: ignore
        """Open the file for reading.

        :param offset: byte position at which reading starts
        :param count: number of bytes to read; ``None`` reads up to the end of
            the file, ``0`` yields an empty stream
        :return: a stream over the opened file, bounded to the requested range
        """
        stream = await open_file(self._file_name, mode="rb")
        try:
            await stream.seek(offset)
            if count is None:
                # Only the bytes after ``offset`` remain to be read.
                count = max(await self.size() - offset, 0)
        except BaseException:
            # Do not leak the handle when seeking or stat-ing fails
            # (BaseException so that task cancellation is covered too).
            await stream.close()
            raise
        return BoundedStream(stream, count)

    async def size(self) -> int:
        """Return the size of the file as reported by ``file_stat``."""
        return (await file_stat(self._file_name)).st_size

    async def content_type(self) -> str:
        """Return the MIME type detected from the first 2048 bytes of the file."""
        f = await open_file(self._file_name, mode="rb")
        try:
            data = await f.read(2048)
            return magic.from_buffer(data, mime=True)
        finally:
            await f.close()

    async def close(self) -> None:
        """Close the data source; nothing is held open between calls."""
        pass
A data source that reads data from a file.
def __init__(self, file_name: Path | str):
    """Remember which file to read, converting a string name to a ``Path``.

    :param file_name: The name of the file to read from, must exist on the filesystem
    """
    self._file_name = Path(file_name) if isinstance(file_name, str) else file_name
Initialize the data source.
Parameters
- file_name: The name of the file to read from, must exist on the filesystem
async def open(self, offset: int = 0, count: int | None = None) -> InputStream:  # type: ignore
    """Open the file for reading.

    :param offset: byte position at which reading starts
    :param count: number of bytes to read; ``None`` reads up to the end of
        the file, ``0`` yields an empty stream
    :return: a stream over the opened file, bounded to the requested range
    """
    stream = await open_file(self._file_name, mode="rb")
    try:
        await stream.seek(offset)
        if count is None:
            # Only the bytes after ``offset`` remain to be read.
            count = max(await self.size() - offset, 0)
    except BaseException:
        # Do not leak the handle when seeking or stat-ing fails
        # (BaseException so that task cancellation is covered too).
        await stream.close()
        raise
    return BoundedStream(stream, count)
Open the file for reading.
async def size(self) -> int:
    """Report the file size taken from ``file_stat``."""
    stat_result = await file_stat(self._file_name)
    return stat_result.st_size
Return the size of the file.
async def content_type(self) -> str:
    """Detect the MIME type from the first 2048 bytes of the file."""
    probe = await open_file(self._file_name, mode="rb")
    try:
        return magic.from_buffer(await probe.read(2048), mime=True)
    finally:
        # The probing handle is always released, even when detection fails.
        await probe.close()
Return the content type of the file.
class StdInDataSource(DataSource):
    """A data source that reads data from standard input."""

    def __init__(self) -> None:
        """Initialize the data source as not yet opened."""
        super().__init__()
        self._opened = False

    async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
        """Open the data source for reading.

        :param offset: must be 0
        :param count: must be ``None``
        :return: the stream returned by ``open_file``
        :raises RuntimeError: if the source has already been opened
        :raises ValueError: if ``count`` is given or ``offset`` is not 0
        """
        if self._opened:
            raise RuntimeError("Cannot open the same data source multiple times.")
        if count is not None:
            raise ValueError("Cannot read a bounded stream from standard input.")
        if offset != 0:
            raise ValueError("Cannot seek in standard input.")

        # Flag the source only once the arguments have been accepted, so a
        # rejected call does not block a later valid one.
        self._opened = True
        try:
            # NOTE(review): "/sys/stdin" is not a standard device path (POSIX
            # systems expose /dev/stdin) - confirm open_file handles it.
            return await open_file(Path("/sys/stdin"), mode="rb")
        except BaseException:
            # Nothing was opened, so allow another attempt.
            self._opened = False
            raise

    async def size(self) -> int:
        """Return the size of the data - in this case -1 as unknown."""
        return -1

    async def content_type(self) -> str:
        """Return the content type of the data, always ``application/octet-stream``."""
        return "application/octet-stream"

    async def close(self) -> None:
        """Close the data source; this implementation does nothing."""
        pass
A data source that reads data from standard input.
async def open(self, offset: int = 0, count: int | None = None) -> InputStream:
    """Open the data source for reading.

    :param offset: must be 0
    :param count: must be ``None``
    :return: the stream returned by ``open_file``
    :raises RuntimeError: if the source has already been opened
    :raises ValueError: if ``count`` is given or ``offset`` is not 0
    """
    if self._opened:
        raise RuntimeError("Cannot open the same data source multiple times.")
    if count is not None:
        raise ValueError("Cannot read a bounded stream from standard input.")
    if offset != 0:
        raise ValueError("Cannot seek in standard input.")

    # Flag the source only once the arguments have been accepted, so a
    # rejected call does not block a later valid one.
    self._opened = True
    try:
        # NOTE(review): "/sys/stdin" is not a standard device path (POSIX
        # systems expose /dev/stdin) - confirm open_file handles it.
        return await open_file(Path("/sys/stdin"), mode="rb")
    except BaseException:
        # Nothing was opened, so allow another attempt.
        self._opened = False
        raise
Open the data source for reading.
async def size(self) -> int:
    """Report -1, the marker this source uses for an unknown size."""
    unknown_size = -1
    return unknown_size
Return the size of the data; it is unknown for standard input, so -1 is returned.