191 lines
5.4 KiB
Python
191 lines
5.4 KiB
Python
import serial
|
|
import struct
|
|
import random
|
|
import threading
|
|
import time
|
|
import sys
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import pyvisa
|
|
import vxi11
|
|
|
|
# Current switch state; trigger_serial_command() flips it between 0 and 1.
toggle_state = 0

# Serial port that carries the packed switch commands.
ser = serial.Serial('/dev/ttyACM0', baudrate=115200)

# VISA session to the power supply. Both outgoing and incoming messages
# are newline-terminated.
VISA_ADDRESS = "TCPIP::192.168.45.3::INSTR"
rm = pyvisa.ResourceManager()
SPD = rm.open_resource(VISA_ADDRESS, write_termination='\n', read_termination='\n')
|
|
|
|
class DS1202ZE(vxi11.Instrument):
    """``vxi11.Instrument`` subclass with a few canned queries.

    Every helper sends one query string through ``ask`` and returns the
    instrument's reply unchanged.
    """

    def __init__(self, host, *args, **kwargs):
        """Forward *host* and any extra arguments to ``vxi11.Instrument``."""
        super().__init__(host, *args, **kwargs)

    def get_identification(self):
        """Return the reply to the ``*IDN?`` query."""
        query = "*IDN?"
        return self.ask(query)

    def get_vavg_channel1(self):
        """Return the reply to the VAVG measurement query for CHAN1."""
        query = ":MEAS:ITEM? VAVG, CHAN1\n"
        return self.ask(query)

    def get_vavg_channel2(self):
        """Return the reply to the VAVG measurement query for CHAN2."""
        query = ":MEAS:ITEM? VAVG, CHAN2\n"
        return self.ask(query)
|
|
|
|
class Tee(object):
    """File-like object that duplicates everything written to it.

    Each message is written both to the stream that was ``sys.stdout``
    when the instance was created and to a log file, so assigning an
    instance to ``sys.stdout`` makes ``print`` output appear on the
    terminal and in the log.
    """

    def __init__(self, filename):
        """Open *filename* for writing, creating missing parent directories.

        An existing file is truncated.
        """
        self.terminal = sys.stdout
        # open() does not create directories, so a missing log folder would
        # raise FileNotFoundError. Integer file descriptors are passed
        # through untouched because they have no parent directory.
        if not isinstance(filename, int):
            Path(filename).parent.mkdir(parents=True, exist_ok=True)
        self.log = open(filename, "w")  # Use "a" to append

    def write(self, message):
        """Write *message* to the terminal stream and to the log file."""
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        """Flush both streams (``print`` and the interpreter expect this)."""
        self.terminal.flush()
        self.log.flush()

    def close(self):
        """Close the log file; the terminal stream is left open.

        Safe to call more than once.
        """
        if not self.log.closed:
            self.log.close()
|
|
|
|
def pack_integers_to_bytes(*integers: int) -> bytes:
    """Frame a sequence of integers as one binary packet.

    Packet layout, one byte per field::

        0x41 0x52 | length | data[0] ... data[length-1] | csum_hi | csum_lo

    Each input is reduced to its low 8 bits. The checksum is the bitwise
    NOT of the sum of the data bytes, truncated to 16 bits and sent
    high byte first.

    Args:
        *integers: Payload values; only the low 8 bits of each are sent.

    Returns:
        The complete packet as ``bytes``.

    Raises:
        ValueError: If more than 255 values are given. The length field is
            a single byte, so a longer payload would be framed with a
            wrong (wrapped-around) length.
    """
    # Sync header that starts every packet.
    sync_bytes = [0x41, 0x52]

    # Mask each input to 8 bits so every element fits in one byte.
    data_bytes = [value & 0xFF for value in integers]

    if len(data_bytes) > 0xFF:
        raise ValueError(
            f"payload of {len(data_bytes)} bytes does not fit the "
            "1-byte length field (maximum 255)"
        )

    # ~sum is negative in Python; masking to 16 bits gives the two-byte
    # complement value that goes on the wire.
    checksum = ~sum(data_bytes) & 0xFFFF

    return bytes(
        [
            *sync_bytes,
            len(data_bytes),
            *data_bytes,
            checksum >> 8,
            checksum & 0xFF,
        ]
    )
|
|
|
|
def set_voltage():
    """Set a random supply voltage on CH1 and schedule the next change.

    Picks a whole-number voltage from 9 to 30 (inclusive), sends it to the
    power supply through the module-level ``SPD`` session, logs it, and
    then re-arms itself with a ``threading.Timer`` that fires after a
    random delay of 60 to 360 seconds.
    """
    voltage = random.randint(9, 30)

    # The value is sent in float form, e.g. "CH1:VOLT 12.0".
    SPD.write('CH1:VOLT ' + str(float(voltage)))  # Send power supply source voltage

    print(f"[{time.strftime('%H:%M:%S')}] VOLTAGE SET TO: " + str(voltage) + "V")

    # Schedule the NEXT random trigger.
    delay = random.uniform(60, 360)
    timer = threading.Timer(delay, set_voltage)
    # A daemon timer cannot keep the interpreter alive after the main
    # thread has shut down, so it never fires on an already-closed session.
    timer.daemon = True
    timer.start()
|
|
|
|
def trigger_serial_command():
    """Toggle the switch over serial and schedule the next toggle.

    Flips the module-level ``toggle_state`` between 0 and 1, sends the
    packet for command byte 115 (ASCII 's') with the new state on ``ser``,
    and then re-arms itself with a ``threading.Timer``. The delay is drawn
    uniformly between the module-level ``min_delay_input_f`` and
    ``max_delay_input_f`` (seconds), which must be set before the first
    call.
    """
    global toggle_state

    toggle_state = 1 - toggle_state
    state = toggle_state

    label = "ON" if state else "OFF"
    print(f"[{time.strftime('%H:%M:%S')}] SWITCH SET TO: {label}")

    # Serial logic
    print(f"[{time.strftime('%H:%M:%S')}] SENDING SERIAL COMMAND...")

    command = 115  # 0x73 's' ASCII
    # pack_integers_to_bytes() already returns bytes, so the packet is
    # written as-is.
    ser.write(pack_integers_to_bytes(command, state))

    # Schedule the NEXT random trigger within the user-supplied bounds.
    delay = random.uniform(min_delay_input_f, max_delay_input_f)
    print(f"[{time.strftime('%H:%M:%S')}] NEXT COMMAND IN {delay:.2f} SECONDS...")

    timer = threading.Timer(delay, trigger_serial_command)
    # A daemon timer cannot keep the interpreter alive after the main
    # thread has shut down, so it never fires on an already-closed port.
    timer.daemon = True
    timer.start()
|
|
|
|
# Run main
|
|
if __name__ == '__main__':
    # The log file is named after the start time: YYYY_MM_DD_HH_MM_SS.txt
    timestamp_str = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    build_path = "~/Python/Automotive-Power-Simulator-App/LOGS/" + timestamp_str + ".txt"
    log_file = Path(build_path).expanduser()
    # The LOGS folder may not exist yet; opening the file would fail then.
    log_file.parent.mkdir(parents=True, exist_ok=True)

    # Mirror everything printed from here on into the log file.
    tee = Tee(log_file)
    sys.stdout = tee
    print(f"[{time.strftime('%H:%M:%S')}] LOGGING TO: " + str(log_file))

    # Everything that runs while the supply output may be enabled sits in
    # one try block, so the shutdown sequence in ``finally`` runs on every
    # exit: Ctrl+C at a prompt, an invalid number, or an instrument error.
    try:
        SPD.write('CH1:VOLT 24.0')  # Send power supply source voltage
        SPD.write('CH1:CURR 1.5')  # Send power supply current limit
        SPD.write('OUTP CH1,ON')  # Enable supply output

        # Bounds (seconds) for the random delay between switch toggles.
        # They are module-level names read by trigger_serial_command().
        print(f"[{time.strftime('%H:%M:%S')}] PLEASE ENTER A MINIMUM DELAY VALUE...")
        min_delay_input = input()
        min_delay_input_f = float(min_delay_input)
        print(f"[{time.strftime('%H:%M:%S')}] MINIMUM TIME DELAY: " + str(min_delay_input_f))

        print(f"[{time.strftime('%H:%M:%S')}] PLEASE ENTER A MAXIMUM DELAY VALUE...")
        max_delay_input = input()
        max_delay_input_f = float(max_delay_input)
        print(f"[{time.strftime('%H:%M:%S')}] MAXIMUM TIME DELAY: " + str(max_delay_input_f))

        # Start both self-rescheduling chains, then idle; all further work
        # happens on timer threads.
        trigger_serial_command()
        set_voltage()

        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the run; fall through to cleanup.
        pass

    finally:
        # Cancel pending timers first so no callback fires on a closed
        # port and no timer thread keeps the process alive after exit.
        for thread in threading.enumerate():
            if isinstance(thread, threading.Timer):
                thread.cancel()

        try:
            try:
                # Command 115 (0x73, ASCII 's') with state 0: switch OFF.
                ser.write(pack_integers_to_bytes(115, 0))
            finally:
                ser.close()
        finally:
            # Runs even if the serial shutdown failed, so the supply is
            # never left enabled.
            try:
                SPD.write('CH1:VOLT 0.0')  # Send power supply source voltage
                SPD.write('CH1:CURR 0.0')  # Send power supply current limit
                SPD.write('OUTP CH1,OFF')  # Disable power supply output source
            finally:
                SPD.close()
                # Restore the real stdout and close the log so it is
                # fully written to disk.
                sys.stdout = tee.terminal
                tee.log.close()