"""Define Betty's core application functionality."""
from __future__ import annotations
from concurrent.futures import Executor, ProcessPoolExecutor
from contextlib import asynccontextmanager
from multiprocessing import get_context
from os import environ
from pathlib import Path
from typing import TYPE_CHECKING, Self, Any, final
import aiohttp
from aiofiles.tempfile import TemporaryDirectory
from typing_extensions import override
from betty import fs
from betty.app import config
from betty.app.config import AppConfiguration
from betty.app.factory import AppDependentFactory
from betty.assets import AssetRepository
from betty.asyncio import wait_to_thread
from betty.cache.file import BinaryFileCache, PickledFileCache
from betty.cache.no_op import NoOpCache
from betty.config import Configurable, assert_configuration_file
from betty.core import CoreComponent
from betty.factory import new, DependentFactory
from betty.fetch import Fetcher, http
from betty.fs import HOME_DIRECTORY_PATH
from betty.locale import DEFAULT_LOCALE
from betty.locale.localizer import Localizer, LocalizerRepository
if TYPE_CHECKING:
from betty.cache import Cache
from collections.abc import AsyncIterator, Callable
[docs]
@final
class App(Configurable[AppConfiguration], DependentFactory[Any], CoreComponent):
"""
The Betty application.
"""
[docs]
@classmethod
@asynccontextmanager
async def new_from_environment(cls) -> AsyncIterator[Self]:
"""
Create a new application from the environment.
"""
configuration = AppConfiguration()
if config.CONFIGURATION_FILE_PATH.exists():
assert_configuration_file(configuration)(config.CONFIGURATION_FILE_PATH)
yield cls(
configuration,
Path(environ.get("BETTY_CACHE_DIRECTORY", HOME_DIRECTORY_PATH / "cache")),
cache_factory=lambda app: PickledFileCache[Any](app._cache_directory_path),
)
[docs]
@classmethod
@asynccontextmanager
async def new_temporary(cls) -> AsyncIterator[Self]:
"""
Creat a new, temporary, isolated application.
The application will not use any persistent caches, or leave
any traces on the system.
"""
async with (
TemporaryDirectory() as cache_directory_path_str,
):
yield cls(
AppConfiguration(),
Path(cache_directory_path_str),
cache_factory=lambda app: NoOpCache(),
)
@property
def assets(self) -> AssetRepository:
"""
The assets file system.
"""
if self._assets is None:
self._assert_bootstrapped()
self._assets = AssetRepository(fs.ASSETS_DIRECTORY_PATH)
return self._assets
@property
def localizer(self) -> Localizer:
"""
Get the application's localizer.
"""
if self._localizer is None:
self._assert_bootstrapped()
self._localizer = wait_to_thread(
self.localizers.get_negotiated(
self.configuration.locale or DEFAULT_LOCALE
)
)
return self._localizer
@property
def localizers(self) -> LocalizerRepository:
"""
The available localizers.
"""
if self._localizers is None:
self._assert_bootstrapped()
self._localizers = LocalizerRepository(self.assets)
return self._localizers
@property
def http_client(self) -> aiohttp.ClientSession:
"""
The HTTP client.
"""
if self._http_client is None:
self._assert_bootstrapped()
self._http_client = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit_per_host=5),
headers={
"User-Agent": "Betty (https://github.com/bartfeenstra/betty)",
},
)
wait_to_thread(
self._async_exit_stack.enter_async_context(self._http_client)
)
return self._http_client
@property
def fetcher(self) -> Fetcher:
"""
The fetcher.
"""
if self._fetcher is None:
self._assert_bootstrapped()
self._fetcher = http.HttpFetcher(
self.http_client,
self.cache.with_scope("fetch"),
self.binary_file_cache.with_scope("fetch"),
)
return self._fetcher
@property
def cache(self) -> Cache[Any]:
"""
The cache.
"""
if self._cache is None:
self._assert_bootstrapped()
self._cache = self._cache_factory(self)
return self._cache
@property
def binary_file_cache(self) -> BinaryFileCache:
"""
The binary file cache.
"""
if self._binary_file_cache is None:
self._assert_bootstrapped()
self._binary_file_cache = BinaryFileCache(self._cache_directory_path)
return self._binary_file_cache
@property
def process_pool(self) -> Executor:
"""
The shared process pool.
Use this to run CPU/computationally-heavy tasks in other processes.
"""
if self._process_pool is None:
self._assert_bootstrapped()
# Avoid `fork` so as not to start worker processes with unneeded resources.
# Settle for `spawn` so all environments use the same start method.
self._process_pool = ProcessPoolExecutor(mp_context=get_context("spawn"))
return self._process_pool