278 lines
11 KiB
Python
278 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
sn65_monitor.py — High-rate SN65DSI83 register monitor.
|
|
|
|
Continuously polls /sn65_registers at ~20 Hz, logs any register-state change
|
|
in real time, and keeps a rolling 30 s window in memory. When you press
|
|
`f` (flicker) or `g` (good), the window is dumped to a JSON file and
|
|
summarised so you can see whether anything moved at the moment of the event.
|
|
|
|
This complements flicker_watch.py: run it in a second terminal during a
|
|
test session to catch transient register changes that disappear before the
|
|
post-event snapshot in flicker_watch can fetch them.
|
|
|
|
Keys:
|
|
f — flicker event: dump rolling buffer + summary, keep monitoring
|
|
g — good baseline: dump rolling buffer + summary, keep monitoring
|
|
q — quit
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import select
|
|
import sys
|
|
import termios
|
|
import time
|
|
import tty
|
|
from collections import deque
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
|
|
DEVICE_BASE = "http://192.168.45.8:5000"
|
|
SN65_EP = f"{DEVICE_BASE}/sn65_registers"
|
|
DSIM_EP = f"{DEVICE_BASE}/registers"
|
|
DATA_DIR = Path(__file__).parent / "data" / "sn65_log"
|
|
|
|
# Aim for ~100 Hz SN65 polling — actual rate is bounded by the I2C-read
|
|
# latency of the device server. At 20 Hz the unlock pulse-width was
|
|
# unresolvable ("≤ 50 ms"); at 100 Hz we should see whether it's e.g. 5 ms
|
|
# or 30 ms, which narrows the root-cause search.
|
|
POLL_DT_S = 0.01 # 100 Hz target
|
|
HISTORY_S = 30.0
|
|
HTTP_TIMEOUT_S = 0.2 # tighter timeout — a slow read shouldn't stall the loop
|
|
|
|
# DSIM register read goes through memtool and adds latency. The current
|
|
# endpoint only exposes 3 static PHY-timing config registers anyway, so
|
|
# poll it once every N SN65 polls (set to 0 to disable entirely). When the
|
|
# device endpoint gains DSIM_STATUS / DSIM_CLKCTRL / DSIM_INTSRC / DSIM_FIFOCTRL,
|
|
# raise this rate.
|
|
DSIM_POLL_EVERY = 50 # at 100 Hz, every 50th poll → 2 Hz DSIM
|
|
|
|
# csr_e5 error bit names from the device's register decode
|
|
ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err",
|
|
"cha_ecc_err", "cha_lp_err", "cha_crc_err")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Non-blocking keyboard
|
|
# ---------------------------------------------------------------------------
|
|
class KeyReader:
|
|
def __enter__(self):
|
|
self.fd = sys.stdin.fileno()
|
|
self.old = termios.tcgetattr(self.fd)
|
|
tty.setcbreak(self.fd)
|
|
return self
|
|
|
|
def get_key(self) -> str | None:
|
|
if select.select([sys.stdin], [], [], 0)[0]:
|
|
return sys.stdin.read(1).lower()
|
|
return None
|
|
|
|
def __exit__(self, *_):
|
|
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Register parsing
|
|
# ---------------------------------------------------------------------------
|
|
def extract_state(sn65_data: dict, dsim_data: dict | None) -> dict:
|
|
"""Pull just the bits we care about into a hashable dict."""
|
|
regs = sn65_data.get("registers", {}) if isinstance(sn65_data, dict) else {}
|
|
csr_0a = regs.get("csr_0a", {}) or {}
|
|
csr_e5 = regs.get("csr_e5", {}) or {}
|
|
state = {
|
|
"csr_0a": csr_0a.get("value"),
|
|
"csr_e5": csr_e5.get("value"),
|
|
"pll_lock": csr_0a.get("pll_lock"),
|
|
"clk_det": csr_0a.get("clk_det"),
|
|
}
|
|
for k in ERROR_BITS:
|
|
state[k] = csr_e5.get(k)
|
|
|
|
# DSIM register values (whatever the endpoint exposes). Currently:
|
|
# DSIM_PHYTIMING (0x32e100b4), DSIM_PHYTIMING1 (0x32e100b8), DSIM_PHYTIMING2 (0x32e100bc).
|
|
# These shouldn't change at runtime — but if any DOES move during an unlock
|
|
# event, that's a clue. When the endpoint is extended to expose status
|
|
# registers (DSIM_STATUS / DSIM_CLKCTRL / DSIM_INTSRC / DSIM_FIFOCTRL),
|
|
# they'll be picked up here automatically.
|
|
if isinstance(dsim_data, dict):
|
|
for entry in dsim_data.get("registers", []) or []:
|
|
if isinstance(entry, dict) and "name" in entry and "value" in entry:
|
|
state[f"dsim_{entry['name']}"] = entry["value"]
|
|
return state
|
|
|
|
|
|
def state_str(s: dict) -> str:
|
|
"""Compact one-line representation of a state."""
|
|
pll = "PLL✓" if s.get("pll_lock") else "PLL✗"
|
|
clk = "CLK✓" if s.get("clk_det") else "CLK✗"
|
|
errs = [k for k in ERROR_BITS if s.get(k)]
|
|
err_str = (",".join(errs) if errs else "no_err")
|
|
return (f"{pll} {clk} csr0a={s.get('csr_0a')} csr_e5={s.get('csr_e5')} "
|
|
f"{err_str}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Event handling
|
|
# ---------------------------------------------------------------------------
|
|
def save_event(event: str, history: deque, session_changes: list) -> Path:
|
|
DATA_DIR.mkdir(parents=True, exist_ok=True)
|
|
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
out = DATA_DIR / f"{ts}_{event}.json"
|
|
|
|
snapshot = list(history)
|
|
payload = {
|
|
"event": event,
|
|
"saved_at": ts,
|
|
"n_samples": len(snapshot),
|
|
"window_seconds": HISTORY_S,
|
|
"samples": snapshot,
|
|
"session_changes": session_changes[-200:],
|
|
}
|
|
out.write_text(json.dumps(payload, indent=2, default=str))
|
|
|
|
# Quick console summary
|
|
states_in_window = []
|
|
for s in snapshot:
|
|
if "state" in s:
|
|
sig = json.dumps(s["state"], sort_keys=True)
|
|
if not states_in_window or states_in_window[-1][1] != sig:
|
|
states_in_window.append((s["ts"], sig, s["state"]))
|
|
|
|
print(f"\n*** {event.upper()} EVENT @ {ts} ***")
|
|
print(f" {len(snapshot)} samples saved → {out.relative_to(DATA_DIR.parent.parent)}")
|
|
if len(states_in_window) <= 1:
|
|
print(f" register state was STABLE through the {HISTORY_S:.0f}s window")
|
|
if states_in_window:
|
|
print(f" {state_str(states_in_window[0][2])}")
|
|
else:
|
|
print(f" *** {len(states_in_window)} distinct register states seen in window: ***")
|
|
for ts_change, _, st in states_in_window:
|
|
t_iso = datetime.fromtimestamp(ts_change).strftime("%H:%M:%S.%f")[:-3]
|
|
print(f" {t_iso} {state_str(st)}")
|
|
return out
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
def main() -> None:
|
|
sess = requests.Session()
|
|
history: deque = deque(maxlen=int(HISTORY_S / POLL_DT_S) + 10)
|
|
session_changes: list = [] # log of every state change since startup
|
|
last_state: dict | None = None
|
|
last_dsim: dict | None = None
|
|
iter_count = 0
|
|
poll_count = 0
|
|
err_count = 0
|
|
last_status = time.time()
|
|
started = time.time()
|
|
|
|
print(f"SN65 + DSIM MONITOR")
|
|
print(f" SN65: {SN65_EP} (every poll)")
|
|
if DSIM_POLL_EVERY:
|
|
print(f" DSIM: {DSIM_EP} (every {DSIM_POLL_EVERY} polls)")
|
|
else:
|
|
print(f" DSIM: disabled")
|
|
print(f"poll target {1.0/POLL_DT_S:.0f} Hz, rolling buffer {HISTORY_S:.0f}s")
|
|
print("keys: f=flicker g=good q=quit\n", flush=True)
|
|
|
|
with KeyReader() as keys:
|
|
try:
|
|
while True:
|
|
t0 = time.time()
|
|
iter_count += 1
|
|
sn65_data: dict = {}
|
|
err_this_poll = False
|
|
try:
|
|
r = sess.get(SN65_EP, timeout=HTTP_TIMEOUT_S)
|
|
r.raise_for_status()
|
|
sn65_data = r.json()
|
|
except requests.exceptions.RequestException as e:
|
|
err_this_poll = True
|
|
history.append({"ts": t0, "error": f"sn65: {e}"})
|
|
|
|
# DSIM is fetched only every Nth iteration to keep the SN65
|
|
# poll rate high. In between, we reuse the previous DSIM
|
|
# snapshot.
|
|
if DSIM_POLL_EVERY and (iter_count % DSIM_POLL_EVERY == 0):
|
|
try:
|
|
r = sess.get(DSIM_EP, timeout=HTTP_TIMEOUT_S)
|
|
r.raise_for_status()
|
|
last_dsim = r.json()
|
|
except requests.exceptions.RequestException:
|
|
# best-effort; keep last known
|
|
pass
|
|
dsim_data = last_dsim
|
|
|
|
if err_this_poll:
|
|
err_count += 1
|
|
else:
|
|
state = extract_state(sn65_data, dsim_data)
|
|
history.append({"ts": t0, "state": state,
|
|
"sn65_raw": sn65_data,
|
|
"dsim_raw": dsim_data})
|
|
poll_count += 1
|
|
|
|
if last_state is not None and state != last_state:
|
|
delta = {k: (last_state.get(k), state.get(k))
|
|
for k in state if state.get(k) != last_state.get(k)}
|
|
ts_iso = datetime.fromtimestamp(t0).strftime("%H:%M:%S.%f")[:-3]
|
|
print(f"\n[{ts_iso}] CHANGE: {state_str(state)}")
|
|
for k, (old, new) in delta.items():
|
|
print(f" {k}: {old} → {new}")
|
|
session_changes.append(
|
|
{"ts": t0, "iso": ts_iso, "delta": delta,
|
|
"new_state": state}
|
|
)
|
|
last_state = state
|
|
|
|
# Status line every 2 s — overwrites itself with \r
|
|
if t0 - last_status > 2.0:
|
|
rate = poll_count / (t0 - last_status) if t0 > last_status else 0
|
|
err_pct = err_count / max(1, poll_count + err_count) * 100
|
|
cur = state_str(last_state) if last_state else "(no data)"
|
|
sys.stdout.write(
|
|
f"\r {rate:5.1f} Hz | err {err_pct:4.1f}% | "
|
|
f"buf {len(history)} | changes {len(session_changes)} | "
|
|
f"{cur} "
|
|
)
|
|
sys.stdout.flush()
|
|
last_status = t0
|
|
poll_count = 0
|
|
err_count = 0
|
|
|
|
# Keypress
|
|
key = keys.get_key()
|
|
if key == "f":
|
|
save_event("flicker", history, session_changes)
|
|
elif key == "g":
|
|
save_event("good", history, session_changes)
|
|
elif key == "q":
|
|
print("\nQUIT.")
|
|
break
|
|
|
|
# Pace
|
|
elapsed = time.time() - t0
|
|
if elapsed < POLL_DT_S:
|
|
time.sleep(POLL_DT_S - elapsed)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nInterrupted (Ctrl+C).")
|
|
|
|
# Session summary
|
|
dur = time.time() - started
|
|
print(f"\n--- session summary: {dur:.1f}s, "
|
|
f"{len(session_changes)} state change(s) ---")
|
|
if session_changes:
|
|
print(" recent changes:")
|
|
for c in session_changes[-10:]:
|
|
print(f" {c['iso']} {state_str(c['new_state'])}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|