heaven_base.memory.history
1from __future__ import annotations 2import sys 3import uuid 4import json 5import os 6import re 7from datetime import datetime 8from typing import List, Optional, Dict, Any, Union 9from langchain_core.messages import ( 10 BaseMessage, 11 SystemMessage, 12 HumanMessage, 13 AIMessage, 14 ToolMessage 15) 16from pydantic import BaseModel, Field 17from .base_piece import BasePiece 18from ..utils.name_utils import normalize_agent_name 19from ..utils.get_env_value import EnvConfigUtil 20 21 22# Stubs replacing google.adk/genai imports to avoid expensive litellm initialization. 23# ADKEvent/ADKSession are only used in isinstance() checks (always False for Anthropic) 24# and model_validate() calls behind if-guards that never fire without ADK data. 25# Dead imports removed: BaseSessionService, DatabaseSessionService, EventActions, adk_types. 26# See: Litellm_Heaven_Cpu_Analysis_Feb18 in CartON. 27# To restore ADK support, replace these stubs with the original google.adk imports. 28class ADKEvent(BaseModel): 29 """Stub for google.adk.events.event.Event — passes Pydantic schema generation 30 and isinstance checks. No real ADKEvents exist in Anthropic-only histories.""" 31 model_config = {"arbitrary_types_allowed": True} 32 33class ADKSession(BaseModel): 34 """Stub for google.adk.sessions.session.Session""" 35 model_config = {"arbitrary_types_allowed": True} 36 37class AgentStatus(BaseModel): 38 goal: Optional[str] = None 39 task_list: List[str] = Field(default_factory=list) 40 current_task: Optional[str] = None 41 completed: bool = False 42 extracted_content: Dict[str, str] = Field(default_factory=dict) 43 44 45 46class History(BasePiece): 47 """Single conversation with metadata, backed by an ADK Session.""" 48 # Legacy fallback messages 49 messages: List[Union[BaseMessage, ADKEvent]] 50 history_id: Optional[str] = None 51 metadata: Dict[str, Any] = {} 52 agent_status: Optional[AgentStatus] = None 53 json_md_path: Optional[str] = None 54 55 # The single source of truth for ADK-based histories 56 adk_session: Optional[ADKSession] = None 57 58 @property 59 def events(self) -> List[ADKEvent]: 60 """All ADK events in this history (or fallback to any stored `messages`).""" 61 if self.adk_session: 62 return self.adk_session.events 63 return [m for m in self.messages if isinstance(m, ADKEvent)] 64 65 def to_markdown(self) -> str: 66 """Convert to markdown with metadata and a human-readable dump of ADK events.""" 67 md_parts: List[str] = [] 68 md_parts.append("===[METADATA]===") 69 md_parts.append(f"datetime: {self.created_datetime.isoformat()}") 70 md_parts.append(f"history_id: {self.history_id}") 71 md_parts.append(f"json_md_path: {self.json_md_path}") 72 for k, v in self.metadata.items(): 73 md_parts.append(f"{k}: {v}") 74 md_parts.append("===[CONTENT]===\n") 75 76 # If we have an ADK session, render its events 77 for ev in self.events: 78 md_parts.append(f"--- [ADK Event] id={ev.id}, author={ev.author}, timestamp={datetime.fromtimestamp(ev.timestamp)}") 79 # Render any function calls/responses/text 80 for part in getattr(ev.content, "parts", []): 81 if getattr(part, "function_call", None): 82 fc = part.function_call 83 md_parts.append(f"*Function Call*: {fc.name}({fc.args})") 84 if getattr(part, "function_response", None): 85 fr = part.function_response 86 md_parts.append(f"*Function Response*: {fr.response}") 87 if getattr(part, "text", None): 88 md_parts.append(f"{part.text}") 89 md_parts.append("") # blank line between events 90 91 # Fallback: if no ADK session, render legacy BaseMessages 92 if not self.adk_session: 93 for msg in self.messages: 94 md_parts.append("===[MESSAGE]===") 95 if isinstance(msg, SystemMessage): 96 md_parts.append(f"**System**: {msg.content}") 97 elif isinstance(msg, HumanMessage): 98 md_parts.append(f"**Human**: {msg.content}") 99 elif isinstance(msg, AIMessage): 100 md_parts.append(f"**AI**: {msg.content}") 101 elif isinstance(msg, ToolMessage): 102 md_parts.append(f"**Tool** (id: {msg.tool_call_id})\n```{msg.content}```") 103 104 return "\n".join(md_parts) 105 106 def to_json(self) -> dict: 107 """Convert to JSON format.""" 108 def _make_jsonable(x): 109 if isinstance(x, dict): 110 return {k: _make_jsonable(v) for k, v in x.items()} 111 if isinstance(x, list): 112 return [_make_jsonable(v) for v in x] 113 if isinstance(x, set): 114 return [_make_jsonable(v) for v in x] 115 return x 116 117 msgs: list[dict[str, Any]] = [] 118 for msg in self.messages: 119 if isinstance(msg, ADKEvent): 120 raw_evt = msg.model_dump() 121 msgs.append({"adk_event": _make_jsonable(raw_evt)}) 122 else: 123 msgs.append({ 124 "type": type(msg).__name__, 125 "content": msg.content, 126 "tool_call_id": getattr(msg, "tool_call_id", None), 127 "additional_kwargs": getattr(msg, "additional_kwargs", None), 128 "tool_calls": getattr(msg, "tool_calls", None), 129 }) 130 131 out: dict[str, Any] = { 132 "history_id": self.history_id, 133 "created_datetime": self.created_datetime.isoformat(), 134 "metadata": self.metadata, 135 "messages": msgs, 136 "agent_status": self.agent_status.dict() if self.agent_status else None, 137 "json_md_path": self.json_md_path, 138 } 139 140 if self.adk_session: 141 raw_sess = self.adk_session.model_dump() 142 out["adk_session"] = _make_jsonable(raw_sess) 143 144 return out 145 146 147 @classmethod 148 def _load_history_file(cls, history_id: str) -> History: 149 """Load a saved history + ADK session from disk.""" 150 # Use HEAVEN_DATA_DIR for loading agent histories 151 base_path = os.path.join(EnvConfigUtil.get_heaven_data_dir(), "agents") 152 date_str = "_".join(history_id.split("_")[:3]) 153 154 for agent_dir in os.listdir(base_path): 155 fn = os.path.join(base_path, agent_dir, "memories", "histories", date_str, f"{history_id}.json") 156 if os.path.exists(fn): 157 with open(fn, "r") as f: 158 data = json.load(f) 159 hist = cls.from_json(data) 160 hist.json_md_path = os.path.dirname(fn) 161 # restore ADK session 162 if data.get("adk_session"): 163 hist.adk_session = ADKSession.model_validate(data["adk_session"]) 164 return hist 165 166 raise FileNotFoundError(f"No history file found for ID {history_id}") 167 168 @classmethod 169 def load_from_id(cls, history_id: str) -> History: 170 """Continue an existing history (or start fresh if None).""" 171 if history_id is None: 172 return cls(messages=[], history_id=str(uuid.uuid4())) 173 174 orig = cls._load_history_file(history_id) 175 # bump continuation suffix 176 if "_continued_" in history_id: 177 m = re.search(r"_continued_(\d+)$", history_id) 178 num = int(m.group(1)) + 1 if m else 1 179 new_id = history_id.split("_continued_")[0] + f"_continued_{num}" 180 else: 181 new_id = f"{history_id}_continued_1" 182 183 return History( 184 messages=orig.messages.copy(), 185 adk_session=orig.adk_session, 186 created_datetime=datetime.now(), 187 metadata=orig.metadata.copy(), 188 agent_status=orig.agent_status, 189 history_id=new_id, 190 json_md_path=orig.json_md_path, 191 ) 192 193 # def save(self, agent_name: str) -> str: 194 # """Persist both .json and .md to disk.""" 195 # base_dir = os.path.dirname(os.path.abspath(__file__)) 196 # base_path = os.path.join(os.path.dirname(base_dir), "agents") 197 # nm = normalize_agent_name(agent_name) 198 # now = datetime.now() 199 # date_dir = now.strftime("%Y_%m_%d") 200 # dt_str = now.strftime("%Y_%m_%d_%H_%M_%S") 201 # hist_id = f"{dt_str}_{nm}" 202 # path = os.path.join(base_path, nm, "memories", "histories", date_dir) 203 # os.makedirs(path, exist_ok=True) 204 205 # existing = [f for f in os.listdir(path) if f.startswith(hist_id)] 206 # if existing: 207 # nums = [int(f.split("_continued_")[1].split(".")[0]) for f in existing if "_continued_" in f] 208 # nxt = max(nums, default=0) + 1 209 # hist_id = f"{hist_id}_continued_{nxt}" 210 # self.history_id = hist_id 211 # json_fp = os.path.join(path, f"{hist_id}.json") 212 # md_fp = os.path.join(path, f"{hist_id}.md") 213 214 # with open(json_fp, "w") as f: 215 # json.dump(self.to_json(), f, indent=2) 216 # with open(md_fp, "w") as f: 217 # f.write(self.to_markdown()) 218 219 # return hist_id 220 def save(self, agent_name: str) -> str: 221 """Persist both .json and .md to disk, overwriting if history_id exists.""" 222 # Use HEAVEN_DATA_DIR for saving agent histories 223 base_path = os.path.join(EnvConfigUtil.get_heaven_data_dir(), "agents") 224 nm = normalize_agent_name(agent_name) 225 226 # If we already have a history_id, use it 227 if self.history_id: 228 # Extract date directory from existing history_id 229 date_str = "_".join(self.history_id.split("_")[:3]) 230 path = os.path.join(base_path, nm, "memories", "histories", date_str) 231 hist_id = self.history_id 232 else: 233 # Create new history_id for new conversation 234 now = datetime.now() 235 date_dir = now.strftime("%Y_%m_%d") 236 hist_id = f"{now.strftime('%Y_%m_%d_%H_%M_%S')}_{nm}" 237 path = os.path.join(base_path, nm, "memories", "histories", date_dir) 238 239 # Ensure directory exists 240 os.makedirs(path, exist_ok=True) 241 242 # Use existing or new history_id 243 self.history_id = hist_id 244 json_fp = os.path.join(path, f"{hist_id}.json") 245 md_fp = os.path.join(path, f"{hist_id}.md") 246 247 # Write files, overwriting existing ones 248 with open(json_fp, "w") as f: 249 json.dump(self.to_json(), f, indent=2) 250 with open(md_fp, "w") as f: 251 f.write(self.to_markdown()) 252 253 return hist_id 254 255 @classmethod 256 def from_json(cls, data: dict, project: str = None) -> History: 257 msgs: List[Union[BaseMessage, ADKEvent]] = [] 258 for m in data["messages"]: 259 if "adk_event" in m: 260 evt = ADKEvent.model_validate(m["adk_event"]) 261 msgs.append(evt) 262 else: 263 t = m["type"] 264 if t == "SystemMessage": 265 msgs.append(SystemMessage(content=m["content"])) 266 elif t == "HumanMessage": 267 msgs.append(HumanMessage(content=m["content"])) 268 elif t == "AIMessage": 269 msgs.append(AIMessage(content=m["content"], additional_kwargs=m.get("additional_kwargs", {}), tool_calls=m.get("tool_calls", None))) 270 elif t == "ToolMessage": 271 msgs.append(ToolMessage(content=m["content"], tool_call_id=m.get("tool_call_id"))) 272 273 status = AgentStatus(**data["agent_status"]) if data.get("agent_status") else None 274 275 hist = cls( 276 messages=msgs, 277 created_datetime=datetime.fromisoformat(data["created_datetime"]), 278 metadata=data.get("metadata", {}), 279 project=project, 280 agent_status=status, 281 history_id=data.get("history_id"), 282 json_md_path=data.get("json_md_path"), 283 ) 284 285 # rehydrate session if present 286 if "adk_session" in data: 287 hist.adk_session = ADKSession.model_validate(data["adk_session"]) 288 289 return hist 290 291 292 @classmethod 293 def from_markdown(cls, md_content: str, project: str = None) -> "History": 294 print("Parsing markdown:", md_content) # Debug 295 """Parse markdown back into History""" 296 # Split into metadata and content sections 297 parts = md_content.split("===[CONTENT]===") 298 if len(parts) == 2: 299 metadata_section = parts[0].split("===[METADATA]===")[1].strip() 300 content = parts[1].strip() 301 metadata_lines = metadata_section.split("\n") 302 else: 303 metadata_lines = [] 304 content = md_content 305 306 # Parse metadata 307 metadata = {} 308 datetime_val = None 309 history_id = None 310 for line in metadata_lines: 311 if ":" in line: 312 key, value = line.split(":", 1) 313 key = key.strip() 314 value = value.strip() 315 if key == "datetime": 316 datetime_val = datetime.fromisoformat(value) 317 elif key == "history_id": 318 history_id = value 319 elif key == "json_md_path": # Add this condition 320 json_md_path = value 321 322 else: 323 metadata[key] = value 324 325 # Parse messages by splitting on message separator 326 messages = [] 327 message_blocks = content.split("===[MESSAGE]===") 328 329 for block in message_blocks: 330 block = block.strip() 331 if not block: # Skip empty blocks 332 continue 333 334 lines = block.split("\n") 335 current_type = None 336 current_lines = [] 337 338 # Special handling for tool messages 339 if block.startswith("**Tool**"): 340 # Extract tool ID 341 header = lines[0] 342 tool_id = header.split("id:", 1)[1].split(")", 1)[0].strip() 343 344 # Extract content between backticks 345 content_lines = [] 346 in_code_block = False 347 for line in lines[1:]: # Skip header 348 if line.strip() == "```": 349 in_code_block = not in_code_block 350 continue 351 if in_code_block: 352 content_lines.append(line) 353 354 content = "\n".join(content_lines) 355 messages.append(ToolMessage(content=content, tool_call_id=tool_id)) 356 continue 357 358 # Normal message handling 359 for line in lines: 360 if line.startswith("**") and "**:" in line: 361 current_type = line.split("**:", 1)[0].strip("*").strip() 362 current_lines = [line.split("**:", 1)[1].strip()] 363 else: 364 current_lines.append(line) 365 366 # Process accumulated message at end of block 367 if current_lines and current_type: 368 msg_content = "\n".join(current_lines).strip() 369 if msg_content: # Only add if there's actual content 370 if current_type == "System": 371 messages.append(SystemMessage(content=msg_content)) 372 elif current_type == "Human": 373 messages.append(HumanMessage(content=msg_content)) 374 elif current_type == "AI": 375 messages.append(AIMessage(content=msg_content)) 376 elif current_type.startswith("Tool"): 377 # Extract tool_call_id if present 378 tool_id = None 379 if "(id:" in current_type: 380 tool_id = current_type.split("id:", 1)[1].split(")", 1)[0].strip() 381 382 # Process tool message more carefully 383 try: 384 print(f"Raw tool message content: {msg_content}") # Debug 385 sys.stdout.flush() 386 lines = [line for line in msg_content.split("\n") if line.strip()] # Remove empty lines 387 print(f"Non-empty lines: {lines}") # Debug 388 389 # Extract content between ``` markers 390 content_start = lines.index("```") + 1 391 content_end = lines.index("```", content_start) 392 tool_content = "\n".join(lines[content_start:content_end]) 393 394 print(f"Extracted tool content: {tool_content}") # Debug 395 messages.append(ToolMessage(content=tool_content, tool_call_id=tool_id)) 396 except Exception as e: 397 print(f"Error processing tool message: {e}") # Debug 398 399 # Process blocks complete 400 401 return cls( 402 messages=messages, 403 created_datetime=datetime_val or datetime.now(), 404 project=project, 405 metadata=metadata, 406 history_id=history_id, 407 json_md_path=json_md_path 408 ) 409 410 411 @classmethod 412 def from_adk_session(cls, session: ADKSession) -> "History": 413 """ 414 Wrap an ADKSession without mutating history.messages at all. 415 """ 416 hist = cls( 417 messages=[], # untouched 418 history_id=None, # will be set on save() 419 created_datetime=datetime.fromtimestamp(session.last_update_time), 420 metadata={}, 421 agent_status=None, 422 ) 423 hist.adk_session = session 424 return hist 425 426 427 428 def to_adk_session( 429 self, 430 app_name: str, 431 user_id: str, 432 ) -> ADKSession: 433 """ 434 Return the exact same ADKSession we stored—state and events intact. 435 """ 436 if not hasattr(self, "adk_session") or self.adk_session is None: 437 raise RuntimeError("No ADK session attached to this History") 438 return self.adk_session 439 440 # def _compute_iterations(self) -> Dict[str, List[ADKEvent]]: 441 # """ 442 # Split events into “iterations”: chunks between user turns. 443 # """ 444 # iters: Dict[str, List[ADKEvent]] = {} 445 # current: List[ADKEvent] = [] 446 # idx = 0 447 # for ev in self.events: 448 # if ev.author == "user": 449 # if current: 450 # iters[f"iteration_{idx}"] = current 451 # idx += 1 452 # current = [] 453 # else: 454 # current.append(ev) 455 # if current: 456 # iters[f"iteration_{idx}"] = current 457 # return iters 458 459 # @property 460 # def iterations(self) -> Dict[str, List[ADKEvent]]: 461 # return self._compute_iterations() 462 463 464 def to_uni_messages(self) -> List[Dict[str, Any]]: 465 """Convert dictionary messages to uni-api format: dictionary → LangChain → uni-api""" 466 import json 467 468 # Convert dictionary messages (stored in self.messages) to LangChain objects first 469 msgs_as_dicts = self.to_json()["messages"] 470 471 langchain_messages = [] 472 for m in msgs_as_dicts: 473 if "adk_event" in m: 474 continue # Skip ADK events 475 else: 476 t = m["type"] 477 if t == "SystemMessage": 478 langchain_messages.append(SystemMessage(content=m["content"])) 479 elif t == "HumanMessage": 480 langchain_messages.append(HumanMessage(content=m["content"])) 481 elif t == "AIMessage": 482 langchain_messages.append(AIMessage(content=m["content"], additional_kwargs=m.get("additional_kwargs", {}), tool_calls=m.get("tool_calls", None))) 483 elif t == "ToolMessage": 484 langchain_messages.append(ToolMessage(content=m["content"], tool_call_id=m.get("tool_call_id"))) 485 486 # Convert LangChain objects to uni-api format 487 uni_messages = [] 488 489 for msg in langchain_messages: 490 if isinstance(msg, SystemMessage): 491 uni_messages.append({"role": "system", "content": msg.content}) 492 493 elif isinstance(msg, HumanMessage): 494 uni_messages.append({"role": "user", "content": msg.content}) 495 496 elif isinstance(msg, AIMessage): 497 uni_msg = {"role": "assistant", "content": msg.content or ""} 498 499 # Preserve additional_kwargs 500 if msg.additional_kwargs: 501 uni_msg["additional_kwargs"] = msg.additional_kwargs 502 503 # Handle tool calls in additional_kwargs (OpenAI style) 504 if msg.additional_kwargs.get("tool_calls"): 505 uni_msg["tool_calls"] = msg.additional_kwargs["tool_calls"] 506 507 # Handle tool calls as direct attribute 508 elif hasattr(msg, 'tool_calls') and msg.tool_calls: 509 uni_msg["tool_calls"] = msg.tool_calls 510 511 # Handle Anthropic style tool_use in content 512 elif isinstance(msg.content, list): 513 tool_calls = [] 514 content_parts = [] 515 516 for item in msg.content: 517 if isinstance(item, dict) and item.get('type') == 'tool_use': 518 # Convert Anthropic format to OpenAI format 519 tool_calls.append({ 520 "id": item.get("id", ""), 521 "type": "function", 522 "function": { 523 "name": item.get("name", ""), 524 "arguments": json.dumps(item.get("input", {})) 525 } 526 }) 527 elif isinstance(item, dict) and item.get('type') == 'text': 528 content_parts.append(item.get('text', '')) 529 elif isinstance(item, str): 530 content_parts.append(item) 531 532 if tool_calls: 533 uni_msg["tool_calls"] = tool_calls 534 if content_parts: 535 uni_msg["content"] = ' '.join(content_parts) 536 537 uni_messages.append(uni_msg) 538 539 elif isinstance(msg, ToolMessage): 540 uni_messages.append({ 541 "role": "tool", 542 "tool_call_id": msg.tool_call_id, 543 "content": msg.content 544 }) 545 546 return uni_messages 547 548 @classmethod 549 def from_uni_messages( 550 cls, 551 uni_messages: List[Dict[str, Any]], 552 history_id: Optional[str] = None, 553 metadata: Optional[Dict[str, Any]] = None 554 ) -> "History": 555 """Create History from uni-api message format: uni-api → LangChain → dictionary""" 556 from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage 557 558 # First: uni-api → LangChain (original logic) 559 messages = [] 560 561 for uni_msg in uni_messages: 562 role = uni_msg["role"] 563 content = uni_msg.get("content", "") 564 565 if role == "system": 566 messages.append(SystemMessage(content=content)) 567 568 elif role == "user": 569 messages.append(HumanMessage(content=content)) 570 571 elif role == "assistant": 572 # Restore additional_kwargs from uni-api format 573 additional_kwargs = uni_msg.get("additional_kwargs", {}) 574 575 if uni_msg.get("tool_calls"): 576 # Transform OpenAI format to LangChain format 577 langchain_tool_calls = [] 578 for tc in uni_msg["tool_calls"]: 579 if "function" in tc: 580 # Convert OpenAI format to LangChain format 581 langchain_tool_calls.append({ 582 "id": tc["id"], 583 "name": tc["function"]["name"], 584 "args": json.loads(tc["function"]["arguments"]), 585 "type": "tool_call" 586 }) 587 else: 588 # Already in LangChain format 589 langchain_tool_calls.append(tc) 590 591 # Merge tool_calls into additional_kwargs 592 additional_kwargs["tool_calls"] = uni_msg["tool_calls"] 593 messages.append(AIMessage( 594 content=content, 595 additional_kwargs=additional_kwargs, 596 tool_calls=langchain_tool_calls # Use converted format 597 )) 598 else: 599 messages.append(AIMessage(content=content, additional_kwargs=additional_kwargs)) 600 601 elif role == "tool": 602 messages.append(ToolMessage( 603 content=content, 604 tool_call_id=uni_msg["tool_call_id"] 605 )) 606 607 # Convert LangChain messages to dictionary format for storage 608 message_dicts = [] 609 for msg in messages: 610 if isinstance(msg, SystemMessage): 611 message_dicts.append({ 612 "type": "SystemMessage", 613 "content": msg.content, 614 "additional_kwargs": getattr(msg, 'additional_kwargs', {}) 615 }) 616 elif isinstance(msg, HumanMessage): 617 message_dicts.append({ 618 "type": "HumanMessage", 619 "content": msg.content, 620 "additional_kwargs": getattr(msg, 'additional_kwargs', {}) 621 }) 622 elif isinstance(msg, AIMessage): 623 message_dicts.append({ 624 "type": "AIMessage", 625 "content": msg.content, 626 "additional_kwargs": getattr(msg, 'additional_kwargs', {}), 627 "tool_calls": getattr(msg, 'tool_calls', None) 628 }) 629 elif isinstance(msg, ToolMessage): 630 message_dicts.append({ 631 "type": "ToolMessage", 632 "content": msg.content, 633 "tool_call_id": getattr(msg, 'tool_call_id', None) 634 }) 635 636 # Create History using from_json to properly convert dictionaries to LangChain objects 637 json_data = { 638 "messages": message_dicts, 639 "history_id": history_id, 640 "metadata": metadata or {}, 641 "created_datetime": datetime.now().isoformat(), 642 "agent_status": None, 643 "json_md_path": None 644 } 645 return cls.from_json(json_data) 646 647 def save_with_uni_context(self, agent_name: str, uni_api_used: bool = False) -> str: 648 """ 649 Enhanced save method that adds uni-api context to metadata. 650 This is optional - the regular save() method will work fine too. 651 """ 652 if uni_api_used: 653 self.metadata["uni_api_used"] = True 654 self.metadata["provider_unified"] = True 655 656 return self.save(agent_name) 657 658 def get_last_n_uni_messages(self, n: int) -> List[Dict[str, Any]]: 659 """Get last N messages in uni-api format""" 660 uni_messages = self.to_uni_messages() 661 return uni_messages[-n:] if len(uni_messages) >= n else uni_messages 662 663 def append_uni_messages(self, uni_messages: List[Dict[str, Any]]): 664 """Append uni-api messages to history after converting to LangChain format""" 665 from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage 666 667 for uni_msg in uni_messages: 668 role = uni_msg["role"] 669 content = uni_msg.get("content", "") 670 671 if role == "system": 672 self.messages.append(SystemMessage(content=content)) 673 674 elif role == "user": 675 self.messages.append(HumanMessage(content=content)) 676 677 elif role == "assistant": 678 if uni_msg.get("tool_calls"): 679 self.messages.append(AIMessage( 680 content=content, 681 additional_kwargs={"tool_calls": uni_msg["tool_calls"]} 682 )) 683 else: 684 self.messages.append(AIMessage(content=content)) 685 686 elif role == "tool": 687 self.messages.append(ToolMessage( 688 content=content, 689 tool_call_id=uni_msg["tool_call_id"] 690 )) 691 692 693# def get_iteration_view( 694# history: History, 695# start: int, 696# end: int 697# ) -> Dict[str, Any]: 698# """ 699# Return a dict containing: 700# • history_id 701# • total_iterations 702# • view_range: {'start', 'end'} 703# • view: sub-dict iteration_i → List[ADKEvent] 704# """ 705# if start > end: 706# raise ValueError(f"start ({start}) must be <= end ({end})") 707# all_iters = history.iterations 708# total = len(all_iters) 709 710# view: Dict[str, List[ADKEvent]] = {} 711# for i in range(start, end + 1): 712# key = f"iteration_{i}" 713# if key not in all_iters: 714# raise KeyError(f"Missing iteration: {key}") 715# view[key] = all_iters[key] 716 717# return { 718# "history_id": history.history_id, 719# "total_iterations": total, 720# "view_range": {"start": start, "end": end}, 721# "view": view, 722# } 723 724 725# Should add utils like this 726# def get_last_tool_msg(history) -> Optional[str]: 727# """ 728# Scan history.messages to find the most recent tool use and return its ToolMessage.content. 729 730# Args: 731# history: The agent's history object with a .messages attribute. 732 733# Returns: 734# Optional[str]: The content of the last ToolMessage, or None if none found. 735# """ 736# for i, msg in enumerate(history.messages): 737# if isinstance(msg, AIMessage) and isinstance(msg.content, list): 738# for item in msg.content: 739# if isinstance(item, dict) and item.get('type') == 'tool_use': 740# # Next message should be the tool result 741# if i + 1 < len(history.messages) and isinstance(history.messages[i + 1], ToolMessage): 742# return history.messages[i + 1].content 743# return None 744 745 746 747 748# class History(BasePiece): 749# """Single conversation with metadata""" 750# # messages: List[BaseMessage] # old 751# messages: List[Union[BaseMessage, ADKEvent]] 752# history_id: Optional[str] = None 753# # Using created_datetime from BasePiece 754# metadata: Dict = {} 755# agent_status: Optional[AgentStatus] = None 756# json_md_path: Optional[str] = None 757# adk_session: Optional[ADKSession] = None 758 759# def to_markdown(self) -> str: 760# """Convert to markdown with metadata header""" 761# md_parts = [] 762 763# # Add metadata header with unique separator 764# md_parts.append("===[METADATA]===") 765# md_parts.append(f"datetime: {self.created_datetime.isoformat()}") 766# md_parts.append(f"history_id: {self.history_id}") 767# md_parts.append(f"json_md_path: {self.json_md_path}") 768# for k, v in self.metadata.items(): 769# md_parts.append(f"{k}: {v}") 770# md_parts.append("===[CONTENT]===\n") 771 772# # Add messages 773# for msg in self.messages: 774# # This needs work 775# if isinstance(msg, ADKEvent): 776# md_parts.append(f"**[ADK Event {msg.id}]** {msg.actions=}, …") # however you’d like to render it 777# else: 778# # Add a separator between messages 779# md_parts.append("===[MESSAGE]===") 780 781# if isinstance(msg, SystemMessage): 782# md_parts.append(f"**System**: {msg.content}") 783# elif isinstance(msg, HumanMessage): 784# md_parts.append(f"**Human**: {msg.content}") 785# elif isinstance(msg, AIMessage): 786# md_parts.append(f"**AI**: {msg.content}") 787# elif isinstance(msg, ToolMessage): 788# # Special format for tool messages to make parsing easier 789# tool_msg = [ 790# "**Tool** (id: {tool_id})".format(tool_id=msg.tool_call_id), 791# "```", 792# msg.content, 793# "```" 794# ] 795# md_parts.append("\n".join(tool_msg)) 796 797# return "\n\n".join(md_parts) 798 799# def to_json(self) -> dict: 800# """Convert to JSON format""" 801# msgs = [] 802# for msg in self.messages: 803# if isinstance(msg, ADKEvent): 804# # ADK Event knows how to JSON‐serialize itself 805# # dump to dict and convert any sets -> lists 806# raw = msg.model_dump() 807# def _make_jsonable(x): 808# if isinstance(x, dict): 809# return {k: _make_jsonable(v) for k,v in x.items()} 810# if isinstance(x, list): 811# return [_make_jsonable(v) for v in x] 812# if isinstance(x, set): 813# return [_make_jsonable(v) for v in x] 814# return x 815# clean = _make_jsonable(raw) 816# msgs.append({"adk_event": clean}) 817# else: 818# # existing BaseMessage branch 819# msgs.append({ 820# "type": type(msg).__name__, 821# "content": msg.content, 822# "tool_call_id": getattr(msg, "tool_call_id", None), 823# "additional_kwargs": getattr(msg, "additional_kwargs", None), 824# }) 825 826# return { 827# "history_id": self.history_id, 828# "created_datetime": self.created_datetime.isoformat(), 829# "metadata": self.metadata, 830# "messages": msgs, 831# "agent_status": self.agent_status.dict() if self.agent_status else None, 832# "json_md_path": self.json_md_path 833# } 834 835 836# @classmethod 837# def _load_history_file(cls, history_id: str) -> 'History': 838# """Load history file from disk""" 839# base_dir = os.path.dirname(os.path.abspath(__file__)) 840# base_path = os.path.join(os.path.dirname(base_dir), "agents") 841 842# if not os.path.exists(base_path): 843# raise FileNotFoundError(f"Agents directory not found at {base_path}") 844 845# # Get date from history_id for narrowing search 846# date_str = '_'.join(history_id.split('_')[:3]) 847 848# # Search for the history file in all agent directories 849# for agent_dir in os.listdir(base_path): 850# history_path = os.path.join(base_path, agent_dir, "memories", "histories", date_str, f"{history_id}.json") 851# print(f"Checking path: {history_path}") # Debug print 852# if os.path.exists(history_path): 853# print(f"Found history file at: {history_path}") # Debug print 854# with open(history_path, 'r') as f: 855# history_data = json.load(f) 856# history = cls.from_json(history_data) 857# history.json_md_path = os.path.dirname(history_path) 858# history.adk_session = ADKSession.model_validate(history_data["adk_session"]) 859 860# return history 861 862# raise FileNotFoundError(f"No history file found for ID {history_id} in any agent directory") 863 864# @classmethod 865# def load_from_id(cls, history_id: str) -> 'History': 866# """Load history from ID with proper continuation handling""" 867# if history_id is None: 868# # Brand new conversation 869# return cls(messages=[], history_id=str(uuid.uuid4())) 870 871# try: 872# # Load existing history 873# original = cls._load_history_file(history_id) 874 875# # Create continuation ID 876# if "_continued_" in history_id: 877# pattern = r"_continued_(\d+)$" 878# match = re.search(pattern, history_id) 879# if match: 880# current_num = int(match.group(1)) 881# base_id = history_id.split("_continued_")[0] 882# new_history_id = f"{base_id}_continued_{current_num + 1}" 883# else: 884# raise ValueError(f"Invalid continuation ID format: {history_id}") 885# else: 886# new_history_id = f"{history_id}_continued_1" 887# # Create a cleaned copy of agent_status 888# clean_agent_status = None 889# if original.agent_status: 890# # Create a copy by converting to dict and back 891# agent_status_dict = original.agent_status.dict() 892 893# # Clean any block reports from extracted_content 894# if 'extracted_content' in agent_status_dict and agent_status_dict['extracted_content']: 895# if 'block_report' in agent_status_dict['extracted_content']: 896# del agent_status_dict['extracted_content']['block_report'] 897 898# # Create new AgentStatus with cleaned data 899# clean_agent_status = AgentStatus(**agent_status_dict) 900 901# # Create new history with same messages but new ID 902# return cls( 903# messages=original.messages.copy(), 904# created_datetime=datetime.now(), 905# metadata=original.metadata.copy(), 906# history_id=new_history_id, 907# agent_status=clean_agent_status, 908# json_md_path=original.json_md_path 909# ) 910# except FileNotFoundError: 911# raise ValueError(f"No history found with ID {history_id}") 912 913# def save(self, agent_name: str): 914# # Get base path 915# base_dir = os.path.dirname(os.path.abspath(__file__)) 916# base_path = os.path.join(os.path.dirname(base_dir), "agents") 917 918# # Normalize agent name to match AgentMakerTool's format 919# normalized_agent_name = normalize_agent_name(agent_name) # Using our new function 920 921# # Generate history_id based on datetime and agent name 922# now = datetime.now() 923# date_str = now.strftime('%Y_%m_%d') # For directory 924# datetime_str = now.strftime('%Y_%m_%d_%H_%M_%S') # For file name 925# history_id = f"{datetime_str}_{normalized_agent_name}" 926 927# # Create path including date directory 928# path = os.path.join(base_path, normalized_agent_name, "memories", "histories", date_str) 929# self.json_md_path = path 930# os.makedirs(path, exist_ok=True) 931 932# # Look for existing files with same base name 933# existing_files = [f for f in os.listdir(path) if f.startswith(history_id)] 934# if existing_files: 935# # If files exist, this is a continuation 936# max_cont = 0 937# for f in existing_files: 938# if "_continued_" in f: 939# cont_num = int(f.split("_continued_")[1].split(".")[0]) 940# max_cont = max(max_cont, cont_num) 941# history_id = f"{history_id}_continued_{max_cont + 1}" 942# self.history_id = history_id 943# # Save both JSON and Markdown versions 944# json_filepath = os.path.join(path, f"{history_id}.json") 945# md_filepath = os.path.join(path, f"{history_id}.md") 946 947# # Save JSON 948# with open(json_filepath, 'w') as f: 949# json.dump(self.to_json(), f) 950 951# # Save Markdown 952# with open(md_filepath, 'w') as f: 953# f.write(self.to_markdown()) 954 955# return history_id 956 957# @classmethod 958# def from_json(cls, data: dict, project: str = None) -> "History": 959# """Create History from JSON""" 960# messages = [] 961# for msg in data["messages"]: 962# if msg["type"] == "SystemMessage": 963# messages.append(SystemMessage(content=msg["content"])) 964# elif msg["type"] == "HumanMessage": 965# messages.append(HumanMessage(content=msg["content"])) 966# elif msg["type"] == "AIMessage": 967# messages.append(AIMessage( 968# content=msg["content"], 969# additional_kwargs=msg["additional_kwargs"] 970# )) 971# elif msg["type"] == "ToolMessage": 972# messages.append(ToolMessage( 973# content=msg["content"], 974# tool_call_id=msg["tool_call_id"] 975# )) 976 977# # Handle status if it exists 978# agent_status = None 979# if data.get("agent_status"): 980# agent_status = AgentStatus(**data["agent_status"]) 981 982# return cls( 983# messages=messages, 984# created_datetime=datetime.fromisoformat(data["created_datetime"]), 985# metadata=data["metadata"], 986# project=project, 987# agent_status=agent_status, 988# history_id=data.get("history_id"), 989# json_md_path=data.get("json_md_path") 990# ) 991 992# @classmethod 993# def from_markdown(cls, md_content: str, project: str = None) -> "History": 994# print("Parsing markdown:", md_content) # Debug 995# """Parse markdown back into History""" 996# # Split into metadata and content sections 997# parts = md_content.split("===[CONTENT]===") 998# if len(parts) == 2: 999# metadata_section = parts[0].split("===[METADATA]===")[1].strip() 1000# content = parts[1].strip() 1001# metadata_lines = metadata_section.split("\n") 1002# else: 1003# metadata_lines = [] 1004# content = md_content 1005 1006# # Parse metadata 1007# metadata = {} 1008# datetime_val = None 1009# history_id = None 1010# for line in metadata_lines: 1011# if ":" in line: 1012# key, value = line.split(":", 1) 1013# key = key.strip() 1014# value = value.strip() 1015# if key == "datetime": 1016# datetime_val = datetime.fromisoformat(value) 1017# elif key == "history_id": 1018# history_id = value 1019# elif key == "json_md_path": # Add this condition 1020# json_md_path = value 1021 1022# else: 1023# metadata[key] = value 1024 1025# # Parse messages by splitting on message separator 1026# messages = [] 1027# message_blocks = content.split("===[MESSAGE]===") 1028 1029# for block in message_blocks: 1030# block = block.strip() 1031# if not block: # Skip empty blocks 1032# continue 1033 1034# lines = block.split("\n") 1035# current_type = None 1036# current_lines = [] 1037 1038# # Special handling for tool messages 1039# if block.startswith("**Tool**"): 1040# # Extract tool ID 1041# header = lines[0] 1042# tool_id = header.split("id:", 1)[1].split(")", 1)[0].strip() 1043 1044# # Extract content between backticks 1045# content_lines = [] 1046# in_code_block = False 1047# for line in lines[1:]: # Skip header 1048# if line.strip() == "```": 1049# in_code_block = not in_code_block 1050# continue 1051# if in_code_block: 1052# content_lines.append(line) 1053 1054# content = "\n".join(content_lines) 1055# messages.append(ToolMessage(content=content, tool_call_id=tool_id)) 1056# continue 1057 1058# # Normal message handling 1059# for line in lines: 1060# if line.startswith("**") and "**:" in line: 1061# current_type = line.split("**:", 1)[0].strip("*").strip() 1062# current_lines = [line.split("**:", 1)[1].strip()] 1063# else: 1064# current_lines.append(line) 1065 1066# # Process accumulated message at end of block 1067# if current_lines and current_type: 1068# msg_content = "\n".join(current_lines).strip() 1069# if msg_content: # Only add if there's actual content 1070# if current_type == "System": 1071# messages.append(SystemMessage(content=msg_content)) 1072# elif current_type == "Human": 1073# messages.append(HumanMessage(content=msg_content)) 1074# elif current_type == "AI": 1075# messages.append(AIMessage(content=msg_content)) 1076# elif current_type.startswith("Tool"): 1077# # Extract tool_call_id if present 1078# tool_id = None 1079# if "(id:" in current_type: 1080# tool_id = current_type.split("id:", 1)[1].split(")", 1)[0].strip() 1081 1082# # Process tool message more carefully 1083# try: 1084# print(f"Raw tool message content: {msg_content}") # Debug 1085# sys.stdout.flush() 1086# lines = [line for line in msg_content.split("\n") if line.strip()] # Remove empty lines 1087# print(f"Non-empty lines: {lines}") # Debug 1088 1089# # Extract content between ``` markers 1090# content_start = lines.index("```") + 1 1091# content_end = lines.index("```", content_start) 1092# tool_content = "\n".join(lines[content_start:content_end]) 1093 1094# print(f"Extracted tool content: {tool_content}") # Debug 1095# messages.append(ToolMessage(content=tool_content, tool_call_id=tool_id)) 1096# except Exception as e: 1097# print(f"Error processing tool message: {e}") # Debug 1098 1099# # Process blocks complete 1100 1101# return cls( 1102# messages=messages, 1103# created_datetime=datetime_val or datetime.now(), 1104# project=project, 1105# metadata=metadata, 1106# history_id=history_id, 1107# json_md_path=json_md_path 1108# ) 1109 1110# @classmethod 1111# def from_adk_session(cls, session: ADKSession) -> "History": 1112# return cls( 1113# messages=session.events, # now a list of ADKEvent 1114# history_id=session.id, 1115# created_datetime=datetime.fromtimestamp(session.last_update_time), 1116# metadata={}, # or deserialize session.state if you like 1117# # agent_status left None or reconstructed from state 1118# ) 1119 1120# def to_adk_session( 1121# self, 1122# app_name: str, 1123# user_id: str, 1124# ) -> ADKSession: 1125# return ADKSession( 1126# id=self.history_id or "", 1127# app_name=app_name, 1128# user_id=user_id, 1129# state={}, # serialize .agent_status or metadata if needed 1130# events=[m for m in self.messages if isinstance(m, ADKEvent)], 1131# last_update_time=self.created_datetime.timestamp(), 1132# ) 1133# # Old 1134# # def _compute_iterations(self) -> Dict[str, List[BaseMessage]]: 1135# # """ 1136# # Split self.messages into “iterations”: 1137# # each iteration is the sequence of messages *between* two human turns. 1138# # HumanMessages themselves are not included. 1139# # """ 1140# # iterations: Dict[str, List[BaseMessage]] = {} 1141# # current: List[BaseMessage] = [] 1142# # idx = 0 1143 1144# # for msg in self.messages: 1145# # if isinstance(msg, HumanMessage): 1146# # if current: 1147# # iterations[f"iteration_{idx}"] = current 1148# # idx += 1 1149# # current = [] 1150# # else: 1151# # current.append(msg) 1152 1153# # if current: 1154# # iterations[f"iteration_{idx}"] = current 1155 1156# # return iterations 1157 # unsure if this handles ADK correctly 1158 def _compute_iterations(self) -> Dict[str, List[Union[BaseMessage, ADKEvent]]]: 1159 """ 1160 Split self.messages into “iterations”: 1161 each iteration is the sequence of messages *between* two user turns. 1162 HumanMessage and ADKEvent(author="user") both start a new iteration. 1163 """ 1164 iterations: Dict[str, List[Union[BaseMessage, ADKEvent]]] = {} 1165 current: List[Union[BaseMessage, ADKEvent]] = [] 1166 idx = 0 1167 1168 for msg in self.messages: 1169 is_user = ( 1170 isinstance(msg, HumanMessage) 1171 or (isinstance(msg, ADKEvent) and getattr(msg, "author", None) == "user") 1172 ) 1173 if is_user: 1174 if current: 1175 iterations[f"iteration_{idx}"] = current 1176 idx += 1 1177 current = [] 1178 else: 1179 current.append(msg) 1180 1181 if current: 1182 iterations[f"iteration_{idx}"] = current 1183 1184 return iterations 1185 1186 @property 1187 def iterations(self) -> Dict[str, List[Union[BaseMessage, ADKEvent]]]: # changed return annotation 1188 """ 1189 Returns a dict of all “iterations” in this history, 1190 where each iteration is the sequence of non‑HumanMessage 1191 messages between two human turns. 1192 """ 1193 return self._compute_iterations() 1194 1195 1196 1197 1198def get_iteration_view( 1199 history: History, 1200 start: int, 1201 end: int 1202) -> Dict[str, Any]: 1203 """ 1204 Return a dict containing: 1205 • history_id – the ID of this history 1206 • total_iterations – how many iterations the history has 1207 • view_range – dict with 'start' and 'end' 1208 • view – sub‑dict of iteration_i → List[BaseMessage] 1209 Raises: 1210 • ValueError if start > end 1211 • KeyError if any iteration_i in that range is missing 1212 """ 1213 if start > end: 1214 raise ValueError(f"start ({start}) must be <= end ({end})") 1215 1216 all_iters = history.iterations 1217 total = len(all_iters) 1218 1219 # Build the view 1220 view: Dict[str, List[BaseMessage]] = {} 1221 for i in range(start, end + 1): 1222 key = f"iteration_{i}" 1223 if key not in all_iters: 1224 raise KeyError(f"Missing iteration: {key}") 1225 view[key] = all_iters[key] 1226 1227 return { 1228 "history_id": history.history_id, 1229 "total_iterations": total, 1230 "view_range": {"start": start, "end": end}, 1231 "view": view 1232 } 1233 1234# from google.genai import types 1235 # def to_contents(self) -> list[types.Content]: 1236 # contents: list[types.Content] = [] 1237 # for msg in self.messages: 1238 # if isinstance(msg, SystemMessage): 1239 # role = "system"; text = msg.content 1240 # elif isinstance(msg, HumanMessage): 1241 # role = "user"; text = msg.content 1242 # elif isinstance(msg, AIMessage): 1243 # role = "assistant" 1244 # text = ( 1245 # msg.content 1246 # if isinstance(msg.content, str) 1247 # else "".join( 1248 # b.get("text","") for b in msg.content if b.get("type")=="text" 1249 # ) 1250 # ) 1251 # elif isinstance(msg, ToolMessage): 1252 # role = "tool"; text = msg.content 1253 # else: 1254 # continue 1255 1256 # contents.append(types.Content( 1257 # role=role, 1258 # parts=[types.Part(text=text)] 1259 # )) 1260 # return contents 1261 1262 # def update_from_contents(self, contents: list[types.Content]): 1263 # new_msgs: list[BaseMessage] = [] 1264 # for c in contents: 1265 # text = "".join(p.text or "" for p in c.parts) 1266 # if c.role == "system": 1267 # new_msgs.append(SystemMessage(content=text)) 1268 # elif c.role == "user": 1269 # new_msgs.append(HumanMessage(content=text)) 1270 # elif c.role == "assistant": 1271 # new_msgs.append(AIMessage(content=text)) 1272 # elif c.role == "tool": 1273 # new_msgs.append(ToolMessage(content=text)) 1274 # self.messages = new_msgs
29class ADKEvent(BaseModel): 30 """Stub for google.adk.events.event.Event — passes Pydantic schema generation 31 and isinstance checks. No real ADKEvents exist in Anthropic-only histories.""" 32 model_config = {"arbitrary_types_allowed": True}
Stub for google.adk.events.event.Event — passes Pydantic schema generation and isinstance checks. No real ADKEvents exist in Anthropic-only histories.
34class ADKSession(BaseModel): 35 """Stub for google.adk.sessions.session.Session""" 36 model_config = {"arbitrary_types_allowed": True}
Stub for google.adk.sessions.session.Session
38class AgentStatus(BaseModel): 39 goal: Optional[str] = None 40 task_list: List[str] = Field(default_factory=list) 41 current_task: Optional[str] = None 42 completed: bool = False 43 extracted_content: Dict[str, str] = Field(default_factory=dict)
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
__class_vars__: The names of the class variables defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
47class History(BasePiece): 48 """Single conversation with metadata, backed by an ADK Session.""" 49 # Legacy fallback messages 50 messages: List[Union[BaseMessage, ADKEvent]] 51 history_id: Optional[str] = None 52 metadata: Dict[str, Any] = {} 53 agent_status: Optional[AgentStatus] = None 54 json_md_path: Optional[str] = None 55 56 # The single source of truth for ADK-based histories 57 adk_session: Optional[ADKSession] = None 58 59 @property 60 def events(self) -> List[ADKEvent]: 61 """All ADK events in this history (or fallback to any stored `messages`).""" 62 if self.adk_session: 63 return self.adk_session.events 64 return [m for m in self.messages if isinstance(m, ADKEvent)] 65 66 def to_markdown(self) -> str: 67 """Convert to markdown with metadata and a human-readable dump of ADK events.""" 68 md_parts: List[str] = [] 69 md_parts.append("===[METADATA]===") 70 md_parts.append(f"datetime: {self.created_datetime.isoformat()}") 71 md_parts.append(f"history_id: {self.history_id}") 72 md_parts.append(f"json_md_path: {self.json_md_path}") 73 for k, v in self.metadata.items(): 74 md_parts.append(f"{k}: {v}") 75 md_parts.append("===[CONTENT]===\n") 76 77 # If we have an ADK session, render its events 78 for ev in self.events: 79 md_parts.append(f"--- [ADK Event] id={ev.id}, author={ev.author}, timestamp={datetime.fromtimestamp(ev.timestamp)}") 80 # Render any function calls/responses/text 81 for part in getattr(ev.content, "parts", []): 82 if getattr(part, "function_call", None): 83 fc = part.function_call 84 md_parts.append(f"*Function Call*: {fc.name}({fc.args})") 85 if getattr(part, "function_response", None): 86 fr = part.function_response 87 md_parts.append(f"*Function Response*: {fr.response}") 88 if getattr(part, "text", None): 89 md_parts.append(f"{part.text}") 90 md_parts.append("") # blank line between events 91 92 # Fallback: if no ADK session, render legacy BaseMessages 93 if not self.adk_session: 94 for msg in self.messages: 95 md_parts.append("===[MESSAGE]===") 96 if isinstance(msg, SystemMessage): 97 md_parts.append(f"**System**: {msg.content}") 98 elif isinstance(msg, HumanMessage): 99 md_parts.append(f"**Human**: {msg.content}") 100 elif isinstance(msg, AIMessage): 101 md_parts.append(f"**AI**: {msg.content}") 102 elif isinstance(msg, ToolMessage): 103 md_parts.append(f"**Tool** (id: {msg.tool_call_id})\n```{msg.content}```") 104 105 return "\n".join(md_parts) 106 107 def to_json(self) -> dict: 108 """Convert to JSON format.""" 109 def _make_jsonable(x): 110 if isinstance(x, dict): 111 return {k: _make_jsonable(v) for k, v in x.items()} 112 if isinstance(x, list): 113 return [_make_jsonable(v) for v in x] 114 if isinstance(x, set): 115 return [_make_jsonable(v) for v in x] 116 return x 117 118 msgs: list[dict[str, Any]] = [] 119 for msg in self.messages: 120 if isinstance(msg, ADKEvent): 121 raw_evt = msg.model_dump() 122 msgs.append({"adk_event": _make_jsonable(raw_evt)}) 123 else: 124 msgs.append({ 125 "type": type(msg).__name__, 126 "content": msg.content, 127 "tool_call_id": getattr(msg, "tool_call_id", None), 128 "additional_kwargs": getattr(msg, "additional_kwargs", None), 129 "tool_calls": getattr(msg, "tool_calls", None), 130 }) 131 132 out: dict[str, Any] = { 133 "history_id": self.history_id, 134 "created_datetime": self.created_datetime.isoformat(), 135 "metadata": self.metadata, 136 "messages": msgs, 137 "agent_status": self.agent_status.dict() if self.agent_status else None, 138 "json_md_path": self.json_md_path, 139 } 140 141 if self.adk_session: 142 raw_sess = self.adk_session.model_dump() 143 out["adk_session"] = _make_jsonable(raw_sess) 144 145 return out 146 147 148 @classmethod 149 def _load_history_file(cls, history_id: str) -> History: 150 """Load a saved history + ADK session from disk.""" 151 # Use HEAVEN_DATA_DIR for loading agent histories 152 base_path = os.path.join(EnvConfigUtil.get_heaven_data_dir(), "agents") 153 date_str = "_".join(history_id.split("_")[:3]) 154 155 for agent_dir in os.listdir(base_path): 156 fn = os.path.join(base_path, agent_dir, "memories", "histories", date_str, f"{history_id}.json") 157 if os.path.exists(fn): 158 with open(fn, "r") as f: 159 data = json.load(f) 160 hist = cls.from_json(data) 161 hist.json_md_path = os.path.dirname(fn) 162 # restore ADK session 163 if data.get("adk_session"): 164 hist.adk_session = ADKSession.model_validate(data["adk_session"]) 165 return hist 166 167 raise FileNotFoundError(f"No history file found for ID {history_id}") 168 169 @classmethod 170 def load_from_id(cls, history_id: str) -> History: 171 """Continue an existing history (or start fresh if None).""" 172 if history_id is None: 173 return cls(messages=[], history_id=str(uuid.uuid4())) 174 175 orig = cls._load_history_file(history_id) 176 # bump continuation suffix 177 if "_continued_" in history_id: 178 m = re.search(r"_continued_(\d+)$", history_id) 179 num = int(m.group(1)) + 1 if m else 1 180 new_id = history_id.split("_continued_")[0] + f"_continued_{num}" 181 else: 182 new_id = f"{history_id}_continued_1" 183 184 return History( 185 messages=orig.messages.copy(), 186 adk_session=orig.adk_session, 187 created_datetime=datetime.now(), 188 metadata=orig.metadata.copy(), 189 agent_status=orig.agent_status, 190 history_id=new_id, 191 json_md_path=orig.json_md_path, 192 ) 193 194 # def save(self, agent_name: str) -> str: 195 # """Persist both .json and .md to disk.""" 196 # base_dir = os.path.dirname(os.path.abspath(__file__)) 197 # base_path = os.path.join(os.path.dirname(base_dir), "agents") 198 # nm = normalize_agent_name(agent_name) 199 # now = datetime.now() 200 # date_dir = now.strftime("%Y_%m_%d") 201 # dt_str = now.strftime("%Y_%m_%d_%H_%M_%S") 202 # hist_id = f"{dt_str}_{nm}" 203 # path = os.path.join(base_path, nm, "memories", "histories", date_dir) 204 # os.makedirs(path, exist_ok=True) 205 206 # existing = [f for f in os.listdir(path) if f.startswith(hist_id)] 207 # if existing: 208 # nums = [int(f.split("_continued_")[1].split(".")[0]) for f in existing if "_continued_" in f] 209 # nxt = max(nums, default=0) + 1 210 # hist_id = f"{hist_id}_continued_{nxt}" 211 # self.history_id = hist_id 212 # json_fp = os.path.join(path, f"{hist_id}.json") 213 # md_fp = os.path.join(path, f"{hist_id}.md") 214 215 # with open(json_fp, "w") as f: 216 # json.dump(self.to_json(), f, indent=2) 217 # with open(md_fp, "w") as f: 218 # f.write(self.to_markdown()) 219 220 # return hist_id 221 def save(self, agent_name: str) -> str: 222 """Persist both .json and .md to disk, overwriting if history_id exists.""" 223 # Use HEAVEN_DATA_DIR for saving agent histories 224 base_path = os.path.join(EnvConfigUtil.get_heaven_data_dir(), "agents") 225 nm = normalize_agent_name(agent_name) 226 227 # If we already have a history_id, use it 228 if self.history_id: 229 # Extract date directory from existing history_id 230 date_str = "_".join(self.history_id.split("_")[:3]) 231 path = os.path.join(base_path, nm, "memories", "histories", date_str) 232 hist_id = self.history_id 233 else: 234 # Create new history_id for new conversation 235 now = datetime.now() 236 date_dir = now.strftime("%Y_%m_%d") 237 hist_id = f"{now.strftime('%Y_%m_%d_%H_%M_%S')}_{nm}" 238 path = os.path.join(base_path, nm, "memories", "histories", date_dir) 239 240 # Ensure directory exists 241 os.makedirs(path, exist_ok=True) 242 243 # Use existing or new history_id 244 self.history_id = hist_id 245 json_fp = os.path.join(path, f"{hist_id}.json") 246 md_fp = os.path.join(path, f"{hist_id}.md") 247 248 # Write files, overwriting existing ones 249 with open(json_fp, "w") as f: 250 json.dump(self.to_json(), f, indent=2) 251 with open(md_fp, "w") as f: 252 f.write(self.to_markdown()) 253 254 return hist_id 255 256 @classmethod 257 def from_json(cls, data: dict, project: str = None) -> History: 258 msgs: List[Union[BaseMessage, ADKEvent]] = [] 259 for m in data["messages"]: 260 if "adk_event" in m: 261 evt = ADKEvent.model_validate(m["adk_event"]) 262 msgs.append(evt) 263 else: 264 t = m["type"] 265 if t == "SystemMessage": 266 msgs.append(SystemMessage(content=m["content"])) 267 elif t == "HumanMessage": 268 msgs.append(HumanMessage(content=m["content"])) 269 elif t == "AIMessage": 270 msgs.append(AIMessage(content=m["content"], additional_kwargs=m.get("additional_kwargs", {}), tool_calls=m.get("tool_calls", None))) 271 elif t == "ToolMessage": 272 msgs.append(ToolMessage(content=m["content"], tool_call_id=m.get("tool_call_id"))) 273 274 status = AgentStatus(**data["agent_status"]) if data.get("agent_status") else None 275 276 hist = cls( 277 messages=msgs, 278 created_datetime=datetime.fromisoformat(data["created_datetime"]), 279 metadata=data.get("metadata", {}), 280 project=project, 281 agent_status=status, 282 history_id=data.get("history_id"), 283 json_md_path=data.get("json_md_path"), 284 ) 285 286 # rehydrate session if present 287 if "adk_session" in data: 288 hist.adk_session = ADKSession.model_validate(data["adk_session"]) 289 290 return hist 291 292 293 @classmethod 294 def from_markdown(cls, md_content: str, project: str = None) -> "History": 295 print("Parsing markdown:", md_content) # Debug 296 """Parse markdown back into History""" 297 # Split into metadata and content sections 298 parts = md_content.split("===[CONTENT]===") 299 if len(parts) == 2: 300 metadata_section = parts[0].split("===[METADATA]===")[1].strip() 301 content = parts[1].strip() 302 metadata_lines = metadata_section.split("\n") 303 else: 304 metadata_lines = [] 305 content = md_content 306 307 # Parse metadata 308 metadata = {} 309 datetime_val = None 310 history_id = None 311 for line in metadata_lines: 312 if ":" in line: 313 key, value = line.split(":", 1) 314 key = key.strip() 315 value = value.strip() 316 if key == "datetime": 317 datetime_val = datetime.fromisoformat(value) 318 elif key == "history_id": 319 history_id = value 320 elif key == "json_md_path": # Add this condition 321 json_md_path = value 322 323 else: 324 metadata[key] = value 325 326 # Parse messages by splitting on message separator 327 messages = [] 328 message_blocks = content.split("===[MESSAGE]===") 329 330 for block in message_blocks: 331 block = block.strip() 332 if not block: # Skip empty blocks 333 continue 334 335 lines = block.split("\n") 336 current_type = None 337 current_lines = [] 338 339 # Special handling for tool messages 340 if block.startswith("**Tool**"): 341 # Extract tool ID 342 header = lines[0] 343 tool_id = header.split("id:", 1)[1].split(")", 1)[0].strip() 344 345 # Extract content between backticks 346 content_lines = [] 347 in_code_block = False 348 for line in lines[1:]: # Skip header 349 if line.strip() == "```": 350 in_code_block = not in_code_block 351 continue 352 if in_code_block: 353 content_lines.append(line) 354 355 content = "\n".join(content_lines) 356 messages.append(ToolMessage(content=content, tool_call_id=tool_id)) 357 continue 358 359 # Normal message handling 360 for line in lines: 361 if line.startswith("**") and "**:" in line: 362 current_type = line.split("**:", 1)[0].strip("*").strip() 363 current_lines = [line.split("**:", 1)[1].strip()] 364 else: 365 current_lines.append(line) 366 367 # Process accumulated message at end of block 368 if current_lines and current_type: 369 msg_content = "\n".join(current_lines).strip() 370 if msg_content: # Only add if there's actual content 371 if current_type == "System": 372 messages.append(SystemMessage(content=msg_content)) 373 elif current_type == "Human": 374 messages.append(HumanMessage(content=msg_content)) 375 elif current_type == "AI": 376 messages.append(AIMessage(content=msg_content)) 377 elif current_type.startswith("Tool"): 378 # Extract tool_call_id if present 379 tool_id = None 380 if "(id:" in current_type: 381 tool_id = current_type.split("id:", 1)[1].split(")", 1)[0].strip() 382 383 # Process tool message more carefully 384 try: 385 print(f"Raw tool message content: {msg_content}") # Debug 386 sys.stdout.flush() 387 lines = [line for line in msg_content.split("\n") if line.strip()] # Remove empty lines 388 print(f"Non-empty lines: {lines}") # Debug 389 390 # Extract content between ``` markers 391 content_start = lines.index("```") + 1 392 content_end = lines.index("```", content_start) 393 tool_content = "\n".join(lines[content_start:content_end]) 394 395 print(f"Extracted tool content: {tool_content}") # Debug 396 messages.append(ToolMessage(content=tool_content, tool_call_id=tool_id)) 397 except Exception as e: 398 print(f"Error processing tool message: {e}") # Debug 399 400 # Process blocks complete 401 402 return cls( 403 messages=messages, 404 created_datetime=datetime_val or datetime.now(), 405 project=project, 406 metadata=metadata, 407 history_id=history_id, 408 json_md_path=json_md_path 409 ) 410 411 412 @classmethod 413 def from_adk_session(cls, session: ADKSession) -> "History": 414 """ 415 Wrap an ADKSession without mutating history.messages at all. 416 """ 417 hist = cls( 418 messages=[], # untouched 419 history_id=None, # will be set on save() 420 created_datetime=datetime.fromtimestamp(session.last_update_time), 421 metadata={}, 422 agent_status=None, 423 ) 424 hist.adk_session = session 425 return hist 426 427 428 429 def to_adk_session( 430 self, 431 app_name: str, 432 user_id: str, 433 ) -> ADKSession: 434 """ 435 Return the exact same ADKSession we stored—state and events intact. 436 """ 437 if not hasattr(self, "adk_session") or self.adk_session is None: 438 raise RuntimeError("No ADK session attached to this History") 439 return self.adk_session 440 441 # def _compute_iterations(self) -> Dict[str, List[ADKEvent]]: 442 # """ 443 # Split events into “iterations”: chunks between user turns. 444 # """ 445 # iters: Dict[str, List[ADKEvent]] = {} 446 # current: List[ADKEvent] = [] 447 # idx = 0 448 # for ev in self.events: 449 # if ev.author == "user": 450 # if current: 451 # iters[f"iteration_{idx}"] = current 452 # idx += 1 453 # current = [] 454 # else: 455 # current.append(ev) 456 # if current: 457 # iters[f"iteration_{idx}"] = current 458 # return iters 459 460 # @property 461 # def iterations(self) -> Dict[str, List[ADKEvent]]: 462 # return self._compute_iterations() 463 464 465 def to_uni_messages(self) -> List[Dict[str, Any]]: 466 """Convert dictionary messages to uni-api format: dictionary → LangChain → uni-api""" 467 import json 468 469 # Convert dictionary messages (stored in self.messages) to LangChain objects first 470 msgs_as_dicts = self.to_json()["messages"] 471 472 langchain_messages = [] 473 for m in msgs_as_dicts: 474 if "adk_event" in m: 475 continue # Skip ADK events 476 else: 477 t = m["type"] 478 if t == "SystemMessage": 479 langchain_messages.append(SystemMessage(content=m["content"])) 480 elif t == "HumanMessage": 481 langchain_messages.append(HumanMessage(content=m["content"])) 482 elif t == "AIMessage": 483 langchain_messages.append(AIMessage(content=m["content"], additional_kwargs=m.get("additional_kwargs", {}), tool_calls=m.get("tool_calls", None))) 484 elif t == "ToolMessage": 485 langchain_messages.append(ToolMessage(content=m["content"], tool_call_id=m.get("tool_call_id"))) 486 487 # Convert LangChain objects to uni-api format 488 uni_messages = [] 489 490 for msg in langchain_messages: 491 if isinstance(msg, SystemMessage): 492 uni_messages.append({"role": "system", "content": msg.content}) 493 494 elif isinstance(msg, HumanMessage): 495 uni_messages.append({"role": "user", "content": msg.content}) 496 497 elif isinstance(msg, AIMessage): 498 uni_msg = {"role": "assistant", "content": msg.content or ""} 499 500 # Preserve additional_kwargs 501 if msg.additional_kwargs: 502 uni_msg["additional_kwargs"] = msg.additional_kwargs 503 504 # Handle tool calls in additional_kwargs (OpenAI style) 505 if msg.additional_kwargs.get("tool_calls"): 506 uni_msg["tool_calls"] = msg.additional_kwargs["tool_calls"] 507 508 # Handle tool calls as direct attribute 509 elif hasattr(msg, 'tool_calls') and msg.tool_calls: 510 uni_msg["tool_calls"] = msg.tool_calls 511 512 # Handle Anthropic style tool_use in content 513 elif isinstance(msg.content, list): 514 tool_calls = [] 515 content_parts = [] 516 517 for item in msg.content: 518 if isinstance(item, dict) and item.get('type') == 'tool_use': 519 # Convert Anthropic format to OpenAI format 520 tool_calls.append({ 521 "id": item.get("id", ""), 522 "type": "function", 523 "function": { 524 "name": item.get("name", ""), 525 "arguments": json.dumps(item.get("input", {})) 526 } 527 }) 528 elif isinstance(item, dict) and item.get('type') == 'text': 529 content_parts.append(item.get('text', '')) 530 elif isinstance(item, str): 531 content_parts.append(item) 532 533 if tool_calls: 534 uni_msg["tool_calls"] = tool_calls 535 if content_parts: 536 uni_msg["content"] = ' '.join(content_parts) 537 538 uni_messages.append(uni_msg) 539 540 elif isinstance(msg, ToolMessage): 541 uni_messages.append({ 542 "role": "tool", 543 "tool_call_id": msg.tool_call_id, 544 "content": msg.content 545 }) 546 547 return uni_messages 548 549 @classmethod 550 def from_uni_messages( 551 cls, 552 uni_messages: List[Dict[str, Any]], 553 history_id: Optional[str] = None, 554 metadata: Optional[Dict[str, Any]] = None 555 ) -> "History": 556 """Create History from uni-api message format: uni-api → LangChain → dictionary""" 557 from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage 558 559 # First: uni-api → LangChain (original logic) 560 messages = [] 561 562 for uni_msg in uni_messages: 563 role = uni_msg["role"] 564 content = uni_msg.get("content", "") 565 566 if role == "system": 567 messages.append(SystemMessage(content=content)) 568 569 elif role == "user": 570 messages.append(HumanMessage(content=content)) 571 572 elif role == "assistant": 573 # Restore additional_kwargs from uni-api format 574 additional_kwargs = uni_msg.get("additional_kwargs", {}) 575 576 if uni_msg.get("tool_calls"): 577 # Transform OpenAI format to LangChain format 578 langchain_tool_calls = [] 579 for tc in uni_msg["tool_calls"]: 580 if "function" in tc: 581 # Convert OpenAI format to LangChain format 582 langchain_tool_calls.append({ 583 "id": tc["id"], 584 "name": tc["function"]["name"], 585 "args": json.loads(tc["function"]["arguments"]), 586 "type": "tool_call" 587 }) 588 else: 589 # Already in LangChain format 590 langchain_tool_calls.append(tc) 591 592 # Merge tool_calls into additional_kwargs 593 additional_kwargs["tool_calls"] = uni_msg["tool_calls"] 594 messages.append(AIMessage( 595 content=content, 596 additional_kwargs=additional_kwargs, 597 tool_calls=langchain_tool_calls # Use converted format 598 )) 599 else: 600 messages.append(AIMessage(content=content, additional_kwargs=additional_kwargs)) 601 602 elif role == "tool": 603 messages.append(ToolMessage( 604 content=content, 605 tool_call_id=uni_msg["tool_call_id"] 606 )) 607 608 # Convert LangChain messages to dictionary format for storage 609 message_dicts = [] 610 for msg in messages: 611 if isinstance(msg, SystemMessage): 612 message_dicts.append({ 613 "type": "SystemMessage", 614 "content": msg.content, 615 "additional_kwargs": getattr(msg, 'additional_kwargs', {}) 616 }) 617 elif isinstance(msg, HumanMessage): 618 message_dicts.append({ 619 "type": "HumanMessage", 620 "content": msg.content, 621 "additional_kwargs": getattr(msg, 'additional_kwargs', {}) 622 }) 623 elif isinstance(msg, AIMessage): 624 message_dicts.append({ 625 "type": "AIMessage", 626 "content": msg.content, 627 "additional_kwargs": getattr(msg, 'additional_kwargs', {}), 628 "tool_calls": getattr(msg, 'tool_calls', None) 629 }) 630 elif isinstance(msg, ToolMessage): 631 message_dicts.append({ 632 "type": "ToolMessage", 633 "content": msg.content, 634 "tool_call_id": getattr(msg, 'tool_call_id', None) 635 }) 636 637 # Create History using from_json to properly convert dictionaries to LangChain objects 638 json_data = { 639 "messages": message_dicts, 640 "history_id": history_id, 641 "metadata": metadata or {}, 642 "created_datetime": datetime.now().isoformat(), 643 "agent_status": None, 644 "json_md_path": None 645 } 646 return cls.from_json(json_data) 647 648 def save_with_uni_context(self, agent_name: str, uni_api_used: bool = False) -> str: 649 """ 650 Enhanced save method that adds uni-api context to metadata. 651 This is optional - the regular save() method will work fine too. 652 """ 653 if uni_api_used: 654 self.metadata["uni_api_used"] = True 655 self.metadata["provider_unified"] = True 656 657 return self.save(agent_name) 658 659 def get_last_n_uni_messages(self, n: int) -> List[Dict[str, Any]]: 660 """Get last N messages in uni-api format""" 661 uni_messages = self.to_uni_messages() 662 return uni_messages[-n:] if len(uni_messages) >= n else uni_messages 663 664 def append_uni_messages(self, uni_messages: List[Dict[str, Any]]): 665 """Append uni-api messages to history after converting to LangChain format""" 666 from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage 667 668 for uni_msg in uni_messages: 669 role = uni_msg["role"] 670 content = uni_msg.get("content", "") 671 672 if role == "system": 673 self.messages.append(SystemMessage(content=content)) 674 675 elif role == "user": 676 self.messages.append(HumanMessage(content=content)) 677 678 elif role == "assistant": 679 if uni_msg.get("tool_calls"): 680 self.messages.append(AIMessage( 681 content=content, 682 additional_kwargs={"tool_calls": uni_msg["tool_calls"]} 683 )) 684 else: 685 self.messages.append(AIMessage(content=content)) 686 687 elif role == "tool": 688 self.messages.append(ToolMessage( 689 content=content, 690 tool_call_id=uni_msg["tool_call_id"] 691 )) 692 693 694# def get_iteration_view( 695# history: History, 696# start: int, 697# end: int 698# ) -> Dict[str, Any]: 699# """ 700# Return a dict containing: 701# • history_id 702# • total_iterations 703# • view_range: {'start', 'end'} 704# • view: sub-dict iteration_i → List[ADKEvent] 705# """ 706# if start > end: 707# raise ValueError(f"start ({start}) must be <= end ({end})") 708# all_iters = history.iterations 709# total = len(all_iters) 710 711# view: Dict[str, List[ADKEvent]] = {} 712# for i in range(start, end + 1): 713# key = f"iteration_{i}" 714# if key not in all_iters: 715# raise KeyError(f"Missing iteration: {key}") 716# view[key] = all_iters[key] 717 718# return { 719# "history_id": history.history_id, 720# "total_iterations": total, 721# "view_range": {"start": start, "end": end}, 722# "view": view, 723# } 724 725 726# Should add utils like this 727# def get_last_tool_msg(history) -> Optional[str]: 728# """ 729# Scan history.messages to find the most recent tool use and return its ToolMessage.content. 730 731# Args: 732# history: The agent's history object with a .messages attribute. 733 734# Returns: 735# Optional[str]: The content of the last ToolMessage, or None if none found. 736# """ 737# for i, msg in enumerate(history.messages): 738# if isinstance(msg, AIMessage) and isinstance(msg.content, list): 739# for item in msg.content: 740# if isinstance(item, dict) and item.get('type') == 'tool_use': 741# # Next message should be the tool result 742# if i + 1 < len(history.messages) and isinstance(history.messages[i + 1], ToolMessage): 743# return history.messages[i + 1].content 744# return None 745 746 747 748 749# class History(BasePiece): 750# """Single conversation with metadata""" 751# # messages: List[BaseMessage] # old 752# messages: List[Union[BaseMessage, ADKEvent]] 753# history_id: Optional[str] = None 754# # Using created_datetime from BasePiece 755# metadata: Dict = {} 756# agent_status: Optional[AgentStatus] = None 757# json_md_path: Optional[str] = None 758# adk_session: Optional[ADKSession] = None 759 760# def to_markdown(self) -> str: 761# """Convert to markdown with metadata header""" 762# md_parts = [] 763 764# # Add metadata header with unique separator 765# md_parts.append("===[METADATA]===") 766# md_parts.append(f"datetime: {self.created_datetime.isoformat()}") 767# md_parts.append(f"history_id: {self.history_id}") 768# md_parts.append(f"json_md_path: {self.json_md_path}") 769# for k, v in self.metadata.items(): 770# md_parts.append(f"{k}: {v}") 771# md_parts.append("===[CONTENT]===\n") 772 773# # Add messages 774# for msg in self.messages: 775# # This needs work 776# if isinstance(msg, ADKEvent): 777# md_parts.append(f"**[ADK Event {msg.id}]** {msg.actions=}, …") # however you’d like to render it 778# else: 779# # Add a separator between messages 780# md_parts.append("===[MESSAGE]===") 781 782# if isinstance(msg, SystemMessage): 783# md_parts.append(f"**System**: {msg.content}") 784# elif isinstance(msg, HumanMessage): 785# md_parts.append(f"**Human**: {msg.content}") 786# elif isinstance(msg, AIMessage): 787# md_parts.append(f"**AI**: {msg.content}") 788# elif isinstance(msg, ToolMessage): 789# # Special format for tool messages to make parsing easier 790# tool_msg = [ 791# "**Tool** (id: {tool_id})".format(tool_id=msg.tool_call_id), 792# "```", 793# msg.content, 794# "```" 795# ] 796# md_parts.append("\n".join(tool_msg)) 797 798# return "\n\n".join(md_parts) 799 800# def to_json(self) -> dict: 801# """Convert to JSON format""" 802# msgs = [] 803# for msg in self.messages: 804# if isinstance(msg, ADKEvent): 805# # ADK Event knows how to JSON‐serialize itself 806# # dump to dict and convert any sets -> lists 807# raw = msg.model_dump() 808# def _make_jsonable(x): 809# if isinstance(x, dict): 810# return {k: _make_jsonable(v) for k,v in x.items()} 811# if isinstance(x, list): 812# return [_make_jsonable(v) for v in x] 813# if isinstance(x, set): 814# return [_make_jsonable(v) for v in x] 815# return x 816# clean = _make_jsonable(raw) 817# msgs.append({"adk_event": clean}) 818# else: 819# # existing BaseMessage branch 820# msgs.append({ 821# "type": type(msg).__name__, 822# "content": msg.content, 823# "tool_call_id": getattr(msg, "tool_call_id", None), 824# "additional_kwargs": getattr(msg, "additional_kwargs", None), 825# }) 826 827# return { 828# "history_id": self.history_id, 829# "created_datetime": self.created_datetime.isoformat(), 830# "metadata": self.metadata, 831# "messages": msgs, 832# "agent_status": self.agent_status.dict() if self.agent_status else None, 833# "json_md_path": self.json_md_path 834# } 835 836 837# @classmethod 838# def _load_history_file(cls, history_id: str) -> 'History': 839# """Load history file from disk""" 840# base_dir = os.path.dirname(os.path.abspath(__file__)) 841# base_path = os.path.join(os.path.dirname(base_dir), "agents") 842 843# if not os.path.exists(base_path): 844# raise FileNotFoundError(f"Agents directory not found at {base_path}") 845 846# # Get date from history_id for narrowing search 847# date_str = '_'.join(history_id.split('_')[:3]) 848 849# # Search for the history file in all agent directories 850# for agent_dir in os.listdir(base_path): 851# history_path = os.path.join(base_path, agent_dir, "memories", "histories", date_str, f"{history_id}.json") 852# print(f"Checking path: {history_path}") # Debug print 853# if os.path.exists(history_path): 854# print(f"Found history file at: {history_path}") # Debug print 855# with open(history_path, 'r') as f: 856# history_data = json.load(f) 857# history = cls.from_json(history_data) 858# history.json_md_path = os.path.dirname(history_path) 859# history.adk_session = ADKSession.model_validate(history_data["adk_session"]) 860 861# return history 862 863# raise FileNotFoundError(f"No history file found for ID {history_id} in any agent directory") 864 865# @classmethod 866# def load_from_id(cls, history_id: str) -> 'History': 867# """Load history from ID with proper continuation handling""" 868# if history_id is None: 869# # Brand new conversation 870# return cls(messages=[], history_id=str(uuid.uuid4())) 871 872# try: 873# # Load existing history 874# original = cls._load_history_file(history_id) 875 876# # Create continuation ID 877# if "_continued_" in history_id: 878# pattern = r"_continued_(\d+)$" 879# match = re.search(pattern, history_id) 880# if match: 881# current_num = int(match.group(1)) 882# base_id = history_id.split("_continued_")[0] 883# new_history_id = f"{base_id}_continued_{current_num + 1}" 884# else: 885# raise ValueError(f"Invalid continuation ID format: {history_id}") 886# else: 887# new_history_id = f"{history_id}_continued_1" 888# # Create a cleaned copy of agent_status 889# clean_agent_status = None 890# if original.agent_status: 891# # Create a copy by converting to dict and back 892# agent_status_dict = original.agent_status.dict() 893 894# # Clean any block reports from extracted_content 895# if 'extracted_content' in agent_status_dict and agent_status_dict['extracted_content']: 896# if 'block_report' in agent_status_dict['extracted_content']: 897# del agent_status_dict['extracted_content']['block_report'] 898 899# # Create new AgentStatus with cleaned data 900# clean_agent_status = AgentStatus(**agent_status_dict) 901 902# # Create new history with same messages but new ID 903# return cls( 904# messages=original.messages.copy(), 905# created_datetime=datetime.now(), 906# metadata=original.metadata.copy(), 907# history_id=new_history_id, 908# agent_status=clean_agent_status, 909# json_md_path=original.json_md_path 910# ) 911# except FileNotFoundError: 912# raise ValueError(f"No history found with ID {history_id}") 913 914# def save(self, agent_name: str): 915# # Get base path 916# base_dir = os.path.dirname(os.path.abspath(__file__)) 917# base_path = os.path.join(os.path.dirname(base_dir), "agents") 918 919# # Normalize agent name to match AgentMakerTool's format 920# normalized_agent_name = normalize_agent_name(agent_name) # Using our new function 921 922# # Generate history_id based on datetime and agent name 923# now = datetime.now() 924# date_str = now.strftime('%Y_%m_%d') # For directory 925# datetime_str = now.strftime('%Y_%m_%d_%H_%M_%S') # For file name 926# history_id = f"{datetime_str}_{normalized_agent_name}" 927 928# # Create path including date directory 929# path = os.path.join(base_path, normalized_agent_name, "memories", "histories", date_str) 930# self.json_md_path = path 931# os.makedirs(path, exist_ok=True) 932 933# # Look for existing files with same base name 934# existing_files = [f for f in os.listdir(path) if f.startswith(history_id)] 935# if existing_files: 936# # If files exist, this is a continuation 937# max_cont = 0 938# for f in existing_files: 939# if "_continued_" in f: 940# cont_num = int(f.split("_continued_")[1].split(".")[0]) 941# max_cont = max(max_cont, cont_num) 942# history_id = f"{history_id}_continued_{max_cont + 1}" 943# self.history_id = history_id 944# # Save both JSON and Markdown versions 945# json_filepath = os.path.join(path, f"{history_id}.json") 946# md_filepath = os.path.join(path, f"{history_id}.md") 947 948# # Save JSON 949# with open(json_filepath, 'w') as f: 950# json.dump(self.to_json(), f) 951 952# # Save Markdown 953# with open(md_filepath, 'w') as f: 954# f.write(self.to_markdown()) 955 956# return history_id 957 958# @classmethod 959# def from_json(cls, data: dict, project: str = None) -> "History": 960# """Create History from JSON""" 961# messages = [] 962# for msg in data["messages"]: 963# if msg["type"] == "SystemMessage": 964# messages.append(SystemMessage(content=msg["content"])) 965# elif msg["type"] == "HumanMessage": 966# messages.append(HumanMessage(content=msg["content"])) 967# elif msg["type"] == "AIMessage": 968# messages.append(AIMessage( 969# content=msg["content"], 970# additional_kwargs=msg["additional_kwargs"] 971# )) 972# elif msg["type"] == "ToolMessage": 973# messages.append(ToolMessage( 974# content=msg["content"], 975# tool_call_id=msg["tool_call_id"] 976# )) 977 978# # Handle status if it exists 979# agent_status = None 980# if data.get("agent_status"): 981# agent_status = AgentStatus(**data["agent_status"]) 982 983# return cls( 984# messages=messages, 985# created_datetime=datetime.fromisoformat(data["created_datetime"]), 986# metadata=data["metadata"], 987# project=project, 988# agent_status=agent_status, 989# history_id=data.get("history_id"), 990# json_md_path=data.get("json_md_path") 991# ) 992 993# @classmethod 994# def from_markdown(cls, md_content: str, project: str = None) -> "History": 995# print("Parsing markdown:", md_content) # Debug 996# """Parse markdown back into History""" 997# # Split into metadata and content sections 998# parts = md_content.split("===[CONTENT]===") 999# if len(parts) == 2: 1000# metadata_section = parts[0].split("===[METADATA]===")[1].strip() 1001# content = parts[1].strip() 1002# metadata_lines = metadata_section.split("\n") 1003# else: 1004# metadata_lines = [] 1005# content = md_content 1006 1007# # Parse metadata 1008# metadata = {} 1009# datetime_val = None 1010# history_id = None 1011# for line in metadata_lines: 1012# if ":" in line: 1013# key, value = line.split(":", 1) 1014# key = key.strip() 1015# value = value.strip() 1016# if key == "datetime": 1017# datetime_val = datetime.fromisoformat(value) 1018# elif key == "history_id": 1019# history_id = value 1020# elif key == "json_md_path": # Add this condition 1021# json_md_path = value 1022 1023# else: 1024# metadata[key] = value 1025 1026# # Parse messages by splitting on message separator 1027# messages = [] 1028# message_blocks = content.split("===[MESSAGE]===") 1029 1030# for block in message_blocks: 1031# block = block.strip() 1032# if not block: # Skip empty blocks 1033# continue 1034 1035# lines = block.split("\n") 1036# current_type = None 1037# current_lines = [] 1038 1039# # Special handling for tool messages 1040# if block.startswith("**Tool**"): 1041# # Extract tool ID 1042# header = lines[0] 1043# tool_id = header.split("id:", 1)[1].split(")", 1)[0].strip() 1044 1045# # Extract content between backticks 1046# content_lines = [] 1047# in_code_block = False 1048# for line in lines[1:]: # Skip header 1049# if line.strip() == "```": 1050# in_code_block = not in_code_block 1051# continue 1052# if in_code_block: 1053# content_lines.append(line) 1054 1055# content = "\n".join(content_lines) 1056# messages.append(ToolMessage(content=content, tool_call_id=tool_id)) 1057# continue 1058 1059# # Normal message handling 1060# for line in lines: 1061# if line.startswith("**") and "**:" in line: 1062# current_type = line.split("**:", 1)[0].strip("*").strip() 1063# current_lines = [line.split("**:", 1)[1].strip()] 1064# else: 1065# current_lines.append(line) 1066 1067# # Process accumulated message at end of block 1068# if current_lines and current_type: 1069# msg_content = "\n".join(current_lines).strip() 1070# if msg_content: # Only add if there's actual content 1071# if current_type == "System": 1072# messages.append(SystemMessage(content=msg_content)) 1073# elif current_type == "Human": 1074# messages.append(HumanMessage(content=msg_content)) 1075# elif current_type == "AI": 1076# messages.append(AIMessage(content=msg_content)) 1077# elif current_type.startswith("Tool"): 1078# # Extract tool_call_id if present 1079# tool_id = None 1080# if "(id:" in current_type: 1081# tool_id = current_type.split("id:", 1)[1].split(")", 1)[0].strip() 1082 1083# # Process tool message more carefully 1084# try: 1085# print(f"Raw tool message content: {msg_content}") # Debug 1086# sys.stdout.flush() 1087# lines = [line for line in msg_content.split("\n") if line.strip()] # Remove empty lines 1088# print(f"Non-empty lines: {lines}") # Debug 1089 1090# # Extract content between ``` markers 1091# content_start = lines.index("```") + 1 1092# content_end = lines.index("```", content_start) 1093# tool_content = "\n".join(lines[content_start:content_end]) 1094 1095# print(f"Extracted tool content: {tool_content}") # Debug 1096# messages.append(ToolMessage(content=tool_content, tool_call_id=tool_id)) 1097# except Exception as e: 1098# print(f"Error processing tool message: {e}") # Debug 1099 1100# # Process blocks complete 1101 1102# return cls( 1103# messages=messages, 1104# created_datetime=datetime_val or datetime.now(), 1105# project=project, 1106# metadata=metadata, 1107# history_id=history_id, 1108# json_md_path=json_md_path 1109# ) 1110 1111# @classmethod 1112# def from_adk_session(cls, session: ADKSession) -> "History": 1113# return cls( 1114# messages=session.events, # now a list of ADKEvent 1115# history_id=session.id, 1116# created_datetime=datetime.fromtimestamp(session.last_update_time), 1117# metadata={}, # or deserialize session.state if you like 1118# # agent_status left None or reconstructed from state 1119# ) 1120 1121# def to_adk_session( 1122# self, 1123# app_name: str, 1124# user_id: str, 1125# ) -> ADKSession: 1126# return ADKSession( 1127# id=self.history_id or "", 1128# app_name=app_name, 1129# user_id=user_id, 1130# state={}, # serialize .agent_status or metadata if needed 1131# events=[m for m in self.messages if isinstance(m, ADKEvent)], 1132# last_update_time=self.created_datetime.timestamp(), 1133# ) 1134# # Old 1135# # def _compute_iterations(self) -> Dict[str, List[BaseMessage]]: 1136# # """ 1137# # Split self.messages into “iterations”: 1138# # each iteration is the sequence of messages *between* two human turns. 1139# # HumanMessages themselves are not included. 1140# # """ 1141# # iterations: Dict[str, List[BaseMessage]] = {} 1142# # current: List[BaseMessage] = [] 1143# # idx = 0 1144 1145# # for msg in self.messages: 1146# # if isinstance(msg, HumanMessage): 1147# # if current: 1148# # iterations[f"iteration_{idx}"] = current 1149# # idx += 1 1150# # current = [] 1151# # else: 1152# # current.append(msg) 1153 1154# # if current: 1155# # iterations[f"iteration_{idx}"] = current 1156 1157# # return iterations 1158 # unsure if this handles ADK correctly 1159 def _compute_iterations(self) -> Dict[str, List[Union[BaseMessage, ADKEvent]]]: 1160 """ 1161 Split self.messages into “iterations”: 1162 each iteration is the sequence of messages *between* two user turns. 1163 HumanMessage and ADKEvent(author="user") both start a new iteration. 1164 """ 1165 iterations: Dict[str, List[Union[BaseMessage, ADKEvent]]] = {} 1166 current: List[Union[BaseMessage, ADKEvent]] = [] 1167 idx = 0 1168 1169 for msg in self.messages: 1170 is_user = ( 1171 isinstance(msg, HumanMessage) 1172 or (isinstance(msg, ADKEvent) and getattr(msg, "author", None) == "user") 1173 ) 1174 if is_user: 1175 if current: 1176 iterations[f"iteration_{idx}"] = current 1177 idx += 1 1178 current = [] 1179 else: 1180 current.append(msg) 1181 1182 if current: 1183 iterations[f"iteration_{idx}"] = current 1184 1185 return iterations 1186 1187 @property 1188 def iterations(self) -> Dict[str, List[Union[BaseMessage, ADKEvent]]]: # changed return annotation 1189 """ 1190 Returns a dict of all “iterations” in this history, 1191 where each iteration is the sequence of non‑HumanMessage 1192 messages between two human turns. 1193 """ 1194 return self._compute_iterations()
Single conversation with metadata, backed by an ADK Session.
59 @property 60 def events(self) -> List[ADKEvent]: 61 """All ADK events in this history (or fallback to any stored `messages`).""" 62 if self.adk_session: 63 return self.adk_session.events 64 return [m for m in self.messages if isinstance(m, ADKEvent)]
All ADK events in this history (or fallback to any stored messages).
66 def to_markdown(self) -> str: 67 """Convert to markdown with metadata and a human-readable dump of ADK events.""" 68 md_parts: List[str] = [] 69 md_parts.append("===[METADATA]===") 70 md_parts.append(f"datetime: {self.created_datetime.isoformat()}") 71 md_parts.append(f"history_id: {self.history_id}") 72 md_parts.append(f"json_md_path: {self.json_md_path}") 73 for k, v in self.metadata.items(): 74 md_parts.append(f"{k}: {v}") 75 md_parts.append("===[CONTENT]===\n") 76 77 # If we have an ADK session, render its events 78 for ev in self.events: 79 md_parts.append(f"--- [ADK Event] id={ev.id}, author={ev.author}, timestamp={datetime.fromtimestamp(ev.timestamp)}") 80 # Render any function calls/responses/text 81 for part in getattr(ev.content, "parts", []): 82 if getattr(part, "function_call", None): 83 fc = part.function_call 84 md_parts.append(f"*Function Call*: {fc.name}({fc.args})") 85 if getattr(part, "function_response", None): 86 fr = part.function_response 87 md_parts.append(f"*Function Response*: {fr.response}") 88 if getattr(part, "text", None): 89 md_parts.append(f"{part.text}") 90 md_parts.append("") # blank line between events 91 92 # Fallback: if no ADK session, render legacy BaseMessages 93 if not self.adk_session: 94 for msg in self.messages: 95 md_parts.append("===[MESSAGE]===") 96 if isinstance(msg, SystemMessage): 97 md_parts.append(f"**System**: {msg.content}") 98 elif isinstance(msg, HumanMessage): 99 md_parts.append(f"**Human**: {msg.content}") 100 elif isinstance(msg, AIMessage): 101 md_parts.append(f"**AI**: {msg.content}") 102 elif isinstance(msg, ToolMessage): 103 md_parts.append(f"**Tool** (id: {msg.tool_call_id})\n```{msg.content}```") 104 105 return "\n".join(md_parts)
Convert to markdown with metadata and a human-readable dump of ADK events.
107 def to_json(self) -> dict: 108 """Convert to JSON format.""" 109 def _make_jsonable(x): 110 if isinstance(x, dict): 111 return {k: _make_jsonable(v) for k, v in x.items()} 112 if isinstance(x, list): 113 return [_make_jsonable(v) for v in x] 114 if isinstance(x, set): 115 return [_make_jsonable(v) for v in x] 116 return x 117 118 msgs: list[dict[str, Any]] = [] 119 for msg in self.messages: 120 if isinstance(msg, ADKEvent): 121 raw_evt = msg.model_dump() 122 msgs.append({"adk_event": _make_jsonable(raw_evt)}) 123 else: 124 msgs.append({ 125 "type": type(msg).__name__, 126 "content": msg.content, 127 "tool_call_id": getattr(msg, "tool_call_id", None), 128 "additional_kwargs": getattr(msg, "additional_kwargs", None), 129 "tool_calls": getattr(msg, "tool_calls", None), 130 }) 131 132 out: dict[str, Any] = { 133 "history_id": self.history_id, 134 "created_datetime": self.created_datetime.isoformat(), 135 "metadata": self.metadata, 136 "messages": msgs, 137 "agent_status": self.agent_status.dict() if self.agent_status else None, 138 "json_md_path": self.json_md_path, 139 } 140 141 if self.adk_session: 142 raw_sess = self.adk_session.model_dump() 143 out["adk_session"] = _make_jsonable(raw_sess) 144 145 return out
Convert to JSON format.
169 @classmethod 170 def load_from_id(cls, history_id: str) -> History: 171 """Continue an existing history (or start fresh if None).""" 172 if history_id is None: 173 return cls(messages=[], history_id=str(uuid.uuid4())) 174 175 orig = cls._load_history_file(history_id) 176 # bump continuation suffix 177 if "_continued_" in history_id: 178 m = re.search(r"_continued_(\d+)$", history_id) 179 num = int(m.group(1)) + 1 if m else 1 180 new_id = history_id.split("_continued_")[0] + f"_continued_{num}" 181 else: 182 new_id = f"{history_id}_continued_1" 183 184 return History( 185 messages=orig.messages.copy(), 186 adk_session=orig.adk_session, 187 created_datetime=datetime.now(), 188 metadata=orig.metadata.copy(), 189 agent_status=orig.agent_status, 190 history_id=new_id, 191 json_md_path=orig.json_md_path, 192 )
Continue an existing history (or start fresh if None).
221 def save(self, agent_name: str) -> str: 222 """Persist both .json and .md to disk, overwriting if history_id exists.""" 223 # Use HEAVEN_DATA_DIR for saving agent histories 224 base_path = os.path.join(EnvConfigUtil.get_heaven_data_dir(), "agents") 225 nm = normalize_agent_name(agent_name) 226 227 # If we already have a history_id, use it 228 if self.history_id: 229 # Extract date directory from existing history_id 230 date_str = "_".join(self.history_id.split("_")[:3]) 231 path = os.path.join(base_path, nm, "memories", "histories", date_str) 232 hist_id = self.history_id 233 else: 234 # Create new history_id for new conversation 235 now = datetime.now() 236 date_dir = now.strftime("%Y_%m_%d") 237 hist_id = f"{now.strftime('%Y_%m_%d_%H_%M_%S')}_{nm}" 238 path = os.path.join(base_path, nm, "memories", "histories", date_dir) 239 240 # Ensure directory exists 241 os.makedirs(path, exist_ok=True) 242 243 # Use existing or new history_id 244 self.history_id = hist_id 245 json_fp = os.path.join(path, f"{hist_id}.json") 246 md_fp = os.path.join(path, f"{hist_id}.md") 247 248 # Write files, overwriting existing ones 249 with open(json_fp, "w") as f: 250 json.dump(self.to_json(), f, indent=2) 251 with open(md_fp, "w") as f: 252 f.write(self.to_markdown()) 253 254 return hist_id
Persist both .json and .md to disk, overwriting if history_id exists.
256 @classmethod 257 def from_json(cls, data: dict, project: str = None) -> History: 258 msgs: List[Union[BaseMessage, ADKEvent]] = [] 259 for m in data["messages"]: 260 if "adk_event" in m: 261 evt = ADKEvent.model_validate(m["adk_event"]) 262 msgs.append(evt) 263 else: 264 t = m["type"] 265 if t == "SystemMessage": 266 msgs.append(SystemMessage(content=m["content"])) 267 elif t == "HumanMessage": 268 msgs.append(HumanMessage(content=m["content"])) 269 elif t == "AIMessage": 270 msgs.append(AIMessage(content=m["content"], additional_kwargs=m.get("additional_kwargs", {}), tool_calls=m.get("tool_calls", None))) 271 elif t == "ToolMessage": 272 msgs.append(ToolMessage(content=m["content"], tool_call_id=m.get("tool_call_id"))) 273 274 status = AgentStatus(**data["agent_status"]) if data.get("agent_status") else None 275 276 hist = cls( 277 messages=msgs, 278 created_datetime=datetime.fromisoformat(data["created_datetime"]), 279 metadata=data.get("metadata", {}), 280 project=project, 281 agent_status=status, 282 history_id=data.get("history_id"), 283 json_md_path=data.get("json_md_path"), 284 ) 285 286 # rehydrate session if present 287 if "adk_session" in data: 288 hist.adk_session = ADKSession.model_validate(data["adk_session"]) 289 290 return hist
293 @classmethod 294 def from_markdown(cls, md_content: str, project: str = None) -> "History": 295 print("Parsing markdown:", md_content) # Debug 296 """Parse markdown back into History""" 297 # Split into metadata and content sections 298 parts = md_content.split("===[CONTENT]===") 299 if len(parts) == 2: 300 metadata_section = parts[0].split("===[METADATA]===")[1].strip() 301 content = parts[1].strip() 302 metadata_lines = metadata_section.split("\n") 303 else: 304 metadata_lines = [] 305 content = md_content 306 307 # Parse metadata 308 metadata = {} 309 datetime_val = None 310 history_id = None 311 for line in metadata_lines: 312 if ":" in line: 313 key, value = line.split(":", 1) 314 key = key.strip() 315 value = value.strip() 316 if key == "datetime": 317 datetime_val = datetime.fromisoformat(value) 318 elif key == "history_id": 319 history_id = value 320 elif key == "json_md_path": # Add this condition 321 json_md_path = value 322 323 else: 324 metadata[key] = value 325 326 # Parse messages by splitting on message separator 327 messages = [] 328 message_blocks = content.split("===[MESSAGE]===") 329 330 for block in message_blocks: 331 block = block.strip() 332 if not block: # Skip empty blocks 333 continue 334 335 lines = block.split("\n") 336 current_type = None 337 current_lines = [] 338 339 # Special handling for tool messages 340 if block.startswith("**Tool**"): 341 # Extract tool ID 342 header = lines[0] 343 tool_id = header.split("id:", 1)[1].split(")", 1)[0].strip() 344 345 # Extract content between backticks 346 content_lines = [] 347 in_code_block = False 348 for line in lines[1:]: # Skip header 349 if line.strip() == "```": 350 in_code_block = not in_code_block 351 continue 352 if in_code_block: 353 content_lines.append(line) 354 355 content = "\n".join(content_lines) 356 messages.append(ToolMessage(content=content, tool_call_id=tool_id)) 357 continue 358 359 # Normal message handling 360 for line in lines: 361 if line.startswith("**") and "**:" in line: 362 current_type = line.split("**:", 1)[0].strip("*").strip() 363 current_lines = [line.split("**:", 1)[1].strip()] 364 else: 365 current_lines.append(line) 366 367 # Process accumulated message at end of block 368 if current_lines and current_type: 369 msg_content = "\n".join(current_lines).strip() 370 if msg_content: # Only add if there's actual content 371 if current_type == "System": 372 messages.append(SystemMessage(content=msg_content)) 373 elif current_type == "Human": 374 messages.append(HumanMessage(content=msg_content)) 375 elif current_type == "AI": 376 messages.append(AIMessage(content=msg_content)) 377 elif current_type.startswith("Tool"): 378 # Extract tool_call_id if present 379 tool_id = None 380 if "(id:" in current_type: 381 tool_id = current_type.split("id:", 1)[1].split(")", 1)[0].strip() 382 383 # Process tool message more carefully 384 try: 385 print(f"Raw tool message content: {msg_content}") # Debug 386 sys.stdout.flush() 387 lines = [line for line in msg_content.split("\n") if line.strip()] # Remove empty lines 388 print(f"Non-empty lines: {lines}") # Debug 389 390 # Extract content between ``` markers 391 content_start = lines.index("```") + 1 392 content_end = lines.index("```", content_start) 393 tool_content = "\n".join(lines[content_start:content_end]) 394 395 print(f"Extracted tool content: {tool_content}") # Debug 396 messages.append(ToolMessage(content=tool_content, tool_call_id=tool_id)) 397 except Exception as e: 398 print(f"Error processing tool message: {e}") # Debug 399 400 # Process blocks complete 401 402 return cls( 403 messages=messages, 404 created_datetime=datetime_val or datetime.now(), 405 project=project, 406 metadata=metadata, 407 history_id=history_id, 408 json_md_path=json_md_path 409 )
412 @classmethod 413 def from_adk_session(cls, session: ADKSession) -> "History": 414 """ 415 Wrap an ADKSession without mutating history.messages at all. 416 """ 417 hist = cls( 418 messages=[], # untouched 419 history_id=None, # will be set on save() 420 created_datetime=datetime.fromtimestamp(session.last_update_time), 421 metadata={}, 422 agent_status=None, 423 ) 424 hist.adk_session = session 425 return hist
Wrap an ADKSession without mutating history.messages at all.
429 def to_adk_session( 430 self, 431 app_name: str, 432 user_id: str, 433 ) -> ADKSession: 434 """ 435 Return the exact same ADKSession we stored—state and events intact. 436 """ 437 if not hasattr(self, "adk_session") or self.adk_session is None: 438 raise RuntimeError("No ADK session attached to this History") 439 return self.adk_session
Return the exact same ADKSession we stored—state and events intact.
465 def to_uni_messages(self) -> List[Dict[str, Any]]: 466 """Convert dictionary messages to uni-api format: dictionary → LangChain → uni-api""" 467 import json 468 469 # Convert dictionary messages (stored in self.messages) to LangChain objects first 470 msgs_as_dicts = self.to_json()["messages"] 471 472 langchain_messages = [] 473 for m in msgs_as_dicts: 474 if "adk_event" in m: 475 continue # Skip ADK events 476 else: 477 t = m["type"] 478 if t == "SystemMessage": 479 langchain_messages.append(SystemMessage(content=m["content"])) 480 elif t == "HumanMessage": 481 langchain_messages.append(HumanMessage(content=m["content"])) 482 elif t == "AIMessage": 483 langchain_messages.append(AIMessage(content=m["content"], additional_kwargs=m.get("additional_kwargs", {}), tool_calls=m.get("tool_calls", None))) 484 elif t == "ToolMessage": 485 langchain_messages.append(ToolMessage(content=m["content"], tool_call_id=m.get("tool_call_id"))) 486 487 # Convert LangChain objects to uni-api format 488 uni_messages = [] 489 490 for msg in langchain_messages: 491 if isinstance(msg, SystemMessage): 492 uni_messages.append({"role": "system", "content": msg.content}) 493 494 elif isinstance(msg, HumanMessage): 495 uni_messages.append({"role": "user", "content": msg.content}) 496 497 elif isinstance(msg, AIMessage): 498 uni_msg = {"role": "assistant", "content": msg.content or ""} 499 500 # Preserve additional_kwargs 501 if msg.additional_kwargs: 502 uni_msg["additional_kwargs"] = msg.additional_kwargs 503 504 # Handle tool calls in additional_kwargs (OpenAI style) 505 if msg.additional_kwargs.get("tool_calls"): 506 uni_msg["tool_calls"] = msg.additional_kwargs["tool_calls"] 507 508 # Handle tool calls as direct attribute 509 elif hasattr(msg, 'tool_calls') and msg.tool_calls: 510 uni_msg["tool_calls"] = msg.tool_calls 511 512 # Handle Anthropic style tool_use in content 513 elif isinstance(msg.content, list): 514 tool_calls = [] 515 content_parts = [] 516 517 for item in msg.content: 518 if isinstance(item, dict) and item.get('type') == 'tool_use': 519 # Convert Anthropic format to OpenAI format 520 tool_calls.append({ 521 "id": item.get("id", ""), 522 "type": "function", 523 "function": { 524 "name": item.get("name", ""), 525 "arguments": json.dumps(item.get("input", {})) 526 } 527 }) 528 elif isinstance(item, dict) and item.get('type') == 'text': 529 content_parts.append(item.get('text', '')) 530 elif isinstance(item, str): 531 content_parts.append(item) 532 533 if tool_calls: 534 uni_msg["tool_calls"] = tool_calls 535 if content_parts: 536 uni_msg["content"] = ' '.join(content_parts) 537 538 uni_messages.append(uni_msg) 539 540 elif isinstance(msg, ToolMessage): 541 uni_messages.append({ 542 "role": "tool", 543 "tool_call_id": msg.tool_call_id, 544 "content": msg.content 545 }) 546 547 return uni_messages
Convert dictionary messages to uni-api format: dictionary → LangChain → uni-api
549 @classmethod 550 def from_uni_messages( 551 cls, 552 uni_messages: List[Dict[str, Any]], 553 history_id: Optional[str] = None, 554 metadata: Optional[Dict[str, Any]] = None 555 ) -> "History": 556 """Create History from uni-api message format: uni-api → LangChain → dictionary""" 557 from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage 558 559 # First: uni-api → LangChain (original logic) 560 messages = [] 561 562 for uni_msg in uni_messages: 563 role = uni_msg["role"] 564 content = uni_msg.get("content", "") 565 566 if role == "system": 567 messages.append(SystemMessage(content=content)) 568 569 elif role == "user": 570 messages.append(HumanMessage(content=content)) 571 572 elif role == "assistant": 573 # Restore additional_kwargs from uni-api format 574 additional_kwargs = uni_msg.get("additional_kwargs", {}) 575 576 if uni_msg.get("tool_calls"): 577 # Transform OpenAI format to LangChain format 578 langchain_tool_calls = [] 579 for tc in uni_msg["tool_calls"]: 580 if "function" in tc: 581 # Convert OpenAI format to LangChain format 582 langchain_tool_calls.append({ 583 "id": tc["id"], 584 "name": tc["function"]["name"], 585 "args": json.loads(tc["function"]["arguments"]), 586 "type": "tool_call" 587 }) 588 else: 589 # Already in LangChain format 590 langchain_tool_calls.append(tc) 591 592 # Merge tool_calls into additional_kwargs 593 additional_kwargs["tool_calls"] = uni_msg["tool_calls"] 594 messages.append(AIMessage( 595 content=content, 596 additional_kwargs=additional_kwargs, 597 tool_calls=langchain_tool_calls # Use converted format 598 )) 599 else: 600 messages.append(AIMessage(content=content, additional_kwargs=additional_kwargs)) 601 602 elif role == "tool": 603 messages.append(ToolMessage( 604 content=content, 605 tool_call_id=uni_msg["tool_call_id"] 606 )) 607 608 # Convert LangChain messages to dictionary format for storage 609 message_dicts = [] 610 for msg in messages: 611 if isinstance(msg, SystemMessage): 612 message_dicts.append({ 613 "type": "SystemMessage", 614 "content": msg.content, 615 "additional_kwargs": getattr(msg, 'additional_kwargs', {}) 616 }) 617 elif isinstance(msg, HumanMessage): 618 message_dicts.append({ 619 "type": "HumanMessage", 620 "content": msg.content, 621 "additional_kwargs": getattr(msg, 'additional_kwargs', {}) 622 }) 623 elif isinstance(msg, AIMessage): 624 message_dicts.append({ 625 "type": "AIMessage", 626 "content": msg.content, 627 "additional_kwargs": getattr(msg, 'additional_kwargs', {}), 628 "tool_calls": getattr(msg, 'tool_calls', None) 629 }) 630 elif isinstance(msg, ToolMessage): 631 message_dicts.append({ 632 "type": "ToolMessage", 633 "content": msg.content, 634 "tool_call_id": getattr(msg, 'tool_call_id', None) 635 }) 636 637 # Create History using from_json to properly convert dictionaries to LangChain objects 638 json_data = { 639 "messages": message_dicts, 640 "history_id": history_id, 641 "metadata": metadata or {}, 642 "created_datetime": datetime.now().isoformat(), 643 "agent_status": None, 644 "json_md_path": None 645 } 646 return cls.from_json(json_data)
Create History from uni-api message format: uni-api → LangChain → dictionary
648 def save_with_uni_context(self, agent_name: str, uni_api_used: bool = False) -> str: 649 """ 650 Enhanced save method that adds uni-api context to metadata. 651 This is optional - the regular save() method will work fine too. 652 """ 653 if uni_api_used: 654 self.metadata["uni_api_used"] = True 655 self.metadata["provider_unified"] = True 656 657 return self.save(agent_name)
Enhanced save method that adds uni-api context to metadata. This is optional - the regular save() method will work fine too.
659 def get_last_n_uni_messages(self, n: int) -> List[Dict[str, Any]]: 660 """Get last N messages in uni-api format""" 661 uni_messages = self.to_uni_messages() 662 return uni_messages[-n:] if len(uni_messages) >= n else uni_messages
Get last N messages in uni-api format
664 def append_uni_messages(self, uni_messages: List[Dict[str, Any]]): 665 """Append uni-api messages to history after converting to LangChain format""" 666 from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage 667 668 for uni_msg in uni_messages: 669 role = uni_msg["role"] 670 content = uni_msg.get("content", "") 671 672 if role == "system": 673 self.messages.append(SystemMessage(content=content)) 674 675 elif role == "user": 676 self.messages.append(HumanMessage(content=content)) 677 678 elif role == "assistant": 679 if uni_msg.get("tool_calls"): 680 self.messages.append(AIMessage( 681 content=content, 682 additional_kwargs={"tool_calls": uni_msg["tool_calls"]} 683 )) 684 else: 685 self.messages.append(AIMessage(content=content)) 686 687 elif role == "tool": 688 self.messages.append(ToolMessage( 689 content=content, 690 tool_call_id=uni_msg["tool_call_id"] 691 ))
Append uni-api messages to history after converting to LangChain format
1187 @property 1188 def iterations(self) -> Dict[str, List[Union[BaseMessage, ADKEvent]]]: # changed return annotation 1189 """ 1190 Returns a dict of all “iterations” in this history, 1191 where each iteration is the sequence of non‑HumanMessage 1192 messages between two human turns. 1193 """ 1194 return self._compute_iterations()
Returns a dict of all “iterations” in this history, where each iteration is the sequence of non‑HumanMessage messages between two human turns.
1199def get_iteration_view( 1200 history: History, 1201 start: int, 1202 end: int 1203) -> Dict[str, Any]: 1204 """ 1205 Return a dict containing: 1206 • history_id – the ID of this history 1207 • total_iterations – how many iterations the history has 1208 • view_range – dict with 'start' and 'end' 1209 • view – sub‑dict of iteration_i → List[BaseMessage] 1210 Raises: 1211 • ValueError if start > end 1212 • KeyError if any iteration_i in that range is missing 1213 """ 1214 if start > end: 1215 raise ValueError(f"start ({start}) must be <= end ({end})") 1216 1217 all_iters = history.iterations 1218 total = len(all_iters) 1219 1220 # Build the view 1221 view: Dict[str, List[BaseMessage]] = {} 1222 for i in range(start, end + 1): 1223 key = f"iteration_{i}" 1224 if key not in all_iters: 1225 raise KeyError(f"Missing iteration: {key}") 1226 view[key] = all_iters[key] 1227 1228 return { 1229 "history_id": history.history_id, 1230 "total_iterations": total, 1231 "view_range": {"start": start, "end": end}, 1232 "view": view 1233 } 1234 1235# from google.genai import types 1236 # def to_contents(self) -> list[types.Content]: 1237 # contents: list[types.Content] = [] 1238 # for msg in self.messages: 1239 # if isinstance(msg, SystemMessage): 1240 # role = "system"; text = msg.content 1241 # elif isinstance(msg, HumanMessage): 1242 # role = "user"; text = msg.content 1243 # elif isinstance(msg, AIMessage): 1244 # role = "assistant" 1245 # text = ( 1246 # msg.content 1247 # if isinstance(msg.content, str) 1248 # else "".join( 1249 # b.get("text","") for b in msg.content if b.get("type")=="text" 1250 # ) 1251 # ) 1252 # elif isinstance(msg, ToolMessage): 1253 # role = "tool"; text = msg.content 1254 # else: 1255 # continue 1256 1257 # contents.append(types.Content( 1258 # role=role, 1259 # parts=[types.Part(text=text)] 1260 # )) 1261 # return contents 1262 1263 # def update_from_contents(self, contents: list[types.Content]): 1264 # new_msgs: list[BaseMessage] = [] 1265 # for c in contents: 1266 # text = "".join(p.text or "" for p in c.parts) 1267 # if c.role == "system": 1268 # new_msgs.append(SystemMessage(content=text)) 1269 # elif c.role == "user": 1270 # new_msgs.append(HumanMessage(content=text)) 1271 # elif c.role == "assistant": 1272 # new_msgs.append(AIMessage(content=text)) 1273 # elif c.role == "tool": 1274 # new_msgs.append(ToolMessage(content=text)) 1275 # self.messages = new_msgs
Return a dict containing: • history_id – the ID of this history • total_iterations – how many iterations the history has • view_range – dict with 'start' and 'end' • view – sub‑dict of iteration_i → List[BaseMessage] Raises: • ValueError if start > end • KeyError if any iteration_i in that range is missing