Metadata-Version: 2.4
Name: authpi-idp
Version: 0.4.0
Summary: Official Python SDK for AuthPI identity provider
Project-URL: Homepage, https://authpi.com
Project-URL: Documentation, https://docs.authpi.com/sdk/python
Project-URL: Repository, https://github.com/arbfay/authpi
Author-email: AuthPI <hello@authpi.com>
License: MIT
Keywords: authentication,authpi,idp,oauth,oidc
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Security
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: httpx>=0.27.0
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: mypy>=1.13.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.24.0; extra == 'dev'
Requires-Dist: pytest-httpx>=0.34.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.8.0; extra == 'dev'
Description-Content-Type: text/markdown

# authpi-idp

Official Python SDK for AuthPI identity provider.

**Requires Python 3.11+**

## Installation

```bash
pip install authpi-idp
# or
poetry add authpi-idp
# or
uv add authpi-idp
```

## Quick Start

```python
from authpi_idp import IdpClient

async def main():
    idp = IdpClient(
        issuer_url="https://idp.authpi.com/iss_xxx",
        client_id="cli_xxx",
        client_secret="secret",  # omit for public clients (SPAs)
        redirect_uri="https://app.example.com/callback",
    )

    # 1. Create authorization URL (sync - no await needed)
    auth = idp.create_authorization_url(
        scopes=["openid", "profile", "email"],
        # org="org_xxx",  # optional selected-org token restriction
    )

    # 2. Store code_verifier, state, and nonce in session, redirect user to auth.url
    session["oauth"] = {"code_verifier": auth.code_verifier, "state": auth.state, "nonce": auth.nonce}
    # redirect(auth.url)

    # 3. Handle callback - exchange code for authenticated agent
    agent = await idp.exchange_code(code, auth.code_verifier)

    # 4. Store tokens for future requests
    session["tokens"] = agent.tokens.model_dump()

    # 5. Check authorization
    if agent.has_access_in("org_xxx", "write", "projects"):
        # User can write to projects in org_xxx
        pass
```

## Session Management

The SDK automatically refreshes expired tokens when creating an agent from stored tokens:

```python
from authpi_idp import IdpClient

idp = IdpClient(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="https://app.example.com/callback",
)

# Load tokens from your session store (dict or TokenSet)
tokens = session.get("tokens")

# create_agent() automatically refreshes if tokens are expired
agent = await idp.create_agent(
    tokens,
    # Called when tokens are refreshed - persist the new tokens (receives a dict)
    on_refresh=lambda new_tokens: session.update({"tokens": new_tokens}),
    # Called when refresh fails - handle the error
    on_refresh_error=lambda error: print(f"Session expired: {error}"),
)
```

### Configuring Auto-Refresh

```python
idp = IdpClient(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="https://app.example.com/callback",
    auto_refresh=True,        # Default: True
    refresh_buffer_seconds=60, # Refresh 60s before expiry (default)
)

# Or disable per-call
agent = await idp.create_agent(tokens, auto_refresh=False)
```

### Token Expiration

```python
# Check expiration
agent.expires_at              # Unix timestamp
agent.expires_in              # Seconds until expiry (negative if expired)
agent.is_expired()            # True if expires within 30 seconds (default clock skew buffer)
agent.is_expired(60)          # True if expires within 60 seconds
agent.is_expired(0)           # True only if actually expired (no buffer)
```

## Machine-to-Machine Authentication

For server-to-server or background service authentication using the client credentials flow:

```python
from authpi_idp import IdpClient, PrincipalType

idp = IdpClient(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="agt_machine1",
    client_secret="secret",
    # No redirect_uri needed for client_credentials
)

agent = await idp.client_credentials(scopes=["users:read", "users:write"])

# Agent uses token-level scopes (no organizations)
agent.has_access("read", "users")  # True
agent.type  # PrincipalType.AGENT
```

## Sync Support

For non-async contexts, use the sync client:

```python
from authpi_idp import IdpClientSync

idp = IdpClientSync(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="https://app.example.com/callback",
)

auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
agent = idp.exchange_code(code, auth.code_verifier)
```

## Authorization

The SDK includes an **optional** authorization framework based on scopes. You can use it to make local authorization decisions without additional API calls, or you can ignore it entirely and implement your own authorization logic.

The authorization data comes from the `organizations` claim in the ID token, which AuthPI populates based on your organization memberships, optional selected-org requests, and client organization restrictions.
When you request a selected-org token with `org`, `agent.org_id` identifies the selected organization.

### Using the Built-in Authorization

```python
# Check access across all organizations
agent.has_access("read", "users")

# Check access in a specific organization
agent.has_access_in("org_xxx", "write", "projects")
agent.has_access_in("org_xxx", "delete", "projects.tasks")

# Role checks
agent.is_owner_of("org_xxx")   # Has "owner" scope
agent.is_admin_of("org_xxx")   # Has "admin" scope
agent.is_member_of("org_xxx")  # Has any membership

# Get scopes for an organization
scopes = agent.get_scopes_for("org_xxx")
# ["users:read", "projects:**"]
```

### Rolling Your Own Authorization

If the built-in scope system doesn't fit your needs, you can access the raw data directly:

```python
# Access organizations directly
for org in agent.organizations:
    print(org.id)       # "org_xxx"
    print(org.title)    # "Admin" or None
    print(org.scopes)   # ["users:read", "projects:**"]
    print(org.joined_at) # Unix timestamp

# Use agent.id for your own authorization lookups
permissions = my_permission_service.get_permissions(agent.id)
```

### How Scopes Work

AuthPI uses a hierarchical scope format: `resource:action`

**Basic format:**
```
resource:action
resource.subresource:action
```

**Examples:**
- `users:read` — Can read users
- `users:write` — Can create/update users
- `projects.tasks:delete` — Can delete tasks within projects

**Wildcards:**

| Pattern | Description |
|---------|-------------|
| `users:*` | All actions on users (but not sub-resources) |
| `users:**` | All actions on users AND all sub-resources |
| `*:read` | Read access to all top-level resources |
| `*:**` | Full access to everything (super-admin) |

**The difference between `*` and `**`:**

- `projects:*` grants `projects:read`, `projects:write`, `projects:delete`
- `projects:*` does NOT grant `projects.tasks:read` (sub-resource)
- `projects:**` grants all of the above PLUS `projects.tasks:read`, `projects.tasks.comments:write`, etc.

**Scope evaluation example:**

```python
# User has scopes: ["projects:**", "users:read"]

# These all return True:
agent.has_access_in("org_xxx", "read", "projects")
agent.has_access_in("org_xxx", "write", "projects")
agent.has_access_in("org_xxx", "delete", "projects.tasks")
agent.has_access_in("org_xxx", "read", "projects.tasks.comments")
agent.has_access_in("org_xxx", "read", "users")

# These return False:
agent.has_access_in("org_xxx", "write", "users")      # Only has users:read
agent.has_access_in("org_xxx", "read", "billing")     # No billing scope
```

### Special Role Scopes

Three scopes have special meaning and dedicated helper methods:

| Scope | Method | Typical Use |
|-------|--------|-------------|
| `owner` | `is_owner_of(org_id)` | Organization billing, deletion, transfer |
| `admin` | `is_admin_of(org_id)` | Member management, settings |
| `member` | `is_member_of(org_id)` | Basic membership check |

These are checked directly (not via wildcard expansion):

```python
# User has scopes: ["owner", "admin", "projects:**"]
agent.is_owner_of("org_xxx")  # True - has "owner" scope
agent.is_admin_of("org_xxx")  # True - has "admin" scope
agent.is_member_of("org_xxx") # True - has any membership

# Note: "*:**" does NOT grant owner/admin status
# These are explicit role assignments, not permissions
```

### Scope Utilities

For advanced use cases, you can use the scope utilities directly:

```python
from authpi_idp import has_access, parse_scope

# Check if a list of scopes grants access
scopes = ["users:read", "projects:**"]
has_access(scopes, "read", "users")           # True
has_access(scopes, "write", "projects.tasks") # True (** is recursive)
has_access(scopes, "delete", "users")         # False

# Parse a scope string into its components
parsed = parse_scope("users.verifiers:write")
# ParsedScope(resource='users.verifiers', action='write')

parsed = parse_scope("projects:**")
# ParsedScope(resource='projects', action='**')
```

## Error Handling

The SDK provides specific error types for different failure modes:

```python
from authpi_idp import (
    OAuthError,
    TokenExpiredError,
    RefreshError,
    TokenParseError,
    SubjectMismatchError,
    ConfigurationError,
    InsufficientScopeError,
    SessionExpiredError,
    UserBlockedError,
    AccountLinkingRequiredError,
    InteractionRequiredError,
    LoginRequiredError,
    ConsentRequiredError,
)

try:
    agent = await idp.create_agent(tokens)
except TokenExpiredError:
    # Token expired and no refresh token available
    redirect_to_login()
except RefreshError as error:
    # Refresh request failed (e.g., refresh token revoked)
    print(f"Refresh failed: {error.error_description}")
    print(f"Status: {error.status_code}")
    redirect_to_login()
except TokenParseError:
    # ID token missing or malformed
    print("Invalid token data")
except SubjectMismatchError as error:
    # Security: refreshed token belongs to different user
    print(f"Expected {error.expected_sub}, got {error.actual_sub}")
```

### Error Hierarchy

All OAuth errors extend `OAuthError`:

```python
class OAuthError(Exception):
    error: str            # OAuth error code
    error_description: str | None

class TokenExpiredError(OAuthError): ...
class RefreshError(OAuthError):
    status_code: int | None      # HTTP status from token endpoint

class TokenParseError(OAuthError): ...
class SubjectMismatchError(OAuthError):
    expected_sub: str
    actual_sub: str

# AuthPI-specific errors
class InsufficientScopeError(OAuthError): ...  # Token lacks required scope
class SessionExpiredError(OAuthError): ...      # Server-side session timed out
class UserBlockedError(OAuthError): ...         # Blocked user attempted auth
class AccountLinkingRequiredError(OAuthError): ...  # OAuth identity needs linking

# OIDC authorization endpoint errors
class InteractionRequiredError(OAuthError): ...  # Silent auth failed
class LoginRequiredError(OAuthError): ...        # No active session
class ConsentRequiredError(OAuthError): ...      # User hasn't consented
```

## User Info

For full profile data beyond the ID token claims:

```python
userinfo = await idp.get_user_info(agent)

userinfo.sub           # "usr_xxx"
userinfo.email         # "user@example.com"
userinfo.name          # "John Doe"
userinfo.picture       # "https://..."
userinfo.org_id        # "org_xxx" when the token is restricted to one org
userinfo.organizations # [Organization(...), ...]
```

## Logout

```python
from authpi_idp import LogoutOptions

logout_url = idp.create_logout_url(
    LogoutOptions(
        id_token_hint=agent.tokens.id_token,
        post_logout_redirect_uri="https://app.example.com",
        state="logout_state",
    )
)

# redirect(logout_url)
```

## Token Revocation

```python
# Revoke refresh token (recommended on logout)
await idp.revoke_token(agent.tokens.refresh_token, "refresh_token")

# Revoke access token
await idp.revoke_token(agent.tokens.access_token, "access_token")
```

## API Reference

### IdpClient / IdpClientSync

```python
IdpClient(
    issuer_url: str,          # OIDC issuer URL
    client_id: str,           # OAuth client ID
    redirect_uri: str | None = None,  # Required for auth code flow, optional for client_credentials
    client_secret: str | None = None,  # Optional for public clients
    auto_refresh: bool = True,         # Auto-refresh expired tokens
    refresh_buffer_seconds: int = 60,  # Refresh buffer
)
```

**Methods:**

| Method | Description |
|--------|-------------|
| `create_authorization_url(scopes, state?, nonce?, org?)` | Create OAuth authorization URL with PKCE; accepts optional `org` for selected-org tokens |
| `exchange_code(code, code_verifier)` | Exchange authorization code for agent |
| `create_agent(tokens, *, on_refresh?, on_refresh_error?, auto_refresh?)` | Create agent from stored tokens (auto-refreshes) |
| `refresh(agent)` | Manually refresh tokens |
| `get_user_info(agent)` | Fetch full user profile |
| `create_logout_url(options?)` | Create OIDC logout URL |
| `client_credentials(scopes?)` | Authenticate via client credentials (M2M) |
| `revoke_token(token, hint?)` | Revoke a token |

### AuthenticatedAgent

```python
class AuthenticatedAgent:
    # Identity
    id: str                 # Subject ID (usr_xxx, tok_xxx, key_xxx, agt_xxx)
    type: PrincipalType     # "user" | "personal_token" | "api_key" | "agent"
    email: str | None
    email_verified: bool | None
    org_id: str | None       # Selected org for org-restricted tokens

    # Tokens (for storage)
    tokens: TokenSet

    # Organizations (from ID token)
    organizations: list[Organization]

    # Expiration
    @property
    def expires_at(self) -> int: ...          # Unix timestamp
    @property
    def expires_in(self) -> int: ...          # Seconds until expiry
    def is_expired(self, buffer: int = 30) -> bool: ...

    # Authorization
    def has_access(self, action: str, resource: str) -> bool: ...
    def has_access_in(self, org_id: str, action: str, resource: str) -> bool: ...
    def get_scopes_for(self, org_id: str) -> list[str]: ...
    def is_owner_of(self, org_id: str) -> bool: ...
    def is_admin_of(self, org_id: str) -> bool: ...
    def is_member_of(self, org_id: str) -> bool: ...
```

## Framework Examples

### FastAPI

```python
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import RedirectResponse
from authpi_idp import IdpClient, TokenExpiredError

app = FastAPI()
idp = IdpClient(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="http://localhost:8000/callback",
)


@app.get("/login")
async def login(request: Request):
    auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
    request.session["code_verifier"] = auth.code_verifier
    request.session["state"] = auth.state
    return RedirectResponse(auth.url)


@app.get("/callback")
async def callback(request: Request, code: str, state: str):
    if state != request.session.get("state"):
        return {"error": "Invalid state"}

    agent = await idp.exchange_code(code, request.session["code_verifier"])
    request.session["tokens"] = agent.tokens.model_dump()
    return RedirectResponse("/dashboard")


async def get_agent(request: Request):
    tokens_data = request.session.get("tokens")
    if not tokens_data:
        raise HTTPException(status_code=401)

    try:
        return await idp.create_agent(
            tokens_data,
            on_refresh=lambda t: request.session.update({"tokens": t}),
        )
    except TokenExpiredError:
        raise HTTPException(status_code=401)


@app.get("/dashboard")
async def dashboard(agent=Depends(get_agent)):
    if not agent.has_access_in("org_xxx", "read", "dashboard"):
        raise HTTPException(status_code=403)
    return {"user": agent.id, "email": agent.email}
```

### Django

```python
from django.shortcuts import redirect
from django.http import HttpResponse
from authpi_idp import IdpClientSync

idp = IdpClientSync(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="http://localhost:8000/callback",
)


def login(request):
    auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
    request.session["code_verifier"] = auth.code_verifier
    request.session["state"] = auth.state
    return redirect(auth.url)


def callback(request):
    code = request.GET.get("code")
    state = request.GET.get("state")

    if state != request.session.get("state"):
        return HttpResponse("Invalid state", status=400)

    agent = idp.exchange_code(code, request.session["code_verifier"])
    request.session["tokens"] = agent.tokens.model_dump()
    return redirect("/dashboard")


def dashboard(request):
    tokens_data = request.session.get("tokens")
    if not tokens_data:
        return redirect("/login")

    agent = idp.create_agent(tokens_data)

    if not agent.has_access_in("org_xxx", "read", "dashboard"):
        return HttpResponse("Forbidden", status=403)

    return HttpResponse(f"Welcome, {agent.email}")
```

### Flask

```python
from flask import Flask, redirect, session, request
from authpi_idp import IdpClientSync

app = Flask(__name__)
app.secret_key = "your-secret-key"

idp = IdpClientSync(
    issuer_url="https://idp.authpi.com/iss_xxx",
    client_id="cli_xxx",
    redirect_uri="http://localhost:5000/callback",
)


@app.route("/login")
def login():
    auth = idp.create_authorization_url(scopes=["openid", "profile", "email"])
    session["code_verifier"] = auth.code_verifier
    session["state"] = auth.state
    return redirect(auth.url)


@app.route("/callback")
def callback():
    code = request.args.get("code")
    state = request.args.get("state")

    if state != session.get("state"):
        return "Invalid state", 400

    agent = idp.exchange_code(code, session["code_verifier"])
    session["tokens"] = agent.tokens.model_dump()
    return redirect("/dashboard")


@app.route("/dashboard")
def dashboard():
    tokens_data = session.get("tokens")
    if not tokens_data:
        return redirect("/login")

    agent = idp.create_agent(tokens_data)
    return f"Welcome, {agent.email}"
```

## Context Manager Support

Both clients support context managers for proper resource cleanup:

```python
# Async
async with IdpClient(...) as idp:
    auth = idp.create_authorization_url(scopes=["openid"])

# Sync
with IdpClientSync(...) as idp:
    auth = idp.create_authorization_url(scopes=["openid"])
```

## License

MIT
