#!/usr/bin/env python3
"""
MIPI TEST APPLICATION - MIPI_TEST.PY - ENTRY POINT OF APPLICATION
VERSION: 0.3
AUTHOR: D. RICE 25/03/2026
© 2026 ARRIVE
"""

import sys
import threading
import time
from datetime import datetime
from pathlib import Path

import requests
import vxi11

import ai_mgmt
import analyze_captures
import rigol_scope

# --- Configuration ---
URL = "http://192.168.45.8:5000/display"
SCOPE_IP = "192.168.45.4"
PSU_IP = "192.168.45.3"
MGMT_INTERVAL = 60  # seconds between management runs (set to 3600 for hourly)

# --- Capture settings ---
# Pass 1 — signal quality: resolves individual bits at 140 Mbit/s (7.1 ns/bit)
SIG_SCALE = 2e-9        # 2 ns/div → 20 ns window
SIG_POINTS = 500_000    # 500 k pts → ~25 GSa/s

# Pass 2 — protocol/frame structure: shows LP↔HS transitions and burst envelope
PROTO_SCALE = 1e-6      # 1 µs/div → 10 µs window
PROTO_POINTS = 500_000  # 500 k pts → 50 MSa/s (enough to see burst structure)

# Pass 3 — LP state capture: widens vertical range to show LP-11 (~1.2 V)
# Channels reconfigured to 200 mV/div, offset +0.6 V → display spans −0.2 V to 1.4 V.
# Saves Ch1 (CLK+) and Ch3 (DAT0+) single-ended so LP-11/LP-00 are distinguishable.
# Trigger: falling edge on Ch3 (DAT0+) at 0.6 V → catches LP-11 → LP-01 SoT transition.
LP_SCALE = 500e-9       # 500 ns/div → 5 µs window
LP_POINTS = 200_000     # 200 k pts → ~40 GSa/s
LP_V_SCALE = 0.2        # V/div — 8 divs = 1.6 V range
LP_V_OFFSET = 0.6       # V — center display at 0.6 V (range −0.2 V to 1.4 V)
LP_TRIG_LEVEL = 0.6     # V — midpoint of LP-11 (1.2 V) → LP-01 (0 V) fall

# HS-mode defaults: applied by setup_scope() and restored after every capture.
HS_SCALE = 5e-9         # 5 ns/div
HS_POINTS = 500_000
HS_V_SCALE = 0.1        # 100 mV/div
HS_V_OFFSET = 0.0
HS_TRIG_LEVEL = 0.05    # 50 mV post-attenuation
PROBE_ATTEN = 19.2      # 910Ω + 50Ω divider = 19.2:1

DISPLAY_SETTLE_S = 1.0  # seconds to wait after display ON before arming scope
SAVE_SETTLE_S = 2.5     # seconds to wait after each waveform save command
WORKER_JOIN_TIMEOUT_S = 30.0  # max seconds to wait for workers on exit

DATA_DIR = Path(__file__).parent / "data"

# Scope channel number → on-screen label.
_CHANNEL_LABELS = {1: "CLK+", 2: "CLK-", 3: "DAT0+", 4: "DAT0-"}

# --- Worker coordination ---
test_running = False                # controls both worker threads
resume_event = threading.Event()    # cleared to pause test_worker, set to resume
capture_done = threading.Event()    # set when a full dual_capture (all passes) completes
_stop_event = threading.Event()     # set on stop so mgmt_worker's interval wait ends early
_iteration_lock = threading.Lock()  # held for one test iteration OR one management run
_worker_threads = []                # threads started by the most recent START TEST

# --- Instrument Connection ---
try:
    psu = vxi11.Instrument(PSU_IP)
    scope = vxi11.Instrument(SCOPE_IP)
    scope.timeout = 30  # seconds — needs to cover *RST and image saves
    psu.timeout = 5
except Exception as e:
    print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}")
    sys.exit(1)

# Rigol DS1202Z-E for 1.8 V supply monitoring (optional — test continues if unavailable)
rigol_scope.connect()


# ---------------------------------------------------------------------------
def setup_scope():
    """Initialise the DSO80204B for MIPI DSI signals (~210 MHz).

    Resets the scope, configures the four input channels, timebase, trigger
    and acquisition, then sets up the differential math channels.
    """
    print("CONFIGURING SCOPE...")

    # ── Reset & stop ──────────────────────────────────────────────────
    cmds = ["*RST", ":RUN", ":STOP"]

    # ── Channels 1-4: CLK+, CLK-, DAT0+, DAT0- (identical settings) ───
    for ch, label in _CHANNEL_LABELS.items():
        cmds += [
            f":CHANnel{ch}:DISPlay ON",
            f":CHANnel{ch}:INPut DC50",
            f":CHANnel{ch}:PROBe {PROBE_ATTEN}",
            f":CHANnel{ch}:SCALe {HS_V_SCALE}",
            f":CHANnel{ch}:OFFSet {HS_V_OFFSET}",
            f":CHANnel{ch}:LABel '{label}'",
        ]

    cmds += [
        # ── Timebase — 5 ns/div shows ~10 cycles of 200 MHz clock ────────
        f":TIMebase:SCALe {HS_SCALE:.3E}",
        ":TIMebase:POSition 0",
        ":TIMebase:REFerence CENTer",
        # ── Trigger — rising edge on Ch1 (Clock D+) ──────────────────────
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGE:SOURce CHANnel1",
        ":TRIGger:EDGE:SLOPe POSitive",
        f":TRIGger:EDGE:LEVel {HS_TRIG_LEVEL}",
        ":TRIGger:SWEep NORMal",
        # ── Acquisition ──────────────────────────────────────────────────
        ":ACQuire:MODE RTIMe",
        ":ACQuire:INTerpolate ON",
        f":ACQuire:POINts {HS_POINTS}",
        # ── Display ──────────────────────────────────────────────────────
        ":DISPlay:LAYout STACKED",
        ":RUN",
    ]

    for cmd in cmds:
        scope.write(cmd)
        time.sleep(0.05)

    print("CHANNEL SETUP COMPLETE.")
    setup_math_channels()


def setup_math_channels():
    """Configure the differential math functions and report SCPI errors.

    F1 = Ch1 - Ch2 (clock differential)
    F2 = Ch3 - Ch4 (lane 0 differential)

    DSO80204B firmware 05.30.0005 confirmed syntax:
        :FUNCtion{n}:DISPlay ON
        :FUNCtion{n}:SUBTract CHANnel{a},CHANnel{b}
    """
    print("SETTING UP MATH CHANNELS...")
    scope.write("*CLS")  # clear the error queue so the check below is meaningful
    time.sleep(0.2)

    math_cmds = [
        # ── F1 = Ch1 - Ch2 (clock differential) ─────────────────────────
        ":FUNCtion1:DISPlay ON",
        ":FUNCtion1:SUBTract CHANnel1,CHANnel2",
        ":FUNCtion1:RANGe 0.8",  # 0.8V range = 100mV/div (range = 8 × scale)
        ":FUNCtion1:OFFSet 0.0",
        # ── F2 = Ch3 - Ch4 (lane 0 differential) ────────────────────────
        ":FUNCtion2:DISPlay ON",
        ":FUNCtion2:SUBTract CHANnel3,CHANnel4",
        ":FUNCtion2:RANGe 0.8",  # 0.8V range = 100mV/div
        ":FUNCtion2:OFFSet 0.0",
    ]
    for cmd in math_cmds:
        scope.write(cmd)
        time.sleep(0.2)

    try:
        time.sleep(1.0)
        opc = scope.ask("*OPC?")
        print(f" SCOPE SYNC OK (OPC={opc.strip()})")
    except Exception as e:
        print(f" WARNING: OPC SYNC FAILED ({e})")

    try:
        err = scope.ask(":SYSTem:ERRor?")
        if err.strip().startswith("0"):
            print(" MATH COMMANDS ACCEPTED — NO SCPI ERRORS.")
            print(" F1 = CLK DIFF (CH1-CH2), F2 = DAT DIFF (CH3-CH4)")
        else:
            print(f" SCPI ERROR: {err.strip()}")
    except Exception as e:
        print(f" COULD NOT READ ERROR QUEUE ({e})")


def _set_timebase(scale, points):
    """Apply timebase scale and record length, then let the scope settle."""
    scope.write(f":TIMebase:SCALe {scale:.3E}")
    scope.write(f":ACQuire:POINts {points}")
    time.sleep(0.3)


def _arm_and_wait(timeout=20):
    """Fire a single acquisition and wait for it to complete.

    Uses :DIGitize (blocking on Infiniium) and confirms completion with
    *OPC?. Temporarily extends scope.timeout to cover the full wait period.

    Returns True on success, False on error/timeout.
    """
    prev_timeout = scope.timeout
    try:
        scope.timeout = timeout + 5  # headroom over the DIGitize wait
        scope.write(":DIGitize")
        resp = scope.ask("*OPC?").strip()
        return resp == "1"
    except Exception as e:
        print(f" ACQUIRE ERROR: {e}")
        return False
    finally:
        scope.timeout = prev_timeout


def _save_waveforms(clk_source, dat_source, tag, iteration, ts):
    """Save two scope sources as {base}_clk.csv / {base}_dat.csv in C:\\TEMP.

    Save errors are reported and swallowed so one failed pass does not abort
    the remaining passes of the capture.
    """
    base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
    try:
        scope.write(f':DISK:SAVE:WAVeform {clk_source},"{base}_clk.csv",CSV')
        time.sleep(SAVE_SETTLE_S)
        scope.write(f':DISK:SAVE:WAVeform {dat_source},"{base}_dat.csv",CSV')
        time.sleep(SAVE_SETTLE_S)
        print(f" SAVED: {base}_clk.csv {base}_dat.csv")
    except Exception as e:
        print(f" SAVE ERROR ({tag}): {e}")


def _save_pass(tag, iteration, ts):
    """Save F1 (CLK diff) and F2 (DAT diff) as CSV.

    Files land in C:\\TEMP on the scope's local disk.
    Naming: {ts}_{tag}_{iteration:04d}_clk.csv (date-first for easy
    sorting/deletion).
    """
    _save_waveforms("FUNCtion1", "FUNCtion2", tag, iteration, ts)


def _save_pass_channels(tag, iteration, ts):
    """Save Ch1 (CLK+) and Ch3 (DAT0+) as single-ended CSV for LP state analysis.

    Single-ended is required for LP because differential (F1/F2) cannot
    distinguish LP-11 (Vcm=1.2 V) from LP-00 (Vcm=0 V) — both give Vdiff=0.
    """
    _save_waveforms("CHANnel1", "CHANnel3", tag, iteration, ts)


def _configure_for_lp():
    """Widen channel vertical scales to capture LP states and switch to a
    falling-edge trigger to catch the LP-11 → LP-01 SoT transition.
    """
    for ch in _CHANNEL_LABELS:
        scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}")
        scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}")
        time.sleep(0.05)
    # Trigger on DAT0+ (Ch3) — CLK is continuous HS so it never reaches LP-11 (1.2 V).
    # DAT0 has LP-11 between bursts, so Ch3 falling at 0.6 V catches LP-11 → LP-01.
    scope.write(":TRIGger:EDGE:SOURce CHANnel3")
    scope.write(":TRIGger:EDGE:SLOPe NEGative")
    scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}")
    time.sleep(0.1)


def _restore_hs_config():
    """Restore HS-mode channel scales, offsets, and trigger after LP capture."""
    for ch in _CHANNEL_LABELS:
        scope.write(f":CHANnel{ch}:SCALe {HS_V_SCALE}")
        scope.write(f":CHANnel{ch}:OFFSet {HS_V_OFFSET}")
        time.sleep(0.05)
    scope.write(":TRIGger:EDGE:SOURce CHANnel1")
    scope.write(":TRIGger:EDGE:SLOPe POSitive")
    scope.write(f":TRIGger:EDGE:LEVel {HS_TRIG_LEVEL}")
    time.sleep(0.1)


def _collect_rigol_waveform(iteration, ts):
    """Wait for the Rigol capture and write its 1.8 V waveform into DATA_DIR."""
    print(" PASS 3: WAITING FOR RIGOL 1.8 V CAPTURE...")
    if not rigol_scope.wait_captured(timeout_s=10.0):
        print(" RIGOL: Timed out waiting for capture.")
        return
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv"
    n = rigol_scope.read_waveform_csv(v18_path)
    if n:
        print(f" SAVED: {v18_path.name} ({n} samples)")
    else:
        print(" RIGOL: Waveform read returned 0 samples.")


def _run_capture_passes(iteration, ts):
    """Run the three acquisition passes; HS restore is left to the caller."""
    # ── Pass 1: signal quality ────────────────────────────────────────
    print(" PASS 1: SIGNAL QUALITY...")
    _set_timebase(SIG_SCALE, SIG_POINTS)
    if _arm_and_wait():
        _save_pass("sig", iteration, ts)
    else:
        print(" SKIPPING PASS 1 SAVE.")

    # ── Pass 2: frame/protocol structure ──────────────────────────────
    print(" PASS 2: FRAME STRUCTURE...")
    _set_timebase(PROTO_SCALE, PROTO_POINTS)
    if _arm_and_wait():
        _save_pass("proto", iteration, ts)
    else:
        print(" SKIPPING PASS 2 SAVE.")

    # ── Pass 3: LP / SoT structure + 1.8 V supply monitoring ──────────
    # Widens vertical range to capture LP-11 (1.2 V) and falling-edge triggers
    # on the LP-11 → LP-01 SoT transition. Saves Ch1 and Ch3 single-ended.
    # Rigol is armed first (non-blocking) so the LP→HS current step droops
    # the 1.8 V rail and triggers the Rigol while the Agilent captures.
    print(" PASS 3: LP TRANSITION...")
    _configure_for_lp()
    _set_timebase(LP_SCALE, LP_POINTS)
    if rigol_scope.is_connected():
        rigol_scope.arm()  # arm Rigol before LP trigger so it catches the droop
    if _arm_and_wait(timeout=30):
        _save_pass_channels("lp", iteration, ts)
    else:
        print(" SKIPPING PASS 3 SAVE.")

    # Collect Rigol 1.8 V waveform (Agilent save takes ~5 s, Rigol should be done)
    if rigol_scope.is_connected():
        _collect_rigol_waveform(iteration, ts)


def dual_capture(iteration):
    """Run one three-pass capture for a test iteration.

    Pass 1 — signal quality   (SIG_SCALE / SIG_POINTS), saves F1/F2
    Pass 2 — frame structure  (PROTO_SCALE / PROTO_POINTS), saves F1/F2
    Pass 3 — LP transition    (LP_SCALE / LP_POINTS), saves Ch1/Ch3 and,
             when the Rigol is connected, the 1.8 V rail waveform

    The HS channel/trigger configuration and the original 5 ns/div timebase
    are restored, and capture_done is set, even if a pass raises; the
    exception then propagates to the caller.
    """
    capture_done.clear()
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    print(f"DUAL CAPTURE #{iteration:04d} [{ts}]")
    try:
        _run_capture_passes(iteration, ts)
    finally:
        try:
            _restore_hs_config()
            # ── Restore original timebase ─────────────────────────────
            _set_timebase(HS_SCALE, HS_POINTS)
            scope.write(":RUN")
        finally:
            # Always signal completion so nothing waiting on it can hang.
            capture_done.set()


def _run_management_tasks():
    """Transfer captured CSV files and analyse them if any were copied."""
    try:
        copied, failed = ai_mgmt.transfer_csv_files()
        print(f"[MGMT] TRANSFERRED {copied} FILE(S) TO DATA FOLDER. {failed} FAILED.")
        if copied > 0:
            try:
                analyze_captures.run_analysis()
            except Exception as e:
                print(f"[MGMT] ANALYSIS ERROR: {e}")
    except Exception as e:
        print(f"[MGMT] TRANSFER ERROR: {e}")


def mgmt_worker():
    """Run management tasks every MGMT_INTERVAL seconds while a test is running.

    - Requests a pause by clearing resume_event
    - Takes _iteration_lock, which waits for any in-flight test iteration
    - Runs the ai_mgmt CSV transfer and, if files were copied, the analysis
    - Resumes test_worker

    The interval wait uses _stop_event so a stop request ends it at once.
    """
    while test_running:
        # Event.wait returns True as soon as a stop is requested.
        if _stop_event.wait(MGMT_INTERVAL) or not test_running:
            break

        print("\n[MGMT] WAITING FOR CURRENT CAPTURE TO COMPLETE...")
        resume_event.clear()
        try:
            # Holding the lock guarantees no capture overlaps the transfer.
            with _iteration_lock:
                print("[MGMT] PAUSING TEST — RUNNING MANAGEMENT TASKS...")
                _run_management_tasks()
        finally:
            resume_event.set()
            print("[MGMT] RESUMING TEST.\n")


def _set_display(state):
    """PUT the display state ("on"/"off") to URL; failures only warn."""
    try:
        requests.put(URL, json={"state": state}, timeout=2)
    except requests.exceptions.RequestException as e:
        print(f" WARNING: display {state.upper()} failed: {e}")


def test_worker():
    """Background capture loop.

    - Waits if mgmt_worker has paused via resume_event
    - Turns display ON, waits DISPLAY_SETTLE_S for it to stabilise
    - Runs dual_capture (signal quality + frame structure + LP transition)
    - Turns display OFF for 1 second
    - Repeats until test_running = False

    Each iteration runs under _iteration_lock so it cannot overlap a
    management run. A failed capture is reported and the loop continues.
    """
    count = 1
    while test_running:
        resume_event.wait()  # block here while mgmt_worker is running
        if not test_running:
            break

        with _iteration_lock:
            # A pause or stop may have arrived while waiting for the lock.
            if not test_running:
                break
            if not resume_event.is_set():
                continue

            _set_display("on")
            time.sleep(DISPLAY_SETTLE_S)
            try:
                dual_capture(count)
            except Exception as e:
                # Keep the worker alive; otherwise the menu would report a
                # running test that no longer has a thread behind it.
                print(f" CAPTURE ERROR: {e}")
            count += 1
            _set_display("off")

        time.sleep(1.0)


def _start_test():
    """Start both worker threads unless a test is running or still stopping."""
    global test_running
    if test_running:
        print("TEST IS ALREADY RUNNING!")
        return
    if any(t.is_alive() for t in _worker_threads):
        # Workers from a just-stopped test are still finishing; starting now
        # would put two capture loops on the same scope.
        print("PREVIOUS TEST IS STILL STOPPING — TRY AGAIN SHORTLY.")
        return

    test_running = True
    _stop_event.clear()
    resume_event.set()
    _worker_threads[:] = [
        threading.Thread(target=test_worker, daemon=True),
        threading.Thread(target=mgmt_worker, daemon=True),
    ]
    for worker in _worker_threads:
        worker.start()
    print(f"TEST STARTED. MANAGEMENT INTERVAL: {MGMT_INTERVAL}s.")


def _stop_test():
    """Ask both workers to stop; they exit after their current step."""
    global test_running
    test_running = False
    _stop_event.set()    # ends mgmt_worker's interval wait
    resume_event.set()   # releases a test_worker blocked on a pause


def _shutdown():
    """Stop workers, wait briefly for them, then close every instrument."""
    _stop_test()
    if any(t.is_alive() for t in _worker_threads):
        print("WAITING FOR WORKERS TO FINISH...")
        deadline = time.monotonic() + WORKER_JOIN_TIMEOUT_S
        for worker in _worker_threads:
            worker.join(timeout=max(0.0, deadline - time.monotonic()))
        if any(t.is_alive() for t in _worker_threads):
            print("WARNING: WORKERS STILL BUSY — CLOSING INSTRUMENTS ANYWAY.")

    # Attempt every close even if an earlier one fails.
    closers = (
        ("PSU", psu.close),
        ("SCOPE", scope.close),
        ("RIGOL", rigol_scope.disconnect),
    )
    for name, closer in closers:
        try:
            closer()
        except Exception as e:
            print(f"WARNING: {name} CLOSE FAILED: {e}")
    print("INSTRUMENTS CLOSED. BYE.")


def _handle_choice(choice):
    """Execute menu options 1-6; instrument errors propagate to the caller."""
    if choice == '1':
        print(f"PSU : {psu.ask('*IDN?').strip()}")
        print(f"SCOPE: {scope.ask('*IDN?').strip()}")
        if rigol_scope.is_connected():
            print(f"RIGOL: {rigol_scope.rigol.ask('*IDN?').strip()}")
        else:
            print("RIGOL: NOT CONNECTED")
    elif choice == '2':
        setup_scope()
        if rigol_scope.is_connected():
            rigol_scope.configure()
    elif choice == '3':
        psu.write('CH1:VOLT 24.0')
        psu.write('CH1:CURR 1.5')
        print("PSU CONFIGURED: 24V / 1.5A")
    elif choice == '4':
        state = input("TYPE 'ON' OR 'OFF': ").strip().upper()
        if state in ('ON', 'OFF'):
            psu.write(f'OUTP CH1,{state}')
            print(f"PSU OUTPUT {state}.")
        else:
            print("INVALID — TYPE 'ON' OR 'OFF'.")
    elif choice == '5':
        _start_test()
    elif choice == '6':
        print("STOPPING TEST...")
        _stop_test()
    else:
        print("INVALID ENTRY. PLEASE CHOOSE 1-7.")


def main_menu():
    """Interactive control loop; returns after option 7 (or EOF / Ctrl-C).

    Errors raised while handling an option are reported and the menu is
    shown again instead of terminating the application.
    """
    while True:
        print("\n===== MIPI TEST CONTROL =====")
        print("1. RUN IDN CHECK (PSU & SCOPE)")
        print("2. SETUP SCOPE (RUN FIRST)")
        print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)")
        print("4. PSU OUTPUT ON/OFF (CH1)")
        print("5. START TEST & CAPTURE")
        print("6. STOP TEST")
        print("7. EXIT")

        try:
            choice = input("\nSELECT OPTION (1-7): ").strip()
        except (EOFError, KeyboardInterrupt):
            # Treat a closed stdin or Ctrl-C as EXIT so instruments get closed.
            print()
            choice = '7'

        if choice == '7':
            _shutdown()
            break

        try:
            _handle_choice(choice)
        except Exception as e:
            # Top-level boundary: one failed instrument call must not end the session.
            print(f"ERROR: {e}")


if __name__ == "__main__":
    main_menu()