Coverage for src/dataknobs_fsm/api/simple.py: 21%

125 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 14:11 -0700

1"""Simple synchronous API for FSM operations. 

2 

3This module provides a simplified synchronous interface for common FSM use cases, 

4making it easy to build data transformation pipelines, validation workflows, and 

5processing systems without dealing with async/await complexity. 

6 

7Architecture: 

8 The dataknobs-fsm package provides three API tiers: 

9 

10 1. **SimpleFSM** (This Module): 

11 - Synchronous interface (no async/await) 

12 - Automatic event loop management 

13 - Best for: Scripts, prototypes, simple pipelines 

14 - Trade-off: Thread overhead for event loop 

15 

16 2. **AsyncSimpleFSM** (async_simple.py): 

17 - Async/await interface 

18 - Production-ready for async contexts 

19 - Best for: Web services, concurrent processing, async applications 

20 - Trade-off: Requires async programming knowledge 

21 

22 3. **AdvancedFSM** (advanced.py): 

23 - Full manual control with debugging support 

24 - Step-by-step execution, breakpoints, profiling 

25 - Best for: Complex workflows, debugging, custom execution strategies 

26 - Trade-off: More complex API 

27 

28 **Choosing the Right API:** 

29 

30 Use SimpleFSM if: 

31 - You're building scripts or prototypes 

32 - You want the simplest possible API 

33 - You're working in synchronous code 

34 - You don't need async/await capabilities 

35 

36 Use AsyncSimpleFSM if: 

37 - You're building production services 

38 - Your application is already async 

39 - You need high concurrency 

40 - You want best performance 

41 

42 Use AdvancedFSM if: 

43 - You need debugging capabilities 

44 - You want step-by-step execution 

45 - You need profiling and tracing 

46 - You're building complex workflows with custom logic 

47 

48Data Handling Modes: 

49 FSMs can process data in three modes: 

50 

51 **COPY Mode** (default): 

52 - Deep copies data for each state 

53 - Safe for concurrent processing 

54 - Higher memory usage 

55 - Best for: Production systems, parallel processing 

56 

57 **REFERENCE Mode**: 

58 - Lazy loading with optimistic locking 

59 - Memory-efficient 

60 - Moderate performance 

61 - Best for: Large datasets, memory-constrained environments 

62 

63 **DIRECT Mode**: 

64 - In-place data modification 

65 - Fastest performance 

66 - Not thread-safe 

67 - Best for: Single-threaded pipelines, performance-critical paths 

68 

69Common Workflow Patterns: 

70 This module enables several common patterns: 

71 

72 **Data Transformation Pipeline:** 

73 ```python 

74 from dataknobs_fsm.api.simple import SimpleFSM 

75 

76 # Create FSM for data cleaning 

77 fsm = SimpleFSM('data_pipeline.yaml') 

78 

79 # Process single record 

80 result = fsm.process({ 

81 'text': 'Some input text', 

82 'metadata': {'source': 'user_input'} 

83 }) 

84 print(result['data']) # Transformed output 

85 ``` 

86 

87 **Batch Processing:** 

88 ```python 

89 # Process multiple records in parallel 

90 records = [ 

91 {'text': 'Record 1', 'id': 1}, 

92 {'text': 'Record 2', 'id': 2}, 

93 {'text': 'Record 3', 'id': 3} 

94 ] 

95 results = fsm.process_batch(records, batch_size=10, max_workers=4) 

96 ``` 

97 

98 **File Processing:** 

99 ```python 

100 from dataknobs_fsm.api.simple import process_file 

101 

102 # Process large file with streaming 

103 stats = process_file( 

104 fsm_config='validate.yaml', 

105 input_file='input.jsonl', 

106 output_file='output.jsonl', 

107 chunk_size=1000, 

108 use_streaming=True 

109 ) 

110 print(f"Processed {stats['total_processed']} records in {stats['duration']:.2f}s") 

111 print(f"Throughput: {stats['throughput']:.2f} records/sec") 

112 ``` 

113 

114 **Data Validation:** 

115 ```python 

116 from dataknobs_fsm.api.simple import validate_data 

117 

118 # Validate records against schema 

119 records = [ 

120 {'name': 'Alice', 'age': 30}, 

121 {'name': 'Bob', 'age': 'invalid'}, # Will fail validation 

122 ] 

123 validation_results = validate_data('schema.yaml', records) 

124 for i, result in enumerate(validation_results): 

125 if not result['valid']: 

126 print(f"Record {i} failed: {result['errors']}") 

127 ``` 

128 

129Example: 

130 Complete ETL pipeline using SimpleFSM: 

131 

132 ```python 

133 from dataknobs_fsm.api.simple import SimpleFSM 

134 from dataknobs_fsm.core.data_modes import DataHandlingMode 

135 

136 # Configuration defines states and transitions 

137 config = { 

138 'name': 'data_etl', 

139 'states': [ 

140 { 

141 'name': 'extract', 

142 'type': 'START', 

143 'function': 'extract_data' 

144 }, 

145 { 

146 'name': 'transform', 

147 'type': 'NORMAL', 

148 'function': 'clean_and_normalize' 

149 }, 

150 { 

151 'name': 'load', 

152 'type': 'END', 

153 'function': 'save_to_database' 

154 } 

155 ], 

156 'arcs': [ 

157 {'from': 'extract', 'to': 'transform'}, 

158 {'from': 'transform', 'to': 'load'} 

159 ] 

160 } 

161 

162 # Create FSM with custom functions 

163 def extract_data(data): 

164 # Extract logic 

165 return {'records': load_from_source(data['source'])} 

166 

167 def clean_and_normalize(data): 

168 # Transform logic 

169 records = [normalize(r) for r in data['records']] 

170 return {'records': records} 

171 

172 def save_to_database(data): 

173 # Load logic 

174 db.bulk_insert(data['records']) 

175 return {'status': 'success', 'count': len(data['records'])} 

176 

177 # Initialize FSM 

178 fsm = SimpleFSM( 

179 config=config, 

180 data_mode=DataHandlingMode.COPY, 

181 custom_functions={ 

182 'extract_data': extract_data, 

183 'clean_and_normalize': clean_and_normalize, 

184 'save_to_database': save_to_database 

185 } 

186 ) 

187 

188 # Process data 

189 result = fsm.process({'source': 'input.csv'}) 

190 print(f"ETL completed: {result['data']['status']}") 

191 print(f"Records processed: {result['data']['count']}") 

192 print(f"States traversed: {' -> '.join(result['path'])}") 

193 

194 # Clean up 

195 fsm.close() 

196 ``` 

197 

198See Also: 

199 - :class:`AsyncSimpleFSM`: Async version for production applications 

200 - :class:`AdvancedFSM`: Advanced API with debugging and profiling 

201 - :class:`DataHandlingMode`: Data processing mode options 

202 - :mod:`dataknobs_fsm.patterns.etl`: ETL workflow patterns 

203 - :mod:`dataknobs_fsm.patterns.file_processing`: File processing patterns 

204""" 

205 

206import asyncio 

207import threading 

208from collections.abc import Callable 

209from pathlib import Path 

210from typing import Any 

211 

212from dataknobs_data import Record 

213 

214from ..core.data_modes import DataHandlingMode 

215from .async_simple import AsyncSimpleFSM 

216 

217 

218class SimpleFSM: 

219 """Synchronous FSM interface for simple workflows. 

220 

221 This class provides a purely synchronous API for FSM operations, 

222 internally using AsyncSimpleFSM with a dedicated event loop managed 

223 automatically in a background thread. 

224 

225 SimpleFSM is designed for ease of use in scripts, prototypes, and simple 

226 pipelines where async/await complexity is not desired. It handles all 

227 async execution transparently, providing a familiar synchronous interface. 

228 

229 Attributes: 

230 data_mode (DataHandlingMode): Data processing mode (COPY/REFERENCE/DIRECT) 

231 _async_fsm (AsyncSimpleFSM): Internal async FSM implementation 

232 _fsm (FSM): Core FSM engine 

233 _resource_manager (ResourceManager): Resource lifecycle manager 

234 _loop (AbstractEventLoop): Dedicated event loop for async operations 

235 _loop_thread (Thread): Background thread running the event loop 

236 

237 Methods: 

238 process: Process a single record through the FSM 

239 process_batch: Process multiple records in parallel batches 

240 process_stream: Process a stream of data from file or iterator 

241 validate: Validate data against FSM's start state schema 

242 get_states: List all state names in the FSM 

243 get_resources: List all registered resource names 

244 close: Clean up resources and close connections 

245 

246 Use Cases: 

247 **Data Transformation:** 

248 Transform data through a pipeline of state functions. Each state receives 

249 the output of the previous state, enabling sequential transformations. 

250 

251 **Data Validation:** 

252 Validate data against schemas defined in state configurations. States can 

253 enforce data quality rules and reject invalid records. 

254 

255 **File Processing:** 

256 Process large files line-by-line or in chunks using `process_stream()`. 

257 Supports automatic format detection (JSON, JSONL, CSV, text). 

258 

259 **Batch Processing:** 

260 Process multiple records in parallel using `process_batch()`. Configurable 

261 batch size and worker count for optimal throughput. 

262 

263 **ETL Workflows:** 

264 Extract-Transform-Load pipelines where data flows through extraction, 

265 transformation, and loading states with error handling. 

266 

267 Note: 

268 **Thread Safety:** 

269 SimpleFSM manages its own event loop in a background thread. While the 

270 synchronous API is thread-safe, concurrent calls will serialize due to 

271 the single event loop. For true concurrent processing, use AsyncSimpleFSM 

272 with multiple event loops or process_batch() with max_workers > 1. 

273 

274 **Resource Management:** 

275 Always call close() when done to properly release resources. Use context 

276 managers (with statement) when available in client code, or ensure close() 

277 is called in a finally block. 

278 

279 **Data Mode Selection:** 

280 - Use COPY (default) for production: safe, predictable, memory-intensive 

281 - Use REFERENCE for large datasets: memory-efficient, moderate overhead 

282 - Use DIRECT for performance: fastest, but not thread-safe 

283 

284 **Error Handling:** 

285 The process() method returns a dict with 'success' and 'error' keys rather 

286 than raising exceptions. This allows for graceful error handling in batch 

287 processing scenarios. 

288 

289 Example: 

290 Basic usage with configuration file: 

291 

292 ```python 

293 from dataknobs_fsm.api.simple import SimpleFSM 

294 

295 # Create FSM from YAML config 

296 fsm = SimpleFSM('pipeline.yaml') 

297 

298 # Process single record 

299 result = fsm.process({ 

300 'text': 'Input text to process', 

301 'metadata': {'source': 'user'} 

302 }) 

303 

304 if result['success']: 

305 print(f"Result: {result['data']}") 

306 print(f"Path: {' -> '.join(result['path'])}") 

307 else: 

308 print(f"Error: {result['error']}") 

309 

310 # Clean up 

311 fsm.close() 

312 ``` 

313 

314 With custom functions and resources: 

315 

316 ```python 

317 from dataknobs_fsm.api.simple import SimpleFSM 

318 from dataknobs_fsm.core.data_modes import DataHandlingMode 

319 

320 # Define custom state functions 

321 def validate(data): 

322 if 'required_field' not in data: 

323 raise ValueError("Missing required field") 

324 return data 

325 

326 def transform(data): 

327 from datetime import datetime 

328 data['processed'] = True 

329 data['timestamp'] = datetime.now().isoformat() 

330 return data 

331 

332 # Create FSM with config dict 

333 config = { 

334 'name': 'validation_pipeline', 

335 'states': [ 

336 {'name': 'validate', 'type': 'START', 'function': 'validate'}, 

337 {'name': 'transform', 'type': 'END', 'function': 'transform'} 

338 ], 

339 'arcs': [ 

340 {'from': 'validate', 'to': 'transform'} 

341 ] 

342 } 

343 

344 # Initialize with custom functions and resources 

345 fsm = SimpleFSM( 

346 config=config, 

347 data_mode=DataHandlingMode.COPY, 

348 resources={ 

349 'database': { 

350 'type': 'DATABASE', 

351 'backend': 'memory' 

352 } 

353 }, 

354 custom_functions={ 

355 'validate': validate, 

356 'transform': transform 

357 } 

358 ) 

359 

360 # Process data 

361 result = fsm.process({'required_field': 'value'}) 

362 print(f"Success: {result['success']}") 

363 

364 fsm.close() 

365 ``` 

366 

367 Batch processing with progress callback: 

368 

369 ```python 

370 # Define progress callback 

371 def on_progress(current, total): 

372 pct = (current / total) * 100 

373 print(f"Progress: {current}/{total} ({pct:.1f}%)") 

374 

375 # Process batch 

376 records = [{'id': i, 'text': f'Record {i}'} for i in range(100)] 

377 results = fsm.process_batch( 

378 data=records, 

379 batch_size=10, 

380 max_workers=4, 

381 on_progress=on_progress 

382 ) 

383 

384 # Check results 

385 successful = sum(1 for r in results if r['success']) 

386 print(f"Processed {successful}/{len(records)} successfully") 

387 ``` 

388 

389 See Also: 

390 - :class:`AsyncSimpleFSM`: Async version for production applications 

391 - :class:`AdvancedFSM`: Full control with debugging capabilities 

392 - :class:`DataHandlingMode`: Data processing mode options 

393 - :func:`process_file`: Convenience function for file processing 

394 - :func:`batch_process`: Convenience function for batch processing 

395 - :func:`validate_data`: Convenience function for data validation 

396 """ 

397 

398 def __init__( 

399 self, 

400 config: str | Path | dict[str, Any], 

401 data_mode: DataHandlingMode = DataHandlingMode.COPY, 

402 resources: dict[str, Any] | None = None, 

403 custom_functions: dict[str, Callable] | None = None 

404 ): 

405 """Initialize SimpleFSM from configuration. 

406 

407 Args: 

408 config: FSM configuration. Can be: 

409 - Path to YAML/JSON config file (str or Path) 

410 - Dictionary containing config (inline configuration) 

411 Must define states, arcs, and optionally resources. 

412 data_mode: Data handling mode controlling how data is passed between 

413 states. Options: 

414 - DataHandlingMode.COPY (default): Deep copy for safety 

415 - DataHandlingMode.REFERENCE: Lazy loading with locking 

416 - DataHandlingMode.DIRECT: In-place modification (fastest) 

417 resources: Optional resource configurations. Dict mapping resource 

418 names to configuration dicts. Each config must have a 'type' key 

419 (DATABASE, FILESYSTEM, HTTP, etc.) and type-specific parameters. 

420 Example: {'db': {'type': 'DATABASE', 'backend': 'postgres', ...}} 

421 custom_functions: Optional custom state functions. Dict mapping function 

422 names to callables. Functions receive data dict and return data dict. 

423 Function names must match 'function' fields in state definitions. 

424 Example: {'my_func': lambda data: {'result': data['input'] * 2}} 

425 

426 Example: 

427 From configuration file: 

428 

429 ```python 

430 from dataknobs_fsm.api.simple import SimpleFSM 

431 

432 # Load from YAML file 

433 fsm = SimpleFSM('config.yaml') 

434 ``` 

435 

436 With inline configuration: 

437 

438 ```python 

439 config = { 

440 'name': 'simple_pipeline', 

441 'states': [ 

442 {'name': 'start', 'type': 'START'}, 

443 {'name': 'process', 'type': 'NORMAL', 'function': 'transform'}, 

444 {'name': 'end', 'type': 'END'} 

445 ], 

446 'arcs': [ 

447 {'from': 'start', 'to': 'process'}, 

448 {'from': 'process', 'to': 'end'} 

449 ] 

450 } 

451 

452 def transform(data): 

453 data['transformed'] = True 

454 return data 

455 

456 fsm = SimpleFSM( 

457 config=config, 

458 custom_functions={'transform': transform} 

459 ) 

460 ``` 

461 

462 With data mode selection: 

463 

464 ```python 

465 from dataknobs_fsm.core.data_modes import DataHandlingMode 

466 

467 # Use COPY for safe concurrent processing 

468 fsm_safe = SimpleFSM('config.yaml', data_mode=DataHandlingMode.COPY) 

469 

470 # Use REFERENCE for memory efficiency 

471 fsm_efficient = SimpleFSM('config.yaml', data_mode=DataHandlingMode.REFERENCE) 

472 

473 # Use DIRECT for maximum performance (single-threaded only) 

474 fsm_fast = SimpleFSM('config.yaml', data_mode=DataHandlingMode.DIRECT) 

475 ``` 

476 

477 With resources: 

478 

479 ```python 

480 resources = { 

481 'database': { 

482 'type': 'DATABASE', 

483 'backend': 'postgres', 

484 'host': 'localhost', 

485 'database': 'mydb', 

486 'user': 'admin', 

487 'password': 'secret' 

488 }, 

489 'http_client': { 

490 'type': 'HTTP', 

491 'base_url': 'https://api.example.com', 

492 'timeout': 30 

493 } 

494 } 

495 

496 fsm = SimpleFSM('config.yaml', resources=resources) 

497 ``` 

498 """ 

499 # Store data_mode for compatibility 

500 self.data_mode = data_mode 

501 

502 # Create the async FSM 

503 self._async_fsm = AsyncSimpleFSM( 

504 config=config, 

505 data_mode=data_mode, 

506 resources=resources, 

507 custom_functions=custom_functions 

508 ) 

509 

510 # Expose internal attributes for compatibility 

511 self._fsm = self._async_fsm._fsm 

512 self._resource_manager = self._async_fsm._resource_manager 

513 self._async_engine = self._async_fsm._async_engine 

514 

515 # Create synchronous engine for compatibility 

516 from ..execution.engine import ExecutionEngine 

517 self._engine = ExecutionEngine(self._fsm) 

518 

519 # Create a dedicated event loop for sync operations 

520 self._loop: asyncio.AbstractEventLoop | None = None 

521 self._loop_thread: threading.Thread | None = None 

522 self._setup_event_loop() 

523 

524 def _setup_event_loop(self) -> None: 

525 """Set up a dedicated event loop in a separate thread.""" 

526 self._loop = asyncio.new_event_loop() 

527 

528 def run_loop() -> None: 

529 asyncio.set_event_loop(self._loop) 

530 self._loop.run_forever() 

531 

532 self._loop_thread = threading.Thread(target=run_loop, daemon=True) 

533 self._loop_thread.start() 

534 

535 def _run_async(self, coro: Any) -> Any: 

536 """Run an async operation in the dedicated event loop. 

537 

538 Args: 

539 coro: Coroutine to run 

540 

541 Returns: 

542 The result of the coroutine 

543 """ 

544 if not self._loop or not self._loop.is_running(): 

545 self._setup_event_loop() 

546 

547 if self._loop is None: 

548 raise RuntimeError("Failed to setup event loop") 

549 

550 future = asyncio.run_coroutine_threadsafe(coro, self._loop) 

551 return future.result() 

552 

553 def process( 

554 self, 

555 data: dict[str, Any] | Record, 

556 initial_state: str | None = None, 

557 timeout: float | None = None 

558 ) -> dict[str, Any]: 

559 """Process a single record through the FSM synchronously. 

560 

561 Args: 

562 data: Input data to process 

563 initial_state: Optional starting state (defaults to FSM start state) 

564 timeout: Optional timeout in seconds 

565 

566 Returns: 

567 Dict containing the processed result with fields: 

568 - final_state: Name of the final state reached 

569 - data: The transformed data 

570 - path: List of states traversed 

571 - success: Whether processing succeeded 

572 - error: Any error message (None if successful) 

573 """ 

574 # Create the coroutine with the async process method 

575 async def _process(): 

576 # Import here to avoid circular dependency 

577 from ..core.context_factory import ContextFactory 

578 from ..core.modes import ProcessingMode 

579 from ..core.result_formatter import ResultFormatter 

580 

581 # Convert to Record if needed 

582 if isinstance(data, dict): 

583 from dataknobs_data import Record 

584 record = Record(data) 

585 else: 

586 record = data 

587 

588 # Create context 

589 context = ContextFactory.create_context( 

590 fsm=self._fsm, 

591 data=record, 

592 initial_state=initial_state, 

593 data_mode=ProcessingMode.SINGLE, 

594 resource_manager=self._resource_manager 

595 ) 

596 

597 try: 

598 # Execute FSM asynchronously 

599 success, result = await self._async_engine.execute(context) 

600 

601 # Format result 

602 return ResultFormatter.format_single_result( 

603 context=context, 

604 success=success, 

605 result=result 

606 ) 

607 except asyncio.TimeoutError: 

608 # Return error result instead of raising 

609 return ResultFormatter.format_error_result( 

610 context=context, 

611 error=TimeoutError(f"FSM execution exceeded timeout of {timeout} seconds") 

612 ) 

613 except Exception as e: 

614 return ResultFormatter.format_error_result( 

615 context=context, 

616 error=e 

617 ) 

618 

619 if timeout: 

620 # Use threading for timeout support 

621 import concurrent.futures 

622 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: 

623 future = executor.submit(self._run_async, _process()) 

624 try: 

625 return future.result(timeout=timeout) 

626 except concurrent.futures.TimeoutError: 

627 future.cancel() 

628 # Return an error result instead of raising 

629 return { 

630 'success': False, 

631 'error': f"FSM execution exceeded timeout of {timeout} seconds", 

632 'final_state': None, 

633 'data': data if isinstance(data, dict) else data.data, 

634 'path': [] 

635 } 

636 else: 

637 return self._run_async(_process()) 

638 

639 def process_batch( 

640 self, 

641 data: list[dict[str, Any] | Record], 

642 batch_size: int = 10, 

643 max_workers: int = 4, 

644 on_progress: Callable | None = None 

645 ) -> list[dict[str, Any]]: 

646 """Process multiple records in parallel batches synchronously. 

647 

648 Args: 

649 data: List of input records to process 

650 batch_size: Number of records per batch 

651 max_workers: Maximum parallel workers 

652 on_progress: Optional callback for progress updates 

653 

654 Returns: 

655 List of results for each input record 

656 """ 

657 return self._run_async( 

658 self._async_fsm.process_batch( 

659 data=data, 

660 batch_size=batch_size, 

661 max_workers=max_workers, 

662 on_progress=on_progress 

663 ) 

664 ) 

665 

666 def process_stream( 

667 self, 

668 source: str | Any, 

669 sink: str | None = None, 

670 chunk_size: int = 100, 

671 on_progress: Callable | None = None, 

672 input_format: str = 'auto', 

673 text_field_name: str = 'text', 

674 csv_delimiter: str = ',', 

675 csv_has_header: bool = True, 

676 skip_empty_lines: bool = True, 

677 use_streaming: bool = False 

678 ) -> dict[str, Any]: 

679 """Process a stream of data through the FSM synchronously. 

680 

681 Args: 

682 source: Data source file path or async iterator 

683 sink: Optional output destination 

684 chunk_size: Size of processing chunks 

685 on_progress: Optional progress callback 

686 input_format: Input file format ('auto', 'jsonl', 'json', 'csv', 'text') 

687 text_field_name: Field name for text lines when converting to dict 

688 csv_delimiter: CSV delimiter character 

689 csv_has_header: Whether CSV file has header row 

690 skip_empty_lines: Skip empty lines in text files 

691 use_streaming: Use memory-efficient streaming for large files 

692 

693 Returns: 

694 Dict containing stream processing statistics 

695 """ 

696 # If source is a string (file path), use the async version directly 

697 if isinstance(source, str): 

698 return self._run_async( 

699 self._async_fsm.process_stream( 

700 source=source, 

701 sink=sink, 

702 chunk_size=chunk_size, 

703 on_progress=on_progress, 

704 input_format=input_format, 

705 text_field_name=text_field_name, 

706 csv_delimiter=csv_delimiter, 

707 csv_has_header=csv_has_header, 

708 skip_empty_lines=skip_empty_lines, 

709 use_streaming=use_streaming 

710 ) 

711 ) 

712 else: 

713 # Source is an async iterator, need to handle it properly 

714 async def _process(): 

715 return await self._async_fsm.process_stream( 

716 source=source, 

717 sink=sink, 

718 chunk_size=chunk_size, 

719 on_progress=on_progress, 

720 input_format=input_format, 

721 text_field_name=text_field_name, 

722 csv_delimiter=csv_delimiter, 

723 csv_has_header=csv_has_header, 

724 skip_empty_lines=skip_empty_lines, 

725 use_streaming=use_streaming 

726 ) 

727 return self._run_async(_process()) 

728 

729 def validate(self, data: dict[str, Any] | Record) -> dict[str, Any]: 

730 """Validate data against FSM's start state schema synchronously. 

731 

732 Args: 

733 data: Data to validate 

734 

735 Returns: 

736 Dict containing validation results 

737 """ 

738 return self._run_async(self._async_fsm.validate(data)) 

739 

740 def get_states(self) -> list[str]: 

741 """Get list of all state names in the FSM.""" 

742 return self._async_fsm.get_states() 

743 

744 def get_resources(self) -> list[str]: 

745 """Get list of registered resource names.""" 

746 return self._async_fsm.get_resources() 

747 

748 @property 

749 def config(self) -> Any: 

750 """Get the FSM configuration object.""" 

751 return self._async_fsm._config 

752 

753 def close(self) -> None: 

754 """Clean up resources and close connections synchronously.""" 

755 self._run_async(self._async_fsm.close()) 

756 

757 # Shut down the event loop 

758 if self._loop and self._loop.is_running(): 

759 self._loop.call_soon_threadsafe(self._loop.stop) 

760 if self._loop_thread and self._loop_thread.is_alive(): 

761 self._loop_thread.join(timeout=1.0) 

762 

763 async def aclose(self) -> None: 

764 """Async version of close for use in async contexts.""" 

765 await self._async_fsm.close() 

766 

767 

def create_fsm(
    config: str | Path | dict[str, Any],
    custom_functions: dict[str, Callable] | None = None,
    **kwargs
) -> SimpleFSM:
    """Factory function building a SimpleFSM instance.

    Args:
        config: Configuration file path or dictionary
        custom_functions: Optional custom functions to register
        **kwargs: Additional keyword arguments forwarded to SimpleFSM

    Returns:
        Configured SimpleFSM instance
    """
    fsm = SimpleFSM(config, custom_functions=custom_functions, **kwargs)
    return fsm

784 

785 

786# Convenience functions for common operations 

787 

def process_file(
    fsm_config: str | Path | dict[str, Any],
    input_file: str,
    output_file: str | None = None,
    input_format: str = 'auto',
    chunk_size: int = 1000,
    timeout: float | None = None,
    text_field_name: str = 'text',
    csv_delimiter: str = ',',
    csv_has_header: bool = True,
    skip_empty_lines: bool = True,
    use_streaming: bool = False
) -> dict[str, Any]:
    """Process a file through an FSM with automatic format detection.

    Args:
        fsm_config: FSM configuration
        input_file: Path to input file
        output_file: Optional output file path (format auto-detected from extension)
        input_format: Input format ('auto', 'jsonl', 'json', 'csv', 'text')
        chunk_size: Processing chunk size
        timeout: Optional timeout in seconds for processing
        text_field_name: Field name for text lines when converting to dict
        csv_delimiter: CSV delimiter character
        csv_has_header: Whether CSV file has header row
        skip_empty_lines: Skip empty lines in text files
        use_streaming: Use memory-efficient streaming for large files

    Returns:
        Processing statistics

    Raises:
        TimeoutError: If ``timeout`` is given and processing exceeds it.

    Examples:
        # Process plain text file
        results = process_file('config.yaml', 'input.txt', 'output.jsonl')

        # Process large CSV file with streaming
        results = process_file('config.yaml', 'large_data.csv', 'results.json', use_streaming=True)

        # Process with custom text field name
        results = process_file('config.yaml', 'input.txt', text_field_name='content')
    """
    fsm = create_fsm(fsm_config)

    # Single source of truth for the stream arguments (previously the
    # full keyword list was duplicated in both branches).
    stream_kwargs = dict(
        source=input_file,
        sink=output_file,
        chunk_size=chunk_size,
        input_format=input_format,
        text_field_name=text_field_name,
        csv_delimiter=csv_delimiter,
        csv_has_header=csv_has_header,
        skip_empty_lines=skip_empty_lines,
        use_streaming=use_streaming,
    )

    try:
        if not timeout:
            return fsm.process_stream(**stream_kwargs)

        import concurrent.futures

        # BUG FIX: using ThreadPoolExecutor as a context manager meant
        # shutdown(wait=True) ran on exit, blocking until the (possibly
        # stuck) worker finished -- the TimeoutError was only raised after
        # the work completed anyway.  Shut down without waiting instead.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(fsm.process_stream, **stream_kwargs)
            try:
                return future.result(timeout=timeout)
            except concurrent.futures.TimeoutError as e:
                future.cancel()
                raise TimeoutError(f"File processing exceeded timeout of {timeout} seconds") from e
        finally:
            executor.shutdown(wait=False)
    finally:
        fsm.close()

869 

870 

def validate_data(
    fsm_config: str | Path | dict[str, Any],
    data: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Validate multiple data records against an FSM schema.

    Args:
        fsm_config: FSM configuration
        data: List of data records to validate

    Returns:
        List of validation results, one per input record
    """
    fsm = create_fsm(fsm_config)
    try:
        # Validate each record in order, preserving input ordering.
        return [fsm.validate(record) for record in data]
    finally:
        fsm.close()

893 

894 

def batch_process(
    fsm_config: str | Path | dict[str, Any],
    data: list[dict[str, Any] | Record],
    batch_size: int = 10,
    max_workers: int = 4,
    timeout: float | None = None
) -> list[dict[str, Any]]:
    """Process multiple records in parallel.

    Args:
        fsm_config: FSM configuration
        data: List of input records
        batch_size: Batch size for processing
        max_workers: Maximum parallel workers
        timeout: Optional timeout in seconds for entire batch processing

    Returns:
        List of processing results

    Raises:
        TimeoutError: If ``timeout`` is given and batch processing exceeds it.
    """
    fsm = create_fsm(fsm_config)

    try:
        if not timeout:
            return fsm.process_batch(
                data=data,
                batch_size=batch_size,
                max_workers=max_workers
            )

        import concurrent.futures

        # BUG FIX: the context-manager form of ThreadPoolExecutor calls
        # shutdown(wait=True) on exit, which blocked until the worker
        # finished even after the timeout fired -- so the TimeoutError
        # was delayed until the batch completed anyway.  Shut down
        # without waiting so the timeout propagates promptly.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(
                fsm.process_batch,
                data=data,
                batch_size=batch_size,
                max_workers=max_workers
            )
            try:
                return future.result(timeout=timeout)
            except concurrent.futures.TimeoutError as e:
                future.cancel()
                raise TimeoutError(f"Batch processing exceeded timeout of {timeout} seconds") from e
        finally:
            executor.shutdown(wait=False)
    finally:
        fsm.close()