This commit is contained in:
david rice
2026-05-11 08:21:34 +01:00
parent 75248c9574
commit 8d8df1e7a7
4 changed files with 564 additions and 110 deletions

View File

@@ -24,10 +24,10 @@ import tty
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
import numpy as np
import requests import requests
import vxi11 import vxi11
import ai_mgmt
from csv_preprocessor import analyze_lp_file from csv_preprocessor import analyze_lp_file
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -41,24 +41,45 @@ DATA_DIR = Path(__file__).parent / "data"
FLICKER_DIR = DATA_DIR / "flicker" FLICKER_DIR = DATA_DIR / "flicker"
GOOD_DIR = DATA_DIR / "good" GOOD_DIR = DATA_DIR / "good"
# LP capture parameters (matched to mipi_test_interactive.py) # Trigger mode:
LP_SCALE = 1e-6 # 1 µs/div → 20 µs window # "LP_DAT" — falling-edge on DAT0+ (CH3) crossing 0.6 V. Fires on every
LP_POINTS = 200_000 # LP-to-HS transition (≈ line rate, 48 kHz). Use to sample
LP_TRIG_OFFSET = 9e-6 # 1 µs pre / 19 µs post-trigger # normal MIPI traffic and spot per-burst anomalies.
# "CLK_GLITCH" — timeout trigger on CLK+ (CH1) staying HIGH > N ms. Fires
# *only* when the clock lane goes LP for longer than expected,
# i.e. an actual glitch. Pairs with sn65_monitor.py to
# capture the wire-side view of a PLL-unlock event.
TRIGGER_MODE = "CLK_GLITCH" # or "LP_DAT"
# Increased from 1 ms to 100 ms. Earlier runs at 1 ms triggered on every
# V-blank (≈0.5/sec on this display) — far too often to be useful. The
# observed PLL-unlock event from sn65_monitor is ~150 ms, so 100 ms
# discriminates real unlocks from normal MIPI line/frame breaks.
CLK_GLITCH_HIGH_MS = 100.0 # CLK+ HIGH longer than this fires the trigger
# Capture window
# LP_DAT mode: 1 µs/div × 20 div = 20 µs window (50k pts → 5 GSa/s)
# CLK_GLITCH: 20 ms/div × 20 div = 400 ms window (200k pts → 500 kSa/s)
# wide enough to bracket a 150 ms event with margin on both sides
if TRIGGER_MODE == "CLK_GLITCH":
LP_SCALE = 20e-3
LP_POINTS = 200_000
LP_TRIG_OFFSET = 0.0 # centre the trigger so we see before+after
SEGMENT_COUNT = 1 # one big window per acquire is plenty
else:
LP_SCALE = 1e-6
LP_POINTS = 50_000
LP_TRIG_OFFSET = 9e-6
SEGMENT_COUNT = 100
LP_V_SCALE = 0.2 LP_V_SCALE = 0.2
LP_V_OFFSET = 0.6 LP_V_OFFSET = 0.6
LP_TRIG_LEVEL = 0.6 LP_TRIG_LEVEL = 0.6
# Segmented memory: capture N back-to-back LP triggers per :DIGitize, then CYCLE_S = 10.0
# dump the whole acquisition as a single H5 file. Massively higher coverage # CLK_GLITCH triggers can take many seconds (or never come) — give it the full
# than single-shot CSV captures. # cycle. LP_DAT triggers fill 100 segments in well under a second.
SEGMENT_COUNT = 100 TRIG_TIMEOUT_S = CYCLE_S - 0.5 if TRIGGER_MODE == "CLK_GLITCH" \
SAVE_FORMAT = "H5" # Keysight native multi-segment format else max(SEGMENT_COUNT * 0.020 + 10.0, 15.0)
CYCLE_S = 10.0 # seconds video is on per cycle
# Filling N segments takes ~N × LP-trigger period. LP triggers fire roughly
# at line rate (≈48 kHz) so 100 segments fill in ms, but allow margin.
TRIG_TIMEOUT_S = max(SEGMENT_COUNT * 0.020 + 5.0, 10.0)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Scope setup # Scope setup
@@ -67,11 +88,32 @@ scope = vxi11.Instrument(SCOPE_IP)
scope.timeout = 30 scope.timeout = 30
def _drain_scpi_errors(label: str = "") -> list[str]:
"""Pop everything from the scope's error queue; return list of error strings."""
errs = []
for _ in range(20):
try:
r = scope.ask(":SYSTem:ERRor?").strip()
except Exception:
break
if not r or r.startswith("0,") or r.startswith("+0,") or r == "0":
break
errs.append(r)
if errs and label:
print(f" [{label}] SCPI errors: {errs}")
return errs
def setup_scope() -> None: def setup_scope() -> None:
"""One-shot scope init — channels, math, default trigger.""" """One-shot scope init — channels, math, default trigger."""
print("CONFIGURING SCOPE...") print("CONFIGURING SCOPE...")
try:
idn = scope.ask("*IDN?").strip()
print(f" IDN: {idn}")
except Exception as e:
print(f" IDN read failed: {e}")
cmds = [ cmds = [
"*RST", ":RUN", ":STOP", "*RST", ":RUN", ":STOP", "*CLS",
":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", ":CHANnel1:PROBe 19.2", ":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", ":CHANnel1:PROBe 19.2",
":CHANnel1:LABel 'CLK+'", ":CHANnel1:LABel 'CLK+'",
":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", ":CHANnel2:PROBe 19.2", ":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", ":CHANnel2:PROBe 19.2",
@@ -88,25 +130,93 @@ def setup_scope() -> None:
for c in cmds: for c in cmds:
scope.write(c) scope.write(c)
time.sleep(0.05) time.sleep(0.05)
_drain_scpi_errors("setup_scope")
print("SCOPE READY.") print("SCOPE READY.")
def _read_ieee_block() -> bytes:
"""
Read an IEEE 488.2 definite-length binary block from the scope:
'#' <ndigits> <length> <data> [\\n]
"""
# Read header: '#' then one digit telling us how many length-digits follow.
head = scope.read_raw(2)
if not head.startswith(b"#"):
# Sometimes vxi11 returns a longer chunk; locate the '#'
idx = head.find(b"#")
if idx < 0:
extra = scope.read_raw(64)
head = head + extra
idx = head.find(b"#")
head = head[idx:idx + 2]
ndigits = int(head[1:2])
if ndigits == 0:
# "#0..." indicates indefinite-length; read until newline.
return scope.read_raw().rstrip(b"\r\n")
length_bytes = scope.read_raw(ndigits)
nbytes = int(length_bytes)
data = b""
while len(data) < nbytes:
chunk = scope.read_raw(nbytes - len(data))
if not chunk:
break
data += chunk
# Discard the trailing newline if present
try:
scope.read_raw(1)
except Exception:
pass
return data
def configure_for_lp() -> None: def configure_for_lp() -> None:
"""LP-mode + segmented memory: N back-to-back LP triggers per acquisition.""" """LP-mode capture, with trigger configured per TRIGGER_MODE."""
for ch in (1, 2, 3, 4): for ch in (1, 2, 3, 4):
scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}") scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}")
scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}") scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}")
scope.write(":TRIGger:EDGE:SOURce CHANnel3")
scope.write(":TRIGger:EDGE:SLOPe NEGative") if TRIGGER_MODE == "CLK_GLITCH":
scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}") # Pulse-width (GLITch) trigger on the Infiniium A/B (firmware 5.x):
# fires at the falling edge of a CH1 (CLK+) HIGH pulse longer than
# CLK_GLITCH_HIGH_MS — i.e. CLK held LP-11 for an unusually long time.
# The newer :TRIGger:TIMeout:* SCPI is rejected by this scope (-113).
_drain_scpi_errors()
scope.write(":TRIGger:MODE GLITch")
scope.write(":TRIGger:GLITch:SOURce CHANnel1")
scope.write(":TRIGger:GLITch:POLarity POSitive")
scope.write(":TRIGger:GLITch:DIRection GREaterthan")
scope.write(f":TRIGger:GLITch:WIDTh {CLK_GLITCH_HIGH_MS * 1e-3:.3E}")
scope.write(f":TRIGger:GLITch:LEVel CHANnel1,{LP_TRIG_LEVEL:.3f}")
time.sleep(0.2)
errs = _drain_scpi_errors()
if errs:
print(f" GLITch trigger setup SCPI errors: {errs}")
try:
mode = scope.ask(":TRIGger:MODE?").strip()
w = scope.ask(":TRIGger:GLITch:WIDTh?").strip()
print(f" GLITch trigger: mode={mode} CLK+ HIGH > {float(w)*1000:.1f} ms")
except Exception as e:
print(f" GLITch trigger readback failed: {e}")
else:
# Edge trigger on falling DAT0+: fires on every LP-to-HS transition.
scope.write(":TRIGger:MODE EDGE")
scope.write(":TRIGger:EDGE:SOURce CHANnel3")
scope.write(":TRIGger:EDGE:SLOPe NEGative")
scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}")
scope.write(":TRIGger:SWEep NORMal") scope.write(":TRIGger:SWEep NORMal")
scope.write(f":TIMebase:SCALe {LP_SCALE:.3E}") scope.write(f":TIMebase:SCALe {LP_SCALE:.3E}")
scope.write(f":ACQuire:POINts {LP_POINTS}") scope.write(f":ACQuire:POINts {LP_POINTS}")
scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}") scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}")
# Segmented memory: fill N segments per :DIGitize.
scope.write(":ACQuire:MODE SEGMented") if SEGMENT_COUNT > 1:
scope.write(f":ACQuire:SEGMented:COUNt {SEGMENT_COUNT}") scope.write(":ACQuire:MODE SEGMented")
scope.write(f":ACQuire:SEGMented:COUNt {SEGMENT_COUNT}")
else:
scope.write(":ACQuire:MODE RTIMe")
time.sleep(0.5) time.sleep(0.5)
_drain_scpi_errors("configure_for_lp")
def arm_and_wait(timeout_s: float) -> bool: def arm_and_wait(timeout_s: float) -> bool:
@@ -138,14 +248,69 @@ def arm_and_wait(timeout_s: float) -> bool:
pass pass
def save_lp(base_name: str) -> None: def _fetch_channel_segments(channel: int, n_segments: int):
"""Save all N segments of Ch1 (CLK+) and Ch3 (DAT0+) as a single H5 each.""" """
base = f"C:\\TEMP\\{base_name}" Read all segments for one channel via :WAVeform:DATA?. Returns
ext = SAVE_FORMAT.lower() (times_ndarray, list_of_volts_ndarrays). Time axis is shared across all
scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.{ext}",{SAVE_FORMAT}') segments. When n_segments == 1 we skip the SEGMented:INDex select since
time.sleep(3.0) we may be in RTIMe (single-shot) mode rather than SEGMented mode.
scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.{ext}",{SAVE_FORMAT}') """
time.sleep(3.0) import numpy as np
scope.write(f":WAVeform:SOURce CHANnel{channel}")
scope.write(":WAVeform:FORMat WORD")
scope.write(":WAVeform:BYTeorder LSBFirst")
x_inc = float(scope.ask(":WAVeform:XINCrement?"))
x_org = float(scope.ask(":WAVeform:XORigin?"))
y_inc = float(scope.ask(":WAVeform:YINCrement?"))
y_org = float(scope.ask(":WAVeform:YORigin?"))
segs: list = []
for i in range(1, n_segments + 1):
if n_segments > 1:
scope.write(f":ACQuire:SEGMented:INDex {i}")
scope.write(":WAVeform:DATA?")
raw = _read_ieee_block()
codes = np.frombuffer(raw, dtype="<i2")
volts = codes.astype(np.float64) * y_inc + y_org
segs.append(volts)
n = len(segs[0]) if segs else 0
times = np.arange(n) * x_inc + x_org
return times, segs
def save_lp(base_name: str) -> tuple[bool, list[str]]:
"""
Read all N segments for CLK and DAT directly via VXI-11 binary transfer
and write per-segment CSVs locally to DATA_DIR.
Returns (ok, errs). Filenames match csv_preprocessor's expected pattern:
{base_name}_seg{NNN}_{clk|dat}.csv
"""
import numpy as np
_drain_scpi_errors()
try:
t_clk, clk_segs = _fetch_channel_segments(1, SEGMENT_COUNT)
t_dat, dat_segs = _fetch_channel_segments(3, SEGMENT_COUNT)
except Exception as e:
return (False, [f"fetch error: {e}"])
errs = _drain_scpi_errors()
n_written = 0
for i, (clk, dat) in enumerate(zip(clk_segs, dat_segs), start=1):
clk_path = DATA_DIR / f"{base_name}_seg{i:03d}_clk.csv"
dat_path = DATA_DIR / f"{base_name}_seg{i:03d}_dat.csv"
np.savetxt(clk_path, np.column_stack([t_clk, clk]),
delimiter=",", fmt="%.6e")
np.savetxt(dat_path, np.column_stack([t_dat, dat]),
delimiter=",", fmt="%.6e")
n_written += 1
if n_written == 0:
return (False, errs or ["no segments written"])
return (True, errs)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -186,48 +351,6 @@ def video_stop() -> None:
print(f" VIDEO STOP failed: {e}") print(f" VIDEO STOP failed: {e}")
# ---------------------------------------------------------------------------
# H5 transfer (ai_mgmt only handles CSV — segmented mode produces .h5)
# ---------------------------------------------------------------------------
def _transfer_h5_files() -> int:
"""SMB-pull every .h5 from the scope share into DATA_DIR; delete on scope."""
from smb.SMBConnection import SMBConnection
import socket
conn = SMBConnection(
ai_mgmt.USERNAME, ai_mgmt.PASSWORD,
socket.gethostname(), ai_mgmt.SERVER_NAME,
use_ntlm_v2=True, is_direct_tcp=True,
)
if not conn.connect(ai_mgmt.SERVER, 445):
print(" H5 transfer: could not connect to scope share")
return 0
count = 0
try:
h5_paths: list[str] = []
def walk(path: str) -> None:
for entry in conn.listPath(ai_mgmt.SHARE, path):
if entry.filename in (".", ".."):
continue
full = f"{path}/{entry.filename}"
if entry.isDirectory:
walk(full)
elif entry.filename.lower().endswith(".h5"):
h5_paths.append(full)
walk("/")
for remote in h5_paths:
local = DATA_DIR / Path(remote).name
try:
with open(local, "wb") as fh:
conn.retrieveFile(ai_mgmt.SHARE, remote, fh)
conn.deleteFiles(ai_mgmt.SHARE, remote)
count += 1
except Exception as e:
print(f" H5 transfer failed for {Path(remote).name}: {e}")
finally:
conn.close()
return count
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Register snapshot from device (DSIM PHY + SN65DSI83) # Register snapshot from device (DSIM PHY + SN65DSI83)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -297,39 +420,14 @@ def archive_and_analyse(event: str, since_iso: str) -> None:
# Register snapshot first (fast, before scope transfer which takes longer) # Register snapshot first (fast, before scope transfer which takes longer)
fetch_registers_snapshot(target, event_ts) fetch_registers_snapshot(target, event_ts)
print(f" Transferring scope → {target} ...") # Segment CSVs are already in DATA_DIR (written directly by save_lp via
try: # SCPI binary read). Just move the ones from this event into the folder.
copied, failed = ai_mgmt.transfer_csv_files()
except Exception as e:
print(f" TRANSFER ERROR: {e}")
return
print(f" {copied} file(s) transferred ({failed} failed)")
# ai_mgmt only fetches CSVs. H5 (segmented) files need a separate pass.
h5_count = _transfer_h5_files()
if h5_count:
print(f" {h5_count} H5 file(s) transferred")
# Move just-arrived files (csv + h5) out of data/ (flat) into the event folder.
moved = 0 moved = 0
for f in list(DATA_DIR.glob("*.csv")) + list(DATA_DIR.glob("*.h5")): for f in DATA_DIR.glob("*.csv"):
if f.is_file(): if f.is_file():
shutil.move(str(f), target / f.name) shutil.move(str(f), target / f.name)
moved += 1 moved += 1
print(f" {moved} file(s) archived to {target.relative_to(DATA_DIR.parent)}") print(f" {moved} segment CSV(s) archived to {target.relative_to(DATA_DIR.parent)}")
# Explode each H5 into per-segment CSVs so csv_preprocessor can analyse them.
from explode_h5 import explode
h5_files = sorted(target.glob("*_lp_*.h5"))
seg_csv_count = 0
for h5 in h5_files:
try:
csvs = explode(h5)
seg_csv_count += len(csvs)
except Exception as e:
print(f" EXPLODE error on {h5.name}: {e}")
if h5_files:
print(f" exploded {len(h5_files)} H5 file(s) → {seg_csv_count} segment CSV(s)")
if event != "flicker": if event != "flicker":
return return
@@ -416,9 +514,14 @@ def main() -> None:
cycle_end = time.time() + CYCLE_S cycle_end = time.time() + CYCLE_S
video_start() video_start()
mode_desc = (
f"CLK_GLITCH (CLK+ HIGH > {CLK_GLITCH_HIGH_MS:.1f} ms, "
f"{LP_SCALE * 20 * 1000:.0f} ms window)"
if TRIGGER_MODE == "CLK_GLITCH"
else f"LP_DAT ({SEGMENT_COUNT} segs/acquire)"
)
print(f"\n[cycle {cycle:03d} {cycle_ts}] video ON " print(f"\n[cycle {cycle:03d} {cycle_ts}] video ON "
f"({CYCLE_S:.0f}s window, {SEGMENT_COUNT} segs/acquire)", f"({CYCLE_S:.0f}s window, {mode_desc})", flush=True)
flush=True)
event = None event = None
last_tick = 0.0 last_tick = 0.0
@@ -429,16 +532,29 @@ def main() -> None:
if arm_and_wait(TRIG_TIMEOUT_S): if arm_and_wait(TRIG_TIMEOUT_S):
try: try:
save_lp(base) ok, errs = save_lp(base)
cycle_caps.append(base) if ok:
print(f" + acq {seq:02d} ({SEGMENT_COUNT} segs) " cycle_caps.append(base)
f"[{remaining():4.1f}s left]", flush=True) tag = ("CLK GLITCH" if TRIGGER_MODE == "CLK_GLITCH"
else f"{SEGMENT_COUNT} segs")
print(f" + acq {seq:02d} ({tag}) "
f"[{remaining():4.1f}s left]",
flush=True)
else:
print(f" ! acq {seq:02d} SAVE FAILED — "
f"{errs[0][:80] if errs else 'unknown'}",
flush=True)
except Exception as e: except Exception as e:
print(f" save error: {e}", flush=True) print(f" save error: {e}", flush=True)
else: else:
# Trigger timed out — print a heartbeat at most every 2s # Trigger timed out — print a heartbeat at most every 2s.
# In CLK_GLITCH mode this is the *normal* state: it just
# means no glitch happened during this cycle.
if time.time() - last_tick > 2.0: if time.time() - last_tick > 2.0:
print(f" ... waiting for trigger " msg = ("waiting for CLK glitch"
if TRIGGER_MODE == "CLK_GLITCH"
else "waiting for trigger")
print(f" ... {msg} "
f"[{remaining():4.1f}s left]", flush=True) f"[{remaining():4.1f}s left]", flush=True)
last_tick = time.time() last_tick = time.time()

277
sn65_monitor.py Normal file
View File

@@ -0,0 +1,277 @@
#!/usr/bin/env python3
"""
sn65_monitor.py — High-rate SN65DSI83 register monitor.
Continuously polls /sn65_registers at ~20 Hz, logs any register-state change
in real time, and keeps a rolling 30 s window in memory. When you press
`f` (flicker) or `g` (good), the window is dumped to a JSON file and
summarised so you can see whether anything moved at the moment of the event.
This complements flicker_watch.py: run it in a second terminal during a
test session to catch transient register changes that disappear before the
post-event snapshot in flicker_watch can fetch them.
Keys:
f — flicker event: dump rolling buffer + summary, keep monitoring
g — good baseline: dump rolling buffer + summary, keep monitoring
q — quit
"""
from __future__ import annotations
import json
import select
import sys
import termios
import time
import tty
from collections import deque
from datetime import datetime
from pathlib import Path
import requests
DEVICE_BASE = "http://192.168.45.8:5000"
SN65_EP = f"{DEVICE_BASE}/sn65_registers"
DSIM_EP = f"{DEVICE_BASE}/registers"
DATA_DIR = Path(__file__).parent / "data" / "sn65_log"
# Aim for ~100 Hz SN65 polling — actual rate is bounded by the I2C-read
# latency of the device server. At 20 Hz the unlock pulse-width was
# unresolvable ("≤ 50 ms"); at 100 Hz we should see whether it's e.g. 5 ms
# or 30 ms, which narrows the root-cause search.
POLL_DT_S = 0.01 # 100 Hz target
HISTORY_S = 30.0
HTTP_TIMEOUT_S = 0.2 # tighter timeout — a slow read shouldn't stall the loop
# DSIM register read goes through memtool and adds latency. The current
# endpoint only exposes 3 static PHY-timing config registers anyway, so
# poll it once every N SN65 polls (set to 0 to disable entirely). When the
# device endpoint gains DSIM_STATUS / DSIM_CLKCTRL / DSIM_INTSRC / DSIM_FIFOCTRL,
# raise this rate.
DSIM_POLL_EVERY = 50 # at 100 Hz, every 50th poll → 2 Hz DSIM
# csr_e5 error bit names from the device's register decode
ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err",
"cha_ecc_err", "cha_lp_err", "cha_crc_err")
# ---------------------------------------------------------------------------
# Non-blocking keyboard
# ---------------------------------------------------------------------------
class KeyReader:
def __enter__(self):
self.fd = sys.stdin.fileno()
self.old = termios.tcgetattr(self.fd)
tty.setcbreak(self.fd)
return self
def get_key(self) -> str | None:
if select.select([sys.stdin], [], [], 0)[0]:
return sys.stdin.read(1).lower()
return None
def __exit__(self, *_):
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
# ---------------------------------------------------------------------------
# Register parsing
# ---------------------------------------------------------------------------
def extract_state(sn65_data: dict, dsim_data: dict | None) -> dict:
"""Pull just the bits we care about into a hashable dict."""
regs = sn65_data.get("registers", {}) if isinstance(sn65_data, dict) else {}
csr_0a = regs.get("csr_0a", {}) or {}
csr_e5 = regs.get("csr_e5", {}) or {}
state = {
"csr_0a": csr_0a.get("value"),
"csr_e5": csr_e5.get("value"),
"pll_lock": csr_0a.get("pll_lock"),
"clk_det": csr_0a.get("clk_det"),
}
for k in ERROR_BITS:
state[k] = csr_e5.get(k)
# DSIM register values (whatever the endpoint exposes). Currently:
# DSIM_PHYTIMING (0x32e100b4), DSIM_PHYTIMING1 (0x32e100b8), DSIM_PHYTIMING2 (0x32e100bc).
# These shouldn't change at runtime — but if any DOES move during an unlock
# event, that's a clue. When the endpoint is extended to expose status
# registers (DSIM_STATUS / DSIM_CLKCTRL / DSIM_INTSRC / DSIM_FIFOCTRL),
# they'll be picked up here automatically.
if isinstance(dsim_data, dict):
for entry in dsim_data.get("registers", []) or []:
if isinstance(entry, dict) and "name" in entry and "value" in entry:
state[f"dsim_{entry['name']}"] = entry["value"]
return state
def state_str(s: dict) -> str:
"""Compact one-line representation of a state."""
pll = "PLL✓" if s.get("pll_lock") else "PLL✗"
clk = "CLK✓" if s.get("clk_det") else "CLK✗"
errs = [k for k in ERROR_BITS if s.get(k)]
err_str = (",".join(errs) if errs else "no_err")
return (f"{pll} {clk} csr0a={s.get('csr_0a')} csr_e5={s.get('csr_e5')} "
f"{err_str}")
# ---------------------------------------------------------------------------
# Event handling
# ---------------------------------------------------------------------------
def save_event(event: str, history: deque, session_changes: list) -> Path:
DATA_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
out = DATA_DIR / f"{ts}_{event}.json"
snapshot = list(history)
payload = {
"event": event,
"saved_at": ts,
"n_samples": len(snapshot),
"window_seconds": HISTORY_S,
"samples": snapshot,
"session_changes": session_changes[-200:],
}
out.write_text(json.dumps(payload, indent=2, default=str))
# Quick console summary
states_in_window = []
for s in snapshot:
if "state" in s:
sig = json.dumps(s["state"], sort_keys=True)
if not states_in_window or states_in_window[-1][1] != sig:
states_in_window.append((s["ts"], sig, s["state"]))
print(f"\n*** {event.upper()} EVENT @ {ts} ***")
print(f" {len(snapshot)} samples saved → {out.relative_to(DATA_DIR.parent.parent)}")
if len(states_in_window) <= 1:
print(f" register state was STABLE through the {HISTORY_S:.0f}s window")
if states_in_window:
print(f" {state_str(states_in_window[0][2])}")
else:
print(f" *** {len(states_in_window)} distinct register states seen in window: ***")
for ts_change, _, st in states_in_window:
t_iso = datetime.fromtimestamp(ts_change).strftime("%H:%M:%S.%f")[:-3]
print(f" {t_iso} {state_str(st)}")
return out
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
sess = requests.Session()
history: deque = deque(maxlen=int(HISTORY_S / POLL_DT_S) + 10)
session_changes: list = [] # log of every state change since startup
last_state: dict | None = None
last_dsim: dict | None = None
iter_count = 0
poll_count = 0
err_count = 0
last_status = time.time()
started = time.time()
print(f"SN65 + DSIM MONITOR")
print(f" SN65: {SN65_EP} (every poll)")
if DSIM_POLL_EVERY:
print(f" DSIM: {DSIM_EP} (every {DSIM_POLL_EVERY} polls)")
else:
print(f" DSIM: disabled")
print(f"poll target {1.0/POLL_DT_S:.0f} Hz, rolling buffer {HISTORY_S:.0f}s")
print("keys: f=flicker g=good q=quit\n", flush=True)
with KeyReader() as keys:
try:
while True:
t0 = time.time()
iter_count += 1
sn65_data: dict = {}
err_this_poll = False
try:
r = sess.get(SN65_EP, timeout=HTTP_TIMEOUT_S)
r.raise_for_status()
sn65_data = r.json()
except requests.exceptions.RequestException as e:
err_this_poll = True
history.append({"ts": t0, "error": f"sn65: {e}"})
# DSIM is fetched only every Nth iteration to keep the SN65
# poll rate high. In between, we reuse the previous DSIM
# snapshot.
if DSIM_POLL_EVERY and (iter_count % DSIM_POLL_EVERY == 0):
try:
r = sess.get(DSIM_EP, timeout=HTTP_TIMEOUT_S)
r.raise_for_status()
last_dsim = r.json()
except requests.exceptions.RequestException:
# best-effort; keep last known
pass
dsim_data = last_dsim
if err_this_poll:
err_count += 1
else:
state = extract_state(sn65_data, dsim_data)
history.append({"ts": t0, "state": state,
"sn65_raw": sn65_data,
"dsim_raw": dsim_data})
poll_count += 1
if last_state is not None and state != last_state:
delta = {k: (last_state.get(k), state.get(k))
for k in state if state.get(k) != last_state.get(k)}
ts_iso = datetime.fromtimestamp(t0).strftime("%H:%M:%S.%f")[:-3]
print(f"\n[{ts_iso}] CHANGE: {state_str(state)}")
for k, (old, new) in delta.items():
print(f" {k}: {old}{new}")
session_changes.append(
{"ts": t0, "iso": ts_iso, "delta": delta,
"new_state": state}
)
last_state = state
# Status line every 2 s — overwrites itself with \r
if t0 - last_status > 2.0:
rate = poll_count / (t0 - last_status) if t0 > last_status else 0
err_pct = err_count / max(1, poll_count + err_count) * 100
cur = state_str(last_state) if last_state else "(no data)"
sys.stdout.write(
f"\r {rate:5.1f} Hz | err {err_pct:4.1f}% | "
f"buf {len(history)} | changes {len(session_changes)} | "
f"{cur} "
)
sys.stdout.flush()
last_status = t0
poll_count = 0
err_count = 0
# Keypress
key = keys.get_key()
if key == "f":
save_event("flicker", history, session_changes)
elif key == "g":
save_event("good", history, session_changes)
elif key == "q":
print("\nQUIT.")
break
# Pace
elapsed = time.time() - t0
if elapsed < POLL_DT_S:
time.sleep(POLL_DT_S - elapsed)
except KeyboardInterrupt:
print("\nInterrupted (Ctrl+C).")
# Session summary
dur = time.time() - started
print(f"\n--- session summary: {dur:.1f}s, "
f"{len(session_changes)} state change(s) ---")
if session_changes:
print(" recent changes:")
for c in session_changes[-10:]:
print(f" {c['iso']} {state_str(c['new_state'])}")
if __name__ == "__main__":
main()

61
video_cycler.py Normal file
View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python3
"""
video_cycler.py — Toggle /video start/stop on the device every CYCLE_S seconds.
Pairs with sn65_monitor.py: this script provokes the flicker by cycling the
static-pink video stream, while sn65_monitor measures. Press Ctrl+C to stop.
"""
import signal
import sys
import time
from datetime import datetime
import requests
DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = f"{DEVICE_BASE}/video"
CYCLE_S = 10.0
HTTP_TIMEOUT_S = 3.0
def video_start() -> None:
try:
requests.put(VIDEO_URL,
json={"action": "start", "mode": "static-pink"},
timeout=HTTP_TIMEOUT_S)
except requests.exceptions.RequestException as e:
print(f" video START failed: {e}")
def video_stop() -> None:
try:
requests.put(VIDEO_URL, json={"action": "stop"},
timeout=HTTP_TIMEOUT_S)
except requests.exceptions.RequestException as e:
print(f" video STOP failed: {e}")
def main() -> None:
# On Ctrl+C, make sure we leave video stopped.
def _shutdown(*_):
print("\nshutting down — video off")
video_stop()
sys.exit(0)
signal.signal(signal.SIGINT, _shutdown)
signal.signal(signal.SIGTERM, _shutdown)
print(f"VIDEO CYCLER — {CYCLE_S:.0f} s on / 0.5 s off (Ctrl+C to stop)\n")
cycle = 0
while True:
cycle += 1
ts = datetime.now().strftime("%H:%M:%S")
print(f"[{ts}] cycle {cycle:04d} video ON", flush=True)
video_start()
time.sleep(CYCLE_S)
video_stop()
time.sleep(0.5)
if __name__ == "__main__":
main()