#!/usr/bin/env python3 """ compare_stops.py — Compare different video-stop styles for SN65 PLL impact. For each "stop style" defined in STYLES, this script: 1. Polls /sn65_registers in a background thread (50 Hz, like sn65_monitor) 2. Runs N cycles of start → on_s seconds → stop → off_s seconds 3. Records PLL unlock events and which style was active when each fired At the end it prints a comparison table: per-style cycles run, unlocks seen, unlocks-per-stop probability, and unlock pulse-width stats. This is the test you use to verify that switching the device's "stop" semantics (e.g. GStreamer PAUSE instead of NULL) actually reduces the unlock rate. Edit STYLES at the top of the file to add new stop variants as your device server gains them. The start payload can vary too if you want to test different start sequences. Usage: python3 compare_stops.py # 20 cycles per style python3 compare_stops.py --cycles 50 --on-s 5 """ from __future__ import annotations import argparse import json import statistics import sys import threading import time from datetime import datetime from pathlib import Path import requests DEVICE_BASE = "http://192.168.45.8:5000" VIDEO_URL = f"{DEVICE_BASE}/video" SN65_EP = f"{DEVICE_BASE}/sn65_registers" POLL_DT_S = 0.02 # 50 Hz target HTTP_TIMEOUT_S = 0.2 LOG_DIR = Path(__file__).parent / "data" / "compare_logs" # ------------------------------------------------------------------------- # STYLES — edit this list to add new stop variants # ------------------------------------------------------------------------- # Each entry: (label, start_payload_dict, stop_payload_dict) # The default first entry is the current /video stop behaviour. # When your device server supports alternative actions (e.g. "pause"), # uncomment / add the corresponding line below. STYLES: list[tuple[str, dict, dict]] = [ ("stop_full", {"action": "start", "mode": "static-pink"}, {"action": "stop"}), # === Add styles below once the device server supports them === # ("stop_pause", # GST_STATE_PAUSED — keeps DSI clock alive # {"action": "start", "mode": "static-pink"}, # {"action": "pause"}), # ("stop_ready", # GST_STATE_READY — half-way teardown # {"action": "start", "mode": "static-pink"}, # {"action": "ready"}), # ("stop_keep_clk", # {"action": "start", "mode": "static-pink"}, # {"action": "stop", "keep_clock": True}), ] # ------------------------------------------------------------------------- # SN65 polling thread # ------------------------------------------------------------------------- class SN65Poller: """Background thread polling /sn65_registers; records every PLL transition.""" ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err", "cha_ecc_err", "cha_lp_err", "cha_crc_err") def __init__(self) -> None: self.sess = requests.Session() self._stop = threading.Event() self._thread: threading.Thread | None = None self.changes: list[dict] = [] self.samples: list[dict] = [] # every poll: {ts, state, err?} self._last_state: dict | None = None self.actual_hz = 0.0 self.err_count = 0 def start(self) -> None: self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def stop(self) -> None: self._stop.set() if self._thread: self._thread.join(timeout=2) def _run(self) -> None: n_polls = 0 rate_t0 = time.time() while not self._stop.is_set(): t0 = time.time() try: r = self.sess.get(SN65_EP, timeout=HTTP_TIMEOUT_S) r.raise_for_status() data = r.json() except requests.exceptions.RequestException: self.samples.append({"ts": t0, "error": True}) self.err_count += 1 if t0 - rate_t0 > 2.0: self.actual_hz = n_polls / (t0 - rate_t0); rate_t0 = t0; n_polls = 0 time.sleep(POLL_DT_S) continue regs = data.get("registers", {}) csr0a = regs.get("csr_0a", {}) or {} csre5 = regs.get("csr_e5", {}) or {} state = { "csr_0a": csr0a.get("value"), "csr_e5": csre5.get("value"), "pll_lock": csr0a.get("pll_lock"), "clk_det": csr0a.get("clk_det"), } for k in self.ERROR_BITS: state[k] = csre5.get(k) self.samples.append({"ts": t0, "state": state}) n_polls += 1 if self._last_state is not None and state != self._last_state: delta = {k: (self._last_state.get(k), state.get(k)) for k in state if state.get(k) != self._last_state.get(k)} self.changes.append({"ts": t0, "delta": delta, "new_state": state}) self._last_state = state if t0 - rate_t0 > 2.0: self.actual_hz = n_polls / (t0 - rate_t0); rate_t0 = t0; n_polls = 0 elapsed = time.time() - t0 if elapsed < POLL_DT_S: time.sleep(POLL_DT_S - elapsed) def unlocks(self) -> list[dict]: """Pair pll_lock False→True transitions into [{start_ts, end_ts, duration_ms}].""" pll = [c for c in self.changes if "pll_lock" in c["delta"]] out, i = [], 0 while i < len(pll): _, new = pll[i]["delta"]["pll_lock"] if new is False: for j in range(i+1, min(i+20, len(pll))): _, n_new = pll[j]["delta"]["pll_lock"] if n_new is True: out.append({"start_ts": pll[i]["ts"], "end_ts": pll[j]["ts"], "duration_ms": (pll[j]["ts"]-pll[i]["ts"])*1000}) i = j break i += 1 return out # ------------------------------------------------------------------------- # Cycle runner # ------------------------------------------------------------------------- def put_video(payload: dict, label: str) -> None: try: requests.put(VIDEO_URL, json=payload, timeout=3.0) except requests.exceptions.RequestException as e: print(f" {label} HTTP failed: {e}") def run_style(label: str, start_payload: dict, stop_payload: dict, n_cycles: int, on_s: float, off_s: float, poller: SN65Poller, transitions: list) -> tuple[float, float]: """Run n cycles of the given style. Returns (start_ts, end_ts).""" print(f"\n▸ style '{label}'") print(f" start={start_payload} stop={stop_payload} cycles={n_cycles}") style_start = time.time() for c in range(1, n_cycles + 1): t = time.time() put_video(start_payload, "start") transitions.append({"ts": t, "event": "start", "style": label, "cycle": c}) time.sleep(on_s) t = time.time() put_video(stop_payload, "stop") transitions.append({"ts": t, "event": "stop", "style": label, "cycle": c}) if c % 5 == 0 or c == n_cycles: n_unlocks = len(poller.unlocks()) print(f" cycle {c:>3}/{n_cycles} " f"{poller.actual_hz:5.1f} Hz " f"unlocks so far: {n_unlocks}", flush=True) time.sleep(off_s) return style_start, time.time() # ------------------------------------------------------------------------- # Analysis # ------------------------------------------------------------------------- def analyse(styles_run: list[dict], unlocks: list[dict], transitions: list[dict]) -> None: """Bucket unlocks by which style was active when they fired.""" print("\n" + "="*78) print(" STOP-STYLE COMPARISON") print("="*78) print(f"\n {'style':<18} {'cycles':>6} {'unlocks':>8} " f"{'per_cycle':>10} {'med_pulse':>10} {'min_pulse':>10} {'max_pulse':>10}") print(f" {'-'*18} {'-'*6} {'-'*8} {'-'*10} {'-'*10} {'-'*10} {'-'*10}") for s in styles_run: in_window = [u for u in unlocks if s["t_start"] <= u["start_ts"] <= s["t_end"]] durs = [u["duration_ms"] for u in in_window] n = len(in_window) per_cycle = n / s["cycles"] med = statistics.median(durs) if durs else 0.0 mn = min(durs) if durs else 0.0 mx = max(durs) if durs else 0.0 print(f" {s['label']:<18} {s['cycles']:>6} {n:>8} " f"{per_cycle:>10.2f} " f"{med:>8.1f}ms {mn:>8.1f}ms {mx:>8.1f}ms") # also: distribution after STOP (we expect 100% if behaviour is same as baseline) stops_in_style = [t for t in transitions if t["style"] == s["label"] and t["event"] == "stop"] after_stop = 0 after_start = 0 for u in in_window: nearest_stop = min((t["ts"] for t in stops_in_style), key=lambda ts: abs(ts - u["start_ts"]), default=None) starts_in_style = [t for t in transitions if t["style"] == s["label"] and t["event"] == "start"] nearest_start = min((t["ts"] for t in starts_in_style), key=lambda ts: abs(ts - u["start_ts"]), default=None) if nearest_stop is None or nearest_start is None: continue d_stop = u["start_ts"] - nearest_stop d_start = u["start_ts"] - nearest_start if 0 <= d_stop <= 3.0 and (d_start < 0 or d_stop < d_start): after_stop += 1 elif 0 <= d_start <= 3.0: after_start += 1 if n: print(f" {'':<18} ↳ after STOP: {after_stop}, after START: {after_start}," f" other: {n - after_stop - after_start}") def save_log(styles_run, unlocks, transitions, samples) -> Path: LOG_DIR.mkdir(parents=True, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") out = LOG_DIR / f"{ts}_compare.json" out.write_text(json.dumps({ "saved_at": ts, "styles": styles_run, "unlocks": unlocks, "transitions": transitions, "n_samples": len(samples), }, indent=2, default=str)) return out def main() -> None: ap = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) ap.add_argument("--cycles", type=int, default=20, help="number of cycles per style (default 20)") ap.add_argument("--on-s", type=float, default=10.0, help="seconds video is ON per cycle (default 10)") ap.add_argument("--off-s", type=float, default=0.5, help="seconds video is OFF per cycle (default 0.5)") ap.add_argument("--settle-s", type=float, default=2.0, help="seconds to wait between styles (default 2)") args = ap.parse_args() if not STYLES: print("No styles configured — edit STYLES at the top of this file.") sys.exit(1) print(f"compare_stops — {len(STYLES)} style(s) × {args.cycles} cycles " f"({args.on_s}s on / {args.off_s}s off)") print(f" styles: {[s[0] for s in STYLES]}") poller = SN65Poller() poller.start() print(f" SN65 polling started — waiting 1s for first sample...") time.sleep(1.0) styles_run = [] transitions: list = [] try: for label, start_p, stop_p in STYLES: t0, t1 = run_style(label, start_p, stop_p, args.cycles, args.on_s, args.off_s, poller, transitions) styles_run.append({"label": label, "cycles": args.cycles, "t_start": t0, "t_end": t1, "start_payload": start_p, "stop_payload": stop_p}) # Settle so unlocks from one style don't get attributed to the next print(f" settling {args.settle_s}s ...") time.sleep(args.settle_s) except KeyboardInterrupt: print("\nInterrupted — leaving video stopped") put_video({"action": "stop"}, "stop-on-exit") poller.stop() unlocks = poller.unlocks() print(f"\n SN65 poller: actual ~{poller.actual_hz:.1f} Hz, " f"{poller.err_count} HTTP errors, " f"{len(unlocks)} total PLL unlock(s)") analyse(styles_run, unlocks, transitions) out = save_log(styles_run, unlocks, transitions, poller.samples) print(f"\n log saved → {out.relative_to(LOG_DIR.parent.parent)}") if __name__ == "__main__": main()