#!/usr/bin/env python3 """ MIPI TEST APPLICATION - MIPI_TEST_INTERACTIVE.PY Interactive flicker confirmation test. Same three-pass capture sequence as mipi_test.py. After each iteration: 1. LP files are transferred from the scope immediately and analysed. 2. If the rule-based LP pre-processor flags a flicker suspect, Claude is asked to assess the single capture. 3. If Claude agrees it looks like a flicker event, the test pauses and asks the operator to confirm by looking at the display. Confirmed → event is logged, HTML report is written, test STOPS. Not flickering (false alarm) → event is logged, test CONTINUES. 4. A final HTML report is written when the test ends for any reason. VERSION: 0.1 AUTHOR: D. RICE 16/04/2026 © 2026 ARRIVE """ import csv as _csv_mod import html import json import subprocess import time import sys import requests from datetime import datetime from pathlib import Path import math import anthropic import vxi11 from dotenv import load_dotenv import ai_mgmt import rigol_scope from csv_preprocessor import (analyze_lp_file, LPMetrics, HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS) load_dotenv(Path(__file__).parent / ".env") # --------------------------------------------------------------------------- # Configuration (same as mipi_test.py) # --------------------------------------------------------------------------- DEVICE_BASE = "http://192.168.45.8:5000" URL = f"{DEVICE_BASE}/display" SCOPE_IP = "192.168.45.4" PSU_IP = "192.168.45.3" # Pass 1 — signal quality (HS differential, rise/fall) SIG_SCALE = 2e-9 # 2 ns/div → 20 ns window SIG_POINTS = 500_000 # Pass 2 — protocol/frame structure (HS differential, jitter/freq) PROTO_SCALE = 1e-6 # 1 µs/div → 10 µs window PROTO_POINTS = 500_000 # Pass 3 — LP state capture (single-ended, widens vertical range to show LP-11) LP_SCALE = 500e-9 # 500 ns/div → 5 µs window LP_POINTS = 200_000 LP_V_SCALE = 0.2 # V/div LP_V_OFFSET = 0.6 # V — centres display between LP-00 (0 V) and LP-11 (1.2 V) LP_TRIG_LEVEL = 0.6 # V — catches LP-11 → LP-01 falling edge DATA_DIR = Path(__file__).parent / "data" REPORTS_DIR = Path(__file__).parent / "reports" # Persistent logs (shared paths — consistent with mipi_test.py / analyze_captures.py) FLICKER_LOG = REPORTS_DIR / "flicker_log.csv" INTERACTIVE_LOG = REPORTS_DIR / "interactive_log.csv" # Claude model for per-capture flicker assessment CLAUDE_MODEL = "claude-opus-4-6" # --------------------------------------------------------------------------- # D-PHY timing calculation # --------------------------------------------------------------------------- # Ordered field list used for table formatting and u-boot output _TIMING_FIELD_ORDER = [ "lpx", "hs_prepare", "hs_zero", "hs_trail", "hs_exit", "clk_prepare", "clk_zero", "clk_post", "clk_trail", ] # Maps field names → flb_dtovar property names (NXP i.MX 8M Mini Samsung DSIM driver) _EXTRA_PROP_MAP = { "lpx": "dsi-phy-extra-lpx", "hs_prepare": "dsi-phy-extra-hs-prepare", "hs_zero": "dsi-phy-extra-hs-zero", "hs_trail": "dsi-phy-extra-hs-trail", "hs_exit": "dsi-phy-extra-hs-exit", "clk_prepare": "dsi-phy-extra-clk-prepare", "clk_zero": "dsi-phy-extra-clk-zero", "clk_post": "dsi-phy-extra-clk-post", "clk_trail": "dsi-phy-extra-clk-trail", } def calculate_dphy_timing(pixel_clock_mhz: float, extra_cycles: dict | None = None) -> dict: """ Calculate Samsung DSIM PHY timing register values for a given pixel clock. Assumes RGB888 (24 bpp), 4 DSI lanes — NXP i.MX 8M Mini / Ampire 1280×800. Timing constraints from MIPI D-PHY v1.1 Table 14. extra_cycles: optional dict mapping field names to additional cycles above the Round-Up minimum, e.g. {'clk_zero': 3, 'hs_prepare': 1, 'hs_trail': 1}. Samsung DSIM register packing: PHY_TIMING (0xb4): (lpx << 8) | hs_exit PHY_TIMING1 (0xb8): (clk_prepare << 24) | (clk_zero << 16) | (clk_post << 8) | clk_trail PHY_TIMING2 (0xbc): (hs_prepare << 16) | (hs_zero << 8) | hs_trail Returns a dict with: pixel_clock_mhz, bit_rate_mbps, byte_clock_mhz, byte_period_ns, ui_ns, fields, registers, violations. """ bpp = 24 lanes = 4 extras = extra_cycles or {} bit_rate_mbps = pixel_clock_mhz * bpp / lanes # Mbit/s per lane byte_clock_mhz = bit_rate_mbps / 8 # MHz byte_period_ns = 1000.0 / byte_clock_mhz # ns per byte-clock cycle ui_ns = 1000.0 / bit_rate_mbps # ns per UI def _ru(t_ns: float) -> int: return max(1, math.ceil(t_ns / byte_period_ns)) def _rb(t_ns: float) -> int: return max(1, round(t_ns / byte_period_ns)) def _field(name: str, min_ns: float, max_ns: float | None = None) -> dict: extra = extras.get(name, 0) ru = _ru(min_ns) rb = _rb(min_ns) final = ru + extra return { "min_ns": min_ns, "max_ns": max_ns, "round_best": rb, "round_up": ru, "extra": extra, "final": final, "actual_ns": final * byte_period_ns, } fields: dict = {} # LPX ≥ 50 ns fields["lpx"] = _field("lpx", 50.0) # HS-PREPARE: 40+4UI to 85+6UI ns hs_p_min = 40.0 + 4 * ui_ns hs_p_max = 85.0 + 6 * ui_ns fields["hs_prepare"] = _field("hs_prepare", hs_p_min, hs_p_max) hs_p_actual = fields["hs_prepare"]["actual_ns"] # HS-ZERO: combined hs_prepare + hs_zero ≥ 145+10UI ns hs_z_combined_min = 145.0 + 10 * ui_ns hs_z_min = max(1.0, hs_z_combined_min - hs_p_actual) fields["hs_zero"] = _field("hs_zero", hs_z_min) fields["hs_zero"]["combined_min_ns"] = hs_z_combined_min # HS-TRAIL ≥ max(8UI, 60+4UI) ns fields["hs_trail"] = _field("hs_trail", max(8 * ui_ns, 60.0 + 4 * ui_ns)) # HS-EXIT ≥ 100 ns fields["hs_exit"] = _field("hs_exit", 100.0) # CLK-PREPARE: 38 to 95 ns fields["clk_prepare"] = _field("clk_prepare", 38.0, 95.0) clk_p_actual = fields["clk_prepare"]["actual_ns"] # CLK-ZERO: combined clk_prepare + clk_zero ≥ 300 ns clk_z_combined_min = 300.0 clk_z_min = max(1.0, clk_z_combined_min - clk_p_actual) fields["clk_zero"] = _field("clk_zero", clk_z_min) fields["clk_zero"]["combined_min_ns"] = clk_z_combined_min # CLK-POST ≥ 60+52UI ns fields["clk_post"] = _field("clk_post", 60.0 + 52 * ui_ns) # CLK-TRAIL ≥ max(12UI, 60) ns fields["clk_trail"] = _field("clk_trail", max(12 * ui_ns, 60.0)) # ── Register packing ────────────────────────────────────────────────── def _f(name: str) -> int: return fields[name]["final"] phy_timing = (_f("lpx") << 8) | _f("hs_exit") phy_timing1 = ((_f("clk_prepare") << 24) | (_f("clk_zero") << 16) | (_f("clk_post") << 8) | _f("clk_trail")) phy_timing2 = ((_f("hs_prepare") << 16) | (_f("hs_zero") << 8) | _f("hs_trail")) registers = { "PHY_TIMING": {"addr": 0xb4, "value": phy_timing}, "PHY_TIMING1": {"addr": 0xb8, "value": phy_timing1}, "PHY_TIMING2": {"addr": 0xbc, "value": phy_timing2}, } # ── Constraint checks ───────────────────────────────────────────────── violations: list[str] = [] def _chk(name: str, lo: float, hi: float | None = None) -> None: a = fields[name]["actual_ns"] if a < lo: violations.append(f"{name}: {a:.2f} ns < {lo:.2f} ns (min)") if hi is not None and a > hi: violations.append(f"{name}: {a:.2f} ns > {hi:.2f} ns (max)") _chk("lpx", 50.0) _chk("hs_prepare", hs_p_min, hs_p_max) hs_combined = fields["hs_prepare"]["actual_ns"] + fields["hs_zero"]["actual_ns"] if hs_combined < hs_z_combined_min: violations.append( f"hs_prepare+hs_zero: {hs_combined:.2f} ns < {hs_z_combined_min:.2f} ns (min)") _chk("hs_trail", max(8 * ui_ns, 60.0 + 4 * ui_ns)) _chk("hs_exit", 100.0) _chk("clk_prepare", 38.0, 95.0) clk_combined = fields["clk_prepare"]["actual_ns"] + fields["clk_zero"]["actual_ns"] if clk_combined < clk_z_combined_min: violations.append( f"clk_prepare+clk_zero: {clk_combined:.2f} ns < {clk_z_combined_min:.2f} ns (min)") _chk("clk_post", 60.0 + 52 * ui_ns) _chk("clk_trail", max(12 * ui_ns, 60.0)) return { "pixel_clock_mhz": pixel_clock_mhz, "bit_rate_mbps": bit_rate_mbps, "byte_clock_mhz": byte_clock_mhz, "byte_period_ns": byte_period_ns, "ui_ns": ui_ns, "fields": fields, "registers": registers, "violations": violations, } def format_timing_table(t: dict) -> str: """Return a fixed-width console table of all D-PHY timing fields.""" fields = t["fields"] hdr = (f"{'Field':<14} {'Spec (ns)':<18} {'Rnd Best':>9} " f"{'Rnd Up':>7} {'Extra':>6} {'Final':>6} {'Actual (ns)':>12} Status") sep = "-" * len(hdr) lines = [sep, hdr, sep] for name in _TIMING_FIELD_ORDER: f = fields[name] min_ns = f["min_ns"] max_ns = f.get("max_ns") actual = f["actual_ns"] spec_str = (f"{min_ns:.1f} – {max_ns:.1f}" if max_ns is not None else f">= {min_ns:.1f}") ok = actual >= min_ns if max_ns is not None and actual > max_ns: ok = False if name == "hs_zero": comb = fields["hs_prepare"]["actual_ns"] + actual ok = ok and (comb >= f.get("combined_min_ns", 0)) if name == "clk_zero": comb = fields["clk_prepare"]["actual_ns"] + actual ok = ok and (comb >= f.get("combined_min_ns", 0)) status = "OK" if ok else "FAIL" lines.append( f"{name:<14} {spec_str:<18} {f['round_best']:>9} " f"{f['round_up']:>7} +{f['extra']:>5} {f['final']:>6} {actual:>12.2f} {status}" ) lines.append(sep) return "\n".join(lines) def format_uboot_commands(t: dict) -> str: """Return u-boot shell commands to apply the computed PHY timing configuration.""" fields = t["fields"] regs = t["registers"] fv = {name: fields[name]["final"] for name in _TIMING_FIELD_ORDER} phy_t = regs["PHY_TIMING"]["value"] phy_t1 = regs["PHY_TIMING1"]["value"] phy_t2 = regs["PHY_TIMING2"]["value"] non_zero_extras = [ (_EXTRA_PROP_MAP[name], fields[name]["extra"]) for name in _TIMING_FIELD_ORDER if fields[name]["extra"] > 0 ] lines = [ f"# D-PHY PHY timing registers " f"(pixel clock {t['pixel_clock_mhz']} MHz, " f"{t['bit_rate_mbps']:.1f} Mbit/s, " f"byte clock {t['byte_clock_mhz']:.3f} MHz)", "#", f"# PHY_TIMING (0xb4) = 0x{phy_t:08x} " f"lpx={fv['lpx']} hs_exit={fv['hs_exit']}", f"# PHY_TIMING1 (0xb8) = 0x{phy_t1:08x} " f"clk_prepare={fv['clk_prepare']} clk_zero={fv['clk_zero']} " f"clk_post={fv['clk_post']} clk_trail={fv['clk_trail']}", f"# PHY_TIMING2 (0xbc) = 0x{phy_t2:08x} " f"hs_prepare={fv['hs_prepare']} hs_zero={fv['hs_zero']} " f"hs_trail={fv['hs_trail']}", "", "# Enable Round-Up rounding (dsi-tweak bit 2)", 'setenv flb_dtovar "${flb_dtovar} dsi-tweak=4"', ] if non_zero_extras: lines += ["", "# Extra PHY cycles above Round-Up minimum"] for prop, val in non_zero_extras: lines.append(f'setenv flb_dtovar "${{flb_dtovar}} {prop}={val}"') lines += ["", "saveenv", "boot"] return "\n".join(lines) def prompt_for_config() -> dict: """ Interactive startup prompt: ask for pixel clock and optional extra PHY cycles. Prints timing table and u-boot commands, then returns a config dict. """ print("\n" + "=" * 64) print(" D-PHY TIMING CONFIGURATION") print("=" * 64) print("RGB888, 4 DSI lanes, Samsung DSIM on NXP i.MX 8M Mini.") print("Timing constraints: MIPI D-PHY v1.1 Table 14.\n") # ── Pixel clock ─────────────────────────────────────────────────────── while True: try: pix_str = input("Enter pixel clock in MHz (e.g. 72.0): ").strip() pixel_clock_mhz = float(pix_str) if not (10.0 <= pixel_clock_mhz <= 300.0): print(" Out of range — enter a value between 10 and 300.") continue break except ValueError: print(" Invalid — enter a number (e.g. 72.0).") # ── Baseline (no extras) ────────────────────────────────────────────── t_base = calculate_dphy_timing(pixel_clock_mhz) print(f"\nPixel clock : {pixel_clock_mhz} MHz") print(f"Bit rate : {t_base['bit_rate_mbps']:.1f} Mbit/s per lane") print(f"Byte clock : {t_base['byte_clock_mhz']:.3f} MHz " f"({t_base['byte_period_ns']:.3f} ns/byte)") print(f"UI : {t_base['ui_ns']:.3f} ns\n") print(format_timing_table(t_base)) if t_base["violations"]: print("\n *** BASELINE VIOLATIONS ***") for v in t_base["violations"]: print(f" ! {v}") else: print("\n All D-PHY v1.1 Table 14 constraints satisfied at baseline.") # ── Extra cycles ────────────────────────────────────────────────────── print("\n--- Extra PHY cycles (added on top of Round-Up minimum) ---") print("Suggested values for SN65DSI83 reliability (from dev testing):") print(" clk_zero +3, hs_prepare +1, hs_trail +1 (others 0)") print("Press Enter to accept 0 for each field.\n") extras: dict[str, int] = {} for name in _TIMING_FIELD_ORDER: while True: try: val = input(f" Extra cycles for {name:<12s} [0]: ").strip() extras[name] = int(val) if val else 0 if extras[name] < 0: print(" Cannot be negative.") continue break except ValueError: print(" Enter an integer (or press Enter for 0).") # ── Final calculation with extras ───────────────────────────────────── t_final = calculate_dphy_timing(pixel_clock_mhz, extras) if any(v > 0 for v in extras.values()): print("\n--- Final timing (with extras) ---") print(format_timing_table(t_final)) if t_final["violations"]: print("\n *** VIOLATIONS WITH EXTRAS ***") for v in t_final["violations"]: print(f" ! {v}") else: print("\n All constraints satisfied.") print("\n--- u-boot commands ---") print(format_uboot_commands(t_final)) print() return { "pixel_clock_mhz": pixel_clock_mhz, "extras": extras, "timing": t_final, } # --------------------------------------------------------------------------- # Instrument connection # --------------------------------------------------------------------------- try: psu = vxi11.Instrument(PSU_IP) scope = vxi11.Instrument(SCOPE_IP) scope.timeout = 30 psu.timeout = 5 except Exception as e: print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}") sys.exit(1) rigol_scope.connect() # --------------------------------------------------------------------------- # Scope configuration (identical to mipi_test.py) # --------------------------------------------------------------------------- def setup_scope(): """Initialises scope for MIPI DSI signals (~210 MHz).""" print("CONFIGURING SCOPE...") cmds = [ "*RST", ":RUN", ":STOP", # Channel 1 — Clock D+ ":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", ":CHANnel1:PROBe 19.2", ":CHANnel1:SCALe 0.1", ":CHANnel1:OFFSet 0.0", ":CHANnel1:LABel 'CLK+'", # Channel 2 — Clock D- ":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", ":CHANnel2:PROBe 19.2", ":CHANnel2:SCALe 0.1", ":CHANnel2:OFFSet 0.0", ":CHANnel2:LABel 'CLK-'", # Channel 3 — Data Lane 0 D+ ":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50", ":CHANnel3:PROBe 19.2", ":CHANnel3:SCALe 0.1", ":CHANnel3:OFFSet 0.0", ":CHANnel3:LABel 'DAT0+'", # Channel 4 — Data Lane 0 D- ":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50", ":CHANnel4:PROBe 19.2", ":CHANnel4:SCALe 0.1", ":CHANnel4:OFFSet 0.0", ":CHANnel4:LABel 'DAT0-'", # Timebase ":TIMebase:SCALe 5E-9", ":TIMebase:POSition 0", ":TIMebase:REFerence CENTer", # Trigger — rising edge on Ch1 (Clock D+) ":TRIGger:MODE EDGE", ":TRIGger:EDGE:SOURce CHANnel1", ":TRIGger:EDGE:SLOPe POSitive", ":TRIGger:EDGE:LEVel 0.05", ":TRIGger:SWEep NORMal", # Acquisition ":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON", ":ACQuire:POINts 500000", ":DISPlay:LAYout STACKED", ":RUN", ] for cmd in cmds: scope.write(cmd) time.sleep(0.05) print("CHANNEL SETUP COMPLETE.") setup_math_channels() def setup_math_channels(): """F1 = Ch1−Ch2 (clock differential), F2 = Ch3−Ch4 (lane 0 differential).""" print("SETTING UP MATH CHANNELS...") scope.write("*CLS") time.sleep(0.2) for cmd in [ ":FUNCtion1:DISPlay ON", ":FUNCtion1:SUBTract CHANnel1,CHANnel2", ":FUNCtion1:RANGe 0.8", ":FUNCtion1:OFFSet 0.0", ":FUNCtion2:DISPlay ON", ":FUNCtion2:SUBTract CHANnel3,CHANnel4", ":FUNCtion2:RANGe 0.8", ":FUNCtion2:OFFSet 0.0", ]: scope.write(cmd) time.sleep(0.2) try: time.sleep(1.0) opc = scope.ask("*OPC?") print(f" SCOPE SYNC OK (OPC={opc.strip()})") except Exception as e: print(f" WARNING: OPC SYNC FAILED ({e})") try: err = scope.ask(":SYSTem:ERRor?") if err.strip().startswith("0"): print(" MATH COMMANDS ACCEPTED — NO SCPI ERRORS.") print(" F1 = CLK DIFF (CH1-CH2), F2 = DAT DIFF (CH3-CH4)") else: print(f" SCPI ERROR: {err.strip()}") except Exception as e: print(f" COULD NOT READ ERROR QUEUE ({e})") def _set_timebase(scale, points): scope.write(f":TIMebase:SCALe {scale:.3E}") scope.write(f":ACQuire:POINts {points}") time.sleep(0.3) def _arm_and_wait(timeout=20): prev_timeout = scope.timeout try: scope.timeout = timeout + 5 scope.write(":DIGitize") return scope.ask("*OPC?").strip() == "1" except Exception as e: print(f" ACQUIRE ERROR: {e}") return False finally: scope.timeout = prev_timeout def _save_pass(tag, iteration, ts): """Save F1 (CLK diff) and F2 (DAT diff) as CSV.""" base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}" try: scope.write(f':DISK:SAVE:WAVeform FUNCtion1,"{base}_clk.csv",CSV') time.sleep(2.5) scope.write(f':DISK:SAVE:WAVeform FUNCtion2,"{base}_dat.csv",CSV') time.sleep(2.5) print(f" SAVED: {base}_clk.csv {base}_dat.csv") except Exception as e: print(f" SAVE ERROR ({tag}): {e}") def _save_pass_channels(tag, iteration, ts): """Save Ch1 (CLK+) and Ch3 (DAT0+) single-ended for LP state analysis.""" base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}" try: scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.csv",CSV') time.sleep(2.5) scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.csv",CSV') time.sleep(2.5) print(f" SAVED: {base}_clk.csv {base}_dat.csv") except Exception as e: print(f" SAVE ERROR ({tag}): {e}") def _configure_for_lp(): """Widen vertical range for LP states and switch to falling-edge trigger.""" for ch in (1, 2, 3, 4): scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}") scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}") time.sleep(0.05) scope.write(":TRIGger:EDGE:SOURce CHANnel3") scope.write(":TRIGger:EDGE:SLOPe NEGative") scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}") time.sleep(0.1) def _restore_hs_config(): """Restore HS-mode channel scales and trigger after LP capture.""" for ch in (1, 2, 3, 4): scope.write(f":CHANnel{ch}:SCALe 0.1") scope.write(f":CHANnel{ch}:OFFSet 0.0") time.sleep(0.05) scope.write(":TRIGger:EDGE:SOURce CHANnel1") scope.write(":TRIGger:EDGE:SLOPe POSitive") scope.write(":TRIGger:EDGE:LEVel 0.05") time.sleep(0.1) def _fetch_registers(ts: str, iteration: int) -> None: """GET /registers from device server and save to data/ as JSON.""" try: resp = requests.get(f"{DEVICE_BASE}/registers", timeout=5) resp.raise_for_status() data = resp.json() if data.get("errors"): print(f" REGISTERS: device warnings — {data['errors']}") DATA_DIR.mkdir(exist_ok=True) reg_path = DATA_DIR / f"{ts}_reg_{iteration:04d}.json" reg_path.write_text(json.dumps(data, indent=2)) print(f" SAVED: {reg_path.name} ({len(data.get('registers', []))} registers)") except requests.exceptions.RequestException as e: print(f" REGISTERS: fetch failed — {e}") except Exception as e: print(f" REGISTERS: error — {e}") def dual_capture(iteration: int) -> str: """ Three-pass capture per test iteration. Pass 1 — LP / SoT startup (no settle delay — fires immediately after display ON) Pass 2 — signal quality (HS differential, rise/fall) Pass 3 — frame structure (HS differential, jitter/freq) Returns the timestamp string used in all filenames for this iteration. """ ts = datetime.now().strftime("%Y%m%d_%H%M%S") print(f"CAPTURE #{iteration:04d} [{ts}]") # ── Pass 1: LP / SoT startup ─────────────────────────────────────────── print(" PASS 1: LP STARTUP TRANSITION...") _configure_for_lp() _set_timebase(LP_SCALE, LP_POINTS) if rigol_scope.is_connected(): rigol_scope.arm() if _arm_and_wait(timeout=30): _save_pass_channels("lp", iteration, ts) else: print(" SKIPPING LP SAVE.") if rigol_scope.is_connected(): DATA_DIR.mkdir(exist_ok=True) v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv" n = rigol_scope.read_waveform_csv(v18_path) if n: print(f" SAVED: {v18_path.name} ({n} samples)") else: print(" RIGOL: Waveform read failed — check connection and probe.") _restore_hs_config() # ── Pass 2: HS signal quality ────────────────────────────────────────── print(" PASS 2: SIGNAL QUALITY...") _set_timebase(SIG_SCALE, SIG_POINTS) if _arm_and_wait(): _save_pass("sig", iteration, ts) else: print(" SKIPPING SIG SAVE.") # ── Pass 3: frame/protocol structure ────────────────────────────────── print(" PASS 3: FRAME STRUCTURE...") _set_timebase(PROTO_SCALE, PROTO_POINTS) if _arm_and_wait(): _save_pass("proto", iteration, ts) else: print(" SKIPPING PROTO SAVE.") # ── DSI register snapshot ───────────────────────────────────────────── _fetch_registers(ts, iteration) # ── Restore default timebase ────────────────────────────────────────── _set_timebase(5e-9, 500_000) scope.write(":RUN") return ts # --------------------------------------------------------------------------- # Per-iteration LP analysis + Claude assessment # --------------------------------------------------------------------------- def _build_system_prompt(config: dict | None = None) -> str: """Build the Claude system prompt with correct bit-rate values for the active config.""" if config and "timing" in config: t = config["timing"] bit_rate = t["bit_rate_mbps"] byte_clk = t["byte_clock_mhz"] byte_per = t["byte_period_ns"] else: bit_rate = 432.0 byte_clk = 54.0 byte_per = 18.518 return ( "You are an expert in MIPI D-PHY signal integrity analysis. " "You will receive a pre-processed summary of a single LP capture from a MIPI DAT0 lane " "(NXP i.MX 8M Mini Samsung DSIM IP driving a SN65DSI83 MIPI-to-LVDS bridge). " f"HS bit rate: {bit_rate:.1f} Mbit/s. " f"Byte clock: {byte_clk:.3f} MHz ({byte_per:.3f} ns/byte). " "The LP-low plateau (LP-01/LP-00 SoT preamble) must be ≥ 50 ns for the SN65DSI83 " "to detect start-of-transmission. A plateau shorter than 50 ns or absent means the " "bridge missed the SoT and the display will flicker visibly. " "Be decisive. Answer YES or NO on the first line." ) def _build_claude_prompt(ts: str, iteration: int, lp_summaries: list[str], suspects: list[LPMetrics], config: dict | None = None) -> str: """ Build a concise prompt asking Claude to assess a single capture. The rule-based pre-filter has already flagged at least one LP suspect. """ suspect_lines = "\n".join( f" channel={m.channel} lp_low_plateau={m.lp_low_duration_ns} ns " f"(spec ≥ 50 ns) lp11_to_hs={m.lp11_to_hs_ns} ns " f"lp11_voltage={m.lp11_voltage_v} V " f"hs_amplitude={m.hs_amplitude_mv} mV (normal 105–122 mV; absent <50 mV)" for m in suspects ) summaries_text = "\n\n".join(lp_summaries) config_text = "" if config and "timing" in config: t = config["timing"] config_text = ( f"\n\nTest configuration: pixel clock {t['pixel_clock_mhz']} MHz, " f"bit rate {t['bit_rate_mbps']:.1f} Mbit/s per lane, " f"byte clock {t['byte_clock_mhz']:.3f} MHz " f"({t['byte_period_ns']:.3f} ns/byte), UI {t['ui_ns']:.3f} ns." ) return ( f"SINGLE-CAPTURE FLICKER ASSESSMENT — capture {iteration:04d} [{ts}]\n\n" f"The rule-based LP pre-processor has flagged the following measurements as " f"potential flicker suspects because the LP-low plateau is absent or shorter " f"than 50 ns:\n{suspect_lines}\n\n" f"Full LP capture summaries:\n{summaries_text}" f"{config_text}\n\n" f"Based solely on these LP timing metrics, do you believe this capture " f"represents a genuine screen flicker event — i.e., was the SoT sequence " f"too brief for the SN65DSI83 bridge to detect start-of-transmission, " f"likely causing visible display flicker?\n\n" f"Start your response with YES or NO on the first line, then explain your " f"reasoning briefly (2–4 sentences) referencing the specific metric values." ) def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None: """Append a flicker suspect to the shared flicker_log.csv.""" FLICKER_LOG.parent.mkdir(exist_ok=True) write_header = not FLICKER_LOG.exists() with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as f: w = _csv_mod.writer(f) if write_header: w.writerow(["logged_at", "capture_ts", "capture_num", "channel", "lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v"]) w.writerow([ datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ts, f"{iteration:04d}", m.channel, m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v, ]) def analyze_lp_and_ask_claude( ts: str, iteration: int, config: dict | None = None ) -> tuple[bool, str, list[LPMetrics]]: """ Analyse the LP files for this iteration. 1. Run csv_preprocessor on lp_clk and lp_dat. 2. If any file is flagged as a flicker suspect by the rule-based detector, call the Claude API for a focused single-capture assessment. 3. Parse Claude's YES/NO response. Returns: claude_says_flicker — True if Claude opened with YES reasoning — Claude's full response text (or "" if not called) suspects — list of LPMetrics objects that were flagged """ lp_summaries: list[str] = [] suspects: list[LPMetrics] = [] for channel in ("clk", "dat"): path = DATA_DIR / f"{ts}_lp_{iteration:04d}_{channel}.csv" if not path.exists(): print(f" LP ANALYSIS: {path.name} not found — skipping.") continue try: m = analyze_lp_file(path) lp_summaries.append(m.summary()) if m.flicker_suspect: suspects.append(m) _append_flicker_log(ts, iteration, m) if (m.hs_amplitude_mv is not None and m.hs_amplitude_mv < HS_BURST_AMPLITUDE_MIN_MV and m.lp11_to_hs_ns is not None and m.lp11_to_hs_ns >= FLICKER_LP_LOW_MAX_NS): reason = f"HS burst absent ({m.hs_amplitude_mv:.0f} mV, lp11_to_hs={m.lp11_to_hs_ns:.0f} ns)" else: reason = f"lp_low={m.lp_low_duration_ns} ns" print(f"\n *** FLICKER SUSPECT: capture {iteration:04d} " f"channel={m.channel} {reason} ***\n") except Exception as e: print(f" LP ANALYSIS ERROR ({channel}): {e}") if not suspects: return False, "", [] # ── Call Claude ──────────────────────────────────────────────────────── print(" CALLING CLAUDE FOR ASSESSMENT...") try: client = anthropic.Anthropic() message = client.messages.create( model = CLAUDE_MODEL, max_tokens = 512, system = _build_system_prompt(config), messages = [{"role": "user", "content": _build_claude_prompt(ts, iteration, lp_summaries, suspects, config)}], ) response = message.content[0].text.strip() # Parse YES/NO from the first line first_line = response.splitlines()[0].strip().upper() claude_says_flicker = first_line.startswith("YES") label = "FLICKER" if claude_says_flicker else "NOT FLICKER" print(f" CLAUDE: {label} ({message.usage.input_tokens} in / " f"{message.usage.output_tokens} out tokens)") return claude_says_flicker, response, suspects except Exception as e: print(f" CLAUDE API ERROR: {e}") # Fall back to the rule-based result so the operator still gets asked fallback = (f"(Claude API unavailable: {e})\n" f"Rule-based detector flagged LP-low plateau < 50 ns — " f"treat as potential flicker suspect.") return True, fallback, suspects # --------------------------------------------------------------------------- # Event logging and HTML report # --------------------------------------------------------------------------- def _config_section_html(config: dict) -> str: """Generate the D-PHY configuration section for the HTML report.""" t = config["timing"] fields = t["fields"] regs = t["registers"] rows_html = "" for name in _TIMING_FIELD_ORDER: f = fields[name] min_ns = f["min_ns"] max_ns = f.get("max_ns") actual = f["actual_ns"] ok = actual >= min_ns if max_ns is not None and actual > max_ns: ok = False if name == "hs_zero": comb = fields["hs_prepare"]["actual_ns"] + actual ok = ok and (comb >= f.get("combined_min_ns", 0)) if name == "clk_zero": comb = fields["clk_prepare"]["actual_ns"] + actual ok = ok and (comb >= f.get("combined_min_ns", 0)) spec_str = (f"{min_ns:.1f} – {max_ns:.1f}" if max_ns is not None else f"≥ {min_ns:.1f}") cell_style = "" if ok else ' style="color:#c62828;font-weight:bold"' status_html = "✓" if ok else '✖ FAIL' rows_html += ( f"" f"{name}" f"{spec_str}" f"{f['round_best']}" f"{f['round_up']}" f"+{f['extra']}" f"{f['final']}" f"{actual:.2f}" f"{status_html}" f"\n" ) if t["violations"]: items_html = "".join(f"
  • {html.escape(v)}
  • " for v in t["violations"]) viol_html = ( f'

    Timing violations:

    ' f"" ) else: viol_html = ( '

    ✓ All D-PHY v1.1 Table 14 ' "constraints satisfied.

    " ) fv = {name: fields[name]["final"] for name in _TIMING_FIELD_ORDER} phy_t = regs["PHY_TIMING"]["value"] phy_t1 = regs["PHY_TIMING1"]["value"] phy_t2 = regs["PHY_TIMING2"]["value"] uboot = html.escape(format_uboot_commands(t)) return f"""

    D-PHY Configuration

    Pixel clock: {t['pixel_clock_mhz']} MHz  |  Bit rate: {t['bit_rate_mbps']:.1f} Mbit/s per lane  |  Byte clock: {t['byte_clock_mhz']:.3f} MHz ({t['byte_period_ns']:.3f} ns/byte)  |  UI: {t['ui_ns']:.3f} ns

    {rows_html}
    FieldSpec (ns)Rnd BestRnd Up ExtraFinalActual (ns)Status
    {viol_html}

    Samsung DSIM Registers

    RegisterAddressValueField breakdown
    PHY_TIMING0xb4 0x{phy_t:08x} lpx={fv['lpx']}   hs_exit={fv['hs_exit']}
    PHY_TIMING10xb8 0x{phy_t1:08x} clk_prepare={fv['clk_prepare']}   clk_zero={fv['clk_zero']}   clk_post={fv['clk_post']}   clk_trail={fv['clk_trail']}
    PHY_TIMING20xbc 0x{phy_t2:08x} hs_prepare={fv['hs_prepare']}   hs_zero={fv['hs_zero']}   hs_trail={fv['hs_trail']}

    u-boot Commands

    {uboot}
    """ def _log_interaction( events: list, ts: str, iteration: int, suspects: list[LPMetrics], claude_said: bool, user_confirmed: bool | None, reasoning: str, ) -> None: """ Append an event to the in-memory list and to interactive_log.csv. user_confirmed values: True — operator confirmed flicker False — operator said no (false alarm) None — operator was not asked (Claude said no) """ now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") events.append({ "ts": ts, "iteration": iteration, "suspects": suspects, "claude_said": claude_said, "user_confirmed": user_confirmed, "reasoning": reasoning, "logged_at": now_str, }) INTERACTIVE_LOG.parent.mkdir(exist_ok=True) write_header = not INTERACTIVE_LOG.exists() with open(INTERACTIVE_LOG, "a", newline="", encoding="utf-8") as f: w = _csv_mod.writer(f) if write_header: w.writerow(["logged_at", "capture_ts", "capture_num", "claude_said_flicker", "user_confirmed", "lp_low_ns", "reasoning_summary"]) for m in suspects: w.writerow([ now_str, ts, f"{iteration:04d}", "YES" if claude_said else "NO", ("YES" if user_confirmed is True else "NO" if user_confirmed is False else "NOT_ASKED"), m.lp_low_duration_ns, reasoning[:150].replace("\n", " "), ]) def save_report(events: list, stop_reason: str, config: dict | None = None) -> Path: """Write a timestamped HTML report summarising all flicker interactions.""" REPORTS_DIR.mkdir(exist_ok=True) now = datetime.now() filename = now.strftime("%Y%m%d_%H%M%S_interactive.html") path = REPORTS_DIR / filename confirmed_n = sum(1 for e in events if e["user_confirmed"] is True) false_alarm_n = sum(1 for e in events if e["user_confirmed"] is False) claude_no_n = sum(1 for e in events if e["user_confirmed"] is None) # ── Event table rows ─────────────────────────────────────────────────── rows_html = "" for e in events: for m in e["suspects"]: conf = e["user_confirmed"] if conf is True: badge = ('' '✖ CONFIRMED FLICKER') elif conf is False: badge = ('' '✓ FALSE ALARM') else: badge = ('' 'Claude said NO — user not asked') lp_val = m.lp_low_duration_ns lp_bad = lp_val is None or lp_val < 50 lp_cell = (f'{lp_val} ns' if lp_bad else f'{lp_val} ns') rows_html += ( f"" f"{e['iteration']:04d}" f"{e['ts']}" f"{m.channel}" f"{lp_cell}" f"{m.lp11_to_hs_ns} ns" f"{m.lp11_voltage_v} V" f"{'YES' if e['claude_said'] else 'NO'}" f"{badge}" f"" ) table_html = ( '

    No flicker suspects were detected during this test run.

    ' if not rows_html else f""" {rows_html}
    CaptureTimestampChannel LP-low plateauLP exit→HSLP-11 voltage Claude: flicker?Outcome
    """ ) # ── D-PHY config section ─────────────────────────────────────────────── config_html = _config_section_html(config) if config else "" # ── Claude reasoning sections ────────────────────────────────────────── reasoning_html = "" for e in events: if not e["reasoning"] or not e["claude_said"]: continue conf = e["user_confirmed"] label = (" — CONFIRMED FLICKER" if conf is True else " — FALSE ALARM" if conf is False else "") reasoning_html += ( f'

    Capture {e["iteration"]:04d} [{e["ts"]}]{html.escape(label)}

    ' f'
    '
                f'{html.escape(e["reasoning"])}
    ' ) html_content = f""" MIPI Interactive Flicker Test — {now.strftime('%Y-%m-%d %H:%M:%S')}

    MIPI Interactive Flicker Test Report

    Generated: {now.strftime('%Y-%m-%d %H:%M:%S')}  |  Model: {CLAUDE_MODEL}

    Stop reason: {html.escape(stop_reason)}
    {confirmed_n} confirmed flicker(s)
    {false_alarm_n} false alarm(s)
    {claude_no_n} Claude said no
    {config_html}

    Event Log

    {table_html} {'

    Claude Assessments

    ' + reasoning_html if reasoning_html else ''} """ path.write_text(html_content, encoding="utf-8") return path # --------------------------------------------------------------------------- # Interactive test loop # --------------------------------------------------------------------------- def run_interactive_test() -> None: """ Blocking interactive test loop (runs in the calling thread — no background threads). Flow per iteration: 1. Display ON 2. Three-pass dual_capture → saves LP/sig/proto CSVs to scope 3. Transfer all scope CSVs to local data/ 4. Analyse LP files for this iteration (rule-based + Claude if suspect) 5. If Claude says flicker: - Keep display ON so operator can observe - Ask operator to confirm - Confirmed → log event, save report, STOP - False alarm → log event, continue 6. Display OFF, 1 s pause, next iteration Press Ctrl+C to exit at any time. A report is always saved on exit. """ iteration = 1 events: list = [] stop_reason = "Test stopped by user" # ── Pixel clock configuration ────────────────────────────────────────── config = prompt_for_config() print("\nINTERACTIVE FLICKER TEST STARTED.") print("Each iteration: display ON → 3-pass capture → LP analysis → Claude check.") print("The display stays ON while Claude and the operator assess the frame.") print("Press Ctrl+C at any time to stop.\n") try: while True: # ── Display ON ───────────────────────────────────────────────── try: requests.put(URL, json={"state": "on"}, timeout=2) except requests.exceptions.RequestException as e: print(f" WARNING: display ON failed: {e}") # ── Three-pass capture ───────────────────────────────────────── ts = dual_capture(iteration) # ── Transfer scope files to local data/ ──────────────────────── print(" TRANSFERRING FILES FROM SCOPE...") try: copied, failed = ai_mgmt.transfer_csv_files() print(f" TRANSFERRED {copied} FILE(S). {failed} FAILED.") except Exception as e: print(f" TRANSFER ERROR: {e}") # ── Analyse LP + ask Claude if suspect ───────────────────────── claude_flicker, reasoning, suspects = analyze_lp_and_ask_claude( ts, iteration, config) if claude_flicker: # ── Keep display ON — ask operator ───────────────────────── # Play alarm sound once to alert the operator subprocess.run( ["pw-play", "/usr/share/sounds/freedesktop/stereo/alarm-clock-elapsed.oga"], check=False, ) print("\n" + "=" * 64) print(" CLAUDE SUSPECTS FLICKER — OBSERVE THE DISPLAY NOW") print("=" * 64) print(f"\n{reasoning}\n") confirmed: bool | None = None while confirmed is None: try: ans = input("IS THE SCREEN ACTUALLY FLICKERING? (Y/N): ").strip().upper() except EOFError: ans = "N" if ans in ("Y", "YES"): confirmed = True elif ans in ("N", "NO"): confirmed = False else: print(" Please enter Y or N.") _log_interaction(events, ts, iteration, suspects, claude_flicker, confirmed, reasoning) if confirmed: print("\n FLICKER CONFIRMED BY OPERATOR. STOPPING TEST.") stop_reason = (f"Flicker confirmed by operator at " f"capture {iteration:04d} [{ts}]") # Display OFF try: requests.put(URL, json={"state": "off"}, timeout=2) except Exception: pass break # exit the while loop → save report below else: print(" NOT FLICKERING — false alarm logged. Continuing.\n") elif suspects: # Rule-based suspect but Claude said no — record for reference _log_interaction(events, ts, iteration, suspects, False, None, reasoning) # ── Display OFF, brief pause before next iteration ───────────── try: requests.put(URL, json={"state": "off"}, timeout=2) except requests.exceptions.RequestException as e: print(f" WARNING: display OFF failed: {e}") iteration += 1 time.sleep(1.0) except KeyboardInterrupt: print("\n\n TEST INTERRUPTED (Ctrl+C).") stop_reason = "Test interrupted by operator (Ctrl+C)" try: requests.put(URL, json={"state": "off"}, timeout=2) except Exception: pass # ── Always save a report on exit ─────────────────────────────────────── report_path = save_report(events, stop_reason, config) print(f"\nREPORT SAVED: {report_path}") if events: confirmed_n = sum(1 for e in events if e["user_confirmed"] is True) false_alarm_n = sum(1 for e in events if e["user_confirmed"] is False) print(f" {confirmed_n} confirmed flicker(s), {false_alarm_n} false alarm(s) " f"({len(events)} total suspect(s) assessed)") # --------------------------------------------------------------------------- # Menu # --------------------------------------------------------------------------- def main_menu() -> None: while True: print("\n===== MIPI INTERACTIVE TEST CONTROL =====") print("1. RUN IDN CHECK (PSU & SCOPE)") print("2. SETUP SCOPE (RUN FIRST)") print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)") print("4. PSU OUTPUT ON/OFF (CH1)") print("5. START INTERACTIVE FLICKER TEST") print("6. EXIT") choice = input("\nSELECT OPTION (1-6): ").strip() if choice == '1': print(f"PSU : {psu.ask('*IDN?').strip()}") print(f"SCOPE: {scope.ask('*IDN?').strip()}") if rigol_scope.is_connected(): print(f"RIGOL: {rigol_scope.rigol.ask('*IDN?').strip()}") else: print("RIGOL: NOT CONNECTED") elif choice == '2': setup_scope() if rigol_scope.is_connected(): rigol_scope.configure() elif choice == '3': psu.write('CH1:VOLT 24.0') psu.write('CH1:CURR 1.5') print("PSU CONFIGURED: 24V / 1.5A") elif choice == '4': state = input("TYPE 'ON' OR 'OFF': ").strip().upper() if state in ('ON', 'OFF'): psu.write(f'OUTP CH1,{state}') print(f"PSU OUTPUT {state}.") else: print("INVALID — TYPE 'ON' OR 'OFF'.") elif choice == '5': run_interactive_test() elif choice == '6': psu.close() scope.close() rigol_scope.disconnect() print("INSTRUMENTS CLOSED. BYE.") break else: print("INVALID ENTRY. PLEASE CHOOSE 1-6.") if __name__ == "__main__": main_menu()