diff --git a/__pycache__/analyze_captures.cpython-312.pyc b/__pycache__/analyze_captures.cpython-312.pyc
index 41b95be..41c16ec 100644
Binary files a/__pycache__/analyze_captures.cpython-312.pyc and b/__pycache__/analyze_captures.cpython-312.pyc differ
diff --git a/__pycache__/csv_preprocessor.cpython-312.pyc b/__pycache__/csv_preprocessor.cpython-312.pyc
index 38ac48f..7478e7c 100644
Binary files a/__pycache__/csv_preprocessor.cpython-312.pyc and b/__pycache__/csv_preprocessor.cpython-312.pyc differ
diff --git a/__pycache__/rigol_scope.cpython-312.pyc b/__pycache__/rigol_scope.cpython-312.pyc
index 83fd44c..e2b592e 100644
Binary files a/__pycache__/rigol_scope.cpython-312.pyc and b/__pycache__/rigol_scope.cpython-312.pyc differ
diff --git a/analyze_captures.py b/analyze_captures.py
index 5f41931..c28c5b4 100644
--- a/analyze_captures.py
+++ b/analyze_captures.py
@@ -22,8 +22,8 @@ from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env")
from csv_preprocessor import (
- analyze_file, analyze_lp_file, analyze_1v8_file,
- group_captures, ChannelMetrics, LPMetrics, V1V8Metrics,
+ analyze_file, analyze_lp_file, analyze_1v8_file, analyze_reg_file,
+ group_captures, ChannelMetrics, LPMetrics, V1V8Metrics, RegDump,
)
DATA_DIR = Path(__file__).parent / "data"
@@ -33,14 +33,20 @@ CLAUDE_MODEL = "claude-opus-4-6"
SYSTEM_PROMPT = (
"You are an expert in MIPI D-PHY signal integrity analysis. "
"You will be given compact pre-processed summaries of oscilloscope captures "
- "from a MIPI CLK and DAT0 differential pair, plus 1.8 V supply rail measurements. "
- "The MIPI PHY (NXP i.MX 8M Mini) drives LP states from the 1.8 V VDDIO. "
- "Each capture has up to four data sets: "
+ "from a MIPI CLK and DAT0 differential pair, plus 1.8 V supply rail measurements "
+ "and DSI controller register snapshots (Samsung DSIM IP on NXP i.MX 8M Mini). "
+ "The MIPI PHY drives LP states from the 1.8 V VDDIO. "
+ "Each capture has up to five data sets: "
"sig (high-res HS quality), proto (long-window HS stats), "
"lp (single-ended LP-11/LP-00/HS burst including SoT sequence), "
- "and pwr (1.8 V supply captured during the LP→HS transition). "
+ "pwr (1.8 V supply captured during the LP→HS transition), "
+ "and reg (DSIM register snapshot — DSIM_PHYTIMING at 0x32e100b4, "
+ "DSIM_PHYTIMING1 at 0xb8, DSIM_PHYTIMING2 at 0xbc control LP state durations "
+ "and PHY clock timing; DSIM_CLKCTRL at 0x08 and DSIM_ESCMODE at 0x14 affect "
+ "LP escape mode and HS entry sequencing). "
"Analyse the data for trends, degradation, anomalies, or consistent spec concerns "
- "across captures. Be concise and actionable."
+ "across captures. Correlate register values with observed LP timing violations. "
+ "Be concise and actionable."
)
@@ -48,24 +54,68 @@ SYSTEM_PROMPT = (
# Helpers
# ---------------------------------------------------------------------------
+FLICKER_LOG = Path(__file__).parent / "reports" / "flicker_log.csv"
+
+
+def _classify_flicker(keys: list, flicker_suspects: list) -> tuple[int, int]:
+ """
+ Count flicker events (display sessions that flickered) vs total sessions in this batch.
+
+ Each test iteration is one complete display load/unload session. Flicker is
+ per-session: it occurs at pipeline load, persists for that session only, then
+ clears automatically on the next load. A single suspect capture IS a genuine
+ flicker event — not a measurement artifact — because the LP pass fires at startup.
+
+ Returns (flicker_sessions, total_sessions).
+ """
+ if not flicker_suspects:
+ return 0, len(keys)
+
+ # Count unique capture numbers that had at least one flicker suspect
+ suspect_sessions = {m.capture_num for m in flicker_suspects}
+ return len(suspect_sessions), len(keys)
+
+
+def _log_flicker_event(ts: str, num: int, m: "LPMetrics") -> None:
+ """Append a flicker suspect entry to the persistent flicker log."""
+ FLICKER_LOG.parent.mkdir(exist_ok=True)
+ write_header = not FLICKER_LOG.exists()
+ with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as f:
+ import csv as _csv
+ w = _csv.writer(f)
+ if write_header:
+ w.writerow(["logged_at", "capture_ts", "capture_num", "channel",
+ "lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v"])
+ w.writerow([
+ datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+ ts, f"{num:04d}", m.channel,
+ m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v,
+ ])
+
+
def process_capture(
ts: str,
num: int,
files: dict[str, Path],
verbose: bool = False,
-) -> tuple[str, list[ChannelMetrics]]:
+) -> tuple[str, list, list["LPMetrics"], list["RegDump"]]:
"""
Run the pre-processor on all CSV files for one capture.
- Returns (text_summary, list_of_metrics).
+ Returns (text_summary, metrics_list, flicker_suspects, reg_dumps).
Missing files produce a one-line note instead of crashing.
"""
lines = [f"=== Capture {num:04d} {ts} ==="]
- metrics_list: list[ChannelMetrics | LPMetrics] = []
+ metrics_list: list[ChannelMetrics | LPMetrics | V1V8Metrics | RegDump] = []
+ flicker_suspects: list[LPMetrics] = []
+ reg_dumps: list[RegDump] = []
- for key in ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat", "pwr_1v8"):
+ for key in ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat",
+ "pwr_1v8", "reg"):
if key not in files:
if key == "pwr_1v8":
lines.append(f" [{key}] NOT CAPTURED (Rigol not connected or no droop)")
+ elif key == "reg":
+ lines.append(f" [{key}] NOT CAPTURED (device unreachable or memtool error)")
else:
lines.append(f" [{key}] MISSING")
continue
@@ -74,27 +124,65 @@ def process_capture(
m = analyze_lp_file(files[key])
elif key == "pwr_1v8":
m = analyze_1v8_file(files[key])
+ elif key == "reg":
+ m = analyze_reg_file(files[key])
else:
m = analyze_file(files[key])
lines.append(m.summary())
metrics_list.append(m)
if verbose:
print(m.summary())
+ # Real-time flicker detection — log and alert immediately
+ if isinstance(m, LPMetrics) and m.flicker_suspect:
+ flicker_suspects.append(m)
+ _log_flicker_event(ts, num, m)
+ print(f"\n *** FLICKER SUSPECT: capture {num:04d} [{ts}] "
+ f"lp_low={m.lp_low_duration_ns} ns ***\n")
+ if isinstance(m, RegDump):
+ reg_dumps.append(m)
except Exception as exc:
lines.append(f" [{key}] ERROR: {exc}")
- return "\n".join(lines), metrics_list
+ return "\n".join(lines), metrics_list, flicker_suspects, reg_dumps
-def build_prompt(all_summaries: list[str]) -> str:
+def build_prompt(all_summaries: list[str], flicker_suspects: list = None,
+ flicker_count: int = 0, total_sessions: int = 0) -> str:
body = "\n\n".join(all_summaries)
+
+ flicker_section = ""
+ if flicker_suspects and flicker_count > 0:
+ items = "\n".join(
+ f" - Capture {m.capture_num:04d} [{m.timestamp}] channel={m.channel} "
+ f"lp_low={m.lp_low_duration_ns} ns lp11_to_hs={m.lp11_to_hs_ns} ns "
+ f"lp11_v={m.lp11_voltage_v} V"
+ for m in flicker_suspects
+ )
+ rate = f"{flicker_count}/{total_sessions} display load sessions ({100*flicker_count/total_sessions:.0f}%)"
+ flicker_section = (
+ f"\n\nALERT — FLICKER DETECTED: {rate} produced screen flicker in this batch.\n"
+ f"Affected captures:\n{items}\n"
+ "Each capture is one complete display pipeline load/unload cycle. Flicker is "
+ "per-session: it occurs at pipeline load and persists for that session only, then "
+ "clears automatically on the next load. A flagged capture therefore represents a "
+ "genuine flicker event, not a measurement artifact.\n"
+ "LP-low plateau < 50 ns means the LP-01/LP-00 SoT states are absent or too brief "
+ "for the SN65DSI83 MIPI/LVDS bridge to detect start-of-transmission, causing it "
+ "to drop a frame and produce visible flicker.\n"
+ "Focus your analysis on WHY the SoT sequence is being violated at pipeline startup "
+ "and what register setting, supply condition, or hardware change would prevent it.\n"
+ )
+
return (
- "Below are pre-processed summaries of MIPI D-PHY captures. "
- "Each capture has three passes per lane (CLK and DAT0):\n"
+ "Below are pre-processed summaries of MIPI D-PHY captures from a Digi ConnectCore "
+ "8M Mini SOM (NXP i.MX 8M Mini) driving a SN65DSI83 MIPI-to-LVDS bridge. "
+ "The system occasionally flickers at display pipeline load. "
+ "Each capture has up to four data sets per lane (CLK and DAT0):\n"
" sig — high-res HS differential (rise/fall times)\n"
" proto — long-window HS differential (jitter, clock freq, amplitude)\n"
- " lp — single-ended LP state capture (LP-11 voltage, SoT sequence, HS bursts)\n"
- " pwr — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n\n"
+ " lp — single-ended LP state capture at pipeline startup (LP-11, SoT sequence, HS bursts)\n"
+ " pwr — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n"
+ f"{flicker_section}\n"
f"{body}\n\n"
"Please:\n"
"1. Identify any consistent spec concerns (HS voltage, LP-11 voltage, LP-low timing).\n"
@@ -106,11 +194,15 @@ def build_prompt(all_summaries: list[str]) -> str:
"5. For any ERROR or WARNING lines in the summaries, explain the most likely cause "
" (e.g. missing file, bad trigger, signal absent, probe issue, supply marginal) and what to check.\n"
"6. Provide specific, actionable recommendations to address all identified issues and anomalies.\n"
- "7. Summarise overall signal health in 2–3 sentences."
+ "7. Summarise overall signal health and flicker risk in 2–3 sentences."
)
-def save_html_report(analysis: str, token_line: str, keys: list) -> Path:
+def save_html_report(analysis: str, token_line: str, keys: list,
+ flicker_suspects: list = None,
+ flicker_count: int = 0,
+ total_sessions: int = 0,
+ all_reg_dumps: list = None) -> Path:
"""Write a timestamped HTML report to the reports/ directory."""
REPORTS_DIR.mkdir(exist_ok=True)
now = datetime.now()
@@ -148,6 +240,71 @@ def save_html_report(analysis: str, token_line: str, keys: list) -> Path:
body_html = text_to_html(analysis)
+ flicker_banner = ""
+ if flicker_suspects and flicker_count > 0:
+ rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0
+ rate_str = f"{flicker_count} of {total_sessions} display load sessions ({rate_pct:.0f}%)"
+ rows = "".join(
+ f"
| {m.capture_num:04d} | {m.timestamp} | {m.channel} | "
+ f"{m.lp_low_duration_ns} ns | "
+ f"{m.lp11_to_hs_ns} ns | {m.lp11_voltage_v} V |
"
+ for m in flicker_suspects
+ )
+ flicker_banner = f"""
+
+
⚠ FLICKER DETECTED — {rate_str} flickered
+
Each flagged capture is a genuine flicker event (not an artifact) — the LP pass fires at
+ pipeline startup, so a missing or sub-50 ns LP-low plateau means the SN65DSI83 bridge
+ missed the SoT sequence and dropped a frame.
+ LP-low plateau < 50 ns means the LP-01/LP-00 SoT states are absent or too brief
+ for the SN65DSI83 bridge to detect start-of-transmission.
+
+ | Capture | Timestamp | Channel |
+ LP-low plateau | LP exit→HS | LP-11 voltage |
+ {rows}
+
+
"""
+
+ # --- Register table (collapsible) ---
+ reg_section = ""
+ if all_reg_dumps:
+ # Collect all unique addresses in order they first appear
+ addr_order = []
+ addr_names = {}
+ for rd in all_reg_dumps:
+ for r in rd.registers:
+ if r["address"] not in addr_names:
+ addr_order.append(r["address"])
+ addr_names[r["address"]] = r.get("name", "")
+
+ if addr_order:
+ header_cells = "".join(
+ f"{html.escape(addr)} {html.escape(addr_names[addr])} | "
+ for addr in addr_order
+ )
+ rows_html = ""
+ for rd in all_reg_dumps:
+ reg_map = {r["address"]: r["value"] for r in rd.registers}
+ cells = "".join(
+ f"{html.escape(reg_map.get(addr, '—'))} | "
+ for addr in addr_order
+ )
+ rows_html += f"| {rd.capture_num:04d} | {rd.timestamp} | {cells}
"
+
+ reg_section = f"""
+
+
+ DSI Register Snapshots ({len(all_reg_dumps)} captures)
+
+
+
+ | Capture | Timestamp | {header_cells}
+ {rows_html}
+
+
+ """
+
html_content = f"""
@@ -161,11 +318,22 @@ def save_html_report(analysis: str, token_line: str, keys: list) -> Path:
ol, ul {{ line-height: 1.8; padding-left: 24px; }}
li {{ margin: 4px 0; }}
.tokens {{ color: #888; font-size: 0.8em; margin-top: 32px; border-top: 1px solid #ddd; padding-top: 8px; }}
+ .flicker-alert {{ background: #fff3cd; border: 2px solid #e65100; border-radius: 6px;
+ padding: 16px 20px; margin-bottom: 28px; }}
+ .flicker-alert h2 {{ color: #e65100; margin-top: 0; }}
+ .flicker-alert table {{ border-collapse: collapse; width: 100%; margin-top: 10px; }}
+ .flicker-alert th {{ background: #e65100; color: white; padding: 6px 10px; text-align: left; }}
+ .flicker-alert td {{ border: 1px solid #ccc; padding: 5px 10px; }}
+ table {{ border-collapse: collapse; width: 100%; }}
+ th {{ background: #1a3a5c; color: white; padding: 6px 10px; text-align: left; }}
+ td {{ border: 1px solid #ddd; padding: 5px 10px; }}
@media print {{ body {{ margin: 20px; }} }}
MIPI D-PHY Analysis Report
+{flicker_banner}
+{reg_section}
Generated: {date_str} |
Scope: {cap_range} |
@@ -198,17 +366,22 @@ def run_analysis(last: int = 10) -> None:
print(f"\n[ANALYSIS] Processing {len(keys)} most-recent capture(s)...")
all_summaries: list[str] = []
+ all_flicker_suspects: list[LPMetrics] = []
+ all_reg_dumps: list[RegDump] = []
for ts, num in keys:
- summary_text, _ = process_capture(ts, num, groups[(ts, num)])
+ summary_text, _, suspects, reg_dumps = process_capture(ts, num, groups[(ts, num)])
all_summaries.append(summary_text)
+ all_flicker_suspects.extend(suspects)
+ all_reg_dumps.extend(reg_dumps)
- prompt = build_prompt(all_summaries)
+ flicker_count, total_sessions = _classify_flicker(keys, all_flicker_suspects)
+ prompt = build_prompt(all_summaries, all_flicker_suspects, flicker_count, total_sessions)
print(f"[ANALYSIS] Sending {len(prompt):,} chars to {CLAUDE_MODEL}...")
client = anthropic.Anthropic()
message = client.messages.create(
model = CLAUDE_MODEL,
- max_tokens = 3072,
+ max_tokens = 4096,
system = SYSTEM_PROMPT,
messages = [{"role": "user", "content": prompt}],
)
@@ -225,8 +398,14 @@ def run_analysis(last: int = 10) -> None:
print(separator + "\n")
# ── HTML report ───────────────────────────────────────────────────────
- report_path = save_html_report(analysis, token_line, keys)
+ report_path = save_html_report(analysis, token_line, keys,
+ all_flicker_suspects, flicker_count, total_sessions,
+ all_reg_dumps)
print(f"[ANALYSIS] Report saved to {report_path}")
+ if flicker_count > 0:
+ rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0
+ print(f"[ANALYSIS] *** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions "
+ f"({rate_pct:.0f}%) — logged to {FLICKER_LOG} ***")
def main() -> None:
@@ -263,14 +442,21 @@ def main() -> None:
# --- Run pre-processor ---
all_summaries: list[str] = []
+ all_flicker_suspects: list[LPMetrics] = []
+ all_reg_dumps: list[RegDump] = []
for ts, num in keys:
- summary_text, _ = process_capture(ts, num, groups[(ts, num)], verbose=args.verbose)
+ summary_text, _, suspects, reg_dumps = process_capture(
+ ts, num, groups[(ts, num)], verbose=args.verbose)
all_summaries.append(summary_text)
+ all_flicker_suspects.extend(suspects)
+ all_reg_dumps.extend(reg_dumps)
if not args.verbose:
- print(f" Processed capture {num:04d} {ts}")
+ flag = " *** FLICKER SUSPECT ***" if suspects else ""
+ print(f" Processed capture {num:04d} {ts}{flag}")
# --- Build Claude prompt ---
- prompt = build_prompt(all_summaries)
+ flicker_count, total_sessions = _classify_flicker(keys, all_flicker_suspects)
+ prompt = build_prompt(all_summaries, all_flicker_suspects, flicker_count, total_sessions)
if args.dry_run:
print("\n--- Prompt that would be sent to Claude ---")
@@ -282,7 +468,7 @@ def main() -> None:
client = anthropic.Anthropic()
message = client.messages.create(
model = CLAUDE_MODEL,
- max_tokens = 3072,
+ max_tokens = 4096,
system = SYSTEM_PROMPT,
messages = [{"role": "user", "content": prompt}],
)
@@ -297,8 +483,14 @@ def main() -> None:
print(separator)
# HTML report
- report_path = save_html_report(analysis, token_line, keys)
+ report_path = save_html_report(analysis, token_line, keys,
+ all_flicker_suspects, flicker_count, total_sessions,
+ all_reg_dumps)
print(f"\nReport saved to {report_path}")
+ if flicker_count > 0:
+ rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0
+ print(f"*** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions "
+ f"({rate_pct:.0f}%) — see {FLICKER_LOG} ***")
if __name__ == "__main__":
diff --git a/csv_preprocessor.py b/csv_preprocessor.py
index 77f4242..5a11b4c 100644
--- a/csv_preprocessor.py
+++ b/csv_preprocessor.py
@@ -16,6 +16,7 @@ File naming convention: YYYYMMDD_HHMMSS_{sig|proto|lp}_{NNNN}_{clk|dat}.csv
"""
import csv
+import json
import re
import numpy as np
from dataclasses import dataclass, field
@@ -42,10 +43,14 @@ LP11_HIGH_V = 0.8 # V — single-ended voltage above this → LP-11 (bot
LP_LOW_V = 0.25 # V — single-ended voltage below this → LP-00 or LP-01 pin low
# Note: probe loading can shift LP-low from true 0 V to ~100 mV; 0.25 V clears that offset
# The rolling-std gate (HS_OSC_STD_V) prevents HS minima near 0 V being called LP-low.
-LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec
-LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec
-LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined)
-HS_OSC_STD_V = 0.045 # V — rolling-std threshold above which a region is classified as HS
+LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec
+LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec
+LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined)
+HS_OSC_STD_V = 0.045 # V — rolling-std threshold above which a region is classified as HS
+
+# Flicker detection threshold
+# LP-low plateau below this → SoT sequence too brief for receiver to detect → flicker risk
+FLICKER_LP_LOW_MAX_NS = 50.0 # ns
@dataclass
@@ -420,6 +425,46 @@ def analyze_1v8_file(path: Path) -> "V1V8Metrics":
)
+@dataclass
+class RegDump:
+ """DSI controller register snapshot read from device via memtool."""
+ timestamp: str
+ capture_num: int
+ commands: list # list of memtool command strings that were run
+ registers: list # [{"address": "0x...", "value": "0x...", "name": "..."}, ...]
+ errors: list # any device-side errors
+
+ def summary(self) -> str:
+ lines = [f"Capture {self.capture_num:04d} {self.timestamp} [reg/dsi_phy]"]
+ if self.errors:
+ for err in self.errors:
+ lines.append(f" WARNING: {err}")
+ if not self.registers:
+ lines.append(" No registers captured")
+ return "\n".join(lines)
+ lines.append(f" Commands : {'; '.join(self.commands)}")
+ for r in self.registers:
+ name = f" ({r['name']})" if r.get("name") else ""
+ lines.append(f" {r['address']} : {r['value']}{name}")
+ return "\n".join(lines)
+
+
+def analyze_reg_file(path: Path) -> "RegDump":
+ """Read a register JSON file saved by mipi_test._fetch_registers()."""
+ m = re.match(r"(\d{8}_\d{6})_reg_(\d+)\.json", path.name, re.IGNORECASE)
+ if not m:
+ raise ValueError(f"Filename does not match register pattern: {path.name}")
+ timestamp, cap_str = m.groups()
+ data = json.loads(path.read_text())
+ return RegDump(
+ timestamp = timestamp,
+ capture_num = int(cap_str),
+ commands = data.get("commands", []),
+ registers = data.get("registers", []),
+ errors = data.get("errors") or [],
+ )
+
+
def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]:
"""
Scan data_dir and group CSV files by (timestamp, capture_number).
@@ -427,17 +472,30 @@ def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]:
Example key: ("20260408_111448", 1)
Example value: {"sig_clk": Path(...), "sig_dat": ..., "proto_clk": ..., "proto_dat": ...}
"""
- pattern = re.compile(
+ csv_pattern = re.compile(
r"(\d{8}_\d{6})_(sig|proto|lp|pwr)_(\d+)_(clk|dat|1v8)\.csv", re.IGNORECASE
)
+ reg_pattern = re.compile(
+ r"(\d{8}_\d{6})_reg_(\d+)\.json", re.IGNORECASE
+ )
groups: dict[tuple[str, int], dict[str, Path]] = {}
+
for f in sorted(data_dir.glob("*.csv")):
- m = pattern.match(f.name)
+ m = csv_pattern.match(f.name)
if not m:
continue
ts, ftype, cap_str, ch = m.groups()
key = (ts, int(cap_str))
groups.setdefault(key, {})[f"{ftype}_{ch}"] = f
+
+ for f in sorted(data_dir.glob("*.json")):
+ m = reg_pattern.match(f.name)
+ if not m:
+ continue
+ ts, cap_str = m.groups()
+ key = (ts, int(cap_str))
+ groups.setdefault(key, {})["reg"] = f
+
return groups
@@ -470,6 +528,11 @@ class LPMetrics:
lp_transition_valid: bool # LP-11 → LP-low → HS sequence present
+ # Flicker detection
+ # A capture is flagged when the LP-low plateau is absent or shorter than
+ # FLICKER_LP_LOW_MAX_NS. Normal captures show ~340 ns; flicker shows 0–50 ns.
+ flicker_suspect: bool = False
+
warnings: list = field(default_factory=list)
def summary(self) -> str:
@@ -501,6 +564,8 @@ class LPMetrics:
+ (f" avg {self.hs_burst_dur_ns:.0f} ns" if self.hs_burst_dur_ns else ""))
if self.hs_amplitude_mv is not None:
lines.append(f" HS amplitude : {self.hs_amplitude_mv:.0f} mV (single-ended p-p/2)")
+ if self.flicker_suspect:
+ lines.append(f" *** FLICKER SUSPECT: LP-low plateau absent or < {FLICKER_LP_LOW_MAX_NS:.0f} ns ***")
for w in self.warnings:
lines.append(f" WARNING: {w}")
return "\n".join(lines)
@@ -648,6 +713,15 @@ def analyze_lp_file(path: Path) -> "LPMetrics":
if n_hs_bursts == 0:
warnings.append("No HS bursts detected after LP transition")
+ # Flicker suspect: LP→HS sequence detected but LP-low plateau is absent or too short.
+ # Normal captures show ~340 ns; the confirmed flicker capture showed 0 ns.
+ # Only flag DAT lane (CLK is continuous HS — LP states not expected).
+ flicker_suspect = (
+ channel == "dat"
+ and lp_transition_valid
+ and (lp_low_duration_ns is None or lp_low_duration_ns < FLICKER_LP_LOW_MAX_NS)
+ )
+
return LPMetrics(
timestamp = timestamp,
capture_num = capture_num,
@@ -663,6 +737,7 @@ def analyze_lp_file(path: Path) -> "LPMetrics":
hs_burst_dur_ns = hs_burst_dur_ns,
hs_amplitude_mv = hs_amplitude_mv,
lp_transition_valid = lp_transition_valid,
+ flicker_suspect = flicker_suspect,
warnings = warnings,
)
diff --git a/device_server.py b/device_server.py
new file mode 100644
index 0000000..c5640ce
--- /dev/null
+++ b/device_server.py
@@ -0,0 +1,122 @@
+"""
+device_server.py — deploy this on the target device (192.168.45.8)
+
+Provides:
+ PUT /display {"state": "on"|"off"} — blank/unblank framebuffer
+ GET /registers — read MIPI DSI PHY registers via memtool
+
+Add addresses to REGISTER_COMMANDS to capture more register ranges.
+"""
+
+import os
+import re
+import subprocess
+
+from flask import Flask, jsonify, request
+
+app = Flask(__name__)
+
+# ---------------------------------------------------------------------------
+# Register commands to execute on each GET /registers request.
+# Each entry is a complete memtool command string.
+# ---------------------------------------------------------------------------
+REGISTER_COMMANDS = [
+ "memtool md -l 0x32e100b4+0x0c", # DSIM_PHYTIMING / PHYTIMING1 / PHYTIMING2
+]
+
+# Known Samsung DSIM register names (base 0x32E10000, i.MX 8M Mini)
+_DSIM_NAMES = {
+ 0x32e10004: "DSIM_STATUS",
+ 0x32e10008: "DSIM_CLKCTRL",
+ 0x32e1000c: "DSIM_TIMEOUT",
+ 0x32e10010: "DSIM_CONFIG",
+ 0x32e10014: "DSIM_ESCMODE",
+ 0x32e100ac: "DSIM_PHYACCHR",
+ 0x32e100b0: "DSIM_PHYACCHR1",
+ 0x32e100b4: "DSIM_PHYTIMING",
+ 0x32e100b8: "DSIM_PHYTIMING1",
+ 0x32e100bc: "DSIM_PHYTIMING2",
+}
+
+
+def _parse_memtool_output(raw: str) -> list:
+ """
+ Parse 'memtool md -l' output into a list of dicts.
+
+ Handles both formats:
+ 32e100b4: 00000001 12345678 ...
+ 0x32e100b4: 0x00000001 0x12345678 ...
+ """
+ registers = []
+ for line in raw.splitlines():
+ line = line.strip()
+ if not line:
+ continue
+ m = re.match(r"(?:0x)?([0-9a-fA-F]+)\s*:\s*(.+)", line)
+ if not m:
+ continue
+ base_addr = int(m.group(1), 16)
+ values = re.findall(r"[0-9a-fA-F]{8}", m.group(2))
+ for i, val in enumerate(values):
+ addr = base_addr + i * 4
+ registers.append({
+ "address": f"0x{addr:08x}",
+ "value": f"0x{val.lower()}",
+ "name": _DSIM_NAMES.get(addr, ""),
+ })
+ return registers
+
+
+# ---------------------------------------------------------------------------
+# Routes
+# ---------------------------------------------------------------------------
+
+@app.route("/display", methods=["PUT"])
+def control_display():
+ data = request.get_json()
+ state = data.get("state", "").lower()
+ if state == "off":
+ os.system("echo 4 > /sys/class/graphics/fb0/blank")
+ return jsonify({"status": "Display OFF"}), 200
+ elif state == "on":
+ os.system("echo 0 > /sys/class/graphics/fb0/blank")
+ return jsonify({"status": "Display ON"}), 200
+ else:
+ return jsonify({"error": "Invalid state. Use 'on' or 'off'"}), 400
+
+
+@app.route("/registers", methods=["GET"])
+def get_registers():
+ """Read MIPI DSI PHY timing registers via memtool and return JSON."""
+ all_registers = []
+ raw_lines = []
+ errors = []
+
+ for cmd_str in REGISTER_COMMANDS:
+ try:
+ result = subprocess.run(
+ cmd_str.split(), capture_output=True, text=True, timeout=5
+ )
+ raw = result.stdout.strip()
+ if raw:
+ raw_lines.append(raw)
+ all_registers.extend(_parse_memtool_output(raw))
+ if result.returncode != 0 and result.stderr.strip():
+ errors.append(f"{cmd_str}: {result.stderr.strip()}")
+ except FileNotFoundError:
+ errors.append(f"{cmd_str}: memtool not found in PATH")
+ except subprocess.TimeoutExpired:
+ errors.append(f"{cmd_str}: timed out after 5 s")
+ except Exception as e:
+ errors.append(f"{cmd_str}: {e}")
+
+ return jsonify({
+ "commands": REGISTER_COMMANDS,
+ "registers": all_registers,
+ "raw": "\n".join(raw_lines),
+ "errors": errors if errors else None,
+ }), 200
+
+
+if __name__ == "__main__":
+ app.run(host="0.0.0.0", port=5000)
diff --git a/mipi_test.py b/mipi_test.py
index e3eea55..9d6d383 100644
--- a/mipi_test.py
+++ b/mipi_test.py
@@ -7,6 +7,7 @@ VERSION: 0.3
AUTHOR: D. RICE 25/03/2026
© 2026 ARRIVE
"""
+import json
import vxi11
import time
import sys
@@ -19,10 +20,11 @@ import analyze_captures
import rigol_scope
# --- Configuration ---
-URL = "http://192.168.45.8:5000/display"
+DEVICE_BASE = "http://192.168.45.8:5000"
+URL = f"{DEVICE_BASE}/display"
SCOPE_IP = "192.168.45.4"
PSU_IP = "192.168.45.3"
-MGMT_INTERVAL = 60 # seconds between management runs (set to 3600 for hourly)
+MGMT_INTERVAL = 3600 # seconds between management runs (3600 = 1 hour)
# --- Capture settings ---
# Pass 1 — signal quality: resolves individual bits at 140 Mbit/s (7.1 ns/bit)
@@ -278,54 +280,59 @@ def _restore_hs_config():
time.sleep(0.1)
+def _fetch_registers(ts: str, iteration: int) -> None:
+ """
+ GET /registers from the device Flask server and save to data/ as JSON.
+ Reads MIPI DSI PHY timing registers via memtool on the target.
+ Non-fatal — a failed fetch prints a warning and returns without crashing.
+ """
+ try:
+ resp = requests.get(f"{DEVICE_BASE}/registers", timeout=5)
+ resp.raise_for_status()
+ data = resp.json()
+ if data.get("errors"):
+ print(f" REGISTERS: device warnings — {data['errors']}")
+ DATA_DIR.mkdir(exist_ok=True)
+ reg_path = DATA_DIR / f"{ts}_reg_{iteration:04d}.json"
+ reg_path.write_text(json.dumps(data, indent=2))
+ n = len(data.get("registers", []))
+ print(f" SAVED: {reg_path.name} ({n} registers)")
+ except requests.exceptions.RequestException as e:
+ print(f" REGISTERS: fetch failed — {e}")
+ except Exception as e:
+ print(f" REGISTERS: error — {e}")
+
+
def dual_capture(iteration):
"""
- Two-pass capture per test iteration:
- Pass 1 — signal quality (SIG_SCALE / SIG_POINTS)
- Pass 2 — frame structure (PROTO_SCALE / PROTO_POINTS)
- Restores the original 5 ns/div timebase when done.
+ Three-pass capture per test iteration. LP is captured FIRST so it catches
+ the SoT transition at pipeline startup — the moment flicker can occur.
+ HS quality and frame structure passes follow once the link is stable.
+
+ Pass 1 — LP / SoT startup (no settle delay — fires immediately after display ON)
+ Pass 2 — signal quality (HS differential, rise/fall)
+ Pass 3 — frame structure (HS differential, jitter/freq)
"""
capture_done.clear()
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
- print(f"DUAL CAPTURE #{iteration:04d} [{ts}]")
+ print(f"CAPTURE #{iteration:04d} [{ts}]")
- # ── Pass 1: signal quality ─────────────────────────────────────────────
- print(" PASS 1: SIGNAL QUALITY...")
- _set_timebase(SIG_SCALE, SIG_POINTS)
- if _arm_and_wait():
- _save_pass("sig", iteration, ts)
- else:
- print(" SKIPPING PASS 1 SAVE.")
-
- # ── Pass 2: frame/protocol structure ──────────────────────────────────
- print(" PASS 2: FRAME STRUCTURE...")
- _set_timebase(PROTO_SCALE, PROTO_POINTS)
- if _arm_and_wait():
- _save_pass("proto", iteration, ts)
- else:
- print(" SKIPPING PASS 2 SAVE.")
-
- # ── Pass 3: LP / SoT structure + 1.8 V supply monitoring ─────────────
- # Widens vertical range to capture LP-11 (1.2 V) and falls-edge triggers
- # on the LP-11 → LP-01 SoT transition. Saves Ch1 and Ch3 single-ended.
- # Rigol is armed first (non-blocking) so the LP→HS current step droops
- # the 1.8 V rail and triggers the Rigol while the Agilent captures.
- print(" PASS 3: LP TRANSITION...")
+ # ── Pass 1: LP / SoT startup transition ───────────────────────────────
+ # Fired immediately after display ON (test_worker has no settle delay).
+ # Catches the first LP-11 → LP-01 → LP-00 → HS SoT sequence, which is
+ # where violations causing screen flicker occur.
+ print(" PASS 1: LP STARTUP TRANSITION...")
_configure_for_lp()
_set_timebase(LP_SCALE, LP_POINTS)
if rigol_scope.is_connected():
- rigol_scope.arm() # arm Rigol before LP trigger so it catches the droop
+ rigol_scope.arm() # arm before Agilent so 1.8 V droop is captured
if _arm_and_wait(timeout=30):
_save_pass_channels("lp", iteration, ts)
else:
- print(" SKIPPING PASS 3 SAVE.")
+ print(" SKIPPING LP SAVE.")
- # Collect Rigol 1.8 V waveform.
- # The Agilent LP acquire + save takes ~35 s, so the Rigol will have
- # long since auto-captured by now. read_waveform_csv() sends :STOP
- # before reading to guarantee the acquisition is finalised.
if rigol_scope.is_connected():
DATA_DIR.mkdir(exist_ok=True)
v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv"
@@ -337,6 +344,27 @@ def dual_capture(iteration):
_restore_hs_config()
+ # ── Pass 2: HS signal quality ──────────────────────────────────────────
+ # LP pass takes ~5–10 s total; the HS link is fully settled by now.
+ print(" PASS 2: SIGNAL QUALITY...")
+ _set_timebase(SIG_SCALE, SIG_POINTS)
+ if _arm_and_wait():
+ _save_pass("sig", iteration, ts)
+ else:
+ print(" SKIPPING SIG SAVE.")
+
+ # ── Pass 3: frame/protocol structure ──────────────────────────────────
+ print(" PASS 3: FRAME STRUCTURE...")
+ _set_timebase(PROTO_SCALE, PROTO_POINTS)
+ if _arm_and_wait():
+ _save_pass("proto", iteration, ts)
+ else:
+ print(" SKIPPING PROTO SAVE.")
+
+ # ── Fetch DSI register snapshot from device ───────────────────────────
+ # Display is still ON here; registers reflect the active pipeline state.
+ _fetch_registers(ts, iteration)
+
# ── Restore original timebase ─────────────────────────────────────────
_set_timebase(5e-9, 500_000)
scope.write(":RUN")
@@ -364,7 +392,7 @@ def mgmt_worker():
print(f"[MGMT] TRANSFERRED {copied} FILE(S) TO DATA FOLDER. {failed} FAILED.")
if copied > 0:
try:
- analyze_captures.run_analysis()
+ analyze_captures.run_analysis(last=30)
except Exception as e:
print(f"[MGMT] ANALYSIS ERROR: {e}")
except Exception as e:
@@ -394,7 +422,7 @@ def test_worker():
requests.put(URL, json={"state": "on"}, timeout=2)
except requests.exceptions.RequestException as e:
print(f" WARNING: display ON failed: {e}")
- time.sleep(DISPLAY_SETTLE_S)
+ # No settle delay — LP pass fires immediately to catch startup SoT transition
dual_capture(count)
count += 1
try: