Files
MiPi_TEST/trial_runner.py
2026-05-15 16:32:15 +01:00

701 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
trial_runner.py — Controlled single-trial flicker experiment.
Each trial is one labelled load/unload cycle:
1. start video (PUT /video start, static-pink)
2. observe for OBSERVE_S seconds
- poll SN65 PLL state at ~50 Hz, log every state change
3. snapshot Rigol CH1 (1V8 rail) — one trace per trial
4. stop video (PUT /video stop)
5. prompt for label ([f]licker / [g]ood / [s]kip / [q]uit)
6. save trial JSON + rail CSV with the label
7. brief pause, then next trial
Output layout:
data/trials/{session_ts}/
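trial_0001_good_{ts}.json
trial_0001_{ts}_rail.csv (written before labelling, so no label in the name)
trial_0001_{ts}_mipi_segNNN_clk.csv / _dat.csv (Keysight segments, if captured)
trial_0002_flicker_{ts}.json
trial_0002_{ts}_rail.csv
...
summary.csv (one row per trial: trial, iso, label, n_unlocks,
min/med/max unlock ms, rail_vpp_mV, rail_mean_V, n_keysight_segs, json_file)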
Prerequisites:
* Rigol DS1202Z-E at 192.168.45.5, CH1 probed on 1V8 rail
(script configures channel/timebase/trigger automatically)
* Keysight DSO80204B at 192.168.45.4 with CH1=CLK+, CH3=DAT0+ (CH2/CH4
= the complementary differential lines; script configures the rest)
* SN65 device endpoint at http://192.168.45.8:5000
"""
from __future__ import annotations
import argparse
import csv
import json
import signal
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
import numpy as np
import requests
import vxi11
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# HTTP API of the device under test: SN65 register dump and video control.
DEVICE_BASE = "http://192.168.45.8:5000"
SN65_EP = f"{DEVICE_BASE}/sn65_registers"
VIDEO_URL = f"{DEVICE_BASE}/video"
# Scope addresses; main() opens both with vxi11.Instrument.
RIGOL_IP = "192.168.45.5"
KEYSIGHT_IP = "192.168.45.4"
# Every session gets its own timestamped sub-directory under this root.
DATA_ROOT = Path(__file__).parent / "data" / "trials"
OBSERVE_S = 10.0 # observe window per trial
# Sleep inserted after each trial (saved or skipped) before the next starts.
PAUSE_BETWEEN_S = 0.5
POLL_DT_S = 0.020 # 50 Hz SN65 polling during the observe window
# Timeout passed to each SN65 polling GET.
HTTP_TO_S = 0.2
# Values assigned to the .timeout attribute of the Rigol / Keysight
# instruments in main().
RIGOL_TO_S = 10.0
KEYSIGHT_TO_S = 30.0
# ---- Rigol CH1 (1V8 rail) capture settings ---------------------------------
# 100 mV/div with a channel offset of -1.8 V puts 1.8 V at screen centre with
# ±400 mV headroom.
# 10 ms/div × 12 div = 120 ms window — comfortably brackets a ~40 ms unlock.
RIGOL_V_SCALE = 0.1 # V/div
RIGOL_V_OFFSET = -1.8 # V
RIGOL_TIMEBASE = 10e-3 # s/div → 120 ms window
RIGOL_PROBE = 10 # 10× passive probe on 1V8 rail
# ---- Keysight LP-mode capture settings (mirrors flicker_watch.py LP_DAT) ---
# All KS_* values are written to the scope by setup_keysight().
KS_LP_SCALE = 1e-6 # 1 µs/div → 20 µs window
KS_LP_POINTS = 50_000
# Sent as :TIMebase:POSition (presumably seconds — confirm against the scope
# programmer's guide).
KS_LP_TRIG_OFFSET = 9e-6
# Vertical scale and offset applied to all four channels.
KS_LP_V_SCALE = 0.2
KS_LP_V_OFFSET = 0.6
# Edge-trigger level on CHANnel3 (labelled DAT0+).
KS_LP_TRIG_LEVEL = 0.6
KS_SEGMENT_COUNT = 100 # segments per :DIGitize
KS_PROBE = 19.2 # matches existing test rig
# ---------------------------------------------------------------------------
# Rigol I/O
# ---------------------------------------------------------------------------
def _read_ieee_block(rigol) -> bytes:
    """Read one IEEE 488.2 definite-length binary block from an instrument.

    The block is laid out as ``#``, one digit N, N decimal digits giving the
    payload length, then the payload itself, optionally followed by a
    one-byte terminator.  Bytes that arrive before the ``#`` are discarded.

    Args:
        rigol: Object exposing ``read_raw(num)`` that returns up to ``num``
            bytes (a ``vxi11.Instrument`` in this script).

    Returns:
        The payload bytes.  If the instrument stops sending early, the
        shorter payload received so far is returned.

    Raises:
        ValueError: If no ``#`` is found within the first 66 bytes, or the
            header digits do not describe a definite-length block.
    """
    buf = rigol.read_raw(2)
    start = buf.find(b"#")
    if start < 0:
        # Header not in the first two bytes: look a little further ahead.
        buf += rigol.read_raw(64)
        start = buf.find(b"#")
        if start < 0:
            raise ValueError("no IEEE 488.2 block header ('#') in response")
    # Keep everything from '#' onwards.  The look-ahead may already contain
    # the length digits and part of the payload, so nothing after the
    # header may be thrown away.
    buf = buf[start:]

    def _fill(total: int) -> None:
        """Grow ``buf`` to ``total`` bytes; give up on an empty read."""
        nonlocal buf
        while len(buf) < total:
            chunk = rigol.read_raw(total - len(buf))
            if not chunk:
                break
            buf += chunk

    _fill(2)
    ndigits = int(buf[1:2])
    if ndigits == 0:
        raise ValueError("indefinite-length block ('#0') is not supported")
    header_len = 2 + ndigits
    _fill(header_len)
    nbytes = int(buf[2:header_len])
    end = header_len + nbytes
    _fill(end)

    if len(buf) <= end:
        # Best effort: swallow the optional terminator so it does not
        # pollute the next read.  Instruments that send none make this
        # read fail, which is harmless here.
        try:
            rigol.read_raw(1)
        except Exception:
            pass
    return buf[header_len:end]
def setup_rigol(rigol) -> None:
    """Configure Rigol CH1 once, over SCPI, for capturing the 1V8 rail.

    Acquisition is stopped, the channel / timebase / trigger settings are
    written from the RIGOL_* constants, and acquisition is restarted.

    Args:
        rigol: Instrument object exposing ``write(cmd)``.
    """
    rigol.write(":STOP")
    time.sleep(0.2)

    settings = (
        # Vertical: CH1 on, DC coupled, probe ratio, scale and offset.
        ":CHANnel1:DISPlay 1",
        ":CHANnel1:COUPling DC",
        f":CHANnel1:PROBe {RIGOL_PROBE}",
        f":CHANnel1:SCALe {RIGOL_V_SCALE:.3f}",
        f":CHANnel1:OFFSet {RIGOL_V_OFFSET:.3f}",
        ":CHANnel2:DISPlay 0",
        # Horizontal.
        f":TIMebase:MAIN:SCALe {RIGOL_TIMEBASE:.3E}",
        # Trigger: falling edge on CH1 at 1.76 V; AUTO sweep so a trace is
        # available even when the level is never crossed.
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGe:SOURce CHANnel1",
        ":TRIGger:EDGe:SLOPe NEGative",
        ":TRIGger:EDGe:LEVel 1.76",
        ":TRIGger:SWEep AUTO",
        ":ACQuire:MDEPth AUTO",
    )
    for command in settings:
        rigol.write(command)

    time.sleep(0.3)
    rigol.write(":RUN")
    time.sleep(0.2)
# Set once the first capture has printed its diagnostic dump.
_rail_diag_printed = False


def capture_rail(rigol, out_path: Path) -> tuple[float, float]:
    """:STOP → read CH1 (ASCII format) → :RUN.  Returns (vpp_mV, mean_V).

    ASCII format returns volts directly — sidesteps the BYTE-format
    YOrigin/YReference unit ambiguity in the Rigol manual.  Mirrors the
    proven rigol_scope.py approach used in mipi_test.py.

    The trace is written to ``out_path`` as two CSV columns (time, volts).
    Acquisition is restarted with ``:RUN`` on every exit path, including
    failures, so a bad read never leaves the scope frozen.

    Args:
        rigol: Instrument object exposing ``write(cmd)`` and ``ask(query)``.
        out_path: Destination CSV file.

    Returns:
        ``(peak-to-peak in mV, mean in V)`` of the captured samples.

    Raises:
        RuntimeError: If the scope returns no samples.
        Exception: Anything raised by the instrument I/O, by parsing the
            reply, or by writing the CSV is propagated unchanged.
    """
    global _rail_diag_printed
    rigol.write(":STOP")
    try:
        time.sleep(0.1)
        rigol.write(":WAVeform:SOURce CHANnel1")
        rigol.write(":WAVeform:FORMat ASC")  # Rigol DS1000Z uses ASC not ASCII
        rigol.write(":WAVeform:MODE NORM")
        time.sleep(0.05)
        pre = rigol.ask(":WAVeform:PREamble?").strip().split(",")
        xinc = float(pre[4])
        xorig = float(pre[5])
        raw = rigol.ask(":WAVeform:DATA?").strip()
        # Strip optional IEEE 488.2 block header: '#', a digit count, then
        # that many length digits.
        if raw.startswith("#"):
            ndig = int(raw[1])
            raw = raw[2 + ndig:]
        vals = [float(v) for v in raw.split(",") if v.strip()]
        if not vals:
            raise RuntimeError("Rigol returned no samples (channel disabled?)")
        volts = np.asarray(vals, dtype=np.float64)
        t = np.arange(len(volts)) * xinc + xorig
        # One-time diagnostic: dump preamble + raw sample range so we can
        # spot probe / channel-setting issues immediately.
        if not _rail_diag_printed:
            _rail_diag_printed = True
            print(f" [diag] Rigol preamble: pts={pre[2]} xinc={xinc:.2e} "
                  f"xorig={xorig:.2e}")
            print(f" [diag] first 5 samples (V): "
                  f"{[round(v, 4) for v in volts[:5].tolist()]}")
            print(f" [diag] sample range: "
                  f"min={volts.min():.4f} V, max={volts.max():.4f} V, "
                  f"n={len(volts)}")
        np.savetxt(out_path, np.column_stack([t, volts]),
                   delimiter=",", fmt="%.6e")
    except Exception:
        # Restart acquisition before reporting the failure.  A failure of
        # the restart itself is suppressed so the caller sees the root
        # cause rather than the secondary error.
        try:
            rigol.write(":RUN")
        except Exception:
            pass
        raise
    rigol.write(":RUN")
    return float((volts.max() - volts.min()) * 1000), float(volts.mean())
# ---------------------------------------------------------------------------
# Keysight DSO80204B (MIPI scope) I/O — mirrors flicker_watch.py LP_DAT mode
# ---------------------------------------------------------------------------
def _ks_drain_errors(scope) -> list[str]:
    """Pop entries from the scope's SCPI error queue and return them.

    ``:SYSTem:ERRor?`` is asked at most 20 times.  Draining stops early when
    the reply is empty, is ``0``, starts with ``0,`` or ``+0,`` (no error),
    or when the query itself raises.

    Args:
        scope: Instrument object exposing ``ask(query)``.

    Returns:
        The stripped error replies, in the order they were read.
    """
    drained: list[str] = []
    attempts_left = 20
    while attempts_left > 0:
        attempts_left -= 1
        try:
            reply = scope.ask(":SYSTem:ERRor?").strip()
        except Exception:
            # Best effort: an unreachable scope just ends the drain.
            return drained
        queue_empty = reply in ("", "0") or reply.startswith(("0,", "+0,"))
        if queue_empty:
            return drained
        drained.append(reply)
    return drained
def setup_keysight(scope) -> None:
    """Program the Keysight scope for segmented LP-mode capture.

    Phase 1 resets the scope and sets up all four channels (display, DC50
    input, probe ratio, label), pausing 40 ms after every command.
    Phase 2 applies the LP-mode vertical settings, a falling-edge trigger
    on CHANnel3 (DAT0+), the timebase, and segmented acquisition.
    The error queue is drained after each phase; the drained entries are
    not reported.

    Args:
        scope: Instrument object exposing ``write(cmd)`` and ``ask(query)``.
    """
    channel_labels = {1: "CLK+", 2: "CLK-", 3: "DAT0+", 4: "DAT0-"}

    # Phase 1: reset and basic channel setup.
    init_cmds = ["*RST", ":RUN", ":STOP", "*CLS"]
    for ch, label in channel_labels.items():
        init_cmds += [
            f":CHANnel{ch}:DISPlay ON",
            f":CHANnel{ch}:INPut DC50",
            f":CHANnel{ch}:PROBe {KS_PROBE}",
            f":CHANnel{ch}:LABel '{label}'",
        ]
    init_cmds += [
        ":TIMebase:REFerence CENTer",
        ":ACQuire:MODE RTIMe",
        ":ACQuire:INTerpolate ON",
    ]
    for command in init_cmds:
        scope.write(command)
        time.sleep(0.04)
    _ks_drain_errors(scope)

    # Phase 2: LP-mode channel offsets + falling-edge trigger on DAT0+.
    for ch in channel_labels:
        scope.write(f":CHANnel{ch}:SCALe {KS_LP_V_SCALE:.3f}")
        scope.write(f":CHANnel{ch}:OFFSet {KS_LP_V_OFFSET:.3f}")
    lp_cmds = (
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGE:SOURce CHANnel3",
        ":TRIGger:EDGE:SLOPe NEGative",
        f":TRIGger:EDGE:LEVel {KS_LP_TRIG_LEVEL:.3f}",
        ":TRIGger:SWEep NORMal",
        f":TIMebase:SCALe {KS_LP_SCALE:.3E}",
        f":ACQuire:POINts {KS_LP_POINTS}",
        f":TIMebase:POSition {KS_LP_TRIG_OFFSET:.2E}",
        ":ACQuire:MODE SEGMented",
        f":ACQuire:SEGMented:COUNt {KS_SEGMENT_COUNT}",
    )
    for command in lp_cmds:
        scope.write(command)
    time.sleep(0.4)
    _ks_drain_errors(scope)
def _ks_read_block(scope) -> bytes:
    """Read one IEEE 488.2 definite-length binary block from the scope.

    Layout: ``#``, one digit N, N decimal length digits, the payload, and
    an optional one-byte terminator.  Bytes received before the ``#`` are
    discarded.

    Args:
        scope: Object exposing ``read_raw(num)`` that returns up to ``num``
            bytes.

    Returns:
        The payload bytes (shorter than announced if the scope stops
        sending early).

    Raises:
        ValueError: If no ``#`` is found within the first 66 bytes, or the
            header digits do not describe a definite-length block.
    """
    received = scope.read_raw(2)
    hash_at = received.find(b"#")
    if hash_at < 0:
        # Header not in the first two bytes: look a little further ahead.
        received += scope.read_raw(64)
        hash_at = received.find(b"#")
    if hash_at < 0:
        raise ValueError("no IEEE 488.2 block header ('#') in response")
    # Drop only what precedes '#': the look-ahead can already hold length
    # digits and payload bytes, which must be kept.
    received = received[hash_at:]

    def _top_up(wanted: int) -> None:
        """Extend ``received`` to ``wanted`` bytes; stop on an empty read."""
        nonlocal received
        while len(received) < wanted:
            more = scope.read_raw(wanted - len(received))
            if not more:
                return
            received += more

    _top_up(2)
    ndigits = int(received[1:2])
    if ndigits == 0:
        raise ValueError("indefinite-length block ('#0') is not supported")
    payload_at = 2 + ndigits
    _top_up(payload_at)
    payload_end = payload_at + int(received[2:payload_at])
    _top_up(payload_end)

    terminator_already_read = len(received) > payload_end
    if not terminator_already_read:
        # Best effort: consume the optional trailing terminator.  A scope
        # that sends none makes this read fail, which is harmless here.
        try:
            scope.read_raw(1)
        except Exception:
            pass
    return received[payload_at:payload_end]
def keysight_arm(scope) -> None:
    """Start an acquisition by sending ``:DIGitize`` to the scope.

    Only the command is written here; nothing is read back, so this
    function itself does not wait for the acquisition to finish.

    Args:
        scope: Instrument object exposing ``write(cmd)``.
    """
    digitize_cmd = ":DIGitize"
    scope.write(digitize_cmd)
def keysight_wait_done(scope, timeout_s: float) -> bool:
    """Wait for the pending operation via ``*OPC?``.

    The instrument timeout is raised to ``timeout_s + 2`` for the query and
    always restored to its previous value afterwards.

    Args:
        scope: Instrument object with a ``timeout`` attribute and
            ``ask(query)``.
        timeout_s: How long to allow the operation to take, in seconds.

    Returns:
        True if the scope answered ``1``; False on any other answer or if
        the query raised (for example on timeout).
    """
    saved_timeout = scope.timeout
    try:
        scope.timeout = timeout_s + 2
        reply = scope.ask("*OPC?")
        finished = reply.strip() == "1"
    except Exception:
        finished = False
    finally:
        scope.timeout = saved_timeout
    return finished
def keysight_read_segments(scope, n_segments: int):
    """Read CLK+ (CH1) and DAT0+ (CH3) for every segment of the last capture.

    For each of the two channels the waveform source is selected, WORD /
    LSB-first transfer is requested, the X/Y scaling is queried once, and
    each segment is fetched with ``:WAVeform:DATA?`` and converted from raw
    16-bit codes using ``code * y_increment + y_origin``.

    Args:
        scope: Instrument object exposing ``write``, ``ask`` and ``read_raw``.
        n_segments: Number of segments to read.  The segment index is only
            selected explicitly when this is greater than 1.

    Returns:
        ``{"clk": {...}, "dat": {...}}`` where each value holds ``"times"``
        (time axis sized from the first segment) and ``"segs"`` (one float
        array per segment).
    """
    captured = {}
    for channel, key in ((1, "clk"), (3, "dat")):
        for command in (f":WAVeform:SOURce CHANnel{channel}",
                        ":WAVeform:FORMat WORD",
                        ":WAVeform:BYTeorder LSBFirst"):
            scope.write(command)
        x_step = float(scope.ask(":WAVeform:XINCrement?"))
        x_start = float(scope.ask(":WAVeform:XORigin?"))
        y_step = float(scope.ask(":WAVeform:YINCrement?"))
        y_start = float(scope.ask(":WAVeform:YORigin?"))

        select_segment = n_segments > 1
        traces = []
        for seg_no in range(1, n_segments + 1):
            if select_segment:
                scope.write(f":ACQuire:SEGMented:INDex {seg_no}")
            scope.write(":WAVeform:DATA?")
            # "<i2": little-endian signed 16-bit, matching WORD + LSBFirst.
            codes = np.frombuffer(_ks_read_block(scope), dtype="<i2")
            traces.append(codes.astype(np.float64) * y_step + y_start)

        n_points = len(traces[0]) if traces else 0
        captured[key] = {
            "times": np.arange(n_points) * x_step + x_start,
            "segs": traces,
        }
    return captured
def save_keysight_segments(segments: dict, out_dir: Path, base: str) -> int:
    """Write one two-column (time, volts) CSV per segment and channel.

    Files are named ``{base}_segNNN_clk.csv`` and ``{base}_segNNN_dat.csv``
    with NNN counting from 001.  The number of segments is taken from the
    ``"clk"`` entry.

    Args:
        segments: Mapping with ``"clk"`` and ``"dat"`` entries, each holding
            ``"times"`` and ``"segs"``.
        out_dir: Directory the CSV files are written to.
        base: File-name prefix.

    Returns:
        The number of CSV files written.
    """
    files_written = 0
    segment_total = len(segments["clk"]["segs"])
    for seg_no in range(1, segment_total + 1):
        for label in ("clk", "dat"):
            channel = segments[label]
            table = np.column_stack([channel["times"],
                                     channel["segs"][seg_no - 1]])
            target = out_dir / f"{base}_seg{seg_no:03d}_{label}.csv"
            np.savetxt(target, table, delimiter=",", fmt="%.6e")
            files_written += 1
    return files_written
# ---------------------------------------------------------------------------
# Video + SN65 helpers
# ---------------------------------------------------------------------------
def video_start(sess: requests.Session) -> None:
    """Ask the device to start the static-pink video pattern.

    Best effort: any failure is printed and swallowed so the caller carries
    on.  A 4xx/5xx reply from the device is treated as a failure too,
    instead of being silently taken as "video is running".

    Args:
        sess: HTTP session used for the PUT request.
    """
    try:
        resp = sess.put(VIDEO_URL,
                        json={"action": "start", "mode": "static-pink"},
                        timeout=3.0)
        # A transport-level success can still carry an error status.
        resp.raise_for_status()
    except Exception as e:
        print(f" video START failed: {e}")
def video_stop(sess: requests.Session) -> None:
    """Ask the device to stop video output.

    Best effort: any failure is printed and swallowed so shutdown paths can
    call this safely.  A 4xx/5xx reply from the device is reported as a
    failure instead of being silently taken as "video is off".

    Args:
        sess: HTTP session used for the PUT request.
    """
    try:
        resp = sess.put(VIDEO_URL, json={"action": "stop"}, timeout=3.0)
        # A transport-level success can still carry an error status.
        resp.raise_for_status()
    except Exception as e:
        print(f" video STOP failed: {e}")
def extract_state(data: dict | None) -> dict:
    """Flatten an SN65 register reply into one dict of status fields.

    Reads ``data["registers"]["csr_0a"]`` and ``["csr_e5"]``.  Anything that
    is missing or falsy along the way (including ``data`` itself) yields
    ``None`` for the fields that depend on it.

    Args:
        data: Decoded JSON reply from the SN65 endpoint, or None.

    Returns:
        Dict with the raw ``csr_0a`` / ``csr_e5`` values, the ``pll_lock``
        and ``clk_det`` fields of csr_0a, and ``pll_unlock`` plus the five
        ``cha_*_err`` fields of csr_e5.
    """
    registers = {}
    if data:
        registers = data.get("registers", {}) or {}
    lock_reg = registers.get("csr_0a") or {}
    err_reg = registers.get("csr_e5") or {}

    state = {
        "csr_0a": lock_reg.get("value"),
        "csr_e5": err_reg.get("value"),
    }
    for field in ("pll_lock", "clk_det"):
        state[field] = lock_reg.get(field)
    for field in ("pll_unlock", "cha_sot_bit_err", "cha_llp_err",
                  "cha_ecc_err", "cha_lp_err", "cha_crc_err"):
        state[field] = err_reg.get(field)
    return state
def observe_window(sess: requests.Session, duration_s: float) -> tuple[list, list]:
    """Poll the SN65 endpoint for ``duration_s`` seconds, paced at POLL_DT_S.

    Every poll adds one record to the first list: ``{"ts", "state"}`` on
    success or ``{"ts", "error"}`` when the request or decoding failed.
    Each pll_lock True→False transition adds ``{"ts", "iso"}`` to the second
    list (timestamps only — recovery times are paired up afterwards).
    Polls where pll_lock is None do not change the remembered lock state.

    Args:
        sess: HTTP session used for the GET requests.
        duration_s: Length of the polling window in seconds.

    Returns:
        ``(all_samples, unlocks)``.
    """
    readings: list = []
    unlock_events: list = []
    prev_lock: bool | None = None
    deadline = time.time() + duration_s
    while time.time() < deadline:
        tick = time.time()
        try:
            resp = sess.get(SN65_EP, timeout=HTTP_TO_S)
            resp.raise_for_status()
            state = extract_state(resp.json())
            readings.append({"ts": tick, "state": state})
            lock_now = state["pll_lock"]
            if prev_lock is True and lock_now is False:
                stamp = datetime.fromtimestamp(tick).strftime("%H:%M:%S.%f")[:-3]
                unlock_events.append({"ts": tick, "iso": stamp})
            if lock_now is not None:
                prev_lock = lock_now
        except Exception as exc:
            readings.append({"ts": tick, "error": str(exc)})
        # Sleep off whatever is left of this polling slot.
        spare = POLL_DT_S - (time.time() - tick)
        if spare > 0:
            time.sleep(spare)
    return readings, unlock_events
class SN65Poller(threading.Thread):
    """
    Background SN65 poller — runs for the full duration of a trial
    (video_start … video_stop) so we never have a coverage gap.

    Uses its own requests.Session because requests.Session isn't
    thread-safe for sharing with the main thread's HTTP calls.  The session
    is closed when run() returns, so an instance polls exactly once.

    ``samples`` and ``unlocks`` are only mutated while holding the internal
    lock; read them through snapshot().
    """

    def __init__(self):
        super().__init__(daemon=True)
        self._sess = requests.Session()
        self._stop_evt = threading.Event()  # NOT _stop: Thread uses that
        self._lock = threading.Lock()
        # One record per poll: {"ts", "state"} or {"ts", "error"}.
        self.samples: list = []
        # One {"ts", "iso"} record per pll_lock True→False transition.
        self.unlocks: list = []

    def request_stop(self):
        """Ask the polling loop to finish; returns immediately."""
        self._stop_evt.set()

    def snapshot(self) -> tuple[list, list]:
        """Return shallow copies of (samples, unlocks) so the main thread can
        keep mutating them safely after the poller has stopped."""
        with self._lock:
            return list(self.samples), list(self.unlocks)

    def run(self):
        """Poll SN65_EP every POLL_DT_S seconds until request_stop()."""
        last_pll: bool | None = None
        try:
            while not self._stop_evt.is_set():
                t0 = time.time()
                unlock = None
                try:
                    r = self._sess.get(SN65_EP, timeout=HTTP_TO_S)
                    r.raise_for_status()
                    state = extract_state(r.json())
                    pll = state["pll_lock"]
                    rec = {"ts": t0, "state": state}
                    if last_pll is True and pll is False:
                        unlock = {
                            "ts": t0,
                            "iso": datetime.fromtimestamp(t0)
                            .strftime("%H:%M:%S.%f")[:-3],
                        }
                    # A None reading carries no information; keep the last
                    # known lock state for edge detection.
                    if pll is not None:
                        last_pll = pll
                except Exception as e:
                    rec = {"ts": t0, "error": str(e)}
                # Publish the sample and its unlock event together so that
                # snapshot() never sees one without the other.
                with self._lock:
                    self.samples.append(rec)
                    if unlock is not None:
                        self.unlocks.append(unlock)
                remaining = POLL_DT_S - (time.time() - t0)
                if remaining > 0:
                    # Event.wait instead of sleep: a stop request ends the
                    # pause immediately.
                    self._stop_evt.wait(remaining)
        finally:
            self._sess.close()
def prompt_label(default: str = "g") -> str:
    """Ask the operator for a trial label until a valid one is entered.

    The answer is stripped and lower-cased.  An empty answer selects
    ``default``; end-of-input is treated as quit.

    Args:
        default: Choice used when the operator just presses Enter.

    Returns:
        One of ``"f"``, ``"g"``, ``"s"`` or ``"q"``.
    """
    valid_choices = ("f", "g", "s", "q")
    while True:
        try:
            reply = input("\n label? [f]licker / [g]ood / [s]kip / [q]uit: ")
        except EOFError:
            return "q"
        choice = reply.strip().lower() or default
        if choice in valid_choices:
            return choice
        print(f" not understood ('{choice}') — try again")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Run labelled flicker trials in a loop until the operator quits.

    Parses the command line, creates a timestamped session directory under
    DATA_ROOT, checks that the SN65 endpoint answers, optionally connects
    and configures the Rigol and Keysight scopes, then repeats trials.

    Each trial: start a background SN65Poller, start video, arm the
    Keysight, sleep for the observe window, capture the rail trace, read
    the Keysight segments, stop video, stop the poller, pair PLL unlocks
    with their recoveries, prompt for a label, and write the trial JSON
    plus one summary.csv row.  A skipped trial writes nothing and its
    capture files are removed.

    Exits via ``sys.exit``: status 1 if the SN65 endpoint is unreachable at
    start-up, status 0 on [q]uit, SIGINT or SIGTERM.  Video, the summary
    file and the Rigol run state are also restored if the loop dies with
    an unexpected exception.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--observe-s", type=float, default=OBSERVE_S,
                    help=f"observe window per trial in seconds (default {OBSERVE_S})")
    ap.add_argument("--no-rigol", action="store_true",
                    help="skip Rigol rail capture (useful if scope not connected)")
    ap.add_argument("--no-keysight", action="store_true",
                    help="skip Keysight MIPI capture (useful if scope not connected)")
    args = ap.parse_args()

    session_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    session_dir = DATA_ROOT / session_ts
    session_dir.mkdir(parents=True, exist_ok=True)
    summary_path = session_dir / "summary.csv"
    print(f"TRIAL RUNNER — session {session_ts}")
    print(f" output: {session_dir.relative_to(DATA_ROOT.parent.parent)}")
    print(f" observe: {args.observe_s:.1f} s")
    print(f" SN65 endpoint: {SN65_EP}")

    sess = requests.Session()
    # Verify SN65 endpoint
    try:
        sess.get(SN65_EP, timeout=2.0).raise_for_status()
        print(f" SN65: reachable")
    except Exception as e:
        print(f" *** SN65 endpoint failed: {e} ***")
        sys.exit(1)

    # Both scopes are optional: a connection failure only disables the
    # corresponding capture.
    rigol = None
    if not args.no_rigol:
        try:
            rigol = vxi11.Instrument(RIGOL_IP)
            rigol.timeout = RIGOL_TO_S
            idn = rigol.ask("*IDN?").strip()
            print(f" Rigol: {idn}")
            setup_rigol(rigol)
            print(f" CH1 configured: {RIGOL_V_SCALE*1000:.0f} mV/div, "
                  f"offset {RIGOL_V_OFFSET:.2f} V, {RIGOL_TIMEBASE*1000:.1f} ms/div")
        except Exception as e:
            print(f" Rigol unreachable ({e}) — continuing without rail capture")
            rigol = None
    else:
        print(f" Rigol: disabled (--no-rigol)")

    scope = None
    if not args.no_keysight:
        try:
            scope = vxi11.Instrument(KEYSIGHT_IP)
            scope.timeout = KEYSIGHT_TO_S
            idn = scope.ask("*IDN?").strip()
            print(f" Keysight: {idn}")
            setup_keysight(scope)
            print(f" LP_DAT segmented, {KS_SEGMENT_COUNT} segs/acquire, "
                  f"{KS_LP_POINTS} pts × {KS_LP_SCALE*1e6:.0f} µs/div")
        except Exception as e:
            print(f" Keysight unreachable ({e}) — continuing without MIPI capture")
            scope = None
    else:
        print(f" Keysight: disabled (--no-keysight)")

    # Open summary CSV (closed by _cleanup on every exit path).
    sf = open(summary_path, "w", newline="")
    sw = csv.writer(sf)
    sw.writerow(["trial", "iso", "label", "n_unlocks",
                 "min_unlock_ms", "med_unlock_ms", "max_unlock_ms",
                 "rail_vpp_mV", "rail_mean_V",
                 "n_keysight_segs", "json_file"])
    sf.flush()

    poller = None
    cleaned_up = False

    def _cleanup() -> None:
        """Best-effort teardown; safe to call more than once."""
        nonlocal cleaned_up
        if cleaned_up:
            return
        cleaned_up = True
        if poller is not None:
            poller.request_stop()
        try:
            video_stop(sess)
        except Exception:
            pass
        try:
            sf.close()
        except Exception:
            pass
        if rigol is not None:
            try:
                rigol.write(":RUN")
            except Exception:
                pass

    def _shutdown(*_):
        """Signal / quit handler: tear down, then exit with status 0."""
        _cleanup()
        print("\nshutting down — video off, Rigol restored to RUN")
        sys.exit(0)

    signal.signal(signal.SIGINT, _shutdown)
    signal.signal(signal.SIGTERM, _shutdown)
    print("\n Watch the display during each observe window, then label the trial.")
    print()

    trial = 0
    try:
        while True:
            trial += 1
            trial_iso = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Common prefix of the capture files written before labelling.
            trial_base = f"trial_{trial:04d}_{trial_iso}"
            print(f"=== TRIAL {trial:04d} {trial_iso} ===", flush=True)

            # Start background SN65 poller — runs continuously through the
            # entire trial (observe + Rigol read + MIPI read + video_stop)
            # so we don't miss any unlock that falls in the
            # readout/transition phases.
            poller = SN65Poller()
            poller.start()

            # 1) start video
            print(f" video START", flush=True)
            video_start(sess)
            t_video_on = time.time()

            # 2a) Kick off Keysight acquire (non-blocking — runs in scope
            # memory).
            if scope is not None:
                try:
                    keysight_arm(scope)
                except Exception as e:
                    print(f" Keysight arm FAILED: {e}", flush=True)

            # 2b) Observe phase — main thread just sleeps while poller does
            # its job
            print(f" observing for {args.observe_s:.0f} s ...", flush=True)
            time.sleep(args.observe_s)

            # 3) Rigol rail snapshot — poller continues in background
            vpp_mV = mean_V = None
            rail_path = None
            if rigol is not None:
                rail_path = session_dir / f"{trial_base}_rail.csv"
                try:
                    vpp_mV, mean_V = capture_rail(rigol, rail_path)
                    print(f" rail: Vpp={vpp_mV:.1f} mV mean={mean_V:.3f} V", flush=True)
                except Exception as e:
                    print(f" rail capture FAILED: {e}", flush=True)
                    rail_path = None

            # 4) Read Keysight segments (poller continues in background)
            n_ks_segs = 0
            if scope is not None:
                try:
                    if keysight_wait_done(scope, timeout_s=5.0):
                        segs = keysight_read_segments(scope, KS_SEGMENT_COUNT)
                        base = f"{trial_base}_mipi"
                        n_ks_segs = save_keysight_segments(segs, session_dir, base)
                        print(f" MIPI: {n_ks_segs} segments saved "
                              f"(base {base}_segNNN_clk.csv / _dat.csv)", flush=True)
                    else:
                        print(f" Keysight acquisition didn't complete in time", flush=True)
                except Exception as e:
                    print(f" Keysight read FAILED: {e}", flush=True)

            # 5) stop video — poller still running so we catch any unlock at
            # the moment of video stop (which we missed in the previous
            # design)
            print(f" video STOP", flush=True)
            video_stop(sess)
            # Brief tail so the post-stop transition is included in the poll
            # window
            time.sleep(0.5)

            # Stop poller and harvest its data
            poller.request_stop()
            poller.join(timeout=2.0)
            samples, unlocks = poller.snapshot()
            n_errors = sum(1 for s in samples if "error" in s)
            n_none = sum(1 for s in samples
                         if "state" in s and s["state"].get("pll_lock") is None)
            print(f" SN65 polled: {len(samples)} samples "
                  f"(over ~{args.observe_s + 6:.0f}s) "
                  f"errors={n_errors} None={n_none}", flush=True)

            # Pair unlocks with their recovery times for pulse-width
            # measurement.  An unlock that never re-locks before polling
            # stops has no duration, but it still counts as an unlock.
            relock_ts = [s["ts"] for s in samples
                         if "state" in s and s["state"].get("pll_lock") is True]
            unlock_pairs = []
            n_unrecovered = 0
            for u in unlocks:
                recovered_at = next((ts for ts in relock_ts if ts > u["ts"]), None)
                if recovered_at is None:
                    n_unrecovered += 1
                    continue
                unlock_pairs.append({"start_ts": u["ts"],
                                     "start_iso": u["iso"],
                                     "duration_ms": (recovered_at - u["ts"]) * 1000.0})
            durs = sorted(p["duration_ms"] for p in unlock_pairs)
            if durs:
                n = len(durs)
                mn, md, mx = durs[0], durs[n//2], durs[-1]
                print(f" unlocks: {len(unlocks)} durations: "
                      f"min={mn:.1f}ms med={md:.1f}ms max={mx:.1f}ms", flush=True)
            else:
                mn = md = mx = None
                print(f" unlocks: {len(unlocks)}", flush=True)
            if n_unrecovered:
                print(f" {n_unrecovered} unlock(s) had not re-locked when polling stopped",
                      flush=True)

            # 6) prompt for label
            label_short = prompt_label()
            if label_short == "q":
                _shutdown()
            if label_short == "s":
                # The trial is discarded and its number reused, so remove
                # the rail / MIPI CSVs already written for it.
                for stale in session_dir.glob(f"{trial_base}_*.csv"):
                    try:
                        stale.unlink()
                    except OSError as e:
                        print(f" could not remove {stale.name}: {e}")
                print(f" skipped (no save)")
                time.sleep(PAUSE_BETWEEN_S)
                trial -= 1  # don't number this one
                continue
            label = {"f": "flicker", "g": "good"}[label_short]

            # 7) save trial JSON + summary row
            json_path = session_dir / f"trial_{trial:04d}_{label}_{trial_iso}.json"
            trial_data = {
                "trial": trial,
                "session_ts": session_ts,
                "trial_ts": trial_iso,
                "label": label,
                "observe_s": args.observe_s,
                "t_video_on": t_video_on,
                "n_samples": len(samples),
                "n_unlocks": len(unlocks),
                "n_unrecovered_unlocks": n_unrecovered,
                "unlock_pairs": unlock_pairs,
                "samples": samples,
                "rail_csv": rail_path.name if rail_path else None,
                "rail_vpp_mV": vpp_mV,
                "rail_mean_V": mean_V,
                "n_keysight_segs": n_ks_segs,
                "keysight_basename": f"trial_{trial:04d}_{trial_iso}_mipi" if n_ks_segs else None,
            }
            json_path.write_text(json.dumps(trial_data, indent=2, default=str))
            print(f" saved {json_path.name}", flush=True)
            sw.writerow([trial, trial_iso, label, len(unlocks),
                         f"{mn:.1f}" if mn is not None else "",
                         f"{md:.1f}" if md is not None else "",
                         f"{mx:.1f}" if mx is not None else "",
                         f"{vpp_mV:.1f}" if vpp_mV is not None else "",
                         f"{mean_V:.3f}" if mean_V is not None else "",
                         n_ks_segs,
                         json_path.name])
            sf.flush()
            time.sleep(PAUSE_BETWEEN_S)
            print()  # blank line between trials
    finally:
        # Covers unexpected exceptions; a no-op after _shutdown().
        _cleanup()
if __name__ == "__main__":
main()