#!/usr/bin/env python3 """ unlock_capture.py — capture 1V8 rail + MIPI CLK every time the SN65 reports a PLL unlock. Architecture ------------ - Polls /sn65_registers at ~50 Hz looking for pll_lock True→False transitions. - On each unlock, immediately: 1. :STOP the Rigol DS1202Z-E and read CH1 (1V8 rail). Rigol runs with a 120 ms window (10 ms/div × 12) so the rail trace brackets the ~20 ms unlock. 2. Read 100 segmented MIPI captures from the Keysight DSO80204B. Each segment is 20 µs of CLK+ and DAT0+. Spread across the recent ~seconds — *most segments will not land in the unlock instant*, but collectively they prove the MIPI signal stays clean around unlocks. 3. Restart both scopes for the next event. - Press `g` to capture a baseline pair manually (for clean comparison). - Press `c` to capture a catastrophic-event snapshot — for when you observe the black-screen failure (which doesn't manifest as a PLL unlock and so isn't automatically captured). - Press `q` to quit. Pairs nicely with `video_cycler.py --hold` (continuous video, no cycling) *or* `video_cycler.py` (with cycling) to provoke unlocks more often. Output layout: data/unlock_captures/{session_ts}/ unlock_0001_{ts}_rail.csv unlock_0001_{ts}_mipi_seg001_clk.csv ... seg100_dat.csv unlock_0001_{ts}_meta.json ... summary.csv """ from __future__ import annotations import argparse import csv import json import select import signal import sys import termios import time import tty from datetime import datetime from pathlib import Path import numpy as np import requests import vxi11 # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- DEVICE_BASE = "http://192.168.45.8:5000" SN65_EP = f"{DEVICE_BASE}/sn65_registers" RIGOL_IP = "192.168.45.5" KEYSIGHT_IP = "192.168.45.4" DATA_ROOT = Path(__file__).parent / "data" / "unlock_captures" POLL_DT_S = 0.020 # 50 Hz SN65 polling HTTP_TO_S = 0.2 RIGOL_TO_S = 10.0 KEYSIGHT_TO_S = 30.0 # Rigol CH1 settings — wider window catches a burst of flickers in one trace RIGOL_V_SCALE = 0.1 # V/div RIGOL_V_OFFSET = -1.8 # V RIGOL_TIMEBASE = 500e-3 # s/div → 6 s window RIGOL_PROBE = 10 # Keysight LP_DAT segmented capture KS_LP_SCALE = 1e-6 KS_LP_POINTS = 50_000 KS_LP_TRIG_OFFSET = 9e-6 KS_LP_V_SCALE = 0.2 KS_LP_V_OFFSET = 0.6 KS_LP_TRIG_LEVEL = 0.6 KS_SEGMENT_COUNT = 20 # ~2 s capture cycle (was 100 → ~10 s) KS_PROBE = 19.2 ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err", "cha_ecc_err", "cha_lp_err", "cha_crc_err") # --------------------------------------------------------------------------- # Non-blocking keys # --------------------------------------------------------------------------- class KeyReader: def __enter__(self): self.fd = sys.stdin.fileno() self.old = termios.tcgetattr(self.fd) tty.setcbreak(self.fd) return self def get_key(self) -> str | None: if select.select([sys.stdin], [], [], 0)[0]: return sys.stdin.read(1).lower() return None def __exit__(self, *_): termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old) # --------------------------------------------------------------------------- # SN65 extraction # --------------------------------------------------------------------------- def extract_state(data: dict | None) -> dict: regs = (data or {}).get("registers", {}) or {} csr_0a = regs.get("csr_0a") or {} csr_e5 = regs.get("csr_e5") or {} state = { "csr_0a": csr_0a.get("value"), "csr_e5": csr_e5.get("value"), "pll_lock": csr_0a.get("pll_lock"), "clk_det": csr_0a.get("clk_det"), } for k in ERROR_BITS: state[k] = csr_e5.get(k) return state # --------------------------------------------------------------------------- # Rigol I/O # --------------------------------------------------------------------------- def setup_rigol(rigol) -> None: rigol.write(":STOP"); time.sleep(0.2) rigol.write(":CHANnel1:DISPlay 1") rigol.write(":CHANnel1:COUPling DC") rigol.write(f":CHANnel1:PROBe {RIGOL_PROBE}") rigol.write(f":CHANnel1:SCALe {RIGOL_V_SCALE:.3f}") rigol.write(f":CHANnel1:OFFSet {RIGOL_V_OFFSET:.3f}") rigol.write(":CHANnel2:DISPlay 0") rigol.write(f":TIMebase:MAIN:SCALe {RIGOL_TIMEBASE:.3E}") rigol.write(":TRIGger:MODE EDGE") rigol.write(":TRIGger:EDGe:SOURce CHANnel1") rigol.write(":TRIGger:EDGe:SLOPe NEGative") rigol.write(":TRIGger:EDGe:LEVel 1.76") rigol.write(":TRIGger:SWEep AUTO") rigol.write(":ACQuire:MDEPth AUTO") time.sleep(0.3); rigol.write(":RUN"); time.sleep(0.2) def capture_rail(rigol, out_path: Path) -> tuple[float, float]: rigol.write(":STOP"); time.sleep(0.1) rigol.write(":WAVeform:SOURce CHANnel1") rigol.write(":WAVeform:FORMat ASC") rigol.write(":WAVeform:MODE NORM") time.sleep(0.05) pre = rigol.ask(":WAVeform:PREamble?").strip().split(",") xinc = float(pre[4]); xorig = float(pre[5]) raw = rigol.ask(":WAVeform:DATA?").strip() if raw.startswith("#"): ndig = int(raw[1]) raw = raw[2 + ndig:] vals = [float(v) for v in raw.split(",") if v.strip()] if not vals: rigol.write(":RUN") raise RuntimeError("Rigol returned no samples") volts = np.asarray(vals, dtype=np.float64) t = np.arange(len(volts)) * xinc + xorig np.savetxt(out_path, np.column_stack([t, volts]), delimiter=",", fmt="%.6e") rigol.write(":RUN") return float((volts.max() - volts.min()) * 1000), float(volts.mean()) # --------------------------------------------------------------------------- # Keysight I/O (mirrors trial_runner.py) # --------------------------------------------------------------------------- def _ks_drain(scope): for _ in range(20): try: r = scope.ask(":SYSTem:ERRor?").strip() except Exception: return if not r or r.startswith(("0,", "+0,")) or r == "0": return def setup_keysight(scope) -> None: for c in [ "*RST", ":RUN", ":STOP", "*CLS", ":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", f":CHANnel1:PROBe {KS_PROBE}", ":CHANnel1:LABel 'CLK+'", ":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", f":CHANnel2:PROBe {KS_PROBE}", ":CHANnel2:LABel 'CLK-'", ":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50", f":CHANnel3:PROBe {KS_PROBE}", ":CHANnel3:LABel 'DAT0+'", ":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50", f":CHANnel4:PROBe {KS_PROBE}", ":CHANnel4:LABel 'DAT0-'", ":TIMebase:REFerence CENTer", ":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON", ]: scope.write(c); time.sleep(0.04) _ks_drain(scope) for ch in (1, 2, 3, 4): scope.write(f":CHANnel{ch}:SCALe {KS_LP_V_SCALE:.3f}") scope.write(f":CHANnel{ch}:OFFSet {KS_LP_V_OFFSET:.3f}") scope.write(":TRIGger:MODE EDGE") scope.write(":TRIGger:EDGE:SOURce CHANnel3") scope.write(":TRIGger:EDGE:SLOPe NEGative") scope.write(f":TRIGger:EDGE:LEVel {KS_LP_TRIG_LEVEL:.3f}") scope.write(":TRIGger:SWEep NORMal") scope.write(f":TIMebase:SCALe {KS_LP_SCALE:.3E}") scope.write(f":ACQuire:POINts {KS_LP_POINTS}") scope.write(f":TIMebase:POSition {KS_LP_TRIG_OFFSET:.2E}") scope.write(":ACQuire:MODE SEGMented") scope.write(f":ACQuire:SEGMented:COUNt {KS_SEGMENT_COUNT}") time.sleep(0.4) _ks_drain(scope) def _ks_read_block(scope) -> bytes: head = scope.read_raw(2) if not head.startswith(b"#"): idx = head.find(b"#") if idx < 0: extra = scope.read_raw(64) head += extra idx = head.find(b"#") head = head[idx:idx + 2] ndigits = int(head[1:2]) length_bytes = scope.read_raw(ndigits) nbytes = int(length_bytes) data = b"" while len(data) < nbytes: chunk = scope.read_raw(nbytes - len(data)) if not chunk: break data += chunk try: scope.read_raw(1) except Exception: pass return data def keysight_capture(scope, out_dir: Path, base: str) -> int: """:DIGitize → read all segments → save CSVs. Returns segments written.""" prev = scope.timeout try: scope.timeout = KEYSIGHT_TO_S scope.write(":DIGitize") if scope.ask("*OPC?").strip() != "1": return 0 except Exception as e: print(f" keysight arm/wait failed: {e}") return 0 finally: scope.timeout = prev n_written = 0 for chan_id, label in [(1, "clk"), (3, "dat")]: scope.write(f":WAVeform:SOURce CHANnel{chan_id}") scope.write(":WAVeform:FORMat WORD") scope.write(":WAVeform:BYTeorder LSBFirst") x_inc = float(scope.ask(":WAVeform:XINCrement?")) x_org = float(scope.ask(":WAVeform:XORigin?")) y_inc = float(scope.ask(":WAVeform:YINCrement?")) y_org = float(scope.ask(":WAVeform:YORigin?")) for i in range(1, KS_SEGMENT_COUNT + 1): scope.write(f":ACQuire:SEGMented:INDex {i}") scope.write(":WAVeform:DATA?") raw = _ks_read_block(scope) codes = np.frombuffer(raw, dtype=" None: """One unlock or baseline capture: Rigol + Keysight + meta JSON.""" ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] iso = datetime.fromtimestamp(time.time()).strftime("%H:%M:%S.%f")[:-3] base = f"{event_label}_{event_num:04d}_{ts}" # 1. Rigol — fast (~100-300 ms) rail_path = session_dir / f"{base}_rail.csv" vpp_mV = mean_V = None try: vpp_mV, mean_V = capture_rail(rigol, rail_path) except Exception as e: print(f" rail capture FAILED: {e}", flush=True) rail_path = None # 2. Keysight — slow (~5-15 s for 100 segs) n_segs = 0 if scope is not None: try: n_segs = keysight_capture(scope, session_dir, f"{base}_mipi") except Exception as e: print(f" keysight capture FAILED: {e}", flush=True) # 3. Meta meta = { "event": event_label, "event_num": event_num, "ts": ts, "iso": iso, "last_pll_state": last_state, "rail_csv": rail_path.name if rail_path else None, "rail_vpp_mV": vpp_mV, "rail_mean_V": mean_V, "n_mipi_segments": n_segs, "mipi_basename": f"{base}_mipi" if n_segs else None, } meta_path = session_dir / f"{base}_meta.json" meta_path.write_text(json.dumps(meta, indent=2, default=str)) rail_str = (f"Vpp={vpp_mV:.1f}mV mean={mean_V:.3f}V" if vpp_mV is not None else "RAIL FAILED") print(f" [{iso}] {event_label.upper():<8} #{event_num:04d} " f"{rail_str} MIPI={n_segs}segs", flush=True) summary_writer.writerow([event_num, ts, iso, event_label, f"{vpp_mV:.1f}" if vpp_mV is not None else "", f"{mean_V:.3f}" if mean_V is not None else "", n_segs, base]) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> None: ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) ap.add_argument("--no-keysight", action="store_true", help="Rigol only (skip MIPI capture per event)") args = ap.parse_args() session_ts = datetime.now().strftime("%Y%m%d_%H%M%S") session_dir = DATA_ROOT / session_ts session_dir.mkdir(parents=True, exist_ok=True) print(f"UNLOCK CAPTURE — session {session_ts}") print(f" output: {session_dir.relative_to(DATA_ROOT.parent.parent)}") # Connect SN65 endpoint sess = requests.Session() try: sess.get(SN65_EP, timeout=2.0).raise_for_status() print(f" SN65: reachable") except Exception as e: print(f" *** SN65 endpoint failed: {e} ***") sys.exit(1) # Connect + configure Rigol rigol = vxi11.Instrument(RIGOL_IP) rigol.timeout = RIGOL_TO_S try: print(f" Rigol: {rigol.ask('*IDN?').strip()}") setup_rigol(rigol) except Exception as e: print(f" *** Rigol failed: {e} ***") sys.exit(1) # Connect + configure Keysight scope = None if not args.no_keysight: scope = vxi11.Instrument(KEYSIGHT_IP) scope.timeout = KEYSIGHT_TO_S try: print(f" Keysight: {scope.ask('*IDN?').strip()}") setup_keysight(scope) except Exception as e: print(f" Keysight failed ({e}) — continuing without MIPI capture") scope = None summary_path = session_dir / "summary.csv" sf = open(summary_path, "w", newline="") sw = csv.writer(sf) sw.writerow(["event_num", "ts", "iso", "event_label", "rail_vpp_mV", "rail_mean_V", "n_mipi_segs", "basename"]) sf.flush() def _shutdown(*_): print("\nshutting down") try: rigol.write(":RUN") except Exception: pass try: sf.close() except Exception: pass sys.exit(0) signal.signal(signal.SIGINT, _shutdown) signal.signal(signal.SIGTERM, _shutdown) print("\n Capturing 1V8 rail + MIPI segments on every PLL unlock.") print(" Run video_cycler.py in another terminal to provoke unlocks.") print(" keys: g=baseline c=catastrophic-event observed q=quit\n") print(f" {'time':<14} {'event':<15} {'rail':<28} {'mipi':<10}") print(f" {'-'*14} {'-'*15} {'-'*28} {'-'*10}") last_pll = None last_state = {} unlock_n = 0 baseline_n = 0 catastrophic_n = 0 err_count = 0 with KeyReader() as keys: while True: t0 = time.time() pll = None try: r = sess.get(SN65_EP, timeout=HTTP_TO_S) r.raise_for_status() last_state = extract_state(r.json()) pll = last_state["pll_lock"] err_count = 0 except Exception: err_count += 1 if last_pll is True and pll is False: unlock_n += 1 handle_event("unlock", unlock_n, session_dir, rigol, scope, sw, last_state) sf.flush() if pll is not None: last_pll = pll key = keys.get_key() if key == "g": baseline_n += 1 handle_event("baseline", baseline_n, session_dir, rigol, scope, sw, last_state) sf.flush() elif key == "c": catastrophic_n += 1 print(f"\n *** CATASTROPHIC EVENT OBSERVED — " f"capturing scopes ***", flush=True) handle_event("catastrophic", catastrophic_n, session_dir, rigol, scope, sw, last_state) sf.flush() elif key == "q": _shutdown() elapsed = time.time() - t0 if elapsed < POLL_DT_S: time.sleep(POLL_DT_S - elapsed) if __name__ == "__main__": main()