Coverage for privacyforms_pdf / extractor.py: 99%

223 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-06 21:14 +0100

1"""PDF Form Extractor module using pdfcpu.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import shutil 

7import subprocess 

8import tempfile 

9from contextlib import contextmanager 

10from dataclasses import dataclass 

11from pathlib import Path 

12from typing import TYPE_CHECKING, Any 

13 

14if TYPE_CHECKING: 

15 from collections.abc import Iterator, Sequence 

16 

17 

class PDFCPUError(Exception):
    """Base exception for pdfcpu related errors.

    The other exception classes in this module derive from it, so callers
    can catch this one type to handle any of them.
    """

22 

23 

class PDFCPUNotFoundError(PDFCPUError):
    """Raised when pdfcpu is not found on the system."""

28 

29 

class PDFCPUExecutionError(PDFCPUError):
    """Raised when pdfcpu execution fails.

    Attributes:
        returncode: The return code passed in by the raiser.
        stderr: Standard error text passed in by the raiser.
    """

    def __init__(self, message: str, returncode: int, stderr: str = "") -> None:
        """Initialize the error with execution details.

        Args:
            message: Error message.
            returncode: The return code from the process.
            stderr: Standard error output from the process.
        """
        super().__init__(message)
        self.returncode = returncode
        self.stderr = stderr

44 

45 

class PDFFormNotFoundError(PDFCPUError):
    """Raised when the PDF does not contain any forms."""

50 

51 

class FormValidationError(PDFCPUError):
    """Raised when form data validation fails.

    Attributes:
        message: The summary message.
        errors: The individual validation error strings (possibly empty).
    """

    def __init__(self, message: str, errors: list[str] | None = None) -> None:
        """Initialize the error with validation details.

        Args:
            message: Error message.
            errors: List of specific validation errors.
        """
        super().__init__(message)
        self.message = message
        # Store a copy so later mutation of the caller's list cannot change
        # what this exception reports.
        self.errors = list(errors) if errors else []

    def __str__(self) -> str:  # noqa: D105
        if self.errors:
            # One "- item" bullet line per error below the summary message.
            return f"{self.message}\n- " + "\n- ".join(self.errors)
        return self.message

70 

71 

class FieldNotFoundError(PDFCPUError):
    """Raised when a field is not found in the form."""

76 

77 

@dataclass(frozen=True)
class FormField:
    """Represents a single form field.

    Instances are frozen: attributes cannot be reassigned after creation.

    Attributes:
        field_type: The type of the form field (e.g., 'textfield', 'checkbox').
        pages: List of pages where this field appears.
        id: The unique identifier of the field.
        name: The name of the field.
        value: The current value of the field.
        locked: Whether the field is locked.
    """

    field_type: str
    pages: list[int]
    id: str
    name: str
    value: str | bool
    locked: bool

97 

98 

@dataclass(frozen=True)
class PDFFormData:
    """Represents extracted PDF form data.

    Instances are frozen: attributes cannot be reassigned after creation.

    Attributes:
        source: Path to the source PDF file.
        pdf_version: Version of the PDF.
        has_form: Whether the PDF contains a form.
        fields: List of form fields.
        raw_data: The raw JSON data from pdfcpu.
    """

    source: Path
    pdf_version: str
    has_form: bool
    fields: list[FormField]
    raw_data: dict[str, Any]

116 

117 

class PDFFormExtractor:
    """Extracts form information from PDF files using pdfcpu.

    This class provides methods to extract, validate and fill form data in
    PDF files. It wraps the pdfcpu command-line tool and provides a Pythonic
    interface.

    Example:
        >>> extractor = PDFFormExtractor()
        >>> form_data = extractor.extract("form.pdf")
        >>> for field in form_data.fields:
        ...     print(f"{field.name}: {field.value}")

    Raises:
        PDFCPUNotFoundError: If pdfcpu is not installed on the system.
    """

    # Keys of a pdfcpu form export that hold field lists. Shared by parsing
    # and by the fill-payload builder so both agree on names and order.
    _FIELD_TYPES = (
        "textfield",
        "datefield",
        "checkbox",
        "radiobuttongroup",
        "combobox",
        "listbox",
    )

    def __init__(self, pdfcpu_path: str | None = None, timeout_seconds: float = 30.0) -> None:
        """Initialize the extractor.

        Args:
            pdfcpu_path: Optional path to the pdfcpu executable.
                If not provided, searches in system PATH.
            timeout_seconds: Timeout for pdfcpu command execution.

        Raises:
            PDFCPUNotFoundError: If pdfcpu is not found on the system.
        """
        # An explicit path is taken as-is; only the PATH lookup can come back empty.
        resolved_path = pdfcpu_path or self._find_pdfcpu()
        if not resolved_path:
            raise PDFCPUNotFoundError(
                "pdfcpu not found. Please install pdfcpu: https://pdfcpu.io/install"
            )
        self._pdfcpu_path: str = resolved_path
        self._timeout_seconds: float = timeout_seconds

    @staticmethod
    def _find_pdfcpu() -> str | None:
        """Find the pdfcpu executable in the system PATH.

        Returns:
            Path to pdfcpu executable, or None if not found.
        """
        return shutil.which("pdfcpu")

    def _run_command(
        self, args: Sequence[str], check: bool = True
    ) -> subprocess.CompletedProcess[str]:
        """Run a pdfcpu command.

        Args:
            args: Command arguments (appended after the executable path).
            check: Whether to raise on a non-zero return code.

        Returns:
            The completed process, with stdout/stderr captured as text.

        Raises:
            PDFCPUExecutionError: If the command fails (with ``check=True``)
                or exceeds the configured timeout (return code ``-1``).
            PDFCPUNotFoundError: If the executable cannot be launched.
        """
        cmd: list[str] = [self._pdfcpu_path, *args]
        # Only the launch itself is guarded; the return-code check below
        # cannot raise either of the exceptions handled here.
        try:
            result: subprocess.CompletedProcess[str] = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=False,
                timeout=self._timeout_seconds,
            )
        except FileNotFoundError as e:
            raise PDFCPUNotFoundError(f"pdfcpu not found at {self._pdfcpu_path}") from e
        except subprocess.TimeoutExpired as e:
            raise PDFCPUExecutionError(
                f"pdfcpu command timed out after {self._timeout_seconds:.1f}s",
                -1,
                self._sanitize_stderr(e.stderr),
            ) from e

        if check and result.returncode != 0:
            raise PDFCPUExecutionError(
                "pdfcpu command failed",
                result.returncode,
                self._sanitize_stderr(result.stderr),
            )
        return result

    @staticmethod
    def _sanitize_stderr(stderr: str | bytes | None) -> str:
        """Return a bounded stderr string suitable for end-user messages.

        ``None`` becomes an empty string, bytes are decoded as UTF-8 with
        replacement characters, and the stripped text is cut to 500 characters.
        """
        if stderr is None:
            return ""
        text = stderr.decode("utf-8", errors="replace") if isinstance(stderr, bytes) else stderr
        return text.strip()[:500]

    @contextmanager
    def _temporary_json_path(self) -> Iterator[Path]:
        """Yield a ``form.json`` path inside a temporary directory.

        The directory (and anything written into it) is removed when the
        context exits.
        """
        with tempfile.TemporaryDirectory(prefix="privacyforms_pdf_") as tmp_dir:
            yield Path(tmp_dir) / "form.json"

    def check_pdfcpu(self) -> bool:
        """Check if pdfcpu is available and working.

        Returns:
            True if ``pdfcpu version`` exits with 0 and mentions "pdfcpu"
            in its output, False otherwise (including launch failures).
        """
        try:
            result = self._run_command(["version"], check=False)
        except PDFCPUError:
            return False
        return result.returncode == 0 and "pdfcpu" in result.stdout

    def get_pdfcpu_version(self) -> str:
        """Get the installed pdfcpu version.

        Returns:
            The stripped output of ``pdfcpu version``.

        Raises:
            PDFCPUExecutionError: If the version command fails.
        """
        return self._run_command(["version"]).stdout.strip()

    def has_form(self, pdf_path: str | Path) -> bool:
        """Check if a PDF contains a form.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            True if the ``pdfcpu info`` output contains "Form: Yes".

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If the pdfcpu command fails.
        """
        pdf_path = Path(pdf_path)
        self._validate_pdf_path(pdf_path)

        result = self._run_command(["info", str(pdf_path)], check=False)
        if result.returncode != 0:
            raise PDFCPUExecutionError(
                f"Failed to get PDF info for {pdf_path}",
                result.returncode,
                # Bounded like every other error raised by this class.
                self._sanitize_stderr(result.stderr),
            )

        return "Form: Yes" in result.stdout

    def extract(self, pdf_path: str | Path) -> PDFFormData:
        """Extract form data from a PDF file.

        This method exports the form data from the PDF using pdfcpu and
        parses it into a structured format.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            PDFFormData containing all form information.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
            PDFFormNotFoundError: If the PDF does not contain a form.
        """
        pdf_path = Path(pdf_path)
        self._validate_pdf_path(pdf_path)

        if not self.has_form(pdf_path):
            raise PDFFormNotFoundError(f"PDF does not contain a form: {pdf_path}")

        with self._temporary_json_path() as tmp_path:
            # pdfcpu writes the export to a file, so route it through a
            # temporary path and read it back.
            result = self._run_command(
                ["form", "export", str(pdf_path), str(tmp_path)],
                check=False,
            )
            if result.returncode != 0:
                raise PDFCPUExecutionError(
                    f"Failed to export form data from {pdf_path}",
                    result.returncode,
                    self._sanitize_stderr(result.stderr),
                )

            with open(tmp_path, encoding="utf-8") as f:
                raw_data: dict[str, Any] = json.load(f)

        return self._parse_form_data(pdf_path, raw_data)

    def extract_to_json(self, pdf_path: str | Path, output_path: str | Path) -> None:
        """Extract form data and save it to a JSON file.

        Args:
            pdf_path: Path to the PDF file.
            output_path: Path where the JSON output should be saved.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
        """
        pdf_path = Path(pdf_path)
        output_path = Path(output_path)
        self._validate_pdf_path(pdf_path)

        result = self._run_command(
            ["form", "export", str(pdf_path), str(output_path)],
            check=False,
        )
        if result.returncode != 0:
            raise PDFCPUExecutionError(
                f"Failed to export form data from {pdf_path}",
                result.returncode,
                self._sanitize_stderr(result.stderr),
            )

    def list_fields(self, pdf_path: str | Path) -> list[FormField]:
        """List all form fields in a PDF.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            List of FormField objects.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
            PDFFormNotFoundError: If the PDF does not contain a form.
        """
        return self.extract(pdf_path).fields

    def get_field_value(self, pdf_path: str | Path, field_name: str) -> str | bool | None:
        """Get the value of a specific form field.

        Args:
            pdf_path: Path to the PDF file.
            field_name: Name of the field to retrieve.

        Returns:
            The value of the first field with that name, or None if the
            field is not found.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
            PDFFormNotFoundError: If the PDF does not contain a form.
        """
        field = self.get_field_by_name(pdf_path, field_name)
        return None if field is None else field.value

    def get_field_by_id(self, pdf_path: str | Path, field_id: str) -> FormField | None:
        """Get a form field by its ID.

        Args:
            pdf_path: Path to the PDF file.
            field_id: ID of the field to retrieve.

        Returns:
            The first FormField with that ID, or None if the field is not found.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
            PDFFormNotFoundError: If the PDF does not contain a form.
        """
        fields = self.list_fields(pdf_path)
        return next((field for field in fields if field.id == field_id), None)

    def get_field_by_name(self, pdf_path: str | Path, field_name: str) -> FormField | None:
        """Get a form field by its name.

        Args:
            pdf_path: Path to the PDF file.
            field_name: Name of the field to retrieve.

        Returns:
            The first FormField with that name, or None if the field is not found.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
            PDFFormNotFoundError: If the PDF does not contain a form.
        """
        fields = self.list_fields(pdf_path)
        return next((field for field in fields if field.name == field_name), None)

    def validate_form_data(
        self,
        pdf_path: str | Path,
        form_data: dict[str, Any],
        *,
        strict: bool = False,
        allow_extra_fields: bool = False,
    ) -> list[str]:
        """Validate form data against PDF form fields.

        This method validates that the provided form data (simple key:value format)
        matches the structure and field types of the PDF form.

        Args:
            pdf_path: Path to the PDF file.
            form_data: The form data to validate (simple format: {"Field Name": value}).
            strict: If True, also checks that all form fields are provided.
            allow_extra_fields: If True, keys that do not name a form field
                are ignored instead of reported. Values for fields that do
                exist are still type-checked.

        Returns:
            List of validation error messages (empty if valid).

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
        """
        pdf_path = Path(pdf_path)
        self._validate_pdf_path(pdf_path)

        errors: list[str] = []

        try:
            form_data_obj = self.extract(pdf_path)
        except PDFFormNotFoundError:
            return ["PDF does not contain a form"]

        fields_by_name = {f.name: f for f in form_data_obj.fields}

        for field_name, value in form_data.items():
            field = fields_by_name.get(field_name)
            if field is None:
                # Unknown keys are an error only when extras are disallowed.
                if not allow_extra_fields:
                    errors.append(f"Field not found in form: '{field_name}'")
                continue

            # Checkboxes are the only type-checked fields; others accept any value.
            if field.field_type == "checkbox" and not isinstance(value, bool):
                errors.append(
                    f"Field '{field_name}': checkbox value must be boolean, "
                    f"got {type(value).__name__}"
                )

        if strict:
            # Strict mode: every field of the form must appear in the input.
            provided_names = set(form_data)
            errors.extend(
                f"Required field not provided: '{field.name}'"
                for field in form_data_obj.fields
                if field.name not in provided_names
            )

        return errors

    def _convert_to_pdfcpu_format(
        self, pdf_path: str | Path, form_data: dict[str, Any]
    ) -> dict[str, Any]:
        """Convert simple key:value format to pdfcpu export format.

        This is an internal helper method used by fill_form.

        Args:
            pdf_path: Path to the PDF file.
            form_data: Simple format data {"Field Name": value}.

        Returns:
            pdfcpu format data structure: a ``header`` plus one entry in
            ``forms`` holding a list per field type.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFFormNotFoundError: If the PDF does not contain a form.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
        """
        # The existing form supplies each field's type, id and pages.
        form_data_obj = self.extract(pdf_path)
        fields_by_name = {f.name: f for f in form_data_obj.fields}

        form: dict[str, list[dict[str, Any]]] = {
            field_type: [] for field_type in self._FIELD_TYPES
        }
        pdfcpu_data: dict[str, Any] = {
            "header": {
                "source": str(pdf_path),
                "version": "pdfcpu",
            },
            "forms": [form],
        }

        for field_name, value in form_data.items():
            field = fields_by_name.get(field_name)
            if field is None:
                continue  # Skip unknown fields, validation will catch them

            field_entry: dict[str, Any] = {
                "pages": field.pages,
                "id": field.id,
                "name": field.name,
                "value": value,
                "locked": False,
            }

            if field.field_type == "datefield":
                field_entry["format"] = "yyyy-m-d"  # Default format

            if field.field_type == "radiobuttongroup":
                # NOTE(review): FormField declares no ``options`` attribute,
                # so this currently always yields [] — confirm whether the
                # options should be carried over from the raw export.
                field_entry["options"] = getattr(field, "options", [])

            form[field.field_type].append(field_entry)

        return pdfcpu_data

    def fill_form(
        self,
        pdf_path: str | Path,
        form_data: dict[str, Any],
        output_path: str | Path | None = None,
        *,
        validate: bool = True,
    ) -> Path:
        """Fill a PDF form with data.

        This method accepts form data in simple key:value format where keys are
        field names and values are the values to fill.

        Args:
            pdf_path: Path to the PDF file containing the form.
            form_data: The form data to fill (format: {"Field Name": value}).
            output_path: Optional output path. If not provided, the input PDF
                is modified in place (pdfcpu default behavior).
            validate: If True, validates form data before filling.

        Returns:
            Path to the filled PDF (output_path or pdf_path if no output specified).

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            FormValidationError: If validation fails and validate=True.
            PDFCPUExecutionError: If pdfcpu fails to fill the form.
            PDFFormNotFoundError: If the PDF does not contain a form.

        Example:
            >>> form_data = {"Candidate Name": "John Smith", "Full time": True}
            >>> extractor.fill_form("form.pdf", form_data, "filled.pdf")
        """
        pdf_path = Path(pdf_path)
        self._validate_pdf_path(pdf_path)

        if not self.has_form(pdf_path):
            raise PDFFormNotFoundError(f"PDF does not contain a form: {pdf_path}")

        if validate:
            errors = self.validate_form_data(pdf_path, form_data)
            if errors:
                raise FormValidationError("Form data validation failed", errors)

        pdfcpu_data = self._convert_to_pdfcpu_format(pdf_path, form_data)

        with self._temporary_json_path() as tmp_path:
            # pdfcpu reads the fill data from a JSON file on disk.
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(pdfcpu_data, f, indent=2)

            args = ["form", "fill", str(pdf_path), str(tmp_path)]
            if output_path:
                args.append(str(output_path))

            result = self._run_command(args, check=False)
            if result.returncode != 0:
                raise PDFCPUExecutionError(
                    f"Failed to fill form in {pdf_path}",
                    result.returncode,
                    self._sanitize_stderr(result.stderr),
                )

        return Path(output_path) if output_path else pdf_path

    def fill_form_from_json(
        self,
        pdf_path: str | Path,
        json_path: str | Path,
        output_path: str | Path | None = None,
        *,
        validate: bool = True,
    ) -> Path:
        """Fill a PDF form with data from a JSON file.

        The JSON file should contain simple key:value pairs where keys are
        field names and values are the values to fill.

        Args:
            pdf_path: Path to the PDF file containing the form.
            json_path: Path to the JSON file with form data.
            output_path: Optional output path. If not provided, the input PDF
                is modified in place.
            validate: If True, validates form data before filling.

        Returns:
            Path to the filled PDF.

        Raises:
            FileNotFoundError: If any file does not exist.
            FormValidationError: If validation fails and validate=True.
            PDFCPUExecutionError: If pdfcpu fails to fill the form.
        """
        pdf_path = Path(pdf_path)
        json_path = Path(json_path)

        self._validate_pdf_path(pdf_path)
        if not json_path.exists():
            raise FileNotFoundError(f"JSON file not found: {json_path}")
        if not json_path.is_file():
            raise FileNotFoundError(f"Path is not a file: {json_path}")

        with open(json_path, encoding="utf-8") as f:
            form_data: dict[str, Any] = json.load(f)

        return self.fill_form(pdf_path, form_data, output_path, validate=validate)

    def _validate_pdf_path(self, pdf_path: Path) -> None:
        """Validate that the PDF path exists and is a file.

        Args:
            pdf_path: Path to validate.

        Raises:
            FileNotFoundError: If the path does not exist or is not a file.
        """
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
        if not pdf_path.is_file():
            raise FileNotFoundError(f"Path is not a file: {pdf_path}")

    def _parse_form_data(self, pdf_path: Path, raw_data: dict[str, Any]) -> PDFFormData:
        """Parse raw form data from pdfcpu into structured format.

        Only the first entry of ``raw_data["forms"]`` is read. Missing keys
        fall back to empty/neutral defaults.

        Args:
            pdf_path: Path to the PDF file.
            raw_data: Raw JSON data from pdfcpu.

        Returns:
            Parsed PDFFormData object; ``has_form`` is True when at least
            one field was parsed.
        """
        header = raw_data.get("header", {})
        pdf_version = header.get("version", "unknown")

        fields: list[FormField] = []

        forms = raw_data.get("forms", [])
        if forms:
            form = forms[0]
            for field_type in self._FIELD_TYPES:
                for field_data in form.get(field_type, []):
                    fields.append(
                        FormField(
                            field_type=field_type,
                            pages=field_data.get("pages", []),
                            # Normalise the id to a string whatever its JSON type.
                            id=str(field_data.get("id", "")),
                            name=field_data.get("name", ""),
                            value=field_data.get("value", ""),
                            locked=field_data.get("locked", False),
                        )
                    )

        return PDFFormData(
            source=pdf_path,
            pdf_version=pdf_version,
            has_form=len(fields) > 0,
            fields=fields,
            raw_data=raw_data,
        )