"""
analyze_captures.py

Groups MIPI oscilloscope CSV files by capture, runs csv_preprocessor on each,
then sends the compact summaries to the Claude API for trend analysis.

Usage:
    python analyze_captures.py                  # all captures in ./data
    python analyze_captures.py --last N         # most recent N captures only
    python analyze_captures.py --capture 0001   # single capture by number
"""

import argparse
import html
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Union

import anthropic
from dotenv import load_dotenv

# Load the .env file that sits next to this script. The call is kept ahead of
# the csv_preprocessor import, exactly where the original placed it.
load_dotenv(Path(__file__).parent / ".env")

from csv_preprocessor import (
    analyze_file,
    analyze_lp_file,
    analyze_1v8_file,
    group_captures,
    ChannelMetrics,
    LPMetrics,
    V1V8Metrics,
)

DATA_DIR = Path(__file__).parent / "data"
REPORTS_DIR = Path(__file__).parent / "reports"
CLAUDE_MODEL = "claude-opus-4-6"
MAX_TOKENS = 3072

SYSTEM_PROMPT = (
    "You are an expert in MIPI D-PHY signal integrity analysis. "
    "You will be given compact pre-processed summaries of oscilloscope captures "
    "from a MIPI CLK and DAT0 differential pair, plus 1.8 V supply rail measurements. "
    "The MIPI PHY (NXP i.MX 8M Mini) drives LP states from the 1.8 V VDDIO. "
    "Each capture has up to four data sets: "
    "sig (high-res HS quality), proto (long-window HS stats), "
    "lp (single-ended LP-11/LP-00/HS burst including SoT sequence), "
    "and pwr (1.8 V supply captured during the LP→HS transition). "
    "Analyse the data for trends, degradation, anomalies, or consistent spec concerns "
    "across captures. Be concise and actionable."
)

# Any of the metrics objects the pre-processor can return for one CSV file.
Metrics = Union[ChannelMetrics, LPMetrics, V1V8Metrics]

# File keys looked up for every capture, in the order they appear in the summary.
_CAPTURE_KEYS = (
    "proto_clk", "proto_dat",
    "sig_clk", "sig_dat",
    "lp_clk", "lp_dat",
    "pwr_1v8",
)

# Patterns used when converting the plain-text analysis to HTML.
_BOLD_RE = re.compile(r"\*\*(.+?)\*\*")
_PARAGRAPH_SPLIT_RE = re.compile(r"\n\s*\n")
# "12. text", "- text" or "* text"; the marker must be followed by whitespace
# so that "*emphasis*" or "3.3 V" at the start of a line is not taken for a list.
_LIST_ITEM_RE = re.compile(r"^\s*(?:(?P<num>\d{1,3})\.|[-*])\s+(?P<body>.*)$")

# NOTE(review): the markup of the original report template was not recoverable
# from the reviewed copy of this file; this is a minimal reconstruction.
# Confirm against the intended styling before relying on the exact layout.
_REPORT_TEMPLATE = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>MIPI D-PHY Analysis — {cap_range}</title>
<style>
  body {{ font-family: sans-serif; max-width: 60em; margin: 2em auto;
         padding: 0 1em; line-height: 1.5; }}
  .meta, .tokens {{ color: #666; font-size: 0.9em; }}
</style>
</head>
<body>
<h1>MIPI D-PHY Analysis</h1>
<p class="meta">{cap_range} &middot; {date_str} &middot; {model}</p>
{body}
<p class="tokens">{token_line}</p>
</body>
</html>
"""


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _analyze_one(key: str, path: Path) -> Metrics:
    """Dispatch one CSV file to the pre-processor routine matching its key."""
    if key.startswith("lp_"):
        return analyze_lp_file(path)
    if key == "pwr_1v8":
        return analyze_1v8_file(path)
    return analyze_file(path)


def process_capture(
    ts: str,
    num: int,
    files: dict[str, Path],
    verbose: bool = False,
) -> tuple[str, list[Metrics]]:
    """
    Run the pre-processor on all CSV files for one capture.

    Args:
        ts: Capture timestamp as it appears in the grouping key.
        num: Capture number.
        files: Mapping of file key (e.g. "sig_clk", "pwr_1v8") to CSV path.
        verbose: When true, also print each per-file summary to stdout.

    Returns:
        (text_summary, list_of_metrics). Missing files produce a one-line
        note instead of crashing; files that fail to analyse produce an
        ERROR line and are left out of the metrics list.
    """
    lines = [f"=== Capture {num:04d} {ts} ==="]
    metrics_list: list[Metrics] = []

    for key in _CAPTURE_KEYS:
        if key not in files:
            if key == "pwr_1v8":
                lines.append(f" [{key}] NOT CAPTURED (Rigol not connected or no droop)")
            else:
                lines.append(f" [{key}] MISSING")
            continue

        # Deliberately broad: one bad file must not abort the whole capture,
        # and the error text is forwarded to the model for diagnosis.
        try:
            metrics = _analyze_one(key, files[key])
            summary = metrics.summary()
        except Exception as exc:
            lines.append(f" [{key}] ERROR: {exc}")
            continue

        lines.append(summary)
        metrics_list.append(metrics)
        if verbose:
            print(summary)

    return "\n".join(lines), metrics_list


def build_prompt(all_summaries: list[str]) -> str:
    """Assemble the user prompt from the per-capture text summaries."""
    body = "\n\n".join(all_summaries)
    return (
        "Below are pre-processed summaries of MIPI D-PHY captures. "
        "Each capture has three passes per lane (CLK and DAT0):\n"
        " sig — high-res HS differential (rise/fall times)\n"
        " proto — long-window HS differential (jitter, clock freq, amplitude)\n"
        " lp — single-ended LP state capture (LP-11 voltage, SoT sequence, HS bursts)\n"
        " pwr — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n\n"
        f"{body}\n\n"
        "Please:\n"
        "1. Identify any consistent spec concerns (HS voltage, LP-11 voltage, LP-low timing).\n"
        "2. Highlight any trends over captures (amplitude drift, jitter, LP-11 voltage, 1.8 V droop, etc.).\n"
        "3. Flag anomalies — missing LP transitions, short LP-low, unexpected burst counts.\n"
        "4. Correlate 1.8 V supply droop/ripple with MIPI LP anomalies — does droop depth or ripple "
        " correlate with SoT timing violations, short LP-low plateaux, or LP-11 voltage drops? "
        " If pwr data is absent, note that supply correlation could not be assessed.\n"
        "5. For any ERROR or WARNING lines in the summaries, explain the most likely cause "
        " (e.g. missing file, bad trigger, signal absent, probe issue, supply marginal) and what to check.\n"
        "6. Provide specific, actionable recommendations to address all identified issues and anomalies.\n"
        "7. Summarise overall signal health in 2–3 sentences."
    )


def _text_to_html(text: str) -> str:
    """
    Convert the plain-text analysis to basic HTML.

    The text is HTML-escaped first, then ``**bold**`` becomes ``<strong>``,
    blank lines separate paragraphs, and a paragraph whose first line starts
    with ``N.``, ``-`` or ``*`` is rendered as a list. Numbered lists keep
    their starting number so items split across paragraphs stay in sequence.
    """
    escaped = _BOLD_RE.sub(r"<strong>\1</strong>", html.escape(text))

    parts: list[str] = []
    for para in _PARAGRAPH_SPLIT_RE.split(escaped):
        lines = para.strip().splitlines()
        if not lines:
            continue

        first = _LIST_ITEM_RE.match(lines[0])
        if first is None:
            parts.append("<p>" + "<br>\n".join(lines) + "</p>")
            continue

        items: list[str] = []
        for line in lines:
            item = _LIST_ITEM_RE.match(line)
            if item is not None:
                items.append(item.group("body"))
            else:
                # Wrapped continuation line: keep it with the preceding item.
                items[-1] += "<br>\n" + line.strip()

        list_body = "".join(f"<li>{item}</li>" for item in items)
        start = first.group("num")
        if start is not None:
            parts.append(f'<ol start="{start}">{list_body}</ol>')
        else:
            parts.append(f"<ul>{list_body}</ul>")

    return "\n".join(parts)


def save_html_report(analysis: str, token_line: str, keys: list) -> Path:
    """
    Write a timestamped HTML report to the reports/ directory.

    Args:
        analysis: Plain-text analysis returned by the model.
        token_line: Token usage line shown at the bottom of the report.
        keys: Sorted (timestamp, capture_num) keys covered by the analysis.

    Returns:
        Path of the file that was written.
    """
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    now = datetime.now()
    path = REPORTS_DIR / now.strftime("%Y%m%d_%H%M%S_analysis.html")

    if not keys:
        cap_range = "No captures"
    elif len(keys) == 1:
        cap_range = f"Capture {keys[0][1]:04d}"
    else:
        cap_range = f"Captures {keys[0][1]:04d}–{keys[-1][1]:04d}"

    html_content = _REPORT_TEMPLATE.format(
        cap_range=html.escape(cap_range),
        date_str=now.strftime("%Y-%m-%d %H:%M:%S"),
        model=html.escape(CLAUDE_MODEL),
        body=_text_to_html(analysis),
        token_line=html.escape(token_line),
    )
    path.write_text(html_content, encoding="utf-8")
    return path


def _request_analysis(prompt: str) -> tuple[str, str]:
    """
    Send the prompt to Claude and return (analysis_text, token_usage_line).

    All text blocks of the response are concatenated, so an empty response
    or a leading non-text block does not raise.
    """
    client = anthropic.Anthropic()
    message = client.messages.create(
        model=CLAUDE_MODEL,
        max_tokens=MAX_TOKENS,
        system=SYSTEM_PROMPT,
        messages=[{"role": "user", "content": prompt}],
    )
    analysis = "".join(
        block.text
        for block in message.content
        if getattr(block, "type", None) == "text"
    )
    token_line = (
        f"Tokens: {message.usage.input_tokens} in / "
        f"{message.usage.output_tokens} out"
    )
    return analysis, token_line


def _positive_int(value: str) -> int:
    """argparse type: accept only integers greater than zero."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"{value!r} is not an integer") from None
    if number <= 0:
        raise argparse.ArgumentTypeError(f"{value!r} must be a positive integer")
    return number


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def run_analysis(last: int = 10) -> None:
    """
    Called by mgmt_worker after each file transfer.

    Analyses the most recent `last` captures and prints the Claude report.
    A non-positive `last` is reported and ignored.
    """
    if last <= 0:
        # keys[-0:] would silently select *every* capture.
        print("[ANALYSIS] Nothing to do: `last` must be a positive capture count.")
        return

    groups = group_captures(DATA_DIR)
    if not groups:
        print("[ANALYSIS] No captures found.")
        return

    keys = sorted(groups.keys())[-last:]
    print(f"\n[ANALYSIS] Processing {len(keys)} most-recent capture(s)...")

    all_summaries: list[str] = []
    for ts, num in keys:
        summary_text, _ = process_capture(ts, num, groups[(ts, num)])
        all_summaries.append(summary_text)

    prompt = build_prompt(all_summaries)
    print(f"[ANALYSIS] Sending {len(prompt):,} chars to {CLAUDE_MODEL}...")
    analysis, token_line = _request_analysis(prompt)

    # ── Console ───────────────────────────────────────────────────────────
    separator = "=" * 60
    print(f"\n{separator}")
    print("CLAUDE ANALYSIS")
    print(separator)
    print(analysis)
    print(f"({token_line})")
    print(separator + "\n")

    # ── HTML report ───────────────────────────────────────────────────────
    report_path = save_html_report(analysis, token_line, keys)
    print(f"[ANALYSIS] Report saved to {report_path}")


def main() -> None:
    """Command-line entry point: select captures, summarise them, query Claude."""
    parser = argparse.ArgumentParser(description="Analyse MIPI CSV captures with Claude")
    parser.add_argument("--last", type=_positive_int, default=None, metavar="N",
                        help="Process only the N most recent captures")
    parser.add_argument("--capture", type=str, default=None, metavar="NUM",
                        help="Process a single capture number (e.g. 0001)")
    parser.add_argument("--verbose", action="store_true",
                        help="Print per-file summaries to stdout")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print summaries and prompt but do not call Claude API")
    args = parser.parse_args()

    # --- Discover and filter captures ---
    groups = group_captures(DATA_DIR)
    if not groups:
        print(f"No CSV files found in {DATA_DIR}", file=sys.stderr)
        sys.exit(1)

    keys = sorted(groups.keys())  # sorted by (timestamp, capture_num)

    if args.capture is not None:
        try:
            target_num = int(args.capture)
        except ValueError:
            parser.error(
                f"--capture expects a capture number (e.g. 0001), got {args.capture!r}"
            )
        keys = [k for k in keys if k[1] == target_num]
        if not keys:
            print(f"Capture {args.capture} not found.", file=sys.stderr)
            sys.exit(1)

    if args.last is not None:
        keys = keys[-args.last:]

    print(f"Processing {len(keys)} capture(s) from {DATA_DIR}\n")

    # --- Run pre-processor ---
    all_summaries: list[str] = []
    for ts, num in keys:
        summary_text, _ = process_capture(ts, num, groups[(ts, num)], verbose=args.verbose)
        all_summaries.append(summary_text)
        if not args.verbose:
            print(f" Processed capture {num:04d} {ts}")

    # --- Build Claude prompt ---
    prompt = build_prompt(all_summaries)

    if args.dry_run:
        print("\n--- Prompt that would be sent to Claude ---")
        print(prompt)
        return

    # --- Call Claude API ---
    print(f"\nSending {len(prompt):,} characters to {CLAUDE_MODEL}...\n")
    analysis, token_line = _request_analysis(prompt)

    separator = "=" * 60

    # Console
    print(f"\n{separator}\nCLAUDE ANALYSIS\n{separator}")
    print(analysis)
    print(f"({token_line})")
    print(separator)

    # HTML report
    report_path = save_html_report(analysis, token_line, keys)
    print(f"\nReport saved to {report_path}")


if __name__ == "__main__":
    main()