Coverage for src/epublib/source.py: 100%
64 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 12:41 -0300
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-07 12:41 -0300
1import zipfile
2from datetime import datetime
3from pathlib import Path
4from typing import IO, Literal, Protocol, override
5from zipfile import ZipInfo
7from epublib.exceptions import EPUBError
def zip_info_now() -> tuple[int, int, int, int, int, int]:
    """Return the current local time as a ``ZipInfo.date_time`` style tuple.

    The ZIP format can only represent years from 1980 through 2107, so a
    clock reading outside that window is clamped to the nearest
    representable date (the time of day is kept as-is).

    Returns:
        A ``(year, month, day, hour, minute, second)`` tuple.
    """
    now = datetime.now()

    if now.year > 2107:
        # Upper bound of the ZIP timestamp range: pin to the last valid day.
        now = now.replace(year=2107, month=12, day=31)
    elif now.year < 1980:
        # Lower bound: ZipInfo rejects timestamps before 1980 (e.g. a
        # machine whose clock was reset to the Unix epoch).
        now = now.replace(year=1980, month=1, day=1)

    return (
        now.year,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
27class SourceProtocol(Protocol):
28 """Protocol for a source of EPUB data."""
30 def getinfo(self, name: str) -> ZipInfo: ...
31 def open(
32 self,
33 name: str | ZipInfo,
34 mode: Literal["r", "w"] = "r",
35 pwd: bytes | None = None,
36 *,
37 force_zip64: bool = False,
38 ) -> IO[bytes]: ...
39 def infolist(self) -> list[ZipInfo]: ...
40 def close(self) -> None: ...
41 @property
42 def closed(self) -> bool: ...
45class SinkProtocol(Protocol):
46 """Protocol for a sink of EPUB data."""
48 def writestr(
49 self,
50 zinfo_or_arcname: str | ZipInfo,
51 data: bytes | str,
52 compress_type: int | None = None,
53 compresslevel: int | None = None,
54 ) -> None: ...
class DirectorySource(SourceProtocol):
    """Read an EPUB straight from a directory tree (an 'unzipped' EPUB).

    Entries are addressed by their path relative to the root directory and
    are described with ``ZipInfo`` objects built from the files on disk.
    """

    def __init__(self, path: str | Path):
        """Bind the source to the directory at *path*.

        Raises:
            EPUBError: If *path* is not an existing directory.
        """
        self.path: Path = Path(path)
        if not self.path.is_dir():
            raise EPUBError(f"'{path}' is not a directory")
        self._closed: bool = False

    @override
    def infolist(self) -> list[ZipInfo]:
        """Return a ``ZipInfo`` for every non-directory path under the root."""
        root = self.path
        files = (entry for entry in root.rglob("*") if not entry.is_dir())
        return [self._to_zipinfo(str(file.relative_to(root))) for file in files]

    @override
    def getinfo(self, name: str | Path) -> ZipInfo:
        """Return the ``ZipInfo`` for the file *name*, relative to the root.

        Raises:
            KeyError: If *name* does not refer to a regular file.
        """
        if not (self.path / name).is_file():
            raise KeyError(f"There is no item named '{name}' in the folder")
        return self._to_zipinfo(str(name))

    def _to_zipinfo(self, name: str) -> ZipInfo:
        """Build a ``ZipInfo`` for the file *name*, using *name* as arcname."""
        on_disk = self.path / name
        # strict_timestamps=False: do not fail on files whose mtime falls
        # outside the range the ZIP format can represent.
        return ZipInfo.from_file(on_disk, arcname=name, strict_timestamps=False)

    @override
    def open(
        self,
        name: str | Path | ZipInfo,
        mode: Literal["r", "w"] = "r",
        pwd: bytes | None = None,
        *,
        force_zip64: bool = False,
    ) -> IO[bytes]:
        """Open the file for *name* in binary mode and return the handle.

        *pwd* and *force_zip64* are accepted for signature compatibility
        and are not used.
        """
        relative = name.filename if isinstance(name, ZipInfo) else name
        return open(self.path / relative, mode + "b")

    @override
    def close(self) -> None:
        """Mark the source as closed."""
        self._closed = True

    @property
    @override
    def closed(self) -> bool:
        """Whether ``close()`` has been called."""
        return self._closed
class DirectorySink(SinkProtocol):
    """
    An EPUB sink that writes the book to a directory on the filesystem
    (as an 'unzipped' EPUB).
    """

    def __init__(self, path: str | Path) -> None:
        """Bind the sink to the directory at *path*.

        Raises:
            EPUBError: If *path* is not an existing directory.
        """
        self.path: Path = Path(path)
        if not self.path.is_dir():
            raise EPUBError(f"'{path}' is not a directory")

    @override
    def writestr(
        self,
        zinfo_or_arcname: str | Path | ZipInfo,
        data: bytes | str,
        compress_type: int | None = None,
        compresslevel: int | None = None,
    ) -> None:
        """Write *data* to the file named by *zinfo_or_arcname* under the root.

        Missing parent directories are created. ``str`` data is encoded as
        UTF-8, matching ``zipfile.ZipFile.writestr``, so the bytes on disk do
        not depend on the platform's default encoding or newline translation.

        Args:
            zinfo_or_arcname: Target path relative to the sink directory, or
                a ``ZipInfo`` whose ``filename`` is used as that path.
            data: Content to write.
            compress_type: Accepted for interface compatibility; not used.
            compresslevel: Accepted for interface compatibility; not used.
        """
        if isinstance(zinfo_or_arcname, ZipInfo):
            filename = zinfo_or_arcname.filename
        else:
            filename = zinfo_or_arcname

        # NOTE(review): filename is joined as-is; nothing here checks that
        # the result stays inside self.path (absolute names or '..' parts).
        full_path = self.path / filename
        full_path.parent.mkdir(exist_ok=True, parents=True)

        payload = data.encode("utf-8") if isinstance(data, str) else data
        with open(full_path, "wb") as f:
            __ = f.write(payload)
class ZipFile(zipfile.ZipFile):
    """``zipfile.ZipFile`` extended with a ``closed`` property."""

    @property
    def closed(self) -> bool:
        """Whether the archive no longer has an open underlying file."""
        handle = self.fp
        if handle is None:
            return True
        return handle.closed