akiflow.auth

Low-level authentication helpers.

Most users should use Akiflow(email=...) instead of calling these directly. These are exposed for advanced use cases like custom token storage.

  1"""Low-level authentication helpers.
  2
  3Most users should use `Akiflow(email=...)` instead of calling these directly.
  4These are exposed for advanced use cases like custom token storage.
  5"""
  6
  7from __future__ import annotations
  8
  9import uuid
 10from urllib.parse import unquote
 11
 12import httpx
 13
 14from .exceptions import AuthError
 15
 16WEB_BASE = "https://web.akiflow.com"
 17DEFAULT_HEADERS = {
 18    "Akiflow-Platform": "web",
 19    "Akiflow-Version": "2.69.3",
 20}
 21
 22
 23def _extract_xsrf(cookies: httpx.Cookies) -> str:
 24    token = cookies.get("XSRF-TOKEN")
 25    if not token:
 26        raise AuthError("No XSRF-TOKEN cookie received")
 27    return unquote(token)
 28
 29
 30def interactive_login(email: str, client_id: str | None = None, verify_ssl: bool = True) -> dict:
 31    """Run the full interactive auth flow: email -> OTP -> tokens.
 32
 33    Sends a one-time code to `email`, prompts via `input()`, and exchanges
 34    the verified session for OAuth tokens.
 35
 36    Args:
 37        email: Akiflow account email address.
 38        client_id: Optional client UUID. Auto-generated if omitted.
 39        verify_ssl: Set to False to skip SSL verification (for proxies).
 40
 41    Returns:
 42        Dict with `access_token`, `refresh_token`, `expires_in`, `client_id`.
 43
 44    Raises:
 45        AuthError: If login or OTP verification fails.
 46
 47    Example:
 48        ```python
 49        from akiflow.auth import interactive_login
 50
 51        tokens = interactive_login("you@example.com")
 52        print(tokens["access_token"])
 53        ```
 54    """
 55    cid = client_id or str(uuid.uuid4())
 56
 57    with httpx.Client(follow_redirects=True, verify=verify_ssl) as http:
 58        # Step 1: get CSRF cookie
 59        resp = http.get(f"{WEB_BASE}/csrf-cookie")
 60        resp.raise_for_status()
 61        xsrf = _extract_xsrf(http.cookies)
 62
 63        # Step 2: request OTP email
 64        resp = http.post(
 65            f"{WEB_BASE}/auth/login",
 66            json={"email": email, "remember": True},
 67            headers={**DEFAULT_HEADERS, "X-XSRF-TOKEN": xsrf},
 68        )
 69        resp.raise_for_status()
 70        body = resp.json()
 71        if not body.get("success"):
 72            raise AuthError(f"Login request failed: {body.get('message')}")
 73
 74        redirect_url = body["redirect"]
 75
 76        # Step 3: prompt user for OTP code
 77        code = input(f"Enter the 6-digit code sent to {email}: ").strip()
 78        if not code:
 79            raise AuthError("No code provided")
 80
 81        # Refresh XSRF after login call (cookie may have rotated)
 82        xsrf = _extract_xsrf(http.cookies)
 83
 84        # Step 4: verify OTP
 85        resp = http.post(
 86            redirect_url,
 87            json={"email": email, "code": code},
 88            headers={**DEFAULT_HEADERS, "X-XSRF-TOKEN": xsrf},
 89        )
 90        resp.raise_for_status()
 91        body = resp.json()
 92        if not body.get("success"):
 93            raise AuthError(f"OTP verification failed: {body.get('message')}")
 94
 95        # Refresh XSRF after OTP verification
 96        xsrf = _extract_xsrf(http.cookies)
 97
 98        # Step 5: get user info + tokens
 99        resp = http.get(
100            f"{WEB_BASE}/user/me",
101            headers={**DEFAULT_HEADERS, "X-XSRF-TOKEN": xsrf},
102        )
103        resp.raise_for_status()
104        user_data = resp.json()["data"]
105
106        return {
107            "access_token": user_data["access_token"],
108            "refresh_token": user_data["refresh_token"],
109            "expires_in": user_data["expires_in"],
110            "client_id": cid,
111        }
112
113
def refresh_access_token(
    refresh_token: str,
    client_id: str | None = None,
    oauth_client_id: str = "10",
    verify_ssl: bool = True,
) -> dict:
    """Exchange a refresh token for a new access + refresh token pair.

    The refresh token is rotated on each call (the old one is invalidated).

    Args:
        refresh_token: Current refresh token (`def50200...`).
        client_id: Optional client UUID. Auto-generated if omitted.
        oauth_client_id: OAuth client ID (`"10"` for web).
        verify_ssl: Set to False to skip SSL verification.

    Returns:
        Dict with `access_token`, `refresh_token`, `expires_in`, `client_id`.

    Raises:
        AuthError: If the CSRF cookie is missing or the refresh response
            lacks any of the token fields.
        httpx.HTTPStatusError: If any request returns an error status.

    Example:
        ```python
        from akiflow.auth import refresh_access_token

        new_tokens = refresh_access_token("def50200...")
        print(new_tokens["access_token"])
        ```
    """
    cid = client_id or str(uuid.uuid4())
    token_fields = ("access_token", "refresh_token", "expires_in")

    with httpx.Client(follow_redirects=True, verify=verify_ssl) as http:
        # Need CSRF cookie first
        resp = http.get(f"{WEB_BASE}/csrf-cookie")
        resp.raise_for_status()
        xsrf = _extract_xsrf(http.cookies)

        resp = http.post(
            f"{WEB_BASE}/oauth/refreshToken",
            json={"client_id": oauth_client_id, "refresh_token": refresh_token},
            headers={
                **DEFAULT_HEADERS,
                "X-XSRF-TOKEN": xsrf,
                "Akiflow-Client-Id": cid,
            },
        )
        resp.raise_for_status()
        body = resp.json()

        # Report an incomplete payload as AuthError instead of letting a
        # bare KeyError/TypeError escape from the lookups below.
        if not isinstance(body, dict):
            raise AuthError("Token refresh returned an unexpected response")
        missing = [field for field in token_fields if field not in body]
        if missing:
            raise AuthError(f"Token refresh response is missing: {', '.join(missing)}")

        return {
            "access_token": body["access_token"],
            "refresh_token": body["refresh_token"],
            "expires_in": body["expires_in"],
            "client_id": cid,
        }
WEB_BASE = 'https://web.akiflow.com'
DEFAULT_HEADERS = {'Akiflow-Platform': 'web', 'Akiflow-Version': '2.69.3'}
def interactive_login( email: str, client_id: str | None = None, verify_ssl: bool = True) -> dict:
 31def interactive_login(email: str, client_id: str | None = None, verify_ssl: bool = True) -> dict:
 32    """Run the full interactive auth flow: email -> OTP -> tokens.
 33
 34    Sends a one-time code to `email`, prompts via `input()`, and exchanges
 35    the verified session for OAuth tokens.
 36
 37    Args:
 38        email: Akiflow account email address.
 39        client_id: Optional client UUID. Auto-generated if omitted.
 40        verify_ssl: Set to False to skip SSL verification (for proxies).
 41
 42    Returns:
 43        Dict with `access_token`, `refresh_token`, `expires_in`, `client_id`.
 44
 45    Raises:
 46        AuthError: If login or OTP verification fails.
 47
 48    Example:
 49        ```python
 50        from akiflow.auth import interactive_login
 51
 52        tokens = interactive_login("you@example.com")
 53        print(tokens["access_token"])
 54        ```
 55    """
 56    cid = client_id or str(uuid.uuid4())
 57
 58    with httpx.Client(follow_redirects=True, verify=verify_ssl) as http:
 59        # Step 1: get CSRF cookie
 60        resp = http.get(f"{WEB_BASE}/csrf-cookie")
 61        resp.raise_for_status()
 62        xsrf = _extract_xsrf(http.cookies)
 63
 64        # Step 2: request OTP email
 65        resp = http.post(
 66            f"{WEB_BASE}/auth/login",
 67            json={"email": email, "remember": True},
 68            headers={**DEFAULT_HEADERS, "X-XSRF-TOKEN": xsrf},
 69        )
 70        resp.raise_for_status()
 71        body = resp.json()
 72        if not body.get("success"):
 73            raise AuthError(f"Login request failed: {body.get('message')}")
 74
 75        redirect_url = body["redirect"]
 76
 77        # Step 3: prompt user for OTP code
 78        code = input(f"Enter the 6-digit code sent to {email}: ").strip()
 79        if not code:
 80            raise AuthError("No code provided")
 81
 82        # Refresh XSRF after login call (cookie may have rotated)
 83        xsrf = _extract_xsrf(http.cookies)
 84
 85        # Step 4: verify OTP
 86        resp = http.post(
 87            redirect_url,
 88            json={"email": email, "code": code},
 89            headers={**DEFAULT_HEADERS, "X-XSRF-TOKEN": xsrf},
 90        )
 91        resp.raise_for_status()
 92        body = resp.json()
 93        if not body.get("success"):
 94            raise AuthError(f"OTP verification failed: {body.get('message')}")
 95
 96        # Refresh XSRF after OTP verification
 97        xsrf = _extract_xsrf(http.cookies)
 98
 99        # Step 5: get user info + tokens
100        resp = http.get(
101            f"{WEB_BASE}/user/me",
102            headers={**DEFAULT_HEADERS, "X-XSRF-TOKEN": xsrf},
103        )
104        resp.raise_for_status()
105        user_data = resp.json()["data"]
106
107        return {
108            "access_token": user_data["access_token"],
109            "refresh_token": user_data["refresh_token"],
110            "expires_in": user_data["expires_in"],
111            "client_id": cid,
112        }

Run the full interactive auth flow: email -> OTP -> tokens.

Sends a one-time code to email, prompts via input(), and exchanges the verified session for OAuth tokens.

Args:

- email: Akiflow account email address.
- client_id: Optional client UUID. Auto-generated if omitted.
- verify_ssl: Set to False to skip SSL verification (for proxies).

Returns: Dict with access_token, refresh_token, expires_in, client_id.

Raises: AuthError: If login or OTP verification fails.

Example:

from akiflow.auth import interactive_login

tokens = interactive_login("you@example.com")
print(tokens["access_token"])
def refresh_access_token( refresh_token: str, client_id: str | None = None, oauth_client_id: str = '10', verify_ssl: bool = True) -> dict:
def refresh_access_token(
    refresh_token: str,
    client_id: str | None = None,
    oauth_client_id: str = "10",
    verify_ssl: bool = True,
) -> dict:
    """Exchange a refresh token for a new access + refresh token pair.

    The refresh token is rotated on each call (the old one is invalidated).

    Args:
        refresh_token: Current refresh token (`def50200...`).
        client_id: Optional client UUID. Auto-generated if omitted.
        oauth_client_id: OAuth client ID (`"10"` for web).
        verify_ssl: Set to False to skip SSL verification.

    Returns:
        Dict with `access_token`, `refresh_token`, `expires_in`, `client_id`.

    Raises:
        AuthError: If the CSRF cookie is missing or the refresh response
            lacks any of the token fields.
        httpx.HTTPStatusError: If any request returns an error status.

    Example:
        ```python
        from akiflow.auth import refresh_access_token

        new_tokens = refresh_access_token("def50200...")
        print(new_tokens["access_token"])
        ```
    """
    cid = client_id or str(uuid.uuid4())
    token_fields = ("access_token", "refresh_token", "expires_in")

    with httpx.Client(follow_redirects=True, verify=verify_ssl) as http:
        # Need CSRF cookie first
        resp = http.get(f"{WEB_BASE}/csrf-cookie")
        resp.raise_for_status()
        xsrf = _extract_xsrf(http.cookies)

        resp = http.post(
            f"{WEB_BASE}/oauth/refreshToken",
            json={"client_id": oauth_client_id, "refresh_token": refresh_token},
            headers={
                **DEFAULT_HEADERS,
                "X-XSRF-TOKEN": xsrf,
                "Akiflow-Client-Id": cid,
            },
        )
        resp.raise_for_status()
        body = resp.json()

        # Report an incomplete payload as AuthError instead of letting a
        # bare KeyError/TypeError escape from the lookups below.
        if not isinstance(body, dict):
            raise AuthError("Token refresh returned an unexpected response")
        missing = [field for field in token_fields if field not in body]
        if missing:
            raise AuthError(f"Token refresh response is missing: {', '.join(missing)}")

        return {
            "access_token": body["access_token"],
            "refresh_token": body["refresh_token"],
            "expires_in": body["expires_in"],
            "client_id": cid,
        }

Exchange a refresh token for a new access + refresh token pair.

The refresh token is rotated on each call (the old one is invalidated).

Args:

- refresh_token: Current refresh token (def50200...).
- client_id: Optional client UUID. Auto-generated if omitted.
- oauth_client_id: OAuth client ID ("10" for web).
- verify_ssl: Set to False to skip SSL verification.

Returns: Dict with access_token, refresh_token, expires_in, client_id.

Example:

from akiflow.auth import refresh_access_token

new_tokens = refresh_access_token("def50200...")
print(new_tokens["access_token"])