143 lines
4.8 KiB
Python
143 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
video_cycler.py — Toggle /video start/stop on the device.
|
|
|
|
Pairs with sn65_monitor.py: this script provokes flicker by cycling the
|
|
static-pink video stream, while sn65_monitor measures. All start/stop
|
|
events are timestamp-logged to data/cycle_logs/{ts}_cycles.csv so we can later
|
|
cross-reference PLL unlocks against the precise transition moments.
|
|
|
|
Modes:
|
|
python3 video_cycler.py # 10 s on / 0.5 s off, forever
|
|
python3 video_cycler.py --on-s 5 --off-s 2 # 5 s on, 2 s off
|
|
python3 video_cycler.py --cycles 30 # stop after 30 cycles
|
|
python3 video_cycler.py --hold # one start, hold forever
|
|
|
|
Press Ctrl+C to stop (always sends a final video stop).
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import signal
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
|
|
# Base URL of the device's HTTP API, and the video-control endpoint on it.
DEVICE_BASE = "http://192.168.45.8:5000"
VIDEO_URL = f"{DEVICE_BASE}/video"

# Timeout, in seconds, applied to each HTTP request sent to the device.
HTTP_TIMEOUT_S = 3.0

# Cycle-event CSVs are written to data/cycle_logs/ next to this script.
LOG_DIR = Path(__file__).parent.joinpath("data", "cycle_logs")


# Module-level log state. Both stay None until _open_log() has been called.
_log_file = None
_log_writer = None
|
|
|
|
|
|
def _open_log() -> Path:
    """Create a new timestamped cycle-event CSV and return its path.

    Ensures LOG_DIR exists, opens ``<YYYYmmdd_HHMMSS>_cycles.csv`` inside it
    for writing, and writes the header row. The open file object and its
    csv writer are stored in the module globals ``_log_file`` and
    ``_log_writer``; the file is left open for later event rows.
    """
    global _log_writer, _log_file

    LOG_DIR.mkdir(parents=True, exist_ok=True)

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    path = LOG_DIR.joinpath(f"{ts}_cycles.csv")

    # newline="" lets the csv module control line endings itself.
    _log_file = path.open("w", newline="")
    _log_writer = csv.writer(_log_file)

    header = ["iso", "unix_ts", "event", "cycle"]
    _log_writer.writerow(header)
    # Flush right away so the header is on disk even if the run is killed.
    _log_file.flush()
    return path
|
|
|
|
|
|
def _log_event(event: str, cycle: int) -> None:
    """Append one timestamped row for *event* to the cycle log.

    Does nothing when no log has been opened (``_log_writer`` is None).
    Each row holds the local time as an ISO-style string truncated to
    milliseconds, the Unix timestamp with microsecond precision, the event
    name and the cycle number. The file is flushed after every row.
    """
    now = time.time()
    if _log_writer is None:
        return

    # %f yields six digits (microseconds); dropping the last three leaves ms.
    stamp = datetime.fromtimestamp(now).strftime("%Y-%m-%dT%H:%M:%S.%f")
    row = [stamp[:-3], f"{now:.6f}", event, cycle]
    _log_writer.writerow(row)
    _log_file.flush()
|
|
|
|
|
|
def video_start(cycle: int = 0) -> None:
    """Log a "start" event, then ask the device to start the static-pink video.

    The event is logged *before* the HTTP request is sent, so the logged
    timestamp marks the moment the command was issued.

    Failures are best-effort: a transport error or an HTTP error status is
    printed and swallowed, never raised to the caller.

    Args:
        cycle: Cycle number recorded alongside the event in the log.
    """
    _log_event("start", cycle)
    try:
        resp = requests.put(
            VIDEO_URL,
            json={"action": "start", "mode": "static-pink"},
            timeout=HTTP_TIMEOUT_S,
        )
        # requests does not raise on 4xx/5xx by itself. Without this check a
        # rejected command would look like success. HTTPError is a
        # RequestException subclass, so it is reported by the handler below.
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f" video START failed: {e}")
|
|
|
|
|
|
def video_stop(cycle: int = 0) -> None:
    """Log a "stop" event, then ask the device to stop the video stream.

    The event is logged *before* the HTTP request is sent, so the logged
    timestamp marks the moment the command was issued.

    Failures are best-effort: a transport error or an HTTP error status is
    printed and swallowed, never raised to the caller.

    Args:
        cycle: Cycle number recorded alongside the event in the log.
    """
    _log_event("stop", cycle)
    try:
        resp = requests.put(
            VIDEO_URL,
            json={"action": "stop"},
            timeout=HTTP_TIMEOUT_S,
        )
        # requests does not raise on 4xx/5xx by itself. Without this check a
        # rejected command would look like success. HTTPError is a
        # RequestException subclass, so it is reported by the handler below.
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f" video STOP failed: {e}")
|
|
|
|
|
|
def main() -> None:
    """Parse the command line and run the video cycler.

    Two modes:

    * ``--hold``: send a single video START, then print a status line every
      30 s until the process is interrupted.
    * default: repeat START / sleep ``--on-s`` / STOP / sleep ``--off-s``,
      forever or until ``--cycles`` cycles have completed.

    SIGINT and SIGTERM trigger a final video STOP (logged with cycle -1),
    close the event log and exit with status 0.

    Invalid timing options are rejected with an argparse usage error
    (exit status 2) before the log is opened or any request is sent.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--on-s", type=float, default=10.0,
                    help="seconds video is ON per cycle (default 10)")
    ap.add_argument("--off-s", type=float, default=0.5,
                    help="seconds video is OFF per cycle (default 0.5)")
    ap.add_argument("--cycles", type=int, default=0,
                    help="stop after this many cycles (0 = forever, default)")
    ap.add_argument("--hold", action="store_true",
                    help="Send a single video START and hold (no cycling). "
                         "Use as a baseline: do unlocks still happen when "
                         "we don't toggle on/off?")
    args = ap.parse_args()

    if not args.hold:
        # time.sleep() raises on a negative or NaN duration. Unchecked, that
        # would happen after video_start() was already sent, killing the
        # script with the video left on. Reject bad values up front.
        for flag, seconds in (("--on-s", args.on_s), ("--off-s", args.off_s)):
            # The chained comparison is False for NaN as well as negatives.
            if not 0 <= seconds < float("inf"):
                ap.error(f"{flag} must be a finite number of seconds >= 0 "
                         f"(got {seconds})")
        # A negative count would otherwise be truthy and end after one cycle.
        if args.cycles < 0:
            ap.error(f"--cycles must be >= 0 (got {args.cycles})")

    log_path = _open_log()
    print(f" event log → {log_path.relative_to(LOG_DIR.parent.parent)}")

    def _shutdown(*_):
        """Signal handler: stop the video, close the log, exit cleanly."""
        print("\nshutting down — video off")
        video_stop(cycle=-1)
        if _log_file:
            _log_file.close()
        sys.exit(0)

    signal.signal(signal.SIGINT, _shutdown)
    signal.signal(signal.SIGTERM, _shutdown)

    if args.hold:
        ts = datetime.now().strftime("%H:%M:%S")
        print("VIDEO HOLD — video ON, no cycling (Ctrl+C to stop)")
        print(f"[{ts}] video START\n", flush=True)
        video_start(cycle=0)
        elapsed = 0
        # Never falls through: only a signal (handled above) ends hold mode.
        while True:
            time.sleep(30.0)
            elapsed += 30
            ts = datetime.now().strftime("%H:%M:%S")
            print(f"[{ts}] still holding — {elapsed}s elapsed", flush=True)

    on_s, off_s = args.on_s, args.off_s
    print(f"VIDEO CYCLER — {on_s:.1f}s on / {off_s:.1f}s off"
          + (f", {args.cycles} cycles" if args.cycles else ", forever")
          + " (Ctrl+C to stop)\n")
    cycle = 0
    while True:
        cycle += 1
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"[{ts}] cycle {cycle:04d} START", flush=True)
        video_start(cycle=cycle)
        time.sleep(on_s)
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"[{ts}] cycle {cycle:04d} STOP", flush=True)
        video_stop(cycle=cycle)
        # Check the limit right after STOP so the last cycle skips the
        # trailing off-period sleep.
        if args.cycles and cycle >= args.cycles:
            print(f"\nReached {args.cycles} cycles, exiting.")
            if _log_file:
                _log_file.close()
            return
        time.sleep(off_s)
|
|
|
|
|
|
# Run the cycler only when executed as a script, not when imported.
if __name__ == "__main__":
    main()