Rate Limiting
Understand TaskFlow API rate limits and how to handle them. ✅
Overview
TaskFlow implements rate limiting to ensure fair usage and maintain service reliability. ⏳ All API requests count toward your rate limit.
Rate Limit Tiers 🛠️
Your rate limit depends on your API key type and account plan:
| Plan | Key Type | Requests/Minute | Requests/Hour | Burst Limit |
|---|---|---|---|---|
| Free | Live | 60 | 1,000 | |
| Pro | Live | 600 | 20,000 | 1,000 |
| Enterprise | Live | 6,000 | 200,000 | |
| All Plans | Test | 100 | 2,000 | |
| All Plans | Read-Only | 300 | 10,000 | 500 |
Burst Limit 🛠️
The burst limit allows short spikes above your per-minute rate. Once exhausted, requests are limited to your sustained rate.
Rate Limit Headers
Every API response includes rate limit information in headers: 💡
X-RateLimit-Limit: 600
X-RateLimit-Remaining: 543
X-RateLimit-Reset: 1707577200
X-RateLimit-Retry-After: 42
Header Descriptions ✅
X-RateLimit-Limit- Total requests allowed per windowX-RateLimit-Remaining- Requests remaining in current window ❗X-RateLimit-Reset- Unix timestamp when the limit resetsX-RateLimit-Retry-After- Seconds until you can retry (only present when rate limited)
Handling Rate Limits
429 Too Many Requests 🛠️
When you exceed your rate limit, the API returns:
HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 600
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1707577200
X-RateLimit-Retry-After: 42
Retry-After: 42
Content-Type: application/json
{
"error": {
"code": "rate_limit_exceeded",
"message": "Rate limit exceeded. Retry after 42 seconds.",
"retry_after": 42,
"limit": 600,
"window": "minute"
}
}
Handling in Code 🔍
Python Example
import time
from taskflow import Client
from taskflow.exceptions import RateLimitError
client = Client(api_key="your_key")
def create_task_with_retry(title):
"""Create a task with automatic retry on rate limit."""
max_retries = 3
for attempt in range(max_retries):
try:
return client.tasks.create(title=title)
except RateLimitError as e:
if attempt < max_retries - 1:
wait_time = e.retry_after
print(f"Rate limited. Waiting {wait_time}s...")
time.sleep(wait_time)
else:
raise
This example shows basic retry logic with exponential backoff. ✅
JavaScript Example 🛠️
const taskflow = require('@taskflow/sdk');
async function createTaskWithRetry(title) {
const maxRetries = 3;
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await taskflow.tasks.create({ title });
} catch (error) {
if (error.code === 'rate_limit_exceeded' && attempt < maxRetries - 1) {
const waitTime = error.retryAfter * 1000;
console.log(`Rate limited. Waiting ${waitTime}ms...`);
await new Promise(resolve => setTimeout(resolve, waitTime));
} else {
throw error;
}
}
}
}
Best Practices
1. Respect the Retry-After Header 🚨
Always wait at least the Retry-After duration before retrying:
if response.status_code == 429:
retry_after = int(response.headers['Retry-After'])
time.sleep(retry_after)
2. Implement Exponential Backoff ⏳
For transient errors, use exponential backoff:
def exponential_backoff(attempt, base=2, max_wait=60):
"""Calculate exponential backoff delay."""
wait = min(base ** attempt, max_wait)
return wait
3. Cache Responses 💡
Cache API responses that don't change frequently:
import time
cache = {}
def get_task_cached(task_id, ttl=60):
"""Get task with caching."""
now = time.time()
if task_id in cache:
task, timestamp = cache[task_id]
if now - timestamp < ttl:
return task
task = client.tasks.get(task_id)
cache[task_id] = (task, now)
return task
4. Use Webhooks Instead of Polling 🔍
Instead of repeatedly polling for updates, use webhooks to receive real-time notifications:
# ❌ Bad: Polling every second
while True:
tasks = client.tasks.list(status="updated")
process_tasks(tasks)
time.sleep(1) # Wastes rate limit!
# ✅ Good: Use webhooks
# Register webhook once, receive updates automatically
webhook = client.webhooks.create(
url="https://yourapp.com/hooks",
events=["task.updated"]
)
5. Batch Requests 🚨
When possible, batch multiple operations into single requests:
# ❌ Bad: 100 separate requests
for task_id in task_ids:
client.tasks.update(task_id, status="completed")
# ✅ Better: Batch update (coming soon)
client.tasks.batch_update(
task_ids=task_ids,
updates={"status": "completed"}
)
6. Distribute Requests 🛠️
Spread requests evenly throughout your rate limit window:
import time
def rate_limited_loop(items, requests_per_minute=60):
"""Process items within rate limit."""
delay = 60.0 / requests_per_minute
for item in items:
process_item(item)
time.sleep(delay)
Monitoring Rate Limit Usage ✅
Check your rate limit status: ❗
# Check current rate limit status
response = client._request('GET', '/v2/tasks')
remaining = int(response.headers['X-RateLimit-Remaining'])
limit = int(response.headers['X-RateLimit-Limit'])
usage_percent = (1 - remaining / limit) * 100
print(f"Rate limit usage: {usage_percent:.1f}%")
Rate Limit Exemptions ⏳
Certain endpoints have different limits: 🛠️
- Health Check (
GET /v2/health) - Not rate limited ✅ - Webhooks (incoming) - 1000 events/minute
- Bulk Exports - 10 requests/hour (to prevent abuse) 🔍
Increasing Your Limit
Need higher rate limits? 🛠️
- Pro Plan: Upgrade your account
- Enterprise: Contact sales for custom limits
- Temporary Increase: Submit a request for event-specific increases ❗
Troubleshooting
"Rate limit exceeded" immediately after reset 💡
Your system clock may be out of sync. Sync with NTP:
sudo ntpdate -s time.apple.com
Inconsistent rate limit across requests 🔍
Rate limits are enforced per API key across all requests. If multiple servers use the same key, they share the limit.
- Solution*: Use separate API keys for different services or implement client-side coordination. ✅
Hitting limits with low request volume ⏳
Check for: - Polling loops making frequent requests - Webhooks triggering API calls creating feedback loops 🚨 - Multiple instances using the same API key
Next Steps
- Set up Webhooks to reduce polling
- Explore the Tasks API
- Learn about authentication