# AgentOps

> AgentOps is the developer-favorite platform for testing, debugging, and deploying AI agents and LLM apps. Monitor, analyze, and optimize your agent workflows with comprehensive observability and analytics.

## Repository Overview

  Observability and DevTool platform for AI Agents

AgentOps helps developers build, evaluate, and monitor AI agents. From prototype to production.

## Quick Start 

```bash
pip install agentops
```

#### Session replays in 2 lines of code

Initialize the AgentOps client and automatically get analytics on all your LLM calls.

[Get an API key](https://app.agentops.ai/settings/projects)

```python
import agentops

# Beginning of your program (e.g. main.py, __init__.py)
agentops.init()  # reads the API key from the AGENTOPS_API_KEY environment variable

...

# End of program
agentops.end_session('Success')
```
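
If your program can raise before it reaches `end_session`, the session may never be closed with a meaningful state. Below is a minimal sketch of one way to guard against that. It assumes a small hypothetical helper, `tracked_session`, which is not part of AgentOps and simply receives the `end_session` callable. The `'Fail'` end state is also an assumption, so check the documentation for the states your SDK version accepts.

```python
from contextlib import contextmanager


@contextmanager
def tracked_session(end_session):
    """Call end_session with a state reflecting how the block exited.

    `end_session` is any callable taking an end-state string, for example
    `agentops.end_session`. This helper is hypothetical, not an AgentOps API.
    """
    try:
        yield
    except Exception:
        end_session("Fail")  # assumed end-state name
        raise
    else:
        end_session("Success")


# Demonstration with a stand-in recorder instead of the real SDK call.
recorded = []
with tracked_session(recorded.append):
    pass  # your agent code here
print(recorded)  # ['Success']
```

In a real program you would pass `agentops.end_session` in place of `recorded.append`.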

All your sessions can be viewed on the [AgentOps dashboard](https://app.agentops.ai?ref=gh)

- Agent Debugging
- Session Replays
- Summary Analytics
  
### First-class Developer Experience
Add powerful observability to your agents, tools, and functions with as little code as possible: one line at a time.

Refer to our [documentation](http://docs.agentops.ai)

```python
# Create a session span (root for all other spans)
from agentops.sdk.decorators import session

@session
def my_workflow():
    # Your session code here
    result = "done"  # placeholder for your workflow's output
    return result
```

```python
# Create an agent span for tracking agent operations
from agentops.sdk.decorators import agent

@agent
class MyAgent:
    def __init__(self, name):
        self.name = name
        
    # Agent methods here
```

```python
# Create operation/task spans for tracking specific operations
from agentops.sdk.decorators import operation, task

@operation  # or @task
def process_data(data):
    # Process the data
    result = data  # placeholder: replace with real processing
    return result
```

```python
# Create workflow spans for tracking multi-operation workflows
from agentops.sdk.decorators import workflow

@workflow
def my_workflow(data):
    # Workflow implementation
    result = data  # placeholder: replace with real workflow steps
    return result
```

```python
# Nest decorators for proper span hierarchy
from agentops.sdk.decorators import session, agent, operation

@agent
class MyAgent:
    @operation
    def nested_operation(self, message):
        return f"Processed: {message}"
        
    @operation
    def main_operation(self):
        result = self.nested_operation("test message")
        return result

@session
def my_session():
    agent = MyAgent()
    return agent.main_operation()
```

All decorators support:
- Input/Output Recording
- Exception Handling
- Async/await functions
- Generator functions
- Custom attributes and names
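
Supporting regular, async, and generator functions with one decorator usually means choosing a wrapper that matches the kind of function being decorated. The sketch below shows that dispatch using only the standard library. The `traced` decorator and `events` list are hypothetical stand-ins, and this is an illustration of the general technique rather than how AgentOps does it internally.

```python
import asyncio
import functools
import inspect

events = []  # (function name, kind) records


def traced(func):
    """Hypothetical decorator that picks a wrapper matching the function kind."""
    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            events.append((func.__name__, "async"))
            return await func(*args, **kwargs)
        return async_wrapper

    if inspect.isgeneratorfunction(func):
        @functools.wraps(func)
        def generator_wrapper(*args, **kwargs):
            events.append((func.__name__, "generator"))
            yield from func(*args, **kwargs)
        return generator_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        events.append((func.__name__, "sync"))
        return func(*args, **kwargs)
    return sync_wrapper


@traced
async def fetch():
    return "fetched"


@traced
def count(n):
    yield from range(n)


@traced
def add(a, b):
    return a + b


print(asyncio.run(fetch()), list(count(3)), add(1, 2))  # fetched [0, 1, 2] 3
print(events)  # [('fetch', 'async'), ('count', 'generator'), ('add', 'sync')]
```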

## Integrations 

### OpenAI Agents SDK 

Build multi-agent systems with tools, handoffs, and guardrails. AgentOps natively integrates with the OpenAI Agents SDKs for both Python and TypeScript.

#### Python

```bash
pip install openai-agents
```

- [Python integration guide](https://docs.agentops.ai/v2/integrations/openai_agents_python)
- [OpenAI Agents Python documentation](https://openai.github.io/openai-agents-python/)

#### TypeScript

```bash
npm install agentops @openai/agents
```

- [TypeScript integration guide](https://docs.agentops.ai/v2/integrations/openai_agents_js)
- [OpenAI Agents JS documentation](https://openai.github.io/openai-agents-js)

### CrewAI 

Build Crew agents with observability in just 2 lines of code. Simply set an `AGENTOPS_API_KEY` in your environment, and your crews will get automatic monitoring on the AgentOps dashboard.

```bash
pip install 'crewai[agentops]'
```

- [AgentOps integration example](https://docs.agentops.ai/v1/integrations/crewai)
- [Official CrewAI documentation](https://docs.crewai.com/how-to/AgentOps-Observability)

### AG2 
With only two lines of code, add full observability and monitoring to AG2 (formerly AutoGen) agents. Set an `AGENTOPS_API_KEY` in your environment and call `agentops.init()`.

- [AG2 Observability Example](https://docs.ag2.ai/notebooks/agentchat_agentops)
- [AG2 - AgentOps Documentation](https://docs.ag2.ai/docs/ecosystem/agentops)

### Camel AI 

Track and analyze CAMEL agents with full observability. Set an `AGENTOPS_API_KEY` in your environment and initialize AgentOps to get started.

- [Camel AI](https://www.camel-ai.org/) - Advanced agent communication framework
- [AgentOps integration example](https://docs.agentops.ai/v1/integrations/camel)
- [Official Camel AI documentation](https://docs.camel-ai.org/cookbooks/agents_tracking.html)

  Installation

```bash
pip install "camel-ai[all]==0.2.11"
pip install agentops
```

```python
import os
import agentops
from camel.agents import ChatAgent
from camel.messages import BaseMessage
from camel.models import ModelFactory
from camel.types import ModelPlatformType, ModelType

# Initialize AgentOps
agentops.init(os.getenv("AGENTOPS_API_KEY"), default_tags=["CAMEL Example"])

# Import toolkits after AgentOps init for tracking
from camel.toolkits import SearchToolkit

# Set up the agent with search tools
sys_msg = BaseMessage.make_assistant_message(
    role_name='Tools calling operator',
    content='You are a helpful assistant'
)

# Configure tools and model
tools = [*SearchToolkit().get_tools()]
model = ModelFactory.create(
    model_platform=ModelPlatformType.OPENAI,
    model_type=ModelType.GPT_4O_MINI,
)

# Create and run the agent
camel_agent = ChatAgent(
    system_message=sys_msg,
    model=model,
    tools=tools,
)

response = camel_agent.step("What is AgentOps?")
print(response)

agentops.end_session("Success")
```

Check out our [Camel integration guide](https://docs.agentops.ai/v1/integrations/camel) for more examples including multi-agent scenarios.

### Langchain 

AgentOps works seamlessly with applications built using Langchain. To use the handler, install Langchain as an optional dependency:

  Installation
  
```shell
pip install agentops[langchain]
```

To use the handler, import it and pass it to your LLM and agent as a callback:

```python
import os
from langchain.chat_models import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from agentops.integration.callbacks.langchain import LangchainCallbackHandler

AGENTOPS_API_KEY = os.environ['AGENTOPS_API_KEY']
OPENAI_API_KEY = os.environ['OPENAI_API_KEY']
handler = LangchainCallbackHandler(api_key=AGENTOPS_API_KEY, tags=['Langchain Example'])

llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY,
                 callbacks=[handler],
                 model='gpt-3.5-turbo')

tools = []  # Replace with the LangChain tools your agent should use

agent = initialize_agent(tools,
                         llm,
                         agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION,
                         verbose=True,
                         callbacks=[handler], # You must pass in a callback handler to record your agent
                         handle_parsing_errors=True)
```

Check out the [Langchain Examples Notebook](https://github.com/AgentOps-AI/agentops/blob/main/examples/langchain/langchain_examples.ipynb) for more details including Async handlers.

### Cohere 

First-class support for Cohere (>=5.4.0). This is a living integration; if you need any added functionality, please message us on Discord!

- [AgentOps integration example](https://docs.agentops.ai/v1/integrations/cohere)
- [Official Cohere documentation](https://docs.cohere.com/reference/about)

  Installation
  
```bash
pip install cohere
```

```python
import cohere
import agentops

# Beginning of program's code (e.g. main.py, __init__.py)
agentops.init()
co = cohere.Client()

chat = co.chat(
    message="Is it pronounced ceaux-hear or co-hehray?"
)

print(chat)

agentops.end_session('Success')
```

```python
import cohere
import agentops

# Beginning of program's code (e.g. main.py, __init__.py)
agentops.init()

co = cohere.Client()

stream = co.chat_stream(
    message="Write me a haiku about the synergies between Cohere and AgentOps"
)

for event in stream:
    if event.event_type == "text-generation":
        print(event.text, end='')

agentops.end_session('Success')
```

### Anthropic 

Track agents built with the Anthropic Python SDK (>=0.32.0).

- [AgentOps integration guide](https://docs.agentops.ai/v1/integrations/anthropic)
- [Official Anthropic documentation](https://docs.anthropic.com/en/docs/welcome)

  Installation
  
```bash
pip install anthropic
```

```python
import os
import anthropic
import agentops

# Beginning of program's code (e.g. main.py, __init__.py)
agentops.init()

client = anthropic.Anthropic(
    # This is the default and can be omitted
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)

message = client.messages.create(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Tell me a cool fact about AgentOps",
            }
        ],
        model="claude-3-opus-20240229",
    )
print(message.content)

agentops.end_session('Success')
```

Streaming
```python
import os
import anthropic
import agentops

# Beginning of program's code (e.g. main.py, __init__.py)
agentops.init()

client = anthropic.Anthropic(
    # This is the default and can be omitted
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)

stream = client.messages.create(
    max_tokens=1024,
    model="claude-3-opus-20240229",
    messages=[
        {
            "role": "user",
            "content": "Tell me something cool about streaming agents",
        }
    ],
    stream=True,
)

response = ""
for event in stream:
    if event.type == "content_block_delta":
        response += event.delta.text
    elif event.type == "message_stop":
        print("\n")
        print(response)
        print("\n")
```

Async

```python
import asyncio
import os
from anthropic import AsyncAnthropic

client = AsyncAnthropic(
    # This is the default and can be omitted
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)

async def main() -> None:
    message = await client.messages.create(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Tell me something interesting about async agents",
            }
        ],
        model="claude-3-opus-20240229",
    )
    print(message.content)

asyncio.run(main())  # in a notebook, use `await main()` instead
```

### Mistral 

Track agents built with the Mistral Python SDK (>=0.32.0).

- [AgentOps integration example](https://github.com/AgentOps-AI/agentops/blob/main/examples/mistral/mistral_example.ipynb)
- [Official Mistral documentation](https://docs.mistral.ai)

  Installation
  
```bash
pip install mistralai
```

Sync

```python
import os
from mistralai import Mistral
import agentops

# Beginning of program's code (e.g. main.py, __init__.py)
agentops.init()

client = Mistral(
    # This is the default and can be omitted
    api_key=os.environ.get("MISTRAL_API_KEY"),
)

message = client.chat.complete(
        messages=[
            {
                "role": "user",
                "content": "Tell me a cool fact about AgentOps",
            }
        ],
        model="open-mistral-nemo",
    )
print(message.choices[0].message.content)

agentops.end_session('Success')
```

Streaming

```python
import os
from mistralai import Mistral
import agentops

# Beginning of program's code (e.g. main.py, __init__.py)
agentops.init()

client = Mistral(
    # This is the default and can be omitted
    api_key=os.environ.get("MISTRAL_API_KEY"),
)

message = client.chat.stream(
        messages=[
            {
                "role": "user",
                "content": "Tell me something cool about streaming agents",
            }
        ],
        model="open-mistral-nemo",
    )

response = ""
for event in message:
    if event.data.choices[0].finish_reason == "stop":
        print("\n")
        print(response)
        print("\n")
    else:
        response += event.data.choices[0].delta.content or ""

agentops.end_session('Success')
```

Async

```python
import asyncio
import os
from mistralai import Mistral

client = Mistral(
    # This is the default and can be omitted
    api_key=os.environ.get("MISTRAL_API_KEY"),
)

async def main() -> None:
    message = await client.chat.complete_async(
        messages=[
            {
                "role": "user",
                "content": "Tell me something interesting about async agents",
            }
        ],
        model="open-mistral-nemo",
    )
    print(message.choices[0].message.content)

asyncio.run(main())  # in a notebook, use `await main()` instead
```

Async Streaming

```python
import asyncio
import os
from mistralai import Mistral

client = Mistral(
    # This is the default and can be omitted
    api_key=os.environ.get("MISTRAL_API_KEY"),
)

async def main() -> None:
    message = await client.chat.stream_async(
        messages=[
            {
                "role": "user",
                "content": "Tell me something interesting about async streaming agents",
            }
        ],
        model="open-mistral-nemo",
    )

    response = ""
    async for event in message:
        if event.data.choices[0].finish_reason == "stop":
            print("\n")
            print(response)
            print("\n")
        else:
            response += event.data.choices[0].delta.content or ""

asyncio.run(main())  # in a notebook, use `await main()` instead
```

### CamelAI 

Track agents built with the CamelAI Python SDK (>=0.32.0).

- [CamelAI integration guide](https://docs.camel-ai.org/cookbooks/agents_tracking.html#)
- [Official CamelAI documentation](https://docs.camel-ai.org/index.html)

  Installation
  
```bash
pip install camel-ai[all]
pip install agentops
```

```python
# Import dependencies
import agentops
import os
from dotenv import load_dotenv

# Set keys
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY") or ""
agentops_api_key = os.getenv("AGENTOPS_API_KEY") or ""
```

[You can find usage examples here!](https://github.com/AgentOps-AI/agentops/blob/main/examples/camelai_examples/README.md).

### LiteLLM 

AgentOps provides support for LiteLLM (>=1.3.1), allowing you to call 100+ LLMs using the same input/output format.

- [AgentOps integration example](https://docs.agentops.ai/v1/integrations/litellm)
- [Official LiteLLM documentation](https://docs.litellm.ai/docs/providers)

  Installation
  
```bash
pip install litellm
```

```python
# Do not use LiteLLM like this
# from litellm import completion
# ...
# response = completion(model="claude-3", messages=messages)

# Use LiteLLM like this
import litellm
...
response = litellm.completion(model="claude-3", messages=messages)
# or, inside an async function
response = await litellm.acompletion(model="claude-3", messages=messages)
```
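
Why the import style matters can be illustrated without LiteLLM at all. The sketch below assumes that instrumentation works by replacing an attribute on the library module, which is a common technique but an assumption about AgentOps here. `fake_lib` and `traced_completion` are hypothetical stand-ins. A name bound with `from module import function` before the patch keeps pointing at the original function, so calls through it are not recorded.

```python
import types

# Hypothetical stand-in for a library module such as litellm.
fake_lib = types.ModuleType("fake_lib")
fake_lib.completion = lambda prompt: f"raw:{prompt}"

# Equivalent of `from fake_lib import completion`, done before patching.
completion = fake_lib.completion

calls = []
_original = fake_lib.completion


def traced_completion(prompt):
    """Wrapper that records the prompt, then delegates to the original."""
    calls.append(prompt)
    return _original(prompt)


# An instrumentation layer replaces the attribute on the module.
fake_lib.completion = traced_completion

completion("a")           # early-bound name: bypasses the wrapper
fake_lib.completion("b")  # attribute lookup at call time: goes through it
print(calls)  # ['b']
```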

### LlamaIndex 

AgentOps works seamlessly with applications built using LlamaIndex, a framework for building context-augmented generative AI applications with LLMs.

  Installation
  
```shell
pip install llama-index-instrumentation-agentops
```

To use the handler, import `set_global_handler` and set it to `"agentops"`:

```python
from llama_index.core import set_global_handler

# NOTE: Feel free to set your AgentOps environment variables (e.g., 'AGENTOPS_API_KEY')
# as outlined in the AgentOps documentation, or pass the equivalent keyword arguments
# anticipated by AgentOps' AOClient as **eval_params in set_global_handler.

set_global_handler("agentops")
```

Check out the [LlamaIndex docs](https://docs.llamaindex.ai/en/stable/module_guides/observability/?h=agentops#agentops) for more details.

### Llama Stack 

AgentOps provides support for the Llama Stack Python Client (>=0.0.53), allowing you to monitor your agentic applications.

- [AgentOps integration example 1](https://github.com/AgentOps-AI/agentops/pull/530/files/65a5ab4fdcf310326f191d4b870d4f553591e3ea#diff-fdddf65549f3714f8f007ce7dfd1cde720329fe54155d54389dd50fbd81813cb)
- [AgentOps integration example 2](https://github.com/AgentOps-AI/agentops/pull/530/files/65a5ab4fdcf310326f191d4b870d4f553591e3ea#diff-6688ff4fb7ab1ce7b1cc9b8362ca27264a3060c16737fb1d850305787a6e3699)
- [Official Llama Stack Python Client](https://github.com/meta-llama/llama-stack-client-python)

### SwarmZero AI 

Track and analyze SwarmZero agents with full observability. Set an `AGENTOPS_API_KEY` in your environment and initialize AgentOps to get started.

- [SwarmZero](https://swarmzero.ai) - Advanced multi-agent framework
- [AgentOps integration example](https://docs.agentops.ai/v1/integrations/swarmzero)
- [SwarmZero AI integration example](https://docs.swarmzero.ai/examples/ai-agents/build-and-monitor-a-web-search-agent)
- [SwarmZero AI - AgentOps documentation](https://docs.swarmzero.ai/sdk/observability/agentops)
- [Official SwarmZero Python SDK](https://github.com/swarmzero/swarmzero)

  Installation

```bash
pip install swarmzero
pip install agentops
```

```python
from dotenv import load_dotenv
load_dotenv()

import agentops
agentops.init()

from swarmzero import Agent, Swarm
# ...
```

### Why AgentOps? 

Without the right tools, AI agents are slow, expensive, and unreliable. Our mission is to bring your agent from prototype to production. Here's why AgentOps stands out:

- **Comprehensive Observability**: Track your AI agents' performance, user interactions, and API usage.
- **Real-Time Monitoring**: Get instant insights with session replays, metrics, and live monitoring tools.
- **Cost Control**: Monitor and manage your spend on LLM and API calls.
- **Failure Detection**: Quickly identify and respond to agent failures and multi-agent interaction issues.
- **Tool Usage Statistics**: Understand how your agents utilize external tools with detailed analytics.
- **Session-Wide Metrics**: Gain a holistic view of your agents' sessions with comprehensive statistics.

AgentOps is designed to make agent observability, testing, and monitoring easy.


## Contributing Guide

# Contributing to AgentOps

Thanks for checking out AgentOps. We're building tools to help developers like you make AI agents that actually work reliably. If you've ever tried to build an agent system, you know the pain: they're a nightmare to debug, impossible to monitor, and when something goes wrong... good luck figuring out why.

We created AgentOps to solve these headaches, and we'd love your help making it even better. Our SDK hooks into all the major Python frameworks (AG2, CrewAI, LangChain) and LLM providers (OpenAI, Anthropic, Cohere, etc.) to give you visibility into what your agents are actually doing.

## How You Can Help

There are tons of ways to contribute, and we genuinely appreciate all of them:

1. **Add More Providers**: Help us support new LLM providers. Each one helps more developers monitor their agents.
2. **Improve Framework Support**: Using a framework we don't support yet? Help us add it!
3. **Make Docs Better**: Found our docs confusing? Help us fix them! Clear documentation makes everyone's life easier.
4. **Share Your Experience**: Using AgentOps? Let us know what's working and what isn't. Your feedback shapes our roadmap.

Even if you're not ready to contribute code, we'd love to hear your thoughts. Drop into our Discord, open an issue, or start a discussion. We're building this for developers like you, so your input matters.

## Table of Contents
- [Getting Started](https://github.com/AgentOps-AI/agentops/blob/main/README.md#getting-started)
- [Development Environment](https://github.com/AgentOps-AI/agentops/blob/main/README.md#development-environment)
- [Testing](https://github.com/AgentOps-AI/agentops/blob/main/README.md#testing)
- [Adding LLM Providers](https://github.com/AgentOps-AI/agentops/blob/main/README.md#adding-llm-providers)
- [Code Style](https://github.com/AgentOps-AI/agentops/blob/main/README.md#code-style)
- [Pull Request Process](https://github.com/AgentOps-AI/agentops/blob/main/README.md#pull-request-process)
- [Documentation](https://github.com/AgentOps-AI/agentops/blob/main/README.md#documentation)

## Getting Started

1. **Fork and Clone**:
   First, fork the repository by clicking the 'Fork' button in the top right of the [AgentOps repository](https://github.com/AgentOps-AI/agentops). This creates your own copy of the repository where you can make changes.

   Then clone your fork:
   ```bash
   git clone https://github.com/YOUR_USERNAME/agentops.git
   cd agentops
   ```

   Add the upstream repository to stay in sync:
   ```bash
   git remote add upstream https://github.com/AgentOps-AI/agentops.git
   git fetch upstream
   ```

   Before starting work on a new feature:
   ```bash
   git checkout main
   git pull upstream main
   git checkout -b feature/your-feature-name
   ```

2. **Install Dependencies**:
   ```bash
   pip install -e .
   ```

3. **Set Up Pre-commit Hooks**:
   ```bash
   pre-commit install
   ```

## Development Environment

1. **Environment Variables**:
   Create a `.env` file:
   ```
   AGENTOPS_API_KEY=your_api_key
   OPENAI_API_KEY=your_openai_key  # For testing
   ANTHROPIC_API_KEY=your_anthropic_key  # For testing
   # Other keys...
   ```

2. **Virtual Environment**:
   We recommend using `poetry` or `venv`:
   ```bash
   python -m venv venv
   source venv/bin/activate  # Unix
   .\venv\Scripts\activate   # Windows
   ```

3. **Pre-commit Setup**:
   We use pre-commit hooks to automatically format and lint code. Set them up with:
   ```bash
   pip install pre-commit
   pre-commit install
   ```

   That's it! The hooks will run automatically when you commit. To manually check all files:
   ```bash
   pre-commit run --all-files
   ```
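
Before running tests, it can help to confirm that the `.env` file from step 1 actually defines the keys you need. The following is a minimal standard-library sketch. `parse_env`, `missing_keys`, and `REQUIRED_KEYS` are hypothetical names, and the parser only handles simple `KEY=value` lines with optional `#` comments, so values that themselves contain `#` would be truncated.

```python
REQUIRED_KEYS = ("AGENTOPS_API_KEY",)  # assumed minimum; extend per provider


def parse_env(text):
    """Parse simple KEY=value lines, ignoring blanks and `#` comments."""
    values = {}
    for raw_line in text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop inline comments
        if "=" not in line:
            continue
        key, value = line.split("=", 1)
        values[key.strip()] = value.strip()
    return values


def missing_keys(values, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty."""
    return [key for key in required if not values.get(key)]


sample = """
AGENTOPS_API_KEY=your_api_key
OPENAI_API_KEY=your_openai_key  # For testing
# Other keys...
"""
config = parse_env(sample)
print(config)
print(missing_keys(config))  # []
```

To check a real file, read it with `open(".env").read()` and pass the text to `parse_env`.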

## Testing

We use a comprehensive testing stack to ensure code quality and reliability. Our testing framework includes pytest and several specialized testing tools.

### Testing Dependencies

Install all testing dependencies:
```bash
pip install -e ".[dev]"
```

We use the following testing packages:
- `pytest==7.4.0`: Core testing framework
- `pytest-depends`: Manage test dependencies
- `pytest-asyncio`: Test async code
- `pytest-vcr`: Record and replay HTTP interactions
- `pytest-mock`: Mocking functionality
- `pyfakefs`: Mock filesystem operations
- `requests_mock==1.11.0`: Mock HTTP requests

### Using Tox

We use tox to automate and standardize testing. Tox:
- Creates isolated virtual environments for testing
- Tests against multiple Python versions (3.7-3.12)
- Runs all test suites consistently
- Ensures dependencies are correctly specified
- Verifies the package installs correctly

Run tox:
```bash
tox
```

This will:
1. Create fresh virtual environments
2. Install dependencies
3. Run pytest with our test suite
4. Generate coverage reports

### Running Tests

1. **Run All Tests**:
   ```bash
   tox
   ```

2. **Run Specific Test File**:
   ```bash
   pytest tests/llms/test_anthropic.py -v
   ```

3. **Run with Coverage**:
   ```bash
   coverage run -m pytest
   coverage report
   ```

### Writing Tests

1. **Test Structure**:
   ```python
   import pytest
   from pytest_mock import MockerFixture
   from unittest.mock import Mock, patch

   @pytest.mark.asyncio  # For async tests
   async def test_async_function():
       # Test implementation
       pass

   @pytest.mark.depends(on=['test_prerequisite'])  # Declare test dependencies
   def test_dependent_function():
       # Test implementation
       pass
   ```

2. **Recording HTTP Interactions**:
   ```python
   @pytest.mark.vcr()  # Records HTTP interactions
   def test_api_call():
       response = client.make_request()  # `client` is the API client under test
       assert response.status_code == 200
   ```

3. **Mocking Filesystem**:
   ```python
   import os

   def test_file_operations(fs):  # fs fixture provided by pyfakefs
       fs.create_file('/fake/file.txt', contents='test')
       assert os.path.exists('/fake/file.txt')
   ```

4. **Mocking HTTP Requests**:
   ```python
   import requests

   def test_http_client(requests_mock):
       requests_mock.get('http://api.example.com', json={'key': 'value'})
       response = requests.get('http://api.example.com')
       assert response.json()['key'] == 'value'
   ```

### Testing Best Practices

1. **Test Categories**:
   - Unit tests: Test individual components
   - Integration tests: Test component interactions
   - End-to-end tests: Test complete workflows
   - Performance tests: Test response times and resource usage

2. **Fixtures**:
   Create reusable test fixtures in `conftest.py`:
   ```python
   import pytest
   from unittest.mock import Mock

   @pytest.fixture
   def mock_llm_client():
       client = Mock()
       client.chat.completions.create.return_value = Mock()
       return client
   ```

3. **Test Data**:
   - Store test data in `tests/data/`
   - Use meaningful test data names
   - Document data format and purpose

4. **VCR Cassettes**:
   - Store in `tests/cassettes/`
   - Sanitize sensitive information
   - Update cassettes when API changes
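
A mock client like the fixture above becomes more useful once its return value has the shape your code reads. Here is a small sketch using only `unittest.mock`. The OpenAI-style response shape (`choices[0].message.content`) and the `ask` function are assumptions made for illustration, not AgentOps code.

```python
from unittest.mock import Mock


def make_mock_llm_client(reply):
    """Build a Mock shaped like an OpenAI-style chat client (assumed shape)."""
    client = Mock()
    message = Mock(content=reply)
    client.chat.completions.create.return_value = Mock(
        choices=[Mock(message=message)]
    )
    return client


def ask(client, question):
    """Hypothetical code under test: send one message, return the reply text."""
    response = client.chat.completions.create(
        model="test-model",
        messages=[{"role": "user", "content": question}],
    )
    return response.choices[0].message.content


client = make_mock_llm_client("pong")
answer = ask(client, "ping")
print(answer)  # pong

# The mock also lets the test verify how the client was called.
client.chat.completions.create.assert_called_once()
```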

### CI Testing Strategy

We use Jupyter notebooks as integration tests for LLM providers. This approach:
- Tests real-world usage patterns
- Verifies end-to-end functionality
- Ensures examples stay up-to-date
- Tests against actual LLM APIs

1. **Notebook Tests**:
   - Located in `examples/` directory
   - Each LLM provider has example notebooks
   - CI runs notebooks on PR merges to main
   - Tests run against multiple Python versions

2. **Test Workflow**:
   The `test-notebooks.yml` workflow:
   ```yaml
   name: Test Notebooks
   on:
     pull_request:
       paths:
         - "agentops/**"
         - "examples/**"
         - "tests/**"
   ```
   - Runs on PR merges and manual triggers
   - Sets up environment with provider API keys
   - Installs AgentOps from main branch
   - Executes each notebook
   - Excludes specific notebooks that require manual testing

3. **Provider Coverage**:
   Each provider should have notebooks demonstrating:
   - Basic completion calls
   - Streaming responses
   - Async operations (if supported)
   - Error handling
   - Tool usage (if applicable)

4. **Adding Provider Tests**:
   - Create notebook in `examples/provider_name/`
   - Include all provider functionality
   - Add necessary secrets to GitHub Actions
   - Update `exclude_notebooks` in workflow if manual testing needed

## Adding LLM Providers

The `agentops/llms/` directory contains provider implementations. Each provider must:

1. **Inherit from BaseProvider**:
   ```python
   @singleton
   class NewProvider(BaseProvider):
       def __init__(self, client):
           super().__init__(client)
           self._provider_name = "ProviderName"
   ```

2. **Implement Required Methods**:
   - `handle_response()`: Process LLM responses
   - `override()`: Patch the provider's methods
   - `undo_override()`: Restore original methods

3. **Handle Events**:
   Track:
   - Prompts and completions
   - Token usage
   - Timestamps
   - Errors
   - Tool usage (if applicable)

4. **Example Implementation Structure**:
   ```python
   def handle_response(self, response, kwargs, init_timestamp, session=None):
       llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
       try:
           # Process response
           llm_event.returns = response.model_dump()
           llm_event.prompt = kwargs["messages"]
           # ... additional processing
           self._safe_record(session, llm_event)
       except Exception as e:
           self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
   ```
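
The `override()` / `undo_override()` pair follows a familiar patch-and-restore pattern. The sketch below shows that pattern in isolation with the standard library. `FakeClient` and `PatchingProvider` are hypothetical stand-ins, and the real `BaseProvider` interface may differ.

```python
class FakeClient:
    """Hypothetical stand-in for a provider SDK client class."""

    def complete(self, prompt):
        return f"echo:{prompt}"


class PatchingProvider:
    """Sketch of the override/undo_override pattern, not the AgentOps code."""

    def __init__(self):
        self.original = None
        self.calls = []

    def override(self):
        # Keep a reference to the original so it can be restored later.
        self.original = FakeClient.complete
        original, calls = self.original, self.calls

        def patched(client, prompt):
            result = original(client, prompt)
            calls.append((prompt, result))  # stand-in for recording an event
            return result

        FakeClient.complete = patched

    def undo_override(self):
        if self.original is not None:
            FakeClient.complete = self.original
            self.original = None


provider = PatchingProvider()
provider.override()
FakeClient().complete("hi")     # recorded
provider.undo_override()
FakeClient().complete("again")  # not recorded
print(provider.calls)  # [('hi', 'echo:hi')]
```

Storing the original method before patching is what makes `undo_override()` safe to call, including when `override()` was never invoked.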

## Code Style

1. **Formatting**:
   - Use Black for Python code formatting
   - Maximum line length: 88 characters
   - Use type hints

2. **Documentation**:
   - Docstrings for all public methods
   - Clear inline comments
   - Update relevant documentation

3. **Error Handling**:
   - Use specific exception types
   - Log errors with meaningful messages
   - Include context in error messages

## Pull Request Process

1. **Branch Naming**:
   - `feature/description`
   - `fix/description`
   - `docs/description`

2. **Commit Messages**:
   - Clear and descriptive
   - Reference issues when applicable

3. **PR Requirements**:
   - Pass all tests
   - Maintain or improve code coverage
   - Include relevant documentation
   - Update CHANGELOG.md if applicable

4. **Review Process**:
   - At least one approval required
   - Address all review comments
   - Maintain PR scope

## Documentation

1. **Types of Documentation**:
   - API reference
   - Integration guides
   - Examples
   - Troubleshooting guides

2. **Documentation Location**:
   - Code documentation in docstrings
   - User guides in `docs/`
   - Examples in `examples/`

3. **Documentation Style**:
   - Clear and concise
   - Include code examples
   - Explain the why, not just the what

## Getting Help & Community

We encourage active community participation and are here to help!

### Preferred Communication Channels

1. **GitHub Issues & Discussions**:
   - Open an [issue](https://github.com/AgentOps-AI/agentops/issues) for:
     - Bug reports
     - Feature requests
     - Documentation improvements
   - Start a [discussion](https://github.com/AgentOps-AI/agentops/discussions) for:
     - Questions about usage
     - Ideas for new features
     - Community showcase
     - General feedback

2. **Discord Community**:
   - Join our [Discord server](https://discord.gg/FagdcwwXRR) for:
     - Real-time help
     - Community discussions
     - Feature announcements
     - Sharing your projects

3. **Contact Form**:
   - For private inquiries, use our [contact form](https://agentops.ai/contact)
   - Please note that public channels are preferred for technical discussions

## License

By contributing to AgentOps, you agree that your contributions will be licensed under the MIT License.


## Core SDK Implementation

### agentops/__init__.py

```python
# For backwards compatibility
from agentops.legacy import (
    start_session,
    end_session,
    track_agent,
    track_tool,
    end_all_sessions,
    Session,
    ToolEvent,
    ErrorEvent,
    ActionEvent,
    LLMEvent,
)  # type: ignore

# Import all required modules at the top
from opentelemetry.trace import get_current_span
from agentops.semconv import (
    AgentAttributes,
    ToolAttributes,
    WorkflowAttributes,
    CoreAttributes,
    SpanKind,
    SpanAttributes,
)
import json
from typing import List, Optional, Union, Dict, Any
from agentops.client import Client
from agentops.sdk.core import TraceContext, tracer
from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool, guardrail, track_endpoint
from agentops.enums import TraceState, SUCCESS, ERROR, UNSET
from opentelemetry.trace.status import StatusCode

from agentops.logging.config import logger
from agentops.helpers.deprecation import deprecated, warn_deprecated_param
import threading

# Import validation functions
from agentops.validation import validate_trace_spans, print_validation_summary, ValidationError

# Thread-safe client management
_client_lock = threading.Lock()
_client = None


def get_client() -> Client:
    """Get the singleton client instance in a thread-safe manner"""
    global _client

    # Double-checked locking pattern for thread safety
    if _client is None:
        with _client_lock:
            if _client is None:
                _client = Client()

    return _client


@deprecated("Automatically tracked in v4.")
def record(event):
    """
    Legacy function to record an event. This is kept for backward compatibility.

    In the current version, this simply sets the end_timestamp on the event.

    Args:
        event: The event to record
    """
    from agentops.helpers.time import get_ISO_time

    # TODO: Manual timestamp assignment is a temporary fix; should use proper event lifecycle
    if event and hasattr(event, "end_timestamp"):
        event.end_timestamp = get_ISO_time()

    return event


def init(
    api_key: Optional[str] = None,
    endpoint: Optional[str] = None,
    app_url: Optional[str] = None,
    max_wait_time: Optional[int] = None,
    max_queue_size: Optional[int] = None,
    tags: Optional[List[str]] = None,
    default_tags: Optional[List[str]] = None,
    trace_name: Optional[str] = None,
    instrument_llm_calls: Optional[bool] = None,
    auto_start_session: Optional[bool] = None,
    auto_init: Optional[bool] = None,
    skip_auto_end_session: Optional[bool] = None,
    env_data_opt_out: Optional[bool] = None,
    log_level: Optional[Union[str, int]] = None,
    fail_safe: Optional[bool] = None,
    log_session_replay_url: Optional[bool] = None,
    exporter_endpoint: Optional[str] = None,
    **kwargs,
):
    """
    Initializes the AgentOps SDK.

    Args:
        api_key (str, optional): API Key for AgentOps services. If none is provided, key will
            be read from the AGENTOPS_API_KEY environment variable.
        endpoint (str, optional): The endpoint for the AgentOps service. If none is provided, it will
            be read from the AGENTOPS_API_ENDPOINT environment variable. Defaults to 'https://api.agentops.ai'.
        app_url (str, optional): The dashboard URL for the AgentOps app. If none is provided, it will
            be read from the AGENTOPS_APP_URL environment variable. Defaults to 'https://app.agentops.ai'.
        max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue.
            Defaults to 5,000 (5 seconds)
        max_queue_size (int, optional): The maximum size of the event queue. Defaults to 512.
        tags (List[str], optional): [Deprecated] Use `default_tags` instead.
        default_tags (List[str], optional): Default tags for the sessions that can be used for grouping or sorting later (e.g. ["GPT-4"]).
        trace_name (str, optional): Name for the default trace/session. If none is provided, defaults to "default".
        instrument_llm_calls (bool): Whether to instrument LLM calls and emit LLMEvents.
        auto_start_session (bool): Whether to start a session automatically when the client is created.
        auto_init (bool): Whether to automatically initialize the client on import. Defaults to True.
        skip_auto_end_session (optional, bool): Don't automatically end session based on your framework's decision-making
            (i.e. Crew determining when tasks are complete and ending the session)
        env_data_opt_out (bool): Whether to opt out of collecting environment data.
        log_level (str, int): The log level to use for the client. Defaults to 'CRITICAL'.
        fail_safe (bool): Whether to suppress errors and continue execution when possible.
        log_session_replay_url (bool): Whether to log session replay URLs to the console. Defaults to True.
        exporter_endpoint (str, optional): Endpoint for the exporter. If none is provided, it will
            be read from the AGENTOPS_EXPORTER_ENDPOINT environment variable.
        **kwargs: Additional configuration parameters to be passed to the client.
    """
    global _client

    # Check for deprecated parameters and emit warnings
    if tags is not None:
        warn_deprecated_param("tags", "default_tags")

    # Merge tags and default_tags if both are provided
    merged_tags = None
    if tags and default_tags:
        merged_tags = list(set(tags + default_tags))
    elif tags:
        merged_tags = tags
    elif default_tags:
        merged_tags = default_tags

    # Check if in a Jupyter Notebook (manual start/end_trace())
    try:
        # get_ipython is only defined inside IPython; ZMQInteractiveShell is the notebook kernel shell
        if get_ipython().__class__.__name__ == "ZMQInteractiveShell":  # type: ignore
            auto_start_session = False
    except NameError:
        pass

    # Prepare initialization arguments
    init_kwargs = {
        "api_key": api_key,
        "endpoint": endpoint,
        "app_url": app_url,
        "max_wait_time": max_wait_time,
        "max_queue_size": max_queue_size,
        "default_tags": merged_tags,
        "trace_name": trace_name,
        "instrument_llm_calls": instrument_llm_calls,
        "auto_start_session": auto_start_session,
        "auto_init": auto_init,
        "skip_auto_end_session": skip_auto_end_session,
        "env_data_opt_out": env_data_opt_out,
        "log_level": log_level,
        "fail_safe": fail_safe,
        "log_session_replay_url": log_session_replay_url,
        "exporter_endpoint": exporter_endpoint,
        **kwargs,
    }

    # Get the current client instance (creates new one if needed)
    client = get_client()

    # Initialize the client directly
    return client.init(**init_kwargs)


def configure(**kwargs):
    """Update client configuration

    Args:
        **kwargs: Configuration parameters. Supported parameters include:
            - api_key: API Key for AgentOps services
            - endpoint: The endpoint for the AgentOps service
            - app_url: The dashboard URL for the AgentOps app
            - max_wait_time: Maximum time to wait in milliseconds before flushing the queue
            - max_queue_size: Maximum size of the event queue
            - default_tags: Default tags for the sessions
            - instrument_llm_calls: Whether to instrument LLM calls
            - auto_start_session: Whether to start a session automatically
            - skip_auto_end_session: Don't automatically end session
            - env_data_opt_out: Whether to opt out of collecting environment data
            - log_level: The log level to use for the client
            - fail_safe: Whether to suppress errors and continue execution
            - exporter: Custom span exporter for OpenTelemetry trace data
            - processor: Custom span processor for OpenTelemetry trace data
            - exporter_endpoint: Endpoint for the exporter
    """
    global _client

    # List of valid parameters that can be passed to configure
    valid_params = {
        "api_key",
        "endpoint",
        "app_url",
        "max_wait_time",
        "max_queue_size",
        "default_tags",
        "instrument_llm_calls",
        "auto_start_session",
        "skip_auto_end_session",
        "env_data_opt_out",
        "log_level",
        "fail_safe",
        "exporter",
        "processor",
        "exporter_endpoint",
    }

    # Check for invalid parameters
    invalid_params = set(kwargs.keys()) - valid_params
    if invalid_params:
        logger.warning(f"Invalid configuration parameters: {invalid_params}")

    client = get_client()
    client.configure(**kwargs)


def start_trace(
    trace_name: str = "session", tags: Optional[Union[Dict[str, Any], List[str]]] = None
) -> Optional[TraceContext]:
    """
    Starts a new trace (root span) and returns its context.
    This allows for multiple concurrent, user-managed traces.

    Args:
        trace_name: Name for the trace (e.g., "session", "my_custom_task").
        tags: Optional tags to attach to the trace span (list of strings or dict).

    Returns:
        A TraceContext object containing the span and context token, or None if SDK not initialized.
    """
    if not tracer.initialized:
        # Optionally, attempt to initialize the client if not already, or log a more severe warning.
        # For now, align with legacy start_session that would try to init.
        # However, explicit init is preferred before starting traces.
        logger.warning("AgentOps SDK not initialized. Attempting to initialize with defaults before starting trace.")
        try:
            init()  # Attempt to initialize with environment variables / defaults
            if not tracer.initialized:
                logger.error("SDK initialization failed. Cannot start trace.")
                return None
        except Exception as e:
            logger.error(f"SDK auto-initialization failed during start_trace: {e}. Cannot start trace.")
            return None

    return tracer.start_trace(trace_name=trace_name, tags=tags)


def end_trace(
    trace_context: Optional[TraceContext] = None, end_state: Union[TraceState, StatusCode, str] = TraceState.SUCCESS
) -> None:
    """
    Ends a trace (its root span) and finalizes it.
    If no trace_context is provided, ends all active session spans.

    Args:
        trace_context: The TraceContext object returned by start_trace. If None, ends all active traces.
        end_state: The final state of the trace (e.g., "Success", "Indeterminate", "Error").
    """
    if not tracer.initialized:
        logger.warning("AgentOps SDK not initialized. Cannot end trace.")
        return
    tracer.end_trace(trace_context=trace_context, end_state=end_state)


def update_trace_metadata(metadata: Dict[str, Any], prefix: str = "trace.metadata") -> bool:
    """
    Update metadata on the current running trace.

    Args:
        metadata: Dictionary of key-value pairs to set as trace metadata.
                 Values must be strings, numbers, booleans, or lists of these types.
                 Lists are converted to JSON string representation.
                 Keys can be either custom keys or semantic convention aliases.
        prefix: Prefix for metadata attributes (default: "trace.metadata").
               Ignored for semantic convention attributes.

    Returns:
        bool: True if metadata was successfully updated, False otherwise.

    """
    if not tracer.initialized:
        logger.warning("AgentOps SDK not initialized. Cannot update trace metadata.")
        return False

    # Build semantic convention mappings dynamically
    def build_semconv_mappings():
        """Build mappings from user-friendly keys to semantic convention attributes."""
        mappings = {}

        # Helper function to extract attribute name from semantic convention
        def extract_key_from_attr(attr_value: str) -> str:
            # entity.attribute -> entity_attribute (e.g. error.type -> error_type);
            # a value without dots is returned unchanged
            return attr_value.replace(".", "_")

        # Process each semantic convention class
        for cls in [AgentAttributes, ToolAttributes, WorkflowAttributes, CoreAttributes, SpanAttributes]:
            for attr_name, attr_value in cls.__dict__.items():
                if not attr_name.startswith("_") and isinstance(attr_value, str):
                    # Skip gen_ai attributes
                    if attr_value.startswith("gen_ai."):
                        continue

                    # Generate user-friendly key
                    user_key = extract_key_from_attr(attr_value)
                    mappings[user_key] = attr_value

                    # Add some additional convenience mappings
                    if attr_value == CoreAttributes.TAGS:
                        mappings["tags"] = attr_value

        return mappings

    # Build mappings if using semantic conventions
    SEMCONV_MAPPINGS = build_semconv_mappings()

    # Collect all valid semantic convention attributes
    VALID_SEMCONV_ATTRS = set()
    for cls in [AgentAttributes, ToolAttributes, WorkflowAttributes, CoreAttributes, SpanAttributes]:
        for key, value in cls.__dict__.items():
            if not key.startswith("_") and isinstance(value, str):
                # Include all attributes except gen_ai ones
                if not value.startswith("gen_ai."):
                    VALID_SEMCONV_ATTRS.add(value)

    # Find the current trace span
    span = None

    # Get the current span from OpenTelemetry context
    current_span = get_current_span()

    # Check if the current span is valid and recording
    if current_span and hasattr(current_span, "is_recording") and current_span.is_recording():
        # Check if this is a trace/session span or a child span
        span_name = getattr(current_span, "name", "")

        # If it's a session/trace span, use it directly
        if span_name.endswith(f".{SpanKind.SESSION}"):
            span = current_span
        else:
            # It's a child span, try to find the root trace span
            # Get all active traces
            active_traces = tracer.get_active_traces()
            if active_traces:
                # Find the trace that contains the current span
                current_trace_id = current_span.get_span_context().trace_id

                for trace_id_str, trace_ctx in active_traces.items():
                    try:
                        # Convert hex string back to int for comparison
                        trace_id = int(trace_id_str, 16)
                        if trace_id == current_trace_id:
                            span = trace_ctx.span
                            break
                    except (ValueError, AttributeError):
                        continue

                # If we couldn't find the parent trace, use the current span
                if not span:
                    span = current_span
            else:
                # No active traces, use the current span
                span = current_span

    # If no current span or it's not recording, check active traces
    if not span:
        active_traces = tracer.get_active_traces()
        if active_traces:
            # Get the most recently created trace (last in the dict)
            trace_context = list(active_traces.values())[-1]
            span = trace_context.span
            logger.debug("Using most recent active trace for metadata update")
        else:
            logger.warning("No active trace found. Cannot update metadata.")
            return False

    # Ensure the span is recording before updating
    if not span or (hasattr(span, "is_recording") and not span.is_recording()):
        logger.warning("Span is not recording. Cannot update metadata.")
        return False

    # Update the span attributes with the metadata
    try:
        updated_count = 0
        for key, value in metadata.items():
            # Validate the value type
            if value is None:
                continue

            # Convert lists to JSON string representation for OpenTelemetry compatibility
            if isinstance(value, list):
                # Ensure all list items are valid types
                if all(isinstance(item, (str, int, float, bool)) for item in value):
                    value = json.dumps(value)
                else:
                    logger.warning(f"Skipping metadata key '{key}': list contains invalid types")
                    continue
            elif not isinstance(value, (str, int, float, bool)):
                logger.warning(f"Skipping metadata key '{key}': value type {type(value)} not supported")
                continue

            # Determine the attribute key
            attribute_key = key

            # Check if key is already a valid semantic convention attribute
            if key in VALID_SEMCONV_ATTRS:
                # Key is already a valid semantic convention, use as-is
                attribute_key = key
            elif key in SEMCONV_MAPPINGS:
                # It's a user-friendly key, map it to semantic convention
                attribute_key = SEMCONV_MAPPINGS[key]
                logger.debug(f"Mapped '{key}' to semantic convention '{attribute_key}'")
            else:
                # Not a semantic convention, use with prefix
                attribute_key = f"{prefix}.{key}"

            # Set the attribute
            span.set_attribute(attribute_key, value)
            updated_count += 1

        if updated_count > 0:
            logger.debug(f"Successfully updated {updated_count} metadata attributes on trace")
            return True
        else:
            logger.warning("No valid metadata attributes were updated")
            return False

    except Exception as e:
        logger.error(f"Error updating trace metadata: {e}")
        return False


__all__ = [
    # Legacy exports
    "start_session",
    "end_session",
    "track_agent",
    "track_tool",
    "end_all_sessions",
    "Session",
    "ToolEvent",
    "ErrorEvent",
    "ActionEvent",
    "LLMEvent",
    # Modern exports
    "init",
    "start_trace",
    "end_trace",
    "update_trace_metadata",
    "Client",
    "get_client",
    # Decorators
    "trace",
    "session",
    "agent",
    "task",
    "workflow",
    "operation",
    "tool",
    "guardrail",
    "track_endpoint",
    # Enums
    "TraceState",
    "SUCCESS",
    "ERROR",
    "UNSET",
    # Validation
    "validate_trace_spans",
    "print_validation_summary",
    "ValidationError",
]

```
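
The key-resolution rules at the end of `update_trace_metadata` are easier to follow in isolation. The sketch below restates them as a standalone function; it is a reading aid, not the SDK's own code. The names `resolve_attribute`, `VALID_ATTRS`, and `FRIENDLY_KEYS` are hypothetical, and the two sample attribute strings are placeholders for the values the SDK collects from its semantic-convention classes.

```python
import json
from typing import Any, Optional, Tuple

# Hypothetical stand-ins for the sets the SDK builds from its semconv classes.
VALID_ATTRS = {"agent.name", "error.type"}
FRIENDLY_KEYS = {"agent_name": "agent.name", "error_type": "error.type"}

SCALAR_TYPES = (str, int, float, bool)


def resolve_attribute(
    key: str, value: Any, prefix: str = "trace.metadata"
) -> Optional[Tuple[str, Any]]:
    """Return the (attribute_key, value) pair to set, or None if the entry is skipped."""
    if value is None:
        return None
    if isinstance(value, list):
        # Lists of scalars are serialized to a JSON string; other lists are skipped.
        if not all(isinstance(item, SCALAR_TYPES) for item in value):
            return None
        value = json.dumps(value)
    elif not isinstance(value, SCALAR_TYPES):
        return None

    if key in VALID_ATTRS:
        return key, value  # already a semantic-convention attribute
    if key in FRIENDLY_KEYS:
        return FRIENDLY_KEYS[key], value  # user-friendly alias
    return f"{prefix}.{key}", value  # custom key, namespaced by the prefix
```

Under these assumptions, `resolve_attribute("agent_name", "planner")` maps to the dotted attribute, a custom key such as `"run_ids"` is stored under `trace.metadata.run_ids`, and a dict value is skipped.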

### agentops/client/client.py

```python
import atexit
from typing import Optional, Any

from agentops.client.api import ApiClient
from agentops.config import Config
from agentops.exceptions import NoApiKeyException
from agentops.instrumentation import instrument_all
from agentops.logging import logger
from agentops.logging.config import configure_logging, intercept_opentelemetry_logging
from agentops.sdk.core import TraceContext, tracer
from agentops.legacy import Session

# Global variables to hold the client's auto-started trace and its legacy session wrapper
_client_init_trace_context: Optional[TraceContext] = None
_client_legacy_session_for_init_trace: Optional[Session] = None

# Single atexit handler registered flag
_atexit_registered = False


def _end_init_trace_atexit():
    """Global atexit handler to end the client's auto-initialized trace during shutdown."""
    global _client_init_trace_context, _client_legacy_session_for_init_trace
    if _client_init_trace_context is not None:
        logger.debug("Auto-ending client's init trace during shutdown.")
        try:
            # Use global tracer to end the trace directly
            if tracer.initialized and _client_init_trace_context.span.is_recording():
                tracer.end_trace(_client_init_trace_context, end_state="Shutdown")
        except Exception as e:
            logger.warning(f"Error ending client's init trace during shutdown: {e}")
        finally:
            _client_init_trace_context = None
            _client_legacy_session_for_init_trace = None  # Clear its legacy wrapper too


class Client:
    """Singleton client for AgentOps service"""

    config: Config
    _initialized: bool
    _init_trace_context: Optional[TraceContext] = None  # Stores the context of the auto-started trace
    _legacy_session_for_init_trace: Optional[
        Session
    ] = None  # Stores the legacy Session wrapper for the auto-started trace

    __instance = None  # Class variable for singleton pattern

    api: ApiClient

    def __new__(cls, *args: Any, **kwargs: Any) -> "Client":
        if cls.__instance is None:
            cls.__instance = super(Client, cls).__new__(cls)
            # Initialize instance variables that should only be set once per instance
            cls.__instance._init_trace_context = None
            cls.__instance._legacy_session_for_init_trace = None
        return cls.__instance

    def __init__(self):
        # Initialization of attributes like config, _initialized should happen here if they are instance-specific
        # and not shared via __new__ for a true singleton that can be re-configured.
        # However, the current pattern re-initializes config in init().
        if (
            not hasattr(self, "_initialized") or not self._initialized
        ):  # Ensure init logic runs only once per actual initialization intent
            self.config = Config()  # Initialize config here for the instance
            self._initialized = False
            # self._init_trace_context = None # Already done in __new__
            # self._legacy_session_for_init_trace = None # Already done in __new__

    def init(self, **kwargs: Any) -> Optional[Session]:  # Legacy Session wrapper if a trace is auto-started, else None
        # Recreate the Config object to parse environment variables at the time of initialization
        # This allows re-init with new env vars if needed, though true singletons usually init once.
        self.config = Config()
        self.configure(**kwargs)

        # Only treat as re-initialization if a different non-None API key is explicitly provided
        provided_api_key = kwargs.get("api_key")
        if self.initialized and provided_api_key is not None and provided_api_key != self.config.api_key:
            logger.warning("AgentOps Client being re-initialized with a different API key. This is unusual.")
            # Reset initialization status to allow re-init with new key/config
            self._initialized = False
            if self._init_trace_context and self._init_trace_context.span.is_recording():
                logger.warning("Ending previously auto-started trace due to re-initialization.")
                tracer.end_trace(self._init_trace_context, "Reinitialized")
            self._init_trace_context = None
            self._legacy_session_for_init_trace = None

        if self.initialized:
            logger.debug("AgentOps Client already initialized.")
            # If auto_start_session was true, return the existing legacy session wrapper
            if self.config.auto_start_session:
                return self._legacy_session_for_init_trace
            return None  # If not auto-starting, and already initialized, return None

        if not self.config.api_key:
            raise NoApiKeyException

        configure_logging(self.config)
        intercept_opentelemetry_logging()

        self.api = ApiClient(self.config.endpoint)

        try:
            response = self.api.v3.fetch_auth_token(self.config.api_key)
            if response is None:
                # If auth fails, we cannot proceed with tracer initialization that depends on project_id
                logger.error("Failed to fetch auth token. AgentOps SDK will not be initialized.")
                return None  # Explicitly return None if auth fails
        except Exception as e:
            # Re-raise authentication exceptions so they can be caught by tests and calling code
            logger.error(f"Authentication failed: {e}")
            raise

        self.api.v4.set_auth_token(response["token"])

        tracing_config = self.config.dict()
        tracing_config["project_id"] = response["project_id"]

        tracer.initialize_from_config(tracing_config, jwt=response["token"])

        if self.config.instrument_llm_calls:
            instrument_all()

        # self._initialized = True # Set initialized to True here - MOVED to after trace start attempt

        global _atexit_registered
        if not _atexit_registered:
            atexit.register(_end_init_trace_atexit)  # Register new atexit handler
            _atexit_registered = True

        # Auto-start trace if configured
        if self.config.auto_start_session:
            if self._init_trace_context is None or not self._init_trace_context.span.is_recording():
                logger.debug("Auto-starting init trace.")
                trace_name = self.config.trace_name or "default"
                self._init_trace_context = tracer.start_trace(
                    trace_name=trace_name,
                    tags=list(self.config.default_tags) if self.config.default_tags else None,
                    is_init_trace=True,
                )
                if self._init_trace_context:
                    self._legacy_session_for_init_trace = Session(self._init_trace_context)

                    # For backward compatibility, also update the global references in legacy and client modules
                    # These globals are what old code might have been using via agentops.legacy.get_session() or similar indirect access.
                    global _client_init_trace_context, _client_legacy_session_for_init_trace
                    _client_init_trace_context = self._init_trace_context
                    _client_legacy_session_for_init_trace = self._legacy_session_for_init_trace

                    # Update legacy module's _current_session and _current_trace_context
                    # This is tricky; direct access to another module's globals is not ideal.
                    # Prefer explicit calls if possible, but for maximum BC:
                    try:
                        import agentops.legacy

                        agentops.legacy._current_session = self._legacy_session_for_init_trace
                        agentops.legacy._current_trace_context = self._init_trace_context
                    except ImportError:
                        pass  # Should not happen

                else:
                    logger.error("Failed to start the auto-init trace.")
                    # Even if auto-start fails, core services up to the tracer might be initialized.
                    # Set self.initialized to True if tracer is up, but return None.
                    self._initialized = tracer.initialized
                    return None  # Failed to start trace

            self._initialized = True  # Successfully initialized and auto-trace started (if configured)
            # For backward compatibility, return the legacy session wrapper when auto_start_session=True
            return self._legacy_session_for_init_trace
        else:
            logger.debug("Auto-start session is disabled. No init trace started by client.")
            self._initialized = True  # Successfully initialized, just no auto-trace
            return None  # No auto-session, so return None

    def configure(self, **kwargs: Any) -> None:
        """Update client configuration"""
        self.config.configure(**kwargs)

    @property
    def initialized(self) -> bool:
        return self._initialized

    @initialized.setter
    def initialized(self, value: bool) -> None:
        if self._initialized and self._initialized != value:
            # Allow re-setting to False if we are intentionally re-initializing
            # This logic is now partly in init() to handle re-init cases
            pass
        self._initialized = value

    # ------------------------------------------------------------
    # Remove the old __instance = None at the end of the class definition if it's a repeat
    # __instance = None # This was a class variable, should be defined once

    # Make _init_trace_context and _legacy_session_for_init_trace accessible
    # to the atexit handler if it becomes a static/class method or needs access
    # For now, the atexit handler is global and uses global vars copied from these.

    # Deprecate and remove the old global _active_session from this module.
    # Consumers should use agentops.start_trace() or rely on the auto-init trace.
    # For a transition, the auto-init trace's legacy wrapper is set to legacy module's globals.


# Ensure the global _active_session (if needed for some very old compatibility) points to the client's legacy session for init trace.
# This specific global _active_session in client.py is problematic and should be phased out.
# For now, _client_legacy_session_for_init_trace is the primary global for the auto-init trace's legacy Session.

# Remove the old global _active_session defined at the top of this file if it's no longer the primary mechanism.
# The new globals _client_init_trace_context and _client_legacy_session_for_init_trace handle the auto-init trace.

```
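
The client combines two patterns: a `__new__`-based singleton, and (in `agentops/__init__.py`) a double-checked lock around the first construction. The sketch below shows both in isolation. `SingletonClient` and `get_shared_client` are hypothetical names used only for illustration, and the `config` dict stands in for the real `Config` object.

```python
import threading
from typing import Optional


class SingletonClient:
    """Hypothetical stand-in for Client: __new__ always returns one shared instance."""

    _instance: Optional["SingletonClient"] = None

    def __new__(cls) -> "SingletonClient":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        # __init__ runs on every SingletonClient() call; state is only reset
        # while the client has not been marked as initialized.
        if not getattr(self, "_initialized", False):
            self.config = {}
            self._initialized = False


_lock = threading.Lock()
_client: Optional[SingletonClient] = None


def get_shared_client() -> SingletonClient:
    """Double-checked locking: the lock is taken only on the first call."""
    global _client
    if _client is None:  # fast path, no lock
        with _lock:
            if _client is None:  # re-check now that we hold the lock
                _client = SingletonClient()
    return _client
```

Once `_initialized` is set to `True`, later `SingletonClient()` calls return the same object with its configuration intact, which is why the real `init()` resets that flag explicitly when it wants to re-initialize.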

### agentops/sdk/decorators/__init__.py

```python
"""
Decorators for instrumenting code with AgentOps.
Provides @trace for creating trace-level spans (sessions) and other decorators for nested spans.
"""

from agentops.helpers.deprecation import deprecated
from agentops.sdk.decorators.factory import create_entity_decorator
from agentops.semconv.span_kinds import SpanKind

# Create decorators for specific entity types using the factory
agent = create_entity_decorator(SpanKind.AGENT)
task = create_entity_decorator(SpanKind.TASK)
operation_decorator = create_entity_decorator(SpanKind.OPERATION)
workflow = create_entity_decorator(SpanKind.WORKFLOW)
trace = create_entity_decorator(SpanKind.SESSION)
tool = create_entity_decorator(SpanKind.TOOL)
operation = task
guardrail = create_entity_decorator(SpanKind.GUARDRAIL)
track_endpoint = create_entity_decorator(SpanKind.HTTP)


# For backward compatibility: @session decorator calls @trace decorator
def session(*args, **kwargs):  # noqa: F811
    """@deprecated Use @agentops.trace instead. Wraps the @trace decorator for backward compatibility."""
    # If called as @session or @session(...)
    if not args or not callable(args[0]):  # called with kwargs like @session(name=...)
        return trace(*args, **kwargs)
    else:  # called as @session directly on a function
        return trace(args[0], **kwargs)  # args[0] is the wrapped function


# Apply deprecation decorator to session function
session = deprecated("Use @trace decorator instead.")(session)


# Note: The original `operation = task` was potentially problematic if `operation` was meant to be distinct.
# Using operation_decorator for clarity if a distinct OPERATION kind decorator is needed.
# For now, keeping the alias as it was, assuming it was intentional for `operation` to be `task`.
operation = task

__all__ = [
    "agent",
    "task",
    "workflow",
    "trace",
    "session",
    "operation",
    "tool",
    "guardrail",
    "track_endpoint",
]

```
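
The `session` shim accepts both `@session` and `@session(name=...)`. The sketch below reproduces that dispatch against a hypothetical stand-in for `trace`; the real decorator comes from `create_entity_decorator`, whose internals are not shown here, so the `trace` body and the `calls` list are assumptions made purely for illustration.

```python
import functools
from typing import Any, Callable, List, Optional, Tuple

calls: List[Tuple[str, str]] = []  # (span name, function name) for each invocation


def trace(wrapped: Optional[Callable] = None, *, name: Optional[str] = None) -> Any:
    """Hypothetical stand-in decorator that works bare or with keyword arguments."""
    if wrapped is None:
        # Used as @trace(name=...): return a decorator awaiting the function.
        return functools.partial(trace, name=name)

    @functools.wraps(wrapped)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        calls.append((name or wrapped.__name__, wrapped.__name__))
        return wrapped(*args, **kwargs)

    return wrapper


def session(*args: Any, **kwargs: Any) -> Any:
    """Same dispatch as the compatibility shim above."""
    if not args or not callable(args[0]):  # @session(name=...)
        return trace(*args, **kwargs)
    return trace(args[0], **kwargs)  # bare @session


@session
def plain() -> str:
    return "plain"


@session(name="custom")
def named() -> str:
    return "named"
```

Calling `plain()` and then `named()` records one entry under the function's own name and one under `"custom"`, showing that both decorator forms reach the same wrapper.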

## Documentation

### v2/introduction.mdx

---
title: "Introduction"
description: "AgentOps is the developer favorite platform for testing, debugging, and deploying AI agents and LLM apps."
---

Prefer asking your IDE? Install the Mintlify MCP Docs Server for AgentOps to chat with the docs while you code:
`npx mint-mcp add agentops`

## Integrate with developer favorite LLM providers and agent frameworks

### Agent Frameworks

- [AG2](/v2/integrations/ag2)
- [Agno](/v2/integrations/agno)
- [AutoGen](/v2/integrations/autogen)
- [CrewAI](/v2/integrations/crewai)
- [Google ADK](/v2/integrations/google_adk)
- [LangChain](/v2/integrations/langchain)
- [OpenAI Agents (Python)](/v2/integrations/openai_agents_python)
- [OpenAI Agents (JS)](/v2/integrations/openai_agents_js)
- [smolagents](/v2/integrations/smolagents)

### LLM Providers

- [Anthropic](/v2/integrations/anthropic)
- [Google Generative AI](/v2/integrations/google_generative_ai)
- [OpenAI](/v2/integrations/openai)
- [LiteLLM](/v2/integrations/litellm)
- [IBM watsonx.ai](/v2/integrations/ibm_watsonx_ai)
- [xAI](/v2/integrations/xai)
- [Mem0](/v2/integrations/mem0)

Observability and monitoring for your AI agents and LLM apps. And we do it all in just two lines of code...

	```python python
	import agentops
	agentops.init()
	```

... that logs everything back to your AgentOps Dashboard.

AgentOps is also available for TypeScript/JavaScript applications. Check out our [TypeScript SDK guide](https://github.com/AgentOps-AI/agentops/blob/main/v2/usage/typescript-sdk) for Node.js projects.

That's it! AgentOps will automatically instrument your code and start tracking traces.

Need more control? You can create custom traces using the `@trace` decorator (recommended) or manage traces manually for advanced use cases:

	```python python
	import agentops
	from agentops.sdk.decorators import trace
	
	agentops.init(api_key="<YOUR_API_KEY>", auto_start_session=False)
	
	@trace(name="my-workflow", tags=["production"])
	def my_workflow():
		# Your code here
		return "Workflow completed"
	
	```
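
For the manual route, a minimal sketch using the `start_trace`, `update_trace_metadata`, and `end_trace` functions exported by the SDK might look like the following. The function name `run_manual_trace`, the trace name, and the metadata key are illustrative choices, and the sketch assumes the API key is supplied through the `AGENTOPS_API_KEY` environment variable.

```python
def run_manual_trace() -> None:
    # Imported lazily so this sketch can be loaded without the SDK installed.
    import agentops

    # No api_key argument: init() then falls back to AGENTOPS_API_KEY.
    agentops.init(auto_start_session=False)

    # start_trace returns None when the SDK could not be initialized.
    trace_context = agentops.start_trace(trace_name="my-manual-trace", tags=["production"])
    if trace_context is None:
        return

    try:
        result = "Workflow completed"  # placeholder for your own work
        agentops.update_trace_metadata({"result": result})
    except Exception:
        # Mark the trace as failed before propagating the error.
        agentops.end_trace(trace_context, end_state="Error")
        raise
    agentops.end_trace(trace_context, end_state="Success")
```

Ending the trace on both the success and the error path keeps it from staying open until process shutdown.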

You can also set a custom trace name during initialization:

	```python python
	import agentops
	agentops.init(api_key="<YOUR_API_KEY>", trace_name="custom-trace-name")
	```

## The AgentOps Dashboard

[Give us a star](https://github.com/AgentOps-AI/agentops) on GitHub to bookmark the project and save it for later.

With just two lines of code, you can free yourself from the chains of the terminal and, instead, visualize your agents' behavior
in your AgentOps Dashboard. After setting up AgentOps, each execution of your program is recorded as a session, and its
data is captured for you automatically.

The examples below were captured with two lines of code.

### Session Drilldown
Here you will find a list of all of your previously recorded sessions and useful data about each, such as total execution time.
You also get helpful debugging info, such as the SDK versions you were on if you're building on a supported agent framework like Crew or AutoGen.
LLM calls are presented as a familiar chat history view, and charts give you a breakdown of the types of events that were called and how long they took.

Find any past sessions from your Session Drawer.

Most powerful of all is the Session Waterfall. On the left is a time visualization of all your LLM calls, Action events, Tool calls, and Errors.
On the right are specific details about the event you've selected on the waterfall, for instance the exact prompt and completion for a given LLM call.
Most of this is recorded for you automatically.

### Session Overview
View a meta-analysis of all of your sessions in a single view.


### v2/quickstart.mdx

---
title: "Quickstart"
description: "Get started with AgentOps in minutes with just 2 lines of code for basic monitoring, and explore powerful decorators for custom tracing."
---

AgentOps is designed for easy integration into your AI agent projects, providing powerful observability with minimal setup. This guide will get you started quickly.

[Give us a star on GitHub!](https://github.com/AgentOps-AI/agentops) Your support helps us grow. 

Prefer asking your IDE? Install the Mintlify MCP Docs Server for AgentOps to chat with the docs while you code:
`npx mint-mcp add agentops`

## Installation
First, install the AgentOps SDK. We recommend including `python-dotenv` for easy API key management.

  ```bash pip 
  pip install agentops python-dotenv
  ```
  ```bash poetry
  poetry add agentops python-dotenv
  ```
  ```bash uv
  uv add agentops python-dotenv
  ```

## Initial Setup (2 Lines of Code)

At its simplest, AgentOps can start monitoring your supported LLM and agent framework calls with just two lines of Python code.

1.  **Import AgentOps**: Add `import agentops` to your script.
2.  **Initialize AgentOps**: Call `agentops.init()` with your API key.

```python Python
import agentops
import os
from dotenv import load_dotenv

# Load environment variables (recommended for API keys)
load_dotenv()

# Initialize AgentOps
# The API key can be passed directly or set as an environment variable AGENTOPS_API_KEY
AGENTOPS_API_KEY = os.getenv("AGENTOPS_API_KEY") 
agentops.init(AGENTOPS_API_KEY) 

# That's it for basic auto-instrumentation!
# If you're using a supported library (like OpenAI, LangChain, CrewAI, etc.),
# AgentOps will now automatically track LLM calls and agent actions.
```

### Setting Your AgentOps API Key
You need an AgentOps API key to send data to your dashboard.
- Get your API key from the [AgentOps Dashboard](https://app.agentops.ai/settings/projects).

It's best practice to set your API key as an environment variable.

  ```bash Export to CLI
  export AGENTOPS_API_KEY="your_agentops_api_key_here"
  ```
  ```txt Set in .env file
  AGENTOPS_API_KEY="your_agentops_api_key_here"
  ```

If you use a `.env` file, make sure `load_dotenv()` is called before `agentops.init()`.
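
A small guard can make a missing key fail loudly before any tracing is attempted. The sketch below assumes the key lives in `AGENTOPS_API_KEY`, as described above. `require_api_key` is a hypothetical helper written for illustration and is not part of the AgentOps SDK.

```python
import os


def require_api_key(env=None, var_name="AGENTOPS_API_KEY"):
    """Return the API key from the environment, or raise a clear error.

    Hypothetical helper for illustration; not part of the AgentOps SDK.
    """
    # Default to the real process environment; a dict can be passed for testing.
    env = os.environ if env is None else env
    key = (env.get(var_name) or "").strip()
    if not key:
        raise RuntimeError(
            f"{var_name} is not set. Export it or add it to your .env file "
            "and call load_dotenv() before agentops.init()."
        )
    return key
```

With this in place, initialization could read `agentops.init(require_api_key())`, so a misconfigured environment surfaces as one clear error rather than silently missing traces.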

## Running Your Agent & Viewing Traces

After adding the two lines and ensuring your API key is set up:
1.  Run your agent application as you normally would.
2.  AgentOps will automatically instrument supported libraries and send trace data.
3.  Visit your [AgentOps Dashboard](https://app.agentops.ai/traces) to observe your agent's operations!

## Beyond Automatic Instrumentation: Decorators

While AgentOps automatically instruments many popular libraries, you can gain finer-grained control and track custom parts of your code using our powerful decorators. This allows you to define specific operations, group logic under named agents, track tool usage with costs, and create custom traces.

### Tracking Custom Operations with `@operation`
Instrument any function in your code to create spans that track its execution, parameters, and return values. These operations will appear in your session visualization alongside LLM calls.
```python
from agentops.sdk.decorators import operation

@operation
def process_data(data):
    # Your function logic here
    processed_result = data.upper()
    # agentops.record(Events("Processed Data", result=processed_result)) # Optional: record specific events
    return processed_result

# Example usage:
# my_data = "example input"
# output = process_data(my_data)
```

### Tracking Agent Logic with `@agent`
If you structure your system with specific named agents (e.g., classes), use the `@agent` decorator on the class and `@operation` on its methods to group all downstream operations under that agent's context.
```python
from agentops.sdk.decorators import agent, operation

@agent(name="MyCustomAgent") # You can provide a name for the agent
class MyAgent:
    def __init__(self, agent_id):
        self.agent_id = agent_id # agent_id is a reserved parameter for AgentOps
        
    @operation
    def perform_task(self, task_description):
        # Agent task logic here
        # This could include LLM calls or calls to other @operation decorated functions
        return f"Agent {self.agent_id} completed: {task_description}"

# Example usage:
# research_agent = MyAgent(agent_id="researcher-001")
# result = research_agent.perform_task("Analyze market trends")
```

### Tracking Tools with `@tool`
Track the usage of specific tools or functions, and optionally associate costs with them. This data will be aggregated in your dashboard.
```python
from agentops.sdk.decorators import tool

@tool(name="WebSearchTool", cost=0.05) # Cost is optional
def web_search(query: str) -> str:
    # Tool logic here
    return f"Search results for: {query}"
    
@tool # No cost specified
def calculator(expression: str) -> str:
    try:
        # eval() executes arbitrary code; use it only on trusted input
        return str(eval(expression))
    except Exception as e:
        return f"Error: {e}"

# Example usage:
# search_result = web_search("AgentOps features")
# calculation = calculator("2 + 2")
```

### Grouping with Traces (`@trace` or manual)
Create custom traces to group a sequence of operations or define logical units of work. You can use the `@trace` decorator or manage traces manually for more complex scenarios.
If `auto_start_session=False` in `agentops.init()`, you must use `@trace` or `agentops.start_trace()` for any data to be recorded.

```python
from agentops.sdk.decorators import trace
# Assuming MyAgent and web_search are defined as above

# Option 1: Using the @trace decorator
@trace(name="MyMainWorkflow", tags=["main-flow"])
def my_workflow_decorated(task_to_perform):
    # Your workflow code here
    main_agent = MyAgent(agent_id="workflow-agent") # Assuming MyAgent is defined
    result = main_agent.perform_task(task_to_perform)
    # Example of using a tool within the trace
    tool_result = web_search(f"details for {task_to_perform}") # Assuming web_search is defined
    return result, tool_result
    
# result_decorated = my_workflow_decorated("complex data processing")

# Option 2: Managing traces manually
# import agentops # Already imported

# custom_trace = agentops.start_trace(name="MyManualWorkflow", tags=["manual-flow"])
# try:
#     # Your code here
#     main_agent = MyAgent(agent_id="manual-workflow-agent") # Assuming MyAgent is defined
#     result = main_agent.perform_task("another complex task")
#     tool_result = web_search(f"info for {result}") # Assuming web_search is defined
#     agentops.end_trace(custom_trace, end_state="Success", end_prompt=f"Completed: {result}")
# except Exception as e:
#     if custom_trace: # Ensure trace was started before trying to end it
#         agentops.end_trace(custom_trace, end_state="Fail", error_message=str(e))
#     raise
```

### Updating Trace Metadata

You can also update metadata on running traces to add context or track progress:

```python
from agentops import update_trace_metadata

# Update metadata during trace execution
update_trace_metadata({
    "operation_name": "AI Agent Processing",
    "processing_stage": "data_validation",
    "records_processed": 1500,
    "user_id": "user_123",
    "tags": ["validation", "production"]
})
```

## Complete Example with Decorators

Here's a consolidated example showcasing how these decorators can work together:
```python
import agentops
from agentops.sdk.decorators import agent, operation, tool, trace
from dotenv import load_dotenv
import os

# Load environment variables
load_dotenv()
AGENTOPS_API_KEY = os.getenv("AGENTOPS_API_KEY")

# Initialize AgentOps. 
# Set auto_start_session=False because @trace will manage the session.
agentops.init(AGENTOPS_API_KEY, auto_start_session=False, tags=["quickstart-complete-example"])

# Define a tool
@tool(name="AdvancedSearch", cost=0.02)
def advanced_web_search(query: str) -> str:
    # Simulate a more advanced search
    return f"Advanced search results for '{query}': [Details...]"

# Define an agent class
@agent(name="ResearchSpecialistAgent")
class ResearchAgent:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id # This will be used as the agent_id in AgentOps
        
    @operation(name="ConductResearch")
    def conduct_research(self, research_topic: str) -> str:
        # Use the tool within the agent's operation
        search_results = advanced_web_search(f"Deep dive into {research_topic}")
        # Simulate further processing
        analysis = f"Analysis of '{research_topic}': Based on '{search_results}', the key findings are..."
        return analysis

# Define a workflow using the @trace decorator
@trace(name="FullResearchWorkflow", tags=["research", "analysis", "example"])
def run_full_research_workflow(topic: str) -> str:
    specialist_agent = ResearchAgent(agent_id="researcher-alpha-007")
    research_findings = specialist_agent.conduct_research(topic)
    
    final_report = f"Research Report for '{topic}':\n{research_findings}"
    # agentops.record(Events("ReportGenerated", details=final_report)) # Optional: record a custom event
    return final_report

# Execute the workflow
final_output = run_full_research_workflow("AI in healthcare")
print(final_output)
```

## Next Steps

You've seen how to get started with AgentOps! Explore further to leverage its full potential:

- See how AgentOps automatically instruments popular LLM and agent frameworks.
- Explore detailed examples for various use cases and integrations.
- Dive deeper into the AgentOps SDK capabilities and API.
- Learn how to group operations and create custom traces using the `@trace` decorator.
  

### v2/concepts/core-concepts.mdx

---
title: 'Core Concepts'
description: 'Understanding the fundamental concepts of AgentOps'
---

# The AgentOps SDK Architecture

AgentOps is designed to provide comprehensive monitoring and analytics for AI agent workflows with minimal implementation effort. The SDK follows these key design principles:

## Automated Instrumentation

After calling `agentops.init()`, the SDK automatically identifies installed LLM providers and instruments their API calls. This allows AgentOps to capture interactions between your code and the LLM providers to collect data for your dashboard without requiring manual instrumentation for every call.
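
To make the idea of "identifying installed providers" concrete, here is a minimal sketch that checks which candidate modules are importable in the current environment. It uses only the standard library; the candidate list and the `find_installed` name are assumptions for illustration, and the SDK's own detection mechanism may work differently.

```python
import importlib.util

# Hypothetical candidate import names; "json" is included only so the
# example finds something on any Python installation.
CANDIDATE_MODULES = ["openai", "anthropic", "json"]


def find_installed(module_names):
    """Return the subset of module_names that can be imported here."""
    installed = []
    for name in module_names:
        try:
            spec = importlib.util.find_spec(name)
        except (ImportError, ValueError):
            # Raised for dotted names whose parent package is missing,
            # or for modules with a broken __spec__.
            spec = None
        if spec is not None:
            installed.append(name)
    return installed


print(find_installed(CANDIDATE_MODULES))
```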

## Declarative Tracing with Decorators

The [decorators](https://github.com/AgentOps-AI/agentops/blob/main/v2/concepts/decorators) system allows you to add tracing to your existing functions and classes with minimal code changes. Decorators create hierarchical spans that provide a structured view of your agent's operations for monitoring and analysis.

## OpenTelemetry Foundation

AgentOps is built on [OpenTelemetry](https://opentelemetry.io/), a widely-adopted standard for observability instrumentation. This provides a robust and standardized approach to collecting, processing, and exporting telemetry data.

# Sessions

A [Session](https://github.com/AgentOps-AI/agentops/blob/main/v2/concepts/sessions) represents a single user interaction with your agent. When you initialize AgentOps using the `init` function, a session is automatically created for you:

```python
import agentops

# Initialize AgentOps with automatic session creation
agentops.init(api_key="YOUR_API_KEY")
```

By default, all events and API calls will be associated with this session. For more advanced use cases, you can control session creation manually:

```python
# Initialize without auto-starting a session
agentops.init(api_key="YOUR_API_KEY", auto_start_session=False)

# Later, manually start a session when needed
agentops.start_session(tags=["customer-query"])
```

# Span Hierarchy

In AgentOps, activities are organized into a hierarchical structure of spans:

- **SESSION**: The root container for all activities in a single execution of your workflow
- **AGENT**: Represents an autonomous entity with specialized capabilities
- **WORKFLOW**: A logical grouping of related operations
- **OPERATION/TASK**: A specific task or function performed by an agent
- **LLM**: An interaction with a language model
- **TOOL**: The use of a tool or API by an agent

This hierarchy creates a complete trace of your agent's execution:

```
SESSION
├── AGENT
│   ├── OPERATION/TASK
│   │   ├── LLM
│   │   └── TOOL
│   └── WORKFLOW
│       └── OPERATION/TASK
└── LLM (unattributed to a specific agent)
```
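
As a rough mental model, the hierarchy can be represented as a tree of spans where each span holds its children. The `Span` dataclass and `render` function below are illustrative assumptions, not SDK types.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Span:
    """Illustrative span node: a kind label plus child spans."""
    kind: str
    children: List["Span"] = field(default_factory=list)


def render(span, depth=0):
    """Return the span tree as indented lines, one span per line."""
    lines = ["  " * depth + span.kind]
    for child in span.children:
        lines.extend(render(child, depth + 1))
    return lines


session = Span("SESSION", [
    Span("AGENT", [
        Span("OPERATION", [Span("LLM"), Span("TOOL")]),
    ]),
    Span("LLM"),  # an LLM call not attributed to any agent
])

print("\n".join(render(session)))
```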

# Agents

An **Agent** represents a component in your application that performs tasks. You can create and track agents using the `@agent` decorator:

```python
from agentops.sdk.decorators import agent, operation

@agent(name="customer_service")
class CustomerServiceAgent:
    @operation
    def answer_query(self, query):
        # Agent logic here
        pass
```

# LLM Events

AgentOps automatically tracks LLM API calls from supported providers, collecting valuable information like:

- **Model**: The specific model used (e.g., "gpt-4", "claude-3-opus")
- **Provider**: The LLM provider (e.g., "OpenAI", "Anthropic")
- **Prompt Tokens**: Number of tokens in the input
- **Completion Tokens**: Number of tokens in the output
- **Cost**: The estimated cost of the interaction
- **Messages**: The prompt and completion content

```python
import agentops
from openai import OpenAI

# Initialize AgentOps
agentops.init(api_key="YOUR_API_KEY")

# Initialize the OpenAI client
client = OpenAI()

# This LLM call is automatically tracked
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "What's the capital of France?"}]
)
```
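
The cost figure is derived from the token counts. A minimal sketch of that arithmetic follows; the model name and per-token prices are made up for illustration, and real prices vary by provider and change over time.

```python
# Hypothetical USD prices per 1,000 tokens, for illustration only.
PRICES_PER_1K = {
    "example-model": {"prompt": 0.01, "completion": 0.03},
}


def estimate_cost(model, prompt_tokens, completion_tokens, prices=PRICES_PER_1K):
    """Estimate the cost of one LLM call from its token counts."""
    rates = prices[model]
    prompt_cost = (prompt_tokens / 1000) * rates["prompt"]
    completion_cost = (completion_tokens / 1000) * rates["completion"]
    return prompt_cost + completion_cost


# 1,000 prompt tokens and 500 completion tokens at the rates above
print(round(estimate_cost("example-model", 1000, 500), 6))
```

With the OpenAI client, the token counts for a call are available as `response.usage.prompt_tokens` and `response.usage.completion_tokens`.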

# Tags

[Tags](https://github.com/AgentOps-AI/agentops/blob/main/v2/concepts/tags) help you organize and filter your sessions. You can add tags when initializing AgentOps or when starting a session:

```python
# Add tags when initializing
agentops.init(api_key="YOUR_API_KEY", tags=["production", "web-app"])

# Or when manually starting a session
agentops.start_session(tags=["customer-service", "tier-1"])
```

# Host Environment

AgentOps automatically collects basic [information](https://github.com/AgentOps-AI/agentops/blob/main/v2/concepts/host-env) about the environment where your agent is running:

- **Operating System**: The OS type and version
- **Python Version**: The version of Python being used
- **Hostname**: The name of the host machine (anonymized)
- **SDK Version**: The version of the AgentOps SDK being used
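
For a sense of what collecting this information involves, the sketch below gathers similar fields with the standard library. Hashing the hostname is one assumed way to anonymize it; how the SDK actually anonymizes the value is not specified here, and `collect_host_env` is a hypothetical name.

```python
import hashlib
import platform
import socket


def collect_host_env():
    """Gather basic host details; the hostname is hashed rather than kept in the clear."""
    hostname = socket.gethostname()
    return {
        "os": f"{platform.system()} {platform.release()}",
        "python_version": platform.python_version(),
        # Truncated SHA-256 digest stands in for the raw hostname.
        "hostname_hash": hashlib.sha256(hostname.encode("utf-8")).hexdigest()[:16],
    }


print(collect_host_env())
```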

# Dashboard Views

The AgentOps dashboard provides several ways to visualize and analyze your agent's performance:

- **Session List**: Overview of all sessions with filtering options
- **Timeline View**: Chronological display of spans showing duration and relationships
- **Tree View**: Hierarchical representation of spans showing parent-child relationships
- **Message View**: Detailed view of LLM interactions with prompt and completion content
- **Analytics**: Aggregated metrics across sessions and operations

# Putting It All Together

A typical implementation looks like this:

```python
import agentops
from openai import OpenAI
from agentops.sdk.decorators import agent, operation

# Initialize AgentOps
agentops.init(api_key="YOUR_API_KEY", tags=["production"])

# Define an agent
@agent(name="assistant")
class AssistantAgent:
    def __init__(self):
        self.client = OpenAI()
    
    @operation
    def answer_question(self, question):
        # This LLM call will be automatically tracked and associated with this agent
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": question}]
        )
        return response.choices[0].message.content

def workflow():
    # Use the agent
    assistant = AssistantAgent()
    answer = assistant.answer_question("What's the capital of France?")
    print(answer)

workflow()
# Session is automatically tracked until application terminates
```


### v1/quickstart.mdx

---
title: "Quickstart"
description: "Start using AgentOps with just 2 lines of code"
---
import CodeTooltip from '/snippets/add-code-tooltip.mdx'
import EnvTooltip from '/snippets/add-env-tooltip.mdx'

      ```bash pip 
      pip install agentops
      ```
      ```bash poetry
      poetry add agentops
      ```
    
    Get an AgentOps API key [here](https://app.agentops.ai/settings/projects)
      
        ```python python
        import agentops
        agentops.init()
        ```
      
    Execute your program and visit [app.agentops.ai/drilldown](https://app.agentops.ai/drilldown) to observe your Agent! 

    After your run, AgentOps prints a clickable URL to console linking directly to your session in the Dashboard 

[Give us a star](https://github.com/AgentOps-AI/agentops) if you liked AgentOps! (You may be our 3,000th!)

## More basic functionality

  You can instrument functions inside your code with the `@operation` decorator, which will create spans that track function execution, parameters, and return values. These operations will be displayed in your session visualization alongside LLM calls.
  ```python python
  # Instrument a function as an operation
  from agentops.sdk.decorators import operation
  
  @operation
  def process_data(data):
      # Your function logic here
      result = data.upper()
      return result
  ```

  If you use specific named agents within your system, you can create agent spans that contain all downstream operations using the `@agent` decorator.
  ```python python
  # Create an agent class
  from agentops.sdk.decorators import agent, operation
  
  @agent
  class MyAgent:
      def __init__(self, name):
          self.name = name
          
      @operation
      def perform_task(self, task):
          # Agent task logic here
          return f"Completed {task}"
  ```

  Create a session to group all your agent operations by using the `@session` decorator. Sessions serve as the root span for all operations.
  ```python python
  # Create a session
  from agentops.sdk.decorators import session
  
  @session
  def my_workflow():
      # Your session code here
      agent = MyAgent("research-agent")
      result = agent.perform_task("data analysis")
      return result
      
  # Run the session
  my_workflow()
  ```

## Example Code

Here is the complete code from the sections above:

```python python
import agentops
from agentops.sdk.decorators import session, agent, operation

# Initialize AgentOps
agentops.init()

# Create an agent class
@agent
class MyAgent:
    def __init__(self, name):
        self.name = name
        
    @operation
    def perform_task(self, task):
        # Agent task logic here
        return f"Completed {task}"

# Create a session
@session
def my_workflow():
    # Your session code here
    agent = MyAgent("research-agent")
    result = agent.perform_task("data analysis")
    return result
    
# Run the session
my_workflow()
```

  Jupyter Notebook with sample code that you can run!

  That's all you need to get started! Check out the documentation below to see how you can record other operations. AgentOps is a lot more powerful this way!

## Explore our more advanced functionality!

- Record all of your operations the way AgentOps intends.
- Associate operations with specific named agents.
  

## Instrumentation Architecture

### agentops/instrumentation/__init__.py

```python
"""
AgentOps Instrumentation Module

This module provides automatic instrumentation for various LLM providers and agentic libraries.
It works by monitoring Python imports and automatically instrumenting packages as they are imported.

Key Features:
- Automatic detection and instrumentation of LLM providers (OpenAI, Anthropic, etc.)
- Support for agentic libraries (CrewAI, AutoGen, etc.)
- Version-aware instrumentation (only activates for supported versions)
- Smart handling of provider vs agentic library conflicts
- Non-intrusive monitoring using Python's import system
"""

from typing import Optional, Set, TypedDict

try:
    from typing import NotRequired
except ImportError:
    from typing_extensions import NotRequired
from types import ModuleType
from dataclasses import dataclass
import importlib
import sys
from packaging.version import Version, parse
import builtins

# Add os and site for path checking
import os
import site

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor  # type: ignore

from agentops.logging import logger
from agentops.sdk.core import tracer
from agentops.instrumentation.common import get_library_version


# Define the structure for instrumentor configurations
class InstrumentorConfig(TypedDict):
    module_name: str
    class_name: str
    min_version: str
    package_name: NotRequired[str]  # Optional: actual pip package name if different from module


# Configuration for supported LLM providers
PROVIDERS: dict[str, InstrumentorConfig] = {
    "openai": {
        "module_name": "agentops.instrumentation.providers.openai",
        "class_name": "OpenaiInstrumentor",
        "min_version": "1.0.0",
    },
    "anthropic": {
        "module_name": "agentops.instrumentation.providers.anthropic",
        "class_name": "AnthropicInstrumentor",
        "min_version": "0.32.0",
    },
    "ibm_watsonx_ai": {
        "module_name": "agentops.instrumentation.providers.ibm_watsonx_ai",
        "class_name": "WatsonxInstrumentor",
        "min_version": "0.1.0",
    },
    "google.genai": {
        "module_name": "agentops.instrumentation.providers.google_genai",
        "class_name": "GoogleGenaiInstrumentor",
        "min_version": "0.1.0",
        "package_name": "google-genai",  # Actual pip package name
    },
    "mem0": {
        "module_name": "agentops.instrumentation.providers.mem0",
        "class_name": "Mem0Instrumentor",
        "min_version": "0.1.0",
        "package_name": "mem0ai",
    },
}

# Configuration for supported agentic libraries
AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = {
    "crewai": {
        "module_name": "agentops.instrumentation.agentic.crewai",
        "class_name": "CrewaiInstrumentor",
        "min_version": "0.56.0",
    },
    "autogen": {
        "module_name": "agentops.instrumentation.agentic.ag2",
        "class_name": "AG2Instrumentor",
        "min_version": "0.3.2",
    },
    "agents": {
        "module_name": "agentops.instrumentation.agentic.openai_agents",
        "class_name": "OpenAIAgentsInstrumentor",
        "min_version": "0.0.1",
    },
    "google.adk": {
        "module_name": "agentops.instrumentation.agentic.google_adk",
        "class_name": "GooogleAdkInstrumentor",
        "min_version": "0.1.0",
    },
    "agno": {
        "module_name": "agentops.instrumentation.agentic.agno",
        "class_name": "AgnoInstrumentor",
        "min_version": "1.5.8",
    },
    "smolagents": {
        "module_name": "agentops.instrumentation.agentic.smolagents",
        "class_name": "SmolagentsInstrumentor",
        "min_version": "1.0.0",
    },
    "langgraph": {
        "module_name": "agentops.instrumentation.agentic.langgraph",
        "class_name": "LanggraphInstrumentor",
        "min_version": "0.2.0",
    },
}

# Combine all target packages for monitoring
TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys())

# Create a single instance of the manager
# _manager = InstrumentationManager() # Removed

# Module-level state variables
_active_instrumentors: list[BaseInstrumentor] = []
_original_builtins_import = builtins.__import__  # Store original import
_instrumenting_packages: Set[str] = set()
_has_agentic_library: bool = False


# New helper function to check module origin
def _is_installed_package(module_obj: ModuleType, package_name_key: str) -> bool:
    """
    Determines if the given module object corresponds to an installed site-package
    rather than a local module, especially when names might collide.
    `package_name_key` is the key from TARGET_PACKAGES (e.g., 'agents', 'google.adk').
    """
    if not hasattr(module_obj, "__file__") or not module_obj.__file__:
        logger.debug(
            f"_is_installed_package: Module '{package_name_key}' has no __file__, assuming it might be an SDK namespace package. Returning True."
        )
        return True

    module_path = os.path.normcase(os.path.realpath(os.path.abspath(module_obj.__file__)))

    # Priority 1: Check if it's in any site-packages directory.
    site_packages_dirs = site.getsitepackages()
    if isinstance(site_packages_dirs, str):
        site_packages_dirs = [site_packages_dirs]

    if hasattr(site, "USER_SITE") and site.USER_SITE and os.path.exists(site.USER_SITE):
        site_packages_dirs.append(site.USER_SITE)

    normalized_site_packages_dirs = [
        os.path.normcase(os.path.realpath(p)) for p in site_packages_dirs if p and os.path.exists(p)
    ]

    for sp_dir in normalized_site_packages_dirs:
        if module_path.startswith(sp_dir):
            logger.debug(
                f"_is_installed_package: Module '{package_name_key}' is a library, instrumenting '{package_name_key}'."
            )
            return True

    # Priority 2: If not in site-packages, it's highly likely a local module or not an SDK we target.
    logger.debug(f"_is_installed_package: Module '{package_name_key}' is a local module, skipping instrumentation.")
    return False


def _is_package_instrumented(package_name: str) -> bool:
    """Check if a package is already instrumented by looking at active instrumentors."""
    # Handle package.module names by converting dots to underscores for comparison
    normalized_target_name = package_name.replace(".", "_").lower()
    for instrumentor in _active_instrumentors:
        # Check based on the key it was registered with
        if (
            hasattr(instrumentor, "_agentops_instrumented_package_key")
            and instrumentor._agentops_instrumented_package_key == package_name
        ):
            return True

        # Fallback to class name check (existing logic, less precise)
        # We use split('.')[-1] for cases like 'google.genai' to match GenAIInstrumentor
        instrumentor_class_name_prefix = instrumentor.__class__.__name__.lower().replace("instrumentor", "")
        target_base_name = package_name.split(".")[-1].lower()
        normalized_class_name_match = (
            normalized_target_name.startswith(instrumentor_class_name_prefix)
            or target_base_name == instrumentor_class_name_prefix
        )

        if normalized_class_name_match:
            # This fallback can be noisy, let's make it more specific or rely on the key above more
            # For now, if the key matches or this broad name match works, consider instrumented.
            # This helps if _agentops_instrumented_package_key was somehow not set.
            return True

    return False


def _uninstrument_providers():
    """Uninstrument all provider instrumentors while keeping agentic libraries active."""
    global _active_instrumentors
    new_active_instrumentors = []
    uninstrumented_any = False
    for instrumentor in _active_instrumentors:
        instrumented_key = getattr(instrumentor, "_agentops_instrumented_package_key", None)
        if instrumented_key and instrumented_key in PROVIDERS:
            try:
                instrumentor.uninstrument()
                logger.debug(
                    f"AgentOps: Uninstrumented provider: {instrumentor.__class__.__name__} (for package '{instrumented_key}') due to agentic library activation."
                )
                uninstrumented_any = True
            except Exception as e:
                logger.error(f"Error uninstrumenting provider {instrumentor.__class__.__name__}: {e}")
        else:
            # Keep non-provider instrumentors or those without our key (shouldn't happen for managed ones)
            new_active_instrumentors.append(instrumentor)

    if uninstrumented_any or (not new_active_instrumentors and _active_instrumentors):
        logger.debug(
            f"_uninstrument_providers: Processed. Previous active: {len(_active_instrumentors)}, New active after filtering providers: {len(new_active_instrumentors)}"
        )
    _active_instrumentors = new_active_instrumentors


def _should_instrument_package(package_name: str) -> bool:
    """
    Determine if a package should be instrumented based on current state.
    Handles special cases for agentic libraries and providers.
    """
    global _has_agentic_library

    # If already instrumented by AgentOps (using our refined check), skip.
    if _is_package_instrumented(package_name):
        logger.debug(f"_should_instrument_package: '{package_name}' already instrumented by AgentOps. Skipping.")
        return False

    is_target_agentic = package_name in AGENTIC_LIBRARIES
    is_target_provider = package_name in PROVIDERS

    if not is_target_agentic and not is_target_provider:
        logger.debug(
            f"_should_instrument_package: '{package_name}' is not a targeted provider or agentic library. Skipping."
        )
        return False

    if _has_agentic_library:
        # An agentic library is already active.
        if is_target_agentic:
            logger.debug(
                f"AgentOps: An agentic library is active. Skipping instrumentation for subsequent agentic library '{package_name}'."
            )
            return False
        if is_target_provider:
            logger.debug(
                f"AgentOps: An agentic library is active. Skipping instrumentation for provider '{package_name}'."
            )
            return False
    else:
        # No agentic library is active yet.
        if is_target_agentic:
            logger.debug(
                f"AgentOps: '{package_name}' is the first-targeted agentic library. Will uninstrument providers if any are/become active."
            )
            _uninstrument_providers()
            return True
        if is_target_provider:
            logger.debug(
                f"_should_instrument_package: '{package_name}' is a provider, no agentic library active. Allowing."
            )
            return True

    logger.debug(
        f"_should_instrument_package: Defaulting to False for '{package_name}' (state: _has_agentic_library={_has_agentic_library})"
    )
    return False


def _perform_instrumentation(package_name: str):
    """Helper function to perform instrumentation for a given package."""
    global _instrumenting_packages, _active_instrumentors, _has_agentic_library
    if not _should_instrument_package(package_name):
        return

    # Get the appropriate configuration for the package
    # Ensure package_name is a key in either PROVIDERS or AGENTIC_LIBRARIES
    if package_name not in PROVIDERS and package_name not in AGENTIC_LIBRARIES:
        logger.debug(
            f"_perform_instrumentation: Package '{package_name}' not found in PROVIDERS or AGENTIC_LIBRARIES. Skipping."
        )
        return

    config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name)
    loader = InstrumentorLoader(**config)

    # instrument_one already checks loader.should_activate
    instrumentor_instance = instrument_one(loader)
    if instrumentor_instance is not None:
        # instrument_one returns the instance even when its .instrument() call failed
        # (the failure is logged there), so a non-None result does not guarantee success.
        # The _is_package_instrumented check at the start of _should_instrument_package
        # should prevent most re-entry for the same package_name.

        # Store the package key this instrumentor is for, to aid _is_package_instrumented
        instrumentor_instance._agentops_instrumented_package_key = package_name

        # Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented
        # This is a safeguard, _is_package_instrumented should catch this earlier.
        is_newly_added = True
        for existing_inst in _active_instrumentors:
            if (
                hasattr(existing_inst, "_agentops_instrumented_package_key")
                and existing_inst._agentops_instrumented_package_key == package_name
            ):
                is_newly_added = False
                logger.debug(
                    f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again."
                )
                break
        if is_newly_added:
            _active_instrumentors.append(instrumentor_instance)

        # If this was an agentic library AND it's newly effectively instrumented.
        if (
            package_name in AGENTIC_LIBRARIES and not _has_agentic_library
        ):  # Check _has_agentic_library to ensure this is the *first* one.
            # _uninstrument_providers() was already called in _should_instrument_package for the first agentic library.
            _has_agentic_library = True

        # Special case: If mem0 is instrumented, also instrument concurrent.futures
        if package_name == "mem0" and is_newly_added:
            try:
                # Create config for the concurrent.futures instrumentor (stdlib, so always importable)
                concurrent_config = InstrumentorConfig(
                    module_name="agentops.instrumentation.utilities.concurrent_futures",
                    class_name="ConcurrentFuturesInstrumentor",
                    min_version="3.7.0",  # Python 3.7+ (concurrent.futures is stdlib)
                    package_name="python",  # Special case for stdlib modules
                )

                # Create and instrument concurrent.futures
                concurrent_loader = InstrumentorLoader(**concurrent_config)
                concurrent_instrumentor = instrument_one(concurrent_loader)

                if concurrent_instrumentor is not None:
                    concurrent_instrumentor._agentops_instrumented_package_key = "concurrent.futures"
                    _active_instrumentors.append(concurrent_instrumentor)
                    logger.debug("AgentOps: Instrumented concurrent.futures as a dependency of mem0.")
            except Exception as e:
                logger.debug(f"Could not instrument concurrent.futures for mem0: {e}")
    else:
        logger.debug(
            f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors."
        )


def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0):
    """
    Monitor imports and instrument packages as they are imported.
    This replaces the built-in import function to intercept package imports.
    """
    global _instrumenting_packages, _has_agentic_library

    # If an agentic library is already instrumented, skip all further instrumentation
    if _has_agentic_library:
        return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level)

    # First, do the actual import
    module = _original_builtins_import(name, globals_dict, locals_dict, fromlist, level)

    # Check for exact matches first (handles package.module like google.adk)
    packages_to_check = set()

    # Check the imported module itself
    if name in TARGET_PACKAGES:
        packages_to_check.add(name)
    else:
        # Check if any target package is a prefix of the import name
        for target in TARGET_PACKAGES:
            if name.startswith(target + ".") or name == target:
                packages_to_check.add(target)

    # For "from X import Y" style imports, also check submodules
    if fromlist:
        for item in fromlist:
            # Construct potential full name, e.g., "google.adk" from name="google", item="adk"
            # Or if name="os", item="path", full_name="os.path"
            # If the original name itself is a multi-part name like "a.b", and item is "c", then "a.b.c"
            # This logic needs to correctly identify the root package if 'name' is already a sub-package.
            # The existing TARGET_PACKAGES check is simpler: it checks against pre-defined full names.

            # Check full name if item forms part of a target package name
            full_item_name_candidate = f"{name}.{item}"

            if full_item_name_candidate in TARGET_PACKAGES:
                packages_to_check.add(full_item_name_candidate)
            else:  # Fallback to checking if 'name' itself is a target
                for target in TARGET_PACKAGES:
                    if name == target or name.startswith(target + "."):
                        packages_to_check.add(target)  # Check the base target if a submodule is imported from it.

    # Instrument all matching packages
    for package_to_check in packages_to_check:
        if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check):
            target_module_obj = sys.modules.get(package_to_check)

            if target_module_obj:
                is_sdk = _is_installed_package(target_module_obj, package_to_check)
                if not is_sdk:
                    logger.debug(
                        f"AgentOps: Target '{package_to_check}' appears to be a local module/directory. Skipping AgentOps SDK instrumentation for it."
                    )
                    continue
            else:
                logger.debug(
                    f"_import_monitor: No module object found in sys.modules for '{package_to_check}', proceeding with SDK instrumentation attempt."
                )

            _instrumenting_packages.add(package_to_check)
            try:
                _perform_instrumentation(package_to_check)
                # If we just instrumented an agentic library, stop
                if _has_agentic_library:
                    break
            except Exception as e:
                logger.error(f"Error instrumenting {package_to_check}: {str(e)}")
            finally:
                _instrumenting_packages.discard(package_to_check)

    return module


@dataclass
class InstrumentorLoader:
    """
    Represents a dynamically-loadable instrumentor.
    Handles version checking and instantiation of instrumentors.
    """

    module_name: str
    class_name: str
    min_version: str
    package_name: Optional[str] = None  # Optional: actual pip package name

    @property
    def module(self) -> ModuleType:
        """Get the instrumentor module."""
        return importlib.import_module(self.module_name)

    @property
    def should_activate(self) -> bool:
        """Check if the package is available and meets version requirements."""
        try:
            # Special case for stdlib modules (like concurrent.futures)
            if self.package_name == "python":
                import sys

                python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
                return Version(python_version) >= parse(self.min_version)

            # Use explicit package_name if provided, otherwise derive from module_name
            if self.package_name:
                provider_name = self.package_name
            else:
                provider_name = self.module_name.split(".")[-1]

            # Use common version utility
            module_version = get_library_version(provider_name)
            return module_version != "unknown" and Version(module_version) >= parse(self.min_version)
        except Exception:
            return False

    def get_instance(self) -> BaseInstrumentor:
        """Create and return a new instance of the instrumentor."""
        return getattr(self.module, self.class_name)()


def instrument_one(loader: InstrumentorLoader) -> Optional[BaseInstrumentor]:
    """
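    Instrument a single package using the provided loader.
    Returns None if the package is missing or below the minimum version. Otherwise returns
    the instrumentor instance, even if its instrument() call raised (the error is logged).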
    """
    if not loader.should_activate:
        # This log is important for users to know why something wasn't instrumented.
        logger.debug(
            f"AgentOps: Package '{loader.package_name or loader.module_name}' not found or version is less than minimum required ('{loader.min_version}'). Skipping instrumentation."
        )
        return None

    instrumentor = loader.get_instance()
    try:
        # Use the provider directly from the global tracer instance
        instrumentor.instrument(tracer_provider=tracer.provider)
        logger.debug(
            f"AgentOps: Successfully instrumented '{loader.class_name}' for package '{loader.package_name or loader.module_name}'."
        )
    except Exception as e:
        logger.error(
            f"Failed to instrument {loader.class_name} for {loader.package_name or loader.module_name}: {e}",
            exc_info=True,
        )
    return instrumentor


def instrument_all():
    """Start monitoring and instrumenting packages if not already started."""
    global _instrumenting_packages, _has_agentic_library

    # Check if active_instrumentors is empty, as a proxy for not started.
    if not _active_instrumentors:
        builtins.__import__ = _import_monitor

        # If an agentic library is already instrumented, don't instrument anything else
        if _has_agentic_library:
            return

        for name in list(sys.modules.keys()):
            # Stop if an agentic library gets instrumented during the loop
            if _has_agentic_library:
                break

            module = sys.modules.get(name)
            if not isinstance(module, ModuleType):
                continue

            # Check for exact matches first (handles package.module like google.adk)
            package_to_check = None
            if name in TARGET_PACKAGES:
                package_to_check = name
            else:
                # Check if any target package is a prefix of the module name
                for target in TARGET_PACKAGES:
                    if name.startswith(target + ".") or name == target:
                        package_to_check = target
                        break

            if (
                package_to_check
                and package_to_check not in _instrumenting_packages
                and not _is_package_instrumented(package_to_check)
            ):
                target_module_obj = sys.modules.get(package_to_check)

                if target_module_obj:
                    is_sdk = _is_installed_package(target_module_obj, package_to_check)
                    if not is_sdk:
                        continue
                else:
                    logger.debug(
                        f"instrument_all: No module object found for '{package_to_check}' in sys.modules during startup scan. Proceeding cautiously."
                    )

                _instrumenting_packages.add(package_to_check)
                try:
                    _perform_instrumentation(package_to_check)
                except Exception as e:
                    logger.error(f"Error instrumenting {package_to_check}: {str(e)}")
                finally:
                    _instrumenting_packages.discard(package_to_check)


def uninstrument_all():
    """Stop monitoring and uninstrument all packages."""
    global _active_instrumentors, _has_agentic_library
    builtins.__import__ = _original_builtins_import
    for instrumentor in _active_instrumentors:
        instrumentor.uninstrument()
        logger.debug(f"Uninstrumented {instrumentor.__class__.__name__}")
    _active_instrumentors = []
    _has_agentic_library = False


def get_active_libraries() -> set[str]:
    """
    Get all actively used libraries in the current execution context.
    Returns a set of package names that are currently imported and being monitored.
    """
    active_libs = set()
    for name, module in sys.modules.items():
        if not isinstance(module, ModuleType):
            continue

        # Check for exact matches first
        if name in TARGET_PACKAGES:
            active_libs.add(name)
        else:
            # Check if any target package is a prefix of the module name
            for target in TARGET_PACKAGES:
                if name.startswith(target + ".") or name == target:
                    active_libs.add(target)
                    break

    return active_libs

```
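The matching rule shared by `_import_monitor`, `instrument_all`, and `get_active_libraries` is easier to see in isolation. The following is a simplified standalone sketch, not the module's actual code: `TARGETS` is a made-up stand-in for `TARGET_PACKAGES`, and `match_targets` is a hypothetical helper name.

```python
# Illustrative sketch only: TARGETS and match_targets are hypothetical names
# standing in for TARGET_PACKAGES and the inline matching in _import_monitor.
TARGETS = {"openai", "google.adk", "crewai"}


def match_targets(name, fromlist=(), targets=TARGETS):
    """Return the target packages that an import of `name` should trigger."""
    matched = set()

    # Exact match, or `name` is a submodule of a target (e.g. "openai.resources").
    for target in targets:
        if name == target or name.startswith(target + "."):
            matched.add(target)

    # "from google import adk" arrives as name="google", fromlist=("adk",).
    for item in fromlist:
        candidate = f"{name}.{item}"
        if candidate in targets:
            matched.add(candidate)

    return matched


print(match_targets("openai.resources.chat"))
print(match_targets("google", fromlist=("adk",)))
print(match_targets("openai_helpers"))
```

Requiring the `target + "."` prefix rather than a bare `startswith(target)` is what keeps an unrelated package such as `openai_helpers` from being mistaken for `openai`.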

### agentops/instrumentation/README.md

# AgentOps Instrumentation

This package provides OpenTelemetry instrumentation for various LLM providers, agent frameworks, and related services.

## Available Instrumentors

- **OpenAI** (`v0.27.0+` and `v1.0.0+`)
- **Anthropic** (`v0.7.0+`)
- **Google GenAI** (`v0.1.0+`)
- **IBM WatsonX AI** (`v0.1.0+`)
- **CrewAI** (`v0.56.0+`)
- **AG2/AutoGen** (`v0.3.2+`)
- **Google ADK** (`v0.1.0+`)
- **Agno** (`v0.0.1+`)
- **Mem0** (`v0.1.0+`)
- **smolagents** (`v0.1.0+`)
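
Each minimum version listed above is checked before an instrumentor is activated. The sketch below shows the general shape of such a gate using only the standard library. It is an approximation under stated assumptions: the loader code in this package compares `Version(...)` against `parse(min_version)`, whereas `meets_minimum` and `_release_tuple` here are hypothetical helpers that compare numeric release segments only and ignore pre-release suffixes.

```python
import re
from importlib import metadata


def _release_tuple(version: str) -> tuple:
    """Turn '1.2.3rc1' into (1, 2, 3); anything after the digits is ignored."""
    parts = []
    for segment in version.split("."):
        match = re.match(r"\d+", segment)
        if not match:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def meets_minimum(installed: str, minimum: str) -> bool:
    """Hypothetical helper: True if `installed` is at least `minimum`."""
    a, b = _release_tuple(installed), _release_tuple(minimum)
    width = max(len(a), len(b))
    # Pad with zeros so that "1.0" and "1.0.0" compare as equal.
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


def should_activate(package_name: str, min_version: str) -> bool:
    """Return False when the package is missing or older than `min_version`."""
    try:
        installed = metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return False
    return meets_minimum(installed, min_version)


print(meets_minimum("1.35.0", "0.27.0"))
print(meets_minimum("0.26.5", "0.27.0"))
print(should_activate("surely-not-an-installed-package", "0.1.0"))
```

Returning `False` for a missing package, instead of raising, lets the caller skip instrumentation quietly when a library is simply not installed.
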

## Common Module Usage

The `agentops.instrumentation.common` module provides shared utilities for creating instrumentations:

### Base Instrumentor

Use `CommonInstrumentor` for creating new instrumentations:

```python
from agentops.instrumentation.common import CommonInstrumentor, InstrumentorConfig, WrapConfig

class MyInstrumentor(CommonInstrumentor):
    def __init__(self):
        config = InstrumentorConfig(
            library_name="my-library",
            library_version="1.0.0",
            wrapped_methods=[
                WrapConfig(
                    trace_name="my.method",
                    package="my_library.module",
                    class_name="MyClass",
                    method_name="my_method",
                    handler=my_attribute_handler
                )
            ],
            dependencies=["my-library >= 1.0.0"]
        )
        super().__init__(config)
```

### Attribute Handlers

Create attribute handlers to extract data from method calls:

```python
from agentops.instrumentation.common import AttributeMap

def my_attribute_handler(args=None, kwargs=None, return_value=None) -> AttributeMap:
    attributes = {}
    
    if kwargs and "model" in kwargs:
        attributes["llm.request.model"] = kwargs["model"]
    
    if return_value and hasattr(return_value, "usage"):
        attributes["llm.usage.total_tokens"] = return_value.usage.total_tokens
    
    return attributes
```
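
To see what such a handler returns, here is a small self-contained check. It repeats the handler so that it runs on its own. The `SimpleNamespace` object is only a stand-in for a real provider response, and the return annotation is a plain `dict` instead of `AttributeMap` so the snippet needs no AgentOps import.

```python
from types import SimpleNamespace


def my_attribute_handler(args=None, kwargs=None, return_value=None) -> dict:
    attributes = {}

    if kwargs and "model" in kwargs:
        attributes["llm.request.model"] = kwargs["model"]

    if return_value and hasattr(return_value, "usage"):
        attributes["llm.usage.total_tokens"] = return_value.usage.total_tokens

    return attributes


# Fake response object: only the fields the handler reads are present.
fake_response = SimpleNamespace(usage=SimpleNamespace(total_tokens=42))

attrs = my_attribute_handler(kwargs={"model": "gpt-4o-mini"}, return_value=fake_response)
print(attrs)
```

Because every argument defaults to `None` and each lookup is guarded, calling the handler with no arguments returns an empty mapping instead of raising.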

### Span Management

Use the span management utilities for consistent span creation:

```python
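from opentelemetry import trace

from agentops.instrumentation.common import create_span, SpanAttributeManager

# Any OpenTelemetry tracer can be passed to create_span
tracer = trace.get_tracer(__name__)

# Create an attribute manager
attr_manager = SpanAttributeManager(service_name="my-service")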

# Use the create_span context manager
with create_span(
    tracer,
    "my.operation",
    attributes={"my.attribute": "value"},
    attribute_manager=attr_manager
) as span:
    # Your operation code here
    pass
```

### Token Counting

Use the token counting utilities for consistent token usage extraction:

```python
from agentops.instrumentation.common import TokenUsageExtractor, set_token_usage_attributes

# Extract token usage from a response (`response` is the provider's response object)
usage = TokenUsageExtractor.extract_from_response(response)

# Set token usage attributes on a span (`span` is an OpenTelemetry span)
set_token_usage_attributes(span, response)
```

### Streaming Support

Use streaming utilities for handling streaming responses:

```python
from wrapt import wrap_function_wrapper

from agentops.instrumentation.common import create_stream_wrapper_factory, StreamingResponseHandler

# Create a stream wrapper factory (`tracer` is an OpenTelemetry tracer)
wrapper = create_stream_wrapper_factory(
    tracer,
    "my.stream",
    extract_chunk_content=StreamingResponseHandler.extract_generic_chunk_content,
    initial_attributes={"stream.type": "text"}
)

# Apply to streaming methods
wrap_function_wrapper("my_module", "stream_method", wrapper)
```

### Metrics

Use standard metrics for consistency across instrumentations:

```python
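from opentelemetry.metrics import get_meter

from agentops.instrumentation.common import StandardMetrics, MetricsRecorder

# Obtain a meter from the OpenTelemetry metrics API
meter = get_meter(__name__)

# Create standard metrics
metrics = StandardMetrics.create_standard_metrics(meter)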

# Use the metrics recorder
recorder = MetricsRecorder(metrics)
recorder.record_token_usage(prompt_tokens=100, completion_tokens=50)
recorder.record_duration(1.5)
```

## Creating a New Instrumentor

1. Create a new directory under `agentops/instrumentation/` for your provider
2. Create an `__init__.py` file with version information
3. Create an `instrumentor.py` file extending `CommonInstrumentor`
4. Create attribute handlers in an `attributes/` subdirectory
5. Add your instrumentor to the main `__init__.py` configuration

Example structure:
```
agentops/instrumentation/
└── my_provider/
    ├── __init__.py
    ├── instrumentor.py
    └── attributes/
        ├── __init__.py
        └── handlers.py
```

## Best Practices

1. **Use Common Utilities**: Leverage the common module for consistency
2. **Follow Semantic Conventions**: Use attributes from `agentops.semconv`
3. **Handle Errors Gracefully**: Wrap operations in try-except blocks
4. **Support Async**: Provide both sync and async method wrapping
5. **Document Attributes**: Comment on what attributes are captured
6. **Test Thoroughly**: Write unit tests for your instrumentor
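
As one illustration of practice 3, an attribute handler can be wrapped so that a bug in attribute extraction never breaks the instrumented call. This is a sketch with hypothetical names (`safe_handler`, `fragile_handler`); it is not a utility provided by AgentOps.

```python
import functools
import logging
from types import SimpleNamespace

logger = logging.getLogger("instrumentation.sketch")


def safe_handler(handler):
    """Wrap an attribute handler so exceptions yield an empty attribute map."""

    @functools.wraps(handler)
    def wrapper(args=None, kwargs=None, return_value=None):
        try:
            return handler(args=args, kwargs=kwargs, return_value=return_value)
        except Exception as exc:
            # Telemetry must not take down the user's LLM call.
            logger.debug("Attribute handler %s failed: %s", handler.__name__, exc)
            return {}

    return wrapper


@safe_handler
def fragile_handler(args=None, kwargs=None, return_value=None):
    # Raises AttributeError when the response has no `usage` field.
    return {"llm.usage.total_tokens": return_value.usage.total_tokens}


# Stand-in for a well-formed provider response.
ok_response = SimpleNamespace(usage=SimpleNamespace(total_tokens=7))

print(fragile_handler(return_value=ok_response))
print(fragile_handler(return_value=None))
```

The failure is logged at debug level and not swallowed silently, so a broken handler can still be diagnosed.
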

## Examples

See the `examples/` directory for usage examples of each instrumentor.


### agentops/instrumentation/providers/openai/instrumentor.py

```python
"""OpenAI API Instrumentation for AgentOps

This module provides comprehensive instrumentation for the OpenAI API, including:
- Chat completions (streaming and non-streaming)
- Regular completions
- Embeddings
- Image generation
- Assistants API (create, runs, messages)
- Responses API (Agents SDK)

The instrumentation supports both sync and async methods, metrics collection,
and distributed tracing.
"""

from typing import Dict, Any
from wrapt import wrap_function_wrapper

from opentelemetry.metrics import Meter

from agentops.logging import logger
from agentops.instrumentation.common import (
    CommonInstrumentor,
    InstrumentorConfig,
    WrapConfig,
    StandardMetrics,
    MetricsRecorder,
)
from agentops.instrumentation.providers.openai import LIBRARY_NAME, LIBRARY_VERSION
from agentops.instrumentation.providers.openai.config import Config
from agentops.instrumentation.providers.openai.utils import is_openai_v1
from agentops.instrumentation.providers.openai.wrappers import (
    handle_completion_attributes,
    handle_embeddings_attributes,
    handle_image_gen_attributes,
    handle_assistant_attributes,
    handle_run_attributes,
    handle_run_retrieve_attributes,
    handle_run_stream_attributes,
    handle_messages_attributes,
)
from agentops.instrumentation.providers.openai.stream_wrapper import (
    chat_completion_stream_wrapper,
    async_chat_completion_stream_wrapper,
    responses_stream_wrapper,
    async_responses_stream_wrapper,
)
from agentops.instrumentation.providers.openai.v0 import OpenAIV0Instrumentor
from agentops.semconv import Meters

_instruments = ("openai >= 0.27.0",)


class OpenaiInstrumentor(CommonInstrumentor):
    """An instrumentor for OpenAI's client library with comprehensive coverage."""

    def __init__(
        self,
        enrich_assistant: bool = False,
        enrich_token_usage: bool = False,
        exception_logger=None,
        get_common_metrics_attributes=None,
        upload_base64_image=None,
        enable_trace_context_propagation: bool = True,
    ):
        # Configure the global config with provided options
        Config.enrich_assistant = enrich_assistant
        Config.enrich_token_usage = enrich_token_usage
        Config.exception_logger = exception_logger
        Config.get_common_metrics_attributes = get_common_metrics_attributes or (lambda: {})
        Config.upload_base64_image = upload_base64_image
        Config.enable_trace_context_propagation = enable_trace_context_propagation

        # Create instrumentor config
        config = InstrumentorConfig(
            library_name=LIBRARY_NAME,
            library_version=LIBRARY_VERSION,
            wrapped_methods=self._get_wrapped_methods(),
            metrics_enabled=True,
            dependencies=_instruments,
        )

        super().__init__(config)

    def _initialize(self, **kwargs):
        """Handle version-specific initialization."""
        if not is_openai_v1():
            # For v0, use the legacy instrumentor
            OpenAIV0Instrumentor().instrument(**kwargs)
            # Skip normal instrumentation
            self.config.wrapped_methods = []

    def _custom_wrap(self, **kwargs):
        """Add custom wrappers for streaming functionality."""
        if is_openai_v1() and self._tracer:
            # Add streaming wrappers for v1
            try:
                # Chat completion streaming wrappers

                wrap_function_wrapper(
                    "openai.resources.chat.completions",
                    "Completions.create",
                    chat_completion_stream_wrapper(self._tracer),
                )

                wrap_function_wrapper(
                    "openai.resources.chat.completions",
                    "AsyncCompletions.create",
                    async_chat_completion_stream_wrapper(self._tracer),
                )

                # Beta chat completion streaming wrappers
                wrap_function_wrapper(
                    "openai.resources.beta.chat.completions",
                    "Completions.parse",
                    chat_completion_stream_wrapper(self._tracer),
                )

                wrap_function_wrapper(
                    "openai.resources.beta.chat.completions",
                    "AsyncCompletions.parse",
                    async_chat_completion_stream_wrapper(self._tracer),
                )

                # Responses API streaming wrappers
                wrap_function_wrapper(
                    "openai.resources.responses",
                    "Responses.create",
                    responses_stream_wrapper(self._tracer),
                )

                wrap_function_wrapper(
                    "openai.resources.responses",
                    "AsyncResponses.create",
                    async_responses_stream_wrapper(self._tracer),
                )
            except Exception as e:
                logger.warning(f"[OPENAI INSTRUMENTOR] Error setting up OpenAI streaming wrappers: {e}")
        else:
            if not is_openai_v1():
                logger.debug("[OPENAI INSTRUMENTOR] Skipping custom wrapping - not using OpenAI v1")
            if not self._tracer:
                logger.debug("[OPENAI INSTRUMENTOR] Skipping custom wrapping - no tracer available")

    def _create_metrics(self, meter: Meter) -> Dict[str, Any]:
        """Create metrics for OpenAI instrumentation."""
        metrics = StandardMetrics.create_standard_metrics(meter)

        # Add OpenAI-specific metrics
        metrics.update(
            {
                "chat_choice_counter": meter.create_counter(
                    name=Meters.LLM_GENERATION_CHOICES,
                    unit="choice",
                    description="Number of choices returned by chat completions call",
                ),
                "streaming_time_to_first_token": meter.create_histogram(
                    name=Meters.LLM_STREAMING_TIME_TO_FIRST_TOKEN,
                    unit="s",
                    description="Time to first token in streaming chat completions",
                ),
                "streaming_time_to_generate": meter.create_histogram(
                    name=Meters.LLM_STREAMING_TIME_TO_GENERATE,
                    unit="s",
                    description="Time between first token and completion in streaming chat completions",
                ),
                "embeddings_vector_size_counter": meter.create_counter(
                    name=Meters.LLM_EMBEDDINGS_VECTOR_SIZE,
                    unit="element",
                    description="The size of returned vector",
                ),
                "embeddings_exception_counter": meter.create_counter(
                    name=Meters.LLM_EMBEDDINGS_EXCEPTIONS,
                    unit="time",
                    description="Number of exceptions occurred during embeddings operation",
                ),
                "image_gen_exception_counter": meter.create_counter(
                    name=Meters.LLM_IMAGE_GENERATIONS_EXCEPTIONS,
                    unit="time",
                    description="Number of exceptions occurred during image generations operation",
                ),
            }
        )

        return metrics

    def _custom_unwrap(self, **kwargs):
        """Handle version-specific uninstrumentation."""
        if not is_openai_v1():
            OpenAIV0Instrumentor().uninstrument(**kwargs)

    def _get_wrapped_methods(self) -> list[WrapConfig]:
        """Get all methods that should be wrapped.

        Note: Chat completions and Responses API methods are NOT included here
        as they are wrapped directly in _custom_wrap to support streaming.
        """
        wrapped_methods = []

        # Regular completions
        wrapped_methods.extend(
            [
                WrapConfig(
                    trace_name="openai.completion",
                    package="openai.resources.completions",
                    class_name="Completions",
                    method_name="create",
                    handler=handle_completion_attributes,
                ),
                WrapConfig(
                    trace_name="openai.completion",
                    package="openai.resources.completions",
                    class_name="AsyncCompletions",
                    method_name="create",
                    handler=handle_completion_attributes,
                    is_async=True,
                ),
            ]
        )

        # Embeddings
        wrapped_methods.extend(
            [
                WrapConfig(
                    trace_name="openai.embeddings",
                    package="openai.resources.embeddings",
                    class_name="Embeddings",
                    method_name="create",
                    handler=handle_embeddings_attributes,
                ),
                WrapConfig(
                    trace_name="openai.embeddings",
                    package="openai.resources.embeddings",
                    class_name="AsyncEmbeddings",
                    method_name="create",
                    handler=handle_embeddings_attributes,
                    is_async=True,
                ),
            ]
        )

        # Image generation
        wrapped_methods.append(
            WrapConfig(
                trace_name="openai.images.generate",
                package="openai.resources.images",
                class_name="Images",
                method_name="generate",
                handler=handle_image_gen_attributes,
            )
        )

        # Beta APIs - these may not be available in all versions
        beta_methods = []

        # Assistants
        beta_methods.append(
            WrapConfig(
                trace_name="openai.assistants.create",
                package="openai.resources.beta.assistants",
                class_name="Assistants",
                method_name="create",
                handler=handle_assistant_attributes,
            )
        )

        # Runs
        beta_methods.extend(
            [
                WrapConfig(
                    trace_name="openai.runs.create",
                    package="openai.resources.beta.threads.runs",
                    class_name="Runs",
                    method_name="create",
                    handler=handle_run_attributes,
                ),
                WrapConfig(
                    trace_name="openai.runs.retrieve",
                    package="openai.resources.beta.threads.runs",
                    class_name="Runs",
                    method_name="retrieve",
                    handler=handle_run_retrieve_attributes,
                ),
                WrapConfig(
                    trace_name="openai.runs.create_and_stream",
                    package="openai.resources.beta.threads.runs",
                    class_name="Runs",
                    method_name="create_and_stream",
                    handler=handle_run_stream_attributes,
                ),
            ]
        )

        # Messages
        beta_methods.append(
            WrapConfig(
                trace_name="openai.messages.list",
                package="openai.resources.beta.threads.messages",
                class_name="Messages",
                method_name="list",
                handler=handle_messages_attributes,
            )
        )

        # Add beta methods to wrapped methods (they might fail)
        wrapped_methods.extend(beta_methods)

        return wrapped_methods

    def get_metrics_recorder(self) -> MetricsRecorder:
        """Get a metrics recorder for use in wrappers."""
        return MetricsRecorder(self._metrics)

```
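
The streaming wrappers imported by this instrumentor live in `stream_wrapper.py`, which is not shown here. To illustrate the general idea behind the `streaming_time_to_first_token` and `streaming_time_to_generate` histograms, the following sketch wraps any iterator of chunks and measures both durations. `timed_stream` and `fake_stream` are hypothetical names, and the real wrappers may well be structured differently.

```python
import time
from typing import Callable, Iterable, Iterator


def timed_stream(chunks: Iterable[str], on_done: Callable[[dict], None]) -> Iterator[str]:
    """Yield chunks unchanged while timing first-token and generation latency."""
    start = time.perf_counter()
    first_token_at = None
    count = 0
    try:
        for chunk in chunks:
            if first_token_at is None:
                first_token_at = time.perf_counter()
            count += 1
            yield chunk
    finally:
        # Runs when the stream is exhausted, closed early, or raises.
        end = time.perf_counter()
        on_done(
            {
                "chunks": count,
                "time_to_first_token": None if first_token_at is None else first_token_at - start,
                "time_to_generate": None if first_token_at is None else end - first_token_at,
            }
        )


def fake_stream():
    """Stand-in for a provider's streaming response."""
    for word in ("Once", " upon", " a", " time"):
        time.sleep(0.01)
        yield word


stats = {}
text = "".join(timed_stream(fake_stream(), stats.update))
print(text)
print(stats["chunks"])
```

Reporting from the `finally` block means the timings are delivered even if the consumer stops iterating early, which is a common pattern for streaming telemetry.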

## Examples

### examples/openai/openai_example_sync.py

```python
# OpenAI Sync Example
#
# We are going to create a simple chatbot that creates stories based on a prompt. The chatbot will use the gpt-4o-mini LLM to generate the story using a user prompt.
#
# We will track the chatbot with AgentOps and see how it performs!
# First, let's install the required dependencies:
# %pip install agentops
# %pip install openai
# %pip install python-dotenv
# Then import them
from openai import OpenAI
import agentops
import os
from dotenv import load_dotenv

# Next, we'll grab our API keys. You can use dotenv like below or however else you like to load environment variables
load_dotenv()
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here")
os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_api_key_here")

# Next we initialize the AgentOps client.
agentops.init(auto_start_session=True, trace_name="OpenAI Sync Example", tags=["openai", "sync", "agentops-example"])
tracer = agentops.start_trace(
    trace_name="OpenAI Sync Example", tags=["openai-sync-example", "openai", "agentops-example"]
)
client = OpenAI()

# And we are all set! Note the session URL above. We will use it to track the chatbot.
#
# Let's create a simple chatbot that generates stories.
system_prompt = """
You are a master storyteller, with the ability to create vivid and engaging stories.
You have experience in writing for children and adults alike.
You are given a prompt and you need to generate a story based on the prompt.
"""

user_prompt = "Write a very short story about a cyber-warrior trapped in the imperial time period."

messages = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": user_prompt},
]

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
)

print(response.choices[0].message.content)

# The response content is a string that contains the story. To inspect the call in AgentOps, open the trace URL and view the run.
# ## Streaming Version
# Next, we send the same request through the streaming version of the API.
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
    stream=True,
)

for chunk in stream:
    if chunk.choices and len(chunk.choices) > 0:
        print(chunk.choices[0].delta.content or "", end="")

agentops.end_trace(tracer, end_state="Success")

# Let's check programmatically that spans were recorded in AgentOps
print("\n" + "=" * 50)
print("Now let's verify that our LLM calls were tracked properly...")
try:
    result = agentops.validate_trace_spans(trace_context=tracer)
    agentops.print_validation_summary(result)
except agentops.ValidationError as e:
    print(f"\n❌ Error validating spans: {e}")
    raise

# Note that the streaming response above is a generator that yields chunks of the story. To inspect it in AgentOps, open the trace URL and view the run.
# All done!

```
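The streaming loop in the example above prints each chunk as it arrives. When the complete story is also needed as one string, the text deltas can be accumulated instead. The following is a minimal sketch and not part of the original example: `collect_stream` and `fake_chunk` are hypothetical helpers, and the `SimpleNamespace` objects only imitate the shape of streamed chunks so the snippet runs without an API key.

```python
from types import SimpleNamespace


def collect_stream(stream):
    """Join the text deltas of a chat-completions stream into one string."""
    parts = []
    for chunk in stream:
        # Some chunks carry no choices or an empty delta, so guard both cases.
        if chunk.choices and chunk.choices[0].delta.content:
            parts.append(chunk.choices[0].delta.content)
    return "".join(parts)


def fake_chunk(text):
    """Build a stand-in object shaped like a streamed chunk (illustration only)."""
    delta = SimpleNamespace(content=text)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


# Stand-in stream: two text chunks, one empty delta, and one chunk without choices.
demo_stream = [
    fake_chunk("Once "),
    fake_chunk(None),
    fake_chunk("upon a time"),
    SimpleNamespace(choices=[]),
]
story = collect_stream(demo_stream)
print(story)
```

In the real example, the `stream` object returned by `client.chat.completions.create(..., stream=True)` would be passed in place of `demo_stream`.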

### examples/crewai/job_posting.py

```python
# Crew Job Posting

# First let's install the required packages
# %pip install -U 'crewai[tools]'
# Then import them
from crewai import Crew, Agent, Task
from crewai_tools.tools import WebsiteSearchTool, SerperDevTool, FileReadTool
import agentops
import os
from dotenv import load_dotenv
from textwrap import dedent

# Next, we'll set our API keys. There are several ways to do this; the code below is simply the most foolproof way for the purposes of this notebook. It works both for users who rely on environment variables and for those who just want to set the API key here in the notebook.
# [Get an AgentOps API key](https://agentops.ai/settings/projects)
# 1. Create an environment variable in a .env file or other method. By default, the AgentOps `init()` function will look for an environment variable named `AGENTOPS_API_KEY`. Or...
# 2. Replace `your_api_key_here` below, or pass the key through the optional `api_key` parameter of the AgentOps `init(api_key=...)` function. Remember not to commit your API key to a public repo!
load_dotenv()
os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_api_key_here")
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here")
os.environ["SERPER_API_KEY"] = os.getenv("SERPER_API_KEY", "your_serper_api_key_here")

# Initialize AgentOps client
agentops.init(
    auto_start_session=False, trace_name="CrewAI Job Posting", tags=["crewai", "job-posting", "agentops-example"]
)

web_search_tool = WebsiteSearchTool()
serper_dev_tool = SerperDevTool()
file_read_tool = FileReadTool(
    file_path="job_description_example.md",
    description="A tool to read the job description example file.",
)


class Agents:
    def research_agent(self):
        return Agent(
            role="Research Analyst",
            goal="Analyze the company website and provided description to extract insights on culture, values, and specific needs.",
            tools=[web_search_tool, serper_dev_tool],
            backstory="Expert in analyzing company cultures and identifying key values and needs from various sources, including websites and brief descriptions.",
            verbose=True,
        )

    def writer_agent(self):
        return Agent(
            role="Job Description Writer",
            goal="Use insights from the Research Analyst to create a detailed, engaging, and enticing job posting.",
            tools=[web_search_tool, serper_dev_tool, file_read_tool],
            backstory="Skilled in crafting compelling job descriptions that resonate with the company's values and attract the right candidates.",
            verbose=True,
        )

    def review_agent(self):
        return Agent(
            role="Review and Editing Specialist",
            goal="Review the job posting for clarity, engagement, grammatical accuracy, and alignment with company values and refine it to ensure perfection.",
            tools=[web_search_tool, serper_dev_tool, file_read_tool],
            backstory="A meticulous editor with an eye for detail, ensuring every piece of content is clear, engaging, and grammatically perfect.",
            verbose=True,
        )


class Tasks:
    def research_company_culture_task(self, agent, company_description, company_domain):
        return Task(
            description=dedent(
                f"""\
								Analyze the provided company website and the hiring manager's company's domain {company_domain}, description: "{company_description}". Focus on understanding the company's culture, values, and mission. Identify unique selling points and specific projects or achievements highlighted on the site.
								Compile a report summarizing these insights, specifically how they can be leveraged in a job posting to attract the right candidates."""
            ),
            expected_output=dedent(
                """\
								A comprehensive report detailing the company's culture, values, and mission, along with specific selling points relevant to the job role. Suggestions on incorporating these insights into the job posting should be included."""
            ),
            agent=agent,
        )

    def research_role_requirements_task(self, agent, hiring_needs):
        return Task(
            description=dedent(
                f"""\
								Based on the hiring manager's needs: "{hiring_needs}", identify the key skills, experiences, and qualities the ideal candidate should possess for the role. Consider the company's current projects, its competitive landscape, and industry trends. Prepare a list of recommended job requirements and qualifications that align with the company's needs and values."""
            ),
            expected_output=dedent(
                """\
								A list of recommended skills, experiences, and qualities for the ideal candidate, aligned with the company's culture, ongoing projects, and the specific role's requirements."""
            ),
            agent=agent,
        )

    def draft_job_posting_task(self, agent, company_description, hiring_needs, specific_benefits):
        return Task(
            description=dedent(
                f"""\
								Draft a job posting for the role described by the hiring manager: "{hiring_needs}". Use the insights on "{company_description}" to start with a compelling introduction, followed by a detailed role description, responsibilities, and required skills and qualifications. Ensure the tone aligns with the company's culture and incorporate any unique benefits or opportunities offered by the company.
								Specific benefits: "{specific_benefits}"""
            ),
            expected_output=dedent(
                """\
								A detailed, engaging job posting that includes an introduction, role description, responsibilities, requirements, and unique company benefits. The tone should resonate with the company's culture and values, aimed at attracting the right candidates."""
            ),
            agent=agent,
        )

    def review_and_edit_job_posting_task(self, agent, hiring_needs):
        return Task(
            description=dedent(
                f"""\
								Review the draft job posting for the role: "{hiring_needs}". Check for clarity, engagement, grammatical accuracy, and alignment with the company's culture and values. Edit and refine the content, ensuring it speaks directly to the desired candidates and accurately reflects the role's unique benefits and opportunities. Provide feedback for any necessary revisions."""
            ),
            expected_output=dedent(
                """\
								A polished, error-free job posting that is clear, engaging, and perfectly aligned with the company's culture and values. Feedback on potential improvements and final approval for publishing. Formatted in markdown."""
            ),
            agent=agent,
            output_file="job_posting.md",
        )

    def industry_analysis_task(self, agent, company_domain, company_description):
        return Task(
            description=dedent(
                f"""\
								Conduct an in-depth analysis of the industry related to the company's domain: "{company_domain}". Investigate current trends, challenges, and opportunities within the industry, utilizing market reports, recent developments, and expert opinions. Assess how these factors could impact the role being hired for and the overall attractiveness of the position to potential candidates.
								Consider how the company's position within this industry and its response to these trends could be leveraged to attract top talent. Include in your report how the role contributes to addressing industry challenges or seizing opportunities."""
            ),
            expected_output=dedent(
                """\
								A detailed analysis report that identifies major industry trends, challenges, and opportunities relevant to the company's domain and the specific job role. This report should provide strategic insights on positioning the job role and the company as an attractive choice for potential candidates."""
            ),
            agent=agent,
        )


tracer = agentops.start_trace(trace_name="CrewAI Job Posting", tags=["crew-job-posting-example", "agentops-example"])
tasks = Tasks()
agents = Agents()
company_description = "We are a software company that builds AI-powered tools for businesses."
company_domain = "https://www.agentops.ai"
hiring_needs = "We are looking for a software engineer with 3 years of experience in Python and Django."
specific_benefits = "We offer a competitive salary, health insurance, and a 401k plan."

# Create Agents
researcher_agent = agents.research_agent()
writer_agent = agents.writer_agent()
review_agent = agents.review_agent()

# Define Tasks for each agent
research_company_culture_task = tasks.research_company_culture_task(
    researcher_agent, company_description, company_domain
)
industry_analysis_task = tasks.industry_analysis_task(researcher_agent, company_domain, company_description)
research_role_requirements_task = tasks.research_role_requirements_task(researcher_agent, hiring_needs)
draft_job_posting_task = tasks.draft_job_posting_task(
    writer_agent, company_description, hiring_needs, specific_benefits
)
review_and_edit_job_posting_task = tasks.review_and_edit_job_posting_task(review_agent, hiring_needs)

# Instantiate the crew with a sequential process
crew = Crew(
    agents=[researcher_agent, writer_agent, review_agent],
    tasks=[
        research_company_culture_task,
        industry_analysis_task,
        research_role_requirements_task,
        draft_job_posting_task,
        review_and_edit_job_posting_task,
    ],
)

result = crew.kickoff()
print("Job Posting Creation Process Completed.")
print("Final Job Posting:")
print(result)

agentops.end_trace(tracer, end_state="Success")

# Let's check programmatically that spans were recorded in AgentOps
print("\n" + "=" * 50)
print("Now let's verify that our LLM calls were tracked properly...")
try:
    agentops.validate_trace_spans(trace_context=tracer)
    print("\n✅ Success! All LLM spans were properly recorded in AgentOps.")
except agentops.ValidationError as e:
    print(f"\n❌ Error validating spans: {e}")
    raise

```
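The task descriptions in the example above wrap indented triple-quoted strings in `textwrap.dedent`. As a small illustration of what that does, the sketch below uses a shortened, made-up description (only `company_domain` is taken from the example) and shows that the shared leading whitespace is stripped before the text would reach a `Task`.

```python
from textwrap import dedent

company_domain = "https://www.agentops.ai"  # value taken from the example above

# The backslash after the opening quotes suppresses the initial newline,
# and dedent() removes the indentation common to every line.
description = dedent(
    f"""\
    Analyze the company website at {company_domain}.
    Compile a report summarizing the insights."""
)

print(description)
```

Because `dedent` removes only the whitespace prefix shared by every line, the lines of a description need consistent indentation for the prefix to be stripped completely.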

### examples/langchain/langchain_examples.py

```python
# AgentOps Langchain Agent Implementation
#
# Using AgentOps monitoring with Langchain is simple. We've created a LangchainCallbackHandler that will do all of the heavy lifting!
#
# First let's install the required packages
# %pip install langchain
# %pip install langchain_openai
# %pip install agentops
# %pip install python-dotenv
# Then import them
import os
from langchain_openai import ChatOpenAI
from langchain.agents import tool, AgentExecutor, create_openai_tools_agent
from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate

# The only difference with using AgentOps is that we'll also import this special Callback Handler
from agentops.integration.callbacks.langchain import (
    LangchainCallbackHandler as AgentOpsLangchainCallbackHandler,
)

# Next, we'll set our API keys. There are several ways to do this; the code below is simply the most foolproof way for the purposes of this notebook. It works both for users who rely on environment variables and for those who just want to set the API key here in the notebook.
#
# [Get an AgentOps API key](https://agentops.ai/settings/projects)
#
# 1. Create an environment variable in a .env file or other method. By default, the AgentOps `init()` function will look for an environment variable named `AGENTOPS_API_KEY`. Or...
#
# 2. Replace `your_api_key_here` below, or pass the key through the optional `api_key` parameter of the AgentOps `init(api_key=...)` function. Remember not to commit your API key to a public repo!
load_dotenv()
os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_api_key_here")
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here")

# This is where AgentOps comes into play. Before creating our LLM instance via Langchain, first we'll create an instance of the AO LangchainCallbackHandler. After the handler is initialized, a session will be recorded automatically.
#
# Pass in your API key (omitted here because it is set in the environment above) and, optionally, any tags that describe this session for easier lookup in the AO dashboard.
agentops_handler = AgentOpsLangchainCallbackHandler(tags=["Langchain Example", "agentops-example"])

llm = ChatOpenAI(callbacks=[agentops_handler], model="gpt-3.5-turbo")

# You must pass in a callback handler to record your agent
llm.callbacks = [agentops_handler]

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant. Respond only in Spanish."),
        ("human", "{input}"),
        # Placeholders fill up a **list** of messages
        ("placeholder", "{agent_scratchpad}"),
        # ("tool_names", "find_movie")
    ]
)


# Agents generally use tools. Let's define a simple tool here. Tool usage is also recorded.
@tool
def find_movie(genre: str) -> str:
    """Find available movies"""
    if genre == "drama":
        return "Dune 2"
    else:
        return "Pineapple Express"


tools = [find_movie]

# For each tool, you need to also add the callback handler
for t in tools:
    t.callbacks = [agentops_handler]

# Add the tools to our LLM
llm_with_tools = llm.bind_tools([find_movie])

# Finally, let's create our agent! Pass in the callback handler to the agent, and all the actions will be recorded in the AO Dashboard
agent = create_openai_tools_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools)
# The config key is "callbacks" (plural).
agent_executor.invoke({"input": "What comedies are playing?"}, config={"callbacks": [agentops_handler]})

# ## Check your session
# Finally, check your run on [AgentOps](https://app.agentops.ai). You will see a session recorded with the LLM calls and tool usage.

# Let's check programmatically that spans were recorded in AgentOps
print("\n" + "=" * 50)
print("Now let's verify that our LLM calls were tracked properly...")
try:
    import agentops

    agentops.validate_trace_spans(trace_context=None)
    print("\n✅ Success! All LLM spans were properly recorded in AgentOps.")
except ImportError:
    print("\n❌ Error: agentops library not installed. Please install it to validate spans.")
except agentops.ValidationError as e:
    print(f"\n❌ Error validating spans: {e}")
    raise

```

### examples/README.md

# AgentOps Examples

This directory contains comprehensive examples demonstrating how to integrate AgentOps with various AI/ML frameworks, libraries, and providers. Each example is provided as a Jupyter notebook and a Python script with detailed explanations and code samples.

## Directory Structure

- **[`ag2/`](https://github.com/AgentOps-AI/agentops/blob/main/ag2)** - Examples for AG2 (AutoGen 2.0) multi-agent conversations
  - `agentchat_with_memory` - Agent chat with persistent memory
  - `async_human_input` - Asynchronous human input handling
  - `tools_wikipedia_search` - Wikipedia search tool integration

- **[`anthropic/`](https://github.com/AgentOps-AI/agentops/blob/main/anthropic)** - Anthropic Claude API integration examples
  - `agentops-anthropic-understanding-tools` - Deep dive into tool usage
  - `anthropic-example-async` - Asynchronous API calls
  - `anthropic-example-sync` - Synchronous API calls
  - `antrophic-example-tool` - Tool calling examples
  - `README.md` - Detailed Anthropic integration guide

- **[`autogen/`](https://github.com/AgentOps-AI/agentops/blob/main/autogen)** - Microsoft AutoGen framework examples
  - `AgentChat` - Basic agent chat functionality
  - `MathAgent` - Mathematical problem-solving agent

- **[`crewai/`](https://github.com/AgentOps-AI/agentops/blob/main/crewai)** - CrewAI multi-agent framework examples
  - `job_posting` - Job posting automation workflow
  - `markdown_validator` - Markdown validation agent

- **[`gemini/`](https://github.com/AgentOps-AI/agentops/blob/main/gemini)** - Google Gemini API integration
  - `gemini_example` - Basic Gemini API usage with AgentOps

- **[`google_adk/`](https://github.com/AgentOps-AI/agentops/blob/main/google_adk)** - Google Agent Development Kit (ADK) examples
  - `human_approval` - Human-in-the-loop approval workflows

- **[`langchain/`](https://github.com/AgentOps-AI/agentops/blob/main/langchain)** - LangChain framework integration
  - `langchain_examples` - Comprehensive LangChain usage examples

- **[`litellm/`](https://github.com/AgentOps-AI/agentops/blob/main/litellm)** - LiteLLM proxy integration
  - `litellm_example` - Multi-provider LLM access through LiteLLM

- **[`openai/`](https://github.com/AgentOps-AI/agentops/blob/main/openai)** - OpenAI API integration examples
  - `multi_tool_orchestration` - Complex tool orchestration
  - `openai_example_async` - Asynchronous OpenAI API calls
  - `openai_example_sync` - Synchronous OpenAI API calls
  - `web_search` - Web search functionality

- **[`openai_agents/`](https://github.com/AgentOps-AI/agentops/blob/main/openai_agents)** - OpenAI Agents SDK examples
  - `agent_patterns` - Common agent design patterns
  - `agents_tools` - Agent tool integration
  - `customer_service_agent` - Customer service automation

- **[`smolagents/`](https://github.com/AgentOps-AI/agentops/blob/main/smolagents)** - SmolAgents framework examples
  - `multi_smolagents_system` - Multi-agent system coordination
  - `text_to_sql` - Natural language to SQL conversion

- **[`watsonx/`](https://github.com/AgentOps-AI/agentops/blob/main/watsonx)** - IBM Watsonx AI integration
  - `watsonx-streaming` - Streaming text generation
  - `watsonx-text-chat` - Text generation and chat completion
  - `watsonx-tokeniation-model` - Tokenization and model details
  - `README.md` - Detailed Watsonx integration guide

- **[`xai/`](https://github.com/AgentOps-AI/agentops/blob/main/xai)** - xAI (Grok) API integration
  - `grok_examples` - Basic Grok API usage
  - `grok_vision_examples` - Vision capabilities with Grok

### Utility Scripts

- **[`generate_documentation.py`](https://github.com/AgentOps-AI/agentops/blob/main/generate_documentation.py)** - Script to convert Jupyter notebooks to MDX documentation files
  - Converts notebooks from `examples/` to `docs/v2/examples/`
  - Handles frontmatter, GitHub links, and installation sections
  - Transforms `%pip install` commands to CodeGroup format

## Prerequisites

1. **AgentOps Account**: Sign up at [agentops.ai](https://agentops.ai)
2. **Python Environment**: Python 3.10+ recommended
3. **API Keys**: Obtain API keys for the services you want to use
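
Before running an example, the prerequisites above can be checked with a few lines of Python. This is a minimal sketch, not part of the repository: `check_prerequisites` is a hypothetical helper, and the environment variable names are the ones used by the example scripts (`AGENTOPS_API_KEY`, `OPENAI_API_KEY`); other providers need their own keys.

```python
import os
import sys


def check_prerequisites(env=os.environ, version_info=sys.version_info, required_keys=("AGENTOPS_API_KEY",)):
    """Return a list of human-readable problems; an empty list means ready to run."""
    problems = []
    # Compare only (major, minor) against the recommended minimum version.
    if tuple(version_info[:2]) < (3, 10):
        problems.append(f"Python 3.10+ recommended, found {version_info[0]}.{version_info[1]}")
    for key in required_keys:
        # Treat unset and empty variables the same way.
        if not env.get(key):
            problems.append(f"Missing environment variable: {key}")
    return problems


for problem in check_prerequisites(required_keys=("AGENTOPS_API_KEY", "OPENAI_API_KEY")):
    print(problem)
```

The `env` and `version_info` parameters exist only so the check can be exercised with stand-in values; called without arguments, the helper inspects the running interpreter and process environment.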

## Documentation Generation

The `generate_documentation.py` script automatically converts these Jupyter notebook examples into documentation for the AgentOps website. It:

- Extracts notebook content and converts to Markdown
- Adds proper frontmatter and metadata
- Transforms installation commands into user-friendly format
- Generates GitHub links for source notebooks
- Creates MDX files in `docs/v2/examples/`
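
To make the conversion steps above more concrete, here is a rough sketch of the core idea: read a notebook's JSON, emit frontmatter, and render markdown and code cells in order. It is an illustration only and does not reproduce the actual `generate_documentation.py`; the function name `notebook_to_markdown` and the single `title` frontmatter field are assumptions.

```python
import json

FENCE = "`" * 3  # built at runtime so this snippet can itself sit inside a code fence


def notebook_to_markdown(notebook_json, title):
    """Render a notebook's cells as Markdown preceded by YAML frontmatter."""
    notebook = json.loads(notebook_json)
    lines = ["---", f"title: {title}", "---", ""]
    for cell in notebook.get("cells", []):
        # The notebook format stores source as a string or a list of strings.
        source = cell.get("source", "")
        text = "".join(source) if isinstance(source, list) else source
        if cell.get("cell_type") == "markdown":
            lines.extend([text, ""])
        elif cell.get("cell_type") == "code":
            lines.extend([FENCE + "python", text, FENCE, ""])
    return "\n".join(lines)


# Tiny in-memory notebook so the sketch runs without any files on disk.
demo = json.dumps(
    {
        "cells": [
            {"cell_type": "markdown", "source": ["# Demo"]},
            {"cell_type": "code", "source": ["print('hi')"]},
        ]
    }
)
mdx = notebook_to_markdown(demo, "Demo")
print(mdx)
```

The real script additionally handles GitHub links and installation sections, which this sketch leaves out.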

### Usage
```bash
python examples/generate_documentation.py examples/langchain/langchain_examples.ipynb
```

## Contributing

When adding new examples:

1. Create a new subdirectory for the framework/provider
2. Include comprehensive Jupyter notebooks with explanations
3. Add a README.md if the integration is complex
4. Ensure examples are self-contained and runnable
5. Follow the existing naming conventions
6. Use the `generate_documentation.py` script to create documentation files
7. Add the example notebook to the main `README.md` for visibility
8. Add the generated documentation to the `docs/v2/examples/` directory for website visibility
9. Submit a pull request with a clear description of your changes

## Additional Resources

- [AgentOps Documentation](https://docs.agentops.ai)
- [AgentOps Dashboard](https://app.agentops.ai)
- [GitHub Repository](https://github.com/AgentOps-AI/agentops)
- [Community Discord](https://discord.gg/agentops)

## License

These examples are provided under the same license as the AgentOps project. See the main repository for license details.


