Source code for runtimepy.net.tcp.telnet.np_05b

"""
A module implementing a Telnet interface for the NP-05B.
"""

# built-in
from argparse import Namespace
import asyncio
from enum import Enum
from typing import List, Optional, Union

# third-party
from vcorelib.python import StrToBool

# internal
from runtimepy.channel.environment.command import FieldOrChannel
from runtimepy.channel.environment.command.parser import ChannelCommand
from runtimepy.mixins.async_command import AsyncCommandProcessingMixin
from runtimepy.net.tcp.telnet import BasicTelnet


[docs] class Np05bStrings(Enum): """Command strings for the NP-05B.""" SET_OUTLET = "$A3" # Requires polling to remain synchronized with device, 'reboot' is actually # a double toggle with a constant time in between (not that useful). # REBOOT = "$A4" OUTLET_STATUS = "$A5" ALL_OUTLETS = "$A7" OK = "$A0" FAIL = "$AF"
[docs] @staticmethod def is_command(data: str) -> bool: """Determine if some string data is a command string.""" return data.startswith("$A")
[docs] class Np05bConnection(BasicTelnet, AsyncCommandProcessingMixin): """ A class implementing a communication interface with the NP-05B networked PDU. """ # Connection settings. default_auto_restart = True # Defaults. num_ports = 5 prompt = ">" # Typing for some instance members. prev_cmd: str chan_to_idx: dict[str, int] lock: asyncio.Lock
[docs] def init(self) -> None: """Initialize this instance.""" self.lock = asyncio.Lock() self.prev_cmd = "" self.outlets_on: List[Union[None, bool]] = [ None for _ in range(self.num_ports) ] self.outlet_status_sem = asyncio.Semaphore(0) self.chan_to_idx = {} # Add runtime state. namespace = "outlet" with self.env.names_pushed(namespace): for idx in range(self.num_ports): one_idx = idx + 1 chan = f"{one_idx}.on" self.env.namespace(chan) self.chan_to_idx[f"{namespace}.{chan}"] = one_idx self.env.bool_channel( chan, commandable=True, default=False, description=f"Current state of outlet {one_idx}.", ) async def all_on(_: Namespace, __: Optional[FieldOrChannel]) -> None: """Handle an 'all_on' command.""" await self.np05b_command(Np05bStrings.ALL_OUTLETS, "1") async def all_off(_: Namespace, __: Optional[FieldOrChannel]) -> None: """Handle an 'all_off' command.""" await self.np05b_command(Np05bStrings.ALL_OUTLETS, "0") self._setup_async_commands(all_on, all_off)
[docs] async def handle_command( self, args: Namespace, channel: Optional[FieldOrChannel] ) -> None: """Handle a command.""" idx = self.chan_to_idx.get(args.channel) if idx is not None: curr = self.outlets_on[idx - 1] to_set = None # Handle toggle. if args.command == ChannelCommand.TOGGLE and curr is not None: to_set = not curr # Handle explicit value. elif args.extra: parsed = StrToBool.parse(args.extra[0]) if parsed.valid: to_set = parsed.result if to_set is not None: await self.set_outlet_state(idx, to_set)
[docs] async def async_init(self) -> bool: """Ensure that initial statuses get set.""" await self.request_outlet_status() return True
def _send_np05b(self, message: Np05bStrings, *parts: str) -> None: """Send a specific message.""" msg = " ".join([message.value, *parts]) self.logger.debug("Sending: '%s'.", msg) self.send_text(msg + "\r\n")
[docs] async def np05b_command(self, message: Np05bStrings, *parts: str) -> None: """Send a command then read outlet states.""" async with self.lock: self._send_np05b(message, *parts) await asyncio.wait_for(self.request_outlet_status(), 5.0)
[docs] async def request_outlet_status(self) -> None: """Request that the endpoint sends information about outlet status.""" self._send_np05b(Np05bStrings.OUTLET_STATUS) # Wait for the device to respond. await self.outlet_status_sem.acquire()
[docs] async def set_outlet_state(self, idx: int, state: bool) -> bool: """ Set the state of an outlet on the PDU. Returns whether or not the state for the specified outlet index got set. """ result = False real_idx = idx - 1 if 0 <= real_idx <= self.num_ports: if self.outlets_on[real_idx] != state: await self.np05b_command( Np05bStrings.SET_OUTLET, str(idx), "1" if state else "0" ) result = self.outlets_on[real_idx] == state return result
[docs] def process_response( self, command: Np05bStrings, command_data: str, response: str ) -> None: """Process a command response from the NP-05B.""" parts = response.split(",") result = Np05bStrings(parts[0]) if result is Np05bStrings.FAIL: self.logger.error( "Command %s ('%s') failed: %s.", command.name, command_data, parts, ) else: assert result is Np05bStrings.OK self.logger.info( "Command %s ('%s') succeeded.", command.name, command_data ) if command is Np05bStrings.OUTLET_STATUS: if result is Np05bStrings.OK: # Reverse the string because the output is sent in reverse # order. for idx, val in enumerate(parts[1][::-1]): prev = self.outlets_on[idx] curr = val == "1" # Log any state changes. if prev != curr: self.logger.debug( "Output %d changed: %s -> %s", idx + 1, prev, curr ) self.outlets_on[idx] = curr self.env.set(f"outlet.{idx + 1}.on", curr) # Signal that we got a response to outlet status. self.outlet_status_sem.release()
[docs] async def process_text(self, data: str) -> bool: """Process a text frame.""" line_stack = list(data.splitlines()) while line_stack: line = line_stack.pop(0) # We can skip lines that don't mean anything. if not line or line == self.prompt: continue # The device's response strings can end up on the same line. Split # lines with multiple command markers in them and re-process them # on the next iterations. commands = line.split("$A") if len(commands) > 2: line_stack = [f"$A{x}" for x in commands[1:]] + line_stack continue if Np05bStrings.is_command(line): # The device echo's incoming commands, so if the current # and previous lines have command markers, we can process # this line as the response. if Np05bStrings.is_command(self.prev_cmd): self.process_response( Np05bStrings(self.prev_cmd[:3]), self.prev_cmd[3:].strip(), line, ) self.prev_cmd = "" else: self.prev_cmd = line # Log lines that aren't commands or responses. else: self.logger.info(line) return True