Files
MiPi_TEST/analyze_session.py
david rice d73aa2f2a4 Changes
2026-05-11 16:14:19 +01:00

183 lines
6.9 KiB
Python

#!/usr/bin/env python3
"""
analyze_session.py — Cross-reference sn65_monitor unlocks against
video_cycler events to determine whether unlocks are caused by video
START transitions, STOP transitions, or both.

Usage:
    python3 analyze_session.py                          # use latest sn65 log + latest cycle log
    python3 analyze_session.py <sn65.json>              # specify sn65 log; use latest cycle log
    python3 analyze_session.py <sn65.json> <cycle.csv>  # specify both logs
"""
from __future__ import annotations
import csv
import json
import statistics
import sys
from pathlib import Path
# Log locations, resolved relative to the directory containing this script.
DATA_DIR = Path(__file__).parent / "data"
# main() searches this directory for "*.json" when no sn65 log path is given.
SN65_DIR = DATA_DIR / "sn65_log"
# main() searches this directory for "*.csv" when no cycle log path is given.
CYCLE_DIR = DATA_DIR / "cycle_logs"
def find_latest(dir_path: Path, glob: str) -> Path | None:
    """Return the last path (in sorted order) matching *glob* inside *dir_path*.

    "Latest" means greatest under ``Path`` ordering, not newest by
    modification time — presumably log file names embed a sortable
    timestamp; confirm against the tools that write them.

    Returns ``None`` when nothing matches.
    """
    return max(dir_path.glob(glob), default=None)
def load_unlocks(sn65_path: Path) -> list[dict]:
    """Extract PLL unlock pulses from an sn65 monitor JSON log.

    Reads the ``session_changes`` list, orders it by ``ts`` and keeps only
    entries whose ``delta`` contains ``pll_lock``. Each entry whose new
    ``pll_lock`` value is ``False`` is paired with the first following entry
    (searching at most the next 19 ``pll_lock`` entries) whose new value is
    ``True``.

    Returns one dict per paired pulse with keys ``start_ts``, ``end_ts``,
    ``duration_ms`` and ``iso`` (the ``iso`` field of the unlock entry).
    Unlocks without a matching re-lock inside the window are dropped.
    """
    payload = json.loads(sn65_path.read_text())
    ordered = sorted(payload.get("session_changes", []),
                     key=lambda change: change["ts"])
    pll_events = [change for change in ordered
                  if "pll_lock" in change.get("delta", {})]

    pulses: list[dict] = []
    resume_at = 0  # entries before this index were consumed by an earlier pairing
    for idx, event in enumerate(pll_events):
        if idx < resume_at:
            continue
        _, state = event["delta"]["pll_lock"]
        if state is not False:
            continue
        # Look ahead over a bounded window for the matching re-lock.
        lookahead = pll_events[idx + 1:idx + 20]
        for later_idx, later in enumerate(lookahead, start=idx + 1):
            _, later_state = later["delta"]["pll_lock"]
            if later_state is True:
                pulses.append({
                    "start_ts": event["ts"],
                    "end_ts": later["ts"],
                    "duration_ms": (later["ts"] - event["ts"]) * 1000,
                    "iso": event["iso"],
                })
                # Continue scanning just past the re-lock entry.
                resume_at = later_idx + 1
                break
    return pulses
def load_cycle_events(cycle_path: Path) -> list[dict]:
    """Load timestamped video start/stop events from a cycle log CSV.

    The CSV must have a header row with the columns ``unix_ts``, ``event``,
    ``cycle`` and ``iso``. Rows are returned in file order as dicts with keys
    ``ts`` (float), ``event`` (str), ``cycle`` (int) and ``iso`` (str).

    Raises ``KeyError`` if a required column is missing and ``ValueError`` if
    ``unix_ts`` or ``cycle`` cannot be converted to a number.
    """
    # newline="" lets the csv module do its own line-ending handling, so
    # quoted fields containing line breaks are read back unmodified.
    with open(cycle_path, newline="") as f:
        return [
            {
                "ts": float(row["unix_ts"]),
                "event": row["event"],
                "cycle": int(row["cycle"]),
                "iso": row["iso"],
            }
            for row in csv.DictReader(f)
        ]
def nearest_event(unlock_ts: float, events: list[dict],
                  event_type: str | None = None) -> dict | None:
    """Return the event whose ``ts`` is closest to *unlock_ts*.

    When *event_type* is given, only events whose ``event`` field equals it
    are considered. Distance is absolute, so the result may lie before or
    after *unlock_ts*; on a tie the earliest entry in *events* wins.
    Returns ``None`` when there is no candidate.
    """
    pool = events
    if event_type is not None:
        pool = [ev for ev in events if ev["event"] == event_type]
    if not pool:
        return None

    def distance(ev: dict) -> float:
        return abs(ev["ts"] - unlock_ts)

    return min(pool, key=distance)
def _fmt_offset(v: float | None) -> str:
    """Format a signed offset in seconds; empty string when there is none."""
    if v is None:
        return ""
    return f"{v:+8.2f}s"


def _classify_unlock(d_start: float | None, d_stop: float | None,
                     window_s: float = 3.0) -> str:
    """Return ``"start"``, ``"stop"`` or ``"neither"`` for one unlock.

    *d_start* / *d_stop* are the unlock time minus the nearest START / STOP
    time in seconds (``None`` when no such event exists). An unlock counts as
    following an event when the offset lies in ``0..window_s``. If both
    qualify, START wins only when the STOP offset is strictly larger.
    """
    start_in_window = d_start is not None and 0.0 <= d_start <= window_s
    if start_in_window and (d_stop is None or d_stop < 0 or d_stop > d_start):
        return "start"
    if d_stop is not None and 0.0 <= d_stop <= window_s:
        return "stop"
    return "neither"


def main() -> None:
    """Print a pulse-width and START/STOP correlation report for one session.

    Command-line arguments (both optional): the sn65 JSON log, then the
    cycle CSV log. A missing argument falls back to the last matching file in
    SN65_DIR / CYCLE_DIR. Exits with status 1 when no sn65 log is found; the
    correlation sections are skipped when no cycle log is available.
    """
    args = sys.argv[1:]
    sn65_path = Path(args[0]) if len(args) >= 1 else find_latest(SN65_DIR, "*.json")
    cycle_path = Path(args[1]) if len(args) >= 2 else find_latest(CYCLE_DIR, "*.csv")
    if sn65_path is None or not sn65_path.exists():
        print("No sn65 log found.")
        sys.exit(1)
    print(f"sn65 log: {sn65_path}")
    print(f"cycle log: {cycle_path if cycle_path else '(none — START/STOP correlation unavailable)'}")

    # === Pulse-width analysis ===
    unlocks = load_unlocks(sn65_path)
    print("\n=== PLL unlocks ===")
    print(f" total: {len(unlocks)}")
    if not unlocks:
        print(" no unlocks — nothing more to analyse")
        return
    durs = sorted(u["duration_ms"] for u in unlocks)
    n = len(durs)
    # statistics.median averages the two middle values for even n, whereas
    # indexing durs[n // 2] would report the upper-middle element.
    print(f" pulse-width: min={durs[0]:.1f} ms med={statistics.median(durs):.1f} ms"
          f" p90={durs[min(n*9//10, n-1)]:.1f} ms max={durs[-1]:.1f} ms")
    print(" per-event:")
    for u in unlocks:
        print(f" {u['iso']} duration {u['duration_ms']:6.1f} ms")

    # === Correlation against cycle events ===
    if cycle_path is None or not cycle_path.exists():
        return
    events = load_cycle_events(cycle_path)
    starts = [e for e in events if e["event"] == "start"]
    stops = [e for e in events if e["event"] == "stop"]
    print("\n=== Cycle events ===")
    print(f" {len(events)} events total ({len(starts)} start, {len(stops)} stop)")
    if events:
        print(f" first: {events[0]['iso']} last: {events[-1]['iso']}")

    # For each unlock find offsets to nearest start and nearest stop.
    # NOTE(review): nearest_event picks the closest event in either
    # direction, so an unlock shortly after a START is still "neither" when
    # an upcoming START is closer — confirm this is intended.
    print("\n=== Unlock vs. nearest cycle event ===")
    print(f" {'unlock iso':<25} {'dur_ms':>7} "
          f"{'Δ_to_START':>12} {'Δ_to_STOP':>12} verdict")
    verdict_labels = {"start": "after_START", "stop": "after_STOP", "neither": "neither"}
    classifications = {"start": 0, "stop": 0, "neither": 0}
    near_start = []  # offsets (s) from nearest START for unlocks classed as after_START
    near_stop = []   # offsets (s) from nearest STOP for unlocks classed as after_STOP
    for u in unlocks:
        ns = nearest_event(u["start_ts"], starts)
        nt = nearest_event(u["start_ts"], stops)
        d_start = (u["start_ts"] - ns["ts"]) if ns else None
        d_stop = (u["start_ts"] - nt["ts"]) if nt else None
        kind = _classify_unlock(d_start, d_stop)
        classifications[kind] += 1
        if kind == "start":
            near_start.append(d_start)
        elif kind == "stop":
            near_stop.append(d_stop)
        print(f" {u['iso']:<25} {u['duration_ms']:>7.1f} "
              f"{_fmt_offset(d_start):>12} {_fmt_offset(d_stop):>12} {verdict_labels[kind]}")

    total = sum(classifications.values())
    print("\n=== Verdict ===")
    if total:
        print(f" after START : {classifications['start']:>3} / {total} "
              f"({classifications['start']/total*100:.0f}%)")
        print(f" after STOP : {classifications['stop']:>3} / {total} "
              f"({classifications['stop']/total*100:.0f}%)")
        print(f" neither : {classifications['neither']:>3} / {total} "
              f"({classifications['neither']/total*100:.0f}%)")
    if near_start:
        print(f"\n offsets after START (n={len(near_start)}): "
              f"med={statistics.median(near_start)*1000:.0f} ms, "
              f"min={min(near_start)*1000:.0f} ms, max={max(near_start)*1000:.0f} ms")
    if near_stop:
        print(f" offsets after STOP (n={len(near_stop)}): "
              f"med={statistics.median(near_stop)*1000:.0f} ms, "
              f"min={min(near_stop)*1000:.0f} ms, max={max(near_stop)*1000:.0f} ms")

    # Per-cycle unlock rate
    if events:
        cycle_window_s = events[-1]["ts"] - events[0]["ts"]
        n_cycles = max(1, max(e["cycle"] for e in events))
        print(f"\n cycles seen: {n_cycles} total span: {cycle_window_s:.1f}s")
        print(f" unlocks per cycle: {len(unlocks)/n_cycles:.2f}")
        # A single-event (or same-timestamp) log has zero span; a per-minute
        # rate is undefined there and dividing would raise ZeroDivisionError.
        if cycle_window_s > 0:
            print(f" unlocks per minute: {len(unlocks)/cycle_window_s*60:.2f}")
        else:
            print(" unlocks per minute: n/a (zero-length span)")
# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    main()