Files
david rice 0edb95d7e1 Updates
2026-05-06 15:57:48 +01:00

134 lines
4.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Keysight DSO80204B scope controller over VXI-11 / SCPI.
Configures channels for MIPI D-PHY probing (50 Ω DC, 19.2× attenuation),
arms a single trigger, and downloads ASCII waveforms with absolute
timestamps reconstructed from the preamble.
CRITICAL: All four channels must be DC50 — the 910R+50R divider only gives
the documented 19.2× ratio with 50 Ω termination.
"""
from __future__ import annotations
import io
import logging
import time
import pandas as pd
import vxi11
from config import (
PROBE_ATTENUATION,
SCOPE_CHANNELS,
SCOPE_POINTS,
SCOPE_TIMEBASE,
TRIGGER_CHANNEL,
TRIGGER_LEVEL_V,
TRIGGER_SLOPE,
)
log = logging.getLogger(__name__)
class ScopeController:
def __init__(self, ip: str) -> None:
self.ip = ip
self._inst = vxi11.Instrument(ip)
idn = self._inst.ask("*IDN?").strip()
log.info("Scope connected: %s", idn)
self.idn = idn
def setup(self) -> None:
i = self._inst
i.write("*RST")
time.sleep(1.0)
i.write(":STOP")
for label, ch in SCOPE_CHANNELS.items():
i.write(f":CHANnel{ch}:DISPlay ON")
i.write(f":CHANnel{ch}:INPut DC50")
i.write(f":CHANnel{ch}:PROBe {PROBE_ATTENUATION}")
i.write(f":CHANnel{ch}:SCALe 0.05")
i.write(f":CHANnel{ch}:OFFSet 0.0")
i.write(f":CHANnel{ch}:LABel '{label}'")
i.write(f":TIMebase:SCALe {SCOPE_TIMEBASE:.3E}")
i.write(":TIMebase:POSition 0")
i.write(":TIMebase:REFerence CENTer")
i.write(":TRIGger:MODE EDGE")
i.write(f":TRIGger:EDGE:SOURce CHANnel{TRIGGER_CHANNEL}")
i.write(f":TRIGger:EDGE:SLOPe {TRIGGER_SLOPE}")
i.write(f":TRIGger:EDGE:LEVel {TRIGGER_LEVEL_V}")
i.write(":TRIGger:SWEep NORMal")
i.write(":ACQuire:MODE RTIMe")
i.write(":ACQuire:INTerpolate ON")
i.write(f":ACQuire:POINts {SCOPE_POINTS}")
i.write(":DISPlay:LAYout STACKed")
def arm_single(self) -> None:
self._inst.write(":SINGle")
def wait_for_trigger(self, timeout_s: float = 30.0) -> bool:
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
try:
ter = int(self._inst.ask(":TER?").strip())
except ValueError:
ter = 0
if ter == 1:
return True
time.sleep(0.1)
return False
def _read_preamble(self, channel: int) -> dict:
self._inst.write(f":WAVeform:SOURce CHANnel{channel}")
raw = self._inst.ask(":WAVeform:PREamble?").strip()
parts = raw.split(",")
return {
"format": int(parts[0]),
"type": int(parts[1]),
"points": int(parts[2]),
"count": int(parts[3]),
"x_increment": float(parts[4]),
"x_origin": float(parts[5]),
"x_reference": float(parts[6]),
"y_increment": float(parts[7]),
"y_origin": float(parts[8]),
"y_reference": float(parts[9]),
}
def download_waveform(self, channel: int) -> pd.DataFrame:
preamble = self._read_preamble(channel)
self._inst.write(":WAVeform:FORMat ASCii")
self._inst.write(":WAVeform:STReaming ON")
self._inst.write(f":WAVeform:SOURce CHANnel{channel}")
raw = self._inst.ask(":WAVeform:DATA?")
if raw.startswith("#"):
n_digits = int(raw[1])
raw = raw[2 + n_digits :]
voltages = pd.read_csv(io.StringIO(raw), header=None).iloc[0].astype(float).values
n = len(voltages)
x_inc = preamble["x_increment"]
x_origin = preamble["x_origin"]
times = x_origin + x_inc * pd.Series(range(n), dtype="float64")
return pd.DataFrame({"time_s": times, "voltage_v": voltages})
def download_all(self) -> dict[str, pd.DataFrame]:
return {
label: self.download_waveform(ch) for label, ch in SCOPE_CHANNELS.items()
}
def close(self) -> None:
try:
self._inst.close()
except Exception:
pass