#!/usr/bin/env python3
"""Manual test triggers for device_server.py running on the remote unit.

Usage:
    python trigger.py start  # start kiosk on default video (vid.mp4)
    python trigger.py start --video vid2.mp4  # start kiosk on vid2.mp4
    python trigger.py switch  # cycle to the other video
    python trigger.py loop --interval 30  # fire 'switch' every 30s until Ctrl-C
    python trigger.py monitor  # device-side 10 ms polling, alert on unlock
    python trigger.py monitor --device-poll-ms 5  # tighter (5 ms) device polling
    python trigger.py monitor --switch-every 5  # also fire switch every 5s, mask post-switch unlocks
    python trigger.py monitor --switch-every 5 --mask 0.5  # custom mask window (default 0.5s)
    python trigger.py monitor --switch-every 5 --wide-ms 500  # also watch full SN65 + DSIM register set
    python trigger.py monitor --switch-every 5 --wide-ms 500 --fast-dsim-ms 1  # add 1 ms /dev/mem DSIM poll
    python trigger.py monitor --switch-every 5 --wide-ms 500 --log  # auto-named log in data/
    python trigger.py monitor --switch-every 5 --wide-ms 500 --log mysession.log  # custom path
    python trigger.py testpattern-on  # enable SN65 internal LVDS test pattern
    python trigger.py testpattern-off  # back to MIPI input
    # While monitor is running, press 'f' to mark a visible flicker observation.
    # On Ctrl-C, a correlation table reports whether each mark matched an unlock or not.
    python trigger.py start-pink  # start kiosk in static-pink mode
    python trigger.py stop  # stop kiosk
    python trigger.py registers  # GET /registers
    python trigger.py sn65  # GET /sn65_registers
    python trigger.py settling  # GET /sn65_settling

Override host/port:
    python trigger.py switch --host 10.32.33.96 --port 5000
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
# Non-blocking single-key input relies on msvcrt, which is Windows-only.
# When the import fails the flicker-mark key is simply disabled.
try:
    import msvcrt
except ImportError:
    _HAS_KBHIT = False
else:
    _HAS_KBHIT = True
|
|
|
|
|
|
def _fmt_ts(t: float) -> str:
    """Format epoch seconds *t* as a local-time ``HH:MM:SS.mmm`` string."""
    # Milliseconds are truncated (not rounded) from the fractional second.
    millis = int((t % 1) * 1000)
    clock = time.strftime("%H:%M:%S", time.localtime(t))
    return clock + f".{millis:03d}"
|
|
|
|
|
|
class _Tee:
    """Minimal stdout tee that forwards writes to several streams (used for --log).

    The first stream is treated as the primary one; every additional stream
    is flushed after each write so a log file is always current if the user
    tails it.
    """

    def __init__(self, *streams):
        self.streams = streams

    def write(self, data):
        """Write *data* to every stream and return the number of characters written."""
        for stream in self.streams:
            stream.write(data)
        # Flush the secondary (file) streams so the log never lags the console.
        for stream in self.streams[1:]:
            stream.flush()
        # Text streams report how much was written; mirror that contract.
        return len(data)

    def flush(self):
        """Flush every underlying stream."""
        for stream in self.streams:
            stream.flush()
|
|
|
|
|
|
# Maps each CLI action name to its (HTTP method, URL path, JSON payload) triple.
# A value of None marks an action that main() drives itself rather than
# issuing a single request.
ACTIONS = {
    # Display / video control
    "switch": ("PUT", "/display", {"state": "on"}),
    "start": ("PUT", "/video", {"action": "start"}),
    "start-pink": ("PUT", "/video", {"action": "start", "mode": "static-pink"}),
    "stop": ("PUT", "/video", {"action": "stop"}),
    # Read-only register dumps
    "registers": ("GET", "/registers", None),
    "sn65": ("GET", "/sn65_registers", None),
    "settling": ("GET", "/sn65_settling", None),
    # SN65 test pattern toggle
    "testpattern-on": ("PUT", "/sn65_testpattern", {"state": "on"}),
    "testpattern-off": ("PUT", "/sn65_testpattern", {"state": "off"}),
    # Long-running modes, handled specially in main()
    "loop": None,
    "monitor": None,
}
|
|
|
|
|
|
def _put_json(url: str, payload, timeout: float) -> dict:
    """Send *payload* as a JSON body in an HTTP PUT to *url*; return the decoded JSON reply.

    Errors raised by urllib or the JSON codec are not caught here.
    """
    body = json.dumps(payload).encode()
    request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
    return json.loads(raw.decode())
|
|
|
|
|
|
def _fmt_event_summary(ev: dict) -> str:
    """Build the one-line human-readable summary for a monitor event dict.

    "recovered" events show both CSR values; "register_change" and
    "dsim_fast_change" events list every changed register; any other event
    type shows the CSR values followed by its comma-joined flags.
    """
    kind = ev["type"]

    if kind == "recovered":
        return f"recovered (csr_0a={ev['csr_0a']}, csr_e5={ev['csr_e5']})"

    if kind in ("register_change", "dsim_fast_change"):
        # A missing or null "changes" entry yields an empty summary.
        changed = ev.get("changes") or {}
        pieces = []
        for reg, info in changed.items():
            pieces.append(f"{reg}({info.get('name')}) {info.get('from')}->{info.get('to')}")
        return "; ".join(pieces)

    flag_text = ",".join(ev.get("flags") or [])
    return f"csr_0a={ev['csr_0a']} csr_e5={ev['csr_e5']} {flag_text}"
|
|
|
|
|
|
def _run_monitor(host: str, port: int, device_poll_ms: int, fetch_s: float, timeout: float,
                 switch_every: float | None = None, mask_s: float = 0.5,
                 wide_ms: int = 0, fast_dsim_ms: int = 0) -> int:
    """Run the device-side PLL monitor and report its events until Ctrl-C.

    Starts a background poll thread on the device at ``device_poll_ms``
    cadence, then fetches new events every ``fetch_s`` seconds and prints
    alerts.  Unlocks seen within ``mask_s`` seconds after a switch are counted
    as masked instead of alerted.

    Args:
        host: Device host name or address.
        port: Device server port.
        device_poll_ms: Requested device-side polling interval in ms.
        fetch_s: Seconds between host fetches of new events.
        timeout: Timeout in seconds for every HTTP request.
        switch_every: When set (and non-zero), also fire a switch every this
            many seconds.
        mask_s: Width in seconds of the post-switch mask window.
        wide_ms: If > 0, sent to the device as ``wide_interval_ms``.
        fast_dsim_ms: If > 0, sent to the device as ``fast_dsim_interval_ms``.

    Returns:
        0 after a Ctrl-C shutdown, 1 if the device answers 404 for
        /pll_monitor, 2 if the device could not be reached at start.

    Raises:
        urllib.error.HTTPError: For any start failure other than HTTP 404.
    """
    base = f"http://{host}:{port}"
    mon_url = f"{base}/pll_monitor"
    events_url = f"{base}/pll_monitor/events"
    switch_url = f"{base}/display"
    switch_payload = {"state": "on"}

    # Start the device-side monitor
    start_payload = {"action": "start", "interval_ms": device_poll_ms}
    if wide_ms > 0:
        start_payload["wide_interval_ms"] = wide_ms
    if fast_dsim_ms > 0:
        start_payload["fast_dsim_interval_ms"] = fast_dsim_ms
    try:
        resp = _put_json(mon_url, start_payload, timeout)
    except urllib.error.HTTPError as e:
        if e.code == 404:
            print("Device doesn't have /pll_monitor — scp the updated device_server.py and restart device-server.")
            return 1
        raise
    except urllib.error.URLError as e:
        # Unreachable device: report it the same way _send() does.
        print(f"connection error: {e.reason}", file=sys.stderr)
        return 2

    host_start = time.time()
    device_now = float(resp.get("device_now", host_start))
    # offset to convert device time -> host time
    dev_to_host = host_start - device_now
    actual_ms = int(resp.get("interval_ms", device_poll_ms))
    actual_wide = int(resp.get("wide_interval_ms", 0))
    actual_fast_dsim = int(resp.get("fast_dsim_interval_ms", 0))

    print(f"Device monitor running at {actual_ms} ms; host fetching every {fetch_s*1000:.0f} ms. Ctrl-C to stop.")
    if actual_wide > 0:
        print(f"Wide-register snapshot enabled at {actual_wide} ms (alerts on any non-frame-counter change).")
    if actual_fast_dsim > 0:
        print(f"Fast DSIM mmap poll enabled at {actual_fast_dsim} ms (direct /dev/mem; catches sub-frame register transients).")
    print(f"Clock offset (host - device): {dev_to_host*1000:+.1f} ms.")
    if switch_every:
        print(f"Driving switch every {switch_every}s; masking unlocks within {mask_s*1000:.0f} ms of each switch.")
    print()

    last_seen = 0.0  # device time of last event we've already shown
    events = 0  # steady-state (alerted) unlocks
    masked = 0  # unlocks in mask window
    switches = 0
    next_switch = (host_start + switch_every) if switch_every else None
    fetch_errs = 0
    switch_history: list = []  # host times of recent switches, for mask lookup
    flicker_marks: list = []  # host times the user pressed 'f' to mark visible flicker
    unlocks_log: list = []  # (host_t, event_dict) for every unlock event, for correlation
    dur = 0.0
    stats: dict = {}

    if _HAS_KBHIT:
        print("Press 'f' to mark a visible flicker observation. Ctrl-C to stop.\n")
    else:
        print("(Flicker-mark key disabled — msvcrt not available on this platform.)\n")

    try:
        while True:
            now = time.time()

            # Drain any pending keypresses (flicker mark)
            if _HAS_KBHIT:
                while msvcrt.kbhit():
                    ch = msvcrt.getch()
                    if ch == b'\x03':  # Ctrl-C as raw byte (rarely happens here, but safe)
                        raise KeyboardInterrupt
                    c = ch.decode("utf-8", errors="replace").lower()
                    if c == "f":
                        flicker_marks.append(now)
                        print(f"*** [{_fmt_ts(now)}] FLICKER MARK #{len(flicker_marks)}")

            # Fire scheduled switch
            if next_switch is not None and now >= next_switch:
                switches += 1
                # Stamp with the same instant that is recorded in switch_history.
                ts = _fmt_ts(now)
                rc = _send("PUT", switch_url, switch_payload, timeout, quiet=True)
                tag = "OK" if rc == 0 else f"FAIL rc={rc}"
                print(f"[{ts}] switch #{switches} fired ({tag}) — masking {mask_s*1000:.0f} ms")
                switch_history.append(now)
                # keep history small
                if len(switch_history) > 200:
                    del switch_history[:100]
                next_switch = now + switch_every

            # Fetch new events from device
            try:
                with urllib.request.urlopen(f"{events_url}?since={last_seen}", timeout=timeout) as r:
                    body = json.loads(r.read().decode())
            except Exception as e:
                # Best-effort polling: report the first few failures, then
                # every 50th, and keep going.
                fetch_errs += 1
                if fetch_errs <= 3 or fetch_errs % 50 == 0:
                    print(f"[{time.strftime('%H:%M:%S')}] fetch error #{fetch_errs}: {e}")
                time.sleep(max(fetch_s, 0.5))
                continue

            for ev in body.get("events", []):
                last_seen = max(last_seen, ev["t"])
                # device time -> host time
                host_t = ev["t"] + dev_to_host
                ts = _fmt_ts(host_t)
                summary = _fmt_event_summary(ev)
                kind = ev["type"]

                if kind == "recovered":
                    print(f" [{ts}] {summary}")
                    continue

                in_mask = _in_mask_window(host_t, switch_history, mask_s)

                if kind == "register_change":
                    # Always alert — register changes are rare and interesting.
                    # Mask-status is informational only.
                    tag = " (post-switch)" if in_mask else ""
                    print(f"\a!!! [{ts}] REGISTER CHANGE{tag}: {summary}")
                    continue

                if kind == "dsim_fast_change":
                    tag = " (post-switch)" if in_mask else ""
                    print(f"\a>>> [{ts}] DSIM FAST{tag}: {summary}")
                    continue

                # unlock — log unconditionally so flicker-mark correlation has full picture
                unlocks_log.append((host_t, ev))
                if in_mask:
                    masked += 1
                    print(f" [{ts}] (masked, post-switch) {summary}")
                else:
                    events += 1
                    print(f"\a>>> [{ts}] UNLOCK #{events} (STEADY-STATE): {summary}")

            time.sleep(fetch_s)

    except KeyboardInterrupt:
        dur = time.time() - host_start
        # Pull final stats from device (before the monitor is stopped below)
        try:
            with urllib.request.urlopen(events_url, timeout=timeout) as r:
                final = json.loads(r.read().decode())
            stats = final.get("stats", {})
        except Exception:
            stats = {}
    finally:
        # Stop the device-side monitor on every exit path, not only Ctrl-C,
        # so an unexpected error never leaves the poll thread running.
        try:
            _put_json(mon_url, {"action": "stop"}, timeout)
        except Exception:
            pass  # best-effort: the device may already be unreachable

    print(f"\nStopped after {dur:.1f}s.")
    if stats:
        polls = stats.get("polls", 0)
        errs = stats.get("errors", 0)
        rate = polls / dur if dur > 0 else 0
        print(f" Device PLL polls: {polls} ({rate:.1f}/s, {errs} I²C errors)")
        fast_polls = stats.get("fast_dsim_polls", 0)
        if fast_polls:
            fast_rate = fast_polls / dur if dur > 0 else 0
            print(f" Fast DSIM polls (mmap): {fast_polls} ({fast_rate:.0f}/s)")
        fast_err = stats.get("fast_dsim_error")
        if fast_err:
            print(f" Fast DSIM error: {fast_err}")
    print(f" Host fetches: errors={fetch_errs}")
    if switch_every:
        print(f" Switches fired: {switches}")
        print(f" Masked (post-switch) unlocks: {masked}")
    print(f" STEADY-STATE unlocks (alerted): {events}")

    # --- Flicker observation correlation ---
    if flicker_marks:
        _print_flicker_correlation(flicker_marks, switch_history, unlocks_log, mask_s)
    return 0


def _in_mask_window(host_t: float, switch_times: list, mask_s: float) -> bool:
    """Return True when *host_t* lies within *mask_s* seconds after any recorded switch."""
    return any(0 <= (host_t - sw_t) <= mask_s for sw_t in switch_times)


def _print_flicker_correlation(flicker_marks: list, switch_history: list,
                               unlocks_log: list, mask_s: float) -> None:
    """Print a table relating each flicker mark to its nearest switch and unlock."""
    print(f"\n Flicker observations: {len(flicker_marks)}")
    print(f" {'Mark':>5} {'Time':<13} {'Δ nearest switch':>17} "
          f"{'Δ nearest unlock':>17} Verdict")
    within = 0  # marks within mask_s of an unlock event
    between = 0
    for i, mark_t in enumerate(flicker_marks, 1):
        nearest_sw = min(switch_history, key=lambda t: abs(t - mark_t)) if switch_history else None
        nearest_un = min(unlocks_log, key=lambda x: abs(x[0] - mark_t))[0] if unlocks_log else None
        sw_delta = (mark_t - nearest_sw) * 1000 if nearest_sw is not None else None
        un_delta = (mark_t - nearest_un) * 1000 if nearest_un is not None else None
        # Classify: if mark is within mask_s of an unlock event, call it switch-induced
        if un_delta is not None and abs(un_delta) <= mask_s * 1000:
            verdict = "MATCHES switch unlock"
            within += 1
        else:
            verdict = "BETWEEN events (no PLL unlock nearby)"
            between += 1
        sw_str = f"{sw_delta:+8.0f} ms" if sw_delta is not None else " —"
        un_str = f"{un_delta:+8.0f} ms" if un_delta is not None else " —"
        print(f" #{i:<4} {_fmt_ts(mark_t):<13} {sw_str:>17} {un_str:>17} {verdict}")
    print(f"\n Summary: {within} match an unlock, {between} between events.")
|
|
|
|
|
|
def _send(method: str, url: str, payload, timeout: float, quiet: bool = False) -> int:
    """Issue one HTTP request and report the outcome as an exit-style code.

    Args:
        method: HTTP verb, e.g. "GET" or "PUT".
        url: Full request URL.
        payload: JSON-serialisable body, or None to send no body.
        timeout: Timeout in seconds passed to urlopen.
        quiet: When true, suppress the request echo and the success output
            (failures are still printed).

    Returns:
        0 on success, 1 on an HTTP error status, 2 when the request could
        not be completed (unreachable host, timeout, dropped connection).
    """
    data = json.dumps(payload).encode() if payload is not None else None
    headers = {"Content-Type": "application/json"} if data else {}
    req = urllib.request.Request(url, data=data, headers=headers, method=method)

    if not quiet:
        print(f"{method} {url}" + (f" body={json.dumps(payload)}" if payload else ""))
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read().decode(errors="replace")
            if not quiet:
                print(f"HTTP {resp.status}")
                # Pretty-print JSON replies; fall back to the raw text.
                try:
                    print(json.dumps(json.loads(body), indent=2))
                except json.JSONDecodeError:
                    print(body)
    except urllib.error.HTTPError as e:
        print(f"HTTP {e.code} {e.reason}")
        print(e.read().decode(errors="replace"))
        return 1
    except urllib.error.URLError as e:
        print(f"connection error: {e.reason}", file=sys.stderr)
        return 2
    except (TimeoutError, ConnectionError) as e:
        # Timeouts and resets while waiting for or reading the response are
        # not wrapped in URLError; without this they would escape and abort
        # the 'loop' and 'monitor' callers that expect a return code.
        print(f"connection error: {e}", file=sys.stderr)
        return 2
    return 0
|
|
|
|
|
|
def main() -> int:
    """Parse the command line, run the requested action, and return an exit code.

    'monitor' and 'loop' are long-running modes handled here; every other
    action is a single request looked up in ACTIONS and sent with _send().
    """
    p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("action", choices=ACTIONS.keys())
    p.add_argument("--video", help="Initial video filename for 'start' (e.g. vid.mp4, vid2.mp4)")
    p.add_argument("--interval", type=float, default=30.0,
                   help="Seconds between switches for 'loop' action (default: 30)")
    p.add_argument("--device-poll-ms", type=int, default=10,
                   help="For 'monitor': device-side I²C polling interval in ms (default: 10)")
    p.add_argument("--fetch", type=float, default=0.1,
                   help="For 'monitor': how often host fetches new events from device (default: 0.1 = 100 ms)")
    p.add_argument("--switch-every", type=float, default=None,
                   help="For 'monitor': also fire a switch every N seconds")
    p.add_argument("--mask", type=float, default=0.5,
                   help="For 'monitor': suppress unlocks within N seconds of each switch (default: 0.5)")
    p.add_argument("--wide-ms", type=int, default=0,
                   help="For 'monitor': also snapshot all SN65 config/status regs every N ms (0=off, try 500)")
    p.add_argument("--fast-dsim-ms", type=int, default=0,
                   help="For 'monitor': enable device-side fast DSIM poll via /dev/mem mmap (0=off, try 1)")
    p.add_argument("--log", nargs='?', const='auto', default=None,
                   help="Tee output to a log file. Use --log alone for auto-named file in data/, or --log path/to/file.log")
    p.add_argument("--host", default="10.32.33.100")
    p.add_argument("--port", type=int, default=5000)
    p.add_argument("--timeout", type=float, default=10.0)
    args = p.parse_args()

    if args.action == "monitor":
        log_path = None
        log_fh = None
        # Remember whatever stdout is active now so it is restored exactly,
        # even if it was already redirected before this script ran.
        prev_stdout = sys.stdout
        if args.log:
            if args.log == 'auto':
                import os
                os.makedirs("data", exist_ok=True)
                log_path = os.path.join("data", f"monitor_{time.strftime('%Y%m%d_%H%M%S')}.log")
            else:
                log_path = args.log
            log_fh = open(log_path, 'w', encoding='utf-8')
            sys.stdout = _Tee(prev_stdout, log_fh)
            print(f"Logging to {log_path}")
        try:
            return _run_monitor(args.host, args.port, args.device_poll_ms, args.fetch,
                                args.timeout, switch_every=args.switch_every, mask_s=args.mask,
                                wide_ms=args.wide_ms, fast_dsim_ms=args.fast_dsim_ms)
        finally:
            if log_fh is not None:
                sys.stdout = prev_stdout
                log_fh.close()
                print(f"Log saved to {log_path}")

    if args.action == "loop":
        url = f"http://{args.host}:{args.port}/display"
        payload = {"state": "on"}
        print(f"Looping 'switch' every {args.interval}s. Ctrl-C to stop.")
        n = 0
        try:
            while True:
                time.sleep(args.interval)
                n += 1
                ts = time.strftime("%H:%M:%S")
                print(f"[{ts}] switch #{n}")
                rc = _send("PUT", url, payload, args.timeout, quiet=True)
                if rc != 0:
                    print(" (request failed, continuing)")
        except KeyboardInterrupt:
            print(f"\nStopped after {n} switches.")
        return 0

    method, path, payload = ACTIONS[args.action]
    # --video only applies to the two start actions; copy the payload so the
    # shared ACTIONS table is never mutated.
    if args.video and args.action in ("start", "start-pink"):
        payload = {**payload, "video": args.video}
    url = f"http://{args.host}:{args.port}{path}"
    return _send(method, url, payload, args.timeout)
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|