Source code for runtimepy.sample.program

"""
A module implementing a sample peer-program interface.
"""

# built-in
import asyncio

# third-party
from vcorelib.math import RateLimiter

# internal
from runtimepy.net.arbiter.info import AppInfo, SampleStruct
from runtimepy.subprocess.program import PeerProgram


[docs] class SampleProgram(PeerProgram): """A sample peer program.""" struct_type = SampleStruct stderr_task: asyncio.Task[None]
[docs] async def log_message_sender( self, poll_period_s: float, did_write: asyncio.Event ) -> None: """Write to stderr periodically.""" keep_going = True while keep_going: try: self.struct.governed_log( self.log_limiter, "Sup, it's %s.", self.struct.name ) did_write.set() await asyncio.sleep(poll_period_s) except asyncio.CancelledError: keep_going = False
[docs] def struct_pre_finalize(self) -> None: """Configure struct before finalization.""" super().struct_pre_finalize() self.log_limiter = RateLimiter.from_s(2.0)
[docs] def pre_environment_exchange(self) -> None: """Perform early initialization tasks.""" # Trigger missed telemetry. with self.streaming_events(): self.struct.poll()
[docs] async def cleanup(self) -> None: """Runs when program 'running' context exits.""" # Cancel stderr task. if hasattr(self, "stderr_task"): self.stderr_task.cancel() await self.stderr_task
[docs] async def run(app: AppInfo) -> int: """A network application that doesn't do anything.""" await app.all_finalized() prog = SampleProgram.singleton() prog.send_poll(2) await prog.share_config({"config": app.original_config()}) assert prog is not None prog.struct.poll() # Send remote commands. assert prog.peer is not None for cmd in [ "set a.0.really_really_long_enum very_long_member_name_2", "set -f a.0.enum one", ]: prog.peer.command(cmd) await prog.process_command_queue() # Register other async tasks. did_write = asyncio.Event() prog.stderr_task = asyncio.create_task( prog.log_message_sender(0.1, did_write) ) await did_write.wait() prog.struct.poll() await prog.wait_json({"a": 1, "b": 2, "c": 3}) await app.stop.wait() return 0