Source code for runtimepy.net.udp.queue
"""
A module implementing a simple queue-based UDP interface.
"""
# built-in
from asyncio import Queue
# third-party
from vcorelib.io import BinaryMessage
# internal
from runtimepy.net.udp.connection import UdpConnection
# Queue of ``(data, addr)`` pairs, matching the tuples enqueued by
# QueueUdpConnection.process_datagram.
DatagramQueue = Queue[tuple[BinaryMessage, tuple[str, int]]]
[docs]
class QueueUdpConnection(UdpConnection):
    """A UDP connection that stores every processed datagram in a queue."""

    # FIFO of (data, addr) pairs; filled by process_datagram.
    datagrams: DatagramQueue

    def init(self) -> None:
        """Initialize this instance by creating an empty datagram queue."""
        self.datagrams = Queue()

    async def process_datagram(
        self, data: BinaryMessage, addr: tuple[str, int]
    ) -> bool:
        """
        Enqueue a datagram together with the address passed alongside it.

        The pair is appended to ``self.datagrams`` as ``(data, addr)`` and is
        not inspected or modified here.

        Always returns True.
        NOTE(review): presumably True tells the base class to keep servicing
        the connection -- confirm against UdpConnection.
        """
        # The queue is built with the default maxsize (unbounded), so the
        # non-blocking put is not expected to raise QueueFull.
        self.datagrams.put_nowait((data, addr))
        return True