""" analyze_captures.py Groups MIPI oscilloscope CSV files by capture, runs csv_preprocessor on each, then sends the compact summaries to the Claude API for trend analysis. Usage: python analyze_captures.py # all captures in ./data python analyze_captures.py --last N # most recent N captures only python analyze_captures.py --capture 0001 # single capture by number """ import argparse import html import sys from datetime import datetime from pathlib import Path import anthropic from dotenv import load_dotenv load_dotenv(Path(__file__).parent / ".env") from csv_preprocessor import ( analyze_file, analyze_lp_file, analyze_1v8_file, analyze_reg_file, group_captures, ChannelMetrics, LPMetrics, V1V8Metrics, RegDump, ) DATA_DIR = Path(__file__).parent / "data" REPORTS_DIR = Path(__file__).parent / "reports" CLAUDE_MODEL = "claude-opus-4-6" SYSTEM_PROMPT = ( "You are an expert in MIPI D-PHY signal integrity analysis. " "You will be given compact pre-processed summaries of oscilloscope captures " "from a MIPI CLK and DAT0 differential pair, plus 1.8 V supply rail measurements " "and DSI controller register snapshots (Samsung DSIM IP on NXP i.MX 8M Mini). " "The MIPI PHY drives LP states from the 1.8 V VDDIO. " "Each capture has up to five data sets: " "sig (high-res HS quality), proto (long-window HS stats), " "lp (single-ended LP-11/LP-00/HS burst including SoT sequence), " "pwr (1.8 V supply captured during the LP→HS transition), " "and reg (DSIM register snapshot — DSIM_PHYTIMING at 0x32e100b4, " "DSIM_PHYTIMING1 at 0xb8, DSIM_PHYTIMING2 at 0xbc control LP state durations " "and PHY clock timing; DSIM_CLKCTRL at 0x08 and DSIM_ESCMODE at 0x14 affect " "LP escape mode and HS entry sequencing). " "Analyse the data for trends, degradation, anomalies, or consistent spec concerns " "across captures. Correlate register values with observed LP timing violations. " "Be concise and actionable." ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- FLICKER_LOG = Path(__file__).parent / "reports" / "flicker_log.csv" def _classify_flicker(keys: list, flicker_suspects: list) -> tuple[int, int]: """ Count flicker events (display sessions that flickered) vs total sessions in this batch. Each test iteration is one complete display load/unload session. Flicker is per-session: it occurs at pipeline load, persists for that session only, then clears automatically on the next load. A single suspect capture IS a genuine flicker event — not a measurement artifact — because the LP pass fires at startup. Returns (flicker_sessions, total_sessions). 
""" if not flicker_suspects: return 0, len(keys) # Count unique capture numbers that had at least one flicker suspect suspect_sessions = {m.capture_num for m in flicker_suspects} return len(suspect_sessions), len(keys) def _log_flicker_event(ts: str, num: int, m: "LPMetrics") -> None: """Append a flicker suspect entry to the persistent flicker log.""" FLICKER_LOG.parent.mkdir(exist_ok=True) write_header = not FLICKER_LOG.exists() with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as f: import csv as _csv w = _csv.writer(f) if write_header: w.writerow(["logged_at", "capture_ts", "capture_num", "channel", "lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v"]) w.writerow([ datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ts, f"{num:04d}", m.channel, m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v, ]) def process_capture( ts: str, num: int, files: dict[str, Path], verbose: bool = False, ) -> tuple[str, list, list["LPMetrics"], list["RegDump"]]: """ Run the pre-processor on all CSV files for one capture. Returns (text_summary, metrics_list, flicker_suspects, reg_dumps). Missing files produce a one-line note instead of crashing. """ lines = [f"=== Capture {num:04d} {ts} ==="] metrics_list: list[ChannelMetrics | LPMetrics | V1V8Metrics | RegDump] = [] flicker_suspects: list[LPMetrics] = [] reg_dumps: list[RegDump] = [] for key in ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat", "pwr_1v8", "reg"): if key not in files: if key == "pwr_1v8": lines.append(f" [{key}] NOT CAPTURED (Rigol not connected or no droop)") elif key == "reg": lines.append(f" [{key}] NOT CAPTURED (device unreachable or memtool error)") else: lines.append(f" [{key}] MISSING") continue try: if key.startswith("lp_"): m = analyze_lp_file(files[key]) elif key == "pwr_1v8": m = analyze_1v8_file(files[key]) elif key == "reg": m = analyze_reg_file(files[key]) else: m = analyze_file(files[key]) lines.append(m.summary()) metrics_list.append(m) if verbose: print(m.summary()) # Real-time flicker detection — log and alert immediately if isinstance(m, LPMetrics) and m.flicker_suspect: flicker_suspects.append(m) _log_flicker_event(ts, num, m) print(f"\n *** FLICKER SUSPECT: capture {num:04d} [{ts}] " f"lp_low={m.lp_low_duration_ns} ns ***\n") if isinstance(m, RegDump): reg_dumps.append(m) except Exception as exc: lines.append(f" [{key}] ERROR: {exc}") return "\n".join(lines), metrics_list, flicker_suspects, reg_dumps def build_prompt(all_summaries: list[str], flicker_suspects: list = None, flicker_count: int = 0, total_sessions: int = 0) -> str: body = "\n\n".join(all_summaries) flicker_section = "" if flicker_suspects and flicker_count > 0: items = "\n".join( f" - Capture {m.capture_num:04d} [{m.timestamp}] channel={m.channel} " f"lp_low={m.lp_low_duration_ns} ns lp11_to_hs={m.lp11_to_hs_ns} ns " f"lp11_v={m.lp11_voltage_v} V" for m in flicker_suspects ) rate = f"{flicker_count}/{total_sessions} display load sessions ({100*flicker_count/total_sessions:.0f}%)" flicker_section = ( f"\n\nALERT — FLICKER DETECTED: {rate} produced screen flicker in this batch.\n" f"Affected captures:\n{items}\n" "Each capture is one complete display pipeline load/unload cycle. Flicker is " "per-session: it occurs at pipeline load and persists for that session only, then " "clears automatically on the next load. 
            "A flagged capture therefore represents a "
            "genuine flicker event, not a measurement artifact.\n"
            "LP-low plateau < 50 ns means the LP-01/LP-00 SoT states are absent or too brief "
            "for the SN65DSI83 MIPI/LVDS bridge to detect start-of-transmission, causing it "
            "to drop a frame and produce visible flicker.\n"
            "Focus your analysis on WHY the SoT sequence is being violated at pipeline startup "
            "and what register setting, supply condition, or hardware change would prevent it.\n"
        )

    return (
        "Below are pre-processed summaries of MIPI D-PHY captures from a Digi ConnectCore "
        "8M Mini SOM (NXP i.MX 8M Mini) driving a SN65DSI83 MIPI-to-LVDS bridge. "
        "The system occasionally flickers at display pipeline load. "
        "Each capture has up to five data sets (sig/proto/lp are per lane, CLK and DAT0):\n"
        "  sig   — high-res HS differential (rise/fall times)\n"
        "  proto — long-window HS differential (jitter, clock freq, amplitude)\n"
        "  lp    — single-ended LP state capture at pipeline startup (LP-11, SoT sequence, HS bursts)\n"
        "  pwr   — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n"
        "  reg   — DSIM controller register snapshot (PHY timing, clock control, escape mode)\n"
        f"{flicker_section}\n"
        f"{body}\n\n"
        "Please:\n"
        "1. Identify any consistent spec concerns (HS voltage, LP-11 voltage, LP-low timing).\n"
        "2. Highlight any trends over captures (amplitude drift, jitter, LP-11 voltage, 1.8 V droop, etc.).\n"
        "3. Flag anomalies — missing LP transitions, short LP-low, unexpected burst counts.\n"
        "4. Correlate 1.8 V supply droop/ripple with MIPI LP anomalies — does droop depth or ripple "
        "   correlate with SoT timing violations, short LP-low plateaux, or LP-11 voltage drops? "
        "   If pwr data is absent, note that supply correlation could not be assessed.\n"
        "5. For any ERROR or WARNING lines in the summaries, explain the most likely cause "
        "   (e.g. missing file, bad trigger, signal absent, probe issue, supply marginal) and what to check.\n"
        "6. Provide specific, actionable recommendations to address all identified issues and anomalies.\n"
        "7. Summarise overall signal health and flicker risk in 2–3 sentences."
    )


def save_html_report(analysis: str, token_line: str, keys: list,
                     flicker_suspects: list = None, flicker_count: int = 0,
                     total_sessions: int = 0, all_reg_dumps: list = None) -> Path:
    """Write a timestamped HTML report to the reports/ directory."""
    REPORTS_DIR.mkdir(exist_ok=True)
    now = datetime.now()
    filename = now.strftime("%Y%m%d_%H%M%S_analysis.html")
    path = REPORTS_DIR / filename
    cap_range = (
        f"Capture {keys[0][1]:04d}" if len(keys) == 1
        else f"Captures {keys[0][1]:04d}–{keys[-1][1]:04d}"
    )
    date_str = now.strftime("%Y-%m-%d %H:%M:%S")

    # Convert plain-text analysis to basic HTML (preserve line breaks, **bold**)
    def text_to_html(text: str) -> str:
        escaped = html.escape(text)
        # **bold** → <strong>
        import re
        escaped = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', escaped)
        # Blank lines → paragraph breaks
        paragraphs = re.split(r'\n{2,}', escaped)
        parts = []
        for para in paragraphs:
            lines = para.strip().splitlines()
            if not lines:
                continue
            # Numbered or bullet list
            if lines[0].lstrip().startswith(('1.', '2.', '3.', '-', '*')):
                items = ''.join(
                    f'<li>{ln.lstrip("0123456789.*- ")}</li>' for ln in lines)
                parts.append(f'<ul>{items}</ul>')
            else:
                parts.append('<p>' + '<br>'.join(lines) + '</p>')
        return '\n'.join(parts)

    analysis_html = text_to_html(analysis)

    # Flicker section: explanatory text plus one table row per suspect capture
    flicker_html = ""
    if flicker_suspects and flicker_count > 0:
        rate_pct = (100 * flicker_count / total_sessions) if total_sessions else 0
        rows = "".join(
            f"<tr><td>{m.capture_num:04d}</td><td>{html.escape(str(m.timestamp))}</td>"
            f"<td>{html.escape(str(m.channel))}</td><td>{m.lp_low_duration_ns} ns</td>"
            f"<td>{m.lp11_to_hs_ns} ns</td><td>{m.lp11_voltage_v} V</td></tr>\n"
            for m in flicker_suspects
        )
        flicker_html = (
            f"<h2>Flicker detected: {flicker_count}/{total_sessions} sessions "
            f"({rate_pct:.0f}%)</h2>\n"
            "<p>Each flagged capture is a genuine flicker event (not an artifact) — the LP pass "
            "fires at pipeline startup, so a missing or sub-50 ns LP-low plateau means the "
            "SN65DSI83 bridge missed the SoT sequence and dropped a frame.</p>\n"
            "<p>LP-low plateau &lt; 50 ns means the LP-01/LP-00 SoT states are absent or too "
            "brief for the SN65DSI83 bridge to detect start-of-transmission.</p>\n"
            "<table>\n"
            "<tr><th>Capture</th><th>Timestamp</th><th>Channel</th><th>LP-low plateau</th>"
            "<th>LP exit→HS</th><th>LP-11 voltage</th></tr>\n"
            f"{rows}</table>\n"
        )

    # Register snapshot section: emit each RegDump summary as preformatted text
    reg_html = ""
    if all_reg_dumps:
        dumps = "\n\n".join(html.escape(d.summary()) for d in all_reg_dumps)
        reg_html = f"<h2>DSIM register snapshots</h2>\n<pre>{dumps}</pre>\n"

    html_content = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>MIPI D-PHY Analysis: {cap_range}</title>
<style>
  body {{ font-family: sans-serif; margin: 2em auto; max-width: 70em; }}
  table {{ border-collapse: collapse; margin: 1em 0; }}
  th, td {{ border: 1px solid #999; padding: 0.3em 0.6em; text-align: left; }}
  pre {{ background: #f4f4f4; padding: 0.8em; overflow-x: auto; }}
</style>
</head>
<body>
<h1>MIPI D-PHY Capture Analysis</h1>
<p>{cap_range} &middot; generated {date_str}</p>
{flicker_html}
{reg_html}
<h2>Claude analysis</h2>
{analysis_html}
<p><small>{html.escape(token_line)}</small></p>
</body>
</html>
""" path.write_text(html_content, encoding="utf-8") return path # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def run_analysis(last: int = 10) -> None: """ Called by mgmt_worker after each file transfer. Analyses the most recent `last` captures and prints the Claude report. """ groups = group_captures(DATA_DIR) if not groups: print("[ANALYSIS] No captures found.") return keys = sorted(groups.keys())[-last:] print(f"\n[ANALYSIS] Processing {len(keys)} most-recent capture(s)...") all_summaries: list[str] = [] all_flicker_suspects: list[LPMetrics] = [] all_reg_dumps: list[RegDump] = [] for ts, num in keys: summary_text, _, suspects, reg_dumps = process_capture(ts, num, groups[(ts, num)]) all_summaries.append(summary_text) all_flicker_suspects.extend(suspects) all_reg_dumps.extend(reg_dumps) flicker_count, total_sessions = _classify_flicker(keys, all_flicker_suspects) prompt = build_prompt(all_summaries, all_flicker_suspects, flicker_count, total_sessions) print(f"[ANALYSIS] Sending {len(prompt):,} chars to {CLAUDE_MODEL}...") client = anthropic.Anthropic() message = client.messages.create( model = CLAUDE_MODEL, max_tokens = 4096, system = SYSTEM_PROMPT, messages = [{"role": "user", "content": prompt}], ) analysis = message.content[0].text token_line = f"Tokens: {message.usage.input_tokens} in / {message.usage.output_tokens} out" # ── Console ─────────────────────────────────────────────────────────── separator = "=" * 60 print(f"\n{separator}") print("CLAUDE ANALYSIS") print(separator) print(analysis) print(f"({token_line})") print(separator + "\n") # ── HTML report ─────────────────────────────────────────────────────── report_path = save_html_report(analysis, token_line, keys, all_flicker_suspects, flicker_count, total_sessions, all_reg_dumps) print(f"[ANALYSIS] Report saved to {report_path}") if flicker_count > 0: rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0 print(f"[ANALYSIS] *** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions " f"({rate_pct:.0f}%) — logged to {FLICKER_LOG} ***") def main() -> None: parser = argparse.ArgumentParser(description="Analyse MIPI CSV captures with Claude") parser.add_argument("--last", type=int, default=None, metavar="N", help="Process only the N most recent captures") parser.add_argument("--capture", type=str, default=None, metavar="NUM", help="Process a single capture number (e.g. 
0001)") parser.add_argument("--verbose", action="store_true", help="Print per-file summaries to stdout") parser.add_argument("--dry-run", action="store_true", help="Print summaries and prompt but do not call Claude API") args = parser.parse_args() # --- Discover and filter captures --- groups = group_captures(DATA_DIR) if not groups: print(f"No CSV files found in {DATA_DIR}", file=sys.stderr) sys.exit(1) keys = sorted(groups.keys()) # sorted by (timestamp, capture_num) if args.capture is not None: target_num = int(args.capture) keys = [k for k in keys if k[1] == target_num] if not keys: print(f"Capture {args.capture} not found.", file=sys.stderr) sys.exit(1) if args.last is not None: keys = keys[-args.last:] print(f"Processing {len(keys)} capture(s) from {DATA_DIR}\n") # --- Run pre-processor --- all_summaries: list[str] = [] all_flicker_suspects: list[LPMetrics] = [] all_reg_dumps: list[RegDump] = [] for ts, num in keys: summary_text, _, suspects, reg_dumps = process_capture( ts, num, groups[(ts, num)], verbose=args.verbose) all_summaries.append(summary_text) all_flicker_suspects.extend(suspects) all_reg_dumps.extend(reg_dumps) if not args.verbose: flag = " *** FLICKER SUSPECT ***" if suspects else "" print(f" Processed capture {num:04d} {ts}{flag}") # --- Build Claude prompt --- flicker_count, total_sessions = _classify_flicker(keys, all_flicker_suspects) prompt = build_prompt(all_summaries, all_flicker_suspects, flicker_count, total_sessions) if args.dry_run: print("\n--- Prompt that would be sent to Claude ---") print(prompt) return # --- Call Claude API --- print(f"\nSending {len(prompt):,} characters to {CLAUDE_MODEL}...\n") client = anthropic.Anthropic() message = client.messages.create( model = CLAUDE_MODEL, max_tokens = 4096, system = SYSTEM_PROMPT, messages = [{"role": "user", "content": prompt}], ) analysis = message.content[0].text token_line = f"Tokens: {message.usage.input_tokens} in / {message.usage.output_tokens} out" separator = "=" * 60 # Console print(f"\n{separator}\nCLAUDE ANALYSIS\n{separator}") print(analysis) print(f"({token_line})") print(separator) # HTML report report_path = save_html_report(analysis, token_line, keys, all_flicker_suspects, flicker_count, total_sessions, all_reg_dumps) print(f"\nReport saved to {report_path}") if flicker_count > 0: rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0 print(f"*** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions " f"({rate_pct:.0f}%) — see {FLICKER_LOG} ***") if __name__ == "__main__": main()