ctfy.sdk

Python SDK for the ctfy platform.

Drives the platform's /api/v1/* REST API from Python with typed Pydantic returns and auto-retry on transient errors. Two clients, split by audience — the same line the ctfy / ctfy-admin CLI binaries draw:

  • ctfy.sdk.client.PlatformClient: the player client. This is what agents and players want. Methods are grouped into resource namespaces (client.teams, client.instances, …) plus a per-competition handle via client.competition(id).
  • ctfy.sdk.admin.AdminClient: the operator client (/api/v1/admin/* and fleet management). Reached via client.admin or AdminClient.connect(). Kept separate so the player reference never surfaces admin endpoints that players can't call.

Install

The full ctfy package pulls in the server too. For an agent / harness that only needs the client, use the client extra:

pip install "ctfy[client]"

Authenticate

Sign in to the platform in the browser, then go to Settings → API tokens and mint a fine-grained token (prefix pf_). These tokens launch instances and submit answers on the team's behalf but cannot mint more tokens, re-link OAuth providers, or change team membership — so a leaked token never costs you the account.

Export it as CTFY_TOKEN for the CLI / MCP server, or pass it directly to PlatformClient.
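
A minimal sketch of the environment-variable route, assuming the token has been exported as CTFY_TOKEN as described above. The helper name load_token is hypothetical and not part of the SDK; the prefix check only mirrors the pf_ prefix mentioned in this section.

```python
import os
from typing import Mapping, Optional


def load_token(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the API token from CTFY_TOKEN, or raise if it is missing.

    Hypothetical helper for illustration; not part of ctfy.sdk.
    """
    source = os.environ if env is None else env
    token = source.get("CTFY_TOKEN", "").strip()
    if not token:
        raise RuntimeError("CTFY_TOKEN is not set")
    if not token.startswith("pf_"):
        # Fine-grained tokens are described above as carrying a pf_ prefix.
        raise RuntimeError("CTFY_TOKEN does not start with the pf_ prefix")
    return token


# Usage (needs the SDK installed and a reachable platform):
#   from ctfy.sdk import PlatformClient
#   client = PlatformClient("https://ctfy.authu.online", token=load_token())
```

Failing early on a missing or malformed token keeps the error close to its cause instead of surfacing later as an authentication failure from the API.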

Quick start

End-to-end: discover a competition, register, list its challenges, launch one, read the attack surface, submit a captured flag:

from ctfy.sdk import PlatformClient

client = PlatformClient("https://ctfy.authu.online", token="pf_xxx")

# 1) Find a running competition
comps = client.competitions.list(phase="running")
comp = comps[0]
print(comp.id, comp.title)

# 2) Scope to the competition, then register Solo (skip if already on a
#    team: inspect ``client.me.get().competition_teams`` first)
comp_api = client.competition(comp.id)
team = comp_api.register(mode="solo")
print(team.id)

# 3) List the challenges scoped to this competition (curated order)
challenges = comp_api.challenges()
for ch in challenges:
    print(f"  {ch.id} [{ch.category.value}/{ch.difficulty}] {ch.name}")

# 4) Launch the first challenge — blocks until ready
ready = client.instances.start(
    challenges[0].id, competition_id=comp.id
)
print(f"instance {ready.id}")

# 5) Read the attack surface (services + credentials, plus VPN for
#    engagement-mode challenges)
for svc in ready.surface.services:
    print(f"  [{svc.service_type.value}] {svc.url}")
    if svc.credentials:
        print(f"    {svc.credentials.username}/{svc.credentials.password}")
if ready.surface.vpn:
    print(f"  VPN: {ready.surface.vpn.host}:{ready.surface.vpn.port}")

# 6) Submit a captured flag (oracle-probe via
#    ``client.submissions.verify()`` first if you don't want to burn
#    an audit row)
result = client.submissions.submit(ready.id, "FLAG{your_flag_here}")
print(f"correct={result.correct} solved={result.challenge_fully_solved}")

# 7) Tear down (or let TTL expire — default 24h, admin-tunable)
client.instances.stop(ready.id)

A runnable, interactive copy of this script lives at examples/quickstart.py.
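
Step 6 mentions probing with client.submissions.verify() before spending a graded submission. The sketch below shows one way that could look. It assumes verify() accepts the same (instance_id, flag) arguments as submit() and returns an object with a boolean correct attribute; neither is confirmed by this page, and the function name submit_first_match is hypothetical.

```python
from typing import Any, Iterable, Optional


def submit_first_match(client: Any, instance_id: str, candidates: Iterable[str]) -> Optional[Any]:
    """Probe each candidate flag, then submit only the first one that passes.

    Assumption: verify() mirrors submit()'s arguments and exposes ``correct``.
    """
    for flag in candidates:
        probe = client.submissions.verify(instance_id, flag)
        if probe.correct:
            # Only this call is graded and recorded.
            return client.submissions.submit(instance_id, flag)
    # No candidate passed the probe, so nothing was submitted.
    return None
```

With the quick-start objects this would be called as submit_first_match(client, ready.id, ["FLAG{a}", "FLAG{b}"]).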

Player surface

Global / account namespaces on ctfy.sdk.client.PlatformClient:

  • client.teams / client.users — public team + user profile reads.
  • client.me — the caller's profile, inbox, account export/delete, achievements, solve + milestone progress.
  • client.auth — password register/login, OAuth identities, API token CRUD, sign-in discovery.
  • client.competitions — discover competitions (list / get).
  • client.challenges — global catalog, attachments, facets, feedback chips.
  • client.instances: start (blocks until ready), status, stop, renew, list (by instance id; the competition is inferred).
  • client.submissions: submit (graded), verify (oracle probe, no record), list, and QA challenges.
  • client.scoreboard — global standings, per-challenge stats, snapshots.
  • client.achievements / client.activities / client.nodes — public badge catalog, activity log, worker-node list.

Per-competition scope: client.competition(id) returns a ctfy.sdk.competition.Competition handle for the operations that are inherently competition-scoped, so competition_id is named once:

  • comp.register(mode="solo" | "create" | "join", …) — get on a team.
  • comp.team — your team: rename / leave / kick, plus comp.team.invites (codes + email invites) and comp.team.requests (approve / reject join requests).
  • comp.challenges() / comp.search_teams() and the standings (comp.scoreboard() / score_history / score_distribution / solve_matrix / challenge_breakdown).

Top-level convenience methods stay flat: client.health(), client.get_meta(), client.cluster_info(), client.events(), client.check_server_compatibility().

Admin surface

client.admin (a ctfy.sdk.admin.AdminClient) carries the operator endpoints, grouped the same way: admin.users, admin.competitions, admin.announcements, admin.achievements, admin.challenges, admin.instances, admin.records (instance forensics), admin.nodes, admin.observability, admin.settings, admin.competition_admins, admin.competition_invites, admin.tasks, admin.scheduled_jobs. Operator tooling without a player client can use AdminClient.connect(url, token=...).

Realtime

PlatformClient.events() yields an auto-reconnecting ctfy.sdk.events.EventStream over the SSE endpoint: solve broadcasts, instance state changes, scoreboard refreshes.
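
One way to consume such a stream is a small dispatcher that routes each event to a handler by kind. This is a sketch under stated assumptions: it treats the stream as a plain iterable and assumes each event exposes a type attribute, and the event names used below are placeholders. None of that is confirmed by this page, so check the EventStream docstrings for the real shape.

```python
from typing import Any, Callable, Dict, Iterable


def dispatch_events(stream: Iterable[Any], handlers: Dict[str, Callable[[Any], None]]) -> int:
    """Route events to handlers keyed by event type; return how many were handled.

    Assumption: each event has a ``type`` attribute (hypothetical field name).
    """
    handled = 0
    for event in stream:
        handler = handlers.get(getattr(event, "type", ""))
        if handler is not None:
            handler(event)
            handled += 1
    return handled


# Usage (needs a live client; "solve" is a placeholder event name):
#   dispatch_events(client.events(), {"solve": lambda e: print("solved:", e)})
```

Unknown event types are skipped rather than raised on, so the loop keeps running if the server starts emitting kinds the caller has no handler for.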

See also

  • PlatformClient — the player client; every namespace method has full docstrings.
  • AdminClient — the operator client.
  • ctfy.server.models — the typed response shapes (ChallengeInfo, InstanceInfo, SubmissionResponse, …) that SDK methods return. Re-using the server's Pydantic models means the wire shape can never drift between the two sides.
  • <platform>/docs — auto-generated FastAPI Swagger UI for the raw REST API (language-agnostic).
  • <platform>/api/v1/mcp/ — streamable-HTTP MCP endpoint with the same operations as MCP tools, if the agent speaks MCP instead of Python.
  • ctfy.sdk.node_client.NodeClient: separate client for worker-node ↔ platform RPCs; not what agent harnesses want.

from ctfy.sdk.admin import AdminClient
from ctfy.sdk.client import PlatformClient
from ctfy.sdk.node_client import NodeClient

__all__ = ["AdminClient", "NodeClient", "PlatformClient"]

class AdminClient:
    """Admin / operator surface, grouped into resource namespaces.

    Namespaces: :attr:`users`, :attr:`competitions`, :attr:`announcements`,
    :attr:`achievements`, :attr:`challenges`, :attr:`instances`,
    :attr:`records`, :attr:`nodes`, :attr:`observability`, :attr:`settings`,
    :attr:`competition_admins`, :attr:`competition_invites`, :attr:`tasks`,
    :attr:`scheduled_jobs`.
    """

    def __init__(self, http: BaseHttpClient) -> None:
        #: Shared transport (the parent PlatformClient when reached via
        #: ``client.admin``, or an owned client when built via ``connect``).
        self._http = http

    @classmethod
    def connect(
        cls,
        server_url: str,
        token: str = "",
        *,
        max_retries: int = 3,
        timeout: int = DEFAULT_CLIENT_TIMEOUT,
    ) -> AdminClient:
        """Build an ``AdminClient`` that owns its own transport.

        For operator tooling that doesn't already hold a
        :class:`PlatformClient`. The returned client is a context manager
        that closes its transport on exit.
        """
        base = server_url.rstrip("/")
        http = BaseHttpClient(f"{base}/api/v1", token, timeout=timeout, max_retries=max_retries)
        return cls(http)

    # Each namespace is built lazily on first access and then cached.

    @cached_property
    def users(self) -> AdminUsersResource:
        return AdminUsersResource(self._http)

    @cached_property
    def competitions(self) -> AdminCompetitionsResource:
        return AdminCompetitionsResource(self._http)

    @cached_property
    def announcements(self) -> AdminAnnouncementsResource:
        return AdminAnnouncementsResource(self._http)

    @cached_property
    def achievements(self) -> AdminAchievementsResource:
        return AdminAchievementsResource(self._http)

    @cached_property
    def challenges(self) -> AdminChallengesResource:
        return AdminChallengesResource(self._http)

    @cached_property
    def instances(self) -> AdminInstancesResource:
        return AdminInstancesResource(self._http)

    @cached_property
    def records(self) -> AdminRecordsResource:
        return AdminRecordsResource(self._http)

    @cached_property
    def nodes(self) -> AdminNodesResource:
        return AdminNodesResource(self._http)

    @cached_property
    def observability(self) -> AdminObservabilityResource:
        return AdminObservabilityResource(self._http)

    @cached_property
    def settings(self) -> AdminSettingsResource:
        return AdminSettingsResource(self._http)

    @cached_property
    def competition_admins(self) -> AdminCompetitionAdminsResource:
        return AdminCompetitionAdminsResource(self._http)

    @cached_property
    def competition_invites(self) -> AdminCompetitionInvitesResource:
        return AdminCompetitionInvitesResource(self._http)

    @cached_property
    def tasks(self) -> AdminTasksResource:
        return AdminTasksResource(self._http)

    @cached_property
    def scheduled_jobs(self) -> AdminScheduledJobsResource:
        return AdminScheduledJobsResource(self._http)

    def close(self) -> None:
        """Close the underlying transport.

        Only call this on a standalone client built via :meth:`connect`;
        when reached via ``PlatformClient.admin`` the transport is shared
        with, and closed by, the parent player client.
        """
        self._http.close()

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        self.close()

Admin / operator surface, grouped into resource namespaces.

Namespaces: users, competitions, announcements, achievements, challenges, instances, records, nodes, observability, settings, competition_admins, competition_invites, tasks, scheduled_jobs.

AdminClient(http: ctfy.sdk.base.BaseHttpClient)
@classmethod
def connect(cls, server_url: str, token: str = '', *, max_retries: int = 3, timeout: int = 600) -> AdminClient:

Build an AdminClient that owns its own transport.

For operator tooling that doesn't already hold a PlatformClient. The returned client is a context manager that closes its transport on exit.

def close(self) -> None:

Close the underlying transport.

Only call this on a standalone client built via connect(); when reached via PlatformClient.admin the transport is shared with — and closed by — the parent player client.
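
The ownership rule above can be illustrated without a server. The sketch below uses two stand-in classes (FakeTransport and AdminView, both hypothetical) that copy only the close / __enter__ / __exit__ shape shown in the AdminClient source. It demonstrates why a with block suits a client built via connect() but not one reached through PlatformClient.admin.

```python
class FakeTransport:
    """Stand-in for the HTTP transport; it only tracks whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class AdminView:
    """Stand-in with the same close/__enter__/__exit__ shape as AdminClient."""

    def __init__(self, http: FakeTransport) -> None:
        self._http = http

    def close(self) -> None:
        self._http.close()

    def __enter__(self) -> "AdminView":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()


# Standalone case: the admin client owns its transport, so `with` cleans up.
owned = FakeTransport()
with AdminView(owned):
    pass
owned_closed = owned.closed

# Shared case: the same `with` closes the transport the parent still needs.
shared = FakeTransport()
with AdminView(shared):
    pass
parent_still_usable = not shared.closed
```

In the shared case parent_still_usable ends up False, which is the failure the close() docstring warns about: leave cleanup to the parent player client there.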

class NodeClient(BaseHttpClient):
    """Thin sync HTTP client; one instance per node URL."""

    def __init__(
        self,
        node_url: str,
        token: str,
        *,
        timeout: int = DEFAULT_CLIENT_TIMEOUT,
    ) -> None:
        super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout)

    # -- lifecycle ----------------------------------------------------------

    def start_instance(
        self,
        *,
        challenge_id: str,
        instance_id: str,
        ttl: int,
        answers: dict[str, str],
        proxy_output_dir: str | None = None,
    ) -> dict[str, Any]:
        resp = self.request(
            "POST",
            "/instances",
            json={
                "challenge_id": challenge_id,
                "instance_id": instance_id,
                "ttl": ttl,
                "answers": answers,
                "proxy_output_dir": proxy_output_dir,
            },
        )
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def stop_instance(self, instance_id: str) -> None:
        resp = self.request("DELETE", f"/instances/{instance_id}")
        resp.raise_for_status()

    def stop_all(self) -> None:
        resp = self.request("POST", "/admin/stop-all")
        resp.raise_for_status()

    def rescan_challenges(self) -> dict[str, Any]:
        """Tell the node to drop its spec cache and re-scan challenges_dir.

        Returns ``{total, added, removed}`` so the platform can report
        the per-node outcome of a cluster-wide rescan."""
        resp = self.request("POST", "/admin/rescan-challenges")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    # -- admin pre-build (image cache warming) ------------------------------

    def build_challenge(self, challenge_id: str) -> dict[str, Any]:
        """Ask the node to pre-build images for *challenge_id*.

        Fire-and-return: the node persists ``status="building"`` and
        spawns a daemon thread; the body returned here is that initial
        state row. Polling :meth:`get_build_state` is how the platform
        learns when it lands on ``built`` / ``failed``.
        """
        resp = self.request("POST", f"/admin/challenges/{challenge_id}/build")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
        """Queue background pre-build on the node.

        With ``challenge_ids=None`` the node builds every spec it knows.
        With an explicit list (the platform scoping a bulk build to a
        competition's challenge set) only those are built. Returns
        ``{queued, skipped_built, skipped_in_progress}`` so the platform
        can report per-node what got picked up. Builds run sequentially
        on the node side; there is no fan-out across the corpus.
        """
        kwargs: dict[str, Any] = {}
        if challenge_ids is not None:
            kwargs["json"] = {"challenge_ids": challenge_ids}
        resp = self.request("POST", "/admin/challenges/build-all", **kwargs)
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def get_build_state(self) -> dict[str, Any]:
        """Fetch every per-challenge build-state row on this node.

        Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``.
        ``status`` is one of ``unbuilt`` / ``building`` / ``built`` /
        ``failed``; ``unbuilt`` placeholders are synthesised for specs
        the node has seen but never been asked to build.
        """
        resp = self.request("GET", "/admin/challenges/build-state")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def pull_challenge(self, challenge_id: str) -> dict[str, Any]:
        """Ask the node to pre-pull registry images for *challenge_id*.

        Pull-side twin of :meth:`build_challenge`: the node persists
        ``status="pulling"`` and spawns a daemon thread; poll
        :meth:`get_pull_state` for the ``pulled`` / ``failed`` landing.
        """
        resp = self.request("POST", f"/admin/challenges/{challenge_id}/pull")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
        """Queue background pre-pull on the node.

        With ``challenge_ids=None`` the node pulls every spec it knows;
        with an explicit list only those (the platform scoping to a
        competition). Returns ``{queued, skipped_pulled,
        skipped_in_progress}``.
        """
        kwargs: dict[str, Any] = {}
        if challenge_ids is not None:
            kwargs["json"] = {"challenge_ids": challenge_ids}
        resp = self.request("POST", "/admin/challenges/pull-all", **kwargs)
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    def get_pull_state(self) -> dict[str, Any]:
        """Fetch every per-challenge pull-state row on this node.

        Returns ``{"rows": [{challenge_id, status, pulled_at, error}, …]}``.
        ``status`` is one of ``unpulled`` / ``pulling`` / ``pulled`` /
        ``failed``; ``unpulled`` placeholders are synthesised for specs
        the node has seen but never been asked to pull.
        """
        resp = self.request("GET", "/admin/challenges/pull-state")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

    # -- status / health ----------------------------------------------------

    def get_status(self, instance_id: str) -> dict[str, Any]:
        resp = self.request("GET", f"/instances/{instance_id}/status")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    def check_health(self, instance_id: str) -> bool:
        resp = self.request("GET", f"/instances/{instance_id}/health")
        resp.raise_for_status()
        return bool(resp.json().get("is_healthy"))

    def node_health(self) -> dict[str, Any]:
        """Liveness + ``{running, capacity}`` for heartbeat."""
        resp = self.request("GET", "/health")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    # -- traffic / runtime --------------------------------------------------

    def get_traffic(self, instance_id: str) -> dict[str, Any]:
        """Fetch mitmproxy flow data; file lives on the node's FS."""
        resp = self.request("GET", f"/instances/{instance_id}/traffic")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    def list_containers(self, instance_id: str) -> list[dict[str, Any]]:
        """Enumerate every container in an instance's compose project.

        Backs the admin-shell feature: the platform forwards a
        ``GET /admin/instances/{id}/containers`` to the assigned node,
        which returns one row per container (challenge services and
        platform-injected sidecars alike). The WebSocket reverse-proxy
        is opened separately and does not go through ``NodeClient``.
        """
        resp = self.request("GET", f"/instances/{instance_id}/containers")
        _raise_for_node_status(resp)
        body: list[dict[str, Any]] = resp.json()
        return body

    def get_container_logs(self, instance_id: str) -> str:
        """Fetch combined container stdout/stderr for archival."""
        resp = self.request("GET", f"/instances/{instance_id}/container-logs")
        resp.raise_for_status()
        return str(resp.json().get("logs") or "")

    def get_pcap(self, instance_id: str) -> bytes:
        """Fetch the raw tcpdump capture for *instance_id*.

        Returns an empty ``bytes`` when no capture exists (sidecar
        disabled, instance never reached the running state, etc.) so
        callers can persist conditionally without try/except.
        """
        resp = self.request("GET", f"/instances/{instance_id}/pcap")
        if resp.status_code == 404:
            return b""
        resp.raise_for_status()
        return resp.content

    def get_agent_runtime(self, instance_id: str) -> dict[str, Any]:
        """Provider-agnostic runtime hints for attaching an agent (proxy URL,
        CA PEM, optional Docker-specific names). Replaces the old
        sandbox-network shape."""
        resp = self.request("GET", f"/instances/{instance_id}/agent-runtime")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    # -- per-instance rendered attachments ---------------------------------

    def list_instance_attachments(self, instance_id: str) -> dict[str, Any]:
        """List per-instance attachments rendered into the node's workdir.

        Returns the raw JSON dict; the caller projects it through
        :class:`AttachmentList` for typing. Used by the platform's
        per-instance attachment endpoint to surface team-specific
        rendered file listings."""
        resp = self.request("GET", f"/instances/{instance_id}/attachments")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

    def get_openvpn_config(self, instance_id: str) -> bytes:
        """Fetch the per-instance OpenVPN client config from the node.

        The node reads ``/etc/openvpn/client.ovpn`` out of the instance's
        ``openvpn`` service container (generated on first boot by the
        ``ctfy/openvpn-base`` image's bootstrap). Returns the body
        verbatim; the platform-side route adds the
        ``Content-Disposition`` header before re-emitting to the player.

        Raises ``httpx.HTTPStatusError`` on non-2xx. The platform's
        proxy route catches 404 and re-emits it to the player; anything
        else is treated as a 502.
        """
        resp = self.request("GET", f"/instances/{instance_id}/openvpn-config")
        resp.raise_for_status()
        return resp.content

    def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
        """Download one per-instance attachment as ``(bytes, content_type)``.

        Buffers the whole body; attachments are bounded (typical case
        is a 10 KB binary or a small text file). For huge captures the
        caller should hand the player a CDN URL instead.

        Raises ``httpx.HTTPStatusError`` on non-2xx. The platform's
        proxy route catches 404 and re-emits it to the player; anything
        else is treated as a 502."""
        resp = self.request(
            "GET",
            f"/instances/{instance_id}/attachments/{filename}",
        )
        resp.raise_for_status()
        return resp.content, resp.headers.get("content-type", "application/octet-stream")

Thin sync HTTP client; one instance per node URL.

NodeClient(node_url: str, token: str, *, timeout: int = 600)

    def __init__(
        self,
        node_url: str,
        token: str,
        *,
        timeout: int = DEFAULT_CLIENT_TIMEOUT,
    ) -> None:
        super().__init__(f"{node_url.rstrip('/')}/api/v1", token, timeout=timeout)

def start_instance(self, *, challenge_id: str, instance_id: str, ttl: int, answers: dict[str, str], proxy_output_dir: str | None = None) -> dict[str, typing.Any]:

    def start_instance(
        self,
        *,
        challenge_id: str,
        instance_id: str,
        ttl: int,
        answers: dict[str, str],
        proxy_output_dir: str | None = None,
    ) -> dict[str, Any]:
        resp = self.request(
            "POST",
            "/instances",
            json={
                "challenge_id": challenge_id,
                "instance_id": instance_id,
                "ttl": ttl,
                "answers": answers,
                "proxy_output_dir": proxy_output_dir,
            },
        )
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

def stop_instance(self, instance_id: str) -> None:

    def stop_instance(self, instance_id: str) -> None:
        resp = self.request("DELETE", f"/instances/{instance_id}")
        resp.raise_for_status()

def stop_all(self) -> None:

    def stop_all(self) -> None:
        resp = self.request("POST", "/admin/stop-all")
        resp.raise_for_status()

def rescan_challenges(self) -> dict[str, typing.Any]:

    def rescan_challenges(self) -> dict[str, Any]:
        """Tell the node to drop its spec cache and re-scan challenges_dir.

        Returns ``{total, added, removed}`` so the platform can report
        the per-node outcome of a cluster-wide rescan."""
        resp = self.request("POST", "/admin/rescan-challenges")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

Tell the node to drop its spec cache and re-scan challenges_dir.

Returns {total, added, removed} so the platform can report the per-node outcome of a cluster-wide rescan.

def build_challenge(self, challenge_id: str) -> dict[str, typing.Any]:

    def build_challenge(self, challenge_id: str) -> dict[str, Any]:
        """Ask the node to pre-build images for *challenge_id*.

        Fires-and-returns: the node persists ``status="building"`` and
        spawns a daemon thread; the body returned here is that initial
        state row. Polling :meth:`get_build_state` is how the platform
        learns when it lands on ``built`` / ``failed``.
        """
        resp = self.request("POST", f"/admin/challenges/{challenge_id}/build")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

Ask the node to pre-build images for challenge_id.

Fires-and-returns: the node persists status="building" and spawns a daemon thread; the body returned here is that initial state row. Polling get_build_state() is how the platform learns when it lands on built / failed.
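
A minimal sketch of the polling loop this docstring describes. Only the `{"rows": [...]}` shape and the `built` / `failed` statuses come from the documented API; the `wait_for_build` helper, its `interval` / `timeout` / `sleep` parameters, and the `FakeNode` stand-in are hypothetical and exist so the example runs without a node.

```python
import time
from typing import Any, Callable


def wait_for_build(
    get_build_state: Callable[[], dict[str, Any]],
    challenge_id: str,
    *,
    interval: float = 2.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict[str, Any]:
    """Poll until the row for challenge_id reaches built or failed.

    Hypothetical helper: only the row shape comes from the documented API.
    """
    deadline = time.monotonic() + timeout
    while True:
        rows = get_build_state().get("rows", [])
        row = next((r for r in rows if r.get("challenge_id") == challenge_id), None)
        if row is not None and row.get("status") in ("built", "failed"):
            return row
        if time.monotonic() >= deadline:
            raise TimeoutError(f"build of {challenge_id!r} did not finish in {timeout}s")
        sleep(interval)


class FakeNode:
    """Stand-in for NodeClient that reports 'building' twice, then 'built'."""

    def __init__(self) -> None:
        self.calls = 0

    def get_build_state(self) -> dict[str, Any]:
        self.calls += 1
        status = "built" if self.calls >= 3 else "building"
        return {"rows": [{"challenge_id": "web-01", "status": status, "built_at": None, "error": None}]}


node = FakeNode()
# Inject a no-op sleep so the demo finishes instantly.
row = wait_for_build(node.get_build_state, "web-01", sleep=lambda _seconds: None)
print(row["status"], node.calls)
```

Against a real node, the first argument would be the bound `get_build_state` method of a `NodeClient`, and the default `time.sleep` would pace the requests.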

def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, typing.Any]:

    def build_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
        """Queue background pre-build on the node.

        With ``challenge_ids=None`` the node builds every spec it knows.
        With an explicit list (the platform scoping a bulk build to a
        competition's challenge set) only those are built. Returns
        ``{queued, skipped_built, skipped_in_progress}`` so the platform
        can report per-node what got picked up. Sequential on the node
        side — no fan-out across the corpus.
        """
        kwargs: dict[str, Any] = {}
        if challenge_ids is not None:
            kwargs["json"] = {"challenge_ids": challenge_ids}
        resp = self.request("POST", "/admin/challenges/build-all", **kwargs)
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

Queue background pre-build on the node.

With challenge_ids=None the node builds every spec it knows. With an explicit list (the platform scoping a bulk build to a competition's challenge set) only those are built. Returns {queued, skipped_built, skipped_in_progress} so the platform can report per-node what got picked up. Sequential on the node side — no fan-out across the corpus.

def get_build_state(self) -> dict[str, typing.Any]:

    def get_build_state(self) -> dict[str, Any]:
        """Fetch every per-challenge build-state row on this node.

        Returns ``{"rows": [{challenge_id, status, built_at, error}, …]}``.
        ``status`` is one of ``unbuilt`` / ``building`` / ``built`` /
        ``failed``; ``unbuilt`` placeholders are synthesised for specs
        the node has seen but never been asked to build.
        """
        resp = self.request("GET", "/admin/challenges/build-state")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

Fetch every per-challenge build-state row on this node.

Returns {"rows": [{challenge_id, status, built_at, error}, …]}. status is one of unbuilt / building / built / failed; unbuilt placeholders are synthesised for specs the node has seen but never been asked to build.
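
As a rough illustration of consuming that payload, the sketch below tallies rows per status and collects the errors of failed builds. The row keys are the documented ones; the helper names, the sample challenge ids, and the error text are made up for the example, and `built_at` is left as `None` because its format is not described here.

```python
from collections import Counter
from typing import Any


def tally_build_state(state: dict[str, Any]) -> Counter[str]:
    """Count build-state rows per status value."""
    return Counter(row["status"] for row in state.get("rows", []))


def failed_builds(state: dict[str, Any]) -> dict[str, str]:
    """Map challenge_id -> error text for every failed row."""
    return {
        row["challenge_id"]: row.get("error") or ""
        for row in state.get("rows", [])
        if row["status"] == "failed"
    }


# Sample payload in the documented shape; ids and error text are invented.
sample = {
    "rows": [
        {"challenge_id": "web-01", "status": "built", "built_at": None, "error": None},
        {"challenge_id": "pwn-02", "status": "failed", "built_at": None, "error": "docker build exited 1"},
        {"challenge_id": "rev-03", "status": "unbuilt", "built_at": None, "error": None},
    ]
}

counts = tally_build_state(sample)
errors = failed_builds(sample)
print(dict(counts))
print(errors)
```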

def pull_challenge(self, challenge_id: str) -> dict[str, typing.Any]:

    def pull_challenge(self, challenge_id: str) -> dict[str, Any]:
        """Ask the node to pre-pull registry images for *challenge_id*.

        Pull-side twin of :meth:`build_challenge`: the node persists
        ``status="pulling"`` and spawns a daemon thread; poll
        :meth:`get_pull_state` for the ``pulled`` / ``failed`` landing.
        """
        resp = self.request("POST", f"/admin/challenges/{challenge_id}/pull")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

Ask the node to pre-pull registry images for challenge_id.

Pull-side twin of build_challenge(): the node persists status="pulling" and spawns a daemon thread; poll get_pull_state() for the pulled / failed landing.

def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, typing.Any]:

    def pull_all_challenges(self, challenge_ids: list[str] | None = None) -> dict[str, Any]:
        """Queue background pre-pull on the node.

        With ``challenge_ids=None`` the node pulls every spec it knows;
        with an explicit list only those (the platform scoping to a
        competition). Returns ``{queued, skipped_pulled,
        skipped_in_progress}``.
        """
        kwargs: dict[str, Any] = {}
        if challenge_ids is not None:
            kwargs["json"] = {"challenge_ids": challenge_ids}
        resp = self.request("POST", "/admin/challenges/pull-all", **kwargs)
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

Queue background pre-pull on the node.

With challenge_ids=None the node pulls every spec it knows; with an explicit list only those (the platform scoping to a competition). Returns {queued, skipped_pulled, skipped_in_progress}.

def get_pull_state(self) -> dict[str, typing.Any]:

    def get_pull_state(self) -> dict[str, Any]:
        """Fetch every per-challenge pull-state row on this node.

        Returns ``{"rows": [{challenge_id, status, pulled_at, error}, …]}``.
        ``status`` is one of ``unpulled`` / ``pulling`` / ``pulled`` /
        ``failed``; ``unpulled`` placeholders are synthesised for specs
        the node has seen but never been asked to pull.
        """
        resp = self.request("GET", "/admin/challenges/pull-state")
        _raise_for_node_status(resp)
        body: dict[str, Any] = resp.json()
        return body

Fetch every per-challenge pull-state row on this node.

Returns {"rows": [{challenge_id, status, pulled_at, error}, …]}. status is one of unpulled / pulling / pulled / failed; unpulled placeholders are synthesised for specs the node has seen but never been asked to pull.

def get_status(self, instance_id: str) -> dict[str, typing.Any]:

    def get_status(self, instance_id: str) -> dict[str, Any]:
        resp = self.request("GET", f"/instances/{instance_id}/status")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

def check_health(self, instance_id: str) -> bool:

    def check_health(self, instance_id: str) -> bool:
        resp = self.request("GET", f"/instances/{instance_id}/health")
        resp.raise_for_status()
        return bool(resp.json().get("is_healthy"))

def node_health(self) -> dict[str, typing.Any]:

    def node_health(self) -> dict[str, Any]:
        """Liveness + ``{running, capacity}`` for heartbeat."""
        resp = self.request("GET", "/health")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

Liveness + {running, capacity} for heartbeat.

def get_traffic(self, instance_id: str) -> dict[str, typing.Any]:

    def get_traffic(self, instance_id: str) -> dict[str, Any]:
        """Fetch mitmproxy flow data; file lives on the node's FS."""
        resp = self.request("GET", f"/instances/{instance_id}/traffic")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

Fetch mitmproxy flow data; file lives on the node's FS.

def list_containers(self, instance_id: str) -> list[dict[str, typing.Any]]:

    def list_containers(self, instance_id: str) -> list[dict[str, Any]]:
        """Enumerate every container in an instance's compose project.

        Backs the admin-shell feature: the platform forwards a
        ``GET /admin/instances/{id}/containers`` to the assigned node,
        which returns one row per container (challenge services and
        platform-injected sidecars alike). The WebSocket reverse-proxy
        is opened separately and does not go through ``NodeClient``.
        """
        resp = self.request("GET", f"/instances/{instance_id}/containers")
        _raise_for_node_status(resp)
        body: list[dict[str, Any]] = resp.json()
        return body

Enumerate every container in an instance's compose project.

Backs the admin-shell feature: the platform forwards a GET /admin/instances/{id}/containers to the assigned node, which returns one row per container (challenge services and platform-injected sidecars alike). The WebSocket reverse-proxy is opened separately and does not go through NodeClient.

def get_container_logs(self, instance_id: str) -> str:

    def get_container_logs(self, instance_id: str) -> str:
        """Fetch combined container stdout/stderr for archival."""
        resp = self.request("GET", f"/instances/{instance_id}/container-logs")
        resp.raise_for_status()
        return str(resp.json().get("logs") or "")

Fetch combined container stdout/stderr for archival.

def get_pcap(self, instance_id: str) -> bytes:

    def get_pcap(self, instance_id: str) -> bytes:
        """Fetch the raw tcpdump capture for *instance_id*.

        Returns an empty ``bytes`` when no capture exists (sidecar
        disabled, instance never reached the running state, etc.) so
        callers can persist conditionally without try/except.
        """
        resp = self.request("GET", f"/instances/{instance_id}/pcap")
        if resp.status_code == 404:
            return b""
        resp.raise_for_status()
        return resp.content

Fetch the raw tcpdump capture for instance_id.

Returns an empty bytes when no capture exists (sidecar disabled, instance never reached the running state, etc.) so callers can persist conditionally without try/except.

def get_agent_runtime(self, instance_id: str) -> dict[str, typing.Any]:

    def get_agent_runtime(self, instance_id: str) -> dict[str, Any]:
        """Provider-agnostic runtime hints for attaching an agent (proxy URL,
        CA PEM, optional Docker-specific names). Replaces the old
        sandbox-network shape."""
        resp = self.request("GET", f"/instances/{instance_id}/agent-runtime")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

Provider-agnostic runtime hints for attaching an agent (proxy URL, CA PEM, optional Docker-specific names). Replaces the old sandbox-network shape.

def list_instance_attachments(self, instance_id: str) -> dict[str, typing.Any]:

    def list_instance_attachments(self, instance_id: str) -> dict[str, Any]:
        """List per-instance attachments rendered into the node's workdir.

        Returns the raw JSON dict — caller projects through
        :class:`AttachmentList` for typing. Used by the platform's
        per-instance attachment endpoint to surface team-specific
        rendered file listings."""
        resp = self.request("GET", f"/instances/{instance_id}/attachments")
        resp.raise_for_status()
        body: dict[str, Any] = resp.json()
        return body

List per-instance attachments rendered into the node's workdir.

Returns the raw JSON dict — caller projects through AttachmentList for typing. Used by the platform's per-instance attachment endpoint to surface team-specific rendered file listings.

def get_openvpn_config(self, instance_id: str) -> bytes:

    def get_openvpn_config(self, instance_id: str) -> bytes:
        """Fetch the per-instance OpenVPN client config from the node.

        The node reads ``/etc/openvpn/client.ovpn`` out of the instance's
        ``openvpn`` service container (generated on first boot by the
        ``ctfy/openvpn-base`` image's bootstrap). Returns the body
        verbatim — the platform-side route adds the
        ``Content-Disposition`` header before re-emitting to the player.

        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
        proxy route catches 404 and re-emits to the player, anything
        else is treated as a 502.
        """
        resp = self.request("GET", f"/instances/{instance_id}/openvpn-config")
        resp.raise_for_status()
        return resp.content

Fetch the per-instance OpenVPN client config from the node.

The node reads /etc/openvpn/client.ovpn out of the instance's openvpn service container (generated on first boot by the ctfy/openvpn-base image's bootstrap). Returns the body verbatim — the platform-side route adds the Content-Disposition header before re-emitting to the player.

Raises httpx.HTTPStatusError on non-2xx — the platform's proxy route catches 404 and re-emits to the player, anything else is treated as a 502.
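
The error contract above (404 passed through, every other failure surfaced as 502) can be sketched without a running platform. This is not the platform's actual route: `UpstreamError` is a stand-in for `httpx.HTTPStatusError` so the example needs no third-party install, and `proxy_to_player` is a hypothetical name.

```python
from typing import Callable


class UpstreamError(Exception):
    """Stand-in for httpx.HTTPStatusError carrying the node's status code."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"node returned {status_code}")
        self.status_code = status_code


def proxy_to_player(fetch: Callable[[], bytes]) -> tuple[int, bytes]:
    """Return (status, body) as a proxy route following the documented rule might."""
    try:
        return 200, fetch()
    except UpstreamError as exc:
        if exc.status_code == 404:
            return 404, b""  # missing config: re-emit the 404 to the player
        return 502, b""      # anything else: the node misbehaved, report bad gateway


def ok() -> bytes:
    return b"client\ndev tun\n"


def missing() -> bytes:
    raise UpstreamError(404)


def broken() -> bytes:
    raise UpstreamError(500)


results = [proxy_to_player(f)[0] for f in (ok, missing, broken)]
print(results)
```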

def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:

    def download_instance_attachment(self, instance_id: str, filename: str) -> tuple[bytes, str]:
        """Download one per-instance attachment as ``(bytes, content_type)``.

        Buffers the whole body — attachments are bounded (typical case
        is a 10 KB binary or a small text file). For huge captures the
        caller should hand the player a CDN URL instead.

        Raises ``httpx.HTTPStatusError`` on non-2xx — the platform's
        proxy route catches 404 and re-emits to the player, anything
        else is a 502."""
        resp = self.request(
            "GET",
            f"/instances/{instance_id}/attachments/{filename}",
        )
        resp.raise_for_status()
        return resp.content, resp.headers.get("content-type", "application/octet-stream")

Download one per-instance attachment as (bytes, content_type).

Buffers the whole body — attachments are bounded (typical case is a 10 KB binary or a small text file). For huge captures the caller should hand the player a CDN URL instead.

Raises httpx.HTTPStatusError on non-2xx — the platform's proxy route catches 404 and re-emits to the player, anything else is a 502.

class PlatformClient(ctfy.sdk.base.BaseHttpClient):

class PlatformClient(BaseHttpClient):
    """Player-facing HTTP client for the ctfy platform.

    Methods are grouped into resource namespaces (``client.teams.list()``, …)
    plus a per-competition scope via ``client.competition(id)``. The admin
    surface is a separate :class:`~ctfy.sdk.admin.AdminClient` reached via
    :attr:`admin`. Used by the CLI, the SDK, the MCP server, and external callers.
    """

    def __init__(
        self,
        server_url: str,
        token: str = "",
        max_retries: int = 3,
        timeout: int = DEFAULT_CLIENT_TIMEOUT,
    ):
        self._url = server_url.rstrip("/")
        super().__init__(f"{self._url}/api/v1", token, timeout=timeout, max_retries=max_retries)

    # -- resource namespaces ------------------------------------------------

    @cached_property
    def teams(self) -> TeamsResource:
        """Public team discovery + profile reads."""
        return TeamsResource(self)

    @cached_property
    def users(self) -> UsersResource:
        """Public per-user profile reads."""
        return UsersResource(self)

    @cached_property
    def me(self) -> MeResource:
        """The calling user's profile, inbox, account, and progress."""
        return MeResource(self)

    @cached_property
    def auth(self) -> AuthResource:
        """Password auth, OAuth identities, API tokens, sign-in discovery."""
        return AuthResource(self)

    @cached_property
    def achievements(self) -> AchievementsResource:
        """Public badge catalog, recent-unlock feed, easter eggs."""
        return AchievementsResource(self)

    @cached_property
    def challenges(self) -> ChallengesResource:
        """Global challenge catalog, attachments, facets, feedback chips."""
        return ChallengesResource(self)

    @cached_property
    def competitions(self) -> CompetitionsResource:
        """Discover competitions (``list`` / ``get``). To act within one, use
        :meth:`competition`."""
        return CompetitionsResource(self)

    def competition(self, competition_id: str) -> Competition:
        """Scope to one competition. The inherently competition-scoped
        operations — register, your team (captain tooling + invites +
        join-requests), the comp's challenges, team search, and standings —
        hang off the returned :class:`~ctfy.sdk.competition.Competition`
        handle, so ``competition_id`` is named once instead of on every call.
        (Instance lifecycle and answer submission stay on :attr:`instances` /
        :attr:`submissions`, keyed by ``instance_id``.)"""
        return Competition(self, competition_id)

    @cached_property
    def instances(self) -> InstancesResource:
        """Launch / control / inspect challenge instances by instance id
        (the competition is inferred server-side)."""
        return InstancesResource(self)

    @cached_property
    def submissions(self) -> SubmissionsResource:
        """Submit / verify answers + list submissions (by instance id), plus
        the instance-less QA challenges."""
        return SubmissionsResource(self)

    @cached_property
    def scoreboard(self) -> ScoreboardResource:
        """Global standings, per-challenge stats, persisted snapshots."""
        return ScoreboardResource(self)

    @cached_property
    def activities(self) -> ActivitiesResource:
        """Platform activity log + histogram."""
        return ActivitiesResource(self)

    @cached_property
    def nodes(self) -> NodesResource:
        """Public worker-node list (operator node mgmt is on ``admin.nodes``)."""
        return NodesResource(self)

    @cached_property
    def admin(self) -> AdminClient:
        """Admin / operator surface (separate, role-gated server-side).

        Shares this client's transport. See :class:`~ctfy.sdk.admin.AdminClient`.
        """
        return AdminClient(self)

    # -- top-level convenience: server status / realtime / version ----------

    def health(self) -> HealthResponse:
        resp = self.request("GET", "/health")
        _raise_for_status(resp)
        return HealthResponse.model_validate(resp.json())

    def get_meta(self) -> MetaResponse:
        """Server identity + challenge repo SHA + build version.
        Admin tokens additionally see cluster capacity + team / solve
        counts in the same payload."""
        resp = self.request("GET", "/meta")
        _raise_for_status(resp)
        return MetaResponse.model_validate(resp.json())

    def cluster_info(self) -> ClusterInfo:
        """Aggregate worker-node capacity / utilisation (the public
        capacity banner; no per-node detail)."""
        resp = self.request("GET", "/cluster-info")
        _raise_for_status(resp)
        return ClusterInfo.model_validate(resp.json())

    def check_server_compatibility(self, *, health: HealthResponse | None = None) -> VersionCheck:
        """Compare this client's ``ctfy`` version against the server's.

        Uses the cheap unauthenticated ``/health`` probe (pass an
        already-fetched :class:`HealthResponse` to avoid a second round
        trip). Pure classification — never prints, never raises on a
        mismatch, never mutates anything. The caller (CLI, MCP, harness)
        decides what to do with :class:`~ctfy.core.version.VersionCheck`
        (typically: print ``.message`` to stderr when not ``.quiet``).

        A server too old to report its version yields
        :attr:`Compatibility.UNKNOWN` (``health.version == ""``), which
        is ``.quiet`` — so this stays silent against legacy servers
        rather than crying wolf.
        """
        h = health if health is not None else self.health()
        return check_compatibility(package_version(), h.version)

    def events(self, *, auto_reconnect: bool = True) -> EventStream:
        """Open the platform SSE event stream.

        Yields ``{"event": <name>, "data": <dict>}`` for each frame.
        Filters: team-scoped events for every team the caller's user is
        on (across every per-comp competition), plus all global events;
        admin tokens additionally see admin-only frames.

        Usage::

            with client.events() as stream:
                for event in stream:
                    if event["event"] == "solve":
                        ...

        Auto-reconnects on transient network errors with exponential
        backoff (1s → 30s capped). Set ``auto_reconnect=False`` to make
        the iterator raise instead.
        """
        token = self._token

        def _factory() -> httpx.Client:
            # New client per session so a reconnect after a stale
            # connection drops fresh sockets, not warmed-over ones.
            return httpx.Client(base_url=self._base_url, timeout=None)

        return EventStream(
            client_factory=_factory,
            path="/events",
            token=token,
            auto_reconnect=auto_reconnect,
        )

Player-facing HTTP client for the ctfy platform.

Methods are grouped into resource namespaces (client.teams.list(), …) plus a per-competition scope via client.competition(id). The admin surface is a separate ~ctfy.sdk.admin.AdminClient reached via admin. Used by the CLI, the SDK, the MCP server, and external callers.
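
The namespace layout relies on `functools.cached_property`, as the source listing shows: each namespace object is built on first access and then reused, while `competition(id)` is an ordinary method that returns a new handle per call. A toy reduction of that pattern follows. `ToyClient`, `CompetitionHandle`, and the toy `TeamsResource` here are simplified stand-ins, not the SDK classes.

```python
from functools import cached_property


class TeamsResource:
    """Toy namespace; the real TeamsResource methods are not reproduced here."""

    def __init__(self, client: "ToyClient") -> None:
        self._client = client


class CompetitionHandle:
    """Toy per-competition handle that remembers the id it was scoped to."""

    def __init__(self, client: "ToyClient", competition_id: str) -> None:
        self._client = client
        self.competition_id = competition_id


class ToyClient:
    @cached_property
    def teams(self) -> TeamsResource:
        # Built on first access, then stored on the instance and reused.
        return TeamsResource(self)

    def competition(self, competition_id: str) -> CompetitionHandle:
        # A plain method: every call returns a fresh handle for that id.
        return CompetitionHandle(self, competition_id)


client = ToyClient()
same_namespace = client.teams is client.teams
fresh_handles = client.competition("c1") is not client.competition("c1")
print(same_namespace, fresh_handles)
```

Since the handle only stores the client and an id, creating one per call is cheap, which is presumably why it is not cached.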

PlatformClient(server_url: str, token: str = '', max_retries: int = 3, timeout: int = 600)

    def __init__(
        self,
        server_url: str,
        token: str = "",
        max_retries: int = 3,
        timeout: int = DEFAULT_CLIENT_TIMEOUT,
    ):
        self._url = server_url.rstrip("/")
        super().__init__(f"{self._url}/api/v1", token, timeout=timeout, max_retries=max_retries)

    @cached_property
    def teams(self) -> TeamsResource:
        """Public team discovery + profile reads."""
        return TeamsResource(self)

Public team discovery + profile reads.

    @cached_property
    def users(self) -> UsersResource:
        """Public per-user profile reads."""
        return UsersResource(self)

Public per-user profile reads.

    @cached_property
    def me(self) -> MeResource:
        """The calling user's profile, inbox, account, and progress."""
        return MeResource(self)

The calling user's profile, inbox, account, and progress.

    @cached_property
    def auth(self) -> AuthResource:
        """Password auth, OAuth identities, API tokens, sign-in discovery."""
        return AuthResource(self)

Password auth, OAuth identities, API tokens, sign-in discovery.

    @cached_property
    def achievements(self) -> AchievementsResource:
        """Public badge catalog, recent-unlock feed, easter eggs."""
        return AchievementsResource(self)

Public badge catalog, recent-unlock feed, easter eggs.

    @cached_property
    def challenges(self) -> ChallengesResource:
        """Global challenge catalog, attachments, facets, feedback chips."""
        return ChallengesResource(self)

Global challenge catalog, attachments, facets, feedback chips.

    @cached_property
    def competitions(self) -> CompetitionsResource:
        """Discover competitions (``list`` / ``get``). To act within one, use
        :meth:`competition`."""
        return CompetitionsResource(self)

Discover competitions (list / get). To act within one, use competition().

def competition(self, competition_id: str) -> ctfy.sdk.competition.Competition:

    def competition(self, competition_id: str) -> Competition:
        """Scope to one competition. The inherently competition-scoped
        operations — register, your team (captain tooling + invites +
        join-requests), the comp's challenges, team search, and standings —
        hang off the returned :class:`~ctfy.sdk.competition.Competition`
        handle, so ``competition_id`` is named once instead of on every call.
        (Instance lifecycle and answer submission stay on :attr:`instances` /
        :attr:`submissions`, keyed by ``instance_id``.)"""
        return Competition(self, competition_id)

Scope to one competition. The inherently competition-scoped operations — register, your team (captain tooling + invites + join-requests), the comp's challenges, team search, and standings — hang off the returned ~ctfy.sdk.competition.Competition handle, so competition_id is named once instead of on every call. (Instance lifecycle and answer submission stay on instances / submissions, keyed by instance_id.)

    @cached_property
    def instances(self) -> InstancesResource:
        """Launch / control / inspect challenge instances by instance id
        (the competition is inferred server-side)."""
        return InstancesResource(self)

Launch / control / inspect challenge instances by instance id (the competition is inferred server-side).

    @cached_property
    def submissions(self) -> SubmissionsResource:
        """Submit / verify answers + list submissions (by instance id), plus
        the instance-less QA challenges."""
        return SubmissionsResource(self)

Submit / verify answers + list submissions (by instance id), plus the instance-less QA challenges.

    @cached_property
    def scoreboard(self) -> ScoreboardResource:
        """Global standings, per-challenge stats, persisted snapshots."""
        return ScoreboardResource(self)

Global standings, per-challenge stats, persisted snapshots.

    @cached_property
    def activities(self) -> ActivitiesResource:
        """Platform activity log + histogram."""
        return ActivitiesResource(self)

Platform activity log + histogram.

    @cached_property
    def nodes(self) -> NodesResource:
        """Public worker-node list (operator node mgmt is on ``admin.nodes``)."""
        return NodesResource(self)

Public worker-node list (operator node mgmt is on admin.nodes).

admin: AdminClient

    @cached_property
    def admin(self) -> AdminClient:
        """Admin / operator surface (separate, role-gated server-side).

        Shares this client's transport. See :class:`~ctfy.sdk.admin.AdminClient`.
        """
        return AdminClient(self)

Admin / operator surface (separate, role-gated server-side).

Shares this client's transport. See ~ctfy.sdk.admin.AdminClient.

def health(self) -> ctfy.server.models.HealthResponse:

    def health(self) -> HealthResponse:
        resp = self.request("GET", "/health")
        _raise_for_status(resp)
        return HealthResponse.model_validate(resp.json())

def get_meta(self) -> ctfy.server.models.MetaResponse:

    def get_meta(self) -> MetaResponse:
        """Server identity + challenge repo SHA + build version.
        Admin tokens additionally see cluster capacity + team / solve
        counts in the same payload."""
        resp = self.request("GET", "/meta")
        _raise_for_status(resp)
        return MetaResponse.model_validate(resp.json())

Server identity + challenge repo SHA + build version. Admin tokens additionally see cluster capacity + team / solve counts in the same payload.

def cluster_info(self) -> ctfy.server.models.ClusterInfo:

    def cluster_info(self) -> ClusterInfo:
        """Aggregate worker-node capacity / utilisation (the public
        capacity banner; no per-node detail)."""
        resp = self.request("GET", "/cluster-info")
        _raise_for_status(resp)
        return ClusterInfo.model_validate(resp.json())

Aggregate worker-node capacity / utilisation (the public capacity banner; no per-node detail).

def check_server_compatibility(self, *, health: ctfy.server.models.HealthResponse | None = None) -> ctfy.core.version.VersionCheck:

    def check_server_compatibility(self, *, health: HealthResponse | None = None) -> VersionCheck:
        """Compare this client's ``ctfy`` version against the server's.

        Uses the cheap unauthenticated ``/health`` probe (pass an
        already-fetched :class:`HealthResponse` to avoid a second round
        trip). Pure classification — never prints, never raises on a
        mismatch, never mutates anything. The caller (CLI, MCP, harness)
        decides what to do with :class:`~ctfy.core.version.VersionCheck`
        (typically: print ``.message`` to stderr when not ``.quiet``).

        A server too old to report its version yields
        :attr:`Compatibility.UNKNOWN` (``health.version == ""``), which
        is ``.quiet`` — so this stays silent against legacy servers
        rather than crying wolf.
        """
        h = health if health is not None else self.health()
        return check_compatibility(package_version(), h.version)

Compare this client's ctfy version against the server's.

Uses the cheap unauthenticated /health probe (pass an already-fetched HealthResponse to avoid a second round trip). Pure classification — never prints, never raises on a mismatch, never mutates anything. The caller (CLI, MCP, harness) decides what to do with ~ctfy.core.version.VersionCheck (typically: print .message to stderr when not .quiet).

A server too old to report its version yields Compatibility.UNKNOWN (health.version == ""), which is .quiet — so this stays silent against legacy servers rather than crying wolf.
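
The "classify, then let the caller decide" contract described above can be sketched in a few lines. Everything here is hypothetical: `Compat`, `Check`, and `classify` are illustration names, not the real `ctfy.core.version` types, and the comparison rule (matching major.minor) is an assumption, since the reference does not say how versions are compared. Only the empty-version case and the `.quiet` / `.message` attributes come from the text.

```python
import sys
from dataclasses import dataclass
from enum import Enum


class Compat(Enum):  # hypothetical stand-in for Compatibility
    OK = "ok"
    MISMATCH = "mismatch"
    UNKNOWN = "unknown"


@dataclass(frozen=True)
class Check:  # hypothetical stand-in for VersionCheck
    status: Compat
    message: str

    @property
    def quiet(self) -> bool:
        # Assumption: only a definite mismatch is worth surfacing.
        return self.status is not Compat.MISMATCH


def classify(client_version: str, server_version: str) -> Check:
    """Pure classification: no printing, no raising, no side effects."""
    if server_version == "":
        # Legacy server that does not report a version.
        return Check(Compat.UNKNOWN, "server did not report a version")
    # Assumed rule: compatible when major.minor match.
    if client_version.split(".")[:2] == server_version.split(".")[:2]:
        return Check(Compat.OK, "")
    return Check(
        Compat.MISMATCH,
        f"client {client_version} does not match server {server_version}",
    )


# The caller owns the policy: here, warn on stderr unless the result is quiet.
check = classify("1.4.2", "")
if not check.quiet:
    print(check.message, file=sys.stderr)
```

Keeping the function free of I/O is what lets the CLI, the MCP server, and a test harness each apply a different policy to the same result.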

def events(self, *, auto_reconnect: bool = True) -> ctfy.sdk.events.EventStream:
    def events(self, *, auto_reconnect: bool = True) -> EventStream:
        """Open the platform SSE event stream.

        Yields ``{"event": <name>, "data": <dict>}`` for each frame.
        Filters: team-scoped events for every team the caller's user is
        on (across every per-comp competition), plus all global events;
        admin tokens additionally see admin-only frames.

        Usage::

            with client.events() as stream:
                for event in stream:
                    if event["event"] == "solve":
                        ...

        Auto-reconnects on transient network errors with exponential
        backoff (1s → 30s capped). Set ``auto_reconnect=False`` to make
        the iterator raise instead.
        """
        token = self._token

        def _factory() -> httpx.Client:
            # New client per session, so a reconnect after a stale
            # connection drops gets fresh sockets, not warmed-over ones.
            return httpx.Client(base_url=self._base_url, timeout=None)

        return EventStream(
            client_factory=_factory,
            path="/events",
            token=token,
            auto_reconnect=auto_reconnect,
        )

Open the platform SSE event stream.

Yields {"event": <name>, "data": <dict>} for each frame. Filters: team-scoped events for every team the caller's user is on (across every per-comp competition), plus all global events; admin tokens additionally see admin-only frames.
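
To make the frame shape concrete, here is a minimal sketch of turning raw Server-Sent Events lines into `{"event": ..., "data": ...}` dicts. It follows the generic SSE wire format (`event:` and `data:` fields, a blank line ending each frame, lines starting with `:` as comments) and assumes the payload is JSON. `iter_frames` is a hypothetical helper, not the SDK's `EventStream`, which may parse differently.

```python
import json
from typing import Iterable, Iterator


def iter_frames(lines: Iterable[str]) -> Iterator[dict]:
    """Group SSE lines into frames shaped like {"event": name, "data": dict}."""
    event_name = "message"  # SSE default when no "event:" field is sent
    data_lines = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current frame.
            if data_lines:
                yield {"event": event_name, "data": json.loads("\n".join(data_lines))}
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment / keep-alive line
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The format allows one optional space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)


sample = [
    "event: solve\n",
    'data: {"team": 7}\n',
    "\n",
    ": keep-alive\n",
    'data: {"ok": true}\n',
    "\n",
]
frames = list(iter_frames(sample))
```

With this sample, `frames` holds a `solve` event followed by an unnamed frame that falls back to the default `message` event name; the `team` and `ok` payload keys are made up for the illustration.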

Usage::

with client.events() as stream:
    for event in stream:
        if event["event"] == "solve":
            ...

Auto-reconnects on transient network errors with exponential backoff (1s → 30s capped). Set auto_reconnect=False to make the iterator raise instead.
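
The reconnect schedule can be pictured as a capped geometric sequence. The sketch below assumes the delay doubles on each attempt and that no jitter is applied; the reference only states "exponential backoff (1s → 30s capped)", so the factor and the `backoff_delays` helper itself are illustrative assumptions rather than the SDK's implementation.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(
    initial: float = 1.0, cap: float = 30.0, factor: float = 2.0
) -> Iterator[float]:
    """Yield reconnect delays in seconds: grow by `factor`, never exceed `cap`."""
    delay = initial
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


first_seven = list(islice(backoff_delays(), 7))
print(first_seven)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

A real reconnect loop would sleep for each yielded delay before retrying and restart the sequence after a successful connection; whether `EventStream` resets its delay that way is not stated in this reference.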