"""Agent lifecycle management -- start, stop, restart, status."""
from __future__ import annotations
import subprocess
import threading
import time
import traceback
from pathlib import Path
from .config import AgentConfig, load_config, resolve_config
from .health import health_monitor
from .hooks import run_hook
from .registry import Registry
from .runtimes.claude_code import ClaudeCodeRuntime
from .runtimes.slurm import SlurmRuntime
[docs]
def _get_runtime(config: AgentConfig):
"""Return the appropriate runtime for the config."""
if config.runtime == "claude-code":
return ClaudeCodeRuntime()
if config.runtime == "slurm":
return SlurmRuntime()
if config.runtime == "slurm-tenant":
from .runtimes.slurm_tenant import SlurmTenantRuntime
return SlurmTenantRuntime()
raise ValueError(f"Unsupported runtime: {config.runtime}")
[docs]
def _fallback_workdir(name: str) -> str:
"""Return the workdir path used when the agent's YAML can't be loaded.
Canonical 2026-04-17 layout: ``~/.scitex/orochi/runtime/workspaces/<id>/``.
"""
return str(Path.home() / ".scitex" / "orochi" / "runtime" / "workspaces" / name)
[docs]
def _fire_forget_hook(
agent_name: str,
hook_name: str,
commands: list[str],
context: dict | None = None,
) -> None:
"""Invoke ``run_hook`` (non-blocking, handles URL + shell entries).
Called alongside the legacy synchronous ``_run_hooks`` path so
existing YAML pipes/redirects keep working unchanged while
external tools (orochi etc.) can additionally plug in via
``http(s)://`` URLs. The legacy path filters out URL entries to
avoid double-dispatch of the same side-effect.
"""
# stx-allow: fallback (reason: hook dispatch is fire-and-forget; a URL hook failing must never crash the caller)
try:
run_hook(agent_name, hook_name, list(commands or []), context=context)
except Exception: # pragma: no cover # stx-allow: fallback (reason: hook dispatch safety net — hook crashes must not propagate to caller)
import sys
print(
f"[WARN] run_hook {hook_name} dispatch failed for {agent_name}",
file=sys.stderr,
)
[docs]
def _run_hooks(hooks: list[str], extra_env: dict[str, str] | None = None) -> None:
"""Execute a list of shell hook commands.
Args:
hooks: Shell commands to execute.
extra_env: Additional env vars passed to hook subprocesses
(e.g., SCITEX_AGENT_CONTAINER_CONFIG_PATH, SCITEX_AGENT_CONTAINER_SCREEN_NAME, SCITEX_AGENT_CONTAINER_NAME).
"""
import os
env = {**os.environ, **(extra_env or {})}
for hook in hooks:
if not hook:
continue
# URL entries are handled by the new fire-and-forget path
# (see _fire_forget_hook / hooks.run_hook). Skip them here to
# avoid trying to ``sh -c "https://..."``.
if isinstance(hook, str) and hook.startswith(("http://", "https://")):
continue
result = subprocess.run(
hook, shell=True, capture_output=True, text=True, env=env
)
if result.returncode != 0:
# Log but don't fail
import sys
print(f"[WARN] Hook failed: {hook}", file=sys.stderr)
if result.stderr:
print(f" {result.stderr.strip()}", file=sys.stderr)
[docs]
def agent_start(
    config_path: str,
    registry: Registry | None = None,
    no_preflight: bool = False,
    force: bool = False,
    session_override: str | None = None,
    resume_id_override: str | None = None,
    dry_run: bool = False,
) -> bool:
    """Start an agent from a YAML config file.

    Args:
        config_path: Path to the YAML agent definition (resolved through
            ``resolve_config`` before loading).
        registry: Optional registry instance; a new ``Registry()`` is
            created when omitted.
        no_preflight: If True, skip SSH preflight checks (useful for slow
            hosts). Forced to True when ``config.remote.no_preflight`` is
            set, for both real starts and dry runs.
        force: If True and the agent is already running, stop it first
            and then start fresh. Also tolerates stale registry entries
            and ghost screens (via force-stop). Ignored for the purpose
            of stopping anything when ``dry_run`` is set.
        session_override: If set, override config.claude.session for this
            start invocation (one of continue-or-new | continue | new | resume).
        resume_id_override: If set, override config.claude.resume_id. Pass
            with session_override="resume" to launch ``claude --resume <id>``
            without editing the YAML.
        dry_run: If True, materialize the workspace files but skip the
            multiplexer / Claude Code launch and registry registration.
            Hooks (pre_start / post_start) are also skipped. A dry run
            never stops a live agent and never modifies the registry,
            even when combined with ``force``.

    Returns:
        True once the agent has been started and registered. In dry-run
        mode the runtime's own ``start(..., dry_run=True)`` result is
        returned unchanged.

    Raises:
        RuntimeError: If the agent is already running and ``force`` is
            not set, if the runtime reports a failed start, or if the
            runtime's ``start`` rejects the ``dry_run`` keyword.
    """
    config_path = resolve_config(config_path)
    registry = registry or Registry()
    config = load_config(config_path)
    if session_override:
        config.claude.session = session_override
    if resume_id_override is not None:
        config.claude.resume_id = resume_id_override
    runtime = _get_runtime(config)

    # Config-level no_preflight overrides CLI flag. Resolved before the
    # dry-run branch so dry runs honour it too.
    if config.remote.no_preflight:
        no_preflight = True

    if dry_run:
        # Materialize the workspace via the runtime's dry-run path; skip
        # hooks, registry, context-manager, health monitor. This branch
        # is taken before the already-running handling on purpose: the
        # prep does not touch the live tmux session, so a dry run must
        # never force-stop a running agent (it would not be relaunched).
        try:
            return runtime.start(
                config, no_preflight=no_preflight, force=force, dry_run=True
            )
        except TypeError as exc:
            # Older runtimes without dry_run support — fail loudly so
            # the caller knows this runtime can't dry-run. The original
            # TypeError is chained so an unrelated TypeError raised
            # inside start() is still visible in the traceback.
            raise RuntimeError(
                f"runtime {type(runtime).__name__} does not support --dry-run"
            ) from exc

    # Already running?
    if registry.exists(config.name) and runtime.is_running(config):
        if not force:
            raise RuntimeError(f"Agent '{config.name}' is already running")
        agent_stop(config.name, registry=registry, force=True)
        # Small grace period so the screen is fully torn down before we
        # try to create a new one with the same name.
        time.sleep(1)
    elif force and registry.exists(config.name):
        # Registry says it exists but runtime says not running — stale entry.
        agent_stop(config.name, registry=registry, force=True)

    # Hook env vars — let hooks know about the agent context
    hook_env = {
        "SCITEX_AGENT_CONTAINER_CONFIG_PATH": str(Path(config_path).resolve()),
        "SCITEX_AGENT_CONTAINER_SCREEN_NAME": config.screen_name,
        "SCITEX_AGENT_CONTAINER_NAME": config.name,
    }

    # ZOO#12 — lead-state-handover plumbing. All three calls are
    # best-effort: missing token / 404 / network errors must NOT block
    # agent_start. ``ensure_instance_uuid`` writes
    # ``SCITEX_AGENT_INSTANCE_UUID`` into ``config.env`` so the runtime's
    # ``_build_env_exports`` (claude_code.py) propagates it; the runtime
    # is supposed to read it back when wiring up the orochi WS connect
    # (FR-E). ``hydrate_from_hub`` is pre-start so the agent's boot-time
    # skill can pick up the snapshot before claude actually launches.
    from . import _handover as _h

    _h.ensure_instance_uuid(config)
    try:
        _h.hydrate_from_hub(config)
    except Exception:
        # Defensive: hub_client already swallows transport errors, but
        # in case of a serialization bug here, never let agent_start
        # die because of a snapshot fetch.
        traceback.print_exc()

    # Pre-start hooks
    _run_hooks(config.hooks.get("pre_start", []), extra_env=hook_env)
    _fire_forget_hook(config.name, "pre_start", config.hooks.get("pre_start", []))

    # Start — pass ``force`` through so remote dispatchers (SSHRemote)
    # can relay ``--force`` to the remote CLI and skip its own
    # already-running check.
    success = runtime.start(config, no_preflight=no_preflight, force=force)
    if not success:
        raise RuntimeError(f"Failed to start agent '{config.name}'")

    # Register
    registry.add(
        name=config.name,
        config_path=str(Path(config_path).resolve()),
        screen_name=config.screen_name,
    )

    # Post-start hooks
    _run_hooks(config.hooks.get("post_start", []), extra_env=hook_env)
    _fire_forget_hook(config.name, "post_start", config.hooks.get("post_start", []))

    # Start context-management sensor in background if enabled
    if config.context_management.enabled:
        # stx-allow: fallback (reason: context_manager sensor spawn may fail if tmux is unavailable; agent has already started and a sensor failure must not abort it)
        try:
            from .context_manager import start_sensor

            start_sensor(config)
        except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
            import sys

            print(
                f"[WARN] context_manager failed to start for {config.name}",
                file=sys.stderr,
            )
            traceback.print_exc()

    # Start health monitor in background if enabled
    if config.health.enabled:
        thread = threading.Thread(
            target=health_monitor,
            args=(config.name, config, registry, lambda c: _get_runtime(c).start(c)),
            daemon=True,
        )
        thread.start()

    # ZOO#12 FR-B — priority-failback poller. No-op when the spec lacks
    # a ``priority_list``; otherwise polls the hub every 60 s and steps
    # aside (snapshot push + SIGTERM) when a higher-priority host is
    # healthy. Daemon thread, dies with the process.
    try:
        _h.start_failback_poller(config)
    except Exception:
        traceback.print_exc()
    return True
[docs]
def agent_stop(
    name: str,
    registry: Registry | None = None,
    force: bool = False,
) -> bool:
    """Stop a running agent by name.

    The sequence is: push a pre-stop snapshot, run ``pre_stop`` hooks,
    stop the runtime, run ``post_stop`` hooks, then remove the registry
    entry.

    Args:
        name: Agent name.
        registry: Optional registry instance; a new ``Registry()`` is
            created when omitted.
        force: If True, do not fail when the agent is missing from the
            registry or when hooks/runtime.stop() raise; wipe stale
            state and return True. Useful for bulk cleanup. Errors that
            are tolerated this way are still printed to stderr so a
            session that failed to stop does not go unnoticed.

    Returns:
        True when the stop sequence completed (or was tolerated under
        ``force``).

    Raises:
        RuntimeError: If the agent is not in the registry and ``force``
            is False.
        Exception: Without ``force``, any error from loading the config,
            running hooks, or ``runtime.stop`` propagates unchanged.
    """
    registry = registry or Registry()
    entry = registry.get(name)
    if entry is None:
        if force:
            return True
        raise RuntimeError(f"Agent '{name}' not found in registry")

    # stx-allow: fallback (reason: YAML file may have been deleted while the agent was registered; force-stop must succeed even without a config)
    try:
        config = load_config(entry["config"])
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        if not force:
            raise
        # Config gone — just nuke the registry entry. Report why, since
        # without a config the runtime session cannot be stopped here.
        traceback.print_exc()
        registry.remove(name)
        return True

    runtime = _get_runtime(config)
    hook_env = {
        "SCITEX_AGENT_CONTAINER_CONFIG_PATH": str(Path(entry["config"]).resolve()),
        "SCITEX_AGENT_CONTAINER_SCREEN_NAME": config.screen_name,
        "SCITEX_AGENT_CONTAINER_NAME": config.name,
    }

    # ZOO#12 FR-A — push a sentinel snapshot to the hub right before
    # the agent stops, so a future agent_start (here or on a different
    # host) can hydrate. Best-effort: never block the stop path on a
    # hub outage. The sentinel is a marker; the agent's own pre_stop
    # hook is the right place for richer state (transcript, memory).
    try:
        from . import _handover as _h

        _h.push_pre_stop_snapshot(config)
    except Exception:
        traceback.print_exc()

    # Pre-stop hooks
    # stx-allow: fallback (reason: hook commands may reference paths or env vars absent at stop time; force-stop must continue regardless)
    try:
        _run_hooks(config.hooks.get("pre_stop", []), extra_env=hook_env)
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        if not force:
            raise
        traceback.print_exc()
    _fire_forget_hook(config.name, "pre_stop", config.hooks.get("pre_stop", []))

    # stx-allow: fallback (reason: tmux/screen session may already be dead; force-stop should still proceed to clean up registry)
    try:
        runtime.stop(config)
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        if not force:
            raise
        # The registry entry is removed below regardless, so this
        # traceback is the only trace of a session that may still be up.
        traceback.print_exc()

    # Post-stop hooks
    # stx-allow: fallback (reason: post-stop hooks are best-effort notification; a failed hook must not prevent registry cleanup)
    try:
        _run_hooks(config.hooks.get("post_stop", []), extra_env=hook_env)
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        if not force:
            raise
        traceback.print_exc()
    _fire_forget_hook(config.name, "post_stop", config.hooks.get("post_stop", []))

    registry.remove(name)
    return True
[docs]
def agent_stop_all(
    registry: Registry | None = None,
    force: bool = False,
) -> list[tuple[str, bool, str]]:
    """Stop every agent listed in the registry.

    Each agent is stopped through ``agent_stop`` with the same ``force``
    flag. The outcome is reported as one ``(name, success, message)``
    tuple per attempted agent: ``"stopped"`` on success, the exception
    text on failure.

    With ``force=True`` the sweep continues past failures so one broken
    agent doesn't block cleanup of the rest; otherwise it stops at the
    first failure.
    """
    if not registry:
        registry = Registry()
    outcomes: list[tuple[str, bool, str]] = []
    for record in registry.list_all():
        agent_name = record.get("name", "?")
        # stx-allow: fallback (reason: stopping one agent may fail due to a missing config or dead session; other agents in the registry should still be stopped)
        try:
            agent_stop(agent_name, registry=registry, force=force)
        except Exception as err:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
            outcomes.append((agent_name, False, str(err)))
            if not force:
                break
        else:
            outcomes.append((agent_name, True, "stopped"))
    return outcomes
[docs]
def agent_restart(name: str, registry: Registry | None = None) -> bool:
    """Stop a registered agent and start it again from the same config.

    The config path is read from the registry entry before stopping,
    because the stop removes that entry.

    Raises:
        RuntimeError: If ``name`` is not present in the registry.
    """
    if not registry:
        registry = Registry()
    record = registry.get(name)
    if record is None:
        raise RuntimeError(f"Agent '{name}' not found in registry")
    yaml_path = record["config"]
    agent_stop(name, registry)
    # Pause between teardown and relaunch.
    time.sleep(2)
    restarted = agent_start(yaml_path, registry)
    return restarted
[docs]
def agent_status(name: str, registry: Registry | None = None) -> dict:
    """Collect a detailed status report for a registered agent.

    The report always contains the registry fields (``name``, ``config``,
    ``screen``, ``started_at``), a ``status`` of ``"running"`` or
    ``"stopped"``, and ``model`` / ``runtime`` (``"unknown"`` when the
    config cannot be loaded). On top of that it carries hook counts,
    listen ports, extensions, context-management state, a cached
    snapshot summary, and optional rich metadata. Every enrichment step
    except the context-management lookup is best-effort and never makes
    the call fail.

    Raises:
        RuntimeError: If ``name`` is not present in the registry.
    """
    if not registry:
        registry = Registry()
    record = registry.get(name)
    if record is None:
        raise RuntimeError(f"Agent '{name}' not found in registry")

    # stx-allow: fallback (reason: YAML or runtime may be unavailable; status should degrade to stopped=False rather than raise)
    try:
        cfg = load_config(record["config"])
        is_up = _get_runtime(cfg).is_running(cfg)
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        traceback.print_exc()
        is_up = False
        cfg = None

    report = {
        "name": name,
        "config": record.get("config", ""),
        "screen": record.get("screen", ""),
        "started_at": record.get("started_at", ""),
        "status": "running" if is_up else "stopped",
        "model": cfg.model if cfg else "unknown",
        "runtime": cfg.runtime if cfg else "unknown",
    }
    if cfg and cfg.remote.is_remote:
        report["remote"] = cfg.remote.host

    # Hook-points / listen / extensions plumbing (todo#286 Phase 4).
    # Only counts are exposed so consumers can see what's wired up;
    # command bodies are intentionally NOT echoed to avoid leaking URLs
    # or secrets through status --json.
    if cfg is None:
        report["hooks_configured"] = {}
        report["listen"] = []
        report["extensions"] = {}
    else:
        hook_table = cfg.hooks or {}
        hook_counts = {}
        for hook_point in (
            "pre_start",
            "post_start",
            "pre_stop",
            "post_stop",
            "on_compact",
            "on_restart",
            "on_diff",
        ):
            hook_counts[hook_point] = len(hook_table.get(hook_point, []) or [])
        report["hooks_configured"] = hook_counts

        listeners = []
        for lp in cfg.listen or []:
            listeners.append(
                {
                    "port": lp.port,
                    "proto": lp.proto,
                    "path": lp.path,
                    "name": lp.name,
                    "owner": lp.owner,
                }
            )
        report["listen"] = listeners
        # Opaque pass-through — echoed verbatim.
        report["extensions"] = dict(cfg.extensions or {})

    # Surface context-management state for fleet_watch.sh / NAS orchestrator
    # (todo#285). ``None`` when the feature is unconfigured or noop so
    # consumers can distinguish "disabled" from "0%".
    if cfg and cfg.context_management.enabled:
        from .context_manager import get_sensor

        sensor = get_sensor(name)
        percent = None if sensor is None else sensor.last_percent
        report["context_management"] = {
            "percent": percent,
            "strategy": cfg.context_management.strategy,
            "trigger_at_percent": cfg.context_management.trigger_at_percent,
        }
    else:
        report["context_management"] = None

    # Expose the full agent_meta dict from the live sensor if present
    # (todo#285 Phase 2b). This is the transcript-derived source of
    # truth used by the dashboard when it's available.
    # stx-allow: fallback (reason: context_manager module may be unimported or sensor absent; agent_meta is optional enrichment and None is acceptable)
    try:
        from .context_manager import get_sensor as _gs

        live_sensor = _gs(name)
        if live_sensor is not None and live_sensor.last_meta is not None:
            report["agent_meta"] = live_sensor.last_meta
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        pass

    # Snapshot block — cheap read from cache (todo#286). Never re-gathers.
    # stx-allow: fallback (reason: snapshot module may not yet exist or cache may be absent on first run; None snapshot is valid initial state)
    try:
        from .snapshot import read_latest

        cached = read_latest(name)
        if cached is None:
            report["snapshot"] = None
        else:
            report["snapshot"] = {
                "timestamp": cached.get("timestamp"),
                "has_diff": cached.get("has_diff", False),
                "diff_fields": cached.get("diff_fields", []),
            }
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        report["snapshot"] = None

    # Enrich with claude-hud-style metadata. Canonical source for the
    # Agents-tab dashboard; the MCP sidecar heartbeat shells out to this
    # command rather than duplicating the logic in TypeScript.
    # stx-allow: fallback (reason: agent_meta requires psutil and an active tmux session; metadata enrichment is optional and must never break status)
    try:
        from .agent_meta import collect_rich

        workdir = cfg.expanded_workdir if cfg else _fallback_workdir(name)
        session = record.get("screen", "") or (cfg.screen_name if cfg else name)
        rich = collect_rich(name=name, workdir=workdir, session=session)
        # Prefer transcript-derived started_at only if the registry
        # doesn't have one.
        if not report.get("started_at") and rich.get("started_at_transcript"):
            report["started_at"] = rich["started_at_transcript"]
        for transcript_key in ("started_at_transcript", "model_transcript"):
            rich.pop(transcript_key, None)
        # Never let rich overwrite the canonical registry/config fields.
        for key, value in rich.items():
            report.setdefault(key, value)
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        # Never let metadata collection break status.
        pass
    return report
[docs]
def agent_logs(name: str, lines: int = 50, registry: Registry | None = None) -> str:
    """Fetch recent log output for a registered agent.

    The agent's config is loaded from the path stored in the registry
    and the request is delegated to the matching runtime's ``logs``.

    Args:
        name: Agent name.
        lines: Line count forwarded to the runtime.
        registry: Optional registry instance; a new ``Registry()`` is
            created when omitted.

    Raises:
        RuntimeError: If ``name`` is not present in the registry.
    """
    if not registry:
        registry = Registry()
    record = registry.get(name)
    if record is None:
        raise RuntimeError(f"Agent '{name}' not found in registry")
    cfg = load_config(record["config"])
    return _get_runtime(cfg).logs(cfg, lines)