1"""Simple synchronous API for FSM operations.
3This module provides a simplified synchronous interface for common FSM use cases,
4making it easy to build data transformation pipelines, validation workflows, and
5processing systems without dealing with async/await complexity.
7Architecture:
8 The dataknobs-fsm package provides three API tiers:
10 1. **SimpleFSM** (This Module):
11 - Synchronous interface (no async/await)
12 - Automatic event loop management
13 - Best for: Scripts, prototypes, simple pipelines
14 - Trade-off: Thread overhead for event loop
16 2. **AsyncSimpleFSM** (async_simple.py):
17 - Async/await interface
18 - Production-ready for async contexts
19 - Best for: Web services, concurrent processing, async applications
20 - Trade-off: Requires async programming knowledge
22 3. **AdvancedFSM** (advanced.py):
23 - Full manual control with debugging support
24 - Step-by-step execution, breakpoints, profiling
25 - Best for: Complex workflows, debugging, custom execution strategies
26 - Trade-off: More complex API
28 **Choosing the Right API:**
30 Use SimpleFSM if:
31 - You're building scripts or prototypes
32 - You want the simplest possible API
33 - You're working in synchronous code
34 - You don't need async/await capabilities
36 Use AsyncSimpleFSM if:
37 - You're building production services
38 - Your application is already async
39 - You need high concurrency
40 - You want best performance
42 Use AdvancedFSM if:
43 - You need debugging capabilities
44 - You want step-by-step execution
45 - You need profiling and tracing
46 - You're building complex workflows with custom logic
48Data Handling Modes:
49 FSMs can process data in three modes:
51 **COPY Mode** (default):
52 - Deep copies data for each state
53 - Safe for concurrent processing
54 - Higher memory usage
55 - Best for: Production systems, parallel processing
57 **REFERENCE Mode**:
58 - Lazy loading with optimistic locking
59 - Memory-efficient
60 - Moderate performance
61 - Best for: Large datasets, memory-constrained environments
63 **DIRECT Mode**:
64 - In-place data modification
65 - Fastest performance
66 - Not thread-safe
67 - Best for: Single-threaded pipelines, performance-critical paths
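
    A minimal sketch of selecting a mode (the 'pipeline.yaml' path is a
    placeholder for your own configuration):

    ```python
    from dataknobs_fsm.api.simple import SimpleFSM
    from dataknobs_fsm.core.data_modes import DataHandlingMode

    # REFERENCE trades some speed for lower memory use; COPY is the default.
    fsm = SimpleFSM('pipeline.yaml', data_mode=DataHandlingMode.REFERENCE)
    ```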

Common Workflow Patterns:
    This module enables several common patterns:

    **Data Transformation Pipeline:**
    ```python
    from dataknobs_fsm.api.simple import SimpleFSM

    # Create FSM for data cleaning
    fsm = SimpleFSM('data_pipeline.yaml')

    # Process single record
    result = fsm.process({
        'text': 'Some input text',
        'metadata': {'source': 'user_input'}
    })
    print(result['data'])  # Transformed output
    ```

    **Batch Processing:**
    ```python
    # Process multiple records in parallel
    records = [
        {'text': 'Record 1', 'id': 1},
        {'text': 'Record 2', 'id': 2},
        {'text': 'Record 3', 'id': 3}
    ]
    results = fsm.process_batch(records, batch_size=10, max_workers=4)
    ```

    **File Processing:**
    ```python
    from dataknobs_fsm.api.simple import process_file

    # Process large file with streaming
    stats = process_file(
        fsm_config='validate.yaml',
        input_file='input.jsonl',
        output_file='output.jsonl',
        chunk_size=1000,
        use_streaming=True
    )
    print(f"Processed {stats['total_processed']} records in {stats['duration']:.2f}s")
    print(f"Throughput: {stats['throughput']:.2f} records/sec")
    ```

    **Data Validation:**
    ```python
    from dataknobs_fsm.api.simple import validate_data

    # Validate records against schema
    records = [
        {'name': 'Alice', 'age': 30},
        {'name': 'Bob', 'age': 'invalid'},  # Will fail validation
    ]
    validation_results = validate_data('schema.yaml', records)
    for i, result in enumerate(validation_results):
        if not result['valid']:
            print(f"Record {i} failed: {result['errors']}")
    ```

Example:
    Complete ETL pipeline using SimpleFSM:

    ```python
    from dataknobs_fsm.api.simple import SimpleFSM
    from dataknobs_fsm.core.data_modes import DataHandlingMode

    # Configuration defines states and transitions
    config = {
        'name': 'data_etl',
        'states': [
            {
                'name': 'extract',
                'type': 'START',
                'function': 'extract_data'
            },
            {
                'name': 'transform',
                'type': 'NORMAL',
                'function': 'clean_and_normalize'
            },
            {
                'name': 'load',
                'type': 'END',
                'function': 'save_to_database'
            }
        ],
        'arcs': [
            {'from': 'extract', 'to': 'transform'},
            {'from': 'transform', 'to': 'load'}
        ]
    }

    # Define custom state functions
    def extract_data(data):
        # Extract logic
        return {'records': load_from_source(data['source'])}

    def clean_and_normalize(data):
        # Transform logic
        records = [normalize(r) for r in data['records']]
        return {'records': records}

    def save_to_database(data):
        # Load logic
        db.bulk_insert(data['records'])
        return {'status': 'success', 'count': len(data['records'])}

    # Initialize FSM
    fsm = SimpleFSM(
        config=config,
        data_mode=DataHandlingMode.COPY,
        custom_functions={
            'extract_data': extract_data,
            'clean_and_normalize': clean_and_normalize,
            'save_to_database': save_to_database
        }
    )

    # Process data
    result = fsm.process({'source': 'input.csv'})
    print(f"ETL completed: {result['data']['status']}")
    print(f"Records processed: {result['data']['count']}")
    print(f"States traversed: {' -> '.join(result['path'])}")

    # Clean up
    fsm.close()
    ```

See Also:
    - :class:`AsyncSimpleFSM`: Async version for production applications
    - :class:`AdvancedFSM`: Advanced API with debugging and profiling
    - :class:`DataHandlingMode`: Data processing mode options
    - :mod:`dataknobs_fsm.patterns.etl`: ETL workflow patterns
    - :mod:`dataknobs_fsm.patterns.file_processing`: File processing patterns
"""

import asyncio
import threading
from collections.abc import Callable
from pathlib import Path
from typing import Any

from dataknobs_data import Record

from ..core.data_modes import DataHandlingMode
from .async_simple import AsyncSimpleFSM


class SimpleFSM:
    """Synchronous FSM interface for simple workflows.

    This class provides a purely synchronous API for FSM operations,
    internally using AsyncSimpleFSM with a dedicated event loop managed
    automatically in a background thread.

    SimpleFSM is designed for ease of use in scripts, prototypes, and simple
    pipelines where async/await complexity is not desired. It handles all
    async execution transparently, providing a familiar synchronous interface.

    Attributes:
        data_mode (DataHandlingMode): Data processing mode (COPY/REFERENCE/DIRECT)
        _async_fsm (AsyncSimpleFSM): Internal async FSM implementation
        _fsm (FSM): Core FSM engine
        _resource_manager (ResourceManager): Resource lifecycle manager
        _loop (AbstractEventLoop): Dedicated event loop for async operations
        _loop_thread (Thread): Background thread running the event loop

    Methods:
        process: Process a single record through the FSM
        process_batch: Process multiple records in parallel batches
        process_stream: Process a stream of data from file or iterator
        validate: Validate data against FSM's start state schema
        get_states: List all state names in the FSM
        get_resources: List all registered resource names
        close: Clean up resources and close connections

    Use Cases:
        **Data Transformation:**
        Transform data through a pipeline of state functions. Each state receives
        the output of the previous state, enabling sequential transformations.

        **Data Validation:**
        Validate data against schemas defined in state configurations. States can
        enforce data quality rules and reject invalid records.

        **File Processing:**
        Process large files line-by-line or in chunks using `process_stream()`.
        Supports automatic format detection (JSON, JSONL, CSV, text).

        **Batch Processing:**
        Process multiple records in parallel using `process_batch()`. Configurable
        batch size and worker count for optimal throughput.

        **ETL Workflows:**
        Extract-Transform-Load pipelines where data flows through extraction,
        transformation, and loading states with error handling.

    Note:
        **Thread Safety:**
        SimpleFSM manages its own event loop in a background thread. While the
        synchronous API is thread-safe, concurrent calls will serialize due to
        the single event loop. For true concurrent processing, use AsyncSimpleFSM
        with multiple event loops or process_batch() with max_workers > 1.

        **Resource Management:**
        Always call close() when done to properly release resources. Use context
        managers (with statement) when available in client code, or ensure close()
        is called in a finally block.

        **Data Mode Selection:**
        - Use COPY (default) for production: safe, predictable, memory-intensive
        - Use REFERENCE for large datasets: memory-efficient, moderate overhead
        - Use DIRECT for performance: fastest, but not thread-safe

        **Error Handling:**
        The process() method returns a dict with 'success' and 'error' keys rather
        than raising exceptions. This allows for graceful error handling in batch
        processing scenarios.

    Example:
        Basic usage with configuration file:

        ```python
        from dataknobs_fsm.api.simple import SimpleFSM

        # Create FSM from YAML config
        fsm = SimpleFSM('pipeline.yaml')

        # Process single record
        result = fsm.process({
            'text': 'Input text to process',
            'metadata': {'source': 'user'}
        })

        if result['success']:
            print(f"Result: {result['data']}")
            print(f"Path: {' -> '.join(result['path'])}")
        else:
            print(f"Error: {result['error']}")

        # Clean up
        fsm.close()
        ```

        With custom functions and resources:

        ```python
        from dataknobs_fsm.api.simple import SimpleFSM
        from dataknobs_fsm.core.data_modes import DataHandlingMode

        # Define custom state functions
        def validate(data):
            if 'required_field' not in data:
                raise ValueError("Missing required field")
            return data

        def transform(data):
            from datetime import datetime
            data['processed'] = True
            data['timestamp'] = datetime.now().isoformat()
            return data

        # Create FSM with config dict
        config = {
            'name': 'validation_pipeline',
            'states': [
                {'name': 'validate', 'type': 'START', 'function': 'validate'},
                {'name': 'transform', 'type': 'END', 'function': 'transform'}
            ],
            'arcs': [
                {'from': 'validate', 'to': 'transform'}
            ]
        }

        # Initialize with custom functions and resources
        fsm = SimpleFSM(
            config=config,
            data_mode=DataHandlingMode.COPY,
            resources={
                'database': {
                    'type': 'DATABASE',
                    'backend': 'memory'
                }
            },
            custom_functions={
                'validate': validate,
                'transform': transform
            }
        )

        # Process data
        result = fsm.process({'required_field': 'value'})
        print(f"Success: {result['success']}")

        fsm.close()
        ```

        Batch processing with progress callback:

        ```python
        # Define progress callback
        def on_progress(current, total):
            pct = (current / total) * 100
            print(f"Progress: {current}/{total} ({pct:.1f}%)")

        # Process batch
        records = [{'id': i, 'text': f'Record {i}'} for i in range(100)]
        results = fsm.process_batch(
            data=records,
            batch_size=10,
            max_workers=4,
            on_progress=on_progress
        )

        # Check results
        successful = sum(1 for r in results if r['success'])
        print(f"Processed {successful}/{len(records)} successfully")
        ```

    See Also:
        - :class:`AsyncSimpleFSM`: Async version for production applications
        - :class:`AdvancedFSM`: Full control with debugging capabilities
        - :class:`DataHandlingMode`: Data processing mode options
        - :func:`process_file`: Convenience function for file processing
        - :func:`batch_process`: Convenience function for batch processing
        - :func:`validate_data`: Convenience function for data validation
    """

    def __init__(
        self,
        config: str | Path | dict[str, Any],
        data_mode: DataHandlingMode = DataHandlingMode.COPY,
        resources: dict[str, Any] | None = None,
        custom_functions: dict[str, Callable] | None = None
    ):
        """Initialize SimpleFSM from configuration.

        Args:
            config: FSM configuration. Can be:
                - Path to a YAML/JSON config file (str or Path)
                - Dictionary containing the config (inline configuration)
                Must define states, arcs, and optionally resources.
            data_mode: Data handling mode controlling how data is passed between
                states. Options:
                - DataHandlingMode.COPY (default): Deep copy for safety
                - DataHandlingMode.REFERENCE: Lazy loading with locking
                - DataHandlingMode.DIRECT: In-place modification (fastest)
            resources: Optional resource configurations. Dict mapping resource
                names to configuration dicts. Each config must have a 'type' key
                (DATABASE, FILESYSTEM, HTTP, etc.) and type-specific parameters.
                Example: {'db': {'type': 'DATABASE', 'backend': 'postgres', ...}}
            custom_functions: Optional custom state functions. Dict mapping function
                names to callables. Functions receive a data dict and return a data
                dict. Function names must match 'function' fields in state definitions.
                Example: {'my_func': lambda data: {'result': data['input'] * 2}}

        Example:
            From a configuration file:

            ```python
            from dataknobs_fsm.api.simple import SimpleFSM

            # Load from YAML file
            fsm = SimpleFSM('config.yaml')
            ```

            With inline configuration:

            ```python
            config = {
                'name': 'simple_pipeline',
                'states': [
                    {'name': 'start', 'type': 'START'},
                    {'name': 'process', 'type': 'NORMAL', 'function': 'transform'},
                    {'name': 'end', 'type': 'END'}
                ],
                'arcs': [
                    {'from': 'start', 'to': 'process'},
                    {'from': 'process', 'to': 'end'}
                ]
            }

            def transform(data):
                data['transformed'] = True
                return data

            fsm = SimpleFSM(
                config=config,
                custom_functions={'transform': transform}
            )
            ```

            With data mode selection:

            ```python
            from dataknobs_fsm.core.data_modes import DataHandlingMode

            # Use COPY for safe concurrent processing
            fsm_safe = SimpleFSM('config.yaml', data_mode=DataHandlingMode.COPY)

            # Use REFERENCE for memory efficiency
            fsm_efficient = SimpleFSM('config.yaml', data_mode=DataHandlingMode.REFERENCE)

            # Use DIRECT for maximum performance (single-threaded only)
            fsm_fast = SimpleFSM('config.yaml', data_mode=DataHandlingMode.DIRECT)
            ```

            With resources:

            ```python
            resources = {
                'database': {
                    'type': 'DATABASE',
                    'backend': 'postgres',
                    'host': 'localhost',
                    'database': 'mydb',
                    'user': 'admin',
                    'password': 'secret'
                },
                'http_client': {
                    'type': 'HTTP',
                    'base_url': 'https://api.example.com',
                    'timeout': 30
                }
            }

            fsm = SimpleFSM('config.yaml', resources=resources)
            ```
        """
        # Store data_mode for compatibility
        self.data_mode = data_mode

        # Create the async FSM
        self._async_fsm = AsyncSimpleFSM(
            config=config,
            data_mode=data_mode,
            resources=resources,
            custom_functions=custom_functions
        )

        # Expose internal attributes for compatibility
        self._fsm = self._async_fsm._fsm
        self._resource_manager = self._async_fsm._resource_manager
        self._async_engine = self._async_fsm._async_engine

        # Create synchronous engine for compatibility
        from ..execution.engine import ExecutionEngine
        self._engine = ExecutionEngine(self._fsm)

        # Create a dedicated event loop for sync operations
        self._loop: asyncio.AbstractEventLoop | None = None
        self._loop_thread: threading.Thread | None = None
        self._setup_event_loop()

    def _setup_event_loop(self) -> None:
        """Set up a dedicated event loop in a separate thread."""
        self._loop = asyncio.new_event_loop()

        def run_loop() -> None:
            asyncio.set_event_loop(self._loop)
            self._loop.run_forever()

        self._loop_thread = threading.Thread(target=run_loop, daemon=True)
        self._loop_thread.start()

    def _run_async(self, coro: Any) -> Any:
        """Run an async operation in the dedicated event loop.

        Args:
            coro: Coroutine to run

        Returns:
            The result of the coroutine
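
        Example:
            Internal pattern, shown for illustration only:

            ```python
            # Schedules the coroutine on the background loop's thread and
            # blocks the caller until it completes.
            result = fsm._run_async(fsm._async_fsm.validate({'field': 'value'}))
            ```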
543 """
544 if not self._loop or not self._loop.is_running():
545 self._setup_event_loop()
547 if self._loop is None:
548 raise RuntimeError("Failed to setup event loop")
550 future = asyncio.run_coroutine_threadsafe(coro, self._loop)
551 return future.result()

    def process(
        self,
        data: dict[str, Any] | Record,
        initial_state: str | None = None,
        timeout: float | None = None
    ) -> dict[str, Any]:
        """Process a single record through the FSM synchronously.

        Args:
            data: Input data to process
            initial_state: Optional starting state (defaults to FSM start state)
            timeout: Optional timeout in seconds

        Returns:
            Dict containing the processed result with fields:
            - final_state: Name of the final state reached
            - data: The transformed data
            - path: List of states traversed
            - success: Whether processing succeeded
            - error: Any error message (None if successful)
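
        Example:
            A minimal sketch (the input keys are illustrative):

            ```python
            result = fsm.process({'text': 'hello'}, timeout=30.0)
            if result['success']:
                print(result['data'], result['path'])
            else:
                print(result['error'])
            ```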
573 """
574 # Create the coroutine with the async process method
575 async def _process():
576 # Import here to avoid circular dependency
577 from ..core.context_factory import ContextFactory
578 from ..core.modes import ProcessingMode
579 from ..core.result_formatter import ResultFormatter
581 # Convert to Record if needed
582 if isinstance(data, dict):
583 from dataknobs_data import Record
584 record = Record(data)
585 else:
586 record = data
588 # Create context
589 context = ContextFactory.create_context(
590 fsm=self._fsm,
591 data=record,
592 initial_state=initial_state,
593 data_mode=ProcessingMode.SINGLE,
594 resource_manager=self._resource_manager
595 )
597 try:
598 # Execute FSM asynchronously
599 success, result = await self._async_engine.execute(context)
601 # Format result
602 return ResultFormatter.format_single_result(
603 context=context,
604 success=success,
605 result=result
606 )
607 except asyncio.TimeoutError:
608 # Return error result instead of raising
609 return ResultFormatter.format_error_result(
610 context=context,
611 error=TimeoutError(f"FSM execution exceeded timeout of {timeout} seconds")
612 )
613 except Exception as e:
614 return ResultFormatter.format_error_result(
615 context=context,
616 error=e
617 )
619 if timeout:
620 # Use threading for timeout support
621 import concurrent.futures
622 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
623 future = executor.submit(self._run_async, _process())
624 try:
625 return future.result(timeout=timeout)
626 except concurrent.futures.TimeoutError:
627 future.cancel()
628 # Return an error result instead of raising
629 return {
630 'success': False,
631 'error': f"FSM execution exceeded timeout of {timeout} seconds",
632 'final_state': None,
633 'data': data if isinstance(data, dict) else data.data,
634 'path': []
635 }
636 else:
637 return self._run_async(_process())

    def process_batch(
        self,
        data: list[dict[str, Any] | Record],
        batch_size: int = 10,
        max_workers: int = 4,
        on_progress: Callable | None = None
    ) -> list[dict[str, Any]]:
        """Process multiple records in parallel batches synchronously.

        Args:
            data: List of input records to process
            batch_size: Number of records per batch
            max_workers: Maximum parallel workers
            on_progress: Optional callback for progress updates

        Returns:
            List of results for each input record
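
        Example:
            A minimal sketch (the record shape is illustrative):

            ```python
            records = [{'id': i} for i in range(50)]
            results = fsm.process_batch(records, batch_size=10, max_workers=4)
            failed = [r for r in results if not r['success']]
            ```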
656 """
657 return self._run_async(
658 self._async_fsm.process_batch(
659 data=data,
660 batch_size=batch_size,
661 max_workers=max_workers,
662 on_progress=on_progress
663 )
664 )

    def process_stream(
        self,
        source: str | Any,
        sink: str | None = None,
        chunk_size: int = 100,
        on_progress: Callable | None = None,
        input_format: str = 'auto',
        text_field_name: str = 'text',
        csv_delimiter: str = ',',
        csv_has_header: bool = True,
        skip_empty_lines: bool = True,
        use_streaming: bool = False
    ) -> dict[str, Any]:
        """Process a stream of data through the FSM synchronously.

        Args:
            source: Data source file path or async iterator
            sink: Optional output destination
            chunk_size: Size of processing chunks
            on_progress: Optional progress callback
            input_format: Input file format ('auto', 'jsonl', 'json', 'csv', 'text')
            text_field_name: Field name for text lines when converting to dict
            csv_delimiter: CSV delimiter character
            csv_has_header: Whether CSV file has header row
            skip_empty_lines: Skip empty lines in text files
            use_streaming: Use memory-efficient streaming for large files

        Returns:
            Dict containing stream processing statistics
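
        Example:
            A minimal sketch (the file names are placeholders):

            ```python
            stats = fsm.process_stream(
                source='input.jsonl',
                sink='output.jsonl',
                chunk_size=500,
                use_streaming=True
            )
            ```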
695 """
696 # If source is a string (file path), use the async version directly
697 if isinstance(source, str):
698 return self._run_async(
699 self._async_fsm.process_stream(
700 source=source,
701 sink=sink,
702 chunk_size=chunk_size,
703 on_progress=on_progress,
704 input_format=input_format,
705 text_field_name=text_field_name,
706 csv_delimiter=csv_delimiter,
707 csv_has_header=csv_has_header,
708 skip_empty_lines=skip_empty_lines,
709 use_streaming=use_streaming
710 )
711 )
712 else:
713 # Source is an async iterator, need to handle it properly
714 async def _process():
715 return await self._async_fsm.process_stream(
716 source=source,
717 sink=sink,
718 chunk_size=chunk_size,
719 on_progress=on_progress,
720 input_format=input_format,
721 text_field_name=text_field_name,
722 csv_delimiter=csv_delimiter,
723 csv_has_header=csv_has_header,
724 skip_empty_lines=skip_empty_lines,
725 use_streaming=use_streaming
726 )
727 return self._run_async(_process())

    def validate(self, data: dict[str, Any] | Record) -> dict[str, Any]:
        """Validate data against FSM's start state schema synchronously.

        Args:
            data: Data to validate

        Returns:
            Dict containing validation results (e.g. 'valid' and 'errors'
            fields, as used in the module-level validate_data example)
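
        Example:
            A minimal sketch:

            ```python
            result = fsm.validate({'name': 'Alice', 'age': 30})
            if not result['valid']:
                print(result['errors'])
            ```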
737 """
738 return self._run_async(self._async_fsm.validate(data))

    def get_states(self) -> list[str]:
        """Get list of all state names in the FSM."""
        return self._async_fsm.get_states()

    def get_resources(self) -> list[str]:
        """Get list of registered resource names."""
        return self._async_fsm.get_resources()

    @property
    def config(self) -> Any:
        """Get the FSM configuration object."""
        return self._async_fsm._config

    def close(self) -> None:
        """Clean up resources and close connections synchronously."""
        self._run_async(self._async_fsm.close())

        # Shut down the event loop
        if self._loop and self._loop.is_running():
            self._loop.call_soon_threadsafe(self._loop.stop)
        if self._loop_thread and self._loop_thread.is_alive():
            self._loop_thread.join(timeout=1.0)

    async def aclose(self) -> None:
        """Async version of close for use in async contexts."""
        await self._async_fsm.close()


def create_fsm(
    config: str | Path | dict[str, Any],
    custom_functions: dict[str, Callable] | None = None,
    **kwargs
) -> SimpleFSM:
    """Factory function to create a SimpleFSM instance.

    Args:
        config: Configuration file path or dictionary
        custom_functions: Optional custom functions to register
        **kwargs: Additional arguments passed to SimpleFSM

    Returns:
        Configured SimpleFSM instance
    """
    return SimpleFSM(config, custom_functions=custom_functions, **kwargs)


# Convenience functions for common operations

def process_file(
    fsm_config: str | Path | dict[str, Any],
    input_file: str,
    output_file: str | None = None,
    input_format: str = 'auto',
    chunk_size: int = 1000,
    timeout: float | None = None,
    text_field_name: str = 'text',
    csv_delimiter: str = ',',
    csv_has_header: bool = True,
    skip_empty_lines: bool = True,
    use_streaming: bool = False
) -> dict[str, Any]:
    """Process a file through an FSM with automatic format detection.

    Args:
        fsm_config: FSM configuration
        input_file: Path to input file
        output_file: Optional output file path (format auto-detected from extension)
        input_format: Input format ('auto', 'jsonl', 'json', 'csv', 'text')
        chunk_size: Processing chunk size
        timeout: Optional timeout in seconds for processing
        text_field_name: Field name for text lines when converting to dict
        csv_delimiter: CSV delimiter character
        csv_has_header: Whether CSV file has header row
        skip_empty_lines: Skip empty lines in text files
        use_streaming: Use memory-efficient streaming for large files

    Returns:
        Processing statistics

    Examples:
        # Process plain text file
        results = process_file('config.yaml', 'input.txt', 'output.jsonl')

        # Process large CSV file with streaming
        results = process_file('config.yaml', 'large_data.csv', 'results.json', use_streaming=True)

        # Process with custom text field name
        results = process_file('config.yaml', 'input.txt', text_field_name='content')
    """
    fsm = create_fsm(fsm_config)

    try:
        if timeout:
            # Use threading timeout
            import concurrent.futures

            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(
                    fsm.process_stream,
                    source=input_file,
                    sink=output_file,
                    chunk_size=chunk_size,
                    input_format=input_format,
                    text_field_name=text_field_name,
                    csv_delimiter=csv_delimiter,
                    csv_has_header=csv_has_header,
                    skip_empty_lines=skip_empty_lines,
                    use_streaming=use_streaming
                )
                try:
                    result = future.result(timeout=timeout)
                except concurrent.futures.TimeoutError as e:
                    future.cancel()
                    raise TimeoutError(f"File processing exceeded timeout of {timeout} seconds") from e
        else:
            result = fsm.process_stream(
                source=input_file,
                sink=output_file,
                chunk_size=chunk_size,
                input_format=input_format,
                text_field_name=text_field_name,
                csv_delimiter=csv_delimiter,
                csv_has_header=csv_has_header,
                skip_empty_lines=skip_empty_lines,
                use_streaming=use_streaming
            )
        return result
    finally:
        fsm.close()


def validate_data(
    fsm_config: str | Path | dict[str, Any],
    data: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Validate multiple data records against FSM schema.

    Args:
        fsm_config: FSM configuration
        data: List of data records to validate

    Returns:
        List of validation results
    """
    fsm = create_fsm(fsm_config)

    try:
        results = []
        for record in data:
            results.append(fsm.validate(record))
        return results
    finally:
        fsm.close()


def batch_process(
    fsm_config: str | Path | dict[str, Any],
    data: list[dict[str, Any] | Record],
    batch_size: int = 10,
    max_workers: int = 4,
    timeout: float | None = None
) -> list[dict[str, Any]]:
    """Process multiple records in parallel.

    Args:
        fsm_config: FSM configuration
        data: List of input records
        batch_size: Batch size for processing
        max_workers: Maximum parallel workers
        timeout: Optional timeout in seconds for entire batch processing

    Returns:
        List of processing results
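
    Example:
        A minimal sketch (the config path and record shape are placeholders):

        ```python
        records = [{'id': i} for i in range(100)]
        results = batch_process('pipeline.yaml', records, batch_size=20)
        ok = sum(1 for r in results if r['success'])
        ```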
913 """
914 fsm = create_fsm(fsm_config)
916 try:
917 if timeout:
918 # Use threading timeout for batch processing
919 import concurrent.futures
921 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
922 future = executor.submit(
923 fsm.process_batch,
924 data=data,
925 batch_size=batch_size,
926 max_workers=max_workers
927 )
928 try:
929 return future.result(timeout=timeout)
930 except concurrent.futures.TimeoutError as e:
931 future.cancel()
932 raise TimeoutError(f"Batch processing exceeded timeout of {timeout} seconds") from e
933 else:
934 return fsm.process_batch(
935 data=data,
936 batch_size=batch_size,
937 max_workers=max_workers
938 )
939 finally:
940 fsm.close()