akiflow
Akiflow SDK
Unofficial Python SDK for the Akiflow task management API.
Quick Start
from akiflow import Akiflow
# Interactive login (prompts for 2FA code via email)
client = Akiflow(email="you@example.com")
# Or reuse saved tokens (no interactive prompt)
client = Akiflow(access_token="eyJ...", refresh_token="def50200...")
# Create a task in your inbox
task = client.task.create("Buy groceries")
# Schedule a task
task = client.task.create(
"Team standup",
date="2026-03-27",
datetime_="2026-03-27T09:00:00.000Z",
duration=1800,
)
# Update, complete, or delete
client.task.update(task["id"], title="Team standup (moved)")
client.task.done(task["id"])
client.task.delete(task["id"])
Authentication
Akiflow uses passwordless email + OTP authentication. On first use,
pass your email to trigger the interactive flow. The client prints
access_token and refresh_token after success — save them for reuse.
The access token expires in 30 minutes, but the refresh token is
long-lived (~13 months). The client auto-refreshes on 401, so passing just
a refresh_token is enough for persistent scripts.
Debugging
Pass debug=True to print every request and response:
client = Akiflow(email="you@example.com", debug=True)
Pass verify_ssl=False to disable SSL verification (useful with Proxyman/Charles):
client = Akiflow(access_token="...", verify_ssl=False)
1""" 2# Akiflow SDK 3 4Unofficial Python SDK for the [Akiflow](https://akiflow.com) task management API. 5 6## Quick Start 7 8```python 9from akiflow import Akiflow 10 11# Interactive login (prompts for 2FA code via email) 12client = Akiflow(email="you@example.com") 13 14# Or reuse saved tokens (no interactive prompt) 15client = Akiflow(access_token="eyJ...", refresh_token="def50200...") 16 17# Create a task in your inbox 18task = client.task.create("Buy groceries") 19 20# Schedule a task 21task = client.task.create( 22 "Team standup", 23 date="2026-03-27", 24 datetime_="2026-03-27T09:00:00.000Z", 25 duration=1800, 26) 27 28# Update, complete, or delete 29client.task.update(task["id"], title="Team standup (moved)") 30client.task.done(task["id"]) 31client.task.delete(task["id"]) 32``` 33 34## Authentication 35 36Akiflow uses **passwordless email + OTP** authentication. On first use, 37pass your `email` to trigger the interactive flow. The client prints 38`access_token` and `refresh_token` after success — save them for reuse. 39 40The **access token** expires in 30 minutes, but the **refresh token** is 41long-lived (~13 months). The client auto-refreshes on 401, so passing just 42a `refresh_token` is enough for persistent scripts. 43 44## Debugging 45 46Pass `debug=True` to print every request and response: 47 48```python 49client = Akiflow(email="you@example.com", debug=True) 50``` 51 52Pass `verify_ssl=False` to disable SSL verification (useful with Proxyman/Charles): 53 54```python 55client = Akiflow(access_token="...", verify_ssl=False) 56``` 57""" 58 59from .client import Akiflow 60from .exceptions import AkiflowError, APIError, AuthError, TokenExpiredError 61from .label import Label 62from .task import Task 63 64__all__ = ["Akiflow", "Label", "Task", "AkiflowError", "APIError", "AuthError", "TokenExpiredError"]
20class Akiflow: 21 """Unofficial Akiflow API client. 22 23 Provides access to Akiflow resources through sub-clients: 24 25 - `client.task` — create, update, delete, and list tasks 26 - `client.label` — create, update, delete, and list labels/projects 27 28 There are three ways to authenticate: 29 30 **Interactive login** (prompts for 2FA code): 31 ```python 32 client = Akiflow(email="you@example.com") 33 ``` 34 35 **Access token + refresh token** (no prompt, auto-refreshes): 36 ```python 37 client = Akiflow(access_token="eyJ...", refresh_token="def50200...") 38 ``` 39 40 **Refresh token only** (exchanges for access token on init): 41 ```python 42 client = Akiflow(refresh_token="def50200...") 43 ``` 44 45 Args: 46 email: Akiflow account email. Triggers interactive OTP flow. 47 access_token: JWT access token (expires in 30 min). 48 refresh_token: Long-lived refresh token (~13 months). Enables auto-refresh. 49 client_id: Client UUID sent with requests. Auto-generated if omitted. 50 auto_refresh: Automatically refresh expired tokens on 401. Default `True`. 51 debug: Print request/response details to stdout. Default `False`. 52 verify_ssl: Verify SSL certificates. Set `False` for proxy tools. Default `True`. 53 """ 54 55 def __init__( 56 self, 57 *, 58 email: str | None = None, 59 access_token: str | None = None, 60 refresh_token: str | None = None, 61 client_id: str | None = None, 62 auto_refresh: bool = True, 63 debug: bool = False, 64 verify_ssl: bool = True, 65 ): 66 self._client_id = client_id or str(uuid.uuid4()) 67 self._auto_refresh = auto_refresh 68 self._debug = debug 69 self._verify_ssl = verify_ssl 70 self._access_token: str | None = access_token 71 self._refresh_token: str | None = refresh_token 72 73 if email and not access_token: 74 tokens = interactive_login(email, client_id=self._client_id, verify_ssl=self._verify_ssl) 75 self._access_token = tokens["access_token"] 76 self._refresh_token = tokens["refresh_token"] 77 self._client_id = tokens["client_id"] 78 print(f"\nAuthenticated successfully. Save these for next time:") 79 print(f" access_token: {self._access_token[:50]}...") 80 print(f" refresh_token: {self._refresh_token[:50]}...") 81 82 if not self._access_token: 83 if self._refresh_token: 84 self._do_refresh() 85 else: 86 raise ValueError("Provide email (for interactive login) or access_token/refresh_token") 87 88 self._http = httpx.Client( 89 base_url=API_BASE, 90 headers=self._auth_headers(), 91 timeout=30.0, 92 verify=self._verify_ssl, 93 ) 94 95 self.label = Label(self) 96 """Label/project operations. See `akiflow.label.Label`.""" 97 98 self.task = Task(self) 99 """Task operations. See `akiflow.task.Task`.""" 100 101 def _auth_headers(self) -> dict[str, str]: 102 return { 103 "Authorization": f"Bearer {self._access_token}", 104 "Akiflow-Platform": "web", 105 "Akiflow-Version": "2.69.3", 106 "Akiflow-Client-Id": self._client_id, 107 "Accept": "application/json", 108 "Content-Type": "application/json", 109 } 110 111 def _do_refresh(self) -> None: 112 if not self._refresh_token: 113 raise TokenExpiredError(401, "No refresh token available") 114 tokens = refresh_access_token(self._refresh_token, client_id=self._client_id, verify_ssl=self._verify_ssl) 115 self._access_token = tokens["access_token"] 116 self._refresh_token = tokens["refresh_token"] 117 self._client_id = tokens["client_id"] 118 if hasattr(self, "_http"): 119 self._http.headers.update(self._auth_headers()) 120 121 def _request(self, method: str, path: str, **kwargs: Any) -> dict: 122 if self._debug: 123 body = kwargs.get("json") 124 print(f"\n>>> {method} {API_BASE}{path}") 125 if kwargs.get("params"): 126 print(f" params: {kwargs['params']}") 127 if body is not None: 128 print(f" body: {json.dumps(body, indent=2)}") 129 130 resp = self._http.request(method, path, **kwargs) 131 132 if self._debug: 133 print(f"<<< {resp.status_code}") 134 try: 135 print(f" body: {json.dumps(resp.json(), indent=2)}") 136 except Exception: 137 print(f" body: {resp.text[:500]}") 138 139 # Auto-refresh on 401 140 if resp.status_code == 401 and self._auto_refresh and self._refresh_token: 141 if self._debug: 142 print(" (refreshing token...)") 143 self._do_refresh() 144 resp = self._http.request(method, path, **kwargs) 145 if self._debug: 146 print(f"<<< {resp.status_code} (after refresh)") 147 try: 148 print(f" body: {json.dumps(resp.json(), indent=2)}") 149 except Exception: 150 print(f" body: {resp.text[:500]}") 151 152 if resp.status_code == 401: 153 raise TokenExpiredError(401, resp.json().get("message", "Unauthorized")) 154 if not resp.is_success: 155 raise APIError(resp.status_code, resp.text) 156 157 return resp.json() 158 159 def _get(self, path: str, **kwargs: Any) -> dict: 160 return self._request("GET", path, **kwargs) 161 162 def _patch(self, path: str, **kwargs: Any) -> dict: 163 return self._request("PATCH", path, **kwargs) 164 165 @property 166 def access_token(self) -> str | None: 167 """Current JWT access token (may change after auto-refresh).""" 168 return self._access_token 169 170 @property 171 def refresh_token(self) -> str | None: 172 """Current refresh token (rotates on each refresh).""" 173 return self._refresh_token 174 175 def close(self) -> None: 176 """Close the underlying HTTP connection.""" 177 self._http.close() 178 179 def __enter__(self): 180 return self 181 182 def __exit__(self, *args): 183 self.close()
Unofficial Akiflow API client.
Provides access to Akiflow resources through sub-clients:
client.task— create, update, delete, and list tasksclient.label— create, update, delete, and list labels/projects
There are three ways to authenticate:
Interactive login (prompts for 2FA code):
client = Akiflow(email="you@example.com")
Access token + refresh token (no prompt, auto-refreshes):
client = Akiflow(access_token="eyJ...", refresh_token="def50200...")
Refresh token only (exchanges for access token on init):
client = Akiflow(refresh_token="def50200...")
Args:
email: Akiflow account email. Triggers interactive OTP flow.
access_token: JWT access token (expires in 30 min).
refresh_token: Long-lived refresh token (~13 months). Enables auto-refresh.
client_id: Client UUID sent with requests. Auto-generated if omitted.
auto_refresh: Automatically refresh expired tokens on 401. Default True.
debug: Print request/response details to stdout. Default False.
verify_ssl: Verify SSL certificates. Set False for proxy tools. Default True.
55 def __init__( 56 self, 57 *, 58 email: str | None = None, 59 access_token: str | None = None, 60 refresh_token: str | None = None, 61 client_id: str | None = None, 62 auto_refresh: bool = True, 63 debug: bool = False, 64 verify_ssl: bool = True, 65 ): 66 self._client_id = client_id or str(uuid.uuid4()) 67 self._auto_refresh = auto_refresh 68 self._debug = debug 69 self._verify_ssl = verify_ssl 70 self._access_token: str | None = access_token 71 self._refresh_token: str | None = refresh_token 72 73 if email and not access_token: 74 tokens = interactive_login(email, client_id=self._client_id, verify_ssl=self._verify_ssl) 75 self._access_token = tokens["access_token"] 76 self._refresh_token = tokens["refresh_token"] 77 self._client_id = tokens["client_id"] 78 print(f"\nAuthenticated successfully. Save these for next time:") 79 print(f" access_token: {self._access_token[:50]}...") 80 print(f" refresh_token: {self._refresh_token[:50]}...") 81 82 if not self._access_token: 83 if self._refresh_token: 84 self._do_refresh() 85 else: 86 raise ValueError("Provide email (for interactive login) or access_token/refresh_token") 87 88 self._http = httpx.Client( 89 base_url=API_BASE, 90 headers=self._auth_headers(), 91 timeout=30.0, 92 verify=self._verify_ssl, 93 ) 94 95 self.label = Label(self) 96 """Label/project operations. See `akiflow.label.Label`.""" 97 98 self.task = Task(self) 99 """Task operations. See `akiflow.task.Task`."""
165 @property 166 def access_token(self) -> str | None: 167 """Current JWT access token (may change after auto-refresh).""" 168 return self._access_token
Current JWT access token (may change after auto-refresh).
14class Label: 15 """Operations on Akiflow labels/projects, available as `client.label`. 16 17 Labels are what Akiflow calls "projects" in the UI. Tasks reference 18 labels via their `listId` field. Labels can be organized into folders 19 using `parent_id` and `type="folder"`. 20 21 Example: 22 ```python 23 from akiflow import Akiflow 24 25 client = Akiflow(refresh_token="def50200...") 26 27 # Create a label 28 label = client.label.create("My Project", color="palette-green") 29 30 # List all labels 31 result = client.label.list() 32 for lb in result["data"]: 33 print(lb["title"], lb["id"]) 34 35 # Find label ID by name 36 label_id = client.label.get_id("My Project") 37 38 # Update 39 client.label.update(label["id"], title="Renamed Project") 40 41 # Delete 42 client.label.delete(label["id"]) 43 ``` 44 """ 45 46 # Cache of name -> id, populated on first lookup 47 _name_cache: dict[str, str] | None = None 48 49 def __init__(self, client: Akiflow): 50 self._client = client 51 self._name_cache = None 52 53 def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict: 54 """Fetch labels, with optional incremental sync. 55 56 Args: 57 sync_token: Cursor from a previous `list()` response. 58 limit: Max labels per page (default 2500). 59 60 Returns: 61 Dict with `data` (list of label dicts), `sync_token`, and 62 `has_next_page`. 63 """ 64 params: dict[str, Any] = {"limit": str(limit)} 65 if sync_token: 66 params["sync_token"] = sync_token 67 return self._client._get("/v5/labels", params=params) 68 69 def create( 70 self, 71 title: str, 72 *, 73 color: str | None = None, 74 icon: str | None = None, 75 parent_id: str | None = None, 76 type: str | None = None, 77 **extra: Any, 78 ) -> dict: 79 """Create a new label. 80 81 Args: 82 title: Label name. 83 color: Color palette name (e.g. `"palette-green"`, `"palette-pink"`). 84 icon: Emoji icon. 85 parent_id: UUID of a folder label to nest under. 86 type: Set to `"folder"` to create a folder, or `None` for a label. 87 **extra: Additional fields passed directly to the API. 88 89 Returns: 90 The created label dict. 91 92 Example: 93 ```python 94 label = client.label.create("Work", color="palette-cobalt") 95 folder = client.label.create("Area", type="folder") 96 nested = client.label.create("Sub-project", parent_id=folder["id"]) 97 ``` 98 """ 99 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 100 label_id = str(uuid.uuid4()) 101 sorting = int(datetime.now(timezone.utc).timestamp() * 1000) 102 103 label: dict[str, Any] = { 104 "id": label_id, 105 "title": title, 106 "color": color, 107 "icon": icon, 108 "sorting": sorting, 109 "parent_id": parent_id, 110 "type": type, 111 "data": {}, 112 "global_created_at": now, 113 "global_updated_at": now, 114 "deleted_at": None, 115 **extra, 116 } 117 118 resp = self._client._patch("/v5/labels", json=[label]) 119 created = resp["data"][0] if resp.get("data") else resp 120 # Invalidate name cache 121 self._name_cache = None 122 return created 123 124 def update(self, label_id: str, **fields: Any) -> dict: 125 """Update a label by ID. 126 127 Args: 128 label_id: UUID of the label to update. 129 **fields: Any label fields to update (e.g. `title`, `color`, `icon`). 130 131 Returns: 132 The updated label dict. 133 134 Example: 135 ```python 136 client.label.update(label_id, title="New Name", color="palette-red") 137 ``` 138 """ 139 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 140 payload: dict[str, Any] = { 141 "id": label_id, 142 "global_updated_at": now, 143 } 144 payload.update(fields) 145 146 resp = self._client._patch("/v5/labels", json=[payload]) 147 self._name_cache = None 148 if resp.get("data"): 149 return resp["data"][0] 150 return resp 151 152 def delete(self, label_id: str) -> dict: 153 """Soft-delete a label. 154 155 Args: 156 label_id: UUID of the label to delete. 157 158 Returns: 159 The updated label dict with `deleted_at` set. 160 161 Example: 162 ```python 163 client.label.delete("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9") 164 ``` 165 """ 166 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 167 payload: dict[str, Any] = { 168 "id": label_id, 169 "title": None, 170 "color": None, 171 "sorting": None, 172 "icon": None, 173 "parent_id": None, 174 "type": None, 175 "data": {}, 176 "deleted_at": now, 177 "global_updated_at": now, 178 } 179 resp = self._client._patch("/v5/labels", json=[payload]) 180 self._name_cache = None 181 if resp.get("data"): 182 return resp["data"][0] 183 return resp 184 185 def _build_name_cache(self) -> dict[str, str]: 186 """Fetch all labels and build a lowercase name -> id mapping.""" 187 result = self.list() 188 cache: dict[str, str] = {} 189 for lb in result.get("data", []): 190 if lb.get("title") and not lb.get("deleted_at"): 191 cache[lb["title"].lower()] = lb["id"] 192 return cache 193 194 def get_id(self, name: str) -> str | None: 195 """Resolve a label name to its UUID. 196 197 Case-insensitive. Returns `None` if no label matches. 198 199 Args: 200 name: Label name to look up. 201 202 Returns: 203 Label UUID, or `None` if not found. 204 205 Example: 206 ```python 207 label_id = client.label.get_id("Work") 208 ``` 209 """ 210 if self._name_cache is None: 211 self._name_cache = self._build_name_cache() 212 return self._name_cache.get(name.lower()) 213 214 def resolve_id(self, label: str) -> str: 215 """Resolve a label name or UUID to a UUID. 216 217 If `label` looks like a UUID, returns it as-is. Otherwise, looks 218 it up by name (case-insensitive). 219 220 Args: 221 label: Label UUID or name. 222 223 Returns: 224 Label UUID. 225 226 Raises: 227 ValueError: If the name doesn't match any label. 228 229 Example: 230 ```python 231 # Both return the same UUID: 232 client.label.resolve_id("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9") 233 client.label.resolve_id("Work") 234 ``` 235 """ 236 try: 237 uuid.UUID(label) 238 return label 239 except ValueError: 240 pass 241 242 label_id = self.get_id(label) 243 if label_id is None: 244 raise ValueError(f"No label found with name: {label!r}") 245 return label_id
Operations on Akiflow labels/projects, available as client.label.
Labels are what Akiflow calls "projects" in the UI. Tasks reference
labels via their listId field. Labels can be organized into folders
using parent_id and type="folder".
Example:
from akiflow import Akiflow
client = Akiflow(refresh_token="def50200...")
# Create a label
label = client.label.create("My Project", color="palette-green")
# List all labels
result = client.label.list()
for lb in result["data"]:
print(lb["title"], lb["id"])
# Find label ID by name
label_id = client.label.get_id("My Project")
# Update
client.label.update(label["id"], title="Renamed Project")
# Delete
client.label.delete(label["id"])
53 def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict: 54 """Fetch labels, with optional incremental sync. 55 56 Args: 57 sync_token: Cursor from a previous `list()` response. 58 limit: Max labels per page (default 2500). 59 60 Returns: 61 Dict with `data` (list of label dicts), `sync_token`, and 62 `has_next_page`. 63 """ 64 params: dict[str, Any] = {"limit": str(limit)} 65 if sync_token: 66 params["sync_token"] = sync_token 67 return self._client._get("/v5/labels", params=params)
Fetch labels, with optional incremental sync.
Args:
sync_token: Cursor from a previous list() response.
limit: Max labels per page (default 2500).
Returns:
Dict with data (list of label dicts), sync_token, and
has_next_page.
69 def create( 70 self, 71 title: str, 72 *, 73 color: str | None = None, 74 icon: str | None = None, 75 parent_id: str | None = None, 76 type: str | None = None, 77 **extra: Any, 78 ) -> dict: 79 """Create a new label. 80 81 Args: 82 title: Label name. 83 color: Color palette name (e.g. `"palette-green"`, `"palette-pink"`). 84 icon: Emoji icon. 85 parent_id: UUID of a folder label to nest under. 86 type: Set to `"folder"` to create a folder, or `None` for a label. 87 **extra: Additional fields passed directly to the API. 88 89 Returns: 90 The created label dict. 91 92 Example: 93 ```python 94 label = client.label.create("Work", color="palette-cobalt") 95 folder = client.label.create("Area", type="folder") 96 nested = client.label.create("Sub-project", parent_id=folder["id"]) 97 ``` 98 """ 99 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 100 label_id = str(uuid.uuid4()) 101 sorting = int(datetime.now(timezone.utc).timestamp() * 1000) 102 103 label: dict[str, Any] = { 104 "id": label_id, 105 "title": title, 106 "color": color, 107 "icon": icon, 108 "sorting": sorting, 109 "parent_id": parent_id, 110 "type": type, 111 "data": {}, 112 "global_created_at": now, 113 "global_updated_at": now, 114 "deleted_at": None, 115 **extra, 116 } 117 118 resp = self._client._patch("/v5/labels", json=[label]) 119 created = resp["data"][0] if resp.get("data") else resp 120 # Invalidate name cache 121 self._name_cache = None 122 return created
Create a new label.
Args:
title: Label name.
color: Color palette name (e.g. "palette-green", "palette-pink").
icon: Emoji icon.
parent_id: UUID of a folder label to nest under.
type: Set to "folder" to create a folder, or None for a label.
**extra: Additional fields passed directly to the API.
Returns: The created label dict.
Example:
label = client.label.create("Work", color="palette-cobalt")
folder = client.label.create("Area", type="folder")
nested = client.label.create("Sub-project", parent_id=folder["id"])
124 def update(self, label_id: str, **fields: Any) -> dict: 125 """Update a label by ID. 126 127 Args: 128 label_id: UUID of the label to update. 129 **fields: Any label fields to update (e.g. `title`, `color`, `icon`). 130 131 Returns: 132 The updated label dict. 133 134 Example: 135 ```python 136 client.label.update(label_id, title="New Name", color="palette-red") 137 ``` 138 """ 139 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 140 payload: dict[str, Any] = { 141 "id": label_id, 142 "global_updated_at": now, 143 } 144 payload.update(fields) 145 146 resp = self._client._patch("/v5/labels", json=[payload]) 147 self._name_cache = None 148 if resp.get("data"): 149 return resp["data"][0] 150 return resp
Update a label by ID.
Args:
label_id: UUID of the label to update.
**fields: Any label fields to update (e.g. title, color, icon).
Returns: The updated label dict.
Example:
client.label.update(label_id, title="New Name", color="palette-red")
152 def delete(self, label_id: str) -> dict: 153 """Soft-delete a label. 154 155 Args: 156 label_id: UUID of the label to delete. 157 158 Returns: 159 The updated label dict with `deleted_at` set. 160 161 Example: 162 ```python 163 client.label.delete("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9") 164 ``` 165 """ 166 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 167 payload: dict[str, Any] = { 168 "id": label_id, 169 "title": None, 170 "color": None, 171 "sorting": None, 172 "icon": None, 173 "parent_id": None, 174 "type": None, 175 "data": {}, 176 "deleted_at": now, 177 "global_updated_at": now, 178 } 179 resp = self._client._patch("/v5/labels", json=[payload]) 180 self._name_cache = None 181 if resp.get("data"): 182 return resp["data"][0] 183 return resp
Soft-delete a label.
Args: label_id: UUID of the label to delete.
Returns:
The updated label dict with deleted_at set.
Example:
client.label.delete("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
194 def get_id(self, name: str) -> str | None: 195 """Resolve a label name to its UUID. 196 197 Case-insensitive. Returns `None` if no label matches. 198 199 Args: 200 name: Label name to look up. 201 202 Returns: 203 Label UUID, or `None` if not found. 204 205 Example: 206 ```python 207 label_id = client.label.get_id("Work") 208 ``` 209 """ 210 if self._name_cache is None: 211 self._name_cache = self._build_name_cache() 212 return self._name_cache.get(name.lower())
Resolve a label name to its UUID.
Case-insensitive. Returns None if no label matches.
Args: name: Label name to look up.
Returns:
Label UUID, or None if not found.
Example:
label_id = client.label.get_id("Work")
214 def resolve_id(self, label: str) -> str: 215 """Resolve a label name or UUID to a UUID. 216 217 If `label` looks like a UUID, returns it as-is. Otherwise, looks 218 it up by name (case-insensitive). 219 220 Args: 221 label: Label UUID or name. 222 223 Returns: 224 Label UUID. 225 226 Raises: 227 ValueError: If the name doesn't match any label. 228 229 Example: 230 ```python 231 # Both return the same UUID: 232 client.label.resolve_id("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9") 233 client.label.resolve_id("Work") 234 ``` 235 """ 236 try: 237 uuid.UUID(label) 238 return label 239 except ValueError: 240 pass 241 242 label_id = self.get_id(label) 243 if label_id is None: 244 raise ValueError(f"No label found with name: {label!r}") 245 return label_id
Resolve a label name or UUID to a UUID.
If label looks like a UUID, returns it as-is. Otherwise, looks
it up by name (case-insensitive).
Args: label: Label UUID or name.
Returns: Label UUID.
Raises: ValueError: If the name doesn't match any label.
Example:
# Both return the same UUID:
client.label.resolve_id("d7f7c026-bd8a-4c3a-8c16-d9677ee959e9")
client.label.resolve_id("Work")
14class Task: 15 """Operations on Akiflow tasks, available as `client.task`. 16 17 All mutations go through `PATCH /v5/tasks` (Akiflow uses upsert semantics). 18 Deletion is a soft-delete via `trashed_at`. 19 20 Example: 21 ```python 22 from akiflow import Akiflow 23 24 client = Akiflow(refresh_token="def50200...") 25 26 # Create 27 task = client.task.create("Buy groceries") 28 29 # Update 30 client.task.update(task["id"], title="Buy organic groceries") 31 32 # Mark done 33 client.task.done(task["id"]) 34 35 # Delete 36 client.task.delete(task["id"]) 37 38 # List all tasks 39 result = client.task.list() 40 for t in result["data"]: 41 print(t["title"], t["done"]) 42 ``` 43 """ 44 45 def __init__(self, client: Akiflow): 46 self._client = client 47 48 def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict: 49 """Fetch tasks, with optional incremental sync. 50 51 Args: 52 sync_token: Cursor from a previous `list()` response. Pass this 53 to get only tasks changed since the last call. 54 limit: Max tasks per page (default 2500). 55 56 Returns: 57 Dict with `data` (list of task dicts), `sync_token` (cursor for 58 next call), and `has_next_page`. 59 60 Example: 61 ```python 62 # Full sync 63 result = client.task.list() 64 tasks = result["data"] 65 cursor = result["sync_token"] 66 67 # Incremental sync (only changes since last call) 68 result = client.task.list(sync_token=cursor) 69 ``` 70 """ 71 params: dict[str, Any] = {"limit": str(limit)} 72 if sync_token: 73 params["sync_token"] = sync_token 74 return self._client._get("/v5/tasks", params=params) 75 76 def create( 77 self, 78 title: str, 79 *, 80 description: str | None = None, 81 date: str | None = None, 82 datetime_: str | None = None, 83 datetime_tz: str | None = None, 84 duration: int | None = None, 85 due_date: str | None = None, 86 priority: int | None = None, 87 tags_ids: list[str] | None = None, 88 label: str | None = None, 89 list_id: str | None = None, 90 section_id: str | None = None, 91 links: list[str] | None = None, 92 **extra: Any, 93 ) -> dict: 94 """Create a new task. 95 96 By default, tasks land in the **inbox** (`status=1`). To schedule a 97 task on a specific date/time, pass `date` and `datetime_`. 98 99 Args: 100 title: Task title. 101 description: HTML description body. 102 date: Planned date (`"2026-03-27"`). 103 datetime_: Planned datetime in UTC (`"2026-03-27T09:00:00.000Z"`). 104 datetime_tz: Timezone for display (default `"Europe/Zurich"`). 105 duration: Duration in seconds (e.g. `1800` for 30 min). 106 due_date: Hard due date (`"2026-03-28"`). 107 priority: Priority level. 108 tags_ids: List of tag UUIDs. 109 label: Label/project name **or** UUID. Resolved automatically 110 via `client.label.resolve_id()`. Takes precedence over `list_id`. 111 list_id: Project/list UUID (use `label` for name-based lookup). 112 section_id: Section within a project. 113 links: List of URL strings. 114 **extra: Additional fields passed directly to the API. 115 116 Returns: 117 The created task dict as returned by the API. 118 119 Example: 120 ```python 121 # Inbox task 122 task = client.task.create("Buy groceries") 123 124 # Task assigned to a label by name 125 task = client.task.create("Review PR", label="Work") 126 127 # Scheduled task with duration 128 task = client.task.create( 129 "Team standup", 130 date="2026-03-27", 131 datetime_="2026-03-27T09:00:00.000Z", 132 duration=1800, 133 ) 134 ``` 135 """ 136 if label is not None: 137 list_id = self._client.label.resolve_id(label) 138 139 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 140 task_id = str(uuid.uuid4()) 141 sorting = int(datetime.now(timezone.utc).timestamp() * 1000) 142 143 task: dict[str, Any] = { 144 "id": task_id, 145 "title": title, 146 "description": description, 147 "status": 1, 148 "done": False, 149 "done_at": None, 150 "date": date, 151 "datetime": datetime_, 152 "datetime_tz": datetime_tz or "Europe/Zurich", 153 "original_date": None, 154 "original_datetime": None, 155 "duration": duration, 156 "due_date": due_date, 157 "priority": priority, 158 "sorting": sorting, 159 "sorting_label": sorting, 160 "tags_ids": tags_ids, 161 "links": links or [], 162 "listId": list_id, 163 "section_id": section_id, 164 "calendar_id": None, 165 "time_slot_id": None, 166 "recurring_id": None, 167 "recurrence": None, 168 "recurrence_version": None, 169 "plan_unit": None, 170 "plan_period": None, 171 "origin": None, 172 "connector_id": None, 173 "origin_id": None, 174 "origin_account_id": None, 175 "doc": None, 176 "content": None, 177 "data": {}, 178 "search_text": "", 179 "trashed_at": None, 180 "deleted_at": None, 181 "global_created_at": now, 182 "global_updated_at": now, 183 "global_list_id_updated_at": now if list_id else None, 184 "global_tags_ids_updated_at": None, 185 **extra, 186 } 187 188 resp = self._client._patch("/v5/tasks", json=[task]) 189 # Return the single created task 190 if resp.get("data"): 191 return resp["data"][0] 192 return resp 193 194 def update(self, task_id: str, **fields: Any) -> dict: 195 """Update a task by ID. 196 197 Only pass the fields you want to change. The `global_updated_at` 198 timestamp is set automatically. 199 200 Args: 201 task_id: UUID of the task to update. 202 **fields: Any task fields to update. Use `label` for name-based 203 project lookup, `list_id` for UUID-based, and `datetime_` 204 for the datetime field. 205 206 Returns: 207 The updated task dict. 208 209 Example: 210 ```python 211 # Rename 212 client.task.update(task_id, title="New title") 213 214 # Assign to a label by name 215 client.task.update(task_id, label="Work") 216 217 # Reschedule 218 client.task.update( 219 task_id, 220 date="2026-04-01", 221 datetime_="2026-04-01T14:00:00.000Z", 222 ) 223 ``` 224 """ 225 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 226 payload: dict[str, Any] = { 227 "id": task_id, 228 "global_updated_at": now, 229 } 230 231 # Resolve label name/UUID -> listId 232 if "label" in fields: 233 label = fields.pop("label") 234 if label is not None: 235 fields["list_id"] = self._client.label.resolve_id(label) 236 else: 237 fields["list_id"] = None 238 239 # Map python-friendly names to API names 240 if "list_id" in fields: 241 payload["listId"] = fields.pop("list_id") 242 payload["global_list_id_updated_at"] = now 243 if "datetime_" in fields: 244 payload["datetime"] = fields.pop("datetime_") 245 246 payload.update(fields) 247 248 resp = self._client._patch("/v5/tasks", json=[payload]) 249 if resp.get("data"): 250 return resp["data"][0] 251 return resp 252 253 def delete(self, task_id: str) -> dict: 254 """Soft-delete a task. 255 256 Sets `trashed_at` to the current time. The task can still be 257 recovered in Akiflow's trash. 258 259 Args: 260 task_id: UUID of the task to delete. 261 262 Returns: 263 The updated task dict with `trashed_at` set. 264 265 Example: 266 ```python 267 client.task.delete("59442bbd-a57d-464f-9fa2-2cb9678379ee") 268 ``` 269 """ 270 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 271 payload = { 272 "id": task_id, 273 "status": 10, 274 "trashed_at": now, 275 "global_updated_at": now, 276 } 277 resp = self._client._patch("/v5/tasks", json=[payload]) 278 if resp.get("data"): 279 return resp["data"][0] 280 return resp 281 282 def done(self, task_id: str, *, date: str | None = None) -> dict: 283 """Mark a task as done. 284 285 Args: 286 task_id: UUID of the task to complete. 287 date: Date in ``YYYY-MM-DD`` format. Defaults to today. 288 A date is required for the task to appear in Akiflow's 289 done list. 290 291 Returns: 292 The updated task dict with `done=True`. 293 294 Example: 295 ```python 296 client.task.done("59442bbd-a57d-464f-9fa2-2cb9678379ee") 297 client.task.done(task_id, date="2026-03-20") 298 ``` 299 """ 300 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 301 if date is None: 302 date = datetime.now(timezone.utc).strftime("%Y-%m-%d") 303 return self.update(task_id, done=True, done_at=now, date=date, status=2)
Operations on Akiflow tasks, available as client.task.
All mutations go through PATCH /v5/tasks (Akiflow uses upsert semantics).
Deletion is a soft-delete via trashed_at.
Example:
from akiflow import Akiflow
client = Akiflow(refresh_token="def50200...")
# Create
task = client.task.create("Buy groceries")
# Update
client.task.update(task["id"], title="Buy organic groceries")
# Mark done
client.task.done(task["id"])
# Delete
client.task.delete(task["id"])
# List all tasks
result = client.task.list()
for t in result["data"]:
print(t["title"], t["done"])
48 def list(self, *, sync_token: str | None = None, limit: int = 2500) -> dict: 49 """Fetch tasks, with optional incremental sync. 50 51 Args: 52 sync_token: Cursor from a previous `list()` response. Pass this 53 to get only tasks changed since the last call. 54 limit: Max tasks per page (default 2500). 55 56 Returns: 57 Dict with `data` (list of task dicts), `sync_token` (cursor for 58 next call), and `has_next_page`. 59 60 Example: 61 ```python 62 # Full sync 63 result = client.task.list() 64 tasks = result["data"] 65 cursor = result["sync_token"] 66 67 # Incremental sync (only changes since last call) 68 result = client.task.list(sync_token=cursor) 69 ``` 70 """ 71 params: dict[str, Any] = {"limit": str(limit)} 72 if sync_token: 73 params["sync_token"] = sync_token 74 return self._client._get("/v5/tasks", params=params)
Fetch tasks, with optional incremental sync.
Args:
sync_token: Cursor from a previous list() response. Pass this
to get only tasks changed since the last call.
limit: Max tasks per page (default 2500).
Returns:
Dict with data (list of task dicts), sync_token (cursor for
next call), and has_next_page.
Example:
# Full sync
result = client.task.list()
tasks = result["data"]
cursor = result["sync_token"]
# Incremental sync (only changes since last call)
result = client.task.list(sync_token=cursor)
76 def create( 77 self, 78 title: str, 79 *, 80 description: str | None = None, 81 date: str | None = None, 82 datetime_: str | None = None, 83 datetime_tz: str | None = None, 84 duration: int | None = None, 85 due_date: str | None = None, 86 priority: int | None = None, 87 tags_ids: list[str] | None = None, 88 label: str | None = None, 89 list_id: str | None = None, 90 section_id: str | None = None, 91 links: list[str] | None = None, 92 **extra: Any, 93 ) -> dict: 94 """Create a new task. 95 96 By default, tasks land in the **inbox** (`status=1`). To schedule a 97 task on a specific date/time, pass `date` and `datetime_`. 98 99 Args: 100 title: Task title. 101 description: HTML description body. 102 date: Planned date (`"2026-03-27"`). 103 datetime_: Planned datetime in UTC (`"2026-03-27T09:00:00.000Z"`). 104 datetime_tz: Timezone for display (default `"Europe/Zurich"`). 105 duration: Duration in seconds (e.g. `1800` for 30 min). 106 due_date: Hard due date (`"2026-03-28"`). 107 priority: Priority level. 108 tags_ids: List of tag UUIDs. 109 label: Label/project name **or** UUID. Resolved automatically 110 via `client.label.resolve_id()`. Takes precedence over `list_id`. 111 list_id: Project/list UUID (use `label` for name-based lookup). 112 section_id: Section within a project. 113 links: List of URL strings. 114 **extra: Additional fields passed directly to the API. 115 116 Returns: 117 The created task dict as returned by the API. 118 119 Example: 120 ```python 121 # Inbox task 122 task = client.task.create("Buy groceries") 123 124 # Task assigned to a label by name 125 task = client.task.create("Review PR", label="Work") 126 127 # Scheduled task with duration 128 task = client.task.create( 129 "Team standup", 130 date="2026-03-27", 131 datetime_="2026-03-27T09:00:00.000Z", 132 duration=1800, 133 ) 134 ``` 135 """ 136 if label is not None: 137 list_id = self._client.label.resolve_id(label) 138 139 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 140 task_id = str(uuid.uuid4()) 141 sorting = int(datetime.now(timezone.utc).timestamp() * 1000) 142 143 task: dict[str, Any] = { 144 "id": task_id, 145 "title": title, 146 "description": description, 147 "status": 1, 148 "done": False, 149 "done_at": None, 150 "date": date, 151 "datetime": datetime_, 152 "datetime_tz": datetime_tz or "Europe/Zurich", 153 "original_date": None, 154 "original_datetime": None, 155 "duration": duration, 156 "due_date": due_date, 157 "priority": priority, 158 "sorting": sorting, 159 "sorting_label": sorting, 160 "tags_ids": tags_ids, 161 "links": links or [], 162 "listId": list_id, 163 "section_id": section_id, 164 "calendar_id": None, 165 "time_slot_id": None, 166 "recurring_id": None, 167 "recurrence": None, 168 "recurrence_version": None, 169 "plan_unit": None, 170 "plan_period": None, 171 "origin": None, 172 "connector_id": None, 173 "origin_id": None, 174 "origin_account_id": None, 175 "doc": None, 176 "content": None, 177 "data": {}, 178 "search_text": "", 179 "trashed_at": None, 180 "deleted_at": None, 181 "global_created_at": now, 182 "global_updated_at": now, 183 "global_list_id_updated_at": now if list_id else None, 184 "global_tags_ids_updated_at": None, 185 **extra, 186 } 187 188 resp = self._client._patch("/v5/tasks", json=[task]) 189 # Return the single created task 190 if resp.get("data"): 191 return resp["data"][0] 192 return resp
Create a new task.
By default, tasks land in the inbox (status=1). To schedule a
task on a specific date/time, pass date and datetime_.
Args:
title: Task title.
description: HTML description body.
date: Planned date ("2026-03-27").
datetime_: Planned datetime in UTC ("2026-03-27T09:00:00.000Z").
datetime_tz: Timezone for display (default "Europe/Zurich").
duration: Duration in seconds (e.g. 1800 for 30 min).
due_date: Hard due date ("2026-03-28").
priority: Priority level.
tags_ids: List of tag UUIDs.
label: Label/project name or UUID. Resolved automatically
via client.label.resolve_id(). Takes precedence over list_id.
list_id: Project/list UUID (use label for name-based lookup).
section_id: Section within a project.
links: List of URL strings.
**extra: Additional fields passed directly to the API.
Returns: The created task dict as returned by the API.
Example:
# Inbox task
task = client.task.create("Buy groceries")
# Task assigned to a label by name
task = client.task.create("Review PR", label="Work")
# Scheduled task with duration
task = client.task.create(
"Team standup",
date="2026-03-27",
datetime_="2026-03-27T09:00:00.000Z",
duration=1800,
)
194 def update(self, task_id: str, **fields: Any) -> dict: 195 """Update a task by ID. 196 197 Only pass the fields you want to change. The `global_updated_at` 198 timestamp is set automatically. 199 200 Args: 201 task_id: UUID of the task to update. 202 **fields: Any task fields to update. Use `label` for name-based 203 project lookup, `list_id` for UUID-based, and `datetime_` 204 for the datetime field. 205 206 Returns: 207 The updated task dict. 208 209 Example: 210 ```python 211 # Rename 212 client.task.update(task_id, title="New title") 213 214 # Assign to a label by name 215 client.task.update(task_id, label="Work") 216 217 # Reschedule 218 client.task.update( 219 task_id, 220 date="2026-04-01", 221 datetime_="2026-04-01T14:00:00.000Z", 222 ) 223 ``` 224 """ 225 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 226 payload: dict[str, Any] = { 227 "id": task_id, 228 "global_updated_at": now, 229 } 230 231 # Resolve label name/UUID -> listId 232 if "label" in fields: 233 label = fields.pop("label") 234 if label is not None: 235 fields["list_id"] = self._client.label.resolve_id(label) 236 else: 237 fields["list_id"] = None 238 239 # Map python-friendly names to API names 240 if "list_id" in fields: 241 payload["listId"] = fields.pop("list_id") 242 payload["global_list_id_updated_at"] = now 243 if "datetime_" in fields: 244 payload["datetime"] = fields.pop("datetime_") 245 246 payload.update(fields) 247 248 resp = self._client._patch("/v5/tasks", json=[payload]) 249 if resp.get("data"): 250 return resp["data"][0] 251 return resp
Update a task by ID.
Only pass the fields you want to change. The global_updated_at
timestamp is set automatically.
Args:
task_id: UUID of the task to update.
**fields: Any task fields to update. Use label for name-based
project lookup, list_id for UUID-based, and datetime_
for the datetime field.
Returns: The updated task dict.
Example:
# Rename
client.task.update(task_id, title="New title")
# Assign to a label by name
client.task.update(task_id, label="Work")
# Reschedule
client.task.update(
task_id,
date="2026-04-01",
datetime_="2026-04-01T14:00:00.000Z",
)
253 def delete(self, task_id: str) -> dict: 254 """Soft-delete a task. 255 256 Sets `trashed_at` to the current time. The task can still be 257 recovered in Akiflow's trash. 258 259 Args: 260 task_id: UUID of the task to delete. 261 262 Returns: 263 The updated task dict with `trashed_at` set. 264 265 Example: 266 ```python 267 client.task.delete("59442bbd-a57d-464f-9fa2-2cb9678379ee") 268 ``` 269 """ 270 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 271 payload = { 272 "id": task_id, 273 "status": 10, 274 "trashed_at": now, 275 "global_updated_at": now, 276 } 277 resp = self._client._patch("/v5/tasks", json=[payload]) 278 if resp.get("data"): 279 return resp["data"][0] 280 return resp
Soft-delete a task.
Sets trashed_at to the current time. The task can still be
recovered in Akiflow's trash.
Args: task_id: UUID of the task to delete.
Returns:
The updated task dict with trashed_at set.
Example:
client.task.delete("59442bbd-a57d-464f-9fa2-2cb9678379ee")
282 def done(self, task_id: str, *, date: str | None = None) -> dict: 283 """Mark a task as done. 284 285 Args: 286 task_id: UUID of the task to complete. 287 date: Date in ``YYYY-MM-DD`` format. Defaults to today. 288 A date is required for the task to appear in Akiflow's 289 done list. 290 291 Returns: 292 The updated task dict with `done=True`. 293 294 Example: 295 ```python 296 client.task.done("59442bbd-a57d-464f-9fa2-2cb9678379ee") 297 client.task.done(task_id, date="2026-03-20") 298 ``` 299 """ 300 now = datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z") 301 if date is None: 302 date = datetime.now(timezone.utc).strftime("%Y-%m-%d") 303 return self.update(task_id, done=True, done_at=now, date=date, status=2)
Mark a task as done.
Args:
task_id: UUID of the task to complete.
date: Date in YYYY-MM-DD format. Defaults to today.
A date is required for the task to appear in Akiflow's
done list.
Returns:
The updated task dict with done=True.
Example:
client.task.done("59442bbd-a57d-464f-9fa2-2cb9678379ee")
client.task.done(task_id, date="2026-03-20")
Base exception for all Akiflow SDK errors.
13class APIError(AkiflowError): 14 """Raised on non-2xx responses from the Akiflow API. 15 16 Attributes: 17 status_code: HTTP status code. 18 message: Error message from the response body. 19 """ 20 21 def __init__(self, status_code: int, message: str): 22 self.status_code = status_code 23 self.message = message 24 super().__init__(f"HTTP {status_code}: {message}")
Raised on non-2xx responses from the Akiflow API.
Attributes: status_code: HTTP status code. message: Error message from the response body.
9class AuthError(AkiflowError): 10 """Raised when authentication fails (bad OTP, expired link, etc.)."""
Raised when authentication fails (bad OTP, expired link, etc.).
27class TokenExpiredError(APIError): 28 """Raised when the access token is expired and cannot be refreshed. 29 30 This typically means the refresh token has also expired or was revoked. 31 Re-authenticate with `Akiflow(email=...)` to get new tokens. 32 """
Raised when the access token is expired and cannot be refreshed.
This typically means the refresh token has also expired or was revoked.
Re-authenticate with Akiflow(email=...) to get new tokens.