How the SDK's data registers into loop's Source Bus (module/foundation) and how a consumer/recorder reads it. Two gRPC calls on one contract: SourceIngestService.Connect (control) and StreamSamples (data).
SDK client: your process. Holds the schema, answers Describe, pushes samples.
Source Bus: GrpcSourceServer + InMemorySourceSession. Registers connectors, fans out samples.
Consumer/recorder: subscribes to a source, persists to Parquet/S3.
module/foundation/proto/…/source_ingest.proto, which is byte-identical to the SDK's proto (verified diff). session_id · source_id · timestamp_us · sequence · values are not invented: they are the existing contract the SDK was built against.

name/unit/dtype/rotation_type/range. Today the code carries only RobotJointDescriptor{key,label} + RobotPayload{values}. Marked proposed below.

- RobotLayout.groups[]: named modality slices (left_leg, eef, …). Today RobotLayout is a flat joints[] only.
- group.role ∈ {measured, commanded, teleop_input}
- RobotJointDescriptor.unit
- RobotJointDescriptor.dtype
- RobotJointDescriptor.rotation_type (none/quaternion/euler/rotation_6d)
- RobotJointDescriptor.range / limits
- robot_class · instance_id · teleop_method · control_hz · planner_hz · floating_base · component_versions: SourceDiscovered/RobotLayout has none of these today

- RobotJointDescriptor.key + .label: already one per dimension. Per-scalar naming is not something we add; joints[i] already names value i. We only need to use it well.
- RobotPayload.values: the flat vector
- source_id · session_id · timestamp_us · sequence
- source_id; values = concat of all groups; len(values) == len(joints)

| field (existing) | intended purpose | our usage | verdict |
|---|---|---|---|
| source_id | identify a source; data-plane routing key | one robot unit = one source_id (dual-arm concatenated). Server routes batches purely by source_id (grpc_source_server.py:83) | ✅ verified correct |
| session_id | server-assigned at Open, echoed on the data plane | SDK records it from OpenCommand (grpc_source_client.py:251), echoes it in every SampleBatch | ✅ verified correct |
| RobotPayload.values | the robot payload vector | concat; length validated vs declared joints (teleop_session.py:62–66); server reads tuple(sample.robot.values) (grpc_source.py:55) | ✅ verified correct |
| RobotJointDescriptor.key/label | name each ordered value dimension | one descriptor per scalar; mapped 1:1 both ways (proto_mapping.py:35, grpc_source_connector.py:130) | ✅ verified — already per-dim names |
| timestamp_us | per-sample time (µs) | server is pure pass-through, no ordering/sync (grpc_source.py:124). proto fixes no clock domain; Heartbeat.monotonic_us is the only "monotonic" name, so by contrast this = wall-clock. Prior stack used wall-clock. | ⚠️ no bug, but UNDOCUMENTED — must specify wall-clock epoch µs |
| sequence | per-sample ordering (uint64) | server is pass-through; no dedup/ordering/gap detection (grpc_source.py:125, grpc_connection.py:146). caller-provided. | ⚠️ informational only — spec "monotonic per source, gap = loss" |
| StreamStateChanged | per-source STARTED / STOPPED / FAILED | server consumes it: FAILED→SourceEvent(FAILED), STOPPED→clean EOF, STARTED ignored (grpc_connection.py:115–123). SDK doesn't emit it directly — per-source FAILED is reported via ClientError (below); clean stop = ending the stream. | ✅ covered via ClientError + stream-end |
| ClientError{code,message,source_id} | per-source / scan error reporting | server consumes it → synthetic per-source FAILED (grpc_connection.py:125–135). SDK now emits it via SourceProducer.report_source_error() → grpc_source_client.report_error()/_error_event(). | ✅ implemented |
| SourceDiscovered.path | device path / locator | server maps it to informational SourceInfo.path only (grpc_source_connector.py:104); SDK leaves it empty | ✅ verified — informational, empty is fine |
| RecorderCommand.close (Close) | server asks client to stop streaming named sources for a session | SDK _handle_command handles describe/open/shutdown but not close — Close is silently ignored (grpc_source_client.py:222–233) | ⚠️ BUG — Close unhandled; only full Shutdown stops the SDK |
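
The timestamp_us and sequence rows above leave both semantics to the caller, since the server passes them through untouched. The sketch below shows what a consumer-side check could look like if the suggested spec (wall-clock epoch µs; sequence monotonic per source, gap = loss) were adopted. `now_epoch_us` and `SequenceTracker` are hypothetical helpers for illustration; they are not part of the SDK or the server.

```python
import time
from dataclasses import dataclass, field


def now_epoch_us() -> int:
    """Wall-clock time as integer microseconds since the Unix epoch."""
    return time.time_ns() // 1_000


@dataclass
class SequenceTracker:
    """Hypothetical per-source gap counter (the server performs no such check)."""

    last_seen: dict[str, int] = field(default_factory=dict)
    lost: dict[str, int] = field(default_factory=dict)

    def observe(self, source_id: str, sequence: int) -> int:
        """Record one sample; return how many samples were skipped before it."""
        previous = self.last_seen.get(source_id)
        if previous is not None and sequence <= previous:
            # Duplicate or reordered sample: not counted as loss in this sketch.
            return 0
        gap = 0 if previous is None else sequence - previous - 1
        self.last_seen[source_id] = sequence
        self.lost[source_id] = self.lost.get(source_id, 0) + gap
        return gap
```

Because sequence is caller-provided, a tracker like this is only meaningful once the producer commits to incrementing it by exactly one per sample per source.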
1. Connect and announces itself. The first ready makes the server create a GrpcConnection + a connection-scoped GrpcSourceConnector. (grpc_source_server.py:54–62, 96–102)
2. _connectors map. This is the SDK client becoming a registered source provider on the bus. (source_service.py:27 · in_memory_source_session.py:70–74)
3. get_info() on every registered connector concurrently. (source_service.py:30 · in_memory_source_session.py:76–102)
4. Describe down the Connect stream and waits (scan timeout, default 5 s) for the answer. (grpc_source_connector.py:70–86)
5. SourceDiscovered per source carrying the layout. Proposed: the layout becomes group-structured with per-dim name/unit/rotation/range; today it is only joints[{key,label}]. (SourceDiscovered/RobotLayout/RobotJointDescriptor)
6. _source_owner[source_id] · sources known to bus · SourceInfo and its owning connector recorded. The bus now knows which sources exist and who serves them. (grpc_source_connector.py:103–136 · in_memory_source_session.py:96–101)
7. GrpcRobotSource, registers source_id → connection in the data-plane routing map, and tells the SDK to start streaming. session_id is assigned here, server-side. (grpc_source_connector.py:88–96 · grpc_source_server.py:136–138)
8. values[] is the flat vector aligned to the layout from step 5/6. Every envelope field here is the agreed contract. (SampleBatch/Sample/RobotPayload)
9. source_id to the owning connection. (grpc_source_server.py:73–90)
10. Sample to a domain SourceSample and fans it out to every subscriber's bounded queue (256, latest-drop under backpressure). (in_memory_source_session.py:173–206)
11. /sources/scan & /sources; open/subscribe are service-level, driven by the capture orchestration.) (source_service.py:39 · in_memory_source_session.py:127–156 · source_http.py:43–65)
12. StreamSamples call (one data stream per session, multiplexing all sources by source_id). (grpc_source_server.py:90)

Every field (session_id, source_id, timestamp_us, sequence, values) and every message (ClientReady, Describe, SourceDiscovered, ScanCompleted, OpenCommand, RobotLayout, RobotJointDescriptor, RobotPayload, SampleBatch, Sample, StreamSamplesResponse) is defined in the committed proto, and the SDK's proto is byte-identical to it. The only thing we are proposing to add is the richer layout descriptor (groups + per-dim name/unit/dtype/rotation_type/range), which is not in the proto yet.
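
To make the proposal concrete, the richer layout could be modelled roughly as below. This is a sketch only: `JointSpec`, `Group`, `Layout` and `validate_payload` are hypothetical Python names, and the fields unit, dtype, rotation_type, range, groups and role follow the proposed descriptor, none of which exists in the proto today. It also illustrates the invariant that values is the concatenation of all groups, so its length must equal the number of declared dimensions.

```python
from dataclasses import dataclass
from typing import Optional, Sequence

ROLES = {"measured", "commanded", "teleop_input"}
ROTATION_TYPES = {"none", "quaternion", "euler", "rotation_6d"}


@dataclass(frozen=True)
class JointSpec:
    """One scalar dimension: existing key/label plus the proposed metadata."""

    key: str
    label: str
    unit: str
    dtype: str
    rotation_type: str = "none"
    range: Optional[tuple[float, float]] = None  # (min, max) limits, if known

    def __post_init__(self) -> None:
        if self.rotation_type not in ROTATION_TYPES:
            raise ValueError(f"unknown rotation_type: {self.rotation_type!r}")


@dataclass(frozen=True)
class Group:
    """A named modality slice such as left_leg or eef."""

    name: str
    role: str
    joints: tuple[JointSpec, ...]

    def __post_init__(self) -> None:
        if self.role not in ROLES:
            raise ValueError(f"unknown role: {self.role!r}")


@dataclass(frozen=True)
class Layout:
    groups: tuple[Group, ...]

    def flat_keys(self) -> list[str]:
        """Per-dimension keys in wire order, i.e. what joints[] carries today."""
        return [joint.key for group in self.groups for joint in group.joints]

    def slices(self) -> dict[str, slice]:
        """Map each group name to its slice of the flat values vector."""
        result: dict[str, slice] = {}
        start = 0
        for group in self.groups:
            stop = start + len(group.joints)
            result[group.name] = slice(start, stop)
            start = stop
        return result


def validate_payload(layout: Layout, values: Sequence[float]) -> None:
    """Enforce len(values) == number of declared dimensions."""
    expected = len(layout.flat_keys())
    if len(values) != expected:
        raise ValueError(f"expected {expected} values, got {len(values)}")
```

A consumer could then recover one group from a sample with `values[layout.slices()["eef"]]`, while the wire format stays a single flat vector.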
Sources (all real, in this repo): module/foundation/proto/loop/foundation/source/v1/source_ingest.proto,
…/source/inbound/grpc_source_server.py, …/grpc_source_connector.py,
…/service/source_service.py, …/outbound/in_memory_source_session.py,
…/inbound/source_http.py, …/domain/robot.py.