Coverage for src \ truenex_memory \ ingestion \ parsers \ jsonl_sessions.py: 80%

275 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-19 10:21 +0200

"""Parser for Codex/Claude-style agent JSONL session logs.

Handles source_type=agent_session. Accepts a single file, or walks a
directory recursively for .jsonl files, and emits ingestion records per file:

- One record per exchange: a user request plus the assistant's text
  responses up to the next user request, truncated to MAX_EXCHANGE_CHARS.
- One extra record collecting compaction / summary entries, when present.

The session ID and creation timestamp are taken from the log's metadata when
available, falling back to the file name and the file modification time.

Raw tool dumps, system/developer instructions, and intermediate API
payloads are intentionally excluded to keep the index lean and private-safe.
"""

14 

15from __future__ import annotations 

16 

17from datetime import datetime, timezone 

18from pathlib import Path 

19import json 

20 

21from truenex_memory.ingestion.manifest import IngestionRecord 

22from truenex_memory.ingestion.parsers import register 

23 

# File suffixes (compared lower-cased) treated as session logs when walking a
# directory. A frozenset so the module-level constant cannot be mutated.
SUPPORTED_SESSION_EXTENSIONS = frozenset({".jsonl"})
# Character budget shared by the user and assistant text of one exchange record.
MAX_EXCHANGE_CHARS = 600

26 

27 

@register("agent_session")
def parse_agent_sessions(
    source_dir: Path,
    project: str,
    source_tool: str,
    privacy_scope: str,
) -> list[IngestionRecord]:
    """Collect exchange records from every JSONL session log under ``source_dir``.

    ``source_dir`` may name a single file, which is parsed whatever its
    extension. It may also name a directory, which is searched recursively
    for files whose lower-cased suffix is in ``SUPPORTED_SESSION_EXTENSIONS``.
    Files are processed in sorted path order. A path that does not exist
    yields an empty list.
    """
    root = source_dir.resolve()
    if not root.exists():
        return []

    if root.is_file():
        session_files = [root]
    else:
        session_files = sorted(
            path
            for path in root.rglob("*")
            if path.is_file() and path.suffix.lower() in SUPPORTED_SESSION_EXTENSIONS
        )

    collected: list[IngestionRecord] = []
    for session_file in session_files:
        collected.extend(
            _parse_one_session(session_file, project, source_tool, privacy_scope, root)
        )
    return collected

53 

54 

def _parse_one_session(
    file_path: Path,
    project: str,
    source_tool: str,
    privacy_scope: str,
    base_dir: Path,
) -> list[IngestionRecord]:
    """Turn one JSONL session log into ingestion records.

    Emits one record per user/assistant exchange (from ``_build_exchanges``).
    When compaction summaries exist, it also emits one trailing record that
    joins them under a "## Compaction Summaries" heading. Lines that are
    blank, not valid JSON, or not JSON objects are skipped.

    Returns an empty list when the file cannot be read, has no JSON-object
    lines, or yields neither exchanges nor compactions.

    ``base_dir`` is accepted for interface compatibility and is not used.
    """
    mtime = _file_mtime_iso(file_path)
    try:
        raw = file_path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return []

    # JSONL records are separated by "\n" only. str.splitlines() would also
    # break on U+2028, U+2029 and U+0085, which JSON permits unescaped inside
    # string values. Such records would be cut apart, fail to decode, and be
    # silently dropped. strip() below removes the "\r" left by CRLF files.
    lines: list[dict[str, object]] = []
    for line in raw.split("\n"):
        stripped = line.strip()
        if not stripped:
            continue
        try:
            obj = json.loads(stripped)
        except (json.JSONDecodeError, RecursionError):
            # RecursionError: pathologically nested JSON; skip it like any
            # other undecodable line instead of aborting the whole file.
            continue
        if isinstance(obj, dict):
            lines.append(obj)

    if not lines:
        return []

    session_id = _extract_session_id(lines, file_path.stem)
    created_at = _extract_created_at(lines) or mtime

    exchanges = _build_exchanges(lines)
    compactions = _extract_compactions(lines)
    if not exchanges and not compactions:
        return []

    source_path = str(file_path.resolve())
    line_count = len(lines)

    def make_record(text: str, metadata: dict[str, object]) -> IngestionRecord:
        """Build a record carrying the fields shared by this file's records."""
        return IngestionRecord(
            project=project,
            source_type="agent_session",
            source_path=source_path,
            source_tool=source_tool,
            text=text,
            session_id=session_id,
            created_at=created_at,
            last_modified=mtime,
            privacy_scope=privacy_scope,
            metadata=metadata,
        )

    records = [
        make_record(
            exchange_text,
            {"session_line_count": line_count, "exchange_index": idx},
        )
        for idx, exchange_text in enumerate(exchanges)
    ]

    if compactions:
        digest = "## Compaction Summaries\n\n" + "\n\n".join(compactions)
        records.append(make_record(
            digest,
            {
                "session_line_count": line_count,
                "exchange_index": len(exchanges),
                "is_compaction": True,
            },
        ))

    return records

123 

124 

def _build_exchanges(lines: list[dict[str, object]]) -> list[str]:
    """Build per-exchange text chunks from session lines.

    An exchange is one meaningful user message plus the assistant text that
    follows it, up to the next meaningful user message. User messages that
    yield no text (e.g. tool-only) or are flagged by ``_is_noise_user_text``
    do not start a new exchange. Assistant text that appears before the first
    user message is discarded.

    Each chunk holds at most MAX_EXCHANGE_CHARS characters of message text
    (labels excluded). The user text is kept first, and assistant text only
    fills what remains, and only if more than 20 characters remain.

    Some logs appear to record the same message twice (e.g. a Codex
    ``response_item`` and its matching ``event_msg``). Two cases are therefore
    ignored rather than turned into an empty duplicate exchange or doubled
    assistant text:

    - a user message identical to the pending one, with no assistant text
      in between;
    - assistant text already collected for the current exchange.
    """
    exchanges: list[str] = []
    current_user = ""
    assistant_parts: list[str] = []

    def render(user_text: str, parts: list[str]) -> str:
        """Format one exchange within the MAX_EXCHANGE_CHARS budget."""
        user_part = user_text[:MAX_EXCHANGE_CHARS]
        remaining = MAX_EXCHANGE_CHARS - len(user_part)
        assistant_text = " ".join(parts).strip()
        # Attach assistant text only when more than 20 characters of budget remain.
        if assistant_text and remaining > 20:
            return f"[User]: {user_part}\n[Assistant]: {assistant_text[:remaining]}"
        return f"[User]: {user_part}"

    for obj in _iter_message_objects(lines):
        role = _resolve_role(obj)
        if role not in ("user", "assistant"):
            continue
        text = _extract_text(obj)
        if not text:
            continue

        if role == "assistant":
            part = text.strip()
            if part not in assistant_parts:
                assistant_parts.append(part)
            continue

        if _is_noise_user_text(text):
            continue
        user_text = text.strip()
        if user_text == current_user and not assistant_parts:
            # Same user message logged twice in a row: keep the pending one.
            continue
        if current_user:
            exchanges.append(render(current_user, assistant_parts))
        current_user = user_text
        assistant_parts = []

    if current_user:
        exchanges.append(render(current_user, assistant_parts))
    return exchanges

164 

165 

def _extract_compactions(lines: list[dict[str, object]]) -> list[str]:
    """Extract compaction / summary texts from all known session formats.

    Each line is checked for these entry kinds, in order:

    - A dict ``payload`` where the line ``type`` or the payload ``type`` is
      "compacted", "compaction" or "summary". Text comes from the payload's
      ``summary``, else ``content``, else ``message`` field. Such lines are
      not examined further.
    - A line of one of those types without a dict payload. Text comes from
      ``message.content`` and/or a top-level ``summary`` string.
    - A ``turn_context`` line whose ``payload.summary`` is a non-blank string
      that is not a reasoning-summary setting value.
    - A ``user`` line whose message text starts with "This session is being
      continued" (a Claude Code compaction injection).

    Returns the stripped texts in file order.
    """
    compaction_types = ("compacted", "compaction", "summary")
    # NOTE(review): in Codex rollouts ``turn_context.payload.summary`` appears
    # to carry the reasoning-summary *setting* rather than summary text. These
    # values are skipped so each turn does not add a one-word "summary".
    # Confirm against current Codex output.
    reasoning_summary_settings = ("auto", "concise", "detailed", "none")

    summaries: list[str] = []
    for obj in lines:
        msg_type = str(obj.get("type", "")).lower()
        payload = obj.get("payload")

        # Payload-carrying compaction entries
        if isinstance(payload, dict):
            payload_type = str(payload.get("type", "")).lower()
            if msg_type in compaction_types or payload_type in compaction_types:
                text = (
                    _extract_text_from_content(payload.get("summary"))
                    or _extract_text_from_content(payload.get("content"))
                    # NOTE(review): Codex ``compacted`` items appear to keep
                    # their text under "message"; confirm.
                    or _extract_text_from_content(payload.get("message"))
                )
                if text:
                    summaries.append(text)
                continue

        # Compaction / summary lines without a dict payload
        if msg_type in compaction_types:
            message = obj.get("message")
            if isinstance(message, dict):
                text = _extract_text_from_content(message.get("content"))
                if text:
                    summaries.append(text)
            summary = obj.get("summary")
            if isinstance(summary, str) and summary.strip():
                summaries.append(summary.strip())

        # Codex turn_context.summary (ignoring setting values, see above)
        if msg_type == "turn_context" and isinstance(payload, dict):
            summary = payload.get("summary")
            if isinstance(summary, str):
                summary = summary.strip()
                if summary and summary.lower() not in reasoning_summary_settings:
                    summaries.append(summary)

        # Claude Code compaction injection: user message starting with
        # "This session is being continued from a previous conversation..."
        if msg_type == "user":
            message = obj.get("message", {})
            content = ""
            if isinstance(message, dict):
                content = message.get("content", "")
            if isinstance(content, list):
                # Use the first text block only.
                for block in content:
                    if isinstance(block, dict) and block.get("type") == "text":
                        content = block.get("text", "")
                        break
            if isinstance(content, str) and content.strip().startswith(
                "This session is being continued"
            ):
                summaries.append(content.strip())

    return summaries

222 

223 

def _iter_message_objects(lines: list[dict[str, object]]):
    """Yield message-like dicts from session lines in any supported schema.

    - ``response_item`` lines with a dict payload yield that payload when its
      ``type`` is "message"; other response items are dropped.
    - ``event_msg`` lines with a dict payload of type ``user_message`` or
      ``agent_message`` yield ``{"role": "user" | "assistant", "content": text}``
      when the payload's ``message`` is a string. All other events are dropped.
    - Every other line (including those two types without a dict payload)
      is yielded unchanged.
    """
    event_roles = {"user_message": "user", "agent_message": "assistant"}
    for obj in lines:
        kind = str(obj.get("type", "")).lower()
        payload = obj.get("payload")
        if kind not in ("response_item", "event_msg") or not isinstance(payload, dict):
            yield obj
            continue

        payload_kind = str(payload.get("type", "")).lower()
        if kind == "response_item":
            if payload_kind == "message":
                yield payload
        else:
            role = event_roles.get(payload_kind)
            if role is None:
                continue
            text = payload.get("message")
            if isinstance(text, str):
                yield {"role": role, "content": text}

246 

247 

def _resolve_role(obj: dict[str, object]) -> str:
    """Return "user", "assistant" or "system" for a session line, else "".

    Candidate fields are tried in priority order, case-insensitively:

    1. the top-level ``type``;
    2. the nested ``message.role`` and then ``message.type``, when
       ``message`` is a dict;
    3. a flat top-level ``role``.
    """
    known_roles = ("user", "assistant", "system")

    candidates = [obj.get("type", "")]
    message = obj.get("message")
    if isinstance(message, dict):
        candidates.extend((message.get("role", ""), message.get("type", "")))
    candidates.append(obj.get("role", ""))

    for candidate in candidates:
        role = str(candidate).lower()
        if role in known_roles:
            return role
    return ""

272 

273 

def _extract_text(obj: dict[str, object]) -> str:
    """Extract human-readable text from a session line, skipping tool calls.

    Returns "" for system lines, i.e. when the top-level ``type`` or ``role``
    is "system" (case-insensitive). Otherwise returns the first non-empty
    result of:

    1. ``obj["message"]["content"]`` when ``message`` is a dict (the nested
       layout, e.g. Claude Code lines);
    2. ``obj["content"]`` (the flat layout, e.g. Codex payloads as yielded by
       ``_iter_message_objects``);
    3. a non-blank top-level ``text`` or ``message`` string.

    Content fields go through ``_extract_text_from_content``, so tool blocks
    are skipped and plain strings are stripped. Returns "" when nothing is
    found.
    """
    # Apply the system-instruction filter up front so it covers every lookup
    # below, not only the top-level text/message fallback.
    if (
        str(obj.get("type", "")).lower() == "system"
        or str(obj.get("role", "")).lower() == "system"
    ):
        return ""

    message = obj.get("message")
    if isinstance(message, dict):
        # Plain-string content is handled here too: _extract_text_from_content
        # returns it stripped, so no separate string check is needed.
        text = _extract_text_from_content(message.get("content"))
        if text:
            return text

    text = _extract_text_from_content(obj.get("content"))
    if text:
        return text

    for key in ("text", "message"):
        value = obj.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()

    return ""

305 

306 

def _extract_text_from_content(content: object) -> str:
    """Return the human-readable text held in a message ``content`` field.

    - A string is returned stripped.
    - A list is treated as content blocks. Non-dict items and tool blocks
      (type ``tool_use``, ``tool_call``, ``tool_result`` or ``tool``,
      case-insensitive) are skipped. Each remaining block contributes its
      ``text`` value, falling back to ``content`` only when ``text`` is
      missing or None. Non-blank string values are stripped and joined with
      a blank line between them.
    - Anything else yields "".
    """
    if isinstance(content, str):
        return content.strip()
    if not isinstance(content, list):
        return ""

    tool_block_types = ("tool_use", "tool_call", "tool_result", "tool")

    def candidate_values():
        for block in content:
            if not isinstance(block, dict):
                continue
            if str(block.get("type", "")).lower() in tool_block_types:
                continue
            value = block.get("text")
            yield block.get("content") if value is None else value

    return "\n\n".join(
        value.strip()
        for value in candidate_values()
        if isinstance(value, str) and value.strip()
    )

332 

333 

def _find_model(lines: list[dict[str, object]]) -> str:
    """Return the first model name found in the session lines, or "".

    For each line in order:

    - a dict ``payload`` is checked for ``model``;
    - lines whose top-level ``type`` is "system" (case-insensitive) are also
      checked for ``message.model`` and then a top-level ``model``.

    Only non-blank strings count, and they are returned stripped.
    """
    def clean(value: object) -> str:
        return value.strip() if isinstance(value, str) else ""

    for obj in lines:
        payload = obj.get("payload")
        if isinstance(payload, dict):
            name = clean(payload.get("model"))
            if name:
                return name

        if str(obj.get("type", "")).lower() == "system":
            message = obj.get("message")
            sources = [message.get("model")] if isinstance(message, dict) else []
            sources.append(obj.get("model"))
            for value in sources:
                name = clean(value)
                if name:
                    return name

    return ""

353 

354 

355def _extract_session_id(lines: list[dict[str, object]], fallback: str) -> str | None: 

356 """Try to extract a session/conversation ID from metadata.""" 

357 for obj in lines: 

358 payload = obj.get("payload") 

359 if isinstance(payload, dict): 

360 for key in ("session_id", "conversation_id", "id"): 

361 value = payload.get(key) 

362 if isinstance(value, str) and value.strip(): 

363 return value.strip() 

364 for key in ("session_id", "conversation_id", "id"): 

365 value = obj.get(key) 

366 if isinstance(value, str) and value.strip(): 

367 return value.strip() 

368 # Use filename stem as fallback identifier 

369 if fallback: 

370 return f"session:{fallback}" 

371 return None 

372 

373 

374def _extract_created_at(lines: list[dict[str, object]]) -> str | None: 

375 """Try to extract a timestamp from session metadata.""" 

376 for obj in lines: 

377 payload = obj.get("payload") 

378 if isinstance(payload, dict): 

379 for key in ("created_at", "timestamp", "created"): 

380 value = payload.get(key) 

381 if isinstance(value, str) and value.strip(): 

382 return value.strip() 

383 ts = payload.get("timestamp") 

384 if isinstance(ts, (int, float)): 

385 iso = _numeric_timestamp_to_iso(ts) 

386 if iso is not None: 

387 return iso 

388 for key in ("created_at", "timestamp", "created"): 

389 value = obj.get(key) 

390 if isinstance(value, str) and value.strip(): 

391 return value.strip() 

392 # Some formats use numeric timestamps 

393 ts = obj.get("timestamp") 

394 if isinstance(ts, (int, float)): 

395 iso = _numeric_timestamp_to_iso(ts) 

396 if iso is not None: 

397 return iso 

398 return None 

399 

400 

401def _numeric_timestamp_to_iso(value: int | float) -> str | None: 

402 """Convert seconds or milliseconds since epoch to ISO, ignoring bad values.""" 

403 ts = float(value) 

404 if abs(ts) >= 1_000_000_000_000: 

405 ts = ts / 1000.0 

406 try: 

407 return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() 

408 except (OSError, OverflowError, ValueError): 

409 return None 

410 

411 

def _file_mtime_iso(path: Path) -> str:
    """Return ``path``'s modification time as an ISO-8601 UTC string.

    If stat-ing the file (or converting its mtime) raises OSError, the
    current UTC time is returned instead.
    """
    try:
        moment = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
    except OSError:
        moment = datetime.now(timezone.utc)
    return moment.isoformat()

418 

419 

# User-message prefixes that mark injected context rather than something the
# user typed; _is_noise_user_text treats messages starting with any of them
# as noise, so they never start an exchange.
_NOISE_PREFIXES = (
    "<environment_context>",
    "<turn_aborted>",
    "<INSTRUCTIONS>",
    "<instructions>",
    # NOTE(review): Codex has wrapped injected AGENTS.md / user instructions
    # in this tag; confirm against current Codex rollout files.
    "<user_instructions>",
    "<system>",
    "<tool_result>",
    # Claude Code compaction injection; _extract_compactions collects it
    # separately instead of it becoming an exchange.
    "This session is being continued",
)
# User messages shorter than this many characters (after stripping) are noise.
_MIN_EXCHANGE_TEXT_LEN = 3

430 

431 

def _is_noise_user_text(text: str) -> bool:
    """Return True if a user message should not start an exchange.

    After stripping whitespace, a message is noise when it is shorter than
    ``_MIN_EXCHANGE_TEXT_LEN`` characters or starts with any entry of
    ``_NOISE_PREFIXES``.
    """
    candidate = text.strip()
    # str.startswith accepts a tuple and matches if any element is a prefix.
    return len(candidate) < _MIN_EXCHANGE_TEXT_LEN or candidate.startswith(_NOISE_PREFIXES)
436 return any(stripped.startswith(prefix) for prefix in _NOISE_PREFIXES)