#!/usr/bin/env python3 """ sn65_monitor.py — High-rate SN65DSI83 register monitor. Continuously polls /sn65_registers at ~20 Hz, logs any register-state change in real time, and keeps a rolling 30 s window in memory. When you press `f` (flicker) or `g` (good), the window is dumped to a JSON file and summarised so you can see whether anything moved at the moment of the event. This complements flicker_watch.py: run it in a second terminal during a test session to catch transient register changes that disappear before the post-event snapshot in flicker_watch can fetch them. Keys: f — flicker event: dump rolling buffer + summary, keep monitoring g — good baseline: dump rolling buffer + summary, keep monitoring q — quit """ from __future__ import annotations import json import select import sys import termios import time import tty from collections import deque from datetime import datetime from pathlib import Path import requests DEVICE_BASE = "http://192.168.45.8:5000" SN65_EP = f"{DEVICE_BASE}/sn65_registers" DSIM_EP = f"{DEVICE_BASE}/registers" DATA_DIR = Path(__file__).parent / "data" / "sn65_log" # Aim for ~100 Hz SN65 polling — actual rate is bounded by the I2C-read # latency of the device server. At 20 Hz the unlock pulse-width was # unresolvable ("≤ 50 ms"); at 100 Hz we should see whether it's e.g. 5 ms # or 30 ms, which narrows the root-cause search. POLL_DT_S = 0.01 # 100 Hz target HISTORY_S = 30.0 HTTP_TIMEOUT_S = 0.2 # tighter timeout — a slow read shouldn't stall the loop # DSIM register read goes through memtool and adds latency. The current # endpoint only exposes 3 static PHY-timing config registers anyway, so # poll it once every N SN65 polls (set to 0 to disable entirely). When the # device endpoint gains DSIM_STATUS / DSIM_CLKCTRL / DSIM_INTSRC / DSIM_FIFOCTRL, # raise this rate. DSIM_POLL_EVERY = 50 # at 100 Hz, every 50th poll → 2 Hz DSIM # csr_e5 error bit names from the device's register decode ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err", "cha_ecc_err", "cha_lp_err", "cha_crc_err") # --------------------------------------------------------------------------- # Non-blocking keyboard # --------------------------------------------------------------------------- class KeyReader: def __enter__(self): self.fd = sys.stdin.fileno() self.old = termios.tcgetattr(self.fd) tty.setcbreak(self.fd) return self def get_key(self) -> str | None: if select.select([sys.stdin], [], [], 0)[0]: return sys.stdin.read(1).lower() return None def __exit__(self, *_): termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old) # --------------------------------------------------------------------------- # Register parsing # --------------------------------------------------------------------------- def extract_state(sn65_data: dict, dsim_data: dict | None) -> dict: """Pull just the bits we care about into a hashable dict.""" regs = sn65_data.get("registers", {}) if isinstance(sn65_data, dict) else {} csr_0a = regs.get("csr_0a", {}) or {} csr_e5 = regs.get("csr_e5", {}) or {} state = { "csr_0a": csr_0a.get("value"), "csr_e5": csr_e5.get("value"), "pll_lock": csr_0a.get("pll_lock"), "clk_det": csr_0a.get("clk_det"), } for k in ERROR_BITS: state[k] = csr_e5.get(k) # DSIM register values (whatever the endpoint exposes). Currently: # DSIM_PHYTIMING (0x32e100b4), DSIM_PHYTIMING1 (0x32e100b8), DSIM_PHYTIMING2 (0x32e100bc). # These shouldn't change at runtime — but if any DOES move during an unlock # event, that's a clue. When the endpoint is extended to expose status # registers (DSIM_STATUS / DSIM_CLKCTRL / DSIM_INTSRC / DSIM_FIFOCTRL), # they'll be picked up here automatically. if isinstance(dsim_data, dict): for entry in dsim_data.get("registers", []) or []: if isinstance(entry, dict) and "name" in entry and "value" in entry: state[f"dsim_{entry['name']}"] = entry["value"] return state def state_str(s: dict) -> str: """Compact one-line representation of a state.""" pll = "PLL✓" if s.get("pll_lock") else "PLL✗" clk = "CLK✓" if s.get("clk_det") else "CLK✗" errs = [k for k in ERROR_BITS if s.get(k)] err_str = (",".join(errs) if errs else "no_err") return (f"{pll} {clk} csr0a={s.get('csr_0a')} csr_e5={s.get('csr_e5')} " f"{err_str}") # --------------------------------------------------------------------------- # Event handling # --------------------------------------------------------------------------- def save_event(event: str, history: deque, session_changes: list) -> Path: DATA_DIR.mkdir(parents=True, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") out = DATA_DIR / f"{ts}_{event}.json" snapshot = list(history) payload = { "event": event, "saved_at": ts, "n_samples": len(snapshot), "window_seconds": HISTORY_S, "samples": snapshot, "session_changes": session_changes[-200:], } out.write_text(json.dumps(payload, indent=2, default=str)) # Quick console summary states_in_window = [] for s in snapshot: if "state" in s: sig = json.dumps(s["state"], sort_keys=True) if not states_in_window or states_in_window[-1][1] != sig: states_in_window.append((s["ts"], sig, s["state"])) print(f"\n*** {event.upper()} EVENT @ {ts} ***") print(f" {len(snapshot)} samples saved → {out.relative_to(DATA_DIR.parent.parent)}") if len(states_in_window) <= 1: print(f" register state was STABLE through the {HISTORY_S:.0f}s window") if states_in_window: print(f" {state_str(states_in_window[0][2])}") else: print(f" *** {len(states_in_window)} distinct register states seen in window: ***") for ts_change, _, st in states_in_window: t_iso = datetime.fromtimestamp(ts_change).strftime("%H:%M:%S.%f")[:-3] print(f" {t_iso} {state_str(st)}") return out # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> None: sess = requests.Session() history: deque = deque(maxlen=int(HISTORY_S / POLL_DT_S) + 10) session_changes: list = [] # log of every state change since startup last_state: dict | None = None last_dsim: dict | None = None iter_count = 0 poll_count = 0 err_count = 0 last_status = time.time() started = time.time() print(f"SN65 + DSIM MONITOR") print(f" SN65: {SN65_EP} (every poll)") if DSIM_POLL_EVERY: print(f" DSIM: {DSIM_EP} (every {DSIM_POLL_EVERY} polls)") else: print(f" DSIM: disabled") print(f"poll target {1.0/POLL_DT_S:.0f} Hz, rolling buffer {HISTORY_S:.0f}s") print("keys: f=flicker g=good q=quit\n", flush=True) with KeyReader() as keys: try: while True: t0 = time.time() iter_count += 1 sn65_data: dict = {} err_this_poll = False try: r = sess.get(SN65_EP, timeout=HTTP_TIMEOUT_S) r.raise_for_status() sn65_data = r.json() except requests.exceptions.RequestException as e: err_this_poll = True history.append({"ts": t0, "error": f"sn65: {e}"}) # DSIM is fetched only every Nth iteration to keep the SN65 # poll rate high. In between, we reuse the previous DSIM # snapshot. if DSIM_POLL_EVERY and (iter_count % DSIM_POLL_EVERY == 0): try: r = sess.get(DSIM_EP, timeout=HTTP_TIMEOUT_S) r.raise_for_status() last_dsim = r.json() except requests.exceptions.RequestException: # best-effort; keep last known pass dsim_data = last_dsim if err_this_poll: err_count += 1 else: state = extract_state(sn65_data, dsim_data) history.append({"ts": t0, "state": state, "sn65_raw": sn65_data, "dsim_raw": dsim_data}) poll_count += 1 if last_state is not None and state != last_state: delta = {k: (last_state.get(k), state.get(k)) for k in state if state.get(k) != last_state.get(k)} ts_iso = datetime.fromtimestamp(t0).strftime("%H:%M:%S.%f")[:-3] print(f"\n[{ts_iso}] CHANGE: {state_str(state)}") for k, (old, new) in delta.items(): print(f" {k}: {old} → {new}") session_changes.append( {"ts": t0, "iso": ts_iso, "delta": delta, "new_state": state} ) last_state = state # Status line every 2 s — overwrites itself with \r if t0 - last_status > 2.0: rate = poll_count / (t0 - last_status) if t0 > last_status else 0 err_pct = err_count / max(1, poll_count + err_count) * 100 cur = state_str(last_state) if last_state else "(no data)" sys.stdout.write( f"\r {rate:5.1f} Hz | err {err_pct:4.1f}% | " f"buf {len(history)} | changes {len(session_changes)} | " f"{cur} " ) sys.stdout.flush() last_status = t0 poll_count = 0 err_count = 0 # Keypress key = keys.get_key() if key == "f": save_event("flicker", history, session_changes) elif key == "g": save_event("good", history, session_changes) elif key == "q": print("\nQUIT.") break # Pace elapsed = time.time() - t0 if elapsed < POLL_DT_S: time.sleep(POLL_DT_S - elapsed) except KeyboardInterrupt: print("\nInterrupted (Ctrl+C).") # Session summary dur = time.time() - started print(f"\n--- session summary: {dur:.1f}s, " f"{len(session_changes)} state change(s) ---") if session_changes: print(" recent changes:") for c in session_changes[-10:]: print(f" {c['iso']} {state_str(c['new_state'])}") if __name__ == "__main__": main()