ctfy.sdk.events

Sync consumer for the platform's SSE event stream (GET /events).

The platform broadcasts every team-scoped and global event over an SSE endpoint; admin tokens additionally see admin-only frames (see ctfy.server.sse). This module turns that wire-level stream into an ergonomic Python iterator that the CLI's ctfy events watch and direct SDK callers can consume.

Usage::

with client.events() as stream:
    for event in stream:
        print(event["event"], event["data"])

The stream auto-reconnects on transient network errors with exponential backoff; close the context manager (or break the loop) to stop.

Frame format on the wire (from ctfy.server.sse.SSEHub._encode())::

event: <name>\n
data: <json>\n
\n

Comment lines (: prefix) — including the keepalive every ctfy.core.constants.SSE_KEEPALIVE_INTERVAL seconds — are swallowed silently.

  1"""Sync consumer for the platform's SSE event stream (``GET /events``).
  2
  3The platform broadcasts every team-scoped and global event over an SSE
  4endpoint; admin tokens additionally see admin-only frames (see
  5:mod:`ctfy.server.sse`). This module turns that wire-level stream into
  6an ergonomic Python iterator that the CLI's ``ctfy events watch`` and
  7direct SDK callers can consume.
  8
  9Usage::
 10
 11    with client.events() as stream:
 12        for event in stream:
 13            print(event["event"], event["data"])
 14
 15The stream auto-reconnects on transient network errors with exponential
 16backoff; close the context manager (or break the loop) to stop.
 17
 18Frame format on the wire (from :func:`ctfy.server.sse.SSEHub._encode`)::
 19
 20    event: <name>\\n
 21    data: <json>\\n
 22    \\n
 23
 24Comment lines (``:`` prefix) — including the keepalive every
 25:data:`ctfy.core.constants.SSE_KEEPALIVE_INTERVAL` seconds — are
 26swallowed silently.
 27"""
 28
 29from __future__ import annotations
 30
 31import json
 32import time
 33from collections.abc import Callable, Iterator
 34from typing import Any
 35
 36import httpx
 37
 # Upper bound, in seconds, on the delay between reconnect attempts.
 38_RECONNECT_BACKOFF_MAX_S = 30.0
 # Delay, in seconds, before the first reconnect attempt; the delay is
 # doubled after each failed attempt until it reaches the maximum above.
 39_RECONNECT_BACKOFF_START_S = 1.0
 40
 41
 42class EventStream:
 43    """Iterator over events from a single live HTTP connection, with
 44    auto-reconnect.
 45
 46    Construct via :meth:`ctfy.sdk.PlatformClient.events`; don't
 47    instantiate directly.
 48    """
 49
 50    def __init__(
 51        self,
 52        *,
 53        client_factory: Callable[[], httpx.Client],
 54        path: str,
 55        token: str = "",
 56        auto_reconnect: bool = True,
 57    ) -> None:
 58        self._client_factory = client_factory
 59        self._path = path
 60        self._token = token
 61        self._auto_reconnect = auto_reconnect
 62        self._stopped = False
 63
 64    def __enter__(self) -> EventStream:
 65        return self
 66
 67    def __exit__(self, *_exc: object) -> None:
 68        self.close()
 69
 70    def __iter__(self) -> Iterator[dict[str, Any]]:
 71        backoff = _RECONNECT_BACKOFF_START_S
 72        while not self._stopped:
 73            try:
 74                yield from self._read_one_session()
 75            except (httpx.HTTPError, ConnectionError, OSError):
 76                if not self._auto_reconnect or self._stopped:
 77                    raise
 78                time.sleep(backoff)
 79                backoff = min(backoff * 2, _RECONNECT_BACKOFF_MAX_S)
 80                continue
 81            # Clean EOF on the connection (server closed) — reconnect
 82            # immediately if auto_reconnect, otherwise stop.
 83            if not self._auto_reconnect or self._stopped:
 84                break
 85            backoff = _RECONNECT_BACKOFF_START_S
 86
 87    def close(self) -> None:
 88        """Stop the loop; in-flight ``read_one_session`` finishes its
 89        current iter_lines step before the iterator exits."""
 90        self._stopped = True
 91
 92    # -- internal -----------------------------------------------------
 93
 94    def _read_one_session(self) -> Iterator[dict[str, Any]]:
 95        params = {"token": self._token} if self._token else None
 96        with (
 97            self._client_factory() as cli,
 98            cli.stream("GET", self._path, params=params, timeout=None) as resp,
 99        ):
100            resp.raise_for_status()
101            event_name = ""
102            data_lines: list[str] = []
103            for line in resp.iter_lines():
104                if self._stopped:
105                    return
106                if not line:
107                    # Blank line → frame boundary. Flush whatever we have.
108                    if event_name and data_lines:
109                        body = "\n".join(data_lines)
110                        try:
111                            data: Any = json.loads(body)
112                        except json.JSONDecodeError:
113                            data = body
114                        yield {"event": event_name, "data": data}
115                    event_name = ""
116                    data_lines = []
117                    continue
118                if line.startswith(":"):
119                    # Comment line (server keepalive, initial ": connected").
120                    continue
121                if line.startswith("event:"):
122                    event_name = line[6:].lstrip()
123                elif line.startswith("data:"):
124                    # Per the SSE spec a single ``data:`` value may span
125                    # multiple lines; rejoin with ``\n``. ctfy's encoder
126                    # emits exactly one ``data:`` line so this is normally
127                    # a single entry, but be tolerant.
128                    data_lines.append(line[5:].lstrip())
129                # ``id:`` / ``retry:`` lines: ignored — the platform
130                # doesn't currently emit them.
class EventStream:
 43class EventStream:
 44    """Iterator over events from a single live HTTP connection, with
 45    auto-reconnect.
 46
 47    Construct via :meth:`ctfy.sdk.PlatformClient.events`; don't
 48    instantiate directly.
 49    """
 50
 51    def __init__(
 52        self,
 53        *,
 54        client_factory: Callable[[], httpx.Client],
 55        path: str,
 56        token: str = "",
 57        auto_reconnect: bool = True,
 58    ) -> None:
 59        self._client_factory = client_factory
 60        self._path = path
 61        self._token = token
 62        self._auto_reconnect = auto_reconnect
 63        self._stopped = False
 64
 65    def __enter__(self) -> EventStream:
 66        return self
 67
 68    def __exit__(self, *_exc: object) -> None:
 69        self.close()
 70
 71    def __iter__(self) -> Iterator[dict[str, Any]]:
 72        backoff = _RECONNECT_BACKOFF_START_S
 73        while not self._stopped:
 74            try:
 75                yield from self._read_one_session()
 76            except (httpx.HTTPError, ConnectionError, OSError):
 77                if not self._auto_reconnect or self._stopped:
 78                    raise
 79                time.sleep(backoff)
 80                backoff = min(backoff * 2, _RECONNECT_BACKOFF_MAX_S)
 81                continue
 82            # Clean EOF on the connection (server closed) — reconnect
 83            # immediately if auto_reconnect, otherwise stop.
 84            if not self._auto_reconnect or self._stopped:
 85                break
 86            backoff = _RECONNECT_BACKOFF_START_S
 87
 88    def close(self) -> None:
 89        """Stop the loop; in-flight ``read_one_session`` finishes its
 90        current iter_lines step before the iterator exits."""
 91        self._stopped = True
 92
 93    # -- internal -----------------------------------------------------
 94
 95    def _read_one_session(self) -> Iterator[dict[str, Any]]:
 96        params = {"token": self._token} if self._token else None
 97        with (
 98            self._client_factory() as cli,
 99            cli.stream("GET", self._path, params=params, timeout=None) as resp,
100        ):
101            resp.raise_for_status()
102            event_name = ""
103            data_lines: list[str] = []
104            for line in resp.iter_lines():
105                if self._stopped:
106                    return
107                if not line:
108                    # Blank line → frame boundary. Flush whatever we have.
109                    if event_name and data_lines:
110                        body = "\n".join(data_lines)
111                        try:
112                            data: Any = json.loads(body)
113                        except json.JSONDecodeError:
114                            data = body
115                        yield {"event": event_name, "data": data}
116                    event_name = ""
117                    data_lines = []
118                    continue
119                if line.startswith(":"):
120                    # Comment line (server keepalive, initial ": connected").
121                    continue
122                if line.startswith("event:"):
123                    event_name = line[6:].lstrip()
124                elif line.startswith("data:"):
125                    # Per the SSE spec a single ``data:`` value may span
126                    # multiple lines; rejoin with ``\n``. ctfy's encoder
127                    # emits exactly one ``data:`` line so this is normally
128                    # a single entry, but be tolerant.
129                    data_lines.append(line[5:].lstrip())
130                # ``id:`` / ``retry:`` lines: ignored — the platform
131                # doesn't currently emit them.

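Iterator over events from the platform's SSE stream. One live HTTP connection is read at a time; when it ends or fails, a new one is opened automatically unless auto-reconnect is disabled.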

Construct via ctfy.sdk.PlatformClient.events(); don't instantiate directly.

EventStream( *, client_factory: Callable[[], httpx.Client], path: str, token: str = '', auto_reconnect: bool = True)
51    def __init__(
52        self,
53        *,
54        client_factory: Callable[[], httpx.Client],
55        path: str,
56        token: str = "",
57        auto_reconnect: bool = True,
58    ) -> None:
59        self._client_factory = client_factory
60        self._path = path
61        self._token = token
62        self._auto_reconnect = auto_reconnect
63        self._stopped = False
def close(self) -> None:
88    def close(self) -> None:
89        """Stop the loop; in-flight ``read_one_session`` finishes its
90        current iter_lines step before the iterator exits."""
91        self._stopped = True

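Stop the loop; an in-flight _read_one_session finishes its current iter_lines step before the iterator exits. The stop flag is only checked when a line arrives, so a read that is blocked waiting for data is not interrupted.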