416 lines
14 KiB
Python
416 lines
14 KiB
Python
#!/user/bin/env python3
|
|
"""
|
|
AUTOMOTIVE POWER SIMULATOR APP - MAIN.PY
|
|
- ENTRY POINT OF APPLICATION
|
|
|
|
VERSION: 1.0
|
|
|
|
AUTHOR: D. RICE 06/01/2026
|
|
© 2026 ARRIVE
|
|
"""
|
|
|
|
# Imports
|
|
import sys
|
|
from PySide6.QtWidgets import (QApplication, QDialog, QMainWindow)
|
|
from PySide6.QtCore import (QIODevice, QByteArray, QTimer)
|
|
from PySide6.QtSerialPort import (QSerialPort, QSerialPortInfo)
|
|
|
|
import time
|
|
import struct
|
|
|
|
from ui_mainwindow import Ui_MainWindow
|
|
from ui_scanning import Ui_scanningDialog
|
|
from ui_connerror import Ui_connerrorDialog
|
|
|
|
# Scanning Window Class
|
|
class ScanningWindow(QDialog):
|
|
def __init__(self, parent=None):
|
|
# Initialise print window
|
|
super(ScanningWindow, self).__init__(parent)
|
|
# Finish print window initial setup
|
|
self.ui = Ui_scanningDialog()
|
|
self.ui.setupUi(self)
|
|
|
|
|
|
# No device warning Window Class
|
|
class NoDeviceWindow(QDialog):
|
|
def __init__(self, parent=None):
|
|
# Initialise print window
|
|
super(NoDeviceWindow, self).__init__(parent)
|
|
# Finish print window initial setup
|
|
self.ui = Ui_connerrorDialog()
|
|
self.ui.setupUi(self)
|
|
|
|
# Static method to create the dialog and return response
|
|
@staticmethod
|
|
def get_response(parent=None):
|
|
dialog = NoDeviceWindow(parent)
|
|
results = dialog.exec()
|
|
return results
|
|
|
|
|
|
# Main window class
|
|
class MainWindow(QMainWindow):
|
|
def __init__(self):
|
|
# Initialise main window
|
|
super(MainWindow, self).__init__()
|
|
|
|
# Finish main window initial setup
|
|
self.ui = Ui_MainWindow()
|
|
self.ui.setupUi(self)
|
|
|
|
# Set up timer prototype
|
|
self.counter = 0
|
|
|
|
# Hold process reference.
|
|
self.p = None
|
|
|
|
# Add start button action
|
|
self.ui.connButton.setCheckable(True)
|
|
self.ui.connButton.clicked.connect(self.conn_button_press)
|
|
|
|
# Add global power button action
|
|
self.ui.powerButton.setCheckable(True)
|
|
self.ui.powerButton.clicked.connect(self.power_button_press)
|
|
|
|
# Add radio button actions
|
|
self.ui.radiolocal.toggled.connect(self.handle_all_radios)
|
|
self.ui.radioscript.toggled.connect(self.handle_all_radios)
|
|
self.ui.radiopulse.toggled.connect(self.handle_all_radios)
|
|
|
|
# Hide other controls
|
|
self.ui.sn.setHidden(True)
|
|
self.ui.snLabel.setHidden(True)
|
|
self.ui.fw.setHidden(True)
|
|
self.ui.fwLabel.setHidden(True)
|
|
self.ui.radiolocal.setHidden(True)
|
|
self.ui.radioscript.setHidden(True)
|
|
self.ui.radiopulse.setHidden(True)
|
|
self.ui.powerButton.setHidden(True)
|
|
self.ui.setvLabel.setHidden(True)
|
|
self.ui.setv.setHidden(True)
|
|
self.ui.actvLabel.setHidden(True)
|
|
self.ui.actv.setHidden(True)
|
|
self.ui.scriptLabel.setHidden(True)
|
|
self.ui.script.setHidden(True)
|
|
self.ui.openButton.setHidden(True)
|
|
self.ui.runButton.setHidden(True)
|
|
|
|
# Set initial size
|
|
initial_width = 1000
|
|
initial_height = 600
|
|
|
|
# Lock the window size by setting min/max to the same values
|
|
self.setMinimumSize(initial_width, initial_height)
|
|
self.setMaximumSize(initial_width, initial_height)
|
|
|
|
self.setGeometry(0, 0, 1000, 600)
|
|
|
|
self.setFixedSize(1000, 600)
|
|
|
|
# Setup serial port
|
|
self.serial = QSerialPort(self)
|
|
self.serial.readyRead.connect(self.on_data_received)
|
|
|
|
# Create serial rx flag
|
|
self.serial_rx_flag = False
|
|
|
|
# Create serial rx buffer
|
|
self.receive_buffer = bytearray()
|
|
|
|
# Create store buffer
|
|
self.store_buffer = bytearray()
|
|
|
|
# Create a timeout counter flag
|
|
self.timeoutflag = False
|
|
|
|
# Create a window response variable
|
|
self.windowresponse = False
|
|
|
|
# Setup QT timers
|
|
self.timeouttimer = QTimer()
|
|
self.timeouttimer.setInterval(5000)
|
|
self.timeouttimer.timeout.connect(self.time_out_timer)
|
|
|
|
self.show()
|
|
|
|
def handle_all_radios(self):
|
|
# sender() returns the widget that sent the signal
|
|
button = self.sender()
|
|
|
|
if button.isChecked():
|
|
if button.text() == "LOCAL CONTROL":
|
|
self.ui.powerButton.setHidden(False)
|
|
self.ui.setvLabel.setHidden(False)
|
|
self.ui.setv.setHidden(False)
|
|
self.ui.actvLabel.setHidden(False)
|
|
self.ui.actv.setHidden(False)
|
|
self.ui.scriptLabel.setHidden(True)
|
|
self.ui.script.setHidden(True)
|
|
self.ui.openButton.setHidden(True)
|
|
self.ui.runButton.setHidden(True)
|
|
|
|
elif button.text() == "SCRIPT CONTROL":
|
|
self.ui.powerButton.setHidden(True)
|
|
self.ui.setvLabel.setHidden(True)
|
|
self.ui.setv.setHidden(True)
|
|
self.ui.actvLabel.setHidden(True)
|
|
self.ui.actv.setHidden(True)
|
|
self.ui.scriptLabel.setHidden(False)
|
|
self.ui.script.setHidden(False)
|
|
self.ui.openButton.setHidden(False)
|
|
self.ui.runButton.setHidden(False)
|
|
|
|
elif button.text() == "PULSE TRIGGERED":
|
|
self.ui.powerButton.setHidden(True)
|
|
self.ui.setvLabel.setHidden(True)
|
|
self.ui.setv.setHidden(True)
|
|
self.ui.actvLabel.setHidden(True)
|
|
self.ui.actv.setHidden(True)
|
|
self.ui.scriptLabel.setHidden(True)
|
|
self.ui.script.setHidden(True)
|
|
self.ui.openButton.setHidden(True)
|
|
self.ui.runButton.setHidden(True)
|
|
|
|
else:
|
|
self.ui.powerButton.setHidden(True)
|
|
self.ui.setvLabel.setHidden(True)
|
|
self.ui.setv.setHidden(True)
|
|
self.ui.actvLabel.setHidden(True)
|
|
self.ui.actv.setHidden(True)
|
|
self.ui.scriptLabel.setHidden(True)
|
|
self.ui.script.setHidden(True)
|
|
self.ui.openButton.setHidden(True)
|
|
self.ui.runButton.setHidden(True)
|
|
|
|
def power_button_press (self):
|
|
text_pow_state = self.ui.powerButton.text()
|
|
|
|
if text_pow_state == "POWER OFF":
|
|
self.ui.powerButton.setText("POWER ON")
|
|
|
|
command = 83 #0x53 'S' ASCII
|
|
state = 1
|
|
data = (command, state)
|
|
byte_data = self.pack_integers_to_bytes(*data)
|
|
|
|
self.serial.write(QByteArray(byte_data))
|
|
|
|
else:
|
|
self.ui.powerButton.setText("POWER OFF")
|
|
|
|
command = 83 #0x53 'S' ASCII
|
|
state = 0
|
|
data = (command, state)
|
|
byte_data = self.pack_integers_to_bytes(*data)
|
|
|
|
self.serial.write(QByteArray(byte_data))
|
|
|
|
|
|
def conn_button_press (self):
|
|
text_conn = self.ui.connButton.text()
|
|
|
|
if text_conn == "CONNECT":
|
|
self.ui.connButton.setText("DISCONNECT")
|
|
|
|
w = ScanningWindow(self)
|
|
w.show()
|
|
|
|
QApplication.processEvents()
|
|
|
|
# Specify serial port and baud
|
|
self.serial.setPortName("/dev/ttyACM0")
|
|
self.serial.setBaudRate(115200)
|
|
|
|
if not self.serial.open(QIODevice.ReadWrite):
|
|
print(f"Failed to open port: {self.serial.errorString()}")
|
|
|
|
command = 73 #0x49 'I' ASCII
|
|
state = 0
|
|
data = (command, state)
|
|
byte_data = self.pack_integers_to_bytes(*data)
|
|
|
|
self.serial.write(QByteArray(byte_data))
|
|
|
|
self.timeouttimer.start()
|
|
|
|
while ((self.serial_rx_flag == False) and (self.timeoutflag == False)):
|
|
QApplication.processEvents()
|
|
|
|
w.close()
|
|
|
|
if self.timeoutflag == True:
|
|
self.windowresponse = NoDeviceWindow.get_response(self)
|
|
self.serial.close()
|
|
self.ui.connButton.setText("CONNECT")
|
|
self.ui.sn.setHidden(True)
|
|
self.ui.snLabel.setHidden(True)
|
|
self.ui.fw.setHidden(True)
|
|
self.ui.fwLabel.setHidden(True)
|
|
self.ui.sn.setText("---")
|
|
self.ui.fw.setText("---")
|
|
self.ui.radiolocal.setHidden(True)
|
|
self.ui.radioscript.setHidden(True)
|
|
self.ui.radiopulse.setHidden(True)
|
|
self.ui.powerButton.setHidden(True)
|
|
self.ui.setvLabel.setHidden(True)
|
|
self.ui.setv.setHidden(True)
|
|
self.ui.actvLabel.setHidden(True)
|
|
self.ui.actv.setHidden(True)
|
|
self.ui.scriptLabel.setHidden(True)
|
|
self.ui.script.setHidden(True)
|
|
self.ui.openButton.setHidden(True)
|
|
self.ui.runButton.setHidden(True)
|
|
|
|
else:
|
|
self.process_serial_number()
|
|
|
|
|
|
self.serial_rx_flag = False
|
|
self.timeoutflag = False
|
|
|
|
else:
|
|
self.serial.close()
|
|
self.ui.connButton.setText("CONNECT")
|
|
self.ui.sn.setHidden(True)
|
|
self.ui.snLabel.setHidden(True)
|
|
self.ui.fw.setHidden(True)
|
|
self.ui.fwLabel.setHidden(True)
|
|
self.ui.sn.setText("---")
|
|
self.ui.fw.setText("---")
|
|
self.ui.radiolocal.setHidden(True)
|
|
self.ui.radioscript.setHidden(True)
|
|
self.ui.radiopulse.setHidden(True)
|
|
self.ui.powerButton.setHidden(True)
|
|
self.ui.setvLabel.setHidden(True)
|
|
self.ui.setv.setHidden(True)
|
|
self.ui.actvLabel.setHidden(True)
|
|
self.ui.actv.setHidden(True)
|
|
self.ui.scriptLabel.setHidden(True)
|
|
self.ui.script.setHidden(True)
|
|
self.ui.openButton.setHidden(True)
|
|
self.ui.runButton.setHidden(True)
|
|
|
|
QApplication.processEvents()
|
|
|
|
def on_data_received(self):
|
|
# Triggered whenever new bytes are received
|
|
# Read all available data and add to buffer
|
|
new_data = self.serial.readAll().data()
|
|
self.receive_buffer.extend(new_data)
|
|
|
|
# Process buffer to find packets
|
|
self.process_buffer()
|
|
|
|
def process_buffer(self):
|
|
# Look for sync bytes 0x41 & 0x52
|
|
while len(self.receive_buffer) >= 5: # Minimum posible packet size
|
|
# Look for sync bytes
|
|
if self.receive_buffer[0] == 0x41 and self.receive_buffer[1] == 0x52:
|
|
data_len = self.receive_buffer[2]
|
|
total_packet_size = 3 + data_len + 2
|
|
|
|
# Is it a full packet yet?
|
|
if len(self.receive_buffer) >= total_packet_size:
|
|
packet = self.receive_buffer[:total_packet_size]
|
|
del self.receive_buffer[:total_packet_size] # Clear packet from buffer
|
|
|
|
# Handle packet
|
|
self.handle_valid_packet(packet)
|
|
|
|
else:
|
|
# Wait for more data to arrive
|
|
break
|
|
|
|
else:
|
|
# Not a syns byte so discard the first byte and keep looking
|
|
self.receive_buffer.pop(0)
|
|
|
|
def handle_valid_packet(self, packet):
|
|
# Checksum validity check
|
|
try:
|
|
data_bytes = list(packet[3 : -2])
|
|
received_checksum = (packet[-2] << 8) | packet[-1]
|
|
|
|
if((~sum(data_bytes)) & 0xFFFF) == received_checksum:
|
|
self.serial_rx_flag = True
|
|
self.store_buffer = data_bytes
|
|
|
|
else:
|
|
pass
|
|
|
|
except Exception as e:
|
|
pass
|
|
|
|
def process_serial_number(self):
|
|
# Get and update serial number
|
|
sub_sn = self.store_buffer[0:19]
|
|
sn_chars = [chr(c) for c in sub_sn if 0 <= c <= 127]
|
|
sn_str = "".join(sn_chars)
|
|
self.ui.sn.setText(sn_str)
|
|
|
|
# Get and update FW Rev
|
|
sub_fw = self.store_buffer[19:22]
|
|
|
|
major = sub_fw[1:2]
|
|
sep = sub_fw[0:1]
|
|
minor = sub_fw[2:3]
|
|
full = major + sep + minor
|
|
fw_chars = [chr(c) for c in full if 0 <= c <= 127]
|
|
fw_str = "".join(fw_chars)
|
|
self.ui.fw.setText(fw_str)
|
|
|
|
self.ui.sn.setHidden(False)
|
|
self.ui.snLabel.setHidden(False)
|
|
self.ui.fw.setHidden(False)
|
|
self.ui.fwLabel.setHidden(False)
|
|
self.ui.radiolocal.setHidden(False)
|
|
self.ui.radioscript.setHidden(False)
|
|
self.ui.radiopulse.setHidden(False)
|
|
self.ui.radiolocal.setChecked(True)
|
|
|
|
|
|
def pack_integers_to_bytes(self, *integers: int) -> bytes:
|
|
# Setup header
|
|
sync_bytes = [0x41, 0x52]
|
|
|
|
# Process data
|
|
# Mask each input to 8-bits to prevent packing errors
|
|
data_bytes = [i & 0xFF for i in integers]
|
|
|
|
# Get length
|
|
data_length = [len(data_bytes) & 0xFF]
|
|
|
|
# Calculate checksum
|
|
# Summingall data bytes, then applying bitwise NOT
|
|
total_sum = sum(data_bytes)
|
|
rx_checksum = ~total_sum
|
|
|
|
rx_checksum_h = (rx_checksum >> 8) & 0xFF
|
|
rx_checksum_l = rx_checksum & 0xFF
|
|
checksum_bytes = [rx_checksum_h,rx_checksum_l]
|
|
|
|
# Combine all components
|
|
full_packet = sync_bytes + data_length + data_bytes + checksum_bytes
|
|
|
|
# Dymanically create format string
|
|
# '!' = Network order, 'B' = unsigned char
|
|
# We need as many 'B's as there are elements in the full packet
|
|
format_string = f"!{len(full_packet)}B"
|
|
|
|
return struct.pack(format_string, *full_packet)
|
|
|
|
def time_out_timer(self):
|
|
self.timeoutflag = True
|
|
|
|
# Run main
|
|
if __name__ == '__main__':
|
|
# Launch main window
|
|
app = QApplication(sys.argv)
|
|
app.setStyle('Fusion')
|
|
window = MainWindow()
|
|
window.show()
|
|
app.exec()
|
|
|
|
sys.exit()
|