Consumer (read) protocol — plan & proposal

외부 클라이언트가 foundation Source Bus에서 한 source의 Sample 스트림을 받아오는 계약 설계안. 기존 ingest(쓰기) 계약과 in-process subscribe를 거의 대칭으로 변형 — foundation에 새로 추가할 것을 계획부터 정리했습니다.

이미 있음 — 재사용/대칭의 기준 (ingest 계약 · in-process subscribe)
새로 정의 — foundation에 추가할 read 계약

1. 한눈에: 쓰기(있음) ↔ 읽기(제안)는 대칭

Produce / 쓰기 (있음)Consume / 읽기 (제안)
핸드셰이크Connect bidi — 서버가 Describe→클라가 SourceDiscovered 선언, Open으로 시작Subscribe(SubscribeRequest) — 클라가 받을 source_id 지정 (Open의 거울)
데이터 평면StreamSamples client-streaming (클라 → 서버 push)returns (stream SubscribeEvent) server-streaming (서버 → 클라 push) — 방향만 반대
샘플 메시지Sample{ timestamp_us, sequence, oneof payload }그대로 재사용SubscribeEvent.sample = Sample
상태/에러클라가 StreamStateChanged/ClientError emit서버가 StreamStateChanged(FAILED/STOPPED)를 같은 스트림에 실어 보냄 (그대로 재사용)
버스 내부SourceSession.subscribe(source_id) 이미 존재 (in-process). 네트워크 front만 없음

핵심: 메시지(Sample/StreamStateChanged)도, 버스 기능(subscribe 팬아웃)도 이미 있습니다. 빠진 건 "server-streaming RPC 한 개 + 그걸 subscribe에 잇는 thin servicer"뿐. 새로 만드는 양이 작고, ingest의 거울이라 컨벤션도 정해져 있습니다.

2. 참고자료 — 이미 있는 것 (재사용/거울의 기준)

A. ingest 계약 (SourceIngestService)
service SourceIngestService {
  rpc Connect(stream ClientEvent) returns (stream RecorderCommand);
  rpc StreamSamples(stream SampleBatch) returns (StreamSamplesResponse);
}
// 둘 다 ingest 방향(클라→서버). 서버는 sink.
// server→client 로 Sample 흘리는 RPC는 없음.
source_ingest.proto:18–21
B. 재사용할 메시지 (그대로 씀)
message Sample {
  int64 timestamp_us = 1;
  uint64 sequence = 2;
  oneof payload { TactilePayload tactile=10; RobotPayload robot=11; MarkerPayload marker=12; }
}
message StreamStateChanged {
  string session_id=1; string source_id=2; StreamState state=3; string detail=4;
}
// StreamState: UNSPECIFIED|STARTED|STOPPED|FAILED
source_ingest.proto:104–116, 222–231
C. in-process 소비 프리미티브 (네트워크 front만 없음)
def subscribe(self, source_id: str)
    -> AsyncIterator[SourceSample | SourceEvent]
# bounded(256) per-subscriber 큐, 다중 구독 지원,
# 한 소스당 broadcast 런루프. 외부 호출 불가(같은 프로세스만).
port/source_session.py:44 · outbound/in_memory_source_session.py:162,210,237
D. 변환·어댑트 선례 (서버측 그대로 모방)
# grpc_source._stream: 이미 pb2.Sample ↔ SourceSample 변환 코드 존재
# (ingest 수신측). 읽기측은 이 매핑을 역방향으로 재사용.
# preview: subscribe() 이터레이터를 HTTP(MJPEG)로 어댑트한 선례
#   → 우리는 같은 이터레이터를 gRPC server-stream으로 어댑트.
inbound/grpc_source.py:42–131 · collect preview/camera_mjpeg_stream.py

3. 제안 — 새 read 계약 foundation에 추가

proto (additive)
// 별도 서비스 권장(ingest는 sink, read는 source — 방향이 반대).
service SourceReadService {
  rpc Subscribe(SubscribeRequest) returns (stream SubscribeEvent);
}

message SubscribeRequest {
  string source_id = 1;        // 받을 소스 (open 선행 필요 — Q4)
  string session_id = 2;       // 선택: 세션 범위 한정
}
// Q4: source가 open 안 됐으면 SourceNotFoundError(=StreamStateChanged FAILED).
//     open 트리거 방식(별도 단계 vs Subscribe-implies-open)은 합의 항목.

message SubscribeEvent {
  oneof event {
    Sample sample = 1;              // ← 재사용
    StreamStateChanged state = 2;   // ← 재사용 (FAILED/STOPPED)
  }
}
server (thin servicer, 기존 패턴 모방)
# 새 GrpcService — ingest 서버와 동일하게 register()로 붙음
class GrpcSourceReadServer(pb2_grpc.SourceReadServiceServicer, GrpcService):
  async def Subscribe(self, req, ctx):
    async for item in self._session.subscribe(req.source_id):
      if isinstance(item, SourceEvent):
        yield SubscribeEvent(state=_to_state(item)); continue
      yield SubscribeEvent(sample=_to_pb_sample(item))
# _to_pb_sample = grpc_source._stream 매핑의 역방향(SourceSample→pb2.Sample)
# register(): add_SourceReadServiceServicer_to_server(self, server)
데이터 흐름: 클라가 Subscribe(source_id) → 서버가 session.subscribe() 이터레이터를 받아 각 SourceSampleSample로 변환해 server-stream으로 yield. 에러/종료는 StreamStateChanged로. 버스 팬아웃은 이미 다중 구독 지원이라 서버 변경은 얇음.

4. SDK 쪽 (우리, foundation 이후)

class SourceConsumer:   # SourceProducer의 대칭 (이름 이미 예약됨)
  @classmethod
  def connect(loop_addr, ...) -> SourceConsumer ...
  def subscribe(source_id: str) -> Iterator[RobotFrame | TactileFrame | MarkerFrame]:
      # Subscribe RPC 열고, SubscribeEvent.sample → 도메인 frame 디코딩,
      # StreamStateChanged(FAILED) → 도메인 예외. proto_mapping 역방향 추가.

5. 설계 결정 — 코드로 검증 + 합의 결과

Q1. 서비스 분리? 결정: 분리
별도 SourceReadService. 코드 확인: SourceIngestService는 proto 주석상 "remote source clients (sensor/helper processes)" = 외부 전용이고, 내부 collect는 gRPC가 아니라 in-process subscribe만 씀(collect에 stub 없음). 따라서 SourceReadService도 동일하게 외부 경계 전용이고 내부 소비자는 계속 in-process. 방향(sink↔source)이 반대라 분리가 의미·인증·배포상 깔끔. source_ingest.proto:5–9
Q2. 인증/가시성 결정: 명시적 allow-list, foundation에 정의
기본 거부 + 토큰이 읽을 수 있는 source만 허용. SDK는 "정확히 가져갈 수 있는 것"만 받음. 정의 위치 = foundation — 이미 authn 모듈(Account·AuthenticationSession·TokenClaims·JwtTokenIssuer)이 있으니, "이 토큰이 어떤 source를 읽나"는 거기 + source 권한 정책으로. SDK는 토큰만 제시, 정책은 안 가짐. 최종 모델은 플랫폼/보안이 확정. module/foundation/.../authn/
Q3. 느린 reader 백프레셔 결정: 동일 매커니즘
in-process와 동일한 256 bounded + newest-drop + 드롭 수 노출. 실시간 "최신이 이긴다" 의미 유지, 소비자가 손실을 알 수 있게 드롭 카운트 통지(producer의 stats.dropped와 대칭). 무손실 모드는 후속.
Q4. 라이프사이클 정정: open 필요 (앞 제안 철회)
소비자도 open을 거쳐야 함 — 내부와 동일한 역할. 코드 확인: subscribe()는 source가 이미 open 상태가 아니면 SourceNotFoundError를 던짐(in_memory_source_session.py:164). 내부 소비자도 open → subscribe로 동작. 외부 SDK도 축소된 "그냥 구독"이 아니라 동일한 scan/open→subscribe 라이프사이클을 가져야 함. 구현은 (a) Subscribe 전에 별도 open 단계, 또는 (b) Subscribe가 내부적으로 open을 트리거 — 둘 중 합의. in_memory_source_session.py:162–164
Q5. 다중 소스 기존 컨벤션 있음 (새로 정의 아님)
데이터 평면은 source 하나 단위 — 이미 정해진 규약. 컨트롤 평면(OpenCommand.source_ids)은 repeated(여러 개 지정 가능)지만, 데이터 평면은 단일 source: SampleBatch.source_id(단일), subscribe(source_id)(단일). 따라서 Subscribe를 source당 하나로 두는 건 기존 컨벤션을 그대로 따르는 것 — 우리가 새로 정할 부분이 아님. 여러 source는 구독 여러 개(버스가 이미 다중구독 지원). source_ingest.proto:65 vs 218 · source_session.py:44
결정 요약: 별도 SourceReadService(외부 전용) · allow-list 인증은 foundation authn에 정의(SDK는 토큰만) · newest-drop 동일+드롭 카운트 · open 선행 필요(내부와 동일 라이프사이클) · 데이터 평면 source당 하나(기존 컨벤션). 남은 미정: Q2 정책의 구체 모델(플랫폼/보안), Q4의 open 트리거 방식(별도 단계 vs Subscribe-implies-open).

6. 계획 단계

1합의 — Q1–Q5 결정 (foundation + 우리). PR #2 요청서가 출발점.
2foundation — proto에 SourceReadService.Subscribe 추가 → 스텁 재생성 → thin servicer(session.subscribe 연결) + GrpcService 등록.
3SDK — 계약 동기화 후 SourceConsumer 클론 코딩 + proto_mapping 역방향(디코딩) + 예제/테스트(왕복 충실도).
4검증 — produce→bus→consume 왕복 E2E: 보낸 Sample == 받은 Sample(timestamp/sequence/values 일치), 중복·누락·순서 체크.

참고: module/foundation/.../source_ingest.proto, inbound/grpc_source.py(변환), outbound/in_memory_source_session.py(subscribe 팬아웃), collect preview(subscribe→HTTP 어댑트 선례). 계약은 foundation 소유 — 이 문서는 우리(SDK) 측 제안/계획이며 합의 후 foundation이 proto를 추가합니다. 상세 요청서: docs/foundation_request_consumer_rpc.md (PR #2).