Coverage for src/dataknobs_fsm/api/async_simple.py: 24%
126 statements
1"""Async-first API for production FSM operations.
3This module provides an async-first interface for FSM operations, designed to
4work natively in async contexts (web services, FastAPI, async applications)
5without any asyncio.run() overhead. This is the recommended API for production
6use when your application is already async.
8Architecture:
9 AsyncSimpleFSM is the foundation for the dataknobs-fsm async API tier:
11 **Design Philosophy:**
12 - Async/await native - no blocking calls, no thread overhead
13 - Production-ready with proper error handling and resource management
14 - High concurrency support - process thousands of requests concurrently
15 - Memory efficient - streaming support for large datasets
16 - Framework agnostic - works with FastAPI, aiohttp, asyncio, etc.
18 **Compared to SimpleFSM:**
19 - SimpleFSM: Synchronous wrapper with event loop overhead
20 - AsyncSimpleFSM: Native async, no overhead, better performance
22 **Compared to AdvancedFSM:**
23 - AsyncSimpleFSM: Simple API, automatic execution
24 - AdvancedFSM: Manual control, debugging, profiling
26Async Patterns:
27 This module enables several async patterns for production systems:
29 **Web Service Integration (FastAPI):**
30 ```python
31 from fastapi import FastAPI
32 from dataknobs_fsm.api.async_simple import AsyncSimpleFSM
33 from dataknobs_fsm.core.data_modes import DataHandlingMode
35 app = FastAPI()
37 # Initialize FSM at startup
38 fsm = None
40 @app.on_event("startup")
41 async def startup():
42 global fsm
43 fsm = AsyncSimpleFSM(
44 'pipeline.yaml',
45 data_mode=DataHandlingMode.COPY # Safe for concurrent requests
46 )
48 @app.on_event("shutdown")
49 async def shutdown():
50 if fsm:
51 await fsm.close()
53 @app.post("/process")
54 async def process_endpoint(data: dict):
55 result = await fsm.process(data)
56 return result
57 ```
59 **Concurrent Processing:**
60 ```python
61 import asyncio
62 from dataknobs_fsm.api.async_simple import AsyncSimpleFSM
64 fsm = AsyncSimpleFSM('config.yaml')
66 # Process multiple requests concurrently
67 async def process_many(items):
68 tasks = [fsm.process(item) for item in items]
69 results = await asyncio.gather(*tasks)
70 return results
72 # Run
73 items = [{'id': i, 'text': f'Item {i}'} for i in range(100)]
74 results = await process_many(items)
75 ```
77 **Streaming Large Files:**
78 ```python
79 # Memory-efficient processing of large files
80 fsm = AsyncSimpleFSM('pipeline.yaml')
82 stats = await fsm.process_stream(
83 source='large_input.jsonl',
84 sink='output.jsonl',
85 chunk_size=1000,
86 use_streaming=True # Memory-efficient mode
87 )
88 print(f"Processed {stats['total_processed']} records")
89 print(f"Throughput: {stats['throughput']:.2f} records/sec")
90 ```
92 **Background Task Processing:**
93 ```python
94 import asyncio
95 from dataknobs_fsm.api.async_simple import AsyncSimpleFSM
97 async def background_processor(queue: asyncio.Queue):
98 fsm = AsyncSimpleFSM('processor.yaml')
100 try:
101 while True:
102 # Get work from queue
103 item = await queue.get()
104 if item is None: # Shutdown signal
105 break
107 # Process asynchronously
108 result = await fsm.process(item)
110 # Handle result
111 if not result['success']:
112 print(f"Error: {result['error']}")
114 queue.task_done()
115 finally:
116 await fsm.close()
118 # Start background processor
119 work_queue = asyncio.Queue()
120 task = asyncio.create_task(background_processor(work_queue))
121 ```
123Production Considerations:
124 **Concurrency:**
125 - Use DataHandlingMode.COPY for concurrent processing (default)
126 - Each request gets its own data copy - safe for parallel execution
127 - For high throughput, use process_batch() with appropriate max_workers
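
    A minimal sketch bounding in-flight requests with asyncio.Semaphore
    (the limit of 50 is an illustrative assumption, not a library default):
    ```python
    import asyncio

    sem = asyncio.Semaphore(50)  # Cap concurrent FSM executions

    async def bounded_process(fsm, item):
        async with sem:
            return await fsm.process(item)

    # results = await asyncio.gather(*(bounded_process(fsm, i) for i in items))
    ```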

    **Resource Management:**
    - Always call close() to release database connections, file handles, etc.
    - Use startup/shutdown hooks in web frameworks (FastAPI, aiohttp)
    - Configure connection pooling in resource definitions (lifecycle sketch below)
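
    A minimal lifecycle sketch using contextlib.asynccontextmanager (the
    config path is an illustrative assumption):
    ```python
    from contextlib import asynccontextmanager

    @asynccontextmanager
    async def managed_fsm():
        fsm = AsyncSimpleFSM('pipeline.yaml')  # Hypothetical config path
        try:
            yield fsm
        finally:
            await fsm.close()  # Always release connections and handles
    ```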

    **Error Handling:**
    - Process methods return {'success': bool, 'error': str} for graceful degradation
    - Use try/except for critical failures that should halt execution
    - Configure retry logic in error_recovery patterns (a retry sketch follows)
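
    A hedged retry sketch built on the success/error dict contract (the
    attempt count and backoff are illustrative; production retry policy
    belongs in error_recovery patterns):
    ```python
    import asyncio

    async def process_with_retry(fsm, item, attempts=3):
        result = {'success': False, 'error': 'not attempted'}
        for attempt in range(attempts):
            result = await fsm.process(item)
            if result['success']:
                break
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        return result  # On exhaustion, still a {'success': False, ...} dict
    ```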

    **Memory Management:**
    - Use use_streaming=True for large file processing
    - Configure appropriate chunk_size based on available memory
      (a rough heuristic follows)
    - Monitor memory usage with profiling tools
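
    A rough chunk-size heuristic (the 1 KB average record size and the
    half-budget safety margin are assumptions; measure your own data):
    ```python
    def suggest_chunk_size(memory_budget_mb: int, avg_record_kb: float = 1.0) -> int:
        # Keep roughly half the budget free for processing overhead
        usable_kb = memory_budget_mb * 1024 / 2
        return max(1, int(usable_kb / avg_record_kb))

    # suggest_chunk_size(256) -> 131072 records per chunk
    ```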

    **Performance Optimization:**
    - Use REFERENCE mode for read-only transformations (memory-efficient)
    - Use DIRECT mode for single-threaded pipelines (fastest)
    - Tune batch_size and max_workers for optimal CPU utilization
    - Profile with AdvancedFSM.profile_execution() to identify bottlenecks
      (a mode-selection sketch follows)
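
    A sketch of choosing a data mode per workload (the mapping is a rule of
    thumb drawn from the notes above, not enforced by the library):
    ```python
    from dataknobs_fsm.core.data_modes import DataHandlingMode

    def pick_mode(concurrent: bool, read_only: bool) -> DataHandlingMode:
        if concurrent:
            return DataHandlingMode.COPY       # Safe for parallel requests
        if read_only:
            return DataHandlingMode.REFERENCE  # Memory-efficient, read-only
        return DataHandlingMode.DIRECT         # Fastest; single-threaded only
    ```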

Example:
    Complete production FastAPI service:

    ```python
    from fastapi import FastAPI, BackgroundTasks, HTTPException
    from pydantic import BaseModel
    from dataknobs_fsm.api.async_simple import AsyncSimpleFSM
    from dataknobs_fsm.core.data_modes import DataHandlingMode
    import asyncio

    app = FastAPI(title="FSM Processing Service")

    # Global FSM instance
    fsm: AsyncSimpleFSM | None = None

    # Request/Response models
    class ProcessRequest(BaseModel):
        text: str
        metadata: dict = {}

    class ProcessResponse(BaseModel):
        success: bool
        result: dict
        path: list[str]
        error: str | None = None

    # Lifecycle management
    @app.on_event("startup")
    async def startup():
        global fsm
        # Initialize FSM with production config
        fsm = AsyncSimpleFSM(
            config='production.yaml',
            data_mode=DataHandlingMode.COPY,  # Safe for concurrent requests
            resources={
                'database': {
                    'type': 'DATABASE',
                    'backend': 'postgres',
                    'host': 'db.prod.example.com',
                    'database': 'app_data',
                    'pool_size': 20  # Connection pooling
                }
            }
        )
        print("FSM initialized")

    @app.on_event("shutdown")
    async def shutdown():
        if fsm:
            await fsm.close()
        print("FSM closed")

    # Endpoints
    @app.post("/process", response_model=ProcessResponse)
    async def process_single(request: ProcessRequest):
        \"\"\"Process a single request through the FSM.\"\"\"
        if not fsm:
            raise HTTPException(status_code=503, detail="Service not ready")

        result = await fsm.process({
            'text': request.text,
            'metadata': request.metadata
        })

        return ProcessResponse(
            success=result['success'],
            result=result['data'],
            path=result['path'],
            error=result.get('error')
        )

    @app.post("/batch", response_model=list[ProcessResponse])
    async def process_batch_endpoint(requests: list[ProcessRequest]):
        \"\"\"Process multiple requests in parallel.\"\"\"
        if not fsm:
            raise HTTPException(status_code=503, detail="Service not ready")

        # Convert to processing format
        data = [
            {'text': req.text, 'metadata': req.metadata}
            for req in requests
        ]

        # Process batch
        results = await fsm.process_batch(
            data=data,
            batch_size=10,
            max_workers=4
        )

        # Format responses
        return [
            ProcessResponse(
                success=r['success'],
                result=r['data'],
                path=r['path'],
                error=r.get('error')
            )
            for r in results
        ]

    @app.post("/file")
    async def process_file_endpoint(
        background_tasks: BackgroundTasks,
        input_path: str,
        output_path: str
    ):
        \"\"\"Process a file in the background.\"\"\"
        if not fsm:
            raise HTTPException(status_code=503, detail="Service not ready")

        async def process_file_task():
            stats = await fsm.process_stream(
                source=input_path,
                sink=output_path,
                chunk_size=1000,
                use_streaming=True
            )
            print(f"File processing complete: {stats}")

        background_tasks.add_task(process_file_task)
        return {"status": "processing", "message": "File processing started"}

    # Run: uvicorn app:app --host 0.0.0.0 --port 8000
    ```

See Also:
    - :class:`SimpleFSM`: Synchronous wrapper for scripts and prototypes
    - :class:`AdvancedFSM`: Advanced API with debugging and profiling
    - :class:`DataHandlingMode`: Data processing mode options
    - :mod:`dataknobs_fsm.patterns.error_recovery`: Production error handling patterns
    - :mod:`dataknobs_fsm.resources.manager`: Resource management and pooling
"""

import asyncio
from collections.abc import AsyncIterator, Callable
from pathlib import Path
from typing import Any

from dataknobs_data import Record

from ..config.builder import FSMBuilder
from ..config.loader import ConfigLoader
from ..core.context_factory import ContextFactory
from ..core.data_modes import DataHandlingMode
from ..core.result_formatter import ResultFormatter
from ..execution.async_batch import AsyncBatchExecutor
from ..execution.async_engine import AsyncExecutionEngine
from ..execution.async_stream import AsyncStreamExecutor
from ..resources.manager import ResourceManager
from ..streaming.core import StreamConfig as CoreStreamConfig


class AsyncSimpleFSM:
    """Async-first FSM interface for production workflows.

    This class provides a fully asynchronous API for FSM operations, designed
    to work natively in async contexts without blocking calls or thread overhead.
    This is the recommended FSM implementation for production systems.

    AsyncSimpleFSM handles all the complexity of async execution, resource
    management, and concurrent processing while providing a simple, clean API.
    It's optimized for high-throughput scenarios with proper connection pooling,
    error handling, and memory management.

    Attributes:
        data_mode (DataHandlingMode): Data processing mode (COPY/REFERENCE/DIRECT)
        _config: Loaded FSM configuration
        _fsm (FSM): Core FSM engine
        _resource_manager (ResourceManager): Resource lifecycle and pooling manager
        _async_engine (AsyncExecutionEngine): Async execution engine

    Methods:
        process: Process single record asynchronously
        process_batch: Process multiple records with concurrency control
        process_stream: Stream-process large datasets
        validate: Validate data against schema
        get_states: List all FSM state names
        get_resources: List all registered resources
        close: Release all resources and cleanup

    Production Use Cases:
        **Web API Backend:**
        Handle thousands of concurrent requests in FastAPI/aiohttp services.
        Each request processes independently with automatic resource pooling.

        **Data Pipeline Processing:**
        Transform large datasets with memory-efficient streaming and parallel
        batch processing. Configurable chunk sizes and worker counts.

        **Real-time Event Processing:**
        Process events from queues (RabbitMQ, Kafka) with async consumers.
        High throughput with concurrent processing of independent events.

        **Batch Job Processing:**
        Schedule and run large batch jobs with progress tracking and error
        handling. Configurable parallelism for optimal resource utilization.

    Note:
        **Concurrency Safety:**
        AsyncSimpleFSM is safe for concurrent use when using DataHandlingMode.COPY
        (default). Each process() call operates on independent data. For REFERENCE
        or DIRECT modes, ensure external synchronization (see the lock sketch
        below).

        **Resource Pooling:**
        Resources (databases, HTTP clients) use connection pooling automatically.
        Configure pool_size in resource definitions for optimal performance.

        **Error Handling:**
        Process methods return success/error dicts rather than raising exceptions,
        allowing graceful degradation. Use try/except only for critical failures.

        **Memory Management:**
        For large datasets, use process_stream() with use_streaming=True for
        constant memory usage regardless of file size.
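
        A minimal external-synchronization sketch for non-COPY modes (using
        a single asyncio.Lock is an illustrative choice; any external mutual
        exclusion works):

        ```python
        import asyncio

        lock = asyncio.Lock()

        async def process_serialized(fsm, item):
            async with lock:  # Serialize access for REFERENCE/DIRECT modes
                return await fsm.process(item)
        ```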

    Example:
        Basic async processing:

        ```python
        import asyncio
        from dataknobs_fsm.api.async_simple import AsyncSimpleFSM

        async def main():
            # Create FSM
            fsm = AsyncSimpleFSM('pipeline.yaml')

            try:
                # Process single record
                result = await fsm.process({
                    'text': 'Input data',
                    'metadata': {'source': 'api'}
                })

                if result['success']:
                    print(f"Result: {result['data']}")
                    print(f"States: {' -> '.join(result['path'])}")
                else:
                    print(f"Error: {result['error']}")
            finally:
                await fsm.close()

        asyncio.run(main())
        ```

        Concurrent processing with asyncio.gather:

        ```python
        async def process_concurrent():
            fsm = AsyncSimpleFSM('config.yaml')

            try:
                # Create tasks for concurrent execution
                tasks = [
                    fsm.process({'id': 1, 'text': 'Item 1'}),
                    fsm.process({'id': 2, 'text': 'Item 2'}),
                    fsm.process({'id': 3, 'text': 'Item 3'})
                ]

                # Execute concurrently
                results = await asyncio.gather(*tasks)

                # Check results
                for i, result in enumerate(results, 1):
                    status = "✓" if result['success'] else "✗"
                    print(f"{status} Item {i}: {result.get('data', result.get('error'))}")
            finally:
                await fsm.close()
        ```

        Production web service pattern:

        ```python
        from fastapi import FastAPI
        from contextlib import asynccontextmanager

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            # Startup: initialize FSM
            app.state.fsm = AsyncSimpleFSM(
                'production.yaml',
                resources={
                    'db': {
                        'type': 'DATABASE',
                        'backend': 'postgres',
                        'pool_size': 20,  # Connection pooling
                        'host': 'db.prod.example.com'
                    }
                }
            )
            yield
            # Shutdown: cleanup
            await app.state.fsm.close()

        app = FastAPI(lifespan=lifespan)

        @app.post("/process")
        async def process(request: dict):
            result = await app.state.fsm.process(request)
            return result
        ```

        Batch processing with progress tracking:

        ```python
        async def process_with_progress():
            fsm = AsyncSimpleFSM('pipeline.yaml')

            # Progress callback
            def on_progress(current, total):
                pct = (current / total) * 100
                print(f"Progress: {current}/{total} ({pct:.1f}%)")

            try:
                records = [{'id': i} for i in range(1000)]
                results = await fsm.process_batch(
                    data=records,
                    batch_size=50,
                    max_workers=10,
                    on_progress=on_progress
                )

                successful = sum(1 for r in results if r['success'])
                print(f"Success rate: {successful}/{len(results)}")
            finally:
                await fsm.close()
        ```

        Stream processing large files:

        ```python
        async def process_large_file():
            fsm = AsyncSimpleFSM('transform.yaml')

            try:
                # Memory-efficient streaming
                stats = await fsm.process_stream(
                    source='input_100gb.jsonl',
                    sink='output.jsonl',
                    chunk_size=1000,
                    use_streaming=True  # Constant memory usage
                )

                print(f"Processed: {stats['total_processed']}")
                print(f"Success: {stats['successful']}")
                print(f"Failed: {stats['failed']}")
                print(f"Duration: {stats['duration']:.2f}s")
                print(f"Throughput: {stats['throughput']:.2f} records/sec")
            finally:
                await fsm.close()
        ```

    See Also:
        - :class:`SimpleFSM`: Synchronous wrapper for scripts
        - :class:`AdvancedFSM`: Advanced API with debugging
        - :func:`create_async_fsm`: Factory function for creating instances
        - :mod:`dataknobs_fsm.execution.async_engine`: Async execution engine
        - :mod:`dataknobs_fsm.resources.manager`: Resource management
    """

    def __init__(
        self,
        config: str | Path | dict[str, Any],
        data_mode: DataHandlingMode = DataHandlingMode.COPY,
        resources: dict[str, Any] | None = None,
        custom_functions: dict[str, Callable] | None = None
    ):
        """Initialize AsyncSimpleFSM from configuration.

        Args:
            config: Path to config file or config dictionary
            data_mode: Default data mode for processing
            resources: Optional resource configurations
            custom_functions: Optional custom functions to register
        """
        self.data_mode = data_mode
        self._resources = resources or {}
        self._custom_functions = custom_functions or {}

        # Create loader with knowledge of custom functions
        loader = ConfigLoader()

        # Tell the loader about registered function names
        if self._custom_functions:
            for name in self._custom_functions.keys():
                loader.add_registered_function(name)

        # Load configuration
        if isinstance(config, (str, Path)):
            self._config = loader.load_from_file(Path(config))
        else:
            self._config = loader.load_from_dict(config)

        # Build FSM with custom functions
        builder = FSMBuilder()

        # Register custom functions with the builder
        for name, func in self._custom_functions.items():
            builder.register_function(name, func)

        self._fsm = builder.build(self._config)

        # Initialize resource manager
        self._resource_manager = ResourceManager()
        self._setup_resources()

        # Create async execution engine
        self._async_engine = AsyncExecutionEngine(self._fsm)

    def _setup_resources(self) -> None:
        """Set up resources from configuration."""
        # Register resources from config
        if hasattr(self._config, 'resources'):
            for resource_config in self._config.resources:
                try:
                    resource = self._create_resource_provider(resource_config)
                    self._resource_manager.register_provider(resource_config.name, resource)
                except Exception:
                    # Continue if resource creation fails - this is the simplified API
                    pass

        # Register additional resources passed to the constructor
        for name, resource_config in self._resources.items():
            try:
                # Use the ResourceManager factory method
                self._resource_manager.register_from_dict(name, resource_config)
            except Exception:
                # Continue if resource creation fails
                pass

    def _create_resource_provider(self, resource_config):
        """Create a resource provider from ResourceConfig."""
        # Use the same logic as FSMBuilder
        from ..config.builder import FSMBuilder
        builder = FSMBuilder()
        return builder._create_resource(resource_config)

    async def process(self, data: dict[str, Any] | Record) -> dict[str, Any]:
        """Process a single record through the FSM asynchronously.

        Args:
            data: Input data to process

        Returns:
            Dict containing the processed result
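
        Example:
            A minimal call sketch (the input fields are illustrative):

            ```python
            result = await fsm.process({'text': 'hello'})
            if result['success']:
                print(result['data'], result['path'])
            else:
                print(result['error'])
            ```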
595 """
596 # Convert to Record if needed
597 if isinstance(data, dict):
598 record = Record(data)
599 else:
600 record = data
602 # Create context
603 from ..core.modes import ProcessingMode
604 context = ContextFactory.create_context(
605 fsm=self._fsm,
606 data=record,
607 data_mode=ProcessingMode.SINGLE,
608 resource_manager=self._resource_manager
609 )
611 try:
612 # Execute FSM asynchronously
613 success, result = await self._async_engine.execute(context)
615 # Format result
616 return ResultFormatter.format_single_result(
617 context=context,
618 success=success,
619 result=result
620 )
621 except Exception as e:
622 return ResultFormatter.format_error_result(
623 context=context,
624 error=e
625 )

    async def process_batch(
        self,
        data: list[dict[str, Any] | Record],
        batch_size: int = 10,
        max_workers: int = 4,
        on_progress: Callable | None = None
    ) -> list[dict[str, Any]]:
        """Process multiple records in parallel batches asynchronously.

        Args:
            data: List of input records to process
            batch_size: Number of records per batch
            max_workers: Maximum parallel workers
            on_progress: Optional callback for progress updates

        Returns:
            List of results for each input record
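
        Example:
            A minimal batch sketch (the sizes shown are illustrative):

            ```python
            records = [{'id': i} for i in range(100)]
            results = await fsm.process_batch(records, batch_size=20, max_workers=4)
            failures = [r for r in results if not r['success']]
            ```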
644 """
645 batch_executor = AsyncBatchExecutor(
646 fsm=self._fsm,
647 parallelism=max_workers,
648 batch_size=batch_size,
649 progress_callback=on_progress
650 )
652 # Convert to Records
653 records = []
654 for item in data:
655 if isinstance(item, dict):
656 records.append(Record(item))
657 else:
658 records.append(item)
660 # Execute batch
661 results = await batch_executor.execute_batch(items=records)
663 # Format results
664 formatted_results = []
665 for result in results:
666 if result.success:
667 formatted_results.append({
668 'final_state': result.metadata.get('final_state', 'unknown'),
669 'data': result.result,
670 'path': result.metadata.get('path', []),
671 'success': True,
672 'error': None
673 })
674 else:
675 formatted_results.append({
676 'final_state': result.metadata.get('final_state', None),
677 'data': result.result if result.result else {},
678 'path': result.metadata.get('path', []),
679 'success': False,
680 'error': str(result.error) if result.error else str(result.result)
681 })
683 return formatted_results

    async def process_stream(
        self,
        source: str | AsyncIterator[dict[str, Any]],
        sink: str | None = None,
        chunk_size: int = 100,
        on_progress: Callable | None = None,
        input_format: str = 'auto',
        text_field_name: str = 'text',
        csv_delimiter: str = ',',
        csv_has_header: bool = True,
        skip_empty_lines: bool = True,
        use_streaming: bool = False
    ) -> dict[str, Any]:
        """Process a stream of data through the FSM asynchronously.

        Args:
            source: Data source (file path or async iterator)
            sink: Optional output destination
            chunk_size: Size of processing chunks
            on_progress: Optional progress callback
            input_format: Input file format ('auto', 'jsonl', 'json', 'csv', 'text')
            text_field_name: Field name for text lines when converting to dict
            csv_delimiter: CSV delimiter character
            csv_has_header: Whether CSV file has header row
            skip_empty_lines: Skip empty lines in text files
            use_streaming: Use memory-efficient streaming for large files

        Returns:
            Dict containing stream processing statistics
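
        Example:
            A minimal streaming sketch (the file names are illustrative):

            ```python
            stats = await fsm.process_stream(
                source='input.jsonl',
                sink='output.jsonl',
                chunk_size=500,
                use_streaming=True
            )
            print(stats['total_processed'], stats['throughput'])
            ```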
714 """
715 # Configure streaming
716 stream_config = CoreStreamConfig(
717 chunk_size=chunk_size,
718 parallelism=4,
719 memory_limit_mb=1024
720 )
722 # Create async stream executor
723 stream_executor = AsyncStreamExecutor(
724 fsm=self._fsm,
725 stream_config=stream_config,
726 progress_callback=on_progress
727 )
729 # Choose between streaming and regular mode
730 if use_streaming and isinstance(source, str):
731 # Use memory-efficient streaming for large files
732 from ..utils.streaming_file_utils import (
733 create_streaming_file_reader,
734 create_streaming_file_writer,
735 )
737 stream_source = create_streaming_file_reader(
738 file_path=source,
739 config=stream_config,
740 input_format=input_format,
741 text_field_name=text_field_name,
742 csv_delimiter=csv_delimiter,
743 csv_has_header=csv_has_header,
744 skip_empty_lines=skip_empty_lines
745 )
747 # Handle sink for streaming mode
748 sink_func = None
749 cleanup_func = None
750 if sink:
751 sink_func, cleanup_func = await create_streaming_file_writer(
752 file_path=sink,
753 config=stream_config
754 )
755 else:
756 # Use regular mode (loads full chunks into memory)
757 from ..utils.file_utils import create_file_reader, create_file_writer
759 # Handle file source
760 if isinstance(source, str):
761 stream_source = create_file_reader(
762 file_path=source,
763 input_format=input_format,
764 text_field_name=text_field_name,
765 csv_delimiter=csv_delimiter,
766 csv_has_header=csv_has_header,
767 skip_empty_lines=skip_empty_lines
768 )
769 else:
770 # Already an async iterator
771 stream_source = source
773 # Handle sink for regular mode
774 sink_func = None
775 cleanup_func = None
776 if sink:
777 sink_func, cleanup_func = create_file_writer(sink)
779 try:
780 # Execute stream using async executor
781 result = await stream_executor.execute_stream(
782 source=stream_source,
783 sink=sink_func,
784 chunk_size=chunk_size
785 )
787 return {
788 'total_processed': result.total_processed,
789 'successful': result.successful,
790 'failed': result.failed,
791 'duration': result.duration,
792 'throughput': result.throughput
793 }
794 finally:
795 # Clean up any resources (e.g., close files)
796 if cleanup_func:
797 if asyncio.iscoroutinefunction(cleanup_func):
798 await cleanup_func()
799 else:
800 cleanup_func()

    async def validate(self, data: dict[str, Any] | Record) -> dict[str, Any]:
        """Validate data against FSM's start state schema asynchronously.

        Args:
            data: Data to validate

        Returns:
            Dict containing validation results
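
        Example:
            A minimal validation sketch (the field name is illustrative):

            ```python
            check = await fsm.validate({'text': 'hello'})
            if not check['valid']:
                print(check['errors'])
            ```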
810 """
811 # Convert to Record if needed
812 if isinstance(data, dict):
813 record = Record(data)
814 else:
815 record = data
817 # Get start state
818 start_state = self._fsm.get_start_state()
820 # Validate against schema
821 if start_state.schema:
822 validation_result = start_state.schema.validate(record)
823 return {
824 'valid': validation_result.valid,
825 'errors': validation_result.errors if not validation_result.valid else []
826 }
827 else:
828 return {
829 'valid': True,
830 'errors': []
831 }

    def get_states(self) -> list[str]:
        """Get list of all state names in the FSM."""
        states = []
        # The FSM has networks, and each network has states
        for network in self._fsm.networks.values():
            for state in network.states.values():
                states.append(state.name)
        return states

    def get_resources(self) -> list[str]:
        """Get list of registered resource names."""
        return list(self._resource_manager._resources.keys())

    @property
    def config(self) -> Any:
        """Get the FSM configuration object."""
        return self._config

    async def close(self) -> None:
        """Clean up resources and close connections asynchronously."""
        await self._resource_manager.cleanup()

    # Alias for consistency with other async libraries
    aclose = close


# Factory function for AsyncSimpleFSM
async def create_async_fsm(
    config: str | Path | dict[str, Any],
    custom_functions: dict[str, Callable] | None = None,
    **kwargs
) -> AsyncSimpleFSM:
    """Factory function to create an AsyncSimpleFSM instance.

    Args:
        config: Configuration file path or dictionary
        custom_functions: Optional custom functions to register
        **kwargs: Additional arguments passed to AsyncSimpleFSM

    Returns:
        Configured AsyncSimpleFSM instance
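
    Example:
        A minimal creation sketch (the config path is illustrative):

        ```python
        fsm = await create_async_fsm('pipeline.yaml')
        try:
            result = await fsm.process({'text': 'hello'})
        finally:
            await fsm.close()
        ```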
874 """
875 return AsyncSimpleFSM(config, custom_functions=custom_functions, **kwargs)