456 lines
16 KiB
Python
456 lines
16 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
unlock_capture.py — capture 1V8 rail + MIPI CLK every time the SN65 reports
|
||
a PLL unlock.
|
||
|
||
Architecture
|
||
------------
|
||
- Polls /sn65_registers at ~50 Hz looking for pll_lock True→False transitions.
|
||
- On each unlock, immediately:
|
||
1. :STOP the Rigol DS1202Z-E and read CH1 (1V8 rail).
|
||
Rigol runs with a 6 s window (500 ms/div × 12) so the rail trace
|
||
brackets the ~20 ms unlock.
|
||
2. Read 20 segmented MIPI captures (KS_SEGMENT_COUNT) from the Keysight DSO80204B.
|
||
Each segment is 20 µs of CLK+ and DAT0+. Spread across the recent
|
||
~seconds — *most segments will not land in the unlock instant*, but
|
||
collectively they prove the MIPI signal stays clean around unlocks.
|
||
3. Restart both scopes for the next event.
|
||
- Press `g` to capture a baseline pair manually (for clean comparison).
|
||
- Press `c` to capture a catastrophic-event snapshot — for when you observe
|
||
the black-screen failure (which doesn't manifest as a PLL unlock and so
|
||
isn't automatically captured).
|
||
- Press `q` to quit.
|
||
|
||
Pairs nicely with `video_cycler.py --hold` (continuous video, no cycling)
|
||
*or* `video_cycler.py` (with cycling) to provoke unlocks more often.
|
||
|
||
Output layout:
|
||
data/unlock_captures/{session_ts}/
|
||
unlock_0001_{ts}_rail.csv
|
||
unlock_0001_{ts}_mipi_seg001_clk.csv ... seg020_dat.csv
|
||
unlock_0001_{ts}_meta.json
|
||
...
|
||
summary.csv
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import csv
|
||
import json
|
||
import select
|
||
import signal
|
||
import sys
|
||
import termios
|
||
import time
|
||
import tty
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import numpy as np
|
||
import requests
|
||
import vxi11
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config
|
||
# ---------------------------------------------------------------------------
|
||
# Network endpoints: the device's HTTP register API and the two scopes.
DEVICE_BASE = "http://192.168.45.8:5000"
SN65_EP = DEVICE_BASE + "/sn65_registers"
RIGOL_IP = "192.168.45.5"
KEYSIGHT_IP = "192.168.45.4"

# Every session gets its own timestamped sub-directory under this root.
DATA_ROOT = Path(__file__).parent.joinpath("data", "unlock_captures")

# Poll cadence and I/O timeouts, all in seconds.
POLL_DT_S = 1 / 50            # 50 Hz SN65 polling
HTTP_TO_S = 0.2
RIGOL_TO_S = 10.0
KEYSIGHT_TO_S = 30.0

# Rigol CH1 settings — wider window catches a burst of flickers in one trace
RIGOL_V_SCALE = 0.1           # V/div
RIGOL_V_OFFSET = -1.8         # V
RIGOL_TIMEBASE = 0.5          # s/div → 6 s window
RIGOL_PROBE = 10              # value sent with :CHANnel1:PROBe

# Keysight LP_DAT segmented capture
KS_LP_SCALE = 1e-6            # sent with :TIMebase:SCALe
KS_LP_POINTS = 50000          # sent with :ACQuire:POINts
KS_LP_TRIG_OFFSET = 9e-6      # sent with :TIMebase:POSition
KS_LP_V_SCALE = 0.2           # sent with :CHANnel<n>:SCALe
KS_LP_V_OFFSET = 0.6          # sent with :CHANnel<n>:OFFSet
KS_LP_TRIG_LEVEL = 0.6        # sent with :TRIGger:EDGE:LEVel
KS_SEGMENT_COUNT = 20         # ~2 s capture cycle (was 100 → ~10 s)
KS_PROBE = 19.2               # sent with :CHANnel<n>:PROBe

# Keys copied out of the csr_e5 register dict by extract_state().
ERROR_BITS = (
    "pll_unlock",
    "cha_sot_bit_err",
    "cha_llp_err",
    "cha_ecc_err",
    "cha_lp_err",
    "cha_crc_err",
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Non-blocking keys
|
||
# ---------------------------------------------------------------------------
|
||
class KeyReader:
    """Context manager for polling single key presses without blocking.

    On entry stdin is switched to cbreak mode (keys arrive without Enter);
    on exit the terminal attributes saved at entry are restored.
    """

    def __enter__(self):
        self.fd = sys.stdin.fileno()
        self.old = termios.tcgetattr(self.fd)
        tty.setcbreak(self.fd)
        return self

    def get_key(self) -> str | None:
        """Return one pending key, lower-cased, or None if nothing is waiting.

        The byte is read straight from the file descriptor that ``select``
        polls.  Reading through ``sys.stdin`` instead would let Python's
        buffered layer swallow every pending byte while handing back only the
        first; the remainder (for example a ``q`` typed during a long capture)
        would then be invisible to ``select`` until another key was pressed.
        """
        import os  # local import keeps this block self-contained

        fd = getattr(self, "fd", None)
        if fd is None:
            # Used outside a ``with`` block: fall back to stdin directly.
            fd = sys.stdin.fileno()
        if not select.select([fd], [], [], 0)[0]:
            return None
        # latin-1 maps every byte to a character, so decoding cannot fail;
        # at EOF os.read returns b"" and this yields "".
        return os.read(fd, 1).decode("latin-1").lower()

    def __exit__(self, *_):
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# SN65 extraction
|
||
# ---------------------------------------------------------------------------
|
||
def extract_state(data: dict | None) -> dict:
    """Flatten an /sn65_registers response into a single-level state dict.

    Missing or falsy levels (``data``, ``registers``, ``csr_0a``, ``csr_e5``)
    are treated as empty dicts, so every key of the result is present and
    simply maps to ``None`` when the response did not carry it.

    Returns a dict holding the raw ``value`` of csr_0a and csr_e5, the
    ``pll_lock`` / ``clk_det`` fields of csr_0a, and one entry per name in
    ERROR_BITS taken from csr_e5.
    """
    payload = data if data else {}
    registers = payload.get("registers") or {}
    lock_reg = registers.get("csr_0a") or {}
    err_reg = registers.get("csr_e5") or {}

    snapshot = {
        "csr_0a": lock_reg.get("value"),
        "csr_e5": err_reg.get("value"),
        "pll_lock": lock_reg.get("pll_lock"),
        "clk_det": lock_reg.get("clk_det"),
    }
    snapshot.update({bit: err_reg.get(bit) for bit in ERROR_BITS})
    return snapshot
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Rigol I/O
|
||
# ---------------------------------------------------------------------------
|
||
def setup_rigol(rigol) -> None:
    """Configure the Rigol for the rail capture and leave it running.

    The scope is stopped first, then CH1 (DC coupled; probe, scale and offset
    from the RIGOL_* constants), the main timebase and a falling-edge trigger
    on CH1 in AUTO sweep are programmed, CH2 is hidden, and acquisition is
    restarted.  The short sleeps around :STOP and :RUN are kept from the
    original sequence.
    """
    rigol.write(":STOP")
    time.sleep(0.2)

    config = (
        ":CHANnel1:DISPlay 1",
        ":CHANnel1:COUPling DC",
        f":CHANnel1:PROBe {RIGOL_PROBE}",
        f":CHANnel1:SCALe {RIGOL_V_SCALE:.3f}",
        f":CHANnel1:OFFSet {RIGOL_V_OFFSET:.3f}",
        ":CHANnel2:DISPlay 0",
        f":TIMebase:MAIN:SCALe {RIGOL_TIMEBASE:.3E}",
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGe:SOURce CHANnel1",
        ":TRIGger:EDGe:SLOPe NEGative",
        ":TRIGger:EDGe:LEVel 1.76",
        ":TRIGger:SWEep AUTO",
        ":ACQuire:MDEPth AUTO",
    )
    for command in config:
        rigol.write(command)

    time.sleep(0.3)
    rigol.write(":RUN")
    time.sleep(0.2)
|
||
|
||
|
||
def capture_rail(rigol, out_path: Path) -> tuple[float, float]:
    """Stop the Rigol, dump CH1 to *out_path* as CSV, and restart it.

    The waveform is requested in ASCII / NORM mode.  The time axis is rebuilt
    from fields 4 and 5 of the preamble (x-increment and x-origin) and written
    next to the voltages as ``t,volts`` rows.

    Args:
        rigol: instrument handle providing ``write(cmd)`` and ``ask(cmd)``.
        out_path: destination CSV file.

    Returns:
        ``(peak_to_peak_mV, mean_V)`` of the captured trace.

    Raises:
        RuntimeError: the scope returned no samples.
        Exception: whatever the instrument I/O, parsing or file write raises.

    The scope is put back into :RUN on every exit path.  Without that, a
    failed query would leave it stopped and every later capture would re-read
    the same frozen trace.
    """
    rigol.write(":STOP")
    time.sleep(0.1)
    try:
        rigol.write(":WAVeform:SOURce CHANnel1")
        rigol.write(":WAVeform:FORMat ASC")
        rigol.write(":WAVeform:MODE NORM")
        time.sleep(0.05)

        pre = rigol.ask(":WAVeform:PREamble?").strip().split(",")
        xinc = float(pre[4])
        xorig = float(pre[5])

        raw = rigol.ask(":WAVeform:DATA?").strip()
        if raw.startswith("#"):
            # Block header "#N<N length digits>": skip it to reach the samples.
            ndig = int(raw[1])
            raw = raw[2 + ndig:]

        vals = [float(v) for v in raw.split(",") if v.strip()]
        if not vals:
            raise RuntimeError("Rigol returned no samples")

        volts = np.asarray(vals, dtype=np.float64)
        t = np.arange(len(volts)) * xinc + xorig
        np.savetxt(out_path, np.column_stack([t, volts]),
                   delimiter=",", fmt="%.6e")
        return float((volts.max() - volts.min()) * 1000), float(volts.mean())
    finally:
        # Always resume acquisition so the next event sees fresh data.
        rigol.write(":RUN")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Keysight I/O (mirrors trial_runner.py)
|
||
# ---------------------------------------------------------------------------
|
||
def _ks_drain(scope):
|
||
for _ in range(20):
|
||
try:
|
||
r = scope.ask(":SYSTem:ERRor?").strip()
|
||
except Exception:
|
||
return
|
||
if not r or r.startswith(("0,", "+0,")) or r == "0":
|
||
return
|
||
|
||
|
||
def setup_keysight(scope) -> None:
    """Reset the Keysight and program it for segmented captures.

    Phase 1 (one command every 40 ms): reset, then enable, 50-ohm DC couple,
    set the probe value and label on all four channels, centre the timebase
    reference and select real-time acquisition with interpolation.
    Phase 2: per-channel scale/offset, falling-edge NORMal trigger on CH3,
    timebase, record length and position, then segmented mode with
    KS_SEGMENT_COUNT segments.  The error queue is drained after each phase.
    """
    channel_labels = {1: "CLK+", 2: "CLK-", 3: "DAT0+", 4: "DAT0-"}

    paced = ["*RST", ":RUN", ":STOP", "*CLS"]
    for ch, text in channel_labels.items():
        paced += [
            f":CHANnel{ch}:DISPlay ON",
            f":CHANnel{ch}:INPut DC50",
            f":CHANnel{ch}:PROBe {KS_PROBE}",
            f":CHANnel{ch}:LABel '{text}'",
        ]
    paced += [
        ":TIMebase:REFerence CENTer",
        ":ACQuire:MODE RTIMe",
        ":ACQuire:INTerpolate ON",
    ]
    for command in paced:
        scope.write(command)
        time.sleep(0.04)
    _ks_drain(scope)

    for ch in channel_labels:
        scope.write(f":CHANnel{ch}:SCALe {KS_LP_V_SCALE:.3f}")
        scope.write(f":CHANnel{ch}:OFFSet {KS_LP_V_OFFSET:.3f}")

    unpaced = (
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGE:SOURce CHANnel3",
        ":TRIGger:EDGE:SLOPe NEGative",
        f":TRIGger:EDGE:LEVel {KS_LP_TRIG_LEVEL:.3f}",
        ":TRIGger:SWEep NORMal",
        f":TIMebase:SCALe {KS_LP_SCALE:.3E}",
        f":ACQuire:POINts {KS_LP_POINTS}",
        f":TIMebase:POSition {KS_LP_TRIG_OFFSET:.2E}",
        ":ACQuire:MODE SEGMented",
        f":ACQuire:SEGMented:COUNt {KS_SEGMENT_COUNT}",
    )
    for command in unpaced:
        scope.write(command)

    time.sleep(0.4)
    _ks_drain(scope)
|
||
|
||
|
||
def _ks_read_block(scope) -> bytes:
|
||
head = scope.read_raw(2)
|
||
if not head.startswith(b"#"):
|
||
idx = head.find(b"#")
|
||
if idx < 0:
|
||
extra = scope.read_raw(64)
|
||
head += extra
|
||
idx = head.find(b"#")
|
||
head = head[idx:idx + 2]
|
||
ndigits = int(head[1:2])
|
||
length_bytes = scope.read_raw(ndigits)
|
||
nbytes = int(length_bytes)
|
||
data = b""
|
||
while len(data) < nbytes:
|
||
chunk = scope.read_raw(nbytes - len(data))
|
||
if not chunk:
|
||
break
|
||
data += chunk
|
||
try:
|
||
scope.read_raw(1)
|
||
except Exception:
|
||
pass
|
||
return data
|
||
|
||
|
||
def keysight_capture(scope, out_dir: Path, base: str) -> int:
    """Run :DIGitize, then save every segment of CH1 and CH3 as CSV files.

    The arm/wait phase runs with the timeout set to KEYSIGHT_TO_S; the
    previous timeout is restored before any data is read.  For each of the
    two channels the x/y scaling is queried once, then each segment index
    from 1 to KS_SEGMENT_COUNT is selected, read as little-endian 16-bit
    words, scaled, and written to ``{base}_segNNN_{clk|dat}.csv`` as
    ``t,volts`` rows.

    Returns:
        Number of CH1 ("clk") segments written, or 0 when arming failed or
        ``*OPC?`` did not answer ``1``.
    """
    saved_timeout = scope.timeout
    try:
        scope.timeout = KEYSIGHT_TO_S
        scope.write(":DIGitize")
        opc_reply = scope.ask("*OPC?").strip()
        if opc_reply != "1":
            return 0
    except Exception as e:
        print(f" keysight arm/wait failed: {e}")
        return 0
    finally:
        scope.timeout = saved_timeout

    scaling_queries = (
        ":WAVeform:XINCrement?",
        ":WAVeform:XORigin?",
        ":WAVeform:YINCrement?",
        ":WAVeform:YORigin?",
    )

    clk_segments = 0
    for chan_id, label in ((1, "clk"), (3, "dat")):
        scope.write(f":WAVeform:SOURce CHANnel{chan_id}")
        scope.write(":WAVeform:FORMat WORD")
        scope.write(":WAVeform:BYTeorder LSBFirst")
        # The generator is consumed left to right, so the queries are sent
        # in the listed order.
        x_inc, x_org, y_inc, y_org = (
            float(scope.ask(query)) for query in scaling_queries
        )

        for seg in range(1, KS_SEGMENT_COUNT + 1):
            scope.write(f":ACQuire:SEGMented:INDex {seg}")
            scope.write(":WAVeform:DATA?")
            block = _ks_read_block(scope)

            codes = np.frombuffer(block, dtype="<i2")
            volts = codes.astype(np.float64) * y_inc + y_org
            t = np.arange(len(volts)) * x_inc + x_org

            csv_path = out_dir / f"{base}_seg{seg:03d}_{label}.csv"
            np.savetxt(csv_path, np.column_stack([t, volts]),
                       delimiter=",", fmt="%.6e")

            # Only the clock channel is counted, so the result is a segment
            # count rather than a file count.
            if label == "clk":
                clk_segments += 1
    return clk_segments
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Per-event capture handler
|
||
# ---------------------------------------------------------------------------
|
||
def handle_event(event_label: str, event_num: int, session_dir: Path,
                 rigol, scope, summary_writer, last_state: dict) -> None:
    """Capture one event: Rigol rail trace, Keysight segments, meta JSON.

    Args:
        event_label: event kind, used as the file-name prefix and in the
            printed line (the caller passes "unlock", "baseline" or
            "catastrophic").
        event_num: per-label sequence number, zero-padded to four digits.
        session_dir: directory that receives all files for this session.
        rigol: Rigol instrument handle passed to ``capture_rail``.
        scope: Keysight instrument handle, or ``None`` to skip MIPI capture.
        summary_writer: ``csv.writer``-like object; one row is appended.
        last_state: most recent SN65 state dict, stored in the meta file.

    A failure of either scope capture is reported on stdout and recorded as
    missing data; it never aborts the event.
    """
    # One clock read feeds both stamps so the file-name timestamp and the
    # printed time can never straddle a millisecond boundary.
    now = datetime.now()
    ts = now.strftime("%Y%m%d_%H%M%S_%f")[:-3]
    iso = now.strftime("%H:%M:%S.%f")[:-3]
    base = f"{event_label}_{event_num:04d}_{ts}"

    # 1. Rigol rail trace first
    rail_path = session_dir / f"{base}_rail.csv"
    vpp_mV = mean_V = None
    try:
        vpp_mV, mean_V = capture_rail(rigol, rail_path)
    except Exception as e:
        print(f" rail capture FAILED: {e}", flush=True)
        rail_path = None

    # 2. Keysight segmented capture (the slower of the two)
    n_segs = 0
    if scope is not None:
        try:
            n_segs = keysight_capture(scope, session_dir, f"{base}_mipi")
        except Exception as e:
            print(f" keysight capture FAILED: {e}", flush=True)

    # 3. Meta
    meta = {
        "event": event_label,
        "event_num": event_num,
        "ts": ts,
        "iso": iso,
        "last_pll_state": last_state,
        "rail_csv": rail_path.name if rail_path else None,
        "rail_vpp_mV": vpp_mV,
        "rail_mean_V": mean_V,
        "n_mipi_segments": n_segs,
        "mipi_basename": f"{base}_mipi" if n_segs else None,
    }
    meta_path = session_dir / f"{base}_meta.json"
    meta_path.write_text(json.dumps(meta, indent=2, default=str))

    rail_str = (f"Vpp={vpp_mV:.1f}mV mean={mean_V:.3f}V"
                if vpp_mV is not None else "RAIL FAILED")
    print(f" [{iso}] {event_label.upper():<8} #{event_num:04d} "
          f"{rail_str} MIPI={n_segs}segs", flush=True)

    summary_writer.writerow([event_num, ts, iso, event_label,
                             f"{vpp_mV:.1f}" if vpp_mV is not None else "",
                             f"{mean_V:.3f}" if mean_V is not None else "",
                             n_segs, base])
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
def main() -> None:
    """Entry point: connect everything, then poll the SN65 until quit.

    Startup creates the session directory, checks the SN65 endpoint, connects
    and configures the Rigol (both are fatal on failure) and, unless
    ``--no-keysight`` is given, the Keysight (a failure only disables MIPI
    capture).  The loop then polls the SN65 every POLL_DT_S seconds and calls
    ``handle_event`` on each pll_lock True→False transition and on the
    ``g`` / ``c`` keys; ``q``, SIGINT or SIGTERM shut down.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--no-keysight", action="store_true",
                    help="Rigol only (skip MIPI capture per event)")
    args = ap.parse_args()

    session_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    session_dir = DATA_ROOT / session_ts
    session_dir.mkdir(parents=True, exist_ok=True)

    print(f"UNLOCK CAPTURE — session {session_ts}")
    print(f" output: {session_dir.relative_to(DATA_ROOT.parent.parent)}")

    # Connect SN65 endpoint
    sess = requests.Session()
    try:
        sess.get(SN65_EP, timeout=2.0).raise_for_status()
        print(" SN65: reachable")
    except Exception as e:
        print(f" *** SN65 endpoint failed: {e} ***")
        sys.exit(1)

    # Connect + configure Rigol
    rigol = vxi11.Instrument(RIGOL_IP)
    rigol.timeout = RIGOL_TO_S
    try:
        print(f" Rigol: {rigol.ask('*IDN?').strip()}")
        setup_rigol(rigol)
    except Exception as e:
        print(f" *** Rigol failed: {e} ***")
        sys.exit(1)

    # Connect + configure Keysight
    scope = None
    if not args.no_keysight:
        scope = vxi11.Instrument(KEYSIGHT_IP)
        scope.timeout = KEYSIGHT_TO_S
        try:
            print(f" Keysight: {scope.ask('*IDN?').strip()}")
            setup_keysight(scope)
        except Exception as e:
            print(f" Keysight failed ({e}) — continuing without MIPI capture")
            scope = None

    summary_path = session_dir / "summary.csv"
    sf = open(summary_path, "w", newline="")
    sw = csv.writer(sf)
    sw.writerow(["event_num", "ts", "iso", "event_label",
                 "rail_vpp_mV", "rail_mean_V", "n_mipi_segs", "basename"])
    sf.flush()

    def _shutdown(*_):
        # Best-effort cleanup; sys.exit unwinds through the KeyReader
        # context manager, which restores the terminal.
        print("\nshutting down")
        try:
            rigol.write(":RUN")
        except Exception:
            pass
        try:
            sf.close()
        except Exception:
            pass
        sys.exit(0)

    signal.signal(signal.SIGINT, _shutdown)
    signal.signal(signal.SIGTERM, _shutdown)

    print("\n Capturing 1V8 rail + MIPI segments on every PLL unlock.")
    print(" Run video_cycler.py in another terminal to provoke unlocks.")
    print(" keys: g=baseline c=catastrophic-event observed q=quit\n")
    print(f" {'time':<14} {'event':<15} {'rail':<28} {'mipi':<10}")
    print(f" {'-'*14} {'-'*15} {'-'*28} {'-'*10}")

    last_pll = None
    last_state = {}
    unlock_n = 0
    baseline_n = 0
    catastrophic_n = 0
    err_count = 0
    # Consecutive failed polls before the operator is warned.  While polling
    # fails no unlock can be detected, which must not look like a quiet run.
    warn_after = 25

    with KeyReader() as keys:
        while True:
            t0 = time.time()
            pll = None
            try:
                r = sess.get(SN65_EP, timeout=HTTP_TO_S)
                r.raise_for_status()
                last_state = extract_state(r.json())
                pll = last_state["pll_lock"]
            except Exception as e:
                err_count += 1
                if err_count == warn_after:
                    print(f" *** SN65 polling failing "
                          f"({err_count} consecutive errors, last: {e}) — "
                          f"unlocks cannot be detected ***", flush=True)
            else:
                if err_count >= warn_after:
                    print(f" SN65 polling recovered after "
                          f"{err_count} failed polls", flush=True)
                err_count = 0

            if last_pll is True and pll is False:
                unlock_n += 1
                handle_event("unlock", unlock_n, session_dir,
                             rigol, scope, sw, last_state)
                sf.flush()

            # A failed poll (pll is None) must not erase the last known state.
            if pll is not None:
                last_pll = pll

            key = keys.get_key()
            if key == "g":
                baseline_n += 1
                handle_event("baseline", baseline_n, session_dir,
                             rigol, scope, sw, last_state)
                sf.flush()
            elif key == "c":
                catastrophic_n += 1
                print(f"\n *** CATASTROPHIC EVENT OBSERVED — "
                      f"capturing scopes ***", flush=True)
                handle_event("catastrophic", catastrophic_n, session_dir,
                             rigol, scope, sw, last_state)
                sf.flush()
            elif key == "q":
                _shutdown()

            # Sleep only for what is left of the poll period.
            elapsed = time.time() - t0
            if elapsed < POLL_DT_S:
                time.sleep(POLL_DT_S - elapsed)
|
||
|
||
|
||
# Run the capture loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|