Files
MiPi_TEST/rail_watch.py
2026-05-15 16:32:15 +01:00

258 lines
8.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
rail_watch.py — Capture Rigol DS1202Z-E CH1 (1V8 supply rail) every time the
SN65DSI83 reports a MIPI PLL unlock.
Architecture
------------
- Polls /sn65_registers at ~50 Hz looking for pll_lock True→False transitions.
- On each unlock, :STOPs the Rigol, reads CH1 waveform via :WAV:DATA?, saves
to CSV in data/rail_traces/, prints peak-to-peak ripple, then :RUNs again.
- Press `g` to capture a baseline (clean) trace. Press `q` to quit.
Rigol setup (do once on the front panel before running):
* Channel 1 probed on the 1V8 rail derived to the MIPI PHY
* DC coupling with offset, or AC coupling for ripple-only view
* Recommended: 20 mV/div, 5-10 ms/div (60-120 ms window)
* Trigger: AUTO on Channel 1 so the buffer is always recent
* Memory depth: 12M (or whatever fits the timebase)
* :RUN the scope so it's continuously acquiring
"""
from __future__ import annotations
import argparse
import select
import signal
import sys
import termios
import time
import tty
from datetime import datetime
from pathlib import Path
import numpy as np
import requests
import vxi11
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Base URL of the HTTP service that exposes the /sn65_registers endpoint.
DEVICE_BASE = "http://192.168.45.8:5000"
# Full URL polled by main() to read the SN65DSI83 pll_lock flag.
SN65_EP = f"{DEVICE_BASE}/sn65_registers"
# Address passed to vxi11.Instrument() to open the scope connection.
RIGOL_IP = "192.168.45.5"
# Directory the captured waveform CSVs are written to (created on demand).
DATA_DIR = Path(__file__).parent / "data" / "rail_traces"
# Target period of one poll-loop iteration, in seconds.
POLL_DT_S = 0.020 # 50 Hz target — coarser than sn65_monitor
# Per-request timeout for the HTTP poll, in seconds. It is larger than
# POLL_DT_S, so a slow endpoint stretches the loop period beyond the target.
HTTP_TO_S = 0.2
# Value assigned to the vxi11 instrument's `timeout` attribute, in seconds.
RIGOL_TO_S = 10.0
# ---------------------------------------------------------------------------
# Rigol I/O
# ---------------------------------------------------------------------------
def _read_ieee_block(rigol) -> bytes:
    """Read one IEEE 488.2 definite-length binary block from the scope.

    The wire format is ``#<n><len><data>[\\n]``: a ``#``, one digit ``n``
    giving the number of length digits, ``n`` ASCII digits giving the payload
    length, the payload itself, and an optional trailing newline.

    Everything read from the instrument is kept in a single running buffer,
    so bytes that arrive together with (or ahead of) the header are never
    discarded. Up to 66 bytes of leading junk before the ``#`` are tolerated.

    Args:
        rigol: Object exposing ``read_raw(num) -> bytes`` (returns at most
            ``num`` bytes, and ``b""`` when nothing more is available).

    Returns:
        The payload bytes. If the instrument stops sending before the
        announced length is reached, the bytes received so far are returned
        (best effort, as before).

    Raises:
        ValueError: No ``#`` header was found, or the header is truncated or
            not numeric.
    """
    buf = bytearray(rigol.read_raw(2))
    start = buf.find(b"#")
    if start < 0:
        # Leading junk (e.g. a stale newline): pull a bit more and look again.
        buf += rigol.read_raw(64)
        start = buf.find(b"#")
    if start < 0:
        raise ValueError(f"no IEEE 488.2 block header in {bytes(buf)!r}")
    del buf[:start]  # buf now begins at '#'

    def _fill(total: int) -> bool:
        """Grow ``buf`` to ``total`` bytes; False if the scope stops sending."""
        while len(buf) < total:
            chunk = rigol.read_raw(total - len(buf))
            if not chunk:
                return False
            buf.extend(chunk)
        return True

    if not _fill(2):
        raise ValueError("truncated IEEE 488.2 block header")
    ndigits = int(bytes(buf[1:2]))  # ValueError if not a digit
    header_len = 2 + ndigits
    if ndigits == 0 or not _fill(header_len):
        raise ValueError(
            f"invalid IEEE 488.2 block header {bytes(buf[:header_len])!r}")
    nbytes = int(bytes(buf[2:header_len]))
    end = header_len + nbytes

    _fill(end)  # best effort: a short read yields a short payload

    if len(buf) <= end:
        # The terminator has not been consumed yet. It may not exist at all,
        # in which case the read can time out -- that is expected and ignored.
        try:
            rigol.read_raw(1)
        except Exception:
            pass
    return bytes(buf[header_len:end])
def capture_trace(rigol, label: str) -> tuple[Path, float, float]:
    """Freeze the scope, read the CH1 trace, resume, and save it as CSV.

    Sequence: ``:STOP`` -> select CHANnel1 / BYTE / NORM -> read the preamble
    and the waveform block -> ``:RUN``. The ``:RUN`` is issued from a
    ``finally`` clause so the scope resumes acquiring even when the readout
    fails part-way; otherwise one bad capture would leave it halted for every
    later trigger.

    Raw codes are converted with ``(code - yref - yorig) * yinc`` and the time
    axis is ``index * xinc + xorig``, using the fields of the preamble reply.

    Args:
        rigol: Instrument handle providing ``write``, ``ask`` and ``read_raw``.
        label: Suffix for the CSV file name (``<timestamp>_<label>.csv``).

    Returns:
        ``(csv_path, vpp_mV, mean_V)``: the written file, the peak-to-peak
        amplitude in millivolts, and the mean level in volts.

    Raises:
        ValueError: The preamble has fewer than 10 fields or the waveform
            block is empty.
        Exception: Any instrument or file-system error is propagated.
    """
    rigol.write(":STOP")
    try:
        time.sleep(0.06)
        rigol.write(":WAVeform:SOURce CHANnel1")
        rigol.write(":WAVeform:FORMat BYTE")
        rigol.write(":WAVeform:MODE NORM")
        time.sleep(0.02)

        preamble = rigol.ask(":WAVeform:PREamble?").strip().split(",")
        # format,type,points,count,xinc,xorig,xref,yinc,yorig,yref
        if len(preamble) < 10:
            raise ValueError(f"unexpected waveform preamble: {preamble!r}")
        xinc, xorig = float(preamble[4]), float(preamble[5])
        yinc, yorig, yref = (float(preamble[i]) for i in (7, 8, 9))

        rigol.write(":WAVeform:DATA?")
        raw = _read_ieee_block(rigol)
    finally:
        # Resume acquisition whether or not the readout succeeded.
        rigol.write(":RUN")

    codes = np.frombuffer(raw, dtype=np.uint8)
    if codes.size == 0:
        # Fail before writing an empty CSV; max()/min() would raise anyway.
        raise ValueError("scope returned an empty waveform block")
    volts = (codes.astype(np.float64) - yref - yorig) * yinc
    t = np.arange(len(volts)) * xinc + xorig

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]  # millisecond stamp
    csv_path = DATA_DIR / f"{ts}_{label}.csv"
    np.savetxt(csv_path, np.column_stack([t, volts]),
               delimiter=",", fmt="%.6e")

    vpp_mV = float((volts.max() - volts.min()) * 1000)
    mean_V = float(volts.mean())
    return csv_path, vpp_mV, mean_V
# ---------------------------------------------------------------------------
# SN65 state extraction
# ---------------------------------------------------------------------------
def pll_state(data: dict | None):
    """Extract the ``pll_lock`` flag from a decoded /sn65_registers reply.

    The value is looked up at ``data["registers"]["csr_0a"]["pll_lock"]``.
    Every level is type-checked, so a malformed reply yields ``None`` rather
    than raising.

    Args:
        data: Decoded JSON body of the reply, or ``None``.

    Returns:
        The stored ``pll_lock`` value unchanged, or ``None`` when any level of
        the path is missing or is not a dict. main() compares the result with
        ``is True`` / ``is False``, so it is expected to be a real bool.
    """
    if not isinstance(data, dict):
        return None
    regs = data.get("registers", {})
    if not isinstance(regs, dict):
        return None
    csr_0a = regs.get("csr_0a")
    if not isinstance(csr_0a, dict):
        # Covers a missing/null entry as well as an unexpected scalar or list.
        return None
    return csr_0a.get("pll_lock")
# ---------------------------------------------------------------------------
# Non-blocking keys
# ---------------------------------------------------------------------------
class KeyReader:
    """Context manager for polling single key presses without blocking.

    On entry the terminal attributes of stdin are saved and stdin is switched
    to cbreak mode; on exit the saved attributes are restored.
    """

    def __enter__(self):
        # Remember the descriptor and its current settings for __exit__.
        self.fd = sys.stdin.fileno()
        self.old = termios.tcgetattr(self.fd)
        tty.setcbreak(self.fd)
        return self

    def __exit__(self, *_exc_info):
        # TCSADRAIN: apply once pending output has been written.
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)

    def get_key(self) -> str | None:
        """Return one pending character, lower-cased, or None if none waits."""
        readable, _, _ = select.select([sys.stdin], [], [], 0)  # zero timeout
        if not readable:
            return None
        return sys.stdin.read(1).lower()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _capture_and_report(rigol, event: str, label: str) -> None:
    """Capture one trace and print its table row, or a failure row."""
    iso = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    try:
        path, vpp, mean = capture_trace(rigol, label)
    except Exception as e:
        # A failed capture must not end the watch loop.
        print(f" {iso:<14} {event} CAPTURE FAILED: {e}", flush=True)
        return
    print(f" {iso:<14} {event:<12} "
          f"{path.name:<40} {vpp:>5.1f}mV {mean:>5.3f}V",
          flush=True)


def main() -> None:
    """Run the rail watcher.

    Connects to the scope, then polls the SN65 endpoint in a paced loop. A
    ``pll_lock`` transition from True to False triggers a scope capture;
    ``g`` takes a manual baseline capture and ``q`` quits. SIGINT, SIGTERM
    and ``q`` all put the scope back into RUN before exiting.

    With ``--test`` a single capture is taken and the process exits with
    status 0 on success or 1 on failure.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--test", action="store_true",
                    help="Take one immediate trace + exit (verifies Rigol comms)")
    args = ap.parse_args()

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    sess = requests.Session()

    print("RAIL WATCH")
    print(f" sn65 endpoint: {SN65_EP}")
    print(f" Rigol IP: {RIGOL_IP}")
    print(f" Output dir: {DATA_DIR.relative_to(DATA_DIR.parent.parent)}")

    try:
        rigol = vxi11.Instrument(RIGOL_IP)
        rigol.timeout = RIGOL_TO_S
        idn = rigol.ask("*IDN?").strip()
        print(f" Rigol IDN: {idn}")
    except Exception as e:
        print(f" *** RIGOL CONNECTION FAILED: {e} ***")
        sys.exit(1)

    if args.test:
        print("\n--test: taking one capture now...")
        try:
            path, vpp, mean = capture_trace(rigol, "test")
        except Exception as e:
            print(f" CAPTURE FAILED: {e}")
            sys.exit(1)  # a failed comms check must not report success
        print(f" saved {path.name}")
        print(f" Vpp = {vpp:.1f} mV mean = {mean:.3f} V")
        sys.exit(0)

    def _shutdown(*_):
        # Best effort: the link may already be gone when we get here.
        try:
            rigol.write(":RUN")
        except Exception:
            pass
        try:
            rigol.close()  # release the VXI-11 link on the instrument
        except Exception:
            pass
        print("\nstopped — Rigol restored to RUN")
        sys.exit(0)

    signal.signal(signal.SIGINT, _shutdown)
    signal.signal(signal.SIGTERM, _shutdown)

    print("\nkeys: g=baseline capture q=quit\n", flush=True)
    print(f" {'time':<14} {'event':<12} {'file':<40} {'Vpp':>7} {'mean':>7}")
    print(f" {'-'*14} {'-'*12} {'-'*40} {'-'*7} {'-'*7}")

    last_pll = None
    unlock_count = 0
    baseline_count = 0
    err_count = 0        # consecutive failed polls
    warn_after = 25      # report an outage once this many polls have failed

    with KeyReader() as keys:
        while True:
            t0 = time.monotonic()  # immune to wall-clock adjustments

            try:
                r = sess.get(SN65_EP, timeout=HTTP_TO_S)
                r.raise_for_status()
                pll = pll_state(r.json())
            except Exception as e:
                pll = None
                err_count += 1
                if err_count == warn_after:
                    # Report once per outage so a dead endpoint is visible
                    # without flooding the table.
                    iso = datetime.now().strftime("%H:%M:%S.%f")[:-3]
                    print(f" {iso:<14} {'POLL FAIL':<12} "
                          f"{err_count} consecutive poll errors: {e}",
                          flush=True)
            else:
                if err_count >= warn_after:
                    iso = datetime.now().strftime("%H:%M:%S.%f")[:-3]
                    print(f" {iso:<14} {'POLL OK':<12} endpoint reachable again",
                          flush=True)
                err_count = 0

            # Trigger Rigol on True → False (a real unlock). We ignore the
            # True → None case (transient I2C read failure) since it isn't
            # a PLL state change.
            if last_pll is True and pll is False:
                unlock_count += 1
                _capture_and_report(
                    rigol, "UNLOCK", f"unlock_{unlock_count:04d}")
            if pll is not None:
                last_pll = pll

            # Manual baseline capture
            key = keys.get_key()
            if key == "g":
                baseline_count += 1
                _capture_and_report(
                    rigol, "BASELINE", f"baseline_{baseline_count:04d}")
            elif key == "q":
                _shutdown()

            # Pace
            remaining = POLL_DT_S - (time.monotonic() - t0)
            if remaining > 0:
                time.sleep(remaining)


if __name__ == "__main__":
    main()