Coverage for Users / vladimirpavlov / PycharmProjects / parameterizable / src / mixinforge / single_thread_enforcer_mixin.py: 94%
32 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-01 16:37 -0600
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-01 16:37 -0600
1"""Single-thread execution enforcement with multi-process support.
3This module provides utilities to ensure that code runs only on the thread
4that first initialized it, while automatically supporting process-based
5parallelism through fork detection. After a fork, the child process
6automatically becomes the new owner thread for that process.
7"""
9from __future__ import annotations
11import inspect
12import os
13import threading
15_owner_thread_native_id: int | None = None
16_owner_thread_name: str | None = None
17_owner_process_id: int | None = None
20def _restrict_to_single_thread() -> None:
21 """Ensure current thread is the original thread.
23 Validates that the calling thread is the same thread that first initialized
24 the program. Automatically resets ownership after process forks to
25 support multi-process parallelism.
27 Raises:
28 RuntimeError: If called from a different thread than the owner thread.
29 """
30 global _owner_thread_native_id, _owner_thread_name, _owner_process_id
32 current_process_id = os.getpid()
33 current_thread_native_id = threading.get_native_id()
34 current_thread_name = threading.current_thread().name
36 if _owner_process_id is not None and current_process_id != _owner_process_id:
37 _owner_thread_native_id = None
38 _owner_thread_name = None
39 _owner_process_id = None
41 if _owner_thread_native_id is None:
42 _owner_thread_native_id = current_thread_native_id
43 _owner_thread_name = current_thread_name
44 _owner_process_id = current_process_id
45 return
47 if current_thread_native_id != _owner_thread_native_id:
48 caller = inspect.stack()[1]
49 raise RuntimeError(
50 "This object is restricted to single-threaded execution.\n"
51 f"Owner thread : {_owner_thread_native_id} ({_owner_thread_name})\n"
52 f"Current thread: {current_thread_native_id} ({current_thread_name}) at "
53 f"{caller.filename}:{caller.lineno}\n"
54 "For parallelism, use multi-process execution.")
57def _reset_thread_ownership() -> None:
58 """Reset thread ownership tracking.
60 Note:
61 This function is intended for testing purposes only.
62 """
63 global _owner_thread_native_id, _owner_thread_name, _owner_process_id
64 _owner_thread_native_id = None
65 _owner_thread_name = None
66 _owner_process_id = None
69class SingleThreadEnforcerMixin:
70 """Mixin to enforce single-threaded execution with multi-process support.
72 Add this mixin to any class to ensure its methods are called only from
73 the thread that first instantiated it. Automatically resets ownership
74 after process forks to support multi-process parallelism while preventing
75 concurrent threading issues.
77 The enforcement happens at instantiation and can be manually triggered
78 via the _restrict_to_single_thread method.
80 Raises:
81 RuntimeError: If instantiated or if _restrict_to_single_thread is called
82 from a different thread than the owner thread.
84 Example:
85 >>> class MyClass(SingleThreadEnforcerMixin):
86 ... def process(self):
87 ... self._restrict_to_single_thread()
88 ... # Process safely on owner thread
89 """
91 def _restrict_to_single_thread(self):
92 """Validate that the current thread is the owner thread.
94 Raises:
95 RuntimeError: If called from a different thread than the owner thread.
96 """
97 _restrict_to_single_thread()
99 def __init__(self):
100 """Initialize and register the current thread as the owner."""
101 _restrict_to_single_thread()