Metadata-Version: 2.4
Name: blend-agents-observability
Version: 1.0.2
Summary: Observability logger for Blend360 internal agent workflows.
Author-email: Simon Calderon <simon.calderon@blend360.com>
License-Expression: GPL-3.0-only
Project-URL: Homepage, https://github.com/ai-scm/AgentObservability
Project-URL: Documentation, https://github.com/ai-scm/AgentObservability
Project-URL: Repository, https://github.com/ai-scm/AgentObservability
Project-URL: Issues, https://github.com/ai-scm/AgentObservability/issues
Keywords: observability,agents,ai,monitoring,tracing,blend360
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Monitoring
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: boto3>=1.34.0
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: pytest-mock>=3.10.0; extra == "dev"
Requires-Dist: hypothesis>=6.0.0; extra == "dev"
Requires-Dist: moto>=5.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: ruff>=0.1.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Requires-Dist: boto3-stubs[kinesis]>=1.34.0; extra == "dev"
Dynamic: license-file

# Blend Agents Observability

[![PyPI version](https://img.shields.io/pypi/v/blend-agents-observability.svg)](https://pypi.org/project/blend-agents-observability/)
[![License](https://img.shields.io/pypi/l/blend-agents-observability.svg)](https://pypi.org/project/blend-agents-observability/)

An observability library for instrumenting multi-step AI agent systems. It captures agent execution graphs (traces, nodes, and edges) and streams them out for processing and visualization.

> **Note:** This package is owned by **Blend360** and is intended for **internal usage only**.

## Features

- **Manual Instrumentation**: Explicit control over trace creation and node lifecycle for precise observability.
- **AWS Kinesis Integration**: Stream observability events directly to AWS Kinesis Data Streams.
- **Type-Safe Events**: Leverages Pydantic for robust event validation, ensuring data integrity.
- **Resilient by Design**: Gracefully handles errors and fails silently, preventing observability from impacting your application's stability.
- **Detailed Agent Tracking**: Capture fine-grained details of agent execution, including reasoning steps and tool usage.
- **Parallel Workflow Support**: Built-in support for tracing parallel execution branches and sub-traces.
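
The "fails silently" behavior listed above can be pictured as a decorator that swallows and logs exceptions. This is only a sketch of the general pattern, not the library's actual implementation; `fail_silently` and `emit_event` are hypothetical names introduced for illustration.

```python
import functools
import logging

log = logging.getLogger("observability_sketch")


def fail_silently(func):
    """Run func; on any exception, log a warning and return None instead of raising."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:  # deliberately broad: telemetry must not break the caller
            log.warning("observability call %s failed: %s", func.__name__, exc)
            return None
    return wrapper


@fail_silently
def emit_event(event: dict) -> str:
    """Hypothetical emitter that rejects events without a trace_id."""
    if "trace_id" not in event:
        raise ValueError("trace_id is required")
    return f"sent:{event['trace_id']}"
```

With this pattern, a malformed event costs a log line rather than an exception in the calling workflow.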

## Installation

```bash
pip install blend-agents-observability
```

## Quick Start

Here is an example of how to use the logger with Strands Agents:

```python
from observability_logger import AgentLogger, generate_id
from strands import Agent, tool

# 1. Create a trace
trace_id = generate_id("trace_")
logger = AgentLogger(
    trace_id=trace_id,
    title="My Workflow",
    workflow_id="workflow-v1"
)

# 2. Create an agent node
agent_node = logger.strands.agent(
    node_id=generate_id("node_"),
    config={"name": "Assistant", "description": "Helpful agent"}
)

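# 3. Execute agent with callback handler
# The agent_node acts as a callback handler to capture streaming events
agent = Agent(name="Assistant")  # example agent; add tools as needed
prompt = "Summarize the latest run."  # example prompt
agent_node.set_input(prompt)
result = agent(prompt, callback_handler=agent_node)
agent_node.complete(result=result)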

# 4. End the trace
logger.end(status="completed", final_output={"result": result.message})
```

## Concepts

### Traces

A trace represents a single workflow execution. It contains nodes (operations) and edges (connections between nodes).

```python
logger = AgentLogger(
    trace_id=generate_id("trace_"),  # Unique identifier
    title="My Workflow",              # Human-readable title
    workflow_id="workflow-v1",        # Workflow version identifier
    parent_trace_id=None              # Optional parent for hierarchical traces
)
```

### Nodes

Nodes represent individual operations in the workflow. There are four node types:

#### Agent Node

For AI agent execution with real-time step capture:

```python
agent_node = logger.strands.agent(
    node_id=generate_id("node_"),
    config={"name": "Assistant", "description": "Helpful agent"},
    metadata={"tools": ["search", "calculate"]}
)

# Set input before execution
agent_node.set_input(prompt)

# Execute with callback handler for real-time capture
result = agent(prompt, callback_handler=agent_node)

# Complete with result (extracts output and token usage automatically)
agent_node.complete(result=result)
```

#### Router Node

For routing/branching decisions (entry points):

```python
router = logger.router(
    node_id=generate_id("node_"),
    config={"name": "Workflow Router", "description": "Entry point"},
    metadata={"input_length": len(text)}
)

# ... routing logic ...

router.complete(status="completed")
```

#### Parallel Node

For concurrent execution branches:

```python
parallel = logger.parallel(
    node_id=generate_id("node_"),
    config={"name": "Process Items", "description": "Parallel processing"},
    metadata={"parallel_count": 3}
)

# Link to child trace
child_logger = AgentLogger(
    trace_id=generate_id("trace_"),
    parent_trace_id=logger.trace_id
)
parallel.subTrace(child_logger)

# ... child execution ...

parallel.complete(status="completed")
```

#### Miscellaneous Node

For general operations (auto-completed on creation):

```python
output = logger.miscellaneous(
    node_id=generate_id("node_"),
    config={"name": "Final Output", "description": "Format results"},
    content="Processed 100 items successfully",
    metadata={"item_count": 100}
)
# Node is already completed
```

### Edges

Edges define the execution flow between nodes:

```python
router = logger.router(node_id1, config1)
agent = logger.strands.agent(node_id2, config2)
output = logger.miscellaneous(node_id3, config3, content)

# Create edges
logger.edge(router, agent)
logger.edge(agent, output)
```
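To reason about what a set of edges describes, it can help to treat the trace as a directed graph. The sketch below is independent of the library: it uses plain strings as node IDs, and `execution_order` is a hypothetical helper that derives a valid ordering from `(source, target)` pairs like the ones above.

```python
from collections import defaultdict, deque


def execution_order(edges):
    """Return node IDs in topological order for (source, target) pairs (Kahn's algorithm)."""
    successors = defaultdict(list)
    indegree = defaultdict(int)
    nodes = []
    for source, target in edges:
        for node in (source, target):
            if node not in indegree:
                indegree[node] = 0  # register the node, keeping first-seen order
                nodes.append(node)
        successors[source].append(target)
        indegree[target] += 1

    ready = deque(n for n in nodes if indegree[n] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(nodes):
        raise ValueError("edges contain a cycle")
    return order


edges = [("router", "agent"), ("agent", "output")]
print(execution_order(edges))  # ['router', 'agent', 'output']
```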

## API Reference

### AgentLogger

Main entry point for observability logging.

```python
AgentLogger(
    trace_id: str,                    # Unique trace identifier
    workflow_id: Optional[str],       # Workflow identifier (defaults to trace_id)
    title: Optional[str],             # Human-readable title (defaults to trace_id)
    parent_trace_id: Optional[str],   # Parent trace for hierarchical traces
    auto_create: bool = True          # Auto-emit trace_updated on init
)
```

**Methods:**

| Method | Description |
|--------|-------------|
| `miscellaneous(node_id, config, content, metadata)` | Create miscellaneous node (auto-completed) |
| `parallel(node_id, config, content, metadata)` | Create parallel node |
| `router(node_id, config, content, metadata)` | Create router node |
| `strands.agent(node_id, config, metadata)` | Create agent node for a Strands agent |
| `edge(source_node, target_node)` | Create edge between nodes |
| `end(status, final_output)` | End the trace |

### Node Types

#### Common Methods

All nodes share these methods and properties:

| Method / Property | Description |
|-------------------|-------------|
| `create()` | Create node (called automatically) |
| `complete(status, payload, metadata)` | Complete the node |
| `status` | Property: current node status |
| `is_created` | Property: whether the node was created |
| `is_completed` | Property: whether the node was completed |

#### AgentNode Methods

| Method | Description |
|--------|-------------|
| `set_input(text)` | Set agent input prompt |
| `set_output(text)` | Set agent output manually |
| `set_error(error)` | Set error information |
| `set_token_usage(input, output, total)` | Set token metrics |
| `complete(result=result)` | Complete with agent result |
| `__call__(**kwargs)` | Callback handler for streaming |

#### ParallelNode Methods

| Method | Description |
|--------|-------------|
| `subTrace(child_logger)` | Link child trace to parallel node |
| `complete(status, metadata, execution_time_ms)` | Complete with timing |

### Utilities

```python
from observability_logger import generate_id, get_current_timestamp_ms

# Generate unique IDs with prefix
trace_id = generate_id("trace_")  # trace_a1b2c3d4-...
node_id = generate_id("node_")    # node_e5f6g7h8-...

# Get current timestamp in milliseconds
timestamp = get_current_timestamp_ms()  # 1700000000000
```
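When the package is not installed (for example in a unit test), local stand-ins with the same shape can be handy. The sketch below assumes that IDs are a prefix followed by a UUID-like string, as the comments above suggest; the real implementation may differ, and `fake_generate_id` and `fake_timestamp_ms` are hypothetical names.

```python
import time
import uuid


def fake_generate_id(prefix: str = "") -> str:
    """Stand-in: the prefix followed by a random UUID4 string (an assumed format)."""
    return f"{prefix}{uuid.uuid4()}"


def fake_timestamp_ms() -> int:
    """Stand-in: current Unix time in whole milliseconds."""
    return time.time_ns() // 1_000_000
```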

## Usage Guides

### Callback Mode (Real-time Capture)

Capture agent execution steps in real-time:

```python
from observability_logger import AgentLogger, generate_id
from strands import Agent, tool

# Define tools
@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

# Create agent
agent = Agent(tools=[search], name="SearchAgent")

# Initialize trace and node
logger = AgentLogger(trace_id=generate_id("trace_"), title="Search Workflow")
agent_node = logger.strands.agent(
    node_id=generate_id("node_"),
    config={"name": "Search Agent"}
)

# Execute with callback
prompt = "Find recent work on agent observability."  # example prompt
agent_node.set_input(prompt)
result = agent(prompt, callback_handler=agent_node)
agent_node.complete(result=result)

logger.end(status="completed")
```

### Multi-node Workflows

Create workflows with multiple nodes and edges:

```python
logger = AgentLogger(trace_id=generate_id("trace_"), title="Analysis Workflow")

# Router node (entry point)
router = logger.router(
    node_id=generate_id("node_"),
    config={"name": "Router"}
)
router.complete(status="completed")

# Agent node (assumes `agent` and `prompt` are already defined)
agent_node = logger.strands.agent(
    node_id=generate_id("node_"),
    config={"name": "Analyzer"}
)
agent_node.set_input(prompt)
result = agent(prompt, callback_handler=agent_node)
agent_node.complete(result=result)

# Output node (auto-completed)
output = logger.miscellaneous(
    node_id=generate_id("node_"),
    config={"name": "Output"},
    content=result.message
)

# Create edges
logger.edge(router, agent_node)
logger.edge(agent_node, output)

logger.end(status="completed")
```

### Hierarchical Traces (Parallel Execution)

Create parent-child trace relationships:

```python
# Parent trace
parent_logger = AgentLogger(
    trace_id=generate_id("trace_"),
    title="Orchestrator"
)

# Parallel node in parent
parallel = parent_logger.parallel(
    node_id=generate_id("node_"),
    config={"name": "Parallel Processing"}
)

# Child trace
child_logger = AgentLogger(
    trace_id=generate_id("trace_"),
    title="Child Workflow",
    parent_trace_id=parent_logger.trace_id
)

# Link child to parallel node
parallel.subTrace(child_logger)

# Execute child workflow
child_agent = child_logger.strands.agent(
    node_id=generate_id("node_"),
    config={"name": "Child Agent"}
)
# ... child execution ...
child_agent.complete(result=result)
child_logger.end(status="completed")

# Complete parallel node
parallel.complete(status="completed")

parent_logger.end(status="completed")
```

## Configuration

### Environment Variables

| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `KINESIS_STREAM_NAME` | Yes | - | Kinesis stream name |
| `AWS_REGION` | No | us-east-1 | AWS region |
| `OBSERVABILITY_LOG_LEVEL` | No | WARNING | Log level |
| `OBSERVABILITY_ENABLE_VALIDATION` | No | true | Enable validation |
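
To make the defaults in the table concrete, the following sketch resolves the four variables from a mapping. It illustrates how the documented defaults would apply and is not the library's own configuration code; `ObservabilitySettings` and `load_settings` are hypothetical names, and the boolean parsing rule is an assumption.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ObservabilitySettings:
    stream_name: str
    region: str
    log_level: str
    enable_validation: bool


def load_settings(env=None) -> ObservabilitySettings:
    """Resolve settings from a mapping (os.environ by default) using the table's defaults."""
    env = os.environ if env is None else env
    stream_name = env.get("KINESIS_STREAM_NAME")
    if not stream_name:
        raise ValueError("KINESIS_STREAM_NAME is required")
    flag = env.get("OBSERVABILITY_ENABLE_VALIDATION", "true")
    return ObservabilitySettings(
        stream_name=stream_name,
        region=env.get("AWS_REGION", "us-east-1"),
        log_level=env.get("OBSERVABILITY_LOG_LEVEL", "WARNING").upper(),
        # Assumption: any casing of "true" enables validation; the library may parse differently.
        enable_validation=flag.strip().lower() == "true",
    )
```

Calling `load_settings()` early in a service's startup surfaces a missing `KINESIS_STREAM_NAME` before any trace is created.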

### AWS Credentials

The library uses the standard boto3 credential chain:
1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
2. Shared credentials file (`~/.aws/credentials`)
3. IAM role (for EC2/Lambda)

## Error Handling

```python
# Initialize both names first so the except block can reference them safely
logger = None
agent_node = None

try:
    logger = AgentLogger(trace_id=generate_id("trace_"), title="My Workflow")

    agent_node = logger.strands.agent(
        node_id=generate_id("node_"),
        config={"name": "Agent"}
    )
    agent_node.set_input(prompt)
    result = agent(prompt, callback_handler=agent_node)
    agent_node.complete(result=result)

    logger.end(status="completed", final_output={"result": result.message})

except Exception as e:
    # Complete the node with error details if it was created but not finished
    if agent_node is not None and not agent_node.is_completed:
        agent_node.set_error(e)
        agent_node.complete(status="failed")

    # End the trace only if the logger itself was created
    if logger is not None:
        logger.end(status="failed", final_output={"error": str(e)})
```
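The same success/failure bookkeeping can be wrapped in a context manager so that each workflow does not repeat it. This is a sketch, not part of the package: `traced` is a hypothetical helper that relies only on the `end(status, final_output)` method listed in the API reference, and it re-raises the error after ending the trace.

```python
from contextlib import contextmanager


@contextmanager
def traced(logger):
    """End the trace as "completed" on success, or as "failed" on error and re-raise."""
    try:
        yield logger
    except Exception as exc:
        logger.end(status="failed", final_output={"error": str(exc)})
        raise
    else:
        logger.end(status="completed")
```

Usage would look like `with traced(AgentLogger(...)) as logger: ...`, with node-level error handling still done inside the block.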

## Node Lifecycle

| Node Type | Auto-Complete | Lifecycle |
|-----------|---------------|-----------|
| agent | No | create() -> set_input() -> execute -> complete() |
| router | No | create() -> complete() |
| parallel | No | create() -> subTrace() -> complete() |
| miscellaneous | Yes | create() (auto-completes) |
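
The lifecycle rows above amount to a small state machine. The sketch below models it with a hypothetical `LifecycleSketch` class; it is not the library's node class, and treating a repeated `complete()` as a no-op is an assumption.

```python
from enum import Enum


class NodeState(Enum):
    NEW = "new"
    CREATED = "created"
    COMPLETED = "completed"


class LifecycleSketch:
    """Hypothetical model of the lifecycle table; not the library's node class."""

    def __init__(self, auto_complete: bool = False):
        self.state = NodeState.NEW
        self.create()  # nodes are created automatically on construction
        if auto_complete:  # the miscellaneous row: completes as part of creation
            self.complete()

    def create(self) -> None:
        if self.state is NodeState.NEW:
            self.state = NodeState.CREATED

    def complete(self) -> None:
        # Assumption: completing an already-completed node is a no-op.
        if self.state is NodeState.CREATED:
            self.state = NodeState.COMPLETED

    @property
    def is_created(self) -> bool:
        return self.state is not NodeState.NEW

    @property
    def is_completed(self) -> bool:
        return self.state is NodeState.COMPLETED
```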

## Data Flow

```
Agent Code -> AgentLogger -> Kinesis -> Lambda -> DynamoDB + S3 -> Dashboard
```

- **DynamoDB**: Stores trace/node/edge metadata
- **S3**: Stores node payloads (input, output, steps)
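
The metadata/payload split between the two stores can be illustrated with a plain dictionary. This is a guess at the shape of the idea only: the event field names and the `split_event` helper are hypothetical, and the actual Lambda processing is not documented here.

```python
PAYLOAD_FIELDS = ("input", "output", "steps")  # the payload items named in the S3 bullet


def split_event(event: dict):
    """Split a node event into a small metadata record and a larger payload blob."""
    payload = {k: event[k] for k in PAYLOAD_FIELDS if k in event}
    metadata = {k: v for k, v in event.items() if k not in PAYLOAD_FIELDS}
    return metadata, payload


# Hypothetical event shape, for illustration only
event = {"trace_id": "trace_1", "node_id": "node_1", "status": "completed",
         "input": "hello", "output": "world", "steps": []}
metadata, payload = split_event(event)
```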
