Coverage for src \ truenex_memory \ ingestion \ parsers \ jsonl_sessions.py: 80%
275 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-19 10:21 +0200
1"""Parser for Codex/Claude-style agent JSONL session logs.
3Handles source_type=agent_session. Walks a directory for .jsonl files,
4extracts a human-readable digest per file:
6- Session metadata (model, timestamps)
7- User requests
8- Assistant final text responses
9- Compaction / summary entries
11Raw tool dumps, system/developer instructions, and intermediate API
12payloads are intentionally excluded to keep the index lean and private-safe.
13"""
15from __future__ import annotations
17from datetime import datetime, timezone
18from pathlib import Path
19import json
21from truenex_memory.ingestion.manifest import IngestionRecord
22from truenex_memory.ingestion.parsers import register
# File suffixes (compared lower-cased) treated as session logs when walking
# a directory.
SUPPORTED_SESSION_EXTENSIONS: set[str] = {".jsonl"}
# Character budget for the user + assistant text of one exchange chunk.
MAX_EXCHANGE_CHARS: int = 600
@register("agent_session")
def parse_agent_sessions(
    source_dir: Path,
    project: str,
    source_tool: str,
    privacy_scope: str,
) -> list[IngestionRecord]:
    """Collect exchange records from every JSONL session log under *source_dir*.

    *source_dir* may point at a single file, which is parsed whatever its
    extension, or at a directory. A directory is searched recursively for
    files whose lower-cased suffix is in ``SUPPORTED_SESSION_EXTENSIONS``,
    and those files are parsed in sorted path order. A path that does not
    exist yields an empty list.

    Returns:
        The concatenated records produced by ``_parse_one_session`` for each
        file.
    """
    root = source_dir.resolve()
    if not root.exists():
        return []

    if root.is_file():
        session_files = [root]
    else:
        session_files = sorted(
            candidate
            for candidate in root.rglob("*")
            if candidate.is_file()
            and candidate.suffix.lower() in SUPPORTED_SESSION_EXTENSIONS
        )

    collected: list[IngestionRecord] = []
    for session_file in session_files:
        collected.extend(
            _parse_one_session(session_file, project, source_tool, privacy_scope, root)
        )
    return collected
def _parse_one_session(
    file_path: Path,
    project: str,
    source_tool: str,
    privacy_scope: str,
    base_dir: Path,
) -> list[IngestionRecord]:
    """Parse one JSONL session file into exchange and compaction records.

    Args:
        file_path: The session log to read.
        project: Project name stamped on every produced record.
        source_tool: Tool identifier stamped on every produced record.
        privacy_scope: Privacy scope stamped on every produced record.
        base_dir: The resolved path the directory walk started from.
            Not used by this function; kept for interface compatibility.

    Returns:
        One record per user/assistant exchange, followed by one record
        holding all compaction summaries when any were found. Returns an
        empty list when the file cannot be read, contains no JSON-object
        lines, or yields neither exchanges nor compactions.
    """
    mtime = _file_mtime_iso(file_path)
    try:
        # "utf-8-sig" drops a leading BOM. json.loads rejects a BOM-prefixed
        # string, which would silently lose the first (often metadata) line.
        raw = file_path.read_text(encoding="utf-8-sig", errors="replace")
    except OSError:
        return []

    lines: list[dict[str, object]] = []
    # JSONL records are separated by "\n" only. str.splitlines() also breaks
    # on U+2028/U+2029/U+0085 etc., which JSON allows unescaped inside string
    # values, so it would cut such records in half and both halves would be
    # dropped as malformed. strip() below still removes a trailing "\r".
    for line in raw.split("\n"):
        stripped = line.strip()
        if not stripped:
            continue
        try:
            obj = json.loads(stripped)
        except json.JSONDecodeError:
            # Best-effort: skip malformed or truncated lines.
            continue
        if isinstance(obj, dict):
            lines.append(obj)

    if not lines:
        return []

    session_id = _extract_session_id(lines, file_path.stem)
    created_at = _extract_created_at(lines) or mtime

    exchanges = _build_exchanges(lines)
    compactions = _extract_compactions(lines)
    if not exchanges and not compactions:
        return []

    source_path = str(file_path.resolve())
    line_count = len(lines)

    def make_record(text: str, metadata: dict[str, object]) -> IngestionRecord:
        # Every record from this file shares the same provenance fields.
        return IngestionRecord(
            project=project,
            source_type="agent_session",
            source_path=source_path,
            source_tool=source_tool,
            text=text,
            session_id=session_id,
            created_at=created_at,
            last_modified=mtime,
            privacy_scope=privacy_scope,
            metadata=metadata,
        )

    records = [
        make_record(text, {"session_line_count": line_count, "exchange_index": idx})
        for idx, text in enumerate(exchanges)
    ]

    if compactions:
        digest = "## Compaction Summaries\n\n" + "\n\n".join(compactions)
        records.append(
            make_record(
                digest,
                {
                    "session_line_count": line_count,
                    "exchange_index": len(exchanges),
                    "is_compaction": True,
                },
            )
        )

    return records
def _build_exchanges(lines: list[dict[str, object]]) -> list[str]:
    """Build per-exchange text chunks from session lines.

    Groups one user query with all subsequent assistant text until the next
    user message that is not noise (per ``_is_noise_user_text``). Each chunk
    is rendered as ``"[User]: <query>\\n[Assistant]: <reply>"``. The message
    text is capped at ``MAX_EXCHANGE_CHARS`` characters: the user query is
    truncated first. The assistant reply gets only the remaining budget, and
    is omitted entirely unless more than 20 characters remain. Assistant text
    that appears before the first user query is discarded.

    A message repeated back-to-back (same role, same text) is kept once. A
    turn logged twice therefore does not produce a spurious user-only
    exchange or a doubled reply.

    Returns:
        Exchange strings in file order.
    """
    exchanges: list[str] = []
    current_user = ""
    assistant_parts: list[str] = []

    def emit(user_text: str, parts: list[str]) -> None:
        # Render one exchange; without a user query there is nothing to anchor it.
        if not user_text:
            return
        user_part = user_text[:MAX_EXCHANGE_CHARS]
        remaining = MAX_EXCHANGE_CHARS - len(user_part)
        assistant_text = " ".join(parts).strip()
        if assistant_text and remaining > 20:
            exchanges.append(
                f"[User]: {user_part}\n[Assistant]: {assistant_text[:remaining]}"
            )
        else:
            exchanges.append(f"[User]: {user_part}")

    for obj in _iter_message_objects(lines):
        role = _resolve_role(obj)
        if role == "user":
            text = _extract_text(obj)
            if not text or _is_noise_user_text(text):
                continue
            text = text.strip()
            # The same user message logged twice in a row, with no reply in
            # between, is one query. _iter_message_objects accepts both
            # ``response_item`` and ``event_msg`` forms, so a turn may be seen
            # twice. NOTE(review): confirm against real rollout files.
            if text == current_user and not assistant_parts:
                continue
            emit(current_user, assistant_parts)
            current_user = text
            assistant_parts = []
        elif role == "assistant":
            text = _extract_text(obj)
            if not text:
                continue
            text = text.strip()
            # Same rationale as above: drop an immediate repeat of the reply.
            if assistant_parts and assistant_parts[-1] == text:
                continue
            assistant_parts.append(text)

    emit(current_user, assistant_parts)
    return exchanges
def _extract_compactions(lines: list[dict[str, object]]) -> list[str]:
    """Extract compaction / summary entries from all known formats.

    Each line is checked against these shapes:

    * A ``payload`` dict where the line ``type`` or ``payload["type"]`` is
      ``compacted``/``compaction``/``summary``. Text comes from the first
      non-empty of ``payload["summary"]``, ``payload["content"]`` and
      ``payload["message"]``. If text is found the line is done; otherwise
      the remaining checks still run.
    * A line whose ``type`` is one of those values. Text comes from a nested
      ``message["content"]`` and, separately, from a top-level ``summary``
      string; both may be collected.
    * A ``turn_context`` line with a string ``payload["summary"]``, unless
      the value is only a reasoning-summary setting keyword.
    * A ``user`` line whose ``message["content"]`` (or its first text block)
      starts with "This session is being continued". This is the Claude Code
      compaction injection.

    Returns:
        Stripped summary texts in file order. Duplicates are not removed.
    """
    compaction_types = ("compacted", "compaction", "summary")
    # NOTE(review): in Codex rollouts ``turn_context.summary`` appears to hold
    # the reasoning-summary *setting* rather than conversation text; confirm.
    # Bare setting keywords are skipped so they never become fake summaries.
    summary_setting_values = {"auto", "concise", "detailed", "none"}
    summaries: list[str] = []

    for obj in lines:
        msg_type = str(obj.get("type", "")).lower()
        payload = obj.get("payload")

        # Payload-wrapped compaction entries.
        if isinstance(payload, dict):
            payload_type = str(payload.get("type", "")).lower()
            if msg_type in compaction_types or payload_type in compaction_types:
                # "message" is the last fallback. NOTE(review): Codex
                # ``compacted`` payloads appear to store their text there; confirm.
                text = ""
                for key in ("summary", "content", "message"):
                    text = _extract_text_from_content(payload.get(key))
                    if text:
                        break
                if text:
                    summaries.append(text)
                    continue

        # Top-level compaction / summary entries.
        if msg_type in compaction_types:
            message = obj.get("message")
            if isinstance(message, dict):
                text = _extract_text_from_content(message.get("content"))
                if text:
                    summaries.append(text)
            summary = obj.get("summary")
            if isinstance(summary, str) and summary.strip():
                summaries.append(summary.strip())

        # Codex turn_context.summary, minus bare setting keywords.
        if msg_type == "turn_context" and isinstance(payload, dict):
            summary = payload.get("summary")
            if isinstance(summary, str):
                summary = summary.strip()
                if summary and summary.lower() not in summary_setting_values:
                    summaries.append(summary)

        # Claude Code compaction injection: a user message starting with
        # "This session is being continued from a previous conversation...".
        if msg_type == "user":
            message = obj.get("message", {})
            content: object = message.get("content", "") if isinstance(message, dict) else ""
            if isinstance(content, list):
                # Only the first text block is inspected.
                content = next(
                    (
                        block.get("text", "")
                        for block in content
                        if isinstance(block, dict) and block.get("type") == "text"
                    ),
                    "",
                )
            if isinstance(content, str) and content.strip().startswith(
                "This session is being continued"
            ):
                summaries.append(content.strip())

    return summaries
224def _iter_message_objects(lines: list[dict[str, object]]):
225 """Yield normalized message-like objects from known session schemas."""
226 for obj in lines:
227 msg_type = str(obj.get("type", "")).lower()
228 payload = obj.get("payload")
229 if msg_type == "response_item" and isinstance(payload, dict):
230 if str(payload.get("type", "")).lower() == "message":
231 yield payload
232 continue
233 if msg_type == "event_msg" and isinstance(payload, dict):
234 pt = str(payload.get("type", "")).lower()
235 if pt == "user_message":
236 text = payload.get("message")
237 if isinstance(text, str):
238 yield {"role": "user", "content": text}
239 elif pt == "agent_message":
240 # Codex agent text response
241 text = payload.get("message")
242 if isinstance(text, str):
243 yield {"role": "assistant", "content": text}
244 continue
245 yield obj
248def _resolve_role(obj: dict[str, object]) -> str:
249 """Determine the role/type of a session line."""
250 # Codex-style: type field at top level
251 msg_type = str(obj.get("type", "")).lower()
252 if msg_type in ("user", "assistant", "system"):
253 return msg_type
255 # Claude API style: role in nested message
256 message = obj.get("message")
257 if isinstance(message, dict):
258 role = str(message.get("role", "")).lower()
259 if role in ("user", "assistant", "system"):
260 return role
261 # Some formats have type inside message
262 inner_type = str(message.get("type", "")).lower()
263 if inner_type in ("user", "assistant", "system"):
264 return inner_type
266 # Direct role field (OpenAI-style flattened)
267 role = str(obj.get("role", "")).lower()
268 if role in ("user", "assistant", "system"):
269 return role
271 return ""
def _extract_text(obj: dict[str, object]) -> str:
    """Extract human-readable text from a session message, skipping tool calls.

    Lookup order; the first non-empty result wins:

    1. ``obj["message"]["content"]`` when ``message`` is a dict (nested
       message envelope);
    2. ``obj["content"]`` (flat message);
    3. a top-level ``text`` or ``message`` string, unless the line's
       ``type`` or ``role`` is ``system`` or ``developer``.

    Content strings and block lists are reduced by
    ``_extract_text_from_content``, which drops tool blocks.

    Returns:
        Stripped text, or ``""`` when nothing usable is found.
    """
    message = obj.get("message")
    if isinstance(message, dict):
        # _extract_text_from_content already returns stripped plain-string
        # content, so no separate string fallback is needed here.
        text = _extract_text_from_content(message.get("content"))
        if text:
            return text

    text = _extract_text_from_content(obj.get("content"))
    if text:
        return text

    # Never surface system/developer instructions from the top-level fallback.
    instruction_roles = ("system", "developer")
    if (
        str(obj.get("type", "")).lower() in instruction_roles
        or str(obj.get("role", "")).lower() in instruction_roles
    ):
        return ""

    for key in ("text", "message"):
        value = obj.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()
    return ""
307def _extract_text_from_content(content: object) -> str:
308 """Extract text from a content field that may be a string or list of blocks.
310 Skips tool_use / tool_call blocks - only returns human-readable text.
311 """
312 if isinstance(content, str):
313 return content.strip()
315 if isinstance(content, list):
316 text_parts: list[str] = []
317 for block in content:
318 if not isinstance(block, dict):
319 continue
320 block_type = str(block.get("type", "")).lower()
321 # Skip tool use/call blocks
322 if block_type in ("tool_use", "tool_call", "tool_result", "tool"):
323 continue
324 block_text = block.get("text")
325 if block_text is None:
326 block_text = block.get("content")
327 if isinstance(block_text, str) and block_text.strip():
328 text_parts.append(block_text.strip())
329 return "\n\n".join(text_parts)
331 return ""
334def _find_model(lines: list[dict[str, object]]) -> str:
335 """Try to find model info from session metadata."""
336 for obj in lines:
337 payload = obj.get("payload")
338 if isinstance(payload, dict):
339 model = payload.get("model")
340 if isinstance(model, str) and model.strip():
341 return model.strip()
342 if str(obj.get("type", "")).lower() != "system":
343 continue
344 message = obj.get("message")
345 if isinstance(message, dict):
346 model = message.get("model")
347 if isinstance(model, str) and model.strip():
348 return model.strip()
349 model = obj.get("model")
350 if isinstance(model, str) and model.strip():
351 return model.strip()
352 return ""
355def _extract_session_id(lines: list[dict[str, object]], fallback: str) -> str | None:
356 """Try to extract a session/conversation ID from metadata."""
357 for obj in lines:
358 payload = obj.get("payload")
359 if isinstance(payload, dict):
360 for key in ("session_id", "conversation_id", "id"):
361 value = payload.get(key)
362 if isinstance(value, str) and value.strip():
363 return value.strip()
364 for key in ("session_id", "conversation_id", "id"):
365 value = obj.get(key)
366 if isinstance(value, str) and value.strip():
367 return value.strip()
368 # Use filename stem as fallback identifier
369 if fallback:
370 return f"session:{fallback}"
371 return None
374def _extract_created_at(lines: list[dict[str, object]]) -> str | None:
375 """Try to extract a timestamp from session metadata."""
376 for obj in lines:
377 payload = obj.get("payload")
378 if isinstance(payload, dict):
379 for key in ("created_at", "timestamp", "created"):
380 value = payload.get(key)
381 if isinstance(value, str) and value.strip():
382 return value.strip()
383 ts = payload.get("timestamp")
384 if isinstance(ts, (int, float)):
385 iso = _numeric_timestamp_to_iso(ts)
386 if iso is not None:
387 return iso
388 for key in ("created_at", "timestamp", "created"):
389 value = obj.get(key)
390 if isinstance(value, str) and value.strip():
391 return value.strip()
392 # Some formats use numeric timestamps
393 ts = obj.get("timestamp")
394 if isinstance(ts, (int, float)):
395 iso = _numeric_timestamp_to_iso(ts)
396 if iso is not None:
397 return iso
398 return None
401def _numeric_timestamp_to_iso(value: int | float) -> str | None:
402 """Convert seconds or milliseconds since epoch to ISO, ignoring bad values."""
403 ts = float(value)
404 if abs(ts) >= 1_000_000_000_000:
405 ts = ts / 1000.0
406 try:
407 return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
408 except (OSError, OverflowError, ValueError):
409 return None
412def _file_mtime_iso(path: Path) -> str:
413 try:
414 stat = path.stat()
415 return datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat()
416 except OSError:
417 return datetime.now(timezone.utc).isoformat()
420_NOISE_PREFIXES = (
421 "<environment_context>",
422 "<turn_aborted>",
423 "<INSTRUCTIONS>",
424 "<instructions>",
425 "<system>",
426 "<tool_result>",
427 "This session is being continued", # Claude Code compaction injection — handled separately
428)
429_MIN_EXCHANGE_TEXT_LEN = 3
432def _is_noise_user_text(text: str) -> bool:
433 stripped = text.strip()
434 if len(stripped) < _MIN_EXCHANGE_TEXT_LEN:
435 return True
436 return any(stripped.startswith(prefix) for prefix in _NOISE_PREFIXES)