Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import logging 

2 

3from .compat import threading 

4 

# Module-level logger, named after this module; ReadWriteMutex reports
# lock acquisition and release through it at DEBUG level.
log = logging.getLogger(__name__)

6 

7 

class LockError(Exception):
    """Raised when a :class:`.ReadWriteMutex` is released incorrectly.

    This happens when the read lock is released more times than it was
    acquired, or when the write lock is released by a thread that does
    not currently hold it.

    """

10 

11 

class ReadWriteMutex(object):
    """A mutex which allows multiple readers, single writer.

    :class:`.ReadWriteMutex` uses a Python ``threading.Condition``
    to provide this functionality across threads within a process.

    The Beaker package also contained a file-lock based version
    of this concept, so that readers/writers could be synchronized
    across processes with a common filesystem.  A future Dogpile
    release may include this additional class at some point.

    """

    def __init__(self):
        # number of threads currently holding the read lock
        self.async_ = 0

        # thread that holds the write lock, or that has claimed it and is
        # still waiting for the remaining readers to leave; None otherwise
        self.current_sync_operation = None

        # condition object guarding the two fields above
        self.condition = threading.Condition(threading.Lock())

    def acquire_read_lock(self, wait=True):
        """Acquire the 'read' lock.

        The read lock is granted only while no write operation is
        running or pending.

        :param wait: if true (the default), block until no write
         operation is current.  If false, give up immediately when a
         write operation is current.
        :return: ``None`` when ``wait`` is true.  When ``wait`` is false,
         ``True`` if the lock was acquired and ``False`` if it was not.

        """
        with self.condition:
            # a writer is running, or has claimed the lock and is waiting
            # to start: either wait for it or give up right away
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            self.async_ += 1
            log.debug("%s acquired read lock", self)

        if not wait:
            return True

    def release_read_lock(self):
        """Release the 'read' lock.

        :raises LockError: if there is no outstanding read lock to
         release.  The reader count is left untouched in that case.

        """
        with self.condition:
            # validate before decrementing so that an unmatched release
            # cannot push the reader count below zero
            if self.async_ <= 0:
                raise LockError(
                    "Synchronizer error - too many "
                    "release_read_locks called"
                )

            self.async_ -= 1

            # last reader out the door: wake up a writer that has claimed
            # the lock and is waiting for readers to finish
            if self.async_ == 0 and self.current_sync_operation is not None:
                self.condition.notify_all()

            log.debug("%s released read lock", self)

    def acquire_write_lock(self, wait=True):
        """Acquire the 'write' lock.

        The write lock is granted once no other write operation is
        current and every reader has released the read lock.

        :param wait: if true (the default), block until the lock is
         available.  If false, give up immediately when another write
         operation is current or any reader is active.
        :return: ``None`` when ``wait`` is true.  When ``wait`` is false,
         ``True`` if the lock was acquired and ``False`` if it was not.

        """
        with self.condition:
            # step 1: wait for (or bail out on) any other writer
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            # establish ourselves as the current sync operation; from here
            # on, new readers and writers hold off until this is None again
            self.current_sync_operation = threading.current_thread()

            # step 2: wait for the readers already inside to finish.  The
            # predicate is re-checked after every wakeup so the lock is
            # never handed out while a reader is still active.
            if wait:
                while self.async_ > 0:
                    self.condition.wait()
            elif self.async_ > 0:
                # we don't want to wait, so withdraw the claim
                self.current_sync_operation = None
                return False

            log.debug("%s acquired write lock", self)

        if not wait:
            return True

    def release_write_lock(self):
        """Release the 'write' lock.

        :raises LockError: if the calling thread is not the thread that
         currently holds the write lock.

        """
        with self.condition:
            if self.current_sync_operation is not threading.current_thread():
                raise LockError(
                    "Synchronizer error - current thread doesn't "
                    "have the write lock"
                )

            # reset the current sync operation so another can get it
            self.current_sync_operation = None

            # wake every waiting reader and writer; they proceed once the
            # condition's lock is released on leaving this block
            self.condition.notify_all()

            log.debug("%s released write lock", self)

136 # everyone go !! 

137 self.condition.release()