This commit is contained in:
david rice
2026-05-11 16:14:19 +01:00
parent 8d8df1e7a7
commit d73aa2f2a4
5 changed files with 957 additions and 17 deletions

183
analyze_session.py Normal file
View File

@@ -0,0 +1,183 @@
#!/usr/bin/env python3
"""
analyze_session.py — Cross-reference sn65_monitor unlocks against
video_cycler events to determine whether unlocks are caused by video
START transitions, STOP transitions, or both.
Usage:
python3 analyze_session.py # use latest sn65 + latest cycle log
python3 analyze_session.py <sn65.json> # specify sn65 log
python3 analyze_session.py <sn65.json> <cycle.csv>
"""
from __future__ import annotations
import csv
import json
import statistics
import sys
from pathlib import Path
DATA_DIR = Path(__file__).parent / "data"
SN65_DIR = DATA_DIR / "sn65_log"
CYCLE_DIR = DATA_DIR / "cycle_logs"
def find_latest(dir_path: Path, glob: str) -> Path | None:
files = sorted(dir_path.glob(glob))
return files[-1] if files else None
def load_unlocks(sn65_path: Path) -> list[dict]:
"""Pair pll_lock False→True transitions and return [{start_ts, end_ts, duration_ms}, ...]."""
data = json.loads(sn65_path.read_text())
changes = sorted(data.get("session_changes", []), key=lambda c: c["ts"])
pll = [c for c in changes if "pll_lock" in c.get("delta", {})]
unlocks: list[dict] = []
i = 0
while i < len(pll):
e = pll[i]
_, new = e["delta"]["pll_lock"]
if new is False:
for j in range(i + 1, min(i + 20, len(pll))):
_, n_new = pll[j]["delta"]["pll_lock"]
if n_new is True:
unlocks.append({
"start_ts": e["ts"],
"end_ts": pll[j]["ts"],
"duration_ms": (pll[j]["ts"] - e["ts"]) * 1000,
"iso": e["iso"],
})
i = j
break
i += 1
return unlocks
def load_cycle_events(cycle_path: Path) -> list[dict]:
"""Load timestamped video start/stop events from a cycle log CSV."""
out: list[dict] = []
with open(cycle_path) as f:
for row in csv.DictReader(f):
out.append({
"ts": float(row["unix_ts"]),
"event": row["event"],
"cycle": int(row["cycle"]),
"iso": row["iso"],
})
return out
def nearest_event(unlock_ts: float, events: list[dict],
event_type: str | None = None) -> dict | None:
candidates = events if event_type is None else [e for e in events if e["event"] == event_type]
if not candidates:
return None
return min(candidates, key=lambda e: abs(e["ts"] - unlock_ts))
def main() -> None:
args = sys.argv[1:]
sn65_path = Path(args[0]) if len(args) >= 1 else find_latest(SN65_DIR, "*.json")
cycle_path = Path(args[1]) if len(args) >= 2 else find_latest(CYCLE_DIR, "*.csv")
if sn65_path is None or not sn65_path.exists():
print(f"No sn65 log found.")
sys.exit(1)
print(f"sn65 log: {sn65_path}")
print(f"cycle log: {cycle_path if cycle_path else '(none — START/STOP correlation unavailable)'}")
# === Pulse-width analysis ===
unlocks = load_unlocks(sn65_path)
print(f"\n=== PLL unlocks ===")
print(f" total: {len(unlocks)}")
if not unlocks:
print(" no unlocks — nothing more to analyse")
return
durs = sorted(u["duration_ms"] for u in unlocks)
n = len(durs)
print(f" pulse-width: min={durs[0]:.1f} ms med={durs[n//2]:.1f} ms"
f" p90={durs[min(n*9//10, n-1)]:.1f} ms max={durs[-1]:.1f} ms")
print(f" per-event:")
for u in unlocks:
print(f" {u['iso']} duration {u['duration_ms']:6.1f} ms")
# === Correlation against cycle events ===
if cycle_path is None or not cycle_path.exists():
return
events = load_cycle_events(cycle_path)
n_start = sum(1 for e in events if e["event"] == "start")
n_stop = sum(1 for e in events if e["event"] == "stop")
print(f"\n=== Cycle events ===")
print(f" {len(events)} events total ({n_start} start, {n_stop} stop)")
if events:
print(f" first: {events[0]['iso']} last: {events[-1]['iso']}")
# For each unlock find offsets to nearest start and nearest stop
print(f"\n=== Unlock vs. nearest cycle event ===")
print(f" {'unlock iso':<25} {'dur_ms':>7} "
f"{'Δ_to_START':>12} {'Δ_to_STOP':>12} verdict")
near_start = [] # offsets (s) from nearest START to each unlock
near_stop = []
starts = [e for e in events if e["event"] == "start"]
stops = [e for e in events if e["event"] == "stop"]
classifications = {"start": 0, "stop": 0, "neither": 0}
for u in unlocks:
ns = nearest_event(u["start_ts"], starts)
nt = nearest_event(u["start_ts"], stops)
d_start = (u["start_ts"] - ns["ts"]) if ns else None
d_stop = (u["start_ts"] - nt["ts"]) if nt else None
# Classify: "after start" if 0..3s after a start, "after stop" if 0..3s after a stop.
verdict = "neither"
if d_start is not None and 0.0 <= d_start <= 3.0 \
and (d_stop is None or d_stop < 0 or d_stop > d_start):
verdict = "after_START"
near_start.append(d_start)
classifications["start"] += 1
elif d_stop is not None and 0.0 <= d_stop <= 3.0:
verdict = "after_STOP"
near_stop.append(d_stop)
classifications["stop"] += 1
else:
classifications["neither"] += 1
def fmt(v):
if v is None: return ""
return f"{v:+8.2f}s"
print(f" {u['iso']:<25} {u['duration_ms']:>7.1f} "
f"{fmt(d_start):>12} {fmt(d_stop):>12} {verdict}")
total = sum(classifications.values())
print(f"\n=== Verdict ===")
if total:
print(f" after START : {classifications['start']:>3} / {total} "
f"({classifications['start']/total*100:.0f}%)")
print(f" after STOP : {classifications['stop']:>3} / {total} "
f"({classifications['stop']/total*100:.0f}%)")
print(f" neither : {classifications['neither']:>3} / {total} "
f"({classifications['neither']/total*100:.0f}%)")
if near_start:
print(f"\n offsets after START (n={len(near_start)}): "
f"med={statistics.median(near_start)*1000:.0f} ms, "
f"min={min(near_start)*1000:.0f} ms, max={max(near_start)*1000:.0f} ms")
if near_stop:
print(f" offsets after STOP (n={len(near_stop)}): "
f"med={statistics.median(near_stop)*1000:.0f} ms, "
f"min={min(near_stop)*1000:.0f} ms, max={max(near_stop)*1000:.0f} ms")
# Per-cycle unlock rate
if events:
cycle_window_s = events[-1]["ts"] - events[0]["ts"]
n_cycles = max(1, max(e["cycle"] for e in events))
print(f"\n cycles seen: {n_cycles} total span: {cycle_window_s:.1f}s")
print(f" unlocks per cycle: {len(unlocks)/n_cycles:.2f}")
print(f" unlocks per minute: {len(unlocks)/cycle_window_s*60:.2f}")
if __name__ == "__main__":
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

321
compare_stops.py Normal file
View File

@@ -0,0 +1,321 @@
#!/usr/bin/env python3
"""
compare_stops.py — Compare different video-stop styles for SN65 PLL impact.
For each "stop style" defined in STYLES, this script:
1. Polls /sn65_registers in a background thread (50 Hz, like sn65_monitor)
2. Runs N cycles of start → on_s seconds → stop → off_s seconds
3. Records PLL unlock events and which style was active when each fired
At the end it prints a comparison table: per-style cycles run, unlocks seen,
unlocks-per-stop probability, and unlock pulse-width stats. This is the
test you use to verify that switching the device's "stop" semantics (e.g.
GStreamer PAUSE instead of NULL) actually reduces the unlock rate.
Edit STYLES at the top of the file to add new stop variants as your
device server gains them. The start payload can vary too if you want
to test different start sequences.
Usage:
python3 compare_stops.py # 20 cycles per style
python3 compare_stops.py --cycles 50 --on-s 5
"""
from __future__ import annotations
import argparse
import json
import statistics
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
import requests
DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = f"{DEVICE_BASE}/video"
SN65_EP = f"{DEVICE_BASE}/sn65_registers"
POLL_DT_S = 0.02 # 50 Hz target
HTTP_TIMEOUT_S = 0.2
LOG_DIR = Path(__file__).parent / "data" / "compare_logs"
# -------------------------------------------------------------------------
# STYLES — edit this list to add new stop variants
# -------------------------------------------------------------------------
# Each entry: (label, start_payload_dict, stop_payload_dict)
# The default first entry is the current /video stop behaviour.
# When your device server supports alternative actions (e.g. "pause"),
# uncomment / add the corresponding line below.
STYLES: list[tuple[str, dict, dict]] = [
("stop_full",
{"action": "start", "mode": "static-pink"},
{"action": "stop"}),
# === Add styles below once the device server supports them ===
# ("stop_pause", # GST_STATE_PAUSED — keeps DSI clock alive
# {"action": "start", "mode": "static-pink"},
# {"action": "pause"}),
# ("stop_ready", # GST_STATE_READY — half-way teardown
# {"action": "start", "mode": "static-pink"},
# {"action": "ready"}),
# ("stop_keep_clk",
# {"action": "start", "mode": "static-pink"},
# {"action": "stop", "keep_clock": True}),
]
# -------------------------------------------------------------------------
# SN65 polling thread
# -------------------------------------------------------------------------
class SN65Poller:
"""Background thread polling /sn65_registers; records every PLL transition."""
ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err",
"cha_ecc_err", "cha_lp_err", "cha_crc_err")
def __init__(self) -> None:
self.sess = requests.Session()
self._stop = threading.Event()
self._thread: threading.Thread | None = None
self.changes: list[dict] = []
self.samples: list[dict] = [] # every poll: {ts, state, err?}
self._last_state: dict | None = None
self.actual_hz = 0.0
self.err_count = 0
def start(self) -> None:
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop.set()
if self._thread:
self._thread.join(timeout=2)
def _run(self) -> None:
n_polls = 0
rate_t0 = time.time()
while not self._stop.is_set():
t0 = time.time()
try:
r = self.sess.get(SN65_EP, timeout=HTTP_TIMEOUT_S)
r.raise_for_status()
data = r.json()
except requests.exceptions.RequestException:
self.samples.append({"ts": t0, "error": True})
self.err_count += 1
if t0 - rate_t0 > 2.0:
self.actual_hz = n_polls / (t0 - rate_t0); rate_t0 = t0; n_polls = 0
time.sleep(POLL_DT_S)
continue
regs = data.get("registers", {})
csr0a = regs.get("csr_0a", {}) or {}
csre5 = regs.get("csr_e5", {}) or {}
state = {
"csr_0a": csr0a.get("value"),
"csr_e5": csre5.get("value"),
"pll_lock": csr0a.get("pll_lock"),
"clk_det": csr0a.get("clk_det"),
}
for k in self.ERROR_BITS:
state[k] = csre5.get(k)
self.samples.append({"ts": t0, "state": state})
n_polls += 1
if self._last_state is not None and state != self._last_state:
delta = {k: (self._last_state.get(k), state.get(k))
for k in state if state.get(k) != self._last_state.get(k)}
self.changes.append({"ts": t0, "delta": delta, "new_state": state})
self._last_state = state
if t0 - rate_t0 > 2.0:
self.actual_hz = n_polls / (t0 - rate_t0); rate_t0 = t0; n_polls = 0
elapsed = time.time() - t0
if elapsed < POLL_DT_S:
time.sleep(POLL_DT_S - elapsed)
def unlocks(self) -> list[dict]:
"""Pair pll_lock False→True transitions into [{start_ts, end_ts, duration_ms}]."""
pll = [c for c in self.changes if "pll_lock" in c["delta"]]
out, i = [], 0
while i < len(pll):
_, new = pll[i]["delta"]["pll_lock"]
if new is False:
for j in range(i+1, min(i+20, len(pll))):
_, n_new = pll[j]["delta"]["pll_lock"]
if n_new is True:
out.append({"start_ts": pll[i]["ts"],
"end_ts": pll[j]["ts"],
"duration_ms": (pll[j]["ts"]-pll[i]["ts"])*1000})
i = j
break
i += 1
return out
# -------------------------------------------------------------------------
# Cycle runner
# -------------------------------------------------------------------------
def put_video(payload: dict, label: str) -> None:
try:
requests.put(VIDEO_URL, json=payload, timeout=3.0)
except requests.exceptions.RequestException as e:
print(f" {label} HTTP failed: {e}")
def run_style(label: str, start_payload: dict, stop_payload: dict,
n_cycles: int, on_s: float, off_s: float,
poller: SN65Poller, transitions: list) -> tuple[float, float]:
"""Run n cycles of the given style. Returns (start_ts, end_ts)."""
print(f"\n▸ style '{label}'")
print(f" start={start_payload} stop={stop_payload} cycles={n_cycles}")
style_start = time.time()
for c in range(1, n_cycles + 1):
t = time.time()
put_video(start_payload, "start")
transitions.append({"ts": t, "event": "start", "style": label, "cycle": c})
time.sleep(on_s)
t = time.time()
put_video(stop_payload, "stop")
transitions.append({"ts": t, "event": "stop", "style": label, "cycle": c})
if c % 5 == 0 or c == n_cycles:
n_unlocks = len(poller.unlocks())
print(f" cycle {c:>3}/{n_cycles} "
f"{poller.actual_hz:5.1f} Hz "
f"unlocks so far: {n_unlocks}", flush=True)
time.sleep(off_s)
return style_start, time.time()
# -------------------------------------------------------------------------
# Analysis
# -------------------------------------------------------------------------
def analyse(styles_run: list[dict], unlocks: list[dict],
transitions: list[dict]) -> None:
"""Bucket unlocks by which style was active when they fired."""
print("\n" + "="*78)
print(" STOP-STYLE COMPARISON")
print("="*78)
print(f"\n {'style':<18} {'cycles':>6} {'unlocks':>8} "
f"{'per_cycle':>10} {'med_pulse':>10} {'min_pulse':>10} {'max_pulse':>10}")
print(f" {'-'*18} {'-'*6} {'-'*8} {'-'*10} {'-'*10} {'-'*10} {'-'*10}")
for s in styles_run:
in_window = [u for u in unlocks
if s["t_start"] <= u["start_ts"] <= s["t_end"]]
durs = [u["duration_ms"] for u in in_window]
n = len(in_window)
per_cycle = n / s["cycles"]
med = statistics.median(durs) if durs else 0.0
mn = min(durs) if durs else 0.0
mx = max(durs) if durs else 0.0
print(f" {s['label']:<18} {s['cycles']:>6} {n:>8} "
f"{per_cycle:>10.2f} "
f"{med:>8.1f}ms {mn:>8.1f}ms {mx:>8.1f}ms")
# also: distribution after STOP (we expect 100% if behaviour is same as baseline)
stops_in_style = [t for t in transitions
if t["style"] == s["label"] and t["event"] == "stop"]
after_stop = 0
after_start = 0
for u in in_window:
nearest_stop = min((t["ts"] for t in stops_in_style),
key=lambda ts: abs(ts - u["start_ts"]),
default=None)
starts_in_style = [t for t in transitions
if t["style"] == s["label"] and t["event"] == "start"]
nearest_start = min((t["ts"] for t in starts_in_style),
key=lambda ts: abs(ts - u["start_ts"]),
default=None)
if nearest_stop is None or nearest_start is None:
continue
d_stop = u["start_ts"] - nearest_stop
d_start = u["start_ts"] - nearest_start
if 0 <= d_stop <= 3.0 and (d_start < 0 or d_stop < d_start):
after_stop += 1
elif 0 <= d_start <= 3.0:
after_start += 1
if n:
print(f" {'':<18} ↳ after STOP: {after_stop}, after START: {after_start},"
f" other: {n - after_stop - after_start}")
def save_log(styles_run, unlocks, transitions, samples) -> Path:
LOG_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
out = LOG_DIR / f"{ts}_compare.json"
out.write_text(json.dumps({
"saved_at": ts,
"styles": styles_run,
"unlocks": unlocks,
"transitions": transitions,
"n_samples": len(samples),
}, indent=2, default=str))
return out
def main() -> None:
ap = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--cycles", type=int, default=20,
help="number of cycles per style (default 20)")
ap.add_argument("--on-s", type=float, default=10.0,
help="seconds video is ON per cycle (default 10)")
ap.add_argument("--off-s", type=float, default=0.5,
help="seconds video is OFF per cycle (default 0.5)")
ap.add_argument("--settle-s", type=float, default=2.0,
help="seconds to wait between styles (default 2)")
args = ap.parse_args()
if not STYLES:
print("No styles configured — edit STYLES at the top of this file.")
sys.exit(1)
print(f"compare_stops — {len(STYLES)} style(s) × {args.cycles} cycles "
f"({args.on_s}s on / {args.off_s}s off)")
print(f" styles: {[s[0] for s in STYLES]}")
poller = SN65Poller()
poller.start()
print(f" SN65 polling started — waiting 1s for first sample...")
time.sleep(1.0)
styles_run = []
transitions: list = []
try:
for label, start_p, stop_p in STYLES:
t0, t1 = run_style(label, start_p, stop_p,
args.cycles, args.on_s, args.off_s,
poller, transitions)
styles_run.append({"label": label, "cycles": args.cycles,
"t_start": t0, "t_end": t1,
"start_payload": start_p,
"stop_payload": stop_p})
# Settle so unlocks from one style don't get attributed to the next
print(f" settling {args.settle_s}s ...")
time.sleep(args.settle_s)
except KeyboardInterrupt:
print("\nInterrupted — leaving video stopped")
put_video({"action": "stop"}, "stop-on-exit")
poller.stop()
unlocks = poller.unlocks()
print(f"\n SN65 poller: actual ~{poller.actual_hz:.1f} Hz, "
f"{poller.err_count} HTTP errors, "
f"{len(unlocks)} total PLL unlock(s)")
analyse(styles_run, unlocks, transitions)
out = save_log(styles_run, unlocks, transitions, poller.samples)
print(f"\n log saved → {out.relative_to(LOG_DIR.parent.parent)}")
if __name__ == "__main__":
main()

File diff suppressed because one or more lines are too long

View File

@@ -1,25 +1,64 @@
#!/usr/bin/env python3
"""
video_cycler.py — Toggle /video start/stop on the device every CYCLE_S seconds.
video_cycler.py — Toggle /video start/stop on the device.
Pairs with sn65_monitor.py: this script provokes the flicker by cycling the
static-pink video stream, while sn65_monitor measures. Press Ctrl+C to stop.
Pairs with sn65_monitor.py: this script provokes flicker by cycling the
static-pink video stream, while sn65_monitor measures. All start/stop
events are timestamp-logged to data/cycle_logs/{ts}.csv so we can later
cross-reference PLL unlocks against the precise transition moments.
Modes:
python3 video_cycler.py # 10 s on / 0.5 s off, forever
python3 video_cycler.py --on-s 5 --off-s 2 # 5 s on, 2 s off
python3 video_cycler.py --cycles 30 # stop after 30 cycles
python3 video_cycler.py --hold # one start, hold forever
Press Ctrl+C to stop (always sends a final video stop).
"""
import argparse
import csv
import signal
import sys
import time
from datetime import datetime
from pathlib import Path
import requests
DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = f"{DEVICE_BASE}/video"
CYCLE_S = 10.0
HTTP_TIMEOUT_S = 3.0
DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = f"{DEVICE_BASE}/video"
HTTP_TIMEOUT_S = 3.0
LOG_DIR = Path(__file__).parent / "data" / "cycle_logs"
def video_start() -> None:
_log_writer = None
_log_file = None
def _open_log() -> Path:
"""Open a fresh cycle-event CSV in LOG_DIR; return its path."""
global _log_writer, _log_file
LOG_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
path = LOG_DIR / f"{ts}_cycles.csv"
_log_file = open(path, "w", newline="")
_log_writer = csv.writer(_log_file)
_log_writer.writerow(["iso", "unix_ts", "event", "cycle"])
_log_file.flush()
return path
def _log_event(event: str, cycle: int) -> None:
t = time.time()
iso = datetime.fromtimestamp(t).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
if _log_writer is not None:
_log_writer.writerow([iso, f"{t:.6f}", event, cycle])
_log_file.flush()
def video_start(cycle: int = 0) -> None:
_log_event("start", cycle)
try:
requests.put(VIDEO_URL,
json={"action": "start", "mode": "static-pink"},
@@ -28,7 +67,8 @@ def video_start() -> None:
print(f" video START failed: {e}")
def video_stop() -> None:
def video_stop(cycle: int = 0) -> None:
_log_event("stop", cycle)
try:
requests.put(VIDEO_URL, json={"action": "stop"},
timeout=HTTP_TIMEOUT_S)
@@ -37,24 +77,66 @@ def video_stop() -> None:
def main() -> None:
# On Ctrl+C, make sure we leave video stopped.
ap = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
ap.add_argument("--on-s", type=float, default=10.0,
help="seconds video is ON per cycle (default 10)")
ap.add_argument("--off-s", type=float, default=0.5,
help="seconds video is OFF per cycle (default 0.5)")
ap.add_argument("--cycles", type=int, default=0,
help="stop after this many cycles (0 = forever, default)")
ap.add_argument("--hold", action="store_true",
help="Send a single video START and hold (no cycling). "
"Use as a baseline: do unlocks still happen when "
"we don't toggle on/off?")
args = ap.parse_args()
log_path = _open_log()
print(f" event log → {log_path.relative_to(LOG_DIR.parent.parent)}")
def _shutdown(*_):
print("\nshutting down — video off")
video_stop()
video_stop(cycle=-1)
if _log_file:
_log_file.close()
sys.exit(0)
signal.signal(signal.SIGINT, _shutdown)
signal.signal(signal.SIGTERM, _shutdown)
print(f"VIDEO CYCLER — {CYCLE_S:.0f} s on / 0.5 s off (Ctrl+C to stop)\n")
if args.hold:
ts = datetime.now().strftime("%H:%M:%S")
print(f"VIDEO HOLD — video ON, no cycling (Ctrl+C to stop)")
print(f"[{ts}] video START\n", flush=True)
video_start(cycle=0)
elapsed = 0
while True:
time.sleep(30.0)
elapsed += 30
ts = datetime.now().strftime("%H:%M:%S")
print(f"[{ts}] still holding — {elapsed}s elapsed", flush=True)
on_s, off_s = args.on_s, args.off_s
print(f"VIDEO CYCLER — {on_s:.1f}s on / {off_s:.1f}s off"
+ (f", {args.cycles} cycles" if args.cycles else ", forever")
+ " (Ctrl+C to stop)\n")
cycle = 0
while True:
cycle += 1
ts = datetime.now().strftime("%H:%M:%S")
print(f"[{ts}] cycle {cycle:04d} video ON", flush=True)
video_start()
time.sleep(CYCLE_S)
video_stop()
time.sleep(0.5)
print(f"[{ts}] cycle {cycle:04d} START", flush=True)
video_start(cycle=cycle)
time.sleep(on_s)
ts = datetime.now().strftime("%H:%M:%S")
print(f"[{ts}] cycle {cycle:04d} STOP", flush=True)
video_stop(cycle=cycle)
if args.cycles and cycle >= args.cycles:
print(f"\nReached {args.cycles} cycles, exiting.")
if _log_file:
_log_file.close()
return
time.sleep(off_s)
if __name__ == "__main__":