Source code for heros.zenoh
import zenoh
import json
import atexit
from .helper import log, generate_multicast_address
import os
# Baseline zenoh configuration shared by every ZenohSessionManager. Each key is
# passed to `zenoh.Config.insert_json5` as the config path and each value is
# serialised with `json.dumps` before insertion. Empty by default.
zenoh_default_config: dict = {}
[docs]
class ZenohSessionManager:
    """
    Hand out one shared zenoh session per realm and track who is using it.

    Sessions are opened lazily on the first request for a realm and closed
    again once the last object registered for that realm has released it.
    Bookkeeping is kept in two parallel lists: ``_referrers[i]`` is an object
    using the session of realm ``_referrer_realms[i]``.
    """

    def __init__(self, config_dict: dict | None = None):
        """
        Create a manager without opening any session yet.

        Args:
            config_dict: Optional zenoh configuration entries (config path ->
                JSON-serialisable value). They are layered on top of
                ``zenoh_default_config`` and win on conflicting keys.
        """
        # Copy the defaults so later updates never mutate the module-level dict.
        self._config_dict = dict(zenoh_default_config)
        if config_dict is not None:
            self._config_dict.update(config_dict)
        self._sessions = {}
        self._referrers = []
        self._referrer_realms = []

    def request_session(self, obj: object, realm: str) -> zenoh.Session:
        """
        Request the shared zenoh session of a realm.

        The session is opened on the first request for ``realm``. ``obj`` is
        registered as a user of that realm so the session stays open until
        ``obj`` (and every other user of the realm) has released it.

        Args:
            obj: The object that requests the session.
            realm: Name of the realm whose session is requested.

        Returns:
            The zenoh session shared by all users of ``realm``.
        """
        if realm not in self._sessions:
            self._sessions[realm] = self._open_session(realm)

        # Register per (object, realm) pair: an object that uses several
        # realms must be counted as a user of each of them, otherwise the
        # session of the second realm could be closed underneath it.
        if (obj, realm) not in zip(self._referrers, self._referrer_realms):
            self._referrers.append(obj)
            self._referrer_realms.append(realm)
        return self._sessions[realm]

    def release_session(self, obj: object) -> None:
        """
        Release an object from every session it requested.

        A session is closed as soon as no registered object refers to its
        realm any more. Releasing an object that never requested a session is
        a no-op.

        Args:
            obj: The object that wants to release its session(s).
        """
        released_realms = []
        while obj in self._referrers:
            ref_idx = self._referrers.index(obj)
            del self._referrers[ref_idx]
            released_realms.append(self._referrer_realms.pop(ref_idx))

        # dict.fromkeys de-duplicates while keeping the release order.
        for realm in dict.fromkeys(released_realms):
            if realm not in self._referrer_realms and realm in self._sessions:
                self._close_session(realm)

    def update_config(self, config_dict: dict) -> None:
        """
        Merge additional entries into the stored zenoh configuration.

        The configuration is only read when a session is opened, so sessions
        that are already open are not affected.

        Args:
            config_dict: Configuration entries (config path ->
                JSON-serialisable value) overriding the stored ones.
        """
        self._config_dict.update(config_dict)

    def force_close(self) -> None:
        """
        Close every open session and forget all registered users.

        A ``zenoh.ZError`` raised while closing one session is logged and does
        not prevent the remaining sessions from being closed.
        """
        # Iterate over a snapshot because closing removes the dict entry.
        for realm in list(self._sessions):
            self._close_session(realm)
        self._sessions = {}
        self._referrers = []
        self._referrer_realms = []

    def _open_session(self, realm: str) -> zenoh.Session:
        """Open a new zenoh session for ``realm`` and schedule its close at exit."""
        config = zenoh.Config()
        separate_multicast = os.environ.get("HEROS_SEP_MULTICAST", "0").lower() in ("1", "true")
        if realm != "heros" and separate_multicast:
            # Non-default realms get their own multicast address when
            # HEROS_SEP_MULTICAST is enabled; the default "heros" realm keeps
            # zenoh's default multicast address.
            config.insert_json5(
                "scouting/multicast/address", json.dumps(f"{generate_multicast_address(realm)}:7446")
            )
        # User configuration is applied last so it can override the above.
        for key, value in self._config_dict.items():
            config.insert_json5(key, json.dumps(value))
        session = zenoh.open(config)
        atexit.register(session.close)
        return session

    def _close_session(self, realm: str) -> None:
        """Close and forget the session of ``realm``, logging zenoh close errors."""
        session = self._sessions[realm]
        try:
            session.close()
        except zenoh.ZError:
            msg = (
                "Timeout occurred when closing Zenoh session. "
                "This can lead to stale peers and indicates connection issues."
            )
            log.exception(msg)
        del self._sessions[realm]
        # The session is gone; drop its exit hook so it is neither kept alive
        # nor closed a second time at interpreter shutdown.
        atexit.unregister(session.close)
# Ask zenoh to initialise its logging from the environment at import time
# (going by the function name; see the zenoh-python docs for the variables read).
zenoh.try_init_log_from_env()
# Module-level manager instance, created on import with only the default
# configuration; no zenoh session is opened until one is requested.
session_manager = ZenohSessionManager()