#!/usr/bin/env python3
"""
analyze_session.py — Cross-reference sn65_monitor unlocks against video_cycler
events to determine whether unlocks are caused by video START transitions,
STOP transitions, or both.

Usage:
    python3 analyze_session.py                          # use latest sn65 + latest cycle log
    python3 analyze_session.py <sn65.json>              # specify sn65 log
    python3 analyze_session.py <sn65.json> <cycle.csv>  # specify both logs
"""
from __future__ import annotations

import csv
import json
import statistics
import sys
from pathlib import Path

DATA_DIR = Path(__file__).parent / "data"
SN65_DIR = DATA_DIR / "sn65_log"
CYCLE_DIR = DATA_DIR / "cycle_logs"

# Upper bound used by load_unlocks when searching forward for the re-lock
# that closes an unlock: the search stops before index ``i + _PAIR_LOOKAHEAD``.
_PAIR_LOOKAHEAD = 20


def find_latest(dir_path: Path, glob: str) -> Path | None:
    """Return the lexicographically last path in *dir_path* matching *glob*.

    Returns None when nothing matches.
    """
    files = sorted(dir_path.glob(glob))
    return files[-1] if files else None


def load_unlocks(sn65_path: Path) -> list[dict]:
    """Pair pll_lock False→True transitions.

    Reads the JSON file at *sn65_path*, takes its ``session_changes`` list
    (sorted by ``ts``), keeps the entries whose ``delta`` contains
    ``pll_lock``, and pairs every change whose new value is False with the
    first later change whose new value is True.

    Returns a list of ``{start_ts, end_ts, duration_ms, iso}`` dicts, where
    ``iso`` is copied from the change that started the unlock. A False
    transition with no True transition inside the lookahead window is dropped.
    """
    data = json.loads(sn65_path.read_text(encoding="utf-8"))
    changes = sorted(data.get("session_changes", []), key=lambda c: c["ts"])
    pll = [c for c in changes if "pll_lock" in c.get("delta", {})]

    unlocks: list[dict] = []
    i = 0
    while i < len(pll):
        e = pll[i]
        _, new = e["delta"]["pll_lock"]
        if new is False:
            for j in range(i + 1, min(i + _PAIR_LOOKAHEAD, len(pll))):
                _, n_new = pll[j]["delta"]["pll_lock"]
                if n_new is True:
                    unlocks.append({
                        "start_ts": e["ts"],
                        "end_ts": pll[j]["ts"],
                        "duration_ms": (pll[j]["ts"] - e["ts"]) * 1000,
                        "iso": e["iso"],
                    })
                    # Resume scanning after the re-lock that closed this unlock.
                    i = j
                    break
        i += 1
    return unlocks


def load_cycle_events(cycle_path: Path) -> list[dict]:
    """Load timestamped video start/stop events from a cycle log CSV.

    Each row must provide the columns ``unix_ts``, ``event``, ``cycle`` and
    ``iso``; they are returned as dicts with keys ``ts`` (float), ``event``,
    ``cycle`` (int) and ``iso``, in file order.
    """
    out: list[dict] = []
    # newline="" lets the csv module do its own newline handling.
    with open(cycle_path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            out.append({
                "ts": float(row["unix_ts"]),
                "event": row["event"],
                "cycle": int(row["cycle"]),
                "iso": row["iso"],
            })
    return out


def nearest_event(unlock_ts: float, events: list[dict],
                  event_type: str | None = None) -> dict | None:
    """Return the event whose ``ts`` is closest to *unlock_ts*.

    When *event_type* is given, only events whose ``event`` field equals it
    are considered. Returns None when there are no candidates.
    """
    candidates = events if event_type is None else [e for e in events if e["event"] == event_type]
    if not candidates:
        return None
    return min(candidates, key=lambda e: abs(e["ts"] - unlock_ts))


def _fmt_offset(v: float | None) -> str:
    """Format a signed offset in seconds, or a dash placeholder for None."""
    if v is None:
        return " —"
    return f"{v:+8.2f}s"


def main() -> None:
    """Print the pulse-width and START/STOP correlation report.

    The sn65 log path is taken from the first command-line argument and the
    cycle log path from the second; each falls back to the latest file in
    SN65_DIR / CYCLE_DIR. Exits with status 1 when no sn65 log is available.
    """
    args = sys.argv[1:]
    sn65_path = Path(args[0]) if len(args) >= 1 else find_latest(SN65_DIR, "*.json")
    cycle_path = Path(args[1]) if len(args) >= 2 else find_latest(CYCLE_DIR, "*.csv")

    if sn65_path is None or not sn65_path.exists():
        print("No sn65 log found.")
        sys.exit(1)

    print(f"sn65 log: {sn65_path}")
    print(f"cycle log: {cycle_path if cycle_path else '(none — START/STOP correlation unavailable)'}")

    # === Pulse-width analysis ===
    unlocks = load_unlocks(sn65_path)
    print("\n=== PLL unlocks ===")
    print(f" total: {len(unlocks)}")
    if not unlocks:
        print(" no unlocks — nothing more to analyse")
        return

    durs = sorted(u["duration_ms"] for u in unlocks)
    n = len(durs)
    print(f" pulse-width: min={durs[0]:.1f} ms med={durs[n//2]:.1f} ms"
          f" p90={durs[min(n*9//10, n-1)]:.1f} ms max={durs[-1]:.1f} ms")
    print(" per-event:")
    for u in unlocks:
        print(f" {u['iso']} duration {u['duration_ms']:6.1f} ms")

    # === Correlation against cycle events ===
    if cycle_path is None or not cycle_path.exists():
        return

    events = load_cycle_events(cycle_path)
    starts = [e for e in events if e["event"] == "start"]
    stops = [e for e in events if e["event"] == "stop"]
    n_start = len(starts)
    n_stop = len(stops)
    print("\n=== Cycle events ===")
    print(f" {len(events)} events total ({n_start} start, {n_stop} stop)")
    if events:
        print(f" first: {events[0]['iso']} last: {events[-1]['iso']}")

    # For each unlock find offsets to nearest start and nearest stop
    print("\n=== Unlock vs. nearest cycle event ===")
    print(f" {'unlock iso':<25} {'dur_ms':>7} "
          f"{'Δ_to_START':>12} {'Δ_to_STOP':>12} verdict")

    near_start = []  # offsets (s) from nearest START to each unlock
    near_stop = []   # offsets (s) from nearest STOP to each unlock
    classifications = {"start": 0, "stop": 0, "neither": 0}

    for u in unlocks:
        ns = nearest_event(u["start_ts"], starts)
        nt = nearest_event(u["start_ts"], stops)
        d_start = (u["start_ts"] - ns["ts"]) if ns else None
        d_stop = (u["start_ts"] - nt["ts"]) if nt else None

        # Classify: "after start" if 0..3s after a start, "after stop" if 0..3s after a stop.
        # NOTE(review): offsets are measured to the nearest event by absolute
        # distance, so an unlock that follows one event but lies closer to the
        # next event of the same type ends up as "neither".
        verdict = "neither"
        if d_start is not None and 0.0 <= d_start <= 3.0 \
                and (d_stop is None or d_stop < 0 or d_stop > d_start):
            verdict = "after_START"
            near_start.append(d_start)
            classifications["start"] += 1
        elif d_stop is not None and 0.0 <= d_stop <= 3.0:
            verdict = "after_STOP"
            near_stop.append(d_stop)
            classifications["stop"] += 1
        else:
            classifications["neither"] += 1

        print(f" {u['iso']:<25} {u['duration_ms']:>7.1f} "
              f"{_fmt_offset(d_start):>12} {_fmt_offset(d_stop):>12} {verdict}")

    total = sum(classifications.values())
    print("\n=== Verdict ===")
    if total:
        print(f" after START : {classifications['start']:>3} / {total} "
              f"({classifications['start']/total*100:.0f}%)")
        print(f" after STOP : {classifications['stop']:>3} / {total} "
              f"({classifications['stop']/total*100:.0f}%)")
        print(f" neither : {classifications['neither']:>3} / {total} "
              f"({classifications['neither']/total*100:.0f}%)")

    if near_start:
        print(f"\n offsets after START (n={len(near_start)}): "
              f"med={statistics.median(near_start)*1000:.0f} ms, "
              f"min={min(near_start)*1000:.0f} ms, max={max(near_start)*1000:.0f} ms")
    if near_stop:
        print(f" offsets after STOP (n={len(near_stop)}): "
              f"med={statistics.median(near_stop)*1000:.0f} ms, "
              f"min={min(near_stop)*1000:.0f} ms, max={max(near_stop)*1000:.0f} ms")

    # Per-cycle unlock rate
    if events:
        cycle_window_s = events[-1]["ts"] - events[0]["ts"]
        n_cycles = max(1, max(e["cycle"] for e in events))
        print(f"\n cycles seen: {n_cycles} total span: {cycle_window_s:.1f}s")
        print(f" unlocks per cycle: {len(unlocks)/n_cycles:.2f}")
        if cycle_window_s > 0:
            print(f" unlocks per minute: {len(unlocks)/cycle_window_s*60:.2f}")
        else:
            # A single event (or identical timestamps) gives a zero-length
            # span; a per-minute rate is undefined rather than a crash.
            print(" unlocks per minute: n/a (zero-length cycle span)")


if __name__ == "__main__":
    main()