Coverage for src/epublib/source.py: 100%

64 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-07 12:41 -0300

1import zipfile 

2from datetime import datetime 

3from pathlib import Path 

4from typing import IO, Literal, Protocol, override 

5from zipfile import ZipInfo 

6 

7from epublib.exceptions import EPUBError 

8 

9 

10def zip_info_now(): 

11 now = datetime.now() 

12 

13 if now.year > 2107: 

14 print("to aqui") 

15 now = now.replace(year=2107, month=12, day=31) 

16 

17 return ( 

18 now.year, 

19 now.month, 

20 now.day, 

21 now.hour, 

22 now.minute, 

23 now.second, 

24 ) 

25 

26 

27class SourceProtocol(Protocol): 

28 """Protocol for a source of EPUB data.""" 

29 

30 def getinfo(self, name: str) -> ZipInfo: ... 

31 def open( 

32 self, 

33 name: str | ZipInfo, 

34 mode: Literal["r", "w"] = "r", 

35 pwd: bytes | None = None, 

36 *, 

37 force_zip64: bool = False, 

38 ) -> IO[bytes]: ... 

39 def infolist(self) -> list[ZipInfo]: ... 

40 def close(self) -> None: ... 

41 @property 

42 def closed(self) -> bool: ... 

43 

44 

45class SinkProtocol(Protocol): 

46 """Protocol for a sink of EPUB data.""" 

47 

48 def writestr( 

49 self, 

50 zinfo_or_arcname: str | ZipInfo, 

51 data: bytes | str, 

52 compress_type: int | None = None, 

53 compresslevel: int | None = None, 

54 ) -> None: ... 

55 

56 

class DirectorySource(SourceProtocol):
    """An EPUB source that reads the book from a directory on the filesystem (an 'unzipped' EPUB)."""

    def __init__(self, path: str | Path):
        """Bind the source to the directory at *path*.

        Raises:
            EPUBError: If *path* is not an existing directory.
        """
        self.path: Path = Path(path)
        if not self.path.is_dir():
            raise EPUBError(f"'{path}' is not a directory")
        self._closed: bool = False

    @override
    def infolist(self) -> list[ZipInfo]:
        """Return a ``ZipInfo`` for every regular file below the directory.

        Only entries for which ``is_file()`` is true are listed, the same
        test ``getinfo`` applies, so dangling symlinks and special files
        are skipped rather than failing inside ``ZipInfo.from_file``.
        Names are '/'-separated paths relative to the source directory,
        returned in sorted order so the result does not depend on the
        order in which the filesystem yields entries.
        """
        arcnames = sorted(
            entry.relative_to(self.path).as_posix()
            for entry in self.path.rglob("*")
            if entry.is_file()
        )
        return [self._to_zipinfo(arcname) for arcname in arcnames]

    @override
    def getinfo(self, name: str | Path) -> ZipInfo:
        """Return the ``ZipInfo`` for the file *name*, relative to the directory.

        Raises:
            KeyError: If *name* does not refer to an existing file.
        """
        target = self.path / name
        if target.is_file():
            return self._to_zipinfo(str(name))
        raise KeyError(f"There is no item named '{name}' in the folder")

    def _to_zipinfo(self, name: str) -> ZipInfo:
        """Build a ``ZipInfo`` from the file *name* inside the directory."""
        return ZipInfo.from_file(
            self.path / name,
            arcname=name,
            # Let zipfile clamp out-of-range mtimes instead of raising.
            strict_timestamps=False,
        )

    @override
    def open(
        self,
        name: str | Path | ZipInfo,
        mode: Literal["r", "w"] = "r",
        pwd: bytes | None = None,
        *,
        force_zip64: bool = False,
    ) -> IO[bytes]:
        """Open the file *name* below the directory in binary mode.

        *name* may be a relative path or a ``ZipInfo``, whose ``filename``
        is used. *pwd* and *force_zip64* are accepted to match the
        protocol signature and are not used.
        """
        if isinstance(name, ZipInfo):
            filename = self.path / name.filename
        else:
            filename = self.path / name

        return open(filename, mode + "b")

    @override
    def close(self) -> None:
        """Mark the source as closed."""
        self._closed = True

    @property
    @override
    def closed(self) -> bool:
        """Whether ``close()`` has been called."""
        return self._closed

112 

113 

class DirectorySink(SinkProtocol):
    """
    An EPUB sink that writes the book to a directory on the filesystem
    (as an 'unzipped' EPUB).
    """

    def __init__(self, path: str | Path) -> None:
        """Bind the sink to the directory at *path*.

        Raises:
            EPUBError: If *path* is not an existing directory.
        """
        self.path: Path = Path(path)
        if not self.path.is_dir():
            raise EPUBError(f"'{path}' is not a directory")

    @override
    def writestr(
        self,
        zinfo_or_arcname: str | Path | ZipInfo,
        data: bytes | str,
        compress_type: int | None = None,
        compresslevel: int | None = None,
    ) -> None:
        """Write *data* to a file below the sink directory.

        The target is ``zinfo_or_arcname`` (or its ``filename`` when it is
        a ``ZipInfo``) joined onto the sink directory; missing parent
        directories are created. ``str`` data is encoded as UTF-8 and
        written as bytes, so the output does not depend on the locale
        encoding or on platform newline translation.

        *compress_type* and *compresslevel* are accepted to match the
        protocol signature and are not used.
        """
        if isinstance(zinfo_or_arcname, ZipInfo):
            filename = zinfo_or_arcname.filename
        else:
            filename = zinfo_or_arcname

        # NOTE(review): the name is joined as-is, with no check that the
        # result stays inside self.path -- confirm callers validate names.
        full_path = self.path / filename
        full_path.parent.mkdir(exist_ok=True, parents=True)

        payload = data.encode("utf-8") if isinstance(data, str) else data
        with open(full_path, "wb") as f:
            __ = f.write(payload)

146 

147 

class ZipFile(zipfile.ZipFile):
    """``zipfile.ZipFile`` with an added ``closed`` property."""

    @property
    def closed(self) -> bool:
        """True when there is no underlying file object or it is closed."""
        handle = self.fp
        if handle is None:
            return True
        return handle.closed