230 lines
6.7 KiB
Python
230 lines
6.7 KiB
Python
#!/user/bin/env python3
|
|
"""
|
|
AUTOMOTIVE POWER SIMULATOR APP - MAIN.PY
|
|
- ENTRY POINT OF APPLICATION
|
|
|
|
VERSION: 1.0
|
|
|
|
AUTHOR: D. RICE 06/01/2026
|
|
© 2026 ARRIVE
|
|
"""
|
|
|
|
# Imports
|
|
import sys
|
|
from PySide6.QtWidgets import (QApplication, QDialog, QMainWindow)
|
|
from PySide6.QtCore import (QIODevice, QByteArray)
|
|
from PySide6.QtSerialPort import (QSerialPort, QSerialPortInfo)
|
|
|
|
import time
|
|
import struct
|
|
|
|
from ui_mainwindow import Ui_MainWindow
|
|
from ui_scanning import Ui_scanningDialog
|
|
from ui_connerror import Ui_connerrorDialog
|
|
|
|
# Scanning Window Class
|
|
class ScanningWindow(QDialog):
|
|
def __init__(self, parent=None):
|
|
# Initialise print window
|
|
super(ScanningWindow, self).__init__(parent)
|
|
# Finish print window initial setup
|
|
self.ui = Ui_scanningDialog()
|
|
self.ui.setupUi(self)
|
|
|
|
|
|
# No device warning Window Class
|
|
class NoDeviceWindow(QDialog):
|
|
def __init__(self, parent=None):
|
|
# Initialise print window
|
|
super(NoDeviceWindow, self).__init__(parent)
|
|
# Finish print window initial setup
|
|
self.ui = Ui_connerrorDialog()
|
|
self.ui.setupUi(self)
|
|
|
|
# Static method to create the dialog and return response
|
|
@staticmethod
|
|
def get_response(parent=None):
|
|
dialog = NoDeviceWindow(parent)
|
|
results = dialog.exec()
|
|
return results
|
|
|
|
|
|
# Main window class
|
|
class MainWindow(QMainWindow):
|
|
def __init__(self):
|
|
# Initialise main window
|
|
super(MainWindow, self).__init__()
|
|
|
|
# Finish main window initial setup
|
|
self.ui = Ui_MainWindow()
|
|
self.ui.setupUi(self)
|
|
|
|
# Set up timer prototype
|
|
self.counter = 0
|
|
|
|
# Hold process reference.
|
|
self.p = None
|
|
|
|
# Thread and Worker initialisation
|
|
self.worker_thread = None
|
|
self.worker = None
|
|
|
|
# Add start button action
|
|
self.ui.connButton.setCheckable(True)
|
|
self.ui.connButton.clicked.connect(self.conn_button_press)
|
|
|
|
# Set initial size
|
|
initial_width = 1000
|
|
initial_height = 600
|
|
|
|
# Lock the window size by setting min/max to the same values
|
|
self.setMinimumSize(initial_width, initial_height)
|
|
self.setMaximumSize(initial_width, initial_height)
|
|
|
|
self.setGeometry(0, 0, 1000, 600)
|
|
|
|
self.setFixedSize(1000, 600)
|
|
|
|
# Setup serial port
|
|
self.serial = QSerialPort(self)
|
|
self.serial.readyRead.connect(self.on_data_received)
|
|
|
|
# Create serial rx flag
|
|
self.serial_rx_flag = False
|
|
|
|
# Create serial rx buffer
|
|
self.receive_buffer = bytearray()
|
|
|
|
self.show()
|
|
|
|
def conn_button_press (self):
|
|
text_conn = self.ui.connButton.text()
|
|
|
|
if text_conn == "CONNECT":
|
|
self.ui.connButton.setText("DISCONNECT")
|
|
|
|
w = ScanningWindow(self)
|
|
w.show()
|
|
|
|
QApplication.processEvents()
|
|
|
|
# Specify serial port and baud
|
|
self.serial.setPortName("/dev/ttyACM0")
|
|
self.serial.setBaudRate(115200)
|
|
|
|
if not self.serial.open(QIODevice.ReadWrite):
|
|
print(f"Failed to open port: {self.serial.errorString()}")
|
|
|
|
command = 73 #0x49 'I' ASCII
|
|
state = 0
|
|
data = (command, state)
|
|
byte_data = self.pack_integers_to_bytes(*data)
|
|
|
|
self.serial.write(QByteArray(byte_data))
|
|
|
|
while (self.serial_rx_flag == False):
|
|
QApplication.processEvents()
|
|
|
|
else:
|
|
self.serial.close()
|
|
self.ui.connButton.setText("CONNECT")
|
|
|
|
QApplication.processEvents()
|
|
|
|
def on_data_received(self):
|
|
# Triggered whenever new bytes are received
|
|
# Read all available data and add to buffer
|
|
new_data = self.serial.readAll().data()
|
|
self.receive_buffer.extend(new_data)
|
|
|
|
# Process buffer to find packets
|
|
self.process_buffer()
|
|
|
|
def process_buffer(self):
|
|
# Look for sync bytes 0x41 & 0x52
|
|
while len(self.receive_buffer) >= 5: # Minimum posible packet size
|
|
# Look for sync bytes
|
|
if self.receive_buffer[0] == 0x41 and self.receive_buffer[1] == 0x52:
|
|
data_len = self.receive_buffer[2]
|
|
total_packet_size = 3 + data_len + 2
|
|
|
|
# Is it a full packet yet?
|
|
if len(self.receive_buffer) >= total_packet_size:
|
|
packet = self.receive_buffer[:total_packet_size]
|
|
del self.receive_buffer[:total_packet_size] # Clear packet from buffer
|
|
|
|
# Handle packet
|
|
self.handle_valid_packet(packet)
|
|
|
|
else:
|
|
# Wait for more data to arrive
|
|
break
|
|
|
|
else:
|
|
# Not a syns byte so discard the first byte and keep looking
|
|
self.receive_buffer.pop(0)
|
|
|
|
def handle_valid_packet(self, packet):
|
|
# Checksum validity check
|
|
try:
|
|
print (packet)
|
|
data_bytes = list(packet[3 : -2])
|
|
received_checksum = (packet[-2] << 8) | packet[-1]
|
|
|
|
if((~sum(data_bytes)) & 0xFFFF) == received_checksum:
|
|
print(f"RX DATA: {data_bytes}")
|
|
|
|
else:
|
|
pass
|
|
|
|
except Exception as e:
|
|
pass
|
|
|
|
|
|
|
|
|
|
def pack_integers_to_bytes(self, *integers: int) -> bytes:
|
|
# Setup header
|
|
sync_bytes = [0x41, 0x52]
|
|
|
|
# Process data
|
|
# Mask each input to 8-bits to prevent packing errors
|
|
data_bytes = [i & 0xFF for i in integers]
|
|
|
|
# Get length
|
|
data_length = [len(data_bytes) & 0xFF]
|
|
|
|
# Calculate checksum
|
|
# Summingall data bytes, then applying bitwise NOT
|
|
total_sum = sum(data_bytes)
|
|
rx_checksum = ~total_sum
|
|
|
|
rx_checksum_h = (rx_checksum >> 8) & 0xFF
|
|
rx_checksum_l = rx_checksum & 0xFF
|
|
checksum_bytes = [rx_checksum_h,rx_checksum_l]
|
|
|
|
# Combine all components
|
|
full_packet = sync_bytes + data_length + data_bytes + checksum_bytes
|
|
|
|
# Dymanically create format string
|
|
# '!' = Network order, 'B' = unsigned char
|
|
# We need as many 'B's as there are elements in the full packet
|
|
format_string = f"!{len(full_packet)}B"
|
|
|
|
return struct.pack(format_string, *full_packet)
|
|
|
|
def run_scope_setup (self):
|
|
self.worker.run_setup()
|
|
|
|
|
|
# Run main
|
|
if __name__ == '__main__':
|
|
# Launch main window
|
|
app = QApplication(sys.argv)
|
|
app.setStyle('Fusion')
|
|
window = MainWindow()
|
|
window.show()
|
|
app.exec()
|
|
|
|
sys.exit()
|