Files
MiPi_TEST/video_cycler.py
david rice d73aa2f2a4 Changes
2026-05-11 16:14:19 +01:00

143 lines
4.8 KiB
Python

#!/usr/bin/env python3
"""
video_cycler.py — Toggle /video start/stop on the device.
Pairs with sn65_monitor.py: this script provokes flicker by cycling the
static-pink video stream, while sn65_monitor measures. All start/stop
events are timestamp-logged to data/cycle_logs/{ts}_cycles.csv so we can later
cross-reference PLL unlocks against the precise transition moments.
Modes:
python3 video_cycler.py # 10 s on / 0.5 s off, forever
python3 video_cycler.py --on-s 5 --off-s 2 # 5 s on, 2 s off
python3 video_cycler.py --cycles 30 # stop after 30 cycles
python3 video_cycler.py --hold # one start, hold forever
Press Ctrl+C to stop (always sends a final video stop).
"""
import argparse
import csv
import signal
import sys
import time
from datetime import datetime
from pathlib import Path
import requests
# Base URL of the device's HTTP API and the endpoint used to control video.
DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = DEVICE_BASE + "/video"

# Timeout, in seconds, passed to every HTTP request sent to the device.
HTTP_TIMEOUT_S = 3.0

# Cycle-event CSVs are written under data/cycle_logs/ next to this script.
LOG_DIR = Path(__file__).parent.joinpath("data", "cycle_logs")

# Open CSV file handle and the csv writer wrapping it. Both stay None until
# _open_log() has been called.
_log_file = None
_log_writer = None
def _open_log() -> Path:
    """Create a new cycle-event CSV under LOG_DIR and return its path.

    The file is named ``<YYYYmmdd_HHMMSS>_cycles.csv`` after the current local
    time. The open file and its csv writer are stored in the module globals
    ``_log_file`` and ``_log_writer``, and the header row is written and
    flushed straight away.
    """
    global _log_writer, _log_file

    LOG_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = LOG_DIR / f"{stamp}_cycles.csv"

    # newline="" lets the csv module control line endings itself.
    _log_file = csv_path.open("w", newline="")
    _log_writer = csv.writer(_log_file)

    header = ("iso", "unix_ts", "event", "cycle")
    _log_writer.writerow(header)
    _log_file.flush()
    return csv_path
def _log_event(event: str, cycle: int) -> None:
    """Append one timestamped row to the cycle-event CSV, if one is open.

    Each row holds a local-time ISO timestamp with millisecond precision, the
    same instant as a Unix timestamp with six decimals, the event name and
    the cycle number. The file is flushed after every row. Does nothing when
    no log writer has been set up.
    """
    now = time.time()
    # timespec="milliseconds" truncates (does not round) the fractional part.
    iso_ms = datetime.fromtimestamp(now).isoformat(timespec="milliseconds")

    if _log_writer is None:
        return

    row = [iso_ms, f"{now:.6f}", event, cycle]
    _log_writer.writerow(row)
    _log_file.flush()
def video_start(cycle: int = 0) -> None:
    """Log a "start" event, then ask the device to start the static-pink stream.

    The event is logged before the request is sent, so the CSV timestamp
    marks when the command was issued rather than when the device responded.

    Any request failure (connection error, timeout, or an HTTP error status
    returned by the device) is printed and swallowed so the caller's loop
    keeps running.

    Args:
        cycle: Cycle number recorded alongside the event in the log.
    """
    _log_event("start", cycle)
    try:
        resp = requests.put(
            VIDEO_URL,
            json={"action": "start", "mode": "static-pink"},
            timeout=HTTP_TIMEOUT_S,
        )
        # A 4xx/5xx reply means the device did not accept the command; turn
        # it into an HTTPError (a RequestException) so it is reported below
        # instead of being silently treated as success.
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f" video START failed: {e}")
def video_stop(cycle: int = 0) -> None:
    """Log a "stop" event, then ask the device to stop the video stream.

    The event is logged before the request is sent, so the CSV timestamp
    marks when the command was issued rather than when the device responded.

    Any request failure (connection error, timeout, or an HTTP error status
    returned by the device) is printed and swallowed so the caller can carry
    on or finish shutting down.

    Args:
        cycle: Cycle number recorded alongside the event in the log.
    """
    _log_event("stop", cycle)
    try:
        resp = requests.put(
            VIDEO_URL,
            json={"action": "stop"},
            timeout=HTTP_TIMEOUT_S,
        )
        # A 4xx/5xx reply means the device did not accept the command; turn
        # it into an HTTPError (a RequestException) so it is reported below
        # instead of being silently treated as success.
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f" video STOP failed: {e}")
def main() -> None:
    """Parse the command line, then run hold mode or the on/off cycle loop.

    In hold mode a single video start is sent and the script then prints a
    heartbeat every 30 seconds until interrupted. Otherwise the video is
    started and stopped repeatedly with the requested on/off durations,
    either forever or until ``--cycles`` cycles have completed.

    SIGINT and SIGTERM both send a final video stop (logged with cycle -1),
    close the event log and exit with status 0.

    Invalid durations or cycle counts are rejected with a usage error (exit
    status 2) before the log is opened or any command is sent to the device.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--on-s", type=float, default=10.0,
                    help="seconds video is ON per cycle (default 10)")
    ap.add_argument("--off-s", type=float, default=0.5,
                    help="seconds video is OFF per cycle (default 0.5)")
    ap.add_argument("--cycles", type=int, default=0,
                    help="stop after this many cycles (0 = forever, default)")
    ap.add_argument("--hold", action="store_true",
                    help="Send a single video START and hold (no cycling). "
                         "Use as a baseline: do unlocks still happen when "
                         "we don't toggle on/off?")
    args = ap.parse_args()

    # Validate up front: time.sleep() raises on negative, NaN or infinite
    # durations, which would otherwise abort the script *after* the video
    # had been started and leave it running. The chained comparison is False
    # for NaN as well as for out-of-range values.
    for flag, seconds in (("--on-s", args.on_s), ("--off-s", args.off_s)):
        if not 0 <= seconds < float("inf"):
            ap.error(f"{flag} must be a finite, non-negative number of "
                     f"seconds (got {seconds})")
    # A negative count would satisfy the exit test after the first cycle.
    if args.cycles < 0:
        ap.error(f"--cycles must be 0 (forever) or a positive integer "
                 f"(got {args.cycles})")

    log_path = _open_log()
    print(f" event log → {log_path.relative_to(LOG_DIR.parent.parent)}")

    def _shutdown(*_):
        # Signal handler: make sure the stream is off before exiting.
        print("\nshutting down — video off")
        video_stop(cycle=-1)
        if _log_file:
            _log_file.close()
        sys.exit(0)

    signal.signal(signal.SIGINT, _shutdown)
    signal.signal(signal.SIGTERM, _shutdown)

    if args.hold:
        ts = datetime.now().strftime("%H:%M:%S")
        print("VIDEO HOLD — video ON, no cycling (Ctrl+C to stop)")
        print(f"[{ts}] video START\n", flush=True)
        video_start(cycle=0)
        elapsed = 0
        # Never returns; the only way out is the signal handler above.
        while True:
            time.sleep(30.0)
            elapsed += 30
            ts = datetime.now().strftime("%H:%M:%S")
            print(f"[{ts}] still holding — {elapsed}s elapsed", flush=True)

    on_s, off_s = args.on_s, args.off_s
    print(f"VIDEO CYCLER — {on_s:.1f}s on / {off_s:.1f}s off"
          + (f", {args.cycles} cycles" if args.cycles else ", forever")
          + " (Ctrl+C to stop)\n")

    cycle = 0
    while True:
        cycle += 1
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"[{ts}] cycle {cycle:04d} START", flush=True)
        video_start(cycle=cycle)
        time.sleep(on_s)

        ts = datetime.now().strftime("%H:%M:%S")
        print(f"[{ts}] cycle {cycle:04d} STOP", flush=True)
        video_stop(cycle=cycle)

        # Check the limit right after the stop so the run ends with the
        # video off and without a trailing off-period sleep.
        if args.cycles and cycle >= args.cycles:
            print(f"\nReached {args.cycles} cycles, exiting.")
            if _log_file:
                _log_file.close()
            return
        time.sleep(off_s)
# Run the cycler only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()