#!/usr/bin/env python3
"""
MIPI TEST APPLICATION - MIPI_TEST.PY
- ENTRY POINT OF APPLICATION

VERSION: 0.3
AUTHOR: D. RICE 25/03/2026
© 2026 ARRIVE
"""
|
||
import json
|
||
import vxi11
|
||
import time
|
||
import sys
|
||
import requests
|
||
import threading
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
import ai_mgmt
|
||
import analyze_captures
|
||
import rigol_scope
|
||
|
||
# --- Configuration ---
DEVICE_BASE = "http://192.168.45.8:5000"  # base URL of the HTTP server on the device under test
URL = f"{DEVICE_BASE}/display"  # endpoint that test_worker PUTs {"state": "on"/"off"} to
SCOPE_IP = "192.168.45.4"  # main 4-channel scope, driven over VXI-11
PSU_IP = "192.168.45.3"  # bench power supply, driven over VXI-11
MGMT_INTERVAL = 3600  # seconds between management runs (3600 = 1 hour)

# --- Capture settings ---
# The three groups below are listed by purpose. dual_capture() runs them in the
# order LP (pass 1) → signal quality (pass 2) → protocol/frame structure (pass 3).

# Signal quality: resolves individual bits at 140 Mbit/s (7.1 ns/bit)
# NOTE(review): this note quotes 140 Mbit/s while the protocol note below quotes
# 430 Mbps — confirm which bit rate applies to the link under test.
SIG_SCALE = 2e-9  # 2 ns/div → 20 ns window
SIG_POINTS = 500_000  # 500 k pts (original note: "~25 GSa/s")
# NOTE(review): 500 k pts across a 20 ns window works out to 25 TSa/s, not
# 25 GSa/s — presumably the scope limits the record to its maximum sample rate; confirm.

# Protocol/frame structure: shows LP↔HS transitions and burst envelope
# 1280×800 24bpp 4-lane: full HS burst ≈ 18 µs. 4 µs/div → 40 µs window captures
# the complete line payload for DSI packet decode. 500 k pts @ 40 µs = 80 ps/sample
# (~28 samples/bit at 430 Mbps) — adequate for bit-level decoding.
PROTO_SCALE = 4e-6  # 4 µs/div → 40 µs window (was 1 µs/div)
PROTO_POINTS = 500_000  # 500 k pts

# LP state capture: widens vertical range to show LP-11 (~1.2 V)
# Channels reconfigured to 200 mV/div, offset +0.6 V → display spans −0.2 V to 1.4 V.
# Saves Ch1 (CLK+) and Ch3 (DAT0+) single-ended so LP-11/LP-00 are distinguishable.
# Trigger: falling edge on Ch3 (DAT0+) at 0.6 V → catches LP-11 → LP-01 SoT transition
# (see _configure_for_lp, which selects CHANnel3 as the trigger source).
LP_SCALE = 500e-9  # 500 ns/div → 5 µs window
LP_POINTS = 200_000  # 200 k pts → ~40 GSa/s
LP_V_SCALE = 0.2  # V/div — 8 divs = 1.6 V range
LP_V_OFFSET = 0.6  # V — center display at 0.6 V (range −0.2 V to 1.4 V)
LP_TRIG_LEVEL = 0.6  # V — midpoint of LP-11 (1.2 V) → LP-01 (0 V) fall

# Intended wait after display ON before arming the scope.
# NOTE(review): nothing in the visible code reads this constant — test_worker
# deliberately starts the LP pass with no settle delay; confirm whether it is still needed.
DISPLAY_SETTLE_S = 1.0
DATA_DIR = Path(__file__).parent / "data"  # local folder for register dumps and Rigol CSVs

test_running = False  # controls both worker threads
resume_event = threading.Event()  # cleared to pause test_worker, set to resume
capture_done = threading.Event()  # cleared at the start of dual_capture, set when it finishes
|
||
|
||
# --- Instrument Connection ---
# python-vxi11 does not contact the instrument when the Instrument object is
# constructed; the link is created on the first I/O call. The links are opened
# explicitly here so that an unreachable PSU or scope is reported at startup by
# the handler below, rather than as an unhandled error at the first menu command.
try:
    psu = vxi11.Instrument(PSU_IP)
    scope = vxi11.Instrument(SCOPE_IP)
    # Timeouts are assigned before opening, as in the original ordering, so the
    # links are created with the intended values.
    scope.timeout = 30  # seconds — needs to cover *RST and image saves
    psu.timeout = 5
    psu.open()
    scope.open()
except Exception as e:
    print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}")
    sys.exit(1)

# Rigol DS1202Z-E for 1.8 V supply monitoring (optional — test continues if unavailable)
rigol_scope.connect()
|
||
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def setup_scope():
    """Reset the main scope and program it for MIPI DSI signals (~210 MHz).

    The full command sequence (reset, four input channels, timebase, trigger,
    acquisition, display) is assembled first and then written one command at a
    time with a 50 ms pause after each write. Finishes by calling
    setup_math_channels() to define the differential math traces.
    """
    print("CONFIGURING SCOPE...")

    # Probe point attached to each input channel, in channel order:
    # Ch1/Ch2 = clock D+/D-, Ch3/Ch4 = data lane 0 D+/D-.
    probe_points = ((1, "CLK+"), (2, "CLK-"), (3, "DAT0+"), (4, "DAT0-"))

    # ── Reset & stop ──────────────────────────────────────────────────────
    sequence = ["*RST", ":RUN", ":STOP"]

    # ── Channels 1-4 — identical front-end settings, only the label differs ─
    for number, label in probe_points:
        prefix = f":CHANnel{number}"
        sequence.extend([
            f"{prefix}:DISPlay ON",
            f"{prefix}:INPut DC50",
            f"{prefix}:PROBe 19.2",  # 910Ω + 50Ω divider = 19.2:1
            f"{prefix}:SCALe 0.1",  # 100 mV/div
            f"{prefix}:OFFSet 0.0",
            f"{prefix}:LABel '{label}'",
        ])

    sequence.extend([
        # ── Timebase — 5 ns/div shows ~10 cycles of 200 MHz clock ─────────
        ":TIMebase:SCALe 5E-9",
        ":TIMebase:POSition 0",
        ":TIMebase:REFerence CENTer",
        # ── Trigger — rising edge on Ch1 (Clock D+) ───────────────────────
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGE:SOURce CHANnel1",
        ":TRIGger:EDGE:SLOPe POSitive",
        ":TRIGger:EDGE:LEVel 0.05",  # 50 mV post-attenuation
        ":TRIGger:SWEep NORMal",
        # ── Acquisition ───────────────────────────────────────────────────
        ":ACQuire:MODE RTIMe",
        ":ACQuire:INTerpolate ON",
        ":ACQuire:POINts 500000",
        # ── Display ───────────────────────────────────────────────────────
        ":DISPlay:LAYout STACKED",
        ":RUN",
    ])

    for command in sequence:
        scope.write(command)
        time.sleep(0.05)

    print("CHANNEL SETUP COMPLETE.")
    setup_math_channels()
|
||
|
||
|
||
def setup_math_channels():
    """
    Define the two differential math traces on the main scope.

        F1 = Ch1 - Ch2   (clock differential)
        F2 = Ch3 - Ch4   (lane 0 differential)

    Syntax noted by the original author as confirmed on DSO80204B
    firmware 05.30.0005:
        :FUNCtion<n>:DISPlay ON
        :FUNCtion<n>:SUBTract CHANnel<a>,CHANnel<b>

    After the commands are sent the scope is synchronised with *OPC? and its
    error queue is read; both checks only print their outcome.
    """
    print("SETTING UP MATH CHANNELS...")

    # Clear status registers / error queue so the check below reflects only
    # the commands sent from here on.
    scope.write("*CLS")
    time.sleep(0.2)

    # (function number, positive input channel, negative input channel)
    differential_pairs = ((1, 1, 2), (2, 3, 4))
    for func, positive, negative in differential_pairs:
        per_function = (
            f":FUNCtion{func}:DISPlay ON",
            f":FUNCtion{func}:SUBTract CHANnel{positive},CHANnel{negative}",
            f":FUNCtion{func}:RANGe 0.8",  # 0.8V range = 100mV/div (range = 8 × scale)
            f":FUNCtion{func}:OFFSet 0.0",
        )
        for command in per_function:
            scope.write(command)
            time.sleep(0.2)

    # Give the scope a moment, then block until it reports operation complete.
    try:
        time.sleep(1.0)
        opc = scope.ask("*OPC?")
        print(f" SCOPE SYNC OK (OPC={opc.strip()})")
    except Exception as exc:
        print(f" WARNING: OPC SYNC FAILED ({exc})")

    # An error-queue reply starting with "0" is treated as "no error".
    try:
        err = scope.ask(":SYSTem:ERRor?")
        if not err.strip().startswith("0"):
            print(f" SCPI ERROR: {err.strip()}")
        else:
            print(" MATH COMMANDS ACCEPTED — NO SCPI ERRORS.")
            print(" F1 = CLK DIFF (CH1-CH2), F2 = DAT DIFF (CH3-CH4)")
    except Exception as exc:
        print(f" COULD NOT READ ERROR QUEUE ({exc})")
|
||
|
||
|
||
def _set_timebase(scale, points):
    """Program the horizontal scale (s/div) and record length, then pause 0.3 s.

    scale  -- timebase in seconds per division, sent in 3-decimal exponent form
    points -- requested acquisition record length
    """
    timebase_cmd = f":TIMebase:SCALe {scale:.3E}"
    scope.write(timebase_cmd)
    record_cmd = f":ACQuire:POINts {points}"
    scope.write(record_cmd)
    # Short pause so the new settings take effect before the next command.
    time.sleep(0.3)
|
||
|
||
|
||
def _arm_and_wait(timeout=20):
    """
    Run one single-shot acquisition and wait for it to finish.

    Sends :DIGitize (noted by the original author as blocking on Infiniium)
    followed by *OPC?. For the duration of the call scope.timeout is raised to
    ``timeout + 5`` seconds; the previous value is always put back afterwards.

    Returns True when *OPC? answers "1", False on any other answer or when
    the instrument I/O raises (the error is printed, not propagated).
    """
    saved_timeout = scope.timeout
    try:
        scope.timeout = timeout + 5  # headroom over the DIGitize wait
        scope.write(":DIGitize")
        answer = scope.ask("*OPC?")
        return answer.strip() == "1"
    except Exception as exc:
        print(f" ACQUIRE ERROR: {exc}")
        return False
    finally:
        scope.timeout = saved_timeout
|
||
|
||
|
||
def _save_pass(tag, iteration, ts):
    """
    Ask the scope to save F1 (CLK diff) and F2 (DAT diff) as CSV files.

    Files land in C:\\TEMP on the scope's local disk.
    Naming: <YYYYMMDD_HHMMSS>_<tag>_<iter>_clk.csv / ..._dat.csv
    (date-first for easy sorting/deletion). Only the two CSV saves are issued
    here; no screenshot is written by this function.

    Each save command is followed by a fixed 2.5 s wait. Instrument errors are
    printed and swallowed so a failed save does not abort the capture.
    """
    stem = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
    # (scope waveform source, file-name suffix)
    exports = (("FUNCtion1", "clk"), ("FUNCtion2", "dat"))
    try:
        for source, suffix in exports:
            scope.write(f':DISK:SAVE:WAVeform {source},"{stem}_{suffix}.csv",CSV')
            time.sleep(2.5)
        print(f" SAVED: {stem}_clk.csv {stem}_dat.csv")
    except Exception as exc:
        print(f" SAVE ERROR ({tag}): {exc}")
|
||
|
||
|
||
def _save_pass_channels(tag, iteration, ts):
    """
    Ask the scope to save Ch1 (CLK+) and Ch3 (DAT0+) as single-ended CSV files.

    Single-ended data is needed for LP state analysis because the differential
    traces (F1/F2) cannot distinguish LP-11 (Vcm=1.2 V) from LP-00 (Vcm=0 V) —
    both give Vdiff=0.

    Files are written to C:\\TEMP on the scope as
    <ts>_<tag>_<iter>_clk.csv and <ts>_<tag>_<iter>_dat.csv, with a fixed
    2.5 s wait after each save command. Instrument errors are printed and
    swallowed.
    """
    stem = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
    # (scope waveform source, file-name suffix)
    exports = (("CHANnel1", "clk"), ("CHANnel3", "dat"))
    try:
        for source, suffix in exports:
            scope.write(f':DISK:SAVE:WAVeform {source},"{stem}_{suffix}.csv",CSV')
            time.sleep(2.5)
        print(f" SAVED: {stem}_clk.csv {stem}_dat.csv")
    except Exception as exc:
        print(f" SAVE ERROR ({tag}): {exc}")
|
||
|
||
|
||
def _configure_for_lp():
    """
    Switch the scope to the LP-capture setup.

    All four input channels get the wider LP vertical scale and offset
    (LP_V_SCALE / LP_V_OFFSET), and the edge trigger is moved to a falling
    edge on Ch3 at LP_TRIG_LEVEL to catch the LP-11 → LP-01 SoT transition.
    """
    vertical_settings = (("SCALe", LP_V_SCALE), ("OFFSet", LP_V_OFFSET))
    for channel in range(1, 5):
        for keyword, value in vertical_settings:
            scope.write(f":CHANnel{channel}:{keyword} {value:.3f}")
        time.sleep(0.05)

    # Trigger on DAT0+ (Ch3) — CLK is continuous HS so it never reaches LP-11 (1.2 V).
    # DAT0 has LP-11 between bursts, so Ch3 falling at 0.6 V catches LP-11 → LP-01.
    trigger_setup = (
        ":TRIGger:EDGE:SOURce CHANnel3",
        ":TRIGger:EDGE:SLOPe NEGative",
        f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}",
    )
    for command in trigger_setup:
        scope.write(command)
    time.sleep(0.1)
|
||
|
||
|
||
def _restore_hs_config():
    """Undo _configure_for_lp: put back the HS vertical settings and trigger.

    Channels 1-4 return to 100 mV/div with zero offset, and the edge trigger
    returns to a rising edge on Ch1 at 50 mV.
    """
    for channel in range(1, 5):
        prefix = f":CHANnel{channel}"
        scope.write(prefix + ":SCALe 0.1")
        scope.write(prefix + ":OFFSet 0.0")
        time.sleep(0.05)

    trigger_setup = (
        ":TRIGger:EDGE:SOURce CHANnel1",
        ":TRIGger:EDGE:SLOPe POSitive",
        ":TRIGger:EDGE:LEVel 0.05",
    )
    for command in trigger_setup:
        scope.write(command)
    time.sleep(0.1)
|
||
|
||
|
||
def _fetch_registers(ts: str, iteration: int) -> None:
    """
    Download the register snapshot from the device and store it as JSON.

    Performs GET <DEVICE_BASE>/registers (5 s timeout) and writes the decoded
    reply to DATA_DIR/<ts>_reg_<iter>.json. Per the original author's note the
    endpoint reads the MIPI DSI PHY timing registers via memtool on the target.

    Never raises: HTTP/network problems and any other failure are reported
    with a printed warning and the function simply returns.
    """
    try:
        response = requests.get(f"{DEVICE_BASE}/registers", timeout=5)
        response.raise_for_status()
        payload = response.json()

        # The device may attach a non-empty "errors" entry; surface it but
        # still save the snapshot.
        if payload.get("errors"):
            print(f" REGISTERS: device warnings — {payload['errors']}")

        DATA_DIR.mkdir(exist_ok=True)
        out_file = DATA_DIR / f"{ts}_reg_{iteration:04d}.json"
        out_file.write_text(json.dumps(payload, indent=2))

        register_count = len(payload.get("registers", []))
        print(f" SAVED: {out_file.name} ({register_count} registers)")
    except requests.exceptions.RequestException as exc:
        print(f" REGISTERS: fetch failed — {exc}")
    except Exception as exc:
        print(f" REGISTERS: error — {exc}")
|
||
|
||
|
||
def dual_capture(iteration):
    """
    Three-pass capture per test iteration (the name is historical). LP is
    captured FIRST so it catches the SoT transition at pipeline startup — the
    moment flicker can occur. HS quality and frame structure passes follow
    once the link is stable.

    Pass 1 — LP / SoT startup (no settle delay — fires immediately after display ON)
    Pass 2 — signal quality   (HS differential, rise/fall)
    Pass 3 — frame structure  (HS differential, jitter/freq)

    When the Rigol scope is connected it is armed before pass 1 and its
    waveform is saved to DATA_DIR afterwards. A register snapshot is fetched
    from the device after pass 3, then the scope is returned to its default
    timebase and set running.

    iteration -- capture counter used in file names and log output.

    capture_done is cleared on entry and is ALWAYS set on exit, including when
    an instrument call raises part-way through. The exception still propagates
    to the caller, but mgmt_worker, which blocks on capture_done.wait(), can no
    longer be left waiting forever.
    """
    capture_done.clear()
    try:
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        print(f"CAPTURE #{iteration:04d} [{ts}]")

        # ── Pass 1: LP / SoT startup transition ───────────────────────────
        # Fired immediately after display ON (test_worker has no settle delay).
        # Catches the first LP-11 → LP-01 → LP-00 → HS SoT sequence, which is
        # where violations causing screen flicker occur.
        print(" PASS 1: LP STARTUP TRANSITION...")
        _configure_for_lp()
        _set_timebase(LP_SCALE, LP_POINTS)

        if rigol_scope.is_connected():
            rigol_scope.arm()  # arm before Agilent so 1.8 V droop is captured

        if _arm_and_wait(timeout=30):
            _save_pass_channels("lp", iteration, ts)
        else:
            print(" SKIPPING LP SAVE.")

        if rigol_scope.is_connected():
            DATA_DIR.mkdir(exist_ok=True)
            v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv"
            n = rigol_scope.read_waveform_csv(v18_path)
            if n:
                print(f" SAVED: {v18_path.name} ({n} samples)")
            else:
                print(" RIGOL: Waveform read failed — check connection and probe.")

        _restore_hs_config()

        # ── Pass 2: HS signal quality ─────────────────────────────────────
        # LP pass takes ~5–10 s total; the HS link is fully settled by now.
        print(" PASS 2: SIGNAL QUALITY...")
        _set_timebase(SIG_SCALE, SIG_POINTS)
        if _arm_and_wait():
            _save_pass("sig", iteration, ts)
        else:
            print(" SKIPPING SIG SAVE.")

        # ── Pass 3: frame/protocol structure (LP-triggered differential) ──
        # Re-apply LP trigger so the LP-00 → HS transition lands near t=0 in F2.
        # This gives a fixed byte-framing anchor: HS sync byte 0xB8 appears at
        # t≈380 ns, followed by DI, WC, ECC, then the full pixel payload.
        print(" PASS 3: FRAME STRUCTURE (LP-triggered differential)...")
        _configure_for_lp()
        _set_timebase(PROTO_SCALE, PROTO_POINTS)
        if _arm_and_wait():
            _save_pass("proto", iteration, ts)
        else:
            print(" SKIPPING PROTO SAVE.")
        _restore_hs_config()

        # ── Fetch DSI register snapshot from device ───────────────────────
        # Display is still ON here; registers reflect the active pipeline state.
        _fetch_registers(ts, iteration)

        # ── Restore original timebase ─────────────────────────────────────
        _set_timebase(5e-9, 500_000)
        scope.write(":RUN")
    finally:
        # Signal completion on every exit path so a failed capture cannot
        # dead-lock the management thread.
        capture_done.set()
|
||
|
||
|
||
def mgmt_worker():
    """
    Periodic management thread; runs while test_running is true.

    Every MGMT_INTERVAL seconds it:
      - waits for the capture in progress to finish (capture_done)
      - pauses test_worker by clearing resume_event
      - transfers CSV files via ai_mgmt and, if any were copied, runs the
        capture analysis on the last 30 captures
      - resumes test_worker by setting resume_event (always, even on error)

    NOTE(review): capture_done stays set between captures, so there appears to
    be a short window in which test_worker has already passed
    resume_event.wait() but has not yet cleared capture_done — confirm whether
    the pause handshake needs tightening.
    """
    while test_running:
        time.sleep(MGMT_INTERVAL)
        # The test may have been stopped while this thread was asleep.
        if not test_running:
            return

        print("\n[MGMT] WAITING FOR CURRENT CAPTURE TO COMPLETE...")
        capture_done.wait()
        print("[MGMT] PAUSING TEST — RUNNING MANAGEMENT TASKS...")
        resume_event.clear()
        try:
            copied, failed = ai_mgmt.transfer_csv_files()
            print(f"[MGMT] TRANSFERRED {copied} FILE(S) TO DATA FOLDER. {failed} FAILED.")
            # Analysis is only worthwhile when new files actually arrived.
            if copied > 0:
                try:
                    analyze_captures.run_analysis(last=30)
                except Exception as analysis_exc:
                    print(f"[MGMT] ANALYSIS ERROR: {analysis_exc}")
        except Exception as transfer_exc:
            print(f"[MGMT] TRANSFER ERROR: {transfer_exc}")
        finally:
            # Never leave test_worker paused, whatever happened above.
            resume_event.set()
            print("[MGMT] RESUMING TEST.\n")
|
||
|
||
|
||
def test_worker():
    """
    Background capture loop; runs until test_running becomes False.

    Each cycle:
      - blocks while mgmt_worker has paused the test (resume_event cleared)
      - turns the display ON via HTTP PUT
      - immediately runs dual_capture (no settle delay, so the LP pass can
        catch the startup SoT transition)
      - turns the display OFF and waits 1 second

    A failed display request only prints a warning; the cycle carries on.
    """

    def _switch_display(state):
        # PUT the requested state ("on"/"off") to the display endpoint.
        try:
            requests.put(URL, json={"state": state}, timeout=2)
        except requests.exceptions.RequestException as exc:
            print(f" WARNING: display {state.upper()} failed: {exc}")

    iteration = 1
    while test_running:
        resume_event.wait()  # block here while mgmt_worker is running
        # The test may have been stopped while this thread was paused.
        if not test_running:
            break

        _switch_display("on")
        # No settle delay — LP pass fires immediately to catch startup SoT transition
        dual_capture(iteration)
        iteration += 1
        _switch_display("off")
        time.sleep(1.0)
|
||
|
||
|
||
def main_menu():
    """Interactive console menu; loops until the user chooses EXIT.

    Options cover instrument identification, scope setup, PSU configuration
    and output switching, and starting/stopping the background test. Starting
    a test sets test_running and launches test_worker and mgmt_worker as
    daemon threads; EXIT clears the flag and closes all instruments.
    """
    global test_running

    menu_lines = (
        "\n===== MIPI TEST CONTROL =====",
        "1. RUN IDN CHECK (PSU & SCOPE)",
        "2. SETUP SCOPE (RUN FIRST)",
        "3. CONFIGURE PSU (DEFAULT 24V / 1.5A)",
        "4. PSU OUTPUT ON/OFF (CH1)",
        "5. START TEST & CAPTURE",
        "6. STOP TEST",
        "7. EXIT",
    )

    while True:
        for line in menu_lines:
            print(line)

        choice = input("\nSELECT OPTION (1-7): ").strip()

        if choice == '7':
            # Stop the workers and release every instrument before leaving.
            test_running = False
            psu.close()
            scope.close()
            rigol_scope.disconnect()
            print("INSTRUMENTS CLOSED. BYE.")
            break

        if choice == '1':
            print(f"PSU : {psu.ask('*IDN?').strip()}")
            print(f"SCOPE: {scope.ask('*IDN?').strip()}")
            if not rigol_scope.is_connected():
                print("RIGOL: NOT CONNECTED")
            else:
                print(f"RIGOL: {rigol_scope.rigol.ask('*IDN?').strip()}")
            continue

        if choice == '2':
            setup_scope()
            if rigol_scope.is_connected():
                rigol_scope.configure()
            continue

        if choice == '3':
            for command in ('CH1:VOLT 24.0', 'CH1:CURR 1.5'):
                psu.write(command)
            print("PSU CONFIGURED: 24V / 1.5A")
            continue

        if choice == '4':
            state = input("TYPE 'ON' OR 'OFF': ").strip().upper()
            if state not in ('ON', 'OFF'):
                print("INVALID — TYPE 'ON' OR 'OFF'.")
            else:
                psu.write(f'OUTP CH1,{state}')
                print(f"PSU OUTPUT {state}.")
            continue

        if choice == '5':
            if test_running:
                print("TEST IS ALREADY RUNNING!")
            else:
                test_running = True
                resume_event.set()  # make sure test_worker is not left paused
                for worker in (test_worker, mgmt_worker):
                    threading.Thread(target=worker, daemon=True).start()
                print(f"TEST STARTED. MANAGEMENT INTERVAL: {MGMT_INTERVAL}s.")
            continue

        if choice == '6':
            print("STOPPING TEST...")
            test_running = False
            continue

        print("INVALID ENTRY. PLEASE CHOOSE 1-7.")
|
||
|
||
|
||
# Start the interactive menu only when this file is executed as a script.
if __name__ == "__main__":
    main_menu()