Coverage for src/alprina_cli/auth.py: 12%
243 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-12 18:07 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-12 18:07 +0100
1"""
2Authentication module for Alprina CLI.
3Handles OAuth, API key authentication, and token management.
4"""
6import os
7import json
8from pathlib import Path
9from typing import Optional
10import httpx
11from rich.console import Console
12from rich.panel import Panel
13from rich.prompt import Prompt
14from rich.progress import Progress, SpinnerColumn, TextColumn
16# Import our new error classes and utilities
17from .utils.errors import AuthenticationError, APIError, NetworkError
18from .utils.welcome import show_welcome
20console = Console()
22ALPRINA_DIR = Path.home() / ".alprina"
23TOKEN_FILE = ALPRINA_DIR / "token"
24CONFIG_FILE = ALPRINA_DIR / "config.json"
27def ensure_alprina_dir():
28 """Ensure the .alprina directory exists."""
29 ALPRINA_DIR.mkdir(exist_ok=True)
32def save_token(token: str, user_info: Optional[dict] = None):
33 """Save authentication token to disk."""
34 ensure_alprina_dir()
36 auth_data = {
37 "token": token,
38 "user": user_info or {}
39 }
41 TOKEN_FILE.write_text(json.dumps(auth_data, indent=2))
42 TOKEN_FILE.chmod(0o600) # Restrict permissions
43 console.print("[green]✓[/green] Authentication successful")
46def load_token() -> Optional[dict]:
47 """
48 Load authentication token from environment or disk.
50 Priority:
51 1. ALPRINA_API_KEY environment variable
52 2. ~/.alprina/token file
54 Returns:
55 dict with token and user info, or None if not authenticated
56 """
57 from .config import get_api_key
59 # Check environment variable first
60 api_key = get_api_key()
62 if api_key:
63 # Try to load user info from file if it exists
64 if TOKEN_FILE.exists():
65 try:
66 data = json.loads(TOKEN_FILE.read_text())
67 return {
68 "token": api_key,
69 "user": data.get("user", {})
70 }
71 except Exception:
72 pass
74 # Return just the API key if file doesn't exist
75 return {
76 "token": api_key,
77 "user": {}
78 }
80 return None
83def remove_token():
84 """Remove authentication token."""
85 if TOKEN_FILE.exists():
86 TOKEN_FILE.unlink()
89def get_backend_url() -> str:
90 """Get backend URL from environment or use default."""
91 return os.getenv("ALPRINA_BACKEND", "https://api.alprina.com/v1")
94def login_command(api_key: Optional[str] = None, oauth_provider: Optional[str] = None, code: Optional[str] = None):
95 """
96 Handle user login via browser-based OAuth, CLI code, or API key.
97 """
98 console.print(Panel("🔐 Alprina Authentication", style="bold cyan"))
100 # If CLI code is provided, use the reverse flow
101 if code:
102 login_with_cli_code(code)
103 return
105 # If API key is provided directly, use it
106 if api_key:
107 console.print("Authenticating with API key...")
108 authenticate_with_api_key(api_key)
109 return
111 backend_url = get_backend_url()
113 # Default: Browser-based OAuth (recommended)
114 console.print("\n[bold cyan]🌐 Opening browser for authentication...[/bold cyan]")
115 console.print("[dim]This is the fastest way to get started![/dim]\n")
117 try:
118 login_with_browser()
119 return
120 except Exception as e:
121 console.print(f"\n[yellow]⚠️ Browser authentication failed: {e}[/yellow]")
122 console.print("\n[bold]Alternative authentication methods:[/bold]")
123 console.print(" [cyan]1.[/cyan] Use dashboard code: [bold]alprina auth login --code YOUR_CODE[/bold]")
124 console.print(" [cyan]2.[/cyan] Use API key: [bold]alprina auth login --api-key YOUR_KEY[/bold]")
125 console.print()
126 console.print("[dim]💡 Get your code from: https://www.alprina.com/dashboard[/dim]")
128 if Prompt.ask("\nWould you like to try manual authentication now?", choices=["y", "n"], default="n") == "y":
129 console.print("\n[bold]Choose method:[/bold]")
130 console.print(" [cyan]1.[/cyan] Dashboard code (quick)")
131 console.print(" [cyan]2.[/cyan] API key (advanced)")
133 choice = Prompt.ask("Select option", choices=["1", "2"], default="1")
135 if choice == "1":
136 code_input = Prompt.ask("Enter your 6-digit code from dashboard")
137 login_with_cli_code(code_input)
138 else:
139 console.print("\n[yellow]ℹ️ To get your API key:[/yellow]")
140 console.print(" 1. Visit: [bold cyan]https://www.alprina.com/dashboard?tab=keys[/bold cyan]")
141 console.print(" 2. Click 'Create New API Key'")
142 console.print(" 3. Copy your API key")
143 console.print()
144 api_key = Prompt.ask("Enter your API key")
145 authenticate_with_api_key(api_key)
147def authenticate_with_api_key(api_key: str):
148 """Authenticate using an API key."""
149 backend_url = get_backend_url()
151 try:
152 with Progress(
153 SpinnerColumn(),
154 TextColumn("[progress.description]{task.description}"),
155 console=console,
156 transient=True
157 ) as progress:
158 progress.add_task("Verifying API key...", total=None)
160 response = httpx.get(
161 f"{backend_url}/auth/me",
162 headers={"Authorization": f"Bearer {api_key}"},
163 timeout=10.0
164 )
166 if response.status_code == 200:
167 data = response.json()
168 user_info = data.get("user", {})
170 # Save API key and user info
171 save_token(api_key, user_info)
173 # Show welcome screen
174 console.print()
175 show_welcome(force=True)
177 elif response.status_code == 401:
178 raise AuthenticationError()
179 elif response.status_code == 403:
180 from .utils.errors import InvalidTierError
181 raise InvalidTierError("CLI access", "Developer")
183 else:
184 error_msg = "Authentication failed"
185 try:
186 error_data = response.json()
187 error_msg = error_data.get('detail', error_msg)
188 except:
189 pass
190 raise APIError(response.status_code, error_msg)
192 except httpx.ConnectError:
193 raise NetworkError(f"Could not connect to {backend_url}")
194 except (AuthenticationError, APIError, NetworkError):
195 raise # Re-raise our custom errors
196 except Exception as e:
197 from .utils.errors import AlprinaError
198 raise AlprinaError(
199 message=f"Unexpected error: {e}",
200 solution="Please try again or contact support@alprina.com"
201 )
204def login_with_cli_code(cli_code: str):
205 """
206 Login using a CLI code from the dashboard (reverse flow).
207 User gets code from dashboard, enters it here.
208 """
209 backend_url = get_backend_url()
211 try:
212 with Progress(
213 SpinnerColumn(),
214 TextColumn("[progress.description]{task.description}"),
215 console=console,
216 transient=True
217 ) as progress:
218 progress.add_task(f"Verifying CLI code: {cli_code}...", total=None)
220 response = httpx.post(
221 f"{backend_url}/auth/cli-verify",
222 json={"cli_code": cli_code.upper()},
223 timeout=10.0
224 )
226 if response.status_code == 200:
227 auth_data = response.json()
228 api_key = auth_data["api_key"]
229 user = auth_data["user"]
231 # Save token
232 save_token(api_key, user)
234 # Show welcome screen
235 console.print()
236 show_welcome(force=True)
238 elif response.status_code == 404:
239 console.print(f"\n[red]✗ Invalid or expired CLI code: {cli_code}[/red]")
240 console.print("[yellow]Please get a new code from your dashboard.[/yellow]")
242 elif response.status_code == 400:
243 console.print(f"\n[red]✗ CLI code has already been used: {cli_code}[/red]")
244 console.print("[yellow]Please generate a new code from your dashboard.[/yellow]")
246 else:
247 console.print(f"\n[red]✗ Failed to verify CLI code: {response.status_code}[/red]")
248 try:
249 error_data = response.json()
250 # Handle both API response formats: {detail: ...} and {error: {message: ...}}
251 error_message = (
252 error_data.get('detail') or
253 error_data.get('error', {}).get('message') or
254 'Unknown error'
255 )
256 console.print(f"[red]{error_message}[/red]")
257 except:
258 console.print(f"[red]{response.text}[/red]")
260 except httpx.ConnectError:
261 console.print(f"[red]✗ Could not connect to Alprina backend at {backend_url}[/red]")
262 console.print("[yellow]Make sure you have internet connectivity.[/yellow]")
264 except Exception as e:
265 console.print(f"[red]✗ Error: {e}[/red]")
268def logout_command():
269 """Handle user logout."""
270 if TOKEN_FILE.exists():
271 remove_token()
272 console.print("[green]✓[/green] Logged out successfully")
273 else:
274 console.print("[yellow]You are not logged in[/yellow]")
277def status_command():
278 """Show current authentication status."""
279 auth_data = load_token()
281 if auth_data:
282 user = auth_data.get("user", {})
283 api_key = auth_data.get("token", "")
285 # Show masked API key
286 if api_key:
287 masked_key = f"{api_key[:15]}...{api_key[-4:]}" if len(api_key) > 20 else "***"
288 else:
289 masked_key = "None"
291 console.print(Panel(
292 f"[green]✓ Authenticated[/green]\n\n"
293 f"Name: {user.get('full_name', 'N/A')}\n"
294 f"Email: {user.get('email', 'N/A')}\n"
295 f"Plan: {user.get('tier', 'free').title()}\n"
296 f"API Key: {masked_key}",
297 title="Authentication Status"
298 ))
299 else:
300 console.print(Panel(
301 "[red]✗ Not authenticated[/red]\n\n"
302 "Run [bold]alprina auth login[/bold] to authenticate",
303 title="Authentication Status"
304 ))
307def get_auth_headers() -> dict:
308 """Get authentication headers for API requests."""
309 auth_data = load_token()
311 if not auth_data:
312 console.print("[red]Not authenticated. Run 'alprina auth login' first.[/red]")
313 raise Exception("Not authenticated")
315 return {"Authorization": f"Bearer {auth_data['token']}"}
318def is_authenticated() -> bool:
319 """Check if user is authenticated."""
320 return TOKEN_FILE.exists() and load_token() is not None
323def login_with_browser():
324 """
325 Browser-based OAuth flow (like GitHub CLI).
326 Opens browser for user to authorize, polls for completion.
327 """
328 import webbrowser
329 import time
331 backend_url = get_backend_url()
333 try:
334 # Step 1: Request device authorization
335 console.print("\n[cyan]→[/cyan] Requesting device authorization...")
337 response = httpx.post(f"{backend_url}/auth/device", timeout=10.0)
339 if response.status_code != 200:
340 console.print(f"[red]✗ Failed to request authorization: {response.status_code}[/red]")
341 return
343 data = response.json()
344 device_code = data["device_code"]
345 user_code = data["user_code"]
346 verification_url = data["verification_url"]
347 expires_in = data.get("expires_in", 900)
348 interval = data.get("interval", 5)
350 # Step 2: Display code and open browser
351 console.print()
352 console.print(Panel(
353 f"[bold yellow]{user_code}[/bold yellow]",
354 title="🔑 Your Verification Code",
355 subtitle="Enter this code in your browser"
356 ))
357 console.print()
358 console.print(f"[cyan]→[/cyan] Opening browser to: [dim]{verification_url}[/dim]")
359 console.print(f"[dim]If browser doesn't open, visit manually[/dim]")
360 console.print()
362 # Open browser with code pre-filled (like GitHub CLI)
363 url_with_code = f"{verification_url}?user_code={user_code}"
364 try:
365 webbrowser.open(url_with_code)
366 except:
367 console.print("[yellow]⚠️ Could not open browser automatically[/yellow]")
368 console.print(f"[yellow]Please visit: {url_with_code}[/yellow]")
370 # Step 3: Poll for authorization with progress indicator
371 max_attempts = expires_in // interval # Usually 180 attempts (15 minutes)
373 console.print()
374 console.print("[dim]Tip: Make sure you're logged into the website first![/dim]")
375 console.print("[dim]Press Ctrl+C to cancel[/dim]")
376 console.print()
378 try:
379 with Progress(
380 SpinnerColumn(),
381 TextColumn("[progress.description]{task.description}"),
382 console=console,
383 transient=False
384 ) as progress:
385 task = progress.add_task("Waiting for authorization...", total=None)
387 for attempt in range(max_attempts):
388 time.sleep(interval)
390 try:
391 poll_response = httpx.post(
392 f"{backend_url}/auth/device/token",
393 json={"device_code": device_code},
394 timeout=10.0
395 )
397 if poll_response.status_code == 200:
398 # ✓ Authorized!
399 progress.update(task, description="[green]✓ Authorization successful![/green]")
401 auth_data = poll_response.json()
402 api_key = auth_data["api_key"]
403 user = auth_data["user"]
405 # Save token
406 save_token(api_key, user)
408 # Show welcome screen
409 console.print()
410 show_welcome(force=True)
411 return
413 elif poll_response.status_code == 400:
414 error_data = poll_response.json()
415 error_type = error_data.get("detail", {})
417 if isinstance(error_type, dict):
418 error_code = error_type.get("error")
420 if error_code == "authorization_pending":
421 # Still waiting...
422 console.print(".", end="", style="dim")
423 continue
424 elif error_code == "expired_token":
425 console.print("\n[red]✗ Authorization expired. Please try again.[/red]")
426 return
427 else:
428 console.print(f"\n[red]✗ Error: {error_type}[/red]")
429 return
431 except httpx.ReadTimeout:
432 console.print(".", end="", style="dim")
433 continue
434 except httpx.ConnectError:
435 console.print(f"\n[red]✗ Could not connect to backend[/red]")
436 return
437 except Exception as e:
438 console.print(".", end="", style="dim")
439 continue
441 console.print("\n[red]✗ Authorization timed out. Please try again.[/red]")
442 except KeyboardInterrupt:
443 console.print("\n\n[yellow]Authorization cancelled by user.[/yellow]")
444 console.print("[dim]You can try again with: alprina auth login[/dim]")
445 return
447 except httpx.ConnectError:
448 console.print(f"[red]✗ Could not connect to Alprina backend at {backend_url}[/red]")
449 console.print("[yellow]Make sure the API server is running[/yellow]")
450 except Exception as e:
451 console.print(f"[red]✗ Error: {e}[/red]")