ctfy.sdk.admin_resources.tasks

client.admin.tasks + client.admin.scheduled_jobs — the background-task system (admin).

  1"""``client.admin.tasks`` + ``client.admin.scheduled_jobs`` — the
  2background-task system (admin)."""
  3
  4from __future__ import annotations
  5
  6import builtins
  7from typing import Any
  8
  9from ctfy.sdk._helpers import _raise_for_status
 10from ctfy.sdk.base import BaseHttpClient
 11from ctfy.server.models import (
 12    AdminTaskInfo,
 13    AdminTaskListResponse,
 14    AdminTaskLogsResponse,
 15    ScheduledJobInfo,
 16)
 17
 18
 19class AdminTasksResource:
 20    """One-shot background jobs: list / view / submit / cancel."""
 21
 22    def __init__(self, http: BaseHttpClient) -> None:
 23        self._http = http
 24
 25    def list(
 26        self, *, status: str = "", kind: str = "", offset: int = 0, limit: int = 50
 27    ) -> AdminTaskListResponse:
 28        """Paginated task list, newest first; filter by ``status`` / ``kind``."""
 29        resp = self._http.request(
 30            "GET",
 31            "/admin/tasks",
 32            params={"status": status, "kind": kind, "offset": offset, "limit": limit},
 33        )
 34        _raise_for_status(resp)
 35        return AdminTaskListResponse.model_validate(resp.json())
 36
 37    def get(self, task_id: str) -> AdminTaskInfo:
 38        resp = self._http.request("GET", f"/admin/tasks/{task_id}")
 39        _raise_for_status(resp)
 40        return AdminTaskInfo.model_validate(resp.json())
 41
 42    def logs(
 43        self, task_id: str, *, after: int = 0, limit: int = 1000
 44    ) -> AdminTaskLogsResponse:
 45        """Cursor page of a task's verbose logs (lines with ``seq > after``).
 46        Poll with the returned ``next_after`` to tail a running task."""
 47        resp = self._http.request(
 48            "GET", f"/admin/tasks/{task_id}/logs", params={"after": after, "limit": limit}
 49        )
 50        _raise_for_status(resp)
 51        return AdminTaskLogsResponse.model_validate(resp.json())
 52
 53    def submit(self, kind: str, params: dict[str, Any] | None = None) -> AdminTaskInfo:
 54        """Queue a one-shot task (``challenge_build_all`` / ``_pull_all`` /
 55        ``_rescan`` / ``scheduled_job_run``). Returns the pending row; the
 56        dispatcher runs it within a tick."""
 57        resp = self._http.request(
 58            "POST", "/admin/tasks", json={"kind": kind, "params": params or {}}
 59        )
 60        _raise_for_status(resp)
 61        return AdminTaskInfo.model_validate(resp.json())
 62
 63    def cancel(self, task_id: str) -> AdminTaskInfo:
 64        """Cancel a task. Pending → cancelled immediately; running → a
 65        cooperative-cancel flag the handler observes between node calls."""
 66        resp = self._http.request("POST", f"/admin/tasks/{task_id}/cancel")
 67        _raise_for_status(resp)
 68        return AdminTaskInfo.model_validate(resp.json())
 69
 70
 71class AdminScheduledJobsResource:
 72    """Recurring jobs: list config + heartbeat, pause/resume/retune, run-now."""
 73
 74    def __init__(self, http: BaseHttpClient) -> None:
 75        self._http = http
 76
 77    def list(self) -> builtins.list[ScheduledJobInfo]:
 78        resp = self._http.request("GET", "/admin/scheduled-jobs")
 79        _raise_for_status(resp)
 80        return [ScheduledJobInfo.model_validate(row) for row in resp.json()]
 81
 82    def update(
 83        self, name: str, *, enabled: bool | None = None, interval_s: int | None = None
 84    ) -> ScheduledJobInfo:
 85        """Pause / resume (``enabled``) or re-tune the cadence
 86        (``interval_s``, clamped server-side to [5, 86400])."""
 87        body: dict[str, Any] = {}
 88        if enabled is not None:
 89            body["enabled"] = enabled
 90        if interval_s is not None:
 91            body["interval_s"] = interval_s
 92        resp = self._http.request("PATCH", f"/admin/scheduled-jobs/{name}", json=body)
 93        _raise_for_status(resp)
 94        return ScheduledJobInfo.model_validate(resp.json())
 95
 96    def run(self, name: str) -> AdminTaskInfo:
 97        """Trigger an ad-hoc run now — submits a ``scheduled_job_run`` task."""
 98        resp = self._http.request("POST", f"/admin/scheduled-jobs/{name}/run")
 99        _raise_for_status(resp)
100        return AdminTaskInfo.model_validate(resp.json())
class AdminTasksResource:
20class AdminTasksResource:
21    """One-shot background jobs: list / view / submit / cancel."""
22
23    def __init__(self, http: BaseHttpClient) -> None:
24        self._http = http
25
26    def list(
27        self, *, status: str = "", kind: str = "", offset: int = 0, limit: int = 50
28    ) -> AdminTaskListResponse:
29        """Paginated task list, newest first; filter by ``status`` / ``kind``."""
30        resp = self._http.request(
31            "GET",
32            "/admin/tasks",
33            params={"status": status, "kind": kind, "offset": offset, "limit": limit},
34        )
35        _raise_for_status(resp)
36        return AdminTaskListResponse.model_validate(resp.json())
37
38    def get(self, task_id: str) -> AdminTaskInfo:
39        resp = self._http.request("GET", f"/admin/tasks/{task_id}")
40        _raise_for_status(resp)
41        return AdminTaskInfo.model_validate(resp.json())
42
43    def logs(
44        self, task_id: str, *, after: int = 0, limit: int = 1000
45    ) -> AdminTaskLogsResponse:
46        """Cursor page of a task's verbose logs (lines with ``seq > after``).
47        Poll with the returned ``next_after`` to tail a running task."""
48        resp = self._http.request(
49            "GET", f"/admin/tasks/{task_id}/logs", params={"after": after, "limit": limit}
50        )
51        _raise_for_status(resp)
52        return AdminTaskLogsResponse.model_validate(resp.json())
53
54    def submit(self, kind: str, params: dict[str, Any] | None = None) -> AdminTaskInfo:
55        """Queue a one-shot task (``challenge_build_all`` / ``_pull_all`` /
56        ``_rescan`` / ``scheduled_job_run``). Returns the pending row; the
57        dispatcher runs it within a tick."""
58        resp = self._http.request(
59            "POST", "/admin/tasks", json={"kind": kind, "params": params or {}}
60        )
61        _raise_for_status(resp)
62        return AdminTaskInfo.model_validate(resp.json())
63
64    def cancel(self, task_id: str) -> AdminTaskInfo:
65        """Cancel a task. Pending → cancelled immediately; running → a
66        cooperative-cancel flag the handler observes between node calls."""
67        resp = self._http.request("POST", f"/admin/tasks/{task_id}/cancel")
68        _raise_for_status(resp)
69        return AdminTaskInfo.model_validate(resp.json())

One-shot background jobs: list / view / submit / cancel.

AdminTasksResource(http: ctfy.sdk.base.BaseHttpClient)
23    def __init__(self, http: BaseHttpClient) -> None:
24        self._http = http
def list( self, *, status: str = '', kind: str = '', offset: int = 0, limit: int = 50) -> ctfy.server.models.AdminTaskListResponse:
26    def list(
27        self, *, status: str = "", kind: str = "", offset: int = 0, limit: int = 50
28    ) -> AdminTaskListResponse:
29        """Paginated task list, newest first; filter by ``status`` / ``kind``."""
30        resp = self._http.request(
31            "GET",
32            "/admin/tasks",
33            params={"status": status, "kind": kind, "offset": offset, "limit": limit},
34        )
35        _raise_for_status(resp)
36        return AdminTaskListResponse.model_validate(resp.json())

Paginated task list, newest first; filter by status / kind.

def get(self, task_id: str) -> ctfy.server.models.AdminTaskInfo:
38    def get(self, task_id: str) -> AdminTaskInfo:
39        resp = self._http.request("GET", f"/admin/tasks/{task_id}")
40        _raise_for_status(resp)
41        return AdminTaskInfo.model_validate(resp.json())
def logs( self, task_id: str, *, after: int = 0, limit: int = 1000) -> ctfy.server.models.AdminTaskLogsResponse:
43    def logs(
44        self, task_id: str, *, after: int = 0, limit: int = 1000
45    ) -> AdminTaskLogsResponse:
46        """Cursor page of a task's verbose logs (lines with ``seq > after``).
47        Poll with the returned ``next_after`` to tail a running task."""
48        resp = self._http.request(
49            "GET", f"/admin/tasks/{task_id}/logs", params={"after": after, "limit": limit}
50        )
51        _raise_for_status(resp)
52        return AdminTaskLogsResponse.model_validate(resp.json())

Cursor page of a task's verbose logs (lines with seq > after). Poll with the returned next_after to tail a running task.

def submit( self, kind: str, params: dict[str, typing.Any] | None = None) -> ctfy.server.models.AdminTaskInfo:
54    def submit(self, kind: str, params: dict[str, Any] | None = None) -> AdminTaskInfo:
55        """Queue a one-shot task (``challenge_build_all`` / ``_pull_all`` /
56        ``_rescan`` / ``scheduled_job_run``). Returns the pending row; the
57        dispatcher runs it within a tick."""
58        resp = self._http.request(
59            "POST", "/admin/tasks", json={"kind": kind, "params": params or {}}
60        )
61        _raise_for_status(resp)
62        return AdminTaskInfo.model_validate(resp.json())

Queue a one-shot task (challenge_build_all / _pull_all / _rescan / scheduled_job_run). Returns the pending row; the dispatcher runs it within a tick.

def cancel(self, task_id: str) -> ctfy.server.models.AdminTaskInfo:
64    def cancel(self, task_id: str) -> AdminTaskInfo:
65        """Cancel a task. Pending → cancelled immediately; running → a
66        cooperative-cancel flag the handler observes between node calls."""
67        resp = self._http.request("POST", f"/admin/tasks/{task_id}/cancel")
68        _raise_for_status(resp)
69        return AdminTaskInfo.model_validate(resp.json())

Cancel a task. Pending → cancelled immediately; running → a cooperative-cancel flag the handler observes between node calls.

class AdminScheduledJobsResource:
 72class AdminScheduledJobsResource:
 73    """Recurring jobs: list config + heartbeat, pause/resume/retune, run-now."""
 74
 75    def __init__(self, http: BaseHttpClient) -> None:
 76        self._http = http
 77
 78    def list(self) -> builtins.list[ScheduledJobInfo]:
 79        resp = self._http.request("GET", "/admin/scheduled-jobs")
 80        _raise_for_status(resp)
 81        return [ScheduledJobInfo.model_validate(row) for row in resp.json()]
 82
 83    def update(
 84        self, name: str, *, enabled: bool | None = None, interval_s: int | None = None
 85    ) -> ScheduledJobInfo:
 86        """Pause / resume (``enabled``) or re-tune the cadence
 87        (``interval_s``, clamped server-side to [5, 86400])."""
 88        body: dict[str, Any] = {}
 89        if enabled is not None:
 90            body["enabled"] = enabled
 91        if interval_s is not None:
 92            body["interval_s"] = interval_s
 93        resp = self._http.request("PATCH", f"/admin/scheduled-jobs/{name}", json=body)
 94        _raise_for_status(resp)
 95        return ScheduledJobInfo.model_validate(resp.json())
 96
 97    def run(self, name: str) -> AdminTaskInfo:
 98        """Trigger an ad-hoc run now — submits a ``scheduled_job_run`` task."""
 99        resp = self._http.request("POST", f"/admin/scheduled-jobs/{name}/run")
100        _raise_for_status(resp)
101        return AdminTaskInfo.model_validate(resp.json())

Recurring jobs: list config + heartbeat, pause/resume/retune, run-now.

AdminScheduledJobsResource(http: ctfy.sdk.base.BaseHttpClient)
75    def __init__(self, http: BaseHttpClient) -> None:
76        self._http = http
def list(self) -> list[ctfy.server.models.ScheduledJobInfo]:
78    def list(self) -> builtins.list[ScheduledJobInfo]:
79        resp = self._http.request("GET", "/admin/scheduled-jobs")
80        _raise_for_status(resp)
81        return [ScheduledJobInfo.model_validate(row) for row in resp.json()]
def update( self, name: str, *, enabled: bool | None = None, interval_s: int | None = None) -> ctfy.server.models.ScheduledJobInfo:
83    def update(
84        self, name: str, *, enabled: bool | None = None, interval_s: int | None = None
85    ) -> ScheduledJobInfo:
86        """Pause / resume (``enabled``) or re-tune the cadence
87        (``interval_s``, clamped server-side to [5, 86400])."""
88        body: dict[str, Any] = {}
89        if enabled is not None:
90            body["enabled"] = enabled
91        if interval_s is not None:
92            body["interval_s"] = interval_s
93        resp = self._http.request("PATCH", f"/admin/scheduled-jobs/{name}", json=body)
94        _raise_for_status(resp)
95        return ScheduledJobInfo.model_validate(resp.json())

Pause / resume (enabled) or re-tune the cadence (interval_s, clamped server-side to [5, 86400]).

def run(self, name: str) -> ctfy.server.models.AdminTaskInfo:
 97    def run(self, name: str) -> AdminTaskInfo:
 98        """Trigger an ad-hoc run now — submits a ``scheduled_job_run`` task."""
 99        resp = self._http.request("POST", f"/admin/scheduled-jobs/{name}/run")
100        _raise_for_status(resp)
101        return AdminTaskInfo.model_validate(resp.json())

Trigger an ad-hoc run now — submits a scheduled_job_run task.