akiflow
Akiflow SDK
Unofficial Python SDK for the Akiflow task management software.
Quick Start
Install the SDK:
pip install akiflow
# or
uv add akiflow
from akiflow import Akiflow
# Interactive login (prompts for email + 2FA code)
client = Akiflow()
# Or reuse saved tokens (no interactive prompt)
client = Akiflow(access_token="eyJ...", refresh_token="def50200...")
# Create a task in your inbox
task = client.task.create("Buy groceries")
# Schedule a task
task = client.task.create(
"Team standup",
date="2026-03-27",
datetime_="2026-03-27T09:00:00.000Z",
duration=1800,
)
# Update, complete, or delete
client.task.update(task["id"], title="Team standup (moved)")
client.task.done(task["id"])
client.task.delete(task["id"])
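The scheduled-task call above takes `datetime_` as a UTC timestamp with millisecond precision and a `Z` suffix, and `duration` in seconds. A minimal sketch of producing both from Python objects follows. The helper names `to_akiflow_datetime` and `to_seconds` are hypothetical and not part of the SDK.

```python
from datetime import datetime, timedelta, timezone


def to_akiflow_datetime(dt: datetime) -> str:
    """Format an aware datetime as UTC with milliseconds and a trailing Z."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    utc = dt.astimezone(timezone.utc)
    return utc.isoformat(timespec="milliseconds").replace("+00:00", "Z")


def to_seconds(delta: timedelta) -> int:
    """Convert a timedelta to whole seconds for the duration argument."""
    return int(delta.total_seconds())


# 10:00 at a fixed UTC+1 offset is 09:00 UTC.
start = datetime(2026, 3, 27, 10, 0, tzinfo=timezone(timedelta(hours=1)))
scheduled = {
    "datetime_": to_akiflow_datetime(start),
    "duration": to_seconds(timedelta(minutes=30)),
}
print(scheduled)
```

The resulting dict could then be passed along as keyword arguments, for example `client.task.create("Team standup", date="2026-03-27", **scheduled)`.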
Authentication
Akiflow uses passwordless email + OTP authentication. On first use,
pass your email to trigger the interactive flow. The client prints
access_token and refresh_token after success — save them for reuse.
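The access token expires in 30 minutes, but the refresh token is
long-lived (~13 months). Given only a refresh_token, the client exchanges it
for an access token at construction and then auto-refreshes on 401, so a
refresh_token alone is enough for persistent scripts. The refresh token
rotates on each refresh, so read client.refresh_token afterwards and save
the new value.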
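Because the client exposes its current tokens through the `access_token` and `refresh_token` properties, and the refresh token is documented as rotating on each refresh, a persistent script probably wants to write the tokens back to disk after use. The sketch below is one way to do that. The helpers `save_tokens` and `load_tokens` and the JSON file layout are assumptions for illustration, not part of the SDK, and a stand-in object is used in place of a real client.

```python
import json
import os
import tempfile
from pathlib import Path
from types import SimpleNamespace


def save_tokens(client, path: Path) -> None:
    """Write the client's current tokens to a JSON file readable only by the owner."""
    payload = {
        "access_token": client.access_token,
        "refresh_token": client.refresh_token,
    }
    path.write_text(json.dumps(payload))
    os.chmod(path, 0o600)


def load_tokens(path: Path) -> dict:
    """Return saved tokens as keyword arguments, or an empty dict if none exist."""
    if not path.exists():
        return {}
    return json.loads(path.read_text())


# Demonstration with a stand-in object; a real client exposes the same two properties.
token_path = Path(tempfile.mkdtemp()) / "akiflow_tokens.json"
stand_in = SimpleNamespace(access_token="eyJ...", refresh_token="def50200...")
save_tokens(stand_in, token_path)
restored = load_tokens(token_path)
print(sorted(restored))
```

With the real SDK, the intended pattern would be `client = Akiflow(**load_tokens(token_path))` at startup and `save_tokens(client, token_path)` before exit. An empty dict falls through to the interactive login.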
Debugging
Pass debug=True to print every request and response:
client = Akiflow(email="you@example.com", debug=True) # or omit email to be prompted
Pass verify_ssl=False to disable SSL verification (useful with Proxyman/Charles):
client = Akiflow(access_token="...", verify_ssl=False)
from .client import Akiflow
from .exceptions import AkiflowError, APIError, AuthError, TokenExpiredError
from .label import Label
from .task import Task

__all__ = ["Akiflow", "Label", "Task", "AkiflowError", "APIError", "AuthError", "TokenExpiredError"]
class Akiflow:
    """Unofficial Akiflow API client.

    Provides access to Akiflow resources through sub-clients:

    - `client.task` — create, update, delete, and list tasks
    - `client.label` — create, update, delete, and list labels/projects

    There are three ways to authenticate:

    **Interactive login** (prompts for email + 2FA code):
    ```python
    client = Akiflow()
    ```

    **Access token + refresh token** (no prompt, auto-refreshes):
    ```python
    client = Akiflow(access_token="eyJ...", refresh_token="def50200...")
    ```

    **Refresh token only** (exchanges for access token on init):
    ```python
    client = Akiflow(refresh_token="def50200...")
    ```

    Args:
        email: Akiflow account email. Triggers interactive OTP flow.
        access_token: JWT access token (expires in 30 min).
        refresh_token: Long-lived refresh token (~13 months). Enables auto-refresh.
        client_id: Client UUID sent with requests. Auto-generated if omitted.
        auto_refresh: Automatically refresh expired tokens on 401. Default `True`.
        debug: Print request/response details to stdout. Default `False`.
        verify_ssl: Verify SSL certificates. Set `False` for proxy tools. Default `True`.
    """

    def __init__(
        self,
        *,
        email: str | None = None,
        access_token: str | None = None,
        refresh_token: str | None = None,
        client_id: str | None = None,
        auto_refresh: bool = True,
        debug: bool = False,
        verify_ssl: bool = True,
    ):
        self._client_id = client_id or str(uuid.uuid4())
        self._auto_refresh = auto_refresh
        self._debug = debug
        self._verify_ssl = verify_ssl
        self._access_token: str | None = access_token
        self._refresh_token: str | None = refresh_token

        if not email and not access_token and not refresh_token:
            email = input("Akiflow account email: ").strip()
            if not email:
                raise ValueError("Email is required for interactive login")

        if email and not self._access_token:
            tokens = interactive_login(email, client_id=self._client_id, verify_ssl=self._verify_ssl)
            self._access_token = tokens["access_token"]
            self._refresh_token = tokens["refresh_token"]
            self._client_id = tokens["client_id"]
            print("\nAuthenticated successfully.")

        if not self._access_token:
            if self._refresh_token:
                self._do_refresh()
            else:
                raise ValueError("Provide email (for interactive login) or access_token/refresh_token")

        self._http = httpx.Client(
            base_url=API_BASE,
            headers=self._auth_headers(),
            timeout=30.0,
            verify=self._verify_ssl,
        )

        self.label = Label(self)
        """Label/project operations. See `akiflow.label.Label`."""

        self.task = Task(self)
        """Task operations. See `akiflow.task.Task`."""

    def _auth_headers(self) -> dict[str, str]:
        return {
            "Authorization": f"Bearer {self._access_token}",
            "Akiflow-Platform": "web",
            "Akiflow-Version": "2.69.3",
            "Akiflow-Client-Id": self._client_id,
            "Accept": "application/json",
            "Content-Type": "application/json",
        }

    def _do_refresh(self) -> None:
        if not self._refresh_token:
            raise TokenExpiredError(401, "No refresh token available")
        tokens = refresh_access_token(self._refresh_token, client_id=self._client_id, verify_ssl=self._verify_ssl)
        self._access_token = tokens["access_token"]
        self._refresh_token = tokens["refresh_token"]
        self._client_id = tokens["client_id"]
        if hasattr(self, "_http"):
            self._http.headers.update(self._auth_headers())

    def _request(self, method: str, path: str, **kwargs: Any) -> dict:
        if self._debug:
            body = kwargs.get("json")
            print(f"\n>>> {method} {API_BASE}{path}")
            if kwargs.get("params"):
                print(f"    params: {kwargs['params']}")
            if body is not None:
                print(f"    body: {json.dumps(body, indent=2)}")

        resp = self._http.request(method, path, **kwargs)

        if self._debug:
            print(f"<<< {resp.status_code}")
            try:
                print(f"    body: {json.dumps(resp.json(), indent=2)}")
            except Exception:
                print(f"    body: {resp.text[:500]}")

        # Auto-refresh on 401
        if resp.status_code == 401 and self._auto_refresh and self._refresh_token:
            if self._debug:
                print("    (refreshing token...)")
            self._do_refresh()
            resp = self._http.request(method, path, **kwargs)
            if self._debug:
                print(f"<<< {resp.status_code} (after refresh)")
                try:
                    print(f"    body: {json.dumps(resp.json(), indent=2)}")
                except Exception:
                    print(f"    body: {resp.text[:500]}")

        if resp.status_code == 401:
            raise TokenExpiredError(401, resp.json().get("message", "Unauthorized"))
        if not resp.is_success:
            raise APIError(resp.status_code, resp.text)

        return resp.json()

    def _get(self, path: str, **kwargs: Any) -> dict:
        return self._request("GET", path, **kwargs)

    def _patch(self, path: str, **kwargs: Any) -> dict:
        return self._request("PATCH", path, **kwargs)

    @property
    def access_token(self) -> str | None:
        """Current JWT access token (may change after auto-refresh)."""
        return self._access_token

    @property
    def refresh_token(self) -> str | None:
        """Current refresh token (rotates on each refresh)."""
        return self._refresh_token

    def close(self) -> None:
        """Close the underlying HTTP connection."""
        self._http.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
Unofficial Akiflow API client.
Provides access to Akiflow resources through sub-clients:
- client.task: create, update, delete, and list tasks
- client.label: create, update, delete, and list labels/projects
There are three ways to authenticate:
Interactive login (prompts for email + 2FA code):
client = Akiflow()
Access token + refresh token (no prompt, auto-refreshes):
client = Akiflow(access_token="eyJ...", refresh_token="def50200...")
Refresh token only (exchanges for access token on init):
client = Akiflow(refresh_token="def50200...")
Arguments:
- email: Akiflow account email. Triggers interactive OTP flow.
- access_token: JWT access token (expires in 30 min).
- refresh_token: Long-lived refresh token (~13 months). Enables auto-refresh.
- client_id: Client UUID sent with requests. Auto-generated if omitted.
- auto_refresh: Automatically refresh expired tokens on 401. Default True.
- debug: Print request/response details to stdout. Default False.
- verify_ssl: Verify SSL certificates. Set False for proxy tools. Default True.
def __init__(
    self,
    *,
    email: str | None = None,
    access_token: str | None = None,
    refresh_token: str | None = None,
    client_id: str | None = None,
    auto_refresh: bool = True,
    debug: bool = False,
    verify_ssl: bool = True,
)
@property
def access_token(self) -> str | None
Current JWT access token (may change after auto-refresh).
class Label:
    """Operations on Akiflow labels/projects, available as `client.label`.

    Labels are what Akiflow calls "projects" in the UI. Tasks reference
    labels via their `listId` field. Labels can be organized into folders
    using `parent_id` and `type="folder"`.

    Example:
        ```python
        from akiflow import Akiflow

        client = Akiflow(refresh_token="def50200...")

        # Create a label
        label = client.label.create("My Project", color="palette-green")

        # List all labels
        result = client.label.list()
        for lb in result["data"]:
            print(lb["title"], lb["id"])

        # Find label ID by name
        label_id = client.label.get_id("My Project")

        # Update
        client.label.update(label["id"], title="Renamed Project")

        # Delete
        client.label.delete(label["id"])
        ```
    """

    # Cache of name -> id, populated on first lookup
    _name_cache: dict[str, str] | None = None

    def __init__(self, client: Akiflow):
        self._client = client
        self._name_cache = None

    def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict:
        """Fetch labels, with optional incremental sync.

        Args:
            sync_token: Cursor from a previous `list()` response.
            limit: Max labels per page (default 2500).

        Returns:
            Dict with `data` (list of label dicts), `sync_token`, and
            `has_next_page`.
        """
        params: dict[str, Any] = {"limit": str(limit)}
        if sync_token:
            params["sync_token"] = sync_token
        return self._client._get("/v5/labels", params=params)

    def create(
        self,
        title: str,
        *,
        color: str | None = None,
        icon: str | None = None,
        parent_id: str | None = None,
        type: str | None = None,
        **extra: Any,
    ) -> dict:
        """Create a new label.

        Args:
            title: Label name.
            color: Color palette name (e.g. `"palette-green"`, `"palette-pink"`).
            icon: Emoji icon.
            parent_id: UUID of a folder label to nest under.
            type: Set to `"folder"` to create a folder, or `None` for a label.
            **extra: Additional fields passed directly to the API.

        Returns:
            The created label dict.

        Example:
            ```python
            label = client.label.create("Work", color="palette-cobalt")
            folder = client.label.create("Area", type="folder")
            nested = client.label.create("Sub-project", parent_id=folder["id"])
            ```
        """
        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        label_id = str(uuid.uuid4())
        sorting = int(datetime.now(timezone.utc).timestamp() * 1000)

        label: dict[str, Any] = {
            "id": label_id,
            "title": title,
            "color": color,
            "icon": icon,
            "sorting": sorting,
            "parent_id": parent_id,
            "type": type,
            "data": {},
            "global_created_at": now,
            "global_updated_at": now,
            "deleted_at": None,
            **extra,
        }

        resp = self._client._patch("/v5/labels", json=[label])
        created = resp["data"][0] if resp.get("data") else resp
        # Invalidate name cache
        self._name_cache = None
        return created

    def update(self, label_id: str, **fields: Any) -> dict:
        """Update a label by ID.

        Args:
            label_id: UUID of the label to update.
            **fields: Any label fields to update (e.g. `title`, `color`, `icon`).

        Returns:
            The updated label dict.

        Example:
            ```python
            client.label.update(label_id, title="New Name", color="palette-red")
            ```
        """
        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        payload: dict[str, Any] = {
            "id": label_id,
            "global_updated_at": now,
        }
        payload.update(fields)

        resp = self._client._patch("/v5/labels", json=[payload])
        self._name_cache = None
        if resp.get("data"):
            return resp["data"][0]
        return resp

    def delete(self, label_id: str) -> dict:
        """Soft-delete a label.

        Args:
            label_id: UUID of the label to delete.

        Returns:
            The updated label dict with `deleted_at` set.

        Example:
            ```python
            client.label.delete("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
            ```
        """
        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        payload: dict[str, Any] = {
            "id": label_id,
            "title": None,
            "color": None,
            "sorting": None,
            "icon": None,
            "parent_id": None,
            "type": None,
            "data": {},
            "deleted_at": now,
            "global_updated_at": now,
        }
        resp = self._client._patch("/v5/labels", json=[payload])
        self._name_cache = None
        if resp.get("data"):
            return resp["data"][0]
        return resp

    def _build_name_cache(self) -> dict[str, str]:
        """Fetch all labels and build a lowercase name -> id mapping."""
        result = self.list()
        cache: dict[str, str] = {}
        for lb in result.get("data", []):
            if lb.get("title") and not lb.get("deleted_at"):
                cache[lb["title"].lower()] = lb["id"]
        return cache

    def get_id(self, name: str) -> str | None:
        """Resolve a label name to its UUID.

        Case-insensitive. Returns `None` if no label matches.

        Args:
            name: Label name to look up.

        Returns:
            Label UUID, or `None` if not found.

        Example:
            ```python
            label_id = client.label.get_id("Work")
            ```
        """
        if self._name_cache is None:
            self._name_cache = self._build_name_cache()
        return self._name_cache.get(name.lower())

    def resolve_id(self, label: str) -> str:
        """Resolve a label name or UUID to a UUID.

        If `label` looks like a UUID, returns it as-is. Otherwise, looks
        it up by name (case-insensitive).

        Args:
            label: Label UUID or name.

        Returns:
            Label UUID.

        Raises:
            ValueError: If the name doesn't match any label.

        Example:
            ```python
            # Both return the same UUID:
            client.label.resolve_id("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
            client.label.resolve_id("Work")
            ```
        """
        try:
            uuid.UUID(label)
            return label
        except ValueError:
            pass

        label_id = self.get_id(label)
        if label_id is None:
            raise ValueError(f"No label found with name: {label!r}")
        return label_id
Operations on Akiflow labels/projects, available as client.label.
Labels are what Akiflow calls "projects" in the UI. Tasks reference
labels via their listId field. Labels can be organized into folders
using parent_id and type="folder".
Example:
from akiflow import Akiflow

client = Akiflow(refresh_token="def50200...")

# Create a label
label = client.label.create("My Project", color="palette-green")

# List all labels
result = client.label.list()
for lb in result["data"]:
    print(lb["title"], lb["id"])

# Find label ID by name
label_id = client.label.get_id("My Project")

# Update
client.label.update(label["id"], title="Renamed Project")

# Delete
client.label.delete(label["id"])
def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict
Fetch labels, with optional incremental sync.
Arguments:
- sync_token: Cursor from a previous list() response.
- limit: Max labels per page (default 2500).
Returns:
Dict with data (list of label dicts), sync_token, and has_next_page.
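The return value carries a `sync_token` and a `has_next_page` flag, which suggests a paging loop. The sketch below assumes that passing the returned `sync_token` back into `list()` yields the next page while `has_next_page` is true. That behaviour is an assumption and is not confirmed by the docstring. `fetch_all` and `fake_list` are hypothetical names, and the fake stands in for `client.label.list` so the loop can run offline.

```python
from typing import Callable, Optional


def fetch_all(list_fn: Callable[..., dict], max_pages: int = 100) -> tuple[list[dict], Optional[str]]:
    """Collect items across pages and return them with the last cursor seen."""
    items: list[dict] = []
    cursor: Optional[str] = None
    for _ in range(max_pages):  # hard cap so a misbehaving API cannot loop forever
        kwargs = {"sync_token": cursor} if cursor else {}
        page = list_fn(**kwargs)
        items.extend(page.get("data", []))
        cursor = page.get("sync_token", cursor)
        if not page.get("has_next_page"):
            break
    return items, cursor


def fake_list(*, sync_token: Optional[str] = None, limit: int = 2500) -> dict:
    """Stand-in for a list() method, serving two canned pages."""
    pages = {
        None: {"data": [{"id": "a"}], "sync_token": "t1", "has_next_page": True},
        "t1": {"data": [{"id": "b"}], "sync_token": "t2", "has_next_page": False},
    }
    return pages[sync_token]


labels, last_cursor = fetch_all(fake_list)
print(len(labels), last_cursor)
```

Against a real client the call would be `fetch_all(client.label.list)`. The returned cursor can be stored and passed as `sync_token` on a later run to fetch only changes.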
def create(
    self,
    title: str,
    *,
    color: str | None = None,
    icon: str | None = None,
    parent_id: str | None = None,
    type: str | None = None,
    **extra: Any,
) -> dict
Create a new label.
Arguments:
- title: Label name.
- color: Color palette name (e.g. "palette-green", "palette-pink").
- icon: Emoji icon.
- parent_id: UUID of a folder label to nest under.
- type: Set to "folder" to create a folder, or None for a label.
- **extra: Additional fields passed directly to the API.
Returns:
The created label dict.
Example:
label = client.label.create("Work", color="palette-cobalt")
folder = client.label.create("Area", type="folder")
nested = client.label.create("Sub-project", parent_id=folder["id"])
def update(self, label_id: str, **fields: Any) -> dict
Update a label by ID.
Arguments:
- label_id: UUID of the label to update.
- **fields: Any label fields to update (e.g. title, color, icon).
Returns:
The updated label dict.
Example:
client.label.update(label_id, title="New Name", color="palette-red")
def delete(self, label_id: str) -> dict
Soft-delete a label.
Arguments:
- label_id: UUID of the label to delete.
Returns:
The updated label dict with deleted_at set.
Example:
client.label.delete("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
def get_id(self, name: str) -> str | None
Resolve a label name to its UUID.
Case-insensitive. Returns None if no label matches.
Arguments:
- name: Label name to look up.
Returns:
Label UUID, or None if not found.
Example:
label_id = client.label.get_id("Work")
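The class source builds the lookup table by lowercasing each title and skipping soft-deleted labels. The standalone sketch below mirrors that logic on sample data to show two consequences: deleted labels never match, and when two live labels differ only by case, the later one in the list wins. `build_name_index` is a hypothetical name for illustration.

```python
def build_name_index(labels: list[dict]) -> dict[str, str]:
    """Map lowercase label titles to ids, ignoring untitled and soft-deleted labels."""
    index: dict[str, str] = {}
    for lb in labels:
        if lb.get("title") and not lb.get("deleted_at"):
            index[lb["title"].lower()] = lb["id"]
    return index


sample = [
    {"id": "1", "title": "Work"},
    {"id": "2", "title": "Old", "deleted_at": "2026-01-01T00:00:00.000Z"},
    {"id": "3", "title": "work"},  # same name in a different case: overwrites id "1"
]
index = build_name_index(sample)
print(index.get("WORK".lower()), index.get("old"))
```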
def resolve_id(self, label: str) -> str
Resolve a label name or UUID to a UUID.
If label looks like a UUID, returns it as-is. Otherwise, looks
it up by name (case-insensitive).
Arguments:
- label: Label UUID or name.
Returns:
Label UUID.
Raises:
- ValueError: If the name doesn't match any label.
Example:
# Both return the same UUID:
client.label.resolve_id("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
client.label.resolve_id("Work")
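According to the class source, "looks like a UUID" is decided by whether `uuid.UUID(label)` parses. A small sketch of that check follows. `looks_like_uuid` is a hypothetical helper name. Note that the standard library parser also accepts the 32-digit form without hyphens and the braced form, so a label whose name happens to be 32 hex digits would be returned as-is rather than looked up by name.

```python
import uuid


def looks_like_uuid(value: str) -> bool:
    """Return True if the standard library can parse the string as a UUID."""
    try:
        uuid.UUID(value)
    except ValueError:
        return False
    return True


for candidate in (
    "d7f7c026-bd8a-4c3a-8c16-d9677ee959e9",    # canonical form
    "d7f7c026bd8a4c3a8c16d9677ee959e9",        # hyphens omitted, still parses
    "{d7f7c026-bd8a-4c3a-8c16-d9677ee959e9}",  # braces are stripped by the parser
    "Work",                                    # plain name, falls through to lookup
):
    print(candidate, looks_like_uuid(candidate))
```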
class Task:
    """Operations on Akiflow tasks, available as `client.task`.

    All mutations go through `PATCH /v5/tasks` (Akiflow uses upsert semantics).
    Deletion is a soft-delete via `trashed_at`.

    Example:
        ```python
        from akiflow import Akiflow

        client = Akiflow(refresh_token="def50200...")

        # Create
        task = client.task.create("Buy groceries")

        # Update
        client.task.update(task["id"], title="Buy organic groceries")

        # Mark done
        client.task.done(task["id"])

        # Delete
        client.task.delete(task["id"])

        # List all tasks
        result = client.task.list()
        for t in result["data"]:
            print(t["title"], t["done"])
        ```
    """

    def __init__(self, client: Akiflow):
        self._client = client

    def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict:
        """Fetch tasks, with optional incremental sync.

        Args:
            sync_token: Cursor from a previous `list()` response. Pass this
                to get only tasks changed since the last call.
            limit: Max tasks per page (default 2500).

        Returns:
            Dict with `data` (list of task dicts), `sync_token` (cursor for
            next call), and `has_next_page`.

        Example:
            ```python
            # Full sync
            result = client.task.list()
            tasks = result["data"]
            cursor = result["sync_token"]

            # Incremental sync (only changes since last call)
            result = client.task.list(sync_token=cursor)
            ```
        """
        params: dict[str, Any] = {"limit": str(limit)}
        if sync_token:
            params["sync_token"] = sync_token
        return self._client._get("/v5/tasks", params=params)

    def create(
        self,
        title: str,
        *,
        description: str | None = None,
        date: str | None = None,
        datetime_: str | None = None,
        datetime_tz: str | None = None,
        duration: int | None = None,
        due_date: str | None = None,
        priority: int | None = None,
        tags_ids: list[str] | None = None,
        label: str | None = None,
        list_id: str | None = None,
        section_id: str | None = None,
        links: list[str] | None = None,
        **extra: Any,
    ) -> TaskResult:
        """Create a new task.

        By default, tasks land in the **inbox** (`status=1`). To schedule a
        task on a specific date/time, pass `date` and `datetime_`.

        Args:
            title: Task title.
            description: HTML description body.
            date: Planned date (`"2026-03-27"`).
            datetime_: Planned datetime in UTC (`"2026-03-27T09:00:00.000Z"`).
            datetime_tz: Timezone for display (default `"Europe/Zurich"`).
            duration: Duration in seconds (e.g. `1800` for 30 min).
            due_date: Hard due date (`"2026-03-28"`).
            priority: Priority level.
            tags_ids: List of tag UUIDs.
            label: Label/project name **or** UUID. Resolved automatically
                via `client.label.resolve_id()`. Takes precedence over `list_id`.
            list_id: Project/list UUID (use `label` for name-based lookup).
            section_id: Section within a project.
            links: List of URL strings.
            **extra: Additional fields passed directly to the API.

        Returns:
            A `TaskResult` dict. Supports direct mutation via
            `task.update(...)`, `task.done()`, and `task.delete()`.

        Example:
            ```python
            # Inbox task
            task = client.task.create("Buy groceries")

            # Task assigned to a label by name
            task = client.task.create("Review PR", label="Work")

            # Scheduled task with duration
            task = client.task.create(
                "Team standup",
                date="2026-03-27",
                datetime_="2026-03-27T09:00:00.000Z",
                duration=1800,
            )
            ```
        """
        if label is not None:
            list_id = self._client.label.resolve_id(label)

        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        task_id = str(uuid.uuid4())
        sorting = int(datetime.now(timezone.utc).timestamp() * 1000)

        task: dict[str, Any] = {
            "id": task_id,
            "title": title,
            "description": description,
            "status": 1,
            "done": False,
            "done_at": None,
            "date": date,
            "datetime": datetime_,
            "datetime_tz": datetime_tz or "Europe/Zurich",
            "original_date": None,
            "original_datetime": None,
            "duration": duration,
            "due_date": due_date,
            "priority": priority,
            "sorting": sorting,
            "sorting_label": sorting,
            "tags_ids": tags_ids,
            "links": links or [],
            "listId": list_id,
            "section_id": section_id,
            "calendar_id": None,
            "time_slot_id": None,
            "recurring_id": None,
            "recurrence": None,
            "recurrence_version": None,
            "plan_unit": None,
            "plan_period": None,
            "origin": None,
            "connector_id": None,
            "origin_id": None,
            "origin_account_id": None,
            "doc": None,
            "content": None,
            "data": {},
            "search_text": "",
            "trashed_at": None,
            "deleted_at": None,
            "global_created_at": now,
            "global_updated_at": now,
            "global_list_id_updated_at": now if list_id else None,
            "global_tags_ids_updated_at": None,
            **extra,
        }

        resp = self._client._patch("/v5/tasks", json=[task])
        data = resp["data"][0] if resp.get("data") else resp
        return TaskResult(data, self)

    def update(self, task_id: str, **fields: Any) -> TaskResult:
        """Update a task by ID.

        Only pass the fields you want to change. The `global_updated_at`
        timestamp is set automatically.

        Args:
            task_id: UUID of the task to update.
            **fields: Any task fields to update. Use `label` for name-based
                project lookup, `list_id` for UUID-based, and `datetime_`
                for the datetime field.

        Returns:
            The updated task dict.

        Example:
            ```python
            # Rename
            client.task.update(task_id, title="New title")

            # Assign to a label by name
            client.task.update(task_id, label="Work")

            # Reschedule
            client.task.update(
                task_id,
                date="2026-04-01",
                datetime_="2026-04-01T14:00:00.000Z",
            )
            ```
        """
        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        payload: dict[str, Any] = {
            "id": task_id,
            "global_updated_at": now,
        }

        # Resolve label name/UUID -> listId
        if "label" in fields:
            label = fields.pop("label")
            if label is not None:
                fields["list_id"] = self._client.label.resolve_id(label)
            else:
                fields["list_id"] = None

        # Map python-friendly names to API names
        if "list_id" in fields:
            payload["listId"] = fields.pop("list_id")
            payload["global_list_id_updated_at"] = now
        if "datetime_" in fields:
            payload["datetime"] = fields.pop("datetime_")

        payload.update(fields)

        resp = self._client._patch("/v5/tasks", json=[payload])
        data = resp["data"][0] if resp.get("data") else resp
        return TaskResult(data, self)

    def delete(self, task_id: str) -> TaskResult:
        """Soft-delete a task.

        Sets `trashed_at` to the current time. The task can still be
        recovered in Akiflow's trash.

        Args:
            task_id: UUID of the task to delete.

        Returns:
            The updated task dict with `trashed_at` set.

        Example:
            ```python
            client.task.delete("59442bbd-a57d-464f-9fa2-2cb9678379ee")
            ```
        """
        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        payload = {
            "id": task_id,
            "status": 10,
            "trashed_at": now,
            "global_updated_at": now,
        }
        resp = self._client._patch("/v5/tasks", json=[payload])
        data = resp["data"][0] if resp.get("data") else resp
        return TaskResult(data, self)

    def done(self, task_id: str, *, date: str | None = None) -> TaskResult:
        """Mark a task as done.

        Args:
            task_id: UUID of the task to complete.
            date: Date in ``YYYY-MM-DD`` format. Defaults to today.
                A date is required for the task to appear in Akiflow's
                done list.

        Returns:
            The updated task dict with `done=True`.

        Example:
            ```python
            client.task.done("59442bbd-a57d-464f-9fa2-2cb9678379ee")
            client.task.done(task_id, date="2026-03-20")
            ```
        """
        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        if date is None:
            date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        return self.update(task_id, done=True, done_at=now, date=date, status=2)
Operations on Akiflow tasks, available as client.task.
All mutations go through PATCH /v5/tasks (Akiflow uses upsert semantics).
Deletion is a soft-delete via trashed_at.
Example:
```python
from akiflow import Akiflow

client = Akiflow(refresh_token="def50200...")

# Create
task = client.task.create("Buy groceries")

# Update
client.task.update(task["id"], title="Buy organic groceries")

# Mark done
client.task.done(task["id"])

# Delete
client.task.delete(task["id"])

# List all tasks
result = client.task.list()
for t in result["data"]:
    print(t["title"], t["done"])
```
````python
    def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict:
        """Fetch tasks, with optional incremental sync.

        Args:
            sync_token: Cursor from a previous `list()` response. Pass this
                to get only tasks changed since the last call.
            limit: Max tasks per page (default 2500).

        Returns:
            Dict with `data` (list of task dicts), `sync_token` (cursor for
            next call), and `has_next_page`.

        Example:
            ```python
            # Full sync
            result = client.task.list()
            tasks = result["data"]
            cursor = result["sync_token"]

            # Incremental sync (only changes since last call)
            result = client.task.list(sync_token=cursor)
            ```
        """
        params: dict[str, Any] = {"limit": str(limit)}
        if sync_token:
            params["sync_token"] = sync_token
        return self._client._get("/v5/tasks", params=params)
````
Fetch tasks, with optional incremental sync.
Arguments:
- sync_token: Cursor from a previous `list()` response. Pass this to get only tasks changed since the last call.
- limit: Max tasks per page (default 2500).
Returns:
Dict with `data` (list of task dicts), `sync_token` (cursor for next call), and `has_next_page`.
Example:
```python
# Full sync
result = client.task.list()
tasks = result["data"]
cursor = result["sync_token"]

# Incremental sync (only changes since last call)
result = client.task.list(sync_token=cursor)
```
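The return shape suggests a simple loop for draining every page. The following is a minimal sketch, not part of the SDK: `fetch_all_tasks` is a hypothetical helper, `list_fn` stands in for `client.task.list`, and it assumes that passing the returned `sync_token` back while `has_next_page` is true yields the next page, which the documentation above does not confirm.

```python
from __future__ import annotations

from typing import Any, Callable


def fetch_all_tasks(list_fn: Callable[..., dict]) -> tuple[list[dict[str, Any]], str | None]:
    """Collect tasks from every page and return them with the final cursor.

    Assumption: while `has_next_page` is true, passing the returned
    `sync_token` back to `list_fn` returns the following page.
    """
    tasks: list[dict[str, Any]] = []
    cursor: str | None = None
    while True:
        page = list_fn(sync_token=cursor)
        tasks.extend(page.get("data", []))
        # Keep the previous cursor if a page omits the token.
        cursor = page.get("sync_token", cursor)
        if not page.get("has_next_page"):
            return tasks, cursor
```

With a real client this would be called as `tasks, cursor = fetch_all_tasks(client.task.list)`; storing `cursor` lets a later run request only the changes since this one.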
````python
    def create(
        self,
        title: str,
        *,
        description: str | None = None,
        date: str | None = None,
        datetime_: str | None = None,
        datetime_tz: str | None = None,
        duration: int | None = None,
        due_date: str | None = None,
        priority: int | None = None,
        tags_ids: list[str] | None = None,
        label: str | None = None,
        list_id: str | None = None,
        section_id: str | None = None,
        links: list[str] | None = None,
        **extra: Any,
    ) -> TaskResult:
        """Create a new task.

        By default, tasks land in the **inbox** (`status=1`). To schedule a
        task on a specific date/time, pass `date` and `datetime_`.

        Args:
            title: Task title.
            description: HTML description body.
            date: Planned date (`"2026-03-27"`).
            datetime_: Planned datetime in UTC (`"2026-03-27T09:00:00.000Z"`).
            datetime_tz: Timezone for display (default `"Europe/Zurich"`).
            duration: Duration in seconds (e.g. `1800` for 30 min).
            due_date: Hard due date (`"2026-03-28"`).
            priority: Priority level.
            tags_ids: List of tag UUIDs.
            label: Label/project name **or** UUID. Resolved automatically
                via `client.label.resolve_id()`. Takes precedence over `list_id`.
            list_id: Project/list UUID (use `label` for name-based lookup).
            section_id: Section within a project.
            links: List of URL strings.
            **extra: Additional fields passed directly to the API.

        Returns:
            A `TaskResult` dict. Supports direct mutation via
            `task.update(...)`, `task.done()`, and `task.delete()`.

        Example:
            ```python
            # Inbox task
            task = client.task.create("Buy groceries")

            # Task assigned to a label by name
            task = client.task.create("Review PR", label="Work")

            # Scheduled task with duration
            task = client.task.create(
                "Team standup",
                date="2026-03-27",
                datetime_="2026-03-27T09:00:00.000Z",
                duration=1800,
            )
            ```
        """
        if label is not None:
            list_id = self._client.label.resolve_id(label)

        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        task_id = str(uuid.uuid4())
        sorting = int(datetime.now(timezone.utc).timestamp() * 1000)

        task: dict[str, Any] = {
            "id": task_id,
            "title": title,
            "description": description,
            "status": 1,
            "done": False,
            "done_at": None,
            "date": date,
            "datetime": datetime_,
            "datetime_tz": datetime_tz or "Europe/Zurich",
            "original_date": None,
            "original_datetime": None,
            "duration": duration,
            "due_date": due_date,
            "priority": priority,
            "sorting": sorting,
            "sorting_label": sorting,
            "tags_ids": tags_ids,
            "links": links or [],
            "listId": list_id,
            "section_id": section_id,
            "calendar_id": None,
            "time_slot_id": None,
            "recurring_id": None,
            "recurrence": None,
            "recurrence_version": None,
            "plan_unit": None,
            "plan_period": None,
            "origin": None,
            "connector_id": None,
            "origin_id": None,
            "origin_account_id": None,
            "doc": None,
            "content": None,
            "data": {},
            "search_text": "",
            "trashed_at": None,
            "deleted_at": None,
            "global_created_at": now,
            "global_updated_at": now,
            "global_list_id_updated_at": now if list_id else None,
            "global_tags_ids_updated_at": None,
            **extra,
        }

        resp = self._client._patch("/v5/tasks", json=[task])
        data = resp["data"][0] if resp.get("data") else resp
        return TaskResult(data, self)
````
Create a new task.
By default, tasks land in the inbox (status=1). To schedule a
task on a specific date/time, pass date and datetime_.
Arguments:
- title: Task title.
- description: HTML description body.
- date: Planned date (`"2026-03-27"`).
- datetime_: Planned datetime in UTC (`"2026-03-27T09:00:00.000Z"`).
- datetime_tz: Timezone for display (default `"Europe/Zurich"`).
- duration: Duration in seconds (e.g. `1800` for 30 min).
- due_date: Hard due date (`"2026-03-28"`).
- priority: Priority level.
- tags_ids: List of tag UUIDs.
- label: Label/project name or UUID. Resolved automatically via `client.label.resolve_id()`. Takes precedence over `list_id`.
- list_id: Project/list UUID (use `label` for name-based lookup).
- section_id: Section within a project.
- links: List of URL strings.
- **extra: Additional fields passed directly to the API.
Returns:
A `TaskResult` dict. Supports direct mutation via `task.update(...)`, `task.done()`, and `task.delete()`.
Example:
```python
# Inbox task
task = client.task.create("Buy groceries")

# Task assigned to a label by name
task = client.task.create("Review PR", label="Work")

# Scheduled task with duration
task = client.task.create(
    "Team standup",
    date="2026-03-27",
    datetime_="2026-03-27T09:00:00.000Z",
    duration=1800,
)
```
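Because `date` and `datetime_` are plain strings, it can help to derive both from one timezone-aware `datetime`. The sketch below is illustrative only: `schedule_fields` is a hypothetical helper, and it assumes `date` should be the calendar day in the caller's own timezone, which the documentation does not state. The `datetime_` string matches the UTC format shown in the examples.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def schedule_fields(start: datetime) -> dict[str, str]:
    """Build `date` and `datetime_` keyword arguments from an aware datetime."""
    if start.tzinfo is None:
        raise ValueError("start must be timezone-aware")
    utc = start.astimezone(timezone.utc)
    return {
        # Assumption: `date` is the calendar day in the caller's timezone.
        "date": start.strftime("%Y-%m-%d"),
        # UTC with millisecond precision and a trailing "Z".
        "datetime_": utc.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
    }


# 10:00 at UTC+1 is 09:00 UTC.
start = datetime(2026, 3, 27, 10, 0, tzinfo=timezone(timedelta(hours=1)))
fields = schedule_fields(start)
```

The result can be unpacked into the call, for example `client.task.create("Team standup", duration=1800, **fields)`.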
````python
    def update(self, task_id: str, **fields: Any) -> TaskResult:
        """Update a task by ID.

        Only pass the fields you want to change. The `global_updated_at`
        timestamp is set automatically.

        Args:
            task_id: UUID of the task to update.
            **fields: Any task fields to update. Use `label` for name-based
                project lookup, `list_id` for UUID-based, and `datetime_`
                for the datetime field.

        Returns:
            The updated task dict.

        Example:
            ```python
            # Rename
            client.task.update(task_id, title="New title")

            # Assign to a label by name
            client.task.update(task_id, label="Work")

            # Reschedule
            client.task.update(
                task_id,
                date="2026-04-01",
                datetime_="2026-04-01T14:00:00.000Z",
            )
            ```
        """
        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        payload: dict[str, Any] = {
            "id": task_id,
            "global_updated_at": now,
        }

        # Resolve label name/UUID -> listId
        if "label" in fields:
            label = fields.pop("label")
            if label is not None:
                fields["list_id"] = self._client.label.resolve_id(label)
            else:
                fields["list_id"] = None

        # Map python-friendly names to API names
        if "list_id" in fields:
            payload["listId"] = fields.pop("list_id")
            payload["global_list_id_updated_at"] = now
        if "datetime_" in fields:
            payload["datetime"] = fields.pop("datetime_")

        payload.update(fields)

        resp = self._client._patch("/v5/tasks", json=[payload])
        data = resp["data"][0] if resp.get("data") else resp
        return TaskResult(data, self)
````
Update a task by ID.
Only pass the fields you want to change. The global_updated_at
timestamp is set automatically.
Arguments:
- task_id: UUID of the task to update.
- **fields: Any task fields to update. Use `label` for name-based project lookup, `list_id` for UUID-based, and `datetime_` for the datetime field.
Returns:
The updated task dict.
Example:
```python
# Rename
client.task.update(task_id, title="New title")

# Assign to a label by name
client.task.update(task_id, label="Work")

# Reschedule
client.task.update(
    task_id,
    date="2026-04-01",
    datetime_="2026-04-01T14:00:00.000Z",
)
```
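To make the name mapping concrete, here is a standalone sketch of how the Python-friendly keywords (`label`, `list_id`, `datetime_`) could be translated into the payload keys `listId` and `datetime`. It follows the mapping described for `update()`, but `build_update_payload` and its `resolve_label` and `now` parameters are hypothetical names introduced for illustration, not SDK functions.

```python
from __future__ import annotations

from typing import Any, Callable


def build_update_payload(
    task_id: str,
    fields: dict[str, Any],
    resolve_label: Callable[[str], str],
    now: str,
) -> dict[str, Any]:
    """Translate keyword-style fields into an API-style payload dict."""
    fields = dict(fields)  # copy so the caller's dict is left untouched
    payload: dict[str, Any] = {"id": task_id, "global_updated_at": now}

    # A label name (or None, to clear it) becomes a list_id first.
    if "label" in fields:
        label = fields.pop("label")
        fields["list_id"] = resolve_label(label) if label is not None else None

    # Rename Python-friendly keys to the API's keys.
    if "list_id" in fields:
        payload["listId"] = fields.pop("list_id")
        payload["global_list_id_updated_at"] = now
    if "datetime_" in fields:
        payload["datetime"] = fields.pop("datetime_")

    payload.update(fields)
    return payload


labels = {"Work": "uuid-work"}  # hypothetical label name -> UUID table
payload = build_update_payload(
    "task-1",
    {"label": "Work", "datetime_": "2026-04-01T14:00:00.000Z", "title": "New title"},
    labels.__getitem__,
    "2026-04-01T08:00:00.000Z",
)
```

Note that passing `label=None` still emits `listId: None`, which is how a task would be detached from its project under this mapping.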
````python
    def delete(self, task_id: str) -> TaskResult:
        """Soft-delete a task.

        Sets `trashed_at` to the current time. The task can still be
        recovered in Akiflow's trash.

        Args:
            task_id: UUID of the task to delete.

        Returns:
            The updated task dict with `trashed_at` set.

        Example:
            ```python
            client.task.delete("59442bbd-a57d-464f-9fa2-2cb9678379ee")
            ```
        """
        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        payload = {
            "id": task_id,
            "status": 10,
            "trashed_at": now,
            "global_updated_at": now,
        }
        resp = self._client._patch("/v5/tasks", json=[payload])
        data = resp["data"][0] if resp.get("data") else resp
        return TaskResult(data, self)
````
Soft-delete a task.
Sets trashed_at to the current time. The task can still be
recovered in Akiflow's trash.
Arguments:
- task_id: UUID of the task to delete.
Returns:
The updated task dict with `trashed_at` set.
Example:
```python
client.task.delete("59442bbd-a57d-464f-9fa2-2cb9678379ee")
```
````python
    def done(self, task_id: str, *, date: str | None = None) -> TaskResult:
        """Mark a task as done.

        Args:
            task_id: UUID of the task to complete.
            date: Date in ``YYYY-MM-DD`` format. Defaults to today.
                A date is required for the task to appear in Akiflow's
                done list.

        Returns:
            The updated task dict with `done=True`.

        Example:
            ```python
            client.task.done("59442bbd-a57d-464f-9fa2-2cb9678379ee")
            client.task.done(task_id, date="2026-03-20")
            ```
        """
        now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
        if date is None:
            date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        return self.update(task_id, done=True, done_at=now, date=date, status=2)
````
Mark a task as done.
Arguments:
- task_id: UUID of the task to complete.
- date: Date in `YYYY-MM-DD` format. Defaults to today's date in UTC. A date is required for the task to appear in Akiflow's done list.
Returns:
The updated task dict with `done=True`.
Example:
```python
client.task.done("59442bbd-a57d-464f-9fa2-2cb9678379ee")
client.task.done(task_id, date="2026-03-20")
```
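The default for `date` is computed from the current UTC time, so near midnight it may differ from the caller's local calendar day. A minimal sketch for passing a local date explicitly follows; `local_date` is a hypothetical helper, and the fixed UTC offset is a simplification that ignores daylight-saving changes.

```python
from datetime import datetime, timedelta, timezone


def local_date(now_utc: datetime, utc_offset_hours: float) -> str:
    """Return the YYYY-MM-DD calendar day at a fixed offset from UTC."""
    local = now_utc.astimezone(timezone(timedelta(hours=utc_offset_hours)))
    return local.strftime("%Y-%m-%d")


# 23:30 UTC is already the next day at UTC+2, but still the same day at UTC-8.
now_utc = datetime(2026, 3, 20, 23, 30, tzinfo=timezone.utc)
ahead = local_date(now_utc, 2)
behind = local_date(now_utc, -8)
```

In a script this might be used as `client.task.done(task_id, date=local_date(datetime.now(timezone.utc), 2))`.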
Base exception for all Akiflow SDK errors.
```python
class APIError(AkiflowError):
    """Raised on non-2xx responses from the Akiflow API.

    Attributes:
        status_code: HTTP status code.
        message: Error message from the response body.
    """

    def __init__(self, status_code: int, message: str):
        self.status_code = status_code
        self.message = message
        super().__init__(f"HTTP {status_code}: {message}")
```
Raised on non-2xx responses from the Akiflow API.
Attributes:
- status_code: HTTP status code.
- message: Error message from the response body.
```python
class AuthError(AkiflowError):
    """Raised when authentication fails (bad OTP, expired link, etc.)."""
```
Raised when authentication fails (bad OTP, expired link, etc.).
```python
class TokenExpiredError(APIError):
    """Raised when the access token is expired and cannot be refreshed.

    This typically means the refresh token has also expired or was revoked.
    Re-authenticate with `Akiflow(email=...)` to get new tokens.
    """
```
Raised when the access token is expired and cannot be refreshed.
This typically means the refresh token has also expired or was revoked.
Re-authenticate with Akiflow(email=...) to get new tokens.
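Since `TokenExpiredError` subclasses `APIError`, which in turn subclasses `AkiflowError`, the order of `except` clauses matters: the most specific class has to come first. The sketch below uses local stand-in classes that mirror the hierarchy so that it runs on its own; in real code they would be imported from `akiflow`. `call_with_handling` and the returned strings are hypothetical.

```python
# Stand-ins mirroring the documented hierarchy; import the real ones from akiflow.
class AkiflowError(Exception):
    pass


class AuthError(AkiflowError):
    pass


class APIError(AkiflowError):
    def __init__(self, status_code: int, message: str):
        self.status_code = status_code
        self.message = message
        super().__init__(f"HTTP {status_code}: {message}")


class TokenExpiredError(APIError):
    pass


def call_with_handling(action):
    """Run `action` and map SDK errors to a short description."""
    try:
        return action()
    except TokenExpiredError:
        # Must precede APIError, otherwise the broader clause catches it.
        return "re-authenticate"
    except APIError as exc:
        return f"api error {exc.status_code}"
    except AkiflowError:
        # Covers AuthError and any other SDK error.
        return "sdk error"


def raiser(exc):
    """Return a zero-argument callable that raises `exc` (demo helper)."""
    def _raise():
        raise exc
    return _raise
```

A script would wrap a call such as `lambda: client.task.list()` in `call_with_handling` and prompt for a new login when it reports that re-authentication is needed.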