Metadata-Version: 2.4
Name: rao_agent
Version: 1.0.2.post2344
Summary: Base framework for building RAO (Retrieval-Augmented Orchestrator) agents on the Progress Agentic RAG platform
License: MIT License
Project-URL: Homepage, https://progress.com
Keywords: rag,agent,nuclia,llm,retrieval-augmented-generation,retrieval-augmented-output,ai,generative-ai
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: nuclia
Requires-Dist: nuclia-models
Requires-Dist: nucliadb-sdk
Requires-Dist: nucliadb_telemetry[otel]
Requires-Dist: jinja2
Requires-Dist: aiodns
Requires-Dist: httpx

# rao_agent

[![PyPI version](https://img.shields.io/pypi/v/rao_agent.svg)](https://pypi.org/project/rao_agent/)
[![Python](https://img.shields.io/pypi/pyversions/rao_agent.svg)](https://pypi.org/project/rao_agent/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

Base framework for building **RAO (Retrieval-Augmented Orchestrator) agents** on the [PARAG](https://rag.progress.cloud/) platform.

`rao_agent` provides the abstract base classes, data models, and shared utilities that every custom RAO agent needs. It is intentionally minimal (it contains no business logic), so you only pull in the pieces relevant to your agent.

`rao_agent` belongs to Progress Agentic RAG (PARAG). This README follows Progress/PARAG product branding, while code-level Python package and module names (for example, `nuclia_arag`, `nuclia`, and `nucliadb_*`) keep their current technical namespaces.

See the PARAG documentation for product and ecosystem context: https://docs.rag.progress.cloud/

---

## Table of Contents

- [Installation](#installation)
- [Core Concepts](#core-concepts)
- [Quick Start](#quick-start)
- [Implementing the Abstractions](#implementing-the-abstractions)
  - [Agent](#agent)
  - [ContextAgent](#contextagent)
  - [QuestionMemory](#questionmemory)
  - [Driver](#driver)
- [Agent Registration](#agent-registration)
- [PARAG NUA Primitives](#parag-nua-primitives)
- [Testing with Built-in Fixtures](#testing-with-built-in-fixtures)
- [OpenTelemetry Tracing](#opentelemetry-tracing)
- [License](#license)

---

## Installation

```bash
pip install rao_agent
```

Requires Python 3.10 or later.

### Optional: full Progress Agentic RAG integration

The package works standalone, but installing `nuclia_arag` unlocks centralized agent/driver registration used by the Progress Agentic RAG server:

```bash
pip install rao_agent nuclia_arag
```

When `nuclia_arag` is not installed, a lightweight fallback implementation is used automatically (with a warning logged at import time).
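The optional-dependency pattern behind this fallback can be sketched as follows (module and flag names here are illustrative, not `rao_agent`'s actual internals):

```python
import logging

logger = logging.getLogger("rao_agent.configure")

# Probe for the optional integration package; fall back to a local
# in-process registry when it is absent (names are illustrative).
try:
    import nuclia_arag  # noqa: F401
    HAVE_NUCLIA_ARAG = True
except ImportError:
    HAVE_NUCLIA_ARAG = False
    logger.warning(
        "nuclia_arag is not installed; using the lightweight "
        "in-process agent/driver registry instead"
    )
```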

---

## Core Concepts

| Class / Module | File | Purpose |
|---|---|---|
| `Agent` | `agent.py` | Abstract base for all agent types. Implement `__call__` to execute your agent logic. |
| `ContextAgent` | `context/agent.py` | Specialised `Agent` that fetches data from an external source, validates the context against the user's question, and chains to fallback/next agents. Implement `_get_question_context`. |
| `Manager` | `manager.py` | Facade over the PARAG NUA client (implemented via existing `nuclia` namespaces). Provides LLM calls (`execute`, `execute_json`, `execute_raw`), reranking (`rerank`), token counting, and REMI evaluation. |
| `QuestionMemory` | `memory.py` | Conversation state bag. Holds the question, accumulated context, steps, answers, and chat history for one interaction turn. Must be subclassed. |
| `Driver` | `driver.py` | Adaptor for an external data source (database, API, …). Implement `init()` to establish a connection from a `DriverConfig`. |
| pub/sub messages | `pubsub.py` | Pydantic wire-protocol models used between the ARAG API server and agent worker processes (`StartInteraction`, `AgentAnswer`, `AgentDone`, …). |
| `AragAnswer` | `interaction.py` | Streaming response envelope sent from an agent to the client (answer text, citations, steps, data visualisations, OAuth requests, feedback requests, …). |

---

## Quick Start

### Scaffold a new agent

`rao_agent` ships with a CLI tool that generates the boilerplate for a new agent:

```bash
rao_generate MyAgent
```

This creates a directory `myagent/` with a ready-to-edit `pyproject.toml`, `src/rao_agent_myagent/` package, and a `tests/` directory.

### Minimal custom agent

```python
from rao_agent.agent import Agent, AgentConfig
from rao_agent.manager import Manager
from rao_agent.memory import QuestionMemory


class GreetingAgentConfig(AgentConfig):
    greeting: str = "Hello"


class GreetingAgent(Agent[GreetingAgentConfig]):
    async def inner_from_config(self, config: GreetingAgentConfig, agent_id=None):
        pass  # no async setup needed

    async def __call__(self, memory: QuestionMemory, manager: Manager):
        answer = f"{self.config.greeting}, you asked: {memory.original_question}"
        await memory.add_answer(answer, module="greeting", agent_path="/greeting")
        await memory.add_final_answer()
        await memory.send_final_answer()
```

### Minimal context agent

```python
from typing import Optional
from rao_agent.context.agent import ContextAgent
from rao_agent.context.config import ContextAgentConfig
from rao_agent.manager import Manager
from rao_agent.memory import Chunk, Context, QuestionMemory


class MyContextAgentConfig(ContextAgentConfig):
    model_config = {"title": "My Data Source"}
    my_param: str = "default"


class MyContextAgent(ContextAgent):
    agent_description = "Fetches data from My Data Source to answer questions."

    async def _get_question_context(
        self,
        memory: QuestionMemory,
        manager: Manager,
        question_uuid: str,
        question: str,
        flow_id: str,
        extra_context=None,
    ):
        # Fetch data from your source and build a Context object
        chunks = [Chunk(chunk_id="c1", text="Relevant data here...")]
        context = Context(
            original_question_uuid=memory.original_question_uuid,
            actual_question_uuid=question_uuid,
            question=question,
            chunks=chunks,
            source="my_data_source",
            agent=self.agent_id,
        )
        missing = await self.save_ctx_and_return_missing(
            memory=memory,
            manager=manager,
            question=question,
            context=context,
            flow_id=flow_id,
        )
        # Return a list of (uuid, question) for any missing information
        return [missing] if missing else []
```

---

## Implementing the Abstractions

### Agent

`Agent[T_Config]` is a generic base class. `T_Config` must be a subclass of `AgentConfig`.

```python
class AgentConfig(BaseModel):
    id: Optional[str]          # auto-generated UUID if not set
    title: str                 # display name shown in step traces
    rules: Optional[List[str]] # optional rules forwarded to the LLM
    max_retries: int           # default 1
    module: Any                # agent type identifier (set by the framework)
```

**Required methods to implement:**

| Method | Signature | Description |
|---|---|---|
| `inner_from_config` | `async (config, agent_id) -> None` | Async initialisation (e.g. connect to services). Called by `from_config`. |
| `__call__` | `async (memory, manager) -> None` | Main execution entry point. Write context to `memory` and call `memory.add_final_answer()` / `memory.send_final_answer()`. |

**Utility method:**

```python
self.step_title("Fetching data")
# → "My Agent: Fetching data"  (uses config title)
```

### ContextAgent

`ContextAgent` extends `Agent` with context retrieval, validation, and chaining.

**Required method to implement:**

| Method | Signature | Description |
|---|---|---|
| `_get_question_context` | `async (memory, manager, question_uuid, question, flow_id, extra_context) -> list[tuple[str, str]]` | Fetch data, build a `Context`, call `save_ctx_and_return_missing`, return any missing sub-questions as `(uuid, question)` tuples. |

**Optional class attributes:**

```python
class MyContextAgent(ContextAgent):
    agent_description = "One-sentence description used by the rephrase LLM."
    exposed_functions: Optional[List[str]] = None  # function names to expose to other agents
```

**Configuration fields** (`ContextAgentConfig`):

| Field | Default | Description |
|---|---|---|
| `context_validation_model` | `"chatgpt-azure-4o-mini"` | LLM used to validate context and attempt an answer |
| `rephrase_model` | `"chatgpt-azure-4o-mini"` | LLM used to rephrase questions when chaining agents |
| `context_aware_rephrasing_prompt` | `None` | Custom system prompt for the rephrase step |
| `prune_context` | `True` | Remove non-cited chunks after context validation |
| `fallback` | `None` | `ContextAgentConfig` for a fallback agent |
| `next_agent` | `None` | `ContextAgentConfig` for the next agent in a chain |
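The `fallback` and `next_agent` fields form a chain of agent configurations. A rough, self-contained sketch of how such a chain could be walked — an illustration of the chaining idea only, not the framework's actual control flow:

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ChainNode:
    """Simplified stand-in for a ContextAgentConfig in a chain."""
    name: str
    fallback: Optional["ChainNode"] = None
    next_agent: Optional["ChainNode"] = None


def walk_chain(node: Optional[ChainNode], fetch: Callable[[str], str]) -> list[str]:
    """Try each agent; if it yields no context, descend into its
    fallback, then always continue with next_agent."""
    collected: list[str] = []
    while node is not None:
        ctx = fetch(node.name)
        if ctx:
            collected.append(ctx)
        elif node.fallback is not None:
            collected.extend(walk_chain(node.fallback, fetch))
        node = node.next_agent
    return collected


# Example chain: primary -> (fallback: archive), then next_agent: web
chain = ChainNode("primary", fallback=ChainNode("archive"),
                  next_agent=ChainNode("web"))
sources = {"archive": "old data", "web": "fresh data"}  # primary has nothing
print(walk_chain(chain, lambda name: sources.get(name, "")))
# → ['old data', 'fresh data']
```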

### QuestionMemory

`QuestionMemory` holds all state for a single interaction turn. It is **abstract** — you must subclass it and implement all `@abc.abstractmethod` methods.

**Required abstract methods:**

| Method | Return type | Description |
|---|---|---|
| `context_user_info()` | `str` | String describing the current user (for LLM personalisation) |
| `get_session_id()` | `str` | Unique identifier for the conversation session |
| `get_agent_id()` | `str` | Identifier of the active agent |
| `get_workflow_id()` | `str` | Identifier of the active workflow |
| `get_rules()` | `list[Rule \| str]` | Rules to inject into LLM prompts |
| `search_in_questions()` | `KnowledgeboxFindResults` | Semantic search over conversation history |
| `user_info()` | `Dict[str, str]` | Arbitrary user metadata |
| `set_session_source()` | — | Persist a `Source` for the session |
| `get_session_source()` | `Optional[Source]` | Retrieve a persisted `Source` |
| `context_history()` | `tuple[str, int]` | Formatted history string + token count |
| `get_chat_history()` | `list[Message]` | Full chat history for the LLM |
| `save_context()` | — | Persist a `Context` object |
| `save_image_urls()` | — | Persist image URLs |
| `get_agent_contexts()` | `list[Context]` | Retrieve saved contexts for an agent |
| `get_agent_answer_summaries()` | `list[str]` | Retrieve answer summaries for an agent |
| `list_contexts_markdown()` | `list[str]` | Contexts as markdown strings |
| `list_chunks_markdown()` | `list[str]` | Individual chunks as markdown strings |
| `contexts_markdown()` | `str` | Concatenated contexts as markdown |
| `list_contexts_minimal()` | `list[str]` | Minimal (summary-preferred) context list |
| `contexts_minimal()` | `str` | Concatenated minimal contexts |
| `get_prompt_texts()` | `list[str]` | Rendered prompt texts for all contexts |
| `add_generated_text()` | — | Store raw LLM output |
| `add_step()` | — | Record an intermediate reasoning step |
| `set_actual_question()` | — | Update the current working question |
| `add_future_questions()` | — | Store follow-up questions for the user |
| `add_context_questions()` | — | Store sub-questions generated during retrieval |
| `get_questions()` | `list[tuple[str, str]]` | Return context sub-questions (or the original) |
| `add_answer()` | — | Record a candidate answer |
| `add_final_answer()` | — | Finalise the answer |
| `send_final_answer()` | — | Dispatch the final answer to the user |
| `send_oauth()` | — | Initiate an OAuth flow |
| `recv_oauth_callback()` | `Optional[str]` | Receive OAuth credentials |
| `send_feedback()` | `Optional[UserToAgentInteraction]` | Send a feedback/confirmation prompt to the user |
| `save()` | — | Persist memory to durable storage |

A complete mock implementation is available in `rao_agent.fixtures.QuestionMemory` — useful as a starting point for tests or development.

### Driver

A `Driver` connects to an external data source. Subclass `Driver` and implement `init()`:

```python
from typing import Any, Self
from rao_agent.driver import Driver, DriverConfig, EncryptedPayload
from pydantic import BaseModel


class MyDBConfig(EncryptedPayload):
    connection_string: str


class MyDBDriver(Driver):
    name: str = "my_db"
    provider: str = "my_db"
    connection: Any = None  # your actual connection object

    @classmethod
    async def init(cls, driver: DriverConfig) -> Self:
        instance = cls(name=driver.name, provider=driver.provider)
        # connect_to_db is a placeholder for your client library's connect call
        instance.connection = await connect_to_db(driver.config.connection_string)
        return instance
```

Register the driver so the `Manager` can discover it (see [Agent Registration](#agent-registration)).

---

## Agent Registration

The `@agent` and `@driver` decorators from `rao_agent.configure` register your classes with the framework:

```python
from rao_agent.configure import agent, driver

@agent(id="my_agent", config_schema=MyContextAgentConfig)
class MyContextAgent(ContextAgent):
    ...

@driver(id="my_db", config_schema=MyDBConfig)
class MyDBDriver(Driver):
    ...
```

When `nuclia_arag` is installed, registration is global and used by the PARAG ARAG server to instantiate agents from stored configuration. Without it, the lightweight fallback stores registrations in module-level dicts (`AGENTS`, `DRIVERS`).
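The fallback registry is conceptually just decorators populating module-level dicts. A minimal self-contained sketch of that pattern (not the actual `rao_agent` code):

```python
# Simplified stand-ins for the module-level registries.
AGENTS: dict[str, tuple[type, type]] = {}
DRIVERS: dict[str, tuple[type, type]] = {}


def agent(id: str, config_schema: type):
    """Register an agent class under `id` together with its config schema."""
    def decorator(cls: type) -> type:
        AGENTS[id] = (cls, config_schema)
        return cls
    return decorator


def driver(id: str, config_schema: type):
    """Register a driver class under `id` together with its config schema."""
    def decorator(cls: type) -> type:
        DRIVERS[id] = (cls, config_schema)
        return cls
    return decorator


class EchoConfig: ...


@agent(id="echo", config_schema=EchoConfig)
class EchoAgent: ...


print("echo" in AGENTS)  # → True
```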

---

## PARAG NUA Primitives

`rao_agent` uses PARAG NUA primitives through `Manager`. These types are part of the execution contract between your agent code and PARAG's generation/evaluation APIs.

| Primitive | Source | Used by |
|---|---|---|
| `ChatModel` | `nuclia.lib.nua_responses` | `Manager.execute_raw()` and low-level generation payloads |
| `UserPrompt` | `nuclia.lib.nua_responses` | `Manager.execute()`, `execute_json()`, `execute_from_context()` |
| `Message` | `nuclia.lib.nua_responses` | Chat history in `Manager.execute()` |
| `Image` | `nuclia.lib.nua_responses` | Vision/context images in execute methods |
| `Tokens` | `nuclia.lib.nua_responses` | `Manager.tokens_predict()` |
| `RerankModel`, `RerankResponse` | `nuclia.lib.nua_responses` | `Manager.rerank()` |
| `RemiRequest`, `RemiResponse` | `nuclia_models.predict.remi` | `Manager.remi()` |

Minimal flow from an agent to NUA:

```python
answer, input_tokens, output_tokens, code = await manager.execute(
    prompt="Answer the question using available context.",
    user_id=memory.get_session_id(),
    model="chatgpt-azure-4o-mini",
    chat_history=await memory.get_chat_history(),
)
```

Compatibility note:

- These primitives come from upstream PARAG runtime libraries, currently published under `nuclia`-prefixed package names (`nuclia`, `nuclia-models`).
- Changes in those libraries can affect payload formats or validation behavior.
- Keep `rao_agent` and PARAG runtime dependency versions aligned in production deployments.

---

## Testing with Built-in Fixtures

`rao_agent.fixtures` provides pytest fixtures for testing your agents without a live PARAG account:

```python
# conftest.py
from rao_agent.fixtures import question_memory, manager  # re-export fixtures

# test_my_agent.py
import pytest
from my_agent import MyContextAgent, MyContextAgentConfig


@pytest.mark.asyncio
async def test_my_agent(question_memory):
    question_memory.original_question = "What is Progress Agentic RAG?"

    config = MyContextAgentConfig(
        module="my_agent",
        context_validation_model="chatgpt-azure-4o-mini",
    )
    agent = await MyContextAgent.from_config(config)

    # manager fixture requires NUA_KEY env var and a driver config
    # For unit tests, mock the manager instead:
    from unittest.mock import AsyncMock
    mock_manager = AsyncMock()
    mock_manager.execute_json.return_value = (
        {"answer": "Progress Agentic RAG is a retrieval-augmented platform.", "useful": "yes",
         "missing_info_query": "", "reason": "context matches", "citations": []},
        10.0, 5.0,
    )

    await agent(question_memory, mock_manager)
    assert question_memory.final_answer is not None
```

The `manager` fixture (parameterised) requires a `NUA_KEY` environment variable and a driver config tuple:

```python
@pytest.mark.asyncio
@pytest.mark.parametrize("manager", [("my_db", {"connection_string": "..."})], indirect=True)
async def test_with_real_manager(question_memory, manager):
    ...
```

---

## OpenTelemetry Tracing

Wrap any agent method with `@trace_agent` to get automatic span creation and metrics:

```python
from rao_agent.trace import trace_agent

class MyContextAgent(ContextAgent):
    @trace_agent
    async def _get_question_context(self, ...):
        ...
```

The tracer reads `RAO_SERVICE_NAME` from the environment (default: `"nuclia_arag_server"`) and integrates with `nucliadb_telemetry`.

---

## License

Released under the [MIT License](https://opensource.org/licenses/MIT).
