Coverage for src/epublib/resources/manager.py: 93%

254 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-18 16:07 -0300

1from collections.abc import Callable, Generator, Iterable 

2from pathlib import Path 

3from typing import Literal, TypedDict, Unpack, cast, overload 

4 

5import bs4 

6 

7from epublib.exceptions import EPUBError 

8from epublib.identifier import EPUBId 

9from epublib.mediatype import Category, MediaType 

10from epublib.nav.resource import NavigationDocument 

11from epublib.ncx.resource import NCXFile 

12from epublib.package.manifest import BookManifest, ManifestItem 

13from epublib.package.resource import PackageDocument, resource_to_manifest_item 

14from epublib.package.spine import BookSpine, SpineItemRef 

15from epublib.resources import ( 

16 ContentDocument, 

17 PublicationResource, 

18 Resource, 

19 XMLResource, 

20) 

21from epublib.util import ( 

22 attr_to_str, 

23 get_absolute_href, 

24 get_relative_href, 

25 normalize_path, 

26 split_fragment, 

27) 

28 

29type ResourceIdentifier = str | Path | EPUBId | ManifestItem | SpineItemRef 

30type ResourceQuery = type[Resource] | MediaType | Category | str 

31 

32 

class AddResourceOptions(TypedDict, total=False):
    """
    Optional keyword arguments that ``ResourceManager.insert`` and
    ``ResourceManager.append`` forward untouched to ``ResourceManager.add``.
    Every key may be omitted.
    """

    # Passed on to the manifest step as its ``is_cover`` flag.
    is_cover: bool

    # Anchors used to derive the position in the resource list when no
    # explicit position is given.
    after: Resource | ResourceIdentifier | None
    before: Resource | ResourceIdentifier | None

    # Manifest: whether to register the resource, and under which id.
    add_to_manifest: bool | None
    identifier: str | EPUBId | None

    # Spine: whether to reference the resource, where, and its linear flag.
    add_to_spine: bool | None
    spine_position: int | None
    linear: bool | None

    # Navigation document table of contents.
    add_to_toc: bool | None
    toc_position: int | None

    # NCX navigation map; ``add`` falls back to ``toc_position`` when
    # ``ncx_position`` is None.
    add_to_ncx: bool | None
    ncx_position: int | None

46 

47 

48def ri_to_filename( 

49 identifier: ResourceIdentifier, 

50 manifest: BookManifest, 

51) -> str: 

52 """ 

53 Convert various resource identifier types to its corresponding filename 

54 """ 

55 

56 if isinstance(identifier, ManifestItem): 

57 return identifier.filename 

58 

59 if isinstance(identifier, (EPUBId, SpineItemRef)): 

60 return manifest[identifier].filename 

61 

62 return str(identifier) 

63 

64 

65def ri_to_id( 

66 identifier: ResourceIdentifier, 

67 manifest: BookManifest, 

68) -> EPUBId: 

69 """ 

70 Convert various resource identifier types to its corresponding EPUBId 

71 """ 

72 

73 if isinstance(identifier, ManifestItem): 

74 return identifier.id 

75 

76 if isinstance(identifier, EPUBId): 

77 return identifier 

78 

79 if isinstance(identifier, SpineItemRef): 

80 return identifier.idref 

81 

82 return manifest[identifier].id 

83 

84 

85class ResourceManager: 

def __init__(
    self,
    resources_list: Iterable[Resource],
    container_file: XMLResource,
    package_document: PackageDocument,
    nav_getter: Callable[[], NavigationDocument | None] = lambda: None,
    ncx_getter: Callable[[], NCXFile | None] = lambda: None,
):
    """
    Build a manager over the given resources.

    ``nav_getter`` and ``ncx_getter`` are stored as callables and invoked
    on every access of the ``nav`` / ``ncx`` properties; by default both
    report that no such document exists.
    """
    self._get_nav: Callable[[], NavigationDocument | None] = nav_getter
    self._get_ncx: Callable[[], NCXFile | None] = ncx_getter
    self.package_document: PackageDocument = package_document
    self.container_file: XMLResource = container_file
    # Copy into a private list so the manager owns its own ordering.
    self._resources: list[Resource] = [*resources_list]

99 

def ri_to_filename(self, identifier: ResourceIdentifier) -> str:
    """Resolve ``identifier`` to a filename using this book's manifest."""
    book_manifest = self.manifest
    return ri_to_filename(identifier, book_manifest)

102 

def ri_to_id(self, identifier: ResourceIdentifier) -> EPUBId:
    """Resolve ``identifier`` to an EPUBId using this book's manifest."""
    book_manifest = self.manifest
    return ri_to_id(identifier, book_manifest)

105 

106 @property 

107 def manifest(self) -> BookManifest: 

108 return self.package_document.manifest 

109 

110 @property 

111 def spine(self) -> BookSpine: 

112 return self.package_document.spine 

113 

114 @property 

115 def ncx(self) -> NCXFile | None: 

116 return self._get_ncx() 

117 

118 @property 

119 def nav(self) -> NavigationDocument | None: 

120 return self._get_nav() 

121 

122 @overload 

123 def filter[R: Resource](self, query: type[R]) -> Generator[R]: ... 

124 @overload 

125 def filter(self, query: type[Resource] = Resource) -> Generator[Resource]: ... 

126 @overload 

127 def filter( 

128 self, query: Literal[MediaType.XHTML, MediaType.IMAGE_SVG] 

129 ) -> Generator[ContentDocument]: ... 

130 @overload 

131 def filter(self, query: Literal[MediaType.NCX]) -> Generator[NCXFile]: ... 

132 @overload 

133 def filter(self, query: MediaType | Category) -> Generator[PublicationResource]: ... 

134 

135 def filter(self, query: ResourceQuery = Resource) -> Generator[Resource]: 

136 if isinstance(query, type): 

137 yield from ( 

138 resource for resource in self._resources if isinstance(resource, query) 

139 ) 

140 elif isinstance(query, Category): 

141 yield from ( 

142 resource 

143 for resource in self._resources 

144 if isinstance(resource, PublicationResource) 

145 and isinstance(resource.media_type, MediaType) 

146 and resource.media_type.category is query 

147 ) 

148 else: 

149 yield from ( 

150 resource 

151 for resource in self._resources 

152 if isinstance(resource, PublicationResource) 

153 and resource.media_type == MediaType.coalesce(query) 

154 ) 

155 

156 @overload 

157 def get[R: PublicationResource]( 

158 self, identifier: EPUBId | ManifestItem, cls: type[R] 

159 ) -> R | None: ... 

160 @overload 

161 def get( 

162 self, 

163 identifier: EPUBId | ManifestItem | SpineItemRef, 

164 cls: type[PublicationResource] = PublicationResource, 

165 ) -> PublicationResource | None: ... 

166 @overload 

167 def get[R: Resource](self, identifier: str | Path, cls: type[R]) -> R | None: ... 

168 @overload 

169 def get( 

170 self, identifier: str | Path, cls: type[Resource] = Resource 

171 ) -> Resource | None: ... 

172 

173 def get( 

174 self, identifier: ResourceIdentifier, cls: type[Resource] = Resource 

175 ) -> Resource | None: 

176 identifier = self.ri_to_filename(identifier) 

177 

178 return next( 

179 ( 

180 resource 

181 for resource in self.filter(cls) 

182 if resource.filename == identifier 

183 ), 

184 None, 

185 ) 

186 

187 @overload 

188 def __getitem__(self, identifier: slice) -> list[Resource]: ... 

189 @overload 

190 def __getitem__(self, identifier: ResourceIdentifier | int) -> Resource: ... 

191 def __getitem__(self, identifier: ResourceIdentifier | int | slice): 

192 if isinstance(identifier, (int, slice)): 

193 return self._resources[identifier] 

194 

195 resource = self.get(identifier) 

196 if resource is None: 

197 raise KeyError(identifier) 

198 

199 return resource 

200 

201 def __iter__(self) -> Generator[Resource]: 

202 yield from self._resources 

203 

204 def __len__(self) -> int: 

205 return len(self._resources) 

206 

207 def _resolve_position( 

208 self, 

209 default: int, 

210 position: int | None = None, 

211 after: Resource | None = None, 

212 before: Resource | None = None, 

213 ): 

214 if after and position is None: 

215 try: 

216 return self._resources.index(after) + 1 

217 except ValueError as error: 

218 raise EPUBError( 

219 f"resource provided as argument 'after' ('{after}') " 

220 "must be part of this epub" 

221 ) from error 

222 if before and position is None: 

223 try: 

224 return self._resources.index(before) - 1 

225 except ValueError as error: 

226 raise EPUBError( 

227 f"resource provided as argument 'before' ('{after}') " 

228 "must be part of this epub" 

229 ) from error 

230 if position: 

231 return position 

232 return default 

233 

234 @staticmethod 

235 def _should_be_manifested(resource: Resource) -> bool: 

236 return Path(resource.filename).parts[0] != "META-INF" 

237 

@staticmethod
def _should_be_in_spine(resource: Resource) -> bool:
    """Default spine policy: only content documents are referenced."""
    is_content_document = isinstance(resource, ContentDocument)
    return is_content_document

241 

242 @staticmethod 

243 def _should_be_spine_linear(_resource: Resource) -> bool: 

244 return True 

245 

246 def add_to_manifest[T: Resource]( 

247 self, 

248 resource: T, 

249 media_type: MediaType | str | None = None, 

250 identifier: EPUBId | str | None = None, 

251 fallback: str | None = None, 

252 media_overlay: str | None = None, 

253 is_cover: bool = False, 

254 is_nav: bool = False, 

255 properties: list[str] | None = None, 

256 detect_properties: bool = True, 

257 exists_ok: bool = True, 

258 ) -> tuple[T, ManifestItem]: 

259 """ 

260 Add a resource to the manifest, if not already present. The 

261 resource may be promoted to a PublicationResource if needed, so 

262 the resource is returned as well. 

263 """ 

264 manifest_item = self.manifest.get(resource.filename) 

265 if manifest_item: 

266 if exists_ok: 

267 return resource, manifest_item 

268 raise EPUBError(f"Resource '{resource.filename}' already in manifest") 

269 

270 # Promoting to PublicationResource 

271 if not isinstance(resource, PublicationResource): 

272 new_resource = PublicationResource.from_resource(resource, media_type) 

273 try: 

274 index = self._resources.index(resource) 

275 self._resources[index] = new_resource 

276 except ValueError: 

277 pass 

278 

279 resource = new_resource 

280 

281 manifest_item = resource_to_manifest_item( 

282 resource, 

283 self.package_document, 

284 media_type=media_type, 

285 identifier=identifier, 

286 fallback=fallback, 

287 media_overlay=media_overlay, 

288 is_cover=is_cover, 

289 is_nav=is_nav, 

290 properties=properties, 

291 detect_properties=detect_properties, 

292 ) 

293 __ = self.manifest.add_item(manifest_item) 

294 

295 return resource, manifest_item 

296 

def add(
    self,
    resource: Resource,
    is_cover: bool = False,
    position: int | None = None,
    after: Resource | ResourceIdentifier | None = None,
    before: Resource | ResourceIdentifier | None = None,
    add_to_manifest: bool | None = None,
    identifier: str | EPUBId | None = None,
    add_to_spine: bool | None = None,
    spine_position: int | None = None,
    linear: bool | None = None,
    add_to_toc: bool | None = None,
    toc_position: int | None = None,
    add_to_ncx: bool | None = None,
    ncx_position: int | None = None,
) -> None:
    """
    Add ``resource`` to the book and, as requested, to the manifest, the
    spine, the navigation document's table of contents and the NCX.

    Options left as None take defaults: the resource is manifested when
    it goes to the spine or lives outside META-INF; it goes to the spine
    when manifested and a content document; it goes to the table of
    contents when it goes to the spine; it goes to the NCX when an NCX
    exists and it goes to the table of contents. ``spine_position``
    defaults to the end of the spine and ``ncx_position`` to
    ``toc_position``.

    ``after`` / ``before`` may be resources or identifiers; an identifier
    that matches no resource is ignored.

    Raises EPUBError for contradictory options (spine or table of
    contents without manifest), for ``add_to_ncx`` without an NCX, for an
    anchor resource that is not in the book, and when the filename is
    already in the manifest.
    """
    # These checks depend only on the arguments, so run them before the
    # resource list is touched: a rejected call must not leave the
    # resource behind in the list.
    if add_to_manifest is False and add_to_spine:
        raise EPUBError("Cannot add to spine without adding to manifest")

    if add_to_manifest is False and add_to_toc:
        raise EPUBError(
            "Cannot update navigation document without adding to manifest"
        )

    is_nav = isinstance(resource, NavigationDocument)

    if not isinstance(after, Resource) and after is not None:
        after = self.get(after)
    if not isinstance(before, Resource) and before is not None:
        before = self.get(before)

    position = self._resolve_position(len(self._resources), position, after, before)
    self._resources.insert(position, resource)

    if add_to_manifest is None:
        add_to_manifest = add_to_spine or self._should_be_manifested(resource)

    if add_to_spine is None:
        add_to_spine = add_to_manifest and self._should_be_in_spine(resource)

    if add_to_toc is None:
        add_to_toc = add_to_spine

    # Deliberately evaluated after the insertion: ``self.ncx`` is looked
    # up live and its result may depend on the resource just added.
    if add_to_ncx and not self.ncx:
        raise EPUBError.missing_ncx(self, "add_resource", "add_to_ncx")

    if add_to_ncx is None:
        add_to_ncx = self.ncx is not None and add_to_toc

    if ncx_position is None:
        ncx_position = toc_position

    manifest_item: None | ManifestItem = None

    if add_to_manifest:
        # The resource may come back promoted to a PublicationResource.
        resource, manifest_item = self.add_to_manifest(
            resource,
            identifier=identifier,
            is_cover=is_cover,
            is_nav=is_nav,
            exists_ok=False,
        )

    if spine_position is None:
        spine_position = len(self.spine.items)

    if add_to_spine:
        if linear is None:
            linear = self._should_be_spine_linear(resource)
        # add_to_spine implies add_to_manifest (enforced above), so
        # manifest_item is set here.
        spine_item = SpineItemRef(
            name=manifest_item.id,
            linear=linear,
        )
        __ = self.spine.insert_item(spine_position, spine_item)

    if add_to_toc and self.nav:
        __ = self.nav.add_to_toc(
            resource.filename,
            resource.get_title(),
            position=toc_position,
        )

    if add_to_ncx and self.ncx:
        __ = self.ncx.add_to_nav_map(
            resource.filename,
            resource.get_title(),
            position=ncx_position,
        )

386 

387 def insert( 

388 self, 

389 position: int, 

390 resource: Resource, 

391 **kwargs: Unpack[AddResourceOptions], 

392 ) -> None: 

393 return self.add(resource, **kwargs, position=position) 

394 

395 def append( 

396 self, 

397 resource: Resource, 

398 **kwargs: Unpack[AddResourceOptions], 

399 ) -> None: 

400 return self.add(resource, **kwargs) 

401 

def remove(
    self,
    resource: ResourceIdentifier | Resource,
    remove_css_js_links: bool = False,
):
    """
    Remove a resource from this EPUB. If it is a CSS or JS file,
    you can set the remove_css_js_links flag to remove any link
    from content documents to it.

    The resource is also removed from the navigation document, the NCX
    and the package document.

    Raises EPUBError when the resource is not in this EPUB, when it is
    the package document or the container file, or when
    ``remove_css_js_links`` is set for a resource that is neither CSS
    nor JavaScript. Nothing is modified when an error is raised.
    """

    if not isinstance(resource, Resource):
        res = self.get(resource)
        if res is None:
            raise EPUBError(
                f"Can't remove resource '{resource}' not in this epub ('{self}')"
            )

        resource = res

    elif resource not in self:
        raise EPUBError(f"Resource '{resource}' not in EPUB")

    if resource is self.package_document:
        raise EPUBError("Can't remove package document")

    if resource is self.container_file:
        raise EPUBError("Can't remove container file")

    # Validate the flag before touching anything: raising after the
    # resource has already been removed would report a failure for an
    # operation that was in fact half carried out.
    if remove_css_js_links and (
        not isinstance(resource, PublicationResource)
        or isinstance(resource.media_type, str)
        or not (resource.media_type.is_css() or resource.media_type.is_js())
    ):
        raise EPUBError(
            "Can't remove CSS and JavaScript links for file "
            "that is neither CSS nor JavaScript"
        )

    if self.nav:
        self.nav.remove(resource.filename)

    if self.ncx and resource is not self.ncx:
        self.ncx.remove(resource.filename)

    self.package_document.remove(resource.filename)
    self._resources.remove(resource)

    if remove_css_js_links:
        for res in self.filter(ContentDocument):
            # Links are matched by the href as written relative to each
            # content document.
            relative_href = get_relative_href(res.filename, resource.filename)
            for tag in res.soup.find_all(
                "link",
                rel="stylesheet",
                href=relative_href,
            ):
                tag.decompose()
            for tag in res.soup.find_all(
                "script",
                src=relative_href,
            ):
                tag.decompose()

464 

def rename(
    self,
    resource: ResourceIdentifier | Resource,
    new_filename: str,
    update_references: bool = True,
    reference_attrs: list[str] | None = None,
):
    """
    Rename the resource, optionally updating references to it.

    With ``update_references``, every other XML resource has the
    attributes listed in ``reference_attrs`` (default: href, src,
    full-path, xlink:href) rewritten when they point at the old
    filename; fragments are kept. Independently of that flag, when the
    renamed resource is itself XML and moves to another directory, its
    own relative references are re-based so they keep pointing at the
    same targets.

    Raises EPUBError when the resource is not in this EPUB or is the
    container file.
    """
    # Function-scope import: ``re`` is only needed for the scheme test.
    import re

    if not isinstance(resource, Resource):
        res = self.get(resource)
        if res is None:
            raise EPUBError(
                f"Can't rename resource '{resource}' not in this epub ('{self}')"
            )

        resource = res

    elif resource not in self:
        raise EPUBError(
            f"Can't rename resource '{resource}' not in this epub ('{self}')"
        )

    if resource is self.container_file:
        raise EPUBError("Can't rename container file")

    if reference_attrs is None:
        reference_attrs = ["href", "src", "full-path", "xlink:href"]
    # CSS attribute selectors spell a namespace prefix with '|', not ':'.
    selector = ", ".join(f"[{attr.replace(':', '|')}]" for attr in reference_attrs)

    if update_references:
        for other_resource in self.filter(XMLResource):
            if other_resource == resource:
                continue

            old_ref = get_relative_href(other_resource.filename, resource.filename)
            new_ref = get_relative_href(other_resource.filename, new_filename)

            for tag in other_resource.soup.select(selector):
                for attr in reference_attrs:
                    value = attr_to_str(tag.get(attr))
                    if value is None:
                        continue

                    if attr == "full-path":
                        # full-path is compared against the filename as
                        # is, not against a document-relative href.
                        if resource.filename == value:
                            tag[attr] = new_filename
                        continue

                    ref, identifier = split_fragment(value)
                    if ref == old_ref:
                        tag[attr] = new_ref + (
                            f"#{identifier}" if identifier else ""
                        )

    if isinstance(resource, XMLResource):
        # Relative path from the new location back to the old directory.
        prefix = get_relative_href(new_filename, Path(resource.filename)).parent
        if str(prefix) != ".":
            # RFC 3986 scheme prefix (http:, mailto:, data:, ...): such
            # references do not depend on where the document lives and
            # must not be run through path arithmetic.
            has_scheme = re.compile(r"[A-Za-z][A-Za-z0-9+.\-]*:").match

            soup = cast(bs4.BeautifulSoup, resource.soup)
            for tag in soup.select(selector):
                for attr in reference_attrs:
                    value = attr_to_str(tag.get(attr))
                    if value is None:
                        continue

                    ref, identifier = split_fragment(value)
                    if not ref or has_scheme(ref):
                        continue

                    new_ref = str(normalize_path(prefix / ref))
                    # Keep the fragment, as done for references held by
                    # other resources.
                    tag[attr] = new_ref + (
                        f"#{identifier}" if identifier else ""
                    )

    resource.filename = new_filename

533 

534 @overload 

535 def resolve_href[R: Resource]( 

536 self, 

537 href: str, 

538 with_tag: Literal[True], 

539 relative_to: Resource | ResourceIdentifier | None, 

540 cls: type[R], 

541 ) -> tuple[R, bs4.Tag | None] | tuple[None, None]: ... 

542 

543 @overload 

544 def resolve_href[R: Resource]( 

545 self, 

546 href: str, 

547 with_tag: Literal[False], 

548 relative_to: Resource | ResourceIdentifier | None, 

549 cls: type[R], 

550 ) -> R | None: ... 

551 

552 @overload 

553 def resolve_href( 

554 self, 

555 href: str, 

556 with_tag: Literal[True] = True, 

557 relative_to: Resource | ResourceIdentifier | None = None, 

558 cls: type[XMLResource] = XMLResource, 

559 ) -> tuple[XMLResource | None, bs4.Tag | None] | tuple[None, None]: ... 

560 

561 @overload 

562 def resolve_href( 

563 self, 

564 href: str, 

565 with_tag: Literal[False], 

566 relative_to: Resource | ResourceIdentifier | None = None, 

567 cls: type[Resource] = Resource, 

568 ) -> Resource | None: ... 

569 

570 def resolve_href( 

571 self, 

572 href: str, 

573 with_tag: bool = True, 

574 relative_to: Resource | ResourceIdentifier | None = None, 

575 cls: type[Resource] = Resource, 

576 ) -> tuple[Resource | None, bs4.Tag | None] | tuple[None, None] | Resource | None: 

577 """ 

578 Resolve an href (possibly with a fragment identifier) to a 

579 resource. Optionally return the tag of the matched fragment 

580 within that resource. 

581 """ 

582 

583 if relative_to is not None: 

584 if isinstance(relative_to, Resource): 

585 relative_to = relative_to.filename 

586 else: 

587 relative_to = self.ri_to_filename(relative_to) 

588 

589 filename = get_absolute_href(relative_to, href) 

590 else: 

591 filename = href 

592 

593 filename, identifier = split_fragment(filename) 

594 resource = self.get(filename, cls) 

595 

596 if not with_tag: 

597 return resource 

598 

599 if resource is None: 

600 return None, None 

601 

602 if not isinstance(resource, XMLResource): 

603 return resource, None 

604 

605 resource = cast(XMLResource, resource) 

606 return resource, cast( 

607 bs4.Tag, resource.soup.find(id=identifier) 

608 ) if identifier is not None else None