321 lines
13 KiB
Python
321 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
compare_stops.py — Compare different video-stop styles for SN65 PLL impact.

For each "stop style" defined in STYLES, this script:
  1. Polls /sn65_registers in a background thread (50 Hz, like sn65_monitor)
  2. Runs N cycles of start → on_s seconds → stop → off_s seconds
  3. Records PLL unlock events and which style was active when each fired

At the end it prints a comparison table: per-style cycles run, unlocks seen,
unlocks-per-stop probability, and unlock pulse-width stats. This is the
test you use to verify that switching the device's "stop" semantics (e.g.
GStreamer PAUSE instead of NULL) actually reduces the unlock rate.

Edit STYLES at the top of the file to add new stop variants as your
device server gains them. The start payload can vary too if you want
to test different start sequences.

Usage:
    python3 compare_stops.py                        # 20 cycles per style
    python3 compare_stops.py --cycles 50 --on-s 5
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import statistics
|
||
import sys
|
||
import threading
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import requests
|
||
|
||
# Device endpoints: the video control URL and the SN65 register dump URL
# both hang off the same base address.
DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = DEVICE_BASE + "/video"
SN65_EP = DEVICE_BASE + "/sn65_registers"

# Register polling cadence and the per-request timeout used by the poller.
POLL_DT_S = 0.02  # 50 Hz target
HTTP_TIMEOUT_S = 0.2

# JSON run logs are written next to this script, under data/compare_logs.
LOG_DIR = Path(__file__).parent / "data" / "compare_logs"

# -------------------------------------------------------------------------
# STYLES — edit this list to add new stop variants
# -------------------------------------------------------------------------
# Each entry is a (label, start_payload_dict, stop_payload_dict) tuple.
# The first entry is the current /video stop behaviour and serves as the
# baseline.  Once the device server supports alternative actions (e.g.
# "pause"), uncomment or add the corresponding entry below.
STYLES: list[tuple[str, dict, dict]] = [
    (
        "stop_full",
        {"action": "start", "mode": "static-pink"},
        {"action": "stop"},
    ),

    # === Add styles below once the device server supports them ===
    # ("stop_pause",                   # GST_STATE_PAUSED — keeps DSI clock alive
    #  {"action": "start", "mode": "static-pink"},
    #  {"action": "pause"}),
    # ("stop_ready",                   # GST_STATE_READY — half-way teardown
    #  {"action": "start", "mode": "static-pink"},
    #  {"action": "ready"}),
    # ("stop_keep_clk",
    #  {"action": "start", "mode": "static-pink"},
    #  {"action": "stop", "keep_clock": True}),
]
|
||
|
||
|
||
# -------------------------------------------------------------------------
|
||
# SN65 polling thread
|
||
# -------------------------------------------------------------------------
|
||
class SN65Poller:
    """Poll /sn65_registers from a background thread and log state changes.

    Each successful poll is reduced to a flat ``state`` dict holding the raw
    ``csr_0a`` / ``csr_e5`` values, ``pll_lock``, ``clk_det`` and one entry
    per name in ``ERROR_BITS``.  Every poll is appended to ``samples``; when
    the state differs from the previous successful poll, the differing keys
    are appended to ``changes`` as ``{key: (old, new)}``.
    """

    ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err",
                  "cha_ecc_err", "cha_lp_err", "cha_crc_err")

    # unlocks() searches pll_lock changes with index < i + this value for the
    # re-lock that closes an unlock found at index i.
    _RELOCK_LOOKAHEAD = 20

    # The measured poll rate is refreshed once this many seconds have elapsed.
    _RATE_WINDOW_S = 2.0

    def __init__(self) -> None:
        self.sess = requests.Session()
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        # One entry per detected state change: {ts, delta, new_state}.
        self.changes: list[dict] = []
        # One entry per poll: {ts, state} on success, {ts, error} on failure.
        self.samples: list[dict] = []
        self._last_state: dict | None = None
        # Successful polls per second over the most recent rate window.
        self.actual_hz = 0.0
        # Number of polls that failed (transport, HTTP status or bad payload).
        self.err_count = 0

    def start(self) -> None:
        """Launch the polling loop in a daemon thread."""
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Ask the polling loop to exit and wait up to 2 s for the thread."""
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=2)

    @staticmethod
    def _as_dict(value) -> dict:
        """Return *value* if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def _fetch_registers(self) -> dict | None:
        """Fetch one register snapshot.

        Returns the ``registers`` mapping of the response (empty when the
        key is missing or null), or ``None`` when the request fails, the
        status is an error, or the body is not a JSON object.  Malformed
        payloads are reported as failures instead of raising, so a single
        bad response cannot terminate the polling thread.
        """
        try:
            r = self.sess.get(SN65_EP, timeout=HTTP_TIMEOUT_S)
            r.raise_for_status()
            data = r.json()
        except (requests.exceptions.RequestException, ValueError):
            # ValueError covers JSON decode errors on requests versions whose
            # decode error is not a RequestException subclass.
            return None
        if not isinstance(data, dict):
            return None
        regs = data.get("registers") or {}
        return regs if isinstance(regs, dict) else None

    def _update_rate(self, now: float, rate_t0: float,
                     n_polls: int) -> tuple[float, int]:
        """Refresh ``actual_hz`` once the rate window has elapsed.

        Returns the (possibly reset) ``(rate_t0, n_polls)`` pair.
        """
        window = now - rate_t0
        if window > self._RATE_WINDOW_S:
            self.actual_hz = n_polls / window
            return now, 0
        return rate_t0, n_polls

    def _run(self) -> None:
        """Polling loop: runs until ``stop()`` sets the stop event."""
        n_polls = 0
        rate_t0 = time.time()
        while not self._stop.is_set():
            t0 = time.time()
            regs = self._fetch_registers()
            if regs is None:
                self.samples.append({"ts": t0, "error": True})
                self.err_count += 1
                rate_t0, n_polls = self._update_rate(t0, rate_t0, n_polls)
                time.sleep(POLL_DT_S)
                continue

            csr0a = self._as_dict(regs.get("csr_0a"))
            csre5 = self._as_dict(regs.get("csr_e5"))
            state = {
                "csr_0a": csr0a.get("value"),
                "csr_e5": csre5.get("value"),
                "pll_lock": csr0a.get("pll_lock"),
                "clk_det": csr0a.get("clk_det"),
            }
            for k in self.ERROR_BITS:
                state[k] = csre5.get(k)
            self.samples.append({"ts": t0, "state": state})
            n_polls += 1

            if self._last_state is not None and state != self._last_state:
                delta = {k: (self._last_state.get(k), state.get(k))
                         for k in state
                         if state.get(k) != self._last_state.get(k)}
                self.changes.append(
                    {"ts": t0, "delta": delta, "new_state": state})
            self._last_state = state

            rate_t0, n_polls = self._update_rate(t0, rate_t0, n_polls)

            # Sleep only for the remainder of the poll period.
            elapsed = time.time() - t0
            if elapsed < POLL_DT_S:
                time.sleep(POLL_DT_S - elapsed)

    def unlocks(self) -> list[dict]:
        """Pair each PLL unlock with the re-lock that follows it.

        An unlock starts at a recorded change whose new ``pll_lock`` value is
        ``False`` and ends at the next recorded change (within the lookahead
        limit) whose new value is ``True``.  An unlock with no matching
        re-lock is not reported.

        Returns:
            A list of ``{"start_ts", "end_ts", "duration_ms"}`` dicts in
            chronological order.
        """
        pll = [c for c in self.changes if "pll_lock" in c["delta"]]
        out: list[dict] = []
        i = 0
        while i < len(pll):
            _, new = pll[i]["delta"]["pll_lock"]
            if new is False:
                last = min(i + self._RELOCK_LOOKAHEAD, len(pll))
                for j in range(i + 1, last):
                    _, n_new = pll[j]["delta"]["pll_lock"]
                    if n_new is True:
                        start_ts, end_ts = pll[i]["ts"], pll[j]["ts"]
                        out.append({"start_ts": start_ts,
                                    "end_ts": end_ts,
                                    "duration_ms": (end_ts - start_ts) * 1000})
                        # Resume after the re-lock entry.
                        i = j
                        break
            i += 1
        return out
|
||
|
||
|
||
# -------------------------------------------------------------------------
|
||
# Cycle runner
|
||
# -------------------------------------------------------------------------
|
||
def put_video(payload: dict, label: str) -> None:
    """PUT *payload* as JSON to the video endpoint, best effort.

    Transport failures and HTTP error statuses (4xx/5xx) are both reported
    on stdout, prefixed with *label*, and then swallowed so a run keeps
    going.  Checking the status matters: a device that rejects an
    unsupported stop action would otherwise look like a successful stop.

    Args:
        payload: JSON-serialisable request body.
        label: Short tag used in the failure message.
    """
    try:
        resp = requests.put(VIDEO_URL, json=payload, timeout=3.0)
        # HTTPError is a RequestException, so it is reported below too.
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f" {label} HTTP failed: {e}")
|
||
|
||
|
||
def run_style(label: str, start_payload: dict, stop_payload: dict,
              n_cycles: int, on_s: float, off_s: float,
              poller: SN65Poller, transitions: list) -> tuple[float, float]:
    """Drive ``n_cycles`` start/stop cycles for one stop style.

    Each cycle sends the start payload, waits ``on_s`` seconds, sends the
    stop payload and waits ``off_s`` seconds.  Every request is logged in
    *transitions* with the timestamp taken just before it was sent.  A
    progress line is printed every fifth cycle and after the last one.

    Returns:
        ``(start_ts, end_ts)`` wall-clock timestamps bracketing the style.
    """
    print(f"\n▸ style '{label}'")
    print(f" start={start_payload} stop={stop_payload} cycles={n_cycles}")
    began_at = time.time()

    def fire(event: str, payload: dict, cycle: int) -> None:
        # Timestamp first so the record marks when the request went out.
        stamp = time.time()
        put_video(payload, event)
        transitions.append(
            {"ts": stamp, "event": event, "style": label, "cycle": cycle})

    cycle = 0
    while cycle < n_cycles:
        cycle += 1
        fire("start", start_payload, cycle)
        time.sleep(on_s)
        fire("stop", stop_payload, cycle)

        report_due = cycle % 5 == 0 or cycle == n_cycles
        if report_due:
            seen = len(poller.unlocks())
            print(f" cycle {cycle:>3}/{n_cycles} "
                  f"{poller.actual_hz:5.1f} Hz "
                  f"unlocks so far: {seen}", flush=True)
        time.sleep(off_s)

    return began_at, time.time()
|
||
|
||
|
||
# -------------------------------------------------------------------------
|
||
# Analysis
|
||
# -------------------------------------------------------------------------
|
||
# An unlock is attributed to a START/STOP request only if it begins within
# this many seconds after that request.
_ATTRIBUTION_WINDOW_S = 3.0


def _latest_at_or_before(stamps: list, ts: float):
    """Return the latest timestamp in *stamps* that is <= *ts*, or None."""
    return max((x for x in stamps if x <= ts), default=None)


def _attribute_unlock(ts: float, start_stamps: list, stop_stamps: list) -> str:
    """Classify an unlock beginning at *ts* as "stop", "start" or "other".

    Only requests that precede the unlock are considered.  The most recent
    preceding request wins if it lies within the attribution window; an exact
    tie between a start and a stop is counted as "start".
    """
    never = float("inf")
    last_stop = _latest_at_or_before(stop_stamps, ts)
    last_start = _latest_at_or_before(start_stamps, ts)
    d_stop = ts - last_stop if last_stop is not None else never
    d_start = ts - last_start if last_start is not None else never
    if d_stop <= _ATTRIBUTION_WINDOW_S and d_stop < d_start:
        return "stop"
    if d_start <= _ATTRIBUTION_WINDOW_S:
        return "start"
    return "other"


def analyse(styles_run: list[dict], unlocks: list[dict],
            transitions: list[dict]) -> None:
    """Print the per-style comparison table.

    Unlocks are bucketed by the style whose ``[t_start, t_end]`` window
    contains their ``start_ts``.  For each style with at least one unlock, a
    second line reports how many unlocks began shortly after a STOP request,
    shortly after a START request, or neither.

    Args:
        styles_run: One dict per executed style with ``label``, ``cycles``,
            ``t_start`` and ``t_end``.
        unlocks: Dicts with ``start_ts`` and ``duration_ms``.
        transitions: Dicts with ``ts``, ``event`` ("start"/"stop") and
            ``style``.
    """
    print("\n" + "=" * 78)
    print(" STOP-STYLE COMPARISON")
    print("=" * 78)
    print(f"\n {'style':<18} {'cycles':>6} {'unlocks':>8} "
          f"{'per_cycle':>10} {'med_pulse':>10} {'min_pulse':>10} {'max_pulse':>10}")
    print(f" {'-'*18} {'-'*6} {'-'*8} {'-'*10} {'-'*10} {'-'*10} {'-'*10}")

    for s in styles_run:
        in_window = [u for u in unlocks
                     if s["t_start"] <= u["start_ts"] <= s["t_end"]]
        durs = [u["duration_ms"] for u in in_window]
        n = len(in_window)
        # A style that ran zero cycles has no meaningful rate; report 0.
        per_cycle = n / s["cycles"] if s["cycles"] else 0.0
        med = statistics.median(durs) if durs else 0.0
        mn = min(durs) if durs else 0.0
        mx = max(durs) if durs else 0.0
        print(f" {s['label']:<18} {s['cycles']:>6} {n:>8} "
              f"{per_cycle:>10.2f} "
              f"{med:>8.1f}ms {mn:>8.1f}ms {mx:>8.1f}ms")

        if not n:
            continue

        # also: distribution after STOP (we expect 100% if behaviour is same
        # as baseline).  Event lists are built once per style, not per unlock.
        stop_stamps = [t["ts"] for t in transitions
                       if t["style"] == s["label"] and t["event"] == "stop"]
        start_stamps = [t["ts"] for t in transitions
                        if t["style"] == s["label"] and t["event"] == "start"]
        after_stop = 0
        after_start = 0
        for u in in_window:
            cause = _attribute_unlock(u["start_ts"], start_stamps, stop_stamps)
            if cause == "stop":
                after_stop += 1
            elif cause == "start":
                after_start += 1
        print(f" {'':<18} ↳ after STOP: {after_stop}, after START: {after_start},"
              f" other: {n - after_stop - after_start}")
|
||
|
||
|
||
def save_log(styles_run, unlocks, transitions, samples) -> Path:
    """Write the run summary as a timestamped JSON file under LOG_DIR.

    The file holds the style descriptors, the paired unlocks and the
    start/stop transitions.  Only the number of poll samples is stored, not
    the samples themselves.  Values json cannot encode natively are
    stringified via ``default=str``.

    Returns:
        Path of the file that was written.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    record = {
        "saved_at": stamp,
        "styles": styles_run,
        "unlocks": unlocks,
        "transitions": transitions,
        "n_samples": len(samples),
    }
    target = LOG_DIR / f"{stamp}_compare.json"
    target.write_text(json.dumps(record, indent=2, default=str))
    return target
|
||
|
||
|
||
def main() -> None:
    """Command-line entry point: run every configured style and report.

    Parses the options, starts the register poller, runs each entry of
    STYLES through ``run_style`` with a settle pause after each, then prints
    the comparison table and saves a JSON log.  Ctrl-C stops the run early,
    sends a final stop request, and still analyses the styles that completed.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--cycles", type=int, default=20,
                    help="number of cycles per style (default 20)")
    ap.add_argument("--on-s", type=float, default=10.0,
                    help="seconds video is ON per cycle (default 10)")
    ap.add_argument("--off-s", type=float, default=0.5,
                    help="seconds video is OFF per cycle (default 0.5)")
    ap.add_argument("--settle-s", type=float, default=2.0,
                    help="seconds to wait between styles (default 2)")
    args = ap.parse_args()

    # Reject values that would otherwise fail later: zero cycles breaks the
    # per-cycle statistics and time.sleep() raises on negative durations.
    if args.cycles < 1:
        ap.error("--cycles must be at least 1")
    for flag, value in (("--on-s", args.on_s),
                        ("--off-s", args.off_s),
                        ("--settle-s", args.settle_s)):
        if not value >= 0:  # written this way so NaN is rejected too
            ap.error(f"{flag} must be a non-negative number")

    if not STYLES:
        print("No styles configured — edit STYLES at the top of this file.")
        sys.exit(1)

    print(f"compare_stops — {len(STYLES)} style(s) × {args.cycles} cycles "
          f"({args.on_s}s on / {args.off_s}s off)")
    print(f" styles: {[s[0] for s in STYLES]}")

    poller = SN65Poller()
    poller.start()
    print(" SN65 polling started — waiting 1s for first sample...")
    time.sleep(1.0)
    # Without any successful register read the run would report "0 unlocks"
    # simply because nothing was observed, so flag that up front.
    if not any("state" in sample for sample in list(poller.samples)):
        print(f" WARNING: no successful read from {SN65_EP} yet — "
              f"unlock counts will be meaningless if this persists")

    styles_run = []
    transitions: list = []
    try:
        for label, start_p, stop_p in STYLES:
            t0, t1 = run_style(label, start_p, stop_p,
                               args.cycles, args.on_s, args.off_s,
                               poller, transitions)
            styles_run.append({"label": label, "cycles": args.cycles,
                               "t_start": t0, "t_end": t1,
                               "start_payload": start_p,
                               "stop_payload": stop_p})
            # Settle so unlocks from one style don't get attributed to the next
            print(f" settling {args.settle_s}s ...")
            time.sleep(args.settle_s)
    except KeyboardInterrupt:
        print("\nInterrupted — leaving video stopped")
        put_video({"action": "stop"}, "stop-on-exit")
    finally:
        # Stop the poller on every exit path, including unexpected errors.
        poller.stop()

    unlocks = poller.unlocks()
    print(f"\n SN65 poller: actual ~{poller.actual_hz:.1f} Hz, "
          f"{poller.err_count} HTTP errors, "
          f"{len(unlocks)} total PLL unlock(s)")

    analyse(styles_run, unlocks, transitions)
    out = save_log(styles_run, unlocks, transitions, poller.samples)
    print(f"\n log saved → {out.relative_to(LOG_DIR.parent.parent)}")


if __name__ == "__main__":
    main()