ctfy.sdk.resources.auth

client.auth — password auth, OAuth identities, API tokens, discovery.

  1"""``client.auth`` — password auth, OAuth identities, API tokens, discovery."""
  2
from __future__ import annotations

from urllib.parse import quote

from ctfy.sdk._helpers import _raise_for_status
from ctfy.sdk.base import BaseHttpClient
from ctfy.server.models import (
    AuthTokenResponse,
    CreateFineGrainedTokenResponse,
    DeviceCodeResponse,
    DeviceTokenResponse,
    LinkedIdentity,
    LinkStartResponse,
    ProvidersResponse,
    TokenInfo,
    TokenScopesResponse,
)
 18
 19
 20class AuthResource:
 21    """Register / login, identity binding, token CRUD, sign-in discovery."""
 22
 23    def __init__(self, http: BaseHttpClient) -> None:
 24        self._http = http
 25
 26    def register(
 27        self,
 28        email: str,
 29        password: str,
 30        display_name: str = "",
 31    ) -> AuthTokenResponse:
 32        """Register via email + password. Returns a fresh user-kind token.
 33
 34        First-user bootstrap: when the backend has zero users, the new
 35        user is granted ``role="super_admin"``.
 36        """
 37        resp = self._http.request(
 38            "POST",
 39            "/auth/register",
 40            json={"email": email, "password": password, "display_name": display_name},
 41        )
 42        _raise_for_status(resp)
 43        return AuthTokenResponse.model_validate(resp.json())
 44
 45    def login(self, email: str, password: str) -> AuthTokenResponse:
 46        """Sign in with email + password. Returns a fresh user-kind token."""
 47        resp = self._http.request(
 48            "POST", "/auth/login", json={"email": email, "password": password}
 49        )
 50        _raise_for_status(resp)
 51        return AuthTokenResponse.model_validate(resp.json())
 52
 53    def device_code(self) -> DeviceCodeResponse:
 54        """Start a device-authorization flow (unauthenticated, RFC 8628).
 55
 56        Returns a short ``user_code`` + a ``verification_uri`` the user
 57        opens in a browser, plus the opaque ``device_code`` to poll with.
 58        """
 59        resp = self._http.request("POST", "/auth/device/code")
 60        _raise_for_status(resp)
 61        return DeviceCodeResponse.model_validate(resp.json())
 62
 63    def device_token(self, device_code: str) -> DeviceTokenResponse:
 64        """Poll for the token after the user approves (unauthenticated).
 65
 66        ``status`` is ``pending`` / ``approved`` / ``denied`` /
 67        ``expired``; ``token`` is set only when ``approved``.
 68        """
 69        resp = self._http.request("POST", "/auth/device/token", json={"device_code": device_code})
 70        _raise_for_status(resp)
 71        return DeviceTokenResponse.model_validate(resp.json())
 72
 73    def set_password(self, new_password: str, current_password: str = "") -> None:
 74        """Set or change the caller's password.
 75
 76        ``current_password`` is required iff the caller already has a
 77        password identity; absent on the first-time "attach password to
 78        OAuth-only account" flow.
 79        """
 80        resp = self._http.request(
 81            "POST",
 82            "/auth/password",
 83            json={"new_password": new_password, "current_password": current_password},
 84        )
 85        _raise_for_status(resp)
 86
 87    def delete_password(self) -> None:
 88        """Drop the caller's password identity. Requires ≥1 OAuth identity."""
 89        resp = self._http.request("DELETE", "/auth/password")
 90        _raise_for_status(resp)
 91
 92    def list_identities(self) -> list[LinkedIdentity]:
 93        """Bound OAuth + password identities for the calling user."""
 94        resp = self._http.request("GET", "/auth/identities")
 95        _raise_for_status(resp)
 96        return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]]
 97
 98    def unbind_identity(self, identity_id: str) -> None:
 99        """Unbind one OAuth identity. Server blocks unlinking the last one."""
100        resp = self._http.request("DELETE", f"/auth/identities/{identity_id}")
101        _raise_for_status(resp)
102
103    def start_oauth_link(self, provider: str) -> LinkStartResponse:
104        """Begin a "link a second OAuth identity to my account" flow.
105
106        Returns the platform's signed-state authorize URL — the actual
107        flow finishes in a browser (the user opens the URL and approves
108        linking the provider to their existing account).
109        """
110        resp = self._http.request("POST", f"/auth/identities/link/{provider}")
111        _raise_for_status(resp)
112        return LinkStartResponse.model_validate(resp.json())
113
114    def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]:
115        """List caller's tokens (user + agent). Plaintext never returned."""
116        resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit})
117        _raise_for_status(resp)
118        return [TokenInfo.model_validate(t) for t in resp.json()["items"]]
119
120    def mint_token(
121        self,
122        label: str,
123        expires_in_days: int | None = None,
124        *,
125        competition_access: str | None = None,
126        competition_ids: list[str] | None = None,
127        permissions: dict[str, str] | None = None,
128    ) -> CreateFineGrainedTokenResponse:
129        """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE.
130
131        With no scope args the server mints the broad legacy profile
132        (every competition, full participate access) so existing
133        callers keep working unchanged. To scope it GitHub-style pass
134        ``competition_access`` (``all`` / ``selected`` / ``none``),
135        ``competition_ids`` (when ``selected``) and ``permissions``
136        (category → ``none`` / ``read`` / ``write``).
137
138        ``expires_in_days`` defaults server-side when ``None``.
139        """
140        body: dict[str, object] = {"label": label}
141        if expires_in_days is not None:
142            body["expires_in_days"] = expires_in_days
143        if competition_access is not None:
144            body["competition_access"] = competition_access
145        if competition_ids is not None:
146            body["competition_ids"] = competition_ids
147        if permissions is not None:
148            body["permissions"] = permissions
149        resp = self._http.request("POST", "/auth/tokens", json=body)
150        _raise_for_status(resp)
151        return CreateFineGrainedTokenResponse.model_validate(resp.json())
152
153    def revoke_token(self, token_id: str) -> None:
154        """Revoke one token by id."""
155        resp = self._http.request("DELETE", f"/auth/tokens/{token_id}")
156        _raise_for_status(resp)
157
158    def revoke_other_tokens(self) -> None:
159        """Revoke every token belonging to this user except the caller's own."""
160        resp = self._http.request("DELETE", "/auth/tokens/others")
161        _raise_for_status(resp)
162
163    def providers(self) -> ProvidersResponse:
164        """Which sign-in methods this deployment offers (OAuth providers
165        configured + whether password auth is enabled)."""
166        resp = self._http.request("GET", "/auth/providers")
167        _raise_for_status(resp)
168        return ProvidersResponse.model_validate(resp.json())
169
170    def token_scopes(self) -> TokenScopesResponse:
171        """The catalog of grantable fine-grained token scopes — drives
172        the token-create wizard."""
173        resp = self._http.request("GET", "/auth/tokens/scopes")
174        _raise_for_status(resp)
175        return TokenScopesResponse.model_validate(resp.json())
class AuthResource:
 21class AuthResource:
 22    """Register / login, identity binding, token CRUD, sign-in discovery."""
 23
 24    def __init__(self, http: BaseHttpClient) -> None:
 25        self._http = http
 26
 27    def register(
 28        self,
 29        email: str,
 30        password: str,
 31        display_name: str = "",
 32    ) -> AuthTokenResponse:
 33        """Register via email + password. Returns a fresh user-kind token.
 34
 35        First-user bootstrap: when the backend has zero users, the new
 36        user is granted ``role="super_admin"``.
 37        """
 38        resp = self._http.request(
 39            "POST",
 40            "/auth/register",
 41            json={"email": email, "password": password, "display_name": display_name},
 42        )
 43        _raise_for_status(resp)
 44        return AuthTokenResponse.model_validate(resp.json())
 45
 46    def login(self, email: str, password: str) -> AuthTokenResponse:
 47        """Sign in with email + password. Returns a fresh user-kind token."""
 48        resp = self._http.request(
 49            "POST", "/auth/login", json={"email": email, "password": password}
 50        )
 51        _raise_for_status(resp)
 52        return AuthTokenResponse.model_validate(resp.json())
 53
 54    def device_code(self) -> DeviceCodeResponse:
 55        """Start a device-authorization flow (unauthenticated, RFC 8628).
 56
 57        Returns a short ``user_code`` + a ``verification_uri`` the user
 58        opens in a browser, plus the opaque ``device_code`` to poll with.
 59        """
 60        resp = self._http.request("POST", "/auth/device/code")
 61        _raise_for_status(resp)
 62        return DeviceCodeResponse.model_validate(resp.json())
 63
 64    def device_token(self, device_code: str) -> DeviceTokenResponse:
 65        """Poll for the token after the user approves (unauthenticated).
 66
 67        ``status`` is ``pending`` / ``approved`` / ``denied`` /
 68        ``expired``; ``token`` is set only when ``approved``.
 69        """
 70        resp = self._http.request("POST", "/auth/device/token", json={"device_code": device_code})
 71        _raise_for_status(resp)
 72        return DeviceTokenResponse.model_validate(resp.json())
 73
 74    def set_password(self, new_password: str, current_password: str = "") -> None:
 75        """Set or change the caller's password.
 76
 77        ``current_password`` is required iff the caller already has a
 78        password identity; absent on the first-time "attach password to
 79        OAuth-only account" flow.
 80        """
 81        resp = self._http.request(
 82            "POST",
 83            "/auth/password",
 84            json={"new_password": new_password, "current_password": current_password},
 85        )
 86        _raise_for_status(resp)
 87
 88    def delete_password(self) -> None:
 89        """Drop the caller's password identity. Requires ≥1 OAuth identity."""
 90        resp = self._http.request("DELETE", "/auth/password")
 91        _raise_for_status(resp)
 92
 93    def list_identities(self) -> list[LinkedIdentity]:
 94        """Bound OAuth + password identities for the calling user."""
 95        resp = self._http.request("GET", "/auth/identities")
 96        _raise_for_status(resp)
 97        return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]]
 98
 99    def unbind_identity(self, identity_id: str) -> None:
100        """Unbind one OAuth identity. Server blocks unlinking the last one."""
101        resp = self._http.request("DELETE", f"/auth/identities/{identity_id}")
102        _raise_for_status(resp)
103
104    def start_oauth_link(self, provider: str) -> LinkStartResponse:
105        """Begin a "link a second OAuth identity to my account" flow.
106
107        Returns the platform's signed-state authorize URL — the actual
108        flow finishes in a browser (the user opens the URL and approves
109        linking the provider to their existing account).
110        """
111        resp = self._http.request("POST", f"/auth/identities/link/{provider}")
112        _raise_for_status(resp)
113        return LinkStartResponse.model_validate(resp.json())
114
115    def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]:
116        """List caller's tokens (user + agent). Plaintext never returned."""
117        resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit})
118        _raise_for_status(resp)
119        return [TokenInfo.model_validate(t) for t in resp.json()["items"]]
120
121    def mint_token(
122        self,
123        label: str,
124        expires_in_days: int | None = None,
125        *,
126        competition_access: str | None = None,
127        competition_ids: list[str] | None = None,
128        permissions: dict[str, str] | None = None,
129    ) -> CreateFineGrainedTokenResponse:
130        """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE.
131
132        With no scope args the server mints the broad legacy profile
133        (every competition, full participate access) so existing
134        callers keep working unchanged. To scope it GitHub-style pass
135        ``competition_access`` (``all`` / ``selected`` / ``none``),
136        ``competition_ids`` (when ``selected``) and ``permissions``
137        (category → ``none`` / ``read`` / ``write``).
138
139        ``expires_in_days`` defaults server-side when ``None``.
140        """
141        body: dict[str, object] = {"label": label}
142        if expires_in_days is not None:
143            body["expires_in_days"] = expires_in_days
144        if competition_access is not None:
145            body["competition_access"] = competition_access
146        if competition_ids is not None:
147            body["competition_ids"] = competition_ids
148        if permissions is not None:
149            body["permissions"] = permissions
150        resp = self._http.request("POST", "/auth/tokens", json=body)
151        _raise_for_status(resp)
152        return CreateFineGrainedTokenResponse.model_validate(resp.json())
153
154    def revoke_token(self, token_id: str) -> None:
155        """Revoke one token by id."""
156        resp = self._http.request("DELETE", f"/auth/tokens/{token_id}")
157        _raise_for_status(resp)
158
159    def revoke_other_tokens(self) -> None:
160        """Revoke every token belonging to this user except the caller's own."""
161        resp = self._http.request("DELETE", "/auth/tokens/others")
162        _raise_for_status(resp)
163
164    def providers(self) -> ProvidersResponse:
165        """Which sign-in methods this deployment offers (OAuth providers
166        configured + whether password auth is enabled)."""
167        resp = self._http.request("GET", "/auth/providers")
168        _raise_for_status(resp)
169        return ProvidersResponse.model_validate(resp.json())
170
171    def token_scopes(self) -> TokenScopesResponse:
172        """The catalog of grantable fine-grained token scopes — drives
173        the token-create wizard."""
174        resp = self._http.request("GET", "/auth/tokens/scopes")
175        _raise_for_status(resp)
176        return TokenScopesResponse.model_validate(resp.json())

Register / login, identity binding, token CRUD, sign-in discovery.

AuthResource(http: ctfy.sdk.base.BaseHttpClient)
24    def __init__(self, http: BaseHttpClient) -> None:
25        self._http = http
def register( self, email: str, password: str, display_name: str = '') -> ctfy.server.models.AuthTokenResponse:
27    def register(
28        self,
29        email: str,
30        password: str,
31        display_name: str = "",
32    ) -> AuthTokenResponse:
33        """Register via email + password. Returns a fresh user-kind token.
34
35        First-user bootstrap: when the backend has zero users, the new
36        user is granted ``role="super_admin"``.
37        """
38        resp = self._http.request(
39            "POST",
40            "/auth/register",
41            json={"email": email, "password": password, "display_name": display_name},
42        )
43        _raise_for_status(resp)
44        return AuthTokenResponse.model_validate(resp.json())

Register via email + password. Returns a fresh user-kind token.

First-user bootstrap: when the backend has zero users, the new user is granted role="super_admin".

def login( self, email: str, password: str) -> ctfy.server.models.AuthTokenResponse:
46    def login(self, email: str, password: str) -> AuthTokenResponse:
47        """Sign in with email + password. Returns a fresh user-kind token."""
48        resp = self._http.request(
49            "POST", "/auth/login", json={"email": email, "password": password}
50        )
51        _raise_for_status(resp)
52        return AuthTokenResponse.model_validate(resp.json())

Sign in with email + password. Returns a fresh user-kind token.

def device_code(self) -> ctfy.server.models.DeviceCodeResponse:
54    def device_code(self) -> DeviceCodeResponse:
55        """Start a device-authorization flow (unauthenticated, RFC 8628).
56
57        Returns a short ``user_code`` + a ``verification_uri`` the user
58        opens in a browser, plus the opaque ``device_code`` to poll with.
59        """
60        resp = self._http.request("POST", "/auth/device/code")
61        _raise_for_status(resp)
62        return DeviceCodeResponse.model_validate(resp.json())

Start a device-authorization flow (unauthenticated, RFC 8628).

Returns a short user_code + a verification_uri the user opens in a browser, plus the opaque device_code to poll with.

def device_token(self, device_code: str) -> ctfy.server.models.DeviceTokenResponse:
64    def device_token(self, device_code: str) -> DeviceTokenResponse:
65        """Poll for the token after the user approves (unauthenticated).
66
67        ``status`` is ``pending`` / ``approved`` / ``denied`` /
68        ``expired``; ``token`` is set only when ``approved``.
69        """
70        resp = self._http.request("POST", "/auth/device/token", json={"device_code": device_code})
71        _raise_for_status(resp)
72        return DeviceTokenResponse.model_validate(resp.json())

Poll for the token after the user approves (unauthenticated).

status is pending / approved / denied / expired; token is set only when approved.

def set_password(self, new_password: str, current_password: str = '') -> None:
74    def set_password(self, new_password: str, current_password: str = "") -> None:
75        """Set or change the caller's password.
76
77        ``current_password`` is required iff the caller already has a
78        password identity; absent on the first-time "attach password to
79        OAuth-only account" flow.
80        """
81        resp = self._http.request(
82            "POST",
83            "/auth/password",
84            json={"new_password": new_password, "current_password": current_password},
85        )
86        _raise_for_status(resp)

Set or change the caller's password.

current_password is required iff the caller already has a password identity; absent on the first-time "attach password to OAuth-only account" flow.

def delete_password(self) -> None:
88    def delete_password(self) -> None:
89        """Drop the caller's password identity. Requires ≥1 OAuth identity."""
90        resp = self._http.request("DELETE", "/auth/password")
91        _raise_for_status(resp)

Drop the caller's password identity. Requires ≥1 OAuth identity.

def list_identities(self) -> list[ctfy.server.models.LinkedIdentity]:
93    def list_identities(self) -> list[LinkedIdentity]:
94        """Bound OAuth + password identities for the calling user."""
95        resp = self._http.request("GET", "/auth/identities")
96        _raise_for_status(resp)
97        return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]]

Bound OAuth + password identities for the calling user.

def unbind_identity(self, identity_id: str) -> None:
 99    def unbind_identity(self, identity_id: str) -> None:
100        """Unbind one OAuth identity. Server blocks unlinking the last one."""
101        resp = self._http.request("DELETE", f"/auth/identities/{identity_id}")
102        _raise_for_status(resp)

Unbind one OAuth identity. Server blocks unlinking the last one.

def list_tokens( self, offset: int = 0, limit: int = 50) -> list[ctfy.server.models.TokenInfo]:
115    def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]:
116        """List caller's tokens (user + agent). Plaintext never returned."""
117        resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit})
118        _raise_for_status(resp)
119        return [TokenInfo.model_validate(t) for t in resp.json()["items"]]

List caller's tokens (user + agent). Plaintext never returned.

def mint_token( self, label: str, expires_in_days: int | None = None, *, competition_access: str | None = None, competition_ids: list[str] | None = None, permissions: dict[str, str] | None = None) -> ctfy.server.models.CreateFineGrainedTokenResponse:
121    def mint_token(
122        self,
123        label: str,
124        expires_in_days: int | None = None,
125        *,
126        competition_access: str | None = None,
127        competition_ids: list[str] | None = None,
128        permissions: dict[str, str] | None = None,
129    ) -> CreateFineGrainedTokenResponse:
130        """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE.
131
132        With no scope args the server mints the broad legacy profile
133        (every competition, full participate access) so existing
134        callers keep working unchanged. To scope it GitHub-style pass
135        ``competition_access`` (``all`` / ``selected`` / ``none``),
136        ``competition_ids`` (when ``selected``) and ``permissions``
137        (category → ``none`` / ``read`` / ``write``).
138
139        ``expires_in_days`` defaults server-side when ``None``.
140        """
141        body: dict[str, object] = {"label": label}
142        if expires_in_days is not None:
143            body["expires_in_days"] = expires_in_days
144        if competition_access is not None:
145            body["competition_access"] = competition_access
146        if competition_ids is not None:
147            body["competition_ids"] = competition_ids
148        if permissions is not None:
149            body["permissions"] = permissions
150        resp = self._http.request("POST", "/auth/tokens", json=body)
151        _raise_for_status(resp)
152        return CreateFineGrainedTokenResponse.model_validate(resp.json())

Mint a fresh fine-grained token (pf_*). Plaintext ONCE.

With no scope args the server mints the broad legacy profile (every competition, full participate access) so existing callers keep working unchanged. To scope it GitHub-style pass competition_access (all / selected / none), competition_ids (when selected) and permissions (category → none / read / write).

expires_in_days defaults server-side when None.

def revoke_token(self, token_id: str) -> None:
154    def revoke_token(self, token_id: str) -> None:
155        """Revoke one token by id."""
156        resp = self._http.request("DELETE", f"/auth/tokens/{token_id}")
157        _raise_for_status(resp)

Revoke one token by id.

def revoke_other_tokens(self) -> None:
159    def revoke_other_tokens(self) -> None:
160        """Revoke every token belonging to this user except the caller's own."""
161        resp = self._http.request("DELETE", "/auth/tokens/others")
162        _raise_for_status(resp)

Revoke every token belonging to this user except the caller's own.

def providers(self) -> ctfy.server.models.ProvidersResponse:
164    def providers(self) -> ProvidersResponse:
165        """Which sign-in methods this deployment offers (OAuth providers
166        configured + whether password auth is enabled)."""
167        resp = self._http.request("GET", "/auth/providers")
168        _raise_for_status(resp)
169        return ProvidersResponse.model_validate(resp.json())

Which sign-in methods this deployment offers (OAuth providers configured + whether password auth is enabled).

def token_scopes(self) -> ctfy.server.models.TokenScopesResponse:
171    def token_scopes(self) -> TokenScopesResponse:
172        """The catalog of grantable fine-grained token scopes — drives
173        the token-create wizard."""
174        resp = self._http.request("GET", "/auth/tokens/scopes")
175        _raise_for_status(resp)
176        return TokenScopesResponse.model_validate(resp.json())

The catalog of grantable fine-grained token scopes — drives the token-create wizard.