mrblack

 1#!/usr/bin/env python3
 2# -*- coding: utf-8 -*-
 3#
 4# File: __init__.py
 5# Author: Wadih Khairallah
 6# Description: 
 7# Created: 2025-05-12 16:47:22
 8# Modified: 2025-05-14 18:49:23
 9
10from .pii import (
11    extract_pii_text,
12    extract_pii_file,
13    extract_pii_url,
14    extract_pii_image,
15    extract_pii_screenshot
16)
17from .textextract import (
18    extract_text,
19    extract_exif,
20    extract_metadata,
21    text_from_screenshot,
22    text_from_url,
23    text_from_audio,
24    text_from_pdf,
25    text_from_doc,
26    text_from_docx,
27    text_from_excel,
28    text_from_image,
29    text_from_any
30)
31
# Names exported by `from <package> import *`; every public helper imported
# above is listed, including text_from_screenshot.
__all__ = [
    "extract_pii_text",
    "extract_pii_file",
    "extract_pii_url",
    "extract_pii_image",
    "extract_pii_screenshot",
    "extract_text",
    "extract_exif",
    "extract_metadata",
    "text_from_screenshot",
    "text_from_url",
    "text_from_audio",
    "text_from_pdf",
    "text_from_doc",
    "text_from_docx",
    "text_from_excel",
    "text_from_image",
    "text_from_any",
]
def extract_pii_text( text: str, labels: Union[List[str], str, NoneType] = None) -> Dict[str, List[str]]:
 71def extract_pii_text(
 72    text: str,
 73    labels: Optional[Union[List[str], str]] = None
 74) -> Dict[str, List[str]]:
 75    """
 76    Extract PII matches from provided text.
 77
 78    Args:
 79        text (str): The input text to scan for patterns.
 80        labels (Optional[Union[List[str], str]]): Specific labels to filter on.
 81
 82    Returns:
 83        Dict[str, List[str]]: Mapping of each label to a sorted list of
 84        matched and cleaned strings.
 85    """
 86    if isinstance(labels, str):
 87        labels = [labels]
 88    patterns = PATTERNS
 89    if labels:
 90        patterns = [
 91            p for p in PATTERNS
 92            if any(re.search(rf"\(\?P<{lbl}>", p) for lbl in labels)
 93        ]
 94    results: Dict[str, set] = defaultdict(set)
 95    for pattern in patterns:
 96        try:
 97            rx = re.compile(pattern)
 98            for m in rx.finditer(text):
 99                for lbl, val in m.groupdict().items():
100                    if not val:
101                        continue
102                    cleaned = _clean_value(lbl, val)
103                    if lbl == "url":
104                        cleaned = cleaned.rstrip("),.**")
105                    if cleaned is not None:
106                        results[lbl].add(cleaned)
107        except re.error as e:
108            print(
109                f"Invalid regex skipped: {pattern}{e}",
110                file=sys.stderr
111            )
112    return {lbl: sorted(vals) for lbl, vals in results.items()}

Extract PII matches from provided text.

Args: text (str): The input text to scan for patterns. labels (Optional[Union[List[str], str]]): Specific labels to filter on.

Returns: Dict[str, List[str]]: Mapping of each label to a sorted list of matched and cleaned strings.

def extract_pii_file( file_path: str, labels: Union[List[str], str, NoneType] = None) -> Optional[Dict[str, List[str]]]:
def extract_pii_file(
    file_path: str,
    labels: Optional[Union[List[str], str]] = None
) -> Optional[Dict[str, List[str]]]:
    """
    Run PII extraction over the text content of one file.

    Args:
        file_path (str): Path handed to extract_text.
        labels (Optional[Union[List[str], str]]): Labels to filter.

    Returns:
        Optional[Dict[str, List[str]]]: The non-empty extraction results,
        or None when no text was extracted or nothing matched.
    """
    content = extract_text(file_path)
    if content:
        matches = extract_pii_text(content, labels)
        if matches:
            return matches
    return None

Extract PII from a single file's text content.

Args: file_path (str): Path to the file. labels (Optional[Union[List[str], str]]): Labels to filter.

Returns: Optional[Dict[str, List[str]]]: Extraction results, or None.

def extract_pii_url( path: str, labels: Union[List[str], str, NoneType] = None) -> Optional[Dict[str, List[str]]]:
def extract_pii_url(
    path: str,
    labels: Optional[Union[List[str], str]] = None
) -> Optional[Dict[str, List[str]]]:
    """
    Run PII extraction over the text fetched from a URL.

    Args:
        path (str): The URL passed to text_from_url.
        labels (Optional[Union[List[str], str]]): Labels to filter.

    Returns:
        Optional[Dict[str, List[str]]]: The non-empty extraction results,
        or None when no text was fetched or nothing matched.
    """
    page_text = text_from_url(path)
    if not page_text:
        return None
    found = extract_pii_text(page_text, labels)
    return found if found else None

Extract PII from the text at a URL.

Args: path (str): The URL to fetch. labels (Optional[Union[List[str], str]]): Labels to filter.

Returns: Optional[Dict[str, List[str]]]: Extraction results, or None.

def extract_pii_image( image_path: str, labels: Union[List[str], str, NoneType] = None) -> Optional[Dict[str, List[str]]]:
def extract_pii_image(
    image_path: str,
    labels: Optional[Union[List[str], str]] = None
) -> Optional[Dict[str, List[str]]]:
    """
    Run PII extraction over the text recovered from an image file.

    The path is resolved with clean_path and must point at a regular file;
    the text itself is obtained through extract_text.

    Args:
        image_path (str): Path to the image file.
        labels (Optional[Union[List[str], str]]): Labels to filter.

    Returns:
        Optional[Dict[str, List[str]]]: The non-empty extraction results,
        or None for an invalid path, no text, or no matches.
    """
    resolved = clean_path(image_path)
    if not (resolved and os.path.isfile(resolved)):
        print(f"[red]Invalid image path:[/] {image_path}")
        return None

    ocr_text = extract_text(resolved)
    if not ocr_text:
        return None

    found = extract_pii_text(ocr_text, labels)
    if not found:
        return None
    return found

Extract PII from an image using OCR.

Args: image_path (str): Path to the image file. labels (Optional[Union[List[str], str]]): Labels to filter.

Returns: Optional[Dict[str, List[str]]]: Extraction results, or None.

def extract_pii_screenshot( labels: Union[List[str], str, NoneType] = None) -> Optional[Dict[str, List[str]]]:
def extract_pii_screenshot(
    labels: Optional[Union[List[str], str]] = None
) -> Optional[Dict[str, List[str]]]:
    """
    Run PII extraction over the text returned by text_from_screenshot.

    Args:
        labels (Optional[Union[List[str], str]]): Labels to filter.

    Returns:
        Optional[Dict[str, List[str]]]: The non-empty extraction results,
        or None when no text was captured or nothing matched.
    """
    captured = text_from_screenshot()
    if captured:
        matches = extract_pii_text(captured, labels)
        if matches:
            return matches
    return None

Capture a screenshot and extract PII from its OCR text.

Args: labels (Optional[Union[List[str], str]]): Labels to filter.

Returns: Optional[Dict[str, List[str]]]: Extraction results, or None.

def extract_text(file_path: str) -> Optional[str]:
def extract_text(
    file_path: str
) -> Optional[str]:
    """
    Extract text content from a local file or URL.

    URLs are delegated to text_from_url. Local files are dispatched on the
    MIME type reported by magic: text-like types are read directly, Excel,
    PDF, DOCX, DOC, image and audio files go to their dedicated extractors,
    and anything else falls back to text_from_any.

    Args:
        file_path (str): Path to the input file or URL.

    Returns:
        Optional[str]: Extracted text, or None if the path cannot be
        resolved, no content was found, or an error occurred.
    """
    if is_url(file_path):
        return text_from_url(file_path)

    # Non-"text/*" MIME types that are still read as plain text.
    TEXT_MIME_TYPES = {
        "application/json", "application/xml", "application/x-yaml",
        "application/x-toml", "application/x-csv", "application/x-markdown",
    }

    path = clean_path(file_path)
    if not path:
        print(f"No such file: {file_path}")
        return None

    try:
        # MIME detection sits inside the try so a detection failure is
        # reported and turned into None like any other read error.
        mime_type = magic.from_file(path, mime=True)
        if mime_type.startswith("text/") or mime_type in TEXT_MIME_TYPES:
            with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
        elif mime_type in [
            "application/vnd.ms-excel",
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        ]:
            content = text_from_excel(path)
        elif mime_type == "application/pdf":
            content = text_from_pdf(path)
        elif mime_type == \
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
            content = text_from_docx(path)
        elif mime_type == "application/msword":
            content = text_from_doc(path)
        elif mime_type.startswith("image/"):
            content = text_from_image(path)
        elif mime_type.startswith("audio/"):
            content = text_from_audio(path)
        else:
            content = text_from_any(path)

        if content:
            return content
        print(f"No content found for file: {path}")
        return None
    except Exception as e:
        print(f"Error reading {path}: {e}")
        return None

Extract text content from a local file or URL.

Supports web pages, text, JSON, XML, CSV, Excel, PDF, DOCX, images, audio.

Args: file_path (str): Path to the input file or URL.

Returns: Optional[str]: Extracted text, or None if unsupported or error.

def extract_exif(file_path: str) -> Optional[Dict[str, Any]]:
def extract_exif(
    file_path: str
) -> Optional[Dict[str, Any]]:
    """
    Read a file's metadata by running `exiftool -j` on it.

    Args:
        file_path (str): Path to the target file.

    Returns:
        Optional[Dict[str, Any]]: The first object of exiftool's JSON
        output, or None when the command exits non-zero or anything fails.
    """
    try:
        proc = subprocess.run(
            ['exiftool', '-j', file_path],
            capture_output=True
        )
        if proc.returncode != 0:
            return None
        # exiftool emits a JSON array; only its first entry is returned.
        records = json.loads(proc.stdout.decode())
        return records[0]
    except Exception as e:
        print(f"Exiftool failed: {e}")
        return None

Extract EXIF metadata from a file using exiftool.

Args: file_path (str): Path to the target file.

Returns: Optional[Dict[str, Any]]: Parsed EXIF data, or None on failure.

def extract_metadata(file_path: str) -> Dict[str, Any]:
def extract_metadata(
    file_path: str
) -> Dict[str, Any]:
    """
    Collect size, MIME type and MD5 hash for a file.

    Args:
        file_path (str): Path to target file.

    Returns:
        Dict[str, Any]: Contains "size_bytes", "mime" and
        "hashes" ({"md5": ...}) on success. If the path cannot be resolved
        the result is {"error": "File not found"}. If a step fails, the
        values gathered so far are kept and an "error" message is added.
    """
    path = clean_path(file_path)
    if not path:
        return {"error": "File not found"}
    meta: Dict[str, Any] = {}
    try:
        stats = os.stat(path)
        meta["size_bytes"] = stats.st_size
        meta["mime"] = magic.from_file(path, mime=True)
        # Hash in fixed-size chunks inside a context manager so the handle
        # is always closed and large files are not loaded into memory at once.
        digest = hashlib.md5()
        with open(path, 'rb') as fh:
            for chunk in iter(lambda: fh.read(1 << 20), b""):
                digest.update(chunk)
        meta["hashes"] = {"md5": digest.hexdigest()}
    except Exception as e:
        meta["error"] = str(e)
    return meta

Extract comprehensive metadata from any file type.

Args: file_path (str): Path to target file.

Returns: Dict[str, Any]: Nested metadata structure.

def text_from_url(url: str) -> Optional[str]:
def text_from_url(
    url: str
) -> Optional[str]:
    """
    Download a web page and return its text with non-content tags removed.

    Args:
        url (str): The target webpage URL.

    Returns:
        Optional[str]: Normalized page text, or None when the request fails.
    """
    # Elements dropped from the parsed page before its text is collected.
    removed_tags = [
        "script", "style", "noscript", "iframe",
        "header", "footer", "meta", "link",
    ]
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        for element in soup.find_all(removed_tags):
            element.decompose()
        page_text = soup.get_text(separator=" ")
        return normalize(page_text.strip())
    except requests.RequestException as e:
        print(f"Error fetching URL: {url} - {e}")
        return None

Fetch and extract visible text from a web page.

Args: url (str): The target webpage URL.

Returns: Optional[str]: Extracted text, or None on failure.

def text_from_audio(audio_file: str) -> Optional[str]:
def text_from_audio(
    audio_file: str
) -> Optional[str]:
    """
    Transcribe an audio file with the recognizer's Google backend.

    Files whose extension is not .wav/.wave are first converted to a
    temporary WAV file, which is removed again before returning.

    Args:
        audio_file (str): Path to the input audio file.

    Returns:
        Optional[str]: Transcription, or None on failure.
    """
    def convert_to_wav(file_path: str) -> str:
        # The source format is taken from the extension, without its dot.
        suffix = os.path.splitext(file_path)[1].lstrip('.')
        segment = AudioSegment.from_file(file_path, format=suffix)
        wav_path = os.path.join(
            tempfile.gettempdir(), f"audio_{uuid4().hex}.wav"
        )
        segment.export(wav_path, format='wav')
        return wav_path

    recognizer = sr.Recognizer()
    wav_source = None
    is_temporary = False

    try:
        extension = os.path.splitext(audio_file)[1].lower()
        if extension in ('.wav', '.wave'):
            wav_source = clean_path(audio_file)
        else:
            wav_source = convert_to_wav(audio_file)
            is_temporary = True

        if not wav_source:
            print("Invalid audio path.")
            return None

        with sr.AudioFile(wav_source) as source:
            recording = recognizer.record(source)
        return recognizer.recognize_google(recording)

    except sr.UnknownValueError:
        print("Could not understand audio.")
    except sr.RequestError as e:
        print(f"Speech recognition error: {e}")
    except Exception as e:
        print(f"Failed to process audio: {e}")
    finally:
        # Only a WAV file created by the conversion above is deleted.
        if is_temporary and wav_source and os.path.exists(wav_source):
            try:
                os.remove(wav_source)
            except Exception as e:
                print(f"Failed to delete temp WAV file {wav_source}: {e}")

    return None

Transcribe audio to text using Google Speech Recognition.

Args: audio_file (str): Path to the input audio file.

Returns: Optional[str]: Transcription, or None on failure.

def text_from_pdf(pdf_path: str) -> Optional[str]:
def text_from_pdf(
    pdf_path: str
) -> Optional[str]:
    """
    Extract text and OCR results from a PDF using PyMuPDF.

    The output starts with the document metadata, followed by each page's
    text under a "--- Page N ---" marker. Every image on a page is written
    to a temporary file, run through text_from_image, and appended under an
    "[Image N OCR]" marker. Temporary images are removed before returning.

    Args:
        pdf_path (str): Path to PDF file.

    Returns:
        Optional[str]: Combined normalized text and image OCR results, or
        None if the PDF cannot be opened or processed.
    """
    parts: List[str] = []
    temp_image_paths: List[str] = []
    # Bound before the try so the finally block can tell whether the
    # document was opened; closing an unbound name would raise and replace
    # the intended None result.
    doc = None

    try:
        doc = pymupdf.open(pdf_path)
        for k, v in doc.metadata.items():
            parts.append(f"{k}: {v}\n")

        for i in range(len(doc)):
            page = doc.load_page(i)
            parts.append(f"\n--- Page {i + 1} ---\n")
            text = page.get_text()
            parts.append(text or "[No text]\n")

            for img_index, img in enumerate(page.get_images(full=True), start=1):
                xref = img[0]
                base = doc.extract_image(xref)
                img_bytes = base["image"]

                img_filename = f"pdf_page{i+1}_img{img_index}_{uuid4().hex}.png"
                img_path = os.path.join(tempfile.gettempdir(), img_filename)
                # Register the path before writing so a failed write is
                # still cleaned up.
                temp_image_paths.append(img_path)

                with open(img_path, "wb") as f:
                    f.write(img_bytes)

                ocr = text_from_image(img_path) or ""
                parts.append(f"\n[Image {img_index} OCR]\n{ocr}\n")

        return normalize("".join(parts))
    except Exception as e:
        print(f"Error processing PDF: {e}")
        return None
    finally:
        for path in temp_image_paths:
            if os.path.exists(path):
                try:
                    os.remove(path)
                except Exception as e:
                    print(f"Failed to delete temp image {path}: {e}")
        if doc is not None:
            doc.close()

Extract text and OCR results from a PDF using PyMuPDF.

Args: pdf_path (str): Path to PDF file.

Returns: Optional[str]: Combined normalized text and image OCR results.

def text_from_doc(filepath: str, min_length: int = 4) -> str:
def text_from_doc(
    filepath: str,
    min_length: int = 4
) -> str:
    """
    Pull readable ASCII strings out of a binary Word (.doc) file.

    The file is scanned for runs of printable characters. Duplicate runs
    are dropped keeping first-seen order, runs starting with a known noise
    prefix are skipped, and whitespace inside each remaining run is
    collapsed to single spaces.

    Args:
        filepath (str): Path to .doc file.
        min_length (int): Minimum string length to extract.

    Returns:
        str: The surviving strings joined by newlines and normalized.
    """
    noise_prefixes = ("HYPERLINK", "OLE2", "Normal.dotm")

    with open(filepath, 'rb') as handle:
        raw = handle.read()

    # A run is min_length or more consecutive printable ASCII bytes.
    printable_class = b'[' + re.escape(bytes(string.printable, 'ascii'))
    run_pattern = re.compile(printable_class + b']{%d,}' % min_length)

    # dict.fromkeys removes duplicates while keeping first-seen order.
    unique_runs = dict.fromkeys(
        chunk.decode(errors='ignore').strip()
        for chunk in run_pattern.findall(raw)
    )

    lines = [
        re.sub(r'\s+', ' ', run).strip()
        for run in unique_runs
        if not run.startswith(noise_prefixes)
    ]
    return normalize("\n".join(lines))

Extract readable strings and metadata from binary Word (.doc) files.

Args: filepath (str): Path to .doc file. min_length (int): Minimum string length to extract.

Returns: str: Metadata and text content.

def text_from_docx(file_path: str) -> Optional[str]:
def text_from_docx(
    file_path: str
) -> Optional[str]:
    """
    Extract text, tables, and OCR from embedded images in a DOCX file.

    Non-empty paragraphs come first, then each table under a "[Table]"
    marker with tab-separated cells, then OCR text for every embedded
    image under an "[Image OCR]" marker. Images are written to temporary
    files that are removed before returning.

    Args:
        file_path (str): Path to the .docx file.

    Returns:
        Optional[str]: Normalized full text content, or None if the path
        cannot be resolved or processing fails.
    """
    path = clean_path(file_path)
    if not path:
        return None

    temp_image_paths: List[str] = []
    parts: List[str] = []

    try:
        doc = Document(path)

        for p in doc.paragraphs:
            stripped = p.text.strip()
            if stripped:
                parts.append(stripped + "\n")

        for tbl in doc.tables:
            parts.append("\n[Table]\n")
            for row in tbl.rows:
                row_text = "\t".join(c.text.strip() for c in row.cells)
                parts.append(row_text + "\n")

        for rel_id, rel in doc.part.rels.items():
            if "image" not in rel.target_ref:
                continue
            if rel.is_external:
                # A linked (not embedded) image has no target part; reading
                # it would raise and abort extraction of the whole document.
                continue
            blob = rel.target_part.blob

            img_filename = f"docx_img_{rel_id}_{uuid4().hex}.png"
            img_path = os.path.join(tempfile.gettempdir(), img_filename)
            # Register the path before writing so a failed write is still
            # cleaned up.
            temp_image_paths.append(img_path)

            with open(img_path, "wb") as img_file:
                img_file.write(blob)

            ocr = text_from_image(img_path) or ""
            parts.append(f"\n[Image OCR]\n{ocr}\n")

        return normalize("".join(parts))

    except Exception as e:
        print(f"Error processing DOCX: {e}")
        return None
    finally:
        # A separate loop name keeps the resolved document path intact.
        for tmp_path in temp_image_paths:
            if os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except Exception as e:
                    print(f"Failed to delete temp DOCX image {tmp_path}: {e}")

Extract text, tables, and OCR from embedded images in a DOCX file.

Args: file_path (str): Path to the .docx file.

Returns: Optional[str]: Normalized full text content.

def text_from_excel(file_path: str) -> str:
def text_from_excel(
    file_path: str
) -> str:
    """
    Render the sheet loaded by pandas.read_excel as CSV text.

    Args:
        file_path (str): Path to the Excel file.

    Returns:
        str: CSV-formatted string without the index column, or an empty
        string when the path cannot be resolved or conversion fails.
    """
    resolved = clean_path(file_path)
    if not resolved:
        return ""
    try:
        frame = pd.read_excel(resolved)
        # With no target given, to_csv returns the CSV text directly.
        return frame.to_csv(index=False)
    except Exception as e:
        print(f"Failed Excel -> CSV: {e}")
        return ""

Convert an Excel workbook to CSV text.

Args: file_path (str): Path to the Excel file.

Returns: str: CSV-formatted string.

def text_from_image(file_path: str) -> Optional[str]:
def text_from_image(
    file_path: str
) -> Optional[str]:
    """
    Run pytesseract OCR over an image file and normalize the result.

    Args:
        file_path (str): Path to the image.

    Returns:
        Optional[str]: Extracted text (an empty string when nothing was
        recognized), or None for an unresolved path or on error.
    """
    resolved = clean_path(file_path)
    if not resolved:
        return None
    try:
        with Image.open(resolved) as image:
            recognized = pytesseract.image_to_string(image)
            cleaned = normalize(recognized.strip())
            return cleaned if cleaned else ""
    except Exception as e:
        print(f"Failed image OCR: {e}")
        return None

Perform OCR on an image file.

Args: file_path (str): Path to the image.

Returns: Optional[str]: Extracted text, or None on error.

def text_from_any(file_path: str) -> Optional[str]:
def text_from_any(
    file_path: str
) -> Optional[str]:
    """
    Describe a file of unknown type using its filesystem stats.

    Args:
        file_path (str): Path to the file.

    Returns:
        Optional[str]: Normalized "key: value" lines for path, size,
        created and modified, or None for an unresolved path or on error.
    """
    resolved = clean_path(file_path)
    if not resolved:
        return None
    try:
        st = os.stat(resolved)
        created = datetime.fromtimestamp(st.st_ctime).isoformat()
        modified = datetime.fromtimestamp(st.st_mtime).isoformat()
        report_lines = [
            f"path: {resolved}",
            f"size: {st.st_size}",
            f"created: {created}",
            f"modified: {modified}",
        ]
        return normalize("\n".join(report_lines))
    except Exception as e:
        print(f"Error on other file: {e}")
        return None

Handle unknown file types by reporting stats and metadata.

Args: file_path (str): Path to the file.

Returns: Optional[str]: Plain-text report, or None on error.