ctfy.sdk.admin_resources.tasks
client.admin.tasks + client.admin.scheduled_jobs — the
background-task system (admin).
1"""``client.admin.tasks`` + ``client.admin.scheduled_jobs`` — the 2background-task system (admin).""" 3 4from __future__ import annotations 5 6import builtins 7from typing import Any 8 9from ctfy.sdk._helpers import _raise_for_status 10from ctfy.sdk.base import BaseHttpClient 11from ctfy.server.models import ( 12 AdminTaskInfo, 13 AdminTaskListResponse, 14 AdminTaskLogsResponse, 15 ScheduledJobInfo, 16) 17 18 19class AdminTasksResource: 20 """One-shot background jobs: list / view / submit / cancel.""" 21 22 def __init__(self, http: BaseHttpClient) -> None: 23 self._http = http 24 25 def list( 26 self, *, status: str = "", kind: str = "", offset: int = 0, limit: int = 50 27 ) -> AdminTaskListResponse: 28 """Paginated task list, newest first; filter by ``status`` / ``kind``.""" 29 resp = self._http.request( 30 "GET", 31 "/admin/tasks", 32 params={"status": status, "kind": kind, "offset": offset, "limit": limit}, 33 ) 34 _raise_for_status(resp) 35 return AdminTaskListResponse.model_validate(resp.json()) 36 37 def get(self, task_id: str) -> AdminTaskInfo: 38 resp = self._http.request("GET", f"/admin/tasks/{task_id}") 39 _raise_for_status(resp) 40 return AdminTaskInfo.model_validate(resp.json()) 41 42 def logs( 43 self, task_id: str, *, after: int = 0, limit: int = 1000 44 ) -> AdminTaskLogsResponse: 45 """Cursor page of a task's verbose logs (lines with ``seq > after``). 46 Poll with the returned ``next_after`` to tail a running task.""" 47 resp = self._http.request( 48 "GET", f"/admin/tasks/{task_id}/logs", params={"after": after, "limit": limit} 49 ) 50 _raise_for_status(resp) 51 return AdminTaskLogsResponse.model_validate(resp.json()) 52 53 def submit(self, kind: str, params: dict[str, Any] | None = None) -> AdminTaskInfo: 54 """Queue a one-shot task (``challenge_build_all`` / ``_pull_all`` / 55 ``_rescan`` / ``scheduled_job_run``). Returns the pending row; the 56 dispatcher runs it within a tick.""" 57 resp = self._http.request( 58 "POST", "/admin/tasks", json={"kind": kind, "params": params or {}} 59 ) 60 _raise_for_status(resp) 61 return AdminTaskInfo.model_validate(resp.json()) 62 63 def cancel(self, task_id: str) -> AdminTaskInfo: 64 """Cancel a task. Pending → cancelled immediately; running → a 65 cooperative-cancel flag the handler observes between node calls.""" 66 resp = self._http.request("POST", f"/admin/tasks/{task_id}/cancel") 67 _raise_for_status(resp) 68 return AdminTaskInfo.model_validate(resp.json()) 69 70 71class AdminScheduledJobsResource: 72 """Recurring jobs: list config + heartbeat, pause/resume/retune, run-now.""" 73 74 def __init__(self, http: BaseHttpClient) -> None: 75 self._http = http 76 77 def list(self) -> builtins.list[ScheduledJobInfo]: 78 resp = self._http.request("GET", "/admin/scheduled-jobs") 79 _raise_for_status(resp) 80 return [ScheduledJobInfo.model_validate(row) for row in resp.json()] 81 82 def update( 83 self, name: str, *, enabled: bool | None = None, interval_s: int | None = None 84 ) -> ScheduledJobInfo: 85 """Pause / resume (``enabled``) or re-tune the cadence 86 (``interval_s``, clamped server-side to [5, 86400]).""" 87 body: dict[str, Any] = {} 88 if enabled is not None: 89 body["enabled"] = enabled 90 if interval_s is not None: 91 body["interval_s"] = interval_s 92 resp = self._http.request("PATCH", f"/admin/scheduled-jobs/{name}", json=body) 93 _raise_for_status(resp) 94 return ScheduledJobInfo.model_validate(resp.json()) 95 96 def run(self, name: str) -> AdminTaskInfo: 97 """Trigger an ad-hoc run now — submits a ``scheduled_job_run`` task.""" 98 resp = self._http.request("POST", f"/admin/scheduled-jobs/{name}/run") 99 _raise_for_status(resp) 100 return AdminTaskInfo.model_validate(resp.json())
20class AdminTasksResource: 21 """One-shot background jobs: list / view / submit / cancel.""" 22 23 def __init__(self, http: BaseHttpClient) -> None: 24 self._http = http 25 26 def list( 27 self, *, status: str = "", kind: str = "", offset: int = 0, limit: int = 50 28 ) -> AdminTaskListResponse: 29 """Paginated task list, newest first; filter by ``status`` / ``kind``.""" 30 resp = self._http.request( 31 "GET", 32 "/admin/tasks", 33 params={"status": status, "kind": kind, "offset": offset, "limit": limit}, 34 ) 35 _raise_for_status(resp) 36 return AdminTaskListResponse.model_validate(resp.json()) 37 38 def get(self, task_id: str) -> AdminTaskInfo: 39 resp = self._http.request("GET", f"/admin/tasks/{task_id}") 40 _raise_for_status(resp) 41 return AdminTaskInfo.model_validate(resp.json()) 42 43 def logs( 44 self, task_id: str, *, after: int = 0, limit: int = 1000 45 ) -> AdminTaskLogsResponse: 46 """Cursor page of a task's verbose logs (lines with ``seq > after``). 47 Poll with the returned ``next_after`` to tail a running task.""" 48 resp = self._http.request( 49 "GET", f"/admin/tasks/{task_id}/logs", params={"after": after, "limit": limit} 50 ) 51 _raise_for_status(resp) 52 return AdminTaskLogsResponse.model_validate(resp.json()) 53 54 def submit(self, kind: str, params: dict[str, Any] | None = None) -> AdminTaskInfo: 55 """Queue a one-shot task (``challenge_build_all`` / ``_pull_all`` / 56 ``_rescan`` / ``scheduled_job_run``). Returns the pending row; the 57 dispatcher runs it within a tick.""" 58 resp = self._http.request( 59 "POST", "/admin/tasks", json={"kind": kind, "params": params or {}} 60 ) 61 _raise_for_status(resp) 62 return AdminTaskInfo.model_validate(resp.json()) 63 64 def cancel(self, task_id: str) -> AdminTaskInfo: 65 """Cancel a task. Pending → cancelled immediately; running → a 66 cooperative-cancel flag the handler observes between node calls.""" 67 resp = self._http.request("POST", f"/admin/tasks/{task_id}/cancel") 68 _raise_for_status(resp) 69 return AdminTaskInfo.model_validate(resp.json())
One-shot background jobs: list / view / submit / cancel.
26 def list( 27 self, *, status: str = "", kind: str = "", offset: int = 0, limit: int = 50 28 ) -> AdminTaskListResponse: 29 """Paginated task list, newest first; filter by ``status`` / ``kind``.""" 30 resp = self._http.request( 31 "GET", 32 "/admin/tasks", 33 params={"status": status, "kind": kind, "offset": offset, "limit": limit}, 34 ) 35 _raise_for_status(resp) 36 return AdminTaskListResponse.model_validate(resp.json())
Paginated task list, newest first; filter by status / kind.
43 def logs( 44 self, task_id: str, *, after: int = 0, limit: int = 1000 45 ) -> AdminTaskLogsResponse: 46 """Cursor page of a task's verbose logs (lines with ``seq > after``). 47 Poll with the returned ``next_after`` to tail a running task.""" 48 resp = self._http.request( 49 "GET", f"/admin/tasks/{task_id}/logs", params={"after": after, "limit": limit} 50 ) 51 _raise_for_status(resp) 52 return AdminTaskLogsResponse.model_validate(resp.json())
Cursor page of a task's verbose logs (lines with seq > after).
Poll with the returned next_after to tail a running task.
54 def submit(self, kind: str, params: dict[str, Any] | None = None) -> AdminTaskInfo: 55 """Queue a one-shot task (``challenge_build_all`` / ``_pull_all`` / 56 ``_rescan`` / ``scheduled_job_run``). Returns the pending row; the 57 dispatcher runs it within a tick.""" 58 resp = self._http.request( 59 "POST", "/admin/tasks", json={"kind": kind, "params": params or {}} 60 ) 61 _raise_for_status(resp) 62 return AdminTaskInfo.model_validate(resp.json())
Queue a one-shot task (challenge_build_all / _pull_all /
_rescan / scheduled_job_run). Returns the pending row; the
dispatcher runs it within a tick.
64 def cancel(self, task_id: str) -> AdminTaskInfo: 65 """Cancel a task. Pending → cancelled immediately; running → a 66 cooperative-cancel flag the handler observes between node calls.""" 67 resp = self._http.request("POST", f"/admin/tasks/{task_id}/cancel") 68 _raise_for_status(resp) 69 return AdminTaskInfo.model_validate(resp.json())
Cancel a task. Pending → cancelled immediately; running → a cooperative-cancel flag the handler observes between node calls.
72class AdminScheduledJobsResource: 73 """Recurring jobs: list config + heartbeat, pause/resume/retune, run-now.""" 74 75 def __init__(self, http: BaseHttpClient) -> None: 76 self._http = http 77 78 def list(self) -> builtins.list[ScheduledJobInfo]: 79 resp = self._http.request("GET", "/admin/scheduled-jobs") 80 _raise_for_status(resp) 81 return [ScheduledJobInfo.model_validate(row) for row in resp.json()] 82 83 def update( 84 self, name: str, *, enabled: bool | None = None, interval_s: int | None = None 85 ) -> ScheduledJobInfo: 86 """Pause / resume (``enabled``) or re-tune the cadence 87 (``interval_s``, clamped server-side to [5, 86400]).""" 88 body: dict[str, Any] = {} 89 if enabled is not None: 90 body["enabled"] = enabled 91 if interval_s is not None: 92 body["interval_s"] = interval_s 93 resp = self._http.request("PATCH", f"/admin/scheduled-jobs/{name}", json=body) 94 _raise_for_status(resp) 95 return ScheduledJobInfo.model_validate(resp.json()) 96 97 def run(self, name: str) -> AdminTaskInfo: 98 """Trigger an ad-hoc run now — submits a ``scheduled_job_run`` task.""" 99 resp = self._http.request("POST", f"/admin/scheduled-jobs/{name}/run") 100 _raise_for_status(resp) 101 return AdminTaskInfo.model_validate(resp.json())
Recurring jobs: list config + heartbeat, pause/resume/retune, run-now.
83 def update( 84 self, name: str, *, enabled: bool | None = None, interval_s: int | None = None 85 ) -> ScheduledJobInfo: 86 """Pause / resume (``enabled``) or re-tune the cadence 87 (``interval_s``, clamped server-side to [5, 86400]).""" 88 body: dict[str, Any] = {} 89 if enabled is not None: 90 body["enabled"] = enabled 91 if interval_s is not None: 92 body["interval_s"] = interval_s 93 resp = self._http.request("PATCH", f"/admin/scheduled-jobs/{name}", json=body) 94 _raise_for_status(resp) 95 return ScheduledJobInfo.model_validate(resp.json())
Pause / resume (enabled) or re-tune the cadence
(interval_s, clamped server-side to [5, 86400]).
97 def run(self, name: str) -> AdminTaskInfo: 98 """Trigger an ad-hoc run now — submits a ``scheduled_job_run`` task.""" 99 resp = self._http.request("POST", f"/admin/scheduled-jobs/{name}/run") 100 _raise_for_status(resp) 101 return AdminTaskInfo.model_validate(resp.json())
Trigger an ad-hoc run now — submits a scheduled_job_run task.