외부 클라이언트가 foundation Source Bus에서 한 source의 Sample 스트림을 받아오는 계약 설계안. 기존 ingest(쓰기) 계약과 in-process subscribe를 거의 대칭으로 변형 — foundation에 새로 추가할 것을 계획부터 정리했습니다.
| Produce / 쓰기 (있음) | Consume / 읽기 (제안) | |
|---|---|---|
| 핸드셰이크 | Connect bidi — 서버가 Describe→클라가 SourceDiscovered 선언, Open으로 시작 | Subscribe(SubscribeRequest) — 클라가 받을 source_id 지정 (Open의 거울) |
| 데이터 평면 | StreamSamples client-streaming (클라 → 서버 push) | returns (stream SubscribeEvent) server-streaming (서버 → 클라 push) — 방향만 반대 |
| 샘플 메시지 | Sample{ timestamp_us, sequence, oneof payload } | 그대로 재사용 — SubscribeEvent.sample = Sample |
| 상태/에러 | 클라가 StreamStateChanged/ClientError emit | 서버가 StreamStateChanged(FAILED/STOPPED)를 같은 스트림에 실어 보냄 (그대로 재사용) |
| 버스 내부 | — | SourceSession.subscribe(source_id) 이미 존재 (in-process). 네트워크 front만 없음 |
핵심: 메시지(Sample/StreamStateChanged)도, 버스 기능(subscribe 팬아웃)도 이미 있습니다. 빠진 건 "server-streaming RPC 한 개 + 그걸 subscribe에 잇는 thin servicer"뿐. 새로 만드는 양이 작고, ingest의 거울이라 컨벤션도 정해져 있습니다.
service SourceIngestService { rpc Connect(stream ClientEvent) returns (stream RecorderCommand); rpc StreamSamples(stream SampleBatch) returns (StreamSamplesResponse); } // 둘 다 ingest 방향(클라→서버). 서버는 sink. // server→client 로 Sample 흘리는 RPC는 없음.
message Sample { int64 timestamp_us = 1; uint64 sequence = 2; oneof payload { TactilePayload tactile=10; RobotPayload robot=11; MarkerPayload marker=12; } } message StreamStateChanged { string session_id=1; string source_id=2; StreamState state=3; string detail=4; } // StreamState: UNSPECIFIED|STARTED|STOPPED|FAILED
def subscribe(self, source_id: str) -> AsyncIterator[SourceSample | SourceEvent] # bounded(256) per-subscriber 큐, 다중 구독 지원, # 한 소스당 broadcast 런루프. 외부 호출 불가(같은 프로세스만).
# grpc_source._stream: 이미 pb2.Sample ↔ SourceSample 변환 코드 존재 # (ingest 수신측). 읽기측은 이 매핑을 역방향으로 재사용. # preview: subscribe() 이터레이터를 HTTP(MJPEG)로 어댑트한 선례 # → 우리는 같은 이터레이터를 gRPC server-stream으로 어댑트.
// 별도 서비스 권장(ingest는 sink, read는 source — 방향이 반대). service SourceReadService { rpc Subscribe(SubscribeRequest) returns (stream SubscribeEvent); } message SubscribeRequest { string source_id = 1; // 받을 소스 (open 선행 필요 — Q4) string session_id = 2; // 선택: 세션 범위 한정 } // Q4: source가 open 안 됐으면 SourceNotFoundError(=StreamStateChanged FAILED). // open 트리거 방식(별도 단계 vs Subscribe-implies-open)은 합의 항목. message SubscribeEvent { oneof event { Sample sample = 1; // ← 재사용 StreamStateChanged state = 2; // ← 재사용 (FAILED/STOPPED) } }
# 새 GrpcService — ingest 서버와 동일하게 register()로 붙음 class GrpcSourceReadServer(pb2_grpc.SourceReadServiceServicer, GrpcService): async def Subscribe(self, req, ctx): async for item in self._session.subscribe(req.source_id): if isinstance(item, SourceEvent): yield SubscribeEvent(state=_to_state(item)); continue yield SubscribeEvent(sample=_to_pb_sample(item)) # _to_pb_sample = grpc_source._stream 매핑의 역방향(SourceSample→pb2.Sample) # register(): add_SourceReadServiceServicer_to_server(self, server)
Subscribe(source_id) → 서버가 session.subscribe() 이터레이터를 받아 각 SourceSample을 Sample로 변환해 server-stream으로 yield. 에러/종료는 StreamStateChanged로. 버스 팬아웃은 이미 다중 구독 지원이라 서버 변경은 얇음.class SourceConsumer: # SourceProducer의 대칭 (이름 이미 예약됨) @classmethod def connect(loop_addr, ...) -> SourceConsumer ... def subscribe(source_id: str) -> Iterator[RobotFrame | TactileFrame | MarkerFrame]: # Subscribe RPC 열고, SubscribeEvent.sample → 도메인 frame 디코딩, # StreamStateChanged(FAILED) → 도메인 예외. proto_mapping 역방향 추가.
SourceReadService. 코드 확인: SourceIngestService는 proto 주석상 "remote source clients (sensor/helper processes)" = 외부 전용이고, 내부 collect는 gRPC가 아니라 in-process subscribe만 씀(collect에 stub 없음). 따라서 SourceReadService도 동일하게 외부 경계 전용이고 내부 소비자는 계속 in-process. 방향(sink↔source)이 반대라 분리가 의미·인증·배포상 깔끔. source_ingest.proto:5–9authn 모듈(Account·AuthenticationSession·TokenClaims·JwtTokenIssuer)이 있으니, "이 토큰이 어떤 source를 읽나"는 거기 + source 권한 정책으로. SDK는 토큰만 제시, 정책은 안 가짐. 최종 모델은 플랫폼/보안이 확정. module/foundation/.../authn/subscribe()는 source가 이미 open 상태가 아니면 SourceNotFoundError를 던짐(in_memory_source_session.py:164). 내부 소비자도 open → subscribe로 동작. 외부 SDK도 축소된 "그냥 구독"이 아니라 동일한 scan/open→subscribe 라이프사이클을 가져야 함. 구현은 (a) Subscribe 전에 별도 open 단계, 또는 (b) Subscribe가 내부적으로 open을 트리거 — 둘 중 합의. in_memory_source_session.py:162–164OpenCommand.source_ids)은 repeated(여러 개 지정 가능)지만, 데이터 평면은 단일 source: SampleBatch.source_id(단일), subscribe(source_id)(단일). 따라서 Subscribe를 source당 하나로 두는 건 기존 컨벤션을 그대로 따르는 것 — 우리가 새로 정할 부분이 아님. 여러 source는 구독 여러 개(버스가 이미 다중구독 지원). source_ingest.proto:65 vs 218 · source_session.py:44SourceReadService(외부 전용) · allow-list 인증은 foundation authn에 정의(SDK는 토큰만) · newest-drop 동일+드롭 카운트 · open 선행 필요(내부와 동일 라이프사이클) · 데이터 평면 source당 하나(기존 컨벤션). 남은 미정: Q2 정책의 구체 모델(플랫폼/보안), Q4의 open 트리거 방식(별도 단계 vs Subscribe-implies-open).SourceReadService.Subscribe 추가 → 스텁 재생성 → thin servicer(session.subscribe 연결) + GrpcService 등록.SourceConsumer 클론 코딩 + proto_mapping 역방향(디코딩) + 예제/테스트(왕복 충실도).Sample == 받은 Sample(timestamp/sequence/values 일치), 중복·누락·순서 체크.참고: module/foundation/.../source_ingest.proto, inbound/grpc_source.py(변환), outbound/in_memory_source_session.py(subscribe 팬아웃), collect preview(subscribe→HTTP 어댑트 선례). 계약은 foundation 소유 — 이 문서는 우리(SDK) 측 제안/계획이며 합의 후 foundation이 proto를 추가합니다. 상세 요청서: docs/foundation_request_consumer_rpc.md (PR #2).