""" analyze_captures.py Groups MIPI oscilloscope CSV files by capture, runs csv_preprocessor on each, then sends the compact summaries to the Claude API for trend analysis. Usage: python analyze_captures.py # all captures in ./data python analyze_captures.py --last N # most recent N captures only python analyze_captures.py --capture 0001 # single capture by number """ import argparse import html import sys from datetime import datetime from pathlib import Path import anthropic from dotenv import load_dotenv load_dotenv(Path(__file__).parent / ".env") from csv_preprocessor import ( analyze_file, analyze_lp_file, analyze_1v8_file, analyze_reg_file, group_captures, ChannelMetrics, LPMetrics, V1V8Metrics, RegDump, ) DATA_DIR = Path(__file__).parent / "data" REPORTS_DIR = Path(__file__).parent / "reports" # --------------------------------------------------------------------------- # Confirmed device configuration — update here if hardware changes # --------------------------------------------------------------------------- # Derived values (do not edit — calculated from the constants above): # HS bit rate = PIXEL_CLK_MHZ × COLOR_DEPTH_BPP / DSI_LANES # = 72 × 24 / 4 = 432 Mbit/s per lane # HS bit UI = 1 / 432e6 ≈ 2.315 ns # Byte clock = 432 / 8 = 54 MHz → 18.518 ns/byte # # MIPI D-PHY v1.1 Table 14 timing constraints at 432 Mbit/s (UI = 2.315 ns): # T_LPX ≥ 50 ns # T_HS-PREPARE 40+4·UI → 85+6·UI = 49.3–98.9 ns (individual range) # T_HS-PREPARE+T_HS-ZERO ≥ 145+10·UI = 168.2 ns (combined minimum; no per-field min on T_HS-ZERO) # T_HS-TRAIL ≥ max(8·UI, 60+4·UI) = 69.3 ns # T_HS-EXIT ≥ 100 ns # T_CLK-PREPARE 38–95 ns (individual range) # T_CLK-PREPARE+T_CLK-ZERO ≥ 300 ns (combined minimum; no per-field min on T_CLK-ZERO) # T_CLK-POST ≥ 60+52·UI = 180.4 ns # T_CLK-TRAIL ≥ 60 ns # # Samsung DSIM field mapping (verified against kernel log output): # PHYTIMING (0xb4): [15:8]=TLPX, [7:0]=THS_EXIT # PHYTIMING1 (0xb8): [31:24]=TCLK_PREPARE, [23:16]=TCLK_ZERO, # [15:8]=TCLK_POST, [7:0]=TCLK_TRAIL # PHYTIMING2 (0xbc): [23:16]=THS_PREPARE, [15:8]=THS_ZERO, [7:0]=THS_TRAIL # # Kernel driver computes: clk_zero = 300 − 38 = 262 ns minimum; # hs_zero = 145 + 10·UI − (40 + 4·UI) = 105 + 6·UI = 118.9 ns minimum # # Actual register values programmed by samsung-dsim driver (samsung_dsim_set_phy_ctrl): # "Round Best" (default): PHY_TIMING=00000305, PHY_TIMING1=020e0a03, PHY_TIMING2=00030605 # → MULTIPLE D-PHY SPEC VIOLATIONS (see dsim_phytiming_round_best below) # "Round Up": PHY_TIMING=00000306, PHY_TIMING1=030f0a04, PHY_TIMING2=00030706 # → ALL fields spec-compliant DEVICE_CONFIG = { "dsi_host": "NXP i.MX 8M Mini (Samsung DSIM IP, sec-dsim/samsung-dsim driver)", "dsi_bridge": "Texas Instruments SN65DSI83 (MIPI-to-LVDS)", "pixel_clock_mhz": 72, "dsi_lanes": 4, "color_format": "RGB888 (24 bpp)", # DSI clock is LOCKED to pixel clock by format geometry — not independently configurable: # 24 bpp / 4 lanes = 6 bit-clocks per pixel; DDR halves the clock → 3 × pixel clock # DSI clock (differential CLK pair) = 3 × 72 MHz = 216 MHz # HS bit rate = 216 MHz × 2 (DDR) = 432 Mbit/s per lane "dsi_clock_mhz": 216, # 3 × pixel_clock; differential CLK pair frequency "hs_bit_rate_mbps": 432, # 216 × 2 (DDR) = 72 × 24 / 4 "hs_ui_ns": 2.315, # 1 / 432e6 "byte_clock_mhz": 54, # 432 / 8 "byte_period_ns": 18.518, "vddio_v": 1.8, # Actual register values from kernel (samsung_dsim_set_phy_ctrl) — two rounding modes: "dsim_phytiming_round_best": { "PHYTIMING (0xb4)": "0x00000305 (TLPX=3→55.6ns ✓, THS_EXIT=5→92.6ns < 100ns ✗)", "PHYTIMING1 (0xb8)": "0x020e0a03 (TCLK_PREPARE=2→37.0ns < 38ns ✗, TCLK_ZERO=14, PREP+ZERO=16bc→296ns < 300ns ✗, TCLK_POST=10→185ns ✓, TCLK_TRAIL=3→55.6ns < 60ns ✗)", "PHYTIMING2 (0xbc)": "0x00030605 (THS_PREPARE=3→55.6ns ✓, THS_ZERO=6, PREP+ZERO=9bc→166.7ns < 168.2ns ✗, THS_TRAIL=5→92.6ns ✓)", "verdict": "NON-COMPLIANT — 5 D-PHY violations; flicker risk at SoT", }, "dsim_phytiming_round_up": { "PHYTIMING (0xb4)": "0x00000306 (TLPX=3→55.6ns ✓, THS_EXIT=6→111.1ns ✓)", "PHYTIMING1 (0xb8)": "0x030f0a04 (TCLK_PREPARE=3→55.6ns ✓, TCLK_ZERO=15, PREP+ZERO=18bc→333ns ✓, TCLK_POST=10→185ns ✓, TCLK_TRAIL=4→74.1ns ✓)", "PHYTIMING2 (0xbc)": "0x00030706 (THS_PREPARE=3→55.6ns ✓, THS_ZERO=7, PREP+ZERO=10bc→185.2ns ✓, THS_TRAIL=6→111.1ns ✓)", "verdict": "FULLY COMPLIANT with D-PHY v1.1 Table 14", }, } CLAUDE_MODEL = "claude-opus-4-6" SYSTEM_PROMPT = ( "You are an expert in MIPI D-PHY signal integrity analysis. " "You will be given compact pre-processed summaries of oscilloscope captures " "from a MIPI CLK and DAT0 differential pair, plus 1.8 V supply rail measurements " "and DSI controller register snapshots (Samsung DSIM IP on NXP i.MX 8M Mini). " "The MIPI PHY drives LP states from the 1.8 V VDDIO. " "Each capture has up to five data sets: " "sig (high-res HS quality), proto (long-window HS stats), " "lp (single-ended LP-11/LP-00/HS burst including SoT sequence), " "pwr (1.8 V supply captured during the LP→HS transition), " "and reg (DSIM register snapshot — DSIM_PHYTIMING at 0x32e100b4, " "DSIM_PHYTIMING1 at 0xb8, DSIM_PHYTIMING2 at 0xbc control LP/HS state durations; " "timing fields are in byte-clock units where 1 unit = 18.518 ns at 432 Mbit/s). " "Analyse the data for trends, degradation, anomalies, or consistent spec concerns " "across captures. Correlate register field values with observed LP timing violations. " "Be concise and actionable." ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- FLICKER_LOG = Path(__file__).parent / "reports" / "flicker_log.csv" def _classify_flicker(keys: list, flicker_suspects: list) -> tuple[int, int]: """ Count flicker events (display sessions that flickered) vs total sessions in this batch. Each test iteration is one complete display load/unload session. Flicker is per-session: it occurs at pipeline load, persists for that session only, then clears automatically on the next load. A single suspect capture IS a genuine flicker event — not a measurement artifact — because the LP pass fires at startup. Returns (flicker_sessions, total_sessions). """ if not flicker_suspects: return 0, len(keys) # Count unique capture numbers that had at least one flicker suspect suspect_sessions = {m.capture_num for m in flicker_suspects} return len(suspect_sessions), len(keys) def _log_flicker_event(ts: str, num: int, m: "LPMetrics") -> None: """Append a flicker suspect entry to the persistent flicker log.""" FLICKER_LOG.parent.mkdir(exist_ok=True) write_header = not FLICKER_LOG.exists() with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as f: import csv as _csv w = _csv.writer(f) if write_header: w.writerow(["logged_at", "capture_ts", "capture_num", "channel", "lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v"]) w.writerow([ datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ts, f"{num:04d}", m.channel, m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v, ]) def process_capture( ts: str, num: int, files: dict[str, Path], verbose: bool = False, ) -> tuple[str, list, list["LPMetrics"], list["RegDump"]]: """ Run the pre-processor on all CSV files for one capture. Returns (text_summary, metrics_list, flicker_suspects, reg_dumps). Missing files produce a one-line note instead of crashing. """ lines = [f"=== Capture {num:04d} {ts} ==="] metrics_list: list[ChannelMetrics | LPMetrics | V1V8Metrics | RegDump] = [] flicker_suspects: list[LPMetrics] = [] reg_dumps: list[RegDump] = [] for key in ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat", "pwr_1v8", "reg"): if key not in files: if key == "pwr_1v8": lines.append(f" [{key}] NOT CAPTURED (Rigol not connected or no droop)") elif key == "reg": lines.append(f" [{key}] NOT CAPTURED (device unreachable or memtool error)") else: lines.append(f" [{key}] MISSING") continue try: if key.startswith("lp_"): m = analyze_lp_file(files[key]) elif key == "pwr_1v8": m = analyze_1v8_file(files[key]) elif key == "reg": m = analyze_reg_file(files[key]) else: m = analyze_file(files[key]) lines.append(m.summary()) metrics_list.append(m) if verbose: print(m.summary()) # Real-time flicker detection — log and alert immediately if isinstance(m, LPMetrics) and m.flicker_suspect: flicker_suspects.append(m) _log_flicker_event(ts, num, m) print(f"\n *** FLICKER SUSPECT: capture {num:04d} [{ts}] " f"lp_low={m.lp_low_duration_ns} ns ***\n") if isinstance(m, RegDump): reg_dumps.append(m) except Exception as exc: lines.append(f" [{key}] ERROR: {exc}") return "\n".join(lines), metrics_list, flicker_suspects, reg_dumps def build_prompt(all_summaries: list[str], flicker_suspects: list = None, flicker_count: int = 0, total_sessions: int = 0) -> str: cfg = DEVICE_CONFIG best = cfg["dsim_phytiming_round_best"] up = cfg["dsim_phytiming_round_up"] config_section = ( f"Device under test:\n" f" DSI host: {cfg['dsi_host']}\n" f" DSI bridge: {cfg['dsi_bridge']}\n" f" Pixel clock: {cfg['pixel_clock_mhz']} MHz\n" f" DSI lanes: {cfg['dsi_lanes']} data lanes\n" f" Color format: {cfg['color_format']}\n" f" DSI clock: {cfg['dsi_clock_mhz']} MHz " f"(= 3 × pixel clock; fixed by 24bpp/4lanes/DDR — not independently tunable)\n" f" HS bit rate: {cfg['hs_bit_rate_mbps']} Mbit/s per lane " f"(= DSI clock × 2, DDR)\n" f" HS UI: {cfg['hs_ui_ns']:.3f} ns\n" f" Byte clock: {cfg['byte_clock_mhz']} MHz ({cfg['byte_period_ns']:.3f} ns/byte)\n" f" VDDIO: {cfg['vddio_v']} V\n" f" DSIM PHY timing — 'Round Best' mode (D-PHY v1.1 non-compliant):\n" + "\n".join(f" {k}: {v}" for k, v in best.items()) + f"\n DSIM PHY timing — 'Round Up' mode (D-PHY v1.1 fully compliant):\n" + "\n".join(f" {k}: {v}" for k, v in up.items()) + f"\n NOTE: Compare captured register values against both modes above. " f"'Round Best' has 5 spec violations that could cause non-deterministic SoT failures." ) body = "\n\n".join(all_summaries) flicker_section = "" if flicker_suspects and flicker_count > 0: items = "\n".join( f" - Capture {m.capture_num:04d} [{m.timestamp}] channel={m.channel} " f"lp_low={m.lp_low_duration_ns} ns lp11_to_hs={m.lp11_to_hs_ns} ns " f"lp11_v={m.lp11_voltage_v} V" for m in flicker_suspects ) rate = f"{flicker_count}/{total_sessions} display load sessions ({100*flicker_count/total_sessions:.0f}%)" flicker_section = ( f"\n\nALERT — FLICKER DETECTED: {rate} produced screen flicker in this batch.\n" f"Affected captures:\n{items}\n" "Each capture is one complete display pipeline load/unload cycle. Flicker is " "per-session: it occurs at pipeline load and persists for that session only, then " "clears automatically on the next load. A flagged capture therefore represents a " "genuine flicker event, not a measurement artifact.\n" "LP-low plateau < 50 ns means the LP-01/LP-00 SoT states are absent or too brief " "for the SN65DSI83 MIPI/LVDS bridge to detect start-of-transmission, causing it " "to drop a frame and produce visible flicker.\n" "Focus your analysis on WHY the SoT sequence is being violated at pipeline startup " "and what register setting, supply condition, or hardware change would prevent it.\n" ) return ( "Below are pre-processed summaries of MIPI D-PHY captures from a Digi ConnectCore " "8M Mini SOM (NXP i.MX 8M Mini) driving a SN65DSI83 MIPI-to-LVDS bridge.\n\n" "Flicker behaviour (confirmed by observation):\n" " - The system is BISTABLE at initialisation. The state is determined entirely at\n" " pipeline load (LP→HS SoT sequence) and does not change during a session.\n" " - State A (good): SoT succeeds → bridge locks → display runs indefinitely, " "perfectly stable, no flicker.\n" " - State B (bad): SoT fails → bridge stuck → flickers indefinitely until the\n" " display pipeline is unloaded and reloaded (no other recovery).\n" " - After a reset the system is almost always in State A — good state is strongly\n" " preferred but State B occurs intermittently at pipeline load.\n" " - Nothing ongoing (supply noise, EMI, temperature drift) causes or ends flicker;\n" " the root cause is non-deterministic and acts only at the LP→HS SoT moment.\n" " - The LP pass captures the very first SoT transition at pipeline startup.\n" " Any LP timing anomaly in these captures is a genuine flicker event, not noise.\n\n" "Each capture has up to five data sets per lane (CLK and DAT0):\n" " sig — high-res HS differential (rise/fall times)\n" " proto — long-window HS differential (jitter, clock freq, amplitude)\n" " lp — single-ended LP state capture at pipeline startup (LP-11, SoT sequence, HS bursts)\n" " pwr — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n" " reg — DSIM PHY timing register snapshot from running device\n" f"\n{config_section}\n" f"{flicker_section}\n" f"{body}\n\n" "Please:\n" "1. Identify any consistent spec concerns (HS voltage, LP-11 voltage, LP-low timing).\n" "2. Highlight any trends over captures (amplitude drift, jitter, LP-11 voltage, 1.8 V droop, etc.).\n" "3. Flag anomalies — missing LP transitions, short LP-low, unexpected burst counts.\n" "4. Correlate 1.8 V supply droop/ripple with MIPI LP anomalies — does droop depth or ripple " " correlate with SoT timing violations, short LP-low plateaux, or LP-11 voltage drops? " " If pwr data is absent, note that supply correlation could not be assessed.\n" "5. For any ERROR or WARNING lines in the summaries, explain the most likely cause " " (e.g. missing file, bad trigger, signal absent, probe issue, supply marginal) and what to check.\n" "6. Provide specific, actionable recommendations to address all identified issues and anomalies.\n" "7. Summarise overall signal health and flicker risk in 2–3 sentences." ) def save_html_report(analysis: str, token_line: str, keys: list, flicker_suspects: list = None, flicker_count: int = 0, total_sessions: int = 0, all_reg_dumps: list = None) -> Path: """Write a timestamped HTML report to the reports/ directory.""" REPORTS_DIR.mkdir(exist_ok=True) now = datetime.now() filename = now.strftime("%Y%m%d_%H%M%S_analysis.html") path = REPORTS_DIR / filename cap_range = ( f"Capture {keys[0][1]:04d}" if len(keys) == 1 else f"Captures {keys[0][1]:04d}–{keys[-1][1]:04d}" ) date_str = now.strftime("%Y-%m-%d %H:%M:%S") # Convert plain text analysis to basic HTML (preserve line breaks, bold **) def text_to_html(text: str) -> str: escaped = html.escape(text) # **bold** import re escaped = re.sub(r'\*\*(.+?)\*\*', r'\1', escaped) # Blank lines → paragraph breaks paragraphs = re.split(r'\n{2,}', escaped) parts = [] for para in paragraphs: lines = para.strip().splitlines() if not lines: continue # Numbered or bullet list if lines[0].lstrip().startswith(('1.', '2.', '3.', '-', '*')): items = ''.join(f'
' + '
'.join(lines) + '
Each flagged capture is a genuine flicker event (not an artifact) — the LP pass fires at
pipeline startup, so a missing or sub-50 ns LP-low plateau means the SN65DSI83 bridge
missed the SoT sequence and dropped a frame.
LP-low plateau < 50 ns means the LP-01/LP-00 SoT states are absent or too brief
for the SN65DSI83 bridge to detect start-of-transmission.
| Capture | Timestamp | Channel | LP-low plateau | LP exit→HS | LP-11 voltage |
|---|
| Capture | Timestamp | {header_cells}
|---|
{html.escape(token_line)}
""" path.write_text(html_content, encoding="utf-8") return path # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def run_analysis(last: int = 10) -> None: """ Called by mgmt_worker after each file transfer. Analyses the most recent `last` captures and prints the Claude report. """ groups = group_captures(DATA_DIR) if not groups: print("[ANALYSIS] No captures found.") return keys = sorted(groups.keys())[-last:] print(f"\n[ANALYSIS] Processing {len(keys)} most-recent capture(s)...") all_summaries: list[str] = [] all_flicker_suspects: list[LPMetrics] = [] all_reg_dumps: list[RegDump] = [] for ts, num in keys: summary_text, _, suspects, reg_dumps = process_capture(ts, num, groups[(ts, num)]) all_summaries.append(summary_text) all_flicker_suspects.extend(suspects) all_reg_dumps.extend(reg_dumps) flicker_count, total_sessions = _classify_flicker(keys, all_flicker_suspects) prompt = build_prompt(all_summaries, all_flicker_suspects, flicker_count, total_sessions) print(f"[ANALYSIS] Sending {len(prompt):,} chars to {CLAUDE_MODEL}...") client = anthropic.Anthropic() message = client.messages.create( model = CLAUDE_MODEL, max_tokens = 4096, system = SYSTEM_PROMPT, messages = [{"role": "user", "content": prompt}], ) analysis = message.content[0].text token_line = f"Tokens: {message.usage.input_tokens} in / {message.usage.output_tokens} out" # ── Console ─────────────────────────────────────────────────────────── separator = "=" * 60 print(f"\n{separator}") print("CLAUDE ANALYSIS") print(separator) print(analysis) print(f"({token_line})") print(separator + "\n") # ── HTML report ─────────────────────────────────────────────────────── report_path = save_html_report(analysis, token_line, keys, all_flicker_suspects, flicker_count, total_sessions, all_reg_dumps) print(f"[ANALYSIS] Report saved to {report_path}") if flicker_count > 0: rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0 print(f"[ANALYSIS] *** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions " f"({rate_pct:.0f}%) — logged to {FLICKER_LOG} ***") def main() -> None: parser = argparse.ArgumentParser(description="Analyse MIPI CSV captures with Claude") parser.add_argument("--last", type=int, default=None, metavar="N", help="Process only the N most recent captures") parser.add_argument("--capture", type=str, default=None, metavar="NUM", help="Process a single capture number (e.g. 0001)") parser.add_argument("--verbose", action="store_true", help="Print per-file summaries to stdout") parser.add_argument("--dry-run", action="store_true", help="Print summaries and prompt but do not call Claude API") args = parser.parse_args() # --- Discover and filter captures --- groups = group_captures(DATA_DIR) if not groups: print(f"No CSV files found in {DATA_DIR}", file=sys.stderr) sys.exit(1) keys = sorted(groups.keys()) # sorted by (timestamp, capture_num) if args.capture is not None: target_num = int(args.capture) keys = [k for k in keys if k[1] == target_num] if not keys: print(f"Capture {args.capture} not found.", file=sys.stderr) sys.exit(1) if args.last is not None: keys = keys[-args.last:] print(f"Processing {len(keys)} capture(s) from {DATA_DIR}\n") # --- Run pre-processor --- all_summaries: list[str] = [] all_flicker_suspects: list[LPMetrics] = [] all_reg_dumps: list[RegDump] = [] for ts, num in keys: summary_text, _, suspects, reg_dumps = process_capture( ts, num, groups[(ts, num)], verbose=args.verbose) all_summaries.append(summary_text) all_flicker_suspects.extend(suspects) all_reg_dumps.extend(reg_dumps) if not args.verbose: flag = " *** FLICKER SUSPECT ***" if suspects else "" print(f" Processed capture {num:04d} {ts}{flag}") # --- Build Claude prompt --- flicker_count, total_sessions = _classify_flicker(keys, all_flicker_suspects) prompt = build_prompt(all_summaries, all_flicker_suspects, flicker_count, total_sessions) if args.dry_run: print("\n--- Prompt that would be sent to Claude ---") print(prompt) return # --- Call Claude API --- print(f"\nSending {len(prompt):,} characters to {CLAUDE_MODEL}...\n") client = anthropic.Anthropic() message = client.messages.create( model = CLAUDE_MODEL, max_tokens = 4096, system = SYSTEM_PROMPT, messages = [{"role": "user", "content": prompt}], ) analysis = message.content[0].text token_line = f"Tokens: {message.usage.input_tokens} in / {message.usage.output_tokens} out" separator = "=" * 60 # Console print(f"\n{separator}\nCLAUDE ANALYSIS\n{separator}") print(analysis) print(f"({token_line})") print(separator) # HTML report report_path = save_html_report(analysis, token_line, keys, all_flicker_suspects, flicker_count, total_sessions, all_reg_dumps) print(f"\nReport saved to {report_path}") if flicker_count > 0: rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0 print(f"*** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions " f"({rate_pct:.0f}%) — see {FLICKER_LOG} ***") if __name__ == "__main__": main()