"""
A module implementing a server interface for this package.
"""
# built-in
import http
from io import StringIO
from json import loads
import logging
import mimetypes
from pathlib import Path
from typing import Any, Awaitable, Callable, Optional, TextIO, Union
from urllib.parse import urlencode
# third-party
from vcorelib import DEFAULT_ENCODING
from vcorelib.io import IndentedFileWriter, JsonObject
from vcorelib.io.bus import BUS
from vcorelib.paths import Pathlike, find_file, normalize
# internal
from runtimepy import DEFAULT_EXT, PKG_NAME
from runtimepy.channel.environment.command import GLOBAL, global_command
from runtimepy.net.html import full_markdown_page
from runtimepy.net.http.header import RequestHeader
from runtimepy.net.http.request_target import PathMaybeQuery
from runtimepy.net.http.response import AsyncFile, ResponseHeader
from runtimepy.net.server.html import HtmlApp, HtmlApps, get_html, html_handler
from runtimepy.net.server.json import encode_json, json_handler
from runtimepy.net.server.markdown import DIR_FILE, markdown_for_dir
from runtimepy.net.server.mux import mux_app
from runtimepy.net.tcp.http import HttpConnection, HttpResult
from runtimepy.util import normalize_root, path_has_part, read_binary
# Module-level flag recording whether mimetypes.init() has already been called
# by a connection's init() method, so that later connections skip the call.
MIMETYPES_INIT = False
[docs]
def package_data_dir() -> Path:
    """Return the directory that contains this package's data files."""

    # The factories file is used as a landmark: the directory it is found in
    # is the data directory.
    landmark = find_file(f"factories.{DEFAULT_EXT}", package=PKG_NAME)
    assert landmark is not None
    return landmark.parent
# Signature of a custom request handler: it is called with the response
# header, the request header and the (optional) request data, and is awaited
# to obtain the HTTP result.
CustomAsync = Callable[
[ResponseHeader, RequestHeader, Optional[bytearray]], Awaitable[HttpResult]
]
[docs]
class RuntimepyServerConnection(HttpConnection):
"""A class implementing a server-connection interface for this package."""
# NOTE: attributes assigned a value here are accessed through the class
# (e.g. 'type(self).apps', 'cls.class_redirect_paths') elsewhere in this file,
# so changes to them are visible to every connection instance.
# Can register application methods to URL paths.
apps: HtmlApps = {"/mux.html": mux_app}
# Request paths mapped to custom coroutine handlers. The GET handler checks
# this mapping (exact path match) before trying redirects or files.
custom: dict[str, CustomAsync] = {}
# Passed to the HTML handler as 'default_app' when serving applications.
default_app: Optional[HtmlApp] = None
# Can load additional data into this dictionary for easy HTTP access.
json_data: JsonObject = {"test": {"a": 1, "b": 2, "c": 3}}
# Contents of the package's 'favicon.ico'. Assigned on the class the first
# time a connection is initialized.
favicon_data: bytes
# Search paths for this instance, populated from 'class_paths' during init().
paths: list[Path]
# Paths added to every instance's search paths during init().
class_paths: list[Pathlike] = [Path(), package_data_dir()]
# Request paths mapped to the destinations they should redirect to.
class_redirect_paths: dict[Path, Union[str, Path]] = {}
# Set these to control meta attributes.
metadata: dict[str, Optional[str]] = {
"title": HttpConnection.identity,
"description": f"({HttpConnection.identity})",
}
[docs]
def add_path(self, path: Pathlike, front: bool = False) -> None:
    """
    Register a file-system search path for this connection.

    The path is normalized and resolved before being stored. It goes to the
    beginning of the search list when 'front' is set and to the end
    otherwise. The current search paths are logged afterwards.
    """

    location = normalize(path).resolve()

    # Inserting at the list's length is equivalent to appending.
    position = 0 if front else len(self.paths)
    self.paths.insert(position, location)

    self.log_paths()
[docs]
@classmethod
def add_redirect_path(
    cls, dest: Union[str, Path], *src_parts: Union[str, Path]
) -> None:
    """
    Register a redirect from a source path to a destination.

    The source is built from 'src_parts' and must not already have a
    redirect registered (enforced with an assertion).
    """

    redirects = cls.class_redirect_paths
    source = normalize_root(*src_parts)

    # Each source may only be registered once.
    assert source not in redirects, (source, redirects)

    redirects[source] = dest
[docs]
def log_paths(self) -> None:
    """Write the current search paths to the debug log."""

    listing = ", ".join(map(str, self.paths))
    self.logger.debug("New path: %s.", listing)
[docs]
def init(self) -> None:
    """
    Initialize this instance.

    Initializes the mimetypes module (first call only), runs the parent
    class's initialization, builds this instance's search paths from the
    class-level defaults and loads the favicon onto the class if it has not
    been loaded yet.
    """

    global MIMETYPES_INIT  # pylint: disable=global-statement

    # One-time setup of the MIME type database.
    if not MIMETYPES_INIT:
        mimetypes.init()
        MIMETYPES_INIT = True

    super().init()

    kind = type(self)

    # Every instance starts from the class-level search paths.
    self.paths = []
    for entry in kind.class_paths:
        self.add_path(entry)

    # The favicon is stored on the class, so only the first instance reads it.
    if not hasattr(kind, "favicon_data"):
        with self.log_time("Loading favicon"):
            icon = find_file("favicon.ico", package=PKG_NAME)
            assert icon is not None
            kind.favicon_data = icon.read_bytes()
[docs]
def redirect_to(
    self,
    path: str,
    response: ResponseHeader,
    status: http.HTTPStatus = http.HTTPStatus.TEMPORARY_REDIRECT,
) -> bytes:
    """
    Turn a response into a redirect.

    Sets the response's 'Location' header to 'path' and its status to
    'status', then returns an empty body.
    """

    response["Location"] = path
    response.status = status

    # Redirect responses carry no body.
    return b""
[docs]
async def try_redirect(
    self, path: PathMaybeQuery, response: ResponseHeader
) -> Optional[bytes]:
    """
    Apply a registered redirect rule to a request, if one matches.

    Returns the (empty) redirect body when the request path has a registered
    redirect, or None when it does not.
    """

    requested = Path(path[0])
    redirects = self.class_redirect_paths

    # No rule for this path: let other handlers try.
    if requested not in redirects:
        return None

    return self.redirect_to(str(redirects[requested]), response)
[docs]
def render_markdown(
    self,
    content: str,
    response: ResponseHeader,
    query: Optional[str],
    **kwargs,
) -> bytes:
    """
    Return rendered markdown content.

    Builds an HTML document from the class-level 'metadata' (with a note
    appended to the description saying the page was rendered from Markdown),
    writes 'content' through the Markdown writer and renders the page. The
    response's 'Content-Type' header is set to HTML with the default
    encoding, and the returned bytes use that same encoding.

    'query' is forwarded to the page builder as 'uri_query'. Additional
    keyword arguments are forwarded to the writer's 'write_markdown' call.
    """

    # Work on a copy so the class-level metadata is never modified.
    meta: dict[str, Any] = dict(type(self).metadata)

    note = (
        "This page was rendered from "
        f"Markdown by {HttpConnection.identity}."
    )

    # The description may legitimately be None (or missing), so don't assume
    # it can be concatenated with a string.
    description = meta.get("description")
    meta["description"] = f"{description} {note}" if description else note

    document = get_html(**meta)

    with IndentedFileWriter.string() as writer:
        writer.write_markdown(content, **kwargs)
        # The writer's stream must be read while the writer is still open.
        full_markdown_page(
            document,
            writer.stream.getvalue(),  # type: ignore
            uri_query=query,
        )

    response["Content-Type"] = f"text/html; charset={DEFAULT_ENCODING}"

    with StringIO() as stream:
        document.render(stream)
        # Encode with the same charset that was just advertised.
        return stream.getvalue().encode(DEFAULT_ENCODING)
[docs]
async def render_markdown_file(
    self,
    path: Path,
    response: ResponseHeader,
    query: Optional[str],
    **kwargs,
) -> bytes:
    """
    Read a Markdown file and return it rendered as an HTML page.

    The file's bytes are read asynchronously, decoded to text and handed to
    'render_markdown' together with the remaining arguments.
    """

    raw = await read_binary(path)
    return self.render_markdown(raw.decode(), response, query, **kwargs)
[docs]
async def try_file(
    self, path: PathMaybeQuery, response: ResponseHeader
) -> HttpResult:
    """
    Try serving this path as a file directly from the file-system.

    The request path (without its first character) is looked up under every
    search path. For each candidate, a Markdown file with the same stem is
    preferred and rendered to HTML; otherwise an existing file is served
    as-is with a guessed MIME type. Directories are served through their
    'index.html', and as a last resort through a generated Markdown listing.

    Request paths that contain '..' components, or that are still rooted
    after the first character is dropped, are refused (None is returned) so
    that a request can't reach files outside of the search paths.

    Returns None when nothing could be served.
    """

    result: HttpResult = None

    relative = Path(path[0][1:])

    # Joining a rooted path replaces the search path entirely and '..' climbs
    # out of it. Neither is legitimate for a request target.
    if relative.anchor or ".." in relative.parts:
        self.logger.warning("Refusing to serve unsafe path '%s'.", path[0])
        return result

    # Keep track of directories encountered.
    directories: list[tuple[Path, Path]] = []

    # Build a list of all candidate files to check.
    candidates: list[Path] = []
    for search in self.paths:
        candidate = search.joinpath(relative)

        # A request for the directory file is a request for the directory.
        if candidate.name == DIR_FILE:
            candidate = candidate.parent

        if candidate.is_dir():
            directories.append((candidate, search))
            candidates.append(candidate.joinpath("index.html"))
        else:
            candidates.append(candidate)

    for candidate in candidates:
        # Handle markdown sources.
        if candidate.name:
            md_candidate = candidate.with_suffix(".md")
            if md_candidate.is_file():
                return await self.render_markdown_file(
                    md_candidate, response, path[1]
                )

        # Handle files.
        if candidate.is_file():
            mime, encoding = mimetypes.guess_type(candidate, strict=False)

            # Set MIME type if it can be determined.
            if mime:
                # webhint suggestion
                if (
                    mime.startswith("text")
                    and DEFAULT_ENCODING not in mime
                ):
                    mime += f"; charset={DEFAULT_ENCODING}"
                response["Content-Type"] = mime

            # We don't handle this yet.
            assert not encoding, (candidate, mime, encoding)

            self.logger.info("Serving '%s' (MIME: %s)", candidate, mime)
            response.static_resource()

            # Return the file data.
            result = AsyncFile(candidate)
            break

    # Handle a directory as a last resort.
    if not result and directories:
        result = self.render_markdown(
            markdown_for_dir(
                directories, {"applications": self.apps.keys()}
            ),
            response,
            path[1],
        )

    return result
[docs]
def handle_command(
    self, stream: TextIO, response: ResponseHeader, args: tuple[str, ...]
) -> None:
    """
    Run a command described by 'args' and write a JSON reply to 'stream'.

    The first argument selects the environment and the remaining arguments
    are joined with spaces to form the command. When there are no arguments,
    or the command call yields no result, usage information and the list of
    known environments are included in the reply instead.
    """

    payload: dict[str, Any] = {
        "your_command": args,
        "success": False,
        "message": "No command executed.",
    }

    # A command can only be attempted when at least one argument was given.
    outcome = global_command(args[0], " ".join(args[1:])) if args else None

    if outcome is None:
        # Nothing was executed: describe the expected request format.
        payload["usage"] = "/<environment (arg0)>/<arg1>[/.../<argN>]"
        payload["environments"] = list(GLOBAL)
    else:
        payload["success"] = outcome.success
        payload["message"] = str(outcome)

    # Send response.
    encode_json(stream, response, payload)
[docs]
async def post_handler(
self,
response: ResponseHeader,
request: RequestHeader,
request_data: Optional[bytearray],
) -> HttpResult:
"""
Handle POST requests.

The request is logged at INFO level. For origin-form targets, request data
(when present) is decoded with the default encoding and parsed as JSON; a
parsed object containing a "key" entry is sent on BUS, and requests are
otherwise handled as commands built from the path components. Whatever was
written to the in-memory text stream is encoded and returned.
"""
request.log(self.logger, False, level=logging.INFO)
result = None
with StringIO() as stream:
if request.target.origin_form:
# Path components after the first one (presumably the root "/").
parts = Path(request.target.path).parts[1:]
if request_data:
# Raises if the request data is not valid JSON.
request_data_obj = loads(
request_data.decode(encoding=DEFAULT_ENCODING)
)
response_data: dict[str, Any] = {}
if "key" in request_data_obj:
# A leading "ro" path component selects the read-only send, whose
# return value is reported as "count".
if parts and parts[0] == "ro":
response_data["count"] = await BUS.send_ro(
request_data_obj["key"],
request_data_obj.get("data", {}),
request_data_obj.get("null_ok", False),
)
# Otherwise the result of the regular send is merged into the response.
else:
response_data.update(
await BUS.send(
request_data_obj["key"],
request_data_obj.get("data", {}),
send_ro=request_data_obj.get(
"send_ro", True
),
null_ok=request_data_obj.get(
"null_ok", False
),
)
)
# NOTE(review): 'response_data' is created before the "key" check, so this
# call presumably runs for any request that carried data - confirm nesting.
encode_json(stream, response, response_data)
# Treat request as a command.
# NOTE(review): this 'else' presumably pairs with 'if request_data' (no
# request data -> command); the nesting is not visible here - confirm.
else:
self.handle_command(stream, response, parts)
# Must be read before the stream is closed by leaving the 'with' block.
result = stream.getvalue().encode()
return result
[docs]
async def get_handler(
    self,
    response: ResponseHeader,
    request: RequestHeader,
    request_data: Optional[bytearray],
) -> HttpResult:
    """
    Handle GET requests.

    For origin-form targets the following are tried in order, and the first
    one that produces a result wins: the favicon, a custom handler registered
    for the exact path, redirect rules, files on the search paths, raw JSON
    data queries and finally the HTML application handler. When nothing
    produces content, the client is redirected to the 404 page with the
    original target passed along as a query parameter.
    """

    request.log(self.logger, False, level=logging.INFO)

    kind = type(self)
    target = request.target
    body: HttpResult = None

    with StringIO() as stream:
        wrote_content = False

        if target.origin_form:
            uri_path = target.path

            # Handle favicon (for browser clients).
            if uri_path.startswith("/favicon"):
                response["Content-Type"] = "image/x-icon"
                response.static_resource()
                return self.favicon_data

            # Check for a custom handler.
            if uri_path in self.custom:
                custom_handler = self.custom[uri_path]
                return await custom_handler(response, request, request_data)

            # Try serving a file and handling redirects.
            for attempt in (self.try_redirect, self.try_file):
                served = await attempt(target.origin_form, response)
                if served is not None:
                    return served

            if path_has_part(target.path):
                # Handle raw data queries.
                json_handler(
                    stream,
                    request,
                    response,
                    request_data,
                    self.json_data,
                )
                wrote_content = True
            else:
                # Serve the application.
                wrote_content = await html_handler(
                    kind.apps,
                    stream,
                    request,
                    response,
                    request_data,
                    default_app=kind.default_app,
                    **kind.metadata,
                )

        # The stream has to be read before it gets closed.
        if wrote_content:
            body = stream.getvalue().encode()

    if body:
        return body

    # Nothing could be served: send the client to the 404 page.
    return self.redirect_to(
        f"/404.html?{urlencode({'target': target.raw})}",
        response,
    )