Parked for now

This commit is contained in:
david rice
2026-05-15 16:32:15 +01:00
parent d73aa2f2a4
commit 39f4355b8d
21 changed files with 3031 additions and 1 deletions

258
rail_watch.py Normal file
View File

@@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""
rail_watch.py — Capture Rigol DS1202Z-E CH1 (1V8 supply rail) every time the
SN65DSI83 reports a MIPI PLL unlock.
Architecture
------------
- Polls /sn65_registers at ~50 Hz looking for pll_lock True→False transitions.
- On each unlock, :STOPs the Rigol, reads CH1 waveform via :WAV:DATA?, saves
to CSV in data/rail_traces/, prints peak-to-peak ripple, then :RUNs again.
- Press `g` to capture a baseline (clean) trace. Press `q` to quit.
Rigol setup (do once on the front panel before running):
* Channel 1 probed on the 1V8 rail derived to the MIPI PHY
* DC coupling with offset, or AC coupling for ripple-only view
* Recommended: 20 mV/div, 510 ms/div (60120 ms window)
* Trigger: AUTO on Channel 1 so the buffer is always recent
* Memory depth: 12M (or whatever fits the timebase)
* :RUN the scope so it's continuously acquiring
"""
from __future__ import annotations
import argparse
import select
import signal
import sys
import termios
import time
import tty
from datetime import datetime
from pathlib import Path
import numpy as np
import requests
import vxi11
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
DEVICE_BASE = "http://192.168.45.8:5000"
SN65_EP = f"{DEVICE_BASE}/sn65_registers"
RIGOL_IP = "192.168.45.5"
DATA_DIR = Path(__file__).parent / "data" / "rail_traces"
POLL_DT_S = 0.020 # 50 Hz target — coarser than sn65_monitor
HTTP_TO_S = 0.2
RIGOL_TO_S = 10.0
# ---------------------------------------------------------------------------
# Rigol I/O
# ---------------------------------------------------------------------------
def _read_ieee_block(rigol) -> bytes:
"""Read an IEEE 488.2 binary block from the scope: '#'<n><len><data>[\\n]."""
head = rigol.read_raw(2)
if not head.startswith(b"#"):
idx = head.find(b"#")
if idx < 0:
extra = rigol.read_raw(64)
head += extra
idx = head.find(b"#")
head = head[idx:idx + 2]
ndigits = int(head[1:2])
length_bytes = rigol.read_raw(ndigits)
nbytes = int(length_bytes)
data = b""
while len(data) < nbytes:
chunk = rigol.read_raw(nbytes - len(data))
if not chunk:
break
data += chunk
try:
rigol.read_raw(1) # trailing newline (may not be present)
except Exception:
pass
return data
def capture_trace(rigol, label: str) -> tuple[Path, float, float]:
"""
:STOP → read CH1 → :RUN. Returns (csv_path, vpp_mV, mean_V).
"""
rigol.write(":STOP")
time.sleep(0.06)
rigol.write(":WAVeform:SOURce CHANnel1")
rigol.write(":WAVeform:FORMat BYTE")
rigol.write(":WAVeform:MODE NORM")
time.sleep(0.02)
preamble = rigol.ask(":WAVeform:PREamble?").strip().split(",")
# format,type,points,count,xinc,xorig,xref,yinc,yorig,yref
xinc = float(preamble[4]); xorig = float(preamble[5])
yinc = float(preamble[7]); yorig = float(preamble[8])
yref = float(preamble[9])
rigol.write(":WAVeform:DATA?")
raw = _read_ieee_block(rigol)
codes = np.frombuffer(raw, dtype=np.uint8)
volts = (codes.astype(np.float64) - yref - yorig) * yinc
t = np.arange(len(volts)) * xinc + xorig
DATA_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
csv_path = DATA_DIR / f"{ts}_{label}.csv"
np.savetxt(csv_path, np.column_stack([t, volts]),
delimiter=",", fmt="%.6e")
rigol.write(":RUN")
vpp_mV = float((volts.max() - volts.min()) * 1000)
mean_V = float(volts.mean())
return csv_path, vpp_mV, mean_V
# ---------------------------------------------------------------------------
# SN65 state extraction
# ---------------------------------------------------------------------------
def pll_state(data: dict | None):
if not isinstance(data, dict):
return None
regs = data.get("registers", {})
if not isinstance(regs, dict):
return None
csr_0a = regs.get("csr_0a") or {}
return csr_0a.get("pll_lock")
# ---------------------------------------------------------------------------
# Non-blocking keys
# ---------------------------------------------------------------------------
class KeyReader:
def __enter__(self):
self.fd = sys.stdin.fileno()
self.old = termios.tcgetattr(self.fd)
tty.setcbreak(self.fd)
return self
def get_key(self) -> str | None:
if select.select([sys.stdin], [], [], 0)[0]:
return sys.stdin.read(1).lower()
return None
def __exit__(self, *_):
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
ap = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--test", action="store_true",
help="Take one immediate trace + exit (verifies Rigol comms)")
args = ap.parse_args()
DATA_DIR.mkdir(parents=True, exist_ok=True)
sess = requests.Session()
print(f"RAIL WATCH")
print(f" sn65 endpoint: {SN65_EP}")
print(f" Rigol IP: {RIGOL_IP}")
print(f" Output dir: {DATA_DIR.relative_to(DATA_DIR.parent.parent)}")
try:
rigol = vxi11.Instrument(RIGOL_IP)
rigol.timeout = RIGOL_TO_S
idn = rigol.ask("*IDN?").strip()
print(f" Rigol IDN: {idn}")
except Exception as e:
print(f" *** RIGOL CONNECTION FAILED: {e} ***")
sys.exit(1)
if args.test:
print("\n--test: taking one capture now...")
try:
path, vpp, mean = capture_trace(rigol, "test")
print(f" saved {path.name}")
print(f" Vpp = {vpp:.1f} mV mean = {mean:.3f} V")
except Exception as e:
print(f" CAPTURE FAILED: {e}")
sys.exit(0)
def _shutdown(*_):
try:
rigol.write(":RUN")
except Exception:
pass
print("\nstopped — Rigol restored to RUN")
sys.exit(0)
signal.signal(signal.SIGINT, _shutdown)
signal.signal(signal.SIGTERM, _shutdown)
print("\nkeys: g=baseline capture q=quit\n", flush=True)
print(f" {'time':<14} {'event':<12} {'file':<40} {'Vpp':>7} {'mean':>7}")
print(f" {'-'*14} {'-'*12} {'-'*40} {'-'*7} {'-'*7}")
last_pll = None
unlock_count = 0
baseline_count = 0
err_count = 0
with KeyReader() as keys:
while True:
t0 = time.time()
try:
r = sess.get(SN65_EP, timeout=HTTP_TO_S)
r.raise_for_status()
pll = pll_state(r.json())
err_count = 0
except Exception:
pll = None
err_count += 1
# Trigger Rigol on True → False (a real unlock). We ignore the
# True → None case (transient I2C read failure) since it isn't
# a PLL state change.
if last_pll is True and pll is False:
unlock_count += 1
iso = datetime.now().strftime("%H:%M:%S.%f")[:-3]
try:
path, vpp, mean = capture_trace(
rigol, f"unlock_{unlock_count:04d}")
print(f" {iso:<14} {'UNLOCK':<12} "
f"{path.name:<40} {vpp:>5.1f}mV {mean:>5.3f}V",
flush=True)
except Exception as e:
print(f" {iso:<14} UNLOCK CAPTURE FAILED: {e}",
flush=True)
last_pll = pll if pll is not None else last_pll
# Manual baseline capture
key = keys.get_key()
if key == "g":
baseline_count += 1
iso = datetime.now().strftime("%H:%M:%S.%f")[:-3]
try:
path, vpp, mean = capture_trace(
rigol, f"baseline_{baseline_count:04d}")
print(f" {iso:<14} {'BASELINE':<12} "
f"{path.name:<40} {vpp:>5.1f}mV {mean:>5.3f}V",
flush=True)
except Exception as e:
print(f" {iso:<14} BASELINE CAPTURE FAILED: {e}",
flush=True)
elif key == "q":
_shutdown()
# Pace
elapsed = time.time() - t0
if elapsed < POLL_DT_S:
time.sleep(POLL_DT_S - elapsed)
if __name__ == "__main__":
main()