Source code for bci_framework.framework.nbstreamreader
"""
==========================
Non blocking stream reader
==========================
"""
import time
from typing import Optional
from threading import Thread
from queue import Queue, Empty
########################################################################
[docs]class NonBlockingStreamReader:
"""Artificial `timeout` for blocking process.
Parameters
----------
stream
The stream to read from, usually a process' stdout or stderr.
"""
# ----------------------------------------------------------------------
def __init__(self, stream):
""""""
self._s = stream
self._q = Queue()
self.running = True
def _populateQueue(stream, queue):
'''
Collect lines from 'stream' and put them in 'quque'.
'''
while self.running:
line = stream.readline()
if line:
queue.put(line)
# else:
# pass
# raise UnexpectedEndOfStream
time.sleep(0.1)
self._t = Thread(target=_populateQueue, args=(self._s, self._q))
self._t.daemon = True
self._t.start() # start collecting lines from the stream
# ----------------------------------------------------------------------
[docs] def readline(self, timeout: Optional[int] = 0.1) -> None:
"""Read lines from queue object."""
try:
return self._q.get(block=timeout is not None, timeout=timeout)
# return self._q.get(block=True, timeout=0.1)
except Empty:
return None
# ----------------------------------------------------------------------
[docs] def stop(self) -> None:
"""Stop the readline."""
self.running = False