2026-04-20 10:35:06 +01:00
|
|
|
|
#!/usr/bin/env python3
|
2026-04-16 11:23:25 +01:00
|
|
|
|
"""
|
|
|
|
|
|
MIPI TEST APPLICATION - MIPI_TEST_INTERACTIVE.PY
|
|
|
|
|
|
Interactive flicker confirmation test.
|
|
|
|
|
|
|
|
|
|
|
|
Same three-pass capture sequence as mipi_test.py. After each iteration:
|
|
|
|
|
|
1. LP files are transferred from the scope immediately and analysed.
|
|
|
|
|
|
2. If the rule-based LP pre-processor flags a flicker suspect, Claude is
|
|
|
|
|
|
asked to assess the single capture.
|
|
|
|
|
|
3. If Claude agrees it looks like a flicker event, the test pauses and
|
|
|
|
|
|
asks the operator to confirm by looking at the display.
|
|
|
|
|
|
Confirmed → event is logged, HTML report is written, test STOPS.
|
|
|
|
|
|
Not flickering (false alarm) → event is logged, test CONTINUES.
|
|
|
|
|
|
4. A final HTML report is written when the test ends for any reason.
|
|
|
|
|
|
|
|
|
|
|
|
VERSION: 0.1
|
|
|
|
|
|
AUTHOR: D. RICE 16/04/2026
|
|
|
|
|
|
© 2026 ARRIVE
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import csv as _csv_mod
|
|
|
|
|
|
import html
|
|
|
|
|
|
import json
|
|
|
|
|
|
import subprocess
|
|
|
|
|
|
import time
|
|
|
|
|
|
import sys
|
|
|
|
|
|
import requests
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
|
|
import math
|
|
|
|
|
|
|
|
|
|
|
|
import anthropic
|
|
|
|
|
|
import vxi11
|
|
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
|
|
|
|
|
|
import ai_mgmt
|
|
|
|
|
|
import rigol_scope
|
|
|
|
|
|
from csv_preprocessor import (analyze_lp_file, LPMetrics,
|
2026-04-20 10:35:06 +01:00
|
|
|
|
HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS)
|
2026-04-16 11:23:25 +01:00
|
|
|
|
|
|
|
|
|
|
load_dotenv(Path(__file__).parent / ".env")
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Configuration (same as mipi_test.py)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
DEVICE_BASE = "http://192.168.45.8:5000"
|
|
|
|
|
|
URL = f"{DEVICE_BASE}/display"
|
|
|
|
|
|
SCOPE_IP = "192.168.45.4"
|
|
|
|
|
|
PSU_IP = "192.168.45.3"
|
|
|
|
|
|
|
|
|
|
|
|
# Pass 1 — signal quality (HS differential, rise/fall)
|
|
|
|
|
|
SIG_SCALE = 2e-9 # 2 ns/div → 20 ns window
|
|
|
|
|
|
SIG_POINTS = 500_000
|
|
|
|
|
|
|
|
|
|
|
|
# Pass 2 — protocol/frame structure (HS differential, jitter/freq)
|
|
|
|
|
|
PROTO_SCALE = 1e-6 # 1 µs/div → 10 µs window
|
|
|
|
|
|
PROTO_POINTS = 500_000
|
|
|
|
|
|
|
|
|
|
|
|
# Pass 3 — LP state capture (single-ended, widens vertical range to show LP-11)
|
|
|
|
|
|
LP_SCALE = 500e-9 # 500 ns/div → 5 µs window
|
|
|
|
|
|
LP_POINTS = 200_000
|
|
|
|
|
|
LP_V_SCALE = 0.2 # V/div
|
|
|
|
|
|
LP_V_OFFSET = 0.6 # V — centres display between LP-00 (0 V) and LP-11 (1.2 V)
|
|
|
|
|
|
LP_TRIG_LEVEL = 0.6 # V — catches LP-11 → LP-01 falling edge
|
|
|
|
|
|
|
|
|
|
|
|
DATA_DIR = Path(__file__).parent / "data"
|
|
|
|
|
|
REPORTS_DIR = Path(__file__).parent / "reports"
|
|
|
|
|
|
|
|
|
|
|
|
# Persistent logs (shared paths — consistent with mipi_test.py / analyze_captures.py)
|
|
|
|
|
|
FLICKER_LOG = REPORTS_DIR / "flicker_log.csv"
|
|
|
|
|
|
INTERACTIVE_LOG = REPORTS_DIR / "interactive_log.csv"
|
|
|
|
|
|
|
|
|
|
|
|
# Claude model for per-capture flicker assessment
|
|
|
|
|
|
CLAUDE_MODEL = "claude-opus-4-6"
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# D-PHY timing calculation
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
# Ordered field list used for table formatting and u-boot output
|
|
|
|
|
|
_TIMING_FIELD_ORDER = [
|
|
|
|
|
|
"lpx", "hs_prepare", "hs_zero", "hs_trail", "hs_exit",
|
|
|
|
|
|
"clk_prepare", "clk_zero", "clk_post", "clk_trail",
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
# Maps field names → flb_dtovar property names (NXP i.MX 8M Mini Samsung DSIM driver)
|
|
|
|
|
|
_EXTRA_PROP_MAP = {
|
|
|
|
|
|
"lpx": "dsi-phy-extra-lpx",
|
|
|
|
|
|
"hs_prepare": "dsi-phy-extra-hs-prepare",
|
|
|
|
|
|
"hs_zero": "dsi-phy-extra-hs-zero",
|
|
|
|
|
|
"hs_trail": "dsi-phy-extra-hs-trail",
|
|
|
|
|
|
"hs_exit": "dsi-phy-extra-hs-exit",
|
|
|
|
|
|
"clk_prepare": "dsi-phy-extra-clk-prepare",
|
|
|
|
|
|
"clk_zero": "dsi-phy-extra-clk-zero",
|
|
|
|
|
|
"clk_post": "dsi-phy-extra-clk-post",
|
|
|
|
|
|
"clk_trail": "dsi-phy-extra-clk-trail",
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_dphy_timing(pixel_clock_mhz: float,
|
|
|
|
|
|
extra_cycles: dict | None = None) -> dict:
|
|
|
|
|
|
"""
|
|
|
|
|
|
Calculate Samsung DSIM PHY timing register values for a given pixel clock.
|
|
|
|
|
|
|
|
|
|
|
|
Assumes RGB888 (24 bpp), 4 DSI lanes — NXP i.MX 8M Mini / Ampire 1280×800.
|
|
|
|
|
|
Timing constraints from MIPI D-PHY v1.1 Table 14.
|
|
|
|
|
|
|
|
|
|
|
|
extra_cycles: optional dict mapping field names to additional cycles above the
|
|
|
|
|
|
Round-Up minimum, e.g. {'clk_zero': 3, 'hs_prepare': 1, 'hs_trail': 1}.
|
|
|
|
|
|
|
|
|
|
|
|
Samsung DSIM register packing:
|
|
|
|
|
|
PHY_TIMING (0xb4): (lpx << 8) | hs_exit
|
|
|
|
|
|
PHY_TIMING1 (0xb8): (clk_prepare << 24) | (clk_zero << 16) | (clk_post << 8) | clk_trail
|
|
|
|
|
|
PHY_TIMING2 (0xbc): (hs_prepare << 16) | (hs_zero << 8) | hs_trail
|
|
|
|
|
|
|
|
|
|
|
|
Returns a dict with: pixel_clock_mhz, bit_rate_mbps, byte_clock_mhz,
|
|
|
|
|
|
byte_period_ns, ui_ns, fields, registers, violations.
|
|
|
|
|
|
"""
|
|
|
|
|
|
bpp = 24
|
|
|
|
|
|
lanes = 4
|
|
|
|
|
|
extras = extra_cycles or {}
|
|
|
|
|
|
|
|
|
|
|
|
bit_rate_mbps = pixel_clock_mhz * bpp / lanes # Mbit/s per lane
|
|
|
|
|
|
byte_clock_mhz = bit_rate_mbps / 8 # MHz
|
|
|
|
|
|
byte_period_ns = 1000.0 / byte_clock_mhz # ns per byte-clock cycle
|
|
|
|
|
|
ui_ns = 1000.0 / bit_rate_mbps # ns per UI
|
|
|
|
|
|
|
|
|
|
|
|
def _ru(t_ns: float) -> int:
|
|
|
|
|
|
return max(1, math.ceil(t_ns / byte_period_ns))
|
|
|
|
|
|
|
|
|
|
|
|
def _rb(t_ns: float) -> int:
|
|
|
|
|
|
return max(1, round(t_ns / byte_period_ns))
|
|
|
|
|
|
|
|
|
|
|
|
def _field(name: str, min_ns: float, max_ns: float | None = None) -> dict:
|
|
|
|
|
|
extra = extras.get(name, 0)
|
|
|
|
|
|
ru = _ru(min_ns)
|
|
|
|
|
|
rb = _rb(min_ns)
|
|
|
|
|
|
final = ru + extra
|
|
|
|
|
|
return {
|
|
|
|
|
|
"min_ns": min_ns,
|
|
|
|
|
|
"max_ns": max_ns,
|
|
|
|
|
|
"round_best": rb,
|
|
|
|
|
|
"round_up": ru,
|
|
|
|
|
|
"extra": extra,
|
|
|
|
|
|
"final": final,
|
|
|
|
|
|
"actual_ns": final * byte_period_ns,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
fields: dict = {}
|
|
|
|
|
|
|
|
|
|
|
|
# LPX ≥ 50 ns
|
|
|
|
|
|
fields["lpx"] = _field("lpx", 50.0)
|
|
|
|
|
|
|
|
|
|
|
|
# HS-PREPARE: 40+4UI to 85+6UI ns
|
|
|
|
|
|
hs_p_min = 40.0 + 4 * ui_ns
|
|
|
|
|
|
hs_p_max = 85.0 + 6 * ui_ns
|
|
|
|
|
|
fields["hs_prepare"] = _field("hs_prepare", hs_p_min, hs_p_max)
|
|
|
|
|
|
hs_p_actual = fields["hs_prepare"]["actual_ns"]
|
|
|
|
|
|
|
|
|
|
|
|
# HS-ZERO: combined hs_prepare + hs_zero ≥ 145+10UI ns
|
|
|
|
|
|
hs_z_combined_min = 145.0 + 10 * ui_ns
|
|
|
|
|
|
hs_z_min = max(1.0, hs_z_combined_min - hs_p_actual)
|
|
|
|
|
|
fields["hs_zero"] = _field("hs_zero", hs_z_min)
|
|
|
|
|
|
fields["hs_zero"]["combined_min_ns"] = hs_z_combined_min
|
|
|
|
|
|
|
|
|
|
|
|
# HS-TRAIL ≥ max(8UI, 60+4UI) ns
|
|
|
|
|
|
fields["hs_trail"] = _field("hs_trail", max(8 * ui_ns, 60.0 + 4 * ui_ns))
|
|
|
|
|
|
|
|
|
|
|
|
# HS-EXIT ≥ 100 ns
|
|
|
|
|
|
fields["hs_exit"] = _field("hs_exit", 100.0)
|
|
|
|
|
|
|
|
|
|
|
|
# CLK-PREPARE: 38 to 95 ns
|
|
|
|
|
|
fields["clk_prepare"] = _field("clk_prepare", 38.0, 95.0)
|
|
|
|
|
|
clk_p_actual = fields["clk_prepare"]["actual_ns"]
|
|
|
|
|
|
|
|
|
|
|
|
# CLK-ZERO: combined clk_prepare + clk_zero ≥ 300 ns
|
|
|
|
|
|
clk_z_combined_min = 300.0
|
|
|
|
|
|
clk_z_min = max(1.0, clk_z_combined_min - clk_p_actual)
|
|
|
|
|
|
fields["clk_zero"] = _field("clk_zero", clk_z_min)
|
|
|
|
|
|
fields["clk_zero"]["combined_min_ns"] = clk_z_combined_min
|
|
|
|
|
|
|
|
|
|
|
|
# CLK-POST ≥ 60+52UI ns
|
|
|
|
|
|
fields["clk_post"] = _field("clk_post", 60.0 + 52 * ui_ns)
|
|
|
|
|
|
|
|
|
|
|
|
# CLK-TRAIL ≥ max(12UI, 60) ns
|
|
|
|
|
|
fields["clk_trail"] = _field("clk_trail", max(12 * ui_ns, 60.0))
|
|
|
|
|
|
|
|
|
|
|
|
# ── Register packing ──────────────────────────────────────────────────
|
|
|
|
|
|
def _f(name: str) -> int:
|
|
|
|
|
|
return fields[name]["final"]
|
|
|
|
|
|
|
|
|
|
|
|
phy_timing = (_f("lpx") << 8) | _f("hs_exit")
|
|
|
|
|
|
phy_timing1 = ((_f("clk_prepare") << 24) | (_f("clk_zero") << 16) |
|
|
|
|
|
|
(_f("clk_post") << 8) | _f("clk_trail"))
|
|
|
|
|
|
phy_timing2 = ((_f("hs_prepare") << 16) | (_f("hs_zero") << 8) | _f("hs_trail"))
|
|
|
|
|
|
|
|
|
|
|
|
registers = {
|
|
|
|
|
|
"PHY_TIMING": {"addr": 0xb4, "value": phy_timing},
|
|
|
|
|
|
"PHY_TIMING1": {"addr": 0xb8, "value": phy_timing1},
|
|
|
|
|
|
"PHY_TIMING2": {"addr": 0xbc, "value": phy_timing2},
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
# ── Constraint checks ─────────────────────────────────────────────────
|
|
|
|
|
|
violations: list[str] = []
|
|
|
|
|
|
|
|
|
|
|
|
def _chk(name: str, lo: float, hi: float | None = None) -> None:
|
|
|
|
|
|
a = fields[name]["actual_ns"]
|
|
|
|
|
|
if a < lo:
|
|
|
|
|
|
violations.append(f"{name}: {a:.2f} ns < {lo:.2f} ns (min)")
|
|
|
|
|
|
if hi is not None and a > hi:
|
|
|
|
|
|
violations.append(f"{name}: {a:.2f} ns > {hi:.2f} ns (max)")
|
|
|
|
|
|
|
|
|
|
|
|
_chk("lpx", 50.0)
|
|
|
|
|
|
_chk("hs_prepare", hs_p_min, hs_p_max)
|
|
|
|
|
|
hs_combined = fields["hs_prepare"]["actual_ns"] + fields["hs_zero"]["actual_ns"]
|
|
|
|
|
|
if hs_combined < hs_z_combined_min:
|
|
|
|
|
|
violations.append(
|
|
|
|
|
|
f"hs_prepare+hs_zero: {hs_combined:.2f} ns < {hs_z_combined_min:.2f} ns (min)")
|
|
|
|
|
|
_chk("hs_trail", max(8 * ui_ns, 60.0 + 4 * ui_ns))
|
|
|
|
|
|
_chk("hs_exit", 100.0)
|
|
|
|
|
|
_chk("clk_prepare", 38.0, 95.0)
|
|
|
|
|
|
clk_combined = fields["clk_prepare"]["actual_ns"] + fields["clk_zero"]["actual_ns"]
|
|
|
|
|
|
if clk_combined < clk_z_combined_min:
|
|
|
|
|
|
violations.append(
|
|
|
|
|
|
f"clk_prepare+clk_zero: {clk_combined:.2f} ns < {clk_z_combined_min:.2f} ns (min)")
|
|
|
|
|
|
_chk("clk_post", 60.0 + 52 * ui_ns)
|
|
|
|
|
|
_chk("clk_trail", max(12 * ui_ns, 60.0))
|
|
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
|
"pixel_clock_mhz": pixel_clock_mhz,
|
|
|
|
|
|
"bit_rate_mbps": bit_rate_mbps,
|
|
|
|
|
|
"byte_clock_mhz": byte_clock_mhz,
|
|
|
|
|
|
"byte_period_ns": byte_period_ns,
|
|
|
|
|
|
"ui_ns": ui_ns,
|
|
|
|
|
|
"fields": fields,
|
|
|
|
|
|
"registers": registers,
|
|
|
|
|
|
"violations": violations,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_timing_table(t: dict) -> str:
|
|
|
|
|
|
"""Return a fixed-width console table of all D-PHY timing fields."""
|
|
|
|
|
|
fields = t["fields"]
|
|
|
|
|
|
hdr = (f"{'Field':<14} {'Spec (ns)':<18} {'Rnd Best':>9} "
|
|
|
|
|
|
f"{'Rnd Up':>7} {'Extra':>6} {'Final':>6} {'Actual (ns)':>12} Status")
|
|
|
|
|
|
sep = "-" * len(hdr)
|
|
|
|
|
|
lines = [sep, hdr, sep]
|
|
|
|
|
|
|
|
|
|
|
|
for name in _TIMING_FIELD_ORDER:
|
|
|
|
|
|
f = fields[name]
|
|
|
|
|
|
min_ns = f["min_ns"]
|
|
|
|
|
|
max_ns = f.get("max_ns")
|
|
|
|
|
|
actual = f["actual_ns"]
|
|
|
|
|
|
|
|
|
|
|
|
spec_str = (f"{min_ns:.1f} – {max_ns:.1f}" if max_ns is not None
|
|
|
|
|
|
else f">= {min_ns:.1f}")
|
|
|
|
|
|
|
|
|
|
|
|
ok = actual >= min_ns
|
|
|
|
|
|
if max_ns is not None and actual > max_ns:
|
|
|
|
|
|
ok = False
|
|
|
|
|
|
if name == "hs_zero":
|
|
|
|
|
|
comb = fields["hs_prepare"]["actual_ns"] + actual
|
|
|
|
|
|
ok = ok and (comb >= f.get("combined_min_ns", 0))
|
|
|
|
|
|
if name == "clk_zero":
|
|
|
|
|
|
comb = fields["clk_prepare"]["actual_ns"] + actual
|
|
|
|
|
|
ok = ok and (comb >= f.get("combined_min_ns", 0))
|
|
|
|
|
|
status = "OK" if ok else "FAIL"
|
|
|
|
|
|
|
|
|
|
|
|
lines.append(
|
|
|
|
|
|
f"{name:<14} {spec_str:<18} {f['round_best']:>9} "
|
|
|
|
|
|
f"{f['round_up']:>7} +{f['extra']:>5} {f['final']:>6} {actual:>12.2f} {status}"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
lines.append(sep)
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_uboot_commands(t: dict) -> str:
|
|
|
|
|
|
"""Return u-boot shell commands to apply the computed PHY timing configuration."""
|
|
|
|
|
|
fields = t["fields"]
|
|
|
|
|
|
regs = t["registers"]
|
|
|
|
|
|
fv = {name: fields[name]["final"] for name in _TIMING_FIELD_ORDER}
|
|
|
|
|
|
|
|
|
|
|
|
phy_t = regs["PHY_TIMING"]["value"]
|
|
|
|
|
|
phy_t1 = regs["PHY_TIMING1"]["value"]
|
|
|
|
|
|
phy_t2 = regs["PHY_TIMING2"]["value"]
|
|
|
|
|
|
|
|
|
|
|
|
non_zero_extras = [
|
|
|
|
|
|
(_EXTRA_PROP_MAP[name], fields[name]["extra"])
|
|
|
|
|
|
for name in _TIMING_FIELD_ORDER
|
|
|
|
|
|
if fields[name]["extra"] > 0
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
lines = [
|
|
|
|
|
|
f"# D-PHY PHY timing registers "
|
|
|
|
|
|
f"(pixel clock {t['pixel_clock_mhz']} MHz, "
|
|
|
|
|
|
f"{t['bit_rate_mbps']:.1f} Mbit/s, "
|
|
|
|
|
|
f"byte clock {t['byte_clock_mhz']:.3f} MHz)",
|
|
|
|
|
|
"#",
|
|
|
|
|
|
f"# PHY_TIMING (0xb4) = 0x{phy_t:08x} "
|
|
|
|
|
|
f"lpx={fv['lpx']} hs_exit={fv['hs_exit']}",
|
|
|
|
|
|
f"# PHY_TIMING1 (0xb8) = 0x{phy_t1:08x} "
|
|
|
|
|
|
f"clk_prepare={fv['clk_prepare']} clk_zero={fv['clk_zero']} "
|
|
|
|
|
|
f"clk_post={fv['clk_post']} clk_trail={fv['clk_trail']}",
|
|
|
|
|
|
f"# PHY_TIMING2 (0xbc) = 0x{phy_t2:08x} "
|
|
|
|
|
|
f"hs_prepare={fv['hs_prepare']} hs_zero={fv['hs_zero']} "
|
|
|
|
|
|
f"hs_trail={fv['hs_trail']}",
|
|
|
|
|
|
"",
|
|
|
|
|
|
"# Enable Round-Up rounding (dsi-tweak bit 2)",
|
|
|
|
|
|
'setenv flb_dtovar "${flb_dtovar} dsi-tweak=4"',
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
if non_zero_extras:
|
|
|
|
|
|
lines += ["", "# Extra PHY cycles above Round-Up minimum"]
|
|
|
|
|
|
for prop, val in non_zero_extras:
|
|
|
|
|
|
lines.append(f'setenv flb_dtovar "${{flb_dtovar}} {prop}={val}"')
|
|
|
|
|
|
|
|
|
|
|
|
lines += ["", "saveenv", "boot"]
|
|
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def prompt_for_config() -> dict:
|
|
|
|
|
|
"""
|
|
|
|
|
|
Interactive startup prompt: ask for pixel clock and optional extra PHY cycles.
|
|
|
|
|
|
Prints timing table and u-boot commands, then returns a config dict.
|
|
|
|
|
|
"""
|
|
|
|
|
|
print("\n" + "=" * 64)
|
|
|
|
|
|
print(" D-PHY TIMING CONFIGURATION")
|
|
|
|
|
|
print("=" * 64)
|
|
|
|
|
|
print("RGB888, 4 DSI lanes, Samsung DSIM on NXP i.MX 8M Mini.")
|
|
|
|
|
|
print("Timing constraints: MIPI D-PHY v1.1 Table 14.\n")
|
|
|
|
|
|
|
|
|
|
|
|
# ── Pixel clock ───────────────────────────────────────────────────────
|
|
|
|
|
|
while True:
|
|
|
|
|
|
try:
|
|
|
|
|
|
pix_str = input("Enter pixel clock in MHz (e.g. 72.0): ").strip()
|
|
|
|
|
|
pixel_clock_mhz = float(pix_str)
|
|
|
|
|
|
if not (10.0 <= pixel_clock_mhz <= 300.0):
|
|
|
|
|
|
print(" Out of range — enter a value between 10 and 300.")
|
|
|
|
|
|
continue
|
|
|
|
|
|
break
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
print(" Invalid — enter a number (e.g. 72.0).")
|
|
|
|
|
|
|
|
|
|
|
|
# ── Baseline (no extras) ──────────────────────────────────────────────
|
|
|
|
|
|
t_base = calculate_dphy_timing(pixel_clock_mhz)
|
|
|
|
|
|
print(f"\nPixel clock : {pixel_clock_mhz} MHz")
|
|
|
|
|
|
print(f"Bit rate : {t_base['bit_rate_mbps']:.1f} Mbit/s per lane")
|
|
|
|
|
|
print(f"Byte clock : {t_base['byte_clock_mhz']:.3f} MHz "
|
|
|
|
|
|
f"({t_base['byte_period_ns']:.3f} ns/byte)")
|
|
|
|
|
|
print(f"UI : {t_base['ui_ns']:.3f} ns\n")
|
|
|
|
|
|
print(format_timing_table(t_base))
|
|
|
|
|
|
|
|
|
|
|
|
if t_base["violations"]:
|
|
|
|
|
|
print("\n *** BASELINE VIOLATIONS ***")
|
|
|
|
|
|
for v in t_base["violations"]:
|
|
|
|
|
|
print(f" ! {v}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print("\n All D-PHY v1.1 Table 14 constraints satisfied at baseline.")
|
|
|
|
|
|
|
|
|
|
|
|
# ── Extra cycles ──────────────────────────────────────────────────────
|
|
|
|
|
|
print("\n--- Extra PHY cycles (added on top of Round-Up minimum) ---")
|
|
|
|
|
|
print("Suggested values for SN65DSI83 reliability (from dev testing):")
|
|
|
|
|
|
print(" clk_zero +3, hs_prepare +1, hs_trail +1 (others 0)")
|
|
|
|
|
|
print("Press Enter to accept 0 for each field.\n")
|
|
|
|
|
|
|
|
|
|
|
|
extras: dict[str, int] = {}
|
|
|
|
|
|
for name in _TIMING_FIELD_ORDER:
|
|
|
|
|
|
while True:
|
|
|
|
|
|
try:
|
|
|
|
|
|
val = input(f" Extra cycles for {name:<12s} [0]: ").strip()
|
|
|
|
|
|
extras[name] = int(val) if val else 0
|
|
|
|
|
|
if extras[name] < 0:
|
|
|
|
|
|
print(" Cannot be negative.")
|
|
|
|
|
|
continue
|
|
|
|
|
|
break
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
print(" Enter an integer (or press Enter for 0).")
|
|
|
|
|
|
|
|
|
|
|
|
# ── Final calculation with extras ─────────────────────────────────────
|
|
|
|
|
|
t_final = calculate_dphy_timing(pixel_clock_mhz, extras)
|
|
|
|
|
|
|
|
|
|
|
|
if any(v > 0 for v in extras.values()):
|
|
|
|
|
|
print("\n--- Final timing (with extras) ---")
|
|
|
|
|
|
print(format_timing_table(t_final))
|
|
|
|
|
|
if t_final["violations"]:
|
|
|
|
|
|
print("\n *** VIOLATIONS WITH EXTRAS ***")
|
|
|
|
|
|
for v in t_final["violations"]:
|
|
|
|
|
|
print(f" ! {v}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print("\n All constraints satisfied.")
|
|
|
|
|
|
|
|
|
|
|
|
print("\n--- u-boot commands ---")
|
|
|
|
|
|
print(format_uboot_commands(t_final))
|
|
|
|
|
|
print()
|
|
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
|
"pixel_clock_mhz": pixel_clock_mhz,
|
|
|
|
|
|
"extras": extras,
|
|
|
|
|
|
"timing": t_final,
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Instrument connection
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
psu = vxi11.Instrument(PSU_IP)
|
|
|
|
|
|
scope = vxi11.Instrument(SCOPE_IP)
|
|
|
|
|
|
scope.timeout = 30
|
|
|
|
|
|
psu.timeout = 5
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}")
|
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
|
|
rigol_scope.connect()
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Scope configuration (identical to mipi_test.py)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def setup_scope():
|
|
|
|
|
|
"""Initialises scope for MIPI DSI signals (~210 MHz)."""
|
|
|
|
|
|
print("CONFIGURING SCOPE...")
|
|
|
|
|
|
cmds = [
|
|
|
|
|
|
"*RST", ":RUN", ":STOP",
|
|
|
|
|
|
# Channel 1 — Clock D+
|
|
|
|
|
|
":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", ":CHANnel1:PROBe 19.2",
|
|
|
|
|
|
":CHANnel1:SCALe 0.1", ":CHANnel1:OFFSet 0.0", ":CHANnel1:LABel 'CLK+'",
|
|
|
|
|
|
# Channel 2 — Clock D-
|
|
|
|
|
|
":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", ":CHANnel2:PROBe 19.2",
|
|
|
|
|
|
":CHANnel2:SCALe 0.1", ":CHANnel2:OFFSet 0.0", ":CHANnel2:LABel 'CLK-'",
|
|
|
|
|
|
# Channel 3 — Data Lane 0 D+
|
|
|
|
|
|
":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50", ":CHANnel3:PROBe 19.2",
|
|
|
|
|
|
":CHANnel3:SCALe 0.1", ":CHANnel3:OFFSet 0.0", ":CHANnel3:LABel 'DAT0+'",
|
|
|
|
|
|
# Channel 4 — Data Lane 0 D-
|
|
|
|
|
|
":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50", ":CHANnel4:PROBe 19.2",
|
|
|
|
|
|
":CHANnel4:SCALe 0.1", ":CHANnel4:OFFSet 0.0", ":CHANnel4:LABel 'DAT0-'",
|
|
|
|
|
|
# Timebase
|
|
|
|
|
|
":TIMebase:SCALe 5E-9", ":TIMebase:POSition 0", ":TIMebase:REFerence CENTer",
|
|
|
|
|
|
# Trigger — rising edge on Ch1 (Clock D+)
|
|
|
|
|
|
":TRIGger:MODE EDGE", ":TRIGger:EDGE:SOURce CHANnel1",
|
|
|
|
|
|
":TRIGger:EDGE:SLOPe POSitive", ":TRIGger:EDGE:LEVel 0.05",
|
|
|
|
|
|
":TRIGger:SWEep NORMal",
|
|
|
|
|
|
# Acquisition
|
|
|
|
|
|
":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON", ":ACQuire:POINts 500000",
|
|
|
|
|
|
":DISPlay:LAYout STACKED",
|
|
|
|
|
|
":RUN",
|
|
|
|
|
|
]
|
|
|
|
|
|
for cmd in cmds:
|
|
|
|
|
|
scope.write(cmd)
|
|
|
|
|
|
time.sleep(0.05)
|
|
|
|
|
|
print("CHANNEL SETUP COMPLETE.")
|
|
|
|
|
|
setup_math_channels()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def setup_math_channels():
|
|
|
|
|
|
"""F1 = Ch1−Ch2 (clock differential), F2 = Ch3−Ch4 (lane 0 differential)."""
|
|
|
|
|
|
print("SETTING UP MATH CHANNELS...")
|
|
|
|
|
|
scope.write("*CLS")
|
|
|
|
|
|
time.sleep(0.2)
|
|
|
|
|
|
for cmd in [
|
|
|
|
|
|
":FUNCtion1:DISPlay ON", ":FUNCtion1:SUBTract CHANnel1,CHANnel2",
|
|
|
|
|
|
":FUNCtion1:RANGe 0.8", ":FUNCtion1:OFFSet 0.0",
|
|
|
|
|
|
":FUNCtion2:DISPlay ON", ":FUNCtion2:SUBTract CHANnel3,CHANnel4",
|
|
|
|
|
|
":FUNCtion2:RANGe 0.8", ":FUNCtion2:OFFSet 0.0",
|
|
|
|
|
|
]:
|
|
|
|
|
|
scope.write(cmd)
|
|
|
|
|
|
time.sleep(0.2)
|
|
|
|
|
|
try:
|
|
|
|
|
|
time.sleep(1.0)
|
|
|
|
|
|
opc = scope.ask("*OPC?")
|
|
|
|
|
|
print(f" SCOPE SYNC OK (OPC={opc.strip()})")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" WARNING: OPC SYNC FAILED ({e})")
|
|
|
|
|
|
try:
|
|
|
|
|
|
err = scope.ask(":SYSTem:ERRor?")
|
|
|
|
|
|
if err.strip().startswith("0"):
|
|
|
|
|
|
print(" MATH COMMANDS ACCEPTED — NO SCPI ERRORS.")
|
|
|
|
|
|
print(" F1 = CLK DIFF (CH1-CH2), F2 = DAT DIFF (CH3-CH4)")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f" SCPI ERROR: {err.strip()}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" COULD NOT READ ERROR QUEUE ({e})")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _set_timebase(scale, points):
|
|
|
|
|
|
scope.write(f":TIMebase:SCALe {scale:.3E}")
|
|
|
|
|
|
scope.write(f":ACQuire:POINts {points}")
|
|
|
|
|
|
time.sleep(0.3)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _arm_and_wait(timeout=20):
|
|
|
|
|
|
prev_timeout = scope.timeout
|
|
|
|
|
|
try:
|
|
|
|
|
|
scope.timeout = timeout + 5
|
|
|
|
|
|
scope.write(":DIGitize")
|
|
|
|
|
|
return scope.ask("*OPC?").strip() == "1"
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" ACQUIRE ERROR: {e}")
|
|
|
|
|
|
return False
|
|
|
|
|
|
finally:
|
|
|
|
|
|
scope.timeout = prev_timeout
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _save_pass(tag, iteration, ts):
|
|
|
|
|
|
"""Save F1 (CLK diff) and F2 (DAT diff) as CSV."""
|
|
|
|
|
|
base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
|
|
|
|
|
|
try:
|
|
|
|
|
|
scope.write(f':DISK:SAVE:WAVeform FUNCtion1,"{base}_clk.csv",CSV')
|
|
|
|
|
|
time.sleep(2.5)
|
|
|
|
|
|
scope.write(f':DISK:SAVE:WAVeform FUNCtion2,"{base}_dat.csv",CSV')
|
|
|
|
|
|
time.sleep(2.5)
|
|
|
|
|
|
print(f" SAVED: {base}_clk.csv {base}_dat.csv")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" SAVE ERROR ({tag}): {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _save_pass_channels(tag, iteration, ts):
|
|
|
|
|
|
"""Save Ch1 (CLK+) and Ch3 (DAT0+) single-ended for LP state analysis."""
|
|
|
|
|
|
base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
|
|
|
|
|
|
try:
|
|
|
|
|
|
scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.csv",CSV')
|
|
|
|
|
|
time.sleep(2.5)
|
|
|
|
|
|
scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.csv",CSV')
|
|
|
|
|
|
time.sleep(2.5)
|
|
|
|
|
|
print(f" SAVED: {base}_clk.csv {base}_dat.csv")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" SAVE ERROR ({tag}): {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _configure_for_lp():
|
|
|
|
|
|
"""Widen vertical range for LP states and switch to falling-edge trigger."""
|
|
|
|
|
|
for ch in (1, 2, 3, 4):
|
|
|
|
|
|
scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}")
|
|
|
|
|
|
scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}")
|
|
|
|
|
|
time.sleep(0.05)
|
|
|
|
|
|
scope.write(":TRIGger:EDGE:SOURce CHANnel3")
|
|
|
|
|
|
scope.write(":TRIGger:EDGE:SLOPe NEGative")
|
|
|
|
|
|
scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}")
|
|
|
|
|
|
time.sleep(0.1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _restore_hs_config():
|
|
|
|
|
|
"""Restore HS-mode channel scales and trigger after LP capture."""
|
|
|
|
|
|
for ch in (1, 2, 3, 4):
|
|
|
|
|
|
scope.write(f":CHANnel{ch}:SCALe 0.1")
|
|
|
|
|
|
scope.write(f":CHANnel{ch}:OFFSet 0.0")
|
|
|
|
|
|
time.sleep(0.05)
|
|
|
|
|
|
scope.write(":TRIGger:EDGE:SOURce CHANnel1")
|
|
|
|
|
|
scope.write(":TRIGger:EDGE:SLOPe POSitive")
|
|
|
|
|
|
scope.write(":TRIGger:EDGE:LEVel 0.05")
|
|
|
|
|
|
time.sleep(0.1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _fetch_registers(ts: str, iteration: int) -> None:
|
|
|
|
|
|
"""GET /registers from device server and save to data/ as JSON."""
|
|
|
|
|
|
try:
|
|
|
|
|
|
resp = requests.get(f"{DEVICE_BASE}/registers", timeout=5)
|
|
|
|
|
|
resp.raise_for_status()
|
|
|
|
|
|
data = resp.json()
|
|
|
|
|
|
if data.get("errors"):
|
|
|
|
|
|
print(f" REGISTERS: device warnings — {data['errors']}")
|
|
|
|
|
|
DATA_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
reg_path = DATA_DIR / f"{ts}_reg_{iteration:04d}.json"
|
|
|
|
|
|
reg_path.write_text(json.dumps(data, indent=2))
|
|
|
|
|
|
print(f" SAVED: {reg_path.name} ({len(data.get('registers', []))} registers)")
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
|
print(f" REGISTERS: fetch failed — {e}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" REGISTERS: error — {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dual_capture(iteration: int) -> str:
|
|
|
|
|
|
"""
|
|
|
|
|
|
Three-pass capture per test iteration.
|
|
|
|
|
|
|
|
|
|
|
|
Pass 1 — LP / SoT startup (no settle delay — fires immediately after display ON)
|
|
|
|
|
|
Pass 2 — signal quality (HS differential, rise/fall)
|
|
|
|
|
|
Pass 3 — frame structure (HS differential, jitter/freq)
|
|
|
|
|
|
|
|
|
|
|
|
Returns the timestamp string used in all filenames for this iteration.
|
|
|
|
|
|
"""
|
|
|
|
|
|
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
|
|
print(f"CAPTURE #{iteration:04d} [{ts}]")
|
|
|
|
|
|
|
|
|
|
|
|
# ── Pass 1: LP / SoT startup ───────────────────────────────────────────
|
|
|
|
|
|
print(" PASS 1: LP STARTUP TRANSITION...")
|
|
|
|
|
|
_configure_for_lp()
|
|
|
|
|
|
_set_timebase(LP_SCALE, LP_POINTS)
|
|
|
|
|
|
if rigol_scope.is_connected():
|
|
|
|
|
|
rigol_scope.arm()
|
|
|
|
|
|
if _arm_and_wait(timeout=30):
|
|
|
|
|
|
_save_pass_channels("lp", iteration, ts)
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(" SKIPPING LP SAVE.")
|
|
|
|
|
|
if rigol_scope.is_connected():
|
|
|
|
|
|
DATA_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv"
|
|
|
|
|
|
n = rigol_scope.read_waveform_csv(v18_path)
|
|
|
|
|
|
if n:
|
|
|
|
|
|
print(f" SAVED: {v18_path.name} ({n} samples)")
|
|
|
|
|
|
else:
|
2026-04-20 10:35:06 +01:00
|
|
|
|
print(" RIGOL: Waveform read failed — check connection and probe.")
|
2026-04-16 11:23:25 +01:00
|
|
|
|
_restore_hs_config()
|
|
|
|
|
|
|
|
|
|
|
|
# ── Pass 2: HS signal quality ──────────────────────────────────────────
|
|
|
|
|
|
print(" PASS 2: SIGNAL QUALITY...")
|
|
|
|
|
|
_set_timebase(SIG_SCALE, SIG_POINTS)
|
|
|
|
|
|
if _arm_and_wait():
|
|
|
|
|
|
_save_pass("sig", iteration, ts)
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(" SKIPPING SIG SAVE.")
|
|
|
|
|
|
|
|
|
|
|
|
# ── Pass 3: frame/protocol structure ──────────────────────────────────
|
|
|
|
|
|
print(" PASS 3: FRAME STRUCTURE...")
|
|
|
|
|
|
_set_timebase(PROTO_SCALE, PROTO_POINTS)
|
|
|
|
|
|
if _arm_and_wait():
|
|
|
|
|
|
_save_pass("proto", iteration, ts)
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(" SKIPPING PROTO SAVE.")
|
|
|
|
|
|
|
|
|
|
|
|
# ── DSI register snapshot ─────────────────────────────────────────────
|
|
|
|
|
|
_fetch_registers(ts, iteration)
|
|
|
|
|
|
|
|
|
|
|
|
# ── Restore default timebase ──────────────────────────────────────────
|
|
|
|
|
|
_set_timebase(5e-9, 500_000)
|
|
|
|
|
|
scope.write(":RUN")
|
|
|
|
|
|
return ts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Per-iteration LP analysis + Claude assessment
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def _build_system_prompt(config: dict | None = None) -> str:
|
|
|
|
|
|
"""Build the Claude system prompt with correct bit-rate values for the active config."""
|
|
|
|
|
|
if config and "timing" in config:
|
|
|
|
|
|
t = config["timing"]
|
|
|
|
|
|
bit_rate = t["bit_rate_mbps"]
|
|
|
|
|
|
byte_clk = t["byte_clock_mhz"]
|
|
|
|
|
|
byte_per = t["byte_period_ns"]
|
|
|
|
|
|
else:
|
|
|
|
|
|
bit_rate = 432.0
|
|
|
|
|
|
byte_clk = 54.0
|
|
|
|
|
|
byte_per = 18.518
|
|
|
|
|
|
return (
|
|
|
|
|
|
"You are an expert in MIPI D-PHY signal integrity analysis. "
|
|
|
|
|
|
"You will receive a pre-processed summary of a single LP capture from a MIPI DAT0 lane "
|
|
|
|
|
|
"(NXP i.MX 8M Mini Samsung DSIM IP driving a SN65DSI83 MIPI-to-LVDS bridge). "
|
|
|
|
|
|
f"HS bit rate: {bit_rate:.1f} Mbit/s. "
|
|
|
|
|
|
f"Byte clock: {byte_clk:.3f} MHz ({byte_per:.3f} ns/byte). "
|
|
|
|
|
|
"The LP-low plateau (LP-01/LP-00 SoT preamble) must be ≥ 50 ns for the SN65DSI83 "
|
|
|
|
|
|
"to detect start-of-transmission. A plateau shorter than 50 ns or absent means the "
|
|
|
|
|
|
"bridge missed the SoT and the display will flicker visibly. "
|
|
|
|
|
|
"Be decisive. Answer YES or NO on the first line."
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _build_claude_prompt(ts: str, iteration: int,
|
|
|
|
|
|
lp_summaries: list[str],
|
|
|
|
|
|
suspects: list[LPMetrics],
|
2026-04-20 10:34:42 +01:00
|
|
|
|
config: dict | None = None,
|
|
|
|
|
|
followup_summaries: list[str] | None = None) -> str:
|
2026-04-16 11:23:25 +01:00
|
|
|
|
"""
|
|
|
|
|
|
Build a concise prompt asking Claude to assess a single capture.
|
|
|
|
|
|
The rule-based pre-filter has already flagged at least one LP suspect.
|
2026-04-20 10:34:42 +01:00
|
|
|
|
If followup_summaries is provided it contains the next-frame LP capture taken
|
|
|
|
|
|
immediately after the suspect — the frame the operator will actually observe.
|
2026-04-16 11:23:25 +01:00
|
|
|
|
"""
|
|
|
|
|
|
suspect_lines = "\n".join(
|
|
|
|
|
|
f" channel={m.channel} lp_low_plateau={m.lp_low_duration_ns} ns "
|
|
|
|
|
|
f"(spec ≥ 50 ns) lp11_to_hs={m.lp11_to_hs_ns} ns "
|
|
|
|
|
|
f"lp11_voltage={m.lp11_voltage_v} V "
|
|
|
|
|
|
f"hs_amplitude={m.hs_amplitude_mv} mV (normal 105–122 mV; absent <50 mV)"
|
|
|
|
|
|
for m in suspects
|
|
|
|
|
|
)
|
|
|
|
|
|
summaries_text = "\n\n".join(lp_summaries)
|
|
|
|
|
|
|
|
|
|
|
|
config_text = ""
|
|
|
|
|
|
if config and "timing" in config:
|
|
|
|
|
|
t = config["timing"]
|
|
|
|
|
|
config_text = (
|
|
|
|
|
|
f"\n\nTest configuration: pixel clock {t['pixel_clock_mhz']} MHz, "
|
|
|
|
|
|
f"bit rate {t['bit_rate_mbps']:.1f} Mbit/s per lane, "
|
|
|
|
|
|
f"byte clock {t['byte_clock_mhz']:.3f} MHz "
|
|
|
|
|
|
f"({t['byte_period_ns']:.3f} ns/byte), UI {t['ui_ns']:.3f} ns."
|
|
|
|
|
|
)
|
|
|
|
|
|
|
2026-04-20 10:34:42 +01:00
|
|
|
|
followup_text = ""
|
|
|
|
|
|
if followup_summaries:
|
|
|
|
|
|
followup_text = (
|
|
|
|
|
|
f"\n\nFOLLOW-UP CAPTURE (next display frame — what the operator sees "
|
|
|
|
|
|
f"on screen while assessing):\n"
|
|
|
|
|
|
f"Note: due to the display pipeline lag, the visual flicker caused by "
|
|
|
|
|
|
f"the electrical event above appears one frame later. If the follow-up "
|
|
|
|
|
|
f"frame looks electrically normal, the flicker observed by the operator "
|
|
|
|
|
|
f"was caused by the preceding capture, not this one.\n\n"
|
|
|
|
|
|
+ "\n\n".join(followup_summaries)
|
|
|
|
|
|
)
|
|
|
|
|
|
|
2026-04-16 11:23:25 +01:00
|
|
|
|
return (
|
|
|
|
|
|
f"SINGLE-CAPTURE FLICKER ASSESSMENT — capture {iteration:04d} [{ts}]\n\n"
|
|
|
|
|
|
f"The rule-based LP pre-processor has flagged the following measurements as "
|
|
|
|
|
|
f"potential flicker suspects because the LP-low plateau is absent or shorter "
|
|
|
|
|
|
f"than 50 ns:\n{suspect_lines}\n\n"
|
|
|
|
|
|
f"Full LP capture summaries:\n{summaries_text}"
|
2026-04-20 10:34:42 +01:00
|
|
|
|
f"{config_text}"
|
|
|
|
|
|
f"{followup_text}\n\n"
|
2026-04-16 11:23:25 +01:00
|
|
|
|
f"Based solely on these LP timing metrics, do you believe this capture "
|
|
|
|
|
|
f"represents a genuine screen flicker event — i.e., was the SoT sequence "
|
|
|
|
|
|
f"too brief for the SN65DSI83 bridge to detect start-of-transmission, "
|
|
|
|
|
|
f"likely causing visible display flicker?\n\n"
|
|
|
|
|
|
f"Start your response with YES or NO on the first line, then explain your "
|
|
|
|
|
|
f"reasoning briefly (2–4 sentences) referencing the specific metric values."
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None:
|
|
|
|
|
|
"""Append a flicker suspect to the shared flicker_log.csv."""
|
|
|
|
|
|
FLICKER_LOG.parent.mkdir(exist_ok=True)
|
|
|
|
|
|
write_header = not FLICKER_LOG.exists()
|
|
|
|
|
|
with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as f:
|
|
|
|
|
|
w = _csv_mod.writer(f)
|
|
|
|
|
|
if write_header:
|
|
|
|
|
|
w.writerow(["logged_at", "capture_ts", "capture_num", "channel",
|
|
|
|
|
|
"lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v"])
|
|
|
|
|
|
w.writerow([
|
|
|
|
|
|
datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
|
|
|
ts, f"{iteration:04d}", m.channel,
|
|
|
|
|
|
m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v,
|
|
|
|
|
|
])
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-04-20 10:34:42 +01:00
|
|
|
|
def _analyze_lp_files(
|
|
|
|
|
|
ts: str, iteration: int
|
|
|
|
|
|
) -> tuple[list[str], list[LPMetrics]]:
|
2026-04-16 11:23:25 +01:00
|
|
|
|
"""
|
2026-04-20 10:34:42 +01:00
|
|
|
|
Run rule-based LP analysis for one iteration.
|
|
|
|
|
|
Returns (lp_summaries, suspects). Logs suspects and prints alerts.
|
|
|
|
|
|
Does NOT call Claude.
|
2026-04-16 11:23:25 +01:00
|
|
|
|
"""
|
|
|
|
|
|
lp_summaries: list[str] = []
|
|
|
|
|
|
suspects: list[LPMetrics] = []
|
|
|
|
|
|
|
|
|
|
|
|
for channel in ("clk", "dat"):
|
|
|
|
|
|
path = DATA_DIR / f"{ts}_lp_{iteration:04d}_{channel}.csv"
|
|
|
|
|
|
if not path.exists():
|
|
|
|
|
|
print(f" LP ANALYSIS: {path.name} not found — skipping.")
|
|
|
|
|
|
continue
|
|
|
|
|
|
try:
|
|
|
|
|
|
m = analyze_lp_file(path)
|
|
|
|
|
|
lp_summaries.append(m.summary())
|
|
|
|
|
|
if m.flicker_suspect:
|
|
|
|
|
|
suspects.append(m)
|
|
|
|
|
|
_append_flicker_log(ts, iteration, m)
|
2026-04-20 10:34:42 +01:00
|
|
|
|
if not m.lp_transition_valid and not m.lp11_voltage_v:
|
|
|
|
|
|
reason = "MIPI link silent (no LP-11/LP-low/HS detected)"
|
|
|
|
|
|
elif (m.hs_amplitude_mv is not None
|
2026-04-16 11:23:25 +01:00
|
|
|
|
and m.hs_amplitude_mv < HS_BURST_AMPLITUDE_MIN_MV
|
2026-04-16 12:08:00 +01:00
|
|
|
|
and m.lp11_to_hs_ns is not None
|
|
|
|
|
|
and m.lp11_to_hs_ns >= FLICKER_LP_LOW_MAX_NS):
|
2026-04-20 10:34:42 +01:00
|
|
|
|
reason = (f"HS burst absent ({m.hs_amplitude_mv:.0f} mV, "
|
|
|
|
|
|
f"lp11_to_hs={m.lp11_to_hs_ns:.0f} ns)")
|
2026-04-16 11:23:25 +01:00
|
|
|
|
else:
|
|
|
|
|
|
reason = f"lp_low={m.lp_low_duration_ns} ns"
|
|
|
|
|
|
print(f"\n *** FLICKER SUSPECT: capture {iteration:04d} "
|
|
|
|
|
|
f"channel={m.channel} {reason} ***\n")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" LP ANALYSIS ERROR ({channel}): {e}")
|
|
|
|
|
|
|
2026-04-20 10:34:42 +01:00
|
|
|
|
return lp_summaries, suspects
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _lp_followup_capture(iteration: int) -> tuple[str, list[str], list[LPMetrics]]:
|
|
|
|
|
|
"""
|
|
|
|
|
|
LP-only follow-up capture taken immediately after a suspect is detected.
|
|
|
|
|
|
Captures the next display frame — the one the operator will actually see
|
|
|
|
|
|
when asked to assess whether the screen is flickering.
|
|
|
|
|
|
|
|
|
|
|
|
Returns (ts_followup, lp_summaries, suspects).
|
|
|
|
|
|
Returns ("", [], []) silently if the scope is not reachable.
|
|
|
|
|
|
"""
|
|
|
|
|
|
print(" FOLLOW-UP CAPTURE: acquiring next frame for display-lag context...")
|
|
|
|
|
|
try:
|
|
|
|
|
|
ts_fu = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
|
|
_configure_for_lp()
|
|
|
|
|
|
_set_timebase(LP_SCALE, LP_POINTS)
|
|
|
|
|
|
if rigol_scope.is_connected():
|
|
|
|
|
|
rigol_scope.arm()
|
|
|
|
|
|
if _arm_and_wait(timeout=10):
|
|
|
|
|
|
_save_pass_channels("lp", iteration, ts_fu)
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(" FOLLOW-UP: trigger timeout — skipping.")
|
|
|
|
|
|
_restore_hs_config()
|
|
|
|
|
|
return "", [], []
|
|
|
|
|
|
_restore_hs_config()
|
2026-04-16 11:23:25 +01:00
|
|
|
|
|
2026-04-20 10:34:42 +01:00
|
|
|
|
# Transfer the new files from scope
|
|
|
|
|
|
try:
|
|
|
|
|
|
copied, _ = ai_mgmt.transfer_csv_files()
|
|
|
|
|
|
print(f" FOLLOW-UP: {copied} file(s) transferred.")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" FOLLOW-UP TRANSFER ERROR: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
summaries, suspects = _analyze_lp_files(ts_fu, iteration)
|
|
|
|
|
|
return ts_fu, summaries, suspects
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" FOLLOW-UP CAPTURE ERROR: {e}")
|
|
|
|
|
|
return "", [], []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _call_claude(
|
|
|
|
|
|
ts: str, iteration: int,
|
|
|
|
|
|
lp_summaries: list[str],
|
|
|
|
|
|
suspects: list[LPMetrics],
|
|
|
|
|
|
config: dict | None = None,
|
|
|
|
|
|
followup_summaries: list[str] | None = None,
|
|
|
|
|
|
) -> tuple[bool, str]:
|
|
|
|
|
|
"""
|
|
|
|
|
|
Call the Claude API to assess whether the flagged capture is a flicker event.
|
|
|
|
|
|
Returns (claude_says_flicker, response_text).
|
|
|
|
|
|
"""
|
2026-04-16 11:23:25 +01:00
|
|
|
|
print(" CALLING CLAUDE FOR ASSESSMENT...")
|
|
|
|
|
|
try:
|
|
|
|
|
|
client = anthropic.Anthropic()
|
|
|
|
|
|
message = client.messages.create(
|
|
|
|
|
|
model = CLAUDE_MODEL,
|
2026-04-20 10:34:42 +01:00
|
|
|
|
max_tokens = 600,
|
2026-04-16 11:23:25 +01:00
|
|
|
|
system = _build_system_prompt(config),
|
|
|
|
|
|
messages = [{"role": "user", "content":
|
|
|
|
|
|
_build_claude_prompt(ts, iteration, lp_summaries,
|
2026-04-20 10:34:42 +01:00
|
|
|
|
suspects, config,
|
|
|
|
|
|
followup_summaries)}],
|
2026-04-16 11:23:25 +01:00
|
|
|
|
)
|
|
|
|
|
|
response = message.content[0].text.strip()
|
|
|
|
|
|
first_line = response.splitlines()[0].strip().upper()
|
|
|
|
|
|
claude_says_flicker = first_line.startswith("YES")
|
|
|
|
|
|
label = "FLICKER" if claude_says_flicker else "NOT FLICKER"
|
|
|
|
|
|
print(f" CLAUDE: {label} ({message.usage.input_tokens} in / "
|
|
|
|
|
|
f"{message.usage.output_tokens} out tokens)")
|
2026-04-20 10:34:42 +01:00
|
|
|
|
return claude_says_flicker, response
|
2026-04-16 11:23:25 +01:00
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" CLAUDE API ERROR: {e}")
|
|
|
|
|
|
fallback = (f"(Claude API unavailable: {e})\n"
|
|
|
|
|
|
f"Rule-based detector flagged LP-low plateau < 50 ns — "
|
|
|
|
|
|
f"treat as potential flicker suspect.")
|
2026-04-20 10:34:42 +01:00
|
|
|
|
return True, fallback
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_lp_and_ask_claude(
|
|
|
|
|
|
ts: str, iteration: int, config: dict | None = None
|
|
|
|
|
|
) -> tuple[bool, str, list[LPMetrics]]:
|
|
|
|
|
|
"""
|
|
|
|
|
|
Analyse the LP files for this iteration (rule-based + Claude if suspect).
|
|
|
|
|
|
Kept for backwards compatibility — used by report generation and tests.
|
|
|
|
|
|
Does not perform a follow-up capture; call the test loop directly for that.
|
|
|
|
|
|
"""
|
|
|
|
|
|
lp_summaries, suspects = _analyze_lp_files(ts, iteration)
|
|
|
|
|
|
if not suspects:
|
|
|
|
|
|
return False, "", []
|
|
|
|
|
|
claude_flicker, response = _call_claude(ts, iteration, lp_summaries,
|
|
|
|
|
|
suspects, config)
|
|
|
|
|
|
return claude_flicker, response, suspects
|
2026-04-16 11:23:25 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Event logging and HTML report
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def _config_section_html(config: dict) -> str:
|
|
|
|
|
|
"""Generate the D-PHY configuration section for the HTML report."""
|
|
|
|
|
|
t = config["timing"]
|
|
|
|
|
|
fields = t["fields"]
|
|
|
|
|
|
regs = t["registers"]
|
|
|
|
|
|
|
|
|
|
|
|
rows_html = ""
|
|
|
|
|
|
for name in _TIMING_FIELD_ORDER:
|
|
|
|
|
|
f = fields[name]
|
|
|
|
|
|
min_ns = f["min_ns"]
|
|
|
|
|
|
max_ns = f.get("max_ns")
|
|
|
|
|
|
actual = f["actual_ns"]
|
|
|
|
|
|
|
|
|
|
|
|
ok = actual >= min_ns
|
|
|
|
|
|
if max_ns is not None and actual > max_ns:
|
|
|
|
|
|
ok = False
|
|
|
|
|
|
if name == "hs_zero":
|
|
|
|
|
|
comb = fields["hs_prepare"]["actual_ns"] + actual
|
|
|
|
|
|
ok = ok and (comb >= f.get("combined_min_ns", 0))
|
|
|
|
|
|
if name == "clk_zero":
|
|
|
|
|
|
comb = fields["clk_prepare"]["actual_ns"] + actual
|
|
|
|
|
|
ok = ok and (comb >= f.get("combined_min_ns", 0))
|
|
|
|
|
|
|
|
|
|
|
|
spec_str = (f"{min_ns:.1f} – {max_ns:.1f}" if max_ns is not None
|
|
|
|
|
|
else f"≥ {min_ns:.1f}")
|
|
|
|
|
|
cell_style = "" if ok else ' style="color:#c62828;font-weight:bold"'
|
|
|
|
|
|
status_html = "✓" if ok else '<span style="color:#c62828">✖ FAIL</span>'
|
|
|
|
|
|
|
|
|
|
|
|
rows_html += (
|
|
|
|
|
|
f"<tr>"
|
|
|
|
|
|
f"<td><code>{name}</code></td>"
|
|
|
|
|
|
f"<td>{spec_str}</td>"
|
|
|
|
|
|
f"<td>{f['round_best']}</td>"
|
|
|
|
|
|
f"<td>{f['round_up']}</td>"
|
|
|
|
|
|
f"<td>+{f['extra']}</td>"
|
|
|
|
|
|
f"<td><strong>{f['final']}</strong></td>"
|
|
|
|
|
|
f"<td{cell_style}>{actual:.2f}</td>"
|
|
|
|
|
|
f"<td>{status_html}</td>"
|
|
|
|
|
|
f"</tr>\n"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if t["violations"]:
|
|
|
|
|
|
items_html = "".join(f"<li>{html.escape(v)}</li>" for v in t["violations"])
|
|
|
|
|
|
viol_html = (
|
|
|
|
|
|
f'<p style="color:#c62828;font-weight:bold">Timing violations:</p>'
|
|
|
|
|
|
f"<ul>{items_html}</ul>"
|
|
|
|
|
|
)
|
|
|
|
|
|
else:
|
|
|
|
|
|
viol_html = (
|
|
|
|
|
|
'<p style="color:#2e7d32">✓ All D-PHY v1.1 Table 14 '
|
|
|
|
|
|
"constraints satisfied.</p>"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
fv = {name: fields[name]["final"] for name in _TIMING_FIELD_ORDER}
|
|
|
|
|
|
phy_t = regs["PHY_TIMING"]["value"]
|
|
|
|
|
|
phy_t1 = regs["PHY_TIMING1"]["value"]
|
|
|
|
|
|
phy_t2 = regs["PHY_TIMING2"]["value"]
|
|
|
|
|
|
uboot = html.escape(format_uboot_commands(t))
|
|
|
|
|
|
|
|
|
|
|
|
return f"""<h2>D-PHY Configuration</h2>
|
|
|
|
|
|
<p>
|
|
|
|
|
|
Pixel clock: <strong>{t['pixel_clock_mhz']} MHz</strong> |
|
|
|
|
|
|
Bit rate: <strong>{t['bit_rate_mbps']:.1f} Mbit/s per lane</strong> |
|
|
|
|
|
|
Byte clock: <strong>{t['byte_clock_mhz']:.3f} MHz</strong>
|
|
|
|
|
|
({t['byte_period_ns']:.3f} ns/byte) |
|
|
|
|
|
|
UI: <strong>{t['ui_ns']:.3f} ns</strong>
|
|
|
|
|
|
</p>
|
|
|
|
|
|
|
|
|
|
|
|
<table>
|
|
|
|
|
|
<tr>
|
|
|
|
|
|
<th>Field</th><th>Spec (ns)</th><th>Rnd Best</th><th>Rnd Up</th>
|
|
|
|
|
|
<th>Extra</th><th>Final</th><th>Actual (ns)</th><th>Status</th>
|
|
|
|
|
|
</tr>
|
|
|
|
|
|
{rows_html}
|
|
|
|
|
|
</table>
|
|
|
|
|
|
|
|
|
|
|
|
{viol_html}
|
|
|
|
|
|
|
|
|
|
|
|
<h3>Samsung DSIM Registers</h3>
|
|
|
|
|
|
<table>
|
|
|
|
|
|
<tr><th>Register</th><th>Address</th><th>Value</th><th>Field breakdown</th></tr>
|
|
|
|
|
|
<tr>
|
|
|
|
|
|
<td>PHY_TIMING</td><td><code>0xb4</code></td>
|
|
|
|
|
|
<td><code>0x{phy_t:08x}</code></td>
|
|
|
|
|
|
<td>lpx={fv['lpx']} hs_exit={fv['hs_exit']}</td>
|
|
|
|
|
|
</tr>
|
|
|
|
|
|
<tr>
|
|
|
|
|
|
<td>PHY_TIMING1</td><td><code>0xb8</code></td>
|
|
|
|
|
|
<td><code>0x{phy_t1:08x}</code></td>
|
|
|
|
|
|
<td>clk_prepare={fv['clk_prepare']} clk_zero={fv['clk_zero']}
|
|
|
|
|
|
clk_post={fv['clk_post']} clk_trail={fv['clk_trail']}</td>
|
|
|
|
|
|
</tr>
|
|
|
|
|
|
<tr>
|
|
|
|
|
|
<td>PHY_TIMING2</td><td><code>0xbc</code></td>
|
|
|
|
|
|
<td><code>0x{phy_t2:08x}</code></td>
|
|
|
|
|
|
<td>hs_prepare={fv['hs_prepare']} hs_zero={fv['hs_zero']}
|
|
|
|
|
|
hs_trail={fv['hs_trail']}</td>
|
|
|
|
|
|
</tr>
|
|
|
|
|
|
</table>
|
|
|
|
|
|
|
|
|
|
|
|
<h3>u-boot Commands</h3>
|
|
|
|
|
|
<pre style="background:#f5f5f5;padding:12px;border-radius:4px;
|
|
|
|
|
|
white-space:pre-wrap;font-size:0.88em">{uboot}</pre>
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
def _log_interaction(
|
|
|
|
|
|
events: list,
|
|
|
|
|
|
ts: str,
|
|
|
|
|
|
iteration: int,
|
|
|
|
|
|
suspects: list[LPMetrics],
|
|
|
|
|
|
claude_said: bool,
|
|
|
|
|
|
user_confirmed: bool | None,
|
|
|
|
|
|
reasoning: str,
|
|
|
|
|
|
) -> None:
|
|
|
|
|
|
"""
|
|
|
|
|
|
Append an event to the in-memory list and to interactive_log.csv.
|
|
|
|
|
|
|
|
|
|
|
|
user_confirmed values:
|
|
|
|
|
|
True — operator confirmed flicker
|
|
|
|
|
|
False — operator said no (false alarm)
|
|
|
|
|
|
None — operator was not asked (Claude said no)
|
|
|
|
|
|
"""
|
|
|
|
|
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
events.append({
|
|
|
|
|
|
"ts": ts,
|
|
|
|
|
|
"iteration": iteration,
|
|
|
|
|
|
"suspects": suspects,
|
|
|
|
|
|
"claude_said": claude_said,
|
|
|
|
|
|
"user_confirmed": user_confirmed,
|
|
|
|
|
|
"reasoning": reasoning,
|
|
|
|
|
|
"logged_at": now_str,
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
INTERACTIVE_LOG.parent.mkdir(exist_ok=True)
|
|
|
|
|
|
write_header = not INTERACTIVE_LOG.exists()
|
|
|
|
|
|
with open(INTERACTIVE_LOG, "a", newline="", encoding="utf-8") as f:
|
|
|
|
|
|
w = _csv_mod.writer(f)
|
|
|
|
|
|
if write_header:
|
|
|
|
|
|
w.writerow(["logged_at", "capture_ts", "capture_num",
|
|
|
|
|
|
"claude_said_flicker", "user_confirmed",
|
|
|
|
|
|
"lp_low_ns", "reasoning_summary"])
|
|
|
|
|
|
for m in suspects:
|
|
|
|
|
|
w.writerow([
|
|
|
|
|
|
now_str, ts, f"{iteration:04d}",
|
|
|
|
|
|
"YES" if claude_said else "NO",
|
|
|
|
|
|
("YES" if user_confirmed is True else
|
|
|
|
|
|
"NO" if user_confirmed is False else
|
|
|
|
|
|
"NOT_ASKED"),
|
|
|
|
|
|
m.lp_low_duration_ns,
|
|
|
|
|
|
reasoning[:150].replace("\n", " "),
|
|
|
|
|
|
])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_report(events: list, stop_reason: str,
|
|
|
|
|
|
config: dict | None = None) -> Path:
|
|
|
|
|
|
"""Write a timestamped HTML report summarising all flicker interactions."""
|
|
|
|
|
|
REPORTS_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
now = datetime.now()
|
|
|
|
|
|
filename = now.strftime("%Y%m%d_%H%M%S_interactive.html")
|
|
|
|
|
|
path = REPORTS_DIR / filename
|
|
|
|
|
|
|
|
|
|
|
|
confirmed_n = sum(1 for e in events if e["user_confirmed"] is True)
|
|
|
|
|
|
false_alarm_n = sum(1 for e in events if e["user_confirmed"] is False)
|
|
|
|
|
|
claude_no_n = sum(1 for e in events if e["user_confirmed"] is None)
|
|
|
|
|
|
|
|
|
|
|
|
# ── Event table rows ───────────────────────────────────────────────────
|
|
|
|
|
|
rows_html = ""
|
|
|
|
|
|
for e in events:
|
|
|
|
|
|
for m in e["suspects"]:
|
|
|
|
|
|
conf = e["user_confirmed"]
|
|
|
|
|
|
if conf is True:
|
|
|
|
|
|
badge = ('<span style="color:#c62828;font-weight:bold">'
|
|
|
|
|
|
'✖ CONFIRMED FLICKER</span>')
|
|
|
|
|
|
elif conf is False:
|
|
|
|
|
|
badge = ('<span style="color:#2e7d32;font-weight:bold">'
|
|
|
|
|
|
'✓ FALSE ALARM</span>')
|
|
|
|
|
|
else:
|
|
|
|
|
|
badge = ('<span style="color:#e65100">'
|
|
|
|
|
|
'Claude said NO — user not asked</span>')
|
|
|
|
|
|
|
|
|
|
|
|
lp_val = m.lp_low_duration_ns
|
|
|
|
|
|
lp_bad = lp_val is None or lp_val < 50
|
|
|
|
|
|
lp_cell = (f'<td style="color:red">{lp_val} ns</td>' if lp_bad
|
|
|
|
|
|
else f'<td>{lp_val} ns</td>')
|
|
|
|
|
|
|
|
|
|
|
|
rows_html += (
|
|
|
|
|
|
f"<tr>"
|
|
|
|
|
|
f"<td>{e['iteration']:04d}</td>"
|
|
|
|
|
|
f"<td>{e['ts']}</td>"
|
|
|
|
|
|
f"<td>{m.channel}</td>"
|
|
|
|
|
|
f"{lp_cell}"
|
|
|
|
|
|
f"<td>{m.lp11_to_hs_ns} ns</td>"
|
|
|
|
|
|
f"<td>{m.lp11_voltage_v} V</td>"
|
|
|
|
|
|
f"<td>{'YES' if e['claude_said'] else 'NO'}</td>"
|
|
|
|
|
|
f"<td>{badge}</td>"
|
|
|
|
|
|
f"</tr>"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
table_html = (
|
|
|
|
|
|
'<p>No flicker suspects were detected during this test run.</p>'
|
|
|
|
|
|
if not rows_html else
|
|
|
|
|
|
f"""<table>
|
|
|
|
|
|
<tr>
|
|
|
|
|
|
<th>Capture</th><th>Timestamp</th><th>Channel</th>
|
|
|
|
|
|
<th>LP-low plateau</th><th>LP exit→HS</th><th>LP-11 voltage</th>
|
|
|
|
|
|
<th>Claude: flicker?</th><th>Outcome</th>
|
|
|
|
|
|
</tr>
|
|
|
|
|
|
{rows_html}
|
|
|
|
|
|
</table>"""
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# ── D-PHY config section ───────────────────────────────────────────────
|
|
|
|
|
|
config_html = _config_section_html(config) if config else ""
|
|
|
|
|
|
|
|
|
|
|
|
# ── Claude reasoning sections ──────────────────────────────────────────
|
|
|
|
|
|
reasoning_html = ""
|
|
|
|
|
|
for e in events:
|
|
|
|
|
|
if not e["reasoning"] or not e["claude_said"]:
|
|
|
|
|
|
continue
|
|
|
|
|
|
conf = e["user_confirmed"]
|
|
|
|
|
|
label = (" — CONFIRMED FLICKER" if conf is True else
|
|
|
|
|
|
" — FALSE ALARM" if conf is False else "")
|
|
|
|
|
|
reasoning_html += (
|
|
|
|
|
|
f'<h3>Capture {e["iteration"]:04d} [{e["ts"]}]{html.escape(label)}</h3>'
|
|
|
|
|
|
f'<pre style="background:#f5f5f5;padding:12px;border-radius:4px;'
|
|
|
|
|
|
f'white-space:pre-wrap;font-size:0.88em">'
|
|
|
|
|
|
f'{html.escape(e["reasoning"])}</pre>'
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
html_content = f"""<!DOCTYPE html>
|
|
|
|
|
|
<html lang="en">
|
|
|
|
|
|
<head>
|
|
|
|
|
|
<meta charset="UTF-8">
|
|
|
|
|
|
<title>MIPI Interactive Flicker Test — {now.strftime('%Y-%m-%d %H:%M:%S')}</title>
|
|
|
|
|
|
<style>
|
|
|
|
|
|
body {{ font-family: Arial, sans-serif; max-width: 1020px; margin: 40px auto;
|
|
|
|
|
|
padding: 0 20px; color: #222; }}
|
|
|
|
|
|
h1 {{ color: #1a3a5c; border-bottom: 2px solid #1a3a5c; padding-bottom: 8px; }}
|
|
|
|
|
|
h2 {{ color: #1a3a5c; margin-top: 32px; }}
|
|
|
|
|
|
h3 {{ color: #333; }}
|
|
|
|
|
|
.meta {{ color: #555; font-size: 0.92em; margin-top: -6px; margin-bottom: 20px; }}
|
|
|
|
|
|
.stop-box {{ background: #e8f4fd; border-left: 4px solid #1a3a5c;
|
|
|
|
|
|
padding: 10px 16px; margin: 16px 0 24px; border-radius: 3px; }}
|
|
|
|
|
|
.stat {{ display: inline-block; margin: 0 16px 20px 0; padding: 12px 22px;
|
|
|
|
|
|
border-radius: 6px; font-size: 1.05em; font-weight: bold; }}
|
|
|
|
|
|
.s-confirmed {{ background: #fdecea; border: 2px solid #c62828; color: #c62828; }}
|
|
|
|
|
|
.s-false {{ background: #e8f5e9; border: 2px solid #2e7d32; color: #2e7d32; }}
|
|
|
|
|
|
.s-claude-no {{ background: #fff8e1; border: 2px solid #f9a825; color: #795548; }}
|
|
|
|
|
|
table {{ border-collapse: collapse; width: 100%; margin-top: 8px; }}
|
|
|
|
|
|
th {{ background: #1a3a5c; color: white; padding: 7px 10px; text-align: left; }}
|
|
|
|
|
|
td {{ border: 1px solid #ddd; padding: 5px 10px; }}
|
|
|
|
|
|
tr:nth-child(even) {{ background: #fafafa; }}
|
|
|
|
|
|
pre {{ margin: 0; }}
|
|
|
|
|
|
</style>
|
|
|
|
|
|
</head>
|
|
|
|
|
|
<body>
|
|
|
|
|
|
|
|
|
|
|
|
<h1>MIPI Interactive Flicker Test Report</h1>
|
|
|
|
|
|
<p class="meta">
|
|
|
|
|
|
Generated: {now.strftime('%Y-%m-%d %H:%M:%S')} |
|
|
|
|
|
|
Model: {CLAUDE_MODEL}
|
|
|
|
|
|
</p>
|
|
|
|
|
|
|
|
|
|
|
|
<div class="stop-box">
|
|
|
|
|
|
<strong>Stop reason:</strong> {html.escape(stop_reason)}
|
|
|
|
|
|
</div>
|
|
|
|
|
|
|
|
|
|
|
|
<div>
|
|
|
|
|
|
<div class="stat s-confirmed">{confirmed_n} confirmed flicker(s)</div>
|
|
|
|
|
|
<div class="stat s-false">{false_alarm_n} false alarm(s)</div>
|
|
|
|
|
|
<div class="stat s-claude-no">{claude_no_n} Claude said no</div>
|
|
|
|
|
|
</div>
|
|
|
|
|
|
|
|
|
|
|
|
{config_html}
|
|
|
|
|
|
<h2>Event Log</h2>
|
|
|
|
|
|
{table_html}
|
|
|
|
|
|
|
|
|
|
|
|
{'<h2>Claude Assessments</h2>' + reasoning_html if reasoning_html else ''}
|
|
|
|
|
|
|
|
|
|
|
|
</body>
|
|
|
|
|
|
</html>
|
|
|
|
|
|
"""
|
|
|
|
|
|
path.write_text(html_content, encoding="utf-8")
|
|
|
|
|
|
return path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Interactive test loop
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def run_interactive_test() -> None:
|
|
|
|
|
|
"""
|
|
|
|
|
|
Blocking interactive test loop (runs in the calling thread — no background threads).
|
|
|
|
|
|
|
|
|
|
|
|
Flow per iteration:
|
|
|
|
|
|
1. Display ON
|
|
|
|
|
|
2. Three-pass dual_capture → saves LP/sig/proto CSVs to scope
|
|
|
|
|
|
3. Transfer all scope CSVs to local data/
|
|
|
|
|
|
4. Analyse LP files for this iteration (rule-based + Claude if suspect)
|
|
|
|
|
|
5. If Claude says flicker:
|
|
|
|
|
|
- Keep display ON so operator can observe
|
|
|
|
|
|
- Ask operator to confirm
|
|
|
|
|
|
- Confirmed → log event, save report, STOP
|
|
|
|
|
|
- False alarm → log event, continue
|
|
|
|
|
|
6. Display OFF, 1 s pause, next iteration
|
|
|
|
|
|
|
|
|
|
|
|
Press Ctrl+C to exit at any time. A report is always saved on exit.
|
|
|
|
|
|
"""
|
|
|
|
|
|
iteration = 1
|
|
|
|
|
|
events: list = []
|
|
|
|
|
|
stop_reason = "Test stopped by user"
|
|
|
|
|
|
|
|
|
|
|
|
# ── Pixel clock configuration ──────────────────────────────────────────
|
|
|
|
|
|
config = prompt_for_config()
|
|
|
|
|
|
|
|
|
|
|
|
print("\nINTERACTIVE FLICKER TEST STARTED.")
|
|
|
|
|
|
print("Each iteration: display ON → 3-pass capture → LP analysis → Claude check.")
|
|
|
|
|
|
print("The display stays ON while Claude and the operator assess the frame.")
|
|
|
|
|
|
print("Press Ctrl+C at any time to stop.\n")
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
while True:
|
|
|
|
|
|
# ── Display ON ─────────────────────────────────────────────────
|
|
|
|
|
|
try:
|
|
|
|
|
|
requests.put(URL, json={"state": "on"}, timeout=2)
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
|
print(f" WARNING: display ON failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
# ── Three-pass capture ─────────────────────────────────────────
|
|
|
|
|
|
ts = dual_capture(iteration)
|
|
|
|
|
|
|
|
|
|
|
|
# ── Transfer scope files to local data/ ────────────────────────
|
|
|
|
|
|
print(" TRANSFERRING FILES FROM SCOPE...")
|
|
|
|
|
|
try:
|
|
|
|
|
|
copied, failed = ai_mgmt.transfer_csv_files()
|
|
|
|
|
|
print(f" TRANSFERRED {copied} FILE(S). {failed} FAILED.")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f" TRANSFER ERROR: {e}")
|
|
|
|
|
|
|
2026-04-20 10:34:42 +01:00
|
|
|
|
# ── Rule-based LP analysis ─────────────────────────────────────
|
|
|
|
|
|
lp_summaries, suspects = _analyze_lp_files(ts, iteration)
|
|
|
|
|
|
|
|
|
|
|
|
followup_summaries: list[str] = []
|
|
|
|
|
|
if suspects:
|
|
|
|
|
|
# Take an LP-only follow-up capture before calling Claude.
|
|
|
|
|
|
# The visual flicker caused by a missed SoT appears one display
|
|
|
|
|
|
# frame later (pipeline lag), so the operator observes flicker
|
|
|
|
|
|
# on the frame AFTER the electrical event. Including the next
|
|
|
|
|
|
# frame gives Claude — and the operator — the correct context.
|
|
|
|
|
|
_, followup_summaries, _ = _lp_followup_capture(iteration)
|
|
|
|
|
|
|
|
|
|
|
|
# ── Call Claude if any rule-based suspect was found ────────────
|
|
|
|
|
|
claude_flicker = False
|
|
|
|
|
|
reasoning = ""
|
|
|
|
|
|
if suspects:
|
|
|
|
|
|
claude_flicker, reasoning = _call_claude(
|
|
|
|
|
|
ts, iteration, lp_summaries, suspects, config,
|
|
|
|
|
|
followup_summaries or None)
|
2026-04-16 11:23:25 +01:00
|
|
|
|
|
|
|
|
|
|
if claude_flicker:
|
|
|
|
|
|
# ── Keep display ON — ask operator ─────────────────────────
|
|
|
|
|
|
# Play alarm sound once to alert the operator
|
|
|
|
|
|
subprocess.run(
|
|
|
|
|
|
["pw-play", "/usr/share/sounds/freedesktop/stereo/alarm-clock-elapsed.oga"],
|
|
|
|
|
|
check=False,
|
|
|
|
|
|
)
|
|
|
|
|
|
print("\n" + "=" * 64)
|
|
|
|
|
|
print(" CLAUDE SUSPECTS FLICKER — OBSERVE THE DISPLAY NOW")
|
|
|
|
|
|
print("=" * 64)
|
|
|
|
|
|
print(f"\n{reasoning}\n")
|
|
|
|
|
|
|
|
|
|
|
|
confirmed: bool | None = None
|
|
|
|
|
|
while confirmed is None:
|
|
|
|
|
|
try:
|
|
|
|
|
|
ans = input("IS THE SCREEN ACTUALLY FLICKERING? (Y/N): ").strip().upper()
|
|
|
|
|
|
except EOFError:
|
|
|
|
|
|
ans = "N"
|
|
|
|
|
|
if ans in ("Y", "YES"):
|
|
|
|
|
|
confirmed = True
|
|
|
|
|
|
elif ans in ("N", "NO"):
|
|
|
|
|
|
confirmed = False
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(" Please enter Y or N.")
|
|
|
|
|
|
|
|
|
|
|
|
_log_interaction(events, ts, iteration, suspects,
|
|
|
|
|
|
claude_flicker, confirmed, reasoning)
|
|
|
|
|
|
|
|
|
|
|
|
if confirmed:
|
|
|
|
|
|
print("\n FLICKER CONFIRMED BY OPERATOR. STOPPING TEST.")
|
|
|
|
|
|
stop_reason = (f"Flicker confirmed by operator at "
|
|
|
|
|
|
f"capture {iteration:04d} [{ts}]")
|
|
|
|
|
|
# Display OFF
|
|
|
|
|
|
try:
|
|
|
|
|
|
requests.put(URL, json={"state": "off"}, timeout=2)
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
pass
|
|
|
|
|
|
break # exit the while loop → save report below
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(" NOT FLICKERING — false alarm logged. Continuing.\n")
|
|
|
|
|
|
|
2026-04-20 10:34:42 +01:00
|
|
|
|
if suspects and not claude_flicker:
|
2026-04-16 11:23:25 +01:00
|
|
|
|
# Rule-based suspect but Claude said no — record for reference
|
|
|
|
|
|
_log_interaction(events, ts, iteration, suspects,
|
|
|
|
|
|
False, None, reasoning)
|
|
|
|
|
|
|
|
|
|
|
|
# ── Display OFF, brief pause before next iteration ─────────────
|
|
|
|
|
|
try:
|
|
|
|
|
|
requests.put(URL, json={"state": "off"}, timeout=2)
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
|
print(f" WARNING: display OFF failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
iteration += 1
|
|
|
|
|
|
time.sleep(1.0)
|
|
|
|
|
|
|
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
|
print("\n\n TEST INTERRUPTED (Ctrl+C).")
|
|
|
|
|
|
stop_reason = "Test interrupted by operator (Ctrl+C)"
|
|
|
|
|
|
try:
|
|
|
|
|
|
requests.put(URL, json={"state": "off"}, timeout=2)
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
# ── Always save a report on exit ───────────────────────────────────────
|
|
|
|
|
|
report_path = save_report(events, stop_reason, config)
|
|
|
|
|
|
print(f"\nREPORT SAVED: {report_path}")
|
|
|
|
|
|
if events:
|
|
|
|
|
|
confirmed_n = sum(1 for e in events if e["user_confirmed"] is True)
|
|
|
|
|
|
false_alarm_n = sum(1 for e in events if e["user_confirmed"] is False)
|
|
|
|
|
|
print(f" {confirmed_n} confirmed flicker(s), {false_alarm_n} false alarm(s) "
|
|
|
|
|
|
f"({len(events)} total suspect(s) assessed)")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Menu
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def main_menu() -> None:
|
|
|
|
|
|
while True:
|
|
|
|
|
|
print("\n===== MIPI INTERACTIVE TEST CONTROL =====")
|
|
|
|
|
|
print("1. RUN IDN CHECK (PSU & SCOPE)")
|
|
|
|
|
|
print("2. SETUP SCOPE (RUN FIRST)")
|
|
|
|
|
|
print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)")
|
|
|
|
|
|
print("4. PSU OUTPUT ON/OFF (CH1)")
|
|
|
|
|
|
print("5. START INTERACTIVE FLICKER TEST")
|
|
|
|
|
|
print("6. EXIT")
|
|
|
|
|
|
|
|
|
|
|
|
choice = input("\nSELECT OPTION (1-6): ").strip()
|
|
|
|
|
|
|
|
|
|
|
|
if choice == '1':
|
|
|
|
|
|
print(f"PSU : {psu.ask('*IDN?').strip()}")
|
|
|
|
|
|
print(f"SCOPE: {scope.ask('*IDN?').strip()}")
|
|
|
|
|
|
if rigol_scope.is_connected():
|
|
|
|
|
|
print(f"RIGOL: {rigol_scope.rigol.ask('*IDN?').strip()}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print("RIGOL: NOT CONNECTED")
|
|
|
|
|
|
|
|
|
|
|
|
elif choice == '2':
|
|
|
|
|
|
setup_scope()
|
|
|
|
|
|
if rigol_scope.is_connected():
|
|
|
|
|
|
rigol_scope.configure()
|
|
|
|
|
|
|
|
|
|
|
|
elif choice == '3':
|
|
|
|
|
|
psu.write('CH1:VOLT 24.0')
|
|
|
|
|
|
psu.write('CH1:CURR 1.5')
|
|
|
|
|
|
print("PSU CONFIGURED: 24V / 1.5A")
|
|
|
|
|
|
|
|
|
|
|
|
elif choice == '4':
|
|
|
|
|
|
state = input("TYPE 'ON' OR 'OFF': ").strip().upper()
|
|
|
|
|
|
if state in ('ON', 'OFF'):
|
|
|
|
|
|
psu.write(f'OUTP CH1,{state}')
|
|
|
|
|
|
print(f"PSU OUTPUT {state}.")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print("INVALID — TYPE 'ON' OR 'OFF'.")
|
|
|
|
|
|
|
|
|
|
|
|
elif choice == '5':
|
|
|
|
|
|
run_interactive_test()
|
|
|
|
|
|
|
|
|
|
|
|
elif choice == '6':
|
|
|
|
|
|
psu.close()
|
|
|
|
|
|
scope.close()
|
|
|
|
|
|
rigol_scope.disconnect()
|
|
|
|
|
|
print("INSTRUMENTS CLOSED. BYE.")
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
print("INVALID ENTRY. PLEASE CHOOSE 1-6.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
|
main_menu()
|