"""
A module implementing a basic HTTP (multiple RFCs) connection interface.
"""
# built-in
import asyncio
from copy import copy
import http
from json import loads
from typing import Any, Awaitable, Callable, Optional, Tuple, Union, cast
# third-party
from vcorelib import DEFAULT_ENCODING
from vcorelib.io import BinaryMessage
# internal
from runtimepy import PKG_NAME, VERSION
from runtimepy.net.http import HttpMessageProcessor
from runtimepy.net.http.header import RequestHeader
from runtimepy.net.http.response import AsyncResponse, ResponseHeader
from runtimepy.net.tcp.connection import TcpConnection as _TcpConnection
# What a request handler may produce as a body: nothing, a complete binary
# message, or an asynchronous response.
HttpResult = Optional[BinaryMessage | AsyncResponse]

# The signature every request handler must have:
#
# async def handler(
#     response: ResponseHeader,
#     request: RequestHeader,
#     request_data: Optional[bytearray],
# ) -> HttpResult:
#     """Sample handler."""
#
HttpRequestHandler = Callable[
    [ResponseHeader, RequestHeader, Optional[bytearray]],
    Awaitable[HttpResult],
]

# A received response: its header paired with the (optional) body.
HttpResponse = Tuple[ResponseHeader, HttpResult]

# Request handlers keyed by the HTTP method that they service.
HttpRequestHandlers = dict[http.HTTPMethod, HttpRequestHandler]
def to_json(response: HttpResponse) -> Any:
    """
    Decode the body of an HTTP response as JSON.

    The header's content type must begin with "application/json"; if it
    doesn't, an assertion fails with the offending content type as its
    message.
    """

    content_type = response[0]["content-type"]
    assert content_type.startswith("application/json"), content_type

    # The body is only looked at once the content type has been verified.
    text = response[1].decode(encoding=DEFAULT_ENCODING)  # type: ignore
    return loads(text)
# Package name and version joined by a slash; HttpConnection uses this as its
# default identity, which is sent as the 'user-agent' of outgoing requests.
IDENTITY = f"{PKG_NAME}/{VERSION}"
class HttpConnection(_TcpConnection):
    """A TCP connection that can both serve and issue basic HTTP requests."""

    # Value sent as the 'user-agent' header of outgoing requests.
    identity = IDENTITY

    # True while an outgoing request is waiting for its response.
    expecting_response: bool

    log_alias = "HTTP"

    # Request handlers are declared on the class so that instances created at
    # runtime need no extra set-up.
    handlers: HttpRequestHandlers = {}

    # Headers added to a response whenever they were otherwise left unset.
    headers: dict[str, str] = {
        "Server": PKG_NAME,
        "X-Content-Type-Options": "nosniff",
        "Cache-Control": "public",
    }

    def init(self) -> None:
        """Set up the per-instance state for both directions of traffic."""

        # State for parsing incoming data.
        self.processor = HttpMessageProcessor()

        # State for requests made from this end of the connection.
        self.request_ready = asyncio.BoundedSemaphore()
        self.expecting_response = False
        self.responses: asyncio.Queue[HttpResponse] = asyncio.Queue(maxsize=1)

        # Work on a shallow copy so that registering this instance's bound
        # methods doesn't modify the mapping declared on the class.
        handlers = copy(self.handlers)
        handlers[http.HTTPMethod.GET] = self.get_handler
        handlers[http.HTTPMethod.POST] = self.post_handler
        self.handlers = handlers

    @classmethod
    def get_log_prefix(cls, is_ssl: bool = False) -> str:
        """Get the URL scheme prefix (secure or not) used when logging."""

        return f"http{'s' if is_ssl else ''}://"

    async def get_handler(
        self,
        response: ResponseHeader,
        request: RequestHeader,
        request_data: Optional[bytearray],
    ) -> HttpResult:
        """Placeholder GET handler that produces no response body."""

        return None

    async def post_handler(
        self,
        response: ResponseHeader,
        request: RequestHeader,
        request_data: Optional[bytearray],
    ) -> HttpResult:
        """Placeholder POST handler that produces no response body."""

        return None

    async def _process_request(
        self,
        response: ResponseHeader,
        request_header: RequestHeader,
        request_data: Optional[bytearray] = None,
    ) -> HttpResult:
        """
        Dispatch a request to the handler registered for its method.

        Returns whatever the handler returns, or None when no handler is
        registered (in which case the response is marked not-implemented).
        """

        result: HttpResult = None

        if request_header.method not in self.handlers:
            # Nothing registered for this method: report it in the response.
            response.status = http.HTTPStatus.NOT_IMPLEMENTED
            response.reason = (
                f"No handler for {request_header.method} requests."
            )
        else:
            handler = self.handlers[request_header.method]
            result = await handler(response, request_header, request_data)

        # Fill in the class-level default headers that are still unset.
        for name, default in type(self).headers.items():
            if response.get(name) is not None:
                continue
            response[name] = default

        return result

    async def request(
        self, request: RequestHeader, data: Optional[BinaryMessage] = None
    ) -> HttpResponse:
        """
        Send an HTTP request and wait for the response to it.

        The semaphore is held from sending until the response has been
        taken from the queue.
        """

        async with self.request_ready:
            # Boilerplate header data.
            request["user-agent"] = self.identity

            await self._send(request, data)

            # Incoming data is treated as a response until one is received.
            self.expecting_response = True
            reply = await self.responses.get()
            self.expecting_response = False

            return reply

    async def request_json(
        self, request: RequestHeader, data: Optional[BinaryMessage] = None
    ) -> Any:
        """Make a request and decode the body of the response as JSON."""

        response = await self.request(request, data)
        return to_json(response)

    async def _send(
        self,
        header: Union[ResponseHeader, RequestHeader],
        data: HttpResult = None,
    ) -> None:
        """
        Send a header (request or response) followed by any body data.

        When the size of the body is known, it's written to the header's
        'content-length' field before the header is sent.
        """

        # Determine the body size, if there's a body at all.
        if data is None:
            length = None
        elif isinstance(data, AsyncResponse):
            length = await data.size()
        else:
            length = len(data)

        if length is not None:
            header["content-length"] = str(length)

        self.send_binary(bytes(header))

        # Asynchronous responses are sent chunk by chunk, anything else is
        # sent in one piece.
        if isinstance(data, AsyncResponse):
            async for chunk in data.process():
                self.send_binary(chunk)
        elif data is not None:
            self.send_binary(data)

        header.log(self.logger, True)

    async def process_binary(self, data: BinaryMessage) -> bool:
        """
        Handle incoming data.

        Each message parsed from the data is either queued as the response
        to a pending request, or serviced as a request and answered. Always
        returns True.
        """

        for header, payload in self.processor.ingest(
            data,
            ResponseHeader if self.expecting_response else RequestHeader,
        ):
            header.log(self.logger, False)

            # Hand the response over to the pending request.
            if self.expecting_response:
                await self.responses.put(
                    (cast(ResponseHeader, header), payload)
                )
                continue

            # Otherwise service the request and send the response back.
            response = ResponseHeader()
            result = await self._process_request(
                response, cast(RequestHeader, header), payload
            )
            await self._send(response, result)

        return True