#!/usr/bin/env python3
"""
video_cycler.py — Toggle /video start/stop on the device.

Pairs with sn65_monitor.py: this script provokes flicker by cycling the
static-pink video stream, while sn65_monitor measures. All start/stop events
are timestamp-logged to data/cycle_logs/{ts}_cycles.csv so we can later
cross-reference PLL unlocks against the precise transition moments.

Modes:
    python3 video_cycler.py                      # 10 s on / 0.5 s off, forever
    python3 video_cycler.py --on-s 5 --off-s 2   # 5 s on, 2 s off
    python3 video_cycler.py --cycles 30          # stop after 30 cycles
    python3 video_cycler.py --hold               # one start, hold forever

Press Ctrl+C to stop (always sends a final video stop).
"""

import argparse
import csv
import signal
import sys
import time
from datetime import datetime
from pathlib import Path

import requests

DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = f"{DEVICE_BASE}/video"
HTTP_TIMEOUT_S = 3.0
LOG_DIR = Path(__file__).parent / "data" / "cycle_logs"

# Module-level CSV log state; set by _open_log(), cleared by _close_log().
_log_writer = None
_log_file = None


def _open_log() -> Path:
    """Open a fresh cycle-event CSV in LOG_DIR; return its path.

    Creates LOG_DIR if needed, writes the header row
    (iso, unix_ts, event, cycle) and flushes it immediately.
    """
    global _log_writer, _log_file
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    path = LOG_DIR / f"{ts}_cycles.csv"
    _log_file = open(path, "w", newline="", encoding="utf-8")
    _log_writer = csv.writer(_log_file)
    _log_writer.writerow(["iso", "unix_ts", "event", "cycle"])
    _log_file.flush()
    return path


def _close_log() -> None:
    """Close the event log, if open, and reset the module log state.

    Safe to call more than once. Clearing ``_log_writer`` as well as
    ``_log_file`` makes any later _log_event() a no-op instead of a write
    to a closed file.
    """
    global _log_writer, _log_file
    if _log_file is not None:
        _log_file.close()
    _log_writer = None
    _log_file = None


def _log_event(event: str, cycle: int) -> None:
    """Append one timestamped row to the event log; no-op if no log is open.

    Args:
        event: Event label written to the "event" column.
        cycle: Cycle number written to the "cycle" column.
    """
    if _log_writer is None:
        return
    t = time.time()
    # Millisecond-resolution local-time ISO stamp; unix_ts keeps microseconds.
    iso = datetime.fromtimestamp(t).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
    _log_writer.writerow([iso, f"{t:.6f}", event, cycle])
    # Flush per event so rows are on disk even if the process is killed.
    _log_file.flush()


def _put_video(payload: dict, label: str) -> None:
    """PUT ``payload`` as JSON to VIDEO_URL; report any failure on stdout.

    Best-effort: connection errors, timeouts and HTTP error statuses are
    printed, never raised, so the caller's loop keeps running.

    Args:
        payload: JSON body to send.
        label: Word used in the failure message ("START" / "STOP").
    """
    try:
        resp = requests.put(VIDEO_URL, json=payload, timeout=HTTP_TIMEOUT_S)
        # Treat 4xx/5xx as failures too; HTTPError is a RequestException.
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f" video {label} failed: {e}")


def video_start(cycle: int = 0) -> None:
    """Log a "start" event, then ask the device to start static-pink video.

    The event is logged before the request is sent.
    """
    _log_event("start", cycle)
    _put_video({"action": "start", "mode": "static-pink"}, "START")


def video_stop(cycle: int = 0) -> None:
    """Log a "stop" event, then ask the device to stop the video.

    The event is logged before the request is sent.
    """
    _log_event("stop", cycle)
    _put_video({"action": "stop"}, "STOP")


def _run_hold() -> None:
    """Send one video START, then print a heartbeat every 30 s forever."""
    ts = datetime.now().strftime("%H:%M:%S")
    print("VIDEO HOLD — video ON, no cycling (Ctrl+C to stop)")
    print(f"[{ts}] video START\n", flush=True)
    video_start(cycle=0)
    elapsed = 0
    while True:
        time.sleep(30.0)
        elapsed += 30
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"[{ts}] still holding — {elapsed}s elapsed", flush=True)


def _run_cycles(on_s: float, off_s: float, cycles: int) -> None:
    """Toggle the video on/off; return after ``cycles`` cycles (0 = never)."""
    print(f"VIDEO CYCLER — {on_s:.1f}s on / {off_s:.1f}s off"
          + (f", {cycles} cycles" if cycles else ", forever")
          + " (Ctrl+C to stop)\n")
    cycle = 0
    while True:
        cycle += 1
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"[{ts}] cycle {cycle:04d} START", flush=True)
        video_start(cycle=cycle)
        time.sleep(on_s)

        ts = datetime.now().strftime("%H:%M:%S")
        print(f"[{ts}] cycle {cycle:04d} STOP", flush=True)
        video_stop(cycle=cycle)

        if cycles and cycle >= cycles:
            print(f"\nReached {cycles} cycles, exiting.")
            _close_log()
            return
        time.sleep(off_s)


def main() -> None:
    """Parse arguments, open the event log, and run hold or cycle mode.

    SIGINT and SIGTERM send a final video stop (logged with cycle -1),
    close the log and exit with status 0.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--on-s", type=float, default=10.0,
                    help="seconds video is ON per cycle (default 10)")
    ap.add_argument("--off-s", type=float, default=0.5,
                    help="seconds video is OFF per cycle (default 0.5)")
    ap.add_argument("--cycles", type=int, default=0,
                    help="stop after this many cycles (0 = forever, default)")
    ap.add_argument("--hold", action="store_true",
                    help="Send a single video START and hold (no cycling). "
                         "Use as a baseline: do unlocks still happen when "
                         "we don't toggle on/off?")
    args = ap.parse_args()

    # Reject values time.sleep() would choke on *before* touching the device:
    # otherwise the failure happens right after a START and leaves video on.
    # The chained comparison is also False for NaN.
    for flag, seconds in (("--on-s", args.on_s), ("--off-s", args.off_s)):
        if not 0 <= seconds < float("inf"):
            ap.error(f"{flag} must be a finite, non-negative number of seconds")
    if args.cycles < 0:
        ap.error("--cycles must be >= 0 (0 = forever)")

    log_path = _open_log()
    print(f" event log → {log_path.relative_to(LOG_DIR.parent.parent)}")

    def _shutdown(*_):
        print("\nshutting down — video off")
        video_stop(cycle=-1)
        _close_log()
        sys.exit(0)

    signal.signal(signal.SIGINT, _shutdown)
    signal.signal(signal.SIGTERM, _shutdown)

    if args.hold:
        _run_hold()  # never returns; exits via _shutdown

    _run_cycles(args.on_s, args.off_s, args.cycles)


if __name__ == "__main__":
    main()