Coverage for agentos/server/daemon.py: 22%

347 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 16:01 +0800

1""" 

2AgentOS Server Daemon — Independent production server wrapper. 

3 

4Provides: 

5 - PID file management (start/stop/status/restart) 

6 - Signal handling (SIGTERM/SIGINT) with graceful shutdown 

7 - Background asyncio task queue 

8 - Structured file logging with rotation 

9 - Health check endpoint (/healthz auto-mounted) 

10 - Configuration via environment variables 

11 - Memory persistence (v1.14.9): auto-save on shutdown, auto-load on startup 

12 

13Usage: 

14 agentos-daemon start # start as daemon 

15 agentos-daemon stop # stop running daemon 

16 agentos-daemon status # check if daemon is running 

17 agentos-daemon restart # stop + start 

18 agentos-daemon run # run in foreground (debug) 

19 

20Environment: 

21 AGENTOS_DAEMON_HOST=0.0.0.0 

22 AGENTOS_DAEMON_PORT=8910 

23 AGENTOS_DAEMON_PIDFILE=~/.agentos/daemon.pid 

24 AGENTOS_DAEMON_LOGFILE=~/.agentos/daemon.log 

25 AGENTOS_DAEMON_WORKERS=4 

26 AGENTOS_DAEMON_TIMEOUT=30 # graceful shutdown timeout seconds 

27 AGENTOS_DAEMON_LOG_LEVEL=info 

28 AGENTOS_DAEMON_MEMORY_DIR=~/.agentos/memory # memory persistence dir 

29""" 

30 

31from __future__ import annotations 

32 

33import asyncio 

34import atexit 

35import json 

36import logging 

37import os 

38import signal 

39import sys 

40import time 

41from contextlib import asynccontextmanager 

42from dataclasses import dataclass, field 

43from pathlib import Path 

44from typing import Optional, Callable, Any 

45 

46import uvicorn 

47from fastapi import FastAPI, APIRouter 

48from fastapi.middleware.cors import CORSMiddleware 

49 

50from agentos.memory.persistence import MemoryPersistenceManager 

51 

52__all__ = [ 

53 "ServerDaemon", 

54 "DaemonConfig", 

55 "BackgroundTask", 

56 "get_daemon", 

57 "create_daemon_app", 

58 "daemon_main", 

59] 

60 

61# ── Config ────────────────────────────────────────────── 

62 

63 

64@dataclass 

65class DaemonConfig: 

66 """Server daemon configuration.""" 

67 

68 host: str = "0.0.0.0" 

69 port: int = 8910 

70 pidfile: str = "~/.agentos/daemon.pid" 

71 logfile: str = "~/.agentos/daemon.log" 

72 workers: int = 1 

73 shutdown_timeout: float = 30.0 

74 log_level: str = "info" 

75 log_max_bytes: int = 10 * 1024 * 1024 # 10 MB 

76 log_backup_count: int = 5 

77 memory_dir: str = "~/.agentos/memory" 

78 persist_memory: bool = True 

79 

80 @classmethod 

81 def from_env(cls) -> "DaemonConfig": 

82 """Load config from environment variables.""" 

83 return cls( 

84 host=os.getenv("AGENTOS_DAEMON_HOST", "0.0.0.0"), 

85 port=int(os.getenv("AGENTOS_DAEMON_PORT", "8910")), 

86 pidfile=os.path.expanduser( 

87 os.getenv("AGENTOS_DAEMON_PIDFILE", "~/.agentos/daemon.pid") 

88 ), 

89 logfile=os.path.expanduser( 

90 os.getenv("AGENTOS_DAEMON_LOGFILE", "~/.agentos/daemon.log") 

91 ), 

92 workers=int(os.getenv("AGENTOS_DAEMON_WORKERS", "1")), 

93 shutdown_timeout=float(os.getenv("AGENTOS_DAEMON_TIMEOUT", "30")), 

94 log_level=os.getenv("AGENTOS_DAEMON_LOG_LEVEL", "info"), 

95 memory_dir=os.path.expanduser( 

96 os.getenv("AGENTOS_DAEMON_MEMORY_DIR", "~/.agentos/memory") 

97 ), 

98 persist_memory=os.getenv("AGENTOS_DAEMON_PERSIST_MEMORY", "true").lower() 

99 not in ("0", "false", "no"), 

100 ) 

101 

102 

103# ── Background Task Queue ────────────────────────────── 

104 

105 

106@dataclass 

107class BackgroundTask: 

108 """A tracked background task.""" 

109 

110 task_id: str 

111 name: str 

112 created_at: float = field(default_factory=time.time) 

113 status: str = "pending" 

114 result: Any = None 

115 error: str | None = None 

116 

117 def to_dict(self) -> dict: 

118 return { 

119 "task_id": self.task_id, 

120 "name": self.name, 

121 "created_at": self.created_at, 

122 "status": self.status, 

123 "result": str(self.result)[:500] if self.result is not None else None, 

124 "error": self.error, 

125 } 

126 

127 

128class TaskQueue: 

129 """Minimal async background task queue (in-process, no external broker).""" 

130 

131 def __init__(self, max_history: int = 1000) -> None: 

132 self._tasks: dict[str, BackgroundTask] = {} 

133 self._max_history = max_history 

134 self._asyncio_tasks: dict[str, asyncio.Task] = {} 

135 self._counter = 0 

136 

137 def submit(self, name: str, coro) -> str: 

138 """Submit a coroutine for background execution. Returns task_id.""" 

139 self._counter += 1 

140 tid = f"task_{self._counter}_{int(time.time())}" 

141 bt = BackgroundTask(task_id=tid, name=name, status="running") 

142 bt.created_at = time.time() 

143 self._tasks[tid] = bt 

144 

145 async def _runner(): 

146 try: 

147 bt.result = await coro 

148 bt.status = "completed" 

149 except Exception as exc: 

150 bt.error = str(exc) 

151 bt.status = "failed" 

152 

153 self._asyncio_tasks[tid] = asyncio.create_task(_runner()) 

154 self._prune_history() 

155 return tid 

156 

157 def get(self, task_id: str) -> BackgroundTask | None: 

158 return self._tasks.get(task_id) 

159 

160 def list_tasks(self, limit: int = 50) -> list[BackgroundTask]: 

161 items = sorted( 

162 self._tasks.values(), key=lambda t: t.created_at, reverse=True 

163 ) 

164 return items[:limit] 

165 

166 def active_count(self) -> int: 

167 return sum(1 for t in self._tasks.values() if t.status == "running") 

168 

169 def _prune_history(self) -> None: 

170 if len(self._tasks) > self._max_history: 

171 completed = [ 

172 tid for tid, t in self._tasks.items() if t.status in ("completed", "failed") 

173 ] 

174 overflow = len(self._tasks) - self._max_history 

175 for tid in completed[:overflow]: 

176 del self._tasks[tid] 

177 self._asyncio_tasks.pop(tid, None) 

178 

179 async def shutdown(self, timeout: float = 10.0) -> None: 

180 """Cancel all running tasks with a timeout.""" 

181 if not self._asyncio_tasks: 

182 return 

183 for t in list(self._asyncio_tasks.values()): 

184 if not t.done(): 

185 t.cancel() 

186 try: 

187 await asyncio.wait_for( 

188 asyncio.gather(*self._asyncio_tasks.values(), return_exceptions=True), 

189 timeout=timeout, 

190 ) 

191 except asyncio.TimeoutError: 

192 pass 

193 

194 

195# ── Daemon Core ──────────────────────────────────────── 

196 

197 

198def _setup_file_logging(config: DaemonConfig) -> logging.Logger: 

199 """Configure rotating file logger.""" 

200 from logging.handlers import RotatingFileHandler 

201 

202 log_dir = Path(config.logfile).parent 

203 log_dir.mkdir(parents=True, exist_ok=True) 

204 

205 logger = logging.getLogger("agentos.daemon") 

206 logger.setLevel(getattr(logging, config.log_level.upper(), logging.INFO)) 

207 logger.propagate = False 

208 

209 # Remove existing handlers 

210 for h in list(logger.handlers): 

211 logger.removeHandler(h) 

212 

213 handler = RotatingFileHandler( 

214 config.logfile, 

215 maxBytes=config.log_max_bytes, 

216 backupCount=config.log_backup_count, 

217 encoding="utf-8", 

218 ) 

219 handler.setFormatter( 

220 logging.Formatter( 

221 "%(asctime)s [%(levelname)s] %(name)s: %(message)s", 

222 datefmt="%Y-%m-%d %H:%M:%S", 

223 ) 

224 ) 

225 logger.addHandler(handler) 

226 return logger 

227 

228 

229class ServerDaemon: 

230 """Independent server daemon wrapping any ASGI app. 

231 

232 Lifecycle: 

233 daemon = ServerDaemon(config, app_factory) 

234 daemon.start() # daemonize 

235 daemon.stop() # send SIGTERM 

236 daemon.status() # read PID file 

237 daemon.restart() # stop + start 

238 """ 

239 

240 def __init__( 

241 self, 

242 config: DaemonConfig | None = None, 

243 app_factory: Callable[[], FastAPI] | None = None, 

244 ) -> None: 

245 self.config = config or DaemonConfig.from_env() 

246 self._app_factory = app_factory or self._default_app_factory 

247 self._logger = _setup_file_logging(self.config) 

248 self.task_queue = TaskQueue() 

249 self._started_at: float | None = None 

250 self._shutdown_event: asyncio.Event | None = None 

251 

252 # Memory persistence (v1.14.9) 

253 self._persistence_mgr = MemoryPersistenceManager( 

254 base_dir=self.config.memory_dir, 

255 compress=True, 

256 ) 

257 self._memory_objects: dict[str, Any] = { 

258 "pyramid": None, 

259 "working": None, 

260 "conversation": None, 

261 "long_term": None, 

262 "reflection_engine": None, 

263 "consolidation_pipeline": None, 

264 } 

265 

266 # ── Memory Persistence API (v1.14.9) ────── 

267 

268 def register_memory( 

269 self, 

270 *, 

271 pyramid: Any = None, 

272 working: Any = None, 

273 conversation: Any = None, 

274 long_term: Any = None, 

275 reflection_engine: Any = None, 

276 consolidation_pipeline: Any = None, 

277 ) -> None: 

278 """Register memory objects for crash-safe persistence. 

279 

280 Objects must implement get_state() and restore_state(). 

281 On daemon shutdown, all registered objects are automatically saved 

282 to disk. On daemon startup, they are automatically restored. 

283 """ 

284 if pyramid is not None: 

285 self._memory_objects["pyramid"] = pyramid 

286 if working is not None: 

287 self._memory_objects["working"] = working 

288 if conversation is not None: 

289 self._memory_objects["conversation"] = conversation 

290 if long_term is not None: 

291 self._memory_objects["long_term"] = long_term 

292 if reflection_engine is not None: 

293 self._memory_objects["reflection_engine"] = reflection_engine 

294 if consolidation_pipeline is not None: 

295 self._memory_objects["consolidation_pipeline"] = consolidation_pipeline 

296 

297 self._logger.info( 

298 f"Registered {sum(1 for v in self._memory_objects.values() if v is not None)} " 

299 f"memory subsystems for persistence" 

300 ) 

301 

302 async def save_memory_snapshot(self) -> str | None: 

303 """Save current memory state to disk. Returns snapshot path or None.""" 

304 if not self.config.persist_memory: 

305 return None 

306 

307 mo = self._memory_objects 

308 try: 

309 path = await self._persistence_mgr.save_all( 

310 pyramid=mo["pyramid"], 

311 working=mo["working"], 

312 conversation=mo["conversation"], 

313 long_term=mo["long_term"], 

314 reflection_engine=mo["reflection_engine"], 

315 consolidation_pipeline=mo["consolidation_pipeline"], 

316 ) 

317 self._logger.info(f"Memory snapshot saved: {path}") 

318 return path 

319 except Exception as exc: 

320 self._logger.error(f"Failed to save memory snapshot: {exc}") 

321 return None 

322 

323 async def load_memory_snapshot(self) -> int: 

324 """Load memory state from disk and restore into registered objects. 

325 Returns count of subsystems restored. 

326 """ 

327 if not self.config.persist_memory: 

328 return 0 

329 

330 mo = self._memory_objects 

331 try: 

332 restored = await self._persistence_mgr.restore_all( 

333 pyramid=mo["pyramid"], 

334 working=mo["working"], 

335 conversation=mo["conversation"], 

336 long_term=mo["long_term"], 

337 reflection_engine=mo["reflection_engine"], 

338 consolidation_pipeline=mo["consolidation_pipeline"], 

339 ) 

340 if restored > 0: 

341 self._logger.info(f"Memory snapshot loaded: {restored} subsystems restored") 

342 return restored 

343 except Exception as exc: 

344 self._logger.error(f"Failed to load memory snapshot: {exc}") 

345 return 0 

346 

347 def memory_snapshot_info(self) -> dict[str, Any]: 

348 """Return metadata about the current memory snapshot on disk.""" 

349 return self._persistence_mgr.snapshot_info() 

350 

351 # ── PID file helpers ────────────────────── 

352 

353 def _read_pid(self) -> int | None: 

354 """Read PID from pidfile. Returns None if not running.""" 

355 path = Path(self.config.pidfile) 

356 if not path.exists(): 

357 return None 

358 try: 

359 pid = int(path.read_text().strip()) 

360 except (ValueError, OSError): 

361 return None 

362 # Check if process is actually running 

363 try: 

364 os.kill(pid, 0) 

365 return pid 

366 except (ProcessLookupError, PermissionError): 

367 return None 

368 

369 def _write_pid(self, pid: int) -> None: 

370 """Write PID to pidfile.""" 

371 path = Path(self.config.pidfile) 

372 path.parent.mkdir(parents=True, exist_ok=True) 

373 path.write_text(f"{pid}\n") 

374 

375 def _remove_pid(self) -> None: 

376 """Remove pidfile.""" 

377 path = Path(self.config.pidfile) 

378 if path.exists(): 

379 path.unlink() 

380 

381 # ── Status ──────────────────────────────── 

382 

383 def status(self) -> dict: 

384 """Get daemon status as a dict.""" 

385 pid = self._read_pid() 

386 running = pid is not None 

387 uptime = time.time() - self._started_at if self._started_at and running else 0 

388 return { 

389 "running": running, 

390 "pid": pid, 

391 "host": self.config.host, 

392 "port": self.config.port, 

393 "pidfile": self.config.pidfile, 

394 "logfile": self.config.logfile, 

395 "uptime_seconds": round(uptime, 1), 

396 "active_tasks": self.task_queue.active_count() if running else 0, 

397 } 

398 

399 # ── Start ───────────────────────────────── 

400 

401 def start(self, daemonize: bool = True) -> int: 

402 """Start the server. Returns PID if daemonized, 0 if foreground.""" 

403 if self._read_pid(): 

404 self._logger.warning("Daemon is already running.") 

405 print( 

406 f"Daemon already running (pid={self._read_pid()}) on " 

407 f"http://{self.config.host}:{self.config.port}" 

408 ) 

409 return self._read_pid() or 0 

410 

411 if daemonize: 

412 return self._daemonize() 

413 else: 

414 return self._run_foreground() 

415 

416 def _daemonize(self) -> int: 

417 """Fork into background daemon.""" 

418 pid = os.fork() 

419 if pid > 0: 

420 # Parent: wait briefly for child to start, then return 

421 time.sleep(0.5) 

422 child_pid = self._read_pid() 

423 if child_pid: 

424 self._logger.info(f"Daemon started (pid={child_pid})") 

425 print( 

426 f"Daemon started (pid={child_pid})\n" 

427 f" http://{self.config.host}:{self.config.port}\n" 

428 f" health: http://{self.config.host}:{self.config.port}/healthz\n" 

429 f" logs: {self.config.logfile}" 

430 ) 

431 return child_pid 

432 else: 

433 print("Failed to start daemon — check logs.") 

434 return 1 

435 

436 # Child process 

437 os.setsid() 

438 # Second fork to detach from session 

439 pid2 = os.fork() 

440 if pid2 > 0: 

441 os._exit(0) 

442 

443 # Grandchild: the actual daemon 

444 self._write_pid(os.getpid()) 

445 atexit.register(self._remove_pid) 

446 

447 # Redirect stdin/stdout/stderr 

448 devnull = os.open(os.devnull, os.O_RDWR) 

449 os.dup2(devnull, sys.stdin.fileno()) 

450 os.dup2(devnull, sys.stdout.fileno()) 

451 os.dup2(devnull, sys.stderr.fileno()) 

452 if devnull > 2: 

453 os.close(devnull) 

454 

455 self._started_at = time.time() 

456 self._logger.info( 

457 f"Daemon starting on http://{self.config.host}:{self.config.port}" 

458 ) 

459 self._run_server() 

460 return 0 

461 

462 def _run_foreground(self) -> int: 

463 """Run in foreground (debug mode).""" 

464 self._write_pid(os.getpid()) 

465 atexit.register(self._remove_pid) 

466 self._started_at = time.time() 

467 print( 

468 f"Running in foreground on http://{self.config.host}:{self.config.port}\n" 

469 f" health: http://{self.config.host}:{self.config.port}/healthz\n" 

470 f" press Ctrl+C to stop" 

471 ) 

472 self._run_server() 

473 return 0 

474 

475 def _run_server(self) -> None: 

476 """Run the uvicorn server (blocking).""" 

477 app = self._build_app() 

478 uvicorn.run( 

479 app, 

480 host=self.config.host, 

481 port=self.config.port, 

482 log_level=self.config.log_level, 

483 workers=self.config.workers if self.config.workers > 1 else None, 

484 timeout_graceful_shutdown=self.config.shutdown_timeout, 

485 ) 

486 

487 # ── Stop ────────────────────────────────── 

488 

489 def stop(self) -> bool: 

490 """Stop the running daemon via SIGTERM.""" 

491 pid = self._read_pid() 

492 if not pid: 

493 print("No running daemon found.") 

494 return False 

495 

496 self._logger.info(f"Stopping daemon (pid={pid})") 

497 print(f"Stopping daemon (pid={pid})...") 

498 try: 

499 os.kill(pid, signal.SIGTERM) 

500 except ProcessLookupError: 

501 self._remove_pid() 

502 print("Daemon already stopped.") 

503 return True 

504 

505 # Wait for graceful shutdown 

506 timeout = self.config.shutdown_timeout 

507 for _ in range(int(timeout * 2)): 

508 time.sleep(0.5) 

509 try: 

510 os.kill(pid, 0) 

511 except ProcessLookupError: 

512 self._remove_pid() 

513 print("Daemon stopped.") 

514 return True 

515 

516 # Force kill 

517 print(f"Daemon did not stop within {timeout}s, sending SIGKILL...") 

518 try: 

519 os.kill(pid, signal.SIGKILL) 

520 except ProcessLookupError: 

521 pass 

522 self._remove_pid() 

523 print("Daemon force-stopped.") 

524 return True 

525 

526 # ── Restart ─────────────────────────────── 

527 

528 def restart(self, daemonize: bool = True) -> int: 

529 """Stop then start the daemon.""" 

530 self.stop() 

531 time.sleep(1) 

532 return self.start(daemonize=daemonize) 

533 

534 # ── App building ────────────────────────── 

535 

536 def _default_app_factory(self) -> FastAPI: 

537 """Default app: minimal standalone with health check.""" 

538 return create_daemon_app(self.task_queue) 

539 

540 def _build_app(self) -> FastAPI: 

541 """Build the FastAPI application with memory persistence hooks.""" 

542 app = self._app_factory() 

543 

544 # Inject daemon state into app 

545 app.state.daemon = self 

546 app.state.task_queue = self.task_queue 

547 

548 # Patch lifespan to add memory persistence hooks (v1.14.9) 

549 _original_lifespan = getattr(app.router, "lifespan_context", None) 

550 

551 @asynccontextmanager 

552 async def memory_lifespan(app: FastAPI): 

553 """Wrap existing lifespan with memory save/load.""" 

554 # Load on startup 

555 restored = await self.load_memory_snapshot() 

556 if restored > 0: 

557 self._logger.info(f"Loaded {restored} memory subsystems from snapshot") 

558 

559 # Execute original lifespan 

560 if _original_lifespan is not None: 

561 async with _original_lifespan(app): 

562 pass 

563 

564 # Yield to the app 

565 yield 

566 

567 # Save on shutdown 

568 await self.save_memory_snapshot() 

569 self._logger.info("Memory snapshot saved on shutdown") 

570 

571 app.router.lifespan_context = memory_lifespan 

572 

573 # Ensure /healthz endpoint exists 

574 has_healthz = any( 

575 any(r.path == "/healthz" for r in router.routes) 

576 for router in app.router.routes 

577 if hasattr(router, "routes") 

578 ) 

579 # Also check top-level routes 

580 has_healthz = has_healthz or any( 

581 getattr(r, "path", "") == "/healthz" for r in app.routes 

582 ) 

583 

584 if not has_healthz: 

585 health_router = _make_health_router(self, self.task_queue) 

586 app.include_router(health_router) 

587 

588 return app 

589 

590 

591# ── Health / API routes ───────────────────────────────── 

592 

593 

594def _make_health_router(daemon: ServerDaemon, tq: TaskQueue) -> APIRouter: 

595 """Create the health check and management router.""" 

596 router = APIRouter(tags=["daemon"]) 

597 

598 @router.get("/healthz") 

599 async def healthz(): 

600 """Kubernetes-style health check.""" 

601 return { 

602 "status": "healthy", 

603 "uptime_seconds": round(time.time() - (daemon._started_at or time.time()), 1), 

604 "active_tasks": tq.active_count(), 

605 } 

606 

607 @router.get("/healthz/ready") 

608 async def ready(): 

609 """Readiness check.""" 

610 return { 

611 "status": "ready", 

612 "active_tasks": tq.active_count(), 

613 } 

614 

615 @router.get("/api/daemon/status") 

616 async def daemon_status(): 

617 """Full daemon status.""" 

618 return daemon.status() 

619 

620 @router.get("/api/daemon/tasks") 

621 async def list_tasks(limit: int = 50): 

622 """List background tasks.""" 

623 tasks = [t.to_dict() for t in tq.list_tasks(limit)] 

624 return {"count": len(tasks), "active": tq.active_count(), "tasks": tasks} 

625 

626 @router.get("/api/daemon/memory") 

627 async def memory_status(): 

628 """Memory persistence status.""" 

629 info = daemon.memory_snapshot_info() 

630 return { 

631 "persistence_enabled": daemon.config.persist_memory, 

632 "memory_dir": daemon.config.memory_dir, 

633 "snapshot": info, 

634 } 

635 

636 return router 

637 

638 

639# ── Convenience factory ──────────────────────────────── 

640 

641 

642def create_daemon_app(task_queue: TaskQueue | None = None) -> FastAPI: 

643 """Create a minimal standalone daemon FastAPI app.""" 

644 tq = task_queue or TaskQueue() 

645 

646 @asynccontextmanager 

647 async def lifespan(app: FastAPI): 

648 """Handle startup and shutdown.""" 

649 yield 

650 await tq.shutdown(timeout=10.0) 

651 

652 app = FastAPI( 

653 title="AgentOS Daemon", 

654 version="1.14.9", 

655 lifespan=lifespan, 

656 ) 

657 app.add_middleware( 

658 CORSMiddleware, 

659 allow_origins=["*"], 

660 allow_credentials=True, 

661 allow_methods=["*"], 

662 allow_headers=["*"], 

663 ) 

664 

665 @app.get("/") 

666 async def root(): 

667 return { 

668 "service": "AgentOS Daemon", 

669 "version": "1.14.9", 

670 "endpoints": { 

671 "health": "/healthz", 

672 "ready": "/healthz/ready", 

673 "status": "/api/daemon/status", 

674 "tasks": "/api/daemon/tasks", 

675 }, 

676 } 

677 

678 return app 

679 

680 

681# ── Module-level singleton ───────────────────────────── 

682 

683_daemon_instance: ServerDaemon | None = None 

684 

685 

686def get_daemon(config: DaemonConfig | None = None) -> ServerDaemon: 

687 """Get or create the module-level daemon singleton.""" 

688 global _daemon_instance 

689 if _daemon_instance is None: 

690 _daemon_instance = ServerDaemon(config=config) 

691 return _daemon_instance 

692 

693 

694# ── Main entry point ────────────────────────────────── 

695 

696 

697def daemon_main(args: list[str] | None = None) -> int: 

698 """CLI entry point for daemon commands.""" 

699 import argparse 

700 

701 parser = argparse.ArgumentParser( 

702 prog="agentos-daemon", 

703 description="AgentOS Independent Server Daemon", 

704 ) 

705 sub = parser.add_subparsers(dest="command", required=True) 

706 

707 sub.add_parser("start", help="Start daemon in background") 

708 sub.add_parser("run", help="Run in foreground") 

709 sub.add_parser("stop", help="Stop running daemon") 

710 sub.add_parser("restart", help="Stop then start") 

711 sub.add_parser("status", help="Show daemon status") 

712 

713 ns = parser.parse_args(args) 

714 

715 daemon = get_daemon() 

716 

717 if ns.command == "start": 

718 return daemon.start(daemonize=True) 

719 elif ns.command == "run": 

720 return daemon.start(daemonize=False) 

721 elif ns.command == "stop": 

722 return 0 if daemon.stop() else 1 

723 elif ns.command == "restart": 

724 return daemon.restart() 

725 elif ns.command == "status": 

726 s = daemon.status() 

727 if s["running"]: 

728 print( 

729 f"RUNNING (pid={s['pid']})\n" 

730 f" url: http://{s['host']}:{s['port']}\n" 

731 f" uptime: {s['uptime_seconds']}s\n" 

732 f" tasks: {s['active_tasks']} active" 

733 ) 

734 else: 

735 print("STOPPED") 

736 return 0 

737 

738 return 1 

739 

740 

741if __name__ == "__main__": 

742 sys.exit(daemon_main())