ctfy.sdk.resources.auth

client.auth — password auth, OAuth identities, API tokens, discovery.

  1"""``client.auth`` — password auth, OAuth identities, API tokens, discovery."""
  2
  3from __future__ import annotations
  4
  5from ctfy.sdk._helpers import _raise_for_status
  6from ctfy.sdk.base import BaseHttpClient
  7from ctfy.server.models import (
  8    AuthTokenResponse,
  9    CreateFineGrainedTokenResponse,
 10    DeviceCodeResponse,
 11    DeviceTokenResponse,
 12    LinkedIdentity,
 13    LinkStartResponse,
 14    ProvidersResponse,
 15    TokenInfo,
 16    TokenScopesResponse,
 17)
 18
 19
 20class AuthResource:
 21    """Register / login, identity binding, token CRUD, sign-in discovery."""
 22
 23    def __init__(self, http: BaseHttpClient) -> None:
 24        self._http = http
 25
 26    def register(
 27        self,
 28        email: str,
 29        password: str,
 30        display_name: str = "",
 31    ) -> AuthTokenResponse:
 32        """Register via email + password. Returns a fresh user-kind token.
 33
 34        First-user bootstrap: when the backend has zero users, the new
 35        user is granted ``role="super_admin"``.
 36        """
 37        resp = self._http.request(
 38            "POST",
 39            "/auth/register",
 40            json={"email": email, "password": password, "display_name": display_name},
 41        )
 42        _raise_for_status(resp)
 43        return AuthTokenResponse.model_validate(resp.json())
 44
 45    def login(self, email: str, password: str) -> AuthTokenResponse:
 46        """Sign in with email + password. Returns a fresh user-kind token."""
 47        resp = self._http.request(
 48            "POST", "/auth/login", json={"email": email, "password": password}
 49        )
 50        _raise_for_status(resp)
 51        return AuthTokenResponse.model_validate(resp.json())
 52
 53    def device_code(self) -> DeviceCodeResponse:
 54        """Start a device-authorization flow (unauthenticated, RFC 8628).
 55
 56        Returns a short ``user_code`` + a ``verification_uri`` the user
 57        opens in a browser, plus the opaque ``device_code`` to poll with.
 58        """
 59        resp = self._http.request("POST", "/auth/device/code")
 60        _raise_for_status(resp)
 61        return DeviceCodeResponse.model_validate(resp.json())
 62
 63    def device_token(self, device_code: str) -> DeviceTokenResponse:
 64        """Poll for the token after the user approves (unauthenticated).
 65
 66        ``status`` is ``pending`` / ``approved`` / ``denied`` /
 67        ``expired``; ``token`` is set only when ``approved``.
 68        """
 69        resp = self._http.request(
 70            "POST", "/auth/device/token", json={"device_code": device_code}
 71        )
 72        _raise_for_status(resp)
 73        return DeviceTokenResponse.model_validate(resp.json())
 74
 75    def set_password(self, new_password: str, current_password: str = "") -> None:
 76        """Set or change the caller's password.
 77
 78        ``current_password`` is required iff the caller already has a
 79        password identity; absent on the first-time "attach password to
 80        OAuth-only account" flow.
 81        """
 82        resp = self._http.request(
 83            "POST",
 84            "/auth/password",
 85            json={"new_password": new_password, "current_password": current_password},
 86        )
 87        _raise_for_status(resp)
 88
 89    def delete_password(self) -> None:
 90        """Drop the caller's password identity. Requires ≥1 OAuth identity."""
 91        resp = self._http.request("DELETE", "/auth/password")
 92        _raise_for_status(resp)
 93
 94    def list_identities(self) -> list[LinkedIdentity]:
 95        """Bound OAuth + password identities for the calling user."""
 96        resp = self._http.request("GET", "/auth/identities")
 97        _raise_for_status(resp)
 98        return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]]
 99
100    def unbind_identity(self, identity_id: str) -> None:
101        """Unbind one OAuth identity. Server blocks unlinking the last one."""
102        resp = self._http.request("DELETE", f"/auth/identities/{identity_id}")
103        _raise_for_status(resp)
104
105    def start_oauth_link(self, provider: str) -> LinkStartResponse:
106        """Begin a "link a second OAuth identity to my account" flow.
107
108        Returns the platform's signed-state authorize URL — the actual
109        flow finishes in a browser (the user opens the URL and approves
110        linking the provider to their existing account).
111        """
112        resp = self._http.request("POST", f"/auth/identities/link/{provider}")
113        _raise_for_status(resp)
114        return LinkStartResponse.model_validate(resp.json())
115
116    def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]:
117        """List caller's tokens (user + agent). Plaintext never returned."""
118        resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit})
119        _raise_for_status(resp)
120        return [TokenInfo.model_validate(t) for t in resp.json()["items"]]
121
122    def mint_token(
123        self,
124        label: str,
125        expires_in_days: int | None = None,
126        *,
127        competition_access: str | None = None,
128        competition_ids: list[str] | None = None,
129        permissions: dict[str, str] | None = None,
130    ) -> CreateFineGrainedTokenResponse:
131        """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE.
132
133        With no scope args the server mints the broad legacy profile
134        (every competition, full participate access) so existing
135        callers keep working unchanged. To scope it GitHub-style pass
136        ``competition_access`` (``all`` / ``selected`` / ``none``),
137        ``competition_ids`` (when ``selected``) and ``permissions``
138        (category → ``none`` / ``read`` / ``write``).
139
140        ``expires_in_days`` defaults server-side when ``None``.
141        """
142        body: dict[str, object] = {"label": label}
143        if expires_in_days is not None:
144            body["expires_in_days"] = expires_in_days
145        if competition_access is not None:
146            body["competition_access"] = competition_access
147        if competition_ids is not None:
148            body["competition_ids"] = competition_ids
149        if permissions is not None:
150            body["permissions"] = permissions
151        resp = self._http.request("POST", "/auth/tokens", json=body)
152        _raise_for_status(resp)
153        return CreateFineGrainedTokenResponse.model_validate(resp.json())
154
155    def revoke_token(self, token_id: str) -> None:
156        """Revoke one token by id."""
157        resp = self._http.request("DELETE", f"/auth/tokens/{token_id}")
158        _raise_for_status(resp)
159
160    def revoke_other_tokens(self) -> None:
161        """Revoke every token belonging to this user except the caller's own."""
162        resp = self._http.request("DELETE", "/auth/tokens/others")
163        _raise_for_status(resp)
164
165    def providers(self) -> ProvidersResponse:
166        """Which sign-in methods this deployment offers (OAuth providers
167        configured + whether password auth is enabled)."""
168        resp = self._http.request("GET", "/auth/providers")
169        _raise_for_status(resp)
170        return ProvidersResponse.model_validate(resp.json())
171
172    def token_scopes(self) -> TokenScopesResponse:
173        """The catalog of grantable fine-grained token scopes — drives
174        the token-create wizard."""
175        resp = self._http.request("GET", "/auth/tokens/scopes")
176        _raise_for_status(resp)
177        return TokenScopesResponse.model_validate(resp.json())
class AuthResource:
 21class AuthResource:
 22    """Register / login, identity binding, token CRUD, sign-in discovery."""
 23
 24    def __init__(self, http: BaseHttpClient) -> None:
 25        self._http = http
 26
 27    def register(
 28        self,
 29        email: str,
 30        password: str,
 31        display_name: str = "",
 32    ) -> AuthTokenResponse:
 33        """Register via email + password. Returns a fresh user-kind token.
 34
 35        First-user bootstrap: when the backend has zero users, the new
 36        user is granted ``role="super_admin"``.
 37        """
 38        resp = self._http.request(
 39            "POST",
 40            "/auth/register",
 41            json={"email": email, "password": password, "display_name": display_name},
 42        )
 43        _raise_for_status(resp)
 44        return AuthTokenResponse.model_validate(resp.json())
 45
 46    def login(self, email: str, password: str) -> AuthTokenResponse:
 47        """Sign in with email + password. Returns a fresh user-kind token."""
 48        resp = self._http.request(
 49            "POST", "/auth/login", json={"email": email, "password": password}
 50        )
 51        _raise_for_status(resp)
 52        return AuthTokenResponse.model_validate(resp.json())
 53
 54    def device_code(self) -> DeviceCodeResponse:
 55        """Start a device-authorization flow (unauthenticated, RFC 8628).
 56
 57        Returns a short ``user_code`` + a ``verification_uri`` the user
 58        opens in a browser, plus the opaque ``device_code`` to poll with.
 59        """
 60        resp = self._http.request("POST", "/auth/device/code")
 61        _raise_for_status(resp)
 62        return DeviceCodeResponse.model_validate(resp.json())
 63
 64    def device_token(self, device_code: str) -> DeviceTokenResponse:
 65        """Poll for the token after the user approves (unauthenticated).
 66
 67        ``status`` is ``pending`` / ``approved`` / ``denied`` /
 68        ``expired``; ``token`` is set only when ``approved``.
 69        """
 70        resp = self._http.request(
 71            "POST", "/auth/device/token", json={"device_code": device_code}
 72        )
 73        _raise_for_status(resp)
 74        return DeviceTokenResponse.model_validate(resp.json())
 75
 76    def set_password(self, new_password: str, current_password: str = "") -> None:
 77        """Set or change the caller's password.
 78
 79        ``current_password`` is required iff the caller already has a
 80        password identity; absent on the first-time "attach password to
 81        OAuth-only account" flow.
 82        """
 83        resp = self._http.request(
 84            "POST",
 85            "/auth/password",
 86            json={"new_password": new_password, "current_password": current_password},
 87        )
 88        _raise_for_status(resp)
 89
 90    def delete_password(self) -> None:
 91        """Drop the caller's password identity. Requires ≥1 OAuth identity."""
 92        resp = self._http.request("DELETE", "/auth/password")
 93        _raise_for_status(resp)
 94
 95    def list_identities(self) -> list[LinkedIdentity]:
 96        """Bound OAuth + password identities for the calling user."""
 97        resp = self._http.request("GET", "/auth/identities")
 98        _raise_for_status(resp)
 99        return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]]
100
101    def unbind_identity(self, identity_id: str) -> None:
102        """Unbind one OAuth identity. Server blocks unlinking the last one."""
103        resp = self._http.request("DELETE", f"/auth/identities/{identity_id}")
104        _raise_for_status(resp)
105
106    def start_oauth_link(self, provider: str) -> LinkStartResponse:
107        """Begin a "link a second OAuth identity to my account" flow.
108
109        Returns the platform's signed-state authorize URL — the actual
110        flow finishes in a browser (the user opens the URL and approves
111        linking the provider to their existing account).
112        """
113        resp = self._http.request("POST", f"/auth/identities/link/{provider}")
114        _raise_for_status(resp)
115        return LinkStartResponse.model_validate(resp.json())
116
117    def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]:
118        """List caller's tokens (user + agent). Plaintext never returned."""
119        resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit})
120        _raise_for_status(resp)
121        return [TokenInfo.model_validate(t) for t in resp.json()["items"]]
122
123    def mint_token(
124        self,
125        label: str,
126        expires_in_days: int | None = None,
127        *,
128        competition_access: str | None = None,
129        competition_ids: list[str] | None = None,
130        permissions: dict[str, str] | None = None,
131    ) -> CreateFineGrainedTokenResponse:
132        """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE.
133
134        With no scope args the server mints the broad legacy profile
135        (every competition, full participate access) so existing
136        callers keep working unchanged. To scope it GitHub-style pass
137        ``competition_access`` (``all`` / ``selected`` / ``none``),
138        ``competition_ids`` (when ``selected``) and ``permissions``
139        (category → ``none`` / ``read`` / ``write``).
140
141        ``expires_in_days`` defaults server-side when ``None``.
142        """
143        body: dict[str, object] = {"label": label}
144        if expires_in_days is not None:
145            body["expires_in_days"] = expires_in_days
146        if competition_access is not None:
147            body["competition_access"] = competition_access
148        if competition_ids is not None:
149            body["competition_ids"] = competition_ids
150        if permissions is not None:
151            body["permissions"] = permissions
152        resp = self._http.request("POST", "/auth/tokens", json=body)
153        _raise_for_status(resp)
154        return CreateFineGrainedTokenResponse.model_validate(resp.json())
155
156    def revoke_token(self, token_id: str) -> None:
157        """Revoke one token by id."""
158        resp = self._http.request("DELETE", f"/auth/tokens/{token_id}")
159        _raise_for_status(resp)
160
161    def revoke_other_tokens(self) -> None:
162        """Revoke every token belonging to this user except the caller's own."""
163        resp = self._http.request("DELETE", "/auth/tokens/others")
164        _raise_for_status(resp)
165
166    def providers(self) -> ProvidersResponse:
167        """Which sign-in methods this deployment offers (OAuth providers
168        configured + whether password auth is enabled)."""
169        resp = self._http.request("GET", "/auth/providers")
170        _raise_for_status(resp)
171        return ProvidersResponse.model_validate(resp.json())
172
173    def token_scopes(self) -> TokenScopesResponse:
174        """The catalog of grantable fine-grained token scopes — drives
175        the token-create wizard."""
176        resp = self._http.request("GET", "/auth/tokens/scopes")
177        _raise_for_status(resp)
178        return TokenScopesResponse.model_validate(resp.json())

Register / login, identity binding, token CRUD, sign-in discovery.

AuthResource(http: ctfy.sdk.base.BaseHttpClient)
24    def __init__(self, http: BaseHttpClient) -> None:
25        self._http = http
def register( self, email: str, password: str, display_name: str = '') -> ctfy.server.models.AuthTokenResponse:
27    def register(
28        self,
29        email: str,
30        password: str,
31        display_name: str = "",
32    ) -> AuthTokenResponse:
33        """Register via email + password. Returns a fresh user-kind token.
34
35        First-user bootstrap: when the backend has zero users, the new
36        user is granted ``role="super_admin"``.
37        """
38        resp = self._http.request(
39            "POST",
40            "/auth/register",
41            json={"email": email, "password": password, "display_name": display_name},
42        )
43        _raise_for_status(resp)
44        return AuthTokenResponse.model_validate(resp.json())

Register via email + password. Returns a fresh user-kind token.

First-user bootstrap: when the backend has zero users, the new user is granted role="super_admin".

def login( self, email: str, password: str) -> ctfy.server.models.AuthTokenResponse:
46    def login(self, email: str, password: str) -> AuthTokenResponse:
47        """Sign in with email + password. Returns a fresh user-kind token."""
48        resp = self._http.request(
49            "POST", "/auth/login", json={"email": email, "password": password}
50        )
51        _raise_for_status(resp)
52        return AuthTokenResponse.model_validate(resp.json())

Sign in with email + password. Returns a fresh user-kind token.

def device_code(self) -> ctfy.server.models.DeviceCodeResponse:
54    def device_code(self) -> DeviceCodeResponse:
55        """Start a device-authorization flow (unauthenticated, RFC 8628).
56
57        Returns a short ``user_code`` + a ``verification_uri`` the user
58        opens in a browser, plus the opaque ``device_code`` to poll with.
59        """
60        resp = self._http.request("POST", "/auth/device/code")
61        _raise_for_status(resp)
62        return DeviceCodeResponse.model_validate(resp.json())

Start a device-authorization flow (unauthenticated, RFC 8628).

Returns a short user_code + a verification_uri the user opens in a browser, plus the opaque device_code to poll with.

def device_token(self, device_code: str) -> ctfy.server.models.DeviceTokenResponse:
64    def device_token(self, device_code: str) -> DeviceTokenResponse:
65        """Poll for the token after the user approves (unauthenticated).
66
67        ``status`` is ``pending`` / ``approved`` / ``denied`` /
68        ``expired``; ``token`` is set only when ``approved``.
69        """
70        resp = self._http.request(
71            "POST", "/auth/device/token", json={"device_code": device_code}
72        )
73        _raise_for_status(resp)
74        return DeviceTokenResponse.model_validate(resp.json())

Poll for the token after the user approves (unauthenticated).

status is pending / approved / denied / expired; token is set only when approved.

def set_password(self, new_password: str, current_password: str = '') -> None:
76    def set_password(self, new_password: str, current_password: str = "") -> None:
77        """Set or change the caller's password.
78
79        ``current_password`` is required iff the caller already has a
80        password identity; absent on the first-time "attach password to
81        OAuth-only account" flow.
82        """
83        resp = self._http.request(
84            "POST",
85            "/auth/password",
86            json={"new_password": new_password, "current_password": current_password},
87        )
88        _raise_for_status(resp)

Set or change the caller's password.

current_password is required iff the caller already has a password identity; absent on the first-time "attach password to OAuth-only account" flow.

def delete_password(self) -> None:
90    def delete_password(self) -> None:
91        """Drop the caller's password identity. Requires ≥1 OAuth identity."""
92        resp = self._http.request("DELETE", "/auth/password")
93        _raise_for_status(resp)

Drop the caller's password identity. Requires ≥1 OAuth identity.

def list_identities(self) -> list[ctfy.server.models.LinkedIdentity]:
95    def list_identities(self) -> list[LinkedIdentity]:
96        """Bound OAuth + password identities for the calling user."""
97        resp = self._http.request("GET", "/auth/identities")
98        _raise_for_status(resp)
99        return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]]

Bound OAuth + password identities for the calling user.

def unbind_identity(self, identity_id: str) -> None:
101    def unbind_identity(self, identity_id: str) -> None:
102        """Unbind one OAuth identity. Server blocks unlinking the last one."""
103        resp = self._http.request("DELETE", f"/auth/identities/{identity_id}")
104        _raise_for_status(resp)

Unbind one OAuth identity. Server blocks unlinking the last one.

def list_tokens( self, offset: int = 0, limit: int = 50) -> list[ctfy.server.models.TokenInfo]:
117    def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]:
118        """List caller's tokens (user + agent). Plaintext never returned."""
119        resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit})
120        _raise_for_status(resp)
121        return [TokenInfo.model_validate(t) for t in resp.json()["items"]]

List caller's tokens (user + agent). Plaintext never returned.

def mint_token( self, label: str, expires_in_days: int | None = None, *, competition_access: str | None = None, competition_ids: list[str] | None = None, permissions: dict[str, str] | None = None) -> ctfy.server.models.CreateFineGrainedTokenResponse:
123    def mint_token(
124        self,
125        label: str,
126        expires_in_days: int | None = None,
127        *,
128        competition_access: str | None = None,
129        competition_ids: list[str] | None = None,
130        permissions: dict[str, str] | None = None,
131    ) -> CreateFineGrainedTokenResponse:
132        """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE.
133
134        With no scope args the server mints the broad legacy profile
135        (every competition, full participate access) so existing
136        callers keep working unchanged. To scope it GitHub-style pass
137        ``competition_access`` (``all`` / ``selected`` / ``none``),
138        ``competition_ids`` (when ``selected``) and ``permissions``
139        (category → ``none`` / ``read`` / ``write``).
140
141        ``expires_in_days`` defaults server-side when ``None``.
142        """
143        body: dict[str, object] = {"label": label}
144        if expires_in_days is not None:
145            body["expires_in_days"] = expires_in_days
146        if competition_access is not None:
147            body["competition_access"] = competition_access
148        if competition_ids is not None:
149            body["competition_ids"] = competition_ids
150        if permissions is not None:
151            body["permissions"] = permissions
152        resp = self._http.request("POST", "/auth/tokens", json=body)
153        _raise_for_status(resp)
154        return CreateFineGrainedTokenResponse.model_validate(resp.json())

Mint a fresh fine-grained token (pf_*). Plaintext ONCE.

With no scope args the server mints the broad legacy profile (every competition, full participate access) so existing callers keep working unchanged. To scope it GitHub-style pass competition_access (all / selected / none), competition_ids (when selected) and permissions (category → none / read / write).

expires_in_days defaults server-side when None.

def revoke_token(self, token_id: str) -> None:
156    def revoke_token(self, token_id: str) -> None:
157        """Revoke one token by id."""
158        resp = self._http.request("DELETE", f"/auth/tokens/{token_id}")
159        _raise_for_status(resp)

Revoke one token by id.

def revoke_other_tokens(self) -> None:
161    def revoke_other_tokens(self) -> None:
162        """Revoke every token belonging to this user except the caller's own."""
163        resp = self._http.request("DELETE", "/auth/tokens/others")
164        _raise_for_status(resp)

Revoke every token belonging to this user except the caller's own.

def providers(self) -> ctfy.server.models.ProvidersResponse:
166    def providers(self) -> ProvidersResponse:
167        """Which sign-in methods this deployment offers (OAuth providers
168        configured + whether password auth is enabled)."""
169        resp = self._http.request("GET", "/auth/providers")
170        _raise_for_status(resp)
171        return ProvidersResponse.model_validate(resp.json())

Which sign-in methods this deployment offers (OAuth providers configured + whether password auth is enabled).

def token_scopes(self) -> ctfy.server.models.TokenScopesResponse:
173    def token_scopes(self) -> TokenScopesResponse:
174        """The catalog of grantable fine-grained token scopes — drives
175        the token-create wizard."""
176        resp = self._http.request("GET", "/auth/tokens/scopes")
177        _raise_for_status(resp)
178        return TokenScopesResponse.model_validate(resp.json())

The catalog of grantable fine-grained token scopes — drives the token-create wizard.