Coverage for src/epublib/resources/manager.py: 93%
254 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-18 16:07 -0300
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-18 16:07 -0300
1from collections.abc import Callable, Generator, Iterable
2from pathlib import Path
3from typing import Literal, TypedDict, Unpack, cast, overload
5import bs4
7from epublib.exceptions import EPUBError
8from epublib.identifier import EPUBId
9from epublib.mediatype import Category, MediaType
10from epublib.nav.resource import NavigationDocument
11from epublib.ncx.resource import NCXFile
12from epublib.package.manifest import BookManifest, ManifestItem
13from epublib.package.resource import PackageDocument, resource_to_manifest_item
14from epublib.package.spine import BookSpine, SpineItemRef
15from epublib.resources import (
16 ContentDocument,
17 PublicationResource,
18 Resource,
19 XMLResource,
20)
21from epublib.util import (
22 attr_to_str,
23 get_absolute_href,
24 get_relative_href,
25 normalize_path,
26 split_fragment,
27)
29type ResourceIdentifier = str | Path | EPUBId | ManifestItem | SpineItemRef
30type ResourceQuery = type[Resource] | MediaType | Category | str
class AddResourceOptions(TypedDict, total=False):
    """
    Optional keyword arguments that ResourceManager.insert and
    ResourceManager.append forward to ResourceManager.add. Every key
    may be omitted
    """

    # Cover flag and placement relative to another resource
    is_cover: bool
    after: Resource | ResourceIdentifier | None
    before: Resource | ResourceIdentifier | None

    # Manifest
    add_to_manifest: bool | None
    identifier: str | EPUBId | None

    # Spine
    add_to_spine: bool | None
    spine_position: int | None
    linear: bool | None

    # Table of contents of the navigation document
    add_to_toc: bool | None
    toc_position: int | None

    # NCX
    add_to_ncx: bool | None
    ncx_position: int | None
48def ri_to_filename(
49 identifier: ResourceIdentifier,
50 manifest: BookManifest,
51) -> str:
52 """
53 Convert various resource identifier types to its corresponding filename
54 """
56 if isinstance(identifier, ManifestItem):
57 return identifier.filename
59 if isinstance(identifier, (EPUBId, SpineItemRef)):
60 return manifest[identifier].filename
62 return str(identifier)
65def ri_to_id(
66 identifier: ResourceIdentifier,
67 manifest: BookManifest,
68) -> EPUBId:
69 """
70 Convert various resource identifier types to its corresponding EPUBId
71 """
73 if isinstance(identifier, ManifestItem):
74 return identifier.id
76 if isinstance(identifier, EPUBId):
77 return identifier
79 if isinstance(identifier, SpineItemRef):
80 return identifier.idref
82 return manifest[identifier].id
85class ResourceManager:
def __init__(
    self,
    resources_list: Iterable[Resource],
    container_file: XMLResource,
    package_document: PackageDocument,
    nav_getter: Callable[[], NavigationDocument | None] = lambda: None,
    ncx_getter: Callable[[], NCXFile | None] = lambda: None,
):
    """
    Set up the manager from the resources of a book.

    The iterable of resources is copied into a private list. The
    navigation document and the NCX file are not stored: they are
    fetched on demand through the given getters, which by default
    always return None
    """
    # Getters called by the `nav` and `ncx` properties on every access
    self._get_nav: Callable[[], NavigationDocument | None] = nav_getter
    self._get_ncx: Callable[[], NCXFile | None] = ncx_getter

    self.package_document: PackageDocument = package_document
    self.container_file: XMLResource = container_file
    self._resources: list[Resource] = [*resources_list]
def ri_to_filename(self, identifier: ResourceIdentifier) -> str:
    """
    Convert a resource identifier to a filename, resolving ids
    against the manifest of this manager
    """
    manifest = self.manifest
    return ri_to_filename(identifier, manifest)
def ri_to_id(self, identifier: ResourceIdentifier) -> EPUBId:
    """
    Convert a resource identifier to an EPUBId, resolving filenames
    against the manifest of this manager
    """
    manifest = self.manifest
    return ri_to_id(identifier, manifest)
@property
def manifest(self) -> BookManifest:
    """
    The manifest of the package document
    """
    package = self.package_document
    return package.manifest
@property
def spine(self) -> BookSpine:
    """
    The spine of the package document
    """
    package = self.package_document
    return package.spine
@property
def ncx(self) -> NCXFile | None:
    """
    The NCX file as reported by the NCX getter given at construction,
    or None if it reports none. The getter is called on every access
    """
    getter = self._get_ncx
    return getter()
@property
def nav(self) -> NavigationDocument | None:
    """
    The navigation document as reported by the nav getter given at
    construction, or None if it reports none. The getter is called on
    every access
    """
    getter = self._get_nav
    return getter()
122 @overload
123 def filter[R: Resource](self, query: type[R]) -> Generator[R]: ...
124 @overload
125 def filter(self, query: type[Resource] = Resource) -> Generator[Resource]: ...
126 @overload
127 def filter(
128 self, query: Literal[MediaType.XHTML, MediaType.IMAGE_SVG]
129 ) -> Generator[ContentDocument]: ...
130 @overload
131 def filter(self, query: Literal[MediaType.NCX]) -> Generator[NCXFile]: ...
132 @overload
133 def filter(self, query: MediaType | Category) -> Generator[PublicationResource]: ...
135 def filter(self, query: ResourceQuery = Resource) -> Generator[Resource]:
136 if isinstance(query, type):
137 yield from (
138 resource for resource in self._resources if isinstance(resource, query)
139 )
140 elif isinstance(query, Category):
141 yield from (
142 resource
143 for resource in self._resources
144 if isinstance(resource, PublicationResource)
145 and isinstance(resource.media_type, MediaType)
146 and resource.media_type.category is query
147 )
148 else:
149 yield from (
150 resource
151 for resource in self._resources
152 if isinstance(resource, PublicationResource)
153 and resource.media_type == MediaType.coalesce(query)
154 )
156 @overload
157 def get[R: PublicationResource](
158 self, identifier: EPUBId | ManifestItem, cls: type[R]
159 ) -> R | None: ...
160 @overload
161 def get(
162 self,
163 identifier: EPUBId | ManifestItem | SpineItemRef,
164 cls: type[PublicationResource] = PublicationResource,
165 ) -> PublicationResource | None: ...
166 @overload
167 def get[R: Resource](self, identifier: str | Path, cls: type[R]) -> R | None: ...
168 @overload
169 def get(
170 self, identifier: str | Path, cls: type[Resource] = Resource
171 ) -> Resource | None: ...
173 def get(
174 self, identifier: ResourceIdentifier, cls: type[Resource] = Resource
175 ) -> Resource | None:
176 identifier = self.ri_to_filename(identifier)
178 return next(
179 (
180 resource
181 for resource in self.filter(cls)
182 if resource.filename == identifier
183 ),
184 None,
185 )
@overload
def __getitem__(self, identifier: slice) -> list[Resource]: ...
@overload
def __getitem__(self, identifier: ResourceIdentifier | int) -> Resource: ...
def __getitem__(self, identifier: ResourceIdentifier | int | slice):
    """
    Index by position (int or slice, delegated to the resource list)
    or by resource identifier. Raises KeyError when an identifier
    matches no resource
    """
    if not isinstance(identifier, (int, slice)):
        found = self.get(identifier)
        if found is None:
            raise KeyError(identifier)
        return found

    return self._resources[identifier]
def __iter__(self) -> Generator[Resource]:
    """
    Iterate over the resources in list order
    """
    for resource in self._resources:
        yield resource
def __len__(self) -> int:
    """
    Number of resources currently held
    """
    resources = self._resources
    return len(resources)
def _resolve_position(
    self,
    default: int,
    position: int | None = None,
    after: Resource | None = None,
    before: Resource | None = None,
) -> int:
    """
    Compute the index at which a new resource should be inserted in
    the resource list (as understood by list.insert).

    Precedence: an explicit `position` (including 0) wins; otherwise
    the slot right after `after`; otherwise the slot currently
    occupied by `before`, so that the new resource ends up
    immediately before it; otherwise `default`.

    Raises EPUBError if `after` or `before` has to be consulted and
    is not in the resource list
    """
    # Compare against None: 0 is a legitimate position
    if position is not None:
        return position

    if after is not None:
        try:
            return self._resources.index(after) + 1
        except ValueError as error:
            raise EPUBError(
                f"resource provided as argument 'after' ('{after}') "
                "must be part of this epub"
            ) from error

    if before is not None:
        try:
            # list.insert(i, x) places x before the element at index i,
            # so the index of `before` itself is the right slot
            return self._resources.index(before)
        except ValueError as error:
            raise EPUBError(
                f"resource provided as argument 'before' ('{before}') "
                "must be part of this epub"
            ) from error

    return default
@staticmethod
def _should_be_manifested(resource: Resource) -> bool:
    """
    Default decision on whether a resource goes in the manifest: True
    unless the first component of its filename is "META-INF"
    """
    top_level = Path(resource.filename).parts[0]
    return top_level != "META-INF"
@staticmethod
def _should_be_in_spine(resource: Resource) -> bool:
    """
    Default decision on whether a resource goes in the spine: True
    only for content documents
    """
    is_content_document = isinstance(resource, ContentDocument)
    return is_content_document
@staticmethod
def _should_be_spine_linear(_resource: Resource) -> bool:
    """
    Default value of the `linear` flag for a new spine item. The
    resource is currently ignored and the answer is always True
    """
    return True
246 def add_to_manifest[T: Resource](
247 self,
248 resource: T,
249 media_type: MediaType | str | None = None,
250 identifier: EPUBId | str | None = None,
251 fallback: str | None = None,
252 media_overlay: str | None = None,
253 is_cover: bool = False,
254 is_nav: bool = False,
255 properties: list[str] | None = None,
256 detect_properties: bool = True,
257 exists_ok: bool = True,
258 ) -> tuple[T, ManifestItem]:
259 """
260 Add a resource to the manifest, if not already present. The
261 resource may be promoted to a PublicationResource if needed, so
262 the resource is returned as well.
263 """
264 manifest_item = self.manifest.get(resource.filename)
265 if manifest_item:
266 if exists_ok:
267 return resource, manifest_item
268 raise EPUBError(f"Resource '{resource.filename}' already in manifest")
270 # Promoting to PublicationResource
271 if not isinstance(resource, PublicationResource):
272 new_resource = PublicationResource.from_resource(resource, media_type)
273 try:
274 index = self._resources.index(resource)
275 self._resources[index] = new_resource
276 except ValueError:
277 pass
279 resource = new_resource
281 manifest_item = resource_to_manifest_item(
282 resource,
283 self.package_document,
284 media_type=media_type,
285 identifier=identifier,
286 fallback=fallback,
287 media_overlay=media_overlay,
288 is_cover=is_cover,
289 is_nav=is_nav,
290 properties=properties,
291 detect_properties=detect_properties,
292 )
293 __ = self.manifest.add_item(manifest_item)
295 return resource, manifest_item
def add(
    self,
    resource: Resource,
    is_cover: bool = False,
    position: int | None = None,
    after: Resource | ResourceIdentifier | None = None,
    before: Resource | ResourceIdentifier | None = None,
    add_to_manifest: bool | None = None,
    identifier: str | EPUBId | None = None,
    add_to_spine: bool | None = None,
    spine_position: int | None = None,
    linear: bool | None = None,
    add_to_toc: bool | None = None,
    toc_position: int | None = None,
    add_to_ncx: bool | None = None,
    ncx_position: int | None = None,
) -> None:
    """
    Add a resource to this EPUB and register it, as requested, in the
    manifest, the spine, the navigation document and the NCX.

    Placement in the resource list is given by `position`, `after` or
    `before` (resources or resource identifiers); by default the
    resource goes last.

    Options left as None are decided here:
    - `add_to_manifest`: true if `add_to_spine` is true or the
      resource is not under META-INF;
    - `add_to_spine`: true if the resource is manifested and is a
      content document;
    - `add_to_toc`: same as `add_to_spine`;
    - `add_to_ncx`: true if there is an NCX and `add_to_toc` is true;
    - `ncx_position`: same as `toc_position`;
    - `spine_position`: end of the spine;
    - `linear`: True.

    Raises EPUBError when the spine or the table of contents is
    requested while the manifest is explicitly refused, when
    `add_to_ncx` is requested but there is no NCX, or when the
    resource is to be manifested and its filename is already in the
    manifest. All of these are checked before anything is modified,
    so a rejected call leaves the book untouched
    """
    # Reject contradictory requests before touching any state
    if add_to_manifest is False and add_to_spine:
        raise EPUBError("Cannot add to spine without adding to manifest")

    if add_to_manifest is False and add_to_toc:
        raise EPUBError(
            "Cannot update navigation document without adding to manifest"
        )

    if add_to_ncx and not self.ncx:
        raise EPUBError.missing_ncx(self, "add_resource", "add_to_ncx")

    # Fill in the options the caller left undecided
    if add_to_manifest is None:
        add_to_manifest = add_to_spine or self._should_be_manifested(resource)

    if add_to_spine is None:
        add_to_spine = add_to_manifest and self._should_be_in_spine(resource)

    if add_to_toc is None:
        add_to_toc = add_to_spine

    if add_to_ncx is None:
        add_to_ncx = self.ncx is not None and add_to_toc

    if ncx_position is None:
        ncx_position = toc_position

    # Same condition and message as add_to_manifest(exists_ok=False),
    # checked early so that the resource list is not modified on failure
    if add_to_manifest and self.manifest.get(resource.filename):
        raise EPUBError(f"Resource '{resource.filename}' already in manifest")

    # An identifier that matches no resource resolves to None here and
    # is then ignored for positioning
    if after is not None and not isinstance(after, Resource):
        after = self.get(after)
    if before is not None and not isinstance(before, Resource):
        before = self.get(before)

    position = self._resolve_position(len(self._resources), position, after, before)
    self._resources.insert(position, resource)

    is_nav = isinstance(resource, NavigationDocument)
    manifest_item: None | ManifestItem = None

    if add_to_manifest:
        # The resource may be promoted to a PublicationResource, so
        # keep working with the one that is returned
        resource, manifest_item = self.add_to_manifest(
            resource,
            identifier=identifier,
            is_cover=is_cover,
            is_nav=is_nav,
            exists_ok=False,
        )

    if add_to_spine and manifest_item is not None:
        if spine_position is None:
            spine_position = len(self.spine.items)
        if linear is None:
            linear = self._should_be_spine_linear(resource)
        spine_item = SpineItemRef(
            name=manifest_item.id,
            linear=linear,
        )
        __ = self.spine.insert_item(spine_position, spine_item)

    if add_to_toc and self.nav:
        __ = self.nav.add_to_toc(
            resource.filename,
            resource.get_title(),
            position=toc_position,
        )

    if add_to_ncx and self.ncx:
        __ = self.ncx.add_to_nav_map(
            resource.filename,
            resource.get_title(),
            position=ncx_position,
        )
def insert(
    self,
    position: int,
    resource: Resource,
    **kwargs: Unpack[AddResourceOptions],
) -> None:
    """
    Add a resource, passing `position` as the requested index in the
    resource list. Every other option is forwarded to `add`
    """
    return self.add(resource, position=position, **kwargs)
def append(
    self,
    resource: Resource,
    **kwargs: Unpack[AddResourceOptions],
) -> None:
    """
    Add a resource without an explicit position. Every option is
    forwarded to `add`
    """
    options = kwargs
    return self.add(resource, **options)
def remove(
    self,
    resource: ResourceIdentifier | Resource,
    remove_css_js_links: bool = False,
) -> None:
    """
    Remove a resource from this EPUB: from the navigation document,
    from the NCX (unless the resource is the NCX itself), from the
    package document and from the resource list.

    If the resource is a CSS or JS file, set `remove_css_js_links` to
    also delete, in every content document, the stylesheet links and
    script tags that point to it.

    Raises EPUBError if the resource is not part of this EPUB, if it
    is the package document or the container file, or if
    `remove_css_js_links` is set for a resource that is neither CSS
    nor JavaScript. Every check runs before anything is modified, so
    a rejected call leaves the book untouched
    """
    if not isinstance(resource, Resource):
        res = self.get(resource)
        if res is None:
            raise EPUBError(
                f"Can't remove resource '{resource}' not in this epub ('{self}')"
            )

        resource = res

    elif resource not in self:
        raise EPUBError(f"Resource '{resource}' not in EPUB")

    if resource is self.package_document:
        raise EPUBError("Can't remove package document")

    if resource is self.container_file:
        raise EPUBError("Can't remove container file")

    # Validate the flag now: failing after the removal below would
    # leave the resource gone despite the error
    if remove_css_js_links and (
        not isinstance(resource, PublicationResource)
        or isinstance(resource.media_type, str)
        or not (resource.media_type.is_css() or resource.media_type.is_js())
    ):
        raise EPUBError(
            "Can't remove CSS and JavaScript links for file "
            "that is neither CSS nor JavaScript"
        )

    if self.nav:
        self.nav.remove(resource.filename)

    if self.ncx and resource is not self.ncx:
        self.ncx.remove(resource.filename)

    self.package_document.remove(resource.filename)
    self._resources.remove(resource)

    if remove_css_js_links:
        for res in self.filter(ContentDocument):
            # References are matched by the href of the removed file
            # relative to each content document
            relative_href = get_relative_href(res.filename, resource.filename)
            for tag in res.soup.find_all(
                "link",
                rel="stylesheet",
                href=relative_href,
            ):
                tag.decompose()
            for tag in res.soup.find_all(
                "script",
                src=relative_href,
            ):
                tag.decompose()
def rename(
    self,
    resource: ResourceIdentifier | Resource,
    new_filename: str,
    update_references: bool = True,
    reference_attrs: list[str] | None = None,
) -> None:
    """
    Rename the resource, optionally updating references.

    With `update_references` set:
    - in every other XML resource, attributes listed in
      `reference_attrs` that point to the old filename are rewritten
      to point to the new one, keeping any fragment identifier
      ("full-path" values are compared with the filename as is,
      the others as hrefs relative to the referring resource);
    - if the renamed resource is itself XML and changes directory,
      its own relative references are rebased so that they keep
      designating the same targets. Fragment identifiers are kept,
      and fragment-only references and URLs with a scheme or an
      authority (https:, mailto:, data:, //host/...) are left alone.

    `reference_attrs` defaults to href, src, full-path and xlink:href.

    Raises EPUBError if the resource is not part of this EPUB or is
    the container file
    """
    # Local import: only needed here to recognise non-relative URLs
    from urllib.parse import urlsplit

    if not isinstance(resource, Resource):
        res = self.get(resource)
        if res is None:
            raise EPUBError(
                f"Can't rename resource '{resource}' not in this epub ('{self}')"
            )

        resource = res

    elif resource not in self:
        raise EPUBError(
            f"Can't rename resource '{resource}' not in this epub ('{self}')"
        )

    if resource is self.container_file:
        raise EPUBError("Can't rename container file")

    if reference_attrs is None:
        reference_attrs = ["href", "src", "full-path", "xlink:href"]
    # CSS attribute selectors spell a namespace prefix with '|'
    selector = ", ".join(f"[{attr.replace(':', '|')}]" for attr in reference_attrs)

    if update_references:
        for other_resource in self.filter(XMLResource):
            # The renamed resource itself is handled separately below
            if other_resource == resource:
                continue

            old_ref = get_relative_href(other_resource.filename, resource.filename)
            new_ref = get_relative_href(other_resource.filename, new_filename)

            for tag in other_resource.soup.select(selector):
                for attr in reference_attrs:
                    value = attr_to_str(tag.get(attr))
                    if value is None:
                        continue

                    if attr == "full-path":
                        if resource.filename == value:
                            tag[attr] = new_filename
                        continue

                    ref, identifier = split_fragment(value)
                    if ref == old_ref:
                        tag[attr] = new_ref + (
                            f"#{identifier}" if identifier else ""
                        )

        if isinstance(resource, XMLResource):
            # Path from the new directory back to the old one; "."
            # means the directory did not change and nothing moves
            prefix = get_relative_href(new_filename, Path(resource.filename)).parent
            if str(prefix) != ".":
                soup = cast(bs4.BeautifulSoup, resource.soup)
                for tag in soup.select(selector):
                    for attr in reference_attrs:
                        value = attr_to_str(tag.get(attr))
                        if value is None:
                            continue

                        ref, identifier = split_fragment(value)
                        if not ref:
                            # Fragment-only reference: location independent
                            continue

                        parts = urlsplit(ref)
                        if parts.scheme or parts.netloc:
                            # Not relative to this file: joining it to a
                            # directory prefix would corrupt it
                            continue

                        rebased = str(normalize_path(prefix / ref))
                        tag[attr] = rebased + (
                            f"#{identifier}" if identifier else ""
                        )

    resource.filename = new_filename
534 @overload
535 def resolve_href[R: Resource](
536 self,
537 href: str,
538 with_tag: Literal[True],
539 relative_to: Resource | ResourceIdentifier | None,
540 cls: type[R],
541 ) -> tuple[R, bs4.Tag | None] | tuple[None, None]: ...
543 @overload
544 def resolve_href[R: Resource](
545 self,
546 href: str,
547 with_tag: Literal[False],
548 relative_to: Resource | ResourceIdentifier | None,
549 cls: type[R],
550 ) -> R | None: ...
552 @overload
553 def resolve_href(
554 self,
555 href: str,
556 with_tag: Literal[True] = True,
557 relative_to: Resource | ResourceIdentifier | None = None,
558 cls: type[XMLResource] = XMLResource,
559 ) -> tuple[XMLResource | None, bs4.Tag | None] | tuple[None, None]: ...
561 @overload
562 def resolve_href(
563 self,
564 href: str,
565 with_tag: Literal[False],
566 relative_to: Resource | ResourceIdentifier | None = None,
567 cls: type[Resource] = Resource,
568 ) -> Resource | None: ...
570 def resolve_href(
571 self,
572 href: str,
573 with_tag: bool = True,
574 relative_to: Resource | ResourceIdentifier | None = None,
575 cls: type[Resource] = Resource,
576 ) -> tuple[Resource | None, bs4.Tag | None] | tuple[None, None] | Resource | None:
577 """
578 Resolve an href (possibly with a fragment identifier) to a
579 resource. Optionally return the tag of the matched fragment
580 within that resource.
581 """
583 if relative_to is not None:
584 if isinstance(relative_to, Resource):
585 relative_to = relative_to.filename
586 else:
587 relative_to = self.ri_to_filename(relative_to)
589 filename = get_absolute_href(relative_to, href)
590 else:
591 filename = href
593 filename, identifier = split_fragment(filename)
594 resource = self.get(filename, cls)
596 if not with_tag:
597 return resource
599 if resource is None:
600 return None, None
602 if not isinstance(resource, XMLResource):
603 return resource, None
605 resource = cast(XMLResource, resource)
606 return resource, cast(
607 bs4.Tag, resource.soup.find(id=identifier)
608 ) if identifier is not None else None