Source code for scitex_browser.remote.ZenRowsAPIClient

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Timestamp: 2025-07-31 23:30:00
# Author: ywatanabe
# File: /home/ywatanabe/proj/SciTeX-Code/src/scitex/scholar/browser/remote/_ZenRowsAPIBrowser.py
# ----------------------------------------
from __future__ import annotations

import os

# Hard-coded, repository-relative location of this module and the directory
# that contains it (these are plain strings, not derived from ``__file__``).
__FILE__ = "./src/scitex/scholar/browser/remote/_ZenRowsAPIBrowser.py"
__DIR__ = os.path.dirname(__FILE__)
# ----------------------------------------

"""
ZenRows API-based browser for reliable page rendering and screenshot capture.
This uses the ZenRows API directly instead of WebSocket for better reliability.
"""

import asyncio
import base64
import json
import re
from html import unescape
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import aiohttp

import scitex_logging as logging
from scitex_browser._compat import ScholarError

logger = logging.getLogger(__name__)


class ZenRowsAPIBrowser:
    """Browser-like interface using ZenRows API for page rendering.

    This provides a simpler, more reliable alternative to WebSocket-based
    browser connections. It's especially good for:

    - Taking screenshots
    - Handling CAPTCHAs automatically
    - Getting rendered HTML content
    - Bypassing anti-bot measures
    """

    # Total time budget (seconds) for a single ZenRows HTTP request.
    _REQUEST_TIMEOUT_S = 60

    # A direct (non-JSON) body must be larger than this many bytes before it
    # is saved as a screenshot.
    _MIN_IMAGE_BYTES = 1000

    # JS instructions sent when the caller does not supply any.
    _DEFAULT_JS_INSTRUCTIONS = (
        {"wait": 3000},  # Initial wait
        {"wait_event": "networkidle"},  # Wait for network
        {"scroll_y": 300},  # Trigger lazy loading
        {"wait": 2000},  # Final wait
    )

    # (pattern, is_html_attribute) pairs, tried in order. Attribute values
    # are entity-escaped in HTML source (``&amp;``), so those matches are
    # unescaped before being returned; the JS/JSON pattern is left as-is.
    _PDF_URL_PATTERNS = (
        (re.compile(r'href="([^"]+\.pdf[^"]*)"', re.IGNORECASE), True),
        (re.compile(r'content="([^"]+\.pdf[^"]*)"', re.IGNORECASE), True),
        (
            re.compile(
                r'url["\']?\s*:\s*["\']([^"\']+\.pdf[^"\']*)', re.IGNORECASE
            ),
            False,
        ),
    )

    def __init__(
        self,
        api_key: Optional[str] = None,
        proxy_country: str = "au",
        enable_antibot: bool = True,
        premium_proxy: bool = True,
    ):
        """Initialize ZenRows API browser.

        Args:
            api_key: ZenRows API key (or from env)
            proxy_country: Country code for proxy
            enable_antibot: Enable anti-bot bypass features
            premium_proxy: Use premium residential proxies

        Raises:
            ValueError: If no key is passed and the environment variable
                ``SCITEX_SCHOLAR_ZENROWS_API_KEY`` is unset or empty.
        """
        self.api_key = api_key or os.getenv("SCITEX_SCHOLAR_ZENROWS_API_KEY")
        if not self.api_key:
            raise ValueError(
                "ZenRows API key required. Set SCITEX_SCHOLAR_ZENROWS_API_KEY"
            )

        self.proxy_country = proxy_country
        self.enable_antibot = enable_antibot
        self.premium_proxy = premium_proxy
        self.base_url = "https://api.zenrows.com/v1/"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _build_params(
        self,
        url: str,
        screenshot_path: Optional[str],
        wait_ms: int,
        js_instructions: Optional[List[Dict]],
        return_html: bool,
    ) -> Dict[str, str]:
        """Build the query parameters for one ZenRows request."""
        if js_instructions is None:
            js_instructions = list(self._DEFAULT_JS_INSTRUCTIONS)

        params = {
            "url": url,
            "apikey": self.api_key,
            "js_render": "true",
            "js_instructions": json.dumps(js_instructions),
            "wait": str(wait_ms),
        }

        # Optional features
        if self.enable_antibot:
            params["antibot"] = "true"
        if self.premium_proxy:
            params["premium_proxy"] = "true"
        if self.proxy_country:
            params["proxy_country"] = self.proxy_country
        if screenshot_path:
            params["screenshot"] = "true"
        if return_html or screenshot_path:
            params["json_response"] = "true"
        return params

    @staticmethod
    def _save_bytes(path: str, payload: bytes) -> None:
        """Write ``payload`` to ``path``, creating parent directories."""
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(payload)

    def _parse_json_response(
        self,
        data: Dict[str, Any],
        url: str,
        screenshot_path: Optional[str],
        return_html: bool,
    ) -> Dict[str, Any]:
        """Turn a decoded ZenRows JSON payload into the result dict."""
        # The API may send ``"html": null``; treat it like a missing key so a
        # successful fetch is not reported as a failure by ``len(None)``.
        html = data.get("html") or ""
        result: Dict[str, Any] = {
            "success": True,
            "url": url,
            "html": html if return_html else None,
            "html_length": len(html),
        }

        # Handle screenshot
        screenshot_data = data.get("screenshot")
        if screenshot_path and screenshot_data and screenshot_data.get("data"):
            self._save_bytes(
                screenshot_path, base64.b64decode(screenshot_data["data"])
            )
            result["screenshot"] = {
                "saved": True,
                "path": screenshot_path,
                "width": screenshot_data.get("width"),
                "height": screenshot_data.get("height"),
            }
            logger.success(f"Screenshot saved: {screenshot_path}")

        # Check JS execution report
        report = data.get("js_instructions_report")
        if report:
            result["js_report"] = {
                "executed": report.get("instructions_executed", 0),
                "succeeded": report.get("instructions_succeeded", 0),
                "failed": report.get("instructions_failed", 0),
            }
            # Check for CAPTCHA solving
            for inst in report.get("instructions", []):
                if inst.get("instruction") == "solve_captcha" and inst.get(
                    "success"
                ):
                    result["captcha_solved"] = True
                    # Tolerate a missing "params"/"type": this lookup only
                    # feeds a debug message and must not fail the request.
                    captcha_type = (inst.get("params") or {}).get(
                        "type", "unknown"
                    )
                    logger.debug(f"CAPTCHA solved: {captcha_type}")
        return result

    def _parse_direct_response(
        self,
        content: bytes,
        url: str,
        screenshot_path: Optional[str],
        return_html: bool,
    ) -> Dict[str, Any]:
        """Turn a raw (non-JSON) response body into the result dict."""
        if screenshot_path and len(content) > self._MIN_IMAGE_BYTES:
            # Save as image
            self._save_bytes(screenshot_path, content)
            logger.success(f"Screenshot saved: {screenshot_path}")
            return {
                "success": True,
                "url": url,
                "screenshot": {
                    "saved": True,
                    "path": screenshot_path,
                    "size_bytes": len(content),
                },
            }
        if return_html:
            # Return as HTML
            html = content.decode("utf-8", errors="ignore")
            return {
                "success": True,
                "url": url,
                "html": html,
                "html_length": len(html),
            }
        return {"success": True, "url": url}

    @staticmethod
    def _candidate_urls(doi: str, use_openurl: bool) -> List[str]:
        """Return the URLs to probe for ``doi``, in priority order."""
        # Percent-encode the DOI: characters such as '#', '&', '<', '>' or
        # spaces would otherwise corrupt the query string / path.
        encoded_doi = quote(doi, safe="/")
        candidates = []
        if use_openurl:
            openurl_base = os.getenv(
                "SCITEX_SCHOLAR_OPENURL_RESOLVER_URL",
                "https://unimelb.hosted.exlibrisgroup.com/sfxlcl41",
            )
            candidates.append(
                f"{openurl_base}?url_ver=Z39.88-2004"
                f"&rft_id=info:doi/{encoded_doi}&svc_id=fulltext"
            )
        # Direct DOI
        candidates.append(f"https://doi.org/{encoded_doi}")
        return candidates

    @classmethod
    def _extract_pdf_url(cls, html: str) -> Optional[str]:
        """Return the first PDF-looking URL found in ``html``, else None."""
        for pattern, is_attribute in cls._PDF_URL_PATTERNS:
            match = pattern.search(html)
            if match:
                found = match.group(1)
                return unescape(found) if is_attribute else found
        return None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def navigate_and_screenshot_async(
        self,
        url: str,
        screenshot_path: Optional[str] = None,
        wait_ms: int = 5000,
        js_instructions: Optional[List[Dict]] = None,
        return_html: bool = False,
    ) -> Dict[str, Any]:
        """Navigate to URL and optionally take screenshot.

        Args:
            url: Target URL
            screenshot_path: Path to save screenshot (None to skip)
            wait_ms: Additional wait time in milliseconds
            js_instructions: Custom JavaScript instructions
            return_html: Whether to return rendered HTML

        Returns:
            Dict with results including screenshot info and HTML. On failure
            the dict has ``"success": False`` and an ``"error"`` message;
            errors raised while making or parsing the request are reported
            this way rather than propagated.
        """
        params = self._build_params(
            url, screenshot_path, wait_ms, js_instructions, return_html
        )
        logger.debug(f"Navigating to: {url}")

        try:
            async with aiohttp.ClientSession() as session:
                timeout = aiohttp.ClientTimeout(total=self._REQUEST_TIMEOUT_S)
                async with session.get(
                    self.base_url, params=params, timeout=timeout
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(
                            f"ZenRows error {response.status}: {error_text[:200]}"
                        )
                        return {
                            "success": False,
                            "error": f"API error {response.status}",
                            "url": url,
                        }

                    # Handle response based on content type
                    content_type = response.headers.get("content-type", "")
                    if "json" in content_type:
                        # content_type=None disables aiohttp's strict
                        # "application/json" check so every type admitted by
                        # the looser test above can actually be decoded.
                        data = await response.json(content_type=None)
                        return self._parse_json_response(
                            data, url, screenshot_path, return_html
                        )

                    # Direct response (image or HTML)
                    content = await response.read()
                    return self._parse_direct_response(
                        content, url, screenshot_path, return_html
                    )

        except asyncio.TimeoutError:
            logger.error("Request timed out - page may require manual intervention")
            return {"success": False, "error": "Timeout", "url": url}
        except Exception as e:
            logger.error(f"Error navigating to {url}: {e}")
            return {"success": False, "error": str(e), "url": url}

    async def get_pdf_url_async(
        self, doi: str, use_openurl: bool = True
    ) -> Optional[str]:
        """Try to get PDF URL for a DOI.

        Args:
            doi: DOI to resolve
            use_openurl: Whether to try OpenURL resolver first

        Returns:
            PDF URL if found, None otherwise. The URL is returned as it
            appears in the page (HTML entities decoded) and may be relative.
        """
        # An empty DOI would only fetch the resolver landing pages.
        if not doi or not doi.strip():
            return None

        for url in self._candidate_urls(doi.strip(), use_openurl):
            result = await self.navigate_and_screenshot_async(
                url,
                return_html=True,
                wait_ms=8000,  # Longer wait for redirects
            )
            if result.get("success") and result.get("html"):
                pdf_url = self._extract_pdf_url(result["html"])
                if pdf_url is not None:
                    logger.debug(f"Found PDF URL: {pdf_url}")
                    return pdf_url

        return None

    async def batch_screenshot_async(
        self, urls: List[str], output_dir: str, max_concurrent: int = 3
    ) -> List[Dict[str, Any]]:
        """Take screenshots of multiple URLs concurrently.

        Args:
            urls: List of URLs to screenshot
            output_dir: Directory to save screenshots
            max_concurrent: Max concurrent requests (values below 1 are
                treated as 1)

        Returns:
            List of results for each URL, in the same order as ``urls``
        """
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        # Semaphore(0) would block every task forever, so clamp to >= 1.
        semaphore = asyncio.Semaphore(max(1, max_concurrent))

        async def process_url_async(url: str, index: int) -> Dict[str, Any]:
            """Screenshot a single URL while holding a concurrency slot."""
            filepath = os.path.join(output_dir, f"screenshot_{index:03d}.png")
            async with semaphore:
                result = await self.navigate_and_screenshot_async(url, filepath)
            result["index"] = index
            return result

        results = await asyncio.gather(
            *(process_url_async(url, i) for i, url in enumerate(urls)),
            return_exceptions=True,
        )

        # Convert exceptions to error results. gather() can hand back
        # asyncio.CancelledError, which derives from BaseException rather
        # than Exception, so test against BaseException.
        final_results = [
            {"success": False, "error": str(result), "url": urls[i], "index": i}
            if isinstance(result, BaseException)
            else result
            for i, result in enumerate(results)
        ]

        # Summary
        successful = sum(1 for r in final_results if r.get("success"))
        logger.debug(f"Screenshots: {successful}/{len(urls)} successful")

        return final_results