# Source code for runtimepy.net.tcp.http

"""
A module implementing a basic HTTP (multiple RFC's) connection interface.
"""

# built-in
import asyncio
from copy import copy
import http
from json import loads
from typing import Any, Awaitable, Callable, Optional, Tuple, Union, cast

# third-party
from vcorelib import DEFAULT_ENCODING
from vcorelib.io import BinaryMessage

# internal
from runtimepy import PKG_NAME, VERSION
from runtimepy.net.http import HttpMessageProcessor
from runtimepy.net.http.header import RequestHeader
from runtimepy.net.http.response import AsyncResponse, ResponseHeader
from runtimepy.net.tcp.connection import TcpConnection as _TcpConnection

# What a request handler may return as the message body: a binary payload, an
# asynchronous response object, or None when there is no body.
HttpResult = Optional[BinaryMessage | AsyncResponse]

#
# Request handlers are coroutines matching the signature below. They receive
# the response header (which they may modify), the request header and any
# request payload, and return the response body.
#
# async def handler(
#     response: ResponseHeader,
#     request: RequestHeader,
#     request_data: Optional[bytearray],
# ) -> HttpResult:
#     """Sample handler."""
#
HttpRequestHandler = Callable[
    [ResponseHeader, RequestHeader, Optional[bytearray]],
    Awaitable[HttpResult],
]

# A complete response: the parsed header paired with the (optional) body.
HttpResponse = Tuple[ResponseHeader, HttpResult]

# Mapping of HTTP method to the handler that services requests of that kind.
HttpRequestHandlers = dict[http.HTTPMethod, HttpRequestHandler]


def to_json(response: HttpResponse) -> Any:
    """
    Decode the body of an HTTP response as JSON.

    The response's "content-type" header must begin with "application/json",
    otherwise an AssertionError carrying the offending content type is raised.
    The body is decoded as text using DEFAULT_ENCODING before being parsed.
    """

    # Refuse to parse anything that isn't advertised as JSON.
    content_type = response[0]["content-type"]
    assert content_type.startswith("application/json"), content_type

    body = response[1]
    text = body.decode(encoding=DEFAULT_ENCODING)  # type: ignore
    return loads(text)
# Identity string in "<package name>/<version>" form.
IDENTITY = f"{PKG_NAME}/{VERSION}"
class HttpConnection(_TcpConnection):
    """
    A class implementing a basic HTTP interface.

    An instance can act as a server (incoming requests are dispatched to the
    handler registered for the request's method) and as a client (see
    'request' and 'request_json'). The 'expecting_response' flag selects how
    incoming data is interpreted: as responses while a request made by this
    instance is outstanding, as requests otherwise.
    """

    # Sent as the "user-agent" header on outgoing requests.
    identity = IDENTITY

    # True only while 'request' is waiting for the peer's response.
    expecting_response: bool

    log_alias = "HTTP"

    # Handlers registered at the class level so that instances created at
    # runtime don't need additional initialization.
    handlers: HttpRequestHandlers = {}

    # Default response headers, applied to any response in which a handler
    # did not set the key itself.
    headers: dict[str, str] = {
        "Server": PKG_NAME,
        "X-Content-Type-Options": "nosniff",
        "Cache-Control": "public",
    }

    def init(self) -> None:
        """Initialize this instance."""

        # Incoming request handling.
        self.processor = HttpMessageProcessor()

        # Outgoing request handling. The semaphore's default value of one
        # means only a single request can be in flight at a time, and the
        # queue holds at most the one response that request is waiting for.
        self.request_ready = asyncio.BoundedSemaphore()
        self.expecting_response = False
        self.responses: asyncio.Queue[HttpResponse] = asyncio.Queue(maxsize=1)

        # Copy the class-level mapping so that registering the bound methods
        # below doesn't modify the mapping shared by other instances.
        self.handlers = copy(self.handlers)
        self.handlers[http.HTTPMethod.GET] = self.get_handler
        self.handlers[http.HTTPMethod.POST] = self.post_handler

    @classmethod
    def get_log_prefix(cls, is_ssl: bool = False) -> str:
        """
        Get a logging prefix for this instance: "https://" when 'is_ssl' is
        set, otherwise "http://".
        """
        return f"http{'s' if is_ssl else ''}://"

    async def get_handler(
        self,
        response: ResponseHeader,
        request: RequestHeader,
        request_data: Optional[bytearray],
    ) -> HttpResult:
        """
        Sample GET handler. Leaves the response header untouched and produces
        no response body (returns None).
        """

    async def post_handler(
        self,
        response: ResponseHeader,
        request: RequestHeader,
        request_data: Optional[bytearray],
    ) -> HttpResult:
        """
        Sample POST handler. Leaves the response header untouched and produces
        no response body (returns None).
        """

    async def _process_request(
        self,
        response: ResponseHeader,
        request_header: RequestHeader,
        request_data: Optional[bytearray] = None,
    ) -> HttpResult:
        """
        Process an individual request.

        Dispatches to the handler registered for the request's method and
        returns whatever body that handler produced. When no handler is
        registered the response is marked NOT_IMPLEMENTED and no body is
        returned. In both cases the class-level default headers are filled
        in for any key the response doesn't already have.
        """

        result = None

        # Handle request.
        if request_header.method in self.handlers:
            result = await self.handlers[request_header.method](
                response, request_header, request_data
            )

        # Set error code in response.
        else:
            response.status = http.HTTPStatus.NOT_IMPLEMENTED
            response.reason = (
                f"No handler for {request_header.method} requests."
            )

        # Apply defaults without overriding anything a handler set.
        for key, value in type(self).headers.items():
            if response.get(key) is None:
                response[key] = value

        return result

    async def request(
        self, request: RequestHeader, data: Optional[BinaryMessage] = None
    ) -> HttpResponse:
        """
        Make an HTTP request and wait for the response.

        Sets the "user-agent" header on 'request', sends it (with 'data' as
        the body, if provided) and returns the response header paired with
        the response payload. Requests are serialized: a second caller waits
        until the first has received its response.
        """

        async with self.request_ready:
            # Set boilerplate header data.
            request["user-agent"] = self.identity

            try:
                await self._send(request, data)
                self.expecting_response = True
                result = await self.responses.get()
            finally:
                # Always leave response mode, even if sending failed or the
                # wait was cancelled (e.g. by a caller's timeout). Otherwise
                # every later incoming frame would be parsed as a response
                # instead of a request.
                self.expecting_response = False

        return result

    async def request_json(
        self, request: RequestHeader, data: Optional[BinaryMessage] = None
    ) -> Any:
        """
        Perform a request and convert the response to a data structure by
        decoding it as JSON.
        """
        return to_json(await self.request(request, data))

    async def _send(
        self,
        header: Union[ResponseHeader, RequestHeader],
        data: HttpResult = None,
    ) -> None:
        """
        Send a request or response to a request.

        When a body is provided and its size is known, "content-length" is
        set on 'header' before the header is written. The body follows the
        header: asynchronous responses are sent chunk by chunk, anything else
        in a single write. The header is logged once everything is sent.
        """

        # Determine the body size (if any) so it can be advertised.
        size = None
        if isinstance(data, AsyncResponse):
            size = await data.size()
        elif data is not None:
            size = len(data)

        if size is not None:
            header["content-length"] = str(size)

        self.send_binary(bytes(header))

        if data is not None:
            if isinstance(data, AsyncResponse):
                async for chunk in data.process():
                    self.send_binary(chunk)
            else:
                self.send_binary(data)

        header.log(self.logger, True)

    async def process_binary(self, data: BinaryMessage) -> bool:
        """
        Process a binary frame.

        Feeds 'data' to the message processor and handles every complete
        message it yields: while no request made by this instance is
        outstanding, messages are treated as requests and answered; otherwise
        they are queued as the response to the pending request. Always
        returns True.
        """

        for header, payload in self.processor.ingest(
            data,
            RequestHeader if not self.expecting_response else ResponseHeader,
        ):
            header.log(self.logger, False)

            if not self.expecting_response:
                # Process request.
                response = ResponseHeader()
                await self._send(
                    response,
                    await self._process_request(
                        response, cast(RequestHeader, header), payload
                    ),
                )

            # Process the response to a pending request.
            else:
                await self.responses.put(
                    (cast(ResponseHeader, header), payload)
                )

        return True