Files
MiPi_TEST/sn65_monitor.py
david rice 8d8df1e7a7 Updates
2026-05-11 08:21:34 +01:00

278 lines
11 KiB
Python

#!/usr/bin/env python3
"""
sn65_monitor.py — High-rate SN65DSI83 register monitor.
Continuously polls /sn65_registers at a target of ~100 Hz (see POLL_DT_S; the real rate is bounded by device latency), logs any register-state change
in real time, and keeps a rolling 30 s window in memory. When you press
`f` (flicker) or `g` (good), the window is dumped to a JSON file and
summarised so you can see whether anything moved at the moment of the event.
This complements flicker_watch.py: run it in a second terminal during a
test session to catch transient register changes that disappear before the
post-event snapshot in flicker_watch can fetch them.
Keys:
f — flicker event: dump rolling buffer + summary, keep monitoring
g — good baseline: dump rolling buffer + summary, keep monitoring
q — quit
"""
from __future__ import annotations
import json
import select
import sys
import termios
import time
import tty
from collections import deque
from datetime import datetime
from pathlib import Path
import requests
# Base URL of the device-side HTTP server that serves the register dumps.
DEVICE_BASE = "http://192.168.45.8:5000"
# SN65DSI83 register endpoint — fetched on every poll of the main loop.
SN65_EP = f"{DEVICE_BASE}/sn65_registers"
# DSIM register endpoint — fetched at a reduced rate (see DSIM_POLL_EVERY).
DSIM_EP = f"{DEVICE_BASE}/registers"
# Event dumps are written under data/sn65_log/ next to this script.
DATA_DIR = Path(__file__).parent / "data" / "sn65_log"
# Aim for ~100 Hz SN65 polling — actual rate is bounded by the I2C-read
# latency of the device server. At 20 Hz the unlock pulse-width was
# unresolvable ("≤ 50 ms"); at 100 Hz we should see whether it's e.g. 5 ms
# or 30 ms, which narrows the root-cause search.
POLL_DT_S = 0.01  # target loop period in seconds (100 Hz target)
# Nominal length of the rolling in-memory window, in seconds. Together with
# POLL_DT_S it sizes the history buffer, and it is reported in event dumps.
HISTORY_S = 30.0
# Per-request HTTP timeout in seconds, kept tight because a slow read
# shouldn't stall the polling loop.
HTTP_TIMEOUT_S = 0.2
# DSIM register read goes through memtool and adds latency. The current
# endpoint only exposes 3 static PHY-timing config registers anyway, so
# poll it once every N SN65 polls (set to 0 to disable entirely). When the
# device endpoint gains DSIM_STATUS / DSIM_CLKCTRL / DSIM_INTSRC / DSIM_FIFOCTRL,
# poll DSIM more often, i.e. LOWER this value.
DSIM_POLL_EVERY = 50  # at 100 Hz, every 50th poll → 2 Hz DSIM
# csr_e5 error bit names from the device's register decode. The order here
# is the order in which active errors are listed in the one-line summary.
ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err",
"cha_ecc_err", "cha_lp_err", "cha_crc_err")
# ---------------------------------------------------------------------------
# Non-blocking keyboard
# ---------------------------------------------------------------------------
class KeyReader:
    """Context manager for polling single keypresses from stdin.

    Entering saves the current terminal attributes and switches stdin to
    cbreak mode; leaving restores the saved attributes.
    """

    def __enter__(self):
        """Remember the terminal settings, enable cbreak mode, return self."""
        stdin_fd = sys.stdin.fileno()
        self.fd = stdin_fd
        self.old = termios.tcgetattr(stdin_fd)
        tty.setcbreak(stdin_fd)
        return self

    def get_key(self) -> str | None:
        """Return one waiting character, lower-cased, or None if none is ready.

        Uses a zero-timeout ``select`` so the call never waits for input.
        """
        ready, _, _ = select.select([sys.stdin], [], [], 0)
        if not ready:
            return None
        return sys.stdin.read(1).lower()

    def __exit__(self, *_):
        """Put back the terminal attributes captured on entry."""
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
# ---------------------------------------------------------------------------
# Register parsing
# ---------------------------------------------------------------------------
def _dict_or_empty(value: object) -> dict:
    """Return *value* unchanged when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def extract_state(sn65_data: dict, dsim_data: dict | None) -> dict:
    """Pull just the values we track into one flat dict.

    The result is meant to be compared by equality between polls, so it only
    contains the register values and decoded bits of interest.

    Args:
        sn65_data: Decoded JSON from the SN65 endpoint. Expected to carry a
            ``"registers"`` mapping with ``"csr_0a"`` / ``"csr_e5"`` entries;
            anything missing or of the wrong type is treated as absent.
        dsim_data: Decoded JSON from the DSIM endpoint, or None when no DSIM
            snapshot is available. Expected to carry a ``"registers"`` list
            of ``{"name": ..., "value": ...}`` entries.

    Returns:
        A dict with ``csr_0a``, ``csr_e5``, ``pll_lock``, ``clk_det``, one
        key per name in ``ERROR_BITS``, and one ``dsim_<name>`` key per
        well-formed DSIM entry. Values that could not be found are None.
    """
    # A malformed payload (e.g. "registers": null, or a csr entry that is a
    # bare value instead of a decoded mapping) must not crash the caller,
    # so every level is coerced to a dict before being read.
    regs = _dict_or_empty(_dict_or_empty(sn65_data).get("registers"))
    csr_0a = _dict_or_empty(regs.get("csr_0a"))
    csr_e5 = _dict_or_empty(regs.get("csr_e5"))

    state = {
        "csr_0a": csr_0a.get("value"),
        "csr_e5": csr_e5.get("value"),
        "pll_lock": csr_0a.get("pll_lock"),
        "clk_det": csr_0a.get("clk_det"),
    }
    state.update((bit, csr_e5.get(bit)) for bit in ERROR_BITS)

    # DSIM register values (whatever the endpoint exposes). Currently:
    # DSIM_PHYTIMING (0x32e100b4), DSIM_PHYTIMING1 (0x32e100b8), DSIM_PHYTIMING2 (0x32e100bc).
    # These shouldn't change at runtime — but if any DOES move during an unlock
    # event, that's a clue. When the endpoint is extended to expose status
    # registers (DSIM_STATUS / DSIM_CLKCTRL / DSIM_INTSRC / DSIM_FIFOCTRL),
    # they'll be picked up here automatically.
    entries = _dict_or_empty(dsim_data).get("registers")
    if isinstance(entries, (list, tuple)):
        for entry in entries:
            if isinstance(entry, dict) and "name" in entry and "value" in entry:
                state[f"dsim_{entry['name']}"] = entry["value"]
    return state
def state_str(s: dict) -> str:
    """Render a state dict as a short single-line summary.

    The line shows PLL-lock and clock-detect as tick/cross marks, the two
    CSR values, and a comma-separated list of the ``ERROR_BITS`` entries
    that are truthy in *s* (or ``no_err`` when none are).
    """
    pll_part = "PLL✗"
    if s.get("pll_lock"):
        pll_part = "PLL✓"

    clk_part = "CLK✗"
    if s.get("clk_det"):
        clk_part = "CLK✓"

    active = [bit for bit in ERROR_BITS if s.get(bit)]
    if active:
        err_part = ",".join(active)
    else:
        err_part = "no_err"

    csr_0a = s.get('csr_0a')
    csr_e5 = s.get('csr_e5')
    return f"{pll_part} {clk_part} csr0a={csr_0a} csr_e5={csr_e5} {err_part}"
# ---------------------------------------------------------------------------
# Event handling
# ---------------------------------------------------------------------------
def save_event(event: str, history: deque, session_changes: list) -> Path:
    """Dump the rolling buffer to a JSON file and print a short summary.

    Args:
        event: Label for the event; used in the file name and the dump.
        history: Rolling buffer of samples. Entries with a ``"state"`` key
            are considered for the console summary; others are only saved.
        session_changes: Log of state changes; the last 200 are saved.

    Returns:
        Path of the JSON file that was written under ``DATA_DIR``.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    dump_path = DATA_DIR / f"{stamp}_{event}.json"
    samples = list(history)

    document = json.dumps(
        {
            "event": event,
            "saved_at": stamp,
            "n_samples": len(samples),
            "window_seconds": HISTORY_S,
            "samples": samples,
            "session_changes": session_changes[-200:],
        },
        indent=2,
        default=str,
    )
    dump_path.write_text(document)

    # Quick console summary: collapse consecutive samples with an identical
    # state into one entry, keeping the timestamp at which each run started.
    runs = []
    previous_sig = None
    for sample in samples:
        if "state" not in sample:
            continue
        signature = json.dumps(sample["state"], sort_keys=True)
        if signature == previous_sig:
            continue
        runs.append((sample["ts"], sample["state"]))
        previous_sig = signature

    shown_path = dump_path.relative_to(DATA_DIR.parent.parent)
    print(f"\n*** {event.upper()} EVENT @ {stamp} ***")
    print(f" {len(samples)} samples saved → {shown_path}")

    if len(runs) > 1:
        print(f" *** {len(runs)} distinct register states seen in window: ***")
        for started_at, run_state in runs:
            clock = datetime.fromtimestamp(started_at).strftime("%H:%M:%S.%f")[:-3]
            print(f" {clock} {state_str(run_state)}")
        return dump_path

    print(f" register state was STABLE through the {HISTORY_S:.0f}s window")
    if runs:
        print(f" {state_str(runs[0][1])}")
    return dump_path
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _fetch_json(sess: requests.Session, url: str) -> tuple[object, str | None]:
    """GET *url* and decode the JSON body.

    Returns:
        ``(payload, None)`` on success, or ``(None, error_text)`` when the
        request fails, returns an HTTP error status, or the body is not
        valid JSON.
    """
    try:
        resp = sess.get(url, timeout=HTTP_TIMEOUT_S)
        resp.raise_for_status()
        return resp.json(), None
    except (requests.exceptions.RequestException, ValueError) as exc:
        # ValueError covers JSON decode failures on requests versions whose
        # decode error is not a RequestException subclass.
        return None, str(exc)


def main() -> None:
    """Run the interactive monitor until `q` is pressed or Ctrl+C arrives.

    Each iteration polls the SN65 endpoint (and, every ``DSIM_POLL_EVERY``
    iterations, the DSIM endpoint), appends the sample to a rolling buffer,
    prints any register-state change, refreshes a status line every 2 s and
    handles the `f` / `g` / `q` keys. A session summary is printed on exit.
    """
    # maxlen is a hard cap on memory; the buffer is additionally pruned by
    # age below so it never spans more than HISTORY_S seconds.
    history: deque = deque(maxlen=int(HISTORY_S / POLL_DT_S) + 10)
    session_changes: list = []  # log of every state change since startup
    last_state: dict | None = None
    last_dsim: object = None
    iter_count = 0
    poll_count = 0
    err_count = 0
    last_status = time.time()
    started = time.time()

    print("SN65 + DSIM MONITOR")
    print(f" SN65: {SN65_EP} (every poll)")
    if DSIM_POLL_EVERY:
        print(f" DSIM: {DSIM_EP} (every {DSIM_POLL_EVERY} polls)")
    else:
        print(" DSIM: disabled")
    print(f"poll target {1.0/POLL_DT_S:.0f} Hz, rolling buffer {HISTORY_S:.0f}s")
    print("keys: f=flicker g=good q=quit\n", flush=True)

    # The session is a context manager so its connections are released on
    # every exit path, including a failure to enter cbreak mode.
    with requests.Session() as sess, KeyReader() as keys:
        try:
            while True:
                t0 = time.time()
                iter_count += 1

                sn65_data, sn65_err = _fetch_json(sess, SN65_EP)

                # DSIM is fetched only every Nth iteration to keep the SN65
                # poll rate high. In between, we reuse the previous DSIM
                # snapshot. The first iteration is included so the dsim_*
                # keys are present from the start and do not show up later
                # as a spurious state change.
                if DSIM_POLL_EVERY and (iter_count - 1) % DSIM_POLL_EVERY == 0:
                    dsim_payload, dsim_err = _fetch_json(sess, DSIM_EP)
                    if dsim_err is None:
                        last_dsim = dsim_payload
                    # else: best-effort; keep last known

                if sn65_err is not None:
                    err_count += 1
                    history.append({"ts": t0, "error": f"sn65: {sn65_err}"})
                else:
                    state = extract_state(sn65_data, last_dsim)
                    history.append({"ts": t0, "state": state,
                                    "sn65_raw": sn65_data,
                                    "dsim_raw": last_dsim})
                    poll_count += 1
                    if last_state is not None and state != last_state:
                        # Walk the union of old and new keys so a key that
                        # disappeared is reported too.
                        delta = {k: (last_state.get(k), state.get(k))
                                 for k in {**last_state, **state}
                                 if state.get(k) != last_state.get(k)}
                        ts_iso = datetime.fromtimestamp(t0).strftime("%H:%M:%S.%f")[:-3]
                        print(f"\n[{ts_iso}] CHANGE: {state_str(state)}")
                        for k, (old, new) in delta.items():
                            print(f" {k}: {old} → {new}")
                        session_changes.append(
                            {"ts": t0, "iso": ts_iso, "delta": delta,
                             "new_state": state}
                        )
                    last_state = state

                # Drop samples older than the advertised window. Without
                # this, a real poll rate below the target would let the
                # count-bounded buffer span much more than HISTORY_S.
                while history and t0 - history[0]["ts"] > HISTORY_S:
                    history.popleft()

                # Status line every 2 s — overwrites itself with \r
                since_status = t0 - last_status
                if since_status > 2.0:
                    rate = poll_count / since_status
                    err_pct = err_count / max(1, poll_count + err_count) * 100
                    cur = state_str(last_state) if last_state else "(no data)"
                    sys.stdout.write(
                        f"\r {rate:5.1f} Hz | err {err_pct:4.1f}% | "
                        f"buf {len(history)} | changes {len(session_changes)} | "
                        f"{cur} "
                    )
                    sys.stdout.flush()
                    last_status = t0
                    poll_count = 0
                    err_count = 0

                # Keypress
                key = keys.get_key()
                if key == "f":
                    save_event("flicker", history, session_changes)
                elif key == "g":
                    save_event("good", history, session_changes)
                elif key == "q":
                    print("\nQUIT.")
                    break

                # Pace: sleep off whatever is left of this poll period.
                elapsed = time.time() - t0
                if elapsed < POLL_DT_S:
                    time.sleep(POLL_DT_S - elapsed)
        except KeyboardInterrupt:
            print("\nInterrupted (Ctrl+C).")

    # Session summary
    dur = time.time() - started
    print(f"\n--- session summary: {dur:.1f}s, "
          f"{len(session_changes)} state change(s) ---")
    if session_changes:
        print(" recent changes:")
        for c in session_changes[-10:]:
            print(f" {c['iso']} {state_str(c['new_state'])}")


if __name__ == "__main__":
    main()