701 lines
26 KiB
Python
701 lines
26 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
trial_runner.py — Controlled single-trial flicker experiment.
|
||
|
||
Each trial is one labelled load/unload cycle:
|
||
|
||
1. start video (PUT /video start, static-pink)
|
||
2. observe for OBSERVE_S seconds
|
||
- poll SN65 PLL state at ~50 Hz, log every state change
|
||
3. snapshot Rigol CH1 (1V8 rail) — one trace per trial
|
||
4. stop video (PUT /video stop)
|
||
5. prompt for label ([f]licker / [g]ood / [s]kip / [q]uit)
|
||
6. save trial JSON + rail CSV with the label
|
||
7. brief pause, then next trial
|
||
|
||
Output layout:
|
||
data/trials/{session_ts}/
|
||
trial_0001_good_{ts}.json
|
||
trial_0001_good_{ts}_rail.csv
|
||
trial_0002_flicker_{ts}.json
|
||
trial_0002_flicker_{ts}_rail.csv
|
||
...
|
||
summary.csv (one row per trial: label, n_unlocks, vpp_mV, mean_V)
|
||
|
||
Prerequisites:
|
||
* Rigol DS1202Z-E at 192.168.45.5, CH1 probed on 1V8 rail
|
||
(script configures channel/timebase/trigger automatically)
|
||
* Keysight DSO80204B at 192.168.45.4 with CH1=CLK+, CH3=DAT0+ (CH2/CH4
|
||
= the complementary differential lines; script configures the rest)
|
||
* SN65 device endpoint at http://192.168.45.8:5000
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import csv
|
||
import json
|
||
import signal
|
||
import sys
|
||
import threading
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import numpy as np
|
||
import requests
|
||
import vxi11
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config
|
||
# ---------------------------------------------------------------------------
|
||
DEVICE_BASE = "http://192.168.45.8:5000"
|
||
SN65_EP = f"{DEVICE_BASE}/sn65_registers"
|
||
VIDEO_URL = f"{DEVICE_BASE}/video"
|
||
RIGOL_IP = "192.168.45.5"
|
||
KEYSIGHT_IP = "192.168.45.4"
|
||
DATA_ROOT = Path(__file__).parent / "data" / "trials"
|
||
|
||
OBSERVE_S = 10.0 # observe window per trial
|
||
PAUSE_BETWEEN_S = 0.5
|
||
POLL_DT_S = 0.020 # 50 Hz SN65 polling during the observe window
|
||
HTTP_TO_S = 0.2
|
||
RIGOL_TO_S = 10.0
|
||
KEYSIGHT_TO_S = 30.0
|
||
|
||
# ---- Rigol CH1 (1V8 rail) capture settings ---------------------------------
|
||
# 100 mV/div, offset −1.8 V puts 1.8 V at screen centre with ±400 mV headroom.
|
||
# 10 ms/div × 12 div = 120 ms window — comfortably brackets a ~40 ms unlock.
|
||
RIGOL_V_SCALE = 0.1 # V/div
|
||
RIGOL_V_OFFSET = -1.8 # V
|
||
RIGOL_TIMEBASE = 10e-3 # s/div → 120 ms window
|
||
RIGOL_PROBE = 10 # 10× passive probe on 1V8 rail
|
||
|
||
# ---- Keysight LP-mode capture settings (mirrors flicker_watch.py LP_DAT) ---
|
||
KS_LP_SCALE = 1e-6 # 1 µs/div → 20 µs window
|
||
KS_LP_POINTS = 50_000
|
||
KS_LP_TRIG_OFFSET = 9e-6
|
||
KS_LP_V_SCALE = 0.2
|
||
KS_LP_V_OFFSET = 0.6
|
||
KS_LP_TRIG_LEVEL = 0.6
|
||
KS_SEGMENT_COUNT = 100 # segments per :DIGitize
|
||
KS_PROBE = 19.2 # matches existing test rig
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Rigol I/O
|
||
# ---------------------------------------------------------------------------
|
||
def _read_ieee_block(rigol) -> bytes:
|
||
head = rigol.read_raw(2)
|
||
if not head.startswith(b"#"):
|
||
idx = head.find(b"#")
|
||
if idx < 0:
|
||
extra = rigol.read_raw(64)
|
||
head += extra
|
||
idx = head.find(b"#")
|
||
head = head[idx:idx + 2]
|
||
ndigits = int(head[1:2])
|
||
length_bytes = rigol.read_raw(ndigits)
|
||
nbytes = int(length_bytes)
|
||
data = b""
|
||
while len(data) < nbytes:
|
||
chunk = rigol.read_raw(nbytes - len(data))
|
||
if not chunk:
|
||
break
|
||
data += chunk
|
||
try:
|
||
rigol.read_raw(1)
|
||
except Exception:
|
||
pass
|
||
return data
|
||
|
||
|
||
def setup_rigol(rigol) -> None:
|
||
"""One-shot SCPI configuration of Rigol CH1 for 1V8 supply rail capture."""
|
||
rigol.write(":STOP"); time.sleep(0.2)
|
||
rigol.write(":CHANnel1:DISPlay 1")
|
||
rigol.write(":CHANnel1:COUPling DC")
|
||
rigol.write(f":CHANnel1:PROBe {RIGOL_PROBE}")
|
||
rigol.write(f":CHANnel1:SCALe {RIGOL_V_SCALE:.3f}")
|
||
rigol.write(f":CHANnel1:OFFSet {RIGOL_V_OFFSET:.3f}")
|
||
rigol.write(":CHANnel2:DISPlay 0")
|
||
rigol.write(f":TIMebase:MAIN:SCALe {RIGOL_TIMEBASE:.3E}")
|
||
rigol.write(":TRIGger:MODE EDGE")
|
||
rigol.write(":TRIGger:EDGe:SOURce CHANnel1")
|
||
rigol.write(":TRIGger:EDGe:SLOPe NEGative")
|
||
rigol.write(":TRIGger:EDGe:LEVel 1.76")
|
||
rigol.write(":TRIGger:SWEep AUTO")
|
||
rigol.write(":ACQuire:MDEPth AUTO")
|
||
time.sleep(0.3)
|
||
rigol.write(":RUN")
|
||
time.sleep(0.2)
|
||
|
||
|
||
_rail_diag_printed = False
|
||
|
||
|
||
def capture_rail(rigol, out_path: Path) -> tuple[float, float]:
|
||
""":STOP → read CH1 (ASCII format) → :RUN. Returns (vpp_mV, mean_V).
|
||
|
||
ASCII format returns volts directly — sidesteps the BYTE-format
|
||
YOrigin/YReference unit ambiguity in the Rigol manual. Mirrors the
|
||
proven rigol_scope.py approach used in mipi_test.py.
|
||
"""
|
||
global _rail_diag_printed
|
||
|
||
rigol.write(":STOP")
|
||
time.sleep(0.1)
|
||
rigol.write(":WAVeform:SOURce CHANnel1")
|
||
rigol.write(":WAVeform:FORMat ASC") # Rigol DS1000Z uses ASC not ASCII
|
||
rigol.write(":WAVeform:MODE NORM")
|
||
time.sleep(0.05)
|
||
|
||
pre = rigol.ask(":WAVeform:PREamble?").strip().split(",")
|
||
xinc = float(pre[4])
|
||
xorig = float(pre[5])
|
||
|
||
raw = rigol.ask(":WAVeform:DATA?").strip()
|
||
# Strip optional IEEE 488.2 binary header '#<ndigits><nbytes>'
|
||
if raw.startswith("#"):
|
||
ndig = int(raw[1])
|
||
raw = raw[2 + ndig:]
|
||
vals = [float(v) for v in raw.split(",") if v.strip()]
|
||
if not vals:
|
||
rigol.write(":RUN")
|
||
raise RuntimeError("Rigol returned no samples (channel disabled?)")
|
||
|
||
volts = np.asarray(vals, dtype=np.float64)
|
||
t = np.arange(len(volts)) * xinc + xorig
|
||
|
||
# One-time diagnostic: dump preamble + raw sample range so we can spot
|
||
# probe / channel-setting issues immediately.
|
||
if not _rail_diag_printed:
|
||
_rail_diag_printed = True
|
||
print(f" [diag] Rigol preamble: pts={pre[2]} xinc={xinc:.2e} "
|
||
f"xorig={xorig:.2e}")
|
||
print(f" [diag] first 5 samples (V): "
|
||
f"{[round(v, 4) for v in volts[:5].tolist()]}")
|
||
print(f" [diag] sample range: "
|
||
f"min={volts.min():.4f} V, max={volts.max():.4f} V, "
|
||
f"n={len(volts)}")
|
||
|
||
np.savetxt(out_path, np.column_stack([t, volts]),
|
||
delimiter=",", fmt="%.6e")
|
||
rigol.write(":RUN")
|
||
return float((volts.max() - volts.min()) * 1000), float(volts.mean())
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Keysight DSO80204B (MIPI scope) I/O — mirrors flicker_watch.py LP_DAT mode
|
||
# ---------------------------------------------------------------------------
|
||
def _ks_drain_errors(scope) -> list[str]:
|
||
errs = []
|
||
for _ in range(20):
|
||
try:
|
||
r = scope.ask(":SYSTem:ERRor?").strip()
|
||
except Exception:
|
||
break
|
||
if not r or r.startswith(("0,", "+0,")) or r == "0":
|
||
break
|
||
errs.append(r)
|
||
return errs
|
||
|
||
|
||
def setup_keysight(scope) -> None:
|
||
"""Configure Keysight scope for MIPI LP-mode segmented LP_DAT capture."""
|
||
cmds = [
|
||
"*RST", ":RUN", ":STOP", "*CLS",
|
||
":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50",
|
||
f":CHANnel1:PROBe {KS_PROBE}", ":CHANnel1:LABel 'CLK+'",
|
||
":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50",
|
||
f":CHANnel2:PROBe {KS_PROBE}", ":CHANnel2:LABel 'CLK-'",
|
||
":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50",
|
||
f":CHANnel3:PROBe {KS_PROBE}", ":CHANnel3:LABel 'DAT0+'",
|
||
":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50",
|
||
f":CHANnel4:PROBe {KS_PROBE}", ":CHANnel4:LABel 'DAT0-'",
|
||
":TIMebase:REFerence CENTer",
|
||
":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON",
|
||
]
|
||
for c in cmds:
|
||
scope.write(c)
|
||
time.sleep(0.04)
|
||
_ks_drain_errors(scope)
|
||
|
||
# LP-mode channel offsets + falling-edge trigger on DAT0+
|
||
for ch in (1, 2, 3, 4):
|
||
scope.write(f":CHANnel{ch}:SCALe {KS_LP_V_SCALE:.3f}")
|
||
scope.write(f":CHANnel{ch}:OFFSet {KS_LP_V_OFFSET:.3f}")
|
||
scope.write(":TRIGger:MODE EDGE")
|
||
scope.write(":TRIGger:EDGE:SOURce CHANnel3")
|
||
scope.write(":TRIGger:EDGE:SLOPe NEGative")
|
||
scope.write(f":TRIGger:EDGE:LEVel {KS_LP_TRIG_LEVEL:.3f}")
|
||
scope.write(":TRIGger:SWEep NORMal")
|
||
scope.write(f":TIMebase:SCALe {KS_LP_SCALE:.3E}")
|
||
scope.write(f":ACQuire:POINts {KS_LP_POINTS}")
|
||
scope.write(f":TIMebase:POSition {KS_LP_TRIG_OFFSET:.2E}")
|
||
scope.write(":ACQuire:MODE SEGMented")
|
||
scope.write(f":ACQuire:SEGMented:COUNt {KS_SEGMENT_COUNT}")
|
||
time.sleep(0.4)
|
||
_ks_drain_errors(scope)
|
||
|
||
|
||
def _ks_read_block(scope) -> bytes:
|
||
"""IEEE 488.2 binary block: '#'<n><len><data>[\\n]."""
|
||
head = scope.read_raw(2)
|
||
if not head.startswith(b"#"):
|
||
idx = head.find(b"#")
|
||
if idx < 0:
|
||
extra = scope.read_raw(64)
|
||
head += extra
|
||
idx = head.find(b"#")
|
||
head = head[idx:idx + 2]
|
||
ndigits = int(head[1:2])
|
||
length_bytes = scope.read_raw(ndigits)
|
||
nbytes = int(length_bytes)
|
||
data = b""
|
||
while len(data) < nbytes:
|
||
chunk = scope.read_raw(nbytes - len(data))
|
||
if not chunk:
|
||
break
|
||
data += chunk
|
||
try:
|
||
scope.read_raw(1)
|
||
except Exception:
|
||
pass
|
||
return data
|
||
|
||
|
||
def keysight_arm(scope) -> None:
|
||
"""Send :DIGitize. Acquisition runs in scope memory until OPC."""
|
||
scope.write(":DIGitize")
|
||
|
||
|
||
def keysight_wait_done(scope, timeout_s: float) -> bool:
|
||
"""Block until acquisition completes or timeout."""
|
||
prev = scope.timeout
|
||
try:
|
||
scope.timeout = timeout_s + 2
|
||
return scope.ask("*OPC?").strip() == "1"
|
||
except Exception:
|
||
return False
|
||
finally:
|
||
scope.timeout = prev
|
||
|
||
|
||
def keysight_read_segments(scope, n_segments: int):
|
||
"""Read CLK+ (CH1) and DAT0+ (CH3) for all N segments via :WAVeform:DATA?."""
|
||
out = {}
|
||
for chan_id, label in [(1, "clk"), (3, "dat")]:
|
||
scope.write(f":WAVeform:SOURce CHANnel{chan_id}")
|
||
scope.write(":WAVeform:FORMat WORD")
|
||
scope.write(":WAVeform:BYTeorder LSBFirst")
|
||
x_inc = float(scope.ask(":WAVeform:XINCrement?"))
|
||
x_org = float(scope.ask(":WAVeform:XORigin?"))
|
||
y_inc = float(scope.ask(":WAVeform:YINCrement?"))
|
||
y_org = float(scope.ask(":WAVeform:YORigin?"))
|
||
segs = []
|
||
for i in range(1, n_segments + 1):
|
||
if n_segments > 1:
|
||
scope.write(f":ACQuire:SEGMented:INDex {i}")
|
||
scope.write(":WAVeform:DATA?")
|
||
raw = _ks_read_block(scope)
|
||
codes = np.frombuffer(raw, dtype="<i2")
|
||
segs.append(codes.astype(np.float64) * y_inc + y_org)
|
||
n = len(segs[0]) if segs else 0
|
||
out[label] = {"times": np.arange(n) * x_inc + x_org, "segs": segs}
|
||
return out
|
||
|
||
|
||
def save_keysight_segments(segments: dict, out_dir: Path, base: str) -> int:
|
||
"""Write per-segment CSVs to out_dir. Returns number of segments written."""
|
||
n_written = 0
|
||
n_segs = len(segments["clk"]["segs"])
|
||
for i in range(n_segs):
|
||
for label in ("clk", "dat"):
|
||
t = segments[label]["times"]
|
||
v = segments[label]["segs"][i]
|
||
path = out_dir / f"{base}_seg{i+1:03d}_{label}.csv"
|
||
np.savetxt(path, np.column_stack([t, v]),
|
||
delimiter=",", fmt="%.6e")
|
||
n_written += 1
|
||
return n_written
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Video + SN65 helpers
|
||
# ---------------------------------------------------------------------------
|
||
def video_start(sess: requests.Session) -> None:
|
||
try:
|
||
sess.put(VIDEO_URL,
|
||
json={"action": "start", "mode": "static-pink"}, timeout=3.0)
|
||
except Exception as e:
|
||
print(f" video START failed: {e}")
|
||
|
||
|
||
def video_stop(sess: requests.Session) -> None:
|
||
try:
|
||
sess.put(VIDEO_URL, json={"action": "stop"}, timeout=3.0)
|
||
except Exception as e:
|
||
print(f" video STOP failed: {e}")
|
||
|
||
|
||
def extract_state(data: dict | None) -> dict:
|
||
regs = (data or {}).get("registers", {}) or {}
|
||
csr_0a = regs.get("csr_0a") or {}
|
||
csr_e5 = regs.get("csr_e5") or {}
|
||
return {
|
||
"csr_0a": csr_0a.get("value"),
|
||
"csr_e5": csr_e5.get("value"),
|
||
"pll_lock": csr_0a.get("pll_lock"),
|
||
"clk_det": csr_0a.get("clk_det"),
|
||
"pll_unlock": csr_e5.get("pll_unlock"),
|
||
"cha_sot_bit_err":csr_e5.get("cha_sot_bit_err"),
|
||
"cha_llp_err": csr_e5.get("cha_llp_err"),
|
||
"cha_ecc_err": csr_e5.get("cha_ecc_err"),
|
||
"cha_lp_err": csr_e5.get("cha_lp_err"),
|
||
"cha_crc_err": csr_e5.get("cha_crc_err"),
|
||
}
|
||
|
||
|
||
def observe_window(sess: requests.Session, duration_s: float) -> tuple[list, list]:
|
||
"""
|
||
Poll SN65 for `duration_s` at POLL_DT_S. Return (all_samples, unlocks).
|
||
|
||
`unlocks` is a list of pll_lock True→False events (timestamps only — paired
|
||
recovery times are stitched in post).
|
||
"""
|
||
samples: list = []
|
||
unlocks: list = []
|
||
last_pll: bool | None = None
|
||
end = time.time() + duration_s
|
||
|
||
while time.time() < end:
|
||
t0 = time.time()
|
||
try:
|
||
r = sess.get(SN65_EP, timeout=HTTP_TO_S)
|
||
r.raise_for_status()
|
||
state = extract_state(r.json())
|
||
samples.append({"ts": t0, "state": state})
|
||
pll = state["pll_lock"]
|
||
if last_pll is True and pll is False:
|
||
unlocks.append({"ts": t0,
|
||
"iso": datetime.fromtimestamp(t0)
|
||
.strftime("%H:%M:%S.%f")[:-3]})
|
||
if pll is not None:
|
||
last_pll = pll
|
||
except Exception as e:
|
||
samples.append({"ts": t0, "error": str(e)})
|
||
elapsed = time.time() - t0
|
||
if elapsed < POLL_DT_S:
|
||
time.sleep(POLL_DT_S - elapsed)
|
||
return samples, unlocks
|
||
|
||
|
||
class SN65Poller(threading.Thread):
|
||
"""
|
||
Background SN65 poller — runs for the full duration of a trial
|
||
(video_start … video_stop) so we never have a coverage gap.
|
||
Uses its own requests.Session because requests.Session isn't
|
||
thread-safe for sharing with the main thread's HTTP calls.
|
||
"""
|
||
def __init__(self):
|
||
super().__init__(daemon=True)
|
||
self._sess = requests.Session()
|
||
self._stop_evt = threading.Event() # NOT _stop: Thread uses that
|
||
self._lock = threading.Lock()
|
||
self.samples: list = []
|
||
self.unlocks: list = []
|
||
|
||
def request_stop(self):
|
||
self._stop_evt.set()
|
||
|
||
def snapshot(self) -> tuple[list, list]:
|
||
"""Return shallow copies of (samples, unlocks) so the main thread can
|
||
keep mutating them safely after the poller has stopped."""
|
||
with self._lock:
|
||
return list(self.samples), list(self.unlocks)
|
||
|
||
def run(self):
|
||
last_pll: bool | None = None
|
||
while not self._stop_evt.is_set():
|
||
t0 = time.time()
|
||
try:
|
||
r = self._sess.get(SN65_EP, timeout=HTTP_TO_S)
|
||
r.raise_for_status()
|
||
state = extract_state(r.json())
|
||
pll = state["pll_lock"]
|
||
rec = {"ts": t0, "state": state}
|
||
if last_pll is True and pll is False:
|
||
self.unlocks.append({
|
||
"ts": t0,
|
||
"iso": datetime.fromtimestamp(t0)
|
||
.strftime("%H:%M:%S.%f")[:-3],
|
||
})
|
||
if pll is not None:
|
||
last_pll = pll
|
||
except Exception as e:
|
||
rec = {"ts": t0, "error": str(e)}
|
||
with self._lock:
|
||
self.samples.append(rec)
|
||
elapsed = time.time() - t0
|
||
if elapsed < POLL_DT_S:
|
||
time.sleep(POLL_DT_S - elapsed)
|
||
|
||
|
||
def prompt_label(default: str = "g") -> str:
|
||
"""Block until user enters f/g/s/q."""
|
||
while True:
|
||
try:
|
||
ans = input("\n label? [f]licker / [g]ood / [s]kip / [q]uit: "
|
||
).strip().lower()
|
||
except EOFError:
|
||
return "q"
|
||
if ans == "":
|
||
ans = default
|
||
if ans in ("f", "g", "s", "q"):
|
||
return ans
|
||
print(f" not understood ('{ans}') — try again")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
def main() -> None:
|
||
ap = argparse.ArgumentParser(description=__doc__,
|
||
formatter_class=argparse.RawDescriptionHelpFormatter)
|
||
ap.add_argument("--observe-s", type=float, default=OBSERVE_S,
|
||
help=f"observe window per trial in seconds (default {OBSERVE_S})")
|
||
ap.add_argument("--no-rigol", action="store_true",
|
||
help="skip Rigol rail capture (useful if scope not connected)")
|
||
ap.add_argument("--no-keysight", action="store_true",
|
||
help="skip Keysight MIPI capture (useful if scope not connected)")
|
||
args = ap.parse_args()
|
||
|
||
session_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
session_dir = DATA_ROOT / session_ts
|
||
session_dir.mkdir(parents=True, exist_ok=True)
|
||
summary_path = session_dir / "summary.csv"
|
||
|
||
print(f"TRIAL RUNNER — session {session_ts}")
|
||
print(f" output: {session_dir.relative_to(DATA_ROOT.parent.parent)}")
|
||
print(f" observe: {args.observe_s:.1f} s")
|
||
print(f" SN65 endpoint: {SN65_EP}")
|
||
|
||
sess = requests.Session()
|
||
|
||
# Verify SN65 endpoint
|
||
try:
|
||
sess.get(SN65_EP, timeout=2.0).raise_for_status()
|
||
print(f" SN65: reachable")
|
||
except Exception as e:
|
||
print(f" *** SN65 endpoint failed: {e} ***")
|
||
sys.exit(1)
|
||
|
||
rigol = None
|
||
if not args.no_rigol:
|
||
try:
|
||
rigol = vxi11.Instrument(RIGOL_IP)
|
||
rigol.timeout = RIGOL_TO_S
|
||
idn = rigol.ask("*IDN?").strip()
|
||
print(f" Rigol: {idn}")
|
||
setup_rigol(rigol)
|
||
print(f" CH1 configured: {RIGOL_V_SCALE*1000:.0f} mV/div, "
|
||
f"offset {RIGOL_V_OFFSET:.2f} V, {RIGOL_TIMEBASE*1000:.1f} ms/div")
|
||
except Exception as e:
|
||
print(f" Rigol unreachable ({e}) — continuing without rail capture")
|
||
rigol = None
|
||
else:
|
||
print(f" Rigol: disabled (--no-rigol)")
|
||
|
||
scope = None
|
||
if not args.no_keysight:
|
||
try:
|
||
scope = vxi11.Instrument(KEYSIGHT_IP)
|
||
scope.timeout = KEYSIGHT_TO_S
|
||
idn = scope.ask("*IDN?").strip()
|
||
print(f" Keysight: {idn}")
|
||
setup_keysight(scope)
|
||
print(f" LP_DAT segmented, {KS_SEGMENT_COUNT} segs/acquire, "
|
||
f"{KS_LP_POINTS} pts × {KS_LP_SCALE*1e6:.0f} µs/div")
|
||
except Exception as e:
|
||
print(f" Keysight unreachable ({e}) — continuing without MIPI capture")
|
||
scope = None
|
||
else:
|
||
print(f" Keysight: disabled (--no-keysight)")
|
||
|
||
# Open summary CSV
|
||
sf = open(summary_path, "w", newline="")
|
||
sw = csv.writer(sf)
|
||
sw.writerow(["trial", "iso", "label", "n_unlocks",
|
||
"min_unlock_ms", "med_unlock_ms", "max_unlock_ms",
|
||
"rail_vpp_mV", "rail_mean_V",
|
||
"n_keysight_segs", "json_file"])
|
||
sf.flush()
|
||
|
||
def _shutdown(*_):
|
||
try:
|
||
video_stop(sess)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
sf.close()
|
||
except Exception:
|
||
pass
|
||
if rigol is not None:
|
||
try:
|
||
rigol.write(":RUN")
|
||
except Exception:
|
||
pass
|
||
print("\nshutting down — video off, Rigol restored to RUN")
|
||
sys.exit(0)
|
||
signal.signal(signal.SIGINT, _shutdown)
|
||
signal.signal(signal.SIGTERM, _shutdown)
|
||
|
||
print("\n Watch the display during each observe window, then label the trial.")
|
||
print()
|
||
trial = 0
|
||
while True:
|
||
trial += 1
|
||
trial_iso = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
print(f"=== TRIAL {trial:04d} {trial_iso} ===", flush=True)
|
||
|
||
# Start background SN65 poller — runs continuously through the entire
|
||
# trial (observe + Rigol read + MIPI read + video_stop) so we don't
|
||
# miss any unlock that falls in the readout/transition phases.
|
||
poller = SN65Poller()
|
||
poller.start()
|
||
|
||
# 1) start video
|
||
print(f" video START", flush=True)
|
||
video_start(sess)
|
||
t_video_on = time.time()
|
||
|
||
# 2a) Kick off Keysight acquire (non-blocking — runs in scope memory).
|
||
if scope is not None:
|
||
try:
|
||
keysight_arm(scope)
|
||
except Exception as e:
|
||
print(f" Keysight arm FAILED: {e}", flush=True)
|
||
|
||
# 2b) Observe phase — main thread just sleeps while poller does its job
|
||
print(f" observing for {args.observe_s:.0f} s ...", flush=True)
|
||
time.sleep(args.observe_s)
|
||
|
||
# 3) Rigol rail snapshot — poller continues in background
|
||
vpp_mV = mean_V = None
|
||
rail_path = None
|
||
if rigol is not None:
|
||
rail_path = session_dir / f"trial_{trial:04d}_{trial_iso}_rail.csv"
|
||
try:
|
||
vpp_mV, mean_V = capture_rail(rigol, rail_path)
|
||
print(f" rail: Vpp={vpp_mV:.1f} mV mean={mean_V:.3f} V", flush=True)
|
||
except Exception as e:
|
||
print(f" rail capture FAILED: {e}", flush=True)
|
||
rail_path = None
|
||
|
||
# 4) Read Keysight segments (poller continues in background)
|
||
n_ks_segs = 0
|
||
if scope is not None:
|
||
try:
|
||
if keysight_wait_done(scope, timeout_s=5.0):
|
||
segs = keysight_read_segments(scope, KS_SEGMENT_COUNT)
|
||
base = f"trial_{trial:04d}_{trial_iso}_mipi"
|
||
n_ks_segs = save_keysight_segments(segs, session_dir, base)
|
||
print(f" MIPI: {n_ks_segs} segments saved "
|
||
f"(base {base}_segNNN_clk.csv / _dat.csv)", flush=True)
|
||
else:
|
||
print(f" Keysight acquisition didn't complete in time", flush=True)
|
||
except Exception as e:
|
||
print(f" Keysight read FAILED: {e}", flush=True)
|
||
|
||
# 5) stop video — poller still running so we catch any unlock at the
|
||
# moment of video stop (which we missed in the previous design)
|
||
print(f" video STOP", flush=True)
|
||
video_stop(sess)
|
||
# Brief tail so the post-stop transition is included in the poll window
|
||
time.sleep(0.5)
|
||
|
||
# Stop poller and harvest its data
|
||
poller.request_stop()
|
||
poller.join(timeout=2.0)
|
||
samples, unlocks = poller.snapshot()
|
||
n_errors = sum(1 for s in samples if "error" in s)
|
||
n_none = sum(1 for s in samples
|
||
if "state" in s and s["state"].get("pll_lock") is None)
|
||
print(f" SN65 polled: {len(samples)} samples "
|
||
f"(over ~{args.observe_s + 6:.0f}s) "
|
||
f"errors={n_errors} None={n_none}", flush=True)
|
||
|
||
# Pair unlocks with their recovery times for pulse-width measurement
|
||
unlock_pairs = []
|
||
pll_evts = [s for s in samples
|
||
if "state" in s and s["state"].get("pll_lock") is not None]
|
||
for u in unlocks:
|
||
# Find next sample where pll_lock is True after this unlock ts
|
||
for s in pll_evts:
|
||
if s["ts"] > u["ts"] and s["state"]["pll_lock"] is True:
|
||
dur = (s["ts"] - u["ts"]) * 1000.0
|
||
unlock_pairs.append({"start_ts": u["ts"],
|
||
"start_iso": u["iso"],
|
||
"duration_ms": dur})
|
||
break
|
||
|
||
durs = sorted(p["duration_ms"] for p in unlock_pairs)
|
||
if durs:
|
||
n = len(durs)
|
||
mn, md, mx = durs[0], durs[n//2], durs[-1]
|
||
print(f" unlocks: {len(unlock_pairs)} durations: "
|
||
f"min={mn:.1f}ms med={md:.1f}ms max={mx:.1f}ms", flush=True)
|
||
else:
|
||
mn = md = mx = None
|
||
print(f" unlocks: 0", flush=True)
|
||
|
||
# 6) prompt for label
|
||
label_short = prompt_label()
|
||
if label_short == "q":
|
||
_shutdown()
|
||
if label_short == "s":
|
||
print(f" skipped (no save)")
|
||
time.sleep(PAUSE_BETWEEN_S)
|
||
trial -= 1 # don't number this one
|
||
continue
|
||
label = {"f": "flicker", "g": "good"}[label_short]
|
||
|
||
# 7) save trial JSON + summary row
|
||
json_path = session_dir / f"trial_{trial:04d}_{label}_{trial_iso}.json"
|
||
trial_data = {
|
||
"trial": trial,
|
||
"session_ts": session_ts,
|
||
"trial_ts": trial_iso,
|
||
"label": label,
|
||
"observe_s": args.observe_s,
|
||
"t_video_on": t_video_on,
|
||
"n_samples": len(samples),
|
||
"n_unlocks": len(unlock_pairs),
|
||
"unlock_pairs": unlock_pairs,
|
||
"samples": samples,
|
||
"rail_csv": rail_path.name if rail_path else None,
|
||
"rail_vpp_mV": vpp_mV,
|
||
"rail_mean_V": mean_V,
|
||
"n_keysight_segs": n_ks_segs,
|
||
"keysight_basename": f"trial_{trial:04d}_{trial_iso}_mipi" if n_ks_segs else None,
|
||
}
|
||
json_path.write_text(json.dumps(trial_data, indent=2, default=str))
|
||
print(f" saved {json_path.name}", flush=True)
|
||
|
||
sw.writerow([trial, trial_iso, label, len(unlock_pairs),
|
||
f"{mn:.1f}" if mn is not None else "",
|
||
f"{md:.1f}" if md is not None else "",
|
||
f"{mx:.1f}" if mx is not None else "",
|
||
f"{vpp_mV:.1f}" if vpp_mV is not None else "",
|
||
f"{mean_V:.3f}" if mean_V is not None else "",
|
||
n_ks_segs,
|
||
json_path.name])
|
||
sf.flush()
|
||
|
||
time.sleep(PAUSE_BETWEEN_S)
|
||
print() # blank line between trials
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |