mrblack
1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# File: __init__.py 5# Author: Wadih Khairallah 6# Description: 7# Created: 2025-05-12 16:47:22 8# Modified: 2025-05-16 16:38:12 9 10from .pii import ( 11 extract_pii_text, 12 extract_pii_file, 13 extract_pii_url, 14 extract_pii_image, 15 extract_pii_screenshot 16) 17from .textextract import ( 18 extract_text, 19 extract_text_with_password, 20 extract_exif, 21 extract_strings, 22 extract_metadata, 23 text_from_screenshot, 24 text_from_url, 25 text_from_html, 26 text_from_audio, 27 text_from_pdf, 28 text_from_doc, 29 text_from_docx, 30 text_from_excel, 31 text_from_image, 32 text_from_any, 33 text_from_odt, 34 text_from_pptx, 35 text_from_epub, 36 analyze_text, 37 summarize_text, 38 translate_text, 39 list_available_languages, 40 detect_language, 41 scrape_website, 42 normalize_text, 43 44) 45 46__all__ = [ 47 "extract_pii_text", 48 "extract_pii_file", 49 "extract_pii_url", 50 "extract_pii_image", 51 "extract_pii_screenshot", 52 "extract_text_with_password", 53 "extract_text", 54 "extract_exif", 55 "extract_metadata", 56 "extract_strings", 57 "text_from_screenshot", 58 "text_from_url", 59 "text_from_html", 60 "text_from_audio", 61 "text_from_pdf", 62 "text_from_doc", 63 "text_from_docx", 64 "text_from_excel", 65 "text_from_image", 66 "text_from_any", 67 "text_from_odt", 68 "text_from_pptx", 69 "text_from_epub", 70 "analyze_text", 71 "summarize_text", 72 "translate_text", 73 "list_available_languages", 74 "detect_language", 75 "scrape_website", 76 "normalize_text" 77]
71def extract_pii_text( 72 text: str, 73 labels: Optional[Union[List[str], str]] = None 74) -> Dict[str, List[str]]: 75 """ 76 Extract PII matches from provided text. 77 78 Args: 79 text (str): The input text to scan for patterns. 80 labels (Optional[Union[List[str], str]]): Specific labels to filter on. 81 82 Returns: 83 Dict[str, List[str]]: Mapping of each label to a sorted list of 84 matched and cleaned strings. 85 """ 86 if isinstance(labels, str): 87 labels = [labels] 88 patterns = PATTERNS 89 if labels: 90 patterns = [ 91 p for p in PATTERNS 92 if any(re.search(rf"\(\?P<{lbl}>", p) for lbl in labels) 93 ] 94 results: Dict[str, set] = defaultdict(set) 95 for pattern in patterns: 96 try: 97 rx = re.compile(pattern) 98 for m in rx.finditer(text): 99 for lbl, val in m.groupdict().items(): 100 if not val: 101 continue 102 cleaned = _clean_value(lbl, val) 103 if lbl == "url": 104 cleaned = cleaned.rstrip("),.**") 105 if cleaned is not None: 106 results[lbl].add(cleaned) 107 except re.error as e: 108 print( 109 f"Invalid regex skipped: {pattern} → {e}", 110 file=sys.stderr 111 ) 112 return {lbl: sorted(vals) for lbl, vals in results.items()}
Extract PII matches from provided text.
Args: text (str): The input text to scan for patterns. labels (Optional[Union[List[str], str]]): Specific labels to filter on.
Returns: Dict[str, List[str]]: Mapping of each label to a sorted list of matched and cleaned strings.
115def extract_pii_file( 116 file_path: str, 117 labels: Optional[Union[List[str], str]] = None 118) -> Optional[Dict[str, List[str]]]: 119 """ 120 Extract PII from a single file's text content. 121 122 Args: 123 file_path (str): Path to the file. 124 labels (Optional[Union[List[str], str]]): Labels to filter. 125 126 Returns: 127 Optional[Dict[str, List[str]]]: Extraction results, or None. 128 """ 129 text = extract_text(file_path) 130 if not text: 131 return None 132 data = extract_pii_text(text, labels) 133 return data or None
Extract PII from a single file's text content.
Args: file_path (str): Path to the file. labels (Optional[Union[List[str], str]]): Labels to filter.
Returns: Optional[Dict[str, List[str]]]: Extraction results, or None.
136def extract_pii_url( 137 path: str, 138 labels: Optional[Union[List[str], str]] = None 139) -> Optional[Dict[str, List[str]]]: 140 """ 141 Extract PII from the text at a URL. 142 143 Args: 144 path (str): The URL to fetch. 145 labels (Optional[Union[List[str], str]]): Labels to filter. 146 147 Returns: 148 Optional[Dict[str, List[str]]]: Extraction results, or None. 149 """ 150 text = text_from_url(path) 151 if not text: 152 return None 153 data = extract_pii_text(text, labels) 154 return data or None
Extract PII from the text at a URL.
Args: path (str): The URL to fetch. labels (Optional[Union[List[str], str]]): Labels to filter.
Returns: Optional[Dict[str, List[str]]]: Extraction results, or None.
157def extract_pii_image( 158 image_path: str, 159 labels: Optional[Union[List[str], str]] = None 160) -> Optional[Dict[str, List[str]]]: 161 """ 162 Extract PII from an image using OCR. 163 164 Args: 165 image_path (str): Path to the image file. 166 labels (Optional[Union[List[str], str]]): Labels to filter. 167 168 Returns: 169 Optional[Dict[str, List[str]]]: Extraction results, or None. 170 """ 171 path = clean_path(image_path) 172 if not path or not os.path.isfile(path): 173 print(f"[red]Invalid image path:[/] {image_path}") 174 return None 175 text = extract_text(path) 176 if not text: 177 return None 178 data = extract_pii_text(text, labels) 179 return data or None
Extract PII from an image using OCR.
Args: image_path (str): Path to the image file. labels (Optional[Union[List[str], str]]): Labels to filter.
Returns: Optional[Dict[str, List[str]]]: Extraction results, or None.
182def extract_pii_screenshot( 183 labels: Optional[Union[List[str], str]] = None 184) -> Optional[Dict[str, List[str]]]: 185 """ 186 Capture a screenshot and extract PII from its OCR text. 187 188 Args: 189 labels (Optional[Union[List[str], str]]): Labels to filter. 190 191 Returns: 192 Optional[Dict[str, List[str]]]: Extraction results, or None. 193 """ 194 text = text_from_screenshot() 195 if not text: 196 return None 197 data = extract_pii_text(text, labels) 198 return data or None
Capture a screenshot and extract PII from its OCR text.
Args: labels (Optional[Union[List[str], str]]): Labels to filter.
Returns: Optional[Dict[str, List[str]]]: Extraction results, or None.
566def extract_text_with_password(file_path: str, password: str) -> Optional[str]: 567 """ 568 Extract text from password-protected files. 569 570 Args: 571 file_path (str): Path to the file 572 password (str): Password to unlock the file 573 574 Returns: 575 Optional[str]: Extracted text 576 """ 577 file_ext = os.path.splitext(file_path)[1].lower() 578 579 if file_ext == '.pdf': 580 return text_from_pdf_protected(file_path, password) 581 elif file_ext in ['.docx', '.xlsx', '.pptx']: 582 return text_from_office_protected(file_path, password) 583 else: 584 logger.warning(f"Password protection not supported for {file_ext} files") 585 return None
Extract text from password-protected files.
Args: file_path (str): Path to the file password (str): Password to unlock the file
Returns: Optional[str]: Extracted text
480def extract_text( 481 file_path: str 482) -> Optional[str]: 483 """ 484 Extract text content from a local file or URL. 485 486 Supports web pages, text, JSON, XML, CSV, Excel, PDF, DOCX, images, audio. 487 488 Args: 489 file_path (str): Path to the input file or URL. 490 491 Returns: 492 Optional[str]: Extracted text, or None if unsupported or error. 493 """ 494 if is_url(file_path): 495 return text_from_url(file_path) 496 497 TEXT_MIME_TYPES = { 498 "application/json", "application/xml", "application/x-yaml", 499 "application/x-toml", "application/x-csv", "application/x-markdown", 500 } 501 502 path = clean_path(file_path) 503 if not path: 504 logger.error(f"No such file: {file_path}") 505 return None 506 507 mime_type = magic.from_file(path, mime=True) 508 try: 509 if mime_type.startswith("text/html"): 510 content = text_from_html(path) 511 return content 512 513 elif mime_type.startswith("text/") or mime_type in TEXT_MIME_TYPES: 514 with open(path, 'r', encoding='utf-8', errors='ignore') as f: 515 content = f.read() 516 return normalize_text(content) 517 518 elif mime_type in [ 519 "application/vnd.ms-excel", 520 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" 521 ]: 522 content = text_from_excel(path) 523 return content 524 525 elif mime_type == "application/pdf": 526 content = text_from_pdf(path) 527 return content 528 529 elif mime_type == \ 530 "application/vnd.openxmlformats-officedocument.wordprocessingml.document": 531 content = text_from_docx(path) 532 return content 533 534 elif mime_type == "application/msword": 535 content = text_from_doc(path) 536 return content 537 538 elif mime_type.startswith("image/"): 539 content = text_from_image(path) 540 return content 541 542 elif mime_type.startswith("audio/"): 543 content = text_from_audio(path) 544 return content 545 546 elif mime_type == "application/epub+zip": 547 content = text_from_epub(path) 548 return content 549 550 elif mime_type == "application/vnd.openxmlformats-officedocument.presentationml.presentation": 551 content = text_from_pptx(path) 552 return content 553 554 elif mime_type == "application/vnd.oasis.opendocument.text": 555 content = text_from_odt(path) 556 return content 557 558 else: 559 content = text_from_any(path) 560 return content 561 except Exception as e: 562 logger.error(f"Error reading {path}: {e}") 563 return None
Extract text content from a local file or URL.
Supports web pages, text, JSON, XML, CSV, Excel, PDF, DOCX, images, audio.
Args: file_path (str): Path to the input file or URL.
Returns: Optional[str]: Extracted text, or None if unsupported or error.
248def extract_exif( 249 file_path: str 250) -> Optional[Dict[str, Any]]: 251 """ 252 Extract EXIF metadata from a file using exiftool. 253 254 Args: 255 file_path (str): Path to the target file. 256 257 Returns: 258 Optional[Dict[str, Any]]: Parsed EXIF data, or None on failure. 259 """ 260 exif_data: Optional[Dict[str, Any]] = None 261 try: 262 result = subprocess.run( 263 ['exiftool', '-j', file_path], 264 stdout=subprocess.PIPE, 265 stderr=subprocess.PIPE 266 ) 267 if result.returncode == 0: 268 exif_data = json.loads(result.stdout.decode())[0] 269 except Exception as e: 270 logger.error(f"Exiftool failed: {e}") 271 return exif_data
Extract EXIF metadata from a file using exiftool.
Args: file_path (str): Path to the target file.
Returns: Optional[Dict[str, Any]]: Parsed EXIF data, or None on failure.
1965def extract_metadata(source: str) -> dict: 1966 """ 1967 Central metadata extraction router. Routes metadata extraction based on the source. 1968 1969 Args: 1970 source (str): source to extract metadata from 1971 1972 Returns: 1973 dict: Dictionary of extracted metadata for the given source. 1974 """ 1975 if is_url(source): 1976 metadata = extract_url_metadata(source) 1977 else: 1978 metadata = extract_file_metadata(source) 1979 1980 return metadata
Central metadata extraction router. Routes metadata extraction based on the source.
Args: source (str): source to extract metadata from
Returns: dict: Dictionary of extracted metadata for the given source.
1071def extract_strings(file_path, min_length=4): 1072 """ 1073 Extract printable strings from a file, similar to the Unix 'strings' command. 1074 1075 Args: 1076 file_path (str): Path to the file to extract strings from 1077 min_length (int, optional): Minimum length of strings to extract. Defaults to 4. 1078 1079 Returns: 1080 list: List of printable strings found in the file 1081 """ 1082 file_path = clean_path(file_path) 1083 1084 1085 # Define printable characters (excluding tabs and newlines) 1086 printable_chars = set(string.printable) - set('\t\n\r\v\f') 1087 1088 result = [] 1089 current_string = "" 1090 1091 # Read the file in binary mode 1092 try: 1093 with open(file_path, 'rb') as file: 1094 # Read the file byte by byte 1095 for byte in file.read(): 1096 # Convert byte to character 1097 char = chr(byte) 1098 1099 # If character is printable, add to current string 1100 if char in printable_chars: 1101 current_string += char 1102 # If not printable and we have a string of minimum length, add to results 1103 elif len(current_string) >= min_length: 1104 if current_string == "Sj[d": 1105 pass 1106 else: 1107 result.append(current_string) 1108 current_string = "" 1109 # If not printable and current string is too short, reset current string 1110 else: 1111 current_string = "" 1112 1113 # Don't forget to add the last string if it meets the minimum length 1114 if len(current_string) >= min_length: 1115 result.append(current_string) 1116 1117 return result 1118 except FileNotFoundError: 1119 print(f"Error: File '{file_path}' not found.", file=sys.stderr) 1120 return None 1121 except Exception as e: 1122 print(f"Error: {e}", file=sys.stderr) 1123 return None
Extract printable strings from a file, similar to the Unix 'strings' command.
Args: file_path (str): Path to the file to extract strings from min_length (int, optional): Minimum length of strings to extract. Defaults to 4.
Returns: list: List of printable strings found in the file
214def text_from_screenshot() -> str: 215 """ 216 Capture a full-screen screenshot, perform OCR, and clean up temp file. 217 218 Returns: 219 str: Normalized OCR-extracted text from the screenshot. 220 """ 221 tmp_filename = f"screenshot_{uuid4().hex}.png" 222 tmp_path = os.path.join(tempfile.gettempdir(), tmp_filename) 223 224 try: 225 with mss() as sct: 226 monitor = {"top": 0, "left": 0, "width": 0, "height": 0} 227 for mon in sct.monitors: 228 monitor["left"] = min(mon["left"], monitor["left"]) 229 monitor["top"] = min(mon["top"], monitor["top"]) 230 monitor["width"] = max(mon["width"] + mon["left"] - monitor["left"], monitor["width"]) 231 monitor["height"] = max(mon["height"] + mon["top"] - monitor["top"], monitor["height"]) 232 screenshot = sct.grab(monitor) 233 234 img = Image.frombytes("RGB", screenshot.size, screenshot.bgra, "raw", "BGRX") 235 img_gray = img.convert("L") 236 img_gray.save(tmp_path) 237 238 content = text_from_image(tmp_path) 239 return normalize_text(content) 240 finally: 241 if os.path.exists(tmp_path): 242 try: 243 os.remove(tmp_path) 244 except Exception as e: 245 logger.error(f"Failed to delete temp screenshot: {e}")
Capture a full-screen screenshot, perform OCR, and clean up temp file.
Returns: str: Normalized OCR-extracted text from the screenshot.
355def text_from_url( 356 url: str, 357 render_js: bool = True 358) -> Optional[str]: 359 """ 360 Extract visible text from a web page or downloadable file at the given URL. 361 362 Args: 363 url (str): Target URL (web page or file). 364 render_js (bool): Whether to render JavaScript content. 365 366 Returns: 367 Optional[str]: Extracted text, or None on failure. 368 """ 369 headers = generate_http_headers(url) 370 371 # Attempt to detect content type 372 content_type = "" 373 try: 374 head = requests.head(url, headers=headers, timeout=5, allow_redirects=True) 375 content_type = head.headers.get("Content-Type", "").lower() 376 except Exception as e: 377 logger.warning(f"HEAD request failed: {e}") 378 try: 379 resp = requests.get(url, headers=headers, timeout=5, stream=True) 380 content_type = resp.headers.get("Content-Type", "").lower() 381 except Exception as e: 382 logger.warning(f"GET fallback for Content-Type check failed: {e}") 383 384 # If clearly not HTML, treat as a file and extract locally 385 if not content_type.startswith("text/html"): 386 try: 387 with requests.get(url, headers=headers, stream=True, timeout=15) as r: 388 r.raise_for_status() 389 suffix = Path(urlparse(url).path).suffix or ".bin" 390 with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, dir="/tmp") as tmp_file: 391 shutil.copyfileobj(r.raw, tmp_file) 392 tmp_path = tmp_file.name 393 return extract_text(tmp_path) 394 except Exception as e: 395 logger.error(f"Failed to download and extract file from URL: {url} - {e}") 396 return None 397 398 # Standard HTML path 399 if render_js: 400 try: 401 session = HTMLSession() 402 try: 403 r = session.get(url, headers=headers, timeout=5) 404 try: 405 r.html.render(timeout=5, sleep=1, keep_page=True) 406 except Exception as e: 407 logger.warning(f"JS rendering failed, falling back to static HTML: {e}") 408 html = r.html.html 409 return text_from_html(html) 410 except Exception as e: 411 logger.error(f"[Error with HTMLSession] {url} - {e}") 412 finally: 413 session.close() 414 except Exception as e: 415 logger.error(f"[Error creating HTMLSession] {e}") 416 417 # Fallback: static HTML without rendering 418 try: 419 response = requests.get(url, headers=headers, timeout=10) 420 response.raise_for_status() 421 html = response.text 422 return text_from_html(html) 423 except Exception as e: 424 logger.error(f"[Error with requests] {url} - {e}") 425 return None
Extract visible text from a web page or downloadable file at the given URL.
Args: url (str): Target URL (web page or file). render_js (bool): Whether to render JavaScript content.
Returns: Optional[str]: Extracted text, or None on failure.
322def text_from_html(html: str) -> str: 323 """ 324 Extract readable text from raw HTML content. 325 326 Args: 327 html (str): HTML source as a string. 328 329 Returns: 330 str: Cleaned and normalized visible text. 331 """ 332 # Check if the input is a file path or HTML content 333 if os.path.isfile(html): 334 with open(html, 'r', encoding='utf-8', errors='ignore') as f: 335 html = f.read() 336 337 soup = BeautifulSoup(html, "html.parser") 338 339 # Remove non-visible or structural elements 340 for tag in soup([ 341 "script", "style", 342 "noscript", "iframe", 343 "meta", "link", 344 "header", "footer", 345 "form", "nav", 346 "aside" 347 ]): 348 tag.decompose() 349 350 text = soup.get_text(separator=" ") 351 352 return normalize_text(text)
Extract readable text from raw HTML content.
Args: html (str): HTML source as a string.
Returns: str: Cleaned and normalized visible text.
629def text_from_audio( 630 audio_file: str 631) -> Optional[str]: 632 """ 633 Transcribe audio to text using Google Speech Recognition. 634 635 Args: 636 audio_file (str): Path to the input audio file. 637 638 Returns: 639 Optional[str]: Transcription, or None on failure. 640 """ 641 def convert_to_wav(file_path: str) -> str: 642 _, ext = os.path.splitext(file_path) 643 ext = ext.lstrip('.') 644 audio = AudioSegment.from_file(file_path, format=ext) 645 tmp_filename = f"audio_{uuid4().hex}.wav" 646 wav_path = os.path.join(tempfile.gettempdir(), tmp_filename) 647 audio.export(wav_path, format='wav') 648 return wav_path 649 650 recognizer = sr.Recognizer() 651 temp_wav_path = None 652 cleanup_needed = False 653 654 try: 655 _, ext = os.path.splitext(audio_file) 656 if ext.lower() not in ['.wav', '.wave']: 657 temp_wav_path = convert_to_wav(audio_file) 658 cleanup_needed = True 659 else: 660 temp_wav_path = clean_path(audio_file) 661 662 if not temp_wav_path: 663 logger.error("Invalid audio path.") 664 return None 665 666 with sr.AudioFile(temp_wav_path) as source: 667 audio = recognizer.record(source) 668 return recognizer.recognize_google(audio) 669 670 except sr.UnknownValueError: 671 logger.error("Could not understand audio.") 672 except sr.RequestError as e: 673 logger.error(f"Speech recognition error: {e}") 674 except Exception as e: 675 logger.error(f"Failed to process audio: {e}") 676 finally: 677 if cleanup_needed and temp_wav_path and os.path.exists(temp_wav_path): 678 try: 679 os.remove(temp_wav_path) 680 except Exception as e: 681 logger.error(f"Failed to delete temp WAV file {temp_wav_path}: {e}") 682 683 return None
Transcribe audio to text using Google Speech Recognition.
Args: audio_file (str): Path to the input audio file.
Returns: Optional[str]: Transcription, or None on failure.
743def text_from_pdf( 744 pdf_path: str 745) -> Optional[str]: 746 """ 747 Extract text and OCR results from a PDF using PyMuPDF. 748 749 Args: 750 pdf_path (str): Path to PDF file. 751 752 Returns: 753 Optional[str]: Combined normalized text and image OCR results. 754 """ 755 plain_text = "" 756 temp_image_paths: List[str] = [] 757 758 try: 759 doc = pymupdf.open(pdf_path) 760 for k, v in doc.metadata.items(): 761 plain_text += f"{k}: {v}\n" 762 763 for i in range(len(doc)): 764 page = doc.load_page(i) 765 plain_text += f"\n--- Page {i + 1} ---\n" 766 text = page.get_text() 767 plain_text += text or "[No text]\n" 768 769 for img_index, img in enumerate(page.get_images(full=True), start=1): 770 xref = img[0] 771 base = doc.extract_image(xref) 772 img_bytes = base["image"] 773 774 img_filename = f"pdf_page{i+1}_img{img_index}_{uuid4().hex}.png" 775 img_path = os.path.join(tempfile.gettempdir(), img_filename) 776 temp_image_paths.append(img_path) 777 778 with open(img_path, "wb") as f: 779 f.write(img_bytes) 780 781 ocr = text_from_image(img_path) or "" 782 plain_text += f"\n[Image {img_index} OCR]\n{ocr}\n" 783 784 # Extract tables from PDF 785 """ 786 try: 787 tables = extract_tables_from_pdf(pdf_path) 788 if tables: 789 plain_text += "\n--- Tables ---\n" 790 for i, table in enumerate(tables, 1): 791 plain_text += f"\n[Table {i}]\n" 792 if isinstance(table, dict) and "data" in table: 793 for row in table["data"]: 794 plain_text += str(row) + "\n" 795 else: 796 plain_text += str(table) + "\n" 797 except Exception as e: 798 logger.warning(f"Could not extract tables from PDF: {e}") 799 """ 800 801 return normalize_text(plain_text) 802 except Exception as e: 803 logger.error(f"Error processing PDF: {e}") 804 return None 805 finally: 806 for path in temp_image_paths: 807 if os.path.exists(path): 808 try: 809 os.remove(path) 810 except Exception as e: 811 logger.error(f"Failed to delete temp image {path}: {e}") 812 if 'doc' in locals(): 813 doc.close()
Extract text and OCR results from a PDF using PyMuPDF.
Args: pdf_path (str): Path to PDF file.
Returns: Optional[str]: Combined normalized text and image OCR results.
891def text_from_doc( 892 filepath: str, 893 min_length: int = 4 894) -> str: 895 """ 896 Extract readable strings and metadata from binary Word (.doc) files. 897 898 Args: 899 filepath (str): Path to .doc file. 900 min_length (int): Minimum string length to extract. 901 902 Returns: 903 str: Metadata and text content. 904 """ 905 def extract_printable_strings( 906 data: bytes 907 ) -> List[str]: 908 pattern = re.compile( 909 b'[' + re.escape(bytes(string.printable, 'ascii')) + 910 b']{%d,}' % min_length 911 ) 912 found = pattern.findall(data) 913 914 results = [] 915 for m in found: 916 value = m.decode(errors='ignore').strip() 917 results.append(value) 918 919 return results 920 921 def clean_strings( 922 strs: List[str] 923 ) -> List[str]: 924 cleaned: List[str] = [] 925 skip = ["HYPERLINK", "OLE2", "Normal.dotm"] 926 for line in strs: 927 if any(line.startswith(pref) for pref in skip): 928 continue 929 cleaned.append(re.sub(r'\s+', ' ', line).strip()) 930 return cleaned 931 932 with open(filepath, 'rb') as f: 933 data = f.read() 934 935 strings = extract_printable_strings(data) 936 strings = clean_strings(strings) 937 content = "\n".join(strings) 938 939 return normalize_text(content)
Extract readable strings and metadata from binary Word (.doc) files.
Args: filepath (str): Path to .doc file. min_length (int): Minimum string length to extract.
Returns: str: Metadata and text content.
942def text_from_docx( 943 file_path: str 944) -> Optional[str]: 945 """ 946 Extract text, tables, and OCR from embedded images in a DOCX file. 947 948 Args: 949 file_path (str): Path to the .docx file. 950 951 Returns: 952 Optional[str]: Normalized full text content. 953 """ 954 path = clean_path(file_path) 955 if not path: 956 return None 957 958 temp_image_paths: List[str] = [] 959 plain_text = "" 960 961 try: 962 doc = Document(path) 963 964 for p in doc.paragraphs: 965 if p.text.strip(): 966 plain_text += p.text.strip() + "\n" 967 968 for tbl in doc.tables: 969 plain_text += "\n[Table]\n" 970 for row in tbl.rows: 971 row_text = "\t".join(c.text.strip() for c in row.cells) 972 plain_text += row_text + "\n" 973 974 for rel_id, rel in doc.part.rels.items(): 975 if "image" in rel.target_ref: 976 blob = rel.target_part.blob 977 978 img_filename = f"docx_img_{rel_id}_{uuid4().hex}.png" 979 img_path = os.path.join(tempfile.gettempdir(), img_filename) 980 temp_image_paths.append(img_path) 981 982 with open(img_path, "wb") as img_file: 983 img_file.write(blob) 984 985 ocr = text_from_image(img_path) or "" 986 plain_text += f"\n[Image OCR]\n{ocr}\n" 987 988 return normalize_text(plain_text) 989 990 except Exception as e: 991 logger.error(f"Error processing DOCX: {e}") 992 return None 993 finally: 994 for path in temp_image_paths: 995 if os.path.exists(path): 996 try: 997 os.remove(path) 998 except Exception as e: 999 logger.error(f"Failed to delete temp DOCX image {path}: {e}")
Extract text, tables, and OCR from embedded images in a DOCX file.
Args: file_path (str): Path to the .docx file.
Returns: Optional[str]: Normalized full text content.
1002def text_from_excel( 1003 file_path: str 1004) -> str: 1005 """ 1006 Convert an Excel workbook to CSV text. 1007 1008 Args: 1009 file_path (str): Path to the Excel file. 1010 1011 Returns: 1012 str: CSV-formatted string. 1013 """ 1014 path = clean_path(file_path) 1015 if not path: 1016 return "" 1017 try: 1018 # Get all sheets 1019 result = "" 1020 excel_file = pd.ExcelFile(path) 1021 for sheet_name in excel_file.sheet_names: 1022 df = pd.read_excel(path, sheet_name=sheet_name) 1023 out = StringIO() 1024 df.to_csv(out, index=False) 1025 result += f"\n--- Sheet: {sheet_name} ---\n" 1026 result += out.getvalue() 1027 result += "\n" 1028 return result 1029 except Exception as e: 1030 logger.error(f"Failed Excel -> CSV: {e}") 1031 return ""
Convert an Excel workbook to CSV text.
Args: file_path (str): Path to the Excel file.
Returns: str: CSV-formatted string.
1034def text_from_image( 1035 file_path: str 1036) -> Optional[str]: 1037 """ 1038 Perform OCR on an image file. 1039 1040 Args: 1041 file_path (str): Path to the image. 1042 1043 Returns: 1044 Optional[str]: Extracted text, or None on error. 1045 """ 1046 path = clean_path(file_path) 1047 if not path: 1048 return None 1049 try: 1050 with Image.open(path) as img: 1051 # Improve OCR with preprocessing 1052 # 1. Convert to grayscale if it's not already 1053 if img.mode != 'L': 1054 img = img.convert('L') 1055 1056 # 2. Optional: Apply some contrast enhancement 1057 # (Disabled by default, enable if needed for specific cases) 1058 # from PIL import ImageEnhance 1059 # enhancer = ImageEnhance.Contrast(img) 1060 # img = enhancer.enhance(1.5) # Increase contrast 1061 1062 # Perform OCR with custom configuration 1063 custom_config = r'--oem 3 --psm 6' # Default OCR Engine Mode and Page Segmentation Mode 1064 txt = pytesseract.image_to_string(img, config=custom_config).strip() 1065 return normalize_text(txt) or "" 1066 except Exception as e: 1067 logger.error(f"Failed image OCR: {e}") 1068 return None
Perform OCR on an image file.
Args: file_path (str): Path to the image.
Returns: Optional[str]: Extracted text, or None on error.
1126def text_from_any( 1127 file_path: str 1128) -> Optional[str]: 1129 """ 1130 Handle unknown file types by reporting stats and metadata. 1131 1132 Args: 1133 file_path (str): Path to the file. 1134 1135 Returns: 1136 Optional[str]: Plain-text report, or None on error. 1137 """ 1138 content = "" 1139 path = clean_path(file_path) 1140 if not path: 1141 return None 1142 try: 1143 stats = os.stat(path) 1144 info = { 1145 "path": path, 1146 "size": stats.st_size, 1147 "created": datetime.fromtimestamp(stats.st_ctime).isoformat(), 1148 "modified": datetime.fromtimestamp(stats.st_mtime).isoformat(), 1149 } 1150 1151 for k, v in info.items(): 1152 content += "File System Data:\n" 1153 content += f"{k}: {v}\n" 1154 1155 # Try to extract EXIF if available 1156 exif = extract_exif(path) 1157 if exif: 1158 info["exif"] = exif 1159 content += "\n\nEXIF Data:\n" 1160 for k, v in exif.items(): 1161 if isinstance(v, dict): 1162 content += f"\n{k}:\n" 1163 for sub_k, sub_v in v.items(): 1164 content += f" {sub_k}: {sub_v}\n" 1165 else: 1166 content += f"{k}: {v}\n" 1167 1168 # Get file hash 1169 md5_hash = hashlib.md5(open(path,'rb').read()).hexdigest() 1170 info["md5"] = md5_hash 1171 1172 # Get strings 1173 strings = extract_strings(path) 1174 if strings: 1175 info["strings"] = strings 1176 content += "\n\nStrings Data:\n" 1177 clean_strings = "\n".join(strings) 1178 content += clean_strings 1179 1180 return info 1181 except Exception as e: 1182 logger.error(f"Error on other file: {e}") 1183 return None
Handle unknown file types by reporting stats and metadata.
Args: file_path (str): Path to the file.
Returns: Optional[str]: Plain-text report, or None on error.
3123def text_from_odt(odt_path: str) -> Optional[str]: 3124 """ 3125 Extract text from OpenDocument Text files. 3126 3127 Args: 3128 odt_path (str): Path to the ODT file 3129 3130 Returns: 3131 Optional[str]: Extracted text 3132 """ 3133 try: 3134 from odf import text, teletype 3135 from odf.opendocument import load 3136 3137 textdoc = load(odt_path) 3138 3139 # Extract metadata 3140 meta = [] 3141 meta_elem = textdoc.meta 3142 if meta_elem: 3143 for prop in meta_elem.childNodes: 3144 if hasattr(prop, 'tagName') and hasattr(prop, 'childNodes') and prop.childNodes: 3145 meta.append(f"{prop.tagName}: {teletype.extractText(prop)}") 3146 3147 # Extract content 3148 allparas = textdoc.getElementsByType(text.P) 3149 content = "\n".join(teletype.extractText(p) for p in allparas) 3150 3151 # Combine metadata and content 3152 if meta: 3153 final_text = "\n".join(meta) + "\n---\n" + content 3154 else: 3155 final_text = content 3156 3157 return normalize_text(final_text) 3158 except ImportError: 3159 logger.error("odfpy not installed") 3160 return "odfpy package is required for ODT processing" 3161 except Exception as e: 3162 logger.error(f"Error processing ODT: {e}") 3163 return None
Extract text from OpenDocument Text files.
Args: odt_path (str): Path to the ODT file
Returns: Optional[str]: Extracted text
3080def text_from_pptx(pptx_path: str) -> Optional[str]: 3081 """ 3082 Extract text from PowerPoint presentations. 3083 3084 Args: 3085 pptx_path (str): Path to the PowerPoint file 3086 3087 Returns: 3088 Optional[str]: Extracted text 3089 """ 3090 try: 3091 from pptx import Presentation 3092 3093 prs = Presentation(pptx_path) 3094 text = ["--- PowerPoint Presentation ---"] 3095 3096 for i, slide in enumerate(prs.slides, 1): 3097 slide_text = [f"Slide {i}:"] 3098 3099 # Get slide title if it exists 3100 if slide.shapes.title and slide.shapes.title.text: 3101 slide_text.append(f"Title: {slide.shapes.title.text}") 3102 3103 # Extract text from all shapes 3104 shape_text = [] 3105 for shape in slide.shapes: 3106 if hasattr(shape, "text") and shape.text: 3107 shape_text.append(shape.text) 3108 3109 if shape_text: 3110 slide_text.append("\n".join(shape_text)) 3111 3112 text.append("\n".join(slide_text)) 3113 3114 return normalize_text("\n\n".join(text)) 3115 except ImportError: 3116 logger.error("python-pptx not installed") 3117 return "python-pptx package is required for PowerPoint processing" 3118 except Exception as e: 3119 logger.error(f"Error processing PowerPoint: {e}") 3120 return None
Extract text from PowerPoint presentations.
Args: pptx_path (str): Path to the PowerPoint file
Returns: Optional[str]: Extracted text
3033def text_from_epub(epub_path: str) -> Optional[str]: 3034 """ 3035 Extract text from EPUB ebooks. 3036 3037 Args: 3038 epub_path (str): Path to the EPUB file 3039 3040 Returns: 3041 Optional[str]: Extracted text 3042 """ 3043 try: 3044 from ebooklib import epub 3045 import html2text 3046 3047 book = epub.read_epub(epub_path) 3048 h = html2text.HTML2Text() 3049 h.ignore_links = False 3050 3051 content = [] 3052 3053 # Get book metadata 3054 metadata = [] 3055 if book.get_metadata('DC', 'title'): 3056 metadata.append(f"Title: {book.get_metadata('DC', 'title')[0][0]}") 3057 if book.get_metadata('DC', 'creator'): 3058 metadata.append(f"Author: {book.get_metadata('DC', 'creator')[0][0]}") 3059 if book.get_metadata('DC', 'description'): 3060 metadata.append(f"Description: {book.get_metadata('DC', 'description')[0][0]}") 3061 3062 if metadata: 3063 content.append("\n".join(metadata)) 3064 content.append("---") 3065 3066 # Get book content 3067 for item in book.get_items(): 3068 if item.get_type() == epub.ITEM_DOCUMENT: 3069 content.append(h.handle(item.get_content().decode('utf-8'))) 3070 3071 return normalize_text("\n".join(content)) 3072 except ImportError: 3073 logger.error("ebooklib and/or html2text not installed") 3074 return "ebooklib and/or html2text packages are required for EPUB processing" 3075 except Exception as e: 3076 logger.error(f"Error processing EPUB: {e}") 3077 return None
Extract text from EPUB ebooks.
Args: epub_path (str): Path to the EPUB file
Returns: Optional[str]: Extracted text
2136def analyze_text( 2137 text: str, 2138 advanced: bool = False, 2139 domain_specific: str = None 2140) -> Dict[str, Any]: 2141 """ 2142 Perform comprehensive text analytics with advanced NLP techniques. 2143 2144 Args: 2145 text (str): Input text 2146 advanced (bool): Whether to perform computationally intensive advanced analysis 2147 domain_specific (str): Optional domain for specialized analysis (e.g., "academic", "social_media", "customer_reviews") 2148 2149 Returns: 2150 Dict: Comprehensive analysis results 2151 """ 2152 try: 2153 # Import required libraries 2154 import numpy as np 2155 from scipy.spatial import distance 2156 import networkx as nx 2157 from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer 2158 from sklearn.decomposition import LatentDirichletAllocation, TruncatedSVD 2159 from sklearn.cluster import KMeans 2160 2161 # Basic tokenization 2162 original_text = text 2163 # Save original case for NER and other cases where case matters 2164 original_words = nltk.word_tokenize(text) 2165 original_sentences = nltk.sent_tokenize(text) 2166 2167 # Convert to lowercase for most analysis 2168 text = text.lower() 2169 sentences = nltk.sent_tokenize(text) 2170 words = nltk.word_tokenize(text) 2171 2172 # Filter out punctuation for word-based analysis 2173 words_no_punct = [word for word in words if word.isalpha()] 2174 2175 # Get paragraphs (text blocks separated by two newlines) 2176 paragraphs = text.split('\n\n') 2177 paragraphs = [p.strip() for p in paragraphs if p.strip()] 2178 2179 # Additional paragraph detection for different formats 2180 if len(paragraphs) <= 1: 2181 # Try other common paragraph separators 2182 paragraphs = re.split(r'\n[\t ]*\n', text) 2183 paragraphs = [p.strip() for p in paragraphs if p.strip()] 2184 2185 # If still only one paragraph, try to detect paragraph by indentation 2186 if len(paragraphs) <= 1: 2187 paragraphs = re.split(r'\n[\t ]+', text) 2188 paragraphs = [p.strip() for p in paragraphs if p.strip()] 2189 2190 # Stopwords 2191 try: 2192 stop_words = set(stopwords.words('english')) 2193 except: 2194 nltk.download('stopwords') 2195 stop_words = set(stopwords.words('english')) 2196 2197 # Remove stopwords 2198 filtered_words = [word for word in words_no_punct if word not in stop_words] 2199 2200 # Stemming and Lemmatization 2201 stemmer = PorterStemmer() 2202 lemmatizer = WordNetLemmatizer() 2203 2204 stemmed_words = [stemmer.stem(word) for word in filtered_words] 2205 lemmatized_words = [lemmatizer.lemmatize(word) for word in filtered_words] 2206 2207 # Word frequencies 2208 word_freq = Counter(words_no_punct) 2209 filtered_word_freq = Counter(filtered_words) 2210 2211 # N-grams generation 2212 bigrams = list(ngrams(words_no_punct, 2)) 2213 trigrams = list(ngrams(words_no_punct, 3)) 2214 fourgrams = list(ngrams(words_no_punct, 4)) 2215 2216 bigram_freq = Counter(bigrams) 2217 trigram_freq = Counter(trigrams) 2218 fourgram_freq = Counter(fourgrams) 2219 2220 # Part-of-speech tagging 2221 pos_tags = nltk.pos_tag(original_words) 2222 pos_counts = Counter([tag for word, tag in pos_tags]) 2223 2224 # Count specific parts of speech 2225 noun_count = sum(1 for _, tag in pos_tags if tag.startswith('NN')) 2226 verb_count = sum(1 for _, tag in pos_tags if tag.startswith('VB')) 2227 adj_count = sum(1 for _, tag in pos_tags if tag.startswith('JJ')) 2228 adv_count = sum(1 for _, tag in pos_tags if tag.startswith('RB')) 2229 2230 # Lexical density (content words / total words) 2231 content_pos_tags = ['NN', 'VB', 'JJ', 'RB'] # Base 
forms 2232 content_words = sum(1 for _, tag in pos_tags if any(tag.startswith(pos) for pos in content_pos_tags)) 2233 lexical_density = content_words / len(words) if words else 0 2234 2235 # Named Entity Recognition 2236 named_entities = {} 2237 entity_counts = {} 2238 2239 try: 2240 ne_chunks = nltk.ne_chunk(pos_tags) 2241 2242 # Process tree to extract named entities 2243 for chunk in ne_chunks: 2244 if hasattr(chunk, 'label'): 2245 entity_type = chunk.label() 2246 entity_text = ' '.join(c[0] for c in chunk.leaves()) 2247 2248 if entity_type not in named_entities: 2249 named_entities[entity_type] = [] 2250 2251 named_entities[entity_type].append(entity_text) 2252 2253 # Count entities by type 2254 entity_counts = {entity_type: len(entities) for entity_type, entities in named_entities.items()} 2255 2256 except Exception as ne_error: 2257 logger.warning(f"NER error: {ne_error}") 2258 named_entities = {} 2259 entity_counts = {} 2260 2261 # Basic readability metrics 2262 char_count = len(text) 2263 char_count_no_spaces = len(text.replace(" ", "")) 2264 spaces = char_count - char_count_no_spaces 2265 2266 avg_word_length = sum(len(word) for word in words_no_punct) / len(words_no_punct) if words_no_punct else 0 2267 avg_sent_length = len(words_no_punct) / len(sentences) if sentences else 0 2268 avg_para_length = sum(len(p.split()) for p in paragraphs) / len(paragraphs) if paragraphs else 0 2269 2270 # Calculate syllables (approximation) 2271 def count_syllables(word): 2272 word = word.lower() 2273 if len(word) <= 3: 2274 return 1 2275 2276 # Remove silent e 2277 if word.endswith('e'): 2278 word = word[:-1] 2279 2280 # Count vowel groups 2281 vowels = "aeiouy" 2282 count = 0 2283 prev_is_vowel = False 2284 2285 for char in word: 2286 is_vowel = char in vowels 2287 if is_vowel and not prev_is_vowel: 2288 count += 1 2289 prev_is_vowel = is_vowel 2290 2291 return max(1, count) # Return at least 1 syllable 2292 2293 syllable_counts = [count_syllables(word) for word in words_no_punct] 2294 total_syllables = sum(syllable_counts) 2295 2296 # Readability formulas 2297 # Flesch Reading Ease 2298 flesch_reading_ease = 206.835 - (1.015 * avg_sent_length) - (84.6 * (total_syllables / len(words_no_punct))) if words_no_punct else 0 2299 2300 # Flesch-Kincaid Grade Level 2301 fk_grade = 0.39 * avg_sent_length + 11.8 * (total_syllables / len(words_no_punct)) - 15.59 if words_no_punct else 0 2302 2303 # Gunning Fog Index 2304 complex_words = sum(1 for word in words_no_punct if count_syllables(word) >= 3) 2305 complex_word_percentage = complex_words / len(words_no_punct) if words_no_punct else 0 2306 gunning_fog = 0.4 * (avg_sent_length + 100 * complex_word_percentage) if words_no_punct else 0 2307 2308 # SMOG Index 2309 if len(sentences) >= 30: 2310 smog_sentences = sentences[:30] # Use first 30 sentences 2311 else: 2312 smog_sentences = sentences # Use all available 2313 2314 smog_words = [word for sent in smog_sentences for word in nltk.word_tokenize(sent) if word.isalpha()] 2315 smog_complex_words = sum(1 for word in smog_words if count_syllables(word) >= 3) 2316 smog_index = 1.043 * math.sqrt(smog_complex_words * (30 / len(smog_sentences)) if smog_sentences else 0) + 3.1291 2317 2318 # Dale-Chall Readability Formula 2319 # This would require a list of common words, simplified version: 2320 dale_chall_diff_words = sum(1 for word in words_no_punct if len(word) >= 7) 2321 dale_chall_score = 0.1579 * (dale_chall_diff_words / len(words_no_punct) * 100 if words_no_punct else 0) + 0.0496 * avg_sent_length 2322 
2323 if dale_chall_diff_words / len(words_no_punct) > 0.05 if words_no_punct else 0: 2324 dale_chall_score += 3.6365 2325 2326 # Sentiment Analysis 2327 blob = TextBlob(original_text) 2328 sentiment = blob.sentiment 2329 2330 # Subjectivity by sentence 2331 sentence_sentiments = [TextBlob(sent).sentiment for sent in original_sentences] 2332 sentence_polarities = [sent.polarity for sent in sentence_sentiments] 2333 sentence_subjectivities = [sent.subjectivity for sent in sentence_sentiments] 2334 2335 # Sentiment variance 2336 polarity_variance = np.var(sentence_polarities) if sentence_polarities else 0 2337 subjectivity_variance = np.var(sentence_subjectivities) if sentence_subjectivities else 0 2338 2339 # Sentiment extremes 2340 most_positive_sentence = original_sentences[np.argmax(sentence_polarities)] if sentence_polarities else "" 2341 most_negative_sentence = original_sentences[np.argmin(sentence_polarities)] if sentence_polarities else "" 2342 most_subjective_sentence = original_sentences[np.argmax(sentence_subjectivities)] if sentence_subjectivities else "" 2343 most_objective_sentence = original_sentences[np.argmin(sentence_subjectivities)] if sentence_subjectivities else "" 2344 2345 # Averaged categorical sentiment 2346 positive_threshold = 0.05 2347 negative_threshold = -0.05 2348 positive_count = sum(1 for polarity in sentence_polarities if polarity > positive_threshold) 2349 negative_count = sum(1 for polarity in sentence_polarities if polarity < negative_threshold) 2350 neutral_count = sum(1 for polarity in sentence_polarities if positive_threshold >= polarity >= negative_threshold) 2351 2352 # Calculate percentages 2353 total_sentences = len(sentence_polarities) if sentence_polarities else 1 # Avoid division by zero 2354 positive_percentage = (positive_count / total_sentences) * 100 2355 negative_percentage = (negative_count / total_sentences) * 100 2356 neutral_percentage = (neutral_count / total_sentences) * 100 2357 2358 # Determine categorical sentiment 2359 if positive_percentage > 60: 2360 categorical_sentiment = "very positive" 2361 elif positive_percentage > 40: 2362 categorical_sentiment = "positive" 2363 elif negative_percentage > 60: 2364 categorical_sentiment = "very negative" 2365 elif negative_percentage > 40: 2366 categorical_sentiment = "negative" 2367 elif neutral_percentage > 60: 2368 categorical_sentiment = "neutral" 2369 else: 2370 categorical_sentiment = "mixed" 2371 2372 # Lexical diversity 2373 lexical_diversity = len(set(words_no_punct)) / len(words_no_punct) if words_no_punct else 0 2374 2375 # Calculate TF-IDF 2376 # Without a corpus this is simplified, but we can treat each sentence as a document 2377 if len(sentences) > 3: # Only compute if we have enough sentences 2378 try: 2379 tfidf_vectorizer = TfidfVectorizer(stop_words='english') 2380 tfidf_matrix = tfidf_vectorizer.fit_transform(sentences) 2381 feature_names = tfidf_vectorizer.get_feature_names_out() 2382 2383 # Get top tfidf terms for each sentence 2384 tfidf_top_terms = [] 2385 for i, sentence in enumerate(sentences): 2386 if i < tfidf_matrix.shape[0]: # Safety check 2387 feature_index = tfidf_matrix[i,:].nonzero()[1] 2388 tfidf_scores = zip(feature_index, [tfidf_matrix[i, x] for x in feature_index]) 2389 tfidf_scores = sorted(tfidf_scores, key=lambda x: x[1], reverse=True) 2390 tfidf_top_terms.append([(feature_names[i], score) for i, score in tfidf_scores[:5]]) 2391 except Exception as tfidf_error: 2392 logger.warning(f"TF-IDF error: {tfidf_error}") 2393 tfidf_top_terms = [] 2394 else: 
2395 tfidf_top_terms = [] 2396 2397 # Text summarization - extractive (simplified) 2398 # Rank sentences by importance (using word frequency as proxy) 2399 sentence_scores = {} 2400 for i, sentence in enumerate(sentences): 2401 sentence_words = nltk.word_tokenize(sentence.lower()) 2402 sentence_words = [word for word in sentence_words if word.isalpha()] 2403 score = sum(word_freq.get(word, 0) for word in sentence_words) 2404 sentence_scores[original_sentences[i]] = score 2405 2406 top_sentences = sorted(sentence_scores.items(), key=lambda x: x[1], reverse=True)[:3] 2407 summary = ' '.join([s[0] for s in top_sentences]) 2408 2409 # Text cohesion metrics 2410 transitional_words = ["however", "therefore", "furthermore", "consequently", "nevertheless", 2411 "thus", "meanwhile", "indeed", "moreover", "whereas", "conversely", 2412 "similarly", "in addition", "in contrast", "specifically", "especially", 2413 "particularly", "for example", "for instance", "in conclusion", "finally"] 2414 2415 # Count transitional words and their positions 2416 transition_count = 0 2417 transition_positions = [] 2418 2419 for i, word in enumerate(words): 2420 if word in transitional_words or any(phrase in ' '.join(words[i:i+4]) for phrase in transitional_words if ' ' in phrase): 2421 transition_count += 1 2422 transition_positions.append(i / len(words) if words else 0) # Normalized position 2423 2424 # Cohesion score - higher means more transitional elements 2425 cohesion_score = (transition_count / len(words) * 100) if words else 0 2426 2427 # Distribution of transitions (beginning, middle, end) 2428 if transition_positions: 2429 transitions_beginning = sum(1 for pos in transition_positions if pos < 0.33) 2430 transitions_middle = sum(1 for pos in transition_positions if 0.33 <= pos < 0.66) 2431 transitions_end = sum(1 for pos in transition_positions if pos >= 0.66) 2432 else: 2433 transitions_beginning = transitions_middle = transitions_end = 0 2434 2435 # Additional advanced metrics if requested 2436 advanced_results = {} 2437 2438 if advanced: 2439 try: 2440 # Create a document-term matrix 2441 if len(sentences) >= 5: # Need enough sentences for meaningful topics 2442 # Create Count Vectorizer 2443 count_vectorizer = CountVectorizer(stop_words='english', min_df=2) 2444 count_matrix = count_vectorizer.fit_transform(sentences) 2445 count_feature_names = count_vectorizer.get_feature_names_out() 2446 2447 # Train LDA model if we have enough data 2448 if count_matrix.shape[0] >= 5 and count_matrix.shape[1] >= 10: 2449 n_topics = min(3, count_matrix.shape[0] - 1) # Choose appropriate number of topics 2450 lda_model = LatentDirichletAllocation(n_components=n_topics, random_state=42) 2451 lda_model.fit(count_matrix) 2452 2453 # Get top words for each topic 2454 topics = [] 2455 for topic_idx, topic in enumerate(lda_model.components_): 2456 top_words_idx = topic.argsort()[:-11:-1] # Top 10 words 2457 top_words = [count_feature_names[i] for i in top_words_idx] 2458 topics.append(top_words) 2459 2460 advanced_results["topics"] = topics 2461 2462 # Alternative: Use TruncatedSVD (similar to LSA) for topic extraction 2463 svd_model = TruncatedSVD(n_components=n_topics, random_state=42) 2464 svd_model.fit(count_matrix) 2465 2466 # Get top words for each component (topic) 2467 svd_topics = [] 2468 for topic_idx, topic in enumerate(svd_model.components_): 2469 top_words_idx = topic.argsort()[:-11:-1] # Top 10 words 2470 top_words = [count_feature_names[i] for i in top_words_idx] 2471 svd_topics.append(top_words) 2472 2473 
advanced_results["svd_topics"] = svd_topics 2474 else: 2475 advanced_results["topics"] = ["Insufficient data for topic modeling"] 2476 advanced_results["svd_topics"] = ["Insufficient data for topic modeling"] 2477 else: 2478 advanced_results["topics"] = ["Insufficient data for topic modeling"] 2479 advanced_results["svd_topics"] = ["Insufficient data for topic modeling"] 2480 2481 # Clustering sentences instead of document similarity 2482 if len(sentences) >= 5: 2483 # Use TF-IDF vectors for clustering 2484 tfidf_vectorizer = TfidfVectorizer(stop_words='english') 2485 tfidf_matrix = tfidf_vectorizer.fit_transform(sentences) 2486 2487 # Determine number of clusters 2488 n_clusters = min(3, len(sentences) - 1) 2489 km = KMeans(n_clusters=n_clusters, random_state=42) 2490 km.fit(tfidf_matrix) 2491 2492 # Get sentence clusters 2493 clusters = km.labels_.tolist() 2494 2495 # Organize sentences by cluster 2496 sentence_clusters = defaultdict(list) 2497 for i, cluster in enumerate(clusters): 2498 sentence_clusters[cluster].append(original_sentences[i]) 2499 2500 advanced_results["sentence_clusters"] = dict(sentence_clusters) 2501 2502 # Get top terms per cluster 2503 cluster_terms = {} 2504 order_centroids = km.cluster_centers_.argsort()[:, ::-1] 2505 terms = tfidf_vectorizer.get_feature_names_out() 2506 2507 for i in range(n_clusters): 2508 cluster_top_terms = [terms[ind] for ind in order_centroids[i, :10]] 2509 cluster_terms[i] = cluster_top_terms 2510 2511 advanced_results["cluster_terms"] = cluster_terms 2512 else: 2513 advanced_results["sentence_clusters"] = {"note": "Insufficient data for clustering"} 2514 advanced_results["cluster_terms"] = {"note": "Insufficient data for clustering"} 2515 2516 except Exception as topic_error: 2517 logger.warning(f"Topic modeling error: {topic_error}") 2518 advanced_results["topics"] = ["Error in topic modeling"] 2519 advanced_results["svd_topics"] = ["Error in topic modeling"] 2520 2521 # Text network analysis 2522 try: 2523 # Create word co-occurrence network 2524 G = nx.Graph() 2525 2526 # Add nodes (words) 2527 for word in set(filtered_words): 2528 G.add_node(word) 2529 2530 # Add edges (co-occurrences within sentences) 2531 for sentence in sentences: 2532 sent_words = [word for word in nltk.word_tokenize(sentence.lower()) 2533 if word.isalpha() and word not in stop_words] 2534 2535 # Add edges between all pairs of words in the sentence 2536 for i, word1 in enumerate(sent_words): 2537 for word2 in sent_words[i+1:]: 2538 if G.has_edge(word1, word2): 2539 G[word1][word2]['weight'] += 1 2540 else: 2541 G.add_edge(word1, word2, weight=1) 2542 2543 # Calculate network metrics if we have enough data 2544 if G.number_of_nodes() > 2: 2545 # Degree centrality 2546 degree_centrality = nx.degree_centrality(G) 2547 top_degree_centrality = sorted(degree_centrality.items(), key=lambda x: x[1], reverse=True)[:10] 2548 2549 # Betweenness centrality for central connector words 2550 if G.number_of_nodes() < 1000: # Skip for very large networks 2551 betweenness_centrality = nx.betweenness_centrality(G, k=min(G.number_of_nodes(), 100)) 2552 top_betweenness = sorted(betweenness_centrality.items(), key=lambda x: x[1], reverse=True)[:10] 2553 else: 2554 top_betweenness = [("Network too large", 0)] 2555 2556 # Extract clusters/communities (simplified) 2557 components = list(nx.connected_components(G)) 2558 largest_component = max(components, key=len) 2559 2560 advanced_results["network_analysis"] = { 2561 "central_terms": [word for word, score in top_degree_centrality], 2562 
"connector_terms": [word for word, score in top_betweenness], 2563 "clusters_count": len(components), 2564 "largest_cluster_size": len(largest_component) 2565 } 2566 else: 2567 advanced_results["network_analysis"] = {"note": "Insufficient data for network analysis"} 2568 except Exception as network_error: 2569 logger.warning(f"Network analysis error: {network_error}") 2570 advanced_results["network_analysis"] = {"error": str(network_error)} 2571 2572 # Syntactic complexity 2573 try: 2574 # Parse subtrees (approximation) 2575 syntactic_complexity = {} 2576 2577 # Count depth of clauses (approximation using POS patterns) 2578 clause_markers = [',', 'that', 'which', 'who', 'whom', 'whose', 'where', 'when', 'why', 'how'] 2579 subordinating_conjunctions = ['after', 'although', 'as', 'because', 'before', 'if', 'since', 'though', 'unless', 'until', 'when', 'where', 'while'] 2580 2581 clause_complexity = [] 2582 2583 for sentence in original_sentences: 2584 tokens = nltk.word_tokenize(sentence.lower()) 2585 clause_markers_count = sum(1 for token in tokens if token in clause_markers) 2586 subordinating_count = sum(1 for token in tokens if token in subordinating_conjunctions) 2587 2588 # Approximate clause depth 2589 clause_depth = 1 + clause_markers_count + subordinating_count 2590 clause_complexity.append(clause_depth) 2591 2592 syntactic_complexity["avg_clause_depth"] = sum(clause_complexity) / len(clause_complexity) if clause_complexity else 0 2593 syntactic_complexity["max_clause_depth"] = max(clause_complexity) if clause_complexity else 0 2594 2595 # Approximation of phrase types 2596 sentence_pos = [nltk.pos_tag(nltk.word_tokenize(sentence)) for sentence in original_sentences] 2597 2598 # Count noun phrases (approximated by adjective-noun sequences) 2599 noun_phrases = [] 2600 for sentence_tags in sentence_pos: 2601 for i in range(len(sentence_tags) - 1): 2602 if sentence_tags[i][1].startswith('JJ') and sentence_tags[i+1][1].startswith('NN'): 2603 noun_phrases.append(f"{sentence_tags[i][0]} {sentence_tags[i+1][0]}") 2604 2605 # Count verb phrases (approximated by adverb-verb sequences) 2606 verb_phrases = [] 2607 for sentence_tags in sentence_pos: 2608 for i in range(len(sentence_tags) - 1): 2609 if sentence_tags[i][1].startswith('RB') and sentence_tags[i+1][1].startswith('VB'): 2610 verb_phrases.append(f"{sentence_tags[i][0]} {sentence_tags[i+1][0]}") 2611 2612 syntactic_complexity["estimated_noun_phrases"] = len(noun_phrases) 2613 syntactic_complexity["estimated_verb_phrases"] = len(verb_phrases) 2614 syntactic_complexity["noun_verb_phrase_ratio"] = len(noun_phrases) / len(verb_phrases) if verb_phrases else 0 2615 2616 advanced_results["syntactic_complexity"] = syntactic_complexity 2617 except Exception as syntax_error: 2618 logger.warning(f"Syntactic analysis error: {syntax_error}") 2619 advanced_results["syntactic_complexity"] = {"error": str(syntax_error)} 2620 2621 # Domain-specific analysis 2622 if domain_specific: 2623 domain_analysis = {} 2624 2625 if domain_specific == "academic": 2626 # Academic writing analysis 2627 academic_terms = ["hypothesis", "theory", "analysis", "data", "method", "research", 2628 "study", "evidence", "results", "conclusion", "findings", "literature", 2629 "significant", "therefore", "thus", "however", "moreover"] 2630 hedge_words = ["may", "might", "could", "appears", "seems", "suggests", "indicates", 2631 "possibly", "perhaps", "likely", "unlikely", "generally", "usually"] 2632 2633 academic_term_count = sum(word_freq.get(term, 0) for term in 
                                             academic_terms)
            hedge_word_count = sum(word_freq.get(term, 0) for term in hedge_words)

            domain_analysis["academic_term_density"] = academic_term_count / len(words) if words else 0
            domain_analysis["hedging_density"] = hedge_word_count / len(words) if words else 0
            domain_analysis["citation_count"] = original_text.count("et al") + len(re.findall(r"\(\d{4}\)", original_text))

        elif domain_specific == "social_media":
            # Social media analysis
            hashtags = re.findall(r"#\w+", original_text)
            mentions = re.findall(r"@\w+", original_text)
            urls = re.findall(r"https?://\S+", original_text)
            emojis = re.findall(r"[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F700-\U0001F77F\U0001F780-\U0001F7FF\U0001F800-\U0001F8FF\U0001F900-\U0001F9FF\U0001FA00-\U0001FA6F\U0001FA70-\U0001FAFF\U00002702-\U000027B0\U000024C2-\U0001F251]+", original_text)

            slang_terms = ["lol", "omg", "wtf", "idk", "tbh", "imo", "fwiw", "ymmv", "tl;dr", "ftw"]
            slang_count = sum(1 for word in words if word.lower() in slang_terms)

            domain_analysis["hashtag_count"] = len(hashtags)
            domain_analysis["mention_count"] = len(mentions)
            domain_analysis["url_count"] = len(urls)
            domain_analysis["emoji_count"] = len(emojis)
            domain_analysis["slang_terms"] = slang_count
            domain_analysis["engagement_markers"] = len(hashtags) + len(mentions) + len(emojis) + slang_count

        elif domain_specific == "customer_reviews":
            # Customer review analysis
            product_terms = ["product", "quality", "price", "value", "recommend", "purchase",
                             "buy", "bought", "worth", "money", "shipping", "delivery", "package",
                             "arrived", "customer", "service", "return", "warranty", "replacement"]

            rating_terms = ["star", "stars", "rating", "rate", "perfect", "excellent", "good",
                            "average", "poor", "terrible", "worst", "best"]

            feature_terms = ["feature", "features", "works", "worked", "functionality", "design",
                             "size", "weight", "color", "material", "battery", "screen", "interface"]

            product_term_count = sum(word_freq.get(term, 0) for term in product_terms)
            rating_term_count = sum(word_freq.get(term, 0) for term in rating_terms)
            feature_term_count = sum(word_freq.get(term, 0) for term in feature_terms)

            # Find potential ratings (e.g. "5 star", "3.5 out of 5")
            rating_patterns = re.findall(r"(\d+\.?\d*)\s*(star|stars|out of \d+)", original_text.lower())

            domain_analysis["product_term_density"] = product_term_count / len(words) if words else 0
            domain_analysis["rating_term_density"] = rating_term_count / len(words) if words else 0
            domain_analysis["feature_term_density"] = feature_term_count / len(words) if words else 0
            domain_analysis["potential_ratings"] = rating_patterns
            domain_analysis["recommendation_language"] = "recommend" in text.lower() or "would buy" in text.lower()

        advanced_results["domain_analysis"] = domain_analysis

        # Collect all results
        analysis_results = {
            # Basic counts
            "basic_stats": {
                "word_count": len(words),
                "unique_word_count": len(set(words_no_punct)),
                "sentence_count": len(sentences),
                "paragraph_count": len(paragraphs),
                "character_count": char_count,
                "character_count_no_spaces": char_count_no_spaces,
                "avg_word_length": avg_word_length,
                "avg_sentence_length": avg_sent_length,
                "avg_paragraph_length": avg_para_length,
                "spaces": spaces,
                "punctuation_count": len(original_words) - len(words_no_punct),
            },

            # Part of speech
            "part_of_speech": {
                "distribution": dict(pos_counts),
                "noun_count": noun_count,
                "verb_count": verb_count,
                "adjective_count": adj_count,
                "adverb_count": adv_count,
                "noun_to_verb_ratio": noun_count / verb_count if verb_count else 0,
                "lexical_density": lexical_density,
            },

            # Named entities
            "named_entities": {
                "counts": entity_counts,
                "entities": named_entities,
                "total_entities": sum(entity_counts.values())
            },

            # Readability metrics
            "readability": {
                "flesch_reading_ease": flesch_reading_ease,
                "flesch_kincaid_grade": fk_grade,
                "gunning_fog_index": gunning_fog,
                "smog_index": smog_index,
                "dale_chall_score": dale_chall_score,
                "syllable_count": total_syllables,
                "avg_syllables_per_word": total_syllables / len(words_no_punct) if words_no_punct else 0,
                "complex_word_percentage": complex_word_percentage,
                "lexical_diversity": lexical_diversity,
            },

            # Frequency analysis
            "frequency_analysis": {
                "most_common_words": word_freq.most_common(20),
                "most_common_meaningful_words": filtered_word_freq.most_common(20),
                "most_common_bigrams": bigram_freq.most_common(10),
                "most_common_trigrams": trigram_freq.most_common(5),
                "most_common_fourgrams": fourgram_freq.most_common(3),
                "hapax_legomena": [word for word, count in word_freq.items() if count == 1],  # Words occurring only once
                "hapax_percentage": sum(1 for _, count in word_freq.items() if count == 1) / len(word_freq) if word_freq else 0,
            },

            # Sentiment analysis
            "sentiment": {
                "overall_polarity": sentiment.polarity,          # -1 to 1 (negative to positive)
                "overall_subjectivity": sentiment.subjectivity,  # 0 to 1 (objective to subjective)
                "polarity_variance": polarity_variance,
                "subjectivity_variance": subjectivity_variance,
                "most_positive_sentence": most_positive_sentence,
                "most_negative_sentence": most_negative_sentence,
                "most_subjective_sentence": most_subjective_sentence,
                "most_objective_sentence": most_objective_sentence,
                "sentiment_shifts": sum(
                    1 for i in range(1, len(sentence_polarities))
                    if (sentence_polarities[i - 1] > 0 and sentence_polarities[i] < 0)
                    or (sentence_polarities[i - 1] < 0 and sentence_polarities[i] > 0)
                ),
                "sentiment_progression": (
                    "positive_trend" if sum(
                        1 for i in range(1, len(sentence_polarities))
                        if sentence_polarities[i] > sentence_polarities[i - 1]
                    ) > len(sentence_polarities) / 2
                    else "negative_trend" if sum(
                        1 for i in range(1, len(sentence_polarities))
                        if sentence_polarities[i] < sentence_polarities[i - 1]
                    ) > len(sentence_polarities) / 2
                    else "neutral_trend"
                ),
                "categorical_sentiment": {
                    "label": categorical_sentiment,
                    "positive_percentage": positive_percentage,
                    "negative_percentage": negative_percentage,
                    "neutral_percentage": neutral_percentage,
                    "positive_sentence_count": positive_count,
                    "negative_sentence_count": negative_count,
                    "neutral_sentence_count": neutral_count
                }
            },

            # Preprocessing results
            "preprocessing": {
                "filtered_words_count": len(filtered_words),
                "stopwords_removed": len(words_no_punct) - len(filtered_words),
                "stemmed_words_sample": stemmed_words[:10] if stemmed_words else [],
                "lemmatized_words_sample": lemmatized_words[:10] if lemmatized_words else [],
            },

            # Text summarization
            "summarization": {
                "extractive_summary": summary,
                "key_sentences": [s[0] for s in top_sentences],
                "tfidf_top_terms": tfidf_top_terms
            },

            # Text cohesion metrics
            "cohesion": {
                "transitional_word_count": transition_count,
                "cohesion_score": cohesion_score,
                "transitions_beginning": transitions_beginning,
                "transitions_middle": transitions_middle,
                "transitions_end": transitions_end,
                "connector_distribution": (
                    "front_loaded" if transitions_beginning > transitions_middle and transitions_beginning > transitions_end
                    else "end_loaded" if transitions_end > transitions_beginning and transitions_end > transitions_middle
                    else "evenly_distributed"
                )
            }
        }

        # Add advanced results if they were computed
        if advanced and advanced_results:
            for key, value in advanced_results.items():
                analysis_results[key] = value

        analysis_results["language"] = detect_language(text)

        # Topic modeling (simple keyword-based approach)
        topic_keywords = filtered_word_freq.most_common(10)
        analysis_results["topic_analysis"] = {
            "possible_topics": topic_keywords
        }

        # Contextual analysis (identifying context patterns)
        contextual_analysis = {}

        # Temporal references
        temporal_markers = ["today", "yesterday", "tomorrow", "now", "then", "before", "after",
                            "while", "during", "soon", "later", "earlier", "recently", "ago"]
        temporal_references = sum(word_freq.get(marker, 0) for marker in temporal_markers)

        # Spatial references
        spatial_markers = ["here", "there", "above", "below", "behind", "in front", "nearby",
                           "inside", "outside", "around", "between", "among", "everywhere"]
        spatial_references = sum(word_freq.get(marker, 0) for marker in spatial_markers)

        # Personal references
        first_person = sum(word_freq.get(marker, 0) for marker in ["i", "me", "my", "mine", "we", "us", "our", "ours"])
        second_person = sum(word_freq.get(marker, 0) for marker in ["you", "your", "yours"])
        third_person = sum(word_freq.get(marker, 0) for marker in ["he", "him", "his", "she", "her", "hers", "they", "them", "their", "theirs"])

        contextual_analysis["temporal_references"] = temporal_references
        contextual_analysis["spatial_references"] = spatial_references
        contextual_analysis["first_person_references"] = first_person
        contextual_analysis["second_person_references"] = second_person
        contextual_analysis["third_person_references"] = third_person
        contextual_analysis["narration_perspective"] = (
            "first_person" if first_person > second_person and first_person > third_person
            else "second_person" if second_person > first_person and second_person > third_person
            else "third_person"
        )

        analysis_results["contextual_analysis"] = contextual_analysis

        # Detect writing style (tentative classification)
        style_markers = {}

        # Formality markers
        formal_markers = ["therefore", "thus", "consequently", "furthermore", "moreover", "hence",
                          "accordingly", "subsequently", "previously", "regarding", "concerning"]
        informal_markers = ["anyway", "basically", "actually", "kinda", "like", "so", "pretty",
                            "totally", "really", "hopefully", "maybe", "ok", "okay", "stuff"]

        style_markers["formal_marker_count"] = sum(word_freq.get(marker, 0) for marker in formal_markers)
        style_markers["informal_marker_count"] = sum(word_freq.get(marker, 0) for marker in informal_markers)
        style_markers["contraction_count"] = len(re.findall(r"\b\w+'[ts]|\b\w+n't\b|\b\w+'ll\b|\b\w+'re\b|\b\w+'ve\b", original_text))
        style_markers["exclamation_count"] = original_text.count("!")
        style_markers["question_count"] = original_text.count("?")
        style_markers["parenthetical_count"] = len(re.findall(r"\([^)]*\)", original_text))
        style_markers["semicolon_count"] = original_text.count(";")

        # Tentative style classification
        formality_score = (
            style_markers["formal_marker_count"]
            + style_markers["semicolon_count"] * 2
            + style_markers["parenthetical_count"]
        ) - (
            style_markers["informal_marker_count"]
            + style_markers["contraction_count"]
            + style_markers["exclamation_count"] * 2
        )

        if formality_score > 5:
            style_markers["style_classification"] = "formal_academic"
        elif formality_score > 0:
            style_markers["style_classification"] = "formal"
        elif formality_score > -5:
            style_markers["style_classification"] = "neutral"
        elif formality_score > -10:
            style_markers["style_classification"] = "informal"
        else:
            style_markers["style_classification"] = "very_informal"

        analysis_results["style_analysis"] = style_markers

        # Detect potential rhetoric patterns
        rhetoric_patterns = {}

        # Repetition patterns
        repeated_bigrams = [bg for bg, count in bigram_freq.items() if count > 2]
        repeated_trigrams = [tg for tg, count in trigram_freq.items() if count > 2]

        # Question patterns
        rhetorical_questions = sum(
            1 for sentence in original_sentences
            if sentence.endswith("?") and any(
                word in sentence.lower()
                for word in ["why", "who", "what", "how", "when", "where"]
            )
        )

        # Comparison patterns (similes)
        similes = len(re.findall(r"\b(like|as) a\b|\b(like|as) the\b", original_text.lower()))

        # Alliteration (simplified detection)
        alliterations = 0
        for i in range(len(original_words) - 2):
            if (len(original_words[i]) > 0 and len(original_words[i + 1]) > 0 and len(original_words[i + 2]) > 0 and
                    original_words[i][0].lower() == original_words[i + 1][0].lower() == original_words[i + 2][0].lower()):
                alliterations += 1

        rhetoric_patterns["repeated_phrases"] = repeated_bigrams + repeated_trigrams
        rhetoric_patterns["rhetorical_questions"] = rhetorical_questions
        rhetoric_patterns["similes"] = similes
        rhetoric_patterns["alliterations"] = alliterations

        analysis_results["rhetoric_patterns"] = rhetoric_patterns

        # Potential bias indicators
        bias_indicators = {}

        # Extreme language
        extreme_markers = ["always", "never", "all", "none", "every", "only", "impossible",
                           "absolutely", "undoubtedly", "certainly", "definitely", "completely",
                           "total", "totally", "utterly", "best", "worst", "perfect"]
        extreme_language = sum(word_freq.get(marker, 0) for marker in extreme_markers)

        # Loaded language
        emotionally_loaded = ["amazing", "terrible", "awesome", "horrible", "wonderful", "dreadful",
                              "excellent", "awful", "extraordinary", "appalling", "incredible", "disgusting"]
        loaded_language = sum(word_freq.get(marker, 0) for marker in emotionally_loaded)

        bias_indicators["extreme_language_count"] = extreme_language
        bias_indicators["loaded_language_count"] = loaded_language
        bias_indicators["extreme_language_ratio"] = extreme_language / len(words) if words else 0
        bias_indicators["loaded_language_ratio"] = loaded_language / len(words) if words else 0

        # Simple bias classification
        if bias_indicators["extreme_language_ratio"] > 0.05 or bias_indicators["loaded_language_ratio"] > 0.05:
            bias_indicators["bias_classification"] = "potentially_biased"
        else:
            bias_indicators["bias_classification"] = "relatively_neutral"

        analysis_results["bias_indicators"] = bias_indicators

        if advanced and len(filtered_words) >= 20:
            try:
                similarity_analysis = {}

                # Create co-occurrence matrix (simplified word embedding alternative)
                vectorizer = CountVectorizer(ngram_range=(1, 1), token_pattern=r'\b\w+\b', min_df=2)
                X = vectorizer.fit_transform(sentences)
                features = vectorizer.get_feature_names_out()

                # Convert to array for easier manipulation
                X_array = X.toarray()

                # Compute pairwise distance between terms
                # Use cosine similarity between term vectors
                term_similarity = {}

                for i, term1 in enumerate(features):
                    if i < len(X_array[0]):  # Safety check
                        term_vec1 = X_array[:, i]
                        for j, term2 in enumerate(features):
                            if j < len(X_array[0]) and i != j:  # Skip self-comparison
                                term_vec2 = X_array[:, j]
                                # Compute cosine similarity
                                similarity = 1 - distance.cosine(term_vec1, term_vec2)

                                if similarity > 0.5:  # Only keep high similarity pairs
                                    if term1 not in term_similarity:
                                        term_similarity[term1] = []
                                    term_similarity[term1].append((term2, similarity))

                # Sort and keep top similar terms
                for term in term_similarity:
                    term_similarity[term] = sorted(term_similarity[term], key=lambda x: x[1], reverse=True)[:5]

                # Get top terms with most connections
                top_connected_terms = sorted(term_similarity.items(), key=lambda x: len(x[1]), reverse=True)[:10]

                similarity_analysis["term_similarity"] = {term: similar for term, similar in top_connected_terms}

                # Store on the results dict directly: the advanced_results merge has
                # already run above, so writing to advanced_results here would be lost.
                analysis_results["similarity_analysis"] = similarity_analysis
            except Exception as sim_error:
                logger.warning(f"Similarity analysis error: {sim_error}")
                analysis_results["similarity_analysis"] = {"error": str(sim_error)}

        # Emotion detection (beyond just sentiment)
        try:
            emotion_analysis = {}

            # Basic emotion lexicons
            emotions = {
                "joy": ["happy", "joy", "delight", "glad", "pleased", "excited", "thrilled", "elated"],
                "sadness": ["sad", "unhappy", "sorrow", "depressed", "miserable", "downcast", "gloomy"],
                "anger": ["angry", "mad", "furious", "outraged", "annoyed", "irritated", "livid"],
                "fear": ["afraid", "fear", "scared", "terrified", "worried", "anxious", "nervous"],
                "surprise": ["surprised", "amazed", "astonished", "shocked", "stunned", "startled"],
                "disgust": ["disgusted", "revolted", "repulsed", "sickened", "appalled"]
            }

            # Count emotion words
            emotion_counts = {}
            for emotion, emotion_words in emotions.items():
                emotion_counts[emotion] = sum(word_freq.get(word, 0) for word in emotion_words)

            # Calculate dominant emotion
            dominant_emotion = max(emotion_counts.items(), key=lambda x: x[1]) if emotion_counts else ("neutral", 0)

            # Calculate emotion intensity (percent of all emotion words that belong to dominant emotion)
            total_emotion_words = sum(emotion_counts.values())
            dominant_intensity = (dominant_emotion[1] / total_emotion_words) if total_emotion_words > 0 else 0

            emotion_analysis["emotion_counts"] = emotion_counts
            emotion_analysis["dominant_emotion"] = dominant_emotion[0]
            emotion_analysis["dominant_intensity"] = dominant_intensity
            emotion_analysis["emotional_diversity"] = len([e for e, c in emotion_counts.items() if c > 0])

            analysis_results["emotion_analysis"] = emotion_analysis
        except Exception as emo_error:
            logger.warning(f"Emotion analysis error: {emo_error}")
            analysis_results["emotion_analysis"] = {"error": str(emo_error)}

        # Get metadata about the analysis
        metadata = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "analysis_version": "2.0",
            "text_length_category": "short" if len(words) < 100 else "medium" if len(words) < 500 else "long",
            "advanced_analysis_performed": advanced,
            "domain_specific_analysis": domain_specific
        }

        analysis_results["metadata"] = metadata

        return analysis_results

    except Exception as e:
        logger.error(f"Text analysis error: {e}")
        import traceback
        return {
            "error": str(e),
            "traceback": traceback.format_exc()
        }
Perform comprehensive text analytics with advanced NLP techniques.
Args: text (str): Input text advanced (bool): Whether to perform computationally intensive advanced analysis domain_specific (str): Optional domain for specialized analysis (e.g., "academic", "social_media", "customer_reviews")
Returns: Dict: Comprehensive analysis results
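For orientation, a minimal usage sketch (not part of the original source; it assumes the package is installed, that analyze_text is importable from mrblack, and that the NLTK/TextBlob data it relies on is available):

# Hedged usage sketch for analyze_text; the sample review text is invented for illustration.
from mrblack import analyze_text

sample = (
    "I absolutely love this product! The battery life is excellent and shipping was fast. "
    "Would I buy it again? Definitely. The price felt fair for the quality."
)

report = analyze_text(sample, advanced=False, domain_specific="customer_reviews")
if "error" not in report:
    print(report["basic_stats"]["word_count"])
    print(report["sentiment"]["categorical_sentiment"]["label"])
    print(report["style_analysis"]["style_classification"])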
def summarize_text(text: str, sentences: int = 5) -> str:
    """
    Create a simple extractive summary from the text.

    Args:
        text (str): Input text to summarize
        sentences (int): Number of sentences to include

    Returns:
        str: Summarized text
    """
    try:
        # Tokenize and calculate word frequencies
        stop_words = set(stopwords.words('english'))
        sentences_list = sent_tokenize(text)

        # If there are fewer sentences than requested, return all
        if len(sentences_list) <= sentences:
            return text

        word_frequencies = {}
        for sentence in sentences_list:
            for word in nltk.word_tokenize(sentence):
                word = word.lower()
                if word not in stop_words:
                    if word not in word_frequencies:
                        word_frequencies[word] = 1
                    else:
                        word_frequencies[word] += 1

        # Normalize frequencies
        maximum_frequency = max(word_frequencies.values()) if word_frequencies else 1
        for word in word_frequencies:
            word_frequencies[word] = word_frequencies[word] / maximum_frequency

        # Score sentences
        sentence_scores = {}
        for i, sentence in enumerate(sentences_list):
            for word in nltk.word_tokenize(sentence.lower()):
                if word in word_frequencies:
                    if i not in sentence_scores:
                        sentence_scores[i] = word_frequencies[word]
                    else:
                        sentence_scores[i] += word_frequencies[word]

        # Get top N sentences
        summary_sentences = sorted(sentence_scores.items(), key=lambda x: x[1], reverse=True)[:sentences]
        summary_sentences = [sentences_list[i] for i, _ in sorted(summary_sentences)]

        return ' '.join(summary_sentences)
    except Exception as e:
        logger.error(f"Summarization error: {e}")
        return text
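A small, hedged example of calling summarize_text (assumes the NLTK punkt and stopwords data have been downloaded; the article text is made up for illustration):

# Usage sketch for summarize_text.
from mrblack import summarize_text

article = " ".join(
    f"Sentence {i} discusses extractive summarization, word frequencies and sentence scoring."
    for i in range(1, 11)
)
print(summarize_text(article, sentences=3))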
def translate_text(text: str, target_lang: str = "en") -> Optional[str]:
    """
    Translate text to target language.

    Args:
        text (str): Input text to translate
        target_lang (str): Target language code (e.g., 'en', 'es', 'fr', 'ja' for Japanese)

    Returns:
        Optional[str]: Translated text or None on failure
    """
    try:
        # Handle long texts by splitting into chunks (Google has a limit)
        max_chunk_size = 4500  # Google Translate has a limit around 5000 chars
        chunks = []

        # Split text into chunks of appropriate size (at sentence boundaries if possible)
        text_remaining = text
        while len(text_remaining) > 0:
            if len(text_remaining) <= max_chunk_size:
                chunks.append(text_remaining)
                break

            # Try to find a sentence boundary near the max chunk size
            chunk_end = max_chunk_size
            while chunk_end > 0 and text_remaining[chunk_end] not in ['.', '!', '?', '\n']:
                chunk_end -= 1

            # If no good sentence boundary found, just use max size
            if chunk_end == 0:
                chunk_end = max_chunk_size
            else:
                chunk_end += 1  # Include the period or boundary character

            chunks.append(text_remaining[:chunk_end])
            text_remaining = text_remaining[chunk_end:]

        # Translate each chunk and combine
        translated_chunks = []
        for chunk in chunks:
            translated_chunk = GoogleTranslator(source='auto', target=target_lang).translate(chunk)
            translated_chunks.append(translated_chunk)

        return ' '.join(translated_chunks)
    except Exception as e:
        logger.error(f"Translation error: {e}")
        return None
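A hedged example of translate_text (requires network access, since deep_translator's GoogleTranslator calls the Google Translate endpoint; the sample sentence is invented):

# Usage sketch for translate_text; returns None if the translation service is unreachable.
from mrblack import translate_text

result = translate_text("Bonjour tout le monde, ceci est un petit test.", target_lang="en")
print(result or "translation failed")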
def list_available_languages() -> Dict[str, str]:
    """
    Get a dictionary of available languages for translation.

    Returns:
        Dict[str, str]: Dictionary mapping language codes to language names
    """
    try:
        # Get available languages from the translator
        languages = GoogleTranslator().get_supported_languages(as_dict=True)
        return languages
    except Exception as e:
        logger.error(f"Error getting language list: {e}")
        # Return a small subset as fallback
        return {
            "en": "English",
            "es": "Spanish",
            "fr": "French",
            "de": "German",
            "it": "Italian",
            "ja": "Japanese",
            "ko": "Korean",
            "zh-cn": "Chinese (Simplified)",
            "ru": "Russian",
            "ar": "Arabic"
        }
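A quick, hedged check of the language list (the exact shape of the mapping depends on the deep_translator backend; the built-in table above is only used as a fallback when the service cannot be reached):

# Usage sketch for list_available_languages.
from mrblack import list_available_languages

languages = list_available_languages()
print(f"{len(languages)} languages available")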
def detect_language(text: str) -> str:
    """
    Detect the language of the extracted text.

    Args:
        text (str): Input text

    Returns:
        str: Detected language code (e.g. 'en') or 'unknown'
    """
    try:
        import langdetect
        # langdetect already returns an ISO 639-1 style code, which is what the
        # docstring promises, so return it directly instead of remapping it.
        return langdetect.detect(text)
    except Exception:
        logger.warning("Language detection failed or langdetect not installed")
        return "unknown"
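A hedged example of detect_language (requires the optional langdetect package; on any failure the function falls back to 'unknown'):

# Usage sketch for detect_language.
from mrblack import detect_language

print(detect_language("The quick brown fox jumps over the lazy dog."))  # typically 'en'
print(detect_language(""))  # empty input cannot be detected, so 'unknown'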
def scrape_website(url: str, max_pages: int = 1, stay_on_domain: bool = True) -> Dict[str, str]:
    """
    Scrape multiple pages of a website.

    Args:
        url (str): Starting URL
        max_pages (int): Maximum pages to scrape
        stay_on_domain (bool): Whether to stay on the same domain

    Returns:
        Dict[str, str]: Dictionary mapping URLs to extracted text
    """
    results = {}
    visited = set()
    to_visit = [url]
    base_domain = urlparse(url).netloc

    while to_visit and len(visited) < max_pages:
        current_url = to_visit.pop(0)
        if current_url in visited:
            continue

        # Extract text from current page
        text = text_from_url(current_url)
        if text:
            results[current_url] = text

        visited.add(current_url)

        # Find links on the page
        session = HTMLSession()
        try:
            r = session.get(current_url)
            r.html.render(timeout=20, sleep=1)

            links = r.html.absolute_links
            for link in links:
                link_domain = urlparse(link).netloc
                if link not in visited and link not in to_visit:
                    # Check if we should follow this link
                    if stay_on_domain and link_domain != base_domain:
                        continue
                    to_visit.append(link)
        except Exception as e:
            logger.error(f"Error scraping {current_url}: {e}")
        finally:
            session.close()

    return results
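A hedged example of scrape_website (needs network access, and requests_html downloads a Chromium build the first time render() runs; the URL is a stand-in, not one used by the project):

# Usage sketch for scrape_website; example.com is a placeholder URL.
from mrblack import scrape_website

pages = scrape_website("https://example.com", max_pages=2, stay_on_domain=True)
for page_url, page_text in pages.items():
    print(page_url, len(page_text), "characters")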
def normalize_text(
    text: str
) -> str:
    """
    Collapse runs of spaces, tabs, carriage returns, and newlines into a
    single occurrence each, and strip leading spaces from every line.
    The result keeps its line structure but drops redundant whitespace.

    Args:
        text (str): Raw input text.

    Returns:
        str: Normalized text.
    """
    if not text:
        return ""
    text = unicodedata.normalize("NFKC", text)
    # Convert carriage returns to newlines first so \r\n runs collapse cleanly below.
    text = re.sub(r'\r+', '\n', text)
    text = re.sub(r' +', ' ', text)
    text = re.sub(r'\n+', '\n', text)
    text = re.sub(r'(?m)(^ \n)+', '\n', text)
    text = re.sub(r'\t+', '\t', text)
    text = re.sub(r"^ ", "", text, flags=re.MULTILINE)
    return text
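A short example showing how normalize_text collapses runs of whitespace (the input string is invented for illustration):

# Usage sketch for normalize_text.
from mrblack import normalize_text

messy = "Hello    world\r\n\r\n\n   This  is \t\t a test.\n\n"
print(repr(normalize_text(messy)))  # e.g. 'Hello world\nThis is \t a test.\n'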