ctfy.sdk.admin_resources.challenges

client.admin.challenges — catalog rescan, pre-build, stats, feedback audit.

  1"""``client.admin.challenges`` — catalog rescan, pre-build, stats, feedback audit."""
  2
  3from __future__ import annotations
  4
  5from typing import Any
  6
  7from ctfy.sdk._helpers import _extract_items, _raise_for_status
  8from ctfy.sdk.base import BaseHttpClient
  9from ctfy.server.models import (
 10    AdminChallengeLatencyBucket,
 11    AdminChallengeStatsRow,
 12    AdminChallengeTimeseries,
 13    AdminFeedbackRow,
 14    ChallengeBuildKickoffResponse,
 15    ChallengeBuildStateResponse,
 16    QuestionAttemptResetInfo,
 17)
 18
 19
 20class AdminChallengesResource:
 21    """Operator challenge tooling: reload the catalog, pre-build on nodes,
 22    inspect attempt/solve stats, audit feedback, reset attempt caps."""
 23
 24    def __init__(self, http: BaseHttpClient) -> None:
 25        self._http = http
 26
 27    def rescan(self) -> dict[str, Any]:
 28        """Reload the on-disk challenge catalog without a restart.
 29
 30        Drops the platform's process-lifetime spec cache, re-scans
 31        ``challenges_dir``, and fans the rescan out to every online
 32        worker node. Returns ``{total, added, removed, nodes}`` —
 33        the platform's post-rescan counts plus the per-node fan-out
 34        outcome. Wraps ``POST /admin/challenges/rescan``."""
 35        resp = self._http.request("POST", "/admin/challenges/rescan")
 36        _raise_for_status(resp)
 37        body: dict[str, Any] = resp.json()
 38        return body
 39
 40    def build(self, challenge_id: str) -> ChallengeBuildKickoffResponse:
 41        """Kick off a single-challenge pre-build on every online node.
 42
 43        Returns the per-node kickoff outcomes. Each node spawns a
 44        background thread; poll :meth:`build_state` to watch
 45        ``building`` → ``built`` / ``failed``. Wraps
 46        ``POST /admin/challenges/{id}/build``."""
 47        resp = self._http.request("POST", f"/admin/challenges/{challenge_id}/build")
 48        _raise_for_status(resp)
 49        return ChallengeBuildKickoffResponse.model_validate(resp.json())
 50
 51    def build_all(self) -> ChallengeBuildKickoffResponse:
 52        """Kick off a bulk pre-build on every online node.
 53
 54        Each node sequentially pre-builds every spec not already in
 55        ``built`` / ``building``. Returns per-node kickoff outcomes
 56        with ``queued`` / ``skipped_built`` / ``skipped_in_progress``
 57        lists. Wraps ``POST /admin/challenges/build-all``."""
 58        resp = self._http.request("POST", "/admin/challenges/build-all")
 59        _raise_for_status(resp)
 60        return ChallengeBuildKickoffResponse.model_validate(resp.json())
 61
 62    def build_state(self) -> ChallengeBuildStateResponse:
 63        """Aggregate per-challenge build state across every online node.
 64
 65        Returns one row per known challenge with an ``aggregated``
 66        worst-case status plus the per-node breakdown for the modal
 67        drill-down. Wraps ``GET /admin/challenges/build-state``."""
 68        resp = self._http.request("GET", "/admin/challenges/build-state")
 69        _raise_for_status(resp)
 70        return ChallengeBuildStateResponse.model_validate(resp.json())
 71
 72    def stats(self, offset: int = 0, limit: int = 50) -> list[AdminChallengeStatsRow]:
 73        """Per-challenge attempt / solve / first-blood counts."""
 74        resp = self._http.request(
 75            "GET",
 76            "/admin/challenges/stats",
 77            params={"offset": offset, "limit": limit},
 78        )
 79        _raise_for_status(resp)
 80        return [AdminChallengeStatsRow.model_validate(r) for r in resp.json()["items"]]
 81
 82    def timeseries(
 83        self,
 84        challenge_id: str,
 85        *,
 86        window: int = 86400,
 87        bucket: int = 3600,
 88    ) -> AdminChallengeTimeseries:
 89        """Per-challenge timeseries for the row-level drawer on
 90        ``/admin/challenges``."""
 91        resp = self._http.request(
 92            "GET",
 93            f"/admin/challenges/{challenge_id}/timeseries",
 94            params={"window": window, "bucket": bucket},
 95        )
 96        _raise_for_status(resp)
 97        return AdminChallengeTimeseries.model_validate(resp.json())
 98
 99    def latency_histogram(self) -> list[AdminChallengeLatencyBucket]:
100        """Instance-start latency distribution across challenges (the
101        admin challenges latency chart)."""
102        resp = self._http.request("GET", "/admin/challenges/latency-histogram")
103        _raise_for_status(resp)
104        return [AdminChallengeLatencyBucket.model_validate(b) for b in resp.json()]
105
106    def feedback(self, challenge_id: str, *, competition_id: str = "") -> list[AdminFeedbackRow]:
107        """Admin audit list: every reaction row on a challenge with
108        denormalised user identity. Requires admin role."""
109        params: dict[str, Any] = {}
110        if competition_id:
111            params["competition_id"] = competition_id
112        resp = self._http.request(
113            "GET", f"/admin/challenges/{challenge_id}/feedback", params=params
114        )
115        _raise_for_status(resp)
116        return _extract_items(resp.json(), AdminFeedbackRow)
117
118    def reset_question_attempts(
119        self,
120        team_id: str,
121        challenge_id: str,
122        question_id: str,
123        *,
124        reason: str = "",
125    ) -> QuestionAttemptResetInfo:
126        """Reset the per-question wrong-attempt counter for one team.
127
128        Writes a fresh baseline timestamp; the cap counter ignores all
129        wrong submissions before the new baseline, so the team gets a
130        fresh batch of attempts on this question without any audit
131        rows being deleted. Returns the new baseline row.
132        Wraps ``POST /admin/teams/{team_id}/questions/{challenge_id}/{question_id}/reset-attempts``."""
133        resp = self._http.request(
134            "POST",
135            f"/admin/teams/{team_id}/questions/{challenge_id}/{question_id}/reset-attempts",
136            json={"reason": reason},
137        )
138        _raise_for_status(resp)
139        return QuestionAttemptResetInfo.model_validate(resp.json())
class AdminChallengesResource:
 21class AdminChallengesResource:
 22    """Operator challenge tooling: reload the catalog, pre-build on nodes,
 23    inspect attempt/solve stats, audit feedback, reset attempt caps."""
 24
 25    def __init__(self, http: BaseHttpClient) -> None:
 26        self._http = http
 27
 28    def rescan(self) -> dict[str, Any]:
 29        """Reload the on-disk challenge catalog without a restart.
 30
 31        Drops the platform's process-lifetime spec cache, re-scans
 32        ``challenges_dir``, and fans the rescan out to every online
 33        worker node. Returns ``{total, added, removed, nodes}`` —
 34        the platform's post-rescan counts plus the per-node fan-out
 35        outcome. Wraps ``POST /admin/challenges/rescan``."""
 36        resp = self._http.request("POST", "/admin/challenges/rescan")
 37        _raise_for_status(resp)
 38        body: dict[str, Any] = resp.json()
 39        return body
 40
 41    def build(self, challenge_id: str) -> ChallengeBuildKickoffResponse:
 42        """Kick off a single-challenge pre-build on every online node.
 43
 44        Returns the per-node kickoff outcomes. Each node spawns a
 45        background thread; poll :meth:`build_state` to watch
 46        ``building`` → ``built`` / ``failed``. Wraps
 47        ``POST /admin/challenges/{id}/build``."""
 48        resp = self._http.request("POST", f"/admin/challenges/{challenge_id}/build")
 49        _raise_for_status(resp)
 50        return ChallengeBuildKickoffResponse.model_validate(resp.json())
 51
 52    def build_all(self) -> ChallengeBuildKickoffResponse:
 53        """Kick off a bulk pre-build on every online node.
 54
 55        Each node sequentially pre-builds every spec not already in
 56        ``built`` / ``building``. Returns per-node kickoff outcomes
 57        with ``queued`` / ``skipped_built`` / ``skipped_in_progress``
 58        lists. Wraps ``POST /admin/challenges/build-all``."""
 59        resp = self._http.request("POST", "/admin/challenges/build-all")
 60        _raise_for_status(resp)
 61        return ChallengeBuildKickoffResponse.model_validate(resp.json())
 62
 63    def build_state(self) -> ChallengeBuildStateResponse:
 64        """Aggregate per-challenge build state across every online node.
 65
 66        Returns one row per known challenge with an ``aggregated``
 67        worst-case status plus the per-node breakdown for the modal
 68        drill-down. Wraps ``GET /admin/challenges/build-state``."""
 69        resp = self._http.request("GET", "/admin/challenges/build-state")
 70        _raise_for_status(resp)
 71        return ChallengeBuildStateResponse.model_validate(resp.json())
 72
 73    def stats(self, offset: int = 0, limit: int = 50) -> list[AdminChallengeStatsRow]:
 74        """Per-challenge attempt / solve / first-blood counts."""
 75        resp = self._http.request(
 76            "GET",
 77            "/admin/challenges/stats",
 78            params={"offset": offset, "limit": limit},
 79        )
 80        _raise_for_status(resp)
 81        return [AdminChallengeStatsRow.model_validate(r) for r in resp.json()["items"]]
 82
 83    def timeseries(
 84        self,
 85        challenge_id: str,
 86        *,
 87        window: int = 86400,
 88        bucket: int = 3600,
 89    ) -> AdminChallengeTimeseries:
 90        """Per-challenge timeseries for the row-level drawer on
 91        ``/admin/challenges``."""
 92        resp = self._http.request(
 93            "GET",
 94            f"/admin/challenges/{challenge_id}/timeseries",
 95            params={"window": window, "bucket": bucket},
 96        )
 97        _raise_for_status(resp)
 98        return AdminChallengeTimeseries.model_validate(resp.json())
 99
100    def latency_histogram(self) -> list[AdminChallengeLatencyBucket]:
101        """Instance-start latency distribution across challenges (the
102        admin challenges latency chart)."""
103        resp = self._http.request("GET", "/admin/challenges/latency-histogram")
104        _raise_for_status(resp)
105        return [AdminChallengeLatencyBucket.model_validate(b) for b in resp.json()]
106
107    def feedback(self, challenge_id: str, *, competition_id: str = "") -> list[AdminFeedbackRow]:
108        """Admin audit list: every reaction row on a challenge with
109        denormalised user identity. Requires admin role."""
110        params: dict[str, Any] = {}
111        if competition_id:
112            params["competition_id"] = competition_id
113        resp = self._http.request(
114            "GET", f"/admin/challenges/{challenge_id}/feedback", params=params
115        )
116        _raise_for_status(resp)
117        return _extract_items(resp.json(), AdminFeedbackRow)
118
119    def reset_question_attempts(
120        self,
121        team_id: str,
122        challenge_id: str,
123        question_id: str,
124        *,
125        reason: str = "",
126    ) -> QuestionAttemptResetInfo:
127        """Reset the per-question wrong-attempt counter for one team.
128
129        Writes a fresh baseline timestamp; the cap counter ignores all
130        wrong submissions before the new baseline, so the team gets a
131        fresh batch of attempts on this question without any audit
132        rows being deleted. Returns the new baseline row.
133        Wraps ``POST /admin/teams/{team_id}/questions/{challenge_id}/{question_id}/reset-attempts``."""
134        resp = self._http.request(
135            "POST",
136            f"/admin/teams/{team_id}/questions/{challenge_id}/{question_id}/reset-attempts",
137            json={"reason": reason},
138        )
139        _raise_for_status(resp)
140        return QuestionAttemptResetInfo.model_validate(resp.json())

Operator challenge tooling: reload the catalog, pre-build on nodes, inspect attempt/solve stats, audit feedback, reset attempt caps.

AdminChallengesResource(http: ctfy.sdk.base.BaseHttpClient)
25    def __init__(self, http: BaseHttpClient) -> None:
26        self._http = http
def rescan(self) -> dict[str, typing.Any]:
28    def rescan(self) -> dict[str, Any]:
29        """Reload the on-disk challenge catalog without a restart.
30
31        Drops the platform's process-lifetime spec cache, re-scans
32        ``challenges_dir``, and fans the rescan out to every online
33        worker node. Returns ``{total, added, removed, nodes}`` —
34        the platform's post-rescan counts plus the per-node fan-out
35        outcome. Wraps ``POST /admin/challenges/rescan``."""
36        resp = self._http.request("POST", "/admin/challenges/rescan")
37        _raise_for_status(resp)
38        body: dict[str, Any] = resp.json()
39        return body

Reload the on-disk challenge catalog without a restart.

Drops the platform's process-lifetime spec cache, re-scans challenges_dir, and fans the rescan out to every online worker node. Returns {total, added, removed, nodes} — the platform's post-rescan counts plus the per-node fan-out outcome. Wraps POST /admin/challenges/rescan.

def build( self, challenge_id: str) -> ctfy.server.models.ChallengeBuildKickoffResponse:
41    def build(self, challenge_id: str) -> ChallengeBuildKickoffResponse:
42        """Kick off a single-challenge pre-build on every online node.
43
44        Returns the per-node kickoff outcomes. Each node spawns a
45        background thread; poll :meth:`build_state` to watch
46        ``building`` → ``built`` / ``failed``. Wraps
47        ``POST /admin/challenges/{id}/build``."""
48        resp = self._http.request("POST", f"/admin/challenges/{challenge_id}/build")
49        _raise_for_status(resp)
50        return ChallengeBuildKickoffResponse.model_validate(resp.json())

Kick off a single-challenge pre-build on every online node.

Returns the per-node kickoff outcomes. Each node spawns a background thread; poll build_state() to watch building → built / failed. Wraps POST /admin/challenges/{id}/build.

def build_all(self) -> ctfy.server.models.ChallengeBuildKickoffResponse:
52    def build_all(self) -> ChallengeBuildKickoffResponse:
53        """Kick off a bulk pre-build on every online node.
54
55        Each node sequentially pre-builds every spec not already in
56        ``built`` / ``building``. Returns per-node kickoff outcomes
57        with ``queued`` / ``skipped_built`` / ``skipped_in_progress``
58        lists. Wraps ``POST /admin/challenges/build-all``."""
59        resp = self._http.request("POST", "/admin/challenges/build-all")
60        _raise_for_status(resp)
61        return ChallengeBuildKickoffResponse.model_validate(resp.json())

Kick off a bulk pre-build on every online node.

Each node sequentially pre-builds every spec not already in built / building. Returns per-node kickoff outcomes with queued / skipped_built / skipped_in_progress lists. Wraps POST /admin/challenges/build-all.

def build_state(self) -> ctfy.server.models.ChallengeBuildStateResponse:
63    def build_state(self) -> ChallengeBuildStateResponse:
64        """Aggregate per-challenge build state across every online node.
65
66        Returns one row per known challenge with an ``aggregated``
67        worst-case status plus the per-node breakdown for the modal
68        drill-down. Wraps ``GET /admin/challenges/build-state``."""
69        resp = self._http.request("GET", "/admin/challenges/build-state")
70        _raise_for_status(resp)
71        return ChallengeBuildStateResponse.model_validate(resp.json())

Aggregate per-challenge build state across every online node.

Returns one row per known challenge with an aggregated worst-case status plus the per-node breakdown for the modal drill-down. Wraps GET /admin/challenges/build-state.

def stats( self, offset: int = 0, limit: int = 50) -> list[ctfy.server.models.AdminChallengeStatsRow]:
73    def stats(self, offset: int = 0, limit: int = 50) -> list[AdminChallengeStatsRow]:
74        """Per-challenge attempt / solve / first-blood counts."""
75        resp = self._http.request(
76            "GET",
77            "/admin/challenges/stats",
78            params={"offset": offset, "limit": limit},
79        )
80        _raise_for_status(resp)
81        return [AdminChallengeStatsRow.model_validate(r) for r in resp.json()["items"]]

Per-challenge attempt / solve / first-blood counts.

def timeseries( self, challenge_id: str, *, window: int = 86400, bucket: int = 3600) -> ctfy.server.models.AdminChallengeTimeseries:
83    def timeseries(
84        self,
85        challenge_id: str,
86        *,
87        window: int = 86400,
88        bucket: int = 3600,
89    ) -> AdminChallengeTimeseries:
90        """Per-challenge timeseries for the row-level drawer on
91        ``/admin/challenges``."""
92        resp = self._http.request(
93            "GET",
94            f"/admin/challenges/{challenge_id}/timeseries",
95            params={"window": window, "bucket": bucket},
96        )
97        _raise_for_status(resp)
98        return AdminChallengeTimeseries.model_validate(resp.json())

Per-challenge timeseries for the row-level drawer on /admin/challenges.

def latency_histogram(self) -> list[ctfy.server.models.AdminChallengeLatencyBucket]:
100    def latency_histogram(self) -> list[AdminChallengeLatencyBucket]:
101        """Instance-start latency distribution across challenges (the
102        admin challenges latency chart)."""
103        resp = self._http.request("GET", "/admin/challenges/latency-histogram")
104        _raise_for_status(resp)
105        return [AdminChallengeLatencyBucket.model_validate(b) for b in resp.json()]

Instance-start latency distribution across challenges (the admin challenges latency chart).

def feedback( self, challenge_id: str, *, competition_id: str = '') -> list[ctfy.server.models.AdminFeedbackRow]:
107    def feedback(self, challenge_id: str, *, competition_id: str = "") -> list[AdminFeedbackRow]:
108        """Admin audit list: every reaction row on a challenge with
109        denormalised user identity. Requires admin role."""
110        params: dict[str, Any] = {}
111        if competition_id:
112            params["competition_id"] = competition_id
113        resp = self._http.request(
114            "GET", f"/admin/challenges/{challenge_id}/feedback", params=params
115        )
116        _raise_for_status(resp)
117        return _extract_items(resp.json(), AdminFeedbackRow)

Admin audit list: every reaction row on a challenge with denormalised user identity. Requires admin role.

def reset_question_attempts( self, team_id: str, challenge_id: str, question_id: str, *, reason: str = '') -> ctfy.server.models.QuestionAttemptResetInfo:
119    def reset_question_attempts(
120        self,
121        team_id: str,
122        challenge_id: str,
123        question_id: str,
124        *,
125        reason: str = "",
126    ) -> QuestionAttemptResetInfo:
127        """Reset the per-question wrong-attempt counter for one team.
128
129        Writes a fresh baseline timestamp; the cap counter ignores all
130        wrong submissions before the new baseline, so the team gets a
131        fresh batch of attempts on this question without any audit
132        rows being deleted. Returns the new baseline row.
133        Wraps ``POST /admin/teams/{team_id}/questions/{challenge_id}/{question_id}/reset-attempts``."""
134        resp = self._http.request(
135            "POST",
136            f"/admin/teams/{team_id}/questions/{challenge_id}/{question_id}/reset-attempts",
137            json={"reason": reason},
138        )
139        _raise_for_status(resp)
140        return QuestionAttemptResetInfo.model_validate(resp.json())

Reset the per-question wrong-attempt counter for one team.

Writes a fresh baseline timestamp; the cap counter ignores all wrong submissions before the new baseline, so the team gets a fresh batch of attempts on this question without any audit rows being deleted. Returns the new baseline row. Wraps POST /admin/teams/{team_id}/questions/{challenge_id}/{question_id}/reset-attempts.