ctfy.sdk
Python SDK for the ctfy platform.
Drives the platform's /api/v1/* REST API from Python with typed Pydantic
returns and auto-retry on transient errors. Two clients, split by audience —
the same line the ctfy / ctfy-admin CLI binaries draw:
- ctfy.sdk.client.PlatformClient — the player client. This is what agents and players want. Methods are grouped into resource namespaces (client.teams, client.instances, …) plus a per-competition handle via client.competition(id).
- ctfy.sdk.admin.AdminClient — the operator client (/api/v1/admin/* and fleet management). Reached via client.admin or AdminClient.connect(). Kept separate so the player reference never surfaces admin endpoints they can't call.
Install
The full ctfy package pulls in the server too. For an agent /
harness that only needs the client, use the client extra::
    pip install "ctfy[client]"
Authenticate
Sign in to the platform in the browser, then go to Settings → API
tokens and mint a fine-grained token (prefix pf_). These tokens
launch instances and submit answers on the team's behalf but cannot
mint more tokens, re-link OAuth providers, or change team
membership — so a leaked token never costs you the account.
Export it as CTFY_TOKEN for the CLI / MCP server, or pass it
directly to PlatformClient.
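A minimal sketch of the hand-off, assuming the token has been exported as CTFY_TOKEN as described above. The helper name `load_token` is hypothetical and not part of the SDK; the `pf_` check only mirrors the prefix mentioned in this section.

```python
import os


def load_token(env_var: str = "CTFY_TOKEN") -> str:
    """Read the API token from the environment (hypothetical helper, not SDK API)."""
    token = os.environ.get(env_var, "").strip()
    if not token:
        raise RuntimeError(f"{env_var} is not set; mint a token under Settings → API tokens")
    if not token.startswith("pf_"):
        # Fine-grained tokens carry the pf_ prefix; anything else is likely a paste error.
        raise RuntimeError(f"{env_var} does not look like a fine-grained token (expected pf_ prefix)")
    return token


# Usage, assuming the SDK is installed:
#   from ctfy.sdk import PlatformClient
#   client = PlatformClient("https://ctfy.authu.online", token=load_token())
```

Keeping the token in the environment rather than in source means the same script works unchanged for the CLI, the MCP server, and direct SDK use.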
Quick start
End-to-end: discover a competition, register, list its challenges, launch one, read the attack surface, submit a captured flag::
    from ctfy.sdk import PlatformClient

    client = PlatformClient("https://ctfy.authu.online", token="pf_xxx")

    # 1) Find a running competition
    comps = client.competitions.list(phase="running")
    comp = comps[0]
    print(comp.id, comp.title)

    # 2) Scope to the competition, then register Solo (skip if already on a
    #    team: inspect ``client.me.get().competition_teams`` first)
    comp_api = client.competition(comp.id)
    team = comp_api.register(mode="solo")
    print(team.id)

    # 3) List the challenges scoped to this competition (curated order)
    challenges = comp_api.challenges()
    for ch in challenges:
        print(f" {ch.id} [{ch.category.value}/{ch.difficulty}] {ch.name}")

    # 4) Launch the first challenge — blocks until ready
    ready = client.instances.start(
        challenges[0].id, competition_id=comp.id
    )
    print(f"instance {ready.id}")

    # 5) Read the attack surface (services + credentials, plus VPN for
    #    engagement-mode challenges)
    for svc in ready.surface.services:
        print(f" [{svc.service_type.value}] {svc.url}")
        if svc.credentials:
            print(f" {svc.credentials.username}/{svc.credentials.password}")
    if ready.surface.vpn:
        print(f" VPN: {ready.surface.vpn.host}:{ready.surface.vpn.port}")

    # 6) Submit a captured flag (oracle-probe via
    #    ``client.submissions.verify()`` first if you don't want to burn
    #    an audit row)
    result = client.submissions.submit(ready.id, "FLAG{your_flag_here}")
    print(f"correct={result.correct} solved={result.challenge_fully_solved}")

    # 7) Tear down (or let TTL expire — default 24h, admin-tunable)
    client.instances.stop(ready.id)
A runnable, interactive copy of this script lives at
examples/quickstart.py.
Player surface
Global / account namespaces on ctfy.sdk.client.PlatformClient:
- client.teams / client.users — public team + user profile reads.
- client.me — the caller's profile, inbox, account export/delete, achievements, solve + milestone progress.
- client.auth — password register/login, OAuth identities, API token CRUD, sign-in discovery.
- client.competitions — discover competitions (list / get).
- client.challenges — global catalog, attachments, facets, feedback chips.
- client.instances — start (blocks until ready), status, stop, renew, list (by instance id; the competition is inferred).
- client.submissions — submit (graded), verify (oracle probe, no record), list, and QA challenges.
- client.scoreboard — global standings, per-challenge stats, snapshots.
- client.achievements / client.activities / client.nodes — public badge catalog, activity log, worker-node list.
Per-competition scope — client.competition(id) returns a
ctfy.sdk.competition.Competition handle for the operations that are
inherently competition-scoped, so competition_id is named once:
- comp.register(mode="solo" | "create" | "join", …) — get on a team.
- comp.team — your team: rename / leave / kick, plus comp.team.invites (codes + email invites) and comp.team.requests (approve / reject join requests).
- comp.challenges() / comp.search_teams() and the standings (comp.scoreboard() / score_history / score_distribution / solve_matrix / challenge_breakdown).
Top-level convenience methods stay flat: client.health(),
client.get_meta(), client.cluster_info(), client.events(),
client.check_server_compatibility().
Admin surface
client.admin (a ctfy.sdk.admin.AdminClient) carries the
operator endpoints, grouped the same way: admin.users,
admin.competitions, admin.announcements, admin.achievements,
admin.challenges, admin.instances, admin.records (instance
forensics), admin.nodes, admin.observability, admin.settings,
admin.competition_admins, admin.competition_invites, admin.tasks,
admin.scheduled_jobs. Operator tooling without a player client can use
AdminClient.connect(url, token=...).
Realtime
PlatformClient.events() yields an auto-reconnecting
ctfy.sdk.events.EventStream over the SSE endpoint —
solve broadcasts, instance state changes, scoreboard refreshes.
See also
- PlatformClient — the player client; every namespace method has full docstrings.
- AdminClient — the operator client.
- ctfy.server.models — the typed response shapes (ChallengeInfo, InstanceInfo, SubmissionResponse, …) that SDK methods return. Re-using the server's Pydantic models means the wire shape can never drift between the two sides.
- <platform>/docs — auto-generated FastAPI Swagger UI for the raw REST API (language-agnostic).
- <platform>/api/v1/mcp/ — streamable-HTTP MCP endpoint with the same operations as MCP tools, if the agent speaks MCP instead of Python.
- ctfy.sdk.node_client.NodeClient — separate client for worker-node ↔ platform RPCs; not what agent harnesses want.
from ctfy.sdk.admin import AdminClient
from ctfy.sdk.client import PlatformClient
from ctfy.sdk.node_client import NodeClient

__all__ = ["AdminClient", "NodeClient", "PlatformClient"]
class AdminClient:
    """Admin / operator surface, grouped into resource namespaces.

    Namespaces: :attr:`users`, :attr:`competitions`, :attr:`announcements`,
    :attr:`achievements`, :attr:`challenges`, :attr:`instances`,
    :attr:`records`, :attr:`nodes`, :attr:`observability`, :attr:`settings`,
    :attr:`competition_admins`, :attr:`competition_invites`, :attr:`tasks`,
    :attr:`scheduled_jobs`.
    """

    def __init__(self, http: BaseHttpClient) -> None:
        #: Shared transport (the parent PlatformClient when reached via
        #: ``client.admin``, or an owned client when built via ``connect``).
        self._http = http

    @classmethod
    def connect(
        cls,
        server_url: str,
        token: str = "",
        *,
        max_retries: int = 3,
        timeout: int = DEFAULT_CLIENT_TIMEOUT,
    ) -> AdminClient:
        """Build an ``AdminClient`` that owns its own transport.

        For operator tooling that doesn't already hold a
        :class:`PlatformClient`. The returned client is a context manager
        that closes its transport on exit.
        """
        base = server_url.rstrip("/")
        http = BaseHttpClient(f"{base}/api/v1", token, timeout=timeout, max_retries=max_retries)
        return cls(http)

    @cached_property
    def users(self) -> AdminUsersResource:
        return AdminUsersResource(self._http)

    @cached_property
    def competitions(self) -> AdminCompetitionsResource:
        return AdminCompetitionsResource(self._http)

    @cached_property
    def announcements(self) -> AdminAnnouncementsResource:
        return AdminAnnouncementsResource(self._http)

    @cached_property
    def achievements(self) -> AdminAchievementsResource:
        return AdminAchievementsResource(self._http)

    @cached_property
    def challenges(self) -> AdminChallengesResource:
        return AdminChallengesResource(self._http)

    @cached_property
    def instances(self) -> AdminInstancesResource:
        return AdminInstancesResource(self._http)

    @cached_property
    def records(self) -> AdminRecordsResource:
        return AdminRecordsResource(self._http)

    @cached_property
    def nodes(self) -> AdminNodesResource:
        return AdminNodesResource(self._http)

    @cached_property
    def observability(self) -> AdminObservabilityResource:
        return AdminObservabilityResource(self._http)

    @cached_property
    def settings(self) -> AdminSettingsResource:
        return AdminSettingsResource(self._http)

    @cached_property
    def competition_admins(self) -> AdminCompetitionAdminsResource:
        return AdminCompetitionAdminsResource(self._http)

    @cached_property
    def competition_invites(self) -> AdminCompetitionInvitesResource:
        return AdminCompetitionInvitesResource(self._http)

    @cached_property
    def tasks(self) -> AdminTasksResource:
        return AdminTasksResource(self._http)

    @cached_property
    def scheduled_jobs(self) -> AdminScheduledJobsResource:
        return AdminScheduledJobsResource(self._http)

    def close(self) -> None:
        """Close the underlying transport.

        Only call this on a standalone client built via :meth:`connect`;
        when reached via ``PlatformClient.admin`` the transport is shared
        with — and closed by — the parent player client.
        """
        self._http.close()

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        self.close()
Admin / operator surface, grouped into resource namespaces.
Namespaces: users, competitions, announcements,
achievements, challenges, instances,
records, nodes, observability, settings,
competition_admins, competition_invites, tasks,
scheduled_jobs.
    @classmethod
    def connect(
        cls,
        server_url: str,
        token: str = "",
        *,
        max_retries: int = 3,
        timeout: int = DEFAULT_CLIENT_TIMEOUT,
    ) -> AdminClient:
        """Build an ``AdminClient`` that owns its own transport.

        For operator tooling that doesn't already hold a
        :class:`PlatformClient`. The returned client is a context manager
        that closes its transport on exit.
        """
        base = server_url.rstrip("/")
        http = BaseHttpClient(f"{base}/api/v1", token, timeout=timeout, max_retries=max_retries)
        return cls(http)
Build an AdminClient that owns its own transport.
For operator tooling that doesn't already hold a
PlatformClient. The returned client is a context manager
that closes its transport on exit.
    def close(self) -> None:
        """Close the underlying transport.

        Only call this on a standalone client built via :meth:`connect`;
        when reached via ``PlatformClient.admin`` the transport is shared
        with — and closed by — the parent player client.
        """
        self._http.close()
Close the underlying transport.
Only call this on a standalone client built via connect();
when reached via PlatformClient.admin the transport is shared
with — and closed by — the parent player client.
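The ownership rule can be illustrated with stand-in classes. Nothing below is SDK code: `FakeTransport`, `UsersNamespace`, and `Admin` are hypothetical miniatures of the pattern visible in the source (namespaces built lazily with `cached_property`, `close()` forwarded to the transport, context-manager exit calling `close()`).

```python
from functools import cached_property


class FakeTransport:
    """Hypothetical stand-in for the HTTP transport."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class UsersNamespace:
    """Hypothetical resource namespace that borrows the transport."""

    def __init__(self, http: FakeTransport) -> None:
        self._http = http


class Admin:
    """Miniature of the AdminClient shape: one transport, lazy namespaces."""

    def __init__(self, http: FakeTransport) -> None:
        self._http = http

    @cached_property
    def users(self) -> UsersNamespace:
        # Built on first access, then cached on the instance.
        return UsersNamespace(self._http)

    def close(self) -> None:
        self._http.close()

    def __enter__(self) -> "Admin":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()


transport = FakeTransport()
with Admin(transport) as admin:
    same_namespace = admin.users is admin.users       # cached, not rebuilt
    shares_transport = admin.users._http is transport  # namespaces borrow it
# Leaving the block closed the transport, which is only safe when this
# client owns it (the standalone ``connect`` case).
```

With the real SDK, the equivalent standalone form would be `with AdminClient.connect(url, token=...) as admin:`. When the client comes from `client.admin`, skip the `with` block and let the parent player client close the shared transport.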
class NodeClient(BaseHttpClient):
    """Thin sync HTTP client; one instance per node URL."""

    def __init__(
        self,
        node_url: str,
        token: str,
        *,
        timeout: int = DEFAULT_CLIENT_TIMEOUT,
    ) -> None:
        super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout)

    # -- lifecycle ----------------------------------------------------------

    def start_instance(
        self,
        *,
        challenge_id: str,
        instance_id: str,
        ttl: int,
        answers: dict[str, str],
        proxy_output_dir: str | None = None,
    ) -> dict[str, Any]:
        resp = self.request(
            "POST",
            "/instances",
            json={
                "challenge_id": challenge_id,
                "instance_id": instance_id,
                "ttl": ttl,
                "answers": answers,
                "proxy_output_dir": proxy_output_dir,
            },
        )
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def stop_instance(self, instance_id: str) -> None:
        resp = self.request("DELETE", f"/instances/{instance_id}")
        resp.raise_for_status()

    def stop_all(self) -> None:
        resp = self.request("POST", "/admin/stop-all")
        resp.raise_for_status()

    def rescan_challenges(self) -> dict[str, Any]:
        """Tell the node to drop its spec cache and re-scan challenges_dir.

        Returns ``{total, added, removed}`` so the platform can report
        the per-node outcome of a cluster-wide rescan."""
        resp = self.request("POST", "/admin/rescan-challenges")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    # -- admin pre-build (image cache warming) ------------------------------

    def build_challenge(self, challenge_id: str) -> dict[str, Any]:
        """Ask the node to pre-build images for *challenge_id*.

        Fires-and-returns: the node persists ``status="building"`` and
        spawns a daemon thread; the body returned here is that initial
        state row. Polling :meth:`get_build_state` is how the platform
        learns when it lands on ``built`` / ``failed``.
        """
        resp = self.request("POST", f"/admin/challenges/{challenge_id}/build")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
        """Queue background pre-build on the node.

        With ``challenge_ids=None`` the node builds every spec it knows.
        With an explicit list (the platform scoping a bulk build to a
        competition's challenge set) only those are built. Returns
        ``{queued, skipped_built, skipped_in_progress}`` so the platform
        can report per-node what got picked up. Sequential on the node
        side — no fan-out across the corpus.
        """
        kwargs: dict[str, Any] = {}
        if challenge_ids is not None:
            kwargs["json"] = {"challenge_ids": challenge_ids}
        resp = self.request("POST", "/admin/challenges/build-all", **kwargs)
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def get_build_state(self) -> dict[str, Any]:
        """Fetch every per-challenge build-state row on this node.

        Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``.
        ``status`` is one of ``unbuilt`` / ``building`` / ``built`` /
        ``failed``; ``unbuilt`` placeholders are synthesised for specs
        the node has seen but never been asked to build.
        """
        resp = self.request("GET", "/admin/challenges/build-state")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def pull_challenge(self, challenge_id: str) -> dict[str, Any]:
        """Ask the node to pre-pull registry images for *challenge_id*.

        Pull-side twin of :meth:`build_challenge`: the node persists
        ``status="pulling"`` and spawns a daemon thread; poll
        :meth:`get_pull_state` for the ``pulled`` / ``failed`` landing.
        """
        resp = self.request("POST", f"/admin/challenges/{challenge_id}/pull")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
        """Queue background pre-pull on the node.

        With ``challenge_ids=None`` the node pulls every spec it knows;
        with an explicit list only those (the platform scoping to a
        competition). Returns ``{queued, skipped_pulled,
        skipped_in_progress}``.
        """
        kwargs: dict[str, Any] = {}
        if challenge_ids is not None:
            kwargs["json"] = {"challenge_ids": challenge_ids}
        resp = self.request("POST", "/admin/challenges/pull-all", **kwargs)
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def get_pull_state(self) -> dict[str, Any]:
        """Fetch every per-challenge pull-state row on this node.

        Returns ``{"rows": [{challenge_id, status, pulled_at, error}, …]}``.
        ``status`` is one of ``unpulled`` / ``pulling`` / ``pulled`` /
        ``failed``; ``unpulled`` placeholders are synthesised for specs
        the node has seen but never been asked to pull.
        """
        resp = self.request("GET", "/admin/challenges/pull-state")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    # -- status / health ----------------------------------------------------

    def get_status(self, instance_id: str) -> dict[str, Any]:
        resp = self.request("GET", f"/instances/{instance_id}/status")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    def check_health(self, instance_id: str) -> bool:
        resp = self.request("GET", f"/instances/{instance_id}/health")
        resp.raise_for_status()
        return bool(resp.json().get("is_healthy"))

    def node_health(self) -> dict[str, Any]:
        """Liveness + ``{running, capacity}`` for heartbeat."""
        resp = self.request("GET", "/health")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    # -- traffic / runtime --------------------------------------------------

    def get_traffic(self, instance_id: str) -> dict[str, Any]:
        """Fetch mitmproxy flow data; file lives on the node's FS."""
        resp = self.request("GET", f"/instances/{instance_id}/traffic")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    def list_containers(self, instance_id: str) -> list[dict[str, Any]]:
        """Enumerate every container in an instance's compose project.

        Backs the admin-shell feature: the platform forwards a
        ``GET /admin/instances/{id}/containers`` to the assigned node,
        which returns one row per container (challenge services and
        platform-injected sidecars alike). The WebSocket reverse-proxy
        is opened separately and does not go through ``NodeClient``.
        """
        resp = self.request("GET", f"/instances/{instance_id}/containers")
        _raise_for_node_status(resp)
        body: list[dict[str, Any]] = resp.json()
        return body

    def get_container_logs(self, instance_id: str) -> str:
        """Fetch combined container stdout/stderr for archival."""
        resp = self.request("GET", f"/instances/{instance_id}/container-logs")
        resp.raise_for_status()
        return str(resp.json().get("logs") or "")

    def get_pcap(self, instance_id: str) -> bytes:
        """Fetch the raw tcpdump capture for *instance_id*.

        Returns an empty ``bytes`` when no capture exists (sidecar
        disabled, instance never reached the running state, etc.) so
        callers can persist conditionally without try/except.
        """
        resp = self.request("GET", f"/instances/{instance_id}/pcap")
        if resp.status_code == 404:
            return b""
        resp.raise_for_status()
        return resp.content

    def get_agent_runtime(self, instance_id: str) -> dict[str, Any]:
        """Provider-agnostic runtime hints for attaching an agent (proxy URL,
        CA PEM, optional Docker-specific names). Replaces the old
        sandbox-network shape."""
        resp = self.request("GET", f"/instances/{instance_id}/agent-runtime")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    # -- per-instance rendered attachments ---------------------------------

    def list_instance_attachments(self, instance_id: str) -> dict[str, Any]:
        """List per-instance attachments rendered into the node's workdir.

        Returns the raw JSON dict — caller projects through
        :class:`AttachmentList` for typing. Used by the platform's
        per-instance attachment endpoint to surface team-specific
        rendered file listings."""
        resp = self.request("GET", f"/instances/{instance_id}/attachments")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    def get_openvpn_config(self, instance_id: str) -> bytes:
        """Fetch the per-instance OpenVPN client config from the node.

        The node reads ``/etc/openvpn/client.ovpn`` out of the instance's
        ``openvpn`` service container (generated on first boot by the
        ``ctfy/openvpn-base`` image's bootstrap). Returns the body
        verbatim — the platform-side route adds the
        ``Content-Disposition`` header before re-emitting to the player.

        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
        proxy route catches 404 and re-emits to the player, anything
        else is treated as a 502.
        """
        resp = self.request("GET", f"/instances/{instance_id}/openvpn-config")
        resp.raise_for_status()
        return resp.content

    def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
        """Download one per-instance attachment as ``(bytes, content_type)``.

        Buffers the whole body — attachments are bounded (typical case
        is a 10 KB binary or a small text file). For huge captures the
        caller should hand the player a CDN URL instead.

        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
        proxy route catches 404 and re-emits to the player, anything
        else is a 502."""
        resp = self.request(
            "GET",
            f"/instances/{instance_id}/attachments/{filename}",
        )
        resp.raise_for_status()
        return resp.content, resp.headers.get("content-type", "application/octet-stream")
Thin sync HTTP client; one instance per node URL.
    def start_instance(
        self,
        *,
        challenge_id: str,
        instance_id: str,
        ttl: int,
        answers: dict[str, str],
        proxy_output_dir: str | None = None,
    ) -> dict[str, Any]:
        resp = self.request(
            "POST",
            "/instances",
            json={
                "challenge_id": challenge_id,
                "instance_id": instance_id,
                "ttl": ttl,
                "answers": answers,
                "proxy_output_dir": proxy_output_dir,
            },
        )
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body
    def rescan_challenges(self) -> dict[str, Any]:
        """Tell the node to drop its spec cache and re-scan challenges_dir.

        Returns ``{total, added, removed}`` so the platform can report
        the per-node outcome of a cluster-wide rescan."""
        resp = self.request("POST", "/admin/rescan-challenges")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body
Tell the node to drop its spec cache and re-scan challenges_dir.
Returns {total, added, removed} so the platform can report
the per-node outcome of a cluster-wide rescan.
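A cluster-wide rescan report could be assembled by summing the per-node bodies. The sketch below assumes the three values are integer counts, which the docstring implies but does not state; `aggregate_rescan` is a hypothetical helper, not part of the SDK.

```python
from typing import Any, Iterable


def aggregate_rescan(results: Iterable[dict[str, Any]]) -> dict[str, int]:
    """Sum per-node {total, added, removed} bodies into one cluster-wide tally."""
    totals = {"total": 0, "added": 0, "removed": 0}
    for body in results:
        for key in totals:
            # Assumption: each value is an integer count; missing keys count as 0.
            totals[key] += int(body.get(key, 0))
    return totals
```

In use this might look like `aggregate_rescan(node.rescan_challenges() for node in nodes)`, where `nodes` is whatever collection of `NodeClient` objects the caller holds.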
    def build_challenge(self, challenge_id: str) -> dict[str, Any]:
        """Ask the node to pre-build images for *challenge_id*.

        Fires-and-returns: the node persists ``status="building"`` and
        spawns a daemon thread; the body returned here is that initial
        state row. Polling :meth:`get_build_state` is how the platform
        learns when it lands on ``built`` / ``failed``.
        """
        resp = self.request("POST", f"/admin/challenges/{challenge_id}/build")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body
Ask the node to pre-build images for challenge_id.
Fires-and-returns: the node persists status="building" and
spawns a daemon thread; the body returned here is that initial
state row. Polling get_build_state() is how the platform
learns when it lands on built / failed.
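The fire-and-poll flow described above can be sketched as a small wait loop. `wait_for_build` is a hypothetical helper; it takes the state-fetching callable as a parameter so it works with `node.get_build_state` or with a test double, and it relies only on the `{"rows": [{challenge_id, status, …}]}` shape documented for `get_build_state`. The timeout and interval defaults are arbitrary.

```python
import time
from typing import Any, Callable


def wait_for_build(
    get_build_state: Callable[[], dict[str, Any]],
    challenge_id: str,
    *,
    timeout: float = 600.0,
    interval: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until *challenge_id* lands on ``built`` or ``failed``; return that status."""
    deadline = time.monotonic() + timeout
    while True:
        rows = get_build_state().get("rows", [])
        # Find this challenge's row; None if the node has not listed it yet.
        status = next((r["status"] for r in rows if r["challenge_id"] == challenge_id), None)
        if status in ("built", "failed"):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{challenge_id} still {status!r} after {timeout}s")
        sleep(interval)
```

A typical call would be `node.build_challenge("web-1")` followed by `wait_for_build(node.get_build_state, "web-1")`. The same loop fits the pull side by swapping in `get_pull_state` and the `pulled` / `failed` terminal states.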
    def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
        """Queue background pre-build on the node.

        With ``challenge_ids=None`` the node builds every spec it knows.
        With an explicit list (the platform scoping a bulk build to a
        competition's challenge set) only those are built. Returns
        ``{queued, skipped_built, skipped_in_progress}`` so the platform
        can report per-node what got picked up. Sequential on the node
        side — no fan-out across the corpus.
        """
        kwargs: dict[str, Any] = {}
        if challenge_ids is not None:
            kwargs["json"] = {"challenge_ids": challenge_ids}
        resp = self.request("POST", "/admin/challenges/build-all", **kwargs)
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body
Queue background pre-build on the node.
With challenge_ids=None the node builds every spec it knows.
With an explicit list (the platform scoping a bulk build to a
competition's challenge set) only those are built. Returns
{queued, skipped_built, skipped_in_progress} so the platform
can report per-node what got picked up. Sequential on the node
side — no fan-out across the corpus.
    def get_build_state(self) -> dict[str, Any]:
        """Fetch every per-challenge build-state row on this node.

        Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``.
        ``status`` is one of ``unbuilt`` / ``building`` / ``built`` /
        ``failed``; ``unbuilt`` placeholders are synthesised for specs
        the node has seen but never been asked to build.
        """
        resp = self.request("GET", "/admin/challenges/build-state")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body
Fetch every per-challenge build-state row on this node.
Returns {"rows": [{challenge_id, status, built_at, error}, …]}.
status is one of unbuilt / building / built /
failed; unbuilt placeholders are synthesised for specs
the node has seen but never been asked to build.
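For a dashboard or log line, the rows can be collapsed into a per-status tally. A minimal sketch using only the documented `rows` / `status` fields; `summarize_build_state` is a hypothetical helper name.

```python
from collections import Counter
from typing import Any


def summarize_build_state(body: dict[str, Any]) -> Counter:
    """Count rows per status from a get_build_state() response body."""
    return Counter(row["status"] for row in body.get("rows", []))
```

Because `Counter` returns 0 for missing keys, a check such as `summary["failed"] == 0` works even when no row has failed.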
    def pull_challenge(self, challenge_id: str) -> dict[str, Any]:
        """Ask the node to pre-pull registry images for *challenge_id*.

        Pull-side twin of :meth:`build_challenge`: the node persists
        ``status="pulling"`` and spawns a daemon thread; poll
        :meth:`get_pull_state` for the ``pulled`` / ``failed`` landing.
        """
        resp = self.request("POST", f"/admin/challenges/{challenge_id}/pull")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body
Ask the node to pre-pull registry images for challenge_id.
Pull-side twin of build_challenge(): the node persists
status="pulling" and spawns a daemon thread; poll
get_pull_state() for the pulled / failed landing.
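The polling step mentioned above could look roughly like the following sketch. `wait_for_pull` is a hypothetical helper, not an SDK function; it takes any zero-argument callable that returns the documented `{"rows": [...]}` payload (for example a bound `get_pull_state`), and the interval and timeout defaults are arbitrary assumptions. The demo at the bottom feeds it canned payloads instead of a real node.

```python
import time
from typing import Any, Callable

TERMINAL_STATUSES = {"pulled", "failed"}


def wait_for_pull(
    get_pull_state: Callable[[], dict[str, Any]],
    challenge_id: str,
    *,
    interval: float = 2.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict[str, Any]:
    """Poll until the row for challenge_id lands on pulled or failed."""
    deadline = time.monotonic() + timeout
    while True:
        rows = get_pull_state().get("rows", [])
        row = next((r for r in rows if r.get("challenge_id") == challenge_id), None)
        if row is not None and row.get("status") in TERMINAL_STATUSES:
            return row
        if time.monotonic() >= deadline:
            raise TimeoutError(f"pull of {challenge_id!r} did not finish in {timeout}s")
        sleep(interval)


# Demo with canned payloads standing in for successive node responses.
_canned = iter(
    [
        {"rows": [{"challenge_id": "web-01", "status": "pulling", "pulled_at": None, "error": None}]},
        {"rows": [{"challenge_id": "web-01", "status": "pulled", "pulled_at": "2024-01-01T00:00:00Z", "error": None}]},
    ]
)
final_row = wait_for_pull(lambda: next(_canned), "web-01", interval=0, sleep=lambda _s: None)
print(final_row["status"])
```

With a real node client the call would presumably be `node.pull_challenge("web-01")` followed by `wait_for_pull(node.get_pull_state, "web-01")`.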
    def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
        """Queue background pre-pull on the node.

        With ``challenge_ids=None`` the node pulls every spec it knows;
        with an explicit list only those (the platform scoping to a
        competition). Returns ``{queued, skipped_pulled,
        skipped_in_progress}``.
        """
        kwargs: dict[str, Any] = {}
        if challenge_ids is not None:
            kwargs["json"] = {"challenge_ids": challenge_ids}
        resp = self.request("POST", "/admin/challenges/pull-all", **kwargs)
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body
Queue background pre-pull on the node.
With challenge_ids=None the node pulls every spec it knows;
with an explicit list only those (the platform scoping to a
competition). Returns {queued, skipped_pulled,
skipped_in_progress}.
    def get_pull_state(self) -> dict[str, Any]:
        """Fetch every per-challenge pull-state row on this node.

        Returns ``{"rows": [{challenge_id, status, pulled_at, error}, …]}``.
        ``status`` is one of ``unpulled`` / ``pulling`` / ``pulled`` /
        ``failed``; ``unpulled`` placeholders are synthesised for specs
        the node has seen but never been asked to pull.
        """
        resp = self.request("GET", "/admin/challenges/pull-state")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body
Fetch every per-challenge pull-state row on this node.
Returns {"rows": [{challenge_id, status, pulled_at, error}, …]}.
status is one of unpulled / pulling / pulled /
failed; unpulled placeholders are synthesised for specs
the node has seen but never been asked to pull.
    def node_health(self) -> dict[str, Any]:
        """Liveness + ``{running, capacity}`` for heartbeat."""
        resp = self.request("GET", "/health")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body
Liveness + {running, capacity} for heartbeat.
    def get_traffic(self, instance_id: str) -> dict[str, Any]:
        """Fetch mitmproxy flow data; file lives on the node's FS."""
        resp = self.request("GET", f"/instances/{instance_id}/traffic")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body
Fetch mitmproxy flow data; file lives on the node's FS.
    def list_containers(self, instance_id: str) -> list[dict[str, Any]]:
        """Enumerate every container in an instance's compose project.

        Backs the admin-shell feature: the platform forwards a
        ``GET /admin/instances/{id}/containers`` to the assigned node,
        which returns one row per container (challenge services and
        platform-injected sidecars alike). The WebSocket reverse-proxy
        is opened separately and does not go through ``NodeClient``.
        """
        resp = self.request("GET", f"/instances/{instance_id}/containers")
        _raise_for_node_status(resp)
        body: list[dict[str, Any]] = resp.json()
        return body
Enumerate every container in an instance's compose project.
Backs the admin-shell feature: the platform forwards a
GET /admin/instances/{id}/containers to the assigned node,
which returns one row per container (challenge services and
platform-injected sidecars alike). The WebSocket reverse-proxy
is opened separately and does not go through NodeClient.
    def get_container_logs(self, instance_id: str) -> str:
        """Fetch combined container stdout/stderr for archival."""
        resp = self.request("GET", f"/instances/{instance_id}/container-logs")
        resp.raise_for_status()
        return str(resp.json().get("logs") or "")
Fetch combined container stdout/stderr for archival.
    def get_pcap(self, instance_id: str) -> bytes:
        """Fetch the raw tcpdump capture for *instance_id*.

        Returns an empty ``bytes`` when no capture exists (sidecar
        disabled, instance never reached the running state, etc.) so
        callers can persist conditionally without try/except.
        """
        resp = self.request("GET", f"/instances/{instance_id}/pcap")
        if resp.status_code == 404:
            return b""
        resp.raise_for_status()
        return resp.content
Fetch the raw tcpdump capture for instance_id.
Returns an empty bytes when no capture exists (sidecar
disabled, instance never reached the running state, etc.) so
callers can persist conditionally without try/except.
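The "persist conditionally" pattern described above might be sketched like this. `persist_pcap` and the archive layout are hypothetical; the only behaviour taken from the docstring is that a missing capture arrives as empty `bytes` rather than an exception. The demo uses placeholder bytes in a temporary directory instead of a real capture.

```python
import tempfile
from pathlib import Path


def persist_pcap(capture: bytes, dest: Path) -> bool:
    """Write the capture to dest; skip (and return False) when it is empty."""
    if not capture:
        # Empty bytes means "no capture exists", so there is nothing to archive.
        return False
    dest.parent.mkdir(parents=True, exist_ok=True)
    dest.write_bytes(capture)
    return True


# Demo with placeholder bytes; a real call would pass the node's capture.
with tempfile.TemporaryDirectory() as tmp:
    saved_empty = persist_pcap(b"", Path(tmp) / "none.pcap")
    saved_real = persist_pcap(b"\xd4\xc3\xb2\xa1", Path(tmp) / "inst-1" / "capture.pcap")
    real_exists = (Path(tmp) / "inst-1" / "capture.pcap").exists()
    empty_exists = (Path(tmp) / "none.pcap").exists()

print(saved_empty, saved_real)
```

In platform code this would presumably be called as `persist_pcap(node.get_pcap(instance_id), archive_dir / f"{instance_id}.pcap")`, where `archive_dir` is whatever location the caller archives to.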
    def get_agent_runtime(self, instance_id: str) -> dict[str, Any]:
        """Provider-agnostic runtime hints for attaching an agent (proxy URL,
        CA PEM, optional Docker-specific names). Replaces the old
        sandbox-network shape."""
        resp = self.request("GET", f"/instances/{instance_id}/agent-runtime")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body
Provider-agnostic runtime hints for attaching an agent (proxy URL, CA PEM, optional Docker-specific names). Replaces the old sandbox-network shape.
    def list_instance_attachments(self, instance_id: str) -> dict[str, Any]:
        """List per-instance attachments rendered into the node's workdir.

        Returns the raw JSON dict — caller projects through
        :class:`AttachmentList` for typing. Used by the platform's
        per-instance attachment endpoint to surface team-specific
        rendered file listings."""
        resp = self.request("GET", f"/instances/{instance_id}/attachments")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body
List per-instance attachments rendered into the node's workdir.
Returns the raw JSON dict — caller projects through
AttachmentList for typing. Used by the platform's
per-instance attachment endpoint to surface team-specific
rendered file listings.
    def get_openvpn_config(self, instance_id: str) -> bytes:
        """Fetch the per-instance OpenVPN client config from the node.

        The node reads ``/etc/openvpn/client.ovpn`` out of the instance's
        ``openvpn`` service container (generated on first boot by the
        ``ctfy/openvpn-base`` image's bootstrap). Returns the body
        verbatim — the platform-side route adds the
        ``Content-Disposition`` header before re-emitting to the player.

        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
        proxy route catches 404 and re-emits to the player, anything
        else is treated as a 502.
        """
        resp = self.request("GET", f"/instances/{instance_id}/openvpn-config")
        resp.raise_for_status()
        return resp.content
Fetch the per-instance OpenVPN client config from the node.
The node reads /etc/openvpn/client.ovpn out of the instance's
openvpn service container (generated on first boot by the
ctfy/openvpn-base image's bootstrap). Returns the body
verbatim — the platform-side route adds the
Content-Disposition header before re-emitting to the player.
Raises httpx.HTTPStatusError on non-2xx — the platform's
proxy route catches 404 and re-emits to the player, anything
else is treated as a 502.
    def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
        """Download one per-instance attachment as ``(bytes, content_type)``.

        Buffers the whole body — attachments are bounded (typical case
        is a 10 KB binary or a small text file). For huge captures the
        caller should hand the player a CDN URL instead.

        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
        proxy route catches 404 and re-emits to the player, anything
        else is a 502."""
        resp = self.request(
            "GET",
            f"/instances/{instance_id}/attachments/{filename}",
        )
        resp.raise_for_status()
        return resp.content, resp.headers.get("content-type", "application/octet-stream")
Download one per-instance attachment as (bytes, content_type).
Buffers the whole body — attachments are bounded (typical case is a 10 KB binary or a small text file). For huge captures the caller should hand the player a CDN URL instead.
Raises httpx.HTTPStatusError on non-2xx — the platform's
proxy route catches 404 and re-emits to the player, anything
else is a 502.
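The status handling described above (404 passes through, every other failure becomes a 502) can be written down as a small mapping. This is only a sketch of the rule as stated in the docstring; `player_facing_status` is a hypothetical name, the real proxy route is not shown in this reference, and passing 2xx codes through unchanged is an assumption.

```python
def player_facing_status(node_status: int) -> int:
    """Map a node response status to the status the player would see."""
    if 200 <= node_status < 300:
        # Assumption: successful node responses are relayed as-is.
        return node_status
    if node_status == 404:
        # Documented: a 404 from the node is re-emitted to the player.
        return 404
    # Documented: any other non-2xx is treated as a bad gateway.
    return 502


for status in (200, 404, 403, 500):
    print(status, "->", player_facing_status(status))
```

In a route that catches `httpx.HTTPStatusError`, the node status is available as `exc.response.status_code`.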
class PlatformClient(BaseHttpClient):
    """Player-facing HTTP client for the ctfy platform.

    Methods are grouped into resource namespaces (``client.teams.list()``, …)
    plus a per-competition scope via ``client.competition(id)``. The admin
    surface is a separate :class:`~ctfy.sdk.admin.AdminClient` reached via
    :attr:`admin`. Used by the CLI, the SDK, the MCP server, and external callers.
    """

    def __init__(
        self,
        server_url: str,
        token: str = "",
        max_retries: int = 3,
        timeout: int = DEFAULT_CLIENT_TIMEOUT,
    ):
        self._url = server_url.rstrip("/")
        super().__init__(f"{self._url}/api/v1", token, timeout=timeout, max_retries=max_retries)

    # -- resource namespaces ------------------------------------------------

    @cached_property
    def teams(self) -> TeamsResource:
        """Public team discovery + profile reads."""
        return TeamsResource(self)

    @cached_property
    def users(self) -> UsersResource:
        """Public per-user profile reads."""
        return UsersResource(self)

    @cached_property
    def me(self) -> MeResource:
        """The calling user's profile, inbox, account, and progress."""
        return MeResource(self)

    @cached_property
    def auth(self) -> AuthResource:
        """Password auth, OAuth identities, API tokens, sign-in discovery."""
        return AuthResource(self)

    @cached_property
    def achievements(self) -> AchievementsResource:
        """Public badge catalog, recent-unlock feed, easter eggs."""
        return AchievementsResource(self)

    @cached_property
    def challenges(self) -> ChallengesResource:
        """Global challenge catalog, attachments, facets, feedback chips."""
        return ChallengesResource(self)

    @cached_property
    def competitions(self) -> CompetitionsResource:
        """Discover competitions (``list`` / ``get``). To act within one, use
        :meth:`competition`."""
        return CompetitionsResource(self)

    def competition(self, competition_id: str) -> Competition:
        """Scope to one competition. The inherently competition-scoped
        operations — register, your team (captain tooling + invites +
        join-requests), the comp's challenges, team search, and standings —
        hang off the returned :class:`~ctfy.sdk.competition.Competition`
        handle, so ``competition_id`` is named once instead of on every call.
        (Instance lifecycle and answer submission stay on :attr:`instances` /
        :attr:`submissions`, keyed by ``instance_id``.)"""
        return Competition(self, competition_id)

    @cached_property
    def instances(self) -> InstancesResource:
        """Launch / control / inspect challenge instances by instance id
        (the competition is inferred server-side)."""
        return InstancesResource(self)

    @cached_property
    def submissions(self) -> SubmissionsResource:
        """Submit / verify answers + list submissions (by instance id), plus
        the instance-less QA challenges."""
        return SubmissionsResource(self)

    @cached_property
    def scoreboard(self) -> ScoreboardResource:
        """Global standings, per-challenge stats, persisted snapshots."""
        return ScoreboardResource(self)

    @cached_property
    def activities(self) -> ActivitiesResource:
        """Platform activity log + histogram."""
        return ActivitiesResource(self)

    @cached_property
    def nodes(self) -> NodesResource:
        """Public worker-node list (operator node mgmt is on ``admin.nodes``)."""
        return NodesResource(self)

    @cached_property
    def admin(self) -> AdminClient:
        """Admin / operator surface (separate, role-gated server-side).

        Shares this client's transport. See :class:`~ctfy.sdk.admin.AdminClient`.
        """
        return AdminClient(self)

    # -- top-level convenience: server status / realtime / version ----------

    def health(self) -> HealthResponse:
        resp = self.request("GET", "/health")
        _raise_for_status(resp)
        return HealthResponse.model_validate(resp.json())

    def get_meta(self) -> MetaResponse:
        """Server identity + challenge repo SHA + build version.
        Admin tokens additionally see cluster capacity + team / solve
        counts in the same payload."""
        resp = self.request("GET", "/meta")
        _raise_for_status(resp)
        return MetaResponse.model_validate(resp.json())

    def cluster_info(self) -> ClusterInfo:
        """Aggregate worker-node capacity / utilisation (the public
        capacity banner; no per-node detail)."""
        resp = self.request("GET", "/cluster-info")
        _raise_for_status(resp)
        return ClusterInfo.model_validate(resp.json())

    def check_server_compatibility(self, *, health: HealthResponse | None = None) -> VersionCheck:
        """Compare this client's ``ctfy`` version against the server's.

        Uses the cheap unauthenticated ``/health`` probe (pass an
        already-fetched :class:`HealthResponse` to avoid a second round
        trip). Pure classification — never prints, never raises on a
        mismatch, never mutates anything. The caller (CLI, MCP, harness)
        decides what to do with :class:`~ctfy.core.version.VersionCheck`
        (typically: print ``.message`` to stderr when not ``.quiet``).

        A server too old to report its version yields
        :attr:`Compatibility.UNKNOWN` (``health.version == ""``), which
        is ``.quiet`` — so this stays silent against legacy servers
        rather than crying wolf.
        """
        h = health if health is not None else self.health()
        return check_compatibility(package_version(), h.version)

    def events(self, *, auto_reconnect: bool = True) -> EventStream:
        """Open the platform SSE event stream.

        Yields ``{"event": <name>, "data": <dict>}`` for each frame.
        Filters: team-scoped events for every team the caller's user is
        on (across every per-comp competition), plus all global events;
        admin tokens additionally see admin-only frames.

        Usage::

            with client.events() as stream:
                for event in stream:
                    if event["event"] == "solve":
                        ...

        Auto-reconnects on transient network errors with exponential
        backoff (1s → 30s capped). Set ``auto_reconnect=False`` to make
        the iterator raise instead.
        """
        token = self._token

        def _factory() -> httpx.Client:
            # New client per session so a reconnect after a stale
            # connection drops fresh sockets, not warmed-over ones.
            return httpx.Client(base_url=self._base_url, timeout=None)

        return EventStream(
            client_factory=_factory,
            path="/events",
            token=token,
            auto_reconnect=auto_reconnect,
        )
Player-facing HTTP client for the ctfy platform.
Methods are grouped into resource namespaces (client.teams.list(), …)
plus a per-competition scope via client.competition(id). The admin
surface is a separate ~ctfy.sdk.admin.AdminClient reached via
admin. Used by the CLI, the SDK, the MCP server, and external callers.
    @cached_property
    def teams(self) -> TeamsResource:
        """Public team discovery + profile reads."""
        return TeamsResource(self)
Public team discovery + profile reads.
    @cached_property
    def users(self) -> UsersResource:
        """Public per-user profile reads."""
        return UsersResource(self)
Public per-user profile reads.
    @cached_property
    def me(self) -> MeResource:
        """The calling user's profile, inbox, account, and progress."""
        return MeResource(self)
The calling user's profile, inbox, account, and progress.
    @cached_property
    def auth(self) -> AuthResource:
        """Password auth, OAuth identities, API tokens, sign-in discovery."""
        return AuthResource(self)
Password auth, OAuth identities, API tokens, sign-in discovery.
    @cached_property
    def achievements(self) -> AchievementsResource:
        """Public badge catalog, recent-unlock feed, easter eggs."""
        return AchievementsResource(self)
Public badge catalog, recent-unlock feed, easter eggs.
    @cached_property
    def challenges(self) -> ChallengesResource:
        """Global challenge catalog, attachments, facets, feedback chips."""
        return ChallengesResource(self)
Global challenge catalog, attachments, facets, feedback chips.
    @cached_property
    def competitions(self) -> CompetitionsResource:
        """Discover competitions (``list`` / ``get``). To act within one, use
        :meth:`competition`."""
        return CompetitionsResource(self)
Discover competitions (list / get). To act within one, use
competition().
    def competition(self, competition_id: str) -> Competition:
        """Scope to one competition. The inherently competition-scoped
        operations — register, your team (captain tooling + invites +
        join-requests), the comp's challenges, team search, and standings —
        hang off the returned :class:`~ctfy.sdk.competition.Competition`
        handle, so ``competition_id`` is named once instead of on every call.
        (Instance lifecycle and answer submission stay on :attr:`instances` /
        :attr:`submissions`, keyed by ``instance_id``.)"""
        return Competition(self, competition_id)
Scope to one competition. The inherently competition-scoped
operations — register, your team (captain tooling + invites +
join-requests), the comp's challenges, team search, and standings —
hang off the returned ~ctfy.sdk.competition.Competition
handle, so competition_id is named once instead of on every call.
(Instance lifecycle and answer submission stay on instances /
submissions, keyed by instance_id.)
    @cached_property
    def instances(self) -> InstancesResource:
        """Launch / control / inspect challenge instances by instance id
        (the competition is inferred server-side)."""
        return InstancesResource(self)
Launch / control / inspect challenge instances by instance id (the competition is inferred server-side).
    @cached_property
    def submissions(self) -> SubmissionsResource:
        """Submit / verify answers + list submissions (by instance id), plus
        the instance-less QA challenges."""
        return SubmissionsResource(self)
Submit / verify answers + list submissions (by instance id), plus the instance-less QA challenges.
    @cached_property
    def scoreboard(self) -> ScoreboardResource:
        """Global standings, per-challenge stats, persisted snapshots."""
        return ScoreboardResource(self)
Global standings, per-challenge stats, persisted snapshots.
    @cached_property
    def activities(self) -> ActivitiesResource:
        """Platform activity log + histogram."""
        return ActivitiesResource(self)
Platform activity log + histogram.
    @cached_property
    def nodes(self) -> NodesResource:
        """Public worker-node list (operator node mgmt is on ``admin.nodes``)."""
        return NodesResource(self)
Public worker-node list (operator node mgmt is on admin.nodes).
    @cached_property
    def admin(self) -> AdminClient:
        """Admin / operator surface (separate, role-gated server-side).

        Shares this client's transport. See :class:`~ctfy.sdk.admin.AdminClient`.
        """
        return AdminClient(self)
Admin / operator surface (separate, role-gated server-side).
Shares this client's transport. See ~ctfy.sdk.admin.AdminClient.
    def get_meta(self) -> MetaResponse:
        """Server identity + challenge repo SHA + build version.
        Admin tokens additionally see cluster capacity + team / solve
        counts in the same payload."""
        resp = self.request("GET", "/meta")
        _raise_for_status(resp)
        return MetaResponse.model_validate(resp.json())
Server identity + challenge repo SHA + build version. Admin tokens additionally see cluster capacity + team / solve counts in the same payload.
    def cluster_info(self) -> ClusterInfo:
        """Aggregate worker-node capacity / utilisation (the public
        capacity banner; no per-node detail)."""
        resp = self.request("GET", "/cluster-info")
        _raise_for_status(resp)
        return ClusterInfo.model_validate(resp.json())
Aggregate worker-node capacity / utilisation (the public capacity banner; no per-node detail).
    def check_server_compatibility(self, *, health: HealthResponse | None = None) -> VersionCheck:
        """Compare this client's ``ctfy`` version against the server's.

        Uses the cheap unauthenticated ``/health`` probe (pass an
        already-fetched :class:`HealthResponse` to avoid a second round
        trip). Pure classification — never prints, never raises on a
        mismatch, never mutates anything. The caller (CLI, MCP, harness)
        decides what to do with :class:`~ctfy.core.version.VersionCheck`
        (typically: print ``.message`` to stderr when not ``.quiet``).

        A server too old to report its version yields
        :attr:`Compatibility.UNKNOWN` (``health.version == ""``), which
        is ``.quiet`` — so this stays silent against legacy servers
        rather than crying wolf.
        """
        h = health if health is not None else self.health()
        return check_compatibility(package_version(), h.version)
Compare this client's ctfy version against the server's.
Uses the cheap unauthenticated /health probe (pass an
already-fetched HealthResponse to avoid a second round
trip). Pure classification — never prints, never raises on a
mismatch, never mutates anything. The caller (CLI, MCP, harness)
decides what to do with ~ctfy.core.version.VersionCheck
(typically: print .message to stderr when not .quiet).
A server too old to report its version yields
Compatibility.UNKNOWN (health.version == ""), which
is .quiet — so this stays silent against legacy servers
rather than crying wolf.
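Since the method only classifies, the caller decides how to report the result. A minimal sketch of the "print `.message` to stderr when not `.quiet`" convention follows. `report_compatibility`, `VersionCheckLike`, and `FakeCheck` are hypothetical stand-ins written for this example; only the `.quiet` and `.message` attributes are taken from the docstring, and the message text is made up.

```python
from __future__ import annotations

import io
import sys
from dataclasses import dataclass
from typing import Protocol, TextIO


class VersionCheckLike(Protocol):
    """Structural stand-in for the two attributes the docstring names."""

    quiet: bool
    message: str


def report_compatibility(check: VersionCheckLike, stream: TextIO | None = None) -> bool:
    """Print the message unless the check is quiet; return True if printed."""
    if check.quiet:
        return False
    print(check.message, file=stream if stream is not None else sys.stderr)
    return True


@dataclass
class FakeCheck:
    # Hypothetical stand-in so the sketch runs without the SDK installed.
    quiet: bool
    message: str


buf = io.StringIO()
printed_quiet = report_compatibility(FakeCheck(quiet=True, message=""), buf)
printed_loud = report_compatibility(FakeCheck(quiet=False, message="version mismatch"), buf)
print(printed_quiet, printed_loud)
```

With the real client the call would presumably be `report_compatibility(client.check_server_compatibility())`.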
    def events(self, *, auto_reconnect: bool = True) -> EventStream:
        """Open the platform SSE event stream.

        Yields ``{"event": <name>, "data": <dict>}`` for each frame.
        Filters: team-scoped events for every team the caller's user is
        on (across every per-comp competition), plus all global events;
        admin tokens additionally see admin-only frames.

        Usage::

            with client.events() as stream:
                for event in stream:
                    if event["event"] == "solve":
                        ...

        Auto-reconnects on transient network errors with exponential
        backoff (1s → 30s capped). Set ``auto_reconnect=False`` to make
        the iterator raise instead.
        """
        token = self._token

        def _factory() -> httpx.Client:
            # New client per session so a reconnect after a stale
            # connection drops fresh sockets, not warmed-over ones.
            return httpx.Client(base_url=self._base_url, timeout=None)

        return EventStream(
            client_factory=_factory,
            path="/events",
            token=token,
            auto_reconnect=auto_reconnect,
        )
Open the platform SSE event stream.
Yields {"event": <name>, "data": <dict>} for each frame.
Filters: team-scoped events for every team the caller is on
(across every competition), plus all global events;
admin tokens additionally see admin-only frames.
Usage::
    with client.events() as stream:
        for event in stream:
            if event["event"] == "solve":
                ...
Auto-reconnects on transient network errors with exponential
backoff (1s → 30s capped). Set auto_reconnect=False to make
the iterator raise instead.
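To make the reconnect schedule concrete, here is a small sketch of an exponential backoff that starts at 1 s and is capped at 30 s. The 1 s start and 30 s cap come from the docstring; the doubling factor and the absence of jitter are assumptions, and `backoff_delays` is a hypothetical helper rather than the `EventStream` implementation.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(initial: float = 1.0, cap: float = 30.0, factor: float = 2.0) -> Iterator[float]:
    """Yield reconnect delays: initial, initial*factor, ... capped at cap."""
    delay = initial
    while True:
        yield min(delay, cap)
        # Assumed growth rule: multiply by a constant factor, never above the cap.
        delay = min(delay * factor, cap)


delays = list(islice(backoff_delays(), 7))
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Under these assumptions a client that keeps failing would settle at one attempt every 30 seconds after the sixth retry.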