Files
MiPi_TEST/unlock_capture.py
2026-05-15 16:32:15 +01:00

456 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
unlock_capture.py — capture 1V8 rail + MIPI CLK every time the SN65 reports
a PLL unlock.
Architecture
------------
- Polls /sn65_registers at ~50 Hz looking for pll_lock True→False transitions.
- On each unlock, immediately:
1. :STOP the Rigol DS1202Z-E and read CH1 (1V8 rail).
Rigol runs with a 120 ms window (10 ms/div × 12) so the rail trace
brackets the ~20 ms unlock.
2. Read 100 segmented MIPI captures from the Keysight DSO80204B.
Each segment is 20 µs of CLK+ and DAT0+. Spread across the recent
~seconds — *most segments will not land in the unlock instant*, but
collectively they prove the MIPI signal stays clean around unlocks.
3. Restart both scopes for the next event.
- Press `g` to capture a baseline pair manually (for clean comparison).
- Press `c` to capture a catastrophic-event snapshot — for when you observe
the black-screen failure (which doesn't manifest as a PLL unlock and so
isn't automatically captured).
- Press `q` to quit.
Pairs nicely with `video_cycler.py --hold` (continuous video, no cycling)
*or* `video_cycler.py` (with cycling) to provoke unlocks more often.
Output layout:
data/unlock_captures/{session_ts}/
unlock_0001_{ts}_rail.csv
unlock_0001_{ts}_mipi_seg001_clk.csv ... seg100_dat.csv
unlock_0001_{ts}_meta.json
...
summary.csv
"""
from __future__ import annotations
import argparse
import csv
import json
import select
import signal
import sys
import termios
import time
import tty
from datetime import datetime
from pathlib import Path
import numpy as np
import requests
import vxi11
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
DEVICE_BASE = "http://192.168.45.8:5000"
SN65_EP = f"{DEVICE_BASE}/sn65_registers"
RIGOL_IP = "192.168.45.5"
KEYSIGHT_IP = "192.168.45.4"
DATA_ROOT = Path(__file__).parent / "data" / "unlock_captures"
POLL_DT_S = 0.020 # 50 Hz SN65 polling
HTTP_TO_S = 0.2
RIGOL_TO_S = 10.0
KEYSIGHT_TO_S = 30.0
# Rigol CH1 settings — wider window catches a burst of flickers in one trace
RIGOL_V_SCALE = 0.1 # V/div
RIGOL_V_OFFSET = -1.8 # V
RIGOL_TIMEBASE = 500e-3 # s/div → 6 s window
RIGOL_PROBE = 10
# Keysight LP_DAT segmented capture
KS_LP_SCALE = 1e-6
KS_LP_POINTS = 50_000
KS_LP_TRIG_OFFSET = 9e-6
KS_LP_V_SCALE = 0.2
KS_LP_V_OFFSET = 0.6
KS_LP_TRIG_LEVEL = 0.6
KS_SEGMENT_COUNT = 20 # ~2 s capture cycle (was 100 → ~10 s)
KS_PROBE = 19.2
ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err",
"cha_ecc_err", "cha_lp_err", "cha_crc_err")
# ---------------------------------------------------------------------------
# Non-blocking keys
# ---------------------------------------------------------------------------
class KeyReader:
def __enter__(self):
self.fd = sys.stdin.fileno()
self.old = termios.tcgetattr(self.fd)
tty.setcbreak(self.fd)
return self
def get_key(self) -> str | None:
if select.select([sys.stdin], [], [], 0)[0]:
return sys.stdin.read(1).lower()
return None
def __exit__(self, *_):
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
# ---------------------------------------------------------------------------
# SN65 extraction
# ---------------------------------------------------------------------------
def extract_state(data: dict | None) -> dict:
regs = (data or {}).get("registers", {}) or {}
csr_0a = regs.get("csr_0a") or {}
csr_e5 = regs.get("csr_e5") or {}
state = {
"csr_0a": csr_0a.get("value"),
"csr_e5": csr_e5.get("value"),
"pll_lock": csr_0a.get("pll_lock"),
"clk_det": csr_0a.get("clk_det"),
}
for k in ERROR_BITS:
state[k] = csr_e5.get(k)
return state
# ---------------------------------------------------------------------------
# Rigol I/O
# ---------------------------------------------------------------------------
def setup_rigol(rigol) -> None:
rigol.write(":STOP"); time.sleep(0.2)
rigol.write(":CHANnel1:DISPlay 1")
rigol.write(":CHANnel1:COUPling DC")
rigol.write(f":CHANnel1:PROBe {RIGOL_PROBE}")
rigol.write(f":CHANnel1:SCALe {RIGOL_V_SCALE:.3f}")
rigol.write(f":CHANnel1:OFFSet {RIGOL_V_OFFSET:.3f}")
rigol.write(":CHANnel2:DISPlay 0")
rigol.write(f":TIMebase:MAIN:SCALe {RIGOL_TIMEBASE:.3E}")
rigol.write(":TRIGger:MODE EDGE")
rigol.write(":TRIGger:EDGe:SOURce CHANnel1")
rigol.write(":TRIGger:EDGe:SLOPe NEGative")
rigol.write(":TRIGger:EDGe:LEVel 1.76")
rigol.write(":TRIGger:SWEep AUTO")
rigol.write(":ACQuire:MDEPth AUTO")
time.sleep(0.3); rigol.write(":RUN"); time.sleep(0.2)
def capture_rail(rigol, out_path: Path) -> tuple[float, float]:
rigol.write(":STOP"); time.sleep(0.1)
rigol.write(":WAVeform:SOURce CHANnel1")
rigol.write(":WAVeform:FORMat ASC")
rigol.write(":WAVeform:MODE NORM")
time.sleep(0.05)
pre = rigol.ask(":WAVeform:PREamble?").strip().split(",")
xinc = float(pre[4]); xorig = float(pre[5])
raw = rigol.ask(":WAVeform:DATA?").strip()
if raw.startswith("#"):
ndig = int(raw[1])
raw = raw[2 + ndig:]
vals = [float(v) for v in raw.split(",") if v.strip()]
if not vals:
rigol.write(":RUN")
raise RuntimeError("Rigol returned no samples")
volts = np.asarray(vals, dtype=np.float64)
t = np.arange(len(volts)) * xinc + xorig
np.savetxt(out_path, np.column_stack([t, volts]),
delimiter=",", fmt="%.6e")
rigol.write(":RUN")
return float((volts.max() - volts.min()) * 1000), float(volts.mean())
# ---------------------------------------------------------------------------
# Keysight I/O (mirrors trial_runner.py)
# ---------------------------------------------------------------------------
def _ks_drain(scope):
for _ in range(20):
try:
r = scope.ask(":SYSTem:ERRor?").strip()
except Exception:
return
if not r or r.startswith(("0,", "+0,")) or r == "0":
return
def setup_keysight(scope) -> None:
for c in [
"*RST", ":RUN", ":STOP", "*CLS",
":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50",
f":CHANnel1:PROBe {KS_PROBE}", ":CHANnel1:LABel 'CLK+'",
":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50",
f":CHANnel2:PROBe {KS_PROBE}", ":CHANnel2:LABel 'CLK-'",
":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50",
f":CHANnel3:PROBe {KS_PROBE}", ":CHANnel3:LABel 'DAT0+'",
":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50",
f":CHANnel4:PROBe {KS_PROBE}", ":CHANnel4:LABel 'DAT0-'",
":TIMebase:REFerence CENTer",
":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON",
]:
scope.write(c); time.sleep(0.04)
_ks_drain(scope)
for ch in (1, 2, 3, 4):
scope.write(f":CHANnel{ch}:SCALe {KS_LP_V_SCALE:.3f}")
scope.write(f":CHANnel{ch}:OFFSet {KS_LP_V_OFFSET:.3f}")
scope.write(":TRIGger:MODE EDGE")
scope.write(":TRIGger:EDGE:SOURce CHANnel3")
scope.write(":TRIGger:EDGE:SLOPe NEGative")
scope.write(f":TRIGger:EDGE:LEVel {KS_LP_TRIG_LEVEL:.3f}")
scope.write(":TRIGger:SWEep NORMal")
scope.write(f":TIMebase:SCALe {KS_LP_SCALE:.3E}")
scope.write(f":ACQuire:POINts {KS_LP_POINTS}")
scope.write(f":TIMebase:POSition {KS_LP_TRIG_OFFSET:.2E}")
scope.write(":ACQuire:MODE SEGMented")
scope.write(f":ACQuire:SEGMented:COUNt {KS_SEGMENT_COUNT}")
time.sleep(0.4)
_ks_drain(scope)
def _ks_read_block(scope) -> bytes:
head = scope.read_raw(2)
if not head.startswith(b"#"):
idx = head.find(b"#")
if idx < 0:
extra = scope.read_raw(64)
head += extra
idx = head.find(b"#")
head = head[idx:idx + 2]
ndigits = int(head[1:2])
length_bytes = scope.read_raw(ndigits)
nbytes = int(length_bytes)
data = b""
while len(data) < nbytes:
chunk = scope.read_raw(nbytes - len(data))
if not chunk:
break
data += chunk
try:
scope.read_raw(1)
except Exception:
pass
return data
def keysight_capture(scope, out_dir: Path, base: str) -> int:
""":DIGitize → read all segments → save CSVs. Returns segments written."""
prev = scope.timeout
try:
scope.timeout = KEYSIGHT_TO_S
scope.write(":DIGitize")
if scope.ask("*OPC?").strip() != "1":
return 0
except Exception as e:
print(f" keysight arm/wait failed: {e}")
return 0
finally:
scope.timeout = prev
n_written = 0
for chan_id, label in [(1, "clk"), (3, "dat")]:
scope.write(f":WAVeform:SOURce CHANnel{chan_id}")
scope.write(":WAVeform:FORMat WORD")
scope.write(":WAVeform:BYTeorder LSBFirst")
x_inc = float(scope.ask(":WAVeform:XINCrement?"))
x_org = float(scope.ask(":WAVeform:XORigin?"))
y_inc = float(scope.ask(":WAVeform:YINCrement?"))
y_org = float(scope.ask(":WAVeform:YORigin?"))
for i in range(1, KS_SEGMENT_COUNT + 1):
scope.write(f":ACQuire:SEGMented:INDex {i}")
scope.write(":WAVeform:DATA?")
raw = _ks_read_block(scope)
codes = np.frombuffer(raw, dtype="<i2")
volts = codes.astype(np.float64) * y_inc + y_org
t = np.arange(len(volts)) * x_inc + x_org
path = out_dir / f"{base}_seg{i:03d}_{label}.csv"
np.savetxt(path, np.column_stack([t, volts]),
delimiter=",", fmt="%.6e")
if label == "clk":
n_written += 1
return n_written
# ---------------------------------------------------------------------------
# Per-event capture handler
# ---------------------------------------------------------------------------
def handle_event(event_label: str, event_num: int, session_dir: Path,
rigol, scope, summary_writer, last_state: dict) -> None:
"""One unlock or baseline capture: Rigol + Keysight + meta JSON."""
ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
iso = datetime.fromtimestamp(time.time()).strftime("%H:%M:%S.%f")[:-3]
base = f"{event_label}_{event_num:04d}_{ts}"
# 1. Rigol — fast (~100-300 ms)
rail_path = session_dir / f"{base}_rail.csv"
vpp_mV = mean_V = None
try:
vpp_mV, mean_V = capture_rail(rigol, rail_path)
except Exception as e:
print(f" rail capture FAILED: {e}", flush=True)
rail_path = None
# 2. Keysight — slow (~5-15 s for 100 segs)
n_segs = 0
if scope is not None:
try:
n_segs = keysight_capture(scope, session_dir, f"{base}_mipi")
except Exception as e:
print(f" keysight capture FAILED: {e}", flush=True)
# 3. Meta
meta = {
"event": event_label,
"event_num": event_num,
"ts": ts,
"iso": iso,
"last_pll_state": last_state,
"rail_csv": rail_path.name if rail_path else None,
"rail_vpp_mV": vpp_mV,
"rail_mean_V": mean_V,
"n_mipi_segments": n_segs,
"mipi_basename": f"{base}_mipi" if n_segs else None,
}
meta_path = session_dir / f"{base}_meta.json"
meta_path.write_text(json.dumps(meta, indent=2, default=str))
rail_str = (f"Vpp={vpp_mV:.1f}mV mean={mean_V:.3f}V"
if vpp_mV is not None else "RAIL FAILED")
print(f" [{iso}] {event_label.upper():<8} #{event_num:04d} "
f"{rail_str} MIPI={n_segs}segs", flush=True)
summary_writer.writerow([event_num, ts, iso, event_label,
f"{vpp_mV:.1f}" if vpp_mV is not None else "",
f"{mean_V:.3f}" if mean_V is not None else "",
n_segs, base])
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
ap = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--no-keysight", action="store_true",
help="Rigol only (skip MIPI capture per event)")
args = ap.parse_args()
session_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
session_dir = DATA_ROOT / session_ts
session_dir.mkdir(parents=True, exist_ok=True)
print(f"UNLOCK CAPTURE — session {session_ts}")
print(f" output: {session_dir.relative_to(DATA_ROOT.parent.parent)}")
# Connect SN65 endpoint
sess = requests.Session()
try:
sess.get(SN65_EP, timeout=2.0).raise_for_status()
print(f" SN65: reachable")
except Exception as e:
print(f" *** SN65 endpoint failed: {e} ***")
sys.exit(1)
# Connect + configure Rigol
rigol = vxi11.Instrument(RIGOL_IP)
rigol.timeout = RIGOL_TO_S
try:
print(f" Rigol: {rigol.ask('*IDN?').strip()}")
setup_rigol(rigol)
except Exception as e:
print(f" *** Rigol failed: {e} ***")
sys.exit(1)
# Connect + configure Keysight
scope = None
if not args.no_keysight:
scope = vxi11.Instrument(KEYSIGHT_IP)
scope.timeout = KEYSIGHT_TO_S
try:
print(f" Keysight: {scope.ask('*IDN?').strip()}")
setup_keysight(scope)
except Exception as e:
print(f" Keysight failed ({e}) — continuing without MIPI capture")
scope = None
summary_path = session_dir / "summary.csv"
sf = open(summary_path, "w", newline="")
sw = csv.writer(sf)
sw.writerow(["event_num", "ts", "iso", "event_label",
"rail_vpp_mV", "rail_mean_V", "n_mipi_segs", "basename"])
sf.flush()
def _shutdown(*_):
print("\nshutting down")
try: rigol.write(":RUN")
except Exception: pass
try: sf.close()
except Exception: pass
sys.exit(0)
signal.signal(signal.SIGINT, _shutdown)
signal.signal(signal.SIGTERM, _shutdown)
print("\n Capturing 1V8 rail + MIPI segments on every PLL unlock.")
print(" Run video_cycler.py in another terminal to provoke unlocks.")
print(" keys: g=baseline c=catastrophic-event observed q=quit\n")
print(f" {'time':<14} {'event':<15} {'rail':<28} {'mipi':<10}")
print(f" {'-'*14} {'-'*15} {'-'*28} {'-'*10}")
last_pll = None
last_state = {}
unlock_n = 0
baseline_n = 0
catastrophic_n = 0
err_count = 0
with KeyReader() as keys:
while True:
t0 = time.time()
pll = None
try:
r = sess.get(SN65_EP, timeout=HTTP_TO_S)
r.raise_for_status()
last_state = extract_state(r.json())
pll = last_state["pll_lock"]
err_count = 0
except Exception:
err_count += 1
if last_pll is True and pll is False:
unlock_n += 1
handle_event("unlock", unlock_n, session_dir,
rigol, scope, sw, last_state)
sf.flush()
if pll is not None:
last_pll = pll
key = keys.get_key()
if key == "g":
baseline_n += 1
handle_event("baseline", baseline_n, session_dir,
rigol, scope, sw, last_state)
sf.flush()
elif key == "c":
catastrophic_n += 1
print(f"\n *** CATASTROPHIC EVENT OBSERVED — "
f"capturing scopes ***", flush=True)
handle_event("catastrophic", catastrophic_n, session_dir,
rigol, scope, sw, last_state)
sf.flush()
elif key == "q":
_shutdown()
elapsed = time.time() - t0
if elapsed < POLL_DT_S:
time.sleep(POLL_DT_S - elapsed)
if __name__ == "__main__":
main()