Coverage for src/dataknobs_fsm/resources/manager.py: 13%

274 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 14:11 -0700

1"""Central resource manager for FSM.""" 

2 

3import threading 

4from contextlib import contextmanager 

5from typing import Any, Dict, Set 

6 

7from dataknobs_fsm.functions.base import ResourceError, ResourceConfig 

8from dataknobs_fsm.resources.base import ( 

9 IResourceProvider, 

10 IResourcePool, 

11 ResourceStatus, 

12 ResourceHealth, 

13 ResourceMetrics, 

14) 

15from dataknobs_fsm.resources.pool import ResourcePool, PoolConfig 

16 

17 

class ResourceManager:
    """Manages resources across the FSM system.

    Providers are registered under a resource name, optionally fronted by a
    ``ResourcePool``. Owners (for example state instances) acquire resources
    by name; each ``(owner_id, name)`` pair holds at most one resource, and
    acquiring it again returns the resource already held.

    Bookkeeping is guarded by a re-entrant lock. Calls into a provider's or
    pool's ``acquire`` are made outside that lock so a blocking acquisition
    does not stall unrelated callers.
    """

    def __init__(self):
        """Initialize the resource manager."""
        self._providers: Dict[str, IResourceProvider] = {}
        self._pools: Dict[str, IResourcePool] = {}
        # "<owner_id>:<name>" -> resource currently held by that owner
        self._resources: Dict[str, Any] = {}
        # resource name -> ids of the owners currently holding it
        self._resource_owners: Dict[str, Set[str]] = {}
        self._lock = threading.RLock()
        self._closed = False

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _logger():
        """Return this module's logger."""
        import logging
        return logging.getLogger(__name__)

    @staticmethod
    def _owner_key(owner_id: str, name: str) -> str:
        """Return the ``_resources`` key for an owner/resource pair."""
        return f"{owner_id}:{name}"

    @staticmethod
    def _release_to(
        pool: IResourcePool | None,
        provider: IResourceProvider,
        resource: Any
    ) -> None:
        """Return ``resource`` to ``pool`` when there is one, else to ``provider``."""
        if pool is not None:
            pool.release(resource)
        else:
            provider.release(resource)

    @staticmethod
    def _close_sync(obj: Any) -> None:
        """Call ``obj.close()`` from synchronous code, if ``obj`` has one.

        An ``async def close`` cannot be awaited here. Its coroutine is closed
        rather than dropped, which avoids Python's "coroutine ... was never
        awaited" RuntimeWarning; ``cleanup()`` awaits such methods properly.
        """
        import inspect

        close = getattr(obj, 'close', None)
        if close is None:
            return
        result = close()
        if inspect.iscoroutine(result):
            result.close()

    def _held_pairs(self) -> list[tuple[str, str]]:
        """Snapshot ``(name, owner_id)`` for every tracked resource.

        Uses the name -> owners index instead of parsing ``"<owner>:<name>"``
        keys, which is ambiguous when either part contains ``':'``.
        Call with ``self._lock`` held.
        """
        return [
            (name, owner_id)
            for name, owners in self._resource_owners.items()
            for owner_id in owners
        ]

    # ------------------------------------------------------------------
    # Provider registration
    # ------------------------------------------------------------------

    def register_provider(
        self,
        name: str,
        provider: IResourceProvider,
        pool_config: PoolConfig | None = None
    ) -> None:
        """Register a resource provider.

        Args:
            name: Resource name.
            provider: Resource provider.
            pool_config: Optional pool configuration; when given, acquisitions
                for this resource go through a ``ResourcePool``.

        Raises:
            ValueError: If a provider is already registered under ``name``.
        """
        with self._lock:
            if name in self._providers:
                raise ValueError(f"Provider '{name}' already registered")

            # Build the pool before touching the registry so a failing pool
            # constructor cannot leave a half-registered provider behind.
            pool = ResourcePool(provider, pool_config) if pool_config is not None else None

            self._providers[name] = provider
            if pool is not None:
                self._pools[name] = pool

    def unregister_provider(self, name: str) -> None:
        """Unregister a resource provider.

        Resources of this name still held by owners are released first;
        otherwise their stale entries would be handed out again if a new
        provider were later registered under the same name. Then the pool
        (if any) and the provider are closed.

        Args:
            name: Resource name.
        """
        with self._lock:
            for owner_id in list(self._resource_owners.get(name, ())):
                self.release(name, owner_id)

            pool = self._pools.pop(name, None)
            provider = self._providers.pop(name, None)

            if pool is not None:
                pool.close()
            if provider is not None:
                self._close_sync(provider)

    def get_provider(self, name: str) -> IResourceProvider | None:
        """Get a resource provider by name.

        Args:
            name: Resource name.

        Returns:
            The resource provider or None if not found.
        """
        with self._lock:
            return self._providers.get(name)

    def get_all_providers(self) -> Dict[str, IResourceProvider]:
        """Get all registered resource providers.

        Returns:
            A copy of the mapping of resource name to provider.
        """
        with self._lock:
            return dict(self._providers)

    # ------------------------------------------------------------------
    # Acquisition / release
    # ------------------------------------------------------------------

    def _acquire(
        self,
        name: str,
        owner_id: str,
        timeout: float | None,
        kwargs: Dict[str, Any]
    ) -> tuple[Any, bool]:
        """Acquire ``name`` for ``owner_id``.

        Args:
            name: Resource name.
            owner_id: ID of the owner.
            timeout: Acquisition timeout (used only for pooled resources).
            kwargs: Provider-specific parameters (used only without a pool).

        Returns:
            ``(resource, newly_acquired)``; ``newly_acquired`` is False when the
            owner already held the resource before this call.

        Raises:
            ResourceError: If the manager is closed, the resource is unknown,
                or the provider was unregistered while acquiring.
        """
        owner_key = self._owner_key(owner_id, name)

        with self._lock:
            if self._closed:
                raise ResourceError("Resource manager is closed", resource_name=name, operation="acquire")
            provider = self._providers.get(name)
            if provider is None:
                raise ResourceError(
                    f"Unknown resource '{name}'",
                    resource_name=name,
                    operation="acquire"
                )
            if owner_key in self._resources:
                return self._resources[owner_key], False
            pool = self._pools.get(name)

        # Acquire outside the lock (pool/provider acquisition may block). The
        # references captured above keep a concurrent unregister_provider()
        # from turning this into a KeyError.
        if pool is not None:
            resource = pool.acquire(timeout)
        else:
            resource = provider.acquire(**kwargs)

        with self._lock:
            if self._closed:
                problem = "Resource manager is closed"
            elif self._providers.get(name) is not provider or self._pools.get(name) is not pool:
                problem = f"Resource '{name}' was unregistered during acquisition"
            else:
                problem = None

            if problem is None:
                if owner_key not in self._resources:
                    self._resources[owner_key] = resource
                    self._resource_owners.setdefault(name, set()).add(owner_id)
                    return resource, True
                # Another thread acquired for this owner meanwhile: give the
                # extra resource back and return the one already tracked.
                self._release_to(pool, provider, resource)
                return self._resources[owner_key], False

        # The registration went away while we were acquiring. Hand the
        # resource back (best effort: the pool/provider may already be closed)
        # instead of tracking something nothing will ever release.
        try:
            self._release_to(pool, provider, resource)
        except Exception as exc:
            self._logger().warning(
                "Could not return resource '%s' after aborted acquire: %s", name, exc
            )
        raise ResourceError(problem, resource_name=name, operation="acquire")

    def acquire(
        self,
        name: str,
        owner_id: str,
        timeout: float | None = None,
        **kwargs
    ) -> Any:
        """Acquire a resource.

        If ``owner_id`` already holds ``name``, the held resource is returned.

        Args:
            name: Resource name.
            owner_id: ID of the owner (e.g., state instance ID).
            timeout: Acquisition timeout (passed to the pool, if any).
            **kwargs: Additional provider-specific parameters (used only when
                the resource has no pool).

        Returns:
            The acquired resource.

        Raises:
            ResourceError: If acquisition fails.
        """
        resource, _ = self._acquire(name, owner_id, timeout, kwargs)
        return resource

    def release(self, name: str, owner_id: str) -> None:
        """Release a resource.

        Releasing something that is not held is a no-op. Tracking is dropped
        even if the pool/provider release raises (the error still propagates),
        so a failed release cannot pin a broken resource to the owner.

        Args:
            name: Resource name.
            owner_id: ID of the owner.
        """
        with self._lock:
            owner_key = self._owner_key(owner_id, name)
            if owner_key not in self._resources:
                return  # Resource not acquired or already released

            resource = self._resources[owner_key]
            try:
                if name in self._pools:
                    self._pools[name].release(resource)
                elif name in self._providers:
                    self._providers[name].release(resource)
            finally:
                del self._resources[owner_key]
                owners = self._resource_owners.get(name)
                if owners is not None:
                    owners.discard(owner_id)
                    if not owners:
                        del self._resource_owners[name]

    def release_all(self, owner_id: str) -> None:
        """Release all resources owned by an owner.

        Args:
            owner_id: ID of the owner.
        """
        with self._lock:
            # Look owners up in the name -> owners index; parsing
            # "<owner>:<name>" keys misattributes resources when ids contain ':'.
            names = [
                name for name, owners in self._resource_owners.items()
                if owner_id in owners
            ]
            for name in names:
                self.release(name, owner_id)

    def get_resource(self, name: str, owner_id: str) -> Any | None:
        """Get an acquired resource.

        Args:
            name: Resource name.
            owner_id: ID of the owner.

        Returns:
            The resource if acquired, None otherwise.
        """
        with self._lock:
            return self._resources.get(self._owner_key(owner_id, name))

    def has_resource(self, name: str, owner_id: str) -> bool:
        """Check if an owner has acquired a resource.

        Args:
            name: Resource name.
            owner_id: ID of the owner.

        Returns:
            True if the owner has the resource.
        """
        with self._lock:
            return self._owner_key(owner_id, name) in self._resources

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def validate_resource(self, name: str) -> bool:
        """Validate a resource provider.

        Acquires a throwaway resource directly from the provider (bypassing any
        pool), validates it and always releases it again.

        Args:
            name: Resource name.

        Returns:
            True if the resource is valid; False if the provider is unknown or
            acquiring, validating or releasing raises.
        """
        with self._lock:
            provider = self._providers.get(name)
        if provider is None:
            return False

        # Provider calls happen outside the manager lock; acquire may block.
        try:
            resource = provider.acquire()
        except Exception:
            return False

        try:
            valid = provider.validate(resource)
        except Exception:
            valid = False

        # Release even when validate() raised, so the test resource never leaks.
        try:
            provider.release(resource)
        except Exception:
            return False
        return valid

    def health_check(self, name: str | None = None) -> Dict[str, ResourceHealth]:
        """Check health of resources.

        Args:
            name: Optional specific resource name; None checks all providers.

        Returns:
            Health status by resource name. Unknown names and providers whose
            ``health_check`` raises are reported as ``ResourceHealth.UNKNOWN``.
        """
        with self._lock:
            if name is not None:
                targets = [(name, self._providers.get(name))]
            else:
                targets = list(self._providers.items())

        health_status: Dict[str, ResourceHealth] = {}
        for resource_name, provider in targets:
            if provider is None:
                health_status[resource_name] = ResourceHealth.UNKNOWN
                continue
            try:
                health_status[resource_name] = provider.health_check()
            except Exception:
                health_status[resource_name] = ResourceHealth.UNKNOWN
        return health_status

    def get_metrics(self, name: str | None = None) -> Dict[str, ResourceMetrics]:
        """Get resource metrics.

        Args:
            name: Optional specific resource name; None returns all.

        Returns:
            Metrics by resource name; pool metrics use the key ``"<name>_pool"``.
        """
        with self._lock:
            if name is not None:
                providers = {name: self._providers[name]} if name in self._providers else {}
                pools = {name: self._pools[name]} if name in self._pools else {}
            else:
                providers = dict(self._providers)
                pools = dict(self._pools)

            metrics = {
                resource_name: provider.get_metrics()
                for resource_name, provider in providers.items()
            }
            metrics.update(
                (f"{resource_name}_pool", pool.get_metrics())
                for resource_name, pool in pools.items()
            )
            return metrics

    # ------------------------------------------------------------------
    # Scoped acquisition
    # ------------------------------------------------------------------

    @contextmanager
    def resource_context(
        self,
        name: str,
        owner_id: str,
        timeout: float | None = None,
        **kwargs
    ):
        """Context manager for resource acquisition.

        The resource is released on exit only if this context acquired it; a
        resource the owner already held before entering stays held.

        Args:
            name: Resource name.
            owner_id: ID of the owner.
            timeout: Acquisition timeout.
            **kwargs: Additional parameters.

        Yields:
            The acquired resource.
        """
        resource, newly_acquired = self._acquire(name, owner_id, timeout, kwargs)
        try:
            yield resource
        finally:
            if newly_acquired:
                self.release(name, owner_id)

    def configure_from_requirements(
        self,
        requirements: list[ResourceConfig],
        owner_id: str
    ) -> Dict[str, Any]:
        """Configure resources from requirements.

        Args:
            requirements: List of resource configurations.
            owner_id: ID of the owner.

        Returns:
            Dictionary of acquired resources keyed by resource name.

        Raises:
            ResourceError: If any resource cannot be acquired. Resources newly
                acquired by this call are released first; resources the owner
                already held beforehand are left untouched.
        """
        acquired: Dict[str, Any] = {}
        newly_acquired: list[str] = []

        try:
            for config in requirements:
                resource, is_new = self._acquire(config.name, owner_id, config.timeout, {})
                acquired[config.name] = resource
                if is_new:
                    newly_acquired.append(config.name)
            return acquired

        except Exception as e:
            # Roll back only what this call acquired.
            for name in newly_acquired:
                try:
                    self.release(name, owner_id)
                except Exception as release_error:
                    self._logger().warning(
                        "Error releasing '%s' during rollback: %s", name, release_error
                    )
            raise ResourceError(f"Failed to acquire resources: {e}", resource_name="multiple", operation="configure") from e

    # ------------------------------------------------------------------
    # Shutdown
    # ------------------------------------------------------------------

    def close(self) -> None:
        """Close the resource manager and release all resources.

        Every held resource is released, then every pool and provider is
        closed. All steps are attempted even if some fail; the first failure is
        re-raised afterwards and any further ones are logged. Subsequent
        ``acquire`` calls raise ``ResourceError``.
        """
        errors: list[Exception] = []

        with self._lock:
            self._closed = True

            for name, owner_id in self._held_pairs():
                try:
                    self.release(name, owner_id)
                except Exception as exc:
                    errors.append(exc)

            for pool in self._pools.values():
                try:
                    pool.close()
                except Exception as exc:
                    errors.append(exc)
            self._pools.clear()

            for provider in self._providers.values():
                try:
                    self._close_sync(provider)
                except Exception as exc:
                    errors.append(exc)
            self._providers.clear()

            self._resources.clear()
            self._resource_owners.clear()

        if errors:
            for extra in errors[1:]:
                self._logger().error("Additional error while closing resource manager: %s", extra)
            raise errors[0]

    async def cleanup(self) -> None:
        """Async cleanup of all resources, pools and providers.

        Held resources are released, pools are closed in the default executor,
        and providers are closed: via ``aclose()``, ``cleanup()`` or an
        ``async def close()`` on the running loop (concurrently), otherwise via
        their synchronous ``close()`` in the executor. Errors are logged, not
        raised. Finally all tracking state is cleared.
        """
        import asyncio
        import inspect

        logger = self._logger()

        with self._lock:
            # Return held resources first so pools can dispose of them.
            for name, owner_id in self._held_pairs():
                try:
                    self.release(name, owner_id)
                except Exception as exc:
                    logger.error("Error releasing resource %s held by %s: %s", name, owner_id, exc)
            pools = list(self._pools.items())
            providers = list(self._providers.items())

        loop = asyncio.get_running_loop()

        # Pool.close() is synchronous; keep it off the event loop.
        for name, pool in pools:
            try:
                await loop.run_in_executor(None, pool.close)
            except Exception as exc:
                logger.error("Error closing pool %s: %s", name, exc)

        async_names: list[str] = []
        cleanup_tasks = []
        sync_providers = []
        for name, provider in providers:
            if hasattr(provider, 'aclose'):
                async_names.append(name)
                cleanup_tasks.append(self._async_close_provider(name, provider))
            elif hasattr(provider, 'cleanup'):
                async_names.append(name)
                cleanup_tasks.append(self._async_cleanup_provider(name, provider))
            elif inspect.iscoroutinefunction(getattr(provider, 'close', None)):
                async_names.append(name)
                cleanup_tasks.append(self._async_close_provider(name, provider))
            else:
                sync_providers.append((name, provider))

        # Run async cleanups concurrently
        if cleanup_tasks:
            results = await asyncio.gather(*cleanup_tasks, return_exceptions=True)
            for name, result in zip(async_names, results):
                if isinstance(result, Exception):
                    logger.error("Error during async cleanup of provider %s: %s", name, result)

        # Run sync cleanups in executor to avoid blocking the loop
        for name, provider in sync_providers:
            try:
                await loop.run_in_executor(None, self._close_provider, name, provider)
            except Exception as e:
                logger.error("Error closing sync provider %s: %s", name, e)

        # Clear tracking data
        with self._lock:
            self._resources.clear()
            self._resource_owners.clear()
            self._pools.clear()
            self._providers.clear()

    async def _async_close_provider(self, name: str, provider: IResourceProvider) -> None:
        """Close a provider via ``aclose()``, falling back to ``close()``.

        The call's result is awaited only if it is awaitable, so a provider
        whose close method turns out to be synchronous is still handled.

        Args:
            name: Provider name.
            provider: Provider instance.

        Raises:
            Exception: Whatever the close method raised (after logging it).
        """
        import inspect

        logger = self._logger()
        try:
            closer = getattr(provider, 'aclose', None) or provider.close
            result = closer()
            if inspect.isawaitable(result):
                await result
            logger.debug("Successfully closed async provider %s", name)
        except Exception as e:
            logger.error("Error closing async provider %s: %s", name, e)
            raise

    async def _async_cleanup_provider(self, name: str, provider: IResourceProvider) -> None:
        """Clean up a provider via its ``cleanup()`` method.

        The result is awaited only if it is awaitable, so a synchronous
        ``cleanup()`` no longer fails with "can't be used in 'await' expression".

        Args:
            name: Provider name.
            provider: Provider instance.

        Raises:
            Exception: Whatever ``cleanup()`` raised (after logging it).
        """
        import inspect

        logger = self._logger()
        try:
            result = provider.cleanup()
            if inspect.isawaitable(result):
                await result
            logger.debug("Successfully cleaned up async provider %s", name)
        except Exception as e:
            logger.error("Error cleaning up async provider %s: %s", name, e)
            raise

    def _close_provider(self, name: str, provider: IResourceProvider) -> None:
        """Close a sync provider, logging (not raising) any error.

        Args:
            name: Provider name.
            provider: Provider instance.
        """
        logger = self._logger()
        try:
            if hasattr(provider, 'close'):
                self._close_sync(provider)
                logger.debug("Successfully closed sync provider %s", name)
        except Exception as e:
            logger.error("Error closing sync provider %s: %s", name, e)

    # ------------------------------------------------------------------
    # Simple providers
    # ------------------------------------------------------------------

    def create_provider_from_dict(self, name: str, config: Dict[str, Any]) -> IResourceProvider:
        """Create a resource provider from a dictionary configuration.

        Args:
            name: Resource name.
            config: Dictionary configuration; its ``'data'`` entry (default
                ``{}``) is the object every acquisition returns.

        Returns:
            Resource provider instance.
        """
        class SimpleResourceProvider(IResourceProvider):
            """Simple in-memory resource provider for testing and basic use cases.

            Every acquisition returns the same ``data`` object taken from the
            configuration. Useful for testing FSMs without external
            dependencies or for simple static data resources.
            """

            def __init__(self, name: str, config: Dict[str, Any]):
                self.name = name
                self.config = config
                self.data = config.get('data', {})
                self._status = ResourceStatus.IDLE
                # The same object is handed to every owner, so count holders
                # rather than flipping a single busy/idle flag.
                self._active = 0
                self._total_acquisitions = 0

            def acquire(self, **kwargs) -> Any:
                self._active += 1
                self._total_acquisitions += 1
                self._status = ResourceStatus.BUSY
                return self.data

            def release(self, resource: Any) -> None:
                self._active = max(0, self._active - 1)
                if self._active == 0:
                    self._status = ResourceStatus.IDLE

            def validate(self, resource: Any) -> bool:
                return resource is not None

            def health_check(self) -> ResourceHealth:
                return ResourceHealth.HEALTHY

            def get_metrics(self) -> ResourceMetrics:
                # Real counts keep total_acquisitions >= active_connections, so
                # derived figures such as "total_releases" never go negative.
                return ResourceMetrics(
                    total_acquisitions=self._total_acquisitions,
                    active_connections=self._active,
                    failed_acquisitions=0
                )

            async def get_resource(self):
                return self.data

            async def close(self):
                pass

        return SimpleResourceProvider(name, config)

    def create_simple_provider(self, name: str, data: Any) -> IResourceProvider:
        """Create a simple resource provider with static data.

        Args:
            name: Resource name.
            data: The resource data to provide.

        Returns:
            Resource provider instance.
        """
        return self.create_provider_from_dict(name, {'data': data})

    def register_from_dict(self, name: str, config: Dict[str, Any]) -> None:
        """Register a resource provider from a dictionary configuration.

        Args:
            name: Resource name.
            config: Dictionary configuration for the resource.
        """
        provider = self.create_provider_from_dict(name, config)
        self.register_provider(name, provider)

    def __enter__(self):
        """Enter context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager by closing the manager."""
        self.close()

    # ------------------------------------------------------------------
    # Status reporting
    # ------------------------------------------------------------------

    def get_resource_status(self, name: str) -> Dict[str, Any]:
        """Get status information for a resource.

        Args:
            name: Resource name.

        Returns:
            Status dictionary with provider and pool information. Counts from
            the provider's metrics are included when they can be read.
        """
        with self._lock:
            status = {
                "provider_exists": name in self._providers,
                "has_pool": name in self._pools,
                "active_count": 0,
                "owners": list(self._resource_owners.get(name, set()))
            }

            if name in self._providers:
                try:
                    metrics = self._providers[name].get_metrics()
                    status["active_count"] = metrics.active_connections
                    status["total_acquires"] = metrics.total_acquisitions
                    status["total_releases"] = metrics.total_acquisitions - metrics.active_connections
                except Exception:
                    pass  # Metrics are optional extras in this report

            return status

    def get_all_resources(self) -> Dict[str, Dict[str, Any]]:
        """Get information about all registered resources.

        Returns:
            Dictionary mapping resource names to their status.
        """
        with self._lock:
            return {name: self.get_resource_status(name) for name in self._providers}

    def get_resource_owners(self, name: str) -> Set[str]:
        """Get all owners of a specific resource.

        Args:
            name: Resource name.

        Returns:
            A copy of the set of owner IDs.
        """
        with self._lock:
            return set(self._resource_owners.get(name, set()))