Coverage for src / tracekit / core / correlation.py: 97%
32 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Correlation ID management for distributed tracing.
3This module provides correlation ID generation and propagation for
4request tracing across TraceKit operations.
7Example:
8 >>> from tracekit.core.correlation import with_correlation_id, get_correlation_id
9 >>> @with_correlation_id()
10 ... def analyze_trace(data):
11 ... corr_id = get_correlation_id()
12 ... print(f"Processing with correlation ID: {corr_id}")
14References:
15 - Distributed tracing best practices
16 - Thread-local and async-safe context management
17"""
19from __future__ import annotations
21import contextvars
22import functools
23import uuid
24from collections.abc import Callable
25from typing import Any, TypeVar
27# Context variable for correlation ID (thread-safe and async-safe)
28_correlation_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
29 "correlation_id", default=None
30)
33def get_correlation_id() -> str | None:
34 """Get the current correlation ID for request tracing.
36 Returns the correlation ID from the current context, or None if
37 no correlation context is active.
39 Returns:
40 Current correlation ID (UUID string) or None.
42 Example:
43 >>> with CorrelationContext():
44 ... corr_id = get_correlation_id()
45 ... print(f"Correlation ID: {corr_id}")
47 References:
48 LOG-004: Correlation ID Injection
49 """
50 return _correlation_id.get()
53def set_correlation_id(corr_id: str) -> None:
54 """Set the correlation ID for the current context.
56 Args:
57 corr_id: Correlation ID to set (typically a UUID string).
59 Example:
60 >>> set_correlation_id("550e8400-e29b-41d4-a716-446655440000")
61 >>> print(get_correlation_id())
62 550e8400-e29b-41d4-a716-446655440000
64 References:
65 LOG-004: Correlation ID Injection
66 """
67 _correlation_id.set(corr_id)
70class CorrelationContext:
71 """Context manager for correlation ID scoping.
73 Automatically generates a UUID correlation ID if not provided,
74 and ensures proper cleanup when exiting the context.
76 Thread-safe and async-safe using contextvars.
78 Args:
79 corr_id: Optional correlation ID. If None, generates a new UUID.
81 Example:
82 >>> # Auto-generate correlation ID
83 >>> with CorrelationContext() as corr_id:
84 ... print(f"Generated ID: {corr_id}")
85 ... # All operations here have this correlation ID
86 ... result = some_analysis()
88 >>> # Use explicit correlation ID
89 >>> with CorrelationContext("my-custom-id") as corr_id:
90 ... print(f"Using ID: {corr_id}")
92 References:
93 LOG-004: Correlation ID Injection
94 """
96 def __init__(self, corr_id: str | None = None):
97 """Initialize correlation context.
99 Args:
100 corr_id: Correlation ID to use, or None to auto-generate.
101 """
102 self.corr_id = corr_id or str(uuid.uuid4())
103 self.token: contextvars.Token | None = None # type: ignore[type-arg]
105 def __enter__(self) -> str:
106 """Enter the correlation context.
108 Returns:
109 The correlation ID for this context.
110 """
111 self.token = _correlation_id.set(self.corr_id)
112 return self.corr_id
114 def __exit__(self, *args: Any) -> None:
115 """Exit the correlation context and restore previous value."""
116 if self.token: 116 ↛ exitline 116 didn't return from function '__exit__' because the condition on line 116 was always true
117 _correlation_id.reset(self.token)
120F = TypeVar("F", bound=Callable[..., Any])
123def with_correlation_id(corr_id: str | None = None) -> Callable[[F], F]:
124 """Decorator to set correlation ID for a function call.
126 Automatically wraps the function in a CorrelationContext, ensuring
127 all operations within the function are traced with the same ID.
129 Args:
130 corr_id: Correlation ID to use, or None to auto-generate.
132 Returns:
133 Decorator function.
135 Note:
136 This decorator currently only supports synchronous functions.
137 For async functions, use CorrelationContext directly:
139 >>> async def async_function():
140 ... with CorrelationContext("my-id"):
141 ... await some_async_operation()
143 Example:
144 >>> @with_correlation_id()
145 ... def analyze_trace(trace_data):
146 ... logger.info("Starting analysis")
147 ... # All logs will include the correlation ID
148 ... result = compute_fft(trace_data)
149 ... return result
151 >>> @with_correlation_id("batch-job-123")
152 ... def process_batch(files):
153 ... for f in files:
154 ... load_and_analyze(f)
156 References:
157 LOG-004: Correlation ID Injection
158 """
160 def decorator(func: F) -> F:
161 @functools.wraps(func)
162 def wrapper(*args: Any, **kwargs: Any) -> Any:
163 with CorrelationContext(corr_id):
164 return func(*args, **kwargs)
166 return wrapper # type: ignore[return-value]
168 return decorator
171def generate_correlation_id() -> str:
172 """Generate a new correlation ID (UUID4).
174 Returns:
175 New correlation ID as string.
177 Example:
178 >>> corr_id = generate_correlation_id()
179 >>> with CorrelationContext(corr_id):
180 ... process_data()
182 References:
183 LOG-004: Correlation ID Injection
184 """
185 return str(uuid.uuid4())