Coverage for python / weflayr / sdk / helpers.py: 100%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-23 15:28 +0100

1"""Weflayr SDK — provider-agnostic telemetry helpers. 

2 

3This module contains the generic building blocks shared by every connector. 

4Individual connectors (e.g. ``weflayr.sdk.mistralai``) import from here and 

5only implement the provider-specific bits. 

6 

7Available helpers 

8----------------- 

9- :func:`post_sync` — fire-and-forget HTTP POST (sync) 

10- :func:`post_async` — fire-and-forget HTTP POST (async) 

11- :func:`track_sync` — wrap a sync call with before/after/error telemetry 

12- :func:`track_async` — wrap an async call with before/after/error telemetry 

13""" 

14 

15from __future__ import annotations 

16 

17import os 

18import threading 

19import time 

20import uuid 

21from typing import Any 

22 

23import httpx 

24 

25INTAKE_URL: str = os.environ.get("WEFLAYR_INTAKE_URL", "http://127.0.0.1:8123") 

26"""Default intake API URL, overridden by the ``WEFLAYR_INTAKE_URL`` env var.""" 

27 

28CLIENT_ID: str = os.environ.get("WEFLAYR_CLIENT_ID", "unknown_client") 

29"""Default client ID, overridden by the ``WEFLAYR_CLIENT_ID`` env var.""" 

30 

31CLIENT_SECRET: str = os.environ.get("WEFLAYR_CLIENT_SECRET", "") 

32"""Default bearer token, overridden by the ``WEFLAYR_CLIENT_SECRET`` env var.""" 

33 

34 

35def _build_url(base_url: str, client_id: str) -> str: 

36 """Build the intake endpoint URL for a given client. 

37 

38 Args: 

39 base_url: Intake API base URL (e.g. ``"http://127.0.0.1:8000"``). 

40 client_id: Client identifier included in the path. 

41 

42 Returns: 

43 ``"{base_url}/{client_id}/"`` 

44 """ 

45 return f"{base_url.rstrip('/')}/{client_id}/" 

46 

47 

48def _auth_headers(bearer_token: str) -> dict: 

49 """Build the Authorization header dict. 

50 

51 Args: 

52 bearer_token: The bearer token to include. 

53 

54 Returns: 

55 ``{"Authorization": "Bearer {bearer_token}"}`` 

56 """ 

57 return {"Authorization": f"Bearer {bearer_token}"} 

58 

59 

60def post_sync(url: str, payload: dict, headers: dict | None = None) -> None: 

61 """POST *payload* to *url* in a background daemon thread. 

62 

63 Returns immediately — never blocks the caller or raises exceptions. 

64 

65 Args: 

66 url: HTTP endpoint to POST to. 

67 payload: JSON-serialisable dict sent as the request body. 

68 headers: Optional HTTP headers (e.g. Authorization). 

69 """ 

70 def _send() -> None: 

71 try: 

72 with httpx.Client(timeout=5.0) as client: 

73 client.post(url, json=payload, headers=headers or {}) 

74 except Exception: 

75 pass 

76 

77 threading.Thread(target=_send, daemon=True).start() 

78 

79 

80async def post_async(url: str, payload: dict, headers: dict | None = None) -> None: 

81 """Await a POST of *payload* to *url*, swallowing all exceptions. 

82 

83 Args: 

84 url: HTTP endpoint to POST to. 

85 payload: JSON-serialisable dict sent as the request body. 

86 headers: Optional HTTP headers (e.g. Authorization). 

87 """ 

88 try: 

89 async with httpx.AsyncClient(timeout=5.0) as client: 

90 await client.post(url, json=payload, headers=headers or {}) 

91 except Exception: 

92 pass 

93 

94 

95def _error_payload(exc: Exception) -> dict: 

96 """Extract a provider-agnostic error dict from any exception. 

97 

98 Args: 

99 exc: The exception raised by the provider SDK. 

100 

101 Returns: 

102 A dict with ``error_type``, ``error_message``, and ``status_code``. 

103 """ 

104 return { 

105 "error_type": type(exc).__name__, 

106 "error_message": str(exc), 

107 "status_code": getattr(exc, "status_code", None), 

108 } 

109 

110 

111def track_sync( 

112 url: str, 

113 call: str, 

114 before: dict, 

115 fn, 

116 after_extra, 

117 *, 

118 client_id: str = CLIENT_ID, 

119 bearer_token: str = CLIENT_SECRET, 

120) -> Any: 

121 """Wrap a synchronous call with before/after/error telemetry events. 

122 

123 Builds ``{url}/{client_id}/`` as the endpoint and sends 

124 ``Authorization: Bearer {bearer_token}`` on every request. 

125 

126 Events emitted: 

127 

128 - ``<call>.before`` — always, before the provider call 

129 - ``<call>.after`` — on success, with timing and token usage 

130 - ``<call>.error`` — on failure, with timing and error details; 

131 the original exception is always re-raised after the event is sent 

132 

133 Args: 

134 url: Intake API base URL. 

135 call: Event name prefix (e.g. ``"chat.complete"``). 

136 before: Extra fields merged into every emitted payload. 

137 fn: Zero-argument callable that performs the actual provider call. 

138 after_extra: Callable ``(response) -> dict`` returning fields to merge 

139 into the *after* payload (e.g. token counts). 

140 client_id: Client identifier used in the endpoint path. 

141 bearer_token: Bearer token sent in the Authorization header. 

142 

143 Returns: 

144 The value returned by *fn*, unmodified. 

145 

146 Raises: 

147 Exception: Whatever *fn* raises, unmodified. 

148 """ 

149 endpoint = _build_url(url, client_id) 

150 headers = _auth_headers(bearer_token) 

151 request_id = str(uuid.uuid4()) 

152 

153 post_sync(endpoint, {"event_id": request_id, "event_type": f"{call}.before", **before}, headers) 

154 

155 start = time.perf_counter() 

156 try: 

157 response = fn() 

158 except Exception as exc: 

159 elapsed_ms = round((time.perf_counter() - start) * 1000, 1) 

160 post_sync(endpoint, {"event_id": request_id, "event_type": f"{call}.error", 

161 **before, "elapsed_ms": elapsed_ms, **_error_payload(exc)}, headers) 

162 raise 

163 

164 elapsed_ms = round((time.perf_counter() - start) * 1000, 1) 

165 post_sync(endpoint, {"event_id": request_id, "event_type": f"{call}.after", 

166 **before, "elapsed_ms": elapsed_ms, **after_extra(response)}, headers) 

167 return response 

168 

169 

170async def track_async( 

171 url: str, 

172 call: str, 

173 before: dict, 

174 fn, 

175 after_extra, 

176 *, 

177 client_id: str = CLIENT_ID, 

178 bearer_token: str = CLIENT_SECRET, 

179) -> Any: 

180 """Wrap an async call with before/after/error telemetry events. 

181 

182 Async equivalent of :func:`track_sync`. 

183 

184 Args: 

185 url: Intake API base URL. 

186 call: Event name prefix (e.g. ``"chat.complete_async"``). 

187 before: Extra fields merged into every emitted payload. 

188 fn: Zero-argument async callable that performs the actual provider call. 

189 after_extra: Callable ``(response) -> dict`` returning fields to merge 

190 into the *after* payload (e.g. token counts). 

191 client_id: Client identifier used in the endpoint path. 

192 bearer_token: Bearer token sent in the Authorization header. 

193 

194 Returns: 

195 The value returned by *fn*, unmodified. 

196 

197 Raises: 

198 Exception: Whatever *fn* raises, unmodified. 

199 """ 

200 endpoint = _build_url(url, client_id) 

201 headers = _auth_headers(bearer_token) 

202 request_id = str(uuid.uuid4()) 

203 

204 await post_async(endpoint, {"event_id": request_id, "event_type": f"{call}.before", **before}, headers) 

205 

206 start = time.perf_counter() 

207 try: 

208 response = await fn() 

209 except Exception as exc: 

210 elapsed_ms = round((time.perf_counter() - start) * 1000, 1) 

211 await post_async(endpoint, {"event_id": request_id, "event_type": f"{call}.error", 

212 **before, "elapsed_ms": elapsed_ms, **_error_payload(exc)}, headers) 

213 raise 

214 

215 elapsed_ms = round((time.perf_counter() - start) * 1000, 1) 

216 await post_async(endpoint, {"event_id": request_id, "event_type": f"{call}.after", 

217 **before, "elapsed_ms": elapsed_ms, **after_extra(response)}, headers) 

218 return response