ctfy.sdk

Python SDK for the ctfy platform.

Drives the platform's /api/v1/* REST API from Python with typed Pydantic returns and auto-retry on transient errors. Two clients, split by audience — the same line the ctfy / ctfy-admin CLI binaries draw:

  • ~ctfy.sdk.client.PlatformClient — the player client. This is what agents and players want. Methods are grouped into resource namespaces (client.teams, client.instances, …) plus a per-competition handle via client.competition(id).
  • ~ctfy.sdk.admin.AdminClient — the operator client (/api/v1/admin/* and fleet management). Reached via client.admin or AdminClient.connect(). Kept separate so the player reference never surfaces admin endpoints they can't call.

Install

The full ctfy package pulls in the server too. For an agent / harness that only needs the client, use the client extra::

pip install "ctfy[client]"

Authenticate

Sign in to the platform in the browser, then go to Settings → API tokens and mint a fine-grained token (prefix pf_). These tokens launch instances and submit answers on the team's behalf but cannot mint more tokens, re-link OAuth providers, or change team membership — so a leaked token never costs you the account.

Export it as CTFY_TOKEN for the CLI / MCP server, or pass it directly to PlatformClient.

Quick start

End-to-end: discover a competition, register, list its challenges, launch one, read the attack surface, submit a captured flag::

from ctfy.sdk import PlatformClient

client = PlatformClient("https://ctfy.authu.online", token="pf_xxx")

# 1) Find a running competition
comps = client.competitions.list(phase="running")
comp = comps[0]
print(comp.id, comp.title)

# 2) Scope to the competition, then register Solo (skip if already on a
#    team: inspect ``client.me.get().competition_teams`` first)
comp_api = client.competition(comp.id)
team = comp_api.register(mode="solo")
print(team.id)

# 3) List the challenges scoped to this competition (curated order)
challenges = comp_api.challenges()
for ch in challenges:
    print(f"  {ch.id} [{ch.category.value}/{ch.difficulty}] {ch.name}")

# 4) Launch the first challenge — blocks until ready
ready = client.instances.start(
    challenges[0].id, competition_id=comp.id
)
print(f"instance {ready.id}")

# 5) Read the attack surface (services + credentials, plus VPN for
#    engagement-mode challenges)
for svc in ready.surface.services:
    print(f"  [{svc.service_type.value}] {svc.url}")
    if svc.credentials:
        print(f"    {svc.credentials.username}/{svc.credentials.password}")
if ready.surface.vpn:
    print(f"  VPN: {ready.surface.vpn.host}:{ready.surface.vpn.port}")

# 6) Submit a captured flag (oracle-probe via
#    ``client.submissions.verify()`` first if you don't want to burn
#    an audit row)
result = client.submissions.submit(ready.id, "FLAG{your_flag_here}")
print(f"correct={result.correct} solved={result.challenge_fully_solved}")

# 7) Tear down (or let TTL expire — default 24h, admin-tunable)
client.instances.stop(ready.id)

A runnable, interactive copy of this script lives at examples/quickstart.py.

Player surface

Global / account namespaces on ~ctfy.sdk.client.PlatformClient:

  • client.teams / client.users — public team + user profile reads.
  • client.me — the caller's profile, inbox, account export/delete, achievements, solve + milestone progress.
  • client.auth — password register/login, OAuth identities, API token CRUD, sign-in discovery.
  • client.competitions — discover competitions (list / get).
  • client.challenges — global catalog, attachments, facets, feedback chips.
  • client.instancesstart (blocks until ready), status, stop, renew, list (by instance id; the competition is inferred).
  • client.submissionssubmit (graded), verify (oracle probe, no record), list, and QA challenges.
  • client.scoreboard — global standings, per-challenge stats, snapshots.
  • client.achievements / client.activities / client.nodes — public badge catalog, activity log, worker-node list.

Per-competition scope — client.competition(id) returns a ~ctfy.sdk.competition.Competition handle for the operations that are inherently competition-scoped, so competition_id is named once:

  • comp.register(mode="solo" | "create" | "join", …) — get on a team.
  • comp.team — your team: rename / leave / kick, plus comp.team.invites (codes + email invites) and comp.team.requests (approve / reject join requests).
  • comp.challenges() / comp.search_teams() and the standings (comp.scoreboard() / score_history / score_distribution / solve_matrix / challenge_breakdown).

Top-level convenience methods stay flat: client.health(), client.get_meta(), client.cluster_info(), client.events(), client.check_server_compatibility().

Admin surface

client.admin (an ~ctfy.sdk.admin.AdminClient) carries the operator endpoints, grouped the same way: admin.users, admin.competitions, admin.announcements, admin.achievements, admin.challenges, admin.instances, admin.records (instance forensics), admin.nodes, admin.observability, admin.settings, admin.competition_admins. Operator tooling without a player client can use AdminClient.connect(url, token=...).

Realtime ~~~~

PlatformClient.events() yields an auto-reconnecting ~ctfy.sdk.events.EventStream over the SSE endpoint — solve broadcasts, instance state changes, scoreboard refreshes.

See also

  • PlatformClient — the player client; every namespace method has full docstrings.
  • AdminClient — the operator client.
  • ctfy.server.models — the typed response shapes (ChallengeInfo, InstanceInfo, SubmissionResponse, …) that SDK methods return. Re-using the server's Pydantic models means the wire shape can never drift between the two sides.
  • <platform>/docs — auto-generated FastAPI Swagger UI for the raw REST API (language-agnostic).
  • <platform>/api/v1/mcp/ — streamable-HTTP MCP endpoint with the same operations as MCP tools, if the agent speaks MCP instead of Python.
  • ~ctfy.sdk.node_client.NodeClient — separate client for worker-node ↔ platform RPCs; not what agent harnesses want.
  1"""Python SDK for the ctfy platform.
  2
  3Drives the platform's ``/api/v1/*`` REST API from Python with typed Pydantic
  4returns and auto-retry on transient errors. Two clients, split by audience —
  5the same line the ``ctfy`` / ``ctfy-admin`` CLI binaries draw:
  6
  7- :class:`~ctfy.sdk.client.PlatformClient` — the **player** client. This is
  8  what agents and players want. Methods are grouped into resource namespaces
  9  (``client.teams``, ``client.instances``, …) plus a per-competition handle
 10  via ``client.competition(id)``.
 11- :class:`~ctfy.sdk.admin.AdminClient` — the **operator** client
 12  (``/api/v1/admin/*`` and fleet management). Reached via ``client.admin`` or
 13  :meth:`AdminClient.connect`. Kept separate so the player reference never
 14  surfaces admin endpoints they can't call.
 15
 16Install
 17-------
 18
 19The full ``ctfy`` package pulls in the server too. For an agent /
 20harness that only needs the client, use the ``client`` extra::
 21
 22    pip install "ctfy[client]"
 23
 24Authenticate
 25------------
 26
 27Sign in to the platform in the browser, then go to **Settings → API
 28tokens** and mint a fine-grained token (prefix ``pf_``). These tokens
 29launch instances and submit answers on the team's behalf but cannot
 30mint more tokens, re-link OAuth providers, or change team
 31membership — so a leaked token never costs you the account.
 32
 33Export it as ``CTFY_TOKEN`` for the CLI / MCP server, or pass it
 34directly to :class:`PlatformClient`.
 35
 36Quick start
 37-----------
 38
 39End-to-end: discover a competition, register, list its challenges,
 40launch one, read the attack surface, submit a captured flag::
 41
 42    from ctfy.sdk import PlatformClient
 43
 44    client = PlatformClient("https://ctfy.authu.online", token="pf_xxx")
 45
 46    # 1) Find a running competition
 47    comps = client.competitions.list(phase="running")
 48    comp = comps[0]
 49    print(comp.id, comp.title)
 50
 51    # 2) Scope to the competition, then register Solo (skip if already on a
 52    #    team: inspect ``client.me.get().competition_teams`` first)
 53    comp_api = client.competition(comp.id)
 54    team = comp_api.register(mode="solo")
 55    print(team.id)
 56
 57    # 3) List the challenges scoped to this competition (curated order)
 58    challenges = comp_api.challenges()
 59    for ch in challenges:
 60        print(f"  {ch.id} [{ch.category.value}/{ch.difficulty}] {ch.name}")
 61
 62    # 4) Launch the first challenge — blocks until ready
 63    ready = client.instances.start(
 64        challenges[0].id, competition_id=comp.id
 65    )
 66    print(f"instance {ready.id}")
 67
 68    # 5) Read the attack surface (services + credentials, plus VPN for
 69    #    engagement-mode challenges)
 70    for svc in ready.surface.services:
 71        print(f"  [{svc.service_type.value}] {svc.url}")
 72        if svc.credentials:
 73            print(f"    {svc.credentials.username}/{svc.credentials.password}")
 74    if ready.surface.vpn:
 75        print(f"  VPN: {ready.surface.vpn.host}:{ready.surface.vpn.port}")
 76
 77    # 6) Submit a captured flag (oracle-probe via
 78    #    ``client.submissions.verify()`` first if you don't want to burn
 79    #    an audit row)
 80    result = client.submissions.submit(ready.id, "FLAG{your_flag_here}")
 81    print(f"correct={result.correct} solved={result.challenge_fully_solved}")
 82
 83    # 7) Tear down (or let TTL expire — default 24h, admin-tunable)
 84    client.instances.stop(ready.id)
 85
 86A runnable, interactive copy of this script lives at
 87``examples/quickstart.py``.
 88
 89Player surface
 90--------------
 91
 92Global / account namespaces on :class:`~ctfy.sdk.client.PlatformClient`:
 93
 94- ``client.teams`` / ``client.users`` — public team + user profile reads.
 95- ``client.me`` — the caller's profile, inbox, account export/delete,
 96  achievements, solve + milestone progress.
 97- ``client.auth`` — password register/login, OAuth identities, API token
 98  CRUD, sign-in discovery.
 99- ``client.competitions`` — discover competitions (``list`` / ``get``).
100- ``client.challenges`` — global catalog, attachments, facets, feedback chips.
101- ``client.instances`` — ``start`` (blocks until ready), ``status``, ``stop``,
102  ``renew``, ``list`` (by instance id; the competition is inferred).
103- ``client.submissions`` — ``submit`` (graded), ``verify`` (oracle probe,
104  no record), ``list``, and QA challenges.
105- ``client.scoreboard`` — global standings, per-challenge stats, snapshots.
106- ``client.achievements`` / ``client.activities`` / ``client.nodes`` — public
107  badge catalog, activity log, worker-node list.
108
109Per-competition scope — ``client.competition(id)`` returns a
110:class:`~ctfy.sdk.competition.Competition` handle for the operations that are
111*inherently* competition-scoped, so ``competition_id`` is named once:
112
113- ``comp.register(mode="solo" | "create" | "join", …)`` — get on a team.
114- ``comp.team`` — your team: ``rename`` / ``leave`` / ``kick``, plus
115  ``comp.team.invites`` (codes + email invites) and ``comp.team.requests``
116  (approve / reject join requests).
117- ``comp.challenges()`` / ``comp.search_teams()`` and the standings
118  (``comp.scoreboard()`` / ``score_history`` / ``score_distribution`` /
119  ``solve_matrix`` / ``challenge_breakdown``).
120
121Top-level convenience methods stay flat: ``client.health()``,
122``client.get_meta()``, ``client.cluster_info()``, ``client.events()``,
123``client.check_server_compatibility()``.
124
125Admin surface
126-------------
127
128``client.admin`` (an :class:`~ctfy.sdk.admin.AdminClient`) carries the
129operator endpoints, grouped the same way: ``admin.users``,
130``admin.competitions``, ``admin.announcements``, ``admin.achievements``,
131``admin.challenges``, ``admin.instances``, ``admin.records`` (instance
132forensics), ``admin.nodes``, ``admin.observability``, ``admin.settings``,
133``admin.competition_admins``. Operator tooling without a player client can use
134``AdminClient.connect(url, token=...)``.
135
136Realtime
137~~~~~~~~
138
139:meth:`PlatformClient.events` yields an auto-reconnecting
140:class:`~ctfy.sdk.events.EventStream` over the SSE endpoint —
141solve broadcasts, instance state changes, scoreboard refreshes.
142
143See also
144--------
145
146- :class:`PlatformClient` — the player client; every namespace method has
147  full docstrings.
148- :class:`AdminClient` — the operator client.
149- ``ctfy.server.models`` — the typed response shapes
150  (``ChallengeInfo``, ``InstanceInfo``, ``SubmissionResponse``, …)
151  that SDK methods return. Re-using the server's Pydantic models means
152  the wire shape can never drift between the two sides.
153- ``<platform>/docs`` — auto-generated FastAPI Swagger UI for the
154  raw REST API (language-agnostic).
155- ``<platform>/api/v1/mcp/`` — streamable-HTTP MCP endpoint with
156  the same operations as MCP tools, if the agent speaks MCP
157  instead of Python.
158- :class:`~ctfy.sdk.node_client.NodeClient` — separate client for
159  worker-node ↔ platform RPCs; not what agent harnesses want.
160"""
161
162from ctfy.sdk.admin import AdminClient
163from ctfy.sdk.client import PlatformClient
164from ctfy.sdk.node_client import NodeClient
165
166__all__ = ["AdminClient", "NodeClient", "PlatformClient"]
class AdminClient:
 55class AdminClient:
 56    """Admin / operator surface, grouped into resource namespaces.
 57
 58    Namespaces: :attr:`users`, :attr:`competitions`, :attr:`announcements`,
 59    :attr:`achievements`, :attr:`challenges`, :attr:`instances`,
 60    :attr:`records`, :attr:`nodes`, :attr:`observability`, :attr:`settings`,
 61    :attr:`competition_admins`.
 62    """
 63
 64    def __init__(self, http: BaseHttpClient) -> None:
 65        #: Shared transport (the parent PlatformClient when reached via
 66        #: ``client.admin``, or an owned client when built via ``connect``).
 67        self._http = http
 68
 69    @classmethod
 70    def connect(
 71        cls,
 72        server_url: str,
 73        token: str = "",
 74        *,
 75        max_retries: int = 3,
 76        timeout: int = DEFAULT_CLIENT_TIMEOUT,
 77    ) -> AdminClient:
 78        """Build an ``AdminClient`` that owns its own transport.
 79
 80        For operator tooling that doesn't already hold a
 81        :class:`PlatformClient`. The returned client is a context manager
 82        that closes its transport on exit.
 83        """
 84        base = server_url.rstrip("/")
 85        http = BaseHttpClient(f"{base}/api/v1", token, timeout=timeout, max_retries=max_retries)
 86        return cls(http)
 87
 88    @cached_property
 89    def users(self) -> AdminUsersResource:
 90        return AdminUsersResource(self._http)
 91
 92    @cached_property
 93    def competitions(self) -> AdminCompetitionsResource:
 94        return AdminCompetitionsResource(self._http)
 95
 96    @cached_property
 97    def announcements(self) -> AdminAnnouncementsResource:
 98        return AdminAnnouncementsResource(self._http)
 99
100    @cached_property
101    def achievements(self) -> AdminAchievementsResource:
102        return AdminAchievementsResource(self._http)
103
104    @cached_property
105    def challenges(self) -> AdminChallengesResource:
106        return AdminChallengesResource(self._http)
107
108    @cached_property
109    def instances(self) -> AdminInstancesResource:
110        return AdminInstancesResource(self._http)
111
112    @cached_property
113    def records(self) -> AdminRecordsResource:
114        return AdminRecordsResource(self._http)
115
116    @cached_property
117    def nodes(self) -> AdminNodesResource:
118        return AdminNodesResource(self._http)
119
120    @cached_property
121    def observability(self) -> AdminObservabilityResource:
122        return AdminObservabilityResource(self._http)
123
124    @cached_property
125    def settings(self) -> AdminSettingsResource:
126        return AdminSettingsResource(self._http)
127
128    @cached_property
129    def competition_admins(self) -> AdminCompetitionAdminsResource:
130        return AdminCompetitionAdminsResource(self._http)
131
132    def close(self) -> None:
133        """Close the underlying transport.
134
135        Only call this on a standalone client built via :meth:`connect`;
136        when reached via ``PlatformClient.admin`` the transport is shared
137        with — and closed by — the parent player client.
138        """
139        self._http.close()
140
141    def __enter__(self) -> Self:
142        return self
143
144    def __exit__(
145        self,
146        exc_type: type[BaseException] | None,
147        exc: BaseException | None,
148        tb: TracebackType | None,
149    ) -> None:
150        self.close()

Admin / operator surface, grouped into resource namespaces.

Namespaces: users, competitions, announcements, achievements, challenges, instances, records, nodes, observability, settings, competition_admins.

AdminClient(http: ctfy.sdk.base.BaseHttpClient)
64    def __init__(self, http: BaseHttpClient) -> None:
65        #: Shared transport (the parent PlatformClient when reached via
66        #: ``client.admin``, or an owned client when built via ``connect``).
67        self._http = http
@classmethod
def connect( cls, server_url: str, token: str = '', *, max_retries: int = 3, timeout: int = 600) -> AdminClient:
69    @classmethod
70    def connect(
71        cls,
72        server_url: str,
73        token: str = "",
74        *,
75        max_retries: int = 3,
76        timeout: int = DEFAULT_CLIENT_TIMEOUT,
77    ) -> AdminClient:
78        """Build an ``AdminClient`` that owns its own transport.
79
80        For operator tooling that doesn't already hold a
81        :class:`PlatformClient`. The returned client is a context manager
82        that closes its transport on exit.
83        """
84        base = server_url.rstrip("/")
85        http = BaseHttpClient(f"{base}/api/v1", token, timeout=timeout, max_retries=max_retries)
86        return cls(http)

Build an AdminClient that owns its own transport.

For operator tooling that doesn't already hold a PlatformClient. The returned client is a context manager that closes its transport on exit.

88    @cached_property
89    def users(self) -> AdminUsersResource:
90        return AdminUsersResource(self._http)
92    @cached_property
93    def competitions(self) -> AdminCompetitionsResource:
94        return AdminCompetitionsResource(self._http)
96    @cached_property
97    def announcements(self) -> AdminAnnouncementsResource:
98        return AdminAnnouncementsResource(self._http)
100    @cached_property
101    def achievements(self) -> AdminAchievementsResource:
102        return AdminAchievementsResource(self._http)
104    @cached_property
105    def challenges(self) -> AdminChallengesResource:
106        return AdminChallengesResource(self._http)
108    @cached_property
109    def instances(self) -> AdminInstancesResource:
110        return AdminInstancesResource(self._http)
112    @cached_property
113    def records(self) -> AdminRecordsResource:
114        return AdminRecordsResource(self._http)
116    @cached_property
117    def nodes(self) -> AdminNodesResource:
118        return AdminNodesResource(self._http)
120    @cached_property
121    def observability(self) -> AdminObservabilityResource:
122        return AdminObservabilityResource(self._http)
124    @cached_property
125    def settings(self) -> AdminSettingsResource:
126        return AdminSettingsResource(self._http)
128    @cached_property
129    def competition_admins(self) -> AdminCompetitionAdminsResource:
130        return AdminCompetitionAdminsResource(self._http)
def close(self) -> None:
132    def close(self) -> None:
133        """Close the underlying transport.
134
135        Only call this on a standalone client built via :meth:`connect`;
136        when reached via ``PlatformClient.admin`` the transport is shared
137        with — and closed by — the parent player client.
138        """
139        self._http.close()

Close the underlying transport.

Only call this on a standalone client built via connect(); when reached via PlatformClient.admin the transport is shared with — and closed by — the parent player client.

class NodeClient(ctfy.sdk.base.BaseHttpClient):
 48class NodeClient(BaseHttpClient):
 49    """Thin sync HTTP client; one instance per node URL."""
 50
 51    def __init__(
 52        self,
 53        node_url: str,
 54        token: str,
 55        *,
 56        timeout: int = DEFAULT_CLIENT_TIMEOUT,
 57    ) -> None:
 58        super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout)
 59
 60    # -- lifecycle ----------------------------------------------------------
 61
 62    def start_instance(
 63        self,
 64        *,
 65        challenge_id: str,
 66        instance_id: str,
 67        ttl: int,
 68        answers: dict[str, str],
 69        proxy_output_dir: str | None = None,
 70    ) -> dict[str, Any]:
 71        resp = self.request(
 72            "POST",
 73            "/instances",
 74            json={
 75                "challenge_id": challenge_id,
 76                "instance_id": instance_id,
 77                "ttl": ttl,
 78                "answers": answers,
 79                "proxy_output_dir": proxy_output_dir,
 80            },
 81        )
 82        _raise_for_node_status(resp)
 83        body: dict[str, Any] = resp.json()
 84        return body
 85
 86    def stop_instance(self, instance_id: str) -> None:
 87        resp = self.request("DELETE", f"/instances/{instance_id}")
 88        resp.raise_for_status()
 89
 90    def stop_all(self) -> None:
 91        resp = self.request("POST", "/admin/stop-all")
 92        resp.raise_for_status()
 93
 94    def rescan_challenges(self) -> dict[str, Any]:
 95        """Tell the node to drop its spec cache and re-scan challenges_dir.
 96
 97        Returns ``{total, added, removed}`` so the platform can report
 98        the per-node outcome of a cluster-wide rescan."""
 99        resp = self.request("POST", "/admin/rescan-challenges")
100        resp.raise_for_status()
101        body: dict[str, Any] = resp.json()
102        return body
103
104    # -- admin pre-build (image cache warming) ------------------------------
105
106    def build_challenge(self, challenge_id: str) -> dict[str, Any]:
107        """Ask the node to pre-build images for *challenge_id*.
108
109        Fires-and-returns: the node persists ``status="building"`` and
110        spawns a daemon thread; the body returned here is that initial
111        state row. Polling :meth:`get_build_state` is how the platform
112        learns when it lands on ``built`` / ``failed``.
113        """
114        resp = self.request("POST", f"/admin/challenges/{challenge_id}/build")
115        _raise_for_node_status(resp)
116        body: dict[str, Any] = resp.json()
117        return body
118
119    def build_all_challenges(self) -> dict[str, Any]:
120        """Queue background pre-build for every spec the node knows about.
121
122        Returns ``{queued, skipped_built, skipped_in_progress}`` so the
123        platform can report per-node what got picked up. Sequential
124        on the node side — no fan-out across the corpus.
125        """
126        resp = self.request("POST", "/admin/challenges/build-all")
127        _raise_for_node_status(resp)
128        body: dict[str, Any] = resp.json()
129        return body
130
131    def get_build_state(self) -> dict[str, Any]:
132        """Fetch every per-challenge build-state row on this node.
133
134        Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``.
135        ``status`` is one of ``unbuilt`` / ``building`` / ``built`` /
136        ``failed``; ``unbuilt`` placeholders are synthesised for specs
137        the node has seen but never been asked to build.
138        """
139        resp = self.request("GET", "/admin/challenges/build-state")
140        _raise_for_node_status(resp)
141        body: dict[str, Any] = resp.json()
142        return body
143
144    # -- status / health ----------------------------------------------------
145
146    def get_status(self, instance_id: str) -> dict[str, Any]:
147        resp = self.request("GET", f"/instances/{instance_id}/status")
148        resp.raise_for_status()
149        body: dict[str, Any] = resp.json()
150        return body
151
152    def check_health(self, instance_id: str) -> bool:
153        resp = self.request("GET", f"/instances/{instance_id}/health")
154        resp.raise_for_status()
155        return bool(resp.json().get("is_healthy"))
156
157    def node_health(self) -> dict[str, Any]:
158        """Liveness + ``{running, capacity}`` for heartbeat."""
159        resp = self.request("GET", "/health")
160        resp.raise_for_status()
161        body: dict[str, Any] = resp.json()
162        return body
163
164    # -- traffic / runtime --------------------------------------------------
165
166    def get_traffic(self, instance_id: str) -> dict[str, Any]:
167        """Fetch mitmproxy flow data; file lives on the node's FS."""
168        resp = self.request("GET", f"/instances/{instance_id}/traffic")
169        resp.raise_for_status()
170        body: dict[str, Any] = resp.json()
171        return body
172
173    def list_containers(self, instance_id: str) -> list[dict[str, Any]]:
174        """Enumerate every container in an instance's compose project.
175
176        Backs the admin-shell feature: the platform forwards a
177        ``GET /admin/instances/{id}/containers`` to the assigned node,
178        which returns one row per container (challenge services and
179        platform-injected sidecars alike). The WebSocket reverse-proxy
180        is opened separately and does not go through ``NodeClient``.
181        """
182        resp = self.request("GET", f"/instances/{instance_id}/containers")
183        _raise_for_node_status(resp)
184        body: list[dict[str, Any]] = resp.json()
185        return body
186
187    def get_container_logs(self, instance_id: str) -> str:
188        """Fetch combined container stdout/stderr for archival."""
189        resp = self.request("GET", f"/instances/{instance_id}/container-logs")
190        resp.raise_for_status()
191        return str(resp.json().get("logs") or "")
192
193    def get_pcap(self, instance_id: str) -> bytes:
194        """Fetch the raw tcpdump capture for *instance_id*.
195
196        Returns an empty ``bytes`` when no capture exists (sidecar
197        disabled, instance never reached the running state, etc.) so
198        callers can persist conditionally without try/except.
199        """
200        resp = self.request("GET", f"/instances/{instance_id}/pcap")
201        if resp.status_code == 404:
202            return b""
203        resp.raise_for_status()
204        return resp.content
205
206    def get_agent_runtime(self, instance_id: str) -> dict[str, Any]:
207        """Provider-agnostic runtime hints for attaching an agent (proxy URL,
208        CA PEM, optional Docker-specific names). Replaces the old
209        sandbox-network shape."""
210        resp = self.request("GET", f"/instances/{instance_id}/agent-runtime")
211        resp.raise_for_status()
212        body: dict[str, Any] = resp.json()
213        return body
214
215    # -- per-instance rendered attachments ---------------------------------
216
217    def list_instance_attachments(self, instance_id: str) -> dict[str, Any]:
218        """List per-instance attachments rendered into the node's workdir.
219
220        Returns the raw JSON dict — caller projects through
221        :class:`AttachmentList` for typing. Used by the platform's
222        per-instance attachment endpoint to surface team-specific
223        rendered file listings."""
224        resp = self.request("GET", f"/instances/{instance_id}/attachments")
225        resp.raise_for_status()
226        body: dict[str, Any] = resp.json()
227        return body
228
229    def get_openvpn_config(self, instance_id: str) -> bytes:
230        """Fetch the per-instance OpenVPN client config from the node.
231
232        The node reads ``/etc/openvpn/client.ovpn`` out of the instance's
233        ``openvpn`` service container (generated on first boot by the
234        ``ctfy/openvpn-base`` image's bootstrap). Returns the body
235        verbatim — the platform-side route adds the
236        ``Content-Disposition`` header before re-emitting to the player.
237
238        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
239        proxy route catches 404 and re-emits to the player, anything
240        else is treated as a 502.
241        """
242        resp = self.request("GET", f"/instances/{instance_id}/openvpn-config")
243        resp.raise_for_status()
244        return resp.content
245
246    def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
247        """Download one per-instance attachment as ``(bytes, content_type)``.
248
249        Buffers the whole body — attachments are bounded (typical case
250        is a 10 KB binary or a small text file). For huge captures the
251        caller should hand the player a CDN URL instead.
252
253        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
254        proxy route catches 404 and re-emits to the player, anything
255        else is a 502."""
256        resp = self.request(
257            "GET",
258            f"/instances/{instance_id}/attachments/{filename}",
259        )
260        resp.raise_for_status()
261        return resp.content, resp.headers.get("content-type", "application/octet-stream")

Thin sync HTTP client; one instance per node URL.

NodeClient(node_url: str, token: str, *, timeout: int = 600)
51    def __init__(
52        self,
53        node_url: str,
54        token: str,
55        *,
56        timeout: int = DEFAULT_CLIENT_TIMEOUT,
57    ) -> None:
58        super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout)
def start_instance( self, *, challenge_id: str, instance_id: str, ttl: int, answers: dict[str, str], proxy_output_dir: str | None = None) -> dict[str, typing.Any]:
62    def start_instance(
63        self,
64        *,
65        challenge_id: str,
66        instance_id: str,
67        ttl: int,
68        answers: dict[str, str],
69        proxy_output_dir: str | None = None,
70    ) -> dict[str, Any]:
71        resp = self.request(
72            "POST",
73            "/instances",
74            json={
75                "challenge_id": challenge_id,
76                "instance_id": instance_id,
77                "ttl": ttl,
78                "answers": answers,
79                "proxy_output_dir": proxy_output_dir,
80            },
81        )
82        _raise_for_node_status(resp)
83        body: dict[str, Any] = resp.json()
84        return body
def stop_instance(self, instance_id: str) -> None:
86    def stop_instance(self, instance_id: str) -> None:
87        resp = self.request("DELETE", f"/instances/{instance_id}")
88        resp.raise_for_status()
def stop_all(self) -> None:
90    def stop_all(self) -> None:
91        resp = self.request("POST", "/admin/stop-all")
92        resp.raise_for_status()
def rescan_challenges(self) -> dict[str, typing.Any]:
 94    def rescan_challenges(self) -> dict[str, Any]:
 95        """Tell the node to drop its spec cache and re-scan challenges_dir.
 96
 97        Returns ``{total, added, removed}`` so the platform can report
 98        the per-node outcome of a cluster-wide rescan."""
 99        resp = self.request("POST", "/admin/rescan-challenges")
100        resp.raise_for_status()
101        body: dict[str, Any] = resp.json()
102        return body

Tell the node to drop its spec cache and re-scan challenges_dir.

Returns {total, added, removed} so the platform can report the per-node outcome of a cluster-wide rescan.

def build_challenge(self, challenge_id: str) -> dict[str, typing.Any]:
106    def build_challenge(self, challenge_id: str) -> dict[str, Any]:
107        """Ask the node to pre-build images for *challenge_id*.
108
109        Fires-and-returns: the node persists ``status="building"`` and
110        spawns a daemon thread; the body returned here is that initial
111        state row. Polling :meth:`get_build_state` is how the platform
112        learns when it lands on ``built`` / ``failed``.
113        """
114        resp = self.request("POST", f"/admin/challenges/{challenge_id}/build")
115        _raise_for_node_status(resp)
116        body: dict[str, Any] = resp.json()
117        return body

Ask the node to pre-build images for challenge_id.

Fires-and-returns: the node persists status="building" and spawns a daemon thread; the body returned here is that initial state row. Polling get_build_state() is how the platform learns when it lands on built / failed.

def build_all_challenges(self) -> dict[str, typing.Any]:
119    def build_all_challenges(self) -> dict[str, Any]:
120        """Queue background pre-build for every spec the node knows about.
121
122        Returns ``{queued, skipped_built, skipped_in_progress}`` so the
123        platform can report per-node what got picked up. Sequential
124        on the node side — no fan-out across the corpus.
125        """
126        resp = self.request("POST", "/admin/challenges/build-all")
127        _raise_for_node_status(resp)
128        body: dict[str, Any] = resp.json()
129        return body

Queue background pre-build for every spec the node knows about.

Returns {queued, skipped_built, skipped_in_progress} so the platform can report per-node what got picked up. Sequential on the node side — no fan-out across the corpus.

def get_build_state(self) -> dict[str, typing.Any]:
131    def get_build_state(self) -> dict[str, Any]:
132        """Fetch every per-challenge build-state row on this node.
133
134        Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``.
135        ``status`` is one of ``unbuilt`` / ``building`` / ``built`` /
136        ``failed``; ``unbuilt`` placeholders are synthesised for specs
137        the node has seen but never been asked to build.
138        """
139        resp = self.request("GET", "/admin/challenges/build-state")
140        _raise_for_node_status(resp)
141        body: dict[str, Any] = resp.json()
142        return body

Fetch every per-challenge build-state row on this node.

Returns {"rows": [{challenge_id, status, built_at, error}, …]}. status is one of unbuilt / building / built / failed; unbuilt placeholders are synthesised for specs the node has seen but never been asked to build.

def get_status(self, instance_id: str) -> dict[str, typing.Any]:
146    def get_status(self, instance_id: str) -> dict[str, Any]:
147        resp = self.request("GET", f"/instances/{instance_id}/status")
148        resp.raise_for_status()
149        body: dict[str, Any] = resp.json()
150        return body
def check_health(self, instance_id: str) -> bool:
152    def check_health(self, instance_id: str) -> bool:
153        resp = self.request("GET", f"/instances/{instance_id}/health")
154        resp.raise_for_status()
155        return bool(resp.json().get("is_healthy"))
def node_health(self) -> dict[str, typing.Any]:
157    def node_health(self) -> dict[str, Any]:
158        """Liveness + ``{running, capacity}`` for heartbeat."""
159        resp = self.request("GET", "/health")
160        resp.raise_for_status()
161        body: dict[str, Any] = resp.json()
162        return body

Liveness + {running, capacity} for heartbeat.

def get_traffic(self, instance_id: str) -> dict[str, typing.Any]:
166    def get_traffic(self, instance_id: str) -> dict[str, Any]:
167        """Fetch mitmproxy flow data; file lives on the node's FS."""
168        resp = self.request("GET", f"/instances/{instance_id}/traffic")
169        resp.raise_for_status()
170        body: dict[str, Any] = resp.json()
171        return body

Fetch mitmproxy flow data; file lives on the node's FS.

def list_containers(self, instance_id: str) -> list[dict[str, typing.Any]]:
173    def list_containers(self, instance_id: str) -> list[dict[str, Any]]:
174        """Enumerate every container in an instance's compose project.
175
176        Backs the admin-shell feature: the platform forwards a
177        ``GET /admin/instances/{id}/containers`` to the assigned node,
178        which returns one row per container (challenge services and
179        platform-injected sidecars alike). The WebSocket reverse-proxy
180        is opened separately and does not go through ``NodeClient``.
181        """
182        resp = self.request("GET", f"/instances/{instance_id}/containers")
183        _raise_for_node_status(resp)
184        body: list[dict[str, Any]] = resp.json()
185        return body

Enumerate every container in an instance's compose project.

Backs the admin-shell feature: the platform forwards a GET /admin/instances/{id}/containers to the assigned node, which returns one row per container (challenge services and platform-injected sidecars alike). The WebSocket reverse-proxy is opened separately and does not go through NodeClient.

def get_container_logs(self, instance_id: str) -> str:
187    def get_container_logs(self, instance_id: str) -> str:
188        """Fetch combined container stdout/stderr for archival."""
189        resp = self.request("GET", f"/instances/{instance_id}/container-logs")
190        resp.raise_for_status()
191        return str(resp.json().get("logs") or "")

Fetch combined container stdout/stderr for archival.

def get_pcap(self, instance_id: str) -> bytes:
193    def get_pcap(self, instance_id: str) -> bytes:
194        """Fetch the raw tcpdump capture for *instance_id*.
195
196        Returns an empty ``bytes`` when no capture exists (sidecar
197        disabled, instance never reached the running state, etc.) so
198        callers can persist conditionally without try/except.
199        """
200        resp = self.request("GET", f"/instances/{instance_id}/pcap")
201        if resp.status_code == 404:
202            return b""
203        resp.raise_for_status()
204        return resp.content

Fetch the raw tcpdump capture for instance_id.

Returns an empty bytes when no capture exists (sidecar disabled, instance never reached the running state, etc.) so callers can persist conditionally without try/except.

def get_agent_runtime(self, instance_id: str) -> dict[str, typing.Any]:
206    def get_agent_runtime(self, instance_id: str) -> dict[str, Any]:
207        """Provider-agnostic runtime hints for attaching an agent (proxy URL,
208        CA PEM, optional Docker-specific names). Replaces the old
209        sandbox-network shape."""
210        resp = self.request("GET", f"/instances/{instance_id}/agent-runtime")
211        resp.raise_for_status()
212        body: dict[str, Any] = resp.json()
213        return body

Provider-agnostic runtime hints for attaching an agent (proxy URL, CA PEM, optional Docker-specific names). Replaces the old sandbox-network shape.

def list_instance_attachments(self, instance_id: str) -> dict[str, typing.Any]:
217    def list_instance_attachments(self, instance_id: str) -> dict[str, Any]:
218        """List per-instance attachments rendered into the node's workdir.
219
220        Returns the raw JSON dict — caller projects through
221        :class:`AttachmentList` for typing. Used by the platform's
222        per-instance attachment endpoint to surface team-specific
223        rendered file listings."""
224        resp = self.request("GET", f"/instances/{instance_id}/attachments")
225        resp.raise_for_status()
226        body: dict[str, Any] = resp.json()
227        return body

List per-instance attachments rendered into the node's workdir.

Returns the raw JSON dict — caller projects through AttachmentList for typing. Used by the platform's per-instance attachment endpoint to surface team-specific rendered file listings.

def get_openvpn_config(self, instance_id: str) -> bytes:
229    def get_openvpn_config(self, instance_id: str) -> bytes:
230        """Fetch the per-instance OpenVPN client config from the node.
231
232        The node reads ``/etc/openvpn/client.ovpn`` out of the instance's
233        ``openvpn`` service container (generated on first boot by the
234        ``ctfy/openvpn-base`` image's bootstrap). Returns the body
235        verbatim — the platform-side route adds the
236        ``Content-Disposition`` header before re-emitting to the player.
237
238        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
239        proxy route catches 404 and re-emits to the player, anything
240        else is treated as a 502.
241        """
242        resp = self.request("GET", f"/instances/{instance_id}/openvpn-config")
243        resp.raise_for_status()
244        return resp.content

Fetch the per-instance OpenVPN client config from the node.

The node reads /etc/openvpn/client.ovpn out of the instance's openvpn service container (generated on first boot by the ctfy/openvpn-base image's bootstrap). Returns the body verbatim — the platform-side route adds the Content-Disposition header before re-emitting to the player.

Raises httpx.HTTPStatusError on non-2xx — the platform's proxy route catches 404 and re-emits to the player, anything else is treated as a 502.

def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
246    def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
247        """Download one per-instance attachment as ``(bytes, content_type)``.
248
249        Buffers the whole body — attachments are bounded (typical case
250        is a 10 KB binary or a small text file). For huge captures the
251        caller should hand the player a CDN URL instead.
252
253        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
254        proxy route catches 404 and re-emits to the player, anything
255        else is a 502."""
256        resp = self.request(
257            "GET",
258            f"/instances/{instance_id}/attachments/{filename}",
259        )
260        resp.raise_for_status()
261        return resp.content, resp.headers.get("content-type", "application/octet-stream")

Download one per-instance attachment as (bytes, content_type).

Buffers the whole body — attachments are bounded (typical case is a 10 KB binary or a small text file). For huge captures the caller should hand the player a CDN URL instead.

Raises httpx.HTTPStatusError on non-2xx — the platform's proxy route catches 404 and re-emits to the player, anything else is a 502.

class PlatformClient(ctfy.sdk.base.BaseHttpClient):
 77class PlatformClient(BaseHttpClient):
 78    """Player-facing HTTP client for the ctfy platform.
 79
 80    Methods are grouped into resource namespaces (``client.teams.list()``, …)
 81    plus a per-competition scope via ``client.competition(id)``. The admin
 82    surface is a separate :class:`~ctfy.sdk.admin.AdminClient` reached via
 83    :attr:`admin`. Used by the CLI, the SDK, the MCP server, and external callers.
 84    """
 85
 86    def __init__(
 87        self,
 88        server_url: str,
 89        token: str = "",
 90        max_retries: int = 3,
 91        timeout: int = DEFAULT_CLIENT_TIMEOUT,
 92    ):
 93        self._url = server_url.rstrip("/")
 94        super().__init__(f"{self._url}/api/v1", token, timeout=timeout, max_retries=max_retries)
 95
 96    # -- resource namespaces ------------------------------------------------
 97
 98    @cached_property
 99    def teams(self) -> TeamsResource:
100        """Public team discovery + profile reads."""
101        return TeamsResource(self)
102
103    @cached_property
104    def users(self) -> UsersResource:
105        """Public per-user profile reads."""
106        return UsersResource(self)
107
108    @cached_property
109    def me(self) -> MeResource:
110        """The calling user's profile, inbox, account, and progress."""
111        return MeResource(self)
112
113    @cached_property
114    def auth(self) -> AuthResource:
115        """Password auth, OAuth identities, API tokens, sign-in discovery."""
116        return AuthResource(self)
117
118    @cached_property
119    def achievements(self) -> AchievementsResource:
120        """Public badge catalog, recent-unlock feed, easter eggs."""
121        return AchievementsResource(self)
122
123    @cached_property
124    def challenges(self) -> ChallengesResource:
125        """Global challenge catalog, attachments, facets, feedback chips."""
126        return ChallengesResource(self)
127
128    @cached_property
129    def competitions(self) -> CompetitionsResource:
130        """Discover competitions (``list`` / ``get``). To act within one, use
131        :meth:`competition`."""
132        return CompetitionsResource(self)
133
134    def competition(self, competition_id: str) -> Competition:
135        """Scope to one competition. The inherently competition-scoped
136        operations — register, your team (captain tooling + invites +
137        join-requests), the comp's challenges, team search, and standings —
138        hang off the returned :class:`~ctfy.sdk.competition.Competition`
139        handle, so ``competition_id`` is named once instead of on every call.
140        (Instance lifecycle and answer submission stay on :attr:`instances` /
141        :attr:`submissions`, keyed by ``instance_id``.)"""
142        return Competition(self, competition_id)
143
144    @cached_property
145    def instances(self) -> InstancesResource:
146        """Launch / control / inspect challenge instances by instance id
147        (the competition is inferred server-side)."""
148        return InstancesResource(self)
149
150    @cached_property
151    def submissions(self) -> SubmissionsResource:
152        """Submit / verify answers + list submissions (by instance id), plus
153        the instance-less QA challenges."""
154        return SubmissionsResource(self)
155
156    @cached_property
157    def scoreboard(self) -> ScoreboardResource:
158        """Global standings, per-challenge stats, persisted snapshots."""
159        return ScoreboardResource(self)
160
161    @cached_property
162    def activities(self) -> ActivitiesResource:
163        """Platform activity log + histogram."""
164        return ActivitiesResource(self)
165
166    @cached_property
167    def nodes(self) -> NodesResource:
168        """Public worker-node list (operator node mgmt is on ``admin.nodes``)."""
169        return NodesResource(self)
170
171    @cached_property
172    def admin(self) -> AdminClient:
173        """Admin / operator surface (separate, role-gated server-side).
174
175        Shares this client's transport. See :class:`~ctfy.sdk.admin.AdminClient`.
176        """
177        return AdminClient(self)
178
179    # -- top-level convenience: server status / realtime / version ----------
180
181    def health(self) -> HealthResponse:
182        resp = self.request("GET", "/health")
183        _raise_for_status(resp)
184        return HealthResponse.model_validate(resp.json())
185
186    def get_meta(self) -> MetaResponse:
187        """Server identity + challenge repo SHA + build version.
188        Admin tokens additionally see cluster capacity + team / solve
189        counts in the same payload."""
190        resp = self.request("GET", "/meta")
191        _raise_for_status(resp)
192        return MetaResponse.model_validate(resp.json())
193
194    def cluster_info(self) -> ClusterInfo:
195        """Aggregate worker-node capacity / utilisation (the public
196        capacity banner; no per-node detail)."""
197        resp = self.request("GET", "/cluster-info")
198        _raise_for_status(resp)
199        return ClusterInfo.model_validate(resp.json())
200
201    def check_server_compatibility(self, *, health: HealthResponse | None = None) -> VersionCheck:
202        """Compare this client's ``ctfy`` version against the server's.
203
204        Uses the cheap unauthenticated ``/health`` probe (pass an
205        already-fetched :class:`HealthResponse` to avoid a second round
206        trip). Pure classification — never prints, never raises on a
207        mismatch, never mutates anything. The caller (CLI, MCP, harness)
208        decides what to do with :class:`~ctfy.core.version.VersionCheck`
209        (typically: print ``.message`` to stderr when not ``.quiet``).
210
211        A server too old to report its version yields
212        :attr:`Compatibility.UNKNOWN` (``health.version == ""``), which
213        is ``.quiet`` — so this stays silent against legacy servers
214        rather than crying wolf.
215        """
216        h = health if health is not None else self.health()
217        return check_compatibility(package_version(), h.version)
218
219    def events(self, *, auto_reconnect: bool = True) -> EventStream:
220        """Open the platform SSE event stream.
221
222        Yields ``{"event": <name>, "data": <dict>}`` for each frame.
223        Filters: team-scoped events for every team the caller's user is
224        on (across every per-comp competition), plus all global events;
225        admin tokens additionally see admin-only frames.
226
227        Usage::
228
229            with client.events() as stream:
230                for event in stream:
231                    if event["event"] == "solve":
232                        ...
233
234        Auto-reconnects on transient network errors with exponential
235        backoff (1s → 30s capped). Set ``auto_reconnect=False`` to make
236        the iterator raise instead.
237        """
238        token = self._token
239
240        def _factory() -> httpx.Client:
241            # New client per session so a reconnect after a stale
242            # connection drops fresh sockets, not warmed-over ones.
243            return httpx.Client(base_url=self._base_url, timeout=None)
244
245        return EventStream(
246            client_factory=_factory,
247            path="/events",
248            token=token,
249            auto_reconnect=auto_reconnect,
250        )

Player-facing HTTP client for the ctfy platform.

Methods are grouped into resource namespaces (client.teams.list(), …) plus a per-competition scope via client.competition(id). The admin surface is a separate ~ctfy.sdk.admin.AdminClient reached via admin. Used by the CLI, the SDK, the MCP server, and external callers.

PlatformClient( server_url: str, token: str = '', max_retries: int = 3, timeout: int = 600)
86    def __init__(
87        self,
88        server_url: str,
89        token: str = "",
90        max_retries: int = 3,
91        timeout: int = DEFAULT_CLIENT_TIMEOUT,
92    ):
93        self._url = server_url.rstrip("/")
94        super().__init__(f"{self._url}/api/v1", token, timeout=timeout, max_retries=max_retries)
 98    @cached_property
 99    def teams(self) -> TeamsResource:
100        """Public team discovery + profile reads."""
101        return TeamsResource(self)

Public team discovery + profile reads.

103    @cached_property
104    def users(self) -> UsersResource:
105        """Public per-user profile reads."""
106        return UsersResource(self)

Public per-user profile reads.

108    @cached_property
109    def me(self) -> MeResource:
110        """The calling user's profile, inbox, account, and progress."""
111        return MeResource(self)

The calling user's profile, inbox, account, and progress.

113    @cached_property
114    def auth(self) -> AuthResource:
115        """Password auth, OAuth identities, API tokens, sign-in discovery."""
116        return AuthResource(self)

Password auth, OAuth identities, API tokens, sign-in discovery.

118    @cached_property
119    def achievements(self) -> AchievementsResource:
120        """Public badge catalog, recent-unlock feed, easter eggs."""
121        return AchievementsResource(self)

Public badge catalog, recent-unlock feed, easter eggs.

123    @cached_property
124    def challenges(self) -> ChallengesResource:
125        """Global challenge catalog, attachments, facets, feedback chips."""
126        return ChallengesResource(self)

Global challenge catalog, attachments, facets, feedback chips.

128    @cached_property
129    def competitions(self) -> CompetitionsResource:
130        """Discover competitions (``list`` / ``get``). To act within one, use
131        :meth:`competition`."""
132        return CompetitionsResource(self)

Discover competitions (list / get). To act within one, use competition().

def competition(self, competition_id: str) -> ctfy.sdk.competition.Competition:
134    def competition(self, competition_id: str) -> Competition:
135        """Scope to one competition. The inherently competition-scoped
136        operations — register, your team (captain tooling + invites +
137        join-requests), the comp's challenges, team search, and standings —
138        hang off the returned :class:`~ctfy.sdk.competition.Competition`
139        handle, so ``competition_id`` is named once instead of on every call.
140        (Instance lifecycle and answer submission stay on :attr:`instances` /
141        :attr:`submissions`, keyed by ``instance_id``.)"""
142        return Competition(self, competition_id)

Scope to one competition. The inherently competition-scoped operations — register, your team (captain tooling + invites + join-requests), the comp's challenges, team search, and standings — hang off the returned ~ctfy.sdk.competition.Competition handle, so competition_id is named once instead of on every call. (Instance lifecycle and answer submission stay on instances / submissions, keyed by instance_id.)

144    @cached_property
145    def instances(self) -> InstancesResource:
146        """Launch / control / inspect challenge instances by instance id
147        (the competition is inferred server-side)."""
148        return InstancesResource(self)

Launch / control / inspect challenge instances by instance id (the competition is inferred server-side).

150    @cached_property
151    def submissions(self) -> SubmissionsResource:
152        """Submit / verify answers + list submissions (by instance id), plus
153        the instance-less QA challenges."""
154        return SubmissionsResource(self)

Submit / verify answers + list submissions (by instance id), plus the instance-less QA challenges.

156    @cached_property
157    def scoreboard(self) -> ScoreboardResource:
158        """Global standings, per-challenge stats, persisted snapshots."""
159        return ScoreboardResource(self)

Global standings, per-challenge stats, persisted snapshots.

161    @cached_property
162    def activities(self) -> ActivitiesResource:
163        """Platform activity log + histogram."""
164        return ActivitiesResource(self)

Platform activity log + histogram.

166    @cached_property
167    def nodes(self) -> NodesResource:
168        """Public worker-node list (operator node mgmt is on ``admin.nodes``)."""
169        return NodesResource(self)

Public worker-node list (operator node mgmt is on admin.nodes).

admin: AdminClient
171    @cached_property
172    def admin(self) -> AdminClient:
173        """Admin / operator surface (separate, role-gated server-side).
174
175        Shares this client's transport. See :class:`~ctfy.sdk.admin.AdminClient`.
176        """
177        return AdminClient(self)

Admin / operator surface (separate, role-gated server-side).

Shares this client's transport. See ~ctfy.sdk.admin.AdminClient.

def health(self) -> ctfy.server.models.HealthResponse:
181    def health(self) -> HealthResponse:
182        resp = self.request("GET", "/health")
183        _raise_for_status(resp)
184        return HealthResponse.model_validate(resp.json())
def get_meta(self) -> ctfy.server.models.MetaResponse:
186    def get_meta(self) -> MetaResponse:
187        """Server identity + challenge repo SHA + build version.
188        Admin tokens additionally see cluster capacity + team / solve
189        counts in the same payload."""
190        resp = self.request("GET", "/meta")
191        _raise_for_status(resp)
192        return MetaResponse.model_validate(resp.json())

Server identity + challenge repo SHA + build version. Admin tokens additionally see cluster capacity + team / solve counts in the same payload.

def cluster_info(self) -> ctfy.server.models.ClusterInfo:
194    def cluster_info(self) -> ClusterInfo:
195        """Aggregate worker-node capacity / utilisation (the public
196        capacity banner; no per-node detail)."""
197        resp = self.request("GET", "/cluster-info")
198        _raise_for_status(resp)
199        return ClusterInfo.model_validate(resp.json())

Aggregate worker-node capacity / utilisation (the public capacity banner; no per-node detail).

def check_server_compatibility( self, *, health: ctfy.server.models.HealthResponse | None = None) -> ctfy.core.version.VersionCheck:
201    def check_server_compatibility(self, *, health: HealthResponse | None = None) -> VersionCheck:
202        """Compare this client's ``ctfy`` version against the server's.
203
204        Uses the cheap unauthenticated ``/health`` probe (pass an
205        already-fetched :class:`HealthResponse` to avoid a second round
206        trip). Pure classification — never prints, never raises on a
207        mismatch, never mutates anything. The caller (CLI, MCP, harness)
208        decides what to do with :class:`~ctfy.core.version.VersionCheck`
209        (typically: print ``.message`` to stderr when not ``.quiet``).
210
211        A server too old to report its version yields
212        :attr:`Compatibility.UNKNOWN` (``health.version == ""``), which
213        is ``.quiet`` — so this stays silent against legacy servers
214        rather than crying wolf.
215        """
216        h = health if health is not None else self.health()
217        return check_compatibility(package_version(), h.version)

Compare this client's ctfy version against the server's.

Uses the cheap unauthenticated /health probe (pass an already-fetched HealthResponse to avoid a second round trip). Pure classification — never prints, never raises on a mismatch, never mutates anything. The caller (CLI, MCP, harness) decides what to do with ~ctfy.core.version.VersionCheck (typically: print .message to stderr when not .quiet).

A server too old to report its version yields Compatibility.UNKNOWN (health.version == ""), which is .quiet — so this stays silent against legacy servers rather than crying wolf.

def events(self, *, auto_reconnect: bool = True) -> ctfy.sdk.events.EventStream:
219    def events(self, *, auto_reconnect: bool = True) -> EventStream:
220        """Open the platform SSE event stream.
221
222        Yields ``{"event": <name>, "data": <dict>}`` for each frame.
223        Filters: team-scoped events for every team the caller's user is
224        on (across every per-comp competition), plus all global events;
225        admin tokens additionally see admin-only frames.
226
227        Usage::
228
229            with client.events() as stream:
230                for event in stream:
231                    if event["event"] == "solve":
232                        ...
233
234        Auto-reconnects on transient network errors with exponential
235        backoff (1s → 30s capped). Set ``auto_reconnect=False`` to make
236        the iterator raise instead.
237        """
238        token = self._token
239
240        def _factory() -> httpx.Client:
241            # New client per session so a reconnect after a stale
242            # connection drops fresh sockets, not warmed-over ones.
243            return httpx.Client(base_url=self._base_url, timeout=None)
244
245        return EventStream(
246            client_factory=_factory,
247            path="/events",
248            token=token,
249            auto_reconnect=auto_reconnect,
250        )

Open the platform SSE event stream.

Yields {"event": <name>, "data": <dict>} for each frame. Filters: team-scoped events for every team the caller's user is on (across every per-comp competition), plus all global events; admin tokens additionally see admin-only frames.

Usage::

with client.events() as stream:
    for event in stream:
        if event["event"] == "solve":
            ...

Auto-reconnects on transient network errors with exponential backoff (1s → 30s capped). Set auto_reconnect=False to make the iterator raise instead.