Coverage for agentos/server/daemon.py: 22%
347 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 16:01 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 16:01 +0800
1"""
2AgentOS Server Daemon — Independent production server wrapper.
4Provides:
5 - PID file management (start/stop/status/restart)
6 - Signal handling (SIGTERM/SIGINT) with graceful shutdown
7 - Background asyncio task queue
8 - Structured file logging with rotation
9 - Health check endpoint (/healthz auto-mounted)
10 - Configuration via environment variables
11 - Memory persistence (v1.14.9): auto-save on shutdown, auto-load on startup
13Usage:
14 agentos-daemon start # start as daemon
15 agentos-daemon stop # stop running daemon
16 agentos-daemon status # check if daemon is running
17 agentos-daemon restart # stop + start
18 agentos-daemon run # run in foreground (debug)
20Environment:
21 AGENTOS_DAEMON_HOST=0.0.0.0
22 AGENTOS_DAEMON_PORT=8910
23 AGENTOS_DAEMON_PIDFILE=~/.agentos/daemon.pid
24 AGENTOS_DAEMON_LOGFILE=~/.agentos/daemon.log
25 AGENTOS_DAEMON_WORKERS=1
26 AGENTOS_DAEMON_TIMEOUT=30 # graceful shutdown timeout seconds
27 AGENTOS_DAEMON_LOG_LEVEL=info
28 AGENTOS_DAEMON_MEMORY_DIR=~/.agentos/memory # memory persistence dir
 AGENTOS_DAEMON_PERSIST_MEMORY=true # 0/false/no disables memory persistence
29"""
31from __future__ import annotations
33import asyncio
34import atexit
35import json
36import logging
37import os
38import signal
39import sys
40import time
41from contextlib import asynccontextmanager
42from dataclasses import dataclass, field
43from pathlib import Path
44from typing import Optional, Callable, Any
46import uvicorn
47from fastapi import FastAPI, APIRouter
48from fastapi.middleware.cors import CORSMiddleware
50from agentos.memory.persistence import MemoryPersistenceManager
# Public API of this module. TaskQueue is exported because it appears in the
# public signature of create_daemon_app() and as ServerDaemon.task_queue.
__all__ = [
    "ServerDaemon",
    "DaemonConfig",
    "BackgroundTask",
    "TaskQueue",
    "get_daemon",
    "create_daemon_app",
    "daemon_main",
]
61# ── Config ──────────────────────────────────────────────
@dataclass
class DaemonConfig:
    """Server daemon configuration.

    Path fields (``pidfile``, ``logfile``, ``memory_dir``) get ``~`` expanded
    in ``__post_init__``. Values passed straight to the constructor therefore
    behave the same as values loaded by :meth:`from_env`. ``log_level`` is
    normalised to lower case, the form uvicorn accepts.
    """

    host: str = "0.0.0.0"
    port: int = 8910
    pidfile: str = "~/.agentos/daemon.pid"
    logfile: str = "~/.agentos/daemon.log"
    workers: int = 1
    shutdown_timeout: float = 30.0  # seconds
    log_level: str = "info"
    log_max_bytes: int = 10 * 1024 * 1024  # 10 MB
    log_backup_count: int = 5
    memory_dir: str = "~/.agentos/memory"
    persist_memory: bool = True

    def __post_init__(self) -> None:
        # Without expansion, Path("~/...") is a *relative* path. mkdir() would
        # then create a directory literally named "~" in the current
        # working directory.
        self.pidfile = os.path.expanduser(self.pidfile)
        self.logfile = os.path.expanduser(self.logfile)
        self.memory_dir = os.path.expanduser(self.memory_dir)
        # uvicorn looks level names up in a lower-case table ("info", "debug").
        self.log_level = self.log_level.lower()

    @classmethod
    def from_env(cls) -> "DaemonConfig":
        """Build a config from ``AGENTOS_DAEMON_*`` environment variables.

        Unset variables fall back to the same defaults as the dataclass.
        ``AGENTOS_DAEMON_PERSIST_MEMORY`` disables persistence when it is
        ``0``, ``false``, ``no`` or ``off``. The check ignores case and
        surrounding whitespace.

        Raises:
            ValueError: if PORT, WORKERS or TIMEOUT are not numeric.
        """
        persist_flag = os.getenv("AGENTOS_DAEMON_PERSIST_MEMORY", "true")
        return cls(
            host=os.getenv("AGENTOS_DAEMON_HOST", "0.0.0.0"),
            port=int(os.getenv("AGENTOS_DAEMON_PORT", "8910")),
            pidfile=os.getenv("AGENTOS_DAEMON_PIDFILE", "~/.agentos/daemon.pid"),
            logfile=os.getenv("AGENTOS_DAEMON_LOGFILE", "~/.agentos/daemon.log"),
            workers=int(os.getenv("AGENTOS_DAEMON_WORKERS", "1")),
            shutdown_timeout=float(os.getenv("AGENTOS_DAEMON_TIMEOUT", "30")),
            log_level=os.getenv("AGENTOS_DAEMON_LOG_LEVEL", "info"),
            memory_dir=os.getenv("AGENTOS_DAEMON_MEMORY_DIR", "~/.agentos/memory"),
            persist_memory=persist_flag.strip().lower()
            not in ("0", "false", "no", "off"),
        )
103# ── Background Task Queue ──────────────────────────────
@dataclass
class BackgroundTask:
    """Bookkeeping record for one coroutine submitted to the task queue."""

    task_id: str
    name: str
    created_at: float = field(default_factory=time.time)  # epoch seconds
    status: str = "pending"  # e.g. "pending", "running", "completed", "failed"
    result: Any = None
    error: str | None = None

    def to_dict(self) -> dict:
        """Return a JSON-friendly view of the record.

        ``result`` is stringified and truncated to 500 characters. A ``None``
        result stays ``None``.
        """
        if self.result is None:
            shown_result = None
        else:
            shown_result = str(self.result)[:500]
        return dict(
            task_id=self.task_id,
            name=self.name,
            created_at=self.created_at,
            status=self.status,
            result=shown_result,
            error=self.error,
        )
class TaskQueue:
    """Minimal async background task queue (in-process, no external broker).

    Each submitted coroutine runs as an :class:`asyncio.Task` and is tracked
    by a :class:`BackgroundTask` record. Once more than ``max_history``
    records exist, the oldest *finished* ones are discarded on the next
    submit.
    """

    # Terminal states; only records in one of these states are pruned.
    _FINISHED = ("completed", "failed", "cancelled")

    def __init__(self, max_history: int = 1000) -> None:
        self._tasks: dict[str, BackgroundTask] = {}
        self._max_history = max_history
        self._asyncio_tasks: dict[str, asyncio.Task] = {}
        self._counter = 0

    def submit(self, name: str, coro) -> str:
        """Schedule the awaitable *coro* on the running loop; return its task id.

        Must be called while an event loop is running, because
        ``asyncio.create_task`` requires one.
        """
        self._counter += 1
        tid = f"task_{self._counter}_{int(time.time())}"
        record = BackgroundTask(task_id=tid, name=name, status="running")
        self._tasks[tid] = record

        async def _runner() -> None:
            try:
                record.result = await coro
            except Exception as exc:
                record.error = str(exc)
                record.status = "failed"
            else:
                record.status = "completed"

        def _on_done(task: asyncio.Task) -> None:
            # Cancellation raises CancelledError, a BaseException since 3.8,
            # so _runner's handler never sees it. If the task is cancelled
            # before its first step, _runner never even starts. Without this
            # callback the record would stay "running" forever and keep
            # counting in active_count().
            if task.cancelled() and record.status == "running":
                record.status = "cancelled"
                record.error = "cancelled"

        task = asyncio.create_task(_runner())
        task.add_done_callback(_on_done)
        self._asyncio_tasks[tid] = task
        self._prune_history()
        return tid

    def get(self, task_id: str) -> BackgroundTask | None:
        """Return the record for *task_id*, or None if unknown or pruned."""
        return self._tasks.get(task_id)

    def list_tasks(self, limit: int = 50) -> list[BackgroundTask]:
        """Return up to *limit* records, newest first."""
        items = sorted(
            self._tasks.values(), key=lambda t: t.created_at, reverse=True
        )
        return items[:limit]

    def active_count(self) -> int:
        """Number of records currently in the "running" state."""
        return sum(1 for t in self._tasks.values() if t.status == "running")

    def _prune_history(self) -> None:
        """Drop the oldest finished records once the history limit is exceeded."""
        overflow = len(self._tasks) - self._max_history
        if overflow <= 0:
            return
        # Dicts keep insertion order, so this list runs oldest-first.
        finished = [
            tid for tid, t in self._tasks.items() if t.status in self._FINISHED
        ]
        for tid in finished[:overflow]:
            del self._tasks[tid]
            self._asyncio_tasks.pop(tid, None)

    async def shutdown(self, timeout: float = 10.0) -> None:
        """Cancel unfinished tasks and wait up to *timeout* seconds for them."""
        pending = [t for t in self._asyncio_tasks.values() if not t.done()]
        if not pending:
            return
        for t in pending:
            t.cancel()
        try:
            await asyncio.wait_for(
                asyncio.gather(*pending, return_exceptions=True),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            # Best effort: tasks that ignore cancellation are abandoned so
            # shutdown cannot hang indefinitely.
            pass
195# ── Daemon Core ────────────────────────────────────────
198def _setup_file_logging(config: DaemonConfig) -> logging.Logger:
199 """Configure rotating file logger."""
200 from logging.handlers import RotatingFileHandler
202 log_dir = Path(config.logfile).parent
203 log_dir.mkdir(parents=True, exist_ok=True)
205 logger = logging.getLogger("agentos.daemon")
206 logger.setLevel(getattr(logging, config.log_level.upper(), logging.INFO))
207 logger.propagate = False
209 # Remove existing handlers
210 for h in list(logger.handlers):
211 logger.removeHandler(h)
213 handler = RotatingFileHandler(
214 config.logfile,
215 maxBytes=config.log_max_bytes,
216 backupCount=config.log_backup_count,
217 encoding="utf-8",
218 )
219 handler.setFormatter(
220 logging.Formatter(
221 "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
222 datefmt="%Y-%m-%d %H:%M:%S",
223 )
224 )
225 logger.addHandler(handler)
226 return logger
class ServerDaemon:
    """Independent server daemon wrapping any ASGI app.

    Lifecycle::

        daemon = ServerDaemon(config, app_factory)
        daemon.start()    # double-fork into the background
        daemon.stop()     # SIGTERM, escalating to SIGKILL
        daemon.status()   # inspect the PID file
        daemon.restart()  # stop + start

    Args:
        config: Daemon settings. ``DaemonConfig.from_env()`` is used when
            omitted.
        app_factory: Zero-argument callable returning the FastAPI app to
            serve. Defaults to :func:`create_daemon_app` wired to this
            daemon's task queue.
    """

    # Keyword names accepted by register_memory() and forwarded to
    # MemoryPersistenceManager.save_all() / restore_all().
    _MEMORY_SLOTS = (
        "pyramid",
        "working",
        "conversation",
        "long_term",
        "reflection_engine",
        "consolidation_pipeline",
    )

    # Extra seconds stop() waits beyond config.shutdown_timeout before SIGKILL.
    # uvicorn may spend the whole shutdown_timeout draining connections
    # (timeout_graceful_shutdown) *before* the lifespan shutdown runs, which
    # includes task-queue cancellation and the memory snapshot. Waiting
    # exactly shutdown_timeout could therefore kill the daemon mid-save.
    _STOP_GRACE_SECONDS = 15.0

    def __init__(
        self,
        config: DaemonConfig | None = None,
        app_factory: Callable[[], FastAPI] | None = None,
    ) -> None:
        self.config = config if config is not None else DaemonConfig.from_env()
        self._app_factory = app_factory or self._default_app_factory
        self._logger = _setup_file_logging(self.config)
        self.task_queue = TaskQueue()
        # Only set in the process that actually runs the server.
        self._started_at: float | None = None
        self._shutdown_event: asyncio.Event | None = None

        # Memory persistence (v1.14.9)
        self._persistence_mgr = MemoryPersistenceManager(
            base_dir=self.config.memory_dir,
            compress=True,
        )
        self._memory_objects: dict[str, Any] = dict.fromkeys(self._MEMORY_SLOTS)

    # ── Memory Persistence API (v1.14.9) ──────

    def register_memory(
        self,
        *,
        pyramid: Any = None,
        working: Any = None,
        conversation: Any = None,
        long_term: Any = None,
        reflection_engine: Any = None,
        consolidation_pipeline: Any = None,
    ) -> None:
        """Register memory objects for crash-safe persistence.

        Objects must implement get_state() and restore_state(). Registered
        objects are restored when the served app starts and saved when it
        shuts down. Passing None for a slot leaves any earlier registration
        in place.
        """
        supplied = {
            "pyramid": pyramid,
            "working": working,
            "conversation": conversation,
            "long_term": long_term,
            "reflection_engine": reflection_engine,
            "consolidation_pipeline": consolidation_pipeline,
        }
        for slot, obj in supplied.items():
            if obj is not None:
                self._memory_objects[slot] = obj

        count = sum(1 for obj in self._memory_objects.values() if obj is not None)
        self._logger.info(f"Registered {count} memory subsystems for persistence")

    def _memory_kwargs(self) -> dict[str, Any]:
        """Registered objects keyed by persistence-manager keyword name."""
        return {slot: self._memory_objects[slot] for slot in self._MEMORY_SLOTS}

    async def save_memory_snapshot(self) -> str | None:
        """Save the registered memory state to disk.

        Returns:
            The snapshot path reported by the persistence manager. Returns
            None when persistence is disabled or the save failed; a failure
            is logged with its traceback and not raised.
        """
        if not self.config.persist_memory:
            return None
        try:
            path = await self._persistence_mgr.save_all(**self._memory_kwargs())
        except Exception as exc:
            self._logger.error(f"Failed to save memory snapshot: {exc}", exc_info=True)
            return None
        self._logger.info(f"Memory snapshot saved: {path}")
        return path

    async def load_memory_snapshot(self) -> int:
        """Restore disk state into the registered objects.

        Returns:
            The number of subsystems restored. Returns 0 when persistence is
            disabled or loading failed; a failure is logged, not raised.
        """
        if not self.config.persist_memory:
            return 0
        try:
            restored = await self._persistence_mgr.restore_all(**self._memory_kwargs())
        except Exception as exc:
            self._logger.error(f"Failed to load memory snapshot: {exc}", exc_info=True)
            return 0
        if restored > 0:
            self._logger.info(f"Memory snapshot loaded: {restored} subsystems restored")
        return restored

    def memory_snapshot_info(self) -> dict[str, Any]:
        """Return metadata about the current memory snapshot on disk."""
        return self._persistence_mgr.snapshot_info()

    # ── PID file helpers ──────────────────────

    def _read_pid(self) -> int | None:
        """Return the PID recorded in the pidfile if that process is alive.

        Returns None in these cases:
        - the file is missing or unreadable;
        - the content is not a positive integer;
        - no such process exists;
        - the process cannot be signalled by this user.
        """
        try:
            pid = int(Path(self.config.pidfile).read_text().strip())
        except (ValueError, OSError):
            return None
        if pid <= 0:
            # 0 and negative values address process *groups* in os.kill().
            # Accepting them would let stop() signal every process we own.
            return None
        try:
            os.kill(pid, 0)  # signal 0: existence/permission probe only
        except (ProcessLookupError, PermissionError):
            # PermissionError: the PID belongs to another user, so it is
            # treated as not ours (most likely a reused PID).
            return None
        return pid

    def _write_pid(self, pid: int) -> None:
        """Write *pid* to the pidfile, creating its directory if needed."""
        path = Path(self.config.pidfile)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(f"{pid}\n")

    def _remove_pid(self) -> None:
        """Delete the pidfile; a missing file is not an error."""
        try:
            Path(self.config.pidfile).unlink()
        except FileNotFoundError:
            pass

    def _pidfile_mtime(self) -> float | None:
        """Modification time of the pidfile, or None if it cannot be read."""
        try:
            return Path(self.config.pidfile).stat().st_mtime
        except OSError:
            return None

    @staticmethod
    def _is_alive(pid: int) -> bool:
        """True while *pid* exists, including processes we may not signal."""
        try:
            os.kill(pid, 0)
        except ProcessLookupError:
            return False
        except PermissionError:
            return True
        return True

    # ── Status ────────────────────────────────

    def status(self) -> dict:
        """Return the daemon status as a dict.

        Keys: running, pid, host, port, pidfile, logfile, uptime_seconds and
        active_tasks. status() is often called from another process, e.g.
        the ``status`` CLI command, where _started_at was never set. In that
        case uptime is estimated from the pidfile's modification time, since
        the daemon writes the file when it starts. active_tasks only reflects
        this process's queue.
        """
        pid = self._read_pid()
        running = pid is not None
        uptime: float = 0
        if running:
            started = self._started_at or self._pidfile_mtime()
            if started:
                uptime = max(0.0, time.time() - started)
        return {
            "running": running,
            "pid": pid,
            "host": self.config.host,
            "port": self.config.port,
            "pidfile": self.config.pidfile,
            "logfile": self.config.logfile,
            "uptime_seconds": round(uptime, 1),
            "active_tasks": self.task_queue.active_count() if running else 0,
        }

    # ── Start ─────────────────────────────────

    def start(self, daemonize: bool = True) -> int:
        """Start the server.

        Returns:
            The PID of an already-running daemon, or the result of
            :meth:`_daemonize` / :meth:`_run_foreground` (0 for a foreground
            run).
        """
        existing = self._read_pid()
        if existing:
            self._logger.warning("Daemon is already running.")
            print(
                f"Daemon already running (pid={existing}) on "
                f"http://{self.config.host}:{self.config.port}"
            )
            return existing

        if daemonize:
            return self._daemonize()
        return self._run_foreground()

    def _daemonize(self) -> int:
        """Double-fork into a background daemon (POSIX only).

        In the calling process this returns the daemon's PID, read back from
        the pidfile, or 1 if no live daemon is found. In the daemon process
        it returns 0 only after the server has exited.
        """
        # Flush buffered output so the forked children do not emit it again.
        for stream in (sys.stdout, sys.stderr):
            if stream is not None:
                stream.flush()

        pid = os.fork()
        if pid > 0:
            # Original process: reap the short-lived intermediate child so it
            # does not linger as a zombie. Then give the daemon a moment to
            # write its pidfile.
            try:
                os.waitpid(pid, 0)
            except ChildProcessError:
                pass
            time.sleep(0.5)
            child_pid = self._read_pid()
            if child_pid:
                self._logger.info(f"Daemon started (pid={child_pid})")
                print(
                    f"Daemon started (pid={child_pid})\n"
                    f"  http://{self.config.host}:{self.config.port}\n"
                    f"  health: http://{self.config.host}:{self.config.port}/healthz\n"
                    f"  logs: {self.config.logfile}"
                )
                return child_pid
            print("Failed to start daemon — check logs.")
            return 1

        # Intermediate child: start a new session, then fork again so the
        # daemon is not a session leader and cannot reacquire a terminal.
        try:
            os.setsid()
            if os.fork() > 0:
                os._exit(0)
        except OSError:
            # Never fall back into the caller's code path from a forked child.
            os._exit(1)

        # Grandchild: the actual daemon.
        self._write_pid(os.getpid())
        atexit.register(self._remove_pid)

        # Point stdin/stdout/stderr at /dev/null.
        devnull = os.open(os.devnull, os.O_RDWR)
        for fd in (0, 1, 2):
            os.dup2(devnull, fd)
        if devnull > 2:
            os.close(devnull)

        self._started_at = time.time()
        self._logger.info(
            f"Daemon starting on http://{self.config.host}:{self.config.port}"
        )
        self._run_server()
        return 0

    def _run_foreground(self) -> int:
        """Run the server in this process (debug mode); returns 0 on exit."""
        self._write_pid(os.getpid())
        atexit.register(self._remove_pid)
        self._started_at = time.time()
        print(
            f"Running in foreground on http://{self.config.host}:{self.config.port}\n"
            f"  health: http://{self.config.host}:{self.config.port}/healthz\n"
            f"  press Ctrl+C to stop"
        )
        self._run_server()
        return 0

    def _run_server(self) -> None:
        """Build the app and serve it with uvicorn (blocks until shutdown)."""
        app = self._build_app()
        if self.config.workers > 1:
            # uvicorn only supports workers > 1 when the app is given as an
            # import string. Given an app object it logs a warning and exits,
            # so serve with one worker instead of failing to start.
            self._logger.warning(
                f"workers={self.config.workers} requires an import-string app; "
                "running a single worker"
            )
        uvicorn.run(
            app,
            host=self.config.host,
            port=self.config.port,
            log_level=self.config.log_level.lower(),
            timeout_graceful_shutdown=self.config.shutdown_timeout,
        )

    # ── Stop ──────────────────────────────────

    def stop(self) -> bool:
        """Stop the running daemon.

        Sends SIGTERM and waits for the process to exit. If it is still alive
        after ``shutdown_timeout`` plus a grace period, it is sent SIGKILL.

        Returns:
            False if no running daemon was found, otherwise True.
        """
        pid = self._read_pid()
        if not pid:
            print("No running daemon found.")
            return False

        self._logger.info(f"Stopping daemon (pid={pid})")
        print(f"Stopping daemon (pid={pid})...")
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            self._remove_pid()
            print("Daemon already stopped.")
            return True

        wait_seconds = self.config.shutdown_timeout + self._STOP_GRACE_SECONDS
        deadline = time.monotonic() + wait_seconds
        while time.monotonic() < deadline:
            time.sleep(0.5)
            if not self._is_alive(pid):
                self._remove_pid()
                print("Daemon stopped.")
                return True

        print(f"Daemon did not stop within {wait_seconds}s, sending SIGKILL...")
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        self._remove_pid()
        print("Daemon force-stopped.")
        return True

    # ── Restart ───────────────────────────────

    def restart(self, daemonize: bool = True) -> int:
        """Stop the daemon (if running), pause briefly, then start it again."""
        self.stop()
        time.sleep(1)
        return self.start(daemonize=daemonize)

    # ── App building ──────────────────────────

    def _default_app_factory(self) -> FastAPI:
        """Default app: minimal standalone with health check."""
        return create_daemon_app(self.task_queue)

    @staticmethod
    def _has_route(app: Any, path: str) -> bool:
        """True if *app* serves *path*, including routes inside Mounts.

        A mounted sub-app's routes are compared with the mount prefix
        prepended, so "/healthz" inside a Mount at "/x" counts as "/x/healthz".
        """
        for route in app.routes:
            prefix = getattr(route, "path", "")
            if prefix == path:
                return True
            children = getattr(route, "routes", None) or []
            if any(prefix + getattr(child, "path", "") == path for child in children):
                return True
        return False

    def _build_app(self) -> FastAPI:
        """Build the FastAPI app and add memory persistence and health routes."""
        app = self._app_factory()

        # Expose daemon state to request handlers.
        app.state.daemon = self
        app.state.task_queue = self.task_queue

        # Wrap the existing lifespan with memory load/save (v1.14.9).
        original_lifespan = getattr(app.router, "lifespan_context", None)
        daemon = self

        @asynccontextmanager
        async def memory_lifespan(app: FastAPI):
            """Restore memory, run the app's own lifespan, then save memory.

            The original lifespan must stay open for the whole serving
            period. Its shutdown code (e.g. task-queue cancellation) runs
            only after the server stops. Its yielded state is passed through
            to the router.
            """
            await daemon.load_memory_snapshot()
            try:
                if original_lifespan is None:
                    yield
                else:
                    async with original_lifespan(app) as state:
                        yield state
            finally:
                # Save even if the app's own shutdown raised.
                await daemon.save_memory_snapshot()

        app.router.lifespan_context = memory_lifespan

        if not self._has_route(app, "/healthz"):
            app.include_router(_make_health_router(self, self.task_queue))

        return app
591# ── Health / API routes ─────────────────────────────────
def _make_health_router(daemon: ServerDaemon, tq: TaskQueue) -> APIRouter:
    """Create the health check and management router.

    Routes: ``/healthz``, ``/healthz/ready``, ``/api/daemon/status``,
    ``/api/daemon/tasks`` and ``/api/daemon/memory``.
    """
    router = APIRouter(tags=["daemon"])

    @router.get("/healthz")
    async def healthz():
        """Kubernetes-style health check."""
        # Read the clock once; otherwise an unset start time yields a tiny
        # nonzero "uptime" from two separate time.time() calls.
        now = time.time()
        started = daemon._started_at or now
        return {
            "status": "healthy",
            "uptime_seconds": round(now - started, 1),
            "active_tasks": tq.active_count(),
        }

    @router.get("/healthz/ready")
    async def ready():
        """Readiness check."""
        return {
            "status": "ready",
            "active_tasks": tq.active_count(),
        }

    @router.get("/api/daemon/status")
    async def daemon_status():
        """Full daemon status."""
        return daemon.status()

    @router.get("/api/daemon/tasks")
    async def list_tasks(limit: int = 50):
        """List background tasks, newest first."""
        # A negative limit would slice from the end (items[:-n]) and return
        # an arbitrary subset; treat it as "no tasks" instead.
        tasks = [t.to_dict() for t in tq.list_tasks(max(limit, 0))]
        return {"count": len(tasks), "active": tq.active_count(), "tasks": tasks}

    @router.get("/api/daemon/memory")
    async def memory_status():
        """Memory persistence status."""
        info = daemon.memory_snapshot_info()
        return {
            "persistence_enabled": daemon.config.persist_memory,
            "memory_dir": daemon.config.memory_dir,
            "snapshot": info,
        }

    return router
639# ── Convenience factory ────────────────────────────────
def create_daemon_app(task_queue: TaskQueue | None = None) -> FastAPI:
    """Create a minimal standalone daemon FastAPI app.

    Args:
        task_queue: Queue whose unfinished tasks are cancelled when the app
            shuts down. A fresh TaskQueue is created when omitted.

    Only ``/`` is defined here. The health and ``/api/daemon/*`` routes it
    advertises are added by ServerDaemon._build_app() when the app has no
    ``/healthz`` route.
    """
    tq = TaskQueue() if task_queue is None else task_queue
    version = "1.14.9"

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Serve the app, then cancel outstanding background tasks."""
        yield
        await tq.shutdown(timeout=10.0)

    app = FastAPI(
        title="AgentOS Daemon",
        version=version,
        lifespan=lifespan,
    )
    # NOTE(review): wildcard origins plus allow_credentials=True lets any web
    # page make credentialed cross-origin calls to this unauthenticated API.
    # Consider restricting origins when the daemon is not bound to localhost.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    @app.get("/")
    async def root():
        """Service banner listing the daemon's endpoints."""
        return {
            "service": "AgentOS Daemon",
            "version": version,
            "endpoints": {
                "health": "/healthz",
                "ready": "/healthz/ready",
                "status": "/api/daemon/status",
                "tasks": "/api/daemon/tasks",
                "memory": "/api/daemon/memory",
            },
        }

    return app
681# ── Module-level singleton ─────────────────────────────
# Lazily created process-wide daemon; see get_daemon().
_daemon_instance: ServerDaemon | None = None


def get_daemon(config: DaemonConfig | None = None) -> ServerDaemon:
    """Return the module-level ServerDaemon, creating it on first use.

    *config* is only used when the instance is first created; later calls
    return the existing daemon unchanged.
    """
    global _daemon_instance
    if _daemon_instance is not None:
        return _daemon_instance
    _daemon_instance = ServerDaemon(config=config)
    return _daemon_instance
694# ── Main entry point ──────────────────────────────────
def daemon_main(args: list[str] | None = None) -> int:
    """CLI entry point for ``agentos-daemon``.

    Args:
        args: Argument list to parse; ``sys.argv[1:]`` when None.

    Returns:
        A process exit code: 0 on success, 1 on failure.

    Raises:
        SystemExit: from argparse, for a missing or unknown command.
    """
    import argparse

    parser = argparse.ArgumentParser(
        prog="agentos-daemon",
        description="AgentOS Independent Server Daemon",
    )
    sub = parser.add_subparsers(dest="command", required=True)
    sub.add_parser("start", help="Start daemon in background")
    sub.add_parser("run", help="Run in foreground")
    sub.add_parser("stop", help="Stop running daemon")
    sub.add_parser("restart", help="Stop then start")
    sub.add_parser("status", help="Show daemon status")

    ns = parser.parse_args(args)
    daemon = get_daemon()

    if ns.command == "stop":
        return 0 if daemon.stop() else 1

    if ns.command == "status":
        s = daemon.status()
        if s["running"]:
            print(
                f"RUNNING (pid={s['pid']})\n"
                f"  url: http://{s['host']}:{s['port']}\n"
                f"  uptime: {s['uptime_seconds']}s\n"
                f"  tasks: {s['active_tasks']} active"
            )
        else:
            print("STOPPED")
        return 0

    if ns.command == "start":
        result = daemon.start(daemonize=True)
    elif ns.command == "run":
        result = daemon.start(daemonize=False)
    elif ns.command == "restart":
        result = daemon.restart()
    else:
        return 1

    # start()/restart() return the daemon PID on success. They return 0 for a
    # foreground run (and in the forked daemon once it exits), and 1 when the
    # background daemon did not come up. Passing a raw PID to sys.exit() would
    # report an arbitrary non-zero status (PID mod 256) on success.
    return 1 if result == 1 else 0
# Script entry: the CLI's return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(daemon_main())