Coverage for Users / vladimirpavlov / PycharmProjects / parameterizable / src / mixinforge / single_thread_enforcer_mixin.py: 94%

32 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-01 16:37 -0600

1"""Single-thread execution enforcement with multi-process support. 

2 

3This module provides utilities to ensure that code runs only on the thread 

4that first initialized it, while automatically supporting process-based 

5parallelism through fork detection. After a fork, the child process 

6automatically becomes the new owner thread for that process. 

7""" 

8 

9from __future__ import annotations 

10 

11import inspect 

12import os 

13import threading 

14 

# Process-wide ownership state shared by every caller of
# `_restrict_to_single_thread`. All three values are None until the first
# check registers an owner, and are set back to None by
# `_reset_thread_ownership` or when a check runs under a different PID.

# Native thread ID (from `threading.get_native_id()`) of the owner thread.
_owner_thread_native_id: int | None = None
# Name of the owner thread; only used to build the error message.
_owner_thread_name: str | None = None
# PID (from `os.getpid()`) of the process in which the owner was registered.
_owner_process_id: int | None = None

18 

19 

def _restrict_to_single_thread() -> None:
    """Ensure the current thread is the owner thread of this process.

    The first call made in a process registers the calling thread as the
    owner. Every later call must come from that same thread. The owner is
    recorded together with the process ID, so a call made under a different
    PID (for example in a forked child) discards the stale ownership and
    registers the calling thread as the new owner.

    Raises:
        RuntimeError: If called from a different thread than the owner thread.
            The message names both threads and the location of the nearest
            calling frame that lives outside this module, so checks made
            through wrapper methods defined here still point at user code.
    """
    global _owner_thread_native_id, _owner_thread_name, _owner_process_id

    current_process_id = os.getpid()
    current_thread_native_id = threading.get_native_id()
    current_thread_name = threading.current_thread().name

    # A different PID means the recorded owner belongs to another process
    # (typically the parent of a fork); it must not constrain this one.
    if _owner_process_id is not None and current_process_id != _owner_process_id:
        _owner_thread_native_id = None
        _owner_thread_name = None
        _owner_process_id = None

    # No owner yet: the calling thread becomes the owner.
    if _owner_thread_native_id is None:
        _owner_thread_native_id = current_thread_native_id
        _owner_thread_name = current_thread_name
        _owner_process_id = current_process_id
        return

    if current_thread_native_id == _owner_thread_native_id:
        return

    # Locate the offending call site. Walk the frame chain instead of using
    # inspect.stack(): it is cheaper (no source lookup for the whole stack)
    # and lets us skip frames defined in this module.
    frame = inspect.currentframe()  # may be None on some implementations
    module_file = frame.f_code.co_filename if frame is not None else None
    direct_caller = frame.f_back if frame is not None else None
    caller = direct_caller
    while caller is not None and caller.f_code.co_filename == module_file:
        caller = caller.f_back
    if caller is None:
        # Every frame lives in this module; fall back to the direct caller.
        caller = direct_caller
    if caller is not None:
        location = f"{caller.f_code.co_filename}:{caller.f_lineno}"
    else:
        location = "<unknown>"
    # Drop frame references so they do not form reference cycles.
    del frame, direct_caller, caller

    raise RuntimeError(
        "This object is restricted to single-threaded execution.\n"
        f"Owner thread : {_owner_thread_native_id} ({_owner_thread_name})\n"
        f"Current thread: {current_thread_native_id} ({current_thread_name}) at "
        f"{location}\n"
        "For parallelism, use multi-process execution.")

55 

56 

def _reset_thread_ownership() -> None:
    """Forget the recorded owner thread and owner process.

    Clears all three module-level ownership variables, leaving the module
    in its initial "no owner registered" state.

    Note:
        This function is intended for testing purposes only.
    """
    global _owner_process_id, _owner_thread_name, _owner_thread_native_id
    _owner_process_id = _owner_thread_name = _owner_thread_native_id = None

67 

68 

class SingleThreadEnforcerMixin:
    """Mixin to enforce single-threaded execution with multi-process support.

    Add this mixin to any class to ensure its objects are created and used
    only on the owner thread. Ownership is tracked once per process by the
    module-level ``_restrict_to_single_thread`` function and is shared by
    all instances: the first thread to pass a check becomes the owner.
    Ownership is re-established when a check runs under a different
    process ID, which supports multi-process parallelism while still
    rejecting use from multiple threads.

    The enforcement happens at instantiation and can be manually triggered
    via the _restrict_to_single_thread method.

    ``__init__`` is cooperative: it forwards its arguments to the next
    class in the MRO, so the mixin can be listed before other base classes
    without cutting off their initialisation.

    Raises:
        RuntimeError: If instantiated or if _restrict_to_single_thread is called
            from a different thread than the owner thread.

    Example:
        >>> class MyClass(SingleThreadEnforcerMixin):
        ...     def process(self):
        ...         self._restrict_to_single_thread()
        ...         # Process safely on owner thread
    """

    def __init__(self, *args, **kwargs) -> None:
        """Check thread ownership, then continue the ``__init__`` chain.

        Args:
            *args: Positional arguments forwarded to the next ``__init__``
                in the MRO.
            **kwargs: Keyword arguments forwarded to the next ``__init__``
                in the MRO.

        Raises:
            RuntimeError: If called from a different thread than the owner
                thread.
        """
        # Fail before any other base class starts initialising on a
        # thread that is not allowed to use the object.
        _restrict_to_single_thread()
        super().__init__(*args, **kwargs)

    def _restrict_to_single_thread(self) -> None:
        """Validate that the current thread is the owner thread.

        Raises:
            RuntimeError: If called from a different thread than the owner thread.
        """
        _restrict_to_single_thread()