626 lines
23 KiB
Python
626 lines
23 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
flicker_burst.py — Press `f` when you observe flicker. The script then:

  1. Arms the Keysight DSO80204B for a large segmented MIPI capture (the
     LP_DAT trigger fires at line rate, ~48 kHz, so segments fill in ms).
  2. Polls the SN65 /sn65_registers endpoint continuously at ~50 Hz,
     recording every PLL state transition.
  3. Tails video_cycler.py's CSV log and stops capturing the moment the
     next video stop/start transition is observed (i.e. the end of the
     current video-on window), or after a safety timeout if no event
     arrives.
  4. Snapshots the 1V8 rail on the Rigol, reads out all Keysight segments
     and saves everything into the session folder, with a per-burst
     filename prefix, for offline signal-integrity / protocol analysis.

Run alongside video_cycler.py in another terminal:

  Terminal A:  python3 video_cycler.py     # provokes flicker
  Terminal B:  python3 flicker_burst.py    # this script
               (press `f` when you see flicker; `q` to quit)

Output:
  data/flicker_bursts/{session_ts}/
    burst_NNNN_{ts}_pll_samples.json
    burst_NNNN_{ts}_rail.csv
    burst_NNNN_{ts}_mipi_seg001_clk.csv ... segNNN_dat.csv
    burst_NNNN_{ts}_meta.json
    summary.csv
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import csv
|
||
import json
|
||
import select
|
||
import signal
|
||
import sys
|
||
import termios
|
||
import time
|
||
import tty
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import numpy as np
|
||
import requests
|
||
import vxi11
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config
|
||
# ---------------------------------------------------------------------------
|
||
# --- Network endpoints -------------------------------------------------------
DEVICE_BASE = "http://192.168.45.8:5000"
SN65_EP = DEVICE_BASE + "/sn65_registers"   # register dump polled during a burst
KEYSIGHT_IP = "192.168.45.4"
RIGOL_IP = "192.168.45.5"

# --- Filesystem layout (relative to this script) -----------------------------
_SCRIPT_DIR = Path(__file__).parent
DATA_ROOT = _SCRIPT_DIR / "data" / "flicker_bursts"
CYCLE_LOG_DIR = _SCRIPT_DIR / "data" / "cycle_logs"

# --- Timing / timeouts (seconds) ---------------------------------------------
POLL_DT_S = 0.020        # SN65 polling period -> 50 Hz
HTTP_TO_S = 0.2          # per-request timeout for the SN65 endpoint
KEYSIGHT_TO_S = 60.0     # generous: large block reads can take a while
RIGOL_TO_S = 10.0

# --- Rigol CH1 (1V8 supply rail) ---------------------------------------------
# The timebase is chosen wide enough to bracket the whole burst window.
RIGOL_V_SCALE = 0.1      # V/div
RIGOL_V_OFFSET = -1.8    # V (puts 1.8 V at screen centre)
RIGOL_TIMEBASE = 1.0     # s/div -> 12 s window
RIGOL_PROBE = 10

# --- Keysight LP_DAT segmented capture ---------------------------------------
# Segments fill in milliseconds (line rate ~48 kHz x N segments); readout is
# the slow part because every segment costs one SCPI round-trip per channel.
# 500 segments took roughly 30 s to read out.
KS_LP_SCALE = 1e-6
KS_LP_POINTS = 50_000
KS_LP_TRIG_OFFSET = 9e-6
KS_LP_V_SCALE = 0.2
KS_LP_V_OFFSET = 0.6
KS_LP_TRIG_LEVEL = 0.6
KS_SEGMENT_COUNT = 100   # readout ~6 s (was 500 -> ~30 s)
KS_PROBE = 19.2

# --- Safety net --------------------------------------------------------------
# Upper bound on a single capture window, in case video_cycler isn't running
# or its log isn't being updated.
MAX_CAPTURE_S = 20.0

# Names of the csr_e5 error flags copied into every polled sample.
ERROR_BITS = (
    "pll_unlock",
    "cha_sot_bit_err",
    "cha_llp_err",
    "cha_ecc_err",
    "cha_lp_err",
    "cha_crc_err",
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Non-blocking keys
|
||
# ---------------------------------------------------------------------------
|
||
class KeyReader:
    """Context manager giving non-blocking, single-character reads from stdin.

    On entry the terminal attributes of stdin are saved and the terminal is
    switched to cbreak mode; on exit the saved attributes are restored.
    """

    def __enter__(self):
        stdin_fd = sys.stdin.fileno()
        self.fd = stdin_fd
        # Remember the current terminal settings so __exit__ can undo cbreak.
        self.old = termios.tcgetattr(stdin_fd)
        tty.setcbreak(stdin_fd)
        return self

    def get_key(self) -> str | None:
        """Return one pending character (lower-cased), or None if none is waiting."""
        readable, _, _ = select.select([sys.stdin], [], [], 0)
        if not readable:
            return None
        return sys.stdin.read(1).lower()

    def __exit__(self, *_):
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CSV-log tail for video_cycler
|
||
# ---------------------------------------------------------------------------
|
||
class CyclerLogTail:
    """
    Watch video_cycler.py's most-recent CSV log for new events.

    The file is re-opened on every check and tracked by byte offset, so no
    file-object buffering state is carried between checks. Only complete,
    newline-terminated lines are consumed: a line the writer has only half
    flushed is left in place and picked up on a later check instead of being
    parsed as a broken row.

    Args:
        log_dir: directory holding ``*_cycles.csv`` logs. Defaults to the
            module-level ``CYCLE_LOG_DIR``.
    """

    def __init__(self, log_dir: Path | None = None):
        self.log_dir: Path = CYCLE_LOG_DIR if log_dir is None else Path(log_dir)
        self.path: Path | None = None
        self.pos: int = 0   # byte offset we've read up to
        self._find_latest(initial=True)

    def _find_latest(self, initial: bool = False) -> bool:
        """Point ``self.path`` at the newest log (by sorted name).

        Returns False when no log exists. When the tracked file changes,
        ``self.pos`` is reset: to the current end of file on the initial call
        (only NEW events are wanted), to 0 for any later roll-over.
        """
        logs = sorted(self.log_dir.glob("*_cycles.csv")) if self.log_dir.exists() else []
        if not logs:
            return False
        latest = logs[-1]
        if self.path != latest:
            self.path = latest
            try:
                self.pos = latest.stat().st_size if initial else 0
            except FileNotFoundError:
                self.pos = 0
        return True

    @staticmethod
    def _parse_line(raw: bytes) -> dict | None:
        """Parse one CSV row into an event dict; None for header/blank/bad rows."""
        text = raw.decode("utf-8", errors="replace").strip()
        parts = [p.strip() for p in text.split(",")]
        if len(parts) < 4 or parts[0] == "iso":
            return None
        try:
            return {"iso": parts[0], "ts": float(parts[1]),
                    "event": parts[2], "cycle": int(parts[3])}
        except ValueError:
            return None

    def _read_new_event(self) -> dict | None:
        """Consume complete lines past ``self.pos`` until one parses as an event.

        ``self.pos`` is advanced past every complete line examined, so rows
        after the returned event remain available to the next call.
        """
        try:
            size = self.path.stat().st_size
        except OSError:
            return None
        if size < self.pos:
            # File was truncated or replaced in place: start over.
            self.pos = 0
        if size == self.pos:
            return None
        try:
            with open(self.path, "rb") as f:
                f.seek(self.pos)
                chunk = f.read(size - self.pos)
        except OSError:
            return None

        base = self.pos
        offset = 0
        while True:
            newline = chunk.find(b"\n", offset)
            if newline < 0:
                # Trailing partial line: leave it for a later check.
                return None
            line = chunk[offset:newline]
            offset = newline + 1
            self.pos = base + offset
            event = self._parse_line(line)
            if event is not None:
                return event

    def get_next_event(self, timeout_s: float) -> dict | None:
        """
        Wait up to timeout_s for the next start/stop event.

        With ``timeout_s <= 0`` the log is checked exactly once without
        sleeping. Returns {'iso','ts','event','cycle'} or None. Returns None
        immediately if no log file exists when called.
        """
        self._find_latest()
        if not self.path:
            return None

        deadline = time.time() + timeout_s
        while True:
            event = self._read_new_event()
            if event is not None:
                return event
            if timeout_s <= 0 or time.time() >= deadline:
                return None
            # The cycler may have rolled over to a new log while we wait.
            self._find_latest()
            time.sleep(0.05)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# SN65 extraction
|
||
# ---------------------------------------------------------------------------
|
||
def extract_state(data: dict | None) -> dict:
    """Flatten an SN65 register dump into one flat state dict.

    Reads ``data["registers"]["csr_0a"]`` and ``["csr_e5"]``; any level that
    is missing or falsy is treated as empty, so absent fields come back as
    None. The result holds the two raw ``value`` fields, ``pll_lock`` and
    ``clk_det`` from csr_0a, and one entry per name in ``ERROR_BITS`` taken
    from csr_e5.
    """
    registers = (data or {}).get("registers", {}) or {}
    status = registers.get("csr_0a") or {}
    errors = registers.get("csr_e5") or {}

    state = {
        "csr_0a": status.get("value"),
        "csr_e5": errors.get("value"),
        "pll_lock": status.get("pll_lock"),
        "clk_det": status.get("clk_det"),
    }
    state.update((bit, errors.get(bit)) for bit in ERROR_BITS)
    return state
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Rigol I/O (1V8 supply rail capture)
|
||
# ---------------------------------------------------------------------------
|
||
def setup_rigol(rigol) -> None:
    """Configure the Rigol for the CH1 rail capture and leave it running.

    Stops acquisition, sends the channel / timebase / trigger / memory-depth
    settings built from the ``RIGOL_*`` constants, then restarts acquisition.
    The short sleeps between the stop, the configuration and the run are kept
    from the original sequence.
    """
    rigol.write(":STOP")
    time.sleep(0.2)

    settings = (
        ":CHANnel1:DISPlay 1",
        ":CHANnel1:COUPling DC",
        f":CHANnel1:PROBe {RIGOL_PROBE}",
        f":CHANnel1:SCALe {RIGOL_V_SCALE:.3f}",
        f":CHANnel1:OFFSet {RIGOL_V_OFFSET:.3f}",
        ":CHANnel2:DISPlay 0",
        f":TIMebase:MAIN:SCALe {RIGOL_TIMEBASE:.3E}",
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGe:SOURce CHANnel1",
        ":TRIGger:EDGe:SLOPe NEGative",
        ":TRIGger:EDGe:LEVel 1.76",
        ":TRIGger:SWEep AUTO",
        ":ACQuire:MDEPth AUTO",
    )
    for command in settings:
        rigol.write(command)

    time.sleep(0.3)
    rigol.write(":RUN")
    time.sleep(0.2)
|
||
|
||
|
||
def capture_rail(rigol, out_path: Path) -> tuple[float, float]:
    """Stop the Rigol, dump the CH1 screen waveform to CSV, and restart it.

    The waveform is requested in ASCII / NORM mode; the time axis is rebuilt
    from preamble fields 4 (x increment) and 5 (x origin). Rows of
    ``time,volts`` are written to ``out_path``.

    Acquisition is restarted with ``:RUN`` on every exit path, including a
    failed query or unparsable reply, so one bad capture does not leave the
    scope frozen for the following bursts.

    Returns:
        ``(vpp_mV, mean_V)`` of the captured samples.

    Raises:
        RuntimeError: the instrument returned no samples.
        Exception: any I/O or parse error from the instrument exchange.
    """
    rigol.write(":STOP")
    time.sleep(0.1)
    try:
        rigol.write(":WAVeform:SOURce CHANnel1")
        rigol.write(":WAVeform:FORMat ASC")
        rigol.write(":WAVeform:MODE NORM")
        time.sleep(0.05)

        preamble = rigol.ask(":WAVeform:PREamble?").strip().split(",")
        xinc = float(preamble[4])
        xorig = float(preamble[5])

        raw = rigol.ask(":WAVeform:DATA?").strip()
        if raw.startswith("#"):
            # Skip the "#<n><n length digits>" block header before the ASCII data.
            ndig = int(raw[1])
            raw = raw[2 + ndig:]
        vals = [float(v) for v in raw.split(",") if v.strip()]
        if not vals:
            raise RuntimeError("Rigol returned no samples")

        volts = np.asarray(vals, dtype=np.float64)
        t = np.arange(len(volts)) * xinc + xorig
        np.savetxt(out_path, np.column_stack([t, volts]),
                   delimiter=",", fmt="%.6e")
    finally:
        rigol.write(":RUN")
    return float((volts.max() - volts.min()) * 1000), float(volts.mean())
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Keysight I/O
|
||
# ---------------------------------------------------------------------------
|
||
def _ks_drain(scope):
|
||
for _ in range(20):
|
||
try:
|
||
r = scope.ask(":SYSTem:ERRor?").strip()
|
||
except Exception:
|
||
return
|
||
if not r or r.startswith(("0,", "+0,")) or r == "0":
|
||
return
|
||
|
||
|
||
def setup_keysight(scope) -> None:
    """Reset the Keysight and configure it for the LP_DAT segmented capture.

    Phase 1 resets the scope and sets up all four channels (display, DC50
    input, probe factor, label) plus the base acquisition mode, pausing 40 ms
    after each command. Phase 2 applies the vertical, trigger (negative edge
    on channel 3), timebase and segmented-memory settings from the ``KS_*``
    constants. The instrument error queue is drained after each phase.
    """
    channel_labels = {1: "CLK+", 2: "CLK-", 3: "DAT0+", 4: "DAT0-"}

    # Phase 1: reset + per-channel front end.
    phase_one = ["*RST", ":RUN", ":STOP", "*CLS"]
    for ch, label in channel_labels.items():
        phase_one.extend([
            f":CHANnel{ch}:DISPlay ON",
            f":CHANnel{ch}:INPut DC50",
            f":CHANnel{ch}:PROBe {KS_PROBE}",
            f":CHANnel{ch}:LABel '{label}'",
        ])
    phase_one.extend([
        ":TIMebase:REFerence CENTer",
        ":ACQuire:MODE RTIMe",
        ":ACQuire:INTerpolate ON",
    ])
    for command in phase_one:
        scope.write(command)
        time.sleep(0.04)
    _ks_drain(scope)

    # Phase 2: vertical scaling, trigger, timebase and segmented memory.
    for ch in channel_labels:
        scope.write(f":CHANnel{ch}:SCALe {KS_LP_V_SCALE:.3f}")
        scope.write(f":CHANnel{ch}:OFFSet {KS_LP_V_OFFSET:.3f}")
    phase_two = (
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGE:SOURce CHANnel3",
        ":TRIGger:EDGE:SLOPe NEGative",
        f":TRIGger:EDGE:LEVel {KS_LP_TRIG_LEVEL:.3f}",
        ":TRIGger:SWEep NORMal",
        f":TIMebase:SCALe {KS_LP_SCALE:.3E}",
        f":ACQuire:POINts {KS_LP_POINTS}",
        f":TIMebase:POSition {KS_LP_TRIG_OFFSET:.2E}",
        ":ACQuire:MODE SEGMented",
        f":ACQuire:SEGMented:COUNt {KS_SEGMENT_COUNT}",
    )
    for command in phase_two:
        scope.write(command)
    time.sleep(0.4)
    _ks_drain(scope)
|
||
|
||
|
||
def _ks_read_block(scope) -> bytes:
|
||
head = scope.read_raw(2)
|
||
if not head.startswith(b"#"):
|
||
idx = head.find(b"#")
|
||
if idx < 0:
|
||
extra = scope.read_raw(64)
|
||
head += extra
|
||
idx = head.find(b"#")
|
||
head = head[idx:idx + 2]
|
||
ndigits = int(head[1:2])
|
||
length_bytes = scope.read_raw(ndigits)
|
||
nbytes = int(length_bytes)
|
||
data = b""
|
||
while len(data) < nbytes:
|
||
chunk = scope.read_raw(nbytes - len(data))
|
||
if not chunk:
|
||
break
|
||
data += chunk
|
||
try:
|
||
scope.read_raw(1)
|
||
except Exception:
|
||
pass
|
||
return data
|
||
|
||
|
||
def keysight_arm(scope) -> None:
    """Start an acquisition by sending ``:DIGitize`` to the scope.

    Only the command is written here; nothing is read back and completion is
    not awaited.
    """
    arm_command = ":DIGitize"
    scope.write(arm_command)
|
||
|
||
|
||
def keysight_read_segments(scope, n_segments: int, out_dir: Path,
                           base: str) -> int:
    """Read every segment of channels 1 ("clk") and 3 ("dat") into CSV files.

    For each channel the waveform source and WORD / LSB-first format are
    selected and the axis scaling is queried once; then each segment index
    1..n_segments is selected, fetched as a binary block, converted to volts
    and written as ``time,volts`` rows to
    ``{out_dir}/{base}_seg{NNN}_{label}.csv``.

    Returns:
        The number of "clk" segments written (the "dat" files are not
        counted separately).
    """
    clk_saved = 0
    for source, tag in ((1, "clk"), (3, "dat")):
        scope.write(f":WAVeform:SOURce CHANnel{source}")
        scope.write(":WAVeform:FORMat WORD")
        scope.write(":WAVeform:BYTeorder LSBFirst")

        # Axis scaling, queried once per channel in a fixed order.
        axis = {
            field: float(scope.ask(f":WAVeform:{field}?"))
            for field in ("XINCrement", "XORigin", "YINCrement", "YORigin")
        }

        for offset in range(n_segments):
            seg = offset + 1   # segment indices are 1-based
            scope.write(f":ACQuire:SEGMented:INDex {seg}")
            scope.write(":WAVeform:DATA?")
            block = _ks_read_block(scope)

            samples = np.frombuffer(block, dtype="<i2").astype(np.float64)
            volts = samples * axis["YINCrement"] + axis["YORigin"]
            times = np.arange(len(volts)) * axis["XINCrement"] + axis["XORigin"]

            target = out_dir / f"{base}_seg{seg:03d}_{tag}.csv"
            np.savetxt(target, np.column_stack([times, volts]),
                       delimiter=",", fmt="%.6e")
            if tag == "clk":
                clk_saved += 1
    return clk_saved
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Capture-and-poll cycle
|
||
# ---------------------------------------------------------------------------
|
||
def _iso_ms(ts: float) -> str:
    """Format epoch seconds as a local ``HH:MM:SS.mmm`` string."""
    return datetime.fromtimestamp(ts).strftime("%H:%M:%S.%f")[:-3]


def _poll_sn65_until_cycler_event(sess, cycler_tail: CyclerLogTail,
                                  t_press: float) -> tuple[list, list, dict | None]:
    """Poll the SN65 endpoint until a cycler event arrives or the window times out.

    Every poll appends either ``{"ts", "state"}`` or ``{"ts", "error"}`` to the
    sample list. A lock -> unlock transition of ``pll_lock`` is recorded as an
    unlock. The cycler log is checked (non-blocking) at most every 50 ms and
    the first event time-stamped after ``t_press`` ends the window.

    Returns ``(samples, unlocks, end_event)``; ``end_event`` is None on timeout.
    """
    samples: list = []
    unlocks: list = []
    last_pll = None
    end_event = None
    deadline = t_press + MAX_CAPTURE_S
    next_log_check = 0.0   # only check log every ~50 ms to keep poll rate high

    while time.time() < deadline:
        t0 = time.time()
        try:
            r = sess.get(SN65_EP, timeout=HTTP_TO_S)
            r.raise_for_status()
            state = extract_state(r.json())
        except Exception as e:
            samples.append({"ts": t0, "error": str(e)})
        else:
            samples.append({"ts": t0, "state": state})
            pll = state["pll_lock"]
            if last_pll is True and pll is False:
                unlocks.append({"ts": t0, "iso": _iso_ms(t0)})
            if pll is not None:
                last_pll = pll

        # Cheap check (non-blocking) of cycler log
        if t0 >= next_log_check:
            ev = cycler_tail.get_next_event(timeout_s=0.0)
            if ev is not None and ev["ts"] > t_press:
                end_event = ev
                break
            next_log_check = t0 + 0.05   # 20 Hz log check

        # Pace SN65 polling
        elapsed = time.time() - t0
        if elapsed < POLL_DT_S:
            time.sleep(POLL_DT_S - elapsed)

    return samples, unlocks, end_event


def _wait_for_digitize(scope, timeout_s: float = 10) -> str:
    """Ask ``*OPC?`` under a temporary timeout; return the reply, or "0" on error."""
    prev = scope.timeout
    try:
        scope.timeout = timeout_s
        return scope.ask("*OPC?").strip()
    except Exception:
        return "0"
    finally:
        scope.timeout = prev


def _pair_unlocks(unlocks: list, samples: list) -> list:
    """Pair each unlock with the first later sample that shows the PLL locked.

    Unlocks with no later locked sample inside the window produce no pair.
    """
    relock_times = [s["ts"] for s in samples
                    if "state" in s and s["state"].get("pll_lock") is True]
    pairs = []
    for u in unlocks:
        recovered_at = next((ts for ts in relock_times if ts > u["ts"]), None)
        if recovered_at is None:
            continue
        pairs.append({"start_ts": u["ts"], "start_iso": u["iso"],
                      "duration_ms": (recovered_at - u["ts"]) * 1000})
    return pairs


def capture_burst(sess, scope, rigol, cycler_tail: CyclerLogTail,
                  burst_n: int, session_dir: Path,
                  summary_writer) -> None:
    """One full burst: arm scope → poll SN65 → wait for cycler event →
    read MIPI segments → save everything.

    Args:
        sess: HTTP session used to poll ``SN65_EP``.
        scope: Keysight instrument, or None to skip the MIPI capture.
        rigol: Rigol instrument, or None to skip the rail snapshot.
        cycler_tail: tail of video_cycler's log; its next event ends the window.
        burst_n: burst index, used in file names and records.
        session_dir: directory receiving all files for this burst.
        summary_writer: ``csv.writer``-like object; one row is appended.

    ``n_unlocks`` in the JSON files, the summary row and the console output
    counts every observed lock -> unlock transition. ``unlock_pairs`` lists
    only those that re-locked inside the window; the remainder is reported as
    ``n_unrecovered_unlocks``.
    """
    t_press = time.time()
    pressed_at = datetime.fromtimestamp(t_press)
    iso_press = pressed_at.strftime("%H:%M:%S.%f")[:-3]
    ts_press = pressed_at.strftime("%Y%m%d_%H%M%S_%f")[:-3]
    base = f"burst_{burst_n:04d}_{ts_press}"
    print(f"\n [{iso_press}] FLICKER #{burst_n} — capture started", flush=True)

    # 1. Arm Keysight
    armed = False
    if scope is not None:
        try:
            keysight_arm(scope)
            armed = True
        except Exception as e:
            print(f" Keysight arm FAILED: {e}", flush=True)

    # 2. Poll SN65 in main thread while also tailing cycler log
    samples, unlocks, end_event = _poll_sn65_until_cycler_event(
        sess, cycler_tail, t_press)

    t_end = time.time()
    end_iso = _iso_ms(t_end)
    end_reason = ("cycler_event:" + end_event["event"]) if end_event else "timeout"
    print(f" [{end_iso}] capture window ended ({end_reason}) — "
          f"polled {len(samples)} samples in {t_end - t_press:.2f}s",
          flush=True)

    # 3a. Rigol 1V8 rail snapshot (fast — ~300 ms)
    rail_vpp_mV = rail_mean_V = None
    rail_path = None
    if rigol is not None:
        rail_path = session_dir / f"{base}_rail.csv"
        try:
            rail_vpp_mV, rail_mean_V = capture_rail(rigol, rail_path)
            print(f" rail: Vpp={rail_vpp_mV:.1f}mV mean={rail_mean_V:.3f}V "
                  f"({RIGOL_TIMEBASE*12:.0f}s window)", flush=True)
        except Exception as e:
            print(f" rail capture FAILED: {e}", flush=True)
            rail_path = None

    # 3b. Read Keysight segments
    n_segs = 0
    if scope is not None and not armed:
        # Whatever is in scope memory belongs to an earlier acquisition;
        # saving it under this burst's name would mislabel the data.
        print(" Keysight readout skipped — scope was not armed for this burst",
              flush=True)
    elif scope is not None:
        try:
            # Wait briefly for :DIGitize to complete (segments fill in ms at
            # line rate, but allow margin)
            opc = _wait_for_digitize(scope)
            if opc != "1":
                print(f" Keysight :DIGitize didn't complete (OPC={opc}) — "
                      f"attempting read anyway", flush=True)
            print(f" reading {KS_SEGMENT_COUNT} segments ×2 ch — be patient",
                  flush=True)
            t_read0 = time.time()
            n_segs = keysight_read_segments(
                scope, KS_SEGMENT_COUNT, session_dir, base + "_mipi")
            print(f" MIPI: {n_segs} segments saved "
                  f"(readout took {time.time() - t_read0:.1f}s)", flush=True)
        except Exception as e:
            print(f" Keysight read FAILED: {e}", flush=True)

    # 4. Pair unlocks with their recovery times
    unlock_pairs = _pair_unlocks(unlocks, samples)
    n_unlocks = len(unlocks)
    n_unrecovered = n_unlocks - len(unlock_pairs)

    # 5. Save samples + meta
    samples_path = session_dir / f"{base}_pll_samples.json"
    samples_path.write_text(json.dumps({
        "burst": burst_n,
        "t_press": t_press,
        "press_iso": iso_press,
        "t_end": t_end,
        "end_iso": end_iso,
        "end_reason": end_reason,
        "end_event": end_event,
        "duration_s": t_end - t_press,
        "n_samples": len(samples),
        "n_unlocks": n_unlocks,
        "n_unrecovered_unlocks": n_unrecovered,
        "unlock_pairs": unlock_pairs,
        "samples": samples,
    }, indent=2, default=str))

    meta_path = session_dir / f"{base}_meta.json"
    meta_path.write_text(json.dumps({
        "burst": burst_n,
        "t_press": t_press,
        "press_iso": iso_press,
        "t_end": t_end,
        "end_iso": end_iso,
        "end_reason": end_reason,
        "duration_s": t_end - t_press,
        "n_pll_samples": len(samples),
        "n_unlocks": n_unlocks,
        "n_unrecovered_unlocks": n_unrecovered,
        "mipi_basename": f"{base}_mipi" if n_segs else None,
        "n_mipi_segments": n_segs,
        "ks_lp_scale_s": KS_LP_SCALE,
        "ks_lp_points": KS_LP_POINTS,
        "rail_csv": rail_path.name if rail_path else None,
        "rail_vpp_mV": rail_vpp_mV,
        "rail_mean_V": rail_mean_V,
        "rail_window_s": RIGOL_TIMEBASE * 12,
    }, indent=2, default=str))

    summary_writer.writerow([burst_n, ts_press, iso_press, end_iso,
                             f"{t_end - t_press:.2f}", end_reason,
                             len(samples), n_unlocks, n_segs,
                             f"{rail_vpp_mV:.1f}" if rail_vpp_mV is not None else "",
                             f"{rail_mean_V:.3f}" if rail_mean_V is not None else "",
                             base])

    if n_unlocks:
        report = f" unlocks during burst: {n_unlocks} "
        durs = sorted(p["duration_ms"] for p in unlock_pairs)
        if durs:
            report += (f"min={durs[0]:.1f}ms med={durs[len(durs)//2]:.1f}ms "
                       f"max={durs[-1]:.1f}ms")
        if n_unrecovered:
            report += f" ({n_unrecovered} still unlocked at window end)"
        print(report, flush=True)
    else:
        print(" unlocks during burst: 0", flush=True)
    print(f" saved {base}_*", flush=True)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
def _close_quietly(resource) -> None:
    """Best-effort ``close()`` on an instrument or session; never raises."""
    if resource is None:
        return
    try:
        resource.close()
    except Exception:
        pass


def main() -> None:
    """Connect to the device and instruments, then run the key-driven capture loop.

    Exits with status 1 if the SN65 endpoint is unreachable. A Rigol or
    Keysight that fails to connect or configure is closed and skipped. `f`
    triggers one ``capture_burst``; `q`, SIGINT or SIGTERM shut down. The
    summary file, both instrument links and the HTTP session are released on
    every exit path.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--no-keysight", action="store_true",
                    help="SN65 polling only (skip MIPI capture)")
    ap.add_argument("--no-rigol", action="store_true",
                    help="skip Rigol 1V8 rail capture")
    args = ap.parse_args()

    session_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    session_dir = DATA_ROOT / session_ts
    session_dir.mkdir(parents=True, exist_ok=True)

    print(f"FLICKER BURST CAPTURE — session {session_ts}")
    print(f" output: {session_dir.relative_to(DATA_ROOT.parent.parent)}")

    sess = requests.Session()
    rigol = None
    scope = None
    try:
        try:
            sess.get(SN65_EP, timeout=2.0).raise_for_status()
            print(" SN65: reachable")
        except Exception as e:
            print(f" *** SN65 endpoint failed: {e} ***")
            sys.exit(1)

        if not args.no_rigol:
            try:
                rigol = vxi11.Instrument(RIGOL_IP)
                rigol.timeout = RIGOL_TO_S
                idn = rigol.ask("*IDN?").strip()
                print(f" Rigol: {idn}")
                setup_rigol(rigol)
                print(f" CH1 1V8 rail, {RIGOL_V_SCALE*1000:.0f} mV/div, "
                      f"{RIGOL_TIMEBASE:.1f} s/div ({RIGOL_TIMEBASE*12:.0f}s window)")
            except Exception as e:
                print(f" Rigol failed ({e}) — continuing without rail capture")
                # Release the half-open link instead of just dropping it.
                _close_quietly(rigol)
                rigol = None
        else:
            print(" Rigol: disabled (--no-rigol)")

        if not args.no_keysight:
            try:
                scope = vxi11.Instrument(KEYSIGHT_IP)
                scope.timeout = KEYSIGHT_TO_S
                idn = scope.ask("*IDN?").strip()
                print(f" Keysight: {idn}")
                setup_keysight(scope)
                print(f" LP_DAT segmented, {KS_SEGMENT_COUNT} segs/acq, "
                      f"{KS_LP_POINTS} pts × {KS_LP_SCALE*1e6:.0f} µs/div")
            except Exception as e:
                print(f" Keysight failed ({e}) — continuing without MIPI")
                _close_quietly(scope)
                scope = None
        else:
            print(" Keysight: disabled (--no-keysight)")

        cycler_tail = CyclerLogTail()
        if cycler_tail.path:
            print(f" cycler log: {cycler_tail.path.name} (tailing for STOP events)")
        else:
            print(f" cycler log: NOT FOUND — capture will use {MAX_CAPTURE_S}s timeout per burst")

        summary_path = session_dir / "summary.csv"
        with open(summary_path, "w", newline="") as sf:
            sw = csv.writer(sf)
            sw.writerow(["burst", "ts", "iso_press", "iso_end", "duration_s",
                         "end_reason", "n_pll_samples", "n_unlocks",
                         "n_mipi_segs", "rail_vpp_mV", "rail_mean_V", "basename"])
            sf.flush()

            def _shutdown(*_):
                # SystemExit unwinds the enclosing with/finally blocks, which
                # restore the terminal and close the file and instruments.
                print("\nshutting down")
                sys.exit(0)

            signal.signal(signal.SIGINT, _shutdown)
            signal.signal(signal.SIGTERM, _shutdown)

            print("\n Press `f` when you see flicker. `q` to quit.")
            print(" Each press triggers a capture window from now until video_cycler")
            print(f" next stops the video (or {MAX_CAPTURE_S:.0f}s timeout if no cycler).\n")

            burst_n = 0
            with KeyReader() as keys:
                while True:
                    key = keys.get_key()
                    if key == "q":
                        _shutdown()
                    elif key == "f":
                        burst_n += 1
                        capture_burst(sess, scope, rigol, cycler_tail,
                                      burst_n, session_dir, sw)
                        # Persist the row now so a later crash cannot lose it.
                        sf.flush()
                        print("\n ready for next press...\n", flush=True)
                    else:
                        time.sleep(0.05)
    finally:
        _close_quietly(scope)
        _close_quietly(rigol)
        _close_quietly(sess)


if __name__ == "__main__":
    main()
|