diff --git a/__pycache__/analyze_captures.cpython-312.pyc b/__pycache__/analyze_captures.cpython-312.pyc index ce869ee..41b95be 100644 Binary files a/__pycache__/analyze_captures.cpython-312.pyc and b/__pycache__/analyze_captures.cpython-312.pyc differ diff --git a/__pycache__/csv_preprocessor.cpython-312.pyc b/__pycache__/csv_preprocessor.cpython-312.pyc index fff6d4b..38ac48f 100644 Binary files a/__pycache__/csv_preprocessor.cpython-312.pyc and b/__pycache__/csv_preprocessor.cpython-312.pyc differ diff --git a/__pycache__/rigol_scope.cpython-312.pyc b/__pycache__/rigol_scope.cpython-312.pyc new file mode 100644 index 0000000..955fce6 Binary files /dev/null and b/__pycache__/rigol_scope.cpython-312.pyc differ diff --git a/analyze_captures.py b/analyze_captures.py index 12c253a..d688b5a 100644 --- a/analyze_captures.py +++ b/analyze_captures.py @@ -21,7 +21,10 @@ from dotenv import load_dotenv load_dotenv(Path(__file__).parent / ".env") -from csv_preprocessor import analyze_file, analyze_lp_file, group_captures, ChannelMetrics, LPMetrics +from csv_preprocessor import ( + analyze_file, analyze_lp_file, analyze_1v8_file, + group_captures, ChannelMetrics, LPMetrics, V1V8Metrics, +) DATA_DIR = Path(__file__).parent / "data" REPORTS_DIR = Path(__file__).parent / "reports" @@ -30,9 +33,12 @@ CLAUDE_MODEL = "claude-opus-4-6" SYSTEM_PROMPT = ( "You are an expert in MIPI D-PHY signal integrity analysis. " "You will be given compact pre-processed summaries of oscilloscope captures " - "from a MIPI CLK and DAT0 differential pair. " - "Each capture has three passes: sig (high-res HS quality), proto (long-window HS stats), " - "and lp (single-ended, shows LP-11/LP-00/HS burst structure including the SoT sequence). " + "from a MIPI CLK and DAT0 differential pair, plus 1.8 V supply rail measurements. " + "The MIPI PHY (NXP i.MX 8M Mini) drives LP states from the 1.8 V VDDIO. " + "Each capture has up to four data sets: " + "sig (high-res HS quality), proto (long-window HS stats), " + "lp (single-ended LP-11/LP-00/HS burst including SoT sequence), " + "and pwr (1.8 V supply captured during the LP→HS transition). " "Analyse the data for trends, degradation, anomalies, or consistent spec concerns " "across captures. Be concise and actionable." ) @@ -56,13 +62,18 @@ def process_capture( lines = [f"=== Capture {num:04d} {ts} ==="] metrics_list: list[ChannelMetrics | LPMetrics] = [] - for key in ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat"): + for key in ("proto_clk", "proto_dat", "sig_clk", "sig_dat", "lp_clk", "lp_dat", "pwr_1v8"): if key not in files: - lines.append(f" [{key}] MISSING") + if key == "pwr_1v8": + lines.append(f" [{key}] NOT CAPTURED (Rigol not connected or no droop)") + else: + lines.append(f" [{key}] MISSING") continue try: if key.startswith("lp_"): m = analyze_lp_file(files[key]) + elif key == "pwr_1v8": + m = analyze_1v8_file(files[key]) else: m = analyze_file(files[key]) lines.append(m.summary()) @@ -82,16 +93,20 @@ def build_prompt(all_summaries: list[str]) -> str: "Each capture has three passes per lane (CLK and DAT0):\n" " sig — high-res HS differential (rise/fall times)\n" " proto — long-window HS differential (jitter, clock freq, amplitude)\n" - " lp — single-ended LP state capture (LP-11 voltage, SoT sequence, HS bursts)\n\n" + " lp — single-ended LP state capture (LP-11 voltage, SoT sequence, HS bursts)\n" + " pwr — 1.8 V supply rail captured during LP→HS transition (droop, ripple, spec)\n\n" f"{body}\n\n" "Please:\n" "1. Identify any consistent spec concerns (HS voltage, LP-11 voltage, LP-low timing).\n" - "2. Highlight any trends over captures (amplitude drift, jitter, LP-11 voltage, etc.).\n" + "2. Highlight any trends over captures (amplitude drift, jitter, LP-11 voltage, 1.8 V droop, etc.).\n" "3. Flag anomalies — missing LP transitions, short LP-low, unexpected burst counts.\n" - "4. For any ERROR or WARNING lines in the summaries, explain the most likely cause " - " (e.g. missing file, bad trigger, signal absent, probe issue) and what to check.\n" - "5. Provide specific, actionable recommendations to address all identified issues and anomalies.\n" - "6. Summarise overall signal health in 2–3 sentences." + "4. Correlate 1.8 V supply droop/ripple with MIPI LP anomalies — does droop depth or ripple " + " correlate with SoT timing violations, short LP-low plateaux, or LP-11 voltage drops? " + " If pwr data is absent, note that supply correlation could not be assessed.\n" + "5. For any ERROR or WARNING lines in the summaries, explain the most likely cause " + " (e.g. missing file, bad trigger, signal absent, probe issue, supply marginal) and what to check.\n" + "6. Provide specific, actionable recommendations to address all identified issues and anomalies.\n" + "7. Summarise overall signal health in 2–3 sentences." ) diff --git a/csv_preprocessor.py b/csv_preprocessor.py index 3511769..60954d3 100644 --- a/csv_preprocessor.py +++ b/csv_preprocessor.py @@ -22,6 +22,13 @@ from dataclasses import dataclass, field from pathlib import Path from typing import Optional +# 1.8 V supply rail spec (i.MX 8M Mini internal regulator, ±5 %) +V18_NOMINAL_V = 1.800 +V18_SPEC_MIN_V = 1.710 # −5 % +V18_SPEC_MAX_V = 1.890 # +5 % +V18_DROOP_WARN_MV = 50.0 # mV droop depth worth flagging +V18_RIPPLE_WARN_MV = 20.0 # mV RMS ripple worth flagging + # MIPI D-PHY HS-TX spec limits HS_VDIFF_MIN_MV = 140.0 # |Vdiff| minimum (mV) HS_VDIFF_MAX_MV = 270.0 # |Vdiff| maximum (mV) @@ -318,6 +325,97 @@ def analyze_file(path: Path) -> ChannelMetrics: ) +@dataclass +class V1V8Metrics: + timestamp: str + capture_num: int + + sample_rate_mhz: float + duration_us: float + n_samples: int + + mean_v: float # mean supply voltage + min_v: float # minimum (worst-case droop) + max_v: float # maximum + droop_mv: float # mean − min (droop depth) + ripple_mv_rms: float # AC ripple (std dev of voltage) + + spec_pass: bool # mean within ±5 % of 1.8 V + droop_pass: bool # minimum above V18_SPEC_MIN_V + + warnings: list = field(default_factory=list) + + def summary(self) -> str: + ok = lambda c: "✓" if c else "✗" + lines = [ + f"Capture {self.capture_num:04d} {self.timestamp} [pwr/1v8]", + f" Mean voltage : {self.mean_v:.4f} V " + f"(spec {V18_SPEC_MIN_V:.2f}–{V18_SPEC_MAX_V:.2f} V) {ok(self.spec_pass)}", + f" Min voltage : {self.min_v:.4f} V {ok(self.droop_pass)}", + f" Droop depth : {self.droop_mv:.1f} mV", + f" Ripple RMS : {self.ripple_mv_rms:.2f} mV", + ] + for w in self.warnings: + lines.append(f" WARNING: {w}") + return "\n".join(lines) + + +def analyze_1v8_file(path: Path) -> "V1V8Metrics": + """Analyse a 1.8 V supply rail CSV captured by the Rigol DS1202Z-E.""" + m = re.match(r"(\d{8}_\d{6})_pwr_(\d+)_1v8\.csv", path.name, re.IGNORECASE) + if not m: + raise ValueError(f"Filename does not match 1v8 pattern: {path.name}") + timestamp, cap_str = m.groups() + capture_num = int(cap_str) + + times, volts = _read_csv(path) + dt = float(np.diff(times).mean()) + sample_rate = 1.0 / dt + duration_us = (float(times[-1]) - float(times[0])) * 1e6 + + mean_v = float(volts.mean()) + min_v = float(volts.min()) + max_v = float(volts.max()) + droop_mv = (mean_v - min_v) * 1000.0 + ripple_mv_rms = float(volts.std()) * 1000.0 + + spec_pass = V18_SPEC_MIN_V <= mean_v <= V18_SPEC_MAX_V + droop_pass = min_v >= V18_SPEC_MIN_V + + warnings = [] + if not spec_pass: + warnings.append( + f"Mean supply {mean_v:.4f} V outside spec " + f"({V18_SPEC_MIN_V:.2f}–{V18_SPEC_MAX_V:.2f} V)" + ) + if not droop_pass: + warnings.append( + f"Supply droops to {min_v:.4f} V — below {V18_SPEC_MIN_V:.2f} V spec min" + ) + if droop_mv > V18_DROOP_WARN_MV: + warnings.append( + f"Droop depth {droop_mv:.1f} mV — possible insufficient decoupling near MIPI PHY" + ) + if ripple_mv_rms > V18_RIPPLE_WARN_MV: + warnings.append(f"Ripple {ripple_mv_rms:.1f} mV RMS is elevated") + + return V1V8Metrics( + timestamp = timestamp, + capture_num = capture_num, + sample_rate_mhz = round(sample_rate / 1e6, 1), + duration_us = round(duration_us, 2), + n_samples = len(times), + mean_v = round(mean_v, 4), + min_v = round(min_v, 4), + max_v = round(max_v, 4), + droop_mv = round(droop_mv, 1), + ripple_mv_rms = round(ripple_mv_rms, 2), + spec_pass = spec_pass, + droop_pass = droop_pass, + warnings = warnings, + ) + + def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]: """ Scan data_dir and group CSV files by (timestamp, capture_number). @@ -325,7 +423,9 @@ def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]: Example key: ("20260408_111448", 1) Example value: {"sig_clk": Path(...), "sig_dat": ..., "proto_clk": ..., "proto_dat": ...} """ - pattern = re.compile(r"(\d{8}_\d{6})_(sig|proto|lp)_(\d+)_(clk|dat)\.csv", re.IGNORECASE) + pattern = re.compile( + r"(\d{8}_\d{6})_(sig|proto|lp|pwr)_(\d+)_(clk|dat|1v8)\.csv", re.IGNORECASE + ) groups: dict[tuple[str, int], dict[str, Path]] = {} for f in sorted(data_dir.glob("*.csv")): m = pattern.match(f.name) diff --git a/mipi_test.py b/mipi_test.py index 0ac84e3..892f041 100644 --- a/mipi_test.py +++ b/mipi_test.py @@ -13,8 +13,10 @@ import sys import requests import threading from datetime import datetime +from pathlib import Path import ai_mgmt import analyze_captures +import rigol_scope # --- Configuration --- URL = "http://192.168.45.8:5000/display" @@ -42,6 +44,7 @@ LP_V_OFFSET = 0.6 # V — center display at 0.6 V (range −0.2 V to 1.4 LP_TRIG_LEVEL = 0.6 # V — midpoint of LP-11 (1.2 V) → LP-01 (0 V) fall DISPLAY_SETTLE_S = 1.0 # seconds to wait after display ON before arming scope +DATA_DIR = Path(__file__).parent / "data" test_running = False # controls both worker threads resume_event = threading.Event() # cleared to pause test_worker, set to resume @@ -57,6 +60,9 @@ except Exception as e: print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}") sys.exit(1) +# Rigol DS1202Z-E for 1.8 V supply monitoring (optional — test continues if unavailable) +rigol_scope.connect() + # --------------------------------------------------------------------------- def setup_scope(): @@ -299,16 +305,37 @@ def dual_capture(iteration): else: print(" SKIPPING PASS 2 SAVE.") - # ── Pass 3: LP / SoT structure ──────────────────────────────────────── + # ── Pass 3: LP / SoT structure + 1.8 V supply monitoring ───────────── # Widens vertical range to capture LP-11 (1.2 V) and falls-edge triggers # on the LP-11 → LP-01 SoT transition. Saves Ch1 and Ch3 single-ended. + # Rigol is armed first (non-blocking) so the LP→HS current step droops + # the 1.8 V rail and triggers the Rigol while the Agilent captures. print(" PASS 3: LP TRANSITION...") _configure_for_lp() _set_timebase(LP_SCALE, LP_POINTS) + + if rigol_scope.is_connected(): + rigol_scope.arm() # arm Rigol before LP trigger so it catches the droop + if _arm_and_wait(timeout=30): _save_pass_channels("lp", iteration, ts) else: print(" SKIPPING PASS 3 SAVE.") + + # Collect Rigol 1.8 V waveform (Agilent save takes ~5 s, Rigol should be done) + if rigol_scope.is_connected(): + print(" PASS 3: WAITING FOR RIGOL 1.8 V CAPTURE...") + if rigol_scope.wait_captured(timeout_s=10.0): + DATA_DIR.mkdir(exist_ok=True) + v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv" + n = rigol_scope.read_waveform_csv(v18_path) + if n: + print(f" SAVED: {v18_path.name} ({n} samples)") + else: + print(" RIGOL: Waveform read returned 0 samples.") + else: + print(" RIGOL: Timed out waiting for capture.") + _restore_hs_config() # ── Restore original timebase ───────────────────────────────────────── @@ -396,9 +423,15 @@ def main_menu(): if choice == '1': print(f"PSU : {psu.ask('*IDN?').strip()}") print(f"SCOPE: {scope.ask('*IDN?').strip()}") + if rigol_scope.is_connected(): + print(f"RIGOL: {rigol_scope.rigol.ask('*IDN?').strip()}") + else: + print("RIGOL: NOT CONNECTED") elif choice == '2': setup_scope() + if rigol_scope.is_connected(): + rigol_scope.configure() elif choice == '3': psu.write('CH1:VOLT 24.0') @@ -431,6 +464,7 @@ def main_menu(): test_running = False psu.close() scope.close() + rigol_scope.disconnect() print("INSTRUMENTS CLOSED. BYE.") break diff --git a/rigol_scope.py b/rigol_scope.py new file mode 100644 index 0000000..3e02649 --- /dev/null +++ b/rigol_scope.py @@ -0,0 +1,158 @@ +""" +rigol_scope.py + +Controls the Rigol DS1202Z-E at 192.168.45.5 for 1.8 V supply rail monitoring. +Called from dual_capture() in mipi_test.py during the LP pass. + +The scope is armed (single trigger) just before the Agilent LP capture. +The LP→HS current step droops the 1.8 V rail, triggering the Rigol. +The waveform is then read over SCPI and written directly to the local data/ folder. +""" + +import csv +import time +import vxi11 +from pathlib import Path + +RIGOL_HOST = "192.168.45.5" +V18_SCALE = 0.1 # V/div — 100 mV/div; 10 divs = ±500 mV around 1.8 V +V18_OFFSET = -1.8 # V — shifts zero reference so 1.8 V sits at screen centre +V18_TIMEBASE = 1e-6 # s/div — 1 µs/div = 10 µs total window +V18_TRIG_LEVEL = 1.76 # V — falling-edge trigger on supply droop > 40 mV +TRIG_TIMEOUT_S = 15.0 # s — wait this long for Rigol to capture after arming + +rigol: vxi11.Instrument | None = None + + +# --------------------------------------------------------------------------- +# Connection +# --------------------------------------------------------------------------- + +def connect() -> bool: + global rigol + try: + rigol = vxi11.Instrument(RIGOL_HOST) + rigol.timeout = 10 + idn = rigol.ask("*IDN?").strip() + print(f"[RIGOL] Connected: {idn}") + return True + except Exception as e: + print(f"[RIGOL] Connection failed — 1.8 V monitoring disabled: {e}") + rigol = None + return False + + +def disconnect(): + global rigol + if rigol: + try: + rigol.close() + except Exception: + pass + rigol = None + + +def is_connected() -> bool: + return rigol is not None + + +# --------------------------------------------------------------------------- +# Setup +# --------------------------------------------------------------------------- + +def configure(): + """ + Configure Rigol for 1.8 V supply monitoring. + AUTO trigger sweep: if no droop occurs, scope still captures on timeout + so we always get a supply snapshot even when the rail is healthy. + """ + rigol.write(":STOP") + time.sleep(0.2) + + rigol.write(":CHANnel1:DISPlay 1") + rigol.write(":CHANnel2:DISPlay 0") + rigol.write(":CHANnel1:COUPling DC") + rigol.write(":CHANnel1:PROBe 1") + rigol.write(f":CHANnel1:SCALe {V18_SCALE:.3f}") + rigol.write(f":CHANnel1:OFFSet {V18_OFFSET:.3f}") + rigol.write(f":TIMebase:MAIN:SCALe {V18_TIMEBASE:.2E}") + rigol.write(":TRIGger:MODE EDGE") + rigol.write(":TRIGger:EDGe:SOURce CHANnel1") + rigol.write(":TRIGger:EDGe:SLOPe NEGative") + rigol.write(f":TRIGger:EDGe:LEVel {V18_TRIG_LEVEL:.3f}") + rigol.write(":TRIGger:SWEep AUTO") # auto: captures even without a droop trigger + time.sleep(0.3) + + print(f"[RIGOL] Configured: 1.8 V rail, {int(V18_TIMEBASE*1e6)} µs/div, " + f"trigger <{V18_TRIG_LEVEL} V falling (AUTO sweep)") + + +# --------------------------------------------------------------------------- +# Acquisition +# --------------------------------------------------------------------------- + +def arm(): + """Arm for a single acquisition. Non-blocking — returns immediately.""" + rigol.write(":SINGle") + + +def wait_captured(timeout_s: float = TRIG_TIMEOUT_S) -> bool: + """ + Poll until the scope has completed its single acquisition. + DS1000Z reports STOP when done (triggered or auto-timed-out). + Returns True when ready, False if timeout exceeded. + """ + deadline = time.time() + timeout_s + while time.time() < deadline: + try: + status = rigol.ask(":TRIGger:STATus?").strip().upper() + if status in ("STOP", "TD"): + return True + except Exception: + pass + time.sleep(0.1) + return False + + +def read_waveform_csv(path: Path) -> int: + """ + Read Ch1 waveform from Rigol over SCPI and write to CSV. + The Rigol returns ASCII voltage values; we reconstruct the time axis + from the waveform preamble. + + Returns the number of samples written, or 0 on error. + """ + try: + rigol.write(":WAVeform:SOURce CHANnel1") + rigol.write(":WAVeform:FORMat ASCII") + rigol.write(":WAVeform:MODE NORMal") + + preamble = rigol.ask(":WAVeform:PREamble?").strip().split(",") + # [0]=fmt [1]=type [2]=points [3]=count [4]=x_incr [5]=x_orig [6]=x_ref + # [7]=y_incr [8]=y_orig [9]=y_ref + x_incr = float(preamble[4]) + x_orig = float(preamble[5]) + x_ref = float(preamble[6]) + + raw = rigol.ask(":WAVeform:DATA?").strip() + + # Strip any TMC binary header (#) if present + if raw.startswith("#"): + n_digits = int(raw[1]) + raw = raw[2 + n_digits:] + + vals = [float(v) for v in raw.split(",") if v.strip()] + + path.parent.mkdir(exist_ok=True) + with open(path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["Time (s)", "Voltage (V)"]) + for i, v in enumerate(vals): + t = x_orig + (i - x_ref) * x_incr + writer.writerow([f"{t:.9f}", f"{v:.6f}"]) + + return len(vals) + + except Exception as e: + print(f"[RIGOL] Waveform read error: {e}") + return 0