Files
MiPi_TEST/flicker_burst.py
2026-05-15 16:32:15 +01:00

626 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
flicker_burst.py — Press `f` when you observe flicker. The script then:
1. Arms Keysight DSO80204B for a large segmented MIPI capture (LP_DAT
trigger fires at line rate, ~48 kHz, so segments fill in ms).
2. Polls SN65 /sn65_registers continuously at ~50 Hz, recording every
PLL state transition.
3. Tails video_cycler.py's CSV log and stops capturing the moment
the next video stop/start transition is observed (i.e. the end of
the current video-on window).
4. Reads out all Keysight segments and saves everything to a
per-burst folder for offline signal-integrity / protocol analysis.
Run alongside video_cycler.py in another terminal:
Terminal A: python3 video_cycler.py # provokes flicker
Terminal B: python3 flicker_burst.py # this script
(press `f` when you see flicker; `q` to quit)
Output:
  data/flicker_bursts/{session_ts}/
    burst_NNNN_{ts}_pll_samples.json
    burst_NNNN_{ts}_rail.csv            (Rigol 1V8 rail snapshot, when the Rigol is in use)
    burst_NNNN_{ts}_mipi_seg001_clk.csv ... segNNN_dat.csv
    burst_NNNN_{ts}_meta.json
    summary.csv
"""
from __future__ import annotations
import argparse
import csv
import json
import select
import signal
import sys
import termios
import time
import tty
from datetime import datetime
from pathlib import Path
import numpy as np
import requests
import vxi11
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# HTTP endpoint polled for the SN65 register dump, and the addresses of the
# two oscilloscopes (both are opened with vxi11.Instrument in main()).
DEVICE_BASE = "http://192.168.45.8:5000"
SN65_EP = f"{DEVICE_BASE}/sn65_registers"
KEYSIGHT_IP = "192.168.45.4"
RIGOL_IP = "192.168.45.5"
# DATA_ROOT receives one sub-folder per session; CYCLE_LOG_DIR is globbed for
# "*_cycles.csv" files by CyclerLogTail.
DATA_ROOT = Path(__file__).parent / "data" / "flicker_bursts"
CYCLE_LOG_DIR = Path(__file__).parent / "data" / "cycle_logs"
POLL_DT_S = 0.020  # 50 Hz SN65 polling
HTTP_TO_S = 0.2  # per-request timeout used in the SN65 polling loop
KEYSIGHT_TO_S = 60.0  # large reads can take a while
RIGOL_TO_S = 10.0  # assigned to the Rigol instrument's .timeout in main()
# Rigol CH1 (1V8 supply rail) — wide enough to bracket the whole burst window
RIGOL_V_SCALE = 0.1  # V/div
RIGOL_V_OFFSET = -1.8  # V (puts 1.8 V at screen centre)
RIGOL_TIMEBASE = 1.0  # s/div → 12 s window
RIGOL_PROBE = 10  # sent as :CHANnel1:PROBe
# Keysight LP_DAT segmented capture — large segment count. Segments fill in
# ms (line rate ≈ 48 kHz × N segs), but readout is the slow part: each
# segment is one SCPI round-trip per channel. 500 segs ≈ ~30 s readout.
KS_LP_SCALE = 1e-6  # sent as :TIMebase:SCALe
KS_LP_POINTS = 50_000  # sent as :ACQuire:POINts
KS_LP_TRIG_OFFSET = 9e-6  # sent as :TIMebase:POSition
KS_LP_V_SCALE = 0.2  # sent as :CHANnel<n>:SCALe on all four channels
KS_LP_V_OFFSET = 0.6  # sent as :CHANnel<n>:OFFSet on all four channels
KS_LP_TRIG_LEVEL = 0.6  # sent as :TRIGger:EDGE:LEVel
KS_SEGMENT_COUNT = 100  # readout ~6 s (was 500 → ~30 s)
KS_PROBE = 19.2  # sent as :CHANnel<n>:PROBe on all four channels
# Safety: cap any single capture at this long, in case video_cycler isn't
# running or its log isn't updating.
# NOTE(review): 20 s is longer than the 12 s Rigol window noted above, so a
# burst that ends by timeout may not be fully covered by the rail snapshot —
# confirm this is intended.
MAX_CAPTURE_S = 20.0
# Keys copied out of the csr_e5 register dict by extract_state().
ERROR_BITS = ("pll_unlock", "cha_sot_bit_err", "cha_llp_err",
              "cha_ecc_err", "cha_lp_err", "cha_crc_err")
# ---------------------------------------------------------------------------
# Non-blocking keys
# ---------------------------------------------------------------------------
class KeyReader:
    """Context manager for polling single key presses from stdin.

    On entry the terminal attributes of stdin are saved and the terminal is
    switched to cbreak mode; on exit the saved attributes are restored.
    """

    def __enter__(self):
        stdin_fd = sys.stdin.fileno()
        self.fd = stdin_fd
        # Remember the current settings so __exit__ can put them back.
        self.old = termios.tcgetattr(stdin_fd)
        tty.setcbreak(stdin_fd)
        return self

    def get_key(self) -> str | None:
        """Return one pending character, lower-cased, or None if nothing is waiting."""
        readable, _, _ = select.select([sys.stdin], [], [], 0)
        if not readable:
            return None
        return sys.stdin.read(1).lower()

    def __exit__(self, *exc_info):
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
# ---------------------------------------------------------------------------
# CSV-log tail for video_cycler
# ---------------------------------------------------------------------------
class CyclerLogTail:
    """
    Watch video_cycler.py's most-recent CSV log for new events.

    The newest ``*_cycles.csv`` file in CYCLE_LOG_DIR (last by sorted name) is
    followed. Progress is tracked as a byte offset that is compared against
    ``stat().st_size``, and the file is re-opened on every check, so nothing
    depends on buffering in a long-lived file object.

    The file is read in binary mode so the offset is a true byte offset, and
    a row is only consumed once its terminating newline has been written.
    That way an event the writer is still in the middle of appending is
    delivered intact on a later call instead of being parsed half-written
    and dropped.
    """

    def __init__(self):
        self.path: Path | None = None
        self.pos: int = 0  # byte offset we've read up to
        self._find_latest(initial=True)

    def _find_latest(self, initial: bool = False) -> bool:
        """Switch to the newest log file if it changed.

        Returns False when no log file exists, True otherwise.
        """
        logs = sorted(CYCLE_LOG_DIR.glob("*_cycles.csv")) if CYCLE_LOG_DIR.exists() else []
        if not logs:
            return False
        latest = logs[-1]
        if self.path != latest:
            self.path = latest
            try:
                # Skip past whatever was already in the file at startup —
                # we only want NEW events. Subsequent rolls keep pos=0.
                self.pos = latest.stat().st_size if initial else 0
            except FileNotFoundError:
                self.pos = 0
        return True

    def _read_complete_line(self) -> str | None:
        """Return the next fully written line after ``self.pos``, or None.

        ``self.pos`` only advances when a newline-terminated line was read.
        """
        try:
            if self.path.stat().st_size <= self.pos:
                return None
            with open(self.path, "rb") as f:
                f.seek(self.pos)
                raw = f.readline()
        except OSError:
            # File vanished or is unreadable right now; the caller re-scans
            # for the latest log before its next attempt.
            return None
        if not raw.endswith(b"\n"):
            # The writer is mid-row: leave pos untouched and retry later.
            return None
        self.pos += len(raw)
        # errors="replace" so one undecodable byte cannot wedge the tail.
        return raw.decode("utf-8", errors="replace")

    @staticmethod
    def _parse_event(line: str) -> dict | None:
        """Parse one CSV row into an event dict; None for header/blank/bad rows."""
        parts = [p.strip() for p in line.strip().split(",")]
        if len(parts) < 4 or parts[0] == "iso":
            return None
        try:
            return {"iso": parts[0], "ts": float(parts[1]),
                    "event": parts[2], "cycle": int(parts[3])}
        except ValueError:
            return None

    def get_next_event(self, timeout_s: float) -> dict | None:
        """
        Wait up to timeout_s for the next start/stop event.
        Returns {'iso','ts','event','cycle'} or None.

        With ``timeout_s <= 0`` the call never sleeps: it returns the next
        event already present in the file, or None. Header, blank and
        malformed rows are skipped without ending the call, so one
        non-blocking call cannot be used up by a non-event line.
        """
        self._find_latest()
        if not self.path:
            return None
        deadline = time.time() + timeout_s
        while True:
            line = self._read_complete_line()
            if line is not None:
                event = self._parse_event(line)
                if event is not None:
                    return event
                # Non-event row already consumed — try the next one right away.
                continue
            if timeout_s <= 0 or time.time() >= deadline:
                return None
            # Nothing new yet: check whether the cycler rolled to a new file.
            self._find_latest()
            time.sleep(0.05)
# ---------------------------------------------------------------------------
# SN65 extraction
# ---------------------------------------------------------------------------
def extract_state(data: dict | None) -> dict:
    """Flatten an /sn65_registers response into the fields this script tracks.

    Reads the ``csr_0a`` and ``csr_e5`` entries under ``registers``. Anything
    missing (including ``data`` itself being None) comes back as None.

    Returns a dict with ``csr_0a`` / ``csr_e5`` (the registers' ``value``
    fields), ``pll_lock``, ``clk_det`` and one entry per name in ERROR_BITS.
    """
    payload = data or {}
    registers = payload.get("registers", {}) or {}
    status_reg = registers.get("csr_0a") or {}
    error_reg = registers.get("csr_e5") or {}
    return {
        "csr_0a": status_reg.get("value"),
        "csr_e5": error_reg.get("value"),
        "pll_lock": status_reg.get("pll_lock"),
        "clk_det": status_reg.get("clk_det"),
        **{bit: error_reg.get(bit) for bit in ERROR_BITS},
    }
# ---------------------------------------------------------------------------
# Rigol I/O (1V8 supply rail capture)
# ---------------------------------------------------------------------------
def setup_rigol(rigol) -> None:
    """Configure the Rigol for the rail capture and leave it running.

    Stops the scope, sends the channel-1 / timebase / trigger / memory-depth
    settings built from the RIGOL_* constants (channel 2 is switched off),
    then restarts acquisition. The short sleeps around :STOP and :RUN are
    kept exactly as in the original sequence.
    """
    rigol.write(":STOP")
    time.sleep(0.2)

    settings = (
        ":CHANnel1:DISPlay 1",
        ":CHANnel1:COUPling DC",
        f":CHANnel1:PROBe {RIGOL_PROBE}",
        f":CHANnel1:SCALe {RIGOL_V_SCALE:.3f}",
        f":CHANnel1:OFFSet {RIGOL_V_OFFSET:.3f}",
        ":CHANnel2:DISPlay 0",
        f":TIMebase:MAIN:SCALe {RIGOL_TIMEBASE:.3E}",
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGe:SOURce CHANnel1",
        ":TRIGger:EDGe:SLOPe NEGative",
        ":TRIGger:EDGe:LEVel 1.76",
        ":TRIGger:SWEep AUTO",
        ":ACQuire:MDEPth AUTO",
    )
    for command in settings:
        rigol.write(command)

    time.sleep(0.3)
    rigol.write(":RUN")
    time.sleep(0.2)
def capture_rail(rigol, out_path: Path) -> tuple[float, float]:
    """Snapshot Rigol channel 1 to a CSV file and summarise it.

    The scope is stopped, the channel-1 waveform is fetched in ASCII/NORM
    mode, written to ``out_path`` as ``time,volts`` rows, and the scope is
    set running again.

    Args:
        rigol: instrument object providing ``write(cmd)`` and ``ask(cmd)``.
        out_path: destination CSV file.

    Returns:
        ``(vpp_mV, mean_V)`` — peak-to-peak in millivolts and the mean in
        volts of the captured samples.

    Raises:
        RuntimeError: if the scope returned no samples. Transport and parse
            errors propagate unchanged.

    The scope is restarted with ``:RUN`` on every exit path. Otherwise a
    timeout or a malformed reply would leave it stopped, and every later
    burst would read back the same frozen trace.
    """
    rigol.write(":STOP")
    time.sleep(0.1)
    try:
        rigol.write(":WAVeform:SOURce CHANnel1")
        rigol.write(":WAVeform:FORMat ASC")
        rigol.write(":WAVeform:MODE NORM")
        time.sleep(0.05)
        pre = rigol.ask(":WAVeform:PREamble?").strip().split(",")
        # Preamble fields 4 and 5 are read as the x increment and x origin.
        xinc = float(pre[4])
        xorig = float(pre[5])
        raw = rigol.ask(":WAVeform:DATA?").strip()
        if raw.startswith("#"):
            # Strip the "#<n><n length digits>" block header before the CSV text.
            ndig = int(raw[1])
            raw = raw[2 + ndig:]
        vals = [float(v) for v in raw.split(",") if v.strip()]
        if not vals:
            raise RuntimeError("Rigol returned no samples")
        volts = np.asarray(vals, dtype=np.float64)
        t = np.arange(len(volts)) * xinc + xorig
        np.savetxt(out_path, np.column_stack([t, volts]),
                   delimiter=",", fmt="%.6e")
    finally:
        rigol.write(":RUN")
    return float((volts.max() - volts.min()) * 1000), float(volts.mean())
# ---------------------------------------------------------------------------
# Keysight I/O
# ---------------------------------------------------------------------------
def _ks_drain(scope):
for _ in range(20):
try:
r = scope.ask(":SYSTem:ERRor?").strip()
except Exception:
return
if not r or r.startswith(("0,", "+0,")) or r == "0":
return
def setup_keysight(scope) -> None:
    """Reset the Keysight and configure it for the segmented LP_DAT capture.

    Phase 1 resets the scope and sets display, input, probe and label for
    all four channels plus the basic acquisition mode, pausing 40 ms after
    each command. Phase 2 applies vertical scale/offset, the falling-edge
    trigger on channel 3, the timebase and the segmented-acquisition
    settings from the KS_* constants. The error queue is drained after each
    phase.
    """
    channel_labels = {1: "CLK+", 2: "CLK-", 3: "DAT0+", 4: "DAT0-"}

    reset_and_inputs = ["*RST", ":RUN", ":STOP", "*CLS"]
    for ch, label in channel_labels.items():
        reset_and_inputs += [
            f":CHANnel{ch}:DISPlay ON",
            f":CHANnel{ch}:INPut DC50",
            f":CHANnel{ch}:PROBe {KS_PROBE}",
            f":CHANnel{ch}:LABel '{label}'",
        ]
    reset_and_inputs += [
        ":TIMebase:REFerence CENTer",
        ":ACQuire:MODE RTIMe",
        ":ACQuire:INTerpolate ON",
    ]
    for command in reset_and_inputs:
        scope.write(command)
        time.sleep(0.04)
    _ks_drain(scope)

    for ch in channel_labels:
        scope.write(f":CHANnel{ch}:SCALe {KS_LP_V_SCALE:.3f}")
        scope.write(f":CHANnel{ch}:OFFSet {KS_LP_V_OFFSET:.3f}")

    capture_settings = (
        ":TRIGger:MODE EDGE",
        ":TRIGger:EDGE:SOURce CHANnel3",
        ":TRIGger:EDGE:SLOPe NEGative",
        f":TRIGger:EDGE:LEVel {KS_LP_TRIG_LEVEL:.3f}",
        ":TRIGger:SWEep NORMal",
        f":TIMebase:SCALe {KS_LP_SCALE:.3E}",
        f":ACQuire:POINts {KS_LP_POINTS}",
        f":TIMebase:POSition {KS_LP_TRIG_OFFSET:.2E}",
        ":ACQuire:MODE SEGMented",
        f":ACQuire:SEGMented:COUNt {KS_SEGMENT_COUNT}",
    )
    for command in capture_settings:
        scope.write(command)
    time.sleep(0.4)
    _ks_drain(scope)
def _ks_read_block(scope) -> bytes:
head = scope.read_raw(2)
if not head.startswith(b"#"):
idx = head.find(b"#")
if idx < 0:
extra = scope.read_raw(64)
head += extra
idx = head.find(b"#")
head = head[idx:idx + 2]
ndigits = int(head[1:2])
length_bytes = scope.read_raw(ndigits)
nbytes = int(length_bytes)
data = b""
while len(data) < nbytes:
chunk = scope.read_raw(nbytes - len(data))
if not chunk:
break
data += chunk
try:
scope.read_raw(1)
except Exception:
pass
return data
def keysight_arm(scope) -> None:
    """Start an acquisition by writing ``:DIGitize`` to the scope.

    Only the command is sent here; nothing is read back. capture_burst()
    checks for completion later with ``*OPC?``.
    """
    digitize_cmd = ":DIGitize"
    scope.write(digitize_cmd)
def keysight_read_segments(scope, n_segments: int, out_dir: Path,
                           base: str) -> int:
    """Download every segment of channels 1 and 3 into per-segment CSV files.

    For each channel the waveform source/format/byte order are selected and
    the x/y increment and origin are queried once. Each segment index
    1..n_segments is then selected, read as little-endian 16-bit codes,
    scaled to volts and written as ``time,volts`` rows to
    ``{out_dir}/{base}_segNNN_{clk|dat}.csv``.

    Returns the number of segments written for the ``clk`` channel.
    """
    sources = ((1, "clk"), (3, "dat"))
    scaling_queries = ("XINCrement", "XORigin", "YINCrement", "YORigin")
    clk_segments = 0

    for source, suffix in sources:
        for command in (f":WAVeform:SOURce CHANnel{source}",
                        ":WAVeform:FORMat WORD",
                        ":WAVeform:BYTeorder LSBFirst"):
            scope.write(command)
        # Queried in this fixed order: x increment, x origin, y increment, y origin.
        x_step, x_start, y_step, y_start = (
            float(scope.ask(f":WAVeform:{field}?")) for field in scaling_queries
        )

        segment = 0
        while segment < n_segments:
            segment += 1
            scope.write(f":ACQuire:SEGMented:INDex {segment}")
            scope.write(":WAVeform:DATA?")
            payload = _ks_read_block(scope)
            samples = np.frombuffer(payload, dtype="<i2").astype(np.float64)
            volts = samples * y_step + y_start
            times = np.arange(len(volts)) * x_step + x_start
            target = out_dir / f"{base}_seg{segment:03d}_{suffix}.csv"
            np.savetxt(target, np.column_stack([times, volts]),
                       delimiter=",", fmt="%.6e")
            if suffix == "clk":
                clk_segments += 1
    return clk_segments
# ---------------------------------------------------------------------------
# Capture-and-poll cycle
# ---------------------------------------------------------------------------
def _next_fresh_cycler_event(cycler_tail, t_press: float) -> dict | None:
    """Return the first queued cycler event newer than ``t_press``, or None.

    Events logged before the key press are discarded here in one go, so a
    backlog that built up while the script sat idle (or was busy reading out
    the previous burst) cannot delay detection of the real end-of-window
    event.
    """
    while True:
        ev = cycler_tail.get_next_event(timeout_s=0.0)
        if ev is None or ev["ts"] > t_press:
            return ev


def _pair_unlocks(unlocks: list, samples: list) -> list:
    """Pair each unlock with the first later sample that shows pll_lock True.

    Unlocks with no later locked sample get no entry in the result.
    """
    pll_evts = [s for s in samples
                if "state" in s and s["state"].get("pll_lock") is not None]
    pairs = []
    for u in unlocks:
        for s in pll_evts:
            if s["ts"] > u["ts"] and s["state"]["pll_lock"] is True:
                pairs.append({"start_ts": u["ts"], "start_iso": u["iso"],
                              "duration_ms": (s["ts"] - u["ts"]) * 1000})
                break
    return pairs


def capture_burst(sess, scope, rigol, cycler_tail: CyclerLogTail,
                  burst_n: int, session_dir: Path,
                  summary_writer) -> None:
    """One full burst: arm scope → poll SN65 → wait for cycler event →
    read MIPI segments → save everything.

    Args:
        sess: HTTP session used to GET SN65_EP.
        scope: Keysight instrument, or None to skip the MIPI capture.
        rigol: Rigol instrument, or None to skip the rail snapshot.
        cycler_tail: tail of the video_cycler log; the first event newer
            than the key press ends the capture window.
        burst_n: burst number, used in file names and the summary row.
        session_dir: folder that receives this burst's files.
        summary_writer: csv writer for summary.csv (one row is appended).

    The window ends at the first fresh cycler event or after MAX_CAPTURE_S.
    ``n_unlocks`` counts every locked→unlocked transition seen in the
    window, including ones that had not recovered when the window closed;
    ``unlock_pairs`` holds only the recovered ones, with their durations.
    Instrument failures are printed and the burst is still saved.
    """
    t_press = time.time()
    iso_press = datetime.fromtimestamp(t_press).strftime("%H:%M:%S.%f")[:-3]
    ts_press = datetime.fromtimestamp(t_press).strftime("%Y%m%d_%H%M%S_%f")[:-3]
    base = f"burst_{burst_n:04d}_{ts_press}"
    print(f"\n [{iso_press}] FLICKER #{burst_n} — capture started", flush=True)

    # 1. Arm Keysight
    if scope is not None:
        try:
            keysight_arm(scope)
        except Exception as e:
            print(f" Keysight arm FAILED: {e}", flush=True)

    # 2. Poll SN65 in main thread while also tailing cycler log
    samples: list = []
    unlocks: list = []
    last_pll = None
    end_event = None
    deadline = t_press + MAX_CAPTURE_S
    next_log_check = 0.0  # only check log every ~50 ms to keep poll rate high
    while time.time() < deadline:
        t0 = time.time()
        try:
            r = sess.get(SN65_EP, timeout=HTTP_TO_S)
            r.raise_for_status()
            state = extract_state(r.json())
            samples.append({"ts": t0, "state": state})
            pll = state["pll_lock"]
            if last_pll is True and pll is False:
                unlocks.append({"ts": t0,
                                "iso": datetime.fromtimestamp(t0)
                                .strftime("%H:%M:%S.%f")[:-3]})
            if pll is not None:
                last_pll = pll
        except Exception as e:
            samples.append({"ts": t0, "error": str(e)})

        # Cheap check (non-blocking) of cycler log
        if t0 >= next_log_check:
            end_event = _next_fresh_cycler_event(cycler_tail, t_press)
            if end_event is not None:
                break
            next_log_check = t0 + 0.05  # 20 Hz log check

        # Pace SN65 polling
        elapsed = time.time() - t0
        if elapsed < POLL_DT_S:
            time.sleep(POLL_DT_S - elapsed)

    t_end = time.time()
    end_iso = datetime.fromtimestamp(t_end).strftime("%H:%M:%S.%f")[:-3]
    end_reason = ("cycler_event:" + end_event["event"]) if end_event else "timeout"
    print(f" [{end_iso}] capture window ended ({end_reason}) — "
          f"polled {len(samples)} samples in {t_end - t_press:.2f}s",
          flush=True)

    # 3a. Rigol 1V8 rail snapshot (fast — ~300 ms)
    rail_vpp_mV = rail_mean_V = None
    rail_path = None
    if rigol is not None:
        rail_path = session_dir / f"{base}_rail.csv"
        try:
            rail_vpp_mV, rail_mean_V = capture_rail(rigol, rail_path)
            print(f" rail: Vpp={rail_vpp_mV:.1f}mV mean={rail_mean_V:.3f}V "
                  f"({RIGOL_TIMEBASE*12:.0f}s window)", flush=True)
        except Exception as e:
            print(f" rail capture FAILED: {e}", flush=True)
            rail_path = None

    # 3b. Read Keysight segments
    n_segs = 0
    if scope is not None:
        try:
            # Wait briefly for :DIGitize to complete (segments fill in ms at
            # line rate, but allow margin)
            prev = scope.timeout
            try:
                scope.timeout = 10
                opc = scope.ask("*OPC?").strip()
            except Exception:
                opc = "0"
            finally:
                scope.timeout = prev
            if opc != "1":
                print(f" Keysight :DIGitize didn't complete (OPC={opc}) — "
                      f"attempting read anyway", flush=True)
            print(f" reading {KS_SEGMENT_COUNT} segments ×2 ch — be patient",
                  flush=True)
            t_read0 = time.time()
            n_segs = keysight_read_segments(
                scope, KS_SEGMENT_COUNT, session_dir, base + "_mipi")
            print(f" MIPI: {n_segs} segments saved "
                  f"(readout took {time.time() - t_read0:.1f}s)", flush=True)
        except Exception as e:
            print(f" Keysight read FAILED: {e}", flush=True)

    # 4. Pair unlocks with their recovery times. An unlock that never
    #    recovered inside the window has no pair but still counts.
    unlock_pairs = _pair_unlocks(unlocks, samples)
    n_unlocks = len(unlocks)
    n_unrecovered = n_unlocks - len(unlock_pairs)

    # 5. Save samples + meta
    samples_path = session_dir / f"{base}_pll_samples.json"
    samples_path.write_text(json.dumps({
        "burst": burst_n,
        "t_press": t_press,
        "press_iso": iso_press,
        "t_end": t_end,
        "end_iso": end_iso,
        "end_reason": end_reason,
        "end_event": end_event,
        "duration_s": t_end - t_press,
        "n_samples": len(samples),
        "n_unlocks": n_unlocks,
        "n_unlocks_unrecovered": n_unrecovered,
        "unlock_pairs": unlock_pairs,
        "samples": samples,
    }, indent=2, default=str))
    meta_path = session_dir / f"{base}_meta.json"
    meta_path.write_text(json.dumps({
        "burst": burst_n,
        "t_press": t_press,
        "press_iso": iso_press,
        "t_end": t_end,
        "end_iso": end_iso,
        "end_reason": end_reason,
        "duration_s": t_end - t_press,
        "n_pll_samples": len(samples),
        "n_unlocks": n_unlocks,
        "n_unlocks_unrecovered": n_unrecovered,
        "mipi_basename": f"{base}_mipi" if n_segs else None,
        "n_mipi_segments": n_segs,
        "ks_lp_scale_s": KS_LP_SCALE,
        "ks_lp_points": KS_LP_POINTS,
        "rail_csv": rail_path.name if rail_path else None,
        "rail_vpp_mV": rail_vpp_mV,
        "rail_mean_V": rail_mean_V,
        "rail_window_s": RIGOL_TIMEBASE * 12,
    }, indent=2, default=str))
    summary_writer.writerow([burst_n, ts_press, iso_press, end_iso,
                             f"{t_end - t_press:.2f}", end_reason,
                             len(samples), n_unlocks, n_segs,
                             f"{rail_vpp_mV:.1f}" if rail_vpp_mV is not None else "",
                             f"{rail_mean_V:.3f}" if rail_mean_V is not None else "",
                             base])

    durs = sorted(p["duration_ms"] for p in unlock_pairs)
    open_note = (f" ({n_unrecovered} still unlocked at window end)"
                 if n_unrecovered else "")
    if durs:
        n = len(durs)
        print(f" unlocks during burst: {n_unlocks} "
              f"min={durs[0]:.1f}ms med={durs[n//2]:.1f}ms "
              f"max={durs[-1]:.1f}ms{open_note}", flush=True)
    else:
        print(f" unlocks during burst: {n_unlocks}{open_note}", flush=True)
    print(f" saved {base}_*", flush=True)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Run the interactive capture session.

    Creates the session folder, checks that the SN65 endpoint answers (exits
    with status 1 if not), opens and configures the Rigol and Keysight
    unless disabled on the command line (a failing instrument is reported
    and skipped), starts tailing the cycler log and opens summary.csv. It
    then loops on the keyboard: `f` runs one capture_burst, `q` — or
    SIGINT/SIGTERM — closes the summary file and exits.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--no-keysight", action="store_true",
                        help="SN65 polling only (skip MIPI capture)")
    parser.add_argument("--no-rigol", action="store_true",
                        help="skip Rigol 1V8 rail capture")
    args = parser.parse_args()

    session_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    session_dir = DATA_ROOT / session_ts
    session_dir.mkdir(parents=True, exist_ok=True)
    print(f"FLICKER BURST CAPTURE — session {session_ts}")
    print(f" output: {session_dir.relative_to(DATA_ROOT.parent.parent)}")

    # The SN65 endpoint is mandatory: give up straight away if it is down.
    sess = requests.Session()
    try:
        sess.get(SN65_EP, timeout=2.0).raise_for_status()
    except Exception as e:
        print(f" *** SN65 endpoint failed: {e} ***")
        sys.exit(1)
    else:
        print(" SN65: reachable")

    # Both scopes are optional; a failure downgrades to running without them.
    rigol = None
    if args.no_rigol:
        print(" Rigol: disabled (--no-rigol)")
    else:
        try:
            rail_scope = vxi11.Instrument(RIGOL_IP)
            rail_scope.timeout = RIGOL_TO_S
            idn = rail_scope.ask("*IDN?").strip()
            print(f" Rigol: {idn}")
            setup_rigol(rail_scope)
            print(f" CH1 1V8 rail, {RIGOL_V_SCALE*1000:.0f} mV/div, "
                  f"{RIGOL_TIMEBASE:.1f} s/div ({RIGOL_TIMEBASE*12:.0f}s window)")
            rigol = rail_scope
        except Exception as e:
            print(f" Rigol failed ({e}) — continuing without rail capture")

    scope = None
    if args.no_keysight:
        print(" Keysight: disabled (--no-keysight)")
    else:
        try:
            mipi_scope = vxi11.Instrument(KEYSIGHT_IP)
            mipi_scope.timeout = KEYSIGHT_TO_S
            idn = mipi_scope.ask("*IDN?").strip()
            print(f" Keysight: {idn}")
            setup_keysight(mipi_scope)
            print(f" LP_DAT segmented, {KS_SEGMENT_COUNT} segs/acq, "
                  f"{KS_LP_POINTS} pts × {KS_LP_SCALE*1e6:.0f} µs/div")
            scope = mipi_scope
        except Exception as e:
            print(f" Keysight failed ({e}) — continuing without MIPI")

    cycler_tail = CyclerLogTail()
    if not cycler_tail.path:
        print(f" cycler log: NOT FOUND — capture will use {MAX_CAPTURE_S}s timeout per burst")
    else:
        print(f" cycler log: {cycler_tail.path.name} (tailing for STOP events)")

    summary_columns = ["burst", "ts", "iso_press", "iso_end", "duration_s",
                       "end_reason", "n_pll_samples", "n_unlocks",
                       "n_mipi_segs", "rail_vpp_mV", "rail_mean_V", "basename"]
    sf = open(session_dir / "summary.csv", "w", newline="")
    sw = csv.writer(sf)
    sw.writerow(summary_columns)
    sf.flush()

    def _shutdown(*_):
        # Best-effort close; exit is reported and happens regardless.
        try:
            sf.close()
        except Exception:
            pass
        print("\nshutting down")
        sys.exit(0)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _shutdown)

    print("\n Press `f` when you see flicker. `q` to quit.")
    print(" Each press triggers a capture window from now until video_cycler")
    print(f" next stops the video (or {MAX_CAPTURE_S:.0f}s timeout if no cycler).\n")

    burst_n = 0
    with KeyReader() as keys:
        while True:
            pressed = keys.get_key()
            if pressed == "f":
                burst_n += 1
                capture_burst(sess, scope, rigol, cycler_tail,
                              burst_n, session_dir, sw)
                sf.flush()
                print("\n ready for next press...\n", flush=True)
            elif pressed == "q":
                _shutdown()
            else:
                # No relevant key: idle briefly before polling again.
                time.sleep(0.05)
# Script entry point: start the interactive capture loop only when this file
# is executed directly, not when it is imported.
if __name__ == "__main__":
    main()