Coverage for src/dataknobs_fsm/utils/streaming_file_utils.py: 88% (260 statements)
1"""Streaming file utilities for processing large files efficiently.
3This module provides memory-efficient streaming utilities for reading and writing
4large files that may not fit in memory.
5"""
7import asyncio
8import csv
9import json
10from collections import deque
11from pathlib import Path
12from typing import Any, AsyncIterator, Callable, Dict, List, Tuple, Union
14from dataknobs_fsm.streaming.core import StreamChunk, StreamConfig, StreamMetrics
15from dataknobs_fsm.utils.file_utils import detect_format, get_csv_delimiter


class StreamingFileReader:
    """Memory-efficient streaming file reader with chunking support."""

    def __init__(
        self,
        file_path: Union[str, Path],
        chunk_size: int = 1000,
        input_format: str = 'auto',
        text_field_name: str = 'text',
        csv_delimiter: str = ',',
        csv_has_header: bool = True,
        skip_empty_lines: bool = True,
        max_memory_mb: int = 100
    ):
        """Initialize streaming file reader.

        Args:
            file_path: Path to input file
            chunk_size: Number of records per chunk
            input_format: File format ('auto', 'jsonl', 'json', 'csv', 'text')
            text_field_name: Field name for text lines
            csv_delimiter: CSV delimiter character
            csv_has_header: Whether CSV has header row
            skip_empty_lines: Skip empty lines in text files
            max_memory_mb: Maximum memory usage in MB
        """
        self.file_path = Path(file_path)
        self.chunk_size = chunk_size
        self.text_field_name = text_field_name
        self.csv_delimiter = csv_delimiter
        self.csv_has_header = csv_has_header
        self.skip_empty_lines = skip_empty_lines
        self.max_memory_mb = max_memory_mb

        # Auto-detect format if needed
        if input_format == 'auto':
            self.format = detect_format(self.file_path)
            if self.format == 'csv' and self.file_path.suffix.lower() == '.tsv':
                self.csv_delimiter = '\t'
        else:
            self.format = input_format

        self.metrics = StreamMetrics()
        self._chunk_count = 0

    async def read_chunks(self) -> AsyncIterator[StreamChunk]:
        """Read file in chunks, yielding StreamChunk objects.

        Yields:
            StreamChunk objects containing batches of records
        """
        self.metrics.start_time = asyncio.get_event_loop().time()

        try:
            if self.format == 'jsonl':
                async for chunk in self._read_jsonl_chunks():
                    yield chunk
            elif self.format == 'json':
                async for chunk in self._read_json_chunks():
                    yield chunk
            elif self.format == 'csv':
                async for chunk in self._read_csv_chunks():
                    yield chunk
            elif self.format == 'text':
                async for chunk in self._read_text_chunks():
                    yield chunk
            else:
                raise ValueError(f"Unsupported format: {self.format}")
        finally:
            self.metrics.end_time = asyncio.get_event_loop().time()
    async def _read_jsonl_chunks(self) -> AsyncIterator[StreamChunk]:
        """Read JSONL file in chunks."""
        chunk_data = []

        with open(self.file_path) as f:
            for line in f:
                if line.strip():
                    try:
                        record = json.loads(line)
                        chunk_data.append(record)
                        self.metrics.items_processed += 1

                        if len(chunk_data) >= self.chunk_size:
                            yield self._create_chunk(chunk_data)
                            chunk_data = []

                            # Allow other tasks to run
                            await asyncio.sleep(0)
                    except json.JSONDecodeError:
                        self.metrics.errors_count += 1
                        continue

        # Yield remaining data. Note: if the record count is an exact multiple
        # of chunk_size, every chunk above is yielded with is_last=False and no
        # trailing chunk is produced, so consumers should not rely solely on
        # is_last to detect end-of-stream.
        if chunk_data:
            yield self._create_chunk(chunk_data, is_last=True)
    async def _read_json_chunks(self) -> AsyncIterator[StreamChunk]:
        """Read JSON file in chunks (for arrays)."""
        import ijson

        with open(self.file_path, 'rb') as f:
            # First try to parse as an array with streaming
            try:
                parser = ijson.items(f, 'item')
                chunk_data = []
                item_count = 0

                for item in parser:
                    chunk_data.append(item)
                    item_count += 1
                    self.metrics.items_processed += 1

                    if len(chunk_data) >= self.chunk_size:
                        yield self._create_chunk(chunk_data)
                        chunk_data = []
                        await asyncio.sleep(0)

                if chunk_data:
                    yield self._create_chunk(chunk_data, is_last=True)
                elif item_count == 0:
                    # No items found, might be a single object
                    raise ValueError("No array items found")

            except (ijson.JSONError, ValueError):
                # Not an array or empty, try as single object
                f.seek(0)
                with open(self.file_path) as text_f:
                    data = json.load(text_f)

                if isinstance(data, list):
                    # It's an array, process in chunks
                    for i in range(0, len(data), self.chunk_size):
                        chunk = data[i:i + self.chunk_size]
                        is_last = (i + self.chunk_size) >= len(data)
                        self.metrics.items_processed += len(chunk)
                        yield self._create_chunk(chunk, is_last=is_last)
                        await asyncio.sleep(0)
                else:
                    # Single object
                    self.metrics.items_processed += 1
                    yield self._create_chunk([data], is_last=True)
    async def _read_csv_chunks(self) -> AsyncIterator[StreamChunk]:
        """Read CSV file in chunks."""
        chunk_data = []
        total_rows = 0

        with open(self.file_path, newline='') as f:
            if self.csv_has_header:
                reader = csv.DictReader(f, delimiter=self.csv_delimiter)
            else:
                # For headerless CSV, create field names
                first_line = f.readline()
                f.seek(0)
                num_fields = len(first_line.split(self.csv_delimiter))
                fieldnames = [f'col_{i}' for i in range(num_fields)]
                reader = csv.DictReader(f, fieldnames=fieldnames, delimiter=self.csv_delimiter)

            # Materialize all rows up front so the total count is known for
            # last-chunk detection; this loads the entire CSV into memory,
            # trading the streaming guarantee for a reliable is_last flag.
            reader_list = list(reader)
            total_rows = len(reader_list)

            for idx, row in enumerate(reader_list):
                chunk_data.append(dict(row))  # Convert OrderedDict to dict
                self.metrics.items_processed += 1

                if len(chunk_data) >= self.chunk_size:
                    # Check if this will be the last chunk
                    is_last = (idx + 1) >= total_rows
                    yield self._create_chunk(chunk_data, is_last=is_last)
                    chunk_data = []
                    await asyncio.sleep(0)

        if chunk_data:
            yield self._create_chunk(chunk_data, is_last=True)
    async def _read_text_chunks(self) -> AsyncIterator[StreamChunk]:
        """Read text file in chunks."""
        chunk_data = []

        with open(self.file_path) as f:
            for line in f:
                sline = line.rstrip('\n\r')
                if sline or not self.skip_empty_lines:
                    chunk_data.append({self.text_field_name: sline})
                    self.metrics.items_processed += 1

                    if len(chunk_data) >= self.chunk_size:
                        yield self._create_chunk(chunk_data)
                        chunk_data = []
                        await asyncio.sleep(0)

        if chunk_data:
            yield self._create_chunk(chunk_data, is_last=True)

    def _create_chunk(self, data: List[Dict[str, Any]], is_last: bool = False) -> StreamChunk:
        """Create a StreamChunk from data."""
        chunk = StreamChunk(
            data=data,
            sequence_number=self._chunk_count,
            metadata={
                'file': str(self.file_path),
                'format': self.format,
                'chunk_size': len(data)
            },
            is_last=is_last
        )
        self._chunk_count += 1
        self.metrics.chunks_processed += 1
        return chunk
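

# A minimal consumption sketch, assuming an input file such as "events.jsonl"
# exists (the path is only a placeholder); any supported format works via the
# reader's auto-detection. Run it with asyncio.run(_example_read_chunks()).
async def _example_read_chunks() -> None:
    reader = StreamingFileReader("events.jsonl", chunk_size=500)
    async for chunk in reader.read_chunks():
        for record in chunk.data:
            _ = record  # each record is a dict parsed from the input file
    print(f"Read {reader.metrics.items_processed} records "
          f"in {reader.metrics.chunks_processed} chunks")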


class StreamingFileWriter:
    """Memory-efficient streaming file writer with buffering."""

    def __init__(
        self,
        file_path: Union[str, Path],
        output_format: str | None = None,
        buffer_size: int = 1000,
        flush_interval: float = 1.0
    ):
        """Initialize streaming file writer.

        Args:
            file_path: Path to output file
            output_format: Output format (auto-detected if None)
            buffer_size: Number of records to buffer before writing
            flush_interval: Time interval (seconds) to flush buffer
        """
        self.file_path = Path(file_path)
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval

        # Auto-detect format
        self.format = output_format or detect_format(self.file_path, for_output=True)

        self._buffer: deque = deque()
        self._file_handle: Any | None = None
        self._csv_writer: csv.DictWriter | None = None
        self._last_flush_time = asyncio.get_event_loop().time()
        self._is_first_write = True
        self.metrics = StreamMetrics()

    async def __aenter__(self):
        """Async context manager entry."""
        self.open()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()

    def open(self):
        """Open the output file."""
        if self.format == 'jsonl':
            self._file_handle = open(self.file_path, 'w')
        elif self.format == 'csv':
            self._file_handle = open(self.file_path, 'w', newline='')
        elif self.format == 'json':
            self._file_handle = open(self.file_path, 'w')
            self._file_handle.write('[')  # Start JSON array
        elif self.format == 'text':
            self._file_handle = open(self.file_path, 'w')
        else:
            self._file_handle = open(self.file_path, 'w')

        self.metrics.start_time = asyncio.get_event_loop().time()

    async def write_chunk(self, chunk: StreamChunk) -> None:
        """Write a chunk of data to the file.

        Args:
            chunk: StreamChunk to write
        """
        if not self._file_handle:
            self.open()

        # Add chunk data to buffer
        if isinstance(chunk.data, list):
            self._buffer.extend(chunk.data)
        else:
            self._buffer.append(chunk.data)

        # Check if we should flush
        current_time = asyncio.get_event_loop().time()
        should_flush = (
            len(self._buffer) >= self.buffer_size or
            chunk.is_last or
            (current_time - self._last_flush_time) > self.flush_interval
        )

        if should_flush:
            await self._flush_buffer()
            self._last_flush_time = current_time

        self.metrics.chunks_processed += 1
    async def _flush_buffer(self) -> None:
        """Flush the buffer to file."""
        if not self._buffer or not self._file_handle:
            return

        if self.format == 'jsonl':
            # Write each record as a JSON line
            while self._buffer:
                record = self._buffer.popleft()
                json.dump(record, self._file_handle)
                self._file_handle.write('\n')
                self.metrics.items_processed += 1

        elif self.format == 'csv':
            # Initialize CSV writer if needed
            if self._csv_writer is None and self._buffer:
                first_record = self._buffer[0]
                fieldnames = list(first_record.keys())
                delimiter = get_csv_delimiter(self.file_path)
                self._csv_writer = csv.DictWriter(
                    self._file_handle,
                    fieldnames=fieldnames,
                    delimiter=delimiter
                )
                self._csv_writer.writeheader()

            # Write records
            while self._buffer:
                record = self._buffer.popleft()
                self._csv_writer.writerow(record)
                self.metrics.items_processed += 1

        elif self.format == 'json':
            # Write as JSON array elements
            while self._buffer:
                record = self._buffer.popleft()
                if not self._is_first_write:
                    self._file_handle.write(',')
                json.dump(record, self._file_handle)
                self._is_first_write = False
                self.metrics.items_processed += 1

        elif self.format == 'text':
            # Write text lines
            while self._buffer:
                record = self._buffer.popleft()
                # Extract text from dict if needed
                if isinstance(record, dict):
                    text = record.get('text', str(record))
                else:
                    text = str(record)
                self._file_handle.write(text + '\n')
                self.metrics.items_processed += 1

        # Flush to disk
        self._file_handle.flush()

        # Allow other tasks to run
        await asyncio.sleep(0)

    async def close(self) -> None:
        """Close the file and flush remaining buffer."""
        if self._buffer:
            await self._flush_buffer()

        if self._file_handle:
            if self.format == 'json':
                self._file_handle.write(']')  # Close JSON array
            self._file_handle.close()
            self._file_handle = None

        self.metrics.end_time = asyncio.get_event_loop().time()
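

# A minimal writer sketch, assuming JSONL output; the path "out.jsonl" and the
# records below are placeholders. It shows the async context manager handling
# open/flush/close around write_chunk().
async def _example_write_chunks() -> None:
    records = [{"text": "hello"}, {"text": "world"}]
    async with StreamingFileWriter("out.jsonl", buffer_size=100) as writer:
        # Marking the chunk as last forces a flush before the context exits.
        await writer.write_chunk(StreamChunk(data=records, is_last=True))
    print(f"Wrote {writer.metrics.items_processed} records")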


class StreamingFileProcessor:
    """High-level streaming file processor combining reader and writer."""

    def __init__(
        self,
        input_path: Union[str, Path],
        output_path: Union[str, Path],
        transform_fn: Callable[[Dict[str, Any]], Dict[str, Any]] | None = None,
        chunk_size: int = 1000,
        input_format: str = 'auto',
        output_format: str | None = None
    ):
        """Initialize streaming file processor.

        Args:
            input_path: Input file path
            output_path: Output file path
            transform_fn: Optional transformation function for each record
            chunk_size: Records per chunk
            input_format: Input file format
            output_format: Output file format (auto-detected if None)
        """
        self.reader = StreamingFileReader(
            input_path,
            chunk_size=chunk_size,
            input_format=input_format
        )
        self.writer = StreamingFileWriter(
            output_path,
            output_format=output_format,
            buffer_size=chunk_size
        )
        self.transform_fn = transform_fn or (lambda x: x)

    async def process(self, progress_callback: Callable[[int, int], None] | None = None) -> StreamMetrics:
        """Process the file with streaming.

        Args:
            progress_callback: Optional callback for progress updates (items_processed, total_chunks)

        Returns:
            Combined metrics from processing
        """
        async with self.writer:
            total_items = 0

            async for chunk in self.reader.read_chunks():
                # Transform each record in the chunk
                transformed_data = []
                for record in chunk.data:
                    try:
                        transformed = self.transform_fn(record)
                        if transformed is not None:
                            transformed_data.append(transformed)
                    except Exception:
                        self.reader.metrics.errors_count += 1
                        continue

                # Create new chunk with transformed data
                if transformed_data:
                    transformed_chunk = StreamChunk(
                        data=transformed_data,
                        sequence_number=chunk.sequence_number,
                        metadata=chunk.metadata,
                        is_last=chunk.is_last
                    )
                    await self.writer.write_chunk(transformed_chunk)

                total_items += len(chunk.data)

                # Report progress
                if progress_callback:
                    progress_callback(total_items, self.reader._chunk_count)

        # Combine metrics
        combined_metrics = StreamMetrics(
            chunks_processed=self.reader.metrics.chunks_processed,
            items_processed=self.reader.metrics.items_processed,
            errors_count=self.reader.metrics.errors_count,
            start_time=self.reader.metrics.start_time,
            end_time=self.writer.metrics.end_time
        )

        return combined_metrics
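

# A minimal end-to-end sketch, assuming a JSONL-to-CSV conversion with a
# per-record transform; the paths, field names, and the add_length helper are
# placeholders, not part of the library.
async def _example_process_file() -> None:
    def add_length(record: Dict[str, Any]) -> Dict[str, Any]:
        # Hypothetical transform: annotate each record with its text length.
        record["text_length"] = len(record.get("text", ""))
        return record

    processor = StreamingFileProcessor(
        "input.jsonl",
        "output.csv",
        transform_fn=add_length,
        chunk_size=500,
    )
    metrics = await processor.process(
        progress_callback=lambda items, chunks: print(f"{items} items / {chunks} chunks")
    )
    print(f"Processed {metrics.items_processed} records with {metrics.errors_count} errors")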


# Convenience functions for SimpleFSM integration

async def create_streaming_file_reader(
    file_path: Union[str, Path],
    config: StreamConfig,
    **kwargs
) -> AsyncIterator[List[Dict[str, Any]]]:
    """Create a streaming file reader compatible with SimpleFSM.

    Args:
        file_path: Input file path
        config: Stream configuration
        **kwargs: Additional reader parameters

    Yields:
        Lists of records (chunks)
    """
    reader = StreamingFileReader(
        file_path,
        chunk_size=config.chunk_size,
        **kwargs
    )

    async for chunk in reader.read_chunks():
        yield chunk.data


async def create_streaming_file_writer(
    file_path: Union[str, Path],
    config: StreamConfig,
    **kwargs
) -> Tuple[Callable, Callable]:
    """Create a streaming file writer compatible with SimpleFSM.

    Args:
        file_path: Output file path
        config: Stream configuration
        **kwargs: Additional writer parameters

    Returns:
        Tuple of (write_fn, cleanup_fn)
    """
    writer = StreamingFileWriter(
        file_path,
        buffer_size=config.buffer_size,
        **kwargs
    )

    writer.open()

    async def write_fn(results: List[Dict[str, Any]]) -> None:
        """Write results to file."""
        chunk = StreamChunk(data=results)
        await writer.write_chunk(chunk)

    async def cleanup_fn() -> None:
        """Close and cleanup."""
        await writer.close()

    return write_fn, cleanup_fn
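

# A minimal sketch of wiring the two convenience helpers together, assuming
# StreamConfig accepts chunk_size and buffer_size keyword arguments (a
# placeholder assumption; check StreamConfig for its actual fields). The file
# paths are placeholders as well.
async def _example_simplefsm_pipeline() -> None:
    config = StreamConfig(chunk_size=500, buffer_size=500)
    write_fn, cleanup_fn = await create_streaming_file_writer("results.jsonl", config)
    try:
        async for records in create_streaming_file_reader("input.jsonl", config):
            # Each `records` value is a plain list of dicts (one reader chunk).
            await write_fn(records)
    finally:
        await cleanup_fn()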