Coverage for little_loops / session_store.py: 78%
191 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
"""Unified session store: a per-project SQLite + FTS5 database (FEAT-1112).

A single ``.ll/session.db`` indexes tool events, file modifications, issue
transitions, loop runs, and user corrections so cross-cutting queries
("which loops failed on issues touching file X?") can be answered in
milliseconds rather than re-parsing scattered JSON/markdown sources.

The store is purely additive: it never replaces an existing data path. The
``SQLiteTransport`` sink subscribes to the EventBus alongside the other
transports, and the backfill routine seeds the database from on-disk sources
that the analyze-* skills already read.

Public API:
    DEFAULT_DB_PATH: default database location (``.ll/session.db``)
    SCHEMA_VERSION: current schema version integer
    ensure_db(path): create the database and apply pending migrations
    connect(path): open a connection (ensures schema first)
    SQLiteTransport: EventBus Transport sink writing FSM events to loop_events
    backfill(db,...): populate the database from existing on-disk sources
    search(db,...): FTS5 full-text query with BM25 ranking
    recent(db,...): recent rows for a given event kind
"""
24from __future__ import annotations
26import hashlib
27import json
28import logging
29import sqlite3
30import threading
31from datetime import UTC, datetime
32from pathlib import Path
33from typing import Any
35logger = logging.getLogger(__name__)
37DEFAULT_DB_PATH = Path(".ll/session.db")
38SCHEMA_VERSION = 1
40_VALID_KINDS = frozenset({"tool", "file", "issue", "loop", "correction"})
41_KIND_TABLE = {
42 "tool": "tool_events",
43 "file": "file_events",
44 "issue": "issue_events",
45 "loop": "loop_events",
46 "correction": "user_corrections",
47}
49# FSM event types the SQLiteTransport records as loop_events rows.
50_LOOP_EVENT_TYPES = frozenset(
51 {
52 "loop_start",
53 "loop_resume",
54 "loop_complete",
55 "state_enter",
56 "route",
57 "retry_exhausted",
58 "cycle_detected",
59 }
60)
63# ---------------------------------------------------------------------------
64# Schema + migrations
65# ---------------------------------------------------------------------------
67# Each entry is the full SQL applied to move the schema from version index to
68# index+1. Migration 0 bootstraps the whole schema; append new entries to
69# evolve it. ``bytes_in`` / ``bytes_out`` / ``cache_hit`` are reserved on
70# ``tool_events`` for FEAT-1160 (Context Window Analytics) so that feature does
71# not require a follow-up migration.
72_MIGRATIONS: list[str] = [
73 """
74 CREATE TABLE tool_events (
75 id INTEGER PRIMARY KEY AUTOINCREMENT,
76 ts TEXT NOT NULL,
77 session_id TEXT,
78 tool_name TEXT,
79 args_hash TEXT,
80 result_size INTEGER,
81 bytes_in INTEGER,
82 bytes_out INTEGER,
83 cache_hit INTEGER
84 );
85 CREATE TABLE file_events (
86 id INTEGER PRIMARY KEY AUTOINCREMENT,
87 ts TEXT NOT NULL,
88 session_id TEXT,
89 path TEXT,
90 op TEXT,
91 issue_id TEXT,
92 git_sha TEXT
93 );
94 CREATE TABLE issue_events (
95 id INTEGER PRIMARY KEY AUTOINCREMENT,
96 ts TEXT NOT NULL,
97 issue_id TEXT,
98 transition TEXT,
99 discovered_by TEXT
100 );
101 CREATE TABLE loop_events (
102 id INTEGER PRIMARY KEY AUTOINCREMENT,
103 ts TEXT NOT NULL,
104 loop_name TEXT,
105 state TEXT,
106 transition TEXT,
107 retries INTEGER
108 );
109 CREATE TABLE user_corrections (
110 id INTEGER PRIMARY KEY AUTOINCREMENT,
111 ts TEXT NOT NULL,
112 session_id TEXT,
113 content TEXT,
114 source TEXT
115 );
116 CREATE VIRTUAL TABLE search_index USING fts5(
117 content,
118 kind UNINDEXED,
119 ref UNINDEXED,
120 anchor UNINDEXED,
121 ts UNINDEXED
122 );
123 CREATE TABLE meta (key TEXT PRIMARY KEY, value TEXT);
124 """,
125]
128def _now() -> str:
129 """Return the current UTC time as a Z-suffixed ISO 8601 string."""
130 return datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
133def _current_version(conn: sqlite3.Connection) -> int:
134 """Return the applied schema version, or 0 if the meta table is absent."""
135 try:
136 row = conn.execute("SELECT value FROM meta WHERE key = 'schema_version'").fetchone()
137 except sqlite3.OperationalError:
138 return 0
139 return int(row[0]) if row else 0
def _apply_migrations(conn: sqlite3.Connection) -> None:
    """Apply every migration newer than the database's current version.

    Each migration and its ``schema_version`` bump run inside a single
    transaction. ``executescript`` otherwise executes statements outside any
    transaction, so a failure midway would leave a partial schema with no
    recorded version and every later start would fail with "table already
    exists". Migration scripts must therefore not contain their own
    ``BEGIN`` / ``COMMIT`` statements.

    Args:
        conn: open connection to the session database.

    Raises:
        sqlite3.Error: if a migration fails; that migration is rolled back
            and earlier, already-committed migrations are kept.
    """
    version = _current_version(conn)
    for index in range(version, len(_MIGRATIONS)):
        # ``index + 1`` is an int produced by range(), so inlining it into the
        # script is safe; executescript() cannot take bound parameters.
        script = (
            "BEGIN;\n"
            f"{_MIGRATIONS[index]}\n"
            "INSERT INTO meta(key, value) "
            f"VALUES('schema_version', '{index + 1}') "
            "ON CONFLICT(key) DO UPDATE SET value = excluded.value;\n"
            "COMMIT;"
        )
        try:
            conn.executescript(script)
        except sqlite3.Error:
            # Undo the half-applied migration; a no-op if SQLite already
            # rolled the transaction back itself.
            conn.rollback()
            raise
def ensure_db(path: Path | str = DEFAULT_DB_PATH) -> Path:
    """Create the database at *path* (if needed) and apply pending migrations.

    Idempotent: safe to call on every session start. The parent directory is
    created if absent.

    Args:
        path: location of the SQLite file.

    Returns:
        *path* as a :class:`~pathlib.Path`. It is not made absolute: a
        relative input yields a relative result.

    Raises:
        OSError: if the parent directory cannot be created.
        sqlite3.Error: if the database cannot be opened or migrated.
    """
    db_path = Path(path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path))
    try:
        _apply_migrations(conn)
    finally:
        # This connection exists only to run migrations; callers open their own.
        conn.close()
    return db_path
def connect(path: Path | str = DEFAULT_DB_PATH) -> sqlite3.Connection:
    """Open a connection to the session database, ensuring the schema first.

    Rows are returned as :class:`sqlite3.Row` so callers can index by name.

    Args:
        path: location of the SQLite file; created and migrated if needed.

    Returns:
        An open connection that the caller is responsible for closing.
    """
    db_path = ensure_db(path)
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    return conn
182def _index(
183 conn: sqlite3.Connection,
184 *,
185 content: str,
186 kind: str,
187 ref: str,
188 anchor: str,
189 ts: str,
190) -> None:
191 """Insert one row into the FTS5 ``search_index`` table."""
192 conn.execute(
193 "INSERT INTO search_index(content, kind, ref, anchor, ts) VALUES(?, ?, ?, ?, ?)",
194 (content, kind, ref, anchor, ts),
195 )
198# ---------------------------------------------------------------------------
199# Query API
200# ---------------------------------------------------------------------------
def search(
    db: Path | str = DEFAULT_DB_PATH,
    *,
    query: str,
    limit: int = 20,
) -> list[dict[str, Any]]:
    """Run an FTS5 full-text query, returning BM25-ranked results.

    Each result dict carries ``content``, ``kind``, ``ref``, ``anchor`` (a
    file:line-style pointer where available), ``ts`` and a numeric ``score``
    (lower BM25 score = better match).

    Args:
        db: location of the session database.
        query: FTS5 MATCH expression.
        limit: maximum number of rows to return.

    Returns:
        Matching rows as plain dicts, best match first.

    Raises:
        ValueError: if SQLite rejects the statement with an
            ``OperationalError``, typically a malformed FTS5 expression.
    """
    conn = connect(db)
    try:
        rows = conn.execute(
            "SELECT content, kind, ref, anchor, ts, bm25(search_index) AS score "
            "FROM search_index WHERE search_index MATCH ? "
            "ORDER BY score LIMIT ?",
            (query, limit),
        ).fetchall()
    except sqlite3.OperationalError as exc:
        raise ValueError(f"invalid FTS query {query!r}: {exc}") from exc
    finally:
        conn.close()
    return [dict(row) for row in rows]
def recent(
    db: Path | str = DEFAULT_DB_PATH,
    *,
    kind: str,
    limit: int = 20,
) -> list[dict[str, Any]]:
    """Return the most recent rows for *kind* (tool, file, issue, loop, correction).

    "Most recent" means highest row id, i.e. latest inserted, not latest
    ``ts`` value.

    Args:
        db: location of the session database.
        kind: which event table to read.
        limit: maximum number of rows to return.

    Returns:
        Rows of the matching table as plain dicts, newest insert first.

    Raises:
        ValueError: if *kind* is not one of the known kinds.
    """
    if kind not in _VALID_KINDS:
        raise ValueError(f"unknown kind {kind!r}; expected one of {sorted(_VALID_KINDS)}")
    table = _KIND_TABLE[kind]
    conn = connect(db)
    try:
        rows = conn.execute(
            f"SELECT * FROM {table} ORDER BY id DESC LIMIT ?",  # noqa: S608 - table from fixed map
            (limit,),
        ).fetchall()
    finally:
        conn.close()
    return [dict(row) for row in rows]
251# ---------------------------------------------------------------------------
252# SQLiteTransport
253# ---------------------------------------------------------------------------
class SQLiteTransport:
    """EventBus sink that records FSM loop events into the session database.

    A single connection is opened at construction with ``check_same_thread``
    disabled, since :meth:`send` may be called from the FSM thread while other
    transports run their own threads; a lock serialises writes and
    :meth:`close`. Every operation is best-effort — a database or filesystem
    error is logged and swallowed so a failing sink never aborts a loop run
    (the four ``wire_transports`` call sites depend on this).
    """

    def __init__(self, db_path: Path | str = DEFAULT_DB_PATH) -> None:
        """Open (creating and migrating if needed) the database at *db_path*.

        On failure the sink is left disabled: ``send`` becomes a no-op.
        """
        self._path = Path(db_path)
        self._lock = threading.Lock()
        self._conn: sqlite3.Connection | None = None
        try:
            ensure_db(self._path)
            self._conn = sqlite3.connect(str(self._path), check_same_thread=False)
        except (sqlite3.Error, OSError):
            # OSError covers the parent-directory creation done by ensure_db
            # (e.g. a read-only project); it must not escape a best-effort sink.
            logger.warning(
                "SQLiteTransport: could not open %s; sink disabled", self._path, exc_info=True
            )
            self._conn = None

    def send(self, event: dict[str, Any]) -> None:
        """Record a recognised FSM event as a ``loop_events`` row (best-effort).

        Events whose ``event`` field is not in ``_LOOP_EVENT_TYPES`` are
        ignored. For ``loop_complete`` the ``outcome`` field, when present, is
        stored in the ``state`` column instead of ``state``.
        """
        if self._conn is None:
            return
        event_type = str(event.get("event", ""))
        if event_type not in _LOOP_EVENT_TYPES:
            return
        # An explicit ``loop_name: None`` must become NULL, not the text "None".
        raw_name = event.get("loop_name")
        loop_name = (str(raw_name) if raw_name is not None else "") or None
        state = event.get("state")
        if event_type == "loop_complete":
            state = event.get("outcome", state)
        retries = event.get("retries")
        ts = str(event.get("ts") or _now())
        with self._lock:
            # Re-read under the lock: close() may have run since the check above.
            conn = self._conn
            if conn is None:
                return
            try:
                conn.execute(
                    "INSERT INTO loop_events(ts, loop_name, state, transition, retries) "
                    "VALUES(?, ?, ?, ?, ?)",
                    (
                        ts,
                        loop_name,
                        str(state) if state is not None else None,
                        event_type,
                        int(retries) if isinstance(retries, int) else None,
                    ),
                )
                _index(
                    conn,
                    content=" ".join(
                        str(p) for p in (loop_name, state, event_type) if p is not None
                    ),
                    kind="loop",
                    ref=loop_name or "",
                    anchor=f".loops/{loop_name}.yaml" if loop_name else "",
                    ts=ts,
                )
                conn.commit()
            except sqlite3.Error:
                logger.warning(
                    "SQLiteTransport: write failed for event %r", event_type, exc_info=True
                )
                # Discard the half-written event; otherwise a loop_events row
                # without its search_index row would be committed together
                # with the next successful event.
                try:
                    conn.rollback()
                except sqlite3.Error:
                    logger.debug("SQLiteTransport: rollback failed", exc_info=True)

    def close(self) -> None:
        """Close the underlying connection (best-effort).

        Safe to call more than once; later ``send`` calls become no-ops.
        """
        # Take the lock so the connection is never closed in the middle of a
        # write performed by send() on another thread.
        with self._lock:
            conn = self._conn
            self._conn = None
            if conn is None:
                return
            try:
                conn.close()
            except sqlite3.Error:
                logger.debug("SQLiteTransport: close failed", exc_info=True)
331# ---------------------------------------------------------------------------
332# Backfill
333# ---------------------------------------------------------------------------
336def _hash_args(value: Any) -> str:
337 """Return a short stable hash of a tool-call argument structure."""
338 try:
339 blob = json.dumps(value, sort_keys=True, default=str)
340 except (TypeError, ValueError):
341 blob = repr(value)
342 return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16]
def _backfill_issues(conn: sqlite3.Connection, issues_dir: Path) -> int:
    """Seed ``issue_events`` from issue-file frontmatter under *issues_dir*.

    Every ``*.md`` file below *issues_dir* is read (recursively, in sorted
    order). Files that cannot be read or decoded, and files whose frontmatter
    has no ``id``, are skipped. Nothing is committed here; the caller owns
    the transaction.

    Args:
        conn: open connection to the session database.
        issues_dir: root directory to scan.

    Returns:
        Number of ``issue_events`` rows inserted.
    """
    from little_loops.frontmatter import parse_frontmatter

    count = 0
    for issue_file in sorted(issues_dir.rglob("*.md")):
        try:
            # NOTE(review): parse_frontmatter is assumed to return a dict-like
            # mapping; confirm against little_loops.frontmatter.
            fm = parse_frontmatter(issue_file.read_text(encoding="utf-8"))
        except (OSError, UnicodeDecodeError):
            # One unreadable or non-UTF-8 file must not abort the whole backfill.
            continue
        issue_id = fm.get("id")
        if not issue_id:
            continue
        status = str(fm.get("status", "open"))
        discovered_by = fm.get("discovered_by")
        # First available date wins; an empty string satisfies ts NOT NULL.
        ts = str(fm.get("completed_at") or fm.get("captured_at") or fm.get("discovered_date") or "")
        conn.execute(
            "INSERT INTO issue_events(ts, issue_id, transition, discovered_by) VALUES(?, ?, ?, ?)",
            (ts, str(issue_id), status, str(discovered_by) if discovered_by else None),
        )
        _index(
            conn,
            content=f"{issue_id} {status} {fm.get('type', '')}",
            kind="issue",
            ref=str(issue_id),
            anchor=str(issue_file),
            ts=ts,
        )
        count += 1
    return count
def _backfill_loops(conn: sqlite3.Connection, loops_dir: Path) -> int:
    """Seed ``loop_events`` from FSM state JSON under ``.loops/.running`` + ``.history``.

    Each ``*.json`` file directly inside either subdirectory produces one row
    with transition ``"backfill"``. Files that cannot be read, decoded or
    parsed, and files whose top-level JSON value is not an object, are
    skipped. Nothing is committed here; the caller owns the transaction.

    Args:
        conn: open connection to the session database.
        loops_dir: directory containing the ``.running`` / ``.history`` folders.

    Returns:
        Number of ``loop_events`` rows inserted.
    """
    count = 0
    for sub in (".running", ".history"):
        directory = loops_dir / sub
        if not directory.is_dir():
            continue
        for state_file in sorted(directory.glob("*.json")):
            try:
                data = json.loads(state_file.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # ValueError covers both json.JSONDecodeError and the
                # UnicodeDecodeError raised for a non-UTF-8 file.
                continue
            if not isinstance(data, dict):
                continue
            # Fall back to the file name when the state carries no loop name.
            loop_name = str(data.get("loop_name") or state_file.stem)
            state = data.get("current_state") or data.get("state")
            ts = str(data.get("updated_at") or data.get("started_at") or "")
            conn.execute(
                "INSERT INTO loop_events(ts, loop_name, state, transition, retries) "
                "VALUES(?, ?, ?, ?, ?)",
                (ts, loop_name, str(state) if state else None, "backfill", None),
            )
            _index(
                conn,
                content=f"{loop_name} {state or ''}",
                kind="loop",
                ref=loop_name,
                anchor=str(state_file),
                ts=ts,
            )
            count += 1
    return count
def _backfill_tool_events(conn: sqlite3.Connection, jsonl_files: list[Path]) -> int:
    """Seed ``tool_events`` from assistant tool-use blocks in session JSONL files.

    Each line is parsed as one JSON record. Only records of type
    ``"assistant"`` whose ``message.content`` is a list are considered, and
    each ``tool_use`` block in that list produces one row. Unopenable files,
    blank lines, unparsable lines and records of an unexpected shape are
    skipped. Nothing is committed here; the caller owns the transaction.

    Args:
        conn: open connection to the session database.
        jsonl_files: session transcript files to scan.

    Returns:
        Number of ``tool_events`` rows inserted.
    """
    count = 0
    for jsonl_file in jsonl_files:
        try:
            # errors="replace": a stray invalid byte should cost at most one
            # line, not abort the whole backfill with UnicodeDecodeError.
            handle = jsonl_file.open(encoding="utf-8", errors="replace")
        except OSError:
            continue
        with handle:
            for line in handle:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A valid JSON line need not be an object (could be a list,
                # string, number...); only objects can carry a record type.
                if not isinstance(record, dict) or record.get("type") != "assistant":
                    continue
                message = record.get("message")
                if not isinstance(message, dict):
                    continue
                content = message.get("content", [])
                if not isinstance(content, list):
                    continue
                raw_session = record.get("sessionId")
                # Stringify so an unexpected non-scalar id cannot break
                # parameter binding; the column is TEXT either way.
                session_id = str(raw_session) if raw_session is not None else None
                ts = str(record.get("timestamp") or "")
                for block in content:
                    if not isinstance(block, dict) or block.get("type") != "tool_use":
                        continue
                    tool_name = str(block.get("name", ""))
                    args = block.get("input", {})
                    conn.execute(
                        "INSERT INTO tool_events(ts, session_id, tool_name, args_hash, "
                        "result_size, bytes_in, bytes_out, cache_hit) "
                        "VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
                        (ts, session_id, tool_name, _hash_args(args), None, None, None, None),
                    )
                    _index(
                        conn,
                        content=tool_name,
                        kind="tool",
                        ref=tool_name,
                        anchor=str(jsonl_file),
                        ts=ts,
                    )
                    count += 1
    return count
def backfill(
    db: Path | str = DEFAULT_DB_PATH,
    *,
    issues_dir: Path | None = None,
    loops_dir: Path | None = None,
    jsonl_files: list[Path] | None = None,
) -> dict[str, int]:
    """Populate the database from existing on-disk sources.

    Reads issue-file frontmatter, FSM loop-state JSON, and (optionally) session
    JSONL tool-use blocks. Sources that are absent are skipped silently. All
    inserts are committed together at the end; if any step raises, the
    connection is closed without committing.

    Every call inserts rows again — there is no de-duplication against rows
    already present, so repeated runs produce duplicates.

    Args:
        db: location of the session database.
        issues_dir: issue root; defaults to ``.issues``.
        loops_dir: loop root; defaults to ``.loops``.
        jsonl_files: session transcripts to scan; none by default.
            Plain string paths are accepted for all three.

    Returns:
        Per-kind count of rows inserted, keyed ``issues`` / ``loops`` / ``tools``.
    """
    # Coerce so callers passing plain strings do not fail on .is_dir()/.open().
    issues_root = Path(issues_dir) if issues_dir is not None else Path(".issues")
    loops_root = Path(loops_dir) if loops_dir is not None else Path(".loops")
    sources = [Path(item) for item in jsonl_files] if jsonl_files else []
    conn = connect(db)
    counts: dict[str, int] = {"issues": 0, "loops": 0, "tools": 0}
    try:
        if issues_root.is_dir():
            counts["issues"] = _backfill_issues(conn, issues_root)
        if loops_root.is_dir():
            counts["loops"] = _backfill_loops(conn, loops_root)
        if sources:
            counts["tools"] = _backfill_tool_events(conn, sources)
        conn.commit()
    finally:
        conn.close()
    return counts