Coverage for little_loops / transport.py: 88%
356 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:16 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:16 -0500
1"""Transport abstraction for the little-loops EventBus.
3A `Transport` is an additive sink for events emitted by `EventBus`. The Protocol
4is intentionally minimal — `send(event)` for delivery and `close()` for cleanup —
5so that new sinks can be added without modifying `EventBus` itself.
7Built-in implementations:
8 JsonlTransport: appends each event as a JSON line to a file.
9 UnixSocketTransport: streams newline-delimited JSON to AF_UNIX socket clients
10 for sub-second-latency local consumers (TUIs, log tailers, dashboards).
11 OTelTransport: maps loop executions to OpenTelemetry traces/spans, exporting
12 via OTLP to Grafana, Jaeger, Datadog, etc. Requires the optional
13 ``opentelemetry-sdk`` and ``opentelemetry-exporter-otlp-grpc`` packages.
14 WebhookTransport: POSTs batched events to an HTTP endpoint for remote
15 dashboards, Slack bots, and CI systems. Requires the optional ``httpx``
16 package (``pip install little-loops[webhooks]``).
17 SQLiteTransport: records FSM loop events into the per-project session
18 database (``.ll/session.db``) for indexed cross-cutting queries.
20Public exports:
21 Transport: runtime-checkable Protocol that any sink must satisfy
22 JsonlTransport: writes events to a JSONL file
23 UnixSocketTransport: streams events over an AF_UNIX socket
24 OTelTransport: exports loop traces via OTLP
25 WebhookTransport: POSTs batched events to an HTTP endpoint
26 wire_transports: register transports listed in `EventsConfig` on an `EventBus`
27"""
29from __future__ import annotations
31import json
32import logging
33import socket
34import threading
35import time
36from collections.abc import Callable
37from pathlib import Path
38from queue import Empty, Full, Queue
39from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
41if TYPE_CHECKING:
42 from little_loops.config.features import EventsConfig
43 from little_loops.events import EventBus
logger = logging.getLogger(__name__)

# --- UnixSocketTransport tuning ---------------------------------------------
# Capacity of each client's outbound queue; once full, new events are dropped.
_CLIENT_QUEUE_MAXSIZE = 1024
# Minimum spacing between repeated "dropped N events" warnings for one client.
_DROP_LOG_INTERVAL_SEC = 5.0
# Per-thread join budgets used by close(); each is capped by the total budget.
_ACCEPT_THREAD_JOIN_TIMEOUT = 2.0
_CLIENT_THREAD_JOIN_TIMEOUT = 1.0
_CLOSE_TOTAL_TIMEOUT = 10.0
# How long accept() / queue.get() block before re-checking the shutdown flag.
_ACCEPT_POLL_TIMEOUT = 1.0
_CLIENT_QUEUE_POLL_TIMEOUT = 0.5

# --- WebhookTransport tuning ------------------------------------------------
_WEBHOOK_BATCH_MS_DEFAULT = 1000  # default flush interval, in milliseconds
_WEBHOOK_CLOSE_TIMEOUT = 10.0  # seconds close() waits for the batch thread
_WEBHOOK_RETRY_BASE_S = 0.5  # first retry delay; doubled after each failure
_WEBHOOK_RETRY_MAX_S = 8.0  # upper bound for the retry delay
@runtime_checkable
class Transport(Protocol):
    """Structural interface for an event sink registered on an `EventBus`.

    Every event emitted on the bus is handed to the transport; no filtering
    happens at the transport layer (subscribe an observer with a filter for
    that). The bus does not validate event contents, so implementations must
    cope with arbitrary `dict[str, Any]` shapes.

    Because the Protocol is `runtime_checkable`, ``isinstance(obj, Transport)``
    succeeds for any object exposing both ``send`` and ``close``.
    """

    def send(self, event: dict[str, Any]) -> None:
        """Deliver one event to the sink."""
        ...

    def close(self) -> None:
        """Release resources held by the transport; doing nothing is allowed."""
        ...
class JsonlTransport:
    """File sink that appends each event as one JSON object per line.

    The target's parent directory is created once, in the constructor, so
    `send()` never has to check for it. Every `send()` opens the file in
    append mode and closes it again, which is why `close()` has nothing to do.
    """

    def __init__(self, path: Path) -> None:
        """Remember ``path`` and make sure its parent directory exists."""
        path.parent.mkdir(parents=True, exist_ok=True)
        self._path = path

    def send(self, event: dict[str, Any]) -> None:
        """Append ``event`` to the file as a single JSON line."""
        with self._path.open("a", encoding="utf-8") as sink:
            line = json.dumps(event)
            sink.write(f"{line}\n")

    def close(self) -> None:
        """No-op: no file handle is kept open between sends."""
        return None
class _SocketClient:
    """Bookkeeping for one connected consumer of `UnixSocketTransport`.

    Holds the connection, the bounded queue of serialized events waiting to be
    written, the thread that drains that queue, and the counters used to
    rate-limit "events dropped" warnings.
    """

    def __init__(self, conn: socket.socket, addr: str) -> None:
        # Connection identity.
        self.addr = addr
        self.conn = conn

        # Writer side: the thread is attached by the transport after construction.
        self.thread: threading.Thread | None = None
        self.queue: Queue[bytes] = Queue(maxsize=_CLIENT_QUEUE_MAXSIZE)

        # Drop accounting used for rate-limited logging.
        self.first_drop_logged = False
        self.last_drop_log_ts = 0.0
        self.dropped_since_log = 0
        self.dropped_total = 0
class UnixSocketTransport:
    """Stream events as newline-delimited JSON over an `AF_UNIX` socket.

    On construction, binds the socket at ``path`` (after unlinking any stale
    file), restricts the socket file to mode ``0o600``, starts an accept
    thread, and accepts up to ``max_clients`` concurrent consumers. Each
    accepted client gets its own daemon thread and bounded outbound queue;
    ``send()`` enqueues the serialized event into every client queue without
    blocking. When a client's queue is full the new event is dropped for that
    client and a rate-limited warning is logged.

    A misbehaving / disconnected client is removed from the pool without
    affecting other clients or the thread that calls ``send()``.

    Not available on platforms without ``AF_UNIX`` (e.g. Windows):
    constructing this class there raises `RuntimeError`. `wire_transports`
    performs the same check with a more detailed, user-facing message.

    Args:
        path: Filesystem location of the socket. The parent directory is
            created if needed and any existing file at ``path`` is removed.
        max_clients: Maximum number of simultaneously connected consumers;
            also used as the listen backlog.
        on_connect: Optional callback invoked with each newly accepted client
            before its writer thread starts, while the client-list lock is
            held (so anything it enqueues precedes events from ``send()``).
            Exceptions raised by the callback are logged and otherwise ignored.

    Raises:
        RuntimeError: If the platform has no ``AF_UNIX`` support.
        OSError: If binding or configuring the socket fails.
    """

    def __init__(
        self,
        path: Path,
        max_clients: int = 8,
        on_connect: Callable[[_SocketClient], None] | None = None,
    ) -> None:
        if not hasattr(socket, "AF_UNIX"):
            raise RuntimeError(
                "UnixSocketTransport requires AF_UNIX, which is not available on this platform"
            )

        self._path = path
        self._max_clients = max_clients
        self._on_connect = on_connect
        self._shutdown = threading.Event()
        self._clients: list[_SocketClient] = []
        self._clients_lock = threading.Lock()

        self._path.parent.mkdir(parents=True, exist_ok=True)
        # A socket file left behind by a previous run would make bind() fail.
        self._path.unlink(missing_ok=True)

        self._server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self._server.bind(str(self._path))
            self._path.chmod(0o600)
            self._server.listen(max_clients)
            # A timeout lets the accept loop notice the shutdown flag.
            self._server.settimeout(_ACCEPT_POLL_TIMEOUT)
        except Exception:
            # Do not leak the descriptor or a half-configured socket file.
            self._server.close()
            self._path.unlink(missing_ok=True)
            raise

        self._accept_thread = threading.Thread(
            target=self._accept_loop,
            name="unix-socket-transport-accept",
            daemon=True,
        )
        self._accept_thread.start()

    def _accept_loop(self) -> None:
        """Accept connections until shutdown, giving each client a writer thread."""
        while not self._shutdown.is_set():
            try:
                conn, _ = self._server.accept()
            except TimeoutError:
                continue
            except OSError:
                # Server socket closed during shutdown — exit cleanly
                return

            with self._clients_lock:
                if len(self._clients) >= self._max_clients:
                    logger.warning(
                        "UnixSocketTransport: rejecting client; max_clients=%d reached",
                        self._max_clients,
                    )
                    try:
                        conn.close()
                    except OSError:
                        pass
                    continue
                client = _SocketClient(conn, addr=str(self._path))
                client.thread = threading.Thread(
                    target=self._client_loop,
                    args=(client,),
                    name="unix-socket-transport-client",
                    daemon=True,
                )
                self._clients.append(client)
                if self._on_connect is not None:
                    # A failing callback must not kill the accept thread or
                    # strand this client (registered, but never started and
                    # never closed), so the error is logged and the client is
                    # started anyway.
                    try:
                        self._on_connect(client)
                    except Exception:
                        logger.exception(
                            "UnixSocketTransport: on_connect callback failed; "
                            "continuing without it"
                        )
                client.thread.start()

    def _client_loop(self, client: _SocketClient) -> None:
        """Write queued payloads to one client until shutdown or a socket error."""
        try:
            while not self._shutdown.is_set():
                try:
                    payload = client.queue.get(timeout=_CLIENT_QUEUE_POLL_TIMEOUT)
                except Empty:
                    continue
                try:
                    client.conn.sendall(payload)
                except OSError:
                    # Treat any write failure as a disconnect.
                    return
        finally:
            try:
                client.conn.close()
            except OSError:
                pass
            with self._clients_lock:
                if client in self._clients:
                    self._clients.remove(client)

    def send(self, event: dict[str, Any]) -> None:
        """Serialize ``event`` once and enqueue it for every connected client."""
        payload = (json.dumps(event) + "\n").encode("utf-8")
        # Copy the list so the lock is not held while touching client queues.
        with self._clients_lock:
            snapshot = list(self._clients)
        for client in snapshot:
            try:
                client.queue.put_nowait(payload)
            except Full:
                self._record_drop(client)

    def _record_drop(self, client: _SocketClient) -> None:
        """Count a dropped event and emit a rate-limited warning for ``client``."""
        client.dropped_total += 1
        client.dropped_since_log += 1
        now = time.monotonic()
        if not client.first_drop_logged:
            # The first drop is always reported immediately.
            logger.warning(
                "UnixSocketTransport: dropping events for slow client (queue full at %d)",
                _CLIENT_QUEUE_MAXSIZE,
            )
            client.first_drop_logged = True
            client.last_drop_log_ts = now
            client.dropped_since_log = 0
            return
        if now - client.last_drop_log_ts >= _DROP_LOG_INTERVAL_SEC:
            logger.warning(
                "UnixSocketTransport: dropped %d events for slow client",
                client.dropped_since_log,
            )
            client.last_drop_log_ts = now
            client.dropped_since_log = 0

    def close(self) -> None:
        """Stop all threads, close all sockets and remove the socket file.

        Thread joins are bounded individually and by one overall deadline, so
        this method does not block indefinitely on a stuck thread.
        """
        deadline = time.monotonic() + _CLOSE_TOTAL_TIMEOUT
        self._shutdown.set()

        accept_budget = min(_ACCEPT_THREAD_JOIN_TIMEOUT, max(0.0, deadline - time.monotonic()))
        if self._accept_thread.is_alive():
            self._accept_thread.join(timeout=accept_budget)
            if self._accept_thread.is_alive():
                logger.warning(
                    "UnixSocketTransport: accept thread did not exit within %.1fs",
                    accept_budget,
                )

        try:
            self._server.close()
        except OSError:
            pass

        with self._clients_lock:
            snapshot = list(self._clients)
        for client in snapshot:
            # Shutting the connection down makes a writer blocked in sendall() fail.
            try:
                client.conn.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            t = client.thread
            if t is not None and t.is_alive():
                budget = min(_CLIENT_THREAD_JOIN_TIMEOUT, max(0.0, deadline - time.monotonic()))
                t.join(timeout=budget)
                if t.is_alive():
                    logger.warning(
                        "UnixSocketTransport: client thread did not exit within %.1fs",
                        budget,
                    )

        self._path.unlink(missing_ok=True)
# Event types that OTelTransport records as span events on the innermost open span.
_OTEL_EVENT_TYPES = frozenset(
    (
        "action_output",
        "cycle_detected",
        "evaluate",
        "handoff_detected",
        "handoff_spawned",
        "retry_exhausted",
        "route",
    )
)
# ``loop_complete`` outcomes that mark the loop span with an ERROR status.
_OTEL_ERROR_OUTCOMES = frozenset(("error", "failed", "exhausted"))
class OTelTransport:
    """Map ll loop executions to OpenTelemetry traces and spans, exporting via OTLP.

    Span hierarchy: loop = trace root, state = child span, action = grandchild.
    Span events are added for evaluate, route, retry_exhausted, cycle_detected,
    handoff_detected, handoff_spawned, and action_output on the innermost open
    span.

    Requires ``opentelemetry-sdk`` and ``opentelemetry-exporter-otlp-grpc``.
    Install with: ``pip install 'little-loops[otel]'``

    Sub-loop events (``depth > 0``) are no-ops with a single warning per session.

    Args:
        endpoint: OTLP gRPC endpoint passed to the span exporter.
        service_name: Value of the ``service.name`` resource attribute.
        _tracer_provider: Pre-built tracer provider to use instead of creating
            one; when given, ``endpoint`` and ``service_name`` are unused.

    Raises:
        RuntimeError: If the OpenTelemetry packages cannot be imported.
    """

    def __init__(
        self,
        endpoint: str = "http://localhost:4317",
        service_name: str = "little-loops",
        *,
        _tracer_provider: Any | None = None,
    ) -> None:
        try:
            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
            from opentelemetry.sdk.resources import Resource
            from opentelemetry.sdk.trace import TracerProvider
            from opentelemetry.sdk.trace.export import BatchSpanProcessor
        except ImportError as exc:
            raise RuntimeError(
                "OTelTransport requires the 'opentelemetry-sdk' and "
                "'opentelemetry-exporter-otlp-grpc' packages. "
                "Install with: pip install 'little-loops[otel]'"
            ) from exc

        if _tracer_provider is not None:
            self._provider = _tracer_provider
        else:
            resource = Resource.create({"service.name": service_name})
            provider = TracerProvider(resource=resource)
            exporter = OTLPSpanExporter(endpoint=endpoint)
            provider.add_span_processor(BatchSpanProcessor(exporter))
            self._provider = provider

        self._tracer = self._provider.get_tracer("little-loops")
        # At most one span is open per level at any time.
        self._loop_span: Any | None = None
        self._state_span: Any | None = None
        self._action_span: Any | None = None
        self._subloop_warned = False

    def send(self, event: dict[str, Any]) -> None:
        """Advance the span state machine according to ``event["event"]``.

        Events with an integer ``depth`` greater than zero are ignored (with a
        one-time warning). Unknown or non-string event types are ignored.
        """
        depth = event.get("depth", 0)
        if isinstance(depth, int) and depth > 0:
            if not self._subloop_warned:
                logger.warning(
                    "OTelTransport: sub-loop events (depth > 0) are not supported; "
                    "nested-trace support is deferred. Event type: %r",
                    event.get("event"),
                )
                self._subloop_warned = True
            return

        event_type = event.get("event", "")
        if not isinstance(event_type, str):
            # The bus does not validate events; an unhashable value here would
            # otherwise raise on the frozenset membership test below.
            return
        if event_type == "loop_start":
            self._handle_loop_start(event)
        elif event_type == "loop_resume":
            self._handle_loop_resume(event)
        elif event_type == "state_enter":
            self._handle_state_enter(event)
        elif event_type == "action_start":
            self._handle_action_start(event)
        elif event_type == "action_complete":
            self._handle_action_complete()
        elif event_type == "loop_complete":
            self._handle_loop_complete(event)
        elif event_type in _OTEL_EVENT_TYPES:
            self._add_span_event(event_type, event)

    def close(self) -> None:
        """End any spans still open, then flush and shut down the provider."""
        # A loop that stopped without ``loop_complete`` would otherwise leave
        # its spans un-ended; span processors are only handed ended spans.
        self._end_open_spans()
        try:
            self._provider.force_flush()
        finally:
            # Shut down even if the flush raised.
            self._provider.shutdown()

    # ------------------------------------------------------------------
    # Internal span machine
    # ------------------------------------------------------------------

    def _handle_loop_start(self, event: dict[str, Any]) -> None:
        """Open a new root span, ending spans left over from an unfinished loop."""
        self._begin_loop_span(event)

    def _handle_loop_resume(self, event: dict[str, Any]) -> None:
        """Open a new root span for a resumed loop, ending any open spans first."""
        self._begin_loop_span(event)

    def _begin_loop_span(self, event: dict[str, Any]) -> None:
        """End every open span and start a fresh loop (root) span."""
        self._end_open_spans()
        loop_name = str(event.get("loop_name", "ll-loop"))
        self._loop_span = self._tracer.start_span(loop_name)

    def _handle_state_enter(self, event: dict[str, Any]) -> None:
        """Close the previous state/action spans and open a state span under the loop."""
        self._close_state_and_action()
        if self._loop_span is None:
            logger.warning(
                "OTelTransport: state_enter received without a prior loop_start; skipping span"
            )
            return
        from opentelemetry import trace

        state_name = str(event.get("state", "unknown-state"))
        ctx = trace.set_span_in_context(self._loop_span)
        self._state_span = self._tracer.start_span(state_name, context=ctx)

    def _handle_action_start(self, event: dict[str, Any]) -> None:
        """Open an action span under the current state span."""
        if self._state_span is None:
            logger.warning(
                "OTelTransport: action_start received without a prior state_enter; skipping span"
            )
            return
        from opentelemetry import trace

        # An action that never reported completion must not be left open
        # once its reference is overwritten.
        self._handle_action_complete()
        action_name = str(event.get("action", "unknown-action"))
        ctx = trace.set_span_in_context(self._state_span)
        self._action_span = self._tracer.start_span(action_name, context=ctx)

    def _handle_action_complete(self) -> None:
        """End the open action span, if any."""
        if self._action_span is not None:
            self._action_span.end()
            self._action_span = None

    def _handle_loop_complete(self, event: dict[str, Any]) -> None:
        """Set the loop span's status from ``outcome`` and end it."""
        from opentelemetry.trace import StatusCode

        self._close_state_and_action()
        if self._loop_span is None:
            logger.warning(
                "OTelTransport: loop_complete received without a prior loop_start; skipping"
            )
            return
        outcome = str(event.get("outcome", ""))
        if outcome in _OTEL_ERROR_OUTCOMES:
            self._loop_span.set_status(StatusCode.ERROR, outcome)
        else:
            self._loop_span.set_status(StatusCode.OK)
        self._loop_span.end()
        self._loop_span = None

    def _add_span_event(self, event_type: str, event: dict[str, Any]) -> None:
        """Attach ``event`` as a span event to the innermost open span."""
        # Explicit ``is not None`` checks: selection must not depend on
        # whether a span object happens to be truthy.
        span = next(
            (
                candidate
                for candidate in (self._action_span, self._state_span, self._loop_span)
                if candidate is not None
            ),
            None,
        )
        if span is None:
            return
        # Every field except the type itself becomes a string attribute.
        attrs = {k: str(v) for k, v in event.items() if k != "event"}
        span.add_event(event_type, attributes=attrs)

    def _close_state_and_action(self) -> None:
        """End the action span and then the state span, if open."""
        if self._action_span is not None:
            self._action_span.end()
            self._action_span = None
        if self._state_span is not None:
            self._state_span.end()
            self._state_span = None

    def _end_open_spans(self) -> None:
        """End action, state and loop spans (innermost first), if open."""
        self._close_state_and_action()
        if self._loop_span is not None:
            self._loop_span.end()
            self._loop_span = None
465class WebhookTransport:
466 """POSTs batched FSM events to an HTTP endpoint.
468 Events are enqueued non-blocking in ``send()`` and flushed by a daemon
469 thread on a configurable interval. Failed POSTs are retried with
470 exponential backoff; after ``max_retries`` the batch is dropped with a
471 warning rather than raising to the caller.
473 Requires ``httpx``: ``pip install little-loops[webhooks]``.
474 """
476 def __init__(
477 self,
478 url: str,
479 batch_ms: int = _WEBHOOK_BATCH_MS_DEFAULT,
480 headers: dict[str, str] | None = None,
481 max_retries: int = 3,
482 ) -> None:
483 try:
484 import httpx as _httpx
485 except ImportError as exc:
486 raise RuntimeError(
487 "WebhookTransport requires httpx: pip install little-loops[webhooks]"
488 ) from exc
489 self._httpx = _httpx
490 self._url = url
491 self._batch_ms = batch_ms
492 self._headers = dict(headers) if headers else {}
493 self._max_retries = max_retries
494 self._queue: Queue[dict[str, Any]] = Queue()
495 self._shutdown = threading.Event()
496 self._thread = threading.Thread(target=self._batch_loop, daemon=True, name="webhook-batch")
497 self._thread.start()
499 def send(self, event: dict[str, Any]) -> None:
500 """Enqueue an event for the next batch flush (non-blocking)."""
501 if not self._shutdown.is_set():
502 self._queue.put(event)
504 def close(self) -> None:
505 """Signal shutdown, drain the queue with one final flush, and join the thread."""
506 self._shutdown.set()
507 self._thread.join(timeout=_WEBHOOK_CLOSE_TIMEOUT)
509 def _batch_loop(self) -> None:
510 while not self._shutdown.is_set():
511 self._shutdown.wait(timeout=self._batch_ms / 1000.0)
512 self._flush()
513 # One final drain after shutdown signal
514 self._flush()
516 def _flush(self) -> None:
517 events: list[dict[str, Any]] = []
518 while True:
519 try:
520 events.append(self._queue.get_nowait())
521 except Empty:
522 break
523 if not events:
524 return
525 self._post_with_retry(events)
527 def _post_with_retry(self, events: list[dict[str, Any]]) -> None:
528 payload = json.dumps(events).encode()
529 headers = {"Content-Type": "application/json", **self._headers}
530 backoff = _WEBHOOK_RETRY_BASE_S
531 for attempt in range(self._max_retries + 1):
532 try:
533 resp = self._httpx.post(self._url, content=payload, headers=headers, timeout=10.0)
534 if resp.status_code < 500:
535 return
536 except Exception:
537 pass
538 if attempt < self._max_retries:
539 time.sleep(backoff)
540 backoff = min(backoff * 2, _WEBHOOK_RETRY_MAX_S)
541 logger.warning(
542 "WebhookTransport: giving up after %d retries posting to %r",
543 self._max_retries,
544 self._url,
545 )
def _make_seed_callback() -> Callable[[_SocketClient], None]:
    """Return an on_connect callback that seeds a new client with current running loop state.

    The callback enqueues one ``state_change`` event per loop returned by
    ``list_running_loops(Path(".loops"))``. It is best-effort: any failure
    while listing or serializing loop state is logged and swallowed, so the
    thread that invokes the callback is never taken down by it.
    """
    from little_loops.fsm.persistence import list_running_loops

    def _seed(client: _SocketClient) -> None:
        try:
            for state in list_running_loops(Path(".loops")):
                event = {"event": "state_change", **state.to_dict()}
                payload = (json.dumps(event) + "\n").encode("utf-8")
                try:
                    client.queue.put_nowait(payload)
                except Full:
                    # Stop rather than deliver a seed with gaps in it.
                    logger.warning(
                        "UnixSocketTransport: client queue full while seeding; "
                        "remaining loop state skipped"
                    )
                    break
        except Exception:
            # Seeding is optional; the client still receives live events.
            logger.exception("UnixSocketTransport: failed to seed client with running loop state")

    return _seed
# Transport names recognised by ``wire_transports``; each name maps to itself.
_TRANSPORT_REGISTRY: dict[str, str] = {
    name: name for name in ("jsonl", "otel", "socket", "sqlite", "webhook")
}
def wire_transports(
    bus: EventBus,
    config: EventsConfig,
    log_dir: Path | None = None,
) -> None:
    """Register transports named in `config.transports` on `bus`.

    Unknown transport names log a warning and are skipped (rather than raising)
    so that a typo in user config does not prevent the loop from starting. A
    ``webhook`` entry without a usable URL (``None`` or empty) is likewise
    skipped with a warning.

    Transports that cannot work in the current environment raise instead of
    being dropped silently, so the user is told why their config does not
    work: ``socket`` on platforms without ``AF_UNIX`` (e.g. Windows), and
    ``otel`` / ``webhook`` when their optional packages are not installed.

    Args:
        bus: EventBus to register transports on
        config: EventsConfig holding the list of transport names to wire up
        log_dir: Directory under which built-in transports place their log files.
            Defaults to ``.ll`` under the current working directory.

    Raises:
        RuntimeError: If ``socket`` is requested on a platform without
            ``AF_UNIX``, or if `OTelTransport` / `WebhookTransport` cannot
            import their optional dependencies.
    """
    base = log_dir if log_dir is not None else Path(".ll")
    for name in config.transports:
        if name not in _TRANSPORT_REGISTRY:
            logger.warning("Unknown transport %r; skipping", name)
            continue
        if name == "jsonl":
            bus.add_transport(JsonlTransport(base / "events.jsonl"))
        elif name == "otel":
            bus.add_transport(
                OTelTransport(
                    endpoint=config.otel.endpoint,
                    service_name=config.otel.service_name,
                )
            )
        elif name == "socket":
            if not hasattr(socket, "AF_UNIX"):
                raise RuntimeError(
                    "UnixSocketTransport requires AF_UNIX, which is not available on this "
                    'platform (e.g. Windows). Remove "socket" from events.transports or '
                    'use a different transport such as "jsonl".'
                )
            resolved = _resolve_socket_path(config.socket.path, base)
            bus.add_transport(
                UnixSocketTransport(
                    resolved,
                    config.socket.max_clients,
                    on_connect=_make_seed_callback(),
                )
            )
        elif name == "sqlite":
            # Imported lazily, only when the transport is actually requested.
            from little_loops.session_store import SQLiteTransport

            bus.add_transport(SQLiteTransport(base / "session.db"))
        elif name == "webhook":
            url = config.webhook.url
            if url is None:
                logger.warning("WebhookTransport: events.webhook.url is None; skipping")
                continue
            if not str(url).strip():
                # An empty URL would only produce a transport that fails every POST.
                logger.warning("WebhookTransport: events.webhook.url is empty; skipping")
                continue
            bus.add_transport(
                WebhookTransport(
                    url=url,
                    batch_ms=config.webhook.batch_ms,
                    headers=config.webhook.headers,
                    max_retries=3,
                )
            )
def _resolve_socket_path(configured: str, base: Path) -> Path:
    """Map a configured socket path onto the directory used for this call.

    Resolution rules, in order:

    * an absolute path is returned unchanged;
    * a relative path whose first component is ``.ll`` has that component
      replaced by ``base`` (so the default ``.ll/events.sock`` lands inside a
      custom ``log_dir`` such as a test tmp dir);
    * any other relative path is joined onto ``base``.
    """
    candidate = Path(configured)
    if candidate.is_absolute():
        return candidate

    segments = candidate.parts
    # Slicing keeps this safe for a path with no components at all.
    if segments[:1] == (".ll",):
        return base.joinpath(*segments[1:])
    return base / candidate