Coverage for src / tracekit / core / correlation.py: 97%

32 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Correlation ID management for distributed tracing. 

2 

3This module provides correlation ID generation and propagation for 

4request tracing across TraceKit operations. 

5 

6 

7Example: 

8 >>> from tracekit.core.correlation import with_correlation_id, get_correlation_id 

9 >>> @with_correlation_id() 

10 ... def analyze_trace(data): 

11 ... corr_id = get_correlation_id() 

12 ... print(f"Processing with correlation ID: {corr_id}") 

13 

14References: 

15 - Distributed tracing best practices 

16 - Thread-local and async-safe context management 

17""" 

18 

19from __future__ import annotations 

20 

21import contextvars 

22import functools 

23import uuid 

24from collections.abc import Callable 

25from typing import Any, TypeVar 

26 

27# Context variable for correlation ID (thread-safe and async-safe) 

28_correlation_id: contextvars.ContextVar[str | None] = contextvars.ContextVar( 

29 "correlation_id", default=None 

30) 

31 

32 

33def get_correlation_id() -> str | None: 

34 """Get the current correlation ID for request tracing. 

35 

36 Returns the correlation ID from the current context, or None if 

37 no correlation context is active. 

38 

39 Returns: 

40 Current correlation ID (UUID string) or None. 

41 

42 Example: 

43 >>> with CorrelationContext(): 

44 ... corr_id = get_correlation_id() 

45 ... print(f"Correlation ID: {corr_id}") 

46 

47 References: 

48 LOG-004: Correlation ID Injection 

49 """ 

50 return _correlation_id.get() 

51 

52 

53def set_correlation_id(corr_id: str) -> None: 

54 """Set the correlation ID for the current context. 

55 

56 Args: 

57 corr_id: Correlation ID to set (typically a UUID string). 

58 

59 Example: 

60 >>> set_correlation_id("550e8400-e29b-41d4-a716-446655440000") 

61 >>> print(get_correlation_id()) 

62 550e8400-e29b-41d4-a716-446655440000 

63 

64 References: 

65 LOG-004: Correlation ID Injection 

66 """ 

67 _correlation_id.set(corr_id) 

68 

69 

70class CorrelationContext: 

71 """Context manager for correlation ID scoping. 

72 

73 Automatically generates a UUID correlation ID if not provided, 

74 and ensures proper cleanup when exiting the context. 

75 

76 Thread-safe and async-safe using contextvars. 

77 

78 Args: 

79 corr_id: Optional correlation ID. If None, generates a new UUID. 

80 

81 Example: 

82 >>> # Auto-generate correlation ID 

83 >>> with CorrelationContext() as corr_id: 

84 ... print(f"Generated ID: {corr_id}") 

85 ... # All operations here have this correlation ID 

86 ... result = some_analysis() 

87 

88 >>> # Use explicit correlation ID 

89 >>> with CorrelationContext("my-custom-id") as corr_id: 

90 ... print(f"Using ID: {corr_id}") 

91 

92 References: 

93 LOG-004: Correlation ID Injection 

94 """ 

95 

96 def __init__(self, corr_id: str | None = None): 

97 """Initialize correlation context. 

98 

99 Args: 

100 corr_id: Correlation ID to use, or None to auto-generate. 

101 """ 

102 self.corr_id = corr_id or str(uuid.uuid4()) 

103 self.token: contextvars.Token | None = None # type: ignore[type-arg] 

104 

105 def __enter__(self) -> str: 

106 """Enter the correlation context. 

107 

108 Returns: 

109 The correlation ID for this context. 

110 """ 

111 self.token = _correlation_id.set(self.corr_id) 

112 return self.corr_id 

113 

114 def __exit__(self, *args: Any) -> None: 

115 """Exit the correlation context and restore previous value.""" 

116 if self.token: 116 ↛ exitline 116 didn't return from function '__exit__' because the condition on line 116 was always true

117 _correlation_id.reset(self.token) 

118 

119 

120F = TypeVar("F", bound=Callable[..., Any]) 

121 

122 

123def with_correlation_id(corr_id: str | None = None) -> Callable[[F], F]: 

124 """Decorator to set correlation ID for a function call. 

125 

126 Automatically wraps the function in a CorrelationContext, ensuring 

127 all operations within the function are traced with the same ID. 

128 

129 Args: 

130 corr_id: Correlation ID to use, or None to auto-generate. 

131 

132 Returns: 

133 Decorator function. 

134 

135 Note: 

136 This decorator currently only supports synchronous functions. 

137 For async functions, use CorrelationContext directly: 

138 

139 >>> async def async_function(): 

140 ... with CorrelationContext("my-id"): 

141 ... await some_async_operation() 

142 

143 Example: 

144 >>> @with_correlation_id() 

145 ... def analyze_trace(trace_data): 

146 ... logger.info("Starting analysis") 

147 ... # All logs will include the correlation ID 

148 ... result = compute_fft(trace_data) 

149 ... return result 

150 

151 >>> @with_correlation_id("batch-job-123") 

152 ... def process_batch(files): 

153 ... for f in files: 

154 ... load_and_analyze(f) 

155 

156 References: 

157 LOG-004: Correlation ID Injection 

158 """ 

159 

160 def decorator(func: F) -> F: 

161 @functools.wraps(func) 

162 def wrapper(*args: Any, **kwargs: Any) -> Any: 

163 with CorrelationContext(corr_id): 

164 return func(*args, **kwargs) 

165 

166 return wrapper # type: ignore[return-value] 

167 

168 return decorator 

169 

170 

171def generate_correlation_id() -> str: 

172 """Generate a new correlation ID (UUID4). 

173 

174 Returns: 

175 New correlation ID as string. 

176 

177 Example: 

178 >>> corr_id = generate_correlation_id() 

179 >>> with CorrelationContext(corr_id): 

180 ... process_data() 

181 

182 References: 

183 LOG-004: Correlation ID Injection 

184 """ 

185 return str(uuid.uuid4())