Coverage for /Users/ajo/work/jumpstarter/jumpstarter/packages/jumpstarter/jumpstarter/streams/encoding.py: 53%

58 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 17:10 +0200

1import bz2 

2import lzma 

3import zlib 

4from dataclasses import dataclass 

5from enum import StrEnum 

6from typing import Any, Callable, Mapping 

7 

8from anyio import ClosedResourceError, EndOfStream 

9from anyio.abc import AnyByteStream, ObjectStream 

10 

11 

12class Compression(StrEnum): 

13 GZIP = "gzip" 

14 XZ = "xz" 

15 BZ2 = "bz2" 

16 

17 

@dataclass(kw_only=True)
class CompressedStream(ObjectStream[bytes]):
    """Object stream of ``bytes`` that compresses on send and decompresses on receive.

    Outgoing items are passed through ``compressor.compress`` before being
    written to ``stream``; incoming chunks read from ``stream`` are passed
    through ``decompressor.decompress``.  The compressor is flushed exactly
    once, by whichever of ``send_eof()`` or ``aclose()`` runs first.
    """

    # Underlying transport that carries the compressed bytes.
    stream: AnyByteStream
    # Object exposing ``decompress(data)``; applied to every received chunk.
    decompressor: Any
    # Object exposing ``compress(data)`` and ``flush()``; replaced by ``None``
    # once it has been flushed, which marks the send side as finished.
    compressor: Any

    async def send(self, item: bytes) -> None:
        """Compress *item* and write the result to the underlying stream.

        Raises:
            ClosedResourceError: if the compressor has already been flushed
                by ``send_eof()`` or ``aclose()``.
        """
        if self.compressor is None:
            raise ClosedResourceError

        await self.stream.send(self.compressor.compress(item))

    async def receive(self) -> bytes:
        """Read one chunk from the underlying stream and return it decompressed.

        Any exception raised by the underlying ``receive()`` (such as
        ``EndOfStream``) propagates unchanged.
        """
        return self.decompressor.decompress(await self.stream.receive())

    async def send_eof(self) -> None:
        """Flush the compressor, then signal end-of-file on the underlying stream."""
        await self._flush()
        await self.stream.send_eof()

    async def aclose(self) -> None:
        """Flush any pending compressed data and close the underlying stream.

        The underlying stream is closed even when the final flush fails (for
        example because the peer is already gone), so the transport is never
        leaked; the flush error still propagates to the caller.
        """
        try:
            await self._flush()
        finally:
            await self.stream.aclose()

    async def _flush(self) -> None:
        """Finalize the compressor and send its trailing bytes; no-op if already done."""
        compressor = self.compressor
        if compressor is None:
            return

        # Retire the compressor *before* sending: if the send below fails, a
        # later send_eof()/aclose() must not call flush() a second time on a
        # compressor that has already been finalized.
        self.compressor = None
        await self.stream.send(compressor.flush())

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """Expose the typed attributes of the underlying stream unchanged."""
        return self.stream.extra_attributes

51 

52 

@dataclass(kw_only=True)
class ZlibCompressedStream(CompressedStream):
    """``CompressedStream`` variant that flushes its decompressor at end of input.

    When the underlying stream reports ``EndOfStream``, whatever the
    decompressor's ``flush()`` returns is handed out as one last item and the
    decompressor is dropped; every later ``receive()`` raises ``EndOfStream``.
    """

    async def receive(self) -> bytes:
        """Return the next decompressed chunk.

        Raises:
            EndOfStream: once the decompressor has been flushed and dropped.
        """
        if self.decompressor is None:
            raise EndOfStream

        # Bind the method before awaiting, so the decompressor in use is the
        # one that was current when this call started.
        inflate = self.decompressor.decompress
        try:
            chunk = await self.stream.receive()
        except EndOfStream:
            # Input is exhausted: emit the flushed remainder as the final item.
            tail = self.decompressor.flush()
            self.decompressor = None
            return tail

        return inflate(chunk)

65 

66 

67def compress_stream(stream: AnyByteStream, compression: Compression | None) -> AnyByteStream: 

68 match compression: 

69 case None: 

70 return stream 

71 case Compression.GZIP: 

72 return ZlibCompressedStream( 

73 stream=stream, 

74 compressor=zlib.compressobj(wbits=31), 

75 decompressor=zlib.decompressobj(wbits=47), 

76 ) 

77 case Compression.XZ: 

78 return CompressedStream( 

79 stream=stream, 

80 compressor=lzma.LZMACompressor(), 

81 decompressor=lzma.LZMADecompressor(), 

82 ) 

83 case Compression.BZ2: 

84 return CompressedStream( 

85 stream=stream, 

86 compressor=bz2.BZ2Compressor(), 

87 decompressor=bz2.BZ2Decompressor(), 

88 )