Source ingest — control plane registration & data plane streaming

How the SDK's data registers into loop's Source Bus (module/foundation) and how a consumer/recorder reads it. Two gRPC calls on one contract: SourceIngestService.Connect (control) and StreamSamples (data).

Teleop SDK · client

Your process. Holds the schema, answers Describe, pushes samples.

── gRPC ──▶

Foundation Source Bus · server

GrpcSourceServer + InMemorySourceSession. Registers connectors, fans out samples.

── subscribe() ──▶

Recorder / capture · consumer

Subscribes to a source, persists to parquet/S3.

Agreed contract — every field/message below is from the real proto module/foundation/proto/…/source_ingest.proto, which is byte-identical to the SDK's proto (verified diff). session_id · source_id · timestamp_us · sequence · values are not invented — they are the existing contract the SDK was built against.
Proposed extension (§A2, not in proto yet) — the group structure + per-dim name/unit/dtype/rotation_type/range. Today the code carries only RobotJointDescriptor{key,label} + RobotPayload{values}. Marked proposed below.

What we are adding — exactly

🟠 NEW — proposed, not in the proto today
  • RobotLayout.groups[] — named modality slices (left_leg, eef, …). Today RobotLayout is a flat joints[] only.
  • group.role ∈ {measured, commanded, teleop_input}
  • RobotJointDescriptor.unit
  • RobotJointDescriptor.dtype
  • RobotJointDescriptor.rotation_type (none/quaternion/euler/rotation_6d)
  • RobotJointDescriptor.range / limits
  • source-level: robot_class · instance_id · teleop_method · control_hz · planner_hz · floating_base · component_versions. SourceDiscovered/RobotLayout has none of these today.
🟢 EXISTING — already in the proto, reused (NOT new)
  • RobotJointDescriptor.key + .label: already one per dimension. Per-scalar naming is not something we add; joints[i] already names value i. We only need to use it well.
  • RobotPayload.values — the flat vector
  • envelope: source_id · session_id · timestamp_us · sequence
Convention (usage rule, not a field)
  • one robot unit = one source_id; values = concat of all groups; len(values) == len(joints)
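
The convention above can be sketched as follows. This is a minimal illustration, not SDK code: JointDescriptor is a stand-in for RobotJointDescriptor{key,label}, and flatten_groups, check_aligned, and the "group.dim" key format are hypothetical names chosen for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JointDescriptor:
    """Stand-in for RobotJointDescriptor{key, label}; not the generated proto class."""
    key: str
    label: str


def flatten_groups(groups: dict) -> tuple:
    """Concatenate named groups, in insertion order, into (joints, values).

    groups maps group name -> {dimension name -> value}.
    The "group.dim" key format is an assumption made for this sketch.
    """
    joints = []
    values = []
    for group_name, dims in groups.items():
        for dim_name, value in dims.items():
            joints.append(JointDescriptor(key=f"{group_name}.{dim_name}", label=dim_name))
            values.append(float(value))
    return joints, values


def check_aligned(joints: list, values: list) -> None:
    """Enforce the usage rule len(values) == len(joints)."""
    if len(values) != len(joints):
        raise ValueError(f"expected {len(joints)} values, got {len(values)}")
```

Because joints[i] names values[i], a consumer can recover any group slice from the descriptors alone, as long as the producer keeps the concatenation order stable for the whole session.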

Usage check — are we using the existing contract for its purpose? (all rows verified against server + SDK code)

field (existing) | intended purpose | our usage | verdict
source_id | identify a source; data-plane routing key | one robot unit = one source_id (dual-arm concatenated). Server routes batches purely by source_id (grpc_source_server.py:83) | ✅ verified correct
session_id | server-assigned at Open, echoed on the data plane | SDK records it from OpenCommand (grpc_source_client.py:251), echoes it in every SampleBatch | ✅ verified correct
RobotPayload.values | the robot payload vector | concat; length validated vs declared joints (teleop_session.py:62–66); server reads tuple(sample.robot.values) (grpc_source.py:55) | ✅ verified correct
RobotJointDescriptor.key/label | name each ordered value dimension | one descriptor per scalar; mapped 1:1 both ways (proto_mapping.py:35, grpc_source_connector.py:130) | ✅ verified: already per-dim names
timestamp_us | per-sample time (µs) | server is pure pass-through, no ordering/sync (grpc_source.py:124). The proto fixes no clock domain; Heartbeat.monotonic_us is the only "monotonic" name, so by contrast this = wall-clock. Prior stack used wall-clock. | ⚠️ no bug, but UNDOCUMENTED: must specify wall-clock epoch µs
sequence | per-sample ordering (uint64) | server is pass-through; no dedup/ordering/gap detection (grpc_source.py:125, grpc_connection.py:146). Caller-provided. | ⚠️ informational only: spec "monotonic per source, gap = loss"
StreamStateChanged | per-source STARTED / STOPPED / FAILED | server consumes it: FAILED→SourceEvent(FAILED), STOPPED→clean EOF, STARTED ignored (grpc_connection.py:115–123). SDK doesn't emit it directly: per-source FAILED is reported via ClientError (below); clean stop = ending the stream. | ✅ covered via ClientError + stream-end
ClientError{code,message,source_id} | per-source / scan error reporting | server consumes it → synthetic per-source FAILED (grpc_connection.py:125–135). SDK now emits it via SourceProducer.report_source_error() → grpc_source_client.report_error()/_error_event(). | ✅ implemented
SourceDiscovered.path | device path / locator | server maps it to informational SourceInfo.path only (grpc_source_connector.py:104); SDK leaves it empty | ✅ verified: informational, empty is fine
RecorderCommand.close (Close) | server asks client to stop streaming named sources for a session | SDK _handle_command handles describe/open/shutdown but not close, so Close is silently ignored (grpc_source_client.py:222–233) | ⚠️ BUG: Close unhandled; only full Shutdown stops the SDK
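
The sequence row proposes specifying "monotonic per source, gap = loss". Since the server passes sequence through untouched, any gap check would live in the consumer. A minimal sketch of such a check, assuming sequences increase by exactly 1 per sample within one source_id; SequenceGapTracker is a hypothetical helper, not part of the SDK or the server.

```python
class SequenceGapTracker:
    """Tracks the last sequence seen per source_id and reports skipped samples.

    Assumption for this sketch: within one source, sequence increases by
    exactly 1 per sample, so a jump of k means k - 1 samples were lost.
    """

    def __init__(self) -> None:
        self._last = {}  # source_id -> last sequence observed

    def observe(self, source_id: str, sequence: int) -> int:
        """Return the number of samples missing before this one (0 if none)."""
        last = self._last.get(source_id)
        if last is not None and sequence <= last:
            raise ValueError(
                f"{source_id}: sequence {sequence} is not greater than previous {last}"
            )
        self._last[source_id] = sequence
        return 0 if last is None else sequence - last - 1
```

Tracking is keyed by source_id because one data stream multiplexes all sources, so a single global counter would report false gaps.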
control plane

Connect — bidirectional stream

registers the client & discovers its sources, once
1
SDK ▶ Bus · ClientReady
ClientEvent{ ClientReady{ version } }
SDK opens Connect and announces itself. The first ready makes the server create a GrpcConnection + a connection-scoped GrpcSourceConnector.
grpc_source_server.py:54–62, 96–102
2
Bus (internal) · register
source_service.register(connector) → session.register() · registered into bus
The connector is stored in the session's _connectors map. This is the SDK client becoming a registered source provider on the bus.
source_service.py:27 · in_memory_source_session.py:70–74
3
consumer ▶ Bus · scan
source_service.scan() → fan-out connector.get_info()
A consumer asks the bus to enumerate sources; the session calls get_info() on every registered connector concurrently.
source_service.py:30 · in_memory_source_session.py:76–102
4
Bus ▶ SDK · Describe
RecorderCommand{ Describe{} }
The connector sends Describe down the Connect stream and waits (scan timeout, default 5 s) for the answer.
grpc_source_connector.py:70–86
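
Steps 3 and 4 together amount to a concurrent fan-out with a per-connector timeout. A minimal sketch with asyncio follows. FakeConnector and this scan function are hypothetical stand-ins, and returning an empty result on timeout is an assumption; the source does not say what the real session does when a connector fails to answer in time.

```python
import asyncio


class FakeConnector:
    """Hypothetical stand-in for a registered connector."""

    def __init__(self, sources: list, delay_s: float) -> None:
        self._sources = sources
        self._delay_s = delay_s

    async def get_info(self) -> list:
        # Pretend to send Describe and wait for SourceDiscovered + ScanCompleted.
        await asyncio.sleep(self._delay_s)
        return self._sources


async def scan(connectors: list, timeout_s: float = 5.0) -> list:
    """Call get_info() on every connector concurrently.

    A connector that exceeds timeout_s contributes no sources
    (an assumption made for this sketch).
    """

    async def one(connector) -> list:
        try:
            return await asyncio.wait_for(connector.get_info(), timeout=timeout_s)
        except asyncio.TimeoutError:
            return []

    results = await asyncio.gather(*(one(c) for c in connectors))
    # gather() preserves input order, so results line up with connectors.
    return [source_id for sources in results for source_id in sources]
```

With this shape, one slow client delays the scan by at most timeout_s rather than by the sum of all client delays.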
5
SDK ▶ Bus · SourceDiscovered
ClientEvent{ SourceDiscovered{ source_id, name, RobotLayout{ joints[] } } } ×N + ScanCompleted
One SourceDiscovered per source carrying the layout. Proposed: the layout becomes group-structured with per-dim name/unit/dtype/rotation_type/range; today it is only joints[{key,label}].
proto SourceDiscovered/RobotLayout/RobotJointDescriptor
6
Bus (internal) · map + own
SourceDiscovered → RobotSourceInfo(RobotStateLayout); record _source_owner[source_id] · sources known to bus
Each discovered source is mapped to a domain SourceInfo and its owning connector recorded. The bus now knows which sources exist and who serves them.
grpc_source_connector.py:103–136 · in_memory_source_session.py:96–101
7
consumer ▶ Bus ▶ SDK · open
open(source_id) → RecorderCommand{ OpenCommand{ session_id, source_ids, RobotOpenParams } }
Opening builds a gRPC-backed GrpcRobotSource, registers source_id → connection in the data-plane routing map, and tells the SDK to start streaming. session_id is assigned here, server-side.
grpc_source_connector.py:88–96 · grpc_source_server.py:136–138
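
On the SDK side, commands arriving on the Connect stream have to be dispatched by kind. The usage table notes that Close is currently not handled, so the sketch below shows one way a dispatcher could cover it. SdkCommandHandler and its fields are hypothetical, the command kinds are plain strings rather than the generated RecorderCommand oneof, and the assumption that Close carries a session_id and a list of source_ids comes from its stated purpose, not from the proto text.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SdkCommandHandler:
    """Hypothetical SDK-side dispatcher for RecorderCommand variants."""

    session_id: Optional[str] = None
    streaming: set = field(default_factory=set)  # source_ids currently streamed
    running: bool = True

    def handle(self, kind: str, session_id: str = "", source_ids: tuple = ()) -> str:
        if kind == "describe":
            return "reply: SourceDiscovered xN + ScanCompleted"
        if kind == "open":
            # session_id is assigned server-side; the SDK only records and echoes it.
            self.session_id = session_id
            self.streaming.update(source_ids)
            return "streaming"
        if kind == "close":
            # Stop only the named sources, and only for the matching session.
            if session_id == self.session_id:
                self.streaming.difference_update(source_ids)
            return "closed"
        if kind == "shutdown":
            self.streaming.clear()
            self.running = False
            return "shutdown"
        raise ValueError(f"unknown command kind: {kind}")
```

Raising on an unknown kind, instead of ignoring it, would have surfaced the missing Close branch as an error rather than a silent no-op.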
data plane

StreamSamples — client-streaming

per tick, per source, for the whole session
8
SDK ▶ Bus · SampleBatch
SampleBatch{ session_id, source_id, samples[ Sample{ timestamp_us, sequence, RobotPayload{ values[] } } ] }
The SDK streams batches. values[] is the flat vector aligned to the layout from step 5/6. Every envelope field here is the agreed contract.
proto SampleBatch/Sample/RobotPayload
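
The envelope can be illustrated with plain dataclasses that mirror the field names in step 8. This is a sketch, not the generated proto classes: BatchBuilder is a hypothetical helper, RobotPayload is collapsed into a values tuple for brevity, and two choices are assumptions taken from the usage table's recommendations rather than from the proto: timestamp_us defaults to wall-clock epoch microseconds, and sequence starts at 0 and increases by 1 per sample.

```python
import time
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class Sample:
    timestamp_us: int
    sequence: int
    values: tuple  # flat vector, aligned with the declared joints[]


@dataclass(frozen=True)
class SampleBatch:
    session_id: str
    source_id: str
    samples: tuple


class BatchBuilder:
    """Hypothetical helper that accumulates samples for one source."""

    def __init__(self, session_id: str, source_id: str, n_dims: int) -> None:
        self._session_id = session_id
        self._source_id = source_id
        self._n_dims = n_dims
        self._next_sequence = 0
        self._pending = []

    def add(self, values: Sequence[float], timestamp_us: Optional[int] = None) -> None:
        if len(values) != self._n_dims:
            raise ValueError(f"expected {self._n_dims} values, got {len(values)}")
        if timestamp_us is None:
            # Assumption: wall-clock microseconds since the Unix epoch.
            timestamp_us = time.time_ns() // 1_000
        self._pending.append(Sample(timestamp_us, self._next_sequence, tuple(values)))
        self._next_sequence += 1

    def flush(self) -> SampleBatch:
        """Return the pending samples as one batch and start a new one."""
        batch = SampleBatch(self._session_id, self._source_id, tuple(self._pending))
        self._pending = []
        return batch
```

The sequence counter lives on the builder, not on the batch, so it keeps increasing across flushes for the lifetime of the source.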
9
Bus (internal) · route by source_id
_connections_by_source[source_id].submit_batch(batch)
The data plane carries no client id, so the server routes each batch purely by source_id to the owning connection.
grpc_source_server.py:73–90
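
Routing by source_id reduces to a dictionary lookup. A minimal sketch, assuming batches are plain dicts and that a batch for an unopened source is rejected; Router and Connection are hypothetical stand-ins, and the real server's behavior for an unknown source_id is not described in this document.

```python
class Connection:
    """Hypothetical stand-in for the per-client connection object."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.batches = []

    def submit_batch(self, batch: dict) -> None:
        self.batches.append(batch)


class Router:
    """Routes each batch to the connection that owns its source_id."""

    def __init__(self) -> None:
        self._connections_by_source = {}  # source_id -> Connection

    def register(self, source_id: str, connection: Connection) -> None:
        # Populated when a source is opened (step 7).
        self._connections_by_source[source_id] = connection

    def route(self, batch: dict) -> bool:
        """Deliver the batch; return False if no connection owns the source."""
        connection = self._connections_by_source.get(batch["source_id"])
        if connection is None:
            return False  # assumption: unknown sources are rejected
        connection.submit_batch(batch)
        return True
```

Since the map is keyed by source_id alone, two clients that announce the same source_id would collide, which is one reason the "one robot unit = one source_id" convention matters.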
10
Bus (internal) · convert + broadcast
GrpcRobotSource iterates batches → SourceSample → _broadcast() to subscribers
The opened source's run loop converts each Sample to a domain SourceSample and fans it out to every subscriber's bounded queue (256, latest-drop under backpressure).
in_memory_source_session.py:173–206
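
The fan-out to bounded queues can be sketched with asyncio.Queue. "Latest-drop" is read here as "when a subscriber's queue is full, the incoming (latest) sample is discarded for that subscriber"; that reading is an assumption, and a drop-oldest policy would need a get_nowait() before the put. Broadcaster is a hypothetical name, not the session class.

```python
import asyncio

QUEUE_SIZE = 256  # bound stated in the text


class Broadcaster:
    """Fans samples out to per-subscriber bounded queues without blocking."""

    def __init__(self, maxsize: int = QUEUE_SIZE) -> None:
        self._maxsize = maxsize
        self._subscribers = []

    def subscribe(self) -> asyncio.Queue:
        queue = asyncio.Queue(maxsize=self._maxsize)
        self._subscribers.append(queue)
        return queue

    def broadcast(self, sample) -> int:
        """Offer the sample to every subscriber; return how many dropped it."""
        dropped = 0
        for queue in self._subscribers:
            try:
                queue.put_nowait(sample)
            except asyncio.QueueFull:
                # Assumed policy: discard the incoming sample for this subscriber.
                dropped += 1
        return dropped
```

Using put_nowait() keeps one slow consumer from stalling the source's run loop or the other subscribers; the cost is that a slow consumer sees gaps.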
11
consumer ◀ Bus · subscribe
subscribe(source_id) → async stream of SourceSample | SourceEvent → persist (parquet/S3)
The recorder/capture consumer drains the subscription and writes episodes. (HTTP today exposes read-only /sources/scan & /sources; open/subscribe are service-level, driven by the capture orchestration.)
source_service.py:39 · in_memory_source_session.py:127–156 · source_http.py:43–65
12
Bus ▶ SDK · ack
StreamSamplesResponse{ session_id, accepted_samples }
One terminal response per StreamSamples call (one data stream per session, multiplexing all sources by source_id).
grpc_source_server.py:90
Answering "are these names pre-agreed or newly made up?" — Pre-agreed. The full vocabulary (session_id, source_id, timestamp_us, sequence, values and every message: ClientReady, Describe, SourceDiscovered, ScanCompleted, OpenCommand, RobotLayout, RobotJointDescriptor, RobotPayload, SampleBatch, Sample, StreamSamplesResponse) is defined in the committed proto and the SDK's proto is byte-identical to it. The only thing we are proposing to add is the richer layout descriptor (groups + per-dim name/unit/dtype/rotation_type/range), which is not in the proto yet.

Sources (all real, in this repo): module/foundation/proto/loop/foundation/source/v1/source_ingest.proto, …/source/inbound/grpc_source_server.py, …/grpc_source_connector.py, …/service/source_service.py, …/outbound/in_memory_source_session.py, …/inbound/source_http.py, …/domain/robot.py.