Files
MiPi_TEST/analyze_captures.py

590 lines
27 KiB
Python
Raw Normal View History

2026-04-08 12:55:34 +01:00
"""
analyze_captures.py
Groups MIPI oscilloscope CSV files by capture, runs csv_preprocessor on each,
then sends the compact summaries to the Claude API for trend analysis.
Usage:
python analyze_captures.py # all captures in ./data
python analyze_captures.py --last N # most recent N captures only
python analyze_captures.py --capture 0001 # single capture by number
"""
import argparse
2026-04-08 15:42:51 +01:00
import html
2026-04-08 12:55:34 +01:00
import sys
2026-04-08 14:19:31 +01:00
from datetime import datetime
2026-04-08 12:55:34 +01:00
from pathlib import Path
import anthropic
2026-04-08 15:42:51 +01:00
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env")
2026-04-08 12:55:34 +01:00
2026-04-09 08:45:57 +01:00
from csv_preprocessor import (
2026-04-09 10:29:53 +01:00
analyze_file, analyze_lp_file, analyze_1v8_file, analyze_reg_file,
group_captures, ChannelMetrics, LPMetrics, V1V8Metrics, RegDump,
2026-04-09 08:45:57 +01:00
)
2026-04-08 12:55:34 +01:00
2026-04-08 15:42:51 +01:00
DATA_DIR = Path(__file__).parent / "data"
REPORTS_DIR = Path(__file__).parent / "reports"
2026-04-08 12:55:34 +01:00
2026-04-09 11:03:35 +01:00
# ---------------------------------------------------------------------------
# Confirmed device configuration — update here if hardware changes
# ---------------------------------------------------------------------------
# Derived values (do not edit — calculated from the DEVICE_CONFIG entries below):
#   HS bit rate = PIXEL_CLK_MHZ × COLOR_DEPTH_BPP / DSI_LANES
#               = 72 × 24 / 4 = 432 Mbit/s per lane
#   HS bit UI   = 1 / 432e6 ≈ 2.315 ns
#   Byte clock  = 432 / 8 = 54 MHz → 18.518 ns/byte
#
# MIPI D-PHY v1.1 minimum timings at 432 Mbit/s:
#   T_LPX          ≥ 50 ns
#   T_HS-PREPARE   40+4·UI → 85+6·UI = 49.3–98.9 ns
#   T_HS-ZERO      ≥ 145+10·UI = 168.2 ns
#   T_HS-TRAIL     ≥ max(8·UI, 60+4·UI) = 69.3 ns
#   T_HS-EXIT      ≥ 100 ns
#   T_CLK-PREPARE  38–95 ns
#   T_CLK-ZERO     ≥ 300 ns
#   T_CLK-POST     ≥ 60+52·UI = 180.4 ns
#   T_CLK-TRAIL    ≥ 60 ns
#
# In byte-clock units (÷ 18.518 ns, round up):
#   TLPX         = 3  (55.6 ns)
#   THS_PREPARE  = 3  (55.6 ns — within 49.3–98.9 ns window)
#   THS_ZERO     = 10 (185.2 ns ≥ 168.2 ns ✓)
#   THS_TRAIL    = 4  (74.1 ns ≥ 69.3 ns ✓)
#   THS_EXIT     = 6  (111.1 ns ≥ 100 ns ✓)
#   TCLK_PREPARE = 3  (55.6 ns — within 38–95 ns ✓)
#   TCLK_ZERO    = 17 (314.8 ns ≥ 300 ns ✓)
#   TCLK_POST    = 10 (185.2 ns ≥ 180.4 ns ✓)
#   TCLK_TRAIL   = 4  (74.1 ns ≥ 60 ns ✓)
DEVICE_CONFIG = {
    "dsi_host": "NXP i.MX 8M Mini (Samsung DSIM IP, sec-dsim/samsung-dsim driver)",
    "dsi_bridge": "Texas Instruments SN65DSI83 (MIPI-to-LVDS)",
    "pixel_clock_mhz": 72,
    "dsi_lanes": 4,
    "color_format": "RGB888 (24 bpp)",
    # DSI clock is LOCKED to pixel clock by format geometry — not independently configurable:
    #   24 bpp / 4 lanes = 6 bit-clocks per pixel; DDR halves the clock → 3 × pixel clock
    #   DSI clock (differential CLK pair) = 3 × 72 MHz = 216 MHz
    #   HS bit rate = 216 MHz × 2 (DDR) = 432 Mbit/s per lane
    "dsi_clock_mhz": 216,  # 3 × pixel_clock; differential CLK pair frequency
    "hs_bit_rate_mbps": 432,  # 216 × 2 (DDR) = 72 × 24 / 4
    "hs_ui_ns": 2.315,  # 1 / 432e6
    "byte_clock_mhz": 54,  # 432 / 8
    "byte_period_ns": 18.518,
    "vddio_v": 1.8,
    # Samsung DSIM PHY timing field mapping (sec_mipi_dsim.c / samsung-dsim.c):
    #   PHYTIMING  (0xb4): [15:8]=TLPX, [7:0]=THS_EXIT
    #   PHYTIMING1 (0xb8): [31:24]=TCLK_PREPARE, [23:16]=TCLK_ZERO,
    #                      [15:8]=TCLK_POST, [7:0]=TCLK_TRAIL
    #   PHYTIMING2 (0xbc): [23:16]=THS_TRAIL, [15:8]=THS_ZERO, [7:0]=THS_PREPARE
    # All fields in byte-clock units (÷ 18.518 ns, rounded up) per MIPI D-PHY v1.1:
    "dsim_phytiming_target": {
        "PHYTIMING (0xb4)": "0x00000306 (TLPX=3→55.6ns ≥50ns ✓, THS_EXIT=6→111ns ≥100ns ✓)",
        "PHYTIMING1 (0xb8)": "0x03110A04 (TCLK_PREPARE=3→55.6ns, TCLK_ZERO=17→315ns ≥300ns ✓, TCLK_POST=10→185ns ≥180ns ✓, TCLK_TRAIL=4→74ns ≥60ns ✓)",
        "PHYTIMING2 (0xbc)": "0x00040A03 (THS_TRAIL=4→74ns ≥69ns ✓, THS_ZERO=10→185ns ≥168ns ✓, THS_PREPARE=3→56ns within 49-99ns ✓)",
    },
}
2026-04-08 12:55:34 +01:00
# Claude model used for the trend analysis; bump here when upgrading.
CLAUDE_MODEL = "claude-opus-4-6"

# System prompt framing the analysis task. Per-batch details (device config,
# flicker alerts, capture summaries) are supplied in the user message built
# by build_prompt().
SYSTEM_PROMPT = (
    "You are an expert in MIPI D-PHY signal integrity analysis. "
    "You will be given compact pre-processed summaries of oscilloscope captures "
    "from a MIPI CLK and DAT0 differential pair, plus 1.8 V supply rail measurements "
    "and DSI controller register snapshots (Samsung DSIM IP on NXP i.MX 8M Mini). "
    "The MIPI PHY drives LP states from the 1.8 V VDDIO. "
    "Each capture has up to five data sets: "
    "sig (high-res HS quality), proto (long-window HS stats), "
    "lp (single-ended LP-11/LP-00/HS burst including SoT sequence), "
    "pwr (1.8 V supply captured during the LP→HS transition), "
    "and reg (DSIM register snapshot — DSIM_PHYTIMING at 0x32e100b4, "
    "DSIM_PHYTIMING1 at 0xb8, DSIM_PHYTIMING2 at 0xbc control LP/HS state durations; "
    "timing fields are in byte-clock units where 1 unit = 18.518 ns at 432 Mbit/s). "
    "Analyse the data for trends, degradation, anomalies, or consistent spec concerns "
    "across captures. Correlate register field values with observed LP timing violations. "
    "Be concise and actionable."
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
2026-04-09 10:29:53 +01:00
FLICKER_LOG = Path(__file__).parent / "reports" / "flicker_log.csv"
def _classify_flicker(keys: list, flicker_suspects: list) -> tuple[int, int]:
"""
Count flicker events (display sessions that flickered) vs total sessions in this batch.
Each test iteration is one complete display load/unload session. Flicker is
per-session: it occurs at pipeline load, persists for that session only, then
clears automatically on the next load. A single suspect capture IS a genuine
flicker event not a measurement artifact because the LP pass fires at startup.
Returns (flicker_sessions, total_sessions).
"""
if not flicker_suspects:
return 0, len(keys)
# Count unique capture numbers that had at least one flicker suspect
suspect_sessions = {m.capture_num for m in flicker_suspects}
return len(suspect_sessions), len(keys)
def _log_flicker_event(ts: str, num: int, m: "LPMetrics") -> None:
    """Append one flicker-suspect row to the persistent flicker log CSV."""
    import csv as _csv  # local import: only needed on the (rare) flicker path

    FLICKER_LOG.parent.mkdir(exist_ok=True)
    need_header = not FLICKER_LOG.exists()
    with open(FLICKER_LOG, "a", newline="", encoding="utf-8") as fh:
        writer = _csv.writer(fh)
        if need_header:
            writer.writerow(["logged_at", "capture_ts", "capture_num", "channel",
                             "lp_low_duration_ns", "lp11_to_hs_ns", "lp11_voltage_v"])
        row = [
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            ts, f"{num:04d}", m.channel,
            m.lp_low_duration_ns, m.lp11_to_hs_ns, m.lp11_voltage_v,
        ]
        writer.writerow(row)
2026-04-08 12:55:34 +01:00
def process_capture(
    ts: str,
    num: int,
    files: dict[str, Path],
    verbose: bool = False,
) -> tuple[str, list, list["LPMetrics"], list["RegDump"]]:
    """
    Run the pre-processor on every CSV file belonging to one capture.

    Returns (text_summary, metrics_list, flicker_suspects, reg_dumps).
    A missing file yields a one-line note in the summary instead of raising.
    """
    out_lines = [f"=== Capture {num:04d} {ts} ==="]
    collected: list[ChannelMetrics | LPMetrics | V1V8Metrics | RegDump] = []
    suspects: list[LPMetrics] = []
    dumps: list[RegDump] = []

    # Expected pass names; the two optional passes get explanatory notes when absent.
    passes = ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat",
              "pwr_1v8", "reg")
    absent_notes = {
        "pwr_1v8": " [pwr_1v8] NOT CAPTURED (Rigol not connected or no droop)",
        "reg": " [reg] NOT CAPTURED (device unreachable or memtool error)",
    }

    for key in passes:
        path = files.get(key)
        if path is None:
            out_lines.append(absent_notes.get(key, f" [{key}] MISSING"))
            continue
        try:
            # Pick the analyzer matching this pass type.
            if key.startswith("lp_"):
                metrics = analyze_lp_file(path)
            elif key == "pwr_1v8":
                metrics = analyze_1v8_file(path)
            elif key == "reg":
                metrics = analyze_reg_file(path)
            else:
                metrics = analyze_file(path)
            summary = metrics.summary()
            out_lines.append(summary)
            collected.append(metrics)
            if verbose:
                print(summary)
            # Real-time flicker detection — log and alert immediately
            if isinstance(metrics, LPMetrics) and metrics.flicker_suspect:
                suspects.append(metrics)
                _log_flicker_event(ts, num, metrics)
                print(f"\n *** FLICKER SUSPECT: capture {num:04d} [{ts}] "
                      f"lp_low={metrics.lp_low_duration_ns} ns ***\n")
            if isinstance(metrics, RegDump):
                dumps.append(metrics)
        except Exception as exc:
            out_lines.append(f" [{key}] ERROR: {exc}")
    return "\n".join(out_lines), collected, suspects, dumps
2026-04-08 12:55:34 +01:00
2026-04-09 10:29:53 +01:00
def build_prompt(all_summaries: list[str], flicker_suspects: list | None = None,
                 flicker_count: int = 0, total_sessions: int = 0) -> str:
    """
    Assemble the user message sent to Claude.

    Args:
        all_summaries: one pre-processed text summary per capture.
        flicker_suspects: LPMetrics entries flagged as flicker suspects.
        flicker_count: number of sessions that flickered in this batch.
        total_sessions: total display load sessions in this batch.

    Returns:
        The full prompt: device config, optional flicker alert, capture
        summaries, and the numbered analysis task list.
    """
    cfg = DEVICE_CONFIG
    target = cfg["dsim_phytiming_target"]
    # Device-under-test header so the model has the exact hardware/timing context.
    config_section = (
        f"Device under test:\n"
        f" DSI host: {cfg['dsi_host']}\n"
        f" DSI bridge: {cfg['dsi_bridge']}\n"
        f" Pixel clock: {cfg['pixel_clock_mhz']} MHz\n"
        f" DSI lanes: {cfg['dsi_lanes']} data lanes\n"
        f" Color format: {cfg['color_format']}\n"
        f" DSI clock: {cfg['dsi_clock_mhz']} MHz "
        f"(= 3 × pixel clock; fixed by 24bpp/4lanes/DDR — not independently tunable)\n"
        f" HS bit rate: {cfg['hs_bit_rate_mbps']} Mbit/s per lane "
        f"(= DSI clock × 2, DDR)\n"
        f" HS UI: {cfg['hs_ui_ns']:.3f} ns\n"
        f" Byte clock: {cfg['byte_clock_mhz']} MHz ({cfg['byte_period_ns']:.3f} ns/byte)\n"
        f" VDDIO: {cfg['vddio_v']} V\n"
        f" Target DSIM PHY timing registers (byte-clock units, MIPI D-PHY v1.1 compliant):\n"
        + "\n".join(f" {k}: {v}" for k, v in target.items())
    )
    body = "\n\n".join(all_summaries)

    flicker_section = ""
    if flicker_suspects and flicker_count > 0:
        items = "\n".join(
            f" - Capture {m.capture_num:04d} [{m.timestamp}] channel={m.channel} "
            f"lp_low={m.lp_low_duration_ns} ns lp11_to_hs={m.lp11_to_hs_ns} ns "
            f"lp11_v={m.lp11_voltage_v} V"
            for m in flicker_suspects
        )
        # max(..., 1) guards the (shouldn't-happen) total_sessions == 0 case;
        # flicker_count > 0 implies at least one processed session.
        rate = f"{flicker_count}/{total_sessions} display load sessions ({100*flicker_count/max(total_sessions, 1):.0f}%)"
        flicker_section = (
            f"\n\nALERT — FLICKER DETECTED: {rate} produced screen flicker in this batch.\n"
            f"Affected captures:\n{items}\n"
            "Each capture is one complete display pipeline load/unload cycle. Flicker is "
            "per-session: it occurs at pipeline load and persists for that session only, then "
            "clears automatically on the next load. A flagged capture therefore represents a "
            "genuine flicker event, not a measurement artifact.\n"
            "LP-low plateau < 50 ns means the LP-01/LP-00 SoT states are absent or too brief "
            "for the SN65DSI83 MIPI/LVDS bridge to detect start-of-transmission, causing it "
            "to drop a frame and produce visible flicker.\n"
            "Focus your analysis on WHY the SoT sequence is being violated at pipeline startup "
            "and what register setting, supply condition, or hardware change would prevent it.\n"
        )

    return (
        "Below are pre-processed summaries of MIPI D-PHY captures from a Digi ConnectCore "
        "8M Mini SOM (NXP i.MX 8M Mini) driving a SN65DSI83 MIPI-to-LVDS bridge.\n\n"
        "Flicker behaviour (confirmed by observation):\n"
        " - The system is BISTABLE at initialisation. The state is determined entirely at\n"
        " pipeline load (LP→HS SoT sequence) and does not change during a session.\n"
        " - State A (good): SoT succeeds → bridge locks → display runs indefinitely, "
        "perfectly stable, no flicker.\n"
        " - State B (bad): SoT fails → bridge stuck → flickers indefinitely until the\n"
        " display pipeline is unloaded and reloaded (no other recovery).\n"
        " - After a reset the system is almost always in State A — good state is strongly\n"
        " preferred but State B occurs intermittently at pipeline load.\n"
        " - Nothing ongoing (supply noise, EMI, temperature drift) causes or ends flicker;\n"
        " the root cause is non-deterministic and acts only at the LP→HS SoT moment.\n"
        " - The LP pass captures the very first SoT transition at pipeline startup.\n"
        " Any LP timing anomaly in these captures is a genuine flicker event, not noise.\n\n"
        "Each capture has up to five data sets per lane (CLK and DAT0):\n"
        " sig — high-res HS differential (rise/fall times)\n"
        " proto — long-window HS differential (jitter, clock freq, amplitude)\n"
        " lp — single-ended LP state capture at pipeline startup (LP-11, SoT sequence, HS bursts)\n"
        " pwr — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n"
        " reg — DSIM PHY timing register snapshot from running device\n"
        f"\n{config_section}\n"
        f"{flicker_section}\n"
        f"{body}\n\n"
        "Please:\n"
        "1. Identify any consistent spec concerns (HS voltage, LP-11 voltage, LP-low timing).\n"
        "2. Highlight any trends over captures (amplitude drift, jitter, LP-11 voltage, 1.8 V droop, etc.).\n"
        "3. Flag anomalies — missing LP transitions, short LP-low, unexpected burst counts.\n"
        "4. Correlate 1.8 V supply droop/ripple with MIPI LP anomalies — does droop depth or ripple "
        " correlate with SoT timing violations, short LP-low plateaux, or LP-11 voltage drops? "
        " If pwr data is absent, note that supply correlation could not be assessed.\n"
        "5. For any ERROR or WARNING lines in the summaries, explain the most likely cause "
        " (e.g. missing file, bad trigger, signal absent, probe issue, supply marginal) and what to check.\n"
        "6. Provide specific, actionable recommendations to address all identified issues and anomalies.\n"
        # "2–3" restored — the en dash was lost in a previous edit, garbling it to "23".
        "7. Summarise overall signal health and flicker risk in 2–3 sentences."
    )
2026-04-09 10:29:53 +01:00
def save_html_report(analysis: str, token_line: str, keys: list,
                     flicker_suspects: list | None = None,
                     flicker_count: int = 0,
                     total_sessions: int = 0,
                     all_reg_dumps: list | None = None) -> Path:
    """Write a timestamped HTML report to the reports/ directory.

    Args:
        analysis: plain-text analysis returned by Claude.
        token_line: "Tokens: X in / Y out" usage footer line.
        keys: (timestamp, capture_num) tuples covered by this report.
        flicker_suspects: LPMetrics entries flagged as flicker suspects.
        flicker_count: number of sessions that flickered in this batch.
        total_sessions: total display load sessions in this batch.
        all_reg_dumps: RegDump snapshots for the collapsible register table.

    Returns:
        Path of the HTML file that was written.
    """
    REPORTS_DIR.mkdir(exist_ok=True)
    now = datetime.now()
    filename = now.strftime("%Y%m%d_%H%M%S_analysis.html")
    path = REPORTS_DIR / filename
    cap_range = (
        f"Capture {keys[0][1]:04d}"
        if len(keys) == 1
        # En dash between first and last capture number (was lost, producing
        # a fused "00010005"-style range).
        else f"Captures {keys[0][1]:04d}–{keys[-1][1]:04d}"
    )
    date_str = now.strftime("%Y-%m-%d %H:%M:%S")

    # Convert plain-text analysis to basic HTML (preserve line breaks, **bold**).
    def text_to_html(text: str) -> str:
        import re
        escaped = html.escape(text)
        # **bold**
        escaped = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', escaped)
        # Blank lines → paragraph breaks
        paragraphs = re.split(r'\n{2,}', escaped)
        # Strip ONLY the leading list marker ("3. ", "- ", "* "). The previous
        # lstrip("0123456789.-* ") also ate digits/dots that started the item
        # text itself (e.g. "1. 2.5 V rail" became "V rail").
        marker = re.compile(r'^\s*(?:\d+\.|[-*])\s+')
        parts = []
        for para in paragraphs:
            plines = para.strip().splitlines()
            if not plines:
                continue
            first = plines[0].lstrip()
            # Numbered or bullet list (any leading number, not just 1–3)
            if re.match(r'\d+\.', first) or first.startswith(('-', '*')):
                cleaned = [marker.sub('', ln) for ln in plines if ln.strip()]
                items = ''.join(f'<li>{c}</li>' for c in cleaned)
                tag = 'ol' if first[0].isdigit() else 'ul'
                parts.append(f'<{tag}>{items}</{tag}>')
            else:
                parts.append('<p>' + '<br>'.join(plines) + '</p>')
        return '\n'.join(parts)

    body_html = text_to_html(analysis)

    flicker_banner = ""
    if flicker_suspects and flicker_count > 0:
        rate_pct = 100 * flicker_count / total_sessions if total_sessions else 0
        rate_str = f"{flicker_count} of {total_sessions} display load sessions ({rate_pct:.0f}%)"
        rows = "".join(
            f"<tr><td>{m.capture_num:04d}</td><td>{m.timestamp}</td><td>{m.channel}</td>"
            f"<td style='color:red'>{m.lp_low_duration_ns} ns</td>"
            f"<td>{m.lp11_to_hs_ns} ns</td><td>{m.lp11_voltage_v} V</td></tr>"
            for m in flicker_suspects
        )
        # Use the .flicker-alert class defined in the page <style> block rather
        # than duplicating the same rules inline.
        flicker_banner = f"""
<div class="flicker-alert">
<h2>&#9888; FLICKER DETECTED &mdash; {rate_str} flickered</h2>
<p>Each flagged capture is a genuine flicker event (not an artifact) &mdash; the LP pass fires at
pipeline startup, so a missing or sub-50&nbsp;ns LP-low plateau means the SN65DSI83 bridge
missed the SoT sequence and dropped a frame.<br>
LP-low plateau &lt; 50&nbsp;ns means the LP-01/LP-00 SoT states are absent or too brief
for the SN65DSI83 bridge to detect start-of-transmission.</p>
<table>
<tr><th>Capture</th><th>Timestamp</th><th>Channel</th>
<th>LP-low plateau</th><th>LP exit&rarr;HS</th><th>LP-11 voltage</th></tr>
{rows}
</table>
</div>"""

    # --- Register table (collapsible) ---
    reg_section = ""
    if all_reg_dumps:
        # Collect all unique addresses in the order they first appear
        addr_order = []
        addr_names = {}
        for rd in all_reg_dumps:
            for r in rd.registers:
                if r["address"] not in addr_names:
                    addr_order.append(r["address"])
                    addr_names[r["address"]] = r.get("name", "")
        if addr_order:
            header_cells = "".join(
                f"<th>{html.escape(addr)}<br><small>{html.escape(addr_names[addr])}</small></th>"
                for addr in addr_order
            )
            rows_html = ""
            for rd in all_reg_dumps:
                reg_map = {r["address"]: r["value"] for r in rd.registers}
                cells = "".join(
                    # str() so a non-string register value cannot crash escaping
                    f"<td>{html.escape(str(reg_map.get(addr, '')))}</td>"
                    for addr in addr_order
                )
                rows_html += f"<tr><td>{rd.capture_num:04d}</td><td>{rd.timestamp}</td>{cells}</tr>"
            reg_section = f"""
<details style="margin-bottom:24px;">
<summary style="cursor:pointer;font-weight:bold;color:#1a3a5c;font-size:1.05em;">
DSI Register Snapshots ({len(all_reg_dumps)} captures)
</summary>
<div style="overflow-x:auto;margin-top:8px;">
<table>
<tr><th>Capture</th><th>Timestamp</th>{header_cells}</tr>
{rows_html}
</table>
</div>
</details>"""

    html_content = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>MIPI Analysis {cap_range}</title>
<style>
body {{ font-family: Arial, sans-serif; max-width: 900px; margin: 40px auto; padding: 0 20px; color: #222; }}
h1 {{ color: #1a3a5c; border-bottom: 2px solid #1a3a5c; padding-bottom: 8px; }}
.meta {{ color: #555; font-size: 0.95em; margin-top: -8px; margin-bottom: 24px; }}
p {{ line-height: 1.6; }}
ol, ul {{ line-height: 1.8; padding-left: 24px; }}
li {{ margin: 4px 0; }}
.tokens {{ color: #888; font-size: 0.8em; margin-top: 32px; border-top: 1px solid #ddd; padding-top: 8px; }}
.flicker-alert {{ background: #fff3cd; border: 2px solid #e65100; border-radius: 6px;
padding: 16px 20px; margin-bottom: 28px; }}
.flicker-alert h2 {{ color: #e65100; margin-top: 0; }}
.flicker-alert table {{ border-collapse: collapse; width: 100%; margin-top: 10px; }}
.flicker-alert th {{ background: #e65100; color: white; padding: 6px 10px; text-align: left; }}
.flicker-alert td {{ border: 1px solid #ccc; padding: 5px 10px; }}
table {{ border-collapse: collapse; width: 100%; }}
th {{ background: #1a3a5c; color: white; padding: 6px 10px; text-align: left; }}
td {{ border: 1px solid #ddd; padding: 5px 10px; }}
@media print {{ body {{ margin: 20px; }} }}
</style>
</head>
<body>
<h1>MIPI D-PHY Analysis Report</h1>
{flicker_banner}
{reg_section}
<p class="meta">
<strong>Generated:</strong> {date_str} &nbsp;|&nbsp;
<strong>Scope:</strong> {cap_range} &nbsp;|&nbsp;
<strong>Model:</strong> {CLAUDE_MODEL}
</p>
{body_html}
<p class="tokens">{html.escape(token_line)}</p>
</body>
</html>
"""
    path.write_text(html_content, encoding="utf-8")
    return path
2026-04-08 12:55:34 +01:00
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
2026-04-08 14:19:31 +01:00
def run_analysis(last: int = 10) -> None:
    """
    Entry point used by mgmt_worker after each file transfer.

    Pre-processes the `last` most recent captures, asks Claude for a trend
    analysis, prints the report to the console and writes an HTML report.
    """
    capture_groups = group_captures(DATA_DIR)
    if not capture_groups:
        print("[ANALYSIS] No captures found.")
        return
    recent = sorted(capture_groups.keys())[-last:]
    print(f"\n[ANALYSIS] Processing {len(recent)} most-recent capture(s)...")

    summaries: list[str] = []
    suspects: list[LPMetrics] = []
    dumps: list[RegDump] = []
    for ts, num in recent:
        text, _, cap_suspects, cap_dumps = process_capture(ts, num, capture_groups[(ts, num)])
        summaries.append(text)
        suspects.extend(cap_suspects)
        dumps.extend(cap_dumps)

    flicker_count, total_sessions = _classify_flicker(recent, suspects)
    prompt = build_prompt(summaries, suspects, flicker_count, total_sessions)

    print(f"[ANALYSIS] Sending {len(prompt):,} chars to {CLAUDE_MODEL}...")
    response = anthropic.Anthropic().messages.create(
        model=CLAUDE_MODEL,
        max_tokens=4096,
        system=SYSTEM_PROMPT,
        messages=[{"role": "user", "content": prompt}],
    )
    analysis = response.content[0].text
    token_line = (f"Tokens: {response.usage.input_tokens} in / "
                  f"{response.usage.output_tokens} out")

    # ── Console ───────────────────────────────────────────────────────────
    bar = "=" * 60
    print(f"\n{bar}")
    print("CLAUDE ANALYSIS")
    print(bar)
    print(analysis)
    print(f"({token_line})")
    print(bar + "\n")

    # ── HTML report ───────────────────────────────────────────────────────
    report_path = save_html_report(analysis, token_line, recent,
                                   suspects, flicker_count, total_sessions,
                                   dumps)
    print(f"[ANALYSIS] Report saved to {report_path}")

    if flicker_count > 0:
        pct = 100 * flicker_count / total_sessions if total_sessions else 0
        print(f"[ANALYSIS] *** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions "
              f"({pct:.0f}%) — logged to {FLICKER_LOG} ***")
2026-04-08 14:19:31 +01:00
2026-04-08 12:55:34 +01:00
def main() -> None:
    """Command-line entry point: discover captures, analyse, and report."""
    parser = argparse.ArgumentParser(description="Analyse MIPI CSV captures with Claude")
    parser.add_argument("--last", type=int, default=None, metavar="N",
                        help="Process only the N most recent captures")
    parser.add_argument("--capture", type=str, default=None, metavar="NUM",
                        help="Process a single capture number (e.g. 0001)")
    parser.add_argument("--verbose", action="store_true",
                        help="Print per-file summaries to stdout")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print summaries and prompt but do not call Claude API")
    args = parser.parse_args()

    # --- Discover and filter captures ---
    groups = group_captures(DATA_DIR)
    if not groups:
        print(f"No CSV files found in {DATA_DIR}", file=sys.stderr)
        sys.exit(1)
    selected = sorted(groups.keys())  # (timestamp, capture_num) order
    if args.capture is not None:
        wanted = int(args.capture)
        selected = [k for k in selected if k[1] == wanted]
        if not selected:
            print(f"Capture {args.capture} not found.", file=sys.stderr)
            sys.exit(1)
    if args.last is not None:
        selected = selected[-args.last:]
    print(f"Processing {len(selected)} capture(s) from {DATA_DIR}\n")

    # --- Run pre-processor ---
    summaries: list[str] = []
    suspects: list[LPMetrics] = []
    dumps: list[RegDump] = []
    for ts, num in selected:
        text, _, cap_suspects, cap_dumps = process_capture(
            ts, num, groups[(ts, num)], verbose=args.verbose)
        summaries.append(text)
        suspects.extend(cap_suspects)
        dumps.extend(cap_dumps)
        if not args.verbose:
            flag = " *** FLICKER SUSPECT ***" if cap_suspects else ""
            print(f" Processed capture {num:04d} {ts}{flag}")

    # --- Build Claude prompt ---
    flicker_count, total_sessions = _classify_flicker(selected, suspects)
    prompt = build_prompt(summaries, suspects, flicker_count, total_sessions)
    if args.dry_run:
        print("\n--- Prompt that would be sent to Claude ---")
        print(prompt)
        return

    # --- Call Claude API ---
    print(f"\nSending {len(prompt):,} characters to {CLAUDE_MODEL}...\n")
    response = anthropic.Anthropic().messages.create(
        model=CLAUDE_MODEL,
        max_tokens=4096,
        system=SYSTEM_PROMPT,
        messages=[{"role": "user", "content": prompt}],
    )
    analysis = response.content[0].text
    token_line = (f"Tokens: {response.usage.input_tokens} in / "
                  f"{response.usage.output_tokens} out")
    bar = "=" * 60

    # Console
    print(f"\n{bar}\nCLAUDE ANALYSIS\n{bar}")
    print(analysis)
    print(f"({token_line})")
    print(bar)

    # HTML report
    report_path = save_html_report(analysis, token_line, selected,
                                   suspects, flicker_count, total_sessions,
                                   dumps)
    print(f"\nReport saved to {report_path}")

    if flicker_count > 0:
        pct = 100 * flicker_count / total_sessions if total_sessions else 0
        print(f"*** FLICKER DETECTED — {flicker_count}/{total_sessions} sessions "
              f"({pct:.0f}%) — see {FLICKER_LOG} ***")
2026-04-08 12:55:34 +01:00
# Script entry point; when imported (e.g. by mgmt_worker), use run_analysis() instead.
if __name__ == "__main__":
    main()