#!/usr/bin/env python3 """ rail_watch.py — Capture Rigol DS1202Z-E CH1 (1V8 supply rail) every time the SN65DSI83 reports a MIPI PLL unlock. Architecture ------------ - Polls /sn65_registers at ~50 Hz looking for pll_lock True→False transitions. - On each unlock, :STOPs the Rigol, reads CH1 waveform via :WAV:DATA?, saves to CSV in data/rail_traces/, prints peak-to-peak ripple, then :RUNs again. - Press `g` to capture a baseline (clean) trace. Press `q` to quit. Rigol setup (do once on the front panel before running): * Channel 1 probed on the 1V8 rail derived to the MIPI PHY * DC coupling with offset, or AC coupling for ripple-only view * Recommended: 20 mV/div, 5–10 ms/div (60–120 ms window) * Trigger: AUTO on Channel 1 so the buffer is always recent * Memory depth: 12M (or whatever fits the timebase) * :RUN the scope so it's continuously acquiring """ from __future__ import annotations import argparse import select import signal import sys import termios import time import tty from datetime import datetime from pathlib import Path import numpy as np import requests import vxi11 # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- DEVICE_BASE = "http://192.168.45.8:5000" SN65_EP = f"{DEVICE_BASE}/sn65_registers" RIGOL_IP = "192.168.45.5" DATA_DIR = Path(__file__).parent / "data" / "rail_traces" POLL_DT_S = 0.020 # 50 Hz target — coarser than sn65_monitor HTTP_TO_S = 0.2 RIGOL_TO_S = 10.0 # --------------------------------------------------------------------------- # Rigol I/O # --------------------------------------------------------------------------- def _read_ieee_block(rigol) -> bytes: """Read an IEEE 488.2 binary block from the scope: '#'[\\n].""" head = rigol.read_raw(2) if not head.startswith(b"#"): idx = head.find(b"#") if idx < 0: extra = rigol.read_raw(64) head += extra idx = head.find(b"#") head = head[idx:idx + 2] ndigits = int(head[1:2]) length_bytes = rigol.read_raw(ndigits) nbytes = int(length_bytes) data = b"" while len(data) < nbytes: chunk = rigol.read_raw(nbytes - len(data)) if not chunk: break data += chunk try: rigol.read_raw(1) # trailing newline (may not be present) except Exception: pass return data def capture_trace(rigol, label: str) -> tuple[Path, float, float]: """ :STOP → read CH1 → :RUN. Returns (csv_path, vpp_mV, mean_V). """ rigol.write(":STOP") time.sleep(0.06) rigol.write(":WAVeform:SOURce CHANnel1") rigol.write(":WAVeform:FORMat BYTE") rigol.write(":WAVeform:MODE NORM") time.sleep(0.02) preamble = rigol.ask(":WAVeform:PREamble?").strip().split(",") # format,type,points,count,xinc,xorig,xref,yinc,yorig,yref xinc = float(preamble[4]); xorig = float(preamble[5]) yinc = float(preamble[7]); yorig = float(preamble[8]) yref = float(preamble[9]) rigol.write(":WAVeform:DATA?") raw = _read_ieee_block(rigol) codes = np.frombuffer(raw, dtype=np.uint8) volts = (codes.astype(np.float64) - yref - yorig) * yinc t = np.arange(len(volts)) * xinc + xorig DATA_DIR.mkdir(parents=True, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] csv_path = DATA_DIR / f"{ts}_{label}.csv" np.savetxt(csv_path, np.column_stack([t, volts]), delimiter=",", fmt="%.6e") rigol.write(":RUN") vpp_mV = float((volts.max() - volts.min()) * 1000) mean_V = float(volts.mean()) return csv_path, vpp_mV, mean_V # --------------------------------------------------------------------------- # SN65 state extraction # --------------------------------------------------------------------------- def pll_state(data: dict | None): if not isinstance(data, dict): return None regs = data.get("registers", {}) if not isinstance(regs, dict): return None csr_0a = regs.get("csr_0a") or {} return csr_0a.get("pll_lock") # --------------------------------------------------------------------------- # Non-blocking keys # --------------------------------------------------------------------------- class KeyReader: def __enter__(self): self.fd = sys.stdin.fileno() self.old = termios.tcgetattr(self.fd) tty.setcbreak(self.fd) return self def get_key(self) -> str | None: if select.select([sys.stdin], [], [], 0)[0]: return sys.stdin.read(1).lower() return None def __exit__(self, *_): termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> None: ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) ap.add_argument("--test", action="store_true", help="Take one immediate trace + exit (verifies Rigol comms)") args = ap.parse_args() DATA_DIR.mkdir(parents=True, exist_ok=True) sess = requests.Session() print(f"RAIL WATCH") print(f" sn65 endpoint: {SN65_EP}") print(f" Rigol IP: {RIGOL_IP}") print(f" Output dir: {DATA_DIR.relative_to(DATA_DIR.parent.parent)}") try: rigol = vxi11.Instrument(RIGOL_IP) rigol.timeout = RIGOL_TO_S idn = rigol.ask("*IDN?").strip() print(f" Rigol IDN: {idn}") except Exception as e: print(f" *** RIGOL CONNECTION FAILED: {e} ***") sys.exit(1) if args.test: print("\n--test: taking one capture now...") try: path, vpp, mean = capture_trace(rigol, "test") print(f" saved {path.name}") print(f" Vpp = {vpp:.1f} mV mean = {mean:.3f} V") except Exception as e: print(f" CAPTURE FAILED: {e}") sys.exit(0) def _shutdown(*_): try: rigol.write(":RUN") except Exception: pass print("\nstopped — Rigol restored to RUN") sys.exit(0) signal.signal(signal.SIGINT, _shutdown) signal.signal(signal.SIGTERM, _shutdown) print("\nkeys: g=baseline capture q=quit\n", flush=True) print(f" {'time':<14} {'event':<12} {'file':<40} {'Vpp':>7} {'mean':>7}") print(f" {'-'*14} {'-'*12} {'-'*40} {'-'*7} {'-'*7}") last_pll = None unlock_count = 0 baseline_count = 0 err_count = 0 with KeyReader() as keys: while True: t0 = time.time() try: r = sess.get(SN65_EP, timeout=HTTP_TO_S) r.raise_for_status() pll = pll_state(r.json()) err_count = 0 except Exception: pll = None err_count += 1 # Trigger Rigol on True → False (a real unlock). We ignore the # True → None case (transient I2C read failure) since it isn't # a PLL state change. if last_pll is True and pll is False: unlock_count += 1 iso = datetime.now().strftime("%H:%M:%S.%f")[:-3] try: path, vpp, mean = capture_trace( rigol, f"unlock_{unlock_count:04d}") print(f" {iso:<14} {'UNLOCK':<12} " f"{path.name:<40} {vpp:>5.1f}mV {mean:>5.3f}V", flush=True) except Exception as e: print(f" {iso:<14} UNLOCK CAPTURE FAILED: {e}", flush=True) last_pll = pll if pll is not None else last_pll # Manual baseline capture key = keys.get_key() if key == "g": baseline_count += 1 iso = datetime.now().strftime("%H:%M:%S.%f")[:-3] try: path, vpp, mean = capture_trace( rigol, f"baseline_{baseline_count:04d}") print(f" {iso:<14} {'BASELINE':<12} " f"{path.name:<40} {vpp:>5.1f}mV {mean:>5.3f}V", flush=True) except Exception as e: print(f" {iso:<14} BASELINE CAPTURE FAILED: {e}", flush=True) elif key == "q": _shutdown() # Pace elapsed = time.time() - t0 if elapsed < POLL_DT_S: time.sleep(POLL_DT_S - elapsed) if __name__ == "__main__": main()