Metadata-Version: 2.4
Name: nats-common
Version: 0.1.0
Summary: Small shared building blocks for NATS-based Python services
Author-email: Valentin Schroeter <valentin.schroeter@student.hpi.de>
Requires-Python: >=3.12
Requires-Dist: faststream[nats]>=0.6.5
Requires-Dist: pydantic-settings>=2.12.0
Requires-Dist: uuid>=1.30
Description-Content-Type: text/markdown

## nats-common

Small shared building blocks for NATS-based Python services:

- `Envelope[T]`: a thin, typed message wrapper (metadata + payload)
- `NatsCommonSettings`: shared settings layout using `pydantic-settings` (supports `.env`)
- **Jobs**: Long-running, exactly-once tasks with heartbeat, cancellation, and progress reporting (`NatsJobService` / `NatsJobClient`)
- **RPCs**: Lightweight request/reply pattern with load balancing (`NatsRPCService` / `NatsRPCClient`)

## Envelope (message wrapper)

`Envelope[T]` wraps your actual payload (`data`) with common metadata (`id`, `ts`, `correlation_id`, …).  
The `type` field is auto-filled from `data.__msg_type__` (if present), otherwise from the payload class path.
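
The fallback rule can be illustrated with a small stand-alone sketch. It is not the library's implementation: `resolve_msg_type` is a hypothetical helper, the `<module>.<qualname>` format for the "class path" is an assumption, and plain dataclasses stand in for pydantic models to keep the sketch dependency-free.

```python
from dataclasses import dataclass


def resolve_msg_type(data: object) -> str:
    """Hypothetical helper: pick a message type name for a payload."""
    cls = type(data)
    # Prefer an explicit __msg_type__ declared on the payload class.
    explicit = getattr(cls, "__msg_type__", None)
    if isinstance(explicit, str) and explicit:
        return explicit
    # Fallback: dotted class path (assumed format: "<module>.<qualname>").
    return f"{cls.__module__}.{cls.__qualname__}"


@dataclass
class AudioChunk:
    __msg_type__ = "audio.chunk"  # explicit type name, not a dataclass field
    samples: list[float]


@dataclass
class Heartbeat:  # no __msg_type__, so the class path is used instead
    seq: int


print(resolve_msg_type(AudioChunk(samples=[0.1])))  # audio.chunk
print(resolve_msg_type(Heartbeat(seq=1)))  # module-qualified name ending in "Heartbeat"
```

Declaring `__msg_type__` keeps the type name stable when a payload class is moved or renamed, whereas a class-path fallback would change with the module layout.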

### Create / serialize / validate

```python
from pydantic import BaseModel

from nats_common.envelope import Envelope


class AudioChunk(BaseModel):
    __msg_type__ = "audio.chunk"
    samples: list[float]


env_out: Envelope[AudioChunk] = Envelope.from_data(AudioChunk(samples=[0.1, 0.2]), id=True)

# Note: the following is usually not needed, because FastStream handles it automatically.
# Serialize for transport (JSON-compatible dict)
payload: dict[str, object] = env_out.model_dump(mode="json")

# Validate/parse back (e.g. on the subscriber side)
env_in: Envelope[AudioChunk] = Envelope[AudioChunk].model_validate(payload)
assert env_in.data is not None
print(env_in.type, env_in.data.samples)
```

### FastStream example (publish + subscribe with `Envelope`)

This mirrors the pattern used in `nats-microphone`: define publishers/subscribers with `Envelope[T]`, and publish an `Envelope` instance.

```python
from pydantic import BaseModel
from faststream import FastStream
from faststream.nats import NatsBroker

from nats_common.envelope import Envelope
from nats_common.settings import NatsCommonSettings


class Ping(BaseModel):
    __msg_type__ = "ping"
    text: str


settings = NatsCommonSettings()
broker = NatsBroker(settings.url)
app = FastStream(broker)


@broker.subscriber("demo.in.ping")
async def on_ping(env: Envelope[Ping]) -> None:
    assert env.data is not None
    print(f"got ping: {env.data.text} (type={env.type}, id={env.id})")


@app.after_startup
async def publish_once() -> None:
    await broker.publish(
        Envelope.from_data(Ping(text="hello"), id=True),
        subject="demo.in.ping",
    )
```


## Settings (`NatsCommonSettings`)

`NatsCommonSettings` reads configuration from environment variables and `.env` files.

- Prefix: `NATS_`
- `.env` file: `./.env` (UTF-8)
- Nested delimiter: `__` (double underscore)

### Basic usage

```python
from nats_common.settings import NatsCommonSettings

settings = NatsCommonSettings()
print(settings.url)
print(settings.subject_prefix)
print(settings.device)
```

Example `.env`:

```dotenv
NATS_URL=nats://localhost:4222
NATS_SUBJECT_PREFIX=test.foo
NATS_DEVICE=device1
```


### Nested settings + `.env` (principle)

Because `NatsCommonSettings` uses `env_nested_delimiter="__"`, you can override nested configuration via env vars like:
`NATS_<FIELDNAME>__<SUBFIELD>=...`

Example:

```python
from pydantic import BaseModel, Field

from nats_common.settings import NatsCommonSettings


class MicrophoneSettings(BaseModel):
    sample_rate: int = 44100


class Settings(NatsCommonSettings):
    microphone: MicrophoneSettings = Field(default_factory=MicrophoneSettings)


settings = Settings()
print(settings.microphone.sample_rate)
```

Example `.env` to override the nested value:

```dotenv
NATS_URL=nats://localhost:4222

# Override the nested value, with the nested delimiter "__" (two underscores)
NATS_MICROPHONE__SAMPLE_RATE=16000
```
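
To make the naming rule concrete, the following stdlib-only sketch mimics how a prefixed, delimited variable name maps onto a nested structure. It is a simplified illustration, not how `pydantic-settings` is implemented: `nested_from_env` is a hypothetical helper, values stay strings (the real settings class would coerce them to the declared field types), and conflicting names are not handled.

```python
from typing import Any


def nested_from_env(
    environ: dict[str, str],
    prefix: str = "NATS_",
    delimiter: str = "__",
) -> dict[str, Any]:
    """Hypothetical helper: map prefixed env var names onto a nested dict."""
    result: dict[str, Any] = {}
    for name, value in environ.items():
        if not name.upper().startswith(prefix):
            continue  # not one of our variables
        # Drop the prefix, then split the remainder into a path of field names.
        path = name[len(prefix):].lower().split(delimiter)
        node = result
        for key in path[:-1]:
            node = node.setdefault(key, {})  # descend, creating levels as needed
        node[path[-1]] = value
    return result


env = {
    "NATS_URL": "nats://localhost:4222",
    "NATS_MICROPHONE__SAMPLE_RATE": "16000",
    "HOME": "/root",  # ignored: no NATS_ prefix
}
print(nested_from_env(env))
# {'url': 'nats://localhost:4222', 'microphone': {'sample_rate': '16000'}}
```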


## Jobs (Long-running tasks)

`NatsJobService` and `NatsJobClient` implement long-running, exactly-once jobs with heartbeats, cancellation, and progress reporting.

### Service-side (Job handler)

```python
import asyncio
import nats
from pydantic import BaseModel

from nats_common.services import JobStatus, NatsJob, NatsJobService


# Define your models
class ProcessRequest(BaseModel):
    file_url: str


class ProcessChunk(BaseModel):
    text: str
    offset: int


class ProcessResult(BaseModel):
    full_text: str


# Implement your job handler
class ProcessJob(NatsJob[ProcessRequest, ProcessResult, ProcessChunk]):
    async def handle(self) -> None:
        req: ProcessRequest | None = self.request.data  # type: ignore[assignment]
        if req is None:
            return

        # Check for cancellation
        if self.cancel_requested:
            return

        # Update progress
        self.set_local_status(JobStatus.RUNNING, progress=0.5, message="Processing...")

        # Publish intermediate results
        await self.publish_intermediate_result(
            ProcessChunk(text="chunk1", offset=0),
        )

        # Publish final result
        await self.publish_final_result(
            ProcessResult(full_text="complete"),
        )


# Start the service
async def main():
    nats_client = await nats.connect("nats://localhost:4222")

    service = NatsJobService(
        nats_client=nats_client,
        subject="myapp.process",
        job_cls=ProcessJob,
    )

    await service.start()
    # ... keep running ...
    await service.stop()


if __name__ == "__main__":
    asyncio.run(main())
```
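
The handler above checks `cancel_requested` once. For work that proceeds in several steps, it may be preferable to re-check the flag between steps so that a cancel request takes effect promptly. The sketch below shows that pattern in isolation. Only the `cancel_requested` attribute name comes from the example above; `FakeJob`, its `handle` signature, and the `on_progress` callback are hypothetical stand-ins, and how the real `NatsJob` sets the flag is not shown here.

```python
import asyncio
from collections.abc import Callable


class FakeJob:
    """Stand-in for a job object; it only mimics the cancel flag."""

    def __init__(self) -> None:
        self.cancel_requested = False
        self.processed: list[str] = []

    async def handle(
        self, chunks: list[str], on_progress: Callable[[float], None]
    ) -> None:
        for index, chunk in enumerate(chunks, start=1):
            # Re-check the flag before every unit of work, not just once.
            if self.cancel_requested:
                return
            await asyncio.sleep(0)  # placeholder for real work; yields to the loop
            self.processed.append(chunk)
            on_progress(index / len(chunks))


async def demo() -> list[str]:
    job = FakeJob()

    def on_progress(progress: float) -> None:
        # Simulate an external cancel request arriving at the halfway point.
        if progress >= 0.5:
            job.cancel_requested = True

    await job.handle(["a", "b", "c", "d"], on_progress)
    return job.processed


print(asyncio.run(demo()))  # ['a', 'b']
```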

### Client-side (Job submitter)

```python
import asyncio
import nats
from pydantic import BaseModel

from nats_common.services import JobStatusUpdate, NatsJobClient


class ProcessRequest(BaseModel):
    file_url: str


class ProcessChunk(BaseModel):
    text: str
    offset: int


class ProcessResult(BaseModel):
    full_text: str


async def main():
    nats_client = await nats.connect("nats://localhost:4222")

    client = NatsJobClient[ProcessRequest, ProcessChunk, ProcessResult](
        nats_client=nats_client,
        subject="myapp.process",
    )

    # Submit a job
    await client.submit(
        job_id="job-123",
        data=ProcessRequest(file_url="https://example.com/file.txt"),
    )

    # Option 1: Wait for completion (high-level)
    completion = await client.wait_for_completion(
        "job-123",
        timeout_seconds=60,
        on_status=lambda update: print(f"Status: {update.status.value}"),
        on_intermediate=lambda chunk: print(f"Chunk: {chunk.text}"),
    )
    if completion.succeeded:
        print(f"Result: {completion.final_result}")

    # Option 2: Manual subscriptions (low-level)
    async def on_status(update: JobStatusUpdate) -> None:
        print(f"Status: {update.status.value} - {update.message}")

    sub = await client.subscribe_status("job-123", on_status)
    # ... later: await sub.unsubscribe()


if __name__ == "__main__":
    asyncio.run(main())
```


## RPCs (Request/Reply)

`NatsRPCService` and `NatsRPCClient` provide a lightweight request/reply pattern with load balancing across service replicas.

### Service-side (RPC handler)

```python
import asyncio
import nats
from pydantic import BaseModel

from nats_common.services import NatsRPC, NatsRPCService


# Define your models
class EchoRequest(BaseModel):
    message: str


class EchoResponse(BaseModel):
    echoed: str


# Implement your RPC handler
class EchoRpc(NatsRPC[EchoRequest, EchoResponse]):
    async def handle(self) -> EchoResponse:
        req: EchoRequest | None = self.request.data  # type: ignore[assignment]
        return EchoResponse(echoed=req.message if req else "")


# Start the service
async def main():
    nats_client = await nats.connect("nats://localhost:4222")

    service = NatsRPCService(
        nats_client=nats_client,
        subject="myapp.echo",
        rpc_cls=EchoRpc,
        request_model=EchoRequest,
        response_model=EchoResponse,
    )

    await service.start()
    # ... keep running ...
    await service.stop()


if __name__ == "__main__":
    asyncio.run(main())
```

### Client-side (RPC caller)

```python
import asyncio
import nats
from pydantic import BaseModel

from nats_common.services import NatsRPCClient


class EchoRequest(BaseModel):
    message: str


class EchoResponse(BaseModel):
    echoed: str


async def main():
    nats_client = await nats.connect("nats://localhost:4222")

    client = NatsRPCClient(
        nats_client=nats_client,
        subject="myapp.echo",
        response_model=EchoResponse,
    )

    # Make an RPC call
    resp = await client.call(data=EchoRequest(message="hello"))

    if resp.error:
        print(f"Error: {resp.error}")
    else:
        print(f"Response: {resp.data}")  # EchoResponse(echoed="hello")


if __name__ == "__main__":
    asyncio.run(main())
```
