Files
MiPi_TEST/mipi_test.py
david rice 15dc295ae1 updated ai
2026-04-08 14:19:31 +01:00

436 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
MIPI TEST APPLICATION - MIPI_TEST.PY
- ENTRY POINT OF APPLICATION
VERSION: 0.3
AUTHOR: D. RICE 25/03/2026
© 2026 ARRIVE
"""
import vxi11
import time
import sys
import requests
import threading
from datetime import datetime
import ai_mgmt
import analyze_captures
# --- Configuration ---
URL = "http://192.168.45.8:5000/display"
SCOPE_IP = "192.168.45.4"
PSU_IP = "192.168.45.3"
MGMT_INTERVAL = 60 # seconds between management runs (set to 3600 for hourly)
# --- Capture settings ---
# Pass 1 — signal quality: resolves individual bits at 140 Mbit/s (7.1 ns/bit)
SIG_SCALE = 2e-9 # 2 ns/div → 20 ns window
SIG_POINTS = 500_000 # 500 k pts → ~25 GSa/s
# Pass 2 — protocol/frame structure: shows LP↔HS transitions and burst envelope
PROTO_SCALE = 1e-6 # 1 µs/div → 10 µs window
PROTO_POINTS = 500_000 # 500 k pts → 50 MSa/s (enough to see burst structure)
# Pass 3 — LP state capture: widens vertical range to show LP-11 (~1.2 V)
# Channels reconfigured to 200 mV/div, offset +0.6 V → display spans 0.2 V to 1.4 V.
# Saves Ch1 (CLK+) and Ch3 (DAT0+) single-ended so LP-11/LP-00 are distinguishable.
# Trigger: falling edge on Ch1 at 0.6 V → catches LP-11 → LP-01 SoT transition.
LP_SCALE = 500e-9 # 500 ns/div → 5 µs window
LP_POINTS = 200_000 # 200 k pts → ~40 GSa/s
LP_V_SCALE = 0.2 # V/div — 8 divs = 1.6 V range
LP_V_OFFSET = 0.6 # V — center display at 0.6 V (range 0.2 V to 1.4 V)
LP_TRIG_LEVEL = 0.6 # V — midpoint of LP-11 (1.2 V) → LP-01 (0 V) fall
DISPLAY_SETTLE_S = 1.0 # seconds to wait after display ON before arming scope
test_running = False # controls both worker threads
resume_event = threading.Event() # cleared to pause test_worker, set to resume
capture_done = threading.Event() # set when a full dual_capture (all 4 files) completes
# --- Instrument Connection ---
try:
psu = vxi11.Instrument(PSU_IP)
scope = vxi11.Instrument(SCOPE_IP)
scope.timeout = 30 # seconds — needs to cover *RST and image saves
psu.timeout = 5
except Exception as e:
print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}")
sys.exit(1)
# ---------------------------------------------------------------------------
def setup_scope():
"""Initialises DSO80204B for MIPI DSI signals (~210 MHz)."""
print("CONFIGURING SCOPE...")
cmds = [
# ── Reset & stop ──────────────────────────────────────────────────
"*RST",
":RUN",
":STOP",
# ── Channel 1 — Clock D+ ─────────────────────────────────────────
":CHANnel1:DISPlay ON",
":CHANnel1:INPut DC50",
":CHANnel1:PROBe 19.2", # 910Ω + 50Ω divider = 19.2:1
":CHANnel1:SCALe 0.1", # 100 mV/div
":CHANnel1:OFFSet 0.0",
":CHANnel1:LABel 'CLK+'",
# ── Channel 2 — Clock D- ─────────────────────────────────────────
":CHANnel2:DISPlay ON",
":CHANnel2:INPut DC50",
":CHANnel2:PROBe 19.2",
":CHANnel2:SCALe 0.1",
":CHANnel2:OFFSet 0.0",
":CHANnel2:LABel 'CLK-'",
# ── Channel 3 — Data Lane 0 D+ ───────────────────────────────────
":CHANnel3:DISPlay ON",
":CHANnel3:INPut DC50",
":CHANnel3:PROBe 19.2",
":CHANnel3:SCALe 0.1",
":CHANnel3:OFFSet 0.0",
":CHANnel3:LABel 'DAT0+'",
# ── Channel 4 — Data Lane 0 D- ───────────────────────────────────
":CHANnel4:DISPlay ON",
":CHANnel4:INPut DC50",
":CHANnel4:PROBe 19.2",
":CHANnel4:SCALe 0.1",
":CHANnel4:OFFSet 0.0",
":CHANnel4:LABel 'DAT0-'",
# ── Timebase — 5 ns/div shows ~10 cycles of 200 MHz clock ────────
":TIMebase:SCALe 5E-9",
":TIMebase:POSition 0",
":TIMebase:REFerence CENTer",
# ── Trigger — rising edge on Ch1 (Clock D+) ──────────────────────
":TRIGger:MODE EDGE",
":TRIGger:EDGE:SOURce CHANnel1",
":TRIGger:EDGE:SLOPe POSitive",
":TRIGger:EDGE:LEVel 0.05", # 50 mV post-attenuation
":TRIGger:SWEep NORMal",
# ── Acquisition ───────────────────────────────────────────────────
":ACQuire:MODE RTIMe",
":ACQuire:INTerpolate ON",
":ACQuire:POINts 500000",
# ── Display ───────────────────────────────────────────────────────
":DISPlay:LAYout STACKED",
":RUN",
]
for cmd in cmds:
scope.write(cmd)
time.sleep(0.05)
print("CHANNEL SETUP COMPLETE.")
setup_math_channels()
def setup_math_channels():
"""
F1 = Ch1 - Ch2 (clock differential)
F2 = Ch3 - Ch4 (lane 0 differential)
DSO80204B firmware 05.30.0005 confirmed syntax:
:FUNCtion<n>:DISPlay ON
:FUNCtion<n>:SUBTract CHANnel<a>,CHANnel<b>
"""
print("SETTING UP MATH CHANNELS...")
scope.write("*CLS")
time.sleep(0.2)
math_cmds = [
# ── F1 = Ch1 - Ch2 (clock differential) ─────────────────────────
":FUNCtion1:DISPlay ON",
":FUNCtion1:SUBTract CHANnel1,CHANnel2",
":FUNCtion1:RANGe 0.8", # 0.8V range = 100mV/div (range = 8 × scale)
":FUNCtion1:OFFSet 0.0",
# ── F2 = Ch3 - Ch4 (lane 0 differential) ─────────────────────────
":FUNCtion2:DISPlay ON",
":FUNCtion2:SUBTract CHANnel3,CHANnel4",
":FUNCtion2:RANGe 0.8", # 0.8V range = 100mV/div
":FUNCtion2:OFFSet 0.0",
]
for cmd in math_cmds:
scope.write(cmd)
time.sleep(0.2)
try:
time.sleep(1.0)
opc = scope.ask("*OPC?")
print(f" SCOPE SYNC OK (OPC={opc.strip()})")
except Exception as e:
print(f" WARNING: OPC SYNC FAILED ({e})")
try:
err = scope.ask(":SYSTem:ERRor?")
if err.strip().startswith("0"):
print(" MATH COMMANDS ACCEPTED — NO SCPI ERRORS.")
print(" F1 = CLK DIFF (CH1-CH2), F2 = DAT DIFF (CH3-CH4)")
else:
print(f" SCPI ERROR: {err.strip()}")
except Exception as e:
print(f" COULD NOT READ ERROR QUEUE ({e})")
def _set_timebase(scale, points):
"""Apply timebase scale and record length, then let the scope settle."""
scope.write(f":TIMebase:SCALe {scale:.3E}")
scope.write(f":ACQuire:POINts {points}")
time.sleep(0.3)
def _arm_and_wait(timeout=20):
"""
Fire a single acquisition using :DIGitize (blocking on Infiniium) and
confirm completion with *OPC?. Temporarily extends scope.timeout to
cover the full wait period.
Returns True on success, False on error/timeout.
"""
prev_timeout = scope.timeout
try:
scope.timeout = timeout + 5 # headroom over the DIGitize wait
scope.write(":DIGitize")
resp = scope.ask("*OPC?").strip()
return resp == "1"
except Exception as e:
print(f" ACQUIRE ERROR: {e}")
return False
finally:
scope.timeout = prev_timeout
def _save_pass(tag, iteration, ts):
"""
Save F1 (CLK diff) and F2 (DAT diff) as CSV, plus a PNG screenshot.
Files land in C:\\TEMP on the scope's local disk.
Naming: <YYYYMMDD_HHMMSS>_<tag>_<iter>_clk.csv (date-first for easy sorting/deletion)
"""
base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
try:
scope.write(f':DISK:SAVE:WAVeform FUNCtion1,"{base}_clk.csv",CSV')
time.sleep(2.5)
scope.write(f':DISK:SAVE:WAVeform FUNCtion2,"{base}_dat.csv",CSV')
time.sleep(2.5)
print(f" SAVED: {base}_clk.csv {base}_dat.csv")
except Exception as e:
print(f" SAVE ERROR ({tag}): {e}")
def _save_pass_channels(tag, iteration, ts):
"""
Save Ch1 (CLK+) and Ch3 (DAT0+) as single-ended CSV for LP state analysis.
Single-ended is required for LP because differential (F1/F2) cannot distinguish
LP-11 (Vcm=1.2 V) from LP-00 (Vcm=0 V) — both give Vdiff=0.
"""
base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
try:
scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.csv",CSV')
time.sleep(2.5)
scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.csv",CSV')
time.sleep(2.5)
print(f" SAVED: {base}_clk.csv {base}_dat.csv")
except Exception as e:
print(f" SAVE ERROR ({tag}): {e}")
def _configure_for_lp():
"""
Widen channel vertical scales to capture LP states and switch to a
falling-edge trigger to catch the LP-11 → LP-01 SoT transition.
"""
for ch in (1, 2, 3, 4):
scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}")
scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}")
time.sleep(0.05)
# Trigger on DAT0+ (Ch3) — CLK is continuous HS so it never reaches LP-11 (1.2 V).
# DAT0 has LP-11 between bursts, so Ch3 falling at 0.6 V catches LP-11 → LP-01.
scope.write(":TRIGger:EDGE:SOURce CHANnel3")
scope.write(":TRIGger:EDGE:SLOPe NEGative")
scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}")
time.sleep(0.1)
def _restore_hs_config():
"""Restore HS-mode channel scales, offsets, and trigger after LP capture."""
for ch in (1, 2, 3, 4):
scope.write(f":CHANnel{ch}:SCALe 0.1")
scope.write(f":CHANnel{ch}:OFFSet 0.0")
time.sleep(0.05)
scope.write(":TRIGger:EDGE:SOURce CHANnel1")
scope.write(":TRIGger:EDGE:SLOPe POSitive")
scope.write(f":TRIGger:EDGE:LEVel 0.05")
time.sleep(0.1)
def dual_capture(iteration):
"""
Two-pass capture per test iteration:
Pass 1 — signal quality (SIG_SCALE / SIG_POINTS)
Pass 2 — frame structure (PROTO_SCALE / PROTO_POINTS)
Restores the original 5 ns/div timebase when done.
"""
capture_done.clear()
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
print(f"DUAL CAPTURE #{iteration:04d} [{ts}]")
# ── Pass 1: signal quality ─────────────────────────────────────────────
print(" PASS 1: SIGNAL QUALITY...")
_set_timebase(SIG_SCALE, SIG_POINTS)
if _arm_and_wait():
_save_pass("sig", iteration, ts)
else:
print(" SKIPPING PASS 1 SAVE.")
# ── Pass 2: frame/protocol structure ──────────────────────────────────
print(" PASS 2: FRAME STRUCTURE...")
_set_timebase(PROTO_SCALE, PROTO_POINTS)
if _arm_and_wait():
_save_pass("proto", iteration, ts)
else:
print(" SKIPPING PASS 2 SAVE.")
# ── Pass 3: LP / SoT structure ────────────────────────────────────────
# Widens vertical range to capture LP-11 (1.2 V) and falls-edge triggers
# on the LP-11 → LP-01 SoT transition. Saves Ch1 and Ch3 single-ended.
print(" PASS 3: LP TRANSITION...")
_configure_for_lp()
_set_timebase(LP_SCALE, LP_POINTS)
if _arm_and_wait(timeout=30):
_save_pass_channels("lp", iteration, ts)
else:
print(" SKIPPING PASS 3 SAVE.")
_restore_hs_config()
# ── Restore original timebase ─────────────────────────────────────────
_set_timebase(5e-9, 500_000)
scope.write(":RUN")
capture_done.set()
def mgmt_worker():
"""
Fires every MGMT_INTERVAL seconds while a test is running:
- Pauses test_worker via resume_event
- Runs ai_mgmt CSV scan
- Resumes test_worker
"""
while test_running:
time.sleep(MGMT_INTERVAL)
if not test_running:
break
print("\n[MGMT] WAITING FOR CURRENT CAPTURE TO COMPLETE...")
capture_done.wait()
print("[MGMT] PAUSING TEST — RUNNING MANAGEMENT TASKS...")
resume_event.clear()
try:
copied, failed = ai_mgmt.transfer_csv_files()
print(f"[MGMT] TRANSFERRED {copied} FILE(S) TO DATA FOLDER. {failed} FAILED.")
if copied > 0:
try:
analyze_captures.run_analysis()
except Exception as e:
print(f"[MGMT] ANALYSIS ERROR: {e}")
except Exception as e:
print(f"[MGMT] TRANSFER ERROR: {e}")
finally:
resume_event.set()
print("[MGMT] RESUMING TEST.\n")
def test_worker():
"""
Background loop:
- Waits if mgmt_worker has paused via resume_event
- Turns display ON, waits DISPLAY_SETTLE_S for it to stabilise
- Runs dual_capture (signal quality + frame structure)
- Turns display OFF for 1 second
- Repeats until test_running = False
"""
global test_running
count = 1
while test_running:
resume_event.wait() # block here while mgmt_worker is running
if not test_running:
break
requests.put(URL, json={"state": "on"}, timeout=2)
time.sleep(DISPLAY_SETTLE_S)
dual_capture(count)
count += 1
requests.put(URL, json={"state": "off"}, timeout=2)
time.sleep(1.0)
def main_menu():
global test_running
while True:
print("\n===== MIPI TEST CONTROL =====")
print("1. RUN IDN CHECK (PSU & SCOPE)")
print("2. SETUP SCOPE (RUN FIRST)")
print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)")
print("4. PSU OUTPUT ON/OFF (CH1)")
print("5. START TEST & CAPTURE")
print("6. STOP TEST")
print("7. EXIT")
choice = input("\nSELECT OPTION (1-7): ").strip()
if choice == '1':
print(f"PSU : {psu.ask('*IDN?').strip()}")
print(f"SCOPE: {scope.ask('*IDN?').strip()}")
elif choice == '2':
setup_scope()
elif choice == '3':
psu.write('CH1:VOLT 24.0')
psu.write('CH1:CURR 1.5')
print("PSU CONFIGURED: 24V / 1.5A")
elif choice == '4':
state = input("TYPE 'ON' OR 'OFF': ").strip().upper()
if state in ('ON', 'OFF'):
psu.write(f'OUTP CH1,{state}')
print(f"PSU OUTPUT {state}.")
else:
print("INVALID — TYPE 'ON' OR 'OFF'.")
elif choice == '5':
if not test_running:
test_running = True
resume_event.set()
threading.Thread(target=test_worker, daemon=True).start()
threading.Thread(target=mgmt_worker, daemon=True).start()
print(f"TEST STARTED. MANAGEMENT INTERVAL: {MGMT_INTERVAL}s.")
else:
print("TEST IS ALREADY RUNNING!")
elif choice == '6':
print("STOPPING TEST...")
test_running = False
elif choice == '7':
test_running = False
psu.close()
scope.close()
print("INSTRUMENTS CLOSED. BYE.")
break
else:
print("INVALID ENTRY. PLEASE CHOOSE 1-7.")
if __name__ == "__main__":
main_menu()