Coverage for little_loops / session_store.py: 78%

191 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Unified session store: a per-project SQLite + FTS5 database (FEAT-1112). 

2 

3A single ``.ll/session.db`` indexes tool events, file modifications, issue 

4transitions, loop runs, and user corrections so cross-cutting queries 

5("which loops failed on issues touching file X?") can be answered in 

6milliseconds rather than re-parsing scattered JSON/markdown sources. 

7 

8The store is purely additive: it never replaces an existing data path. The 

9``SQLiteTransport`` sink subscribes to the EventBus alongside the other 

10transports, and the backfill routine seeds the database from on-disk sources 

11that the analyze-* skills already read. 

12 

13Public API: 

14 DEFAULT_DB_PATH: default database location (``.ll/session.db``) 

15 SCHEMA_VERSION: current schema version integer 

16 ensure_db(path): create the database and apply pending migrations 

17 connect(path): open a connection (ensures schema first) 

18 SQLiteTransport: EventBus Transport sink writing FSM events to loop_events 

19 backfill(db,...): populate the database from existing on-disk sources 

20 search(db,...): FTS5 full-text query with BM25 ranking 

21 recent(db,...): recent rows for a given event kind 

22""" 

23 

24from __future__ import annotations 

25 

26import hashlib 

27import json 

28import logging 

29import sqlite3 

30import threading 

31from datetime import UTC, datetime 

32from pathlib import Path 

33from typing import Any 

34 

logger = logging.getLogger(__name__)

DEFAULT_DB_PATH = Path(".ll/session.db")
SCHEMA_VERSION = 1

# Event kind -> backing table. The key set doubles as the list of kinds that
# ``recent`` accepts, so the two can never drift apart.
_KIND_TABLE = {
    "tool": "tool_events",
    "file": "file_events",
    "issue": "issue_events",
    "loop": "loop_events",
    "correction": "user_corrections",
}
_VALID_KINDS = frozenset(_KIND_TABLE)

# FSM event types the SQLiteTransport records as loop_events rows; every other
# event type is ignored by the sink.
_LOOP_EVENT_TYPES = frozenset(
    (
        "loop_start",
        "loop_resume",
        "loop_complete",
        "state_enter",
        "route",
        "retry_exhausted",
        "cycle_detected",
    )
)

61 

62 

63# --------------------------------------------------------------------------- 

64# Schema + migrations 

65# --------------------------------------------------------------------------- 

66 

# Ordered schema migrations. Entry ``i`` is the full SQL script that moves the
# schema from version ``i`` to version ``i + 1``; ``_apply_migrations`` runs
# every entry whose index is >= the version stored in ``meta``. Migration 0
# bootstraps the whole schema; append new entries to evolve it (never edit an
# entry that has already shipped).
# NOTE(review): SCHEMA_VERSION presumably has to equal len(_MIGRATIONS) —
# confirm and bump both together.
#
# ``bytes_in`` / ``bytes_out`` / ``cache_hit`` are reserved on ``tool_events``
# for FEAT-1160 (Context Window Analytics) so that feature does not require a
# follow-up migration.
#
# ``search_index`` is an FTS5 table in which only ``content`` is tokenised;
# ``kind``, ``ref``, ``anchor`` and ``ts`` are UNINDEXED, i.e. stored and
# returned with each hit but not searchable themselves.
_MIGRATIONS: list[str] = [
    """
    CREATE TABLE tool_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ts TEXT NOT NULL,
        session_id TEXT,
        tool_name TEXT,
        args_hash TEXT,
        result_size INTEGER,
        bytes_in INTEGER,
        bytes_out INTEGER,
        cache_hit INTEGER
    );
    CREATE TABLE file_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ts TEXT NOT NULL,
        session_id TEXT,
        path TEXT,
        op TEXT,
        issue_id TEXT,
        git_sha TEXT
    );
    CREATE TABLE issue_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ts TEXT NOT NULL,
        issue_id TEXT,
        transition TEXT,
        discovered_by TEXT
    );
    CREATE TABLE loop_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ts TEXT NOT NULL,
        loop_name TEXT,
        state TEXT,
        transition TEXT,
        retries INTEGER
    );
    CREATE TABLE user_corrections (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ts TEXT NOT NULL,
        session_id TEXT,
        content TEXT,
        source TEXT
    );
    CREATE VIRTUAL TABLE search_index USING fts5(
        content,
        kind UNINDEXED,
        ref UNINDEXED,
        anchor UNINDEXED,
        ts UNINDEXED
    );
    CREATE TABLE meta (key TEXT PRIMARY KEY, value TEXT);
    """,
]

126 

127 

128def _now() -> str: 

129 """Return the current UTC time as a Z-suffixed ISO 8601 string.""" 

130 return datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") 

131 

132 

def _current_version(conn: sqlite3.Connection) -> int:
    """Look up the schema version recorded in the ``meta`` table.

    Returns 0 when the lookup raises ``sqlite3.OperationalError`` (for example
    because ``meta`` has not been created yet) or when no ``schema_version``
    row exists; otherwise the stored value converted to ``int``.
    """
    query = "SELECT value FROM meta WHERE key = 'schema_version'"
    try:
        found = conn.execute(query).fetchone()
    except sqlite3.OperationalError:
        # Typically "no such table: meta" on a brand-new database.
        return 0
    if not found:
        return 0
    return int(found[0])

140 

141 

def _apply_migrations(conn: sqlite3.Connection) -> None:
    """Apply every migration newer than the database's current version.

    Each migration script and its ``schema_version`` bump run inside one
    explicit transaction. ``executescript`` performs no transaction control of
    its own, so without the surrounding ``BEGIN``/``COMMIT`` a script failing
    midway would leave some tables created but the version unrecorded, and
    every later call would fail with "table ... already exists". With the
    transaction, a failed migration is rolled back completely and the error is
    re-raised.

    Migration scripts must therefore end each statement with ``;`` and must
    not contain their own ``BEGIN``/``COMMIT``.

    Raises:
        sqlite3.Error: if a migration script fails (after rolling it back).
    """
    version = _current_version(conn)
    for index in range(version, len(_MIGRATIONS)):
        # ``index + 1`` is an int produced by ``range``, so inlining it into
        # the script is safe; executescript() does not accept parameters.
        script = (
            "BEGIN;\n"
            + _MIGRATIONS[index]
            + "\nINSERT INTO meta(key, value) "
            + f"VALUES('schema_version', '{index + 1}') "
            + "ON CONFLICT(key) DO UPDATE SET value = excluded.value;\n"
            + "COMMIT;\n"
        )
        try:
            conn.executescript(script)
        except sqlite3.Error:
            # The BEGIN above is still open when a later statement fails.
            if conn.in_transaction:
                conn.rollback()
            raise

153 

154 

def ensure_db(path: Path | str = DEFAULT_DB_PATH) -> Path:
    """Make sure a migrated session database exists at *path*.

    The parent directory is created when missing, the database file is opened
    (which creates it if necessary), all pending migrations are applied, and
    the temporary connection is closed again. Calling this repeatedly is
    harmless: once the schema is current there is nothing left to apply.

    Returns *path* coerced to a :class:`~pathlib.Path`; it is not made
    absolute.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    bootstrap = sqlite3.connect(str(target))
    try:
        _apply_migrations(bootstrap)
    finally:
        # Always release the file handle, even if a migration raised.
        bootstrap.close()
    return target

169 

170 

def connect(path: Path | str = DEFAULT_DB_PATH) -> sqlite3.Connection:
    """Return an open connection to the session database at *path*.

    The schema is created or migrated first via :func:`ensure_db`. The
    connection's row factory is :class:`sqlite3.Row`, so result columns can be
    read by name as well as by position. The caller owns the connection and
    must close it.
    """
    connection = sqlite3.connect(str(ensure_db(path)))
    connection.row_factory = sqlite3.Row
    return connection

180 

181 

def _index(
    conn: sqlite3.Connection,
    *,
    content: str,
    kind: str,
    ref: str,
    anchor: str,
    ts: str,
) -> None:
    """Add a single searchable entry to the ``search_index`` table.

    The row is only executed on *conn*; committing is left to the caller.
    """
    statement = "INSERT INTO search_index(content, kind, ref, anchor, ts) VALUES(?, ?, ?, ?, ?)"
    values = (content, kind, ref, anchor, ts)
    conn.execute(statement, values)

196 

197 

198# --------------------------------------------------------------------------- 

199# Query API 

200# --------------------------------------------------------------------------- 

201 

202 

def search(
    db: Path | str = DEFAULT_DB_PATH,
    *,
    query: str,
    limit: int = 20,
) -> list[dict[str, Any]]:
    """Full-text search the session database and return ranked hits.

    *query* is passed to FTS5 ``MATCH`` as-is, so it must be valid FTS5 query
    syntax. At most *limit* hits are returned, ordered by ascending BM25 score
    (a lower score is a better match). Every hit is a dict with the keys
    ``content``, ``kind``, ``ref``, ``anchor``, ``ts`` and ``score``.

    Raises:
        ValueError: if SQLite rejects the query with an ``OperationalError``.
    """
    sql = (
        "SELECT content, kind, ref, anchor, ts, bm25(search_index) AS score "
        "FROM search_index WHERE search_index MATCH ? "
        "ORDER BY score LIMIT ?"
    )
    conn = connect(db)
    try:
        hits = conn.execute(sql, (query, limit)).fetchall()
    except sqlite3.OperationalError as exc:
        raise ValueError(f"invalid FTS query {query!r}: {exc}") from exc
    finally:
        conn.close()
    results: list[dict[str, Any]] = []
    for hit in hits:
        results.append(dict(hit))
    return results

228 

229 

def recent(
    db: Path | str = DEFAULT_DB_PATH,
    *,
    kind: str,
    limit: int = 20,
) -> list[dict[str, Any]]:
    """Fetch the newest rows of one event kind, newest first.

    *kind* is one of ``tool``, ``file``, ``issue``, ``loop`` or ``correction``.
    "Newest" means highest ``id``, i.e. most recently inserted. At most
    *limit* rows are returned, each as a plain dict keyed by column name.

    Raises:
        ValueError: if *kind* is not a recognised event kind.
    """
    if kind not in _VALID_KINDS:
        raise ValueError(f"unknown kind {kind!r}; expected one of {sorted(_VALID_KINDS)}")
    # The table name comes from a fixed internal map, never from user input.
    sql = f"SELECT * FROM {_KIND_TABLE[kind]} ORDER BY id DESC LIMIT ?"  # noqa: S608 - table from fixed map
    conn = connect(db)
    try:
        fetched = conn.execute(sql, (limit,)).fetchall()
    finally:
        conn.close()
    return list(map(dict, fetched))

249 

250 

251# --------------------------------------------------------------------------- 

252# SQLiteTransport 

253# --------------------------------------------------------------------------- 

254 

255 

class SQLiteTransport:
    """EventBus sink that records FSM loop events into the session database.

    A single connection is opened at construction with ``check_same_thread``
    disabled, since :meth:`send` may be called from the FSM thread while other
    transports run their own threads; a lock serialises writes and
    :meth:`close`. Every operation is best-effort — a database or filesystem
    error is logged and swallowed so a failing sink never aborts a loop run
    (the four ``wire_transports`` call sites depend on this).
    """

    def __init__(self, db_path: Path | str = DEFAULT_DB_PATH) -> None:
        """Open (and if needed create) the database at *db_path*.

        On failure the sink is disabled: :meth:`send` becomes a no-op.
        """
        self._path = Path(db_path)
        self._lock = threading.Lock()
        self._conn: sqlite3.Connection | None = None
        try:
            ensure_db(self._path)
            self._conn = sqlite3.connect(str(self._path), check_same_thread=False)
        except (sqlite3.Error, OSError):
            # OSError covers an uncreatable parent directory (ensure_db calls
            # mkdir); letting it escape would break the best-effort contract.
            logger.warning(
                "SQLiteTransport: could not open %s; sink disabled", self._path, exc_info=True
            )
            self._conn = None

    def send(self, event: dict[str, Any]) -> None:
        """Record a recognised FSM event as a ``loop_events`` row (best-effort).

        Events whose ``event`` field is not in ``_LOOP_EVENT_TYPES`` are
        ignored. For ``loop_complete`` the ``outcome`` field, when present, is
        stored in the ``state`` column. The row and its search-index entry are
        committed together; if either write fails both are rolled back.
        """
        if self._conn is None:
            return
        event_type = str(event.get("event", ""))
        if event_type not in _LOOP_EVENT_TYPES:
            return
        # A missing, None or empty loop name is stored as NULL rather than as
        # the literal string "None".
        raw_name = event.get("loop_name")
        loop_name = (str(raw_name) or None) if raw_name is not None else None
        state = event.get("state")
        if event_type == "loop_complete":
            state = event.get("outcome", state)
        retries = event.get("retries")
        ts = str(event.get("ts") or _now())
        try:
            with self._lock:
                # Re-read under the lock: close() may have run in between.
                conn = self._conn
                if conn is None:
                    return
                try:
                    conn.execute(
                        "INSERT INTO loop_events(ts, loop_name, state, transition, retries) "
                        "VALUES(?, ?, ?, ?, ?)",
                        (
                            ts,
                            loop_name,
                            str(state) if state is not None else None,
                            event_type,
                            int(retries) if isinstance(retries, int) else None,
                        ),
                    )
                    _index(
                        conn,
                        content=" ".join(
                            str(p) for p in (loop_name, state, event_type) if p is not None
                        ),
                        kind="loop",
                        ref=loop_name or "",
                        anchor=f".loops/{loop_name}.yaml" if loop_name else "",
                        ts=ts,
                    )
                    conn.commit()
                except sqlite3.Error:
                    # Drop the half-written pair so it is not committed by the
                    # next successful event and the write lock is released.
                    if conn.in_transaction:
                        conn.rollback()
                    raise
        except sqlite3.Error:
            logger.warning("SQLiteTransport: write failed for event %r", event_type, exc_info=True)

    def close(self) -> None:
        """Close the underlying connection (best-effort); safe to call twice."""
        # Detach under the lock so an in-flight send() finishes first and no
        # later send() can pick up a closed connection.
        with self._lock:
            conn, self._conn = self._conn, None
        if conn is None:
            return
        try:
            conn.close()
        except sqlite3.Error:
            logger.debug("SQLiteTransport: error closing %s", self._path, exc_info=True)

329 

330 

331# --------------------------------------------------------------------------- 

332# Backfill 

333# --------------------------------------------------------------------------- 

334 

335 

def _hash_args(value: Any) -> str:
    """Fingerprint a tool-call argument structure as 16 hex characters.

    The value is serialised as JSON with sorted keys (objects JSON cannot
    encode natively are stringified), so dicts that differ only in key order
    hash the same. If JSON serialisation raises ``TypeError`` or
    ``ValueError``, ``repr(value)`` is hashed instead. The result is the first
    16 hex digits of the SHA-256 of the UTF-8 encoded text.
    """
    try:
        serialised = json.dumps(value, sort_keys=True, default=str)
    except (TypeError, ValueError):
        serialised = repr(value)
    digest = hashlib.sha256()
    digest.update(serialised.encode("utf-8"))
    return digest.hexdigest()[:16]

343 

344 

def _backfill_issues(conn: sqlite3.Connection, issues_dir: Path) -> int:
    """Seed ``issue_events`` from issue-file frontmatter under *issues_dir*.

    Every ``*.md`` file below *issues_dir* (searched recursively, in sorted
    order) is parsed. Files that cannot be read or are not valid UTF-8, and
    files whose frontmatter has no ``id``, are skipped. For each remaining
    file one ``issue_events`` row and one search-index entry are written; the
    frontmatter ``status`` (default ``"open"``) is stored as the transition.
    Nothing is committed here.

    Returns the number of issue rows inserted.
    """
    from little_loops.frontmatter import parse_frontmatter

    count = 0
    for issue_file in sorted(issues_dir.rglob("*.md")):
        try:
            fm = parse_frontmatter(issue_file.read_text(encoding="utf-8"))
        except (OSError, UnicodeDecodeError):
            # One unreadable or mis-encoded file must not abort the backfill.
            continue
        issue_id = fm.get("id")
        if not issue_id:
            continue
        status = str(fm.get("status", "open"))
        discovered_by = fm.get("discovered_by")
        # Prefer the latest lifecycle timestamp available; "" when none is set.
        ts = str(
            fm.get("completed_at") or fm.get("captured_at") or fm.get("discovered_date") or ""
        )
        conn.execute(
            "INSERT INTO issue_events(ts, issue_id, transition, discovered_by) VALUES(?, ?, ?, ?)",
            (ts, str(issue_id), status, str(discovered_by) if discovered_by else None),
        )
        _index(
            conn,
            content=f"{issue_id} {status} {fm.get('type', '')}",
            kind="issue",
            ref=str(issue_id),
            anchor=str(issue_file),
            ts=ts,
        )
        count += 1
    return count

375 

376 

def _backfill_loops(conn: sqlite3.Connection, loops_dir: Path) -> int:
    """Seed ``loop_events`` from FSM state JSON under ``.running`` + ``.history``.

    Each ``*.json`` file in ``<loops_dir>/.running`` and
    ``<loops_dir>/.history`` (missing directories are skipped) contributes one
    ``loop_events`` row with transition ``"backfill"`` plus a search-index
    entry. Files that cannot be read, are not valid UTF-8, are not valid JSON,
    or do not hold a JSON object are skipped. Nothing is committed here.

    Returns the number of loop rows inserted.
    """
    count = 0
    for sub in (".running", ".history"):
        directory = loops_dir / sub
        if not directory.is_dir():
            continue
        for state_file in sorted(directory.glob("*.json")):
            try:
                data = json.loads(state_file.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # ValueError covers both json.JSONDecodeError and
                # UnicodeDecodeError, so one corrupt or mis-encoded state file
                # cannot abort the whole backfill.
                continue
            if not isinstance(data, dict):
                continue
            # Fall back to the file name when the state carries no loop name.
            loop_name = str(data.get("loop_name") or state_file.stem)
            state = data.get("current_state") or data.get("state")
            ts = str(data.get("updated_at") or data.get("started_at") or "")
            conn.execute(
                "INSERT INTO loop_events(ts, loop_name, state, transition, retries) "
                "VALUES(?, ?, ?, ?, ?)",
                (ts, loop_name, str(state) if state else None, "backfill", None),
            )
            _index(
                conn,
                content=f"{loop_name} {state or ''}",
                kind="loop",
                ref=loop_name,
                anchor=str(state_file),
                ts=ts,
            )
            count += 1
    return count

409 

410 

def _backfill_tool_events(conn: sqlite3.Connection, jsonl_files: list[Path]) -> int:
    """Seed ``tool_events`` from assistant tool-use blocks in session JSONL files.

    Each file is read line by line. A line contributes rows only when it is a
    JSON object with ``type == "assistant"`` whose ``message.content`` is a
    list; every ``tool_use`` dict in that list becomes one ``tool_events`` row
    (with a hash of its ``input``) plus a search-index entry. Files that
    cannot be opened, blank lines, malformed JSON, and records of any other
    shape are skipped. Nothing is committed here.

    Returns the number of tool rows inserted.
    """
    count = 0
    for jsonl_file in jsonl_files:
        try:
            # errors="replace" keeps a stray undecodable byte from raising
            # UnicodeDecodeError mid-file and aborting the whole backfill.
            handle = jsonl_file.open(encoding="utf-8", errors="replace")
        except OSError:
            continue
        with handle:
            for line in handle:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A valid JSON line need not be an object (could be a list,
                # string, number, null); only objects have .get().
                if not isinstance(record, dict) or record.get("type") != "assistant":
                    continue
                message = record.get("message")
                content = message.get("content") if isinstance(message, dict) else None
                if not isinstance(content, list):
                    continue
                session_id = record.get("sessionId")
                ts = str(record.get("timestamp") or "")
                for block in content:
                    if not isinstance(block, dict) or block.get("type") != "tool_use":
                        continue
                    tool_name = str(block.get("name", ""))
                    args = block.get("input", {})
                    conn.execute(
                        "INSERT INTO tool_events(ts, session_id, tool_name, args_hash, "
                        "result_size, bytes_in, bytes_out, cache_hit) "
                        "VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
                        (ts, session_id, tool_name, _hash_args(args), None, None, None, None),
                    )
                    _index(
                        conn,
                        content=tool_name,
                        kind="tool",
                        ref=tool_name,
                        anchor=str(jsonl_file),
                        ts=ts,
                    )
                    count += 1
    return count

456 

457 

def backfill(
    db: Path | str = DEFAULT_DB_PATH,
    *,
    issues_dir: Path | None = None,
    loops_dir: Path | None = None,
    jsonl_files: list[Path] | None = None,
) -> dict[str, int]:
    """Seed the session database from data that already exists on disk.

    Three sources are consulted: issue-file frontmatter under *issues_dir*
    (default ``.issues``), FSM loop-state JSON under *loops_dir* (default
    ``.loops``), and tool-use blocks in the given session *jsonl_files* (only
    when a non-empty list is passed). A source directory that does not exist
    is skipped without error. All inserts are committed together at the end.

    Returns a dict with the keys ``issues``, ``loops`` and ``tools`` mapping
    to the number of rows inserted for each source.
    """
    if issues_dir is None:
        issues_dir = Path(".issues")
    if loops_dir is None:
        loops_dir = Path(".loops")
    conn = connect(db)
    inserted: dict[str, int] = dict.fromkeys(("issues", "loops", "tools"), 0)
    try:
        if issues_dir.is_dir():
            inserted["issues"] = _backfill_issues(conn, issues_dir)
        if loops_dir.is_dir():
            inserted["loops"] = _backfill_loops(conn, loops_dir)
        if jsonl_files:
            inserted["tools"] = _backfill_tool_events(conn, jsonl_files)
        conn.commit()
    finally:
        # Closing without a commit discards whatever a failed run inserted.
        conn.close()
    return inserted