Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter/jumpstarter/streams/encoding.py: 53%
58 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 17:10 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 17:10 +0200
1import bz2
2import lzma
3import zlib
4from dataclasses import dataclass
5from enum import StrEnum
6from typing import Any, Callable, Mapping
8from anyio import ClosedResourceError, EndOfStream
9from anyio.abc import AnyByteStream, ObjectStream
12class Compression(StrEnum):
13 GZIP = "gzip"
14 XZ = "xz"
15 BZ2 = "bz2"
18@dataclass(kw_only=True)
19class CompressedStream(ObjectStream[bytes]):
20 stream: AnyByteStream
21 decompressor: Any
22 compressor: Any
24 async def send(self, item: bytes) -> None:
25 if self.compressor is None:
26 raise ClosedResourceError
28 await self.stream.send(self.compressor.compress(item))
30 async def receive(self) -> bytes:
31 return self.decompressor.decompress(await self.stream.receive())
33 async def send_eof(self) -> None:
34 await self._flush()
35 await self.stream.send_eof()
37 async def aclose(self) -> None:
38 await self._flush()
39 await self.stream.aclose()
41 async def _flush(self) -> None:
42 if self.compressor is None:
43 return
45 await self.stream.send(self.compressor.flush())
46 self.compressor = None
48 @property
49 def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
50 return self.stream.extra_attributes
53@dataclass(kw_only=True)
54class ZlibCompressedStream(CompressedStream):
55 async def receive(self) -> bytes:
56 if self.decompressor is None:
57 raise EndOfStream
59 try:
60 return self.decompressor.decompress(await self.stream.receive())
61 except EndOfStream:
62 data = self.decompressor.flush()
63 self.decompressor = None
64 return data
67def compress_stream(stream: AnyByteStream, compression: Compression | None) -> AnyByteStream:
68 match compression:
69 case None:
70 return stream
71 case Compression.GZIP:
72 return ZlibCompressedStream(
73 stream=stream,
74 compressor=zlib.compressobj(wbits=31),
75 decompressor=zlib.decompressobj(wbits=47),
76 )
77 case Compression.XZ:
78 return CompressedStream(
79 stream=stream,
80 compressor=lzma.LZMACompressor(),
81 decompressor=lzma.LZMADecompressor(),
82 )
83 case Compression.BZ2:
84 return CompressedStream(
85 stream=stream,
86 compressor=bz2.BZ2Compressor(),
87 decompressor=bz2.BZ2Decompressor(),
88 )