#!/usr/bin/env python3
"""
AUTOMOTIVE POWER SIMULATOR APP - MAIN.PY - ENTRY POINT OF APPLICATION
VERSION: 1.0
AUTHOR: D. RICE 14/11/2025
© 2025 ARRIVE
"""

# Imports
import sys
import time

from PySide6.QtWidgets import (QApplication, QDialog, QMainWindow)
from PySide6.QtCore import (QThread, QObject, Signal, Slot)

from ui_mainwindow import Ui_MainWindow
from ui_scanning import Ui_scanningDialog
from ui_connerror import Ui_connerrorDialog

# Connection state flags shared between the worker-signal slots and the
# connect-button handler.  They must exist at module level because the
# handler reads them (via ``global``) before any slot has assigned them.
IDN_Response_flag = False
Error_flag = False

# Placeholder shown in the device-information labels when nothing is connected
_NO_VALUE = "---"


# Scanning Window Class
class ScanningWindow(QDialog):
    """Dialog shown while the application waits for a device response."""

    def __init__(self, parent=None):
        # Initialise scanning dialog
        super().__init__(parent)

        # Finish scanning dialog initial setup
        self.ui = Ui_scanningDialog()
        self.ui.setupUi(self)


# No device warning Window Class
class NoDeviceWindow(QDialog):
    """Dialog shown when the connection attempt ends in an error."""

    def __init__(self, parent=None):
        # Initialise warning dialog
        super().__init__(parent)

        # Finish warning dialog initial setup
        self.ui = Ui_connerrorDialog()
        self.ui.setupUi(self)

    # Static method to create the dialog and return response
    @staticmethod
    def get_response(parent=None):
        """Run the dialog modally and return the result of ``exec()``."""
        dialog = NoDeviceWindow(parent)
        return dialog.exec()


# Main window class
class MainWindow(QMainWindow):
    """Main application window: owns the UI and the worker thread."""

    def __init__(self):
        # Initialise main window
        super().__init__()

        # Finish main window initial setup
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)

        # Set up timer prototype
        self.counter = 0

        # Hold process reference.
        self.p = None

        # Thread and Worker initialisation
        self.worker_thread = None
        self.worker = None

        # Add start button action
        self.ui.connButton.setCheckable(True)
        self.ui.connButton.clicked.connect(self.conn_button_press)

        # Set initial size and lock it (setFixedSize sets both the minimum
        # and the maximum size to the same values)
        initial_width = 1024
        initial_height = 768
        self.setGeometry(0, 0, initial_width, initial_height)
        self.setFixedSize(initial_width, initial_height)

        self.show()

    def conn_button_press(self):
        """Toggle between the connected and disconnected states.

        The current state is taken from the button caption.  On connect, a
        scanning dialog is shown while waiting for either the IDN response
        flag or the error flag to be raised by the worker slots.
        """
        global IDN_Response_flag
        global Error_flag

        if self.ui.connButton.text() != "CONNECT":
            # Disconnect: stop the worker and blank the device information
            self.ui.connButton.setText("CONNECT")
            self._stop_worker()
            self._clear_device_info()
            QApplication.processEvents()
            return

        self.ui.connButton.setText("DISCONNECT")
        w = ScanningWindow(self)
        w.show()
        QApplication.processEvents()

        # Run Connection actions here
        # NOTE(review): start_worker() references ScopeWorker, which is not
        # defined or imported in this file - enable once it is available.
        # self.start_worker()

        # Without a worker thread nothing can ever raise either flag, so fail
        # straight away instead of spinning in the wait loop forever.
        if self.worker_thread is None:
            Error_flag = True

        # Keep the GUI responsive while waiting for the worker's answer
        while not IDN_Response_flag and not Error_flag:
            QApplication.processEvents()

        # The scanning dialog is finished with on success and on failure
        w.close()

        # Read the outcome before clearing the flags: an error always wins,
        # even if a (malformed) response was also received.
        failed = Error_flag
        IDN_Response_flag = False
        Error_flag = False

        if failed:
            NoDeviceWindow.get_response(self)
            self.ui.connButton.setText("CONNECT")
            self._stop_worker()
            self._clear_device_info()
            QApplication.processEvents()
        else:
            self.run_scope_setup()

    def run_scope_setup(self):
        """Ask the worker to run its setup routine, if a worker exists."""
        if self.worker is not None:
            self.worker.run_setup()

    @Slot()
    def start_worker(self):
        """Create the worker, move it to a new QThread and start the thread."""
        # Create a QThread
        self.worker_thread = QThread()

        # Create the worker QObject
        # NOTE(review): ScopeWorker is not defined or imported in this file;
        # this raises NameError until it is provided.
        self.worker = ScopeWorker()

        # Move the worker object to the thread
        self.worker.moveToThread(self.worker_thread)

        # Connect Signals
        self.worker_thread.started.connect(self.worker.run)
        self.worker.idn_signal.connect(self.handle_idn)
        self.worker.data_ready_signal.connect(self.handle_data_ready)
        self.worker.error_signal.connect(self.handle_error)

        # Clean-up connections
        self.worker.finished_signal.connect(self.worker_thread.quit)
        self.worker.finished_signal.connect(self.worker.deleteLater)
        self.worker_thread.finished.connect(self.worker_thread.deleteLater)

        # Start the QThread, which triggers worker.run
        self.worker_thread.start()

    @Slot()
    def signal_acquisition(self):
        """Sends the signal to the worker to start pulling data."""
        if self.worker:
            # NOTE(review): log_text and status_label are never created on
            # MainWindow in this file - confirm where they are meant to live.
            self.log_text.append("Main: Sending START_ACQUISITION signal.")
            self.status_label.setText("Status: Acquiring data...")
            self.worker.start_acquisition()

    # --- SLOTS: Receive data safely from the worker thread ---
    @Slot(str)
    def handle_idn(self, idn_string):
        """Parse the IDN string and show its four fields in the UI.

        The string is expected to hold exactly four comma-separated fields:
        manufacturer, model, serial number and firmware version.  Any other
        shape raises the error flag and blanks the labels.
        """
        global IDN_Response_flag
        global Error_flag

        # Split the string by the comma delimiter
        components = idn_string.strip().split(',')

        if len(components) != 4:
            # Malformed response: report an error rather than a success
            Error_flag = True
            self._clear_device_info()
            return

        manufacturer, model, serial_number, firmware_version = components
        self.ui.manu.setText(manufacturer)
        self.ui.model.setText(model)
        self.ui.sn.setText(serial_number)
        self.ui.fw.setText(firmware_version)

        # Received a valid initial IDN from the worker
        IDN_Response_flag = True

    @Slot(str)
    def handle_data_ready(self, data_snippet):
        """Received the acquired data from the worker."""
        # NOTE(review): status_label and log_text are never created on
        # MainWindow in this file - confirm where they are meant to live.
        self.status_label.setText("Status: Data Ready. Waiting for next command.")
        self.log_text.append(f"Main: Received Waveform Data Snippet:\n{data_snippet}")

    @Slot(str)
    def handle_error(self, message):
        """Raise the error flag when the worker reports a failure."""
        global Error_flag
        Error_flag = True

    def closeEvent(self, event=None):
        """Handle application close to stop the thread gracefully.

        Qt calls this override with the close event; ``event`` defaults to
        ``None`` so existing argument-less calls keep working.
        """
        self._stop_worker()
        if event is not None:
            # Let the base class finish handling the close request
            super().closeEvent(event)

    def _stop_worker(self):
        """Ask a running worker to stop and give its thread time to exit."""
        if self.worker_thread and self.worker_thread.isRunning():
            self.worker.stop()
            # Wait a moment for the thread to clean up
            self.worker_thread.wait(500)

    def _clear_device_info(self):
        """Reset the four device-information labels to the placeholder."""
        for label in (self.ui.manu, self.ui.model, self.ui.sn, self.ui.fw):
            label.setText(_NO_VALUE)


# Run main
if __name__ == '__main__':
    # Launch main window (MainWindow shows itself in its constructor)
    app = QApplication(sys.argv)
    app.setStyle('Fusion')
    window = MainWindow()

    # Propagate the event loop's return code as the process exit status
    sys.exit(app.exec())