import serial import struct import random import threading import time import sys from pathlib import Path from datetime import datetime import pyvisa import vxi11 # Initial state toggle_state = 0 #ser = serial.Serial(port='/dev/ttyACM0', baudrate=115200) VISA_ADDRESS = "TCPIP::192.168.45.3::INSTR" rm = pyvisa.ResourceManager() SPD = rm.open_resource( VISA_ADDRESS, write_termination='\n', read_termination='\n' ) class DS1202ZE(vxi11.Instrument): def __init__(self, host, *args, **kwargs): super(DS1202ZE, self).__init__(host, *args, **kwargs) def get_identification(self): return self.ask("*IDN?") def get_vavg_channel1(self): return self.ask(":MEAS:ITEM? VAVG, CHAN1\n") def get_vavg_channel2(self): return self.ask(":MEAS:ITEM? VAVG, CHAN2\n") class Tee(object): def __init__(self, filename): self.terminal = sys.stdout self.log = open(filename, "w") # Use "a" to append def write(self, message): self.terminal.write(message) self.log.write(message) def flush(self): # This flush method is needed for python 3 compatibility. self.terminal.flush() self.log.flush() def pack_integers_to_bytes(*integers: int) -> bytes: # Setup Header (Sync bytes) sync_bytes = [0x41, 0x52] # Process Data # Mask each input to 8-bit to prevent packing errors data_bytes = [i & 0xFF for i in integers] # Get length data_length = [len(data_bytes) & 0xFF] # Calculate Checksum # Summing all data bytes, then applying bitwise NOT total_sum = sum(data_bytes) rx_checksum = ~total_sum rx_checksum_h = (rx_checksum >> 8) & 0xFF rx_checksum_l = rx_checksum & 0xFF checksum_bytes = [rx_checksum_h, rx_checksum_l] # Combine all components full_packet = sync_bytes + data_length + data_bytes + checksum_bytes # Dynamically create format string # '!' = Network order, 'B' = unsigned char # We need as many 'B's as there are elements in full_packet format_string = f"!{len(full_packet)}B" return struct.pack(format_string, *full_packet) def set_voltage(): global SPD voltage = random.randint(9, 30) voltage_f = float(voltage) SPD.write('CH1:VOLT ' + str(voltage_f)) # Send power supply source voltage print(f"[{time.strftime('%H:%M:%S')}] VOLTAGE SET TO: " + str(voltage) + "V") # Schedule the NEXT random trigger # random.uniform(60, 360) provides the random delay in seconds delay = random.uniform(60, 360) # Create and start a new timer thread timer = threading.Timer(delay, set_voltage) timer.start() def trigger_serial_command(): global toggle_state global ser global min_delay_input_f global max_delay_input_f toggle_state = 1 - toggle_state if toggle_state == 0: state = 0 print(f"[{time.strftime('%H:%M:%S')}] SWITCH SET TO: OFF") else: state = 1 print(f"[{time.strftime('%H:%M:%S')}] SWITCH SET TO: ON") # Serial logic print(f"[{time.strftime('%H:%M:%S')}] SENDING SERIAL COMMAND...") command = 115 # 0x73 's' ASCII data = (command, state) byte_data = pack_integers_to_bytes(*data) #ser.write(serial.to_bytes(byte_data)) # Schedule the NEXT random trigger # random.uniform(0.1, 100.0) provides the random delay in seconds delay = random.uniform(min_delay_input_f, max_delay_input_f) print(f"[{time.strftime('%H:%M:%S')}] NEXT COMMAND IN {delay:.2f} SECONDS...") # Create and start a new timer thread timer = threading.Timer(delay, trigger_serial_command) timer.start() # Run main if __name__ == '__main__': now = datetime.now() # Format as string: YYYY-MM-DD HH:MM:SS timestamp_str = now.strftime("%Y_%m_%d_%H_%M_%S") build_path = "~/Python/Automotive-Power-Simulator-App/LOGS/" + timestamp_str + ".txt" log_file = Path(build_path).expanduser() sys.stdout = Tee(log_file) print(f"[{time.strftime('%H:%M:%S')}] LOGGING TO: " + str(log_file)) SPD.write('CH1:VOLT ' + str(24.0)) # Send power supply source voltage SPD.write('CH1:CURR ' + str(1.5)) # Send power supply current limit # Enable power supply output source SPD.write('OUTP CH1,ON') # Enable supply output # Get time constants print (f"[{time.strftime('%H:%M:%S')}] PLEASE ENTER A MINIMUM DELAY VALUE...") min_delay_input = input() min_delay_input_f = float(min_delay_input) print(f"[{time.strftime('%H:%M:%S')}] MINIMUM TIME DELAY: " + str(min_delay_input_f)) # Get time constants print (f"[{time.strftime('%H:%M:%S')}] PLEASE ENTER A MAXIMUM DELAY VALUE...") max_delay_input = input() max_delay_input_f = float(max_delay_input) print(f"[{time.strftime('%H:%M:%S')}] MAXIMUM TIME DELAY: " + str(max_delay_input_f)) # Start the very first trigger trigger_serial_command() set_voltage() try: while (1): time.sleep(1) except KeyboardInterrupt: state = 0 command = 115 # 0x73 's' ASCII data = (command, state) byte_data = pack_integers_to_bytes(*data) #ser.write(serial.to_bytes(byte_data)) #ser.close() SPD.write('CH1:VOLT ' + str(0.0)) # Send power supply source voltage SPD.write('CH1:CURR ' + str(0.0)) # Send power supply current limit # Disable power supply output source SPD.write('OUTP CH1,OFF') SPD.close()