Source code for runtimepy.net.udp.protocol
"""
A module implementing a DatagramProtocol for UdpConnection.
"""
# built-in
import asyncio as _asyncio
from asyncio import DatagramProtocol as _DatagramProtocol
import logging
# third-party
from vcorelib.io import BinaryMessage
from vcorelib.logging import LoggerMixin, LoggerType
from vcorelib.math import RateLimiter
# internal
from runtimepy.net.connection import Connection as _Connection
[docs]
class UdpQueueProtocol(_DatagramProtocol):
"""A simple UDP protocol that populates a message queue."""
logger: LoggerType
conn: _Connection
def __init__(self) -> None:
"""Initialize this protocol."""
self.queue: _asyncio.Queue[tuple[BinaryMessage, tuple[str, int]]] = (
_asyncio.Queue()
)
self.log_limiter = RateLimiter.from_s(1.0)
[docs]
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
"""Handle incoming data."""
self.queue.put_nowait((data, addr))
[docs]
def error_received(self, exc: Exception) -> None:
"""Log any received errors."""
LoggerMixin.governed_log(
self, # type: ignore
self.log_limiter,
"Exception occurred:",
level=logging.ERROR,
exc_info=exc,
)
# Most of the time this error occurs when sending to a loopback
# destination (localhost) that is no longer listening.
self.conn.disable(str(exc))