diff --git a/__pycache__/analyze_captures.cpython-312.pyc b/__pycache__/analyze_captures.cpython-312.pyc new file mode 100644 index 0000000..d915d7f Binary files /dev/null and b/__pycache__/analyze_captures.cpython-312.pyc differ diff --git a/__pycache__/csv_preprocessor.cpython-312.pyc b/__pycache__/csv_preprocessor.cpython-312.pyc index 3bb0973..fff6d4b 100644 Binary files a/__pycache__/csv_preprocessor.cpython-312.pyc and b/__pycache__/csv_preprocessor.cpython-312.pyc differ diff --git a/__pycache__/mipi_test.cpython-312.pyc b/__pycache__/mipi_test.cpython-312.pyc new file mode 100644 index 0000000..2ff00f6 Binary files /dev/null and b/__pycache__/mipi_test.cpython-312.pyc differ diff --git a/analyze_captures.py b/analyze_captures.py index 75e5bcd..47c7dd1 100644 --- a/analyze_captures.py +++ b/analyze_captures.py @@ -12,13 +12,17 @@ Usage: import argparse import sys +from datetime import datetime from pathlib import Path import anthropic +import requests from csv_preprocessor import analyze_file, analyze_lp_file, group_captures, ChannelMetrics, LPMetrics -DATA_DIR = Path(__file__).parent / "data" +DATA_DIR = Path(__file__).parent / "data" +ANALYSIS_LOG = Path(__file__).parent / "analysis_log.txt" +DISPLAY_URL = "http://192.168.45.8:5000/display" CLAUDE_MODEL = "claude-opus-4-6" SYSTEM_PROMPT = ( @@ -90,6 +94,62 @@ def build_prompt(all_summaries: list[str]) -> str: # Main # --------------------------------------------------------------------------- +def run_analysis(last: int = 10) -> None: + """ + Called by mgmt_worker after each file transfer. + Analyses the most recent `last` captures and prints the Claude report. + """ + groups = group_captures(DATA_DIR) + if not groups: + print("[ANALYSIS] No captures found.") + return + + keys = sorted(groups.keys())[-last:] + print(f"\n[ANALYSIS] Processing {len(keys)} most-recent capture(s)...") + + all_summaries: list[str] = [] + for ts, num in keys: + summary_text, _ = process_capture(ts, num, groups[(ts, num)]) + all_summaries.append(summary_text) + + prompt = build_prompt(all_summaries) + print(f"[ANALYSIS] Sending {len(prompt):,} chars to {CLAUDE_MODEL}...") + + client = anthropic.Anthropic() + message = client.messages.create( + model = CLAUDE_MODEL, + max_tokens = 1024, + system = SYSTEM_PROMPT, + messages = [{"role": "user", "content": prompt}], + ) + analysis = message.content[0].text + token_line = f"Tokens: {message.usage.input_tokens} in / {message.usage.output_tokens} out" + + # ── Console ─────────────────────────────────────────────────────────── + separator = "=" * 60 + print(f"\n{separator}") + print("CLAUDE ANALYSIS") + print(separator) + print(analysis) + print(f"({token_line})") + print(separator + "\n") + + # ── Append to log file ──────────────────────────────────────────────── + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with open(ANALYSIS_LOG, "a", encoding="utf-8") as f: + f.write(f"\n{'='*60}\n{ts} — captures {keys[0][1]:04d}–{keys[-1][1]:04d}\n{'='*60}\n") + f.write(analysis) + f.write(f"\n({token_line})\n") + print(f"[ANALYSIS] Report appended to {ANALYSIS_LOG}") + + # ── Send to display ─────────────────────────────────────────────────── + try: + requests.post(DISPLAY_URL, json={"text": analysis}, timeout=5) + print("[ANALYSIS] Report sent to display.") + except Exception as e: + print(f"[ANALYSIS] Display send failed: {e}") + + def main() -> None: parser = argparse.ArgumentParser(description="Analyse MIPI CSV captures with Claude") parser.add_argument("--last", type=int, default=None, metavar="N", @@ -147,14 +207,30 @@ def main() -> None: system = SYSTEM_PROMPT, messages = [{"role": "user", "content": prompt}], ) - analysis = message.content[0].text + analysis = message.content[0].text + token_line = f"Tokens: {message.usage.input_tokens} in / {message.usage.output_tokens} out" + separator = "=" * 60 + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print("=" * 60) - print("CLAUDE ANALYSIS") - print("=" * 60) + # Console + print(f"\n{separator}\nCLAUDE ANALYSIS\n{separator}") print(analysis) - print() - print(f"(Tokens used: {message.usage.input_tokens} in / {message.usage.output_tokens} out)") + print(f"({token_line})") + print(separator) + + # Log file + with open(ANALYSIS_LOG, "a", encoding="utf-8") as f: + f.write(f"\n{separator}\n{ts}\n{separator}\n") + f.write(analysis) + f.write(f"\n({token_line})\n") + print(f"\nReport appended to {ANALYSIS_LOG}") + + # Display + try: + requests.post(DISPLAY_URL, json={"text": analysis}, timeout=5) + print("Report sent to display.") + except Exception as e: + print(f"Display send failed: {e}") if __name__ == "__main__": diff --git a/csv_preprocessor.py b/csv_preprocessor.py index ee2de8b..3511769 100644 --- a/csv_preprocessor.py +++ b/csv_preprocessor.py @@ -32,7 +32,9 @@ TRANSITION_BAND_MV = 50.0 # |Vdiff| < this is considered a transition, not sett # MIPI D-PHY LP state thresholds (single-ended voltage, after probe compensation) LP11_HIGH_V = 0.8 # V — single-ended voltage above this → LP-11 (both pins high ~1.2 V) -LP_LOW_V = 0.05 # V — single-ended voltage below this → LP-00 or LP-01 pin low +LP_LOW_V = 0.25 # V — single-ended voltage below this → LP-00 or LP-01 pin low +# Note: probe loading can shift LP-low from true 0 V to ~100 mV; 0.25 V clears that offset +# The rolling-std gate (HS_OSC_STD_V) prevents HS minima near 0 V being called LP-low. LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined) @@ -353,8 +355,9 @@ class LPMetrics: lp11_voltage_v: Optional[float] # mean level in LP-11 region (spec 1.0–1.45 V) lp11_duration_us: Optional[float] # total LP-11 time in capture (pre-trigger) - # LP-low (LP-01 + LP-00 combined — CLK+ = 0 V in both states) - lp_low_duration_ns: Optional[float] # duration between LP-11 end and HS start + # LP exit: gap between LP-11 falling edge and HS oscillation onset + lp11_to_hs_ns: Optional[float] # total LP exit time LP-11→HS (includes LP-01+LP-00) + lp_low_duration_ns: Optional[float] # LP-low plateau duration if a clear plateau was seen # HS bursts detected within the window n_hs_bursts: int @@ -378,12 +381,14 @@ class LPMetrics: ) if self.lp11_duration_us is not None: lines.append(f" LP-11 duration : {self.lp11_duration_us:.2f} µs") - if self.lp_low_duration_ns is not None: - ok_lp = self.lp_low_duration_ns >= LP_LOW_DUR_MIN_NS + if self.lp11_to_hs_ns is not None: + ok_exit = self.lp11_to_hs_ns >= LP_LOW_DUR_MIN_NS lines.append( - f" LP-low duration : {self.lp_low_duration_ns:.0f} ns " - f"(spec ≥{LP_LOW_DUR_MIN_NS:.0f} ns) {ok(ok_lp)}" + f" LP exit → HS : {self.lp11_to_hs_ns:.0f} ns " + f"(spec ≥{LP_LOW_DUR_MIN_NS:.0f} ns) {ok(ok_exit)}" ) + if self.lp_low_duration_ns is not None: + lines.append(f" LP-low plateau : {self.lp_low_duration_ns:.0f} ns") lines.append( f" LP→HS sequence : {'valid ✓' if self.lp_transition_valid else 'NOT DETECTED ✗'}" ) @@ -442,17 +447,11 @@ def analyze_lp_file(path: Path) -> "LPMetrics": sample_rate = 1.0 / dt duration_us = (float(times[-1]) - float(times[0])) * 1e6 - # ── State classification ────────────────────────────────────────────── - # Rolling std over ~1 ns window to detect HS oscillation - window = max(10, int(1e-9 / dt)) - rstd = _rolling_std(volts, window) - - lp11_mask = volts > LP11_HIGH_V - lp_low_mask = (volts < LP_LOW_V) & (rstd < HS_OSC_STD_V) - hs_mask = (~lp11_mask) & (~lp_low_mask) & (rstd >= HS_OSC_STD_V) - - # ── LP-11 region ────────────────────────────────────────────────────── + # ── LP-11 detection ─────────────────────────────────────────────────── + # LP-11 is reliable: voltage is clearly above LP11_HIGH_V (0.8 V). + lp11_mask = volts > LP11_HIGH_V lp11_regions = _find_contiguous_regions(lp11_mask, min_samples=10) + lp11_voltage_v = None lp11_duration_us = None if lp11_regions: @@ -461,42 +460,71 @@ def analyze_lp_file(path: Path) -> "LPMetrics": lp11_duration_us = round( sum((times[e] - times[s]) for s, e in lp11_regions) * 1e6, 3) - # ── LP-low region (between last LP-11 and first HS) ─────────────────── + # ── HS burst detection ──────────────────────────────────────────────── + # On DAT0+ with a uniform-colour display, HS data can look DC (no bit + # transitions), making oscillation-based HS detection unreliable. + # Instead: every non-LP-11 gap between LP-11 regions is treated as an + # HS burst. The first gap starts at the end of the first LP-11 region; + # subsequent gaps are between consecutive LP-11 regions. + lp11_to_hs_ns = None lp_low_duration_ns = None lp_transition_valid = False - - lp_low_regions = _find_contiguous_regions(lp_low_mask, min_samples=5) - hs_regions = _find_contiguous_regions(hs_mask, min_samples=20) - - if lp11_regions and lp_low_regions and hs_regions: - # Find the LP-low gap that sits between the last LP-11 and the first HS burst - last_lp11_end = lp11_regions[-1][1] - first_hs_start = hs_regions[0][0] - bridging = [(s, e) for s, e in lp_low_regions - if s >= last_lp11_end and e <= first_hs_start + int(100e-9 / dt)] - if bridging: - s0, e0 = bridging[0][0], bridging[-1][1] - lp_low_duration_ns = round((times[e0] - times[s0]) * 1e9, 1) - lp_transition_valid = True - - # ── HS burst metrics ────────────────────────────────────────────────── - n_hs_bursts = len(hs_regions) + n_hs_bursts = 0 hs_burst_dur_ns = None hs_amplitude_mv = None - if hs_regions: - durations = [(times[e] - times[s]) * 1e9 for s, e in hs_regions] - hs_burst_dur_ns = round(float(np.mean(durations)), 1) + if len(lp11_regions) >= 1: + # Measure LP-11 → HS exit gap (LP-01 + LP-00 combined) using a rolling + # std: the brief exit transition is the first period of measurable + # oscillation (rolling std > threshold) after LP-11 ends. + window = max(10, int(1e-9 / dt)) + rstd = _rolling_std(volts, window) - # HS single-ended amplitude: peak-to-peak / 2 of the oscillating signal - hs_volts = np.concatenate([volts[s:e] for s, e in hs_regions]) - hs_amplitude_mv = round( - (float(np.percentile(hs_volts, 95)) - float(np.percentile(hs_volts, 5))) / 2 * 1000, 1 - ) + hs_bursts = [] + for i, (lp11_s, lp11_e) in enumerate(lp11_regions): + # Burst ends at start of next LP-11, or at window end + burst_end = lp11_regions[i + 1][0] if i + 1 < len(lp11_regions) else len(times) - 1 + burst_dur_ns = round((times[burst_end] - times[lp11_e]) * 1e9, 1) + hs_bursts.append((lp11_e, burst_end, burst_dur_ns)) + + if hs_bursts: + n_hs_bursts = len(hs_bursts) + hs_burst_dur_ns = round(float(np.mean([d for _, _, d in hs_bursts])), 1) + lp_transition_valid = True + + # LP exit gap: find first rolling-std > threshold after LP-11 ends + s_end = lp11_regions[0][1] + lookahead = min(s_end + int(500e-9 / dt), len(times) - 1) + high_std_idx = np.where(rstd[s_end:lookahead] >= HS_OSC_STD_V)[0] + if len(high_std_idx): + lp11_to_hs_ns = round((times[s_end + high_std_idx[0]] - times[s_end]) * 1e9, 1) + + # LP-low plateau: look for a contiguous region in the exit window + # where voltage < LP_LOW_V and std is low (true LP-01/LP-00 plateau) + lp_low_mask = (volts < LP_LOW_V) & (rstd < HS_OSC_STD_V) + lp_low_regions = _find_contiguous_regions(lp_low_mask, min_samples=5) + exit_window = int(1e-6 / dt) + for lplow_s, lplow_e in lp_low_regions: + if s_end <= lplow_s <= s_end + exit_window: + lp_low_duration_ns = round( + (times[lplow_e] - times[lplow_s]) * 1e9, 1) + break + + # HS single-ended amplitude from the first burst (where data may vary) + if hs_bursts: + s, e, _ = hs_bursts[0] + burst_volts = volts[s:e] + hs_amplitude_mv = round( + (float(np.percentile(burst_volts, 95)) - + float(np.percentile(burst_volts, 5))) / 2 * 1000, 1 + ) # ── Warnings ───────────────────────────────────────────────────────── warnings = [] - if not lp11_regions: + continuous_hs_clk = (not lp11_regions) and (channel == "clk") and (float(volts.max()) < LP11_HIGH_V) + if continuous_hs_clk: + warnings.append("CLK lane is in continuous HS mode — LP states not expected on CLK") + elif not lp11_regions: warnings.append("No LP-11 state detected in capture window") elif lp11_voltage_v is not None: if lp11_voltage_v < LP11_SPEC_MIN_V: @@ -504,16 +532,17 @@ def analyze_lp_file(path: Path) -> "LPMetrics": if lp11_voltage_v > LP11_SPEC_MAX_V: warnings.append(f"LP-11 voltage {lp11_voltage_v:.3f} V above spec max {LP11_SPEC_MAX_V} V") - if lp_low_duration_ns is not None and lp_low_duration_ns < LP_LOW_DUR_MIN_NS: + if lp11_to_hs_ns is not None and lp11_to_hs_ns < LP_LOW_DUR_MIN_NS: warnings.append( - f"LP-low duration {lp_low_duration_ns:.0f} ns below spec min {LP_LOW_DUR_MIN_NS:.0f} ns" + f"LP exit duration {lp11_to_hs_ns:.0f} ns below spec min {LP_LOW_DUR_MIN_NS:.0f} ns " + f"— LP-01/LP-00 states may be absent or too brief" ) - if not lp_transition_valid: - warnings.append("LP-11 → LP-low → HS transition sequence not detected") - - if n_hs_bursts == 0: - warnings.append("No HS bursts detected after LP transition") + if not continuous_hs_clk: + if not lp_transition_valid: + warnings.append("LP-11 → LP-low → HS transition sequence not detected") + if n_hs_bursts == 0: + warnings.append("No HS bursts detected after LP transition") return LPMetrics( timestamp = timestamp, @@ -524,6 +553,7 @@ def analyze_lp_file(path: Path) -> "LPMetrics": n_samples = len(times), lp11_voltage_v = lp11_voltage_v, lp11_duration_us = lp11_duration_us, + lp11_to_hs_ns = lp11_to_hs_ns, lp_low_duration_ns = lp_low_duration_ns, n_hs_bursts = n_hs_bursts, hs_burst_dur_ns = hs_burst_dur_ns, diff --git a/mipi_test.py b/mipi_test.py index c58dd99..4275ffb 100644 --- a/mipi_test.py +++ b/mipi_test.py @@ -14,6 +14,7 @@ import requests import threading from datetime import datetime import ai_mgmt +import analyze_captures # --- Configuration --- URL = "http://192.168.45.8:5000/display" @@ -251,6 +252,9 @@ def _configure_for_lp(): scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}") scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}") time.sleep(0.05) + # Trigger on DAT0+ (Ch3) — CLK is continuous HS so it never reaches LP-11 (1.2 V). + # DAT0 has LP-11 between bursts, so Ch3 falling at 0.6 V catches LP-11 → LP-01. + scope.write(":TRIGger:EDGE:SOURce CHANnel3") scope.write(":TRIGger:EDGE:SLOPe NEGative") scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}") time.sleep(0.1) @@ -262,6 +266,7 @@ def _restore_hs_config(): scope.write(f":CHANnel{ch}:SCALe 0.1") scope.write(f":CHANnel{ch}:OFFSet 0.0") time.sleep(0.05) + scope.write(":TRIGger:EDGE:SOURce CHANnel1") scope.write(":TRIGger:EDGE:SLOPe POSitive") scope.write(f":TRIGger:EDGE:LEVel 0.05") time.sleep(0.1) @@ -331,8 +336,13 @@ def mgmt_worker(): try: copied, failed = ai_mgmt.transfer_csv_files() print(f"[MGMT] TRANSFERRED {copied} FILE(S) TO DATA FOLDER. {failed} FAILED.") + if copied > 0: + try: + analyze_captures.run_analysis() + except Exception as e: + print(f"[MGMT] ANALYSIS ERROR: {e}") except Exception as e: - print(f"[MGMT] ERROR: {e}") + print(f"[MGMT] TRANSFER ERROR: {e}") finally: resume_event.set() print("[MGMT] RESUMING TEST.\n")