Source code for scitex_agent_container.agent_meta

"""Rich agent metadata collection (claude-hud-style).

Canonical source of truth for the metadata payload that is:
  1. Emitted by ``scitex-agent-container show-status <name> --json``.
  2. POSTed by the MCP sidecar heartbeat to ``/api/agents/register/``.

Ported 2026-04-12 from the pre-restructure
``~/.scitex/orochi/agents/mamba-healer-mba/scripts/agent_meta.py`` — the
fleet script now lives at ``~/.scitex/orochi/shared/scripts/agent_meta.py``
(2026-04-17 runtime/ layout) and shells out to this module via ``sac
status --json``, so the collection logic still lives in one place.

Every field is best-effort: any failure leaves the field as its default
(``""``, ``0``, ``0.0``, ``[]``) and never raises. The caller merges this
dict on top of the base ``agent_status`` result.
"""

from __future__ import annotations

import json
import os
import re
import socket
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from .claude_usage import fetch_usage


def detect_multiplexer(session: str) -> str:
    """Return 'tmux', 'screen', or '' if neither reports the session.

    tmux is probed first via ``tmux has-session -t <session>`` (exit code 0
    means the session exists); screen is probed second by looking for the
    session name in the output of ``screen -ls <session>``. A missing
    binary for either tool is treated as "not hosted there".

    Parameters
    ----------
    session:
        Multiplexer session name to look for.

    Returns
    -------
    str
        ``"tmux"``, ``"screen"``, or ``""`` when no multiplexer reports the
        session (or when ``session`` is empty).
    """
    # An empty name cannot identify a session. Without this guard the
    # substring test below (``"" in r.stdout``) is always true, so every
    # host with screen installed would be reported as "screen".
    if not session:
        return ""
    try:
        if (
            subprocess.run(
                ["tmux", "has-session", "-t", session],
                capture_output=True,
            ).returncode
            == 0
        ):
            return "tmux"
    except (
        FileNotFoundError
    ):  # stx-allow: fallback (reason: file may not exist on first use)
        pass
    try:
        r = subprocess.run(
            ["screen", "-ls", session],
            capture_output=True,
            text=True,
        )
        if session in r.stdout:
            return "screen"
    except (
        FileNotFoundError
    ):  # stx-allow: fallback (reason: file may not exist on first use)
        pass
    return ""
def _encode_claude_project(workdir: str) -> str:
    """Encode a working directory the way Claude Code names its projects dir.

    Every ``/`` and ``.`` in ``workdir`` is turned into ``-``; afterwards any
    run of three or more consecutive dashes is squeezed down to exactly
    ``--``.
    """
    dashed = re.sub(r"[/.]", "-", workdir)
    return re.sub(r"-{3,}", "--", dashed)
def _latest_jsonls(workdir: str) -> list[Path]:
    """List the Claude Code transcript files for ``workdir``, newest first.

    Looks in ``~/.claude/projects/<encoded workdir>`` for ``*.jsonl`` files
    and orders them by modification time, most recent first. Returns an
    empty list when the directory is missing or listing/stat-ing fails.
    """
    # Claude Code encodes the *resolved* cwd, so follow symlinks first.
    # stx-allow: fallback (reason: broken symlink or cross-device path can
    # raise — raw workdir string is an acceptable fallback for encoding)
    try:
        resolved = str(Path(workdir).expanduser().resolve())
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        resolved = workdir

    projects_root = Path.home() / ".claude" / "projects"
    proj_dir = projects_root / _encode_claude_project(resolved)
    if not proj_dir.is_dir():
        return []

    # stx-allow: fallback (reason: concurrent file deletion between glob and
    # stat() causes OSError — return empty list rather than raising)
    try:
        transcripts = list(proj_dir.glob("*.jsonl"))
        transcripts.sort(key=lambda entry: entry.stat().st_mtime, reverse=True)
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        return []
    return transcripts
def _parse_skills(workdir: str) -> list[str]:
    """Collect skill names from ```skills fenced blocks in ``<workdir>/CLAUDE.md``.

    Each non-blank line inside a ```skills block that does not start with
    ``#`` (after stripping whitespace) is one skill. Returns an empty list
    when the file is absent or cannot be read.
    """
    found: list[str] = []
    # stx-allow: fallback (reason: CLAUDE.md may be absent or unreadable;
    # empty skills list is an acceptable result for unconfigured agents)
    try:
        claude_md = Path(workdir) / "CLAUDE.md"
        if not claude_md.is_file():
            return found
        body = claude_md.read_text()
        for fenced in re.findall(r"```skills\n(.*?)\n```", body, re.DOTALL):
            found.extend(
                entry
                for entry in map(str.strip, fenced.splitlines())
                if entry and not entry.startswith("#")
            )
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        pass
    return found
# Matches "<N> local agent(s) [still] running", case-insensitively.
_SUBAGENT_MARKER_RE = re.compile(
    r"(\d+)\s+local\s+agents?(?:\s+still)?\s+running",
    re.IGNORECASE,
)


def parse_subagent_count_from_pane_text(pane: str) -> int:
    """Extract the subagent count from a pane's status marker.

    Looks for the first occurrence of ``N local agent(s) running`` (or
    ``... still running``) in ``pane`` and returns ``N``. The pattern
    requires the trailing ``running`` word, so text that only mentions
    "local agent" does not count. An empty pane or a pane without the
    marker yields ``0``.
    """
    match = _SUBAGENT_MARKER_RE.search(pane) if pane else None
    if match is None:
        return 0
    return int(match.group(1))
def _subagent_count_from_pane(session: str, multiplexer: str) -> int:
    """Return the subagent count shown in the session's tmux pane.

    Captures the pane with ``tmux capture-pane -p`` and hands the text to
    ``parse_subagent_count_from_pane_text``. Returns ``0`` for any
    multiplexer other than tmux, or when the capture fails.
    """
    if multiplexer != "tmux":
        return 0
    # stx-allow: fallback (reason: session may not exist yet; 0 is the
    # correct "unknown" sentinel — never block a heartbeat on tmux error)
    try:
        captured = subprocess.run(
            ["tmux", "capture-pane", "-t", session, "-p"],
            capture_output=True,
            text=True,
        )
        pane = captured.stdout
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        return 0
    return parse_subagent_count_from_pane_text(pane)
def _capture_pane(session: str, multiplexer: str, max_chars: int = 10000) -> str:
    """Return the tail of the session's tmux pane text.

    Runs ``tmux capture-pane -p -J`` and keeps at most the last
    ``max_chars`` characters of its output. Returns ``""`` for any
    multiplexer other than tmux, or when the capture fails.
    """
    if multiplexer != "tmux":
        return ""
    # stx-allow: fallback (reason: session may have exited between the
    # has-session check and capture-pane — empty string is safe for callers)
    try:
        captured = subprocess.run(
            ["tmux", "capture-pane", "-t", session, "-p", "-J"],
            capture_output=True,
            text=True,
        )
        text = captured.stdout or ""
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        return ""
    return text[-max_chars:] if len(text) > max_chars else text
# One-group patterns match a bare secret; the two-group pattern matches
# "<label><separator><value>" and only the value (group 2) is a secret.
_SECRET_PATTERNS = [
    re.compile(r"(sk-ant-[A-Za-z0-9_-]+)"),
    re.compile(r"(wks_[A-Za-z0-9]+)"),
    re.compile(
        r"((?:token|secret|api[_-]?key|password|bearer)\s*[=:]\s*)(\S+)",
        re.IGNORECASE,
    ),
]


def _redact_secrets(text: str) -> str:
    """Replace secret-looking substrings of ``text`` with ``***REDACTED***``.

    Every pattern in ``_SECRET_PATTERNS`` is applied in order. For the
    two-group pattern the label and separator (group 1) are kept and only
    the value is masked; for the others the whole match is masked. Falsy
    input yields ``""``.
    """
    if not text:
        return ""
    redacted = text
    for pattern in _SECRET_PATTERNS:
        if pattern.groups == 2:
            replacement = r"\g<1>***REDACTED***"
        else:
            replacement = "***REDACTED***"
        redacted = pattern.sub(replacement, redacted)
    return redacted
def _classify_pane_state(pane_text: str) -> tuple[str, str]:
    """Classify a pane snapshot with fixed string/regex heuristics.

    Only the last 2000 characters of ``pane_text`` are inspected. Checks
    run in this order and the first hit wins:

    - "auth_error": a known credential-error wording is present; the
      second tuple item is the last line containing it (max 200 chars).
    - "limit_reached": "limit reached" or "resets in" is present.
    - "y_n_prompt": a ``(y/n)`` / ``[y/n]`` / ``(yes/no)`` / ``[yes/no]``
      marker is present; the second item is the last line (max 200 chars).
    - "compose_pending_unsent": ``❯`` followed by text on the same line.
    - "running": a ``❯`` or ``>`` character is present.
    - "unknown": nothing matched, or the pane text is empty.

    Returns ``(state, stuck_prompt_text)``; the text is ``""`` for every
    state except "auth_error" and "y_n_prompt".
    """
    if not pane_text:
        return "unknown", ""

    tail = pane_text[-2000:]
    lower = tail.lower()

    # Claude Code surfaces a few wordings depending on which auth path
    # failed. Plain literals keep new variants easy to add; matching is
    # done on lower-cased text because the source casing varies.
    auth_markers = (
        "invalid api key",
        "invalid authentication credentials",  # Anthropic API 401 body
        "please re-run /login",
        "please run /login",  # Claude Code 2.1.x wording
        "authentication_error",  # raw API error type
    )

    def final_line() -> str:
        return tail.strip().splitlines()[-1][:200]

    if any(marker in lower for marker in auth_markers):
        # Prefer the line that actually carries the marker so the snippet
        # is operationally useful (a human reading the dashboard sees
        # "/login" / "401" rather than the trailing bare prompt).
        snippet = next(
            (
                row.strip()[:200]
                for row in reversed(tail.strip().splitlines())
                if any(marker in row.lower() for marker in auth_markers)
            ),
            "",
        )
        return "auth_error", snippet or final_line()

    if "limit reached" in lower or "resets in" in lower:
        return "limit_reached", ""

    if re.search(r"\(y/n\)|\[y/n\]|\(yes/no\)|\[yes/no\]", lower):
        return "y_n_prompt", final_line()

    # compose_pending: ❯ followed by non-whitespace on the SAME line.
    # The gap is matched with `[ \t]+` (spaces/tabs only) so the match
    # cannot cross a newline into the decorative dashed separator that
    # lives a line below the empty prompt — an earlier `❯\s+\S` did, and
    # lit compose_pending for every freshly-booted agent.
    if re.search(r"❯[ \t]+\S", tail):
        return "compose_pending_unsent", ""

    if "❯" in tail or ">" in tail:
        return "running", ""

    return "unknown", ""
# Note: "waiting" (freshly booted, never received work) is intentionally # NOT detected here. The earlier draft relied on the claude-hud # statusline `Context ░░░░░░░░░░ 0%` marker, but claude-hud is an # external tool not present in every install. The dashboard derives # "waiting" instead from the hub-side `last_tool_at` field — an agent # that is connected but has never recorded a tool call is waiting, # regardless of what its pane statusline looks like.
def _config_candidates(workdir: str, filename: str) -> list[Path]:
    """Build the ordered, de-duplicated list of places to look for ``filename``.

    Probing only ``<workdir>/<filename>`` left agents without that file in
    their workspace pushing empty ``claude_md`` / ``mcp_json`` to the hub,
    so a wider set of locations is offered, in priority order:

      1. ``<workdir>/<filename>``
      2. ``<workdir>/.claude/<filename>`` (nested config style)
      3. Legacy sibling ``<workdir-parent>/mamba-<name>/<filename>`` (only
         when the workdir's parent directory is named ``workspaces``)
      4. ``<filename>`` in the nearest directory at or above ``workdir``
         that contains ``.git``
      5. ``~/.claude/<filename>`` (user-global fallback)
      6. ``~/<filename>``

    Entries 1-4 are skipped when ``workdir`` is empty. Paths are not
    checked for existence here.
    """
    home = Path.home()
    ordered: list[Path] = []

    if workdir:
        base = Path(workdir)
        ordered.append(base / filename)
        ordered.append(base / ".claude" / filename)
        if base.parent.name == "workspaces":
            ordered.append(base.parent / f"mamba-{base.name}" / filename)
        # stx-allow: fallback (reason: git root walk can fail on pathological
        # filesystems — missing git root just skips that candidate)
        try:
            for folder in (base, *base.parents):
                if (folder / ".git").exists():
                    ordered.append(folder / filename)
                    break
        except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
            pass

    ordered.extend((home / ".claude" / filename, home / filename))

    # Dedup on the string form, keeping the first occurrence's position.
    unique: dict[str, Path] = {}
    for candidate in ordered:
        unique.setdefault(str(candidate), candidate)
    return list(unique.values())
def _read_claude_md(workdir: str, max_chars: int = 20000) -> str:
    """Return the first readable ``CLAUDE.md`` candidate, truncated.

    Candidates come from ``_config_candidates`` in priority order. The
    text is cut to ``max_chars`` characters; undecodable bytes are
    replaced. Returns ``""`` when no candidate can be read.
    """
    for p in _config_candidates(workdir, "CLAUDE.md"):
        # stx-allow: fallback (reason: permission error on one candidate
        # must not prevent trying the next — best-effort file read)
        try:
            if not p.is_file():
                continue
            return p.read_text(errors="replace")[:max_chars]
        except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
            continue
    return ""


# Upper-case fragments that mark a config key as holding a secret.
# AUTHORIZATION covers HTTP header maps such as
# ``"headers": {"Authorization": "Bearer ..."}``, whose value is a bearer
# token even though the key contains none of the other fragments.
_SENSITIVE_KEY_MARKERS = ("TOKEN", "SECRET", "KEY", "PASSWORD", "AUTHORIZATION")


def _redact_mcp_tree(obj):
    """Return a copy of a parsed JSON tree with secret string values masked.

    Dicts and lists are walked recursively. A dict value is replaced with
    ``***REDACTED***`` when it is a string and its key (upper-cased)
    contains one of ``_SENSITIVE_KEY_MARKERS``. Everything else — including
    non-string values under sensitive keys — is returned unchanged.
    """
    if isinstance(obj, dict):
        out = {}
        for k, v in obj.items():
            # str(k) keeps the check safe for hand-built trees; keys from
            # json.loads are always strings already.
            key_upper = str(k).upper()
            if isinstance(v, str) and any(
                marker in key_upper for marker in _SENSITIVE_KEY_MARKERS
            ):
                out[k] = "***REDACTED***"
            else:
                out[k] = _redact_mcp_tree(v)
        return out
    if isinstance(obj, list):
        return [_redact_mcp_tree(x) for x in obj]
    return obj


def _read_mcp_json(workdir: str, max_chars: int = 10000) -> str:
    """Return the first readable ``.mcp.json`` candidate, redacted and truncated.

    Candidates come from ``_config_candidates`` in priority order. Valid
    JSON is re-serialised (indent 2) after ``_redact_mcp_tree``; invalid
    JSON is returned as raw text passed through ``_redact_secrets``. Either
    way the result is limited to ``max_chars`` characters. Returns ``""``
    when no candidate can be read.
    """
    for p in _config_candidates(workdir, ".mcp.json"):
        # stx-allow: fallback (reason: permission error on one candidate
        # must not prevent trying the next — best-effort file read)
        try:
            if not p.is_file():
                continue
            raw = p.read_text(errors="replace")
        except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
            continue
        # stx-allow: fallback (reason: corrupt JSON falls back to raw-with-
        # redaction rather than raising — collect_rich is best-effort)
        try:
            doc = json.loads(raw)
            pretty = json.dumps(_redact_mcp_tree(doc), indent=2)
            return pretty[:max_chars]
        except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
            return _redact_secrets(raw[:max_chars])
    return ""
def _parse_mcp_servers(workdir: str) -> list[dict[str, Any]]:
    """Summarise the MCP servers declared in ``<workdir>/.mcp.json``.

    Each entry under the top-level ``mcpServers`` mapping becomes one
    ``{name, transport, url_host, command}`` dict so the dashboard can
    render a setup-audit table. Only the URL's host (not the full URL) and
    the command (not its args) are surfaced, which is enough to check the
    endpoint without exposing query-string secrets.

    ``transport`` is taken from ``type`` or, failing that, ``transport``.
    Any field that is missing or not a string is reported as ``None``;
    server entries that are not mappings are skipped. A missing or
    malformed file yields ``[]`` — never ``None``.
    """

    def host_of(url: Any) -> str | None:
        # Unparseable URLs are reported as "no host" rather than raising.
        if not isinstance(url, str):
            return None
        try:
            from urllib.parse import urlparse

            return urlparse(url).hostname or None
        except Exception:
            return None

    try:
        config_path = Path(workdir) / ".mcp.json"
        if not config_path.is_file():
            return []
        doc = json.loads(config_path.read_text(errors="replace"))
    except Exception:
        return []

    servers = doc.get("mcpServers") if isinstance(doc, dict) else None
    if not isinstance(servers, dict):
        return []

    summary: list[dict[str, Any]] = []
    for server_name, conf in servers.items():
        if not isinstance(conf, dict):
            continue
        transport = conf.get("type") or conf.get("transport")
        command = conf.get("command")
        summary.append(
            {
                "name": server_name,
                "transport": transport if isinstance(transport, str) else None,
                "url_host": host_of(conf.get("url")),
                "command": command if isinstance(command, str) else None,
            }
        )
    return summary
def _pids_from_session(session: str, multiplexer: str) -> tuple[int, int]:
    """Return ``(pid, ppid)`` for the agent running in a tmux session.

    ``ppid`` is the pid of the session's first pane, as reported by
    ``tmux list-panes``. ``pid`` is the first child of that process whose
    command line matches ``claude`` (via ``pgrep -P <ppid> -f claude``),
    or ``ppid`` itself when there is no such child. Both are ``0`` for any
    multiplexer other than tmux or when nothing could be determined.
    """
    if multiplexer != "tmux":
        return 0, 0

    pid = ppid = 0
    # stx-allow: fallback (reason: tmux session may not exist yet or pgrep
    # may return no results — pid/ppid of 0 is a valid "unknown" sentinel)
    try:
        panes = subprocess.run(
            ["tmux", "list-panes", "-t", session, "-F", "#{pane_pid}"],
            capture_output=True,
            text=True,
        )
        pane_pids = panes.stdout.strip().splitlines()
        if pane_pids:
            ppid = int(pane_pids[0])
            children = subprocess.run(
                ["pgrep", "-P", str(ppid), "-f", "claude"],
                capture_output=True,
                text=True,
            )
            child_pids = children.stdout.strip().splitlines()
            pid = int(child_pids[0]) if child_pids else ppid
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        pass
    return pid, ppid
def _read_sdk_session_state(name: str, workdir: str) -> dict | None:
    """Surface ``runtime: claude-session`` state on the status JSON.

    Returns ``None`` for agents that aren't using the SDK runtime
    (heartbeat file absent). For SDK agents, returns a dict with the
    persisted session id (``session_id``), the runner's quota record
    (``quota``), the latest heartbeat (``heartbeat``), and the state
    directory path (``state_dir``).

    Best-effort: any import / IO / parse failure yields ``None``, so this
    function never raises into the heartbeat path.

    Parameters
    ----------
    name:
        Agent name; selects the per-agent state directory.
    workdir:
        Accepted for call-site symmetry; not used for the lookup (see the
        comment on project-scope discovery below).
    """
    try:
        from ._runners import claude_session as _runner
    except Exception:  # stx-allow: fallback (reason: import path may differ in tests / partial installs — collect_rich is best-effort)
        return None
    # Try the project-local state root first (matches the runtime
    # adapter's _project_runtime_root logic — keeps the read symmetric
    # with the write path).
    # Walk from cwd, NOT workdir: ``workdir`` may point at a /tmp
    # scratch dir while the agent's YAML lives under a project-scope
    # repo. cwd is what discovery already uses on ``sac start``, so
    # the read here stays symmetric with the write.
    try:
        from scitex_config._ecosystem import local_state

        scope = local_state.find_project_scope("agent-container")
    except Exception:  # stx-allow: fallback (reason: scitex-config optional)
        scope = None
    # stx-allow: fallback (reason: state-dir resolution and the runner's
    # read_* helpers can fail on a half-written or unreadable state dir —
    # the documented contract is to degrade to None, never to raise)
    try:
        state_dir = (
            (scope / "runtime" / name)
            if scope is not None
            else _runner.state_dir_for(name)
        )
        if not (state_dir / "heartbeat.json").is_file():
            return None
        return {
            "session_id": _runner.read_session_id(state_dir),
            "quota": _runner.read_quota(state_dir),
            "heartbeat": _runner.read_heartbeat(state_dir),
            "state_dir": str(state_dir),
        }
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        return None
def _coerce_pct(value: Any) -> float | None:
    """Return ``value`` as a float rounded to 1 decimal, or ``None`` if it is missing or non-numeric."""
    if value is None:
        return None
    try:
        return round(float(value), 1)
    except (TypeError, ValueError, OverflowError):
        return None


def _dict_or_empty(value: Any) -> dict:
    """Return ``value`` when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def collect_rich(
    *,
    name: str,
    workdir: str,
    session: str,
) -> dict[str, Any]:
    """Collect claude-hud-style metadata for one agent.

    Every field is best-effort: a source that is missing, unreadable, or
    has an unexpected shape leaves the affected fields at their defaults
    instead of raising.

    Parameters
    ----------
    name:
        Agent name. Used to look up the statusline JSON, the hook event
        log, the SDK session state and the action summary, and reported
        as ``project``.
    workdir:
        Absolute workspace dir for the agent (used to locate CLAUDE.md,
        .mcp.json and the Claude Code transcript JSONL files).
    session:
        Multiplexer session name (what ``tmux has-session -t`` checks).

    Returns
    -------
    dict[str, Any]
        The metadata payload; see the inline comments on the returned
        dict for the meaning of each group of keys.
    """
    multiplexer = detect_multiplexer(session)

    # ---- statusline JSON (authoritative, written by sac-statusline) --------
    # Prefer the persisted JSON that Claude Code's statusLine command writes
    # each turn — it contains the exact used_percentage from /context and
    # authoritative rate-limit resets. Falls back to JSONL approximation when
    # the file is absent (agent not yet launched with sac-statusline wired).
    _sl: dict = {}
    # stx-allow: fallback (reason: statusline module is optional; import or
    # read failure falls back to JSONL approximation — collect_rich is best-effort)
    try:
        from .statusline import read_statusline_json

        # A non-dict payload is treated the same as an absent file.
        _sl = _dict_or_empty(read_statusline_json(name))
    except Exception:
        pass

    # ---- transcript-derived fields ----------------------------------
    context_pct = 0.0
    current_tool = ""
    current_tool_input = ""
    current_task = ""
    last_user_msg = ""
    last_activity = ""
    model = ""
    started_at = ""

    # Seed context_pct from statusline JSON if available (overridden below
    # by the JSONL scan only when the statusline file is absent).
    if _sl:
        _cw = _dict_or_empty(_sl.get("context_window"))
        _cw_pct = _coerce_pct(_cw.get("used_percentage"))
        if _cw_pct is not None:
            context_pct = _cw_pct
        _sl_model = _dict_or_empty(_sl.get("model")).get("display_name", "")
        if _sl_model:
            model = _sl_model

    jsonls = _latest_jsonls(workdir)
    if jsonls:
        # stx-allow: fallback (reason: stat() can race with file deletion;
        # missing started_at is acceptable — field stays empty)
        try:
            earliest = min(jsonls, key=lambda p: p.stat().st_mtime)
            started_at = datetime.fromtimestamp(
                earliest.stat().st_mtime, tz=timezone.utc
            ).isoformat()
        except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
            pass

        # stx-allow: fallback (reason: JSONL may be unreadable mid-rotate;
        # empty lines list means no transcript fields — non-fatal)
        try:
            lines = jsonls[0].read_text().splitlines()[-50:]
        except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
            lines = []

        # Decode the tail once, newest entry first. Lines that are not
        # valid JSON, or that decode to something other than an object,
        # are dropped so the scans below can rely on dict access.
        entries: list[dict] = []
        for line in reversed(lines):
            # stx-allow: fallback (reason: individual JSONL lines may be
            # truncated mid-write; skip invalid lines and keep scanning)
            try:
                obj = json.loads(line)
            except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
                continue
            if isinstance(obj, dict):
                entries.append(obj)

        # Most recent assistant message: model, last activity, context use.
        for obj in entries:
            if obj.get("type") == "assistant" and "message" in obj:
                msg = obj["message"]
                if not isinstance(msg, dict):
                    continue
                if not model:
                    model = msg.get("model", "")
                if not last_activity:
                    last_activity = obj.get("timestamp", "")
                # Only fall back to the JSONL approximation when the
                # authoritative statusline JSON didn't supply a value.
                if not _sl:
                    u = _dict_or_empty(msg.get("usage"))
                    # stx-allow: fallback (reason: non-numeric token counts
                    # in a transcript line — keep the default context_pct)
                    try:
                        total = (
                            (u.get("input_tokens") or 0)
                            + (u.get("cache_read_input_tokens") or 0)
                            + (u.get("cache_creation_input_tokens") or 0)
                        )
                        # Opus 4.6 1M context = 1,000,000 tokens
                        context_pct = round((total / 1_000_000) * 100, 1)
                    except TypeError:
                        pass
                break

        # Find the most recent tool_use AND its input preview, so the
        # dashboard can show "Bash: docker compose build" instead of just
        # "Bash". Per ywatanabe complaint msg 5481.
        for obj in entries:
            if obj.get("type") != "assistant":
                continue
            content = _dict_or_empty(obj.get("message")).get("content", [])
            if not isinstance(content, list):
                continue
            for c in content:
                if not isinstance(c, dict) or c.get("type") != "tool_use":
                    continue
                tool_name = c.get("name", "")
                current_tool = tool_name if isinstance(tool_name, str) else ""
                tool_input = _dict_or_empty(c.get("input"))
                # Heuristic preview by tool kind:
                if current_tool == "Bash":
                    preview = tool_input.get("description") or tool_input.get(
                        "command", ""
                    )
                elif current_tool in ("Edit", "Write", "Read"):
                    preview = tool_input.get("file_path", "")
                elif current_tool == "Grep":
                    preview = tool_input.get("pattern", "")
                elif current_tool == "Glob":
                    preview = tool_input.get("pattern", "")
                elif current_tool == "Agent":
                    preview = tool_input.get(
                        "description", ""
                    ) or tool_input.get("subagent_type", "")
                elif current_tool.startswith("mcp__"):
                    preview = (
                        tool_input.get("text", "")
                        or tool_input.get("chat_id", "")
                        or tool_input.get("query", "")
                    )
                else:
                    preview = ""
                if isinstance(preview, str):
                    current_tool_input = preview[:120].strip()
                break
            if current_tool:
                break

        # Find the most recent USER message — gives the dashboard a
        # "what was this agent last asked to do" snippet which is more
        # meaningful than the tool name alone.
        for obj in entries:
            if obj.get("type") == "user" and "message" in obj:
                msg = obj["message"]
                if not isinstance(msg, dict):
                    continue
                content = msg.get("content")
                if isinstance(content, str):
                    last_user_msg = content[:200].strip()
                elif isinstance(content, list):
                    # Keep text parts only; tool results are noise here.
                    parts = [
                        c.get("text", "")
                        for c in content
                        if isinstance(c, dict)
                        and c.get("type") == "text"
                        and isinstance(c.get("text", ""), str)
                    ]
                    last_user_msg = " ".join(parts)[:200].strip()
                if last_user_msg:
                    break

    # current_task is the high-level "what is this agent doing": prefer
    # the tool preview, then the bare tool name, then the last user
    # message snippet.
    if current_tool and current_tool_input:
        current_task = f"{current_tool}: {current_tool_input}"
    elif current_tool:
        current_task = current_tool
    elif last_user_msg:
        current_task = last_user_msg

    # ---- process / session / skills ---------------------------------
    subagent_count = _subagent_count_from_pane(session, multiplexer)
    pid, ppid = _pids_from_session(session, multiplexer)
    skills_loaded = _parse_skills(workdir)

    # ---- terminal pane + classified state ---------------------------
    # All of these are deterministic (no LLM). tmux capture-pane is the
    # only I/O beyond file reads; redaction strips tokens before any
    # downstream consumer sees the data.
    raw_pane = _capture_pane(session, multiplexer)
    pane_text = _redact_secrets(raw_pane)
    pane_state, stuck_prompt_text = _classify_pane_state(pane_text)

    # ---- workspace file snapshots -----------------------------------
    claude_md = _read_claude_md(workdir)
    mcp_json = _read_mcp_json(workdir)
    mcp_servers = _parse_mcp_servers(workdir)

    # ---- hook-captured tool / prompt log ----------------------------
    # Populated by `scitex-agent-container ingest-hook-event` entries wired into
    # the agent's .claude/settings.local.json. Non-agentic: pure ring-
    # buffer read.
    # stx-allow: fallback (reason: event_log DB may not exist on agents that
    # haven't run a hook yet — empty summary is a valid initial state)
    try:
        from .event_log import summarize as _summarize_events

        _event_summary = _summarize_events(name, limit=50)
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        _event_summary = {
            "recent_tools": [],
            "recent_prompts": [],
            "agent_calls": [],
            "open_agent_calls": [],
            "background_tasks": [],
            "counts": {},
        }

    # Use canonical fleet name (e.g. "nas" instead of "DXP480TPLUS-994")
    # stx-allow: fallback (reason: resolve_hostname reads a YAML that may be
    # absent on unconfigured hosts — raw gethostname is an acceptable fallback)
    try:
        from .config._host import resolve_hostname

        machine = resolve_hostname()
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        machine = socket.gethostname().split(".")[0]

    # ---- Claude quota fields ----------------------------------------
    quota_5h_used_pct: float | None = None
    quota_7d_used_pct: float | None = None
    quota_5h_reset_at: str | None = None
    quota_7d_reset_at: str | None = None
    quota_from_cache: bool = False
    quota_error: str | None = None

    # Prefer statusline JSON rate-limits (exact values from Claude Code)
    # over the fetch_usage scrape when available.
    if _sl:
        _rl = _dict_or_empty(_sl.get("rate_limits"))
        _fh = _dict_or_empty(_rl.get("five_hour"))
        _sd = _dict_or_empty(_rl.get("seven_day"))
        quota_5h_used_pct = _coerce_pct(_fh.get("used_percentage"))
        quota_7d_used_pct = _coerce_pct(_sd.get("used_percentage"))
        quota_5h_reset_at = _fh.get("resets_at") or None
        quota_7d_reset_at = _sd.get("resets_at") or None
        quota_from_cache = False  # live from statusline, not cached

    if quota_5h_used_pct is None:
        # stx-allow: fallback (reason: fetch_usage may fail on network timeout
        # or missing credentials; quota_error captures the reason for callers)
        try:
            usage = fetch_usage()
            quota_5h_used_pct = usage.get("used_pct_5h")
            quota_7d_used_pct = usage.get("used_pct_7d")
            quota_5h_reset_at = usage.get("reset_at_5h")
            quota_7d_reset_at = usage.get("reset_at_7d")
            quota_from_cache = bool(usage.get("from_cache", False))
            quota_error = usage.get("error")
        except Exception as exc:  # stx-allow: fallback
            quota_error = f"fetch_usage raised: {exc}"

    # ---- Account / credential identity ------------------------------------
    # Pull the full non-secret credentials view so downstream consumers
    # can render plan, plugins, statusline command, and auth-rotation
    # state without re-scanning ~/.claude/. The dashboard previously
    # showed billing_type ("stripe_subscription") as the plan, which is
    # wrong — the real plan comes from rateLimitTier in credentials.json
    # and is normalized to plan_label here.
    account_email: str | None = None
    account_plan_label: str | None = None
    account_subscription_type: str | None = None
    account_rate_limit_tier: str | None = None
    account_organization_name: str | None = None
    account_uuid: str | None = None
    oauth_expires_at: int | None = None
    installed_plugins: list = []
    status_line_command: str | None = None
    # stx-allow: fallback (reason: credentials file absent on freshly
    # provisioned agents — account_email stays None until auth completes)
    try:
        from .credentials import read_credentials_metadata

        _cred = read_credentials_metadata()
        account_email = _cred.get("email_address")
        account_plan_label = _cred.get("plan_label")
        account_subscription_type = _cred.get("subscription_type")
        account_rate_limit_tier = _cred.get("rate_limit_tier")
        account_organization_name = _cred.get("organization_name")
        account_uuid = _cred.get("account_uuid")
        _expires = _cred.get("oauth_expires_at")
        if isinstance(_expires, int):
            oauth_expires_at = _expires
        _plugins = _cred.get("installed_plugins")
        if isinstance(_plugins, list):
            installed_plugins = _plugins
        _slc = _cred.get("status_line_command")
        if isinstance(_slc, str):
            status_line_command = _slc
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        pass

    # ---- Auth-rotation tracking -------------------------------------------
    # When Claude Code rotates the OAuth token the expiresAt timestamp
    # jumps. We append one NDJSON line per observed change, keyed on the
    # email so the dashboard can show "this email has rotated N times,
    # last at T". The log is local-only (not pushed as a bulk field)
    # because the hub should dedupe rotations on its side from the
    # per-heartbeat oauth_expires_at field.
    if account_email and isinstance(oauth_expires_at, int):
        try:
            rot_dir = Path.home() / ".scitex" / "agent-container" / "auth-rotations"
            rot_dir.mkdir(parents=True, exist_ok=True)
            rot_file = rot_dir / f"{account_email}.ndjson"
            last_expires: int | None = None
            if rot_file.is_file():
                try:
                    # Newest entry is at the end of the file.
                    for line in reversed(rot_file.read_text().splitlines()):
                        line = line.strip()
                        if not line:
                            continue
                        obj = json.loads(line)
                        if isinstance(obj, dict) and isinstance(
                            obj.get("oauth_expires_at"), int
                        ):
                            last_expires = obj["oauth_expires_at"]
                            break
                except Exception:
                    last_expires = None
            if last_expires != oauth_expires_at:
                entry = {
                    "ts": datetime.now(tz=timezone.utc).isoformat(),
                    "email": account_email,
                    "account_uuid": account_uuid,
                    "oauth_expires_at": oauth_expires_at,
                    "plan_label": account_plan_label,
                }
                with rot_file.open("a", encoding="utf-8") as fh:
                    fh.write(json.dumps(entry) + "\n")
        except Exception:
            pass

    # ---- Machine resource metrics (psutil, optional) -----------------------
    # stx-allow: fallback (reason: psutil is an optional dependency; absent
    # on minimal installs — metrics dict stays empty, dashboard handles it)
    try:
        import psutil as _psutil

        _cpu_pct = _psutil.cpu_percent(interval=None)
        _vm = _psutil.virtual_memory()
        _disk = _psutil.disk_usage("/")
        _load = _psutil.getloadavg()
        _cpu_count = _psutil.cpu_count(logical=True) or 0
        # stx-allow: fallback (reason: cpu_freq may be None on VMs/containers)
        try:
            _freq = _psutil.cpu_freq()
            _cpu_model = f"{_cpu_count}x @ {_freq.max:.0f}MHz" if _freq else ""
        except Exception:
            _cpu_model = ""
        _metrics = {
            "cpu_count": _cpu_count,
            "cpu_model": _cpu_model,
            "cpu_used_percent": round(_cpu_pct, 1),
            "load_avg_1m": round(_load[0], 2),
            "load_avg_5m": round(_load[1], 2),
            "load_avg_15m": round(_load[2], 2),
            "mem_used_percent": round(_vm.percent, 1),
            "mem_total_mb": round(_vm.total / 1024 / 1024, 1),
            "mem_free_mb": round(_vm.available / 1024 / 1024, 1),
            "mem_used_mb": round((_vm.total - _vm.available) / 1024 / 1024, 1),
            "disk_used_percent": round(_disk.percent, 1),
            "disk_total_mb": round(_disk.total / 1024 / 1024, 1),
            "disk_used_mb": round(_disk.used / 1024 / 1024, 1),
            "resource_source": "local",
        }
    except Exception:  # stx-allow: fallback (reason: catch-all safety net — see inline comment for context)
        _metrics = {}

    return {
        "multiplexer": multiplexer,
        "pid": pid,
        "ppid": ppid,
        "subagent_count": subagent_count,
        "subagents": subagent_count,  # legacy alias
        "context_pct": context_pct,
        "current_tool": current_tool,
        "current_tool_input": current_tool_input,
        "current_task": current_task,
        "last_user_msg": last_user_msg,
        "last_activity": last_activity,
        "skills_loaded": skills_loaded,
        "machine": machine,
        # scitex-orochi todo#55: canonical FQDN for display next to the
        # short machine label ("spartan (spartan.hpc.unimelb.edu.au)").
        # Falls back to the short name on hosts with no reverse DNS.
        "hostname_canonical": (socket.getfqdn() or "").strip(),
        "workdir": workdir,
        "project": name,
        # Only override started_at if we found one; caller can decide
        # whether to prefer the registry's started_at over this one.
        "started_at_transcript": started_at,
        # model from transcript is more accurate than config.model when
        # the agent is actually running under a different model alias.
        "model_transcript": model,
        "version": os.environ.get("SCITEX_AGENT_CONTAINER_META_VERSION", "0.2"),
        # ---- claude-session runtime fields ------------------------------
        # ``None`` for non-SDK agents; a dict for claude-session agents
        # exposing the SDK session id, accumulated per-turn token totals
        # (read from ``runtime/<name>/quota.json``), and the latest
        # heartbeat state. Lets ``sac show-status --json`` give parity
        # surface to the claude-code runtime without conflating the
        # rate-limit fields below (which come from a different source).
        "sdk_session": _read_sdk_session_state(name, workdir),
        # ---- Claude quota fields ----------------------------------------
        "quota_5h_used_pct": quota_5h_used_pct,
        "quota_7d_used_pct": quota_7d_used_pct,
        "quota_5h_reset_at": quota_5h_reset_at,
        "quota_7d_reset_at": quota_7d_reset_at,
        "quota_from_cache": quota_from_cache,
        "quota_error": quota_error,
        # ---- Account identity (which Claude account this agent is using) ----
        # `account_email` stays as the stable id consumers group on.
        # `account_plan_label` is the human-readable plan ("Max 20x" etc.)
        # derived from rateLimitTier — dashboards should prefer this over
        # billing_type, which only reports payment method.
        # `oauth_expires_at` is the unix-ms token expiry; a change across
        # heartbeats indicates the OAuth token was rotated.
        "account_email": account_email,
        "account_plan_label": account_plan_label,
        "account_subscription_type": account_subscription_type,
        "account_rate_limit_tier": account_rate_limit_tier,
        "account_organization_name": account_organization_name,
        "account_uuid": account_uuid,
        "oauth_expires_at": oauth_expires_at,
        # ---- Claude Code setup audit (for web-UI setup check) ---------------
        # `installed_plugins` lists what is installed via /plugin install.
        # `status_line_command` exposes whatever claude-hud / custom
        # statusline the user wired up — the hub uses it as a "setup-ok"
        # signal (is claude-hud wired in? is the sac-statusline wrapper
        # in place?).
        "installed_plugins": installed_plugins,
        "status_line_command": status_line_command,
        # ---- Machine resource metrics (for hub /api/resources/) -------------
        # NOTE: metrics are host-level, not agent-level. When multiple agents
        # run on the same host they all report identical values; the hub is
        # expected to dedupe under ``machine`` rather than store N copies.
        "metrics": _metrics,
        # ---- Live terminal pane + classified state -------------------------
        # Deterministic, non-agentic: tmux capture-pane + regex classifier.
        # Secrets are redacted in-place before inclusion.
        "pane_text": pane_text,
        "pane_state": pane_state,
        "stuck_prompt_text": stuck_prompt_text,
        # ---- Workspace file snapshots --------------------------------------
        # Full CLAUDE.md (truncated) so downstream consumers do not need
        # per-host filesystem access. .mcp.json has token-style keys
        # redacted. mcp_servers is the structured view for setup audit
        # (name + transport + host/command, nothing sensitive).
        "claude_md": claude_md,
        "mcp_json": mcp_json,
        "mcp_servers": mcp_servers,
        # ---- Claude Code hook-captured events ------------------------------
        # Structured view of the last N events the agent fired through
        # .claude/settings.local.json hooks. Surfaces full tool inputs
        # (including Agent prompts and Bash run_in_background starts) so
        # the dashboard and fleet lead can see what the agent is doing
        # without relying on tmux scraping.
        "recent_tools": _event_summary.get("recent_tools") or [],
        "recent_prompts": _event_summary.get("recent_prompts") or [],
        "agent_calls": _event_summary.get("agent_calls") or [],
        "open_agent_calls": _event_summary.get("open_agent_calls") or [],
        # Scalar summaries for terse projection and healer thresholding.
        # open_agent_calls_count > 0 means there are Agent pretool events
        # with no matching posttool — possible stuck subagent.
        # oldest_open_agent_age_s gives the age of the oldest such call.
        # Cross-check with subagent_count before alerting (ring-buffer
        # rotation can produce false positives).
        "open_agent_calls_count": len(_event_summary.get("open_agent_calls") or []),
        "oldest_open_agent_age_s": max(
            (
                c.get("age_seconds") or 0
                for c in (_event_summary.get("open_agent_calls") or [])
            ),
            default=None,
        )
        or None,
        "background_tasks": _event_summary.get("background_tasks") or [],
        "tool_counts": _event_summary.get("counts") or {},
        # Functional-heartbeat shortcuts — top-level so consumers don't
        # have to walk recent_tools. last_tool_at updates on every tool
        # use (LLM-level liveness); last_mcp_tool_at only updates on
        # mcp__* tool calls (proves the MCP sidecar route is live).
        "last_tool_at": _event_summary.get("last_tool_at") or "",
        "last_tool_name": _event_summary.get("last_tool_name") or "",
        "last_mcp_tool_at": _event_summary.get("last_mcp_tool_at") or "",
        "last_mcp_tool_name": _event_summary.get("last_mcp_tool_name") or "",
        # PaneAction attempt-log summary (from action_store). Surfaces
        # the latest run of any configured action (nonce-probe, compact,
        # etc.) plus aggregate counts, so the dashboard can chip a
        # "last probe: alive 12s ago" signal without reading the DB.
        # Fail-open: absent store / I/O error -> empty summary.
        **_collect_action_summary_fields(name),
    }
[docs] def _collect_action_summary_fields(agent_name: str) -> dict[str, Any]: """Return a flat dict of action-summary fields for ``collect_rich``. Runs inside a try/except so a corrupt or missing ``~/.scitex/agent-container/actions.db`` never blocks a heartbeat. All keys are prefixed ``action_`` so consumers know which subsystem they came from. """ # stx-allow: fallback (reason: actions.db may be absent or corrupt; # empty action summary never blocks a heartbeat — fail-open by design) try: from . import action_store summary = action_store.summarize(agent_name) return { "last_action_at": summary.get("last_action_at", ""), "last_action_name": summary.get("last_action_name", ""), "last_action_outcome": summary.get("last_action_outcome", ""), "last_action_elapsed_s": summary.get("last_action_elapsed_s"), "action_counts": summary.get("counts", {}), "p95_elapsed_s_by_action": summary.get("p95_elapsed_s_by_action", {}), } except Exception: # stx-allow: fallback (reason: catch-all safety net — see inline comment for context) return { "last_action_at": "", "last_action_name": "", "last_action_outcome": "", "last_action_elapsed_s": None, "action_counts": {}, "p95_elapsed_s_by_action": {}, }