Source code for bci_framework.framework.subprocess_handler

"""
==================
Subprocess handler
==================
"""

import os
import sys
import socket
import logging
import subprocess
from queue import Queue, Empty
from urllib import request
from contextlib import closing
from typing import TypeVar, Optional

from PySide2.QtCore import QTimer, QSize
from PySide2.QtWebEngineWidgets import QWebEngineView, QWebEnginePage

from .nbstreamreader import NonBlockingStreamReader as NBSR
from ..extensions import properties as prop


COMMAND = TypeVar('Command')
PATH = TypeVar('Path')


# ----------------------------------------------------------------------
[docs]def run_subprocess(call: COMMAND) -> subprocess.Popen: """Run a python script with non blocking debugger installed.""" my_env = os.environ.copy() my_env['PYTHONPATH'] = ":".join( sys.path + [os.path.join(os.path.dirname(sys.argv[0]))]) sub = subprocess.Popen(call, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=my_env, preexec_fn=os.setsid, shell=False, # universal_newlines=True, # bufsize=1, ) sub.nb_stdout = NBSR(sub.stdout) return sub
########################################################################
[docs]class BrythonLogging: """Log messages from Brython.""" # ---------------------------------------------------------------------- def __init__(self): """""" self.message = Queue() # ----------------------------------------------------------------------
[docs] def feed(self, level: int, message: str, lineNumber: int, sourceID: str) -> None: """Concatenae messages.""" self.message.put(message)
# ----------------------------------------------------------------------
[docs] def readline(self, timeout: Optional[int] = None) -> str: """Get mesage from JavaScriptConsole.""" if self.message.qsize(): try: return self.message.get(block=timeout is not None, timeout=timeout) except Empty: return None
########################################################################
[docs]class VisualizationSubprocess: """Define matplotlib properties and start the auto-resizer.""" # ---------------------------------------------------------------------- def viz_auto_size(self, timer: bool = True) -> None: """""" if self.stopped: return dpi = prop.DPI f = dpi / self.main.DPI try: size = self.main.web_engine.size() if self.plot_size != size or self.plot_dpi != self.main.DPI: if 'light' in os.environ.get('QTMATERIAL_THEME'): background = 'ffffff' else: background = '000000' self.main.web_engine.setUrl( self.url + f'/?width={f * size.width() / dpi:.2f}&height={f * size.height() / dpi:.2f}&dpi={dpi/f:.2f}&background={background}') self.plot_dpi = self.main.DPI self.plot_size = size except: pass if timer: self.timer.singleShot(1000, self.viz_auto_size) # ---------------------------------------------------------------------- def viz_debug(self) -> None: """""" self.stdout = self.subprocess_script.nb_stdout # ---------------------------------------------------------------------- def viz_start(self) -> None: """""" self.timer.singleShot(1000, self.viz_auto_size)
########################################################################
[docs]class StimuliSubprocess: """Connect with Brython logs.""" # ---------------------------------------------------------------------- def stm_debug(self) -> None: """""" console = BrythonLogging() self.web_engine_page = QWebEnginePage(self.main.web_engine) self.web_engine_page.javaScriptConsoleMessage = console.feed self.main.web_engine.setPage(self.web_engine_page) self.stdout = console self.web_engine_page .profile().clearHttpCache() self.stm_start() # ---------------------------------------------------------------------- def stm_start(self) -> None: """""" self.main.web_engine.setUrl(self.url)
######################################################################## class LoadSubprocess(VisualizationSubprocess, StimuliSubprocess): """""" # ---------------------------------------------------------------------- def __init__(self, parent, path: Optional[PATH] = None, use_webview: Optional[bool] = True, debugger: Optional[bool] = False): """""" self.main = parent self.web_view = self.main.gridLayout_webview self.plot_size = QSize(0, 0) self.plot_dpi = 0 self.stopped = False # self.is_analysis = not use_webview # self.is_visualization = use_webview # self.is_stimuli = use_webview self.debugger = debugger if path: self.load_path(path) # ---------------------------------------------------------------------- def load_path(self, path: PATH) -> None: """Load Python scipt.""" self.timer = QTimer() self.is_analysis = self.file_is_analysis(path) if not self.is_analysis: self.port = self.get_free_port() else: self.port = '' self.subprocess_script = run_subprocess( [sys.executable, path, self.port]) if self.is_analysis: self.is_visualization = False self.is_stimuli = False pass # self.start_debug() else: self.prepare_webview() # ---------------------------------------------------------------------- def file_is_analysis(self, path): """""" with open(path, 'r') as file: return 'data_analysis import DataAnalysis' in file.read() # ---------------------------------------------------------------------- def prepare_webview(self) -> None: """Try to load the webview.""" # Try to get mode try: self.mode = request.urlopen( f'http://localhost:{self.port}/mode', timeout=10).read().decode() except: # if fail self.timer.singleShot(100, self.prepare_webview) # call again return # and only when the mode is explicit... if self.mode == 'visualization': self.is_visualization = True self.is_stimuli = False self.is_analysis = False endpoint = '' elif self.mode == 'stimuli': self.is_visualization = False self.is_stimuli = True self.is_analysis = False endpoint = 'dashboard' # self.main.widget_development_webview.show() self.url = f'http://localhost:{self.port}/{endpoint}' self.load_webview() # ---------------------------------------------------------------------- def stop_preview(self) -> None: """Kill the subprocess and crear the webview.""" self.timer.stop() self.stopped = True if hasattr(self, 'subprocess_script'): self.subprocess_script.nb_stdout.stop() self.subprocess_script.terminate() if hasattr(self, 'subprocess_script'): self.timer.singleShot( 300, lambda: self.__delattr__('subprocess_script')) if hasattr(self, 'web_engine_page'): try: self.web_engine_page.deleteLater() except: # already deleted. pass # TODO: A wait page could be a god idea self.main.widget_development_webview.hide() if hasattr(self.main, 'web_engine'): self.main.web_engine.setUrl('about:blank') # ---------------------------------------------------------------------- def load_webview(self) -> None: """After the process starting, set the URL into the webview.""" self.main.widget_development_webview.show() # Create main QWebEngineView object if not hasattr(self.main, 'web_engine'): self.main.web_engine = QWebEngineView() self.web_view.addWidget(self.main.web_engine) else: # self.main.web_engine.deleteLater() # self.main.web_engine = QWebEngineView() self.web_view.addWidget(self.main.web_engine) # Set URL and start interface if self.is_visualization: self.viz_start() elif self.is_stimuli: self.stm_start() # ---------------------------------------------------------------------- def start_debug(self) -> None: """Try to start the debugger.""" if self.debugger: try: if self.is_stimuli: self.stm_debug() elif self.is_visualization or self.is_analysis: self.viz_debug() except Exception as e: pass # ---------------------------------------------------------------------- def reload(self) -> None: """Restart the webview.""" self.main.web_engine.setUrl(self.url) # ---------------------------------------------------------------------- def get_free_port(self) -> str: """Get any free port available.""" with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.bind(('', 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) port = str(s.getsockname()[1]) logging.info(f'Free port found in {port}') return port