Source code for runtimepy.net.tcp.telnet
"""
A module implementing a basic telnet (RFC 854) connection interface.
"""
# built-in
from abc import abstractmethod as _abstractmethod
from contextlib import ExitStack as _ExitStack
from contextlib import suppress
from io import BytesIO as _BytesIO
from typing import BinaryIO as _BinaryIO
# third-party
from vcorelib import DEFAULT_ENCODING
from vcorelib.io import BinaryMessage
# internal
from runtimepy.net.tcp.connection import TcpConnection as _TcpConnection
from runtimepy.net.tcp.telnet.codes import (
CARRIAGE_RETURN,
NEWLINE,
TelnetCode,
TelnetNvt,
)
__all__ = [
"TelnetCode",
"TelnetNvt",
"Telnet",
"BasicTelnet",
"NEWLINE",
"CARRIAGE_RETURN",
]
[docs]
class Telnet(_TcpConnection):
    """
    A class implementing a basic telnet interface.

    Incoming binary frames are scanned one byte at a time: 'IAC' sequences
    are dispatched to 'process_command' or 'process_option', NVT control
    bytes are either written to the text stream or handed to 'handle_nvt',
    and all remaining bytes are accumulated and passed to
    'process_telnet_message'.
    """

    log_alias = "TELNET"

    async def process_telnet_message(self, data: bytes) -> bool:
        """
        By default, treat all incoming data bytes as text.

        Bytes that can't be decoded with the default encoding are dropped.
        This method always returns True; the result of 'process_text' is
        not propagated.
        """

        # Only the decode is guarded, so a UnicodeDecodeError raised from
        # within the text handler itself is no longer silently hidden.
        text = None
        with suppress(UnicodeDecodeError):
            text = data.decode(encoding=DEFAULT_ENCODING)

        if text is not None:
            await self.process_text(text)

        return True

    @_abstractmethod
    async def process_command(self, code: TelnetCode) -> None:
        """Process a telnet command."""

    def send_command(self, code: TelnetCode) -> None:
        """Send a telnet command (the 'IAC' byte followed by the code)."""

        self.send_binary(bytes([TelnetCode.IAC, code]))

    def send_option(self, option: TelnetCode, code: int) -> None:
        """
        Send a telnet option sequence. If 'SB' is the desired code, the
        additional data can be sent using the 'send_binary' method directly.
        """

        assert TelnetCode.is_option_code(option)
        self.send_binary(bytes([TelnetCode.IAC, option, code]))

    @_abstractmethod
    async def handle_nvt(self, action: TelnetNvt) -> None:
        """Handle a signal for the network virtual-terminal."""

    @_abstractmethod
    async def process_option(
        self, code: TelnetCode, option: int, stream: _BinaryIO
    ) -> None:
        """Process a telnet option."""

    async def process_binary(self, data: BinaryMessage) -> bool:
        """
        Process a binary frame.

        Returns the result of 'process_telnet_message' for the data bytes
        found in the frame, or True if the frame contained no data bytes.

        Note: the parser state is local to a single call, so a command or
        option sequence that is split across two frames is not reassembled.
        """

        result = True

        last_val = 0
        val = 0

        # These two conditions are used to form a simple state machine.
        processing_option = False
        processing_command = False

        with _ExitStack() as stack:
            text = stack.enter_context(_BytesIO())
            stream = stack.enter_context(_BytesIO(data))

            while True:
                # Use a dedicated name so the 'data' parameter isn't
                # rebound while the frame is being parsed.
                chunk = stream.read(1)
                if not chunk:
                    break

                last_val = val
                val = chunk[0]

                if processing_option:
                    # The previous byte was the option code and this byte
                    # is the option. The stream is passed along so the
                    # handler can read any additional bytes it needs.
                    await self.process_option(
                        TelnetCode(last_val), val, stream
                    )
                    processing_option = False

                elif processing_command:
                    if TelnetCode.is_option_code(val):
                        processing_option = True

                    # If 'IAC' appears twice in a row, it's being escaped.
                    elif val == TelnetCode.IAC:
                        text.write(chunk)

                    else:
                        # NOTE(review): presumably an enum lookup that
                        # raises ValueError for bytes that aren't defined
                        # commands -- confirm against the codes module.
                        # The byte comes from the peer, so an unknown
                        # value is skipped instead of aborting the frame.
                        try:
                            code = TelnetCode(val)
                        except ValueError:
                            self.logger.warning(
                                "Unknown command: %d.", val
                            )
                        else:
                            await self.process_command(code)

                    processing_command = False

                elif val == TelnetCode.IAC:
                    processing_command = True

                elif TelnetNvt.is_nvt(val):
                    nvt = TelnetNvt(val)

                    # Signals that 'to_stream' doesn't report as handled
                    # are passed to the handler.
                    if not nvt.to_stream(text):
                        await self.handle_nvt(nvt)

                # Forward byte to the data stream.
                else:
                    text.write(chunk)

            # Process the message that remains as data. This must happen
            # before the text stream is closed by the exit stack.
            if text.tell() > 0:
                result = await self.process_telnet_message(text.getvalue())

        return result
[docs]
class BasicTelnet(Telnet):
    """A simple telnet implementation that logs what it receives."""

    async def process_text(self, data: str) -> bool:
        """Log a text frame and report success."""

        self.logger.info("Text: '%s'.", data)
        return True

    async def process_command(self, code: TelnetCode) -> None:
        """Log a telnet command; 'IP' additionally calls 'disable'."""

        self.logger.info("Command: %s (%d).", code, code)

        # Every command other than interrupt-process is only logged.
        if code is not TelnetCode.IP:
            return

        self.disable("Interrupt Process")

    async def handle_nvt(self, action: TelnetNvt) -> None:
        """Log a network virtual-terminal signal, except for 'NUL'."""

        if action is TelnetNvt.NUL:
            return

        self.logger.info("NVT signal: %s (%d).", action, action)

    async def process_option(
        self, code: TelnetCode, option: int, _: _BinaryIO
    ) -> None:
        """Log a telnet option; the stream argument is not read."""

        self.logger.info("Option: %s (%d), %d", code, code, option)