Coverage for privacyforms_pdf / extractor.py: 99%
223 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-06 21:04 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-06 21:04 +0100
1"""PDF Form Extractor module using pdfcpu."""
3from __future__ import annotations
5import json
6import shutil
7import subprocess
8import tempfile
9from contextlib import contextmanager
10from dataclasses import dataclass
11from pathlib import Path
12from typing import TYPE_CHECKING, Any
14if TYPE_CHECKING:
15 from collections.abc import Iterator, Sequence
class PDFCPUError(Exception):
    """Base exception for pdfcpu related errors.

    Every other exception defined in this module derives from this class, so
    callers can catch ``PDFCPUError`` to handle all of them at once.
    """
class PDFCPUNotFoundError(PDFCPUError):
    """Raised when pdfcpu is not found on the system."""
class PDFCPUExecutionError(PDFCPUError):
    """Raised when pdfcpu execution fails.

    Attributes:
        returncode: The return code from the process.
        stderr: Standard error output from the process.
    """

    def __init__(self, message: str, returncode: int, stderr: str = "") -> None:
        """Initialize the error with execution details.

        Args:
            message: Error message.
            returncode: The return code from the process.
            stderr: Standard error output from the process.
        """
        super().__init__(message)
        self.returncode = returncode
        self.stderr = stderr
class PDFFormNotFoundError(PDFCPUError):
    """Raised when the PDF does not contain any forms."""
class FormValidationError(PDFCPUError):
    """Raised when form data validation fails.

    Attributes:
        message: The summary error message.
        errors: List of specific validation errors (empty if none were given).
    """

    def __init__(self, message: str, errors: list[str] | None = None) -> None:
        """Initialize the error with validation details.

        Args:
            message: Error message.
            errors: List of specific validation errors.
        """
        super().__init__(message)
        self.message = message
        # Copy so later mutation of the caller's list cannot alter this error.
        self.errors: list[str] = list(errors) if errors else []

    def __str__(self) -> str:
        """Return the message, followed by one ``- `` bullet line per error."""
        if self.errors:
            return f"{self.message}\n- " + "\n- ".join(self.errors)
        return self.message
class FieldNotFoundError(PDFCPUError):
    """Raised when a field is not found in the form."""
@dataclass(frozen=True)
class FormField:
    """Represents a single form field.

    Instances are frozen: attributes cannot be reassigned after construction.

    Attributes:
        field_type: The type of the form field (e.g., 'textfield', 'checkbox').
        pages: List of pages where this field appears.
        id: The unique identifier of the field.
        name: The name of the field.
        value: The current value of the field.
        locked: Whether the field is locked.
    """

    field_type: str
    pages: list[int]
    id: str
    name: str
    value: str | bool
    locked: bool
@dataclass(frozen=True)
class PDFFormData:
    """Represents extracted PDF form data.

    Instances are frozen: attributes cannot be reassigned after construction.

    Attributes:
        source: Path to the source PDF file.
        pdf_version: Version of the PDF.
        has_form: Whether the PDF contains a form.
        fields: List of form fields.
        raw_data: The raw JSON data from pdfcpu.
    """

    source: Path
    pdf_version: str
    has_form: bool
    fields: list[FormField]
    raw_data: dict[str, Any]
class PDFFormExtractor:
    """Extracts form information from PDF files using pdfcpu.

    This class provides methods to extract form data from PDF files.
    It wraps the pdfcpu command-line tool and provides a Pythonic interface.

    Example:
        >>> extractor = PDFFormExtractor()
        >>> form_data = extractor.extract("form.pdf")
        >>> for field in form_data.fields:
        ...     print(f"{field.name}: {field.value}")

    Raises:
        PDFCPUNotFoundError: If pdfcpu is not installed on the system.
    """

    # Field-type keys read from / written to a pdfcpu form JSON document.
    # Shared by parsing and conversion so both always agree on the set and order.
    _FIELD_TYPES: tuple[str, ...] = (
        "textfield",
        "datefield",
        "checkbox",
        "radiobuttongroup",
        "combobox",
        "listbox",
    )

    def __init__(self, pdfcpu_path: str | None = None, timeout_seconds: float = 30.0) -> None:
        """Initialize the extractor.

        Args:
            pdfcpu_path: Optional path to the pdfcpu executable.
                If not provided, searches in system PATH.
            timeout_seconds: Timeout for pdfcpu command execution.

        Raises:
            PDFCPUNotFoundError: If pdfcpu is not found on the system.
        """
        resolved_path = pdfcpu_path or self._find_pdfcpu()
        if not resolved_path:
            raise PDFCPUNotFoundError(
                "pdfcpu not found. Please install pdfcpu: https://pdfcpu.io/install"
            )
        self._pdfcpu_path: str = resolved_path
        self._timeout_seconds: float = timeout_seconds

    @staticmethod
    def _find_pdfcpu() -> str | None:
        """Find the pdfcpu executable in the system PATH.

        Returns:
            Path to pdfcpu executable, or None if not found.
        """
        return shutil.which("pdfcpu")

    def _run_command(
        self, args: Sequence[str], check: bool = True
    ) -> subprocess.CompletedProcess[str]:
        """Run a pdfcpu command.

        Args:
            args: Command arguments.
            check: Whether to check the return code.

        Returns:
            The completed process.

        Raises:
            PDFCPUExecutionError: If the command times out, or if ``check`` is
                True and the command exits with a non-zero return code.
            PDFCPUNotFoundError: If pdfcpu is not found.
        """
        cmd: list[str] = [self._pdfcpu_path, *args]
        # Keep the try body to the one call that can raise the handled errors.
        try:
            result: subprocess.CompletedProcess[str] = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=False,
                timeout=self._timeout_seconds,
            )
        except FileNotFoundError as e:
            raise PDFCPUNotFoundError(f"pdfcpu not found at {self._pdfcpu_path}") from e
        except subprocess.TimeoutExpired as e:
            raise PDFCPUExecutionError(
                f"pdfcpu command timed out after {self._timeout_seconds:.1f}s",
                -1,
                self._sanitize_stderr(e.stderr),
            ) from e

        if check and result.returncode != 0:
            raise PDFCPUExecutionError(
                "pdfcpu command failed",
                result.returncode,
                self._sanitize_stderr(result.stderr),
            )
        return result

    def _run_checked(
        self, args: Sequence[str], failure_message: str
    ) -> subprocess.CompletedProcess[str]:
        """Run a pdfcpu command and raise with a specific message on failure.

        Args:
            args: Command arguments.
            failure_message: Message for the error raised on a non-zero exit.

        Returns:
            The completed process (return code is always 0).

        Raises:
            PDFCPUExecutionError: If the command exits non-zero or times out.
            PDFCPUNotFoundError: If pdfcpu is not found.
        """
        result = self._run_command(args, check=False)
        if result.returncode != 0:
            raise PDFCPUExecutionError(
                failure_message,
                result.returncode,
                self._sanitize_stderr(result.stderr),
            )
        return result

    @staticmethod
    def _sanitize_stderr(stderr: str | bytes | None) -> str:
        """Return a bounded stderr string suitable for end-user messages.

        Bytes are decoded as UTF-8 with replacement characters; the text is
        stripped and truncated to at most 500 characters. None yields "".
        """
        if stderr is None:
            return ""
        text = stderr.decode("utf-8", errors="replace") if isinstance(stderr, bytes) else stderr
        return text.strip()[:500]

    @contextmanager
    def _temporary_json_path(self) -> Iterator[Path]:
        """Create a temporary JSON path and ensure cleanup.

        Yields a ``form.json`` path inside a fresh temporary directory; the
        directory (and anything written into it) is removed on exit.
        """
        with tempfile.TemporaryDirectory(prefix="privacyforms_pdf_") as tmp_dir:
            yield Path(tmp_dir) / "form.json"

    def check_pdfcpu(self) -> bool:
        """Check if pdfcpu is available and working.

        Returns:
            True if pdfcpu is available, False otherwise.
        """
        try:
            result = self._run_command(["version"], check=False)
        except PDFCPUError:
            return False
        return result.returncode == 0 and "pdfcpu" in result.stdout

    def get_pdfcpu_version(self) -> str:
        """Get the installed pdfcpu version.

        Returns:
            The version string of pdfcpu.

        Raises:
            PDFCPUExecutionError: If the version command fails.
        """
        return self._run_command(["version"]).stdout.strip()

    def has_form(self, pdf_path: str | Path) -> bool:
        """Check if a PDF contains a form.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            True if the PDF contains a form, False otherwise.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If the pdfcpu command fails.
        """
        pdf_path = Path(pdf_path)
        self._validate_pdf_path(pdf_path)

        # stderr is sanitized by the helper, matching every other call site.
        result = self._run_checked(
            ["info", str(pdf_path)],
            f"Failed to get PDF info for {pdf_path}",
        )
        return "Form: Yes" in result.stdout

    def extract(self, pdf_path: str | Path) -> PDFFormData:
        """Extract form data from a PDF file.

        This method exports the form data from the PDF using pdfcpu and
        parses it into a structured format.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            PDFFormData containing all form information.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
            PDFFormNotFoundError: If the PDF does not contain a form.
        """
        pdf_path = Path(pdf_path)
        self._validate_pdf_path(pdf_path)

        if not self.has_form(pdf_path):
            raise PDFFormNotFoundError(f"PDF does not contain a form: {pdf_path}")

        with self._temporary_json_path() as tmp_path:
            # Export form data using pdfcpu, then read it back before the
            # temporary directory is removed.
            self._run_checked(
                ["form", "export", str(pdf_path), str(tmp_path)],
                f"Failed to export form data from {pdf_path}",
            )
            with open(tmp_path, encoding="utf-8") as f:
                raw_data: dict[str, Any] = json.load(f)

        return self._parse_form_data(pdf_path, raw_data)

    def extract_to_json(self, pdf_path: str | Path, output_path: str | Path) -> None:
        """Extract form data and save it to a JSON file.

        Args:
            pdf_path: Path to the PDF file.
            output_path: Path where the JSON output should be saved.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
        """
        pdf_path = Path(pdf_path)
        output_path = Path(output_path)
        self._validate_pdf_path(pdf_path)

        self._run_checked(
            ["form", "export", str(pdf_path), str(output_path)],
            f"Failed to export form data from {pdf_path}",
        )

    def list_fields(self, pdf_path: str | Path) -> list[FormField]:
        """List all form fields in a PDF.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            List of FormField objects.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
            PDFFormNotFoundError: If the PDF does not contain a form.
        """
        return self.extract(pdf_path).fields

    def get_field_value(self, pdf_path: str | Path, field_name: str) -> str | bool | None:
        """Get the value of a specific form field.

        Args:
            pdf_path: Path to the PDF file.
            field_name: Name of the field to retrieve.

        Returns:
            The field value, or None if the field is not found.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
            PDFFormNotFoundError: If the PDF does not contain a form.
        """
        field = self.get_field_by_name(pdf_path, field_name)
        return None if field is None else field.value

    def get_field_by_id(self, pdf_path: str | Path, field_id: str) -> FormField | None:
        """Get a form field by its ID.

        Args:
            pdf_path: Path to the PDF file.
            field_id: ID of the field to retrieve.

        Returns:
            The first FormField with a matching ID, or None if not found.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
            PDFFormNotFoundError: If the PDF does not contain a form.
        """
        fields = self.list_fields(pdf_path)
        return next((field for field in fields if field.id == field_id), None)

    def get_field_by_name(self, pdf_path: str | Path, field_name: str) -> FormField | None:
        """Get a form field by its name.

        Args:
            pdf_path: Path to the PDF file.
            field_name: Name of the field to retrieve.

        Returns:
            The first FormField with a matching name, or None if not found.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
            PDFFormNotFoundError: If the PDF does not contain a form.
        """
        fields = self.list_fields(pdf_path)
        return next((field for field in fields if field.name == field_name), None)

    def validate_form_data(
        self,
        pdf_path: str | Path,
        form_data: dict[str, Any],
        *,
        strict: bool = False,
        allow_extra_fields: bool = False,
    ) -> list[str]:
        """Validate form data against PDF form fields.

        This method validates that the provided form data (simple key:value format)
        matches the structure and field types of the PDF form.

        Args:
            pdf_path: Path to the PDF file.
            form_data: The form data to validate (simple format: {"Field Name": value}).
            strict: If True, also checks that all form fields are provided.
            allow_extra_fields: If True, allows fields not present in the form.
                Values for fields that do exist are still type-checked.

        Returns:
            List of validation error messages (empty if valid).

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
        """
        pdf_path = Path(pdf_path)
        self._validate_pdf_path(pdf_path)

        try:
            form_data_obj = self.extract(pdf_path)
        except PDFFormNotFoundError:
            return ["PDF does not contain a form"]

        fields_by_name = {f.name: f for f in form_data_obj.fields}
        errors: list[str] = []

        for field_name, value in form_data.items():
            field = fields_by_name.get(field_name)
            if field is None:
                # Unknown names are only an error when extras are disallowed;
                # either way there is no field type to check the value against.
                if not allow_extra_fields:
                    errors.append(f"Field not found in form: '{field_name}'")
                continue

            # Validate value type matches field type
            if field.field_type == "checkbox" and not isinstance(value, bool):
                errors.append(
                    f"Field '{field_name}': checkbox value must be boolean, "
                    f"got {type(value).__name__}"
                )

        # In strict mode, check all form fields are provided
        if strict:
            provided_names = set(form_data)
            errors.extend(
                f"Required field not provided: '{field.name}'"
                for field in form_data_obj.fields
                if field.name not in provided_names
            )

        return errors

    def _convert_to_pdfcpu_format(
        self, pdf_path: str | Path, form_data: dict[str, Any]
    ) -> dict[str, Any]:
        """Convert simple key:value format to pdfcpu export format.

        This is an internal helper method used by fill_form.

        Args:
            pdf_path: Path to the PDF file.
            form_data: Simple format data {"Field Name": value}.

        Returns:
            pdfcpu format data structure.

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            PDFFormNotFoundError: If the PDF does not contain a form.
            PDFCPUExecutionError: If pdfcpu fails to process the PDF.
        """
        # Get form fields to determine field types
        form_data_obj = self.extract(pdf_path)
        fields_by_name = {f.name: f for f in form_data_obj.fields}

        # One (initially empty) list per known field type.
        form: dict[str, list[dict[str, Any]]] = {
            field_type: [] for field_type in self._FIELD_TYPES
        }
        pdfcpu_data: dict[str, Any] = {
            "header": {
                "source": str(pdf_path),
                "version": "pdfcpu",
            },
            "forms": [form],
        }

        for field_name, value in form_data.items():
            field = fields_by_name.get(field_name)
            if field is None:
                continue  # Skip unknown fields, validation will catch them

            field_entry: dict[str, Any] = {
                "pages": field.pages,
                "id": field.id,
                "name": field.name,
                "value": value,
                "locked": False,
            }

            # Add type-specific attributes
            if field.field_type == "datefield":
                field_entry["format"] = "yyyy-m-d"  # Default format
            if field.field_type == "radiobuttongroup":
                # FormField declares no ``options`` attribute, so this falls
                # back to an empty list unless the object provides one.
                field_entry["options"] = getattr(field, "options", [])

            form[field.field_type].append(field_entry)

        return pdfcpu_data

    def fill_form(
        self,
        pdf_path: str | Path,
        form_data: dict[str, Any],
        output_path: str | Path | None = None,
        *,
        validate: bool = True,
    ) -> Path:
        """Fill a PDF form with data.

        This method accepts form data in simple key:value format where keys are
        field names and values are the values to fill.

        Args:
            pdf_path: Path to the PDF file containing the form.
            form_data: The form data to fill (format: {"Field Name": value}).
            output_path: Optional output path. If not provided, the input PDF
                is modified in place (pdfcpu default behavior).
            validate: If True, validates form data before filling.

        Returns:
            Path to the filled PDF (output_path or pdf_path if no output specified).

        Raises:
            FileNotFoundError: If the PDF file does not exist.
            FormValidationError: If validation fails and validate=True.
            PDFCPUExecutionError: If pdfcpu fails to fill the form.
            PDFFormNotFoundError: If the PDF does not contain a form.

        Example:
            >>> form_data = {"Candidate Name": "John Smith", "Full time": True}
            >>> extractor.fill_form("form.pdf", form_data, "filled.pdf")
        """
        pdf_path = Path(pdf_path)
        self._validate_pdf_path(pdf_path)

        if not self.has_form(pdf_path):
            raise PDFFormNotFoundError(f"PDF does not contain a form: {pdf_path}")

        if validate:
            errors = self.validate_form_data(pdf_path, form_data)
            if errors:
                raise FormValidationError("Form data validation failed", errors)

        # Convert simple format to pdfcpu format for the fill command
        pdfcpu_data = self._convert_to_pdfcpu_format(pdf_path, form_data)

        with self._temporary_json_path() as tmp_path:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(pdfcpu_data, f, indent=2)

            args = ["form", "fill", str(pdf_path), str(tmp_path)]
            if output_path:
                args.append(str(output_path))

            self._run_checked(args, f"Failed to fill form in {pdf_path}")

        return Path(output_path) if output_path else pdf_path

    def fill_form_from_json(
        self,
        pdf_path: str | Path,
        json_path: str | Path,
        output_path: str | Path | None = None,
        *,
        validate: bool = True,
    ) -> Path:
        """Fill a PDF form with data from a JSON file.

        The JSON file should contain simple key:value pairs where keys are
        field names and values are the values to fill.

        Args:
            pdf_path: Path to the PDF file containing the form.
            json_path: Path to the JSON file with form data.
            output_path: Optional output path. If not provided, the input PDF
                is modified in place.
            validate: If True, validates form data before filling.

        Returns:
            Path to the filled PDF.

        Raises:
            FileNotFoundError: If any file does not exist.
            FormValidationError: If the JSON document is not an object, or if
                validation fails and validate=True.
            PDFCPUExecutionError: If pdfcpu fails to fill the form.
        """
        pdf_path = Path(pdf_path)
        json_path = Path(json_path)

        self._validate_pdf_path(pdf_path)
        if not json_path.exists():
            raise FileNotFoundError(f"JSON file not found: {json_path}")
        if not json_path.is_file():
            raise FileNotFoundError(f"Path is not a file: {json_path}")

        with open(json_path, encoding="utf-8") as f:
            form_data = json.load(f)

        # A list or scalar at the top level would otherwise fail later with an
        # AttributeError on ``.items()``; report it as a validation problem.
        if not isinstance(form_data, dict):
            raise FormValidationError(
                "Form data validation failed",
                [
                    "JSON document must be an object mapping field names to values, "
                    f"got {type(form_data).__name__}"
                ],
            )

        return self.fill_form(pdf_path, form_data, output_path, validate=validate)

    def _validate_pdf_path(self, pdf_path: Path) -> None:
        """Validate that the PDF path exists and is a file.

        Args:
            pdf_path: Path to validate.

        Raises:
            FileNotFoundError: If the path does not exist or is not a file.
        """
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
        if not pdf_path.is_file():
            raise FileNotFoundError(f"Path is not a file: {pdf_path}")

    def _parse_form_data(self, pdf_path: Path, raw_data: dict[str, Any]) -> PDFFormData:
        """Parse raw form data from pdfcpu into structured format.

        Only the first entry of ``raw_data["forms"]`` is read. Missing keys
        fall back to defaults ("unknown" version, empty strings, False).

        Args:
            pdf_path: Path to the PDF file.
            raw_data: Raw JSON data from pdfcpu.

        Returns:
            Parsed PDFFormData object.
        """
        # ``or`` guards against explicit JSON nulls as well as missing keys.
        header = raw_data.get("header") or {}
        pdf_version = header.get("version", "unknown")

        fields: list[FormField] = []

        forms = raw_data.get("forms") or []
        if forms:
            form = forms[0]
            for field_type in self._FIELD_TYPES:
                for field_data in form.get(field_type) or []:
                    fields.append(
                        FormField(
                            field_type=field_type,
                            pages=field_data.get("pages", []),
                            id=str(field_data.get("id", "")),
                            name=field_data.get("name", ""),
                            value=field_data.get("value", ""),
                            locked=field_data.get("locked", False),
                        )
                    )

        return PDFFormData(
            source=pdf_path,
            pdf_version=pdf_version,
            has_form=len(fields) > 0,
            fields=fields,
            raw_data=raw_data,
        )