#!/usr/bin/env python3 """ MIPI TEST APPLICATION - MIPI_TEST_INTERACTIVE.PY Interactive flicker confirmation test. Same three-pass capture sequence as mipi_test.py. After each iteration: 1. LP files are transferred from the scope immediately and analysed. 2. If the rule-based LP pre-processor flags a flicker suspect, Claude is asked to assess the single capture. 3. If Claude agrees it looks like a flicker event, the test pauses and asks the operator to confirm by looking at the display. Confirmed → event is logged, HTML report is written, test STOPS. Not flickering (false alarm) → event is logged, test CONTINUES. 4. A final HTML report is written when the test ends for any reason. VERSION: 0.1 AUTHOR: D. RICE 16/04/2026 © 2026 ARRIVE """ import csv as _csv_mod import html import json import subprocess import time import sys import requests from datetime import datetime from pathlib import Path import math import os import struct import tempfile import wave import anthropic import vxi11 from dotenv import load_dotenv import ai_mgmt import rigol_scope from csv_preprocessor import (analyze_lp_file, LPMetrics, HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS) load_dotenv(Path(__file__).parent / ".env") # --------------------------------------------------------------------------- # Configuration (same as mipi_test.py) # --------------------------------------------------------------------------- DEVICE_BASE = "http://192.168.45.8:5000" URL = f"{DEVICE_BASE}/display" VIDEO_URL = f"{DEVICE_BASE}/video" SCOPE_IP = "192.168.45.4" PSU_IP = "192.168.45.3" # Pass 1 — signal quality (HS differential, rise/fall) SIG_SCALE = 2e-9 # 2 ns/div → 20 ns window SIG_POINTS = 500_000 # Pass 2 — protocol/frame structure (HS differential, jitter/freq) PROTO_SCALE = 4e-6 # 4 µs/div → 40 µs window (was 1 µs/div) PROTO_POINTS = 500_000 # Pass 3 — LP state capture (single-ended, widens vertical range to show LP-11) LP_SCALE = 1e-6 # 1 µs/div → 20 µs actual window (was 500 ns/div) LP_POINTS = 200_000 LP_TRIG_OFFSET = 9e-6 # shift centre 9 µs after trigger → 1 µs pre / 19 µs post LP_V_SCALE = 0.2 # V/div LP_V_OFFSET = 0.6 # V — centres display between LP-00 (0 V) and LP-11 (1.2 V) LP_TRIG_LEVEL = 0.6 # V — catches LP-11 → LP-01 falling edge DATA_DIR = Path(__file__).parent / "data" REPORTS_DIR = Path(__file__).parent / "reports" # Persistent logs (shared paths — consistent with mipi_test.py / analyze_captures.py) FLICKER_LOG = REPORTS_DIR / "flicker_log.csv" INTERACTIVE_LOG = REPORTS_DIR / "interactive_log.csv" # Claude model for per-capture flicker assessment CLAUDE_MODEL = "claude-opus-4-6" # --------------------------------------------------------------------------- # D-PHY timing calculation # --------------------------------------------------------------------------- # Ordered field list used for table formatting and u-boot output _TIMING_FIELD_ORDER = [ "lpx", "hs_prepare", "hs_zero", "hs_trail", "hs_exit", "clk_prepare", "clk_zero", "clk_post", "clk_trail", ] # Maps field names → flb_dtovar property names (NXP i.MX 8M Mini Samsung DSIM driver) _EXTRA_PROP_MAP = { "lpx": "dsi-phy-extra-lpx", "hs_prepare": "dsi-phy-extra-hs-prepare", "hs_zero": "dsi-phy-extra-hs-zero", "hs_trail": "dsi-phy-extra-hs-trail", "hs_exit": "dsi-phy-extra-hs-exit", "clk_prepare": "dsi-phy-extra-clk-prepare", "clk_zero": "dsi-phy-extra-clk-zero", "clk_post": "dsi-phy-extra-clk-post", "clk_trail": "dsi-phy-extra-clk-trail", } def calculate_dphy_timing(pixel_clock_mhz: float, extra_cycles: dict | None = None) -> dict: """ Calculate Samsung DSIM PHY timing register values for a given pixel clock. Assumes RGB888 (24 bpp), 4 DSI lanes — NXP i.MX 8M Mini / Ampire 1280×800. Timing constraints from MIPI D-PHY v1.1 Table 14. extra_cycles: optional dict mapping field names to additional cycles above the Round-Up minimum, e.g. {'clk_zero': 3, 'hs_prepare': 1, 'hs_trail': 1}. Samsung DSIM register packing: PHY_TIMING (0xb4): (lpx << 8) | hs_exit PHY_TIMING1 (0xb8): (clk_prepare << 24) | (clk_zero << 16) | (clk_post << 8) | clk_trail PHY_TIMING2 (0xbc): (hs_prepare << 16) | (hs_zero << 8) | hs_trail Returns a dict with: pixel_clock_mhz, bit_rate_mbps, byte_clock_mhz, byte_period_ns, ui_ns, fields, registers, violations. """ bpp = 24 lanes = 4 extras = extra_cycles or {} bit_rate_mbps = pixel_clock_mhz * bpp / lanes # Mbit/s per lane byte_clock_mhz = bit_rate_mbps / 8 # MHz byte_period_ns = 1000.0 / byte_clock_mhz # ns per byte-clock cycle ui_ns = 1000.0 / bit_rate_mbps # ns per UI def _ru(t_ns: float) -> int: return max(1, math.ceil(t_ns / byte_period_ns)) def _rb(t_ns: float) -> int: return max(1, round(t_ns / byte_period_ns)) def _field(name: str, min_ns: float, max_ns: float | None = None) -> dict: extra = extras.get(name, 0) ru = _ru(min_ns) rb = _rb(min_ns) final = ru + extra return { "min_ns": min_ns, "max_ns": max_ns, "round_best": rb, "round_up": ru, "extra": extra, "final": final, "actual_ns": final * byte_period_ns, } fields: dict = {} # LPX ≥ 50 ns fields["lpx"] = _field("lpx", 50.0) # HS-PREPARE: 40+4UI to 85+6UI ns hs_p_min = 40.0 + 4 * ui_ns hs_p_max = 85.0 + 6 * ui_ns fields["hs_prepare"] = _field("hs_prepare", hs_p_min, hs_p_max) hs_p_actual = fields["hs_prepare"]["actual_ns"] # HS-ZERO: combined hs_prepare + hs_zero ≥ 145+10UI ns hs_z_combined_min = 145.0 + 10 * ui_ns hs_z_min = max(1.0, hs_z_combined_min - hs_p_actual) fields["hs_zero"] = _field("hs_zero", hs_z_min) fields["hs_zero"]["combined_min_ns"] = hs_z_combined_min # HS-TRAIL ≥ max(8UI, 60+4UI) ns fields["hs_trail"] = _field("hs_trail", max(8 * ui_ns, 60.0 + 4 * ui_ns)) # HS-EXIT ≥ 100 ns fields["hs_exit"] = _field("hs_exit", 100.0) # CLK-PREPARE: 38 to 95 ns fields["clk_prepare"] = _field("clk_prepare", 38.0, 95.0) clk_p_actual = fields["clk_prepare"]["actual_ns"] # CLK-ZERO: combined clk_prepare + clk_zero ≥ 300 ns clk_z_combined_min = 300.0 clk_z_min = max(1.0, clk_z_combined_min - clk_p_actual) fields["clk_zero"] = _field("clk_zero", clk_z_min) fields["clk_zero"]["combined_min_ns"] = clk_z_combined_min # CLK-POST ≥ 60+52UI ns fields["clk_post"] = _field("clk_post", 60.0 + 52 * ui_ns) # CLK-TRAIL ≥ max(12UI, 60) ns fields["clk_trail"] = _field("clk_trail", max(12 * ui_ns, 60.0)) # ── Register packing ────────────────────────────────────────────────── def _f(name: str) -> int: return fields[name]["final"] phy_timing = (_f("lpx") << 8) | _f("hs_exit") phy_timing1 = ((_f("clk_prepare") << 24) | (_f("clk_zero") << 16) | (_f("clk_post") << 8) | _f("clk_trail")) phy_timing2 = ((_f("hs_prepare") << 16) | (_f("hs_zero") << 8) | _f("hs_trail")) registers = { "PHY_TIMING": {"addr": 0xb4, "value": phy_timing}, "PHY_TIMING1": {"addr": 0xb8, "value": phy_timing1}, "PHY_TIMING2": {"addr": 0xbc, "value": phy_timing2}, } # ── Constraint checks ───────────────────────────────────────────────── violations: list[str] = [] def _chk(name: str, lo: float, hi: float | None = None) -> None: a = fields[name]["actual_ns"] if a < lo: violations.append(f"{name}: {a:.2f} ns < {lo:.2f} ns (min)") if hi is not None and a > hi: violations.append(f"{name}: {a:.2f} ns > {hi:.2f} ns (max)") _chk("lpx", 50.0) _chk("hs_prepare", hs_p_min, hs_p_max) hs_combined = fields["hs_prepare"]["actual_ns"] + fields["hs_zero"]["actual_ns"] if hs_combined < hs_z_combined_min: violations.append( f"hs_prepare+hs_zero: {hs_combined:.2f} ns < {hs_z_combined_min:.2f} ns (min)") _chk("hs_trail", max(8 * ui_ns, 60.0 + 4 * ui_ns)) _chk("hs_exit", 100.0) _chk("clk_prepare", 38.0, 95.0) clk_combined = fields["clk_prepare"]["actual_ns"] + fields["clk_zero"]["actual_ns"] if clk_combined < clk_z_combined_min: violations.append( f"clk_prepare+clk_zero: {clk_combined:.2f} ns < {clk_z_combined_min:.2f} ns (min)") _chk("clk_post", 60.0 + 52 * ui_ns) _chk("clk_trail", max(12 * ui_ns, 60.0)) return { "pixel_clock_mhz": pixel_clock_mhz, "bit_rate_mbps": bit_rate_mbps, "byte_clock_mhz": byte_clock_mhz, "byte_period_ns": byte_period_ns, "ui_ns": ui_ns, "fields": fields, "registers": registers, "violations": violations, } def format_timing_table(t: dict) -> str: """Return a fixed-width console table of all D-PHY timing fields.""" fields = t["fields"] hdr = (f"{'Field':<14} {'Spec (ns)':<18} {'Rnd Best':>9} " f"{'Rnd Up':>7} {'Extra':>6} {'Final':>6} {'Actual (ns)':>12} Status") sep = "-" * len(hdr) lines = [sep, hdr, sep] for name in _TIMING_FIELD_ORDER: f = fields[name] min_ns = f["min_ns"] max_ns = f.get("max_ns") actual = f["actual_ns"] spec_str = (f"{min_ns:.1f} – {max_ns:.1f}" if max_ns is not None else f">= {min_ns:.1f}") ok = actual >= min_ns if max_ns is not None and actual > max_ns: ok = False if name == "hs_zero": comb = fields["hs_prepare"]["actual_ns"] + actual ok = ok and (comb >= f.get("combined_min_ns", 0)) if name == "clk_zero": comb = fields["clk_prepare"]["actual_ns"] + actual ok = ok and (comb >= f.get("combined_min_ns", 0)) status = "OK" if ok else "FAIL" lines.append( f"{name:<14} {spec_str:<18} {f['round_best']:>9} " f"{f['round_up']:>7} +{f['extra']:>5} {f['final']:>6} {actual:>12.2f} {status}" ) lines.append(sep) return "\n".join(lines) def format_uboot_commands(t: dict) -> str: """Return u-boot shell commands to apply the computed PHY timing configuration.""" fields = t["fields"] regs = t["registers"] fv = {name: fields[name]["final"] for name in _TIMING_FIELD_ORDER} phy_t = regs["PHY_TIMING"]["value"] phy_t1 = regs["PHY_TIMING1"]["value"] phy_t2 = regs["PHY_TIMING2"]["value"] non_zero_extras = [ (_EXTRA_PROP_MAP[name], fields[name]["extra"]) for name in _TIMING_FIELD_ORDER if fields[name]["extra"] > 0 ] lines = [ f"# D-PHY PHY timing registers " f"(pixel clock {t['pixel_clock_mhz']} MHz, " f"{t['bit_rate_mbps']:.1f} Mbit/s, " f"byte clock {t['byte_clock_mhz']:.3f} MHz)", "#", f"# PHY_TIMING (0xb4) = 0x{phy_t:08x} " f"lpx={fv['lpx']} hs_exit={fv['hs_exit']}", f"# PHY_TIMING1 (0xb8) = 0x{phy_t1:08x} " f"clk_prepare={fv['clk_prepare']} clk_zero={fv['clk_zero']} " f"clk_post={fv['clk_post']} clk_trail={fv['clk_trail']}", f"# PHY_TIMING2 (0xbc) = 0x{phy_t2:08x} " f"hs_prepare={fv['hs_prepare']} hs_zero={fv['hs_zero']} " f"hs_trail={fv['hs_trail']}", "", "# Enable Round-Up rounding (dsi-tweak bit 2)", 'setenv flb_dtovar "${flb_dtovar} dsi-tweak=4"', ] if non_zero_extras: lines += ["", "# Extra PHY cycles above Round-Up minimum"] for prop, val in non_zero_extras: lines.append(f'setenv flb_dtovar "${{flb_dtovar}} {prop}={val}"') lines += ["", "saveenv", "boot"] return "\n".join(lines) def prompt_for_config() -> dict: """ Interactive startup prompt: ask for pixel clock and optional extra PHY cycles. Prints timing table and u-boot commands, then returns a config dict. """ print("\n" + "=" * 64) print(" D-PHY TIMING CONFIGURATION") print("=" * 64) print("RGB888, 4 DSI lanes, Samsung DSIM on NXP i.MX 8M Mini.") print("Timing constraints: MIPI D-PHY v1.1 Table 14.\n") # ── Pixel clock ─────────────────────────────────────────────────────── while True: try: pix_str = input("Enter pixel clock in MHz (e.g. 72.0): ").strip() pixel_clock_mhz = float(pix_str) if not (10.0 <= pixel_clock_mhz <= 300.0): print(" Out of range — enter a value between 10 and 300.") continue break except ValueError: print(" Invalid — enter a number (e.g. 72.0).") # ── Baseline (no extras) ────────────────────────────────────────────── t_base = calculate_dphy_timing(pixel_clock_mhz) print(f"\nPixel clock : {pixel_clock_mhz} MHz") print(f"Bit rate : {t_base['bit_rate_mbps']:.1f} Mbit/s per lane") print(f"Byte clock : {t_base['byte_clock_mhz']:.3f} MHz " f"({t_base['byte_period_ns']:.3f} ns/byte)") print(f"UI : {t_base['ui_ns']:.3f} ns\n") print(format_timing_table(t_base)) if t_base["violations"]: print("\n *** BASELINE VIOLATIONS ***") for v in t_base["violations"]: print(f" ! {v}") else: print("\n All D-PHY v1.1 Table 14 constraints satisfied at baseline.") # ── Extra cycles ────────────────────────────────────────────────────── print("\n--- Extra PHY cycles (added on top of Round-Up minimum) ---") print("Suggested values for SN65DSI83 reliability (from dev testing):") print(" clk_zero +3, hs_prepare +1, hs_trail +1 (others 0)") print("Press Enter to accept 0 for each field.\n") extras: dict[str, int] = {} for name in _TIMING_FIELD_ORDER: while True: try: val = input(f" Extra cycles for {name:<12s} [0]: ").strip() extras[name] = int(val) if val else 0 if extras[name] < 0: print(" Cannot be negative.") continue break except ValueError: print(" Enter an integer (or press Enter for 0).") # ── Final calculation with extras ───────────────────────────────────── t_final = calculate_dphy_timing(pixel_clock_mhz, extras) if any(v > 0 for v in extras.values()): print("\n--- Final timing (with extras) ---") print(format_timing_table(t_final)) if t_final["violations"]: print("\n *** VIOLATIONS WITH EXTRAS ***") for v in t_final["violations"]: print(f" ! {v}") else: print("\n All constraints satisfied.") print("\n--- u-boot commands ---") print(format_uboot_commands(t_final)) print() return { "pixel_clock_mhz": pixel_clock_mhz, "extras": extras, "timing": t_final, } # --------------------------------------------------------------------------- # Instrument connection # --------------------------------------------------------------------------- try: psu = vxi11.Instrument(PSU_IP) scope = vxi11.Instrument(SCOPE_IP) scope.timeout = 30 psu.timeout = 5 except Exception as e: print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}") sys.exit(1) rigol_scope.connect() # --------------------------------------------------------------------------- # Scope configuration (identical to mipi_test.py) # --------------------------------------------------------------------------- def setup_scope(): """Initialises scope for MIPI DSI signals (~210 MHz).""" print("CONFIGURING SCOPE...") cmds = [ "*RST", ":RUN", ":STOP", # Channel 1 — Clock D+ ":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", ":CHANnel1:PROBe 19.2", ":CHANnel1:SCALe 0.1", ":CHANnel1:OFFSet 0.0", ":CHANnel1:LABel 'CLK+'", # Channel 2 — Clock D- ":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", ":CHANnel2:PROBe 19.2", ":CHANnel2:SCALe 0.1", ":CHANnel2:OFFSet 0.0", ":CHANnel2:LABel 'CLK-'", # Channel 3 — Data Lane 0 D+ ":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50", ":CHANnel3:PROBe 19.2", ":CHANnel3:SCALe 0.1", ":CHANnel3:OFFSet 0.0", ":CHANnel3:LABel 'DAT0+'", # Channel 4 — Data Lane 0 D- ":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50", ":CHANnel4:PROBe 19.2", ":CHANnel4:SCALe 0.1", ":CHANnel4:OFFSet 0.0", ":CHANnel4:LABel 'DAT0-'", # Timebase ":TIMebase:SCALe 5E-9", ":TIMebase:POSition 0", ":TIMebase:REFerence CENTer", # Trigger — rising edge on Ch1 (Clock D+) ":TRIGger:MODE EDGE", ":TRIGger:EDGE:SOURce CHANnel1", ":TRIGger:EDGE:SLOPe POSitive", ":TRIGger:EDGE:LEVel 0.05", ":TRIGger:SWEep NORMal", # Acquisition ":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON", ":ACQuire:POINts 500000", ":DISPlay:LAYout STACKED", ":RUN", ] for cmd in cmds: scope.write(cmd) time.sleep(0.05) print("CHANNEL SETUP COMPLETE.") setup_math_channels() def setup_math_channels(): """F1 = Ch1−Ch2 (clock differential), F2 = Ch3−Ch4 (lane 0 differential).""" print("SETTING UP MATH CHANNELS...") scope.write("*CLS") time.sleep(0.2) for cmd in [ ":FUNCtion1:DISPlay ON", ":FUNCtion1:SUBTract CHANnel1,CHANnel2", ":FUNCtion1:RANGe 0.8", ":FUNCtion1:OFFSet 0.0", ":FUNCtion2:DISPlay ON", ":FUNCtion2:SUBTract CHANnel3,CHANnel4", ":FUNCtion2:RANGe 0.8", ":FUNCtion2:OFFSet 0.0", ]: scope.write(cmd) time.sleep(0.2) try: time.sleep(1.0) opc = scope.ask("*OPC?") print(f" SCOPE SYNC OK (OPC={opc.strip()})") except Exception as e: print(f" WARNING: OPC SYNC FAILED ({e})") try: err = scope.ask(":SYSTem:ERRor?") if err.strip().startswith("0"): print(" MATH COMMANDS ACCEPTED — NO SCPI ERRORS.") print(" F1 = CLK DIFF (CH1-CH2), F2 = DAT DIFF (CH3-CH4)") else: print(f" SCPI ERROR: {err.strip()}") except Exception as e: print(f" COULD NOT READ ERROR QUEUE ({e})") def _set_timebase(scale, points): scope.write(f":TIMebase:SCALe {scale:.3E}") scope.write(f":ACQuire:POINts {points}") time.sleep(0.3) def _arm_and_wait(timeout=20): """Single acquisition via :DIGitize + *OPC?. :DIGitize holds the SCPI bus until it triggers — if it times out we close and reopen the VXI-11 link to abort the pending RPC so subsequent writes to a fresh connection succeed. :SINGle + status polling is NOT used: on this scope it generates QUERY UNTERMINATED errors that break later acquires.""" global scope prev_timeout = scope.timeout try: scope.timeout = timeout + 5 scope.write(":DIGitize") return scope.ask("*OPC?").strip() == "1" except Exception as e: print(f" ACQUIRE ERROR: {e}") try: scope.close() except Exception: pass time.sleep(2.0) for attempt in range(3): try: scope = vxi11.Instrument(SCOPE_IP) scope.timeout = 30 scope.write(":STOP") time.sleep(0.2) break except Exception as re: print(f" SCOPE RECONNECT ERROR (attempt {attempt+1}): {re}") time.sleep(1.0) return False finally: try: scope.timeout = prev_timeout except Exception: pass def _save_pass(tag, iteration, ts): """Save F1 (CLK diff) and F2 (DAT diff) as CSV.""" base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}" try: scope.write(f':DISK:SAVE:WAVeform FUNCtion1,"{base}_clk.csv",CSV') time.sleep(2.5) scope.write(f':DISK:SAVE:WAVeform FUNCtion2,"{base}_dat.csv",CSV') time.sleep(2.5) print(f" SAVED: {base}_clk.csv {base}_dat.csv") except Exception as e: print(f" SAVE ERROR ({tag}): {e}") def _save_pass_channels(tag, iteration, ts): """Save Ch1 (CLK+) and Ch3 (DAT0+) single-ended for LP state analysis.""" base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}" try: scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.csv",CSV') time.sleep(2.5) scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.csv",CSV') time.sleep(2.5) print(f" SAVED: {base}_clk.csv {base}_dat.csv") except Exception as e: print(f" SAVE ERROR ({tag}): {e}") def _configure_for_lp(): """Widen vertical range for LP states and switch to falling-edge trigger.""" for ch in (1, 2, 3, 4): scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}") scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}") time.sleep(0.05) scope.write(":TRIGger:EDGE:SOURce CHANnel3") scope.write(":TRIGger:EDGE:SLOPe NEGative") scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}") scope.write(":TRIGger:SWEep NORMal") # must wait for real LP-11→LP-01 edge, not auto-fire on HS time.sleep(0.1) def _restore_hs_config(): """Restore HS-mode channel scales and trigger after LP capture.""" for ch in (1, 2, 3, 4): scope.write(f":CHANnel{ch}:SCALe 0.1") scope.write(f":CHANnel{ch}:OFFSet 0.0") time.sleep(0.05) scope.write(":TRIGger:EDGE:SOURce CHANnel1") scope.write(":TRIGger:EDGE:SLOPe POSitive") scope.write(":TRIGger:EDGE:LEVel 0.05") scope.write(":TRIGger:SWEep AUTO") # CLK startup sets NORMAL — restore AUTO so :DIGitize captures HS data time.sleep(0.1) def _fetch_registers(ts: str, iteration: int) -> None: """GET /registers (DSIM PHY) and /sn65_registers (SN65DSI83 CSRs) then save combined JSON.""" combined: dict = {} # DSIM PHY timing registers (memtool / memory-mapped) try: resp = requests.get(f"{DEVICE_BASE}/registers", timeout=5) resp.raise_for_status() dsim = resp.json() combined["dsim"] = dsim if dsim.get("errors"): print(f" REGISTERS: DSIM warnings — {dsim['errors']}") except requests.exceptions.RequestException as e: print(f" REGISTERS: DSIM fetch failed — {e}") combined["dsim"] = None except Exception as e: print(f" REGISTERS: DSIM error — {e}") combined["dsim"] = None # SN65DSI83 CSR snapshot (I2C) try: resp = requests.get(f"{DEVICE_BASE}/sn65_registers", timeout=5) resp.raise_for_status() sn65 = resp.json() combined["sn65"] = sn65 regs = sn65.get("registers", {}) csr_0a = regs.get("csr_0a", {}) if csr_0a: pll_str = "LOCKED " if csr_0a["pll_lock"] else "*** UNLOCKED ***" clk_str = "detected " if csr_0a["clk_det"] else "NOT detected" print(f" SN65DSI83: PLL {pll_str} CLK {clk_str} (CSR 0x0A = {csr_0a['value']})") csr_e5 = regs.get("csr_e5", {}) if csr_e5: _error_bits = [ ("pll_unlock", "PLL_UNLOCK"), ("cha_sot_bit_err", "SOT_BIT_ERR"), ("cha_llp_err", "LLP_ERR"), ("cha_ecc_err", "ECC_ERR"), ("cha_lp_err", "LP_ERR"), ("cha_crc_err", "CRC_ERR"), ] active = [label for key, label in _error_bits if csr_e5.get(key)] if active: print(f" SN65DSI83: *** ERROR FLAGS SET: {', '.join(active)} (CSR 0xE5 = {csr_e5['value']}) ***") else: print(f" SN65DSI83: no error flags (CSR 0xE5 = {csr_e5['value']})") if sn65.get("errors"): print(f" SN65DSI83: I2C warnings — {sn65['errors']}") except requests.exceptions.RequestException as e: print(f" REGISTERS: SN65DSI83 fetch failed — {e}") combined["sn65"] = None except Exception as e: print(f" REGISTERS: SN65DSI83 error — {e}") combined["sn65"] = None # SN65DSI83 post-restart settling poll + register snapshots try: resp = requests.get(f"{DEVICE_BASE}/sn65_settling", timeout=10) resp.raise_for_status() settling = resp.json() combined["sn65_settling"] = settling n = settling.get("n_readings", 0) n_err = settling.get("n_error", 0) dur = settling.get("duration_s", 0) # ── csr_e5 error summary ────────────────────────────────────────── if n_err: err_readings = [r for r in settling.get("readings", []) if r.get("any_error")] times = [r["t_ms"] for r in err_readings] print(f" SN65 SETTLING: *** {n_err}/{n} readings had csr_e5 errors " f"over {dur:.1f} s (t={times[0]:.0f}–{times[-1]:.0f} ms) ***") for r in err_readings[:3]: print(f" t={r['t_ms']:6.1f} ms csr_0a={r['csr_0a']} " f"csr_e5={r['csr_e5']} " f"pll={'Y' if r['pll_lock'] else 'N'} " f"clk={'Y' if r['clk_det'] else 'N'}") else: clk_false = sum(1 for r in settling.get("readings", []) if r.get("clk_det") is False) print(f" SN65 SETTLING: no csr_e5 errors in {n} readings over {dur:.1f} s" + (f" ({clk_false} readings with clk_det=False)" if clk_false else "")) # ── Register snapshot: print start values and flag any changes ─── snap_start = settling.get("snapshot_start") or {} snap_end = settling.get("snapshot_end") or {} changed = settling.get("changed_regs") or {} if snap_start: print(f" SN65 REGS (t=0):", end="") # Print a compact one-liner of key config registers _key = ["0x0d", "0x10", "0x11", "0x18", "0x19", "0x1a", "0x1b", "0x3c", "0xe0", "0xe1"] parts = [] for r in _key: info = snap_start.get(r, {}) parts.append(f"{info.get('name','?')}={info.get('value','?')}") print(" " + " ".join(parts)) if changed: print(f" SN65 REGS CHANGED during settling window ({len(changed)} registers):") for reg, diff in changed.items(): print(f" {reg} {diff['name']:16s} {diff['start']} → {diff['end']}") elif snap_start: print(f" SN65 REGS: stable (no register changes between t=0 and t={dur:.1f}s)") except requests.exceptions.RequestException as e: print(f" REGISTERS: settling poll fetch failed — {e}") combined["sn65_settling"] = None except Exception as e: print(f" REGISTERS: settling poll error — {e}") combined["sn65_settling"] = None # Save combined JSON try: DATA_DIR.mkdir(exist_ok=True) reg_path = DATA_DIR / f"{ts}_reg_{iteration:04d}.json" reg_path.write_text(json.dumps(combined, indent=2)) dsim_count = len((combined.get("dsim") or {}).get("registers", [])) print(f" SAVED: {reg_path.name} ({dsim_count} DSIM registers)") except Exception as e: print(f" REGISTERS: save error — {e}") def dual_capture(iteration: int) -> str: """ Three-pass capture per test iteration. Pass 1 — LP / SoT startup (no settle delay — fires immediately after display ON) Pass 2 — signal quality (HS differential, rise/fall) Pass 3 — frame structure (HS differential, jitter/freq) Returns the timestamp string used in all filenames for this iteration. """ ts = datetime.now().strftime("%Y%m%d_%H%M%S") print(f"CAPTURE #{iteration:04d} [{ts}]") # ── Pass 1: LP / SoT startup ─────────────────────────────────────────── # Trigger position shifted so only 1 µs of LP-11 is pre-trigger; the # remaining 19 µs shows the full HS burst so truncated bursts are visible. print(" PASS 1: LP STARTUP TRANSITION...") _configure_for_lp() _set_timebase(LP_SCALE, LP_POINTS) scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}") if rigol_scope.is_connected(): rigol_scope.arm() if _arm_and_wait(timeout=30): _save_pass_channels("lp", iteration, ts) else: print(" SKIPPING LP SAVE.") scope.write(":TIMebase:POSition 0") # restore centred for subsequent passes if rigol_scope.is_connected(): DATA_DIR.mkdir(exist_ok=True) v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv" n = rigol_scope.read_waveform_csv(v18_path) if n: print(f" SAVED: {v18_path.name} ({n} samples)") else: print(" RIGOL CH1: waveform read failed — check connection and probe.") _restore_hs_config() # ── Pass 2: HS signal quality ────────────────────────────────────────── print(" PASS 2: SIGNAL QUALITY...") _set_timebase(SIG_SCALE, SIG_POINTS) if _arm_and_wait(): _save_pass("sig", iteration, ts) else: print(" SKIPPING SIG SAVE.") # ── Pass 3: frame/protocol structure (LP-triggered differential) ───────── # Re-apply LP trigger so the LP-00 → HS transition lands near t=0 in F2. # This gives a fixed byte-framing anchor: HS sync byte 0xB8 appears at # t≈380 ns, followed by DI, WC, ECC, then the full pixel payload. print(" PASS 3: FRAME STRUCTURE (LP-triggered differential)...") _configure_for_lp() try: _set_timebase(PROTO_SCALE, PROTO_POINTS) except Exception: print(" SKIPPING PROTO SAVE.") _restore_hs_config() _fetch_registers(ts, iteration) try: _set_timebase(5e-9, 500_000) scope.write(":RUN") except Exception: pass return ts if _arm_and_wait(): _save_pass("proto", iteration, ts) else: print(" SKIPPING PROTO SAVE.") _restore_hs_config() # ── DSI register snapshot ───────────────────────────────────────────── _fetch_registers(ts, iteration) # ── Restore default timebase ────────────────────────────────────────── try: _set_timebase(5e-9, 500_000) scope.write(":RUN") except Exception as e: print(f" WARNING: scope restore failed: {e}") return ts # --------------------------------------------------------------------------- # Per-iteration LP analysis + Claude assessment # --------------------------------------------------------------------------- def _build_system_prompt(config: dict | None = None) -> str: """Build the Claude system prompt with correct bit-rate values for the active config.""" if config and "timing" in config: t = config["timing"] bit_rate = t["bit_rate_mbps"] byte_clk = t["byte_clock_mhz"] byte_per = t["byte_period_ns"] else: bit_rate = 432.0 byte_clk = 54.0 byte_per = 18.518 return ( "You are an expert in MIPI D-PHY signal integrity analysis. " "You will receive a pre-processed summary of a single LP capture from a MIPI DAT0 lane " "(NXP i.MX 8M Mini Samsung DSIM IP driving a SN65DSI83 MIPI-to-LVDS bridge). " f"HS bit rate: {bit_rate:.1f} Mbit/s. " f"Byte clock: {byte_clk:.3f} MHz ({byte_per:.3f} ns/byte). " "The LP-low plateau (LP-01/LP-00 SoT preamble) must be ≥ 50 ns for the SN65DSI83 " "to detect start-of-transmission. A plateau shorter than 50 ns or absent means the " "bridge missed the SoT and the display will flicker visibly. " "Be decisive. Answer YES or NO on the first line." ) def _build_claude_prompt(ts: str, iteration: int, lp_summaries: list[str], suspects: list[LPMetrics], config: dict | None = None, followup_summaries: list[str] | None = None) -> str: """ Build a concise prompt asking Claude to assess a single capture. The rule-based pre-filter has already flagged at least one LP suspect. If followup_summaries is provided it contains the next-frame LP capture taken immediately after the suspect — the frame the operator will actually observe. """ suspect_lines = "\n".join( f" channel={m.channel} lp_low_plateau={m.lp_low_duration_ns} ns " f"(spec ≥ 50 ns) lp11_to_hs={m.lp11_to_hs_ns} ns " f"lp11_voltage={m.lp11_voltage_v} V " f"hs_amplitude={m.hs_amplitude_mv} mV (normal 105–122 mV; absent <50 mV)" for m in suspects ) summaries_text = "\n\n".join(lp_summaries) config_text = "" if config and "timing" in config: t = config["timing"] config_text = ( f"\n\nTest configuration: pixel clock {t['pixel_clock_mhz']} MHz, " f"bit rate {t['bit_rate_mbps']:.1f} Mbit/s per lane, " f"byte clock {t['byte_clock_mhz']:.3f} MHz " f"({t['byte_period_ns']:.3f} ns/byte), UI {t['ui_ns']:.3f} ns." ) followup_text = "" if followup_summaries: followup_text = ( f"\n\nFOLLOW-UP CAPTURE (next display frame — what the operator sees " f"on screen while assessing):\n" f"Note: due to the display pipeline lag, the visual flicker caused by " f"the electrical event above appears one frame later. If the follow-up " f"frame looks electrically normal, the flicker observed by the operator " f"was caused by the preceding capture, not this one.\n\n" + "\n\n".join(followup_summaries) ) return ( f"SINGLE-CAPTURE FLICKER ASSESSMENT — capture {iteration:04d} [{ts}]\n\n" f"The rule-based LP pre-processor has flagged the following measurements as " f"potential flicker suspects because the LP-low plateau is absent or shorter " f"than 50 ns:\n{suspect_lines}\n\n" f"Full LP capture summaries:\n{summaries_text}" f"{config_text}" f"{followup_text}\n\n" f"Based solely on these LP timing metrics, do you believe this capture " f"represents a genuine screen flicker event — i.e., was the SoT sequence " f"too brief for the SN65DSI83 bridge to detect start-of-transmission, " f"likely causing visible display flicker?\n\n" f"Start your response with YES or NO on the first line, then explain your " f"reasoning briefly (2–4 sentences) referencing the specific metric values." ) def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None: """Append a flicker suspect to the shared flicker_log.csv.""" FLICKER_LOG.parent.mkdir(exist_ok=True) write_header = not FLICKER_LOG.exists() with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as f: w = _csv_mod.writer(f) if write_header: w.writerow(["logged_at", "capture_ts", "capture_num", "channel", "lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v"]) w.writerow([ datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ts, f"{iteration:04d}", m.channel, m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v, ]) def _start_video() -> None: try: requests.put(VIDEO_URL, json={"action": "start", "mode": "static-pink"}, timeout=3) print(" VIDEO: static-pink display started.") except Exception as e: print(f" WARNING: video start failed: {e}") def _stop_video() -> None: try: requests.put(VIDEO_URL, json={"action": "stop"}, timeout=3) print(" VIDEO: kiosk player stopped.") except Exception as e: print(f" WARNING: video stop failed: {e}") def _play_alarm() -> None: """Play three short beeps using a generated WAV tone.""" sample_rate = 44100 freq = 880 duration = 0.35 n_samples = int(sample_rate * duration) samples = [int(32767 * math.sin(2 * math.pi * freq * i / sample_rate)) for i in range(n_samples)] packed = struct.pack(f"<{n_samples}h", *samples) tmp = None try: fd, tmp = tempfile.mkstemp(suffix=".wav") os.close(fd) with wave.open(tmp, "w") as w: w.setnchannels(1) w.setsampwidth(2) w.setframerate(sample_rate) w.writeframes(packed) # os.system inherits the full shell environment (XDG_RUNTIME_DIR, PULSE_SERVER, etc.) played = False for cmd in (f"aplay -q {tmp}", f"pw-play {tmp}", f"paplay {tmp}"): if os.system(f"{cmd} 2>/dev/null") == 0: played = True for _ in range(2): time.sleep(0.2) os.system(f"{cmd} 2>/dev/null") break if not played: try: with open("/dev/tty", "w") as tty: for _ in range(5): tty.write("\a") tty.flush() time.sleep(0.3) except Exception: print("\a" * 5, end="", flush=True) except Exception: print("\a" * 5, end="", flush=True) finally: if tmp: try: os.unlink(tmp) except Exception: pass def _analyze_lp_files( ts: str, iteration: int ) -> tuple[list[str], list[LPMetrics]]: """ Run rule-based LP analysis for one iteration. Returns (lp_summaries, suspects). Logs suspects and prints alerts. Does NOT call Claude. """ lp_summaries: list[str] = [] suspects: list[LPMetrics] = [] for channel in ("clk", "dat"): path = DATA_DIR / f"{ts}_lp_{iteration:04d}_{channel}.csv" if not path.exists(): print(f" LP ANALYSIS: {path.name} not found — skipping.") continue try: m = analyze_lp_file(path) lp_summaries.append(m.summary()) if m.flicker_suspect: suspects.append(m) _append_flicker_log(ts, iteration, m) if not m.lp_transition_valid and not m.lp11_voltage_v: reason = "MIPI link silent (no LP-11/LP-low/HS detected)" elif (m.hs_amplitude_mv is not None and m.hs_amplitude_mv < HS_BURST_AMPLITUDE_MIN_MV and m.lp11_to_hs_ns is not None and m.lp11_to_hs_ns >= FLICKER_LP_LOW_MAX_NS): reason = (f"HS burst absent ({m.hs_amplitude_mv:.0f} mV, " f"lp11_to_hs={m.lp11_to_hs_ns:.0f} ns)") else: reason = f"lp_low={m.lp_low_duration_ns} ns" print(f"\n *** FLICKER SUSPECT: capture {iteration:04d} " f"channel={m.channel} {reason} ***\n") except Exception as e: print(f" LP ANALYSIS ERROR ({channel}): {e}") return lp_summaries, suspects def _lp_followup_capture(iteration: int) -> tuple[str, list[str], list[LPMetrics]]: """ LP-only follow-up capture taken immediately after a suspect is detected. Captures the next display frame — the one the operator will actually see when asked to assess whether the screen is flickering. Returns (ts_followup, lp_summaries, suspects). Returns ("", [], []) silently if the scope is not reachable. """ print(" FOLLOW-UP CAPTURE: acquiring next frame for display-lag context...") try: ts_fu = datetime.now().strftime("%Y%m%d_%H%M%S") _configure_for_lp() _set_timebase(LP_SCALE, LP_POINTS) if rigol_scope.is_connected(): rigol_scope.arm() if _arm_and_wait(timeout=10): _save_pass_channels("lp", iteration, ts_fu) else: print(" FOLLOW-UP: trigger timeout — skipping.") _restore_hs_config() return "", [], [] _restore_hs_config() # Transfer the new files from scope try: copied, _ = ai_mgmt.transfer_csv_files() print(f" FOLLOW-UP: {copied} file(s) transferred.") except Exception as e: print(f" FOLLOW-UP TRANSFER ERROR: {e}") summaries, suspects = _analyze_lp_files(ts_fu, iteration) return ts_fu, summaries, suspects except Exception as e: print(f" FOLLOW-UP CAPTURE ERROR: {e}") return "", [], [] def _call_claude( ts: str, iteration: int, lp_summaries: list[str], suspects: list[LPMetrics], config: dict | None = None, followup_summaries: list[str] | None = None, ) -> tuple[bool, str]: """ Call the Claude API to assess whether the flagged capture is a flicker event. Returns (claude_says_flicker, response_text). """ print(" CALLING CLAUDE FOR ASSESSMENT...") try: client = anthropic.Anthropic() message = client.messages.create( model = CLAUDE_MODEL, max_tokens = 600, system = _build_system_prompt(config), messages = [{"role": "user", "content": _build_claude_prompt(ts, iteration, lp_summaries, suspects, config, followup_summaries)}], ) response = message.content[0].text.strip() first_line = response.splitlines()[0].strip().upper() claude_says_flicker = first_line.startswith("YES") label = "FLICKER" if claude_says_flicker else "NOT FLICKER" print(f" CLAUDE: {label} ({message.usage.input_tokens} in / " f"{message.usage.output_tokens} out tokens)") return claude_says_flicker, response except Exception as e: print(f" CLAUDE API ERROR: {e}") fallback = (f"(Claude API unavailable: {e})\n" f"Rule-based detector flagged LP-low plateau < 50 ns — " f"treat as potential flicker suspect.") return True, fallback def analyze_lp_and_ask_claude( ts: str, iteration: int, config: dict | None = None ) -> tuple[bool, str, list[LPMetrics]]: """ Analyse the LP files for this iteration (rule-based + Claude if suspect). Kept for backwards compatibility — used by report generation and tests. Does not perform a follow-up capture; call the test loop directly for that. """ lp_summaries, suspects = _analyze_lp_files(ts, iteration) if not suspects: return False, "", [] claude_flicker, response = _call_claude(ts, iteration, lp_summaries, suspects, config) return claude_flicker, response, suspects # --------------------------------------------------------------------------- # Event logging and HTML report # --------------------------------------------------------------------------- def _config_section_html(config: dict) -> str: """Generate the D-PHY configuration section for the HTML report.""" t = config["timing"] fields = t["fields"] regs = t["registers"] rows_html = "" for name in _TIMING_FIELD_ORDER: f = fields[name] min_ns = f["min_ns"] max_ns = f.get("max_ns") actual = f["actual_ns"] ok = actual >= min_ns if max_ns is not None and actual > max_ns: ok = False if name == "hs_zero": comb = fields["hs_prepare"]["actual_ns"] + actual ok = ok and (comb >= f.get("combined_min_ns", 0)) if name == "clk_zero": comb = fields["clk_prepare"]["actual_ns"] + actual ok = ok and (comb >= f.get("combined_min_ns", 0)) spec_str = (f"{min_ns:.1f} – {max_ns:.1f}" if max_ns is not None else f"≥ {min_ns:.1f}") cell_style = "" if ok else ' style="color:#c62828;font-weight:bold"' status_html = "✓" if ok else '✖ FAIL' rows_html += ( f"" f"{name}" f"{spec_str}" f"{f['round_best']}" f"{f['round_up']}" f"+{f['extra']}" f"{f['final']}" f"{actual:.2f}" f"{status_html}" f"\n" ) if t["violations"]: items_html = "".join(f"
  • {html.escape(v)}
  • " for v in t["violations"]) viol_html = ( f'

    Timing violations:

    ' f"" ) else: viol_html = ( '

    ✓ All D-PHY v1.1 Table 14 ' "constraints satisfied.

    " ) fv = {name: fields[name]["final"] for name in _TIMING_FIELD_ORDER} phy_t = regs["PHY_TIMING"]["value"] phy_t1 = regs["PHY_TIMING1"]["value"] phy_t2 = regs["PHY_TIMING2"]["value"] uboot = html.escape(format_uboot_commands(t)) return f"""

    D-PHY Configuration

    Pixel clock: {t['pixel_clock_mhz']} MHz  |  Bit rate: {t['bit_rate_mbps']:.1f} Mbit/s per lane  |  Byte clock: {t['byte_clock_mhz']:.3f} MHz ({t['byte_period_ns']:.3f} ns/byte)  |  UI: {t['ui_ns']:.3f} ns

    {rows_html}
    FieldSpec (ns)Rnd BestRnd Up ExtraFinalActual (ns)Status
    {viol_html}

    Samsung DSIM Registers

    RegisterAddressValueField breakdown
    PHY_TIMING0xb4 0x{phy_t:08x} lpx={fv['lpx']}   hs_exit={fv['hs_exit']}
    PHY_TIMING10xb8 0x{phy_t1:08x} clk_prepare={fv['clk_prepare']}   clk_zero={fv['clk_zero']}   clk_post={fv['clk_post']}   clk_trail={fv['clk_trail']}
    PHY_TIMING20xbc 0x{phy_t2:08x} hs_prepare={fv['hs_prepare']}   hs_zero={fv['hs_zero']}   hs_trail={fv['hs_trail']}

    u-boot Commands

    {uboot}
    """ def _log_interaction( events: list, ts: str, iteration: int, suspects: list[LPMetrics], claude_said: bool, user_confirmed: bool | None, reasoning: str, ) -> None: """ Append an event to the in-memory list and to interactive_log.csv. user_confirmed values: True — operator confirmed flicker False — operator said no (false alarm) None — operator was not asked (Claude said no) """ now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") events.append({ "ts": ts, "iteration": iteration, "suspects": suspects, "claude_said": claude_said, "user_confirmed": user_confirmed, "reasoning": reasoning, "logged_at": now_str, }) INTERACTIVE_LOG.parent.mkdir(exist_ok=True) write_header = not INTERACTIVE_LOG.exists() with open(INTERACTIVE_LOG, "a", newline="", encoding="utf-8") as f: w = _csv_mod.writer(f) if write_header: w.writerow(["logged_at", "capture_ts", "capture_num", "claude_said_flicker", "user_confirmed", "lp_low_ns", "reasoning_summary"]) for m in suspects: w.writerow([ now_str, ts, f"{iteration:04d}", "YES" if claude_said else "NO", ("YES" if user_confirmed is True else "NO" if user_confirmed is False else "NOT_ASKED"), m.lp_low_duration_ns, reasoning[:150].replace("\n", " "), ]) def save_report(events: list, stop_reason: str, config: dict | None = None) -> Path: """Write a timestamped HTML report summarising all flicker interactions.""" REPORTS_DIR.mkdir(exist_ok=True) now = datetime.now() filename = now.strftime("%Y%m%d_%H%M%S_interactive.html") path = REPORTS_DIR / filename confirmed_n = sum(1 for e in events if e["user_confirmed"] is True) false_alarm_n = sum(1 for e in events if e["user_confirmed"] is False) claude_no_n = sum(1 for e in events if e["user_confirmed"] is None) # ── Event table rows ─────────────────────────────────────────────────── rows_html = "" for e in events: for m in e["suspects"]: conf = e["user_confirmed"] if conf is True: badge = ('' '✖ CONFIRMED FLICKER') elif conf is False: badge = ('' '✓ FALSE ALARM') else: badge = ('' 'Claude said NO — user not asked') lp_val = m.lp_low_duration_ns lp_bad = lp_val is None or lp_val < 50 lp_cell = (f'{lp_val} ns' if lp_bad else f'{lp_val} ns') rows_html += ( f"" f"{e['iteration']:04d}" f"{e['ts']}" f"{m.channel}" f"{lp_cell}" f"{m.lp11_to_hs_ns} ns" f"{m.lp11_voltage_v} V" f"{'YES' if e['claude_said'] else 'NO'}" f"{badge}" f"" ) table_html = ( '

    No flicker suspects were detected during this test run.

    ' if not rows_html else f""" {rows_html}
    CaptureTimestampChannel LP-low plateauLP exit→HSLP-11 voltage Claude: flicker?Outcome
    """ ) # ── D-PHY config section ─────────────────────────────────────────────── config_html = _config_section_html(config) if config else "" # ── Claude reasoning sections ────────────────────────────────────────── reasoning_html = "" for e in events: if not e["reasoning"] or not e["claude_said"]: continue conf = e["user_confirmed"] label = (" — CONFIRMED FLICKER" if conf is True else " — FALSE ALARM" if conf is False else "") reasoning_html += ( f'

    Capture {e["iteration"]:04d} [{e["ts"]}]{html.escape(label)}

    ' f'
    '
                f'{html.escape(e["reasoning"])}
    ' ) html_content = f""" MIPI Interactive Flicker Test — {now.strftime('%Y-%m-%d %H:%M:%S')}

    MIPI Interactive Flicker Test Report

    Generated: {now.strftime('%Y-%m-%d %H:%M:%S')}  |  Model: {CLAUDE_MODEL}

    Stop reason: {html.escape(stop_reason)}
    {confirmed_n} confirmed flicker(s)
    {false_alarm_n} false alarm(s)
    {claude_no_n} Claude said no
    {config_html}

    Event Log

    {table_html} {'

    Claude Assessments

    ' + reasoning_html if reasoning_html else ''} """ path.write_text(html_content, encoding="utf-8") return path # --------------------------------------------------------------------------- # Interactive test loop # --------------------------------------------------------------------------- def run_interactive_test() -> None: """ Blocking interactive test loop (runs in the calling thread — no background threads). Flow per iteration: 1. Display ON 2. Three-pass dual_capture → saves LP/sig/proto CSVs to scope 3. Transfer all scope CSVs to local data/ 4. Analyse LP files for this iteration (rule-based + Claude if suspect) 5. If Claude says flicker: - Keep display ON so operator can observe - Ask operator to confirm - Confirmed → log event, save report, STOP - False alarm → log event, continue 6. Display OFF, 1 s pause, next iteration Press Ctrl+C to exit at any time. A report is always saved on exit. """ iteration = 1 events: list = [] stop_reason = "Test stopped by user" # ── Pixel clock configuration ────────────────────────────────────────── config = prompt_for_config() # Flush any stale SCPI state from previous runs (QUERY UNTERMINATED errors etc.) try: scope.write("*CLS") scope.write(":STOP") time.sleep(0.3) except Exception as e: print(f" WARNING: scope pre-flush failed: {e}") print("\nINTERACTIVE FLICKER TEST STARTED.") print("Each iteration: display ON → 3-pass capture → LP analysis → Claude check.") print("The display stays ON while Claude and the operator assess the frame.") print("Press Ctrl+C at any time to stop.\n") _start_video() try: while True: # ── Display ON ───────────────────────────────────────────────── try: requests.put(URL, json={"state": "on"}, timeout=2) except requests.exceptions.RequestException as e: print(f" WARNING: display ON failed: {e}") # ── Three-pass capture ───────────────────────────────────────── ts = dual_capture(iteration) # ── Transfer scope files to local data/ ──────────────────────── print(" TRANSFERRING FILES FROM SCOPE...") try: copied, failed = ai_mgmt.transfer_csv_files() print(f" TRANSFERRED {copied} FILE(S). {failed} FAILED.") except Exception as e: print(f" TRANSFER ERROR: {e}") # ── Rule-based LP analysis ───────────────────────────────────── lp_summaries, suspects = _analyze_lp_files(ts, iteration) followup_summaries: list[str] = [] if suspects: # Take an LP-only follow-up capture before calling Claude. # The visual flicker caused by a missed SoT appears one display # frame later (pipeline lag), so the operator observes flicker # on the frame AFTER the electrical event. Including the next # frame gives Claude — and the operator — the correct context. _, followup_summaries, _ = _lp_followup_capture(iteration) # ── Call Claude if any rule-based suspect was found ──────────── claude_flicker = False reasoning = "" if suspects: claude_flicker, reasoning = _call_claude( ts, iteration, lp_summaries, suspects, config, followup_summaries or None) if claude_flicker: # ── Keep display ON — ask operator ───────────────────────── _play_alarm() print("\n" + "=" * 64) print(" CLAUDE SUSPECTS FLICKER — OBSERVE THE DISPLAY NOW") print("=" * 64) print(f"\n{reasoning}\n") confirmed: bool | None = None while confirmed is None: try: ans = input("IS THE SCREEN ACTUALLY FLICKERING? (Y/N): ").strip().upper() except EOFError: ans = "N" if ans in ("Y", "YES"): confirmed = True elif ans in ("N", "NO"): confirmed = False else: print(" Please enter Y or N.") _log_interaction(events, ts, iteration, suspects, claude_flicker, confirmed, reasoning) if confirmed: print("\n FLICKER CONFIRMED BY OPERATOR. STOPPING TEST.") stop_reason = (f"Flicker confirmed by operator at " f"capture {iteration:04d} [{ts}]") _stop_video() try: requests.put(URL, json={"state": "off"}, timeout=2) except Exception: pass break # exit the while loop → save report below else: print(" NOT FLICKERING — false alarm logged. Continuing.\n") if suspects and not claude_flicker: # Rule-based suspect but Claude said no — record for reference _log_interaction(events, ts, iteration, suspects, False, None, reasoning) # ── Display OFF, brief pause before next iteration ───────────── try: requests.put(URL, json={"state": "off"}, timeout=2) except requests.exceptions.RequestException as e: print(f" WARNING: display OFF failed: {e}") iteration += 1 time.sleep(1.0) except KeyboardInterrupt: print("\n\n TEST INTERRUPTED (Ctrl+C).") stop_reason = "Test interrupted by operator (Ctrl+C)" _stop_video() try: requests.put(URL, json={"state": "off"}, timeout=2) except Exception: pass # ── Always save a report on exit ─────────────────────────────────────── report_path = save_report(events, stop_reason, config) print(f"\nREPORT SAVED: {report_path}") if events: confirmed_n = sum(1 for e in events if e["user_confirmed"] is True) false_alarm_n = sum(1 for e in events if e["user_confirmed"] is False) print(f" {confirmed_n} confirmed flicker(s), {false_alarm_n} false alarm(s) " f"({len(events)} total suspect(s) assessed)") # --------------------------------------------------------------------------- # Menu # --------------------------------------------------------------------------- def main_menu() -> None: while True: print("\n===== MIPI INTERACTIVE TEST CONTROL =====") print("1. RUN IDN CHECK (PSU & SCOPE)") print("2. SETUP SCOPE (RUN FIRST)") print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)") print("4. PSU OUTPUT ON/OFF (CH1)") print("5. START INTERACTIVE FLICKER TEST") print("6. EXIT") choice = input("\nSELECT OPTION (1-6): ").strip() if choice == '1': print(f"PSU : {psu.ask('*IDN?').strip()}") print(f"SCOPE: {scope.ask('*IDN?').strip()}") if rigol_scope.is_connected(): print(f"RIGOL: {rigol_scope.rigol.ask('*IDN?').strip()}") else: print("RIGOL: NOT CONNECTED") elif choice == '2': setup_scope() if rigol_scope.is_connected(): rigol_scope.configure() elif choice == '3': psu.write('CH1:VOLT 24.0') psu.write('CH1:CURR 1.5') print("PSU CONFIGURED: 24V / 1.5A") elif choice == '4': state = input("TYPE 'ON' OR 'OFF': ").strip().upper() if state in ('ON', 'OFF'): psu.write(f'OUTP CH1,{state}') print(f"PSU OUTPUT {state}.") else: print("INVALID — TYPE 'ON' OR 'OFF'.") elif choice == '5': run_interactive_test() elif choice == '6': psu.close() scope.close() rigol_scope.disconnect() print("INSTRUMENTS CLOSED. BYE.") break else: print("INVALID ENTRY. PLEASE CHOOSE 1-6.") if __name__ == "__main__": main_menu()