258 lines
8.6 KiB
Python
258 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
rail_watch.py — Capture Rigol DS1202Z-E CH1 (1V8 supply rail) every time the
|
||
SN65DSI83 reports a MIPI PLL unlock.
|
||
|
||
Architecture
|
||
------------
|
||
- Polls /sn65_registers at ~50 Hz looking for pll_lock True→False transitions.
|
||
- On each unlock, :STOPs the Rigol, reads CH1 waveform via :WAV:DATA?, saves
|
||
to CSV in data/rail_traces/, prints peak-to-peak ripple, then :RUNs again.
|
||
- Press `g` to capture a baseline (clean) trace. Press `q` to quit.
|
||
|
||
Rigol setup (do once on the front panel before running):
|
||
* Channel 1 probed on the 1V8 rail derived to the MIPI PHY
|
||
* DC coupling with offset, or AC coupling for ripple-only view
|
||
* Recommended: 20 mV/div, 5–10 ms/div (60–120 ms window)
|
||
* Trigger: AUTO on Channel 1 so the buffer is always recent
|
||
* Memory depth: 12M (or whatever fits the timebase)
|
||
* :RUN the scope so it's continuously acquiring
|
||
"""
|
||
|
||
from __future__ import annotations

import argparse
import os
import select
import signal
import sys
import termios
import time
import tty
from datetime import datetime
from pathlib import Path

import numpy as np
import requests
import vxi11
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config
|
||
# ---------------------------------------------------------------------------
|
||
# HTTP service on the device under test, and the register-dump endpoint that
# the main loop polls for the SN65DSI83 pll_lock flag.
DEVICE_BASE = "http://192.168.45.8:5000"
SN65_EP = DEVICE_BASE + "/sn65_registers"

# Network address handed to vxi11.Instrument() to reach the Rigol scope.
RIGOL_IP = "192.168.45.5"

# Captured traces are written next to this script, under data/rail_traces/.
DATA_DIR = Path(__file__).parent.joinpath("data", "rail_traces")

# Loop period of the poller: 50 Hz target — coarser than sn65_monitor.
POLL_DT_S = 0.02
# Timeout passed to each HTTP GET of SN65_EP.
HTTP_TO_S = 0.2
# Value assigned to the vxi11 instrument's ``timeout`` attribute.
RIGOL_TO_S = 10.0
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Rigol I/O
|
||
# ---------------------------------------------------------------------------
|
||
def _read_ieee_block(rigol) -> bytes:
|
||
"""Read an IEEE 488.2 binary block from the scope: '#'<n><len><data>[\\n]."""
|
||
head = rigol.read_raw(2)
|
||
if not head.startswith(b"#"):
|
||
idx = head.find(b"#")
|
||
if idx < 0:
|
||
extra = rigol.read_raw(64)
|
||
head += extra
|
||
idx = head.find(b"#")
|
||
head = head[idx:idx + 2]
|
||
ndigits = int(head[1:2])
|
||
length_bytes = rigol.read_raw(ndigits)
|
||
nbytes = int(length_bytes)
|
||
data = b""
|
||
while len(data) < nbytes:
|
||
chunk = rigol.read_raw(nbytes - len(data))
|
||
if not chunk:
|
||
break
|
||
data += chunk
|
||
try:
|
||
rigol.read_raw(1) # trailing newline (may not be present)
|
||
except Exception:
|
||
pass
|
||
return data
|
||
|
||
|
||
def capture_trace(rigol, label: str) -> tuple[Path, float, float]:
    """
    :STOP → read CH1 → :RUN, then save the trace to CSV.

    The scope is put back into :RUN in a ``finally`` clause, so a failed
    transfer or a malformed reply never leaves it halted, and it resumes
    acquiring before the CSV is written.

    Args:
        rigol: vxi11 instrument (uses ``write``, ``ask`` and ``read_raw``).
        label: suffix for the CSV file name, e.g. ``unlock_0003``.

    Returns:
        ``(csv_path, vpp_mV, mean_V)``: the written file, the peak-to-peak
        amplitude in millivolts and the mean level in volts.

    Raises:
        ValueError: the preamble has fewer than 10 fields, or the waveform
            block is empty.
        Exception: whatever the instrument I/O or the file write raises.
    """
    rigol.write(":STOP")
    try:
        time.sleep(0.06)  # short pause after :STOP before configuring the read

        rigol.write(":WAVeform:SOURce CHANnel1")
        rigol.write(":WAVeform:FORMat BYTE")
        rigol.write(":WAVeform:MODE NORM")
        time.sleep(0.02)

        preamble = rigol.ask(":WAVeform:PREamble?").strip().split(",")
        if len(preamble) < 10:
            raise ValueError(
                f"unexpected :WAVeform:PREamble? reply: {preamble!r}")
        # format,type,points,count,xinc,xorig,xref,yinc,yorig,yref
        xinc, xorig = float(preamble[4]), float(preamble[5])
        yinc, yorig, yref = (float(field) for field in preamble[7:10])

        rigol.write(":WAVeform:DATA?")
        raw = _read_ieee_block(rigol)
    finally:
        # Always resume acquisition, even if the read above failed.
        rigol.write(":RUN")

    codes = np.frombuffer(raw, dtype=np.uint8)
    if codes.size == 0:
        raise ValueError("scope returned an empty waveform block")

    # BYTE-format sample codes → volts, using the preamble scaling fields.
    volts = (codes.astype(np.float64) - yref - yorig) * yinc
    t = np.arange(len(volts)) * xinc + xorig

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]  # millisecond stamp
    csv_path = DATA_DIR / f"{ts}_{label}.csv"
    np.savetxt(csv_path, np.column_stack([t, volts]),
               delimiter=",", fmt="%.6e")

    vpp_mV = float((volts.max() - volts.min()) * 1000)
    mean_V = float(volts.mean())
    return csv_path, vpp_mV, mean_V
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# SN65 state extraction
|
||
# ---------------------------------------------------------------------------
|
||
def pll_state(data: dict | None) -> bool | None:
    """
    Extract ``registers.csr_0a.pll_lock`` from a decoded /sn65_registers reply.

    Args:
        data: the decoded JSON payload, or None.

    Returns:
        The value stored under ``pll_lock`` (the main loop compares it with
        ``is True`` / ``is False``, so a JSON boolean is expected), or None
        when the payload, ``registers`` or ``csr_0a`` is missing or is not a
        dict.
    """
    if not isinstance(data, dict):
        return None
    regs = data.get("registers", {})
    if not isinstance(regs, dict):
        return None
    csr_0a = regs.get("csr_0a")
    # Same treatment as the levels above: anything that is not a dict
    # (missing, null, an error string, ...) means "state unknown".
    if not isinstance(csr_0a, dict):
        return None
    return csr_0a.get("pll_lock")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Non-blocking keys
|
||
# ---------------------------------------------------------------------------
|
||
class KeyReader:
    """
    Context manager for non-blocking single-key reads from stdin.

    On entry, a terminal stdin is switched to cbreak mode; its previous
    settings are restored on exit.  When stdin is not a terminal the terminal
    mode is left alone and keys are simply read from whatever is connected.
    """

    def __enter__(self):
        self.fd = sys.stdin.fileno()
        # Only a terminal has attributes to save and change; tcgetattr()
        # raises on pipes and files, so skip it for those.
        self.old = termios.tcgetattr(self.fd) if os.isatty(self.fd) else None
        if self.old is not None:
            tty.setcbreak(self.fd)
        return self

    def get_key(self) -> str | None:
        """Return one pending key, lower-cased, or None if nothing is waiting."""
        if not select.select([self.fd], [], [], 0)[0]:
            return None
        # Read straight from the descriptor that select() watched.  Going
        # through sys.stdin's buffered text layer can pull several typed
        # bytes into Python's buffer, where select() cannot see them, so
        # they would only be delivered after a later keypress.
        raw = os.read(self.fd, 1)
        if not raw:
            return None  # end of file
        return raw.decode("utf-8", errors="replace").lower()

    def __exit__(self, *_):
        if self.old is not None:
            termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
def main() -> None:
    """
    Command-line entry point: connect to the scope, then watch for PLL unlocks.

    With ``--test`` one capture is taken and the process exits: status 0 on
    success, 1 if the capture fails.  Otherwise SN65_EP is polled every
    POLL_DT_S seconds; each pll_lock True→False transition triggers a
    capture, ``g`` captures a baseline trace, and ``q`` (or SIGINT/SIGTERM)
    sends :RUN to the scope and exits.  The process also exits with status 1
    if the scope cannot be reached at start-up.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--test", action="store_true",
                    help="Take one immediate trace + exit (verifies Rigol comms)")
    args = ap.parse_args()

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    sess = requests.Session()

    print("RAIL WATCH")
    print(f" sn65 endpoint: {SN65_EP}")
    print(f" Rigol IP: {RIGOL_IP}")
    print(f" Output dir: {DATA_DIR.relative_to(DATA_DIR.parent.parent)}")

    try:
        rigol = vxi11.Instrument(RIGOL_IP)
        rigol.timeout = RIGOL_TO_S
        idn = rigol.ask("*IDN?").strip()
        print(f" Rigol IDN: {idn}")
    except Exception as e:
        print(f" *** RIGOL CONNECTION FAILED: {e} ***")
        sys.exit(1)

    if args.test:
        print("\n--test: taking one capture now...")
        try:
            path, vpp, mean = capture_trace(rigol, "test")
        except Exception as e:
            print(f" CAPTURE FAILED: {e}")
            # A failed comms check must not report success to the caller.
            sys.exit(1)
        print(f" saved {path.name}")
        print(f" Vpp = {vpp:.1f} mV mean = {mean:.3f} V")
        sys.exit(0)

    def _shutdown(*_):
        """Signal handler / quit action: leave the scope running, then exit."""
        try:
            rigol.write(":RUN")
        except Exception:
            pass  # best effort: the scope may already be unreachable
        print("\nstopped — Rigol restored to RUN")
        sys.exit(0)

    signal.signal(signal.SIGINT, _shutdown)
    signal.signal(signal.SIGTERM, _shutdown)

    def _capture_and_report(event: str, label: str) -> None:
        """Capture one trace and print its table row, or a failure line."""
        iso = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        try:
            path, vpp, mean = capture_trace(rigol, label)
        except Exception as e:
            print(f" {iso:<14} {event} CAPTURE FAILED: {e}", flush=True)
            return
        print(f" {iso:<14} {event:<12} "
              f"{path.name:<40} {vpp:>5.1f}mV {mean:>5.3f}V",
              flush=True)

    print("\nkeys: g=baseline capture q=quit\n", flush=True)
    print(f" {'time':<14} {'event':<12} {'file':<40} {'Vpp':>7} {'mean':>7}")
    print(f" {'-'*14} {'-'*12} {'-'*40} {'-'*7} {'-'*7}")

    # Number of consecutive failed polls after which the operator is told
    # that the endpoint is not answering (reported once per outage).
    err_warn_streak = 25

    last_pll = None
    unlock_count = 0
    baseline_count = 0
    err_count = 0

    with KeyReader() as keys:
        while True:
            t0 = time.monotonic()  # immune to wall-clock adjustments
            try:
                r = sess.get(SN65_EP, timeout=HTTP_TO_S)
                r.raise_for_status()
                pll = pll_state(r.json())
            except Exception as e:
                # Any failed poll (network, HTTP status, bad JSON) just means
                # "state unknown" for this cycle.
                pll = None
                err_count += 1
                if err_count == err_warn_streak:
                    iso = datetime.now().strftime("%H:%M:%S.%f")[:-3]
                    print(f" {iso:<14} {'POLL ERROR':<12} "
                          f"{err_count} consecutive failures: {e}",
                          flush=True)
            else:
                if err_count >= err_warn_streak:
                    iso = datetime.now().strftime("%H:%M:%S.%f")[:-3]
                    print(f" {iso:<14} {'POLL OK':<12} "
                          f"endpoint responding again",
                          flush=True)
                err_count = 0

            # Trigger Rigol on True → False (a real unlock). We ignore the
            # True → None case (transient I2C read failure) since it isn't
            # a PLL state change.
            if last_pll is True and pll is False:
                unlock_count += 1
                _capture_and_report("UNLOCK", f"unlock_{unlock_count:04d}")

            # Remember the last *known* state across unknown (None) polls.
            if pll is not None:
                last_pll = pll

            # Manual baseline capture
            key = keys.get_key()
            if key == "g":
                baseline_count += 1
                _capture_and_report("BASELINE",
                                    f"baseline_{baseline_count:04d}")
            elif key == "q":
                _shutdown()

            # Pace
            elapsed = time.monotonic() - t0
            if elapsed < POLL_DT_S:
                time.sleep(POLL_DT_S - elapsed)
|
||
|
||
|
||
# Run the watcher only when this file is executed as a script.
if __name__ == "__main__":
    main()