#!/usr/bin/env python3
"""
flicker_watch.py — Continuous LP capture during video on/off cycles.

Operator watches the display. Script keeps cycling the video stream on/off
and triggering LP captures in the background. Each capture is read back from
the scope over VXI-11 and written as per-segment CSVs into data/, where the
files accumulate until an event key archives them.

Keys (no Enter needed):
  f — flicker observed: archive + analyse the accumulated captures
  g — good baseline: archive the accumulated captures (no analysis)
  q — quit

Captures are organised under data/flicker/{event_ts}/ or data/good/{event_ts}/.
"""
|
||
|
||
import json
import os
import select
import shutil
import sys
import termios
import time
import tty
from datetime import datetime
from pathlib import Path

import numpy as np
import requests
import vxi11

from csv_preprocessor import analyze_lp_file
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config
|
||
# ---------------------------------------------------------------------------
|
||
# Network endpoints: the oscilloscope (VXI-11) and the device under test (HTTP).
SCOPE_IP = "192.168.45.4"
DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = DEVICE_BASE + "/video"

# Local capture tree, rooted next to this script.
DATA_DIR = Path(__file__).parent / "data"
FLICKER_DIR = DATA_DIR / "flicker"
GOOD_DIR = DATA_DIR / "good"

# Trigger mode:
#   "LP_DAT"     — falling-edge on DAT0+ (CH3) crossing 0.6 V. Fires on every
#                  LP-to-HS transition (≈ line rate, 48 kHz). Use to sample
#                  normal MIPI traffic and spot per-burst anomalies.
#   "CLK_GLITCH" — timeout trigger on CLK+ (CH1) staying HIGH > N ms. Fires
#                  *only* when the clock lane goes LP for longer than expected,
#                  i.e. an actual glitch. Pairs with sn65_monitor.py to
#                  capture the wire-side view of a PLL-unlock event.
TRIGGER_MODE = "LP_DAT"  # or "CLK_GLITCH"

# Increased from 1 ms to 100 ms. Earlier runs at 1 ms triggered on every
# V-blank (≈0.5/sec on this display) — far too often to be useful. The
# observed PLL-unlock event from sn65_monitor is ~150 ms, so 100 ms
# discriminates real unlocks from normal MIPI line/frame breaks.
CLK_GLITCH_HIGH_MS = 100.0  # CLK+ HIGH longer than this fires the trigger

# Capture window
#   LP_DAT mode: 1 µs/div × 20 div = 20 µs window (50k pts → 5 GSa/s)
#                NOTE(review): 50k pts over 20 µs works out to 2.5 GSa/s —
#                confirm the division count / sample rate on the scope.
#   CLK_GLITCH:  20 ms/div × 20 div = 400 ms window (200k pts → 500 kSa/s)
#                wide enough to bracket a 150 ms event with margin on both sides
if TRIGGER_MODE != "CLK_GLITCH":
    LP_SCALE = 1e-6
    LP_POINTS = 50_000
    LP_TRIG_OFFSET = 9e-6
    SEGMENT_COUNT = 100
else:
    LP_SCALE = 20e-3
    LP_POINTS = 200_000
    LP_TRIG_OFFSET = 0.0  # centre the trigger so we see before+after
    SEGMENT_COUNT = 1     # one big window per acquire is plenty

# Vertical setup and trigger threshold shared by both modes.
LP_V_SCALE = 0.2
LP_V_OFFSET = 0.6
LP_TRIG_LEVEL = 0.6

CYCLE_S = 10.0

# CLK_GLITCH triggers can take many seconds (or never come) — give it the full
# cycle. LP_DAT triggers fill 100 segments in well under a second.
if TRIGGER_MODE == "CLK_GLITCH":
    TRIG_TIMEOUT_S = CYCLE_S - 0.5
else:
    TRIG_TIMEOUT_S = max(SEGMENT_COUNT * 0.020 + 10.0, 15.0)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Scope setup
|
||
# ---------------------------------------------------------------------------
|
||
# Module-level VXI-11 handle used by every scope helper in this file.
# arm_and_wait() rebinds this name when it has to reconnect after a failure.
scope = vxi11.Instrument(SCOPE_IP)
scope.timeout = 30  # default I/O timeout; arm_and_wait() widens it temporarily
|
||
|
||
|
||
def _drain_scpi_errors(label: str = "") -> list[str]:
    """Empty the scope's SCPI error queue and return what was in it.

    Polls ``:SYSTem:ERRor?`` at most 20 times, stopping at the first
    "no error" reply (empty, ``0``, ``0,...`` or ``+0,...``) or at the first
    failed query.

    Args:
        label: When non-empty and errors were found, they are printed
            prefixed with this tag.

    Returns:
        The raw error strings, in the order the scope reported them.
    """
    collected: list[str] = []
    for _attempt in range(20):
        try:
            reply = scope.ask(":SYSTem:ERRor?").strip()
        except Exception:
            # Query failed (timeout / link problem): stop polling quietly.
            break
        queue_empty = reply in ("", "0") or reply.startswith(("0,", "+0,"))
        if queue_empty:
            break
        collected.append(reply)

    if label and collected:
        print(f" [{label}] SCPI errors: {collected}")
    return collected
|
||
|
||
|
||
def setup_scope() -> None:
    """Reset the scope and apply the one-time channel/acquisition setup.

    Prints the instrument identity (best effort), then sends the init
    command list one command at a time with a 50 ms gap, and finally reports
    any SCPI errors the scope queued up.
    """
    print("CONFIGURING SCOPE...")
    try:
        identity = scope.ask("*IDN?").strip()
    except Exception as e:
        print(f" IDN read failed: {e}")
    else:
        print(f" IDN: {identity}")

    channel_labels = ((1, "CLK+"), (2, "CLK-"), (3, "DAT0+"), (4, "DAT0-"))

    init_cmds = ["*RST", ":RUN", ":STOP", "*CLS"]
    for ch, label in channel_labels:
        # Same four settings for every probe channel; only the label differs.
        init_cmds.append(f":CHANnel{ch}:DISPlay ON")
        init_cmds.append(f":CHANnel{ch}:INPut DC50")
        init_cmds.append(f":CHANnel{ch}:PROBe 19.2")
        init_cmds.append(f":CHANnel{ch}:LABel '{label}'")
    init_cmds += [
        ":TIMebase:REFerence CENTer",
        ":TRIGger:MODE EDGE",
        ":ACQuire:MODE RTIMe",
        ":ACQuire:INTerpolate ON",
        ":DISPlay:LAYout STACKED",
    ]

    for command in init_cmds:
        scope.write(command)
        time.sleep(0.05)

    _drain_scpi_errors("setup_scope")
    print("SCOPE READY.")
|
||
|
||
|
||
def _read_ieee_block() -> bytes:
    """Read one IEEE 488.2 binary block from the scope and return its payload.

    Wire format: ``'#' <ndigits> <length> <data> [\\n]``. ``#0`` introduces an
    indefinite-length block that runs to the end of the response.

    Bytes that arrive ahead of the ``'#'`` are skipped. Any bytes that were
    read past the header while searching for it are kept and used as the
    start of the payload, so the data stays aligned.

    Returns:
        The payload bytes, without header and without the trailing newline.

    Raises:
        ValueError: no ``'#'`` header was found, or the header is not numeric.
        OSError: the scope stopped sending before the announced length
            was received.
    """
    pending = bytearray(scope.read_raw(2))
    start = pending.find(b"#")
    if start < 0:
        # Sometimes the response carries leading bytes; look a bit further.
        pending += scope.read_raw(64)
        start = pending.find(b"#")
    if start < 0:
        raise ValueError(
            f"no IEEE 488.2 block header in response: {bytes(pending[:32])!r}"
        )
    del pending[:start]

    def _fill(count: int) -> None:
        """Read from the scope until ``pending`` holds at least ``count`` bytes."""
        while len(pending) < count:
            chunk = scope.read_raw(count - len(pending))
            if not chunk:
                raise OSError(
                    f"scope stopped sending after {len(pending)} of {count} bytes"
                )
            pending.extend(chunk)

    _fill(2)
    ndigits = int(pending[1:2])

    if ndigits == 0:
        # "#0..." indicates indefinite-length; the block runs to end of message.
        body = bytes(pending[2:]) + scope.read_raw()
        # Drop only the single terminating newline: the payload is binary and
        # may legitimately end in 0x0A / 0x0D bytes.
        return body[:-1] if body.endswith(b"\n") else body

    header_len = 2 + ndigits
    _fill(header_len)
    nbytes = int(pending[2:header_len])
    total = header_len + nbytes
    _fill(total)

    if len(pending) == total:
        # Terminator not consumed yet: discard the trailing newline if present.
        try:
            scope.read_raw(1)
        except Exception:
            pass  # best effort — no terminator to read is not an error
    return bytes(pending[header_len:total])
|
||
|
||
|
||
def configure_for_lp() -> None:
    """Apply the LP capture setup: vertical, trigger, timebase, acquisition.

    The trigger is chosen by TRIGGER_MODE: a GLITch (pulse-width) trigger on
    CH1 for "CLK_GLITCH", otherwise a falling-edge trigger on CH3. SCPI
    errors raised along the way are printed.
    """
    for ch in range(1, 5):
        scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}")
        scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}")

    if TRIGGER_MODE != "CLK_GLITCH":
        # Edge trigger on falling DAT0+: fires on every LP-to-HS transition.
        edge_cmds = (
            ":TRIGger:MODE EDGE",
            ":TRIGger:EDGE:SOURce CHANnel3",
            ":TRIGger:EDGE:SLOPe NEGative",
            f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}",
        )
        for command in edge_cmds:
            scope.write(command)
    else:
        # Pulse-width (GLITch) trigger on the Infiniium A/B (firmware 5.x):
        # fires at the falling edge of a CH1 (CLK+) HIGH pulse longer than
        # CLK_GLITCH_HIGH_MS — i.e. CLK held LP-11 for an unusually long time.
        # The newer :TRIGger:TIMeout:* SCPI is rejected by this scope (-113).
        _drain_scpi_errors()  # start from a clean queue so errors are ours
        glitch_cmds = (
            ":TRIGger:MODE GLITch",
            ":TRIGger:GLITch:SOURce CHANnel1",
            ":TRIGger:GLITch:POLarity POSitive",
            ":TRIGger:GLITch:DIRection GREaterthan",
            f":TRIGger:GLITch:WIDTh {CLK_GLITCH_HIGH_MS * 1e-3:.3E}",
            f":TRIGger:GLITch:LEVel CHANnel1,{LP_TRIG_LEVEL:.3f}",
        )
        for command in glitch_cmds:
            scope.write(command)
        time.sleep(0.2)

        glitch_errs = _drain_scpi_errors()
        if glitch_errs:
            print(f" GLITch trigger setup SCPI errors: {glitch_errs}")

        # Read the settings back so the operator sees what the scope accepted.
        try:
            mode = scope.ask(":TRIGger:MODE?").strip()
            w = scope.ask(":TRIGger:GLITch:WIDTh?").strip()
            print(f" GLITch trigger: mode={mode} CLK+ HIGH > {float(w)*1000:.1f} ms")
        except Exception as e:
            print(f" GLITch trigger readback failed: {e}")

    horizontal_cmds = (
        ":TRIGger:SWEep NORMal",
        f":TIMebase:SCALe {LP_SCALE:.3E}",
        f":ACQuire:POINts {LP_POINTS}",
        f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}",
    )
    for command in horizontal_cmds:
        scope.write(command)

    if SEGMENT_COUNT > 1:
        scope.write(":ACQuire:MODE SEGMented")
        scope.write(f":ACQuire:SEGMented:COUNt {SEGMENT_COUNT}")
    else:
        scope.write(":ACQuire:MODE RTIMe")

    time.sleep(0.5)
    _drain_scpi_errors("configure_for_lp")
|
||
|
||
|
||
def arm_and_wait(timeout_s: float) -> bool:
    """Arm one acquisition with ``:DIGitize`` and block on ``*OPC?``.

    The scope I/O timeout is raised to ``timeout_s + 2`` for the duration of
    the call and restored afterwards. If anything fails (typically the
    trigger never firing), the link is closed, a fresh connection is opened
    into the module-level ``scope``, and ``:STOP`` is sent to it.

    Args:
        timeout_s: How long to wait for the acquisition to complete.

    Returns:
        True if ``*OPC?`` answered "1"; False on any failure.
    """
    global scope
    saved_timeout = scope.timeout
    try:
        scope.timeout = timeout_s + 2
        scope.write(":DIGitize")
        reply = scope.ask("*OPC?")
        return reply.strip() == "1"
    except Exception:
        # Trigger timed out or scope locked up — reconnect.
        try:
            scope.close()
        except Exception:
            pass
        time.sleep(1.0)

        scope = vxi11.Instrument(SCOPE_IP)
        scope.timeout = 30
        try:
            scope.write(":STOP")
        except Exception:
            pass
        return False
    finally:
        # Applies to whichever handle `scope` names now (possibly the new one).
        try:
            scope.timeout = saved_timeout
        except Exception:
            pass
|
||
|
||
|
||
def _fetch_channel_segments(channel: int, n_segments: int):
    """Download every segment of one channel via ``:WAVeform:DATA?``.

    Samples are requested as little-endian 16-bit words and scaled to volts
    with the scope's Y increment/origin. When ``n_segments == 1`` the
    ``:ACQuire:SEGMented:INDex`` select is skipped, since the scope may be in
    RTIMe (single-shot) mode rather than SEGMented mode.

    Args:
        channel: Scope channel number to read.
        n_segments: Number of segments to read (indices 1..n_segments).

    Returns:
        ``(times, segments)``: one time axis, sized from the first segment
        and shared by all, and a list with one voltage array per segment.
    """
    import numpy as np

    for command in (
        f":WAVeform:SOURce CHANnel{channel}",
        ":WAVeform:FORMat WORD",
        ":WAVeform:BYTeorder LSBFirst",
    ):
        scope.write(command)

    # Scaling factors are read once and reused for every segment.
    x_inc, x_org, y_inc, y_org = (
        float(scope.ask(f":WAVeform:{item}?"))
        for item in ("XINCrement", "XORigin", "YINCrement", "YORigin")
    )

    select_segment = n_segments > 1
    waveforms: list = []
    for index in range(1, n_segments + 1):
        if select_segment:
            scope.write(f":ACQuire:SEGMented:INDex {index}")
        scope.write(":WAVeform:DATA?")
        samples = np.frombuffer(_read_ieee_block(), dtype="<i2")
        waveforms.append(samples.astype(np.float64) * y_inc + y_org)

    n_points = len(waveforms[0]) if waveforms else 0
    time_axis = np.arange(n_points) * x_inc + x_org
    return time_axis, waveforms
|
||
|
||
|
||
def save_lp(base_name: str) -> tuple[bool, list[str]]:
    """Fetch CH1 (CLK) and CH3 (DAT) segments and write them as CSVs.

    One two-column CSV (time, volts) is written per segment and channel into
    DATA_DIR, named to match csv_preprocessor's expected pattern:
        {base_name}_seg{NNN}_{clk|dat}.csv

    Args:
        base_name: Filename stem for this acquisition.

    Returns:
        ``(ok, errs)``. ``ok`` is False when the fetch raised or no segment
        was written; ``errs`` carries the fetch error or the SCPI errors
        queued during the transfer.
    """
    import numpy as np

    _drain_scpi_errors()  # clear stale errors so the list below is ours
    try:
        clk_time, clk_waves = _fetch_channel_segments(1, SEGMENT_COUNT)
        dat_time, dat_waves = _fetch_channel_segments(3, SEGMENT_COUNT)
    except Exception as e:
        return (False, [f"fetch error: {e}"])

    scpi_errs = _drain_scpi_errors()

    written = 0
    for seg_no, (clk_volts, dat_volts) in enumerate(
            zip(clk_waves, dat_waves), start=1):
        stem = f"{base_name}_seg{seg_no:03d}"
        for suffix, t_axis, volts in (("clk", clk_time, clk_volts),
                                      ("dat", dat_time, dat_volts)):
            np.savetxt(DATA_DIR / f"{stem}_{suffix}.csv",
                       np.column_stack([t_axis, volts]),
                       delimiter=",", fmt="%.6e")
        written += 1

    if written == 0:
        return (False, scpi_errs or ["no segments written"])
    return (True, scpi_errs)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Non-blocking keyboard
|
||
# ---------------------------------------------------------------------------
|
||
class KeyReader:
|
||
def __enter__(self):
|
||
self.fd = sys.stdin.fileno()
|
||
self.old = termios.tcgetattr(self.fd)
|
||
tty.setcbreak(self.fd)
|
||
return self
|
||
|
||
def get_key(self) -> str | None:
|
||
if select.select([sys.stdin], [], [], 0)[0]:
|
||
return sys.stdin.read(1).lower()
|
||
return None
|
||
|
||
def __exit__(self, *_):
|
||
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Video control
|
||
# ---------------------------------------------------------------------------
|
||
def video_start() -> None:
    """Ask the device to start the "static-pink" video stream (best effort).

    Connection errors, timeouts and HTTP error statuses are all reported on
    stdout and never raised, so the capture loop keeps running.
    """
    try:
        resp = requests.put(VIDEO_URL,
                            json={"action": "start", "mode": "static-pink"},
                            timeout=3)
        # A 4xx/5xx reply does not raise on its own; surface it as a failure.
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f" VIDEO START failed: {e}")
|
||
|
||
|
||
def video_stop() -> None:
    """Ask the device to stop the video stream (best effort).

    Connection errors, timeouts and HTTP error statuses are all reported on
    stdout and never raised.
    """
    try:
        resp = requests.put(VIDEO_URL, json={"action": "stop"}, timeout=3)
        # A 4xx/5xx reply does not raise on its own; surface it as a failure.
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f" VIDEO STOP failed: {e}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Register snapshot from device (DSIM PHY + SN65DSI83)
|
||
# ---------------------------------------------------------------------------
|
||
def fetch_registers_snapshot(target_dir: Path, event_ts: str) -> None:
    """GET /registers + /sn65_registers, print key indicators, save JSON.

    Each endpoint is fetched independently; a failed one is stored as None.
    The combined result is written to ``{target_dir}/{event_ts}_registers.json``.
    Nothing is raised: fetch and save problems are printed, and unexpected
    payload shapes simply suppress the quick-look lines.

    Args:
        target_dir: Existing directory that receives the JSON file.
        event_ts: Timestamp string used as the filename prefix.
    """

    def _as_dict(value) -> dict:
        """Return ``value`` if it is a dict, else an empty dict."""
        return value if isinstance(value, dict) else {}

    combined: dict = {}
    for endpoint, key in [("/registers", "dsim"),
                          ("/sn65_registers", "sn65")]:
        try:
            r = requests.get(f"{DEVICE_BASE}{endpoint}", timeout=5)
            r.raise_for_status()
            combined[key] = r.json()
        except Exception as e:
            print(f" REGISTERS: {endpoint} failed — {e}")
            combined[key] = None

    # Quick-look indicators. Every level is shape-checked so that a null or
    # non-dict payload cannot raise and abort the caller's archive step.
    regs = _as_dict(_as_dict(combined.get("sn65")).get("registers"))
    csr_0a = _as_dict(regs.get("csr_0a"))
    csr_e5 = _as_dict(regs.get("csr_e5"))

    if csr_0a:
        pll_str = "LOCKED" if csr_0a.get("pll_lock") else "*** UNLOCKED ***"
        clk_str = "detected" if csr_0a.get("clk_det") else "NOT detected"
        print(f" SN65: PLL {pll_str} CLK {clk_str} (CSR 0x0A = {csr_0a.get('value')})")

    if csr_e5:
        flags = [
            ("pll_unlock", "PLL_UNLOCK"),
            ("cha_sot_bit_err", "SOT_BIT_ERR"),
            ("cha_llp_err", "LLP_ERR"),
            ("cha_ecc_err", "ECC_ERR"),
            ("cha_lp_err", "LP_ERR"),
            ("cha_crc_err", "CRC_ERR"),
        ]
        active = [label for k, label in flags if csr_e5.get(k)]
        if active:
            print(f" SN65: *** ERROR FLAGS: {', '.join(active)} "
                  f"(CSR 0xE5 = {csr_e5.get('value')}) ***")
        else:
            print(f" SN65: no error flags (CSR 0xE5 = {csr_e5.get('value')})")

    out = target_dir / f"{event_ts}_registers.json"
    try:
        out.write_text(json.dumps(combined, indent=2))
        print(f" registers → {out.relative_to(DATA_DIR.parent)}")
    except Exception as e:
        print(f" REGISTERS save failed: {e}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Event handling: archive recent captures and (for flicker) analyse
|
||
# ---------------------------------------------------------------------------
|
||
def archive_and_analyse(event: str, since_iso: str) -> None:
    """Archive the accumulated segment CSVs and, for flicker events, analyse them.

    Creates ``data/{flicker|good}/{event_ts}/``, saves a register snapshot
    from the device there, then moves every ``*.csv`` currently in DATA_DIR
    into it. For ``event == "flicker"`` each ``*_lp_*_dat.csv`` is run
    through ``analyze_lp_file`` and a summary is printed: the suspect count,
    any files that could not be analysed, and segments beyond fixed
    thresholds.

    Args:
        event: "flicker" selects FLICKER_DIR plus analysis; any other value
            archives under GOOD_DIR without analysis.
        since_iso: Currently unused — all CSVs in DATA_DIR are archived
            regardless of when they were captured. Kept for caller
            compatibility.
    """
    event_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = (FLICKER_DIR if event == "flicker" else GOOD_DIR) / event_ts
    target.mkdir(parents=True, exist_ok=True)

    print(f"\n *** {event.upper()} EVENT @ {event_ts} ***")

    # Register snapshot first (fast, before the file moves and analysis).
    fetch_registers_snapshot(target, event_ts)

    # Segment CSVs are already in DATA_DIR (written directly by save_lp via
    # SCPI binary read). Snapshot the listing before moving anything so the
    # directory is not modified while it is being iterated.
    moved = 0
    for f in list(DATA_DIR.glob("*.csv")):
        if f.is_file():
            shutil.move(str(f), target / f.name)
            moved += 1
    print(f" {moved} segment CSV(s) archived to {target.relative_to(DATA_DIR.parent)}")

    if event != "flicker":
        return

    def _maybe(cast, value):
        """Apply ``cast`` unless ``value`` is None."""
        return cast(value) if value is not None else None

    # Analyse every segment CSV. Flag outliers.
    print("\n Per-segment LP analysis:")
    rows = []
    for f in sorted(target.glob("*_lp_*_dat.csv")):
        try:
            m = analyze_lp_file(f)
            rows.append({
                "file": f.name,
                "lp_low": _maybe(float, m.lp_low_duration_ns),
                "hs_amp": _maybe(float, m.hs_amplitude_mv),
                "hs_dur": _maybe(float, m.hs_burst_dur_ns),
                "n_burst": _maybe(int, m.n_hs_bursts),
                "sus": bool(m.flicker_suspect),
            })
        except Exception as e:
            rows.append({"file": f.name, "error": str(e)})

    n_total = len(rows)
    n_sus = sum(1 for r in rows if r.get("sus"))
    print(f" {n_total} segments analysed ({n_sus} flagged as flicker_suspect)")

    # Make analysis failures visible instead of silently counting them.
    failed = [r for r in rows if "error" in r]
    if failed:
        print(f" {len(failed)} segment(s) could not be analysed:")
        for r in failed[:8]:
            print(f" {r['file']}: {r['error']}")
        if len(failed) > 8:
            print(f" ... +{len(failed) - 8} more")

    # Outlier search across the segments themselves.
    def _outliers(field: str, lo_thresh: float | None = None,
                  hi_thresh: float | None = None) -> list[dict]:
        """Rows whose ``field`` is below ``lo_thresh`` or above ``hi_thresh``.

        Each hit also carries the set's (upper) median for context.
        """
        vals = sorted(r[field] for r in rows if r.get(field) is not None)
        if not vals:
            return []
        med = vals[len(vals) // 2]
        out = []
        for r in rows:
            v = r.get(field)
            if v is None:
                continue
            far = (lo_thresh is not None and v < lo_thresh) or \
                  (hi_thresh is not None and v > hi_thresh)
            if far:
                out.append({"file": r["file"], field: v, "median": med})
        return out

    print("\n Anomalies vs segment-set median:")
    for label, field, lo, hi in [
        ("very-short LP-low (<50 ns)", "lp_low", 50, None),
        ("very-low HS amplitude (<50 mV)", "hs_amp", 50, None),
        ("very-high HS amplitude (>140 mV)", "hs_amp", None, 140),
        ("short HS burst (<8000 ns)", "hs_dur", 8000, None),
    ]:
        ax = _outliers(field, lo, hi)
        if ax:
            print(f" {label}: {len(ax)} segment(s)")
            for x in ax[:8]:
                print(f" {x['file']} {field}={x[field]:.1f} "
                      f"(set median={x['median']:.1f})")
            if len(ax) > 8:
                print(f" ... +{len(ax) - 8} more")
        else:
            print(f" {label}: none")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main loop
|
||
# ---------------------------------------------------------------------------
|
||
def main() -> None:
    """Run the capture loop until 'q' or Ctrl+C.

    Each cycle starts the video, repeatedly arms the scope and saves
    captures until CYCLE_S has elapsed or an event key is read, then stops
    the video. 'f' / 'g' archive the accumulated captures as flicker / good;
    'q' ends the loop. The video is always stopped on the way out.
    """
    for folder in (DATA_DIR, FLICKER_DIR, GOOD_DIR):
        folder.mkdir(exist_ok=True)

    setup_scope()
    configure_for_lp()

    print("\n" + "=" * 64)
    print(" FLICKER WATCH — keys: f=flicker g=good q=quit")
    print("=" * 64 + "\n")

    # These texts depend only on module constants, so build them once.
    if TRIGGER_MODE == "CLK_GLITCH":
        mode_desc = (f"CLK_GLITCH (CLK+ HIGH > {CLK_GLITCH_HIGH_MS:.1f} ms, "
                     f"{LP_SCALE * 20 * 1000:.0f} ms window)")
        tag = "CLK GLITCH"
        msg = "waiting for CLK glitch"
    else:
        mode_desc = f"LP_DAT ({SEGMENT_COUNT} segs/acquire)"
        tag = f"{SEGMENT_COUNT} segs"
        msg = "waiting for trigger"

    cycle = 0
    try:
        with KeyReader() as keys:
            while True:
                cycle += 1
                cycle_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
                cycle_caps = []
                cycle_end = time.time() + CYCLE_S

                def remaining() -> float:
                    """Seconds left in the current cycle, never negative."""
                    return max(0, cycle_end - time.time())

                video_start()
                print(f"\n[cycle {cycle:03d} {cycle_ts}] video ON "
                      f"({CYCLE_S:.0f}s window, {mode_desc})", flush=True)

                event = None
                last_tick = 0.0
                while time.time() < cycle_end:
                    seq = len(cycle_caps) + 1
                    base = f"{cycle_ts}_lp_c{cycle:03d}_{seq:02d}"

                    if not arm_and_wait(TRIG_TIMEOUT_S):
                        # Trigger timed out — print a heartbeat at most every 2s.
                        # In CLK_GLITCH mode this is the *normal* state: it just
                        # means no glitch happened during this cycle.
                        if time.time() - last_tick > 2.0:
                            print(f" ... {msg} "
                                  f"[{remaining():4.1f}s left]", flush=True)
                            last_tick = time.time()
                    else:
                        try:
                            ok, errs = save_lp(base)
                            if ok:
                                cycle_caps.append(base)
                                print(f" + acq {seq:02d} ({tag}) "
                                      f"[{remaining():4.1f}s left]",
                                      flush=True)
                            else:
                                print(f" ! acq {seq:02d} SAVE FAILED — "
                                      f"{errs[0][:80] if errs else 'unknown'}",
                                      flush=True)
                        except Exception as e:
                            print(f" save error: {e}", flush=True)

                    # Keys are polled once per acquisition attempt.
                    key = keys.get_key()
                    if key in ("f", "g", "q"):
                        event = key
                        break

                video_stop()
                if event is None:
                    print(f"[cycle {cycle:03d}] ended "
                          f"({len(cycle_caps)} acq(s) ≈ "
                          f"{len(cycle_caps) * SEGMENT_COUNT} segments, no event)",
                          flush=True)

                if event == "q":
                    print("\nQUIT requested.")
                    break
                if event in ("f", "g"):
                    archive_and_analyse(
                        "flicker" if event == "f" else "good", cycle_ts)

                # Brief pause before next cycle so video stop settles.
                time.sleep(0.5)

    except KeyboardInterrupt:
        print("\nInterrupted (Ctrl+C).")
    finally:
        try:
            video_stop()
        except Exception:
            pass
|
||
|
||
|
||
# Script entry point: run the capture loop only when executed directly.
if __name__ == "__main__":
    main()
|