ctfy.sdk.resources.auth
client.auth — password auth, OAuth identities, API tokens, discovery.
1"""``client.auth`` — password auth, OAuth identities, API tokens, discovery.""" 2 3from __future__ import annotations 4 5from ctfy.sdk._helpers import _raise_for_status 6from ctfy.sdk.base import BaseHttpClient 7from ctfy.server.models import ( 8 AuthTokenResponse, 9 CreateFineGrainedTokenResponse, 10 DeviceCodeResponse, 11 DeviceTokenResponse, 12 LinkedIdentity, 13 LinkStartResponse, 14 ProvidersResponse, 15 TokenInfo, 16 TokenScopesResponse, 17) 18 19 20class AuthResource: 21 """Register / login, identity binding, token CRUD, sign-in discovery.""" 22 23 def __init__(self, http: BaseHttpClient) -> None: 24 self._http = http 25 26 def register( 27 self, 28 email: str, 29 password: str, 30 display_name: str = "", 31 ) -> AuthTokenResponse: 32 """Register via email + password. Returns a fresh user-kind token. 33 34 First-user bootstrap: when the backend has zero users, the new 35 user is granted ``role="super_admin"``. 36 """ 37 resp = self._http.request( 38 "POST", 39 "/auth/register", 40 json={"email": email, "password": password, "display_name": display_name}, 41 ) 42 _raise_for_status(resp) 43 return AuthTokenResponse.model_validate(resp.json()) 44 45 def login(self, email: str, password: str) -> AuthTokenResponse: 46 """Sign in with email + password. Returns a fresh user-kind token.""" 47 resp = self._http.request( 48 "POST", "/auth/login", json={"email": email, "password": password} 49 ) 50 _raise_for_status(resp) 51 return AuthTokenResponse.model_validate(resp.json()) 52 53 def device_code(self) -> DeviceCodeResponse: 54 """Start a device-authorization flow (unauthenticated, RFC 8628). 55 56 Returns a short ``user_code`` + a ``verification_uri`` the user 57 opens in a browser, plus the opaque ``device_code`` to poll with. 58 """ 59 resp = self._http.request("POST", "/auth/device/code") 60 _raise_for_status(resp) 61 return DeviceCodeResponse.model_validate(resp.json()) 62 63 def device_token(self, device_code: str) -> DeviceTokenResponse: 64 """Poll for the token after the user approves (unauthenticated). 65 66 ``status`` is ``pending`` / ``approved`` / ``denied`` / 67 ``expired``; ``token`` is set only when ``approved``. 68 """ 69 resp = self._http.request( 70 "POST", "/auth/device/token", json={"device_code": device_code} 71 ) 72 _raise_for_status(resp) 73 return DeviceTokenResponse.model_validate(resp.json()) 74 75 def set_password(self, new_password: str, current_password: str = "") -> None: 76 """Set or change the caller's password. 77 78 ``current_password`` is required iff the caller already has a 79 password identity; absent on the first-time "attach password to 80 OAuth-only account" flow. 81 """ 82 resp = self._http.request( 83 "POST", 84 "/auth/password", 85 json={"new_password": new_password, "current_password": current_password}, 86 ) 87 _raise_for_status(resp) 88 89 def delete_password(self) -> None: 90 """Drop the caller's password identity. Requires ≥1 OAuth identity.""" 91 resp = self._http.request("DELETE", "/auth/password") 92 _raise_for_status(resp) 93 94 def list_identities(self) -> list[LinkedIdentity]: 95 """Bound OAuth + password identities for the calling user.""" 96 resp = self._http.request("GET", "/auth/identities") 97 _raise_for_status(resp) 98 return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]] 99 100 def unbind_identity(self, identity_id: str) -> None: 101 """Unbind one OAuth identity. Server blocks unlinking the last one.""" 102 resp = self._http.request("DELETE", f"/auth/identities/{identity_id}") 103 _raise_for_status(resp) 104 105 def start_oauth_link(self, provider: str) -> LinkStartResponse: 106 """Begin a "link a second OAuth identity to my account" flow. 107 108 Returns the platform's signed-state authorize URL — the actual 109 flow finishes in a browser (the user opens the URL and approves 110 linking the provider to their existing account). 111 """ 112 resp = self._http.request("POST", f"/auth/identities/link/{provider}") 113 _raise_for_status(resp) 114 return LinkStartResponse.model_validate(resp.json()) 115 116 def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]: 117 """List caller's tokens (user + agent). Plaintext never returned.""" 118 resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit}) 119 _raise_for_status(resp) 120 return [TokenInfo.model_validate(t) for t in resp.json()["items"]] 121 122 def mint_token( 123 self, 124 label: str, 125 expires_in_days: int | None = None, 126 *, 127 competition_access: str | None = None, 128 competition_ids: list[str] | None = None, 129 permissions: dict[str, str] | None = None, 130 ) -> CreateFineGrainedTokenResponse: 131 """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE. 132 133 With no scope args the server mints the broad legacy profile 134 (every competition, full participate access) so existing 135 callers keep working unchanged. To scope it GitHub-style pass 136 ``competition_access`` (``all`` / ``selected`` / ``none``), 137 ``competition_ids`` (when ``selected``) and ``permissions`` 138 (category → ``none`` / ``read`` / ``write``). 139 140 ``expires_in_days`` defaults server-side when ``None``. 141 """ 142 body: dict[str, object] = {"label": label} 143 if expires_in_days is not None: 144 body["expires_in_days"] = expires_in_days 145 if competition_access is not None: 146 body["competition_access"] = competition_access 147 if competition_ids is not None: 148 body["competition_ids"] = competition_ids 149 if permissions is not None: 150 body["permissions"] = permissions 151 resp = self._http.request("POST", "/auth/tokens", json=body) 152 _raise_for_status(resp) 153 return CreateFineGrainedTokenResponse.model_validate(resp.json()) 154 155 def revoke_token(self, token_id: str) -> None: 156 """Revoke one token by id.""" 157 resp = self._http.request("DELETE", f"/auth/tokens/{token_id}") 158 _raise_for_status(resp) 159 160 def revoke_other_tokens(self) -> None: 161 """Revoke every token belonging to this user except the caller's own.""" 162 resp = self._http.request("DELETE", "/auth/tokens/others") 163 _raise_for_status(resp) 164 165 def providers(self) -> ProvidersResponse: 166 """Which sign-in methods this deployment offers (OAuth providers 167 configured + whether password auth is enabled).""" 168 resp = self._http.request("GET", "/auth/providers") 169 _raise_for_status(resp) 170 return ProvidersResponse.model_validate(resp.json()) 171 172 def token_scopes(self) -> TokenScopesResponse: 173 """The catalog of grantable fine-grained token scopes — drives 174 the token-create wizard.""" 175 resp = self._http.request("GET", "/auth/tokens/scopes") 176 _raise_for_status(resp) 177 return TokenScopesResponse.model_validate(resp.json())
21class AuthResource: 22 """Register / login, identity binding, token CRUD, sign-in discovery.""" 23 24 def __init__(self, http: BaseHttpClient) -> None: 25 self._http = http 26 27 def register( 28 self, 29 email: str, 30 password: str, 31 display_name: str = "", 32 ) -> AuthTokenResponse: 33 """Register via email + password. Returns a fresh user-kind token. 34 35 First-user bootstrap: when the backend has zero users, the new 36 user is granted ``role="super_admin"``. 37 """ 38 resp = self._http.request( 39 "POST", 40 "/auth/register", 41 json={"email": email, "password": password, "display_name": display_name}, 42 ) 43 _raise_for_status(resp) 44 return AuthTokenResponse.model_validate(resp.json()) 45 46 def login(self, email: str, password: str) -> AuthTokenResponse: 47 """Sign in with email + password. Returns a fresh user-kind token.""" 48 resp = self._http.request( 49 "POST", "/auth/login", json={"email": email, "password": password} 50 ) 51 _raise_for_status(resp) 52 return AuthTokenResponse.model_validate(resp.json()) 53 54 def device_code(self) -> DeviceCodeResponse: 55 """Start a device-authorization flow (unauthenticated, RFC 8628). 56 57 Returns a short ``user_code`` + a ``verification_uri`` the user 58 opens in a browser, plus the opaque ``device_code`` to poll with. 59 """ 60 resp = self._http.request("POST", "/auth/device/code") 61 _raise_for_status(resp) 62 return DeviceCodeResponse.model_validate(resp.json()) 63 64 def device_token(self, device_code: str) -> DeviceTokenResponse: 65 """Poll for the token after the user approves (unauthenticated). 66 67 ``status`` is ``pending`` / ``approved`` / ``denied`` / 68 ``expired``; ``token`` is set only when ``approved``. 69 """ 70 resp = self._http.request( 71 "POST", "/auth/device/token", json={"device_code": device_code} 72 ) 73 _raise_for_status(resp) 74 return DeviceTokenResponse.model_validate(resp.json()) 75 76 def set_password(self, new_password: str, current_password: str = "") -> None: 77 """Set or change the caller's password. 78 79 ``current_password`` is required iff the caller already has a 80 password identity; absent on the first-time "attach password to 81 OAuth-only account" flow. 82 """ 83 resp = self._http.request( 84 "POST", 85 "/auth/password", 86 json={"new_password": new_password, "current_password": current_password}, 87 ) 88 _raise_for_status(resp) 89 90 def delete_password(self) -> None: 91 """Drop the caller's password identity. Requires ≥1 OAuth identity.""" 92 resp = self._http.request("DELETE", "/auth/password") 93 _raise_for_status(resp) 94 95 def list_identities(self) -> list[LinkedIdentity]: 96 """Bound OAuth + password identities for the calling user.""" 97 resp = self._http.request("GET", "/auth/identities") 98 _raise_for_status(resp) 99 return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]] 100 101 def unbind_identity(self, identity_id: str) -> None: 102 """Unbind one OAuth identity. Server blocks unlinking the last one.""" 103 resp = self._http.request("DELETE", f"/auth/identities/{identity_id}") 104 _raise_for_status(resp) 105 106 def start_oauth_link(self, provider: str) -> LinkStartResponse: 107 """Begin a "link a second OAuth identity to my account" flow. 108 109 Returns the platform's signed-state authorize URL — the actual 110 flow finishes in a browser (the user opens the URL and approves 111 linking the provider to their existing account). 112 """ 113 resp = self._http.request("POST", f"/auth/identities/link/{provider}") 114 _raise_for_status(resp) 115 return LinkStartResponse.model_validate(resp.json()) 116 117 def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]: 118 """List caller's tokens (user + agent). Plaintext never returned.""" 119 resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit}) 120 _raise_for_status(resp) 121 return [TokenInfo.model_validate(t) for t in resp.json()["items"]] 122 123 def mint_token( 124 self, 125 label: str, 126 expires_in_days: int | None = None, 127 *, 128 competition_access: str | None = None, 129 competition_ids: list[str] | None = None, 130 permissions: dict[str, str] | None = None, 131 ) -> CreateFineGrainedTokenResponse: 132 """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE. 133 134 With no scope args the server mints the broad legacy profile 135 (every competition, full participate access) so existing 136 callers keep working unchanged. To scope it GitHub-style pass 137 ``competition_access`` (``all`` / ``selected`` / ``none``), 138 ``competition_ids`` (when ``selected``) and ``permissions`` 139 (category → ``none`` / ``read`` / ``write``). 140 141 ``expires_in_days`` defaults server-side when ``None``. 142 """ 143 body: dict[str, object] = {"label": label} 144 if expires_in_days is not None: 145 body["expires_in_days"] = expires_in_days 146 if competition_access is not None: 147 body["competition_access"] = competition_access 148 if competition_ids is not None: 149 body["competition_ids"] = competition_ids 150 if permissions is not None: 151 body["permissions"] = permissions 152 resp = self._http.request("POST", "/auth/tokens", json=body) 153 _raise_for_status(resp) 154 return CreateFineGrainedTokenResponse.model_validate(resp.json()) 155 156 def revoke_token(self, token_id: str) -> None: 157 """Revoke one token by id.""" 158 resp = self._http.request("DELETE", f"/auth/tokens/{token_id}") 159 _raise_for_status(resp) 160 161 def revoke_other_tokens(self) -> None: 162 """Revoke every token belonging to this user except the caller's own.""" 163 resp = self._http.request("DELETE", "/auth/tokens/others") 164 _raise_for_status(resp) 165 166 def providers(self) -> ProvidersResponse: 167 """Which sign-in methods this deployment offers (OAuth providers 168 configured + whether password auth is enabled).""" 169 resp = self._http.request("GET", "/auth/providers") 170 _raise_for_status(resp) 171 return ProvidersResponse.model_validate(resp.json()) 172 173 def token_scopes(self) -> TokenScopesResponse: 174 """The catalog of grantable fine-grained token scopes — drives 175 the token-create wizard.""" 176 resp = self._http.request("GET", "/auth/tokens/scopes") 177 _raise_for_status(resp) 178 return TokenScopesResponse.model_validate(resp.json())
Register / login, identity binding, token CRUD, sign-in discovery.
27 def register( 28 self, 29 email: str, 30 password: str, 31 display_name: str = "", 32 ) -> AuthTokenResponse: 33 """Register via email + password. Returns a fresh user-kind token. 34 35 First-user bootstrap: when the backend has zero users, the new 36 user is granted ``role="super_admin"``. 37 """ 38 resp = self._http.request( 39 "POST", 40 "/auth/register", 41 json={"email": email, "password": password, "display_name": display_name}, 42 ) 43 _raise_for_status(resp) 44 return AuthTokenResponse.model_validate(resp.json())
Register via email + password. Returns a fresh user-kind token.
First-user bootstrap: when the backend has zero users, the new
user is granted role="super_admin".
46 def login(self, email: str, password: str) -> AuthTokenResponse: 47 """Sign in with email + password. Returns a fresh user-kind token.""" 48 resp = self._http.request( 49 "POST", "/auth/login", json={"email": email, "password": password} 50 ) 51 _raise_for_status(resp) 52 return AuthTokenResponse.model_validate(resp.json())
Sign in with email + password. Returns a fresh user-kind token.
54 def device_code(self) -> DeviceCodeResponse: 55 """Start a device-authorization flow (unauthenticated, RFC 8628). 56 57 Returns a short ``user_code`` + a ``verification_uri`` the user 58 opens in a browser, plus the opaque ``device_code`` to poll with. 59 """ 60 resp = self._http.request("POST", "/auth/device/code") 61 _raise_for_status(resp) 62 return DeviceCodeResponse.model_validate(resp.json())
Start a device-authorization flow (unauthenticated, RFC 8628).
Returns a short user_code + a verification_uri the user
opens in a browser, plus the opaque device_code to poll with.
64 def device_token(self, device_code: str) -> DeviceTokenResponse: 65 """Poll for the token after the user approves (unauthenticated). 66 67 ``status`` is ``pending`` / ``approved`` / ``denied`` / 68 ``expired``; ``token`` is set only when ``approved``. 69 """ 70 resp = self._http.request( 71 "POST", "/auth/device/token", json={"device_code": device_code} 72 ) 73 _raise_for_status(resp) 74 return DeviceTokenResponse.model_validate(resp.json())
Poll for the token after the user approves (unauthenticated).
status is pending / approved / denied /
expired; token is set only when approved.
76 def set_password(self, new_password: str, current_password: str = "") -> None: 77 """Set or change the caller's password. 78 79 ``current_password`` is required iff the caller already has a 80 password identity; absent on the first-time "attach password to 81 OAuth-only account" flow. 82 """ 83 resp = self._http.request( 84 "POST", 85 "/auth/password", 86 json={"new_password": new_password, "current_password": current_password}, 87 ) 88 _raise_for_status(resp)
Set or change the caller's password.
current_password is required iff the caller already has a
password identity; absent on the first-time "attach password to
OAuth-only account" flow.
90 def delete_password(self) -> None: 91 """Drop the caller's password identity. Requires ≥1 OAuth identity.""" 92 resp = self._http.request("DELETE", "/auth/password") 93 _raise_for_status(resp)
Drop the caller's password identity. Requires ≥1 OAuth identity.
95 def list_identities(self) -> list[LinkedIdentity]: 96 """Bound OAuth + password identities for the calling user.""" 97 resp = self._http.request("GET", "/auth/identities") 98 _raise_for_status(resp) 99 return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]]
Bound OAuth + password identities for the calling user.
101 def unbind_identity(self, identity_id: str) -> None: 102 """Unbind one OAuth identity. Server blocks unlinking the last one.""" 103 resp = self._http.request("DELETE", f"/auth/identities/{identity_id}") 104 _raise_for_status(resp)
Unbind one OAuth identity. Server blocks unlinking the last one.
106 def start_oauth_link(self, provider: str) -> LinkStartResponse: 107 """Begin a "link a second OAuth identity to my account" flow. 108 109 Returns the platform's signed-state authorize URL — the actual 110 flow finishes in a browser (the user opens the URL and approves 111 linking the provider to their existing account). 112 """ 113 resp = self._http.request("POST", f"/auth/identities/link/{provider}") 114 _raise_for_status(resp) 115 return LinkStartResponse.model_validate(resp.json())
Begin a "link a second OAuth identity to my account" flow.
Returns the platform's signed-state authorize URL — the actual flow finishes in a browser (the user opens the URL and approves linking the provider to their existing account).
117 def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]: 118 """List caller's tokens (user + agent). Plaintext never returned.""" 119 resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit}) 120 _raise_for_status(resp) 121 return [TokenInfo.model_validate(t) for t in resp.json()["items"]]
List caller's tokens (user + agent). Plaintext never returned.
123 def mint_token( 124 self, 125 label: str, 126 expires_in_days: int | None = None, 127 *, 128 competition_access: str | None = None, 129 competition_ids: list[str] | None = None, 130 permissions: dict[str, str] | None = None, 131 ) -> CreateFineGrainedTokenResponse: 132 """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE. 133 134 With no scope args the server mints the broad legacy profile 135 (every competition, full participate access) so existing 136 callers keep working unchanged. To scope it GitHub-style pass 137 ``competition_access`` (``all`` / ``selected`` / ``none``), 138 ``competition_ids`` (when ``selected``) and ``permissions`` 139 (category → ``none`` / ``read`` / ``write``). 140 141 ``expires_in_days`` defaults server-side when ``None``. 142 """ 143 body: dict[str, object] = {"label": label} 144 if expires_in_days is not None: 145 body["expires_in_days"] = expires_in_days 146 if competition_access is not None: 147 body["competition_access"] = competition_access 148 if competition_ids is not None: 149 body["competition_ids"] = competition_ids 150 if permissions is not None: 151 body["permissions"] = permissions 152 resp = self._http.request("POST", "/auth/tokens", json=body) 153 _raise_for_status(resp) 154 return CreateFineGrainedTokenResponse.model_validate(resp.json())
Mint a fresh fine-grained token (pf_*). Plaintext ONCE.
With no scope args the server mints the broad legacy profile
(every competition, full participate access) so existing
callers keep working unchanged. To scope it GitHub-style pass
competition_access (all / selected / none),
competition_ids (when selected) and permissions
(category → none / read / write).
expires_in_days defaults server-side when None.
156 def revoke_token(self, token_id: str) -> None: 157 """Revoke one token by id.""" 158 resp = self._http.request("DELETE", f"/auth/tokens/{token_id}") 159 _raise_for_status(resp)
Revoke one token by id.
161 def revoke_other_tokens(self) -> None: 162 """Revoke every token belonging to this user except the caller's own.""" 163 resp = self._http.request("DELETE", "/auth/tokens/others") 164 _raise_for_status(resp)
Revoke every token belonging to this user except the caller's own.
166 def providers(self) -> ProvidersResponse: 167 """Which sign-in methods this deployment offers (OAuth providers 168 configured + whether password auth is enabled).""" 169 resp = self._http.request("GET", "/auth/providers") 170 _raise_for_status(resp) 171 return ProvidersResponse.model_validate(resp.json())
Which sign-in methods this deployment offers (OAuth providers configured + whether password auth is enabled).
173 def token_scopes(self) -> TokenScopesResponse: 174 """The catalog of grantable fine-grained token scopes — drives 175 the token-create wizard.""" 176 resp = self._http.request("GET", "/auth/tokens/scopes") 177 _raise_for_status(resp) 178 return TokenScopesResponse.model_validate(resp.json())
The catalog of grantable fine-grained token scopes — drives the token-create wizard.