Coverage for /home/martinb/.local/share/virtualenvs/camcops/lib/python3.6/site-packages/kombu/clocks.py : 3%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Logical Clocks and Synchronization."""
2from __future__ import absolute_import, unicode_literals
4from threading import Lock
5from itertools import islice
6from operator import itemgetter
8from .five import python_2_unicode_compatible, zip
# NOTE(review): the leading digits on the two code lines below look like
# line-number residue from the coverage listing; they are kept as-is.
# Names exported by a star-import of this module.
10__all__ = ('LamportClock', 'timetuple')
# Format template used by ``timetuple.__repr__``; filled positionally with the
# tuple's four items (clock, timestamp, id, obj).
12R_CLOCK = '_lamport(clock={0}, timestamp={1}, id={2} {3!r})'
15@python_2_unicode_compatible
class timetuple(tuple):
    """Tuple of event clock information.

    Can be used as part of a heap to keep events ordered.

    Ordering rules: when both events carry a truthy logical clock value
    the clocks are compared, and equal clocks are ordered by the lower
    host id.  When either clock is missing (falsy) the UNIX timestamps
    are compared instead.

    Arguments:
        clock (int): Event clock value.
        timestamp (float): Event UNIX timestamp value.
        id (str): Event host id (e.g. ``hostname:pid``).
        obj (Any): Optional obj to associate with this event.
    """

    __slots__ = ()

    def __new__(cls, clock, timestamp, id, obj=None):
        return tuple.__new__(cls, (clock, timestamp, id, obj))

    def __repr__(self):
        return R_CLOCK.format(*self)

    def __getnewargs__(self):
        # Arguments handed back to ``__new__`` when the instance is
        # reconstructed (e.g. by pickle).
        return tuple(self)

    @staticmethod
    def _precedes(left, right):
        """Return True if ``left`` orders strictly before ``right``.

        Both arguments only need to be indexable like an event tuple.
        Returns ``NotImplemented`` when one of them is too short to be
        compared this way.
        """
        # 0: clock 1: timestamp 2: process id
        try:
            A, B = left[0], right[0]
            # uses logical clock value first
            if A and B:  # use logical clock if available
                if A == B:  # equal clocks use lower process id
                    return left[2] < right[2]
                return A < B
            return left[1] < right[1]  # ... or use timestamp
        except IndexError:
            return NotImplemented

    def __lt__(self, other):
        return self._precedes(self, other)

    def __gt__(self, other):
        # Call the helper directly instead of evaluating ``other < self``:
        # for a plain ``tuple`` operand Python gives this subclass's
        # reflected ``__gt__`` priority, so the operator form would call
        # back into this method forever.
        return self._precedes(other, self)

    def __le__(self, other):
        # ``self <= other`` is "``other`` does not precede ``self``".
        after = self._precedes(other, self)
        return after if after is NotImplemented else not after

    def __ge__(self, other):
        # ``self >= other`` is "``self`` does not precede ``other``".
        before = self._precedes(self, other)
        return before if before is NotImplemented else not before

    clock = property(itemgetter(0))
    timestamp = property(itemgetter(1))
    id = property(itemgetter(2))
    obj = property(itemgetter(3))
67@python_2_unicode_compatible
class LamportClock(object):
    """Lamport's logical clock.

    From Wikipedia:

    A Lamport logical clock is a monotonically incrementing software counter
    maintained in each process.  It follows some simple rules:

        * A process increments its counter before each event in that process;
        * When a process sends a message, it includes its counter value with
          the message;
        * On receiving a message, the receiver process sets its counter to be
          greater than the maximum of its own value and the received value
          before it considers the message received.

    Conceptually, this logical clock can be thought of as a clock that only
    has meaning in relation to messages moving between processes.  When a
    process receives a message, it resynchronizes its logical clock with
    the sender.

    See Also:
        * `Lamport timestamps`_

        * `Lamports distributed mutex`_

    .. _`Lamport Timestamps`: https://en.wikipedia.org/wiki/Lamport_timestamps
    .. _`Lamports distributed mutex`: https://bit.ly/p99ybE

    *Usage*

        When sending a message use :meth:`forward` to increment the clock,
        when receiving a message use :meth:`adjust` to sync with
        the time stamp of the incoming message.

    """

    #: The clocks current value.
    value = 0

    def __init__(self, initial_value=0, Lock=Lock):
        self.value = initial_value
        self.mutex = Lock()

    def adjust(self, other):
        """Sync with a received clock value.

        Sets the clock to one more than the larger of the current value
        and ``other``, while holding the mutex, and returns the new value.
        """
        with self.mutex:
            value = self.value = max(self.value, other) + 1
            return value

    def forward(self):
        """Increment the clock by one (under the mutex); return the new value."""
        with self.mutex:
            self.value += 1
            return self.value

    def sort_heap(self, h):
        """Sort heap of events.

        List of tuples containing at least two elements, representing
        an event, where the first element is the event's scalar clock value,
        and the second element is the id of the process (usually
        ``"hostname:pid"``): ``sh([(clock, processid, ...?), (...)])``

        The list must already be sorted, which is why we refer to it as a
        heap.

        The tuple will not be unpacked, so more than two elements can be
        present.

        Returns the first event of the list.  When several leading events
        share the first event's clock value, the one with the lowest
        process id among them is returned instead.

        Raises:
            IndexError: If ``h`` is empty.
        """
        first_clock = h[0][0]
        # Gather the complete run of leading events tied on the first clock
        # value, including the last one of the run.
        tied = []
        for event in h:
            if event[0] != first_clock:
                break
            tied.append(event)
        # ``min`` keeps the earliest event among equal process ids.
        return min(tied, key=itemgetter(1))

    def __str__(self):
        return str(self.value)

    def __repr__(self):
        return '<LamportClock: {0.value}>'.format(self)