ctfy.sdk.resources.auth
client.auth — password auth, OAuth identities, API tokens, discovery.
1"""``client.auth`` — password auth, OAuth identities, API tokens, discovery.""" 2 3from __future__ import annotations 4 5from ctfy.sdk._helpers import _raise_for_status 6from ctfy.sdk.base import BaseHttpClient 7from ctfy.server.models import ( 8 AuthTokenResponse, 9 CreateFineGrainedTokenResponse, 10 DeviceCodeResponse, 11 DeviceTokenResponse, 12 LinkedIdentity, 13 LinkStartResponse, 14 ProvidersResponse, 15 TokenInfo, 16 TokenScopesResponse, 17) 18 19 20class AuthResource: 21 """Register / login, identity binding, token CRUD, sign-in discovery.""" 22 23 def __init__(self, http: BaseHttpClient) -> None: 24 self._http = http 25 26 def register( 27 self, 28 email: str, 29 password: str, 30 display_name: str = "", 31 ) -> AuthTokenResponse: 32 """Register via email + password. Returns a fresh user-kind token. 33 34 First-user bootstrap: when the backend has zero users, the new 35 user is granted ``role="super_admin"``. 36 """ 37 resp = self._http.request( 38 "POST", 39 "/auth/register", 40 json={"email": email, "password": password, "display_name": display_name}, 41 ) 42 _raise_for_status(resp) 43 return AuthTokenResponse.model_validate(resp.json()) 44 45 def login(self, email: str, password: str) -> AuthTokenResponse: 46 """Sign in with email + password. Returns a fresh user-kind token.""" 47 resp = self._http.request( 48 "POST", "/auth/login", json={"email": email, "password": password} 49 ) 50 _raise_for_status(resp) 51 return AuthTokenResponse.model_validate(resp.json()) 52 53 def device_code(self) -> DeviceCodeResponse: 54 """Start a device-authorization flow (unauthenticated, RFC 8628). 55 56 Returns a short ``user_code`` + a ``verification_uri`` the user 57 opens in a browser, plus the opaque ``device_code`` to poll with. 58 """ 59 resp = self._http.request("POST", "/auth/device/code") 60 _raise_for_status(resp) 61 return DeviceCodeResponse.model_validate(resp.json()) 62 63 def device_token(self, device_code: str) -> DeviceTokenResponse: 64 """Poll for the token after the user approves (unauthenticated). 65 66 ``status`` is ``pending`` / ``approved`` / ``denied`` / 67 ``expired``; ``token`` is set only when ``approved``. 68 """ 69 resp = self._http.request("POST", "/auth/device/token", json={"device_code": device_code}) 70 _raise_for_status(resp) 71 return DeviceTokenResponse.model_validate(resp.json()) 72 73 def set_password(self, new_password: str, current_password: str = "") -> None: 74 """Set or change the caller's password. 75 76 ``current_password`` is required iff the caller already has a 77 password identity; absent on the first-time "attach password to 78 OAuth-only account" flow. 79 """ 80 resp = self._http.request( 81 "POST", 82 "/auth/password", 83 json={"new_password": new_password, "current_password": current_password}, 84 ) 85 _raise_for_status(resp) 86 87 def delete_password(self) -> None: 88 """Drop the caller's password identity. Requires ≥1 OAuth identity.""" 89 resp = self._http.request("DELETE", "/auth/password") 90 _raise_for_status(resp) 91 92 def list_identities(self) -> list[LinkedIdentity]: 93 """Bound OAuth + password identities for the calling user.""" 94 resp = self._http.request("GET", "/auth/identities") 95 _raise_for_status(resp) 96 return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]] 97 98 def unbind_identity(self, identity_id: str) -> None: 99 """Unbind one OAuth identity. Server blocks unlinking the last one.""" 100 resp = self._http.request("DELETE", f"/auth/identities/{identity_id}") 101 _raise_for_status(resp) 102 103 def start_oauth_link(self, provider: str) -> LinkStartResponse: 104 """Begin a "link a second OAuth identity to my account" flow. 105 106 Returns the platform's signed-state authorize URL — the actual 107 flow finishes in a browser (the user opens the URL and approves 108 linking the provider to their existing account). 109 """ 110 resp = self._http.request("POST", f"/auth/identities/link/{provider}") 111 _raise_for_status(resp) 112 return LinkStartResponse.model_validate(resp.json()) 113 114 def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]: 115 """List caller's tokens (user + agent). Plaintext never returned.""" 116 resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit}) 117 _raise_for_status(resp) 118 return [TokenInfo.model_validate(t) for t in resp.json()["items"]] 119 120 def mint_token( 121 self, 122 label: str, 123 expires_in_days: int | None = None, 124 *, 125 competition_access: str | None = None, 126 competition_ids: list[str] | None = None, 127 permissions: dict[str, str] | None = None, 128 ) -> CreateFineGrainedTokenResponse: 129 """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE. 130 131 With no scope args the server mints the broad legacy profile 132 (every competition, full participate access) so existing 133 callers keep working unchanged. To scope it GitHub-style pass 134 ``competition_access`` (``all`` / ``selected`` / ``none``), 135 ``competition_ids`` (when ``selected``) and ``permissions`` 136 (category → ``none`` / ``read`` / ``write``). 137 138 ``expires_in_days`` defaults server-side when ``None``. 139 """ 140 body: dict[str, object] = {"label": label} 141 if expires_in_days is not None: 142 body["expires_in_days"] = expires_in_days 143 if competition_access is not None: 144 body["competition_access"] = competition_access 145 if competition_ids is not None: 146 body["competition_ids"] = competition_ids 147 if permissions is not None: 148 body["permissions"] = permissions 149 resp = self._http.request("POST", "/auth/tokens", json=body) 150 _raise_for_status(resp) 151 return CreateFineGrainedTokenResponse.model_validate(resp.json()) 152 153 def revoke_token(self, token_id: str) -> None: 154 """Revoke one token by id.""" 155 resp = self._http.request("DELETE", f"/auth/tokens/{token_id}") 156 _raise_for_status(resp) 157 158 def revoke_other_tokens(self) -> None: 159 """Revoke every token belonging to this user except the caller's own.""" 160 resp = self._http.request("DELETE", "/auth/tokens/others") 161 _raise_for_status(resp) 162 163 def providers(self) -> ProvidersResponse: 164 """Which sign-in methods this deployment offers (OAuth providers 165 configured + whether password auth is enabled).""" 166 resp = self._http.request("GET", "/auth/providers") 167 _raise_for_status(resp) 168 return ProvidersResponse.model_validate(resp.json()) 169 170 def token_scopes(self) -> TokenScopesResponse: 171 """The catalog of grantable fine-grained token scopes — drives 172 the token-create wizard.""" 173 resp = self._http.request("GET", "/auth/tokens/scopes") 174 _raise_for_status(resp) 175 return TokenScopesResponse.model_validate(resp.json())
21class AuthResource: 22 """Register / login, identity binding, token CRUD, sign-in discovery.""" 23 24 def __init__(self, http: BaseHttpClient) -> None: 25 self._http = http 26 27 def register( 28 self, 29 email: str, 30 password: str, 31 display_name: str = "", 32 ) -> AuthTokenResponse: 33 """Register via email + password. Returns a fresh user-kind token. 34 35 First-user bootstrap: when the backend has zero users, the new 36 user is granted ``role="super_admin"``. 37 """ 38 resp = self._http.request( 39 "POST", 40 "/auth/register", 41 json={"email": email, "password": password, "display_name": display_name}, 42 ) 43 _raise_for_status(resp) 44 return AuthTokenResponse.model_validate(resp.json()) 45 46 def login(self, email: str, password: str) -> AuthTokenResponse: 47 """Sign in with email + password. Returns a fresh user-kind token.""" 48 resp = self._http.request( 49 "POST", "/auth/login", json={"email": email, "password": password} 50 ) 51 _raise_for_status(resp) 52 return AuthTokenResponse.model_validate(resp.json()) 53 54 def device_code(self) -> DeviceCodeResponse: 55 """Start a device-authorization flow (unauthenticated, RFC 8628). 56 57 Returns a short ``user_code`` + a ``verification_uri`` the user 58 opens in a browser, plus the opaque ``device_code`` to poll with. 59 """ 60 resp = self._http.request("POST", "/auth/device/code") 61 _raise_for_status(resp) 62 return DeviceCodeResponse.model_validate(resp.json()) 63 64 def device_token(self, device_code: str) -> DeviceTokenResponse: 65 """Poll for the token after the user approves (unauthenticated). 66 67 ``status`` is ``pending`` / ``approved`` / ``denied`` / 68 ``expired``; ``token`` is set only when ``approved``. 69 """ 70 resp = self._http.request("POST", "/auth/device/token", json={"device_code": device_code}) 71 _raise_for_status(resp) 72 return DeviceTokenResponse.model_validate(resp.json()) 73 74 def set_password(self, new_password: str, current_password: str = "") -> None: 75 """Set or change the caller's password. 76 77 ``current_password`` is required iff the caller already has a 78 password identity; absent on the first-time "attach password to 79 OAuth-only account" flow. 80 """ 81 resp = self._http.request( 82 "POST", 83 "/auth/password", 84 json={"new_password": new_password, "current_password": current_password}, 85 ) 86 _raise_for_status(resp) 87 88 def delete_password(self) -> None: 89 """Drop the caller's password identity. Requires ≥1 OAuth identity.""" 90 resp = self._http.request("DELETE", "/auth/password") 91 _raise_for_status(resp) 92 93 def list_identities(self) -> list[LinkedIdentity]: 94 """Bound OAuth + password identities for the calling user.""" 95 resp = self._http.request("GET", "/auth/identities") 96 _raise_for_status(resp) 97 return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]] 98 99 def unbind_identity(self, identity_id: str) -> None: 100 """Unbind one OAuth identity. Server blocks unlinking the last one.""" 101 resp = self._http.request("DELETE", f"/auth/identities/{identity_id}") 102 _raise_for_status(resp) 103 104 def start_oauth_link(self, provider: str) -> LinkStartResponse: 105 """Begin a "link a second OAuth identity to my account" flow. 106 107 Returns the platform's signed-state authorize URL — the actual 108 flow finishes in a browser (the user opens the URL and approves 109 linking the provider to their existing account). 110 """ 111 resp = self._http.request("POST", f"/auth/identities/link/{provider}") 112 _raise_for_status(resp) 113 return LinkStartResponse.model_validate(resp.json()) 114 115 def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]: 116 """List caller's tokens (user + agent). Plaintext never returned.""" 117 resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit}) 118 _raise_for_status(resp) 119 return [TokenInfo.model_validate(t) for t in resp.json()["items"]] 120 121 def mint_token( 122 self, 123 label: str, 124 expires_in_days: int | None = None, 125 *, 126 competition_access: str | None = None, 127 competition_ids: list[str] | None = None, 128 permissions: dict[str, str] | None = None, 129 ) -> CreateFineGrainedTokenResponse: 130 """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE. 131 132 With no scope args the server mints the broad legacy profile 133 (every competition, full participate access) so existing 134 callers keep working unchanged. To scope it GitHub-style pass 135 ``competition_access`` (``all`` / ``selected`` / ``none``), 136 ``competition_ids`` (when ``selected``) and ``permissions`` 137 (category → ``none`` / ``read`` / ``write``). 138 139 ``expires_in_days`` defaults server-side when ``None``. 140 """ 141 body: dict[str, object] = {"label": label} 142 if expires_in_days is not None: 143 body["expires_in_days"] = expires_in_days 144 if competition_access is not None: 145 body["competition_access"] = competition_access 146 if competition_ids is not None: 147 body["competition_ids"] = competition_ids 148 if permissions is not None: 149 body["permissions"] = permissions 150 resp = self._http.request("POST", "/auth/tokens", json=body) 151 _raise_for_status(resp) 152 return CreateFineGrainedTokenResponse.model_validate(resp.json()) 153 154 def revoke_token(self, token_id: str) -> None: 155 """Revoke one token by id.""" 156 resp = self._http.request("DELETE", f"/auth/tokens/{token_id}") 157 _raise_for_status(resp) 158 159 def revoke_other_tokens(self) -> None: 160 """Revoke every token belonging to this user except the caller's own.""" 161 resp = self._http.request("DELETE", "/auth/tokens/others") 162 _raise_for_status(resp) 163 164 def providers(self) -> ProvidersResponse: 165 """Which sign-in methods this deployment offers (OAuth providers 166 configured + whether password auth is enabled).""" 167 resp = self._http.request("GET", "/auth/providers") 168 _raise_for_status(resp) 169 return ProvidersResponse.model_validate(resp.json()) 170 171 def token_scopes(self) -> TokenScopesResponse: 172 """The catalog of grantable fine-grained token scopes — drives 173 the token-create wizard.""" 174 resp = self._http.request("GET", "/auth/tokens/scopes") 175 _raise_for_status(resp) 176 return TokenScopesResponse.model_validate(resp.json())
Register / login, identity binding, token CRUD, sign-in discovery.
27 def register( 28 self, 29 email: str, 30 password: str, 31 display_name: str = "", 32 ) -> AuthTokenResponse: 33 """Register via email + password. Returns a fresh user-kind token. 34 35 First-user bootstrap: when the backend has zero users, the new 36 user is granted ``role="super_admin"``. 37 """ 38 resp = self._http.request( 39 "POST", 40 "/auth/register", 41 json={"email": email, "password": password, "display_name": display_name}, 42 ) 43 _raise_for_status(resp) 44 return AuthTokenResponse.model_validate(resp.json())
Register via email + password. Returns a fresh user-kind token.
First-user bootstrap: when the backend has zero users, the new
user is granted role="super_admin".
46 def login(self, email: str, password: str) -> AuthTokenResponse: 47 """Sign in with email + password. Returns a fresh user-kind token.""" 48 resp = self._http.request( 49 "POST", "/auth/login", json={"email": email, "password": password} 50 ) 51 _raise_for_status(resp) 52 return AuthTokenResponse.model_validate(resp.json())
Sign in with email + password. Returns a fresh user-kind token.
54 def device_code(self) -> DeviceCodeResponse: 55 """Start a device-authorization flow (unauthenticated, RFC 8628). 56 57 Returns a short ``user_code`` + a ``verification_uri`` the user 58 opens in a browser, plus the opaque ``device_code`` to poll with. 59 """ 60 resp = self._http.request("POST", "/auth/device/code") 61 _raise_for_status(resp) 62 return DeviceCodeResponse.model_validate(resp.json())
Start a device-authorization flow (unauthenticated, RFC 8628).
Returns a short user_code + a verification_uri the user
opens in a browser, plus the opaque device_code to poll with.
64 def device_token(self, device_code: str) -> DeviceTokenResponse: 65 """Poll for the token after the user approves (unauthenticated). 66 67 ``status`` is ``pending`` / ``approved`` / ``denied`` / 68 ``expired``; ``token`` is set only when ``approved``. 69 """ 70 resp = self._http.request("POST", "/auth/device/token", json={"device_code": device_code}) 71 _raise_for_status(resp) 72 return DeviceTokenResponse.model_validate(resp.json())
Poll for the token after the user approves (unauthenticated).
status is pending / approved / denied /
expired; token is set only when approved.
74 def set_password(self, new_password: str, current_password: str = "") -> None: 75 """Set or change the caller's password. 76 77 ``current_password`` is required iff the caller already has a 78 password identity; absent on the first-time "attach password to 79 OAuth-only account" flow. 80 """ 81 resp = self._http.request( 82 "POST", 83 "/auth/password", 84 json={"new_password": new_password, "current_password": current_password}, 85 ) 86 _raise_for_status(resp)
Set or change the caller's password.
current_password is required iff the caller already has a
password identity; absent on the first-time "attach password to
OAuth-only account" flow.
88 def delete_password(self) -> None: 89 """Drop the caller's password identity. Requires ≥1 OAuth identity.""" 90 resp = self._http.request("DELETE", "/auth/password") 91 _raise_for_status(resp)
Drop the caller's password identity. Requires ≥1 OAuth identity.
93 def list_identities(self) -> list[LinkedIdentity]: 94 """Bound OAuth + password identities for the calling user.""" 95 resp = self._http.request("GET", "/auth/identities") 96 _raise_for_status(resp) 97 return [LinkedIdentity.model_validate(i) for i in resp.json()["items"]]
Bound OAuth + password identities for the calling user.
99 def unbind_identity(self, identity_id: str) -> None: 100 """Unbind one OAuth identity. Server blocks unlinking the last one.""" 101 resp = self._http.request("DELETE", f"/auth/identities/{identity_id}") 102 _raise_for_status(resp)
Unbind one OAuth identity. Server blocks unlinking the last one.
104 def start_oauth_link(self, provider: str) -> LinkStartResponse: 105 """Begin a "link a second OAuth identity to my account" flow. 106 107 Returns the platform's signed-state authorize URL — the actual 108 flow finishes in a browser (the user opens the URL and approves 109 linking the provider to their existing account). 110 """ 111 resp = self._http.request("POST", f"/auth/identities/link/{provider}") 112 _raise_for_status(resp) 113 return LinkStartResponse.model_validate(resp.json())
Begin a "link a second OAuth identity to my account" flow.
Returns the platform's signed-state authorize URL — the actual flow finishes in a browser (the user opens the URL and approves linking the provider to their existing account).
115 def list_tokens(self, offset: int = 0, limit: int = 50) -> list[TokenInfo]: 116 """List caller's tokens (user + agent). Plaintext never returned.""" 117 resp = self._http.request("GET", "/auth/tokens", params={"offset": offset, "limit": limit}) 118 _raise_for_status(resp) 119 return [TokenInfo.model_validate(t) for t in resp.json()["items"]]
List caller's tokens (user + agent). Plaintext never returned.
121 def mint_token( 122 self, 123 label: str, 124 expires_in_days: int | None = None, 125 *, 126 competition_access: str | None = None, 127 competition_ids: list[str] | None = None, 128 permissions: dict[str, str] | None = None, 129 ) -> CreateFineGrainedTokenResponse: 130 """Mint a fresh fine-grained token (``pf_*``). Plaintext ONCE. 131 132 With no scope args the server mints the broad legacy profile 133 (every competition, full participate access) so existing 134 callers keep working unchanged. To scope it GitHub-style pass 135 ``competition_access`` (``all`` / ``selected`` / ``none``), 136 ``competition_ids`` (when ``selected``) and ``permissions`` 137 (category → ``none`` / ``read`` / ``write``). 138 139 ``expires_in_days`` defaults server-side when ``None``. 140 """ 141 body: dict[str, object] = {"label": label} 142 if expires_in_days is not None: 143 body["expires_in_days"] = expires_in_days 144 if competition_access is not None: 145 body["competition_access"] = competition_access 146 if competition_ids is not None: 147 body["competition_ids"] = competition_ids 148 if permissions is not None: 149 body["permissions"] = permissions 150 resp = self._http.request("POST", "/auth/tokens", json=body) 151 _raise_for_status(resp) 152 return CreateFineGrainedTokenResponse.model_validate(resp.json())
Mint a fresh fine-grained token (pf_*). Plaintext ONCE.
With no scope args the server mints the broad legacy profile
(every competition, full participate access) so existing
callers keep working unchanged. To scope it GitHub-style pass
competition_access (all / selected / none),
competition_ids (when selected) and permissions
(category → none / read / write).
expires_in_days defaults server-side when None.
154 def revoke_token(self, token_id: str) -> None: 155 """Revoke one token by id.""" 156 resp = self._http.request("DELETE", f"/auth/tokens/{token_id}") 157 _raise_for_status(resp)
Revoke one token by id.
159 def revoke_other_tokens(self) -> None: 160 """Revoke every token belonging to this user except the caller's own.""" 161 resp = self._http.request("DELETE", "/auth/tokens/others") 162 _raise_for_status(resp)
Revoke every token belonging to this user except the caller's own.
164 def providers(self) -> ProvidersResponse: 165 """Which sign-in methods this deployment offers (OAuth providers 166 configured + whether password auth is enabled).""" 167 resp = self._http.request("GET", "/auth/providers") 168 _raise_for_status(resp) 169 return ProvidersResponse.model_validate(resp.json())
Which sign-in methods this deployment offers (OAuth providers configured + whether password auth is enabled).
171 def token_scopes(self) -> TokenScopesResponse: 172 """The catalog of grantable fine-grained token scopes — drives 173 the token-create wizard.""" 174 resp = self._http.request("GET", "/auth/tokens/scopes") 175 _raise_for_status(resp) 176 return TokenScopesResponse.model_validate(resp.json())
The catalog of grantable fine-grained token scopes — drives the token-create wizard.