Metadata-Version: 2.4
Name: slimrpc
Version: 0.2.0
Summary: RPC over SLIM Library
License-Expression: Apache-2.0
Requires-Python: >=3.9
Requires-Dist: async-timeout>=5.0.1
Requires-Dist: googleapis-common-protos>=1.70.0
Requires-Dist: grpcio>=1.74.0
Requires-Dist: slim-bindings>=0.7.1
Description-Content-Type: text/markdown

# SLIM RPC (SLIM Remote Procedure Call)

SLIMRPC, or SLIM Remote Procedure Call, is a library designed to enable
Protocol Buffers (protobuf) RPC over SLIM (Secure Low-latency Inter-process
Messaging). This is analogous to gRPC, which leverages HTTP/2 as its underlying
transport layer for protobuf RPC.

To use SLIM RPC, you can compile your protobuf file using the [SLIM RPC compiler](https://github.com/agntcy/slim/tree/slim-v0.5.0/data-plane/slimrpc-compiler) and
use the generated code to create your application.

## Protobuf example

```protobuf
syntax = "proto3";

package example_service;

// Example service with one RPC for each combination of unary and streaming
// request/response.
service Test {
  // Single request, single response.
  rpc ExampleUnaryUnary(ExampleRequest) returns (ExampleResponse);
  // Single request, stream of responses.
  rpc ExampleUnaryStream(ExampleRequest) returns (stream ExampleResponse);
  // Stream of requests, single response.
  rpc ExampleStreamUnary(stream ExampleRequest) returns (ExampleResponse);
  // Stream of requests, stream of responses.
  rpc ExampleStreamStream(stream ExampleRequest) returns (stream ExampleResponse);
}

// Request message accepted by every RPC of the Test service.
message ExampleRequest {
  string example_string = 1;
  int64  example_integer = 2;
}

// Response message returned by every RPC of the Test service.
message ExampleResponse {
  string example_string = 1;
  int64  example_integer = 2;
}
```

The following code is based on autogenerated files produced by compiling the protobuf example using the
SLIM RPC compiler. Specifically, the files `example_pb2_slimrpc.py` and `example_pb2.py` in the imports
were generated automatically.

## Server Usage

```python
import asyncio
import logging
from collections.abc import AsyncIterable

from slimrpc.context import Context
from slimrpc.examples.simple.types.example_pb2 import ExampleRequest, ExampleResponse
from slimrpc.examples.simple.types.example_pb2_slimrpc import (
    TestServicer,
    add_TestServicer_to_server,
)
from slimrpc.server import Server

logger = logging.getLogger(__name__)


class TestService(TestServicer):
    """Example handlers for the RPCs declared by the ``Test`` service."""

    async def ExampleUnaryUnary(
        self, request: ExampleRequest, context: Context
    ) -> ExampleResponse:
        """Log the incoming request and answer with one fixed response."""
        logger.info(f"Received unary-unary request: {request}")

        reply = ExampleResponse(example_integer=1, example_string="Hello, World!")
        return reply

    async def ExampleUnaryStream(
        self, request: ExampleRequest, context: Context
    ) -> AsyncIterable[ExampleResponse]:
        """Log the incoming request and yield five numbered responses."""
        logger.info(f"Received unary-stream request: {request}")

        # The `yield` makes this an async generator; each iteration emits
        # one element of the response stream.
        for i in range(5):
            logger.info(f"Sending response {i}")
            message = ExampleResponse(
                example_integer=i, example_string=f"Response {i}"
            )
            yield message

    async def ExampleStreamUnary(
        self, request_iterator: AsyncIterable[ExampleRequest], context: Context
    ) -> ExampleResponse:
        """Log every streamed request, then answer with one fixed response."""
        logger.info(f"Received stream-unary request: {request_iterator}")

        # Consume the whole request stream before building the reply.
        async for request in request_iterator:
            logger.info(f"Received stream-unary request: {request}")

        return ExampleResponse(
            example_integer=1, example_string="Stream Unary Response"
        )

    async def ExampleStreamStream(
        self, request_iterator: AsyncIterable[ExampleRequest], context: Context
    ) -> AsyncIterable[ExampleResponse]:
        """Stream-stream RPC; not implemented in this example.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Method not implemented!")


def create_server(
    local: str,
    slim: dict,
    enable_opentelemetry: bool = False,
    shared_secret: str = "",
) -> Server:
    """Build a new ``Server`` from the given settings.

    All four arguments are forwarded unchanged, by keyword, to the
    ``Server`` constructor.

    Returns:
        The newly constructed ``Server`` instance.
    """
    return Server(
        local=local,
        slim=slim,
        enable_opentelemetry=enable_opentelemetry,
        shared_secret=shared_secret,
    )


async def amain() -> None:
    """Create the example server, register ``TestService`` on it, and run it."""
    # Settings handed to the server as its `slim` argument.
    slim_config = {
        "endpoint": "http://localhost:46357",
        "tls": {
            "insecure": True,
        },
    }

    server = create_server(
        local="agntcy/grpc/server",
        slim=slim_config,
        enable_opentelemetry=False,
        shared_secret="my_shared_secret",
    )

    # Register the RPC handlers on the server.
    add_TestServicer_to_server(TestService(), server)

    await server.run()


def main() -> None:
    """Configure DEBUG-level logging and run ``amain()`` on an asyncio event loop."""
    logging.basicConfig(level=logging.DEBUG)

    try:
        asyncio.run(amain())
    except KeyboardInterrupt:
        # Report Ctrl-C with a short message instead of a traceback.
        print("Server interrupted by user.")
```


## Client Usage

```python
import asyncio
import logging
from collections.abc import AsyncGenerator

import slimrpc
from slimrpc.examples.simple.types.example_pb2 import ExampleRequest
from slimrpc.examples.simple.types.example_pb2_slimrpc import TestStub

logger = logging.getLogger(__name__)


async def amain() -> None:
    """Call the unary-unary, unary-stream and stream-unary example RPCs in turn.

    An ``asyncio.TimeoutError`` raised by any of the calls is logged rather
    than propagated.
    """
    app_config = slimrpc.SLIMAppConfig(
        identity="agntcy/grpc/client",
        slim_client_config={
            "endpoint": "http://localhost:46357",
            "tls": {
                "insecure": True,
            },
        },
        enable_opentelemetry=False,
        shared_secret="my_shared_secret",
    )
    factory = slimrpc.ChannelFactory(slim_app_config=app_config)

    channel = factory.new_channel(remote="agntcy/grpc/server")

    # Generated client stub bound to the channel.
    stub = TestStub(channel)

    async def stream_requests() -> AsyncGenerator[ExampleRequest, None]:
        """Yield ten numbered requests for the stream-unary call."""
        for i in range(10):
            yield ExampleRequest(example_integer=i, example_string=f"Request {i}")

    try:
        # Unary request, unary response.
        request = ExampleRequest(example_integer=1, example_string="hello")
        response = await stub.ExampleUnaryUnary(request, timeout=2)

        logger.info(f"Response: {response}")

        # Unary request, streamed responses.
        async for resp in stub.ExampleUnaryStream(request, timeout=2):
            logger.info(f"Stream Response: {resp}")

        # Streamed requests, unary response.
        response = await stub.ExampleStreamUnary(stream_requests(), timeout=2)
        logger.info(f"Stream Unary Response: {response}")
    except asyncio.TimeoutError:
        logger.error("timeout while waiting for response")

    # NOTE(review): presumably leaves time for pending work to finish before
    # the event loop closes — confirm.
    await asyncio.sleep(1)


def main() -> None:
    """
    Main entry point for the client.

    Configures INFO-level logging and runs ``amain()`` on an asyncio event
    loop. A ``KeyboardInterrupt`` is reported with a short message instead
    of a traceback.
    """
    logging.basicConfig(level=logging.INFO)
    try:
        asyncio.run(amain())
    except KeyboardInterrupt:
        # This is the client example, so the message names the client
        # (the wording was copied from the server example).
        print("Client interrupted by user.")
```