weflayr.sdk.helpers

Weflayr SDK — provider-agnostic telemetry helpers.

This module contains the generic building blocks shared by every connector. Individual connectors (e.g. weflayr.sdk.mistralai) import from here and only implement the provider-specific bits.

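Available helpers

- post_sync — fire-and-forget HTTP POST sent from a background thread (sync)
- post_async — awaited, best-effort HTTP POST that swallows errors (async)
- track_sync — wrap a sync call with before/after telemetry
- track_async — wrap an async call with before/after telemetry
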
"""Weflayr SDK — provider-agnostic telemetry helpers.

This module contains the generic building blocks shared by every connector.
Individual connectors (e.g. ``weflayr.sdk.mistralai``) import from here and
only implement the provider-specific bits.

Available helpers
-----------------
- :func:`post_sync`    — fire-and-forget HTTP POST from a background thread (sync)
- :func:`post_async`   — awaited, best-effort HTTP POST that swallows errors (async)
- :func:`track_sync`   — wrap a sync call with before/after telemetry
- :func:`track_async`  — wrap an async call with before/after telemetry
"""
 14
from __future__ import annotations

import logging
import os
import threading
import time
import uuid
from typing import Any

import httpx
 24
 25INTAKE_URL: str = os.environ.get("WEFLAYR_INTAKE_URL", "http://127.0.0.1:8123")
 26"""Default intake API URL, overridden by the ``WEFLAYR_INTAKE_URL`` env var."""
 27
 28
 29def post_sync(url: str, payload: dict) -> None:
 30    """POST *payload* to *url* in a background daemon thread.
 31
 32    Returns immediately — never blocks the caller or raises exceptions.
 33
 34    Args:
 35        url: HTTP endpoint to POST to.
 36        payload: JSON-serialisable dict sent as the request body.
 37    """
 38    def _send() -> None:
 39        try:
 40            with httpx.Client(timeout=5.0) as client:
 41                client.post(url, json=payload)
 42        except Exception:
 43            pass
 44
 45    threading.Thread(target=_send, daemon=True).start()
 46
 47
 48async def post_async(url: str, payload: dict) -> None:
 49    """Await a POST of *payload* to *url*, swallowing all exceptions.
 50
 51    Args:
 52        url: HTTP endpoint to POST to.
 53        payload: JSON-serialisable dict sent as the request body.
 54    """
 55    try:
 56        async with httpx.AsyncClient(timeout=5.0) as client:
 57            await client.post(url, json=payload)
 58    except Exception:
 59        pass
 60
 61
 62def track_sync(url: str, call: str, before: dict, fn, after_extra) -> Any:
 63    """Wrap a synchronous call with before/after telemetry events.
 64
 65    A UUID is generated per invocation and included in both events so the
 66    intake API can correlate them.
 67
 68    Args:
 69        url: Intake API endpoint.
 70        call: Event name prefix (e.g. ``"chat.complete"``).
 71            Produces ``"<call>.before"`` and ``"<call>.after"`` event types.
 72        before: Extra fields merged into the *before* (and *after*) payload.
 73        fn: Zero-argument callable that performs the actual provider call.
 74        after_extra: Callable ``(response) -> dict`` returning fields to merge
 75            into the *after* payload (e.g. token counts).
 76
 77    Returns:
 78        The value returned by *fn*, unmodified.
 79    """
 80    request_id = str(uuid.uuid4())
 81    post_sync(url, {"event_id": request_id, "event_type": f"{call}.before", **before})
 82
 83    start = time.perf_counter()
 84    response = fn()
 85    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)
 86
 87    post_sync(url, {"event_id": request_id, "event_type": f"{call}.after", **before, "elapsed_ms": elapsed_ms, **after_extra(response)})
 88
 89    return response
 90
 91
 92async def track_async(url: str, call: str, before: dict, fn, after_extra) -> Any:
 93    """Wrap an async call with before/after telemetry events.
 94
 95    Async equivalent of :func:`track_sync`. The same UUID is included in both
 96    events for correlation.
 97
 98    Args:
 99        url: Intake API endpoint.
100        call: Event name prefix (e.g. ``"chat.complete_async"``).
101            Produces ``"<call>.before"`` and ``"<call>.after"`` event types.
102        before: Extra fields merged into the *before* (and *after*) payload.
103        fn: Zero-argument async callable that performs the actual provider call.
104        after_extra: Callable ``(response) -> dict`` returning fields to merge
105            into the *after* payload (e.g. token counts).
106
107    Returns:
108        The value returned by *fn*, unmodified.
109    """
110    request_id = str(uuid.uuid4())
111    await post_async(url, {"event_id": request_id, "event_type": f"{call}.before", **before})
112
113    start = time.perf_counter()
114    response = await fn()
115    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)
116
117    await post_async(url, {"event_id": request_id, "event_type": f"{call}.after", **before, "elapsed_ms": elapsed_ms, **after_extra(response)})
118
119    return response
INTAKE_URL: str = 'http://127.0.0.1:8123'

Default intake API URL, overridden by the WEFLAYR_INTAKE_URL env var.

def post_sync(url: str, payload: dict) -> None:
def post_sync(url: str, payload: dict) -> None:
    """POST *payload* to *url* in a background daemon thread.

    Returns immediately — never blocks the caller or raises exceptions.
    Delivery is best-effort: the HTTP response is not inspected, and any
    exception raised while sending (or while starting the thread) is logged
    at DEBUG level and otherwise ignored.

    Args:
        url: HTTP endpoint to POST to.
        payload: JSON-serialisable dict sent as the request body.
    """
    def _send() -> None:
        try:
            with httpx.Client(timeout=5.0) as client:
                client.post(url, json=payload)
        except Exception:
            # Telemetry must never disturb the host application; keep a
            # trace for debugging instead of swallowing silently.
            logging.getLogger(__name__).debug(
                "telemetry POST to %s failed", url, exc_info=True
            )

    try:
        threading.Thread(target=_send, daemon=True).start()
    except RuntimeError:
        # Thread.start() raises RuntimeError when no new thread can be
        # created; honour the "never raises" contract and drop the event.
        logging.getLogger(__name__).debug(
            "could not start telemetry thread for %s", url, exc_info=True
        )

POST payload to url in a background daemon thread.

Returns immediately — never blocks the caller or raises exceptions.

Args: url: HTTP endpoint to POST to. payload: JSON-serialisable dict sent as the request body.

async def post_async(url: str, payload: dict) -> None:
async def post_async(url: str, payload: dict) -> None:
    """Await a best-effort POST of *payload* to *url*.

    Unlike :func:`post_sync`, the request is awaited, so the caller waits
    until it completes, fails, or hits the 5-second client timeout. Any
    ``Exception`` raised along the way is logged at DEBUG level and
    swallowed; task cancellation is not an ``Exception`` subclass and
    therefore still propagates. The HTTP response is not inspected.

    Args:
        url: HTTP endpoint to POST to.
        payload: JSON-serialisable dict sent as the request body.
    """
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            await client.post(url, json=payload)
    except Exception:
        # Telemetry must never disturb the host application; keep a trace
        # for debugging instead of swallowing silently.
        logging.getLogger(__name__).debug(
            "telemetry POST to %s failed", url, exc_info=True
        )

Await a POST of payload to url, swallowing all exceptions.

Args: url: HTTP endpoint to POST to. payload: JSON-serialisable dict sent as the request body.

def track_sync(url: str, call: str, before: dict, fn, after_extra) -> Any:
def track_sync(url: str, call: str, before: dict, fn, after_extra) -> Any:
    """Wrap a synchronous call with before/after telemetry events.

    A UUID is generated per invocation and included in both events so the
    intake API can correlate them.

    If *fn* raises, the exception propagates to the caller and no *after*
    event is sent. If *after_extra* raises or does not return a mapping,
    the failure is logged at DEBUG level and the *after* event is sent
    without the extra fields — a telemetry bug never costs the caller a
    successful response.

    Args:
        url: Intake API endpoint.
        call: Event name prefix (e.g. ``"chat.complete"``).
            Produces ``"<call>.before"`` and ``"<call>.after"`` event types.
        before: Extra fields merged into the *before* (and *after*) payload.
        fn: Zero-argument callable that performs the actual provider call.
        after_extra: Callable ``(response) -> dict`` returning fields to merge
            into the *after* payload (e.g. token counts).

    Returns:
        The value returned by *fn*, unmodified.
    """
    request_id = str(uuid.uuid4())
    post_sync(url, {"event_id": request_id, "event_type": f"{call}.before", **before})

    start = time.perf_counter()
    response = fn()
    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)

    try:
        extra = {**after_extra(response)}
    except Exception:
        # The provider call already succeeded; do not let a faulty
        # extractor turn that into a failure for the caller.
        logging.getLogger(__name__).debug(
            "after_extra failed for %s", call, exc_info=True
        )
        extra = {}

    post_sync(
        url,
        {
            "event_id": request_id,
            "event_type": f"{call}.after",
            **before,
            "elapsed_ms": elapsed_ms,
            **extra,
        },
    )

    return response

Wrap a synchronous call with before/after telemetry events.

A UUID is generated per invocation and included in both events so the intake API can correlate them.

Args: url: Intake API endpoint. call: Event name prefix (e.g. "chat.complete"). Produces "<call>.before" and "<call>.after" event types. before: Extra fields merged into the before (and after) payload. fn: Zero-argument callable that performs the actual provider call. after_extra: Callable (response) -> dict returning fields to merge into the after payload (e.g. token counts).

Returns: The value returned by fn, unmodified.

async def track_async(url: str, call: str, before: dict, fn, after_extra) -> Any:
 93async def track_async(url: str, call: str, before: dict, fn, after_extra) -> Any:
 94    """Wrap an async call with before/after telemetry events.
 95
 96    Async equivalent of :func:`track_sync`. The same UUID is included in both
 97    events for correlation.
 98
 99    Args:
100        url: Intake API endpoint.
101        call: Event name prefix (e.g. ``"chat.complete_async"``).
102            Produces ``"<call>.before"`` and ``"<call>.after"`` event types.
103        before: Extra fields merged into the *before* (and *after*) payload.
104        fn: Zero-argument async callable that performs the actual provider call.
105        after_extra: Callable ``(response) -> dict`` returning fields to merge
106            into the *after* payload (e.g. token counts).
107
108    Returns:
109        The value returned by *fn*, unmodified.
110    """
111    request_id = str(uuid.uuid4())
112    await post_async(url, {"event_id": request_id, "event_type": f"{call}.before", **before})
113
114    start = time.perf_counter()
115    response = await fn()
116    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)
117
118    await post_async(url, {"event_id": request_id, "event_type": f"{call}.after", **before, "elapsed_ms": elapsed_ms, **after_extra(response)})
119
120    return response

Wrap an async call with before/after telemetry events.

Async equivalent of track_sync(). The same UUID is included in both events for correlation.

Args: url: Intake API endpoint. call: Event name prefix (e.g. "chat.complete_async"). Produces "<call>.before" and "<call>.after" event types. before: Extra fields merged into the before (and after) payload. fn: Zero-argument async callable that performs the actual provider call. after_extra: Callable (response) -> dict returning fields to merge into the after payload (e.g. token counts).

Returns: The value returned by fn, unmodified.