akiflow

Akiflow SDK

Unofficial Python SDK for the Akiflow task management software.

Quick Start

Install the SDK:

pip install akiflow
# or
uv add akiflow
from akiflow import Akiflow

# Interactive login (prompts for email + 2FA code)
client = Akiflow()

# Or reuse saved tokens (no interactive prompt)
client = Akiflow(access_token="eyJ...", refresh_token="def50200...")

# Create a task in your inbox
task = client.task.create("Buy groceries")

# Schedule a task
task = client.task.create(
    "Team standup",
    date="2026-03-27",
    datetime_="2026-03-27T09:00:00.000Z",
    duration=1800,
)

# Update, complete, or delete
client.task.update(task["id"], title="Team standup (moved)")
client.task.done(task["id"])
client.task.delete(task["id"])

Authentication

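Akiflow uses passwordless email + OTP authentication. On first use, pass your email (or pass nothing and the client will prompt for it) to trigger the interactive flow. The client prints access_token and refresh_token after success — save them for reuse. The current values are also available at any time as client.access_token and client.refresh_token.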

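The access token expires in 30 minutes, but the refresh token is long-lived (~13 months). The client auto-refreshes on 401, so passing just a refresh_token is enough for persistent scripts. Note that the refresh token rotates on each refresh: a long-running script should read client.refresh_token after it finishes and store the new value for its next run.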

Debugging

Pass debug=True to print every request and response:

client = Akiflow(email="you@example.com", debug=True)  # or omit email to be prompted

Pass verify_ssl=False to disable SSL verification (useful with Proxyman/Charles):

client = Akiflow(access_token="...", verify_ssl=False)
 1"""
 2# Akiflow SDK
 3
 4Unofficial Python SDK for the [Akiflow](https://akiflow.com) task management software.
 5
 6## Quick Start
 7
 8Install the SDK:
 9```bash
10pip install akiflow
11# or
12uv add akiflow
13```
14
15```python
16from akiflow import Akiflow
17
18# Interactive login (prompts for email + 2FA code)
19client = Akiflow()
20
21# Or reuse saved tokens (no interactive prompt)
22client = Akiflow(access_token="eyJ...", refresh_token="def50200...")
23
24# Create a task in your inbox
25task = client.task.create("Buy groceries")
26
27# Schedule a task
28task = client.task.create(
29    "Team standup",
30    date="2026-03-27",
31    datetime_="2026-03-27T09:00:00.000Z",
32    duration=1800,
33)
34
35# Update, complete, or delete
36client.task.update(task["id"], title="Team standup (moved)")
37client.task.done(task["id"])
38client.task.delete(task["id"])
39```
40
41## Authentication
42
43Akiflow uses **passwordless email + OTP** authentication. On first use,
44pass your `email` to trigger the interactive flow. The client prints
45`access_token` and `refresh_token` after success — save them for reuse.
46
47The **access token** expires in 30 minutes, but the **refresh token** is
48long-lived (~13 months). The client auto-refreshes on 401, so passing just
49a `refresh_token` is enough for persistent scripts.
50
51## Debugging
52
53Pass `debug=True` to print every request and response:
54
55```python
56client = Akiflow(email="you@example.com", debug=True)  # or omit email to be prompted
57
58```
59
60Pass `verify_ssl=False` to disable SSL verification (useful with Proxyman/Charles):
61
62```python
63client = Akiflow(access_token="...", verify_ssl=False)
64```
65"""
66
67from .client import Akiflow
68from .exceptions import AkiflowError, APIError, AuthError, TokenExpiredError
69from .label import Label
70from .task import Task
71
72__all__ = ["Akiflow", "Label", "Task", "AkiflowError", "APIError", "AuthError", "TokenExpiredError"]
class Akiflow:
 20class Akiflow:
 21    """Unofficial Akiflow API client.
 22
 23    Provides access to Akiflow resources through sub-clients:
 24
 25    - `client.task` — create, update, delete, and list tasks
 26    - `client.label` — create, update, delete, and list labels/projects
 27
 28    There are three ways to authenticate:
 29
 30    **Interactive login** (prompts for email + 2FA code):
 31    ```python
 32    client = Akiflow()
 33    ```
 34
 35    **Access token + refresh token** (no prompt, auto-refreshes):
 36    ```python
 37    client = Akiflow(access_token="eyJ...", refresh_token="def50200...")
 38    ```
 39
 40    **Refresh token only** (exchanges for access token on init):
 41    ```python
 42    client = Akiflow(refresh_token="def50200...")
 43    ```
 44
 45    Args:
 46        email: Akiflow account email. Triggers interactive OTP flow.
 47        access_token: JWT access token (expires in 30 min).
 48        refresh_token: Long-lived refresh token (~13 months). Enables auto-refresh.
 49        client_id: Client UUID sent with requests. Auto-generated if omitted.
 50        auto_refresh: Automatically refresh expired tokens on 401. Default `True`.
 51        debug: Print request/response details to stdout. Default `False`.
 52        verify_ssl: Verify SSL certificates. Set `False` for proxy tools. Default `True`.
 53    """
 54
 55    def __init__(
 56        self,
 57        *,
 58        email: str | None = None,
 59        access_token: str | None = None,
 60        refresh_token: str | None = None,
 61        client_id: str | None = None,
 62        auto_refresh: bool = True,
 63        debug: bool = False,
 64        verify_ssl: bool = True,
 65    ):
 66        self._client_id = client_id or str(uuid.uuid4())
 67        self._auto_refresh = auto_refresh
 68        self._debug = debug
 69        self._verify_ssl = verify_ssl
 70        self._access_token: str | None = access_token
 71        self._refresh_token: str | None = refresh_token
 72
 73        if not email and not access_token and not refresh_token:
 74            email = input("Akiflow account email: ").strip()
 75            if not email:
 76                raise ValueError("Email is required for interactive login")
 77
 78        if email and not self._access_token:
 79            tokens = interactive_login(email, client_id=self._client_id, verify_ssl=self._verify_ssl)
 80            self._access_token = tokens["access_token"]
 81            self._refresh_token = tokens["refresh_token"]
 82            self._client_id = tokens["client_id"]
 83            print("\nAuthenticated successfully.")
 84
 85        if not self._access_token:
 86            if self._refresh_token:
 87                self._do_refresh()
 88            else:
 89                raise ValueError("Provide email (for interactive login) or access_token/refresh_token")
 90
 91        self._http = httpx.Client(
 92            base_url=API_BASE,
 93            headers=self._auth_headers(),
 94            timeout=30.0,
 95            verify=self._verify_ssl,
 96        )
 97
 98        self.label = Label(self)
 99        """Label/project operations. See `akiflow.label.Label`."""
100
101        self.task = Task(self)
102        """Task operations. See `akiflow.task.Task`."""
103
104    def _auth_headers(self) -> dict[str, str]:
105        return {
106            "Authorization": f"Bearer {self._access_token}",
107            "Akiflow-Platform": "web",
108            "Akiflow-Version": "2.69.3",
109            "Akiflow-Client-Id": self._client_id,
110            "Accept": "application/json",
111            "Content-Type": "application/json",
112        }
113
114    def _do_refresh(self) -> None:
115        if not self._refresh_token:
116            raise TokenExpiredError(401, "No refresh token available")
117        tokens = refresh_access_token(self._refresh_token, client_id=self._client_id, verify_ssl=self._verify_ssl)
118        self._access_token = tokens["access_token"]
119        self._refresh_token = tokens["refresh_token"]
120        self._client_id = tokens["client_id"]
121        if hasattr(self, "_http"):
122            self._http.headers.update(self._auth_headers())
123
124    def _request(self, method: str, path: str, **kwargs: Any) -> dict:
125        if self._debug:
126            body = kwargs.get("json")
127            print(f"\n>>> {method} {API_BASE}{path}")
128            if kwargs.get("params"):
129                print(f"    params: {kwargs['params']}")
130            if body is not None:
131                print(f"    body:   {json.dumps(body, indent=2)}")
132
133        resp = self._http.request(method, path, **kwargs)
134
135        if self._debug:
136            print(f"<<< {resp.status_code}")
137            try:
138                print(f"    body:   {json.dumps(resp.json(), indent=2)}")
139            except Exception:
140                print(f"    body:   {resp.text[:500]}")
141
142        # Auto-refresh on 401
143        if resp.status_code == 401 and self._auto_refresh and self._refresh_token:
144            if self._debug:
145                print("    (refreshing token...)")
146            self._do_refresh()
147            resp = self._http.request(method, path, **kwargs)
148            if self._debug:
149                print(f"<<< {resp.status_code} (after refresh)")
150                try:
151                    print(f"    body:   {json.dumps(resp.json(), indent=2)}")
152                except Exception:
153                    print(f"    body:   {resp.text[:500]}")
154
155        if resp.status_code == 401:
156            raise TokenExpiredError(401, resp.json().get("message", "Unauthorized"))
157        if not resp.is_success:
158            raise APIError(resp.status_code, resp.text)
159
160        return resp.json()
161
162    def _get(self, path: str, **kwargs: Any) -> dict:
163        return self._request("GET", path, **kwargs)
164
165    def _patch(self, path: str, **kwargs: Any) -> dict:
166        return self._request("PATCH", path, **kwargs)
167
168    @property
169    def access_token(self) -> str | None:
170        """Current JWT access token (may change after auto-refresh)."""
171        return self._access_token
172
173    @property
174    def refresh_token(self) -> str | None:
175        """Current refresh token (rotates on each refresh)."""
176        return self._refresh_token
177
178    def close(self) -> None:
179        """Close the underlying HTTP connection."""
180        self._http.close()
181
182    def __enter__(self):
183        return self
184
185    def __exit__(self, *args):
186        self.close()

Unofficial Akiflow API client.

Provides access to Akiflow resources through sub-clients:

  • client.task — create, update, delete, and list tasks
  • client.label — create, update, delete, and list labels/projects

There are three ways to authenticate:

Interactive login (prompts for email + 2FA code):

client = Akiflow()

Access token + refresh token (no prompt, auto-refreshes):

client = Akiflow(access_token="eyJ...", refresh_token="def50200...")

Refresh token only (exchanges for access token on init):

client = Akiflow(refresh_token="def50200...")
Arguments:
  • email: Akiflow account email. Triggers interactive OTP flow.
  • access_token: JWT access token (expires in 30 min).
  • refresh_token: Long-lived refresh token (~13 months). Enables auto-refresh.
  • client_id: Client UUID sent with requests. Auto-generated if omitted.
  • auto_refresh: Automatically refresh expired tokens on 401. Default True.
  • debug: Print request/response details to stdout. Default False.
  • verify_ssl: Verify SSL certificates. Set False for proxy tools. Default True.
Akiflow( *, email: str | None = None, access_token: str | None = None, refresh_token: str | None = None, client_id: str | None = None, auto_refresh: bool = True, debug: bool = False, verify_ssl: bool = True)
 55    def __init__(
 56        self,
 57        *,
 58        email: str | None = None,
 59        access_token: str | None = None,
 60        refresh_token: str | None = None,
 61        client_id: str | None = None,
 62        auto_refresh: bool = True,
 63        debug: bool = False,
 64        verify_ssl: bool = True,
 65    ):
 66        self._client_id = client_id or str(uuid.uuid4())
 67        self._auto_refresh = auto_refresh
 68        self._debug = debug
 69        self._verify_ssl = verify_ssl
 70        self._access_token: str | None = access_token
 71        self._refresh_token: str | None = refresh_token
 72
 73        if not email and not access_token and not refresh_token:
 74            email = input("Akiflow account email: ").strip()
 75            if not email:
 76                raise ValueError("Email is required for interactive login")
 77
 78        if email and not self._access_token:
 79            tokens = interactive_login(email, client_id=self._client_id, verify_ssl=self._verify_ssl)
 80            self._access_token = tokens["access_token"]
 81            self._refresh_token = tokens["refresh_token"]
 82            self._client_id = tokens["client_id"]
 83            print("\nAuthenticated successfully.")
 84
 85        if not self._access_token:
 86            if self._refresh_token:
 87                self._do_refresh()
 88            else:
 89                raise ValueError("Provide email (for interactive login) or access_token/refresh_token")
 90
 91        self._http = httpx.Client(
 92            base_url=API_BASE,
 93            headers=self._auth_headers(),
 94            timeout=30.0,
 95            verify=self._verify_ssl,
 96        )
 97
 98        self.label = Label(self)
 99        """Label/project operations. See `akiflow.label.Label`."""
100
101        self.task = Task(self)
102        """Task operations. See `akiflow.task.Task`."""
label

Label/project operations. See akiflow.label.Label.

task

Task operations. See akiflow.task.Task.

access_token: str | None
168    @property
169    def access_token(self) -> str | None:
170        """Current JWT access token (may change after auto-refresh)."""
171        return self._access_token

Current JWT access token (may change after auto-refresh).

refresh_token: str | None
173    @property
174    def refresh_token(self) -> str | None:
175        """Current refresh token (rotates on each refresh)."""
176        return self._refresh_token

Current refresh token (rotates on each refresh).

def close(self) -> None:
178    def close(self) -> None:
179        """Close the underlying HTTP connection."""
180        self._http.close()

Close the underlying HTTP connection.

class Label:
 14class Label:
 15    """Operations on Akiflow labels/projects, available as `client.label`.
 16
 17    Labels are what Akiflow calls "projects" in the UI. Tasks reference
 18    labels via their `listId` field. Labels can be organized into folders
 19    using `parent_id` and `type="folder"`.
 20
 21    Example:
 22        ```python
 23        from akiflow import Akiflow
 24
 25        client = Akiflow(refresh_token="def50200...")
 26
 27        # Create a label
 28        label = client.label.create("My Project", color="palette-green")
 29
 30        # List all labels
 31        result = client.label.list()
 32        for lb in result["data"]:
 33            print(lb["title"], lb["id"])
 34
 35        # Find label ID by name
 36        label_id = client.label.get_id("My Project")
 37
 38        # Update
 39        client.label.update(label["id"], title="Renamed Project")
 40
 41        # Delete
 42        client.label.delete(label["id"])
 43        ```
 44    """
 45
 46    # Cache of name -> id, populated on first lookup
 47    _name_cache: dict[str, str] | None = None
 48
 49    def __init__(self, client: Akiflow):
 50        self._client = client
 51        self._name_cache = None
 52
 53    def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict:
 54        """Fetch labels, with optional incremental sync.
 55
 56        Args:
 57            sync_token: Cursor from a previous `list()` response.
 58            limit: Max labels per page (default 2500).
 59
 60        Returns:
 61            Dict with `data` (list of label dicts), `sync_token`, and
 62            `has_next_page`.
 63        """
 64        params: dict[str, Any] = {"limit": str(limit)}
 65        if sync_token:
 66            params["sync_token"] = sync_token
 67        return self._client._get("/v5/labels", params=params)
 68
 69    def create(
 70        self,
 71        title: str,
 72        *,
 73        color: str | None = None,
 74        icon: str | None = None,
 75        parent_id: str | None = None,
 76        type: str | None = None,
 77        **extra: Any,
 78    ) -> dict:
 79        """Create a new label.
 80
 81        Args:
 82            title: Label name.
 83            color: Color palette name (e.g. `"palette-green"`, `"palette-pink"`).
 84            icon: Emoji icon.
 85            parent_id: UUID of a folder label to nest under.
 86            type: Set to `"folder"` to create a folder, or `None` for a label.
 87            **extra: Additional fields passed directly to the API.
 88
 89        Returns:
 90            The created label dict.
 91
 92        Example:
 93            ```python
 94            label = client.label.create("Work", color="palette-cobalt")
 95            folder = client.label.create("Area", type="folder")
 96            nested = client.label.create("Sub-project", parent_id=folder["id"])
 97            ```
 98        """
 99        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
100        label_id = str(uuid.uuid4())
101        sorting = int(datetime.now(timezone.utc).timestamp() * 1000)
102
103        label: dict[str, Any] = {
104            "id": label_id,
105            "title": title,
106            "color": color,
107            "icon": icon,
108            "sorting": sorting,
109            "parent_id": parent_id,
110            "type": type,
111            "data": {},
112            "global_created_at": now,
113            "global_updated_at": now,
114            "deleted_at": None,
115            **extra,
116        }
117
118        resp = self._client._patch("/v5/labels", json=[label])
119        created = resp["data"][0] if resp.get("data") else resp
120        # Invalidate name cache
121        self._name_cache = None
122        return created
123
124    def update(self, label_id: str, **fields: Any) -> dict:
125        """Update a label by ID.
126
127        Args:
128            label_id: UUID of the label to update.
129            **fields: Any label fields to update (e.g. `title`, `color`, `icon`).
130
131        Returns:
132            The updated label dict.
133
134        Example:
135            ```python
136            client.label.update(label_id, title="New Name", color="palette-red")
137            ```
138        """
139        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
140        payload: dict[str, Any] = {
141            "id": label_id,
142            "global_updated_at": now,
143        }
144        payload.update(fields)
145
146        resp = self._client._patch("/v5/labels", json=[payload])
147        self._name_cache = None
148        if resp.get("data"):
149            return resp["data"][0]
150        return resp
151
152    def delete(self, label_id: str) -> dict:
153        """Soft-delete a label.
154
155        Args:
156            label_id: UUID of the label to delete.
157
158        Returns:
159            The updated label dict with `deleted_at` set.
160
161        Example:
162            ```python
163            client.label.delete("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
164            ```
165        """
166        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
167        payload: dict[str, Any] = {
168            "id": label_id,
169            "title": None,
170            "color": None,
171            "sorting": None,
172            "icon": None,
173            "parent_id": None,
174            "type": None,
175            "data": {},
176            "deleted_at": now,
177            "global_updated_at": now,
178        }
179        resp = self._client._patch("/v5/labels", json=[payload])
180        self._name_cache = None
181        if resp.get("data"):
182            return resp["data"][0]
183        return resp
184
185    def _build_name_cache(self) -> dict[str, str]:
186        """Fetch all labels and build a lowercase name -> id mapping."""
187        result = self.list()
188        cache: dict[str, str] = {}
189        for lb in result.get("data", []):
190            if lb.get("title") and not lb.get("deleted_at"):
191                cache[lb["title"].lower()] = lb["id"]
192        return cache
193
194    def get_id(self, name: str) -> str | None:
195        """Resolve a label name to its UUID.
196
197        Case-insensitive. Returns `None` if no label matches.
198
199        Args:
200            name: Label name to look up.
201
202        Returns:
203            Label UUID, or `None` if not found.
204
205        Example:
206            ```python
207            label_id = client.label.get_id("Work")
208            ```
209        """
210        if self._name_cache is None:
211            self._name_cache = self._build_name_cache()
212        return self._name_cache.get(name.lower())
213
214    def resolve_id(self, label: str) -> str:
215        """Resolve a label name or UUID to a UUID.
216
217        If `label` looks like a UUID, returns it as-is. Otherwise, looks
218        it up by name (case-insensitive).
219
220        Args:
221            label: Label UUID or name.
222
223        Returns:
224            Label UUID.
225
226        Raises:
227            ValueError: If the name doesn't match any label.
228
229        Example:
230            ```python
231            # Both return the same UUID:
232            client.label.resolve_id("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
233            client.label.resolve_id("Work")
234            ```
235        """
236        try:
237            uuid.UUID(label)
238            return label
239        except ValueError:
240            pass
241
242        label_id = self.get_id(label)
243        if label_id is None:
244            raise ValueError(f"No label found with name: {label!r}")
245        return label_id

Operations on Akiflow labels/projects, available as client.label.

Labels are what Akiflow calls "projects" in the UI. Tasks reference labels via their listId field. Labels can be organized into folders using parent_id and type="folder".

Example:
from akiflow import Akiflow

client = Akiflow(refresh_token="def50200...")

# Create a label
label = client.label.create("My Project", color="palette-green")

# List all labels
result = client.label.list()
for lb in result["data"]:
    print(lb["title"], lb["id"])

# Find label ID by name
label_id = client.label.get_id("My Project")

# Update
client.label.update(label["id"], title="Renamed Project")

# Delete
client.label.delete(label["id"])
Label(client: Akiflow)
49    def __init__(self, client: Akiflow):
50        self._client = client
51        self._name_cache = None
def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict:
53    def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict:
54        """Fetch labels, with optional incremental sync.
55
56        Args:
57            sync_token: Cursor from a previous `list()` response.
58            limit: Max labels per page (default 2500).
59
60        Returns:
61            Dict with `data` (list of label dicts), `sync_token`, and
62            `has_next_page`.
63        """
64        params: dict[str, Any] = {"limit": str(limit)}
65        if sync_token:
66            params["sync_token"] = sync_token
67        return self._client._get("/v5/labels", params=params)

Fetch labels, with optional incremental sync.

Arguments:
  • sync_token: Cursor from a previous list() response.
  • limit: Max labels per page (default 2500).
Returns:

Dict with data (list of label dicts), sync_token, and has_next_page.

def create( self, title: str, *, color: str | None = None, icon: str | None = None, parent_id: str | None = None, type: str | None = None, **extra: Any) -> dict:
 69    def create(
 70        self,
 71        title: str,
 72        *,
 73        color: str | None = None,
 74        icon: str | None = None,
 75        parent_id: str | None = None,
 76        type: str | None = None,
 77        **extra: Any,
 78    ) -> dict:
 79        """Create a new label.
 80
 81        Args:
 82            title: Label name.
 83            color: Color palette name (e.g. `"palette-green"`, `"palette-pink"`).
 84            icon: Emoji icon.
 85            parent_id: UUID of a folder label to nest under.
 86            type: Set to `"folder"` to create a folder, or `None` for a label.
 87            **extra: Additional fields passed directly to the API.
 88
 89        Returns:
 90            The created label dict.
 91
 92        Example:
 93            ```python
 94            label = client.label.create("Work", color="palette-cobalt")
 95            folder = client.label.create("Area", type="folder")
 96            nested = client.label.create("Sub-project", parent_id=folder["id"])
 97            ```
 98        """
 99        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
100        label_id = str(uuid.uuid4())
101        sorting = int(datetime.now(timezone.utc).timestamp() * 1000)
102
103        label: dict[str, Any] = {
104            "id": label_id,
105            "title": title,
106            "color": color,
107            "icon": icon,
108            "sorting": sorting,
109            "parent_id": parent_id,
110            "type": type,
111            "data": {},
112            "global_created_at": now,
113            "global_updated_at": now,
114            "deleted_at": None,
115            **extra,
116        }
117
118        resp = self._client._patch("/v5/labels", json=[label])
119        created = resp["data"][0] if resp.get("data") else resp
120        # Invalidate name cache
121        self._name_cache = None
122        return created

Create a new label.

Arguments:
  • title: Label name.
  • color: Color palette name (e.g. "palette-green", "palette-pink").
  • icon: Emoji icon.
  • parent_id: UUID of a folder label to nest under.
  • type: Set to "folder" to create a folder, or None for a label.
  • **extra: Additional fields passed directly to the API.
Returns:

The created label dict.

Example:
label = client.label.create("Work", color="palette-cobalt")
folder = client.label.create("Area", type="folder")
nested = client.label.create("Sub-project", parent_id=folder["id"])
def update(self, label_id: str, **fields: Any) -> dict:
124    def update(self, label_id: str, **fields: Any) -> dict:
125        """Update a label by ID.
126
127        Args:
128            label_id: UUID of the label to update.
129            **fields: Any label fields to update (e.g. `title`, `color`, `icon`).
130
131        Returns:
132            The updated label dict.
133
134        Example:
135            ```python
136            client.label.update(label_id, title="New Name", color="palette-red")
137            ```
138        """
139        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
140        payload: dict[str, Any] = {
141            "id": label_id,
142            "global_updated_at": now,
143        }
144        payload.update(fields)
145
146        resp = self._client._patch("/v5/labels", json=[payload])
147        self._name_cache = None
148        if resp.get("data"):
149            return resp["data"][0]
150        return resp

Update a label by ID.

Arguments:
  • label_id: UUID of the label to update.
  • **fields: Any label fields to update (e.g. title, color, icon).
Returns:

The updated label dict.

Example:
client.label.update(label_id, title="New Name", color="palette-red")
def delete(self, label_id: str) -> dict:
152    def delete(self, label_id: str) -> dict:
153        """Soft-delete a label.
154
155        Args:
156            label_id: UUID of the label to delete.
157
158        Returns:
159            The updated label dict with `deleted_at` set.
160
161        Example:
162            ```python
163            client.label.delete("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
164            ```
165        """
166        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
167        payload: dict[str, Any] = {
168            "id": label_id,
169            "title": None,
170            "color": None,
171            "sorting": None,
172            "icon": None,
173            "parent_id": None,
174            "type": None,
175            "data": {},
176            "deleted_at": now,
177            "global_updated_at": now,
178        }
179        resp = self._client._patch("/v5/labels", json=[payload])
180        self._name_cache = None
181        if resp.get("data"):
182            return resp["data"][0]
183        return resp

Soft-delete a label.

Arguments:
  • label_id: UUID of the label to delete.
Returns:

The updated label dict with deleted_at set.

Example:
client.label.delete("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
def get_id(self, name: str) -> str | None:
194    def get_id(self, name: str) -> str | None:
195        """Resolve a label name to its UUID.
196
197        Case-insensitive. Returns `None` if no label matches.
198
199        Args:
200            name: Label name to look up.
201
202        Returns:
203            Label UUID, or `None` if not found.
204
205        Example:
206            ```python
207            label_id = client.label.get_id("Work")
208            ```
209        """
210        if self._name_cache is None:
211            self._name_cache = self._build_name_cache()
212        return self._name_cache.get(name.lower())

Resolve a label name to its UUID.

Case-insensitive. Returns None if no label matches.

Arguments:
  • name: Label name to look up.
Returns:

Label UUID, or None if not found.

Example:
label_id = client.label.get_id("Work")
def resolve_id(self, label: str) -> str:
214    def resolve_id(self, label: str) -> str:
215        """Resolve a label name or UUID to a UUID.
216
217        If `label` looks like a UUID, returns it as-is. Otherwise, looks
218        it up by name (case-insensitive).
219
220        Args:
221            label: Label UUID or name.
222
223        Returns:
224            Label UUID.
225
226        Raises:
227            ValueError: If the name doesn't match any label.
228
229        Example:
230            ```python
231            # Both return the same UUID:
232            client.label.resolve_id("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
233            client.label.resolve_id("Work")
234            ```
235        """
236        try:
237            uuid.UUID(label)
238            return label
239        except ValueError:
240            pass
241
242        label_id = self.get_id(label)
243        if label_id is None:
244            raise ValueError(f"No label found with name: {label!r}")
245        return label_id

Resolve a label name or UUID to a UUID.

If label looks like a UUID, returns it as-is. Otherwise, looks it up by name (case-insensitive).

Arguments:
  • label: Label UUID or name.
Returns:

Label UUID.

Raises:
  • ValueError: If the name doesn't match any label.
Example:
# Both return the same UUID:
client.label.resolve_id("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
client.label.resolve_id("Work")
class Task:
 45class Task:
 46    """Operations on Akiflow tasks, available as `client.task`.
 47
 48    All mutations go through `PATCH /v5/tasks` (Akiflow uses upsert semantics).
 49    Deletion is a soft-delete via `trashed_at`.
 50
 51    Example:
 52        ```python
 53        from akiflow import Akiflow
 54
 55        client = Akiflow(refresh_token="def50200...")
 56
 57        # Create
 58        task = client.task.create("Buy groceries")
 59
 60        # Update
 61        client.task.update(task["id"], title="Buy organic groceries")
 62
 63        # Mark done
 64        client.task.done(task["id"])
 65
 66        # Delete
 67        client.task.delete(task["id"])
 68
 69        # List all tasks
 70        result = client.task.list()
 71        for t in result["data"]:
 72            print(t["title"], t["done"])
 73        ```
 74    """
 75
 76    def __init__(self, client: Akiflow):
 77        self._client = client
 78
 79    def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict:
 80        """Fetch tasks, with optional incremental sync.
 81
 82        Args:
 83            sync_token: Cursor from a previous `list()` response. Pass this
 84                to get only tasks changed since the last call.
 85            limit: Max tasks per page (default 2500).
 86
 87        Returns:
 88            Dict with `data` (list of task dicts), `sync_token` (cursor for
 89            next call), and `has_next_page`.
 90
 91        Example:
 92            ```python
 93            # Full sync
 94            result = client.task.list()
 95            tasks = result["data"]
 96            cursor = result["sync_token"]
 97
 98            # Incremental sync (only changes since last call)
 99            result = client.task.list(sync_token=cursor)
100            ```
101        """
102        params: dict[str, Any] = {"limit": str(limit)}
103        if sync_token:
104            params["sync_token"] = sync_token
105        return self._client._get("/v5/tasks", params=params)
106
107    def create(
108        self,
109        title: str,
110        *,
111        description: str | None = None,
112        date: str | None = None,
113        datetime_: str | None = None,
114        datetime_tz: str | None = None,
115        duration: int | None = None,
116        due_date: str | None = None,
117        priority: int | None = None,
118        tags_ids: list[str] | None = None,
119        label: str | None = None,
120        list_id: str | None = None,
121        section_id: str | None = None,
122        links: list[str] | None = None,
123        **extra: Any,
124    ) -> TaskResult:
125        """Create a new task.
126
127        By default, tasks land in the **inbox** (`status=1`). To schedule a
128        task on a specific date/time, pass `date` and `datetime_`.
129
130        Args:
131            title: Task title.
132            description: HTML description body.
133            date: Planned date (`"2026-03-27"`).
134            datetime_: Planned datetime in UTC (`"2026-03-27T09:00:00.000Z"`).
135            datetime_tz: Timezone for display (default `"Europe/Zurich"`).
136            duration: Duration in seconds (e.g. `1800` for 30 min).
137            due_date: Hard due date (`"2026-03-28"`).
138            priority: Priority level.
139            tags_ids: List of tag UUIDs.
140            label: Label/project name **or** UUID. Resolved automatically
141                via `client.label.resolve_id()`. Takes precedence over `list_id`.
142            list_id: Project/list UUID (use `label` for name-based lookup).
143            section_id: Section within a project.
144            links: List of URL strings.
145            **extra: Additional fields passed directly to the API.
146
147        Returns:
148            A `TaskResult` dict. Supports direct mutation via
149            `task.update(...)`, `task.done()`, and `task.delete()`.
150
151        Example:
152            ```python
153            # Inbox task
154            task = client.task.create("Buy groceries")
155
156            # Task assigned to a label by name
157            task = client.task.create("Review PR", label="Work")
158
159            # Scheduled task with duration
160            task = client.task.create(
161                "Team standup",
162                date="2026-03-27",
163                datetime_="2026-03-27T09:00:00.000Z",
164                duration=1800,
165            )
166            ```
167        """
168        if label is not None:
169            list_id = self._client.label.resolve_id(label)
170
171        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
172        task_id = str(uuid.uuid4())
173        sorting = int(datetime.now(timezone.utc).timestamp() * 1000)
174
175        task: dict[str, Any] = {
176            "id": task_id,
177            "title": title,
178            "description": description,
179            "status": 1,
180            "done": False,
181            "done_at": None,
182            "date": date,
183            "datetime": datetime_,
184            "datetime_tz": datetime_tz or "Europe/Zurich",
185            "original_date": None,
186            "original_datetime": None,
187            "duration": duration,
188            "due_date": due_date,
189            "priority": priority,
190            "sorting": sorting,
191            "sorting_label": sorting,
192            "tags_ids": tags_ids,
193            "links": links or [],
194            "listId": list_id,
195            "section_id": section_id,
196            "calendar_id": None,
197            "time_slot_id": None,
198            "recurring_id": None,
199            "recurrence": None,
200            "recurrence_version": None,
201            "plan_unit": None,
202            "plan_period": None,
203            "origin": None,
204            "connector_id": None,
205            "origin_id": None,
206            "origin_account_id": None,
207            "doc": None,
208            "content": None,
209            "data": {},
210            "search_text": "",
211            "trashed_at": None,
212            "deleted_at": None,
213            "global_created_at": now,
214            "global_updated_at": now,
215            "global_list_id_updated_at": now if list_id else None,
216            "global_tags_ids_updated_at": None,
217            **extra,
218        }
219
220        resp = self._client._patch("/v5/tasks", json=[task])
221        data = resp["data"][0] if resp.get("data") else resp
222        return TaskResult(data, self)
223
224    def update(self, task_id: str, **fields: Any) -> TaskResult:
225        """Update a task by ID.
226
227        Only pass the fields you want to change. The `global_updated_at`
228        timestamp is set automatically.
229
230        Args:
231            task_id: UUID of the task to update.
232            **fields: Any task fields to update. Use `label` for name-based
233                project lookup, `list_id` for UUID-based, and `datetime_`
234                for the datetime field.
235
236        Returns:
237            The updated task dict.
238
239        Example:
240            ```python
241            # Rename
242            client.task.update(task_id, title="New title")
243
244            # Assign to a label by name
245            client.task.update(task_id, label="Work")
246
247            # Reschedule
248            client.task.update(
249                task_id,
250                date="2026-04-01",
251                datetime_="2026-04-01T14:00:00.000Z",
252            )
253            ```
254        """
255        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
256        payload: dict[str, Any] = {
257            "id": task_id,
258            "global_updated_at": now,
259        }
260
261        # Resolve label name/UUID -> listId
262        if "label" in fields:
263            label = fields.pop("label")
264            if label is not None:
265                fields["list_id"] = self._client.label.resolve_id(label)
266            else:
267                fields["list_id"] = None
268
269        # Map python-friendly names to API names
270        if "list_id" in fields:
271            payload["listId"] = fields.pop("list_id")
272            payload["global_list_id_updated_at"] = now
273        if "datetime_" in fields:
274            payload["datetime"] = fields.pop("datetime_")
275
276        payload.update(fields)
277
278        resp = self._client._patch("/v5/tasks", json=[payload])
279        data = resp["data"][0] if resp.get("data") else resp
280        return TaskResult(data, self)
281
282    def delete(self, task_id: str) -> TaskResult:
283        """Soft-delete a task.
284
285        Sets `trashed_at` to the current time. The task can still be
286        recovered in Akiflow's trash.
287
288        Args:
289            task_id: UUID of the task to delete.
290
291        Returns:
292            The updated task dict with `trashed_at` set.
293
294        Example:
295            ```python
296            client.task.delete("59442bbd-a57d-464f-9fa2-2cb9678379ee")
297            ```
298        """
299        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
300        payload = {
301            "id": task_id,
302            "status": 10,
303            "trashed_at": now,
304            "global_updated_at": now,
305        }
306        resp = self._client._patch("/v5/tasks", json=[payload])
307        data = resp["data"][0] if resp.get("data") else resp
308        return TaskResult(data, self)
309
310    def done(self, task_id: str, *, date: str | None = None) -> TaskResult:
311        """Mark a task as done.
312
313        Args:
314            task_id: UUID of the task to complete.
315            date: Date in ``YYYY-MM-DD`` format. Defaults to today.
316                A date is required for the task to appear in Akiflow's
317                done list.
318
319        Returns:
320            The updated task dict with `done=True`.
321
322        Example:
323            ```python
324            client.task.done("59442bbd-a57d-464f-9fa2-2cb9678379ee")
325            client.task.done(task_id, date="2026-03-20")
326            ```
327        """
328        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
329        if date is None:
330            date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
331        return self.update(task_id, done=True, done_at=now, date=date, status=2)

Operations on Akiflow tasks, available as client.task.

All mutations go through PATCH /v5/tasks (Akiflow uses upsert semantics). Deletion is a soft-delete via trashed_at.

Example:
from akiflow import Akiflow

client = Akiflow(refresh_token="def50200...")

# Create
task = client.task.create("Buy groceries")

# Update
client.task.update(task["id"], title="Buy organic groceries")

# Mark done
client.task.done(task["id"])

# Delete
client.task.delete(task["id"])

# List all tasks
result = client.task.list()
for t in result["data"]:
    print(t["title"], t["done"])
Task(client: Akiflow)
76    def __init__(self, client: Akiflow):
77        self._client = client
def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict:
 79    def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict:
 80        """Fetch tasks, with optional incremental sync.
 81
 82        Args:
 83            sync_token: Cursor from a previous `list()` response. Pass this
 84                to get only tasks changed since the last call.
 85            limit: Max tasks per page (default 2500).
 86
 87        Returns:
 88            Dict with `data` (list of task dicts), `sync_token` (cursor for
 89            next call), and `has_next_page`.
 90
 91        Example:
 92            ```python
 93            # Full sync
 94            result = client.task.list()
 95            tasks = result["data"]
 96            cursor = result["sync_token"]
 97
 98            # Incremental sync (only changes since last call)
 99            result = client.task.list(sync_token=cursor)
100            ```
101        """
102        params: dict[str, Any] = {"limit": str(limit)}
103        if sync_token:
104            params["sync_token"] = sync_token
105        return self._client._get("/v5/tasks", params=params)

Fetch tasks, with optional incremental sync.

Arguments:
  • sync_token: Cursor from a previous list() response. Pass this to get only tasks changed since the last call.
  • limit: Max tasks per page (default 2500).
Returns:

Dict with data (list of task dicts), sync_token (cursor for next call), and has_next_page.

Example:
# Full sync
result = client.task.list()
tasks = result["data"]
cursor = result["sync_token"]

# Incremental sync (only changes since last call)
result = client.task.list(sync_token=cursor)
def create( self, title: str, *, description: str | None = None, date: str | None = None, datetime_: str | None = None, datetime_tz: str | None = None, duration: int | None = None, due_date: str | None = None, priority: int | None = None, tags_ids: 'list[str] | None' = None, label: str | None = None, list_id: str | None = None, section_id: str | None = None, links: 'list[str] | None' = None, **extra: Any) -> akiflow.task.TaskResult:
107    def create(
108        self,
109        title: str,
110        *,
111        description: str | None = None,
112        date: str | None = None,
113        datetime_: str | None = None,
114        datetime_tz: str | None = None,
115        duration: int | None = None,
116        due_date: str | None = None,
117        priority: int | None = None,
118        tags_ids: list[str] | None = None,
119        label: str | None = None,
120        list_id: str | None = None,
121        section_id: str | None = None,
122        links: list[str] | None = None,
123        **extra: Any,
124    ) -> TaskResult:
125        """Create a new task.
126
127        By default, tasks land in the **inbox** (`status=1`). To schedule a
128        task on a specific date/time, pass `date` and `datetime_`.
129
130        Args:
131            title: Task title.
132            description: HTML description body.
133            date: Planned date (`"2026-03-27"`).
134            datetime_: Planned datetime in UTC (`"2026-03-27T09:00:00.000Z"`).
135            datetime_tz: Timezone for display (default `"Europe/Zurich"`).
136            duration: Duration in seconds (e.g. `1800` for 30 min).
137            due_date: Hard due date (`"2026-03-28"`).
138            priority: Priority level.
139            tags_ids: List of tag UUIDs.
140            label: Label/project name **or** UUID. Resolved automatically
141                via `client.label.resolve_id()`. Takes precedence over `list_id`.
142            list_id: Project/list UUID (use `label` for name-based lookup).
143            section_id: Section within a project.
144            links: List of URL strings.
145            **extra: Additional fields passed directly to the API.
146
147        Returns:
148            A `TaskResult` dict. Supports direct mutation via
149            `task.update(...)`, `task.done()`, and `task.delete()`.
150
151        Example:
152            ```python
153            # Inbox task
154            task = client.task.create("Buy groceries")
155
156            # Task assigned to a label by name
157            task = client.task.create("Review PR", label="Work")
158
159            # Scheduled task with duration
160            task = client.task.create(
161                "Team standup",
162                date="2026-03-27",
163                datetime_="2026-03-27T09:00:00.000Z",
164                duration=1800,
165            )
166            ```
167        """
168        if label is not None:
169            list_id = self._client.label.resolve_id(label)
170
171        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
172        task_id = str(uuid.uuid4())
173        sorting = int(datetime.now(timezone.utc).timestamp() * 1000)
174
175        task: dict[str, Any] = {
176            "id": task_id,
177            "title": title,
178            "description": description,
179            "status": 1,
180            "done": False,
181            "done_at": None,
182            "date": date,
183            "datetime": datetime_,
184            "datetime_tz": datetime_tz or "Europe/Zurich",
185            "original_date": None,
186            "original_datetime": None,
187            "duration": duration,
188            "due_date": due_date,
189            "priority": priority,
190            "sorting": sorting,
191            "sorting_label": sorting,
192            "tags_ids": tags_ids,
193            "links": links or [],
194            "listId": list_id,
195            "section_id": section_id,
196            "calendar_id": None,
197            "time_slot_id": None,
198            "recurring_id": None,
199            "recurrence": None,
200            "recurrence_version": None,
201            "plan_unit": None,
202            "plan_period": None,
203            "origin": None,
204            "connector_id": None,
205            "origin_id": None,
206            "origin_account_id": None,
207            "doc": None,
208            "content": None,
209            "data": {},
210            "search_text": "",
211            "trashed_at": None,
212            "deleted_at": None,
213            "global_created_at": now,
214            "global_updated_at": now,
215            "global_list_id_updated_at": now if list_id else None,
216            "global_tags_ids_updated_at": None,
217            **extra,
218        }
219
220        resp = self._client._patch("/v5/tasks", json=[task])
221        data = resp["data"][0] if resp.get("data") else resp
222        return TaskResult(data, self)

Create a new task.

By default, tasks land in the inbox (status=1). To schedule a task on a specific date/time, pass date and datetime_.

Arguments:
  • title: Task title.
  • description: HTML description body.
  • date: Planned date ("2026-03-27").
  • datetime_: Planned datetime in UTC ("2026-03-27T09:00:00.000Z").
  • datetime_tz: Timezone for display (default "Europe/Zurich").
  • duration: Duration in seconds (e.g. 1800 for 30 min).
  • due_date: Hard due date ("2026-03-28").
  • priority: Priority level.
  • tags_ids: List of tag UUIDs.
  • label: Label/project name or UUID. Resolved automatically via client.label.resolve_id(). Takes precedence over list_id.
  • list_id: Project/list UUID (use label for name-based lookup).
  • section_id: Section within a project.
  • links: List of URL strings.
  • **extra: Additional fields passed directly to the API.
Returns:

A TaskResult dict. Supports direct mutation via task.update(...), task.done(), and task.delete().

Example:
# Inbox task
task = client.task.create("Buy groceries")

# Task assigned to a label by name
task = client.task.create("Review PR", label="Work")

# Scheduled task with duration
task = client.task.create(
    "Team standup",
    date="2026-03-27",
    datetime_="2026-03-27T09:00:00.000Z",
    duration=1800,
)
def update(self, task_id: str, **fields: Any) -> akiflow.task.TaskResult:
224    def update(self, task_id: str, **fields: Any) -> TaskResult:
225        """Update a task by ID.
226
227        Only pass the fields you want to change. The `global_updated_at`
228        timestamp is set automatically.
229
230        Args:
231            task_id: UUID of the task to update.
232            **fields: Any task fields to update. Use `label` for name-based
233                project lookup, `list_id` for UUID-based, and `datetime_`
234                for the datetime field.
235
236        Returns:
237            The updated task dict.
238
239        Example:
240            ```python
241            # Rename
242            client.task.update(task_id, title="New title")
243
244            # Assign to a label by name
245            client.task.update(task_id, label="Work")
246
247            # Reschedule
248            client.task.update(
249                task_id,
250                date="2026-04-01",
251                datetime_="2026-04-01T14:00:00.000Z",
252            )
253            ```
254        """
255        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
256        payload: dict[str, Any] = {
257            "id": task_id,
258            "global_updated_at": now,
259        }
260
261        # Resolve label name/UUID -> listId
262        if "label" in fields:
263            label = fields.pop("label")
264            if label is not None:
265                fields["list_id"] = self._client.label.resolve_id(label)
266            else:
267                fields["list_id"] = None
268
269        # Map python-friendly names to API names
270        if "list_id" in fields:
271            payload["listId"] = fields.pop("list_id")
272            payload["global_list_id_updated_at"] = now
273        if "datetime_" in fields:
274            payload["datetime"] = fields.pop("datetime_")
275
276        payload.update(fields)
277
278        resp = self._client._patch("/v5/tasks", json=[payload])
279        data = resp["data"][0] if resp.get("data") else resp
280        return TaskResult(data, self)

Update a task by ID.

Only pass the fields you want to change. The global_updated_at timestamp is set automatically.

Arguments:
  • task_id: UUID of the task to update.
  • **fields: Any task fields to update. Use label for name-based project lookup, list_id for UUID-based, and datetime_ for the datetime field.
Returns:

The updated task dict.

Example:
# Rename
client.task.update(task_id, title="New title")

# Assign to a label by name
client.task.update(task_id, label="Work")

# Reschedule
client.task.update(
    task_id,
    date="2026-04-01",
    datetime_="2026-04-01T14:00:00.000Z",
)
def delete(self, task_id: str) -> akiflow.task.TaskResult:
282    def delete(self, task_id: str) -> TaskResult:
283        """Soft-delete a task.
284
285        Sets `trashed_at` to the current time. The task can still be
286        recovered in Akiflow's trash.
287
288        Args:
289            task_id: UUID of the task to delete.
290
291        Returns:
292            The updated task dict with `trashed_at` set.
293
294        Example:
295            ```python
296            client.task.delete("59442bbd-a57d-464f-9fa2-2cb9678379ee")
297            ```
298        """
299        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
300        payload = {
301            "id": task_id,
302            "status": 10,
303            "trashed_at": now,
304            "global_updated_at": now,
305        }
306        resp = self._client._patch("/v5/tasks", json=[payload])
307        data = resp["data"][0] if resp.get("data") else resp
308        return TaskResult(data, self)

Soft-delete a task.

Sets trashed_at to the current time and status to 10. The task can still be recovered in Akiflow's trash.

Arguments:
  • task_id: UUID of the task to delete.
Returns:

The updated task dict with trashed_at set.

Example:
client.task.delete("59442bbd-a57d-464f-9fa2-2cb9678379ee")
def done( self, task_id: str, *, date: str | None = None) -> akiflow.task.TaskResult:
310    def done(self, task_id: str, *, date: str | None = None) -> TaskResult:
311        """Mark a task as done.
312
313        Args:
314            task_id: UUID of the task to complete.
315            date: Date in ``YYYY-MM-DD`` format. Defaults to today.
316                A date is required for the task to appear in Akiflow's
317                done list.
318
319        Returns:
320            The updated task dict with `done=True`.
321
322        Example:
323            ```python
324            client.task.done("59442bbd-a57d-464f-9fa2-2cb9678379ee")
325            client.task.done(task_id, date="2026-03-20")
326            ```
327        """
328        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
329        if date is None:
330            date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
331        return self.update(task_id, done=True, done_at=now, date=date, status=2)

Mark a task as done.

Arguments:
  • task_id: UUID of the task to complete.
  • date: Date in YYYY-MM-DD format. Defaults to today's date in UTC. A date is required for the task to appear in Akiflow's done list.
Returns:

The updated task dict with done=True.

Example:
client.task.done("59442bbd-a57d-464f-9fa2-2cb9678379ee")
client.task.done(task_id, date="2026-03-20")
class AkiflowError(builtins.Exception):
class AkiflowError(Exception):
    """Root of the Akiflow SDK exception hierarchy; every SDK error derives from it."""

Base exception for all Akiflow SDK errors.

class APIError(akiflow.AkiflowError):
class APIError(AkiflowError):
    """Error for an Akiflow API response whose status is not 2xx.

    Attributes:
        status_code: HTTP status code of the response.
        message: Error message taken from the response body.
    """

    def __init__(self, status_code: int, message: str):
        """Store the status and message; the exception text is `HTTP <code>: <message>`."""
        super().__init__(f"HTTP {status_code}: {message}")
        self.status_code = status_code
        self.message = message

Raised on non-2xx responses from the Akiflow API.

Attributes:
  • status_code: HTTP status code.
  • message: Error message from the response body.
APIError(status_code: int, message: str)
21    def __init__(self, status_code: int, message: str):
22        self.status_code = status_code
23        self.message = message
24        super().__init__(f"HTTP {status_code}: {message}")
status_code
message
class AuthError(akiflow.AkiflowError):
 9class AuthError(AkiflowError):
10    """Raised when authentication fails (bad OTP, expired link, etc.)."""

Raised when authentication fails (bad OTP, expired link, etc.).

class TokenExpiredError(akiflow.APIError):
class TokenExpiredError(APIError):
    """Signals an expired access token that could not be refreshed.

    The usual cause is a refresh token that has itself expired or been
    revoked. Obtain new tokens by re-authenticating with
    `Akiflow(email=...)`.
    """

Raised when the access token is expired and cannot be refreshed.

This typically means the refresh token has also expired or was revoked. Re-authenticate with Akiflow(email=...) to get new tokens.