Source code for runtimepy.net.stream.json

"""
A module implementing a JSON message connection interface.
"""

# built-in
from argparse import Namespace
from json import JSONDecodeError, loads
from typing import Optional

# internal
from runtimepy.channel.environment.command import FieldOrChannel
from runtimepy.message.interface import JsonMessageInterface
from runtimepy.mixins.async_command import AsyncCommandProcessingMixin
from runtimepy.net.stream.string import StringMessageConnection


[docs] class JsonMessageConnection( StringMessageConnection, AsyncCommandProcessingMixin, JsonMessageInterface ): """A connection interface for JSON messaging."""
[docs] def init(self) -> None: """Initialize this instance.""" super().init() self._setup_async_commands() JsonMessageInterface.__init__(self)
[docs] def write(self, data: bytes, addr: tuple[str, int] = None) -> None: """Write data.""" self._send_message(data, addr=addr)
[docs] async def handle_command( self, args: Namespace, channel: Optional[FieldOrChannel] ) -> None: """Handle a remote command asynchronously.""" if args.remote and self.connected: cli_args = [args.command] if args.force: cli_args.append("-f") cli_args.append(args.channel) self.logger.info( "Remote command: %s", await self.channel_command( " ".join(cli_args + args.extra), environment=args.env ), )
[docs] async def async_init(self) -> bool: """A runtime initialization routine (executes during 'process').""" # Check loopback if it makes sense to. result = await super().async_init() # Only not-connected UDP connections can't do this. if self.connected: result = await self.loopback() if result: await self.wait_json({"meta": self.meta}) return result
[docs] async def process_message( self, data: str, addr: tuple[str, int] = None ) -> bool: """Process a string message.""" result = True try: decoded = loads(data) if decoded and isinstance(decoded, dict): result = await self.process_json(decoded, addr=addr) else: self.logger.error("Ignoring message '%s'.", data) except JSONDecodeError as exc: self.logger.exception("Couldn't decode '%s': %s", data, exc) return result