Coverage for python / weflayr / sdk / helpers.py: 100%
63 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 15:28 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-23 15:28 +0100
"""Weflayr SDK — provider-agnostic telemetry helpers.

This module contains the generic building blocks shared by every connector.
Individual connectors (e.g. ``weflayr.sdk.mistralai``) import from here and
only implement the provider-specific bits.

Available helpers
-----------------
- :func:`post_sync` — fire-and-forget HTTP POST from a background thread (sync)
- :func:`post_async` — awaited, best-effort HTTP POST; errors are swallowed (async)
- :func:`track_sync` — wrap a sync call with before/after/error telemetry
- :func:`track_async` — wrap an async call with before/after/error telemetry
"""
from __future__ import annotations

import logging
import os
import threading
import time
import uuid
from typing import Any

import httpx
# NOTE: all three defaults are read from the environment once, when this
# module is imported; changing the variables afterwards has no effect on them.

INTAKE_URL: str = os.environ.get("WEFLAYR_INTAKE_URL", "http://127.0.0.1:8123")
"""Default intake API URL, overridden by the ``WEFLAYR_INTAKE_URL`` env var."""

CLIENT_ID: str = os.environ.get("WEFLAYR_CLIENT_ID", "unknown_client")
"""Default client ID, overridden by the ``WEFLAYR_CLIENT_ID`` env var."""

CLIENT_SECRET: str = os.environ.get("WEFLAYR_CLIENT_SECRET", "")
"""Default bearer token, overridden by the ``WEFLAYR_CLIENT_SECRET`` env var.

Falls back to the empty string when the variable is not set.
"""
35def _build_url(base_url: str, client_id: str) -> str:
36 """Build the intake endpoint URL for a given client.
38 Args:
39 base_url: Intake API base URL (e.g. ``"http://127.0.0.1:8000"``).
40 client_id: Client identifier included in the path.
42 Returns:
43 ``"{base_url}/{client_id}/"``
44 """
45 return f"{base_url.rstrip('/')}/{client_id}/"
48def _auth_headers(bearer_token: str) -> dict:
49 """Build the Authorization header dict.
51 Args:
52 bearer_token: The bearer token to include.
54 Returns:
55 ``{"Authorization": "Bearer {bearer_token}"}``
56 """
57 return {"Authorization": f"Bearer {bearer_token}"}
def post_sync(url: str, payload: dict, headers: dict | None = None) -> None:
    """POST *payload* to *url* in a background daemon thread.

    Returns immediately — never blocks the caller or raises exceptions.
    Delivery is best-effort: a failed request is only reported through a
    DEBUG-level log record, and the HTTP response is discarded.

    Args:
        url: HTTP endpoint to POST to.
        payload: JSON-serialisable dict sent as the request body.
        headers: Optional HTTP headers (e.g. Authorization).
    """
    def _send() -> None:
        try:
            # A short-lived client per event; the 5 s timeout bounds how long
            # the worker thread can linger on an unreachable endpoint.
            with httpx.Client(timeout=5.0) as client:
                client.post(url, json=payload, headers=headers or {})
        except Exception:
            # Telemetry must never break the host application, so the failure
            # is swallowed — but leave a trace for anyone debugging delivery.
            logging.getLogger(__name__).debug(
                "telemetry POST to %s failed", url, exc_info=True
            )

    try:
        # daemon=True: a pending telemetry request must not keep the
        # interpreter alive at exit.
        threading.Thread(target=_send, daemon=True).start()
    except RuntimeError:
        # Thread.start() raises RuntimeError when a new thread cannot be
        # created; honour the "never raises" contract and drop the event.
        logging.getLogger(__name__).debug(
            "could not start telemetry thread for %s", url, exc_info=True
        )
async def post_async(url: str, payload: dict, headers: dict | None = None) -> None:
    """Await a POST of *payload* to *url*, swallowing all exceptions.

    Unlike :func:`post_sync`, this coroutine completes only once the request
    has finished or failed (bounded by the 5 s client timeout).  Failures are
    reported through a DEBUG-level log record only.  Only ``Exception``
    subclasses are swallowed; ``BaseException`` subclasses still propagate.

    Args:
        url: HTTP endpoint to POST to.
        payload: JSON-serialisable dict sent as the request body.
        headers: Optional HTTP headers (e.g. Authorization).
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            await client.post(url, json=payload, headers=headers or {})
    except Exception:
        # Best-effort telemetry: never surface delivery problems to the
        # caller, but keep them discoverable in the debug log.
        logging.getLogger(__name__).debug(
            "telemetry POST to %s failed", url, exc_info=True
        )
95def _error_payload(exc: Exception) -> dict:
96 """Extract a provider-agnostic error dict from any exception.
98 Args:
99 exc: The exception raised by the provider SDK.
101 Returns:
102 A dict with ``error_type``, ``error_message``, and ``status_code``.
103 """
104 return {
105 "error_type": type(exc).__name__,
106 "error_message": str(exc),
107 "status_code": getattr(exc, "status_code", None),
108 }
def track_sync(
    url: str,
    call: str,
    before: dict,
    fn,
    after_extra,
    *,
    client_id: str = CLIENT_ID,
    bearer_token: str = CLIENT_SECRET,
) -> Any:
    """Wrap a synchronous call with before/after/error telemetry events.

    Builds ``{url}/{client_id}/`` as the endpoint and sends
    ``Authorization: Bearer {bearer_token}`` on every request.  All events of
    one invocation share the same freshly generated ``event_id``.

    Events emitted:

    - ``<call>.before`` — always, before the provider call
    - ``<call>.after``  — on success, with ``elapsed_ms`` and the fields
      returned by *after_extra*
    - ``<call>.error``  — on failure, with ``elapsed_ms`` and error details;
      the original exception is always re-raised after the event is sent

    Args:
        url: Intake API base URL.
        call: Event name prefix (e.g. ``"chat.complete"``).
        before: Extra fields merged into every emitted payload.
        fn: Zero-argument callable that performs the actual provider call.
        after_extra: Callable ``(response) -> dict`` returning fields to merge
            into the *after* payload (e.g. token counts).  If it raises or
            returns nothing, the *after* event is sent without extra fields.
        client_id: Client identifier used in the endpoint path.
        bearer_token: Bearer token sent in the Authorization header.

    Returns:
        The value returned by *fn*, unmodified.

    Raises:
        Exception: Whatever *fn* raises, unmodified.
    """
    endpoint = _build_url(url, client_id)
    headers = _auth_headers(bearer_token)
    request_id = str(uuid.uuid4())

    post_sync(
        endpoint,
        {"event_id": request_id, "event_type": f"{call}.before", **before},
        headers,
    )

    start = time.perf_counter()
    try:
        response = fn()
    except Exception as exc:
        elapsed_ms = round((time.perf_counter() - start) * 1000, 1)
        post_sync(
            endpoint,
            {
                "event_id": request_id,
                "event_type": f"{call}.error",
                **before,
                "elapsed_ms": elapsed_ms,
                **_error_payload(exc),
            },
            headers,
        )
        raise

    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)

    # The provider call has already succeeded at this point: a bug in the
    # telemetry extractor must not turn that success into a failure for the
    # caller, nor suppress the ``.after`` event.
    try:
        extra = dict(after_extra(response) or {})
    except Exception:
        logging.getLogger(__name__).debug(
            "after_extra failed for %s; sending .after without extras",
            call,
            exc_info=True,
        )
        extra = {}

    post_sync(
        endpoint,
        {
            "event_id": request_id,
            "event_type": f"{call}.after",
            **before,
            "elapsed_ms": elapsed_ms,
            **extra,
        },
        headers,
    )
    return response
async def track_async(
    url: str,
    call: str,
    before: dict,
    fn,
    after_extra,
    *,
    client_id: str = CLIENT_ID,
    bearer_token: str = CLIENT_SECRET,
) -> Any:
    """Wrap an async call with before/after/error telemetry events.

    Async equivalent of :func:`track_sync`: emits ``<call>.before``, then
    either ``<call>.after`` (success) or ``<call>.error`` (failure, followed
    by re-raising the original exception), all sharing one ``event_id``.

    Each telemetry POST is awaited in-line, so the ``.before`` request
    completes (or fails) before *fn* is started.

    Args:
        url: Intake API base URL.
        call: Event name prefix (e.g. ``"chat.complete_async"``).
        before: Extra fields merged into every emitted payload.
        fn: Zero-argument async callable that performs the actual provider call.
        after_extra: Callable ``(response) -> dict`` returning fields to merge
            into the *after* payload (e.g. token counts).  If it raises or
            returns nothing, the *after* event is sent without extra fields.
        client_id: Client identifier used in the endpoint path.
        bearer_token: Bearer token sent in the Authorization header.

    Returns:
        The value returned by *fn*, unmodified.

    Raises:
        Exception: Whatever *fn* raises, unmodified.
    """
    endpoint = _build_url(url, client_id)
    headers = _auth_headers(bearer_token)
    request_id = str(uuid.uuid4())

    # NOTE(review): awaiting here puts telemetry latency on the call path;
    # confirm this ordering guarantee is wanted before making it concurrent.
    await post_async(
        endpoint,
        {"event_id": request_id, "event_type": f"{call}.before", **before},
        headers,
    )

    start = time.perf_counter()
    try:
        response = await fn()
    except Exception as exc:
        elapsed_ms = round((time.perf_counter() - start) * 1000, 1)
        await post_async(
            endpoint,
            {
                "event_id": request_id,
                "event_type": f"{call}.error",
                **before,
                "elapsed_ms": elapsed_ms,
                **_error_payload(exc),
            },
            headers,
        )
        raise

    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)

    # The provider call has already succeeded: a failing telemetry extractor
    # must neither raise into the caller nor suppress the ``.after`` event.
    try:
        extra = dict(after_extra(response) or {})
    except Exception:
        logging.getLogger(__name__).debug(
            "after_extra failed for %s; sending .after without extras",
            call,
            exc_info=True,
        )
        extra = {}

    await post_async(
        endpoint,
        {
            "event_id": request_id,
            "event_type": f"{call}.after",
            **before,
            "elapsed_ms": elapsed_ms,
            **extra,
        },
        headers,
    )
    return response