Metadata-Version: 2.4
Name: ai-datasource-lib
Version: 0.2.11
Summary: Data source sync strategies for vector DBs
Home-page: https://github.com/Hacker062008/ai-datasource-lib
Author: Akash Kumar Maurya
Author-email: mrelectronicsarduino@gmail.com
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Requires-Python: >=3.7
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: fastapi>=0.68.0
Requires-Dist: pydantic>=1.10.0
Requires-Dist: uvicorn>=0.17.0
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: license-file
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# datasourcelib

**Version**: 0.2.11  
**Last Updated**: May 2026

## Overview

`datasourcelib` is an enterprise-grade Python library for unified data extraction and synchronization across heterogeneous sources. It provides a pluggable architecture for connecting to multiple data systems, executing configurable sync strategies, and indexing results into Azure Cognitive Search with vector support.

**Key Use Cases:**
- Unified data synchronization from SQL Server, Azure DevOps, SharePoint, and other sources
- Batch indexing of structured and unstructured data to Azure Cognitive Search
- Scheduled and on-demand data refresh operations
- Vector-enabled RAG (Retrieval Augmented Generation) pipelines

---

## Table of Contents

1. [Getting Started](#getting-started)
2. [Architecture](#architecture)
3. [Core Concepts](#core-concepts)
4. [API Reference](#api-reference)
5. [Data Source Configuration](#data-source-configuration)
6. [Error Handling](#error-handling)
7. [Best Practices](#best-practices)
8. [Troubleshooting](#troubleshooting)
9. [Extension Guide](#extension-guide)
10. [Performance Characteristics](#performance-characteristics)
11. [Support and Contribution](#support-and-contribution)

---

## Getting Started

### Prerequisites

- Python 3.8 or higher
- Virtual environment manager (`venv` or `conda`)
- Network access to configured data sources
- For Azure Search: Azure Cognitive Search service with API key
- For vector embeddings: Azure OpenAI deployment with embedding model

### Installation

#### 1. Create and activate Python virtual environment

```bash
# Using venv
python -m venv .venv

# Activate (Windows)
.venv\Scripts\activate

# Activate (macOS/Linux)
source .venv/bin/activate
```

#### 2. Install dependencies

```bash
pip install -r requirements.txt
```

Core dependencies:
- `pydantic>=1.10.0` — data validation
- `pyodbc>=4.0.0` — SQL Server connectivity
- `azure-search-documents>=11.5.0` — Azure Cognitive Search
- `azure-storage-blob>=12.12.0` — Azure Blob Storage
- `azure-identity>=1.12.0` — Azure authentication
- `openai>=0.27.0` — Azure OpenAI embeddings
- `requests>=2.26.0` — HTTP client
- `pandas>=1.3.0` — data manipulation
- `fastapi>=0.68.0`, `uvicorn>=0.17.0` — API server (optional)

#### 3. Install the package in editable mode

```bash
pip install -e .
```

### Quick Start

#### Python API

```python
from datasourcelib.core.sync_manager import SyncManager

manager = SyncManager()

# Execute a full sync from SQL Server to Azure Search
result = manager.execute_sync(
    sync_type="full",
    source_type="sql",
    source_config={
        "sql_server": "my-sql-server.database.windows.net",
        "sql_database": "my_database",
        "sql_username": "user@domain",
        "sql_password": "password",
        "sql_query": "SELECT id, name, description FROM dbo.documents",
        "sql_is_onprem": False,
        "sql_windows_auth": False,
    },
    vector_db_config={
        "aisearch_endpoint": "https://my-search.search.windows.net",
        "aisearch_api_key": "key",
        "aisearch_index_name": "my-index",
        "vectorization": True,
        "embedding_endpoint": "https://my-oai.openai.azure.com/",
        "embedding_key": "oai-key",
        "embedding_deployment": "text-embedding-ada-002",
        "embedding_api_version": "2023-05-15",
        "vector_config": {"dimensions": 1536},
    }
)

print(f"Status: {result['status']}")
print(f"Loaded: {result.get('loaded_records', 0)} records")
print(f"Message: {result['message']}")
```

#### FastAPI Server

```bash
# Start the API server
uvicorn datasourcelib.api.routes:app --host 0.0.0.0 --port 8000

# Test the /sync endpoint
curl -X POST http://localhost:8000/sync \
  -H "Content-Type: application/json" \
  -d '{
    "sync_type": "full",
    "source_type": "sql",
    "source_config": {...},
    "vector_db_config": {...}
  }'
```

---

## Architecture

### System Components

```
┌──────────────────────────────────────────────────────────┐
│                      Application Layer                    │
│                    (Your Application)                     │
└─────────────────────────┬──────────────────────────────────┘
                          │
┌─────────────────────────▼──────────────────────────────────┐
│                   SyncManager (Orchestrator)               │
│  - Receives sync_type, source_type, configurations        │
│  - Normalizes and validates parameters                    │
│  - Coordinates data source and strategy instances         │
└─────────────────────────┬──────────────────────────────────┘
                          │
        ┌─────────────────┼──────────────────┐
        ▼                 ▼                  ▼
   ┌─────────────┐  ┌──────────────┐  ┌──────────────┐
   │ DataSource  │  │ Sync         │  │ Vector DB    │
   │ (Abstract)  │  │ Strategy     │  │ Config       │
   └──────┬──────┘  │ (Abstract)   │  │              │
          │         └──────┬───────┘  └──────────────┘
    ┌─────┴──────────────────────────────┐
    │                                    │
 ┌──▼────┐ ┌─────────┐ ┌────────┐ ┌────▼───┐
 │  SQL  │ │Azure    │ │Share   │ │Blob    │ ...
 │Server │ │DevOps   │ │Point   │ │Storage │
 └───────┘ └─────────┘ └────────┘ └────────┘
```

### Data Flow

1. **Configuration Phase**: Client provides `source_config` and `vector_db_config`
2. **Initialization**: `SyncManager` resolves the appropriate `DataSource` and `SyncStrategy` classes
3. **Validation**: Each component validates its configuration
4. **Connection**: Data source establishes connection
5. **Fetch**: Data source retrieves records (optionally filtered, aggregated)
6. **Index**: Strategy uploads records to Azure Search with optional vector embeddings
7. **Completion**: Result summary returned to caller
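
The seven steps above can be sketched as a toy, in-memory model. This is not the library's code: `InMemorySource` and `run_full_sync` are hypothetical stand-ins, and a plain Python list plays the role of the search index. The sketch only illustrates the ordering of validate, connect, fetch, index, and result reporting.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List


class InMemorySource:
    """Hypothetical stand-in for a DataSource implementation."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config
        self.connected = False

    def validate_config(self) -> bool:
        return "rows" in self.config

    def connect(self) -> bool:
        self.connected = True
        return True

    def disconnect(self) -> None:
        self.connected = False

    def fetch_data(self, query: str = "", **kwargs: Any) -> List[Dict[str, Any]]:
        return list(self.config["rows"])


def run_full_sync(source: InMemorySource, index: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Walk through validation, connection, fetch, and index in order."""
    started_at = datetime.now(timezone.utc).isoformat()

    def result(status: str, message: str, **extra: Any) -> Dict[str, Any]:
        finished_at = datetime.now(timezone.utc).isoformat()
        return {"status": status, "message": message,
                "started_at": started_at, "finished_at": finished_at, **extra}

    if not source.validate_config():              # step 3: validation
        return result("failed", "Invalid data source configuration")
    if not source.connect():                      # step 4: connection
        return result("failed", "Connection failed")
    try:
        records = source.fetch_data()             # step 5: fetch
        index[:] = records                        # step 6: a full load replaces the index
        return result("success", f"Loaded {len(records)} records",
                      loaded_records=len(records))
    except Exception as ex:                       # errors become a failed result
        return result("failed", str(ex))
    finally:
        source.disconnect()
```

Errors are returned in the result dictionary rather than raised, which matches the behavior described for `execute_sync()`.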

---

## Core Concepts

### DataSource

Abstract base class for all data source implementations. Each concrete source implements:

- **`validate_config()`**: Verifies required configuration keys are present
- **`connect()`**: Establishes connection and returns `True`/`False`
- **`disconnect()`**: Closes connections and cleans up resources
- **`fetch_data(query: str, **kwargs)`**: Returns `List[Dict[str, Any]]` of records

### SyncStrategy

Abstract base class for synchronization patterns. Each strategy implements:

- **`validate()`**: Checks preconditions before execution
- **`sync(**kwargs)`**: Executes the sync operation, returns result dict

Built-in strategies:
- **`FullLoadStrategy`**: Replace entire index with fresh data (fully functional)
- **`IncrementalLoadStrategy`**: Sync only changed records since last run (scaffold)
- **`TimeRangeLoadStrategy`**: Sync records within a date range (scaffold)
- **`DailyLoadStrategy`**: Scheduled daily synchronization (scaffold)
- **`OnDemandLoadStrategy`**: User-triggered arbitrary loads (scaffold)

### SyncManager

Central orchestrator that:
- Maps `sync_type` and `source_type` strings to strategy and source classes
- Normalizes inputs (case-insensitive enum matching)
- Handles configuration validation and error reporting
- Returns standardized result dictionaries
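
Case-insensitive matching by enum name or value could look roughly like the sketch below. The `SyncType` members and the `resolve_enum` helper are assumptions made for illustration; the library's actual enum definitions and lookup code may differ.

```python
from enum import Enum
from typing import Type, TypeVar, Union

E = TypeVar("E", bound=Enum)


class SyncType(str, Enum):
    # Hypothetical members mirroring the documented sync_type strings.
    FULL = "full"
    INCREMENTAL = "incremental"
    TIMERANGE = "timerange"
    DAILY = "daily"
    ONDEMAND = "ondemand"


def resolve_enum(enum_cls: Type[E], raw: Union[str, E]) -> E:
    """Match raw against member names or values, ignoring case."""
    if isinstance(raw, enum_cls):
        return raw
    wanted = str(raw).strip().lower()
    for member in enum_cls:
        if member.name.lower() == wanted or str(member.value).lower() == wanted:
            return member
    permitted = ", ".join(m.name for m in enum_cls)
    raise ValueError(f"Invalid {enum_cls.__name__}. Permitted names: {permitted}")
```

With this helper, `resolve_enum(SyncType, "Full")` and `resolve_enum(SyncType, SyncType.FULL)` both resolve to the same member, while an unknown string raises `ValueError` with the list of permitted names.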

---

## API Reference

### SyncManager.execute_sync()

```python
def execute_sync(
    sync_type: Union[str, SyncType],
    source_type: Union[str, DataSourceType],
    source_config: Dict[str, Any],
    vector_db_config: Dict[str, Any],
    **kwargs
) -> Dict[str, Any]:
    """
    Execute a data synchronization operation.
    
    Parameters
    ----------
    sync_type : str or SyncType
        Synchronization strategy: 'full', 'incremental', 'timerange', 'daily', 'ondemand'
        Case-insensitive; matched by enum name or value.
    
    source_type : str or DataSourceType
        Data source connector: 'sql', 'azure_devops', 'sharepoint', 'blob_storage', 'Dataverse'
        Case-insensitive; matched by enum name or value.
    
    source_config : Dict[str, Any]
        Configuration dictionary for the chosen data source.
        See Data Source Configuration section for required/optional keys.
    
    vector_db_config : Dict[str, Any]
        Configuration for Azure Cognitive Search and embedding service.
        Required for full strategy with vector support.
    
    **kwargs
        Additional parameters passed to data source fetch_data() or strategy sync().
        Examples: 'last_sync' for incremental, 'start'/'end' for timerange.
    
    Returns
    -------
    Dict[str, Any]
        Result dictionary with keys:
        - status: 'success' or 'failed'
        - message: Human-readable status message
        - started_at: ISO 8601 timestamp (UTC)
        - finished_at: ISO 8601 timestamp (UTC)
        - loaded_records: (FullLoadStrategy) Count of records indexed
    
    Raises
    ------
    None
        All errors are caught and returned in the result dict with status='failed'.
    
    Examples
    --------
    >>> result = manager.execute_sync(
    ...     sync_type='full',
    ...     source_type='sql',
    ...     source_config={'sql_server': '...', ...},
    ...     vector_db_config={'aisearch_endpoint': '...', ...}
    ... )
    >>> if result['status'] == 'success':
    ...     print(f"Loaded {result['loaded_records']} records")
    ... else:
    ...     print(f"Error: {result['message']}")
    """
```

### Result Format

```python
{
    "status": "success",  # or "failed"
    "message": "Human-readable status or error message",
    "started_at": "2026-05-31T14:30:00.123456+00:00",  # ISO 8601
    "finished_at": "2026-05-31T14:31:00.456789+00:00",  # ISO 8601
    "loaded_records": 1000  # FullLoadStrategy only
}
```
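
Because both timestamps are ISO 8601 strings, the elapsed time of a run can be derived with the standard library alone. The helper name `sync_duration_seconds` is hypothetical, and the sample dictionary simply reuses the values shown above.

```python
from datetime import datetime
from typing import Any, Dict


def sync_duration_seconds(result: Dict[str, Any]) -> float:
    """Return elapsed seconds between started_at and finished_at."""
    started = datetime.fromisoformat(result["started_at"])
    finished = datetime.fromisoformat(result["finished_at"])
    return (finished - started).total_seconds()


example = {
    "status": "success",
    "message": "Human-readable status or error message",
    "started_at": "2026-05-31T14:30:00.123456+00:00",
    "finished_at": "2026-05-31T14:31:00.456789+00:00",
    "loaded_records": 1000,
}
print(f"{sync_duration_seconds(example):.3f} s")  # prints "60.333 s"
```

Logging this value per run is one simple way to track sync duration over time.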

---

## Data Source Configuration

### SQL Server

#### Overview

Connects to SQL Server via ODBC with support for Windows authentication, SQL authentication, and aggregation/grouping of results.

#### Required Configuration

| Key | Type | Description |
|-----|------|-------------|
| `sql_server` | str | Hostname or FQDN of SQL Server instance (e.g., `server.database.windows.net`) |
| `sql_database` | str | Database name |
| `sql_query` | str | T-SQL query text to execute |

#### Authentication

Choose one:

1. **Windows Integrated Authentication**:
   ```python
   source_config = {
       "sql_server": "my-server.database.windows.net",
       "sql_database": "my_db",
       "sql_query": "SELECT * FROM documents",
       "sql_windows_auth": True,
       "sql_is_onprem": False,
   }
   ```

2. **SQL Authentication**:
   ```python
   source_config = {
       "sql_server": "my-server.database.windows.net",
       "sql_database": "my_db",
       "sql_query": "SELECT * FROM documents",
       "sql_username": "user@domain",
       "sql_password": "SecurePassword123!",
       "sql_windows_auth": False,
       "sql_is_onprem": False,
   }
   ```

#### Optional Configuration

| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `sql_windows_auth` | bool | `False` | Use integrated Windows authentication |
| `sql_is_onprem` | bool | `False` | On-premises deployment; relaxes encryption requirements |
| `sql_aggregation_field` | str | `None` | Column name to group results by |
| `sql_aggregation_row_format` | str | `None` | Template string for individual rows: `{i}`, `{field_name}` |
| `sql_aggregation_header_format` | str | `None` | Template string for group headers: `{group_value}`, `{count}`, `{plural}` |
| `sql_aggregation_sort_by` | list[str] \| str | `None` | Column(s) to sort aggregated results |

#### Return Format

```python
# Without aggregation: list of dicts
[
    {"id": 1, "name": "Item A", "description": "..."},
    {"id": 2, "name": "Item B", "description": "..."},
]

# With aggregation: formatted summaries
[
    "Item A has 5 rows as following: Row 1: ... Row 2: ...",
    "Item B has 3 rows as following: Row 1: ...",
]
```
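
To make the aggregation templates concrete, the sketch below groups rows by one field and renders a summary string per group using the documented placeholders (`{i}`, field names, `{group_value}`, `{count}`, `{plural}`). The function `aggregate_rows`, the `": "` and space separators, and the omission of `sql_aggregation_sort_by` are all assumptions; the library's actual rendering may differ.

```python
from collections import defaultdict
from typing import Any, Dict, List


def aggregate_rows(
    rows: List[Dict[str, Any]],
    field: str,
    row_format: str,
    header_format: str,
) -> List[str]:
    """Group rows by `field` and render one summary string per group."""
    groups: Dict[Any, List[Dict[str, Any]]] = defaultdict(list)
    for row in rows:
        groups[row[field]].append(row)

    summaries = []
    for group_value, members in groups.items():
        count = len(members)
        header = header_format.format(
            group_value=group_value,
            count=count,
            plural="" if count == 1 else "s",
        )
        # {i} is the 1-based position of the row inside its group.
        body = " ".join(
            row_format.format(**{**row, "i": i})
            for i, row in enumerate(members, start=1)
        )
        summaries.append(f"{header}: {body}")
    return summaries


rows = [
    {"category": "tools", "product_name": "Hammer", "description": "Steel head"},
    {"category": "tools", "product_name": "Saw", "description": "Fine teeth"},
    {"category": "toys", "product_name": "Kite", "description": "Nylon"},
]
summaries = aggregate_rows(
    rows,
    field="category",
    row_format="Row {i}: {product_name} ({description})",
    header_format="{group_value} has {count} product{plural}",
)
```

Here `summaries[1]` is `"toys has 1 product: Row 1: Kite (Nylon)"`. Aggregating this way yields one string per group instead of one record per row.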

#### Error Handling

| Condition | Behavior |
|-----------|----------|
| Missing required keys | `validate_config()` returns `False` |
| ODBC driver not found | `connect()` raises `RuntimeError` |
| Invalid connection string | `connect()` returns `False`; error logged |
| Query execution failure | `fetch_data()` raises exception |
| Connection lost during fetch | Retries up to 3 times before failing |

#### Example: Complete SQL Configuration

```python
result = manager.execute_sync(
    sync_type="full",
    source_type="sql",
    source_config={
        "sql_server": "contoso-sql.database.windows.net",
        "sql_database": "sales_db",
        "sql_query": """
            SELECT 
                product_id AS id,
                product_name AS name,
                category,
                description
            FROM products
            WHERE status = 'active'
        """,
        "sql_username": "dbadmin",
        "sql_password": "P@ssw0rd!",
        "sql_windows_auth": False,
        "sql_is_onprem": False,
        "sql_aggregation_field": "category",
        "sql_aggregation_row_format": "{product_name}: {description}",
        "sql_aggregation_header_format": "{group_value} has {count} product{plural}",
        "sql_aggregation_sort_by": ["product_name"],
    },
    vector_db_config={...}
)
```

---

### Azure DevOps

#### Overview

Extracts work items via WIQL queries or wiki page content via the Azure DevOps REST API. Supports nested wiki hierarchies and full work item details, including custom fields.

#### Required Configuration

| Key | Type | Description |
|-----|------|-------------|
| `ado_organization` | str | Azure DevOps organization name (from URL: `dev.azure.com/{organization}`) |
| `ado_personal_access_token` | str | PAT with the `vso.wiki` (Wiki: Read) and `vso.work` (Work Items: Read) scopes |

#### Optional Configuration

| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `ado_project` | str | `None` | Project name; if omitted, fetches all projects |
| `ado_query_id` | str | `None` | WIQL saved query ID (UUID) |
| `ado_download_wiki` | bool \| str | `False` | Download wiki pages instead of work items |
| `api_version` | str | `7.1` | Azure DevOps API version |
| `method` | str | `GET` | HTTP method for WIQL queries |
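
For orientation, the sketch below shows how a PAT is typically sent to Azure DevOps (HTTP Basic authentication with an empty user name) and what the REST URL for running a saved query by ID looks like. The helper names `ado_auth_header` and `wiql_query_url` are hypothetical, and how the library builds its requests internally is not documented here.

```python
import base64
from typing import Dict


def ado_auth_header(pat: str) -> Dict[str, str]:
    """Basic auth header for a PAT: empty user name, PAT as the password."""
    token = base64.b64encode(f":{pat}".encode("ascii")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


def wiql_query_url(
    organization: str,
    project: str,
    query_id: str,
    api_version: str = "7.1",
) -> str:
    """URL of the 'Query By Id' endpoint for a saved WIQL query."""
    return (
        f"https://dev.azure.com/{organization}/{project}"
        f"/_apis/wit/wiql/{query_id}?api-version={api_version}"
    )


headers = ado_auth_header("example-pat")  # placeholder value, not a real token
url = wiql_query_url("contoso", "Platform", "a1b2c3d4-e5f6-7890-abcd-ef1234567890")
```

Sending a `GET` request to `url` with `headers` returns work item references for the saved query; fetching the full fields is a separate call.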

#### Behavior

**Mode 1: WIQL Query** (default, `ado_download_wiki=False`)

Returns work item details as a list of dicts:

```python
[
    {
        "id": 12345,
        "type": "User Story",
        "title": "Feature: New Dashboard",
        "status": "Active",
        "assigned_to": "john@contoso.com",
        "created": "2026-01-15T10:30:00Z",
        "changed_date": "2026-05-30T14:20:00Z",
        "tags": "ui, frontend",
        "project": "MyProject",
        "release_type": "Sprint",
        "target_date": "2026-06-15",
        "description": "Build a new analytics dashboard...",
        "full": "User Story ID 12345... [full text summary]"
    }
]
```

**Mode 2: Wiki Download** (`ado_download_wiki=True`)

Returns wiki pages as a list of dicts:

```python
[
    {
        "display_name": "Getting Started",
        "url": "https://dev.azure.com/org/proj/_wiki/wikis/...",
        "content": "# Getting Started\n\nThis page explains...",
        "wiki": "Project Wiki",
        "project": "MyProject"
    }
]
```

#### Error Handling

| Condition | Behavior |
|-----------|----------|
| Invalid PAT | 401 HTTP error; authentication failure logged |
| Query ID not found | Empty list returned |
| Project not found | Connection succeeds; wiki list is empty |
| Network timeout | Exception raised after 30s timeout |

#### Example

```python
# Fetch work items from a WIQL query
result = manager.execute_sync(
    sync_type="full",
    source_type="azure_devops",
    source_config={
        "ado_organization": "contoso",
        "ado_personal_access_token": "***",
        "ado_project": "Platform",
        "ado_query_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
        "api_version": "7.1",
    },
    vector_db_config={...}
)

# Fetch wiki pages
result = manager.execute_sync(
    sync_type="full",
    source_type="azure_devops",
    source_config={
        "ado_organization": "contoso",
        "ado_personal_access_token": "***",
        "ado_project": "Platform",
        "ado_download_wiki": True,
    },
    vector_db_config={...}
)
```

---

### SharePoint

#### Overview

Accesses SharePoint sites via Microsoft Graph API. Requires two app registrations, one with `Sites.Read.All` and one with `Sites.Selected`, each authenticating with a client ID and secret.

#### Required Configuration

| Key | Type | Description |
|-----|------|-------------|
| `sp_site_url` | str | SharePoint site URL (e.g., `https://contoso.sharepoint.com/sites/MyTeam`) |
| `sp_site_display_name` | str | Display name of the site (used for Graph lookup) |
| `sp_master_config` | dict | App registration credentials with `Sites.Read.All` |
| `sp_client_config` | dict | App registration credentials with `Sites.Selected` |

#### Master Config (Sites.Read.All scope)

```python
"sp_master_config": {
    "sp_client_id": "00000000-0000-0000-0000-000000000001",
    "sp_client_secret": "ClientSecret123!",
    "sp_tenant_id": "00000000-0000-0000-0000-000000000002",
    "sp_domain_name": "contoso.com"
}
```

#### Client Config (Sites.Selected scope)

```python
"sp_client_config": {
    "sp_client_id": "00000000-0000-0000-0000-000000000003",
    "sp_client_secret": "ClientSecret456!",
}
```
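
Credentials of this shape are normally exchanged for a Microsoft Graph token through the OAuth 2.0 client credentials flow. The sketch below only builds the token request (URL and form body) without sending it. The helper `graph_token_request` is hypothetical, and it assumes the client registration uses the tenant given by `sp_tenant_id` in the master config, which this document does not state explicitly.

```python
from typing import Tuple
from urllib.parse import urlencode


def graph_token_request(
    tenant_id: str,
    client_id: str,
    client_secret: str,
) -> Tuple[str, bytes]:
    """Return (token_url, form_body) for a client credentials token request."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        # ".default" requests the permissions already granted to the app.
        "scope": "https://graph.microsoft.com/.default",
    }
    return url, urlencode(form).encode("ascii")


token_url, body = graph_token_request(
    tenant_id="00000000-0000-0000-0000-000000000002",
    client_id="00000000-0000-0000-0000-000000000003",
    client_secret="ClientSecret456!",
)
```

Posting `body` to `token_url` with `Content-Type: application/x-www-form-urlencoded` returns a JSON document whose `access_token` is sent as a bearer token on Graph calls.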

#### Optional Configuration

| Key | Type | Description |
|-----|------|-------------|
| `sp_pull_sub_items_enabled` | bool \| str | Download nested folder items |
| `sp_list_columns_to_keep` | list[str] | Filter columns from list items |

#### Error Handling

| Condition | Behavior |
|-----------|----------|
| Invalid credentials | 401 error; logs and returns `False` |
| Site not found | Graph lookup fails; partial connection state |
| Missing `sp_domain_name` | Connection may fail silently |

---

### Azure Blob Storage

#### Overview

Lists or downloads blobs from Azure Storage containers. Supports both connection string and account/credential authentication.

#### Required Configuration (Choose one)

**Option A: Connection String**
```python
source_config = {
    "container": "documents",
    "connection_string": "DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net"
}
```

**Option B: Account URL + Credential**
```python
source_config = {
    "container": "documents",
    "account_url": "https://myaccount.blob.core.windows.net",
    "credential": "AccountKey..."
}
```

#### Optional Configuration

| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `blob_prefix` | str | `None` | Filter blobs by prefix (e.g., `docs/2026/`) |
| `download` | bool | `False` | Download blob content vs. metadata |

#### Return Format

**Without download** (metadata only):
```python
[
    {"name": "doc1.pdf", "size": 102400, "last_modified": "2026-05-30T..."},
    {"name": "doc2.pdf", "size": 204800, "last_modified": "2026-05-29T..."},
]
```

**With download** (content included):
```python
[
    {"name": "doc1.pdf", "content": b'%PDF-1.7\n...'},  # binary content
]
```

---

### Dataverse

#### Overview

Connects to Microsoft Dynamics 365 / Power Platform Dataverse via two mechanisms: TDS (ODBC) or OData Web API.

#### Mode: TDS (ODBC)

**Option 1: Connection String**
```python
source_config = {
    "dv_tds_connection_string": "Driver={ODBC Driver 18 for SQL Server};Server=...;Database=...;Trusted_Connection=yes;"
}
```

**Option 2: Components**
```python
source_config = {
    "dv_tds_server": "org.crm.dynamics.com",
    "dv_tds_database": "orgdb_GUID",
    "dv_tds_username": "user@contoso.com",
    "dv_tds_password": "Password123!",
    "dv_tds_windows_auth": False,
    "dv_tds_is_onprem": False,
}
```

#### Mode: Web API

**With Client Credentials**
```python
source_config = {
    "dv_webapi_url": "https://org.crm.dynamics.com",
    "dv_webapi_client_id": "00000000-0000-0000-0000-000000000001",
    "dv_webapi_client_secret": "ClientSecret123!",
    "dv_webapi_tenant_id": "00000000-0000-0000-0000-000000000002",
}
```

**With Managed Identity**
```python
source_config = {
    "dv_webapi_url": "https://org.crm.dynamics.com",
    "dv_webapi_managed_identity_auth": True,
}
```

#### Optional Configuration

| Key | Type | Default |
|-----|------|---------|
| `dv_tds_timeout` | int | `30` |
| `dv_mode` | str | `"tds"` |

---

## Error Handling

### Exception Strategy

The library follows a **defensive error handling** pattern:

1. **Configuration errors** are caught in `validate_config()` and return `False`
2. **Connection errors** are caught in `connect()` and return `False`
3. **Execution errors** are caught in `sync()` and returned in the result dict with `status="failed"`
4. **Unhandled exceptions** are logged and reported in the result message

### HTTP Error Codes (API)

| Status | Cause |
|--------|-------|
| 400 | Invalid `sync_type` in request |
| 500 | Unhandled exception during sync |

### Common Error Messages

| Message | Cause | Resolution |
|---------|-------|-----------|
| "Invalid source_type. Permitted names: ..." | Unknown source type | Check spelling; use one of: `sql`, `azure_devops`, `sharepoint`, `blob_storage`, `Dataverse` |
| "No source registered for ..." | Source class not found | Ensure source is implemented and registered in SyncManager |
| "Invalid data source configuration" | Config validation failed | Review required fields for chosen source |
| "Strategy validation failed" | Strategy preconditions not met | Check strategy-specific requirements |
| "Vector DB config invalid for Azure Search indexer" | Azure Search config missing keys | Provide `aisearch_endpoint`, `aisearch_index_name`, `aisearch_api_key` |

---

## Best Practices

### 1. Configuration Management

- **Never hardcode secrets**. Use environment variables or secure vaults:
  ```python
  import os
  source_config = {
      "sql_username": os.environ["SQL_USER"],
      "sql_password": os.environ["SQL_PASSWORD"],
  }
  ```

- **Validate credentials** before production deployment
- **Use connection pooling** for repeated syncs

### 2. Query Optimization

- **Limit result sets** in SQL queries to avoid memory pressure:
  ```sql
  SELECT TOP 10000 * FROM large_table WHERE modified > @last_sync
  ```

- **Use indexes** on filtered columns
- **Batch large syncs** into multiple smaller runs
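
One way to split a large table into smaller runs is T-SQL paging with `OFFSET ... FETCH`, which requires an `ORDER BY` clause. The generator below is a hypothetical helper that produces one query per page, each usable as `sql_query`. Table and column names are interpolated directly, so they must come from trusted input. Whether successive runs append to or replace the index depends on the sync strategy in use, so verify that before batching a full load.

```python
from typing import Iterator


def paged_queries(
    table: str,
    order_by: str,
    total_rows: int,
    page_size: int,
) -> Iterator[str]:
    """Yield one T-SQL query per page of `page_size` rows."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    for offset in range(0, total_rows, page_size):
        yield (
            f"SELECT * FROM {table} ORDER BY {order_by} "
            f"OFFSET {offset} ROWS FETCH NEXT {page_size} ROWS ONLY"
        )


queries = list(paged_queries("dbo.documents", "id", total_rows=25, page_size=10))
```

For 25 rows and a page size of 10 this yields three queries, with offsets 0, 10, and 20.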

### 3. Error Recovery

- **Implement retry logic** for transient failures (network, timeouts)
- **Log all sync results** for audit and debugging
- **Monitor sync duration** to detect performance regressions
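
A minimal retry wrapper with exponential backoff might look like the sketch below. It assumes only that the wrapped callable returns a result dictionary with a `status` key, as `execute_sync()` is documented to do. The name `sync_with_retry` and its parameters are hypothetical. Note that it retries every failure, including non-transient ones such as invalid configuration, so a real implementation would likely inspect `message` first.

```python
import time
from typing import Any, Callable, Dict


def sync_with_retry(
    run_sync: Callable[[], Dict[str, Any]],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Call run_sync until it succeeds or attempts are exhausted."""
    result: Dict[str, Any] = {"status": "failed", "message": "not attempted"}
    for attempt in range(1, attempts + 1):
        result = run_sync()
        if result.get("status") == "success":
            return result
        if attempt < attempts:
            # Wait 1x, 2x, 4x ... base_delay between attempts.
            sleep(base_delay * 2 ** (attempt - 1))
    return result
```

In practice `run_sync` would be a small lambda or `functools.partial` that calls `manager.execute_sync(...)` with fixed arguments. The injectable `sleep` parameter exists so the backoff can be tested without real delays.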

### 4. Security

- **Use TLS/HTTPS** for all connections
- **Apply principle of least privilege**: limit PATs, managed identities to required scopes
- **Rotate credentials** regularly
- **Audit access** to source systems via connection logs

### 5. Performance

- **Profile queries** with Azure SQL Query Store
- **Use aggregation** to reduce record count and vector embedding costs
- **Schedule syncs** during off-peak hours for large datasets
- **Monitor vector DB costs** if using embeddings

---

## Troubleshooting

### Common Issues

#### 1. SQL Connection Fails

**Symptom**: `status: "failed"`, message contains "ODBC Error"

**Diagnosis**:
```python
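# Test connection manually (SQLDataSource is the library's SQL connector class;
# config is the same dict you pass as source_config)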
source = SQLDataSource(config)
if not source.connect():
    print("Connection failed")
    # Check: firewall, credentials, server name, ODBC driver
```

**Solutions**:
- Verify server name, database, credentials
- Check firewall rules allow client IP
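- Install the Python bindings with `python -m pip install pyodbc`, then run `pyodbc.drivers()` to list the installed drivers; the ODBC driver itself (for example, Microsoft ODBC Driver 18 for SQL Server) is a separate system-level install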
- For on-premises, ensure VPN/network access

#### 2. Azure Search Indexing Fails

**Symptom**: `status: "failed"`, message mentions "Azure Search"

**Diagnosis**:
```python
# Verify Azure Search config
from datasourcelib.indexes.azure_search_index import AzureSearchIndexer
indexer = AzureSearchIndexer(vector_db_config)
if not indexer.validate_config():
    print("Invalid config")
```

**Solutions**:
- Verify endpoint URL, API key, index name
- Confirm index exists in Azure portal
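- Check the API key type: indexing requires an admin key (query keys are read-only). If you use role-based access instead of keys, the identity needs the "Search Index Data Contributor" role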
- Ensure vector dimensions match embedding model (e.g., 1536 for `text-embedding-ada-002`)

#### 3. No Records Returned

**Symptom**: `status: "success"`, `loaded_records: 0`

**Diagnosis**:
```python
# Test data source directly
source = SQLDataSource(config)
source.connect()
data = source.fetch_data()
print(f"Records fetched: {len(data)}")
```

**Solutions**:
- Review query filters (e.g., `WHERE` clauses)
- Verify data exists in source system
- Check date ranges for incremental syncs
- Inspect error logs for silent failures

#### 4. Timeout Errors

**Symptom**: Message contains "timeout"

**Solutions**:
- Increase query timeout (SQL: 30s default, configurable)
- Reduce batch size
- Check network latency to source
- Optimize query (add indexes, reduce result set)

---

## Extension Guide

### Adding a New Data Source

1. **Create a new file** `src/datasourcelib/datasources/my_source.py`:

```python
from typing import Any, Dict, List
from datasourcelib.datasources.datasource_base import DataSourceBase
from datasourcelib.utils.logger import get_logger

logger = get_logger(__name__)

class MyDataSource(DataSourceBase):
    """Custom data source implementation."""
    
    def validate_config(self) -> bool:
        """Validate configuration keys."""
        try:
            required = ["my_server", "my_api_key"]
            for key in required:
                if key not in self.config:
                    raise KeyError(f"Missing {key}")
            return True
        except Exception as ex:
            logger.error("Config validation failed: %s", ex)
            return False
    
    def connect(self) -> bool:
        """Establish connection."""
        try:
            # Initialize connection
            self._connected = True
            logger.info("MyDataSource connected")
            return True
        except Exception as ex:
            logger.exception("Connection failed")
            return False
    
    def disconnect(self) -> None:
        """Close connection."""
        self._connected = False
        logger.info("MyDataSource disconnected")
    
    def fetch_data(self, query: str = None, **kwargs) -> List[Dict[str, Any]]:
        """Fetch data from source."""
        # getattr avoids an AttributeError if connect() was never called
        if not getattr(self, "_connected", False):
            self.connect()
        # Fetch and return list of dicts
        return [{"id": 1, "name": "Item"}]
```

2. **Register in SyncManager** (`src/datasourcelib/core/sync_manager.py`):

```python
from datasourcelib.datasources.my_source import MyDataSource

class SyncManager:
    _datasource_map = {
        # ... existing ...
        DataSourceType.MY_SOURCE: MyDataSource,
    }
```

3. **Add enum value** (`src/datasourcelib/datasources/datasource_types.py`):

```python
class DataSourceType(str, Enum):
    # ... existing members ...
    MY_SOURCE = "my_source"
```

### Adding a New Sync Strategy

1. **Create** `src/datasourcelib/strategies/my_strategy.py`:

```python
from datasourcelib.core.sync_base import SyncBase
from datasourcelib.utils.logger import get_logger
from typing import Dict, Any
from datetime import datetime, timezone

logger = get_logger(__name__)

class MyStrategy(SyncBase):
    """Custom sync strategy."""
    
    def validate(self) -> bool:
        return bool(self.data_source and self.data_source.validate_config())
    
    def sync(self, **kwargs) -> Dict[str, Any]:
        started_at = datetime.now(timezone.utc).isoformat()
        try:
            logger.info("MyStrategy executing")
            data = self.data_source.fetch_data(**kwargs)
            # Process and sync
            finished_at = datetime.now(timezone.utc).isoformat()
            return {
                "status": "success",
                "message": f"Processed {len(data)} records",
                "started_at": started_at,
                "finished_at": finished_at
            }
        except Exception as ex:
            logger.exception("Strategy failed")
            finished_at = datetime.now(timezone.utc).isoformat()
            return {
                "status": "failed",
                "message": str(ex),
                "started_at": started_at,
                "finished_at": finished_at
            }
```

2. **Register in SyncManager** (after adding a matching `MY_TYPE` member to the `SyncType` enum):

```python
from datasourcelib.strategies.my_strategy import MyStrategy

class SyncManager:
    _strategy_map = {
        # ... existing ...
        SyncType.MY_TYPE: MyStrategy,
    }
```

---

## Performance Characteristics

| Operation | Typical Duration | Bottleneck |
|-----------|------------------|-----------|
| SQL 10K records → Azure Search | 30–60 sec | Network + embedding generation |
| Azure DevOps 500 work items | 10–15 sec | API rate limits |
| SharePoint 1K files (no download) | 20–30 sec | Graph API pagination |
| Blob Storage 500 blobs (metadata) | 5–10 sec | Network |
| Vector embedding (1K records) | 2–3 min | Azure OpenAI quota |

---

## Support and Contribution

For issues, feature requests, or contributions, contact the development team or refer to the project repository.

**Version**: 0.2.11  
**Last Updated**: May 2026
