Coverage for little_loops / transport.py: 88%

356 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:16 -0500

1"""Transport abstraction for the little-loops EventBus. 

2 

3A `Transport` is an additive sink for events emitted by `EventBus`. The Protocol 

4is intentionally minimal — `send(event)` for delivery and `close()` for cleanup — 

5so that new sinks can be added without modifying `EventBus` itself. 

6 

7Built-in implementations: 

8 JsonlTransport: appends each event as a JSON line to a file. 

9 UnixSocketTransport: streams newline-delimited JSON to AF_UNIX socket clients 

10 for sub-second-latency local consumers (TUIs, log tailers, dashboards). 

11 OTelTransport: maps loop executions to OpenTelemetry traces/spans, exporting 

12 via OTLP to Grafana, Jaeger, Datadog, etc. Requires the optional 

13 ``opentelemetry-sdk`` and ``opentelemetry-exporter-otlp-grpc`` packages. 

14 WebhookTransport: POSTs batched events to an HTTP endpoint for remote 

15 dashboards, Slack bots, and CI systems. Requires the optional ``httpx`` 

16 package (``pip install little-loops[webhooks]``). 

17 SQLiteTransport: records FSM loop events into the per-project session 

18 database (``.ll/session.db``) for indexed cross-cutting queries. 

19 

20Public exports: 

21 Transport: runtime-checkable Protocol that any sink must satisfy 

22 JsonlTransport: writes events to a JSONL file 

23 UnixSocketTransport: streams events over an AF_UNIX socket 

24 OTelTransport: exports loop traces via OTLP 

25 WebhookTransport: POSTs batched events to an HTTP endpoint 

26 wire_transports: register transports listed in `EventsConfig` on an `EventBus` 

27""" 

28 

29from __future__ import annotations 

30 

31import json 

32import logging 

33import socket 

34import threading 

35import time 

36from collections.abc import Callable 

37from pathlib import Path 

38from queue import Empty, Full, Queue 

39from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable 

40 

41if TYPE_CHECKING: 

42 from little_loops.config.features import EventsConfig 

43 from little_loops.events import EventBus 

44 

45logger = logging.getLogger(__name__) 

46 

# --- UnixSocketTransport tuning -------------------------------------------
# Capacity of each client's outbound queue (passed as Queue(maxsize=...)).
_CLIENT_QUEUE_MAXSIZE = 1024
# Minimum seconds between repeated "dropped N events" warnings for one client.
_DROP_LOG_INTERVAL_SEC = 5.0
# Upper bound on how long close() waits for the accept thread to exit.
_ACCEPT_THREAD_JOIN_TIMEOUT = 2.0
# Upper bound on how long close() waits for each client writer thread.
_CLIENT_THREAD_JOIN_TIMEOUT = 1.0
# Overall deadline for close(); the per-thread budgets above are clipped to it.
_CLOSE_TOTAL_TIMEOUT = 10.0
# Timeout set on the listening socket so the accept loop can re-check shutdown.
_ACCEPT_POLL_TIMEOUT = 1.0
# Timeout for each client's queue.get() so writer threads can re-check shutdown.
_CLIENT_QUEUE_POLL_TIMEOUT = 0.5

# --- WebhookTransport tuning ----------------------------------------------
# Default flush interval in milliseconds (default for the batch_ms parameter).
_WEBHOOK_BATCH_MS_DEFAULT = 1000
# How long close() waits for the batch thread to finish.
_WEBHOOK_CLOSE_TIMEOUT = 10.0
# Initial retry delay in seconds; doubled after each failed attempt.
_WEBHOOK_RETRY_BASE_S = 0.5
# Cap on the doubled retry delay, in seconds.
_WEBHOOK_RETRY_MAX_S = 8.0

59 

60 

@runtime_checkable
class Transport(Protocol):
    """Structural interface for an event sink attached to an `EventBus`.

    Every event emitted on the bus is handed to each registered transport;
    there is no filtering at the transport layer (subscribe an observer with
    a filter for that). The bus does not validate event contents, so
    implementations must cope with arbitrary `dict[str, Any]` shapes.
    """

    def send(self, event: dict[str, Any]) -> None:
        """Deliver a single event."""

    def close(self) -> None:
        """Release any resources held by the transport. May be a no-op."""

78 

79 

class JsonlTransport:
    """File-backed transport that writes one JSON document per line.

    The target's parent directory is created once, in the constructor, so
    individual writes never need to check for it. The file is opened in
    append mode and closed again on every `send()`, which is why `close()`
    has nothing to release.
    """

    def __init__(self, path: Path) -> None:
        # Create missing parent directories up front; an existing one is fine.
        path.parent.mkdir(parents=True, exist_ok=True)
        self._path = path

    def send(self, event: dict[str, Any]) -> None:
        """Serialize `event` to JSON and append it as a single line."""
        line = json.dumps(event) + "\n"
        with self._path.open("a", encoding="utf-8") as sink:
            sink.write(line)

    def close(self) -> None:
        """No-op: no file handle is kept open between `send()` calls."""
        return None

98 

99 

class _SocketClient:
    """Bookkeeping for one connected consumer of `UnixSocketTransport`.

    Holds the accepted connection, the bounded queue of serialized events
    waiting to be written, the writer thread (assigned after construction),
    and the counters used for rate-limited drop warnings.
    """

    def __init__(self, conn: socket.socket, addr: str) -> None:
        self.conn, self.addr = conn, addr
        # Writer thread; set by the accept loop once the client is created.
        self.thread: threading.Thread | None = None
        # Bounded so a slow consumer cannot grow memory without limit.
        self.queue: Queue[bytes] = Queue(maxsize=_CLIENT_QUEUE_MAXSIZE)
        # Drop accounting: lifetime total, and count since the last warning.
        self.dropped_total = 0
        self.dropped_since_log = 0
        # Monotonic timestamp of the last drop warning, and whether the
        # first-drop warning has already been emitted.
        self.last_drop_log_ts = 0.0
        self.first_drop_logged = False

112 

113 

class UnixSocketTransport:
    """Stream events as newline-delimited JSON over an `AF_UNIX` socket.

    On construction, binds the socket at ``path`` (after unlinking any stale
    file), starts an accept thread, and accepts up to ``max_clients`` concurrent
    consumers. Each accepted client gets its own daemon thread and bounded
    outbound queue; ``send()`` enqueues the serialized event into every client
    queue without blocking. A full queue causes the newest event to be dropped
    (preserving causal order) and a rate-limited warning is logged.

    A misbehaving / disconnected client is removed from the pool without
    affecting other clients or the FSM thread.

    Not available on platforms without ``AF_UNIX`` (e.g. Windows).
    :func:`wire_transports` performs the user-facing platform check; the
    constructor repeats it and raises `RuntimeError` if ``AF_UNIX`` is missing.

    Args:
        path: Filesystem location of the socket. Any existing file there is
            removed before binding.
        max_clients: Maximum number of simultaneously connected consumers;
            also used as the listen backlog.
        on_connect: Optional callback invoked with each newly accepted client
            before its writer thread starts (for example to pre-fill its
            queue). It runs on the accept thread while the client list lock
            is held, so it should be quick. Exceptions raised by it are
            logged and otherwise ignored.

    Raises:
        RuntimeError: If the platform does not provide ``AF_UNIX``.
        OSError: If the socket cannot be created, bound, or listened on.
    """

    def __init__(
        self,
        path: Path,
        max_clients: int = 8,
        on_connect: Callable[[_SocketClient], None] | None = None,
    ) -> None:
        if not hasattr(socket, "AF_UNIX"):
            raise RuntimeError(
                "UnixSocketTransport requires AF_UNIX, which is not available on this platform"
            )

        self._path = path
        self._max_clients = max_clients
        self._on_connect = on_connect
        self._shutdown = threading.Event()
        self._clients: list[_SocketClient] = []
        self._clients_lock = threading.Lock()

        self._path.parent.mkdir(parents=True, exist_ok=True)
        # Remove a stale socket file left by a previous run; bind() would fail otherwise.
        self._path.unlink(missing_ok=True)

        self._server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self._server.bind(str(self._path))
            # Restrict the socket file to the owning user.
            self._path.chmod(0o600)
            self._server.listen(max_clients)
            # A timeout lets the accept loop wake up periodically and notice shutdown.
            self._server.settimeout(_ACCEPT_POLL_TIMEOUT)
        except Exception:
            # Do not leak the descriptor or a half-initialized socket file.
            self._server.close()
            self._path.unlink(missing_ok=True)
            raise

        self._accept_thread = threading.Thread(
            target=self._accept_loop,
            name="unix-socket-transport-accept",
            daemon=True,
        )
        self._accept_thread.start()

    def _accept_loop(self) -> None:
        """Accept connections until shutdown, registering one writer thread per client."""
        while not self._shutdown.is_set():
            try:
                conn, _ = self._server.accept()
            except TimeoutError:
                continue
            except OSError:
                # Server socket closed during shutdown — exit cleanly
                return

            with self._clients_lock:
                if len(self._clients) >= self._max_clients:
                    logger.warning(
                        "UnixSocketTransport: rejecting client; max_clients=%d reached",
                        self._max_clients,
                    )
                    try:
                        conn.close()
                    except OSError:
                        pass
                    continue
                client = _SocketClient(conn, addr=str(self._path))
                client.thread = threading.Thread(
                    target=self._client_loop,
                    args=(client,),
                    name="unix-socket-transport-client",
                    daemon=True,
                )
                self._clients.append(client)
                if self._on_connect is not None:
                    # The callback is best-effort. If it raised here, the accept
                    # thread would die and the client would stay registered with
                    # no writer thread, so the failure is logged and we carry on.
                    try:
                        self._on_connect(client)
                    except Exception:
                        logger.exception(
                            "UnixSocketTransport: on_connect callback failed; "
                            "continuing without it for this client"
                        )
                client.thread.start()

    def _client_loop(self, client: _SocketClient) -> None:
        """Write queued payloads to one client until shutdown or a socket error."""
        try:
            while not self._shutdown.is_set():
                try:
                    payload = client.queue.get(timeout=_CLIENT_QUEUE_POLL_TIMEOUT)
                except Empty:
                    continue
                try:
                    client.conn.sendall(payload)
                except OSError:
                    # Peer went away (or close() shut the socket down).
                    return
        finally:
            try:
                client.conn.close()
            except OSError:
                pass
            with self._clients_lock:
                if client in self._clients:
                    self._clients.remove(client)

    def send(self, event: dict[str, Any]) -> None:
        """Serialize `event` once and enqueue it for every connected client.

        Never blocks: a client whose queue is full has this event dropped and
        the drop recorded.
        """
        payload = (json.dumps(event) + "\n").encode("utf-8")
        # Copy under the lock, enqueue outside it, so the lock is held only briefly.
        with self._clients_lock:
            snapshot = list(self._clients)
        for client in snapshot:
            try:
                client.queue.put_nowait(payload)
            except Full:
                self._record_drop(client)

    def _record_drop(self, client: _SocketClient) -> None:
        """Count a dropped event and emit a rate-limited warning for `client`."""
        client.dropped_total += 1
        client.dropped_since_log += 1
        now = time.monotonic()
        if not client.first_drop_logged:
            # The first drop is always reported immediately.
            logger.warning(
                "UnixSocketTransport: dropping events for slow client (queue full at %d)",
                _CLIENT_QUEUE_MAXSIZE,
            )
            client.first_drop_logged = True
            client.last_drop_log_ts = now
            client.dropped_since_log = 0
            return
        if now - client.last_drop_log_ts >= _DROP_LOG_INTERVAL_SEC:
            logger.warning(
                "UnixSocketTransport: dropped %d events for slow client",
                client.dropped_since_log,
            )
            client.last_drop_log_ts = now
            client.dropped_since_log = 0

    def close(self) -> None:
        """Stop all threads, close every socket, and remove the socket file.

        Thread joins are bounded individually and by an overall deadline; a
        thread that fails to exit in time is reported with a warning and left
        to die with the process (all threads are daemons).
        """
        deadline = time.monotonic() + _CLOSE_TOTAL_TIMEOUT
        self._shutdown.set()

        accept_budget = min(_ACCEPT_THREAD_JOIN_TIMEOUT, max(0.0, deadline - time.monotonic()))
        if self._accept_thread.is_alive():
            self._accept_thread.join(timeout=accept_budget)
            if self._accept_thread.is_alive():
                logger.warning(
                    "UnixSocketTransport: accept thread did not exit within %.1fs",
                    accept_budget,
                )

        try:
            self._server.close()
        except OSError:
            pass

        with self._clients_lock:
            snapshot = list(self._clients)
        for client in snapshot:
            # Shutting the connection down makes a writer blocked in sendall() fail and exit.
            try:
                client.conn.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            t = client.thread
            if t is not None and t.is_alive():
                budget = min(_CLIENT_THREAD_JOIN_TIMEOUT, max(0.0, deadline - time.monotonic()))
                t.join(timeout=budget)
                if t.is_alive():
                    logger.warning(
                        "UnixSocketTransport: client thread did not exit within %.1fs",
                        budget,
                    )

        self._path.unlink(missing_ok=True)

292 

293 

# Event types that OTelTransport.send() records as span events (via
# _add_span_event) rather than as span start/end transitions.
_OTEL_EVENT_TYPES = frozenset(
    {
        "evaluate",
        "route",
        "retry_exhausted",
        "cycle_detected",
        "handoff_detected",
        "handoff_spawned",
        "action_output",
    }
)
# `outcome` values on loop_complete that mark the loop span with StatusCode.ERROR;
# any other outcome is marked OK.
_OTEL_ERROR_OUTCOMES = frozenset({"error", "failed", "exhausted"})

306 

307 

class OTelTransport:
    """Map ll loop executions to OpenTelemetry traces and spans, exporting via OTLP.

    Span hierarchy: loop = trace root, state = child span, action = grandchild.
    Span events are added for evaluate, route, retry_exhausted, cycle_detected,
    handoff_detected, handoff_spawned, and action_output on the innermost open
    span.

    Requires ``opentelemetry-sdk`` and ``opentelemetry-exporter-otlp-grpc``.
    Install with: ``pip install 'little-loops[otel]'``

    Sub-loop events (``depth > 0``) are no-ops with a single warning per
    transport instance.

    Args:
        endpoint: OTLP gRPC endpoint passed to the span exporter.
        service_name: Value of the ``service.name`` resource attribute.
        _tracer_provider: Pre-built tracer provider to use instead of creating
            one; when given, ``endpoint`` and ``service_name`` are not used.

    Raises:
        RuntimeError: If the OpenTelemetry packages cannot be imported.
    """

    def __init__(
        self,
        endpoint: str = "http://localhost:4317",
        service_name: str = "little-loops",
        *,
        _tracer_provider: Any | None = None,
    ) -> None:
        try:
            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
            from opentelemetry.sdk.resources import Resource
            from opentelemetry.sdk.trace import TracerProvider
            from opentelemetry.sdk.trace.export import BatchSpanProcessor
        except ImportError as exc:
            raise RuntimeError(
                "OTelTransport requires the 'opentelemetry-sdk' and "
                "'opentelemetry-exporter-otlp-grpc' packages. "
                "Install with: pip install 'little-loops[otel]'"
            ) from exc

        if _tracer_provider is not None:
            self._provider = _tracer_provider
        else:
            resource = Resource.create({"service.name": service_name})
            provider = TracerProvider(resource=resource)
            exporter = OTLPSpanExporter(endpoint=endpoint)
            provider.add_span_processor(BatchSpanProcessor(exporter))
            self._provider = provider

        self._tracer = self._provider.get_tracer("little-loops")
        # At most one open span per level: loop -> state -> action.
        self._loop_span: Any | None = None
        self._state_span: Any | None = None
        self._action_span: Any | None = None
        self._subloop_warned = False

    def send(self, event: dict[str, Any]) -> None:
        """Dispatch one bus event to the matching span transition or span event.

        Events with an integer ``depth`` greater than zero are ignored (with a
        one-time warning). Unrecognized event types are silently ignored.
        """
        depth = event.get("depth", 0)
        if isinstance(depth, int) and depth > 0:
            if not self._subloop_warned:
                logger.warning(
                    "OTelTransport: sub-loop events (depth > 0) are not supported; "
                    "nested-trace support is deferred. Event type: %r",
                    event.get("event"),
                )
                self._subloop_warned = True
            return

        event_type = event.get("event", "")
        if event_type == "loop_start":
            self._handle_loop_start(event)
        elif event_type == "loop_resume":
            self._handle_loop_resume(event)
        elif event_type == "state_enter":
            self._handle_state_enter(event)
        elif event_type == "action_start":
            self._handle_action_start(event)
        elif event_type == "action_complete":
            self._handle_action_complete()
        elif event_type == "loop_complete":
            self._handle_loop_complete(event)
        elif event_type in _OTEL_EVENT_TYPES:
            self._add_span_event(event_type, event)

    def close(self) -> None:
        """Flush pending spans and shut the tracer provider down."""
        self._provider.force_flush()
        self._provider.shutdown()

    # ------------------------------------------------------------------
    # Internal span machine
    # ------------------------------------------------------------------

    def _start_loop_span(self, event: dict[str, Any]) -> None:
        """End every open span, then open a fresh root span for the loop.

        Shared by loop_start and loop_resume so neither can overwrite an open
        span without ending it.
        """
        self._close_state_and_action()
        if self._loop_span is not None:
            self._loop_span.end()
        loop_name = str(event.get("loop_name", "ll-loop"))
        self._loop_span = self._tracer.start_span(loop_name)

    def _handle_loop_start(self, event: dict[str, Any]) -> None:
        """Open the root span, ending any spans left open by a previous loop."""
        self._start_loop_span(event)

    def _handle_loop_resume(self, event: dict[str, Any]) -> None:
        """Replace the current root span with a new one for the resumed loop."""
        self._start_loop_span(event)

    def _handle_state_enter(self, event: dict[str, Any]) -> None:
        """Close the previous state (and action) span and open one for the new state."""
        self._close_state_and_action()
        if self._loop_span is None:
            logger.warning(
                "OTelTransport: state_enter received without a prior loop_start; skipping span"
            )
            return
        from opentelemetry import trace

        state_name = str(event.get("state", "unknown-state"))
        # Parent the state span explicitly on the loop span.
        ctx = trace.set_span_in_context(self._loop_span)
        self._state_span = self._tracer.start_span(state_name, context=ctx)

    def _handle_action_start(self, event: dict[str, Any]) -> None:
        """Open an action span under the current state span."""
        if self._state_span is None:
            logger.warning(
                "OTelTransport: action_start received without a prior state_enter; skipping span"
            )
            return
        from opentelemetry import trace

        # If the previous action never reported action_complete, end its span
        # now instead of overwriting it and leaving it open.
        if self._action_span is not None:
            self._action_span.end()
            self._action_span = None

        action_name = str(event.get("action", "unknown-action"))
        ctx = trace.set_span_in_context(self._state_span)
        self._action_span = self._tracer.start_span(action_name, context=ctx)

    def _handle_action_complete(self) -> None:
        """End the open action span, if any."""
        if self._action_span is not None:
            self._action_span.end()
            self._action_span = None

    def _handle_loop_complete(self, event: dict[str, Any]) -> None:
        """Close all open spans, setting the root span status from ``outcome``."""
        from opentelemetry.trace import StatusCode

        self._close_state_and_action()
        if self._loop_span is None:
            logger.warning(
                "OTelTransport: loop_complete received without a prior loop_start; skipping"
            )
            return
        outcome = str(event.get("outcome", ""))
        if outcome in _OTEL_ERROR_OUTCOMES:
            self._loop_span.set_status(StatusCode.ERROR, outcome)
        else:
            self._loop_span.set_status(StatusCode.OK)
        self._loop_span.end()
        self._loop_span = None

    def _add_span_event(self, event_type: str, event: dict[str, Any]) -> None:
        """Attach `event` as a span event to the innermost open span, if any."""
        span = self._action_span or self._state_span or self._loop_span
        if span is None:
            return
        # Every value is stringified before being attached as an attribute.
        attrs = {k: str(v) for k, v in event.items() if k != "event"}
        span.add_event(event_type, attributes=attrs)

    def _close_state_and_action(self) -> None:
        """End the action span and then the state span, innermost first."""
        if self._action_span is not None:
            self._action_span.end()
            self._action_span = None
        if self._state_span is not None:
            self._state_span.end()
            self._state_span = None

463 

464 

class WebhookTransport:
    """POSTs batched FSM events to an HTTP endpoint.

    Events are enqueued non-blocking in ``send()`` and flushed by a daemon
    thread on a configurable interval. Failed POSTs (exceptions and 5xx
    responses) are retried with exponential backoff; after ``max_retries`` the
    batch is dropped with a warning rather than raising to the caller. A 4xx
    response is not retried; the batch is dropped with a warning. A batch that
    cannot be serialized to JSON is dropped with a warning as well.

    Requires ``httpx``: ``pip install little-loops[webhooks]``.

    Args:
        url: Endpoint that receives each batch as a JSON array.
        batch_ms: Flush interval in milliseconds.
        headers: Extra request headers; they override the default
            ``Content-Type`` when they set the same key.
        max_retries: Number of retries after the first failed attempt.

    Raises:
        RuntimeError: If ``httpx`` cannot be imported.
    """

    def __init__(
        self,
        url: str,
        batch_ms: int = _WEBHOOK_BATCH_MS_DEFAULT,
        headers: dict[str, str] | None = None,
        max_retries: int = 3,
    ) -> None:
        try:
            import httpx as _httpx
        except ImportError as exc:
            raise RuntimeError(
                "WebhookTransport requires httpx: pip install little-loops[webhooks]"
            ) from exc
        self._httpx = _httpx
        self._url = url
        self._batch_ms = batch_ms
        # Copy so later mutation of the caller's dict does not affect requests.
        self._headers = dict(headers) if headers else {}
        self._max_retries = max_retries
        self._queue: Queue[dict[str, Any]] = Queue()
        self._shutdown = threading.Event()
        self._thread = threading.Thread(target=self._batch_loop, daemon=True, name="webhook-batch")
        self._thread.start()

    def send(self, event: dict[str, Any]) -> None:
        """Enqueue an event for the next batch flush (non-blocking)."""
        if not self._shutdown.is_set():
            self._queue.put(event)

    def close(self) -> None:
        """Signal shutdown, drain the queue with one final flush, and join the thread."""
        self._shutdown.set()
        self._thread.join(timeout=_WEBHOOK_CLOSE_TIMEOUT)

    def _batch_loop(self) -> None:
        """Flush once per interval until shutdown, then flush one last time."""
        while not self._shutdown.is_set():
            # wait() doubles as the interval timer and returns early on shutdown.
            self._shutdown.wait(timeout=self._batch_ms / 1000.0)
            self._flush()
        # One final drain after shutdown signal
        self._flush()

    def _flush(self) -> None:
        """Drain everything currently queued and POST it as one batch."""
        events: list[dict[str, Any]] = []
        while True:
            try:
                events.append(self._queue.get_nowait())
            except Empty:
                break
        if not events:
            return
        self._post_with_retry(events)

    def _post_with_retry(self, events: list[dict[str, Any]]) -> None:
        """POST `events` as a JSON array, retrying failures with exponential backoff.

        Never raises: this runs on the batch thread, and an escaping exception
        would end that thread and stop all further delivery.
        """
        try:
            payload = json.dumps(events).encode()
        except (TypeError, ValueError) as exc:
            # An event that is not JSON-serializable must not kill the batch thread.
            logger.warning(
                "WebhookTransport: dropping batch of %d events that could not be "
                "serialized to JSON: %s",
                len(events),
                exc,
            )
            return
        headers = {"Content-Type": "application/json", **self._headers}
        backoff = _WEBHOOK_RETRY_BASE_S
        for attempt in range(self._max_retries + 1):
            try:
                resp = self._httpx.post(self._url, content=payload, headers=headers, timeout=10.0)
                status = resp.status_code
            except Exception as exc:
                # Best-effort delivery: note why this attempt failed, then retry.
                logger.debug(
                    "WebhookTransport: attempt %d posting to %r failed: %s",
                    attempt + 1,
                    self._url,
                    exc,
                )
            else:
                if status < 400:
                    return
                if status < 500:
                    # Not retried, but the drop is reported instead of passing silently.
                    logger.warning(
                        "WebhookTransport: %r rejected batch of %d events with HTTP %d; "
                        "not retrying",
                        self._url,
                        len(events),
                        status,
                    )
                    return
                logger.debug(
                    "WebhookTransport: attempt %d posting to %r returned HTTP %d",
                    attempt + 1,
                    self._url,
                    status,
                )
            if attempt < self._max_retries:
                time.sleep(backoff)
                backoff = min(backoff * 2, _WEBHOOK_RETRY_MAX_S)
        logger.warning(
            "WebhookTransport: giving up after %d retries posting to %r",
            self._max_retries,
            self._url,
        )

546 

547 

def _make_seed_callback() -> Callable[[_SocketClient], None]:
    """Build an ``on_connect`` hook that replays running-loop state to a new client.

    The returned callback asks `list_running_loops` for the loops under
    ``.loops`` (relative to the current working directory) and enqueues one
    ``state_change`` event per loop on the client's outbound queue.
    """
    # Imported here rather than at module scope, as in the original code.
    from little_loops.fsm.persistence import list_running_loops

    def _seed(client: _SocketClient) -> None:
        for loop_state in list_running_loops(Path(".loops")):
            snapshot = {"event": "state_change", **loop_state.to_dict()}
            line = json.dumps(snapshot) + "\n"
            encoded = line.encode("utf-8")
            try:
                client.queue.put_nowait(encoded)
            except Full:
                # Seeding is best-effort; an already-full queue just skips this entry.
                pass

    return _seed

562 

563 

# Transport names accepted in `config.transports`. `wire_transports` only tests
# membership in this mapping; the values mirror the keys and are not read there.
_TRANSPORT_REGISTRY: dict[str, str] = {
    "jsonl": "jsonl",
    "otel": "otel",
    "socket": "socket",
    "sqlite": "sqlite",
    "webhook": "webhook",
}

571 

572 

def wire_transports(
    bus: EventBus,
    config: EventsConfig,
    log_dir: Path | None = None,
) -> None:
    """Attach every transport listed in `config.transports` to `bus`.

    A name that is not a known transport is logged and skipped instead of
    raising, so a typo in user config cannot stop the loop from starting. The
    ``webhook`` transport is likewise skipped with a warning when no URL is
    configured. The ``socket`` transport is the deliberate exception: on a
    platform without ``AF_UNIX`` it raises `RuntimeError`, because silently
    ignoring the request would be harder to diagnose.

    Args:
        bus: EventBus that receives the transports.
        config: EventsConfig naming the transports and holding their settings.
        log_dir: Directory for files created by the built-in transports.
            Defaults to ``.ll`` relative to the current working directory.

    Raises:
        RuntimeError: If ``socket`` is requested where ``AF_UNIX`` is missing.
    """
    root = Path(".ll") if log_dir is None else log_dir

    for transport_name in config.transports:
        if transport_name not in _TRANSPORT_REGISTRY:
            logger.warning("Unknown transport %r; skipping", transport_name)
            continue

        sink: Transport
        if transport_name == "jsonl":
            sink = JsonlTransport(root / "events.jsonl")
        elif transport_name == "otel":
            sink = OTelTransport(
                endpoint=config.otel.endpoint,
                service_name=config.otel.service_name,
            )
        elif transport_name == "socket":
            if not hasattr(socket, "AF_UNIX"):
                raise RuntimeError(
                    "UnixSocketTransport requires AF_UNIX, which is not available on this "
                    'platform (e.g. Windows). Remove "socket" from events.transports or '
                    'use a different transport such as "jsonl".'
                )
            socket_path = _resolve_socket_path(config.socket.path, root)
            sink = UnixSocketTransport(
                socket_path,
                config.socket.max_clients,
                on_connect=_make_seed_callback(),
            )
        elif transport_name == "sqlite":
            # Imported only when the sqlite transport is actually requested.
            from little_loops.session_store import SQLiteTransport

            sink = SQLiteTransport(root / "session.db")
        elif transport_name == "webhook":
            if config.webhook.url is None:
                logger.warning("WebhookTransport: events.webhook.url is None; skipping")
                continue
            sink = WebhookTransport(
                url=config.webhook.url,
                batch_ms=config.webhook.batch_ms,
                headers=config.webhook.headers,
                max_retries=3,
            )
        else:
            # Known to the registry but with no constructor branch: nothing to add.
            continue

        bus.add_transport(sink)

638 

639 

640def _resolve_socket_path(configured: str, base: Path) -> Path: 

641 """Resolve a configured socket path against the per-call log_dir. 

642 

643 The default config value is ``.ll/events.sock``. When `wire_transports` is 

644 given a custom ``log_dir`` (e.g. a tmp dir in tests), the socket should land 

645 inside that directory rather than literally at ``.ll/events.sock`` on disk. 

646 Mirroring the JsonlTransport behaviour: strip the ``.ll/`` prefix and treat 

647 the remainder as relative to ``base``. Absolute paths are honored as-is. 

648 """ 

649 p = Path(configured) 

650 if p.is_absolute(): 

651 return p 

652 if p.parts and p.parts[0] == ".ll": 

653 return base.joinpath(*p.parts[1:]) 

654 return base / p