Coverage for src/alprina_cli/auth.py: 12%

243 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-12 18:07 +0100

1""" 

2Authentication module for Alprina CLI. 

3Handles OAuth, API key authentication, and token management. 

4""" 

5 

6import os 

7import json 

8from pathlib import Path 

9from typing import Optional 

10import httpx 

11from rich.console import Console 

12from rich.panel import Panel 

13from rich.prompt import Prompt 

14from rich.progress import Progress, SpinnerColumn, TextColumn 

15 

16# Import our new error classes and utilities 

17from .utils.errors import AuthenticationError, APIError, NetworkError 

18from .utils.welcome import show_welcome 

19 

# Shared Rich console used for every message this module prints.
console = Console()

# Per-user state directory and the files kept inside it.
ALPRINA_DIR = Path.home().joinpath(".alprina")
# JSON document with the saved token and user info (written by save_token).
TOKEN_FILE = ALPRINA_DIR.joinpath("token")
# Path of the CLI configuration file.
CONFIG_FILE = ALPRINA_DIR.joinpath("config.json")

25 

26 

def ensure_alprina_dir():
    """Ensure the .alprina directory exists.

    The directory holds the credential file, so when it has to be created it
    is created owner-only (0o700, still subject to the process umask).
    An already existing directory is left untouched.
    """
    # parents=True keeps this from failing if the home directory path itself
    # has not been created yet (e.g. in fresh containers).
    ALPRINA_DIR.mkdir(mode=0o700, parents=True, exist_ok=True)

30 

31 

def save_token(token: str, user_info: Optional[dict] = None):
    """Save authentication token to disk.

    Args:
        token: API key / token to persist.
        user_info: Optional user profile stored next to the token; an empty
            dict is stored when it is missing or falsy.

    The data is written to TOKEN_FILE as JSON with keys "token" and "user",
    and a success message is printed afterwards.
    """
    ensure_alprina_dir()

    payload = json.dumps({"token": token, "user": user_info or {}}, indent=2)

    # Create the file with owner-only permissions from the start, so the
    # secret is never on disk with default (possibly world-readable) mode.
    fd = os.open(TOKEN_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(payload)

    # The mode passed to os.open only applies to newly created files;
    # tighten a pre-existing file as well.
    TOKEN_FILE.chmod(0o600)
    console.print("[green]✓[/green] Authentication successful")

44 

45 

def load_token() -> Optional[dict]:
    """
    Load the active credentials.

    The key itself is obtained from ``.config.get_api_key()``; its lookup
    order is defined there (presumably the ALPRINA_API_KEY environment
    variable first, then ~/.alprina/token - confirm in .config).
    TOKEN_FILE is only consulted here for the stored "user" entry.

    Returns:
        dict with "token" and "user" keys, or None when no key is available.
    """
    from .config import get_api_key

    key = get_api_key()
    if not key:
        return None

    # Best effort: enrich the key with the saved user info when the token
    # file is present and readable; otherwise fall back to an empty profile.
    profile = {}
    if TOKEN_FILE.exists():
        try:
            profile = json.loads(TOKEN_FILE.read_text()).get("user", {})
        except Exception:
            profile = {}

    return {"token": key, "user": profile}

81 

82 

def remove_token():
    """Remove authentication token.

    Deleting a token file that does not exist is a no-op.
    """
    # EAFP instead of exists()+unlink(): the file may vanish between the
    # check and the delete (e.g. a concurrent logout).
    try:
        TOKEN_FILE.unlink()
    except FileNotFoundError:
        pass

87 

88 

def get_backend_url() -> str:
    """Get backend URL from environment or use default.

    Reads ALPRINA_BACKEND. Surrounding whitespace and trailing slashes are
    removed because callers build endpoints as f"{backend_url}/path"; an
    unset, empty or blank value falls back to the default URL.

    Returns:
        Base URL without a trailing slash.
    """
    configured = (os.getenv("ALPRINA_BACKEND") or "").strip().rstrip("/")
    return configured or "https://api.alprina.com/v1"

92 

93 

def login_command(api_key: Optional[str] = None, oauth_provider: Optional[str] = None, code: Optional[str] = None):
    """
    Handle user login via browser-based OAuth, CLI code, or API key.

    Args:
        api_key: When given (and no ``code``), authenticate directly with it.
        oauth_provider: Accepted for interface compatibility; not used by
            this function.
        code: Dashboard CLI code; takes precedence over ``api_key``.

    With neither ``code`` nor ``api_key`` the browser flow is started. If the
    browser flow raises, the alternatives are listed and the user may pick a
    manual method interactively.
    """
    console.print(Panel("🔐 Alprina Authentication", style="bold cyan"))

    # If CLI code is provided, use the reverse flow
    if code:
        login_with_cli_code(code)
        return

    # If API key is provided directly, use it
    if api_key:
        console.print("Authenticating with API key...")
        authenticate_with_api_key(api_key)
        return

    # Default: Browser-based OAuth (recommended)
    console.print("\n[bold cyan]🌐 Opening browser for authentication...[/bold cyan]")
    console.print("[dim]This is the fastest way to get started![/dim]\n")

    try:
        login_with_browser()
    except Exception as e:
        # NOTE(review): this fallback only runs when login_with_browser
        # raises; failures it reports by printing and returning do not
        # reach this branch.
        console.print(f"\n[yellow]⚠️ Browser authentication failed: {e}[/yellow]")
        console.print("\n[bold]Alternative authentication methods:[/bold]")
        console.print(" [cyan]1.[/cyan] Use dashboard code: [bold]alprina auth login --code YOUR_CODE[/bold]")
        console.print(" [cyan]2.[/cyan] Use API key: [bold]alprina auth login --api-key YOUR_KEY[/bold]")
        console.print()
        console.print("[dim]💡 Get your code from: https://www.alprina.com/dashboard[/dim]")

        if Prompt.ask("\nWould you like to try manual authentication now?", choices=["y", "n"], default="n") == "y":
            _prompt_manual_login()


def _prompt_manual_login():
    """Ask which manual method to use (dashboard code or API key) and run it."""
    console.print("\n[bold]Choose method:[/bold]")
    console.print(" [cyan]1.[/cyan] Dashboard code (quick)")
    console.print(" [cyan]2.[/cyan] API key (advanced)")

    choice = Prompt.ask("Select option", choices=["1", "2"], default="1")

    if choice == "1":
        code_input = Prompt.ask("Enter your 6-digit code from dashboard")
        login_with_cli_code(code_input)
        return

    console.print("\n[yellow]ℹ️ To get your API key:[/yellow]")
    console.print(" 1. Visit: [bold cyan]https://www.alprina.com/dashboard?tab=keys[/bold cyan]")
    console.print(" 2. Click 'Create New API Key'")
    console.print(" 3. Copy your API key")
    console.print()
    entered_key = Prompt.ask("Enter your API key")
    authenticate_with_api_key(entered_key)

146 

def authenticate_with_api_key(api_key: str):
    """Authenticate using an API key.

    Sends the key as a Bearer token to ``{backend}/auth/me``. On HTTP 200 the
    key and the returned "user" object are saved and the welcome screen is
    shown.

    Args:
        api_key: Key to verify and persist.

    Raises:
        AuthenticationError: The backend answered 401.
        InvalidTierError: The backend answered 403.
        APIError: Any other non-200 status; carries the response's "detail"
            text when the body is a JSON object that provides one.
        NetworkError: The backend could not be reached (connect error).
        AlprinaError: Any other unexpected failure.
    """
    from .utils.errors import AlprinaError, InvalidTierError

    backend_url = get_backend_url()

    try:
        # The spinner only wraps the network call, so it is cleared before
        # anything else is printed.
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
            transient=True,
        ) as progress:
            progress.add_task("Verifying API key...", total=None)

            response = httpx.get(
                f"{backend_url}/auth/me",
                headers={"Authorization": f"Bearer {api_key}"},
                timeout=10.0,
            )

        status = response.status_code

        if status == 200:
            data = response.json()
            user_info = data.get("user", {})

            # Save API key and user info
            save_token(api_key, user_info)

            # Show welcome screen
            console.print()
            show_welcome(force=True)

        elif status == 401:
            raise AuthenticationError()

        elif status == 403:
            raise InvalidTierError("CLI access", "Developer")

        else:
            error_msg = "Authentication failed"
            try:
                error_msg = response.json().get('detail', error_msg)
            except (ValueError, AttributeError):
                # Body is not JSON, or not a JSON object: keep the generic text.
                pass
            raise APIError(status, error_msg)

    except httpx.ConnectError as err:
        raise NetworkError(f"Could not connect to {backend_url}") from err
    except (AuthenticationError, APIError, NetworkError, InvalidTierError):
        # Re-raise our custom errors untouched. InvalidTierError is listed
        # explicitly so the 403 case is never re-wrapped as "Unexpected error".
        raise
    except Exception as err:
        raise AlprinaError(
            message=f"Unexpected error: {err}",
            solution="Please try again or contact support@alprina.com"
        ) from err

202 

203 

def login_with_cli_code(cli_code: str):
    """
    Login using a CLI code from the dashboard (reverse flow).
    User gets code from dashboard, enters it here.

    The code is stripped and upper-cased, then posted to
    ``{backend}/auth/cli-verify``. On HTTP 200 the returned "api_key" and
    "user" are saved and the welcome screen is shown. Every failure is
    reported on the console; nothing is raised to the caller.

    Args:
        cli_code: Code as typed by the user.
    """
    from rich.markup import escape

    backend_url = get_backend_url()

    # Prompt input often carries stray whitespace; the backend gets the
    # normalized form.
    submitted_code = cli_code.strip().upper()
    # Text echoed inside Rich markup is escaped so "[" in user or server
    # text cannot be parsed as a style tag.
    shown_code = escape(cli_code)

    try:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
            transient=True,
        ) as progress:
            progress.add_task(f"Verifying CLI code: {shown_code}...", total=None)

            response = httpx.post(
                f"{backend_url}/auth/cli-verify",
                json={"cli_code": submitted_code},
                timeout=10.0,
            )

        status = response.status_code

        if status == 200:
            auth_data = response.json()
            api_key = auth_data["api_key"]
            user = auth_data["user"]

            # Save token
            save_token(api_key, user)

            # Show welcome screen
            console.print()
            show_welcome(force=True)

        elif status == 404:
            console.print(f"\n[red]✗ Invalid or expired CLI code: {shown_code}[/red]")
            console.print("[yellow]Please get a new code from your dashboard.[/yellow]")

        elif status == 400:
            console.print(f"\n[red]✗ CLI code has already been used: {shown_code}[/red]")
            console.print("[yellow]Please generate a new code from your dashboard.[/yellow]")

        else:
            console.print(f"\n[red]✗ Failed to verify CLI code: {status}[/red]")
            try:
                error_data = response.json()
                # Handle both API response formats: {detail: ...} and {error: {message: ...}}
                error_message = (
                    error_data.get('detail') or
                    error_data.get('error', {}).get('message') or
                    'Unknown error'
                )
            except (ValueError, AttributeError):
                # Not JSON, or not the expected object shape: show the raw body.
                error_message = response.text
            console.print(f"[red]{escape(str(error_message))}[/red]")

    except httpx.ConnectError:
        console.print(f"[red]✗ Could not connect to Alprina backend at {backend_url}[/red]")
        console.print("[yellow]Make sure you have internet connectivity.[/yellow]")

    except Exception as e:
        console.print(f"[red]✗ Error: {escape(str(e))}[/red]")

266 

267 

def logout_command():
    """Log the user out by deleting the saved token file, if there is one."""
    # Guard clause: nothing stored on disk means there is nothing to remove.
    if not TOKEN_FILE.exists():
        console.print("[yellow]You are not logged in[/yellow]")
        return

    remove_token()
    console.print("[green]✓[/green] Logged out successfully")

275 

276 

def status_command():
    """Show current authentication status.

    Prints a panel with name, email, plan and a masked API key when
    credentials are available, otherwise a hint to run the login command.
    """
    auth_data = load_token()

    if not auth_data:
        console.print(Panel(
            "[red]✗ Not authenticated[/red]\n\n"
            "Run [bold]alprina auth login[/bold] to authenticate",
            title="Authentication Status"
        ))
        return

    # The stored "user" entry may be null or malformed; treat that as empty
    # instead of failing on attribute access.
    user = auth_data.get("user")
    if not isinstance(user, dict):
        user = {}
    api_key = auth_data.get("token", "")

    # Show masked API key
    if api_key:
        masked_key = f"{api_key[:15]}...{api_key[-4:]}" if len(api_key) > 20 else "***"
    else:
        masked_key = "None"

    # "or" fallbacks also cover keys that are present but null.
    full_name = user.get('full_name') or 'N/A'
    email = user.get('email') or 'N/A'
    plan = str(user.get('tier') or 'free').title()

    console.print(Panel(
        f"[green]✓ Authenticated[/green]\n\n"
        f"Name: {full_name}\n"
        f"Email: {email}\n"
        f"Plan: {plan}\n"
        f"API Key: {masked_key}",
        title="Authentication Status"
    ))

305 

306 

def get_auth_headers() -> dict:
    """Build the Authorization header for API requests.

    Returns:
        {"Authorization": "Bearer <token>"} using the token from load_token().

    Raises:
        Exception: When load_token() yields no credentials; a hint to log in
            is printed first.
    """
    credentials = load_token()

    if credentials:
        return {"Authorization": f"Bearer {credentials['token']}"}

    console.print("[red]Not authenticated. Run 'alprina auth login' first.[/red]")
    raise Exception("Not authenticated")

316 

317 

def is_authenticated() -> bool:
    """Report whether a token file exists and load_token() yields credentials."""
    # NOTE(review): the file must exist even if load_token() could succeed
    # without it - confirm this is intended for key-only setups.
    if not TOKEN_FILE.exists():
        return False
    return load_token() is not None

321 

322 

def login_with_browser():
    """
    Browser-based OAuth flow (like GitHub CLI).
    Opens browser for user to authorize, polls for completion.

    Steps:
        1. POST ``{backend}/auth/device`` to obtain a device code, a user
           code and the verification URL.
        2. Show the user code and open the verification URL in a browser.
        3. Poll ``{backend}/auth/device/token`` every ``interval`` seconds
           until it returns 200, reports a terminal error, or the code's
           lifetime is used up.

    On success the returned "api_key" and "user" are saved and the welcome
    screen is shown. All failures are reported on the console; the function
    returns None in every case.
    """
    import time
    import webbrowser

    from rich.markup import escape

    backend_url = get_backend_url()

    try:
        # Step 1: Request device authorization
        console.print("\n[cyan]→[/cyan] Requesting device authorization...")

        response = httpx.post(f"{backend_url}/auth/device", timeout=10.0)

        if response.status_code != 200:
            console.print(f"[red]✗ Failed to request authorization: {response.status_code}[/red]")
            return

        data = response.json()
        device_code = data["device_code"]
        user_code = data["user_code"]
        verification_url = data["verification_url"]
        expires_in = int(data.get("expires_in", 900))
        # Clamp to at least one second: an interval of 0 would divide by
        # zero below and turn the polling loop into a busy loop.
        interval = max(1, int(data.get("interval", 5)))

        # Step 2: Display code and open browser
        console.print()
        console.print(Panel(
            f"[bold yellow]{user_code}[/bold yellow]",
            title="🔑 Your Verification Code",
            subtitle="Enter this code in your browser"
        ))
        console.print()
        console.print(f"[cyan]→[/cyan] Opening browser to: [dim]{verification_url}[/dim]")
        console.print("[dim]If browser doesn't open, visit manually[/dim]")
        console.print()

        # Open browser with code pre-filled (like GitHub CLI)
        url_with_code = f"{verification_url}?user_code={user_code}"
        try:
            # webbrowser.open signals most failures by returning False
            # rather than raising, so the result has to be checked.
            opened = webbrowser.open(url_with_code)
        except Exception:
            opened = False
        if not opened:
            console.print("[yellow]⚠️ Could not open browser automatically[/yellow]")
            console.print(f"[yellow]Please visit: {url_with_code}[/yellow]")

        # Step 3: Poll for authorization with progress indicator
        max_attempts = max(1, expires_in // interval)  # Usually 180 attempts (15 minutes)

        console.print()
        console.print("[dim]Tip: Make sure you're logged into the website first![/dim]")
        console.print("[dim]Press Ctrl+C to cancel[/dim]")
        console.print()

        try:
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                console=console,
                transient=False,
            ) as progress:
                task = progress.add_task("Waiting for authorization...", total=None)

                for _ in range(max_attempts):
                    time.sleep(interval)

                    # Only the request itself is best-effort: transient
                    # network problems print a dot and polling goes on.
                    try:
                        poll_response = httpx.post(
                            f"{backend_url}/auth/device/token",
                            json={"device_code": device_code},
                            timeout=10.0,
                        )
                    except httpx.ConnectError:
                        console.print("\n[red]✗ Could not connect to backend[/red]")
                        return
                    except Exception:
                        console.print(".", end="", style="dim")
                        continue

                    if poll_response.status_code == 200:
                        # ✓ Authorized!
                        progress.update(task, description="[green]✓ Authorization successful![/green]")

                        # Handled outside the request's try block so that a
                        # failure while saving is reported by the outer
                        # handler instead of being retried as a poll error.
                        auth_data = poll_response.json()
                        api_key = auth_data["api_key"]
                        user = auth_data["user"]

                        # Save token
                        save_token(api_key, user)

                        # Show welcome screen
                        console.print()
                        show_welcome(force=True)
                        return

                    if poll_response.status_code != 400:
                        # Other statuses are treated as "not ready yet".
                        continue

                    try:
                        detail = poll_response.json().get("detail", {})
                    except (ValueError, AttributeError):
                        # Unparseable error body: keep polling.
                        console.print(".", end="", style="dim")
                        continue

                    if not isinstance(detail, dict):
                        continue

                    error_code = detail.get("error")

                    if error_code == "authorization_pending":
                        # Still waiting...
                        console.print(".", end="", style="dim")
                        continue
                    if error_code == "expired_token":
                        console.print("\n[red]✗ Authorization expired. Please try again.[/red]")
                        return

                    console.print(f"\n[red]✗ Error: {escape(str(detail))}[/red]")
                    return

                console.print("\n[red]✗ Authorization timed out. Please try again.[/red]")
        except KeyboardInterrupt:
            console.print("\n\n[yellow]Authorization cancelled by user.[/yellow]")
            console.print("[dim]You can try again with: alprina auth login[/dim]")
            return

    except httpx.ConnectError:
        console.print(f"[red]✗ Could not connect to Alprina backend at {backend_url}[/red]")
        console.print("[yellow]Make sure the API server is running[/yellow]")
    except Exception as e:
        console.print(f"[red]✗ Error: {escape(str(e))}[/red]")