Metadata-Version: 2.4
Name: vectorshift
Version: 0.1.7
Summary: VectorShift Python SDK
Author: Alex Leonardi, Pratham Goyal, Eric Shen
Author-email: support@vectorshift.ai
Classifier: Programming Language :: Python :: 3
Description-Content-Type: text/markdown
Requires-Dist: networkx==3.1
Requires-Dist: tomli>=2.0.1
Requires-Dist: pytest>=7.0.0
Requires-Dist: pytest-asyncio>=0.23.0
Requires-Dist: bson>=0.5.10
Requires-Dist: black>=23.0.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: aiohttp>=3.8.0
Requires-Dist: requests>=2.32.5
Requires-Dist: python-dotenv>=0.9.9
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: requires-dist
Dynamic: summary

# VectorShift SDK
Python SDK (in development) for creating and interacting with VectorShift pipelines.

## Documentation

The VectorShift SDK provides a Python interface for creating, managing, and executing AI pipelines on the VectorShift platform.

For comprehensive API documentation, visit: [https://docs.vectorshift.ai/api-reference/overview](https://docs.vectorshift.ai/api-reference/overview)

## Installation

```bash
pip install vectorshift
```


## Usage

### Authentication
Set the API key in code:
```python
import vectorshift

vectorshift.api_key = 'sk-****'
```
or set it as an environment variable:
```bash
export VECTORSHIFT_API_KEY='sk-***'
```
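If you rely on the environment variable, it helps to fail fast with a clear message when it is missing. The helper below is a hypothetical convenience (`require_api_key` is not part of the SDK). It only reads `VECTORSHIFT_API_KEY` with the standard library, and you assign its result to `vectorshift.api_key` yourself.

```python
import os


def require_api_key(env_var: str = "VECTORSHIFT_API_KEY") -> str:
    """Return the API key from the environment, or raise if it is unset or blank.

    Hypothetical helper, not part of the vectorshift package.
    """
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(
            f"{env_var} is not set; export it or assign vectorshift.api_key in code."
        )
    return key


# Usage (requires the SDK to be installed):
# import vectorshift
# vectorshift.api_key = require_api_key()
```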


Create a new pipeline using the pipeline builder API
```python
import vectorshift
from vectorshift.pipeline import Pipeline

vectorshift.api_key = "your api key here"

pipeline = Pipeline.new(name="basic-llm-pipeline")

inp = pipeline.add(name="input_0").input(input_type="string")

llm = pipeline.add(name="llm_node").llm(
    system="You are a helpful assistant.",
    prompt=inp.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

out = pipeline.add(name="output_0").output(value=llm.response)

pipeline.save()
```

The builder provides IDE autocomplete for all 200+ node types. You can also instantiate nodes directly and pass them to `Pipeline.new`:
```python
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode

input_node = InputNode(node_name="input_0")

llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response
)

pipeline = Pipeline.new(
    name="basic-llm-pipeline",
    nodes=[input_node, llm_node, output_node]
)
```

Basic RAG Pipeline

```python
import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, KnowledgeBaseNode, OutputNode, LlmNode
from vectorshift import KnowledgeBase

# Set API key
vectorshift.api_key = "your api key here"

# Create input node for user query
input_node = InputNode(
    node_name="Query",
)

# Fetch knowledge base
knowledge_base = KnowledgeBase.fetch(name="your knowledge base name here")

# Create knowledge base node to retrieve relevant documents
knowledge_base_node = KnowledgeBaseNode(
    query=input_node.text,
    knowledge_base=knowledge_base,
    format_context_for_llm=True,
)

# Create LLM node that uses both the query and retrieved documents
llm_node = LlmNode(
    system="You are a helpful assistant that answers questions based on the provided context documents.",
    prompt=f"Query: {input_node.text}\n\nContext: {knowledge_base_node.formatted_text}",
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

# Create output node for the LLM response
output_node = OutputNode(
    node_name="Response",
    value=llm_node.response
)

# Create the RAG pipeline
rag_pipeline = Pipeline.new(
    name="rag-pipeline",
    nodes=[input_node, knowledge_base_node, llm_node, output_node],
)
```


### Batch Mode (List Processing)
Use `pipeline.add_batch` to run a node in batch mode: the node takes a list as input and processes its items in parallel.

```python
from vectorshift.pipeline import Pipeline

pipeline = Pipeline.new(name="batch-llm-pipeline")

inp = pipeline.add(name="input_0").input(input_type="vec<string>")

llm = pipeline.add_batch(name="batch_llm").llm(
    provider="openai",
    model="gpt-4o-mini",
    prompt=inp.value,
    system="You are concise. Reply in one sentence.",
    stream=False
)

pipeline.save()

result = pipeline.run(inputs={
    "input_0": [
        "Say hello to New York",
        "Say hello to Tokyo",
        "Say hello to Lagos",
    ]
})
```

You can also set batch mode explicitly on any node:
```python
from vectorshift.pipeline import InputNode, PipelineNode, SplitTextNode, OutputNode, Pipeline

sub_pipeline = Pipeline.fetch(name="your sub pipeline")

input_node = InputNode(node_name="input_0")

split_text_node = SplitTextNode(
    node_name="split_text_node",
    text=input_node.text,
    delimiter="newline"
)

pipeline_node = PipelineNode(
    pipeline_id=sub_pipeline.id,
    node_name="sub_pipeline",
    input_0=split_text_node.processed_text,
    execution_mode="batch"
)

output_node = OutputNode(node_name="output_0", value=pipeline_node.output_0)

main_pipeline = Pipeline.new(
    name="batched-pipeline",
    nodes=[input_node, split_text_node, pipeline_node, output_node]
)
```

### Streaming

```python
import json

import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode

# Set API key
vectorshift.api_key = 'your api key here'

# Create input node
input_node = InputNode(node_name="input_0")

# Create LLM node that will stream responses
llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7,
    stream=True  # Enable streaming
)

# Create output node connected to LLM response
output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response,
    output_type="stream<string>"
)

# Create and save the pipeline
pipeline = Pipeline.new(
    name="streaming-llm-pipeline-1",
    nodes=[input_node, llm_node, output_node]
)

# Run pipeline with streaming enabled
input_data = {"input_0": "Tell me a story about a brave adventurer"}

# Stream the response chunks
for chunk in pipeline.run(input_data, stream=True):
    try:
        # Parse the chunk as a JSON line
        chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
        if chunk_str.startswith('data: '):
            json_str = chunk_str[6:]  # Remove 'data: ' prefix
            data = json.loads(json_str)
            if data.get('output_name') == 'output_0':
                print(data.get('output_value', ''), end="", flush=True)
    except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
        # If parsing fails, just continue to next chunk
        continue
```
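The parsing logic in the loop above can be pulled into a reusable generator. This is a sketch that assumes each chunk is a single `data: {json}` line carrying `output_name` and `output_value` keys, as the example implies; `iter_stream_text` is a hypothetical name, not an SDK API.

```python
import json
from typing import Iterable, Iterator, Union


def iter_stream_text(
    chunks: Iterable[Union[bytes, str]], output_name: str = "output_0"
) -> Iterator[str]:
    """Yield the text fragments streamed for one pipeline output.

    Assumes each chunk is a server-sent-events style line: 'data: {...json...}'.
    Undecodable bytes, non-data lines and invalid JSON are skipped.
    """
    for chunk in chunks:
        try:
            line = chunk.decode("utf-8") if isinstance(chunk, bytes) else str(chunk)
        except UnicodeDecodeError:
            continue
        line = line.strip()
        if not line.startswith("data: "):
            continue
        try:
            data = json.loads(line[len("data: "):])
        except json.JSONDecodeError:
            continue
        # Only forward fragments that belong to the requested output
        if isinstance(data, dict) and data.get("output_name") == output_name:
            yield str(data.get("output_value", ""))


# Usage with the pipeline above (requires the SDK and an API key):
# for text in iter_stream_text(pipeline.run(input_data, stream=True)):
#     print(text, end="", flush=True)
```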

### Async Usage

The SDK's async methods share the names of their synchronous counterparts, prefixed with `a` (for example, `pipeline.run` becomes `pipeline.arun`). Here we fetch a pipeline by name, run it with an input, and await the result.
```python
import asyncio
import vectorshift

from vectorshift.pipeline import Pipeline

vectorshift.api_key = "your api key here"

pipeline = Pipeline.fetch(name="your pipeline name here")

input_data = {"input_0": "Hello, how are you?"}

result = asyncio.run(pipeline.arun(input_data))
print(result)
```
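The async methods pay off when you have many inputs to run. A minimal sketch, assuming `pipeline.arun` accepts an input dict as in the example above: `run_all` (a hypothetical helper) runs every input concurrently while a semaphore caps how many calls are in flight, and the default cap of 8 is an arbitrary example value. It accepts any async callable, so you can try it with a stand-in coroutine before pointing it at `pipeline.arun`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_all(
    arun: Callable[[Dict[str, Any]], Awaitable[Any]],
    inputs: List[Dict[str, Any]],
    max_concurrent: int = 8,
) -> List[Any]:
    """Await arun(inp) for every input, with at most max_concurrent running at once.

    Results are returned in the same order as `inputs`.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(inp: Dict[str, Any]) -> Any:
        # Wait for a free slot before starting the call
        async with semaphore:
            return await arun(inp)

    return await asyncio.gather(*(run_one(inp) for inp in inputs))


# Usage with a real pipeline (requires the SDK and an API key):
# questions = ["What is RAG?", "Summarize our refund policy."]
# results = asyncio.run(run_all(pipeline.arun, [{"input_0": q} for q in questions]))
```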

### Parallel Knowledge Base Upload
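The async methods can also parallelize a bulk upload of every file in a directory to a knowledge base. The script below takes a knowledge base (vector store) name and a local directory, and uploads the files concurrently with at most `--max_concurrent` uploads in flight. It also needs `tqdm` for the progress bar (`pip install tqdm`), which is not a dependency of the SDK.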

```python
import asyncio
import os
import argparse
import vectorshift
from vectorshift.knowledge_base import KnowledgeBase, IndexingConfig
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()

def upload_documents(vectorstore_name, upload_dir, max_concurrent=16):
    vectorshift.api_key = 'your api key here'
    vectorstore = KnowledgeBase.fetch(name=vectorstore_name)

    num_files = sum([len(files) for r, d, files in os.walk(upload_dir)])
    print(f'Number of files in the upload directory: {num_files}')
    
    async def upload_document(semaphore, script_path, document_title, dirpath):
        async with semaphore:
            try:
                # Create indexing configuration
                indexing_config = IndexingConfig(
                    chunk_size=512,
                    chunk_overlap=0,
                    file_processing_implementation='Default',
                    index_tables=False,
                    analyze_documents=False
                )
                response = await vectorstore.aindex_document(
                    document_type='file',
                    document=script_path,
                    indexing_config=indexing_config
                )
                return f"Response for {document_title} in directory {dirpath}: {response}"
            except Exception as e:
                return f"Response for {document_title} in directory {dirpath}: Failed due to {e}"

    async def upload_all_documents():
        # Create semaphore to limit concurrent uploads
        semaphore = asyncio.Semaphore(max_concurrent)
        
        all_files = []
        for dirpath, dirnames, filenames in os.walk(upload_dir):
            for script_file in filenames:
                script_path = os.path.join(dirpath, script_file)
                document_title = os.path.basename(script_path)
                all_files.append((script_path, document_title, dirpath))
        
        # Create tasks for all files
        tasks = []
        for script_path, document_title, dirpath in all_files:
            task = upload_document(semaphore, script_path, document_title, dirpath)
            tasks.append(task)
        
        # Process all tasks with progress bar
        with tqdm(total=len(all_files), desc="Uploading documents") as pbar:
            for coro in asyncio.as_completed(tasks):
                result = await coro
                if "Failed due to" in result:
                    print(f"Error: {result}")
                else:
                    print(result)
                pbar.update(1)
    
    asyncio.run(upload_all_documents())

if __name__ == "__main__":
    # Setup command line argument parsing
    parser = argparse.ArgumentParser(description='Upload documents to a VectorStore.')
    parser.add_argument('--vectorstore_name', type=str, required=True, help='Name of the VectorStore to upload documents to.')
    parser.add_argument('--upload_dir', type=str, required=True, help='Directory path of documents to upload.')
    parser.add_argument('--max_concurrent', type=int, default=16, help='Maximum number of concurrent uploads.')
    args = parser.parse_args()

    upload_documents(args.vectorstore_name, args.upload_dir, args.max_concurrent)
```

## Pipeline Management

### Fetch by Name or ID
```python
from vectorshift.pipeline import Pipeline

# Fetch by name
pipeline = Pipeline.fetch(name="My Pipeline")

# Fetch by ID
pipeline = Pipeline.fetch(id="pipeline-id-123")

# Fetch a specific version or branch
pipeline = Pipeline.fetch(id="pipeline-id-123", version="1.2.0")
pipeline = Pipeline.fetch(id="pipeline-id-123", branch_id="branch-id")

# Fetch from another user/org
pipeline = Pipeline.fetch(name="Shared Pipeline", username="other_user", org_name="my-org")
```

### List Pipelines
```python
pipelines = Pipeline.list()

# With filtering and pagination
pipelines = Pipeline.list(folder_id="folder-123", include_shared=True, offset=0, limit=20)
```
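To walk every pipeline rather than a single page, you can keep advancing `offset` until a page comes back short. This sketch assumes `Pipeline.list(offset=..., limit=...)` returns a list-like page, which the snippet above suggests but does not state. `iter_all` is a hypothetical helper that takes any page-fetching function, so it can be tried without the SDK.

```python
from typing import Callable, Iterator, Sequence, TypeVar

T = TypeVar("T")


def iter_all(
    fetch_page: Callable[[int, int], Sequence[T]], page_size: int = 20
) -> Iterator[T]:
    """Yield items page by page until a page has fewer than page_size items."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        yield from page
        # A short (or empty) page means there is nothing left to fetch
        if len(page) < page_size:
            break
        offset += page_size


# Usage (assumes Pipeline.list returns a list):
# for p in iter_all(lambda offset, limit: Pipeline.list(offset=offset, limit=limit)):
#     print(p)
```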

### Duplicate a Pipeline
```python
new_pipeline = existing_pipeline.duplicate(name="My Pipeline Copy")
```

## Version Control
`pipeline.save()` handles version control: it can bump the pipeline's semantic version and controls whether the saved version is deployed.

```python
from vectorshift.pipeline import Pipeline, BumpLevel

pipeline = Pipeline.fetch(name="basic-llm-pipeline")

# Save and deploy (deploy=True is the default)
pipeline.save()

# Save with a version bump
pipeline.save(bump=BumpLevel.MINOR, description="Updated model to gpt-4o")

# Save without deploying
pipeline.save(deploy=False)
```

`BumpLevel` supports `PATCH`, `MINOR`, and `MAJOR` semantic version bumps.
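Under the usual MAJOR.MINOR.PATCH rules, bumping one component resets every component to its right. The function below only illustrates that arithmetic, assuming the platform follows standard semantic versioning; it is not how the SDK computes versions, and `bump_version` is hypothetical.

```python
def bump_version(version: str, level: str) -> str:
    """Return `version` bumped at `level` ('patch', 'minor' or 'major')."""
    major, minor, patch = (int(part) for part in version.split("."))
    level = level.lower()
    if level == "major":
        return f"{major + 1}.0.0"  # resets minor and patch
    if level == "minor":
        return f"{major}.{minor + 1}.0"  # resets patch
    if level == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown bump level: {level!r}")


print(bump_version("1.2.0", "minor"))  # 1.3.0
print(bump_version("1.2.3", "major"))  # 2.0.0
```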

To update a pipeline's nodes, modify them in code and save:
```python
from vectorshift.pipeline import Pipeline, BumpLevel

pipeline = Pipeline.fetch(name="basic-llm-pipeline")
pipeline.nodes.clear()

inp = pipeline.add(name="input_0").input(input_type="string")

llm = pipeline.add(name="llm_node").llm(
    system="You are a helpful assistant.",
    prompt=inp.text,
    provider="openai",
    model="gpt-4o",
    temperature=0.7
)

out = pipeline.add(name="output_0").output(value=llm.response)

pipeline.save(bump=BumpLevel.MINOR, description="Upgraded model to gpt-4o")
```

## Conditional Logic

Use `ConditionNode` to add branching logic to your pipelines.

```python
from vectorshift.pipeline import (
    Pipeline, InputNode, OutputNode, TextNode, MergeNode,
    ConditionNode, Operator, LogicalOp, Clause, ConditionGroup,
)

score_input = InputNode(node_name="score", input_type="int32")

condition = ConditionNode(
    node_name="grade_check",
    conditions=[
        ConditionGroup(
            [Clause(Operator.GREATER_THAN_EQUAL, score_input.value, 90)],
        ),
        ConditionGroup(
            [Clause(Operator.GREATER_THAN_EQUAL, score_input.value, 50)],
        ),
    ],
)

# Each condition group creates a path output (path_0, path_1, ..., path_else)
msg_excellent = TextNode(node_name="msg_a", text="Excellent!", dependencies=[condition.path_0])
msg_passing = TextNode(node_name="msg_b", text="You passed.", dependencies=[condition.path_1])
msg_fail = TextNode(node_name="msg_c", text="Did not pass.", dependencies=[condition.path_else])

merge = MergeNode(
    node_name="merge",
    function="first",
    type="string",
    fields=[msg_excellent.text, msg_passing.text, msg_fail.text],
)

output = OutputNode(node_name="result", output_type="string", value=merge.output)

pipeline = Pipeline.new(
    name="conditional-pipeline",
    nodes=[score_input, condition, msg_excellent, msg_passing, msg_fail, merge, output],
)
```

Conditions support 20+ operators including comparisons (`EQUAL`, `GREATER_THAN`, etc.), text checks (`TEXT_CONTAINS`, `TEXT_STARTS_WITH`, etc.), and unary checks (`IS_EMPTY`, `IS_TRUE`, etc.). Multiple clauses can be combined with `LogicalOp.AND` / `LogicalOp.OR`.
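As a mental model, the grading pipeline above behaves like an `if`/`elif`/`else` chain, assuming condition groups are checked top to bottom and only the first matching group's path runs. This is a plain-Python reading aid, not the node's implementation; `grade` is hypothetical.

```python
def grade(score: int) -> str:
    """Mirror of the grade_check pipeline: path_0, then path_1, then path_else."""
    if score >= 90:  # ConditionGroup 0 -> path_0 -> msg_a
        return "Excellent!"
    if score >= 50:  # ConditionGroup 1 -> path_1 -> msg_b
        return "You passed."
    return "Did not pass."  # no group matched -> path_else -> msg_c


for s in (95, 70, 10):
    print(s, grade(s))
```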

## Chatbots

Run a chatbot from your terminal. Because the first call passes `conversation_id=None`, the chatbot starts a new conversation. Each response includes a `conversation_id`; passing it to the next `chatbot.run` call continues the same conversation, so the chatbot sees the previous responses. Type `quit` to exit.
```python
from vectorshift import Chatbot

chatbot = Chatbot.fetch(name='your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    response = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id)
    conversation_id = response['conversation_id']
    print(response['output_message'])
```

Streaming Chatbot


```python
import json

from vectorshift import Chatbot

chatbot = Chatbot.fetch(name='your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    response_stream = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id, stream=True)
    for chunk in response_stream:
        try:
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
            if chunk_str.startswith('data: '):
                json_str = chunk_str[6:]  # Remove 'data: ' prefix
                data = json.loads(json_str)
                # Keep the latest conversation_id so the next turn continues this conversation
                if data.get('conversation_id'):
                    conversation_id = data.get('conversation_id')
                if data.get('output_value') and data.get('type') == 'stream':
                    print(data.get('output_value', ''), end="", flush=True)
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            continue
    print()  # Add newline after streaming is complete
```
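If you want the full reply as a string (for logging, say) as well as live output, the streaming loop can be wrapped in a function that returns both the text and the latest `conversation_id`. This is a sketch assuming the same `data: {json}` chunk format and the `conversation_id`, `type` and `output_value` fields used above; `collect_chat_stream` is a hypothetical name.

```python
import json
from typing import Iterable, List, Optional, Tuple, Union


def collect_chat_stream(
    chunks: Iterable[Union[bytes, str]],
    conversation_id: Optional[str] = None,
    echo: bool = True,
) -> Tuple[str, Optional[str]]:
    """Consume a chatbot stream and return (full_reply, conversation_id).

    The conversation_id passed in is kept unless the stream reports a new one.
    """
    parts: List[str] = []
    for chunk in chunks:
        try:
            line = chunk.decode("utf-8") if isinstance(chunk, bytes) else str(chunk)
            if not line.startswith("data: "):
                continue
            data = json.loads(line[len("data: "):])
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue
        if not isinstance(data, dict):
            continue
        if data.get("conversation_id"):
            conversation_id = data["conversation_id"]
        if data.get("output_value") and data.get("type") == "stream":
            parts.append(str(data["output_value"]))
            if echo:
                print(parts[-1], end="", flush=True)
    if echo:
        print()  # newline after the streamed reply
    return "".join(parts), conversation_id


# Usage inside the chat loop:
# stream = chatbot.run(input=user_input, input_type="text",
#                      conversation_id=conversation_id, stream=True)
# reply, conversation_id = collect_chat_stream(stream, conversation_id)
```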

Chatbot File Upload

```python
from vectorshift import Chatbot
import os
import json
chatbot = Chatbot.fetch(name='your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    
    # Handle file upload
    if user_input.startswith("add_file "):
        file_path = user_input[9:]  # Remove "add_file " prefix
        if os.path.isfile(file_path):
            try:
                upload_response = chatbot.upload_files(file_paths=[file_path], conversation_id=conversation_id)
                conversation_id = upload_response.get('conversation_id', conversation_id)
                print(f"File uploaded successfully: {upload_response.get('uploaded_files', [])}")
            except Exception as e:
                print(f"Error uploading file: {e}")
        else:
            print(f"File not found: {file_path}")
        continue
    
    # Handle text input with streaming
    response_stream = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id, stream=True)
    
    for chunk in response_stream:
        try:
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
            if not chunk_str.startswith('data: '):
                continue
                
            data = json.loads(chunk_str[6:])  # Remove 'data: ' prefix
            
            # Update conversation_id if present
            if data.get('conversation_id'):
                conversation_id = data.get('conversation_id')
            
            # Print streaming output
            if data.get('output_value') and data.get('type') == 'stream':
                print(data.get('output_value', ''), end="", flush=True)
                
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            continue
    
    print()  # Add newline after streaming
```

## Integrations

### Managing Integrations

The `Integration` class provides methods to list, fetch, check status, and delete your integrations programmatically.

#### List all integrations
```python
from vectorshift.integrations import Integration

# List all integrations
integrations = Integration.list()
for integration in integrations:
    print(f"{integration.name} ({integration.type}) - {integration.status}")

# Filter by type
slack_integrations = Integration.list(type="slack")
```

#### Fetch a specific integration
```python
# Fetch by ID
integration = Integration.fetch(id="your_integration_id")
print(integration.name, integration.type, integration.status)
```

#### Check integration status
```python
integration = Integration.fetch(id="your_integration_id")
status = integration.get_status()
print(f"Status: {status}")
```

#### Async support
All methods have async variants: `alist()`, `afetch()`, `aget_status()`.
```python
import asyncio
from vectorshift.integrations import Integration

async def main():
    integrations = await Integration.alist()
    if integrations:
        integration = await Integration.afetch(id=integrations[0].id)
        status = await integration.aget_status()
        print(f"{integration.name}: {status}")

asyncio.run(main())
```

#### Integration properties
| Property | Description |
|---|---|
| `id` | Unique integration identifier |
| `name` | Display name (account name) |
| `type` | Integration type (e.g., `slack`, `gmail`, `google_drive`) |
| `status` | Current status |
| `created_date` | When the integration was created |
| `authorized_scopes` | List of authorized OAuth scopes |
| `allowed_actions` | List of permitted actions |
| `unhealthy` | Whether the integration is in an unhealthy state |

#### Using an integration in a pipeline
Integration nodes take an integration, passed as a dictionary via `to_dict()`. You can fetch an existing integration or construct one directly from its ID.

```python
from vectorshift.pipeline import IntegrationGmailNode, InputNode, Pipeline
from vectorshift.integrations import Integration

input_node = InputNode(node_name="input_0", description='Gmail Message to Send')

# Fetch integration to use in a pipeline
integration = Integration.fetch(id='your_integration_id')
# Or create directly with the id
# integration = Integration(id='your_integration_id')
gmail_node = IntegrationGmailNode(
    integration=integration.to_dict(),
    node_name="gmail_node",
    action="send_email",
    recipients="recipient@gmail.com",
    subject="Test Email from Pipeline",
    body=input_node.text,
    format="text"
)

gmail_pipeline = Pipeline.new(
    name="gmail-pipeline",
    nodes=[input_node, gmail_node]
)
```

To use the Slack node, specify the channel ID and the team (workspace) ID, both of which you can find in the Slack app.
```python
from vectorshift.pipeline import IntegrationSlackNode, InputNode, Pipeline
from vectorshift.integrations import Integration
input_node = InputNode(node_name="input_0", description='Slack Message to Send')

slack_node = IntegrationSlackNode(
    node_name="slack_node",
    integration=Integration(id='your_integration_id').to_dict(),
    action='send_message',
    channel='your_channel_id',
    message=input_node.text,
    team='your_team_id'
)

slack_pipeline = Pipeline.new(
    name='slack-pipeline',
    nodes=[input_node, slack_node]
)
```
