Files
MiPi_TEST/csv_preprocessor.py

585 lines
25 KiB
Python
Raw Normal View History

2026-04-08 12:55:34 +01:00
"""
csv_preprocessor.py
Extracts MIPI HS-TX / LP state metrics from oscilloscope CSV files.
File naming convention: YYYYMMDD_HHMMSS_{sig|proto|lp}_{NNNN}_{clk|dat}.csv
  sig   : high-res short window (320 GSa/s, ~20 ns) -> rise/fall times
          Two columns: time_s, vdiff_v (F1/F2 differential, ±250 mV HS swing)
  proto : lower-res long window (20 GSa/s, ~10 µs) -> jitter, frequency, amplitude
          Two columns: time_s, vdiff_v (F1/F2 differential)
  lp    : LP state capture (~40 GSa/s, ~5 µs) -> LP-11/LP-00/HS burst structure
          Two columns: time_s, voltage_v (Ch1 or Ch3 single-ended CLK+/DAT0+)
          Vertical range: -0.2 V to 1.4 V so LP-11 (~1.2 V) and LP-00 (~0 V) are visible.
          Trigger: falling edge at 0.6 V on CLK+ catches the LP-11 -> LP-01 SoT transition.
"""
import csv
import re
import numpy as np
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
# MIPI D-PHY HS-TX spec limits
HS_VDIFF_MIN_MV = 140.0     # |Vdiff| minimum (mV)
HS_VDIFF_MAX_MV = 270.0     # |Vdiff| maximum (mV)
RISE_FALL_MAX_PS = 500.0    # rise/fall time limit, 20%-80% (ps)

# Thresholds for "settled" vs "transitioning"
TRANSITION_BAND_MV = 50.0   # |Vdiff| < this is considered a transition, not settled

# MIPI D-PHY LP state thresholds (single-ended voltage, after probe compensation)
LP11_HIGH_V = 0.8           # V — single-ended voltage above this → LP-11 (both pins high ~1.2 V)
LP_LOW_V = 0.25             # V — single-ended voltage below this → LP-00 or LP-01 pin low
# Note: probe loading can shift LP-low from true 0 V to ~100 mV; 0.25 V clears that offset.
# The rolling-std gate (HS_OSC_STD_V) prevents HS minima near 0 V being called LP-low.
LP11_SPEC_MIN_V = 1.0       # V — LP-11 minimum voltage spec
LP11_SPEC_MAX_V = 1.45      # V — LP-11 maximum voltage spec
LP_LOW_DUR_MIN_NS = 50.0    # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined)
HS_OSC_STD_V = 0.045        # V — rolling-std threshold above which a region is classified as HS
@dataclass
class ChannelMetrics:
    """Metrics extracted from one differential HS capture ("sig" or "proto" file)."""

    timestamp: str
    capture_num: int
    file_type: str                  # "sig" | "proto"
    channel: str                    # "clk" | "dat"
    sample_rate_gsps: float
    duration_ns: float
    n_samples: int
    # HS-TX differential voltage
    vdiff_pos_mv: float             # mean settled positive level (HS "1")
    vdiff_neg_mv: float             # mean settled negative level (HS "0")
    vdiff_amplitude_mv: float       # (|pos| + |neg|) / 2 — spec: 140-270 mV
    vcm_mv: float                   # (pos + neg) / 2 — common-mode offset
    # Timing (None when there are too few transitions to measure)
    clock_freq_mhz: Optional[float] = None
    jitter_pp_ps: Optional[float] = None
    jitter_rms_ps: Optional[float] = None
    rise_time_ps: Optional[float] = None
    fall_time_ps: Optional[float] = None
    n_transitions: int = 0
    # Spec violations: number of settled samples flagged as amplitude dips
    spec_violations: int = 0
    warnings: list[str] = field(default_factory=list)

    def summary(self) -> str:
        """Return a multi-line, human-readable report of this capture.

        Optional timing lines are emitted only when the corresponding value
        is not None; every stored warning is appended at the end.
        """
        def ok(cond) -> str:
            # Pass/fail marker appended to each spec-checked line.
            return "✓" if cond else "✗"

        lines = [
            f"Capture {self.capture_num:04d} {self.timestamp} [{self.file_type}/{self.channel}]",
            f" Vdiff amplitude : {self.vdiff_amplitude_mv:6.1f} mV "
            f"(spec {HS_VDIFF_MIN_MV:.0f}–{HS_VDIFF_MAX_MV:.0f} mV) "
            f"{ok(HS_VDIFF_MIN_MV <= self.vdiff_amplitude_mv <= HS_VDIFF_MAX_MV)}",
            f" Vdiff pos/neg : +{self.vdiff_pos_mv:.1f} / {self.vdiff_neg_mv:.1f} mV",
            f" Common mode : {self.vcm_mv:+.1f} mV",
        ]
        if self.clock_freq_mhz is not None:
            lines.append(
                f" Clock freq : {self.clock_freq_mhz:.2f} MHz DDR "
                f"({self.n_transitions} transitions)"
            )
        if self.jitter_pp_ps is not None:
            lines.append(
                f" Jitter p-p/RMS : {self.jitter_pp_ps:.1f} ps / {self.jitter_rms_ps:.1f} ps"
            )
        if self.rise_time_ps is not None:
            lines.append(
                f" Rise time 20-80%: {self.rise_time_ps:.1f} ps "
                f"{ok(self.rise_time_ps <= RISE_FALL_MAX_PS)}"
            )
        if self.fall_time_ps is not None:
            lines.append(
                f" Fall time 20-80%: {self.fall_time_ps:.1f} ps "
                f"{ok(self.fall_time_ps <= RISE_FALL_MAX_PS)}"
            )
        if self.spec_violations:
            lines.append(
                f" Spec violations : {self.spec_violations} samples below {HS_VDIFF_MIN_MV:.0f} mV ✗"
            )
        for w in self.warnings:
            lines.append(f" WARNING: {w}")
        return "\n".join(lines)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _read_csv(path: Path) -> tuple[np.ndarray, np.ndarray]:
times, volts = [], []
with open(path) as f:
for row in csv.reader(f):
if len(row) >= 2:
try:
times.append(float(row[0]))
volts.append(float(row[1]))
except ValueError:
pass # skip any header row
return np.array(times, dtype=np.float64), np.array(volts, dtype=np.float64)
def _zero_crossings(times: np.ndarray, volts: np.ndarray) -> np.ndarray:
"""Return array of linearly-interpolated zero-crossing times (seconds)."""
signs = np.sign(volts)
change = np.diff(signs)
idx = np.where(change != 0)[0]
ct = []
for i in idx:
if signs[i] != 0 and signs[i + 1] != 0:
frac = -volts[i] / (volts[i + 1] - volts[i])
ct.append(times[i] + frac * (times[i + 1] - times[i]))
return np.array(ct)
def _rise_fall_times(times: np.ndarray, volts: np.ndarray,
v_high: float, v_low: float,
window_samples: int = 60) -> tuple[list, list]:
"""
Measure 20%80% rise and fall times around each zero crossing.
Returns (rise_times_ps, fall_times_ps).
"""
v20 = v_low + 0.20 * (v_high - v_low)
v80 = v_low + 0.80 * (v_high - v_low)
signs = np.sign(volts)
trans_idx = np.where(np.diff(signs) != 0)[0]
rise_ps, fall_ps = [], []
for idx in trans_idx:
s = max(0, idx - window_samples // 2)
e = min(len(times), idx + window_samples // 2)
tw = times[s:e]
vw = volts[s:e]
if len(vw) < 4:
continue
if volts[min(idx + 1, len(volts) - 1)] > volts[idx]: # rising edge
# find where vw first crosses v20 (ascending) then v80
i20 = np.searchsorted(vw, v20)
i80 = np.searchsorted(vw, v80)
if 0 < i20 < len(tw) - 1 and 0 < i80 < len(tw) - 1 and i80 > i20:
# interpolate each threshold
t20 = np.interp(v20, vw[i20 - 1:i20 + 1], tw[i20 - 1:i20 + 1])
t80 = np.interp(v80, vw[i80 - 1:i80 + 1], tw[i80 - 1:i80 + 1])
rise_ps.append((t80 - t20) * 1e12)
else: # falling edge
# descending: reverse the window so searchsorted still works
vw_r = vw[::-1]
tw_r = tw[::-1]
i80 = np.searchsorted(vw_r, v80)
i20 = np.searchsorted(vw_r, v20)
if 0 < i80 < len(tw_r) - 1 and 0 < i20 < len(tw_r) - 1 and i20 > i80:
t80 = np.interp(v80, vw_r[i80 - 1:i80 + 1], tw_r[i80 - 1:i80 + 1])
t20 = np.interp(v20, vw_r[i20 - 1:i20 + 1], tw_r[i20 - 1:i20 + 1])
fall_ps.append((t20 - t80) * 1e12)
return rise_ps, fall_ps
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def analyze_file(path: Path) -> ChannelMetrics:
    """
    Analyse one differential ("sig" or "proto") oscilloscope CSV file.

    The capture timestamp, file type, capture number and channel are parsed
    from the filename; the waveform is read with _read_csv(). Settled
    positive/negative levels, amplitude and common mode are always computed;
    frequency and jitter only on the CLK lane with enough transitions;
    rise/fall times whenever they can be measured.

    Args:
        path: CSV file named YYYYMMDD_HHMMSS_{sig|proto}_{NNNN}_{clk|dat}.csv
            (matched case-insensitively).

    Returns:
        A populated ChannelMetrics instance.

    Raises:
        ValueError: the filename does not match the pattern, the file is an
            lp-type capture, fewer than two numeric samples were read, or the
            time axis is not increasing.
    """
    m = re.match(r"(\d{8}_\d{6})_(sig|proto|lp)_(\d+)_(clk|dat)\.csv",
                 path.name, re.IGNORECASE)
    if not m:
        raise ValueError(f"Filename does not match expected pattern: {path.name}")
    timestamp, file_type, cap_str, channel = m.groups()
    # The pattern is case-insensitive; normalise so the string comparisons
    # below (and the values stored in the result) behave the same for
    # "CLK" and "clk".
    file_type = file_type.lower()
    channel = channel.lower()
    if file_type == "lp":
        raise ValueError("Use analyze_lp_file() for lp-type files (single-ended)")
    capture_num = int(cap_str)

    times, volts = _read_csv(path)
    if len(times) < 2:
        raise ValueError(f"Not enough numeric samples in {path.name}")
    dt = float(np.diff(times).mean())
    if not dt > 0:
        raise ValueError(f"Time axis is not increasing in {path.name}")
    sample_rate = 1.0 / dt
    duration_ns = (float(times[-1]) - float(times[0])) * 1e9

    # --- Voltage levels ---
    v_thresh = TRANSITION_BAND_MV / 1000.0
    pos_mask = volts > v_thresh
    neg_mask = volts < -v_thresh
    vdiff_pos = float(volts[pos_mask].mean()) * 1000.0 if pos_mask.any() else 0.0
    vdiff_neg = float(volts[neg_mask].mean()) * 1000.0 if neg_mask.any() else 0.0

    # Classify signal coverage:
    #   no_signal — neither polarity detected (LP state or idle)
    #   one_sided — only one polarity in capture window (short sig window, uniform data)
    no_signal = (not pos_mask.any()) and (not neg_mask.any())
    one_sided = (not no_signal) and ((not pos_mask.any()) or (not neg_mask.any()))
    if no_signal:
        amplitude = 0.0
    elif one_sided:
        amplitude = max(abs(vdiff_pos), abs(vdiff_neg))
    else:
        amplitude = (abs(vdiff_pos) + abs(vdiff_neg)) / 2.0
    vcm = (vdiff_pos + vdiff_neg) / 2.0

    # --- Zero crossings → frequency + jitter (CLK only) ---
    ct = _zero_crossings(times, volts)
    n_transitions = len(ct)
    clock_freq_mhz = jitter_pp_ps = jitter_rms_ps = None
    # Jitter / frequency are only meaningful on the CLK lane.
    # On DAT the bit pattern varies, so half-periods are not uniform by design.
    # Require at least 20 transitions (10 full cycles) for reliable jitter.
    # Sig files (~8 transitions) are too short; proto files (~4000) are fine.
    if channel == "clk" and n_transitions >= 20:
        half_periods = np.diff(ct) * 1e12  # ps
        med = float(np.median(half_periods))
        sd = float(half_periods.std())
        # Remove outliers beyond 3σ (spurious glitches)
        hp = half_periods[np.abs(half_periods - med) < 3.0 * sd] if sd > 0 else half_periods
        if len(hp) >= 20:
            # One clock period is two half-periods (hence the 2e-12 factor).
            clock_freq_mhz = round(1.0 / (float(np.median(hp)) * 2e-12) / 1e6, 2)
            jitter_pp_ps = round(float(hp.max() - hp.min()), 1)
            jitter_rms_ps = round(float(hp.std()), 1)

    # --- Rise / fall times ---
    v_high = vdiff_pos / 1000.0
    v_low = vdiff_neg / 1000.0
    rise_list, fall_list = _rise_fall_times(times, volts, v_high, v_low)
    rise_time_ps = round(float(np.median(rise_list)), 1) if rise_list else None
    fall_time_ps = round(float(np.median(fall_list)), 1) if fall_list else None

    # --- Spec violations ---
    # Only check samples that are well away from any zero crossing (bit-centres).
    # Transitions naturally pass through sub-140 mV, so counting them as violations
    # would be misleading. We mask out a ±guard window around each crossing.
    guard_s = float(np.median(np.diff(ct))) * 0.35 if n_transitions >= 4 else dt * 10
    in_guard = np.zeros(len(times), dtype=bool)
    for t_cross in ct:
        lo = np.searchsorted(times, t_cross - guard_s)
        hi = np.searchsorted(times, t_cross + guard_s)
        in_guard[lo:hi] = True
    settled = (~in_guard) & (np.abs(volts) > v_thresh)
    # "Transient" violations: settled samples that dip noticeably below the
    # measured settled amplitude (threshold = 85 % of the smaller settled level).
    # This catches genuine dips without flagging cases where the settled level
    # itself is just marginally below spec (which is reported as a WARNING instead).
    floor_v = 0.85 * min(abs(vdiff_pos / 1000.0), abs(vdiff_neg / 1000.0)) if (
        vdiff_pos and vdiff_neg) else HS_VDIFF_MIN_MV / 1000.0
    spec_violations = int(np.sum(settled & (np.abs(volts) < floor_v)))

    # --- Warnings ---
    warnings = []
    if no_signal:
        warnings.append("No HS signal detected — line may be in LP state or idle")
    elif one_sided:
        polarity = "positive" if pos_mask.any() else "negative"
        warnings.append(
            f"Only {polarity} swings in capture window — amplitude may be underestimated"
        )
    if not no_signal and amplitude < HS_VDIFF_MIN_MV:
        warnings.append(f"Vdiff {amplitude:.0f} mV below spec min {HS_VDIFF_MIN_MV:.0f} mV")
    if amplitude > HS_VDIFF_MAX_MV:
        warnings.append(f"Vdiff {amplitude:.0f} mV above spec max {HS_VDIFF_MAX_MV:.0f} mV")
    if rise_time_ps is not None and rise_time_ps > RISE_FALL_MAX_PS:
        warnings.append(f"Rise time {rise_time_ps:.0f} ps exceeds {RISE_FALL_MAX_PS:.0f} ps")
    if fall_time_ps is not None and fall_time_ps > RISE_FALL_MAX_PS:
        warnings.append(f"Fall time {fall_time_ps:.0f} ps exceeds {RISE_FALL_MAX_PS:.0f} ps")
    if spec_violations > 0:
        # NOTE(review): the count is taken against floor_v above, while this
        # message quotes HS_VDIFF_MIN_MV — confirm which wording is intended.
        warnings.append(f"{spec_violations} settled samples below {HS_VDIFF_MIN_MV:.0f} mV")

    return ChannelMetrics(
        timestamp=timestamp,
        capture_num=capture_num,
        file_type=file_type,
        channel=channel,
        sample_rate_gsps=round(sample_rate / 1e9, 1),
        duration_ns=round(duration_ns, 2),
        n_samples=len(times),
        vdiff_pos_mv=round(vdiff_pos, 1),
        vdiff_neg_mv=round(vdiff_neg, 1),
        vdiff_amplitude_mv=round(amplitude, 1),
        vcm_mv=round(vcm, 1),
        clock_freq_mhz=clock_freq_mhz,
        jitter_pp_ps=jitter_pp_ps,
        jitter_rms_ps=jitter_rms_ps,
        rise_time_ps=rise_time_ps,
        fall_time_ps=fall_time_ps,
        n_transitions=n_transitions,
        spec_violations=spec_violations,
        warnings=warnings,
    )
def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]:
    """
    Scan data_dir and group CSV files by (timestamp, capture_number).

    Files in data_dir matching "*.csv" whose names follow
    YYYYMMDD_HHMMSS_{sig|proto|lp}_{NNNN}_{clk|dat}.csv (case-insensitive)
    are grouped; all other files are ignored. Inner keys are always
    lowercase, whatever the case used in the filename.

    Returns dict mapping (timestamp, num) -> {file_type_channel: Path}.
    Example key: ("20260408_111448", 1)
    Example value: {"sig_clk": Path(...), "sig_dat": ..., "proto_clk": ..., "proto_dat": ...}
    """
    pattern = re.compile(r"(\d{8}_\d{6})_(sig|proto|lp)_(\d+)_(clk|dat)\.csv", re.IGNORECASE)
    groups: dict[tuple[str, int], dict[str, Path]] = {}
    for f in sorted(data_dir.glob("*.csv")):
        m = pattern.match(f.name)
        if not m:
            continue
        ts, ftype, cap_str, ch = m.groups()
        # Normalise case so callers can rely on keys such as "sig_clk".
        slot = f"{ftype.lower()}_{ch.lower()}"
        groups.setdefault((ts, int(cap_str)), {})[slot] = f
    return groups
# ---------------------------------------------------------------------------
# LP state analysis (lp_clk / lp_dat — single-ended Ch1 / Ch3 captures)
# ---------------------------------------------------------------------------
@dataclass
class LPMetrics:
    """Metrics extracted from one single-ended LP-state capture ("lp" file)."""

    timestamp: str
    capture_num: int
    channel: str                            # "clk" | "dat"
    sample_rate_gsps: float
    duration_us: float
    n_samples: int
    # LP-11 (both pins high ~1.2 V)
    lp11_voltage_v: Optional[float]         # mean level in LP-11 region (spec 1.0-1.45 V)
    lp11_duration_us: Optional[float]       # total LP-11 time in capture (pre-trigger)
    # LP exit: gap between LP-11 falling edge and HS oscillation onset
    lp11_to_hs_ns: Optional[float]          # total LP exit time LP-11→HS (includes LP-01+LP-00)
    lp_low_duration_ns: Optional[float]     # LP-low plateau duration if a clear plateau was seen
    # HS bursts detected within the window
    n_hs_bursts: int
    hs_burst_dur_ns: Optional[float]        # mean HS burst duration
    hs_amplitude_mv: Optional[float]        # half of the peak-to-peak single-ended HS swing (mV)
    lp_transition_valid: bool               # LP-11 → LP-low → HS sequence present
    warnings: list[str] = field(default_factory=list)

    def summary(self) -> str:
        """Return a multi-line, human-readable report of this LP capture.

        Lines for optional measurements are emitted only when the value is
        not None; every stored warning is appended at the end.
        """
        def ok(cond) -> str:
            # Pass/fail marker appended to each spec-checked line.
            return "✓" if cond else "✗"

        lines = [
            f"Capture {self.capture_num:04d} {self.timestamp} [lp/{self.channel}]",
        ]
        if self.lp11_voltage_v is not None:
            in_spec = LP11_SPEC_MIN_V <= self.lp11_voltage_v <= LP11_SPEC_MAX_V
            lines.append(
                f" LP-11 voltage : {self.lp11_voltage_v:.3f} V "
                f"(spec {LP11_SPEC_MIN_V:.1f}–{LP11_SPEC_MAX_V:.2f} V) {ok(in_spec)}"
            )
        if self.lp11_duration_us is not None:
            lines.append(f" LP-11 duration : {self.lp11_duration_us:.2f} µs")
        if self.lp11_to_hs_ns is not None:
            ok_exit = self.lp11_to_hs_ns >= LP_LOW_DUR_MIN_NS
            lines.append(
                f" LP exit → HS : {self.lp11_to_hs_ns:.0f} ns "
                f"(spec ≥{LP_LOW_DUR_MIN_NS:.0f} ns) {ok(ok_exit)}"
            )
        if self.lp_low_duration_ns is not None:
            lines.append(f" LP-low plateau : {self.lp_low_duration_ns:.0f} ns")
        lines.append(
            f" LP→HS sequence : {'valid ✓' if self.lp_transition_valid else 'NOT DETECTED ✗'}"
        )
        if self.n_hs_bursts:
            # `is not None` rather than truthiness: a 0 ns average is still a value.
            avg = (f" avg {self.hs_burst_dur_ns:.0f} ns"
                   if self.hs_burst_dur_ns is not None else "")
            lines.append(f" HS bursts : {self.n_hs_bursts}" + avg)
        if self.hs_amplitude_mv is not None:
            lines.append(f" HS amplitude : {self.hs_amplitude_mv:.0f} mV (single-ended p-p/2)")
        for w in self.warnings:
            lines.append(f" WARNING: {w}")
        return "\n".join(lines)
def _rolling_std(arr: np.ndarray, window: int) -> np.ndarray:
"""Compute rolling standard deviation using stride_tricks (O(n) memory, fast)."""
from numpy.lib.stride_tricks import sliding_window_view
n = len(arr)
if n <= window:
return np.full(n, arr.std())
windowed = sliding_window_view(arr, window)
stds = windowed.std(axis=1)
# Pad edges to maintain original length
pad_l = window // 2
pad_r = n - len(stds) - pad_l
return np.concatenate([np.full(pad_l, stds[0]), stds, np.full(pad_r, stds[-1])])
def _find_contiguous_regions(mask: np.ndarray, min_samples: int = 5):
"""Return list of (start_idx, end_idx) for True runs ≥ min_samples long."""
padded = np.concatenate([[False], mask, [False]])
diff = np.diff(padded.astype(np.int8))
starts = np.where(diff == 1)[0]
ends = np.where(diff == -1)[0]
return [(s, e) for s, e in zip(starts, ends) if (e - s) >= min_samples]
def analyze_lp_file(path: Path) -> "LPMetrics":
    """
    Analyse a single-ended LP capture CSV (Ch1 or Ch3) and return LPMetrics.

    How the capture is interpreted:
      LP-11     : runs of at least 10 samples with voltage > LP11_HIGH_V.
      HS burst  : every non-empty stretch that follows an LP-11 region, up to
                  the next LP-11 region or the end of the capture.
      LP exit   : time from the end of the first LP-11 region to the first
                  sample (within 500 ns) whose rolling std reaches
                  HS_OSC_STD_V.
      LP-low    : first run of at least 5 samples with voltage < LP_LOW_V and
                  rolling std < HS_OSC_STD_V that starts within 1 µs after
                  the first LP-11 region.

    Args:
        path: CSV file named YYYYMMDD_HHMMSS_lp_{NNNN}_{clk|dat}.csv
            (matched case-insensitively).

    Raises:
        ValueError: the filename does not match the lp pattern, fewer than
            two numeric samples were read, or the time axis is not increasing.
    """
    m = re.match(r"(\d{8}_\d{6})_lp_(\d+)_(clk|dat)\.csv", path.name, re.IGNORECASE)
    if not m:
        raise ValueError(f"Filename does not match lp pattern: {path.name}")
    timestamp, cap_str, channel = m.groups()
    # The pattern is case-insensitive; normalise so `channel == "clk"` below
    # also holds for "CLK".
    channel = channel.lower()
    capture_num = int(cap_str)

    times, volts = _read_csv(path)
    if len(times) < 2:
        raise ValueError(f"Not enough numeric samples in {path.name}")
    dt = float(np.diff(times).mean())
    if not dt > 0:
        raise ValueError(f"Time axis is not increasing in {path.name}")
    sample_rate = 1.0 / dt
    duration_us = (float(times[-1]) - float(times[0])) * 1e6
    last = len(times) - 1

    def t_at(i):
        # Region end indices are exclusive and equal len(times) when a region
        # reaches the final sample; clamp so the lookup stays in range.
        return times[min(i, last)]

    # ── LP-11 detection ───────────────────────────────────────────────────
    # LP-11 is reliable: voltage is clearly above LP11_HIGH_V (0.8 V).
    lp11_mask = volts > LP11_HIGH_V
    lp11_regions = _find_contiguous_regions(lp11_mask, min_samples=10)

    lp11_voltage_v = None
    lp11_duration_us = None
    if lp11_regions:
        lp11_voltage_v = round(float(np.concatenate(
            [volts[s:e] for s, e in lp11_regions]).mean()), 3)
        lp11_duration_us = round(
            sum((t_at(e) - times[s]) for s, e in lp11_regions) * 1e6, 3)

    # ── HS burst detection ────────────────────────────────────────────────
    # On DAT0+ with a uniform-colour display, HS data can look DC (no bit
    # transitions), making oscillation-based HS detection unreliable.
    # Instead: every non-LP-11 gap after an LP-11 region is treated as an
    # HS burst. The first gap starts at the end of the first LP-11 region;
    # subsequent gaps are between consecutive LP-11 regions.
    lp11_to_hs_ns = None
    lp_low_duration_ns = None
    lp_transition_valid = False
    n_hs_bursts = 0
    hs_burst_dur_ns = None
    hs_amplitude_mv = None

    if lp11_regions:
        # Rolling std over ~1 ns (at least 10 samples), used for the LP exit
        # gap and the LP-low plateau below.
        window = max(10, int(1e-9 / dt))
        rstd = _rolling_std(volts, window)

        hs_bursts = []
        for i, (_, lp11_e) in enumerate(lp11_regions):
            # Burst ends at start of next LP-11, or at window end
            burst_end = lp11_regions[i + 1][0] if i + 1 < len(lp11_regions) else last
            if burst_end <= lp11_e:
                # LP-11 runs to the end of the capture: nothing follows it,
                # so there is no burst to record.
                continue
            burst_dur_ns = round((times[burst_end] - times[lp11_e]) * 1e9, 1)
            hs_bursts.append((lp11_e, burst_end, burst_dur_ns))

        if hs_bursts:
            n_hs_bursts = len(hs_bursts)
            hs_burst_dur_ns = round(float(np.mean([d for _, _, d in hs_bursts])), 1)
            lp_transition_valid = True

        # LP exit gap: find first rolling-std > threshold after LP-11 ends
        s_end = lp11_regions[0][1]
        lookahead = min(s_end + int(500e-9 / dt), last)
        high_std_idx = np.where(rstd[s_end:lookahead] >= HS_OSC_STD_V)[0]
        if len(high_std_idx):
            lp11_to_hs_ns = round((times[s_end + high_std_idx[0]] - times[s_end]) * 1e9, 1)

        # LP-low plateau: look for a contiguous region in the exit window
        # where voltage < LP_LOW_V and std is low (true LP-01/LP-00 plateau)
        lp_low_mask = (volts < LP_LOW_V) & (rstd < HS_OSC_STD_V)
        lp_low_regions = _find_contiguous_regions(lp_low_mask, min_samples=5)
        exit_window = int(1e-6 / dt)
        for lplow_s, lplow_e in lp_low_regions:
            if s_end <= lplow_s <= s_end + exit_window:
                lp_low_duration_ns = round(
                    (t_at(lplow_e) - times[lplow_s]) * 1e9, 1)
                break

        # HS single-ended amplitude from the first burst (where data may vary)
        if hs_bursts:
            s, e, _ = hs_bursts[0]
            burst_volts = volts[s:e]
            hs_amplitude_mv = round(
                (float(np.percentile(burst_volts, 95)) -
                 float(np.percentile(burst_volts, 5))) / 2 * 1000, 1
            )

    # ── Warnings ─────────────────────────────────────────────────────────
    warnings = []
    continuous_hs_clk = (
        (not lp11_regions) and (channel == "clk") and (float(volts.max()) < LP11_HIGH_V)
    )
    if continuous_hs_clk:
        warnings.append("CLK lane is in continuous HS mode — LP states not expected on CLK")
    elif not lp11_regions:
        warnings.append("No LP-11 state detected in capture window")
    elif lp11_voltage_v is not None:
        if lp11_voltage_v < LP11_SPEC_MIN_V:
            warnings.append(f"LP-11 voltage {lp11_voltage_v:.3f} V below spec min {LP11_SPEC_MIN_V} V")
        if lp11_voltage_v > LP11_SPEC_MAX_V:
            warnings.append(f"LP-11 voltage {lp11_voltage_v:.3f} V above spec max {LP11_SPEC_MAX_V} V")
    if lp11_to_hs_ns is not None and lp11_to_hs_ns < LP_LOW_DUR_MIN_NS:
        warnings.append(
            f"LP exit duration {lp11_to_hs_ns:.0f} ns below spec min {LP_LOW_DUR_MIN_NS:.0f} ns "
            f"— LP-01/LP-00 states may be absent or too brief"
        )
    if not continuous_hs_clk:
        if not lp_transition_valid:
            warnings.append("LP-11 → LP-low → HS transition sequence not detected")
        if n_hs_bursts == 0:
            warnings.append("No HS bursts detected after LP transition")

    return LPMetrics(
        timestamp=timestamp,
        capture_num=capture_num,
        channel=channel,
        sample_rate_gsps=round(sample_rate / 1e9, 1),
        duration_us=round(duration_us, 2),
        n_samples=len(times),
        lp11_voltage_v=lp11_voltage_v,
        lp11_duration_us=lp11_duration_us,
        lp11_to_hs_ns=lp11_to_hs_ns,
        lp_low_duration_ns=lp_low_duration_ns,
        n_hs_bursts=n_hs_bursts,
        hs_burst_dur_ns=hs_burst_dur_ns,
        hs_amplitude_mv=hs_amplitude_mv,
        lp_transition_valid=lp_transition_valid,
        warnings=warnings,
    )
if __name__ == "__main__":
    # Command-line demo: analyse the CSV files given as arguments, or the
    # first 8 CSV files found in the "data" directory next to this script.
    import sys

    data_dir = Path(__file__).parent / "data"
    if len(sys.argv) > 1:
        files = [Path(a) for a in sys.argv[1:]]
    else:
        files = sorted(data_dir.glob("*.csv"))[:8]  # first 8 files as demo

    for f in files:
        try:
            # The analysers match filenames case-insensitively, so dispatch
            # on a lower-cased name to route "_LP_" files the same way.
            if "_lp_" in f.name.lower():
                result = analyze_lp_file(f)
            else:
                result = analyze_file(f)
            print(result.summary())
            print()
        except Exception as e:
            # Top-level boundary: report the failure and carry on with the
            # remaining files.
            print(f"ERROR {f.name}: {e}")