Metadata-Version: 2.3
Name: qena-shared-lib
Version: 0.1.24
Summary: A shared tools for other services
Requires-Dist: fastapi[all]==0.127.1
Requires-Dist: prometheus-client==0.22.1
Requires-Dist: prometheus-fastapi-instrumentator==7.0.2
Requires-Dist: punq==0.7.0
Requires-Dist: pydantic-core==2.33.2
Requires-Dist: pydantic==2.11.10
Requires-Dist: starlette==0.49.3
Requires-Dist: typing-extensions==4.14.1
Requires-Dist: aiokafka==0.12.0 ; extra == 'all'
Requires-Dist: cronsim==2.6 ; extra == 'all'
Requires-Dist: jwt==1.3.1 ; extra == 'all'
Requires-Dist: passlib[bcrypt]==1.7.4 ; extra == 'all'
Requires-Dist: pika==1.3.2 ; extra == 'all'
Requires-Dist: pymongo==4.15.5 ; extra == 'all'
Requires-Dist: redis==7.1.0 ; extra == 'all'
Requires-Dist: aiokafka==0.12.0 ; extra == 'kafka'
Requires-Dist: pymongo==4.15.5 ; extra == 'mongodb'
Requires-Dist: pika==1.3.2 ; extra == 'rabbitmq'
Requires-Dist: redis==7.1.0 ; extra == 'redis'
Requires-Dist: cronsim==2.6 ; extra == 'scheduler'
Requires-Dist: jwt==1.3.1 ; extra == 'security'
Requires-Dist: passlib[bcrypt]==1.7.4 ; extra == 'security'
Requires-Python: >=3.10
Provides-Extra: all
Provides-Extra: kafka
Provides-Extra: mongodb
Provides-Extra: rabbitmq
Provides-Extra: redis
Provides-Extra: scheduler
Provides-Extra: security
Description-Content-Type: text/markdown

# Qena shared lib

A set of shared tools for other services. It includes:

- FastAPI app builder
- A wrapper around FastAPI to make it class based.
- RabbitMQ utility class to listen, respond, publish and make RPC requests.
- Remote logging
    - Logstash utility class to log messages in `ecs` (Elastic Common Schema).
- A simple task scheduler, to schedule tasks to run at a specific time.
- Background task runner.
- Security tools (password hasher, JWT, ACL).
- IOC container to manage dependencies used across FastAPI, the RabbitMQ manager and the schedule manager.
- Kafka producer and consumer wrapper.
- MongoDB client wrapper, with repository and index manager.
- Redis wrapper with cache and distributed lock manager.

## Installation

It is preferred to use [astral.sh / uv](https://docs.astral.sh/uv) as a package manager.

``` sh
$ uv add qena-shared-lib[all]
```
This installs all extras. To install specific extras, use `kafka`, `rabbitmq`, `scheduler`, `security`, `redis` or `mongodb` in place of `all`.

## Usage

- [Environment variables](#environment-variables)
- [Http](#http)
- [Lifespan](#lifespan)
- [Dependencies](#dependencies)
- [Controllers](#controllers)
- [Routers](#routers)
- [Remote logging](#remote-logging)
    - [Logstash](#logstash)
- [Rabbitmq](#rabbitmq)
    - [Publisher](#publisher)
    - [RPC client](#rpc-client)
    - [Flow control](#flow-control)
    - [Rpc reply](#rpc-reply)
    - [Retry consumer](#retry-consumer)
- [Scheduler](#scheduler)
- [Background](#background)
- [Security](#security)
    - [Password hasher](#password-hasher)
    - [JWT](#jwt)
    - [ACL](#acl)
- [Kafka](#kafka)
    - [Producer](#producer)
    - [Consumer](#consumer)
- [Mongodb](#mongodb)
    - [Crud](#crud)
    - [Aggregation](#aggregation)
    - [Index](#index)
- [Redis](#redis)
    - [Cache](#cache)
    - [Distribute lock](#distribute-lock)

### Environment variables

- `QENA_SHARED_LIB_LOGGING_LOGGER_NAME` root logger name.
- `QENA_SHARED_LIB_SECURITY_UNAUTHORIZED_RESPONSE_CODE` integer response code returned on unauthorized access to a resource.
- `QENA_SHARED_LIB_SECURITY_TOKEN_HEADER` header key for the JWT token.
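
As a minimal sketch, these variables can be set in the process environment before the application starts. Only the variable names come from the list above. The values (`user_service`, `401`, `authorization`) are illustrative assumptions, not documented defaults, and the point at which the library reads them is also assumed.

```python
import os

# Example values only: the names come from the list above,
# the values are assumptions chosen for illustration.
EXAMPLE_SETTINGS = {
    "QENA_SHARED_LIB_LOGGING_LOGGER_NAME": "user_service",
    "QENA_SHARED_LIB_SECURITY_UNAUTHORIZED_RESPONSE_CODE": "401",
    "QENA_SHARED_LIB_SECURITY_TOKEN_HEADER": "authorization",
}

for name, value in EXAMPLE_SETTINGS.items():
    # setdefault keeps any value already provided by the deployment.
    os.environ.setdefault(name, value)
```

In a real deployment these would more likely be set by the shell, a container definition or a process manager rather than in Python code.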

### Http

To create a FastAPI app:

``` py
from fastapi import FastAPI

from qena_shared_lib.application import Builder, Environment


def main() -> FastAPI:
    builder = (
        Builder()
        .with_title("Qena shared lib")
        .with_description("A shared tools for other services.")
        .with_version("0.1.0")
        .with_environment(Environment.PRODUCTION)
        .with_default_exception_handlers()
    )

    app = builder.build()

    return app
```

To run the app:

``` sh
$ uvicorn --factory main:main
```

### Lifespan

``` py
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    ...

    yield

    ...


def main() -> FastAPI:
    ...

    builder.with_lifespan(lifespan)

    ...
```
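
To make the startup and shutdown ordering concrete, here is a small standard-library sketch of how an async context manager brackets the serving phase. The `serve` coroutine is a hypothetical stand-in for the server, and `events` exists only to record the order of calls.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def lifespan(app: object):
    # Runs once before the application starts serving.
    events.append("startup")

    yield

    # Runs once after the application stops serving.
    events.append("shutdown")


async def serve() -> None:
    # Stand-in for the server: enter the lifespan, "serve", then exit.
    async with lifespan(app=None):
        events.append("serving")


asyncio.run(serve())
print(events)  # ['startup', 'serving', 'shutdown']
```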

### Dependencies

``` py
class EmailService:
    def __init__(self):
        ...


class Database:
    def __init__(self):
        ...


def main() -> FastAPI:
    ...

    builder.with_singleton(EmailService)
    builder.with_transient(Database)

    ...
```
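
The difference between the two registrations can be sketched with a toy container. This is not the library's implementation. It assumes the usual meaning of the terms: a singleton is created once and shared, while a transient is created anew on every resolution. `ToyContainer` and `resolve` are hypothetical names.

```python
from typing import Any


class ToyContainer:
    """Illustrative container only; not the library's implementation."""

    def __init__(self) -> None:
        self._registered: set[type] = set()
        self._singleton_types: set[type] = set()
        self._instances: dict[type, Any] = {}

    def with_singleton(self, service: type) -> None:
        self._registered.add(service)
        self._singleton_types.add(service)

    def with_transient(self, service: type) -> None:
        self._registered.add(service)

    def resolve(self, service: type) -> Any:
        if service not in self._registered:
            raise KeyError(f"{service.__name__} is not registered")
        if service in self._singleton_types:
            # Create on first use, then reuse the same instance.
            if service not in self._instances:
                self._instances[service] = service()
            return self._instances[service]
        # Transient: a fresh instance on every resolution.
        return service()


class EmailService:
    pass


class Database:
    pass


container = ToyContainer()
container.with_singleton(EmailService)
container.with_transient(Database)

same_email = container.resolve(EmailService) is container.resolve(EmailService)
same_db = container.resolve(Database) is container.resolve(Database)
print(same_email, same_db)  # True False
```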

### Controllers

``` py
from qena_shared_lib.http import ControllerBase, api_controller, post


@api_controller("/users")
class UserController(ControllerBase):

    def __init__(self, email_service: EmailService):
        self._email_service = email_service

    @post()
    async def send_email(self, message: str):
        await self._email_service.send(message)


def main() -> FastAPI:
    ...

    builder.with_controllers(UserController)

    ...
```

### Routers

``` py
from typing import Annotated

from fastapi import APIRouter

from qena_shared_lib.dependencies.http import DependsOn


router = APIRouter(prefix="/auth")


@router.post("")
async def login(
    db: Annotated[Database, DependsOn(Database)],
    username: str,
    password: str
):
    ...


def main() -> FastAPI:
    ...

    builder.with_routers(router)

    ...
```

To enable metrics:

``` py
def main() -> FastAPI:
    ...

    builder.with_metrics()

    ...
```

### Remote logging

#### Logstash

``` py
from qena_shared_lib.remotelogging import BaseRemoteLogSender
from qena_shared_lib.remotelogging.logstash import HTTPSender  # or TCPSender


@asynccontextmanager
async def lifespan(app: FastAPI):
    remote_logger = get_service(BaseRemoteLogSender)

    await remote_logger.start()

    yield

    await remote_logger.stop()


def main() -> FastAPI:
    ...

    remote_logger = HTTPSender(
        service_name="qena-shared-lib",
        url="http://127.0.0.1:18080",
        user="logstash",
        password="logstash",
    )
    # or
    # remote_logger = TCPSender(
    #   service_name="qena-shared-lib",
    #   host="127.0.0.1",
    #   port=18090
    # )
    builder.with_singleton(
        service=BaseRemoteLogSender,
        instance=remote_logger,
    )

    ...


@router.get("")
def log_message(
    remote_logger: Annotated[
        BaseRemoteLogSender,
        DependsOn(BaseRemoteLogSender),
    ],
    message: str,
):
    remote_logger.info(message)
```

### Rabbitmq

To create a RabbitMQ connection manager:

``` py
from qena_shared_lib.rabbitmq import (
    ListenerBase,
    RabbitMqManager,
    consume,
    consumer,
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    rabbitmq = get_service(RabbitMqManager)

    await rabbitmq.connect()

    yield

    rabbitmq.disconnect()


@consumer("UserQueue")
class UserConsumer(ListenerBase):

    def __init__(self, db: Database):
        self._db = db

    @consume()
    async def store_user(self, user: User):
        await self._db.save(user)


def main() -> FastAPI:
    ...

    rabbitmq = RabbitMqManager(
        remote_logger=remote_logger,
        container=builder.container,
    )

    rabbitmq.init_default_exception_handlers()
    rabbitmq.include_listener(UserConsumer)
    builder.with_singleton(
        service=RabbitMqManager,
        instance=rabbitmq,
    )

    ...
```

#### Publisher

``` py
@router.post("")
async def store_user(
    rabbitmq: Annotated[
        RabbitMqManager,
        DependsOn(RabbitMqManager)
    ],
    user: User,
):
    publisher = rabbitmq.publisher("UserQueue")

    await publisher.publish(user)
    # await publisher.publish_as_arguments(user)
```

#### RPC client

``` py
@router.get("")
async def get_user(
    rabbitmq: Annotated[
        RabbitMqManager,
        DependsOn(RabbitMqManager)
    ],
    user_id: str,
):
    rpc_client = rabbitmq.rpc_client("UserQueue")

    user = await rpc_client.call(user_id)
    # user = await rpc_client.call_with_arguments(user_id)

    return user
```

#### Flow control

``` py
from qena_shared_lib.rabbitmq import ... , ListenerContext


@consumer("UserQueue")
class UserConsumer(ListenerBase):

    @consume()
    async def store_user(self, ctx: ListenerContext, user: User):
        ...

        await ctx.flow_control.request(10)

        ...

```

#### Rpc reply

Optionally, an RPC worker can reply to RPC calls through the listener context:

``` py
from qena_shared_lib.rabbitmq import ... , rpc_worker


@rpc_worker("UserQueue")
class UserWorker(ListenerBase):

    @execute()
    async def store_user(self, ctx: ListenerContext, user: User):
        ...

        await ctx.rpc_reply.reply("Done")

        ...
```

#### Retry consumer

A consumer can retry consuming a message in the event of a failure.

``` py
from pika.exceptions import AMQPError

from qena_shared_lib.rabbitmq import (
    BackoffRetryDelay,
    FixedRetryDelay,
    RabbitMqManager,
    RetryDelayJitter,
    RetryPolicy,
)


@consumer(
    queue="UserQueue",
    # can be defined for consumer of specific queue
    retry_policy=RetryPolicy(
        exceptions=(AMQPError,),
        max_retry=5,
        retry_delay_strategy=FixedRetryDelay(
            retry_delay=2
        ),
        retry_delay_jitter=RetryDelayJitter(min=0.5, max=5.0),
    )
)
class UserConsumer(ListenerBase):

    @consume(
        # for specific target
        retry_policy=RetryPolicy(
            exceptions=(AMQPError,),
            max_retry=5,
            retry_delay_strategy=FixedRetryDelay(
                retry_delay=2
            ),
            retry_delay_jitter=RetryDelayJitter(min=0.5, max=5.0),
        )
    )
    async def store_user(self, ctx: ListenerContext, user: User):
        ...

        await ctx.flow_control.request(10)

        ...


def main() -> FastAPI:
    ...

    rabbitmq = RabbitMqManager(
        remote_logger=remote_logger,
        container=builder.container,
        # or globally for all consumers
        listener_global_retry_policy=RetryPolicy(
            exceptions=(AMQPError,),
            max_retry=10,
            retry_delay_strategy=BackoffRetryDelay(
                multiplier=1.5, min=2, max=10
            ),
            retry_delay_jitter=RetryDelayJitter(min=0.5, max=5.0),
            match_by_cause=True,
        ),
    )

    rabbitmq.include_listener(UserConsumer)
    builder.with_singleton(
        service=RabbitMqManager,
        instance=rabbitmq,
    )
```
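
The retry parameters above suggest how the delay between attempts might be computed. The sketch below is an assumption about the general technique, not the library's actual formula: a fixed strategy returns the same delay every time, a backoff strategy multiplies the minimum delay on each attempt and caps it at the maximum, and jitter adds a random offset. All function names here are hypothetical.

```python
import random


def fixed_delay(attempt: int, retry_delay: float) -> float:
    # The same delay regardless of how many attempts were made.
    return retry_delay


def backoff_delay(
    attempt: int, multiplier: float, min_delay: float, max_delay: float
) -> float:
    # attempt is zero-based: the first retry waits min_delay,
    # later retries grow geometrically until capped at max_delay.
    return min(max_delay, min_delay * multiplier**attempt)


def with_jitter(
    delay: float, jitter_min: float, jitter_max: float, rng: random.Random
) -> float:
    # Adds a random offset so that many consumers do not retry in lockstep.
    return delay + rng.uniform(jitter_min, jitter_max)


# Same numbers as the global policy above: multiplier=1.5, min=2, max=10.
backoff = [
    backoff_delay(attempt, multiplier=1.5, min_delay=2, max_delay=10)
    for attempt in range(6)
]
jittered = with_jitter(fixed_delay(0, 2), 0.5, 5.0, random.Random(0))
```

Under these assumptions the backoff sequence grows as 2, 3, 4.5, 6.75 seconds and then stays at the 10 second cap.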



### Scheduler

``` py
from qena_shared_lib.scheduler import (
    ScheduleManager,
    # Scheduler,
    SchedulerBase,
    schedule,
    scheduler,
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    schedule_manager = get_service(ScheduleManager)

    schedule_manager.start()

    yield

    schedule_manager.stop()


@scheduler()
class TaskScheduler(SchedulerBase):

    def __init__(self, db: Database):
        self._db = db

    @schedule("* * * * *")
    def do_task(self):
        ...
# or
# scheduler = Scheduler()

# @scheduler.schedule("* * * * *")
# def do_task(
#     db: Annotated[Database, DependsOn(Database)]
# ):
#     ...


def main() -> FastAPI:
    ...

    schedule_manager = ScheduleManager(
        remote_logger=remote_logger,
        container=builder.container
    )

    schedule_manager.include_scheduler(TaskScheduler)
    builder.with_singleton(
        service=ScheduleManager,
        instance=schedule_manager,
    )

    ...
```
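
The `"* * * * *"` argument is a cron expression. Assuming it follows the standard five-field cron layout used in the example, the fields can be read as in the sketch below. `describe_cron` is a hypothetical helper for illustration and is not part of the library.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict[str, str]:
    # Split the expression on whitespace and label each field by position.
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


# "* * * * *" matches every minute.
every_minute = describe_cron("* * * * *")

# "0 9 * * 1-5" matches 09:00 on Monday through Friday.
weekday_morning = describe_cron("0 9 * * 1-5")
```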

### Background

``` py
from qena_shared_lib.background import Background


@asynccontextmanager
async def lifespan(app: FastAPI):
    background = get_service(Background)

    background.start()

    yield

    background.stop()


def main() -> FastAPI:
    ...

    builder.with_singleton(
        service=BaseRemoteLogSender,
        instance=remote_logger,
    )
    builder.with_singleton(Background)

    ...


async def data_processor(data: Data):
    ...


@router.get("")
async def process_data(
    background: Annotated[
        Background,
        DependsOn(Background)
    ],
    data: Data
):
    background.add_task(BackgroundTask(data_processor, data))
```

### Security

#### Password hasher

``` py
from qena_shared_lib.security import PasswordHasher


@api_controller("/users")
class UserController(ControllerBase):

    def __init__(self, password_hasher: PasswordHasher):
        self._password_hasher = password_hasher

    @post()
    async def signup(self, user: User):
        await self._password_hasher.hash(user.password)

    @post()
    async def login(self, user: User):
        await self._password_hasher.verify(user.password)


def main() -> FastAPI:
    ...

    builder.with_singleton(PasswordHasher)
    builder.with_controllers([
        UserController
    ])

    ...
```
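
For readers unfamiliar with password hashing, the following standard-library sketch shows the general hash-then-verify flow. It uses PBKDF2 as a stand-in and is not what `PasswordHasher` does internally. The function names, the storage format and the iteration count are assumptions. Note that verification in this sketch needs both the candidate password and the previously stored hash.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # assumed work factor for illustration


def hash_password(password: str) -> str:
    # A fresh random salt makes equal passwords hash differently.
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    # Recompute the digest with the stored salt and compare in constant time.
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


stored_hash = hash_password("s3cret")
```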

#### JWT

``` py
from qena_shared_lib.security import JwtAdapter


@api_controller("/users")
class UserController(ControllerBase):

    def __init__(
        self,

        ...

        jwt: JwtAdapter,
    ):
        ...

        self._jwt = jwt

    @post()
    async def login(self, user: User):
        payload = { ... }

        await self._jwt.encode(payload)

    @post()
    async def verify(self, token: str):
        await self._jwt.decode(token)


def main() -> FastAPI:
    ...

    builder.with_singleton(JwtAdapter)
    builder.with_controllers([
        UserController
    ])

    ...
```
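
As a rough illustration of what encoding and decoding a signed token involves, here is a standard-library sketch of HS256 signing. It is not the library's implementation: the `encode` and `decode` helpers and the secret are hypothetical, and the sketch omits claim validation such as expiry checks.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    # JWT uses URL-safe base64 without padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped during encoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode(payload: dict, secret: bytes) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def decode(token: str, secret: bytes) -> dict:
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    # Constant-time comparison; reject the token before reading the payload.
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("invalid signature")
    payload_segment = signing_input.split(".")[1]
    return json.loads(_b64url_decode(payload_segment))


token = encode({"sub": "user-one"}, secret=b"example-secret")
claims = decode(token, secret=b"example-secret")
```

Decoding with a different secret raises `ValueError` in this sketch, which is the property that lets a service trust the claims it reads.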

#### ACL

``` py
from qena_shared_lib.security import Authorization


@api_controller("/users")
class UserController(ControllerBase):

    @post()
    async def get_user(
        self,
        user: Annotated[
            UserInfo,
            Authorization(
                user_type="ADMIN",
                permissions=[
                    "READ"
                ],
            )
        ]
    ):
        ...


@router.get("")
async def get_users(
    user: Annotated[
        UserInfo,
        Authorization("ADMIN")
    ]
):
    ...
```

### Kafka

``` py
from qena_shared_lib.kafka import KafkaManager


@asynccontextmanager
async def lifespan(app: FastAPI):
    kafka = get_service(KafkaManager)

    await kafka.connect()

    yield

    await kafka.disconnect()


def main() -> FastAPI:
    ...

    kafka = KafkaManager(
        remote_logger=...,
        bootstrap_servers="127.0.0.1:9092",
    )

    builder.with_singleton(
        service=KafkaManager,
        instance=kafka,
    )

    ...
```

#### Producer

``` py
class UserService:
    def __init__(self, kafka: KafkaManager):
        self._kafka = kafka

    async def create_user(self):
        ...

        async with await self._kafka.producer("user") as producer:
            await producer.send(key="some_key", value=user)

        ...
```

#### Consumer

``` py
from typing import Any

from qena_shared_lib.kafka import (
    ConsumerBase,
    consume,
    consumer,
)


@consumer(["user"])
class UserConsumer(ConsumerBase):
    def __init__(self, user_service: UserService):
        self._user_service = user_service

    @consume()
    async def user_created(self, key: Any | None, value: Any | None):
        ...

        await self._user_service.create_user(...)

        ...
```

### Mongodb

``` py
from qena_shared_lib.mongodb import MongoDBManager


@asynccontextmanager
async def lifespan(app: FastAPI):
    db = get_service(MongoDBManager)

    await db.connect()

    yield

    await db.disconnect()


def main() -> FastAPI:
    ...

    db = MongoDBManager(
        connection_string="mongodb://127.0.0.1:27017",
        db="userDb"
    )

    builder.with_singleton(
        service=MongoDBManager,
        instance=db
    )

    ...
```

#### Crud

``` py
from qena_shared_lib.mongodb import (
    Document,
    MongoDBManager,
    ProjectedDocument,
    RepositoryBase,
)


class User(Document):
    __collection_name__ = "users"

    full_name: str
    phone: str


class FullNameProjectedUser(ProjectedDocument):
    full_name: str


class UserRepository(RepositoryBase[User]):
    pass


class UserService:
    def __init__(self, user_repository: UserRepository):
        self._user_repository = user_repository

    async def add_user(self):
        await self._user_repository.insert(
            User(
                full_name="user one",
                phone="+251900000000"
            )
        )

    async def get_user(self):
        user = await self._user_repository.find_by_filter(
            filter={"phone": "+251900000000"}
        )

    async def get_user_fullname(self):
        user = await self._user_repository.find_by_filter(
            filter={"phone": "+251900000000"}, projection=FullNameProjectedUser
        )

    async def update_user(self):
        user = await self._user_repository.find_by_filter(
            filter={"phone": "+251900000000"}
        )
        user.phone = "+251900000001"

        await self._user_repository.replace(user)
```

#### Aggregation

``` py
class AggregatedUser(AggregatedDocument):
    __pipeline__ = [
        {"$match": {"phone": {"$in": ["+251900000000", "+251900000001"]}}},
        {"$project": {"fullName": True}},
    ]

    full_name: str

class UserService:
    ...

    async def get_user_fullnames(self):
        users = self._user_repository.aggregate(aggregation=AggregatedUser)

    ...
```

#### Index

``` py
from qena_shared_lib.mongodb import Document, IndexManager, IndexModel


class User(Document):
    __collection_name__ = "users"
    __indexes__ = [IndexModel("phone")]

    full_name: str
    phone: str


async def manage_indexes():
    ...

    index_manager = IndexManager(db=db, documents=[User])

    await index_manager.create_indexes()

    ...

    await index_manager.drop_indexes()

    ...
```

### Redis

``` py
from qena_shared_lib.redis import RedisManager


@asynccontextmanager
async def lifespan(app: FastAPI):
    redis_manager = get_service(RedisManager)

    await redis_manager.connect()

    yield

    await redis_manager.disconnect()


def main() -> FastAPI:
    ...

    redis_manager = RedisManager("redis://127.0.0.1:6379")

    builder.with_singleton(
        service=RedisManager,
        instance=redis_manager
    )

    ...
```

#### Cache

``` py
from qena_shared_lib.cache import CachedObject, CacheManager


def main() -> FastAPI:
    ...

    cache_manager = CacheManager()

    redis_manager.add(cache_manager)
    builder.with_singleton(
        service=CacheManager,
        instance=cache_manager
    )

    ...


class UserCache(CachedObject):
    full_name: str


class UserService:
    def __init__(self, cache_manager: CacheManager):
        self._cache_manager = cache_manager

    async def cache_user(self):
        await self._cache_manager.set(
            UserCache(full_name="user one")
        )

    async def get_cached_user(self):
        user_cache = await self._cache_manager.get(UserCache)

    async def unset_cached_user(self):
        await self._cache_manager.unset(UserCache)
```

#### Distribute lock

``` py
from qena_shared_lib.sync import DistributedLockManager


def main() -> FastAPI:
    ...

    distributed_lock_manager = DistributedLockManager()

    redis_manager.add(distributed_lock_manager)
    builder.with_singleton(
        service=DistributedLockManager,
        instance=distributed_lock_manager
    )

    ...


class UserService:
    def __init__(self, distributed_lock_manager: DistributedLockManager):
        self._distributed_lock_manager = distributed_lock_manager

    async def create_user(self):
        async with self._distributed_lock_manager("user_one_create") as _:
            ...
```
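
The usage pattern above, a named lock entered with `async with`, can be illustrated with an in-process stand-in. This sketch only serializes tasks inside one process. A real distributed lock coordinates across processes through Redis, which this code does not attempt. `LocalLockManager` and the `log` list are hypothetical.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager


class LocalLockManager:
    """In-process stand-in; not a distributed lock."""

    def __init__(self) -> None:
        # One asyncio.Lock per lock name, created on first use.
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def __call__(self, name: str):
        async with self._locks[name]:
            yield


log: list[str] = []


async def create_user(manager: LocalLockManager, label: str) -> None:
    async with manager("user_one_create"):
        log.append(f"{label}:enter")
        await asyncio.sleep(0.01)
        log.append(f"{label}:exit")


async def run_demo() -> None:
    manager = LocalLockManager()
    # Both tasks use the same lock name, so they run one after the other.
    await asyncio.gather(create_user(manager, "a"), create_user(manager, "b"))


asyncio.run(run_demo())
print(log)  # ['a:enter', 'a:exit', 'b:enter', 'b:exit']
```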
