Files
MiPi_TEST/flicker_watch.py
2026-05-15 16:32:15 +01:00

595 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
flicker_watch.py — Continuous LP capture during video on/off cycles.
Operator watches the display. Script keeps cycling the video stream on/off
and triggering LP captures in the background. Files accumulate on the scope
without being transferred (fast).
Keys (no Enter needed):
f — flicker observed: transfer + archive + analyse recent captures
g — good baseline: transfer + archive recent captures (no analysis)
q — quit
Captures are organised under data/flicker/{event_ts}/ or data/good/{event_ts}/.
"""
import json
import select
import shutil
import sys
import termios
import time
import tty
from datetime import datetime
from pathlib import Path
import numpy as np
import requests
import vxi11
from csv_preprocessor import analyze_lp_file
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
SCOPE_IP = "192.168.45.4"
DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = f"{DEVICE_BASE}/video"
DATA_DIR = Path(__file__).parent / "data"
FLICKER_DIR = DATA_DIR / "flicker"
GOOD_DIR = DATA_DIR / "good"
# Trigger mode:
# "LP_DAT" — falling-edge on DAT0+ (CH3) crossing 0.6 V. Fires on every
# LP-to-HS transition (≈ line rate, 48 kHz). Use to sample
# normal MIPI traffic and spot per-burst anomalies.
# "CLK_GLITCH" — timeout trigger on CLK+ (CH1) staying HIGH > N ms. Fires
# *only* when the clock lane goes LP for longer than expected,
# i.e. an actual glitch. Pairs with sn65_monitor.py to
# capture the wire-side view of a PLL-unlock event.
TRIGGER_MODE = "LP_DAT" # or "CLK_GLITCH"
# Increased from 1 ms to 100 ms. Earlier runs at 1 ms triggered on every
# V-blank (≈0.5/sec on this display) — far too often to be useful. The
# observed PLL-unlock event from sn65_monitor is ~150 ms, so 100 ms
# discriminates real unlocks from normal MIPI line/frame breaks.
CLK_GLITCH_HIGH_MS = 100.0 # CLK+ HIGH longer than this fires the trigger
# Capture window
# LP_DAT mode: 1 µs/div × 20 div = 20 µs window (50k pts → 5 GSa/s)
# CLK_GLITCH: 20 ms/div × 20 div = 400 ms window (200k pts → 500 kSa/s)
# wide enough to bracket a 150 ms event with margin on both sides
if TRIGGER_MODE == "CLK_GLITCH":
LP_SCALE = 20e-3
LP_POINTS = 200_000
LP_TRIG_OFFSET = 0.0 # centre the trigger so we see before+after
SEGMENT_COUNT = 1 # one big window per acquire is plenty
else:
LP_SCALE = 1e-6
LP_POINTS = 50_000
LP_TRIG_OFFSET = 9e-6
SEGMENT_COUNT = 100
LP_V_SCALE = 0.2
LP_V_OFFSET = 0.6
LP_TRIG_LEVEL = 0.6
CYCLE_S = 10.0
# CLK_GLITCH triggers can take many seconds (or never come) — give it the full
# cycle. LP_DAT triggers fill 100 segments in well under a second.
TRIG_TIMEOUT_S = CYCLE_S - 0.5 if TRIGGER_MODE == "CLK_GLITCH" \
else max(SEGMENT_COUNT * 0.020 + 10.0, 15.0)
# ---------------------------------------------------------------------------
# Scope setup
# ---------------------------------------------------------------------------
scope = vxi11.Instrument(SCOPE_IP)
scope.timeout = 30
def _drain_scpi_errors(label: str = "") -> list[str]:
"""Pop everything from the scope's error queue; return list of error strings."""
errs = []
for _ in range(20):
try:
r = scope.ask(":SYSTem:ERRor?").strip()
except Exception:
break
if not r or r.startswith("0,") or r.startswith("+0,") or r == "0":
break
errs.append(r)
if errs and label:
print(f" [{label}] SCPI errors: {errs}")
return errs
def setup_scope() -> None:
"""One-shot scope init — channels, math, default trigger."""
print("CONFIGURING SCOPE...")
try:
idn = scope.ask("*IDN?").strip()
print(f" IDN: {idn}")
except Exception as e:
print(f" IDN read failed: {e}")
cmds = [
"*RST", ":RUN", ":STOP", "*CLS",
":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", ":CHANnel1:PROBe 19.2",
":CHANnel1:LABel 'CLK+'",
":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", ":CHANnel2:PROBe 19.2",
":CHANnel2:LABel 'CLK-'",
":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50", ":CHANnel3:PROBe 19.2",
":CHANnel3:LABel 'DAT0+'",
":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50", ":CHANnel4:PROBe 19.2",
":CHANnel4:LABel 'DAT0-'",
":TIMebase:REFerence CENTer",
":TRIGger:MODE EDGE",
":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON",
":DISPlay:LAYout STACKED",
]
for c in cmds:
scope.write(c)
time.sleep(0.05)
_drain_scpi_errors("setup_scope")
print("SCOPE READY.")
def _read_ieee_block() -> bytes:
"""
Read an IEEE 488.2 definite-length binary block from the scope:
'#' <ndigits> <length> <data> [\\n]
"""
# Read header: '#' then one digit telling us how many length-digits follow.
head = scope.read_raw(2)
if not head.startswith(b"#"):
# Sometimes vxi11 returns a longer chunk; locate the '#'
idx = head.find(b"#")
if idx < 0:
extra = scope.read_raw(64)
head = head + extra
idx = head.find(b"#")
head = head[idx:idx + 2]
ndigits = int(head[1:2])
if ndigits == 0:
# "#0..." indicates indefinite-length; read until newline.
return scope.read_raw().rstrip(b"\r\n")
length_bytes = scope.read_raw(ndigits)
nbytes = int(length_bytes)
data = b""
while len(data) < nbytes:
chunk = scope.read_raw(nbytes - len(data))
if not chunk:
break
data += chunk
# Discard the trailing newline if present
try:
scope.read_raw(1)
except Exception:
pass
return data
def configure_for_lp() -> None:
"""LP-mode capture, with trigger configured per TRIGGER_MODE."""
for ch in (1, 2, 3, 4):
scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}")
scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}")
if TRIGGER_MODE == "CLK_GLITCH":
# Pulse-width (GLITch) trigger on the Infiniium A/B (firmware 5.x):
# fires at the falling edge of a CH1 (CLK+) HIGH pulse longer than
# CLK_GLITCH_HIGH_MS — i.e. CLK held LP-11 for an unusually long time.
# The newer :TRIGger:TIMeout:* SCPI is rejected by this scope (-113).
_drain_scpi_errors()
scope.write(":TRIGger:MODE GLITch")
scope.write(":TRIGger:GLITch:SOURce CHANnel1")
scope.write(":TRIGger:GLITch:POLarity POSitive")
scope.write(":TRIGger:GLITch:DIRection GREaterthan")
scope.write(f":TRIGger:GLITch:WIDTh {CLK_GLITCH_HIGH_MS * 1e-3:.3E}")
scope.write(f":TRIGger:GLITch:LEVel CHANnel1,{LP_TRIG_LEVEL:.3f}")
time.sleep(0.2)
errs = _drain_scpi_errors()
if errs:
print(f" GLITch trigger setup SCPI errors: {errs}")
try:
mode = scope.ask(":TRIGger:MODE?").strip()
w = scope.ask(":TRIGger:GLITch:WIDTh?").strip()
print(f" GLITch trigger: mode={mode} CLK+ HIGH > {float(w)*1000:.1f} ms")
except Exception as e:
print(f" GLITch trigger readback failed: {e}")
else:
# Edge trigger on falling DAT0+: fires on every LP-to-HS transition.
scope.write(":TRIGger:MODE EDGE")
scope.write(":TRIGger:EDGE:SOURce CHANnel3")
scope.write(":TRIGger:EDGE:SLOPe NEGative")
scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}")
scope.write(":TRIGger:SWEep NORMal")
scope.write(f":TIMebase:SCALe {LP_SCALE:.3E}")
scope.write(f":ACQuire:POINts {LP_POINTS}")
scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}")
if SEGMENT_COUNT > 1:
scope.write(":ACQuire:MODE SEGMented")
scope.write(f":ACQuire:SEGMented:COUNt {SEGMENT_COUNT}")
else:
scope.write(":ACQuire:MODE RTIMe")
time.sleep(0.5)
_drain_scpi_errors("configure_for_lp")
def arm_and_wait(timeout_s: float) -> bool:
""":DIGitize + *OPC?. Returns True if trigger fired within timeout."""
global scope
prev = scope.timeout
try:
scope.timeout = timeout_s + 2
scope.write(":DIGitize")
return scope.ask("*OPC?").strip() == "1"
except Exception:
# Trigger timed out or scope locked up — reconnect.
try:
scope.close()
except Exception:
pass
time.sleep(1.0)
scope = vxi11.Instrument(SCOPE_IP)
scope.timeout = 30
try:
scope.write(":STOP")
except Exception:
pass
return False
finally:
try:
scope.timeout = prev
except Exception:
pass
def _fetch_channel_segments(channel: int, n_segments: int):
"""
Read all segments for one channel via :WAVeform:DATA?. Returns
(times_ndarray, list_of_volts_ndarrays). Time axis is shared across all
segments. When n_segments == 1 we skip the SEGMented:INDex select since
we may be in RTIMe (single-shot) mode rather than SEGMented mode.
"""
import numpy as np
scope.write(f":WAVeform:SOURce CHANnel{channel}")
scope.write(":WAVeform:FORMat WORD")
scope.write(":WAVeform:BYTeorder LSBFirst")
x_inc = float(scope.ask(":WAVeform:XINCrement?"))
x_org = float(scope.ask(":WAVeform:XORigin?"))
y_inc = float(scope.ask(":WAVeform:YINCrement?"))
y_org = float(scope.ask(":WAVeform:YORigin?"))
segs: list = []
for i in range(1, n_segments + 1):
if n_segments > 1:
scope.write(f":ACQuire:SEGMented:INDex {i}")
scope.write(":WAVeform:DATA?")
raw = _read_ieee_block()
codes = np.frombuffer(raw, dtype="<i2")
volts = codes.astype(np.float64) * y_inc + y_org
segs.append(volts)
n = len(segs[0]) if segs else 0
times = np.arange(n) * x_inc + x_org
return times, segs
def save_lp(base_name: str) -> tuple[bool, list[str]]:
"""
Read all N segments for CLK and DAT directly via VXI-11 binary transfer
and write per-segment CSVs locally to DATA_DIR.
Returns (ok, errs). Filenames match csv_preprocessor's expected pattern:
{base_name}_seg{NNN}_{clk|dat}.csv
"""
import numpy as np
_drain_scpi_errors()
try:
t_clk, clk_segs = _fetch_channel_segments(1, SEGMENT_COUNT)
t_dat, dat_segs = _fetch_channel_segments(3, SEGMENT_COUNT)
except Exception as e:
return (False, [f"fetch error: {e}"])
errs = _drain_scpi_errors()
n_written = 0
for i, (clk, dat) in enumerate(zip(clk_segs, dat_segs), start=1):
clk_path = DATA_DIR / f"{base_name}_seg{i:03d}_clk.csv"
dat_path = DATA_DIR / f"{base_name}_seg{i:03d}_dat.csv"
np.savetxt(clk_path, np.column_stack([t_clk, clk]),
delimiter=",", fmt="%.6e")
np.savetxt(dat_path, np.column_stack([t_dat, dat]),
delimiter=",", fmt="%.6e")
n_written += 1
if n_written == 0:
return (False, errs or ["no segments written"])
return (True, errs)
# ---------------------------------------------------------------------------
# Non-blocking keyboard
# ---------------------------------------------------------------------------
class KeyReader:
def __enter__(self):
self.fd = sys.stdin.fileno()
self.old = termios.tcgetattr(self.fd)
tty.setcbreak(self.fd)
return self
def get_key(self) -> str | None:
if select.select([sys.stdin], [], [], 0)[0]:
return sys.stdin.read(1).lower()
return None
def __exit__(self, *_):
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
# ---------------------------------------------------------------------------
# Video control
# ---------------------------------------------------------------------------
def video_start() -> None:
try:
requests.put(VIDEO_URL,
json={"action": "start", "mode": "static-pink"},
timeout=3)
except requests.exceptions.RequestException as e:
print(f" VIDEO START failed: {e}")
def video_stop() -> None:
try:
requests.put(VIDEO_URL, json={"action": "stop"}, timeout=3)
except requests.exceptions.RequestException as e:
print(f" VIDEO STOP failed: {e}")
# ---------------------------------------------------------------------------
# Register snapshot from device (DSIM PHY + SN65DSI83)
# ---------------------------------------------------------------------------
def fetch_registers_snapshot(target_dir: Path, event_ts: str) -> None:
"""GET /registers + /sn65_registers, print key indicators, save JSON."""
combined: dict = {}
for endpoint, key in [("/registers", "dsim"),
("/sn65_registers", "sn65")]:
try:
r = requests.get(f"{DEVICE_BASE}{endpoint}", timeout=5)
r.raise_for_status()
combined[key] = r.json()
except Exception as e:
print(f" REGISTERS: {endpoint} failed — {e}")
combined[key] = None
# Quick-look indicators
sn65 = combined.get("sn65") or {}
regs = sn65.get("registers", {}) if isinstance(sn65, dict) else {}
csr_0a = regs.get("csr_0a", {}) or {}
csr_e5 = regs.get("csr_e5", {}) or {}
if csr_0a:
pll_str = "LOCKED" if csr_0a.get("pll_lock") else "*** UNLOCKED ***"
clk_str = "detected" if csr_0a.get("clk_det") else "NOT detected"
print(f" SN65: PLL {pll_str} CLK {clk_str} (CSR 0x0A = {csr_0a.get('value')})")
if csr_e5:
flags = [
("pll_unlock", "PLL_UNLOCK"),
("cha_sot_bit_err", "SOT_BIT_ERR"),
("cha_llp_err", "LLP_ERR"),
("cha_ecc_err", "ECC_ERR"),
("cha_lp_err", "LP_ERR"),
("cha_crc_err", "CRC_ERR"),
]
active = [label for k, label in flags if csr_e5.get(k)]
if active:
print(f" SN65: *** ERROR FLAGS: {', '.join(active)} "
f"(CSR 0xE5 = {csr_e5.get('value')}) ***")
else:
print(f" SN65: no error flags (CSR 0xE5 = {csr_e5.get('value')})")
out = target_dir / f"{event_ts}_registers.json"
try:
out.write_text(json.dumps(combined, indent=2))
print(f" registers → {out.relative_to(DATA_DIR.parent)}")
except Exception as e:
print(f" REGISTERS save failed: {e}")
# ---------------------------------------------------------------------------
# Event handling: archive recent captures and (for flicker) analyse
# ---------------------------------------------------------------------------
def archive_and_analyse(event: str, since_iso: str) -> None:
"""
Pull every CSV from the scope, move into data/{event}/{event_ts}/.
For flicker events, run csv_preprocessor on each LP capture and print a
summary table. Always pulls a register snapshot from the device too.
"""
event_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
target = (FLICKER_DIR if event == "flicker" else GOOD_DIR) / event_ts
target.mkdir(parents=True, exist_ok=True)
print(f"\n *** {event.upper()} EVENT @ {event_ts} ***")
# Register snapshot first (fast, before scope transfer which takes longer)
fetch_registers_snapshot(target, event_ts)
# Segment CSVs are already in DATA_DIR (written directly by save_lp via
# SCPI binary read). Just move the ones from this event into the folder.
moved = 0
for f in DATA_DIR.glob("*.csv"):
if f.is_file():
shutil.move(str(f), target / f.name)
moved += 1
print(f" {moved} segment CSV(s) archived to {target.relative_to(DATA_DIR.parent)}")
if event != "flicker":
return
# Analyse every segment CSV. Flag outliers.
print("\n Per-segment LP analysis:")
rows = []
for f in sorted(target.glob("*_lp_*_dat.csv")):
try:
m = analyze_lp_file(f)
rows.append({
"file": f.name,
"lp_low": float(m.lp_low_duration_ns) if m.lp_low_duration_ns is not None else None,
"hs_amp": float(m.hs_amplitude_mv) if m.hs_amplitude_mv is not None else None,
"hs_dur": float(m.hs_burst_dur_ns) if m.hs_burst_dur_ns is not None else None,
"n_burst": int(m.n_hs_bursts) if m.n_hs_bursts is not None else None,
"sus": bool(m.flicker_suspect),
})
except Exception as e:
rows.append({"file": f.name, "error": str(e)})
n_total = len(rows)
n_sus = sum(1 for r in rows if r.get("sus"))
print(f" {n_total} segments analysed ({n_sus} flagged as flicker_suspect)")
# Outlier search across the segments themselves.
def _outliers(field: str, lo_thresh: float | None = None,
hi_thresh: float | None = None) -> list[dict]:
vals = sorted(r[field] for r in rows if r.get(field) is not None)
if not vals:
return []
med = vals[len(vals) // 2]
out = []
for r in rows:
v = r.get(field)
if v is None: continue
far = (lo_thresh is not None and v < lo_thresh) or \
(hi_thresh is not None and v > hi_thresh)
if far:
out.append({"file": r["file"], field: v, "median": med})
return out
print("\n Anomalies vs segment-set median:")
for label, field, lo, hi in [
("very-short LP-low (<50 ns)", "lp_low", 50, None),
("very-low HS amplitude (<50 mV)", "hs_amp", 50, None),
("very-high HS amplitude (>140 mV)","hs_amp", None, 140),
("short HS burst (<8000 ns)", "hs_dur", 8000, None),
]:
ax = _outliers(field, lo, hi)
if ax:
print(f" {label}: {len(ax)} segment(s)")
for x in ax[:8]:
print(f" {x['file']} {field}={x[field]:.1f} "
f"(set median={x['median']:.1f})")
if len(ax) > 8:
print(f" ... +{len(ax) - 8} more")
else:
print(f" {label}: none")
# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------
def main() -> None:
DATA_DIR.mkdir(exist_ok=True)
FLICKER_DIR.mkdir(exist_ok=True)
GOOD_DIR.mkdir(exist_ok=True)
setup_scope()
configure_for_lp()
print("\n" + "=" * 64)
print(" FLICKER WATCH — keys: f=flicker g=good q=quit")
print("=" * 64 + "\n")
cycle = 0
try:
with KeyReader() as keys:
while True:
cycle += 1
cycle_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
cycle_caps = []
cycle_end = time.time() + CYCLE_S
video_start()
mode_desc = (
f"CLK_GLITCH (CLK+ HIGH > {CLK_GLITCH_HIGH_MS:.1f} ms, "
f"{LP_SCALE * 20 * 1000:.0f} ms window)"
if TRIGGER_MODE == "CLK_GLITCH"
else f"LP_DAT ({SEGMENT_COUNT} segs/acquire)"
)
print(f"\n[cycle {cycle:03d} {cycle_ts}] video ON "
f"({CYCLE_S:.0f}s window, {mode_desc})", flush=True)
event = None
last_tick = 0.0
while time.time() < cycle_end:
seq = len(cycle_caps) + 1
base = f"{cycle_ts}_lp_c{cycle:03d}_{seq:02d}"
remaining = lambda: max(0, cycle_end - time.time())
if arm_and_wait(TRIG_TIMEOUT_S):
try:
ok, errs = save_lp(base)
if ok:
cycle_caps.append(base)
tag = ("CLK GLITCH" if TRIGGER_MODE == "CLK_GLITCH"
else f"{SEGMENT_COUNT} segs")
print(f" + acq {seq:02d} ({tag}) "
f"[{remaining():4.1f}s left]",
flush=True)
else:
print(f" ! acq {seq:02d} SAVE FAILED — "
f"{errs[0][:80] if errs else 'unknown'}",
flush=True)
except Exception as e:
print(f" save error: {e}", flush=True)
else:
# Trigger timed out — print a heartbeat at most every 2s.
# In CLK_GLITCH mode this is the *normal* state: it just
# means no glitch happened during this cycle.
if time.time() - last_tick > 2.0:
msg = ("waiting for CLK glitch"
if TRIGGER_MODE == "CLK_GLITCH"
else "waiting for trigger")
print(f" ... {msg} "
f"[{remaining():4.1f}s left]", flush=True)
last_tick = time.time()
key = keys.get_key()
if key in ("f", "g", "q"):
event = key
break
video_stop()
if event is None:
print(f"[cycle {cycle:03d}] ended "
f"({len(cycle_caps)} acq(s) ≈ "
f"{len(cycle_caps) * SEGMENT_COUNT} segments, no event)",
flush=True)
if event == "f":
archive_and_analyse("flicker", cycle_ts)
elif event == "g":
archive_and_analyse("good", cycle_ts)
elif event == "q":
print("\nQUIT requested.")
break
# Brief pause before next cycle so video stop settles.
time.sleep(0.5)
except KeyboardInterrupt:
print("\nInterrupted (Ctrl+C).")
finally:
try:
video_stop()
except Exception:
pass
if __name__ == "__main__":
main()