Coverage for greyhorse / river / private / monads / resource.py: 97%

116 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-18 11:33 +0300

1# mypy: warn_no_return=false,disable_error_code="misc,type-arg,valid-type,return-value,operator,call-arg,override" 

2"""Resource monad — bracket-based lifecycle management. 

3 

4``Resource[F, T]`` is an ADT (Pure/Allocate/Bind/Eval) that describes 

5acquire/release pairs for managed resources. Parameterized by effect type ``F`` 

6(typically ``IO``) and value type ``T``. 

7 

8The primary API is ``Resource.make(acquire, release)`` + ``.run()`` for 

9immediate execution, or ``.open()`` for ``with``-statement usage via 

10``ResourceHandle``. 

11 

12Examples: 

13 Simple value:: 

14 

15 res = Resource[IO, int].pure(42) 

16 assert res.run() == 42 

17 

18 Managed resource with acquire/release:: 

19 

20 pool = Resource[IO, Pool].make( 

21 IO(lambda: Pool.create(size=10)), 

22 lambda p: IO(lambda: p.close()), 

23 ) 

24 with pool.open() as p: 

25 p.query(...) 

26 # p.close() called automatically 

27 

28 Chaining resources:: 

29 

30 db = Resource[IO, Db].make(IO(connect), lambda c: IO(c.close)) 

31 tx = db.and_then(lambda c: Resource[IO, Tx].make( 

32 IO(c.begin), lambda t: IO(t.rollback) 

33 )) 

34 result = tx.run() # connect → begin → use → rollback → close 

35""" 

36 

37from __future__ import annotations 

38 

39import logging 

40from collections.abc import Callable 

41from functools import partial 

42from typing import Any, ForwardRef, TypeVar 

43 

44from greyhorse.enum import Enum, Struct, Tuple 

45from greyhorse.utils.types import TypeWrapper 

46 

47from .effect import Effect, ExitCase 

48from .io import IO 

49 

50 

51logger = logging.getLogger(__name__) 

52 

53 

54F = TypeVar('F', bound=Effect, covariant=True) 

55T = TypeVar('T', covariant=True) 

56U = TypeVar('U', covariant=True) 

57 

58 

59class Resource(Enum, TypeWrapper[F, T]): 

60 """Bracket-based resource lifecycle as an algebraic data type. 

61 

62 Four variants: 

63 

64 - ``Pure(value)`` — wraps a ready value, no cleanup. 

65 - ``Allocate(acquire, release)`` — acquire/release pair with ExitCase. 

66 - ``Bind(source, continuation)`` — monadic chaining. 

67 - ``Eval(thunk, suspend)`` — wraps an effect, optionally suspending. 

68 

69 Type parameters: 

70 F: Effect type (typically ``IO``). 

71 T: Value type produced by the resource. 

72 """ 

73 

74 Pure = Tuple(T) 

75 Allocate = Struct(acquire=Effect[T], release=Callable[[T, ExitCase], Effect[None]]) 

76 Bind = Struct( 

77 source=ForwardRef('Resource'), continuation=Callable[[T], ForwardRef('Resource')] 

78 ) 

79 Eval = Struct(thunk=Effect[Any], suspend=bool) 

80 

81 def use(self, f: Callable[[T], Effect[U]]) -> F[U]: 

82 """Execute the resource, applying ``f`` to the acquired value. 

83 

84 Release is guaranteed via ``bracket_case`` — runs even on exception. 

85 

86 Args: 

87 f: Function to apply to the acquired value, returning an Effect. 

88 

89 Returns: 

90 The Effect produced by ``f``, with release guaranteed after. 

91 

92 Examples: 

93 :: 

94 

95 res = Resource[IO, int].pure(42) 

96 result = res.use(lambda v: IO.pure(v * 2))() 

97 assert result == 84 

98 """ 

99 match self: 

100 case Resource.Pure(value): 

101 return f(value) 

102 

103 case Resource.Allocate(acquire, release): 

104 return acquire.bracket_case(f, release) 

105 

106 case Resource.Bind(source, continuation): 

107 

108 def run(value: T) -> F[U]: 

109 next_value = continuation(value) 

110 return next_value.use(f) 

111 

112 return source.use(run) 

113 

114 case Resource.Eval(thunk, suspend): 

115 if suspend: 

116 return thunk.and_then(lambda v: v.use(f)) 

117 return thunk.and_then(f) 

118 

119 def and_then(self, f: Callable[[T], Resource[F, U]]) -> Resource[F, U]: 

120 """Monadic bind — chain with a function returning another Resource. 

121 

122 Args: 

123 f: Function that takes the acquired value and returns a new Resource. 

124 

125 Returns: 

126 A new Resource that acquires both in sequence, releases in LIFO order. 

127 

128 Examples: 

129 :: 

130 

131 r1 = Resource[IO, str].pure('hello') 

132 r2 = r1.and_then(lambda s: Resource[IO, str].pure(s + ' world')) 

133 assert r2.run() == 'hello world' 

134 """ 

135 return self.Bind[*self.__wrapped_type__](source=self, continuation=f) 

136 

137 def run(self) -> T: 

138 """Execute the resource and return the value. 

139 

140 Shortcut for ``use(IO.pure)()``. Acquires, extracts value, releases. 

141 

142 Returns: 

143 The acquired value after release. 

144 

145 Examples: 

146 :: 

147 

148 assert Resource[IO, int].pure(7).run() == 7 

149 """ 

150 return self.use(IO.pure)() 

151 

152 def open(self) -> ResourceHandle[T]: 

153 """Create a context manager for ``with``-statement usage. 

154 

155 Each call returns an independent ``ResourceHandle`` with its own 

156 cleanup stack. Safe for multiple concurrent opens from the same Resource. 

157 

158 Returns: 

159 A ``ResourceHandle`` that acquires on ``__enter__`` and releases 

160 on ``__exit__``. 

161 

162 Examples: 

163 :: 

164 

165 res = Resource[IO, str].make(IO.pure('x'), lambda v: IO(lambda: None)) 

166 with res.open() as value: 

167 assert value == 'x' 

168 """ 

169 return ResourceHandle(self) 

170 

171 @classmethod 

172 def pure(cls, value: T) -> Resource[F, T]: 

173 """Wrap a pure value — no acquire/release needed. 

174 

175 Args: 

176 value: The value to wrap. 

177 

178 Returns: 

179 A Resource that immediately yields the value with no cleanup. 

180 

181 Examples: 

182 :: 

183 

184 res = Resource[IO, int].pure(42) 

185 assert res.run() == 42 

186 """ 

187 return cls.Pure[*cls.__wrapped_type__](value) 

188 

189 @classmethod 

190 def make( 

191 cls, acquire: F[T], release: Callable[[T], F[None]] 

192 ) -> Resource[F, T]: 

193 """Create a resource with simple release (no ExitCase info). 

194 

195 Args: 

196 acquire: Effect that produces the resource value. 

197 release: Function that takes the value and returns a cleanup Effect. 

198 

199 Returns: 

200 A Resource with bracket semantics (acquire → use → release). 

201 

202 Examples: 

203 :: 

204 

205 acquired, released = [], [] 

206 res = Resource[IO, str].make( 

207 IO(lambda: (acquired.append(1), 'val')[1]), 

208 lambda v: IO(lambda: released.append(v)), 

209 ) 

210 assert res.run() == 'val' 

211 assert acquired == [1] and released == ['val'] 

212 """ 

213 return cls.Allocate[*cls.__wrapped_type__]( 

214 acquire=acquire, release=lambda v, _: release(v) 

215 ) 

216 

217 @classmethod 

218 def make_case( 

219 cls, acquire: F[T], release: Callable[[T, ExitCase], F[None]] 

220 ) -> Resource[F, T]: 

221 """Create a resource with ExitCase-aware release. 

222 

223 The release function receives the ``ExitCase`` indicating how the 

224 use-phase ended (Succeeded, Canceled, or Errored). 

225 

226 Args: 

227 acquire: Effect that produces the resource value. 

228 release: Function ``(value, exit_case) -> cleanup Effect``. 

229 

230 Returns: 

231 A Resource with ExitCase-aware bracket semantics. 

232 

233 Examples: 

234 :: 

235 

236 cases = [] 

237 res = Resource[IO, str].make_case( 

238 IO.pure('val'), 

239 lambda v, ec: IO(lambda: cases.append(ec)), 

240 ) 

241 res.run() 

242 assert cases[0] == ExitCase.Succeeded 

243 """ 

244 return cls.Allocate[*cls.__wrapped_type__](acquire=acquire, release=release) 

245 

246 @classmethod 

247 def eval(cls, effect: F[T]) -> Resource[F, T]: 

248 """Wrap an effect as a resource (no cleanup needed). 

249 

250 Args: 

251 effect: The Effect to wrap. 

252 

253 Returns: 

254 A Resource that runs the effect and yields its result. 

255 

256 Examples: 

257 :: 

258 

259 res = Resource[IO, int].eval(IO.pure(7)) 

260 assert res.run() == 7 

261 """ 

262 return cls.Eval[*cls.__wrapped_type__](thunk=effect, suspend=False) 

263 

264 @classmethod 

265 def suspend(cls, effect: F[Resource[F, T]]) -> Resource[F, T]: 

266 """Suspend a resource-producing effect for lazy evaluation. 

267 

268 The outer effect produces a Resource, which is then flattened 

269 into a single Resource. 

270 

271 Args: 

272 effect: An Effect that returns a Resource. 

273 

274 Returns: 

275 A Resource that lazily evaluates the outer effect. 

276 """ 

277 return cls.Eval[*cls.__wrapped_type__](thunk=effect, suspend=True) 

278 

279 

280class ResourceHandle[T]: 

281 """Context manager wrapping a ``Resource`` for ``with``-statement usage. 

282 

283 Each handle is independent — multiple ``open()`` calls on the same 

284 Resource produce independent handles with separate cleanup stacks. 

285 

286 Handles partial-acquire rollback: if acquisition fails mid-chain, 

287 already-acquired resources are released before re-raising. 

288 

289 Examples: 

290 :: 

291 

292 res = Resource[IO, str].make(IO.pure('x'), lambda v: IO(lambda: None)) 

293 with res.open() as value: 

294 assert value == 'x' 

295 # release runs here 

296 """ 

297 

298 __slots__ = ('_active', '_resource', '_stack', '_value') 

299 

300 def __init__(self, resource: Resource) -> None: 

301 self._resource = resource 

302 self._stack: list = [] 

303 self._value: T | None = None 

304 self._active = False 

305 

306 def __enter__(self) -> T: 

307 """Acquire the resource. Rolls back partial acquisitions on failure.""" 

308 if self._active: 

309 raise RuntimeError('ResourceHandle is not reentrant') 

310 self._stack = [] 

311 try: 

312 self._value = resource_acquire(self._resource, self._stack)() 

313 except BaseException as exc: 

314 release_io = resource_release(ExitCase.from_exc(exc), self._stack) 

315 if release_io is not None: 

316 release_io() 

317 self._stack = [] 

318 raise 

319 self._active = True 

320 return self._value 

321 

322 def __exit__( 

323 self, 

324 exc_type: type[BaseException] | None, 

325 exc_val: BaseException | None, 

326 exc_tb: Any, 

327 ) -> None: 

328 """Release the resource. Passes ExitCase to release callbacks.""" 

329 exit_case = ExitCase.from_exc(exc_val) 

330 release_io = resource_release(exit_case, self._stack) 

331 if release_io is not None: 

332 release_io() 

333 self._stack = [] 

334 self._value = None 

335 self._active = False 

336 

337 

338def resource_acquire[F: Effect, T]( 

339 resource: Resource[F, T], stack: list[Callable[[ExitCase], Effect[None]]] 

340) -> F[T]: 

341 """Unwrap a Resource tree, pushing release callbacks onto the stack. 

342 

343 Args: 

344 resource: The Resource to acquire. 

345 stack: Mutable list where release callbacks are appended (LIFO order). 

346 

347 Returns: 

348 An Effect that produces the acquired value. 

349 """ 

350 match resource: 

351 case Resource.Pure(value): 

352 return IO.pure(value) 

353 

354 case Resource.Allocate(acquire, release): 

355 

356 def run(value: T) -> T: 

357 stack.append(partial(release, value)) 

358 return value 

359 

360 return acquire.and_then(IO.wrap(run)) 

361 

362 case Resource.Bind(source, continuation): 

363 

364 def run(value: T) -> F[U]: 

365 next_value = continuation(value) 

366 return resource_acquire(next_value, stack) 

367 

368 return resource_acquire(source, stack).and_then(run) 

369 

370 case Resource.Eval(thunk, suspend): 

371 if suspend: 

372 return thunk.and_then(lambda v: resource_acquire(v, stack)) 

373 return thunk 

374 

375 

376def resource_release( 

377 exit_case: ExitCase, stack: list[Callable[[ExitCase], Effect[None]]] 

378) -> Effect[None] | None: 

379 """Execute all release callbacks from the stack in LIFO order. 

380 

381 Every finalizer runs regardless of prior failures. The first exception 

382 is re-raised after all finalizers have been attempted. 

383 

384 Args: 

385 exit_case: How the use-phase ended (Succeeded/Canceled/Errored). 

386 stack: Release callbacks accumulated by ``resource_acquire``. 

387 

388 Returns: 

389 An IO that runs all releases, or None if the stack is empty. 

390 """ 

391 if not stack: 

392 return None 

393 

394 def run_all() -> None: 

395 first_exc: BaseException | None = None 

396 for fn in reversed(stack): 

397 try: 

398 fn(exit_case)() 

399 except Exception as e: 

400 if first_exc is None: 

401 first_exc = e 

402 else: 

403 logger.debug('resource_release: ошибка подавлена: %s', e) 

404 if first_exc is not None: 

405 raise first_exc 

406 

407 return IO(run_all)