Coverage for src/dataknobs_fsm/utils/streaming_file_utils.py: 88%

260 statements  

coverage.py v7.11.0, created at 2025-10-31 10:30 -0600

1"""Streaming file utilities for processing large files efficiently. 

2 

3This module provides memory-efficient streaming utilities for reading and writing 

4large files that may not fit in memory. 

5""" 

6 

7import asyncio 

8import csv 

9import json 

10from collections import deque 

11from pathlib import Path 

12from typing import Any, AsyncIterator, Callable, Dict, List, Tuple, Union 

13 

14from dataknobs_fsm.streaming.core import StreamChunk, StreamConfig, StreamMetrics 

15from dataknobs_fsm.utils.file_utils import detect_format, get_csv_delimiter 

16 

17 

18class StreamingFileReader: 

19 """Memory-efficient streaming file reader with chunking support.""" 

20 

21 def __init__( 

22 self, 

23 file_path: Union[str, Path], 

24 chunk_size: int = 1000, 

25 input_format: str = 'auto', 

26 text_field_name: str = 'text', 

27 csv_delimiter: str = ',', 

28 csv_has_header: bool = True, 

29 skip_empty_lines: bool = True, 

30 max_memory_mb: int = 100 

31 ): 

32 """Initialize streaming file reader. 

33 

34 Args: 

35 file_path: Path to input file 

36 chunk_size: Number of records per chunk 

37 input_format: File format ('auto', 'jsonl', 'json', 'csv', 'text') 

38 text_field_name: Field name for text lines 

39 csv_delimiter: CSV delimiter character 

40 csv_has_header: Whether CSV has header row 

41 skip_empty_lines: Skip empty lines in text files 

42 max_memory_mb: Maximum memory usage in MB 

43 """ 

44 self.file_path = Path(file_path) 

45 self.chunk_size = chunk_size 

46 self.text_field_name = text_field_name 

47 self.csv_delimiter = csv_delimiter 

48 self.csv_has_header = csv_has_header 

49 self.skip_empty_lines = skip_empty_lines 

50 self.max_memory_mb = max_memory_mb 

51 

52 # Auto-detect format if needed 

53 if input_format == 'auto': 

54 self.format = detect_format(self.file_path) 

55 if self.format == 'csv' and self.file_path.suffix.lower() == '.tsv': 

56 self.csv_delimiter = '\t' 

57 else: 

58 self.format = input_format 

59 

60 self.metrics = StreamMetrics() 

61 self._chunk_count = 0 

62 

63 async def read_chunks(self) -> AsyncIterator[StreamChunk]: 

64 """Read file in chunks, yielding StreamChunk objects. 

65 

66 Yields: 

67 StreamChunk objects containing batches of records 

68 """ 

69 self.metrics.start_time = asyncio.get_event_loop().time() 

70 

71 try: 

72 if self.format == 'jsonl': 

73 async for chunk in self._read_jsonl_chunks(): 

74 yield chunk 

75 elif self.format == 'json': 

76 async for chunk in self._read_json_chunks(): 

77 yield chunk 

78 elif self.format == 'csv': 

79 async for chunk in self._read_csv_chunks(): 

80 yield chunk 

81 elif self.format == 'text': 

82 async for chunk in self._read_text_chunks(): 

83 yield chunk 

84 else: 

85 raise ValueError(f"Unsupported format: {self.format}") 

86 finally: 

87 self.metrics.end_time = asyncio.get_event_loop().time() 

88 

89 async def _read_jsonl_chunks(self) -> AsyncIterator[StreamChunk]: 

90 """Read JSONL file in chunks.""" 

91 chunk_data = [] 

92 

93 with open(self.file_path) as f: 

94 for line in f: 

95 if line.strip(): 

96 try: 

97 record = json.loads(line) 

98 chunk_data.append(record) 

99 self.metrics.items_processed += 1 

100 

101 if len(chunk_data) >= self.chunk_size: 

102 yield self._create_chunk(chunk_data) 

103 chunk_data = [] 

104 

105 # Allow other tasks to run 

106 await asyncio.sleep(0) 

107 except json.JSONDecodeError: 

108 self.metrics.errors_count += 1 

109 continue 

110 

111 # Yield remaining data 

112 if chunk_data: 

113 yield self._create_chunk(chunk_data, is_last=True) 

114 

115 async def _read_json_chunks(self) -> AsyncIterator[StreamChunk]: 

116 """Read JSON file in chunks (for arrays).""" 

117 import ijson 

118 

119 with open(self.file_path, 'rb') as f: 

120 # First try to parse as an array with streaming 

121 try: 

122 parser = ijson.items(f, 'item') 

123 chunk_data = [] 

124 item_count = 0 

125 

126 for item in parser: 

127 chunk_data.append(item) 

128 item_count += 1 

129 self.metrics.items_processed += 1 

130 

131 if len(chunk_data) >= self.chunk_size: 

132 yield self._create_chunk(chunk_data) 

133 chunk_data = [] 

134 await asyncio.sleep(0) 

135 

136 if chunk_data: 

137 yield self._create_chunk(chunk_data, is_last=True) 

138 elif item_count == 0: 

139 # No items found, might be a single object 

140 raise ValueError("No array items found") 

141 

142 except (ijson.JSONError, ValueError): 

143 # Not an array or empty, try as single object 

144 f.seek(0) 

145 with open(self.file_path) as text_f: 

146 data = json.load(text_f) 

147 

148 if isinstance(data, list): 

149 # It's an array, process in chunks 

150 for i in range(0, len(data), self.chunk_size): 

151 chunk = data[i:i + self.chunk_size] 

152 is_last = (i + self.chunk_size) >= len(data) 

153 self.metrics.items_processed += len(chunk) 

154 yield self._create_chunk(chunk, is_last=is_last) 

155 await asyncio.sleep(0) 

156 else: 

157 # Single object 

158 self.metrics.items_processed += 1 

159 yield self._create_chunk([data], is_last=True) 

160 

161 async def _read_csv_chunks(self) -> AsyncIterator[StreamChunk]: 

162 """Read CSV file in chunks.""" 

163 chunk_data = [] 

164 total_rows = 0 

165 

166 with open(self.file_path, newline='') as f: 

167 if self.csv_has_header: 

168 reader = csv.DictReader(f, delimiter=self.csv_delimiter) 

169 else: 

170 # For headerless CSV, create field names 

171 first_line = f.readline() 

172 f.seek(0) 

173 num_fields = len(first_line.split(self.csv_delimiter)) 

174 fieldnames = [f'col_{i}' for i in range(num_fields)] 

175 reader = csv.DictReader(f, fieldnames=fieldnames, delimiter=self.csv_delimiter) 

176 

177 # Count total rows for determining last chunk 

178 reader_list = list(reader) 

179 total_rows = len(reader_list) 

180 

181 for idx, row in enumerate(reader_list): 

182 chunk_data.append(dict(row)) # Convert OrderedDict to dict 

183 self.metrics.items_processed += 1 

184 

185 if len(chunk_data) >= self.chunk_size: 

186 # Check if this will be the last chunk 

187 is_last = (idx + 1) >= total_rows 

188 yield self._create_chunk(chunk_data, is_last=is_last) 

189 chunk_data = [] 

190 await asyncio.sleep(0) 

191 

192 if chunk_data: 

193 yield self._create_chunk(chunk_data, is_last=True) 

194 

195 async def _read_text_chunks(self) -> AsyncIterator[StreamChunk]: 

196 """Read text file in chunks.""" 

197 chunk_data = [] 

198 

199 with open(self.file_path) as f: 

200 for line in f: 

201 sline = line.rstrip('\n\r') 

202 if sline or not self.skip_empty_lines: 

203 chunk_data.append({self.text_field_name: sline}) 

204 self.metrics.items_processed += 1 

205 

206 if len(chunk_data) >= self.chunk_size: 

207 yield self._create_chunk(chunk_data) 

208 chunk_data = [] 

209 await asyncio.sleep(0) 

210 

211 if chunk_data: 

212 yield self._create_chunk(chunk_data, is_last=True) 

213 

214 def _create_chunk(self, data: List[Dict[str, Any]], is_last: bool = False) -> StreamChunk: 

215 """Create a StreamChunk from data.""" 

216 chunk = StreamChunk( 

217 data=data, 

218 sequence_number=self._chunk_count, 

219 metadata={ 

220 'file': str(self.file_path), 

221 'format': self.format, 

222 'chunk_size': len(data) 

223 }, 

224 is_last=is_last 

225 ) 

226 self._chunk_count += 1 

227 self.metrics.chunks_processed += 1 

228 return chunk 

229 

230 
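# --- Usage sketch (illustrative only; not part of the original module). ---
# Reads a hypothetical 'events.jsonl' in 500-record chunks and tallies the
# records seen. Relies only on the API above: read_chunks() is an async
# generator, and StreamChunk.data is a list of dicts per _create_chunk.
async def _demo_streaming_reader() -> int:
    reader = StreamingFileReader('events.jsonl', chunk_size=500, input_format='jsonl')
    total = 0
    async for chunk in reader.read_chunks():
        total += len(chunk.data)
    print(f"read {total} records in {reader.metrics.chunks_processed} chunks")
    return total

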

class StreamingFileWriter:
    """Memory-efficient streaming file writer with buffering."""

    def __init__(
        self,
        file_path: Union[str, Path],
        output_format: str | None = None,
        buffer_size: int = 1000,
        flush_interval: float = 1.0
    ):
        """Initialize streaming file writer.

        Args:
            file_path: Path to output file
            output_format: Output format (auto-detected if None)
            buffer_size: Number of records to buffer before writing
            flush_interval: Time interval (seconds) to flush buffer
        """
        self.file_path = Path(file_path)
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval

        # Auto-detect format
        self.format = output_format or detect_format(self.file_path, for_output=True)

        self._buffer: deque = deque()
        self._file_handle: Any | None = None
        self._csv_writer: csv.DictWriter | None = None
        self._last_flush_time = asyncio.get_event_loop().time()
        self._is_first_write = True
        self.metrics = StreamMetrics()

    async def __aenter__(self):
        """Async context manager entry."""
        self.open()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()

    def open(self):
        """Open the output file."""
        if self.format == 'jsonl':
            self._file_handle = open(self.file_path, 'w')
        elif self.format == 'csv':
            self._file_handle = open(self.file_path, 'w', newline='')
        elif self.format == 'json':
            self._file_handle = open(self.file_path, 'w')
            self._file_handle.write('[')  # Start JSON array
        elif self.format == 'text':
            self._file_handle = open(self.file_path, 'w')
        else:
            self._file_handle = open(self.file_path, 'w')

        self.metrics.start_time = asyncio.get_event_loop().time()

    async def write_chunk(self, chunk: StreamChunk) -> None:
        """Write a chunk of data to the file.

        Args:
            chunk: StreamChunk to write
        """
        if not self._file_handle:
            self.open()

        # Add chunk data to buffer
        if isinstance(chunk.data, list):
            self._buffer.extend(chunk.data)
        else:
            self._buffer.append(chunk.data)

        # Check if we should flush
        current_time = asyncio.get_event_loop().time()
        should_flush = (
            len(self._buffer) >= self.buffer_size or
            chunk.is_last or
            (current_time - self._last_flush_time) > self.flush_interval
        )

        if should_flush:
            await self._flush_buffer()
            self._last_flush_time = current_time

        self.metrics.chunks_processed += 1

    async def _flush_buffer(self) -> None:
        """Flush the buffer to file."""
        if not self._buffer or not self._file_handle:
            return

        if self.format == 'jsonl':
            # Write each record as a JSON line
            while self._buffer:
                record = self._buffer.popleft()
                json.dump(record, self._file_handle)
                self._file_handle.write('\n')
                self.metrics.items_processed += 1

        elif self.format == 'csv':
            # Initialize CSV writer if needed
            if self._csv_writer is None and self._buffer:
                first_record = self._buffer[0]
                fieldnames = list(first_record.keys())
                delimiter = get_csv_delimiter(self.file_path)
                self._csv_writer = csv.DictWriter(
                    self._file_handle,
                    fieldnames=fieldnames,
                    delimiter=delimiter
                )
                self._csv_writer.writeheader()

            # Write records
            while self._buffer:
                record = self._buffer.popleft()
                self._csv_writer.writerow(record)
                self.metrics.items_processed += 1

        elif self.format == 'json':
            # Write as JSON array elements
            while self._buffer:
                record = self._buffer.popleft()
                if not self._is_first_write:
                    self._file_handle.write(',')
                json.dump(record, self._file_handle)
                self._is_first_write = False
                self.metrics.items_processed += 1

        elif self.format == 'text':
            # Write text lines
            while self._buffer:
                record = self._buffer.popleft()
                # Extract text from dict if needed
                if isinstance(record, dict):
                    text = record.get('text', str(record))
                else:
                    text = str(record)
                self._file_handle.write(text + '\n')
                self.metrics.items_processed += 1

        # Flush to disk
        self._file_handle.flush()

        # Allow other tasks to run
        await asyncio.sleep(0)

    async def close(self) -> None:
        """Close the file and flush remaining buffer."""
        if self._buffer:
            await self._flush_buffer()

        if self._file_handle:
            if self.format == 'json':
                self._file_handle.write(']')  # Close JSON array
            self._file_handle.close()
            self._file_handle = None

        self.metrics.end_time = asyncio.get_event_loop().time()

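# --- Usage sketch (illustrative only; not part of the original module). ---
# Buffers two chunks into a hypothetical 'out.jsonl'; the async context
# manager flushes the remaining buffer and closes the file on exit. Assumes
# StreamChunk's metadata field defaults, as the bare StreamChunk(data=...)
# call in create_streaming_file_writer below suggests.
async def _demo_streaming_writer() -> None:
    records = [{'id': i} for i in range(10)]
    async with StreamingFileWriter('out.jsonl', output_format='jsonl') as writer:
        await writer.write_chunk(StreamChunk(data=records[:5], sequence_number=0))
        await writer.write_chunk(StreamChunk(data=records[5:], sequence_number=1, is_last=True))

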

class StreamingFileProcessor:
    """High-level streaming file processor combining reader and writer."""

    def __init__(
        self,
        input_path: Union[str, Path],
        output_path: Union[str, Path],
        transform_fn: Callable[[Dict[str, Any]], Dict[str, Any]] | None = None,
        chunk_size: int = 1000,
        input_format: str = 'auto',
        output_format: str | None = None
    ):
        """Initialize streaming file processor.

        Args:
            input_path: Input file path
            output_path: Output file path
            transform_fn: Optional transformation function for each record
            chunk_size: Records per chunk
            input_format: Input file format
            output_format: Output file format (auto-detected if None)
        """
        self.reader = StreamingFileReader(
            input_path,
            chunk_size=chunk_size,
            input_format=input_format
        )
        self.writer = StreamingFileWriter(
            output_path,
            output_format=output_format,
            buffer_size=chunk_size
        )
        self.transform_fn = transform_fn or (lambda x: x)

    async def process(self, progress_callback: Callable[[int, int], None] | None = None) -> StreamMetrics:
        """Process the file with streaming.

        Args:
            progress_callback: Optional callback for progress updates (items_processed, total_chunks)

        Returns:
            Combined metrics from processing
        """
        async with self.writer:
            total_items = 0

            async for chunk in self.reader.read_chunks():
                # Transform each record in the chunk; records that raise are
                # counted as errors and skipped
                transformed_data = []
                for record in chunk.data:
                    try:
                        transformed = self.transform_fn(record)
                        if transformed is not None:
                            transformed_data.append(transformed)
                    except Exception:
                        self.reader.metrics.errors_count += 1
                        continue

                # Create a new chunk with the transformed data
                if transformed_data:
                    transformed_chunk = StreamChunk(
                        data=transformed_data,
                        sequence_number=chunk.sequence_number,
                        metadata=chunk.metadata,
                        is_last=chunk.is_last
                    )
                    await self.writer.write_chunk(transformed_chunk)

                total_items += len(chunk.data)

                # Report progress
                if progress_callback:
                    progress_callback(total_items, self.reader._chunk_count)

        # Combine metrics (after the writer has closed, so end_time is set)
        combined_metrics = StreamMetrics(
            chunks_processed=self.reader.metrics.chunks_processed,
            items_processed=self.reader.metrics.items_processed,
            errors_count=self.reader.metrics.errors_count,
            start_time=self.reader.metrics.start_time,
            end_time=self.writer.metrics.end_time
        )

        return combined_metrics

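# --- Usage sketch (illustrative only; not part of the original module). ---
# Converts a hypothetical 'input.csv' to 'output.jsonl', upper-casing a
# hypothetical 'text' column; returning None from transform_fn would drop
# the record, per process() above.
async def _demo_processor() -> None:
    def upper_text(record: Dict[str, Any]) -> Dict[str, Any]:
        record['text'] = str(record.get('text', '')).upper()
        return record

    processor = StreamingFileProcessor(
        'input.csv', 'output.jsonl', transform_fn=upper_text, chunk_size=500
    )
    metrics = await processor.process(
        progress_callback=lambda items, chunks: print(f"{items} items / {chunks} chunks")
    )
    print(f"{metrics.items_processed} items, {metrics.errors_count} errors")

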

# Convenience functions for SimpleFSM integration

async def create_streaming_file_reader(
    file_path: Union[str, Path],
    config: StreamConfig,
    **kwargs
) -> AsyncIterator[List[Dict[str, Any]]]:
    """Create a streaming file reader compatible with SimpleFSM.

    Args:
        file_path: Input file path
        config: Stream configuration
        **kwargs: Additional reader parameters

    Yields:
        Lists of records (chunks)
    """
    reader = StreamingFileReader(
        file_path,
        chunk_size=config.chunk_size,
        **kwargs
    )

    async for chunk in reader.read_chunks():
        yield chunk.data


async def create_streaming_file_writer(
    file_path: Union[str, Path],
    config: StreamConfig,
    **kwargs
) -> Tuple[Callable, Callable]:
    """Create a streaming file writer compatible with SimpleFSM.

    Args:
        file_path: Output file path
        config: Stream configuration
        **kwargs: Additional writer parameters

    Returns:
        Tuple of (write_fn, cleanup_fn)
    """
    writer = StreamingFileWriter(
        file_path,
        buffer_size=config.buffer_size,
        **kwargs
    )

    writer.open()

    async def write_fn(results: List[Dict[str, Any]]) -> None:
        """Write results to file."""
        chunk = StreamChunk(data=results)
        await writer.write_chunk(chunk)

    async def cleanup_fn() -> None:
        """Close and cleanup."""
        await writer.close()

    return write_fn, cleanup_fn
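

# --- Usage sketch (illustrative only; not part of the original module). ---
# Wires the two convenience functions together; assumes StreamConfig exposes
# the chunk_size and buffer_size attributes used above, and 'input.jsonl' /
# 'results.jsonl' are hypothetical paths.
async def _demo_simple_fsm_io(config: StreamConfig) -> None:
    write_fn, cleanup_fn = await create_streaming_file_writer('results.jsonl', config)
    try:
        # create_streaming_file_reader is an async generator, so it is
        # iterated directly rather than awaited
        async for batch in create_streaming_file_reader('input.jsonl', config):
            await write_fn(batch)
    finally:
        await cleanup_fn()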