#!/usr/bin/env python3
# File: /home/ywatanabe/proj/scitex-code/src/scitex/scholar/core/journal_normalizer.py
"""
Journal Name Normalizer.
Handles journal name variations, abbreviations, and historical names
using ISSN-L as the unique identifier (single source of truth).
Data sources:
- OpenAlex API (display_name, alternate_titles, abbreviated_title, issn_l)
- Crossref API (container-title, short-container-title)
- Local cache with 1-day TTL
Usage:
from scitex_scholar.core import JournalNormalizer
normalizer = JournalNormalizer.get_instance()
# Normalize any journal name variant
canonical = normalizer.normalize("J. Neurosci.") # → "Journal of Neuroscience"
# Get ISSN-L for a journal
issn_l = normalizer.get_issn_l("PLOS ONE") # → "1932-6203"
# Check if two names refer to same journal
normalizer.is_same_journal("J Neurosci", "Journal of Neuroscience") # → True
"""
from __future__ import annotations
import asyncio
import json
import os
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
import aiohttp
import scitex_logging as logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Cache settings
# Maximum age (in seconds) of the cached journal data before it is treated as stale.
CACHE_TTL_SECONDS = 86400 # 1 day
# OpenAlex endpoint queried for journal ("source") records.
OPENALEX_SOURCES_URL = "https://api.openalex.org/sources"
# Contact address sent as the `mailto` query parameter on every OpenAlex request.
OPENALEX_POLITE_EMAIL = "research@scitex.io"
def _get_default_cache_dir() -> Path:
    """
    Return the default cache directory, ``<base>/scholar/cache``.

    ``<base>`` is the value of the ``SCITEX_DIR`` environment variable, or
    ``~/.scitex`` when the variable is not set; a leading ``~`` is expanded.
    """
    base_dir = Path(os.environ.get("SCITEX_DIR", "~/.scitex")).expanduser()
    return base_dir.joinpath("scholar", "cache")
def _normalize_name(name: str) -> str:
    """
    Reduce a journal name to a canonical key for dictionary lookups.

    Steps, in order:
    - Lowercase
    - Drop periods, commas and colons
    - Collapse every run of whitespace to a single space and trim the ends
    - Spell out a free-standing ampersand (" & " → " and ")

    Args:
        name: Raw journal name or abbreviation; may be empty or None.

    Returns:
        The normalized key, or "" when ``name`` is falsy or contains nothing
        but punctuation/whitespace.
    """
    if not name:
        return ""
    cleaned = name.lower()
    for mark in ".,:":
        cleaned = cleaned.replace(mark, "")
    # Collapse whitespace only AFTER punctuation is removed: a free-standing
    # mark such as "Brain : A Journal" would otherwise leave a double space
    # behind and the key would not match "Brain: A Journal".
    cleaned = " ".join(cleaned.split())
    # split()/join() already trimmed both ends, so no extra strip() is needed.
    return cleaned.replace(" & ", " and ")
def _normalize_issn(issn: str) -> str:
    """
    Bring an ISSN into the ``XXXX-XXXX`` form.

    The input is upper-cased and stripped of hyphens and spaces. If exactly
    eight characters remain, a hyphen is inserted after the fourth; otherwise
    the stripped, upper-cased text is returned as is. A falsy input yields "".
    """
    if not issn:
        return ""
    compact = "".join(ch for ch in issn.upper() if ch not in "- ")
    if len(compact) != 8:
        return compact
    head, tail = compact[:4], compact[4:]
    return f"{head}-{tail}"
class JournalNormalizer:
    """
    Journal name normalizer using ISSN-L as the unique identifier.

    Handles:
    - Full names ↔ abbreviations
    - Name variants (spelling, punctuation, capitalization)
    - Historical/former names
    - Publisher variations

    Journal records are fetched from the OpenAlex sources endpoint and cached
    locally as JSON. Data younger than ``CACHE_TTL_SECONDS`` is considered
    fresh; older data triggers a refresh on the next lookup.

    Use :meth:`get_instance` to obtain the shared instance.
    """

    _instance: Optional[JournalNormalizer] = None

    # An ISSN with or without its hyphen; the check digit may be "X".
    _ISSN_PATTERN = re.compile(r"^\d{4}-?\d{3}[\dXx]$")

    # After a fetch that returned no records, wait this many seconds before
    # querying OpenAlex again, so an outage does not turn every lookup into a
    # blocking network request.
    _FETCH_RETRY_SECONDS = 300

    def __init__(self, cache_dir: Optional[Path] = None):
        """
        Create an empty normalizer; no file or network I/O happens here.

        Args:
            cache_dir: Directory holding the JSON cache file. Defaults to the
                directory returned by ``_get_default_cache_dir()``.
        """
        self._cache_dir = Path(cache_dir) if cache_dir else _get_default_cache_dir()
        self._cache_file = self._cache_dir / "journal_normalizer_cache.json"

        # Core mapping (ISSN-L is the key)
        self._issn_l_data: Dict[str, Dict[str, Any]] = {}  # ISSN-L → full metadata

        # Lookup indexes (for fast search)
        self._name_to_issn_l: Dict[str, str] = {}  # normalized name → ISSN-L
        self._issn_to_issn_l: Dict[str, str] = {}  # any ISSN → ISSN-L
        self._abbrev_to_issn_l: Dict[str, str] = {}  # abbreviated name → ISSN-L

        # Stats
        self._last_updated: float = 0  # epoch seconds of the data in memory
        self._loaded = False
        self._journal_count = 0

        # Epoch seconds before which a failed fetch is not retried.
        self._retry_after: float = 0.0

    @classmethod
    def get_instance(cls, cache_dir: Optional[Path] = None) -> JournalNormalizer:
        """
        Get the singleton instance, creating it on first use.

        Args:
            cache_dir: Used only when the instance is created; ignored on
                later calls because the existing instance is returned as is.
        """
        if cls._instance is None:
            cls._instance = cls(cache_dir)
        return cls._instance

    def _is_fresh(self) -> bool:
        """Return True if the data held in memory is younger than the TTL."""
        return (time.time() - self._last_updated) < CACHE_TTL_SECONDS

    def _is_cache_valid(self) -> bool:
        """
        Check whether the cache FILE exists and its timestamp is within TTL.

        This parses the whole cache file, so it is not used on the lookup
        path; see :meth:`_is_fresh` for the in-memory equivalent.
        """
        if not self._cache_file.exists():
            return False
        try:
            with open(self._cache_file, encoding="utf-8") as f:
                data = json.load(f)
            cached_time = data.get("timestamp", 0)
            return (time.time() - cached_time) < CACHE_TTL_SECONDS
        except (OSError, ValueError, TypeError, AttributeError):
            # Unreadable, undecodable, malformed, or unexpectedly shaped file.
            return False

    def _load_from_cache(self) -> bool:
        """
        Replace the in-memory data with the contents of the cache file.

        Returns:
            True if the file was read and had the expected top-level shape,
            False if it is missing, unreadable, or malformed. Freshness is
            NOT checked here.
        """
        if not self._cache_file.exists():
            return False
        try:
            with open(self._cache_file, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            logger.warning(f"Failed to load journal normalizer cache: {e}")
            return False
        if not isinstance(data, dict):
            logger.warning(
                "Failed to load journal normalizer cache: unexpected top-level type"
            )
            return False

        self._issn_l_data = data.get("issn_l_data") or {}
        self._name_to_issn_l = data.get("name_to_issn_l") or {}
        self._issn_to_issn_l = data.get("issn_to_issn_l") or {}
        self._abbrev_to_issn_l = data.get("abbrev_to_issn_l") or {}
        timestamp = data.get("timestamp", 0)
        # A non-numeric timestamp is treated as "infinitely old".
        self._last_updated = timestamp if isinstance(timestamp, (int, float)) else 0
        self._journal_count = len(self._issn_l_data)
        self._loaded = True
        logger.info(f"Loaded {self._journal_count} journals from normalizer cache")
        return True

    def _save_to_cache(self) -> None:
        """
        Write the current data to the cache file, stamped with the current time.

        The payload is written to a temporary sibling file and then moved into
        place, so a reader never sees a half-written cache. Failures are
        logged and otherwise ignored (the cache is best-effort).
        """
        payload = {
            "timestamp": time.time(),
            "journal_count": len(self._issn_l_data),
            "issn_l_data": self._issn_l_data,
            "name_to_issn_l": self._name_to_issn_l,
            "issn_to_issn_l": self._issn_to_issn_l,
            "abbrev_to_issn_l": self._abbrev_to_issn_l,
        }
        tmp_file = self._cache_file.with_name(self._cache_file.name + ".tmp")
        try:
            self._cache_dir.mkdir(parents=True, exist_ok=True)
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(payload, f)
            tmp_file.replace(self._cache_file)
        except OSError as e:
            logger.warning(f"Failed to save journal normalizer cache: {e}")
            return
        logger.info(f"Saved {len(self._issn_l_data)} journals to normalizer cache")

    def _add_journal(self, source_data: Dict[str, Any]) -> None:
        """
        Add one journal to the metadata table and the lookup indexes.

        Records without an ``issn_l`` are skipped. Fields that are present but
        null are treated like missing fields.

        Args:
            source_data: OpenAlex source object with display_name, issn_l,
                issn, abbreviated_title, alternate_titles, is_oa and
                host_organization_name.
        """
        issn_l = _normalize_issn(source_data.get("issn_l") or "")
        if not issn_l:
            return

        display_name = source_data.get("display_name") or ""
        abbreviated_title = source_data.get("abbreviated_title") or ""
        alternate_titles = source_data.get("alternate_titles") or []
        issns = [_normalize_issn(i) for i in (source_data.get("issn") or []) if i]

        # Store full metadata
        self._issn_l_data[issn_l] = {
            "canonical_name": display_name,
            "abbreviated_title": abbreviated_title,
            "alternate_titles": alternate_titles,
            "issns": issns,
            "is_oa": bool(source_data.get("is_oa")),
            "publisher": source_data.get("host_organization_name") or "",
        }

        # Build lookup indexes. Names that normalize to "" (punctuation only)
        # are never indexed: an empty key would match unrelated lookups.

        # 1. Canonical name (overrides any earlier entry for the same key)
        norm_name = _normalize_name(display_name)
        if norm_name:
            self._name_to_issn_l[norm_name] = issn_l

        # 2. Alternate titles (never override an existing entry)
        for alt in alternate_titles:
            norm_alt = _normalize_name(alt) if alt else ""
            if norm_alt and norm_alt not in self._name_to_issn_l:
                self._name_to_issn_l[norm_alt] = issn_l

        # 3. Abbreviated title (_normalize_name already strips the periods)
        norm_abbrev = _normalize_name(abbreviated_title)
        if norm_abbrev:
            self._abbrev_to_issn_l[norm_abbrev] = issn_l

        # 4. All ISSNs → ISSN-L, including the ISSN-L itself
        for issn in issns:
            self._issn_to_issn_l[issn] = issn_l
        self._issn_to_issn_l[issn_l] = issn_l

    async def _fetch_journals_async(
        self, max_pages: int = 500, filter_oa_only: bool = False
    ) -> None:
        """
        Fetch journal data from the OpenAlex API using cursor paging.

        Fetching stops at ``max_pages``, at the last page, or at the first
        non-200 response, timeout, or other error. Whatever was received up to
        that point is kept. If at least one record was received, the data is
        stamped as fresh and (when non-empty) written to the cache; otherwise
        the previous timestamp is left alone and a retry back-off is started.

        Args:
            max_pages: Maximum pages to fetch (200 records per page)
            filter_oa_only: If True, only fetch OA journals
        """
        per_page = 200
        cursor = "*"
        pages_fetched = 0
        records_received = 0

        # Select fields to minimize response size
        select_fields = "display_name,issn_l,issn,abbreviated_title,alternate_titles,is_oa,host_organization_name"
        # Always restrict to journals; the OA flag is an additional condition
        # (OpenAlex combines comma-separated filters with AND).
        filter_param = "type:journal,is_oa:true" if filter_oa_only else "type:journal"

        async with aiohttp.ClientSession() as session:
            while pages_fetched < max_pages:
                # Passed via ``params`` so the cursor, which is an opaque
                # token, is percent-encoded correctly.
                params = {
                    "filter": filter_param,
                    "per_page": str(per_page),
                    "cursor": cursor,
                    "mailto": OPENALEX_POLITE_EMAIL,
                    "select": select_fields,
                }
                try:
                    async with session.get(
                        OPENALEX_SOURCES_URL,
                        params=params,
                        timeout=aiohttp.ClientTimeout(total=30),
                    ) as resp:
                        if resp.status != 200:
                            logger.warning(f"OpenAlex API returned {resp.status}")
                            break
                        data = await resp.json()

                    results = data.get("results") or []
                    if not results:
                        break
                    for source in results:
                        self._add_journal(source)
                    records_received += len(results)
                    pages_fetched += 1

                    # Progress log
                    if pages_fetched % 20 == 0:
                        logger.info(
                            f"Fetched {pages_fetched} pages, {len(self._issn_l_data)} journals..."
                        )

                    # Get next cursor; a missing or repeated cursor means
                    # there is nothing more to page through.
                    next_cursor = (data.get("meta") or {}).get("next_cursor")
                    if not next_cursor or next_cursor == cursor:
                        break
                    cursor = next_cursor
                except asyncio.TimeoutError:
                    logger.warning("OpenAlex API timeout")
                    break
                except Exception as e:
                    # Best-effort boundary: keep what was fetched so far.
                    logger.error(f"Error fetching journals: {e}")
                    break

        self._journal_count = len(self._issn_l_data)
        self._loaded = True

        if records_received == 0:
            # Nothing came back: do not re-stamp old data as fresh, and do not
            # retry on every single lookup either.
            self._retry_after = time.time() + self._FETCH_RETRY_SECONDS
            logger.warning(
                "No journals fetched from OpenAlex; keeping existing data and retrying later"
            )
            return

        self._retry_after = 0.0
        self._last_updated = time.time()
        if self._journal_count > 0:
            self._save_to_cache()
        logger.info(f"Fetched {self._journal_count} journals from OpenAlex")

    def _fetch_journals_sync(
        self, max_pages: int = 500, filter_oa_only: bool = False
    ) -> None:
        """
        Synchronous wrapper around :meth:`_fetch_journals_async`.

        Without a running event loop the coroutine is run directly. Inside a
        running loop, ``asyncio.run`` cannot be nested, so the coroutine is
        run on a worker thread with its own loop while this call blocks.
        """
        import concurrent.futures

        # Only the loop probe is guarded; a RuntimeError raised by the fetch
        # itself must not be mistaken for "no running loop".
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(self._fetch_journals_async(max_pages, filter_oa_only))
            return

        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(
                asyncio.run, self._fetch_journals_async(max_pages, filter_oa_only)
            )
            # No timeout here: leaving the ``with`` block waits for the worker
            # anyway, so a timeout could not shorten the wait. It could only
            # raise after the fetch had already completed.
            future.result()

    def ensure_loaded(self, force_refresh: bool = False, max_pages: int = 500) -> None:
        """
        Ensure journal data is loaded, fetching from the API if needed.

        Order of preference: fresh data already in memory, then a fresh cache
        file, then a fetch from OpenAlex. Freshness is judged from the
        in-memory timestamp, so repeated lookups do not re-read the cache file.
        After a fetch that returned nothing, further fetches are skipped for
        ``_FETCH_RETRY_SECONDS`` unless ``force_refresh`` is set.

        Args:
            force_refresh: Force refresh even if the data is fresh
            max_pages: Max pages to fetch if refreshing
        """
        if not force_refresh:
            if self._loaded and (self._is_fresh() or time.time() < self._retry_after):
                return
            # Try the cache file (another process may have refreshed it).
            if self._load_from_cache() and self._is_fresh():
                return

        # Fetch from API
        logger.info("Refreshing journal normalizer cache from OpenAlex...")
        self._fetch_journals_sync(max_pages)

    # ==================== Public API ====================

    def get_issn_l(self, journal_name: str) -> Optional[str]:
        """
        Get ISSN-L for a journal name.

        Args:
            journal_name: Any journal name variant, abbreviation, or ISSN

        Returns:
            ISSN-L if found, None otherwise
        """
        if not journal_name:
            return None
        self.ensure_loaded()

        # Check if it's an ISSN; an unknown ISSN falls through to name lookup.
        if self._ISSN_PATTERN.match(journal_name.replace(" ", "")):
            by_issn = self._issn_to_issn_l.get(_normalize_issn(journal_name))
            if by_issn:
                return by_issn

        norm_name = _normalize_name(journal_name)
        if not norm_name:
            return None

        # Full names take precedence over abbreviations.
        return self._name_to_issn_l.get(norm_name) or self._abbrev_to_issn_l.get(
            norm_name
        )

    def normalize(self, journal_name: str) -> Optional[str]:
        """
        Normalize journal name to canonical form.

        Args:
            journal_name: Any journal name variant

        Returns:
            Canonical journal name, or the original ``journal_name`` if the
            journal is not found or has no canonical name on record
        """
        issn_l = self.get_issn_l(journal_name)
        if issn_l:
            canonical = self._issn_l_data.get(issn_l, {}).get("canonical_name")
            if canonical:
                return canonical
        return journal_name

    def get_abbreviation(self, journal_name: str) -> Optional[str]:
        """
        Get abbreviated title for a journal.

        Args:
            journal_name: Any journal name variant

        Returns:
            Abbreviated title, or None if the journal is unknown or has no
            abbreviation on record
        """
        issn_l = self.get_issn_l(journal_name)
        if issn_l and issn_l in self._issn_l_data:
            return self._issn_l_data[issn_l].get("abbreviated_title") or None
        return None

    def get_journal_info(self, journal_name: str) -> Optional[Dict[str, Any]]:
        """
        Get full journal metadata.

        Args:
            journal_name: Any journal name variant

        Returns:
            Dict with issn_l, canonical_name, abbreviated_title,
            alternate_titles, issns, is_oa, publisher; None if not found
        """
        issn_l = self.get_issn_l(journal_name)
        if issn_l and issn_l in self._issn_l_data:
            return {"issn_l": issn_l, **self._issn_l_data[issn_l]}
        return None

    def is_same_journal(self, name1: str, name2: str) -> bool:
        """
        Check if two names refer to the same journal.

        Args:
            name1: First journal name
            name2: Second journal name

        Returns:
            True if both names resolve to the same ISSN-L. If either name
            cannot be resolved, True only if both normalize to the same
            non-empty string.
        """
        issn_l_1 = self.get_issn_l(name1)
        issn_l_2 = self.get_issn_l(name2)
        if issn_l_1 and issn_l_2:
            return issn_l_1 == issn_l_2

        # Fallback: simple normalization comparison. Two empty keys are not
        # treated as a match - they identify no journal at all.
        norm_1 = _normalize_name(name1)
        return bool(norm_1) and norm_1 == _normalize_name(name2)

    def is_open_access(self, journal_name: str) -> bool:
        """
        Check if journal is Open Access.

        Args:
            journal_name: Any journal name variant

        Returns:
            True if the journal is known and flagged OA, False otherwise
        """
        issn_l = self.get_issn_l(journal_name)
        if issn_l and issn_l in self._issn_l_data:
            return bool(self._issn_l_data[issn_l].get("is_oa", False))
        return False

    def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Search for journals by name (substring match on normalized names).

        Canonical names and alternate titles are searched; each journal is
        returned at most once even if several of its names match.

        Args:
            query: Search query
            limit: Maximum results

        Returns:
            List of matching journal info dicts (empty for an empty query, a
            query consisting only of punctuation, or a non-positive limit)
        """
        if not query or limit <= 0:
            return []
        norm_query = _normalize_name(query)
        if not norm_query:
            # "" is a substring of every name; do not return arbitrary rows.
            return []
        self.ensure_loaded()

        seen: set = set()
        matches: List[Dict[str, Any]] = []
        for norm_name, issn_l in self._name_to_issn_l.items():
            if norm_query not in norm_name or issn_l in seen:
                continue
            info = self._issn_l_data.get(issn_l)
            if info is None:
                continue
            seen.add(issn_l)
            matches.append({"issn_l": issn_l, **info})
            if len(matches) >= limit:
                break
        return matches

    @property
    def journal_count(self) -> int:
        """Get number of cached journals (loads the data if necessary)."""
        self.ensure_loaded()
        return self._journal_count

    @property
    def cache_age_hours(self) -> float:
        """Get age of the loaded data in hours; infinity if never loaded."""
        if self._last_updated == 0:
            return float("inf")
        return (time.time() - self._last_updated) / 3600
# ==================== Convenience Functions ====================
def get_journal_normalizer(cache_dir: Optional[Path] = None) -> JournalNormalizer:
    """
    Return the shared JournalNormalizer instance.

    Args:
        cache_dir: Passed unchanged to ``JournalNormalizer.get_instance``.
    """
    shared = JournalNormalizer.get_instance(cache_dir)
    return shared
def normalize_journal_name(name: str) -> Optional[str]:
    """Return the canonical form of ``name`` as given by the shared normalizer's ``normalize``."""
    normalizer = get_journal_normalizer()
    return normalizer.normalize(name)
def get_journal_issn_l(name: str) -> Optional[str]:
    """Return the ISSN-L that the shared normalizer's ``get_issn_l`` finds for ``name``."""
    normalizer = get_journal_normalizer()
    return normalizer.get_issn_l(name)
def is_same_journal(name1: str, name2: str) -> bool:
    """Return the shared normalizer's verdict on whether ``name1`` and ``name2`` are the same journal."""
    normalizer = get_journal_normalizer()
    return normalizer.is_same_journal(name1, name2)
def refresh_journal_cache() -> None:
    """Ask the shared normalizer to reload its data with ``force_refresh=True``."""
    normalizer = get_journal_normalizer()
    normalizer.ensure_loaded(force_refresh=True)
# EOF