ctfy.sdk.events
Sync consumer for the platform's SSE event stream (GET /events).
The platform broadcasts every team-scoped and global event over an SSE
endpoint; admin tokens additionally see admin-only frames (see
ctfy.server.sse). This module turns that wire-level stream into
an ergonomic Python iterator that the CLI's ctfy events watch and
direct SDK callers can consume.
Usage::
with client.events() as stream:
    for event in stream:
        print(event["event"], event["data"])
The stream auto-reconnects on transient network errors with exponential backoff; close the context manager (or break the loop) to stop.
Frame format on the wire (from ctfy.server.sse.SSEHub._encode())::
event: <name>\n
data: <json>\n
\n
Comment lines (: prefix) — including the keepalive every
ctfy.core.constants.SSE_KEEPALIVE_INTERVAL seconds — are
swallowed silently.
1"""Sync consumer for the platform's SSE event stream (``GET /events``). 2 3The platform broadcasts every team-scoped and global event over an SSE 4endpoint; admin tokens additionally see admin-only frames (see 5:mod:`ctfy.server.sse`). This module turns that wire-level stream into 6an ergonomic Python iterator that the CLI's ``ctfy events watch`` and 7direct SDK callers can consume. 8 9Usage:: 10 11 with client.events() as stream: 12 for event in stream: 13 print(event["event"], event["data"]) 14 15The stream auto-reconnects on transient network errors with exponential 16backoff; close the context manager (or break the loop) to stop. 17 18Frame format on the wire (from :func:`ctfy.server.sse.SSEHub._encode`):: 19 20 event: <name>\\n 21 data: <json>\\n 22 \\n 23 24Comment lines (``:`` prefix) — including the keepalive every 25:data:`ctfy.core.constants.SSE_KEEPALIVE_INTERVAL` seconds — are 26swallowed silently. 27""" 28 29from __future__ import annotations 30 31import json 32import time 33from collections.abc import Callable, Iterator 34from typing import Any 35 36import httpx 37 38_RECONNECT_BACKOFF_MAX_S = 30.0 39_RECONNECT_BACKOFF_START_S = 1.0 40 41 42class EventStream: 43 """Iterator over events from a single live HTTP connection, with 44 auto-reconnect. 45 46 Construct via :meth:`ctfy.sdk.PlatformClient.events`; don't 47 instantiate directly. 
48 """ 49 50 def __init__( 51 self, 52 *, 53 client_factory: Callable[[], httpx.Client], 54 path: str, 55 token: str = "", 56 auto_reconnect: bool = True, 57 ) -> None: 58 self._client_factory = client_factory 59 self._path = path 60 self._token = token 61 self._auto_reconnect = auto_reconnect 62 self._stopped = False 63 64 def __enter__(self) -> EventStream: 65 return self 66 67 def __exit__(self, *_exc: object) -> None: 68 self.close() 69 70 def __iter__(self) -> Iterator[dict[str, Any]]: 71 backoff = _RECONNECT_BACKOFF_START_S 72 while not self._stopped: 73 try: 74 yield from self._read_one_session() 75 except (httpx.HTTPError, ConnectionError, OSError): 76 if not self._auto_reconnect or self._stopped: 77 raise 78 time.sleep(backoff) 79 backoff = min(backoff * 2, _RECONNECT_BACKOFF_MAX_S) 80 continue 81 # Clean EOF on the connection (server closed) — reconnect 82 # immediately if auto_reconnect, otherwise stop. 83 if not self._auto_reconnect or self._stopped: 84 break 85 backoff = _RECONNECT_BACKOFF_START_S 86 87 def close(self) -> None: 88 """Stop the loop; in-flight ``read_one_session`` finishes its 89 current iter_lines step before the iterator exits.""" 90 self._stopped = True 91 92 # -- internal ----------------------------------------------------- 93 94 def _read_one_session(self) -> Iterator[dict[str, Any]]: 95 params = {"token": self._token} if self._token else None 96 with ( 97 self._client_factory() as cli, 98 cli.stream("GET", self._path, params=params, timeout=None) as resp, 99 ): 100 resp.raise_for_status() 101 event_name = "" 102 data_lines: list[str] = [] 103 for line in resp.iter_lines(): 104 if self._stopped: 105 return 106 if not line: 107 # Blank line → frame boundary. Flush whatever we have. 
108 if event_name and data_lines: 109 body = "\n".join(data_lines) 110 try: 111 data: Any = json.loads(body) 112 except json.JSONDecodeError: 113 data = body 114 yield {"event": event_name, "data": data} 115 event_name = "" 116 data_lines = [] 117 continue 118 if line.startswith(":"): 119 # Comment line (server keepalive, initial ": connected"). 120 continue 121 if line.startswith("event:"): 122 event_name = line[6:].lstrip() 123 elif line.startswith("data:"): 124 # Per the SSE spec a single ``data:`` value may span 125 # multiple lines; rejoin with ``\n``. ctfy's encoder 126 # emits exactly one ``data:`` line so this is normally 127 # a single entry, but be tolerant. 128 data_lines.append(line[5:].lstrip()) 129 # ``id:`` / ``retry:`` lines: ignored — the platform 130 # doesn't currently emit them.
class EventStream:
    """Iterator over events from a live HTTP connection, with
    auto-reconnect.

    Each session opens one connection through ``client_factory`` and
    yields ``{"event": <name>, "data": <payload>}`` dicts parsed from
    the SSE frames it carries.

    Construct via :meth:`ctfy.sdk.PlatformClient.events`; don't
    instantiate directly.
    """

    # Client-error statuses that are still worth retrying: request
    # timeout, too early, and rate limiting. Every other 4xx is treated
    # as permanent because repeating the same request cannot succeed.
    _RETRYABLE_CLIENT_STATUSES = frozenset({408, 425, 429})

    def __init__(
        self,
        *,
        client_factory: Callable[[], httpx.Client],
        path: str,
        token: str = "",
        auto_reconnect: bool = True,
    ) -> None:
        """Store the connection settings; no I/O happens here.

        Args:
            client_factory: Zero-argument callable invoked once per
                session; its result is used as a context manager.
            path: Request path passed to the client's ``stream`` call.
            token: Sent as the ``token`` query parameter when non-empty.
            auto_reconnect: When true, reconnect after errors and after
                the server closes the connection.
        """
        self._client_factory = client_factory
        self._path = path
        self._token = token
        self._auto_reconnect = auto_reconnect
        self._stopped = False

    def __enter__(self) -> EventStream:
        return self

    def __exit__(self, *_exc: object) -> None:
        self.close()

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Yield events, reconnecting between sessions when enabled.

        Raises:
            httpx.HTTPError, OSError: When a session fails and
                auto-reconnect is off, the stream has been closed, or
                the failure is a permanent HTTP client error.
        """
        backoff = _RECONNECT_BACKOFF_START_S
        while not self._stopped:
            session = self._read_one_session()
            try:
                for event in session:
                    # A delivered event proves the connection recovered,
                    # so the next failure starts from the shortest delay
                    # instead of inheriting an old escalated one.
                    backoff = _RECONNECT_BACKOFF_START_S
                    yield event
            except (httpx.HTTPError, OSError) as err:
                if (
                    not self._auto_reconnect
                    or self._stopped
                    or self._is_permanent_failure(err)
                ):
                    raise
                # NOTE: the sleep is not interruptible; a close() issued
                # meanwhile takes effect once the loop condition is
                # re-checked.
                time.sleep(backoff)
                backoff = min(backoff * 2, _RECONNECT_BACKOFF_MAX_S)
                continue
            finally:
                # Release the client/response deterministically, also
                # when the consumer abandons the iterator mid-session.
                session.close()
            # Clean EOF on the connection (server closed) — reconnect
            # immediately if auto_reconnect, otherwise stop.
            if not self._auto_reconnect or self._stopped:
                break
            backoff = _RECONNECT_BACKOFF_START_S

    def close(self) -> None:
        """Stop the loop; in-flight ``_read_one_session`` finishes its
        current iter_lines step before the iterator exits."""
        self._stopped = True

    # -- internal -----------------------------------------------------

    @classmethod
    def _is_permanent_failure(cls, err: Exception) -> bool:
        """Return True for HTTP status errors that retrying cannot fix."""
        if not isinstance(err, httpx.HTTPStatusError):
            return False
        status = err.response.status_code
        return 400 <= status < 500 and status not in cls._RETRYABLE_CLIENT_STATUSES

    def _read_one_session(self) -> Iterator[dict[str, Any]]:
        """Open one connection and yield the events parsed from it.

        A frame is emitted at each blank line, and only when it has both
        an event name and at least one ``data:`` line. The payload is
        JSON-decoded when possible, otherwise passed through as text.
        """
        params = {"token": self._token} if self._token else None
        with (
            self._client_factory() as cli,
            cli.stream("GET", self._path, params=params, timeout=None) as resp,
        ):
            resp.raise_for_status()
            event_name = ""
            data_lines: list[str] = []
            for line in resp.iter_lines():
                if self._stopped:
                    return
                if not line:
                    # Blank line → frame boundary. Flush whatever we have.
                    if event_name and data_lines:
                        body = "\n".join(data_lines)
                        try:
                            data: Any = json.loads(body)
                        except json.JSONDecodeError:
                            data = body
                        yield {"event": event_name, "data": data}
                    event_name = ""
                    data_lines = []
                    continue
                if line.startswith(":"):
                    # Comment line (server keepalive, initial ": connected").
                    continue
                if line.startswith("event:"):
                    # Per the SSE spec exactly one leading space is
                    # dropped from a field value; further whitespace is
                    # part of the value.
                    event_name = line[6:].removeprefix(" ")
                elif line.startswith("data:"):
                    # Per the SSE spec a single ``data:`` value may span
                    # multiple lines; rejoin with ``\n``. ctfy's encoder
                    # emits exactly one ``data:`` line so this is normally
                    # a single entry, but be tolerant.
                    data_lines.append(line[5:].removeprefix(" "))
                # ``id:`` / ``retry:`` lines: ignored — the platform
                # doesn't currently emit them.
Iterator over events from a single live HTTP connection, with auto-reconnect.
Construct via ctfy.sdk.PlatformClient.events(); don't
instantiate directly.
51 def __init__( 52 self, 53 *, 54 client_factory: Callable[[], httpx.Client], 55 path: str, 56 token: str = "", 57 auto_reconnect: bool = True, 58 ) -> None: 59 self._client_factory = client_factory 60 self._path = path 61 self._token = token 62 self._auto_reconnect = auto_reconnect 63 self._stopped = False
88 def close(self) -> None: 89 """Stop the loop; in-flight ``read_one_session`` finishes its 90 current iter_lines step before the iterator exits.""" 91 self._stopped = True
Stop the loop; an in-flight _read_one_session finishes its
current iter_lines step before the iterator exits.