───────────────────────────────────────────────────────────────────── src/bond/__init__.py ─────────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Bond - The Forensic Runtime for AI agents.

Full-spectrum streaming with complete observability. Every token, every thought,
every tool call - captured and surfaced in real time.
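
Example (illustrative quickstart; the model name and prompt are placeholders):
    ```python
    from bond import BondAgent, StreamHandlers

    agent = BondAgent(
        name="assistant",
        instructions="You are helpful.",
        model="anthropic:claude-sonnet-4-20250514",
    )
    handlers = StreamHandlers(on_text_delta=lambda t: print(t, end=""))
    answer = await agent.ask("Hello", handlers=handlers)
    ```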
"""

from bond.agent import BondAgent, StreamHandlers
from bond.trace import (
    JSONFileTraceStore,
    TraceEvent,
    TraceMeta,
    TraceReplayer,
    TraceStorageProtocol,
    create_capture_handlers,
    finalize_capture,
)
from bond.utils import (
    create_print_handlers,
    create_sse_handlers,
    create_websocket_handlers,
)

__version__ = "0.1.0"

__all__ = [
    # Core
    "BondAgent",
    "StreamHandlers",
    # Utilities
    "create_websocket_handlers",
    "create_sse_handlers",
    "create_print_handlers",
    # Trace
    "TraceEvent",
    "TraceMeta",
    "TraceStorageProtocol",
    "JSONFileTraceStore",
    "create_capture_handlers",
    "finalize_capture",
    "TraceReplayer",
]


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────────────────── src/bond/agent.py ───────────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Core agent runtime with high-fidelity streaming."""

import json
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar

from pydantic_ai import Agent
from pydantic_ai.messages import (
    FinalResultEvent,
    FunctionToolCallEvent,
    FunctionToolResultEvent,
    ModelMessage,
    PartDeltaEvent,
    PartEndEvent,
    PartStartEvent,
    TextPartDelta,
    ThinkingPartDelta,
    ToolCallPartDelta,
)
from pydantic_ai.models import Model
from pydantic_ai.tools import Tool

T = TypeVar("T")
DepsT = TypeVar("DepsT")


@dataclass
class StreamHandlers:
    """Callbacks mapping to every stage of the LLM lifecycle.

    This allows the UI to perfectly reconstruct the Agent's thought process.

    Lifecycle Events:
        on_block_start: A new block (Text, Thinking, or Tool Call) has started.
        on_block_end: A block has finished generating.
        on_complete: The entire response is finished.

    Content Events (Typing Effect):
        on_text_delta: Incremental text content.
        on_thinking_delta: Incremental thinking/reasoning content.
        on_tool_call_delta: Incremental tool name and arguments as they form.

    Execution Events:
        on_tool_execute: Tool call is fully formed and NOW executing.
        on_tool_result: Tool has finished and returned data.

    Example:
        ```python
        handlers = StreamHandlers(
            on_block_start=lambda kind, idx: print(f"[Start {kind} #{idx}]"),
            on_text_delta=lambda txt: print(txt, end=""),
            on_tool_execute=lambda id, name, args: print(f"[Running {name}...]"),
            on_tool_result=lambda id, name, res: print(f"[Result: {res}]"),
            on_complete=lambda data: print(f"[Done: {data}]"),
        )
        ```
    """

    # Lifecycle: Block open/close
    on_block_start: Callable[[str, int], None] | None = None  # (type, index)
    on_block_end: Callable[[str, int], None] | None = None  # (type, index)

    # Content: Incremental deltas
    on_text_delta: Callable[[str], None] | None = None
    on_thinking_delta: Callable[[str], None] | None = None
    on_tool_call_delta: Callable[[str, str], None] | None = None  # (name_delta, args_delta)

    # Execution: Tool running/results
    on_tool_execute: Callable[[str, str, dict[str, Any]], None] | None = None  # (id, name, args)
    on_tool_result: Callable[[str, str, str], None] | None = None  # (id, name, result_str)

    # Lifecycle: Response complete
    on_complete: Callable[[Any], None] | None = None


@dataclass
class BondAgent(Generic[T, DepsT]):
    """Generic agent runtime wrapping PydanticAI with full-spectrum streaming.

    A BondAgent provides:
    - High-fidelity streaming with callbacks for every lifecycle event
    - Block start/end notifications for UI rendering
    - Real-time streaming of text, thinking, and tool arguments
    - Tool execution and result callbacks
    - Message history management
    - Dynamic instruction override
    - Toolset composition
    - Retry handling

    Example:
        ```python
        agent = BondAgent(
            name="assistant",
            instructions="You are helpful.",
            model="anthropic:claude-sonnet-4-20250514",
            toolsets=[memory_toolset],
            deps=QdrantMemoryStore(),
        )

        handlers = StreamHandlers(
            on_text_delta=lambda t: print(t, end=""),
            on_tool_execute=lambda id, name, args: print(f"[Running {name}]"),
        )

        response = await agent.ask("Remember my preference", handlers=handlers)
        ```
    """

    name: str
    instructions: str
    model: str | Model
    toolsets: Sequence[Sequence[Tool[DepsT]]] = field(default_factory=list)
    deps: DepsT | None = None
    # output_type can be a type, PromptedOutput, or other pydantic_ai output specs
    output_type: type[T] | Any = str
    max_retries: int = 3

    _agent: Agent[DepsT, T] | None = field(default=None, init=False, repr=False)
    _history: list[ModelMessage] = field(default_factory=list, init=False, repr=False)
    _tool_names: dict[str, str] = field(default_factory=dict, init=False, repr=False)
    _tools: list[Tool[DepsT]] = field(default_factory=list, init=False, repr=False)

    def __post_init__(self) -> None:
        """Initialize the underlying PydanticAI agent."""
        all_tools: list[Tool[DepsT]] = []
        for toolset in self.toolsets:
            all_tools.extend(toolset)

        # Store tools for reuse when creating dynamic agents
        self._tools = all_tools

        # Only pass system_prompt if instructions are non-empty
        # This matches behavior when using raw Agent without system_prompt
        agent_kwargs: dict[str, Any] = {
            "model": self.model,
            "tools": all_tools,
            "output_type": self.output_type,
            "retries": self.max_retries,
        }
        # Only set deps_type when deps is provided
        if self.deps is not None:
            agent_kwargs["deps_type"] = type(self.deps)
        if self.instructions:
            agent_kwargs["system_prompt"] = self.instructions

        self._agent = Agent(**agent_kwargs)

    async def ask(
        self,
        prompt: str,
        *,
        handlers: StreamHandlers | None = None,
        dynamic_instructions: str | None = None,
    ) -> T:
        """Send prompt and get response with high-fidelity streaming.

        Args:
            prompt: The user's message/question.
            handlers: Optional callbacks for streaming events.
            dynamic_instructions: Override system prompt for this call only.

        Returns:
            The agent's response of type T.
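
        Example (illustrative; shows overriding the system prompt for one call):
            ```python
            report = await agent.ask(
                "Summarize the conversation so far",
                dynamic_instructions="Respond in bullet points only.",
            )
            ```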
        """
        if self._agent is None:
            raise RuntimeError("Agent not initialized")

        active_agent = self._agent
        if dynamic_instructions and dynamic_instructions != self.instructions:
            dynamic_kwargs: dict[str, Any] = {
                "model": self.model,
                "system_prompt": dynamic_instructions,
                "tools": self._tools,
                "output_type": self.output_type,
                "retries": self.max_retries,
            }
            if self.deps is not None:
                dynamic_kwargs["deps_type"] = type(self.deps)
            active_agent = Agent(**dynamic_kwargs)

        if handlers:
            # Track tool call IDs to names for result lookup
            tool_id_to_name: dict[str, str] = {}

            # Build run_stream kwargs - only include deps if provided
            stream_kwargs: dict[str, Any] = {"message_history": self._history}
            if self.deps is not None:
                stream_kwargs["deps"] = self.deps

            async with active_agent.run_stream(prompt, **stream_kwargs) as result:
                async for event in result.stream():
                    # --- 1. BLOCK LIFECYCLE (Open/Close) ---
                    if isinstance(event, PartStartEvent):
                        if handlers.on_block_start:
                            kind = getattr(event.part, "part_kind", "unknown")
                            handlers.on_block_start(kind, event.index)

                    elif isinstance(event, PartEndEvent):
                        if handlers.on_block_end:
                            kind = getattr(event.part, "part_kind", "unknown")
                            handlers.on_block_end(kind, event.index)

                    # --- 2. DELTAS (Typing Effect) ---
                    elif isinstance(event, PartDeltaEvent):
                        delta = event.delta

                        if isinstance(delta, TextPartDelta):
                            if handlers.on_text_delta:
                                handlers.on_text_delta(delta.content_delta)

                        elif isinstance(delta, ThinkingPartDelta):
                            if handlers.on_thinking_delta and delta.content_delta:
                                handlers.on_thinking_delta(delta.content_delta)

                        elif isinstance(delta, ToolCallPartDelta):
                            if handlers.on_tool_call_delta:
                                name_d = delta.tool_name_delta or ""
                                args_d = delta.args_delta or ""
                                # Handle dict args (rare but possible)
                                if isinstance(args_d, dict):
                                    args_d = json.dumps(args_d)
                                handlers.on_tool_call_delta(name_d, args_d)

                    # --- 3. EXECUTION (Tool Running/Results) ---
                    elif isinstance(event, FunctionToolCallEvent):
                        # Tool call fully formed, starting execution
                        tool_id_to_name[event.tool_call_id] = event.part.tool_name
                        if handlers.on_tool_execute:
                            handlers.on_tool_execute(
                                event.tool_call_id,
                                event.part.tool_name,
                                event.part.args_as_dict(),
                            )

                    elif isinstance(event, FunctionToolResultEvent):
                        # Tool returned data
                        if handlers.on_tool_result:
                            tool_name = tool_id_to_name.get(event.tool_call_id, "unknown")
                            handlers.on_tool_result(
                                event.tool_call_id,
                                tool_name,
                                str(event.result.content),
                            )

                    # --- 4. COMPLETION ---
                    elif isinstance(event, FinalResultEvent):
                        pass  # Handled after stream

                # Stream finished
                self._history = list(result.all_messages())

                # Get output - use get_output() which is the awaitable method
                output: T = await result.get_output()

                if handlers.on_complete:
                    handlers.on_complete(output)

                return output

        # Non-streaming fallback - build kwargs similarly
        run_kwargs: dict[str, Any] = {"message_history": self._history}
        if self.deps is not None:
            run_kwargs["deps"] = self.deps

        run_result = await active_agent.run(prompt, **run_kwargs)
        self._history = list(run_result.all_messages())
        result_output: T = run_result.output
        return result_output

    def get_message_history(self) -> list[ModelMessage]:
        """Get current conversation history."""
        return list(self._history)

    def set_message_history(self, history: list[ModelMessage]) -> None:
        """Replace conversation history."""
        self._history = list(history)

    def clear_history(self) -> None:
        """Clear conversation history."""
        self._history = []

    def clone_with_history(self, history: list[ModelMessage]) -> "BondAgent[T, DepsT]":
        """Create new agent instance with given history (for branching).

        Args:
            history: The message history to use for the clone.

        Returns:
            A new BondAgent with the same configuration but different history.
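
        Example (illustrative; branches the conversation from an earlier point):
            ```python
            branch = agent.clone_with_history(agent.get_message_history()[:-2])
            ```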
        """
        clone: BondAgent[T, DepsT] = BondAgent(
            name=self.name,
            instructions=self.instructions,
            model=self.model,
            toolsets=list(self.toolsets),
            deps=self.deps,
            output_type=self.output_type,
            max_retries=self.max_retries,
        )
        clone.set_message_history(history)
        return clone


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────────────── src/bond/tools/__init__.py ──────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Bond toolsets for agent capabilities."""


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
───────────────────────────────────────────────────────────── src/bond/tools/githunter/__init__.py ─────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Git Hunter: Forensic code ownership tool.

Provides tools for investigating git history to determine:
- Who last modified a specific line (blame)
- What PR discussion led to a change
- Who are the experts for a file based on commit frequency
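
Example (illustrative direct use of the adapter; the paths are placeholders):
    ```python
    from pathlib import Path

    adapter = GitHunterAdapter()
    blame = await adapter.blame_line(Path("/path/to/repo"), "src/main.py", 42)
    print(blame.author.git_name, blame.commit_hash)
    ```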
"""

from ._adapter import GitHunterAdapter
from ._exceptions import (
    BinaryFileError,
    FileNotFoundInRepoError,
    GitHubUnavailableError,
    GitHunterError,
    LineOutOfRangeError,
    RateLimitedError,
    RepoNotFoundError,
    ShallowCloneError,
)
from ._models import (
    BlameLineRequest,
    Error,
    FindPRDiscussionRequest,
    GetExpertsRequest,
)
from ._protocols import GitHunterProtocol
from ._types import AuthorProfile, BlameResult, FileExpert, PRDiscussion
from .tools import githunter_toolset

__all__ = [
    # Adapter
    "GitHunterAdapter",
    # Types
    "AuthorProfile",
    "BlameResult",
    "FileExpert",
    "PRDiscussion",
    # Protocol
    "GitHunterProtocol",
    # Toolset
    "githunter_toolset",
    # Request Models
    "BlameLineRequest",
    "FindPRDiscussionRequest",
    "GetExpertsRequest",
    "Error",
    # Exceptions
    "GitHunterError",
    "RepoNotFoundError",
    "FileNotFoundInRepoError",
    "LineOutOfRangeError",
    "BinaryFileError",
    "ShallowCloneError",
    "RateLimitedError",
    "GitHubUnavailableError",
]


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
───────────────────────────────────────────────────────────── src/bond/tools/githunter/_adapter.py ─────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""GitHunter adapter implementation.

Provides git forensics capabilities via subprocess calls to the git CLI
and httpx calls to the GitHub API.
"""

from __future__ import annotations

import asyncio
import logging
import os
import re
from datetime import UTC, datetime
from pathlib import Path

import httpx

from ._exceptions import (
    BinaryFileError,
    FileNotFoundInRepoError,
    GitHubUnavailableError,
    LineOutOfRangeError,
    RateLimitedError,
    RepoNotFoundError,
)
from ._types import AuthorProfile, BlameResult, FileExpert, PRDiscussion

logger = logging.getLogger(__name__)

# Regex patterns for parsing git remote URLs
SSH_REMOTE_PATTERN = re.compile(r"git@github\.com:([^/]+)/(.+?)(?:\.git)?$")
HTTPS_REMOTE_PATTERN = re.compile(r"https://github\.com/([^/]+)/(.+?)(?:\.git)?$")


class GitHunterAdapter:
    """Git Hunter adapter for forensic code ownership analysis.

    Uses the git CLI via async subprocesses for blame and log operations.
    Optionally uses the GitHub API for PR lookup and author enrichment.
    """

    def __init__(self, timeout: int = 30) -> None:
        """Initialize adapter.

        Args:
            timeout: Timeout in seconds for git/HTTP operations.
        """
        self._timeout = timeout
        self._head_cache: dict[str, str] = {}
        self._github_token = os.environ.get("GITHUB_TOKEN")
        self._http_client: httpx.AsyncClient | None = None

    async def _get_http_client(self) -> httpx.AsyncClient:
        """Get or create HTTP client for GitHub API.

        Returns:
            Configured httpx.AsyncClient.
        """
        if self._http_client is None:
            headers = {
                "Accept": "application/vnd.github+json",
                "X-GitHub-Api-Version": "2022-11-28",
            }
            if self._github_token:
                headers["Authorization"] = f"Bearer {self._github_token}"
            self._http_client = httpx.AsyncClient(
                base_url="https://api.github.com",
                headers=headers,
                timeout=self._timeout,
            )
        return self._http_client

    async def _run_git(
        self,
        repo_path: Path,
        *args: str,
    ) -> tuple[str, str, int]:
        """Run a git command asynchronously.

        Args:
            repo_path: Path to git repository.
            *args: Git command arguments.

        Returns:
            Tuple of (stdout, stderr, return_code).

        Raises:
            RepoNotFoundError: If repo_path is not a git repository.
        """
        cmd = ["git", "-C", str(repo_path), *args]
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(),
                timeout=self._timeout,
            )
            return (
                stdout.decode("utf-8", errors="replace"),
                stderr.decode("utf-8", errors="replace"),
                proc.returncode or 0,
            )
        except FileNotFoundError as e:
            raise RepoNotFoundError(str(repo_path)) from e

    async def _get_head_sha(self, repo_path: Path) -> str:
        """Get current HEAD SHA for cache invalidation.

        Args:
            repo_path: Path to git repository.

        Returns:
            HEAD commit SHA.
        """
        cache_key = str(repo_path.resolve())
        if cache_key in self._head_cache:
            return self._head_cache[cache_key]

        stdout, stderr, code = await self._run_git(repo_path, "rev-parse", "HEAD")
        if code != 0:
            raise RepoNotFoundError(str(repo_path))

        sha = stdout.strip()
        self._head_cache[cache_key] = sha
        return sha

    async def _get_github_repo(self, repo_path: Path) -> tuple[str, str] | None:
        """Get GitHub owner/repo from git remote URL.

        Args:
            repo_path: Path to git repository.

        Returns:
            Tuple of (owner, repo) or None if not a GitHub repo.
        """
        stdout, stderr, code = await self._run_git(repo_path, "remote", "get-url", "origin")
        if code != 0:
            return None

        remote_url = stdout.strip()

        # Try SSH format: git@github.com:owner/repo.git
        match = SSH_REMOTE_PATTERN.match(remote_url)
        if match:
            return (match.group(1), match.group(2))

        # Try HTTPS format: https://github.com/owner/repo.git
        match = HTTPS_REMOTE_PATTERN.match(remote_url)
        if match:
            return (match.group(1), match.group(2))

        return None

    def _check_rate_limit(self, response: httpx.Response) -> None:
        """Check GitHub rate limit headers and warn/raise as needed.

        Args:
            response: HTTP response from GitHub API.

        Raises:
            RateLimitedError: If rate limit is exceeded.
        """
        remaining = response.headers.get("X-RateLimit-Remaining")
        reset_at = response.headers.get("X-RateLimit-Reset")

        if remaining is not None:
            remaining_int = int(remaining)
            if remaining_int < 100:
                logger.warning("GitHub API rate limit low: %d requests remaining", remaining_int)

        if response.status_code == 403:
            # Check if it's a rate limit error
            if "rate limit" in response.text.lower():
                reset_timestamp = int(reset_at) if reset_at else 0
                reset_datetime = datetime.fromtimestamp(reset_timestamp, tz=UTC)
                retry_after = max(0, reset_timestamp - int(datetime.now(tz=UTC).timestamp()))
                raise RateLimitedError(retry_after, reset_datetime)

    def _parse_porcelain_blame(self, output: str) -> dict[str, str]:
        """Parse git blame --porcelain output.

        Args:
            output: Raw porcelain output from git blame.

        Returns:
            Dict with parsed fields.
        """
        result: dict[str, str] = {}
        lines = output.strip().split("\n")

        if not lines:
            return result

        # First line is: <sha> <orig_line> <final_line> [<num_lines>]
        first_line = lines[0]
        parts = first_line.split()
        if parts:
            result["commit"] = parts[0]

        # Parse header lines
        for line in lines[1:]:
            if line.startswith("\t"):
                # Content line (starts with tab)
                result["content"] = line[1:]
            elif " " in line:
                key, _, value = line.partition(" ")
                result[key] = value

        return result

    async def blame_line(
        self,
        repo_path: Path,
        file_path: str,
        line_no: int,
    ) -> BlameResult:
        """Get blame information for a specific line.

        Args:
            repo_path: Path to the git repository root.
            file_path: Path to file relative to repo root.
            line_no: Line number to blame (1-indexed).

        Returns:
            BlameResult with author, commit, and line information.

        Raises:
            RepoNotFoundError: If repo_path is not a git repository.
            FileNotFoundInRepoError: If file doesn't exist in repo.
            LineOutOfRangeError: If line_no is invalid.
            BinaryFileError: If file is binary.
        """
        if line_no < 1:
            raise LineOutOfRangeError(line_no)

        # Check if repo is valid
        await self._get_head_sha(repo_path)

        # Run git blame
        stdout, stderr, code = await self._run_git(
            repo_path,
            "blame",
            "--porcelain",
            "-L",
            f"{line_no},{line_no}",
            "--",
            file_path,
        )

        if code != 0:
            stderr_lower = stderr.lower()
            if "no such path" in stderr_lower or "does not exist" in stderr_lower:
                raise FileNotFoundInRepoError(file_path, str(repo_path))
            if "invalid line" in stderr_lower or "no lines to blame" in stderr_lower:
                raise LineOutOfRangeError(line_no)
            if "binary file" in stderr_lower:
                raise BinaryFileError(file_path)
            if "fatal: not a git repository" in stderr_lower:
                raise RepoNotFoundError(str(repo_path))
            raise RepoNotFoundError(str(repo_path))

        # Parse output
        parsed = self._parse_porcelain_blame(stdout)

        if not parsed.get("commit"):
            raise LineOutOfRangeError(line_no)

        commit_hash = parsed["commit"]
        is_boundary = commit_hash.startswith("^") or parsed.get("boundary") == "1"

        # Clean up boundary marker from hash
        if commit_hash.startswith("^"):
            commit_hash = commit_hash[1:]

        # Parse author time
        author_time_str = parsed.get("author-time", "0")
        try:
            author_time = int(author_time_str)
            commit_date = datetime.fromtimestamp(author_time, tz=UTC)
        except (ValueError, OSError):
            commit_date = datetime.now(tz=UTC)

        # Build author profile (enrichment happens separately if needed)
        author = AuthorProfile(
            git_email=parsed.get("author-mail", "").strip("<>"),
            git_name=parsed.get("author", "Unknown"),
        )

        return BlameResult(
            line_no=line_no,
            content=parsed.get("content", ""),
            author=author,
            commit_hash=commit_hash,
            commit_date=commit_date,
            commit_message=parsed.get("summary", ""),
            is_boundary=is_boundary,
        )

    async def find_pr_discussion(
        self,
        repo_path: Path,
        commit_hash: str,
    ) -> PRDiscussion | None:
        """Find the PR discussion for a commit.

        Args:
            repo_path: Path to the git repository root.
            commit_hash: Full or abbreviated commit SHA.

        Returns:
            PRDiscussion if commit is associated with a PR, None otherwise.

        Raises:
            RateLimitedError: If GitHub rate limit exceeded.
            GitHubUnavailableError: If GitHub API is unavailable.
        """
        if not self._github_token:
            logger.debug("No GITHUB_TOKEN set, skipping PR lookup")
            return None

        # Get owner/repo from remote
        github_repo = await self._get_github_repo(repo_path)
        if not github_repo:
            logger.debug("Not a GitHub repository, skipping PR lookup")
            return None

        owner, repo = github_repo
        client = await self._get_http_client()

        try:
            # Find PRs associated with this commit
            response = await client.get(f"/repos/{owner}/{repo}/commits/{commit_hash}/pulls")
            self._check_rate_limit(response)

            if response.status_code == 404:
                return None
            if response.status_code != 200:
                logger.warning(
                    "GitHub API error %d for commit %s", response.status_code, commit_hash
                )
                return None

            prs = response.json()
            if not prs:
                return None

            # Get the first (most recent) PR
            pr_data = prs[0]
            pr_number = pr_data["number"]

            # Fetch issue comments (top-level PR comments)
            comments_response = await client.get(
                f"/repos/{owner}/{repo}/issues/{pr_number}/comments",
                params={"per_page": 100},
            )
            self._check_rate_limit(comments_response)

            comments: list[str] = []
            if comments_response.status_code == 200:
                for comment in comments_response.json():
                    body = comment.get("body", "")
                    if body:
                        comments.append(body)

            return PRDiscussion(
                pr_number=pr_number,
                title=pr_data.get("title", ""),
                body=pr_data.get("body", "") or "",
                url=pr_data.get("html_url", ""),
                issue_comments=tuple(comments),
            )

        except httpx.TimeoutException as e:
            raise GitHubUnavailableError("GitHub API timeout") from e
        except httpx.RequestError as e:
            raise GitHubUnavailableError(f"GitHub API error: {e}") from e

    async def enrich_author(self, author: AuthorProfile) -> AuthorProfile:
        """Enrich author profile with GitHub data.

        Args:
            author: Author profile with git_email.

        Returns:
            Author profile with github_username and avatar_url if found.
        """
        if not self._github_token or not author.git_email:
            return author

        client = await self._get_http_client()

        try:
            # Search for user by email
            response = await client.get(
                "/search/users",
                params={"q": f"{author.git_email} in:email"},
            )
            self._check_rate_limit(response)

            if response.status_code != 200:
                return author

            data = response.json()
            if data.get("total_count", 0) > 0 and data.get("items"):
                user = data["items"][0]
                return AuthorProfile(
                    git_email=author.git_email,
                    git_name=author.git_name,
                    github_username=user.get("login"),
                    github_avatar_url=user.get("avatar_url"),
                )

        except (httpx.TimeoutException, httpx.RequestError):
            # Graceful degradation - return unenriched author
            pass

        return author

    async def get_expert_for_file(
        self,
        repo_path: Path,
        file_path: str,
        window_days: int = 90,
        limit: int = 3,
    ) -> list[FileExpert]:
        """Get experts for a file based on commit frequency.

        Args:
            repo_path: Path to the git repository root.
            file_path: Path to file relative to repo root.
            window_days: Time window for commit history (0 for all time).
            limit: Maximum number of experts to return.

        Returns:
            List of FileExpert sorted by commit count (descending).

        Raises:
            RepoNotFoundError: If repo_path is not a git repository.
            FileNotFoundInRepoError: If file doesn't exist in repo.
        """
        # Build git log command
        # Format: email|name|hash|timestamp
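        #   e.g. "jane@example.com|Jane Doe|8d5a1c2f...|1700000000" (illustrative)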
        args = [
            "log",
            "--format=%aE|%aN|%H|%at",
            "--follow",
            "--no-merges",
        ]

        # Add time window if specified
        if window_days and window_days > 0:
            args.append(f"--since={window_days} days ago")

        args.extend(["--", file_path])

        stdout, stderr, code = await self._run_git(repo_path, *args)

        if code != 0:
            stderr_lower = stderr.lower()
            if "fatal: not a git repository" in stderr_lower:
                raise RepoNotFoundError(str(repo_path))
            # Any other failure (e.g. file not tracked) yields no experts
            return []

        # Parse output and group by author email (case-insensitive)
        author_stats: dict[str, dict[str, str | int | datetime]] = {}

        for line in stdout.strip().split("\n"):
            if not line or "|" not in line:
                continue

            parts = line.split("|")
            if len(parts) < 4:
                continue

            email = parts[0].lower()  # Case-insensitive grouping
            name = parts[1]
            # commit_hash = parts[2]  # Not needed for stats
            try:
                timestamp = int(parts[3])
                commit_date = datetime.fromtimestamp(timestamp, tz=UTC)
            except (ValueError, OSError):
                commit_date = datetime.now(tz=UTC)

            if email not in author_stats:
                author_stats[email] = {
                    "name": name,
                    "email": email,
                    "commit_count": 0,
                    "last_commit_date": commit_date,
                }

            current_count = author_stats[email]["commit_count"]
            if isinstance(current_count, int):
                author_stats[email]["commit_count"] = current_count + 1

            # Track most recent commit
            current_last = author_stats[email]["last_commit_date"]
            if isinstance(current_last, datetime) and commit_date > current_last:
                author_stats[email]["last_commit_date"] = commit_date

        # Sort by commit count descending and take top N
        sorted_authors = sorted(
            author_stats.values(),
            key=lambda x: x["commit_count"] if isinstance(x["commit_count"], int) else 0,
            reverse=True,
        )[:limit]

        # Build FileExpert results
        experts: list[FileExpert] = []
        for stats in sorted_authors:
            author = AuthorProfile(
                git_email=str(stats["email"]),
                git_name=str(stats["name"]),
            )
            commit_count = stats["commit_count"]
            last_date = stats["last_commit_date"]
            last_commit = last_date if isinstance(last_date, datetime) else datetime.now(tz=UTC)
            experts.append(
                FileExpert(
                    author=author,
                    commit_count=commit_count if isinstance(commit_count, int) else 0,
                    last_commit_date=last_commit,
                )
            )

        return experts

    async def close(self) -> None:
        """Close HTTP client and cleanup resources."""
        if self._http_client:
            await self._http_client.aclose()
            self._http_client = None


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
─────────────────────────────────────────────────────────── src/bond/tools/githunter/_exceptions.py ────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Exception hierarchy for Git Hunter tool.

All exceptions inherit from GitHunterError for easy catching.
"""

from __future__ import annotations

from datetime import datetime


class GitHunterError(Exception):
    """Base exception for all Git Hunter errors."""

    pass


class RepoNotFoundError(GitHunterError):
    """Raised when path is not inside a git repository."""

    def __init__(self, path: str) -> None:
        """Initialize with the invalid path.

        Args:
            path: The path that is not in a git repository.
        """
        self.path = path
        super().__init__(f"Path is not inside a git repository: {path}")


class FileNotFoundInRepoError(GitHunterError):
    """Raised when file does not exist in the repository."""

    def __init__(self, file_path: str, repo_path: str) -> None:
        """Initialize with file and repo paths.

        Args:
            file_path: The file that was not found.
            repo_path: The repository path.
        """
        self.file_path = file_path
        self.repo_path = repo_path
        super().__init__(f"File not found in repository: {file_path} (repo: {repo_path})")


class LineOutOfRangeError(GitHunterError):
    """Raised when line number is invalid for the file."""

    def __init__(self, line_no: int, max_lines: int | None = None) -> None:
        """Initialize with line number and optional max.

        Args:
            line_no: The invalid line number.
            max_lines: Maximum valid line number if known.
        """
        self.line_no = line_no
        self.max_lines = max_lines
        if max_lines is not None:
            msg = f"Line {line_no} out of range (file has {max_lines} lines)"
        else:
            msg = f"Line {line_no} out of range"
        super().__init__(msg)


class BinaryFileError(GitHunterError):
    """Raised when attempting to blame a binary file."""

    def __init__(self, file_path: str) -> None:
        """Initialize with file path.

        Args:
            file_path: The binary file path.
        """
        self.file_path = file_path
        super().__init__(f"Cannot blame binary file: {file_path}")


class ShallowCloneError(GitHunterError):
    """Raised when shallow clone prevents full history access."""

    def __init__(self, message: str = "Repository is a shallow clone") -> None:
        """Initialize with message.

        Args:
            message: Description of the shallow clone issue.
        """
        super().__init__(message)


class RateLimitedError(GitHunterError):
    """Raised when GitHub API rate limit is exceeded."""

    def __init__(
        self,
        retry_after_seconds: int,
        reset_at: datetime,
        message: str | None = None,
    ) -> None:
        """Initialize with rate limit details.

        Args:
            retry_after_seconds: Seconds until rate limit resets.
            reset_at: UTC datetime when rate limit resets.
            message: Optional custom message.
        """
        self.retry_after_seconds = retry_after_seconds
        self.reset_at = reset_at
        msg = message or f"GitHub rate limit exceeded. Retry after {retry_after_seconds}s"
        super().__init__(msg)


class GitHubUnavailableError(GitHunterError):
    """Raised when GitHub API is unavailable."""

    def __init__(self, message: str = "GitHub API is unavailable") -> None:
        """Initialize with message.

        Args:
            message: Description of the unavailability.
        """
        super().__init__(message)


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
───────────────────────────────────────────────────────────── src/bond/tools/githunter/_models.py ──────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""GitHunter request and error models.

Pydantic models for GitHunter tool inputs and error responses.
"""

from typing import Annotated

from pydantic import BaseModel, Field


class BlameLineRequest(BaseModel):
    """Request to get blame information for a specific line.

    Agent Usage: Use this when you need to know who last modified a specific
    line of code, what commit changed it, and when.
    """

    repo_path: Annotated[
        str,
        Field(description="Path to the git repository root"),
    ]

    file_path: Annotated[
        str,
        Field(description="Path to file relative to repo root"),
    ]

    line_no: Annotated[
        int,
        Field(ge=1, description="Line number to blame (1-indexed)"),
    ]


class FindPRDiscussionRequest(BaseModel):
    """Request to find PR discussion for a commit.

    Agent Usage: Use this when you have a commit hash and want to find
    the pull request discussion that introduced it.
    """

    repo_path: Annotated[
        str,
        Field(description="Path to the git repository root"),
    ]

    commit_hash: Annotated[
        str,
        Field(min_length=7, description="Full or abbreviated commit SHA"),
    ]


class GetExpertsRequest(BaseModel):
    """Request to get file experts based on commit frequency.

    Agent Usage: Use this when you need to identify who has the most
    knowledge about a file based on their commit history.
    """

    repo_path: Annotated[
        str,
        Field(description="Path to the git repository root"),
    ]

    file_path: Annotated[
        str,
        Field(description="Path to file relative to repo root"),
    ]

    window_days: Annotated[
        int,
        Field(default=90, ge=0, description="Days of history to consider (0=all time)"),
    ]

    limit: Annotated[
        int,
        Field(default=3, ge=1, le=10, description="Maximum number of experts to return"),
    ]


class Error(BaseModel):
    """Error response from GitHunter operations.

    Used as a union return type: `BlameResult | Error`, `PRDiscussion | None | Error`, etc.
    """

    description: Annotated[
        str,
        Field(description="Error message explaining what went wrong"),
    ]


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
──────────────────────────────────────────────────────────── src/bond/tools/githunter/_protocols.py ────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Protocol definition for Git Hunter tool.

Defines the interface that GitHunterAdapter must implement.
"""

from __future__ import annotations

from pathlib import Path
from typing import Protocol, runtime_checkable

from ._types import BlameResult, FileExpert, PRDiscussion


@runtime_checkable
class GitHunterProtocol(Protocol):
    """Protocol for Git Hunter forensic code ownership tool.

    Provides methods to:
    - Blame individual lines to find who last modified them
    - Find PR discussions for commits
    - Determine file experts based on commit frequency
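
    Example (illustrative; an implementation such as GitHunterAdapter is
    injected as agent deps so the githunter tools can reach it via ctx.deps):
        ```python
        agent = BondAgent(
            name="forensics",
            instructions="Investigate code ownership.",
            model="anthropic:claude-sonnet-4-20250514",
            toolsets=[githunter_toolset],
            deps=GitHunterAdapter(),
        )
        ```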
    """

    async def blame_line(
        self,
        repo_path: Path,
        file_path: str,
        line_no: int,
    ) -> BlameResult:
        """Get blame information for a specific line.

        Args:
            repo_path: Path to the git repository root.
            file_path: Path to file relative to repo root.
            line_no: Line number to blame (1-indexed).

        Returns:
            BlameResult with author, commit, and line information.

        Raises:
            RepoNotFoundError: If repo_path is not a git repository.
            FileNotFoundInRepoError: If file doesn't exist in repo.
            LineOutOfRangeError: If line_no is invalid.
            BinaryFileError: If file is binary.
        """
        ...

    async def find_pr_discussion(
        self,
        repo_path: Path,
        commit_hash: str,
    ) -> PRDiscussion | None:
        """Find the PR discussion for a commit.

        Args:
            repo_path: Path to the git repository root.
            commit_hash: Full or abbreviated commit SHA.

        Returns:
            PRDiscussion if commit is associated with a PR, None otherwise.

        Raises:
            RepoNotFoundError: If repo_path is not a git repository.
            RateLimitedError: If GitHub rate limit exceeded.
            GitHubUnavailableError: If GitHub API is unavailable.
        """
        ...

    async def get_expert_for_file(
        self,
        repo_path: Path,
        file_path: str,
        window_days: int = 90,
        limit: int = 3,
    ) -> list[FileExpert]:
        """Get experts for a file based on commit frequency.

        Args:
            repo_path: Path to the git repository root.
            file_path: Path to file relative to repo root.
            window_days: Time window for commit history (0 or None for all time).
            limit: Maximum number of experts to return.

        Returns:
            List of FileExpert sorted by commit count (descending).

        Raises:
            RepoNotFoundError: If repo_path is not a git repository.
            FileNotFoundInRepoError: If file doesn't exist in repo.
        """
        ...


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────────── src/bond/tools/githunter/_types.py ──────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Type definitions for Git Hunter tool.

Frozen dataclasses for git blame results, author profiles,
file experts, and PR discussions.
"""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class AuthorProfile:
    """Git commit author with optional GitHub enrichment.

    Attributes:
        git_email: Author email from git commit.
        git_name: Author name from git commit.
        github_username: GitHub username if resolved from email.
        github_avatar_url: GitHub avatar URL if resolved.
    """

    git_email: str
    git_name: str
    github_username: str | None = None
    github_avatar_url: str | None = None


@dataclass(frozen=True)
class BlameResult:
    """Result of git blame for a single line.

    Attributes:
        line_no: Line number that was blamed.
        content: Content of the line.
        author: Author who last modified the line.
        commit_hash: Full SHA of the commit.
        commit_date: UTC datetime of the commit (author date).
        commit_message: First line of commit message.
        is_boundary: True if this is a shallow clone boundary commit.
    """

    line_no: int
    content: str
    author: AuthorProfile
    commit_hash: str
    commit_date: datetime
    commit_message: str
    is_boundary: bool = False


@dataclass(frozen=True)
class FileExpert:
    """Code ownership expert for a file based on commit history.

    Attributes:
        author: The author profile.
        commit_count: Number of commits touching the file.
        last_commit_date: UTC datetime of most recent commit.
    """

    author: AuthorProfile
    commit_count: int
    last_commit_date: datetime


@dataclass(frozen=True)
class PRDiscussion:
    """Pull request discussion associated with a commit.

    Attributes:
        pr_number: PR number.
        title: PR title.
        body: PR description body.
        url: URL to the PR on GitHub.
        issue_comments: Top-level PR comments (not review comments).
    """

    pr_number: int
    title: str
    body: str
    url: str
    issue_comments: tuple[str, ...]  # Frozen, so use tuple instead of list


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────────── src/bond/tools/githunter/tools.py ───────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""GitHunter tools for PydanticAI agents.

This module provides the agent-facing tool functions that use
RunContext to access the GitHunter adapter via dependency injection.
"""

from pathlib import Path

from pydantic_ai import RunContext
from pydantic_ai.tools import Tool

from bond.tools.githunter._exceptions import GitHunterError
from bond.tools.githunter._models import (
    BlameLineRequest,
    Error,
    FindPRDiscussionRequest,
    GetExpertsRequest,
)
from bond.tools.githunter._protocols import GitHunterProtocol
from bond.tools.githunter._types import BlameResult, FileExpert, PRDiscussion


async def blame_line(
    ctx: RunContext[GitHunterProtocol],
    request: BlameLineRequest,
) -> BlameResult | Error:
    """Get blame information for a specific line.

    Agent Usage:
        Call this tool when you need to know who last modified a specific
        line of code, what commit changed it, and when:
        - "Who wrote this line?" → blame_line with the file and line number
        - "When was this changed?" → check commit_date in result
        - "What was the commit message?" → check commit_message in result

    Example:
        ```python
        blame_line({
            "repo_path": "/path/to/repo",
            "file_path": "src/main.py",
            "line_no": 42
        })
        ```

    Returns:
        BlameResult with author, commit hash, date, and message,
        or Error if the operation failed.
    """
    try:
        return await ctx.deps.blame_line(
            repo_path=Path(request.repo_path),
            file_path=request.file_path,
            line_no=request.line_no,
        )
    except GitHunterError as e:
        return Error(description=str(e))


async def find_pr_discussion(
    ctx: RunContext[GitHunterProtocol],
    request: FindPRDiscussionRequest,
) -> PRDiscussion | None | Error:
    """Find the PR discussion for a commit.

    Agent Usage:
        Call this tool when you have a commit hash and want to find
        the pull request discussion that introduced it:
        - "What PR introduced this commit?" → find_pr_discussion
        - "What was discussed when this was merged?" → check PR comments
        - "Why was this change made?" → read PR description and comments

    Example:
        ```python
        find_pr_discussion({
            "repo_path": "/path/to/repo",
            "commit_hash": "abc123def"
        })
        ```

    Returns:
        PRDiscussion with PR number, title, body, and comments,
        None if no PR is associated with the commit,
        or Error if the operation failed.
    """
    try:
        return await ctx.deps.find_pr_discussion(
            repo_path=Path(request.repo_path),
            commit_hash=request.commit_hash,
        )
    except GitHunterError as e:
        return Error(description=str(e))


async def get_file_experts(
    ctx: RunContext[GitHunterProtocol],
    request: GetExpertsRequest,
) -> list[FileExpert] | Error:
    """Get experts for a file based on commit frequency.

    Agent Usage:
        Call this tool when you need to identify who has the most
        knowledge about a file based on their commit history:
        - "Who knows this file best?" → get_file_experts
        - "Who should review changes to this?" → check top experts
        - "Who to ask about this code?" → contact the experts

    Example:
        ```python
        get_file_experts({
            "repo_path": "/path/to/repo",
            "file_path": "src/auth/login.py",
            "window_days": 90,
            "limit": 3
        })
        ```

    Returns:
        List of FileExpert sorted by commit count (descending),
        containing author info, commit count, and last commit date,
        or Error if the operation failed.
    """
    try:
        return await ctx.deps.get_expert_for_file(
            repo_path=Path(request.repo_path),
            file_path=request.file_path,
            window_days=request.window_days,
            limit=request.limit,
        )
    except GitHunterError as e:
        return Error(description=str(e))


# Export as toolset for BondAgent
githunter_toolset: list[Tool[GitHunterProtocol]] = [
    Tool(blame_line),
    Tool(find_pr_discussion),
    Tool(get_file_experts),
]


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────────── src/bond/tools/memory/__init__.py ───────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Memory toolset for Bond agents.

Provides semantic memory storage and retrieval using vector databases.
Default backend: pgvector (PostgreSQL) for unified infrastructure.
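
Example (illustrative; any backend satisfying AgentMemoryProtocol works,
QdrantMemoryStore is shown with its default constructor):
    ```python
    agent = BondAgent(
        name="assistant",
        instructions="You are helpful.",
        model="anthropic:claude-sonnet-4-20250514",
        toolsets=[memory_toolset],
        deps=QdrantMemoryStore(),
    )
    ```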
"""

from bond.tools.memory._models import (
    CreateMemoryRequest,
    DeleteMemoryRequest,
    Error,
    GetMemoryRequest,
    Memory,
    SearchMemoriesRequest,
    SearchResult,
)
from bond.tools.memory._protocols import AgentMemoryProtocol
from bond.tools.memory.backends import (
    MemoryBackendType,
    PgVectorMemoryStore,
    QdrantMemoryStore,
    create_memory_backend,
)
from bond.tools.memory.tools import memory_toolset

__all__ = [
    # Protocol
    "AgentMemoryProtocol",
    # Models
    "Memory",
    "SearchResult",
    "CreateMemoryRequest",
    "SearchMemoriesRequest",
    "DeleteMemoryRequest",
    "GetMemoryRequest",
    "Error",
    # Toolset
    "memory_toolset",
    # Backend factory
    "MemoryBackendType",
    "create_memory_backend",
    # Backend implementations
    "PgVectorMemoryStore",
    "QdrantMemoryStore",
]


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
─────────────────────────────────────────────────────────────── src/bond/tools/memory/_models.py ───────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Memory data models."""

from datetime import datetime
from typing import Annotated
from uuid import UUID

from pydantic import BaseModel, Field


class Memory(BaseModel):
    """A stored memory unit.

    A memory is the fundamental storage unit in Bond's memory system.
    Each memory has content, metadata for filtering, and an embedding
    for semantic search.
    """

    id: Annotated[
        UUID,
        Field(description="Unique identifier for this memory"),
    ]

    content: Annotated[
        str,
        Field(description="The actual content of the memory"),
    ]

    created_at: Annotated[
        datetime,
        Field(description="When this memory was created"),
    ]

    agent_id: Annotated[
        str,
        Field(description="ID of the agent that created this memory"),
    ]

    conversation_id: Annotated[
        str | None,
        Field(description="Optional conversation context for this memory"),
    ] = None

    tags: Annotated[
        list[str],
        Field(description="Tags for filtering memories"),
    ] = Field(default_factory=list)


class SearchResult(BaseModel):
    """Memory with similarity score from search."""

    memory: Annotated[
        Memory,
        Field(description="The matched memory"),
    ]

    score: Annotated[
        float,
        Field(description="Similarity score (higher is more similar)"),
    ]


class CreateMemoryRequest(BaseModel):
    """Request to create a new memory.

    The agent provides content and metadata. Embeddings can be
    pre-computed or left for the backend to generate.
    """

    content: Annotated[
        str,
        Field(description="Content to store as a memory"),
    ]

    agent_id: Annotated[
        str,
        Field(description="ID of the agent creating this memory"),
    ]

    tenant_id: Annotated[
        UUID,
        Field(description="Tenant UUID for multi-tenant isolation"),
    ]

    conversation_id: Annotated[
        str | None,
        Field(description="Optional conversation context"),
    ] = None

    tags: Annotated[
        list[str],
        Field(description="Tags for categorizing and filtering"),
    ] = Field(default_factory=list)

    embedding: Annotated[
        list[float] | None,
        Field(description="Pre-computed embedding (Bond generates if not provided)"),
    ] = None

    embedding_model: Annotated[
        str | None,
        Field(description="Override default embedding model for this operation"),
    ] = None


class SearchMemoriesRequest(BaseModel):
    """Request to search memories by semantic similarity.

    Supports filtered semantic search: top-k results, an optional score
    threshold, and optional tag/agent filters.
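
    Example (illustrative request; the tenant UUID is a placeholder):
        ```python
        SearchMemoriesRequest(
            query="user prefers dark mode",
            tenant_id=uuid4(),
            top_k=5,
            score_threshold=0.7,
            tags=["preferences"],
        )
        ```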
    """

    query: Annotated[
        str,
        Field(description="Search query text"),
    ]

    tenant_id: Annotated[
        UUID,
        Field(description="Tenant UUID for multi-tenant isolation"),
    ]

    top_k: Annotated[
        int,
        Field(description="Maximum number of results to return", ge=1, le=100),
    ] = 10

    score_threshold: Annotated[
        float | None,
        Field(description="Minimum similarity score (0-1) to include in results"),
    ] = None

    tags: Annotated[
        list[str] | None,
        Field(description="Filter by memories containing these tags"),
    ] = None

    agent_id: Annotated[
        str | None,
        Field(description="Filter by agent that created the memories"),
    ] = None

    embedding_model: Annotated[
        str | None,
        Field(description="Override default embedding model for this search"),
    ] = None


class DeleteMemoryRequest(BaseModel):
    """Request to delete a memory by ID."""

    memory_id: Annotated[
        UUID,
        Field(description="UUID of the memory to delete"),
    ]

    tenant_id: Annotated[
        UUID,
        Field(description="Tenant UUID for multi-tenant isolation"),
    ]


class GetMemoryRequest(BaseModel):
    """Request to retrieve a memory by ID."""

    memory_id: Annotated[
        UUID,
        Field(description="UUID of the memory to retrieve"),
    ]

    tenant_id: Annotated[
        UUID,
        Field(description="Tenant UUID for multi-tenant isolation"),
    ]


class Error(BaseModel):
    """Error response from memory operations.

    Used as union return type: `Memory | Error` or `list[SearchResult] | Error`
    """

    description: Annotated[
        str,
        Field(description="Error message explaining what went wrong"),
    ]
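

# Illustrative sketch (not used by the library): callers of backends and tools
# that return unions such as `Memory | Error` can branch with isinstance, since
# Error only carries a description. The helper name below is hypothetical.
def handle_store_result(result: Memory | Error) -> str:
    """Return the new memory's ID, or a human-readable failure message."""
    if isinstance(result, Error):
        return f"store failed: {result.description}"
    return str(result.id)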


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
───────────────────────────────────────────────────────────── src/bond/tools/memory/_protocols.py ──────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Memory protocol - interface for memory backends.

All operations require a tenant_id for multi-tenant isolation, which keeps
memories correctly partitioned per tenant and enables efficient indexing on
tenant boundaries.
"""

from typing import Protocol
from uuid import UUID

from bond.tools.memory._models import Error, Memory, SearchResult


class AgentMemoryProtocol(Protocol):
    """Protocol for memory storage backends.

    All operations require tenant_id for multi-tenant isolation.
    This ensures memories are always scoped correctly and enables
    efficient indexing on tenant boundaries.

    Implementations:
        - PgVectorMemoryStore: PostgreSQL + pgvector (default)
        - QdrantMemoryStore: Qdrant vector database
    """

    async def store(
        self,
        content: str,
        agent_id: str,
        *,
        tenant_id: UUID,
        conversation_id: str | None = None,
        tags: list[str] | None = None,
        embedding: list[float] | None = None,
        embedding_model: str | None = None,
    ) -> Memory | Error:
        """Store a memory and return the created Memory object.

        Args:
            content: The text content to store.
            agent_id: ID of the agent creating this memory.
            tenant_id: Tenant UUID for multi-tenant isolation (required).
            conversation_id: Optional conversation context.
            tags: Optional tags for filtering.
            embedding: Pre-computed embedding (backend generates if None).
            embedding_model: Override default embedding model.

        Returns:
            The created Memory on success, or Error on failure.
        """
        ...

    async def search(
        self,
        query: str,
        *,
        tenant_id: UUID,
        top_k: int = 10,
        score_threshold: float | None = None,
        tags: list[str] | None = None,
        agent_id: str | None = None,
        embedding_model: str | None = None,
    ) -> list[SearchResult] | Error:
        """Search memories by semantic similarity.

        Args:
            query: Search query text.
            tenant_id: Tenant UUID for multi-tenant isolation (required).
            top_k: Maximum number of results.
            score_threshold: Minimum similarity score to include.
            tags: Filter by memories with these tags.
            agent_id: Filter by creating agent.
            embedding_model: Override default embedding model.

        Returns:
            List of SearchResult ordered by similarity, or Error on failure.
        """
        ...

    async def delete(self, memory_id: UUID, *, tenant_id: UUID) -> bool | Error:
        """Delete a memory by ID.

        Args:
            memory_id: The UUID of the memory to delete.
            tenant_id: Tenant UUID for multi-tenant isolation (required).

        Returns:
            True if deleted, False if not found, or Error on failure.
        """
        ...

    async def get(self, memory_id: UUID, *, tenant_id: UUID) -> Memory | None | Error:
        """Retrieve a specific memory by ID.

        Args:
            memory_id: The UUID of the memory to retrieve.
            tenant_id: Tenant UUID for multi-tenant isolation (required).

        Returns:
            The Memory if found, None if not found, or Error on failure.
        """
        ...
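

# Illustrative sketch (not part of the protocol): a minimal in-memory
# implementation of AgentMemoryProtocol, handy for unit tests. "Search" here is
# a naive substring match with a constant score, not real embedding similarity;
# score_threshold and embedding arguments are accepted but ignored. All names
# below are hypothetical.
from datetime import UTC, datetime
from uuid import uuid4


class InMemoryMemoryStore:
    """Dict-backed store keyed by (tenant_id, memory_id)."""

    def __init__(self) -> None:
        self._rows: dict[tuple[UUID, UUID], Memory] = {}

    async def store(
        self,
        content: str,
        agent_id: str,
        *,
        tenant_id: UUID,
        conversation_id: str | None = None,
        tags: list[str] | None = None,
        embedding: list[float] | None = None,
        embedding_model: str | None = None,
    ) -> Memory | Error:
        memory = Memory(
            id=uuid4(),
            content=content,
            created_at=datetime.now(UTC),
            agent_id=agent_id,
            conversation_id=conversation_id,
            tags=tags or [],
        )
        self._rows[(tenant_id, memory.id)] = memory
        return memory

    async def search(
        self,
        query: str,
        *,
        tenant_id: UUID,
        top_k: int = 10,
        score_threshold: float | None = None,
        tags: list[str] | None = None,
        agent_id: str | None = None,
        embedding_model: str | None = None,
    ) -> list[SearchResult] | Error:
        # Every substring hit gets the same score; real backends rank by similarity.
        hits = [
            SearchResult(memory=m, score=1.0)
            for (tid, _), m in self._rows.items()
            if tid == tenant_id
            and query.lower() in m.content.lower()
            and (agent_id is None or m.agent_id == agent_id)
            and (not tags or set(tags) <= set(m.tags))
        ]
        return hits[:top_k]

    async def delete(self, memory_id: UUID, *, tenant_id: UUID) -> bool | Error:
        return self._rows.pop((tenant_id, memory_id), None) is not None

    async def get(self, memory_id: UUID, *, tenant_id: UUID) -> Memory | None | Error:
        return self._rows.get((tenant_id, memory_id))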


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────── src/bond/tools/memory/backends/__init__.py ──────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Memory backend implementations.

Provides factory function for backend selection based on configuration.
Default: pgvector (PostgreSQL) for unified infrastructure.
"""

from enum import Enum
from typing import TYPE_CHECKING

from bond.tools.memory.backends.pgvector import PgVectorMemoryStore
from bond.tools.memory.backends.qdrant import QdrantMemoryStore

if TYPE_CHECKING:
    from asyncpg import Pool


class MemoryBackendType(str, Enum):
    """Supported memory backend types."""

    PGVECTOR = "pgvector"
    QDRANT = "qdrant"


def create_memory_backend(
    backend_type: MemoryBackendType = MemoryBackendType.PGVECTOR,
    *,
    # pgvector options
    pool: "Pool | None" = None,
    table_name: str = "agent_memories",
    # qdrant options
    qdrant_url: str | None = None,
    qdrant_api_key: str | None = None,
    collection_name: str = "memories",
    # shared options
    embedding_model: str = "openai:text-embedding-3-small",
) -> PgVectorMemoryStore | QdrantMemoryStore:
    """Create a memory backend based on configuration.

    Args:
        backend_type: Which backend to use (default: pgvector).
        pool: asyncpg Pool (required for pgvector).
        table_name: Postgres table name (pgvector only).
        qdrant_url: Qdrant server URL (qdrant only, None = in-memory).
        qdrant_api_key: Qdrant API key (qdrant only).
        collection_name: Qdrant collection (qdrant only).
        embedding_model: Model for embeddings (both backends).

    Returns:
        Configured memory backend instance.

    Raises:
        ValueError: If pgvector selected but no pool provided.

    Example:
        ```python
        # pgvector (recommended)
        memory = create_memory_backend(
            backend_type=MemoryBackendType.PGVECTOR,
            pool=app_db.pool,
        )

        # Qdrant (for specific use cases)
        memory = create_memory_backend(
            backend_type=MemoryBackendType.QDRANT,
            qdrant_url="http://localhost:6333",
        )
        ```
    """
    if backend_type == MemoryBackendType.PGVECTOR:
        if pool is None:
            raise ValueError("pgvector backend requires asyncpg Pool")
        return PgVectorMemoryStore(
            pool=pool,
            table_name=table_name,
            embedding_model=embedding_model,
        )
    else:
        return QdrantMemoryStore(
            collection_name=collection_name,
            embedding_model=embedding_model,
            qdrant_url=qdrant_url,
            qdrant_api_key=qdrant_api_key,
        )


__all__ = [
    "MemoryBackendType",
    "PgVectorMemoryStore",
    "QdrantMemoryStore",
    "create_memory_backend",
]


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────── src/bond/tools/memory/backends/pgvector.py ──────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""PostgreSQL + pgvector memory backend.

Uses existing asyncpg pool from dataing for zero additional infrastructure.
Provides transactional consistency with application data.
"""

from datetime import UTC, datetime
from uuid import UUID, uuid4

from asyncpg import Pool
from pydantic_ai.embeddings import Embedder

from bond.tools.memory._models import Error, Memory, SearchResult


class PgVectorMemoryStore:
    """pgvector-backed memory store using PydanticAI Embedder.

    Benefits over Qdrant:
    - No separate infrastructure (uses existing Postgres)
    - Transactional consistency (CASCADE deletes, atomic commits)
    - Native tenant isolation via SQL WHERE clauses
    - Unified backup/restore with application data

    Example:
        ```python
        # Inject pool from dataing's AppDatabase
        store = PgVectorMemoryStore(pool=app_db.pool)

        # With OpenAI embeddings
        store = PgVectorMemoryStore(
            pool=app_db.pool,
            embedding_model="openai:text-embedding-3-small",
        )
        ```
    """

    def __init__(
        self,
        pool: Pool,
        table_name: str = "agent_memories",
        embedding_model: str = "openai:text-embedding-3-small",
    ) -> None:
        """Initialize the pgvector memory store.

        Args:
            pool: asyncpg connection pool (typically from AppDatabase).
            table_name: Name of the memories table.
            embedding_model: PydanticAI embedding model string.
        """
        self._pool = pool
        self._table = table_name
        self._embedder = Embedder(embedding_model)

    async def _embed(self, text: str) -> list[float]:
        """Generate embedding using PydanticAI Embedder.

        This is non-blocking (runs in thread pool) and instrumented.
        """
        result = await self._embedder.embed_query(text)
        return list(result.embeddings[0])

    async def store(
        self,
        content: str,
        agent_id: str,
        *,
        tenant_id: UUID,
        conversation_id: str | None = None,
        tags: list[str] | None = None,
        embedding: list[float] | None = None,
        embedding_model: str | None = None,
    ) -> Memory | Error:
        """Store memory with transactional guarantee."""
        try:
            vector = embedding if embedding else await self._embed(content)
            memory_id = uuid4()
            created_at = datetime.now(UTC)

            await self._pool.execute(
                f"""
                INSERT INTO {self._table}
                (id, tenant_id, agent_id, content, conversation_id, tags, embedding, created_at)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                """,
                memory_id,
                tenant_id,
                agent_id,
                content,
                conversation_id,
                tags or [],
                str(vector),  # pgvector accepts string representation
                created_at,
            )

            return Memory(
                id=memory_id,
                content=content,
                created_at=created_at,
                agent_id=agent_id,
                conversation_id=conversation_id,
                tags=tags or [],
            )
        except Exception as e:
            return Error(description=f"Failed to store memory: {e}")

    async def search(
        self,
        query: str,
        *,
        tenant_id: UUID,
        top_k: int = 10,
        score_threshold: float | None = None,
        tags: list[str] | None = None,
        agent_id: str | None = None,
        embedding_model: str | None = None,
    ) -> list[SearchResult] | Error:
        """Semantic search using cosine similarity.

        Note: pgvector's '<=>' operator returns cosine distance (0 = identical,
        2 = opposite). We convert distance to similarity (1 - distance) to match
        the interface's "higher is more similar" convention.
        """
        try:
            query_vector = await self._embed(query)

            # Build query with filters
            conditions = ["tenant_id = $1"]
            args: list[object] = [tenant_id, str(query_vector), top_k]

            if agent_id:
                conditions.append(f"agent_id = ${len(args) + 1}")
                args.append(agent_id)

            if tags:
                conditions.append(f"tags @> ${len(args) + 1}")
                args.append(tags)

            where_clause = " AND ".join(conditions)

            # Score threshold filter (cosine similarity = 1 - distance)
            score_filter = ""
            if score_threshold is not None:
                score_filter = f"AND (1 - (embedding <=> $2)) >= {score_threshold}"

            rows = await self._pool.fetch(
                f"""
                SELECT id, content, conversation_id, tags, agent_id, created_at,
                       1 - (embedding <=> $2) AS score
                FROM {self._table}
                WHERE {where_clause} {score_filter}
                ORDER BY embedding <=> $2
                LIMIT $3
                """,
                *args,
            )

            return [
                SearchResult(
                    memory=Memory(
                        id=row["id"],
                        content=row["content"],
                        created_at=row["created_at"],
                        agent_id=row["agent_id"],
                        conversation_id=row["conversation_id"],
                        tags=list(row["tags"]) if row["tags"] else [],
                    ),
                    score=row["score"],
                )
                for row in rows
            ]
        except Exception as e:
            return Error(description=f"Failed to search memories: {e}")

    async def delete(self, memory_id: UUID, *, tenant_id: UUID) -> bool | Error:
        """Hard delete a specific memory (scoped to tenant for safety)."""
        try:
            result = await self._pool.execute(
                f"DELETE FROM {self._table} WHERE id = $1 AND tenant_id = $2",
                memory_id,
                tenant_id,
            )
            # asyncpg returns "DELETE N" where N is row count
            return "DELETE 1" in result
        except Exception as e:
            return Error(description=f"Failed to delete memory: {e}")

    async def get(self, memory_id: UUID, *, tenant_id: UUID) -> Memory | None | Error:
        """Retrieve a specific memory by ID (scoped to tenant)."""
        try:
            row = await self._pool.fetchrow(
                f"""
                SELECT id, content, conversation_id, tags, agent_id, created_at
                FROM {self._table}
                WHERE id = $1 AND tenant_id = $2
                """,
                memory_id,
                tenant_id,
            )

            if not row:
                return None

            return Memory(
                id=row["id"],
                content=row["content"],
                created_at=row["created_at"],
                agent_id=row["agent_id"],
                conversation_id=row["conversation_id"],
                tags=list(row["tags"]) if row["tags"] else [],
            )
        except Exception as e:
            return Error(description=f"Failed to retrieve memory: {e}")
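

# Illustrative sketch (not applied by this module): a plausible DDL for the
# default "agent_memories" table, inferred from the INSERT/SELECT statements
# above. Column types, the 1536-dimension vector (text-embedding-3-small), and
# the index choices are assumptions; adapt them to your migration tooling.
EXAMPLE_AGENT_MEMORIES_DDL = """
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS agent_memories (
    id              UUID PRIMARY KEY,
    tenant_id       UUID NOT NULL,
    agent_id        TEXT NOT NULL,
    content         TEXT NOT NULL,
    conversation_id TEXT,
    tags            TEXT[] NOT NULL DEFAULT '{}',
    embedding       VECTOR(1536) NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL
);

-- Supports tenant-scoped filtering and the cosine-distance ORDER BY in search()
CREATE INDEX IF NOT EXISTS agent_memories_tenant_idx ON agent_memories (tenant_id);
CREATE INDEX IF NOT EXISTS agent_memories_embedding_idx
    ON agent_memories USING hnsw (embedding vector_cosine_ops);
"""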


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
─────────────────────────────────────────────────────────── src/bond/tools/memory/backends/qdrant.py ───────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Qdrant memory backend implementation.

This module provides a Qdrant-backed implementation of AgentMemoryProtocol
using PydanticAI Embedder for non-blocking, instrumented embeddings.
"""

from datetime import UTC, datetime
from uuid import UUID, uuid4

from pydantic_ai.embeddings import Embedder
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import (
    Distance,
    FieldCondition,
    Filter,
    MatchValue,
    PointStruct,
    VectorParams,
)

from bond.tools.memory._models import Error, Memory, SearchResult


class QdrantMemoryStore:
    """Qdrant-backed memory store using PydanticAI Embedder.

    Benefits over raw sentence-transformers:
    - Non-blocking embeddings (runs in thread pool via run_in_executor)
    - Supports OpenAI, Cohere, and local models seamlessly
    - Automatic cost/latency tracking via OpenTelemetry
    - Zero-refactor provider swapping

    Example:
        ```python
        # In-memory for development/testing (local embeddings)
        store = QdrantMemoryStore()

        # Persistent with local embeddings
        store = QdrantMemoryStore(qdrant_url="http://localhost:6333")

        # OpenAI embeddings
        store = QdrantMemoryStore(
            embedding_model="openai:text-embedding-3-small",
            qdrant_url="http://localhost:6333",
        )
        ```
    """

    def __init__(
        self,
        collection_name: str = "memories",
        embedding_model: str = "sentence-transformers:all-MiniLM-L6-v2",
        qdrant_url: str | None = None,
        qdrant_api_key: str | None = None,
    ) -> None:
        """Initialize the Qdrant memory store.

        Args:
            collection_name: Name of the Qdrant collection.
            embedding_model: Embedding model string. Supports:
                - "sentence-transformers:all-MiniLM-L6-v2" (local, default)
                - "openai:text-embedding-3-small"
                - "cohere:embed-english-v3.0"
            qdrant_url: Qdrant server URL. None = in-memory (for dev/testing).
            qdrant_api_key: Optional API key for Qdrant Cloud.
        """
        self._collection = collection_name

        # PydanticAI Embedder handles model logic + instrumentation
        self._embedder = Embedder(embedding_model)

        # Use AsyncQdrantClient for true async operation
        if qdrant_url:
            self._client = AsyncQdrantClient(url=qdrant_url, api_key=qdrant_api_key)
        else:
            self._client = AsyncQdrantClient(":memory:")

        self._initialized = False

    async def _ensure_collection(self) -> None:
        """Lazy init collection with correct dimensions."""
        if self._initialized:
            return

        # Determine dimensions dynamically by generating a dummy embedding.
        # This works for any provider (OpenAI, Cohere, local).
        dummy_result = await self._embedder.embed_query("warmup")
        dimensions = len(dummy_result.embeddings[0])

        # Check and create collection
        collections = await self._client.get_collections()
        exists = any(c.name == self._collection for c in collections.collections)

        if not exists:
            await self._client.create_collection(
                self._collection,
                vectors_config=VectorParams(
                    size=dimensions,
                    distance=Distance.COSINE,
                ),
            )

        self._initialized = True

    async def _embed(self, text: str) -> list[float]:
        """Generate embedding using PydanticAI Embedder.

        This is non-blocking (runs in thread pool) and instrumented.
        """
        result = await self._embedder.embed_query(text)
        return list(result.embeddings[0])

    def _build_filters(
        self,
        tenant_id: UUID,
        tags: list[str] | None,
        agent_id: str | None,
    ) -> Filter:
        """Build Qdrant filter from parameters."""
        conditions: list[FieldCondition] = [
            # Always filter by tenant_id for multi-tenant isolation
            FieldCondition(key="tenant_id", match=MatchValue(value=str(tenant_id)))
        ]
        if agent_id:
            conditions.append(FieldCondition(key="agent_id", match=MatchValue(value=agent_id)))
        if tags:
            for tag in tags:
                conditions.append(FieldCondition(key="tags", match=MatchValue(value=tag)))
        return Filter(must=conditions)

    async def store(
        self,
        content: str,
        agent_id: str,
        *,
        tenant_id: UUID,
        conversation_id: str | None = None,
        tags: list[str] | None = None,
        embedding: list[float] | None = None,
        embedding_model: str | None = None,
    ) -> Memory | Error:
        """Store memory with embedding."""
        try:
            await self._ensure_collection()

            # Use provided embedding or generate one
            vector = embedding if embedding else await self._embed(content)

            memory = Memory(
                id=uuid4(),
                content=content,
                created_at=datetime.now(UTC),
                agent_id=agent_id,
                conversation_id=conversation_id,
                tags=tags or [],
            )

            # Include tenant_id in payload for filtering
            payload = memory.model_dump(mode="json")
            payload["tenant_id"] = str(tenant_id)

            await self._client.upsert(
                self._collection,
                points=[
                    PointStruct(
                        id=str(memory.id),
                        vector=vector,
                        payload=payload,
                    )
                ],
            )
            return memory
        except Exception as e:
            return Error(description=f"Failed to store memory: {e}")

    async def search(
        self,
        query: str,
        *,
        tenant_id: UUID,
        top_k: int = 10,
        score_threshold: float | None = None,
        tags: list[str] | None = None,
        agent_id: str | None = None,
        embedding_model: str | None = None,
    ) -> list[SearchResult] | Error:
        """Semantic search with optional filtering."""
        try:
            await self._ensure_collection()

            query_vector = await self._embed(query)
            filters = self._build_filters(tenant_id, tags, agent_id)

            # Use query_points (Universal Query API, qdrant-client >= 1.10)
            response = await self._client.query_points(
                self._collection,
                query=query_vector,
                limit=top_k,
                score_threshold=score_threshold,
                query_filter=filters,
            )

            results: list[SearchResult] = []
            for r in response.points:
                payload = r.payload
                if payload is None:
                    continue
                results.append(
                    SearchResult(
                        memory=Memory(
                            id=UUID(payload["id"]),
                            content=payload["content"],
                            created_at=datetime.fromisoformat(payload["created_at"]),
                            agent_id=payload["agent_id"],
                            conversation_id=payload.get("conversation_id"),
                            tags=payload.get("tags", []),
                        ),
                        score=r.score,
                    )
                )
            return results
        except Exception as e:
            return Error(description=f"Failed to search memories: {e}")

    async def delete(self, memory_id: UUID, *, tenant_id: UUID) -> bool | Error:
        """Delete a memory by ID (scoped to tenant)."""
        try:
            await self._ensure_collection()

            # Use filter to ensure tenant isolation
            await self._client.delete(
                self._collection,
                points_selector=Filter(
                    must=[
                        FieldCondition(key="id", match=MatchValue(value=str(memory_id))),
                        FieldCondition(key="tenant_id", match=MatchValue(value=str(tenant_id))),
                    ]
                ),
            )
            return True
        except Exception as e:
            return Error(description=f"Failed to delete memory: {e}")

    async def get(self, memory_id: UUID, *, tenant_id: UUID) -> Memory | None | Error:
        """Retrieve a specific memory by ID (scoped to tenant)."""
        try:
            await self._ensure_collection()
            results = await self._client.retrieve(
                self._collection,
                ids=[str(memory_id)],
            )
            if results:
                payload = results[0].payload
                if payload is None:
                    return None
                # Verify tenant ownership
                if payload.get("tenant_id") != str(tenant_id):
                    return None
                return Memory(
                    id=UUID(payload["id"]),
                    content=payload["content"],
                    created_at=datetime.fromisoformat(payload["created_at"]),
                    agent_id=payload["agent_id"],
                    conversation_id=payload.get("conversation_id"),
                    tags=payload.get("tags", []),
                )
            return None
        except Exception as e:
            return Error(description=f"Failed to retrieve memory: {e}")


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
──────────────────────────────────────────────────────────────── src/bond/tools/memory/tools.py ────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Memory tools for PydanticAI agents.

This module provides the agent-facing tool functions that use
RunContext to access the memory backend via dependency injection.
"""

from pydantic_ai import RunContext
from pydantic_ai.tools import Tool

from bond.tools.memory._models import (
    CreateMemoryRequest,
    DeleteMemoryRequest,
    Error,
    GetMemoryRequest,
    Memory,
    SearchMemoriesRequest,
    SearchResult,
)
from bond.tools.memory._protocols import AgentMemoryProtocol


async def create_memory(
    ctx: RunContext[AgentMemoryProtocol],
    request: CreateMemoryRequest,
) -> Memory | Error:
    """Store a new memory for later retrieval.

    Agent Usage:
        Call this tool to remember information for future conversations:
        - User preferences: "Remember that I prefer dark mode"
        - Important facts: "Note that the project deadline is March 15"
        - Context: "Store that we discussed the authentication flow"

    Example:
        ```python
        create_memory({
            "content": "User prefers dark mode and compact view",
            "agent_id": "assistant",
            "tenant_id": "550e8400-e29b-41d4-a716-446655440000",
            "tags": ["preferences", "ui"]
        })
        ```

    Returns:
        The created Memory object with its ID, or an Error if storage failed.
    """
    result: Memory | Error = await ctx.deps.store(
        content=request.content,
        agent_id=request.agent_id,
        tenant_id=request.tenant_id,
        conversation_id=request.conversation_id,
        tags=request.tags,
        embedding=request.embedding,
        embedding_model=request.embedding_model,
    )
    return result


async def search_memories(
    ctx: RunContext[AgentMemoryProtocol],
    request: SearchMemoriesRequest,
) -> list[SearchResult] | Error:
    """Search memories by semantic similarity.

    Agent Usage:
        Call this tool to recall relevant information:
        - Find preferences: "What are the user's UI preferences?"
        - Recall context: "What did we discuss about authentication?"
        - Find related: "Search for memories about the project deadline"

    Example:
        ```python
        search_memories({
            "query": "user interface preferences",
            "tenant_id": "550e8400-e29b-41d4-a716-446655440000",
            "top_k": 5,
            "tags": ["preferences"]
        })
        ```

    Returns:
        List of SearchResult with memories and similarity scores,
        ordered by relevance (highest score first).
    """
    result: list[SearchResult] | Error = await ctx.deps.search(
        query=request.query,
        tenant_id=request.tenant_id,
        top_k=request.top_k,
        score_threshold=request.score_threshold,
        tags=request.tags,
        agent_id=request.agent_id,
        embedding_model=request.embedding_model,
    )
    return result


async def delete_memory(
    ctx: RunContext[AgentMemoryProtocol],
    request: DeleteMemoryRequest,
) -> bool | Error:
    """Delete a memory by ID.

    Agent Usage:
        Call this tool to remove outdated or incorrect memories:
        - Remove stale: "Delete the old deadline memory"
        - Correct mistakes: "Remove the incorrect preference"

    Example:
        ```python
        delete_memory({
            "memory_id": "550e8400-e29b-41d4-a716-446655440000",
            "tenant_id": "660e8400-e29b-41d4-a716-446655440000"
        })
        ```

    Returns:
        True if deleted, False if not found, or Error if deletion failed.
    """
    result: bool | Error = await ctx.deps.delete(
        request.memory_id,
        tenant_id=request.tenant_id,
    )
    return result


async def get_memory(
    ctx: RunContext[AgentMemoryProtocol],
    request: GetMemoryRequest,
) -> Memory | None | Error:
    """Retrieve a specific memory by ID.

    Agent Usage:
        Call this tool to get details of a specific memory:
        - Verify content: "Get the full text of memory X"
        - Check metadata: "What tags does memory X have?"

    Example:
        ```python
        get_memory({
            "memory_id": "550e8400-e29b-41d4-a716-446655440000",
            "tenant_id": "660e8400-e29b-41d4-a716-446655440000"
        })
        ```

    Returns:
        The Memory if found, None if not found, or Error if retrieval failed.
    """
    result: Memory | None | Error = await ctx.deps.get(
        request.memory_id,
        tenant_id=request.tenant_id,
    )
    return result


# Export as toolset for BondAgent
memory_toolset: list[Tool[AgentMemoryProtocol]] = [
    Tool(create_memory),
    Tool(search_memories),
    Tool(delete_memory),
    Tool(get_memory),
]
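

# Illustrative sketch (not exported): one way to wire memory_toolset into a
# plain pydantic_ai Agent, injecting a memory backend as deps so each tool's
# RunContext.deps satisfies AgentMemoryProtocol. The model string, the
# in-memory Qdrant backend, and the prompt are assumptions; BondAgent's own
# wiring may differ.
async def _example_memory_agent_run() -> None:
    from pydantic_ai import Agent

    from bond.tools.memory.backends import MemoryBackendType, create_memory_backend

    backend = create_memory_backend(backend_type=MemoryBackendType.QDRANT)  # in-memory Qdrant
    agent = Agent(
        "openai:gpt-4o",
        deps_type=AgentMemoryProtocol,
        tools=memory_toolset,
        system_prompt="Use the memory tools to remember and recall facts about the user.",
    )
    result = await agent.run(
        "Remember that the user prefers dark mode and compact view.",
        deps=backend,
    )
    print(result)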


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────────── src/bond/tools/schema/__init__.py ───────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Schema toolset for Bond agents.

Provides on-demand schema lookup for database tables and lineage.
"""

from bond.tools.schema._models import (
    ColumnSchema,
    GetDownstreamRequest,
    GetTableSchemaRequest,
    GetUpstreamRequest,
    ListTablesRequest,
    TableSchema,
)
from bond.tools.schema._protocols import SchemaLookupProtocol
from bond.tools.schema.tools import schema_toolset

__all__ = [
    # Protocol
    "SchemaLookupProtocol",
    # Models
    "GetTableSchemaRequest",
    "ListTablesRequest",
    "GetUpstreamRequest",
    "GetDownstreamRequest",
    "TableSchema",
    "ColumnSchema",
    # Toolset
    "schema_toolset",
]


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
─────────────────────────────────────────────────────────────── src/bond/tools/schema/_models.py ───────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Pydantic models for schema tools."""

from __future__ import annotations

from pydantic import BaseModel, Field


class GetTableSchemaRequest(BaseModel):
    """Request to get schema for a specific table."""

    table_name: str = Field(..., description="Table name (can be qualified like schema.table)")


class ListTablesRequest(BaseModel):
    """Request to list available tables."""

    pattern: str | None = Field(None, description="Optional glob pattern to filter tables")


class GetUpstreamRequest(BaseModel):
    """Request to get upstream dependencies."""

    table_name: str = Field(..., description="Table name to get upstream for")


class GetDownstreamRequest(BaseModel):
    """Request to get downstream dependencies."""

    table_name: str = Field(..., description="Table name to get downstream for")


class ColumnSchema(BaseModel):
    """Schema information for a single column."""

    name: str
    data_type: str
    native_type: str | None = None
    nullable: bool = True
    is_primary_key: bool = False
    is_partition_key: bool = False
    description: str | None = None
    default_value: str | None = None


class TableSchema(BaseModel):
    """Schema information for a table."""

    name: str
    columns: list[ColumnSchema]
    schema_name: str | None = None
    catalog_name: str | None = None
    description: str | None = None

    @property
    def qualified_name(self) -> str:
        """Get fully qualified table name."""
        parts = []
        if self.catalog_name:
            parts.append(self.catalog_name)
        if self.schema_name:
            parts.append(self.schema_name)
        parts.append(self.name)
        return ".".join(parts)


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
───────────────────────────────────────────────────────────── src/bond/tools/schema/_protocols.py ──────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Protocol definitions for schema lookup tools.

This module defines the interface that schema lookup implementations
must satisfy. The protocol is runtime-checkable for flexibility.
"""

from __future__ import annotations

from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SchemaLookupProtocol(Protocol):
    """Protocol for schema lookup operations.

    Implementations provide access to database schema information
    and lineage data for agent tools.
    """

    async def get_table_schema(self, table_name: str) -> dict[str, Any] | None:
        """Get schema for a specific table.

        Args:
            table_name: Name of the table (can be qualified like schema.table).

        Returns:
            Table schema as dict with columns, types, etc. or None if not found.
        """
        ...

    async def list_tables(self) -> list[str]:
        """List all available table names.

        Returns:
            List of table names (may be qualified).
        """
        ...

    async def get_upstream(self, table_name: str) -> list[str]:
        """Get upstream dependencies for a table.

        Args:
            table_name: Name of the table.

        Returns:
            List of upstream table names.
        """
        ...

    async def get_downstream(self, table_name: str) -> list[str]:
        """Get downstream dependencies for a table.

        Args:
            table_name: Name of the table.

        Returns:
            List of downstream table names.
        """
        ...
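

# Illustrative sketch (not part of the protocol): a static, dict-backed
# implementation of SchemaLookupProtocol, convenient for tests and demos.
# All names below are hypothetical.
class StaticSchemaLookup:
    """Serves table schemas and lineage from in-memory dicts."""

    def __init__(
        self,
        schemas: dict[str, dict[str, Any]],
        upstream: dict[str, list[str]] | None = None,
        downstream: dict[str, list[str]] | None = None,
    ) -> None:
        self._schemas = schemas
        self._upstream = upstream or {}
        self._downstream = downstream or {}

    async def get_table_schema(self, table_name: str) -> dict[str, Any] | None:
        return self._schemas.get(table_name)

    async def list_tables(self) -> list[str]:
        return sorted(self._schemas)

    async def get_upstream(self, table_name: str) -> list[str]:
        return self._upstream.get(table_name, [])

    async def get_downstream(self, table_name: str) -> list[str]:
        return self._downstream.get(table_name, [])


# Because SchemaLookupProtocol is runtime-checkable, isinstance checks work:
# isinstance(StaticSchemaLookup({}), SchemaLookupProtocol) -> True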


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
──────────────────────────────────────────────────────────────── src/bond/tools/schema/tools.py ────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Schema tools for PydanticAI agents.

This module provides agent-facing tool functions that use
RunContext to access schema lookup via dependency injection.
"""

from __future__ import annotations

from typing import Any

from pydantic_ai import RunContext
from pydantic_ai.tools import Tool

from bond.tools.schema._models import (
    GetDownstreamRequest,
    GetTableSchemaRequest,
    GetUpstreamRequest,
    ListTablesRequest,
)
from bond.tools.schema._protocols import SchemaLookupProtocol


async def get_table_schema(
    ctx: RunContext[SchemaLookupProtocol],
    request: GetTableSchemaRequest,
) -> dict[str, Any] | None:
    """Get the full schema for a specific table.

    Agent Usage:
        Call this tool to get column details for a table you need to query:
        - Get join columns: "What columns does the customers table have?"
        - Check types: "What's the data type of the created_at column?"
        - Find keys: "Which columns are primary/partition keys?"

    Example:
        ```python
        get_table_schema({"table_name": "customers"})
        ```

    Returns:
        Full table schema as JSON with columns, types, keys, etc.
        Returns None if table not found.
    """
    return await ctx.deps.get_table_schema(request.table_name)


async def list_tables(
    ctx: RunContext[SchemaLookupProtocol],
    request: ListTablesRequest,
) -> list[str]:
    """List all available tables in the database.

    Agent Usage:
        Call this tool to discover what tables exist:
        - Find tables: "What tables are available?"
        - Explore schema: "List all tables to understand the data model"

    Example:
        ```python
        list_tables({})
        ```

    Returns:
        List of table names (may be qualified like schema.table).
    """
    return await ctx.deps.list_tables()


async def get_upstream_tables(
    ctx: RunContext[SchemaLookupProtocol],
    request: GetUpstreamRequest,
) -> list[str]:
    """Get tables that feed data into the specified table.

    Agent Usage:
        Call this tool to understand data lineage:
        - Find sources: "Where does the orders table get its data from?"
        - Trace issues: "What upstream tables might cause this anomaly?"

    Example:
        ```python
        get_upstream_tables({"table_name": "orders"})
        ```

    Returns:
        List of upstream table names (data sources for this table).
    """
    return await ctx.deps.get_upstream(request.table_name)


async def get_downstream_tables(
    ctx: RunContext[SchemaLookupProtocol],
    request: GetDownstreamRequest,
) -> list[str]:
    """Get tables that consume data from the specified table.

    Agent Usage:
        Call this tool to understand data impact:
        - Find dependents: "What tables use data from orders?"
        - Assess impact: "What would be affected by this anomaly?"

    Example:
        ```python
        get_downstream_tables({"table_name": "orders"})
        ```

    Returns:
        List of downstream table names (tables that depend on this one).
    """
    return await ctx.deps.get_downstream(request.table_name)


# Export as toolset for BondAgent
schema_toolset: list[Tool[SchemaLookupProtocol]] = [
    Tool(get_table_schema),
    Tool(list_tables),
    Tool(get_upstream_tables),
    Tool(get_downstream_tables),
]


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────────────── src/bond/trace/__init__.py ──────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Trace module: Forensic capture and replay for agent executions.

Provides tools for recording all StreamHandlers events during agent runs
and replaying them later for debugging, auditing, and analysis.

Example:
    from bond.trace import JSONFileTraceStore, create_capture_handlers, TraceReplayer

    # Capture during execution
    store = JSONFileTraceStore()
    handlers, trace_id = create_capture_handlers(store)
    result = await agent.ask("hello", handlers=handlers)

    # Replay later
    replayer = TraceReplayer(store, trace_id)
    async for event in replayer:
        print(f"{event.event_type}: {event.payload}")
"""

from bond.trace._models import (
    ALL_EVENT_TYPES,
    EVENT_BLOCK_END,
    EVENT_BLOCK_START,
    EVENT_COMPLETE,
    EVENT_TEXT_DELTA,
    EVENT_THINKING_DELTA,
    EVENT_TOOL_CALL_DELTA,
    EVENT_TOOL_EXECUTE,
    EVENT_TOOL_RESULT,
    STATUS_COMPLETE,
    STATUS_FAILED,
    STATUS_IN_PROGRESS,
    TraceEvent,
    TraceMeta,
)
from bond.trace._protocols import TraceStorageProtocol
from bond.trace.backends import JSONFileTraceStore
from bond.trace.capture import create_capture_handlers, finalize_capture
from bond.trace.replay import TraceReplayer

__all__ = [
    # Models
    "TraceEvent",
    "TraceMeta",
    # Protocol
    "TraceStorageProtocol",
    # Backends
    "JSONFileTraceStore",
    # Capture
    "create_capture_handlers",
    "finalize_capture",
    # Replay
    "TraceReplayer",
    # Event type constants
    "EVENT_BLOCK_START",
    "EVENT_BLOCK_END",
    "EVENT_TEXT_DELTA",
    "EVENT_THINKING_DELTA",
    "EVENT_TOOL_CALL_DELTA",
    "EVENT_TOOL_EXECUTE",
    "EVENT_TOOL_RESULT",
    "EVENT_COMPLETE",
    "ALL_EVENT_TYPES",
    # Status constants
    "STATUS_IN_PROGRESS",
    "STATUS_COMPLETE",
    "STATUS_FAILED",
]


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────────────── src/bond/trace/_models.py ───────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Trace event models for forensic capture and replay.

Provides Pydantic models for trace events and metadata that capture
all 8 StreamHandlers callback types for persistence and replay.
"""

from __future__ import annotations

from datetime import datetime
from typing import Any

from pydantic import BaseModel, ConfigDict

# Event type constants matching StreamHandlers callbacks
EVENT_BLOCK_START = "block_start"
EVENT_BLOCK_END = "block_end"
EVENT_TEXT_DELTA = "text_delta"
EVENT_THINKING_DELTA = "thinking_delta"
EVENT_TOOL_CALL_DELTA = "tool_call_delta"
EVENT_TOOL_EXECUTE = "tool_execute"
EVENT_TOOL_RESULT = "tool_result"
EVENT_COMPLETE = "complete"

ALL_EVENT_TYPES = frozenset(
    {
        EVENT_BLOCK_START,
        EVENT_BLOCK_END,
        EVENT_TEXT_DELTA,
        EVENT_THINKING_DELTA,
        EVENT_TOOL_CALL_DELTA,
        EVENT_TOOL_EXECUTE,
        EVENT_TOOL_RESULT,
        EVENT_COMPLETE,
    }
)


class TraceEvent(BaseModel):
    """A single event in an execution trace.

    Captures one callback from StreamHandlers with full context
    for later replay or analysis.

    Attributes:
        trace_id: UUID identifying this trace session.
        sequence: Zero-indexed order within the trace.
        timestamp: Monotonic clock value for relative ordering.
        wall_time: Human-readable UTC timestamp.
        event_type: One of the 8 callback types.
        payload: Event-specific data (varies by event_type).
    """

    model_config = ConfigDict(frozen=True)

    trace_id: str
    sequence: int
    timestamp: float
    wall_time: datetime
    event_type: str
    payload: dict[str, Any]


# Trace status constants
STATUS_IN_PROGRESS = "in_progress"
STATUS_COMPLETE = "complete"
STATUS_FAILED = "failed"


class TraceMeta(BaseModel):
    """Metadata about a stored trace.

    Provides summary information without loading all events.

    Attributes:
        trace_id: UUID identifying this trace.
        created_at: When the trace was started.
        event_count: Number of events in the trace.
        status: One of "in_progress", "complete", "failed".
    """

    model_config = ConfigDict(frozen=True)

    trace_id: str
    created_at: datetime
    event_count: int
    status: str
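

# Illustrative sketch (not exported): what a captured text_delta event looks
# like. The payload shape mirrors what bond.trace.capture records for
# EVENT_TEXT_DELTA; the concrete values below are made up.
def _example_text_delta_event() -> TraceEvent:
    from datetime import UTC

    return TraceEvent(
        trace_id="00000000-0000-0000-0000-000000000000",
        sequence=3,
        timestamp=0.412,  # seconds since the run started (monotonic offset)
        wall_time=datetime.now(UTC),
        event_type=EVENT_TEXT_DELTA,
        payload={"text": "Hello"},
    )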


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
───────────────────────────────────────────────────────────────── src/bond/trace/_protocols.py ─────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Trace storage protocol - interface for trace backends.

Defines the interface that trace storage implementations must follow.
"""

from __future__ import annotations

from collections.abc import AsyncIterator
from typing import Protocol, runtime_checkable

from bond.trace._models import TraceEvent, TraceMeta


@runtime_checkable
class TraceStorageProtocol(Protocol):
    """Protocol for trace storage backends.

    Provides async methods for saving, loading, and managing execution traces.
    All operations are async to support various backend implementations
    (file, database, remote storage).

    Implementations:
        - JSONFileTraceStore: Local JSON file storage (default)
    """

    async def save_event(self, event: TraceEvent) -> None:
        """Append an event to a trace.

        The trace is created if it doesn't exist. Events should be
        saved in order (by sequence number).

        Args:
            event: The trace event to save.

        Raises:
            IOError: If storage fails.
        """
        ...

    async def finalize_trace(
        self,
        trace_id: str,
        status: str = "complete",
    ) -> None:
        """Mark a trace as complete or failed.

        Should be called when the agent run finishes. This updates
        the trace metadata and may trigger cleanup or indexing.

        Args:
            trace_id: The trace to finalize.
            status: Final status ("complete" or "failed").

        Raises:
            KeyError: If trace_id doesn't exist.
            IOError: If storage fails.
        """
        ...

    def load_trace(self, trace_id: str) -> AsyncIterator[TraceEvent]:
        """Load all events from a trace for replay.

        Yields events in sequence order. Memory-efficient for large traces.
        This is an async generator method.

        Args:
            trace_id: The trace to load.

        Yields:
            TraceEvent objects in sequence order.

        Raises:
            KeyError: If trace_id doesn't exist.
            IOError: If loading fails.
        """
        ...

    async def list_traces(self, limit: int = 100) -> list[TraceMeta]:
        """List available traces with metadata.

        Returns traces ordered by creation time (newest first).

        Args:
            limit: Maximum number of traces to return.

        Returns:
            List of TraceMeta for available traces.

        Raises:
            IOError: If listing fails.
        """
        ...

    async def delete_trace(self, trace_id: str) -> None:
        """Delete a trace and all its events.

        Args:
            trace_id: The trace to delete.

        Raises:
            KeyError: If trace_id doesn't exist.
            IOError: If deletion fails.
        """
        ...


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
───────────────────────────────────────────────────────────── src/bond/trace/backends/__init__.py ──────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Trace storage backends.

Provides implementations of TraceStorageProtocol for different
storage systems.
"""

from bond.trace.backends.json_file import JSONFileTraceStore

__all__ = [
    "JSONFileTraceStore",
]


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
───────────────────────────────────────────────────────────── src/bond/trace/backends/json_file.py ─────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""JSON file storage backend for traces.

Stores traces as newline-delimited JSON files with separate metadata files.
"""

from __future__ import annotations

import json
from collections.abc import AsyncIterator
from pathlib import Path

import aiofiles
import aiofiles.os

from bond.trace._models import (
    STATUS_COMPLETE,
    STATUS_IN_PROGRESS,
    TraceEvent,
    TraceMeta,
)


class JSONFileTraceStore:
    """Store traces as JSON files in a directory.

    Each trace consists of two files:
        - {trace_id}.json: Newline-delimited JSON events
        - {trace_id}.meta.json: TraceMeta as JSON

    File Structure:
        {base_path}/
        ├── {trace_id}.json       # Events (one JSON object per line)
        └── {trace_id}.meta.json  # TraceMeta

    Example:
        ```python
        store = JSONFileTraceStore(".bond/traces")
        await store.save_event(event)
        await store.finalize_trace(trace_id)

        async for event in store.load_trace(trace_id):
            print(event)
        ```
    """

    def __init__(self, base_path: Path | str = ".bond/traces") -> None:
        """Initialize JSON file store.

        Args:
            base_path: Directory for trace files. Created if doesn't exist.
        """
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def _events_path(self, trace_id: str) -> Path:
        """Get path to events file for a trace."""
        return self.base_path / f"{trace_id}.json"

    def _meta_path(self, trace_id: str) -> Path:
        """Get path to metadata file for a trace."""
        return self.base_path / f"{trace_id}.meta.json"

    async def save_event(self, event: TraceEvent) -> None:
        """Append event to trace file.

        Creates or updates the metadata file to track event count.
        Uses newline-delimited JSON for efficient streaming reads.

        Args:
            event: The trace event to save.

        Raises:
            IOError: If writing fails.
        """
        events_path = self._events_path(event.trace_id)
        meta_path = self._meta_path(event.trace_id)

        # Append event to events file
        async with aiofiles.open(events_path, "a") as f:
            await f.write(event.model_dump_json() + "\n")

        # Update or create metadata
        if meta_path.exists():
            async with aiofiles.open(meta_path) as f:
                content = await f.read()
                meta_data = json.loads(content)
                meta_data["event_count"] = event.sequence + 1
        else:
            meta_data = {
                "trace_id": event.trace_id,
                "created_at": event.wall_time.isoformat(),
                "event_count": event.sequence + 1,
                "status": STATUS_IN_PROGRESS,
            }

        async with aiofiles.open(meta_path, "w") as f:
            await f.write(json.dumps(meta_data, indent=2))

    async def finalize_trace(
        self,
        trace_id: str,
        status: str = STATUS_COMPLETE,
    ) -> None:
        """Mark a trace as complete or failed.

        Updates the metadata file with the final status.

        Args:
            trace_id: The trace to finalize.
            status: Final status ("complete" or "failed").

        Raises:
            KeyError: If trace_id doesn't exist.
            IOError: If writing fails.
        """
        meta_path = self._meta_path(trace_id)

        if not meta_path.exists():
            msg = f"Trace not found: {trace_id}"
            raise KeyError(msg)

        async with aiofiles.open(meta_path) as f:
            content = await f.read()
            meta_data = json.loads(content)

        meta_data["status"] = status

        async with aiofiles.open(meta_path, "w") as f:
            await f.write(json.dumps(meta_data, indent=2))

    async def load_trace(self, trace_id: str) -> AsyncIterator[TraceEvent]:
        """Load all events from a trace for replay.

        Yields events in sequence order. Memory-efficient for large traces
        since it streams line by line.

        Args:
            trace_id: The trace to load.

        Yields:
            TraceEvent objects in sequence order.

        Raises:
            KeyError: If trace_id doesn't exist.
            IOError: If reading fails.
        """
        events_path = self._events_path(trace_id)

        if not events_path.exists():
            msg = f"Trace not found: {trace_id}"
            raise KeyError(msg)

        async with aiofiles.open(events_path) as f:
            async for line in f:
                line = line.strip()
                if line:
                    yield TraceEvent.model_validate_json(line)

    async def list_traces(self, limit: int = 100) -> list[TraceMeta]:
        """List available traces with metadata.

        Returns traces ordered by creation time (newest first).

        Args:
            limit: Maximum number of traces to return.

        Returns:
            List of TraceMeta for available traces.

        Raises:
            IOError: If listing fails.
        """
        traces: list[TraceMeta] = []

        # Find all meta files
        for meta_path in self.base_path.glob("*.meta.json"):
            try:
                async with aiofiles.open(meta_path) as f:
                    content = await f.read()
                    meta_data = json.loads(content)
                    traces.append(TraceMeta.model_validate(meta_data))
            except (KeyError, ValueError):
                # Skip malformed meta files (covers json.JSONDecodeError and
                # pydantic's ValidationError, both ValueError subclasses)
                continue

        # Sort by creation time (newest first)
        traces.sort(key=lambda m: m.created_at, reverse=True)

        return traces[:limit]

    async def delete_trace(self, trace_id: str) -> None:
        """Delete a trace and all its files.

        Removes both the events file and metadata file.

        Args:
            trace_id: The trace to delete.

        Raises:
            KeyError: If trace_id doesn't exist.
            IOError: If deletion fails.
        """
        events_path = self._events_path(trace_id)
        meta_path = self._meta_path(trace_id)

        if not events_path.exists() and not meta_path.exists():
            msg = f"Trace not found: {trace_id}"
            raise KeyError(msg)

        if events_path.exists():
            await aiofiles.os.remove(events_path)

        if meta_path.exists():
            await aiofiles.os.remove(meta_path)

    async def get_trace_meta(self, trace_id: str) -> TraceMeta | None:
        """Get metadata for a specific trace.

        Args:
            trace_id: The trace to get metadata for.

        Returns:
            TraceMeta if found, None otherwise.
        """
        meta_path = self._meta_path(trace_id)

        if not meta_path.exists():
            return None

        async with aiofiles.open(meta_path) as f:
            content = await f.read()
            meta_data = json.loads(content)
            return TraceMeta.model_validate(meta_data)
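

# Illustrative sketch (not exported): replay the most recent trace on disk
# using only the methods defined above. The directory and print format are
# assumptions.
async def _example_replay_latest(base_path: str = ".bond/traces") -> None:
    store = JSONFileTraceStore(base_path)
    traces = await store.list_traces(limit=1)  # newest first
    if not traces:
        return
    async for event in store.load_trace(traces[0].trace_id):
        print(f"{event.sequence:04d} {event.event_type}: {event.payload}")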


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────────────── src/bond/trace/capture.py ───────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Capture handler factory for trace recording.

Creates StreamHandlers that record all events to a trace storage backend
for later replay and analysis.
"""

from __future__ import annotations

import asyncio
import time
import uuid
from datetime import UTC, datetime
from typing import Any

from bond.agent import StreamHandlers
from bond.trace._models import (
    EVENT_BLOCK_END,
    EVENT_BLOCK_START,
    EVENT_COMPLETE,
    EVENT_TEXT_DELTA,
    EVENT_THINKING_DELTA,
    EVENT_TOOL_CALL_DELTA,
    EVENT_TOOL_EXECUTE,
    EVENT_TOOL_RESULT,
    STATUS_COMPLETE,
    TraceEvent,
)
from bond.trace._protocols import TraceStorageProtocol


def create_capture_handlers(
    storage: TraceStorageProtocol,
    trace_id: str | None = None,
) -> tuple[StreamHandlers, str]:
    """Create handlers that capture all events to storage.

    Returns handlers that can be passed to agent.ask() along with the
    trace ID for later replay. All 8 StreamHandlers callbacks are wired
    to record events with sequence numbers for ordering.

    Args:
        storage: Backend to store events (e.g., JSONFileTraceStore).
        trace_id: Optional trace ID (auto-generated UUID if None).

    Returns:
        Tuple of (handlers, trace_id) - use handlers with agent.ask(),
        keep trace_id for later replay.

    Example:
        ```python
        store = JSONFileTraceStore()
        handlers, trace_id = create_capture_handlers(store)
        result = await agent.ask("query", handlers=handlers)
        await finalize_capture(store, trace_id)

        # Later: replay with trace_id
        async for event in store.load_trace(trace_id):
            print(event)
        ```
    """
    if trace_id is None:
        trace_id = str(uuid.uuid4())

    # Mutable state for closure
    sequence = [0]  # List to allow mutation in nested function
    start_time = time.monotonic()
    pending_saves: set[asyncio.Task[None]] = set()  # Strong refs to in-flight writes

    def _record(event_type: str, payload: dict[str, Any]) -> None:
        """Record an event to storage.

        Called from sync callbacks, schedules async save.
        """
        event = TraceEvent(
            trace_id=trace_id,
            sequence=sequence[0],
            timestamp=time.monotonic() - start_time,
            wall_time=datetime.now(UTC),
            event_type=event_type,
            payload=payload,
        )
        sequence[0] += 1

        # Schedule async save from sync callback, keeping a strong reference
        # so the task isn't garbage-collected before the write completes
        # (the event loop only holds weak references to tasks).
        try:
            loop = asyncio.get_running_loop()
            task = loop.create_task(storage.save_event(event))
            pending_saves.add(task)
            task.add_done_callback(pending_saves.discard)
        except RuntimeError:
            # No running loop - shouldn't happen in normal usage
            pass

    return (
        StreamHandlers(
            on_block_start=lambda kind, idx: _record(
                EVENT_BLOCK_START,
                {"kind": kind, "index": idx},
            ),
            on_block_end=lambda kind, idx: _record(
                EVENT_BLOCK_END,
                {"kind": kind, "index": idx},
            ),
            on_text_delta=lambda text: _record(
                EVENT_TEXT_DELTA,
                {"text": text},
            ),
            on_thinking_delta=lambda text: _record(
                EVENT_THINKING_DELTA,
                {"text": text},
            ),
            on_tool_call_delta=lambda name, args: _record(
                EVENT_TOOL_CALL_DELTA,
                {"name": name, "args": args},
            ),
            on_tool_execute=lambda tool_id, name, args: _record(
                EVENT_TOOL_EXECUTE,
                {"id": tool_id, "name": name, "args": args},
            ),
            on_tool_result=lambda tool_id, name, result: _record(
                EVENT_TOOL_RESULT,
                {"id": tool_id, "name": name, "result": result},
            ),
            on_complete=lambda data: _record(
                EVENT_COMPLETE,
                {"data": data},
            ),
        ),
        trace_id,
    )


async def finalize_capture(
    storage: TraceStorageProtocol,
    trace_id: str,
    status: str = STATUS_COMPLETE,
) -> None:
    """Mark a trace as complete after agent.ask() returns.

    Should be called after the agent run finishes to update the trace
    metadata with the final status.

    Args:
        storage: The storage backend used for capture.
        trace_id: The trace ID from create_capture_handlers().
        status: Final status ("complete" or "failed").

    Example:
        ```python
        handlers, trace_id = create_capture_handlers(store)
        try:
            result = await agent.ask("query", handlers=handlers)
            await finalize_capture(store, trace_id, "complete")
        except Exception:
            await finalize_capture(store, trace_id, "failed")
            raise
        ```
    """
    await storage.finalize_trace(trace_id, status)


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
─────────────────────────────────────────────────────────────────── src/bond/trace/replay.py ───────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Trace replayer for stepping through stored events.

Provides TraceReplayer class for iterating through stored traces
event by event, with support for manual stepping and seeking.
"""

from __future__ import annotations

from collections.abc import AsyncIterator

from bond.trace._models import TraceEvent
from bond.trace._protocols import TraceStorageProtocol


class TraceReplayer:
    """Replay a stored trace event by event.

    Supports both async iteration and manual stepping through events.
    Events are loaded on-demand and cached for stepping operations.

    Example (async iteration):
        replayer = TraceReplayer(storage, trace_id)
        async for event in replayer:
            print(f"{event.event_type}: {event.payload}")

    Example (manual stepping, preserving original pacing):
        replayer = TraceReplayer(storage, trace_id)
        prev_ts = 0.0
        while event := await replayer.step():
            await asyncio.sleep(event.timestamp - prev_ts)  # delay since previous event
            prev_ts = event.timestamp
            print(event)

    Example (seeking):
        replayer = TraceReplayer(storage, trace_id)
        await replayer.seek(10)  # Jump to event 10
        event = await replayer.step()
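
    Example (progress display; a small sketch using the position helpers):
        replayer = TraceReplayer(storage, trace_id)
        while event := await replayer.step():
            total = replayer.total_events or 0
            print(f"[{replayer.position}/{total}] {event.event_type}")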
    """

    def __init__(self, storage: TraceStorageProtocol, trace_id: str) -> None:
        """Initialize replayer for a trace.

        Args:
            storage: Backend containing the trace.
            trace_id: ID of the trace to replay.
        """
        self.storage = storage
        self.trace_id = trace_id
        self._events: list[TraceEvent] | None = None
        self._position: int = 0

    async def _load(self) -> None:
        """Load all events into memory for stepping.

        Called automatically by step/seek operations.
        """
        if self._events is None:
            self._events = [e async for e in self.storage.load_trace(self.trace_id)]

    async def __aiter__(self) -> AsyncIterator[TraceEvent]:
        """Iterate through all events.

        Streams directly from storage without loading all events
        into memory first.

        Yields:
            TraceEvent objects in sequence order.
        """
        async for event in self.storage.load_trace(self.trace_id):
            yield event

    async def step(self) -> TraceEvent | None:
        """Get the next event in the trace.

        Returns:
            The next TraceEvent, or None if at end of trace.
        """
        await self._load()
        assert self._events is not None
        if self._position >= len(self._events):
            return None
        event = self._events[self._position]
        self._position += 1
        return event

    async def step_back(self) -> TraceEvent | None:
        """Go back one event.

        Returns:
            The previous TraceEvent, or None if at start of trace.
        """
        await self._load()
        assert self._events is not None
        if self._position <= 0:
            return None
        self._position -= 1
        return self._events[self._position]

    @property
    def position(self) -> int:
        """Current position in trace (0-indexed).

        Returns:
            The current event index.
        """
        return self._position

    @property
    def total_events(self) -> int | None:
        """Total number of events in trace.

        Returns:
            Event count if loaded, None if not yet loaded.
        """
        return len(self._events) if self._events is not None else None

    async def seek(self, position: int) -> TraceEvent | None:
        """Jump to a specific position in the trace.

        Args:
            position: The event index to seek to (0-indexed).

        Returns:
            The event at that position, or None if position is at/past end.
        """
        await self._load()
        assert self._events is not None
        self._position = max(0, min(position, len(self._events)))
        if self._position < len(self._events):
            return self._events[self._position]
        return None

    async def reset(self) -> None:
        """Reset to the beginning of the trace."""
        self._position = 0

    async def current(self) -> TraceEvent | None:
        """Get the current event without advancing position.

        Returns:
            The current TraceEvent, or None if at end.
        """
        await self._load()
        assert self._events is not None
        if self._position < len(self._events):
            return self._events[self._position]
        return None


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
────────────────────────────────────────────────────────────────────── src/bond/utils.py ───────────────────────────────────────────────────────────────────────


────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
"""Utility functions for Bond agents.

Includes helpers for WebSocket/SSE streaming integration.
"""

from collections.abc import Awaitable, Callable
from typing import Any, Protocol

from bond.agent import StreamHandlers


class WebSocketProtocol(Protocol):
    """Protocol for WebSocket-like objects."""

    async def send_json(self, data: dict[str, Any]) -> None:
        """Send JSON data over the WebSocket."""
        ...


def create_websocket_handlers(
    send: Callable[[dict[str, Any]], Awaitable[None]],
) -> StreamHandlers:
    """Create StreamHandlers that send events over WebSocket/SSE.

    This creates handlers that serialize all streaming events to JSON
    and send them via the provided async send function.

    Args:
        send: Async function to send JSON data (e.g., ws.send_json).

    Returns:
        StreamHandlers configured for WebSocket streaming.

    Example:
        ```python
        async def websocket_handler(ws: WebSocket):
            handlers = create_websocket_handlers(ws.send_json)
            await agent.ask("Check the database", handlers=handlers)
        ```

    Message Types:
        ```json
        {"t": "block_start", "kind": str, "idx": int}
        {"t": "block_end", "kind": str, "idx": int}
        {"t": "text", "c": str}
        {"t": "thinking", "c": str}
        {"t": "tool_delta", "n": str, "a": str}
        {"t": "tool_exec", "id": str, "name": str, "args": dict}
        {"t": "tool_result", "id": str, "name": str, "result": str}
        {"t": "complete", "data": Any}
        ```
    """
    # The StreamHandlers callbacks are synchronous, so async sends are
    # scheduled on the running event loop rather than awaited directly.
    import asyncio

    _pending: set[asyncio.Task[None]] = set()

    def _send_sync(data: dict[str, Any]) -> None:
        """Schedule async send from sync callback."""
        try:
            loop = asyncio.get_running_loop()
            task = loop.create_task(send(data))  # type: ignore[arg-type]
            # Keep a strong reference until the send completes; the event
            # loop only holds weak references to scheduled tasks.
            _pending.add(task)
            task.add_done_callback(_pending.discard)
        except RuntimeError:
            # No running loop - this shouldn't happen in normal usage
            pass

    return StreamHandlers(
        on_block_start=lambda kind, idx: _send_sync(
            {
                "t": "block_start",
                "kind": kind,
                "idx": idx,
            }
        ),
        on_block_end=lambda kind, idx: _send_sync(
            {
                "t": "block_end",
                "kind": kind,
                "idx": idx,
            }
        ),
        on_text_delta=lambda txt: _send_sync(
            {
                "t": "text",
                "c": txt,
            }
        ),
        on_thinking_delta=lambda txt: _send_sync(
            {
                "t": "thinking",
                "c": txt,
            }
        ),
        on_tool_call_delta=lambda name, args: _send_sync(
            {
                "t": "tool_delta",
                "n": name,
                "a": args,
            }
        ),
        on_tool_execute=lambda tool_id, name, args: _send_sync(
            {
                "t": "tool_exec",
                "id": tool_id,
                "name": name,
                "args": args,
            }
        ),
        on_tool_result=lambda tool_id, name, result: _send_sync(
            {
                "t": "tool_result",
                "id": tool_id,
                "name": name,
                "result": result,
            }
        ),
        on_complete=lambda data: _send_sync(
            {
                "t": "complete",
                "data": data,
            }
        ),
    )


def create_sse_handlers(
    send: Callable[[str, dict[str, Any]], Awaitable[None]],
) -> StreamHandlers:
    r"""Create StreamHandlers for Server-Sent Events (SSE).

    Similar to WebSocket handlers but uses SSE event format.

    Args:
        send: Async function to send SSE event (event_type, data).

    Returns:
        StreamHandlers configured for SSE streaming.

    Example:
        ```python
        async def sse_handler(request):
            async def send_sse(event: str, data: dict):
                await response.write(f"event: {event}\\ndata: {json.dumps(data)}\\n\\n")

            handlers = create_sse_handlers(send_sse)
            await agent.ask("Query", handlers=handlers)
        ```
    """
    import asyncio

    _pending: set[asyncio.Task[None]] = set()

    def _send_sync(event: str, data: dict[str, Any]) -> None:
        """Schedule an async SSE send from a sync callback."""
        try:
            loop = asyncio.get_running_loop()
            task = loop.create_task(send(event, data))  # type: ignore[arg-type]
            # Strong reference prevents the task from being garbage-collected
            # before the send completes.
            _pending.add(task)
            task.add_done_callback(_pending.discard)
        except RuntimeError:
            # No running loop - shouldn't happen in normal usage
            pass

    return StreamHandlers(
        on_block_start=lambda kind, idx: _send_sync("block_start", {"kind": kind, "idx": idx}),
        on_block_end=lambda kind, idx: _send_sync("block_end", {"kind": kind, "idx": idx}),
        on_text_delta=lambda txt: _send_sync("text", {"content": txt}),
        on_thinking_delta=lambda txt: _send_sync("thinking", {"content": txt}),
        on_tool_call_delta=lambda n, a: _send_sync("tool_delta", {"name": n, "args": a}),
        on_tool_execute=lambda i, n, a: _send_sync("tool_exec", {"id": i, "name": n, "args": a}),
        on_tool_result=lambda i, n, r: _send_sync("tool_result", {"id": i, "name": n, "result": r}),
        on_complete=lambda data: _send_sync("complete", {"data": data}),
    )


def create_print_handlers(
    *,
    show_thinking: bool = False,
    show_tool_args: bool = False,
) -> StreamHandlers:
    """Create StreamHandlers that print to console.

    Useful for CLI applications and debugging.

    Args:
        show_thinking: Whether to print thinking/reasoning content.
        show_tool_args: Whether to print tool argument deltas.

    Returns:
        StreamHandlers configured for console output.

    Example:
        ```python
        handlers = create_print_handlers(show_thinking=True)
        await agent.ask("Hello", handlers=handlers)
        ```
    """
    return StreamHandlers(
        on_block_start=lambda kind, idx: print(f"\n[{kind} block #{idx}]", end=""),
        on_text_delta=lambda txt: print(txt, end="", flush=True),
        on_thinking_delta=(
            (lambda txt: print(f"[think: {txt}]", end="", flush=True)) if show_thinking else None
        ),
        on_tool_call_delta=(
            (lambda n, a: print(f"[tool: {n}{a}]", end="", flush=True)) if show_tool_args else None
        ),
        on_tool_execute=lambda i, name, args: print(f"\n[Running {name}...]", flush=True),
        on_tool_result=lambda i, name, res: print(
            f"[{name} returned: {res[:100]}{'...' if len(res) > 100 else ''}]",
            flush=True,
        ),
        on_complete=lambda data: print("\n[Complete]", flush=True),
    )
