Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2Call loop machinery 

3""" 

4import sys 

5import warnings 

6 

# True when running under Python 3; consulted by ``_Result.get_result()`` to
# choose how a captured exception is re-raised.
_py3 = sys.version_info > (3, 0)


if not _py3:
    # The three-argument ``raise`` statement is a syntax error on Python 3,
    # so the Python 2 re-raise helper is compiled from a string, and only
    # when actually running on Python 2.
    exec(
        """
def _reraise(cls, val, tb):
    raise cls, val, tb
"""
    )

17 

18 

def _raise_wrapfail(wrap_controller, msg):
    """Raise ``RuntimeError`` describing a misbehaving wrapper generator.

    The error message identifies the generator through its code object
    (function name, file name and first line number) followed by ``msg``.
    """
    code = wrap_controller.gi_code
    details = (code.co_name, code.co_filename, code.co_firstlineno, msg)
    raise RuntimeError("wrap_controller at %r %s:%d %s" % details)

25 

26 

class HookCallError(Exception):
    """Raised when a hook is called incorrectly (e.g. a required argument is missing)."""

29 

30 

class _Result(object):
    """Outcome of a hook call: a result value or captured exception info."""

    def __init__(self, result, excinfo):
        self._result, self._excinfo = result, excinfo

    @property
    def excinfo(self):
        """The stored exception info, or ``None`` when there is none."""
        return self._excinfo

    @property
    def result(self):
        """Get the result(s) for this hook call (DEPRECATED in favor of ``get_result()``)."""
        warnings.warn(
            DeprecationWarning(
                "Use get_result() which forces correct exception handling"
            ),
            stacklevel=2,
        )
        return self._result

    @classmethod
    def from_call(cls, func):
        """Call ``func()`` and wrap its return value or raised exception.

        Any ``BaseException`` is captured as ``sys.exc_info()`` instead of
        propagating; the stored result is then ``None``.
        """
        __tracebackhide__ = True
        try:
            value = func()
        except BaseException:
            value, captured = None, sys.exc_info()
        else:
            captured = None

        return cls(value, captured)

    def force_result(self, result):
        """Replace the stored result(s) with ``result``.

        For a ``firstresult`` hook a single value should be given, otherwise
        a (modified) list of results. Any exception captured during the
        invocation is discarded.
        """
        self._result = result
        self._excinfo = None

    def get_result(self):
        """Return the result(s) of this hook call, re-raising any captured exception.

        A ``firstresult`` hook yields a single value, any other hook a list
        of results.
        """
        __tracebackhide__ = True
        captured = self._excinfo
        if captured is None:
            return self._result
        if _py3:
            raise captured[1].with_traceback(captured[2])
        _reraise(*captured)  # noqa

82 

83 

def _wrapped_call(wrap_controller, func):
    """Run ``func`` under a wrapper generator that must yield exactly once.

    The generator is advanced to its single yield, ``func`` is then called
    and its ``_Result`` is sent back into the generator at the yield point.
    The generator has to finish (raise StopIteration) afterwards; the call
    outcome is then returned, or its exception re-raised.
    """
    try:
        next(wrap_controller)  # advance to the first yield
    except StopIteration:
        _raise_wrapfail(wrap_controller, "did not yield")

    outcome = _Result.from_call(func)

    try:
        wrap_controller.send(outcome)
    except StopIteration:
        pass
    else:
        # The generator is still alive after receiving the outcome.
        _raise_wrapfail(wrap_controller, "has second yield")
    return outcome.get_result()

101 

102 

class _LegacyMultiCall(object):
    """Run one hook call across multiple python functions/methods."""

    # NOTE: the ``__multicall__`` argument is supported only for pytest
    # compatibility reasons. It was never officially supported there and
    # has been explicitly deprecated since 2.8, so it can be removed soon;
    # that would avoid the recursion in execute() below and simplify/speed
    # up the execute loop.

    def __init__(self, hook_impls, kwargs, firstresult=False):
        self.hook_impls = hook_impls
        self.caller_kwargs = kwargs  # come from _HookCaller.__call__()
        # Expose this object to implementations asking for ``__multicall__``.
        self.caller_kwargs["__multicall__"] = self
        self.firstresult = firstresult

    def execute(self):
        """Call the remaining implementations, last one first.

        Returns the first non-None result for ``firstresult`` calls (``None``
        if there is none), otherwise the list of non-None results. When a
        hookwrapper is reached, the rest of the loop runs inside it via
        ``_wrapped_call`` with this method as the wrapped function.
        """
        kwargs = self.caller_kwargs
        collected = self.results = []
        stop_at_first = self.firstresult

        while self.hook_impls:
            impl = self.hook_impls.pop()
            try:
                call_args = [kwargs[name] for name in impl.argnames]
            except KeyError:
                # Report the first argument the caller failed to supply.
                for name in impl.argnames:
                    if name not in kwargs:
                        raise HookCallError(
                            "hook call must provide argument %r" % (name,)
                        )
            if impl.hookwrapper:
                return _wrapped_call(impl.function(*call_args), self.execute)
            value = impl.function(*call_args)
            if value is None:
                continue
            if stop_at_first:
                return value
            collected.append(value)

        return None if stop_at_first else collected

    def __repr__(self):
        meths = "%d meths" % (len(self.hook_impls),)
        # ``results`` only exists once execute() has been entered.
        prefix = ("%d results, " % len(self.results)) if hasattr(self, "results") else ""
        return "<_MultiCall %s, kwargs=%r>" % (prefix + meths, self.caller_kwargs)

149 

150 

def _legacymulticall(hook_impls, caller_kwargs, firstresult=False):
    """Run the hook call through a ``_LegacyMultiCall`` and return its result(s)."""
    call = _LegacyMultiCall(hook_impls, caller_kwargs, firstresult=firstresult)
    return call.execute()

155 

156 

def _multicall(hook_impls, caller_kwargs, firstresult=False):
    """Call every hook implementation (last one first) and return the result(s).

    Hookwrappers are advanced to their first yield before the remaining
    implementations run, and receive the ``_Result`` of the call afterwards,
    in reverse order of setup. An exception raised while running the
    implementations is captured, shown to the wrappers through that
    ``_Result`` and re-raised at the end unless a wrapper replaced it.

    ``caller_kwargs`` comes from _HookCaller.__call__().
    """
    __tracebackhide__ = True
    collected = []
    captured = None
    try:  # setup phase: start wrappers and call plain implementations
        pending_teardowns = []
        try:
            for impl in reversed(hook_impls):
                try:
                    call_args = [caller_kwargs[name] for name in impl.argnames]
                except KeyError:
                    # Report the first argument the caller failed to supply.
                    for name in impl.argnames:
                        if name not in caller_kwargs:
                            raise HookCallError(
                                "hook call must provide argument %r" % (name,)
                            )

                if impl.hookwrapper:
                    try:
                        wrapper_gen = impl.function(*call_args)
                        next(wrapper_gen)  # first yield
                        pending_teardowns.append(wrapper_gen)
                    except StopIteration:
                        _raise_wrapfail(wrapper_gen, "did not yield")
                    continue

                value = impl.function(*call_args)
                if value is None:
                    continue
                collected.append(value)
                if firstresult:  # halt further impl calls
                    break
        except BaseException:
            captured = sys.exc_info()
    finally:
        # firstresult hooks expose a single value, all others the full list
        if not firstresult:
            final = collected
        elif collected:
            final = collected[0]
        else:
            final = None
        outcome = _Result(final, captured)

        # teardown phase: resume wrappers, most recently started first
        for wrapper_gen in reversed(pending_teardowns):
            try:
                wrapper_gen.send(outcome)
            except StopIteration:
                pass
            else:
                _raise_wrapfail(wrapper_gen, "has second yield")

        return outcome.get_result()