Metadata-Version: 2.4
Name: aio-service-caller
Version: 0.1.2
Summary: An async service invocation framework with registry, discovery, load balancing, and declarative HTTP client features.
Author-email: ferdinand <gongjpa@gmail.com>
License: MIT
Keywords: aio,config,load balancing,nacos,service discovery,service registry
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.12
Requires-Dist: aiohttp>=3.8.0
Requires-Dist: nacos-sdk-python>=2.0.9
Requires-Dist: pydantic>=2.10.6
Provides-Extra: config
Requires-Dist: yamlpyconfig>=0.0.9; extra == 'config'
Provides-Extra: dev
Requires-Dist: aresponses>=3.0.0; extra == 'dev'
Requires-Dist: black>=25.11.0; extra == 'dev'
Requires-Dist: mypy>=1.18.2; extra == 'dev'
Requires-Dist: pytest-asyncio>=1.3.0; extra == 'dev'
Requires-Dist: pytest-cov>=7.0.0; extra == 'dev'
Requires-Dist: pytest>=9.0.1; extra == 'dev'
Requires-Dist: ruff>=0.14.5; extra == 'dev'
Description-Content-Type: text/markdown

# Aio Service Caller

A lightweight and high-performance asynchronous service invocation framework for Python, providing service registration, discovery, load balancing, and declarative HTTP client capabilities.

## 1. Features

* 🔍 **Service Discovery**: Built-in Nacos integration for automatic service registration and discovery
* ⚖️ **Load Balancing**: Multiple strategies included (round robin, random, weighted random, weighted round robin)
* 🔌 **Interceptor Mechanism**: Spring-style interceptors supporting pre-request, post-response, and exception handling
* 🚀 **High Performance Async I/O**: Powered by `aiohttp` with connection pooling for efficient HTTP requests

---

## 2. Installation

```bash
pip install aio-service-caller[config]
```

> The optional `config` extra automatically installs [`yamlpyconfig`](https://pypi.org/project/yamlpyconfig/).
> It is recommended to use [`yamlpyconfig`](https://pypi.org/project/yamlpyconfig/) to load service registry, discovery, and caller configuration from local configuration files combined with environment variables.

---

## 3. Quick Start

### 3.1. Add configuration

Create a config file such as `application.yaml` or `application-{profile}.yaml` inside your config directory (e.g., `/config`):

> For details about `yamlpyconfig`, refer to the [documentation](https://pypi.org/project/yamlpyconfig/).

```yaml
# Service caller settings
service-caller:
  # Supported load balancer types: round_robin, random, weight_round_robin, weight_random
  lb-type: round_robin           # Load balancer type
  connection-timeout: 6          # Connection timeout in seconds
  read-timeout: 6                # Read timeout in seconds
  connection-pool-size: 100      # Connection pool size

# Nacos configuration
app-registry:
  nacos:
    server-addr: "192.168.30.36:9090"   # Nacos server address
    namespace: "dev"                    # Optional namespace
    cluster: "DEFAULT"                  # Optional cluster
    group: "DEFAULT_GROUP"              # Default group name
    ip: "127.0.0.1"                     # Local service IP
    port: 9999                          # Local service port
    app-name: "my-app"                  # Application name
    username: "nacos"                   # Optional username
    password: "Y789uioJKL"              # Optional password
    weight: 1.0                         # Service instance weight
```

---

### 3.2. Create `ServiceManager` and call other services in the cluster

Creating a `ServiceManager` serves two purposes:

1. Automatically registers the current service to Nacos using the provided configuration
2. Allows you to call other service instances registered under the same namespace (real HTTP requests are handled internally via `aiohttp`)

Example:

```python
@pytest.mark.asyncio
async def test_get_service_instances_with_nacos(self):
    # Load configuration
    async with ConfigManager("./") as config_manager:
        # Creating the ServiceManager automatically registers the current service into Nacos
        async with ServiceManager(
            config_manager=config_manager,
            interceptors=[LoggingInterceptor(), AuthInterceptor(token="123456"), MetricsInterceptor()]
        ) as manager:

            # Option 1: Directly get the parsed result
            result = await manager.get("other-app", "/hello")
            assert result == {"status": "OK"}

            # Option 2: Get the raw aiohttp response object
            async with manager.raw_get("other-app", "/hello") as response:
                assert response.status == 200
                result = await response.json()
                assert result == {"status": "OK"}
```

You may pass any argument supported by `aiohttp` (except `method` and `url`) to the request—
such as `headers`, `params`, `data`, `json`, `timeout`, etc.

---

### 3.3. Custom Interceptors

When using `ServiceManager`, interceptors will be executed at the appropriate time during the call lifecycle.
You can implement your own interceptors and pass them via the `interceptors` parameter to implement custom behavior.

Example: a logging interceptor

```python
class LoggingInterceptor(IServiceInterceptor):
    """Logging interceptor"""

    def __init__(self, log_request: bool = True, log_response: bool = True):
        """
        Args:
            log_request: Whether to log request details
            log_response: Whether to log response info
        """
        self.log_request = log_request
        self.log_response = log_response

    @property
    def name(self) -> str:
        return "LoggingInterceptor"

    async def before_request(self, context: RequestContext) -> None:
        """Log before sending the request"""
        if self.log_request:
            logger.info(
                f"→ {context.method} {context.service_name}{context.path} | "
                f"Headers: {context.kwargs.get('headers', {})} | "
                f"Params: {context.kwargs.get('params', {})}"
            )

    async def after_response(self, context: RequestContext) -> None:
        """Log after receiving the response"""
        if self.log_response and context.response:
            logger.info(
                f"← {context.method} {context.service_name}{context.path} | "
                f"Resolved URL: {context.resolved_url} | "
                f"Status: {context.response.status} | "
                f"Duration: {context.duration:.3f}s | "
                f"Size: {len(str(context.result)) if context.result else 0} bytes"
            )

    async def handle_exception(self, context: RequestContext) -> None:
        """Log when an exception occurs"""
        if context.exception:
            logger.error(
                f"✗ {context.method} {context.service_name}{context.path} | "
                f"Exception: {context.exception} | "
                f"Duration: {context.duration:.3f}s"
            )

    @property
    def order(self) -> int:
        return 99999
```

Another example: adding authentication headers before the request is sent

```python
class AuthInterceptor(IServiceInterceptor):
    """Authentication interceptor"""

    def __init__(self, token: str, header_name: str = "Authorization", prefix: str = "Bearer "):
        """
        Args:
            token: Authentication token
            header_name: HTTP header key
            prefix: Token prefix
        """
        self.token = token
        self.header_name = header_name
        self.prefix = prefix

    @property
    def name(self) -> str:
        return "AuthInterceptor"

    async def before_request(self, context: RequestContext) -> None:
        """Add auth headers before request"""
        if "headers" not in context.kwargs:
            context.kwargs["headers"] = {}

        context.kwargs["headers"][self.header_name] = f"{self.prefix}{self.token}"

    async def after_response(self, context: RequestContext) -> None:
        """Handle authentication-related responses"""
        if context.response and context.response.status == 401:
            logger.warning(f"Authentication failed for {context.service_name}{context.path}")

    async def handle_exception(self, context: RequestContext) -> None:
        """Authentication exception handling"""
        pass  # Let upper layers handle the error
```

Interceptor execution order is determined by the `order` property.
Default is `0`, and **smaller values execute earlier**.
