Source code for runtimepy.message.handlers

"""
A module defining some useful JSON message handlers.
"""

# built-in
import asyncio

# third-party
from vcorelib.dict import MergeStrategy
from vcorelib.dict.codec import BasicDictCodec
from vcorelib.io import ARBITER
from vcorelib.paths import find_file

# internal
from runtimepy.channel.environment.command.processor import (
    ChannelCommandProcessor,
)
from runtimepy.message import JsonMessage
from runtimepy.message.types import TypedHandler
from runtimepy.schemas import RuntimepyDictCodec


[docs] class ChannelCommand(RuntimepyDictCodec, BasicDictCodec): """A schema-validated find-file request.""" data: JsonMessage
[docs] def channel_env_handler( envs: dict[str, ChannelCommandProcessor], default: ChannelCommandProcessor ) -> TypedHandler[ChannelCommand]: """Create a channel-environment map command handler.""" async def handler(outbox: JsonMessage, request: ChannelCommand) -> None: """Handle a JSON command for channel environments.""" if request.data["is_request"]: outbox["is_request"] = False env_name = request.data["environment"] env = envs.get(env_name) if env_name == "default": env = default # Run the command if we have the environment. if env is not None: result = env.command(request.data["command"]) outbox["success"] = result.success outbox["reason"] = result.reason # Respond with an error (environment not found). else: outbox["success"] = False outbox["reason"] = ( f"No environment '{env_name}', options: {envs.keys()}." ) return handler
[docs] async def loopback_handler(outbox: JsonMessage, inbox: JsonMessage) -> None: """A simple loopback handler.""" outbox.update(inbox)
[docs] async def event_wait(event: asyncio.Event, timeout: float) -> bool: """Wait for an event to be set within a timeout.""" result = True try: await asyncio.wait_for(event.wait(), timeout) except asyncio.TimeoutError: result = False return result
[docs] class FindFile(RuntimepyDictCodec, BasicDictCodec): """A schema-validated find-file request.""" data: JsonMessage
[docs] async def find_file_request_handler( outbox: JsonMessage, request: FindFile ) -> None: """Attempt to find a file path based on the request.""" if request.data["is_request"]: path = find_file( request.data["path"], *request.data.get("parts", []), search_paths=request.data.get("search_paths"), include_cwd=request.data["include_cwd"], relative_to=request.data.get("relative_to"), package=request.data.get("package"), package_subdir=request.data["package_subdir"], ) outbox["path"] = str(path) if path is not None else None outbox["is_request"] = False # Attempt to decode the file if requested. if request.data["decode"]: decoded = {"success": False, "data": {}, "time_ns": -1} if path is not None: result = ARBITER.decode( path, includes_key=request.data["includes_key"], expect_overwrite=request.data["expect_overwrite"], strategy=( MergeStrategy.UPDATE if request.data["update"] else MergeStrategy.RECURSIVE ), ) decoded["success"] = result.success decoded["data"] = result.data decoded["time_ns"] = result.time_ns outbox["decoded"] = decoded