Metadata-Version: 2.4
Name: rainbow-rb-zenoh
Version: 0.0.9.dev7
Summary: Rainbow Zenoh Python
Author-email: Derek <dfd1123@rainbow-robotics.com>
Requires-Python: <3.13,>=3.12
Description-Content-Type: text/markdown
Requires-Dist: psutil<8.0.0,>=7.0.0
Requires-Dist: flatbuffers<26.0.0,>=25.2.10
Requires-Dist: fastapi>=0.116.1
Requires-Dist: rainbow-rb-utils==0.0.9.dev7
Requires-Dist: rainbow-rb-log==0.0.9.dev7

# rb_zenoh

`rb_zenoh`는 Zenoh 기반의 공통 통신 패키지입니다.
주요 기능은 `publish/subscribe`, `query/queryable`, `ZenohRouter` 데코레이터 라우팅입니다.

## 설치/의존성

- Python: `>=3.12,<3.13`
- 패키지 의존성: `flatbuffers`, `psutil`, `rb_utils`, `rb_modules`

## 핵심 객체

- `ZenohClient`: 저수준 통신 API
- `ZenohRouter`: 데코레이터 기반 라우터
- `SubscribeOptions`: subscribe 동작 옵션

## 1) publish

```python
from rb_zenoh.client import ZenohClient

client = ZenohClient()
client.publish("muscat/sample/topic", payload={"ok": True})
```

FlatBuffer 요청 publish:

```python
from rb_flat_buffers.IPC.Request_MotionPause import Request_MotionPauseT

req = Request_MotionPauseT()
client.publish(
    "C500920/call_pause",
    flatbuffer_req_obj=req,
    flatbuffer_buf_size=64,
)
```

## 2) subscribe

```python
from rb_zenoh.client import ZenohClient
from rb_zenoh.schema import SubscribeOptions

client = ZenohClient()

async def on_msg(*, topic, mv, obj_payload, dict_payload, attachment):
    print(topic, obj_payload, dict_payload, attachment)

client.subscribe(
    "muscat/sample/topic",
    on_msg,
    options=SubscribeOptions(dispatch="immediate"),
)
```

콜백 시그니처:
- `topic`: topic 문자열
- `mv`: raw payload(memoryview)
- `obj_payload`: `flatbuffer_obj_t=<FlatBuffer T class>` 파싱 객체 (`T | None`)
- `dict_payload`: `flatbuffer_obj_t` 유무와 무관하게 dict 변환 결과 (`dict | None`)
- `attachment`: `sender`, `sender_id` 정보

## 3) queryable

```python
def on_query(req=None, params=None):
    return {"status": "ok"}

client.queryable("muscat/sample/query", on_query)
```

콜백 인자 의미:
- `req`: query payload 본문
  - `flatbuffer_req_t`/`flatbuffer_req_T_class`를 지정한 경우에만 주입됩니다.
  - 내부에서 `InitFromPackedBuf(...)`로 파싱한 FlatBuffer `...T` 객체입니다.
  - 요청 payload가 없는데 `flatbuffer_req_t`를 지정하면 에러가 발생합니다.
- `params`: query parameter
  - 내부 `q.parameters` 값을 `dict[str, str]` 형태로 주입합니다.
  - payload(`req`)와 별도로 필터/옵션 전달에 사용합니다.

주의사항:
- 콜백 파라미터 이름이 정확히 `req`, `params`일 때만 자동 주입됩니다.
- `request`, `query_params` 같은 다른 이름으로 선언하면 주입되지 않습니다.
- 둘 다 선언하지 않으면 인자 없이 호출됩니다.

## 4) query_one / query_all

### query_one

```python
res = client.query_one("muscat/program/state", timeout=0.3)
print(res.get("dict_payload"))
```

- 첫 응답 1개만 반환
- 응답이 없으면 `ZenohNoReply` 예외

### query_all

```python
res_list = client.query_all("*/health", timeout=0.5)
for item in res_list:
    print(item.get("key"), item.get("dict_payload"))
```

- timeout 내 도착한 응답을 모두 list로 반환
- 응답이 없어도 빈 리스트 반환

### 언제 무엇을 쓸지

- 단일 대상 호출: `query_one`
- 와일드카드/다중 서비스 수집: `query_all`

## FlatBuffer 파라미터 상세

### `flatbuffer_req_obj`

- 의미: 요청으로 보낼 FlatBuffer 객체(`Pack()` 가능한 객체)
- 사용 위치: `publish`, `query_one`, `query_all`
- 내부 동작: builder로 pack 후 bytes payload 전송

### `flatbuffer_buf_size`

- 의미: FlatBuffer 직렬화 builder 초기 버퍼 크기
- 사용 위치:
  - 요청 직렬화: `publish`, `query_one`, `query_all`
  - queryable 응답 직렬화: `queryable(..., flatbuffer_res_buf_size=...)`
- 너무 작으면 직렬화 실패 가능
- 권장: 64/128/256/512부터 시작해 payload 크기에 맞게 상향

### `flatbuffer_res_T_class`

- 의미: 응답 payload를 어떤 FlatBuffer 타입으로 파싱할지 지정
- 사용 위치: `query_one`, `query_all`
- 타입: **FlatBuffer Object API 클래스(T 클래스)** 를 넘겨야 함
  - 예: `Response_FunctionsT`, `State_CoreT`
  - 보통 이름이 `...T`로 끝나는 클래스
  - 내부적으로 `InitFromPackedBuf(...)`를 통해 파싱됨
- 결과:
  - 지정 시: `obj_payload`에 파싱 객체
  - 미지정 시: `dict_payload`/raw payload 중심 사용

예시:

```python
from rb_flat_buffers.IPC.Request_MotionPause import Request_MotionPauseT
from rb_flat_buffers.IPC.Response_Functions import Response_FunctionsT

res = client.query_one(
    "C500920/call_pause",
    flatbuffer_req_obj=Request_MotionPauseT(),
    flatbuffer_res_T_class=Response_FunctionsT,
    flatbuffer_buf_size=64,
    timeout=0.3,
)
obj = res.get("obj_payload")
```

## 5) ZenohRouter 사용

```python
from rb_zenoh.router import ZenohRouter

router = ZenohRouter(prefix="muscat/common")

@router.subscribe("health")
async def on_health(*, topic, obj_payload, dict_payload, attachment):
    pass

@router.queryable("echo")
async def on_echo(params=None):
    return {"ok": True}
```

라이프사이클:
- 시작 시 `await router.startup()`
- 종료 시 `await router.shutdown()`

## 예외/주의사항

- `query_one` no-reply: `ZenohNoReply`
- transport 재연결 계열: `ZenohTransportError`
- `ZenohClient`는 프로세스 단위 싱글톤이므로 불필요한 잦은 `close()` 호출은 피하는 것을 권장
