Coverage for src/dataknobs_fsm/api/async_simple.py: 24% (126 statements)
coverage.py v7.11.0, created at 2025-11-08 14:11 -0700

"""Async-first API for production FSM operations.

This module provides an async-first interface for FSM operations, designed to
work natively in async contexts (web services, FastAPI, async applications)
without any asyncio.run() overhead. This is the recommended API for production
use when your application is already async.

Architecture:
    AsyncSimpleFSM is the foundation for the dataknobs-fsm async API tier:

    **Design Philosophy:**
    - Async/await native - no blocking calls, no thread overhead
    - Production-ready with proper error handling and resource management
    - High concurrency support - process thousands of requests concurrently
    - Memory efficient - streaming support for large datasets
    - Framework agnostic - works with FastAPI, aiohttp, asyncio, etc.

    **Compared to SimpleFSM:**
    - SimpleFSM: Synchronous wrapper with event loop overhead
    - AsyncSimpleFSM: Native async, no overhead, better performance

    **Compared to AdvancedFSM:**
    - AsyncSimpleFSM: Simple API, automatic execution
    - AdvancedFSM: Manual control, debugging, profiling

Async Patterns:
    This module enables several async patterns for production systems:

    **Web Service Integration (FastAPI):**
    ```python
    from fastapi import FastAPI
    from dataknobs_fsm.api.async_simple import AsyncSimpleFSM
    from dataknobs_fsm.core.data_modes import DataHandlingMode

    app = FastAPI()

    # Initialize FSM at startup
    fsm = None

    @app.on_event("startup")
    async def startup():
        global fsm
        fsm = AsyncSimpleFSM(
            'pipeline.yaml',
            data_mode=DataHandlingMode.COPY  # Safe for concurrent requests
        )

    @app.on_event("shutdown")
    async def shutdown():
        if fsm:
            await fsm.close()

    @app.post("/process")
    async def process_endpoint(data: dict):
        result = await fsm.process(data)
        return result
    ```

    **Concurrent Processing:**
    ```python
    import asyncio
    from dataknobs_fsm.api.async_simple import AsyncSimpleFSM

    fsm = AsyncSimpleFSM('config.yaml')

    # Process multiple requests concurrently
    async def process_many(items):
        tasks = [fsm.process(item) for item in items]
        results = await asyncio.gather(*tasks)
        return results

    # Run
    items = [{'id': i, 'text': f'Item {i}'} for i in range(100)]
    results = await process_many(items)
    ```

    **Streaming Large Files:**
    ```python
    # Memory-efficient processing of large files
    fsm = AsyncSimpleFSM('pipeline.yaml')

    stats = await fsm.process_stream(
        source='large_input.jsonl',
        sink='output.jsonl',
        chunk_size=1000,
        use_streaming=True  # Memory-efficient mode
    )
    print(f"Processed {stats['total_processed']} records")
    print(f"Throughput: {stats['throughput']:.2f} records/sec")
    ```

    **Background Task Processing:**
    ```python
    import asyncio
    from dataknobs_fsm.api.async_simple import AsyncSimpleFSM

    async def background_processor(queue: asyncio.Queue):
        fsm = AsyncSimpleFSM('processor.yaml')

        try:
            while True:
                # Get work from queue
                item = await queue.get()
                if item is None:  # Shutdown signal
                    break

                # Process asynchronously
                result = await fsm.process(item)

                # Handle result
                if not result['success']:
                    print(f"Error: {result['error']}")

                queue.task_done()
        finally:
            await fsm.close()

    # Start background processor
    work_queue = asyncio.Queue()
    task = asyncio.create_task(background_processor(work_queue))
    ```

Production Considerations:
    **Concurrency:**
    - Use DataHandlingMode.COPY for concurrent processing (default)
    - Each request gets its own data copy - safe for parallel execution
    - For high throughput, use process_batch() with appropriate max_workers

    **Resource Management:**
    - Always call close() to release database connections, file handles, etc.
    - Use startup/shutdown hooks in web frameworks (FastAPI, aiohttp)
    - Configure connection pooling in resource definitions

    **Error Handling:**
    - Process methods return {'success': bool, 'error': str} for graceful degradation
    - Use try/except for critical failures that should halt execution
    - Configure retry logic in error_recovery patterns
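    The result-dict contract above can be sketched as follows; `process` here
    is a hypothetical stand-in returning the result shape documented in this
    module, not the real FSM call:

    ```python
    import asyncio

    async def process(data):
        # Stand-in returning the documented result shape.
        if not data.get('text'):
            return {'success': False, 'error': 'missing text', 'data': {}, 'path': []}
        return {'success': True, 'error': None, 'data': data, 'path': ['start', 'end']}

    async def main():
        # Graceful degradation: inspect the dict instead of catching exceptions.
        for item in [{'text': 'ok'}, {}]:
            result = await process(item)
            if not result['success']:
                print(f"Error: {result['error']}")

    asyncio.run(main())
    ```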

    **Memory Management:**
    - Use use_streaming=True for large file processing
    - Configure appropriate chunk_size based on available memory
    - Monitor memory usage with profiling tools

    **Performance Optimization:**
    - Use REFERENCE mode for read-only transformations (memory-efficient)
    - Use DIRECT mode for single-threaded pipelines (fastest)
    - Tune batch_size and max_workers for optimal CPU utilization
    - Profile with AdvancedFSM.profile_execution() to identify bottlenecks

Example:
    Complete production FastAPI service:

    ```python
    from fastapi import FastAPI, BackgroundTasks, HTTPException
    from pydantic import BaseModel
    from dataknobs_fsm.api.async_simple import AsyncSimpleFSM
    from dataknobs_fsm.core.data_modes import DataHandlingMode
    import asyncio

    app = FastAPI(title="FSM Processing Service")

    # Global FSM instance
    fsm: AsyncSimpleFSM | None = None

    # Request/Response models
    class ProcessRequest(BaseModel):
        text: str
        metadata: dict = {}

    class ProcessResponse(BaseModel):
        success: bool
        result: dict
        path: list[str]
        error: str | None = None

    # Lifecycle management
    @app.on_event("startup")
    async def startup():
        global fsm
        # Initialize FSM with production config
        fsm = AsyncSimpleFSM(
            config='production.yaml',
            data_mode=DataHandlingMode.COPY,  # Safe for concurrent requests
            resources={
                'database': {
                    'type': 'DATABASE',
                    'backend': 'postgres',
                    'host': 'db.prod.example.com',
                    'database': 'app_data',
                    'pool_size': 20  # Connection pooling
                }
            }
        )
        print("FSM initialized")

    @app.on_event("shutdown")
    async def shutdown():
        if fsm:
            await fsm.close()
            print("FSM closed")

    # Endpoints
    @app.post("/process", response_model=ProcessResponse)
    async def process_single(request: ProcessRequest):
        \"\"\"Process a single request through the FSM.\"\"\"
        if not fsm:
            raise HTTPException(status_code=503, detail="Service not ready")

        result = await fsm.process({
            'text': request.text,
            'metadata': request.metadata
        })

        return ProcessResponse(
            success=result['success'],
            result=result['data'],
            path=result['path'],
            error=result.get('error')
        )

    @app.post("/batch", response_model=list[ProcessResponse])
    async def process_batch_endpoint(requests: list[ProcessRequest]):
        \"\"\"Process multiple requests in parallel.\"\"\"
        if not fsm:
            raise HTTPException(status_code=503, detail="Service not ready")

        # Convert to processing format
        data = [
            {'text': req.text, 'metadata': req.metadata}
            for req in requests
        ]

        # Process batch
        results = await fsm.process_batch(
            data=data,
            batch_size=10,
            max_workers=4
        )

        # Format responses
        return [
            ProcessResponse(
                success=r['success'],
                result=r['data'],
                path=r['path'],
                error=r.get('error')
            )
            for r in results
        ]

    @app.post("/file")
    async def process_file_endpoint(
        background_tasks: BackgroundTasks,
        input_path: str,
        output_path: str
    ):
        \"\"\"Process a file in the background.\"\"\"
        if not fsm:
            raise HTTPException(status_code=503, detail="Service not ready")

        async def process_file_task():
            stats = await fsm.process_stream(
                source=input_path,
                sink=output_path,
                chunk_size=1000,
                use_streaming=True
            )
            print(f"File processing complete: {stats}")

        background_tasks.add_task(process_file_task)
        return {"status": "processing", "message": "File processing started"}

    # Run: uvicorn app:app --host 0.0.0.0 --port 8000
    ```

See Also:
    - :class:`SimpleFSM`: Synchronous wrapper for scripts and prototypes
    - :class:`AdvancedFSM`: Advanced API with debugging and profiling
    - :class:`DataHandlingMode`: Data processing mode options
    - :mod:`dataknobs_fsm.patterns.error_recovery`: Production error handling patterns
    - :mod:`dataknobs_fsm.resources.manager`: Resource management and pooling
"""

import asyncio
from collections.abc import AsyncIterator, Callable
from pathlib import Path
from typing import Any

from dataknobs_data import Record

from ..config.builder import FSMBuilder
from ..config.loader import ConfigLoader
from ..core.context_factory import ContextFactory
from ..core.data_modes import DataHandlingMode
from ..core.result_formatter import ResultFormatter
from ..execution.async_batch import AsyncBatchExecutor
from ..execution.async_engine import AsyncExecutionEngine
from ..execution.async_stream import AsyncStreamExecutor
from ..resources.manager import ResourceManager
from ..streaming.core import StreamConfig as CoreStreamConfig


class AsyncSimpleFSM:
    """Async-first FSM interface for production workflows.

    This class provides a fully asynchronous API for FSM operations, designed
    to work natively in async contexts without blocking calls or thread
    overhead. This is the recommended FSM implementation for production systems.

    AsyncSimpleFSM handles all the complexity of async execution, resource
    management, and concurrent processing while providing a simple, clean API.
    It's optimized for high-throughput scenarios with proper connection pooling,
    error handling, and memory management.

    Attributes:
        data_mode (DataHandlingMode): Data processing mode (COPY/REFERENCE/DIRECT)
        _config: Loaded FSM configuration
        _fsm (FSM): Core FSM engine
        _resource_manager (ResourceManager): Resource lifecycle and pooling manager
        _async_engine (AsyncExecutionEngine): Async execution engine

    Methods:
        process: Process single record asynchronously
        process_batch: Process multiple records with concurrency control
        process_stream: Stream-process large datasets
        validate: Validate data against schema
        get_states: List all FSM state names
        get_resources: List all registered resources
        close: Release all resources and cleanup

    Production Use Cases:
        **Web API Backend:**
        Handle thousands of concurrent requests in FastAPI/aiohttp services.
        Each request processes independently with automatic resource pooling.

        **Data Pipeline Processing:**
        Transform large datasets with memory-efficient streaming and parallel
        batch processing. Configurable chunk sizes and worker counts.

        **Real-time Event Processing:**
        Process events from queues (RabbitMQ, Kafka) with async consumers.
        High throughput with concurrent processing of independent events.

        **Batch Job Processing:**
        Schedule and run large batch jobs with progress tracking and error
        handling. Configurable parallelism for optimal resource utilization.

    Note:
        **Concurrency Safety:**
        AsyncSimpleFSM is safe for concurrent use when using DataHandlingMode.COPY
        (default). Each process() call operates on independent data. For REFERENCE
        or DIRECT modes, ensure external synchronization.

        **Resource Pooling:**
        Resources (databases, HTTP clients) use connection pooling automatically.
        Configure pool_size in resource definitions for optimal performance.

        **Error Handling:**
        Process methods return success/error dicts rather than raising exceptions,
        allowing graceful degradation. Use try/except only for critical failures.

        **Memory Management:**
        For large datasets, use process_stream() with use_streaming=True for
        constant memory usage regardless of file size.
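        For very large fan-outs with asyncio.gather it can help to bound the
        number of in-flight calls. A minimal pure-asyncio sketch (`work` is a
        hypothetical stand-in for an awaitable per-item call such as
        fsm.process):

        ```python
        import asyncio

        async def work(item):
            # Stand-in for an awaitable per-item call.
            await asyncio.sleep(0)
            return item * 2

        async def bounded_gather(items, limit=8):
            sem = asyncio.Semaphore(limit)

            async def one(item):
                async with sem:  # At most `limit` calls in flight at once.
                    return await work(item)

            return await asyncio.gather(*(one(i) for i in items))

        print(asyncio.run(bounded_gather(range(5))))  # → [0, 2, 4, 6, 8]
        ```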

    Example:
        Basic async processing:

        ```python
        import asyncio
        from dataknobs_fsm.api.async_simple import AsyncSimpleFSM

        async def main():
            # Create FSM
            fsm = AsyncSimpleFSM('pipeline.yaml')

            try:
                # Process single record
                result = await fsm.process({
                    'text': 'Input data',
                    'metadata': {'source': 'api'}
                })

                if result['success']:
                    print(f"Result: {result['data']}")
                    print(f"States: {' -> '.join(result['path'])}")
                else:
                    print(f"Error: {result['error']}")
            finally:
                await fsm.close()

        asyncio.run(main())
        ```

        Concurrent processing with asyncio.gather:

        ```python
        async def process_concurrent():
            fsm = AsyncSimpleFSM('config.yaml')

            try:
                # Create tasks for concurrent execution
                tasks = [
                    fsm.process({'id': 1, 'text': 'Item 1'}),
                    fsm.process({'id': 2, 'text': 'Item 2'}),
                    fsm.process({'id': 3, 'text': 'Item 3'})
                ]

                # Execute concurrently
                results = await asyncio.gather(*tasks)

                # Check results
                for i, result in enumerate(results, 1):
                    status = "✓" if result['success'] else "✗"
                    print(f"{status} Item {i}: {result.get('data', result.get('error'))}")
            finally:
                await fsm.close()
        ```

        Production web service pattern:

        ```python
        from fastapi import FastAPI
        from contextlib import asynccontextmanager

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            # Startup: initialize FSM
            app.state.fsm = AsyncSimpleFSM(
                'production.yaml',
                resources={
                    'db': {
                        'type': 'DATABASE',
                        'backend': 'postgres',
                        'pool_size': 20,  # Connection pooling
                        'host': 'db.prod.example.com'
                    }
                }
            )
            yield
            # Shutdown: cleanup
            await app.state.fsm.close()

        app = FastAPI(lifespan=lifespan)

        @app.post("/process")
        async def process(request: dict):
            result = await app.state.fsm.process(request)
            return result
        ```

        Batch processing with progress tracking:

        ```python
        async def process_with_progress():
            fsm = AsyncSimpleFSM('pipeline.yaml')

            # Progress callback
            def on_progress(current, total):
                pct = (current / total) * 100
                print(f"Progress: {current}/{total} ({pct:.1f}%)")

            try:
                records = [{'id': i} for i in range(1000)]
                results = await fsm.process_batch(
                    data=records,
                    batch_size=50,
                    max_workers=10,
                    on_progress=on_progress
                )

                successful = sum(1 for r in results if r['success'])
                print(f"Success rate: {successful}/{len(results)}")
            finally:
                await fsm.close()
        ```

        Stream processing large files:

        ```python
        async def process_large_file():
            fsm = AsyncSimpleFSM('transform.yaml')

            try:
                # Memory-efficient streaming
                stats = await fsm.process_stream(
                    source='input_100gb.jsonl',
                    sink='output.jsonl',
                    chunk_size=1000,
                    use_streaming=True  # Constant memory usage
                )

                print(f"Processed: {stats['total_processed']}")
                print(f"Success: {stats['successful']}")
                print(f"Failed: {stats['failed']}")
                print(f"Duration: {stats['duration']:.2f}s")
                print(f"Throughput: {stats['throughput']:.2f} records/sec")
            finally:
                await fsm.close()
        ```

    See Also:
        - :class:`SimpleFSM`: Synchronous wrapper for scripts
        - :class:`AdvancedFSM`: Advanced API with debugging
        - :func:`create_async_fsm`: Factory function for creating instances
        - :mod:`dataknobs_fsm.execution.async_engine`: Async execution engine
        - :mod:`dataknobs_fsm.resources.manager`: Resource management
    """

    def __init__(
        self,
        config: str | Path | dict[str, Any],
        data_mode: DataHandlingMode = DataHandlingMode.COPY,
        resources: dict[str, Any] | None = None,
        custom_functions: dict[str, Callable] | None = None
    ):
        """Initialize AsyncSimpleFSM from configuration.

        Args:
            config: Path to config file or config dictionary
            data_mode: Default data mode for processing
            resources: Optional resource configurations
            custom_functions: Optional custom functions to register
        """
        self.data_mode = data_mode
        self._resources = resources or {}
        self._custom_functions = custom_functions or {}

        # Create loader with knowledge of custom functions
        loader = ConfigLoader()

        # Tell the loader about registered function names
        if self._custom_functions:
            for name in self._custom_functions.keys():
                loader.add_registered_function(name)

        # Load configuration
        if isinstance(config, (str, Path)):
            self._config = loader.load_from_file(Path(config))
        else:
            self._config = loader.load_from_dict(config)

        # Build FSM with custom functions
        builder = FSMBuilder()

        # Register custom functions with the builder
        for name, func in self._custom_functions.items():
            builder.register_function(name, func)

        self._fsm = builder.build(self._config)

        # Initialize resource manager
        self._resource_manager = ResourceManager()
        self._setup_resources()

        # Create async execution engine
        self._async_engine = AsyncExecutionEngine(self._fsm)

    def _setup_resources(self) -> None:
        """Set up resources from configuration."""
        # Register resources from config
        if hasattr(self._config, 'resources'):
            for resource_config in self._config.resources:
                try:
                    resource = self._create_resource_provider(resource_config)
                    self._resource_manager.register_provider(resource_config.name, resource)
                except Exception:
                    # Continue if resource creation fails - this is for the simplified API
                    pass

        # Register additional resources passed to the constructor
        for name, resource_config in self._resources.items():
            try:
                # Use ResourceManager factory method
                self._resource_manager.register_from_dict(name, resource_config)
            except Exception:
                # Continue if resource creation fails
                pass

    def _create_resource_provider(self, resource_config):
        """Create a resource provider from ResourceConfig."""
        # Use the same logic as FSMBuilder
        from ..config.builder import FSMBuilder
        builder = FSMBuilder()
        return builder._create_resource(resource_config)

    async def process(self, data: dict[str, Any] | Record) -> dict[str, Any]:
        """Process a single record through the FSM asynchronously.

        Args:
            data: Input data to process

        Returns:
            Dict containing the processed result
        """
        # Convert to Record if needed
        if isinstance(data, dict):
            record = Record(data)
        else:
            record = data

        # Create context
        from ..core.modes import ProcessingMode
        context = ContextFactory.create_context(
            fsm=self._fsm,
            data=record,
            data_mode=ProcessingMode.SINGLE,
            resource_manager=self._resource_manager
        )

        try:
            # Execute FSM asynchronously
            success, result = await self._async_engine.execute(context)

            # Format result
            return ResultFormatter.format_single_result(
                context=context,
                success=success,
                result=result
            )
        except Exception as e:
            return ResultFormatter.format_error_result(
                context=context,
                error=e
            )

    async def process_batch(
        self,
        data: list[dict[str, Any] | Record],
        batch_size: int = 10,
        max_workers: int = 4,
        on_progress: Callable | None = None
    ) -> list[dict[str, Any]]:
        """Process multiple records in parallel batches asynchronously.

        Args:
            data: List of input records to process
            batch_size: Number of records per batch
            max_workers: Maximum parallel workers
            on_progress: Optional callback for progress updates

        Returns:
            List of results for each input record
        """
        batch_executor = AsyncBatchExecutor(
            fsm=self._fsm,
            parallelism=max_workers,
            batch_size=batch_size,
            progress_callback=on_progress
        )

        # Convert to Records
        records = []
        for item in data:
            if isinstance(item, dict):
                records.append(Record(item))
            else:
                records.append(item)

        # Execute batch
        results = await batch_executor.execute_batch(items=records)

        # Format results
        formatted_results = []
        for result in results:
            if result.success:
                formatted_results.append({
                    'final_state': result.metadata.get('final_state', 'unknown'),
                    'data': result.result,
                    'path': result.metadata.get('path', []),
                    'success': True,
                    'error': None
                })
            else:
                formatted_results.append({
                    'final_state': result.metadata.get('final_state', None),
                    'data': result.result if result.result else {},
                    'path': result.metadata.get('path', []),
                    'success': False,
                    'error': str(result.error) if result.error else str(result.result)
                })

        return formatted_results

    async def process_stream(
        self,
        source: str | AsyncIterator[dict[str, Any]],
        sink: str | None = None,
        chunk_size: int = 100,
        on_progress: Callable | None = None,
        input_format: str = 'auto',
        text_field_name: str = 'text',
        csv_delimiter: str = ',',
        csv_has_header: bool = True,
        skip_empty_lines: bool = True,
        use_streaming: bool = False
    ) -> dict[str, Any]:
        """Process a stream of data through the FSM asynchronously.

        Args:
            source: Data source (file path or async iterator)
            sink: Optional output destination
            chunk_size: Size of processing chunks
            on_progress: Optional progress callback
            input_format: Input file format ('auto', 'jsonl', 'json', 'csv', 'text')
            text_field_name: Field name for text lines when converting to dict
            csv_delimiter: CSV delimiter character
            csv_has_header: Whether CSV file has header row
            skip_empty_lines: Skip empty lines in text files
            use_streaming: Use memory-efficient streaming for large files

        Returns:
            Dict containing stream processing statistics
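        Instead of a file path, any async iterator of dicts can serve as
        `source`. A minimal pure-asyncio sketch of such a source (names are
        illustrative; `collect` stands in for the stream executor draining it):

        ```python
        import asyncio

        async def record_source(n):
            # Yields dict records, the shape the stream executor consumes.
            for i in range(n):
                yield {'id': i, 'text': f'record {i}'}

        async def collect(agen):
            # Drain the async iterator, as stream execution would.
            return [rec async for rec in agen]

        print(asyncio.run(collect(record_source(3))))
        ```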

        """
        # Configure streaming
        stream_config = CoreStreamConfig(
            chunk_size=chunk_size,
            parallelism=4,
            memory_limit_mb=1024
        )

        # Create async stream executor
        stream_executor = AsyncStreamExecutor(
            fsm=self._fsm,
            stream_config=stream_config,
            progress_callback=on_progress
        )

        # Choose between streaming and regular mode
        if use_streaming and isinstance(source, str):
            # Use memory-efficient streaming for large files
            from ..utils.streaming_file_utils import (
                create_streaming_file_reader,
                create_streaming_file_writer,
            )

            stream_source = create_streaming_file_reader(
                file_path=source,
                config=stream_config,
                input_format=input_format,
                text_field_name=text_field_name,
                csv_delimiter=csv_delimiter,
                csv_has_header=csv_has_header,
                skip_empty_lines=skip_empty_lines
            )

            # Handle sink for streaming mode
            sink_func = None
            cleanup_func = None
            if sink:
                sink_func, cleanup_func = await create_streaming_file_writer(
                    file_path=sink,
                    config=stream_config
                )
        else:
            # Use regular mode (loads full chunks into memory)
            from ..utils.file_utils import create_file_reader, create_file_writer

            # Handle file source
            if isinstance(source, str):
                stream_source = create_file_reader(
                    file_path=source,
                    input_format=input_format,
                    text_field_name=text_field_name,
                    csv_delimiter=csv_delimiter,
                    csv_has_header=csv_has_header,
                    skip_empty_lines=skip_empty_lines
                )
            else:
                # Already an async iterator
                stream_source = source

            # Handle sink for regular mode
            sink_func = None
            cleanup_func = None
            if sink:
                sink_func, cleanup_func = create_file_writer(sink)

        try:
            # Execute stream using async executor
            result = await stream_executor.execute_stream(
                source=stream_source,
                sink=sink_func,
                chunk_size=chunk_size
            )

            return {
                'total_processed': result.total_processed,
                'successful': result.successful,
                'failed': result.failed,
                'duration': result.duration,
                'throughput': result.throughput
            }
        finally:
            # Clean up any resources (e.g., close files)
            if cleanup_func:
                if asyncio.iscoroutinefunction(cleanup_func):
                    await cleanup_func()
                else:
                    cleanup_func()

    async def validate(self, data: dict[str, Any] | Record) -> dict[str, Any]:
        """Validate data against the FSM's start state schema asynchronously.

        Args:
            data: Data to validate

        Returns:
            Dict containing validation results
        """
        # Convert to Record if needed
        if isinstance(data, dict):
            record = Record(data)
        else:
            record = data

        # Get start state
        start_state = self._fsm.get_start_state()

        # Validate against schema
        if start_state.schema:
            validation_result = start_state.schema.validate(record)
            return {
                'valid': validation_result.valid,
                'errors': validation_result.errors if not validation_result.valid else []
            }
        else:
            return {
                'valid': True,
                'errors': []
            }

    def get_states(self) -> list[str]:
        """Get a list of all state names in the FSM."""
        states = []
        # The FSM has networks, and each network has states
        for network in self._fsm.networks.values():
            for state in network.states.values():
                states.append(state.name)
        return states

    def get_resources(self) -> list[str]:
        """Get a list of registered resource names."""
        return list(self._resource_manager._resources.keys())

    @property
    def config(self) -> Any:
        """Get the FSM configuration object."""
        return self._config

    async def close(self) -> None:
        """Clean up resources and close connections asynchronously."""
        await self._resource_manager.cleanup()

    # Alias for consistency with other async libraries
    aclose = close


# Factory function for AsyncSimpleFSM
async def create_async_fsm(
    config: str | Path | dict[str, Any],
    custom_functions: dict[str, Callable] | None = None,
    **kwargs
) -> AsyncSimpleFSM:
    """Factory function to create an AsyncSimpleFSM instance.

    Args:
        config: Configuration file path or dictionary
        custom_functions: Optional custom functions to register
        **kwargs: Additional arguments passed to AsyncSimpleFSM

    Returns:
        Configured AsyncSimpleFSM instance
    """
    return AsyncSimpleFSM(config, custom_functions=custom_functions, **kwargs)