weflayr.sdk.helpers
Weflayr SDK — provider-agnostic telemetry helpers.
This module contains the generic building blocks shared by every connector.
Individual connectors (e.g. weflayr.sdk.mistralai) import from here and
only implement the provider-specific bits.
Available helpers
- post_sync(): fire-and-forget HTTP POST (sync)
- post_async(): fire-and-forget HTTP POST (async)
- track_sync(): wrap a sync call with before/after telemetry
- track_async(): wrap an async call with before/after telemetry
```python
"""Weflayr SDK: provider-agnostic telemetry helpers.

This module contains the generic building blocks shared by every connector.
Individual connectors (e.g. ``weflayr.sdk.mistralai``) import from here and
only implement the provider-specific bits.

Available helpers
-----------------
- :func:`post_sync`: fire-and-forget HTTP POST (sync)
- :func:`post_async`: fire-and-forget HTTP POST (async)
- :func:`track_sync`: wrap a sync call with before/after telemetry
- :func:`track_async`: wrap an async call with before/after telemetry
"""

from __future__ import annotations

import os
import threading
import time
import uuid
from typing import Any

import httpx

INTAKE_URL: str = os.environ.get("WEFLAYR_INTAKE_URL", "http://127.0.0.1:8123")
"""Default intake API URL, overridden by the ``WEFLAYR_INTAKE_URL`` env var."""


def post_sync(url: str, payload: dict) -> None:
    """POST *payload* to *url* in a background daemon thread.

    Returns immediately; never blocks the caller or raises exceptions.

    Args:
        url: HTTP endpoint to POST to.
        payload: JSON-serialisable dict sent as the request body.
    """
    def _send() -> None:
        try:
            with httpx.Client(timeout=5.0) as client:
                client.post(url, json=payload)
        except Exception:
            pass

    threading.Thread(target=_send, daemon=True).start()


async def post_async(url: str, payload: dict) -> None:
    """Await a POST of *payload* to *url*, swallowing all exceptions.

    Args:
        url: HTTP endpoint to POST to.
        payload: JSON-serialisable dict sent as the request body.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            await client.post(url, json=payload)
    except Exception:
        pass


def track_sync(url: str, call: str, before: dict, fn, after_extra) -> Any:
    """Wrap a synchronous call with before/after telemetry events.

    A UUID is generated per invocation and included in both events so the
    intake API can correlate them.

    Args:
        url: Intake API endpoint.
        call: Event name prefix (e.g. ``"chat.complete"``).
            Produces ``"<call>.before"`` and ``"<call>.after"`` event types.
        before: Extra fields merged into the *before* (and *after*) payload.
        fn: Zero-argument callable that performs the actual provider call.
        after_extra: Callable ``(response) -> dict`` returning fields to merge
            into the *after* payload (e.g. token counts).

    Returns:
        The value returned by *fn*, unmodified.
    """
    request_id = str(uuid.uuid4())
    post_sync(url, {"event_id": request_id, "event_type": f"{call}.before", **before})

    start = time.perf_counter()
    response = fn()
    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)

    post_sync(url, {
        "event_id": request_id,
        "event_type": f"{call}.after",
        **before,
        "elapsed_ms": elapsed_ms,
        **after_extra(response),
    })

    return response


async def track_async(url: str, call: str, before: dict, fn, after_extra) -> Any:
    """Wrap an async call with before/after telemetry events.

    Async equivalent of :func:`track_sync`. The same UUID is included in both
    events for correlation.

    Args:
        url: Intake API endpoint.
        call: Event name prefix (e.g. ``"chat.complete_async"``).
            Produces ``"<call>.before"`` and ``"<call>.after"`` event types.
        before: Extra fields merged into the *before* (and *after*) payload.
        fn: Zero-argument async callable that performs the actual provider call.
        after_extra: Callable ``(response) -> dict`` returning fields to merge
            into the *after* payload (e.g. token counts).

    Returns:
        The value returned by *fn*, unmodified.
    """
    request_id = str(uuid.uuid4())
    await post_async(url, {"event_id": request_id, "event_type": f"{call}.before", **before})

    start = time.perf_counter()
    response = await fn()
    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)

    await post_async(url, {
        "event_id": request_id,
        "event_type": f"{call}.after",
        **before,
        "elapsed_ms": elapsed_ms,
        **after_extra(response),
    })

    return response
```
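INTAKE_URL: str = os.environ.get("WEFLAYR_INTAKE_URL", "http://127.0.0.1:8123")
Default intake API URL. It is http://127.0.0.1:8123 unless the WEFLAYR_INTAKE_URL environment variable is set, and the value is read once, when the module is imported.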
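The lookup behind this constant can be sketched in isolation. In the snippet below, `resolve_intake_url` and `DEFAULT_INTAKE_URL` are hypothetical names introduced only for illustration (the module simply evaluates the expression at import time), and `https://intake.example.com` is a placeholder URL.

```python
import os
from typing import Mapping

# Default value as it appears in the module source.
DEFAULT_INTAKE_URL = "http://127.0.0.1:8123"


def resolve_intake_url(environ: Mapping[str, str] = os.environ) -> str:
    """Hypothetical helper: the same lookup the module performs at import time."""
    return environ.get("WEFLAYR_INTAKE_URL", DEFAULT_INTAKE_URL)


# Without the variable, the local default is used.
local_url = resolve_intake_url({})

# With the variable set, its value takes precedence (placeholder URL).
custom_url = resolve_intake_url({"WEFLAYR_INTAKE_URL": "https://intake.example.com"})
```

Because the real constant is evaluated when the module is imported, the variable has to be set before `weflayr.sdk.helpers` is first imported for the override to take effect.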
```python
def post_sync(url: str, payload: dict) -> None:
    """POST *payload* to *url* in a background daemon thread.

    Returns immediately; never blocks the caller or raises exceptions.

    Args:
        url: HTTP endpoint to POST to.
        payload: JSON-serialisable dict sent as the request body.
    """
    def _send() -> None:
        try:
            with httpx.Client(timeout=5.0) as client:
                client.post(url, json=payload)
        except Exception:
            pass

    threading.Thread(target=_send, daemon=True).start()
```
POST payload to url in a background daemon thread.
Returns immediately — never blocks the caller or raises exceptions.
Args:
url: HTTP endpoint to POST to.
payload: JSON-serialisable dict sent as the request body.
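The daemon-thread pattern used by post_sync() can be tried without a network. The sketch below is not the SDK's code: `fire_and_forget`, `slow_send`, and `failing_send` are hypothetical stand-ins, and, unlike post_sync(), the helper returns the thread so that the demo can wait for it.

```python
import threading
import time


def fire_and_forget(send) -> threading.Thread:
    """Run send() in a daemon thread and swallow any exception it raises.

    Assumption for the demo only: the thread is returned so it can be joined;
    post_sync() itself returns None.
    """
    def _run() -> None:
        try:
            send()
        except Exception:
            pass  # a telemetry failure must never reach the caller

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    return thread


delivered = []


def slow_send() -> None:
    time.sleep(0.5)  # stand-in for network latency
    delivered.append("event")


def failing_send() -> None:
    raise ConnectionError("intake unreachable")


start = time.perf_counter()
ok_thread = fire_and_forget(slow_send)
bad_thread = fire_and_forget(failing_send)
caller_blocked_s = time.perf_counter() - start  # time spent in the caller only

# Wait for both background sends so the results can be inspected.
ok_thread.join()
bad_thread.join()
```

One consequence of using a daemon thread is that a send still in flight when the interpreter exits may be dropped, so short-lived scripts can lose their last events.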
```python
async def post_async(url: str, payload: dict) -> None:
    """Await a POST of *payload* to *url*, swallowing all exceptions.

    Args:
        url: HTTP endpoint to POST to.
        payload: JSON-serialisable dict sent as the request body.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            await client.post(url, json=payload)
    except Exception:
        pass
```
Await a POST of payload to url, swallowing all exceptions.
Args:
url: HTTP endpoint to POST to.
payload: JSON-serialisable dict sent as the request body.
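Unlike post_sync(), this coroutine is awaited, so the caller waits until the POST completes, fails, or times out. The sketch below illustrates that behaviour with stand-ins rather than real HTTP: `post_best_effort`, `slow_send`, and `failing_send` are hypothetical names, not part of the SDK.

```python
import asyncio
import time


async def post_best_effort(send) -> None:
    """Await send() and swallow any Exception, as post_async() does around its POST."""
    try:
        await send()
    except Exception:
        pass  # a telemetry failure must never reach the caller


async def slow_send() -> None:
    await asyncio.sleep(0.1)  # stand-in for network latency


async def failing_send() -> None:
    raise ConnectionError("intake unreachable")


async def main() -> float:
    start = time.perf_counter()
    await post_best_effort(slow_send)     # the caller waits for this to finish
    await post_best_effort(failing_send)  # the error is swallowed here
    return time.perf_counter() - start


waited_s = asyncio.run(main())
```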
```python
def track_sync(url: str, call: str, before: dict, fn, after_extra) -> Any:
    """Wrap a synchronous call with before/after telemetry events.

    A UUID is generated per invocation and included in both events so the
    intake API can correlate them.

    Args:
        url: Intake API endpoint.
        call: Event name prefix (e.g. ``"chat.complete"``).
            Produces ``"<call>.before"`` and ``"<call>.after"`` event types.
        before: Extra fields merged into the *before* (and *after*) payload.
        fn: Zero-argument callable that performs the actual provider call.
        after_extra: Callable ``(response) -> dict`` returning fields to merge
            into the *after* payload (e.g. token counts).

    Returns:
        The value returned by *fn*, unmodified.
    """
    request_id = str(uuid.uuid4())
    post_sync(url, {"event_id": request_id, "event_type": f"{call}.before", **before})

    start = time.perf_counter()
    response = fn()
    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)

    post_sync(url, {
        "event_id": request_id,
        "event_type": f"{call}.after",
        **before,
        "elapsed_ms": elapsed_ms,
        **after_extra(response),
    })

    return response
```
Wrap a synchronous call with before/after telemetry events.
A UUID is generated per invocation and included in both events so the intake API can correlate them.
Args:
url: Intake API endpoint.
call: Event name prefix (e.g. "chat.complete").
Produces "<call>.before" and "<call>.after" event types.
before: Extra fields merged into the before (and after) payload.
fn: Zero-argument callable that performs the actual provider call.
after_extra: Callable (response) -> dict returning fields to merge
into the after payload (e.g. token counts).
Returns: The value returned by fn, unmodified.
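To make the shape of the two events concrete, the sketch below repeats the steps of track_sync() but records the payloads in a list instead of POSTing them. `record`, `track_sync_sketch`, `fake_provider_call`, and the `"example-model"` value are hypothetical and exist only for this illustration.

```python
import time
import uuid
from typing import Any, Callable

events = []  # captured payloads, in place of HTTP requests


def record(url: str, payload: dict) -> None:
    """Stand-in for post_sync(): store the payload rather than POSTing it."""
    events.append(payload)


def track_sync_sketch(url: str, call: str, before: dict,
                      fn: Callable[[], Any],
                      after_extra: Callable[[Any], dict]) -> Any:
    """Same steps as track_sync(), with record() in place of post_sync()."""
    request_id = str(uuid.uuid4())
    record(url, {"event_id": request_id, "event_type": f"{call}.before", **before})

    start = time.perf_counter()
    response = fn()
    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)

    record(url, {
        "event_id": request_id,
        "event_type": f"{call}.after",
        **before,
        "elapsed_ms": elapsed_ms,
        **after_extra(response),
    })
    return response


def fake_provider_call() -> dict:
    """Hypothetical provider call returning a response with token usage."""
    return {"text": "hello", "usage": {"total_tokens": 42}}


response = track_sync_sketch(
    "http://127.0.0.1:8123",
    "chat.complete",
    {"model": "example-model"},
    fake_provider_call,
    lambda r: {"total_tokens": r["usage"]["total_tokens"]},
)

before_event, after_event = events
```

Since `before` and the result of `after_extra` are unpacked after the fixed keys, a colliding key such as `event_type` in `before`, or `elapsed_ms` in the `after_extra` result, would overwrite the generated value. Connectors would presumably want to avoid those names.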
```python
async def track_async(url: str, call: str, before: dict, fn, after_extra) -> Any:
    """Wrap an async call with before/after telemetry events.

    Async equivalent of :func:`track_sync`. The same UUID is included in both
    events for correlation.

    Args:
        url: Intake API endpoint.
        call: Event name prefix (e.g. ``"chat.complete_async"``).
            Produces ``"<call>.before"`` and ``"<call>.after"`` event types.
        before: Extra fields merged into the *before* (and *after*) payload.
        fn: Zero-argument async callable that performs the actual provider call.
        after_extra: Callable ``(response) -> dict`` returning fields to merge
            into the *after* payload (e.g. token counts).

    Returns:
        The value returned by *fn*, unmodified.
    """
    request_id = str(uuid.uuid4())
    await post_async(url, {"event_id": request_id, "event_type": f"{call}.before", **before})

    start = time.perf_counter()
    response = await fn()
    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)

    await post_async(url, {
        "event_id": request_id,
        "event_type": f"{call}.after",
        **before,
        "elapsed_ms": elapsed_ms,
        **after_extra(response),
    })

    return response
```
Wrap an async call with before/after telemetry events.
Async equivalent of track_sync(). The same UUID is included in both
events for correlation.
Args:
url: Intake API endpoint.
call: Event name prefix (e.g. "chat.complete_async").
Produces "<call>.before" and "<call>.after" event types.
before: Extra fields merged into the before (and after) payload.
fn: Zero-argument async callable that performs the actual provider call.
after_extra: Callable (response) -> dict returning fields to merge
into the after payload (e.g. token counts).
Returns: The value returned by fn, unmodified.
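One behaviour that follows from the source but is easy to miss: if `fn` raises, the exception propagates to the caller and only the before event is emitted. The sketch below reproduces the steps of track_async() with an in-memory recorder to show this. `record`, `track_async_sketch`, and `failing_provider_call` are hypothetical stand-ins, not SDK functions.

```python
import asyncio
import time
import uuid

events = []  # captured payloads, in place of HTTP requests


async def record(url: str, payload: dict) -> None:
    """Stand-in for post_async(): store the payload rather than POSTing it."""
    events.append(payload)


async def track_async_sketch(url, call, before, fn, after_extra):
    """Same steps as track_async(), with record() in place of post_async()."""
    request_id = str(uuid.uuid4())
    await record(url, {"event_id": request_id, "event_type": f"{call}.before", **before})

    start = time.perf_counter()
    response = await fn()  # an exception here skips the after event
    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)

    await record(url, {
        "event_id": request_id,
        "event_type": f"{call}.after",
        **before,
        "elapsed_ms": elapsed_ms,
        **after_extra(response),
    })
    return response


async def failing_provider_call():
    """Hypothetical provider call that fails."""
    raise TimeoutError("provider timed out")


async def main() -> str:
    try:
        await track_async_sketch(
            "http://127.0.0.1:8123",
            "chat.complete_async",
            {"model": "example-model"},
            failing_provider_call,
            lambda r: {},
        )
    except TimeoutError as exc:
        return f"caller saw: {exc}"
    return "no error"


outcome = asyncio.run(main())
event_types = [event["event_type"] for event in events]
```

An intake service consuming these events would therefore need to tolerate before events that never receive a matching after event.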