Source code for heros.zenoh

import zenoh
import json
import atexit
from .helper import log, generate_multicast_address
import os

# Baseline zenoh configuration entries. ZenohSessionManager copies these into its
# own configuration first, so entries passed to the manager override them.
zenoh_default_config = {}


class ZenohSessionManager:
    """Open, share and close zenoh sessions, one per realm.

    A session is opened lazily on the first :meth:`request_session` call for a
    realm and closed again once the last object that requested that realm has
    called :meth:`release_session`.

    Args:
        config_dict: Extra zenoh configuration entries (config key -> value).
            They are layered on top of ``zenoh_default_config`` and inserted
            into the zenoh config of every session opened by this manager.
    """

    def __init__(self, config_dict: dict | None = None):
        self._config_dict = dict(zenoh_default_config)
        if config_dict is not None:
            self._config_dict.update(config_dict)
        # realm -> open zenoh session
        self._sessions = {}
        # realm -> callable registered with atexit for that realm's session,
        # kept so the hook can be removed again when the session is closed.
        self._atexit_closers = {}
        # Parallel lists: _referrer_realms[i] is the realm held by _referrers[i].
        self._referrers = []
        self._referrer_realms = []

    def request_session(self, obj: object, realm: str) -> zenoh.Session:
        """
        Request the shared zenoh session of a realm, opening it if necessary.

        Note:
            An object is tracked under the realm of its first request only.
            Requesting another realm with the same object still returns that
            realm's session but does not add a second reference.

        Args:
            obj: The object that requests the session
            realm: Name of the realm whose session is requested

        Returns:
            The zenoh session shared by all referrers of ``realm``.

        Raises:
            Exception: Whatever ``zenoh.Config.from_json5`` raises when the
                ``HEROS_ZENOH_CONFIG`` environment variable cannot be parsed.
        """
        if realm not in self._sessions:
            self._open_session(realm)

        if obj not in self._referrers:
            self._referrers.append(obj)
            self._referrer_realms.append(realm)

        return self._sessions[realm]

    def release_session(self, obj: object) -> None:
        """
        Release an object's reference to its zenoh session.

        The session of the object's realm is closed when no other referrer of
        that realm is left. Unknown objects are ignored.

        Args:
            obj: The object that wants to release from the global zenoh session
        """
        try:
            ref_idx = self._referrers.index(obj)
        except ValueError:
            return

        del self._referrers[ref_idx]
        realm = self._referrer_realms.pop(ref_idx)

        if realm not in self._referrer_realms and realm in self._sessions:
            self._close_session(realm)

    def update_config(self, config_dict: dict) -> None:
        """
        Merge additional entries into the manager's zenoh configuration.

        The configuration is only read when a session is opened, so sessions
        that are already open are not affected.

        Args:
            config_dict: Configuration entries (config key -> value) to add or
                overwrite
        """
        self._config_dict.update(config_dict)

    def force_close(self) -> None:
        """
        Close every open session and forget all referrers.

        A ``zenoh.ZError`` raised while closing one session is logged, so the
        remaining sessions are still closed and the bookkeeping is still reset.
        """
        # Iterate over a snapshot: _close_session removes entries as it goes.
        for realm in list(self._sessions):
            self._close_session(realm)

        self._sessions = {}
        self._atexit_closers = {}
        self._referrers = []
        self._referrer_realms = []

    def _build_config(self, realm: str) -> zenoh.Config:
        """Assemble the zenoh config for ``realm`` from the environment and the manager config."""
        env_config = os.environ.get("HEROS_ZENOH_CONFIG")
        if env_config is None:
            config = zenoh.Config()
        else:
            try:
                config = zenoh.Config.from_json5(env_config)
            except Exception:
                # NOTE: despite the wording of the message the error is not
                # ignored; it is logged and propagated to the caller.
                log.exception("Failed to interpret HEROS_ZENOH_CONFIG json string, ignoring")
                raise

        separate_multicast = os.environ.get("HEROS_SEP_MULTICAST", "0").lower() in ("1", "true")
        if realm != "heros" and separate_multicast:
            # The default "heros" realm keeps the default multicast address;
            # every other realm gets an address derived from its name.
            config.insert_json5(
                "scouting/multicast/address", json.dumps(f"{generate_multicast_address(realm)}:7446")
            )

        for key, value in self._config_dict.items():
            config.insert_json5(key, json.dumps(value))

        return config

    def _open_session(self, realm: str) -> None:
        """Open the session for ``realm`` and make sure it is closed at interpreter exit."""
        session = zenoh.open(self._build_config(realm))
        self._sessions[realm] = session

        closer = session.close
        atexit.register(closer)
        self._atexit_closers[realm] = closer

    def _close_session(self, realm: str) -> None:
        """Close the session of ``realm``, drop it and remove its atexit hook."""
        try:
            self._sessions[realm].close()
        except zenoh.ZError:
            msg = "Timeout occurred when closing Zenoh session. This can lead to stale peers and indicates connection issues."
            log.exception(msg)
        del self._sessions[realm]

        # Without this the atexit registry would keep the closed session alive
        # and call close() on it a second time at interpreter shutdown.
        closer = self._atexit_closers.pop(realm, None)
        if closer is not None:
            atexit.unregister(closer)
# Set up zenoh's logging before the manager below can open any session.
# NOTE(review): presumably the log configuration is read from environment
# variables, as the function name suggests; confirm against the zenoh docs.
zenoh.try_init_log_from_env()

# Module-level manager instance, created at import time with the default configuration.
session_manager = ZenohSessionManager()