183 lines
6.9 KiB
Python
183 lines
6.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
analyze_session.py — Cross-reference sn65_monitor unlocks against
|
|
video_cycler events to determine whether unlocks are caused by video
|
|
START transitions, STOP transitions, or both.
|
|
|
|
Usage:
|
|
python3 analyze_session.py # use latest sn65 + latest cycle log
|
|
python3 analyze_session.py <sn65.json> # specify sn65 log
|
|
python3 analyze_session.py <sn65.json> <cycle.csv>
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import json
|
|
import statistics
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Log directories, all resolved relative to the folder containing this script.
DATA_DIR = Path(__file__).parent.joinpath("data")
# Searched for "*.json" sn65 logs when no path is given on the command line.
SN65_DIR = DATA_DIR.joinpath("sn65_log")
# Searched for "*.csv" cycle logs when no path is given on the command line.
CYCLE_DIR = DATA_DIR.joinpath("cycle_logs")
|
|
|
|
|
|
def find_latest(dir_path: Path, glob: str) -> Path | None:
    """Return the last path (in sort order) under *dir_path* matching *glob*.

    Returns None when nothing matches. Selection is by path ordering, not by
    modification time — presumably log file names embed a sortable timestamp;
    verify against the tools that write them.
    """
    return max(dir_path.glob(glob), default=None)
|
|
|
|
|
|
def load_unlocks(sn65_path: Path) -> list[dict]:
    """Extract PLL unlock pulses from an sn65 JSON log.

    Reads the "session_changes" list, orders it by "ts", and keeps only the
    entries whose "delta" contains "pll_lock" (a two-item [old, new] pair).
    Every entry whose new value is False is paired with the first later entry
    whose new value is True, looking at most 19 entries ahead.

    Returns a list of dicts with keys "start_ts", "end_ts", "duration_ms"
    (end minus start, times 1000) and "iso" (taken from the unlock entry).
    Unlocks with no relock inside the look-ahead window are dropped.
    """
    payload = json.loads(sn65_path.read_text())
    ordered = sorted(payload.get("session_changes", []), key=lambda change: change["ts"])
    transitions = [entry for entry in ordered if "pll_lock" in entry.get("delta", {})]
    count = len(transitions)

    pulses: list[dict] = []
    consumed_through = -1  # index of the last entry already used as a relock
    for pos, lost in enumerate(transitions):
        if pos <= consumed_through:
            continue
        _old, state = lost["delta"]["pll_lock"]
        if state is not False:
            continue
        # Bounded search for the matching relock.
        for later in range(pos + 1, min(pos + 20, count)):
            regained = transitions[later]
            _old, later_state = regained["delta"]["pll_lock"]
            if later_state is True:
                pulses.append({
                    "start_ts": lost["ts"],
                    "end_ts": regained["ts"],
                    "duration_ms": (regained["ts"] - lost["ts"]) * 1000,
                    "iso": lost["iso"],
                })
                # Resume scanning after the relock entry.
                consumed_through = later
                break
    return pulses
|
|
|
|
|
|
def load_cycle_events(cycle_path: Path) -> list[dict]:
    """Load timestamped video start/stop events from a cycle log CSV.

    The CSV must have a header row with the columns "unix_ts", "event",
    "cycle" and "iso". Each data row becomes a dict with keys "ts" (float),
    "event" (str), "cycle" (int) and "iso" (str), in file order.

    Raises:
        KeyError: a required column is missing.
        ValueError: "unix_ts" or "cycle" cannot be parsed as a number.
    """
    # newline="" is what the csv module requires: it lets the reader do its
    # own line-ending handling so quoted fields are returned unaltered.
    with open(cycle_path, newline="") as f:
        return [
            {
                "ts": float(row["unix_ts"]),
                "event": row["event"],
                "cycle": int(row["cycle"]),
                "iso": row["iso"],
            }
            for row in csv.DictReader(f)
        ]
|
|
|
|
|
|
def nearest_event(unlock_ts: float, events: list[dict],
                  event_type: str | None = None) -> dict | None:
    """Return the event whose "ts" is closest to *unlock_ts*.

    When *event_type* is given, only events whose "event" field equals it are
    considered. Distance is absolute, so the result may lie before or after
    *unlock_ts*; on a tie the earliest entry in *events* wins. Returns None
    when no event qualifies.
    """
    def gap(ev: dict) -> float:
        return abs(ev["ts"] - unlock_ts)

    eligible = (
        ev for ev in events
        if event_type is None or ev["event"] == event_type
    )
    return min(eligible, key=gap, default=None)
|
|
|
|
|
|
def main() -> None:
    """Report PLL unlock pulse widths and correlate them with video cycle events.

    Command-line arguments (both optional): the sn65 JSON log, then the cycle
    CSV log. A missing argument falls back to the latest file found in
    SN65_DIR / CYCLE_DIR. Exits with status 1 when no sn65 log is available.
    When no cycle log is available only the pulse-width section is printed.
    """
    args = sys.argv[1:]
    sn65_path = Path(args[0]) if len(args) >= 1 else find_latest(SN65_DIR, "*.json")
    cycle_path = Path(args[1]) if len(args) >= 2 else find_latest(CYCLE_DIR, "*.csv")

    if sn65_path is None or not sn65_path.exists():
        print("No sn65 log found.")
        sys.exit(1)

    print(f"sn65 log: {sn65_path}")
    print(f"cycle log: {cycle_path if cycle_path else '(none — START/STOP correlation unavailable)'}")

    # === Pulse-width analysis ===
    unlocks = load_unlocks(sn65_path)
    print("\n=== PLL unlocks ===")
    print(f" total: {len(unlocks)}")
    if not unlocks:
        print(" no unlocks — nothing more to analyse")
        return
    durs = sorted(u["duration_ms"] for u in unlocks)
    n = len(durs)
    # statistics.median averages the two middle values for even n, matching
    # the median used for the offset summaries further down.
    print(f" pulse-width: min={durs[0]:.1f} ms med={statistics.median(durs):.1f} ms"
          f" p90={durs[min(n*9//10, n-1)]:.1f} ms max={durs[-1]:.1f} ms")
    print(" per-event:")
    for u in unlocks:
        print(f" {u['iso']} duration {u['duration_ms']:6.1f} ms")

    # === Correlation against cycle events ===
    if cycle_path is None or not cycle_path.exists():
        return

    events = load_cycle_events(cycle_path)
    starts = [e for e in events if e["event"] == "start"]
    stops = [e for e in events if e["event"] == "stop"]
    print("\n=== Cycle events ===")
    print(f" {len(events)} events total ({len(starts)} start, {len(stops)} stop)")
    if events:
        print(f" first: {events[0]['iso']} last: {events[-1]['iso']}")

    def fmt(v):
        """Format a signed offset in seconds, or a dash when unavailable."""
        if v is None:
            return " —"
        return f"{v:+8.2f}s"

    # For each unlock find offsets to nearest start and nearest stop
    print("\n=== Unlock vs. nearest cycle event ===")
    print(f" {'unlock iso':<25} {'dur_ms':>7} "
          f"{'Δ_to_START':>12} {'Δ_to_STOP':>12} verdict")
    window_s = 3.0  # an unlock counts as "after" an event if it follows within this many seconds
    near_start = []  # offsets (s) from nearest START to each unlock
    near_stop = []   # offsets (s) from nearest STOP to each unlock
    classifications = {"start": 0, "stop": 0, "neither": 0}
    for u in unlocks:
        ns = nearest_event(u["start_ts"], starts)
        nt = nearest_event(u["start_ts"], stops)
        # Positive offset: the unlock happened after the event.
        d_start = (u["start_ts"] - ns["ts"]) if ns else None
        d_stop = (u["start_ts"] - nt["ts"]) if nt else None

        # Classify: "after start" if 0..3s after a start, "after stop" if 0..3s after a stop.
        # When both qualify, the event closer to the unlock wins (a tie goes to STOP).
        verdict = "neither"
        if (d_start is not None and 0.0 <= d_start <= window_s
                and (d_stop is None or d_stop < 0 or d_stop > d_start)):
            verdict = "after_START"
            near_start.append(d_start)
            classifications["start"] += 1
        elif d_stop is not None and 0.0 <= d_stop <= window_s:
            verdict = "after_STOP"
            near_stop.append(d_stop)
            classifications["stop"] += 1
        else:
            classifications["neither"] += 1

        print(f" {u['iso']:<25} {u['duration_ms']:>7.1f} "
              f"{fmt(d_start):>12} {fmt(d_stop):>12} {verdict}")

    total = sum(classifications.values())
    print("\n=== Verdict ===")
    if total:
        print(f" after START : {classifications['start']:>3} / {total} "
              f"({classifications['start']/total*100:.0f}%)")
        print(f" after STOP : {classifications['stop']:>3} / {total} "
              f"({classifications['stop']/total*100:.0f}%)")
        print(f" neither : {classifications['neither']:>3} / {total} "
              f"({classifications['neither']/total*100:.0f}%)")

    if near_start:
        print(f"\n offsets after START (n={len(near_start)}): "
              f"med={statistics.median(near_start)*1000:.0f} ms, "
              f"min={min(near_start)*1000:.0f} ms, max={max(near_start)*1000:.0f} ms")
    if near_stop:
        print(f" offsets after STOP (n={len(near_stop)}): "
              f"med={statistics.median(near_stop)*1000:.0f} ms, "
              f"min={min(near_stop)*1000:.0f} ms, max={max(near_stop)*1000:.0f} ms")

    # Per-cycle unlock rate
    if events:
        cycle_window_s = events[-1]["ts"] - events[0]["ts"]
        n_cycles = max(1, max(e["cycle"] for e in events))
        print(f"\n cycles seen: {n_cycles} total span: {cycle_window_s:.1f}s")
        print(f" unlocks per cycle: {len(unlocks)/n_cycles:.2f}")
        # A single-event log (or identical timestamps) gives a zero span;
        # report that instead of dividing by zero.
        if cycle_window_s > 0:
            print(f" unlocks per minute: {len(unlocks)/cycle_window_s*60:.2f}")
        else:
            print(" unlocks per minute: n/a (cycle log spans no time)")
|
|
|
|
|
|
# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    main()