Files
MiPi_TEST/compare_stops.py
david rice d73aa2f2a4 Changes
2026-05-11 16:14:19 +01:00

321 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
compare_stops.py — Compare different video-stop styles for SN65 PLL impact.

For each "stop style" defined in STYLES, this script:
  1. Polls /sn65_registers in a background thread (50 Hz, like sn65_monitor)
  2. Runs N cycles of start → on_s seconds → stop → off_s seconds
  3. Records PLL unlock events and which style was active when each fired

At the end it prints a comparison table: per-style cycles run, unlocks seen,
unlocks per cycle (each cycle contains exactly one stop), and unlock
pulse-width stats. This is the test you use to verify that switching the
device's "stop" semantics (e.g. GStreamer PAUSE instead of NULL) actually
reduces the unlock rate.

Edit STYLES at the top of the file to add new stop variants as your
device server gains them. The start payload can vary too if you want
to test different start sequences.

Usage:
    python3 compare_stops.py                      # 20 cycles per style
    python3 compare_stops.py --cycles 50 --on-s 5
"""
from __future__ import annotations
import argparse
import json
import statistics
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
import requests
# --- Device endpoints -------------------------------------------------------
# Base URL of the device's HTTP server; both endpoints below hang off it.
DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = DEVICE_BASE + "/video"          # target of the start/stop PUT requests
SN65_EP = DEVICE_BASE + "/sn65_registers"   # register snapshot fetched by the poller

# --- Polling parameters -----------------------------------------------------
POLL_DT_S = 0.02       # 50 Hz target
HTTP_TIMEOUT_S = 0.2   # timeout (seconds) for a single register poll

# Comparison logs are written to <script directory>/data/compare_logs.
LOG_DIR = Path(__file__).parent.joinpath("data", "compare_logs")
# -------------------------------------------------------------------------
# STYLES — the stop variants to compare
# -------------------------------------------------------------------------
# Every entry is a 3-tuple: (label, start_payload, stop_payload).
# Both payloads are sent as the JSON body of a PUT to the /video endpoint.
# The first entry is the current /video stop behaviour and acts as baseline.
# Once the device server supports alternative actions (e.g. "pause"),
# uncomment or add the matching entry in the template section below.
STYLES: list[tuple[str, dict, dict]] = [
    (
        "stop_full",
        {"action": "start", "mode": "static-pink"},
        {"action": "stop"},
    ),
    # === Add styles below once the device server supports them ===
    # (
    #     "stop_pause",      # GST_STATE_PAUSED — keeps DSI clock alive
    #     {"action": "start", "mode": "static-pink"},
    #     {"action": "pause"},
    # ),
    # (
    #     "stop_ready",      # GST_STATE_READY — half-way teardown
    #     {"action": "start", "mode": "static-pink"},
    #     {"action": "ready"},
    # ),
    # (
    #     "stop_keep_clk",
    #     {"action": "start", "mode": "static-pink"},
    #     {"action": "stop", "keep_clock": True},
    # ),
]
# -------------------------------------------------------------------------
# SN65 polling thread
# -------------------------------------------------------------------------
class SN65Poller:
    """Poll /sn65_registers on a background thread and log every state change.

    Attributes:
        changes: one ``{"ts", "delta", "new_state"}`` entry for each poll
            whose tracked fields differ from the previous successful poll.
        samples: one entry per poll attempt: ``{"ts", "state"}`` on success,
            ``{"ts", "error": True}`` on failure.
        actual_hz: achieved rate of successful polls, refreshed about every
            two seconds.
        err_count: number of polls that failed or returned a malformed body.
    """

    ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err",
                  "cha_ecc_err", "cha_lp_err", "cha_crc_err")

    # Averaging window (seconds) for the achieved-rate estimate.
    _RATE_WINDOW_S = 2.0
    # unlocks() looks for the matching re-lock among the changes at indices
    # i+1 .. i+_PAIR_LOOKAHEAD-1 following an unlock at index i.
    _PAIR_LOOKAHEAD = 20

    def __init__(self) -> None:
        self.sess = requests.Session()
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None
        self.changes: list[dict] = []
        self.samples: list[dict] = []  # every poll: {ts, state, err?}
        self._last_state: dict | None = None
        self.actual_hz = 0.0
        self.err_count = 0

    def start(self) -> None:
        """Launch the polling loop on a daemon thread."""
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Ask the polling loop to exit and wait up to 2 s for it to finish."""
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=2)

    @classmethod
    def _parse_state(cls, data: object) -> dict:
        """Flatten a register payload into the tracked-field dict.

        Missing, null or wrongly-typed sections are treated as empty, so the
        corresponding fields come back as ``None`` instead of raising inside
        the polling thread.
        """
        def as_dict(value: object) -> dict:
            return value if isinstance(value, dict) else {}

        regs = as_dict(as_dict(data).get("registers"))
        csr0a = as_dict(regs.get("csr_0a"))
        csre5 = as_dict(regs.get("csr_e5"))
        state = {
            "csr_0a": csr0a.get("value"),
            "csr_e5": csre5.get("value"),
            "pll_lock": csr0a.get("pll_lock"),
            "clk_det": csr0a.get("clk_det"),
        }
        for bit in cls.ERROR_BITS:
            state[bit] = csre5.get(bit)
        return state

    def _run(self) -> None:
        """Polling loop: fetch, parse, record changes, pace to POLL_DT_S."""
        n_polls = 0
        rate_t0 = time.time()
        while not self._stop.is_set():
            t0 = time.time()
            try:
                r = self.sess.get(SN65_EP, timeout=HTTP_TIMEOUT_S)
                r.raise_for_status()
                data = r.json()
                if not isinstance(data, dict):
                    raise ValueError("register payload is not a JSON object")
            except (requests.exceptions.RequestException, ValueError):
                # ValueError covers a non-JSON body on requests versions whose
                # decode error is not a RequestException, plus the explicit
                # shape check above. Either way the poll counts as an error
                # and the thread keeps running.
                self.samples.append({"ts": t0, "error": True})
                self.err_count += 1
                failed = True
            else:
                failed = False
                state = self._parse_state(data)
                self.samples.append({"ts": t0, "state": state})
                n_polls += 1
                if self._last_state is not None and state != self._last_state:
                    delta = {k: (self._last_state.get(k), state.get(k))
                             for k in state
                             if state.get(k) != self._last_state.get(k)}
                    self.changes.append(
                        {"ts": t0, "delta": delta, "new_state": state})
                self._last_state = state

            # Only successful polls are counted in n_polls, so actual_hz is
            # the rate of usable samples.
            if t0 - rate_t0 > self._RATE_WINDOW_S:
                self.actual_hz = n_polls / (t0 - rate_t0)
                rate_t0 = t0
                n_polls = 0

            if failed:
                # After a failure wait a full period before retrying.
                time.sleep(POLL_DT_S)
            else:
                elapsed = time.time() - t0
                if elapsed < POLL_DT_S:
                    time.sleep(POLL_DT_S - elapsed)

    def unlocks(self) -> list[dict]:
        """Pair each loss of PLL lock with the lock recovery that follows it.

        Returns:
            A list of ``{"start_ts", "end_ts", "duration_ms"}`` dicts, one per
            unlock (pll_lock became False) for which a later re-lock
            (pll_lock became True) was found within the lookahead window.
            An unlock with no recorded re-lock is not reported.
        """
        pll = [c for c in self.changes if "pll_lock" in c["delta"]]
        out: list[dict] = []
        i = 0
        while i < len(pll):
            _, new = pll[i]["delta"]["pll_lock"]
            if new is False:
                upper = min(i + self._PAIR_LOOKAHEAD, len(pll))
                for j in range(i + 1, upper):
                    _, n_new = pll[j]["delta"]["pll_lock"]
                    if n_new is True:
                        out.append({
                            "start_ts": pll[i]["ts"],
                            "end_ts": pll[j]["ts"],
                            "duration_ms": (pll[j]["ts"] - pll[i]["ts"]) * 1000,
                        })
                        i = j  # resume scanning after the matched re-lock
                        break
            i += 1
        return out
# -------------------------------------------------------------------------
# Cycle runner
# -------------------------------------------------------------------------
def put_video(payload: dict, label: str) -> None:
    """PUT *payload* as JSON to the /video endpoint, best-effort.

    Failures are printed, prefixed with *label*, and never raised, so a
    single bad request does not abort the run.

    Args:
        payload: JSON-serialisable body for the request.
        label: short tag used in the failure message.
    """
    try:
        resp = requests.put(VIDEO_URL, json=payload, timeout=3.0)
        # Surface 4xx/5xx replies too: a rejected request (for example an
        # action the device does not implement) would otherwise go unnoticed
        # and the cycle would be measured as if the command had been applied.
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f" {label} HTTP failed: {e}")
def run_style(label: str, start_payload: dict, stop_payload: dict,
              n_cycles: int, on_s: float, off_s: float,
              poller: SN65Poller, transitions: list) -> tuple[float, float]:
    """Drive *n_cycles* start/stop cycles for one style.

    Each cycle sends the start payload, waits *on_s* seconds, sends the stop
    payload and waits *off_s* seconds. Every request is logged in
    *transitions* with the timestamp taken just before it was sent. A progress
    line is printed on every fifth cycle and on the last one.

    Returns:
        ``(start_ts, end_ts)``: wall-clock times bracketing the whole style.
    """
    def fire(event: str, payload: dict, cycle: int) -> None:
        # Timestamp first, so it marks the moment the request was issued.
        stamp = time.time()
        put_video(payload, event)
        transitions.append(
            {"ts": stamp, "event": event, "style": label, "cycle": cycle})

    print(f"\n▸ style '{label}'")
    print(f" start={start_payload} stop={stop_payload} cycles={n_cycles}")
    began = time.time()
    cycle = 0
    while cycle < n_cycles:
        cycle += 1
        fire("start", start_payload, cycle)
        time.sleep(on_s)
        fire("stop", stop_payload, cycle)
        report_due = cycle == n_cycles or cycle % 5 == 0
        if report_due:
            seen = len(poller.unlocks())
            line = (f" cycle {cycle:>3}/{n_cycles} "
                    f"{poller.actual_hz:5.1f} Hz "
                    f"unlocks so far: {seen}")
            print(line, flush=True)
        time.sleep(off_s)
    finished = time.time()
    return began, finished
# -------------------------------------------------------------------------
# Analysis
# -------------------------------------------------------------------------
def analyse(styles_run: list[dict], unlocks: list[dict],
transitions: list[dict]) -> None:
"""Bucket unlocks by which style was active when they fired."""
print("\n" + "="*78)
print(" STOP-STYLE COMPARISON")
print("="*78)
print(f"\n {'style':<18} {'cycles':>6} {'unlocks':>8} "
f"{'per_cycle':>10} {'med_pulse':>10} {'min_pulse':>10} {'max_pulse':>10}")
print(f" {'-'*18} {'-'*6} {'-'*8} {'-'*10} {'-'*10} {'-'*10} {'-'*10}")
for s in styles_run:
in_window = [u for u in unlocks
if s["t_start"] <= u["start_ts"] <= s["t_end"]]
durs = [u["duration_ms"] for u in in_window]
n = len(in_window)
per_cycle = n / s["cycles"]
med = statistics.median(durs) if durs else 0.0
mn = min(durs) if durs else 0.0
mx = max(durs) if durs else 0.0
print(f" {s['label']:<18} {s['cycles']:>6} {n:>8} "
f"{per_cycle:>10.2f} "
f"{med:>8.1f}ms {mn:>8.1f}ms {mx:>8.1f}ms")
# also: distribution after STOP (we expect 100% if behaviour is same as baseline)
stops_in_style = [t for t in transitions
if t["style"] == s["label"] and t["event"] == "stop"]
after_stop = 0
after_start = 0
for u in in_window:
nearest_stop = min((t["ts"] for t in stops_in_style),
key=lambda ts: abs(ts - u["start_ts"]),
default=None)
starts_in_style = [t for t in transitions
if t["style"] == s["label"] and t["event"] == "start"]
nearest_start = min((t["ts"] for t in starts_in_style),
key=lambda ts: abs(ts - u["start_ts"]),
default=None)
if nearest_stop is None or nearest_start is None:
continue
d_stop = u["start_ts"] - nearest_stop
d_start = u["start_ts"] - nearest_start
if 0 <= d_stop <= 3.0 and (d_start < 0 or d_stop < d_start):
after_stop += 1
elif 0 <= d_start <= 3.0:
after_start += 1
if n:
print(f" {'':<18} ↳ after STOP: {after_stop}, after START: {after_start},"
f" other: {n - after_stop - after_start}")
def save_log(styles_run, unlocks, transitions, samples) -> Path:
    """Write the run's results to a timestamped JSON file under LOG_DIR.

    The file holds the style records, the paired unlocks, the start/stop
    transitions and the number of poll samples (the samples themselves are
    not stored). Values json cannot encode natively are written via ``str``.

    Returns:
        Path of the file that was written.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    record = {
        "saved_at": stamp,
        "styles": styles_run,
        "unlocks": unlocks,
        "transitions": transitions,
        "n_samples": len(samples),
    }
    target = LOG_DIR / f"{stamp}_compare.json"
    encoded = json.dumps(record, indent=2, default=str)
    target.write_text(encoded)
    return target
def main() -> None:
    """Parse arguments, run every configured style, then analyse and save.

    Video is stopped and the poller shut down on every exit path, including
    unexpected exceptions. When the run is interrupted with Ctrl-C in the
    middle of a style, the cycles already started for that style are still
    recorded (flagged ``"interrupted": True``) so they appear in the table
    and the log.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--cycles", type=int, default=20,
                    help="number of cycles per style (default 20)")
    ap.add_argument("--on-s", type=float, default=10.0,
                    help="seconds video is ON per cycle (default 10)")
    ap.add_argument("--off-s", type=float, default=0.5,
                    help="seconds video is OFF per cycle (default 0.5)")
    ap.add_argument("--settle-s", type=float, default=2.0,
                    help="seconds to wait between styles (default 2)")
    args = ap.parse_args()

    # Reject values that would only fail later: zero cycles breaks the
    # per-cycle rate, and time.sleep() refuses negative durations.
    if args.cycles < 1:
        ap.error("--cycles must be at least 1")
    for flag, value in (("--on-s", args.on_s),
                        ("--off-s", args.off_s),
                        ("--settle-s", args.settle_s)):
        if value < 0:
            ap.error(f"{flag} must not be negative")

    if not STYLES:
        print("No styles configured — edit STYLES at the top of this file.")
        sys.exit(1)

    print(f"compare_stops — {len(STYLES)} style(s) × {args.cycles} cycles "
          f"({args.on_s}s on / {args.off_s}s off)")
    print(f" styles: {[s[0] for s in STYLES]}")

    poller = SN65Poller()
    poller.start()
    print(" SN65 polling started — waiting 1s for first sample...")
    time.sleep(1.0)

    styles_run: list[dict] = []
    transitions: list = []
    active = None   # (label, start_payload, stop_payload) while a style runs
    mark = 0        # index in `transitions` where the active style began
    try:
        for label, start_p, stop_p in STYLES:
            active = (label, start_p, stop_p)
            mark = len(transitions)
            t0, t1 = run_style(label, start_p, stop_p,
                               args.cycles, args.on_s, args.off_s,
                               poller, transitions)
            styles_run.append({"label": label, "cycles": args.cycles,
                               "t_start": t0, "t_end": t1,
                               "start_payload": start_p,
                               "stop_payload": stop_p})
            active = None
            # Settle so unlocks from one style don't get attributed to the next
            print(f" settling {args.settle_s}s ...")
            time.sleep(args.settle_s)
    except KeyboardInterrupt:
        print("\nInterrupted — leaving video stopped")
        seen = transitions[mark:]
        if active is not None and seen:
            # Keep the partial style; its cycle count is the number of
            # cycles that had been started when the interrupt arrived.
            label, start_p, stop_p = active
            styles_run.append({"label": label, "cycles": seen[-1]["cycle"],
                               "t_start": seen[0]["ts"], "t_end": time.time(),
                               "start_payload": start_p,
                               "stop_payload": stop_p,
                               "interrupted": True})
    finally:
        # Runs on normal completion, Ctrl-C and unexpected errors alike.
        put_video({"action": "stop"}, "stop-on-exit")
        poller.stop()

    unlocks = poller.unlocks()
    print(f"\n SN65 poller: actual ~{poller.actual_hz:.1f} Hz, "
          f"{poller.err_count} HTTP errors, "
          f"{len(unlocks)} total PLL unlock(s)")
    analyse(styles_run, unlocks, transitions)
    out = save_log(styles_run, unlocks, transitions, poller.samples)
    print(f"\n log saved → {out.relative_to(LOG_DIR.parent.parent)}")


if __name__ == "__main__":
    main()