This commit is contained in:
david rice
2026-04-09 10:29:53 +01:00
parent be7658b54d
commit 82e6efbcad
7 changed files with 488 additions and 71 deletions

View File

@@ -22,8 +22,8 @@ from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env") load_dotenv(Path(__file__).parent / ".env")
from csv_preprocessor import ( from csv_preprocessor import (
analyze_file, analyze_lp_file, analyze_1v8_file, analyze_file, analyze_lp_file, analyze_1v8_file, analyze_reg_file,
group_captures, ChannelMetrics, LPMetrics, V1V8Metrics, group_captures, ChannelMetrics, LPMetrics, V1V8Metrics, RegDump,
) )
DATA_DIR = Path(__file__).parent / "data" DATA_DIR = Path(__file__).parent / "data"
@@ -33,14 +33,20 @@ CLAUDE_MODEL = "claude-opus-4-6"
SYSTEM_PROMPT = ( SYSTEM_PROMPT = (
"You are an expert in MIPI D-PHY signal integrity analysis. " "You are an expert in MIPI D-PHY signal integrity analysis. "
"You will be given compact pre-processed summaries of oscilloscope captures " "You will be given compact pre-processed summaries of oscilloscope captures "
"from a MIPI CLK and DAT0 differential pair, plus 1.8 V supply rail measurements. " "from a MIPI CLK and DAT0 differential pair, plus 1.8 V supply rail measurements "
"The MIPI PHY (NXP i.MX 8M Mini) drives LP states from the 1.8 V VDDIO. " "and DSI controller register snapshots (Samsung DSIM IP on NXP i.MX 8M Mini). "
"Each capture has up to four data sets: " "The MIPI PHY drives LP states from the 1.8 V VDDIO. "
"Each capture has up to five data sets: "
"sig (high-res HS quality), proto (long-window HS stats), " "sig (high-res HS quality), proto (long-window HS stats), "
"lp (single-ended LP-11/LP-00/HS burst including SoT sequence), " "lp (single-ended LP-11/LP-00/HS burst including SoT sequence), "
"and pwr (1.8 V supply captured during the LP→HS transition). " "pwr (1.8 V supply captured during the LP→HS transition), "
"and reg (DSIM register snapshot — DSIM_PHYTIMING at 0x32e100b4, "
"DSIM_PHYTIMING1 at 0xb8, DSIM_PHYTIMING2 at 0xbc control LP state durations "
"and PHY clock timing; DSIM_CLKCTRL at 0x08 and DSIM_ESCMODE at 0x14 affect "
"LP escape mode and HS entry sequencing). "
"Analyse the data for trends, degradation, anomalies, or consistent spec concerns " "Analyse the data for trends, degradation, anomalies, or consistent spec concerns "
"across captures. Be concise and actionable." "across captures. Correlate register values with observed LP timing violations. "
"Be concise and actionable."
) )
@@ -48,24 +54,68 @@ SYSTEM_PROMPT = (
# Helpers # Helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
FLICKER_LOG = Path(__file__).parent / "reports" / "flicker_log.csv"
def _classify_flicker(keys: list, flicker_suspects: list) -> tuple[int, int]:
"""
Count flicker events (display sessions that flickered) vs total sessions in this batch.
Each test iteration is one complete display load/unload session. Flicker is
per-session: it occurs at pipeline load, persists for that session only, then
clears automatically on the next load. A single suspect capture IS a genuine
flicker event — not a measurement artifact — because the LP pass fires at startup.
Returns (flicker_sessions, total_sessions).
"""
if not flicker_suspects:
return 0, len(keys)
# Count unique capture numbers that had at least one flicker suspect
suspect_sessions = {m.capture_num for m in flicker_suspects}
return len(suspect_sessions), len(keys)
def _log_flicker_event(ts: str, num: int, m: "LPMetrics") -> None:
    """Append one flicker-suspect row to the persistent flicker log CSV.

    Creates the reports/ directory and the CSV header on first use, then
    appends a single row per suspect capture.
    """
    import csv as _csv

    FLICKER_LOG.parent.mkdir(exist_ok=True)
    need_header = not FLICKER_LOG.exists()
    with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as fh:
        writer = _csv.writer(fh)
        if need_header:
            writer.writerow([
                "logged_at", "capture_ts", "capture_num", "channel",
                "lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v",
            ])
        writer.writerow([
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            ts, f"{num:04d}", m.channel,
            m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v,
        ])
def process_capture( def process_capture(
ts: str, ts: str,
num: int, num: int,
files: dict[str, Path], files: dict[str, Path],
verbose: bool = False, verbose: bool = False,
) -> tuple[str, list[ChannelMetrics]]: ) -> tuple[str, list, list["LPMetrics"], list["RegDump"]]:
""" """
Run the pre-processor on all CSV files for one capture. Run the pre-processor on all CSV files for one capture.
Returns (text_summary, list_of_metrics). Returns (text_summary, metrics_list, flicker_suspects, reg_dumps).
Missing files produce a one-line note instead of crashing. Missing files produce a one-line note instead of crashing.
""" """
lines = [f"=== Capture {num:04d} {ts} ==="] lines = [f"=== Capture {num:04d} {ts} ==="]
metrics_list: list[ChannelMetrics | LPMetrics] = [] metrics_list: list[ChannelMetrics | LPMetrics | V1V8Metrics | RegDump] = []
flicker_suspects: list[LPMetrics] = []
reg_dumps: list[RegDump] = []
for key in ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat", "pwr_1v8"): for key in ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat",
"pwr_1v8", "reg"):
if key not in files: if key not in files:
if key == "pwr_1v8": if key == "pwr_1v8":
lines.append(f" [{key}] NOT CAPTURED (Rigol not connected or no droop)") lines.append(f" [{key}] NOT CAPTURED (Rigol not connected or no droop)")
elif key == "reg":
lines.append(f" [{key}] NOT CAPTURED (device unreachable or memtool error)")
else: else:
lines.append(f" [{key}] MISSING") lines.append(f" [{key}] MISSING")
continue continue
@@ -74,27 +124,65 @@ def process_capture(
m = analyze_lp_file(files[key]) m = analyze_lp_file(files[key])
elif key == "pwr_1v8": elif key == "pwr_1v8":
m = analyze_1v8_file(files[key]) m = analyze_1v8_file(files[key])
elif key == "reg":
m = analyze_reg_file(files[key])
else: else:
m = analyze_file(files[key]) m = analyze_file(files[key])
lines.append(m.summary()) lines.append(m.summary())
metrics_list.append(m) metrics_list.append(m)
if verbose: if verbose:
print(m.summary()) print(m.summary())
# Real-time flicker detection — log and alert immediately
if isinstance(m, LPMetrics) and m.flicker_suspect:
flicker_suspects.append(m)
_log_flicker_event(ts, num, m)
print(f"\n *** FLICKER SUSPECT: capture {num:04d} [{ts}] "
f"lp_low={m.lp_low_duration_ns} ns ***\n")
if isinstance(m, RegDump):
reg_dumps.append(m)
except Exception as exc: except Exception as exc:
lines.append(f" [{key}] ERROR: {exc}") lines.append(f" [{key}] ERROR: {exc}")
return "\n".join(lines), metrics_list return "\n".join(lines), metrics_list, flicker_suspects, reg_dumps
def build_prompt(all_summaries: list[str]) -> str: def build_prompt(all_summaries: list[str], flicker_suspects: list = None,
flicker_count: int = 0, total_sessions: int = 0) -> str:
body = "\n\n".join(all_summaries) body = "\n\n".join(all_summaries)
flicker_section = ""
if flicker_suspects and flicker_count > 0:
items = "\n".join(
f" - Capture {m.capture_num:04d} [{m.timestamp}] channel={m.channel} "
f"lp_low={m.lp_low_duration_ns} ns lp11_to_hs={m.lp11_to_hs_ns} ns "
f"lp11_v={m.lp11_voltage_v} V"
for m in flicker_suspects
)
rate = f"{flicker_count}/{total_sessions} display load sessions ({100*flicker_count/total_sessions:.0f}%)"
flicker_section = (
f"\n\nALERT — FLICKER DETECTED: {rate} produced screen flicker in this batch.\n"
f"Affected captures:\n{items}\n"
"Each capture is one complete display pipeline load/unload cycle. Flicker is "
"per-session: it occurs at pipeline load and persists for that session only, then "
"clears automatically on the next load. A flagged capture therefore represents a "
"genuine flicker event, not a measurement artifact.\n"
"LP-low plateau < 50 ns means the LP-01/LP-00 SoT states are absent or too brief "
"for the SN65DSI83 MIPI/LVDS bridge to detect start-of-transmission, causing it "
"to drop a frame and produce visible flicker.\n"
"Focus your analysis on WHY the SoT sequence is being violated at pipeline startup "
"and what register setting, supply condition, or hardware change would prevent it.\n"
)
return ( return (
"Below are pre-processed summaries of MIPI D-PHY captures. " "Below are pre-processed summaries of MIPI D-PHY captures from a Digi ConnectCore "
"Each capture has three passes per lane (CLK and DAT0):\n" "8M Mini SOM (NXP i.MX 8M Mini) driving a SN65DSI83 MIPI-to-LVDS bridge. "
"The system occasionally flickers at display pipeline load. "
"Each capture has up to four data sets per lane (CLK and DAT0):\n"
" sig — high-res HS differential (rise/fall times)\n" " sig — high-res HS differential (rise/fall times)\n"
" proto — long-window HS differential (jitter, clock freq, amplitude)\n" " proto — long-window HS differential (jitter, clock freq, amplitude)\n"
" lp — single-ended LP state capture (LP-11 voltage, SoT sequence, HS bursts)\n" " lp — single-ended LP state capture at pipeline startup (LP-11, SoT sequence, HS bursts)\n"
" pwr — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n\n" " pwr — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n"
f"{flicker_section}\n"
f"{body}\n\n" f"{body}\n\n"
"Please:\n" "Please:\n"
"1. Identify any consistent spec concerns (HS voltage, LP-11 voltage, LP-low timing).\n" "1. Identify any consistent spec concerns (HS voltage, LP-11 voltage, LP-low timing).\n"
@@ -106,11 +194,15 @@ def build_prompt(all_summaries: list[str]) -> str:
"5. For any ERROR or WARNING lines in the summaries, explain the most likely cause " "5. For any ERROR or WARNING lines in the summaries, explain the most likely cause "
" (e.g. missing file, bad trigger, signal absent, probe issue, supply marginal) and what to check.\n" " (e.g. missing file, bad trigger, signal absent, probe issue, supply marginal) and what to check.\n"
"6. Provide specific, actionable recommendations to address all identified issues and anomalies.\n" "6. Provide specific, actionable recommendations to address all identified issues and anomalies.\n"
"7. Summarise overall signal health in 23 sentences." "7. Summarise overall signal health and flicker risk in 23 sentences."
) )
def save_html_report(analysis: str, token_line: str, keys: list) -> Path: def save_html_report(analysis: str, token_line: str, keys: list,
flicker_suspects: list = None,
flicker_count: int = 0,
total_sessions: int = 0,
all_reg_dumps: list = None) -> Path:
"""Write a timestamped HTML report to the reports/ directory.""" """Write a timestamped HTML report to the reports/ directory."""
REPORTS_DIR.mkdir(exist_ok=True) REPORTS_DIR.mkdir(exist_ok=True)
now = datetime.now() now = datetime.now()
@@ -148,6 +240,71 @@ def save_html_report(analysis: str, token_line: str, keys: list) -> Path:
body_html = text_to_html(analysis) body_html = text_to_html(analysis)
flicker_banner = ""
if flicker_suspects and flicker_count > 0:
rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0
rate_str = f"{flicker_count} of {total_sessions} display load sessions ({rate_pct:.0f}%)"
rows = "".join(
f"<tr><td>{m.capture_num:04d}</td><td>{m.timestamp}</td><td>{m.channel}</td>"
f"<td style='color:red'>{m.lp_low_duration_ns} ns</td>"
f"<td>{m.lp11_to_hs_ns} ns</td><td>{m.lp11_voltage_v} V</td></tr>"
for m in flicker_suspects
)
flicker_banner = f"""
<div style="background:#fff3cd;border:2px solid #e65100;border-radius:6px;
padding:16px 20px;margin-bottom:28px;">
<h2 style="color:#e65100;margin-top:0">&#9888; FLICKER DETECTED &mdash; {rate_str} flickered</h2>
<p>Each flagged capture is a genuine flicker event (not an artifact) — the LP pass fires at
pipeline startup, so a missing or sub-50&nbsp;ns LP-low plateau means the SN65DSI83 bridge
missed the SoT sequence and dropped a frame.<br>
LP-low plateau &lt; 50&nbsp;ns means the LP-01/LP-00 SoT states are absent or too brief
for the SN65DSI83 bridge to detect start-of-transmission.</p>
<table>
<tr><th>Capture</th><th>Timestamp</th><th>Channel</th>
<th>LP-low plateau</th><th>LP exit&rarr;HS</th><th>LP-11 voltage</th></tr>
{rows}
</table>
</div>"""
# --- Register table (collapsible) ---
reg_section = ""
if all_reg_dumps:
# Collect all unique addresses in order they first appear
addr_order = []
addr_names = {}
for rd in all_reg_dumps:
for r in rd.registers:
if r["address"] not in addr_names:
addr_order.append(r["address"])
addr_names[r["address"]] = r.get("name", "")
if addr_order:
header_cells = "".join(
f"<th>{html.escape(addr)}<br><small>{html.escape(addr_names[addr])}</small></th>"
for addr in addr_order
)
rows_html = ""
for rd in all_reg_dumps:
reg_map = {r["address"]: r["value"] for r in rd.registers}
cells = "".join(
f"<td>{html.escape(reg_map.get(addr, ''))}</td>"
for addr in addr_order
)
rows_html += f"<tr><td>{rd.capture_num:04d}</td><td>{rd.timestamp}</td>{cells}</tr>"
reg_section = f"""
<details style="margin-bottom:24px;">
<summary style="cursor:pointer;font-weight:bold;color:#1a3a5c;font-size:1.05em;">
DSI Register Snapshots ({len(all_reg_dumps)} captures)
</summary>
<div style="overflow-x:auto;margin-top:8px;">
<table>
<tr><th>Capture</th><th>Timestamp</th>{header_cells}</tr>
{rows_html}
</table>
</div>
</details>"""
html_content = f"""<!DOCTYPE html> html_content = f"""<!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
@@ -161,11 +318,22 @@ def save_html_report(analysis: str, token_line: str, keys: list) -> Path:
ol, ul {{ line-height: 1.8; padding-left: 24px; }} ol, ul {{ line-height: 1.8; padding-left: 24px; }}
li {{ margin: 4px 0; }} li {{ margin: 4px 0; }}
.tokens {{ color: #888; font-size: 0.8em; margin-top: 32px; border-top: 1px solid #ddd; padding-top: 8px; }} .tokens {{ color: #888; font-size: 0.8em; margin-top: 32px; border-top: 1px solid #ddd; padding-top: 8px; }}
.flicker-alert {{ background: #fff3cd; border: 2px solid #e65100; border-radius: 6px;
padding: 16px 20px; margin-bottom: 28px; }}
.flicker-alert h2 {{ color: #e65100; margin-top: 0; }}
.flicker-alert table {{ border-collapse: collapse; width: 100%; margin-top: 10px; }}
.flicker-alert th {{ background: #e65100; color: white; padding: 6px 10px; text-align: left; }}
.flicker-alert td {{ border: 1px solid #ccc; padding: 5px 10px; }}
table {{ border-collapse: collapse; width: 100%; }}
th {{ background: #1a3a5c; color: white; padding: 6px 10px; text-align: left; }}
td {{ border: 1px solid #ddd; padding: 5px 10px; }}
@media print {{ body {{ margin: 20px; }} }} @media print {{ body {{ margin: 20px; }} }}
</style> </style>
</head> </head>
<body> <body>
<h1>MIPI D-PHY Analysis Report</h1> <h1>MIPI D-PHY Analysis Report</h1>
{flicker_banner}
{reg_section}
<p class="meta"> <p class="meta">
<strong>Generated:</strong> {date_str} &nbsp;|&nbsp; <strong>Generated:</strong> {date_str} &nbsp;|&nbsp;
<strong>Scope:</strong> {cap_range} &nbsp;|&nbsp; <strong>Scope:</strong> {cap_range} &nbsp;|&nbsp;
@@ -198,17 +366,22 @@ def run_analysis(last: int = 10) -> None:
print(f"\n[ANALYSIS] Processing {len(keys)} most-recent capture(s)...") print(f"\n[ANALYSIS] Processing {len(keys)} most-recent capture(s)...")
all_summaries: list[str] = [] all_summaries: list[str] = []
all_flicker_suspects: list[LPMetrics] = []
all_reg_dumps: list[RegDump] = []
for ts, num in keys: for ts, num in keys:
summary_text, _ = process_capture(ts, num, groups[(ts, num)]) summary_text, _, suspects, reg_dumps = process_capture(ts, num, groups[(ts, num)])
all_summaries.append(summary_text) all_summaries.append(summary_text)
all_flicker_suspects.extend(suspects)
all_reg_dumps.extend(reg_dumps)
prompt = build_prompt(all_summaries) flicker_count, total_sessions = _classify_flicker(keys, all_flicker_suspects)
prompt = build_prompt(all_summaries, all_flicker_suspects, flicker_count, total_sessions)
print(f"[ANALYSIS] Sending {len(prompt):,} chars to {CLAUDE_MODEL}...") print(f"[ANALYSIS] Sending {len(prompt):,} chars to {CLAUDE_MODEL}...")
client = anthropic.Anthropic() client = anthropic.Anthropic()
message = client.messages.create( message = client.messages.create(
model = CLAUDE_MODEL, model = CLAUDE_MODEL,
max_tokens = 3072, max_tokens = 4096,
system = SYSTEM_PROMPT, system = SYSTEM_PROMPT,
messages = [{"role": "user", "content": prompt}], messages = [{"role": "user", "content": prompt}],
) )
@@ -225,8 +398,14 @@ def run_analysis(last: int = 10) -> None:
print(separator + "\n") print(separator + "\n")
# ── HTML report ─────────────────────────────────────────────────────── # ── HTML report ───────────────────────────────────────────────────────
report_path = save_html_report(analysis, token_line, keys) report_path = save_html_report(analysis, token_line, keys,
all_flicker_suspects, flicker_count, total_sessions,
all_reg_dumps)
print(f"[ANALYSIS] Report saved to {report_path}") print(f"[ANALYSIS] Report saved to {report_path}")
if flicker_count > 0:
rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0
print(f"[ANALYSIS] *** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions "
f"({rate_pct:.0f}%) — logged to {FLICKER_LOG} ***")
def main() -> None: def main() -> None:
@@ -263,14 +442,21 @@ def main() -> None:
# --- Run pre-processor --- # --- Run pre-processor ---
all_summaries: list[str] = [] all_summaries: list[str] = []
all_flicker_suspects: list[LPMetrics] = []
all_reg_dumps: list[RegDump] = []
for ts, num in keys: for ts, num in keys:
summary_text, _ = process_capture(ts, num, groups[(ts, num)], verbose=args.verbose) summary_text, _, suspects, reg_dumps = process_capture(
ts, num, groups[(ts, num)], verbose=args.verbose)
all_summaries.append(summary_text) all_summaries.append(summary_text)
all_flicker_suspects.extend(suspects)
all_reg_dumps.extend(reg_dumps)
if not args.verbose: if not args.verbose:
print(f" Processed capture {num:04d} {ts}") flag = " *** FLICKER SUSPECT ***" if suspects else ""
print(f" Processed capture {num:04d} {ts}{flag}")
# --- Build Claude prompt --- # --- Build Claude prompt ---
prompt = build_prompt(all_summaries) flicker_count, total_sessions = _classify_flicker(keys, all_flicker_suspects)
prompt = build_prompt(all_summaries, all_flicker_suspects, flicker_count, total_sessions)
if args.dry_run: if args.dry_run:
print("\n--- Prompt that would be sent to Claude ---") print("\n--- Prompt that would be sent to Claude ---")
@@ -282,7 +468,7 @@ def main() -> None:
client = anthropic.Anthropic() client = anthropic.Anthropic()
message = client.messages.create( message = client.messages.create(
model = CLAUDE_MODEL, model = CLAUDE_MODEL,
max_tokens = 3072, max_tokens = 4096,
system = SYSTEM_PROMPT, system = SYSTEM_PROMPT,
messages = [{"role": "user", "content": prompt}], messages = [{"role": "user", "content": prompt}],
) )
@@ -297,8 +483,14 @@ def main() -> None:
print(separator) print(separator)
# HTML report # HTML report
report_path = save_html_report(analysis, token_line, keys) report_path = save_html_report(analysis, token_line, keys,
all_flicker_suspects, flicker_count, total_sessions,
all_reg_dumps)
print(f"\nReport saved to {report_path}") print(f"\nReport saved to {report_path}")
if flicker_count > 0:
rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0
print(f"*** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions "
f"({rate_pct:.0f}%) — see {FLICKER_LOG} ***")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -16,6 +16,7 @@ File naming convention: YYYYMMDD_HHMMSS_{sig|proto|lp}_{NNNN}_{clk|dat}.csv
""" """
import csv import csv
import json
import re import re
import numpy as np import numpy as np
from dataclasses import dataclass, field from dataclasses import dataclass, field
@@ -42,10 +43,14 @@ LP11_HIGH_V = 0.8 # V — single-ended voltage above this → LP-11 (bot
LP_LOW_V = 0.25 # V — single-ended voltage below this → LP-00 or LP-01 pin low LP_LOW_V = 0.25 # V — single-ended voltage below this → LP-00 or LP-01 pin low
# Note: probe loading can shift LP-low from true 0 V to ~100 mV; 0.25 V clears that offset # Note: probe loading can shift LP-low from true 0 V to ~100 mV; 0.25 V clears that offset
# The rolling-std gate (HS_OSC_STD_V) prevents HS minima near 0 V being called LP-low. # The rolling-std gate (HS_OSC_STD_V) prevents HS minima near 0 V being called LP-low.
LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec
LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec
LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined) LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined)
HS_OSC_STD_V = 0.045 # V — rolling-std threshold above which a region is classified as HS HS_OSC_STD_V = 0.045 # V — rolling-std threshold above which a region is classified as HS
# Flicker detection threshold
# LP-low plateau below this → SoT sequence too brief for receiver to detect → flicker risk
FLICKER_LP_LOW_MAX_NS = 50.0 # ns
@dataclass @dataclass
@@ -420,6 +425,46 @@ def analyze_1v8_file(path: Path) -> "V1V8Metrics":
) )
@dataclass
class RegDump:
    """DSI controller register snapshot read from device via memtool."""
    timestamp: str
    capture_num: int
    commands: list   # memtool command strings that were executed on the device
    registers: list  # [{"address": "0x...", "value": "0x...", "name": "..."}, ...]
    errors: list     # device-side error strings, if any

    def summary(self) -> str:
        """Render a compact multi-line text summary of this snapshot."""
        out = [f"Capture {self.capture_num:04d} {self.timestamp} [reg/dsi_phy]"]
        out.extend(f" WARNING: {err}" for err in self.errors)
        if not self.registers:
            out.append(" No registers captured")
            return "\n".join(out)
        out.append(f" Commands : {'; '.join(self.commands)}")
        for reg in self.registers:
            suffix = f" ({reg['name']})" if reg.get("name") else ""
            out.append(f" {reg['address']} : {reg['value']}{suffix}")
        return "\n".join(out)
def analyze_reg_file(path: Path) -> "RegDump":
    """Read a register JSON file saved by mipi_test._fetch_registers().

    Args:
        path: JSON file named ``YYYYMMDD_HHMMSS_reg_NNNN.json``.

    Returns:
        A populated RegDump; missing JSON keys fall back to empty lists.

    Raises:
        ValueError: if the filename does not match the register pattern.
        json.JSONDecodeError: if the file content is not valid JSON.
    """
    m = re.match(r"(\d{8}_\d{6})_reg_(\d+)\.json", path.name, re.IGNORECASE)
    if not m:
        raise ValueError(f"Filename does not match register pattern: {path.name}")
    timestamp, cap_str = m.groups()
    # json.dumps emits ASCII by default, so UTF-8 is always a safe decoding;
    # pinning it avoids depending on the host's locale encoding.
    data = json.loads(path.read_text(encoding="utf-8"))
    return RegDump(
        timestamp   = timestamp,
        capture_num = int(cap_str),
        commands    = data.get("commands", []),
        registers   = data.get("registers", []),
        # "errors": null in the JSON must become [] — hence `or []`.
        errors      = data.get("errors") or [],
    )
def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]: def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]:
""" """
Scan data_dir and group CSV files by (timestamp, capture_number). Scan data_dir and group CSV files by (timestamp, capture_number).
@@ -427,17 +472,30 @@ def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]:
Example key: ("20260408_111448", 1) Example key: ("20260408_111448", 1)
Example value: {"sig_clk": Path(...), "sig_dat": ..., "proto_clk": ..., "proto_dat": ...} Example value: {"sig_clk": Path(...), "sig_dat": ..., "proto_clk": ..., "proto_dat": ...}
""" """
pattern = re.compile( csv_pattern = re.compile(
r"(\d{8}_\d{6})_(sig|proto|lp|pwr)_(\d+)_(clk|dat|1v8)\.csv", re.IGNORECASE r"(\d{8}_\d{6})_(sig|proto|lp|pwr)_(\d+)_(clk|dat|1v8)\.csv", re.IGNORECASE
) )
reg_pattern = re.compile(
r"(\d{8}_\d{6})_reg_(\d+)\.json", re.IGNORECASE
)
groups: dict[tuple[str, int], dict[str, Path]] = {} groups: dict[tuple[str, int], dict[str, Path]] = {}
for f in sorted(data_dir.glob("*.csv")): for f in sorted(data_dir.glob("*.csv")):
m = pattern.match(f.name) m = csv_pattern.match(f.name)
if not m: if not m:
continue continue
ts, ftype, cap_str, ch = m.groups() ts, ftype, cap_str, ch = m.groups()
key = (ts, int(cap_str)) key = (ts, int(cap_str))
groups.setdefault(key, {})[f"{ftype}_{ch}"] = f groups.setdefault(key, {})[f"{ftype}_{ch}"] = f
for f in sorted(data_dir.glob("*.json")):
m = reg_pattern.match(f.name)
if not m:
continue
ts, cap_str = m.groups()
key = (ts, int(cap_str))
groups.setdefault(key, {})["reg"] = f
return groups return groups
@@ -470,6 +528,11 @@ class LPMetrics:
lp_transition_valid: bool # LP-11 → LP-low → HS sequence present lp_transition_valid: bool # LP-11 → LP-low → HS sequence present
# Flicker detection
# A capture is flagged when the LP-low plateau is absent or shorter than
# FLICKER_LP_LOW_MAX_NS. Normal captures show ~340 ns; flicker shows 0–50 ns.
flicker_suspect: bool = False
warnings: list = field(default_factory=list) warnings: list = field(default_factory=list)
def summary(self) -> str: def summary(self) -> str:
@@ -501,6 +564,8 @@ class LPMetrics:
+ (f" avg {self.hs_burst_dur_ns:.0f} ns" if self.hs_burst_dur_ns else "")) + (f" avg {self.hs_burst_dur_ns:.0f} ns" if self.hs_burst_dur_ns else ""))
if self.hs_amplitude_mv is not None: if self.hs_amplitude_mv is not None:
lines.append(f" HS amplitude : {self.hs_amplitude_mv:.0f} mV (single-ended p-p/2)") lines.append(f" HS amplitude : {self.hs_amplitude_mv:.0f} mV (single-ended p-p/2)")
if self.flicker_suspect:
lines.append(f" *** FLICKER SUSPECT: LP-low plateau absent or < {FLICKER_LP_LOW_MAX_NS:.0f} ns ***")
for w in self.warnings: for w in self.warnings:
lines.append(f" WARNING: {w}") lines.append(f" WARNING: {w}")
return "\n".join(lines) return "\n".join(lines)
@@ -648,6 +713,15 @@ def analyze_lp_file(path: Path) -> "LPMetrics":
if n_hs_bursts == 0: if n_hs_bursts == 0:
warnings.append("No HS bursts detected after LP transition") warnings.append("No HS bursts detected after LP transition")
# Flicker suspect: LP→HS sequence detected but LP-low plateau is absent or too short.
# Normal captures show ~340 ns; the confirmed flicker capture showed 0 ns.
# Only flag DAT lane (CLK is continuous HS — LP states not expected).
flicker_suspect = (
channel == "dat"
and lp_transition_valid
and (lp_low_duration_ns is None or lp_low_duration_ns < FLICKER_LP_LOW_MAX_NS)
)
return LPMetrics( return LPMetrics(
timestamp = timestamp, timestamp = timestamp,
capture_num = capture_num, capture_num = capture_num,
@@ -663,6 +737,7 @@ def analyze_lp_file(path: Path) -> "LPMetrics":
hs_burst_dur_ns = hs_burst_dur_ns, hs_burst_dur_ns = hs_burst_dur_ns,
hs_amplitude_mv = hs_amplitude_mv, hs_amplitude_mv = hs_amplitude_mv,
lp_transition_valid = lp_transition_valid, lp_transition_valid = lp_transition_valid,
flicker_suspect = flicker_suspect,
warnings = warnings, warnings = warnings,
) )

122
device_server.py Normal file
View File

@@ -0,0 +1,122 @@
"""
device_server.py — deploy this on the target device (192.168.45.8)
Provides:
PUT /display {"state": "on"|"off"} — blank/unblank framebuffer
GET /registers — read MIPI DSI PHY registers via memtool
Add addresses to REGISTER_COMMANDS to capture more register ranges.
"""
import os
import re
import subprocess
from flask import Flask, jsonify, request
app = Flask(__name__)
# ---------------------------------------------------------------------------
# Register commands to execute on each GET /registers request.
# Each entry is a complete memtool command string.
# ---------------------------------------------------------------------------
REGISTER_COMMANDS = [
"memtool md -l 0x32e100b4+0x0c", # DSIM_PHYTIMING / PHYTIMING1 / PHYTIMING2
]
# Known Samsung DSIM register names (base 0x32E10000, i.MX 8M Mini)
_DSIM_NAMES = {
0x32e10004: "DSIM_STATUS",
0x32e10008: "DSIM_CLKCTRL",
0x32e1000c: "DSIM_TIMEOUT",
0x32e10010: "DSIM_CONFIG",
0x32e10014: "DSIM_ESCMODE",
0x32e100ac: "DSIM_PHYACCHR",
0x32e100b0: "DSIM_PHYACCHR1",
0x32e100b4: "DSIM_PHYTIMING",
0x32e100b8: "DSIM_PHYTIMING1",
0x32e100bc: "DSIM_PHYTIMING2",
}
def _parse_memtool_output(raw: str) -> list:
"""
Parse 'memtool md -l' output into a list of dicts.
Handles both formats:
32e100b4: 00000001 12345678 ...
0x32e100b4: 0x00000001 0x12345678 ...
"""
registers = []
for line in raw.splitlines():
line = line.strip()
if not line:
continue
m = re.match(r"(?:0x)?([0-9a-fA-F]+)\s*:\s*(.+)", line)
if not m:
continue
base_addr = int(m.group(1), 16)
values = re.findall(r"[0-9a-fA-F]{8}", m.group(2))
for i, val in enumerate(values):
addr = base_addr + i * 4
registers.append({
"address": f"0x{addr:08x}",
"value": f"0x{val.lower()}",
"name": _DSIM_NAMES.get(addr, ""),
})
return registers
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route("/display", methods=["PUT"])
def control_display():
data = request.get_json()
state = data.get("state", "").lower()
if state == "off":
os.system("echo 4 > /sys/class/graphics/fb0/blank")
return jsonify({"status": "Display OFF"}), 200
elif state == "on":
os.system("echo 0 > /sys/class/graphics/fb0/blank")
return jsonify({"status": "Display ON"}), 200
else:
return jsonify({"error": "Invalid state. Use 'on' or 'off'"}), 400
@app.route("/registers", methods=["GET"])
def get_registers():
"""Read MIPI DSI PHY timing registers via memtool and return JSON."""
all_registers = []
raw_lines = []
errors = []
for cmd_str in REGISTER_COMMANDS:
try:
result = subprocess.run(
cmd_str.split(), capture_output=True, text=True, timeout=5
)
raw = result.stdout.strip()
if raw:
raw_lines.append(raw)
all_registers.extend(_parse_memtool_output(raw))
if result.returncode != 0 and result.stderr.strip():
errors.append(f"{cmd_str}: {result.stderr.strip()}")
except FileNotFoundError:
errors.append(f"{cmd_str}: memtool not found in PATH")
except subprocess.TimeoutExpired:
errors.append(f"{cmd_str}: timed out after 5 s")
except Exception as e:
errors.append(f"{cmd_str}: {e}")
return jsonify({
"commands": REGISTER_COMMANDS,
"registers": all_registers,
"raw": "\n".join(raw_lines),
"errors": errors if errors else None,
}), 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)

View File

@@ -7,6 +7,7 @@ VERSION: 0.3
AUTHOR: D. RICE 25/03/2026 AUTHOR: D. RICE 25/03/2026
© 2026 ARRIVE © 2026 ARRIVE
""" """
import json
import vxi11 import vxi11
import time import time
import sys import sys
@@ -19,10 +20,11 @@ import analyze_captures
import rigol_scope import rigol_scope
# --- Configuration --- # --- Configuration ---
URL = "http://192.168.45.8:5000/display" DEVICE_BASE = "http://192.168.45.8:5000"
URL = f"{DEVICE_BASE}/display"
SCOPE_IP = "192.168.45.4" SCOPE_IP = "192.168.45.4"
PSU_IP = "192.168.45.3" PSU_IP = "192.168.45.3"
MGMT_INTERVAL = 60 # seconds between management runs (set to 3600 for hourly) MGMT_INTERVAL = 3600 # seconds between management runs (3600 = 1 hour)
# --- Capture settings --- # --- Capture settings ---
# Pass 1 — signal quality: resolves individual bits at 140 Mbit/s (7.1 ns/bit) # Pass 1 — signal quality: resolves individual bits at 140 Mbit/s (7.1 ns/bit)
@@ -278,54 +280,59 @@ def _restore_hs_config():
time.sleep(0.1) time.sleep(0.1)
def _fetch_registers(ts: str, iteration: int) -> None:
    """
    GET /registers from the device Flask server and save to data/ as JSON.

    Reads MIPI DSI PHY timing registers via memtool on the target.
    Non-fatal — a failed fetch prints a warning and returns without crashing.
    """
    try:
        response = requests.get(f"{DEVICE_BASE}/registers", timeout=5)
        response.raise_for_status()
        payload = response.json()
        device_errors = payload.get("errors")
        if device_errors:
            print(f" REGISTERS: device warnings — {device_errors}")
        DATA_DIR.mkdir(exist_ok=True)
        out_path = DATA_DIR / f"{ts}_reg_{iteration:04d}.json"
        out_path.write_text(json.dumps(payload, indent=2))
        count = len(payload.get("registers", []))
        print(f" SAVED: {out_path.name} ({count} registers)")
    except requests.exceptions.RequestException as e:
        print(f" REGISTERS: fetch failed — {e}")
    except Exception as e:
        print(f" REGISTERS: error — {e}")
def dual_capture(iteration): def dual_capture(iteration):
""" """
Two-pass capture per test iteration: Three-pass capture per test iteration. LP is captured FIRST so it catches
Pass 1 — signal quality (SIG_SCALE / SIG_POINTS) the SoT transition at pipeline startup — the moment flicker can occur.
Pass 2 — frame structure (PROTO_SCALE / PROTO_POINTS) HS quality and frame structure passes follow once the link is stable.
Restores the original 5 ns/div timebase when done.
Pass 1 — LP / SoT startup (no settle delay — fires immediately after display ON)
Pass 2 — signal quality (HS differential, rise/fall)
Pass 3 — frame structure (HS differential, jitter/freq)
""" """
capture_done.clear() capture_done.clear()
ts = datetime.now().strftime("%Y%m%d_%H%M%S") ts = datetime.now().strftime("%Y%m%d_%H%M%S")
print(f"DUAL CAPTURE #{iteration:04d} [{ts}]") print(f"CAPTURE #{iteration:04d} [{ts}]")
# ── Pass 1: signal quality ───────────────────────────────────────────── # ── Pass 1: LP / SoT startup transition ───────────────────────────────
print(" PASS 1: SIGNAL QUALITY...") # Fired immediately after display ON (test_worker has no settle delay).
_set_timebase(SIG_SCALE, SIG_POINTS) # Catches the first LP-11 → LP-01 → LP-00 → HS SoT sequence, which is
if _arm_and_wait(): # where violations causing screen flicker occur.
_save_pass("sig", iteration, ts) print(" PASS 1: LP STARTUP TRANSITION...")
else:
print(" SKIPPING PASS 1 SAVE.")
# ── Pass 2: frame/protocol structure ──────────────────────────────────
print(" PASS 2: FRAME STRUCTURE...")
_set_timebase(PROTO_SCALE, PROTO_POINTS)
if _arm_and_wait():
_save_pass("proto", iteration, ts)
else:
print(" SKIPPING PASS 2 SAVE.")
# ── Pass 3: LP / SoT structure + 1.8 V supply monitoring ─────────────
# Widens vertical range to capture LP-11 (1.2 V) and falls-edge triggers
# on the LP-11 → LP-01 SoT transition. Saves Ch1 and Ch3 single-ended.
# Rigol is armed first (non-blocking) so the LP→HS current step droops
# the 1.8 V rail and triggers the Rigol while the Agilent captures.
print(" PASS 3: LP TRANSITION...")
_configure_for_lp() _configure_for_lp()
_set_timebase(LP_SCALE, LP_POINTS) _set_timebase(LP_SCALE, LP_POINTS)
if rigol_scope.is_connected(): if rigol_scope.is_connected():
rigol_scope.arm() # arm Rigol before LP trigger so it catches the droop rigol_scope.arm() # arm before Agilent so 1.8 V droop is captured
if _arm_and_wait(timeout=30): if _arm_and_wait(timeout=30):
_save_pass_channels("lp", iteration, ts) _save_pass_channels("lp", iteration, ts)
else: else:
print(" SKIPPING PASS 3 SAVE.") print(" SKIPPING LP SAVE.")
# Collect Rigol 1.8 V waveform.
# The Agilent LP acquire + save takes ~35 s, so the Rigol will have
# long since auto-captured by now. read_waveform_csv() sends :STOP
# before reading to guarantee the acquisition is finalised.
if rigol_scope.is_connected(): if rigol_scope.is_connected():
DATA_DIR.mkdir(exist_ok=True) DATA_DIR.mkdir(exist_ok=True)
v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv" v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv"
@@ -337,6 +344,27 @@ def dual_capture(iteration):
_restore_hs_config() _restore_hs_config()
# ── Pass 2: HS signal quality ──────────────────────────────────────────
# LP pass takes ~510 s total; the HS link is fully settled by now.
print(" PASS 2: SIGNAL QUALITY...")
_set_timebase(SIG_SCALE, SIG_POINTS)
if _arm_and_wait():
_save_pass("sig", iteration, ts)
else:
print(" SKIPPING SIG SAVE.")
# ── Pass 3: frame/protocol structure ──────────────────────────────────
print(" PASS 3: FRAME STRUCTURE...")
_set_timebase(PROTO_SCALE, PROTO_POINTS)
if _arm_and_wait():
_save_pass("proto", iteration, ts)
else:
print(" SKIPPING PROTO SAVE.")
# ── Fetch DSI register snapshot from device ───────────────────────────
# Display is still ON here; registers reflect the active pipeline state.
_fetch_registers(ts, iteration)
# ── Restore original timebase ───────────────────────────────────────── # ── Restore original timebase ─────────────────────────────────────────
_set_timebase(5e-9, 500_000) _set_timebase(5e-9, 500_000)
scope.write(":RUN") scope.write(":RUN")
@@ -364,7 +392,7 @@ def mgmt_worker():
print(f"[MGMT] TRANSFERRED {copied} FILE(S) TO DATA FOLDER. {failed} FAILED.") print(f"[MGMT] TRANSFERRED {copied} FILE(S) TO DATA FOLDER. {failed} FAILED.")
if copied > 0: if copied > 0:
try: try:
analyze_captures.run_analysis() analyze_captures.run_analysis(last=30)
except Exception as e: except Exception as e:
print(f"[MGMT] ANALYSIS ERROR: {e}") print(f"[MGMT] ANALYSIS ERROR: {e}")
except Exception as e: except Exception as e:
@@ -394,7 +422,7 @@ def test_worker():
requests.put(URL, json={"state": "on"}, timeout=2) requests.put(URL, json={"state": "on"}, timeout=2)
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f" WARNING: display ON failed: {e}") print(f" WARNING: display ON failed: {e}")
time.sleep(DISPLAY_SETTLE_S) # No settle delay — LP pass fires immediately to catch startup SoT transition
dual_capture(count) dual_capture(count)
count += 1 count += 1
try: try: