From 82e6efbcad3fff684aa4dff7caf90a71facf5d33 Mon Sep 17 00:00:00 2001 From: david rice Date: Thu, 9 Apr 2026 10:29:53 +0100 Subject: [PATCH] Updates --- __pycache__/analyze_captures.cpython-312.pyc | Bin 14516 -> 14516 bytes __pycache__/csv_preprocessor.cpython-312.pyc | Bin 32841 -> 33060 bytes __pycache__/rigol_scope.cpython-312.pyc | Bin 8604 -> 8272 bytes analyze_captures.py | 248 ++++++++++++++++--- csv_preprocessor.py | 87 ++++++- device_server.py | 122 +++++++++ mipi_test.py | 102 +++++--- 7 files changed, 488 insertions(+), 71 deletions(-) create mode 100644 device_server.py diff --git a/__pycache__/analyze_captures.cpython-312.pyc b/__pycache__/analyze_captures.cpython-312.pyc index 41b95be3b2d754787dbb36b1fd557ff16019c204..41c16eced3589b43dadba530e1490dad805da55c 100644 GIT binary patch delta 26 gcmdl|xTTQmG%qg~0}$+r-^it+%E+@>U$s>j0BZ{e4FCWD delta 26 gcmdl|xTTQmG%qg~0}y-+-^it+%E+==U$s>j0BuGGGynhq diff --git a/__pycache__/csv_preprocessor.cpython-312.pyc b/__pycache__/csv_preprocessor.cpython-312.pyc index 38ac48fe93c23f6a2ea6eb0f0cfe02e1b89c2334..7478e7c81761412d834bb71ff092231644b6dd12 100644 GIT binary patch delta 2266 zcmbuAeQZ-z6u{qo?Q6SstK*~Fa2u=Z5FQBG*k>?iuol?Z7_h~S38Q=Kdu3%^+uirN zv9e_dQHBy2+$bPl(U=&;DCC*IM8gn_d>Mvfz$b|gQCI{aN{I0Tz2}t=gZ$@f`|G*q zo_pVU=bm%kiPh|zYgqfKq$DdtuM^($fu-_(`!xq!z^Z>b<{)Dar#wc&S(!vg<)`$R z6f@Yt)y?fiaY;n2EW_XlN>Y5xFe2AY^8Ak89gO(M6;yBDb$|UD)XjC9{A?}bVwA*t zd*m?KfS3km!{tt9rJ;f8L@p+ekx?fLT0}n^w#W6nrFw)qz+k`#IQ2HtI8o`!i%7Aw zijW-j`vbm!6jGeB&=v_wvNI5J##ra1SdtTmJ7s4?z_KLryp0T}mMzqbA)!s;&A6Oy zrEyB2O_DXPBOFv@4f!<#U7{gHLme^%!x_KulVF_S#gZU;eR2nGq~n$JCtoL0nk*#P zHd)qN`W-u)_pI2y;+%zFJ?o;y`d^*d+LyPsqaUq_U9_hSrRVSS9H`h|vG3`F&Bw+J zE;^brXgQuVG_LMTd;O55{(>#BPv|RHUHRLsBw`rn%Jil z7Lbow5y&?D=bPgGkp;yca0lp`oN1hB9RNL`ibOp!8c;_(Wt=MvT0Q4p0%;vZEOEXhgq(6T5(#!W@o`c; zF9V$Ovedof~Nsa0&)ST)K?m^5xPVUJ~w6h_n^5< zp^d6q=&f2%@AW)e+azru;TJyZ0*GOpmCZ@>%~Uk` z>XDUVhk3nR7K0S86p#*)_m#~2OCai5^&mcwUyn{En7Bp$RvgGr9MP%hE8>p!p?Y#U zTFFhOK95y1Ix0DIjoQv7Z){dvc$PXch9Yx#`xdb^wV>{ z_q_McJ@=gZc06x5-EW9HGiFSTz&<(k=i4vh{c+b54JVNPcj6pm(4HdI8EM3mB-OVX z>I9ph#eye%vIT5JLW3}{r%PC2Y!JGTO~@Bi)MWq_vG0I8qDx}+H3Pc>1Fj*4U5JWa ztvjelF{C%SBuf%;Ig_cVwRb8i6~h6arc%^OjX2PyifoG9Or8+4lo>17Pdz~5CaSry zaT^?K9UIGbB>`J4nMfuhHm}&vre4+OWzUL7KJs#* z19gzWLJ2L`ZxrrEW@_@woeFt-MtVG~8qWdL0h-C#88&MtGpSK_t5_j_%}7i3!&E-WBOIcnP}8Z-Y>a;4v_(8 zcH&{U-^YL>fKLIRk>k!;s7V(~QxV!q_OwiiIRHAKoYa-6=zxB*tVFc=K;=8W2CNqu zdSa^-*`J|?fp&R*tOvXTSP0kvC<8p6FVHpuc#a@(cr)O0z!|c+(q77!u?5Dp3{nEGM*Ic{Jghgt z;;}v4IcC`oQrf%qlUI&@E!QS5_gsl^lM47U3e9xOGh0jw!rYFg$CxF1=` zfAP~8Rnfdks~qxaQVbsAw+PCbreJ?$l}q5E5F9nIE)iRykuZcv4W2e?>`=N@=;w?K za%JHp@gg(d)uZZ$5IU$|c-AbUK{DLnFn`Y)5g#X$md;K*$z1R>z$t(oa9UTFW+608 zwlwBW9|7eu107qrz*SjO?vQ7dB7H>+mF=XWj;oJsPd}L!Am{`Nc(0Hjy@IA(^3^((5q30k{c(-IMa3*c@>% z&pV_ZtVf!ItPiDC41>t4=wW`Ko}OGhFaeuPO<2{uG`dZ7xkJoUv4`9aO-0|3%)3fy)t78G(O)eYLfA?Fza)6hN$>kkiP@) zV_-cwveG7Xv9W|Gl2sO=V8@dg??%n+40uC6g# jy6BzhJBzm$zlZns?CKe^J^fW$@oheHD@8xGs>$#V)Py-6 diff --git a/__pycache__/rigol_scope.cpython-312.pyc b/__pycache__/rigol_scope.cpython-312.pyc index 83fd44c0388df10439c428d04487f75bce90661d..e2b592e586a991a159bdfdef3e9733e10df3faa7 100644 GIT binary patch delta 1016 zcmY*YS!+{46rQ^!xmj+S+-%KWk{DB~q>TpBbTPKrn?7i(U==J;dTU82Eu_JvObEWX zfLQ80)a}9d_C@;#EVedLkd_F6Q1QVBDfl3$xFG0^<)(IE=6rL`cfL7u=HADB8%KrL z27?yhAzY|sFWR39mvo}JvrkH$m>QpK?;JXH^4Pe)J-U2D$I@)@EgM?CYq-ZXsc{B_ z=a)kq01JRe!)iYo5`WS;6Pg!Ak9LtFq5u@>g1CbKfV8Y@#v4?!SP#;&p5O+L-NQ5N ziE-$irG+x0Kb8=uf~uUAoPkO_-;uW!>nGyEf+{ zg`Tr6*>>3lj}nJKw#$yC=0(y-JdS{(w&2^r0Ob4%+O@DzcTop*xd2h0aLTR;`ak`U z!iT)#?FXY_0M66+hI)Jx7V054i(efwv-tT)e~Jd8r)2J0Xpr5HSg4UbC~FVtu*%MW zJbGZCrUbNNZ^aFya081Plyvwg5xd&ZO-GFKk{})NEq`=erx0{@8-7uNZSq}g<4OQ> zy1pZud%#p@Cda2DLjEvz=FKE>#ocX`FsM`6I#rkvh;P*$){bCs70;P7=$l&#@|#$_ijC7E(Skm5`^&Jo`>D&81BXkjW{!l03PW#ll9G}qOcj66M-$>ax6 rufNoE40j|SBg=lWjG#Au4>gKD`5)Snl<9u|XMoL<(_e|nlONOH3nW9RKFU%=9s(516*}F+j;uYzQn1VgV7sNV~>GJh&Uhrgmmon)1lZn1wn` z_2in6ut=^NIypV@@{*M4kIu|qd0rCwci%UYIN5nfhITis zA5ta;IS}Z1{N?Ub&LEc0;i_2=q94(u;hSI!-^R{(3}|@TeunF*Xf`Zzgz7Xj8X;Od zh&k7h4LWOsj;iWfkOOk$8k9}&D_1+rvn>3@<~J;q+*elD>hX@mbLTDb1=iWzm*vWR?Oze-$~dG5mQ9 zPVCquHCF80B_1V~qS6-SEW@IV3_*O=W3h*b-STY+h6jMnxDaewtvb>~7LPlDeS8qgxtV2lM zNXkk?xV|7OsxZ_S6;4J`rBATV6aC{lK@Ebe&4@CCY$Z|1GJ=v^T**iZ415112^0FKv5~r%oc}> zkuvw%A#)Y(_+Dv2X18sh5rwHw*}L!D?=Eu#hs-7TkM9fY8hqdnVP5!yzq82;n#ce8 z8UE|Pnx7s4Uq>b)u^{-yGZBq5;Gwf6PLL17R-F!ck>tBzPkb1B&rSs55u(`bKumFj zjroW@l8f=g9#82w$i@M&*UiO7h`kXC;Sxb1=Muxl!$ir>#Y03XMCmv}#Rut9KaJv) z0>kWO53!GNmtDlZi$XZ;Ii?c{aXFPp$T9dhknf#D tuple[int, int]: + """ + Count flicker events (display sessions that flickered) vs total sessions in this batch. + + Each test iteration is one complete display load/unload session. Flicker is + per-session: it occurs at pipeline load, persists for that session only, then + clears automatically on the next load. A single suspect capture IS a genuine + flicker event — not a measurement artifact — because the LP pass fires at startup. + + Returns (flicker_sessions, total_sessions). + """ + if not flicker_suspects: + return 0, len(keys) + + # Count unique capture numbers that had at least one flicker suspect + suspect_sessions = {m.capture_num for m in flicker_suspects} + return len(suspect_sessions), len(keys) + + +def _log_flicker_event(ts: str, num: int, m: "LPMetrics") -> None: + """Append a flicker suspect entry to the persistent flicker log.""" + FLICKER_LOG.parent.mkdir(exist_ok=True) + write_header = not FLICKER_LOG.exists() + with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as f: + import csv as _csv + w = _csv.writer(f) + if write_header: + w.writerow(["logged_at", "capture_ts", "capture_num", "channel", + "lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v"]) + w.writerow([ + datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + ts, f"{num:04d}", m.channel, + m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v, + ]) + + def process_capture( ts: str, num: int, files: dict[str, Path], verbose: bool = False, -) -> tuple[str, list[ChannelMetrics]]: +) -> tuple[str, list, list["LPMetrics"], list["RegDump"]]: """ Run the pre-processor on all CSV files for one capture. - Returns (text_summary, list_of_metrics). + Returns (text_summary, metrics_list, flicker_suspects, reg_dumps). Missing files produce a one-line note instead of crashing. """ lines = [f"=== Capture {num:04d} {ts} ==="] - metrics_list: list[ChannelMetrics | LPMetrics] = [] + metrics_list: list[ChannelMetrics | LPMetrics | V1V8Metrics | RegDump] = [] + flicker_suspects: list[LPMetrics] = [] + reg_dumps: list[RegDump] = [] - for key in ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat", "pwr_1v8"): + for key in ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat", + "pwr_1v8", "reg"): if key not in files: if key == "pwr_1v8": lines.append(f" [{key}] NOT CAPTURED (Rigol not connected or no droop)") + elif key == "reg": + lines.append(f" [{key}] NOT CAPTURED (device unreachable or memtool error)") else: lines.append(f" [{key}] MISSING") continue @@ -74,27 +124,65 @@ def process_capture( m = analyze_lp_file(files[key]) elif key == "pwr_1v8": m = analyze_1v8_file(files[key]) + elif key == "reg": + m = analyze_reg_file(files[key]) else: m = analyze_file(files[key]) lines.append(m.summary()) metrics_list.append(m) if verbose: print(m.summary()) + # Real-time flicker detection — log and alert immediately + if isinstance(m, LPMetrics) and m.flicker_suspect: + flicker_suspects.append(m) + _log_flicker_event(ts, num, m) + print(f"\n *** FLICKER SUSPECT: capture {num:04d} [{ts}] " + f"lp_low={m.lp_low_duration_ns} ns ***\n") + if isinstance(m, RegDump): + reg_dumps.append(m) except Exception as exc: lines.append(f" [{key}] ERROR: {exc}") - return "\n".join(lines), metrics_list + return "\n".join(lines), metrics_list, flicker_suspects, reg_dumps -def build_prompt(all_summaries: list[str]) -> str: +def build_prompt(all_summaries: list[str], flicker_suspects: list = None, + flicker_count: int = 0, total_sessions: int = 0) -> str: body = "\n\n".join(all_summaries) + + flicker_section = "" + if flicker_suspects and flicker_count > 0: + items = "\n".join( + f" - Capture {m.capture_num:04d} [{m.timestamp}] channel={m.channel} " + f"lp_low={m.lp_low_duration_ns} ns lp11_to_hs={m.lp11_to_hs_ns} ns " + f"lp11_v={m.lp11_voltage_v} V" + for m in flicker_suspects + ) + rate = f"{flicker_count}/{total_sessions} display load sessions ({100*flicker_count/total_sessions:.0f}%)" + flicker_section = ( + f"\n\nALERT — FLICKER DETECTED: {rate} produced screen flicker in this batch.\n" + f"Affected captures:\n{items}\n" + "Each capture is one complete display pipeline load/unload cycle. Flicker is " + "per-session: it occurs at pipeline load and persists for that session only, then " + "clears automatically on the next load. A flagged capture therefore represents a " + "genuine flicker event, not a measurement artifact.\n" + "LP-low plateau < 50 ns means the LP-01/LP-00 SoT states are absent or too brief " + "for the SN65DSI83 MIPI/LVDS bridge to detect start-of-transmission, causing it " + "to drop a frame and produce visible flicker.\n" + "Focus your analysis on WHY the SoT sequence is being violated at pipeline startup " + "and what register setting, supply condition, or hardware change would prevent it.\n" + ) + return ( - "Below are pre-processed summaries of MIPI D-PHY captures. " - "Each capture has three passes per lane (CLK and DAT0):\n" + "Below are pre-processed summaries of MIPI D-PHY captures from a Digi ConnectCore " + "8M Mini SOM (NXP i.MX 8M Mini) driving a SN65DSI83 MIPI-to-LVDS bridge. " + "The system occasionally flickers at display pipeline load. " + "Each capture has up to four data sets per lane (CLK and DAT0):\n" " sig — high-res HS differential (rise/fall times)\n" " proto — long-window HS differential (jitter, clock freq, amplitude)\n" - " lp — single-ended LP state capture (LP-11 voltage, SoT sequence, HS bursts)\n" - " pwr — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n\n" + " lp — single-ended LP state capture at pipeline startup (LP-11, SoT sequence, HS bursts)\n" + " pwr — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n" + f"{flicker_section}\n" f"{body}\n\n" "Please:\n" "1. Identify any consistent spec concerns (HS voltage, LP-11 voltage, LP-low timing).\n" @@ -106,11 +194,15 @@ def build_prompt(all_summaries: list[str]) -> str: "5. For any ERROR or WARNING lines in the summaries, explain the most likely cause " " (e.g. missing file, bad trigger, signal absent, probe issue, supply marginal) and what to check.\n" "6. Provide specific, actionable recommendations to address all identified issues and anomalies.\n" - "7. Summarise overall signal health in 2–3 sentences." + "7. Summarise overall signal health and flicker risk in 2–3 sentences." ) -def save_html_report(analysis: str, token_line: str, keys: list) -> Path: +def save_html_report(analysis: str, token_line: str, keys: list, + flicker_suspects: list = None, + flicker_count: int = 0, + total_sessions: int = 0, + all_reg_dumps: list = None) -> Path: """Write a timestamped HTML report to the reports/ directory.""" REPORTS_DIR.mkdir(exist_ok=True) now = datetime.now() @@ -148,6 +240,71 @@ def save_html_report(analysis: str, token_line: str, keys: list) -> Path: body_html = text_to_html(analysis) + flicker_banner = "" + if flicker_suspects and flicker_count > 0: + rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0 + rate_str = f"{flicker_count} of {total_sessions} display load sessions ({rate_pct:.0f}%)" + rows = "".join( + f"{m.capture_num:04d}{m.timestamp}{m.channel}" + f"{m.lp_low_duration_ns} ns" + f"{m.lp11_to_hs_ns} ns{m.lp11_voltage_v} V" + for m in flicker_suspects + ) + flicker_banner = f""" +
+

⚠ FLICKER DETECTED — {rate_str} flickered

+

Each flagged capture is a genuine flicker event (not an artifact) — the LP pass fires at + pipeline startup, so a missing or sub-50 ns LP-low plateau means the SN65DSI83 bridge + missed the SoT sequence and dropped a frame.
+ LP-low plateau < 50 ns means the LP-01/LP-00 SoT states are absent or too brief + for the SN65DSI83 bridge to detect start-of-transmission.

+ + + + {rows} +
CaptureTimestampChannelLP-low plateauLP exit→HSLP-11 voltage
+
""" + + # --- Register table (collapsible) --- + reg_section = "" + if all_reg_dumps: + # Collect all unique addresses in order they first appear + addr_order = [] + addr_names = {} + for rd in all_reg_dumps: + for r in rd.registers: + if r["address"] not in addr_names: + addr_order.append(r["address"]) + addr_names[r["address"]] = r.get("name", "") + + if addr_order: + header_cells = "".join( + f"{html.escape(addr)}
{html.escape(addr_names[addr])}" + for addr in addr_order + ) + rows_html = "" + for rd in all_reg_dumps: + reg_map = {r["address"]: r["value"] for r in rd.registers} + cells = "".join( + f"{html.escape(reg_map.get(addr, '—'))}" + for addr in addr_order + ) + rows_html += f"{rd.capture_num:04d}{rd.timestamp}{cells}" + + reg_section = f""" +
+ + DSI Register Snapshots ({len(all_reg_dumps)} captures) + +
+ + {header_cells} + {rows_html} +
CaptureTimestamp
+
+
""" + html_content = f""" @@ -161,11 +318,22 @@ def save_html_report(analysis: str, token_line: str, keys: list) -> Path: ol, ul {{ line-height: 1.8; padding-left: 24px; }} li {{ margin: 4px 0; }} .tokens {{ color: #888; font-size: 0.8em; margin-top: 32px; border-top: 1px solid #ddd; padding-top: 8px; }} + .flicker-alert {{ background: #fff3cd; border: 2px solid #e65100; border-radius: 6px; + padding: 16px 20px; margin-bottom: 28px; }} + .flicker-alert h2 {{ color: #e65100; margin-top: 0; }} + .flicker-alert table {{ border-collapse: collapse; width: 100%; margin-top: 10px; }} + .flicker-alert th {{ background: #e65100; color: white; padding: 6px 10px; text-align: left; }} + .flicker-alert td {{ border: 1px solid #ccc; padding: 5px 10px; }} + table {{ border-collapse: collapse; width: 100%; }} + th {{ background: #1a3a5c; color: white; padding: 6px 10px; text-align: left; }} + td {{ border: 1px solid #ddd; padding: 5px 10px; }} @media print {{ body {{ margin: 20px; }} }}

MIPI D-PHY Analysis Report

+{flicker_banner} +{reg_section}

Generated: {date_str}  |  Scope: {cap_range}  |  @@ -198,17 +366,22 @@ def run_analysis(last: int = 10) -> None: print(f"\n[ANALYSIS] Processing {len(keys)} most-recent capture(s)...") all_summaries: list[str] = [] + all_flicker_suspects: list[LPMetrics] = [] + all_reg_dumps: list[RegDump] = [] for ts, num in keys: - summary_text, _ = process_capture(ts, num, groups[(ts, num)]) + summary_text, _, suspects, reg_dumps = process_capture(ts, num, groups[(ts, num)]) all_summaries.append(summary_text) + all_flicker_suspects.extend(suspects) + all_reg_dumps.extend(reg_dumps) - prompt = build_prompt(all_summaries) + flicker_count, total_sessions = _classify_flicker(keys, all_flicker_suspects) + prompt = build_prompt(all_summaries, all_flicker_suspects, flicker_count, total_sessions) print(f"[ANALYSIS] Sending {len(prompt):,} chars to {CLAUDE_MODEL}...") client = anthropic.Anthropic() message = client.messages.create( model = CLAUDE_MODEL, - max_tokens = 3072, + max_tokens = 4096, system = SYSTEM_PROMPT, messages = [{"role": "user", "content": prompt}], ) @@ -225,8 +398,14 @@ def run_analysis(last: int = 10) -> None: print(separator + "\n") # ── HTML report ─────────────────────────────────────────────────────── - report_path = save_html_report(analysis, token_line, keys) + report_path = save_html_report(analysis, token_line, keys, + all_flicker_suspects, flicker_count, total_sessions, + all_reg_dumps) print(f"[ANALYSIS] Report saved to {report_path}") + if flicker_count > 0: + rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0 + print(f"[ANALYSIS] *** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions " + f"({rate_pct:.0f}%) — logged to {FLICKER_LOG} ***") def main() -> None: @@ -263,14 +442,21 @@ def main() -> None: # --- Run pre-processor --- all_summaries: list[str] = [] + all_flicker_suspects: list[LPMetrics] = [] + all_reg_dumps: list[RegDump] = [] for ts, num in keys: - summary_text, _ = process_capture(ts, num, groups[(ts, num)], verbose=args.verbose) + summary_text, _, suspects, reg_dumps = process_capture( + ts, num, groups[(ts, num)], verbose=args.verbose) all_summaries.append(summary_text) + all_flicker_suspects.extend(suspects) + all_reg_dumps.extend(reg_dumps) if not args.verbose: - print(f" Processed capture {num:04d} {ts}") + flag = " *** FLICKER SUSPECT ***" if suspects else "" + print(f" Processed capture {num:04d} {ts}{flag}") # --- Build Claude prompt --- - prompt = build_prompt(all_summaries) + flicker_count, total_sessions = _classify_flicker(keys, all_flicker_suspects) + prompt = build_prompt(all_summaries, all_flicker_suspects, flicker_count, total_sessions) if args.dry_run: print("\n--- Prompt that would be sent to Claude ---") @@ -282,7 +468,7 @@ def main() -> None: client = anthropic.Anthropic() message = client.messages.create( model = CLAUDE_MODEL, - max_tokens = 3072, + max_tokens = 4096, system = SYSTEM_PROMPT, messages = [{"role": "user", "content": prompt}], ) @@ -297,8 +483,14 @@ def main() -> None: print(separator) # HTML report - report_path = save_html_report(analysis, token_line, keys) + report_path = save_html_report(analysis, token_line, keys, + all_flicker_suspects, flicker_count, total_sessions, + all_reg_dumps) print(f"\nReport saved to {report_path}") + if flicker_count > 0: + rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0 + print(f"*** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions " + f"({rate_pct:.0f}%) — see {FLICKER_LOG} ***") if __name__ == "__main__": diff --git a/csv_preprocessor.py b/csv_preprocessor.py index 77f4242..5a11b4c 100644 --- a/csv_preprocessor.py +++ b/csv_preprocessor.py @@ -16,6 +16,7 @@ File naming convention: YYYYMMDD_HHMMSS_{sig|proto|lp}_{NNNN}_{clk|dat}.csv """ import csv +import json import re import numpy as np from dataclasses import dataclass, field @@ -42,10 +43,14 @@ LP11_HIGH_V = 0.8 # V — single-ended voltage above this → LP-11 (bot LP_LOW_V = 0.25 # V — single-ended voltage below this → LP-00 or LP-01 pin low # Note: probe loading can shift LP-low from true 0 V to ~100 mV; 0.25 V clears that offset # The rolling-std gate (HS_OSC_STD_V) prevents HS minima near 0 V being called LP-low. -LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec -LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec -LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined) -HS_OSC_STD_V = 0.045 # V — rolling-std threshold above which a region is classified as HS +LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec +LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec +LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined) +HS_OSC_STD_V = 0.045 # V — rolling-std threshold above which a region is classified as HS + +# Flicker detection threshold +# LP-low plateau below this → SoT sequence too brief for receiver to detect → flicker risk +FLICKER_LP_LOW_MAX_NS = 50.0 # ns @dataclass @@ -420,6 +425,46 @@ def analyze_1v8_file(path: Path) -> "V1V8Metrics": ) +@dataclass +class RegDump: + """DSI controller register snapshot read from device via memtool.""" + timestamp: str + capture_num: int + commands: list # list of memtool command strings that were run + registers: list # [{"address": "0x...", "value": "0x...", "name": "..."}, ...] + errors: list # any device-side errors + + def summary(self) -> str: + lines = [f"Capture {self.capture_num:04d} {self.timestamp} [reg/dsi_phy]"] + if self.errors: + for err in self.errors: + lines.append(f" WARNING: {err}") + if not self.registers: + lines.append(" No registers captured") + return "\n".join(lines) + lines.append(f" Commands : {'; '.join(self.commands)}") + for r in self.registers: + name = f" ({r['name']})" if r.get("name") else "" + lines.append(f" {r['address']} : {r['value']}{name}") + return "\n".join(lines) + + +def analyze_reg_file(path: Path) -> "RegDump": + """Read a register JSON file saved by mipi_test._fetch_registers().""" + m = re.match(r"(\d{8}_\d{6})_reg_(\d+)\.json", path.name, re.IGNORECASE) + if not m: + raise ValueError(f"Filename does not match register pattern: {path.name}") + timestamp, cap_str = m.groups() + data = json.loads(path.read_text()) + return RegDump( + timestamp = timestamp, + capture_num = int(cap_str), + commands = data.get("commands", []), + registers = data.get("registers", []), + errors = data.get("errors") or [], + ) + + def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]: """ Scan data_dir and group CSV files by (timestamp, capture_number). @@ -427,17 +472,30 @@ def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]: Example key: ("20260408_111448", 1) Example value: {"sig_clk": Path(...), "sig_dat": ..., "proto_clk": ..., "proto_dat": ...} """ - pattern = re.compile( + csv_pattern = re.compile( r"(\d{8}_\d{6})_(sig|proto|lp|pwr)_(\d+)_(clk|dat|1v8)\.csv", re.IGNORECASE ) + reg_pattern = re.compile( + r"(\d{8}_\d{6})_reg_(\d+)\.json", re.IGNORECASE + ) groups: dict[tuple[str, int], dict[str, Path]] = {} + for f in sorted(data_dir.glob("*.csv")): - m = pattern.match(f.name) + m = csv_pattern.match(f.name) if not m: continue ts, ftype, cap_str, ch = m.groups() key = (ts, int(cap_str)) groups.setdefault(key, {})[f"{ftype}_{ch}"] = f + + for f in sorted(data_dir.glob("*.json")): + m = reg_pattern.match(f.name) + if not m: + continue + ts, cap_str = m.groups() + key = (ts, int(cap_str)) + groups.setdefault(key, {})["reg"] = f + return groups @@ -470,6 +528,11 @@ class LPMetrics: lp_transition_valid: bool # LP-11 → LP-low → HS sequence present + # Flicker detection + # A capture is flagged when the LP-low plateau is absent or shorter than + # FLICKER_LP_LOW_MAX_NS. Normal captures show ~340 ns; flicker shows 0–50 ns. + flicker_suspect: bool = False + warnings: list = field(default_factory=list) def summary(self) -> str: @@ -501,6 +564,8 @@ class LPMetrics: + (f" avg {self.hs_burst_dur_ns:.0f} ns" if self.hs_burst_dur_ns else "")) if self.hs_amplitude_mv is not None: lines.append(f" HS amplitude : {self.hs_amplitude_mv:.0f} mV (single-ended p-p/2)") + if self.flicker_suspect: + lines.append(f" *** FLICKER SUSPECT: LP-low plateau absent or < {FLICKER_LP_LOW_MAX_NS:.0f} ns ***") for w in self.warnings: lines.append(f" WARNING: {w}") return "\n".join(lines) @@ -648,6 +713,15 @@ def analyze_lp_file(path: Path) -> "LPMetrics": if n_hs_bursts == 0: warnings.append("No HS bursts detected after LP transition") + # Flicker suspect: LP→HS sequence detected but LP-low plateau is absent or too short. + # Normal captures show ~340 ns; the confirmed flicker capture showed 0 ns. + # Only flag DAT lane (CLK is continuous HS — LP states not expected). + flicker_suspect = ( + channel == "dat" + and lp_transition_valid + and (lp_low_duration_ns is None or lp_low_duration_ns < FLICKER_LP_LOW_MAX_NS) + ) + return LPMetrics( timestamp = timestamp, capture_num = capture_num, @@ -663,6 +737,7 @@ def analyze_lp_file(path: Path) -> "LPMetrics": hs_burst_dur_ns = hs_burst_dur_ns, hs_amplitude_mv = hs_amplitude_mv, lp_transition_valid = lp_transition_valid, + flicker_suspect = flicker_suspect, warnings = warnings, ) diff --git a/device_server.py b/device_server.py new file mode 100644 index 0000000..c5640ce --- /dev/null +++ b/device_server.py @@ -0,0 +1,122 @@ +""" +device_server.py — deploy this on the target device (192.168.45.8) + +Provides: + PUT /display {"state": "on"|"off"} — blank/unblank framebuffer + GET /registers — read MIPI DSI PHY registers via memtool + +Add addresses to REGISTER_COMMANDS to capture more register ranges. +""" + +import os +import re +import subprocess + +from flask import Flask, jsonify, request + +app = Flask(__name__) + +# --------------------------------------------------------------------------- +# Register commands to execute on each GET /registers request. +# Each entry is a complete memtool command string. +# --------------------------------------------------------------------------- +REGISTER_COMMANDS = [ + "memtool md -l 0x32e100b4+0x0c", # DSIM_PHYTIMING / PHYTIMING1 / PHYTIMING2 +] + +# Known Samsung DSIM register names (base 0x32E10000, i.MX 8M Mini) +_DSIM_NAMES = { + 0x32e10004: "DSIM_STATUS", + 0x32e10008: "DSIM_CLKCTRL", + 0x32e1000c: "DSIM_TIMEOUT", + 0x32e10010: "DSIM_CONFIG", + 0x32e10014: "DSIM_ESCMODE", + 0x32e100ac: "DSIM_PHYACCHR", + 0x32e100b0: "DSIM_PHYACCHR1", + 0x32e100b4: "DSIM_PHYTIMING", + 0x32e100b8: "DSIM_PHYTIMING1", + 0x32e100bc: "DSIM_PHYTIMING2", +} + + +def _parse_memtool_output(raw: str) -> list: + """ + Parse 'memtool md -l' output into a list of dicts. + + Handles both formats: + 32e100b4: 00000001 12345678 ... + 0x32e100b4: 0x00000001 0x12345678 ... + """ + registers = [] + for line in raw.splitlines(): + line = line.strip() + if not line: + continue + m = re.match(r"(?:0x)?([0-9a-fA-F]+)\s*:\s*(.+)", line) + if not m: + continue + base_addr = int(m.group(1), 16) + values = re.findall(r"[0-9a-fA-F]{8}", m.group(2)) + for i, val in enumerate(values): + addr = base_addr + i * 4 + registers.append({ + "address": f"0x{addr:08x}", + "value": f"0x{val.lower()}", + "name": _DSIM_NAMES.get(addr, ""), + }) + return registers + + +# --------------------------------------------------------------------------- +# Routes +# --------------------------------------------------------------------------- + +@app.route("/display", methods=["PUT"]) +def control_display(): + data = request.get_json() + state = data.get("state", "").lower() + if state == "off": + os.system("echo 4 > /sys/class/graphics/fb0/blank") + return jsonify({"status": "Display OFF"}), 200 + elif state == "on": + os.system("echo 0 > /sys/class/graphics/fb0/blank") + return jsonify({"status": "Display ON"}), 200 + else: + return jsonify({"error": "Invalid state. Use 'on' or 'off'"}), 400 + + +@app.route("/registers", methods=["GET"]) +def get_registers(): + """Read MIPI DSI PHY timing registers via memtool and return JSON.""" + all_registers = [] + raw_lines = [] + errors = [] + + for cmd_str in REGISTER_COMMANDS: + try: + result = subprocess.run( + cmd_str.split(), capture_output=True, text=True, timeout=5 + ) + raw = result.stdout.strip() + if raw: + raw_lines.append(raw) + all_registers.extend(_parse_memtool_output(raw)) + if result.returncode != 0 and result.stderr.strip(): + errors.append(f"{cmd_str}: {result.stderr.strip()}") + except FileNotFoundError: + errors.append(f"{cmd_str}: memtool not found in PATH") + except subprocess.TimeoutExpired: + errors.append(f"{cmd_str}: timed out after 5 s") + except Exception as e: + errors.append(f"{cmd_str}: {e}") + + return jsonify({ + "commands": REGISTER_COMMANDS, + "registers": all_registers, + "raw": "\n".join(raw_lines), + "errors": errors if errors else None, + }), 200 + + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=5000) diff --git a/mipi_test.py b/mipi_test.py index e3eea55..9d6d383 100644 --- a/mipi_test.py +++ b/mipi_test.py @@ -7,6 +7,7 @@ VERSION: 0.3 AUTHOR: D. RICE 25/03/2026 © 2026 ARRIVE """ +import json import vxi11 import time import sys @@ -19,10 +20,11 @@ import analyze_captures import rigol_scope # --- Configuration --- -URL = "http://192.168.45.8:5000/display" +DEVICE_BASE = "http://192.168.45.8:5000" +URL = f"{DEVICE_BASE}/display" SCOPE_IP = "192.168.45.4" PSU_IP = "192.168.45.3" -MGMT_INTERVAL = 60 # seconds between management runs (set to 3600 for hourly) +MGMT_INTERVAL = 3600 # seconds between management runs (3600 = 1 hour) # --- Capture settings --- # Pass 1 — signal quality: resolves individual bits at 140 Mbit/s (7.1 ns/bit) @@ -278,54 +280,59 @@ def _restore_hs_config(): time.sleep(0.1) +def _fetch_registers(ts: str, iteration: int) -> None: + """ + GET /registers from the device Flask server and save to data/ as JSON. + Reads MIPI DSI PHY timing registers via memtool on the target. + Non-fatal — a failed fetch prints a warning and returns without crashing. + """ + try: + resp = requests.get(f"{DEVICE_BASE}/registers", timeout=5) + resp.raise_for_status() + data = resp.json() + if data.get("errors"): + print(f" REGISTERS: device warnings — {data['errors']}") + DATA_DIR.mkdir(exist_ok=True) + reg_path = DATA_DIR / f"{ts}_reg_{iteration:04d}.json" + reg_path.write_text(json.dumps(data, indent=2)) + n = len(data.get("registers", [])) + print(f" SAVED: {reg_path.name} ({n} registers)") + except requests.exceptions.RequestException as e: + print(f" REGISTERS: fetch failed — {e}") + except Exception as e: + print(f" REGISTERS: error — {e}") + + def dual_capture(iteration): """ - Two-pass capture per test iteration: - Pass 1 — signal quality (SIG_SCALE / SIG_POINTS) - Pass 2 — frame structure (PROTO_SCALE / PROTO_POINTS) - Restores the original 5 ns/div timebase when done. + Three-pass capture per test iteration. LP is captured FIRST so it catches + the SoT transition at pipeline startup — the moment flicker can occur. + HS quality and frame structure passes follow once the link is stable. + + Pass 1 — LP / SoT startup (no settle delay — fires immediately after display ON) + Pass 2 — signal quality (HS differential, rise/fall) + Pass 3 — frame structure (HS differential, jitter/freq) """ capture_done.clear() ts = datetime.now().strftime("%Y%m%d_%H%M%S") - print(f"DUAL CAPTURE #{iteration:04d} [{ts}]") + print(f"CAPTURE #{iteration:04d} [{ts}]") - # ── Pass 1: signal quality ───────────────────────────────────────────── - print(" PASS 1: SIGNAL QUALITY...") - _set_timebase(SIG_SCALE, SIG_POINTS) - if _arm_and_wait(): - _save_pass("sig", iteration, ts) - else: - print(" SKIPPING PASS 1 SAVE.") - - # ── Pass 2: frame/protocol structure ────────────────────────────────── - print(" PASS 2: FRAME STRUCTURE...") - _set_timebase(PROTO_SCALE, PROTO_POINTS) - if _arm_and_wait(): - _save_pass("proto", iteration, ts) - else: - print(" SKIPPING PASS 2 SAVE.") - - # ── Pass 3: LP / SoT structure + 1.8 V supply monitoring ───────────── - # Widens vertical range to capture LP-11 (1.2 V) and falls-edge triggers - # on the LP-11 → LP-01 SoT transition. Saves Ch1 and Ch3 single-ended. - # Rigol is armed first (non-blocking) so the LP→HS current step droops - # the 1.8 V rail and triggers the Rigol while the Agilent captures. - print(" PASS 3: LP TRANSITION...") + # ── Pass 1: LP / SoT startup transition ─────────────────────────────── + # Fired immediately after display ON (test_worker has no settle delay). + # Catches the first LP-11 → LP-01 → LP-00 → HS SoT sequence, which is + # where violations causing screen flicker occur. + print(" PASS 1: LP STARTUP TRANSITION...") _configure_for_lp() _set_timebase(LP_SCALE, LP_POINTS) if rigol_scope.is_connected(): - rigol_scope.arm() # arm Rigol before LP trigger so it catches the droop + rigol_scope.arm() # arm before Agilent so 1.8 V droop is captured if _arm_and_wait(timeout=30): _save_pass_channels("lp", iteration, ts) else: - print(" SKIPPING PASS 3 SAVE.") + print(" SKIPPING LP SAVE.") - # Collect Rigol 1.8 V waveform. - # The Agilent LP acquire + save takes ~35 s, so the Rigol will have - # long since auto-captured by now. read_waveform_csv() sends :STOP - # before reading to guarantee the acquisition is finalised. if rigol_scope.is_connected(): DATA_DIR.mkdir(exist_ok=True) v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv" @@ -337,6 +344,27 @@ def dual_capture(iteration): _restore_hs_config() + # ── Pass 2: HS signal quality ────────────────────────────────────────── + # LP pass takes ~5–10 s total; the HS link is fully settled by now. + print(" PASS 2: SIGNAL QUALITY...") + _set_timebase(SIG_SCALE, SIG_POINTS) + if _arm_and_wait(): + _save_pass("sig", iteration, ts) + else: + print(" SKIPPING SIG SAVE.") + + # ── Pass 3: frame/protocol structure ────────────────────────────────── + print(" PASS 3: FRAME STRUCTURE...") + _set_timebase(PROTO_SCALE, PROTO_POINTS) + if _arm_and_wait(): + _save_pass("proto", iteration, ts) + else: + print(" SKIPPING PROTO SAVE.") + + # ── Fetch DSI register snapshot from device ─────────────────────────── + # Display is still ON here; registers reflect the active pipeline state. + _fetch_registers(ts, iteration) + # ── Restore original timebase ───────────────────────────────────────── _set_timebase(5e-9, 500_000) scope.write(":RUN") @@ -364,7 +392,7 @@ def mgmt_worker(): print(f"[MGMT] TRANSFERRED {copied} FILE(S) TO DATA FOLDER. {failed} FAILED.") if copied > 0: try: - analyze_captures.run_analysis() + analyze_captures.run_analysis(last=30) except Exception as e: print(f"[MGMT] ANALYSIS ERROR: {e}") except Exception as e: @@ -394,7 +422,7 @@ def test_worker(): requests.put(URL, json={"state": "on"}, timeout=2) except requests.exceptions.RequestException as e: print(f" WARNING: display ON failed: {e}") - time.sleep(DISPLAY_SETTLE_S) + # No settle delay — LP pass fires immediately to catch startup SoT transition dual_capture(count) count += 1 try: