""" csv_preprocessor.py Extracts MIPI HS-TX / LP state metrics from oscilloscope CSV files. File naming convention: YYYYMMDD_HHMMSS_{sig|proto|lp}_{NNNN}_{clk|dat}.csv sig — high-res short window (320 GSa/s, ~20 ns) — rise/fall times Two columns: time_s, vdiff_v (F1/F2 differential, ±250 mV HS swing) proto — lower-res long window (20 GSa/s, ~10 µs) — jitter, frequency, amplitude Two columns: time_s, vdiff_v (F1/F2 differential) lp — LP state capture (~40 GSa/s, ~5 µs) — LP-11/LP-00/HS burst structure Two columns: time_s, voltage_v (Ch1 or Ch3 single-ended CLK+/DAT0+) Vertical range: −0.2 V to 1.4 V so LP-11 (~1.2 V) and LP-00 (~0 V) are visible. Trigger: falling edge at 0.6 V on CLK+ catches LP-11 → LP-01 SoT transition. """ import csv import re import numpy as np from dataclasses import dataclass, field from pathlib import Path from typing import Optional # MIPI D-PHY HS-TX spec limits HS_VDIFF_MIN_MV = 140.0 # |Vdiff| minimum (mV) HS_VDIFF_MAX_MV = 270.0 # |Vdiff| maximum (mV) RISE_FALL_MAX_PS = 500.0 # rise/fall time limit 20%–80% (ps) # Thresholds for "settled" vs "transitioning" TRANSITION_BAND_MV = 50.0 # |Vdiff| < this is considered a transition, not settled # MIPI D-PHY LP state thresholds (single-ended voltage, after probe compensation) LP11_HIGH_V = 0.8 # V — single-ended voltage above this → LP-11 (both pins high ~1.2 V) LP_LOW_V = 0.05 # V — single-ended voltage below this → LP-00 or LP-01 pin low LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined) HS_OSC_STD_V = 0.045 # V — rolling-std threshold above which a region is classified as HS @dataclass class ChannelMetrics: timestamp: str capture_num: int file_type: str # "sig" | "proto" channel: str # "clk" | "dat" sample_rate_gsps: float duration_ns: float n_samples: int # HS-TX differential voltage vdiff_pos_mv: float # mean settled positive level (HS "1") vdiff_neg_mv: float # mean settled negative level (HS "0") vdiff_amplitude_mv: float # (|pos| + |neg|) / 2 — spec: 140–270 mV vcm_mv: float # (pos + neg) / 2 — common-mode offset # Timing (None when there are too few transitions to measure) clock_freq_mhz: Optional[float] = None jitter_pp_ps: Optional[float] = None jitter_rms_ps: Optional[float] = None rise_time_ps: Optional[float] = None fall_time_ps: Optional[float] = None n_transitions: int = 0 # Spec violations spec_violations: int = 0 # settled samples where |Vdiff| < HS_VDIFF_MIN_MV warnings: list = field(default_factory=list) def summary(self) -> str: ok = lambda cond: "✓" if cond else "✗" lines = [ f"Capture {self.capture_num:04d} {self.timestamp} [{self.file_type}/{self.channel}]", f" Vdiff amplitude : {self.vdiff_amplitude_mv:6.1f} mV " f"(spec 140–270 mV) {ok(HS_VDIFF_MIN_MV <= self.vdiff_amplitude_mv <= HS_VDIFF_MAX_MV)}", f" Vdiff pos/neg : +{self.vdiff_pos_mv:.1f} / {self.vdiff_neg_mv:.1f} mV", f" Common mode : {self.vcm_mv:+.1f} mV", ] if self.clock_freq_mhz is not None: lines.append( f" Clock freq : {self.clock_freq_mhz:.2f} MHz DDR " f"({self.n_transitions} transitions)" ) if self.jitter_pp_ps is not None: lines.append( f" Jitter p-p/RMS : {self.jitter_pp_ps:.1f} ps / {self.jitter_rms_ps:.1f} ps" ) if self.rise_time_ps is not None: lines.append( f" Rise time 20-80%: {self.rise_time_ps:.1f} ps " f"{ok(self.rise_time_ps <= RISE_FALL_MAX_PS)}" ) if self.fall_time_ps is not None: lines.append( f" Fall time 20-80%: {self.fall_time_ps:.1f} ps " f"{ok(self.fall_time_ps <= RISE_FALL_MAX_PS)}" ) if self.spec_violations: lines.append(f" Spec violations : {self.spec_violations} samples below {HS_VDIFF_MIN_MV:.0f} mV ✗") for w in self.warnings: lines.append(f" WARNING: {w}") return "\n".join(lines) # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _read_csv(path: Path) -> tuple[np.ndarray, np.ndarray]: times, volts = [], [] with open(path) as f: for row in csv.reader(f): if len(row) >= 2: try: times.append(float(row[0])) volts.append(float(row[1])) except ValueError: pass # skip any header row return np.array(times, dtype=np.float64), np.array(volts, dtype=np.float64) def _zero_crossings(times: np.ndarray, volts: np.ndarray) -> np.ndarray: """Return array of linearly-interpolated zero-crossing times (seconds).""" signs = np.sign(volts) change = np.diff(signs) idx = np.where(change != 0)[0] ct = [] for i in idx: if signs[i] != 0 and signs[i + 1] != 0: frac = -volts[i] / (volts[i + 1] - volts[i]) ct.append(times[i] + frac * (times[i + 1] - times[i])) return np.array(ct) def _rise_fall_times(times: np.ndarray, volts: np.ndarray, v_high: float, v_low: float, window_samples: int = 60) -> tuple[list, list]: """ Measure 20%–80% rise and fall times around each zero crossing. Returns (rise_times_ps, fall_times_ps). """ v20 = v_low + 0.20 * (v_high - v_low) v80 = v_low + 0.80 * (v_high - v_low) signs = np.sign(volts) trans_idx = np.where(np.diff(signs) != 0)[0] rise_ps, fall_ps = [], [] for idx in trans_idx: s = max(0, idx - window_samples // 2) e = min(len(times), idx + window_samples // 2) tw = times[s:e] vw = volts[s:e] if len(vw) < 4: continue if volts[min(idx + 1, len(volts) - 1)] > volts[idx]: # rising edge # find where vw first crosses v20 (ascending) then v80 i20 = np.searchsorted(vw, v20) i80 = np.searchsorted(vw, v80) if 0 < i20 < len(tw) - 1 and 0 < i80 < len(tw) - 1 and i80 > i20: # interpolate each threshold t20 = np.interp(v20, vw[i20 - 1:i20 + 1], tw[i20 - 1:i20 + 1]) t80 = np.interp(v80, vw[i80 - 1:i80 + 1], tw[i80 - 1:i80 + 1]) rise_ps.append((t80 - t20) * 1e12) else: # falling edge # descending: reverse the window so searchsorted still works vw_r = vw[::-1] tw_r = tw[::-1] i80 = np.searchsorted(vw_r, v80) i20 = np.searchsorted(vw_r, v20) if 0 < i80 < len(tw_r) - 1 and 0 < i20 < len(tw_r) - 1 and i20 > i80: t80 = np.interp(v80, vw_r[i80 - 1:i80 + 1], tw_r[i80 - 1:i80 + 1]) t20 = np.interp(v20, vw_r[i20 - 1:i20 + 1], tw_r[i20 - 1:i20 + 1]) fall_ps.append((t20 - t80) * 1e12) return rise_ps, fall_ps # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def analyze_file(path: Path) -> ChannelMetrics: """ Analyse one oscilloscope CSV file and return a ChannelMetrics instance. """ m = re.match(r"(\d{8}_\d{6})_(sig|proto|lp)_(\d+)_(clk|dat)\.csv", path.name, re.IGNORECASE) if not m: raise ValueError(f"Filename does not match expected pattern: {path.name}") timestamp, file_type, cap_str, channel = m.groups() if file_type == "lp": raise ValueError("Use analyze_lp_file() for lp-type files (single-ended)") capture_num = int(cap_str) times, volts = _read_csv(path) dt = float(np.diff(times).mean()) sample_rate = 1.0 / dt duration_ns = (float(times[-1]) - float(times[0])) * 1e9 # --- Voltage levels --- v_thresh = TRANSITION_BAND_MV / 1000.0 pos_mask = volts > v_thresh neg_mask = volts < -v_thresh vdiff_pos = float(volts[pos_mask].mean()) * 1000.0 if pos_mask.any() else 0.0 vdiff_neg = float(volts[neg_mask].mean()) * 1000.0 if neg_mask.any() else 0.0 # Classify signal coverage: # no_signal — neither polarity detected (LP state or idle) # one_sided — only one polarity in capture window (short sig window, uniform data) no_signal = (not pos_mask.any()) and (not neg_mask.any()) one_sided = (not no_signal) and ((not pos_mask.any()) or (not neg_mask.any())) if no_signal: amplitude = 0.0 elif one_sided: amplitude = max(abs(vdiff_pos), abs(vdiff_neg)) else: amplitude = (abs(vdiff_pos) + abs(vdiff_neg)) / 2.0 vcm = (vdiff_pos + vdiff_neg) / 2.0 # --- Zero crossings → frequency + jitter (CLK only) --- ct = _zero_crossings(times, volts) n_transitions = len(ct) clock_freq_mhz = jitter_pp_ps = jitter_rms_ps = None # Jitter / frequency are only meaningful on the CLK lane. # On DAT the bit pattern varies, so half-periods are not uniform by design. # Require at least 20 transitions (10 full cycles) for reliable jitter. # Sig files (~8 transitions) are too short; proto files (~4000) are fine. if channel == "clk" and n_transitions >= 20: half_periods = np.diff(ct) * 1e12 # ps med = float(np.median(half_periods)) sd = float(half_periods.std()) # Remove outliers beyond 3σ (spurious glitches) hp = half_periods[np.abs(half_periods - med) < 3.0 * sd] if sd > 0 else half_periods if len(hp) >= 20: clock_freq_mhz = round(1.0 / (float(np.median(hp)) * 2e-12) / 1e6, 2) jitter_pp_ps = round(float(hp.max() - hp.min()), 1) jitter_rms_ps = round(float(hp.std()), 1) # --- Rise / fall times --- v_high = vdiff_pos / 1000.0 v_low = vdiff_neg / 1000.0 rise_list, fall_list = _rise_fall_times(times, volts, v_high, v_low) rise_time_ps = round(float(np.median(rise_list)), 1) if rise_list else None fall_time_ps = round(float(np.median(fall_list)), 1) if fall_list else None # --- Spec violations --- # Only check samples that are well away from any zero crossing (bit-centres). # Transitions naturally pass through sub-140 mV, so counting them as violations # would be misleading. We mask out a ±guard window around each crossing. guard_s = float(np.median(np.diff(ct))) * 0.35 if n_transitions >= 4 else dt * 10 in_guard = np.zeros(len(times), dtype=bool) for t_cross in ct: lo = np.searchsorted(times, t_cross - guard_s) hi = np.searchsorted(times, t_cross + guard_s) in_guard[lo:hi] = True settled = (~in_guard) & (np.abs(volts) > v_thresh) # "Transient" violations: settled samples that dip noticeably below the # measured settled amplitude (threshold = 85 % of the smaller settled level). # This catches genuine dips without flagging cases where the settled level # itself is just marginally below spec (which is reported as a WARNING instead). floor_v = 0.85 * min(abs(vdiff_pos / 1000.0), abs(vdiff_neg / 1000.0)) if ( vdiff_pos and vdiff_neg) else HS_VDIFF_MIN_MV / 1000.0 spec_violations = int(np.sum(settled & (np.abs(volts) < floor_v))) # --- Warnings --- warnings = [] if no_signal: warnings.append("No HS signal detected — line may be in LP state or idle") elif one_sided: polarity = "positive" if pos_mask.any() else "negative" warnings.append( f"Only {polarity} swings in capture window — amplitude may be underestimated" ) if not no_signal and amplitude < HS_VDIFF_MIN_MV: warnings.append(f"Vdiff {amplitude:.0f} mV below spec min {HS_VDIFF_MIN_MV:.0f} mV") if amplitude > HS_VDIFF_MAX_MV: warnings.append(f"Vdiff {amplitude:.0f} mV above spec max {HS_VDIFF_MAX_MV:.0f} mV") if rise_time_ps is not None and rise_time_ps > RISE_FALL_MAX_PS: warnings.append(f"Rise time {rise_time_ps:.0f} ps exceeds {RISE_FALL_MAX_PS:.0f} ps") if fall_time_ps is not None and fall_time_ps > RISE_FALL_MAX_PS: warnings.append(f"Fall time {fall_time_ps:.0f} ps exceeds {RISE_FALL_MAX_PS:.0f} ps") if spec_violations > 0: warnings.append(f"{spec_violations} settled samples below {HS_VDIFF_MIN_MV:.0f} mV") return ChannelMetrics( timestamp = timestamp, capture_num = capture_num, file_type = file_type, channel = channel, sample_rate_gsps = round(sample_rate / 1e9, 1), duration_ns = round(duration_ns, 2), n_samples = len(times), vdiff_pos_mv = round(vdiff_pos, 1), vdiff_neg_mv = round(vdiff_neg, 1), vdiff_amplitude_mv = round(amplitude, 1), vcm_mv = round(vcm, 1), clock_freq_mhz = clock_freq_mhz, jitter_pp_ps = jitter_pp_ps, jitter_rms_ps = jitter_rms_ps, rise_time_ps = rise_time_ps, fall_time_ps = fall_time_ps, n_transitions = n_transitions, spec_violations = spec_violations, warnings = warnings, ) def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]: """ Scan data_dir and group CSV files by (timestamp, capture_number). Returns dict mapping (timestamp, num) → {file_type_channel: Path}. Example key: ("20260408_111448", 1) Example value: {"sig_clk": Path(...), "sig_dat": ..., "proto_clk": ..., "proto_dat": ...} """ pattern = re.compile(r"(\d{8}_\d{6})_(sig|proto|lp)_(\d+)_(clk|dat)\.csv", re.IGNORECASE) groups: dict[tuple[str, int], dict[str, Path]] = {} for f in sorted(data_dir.glob("*.csv")): m = pattern.match(f.name) if not m: continue ts, ftype, cap_str, ch = m.groups() key = (ts, int(cap_str)) groups.setdefault(key, {})[f"{ftype}_{ch}"] = f return groups # --------------------------------------------------------------------------- # LP state analysis (lp_clk / lp_dat — single-ended Ch1 / Ch3 captures) # --------------------------------------------------------------------------- @dataclass class LPMetrics: timestamp: str capture_num: int channel: str # "clk" | "dat" sample_rate_gsps: float duration_us: float n_samples: int # LP-11 (both pins high ~1.2 V) lp11_voltage_v: Optional[float] # mean level in LP-11 region (spec 1.0–1.45 V) lp11_duration_us: Optional[float] # total LP-11 time in capture (pre-trigger) # LP-low (LP-01 + LP-00 combined — CLK+ = 0 V in both states) lp_low_duration_ns: Optional[float] # duration between LP-11 end and HS start # HS bursts detected within the window n_hs_bursts: int hs_burst_dur_ns: Optional[float] # mean HS burst duration hs_amplitude_mv: Optional[float] # peak-to-peak single-ended HS swing (mV) lp_transition_valid: bool # LP-11 → LP-low → HS sequence present warnings: list = field(default_factory=list) def summary(self) -> str: ok = lambda c: "✓" if c else "✗" lines = [ f"Capture {self.capture_num:04d} {self.timestamp} [lp/{self.channel}]", ] if self.lp11_voltage_v is not None: in_spec = LP11_SPEC_MIN_V <= self.lp11_voltage_v <= LP11_SPEC_MAX_V lines.append( f" LP-11 voltage : {self.lp11_voltage_v:.3f} V " f"(spec {LP11_SPEC_MIN_V:.1f}–{LP11_SPEC_MAX_V:.2f} V) {ok(in_spec)}" ) if self.lp11_duration_us is not None: lines.append(f" LP-11 duration : {self.lp11_duration_us:.2f} µs") if self.lp_low_duration_ns is not None: ok_lp = self.lp_low_duration_ns >= LP_LOW_DUR_MIN_NS lines.append( f" LP-low duration : {self.lp_low_duration_ns:.0f} ns " f"(spec ≥{LP_LOW_DUR_MIN_NS:.0f} ns) {ok(ok_lp)}" ) lines.append( f" LP→HS sequence : {'valid ✓' if self.lp_transition_valid else 'NOT DETECTED ✗'}" ) if self.n_hs_bursts: lines.append(f" HS bursts : {self.n_hs_bursts}" + (f" avg {self.hs_burst_dur_ns:.0f} ns" if self.hs_burst_dur_ns else "")) if self.hs_amplitude_mv is not None: lines.append(f" HS amplitude : {self.hs_amplitude_mv:.0f} mV (single-ended p-p/2)") for w in self.warnings: lines.append(f" WARNING: {w}") return "\n".join(lines) def _rolling_std(arr: np.ndarray, window: int) -> np.ndarray: """Compute rolling standard deviation using stride_tricks (O(n) memory, fast).""" from numpy.lib.stride_tricks import sliding_window_view n = len(arr) if n <= window: return np.full(n, arr.std()) windowed = sliding_window_view(arr, window) stds = windowed.std(axis=1) # Pad edges to maintain original length pad_l = window // 2 pad_r = n - len(stds) - pad_l return np.concatenate([np.full(pad_l, stds[0]), stds, np.full(pad_r, stds[-1])]) def _find_contiguous_regions(mask: np.ndarray, min_samples: int = 5): """Return list of (start_idx, end_idx) for True runs ≥ min_samples long.""" padded = np.concatenate([[False], mask, [False]]) diff = np.diff(padded.astype(np.int8)) starts = np.where(diff == 1)[0] ends = np.where(diff == -1)[0] return [(s, e) for s, e in zip(starts, ends) if (e - s) >= min_samples] def analyze_lp_file(path: Path) -> "LPMetrics": """ Analyse a single-ended LP capture CSV (Ch1 or Ch3) and return LPMetrics. State classification per sample: LP-11 : voltage > LP11_HIGH_V (~1.2 V, both pins high) LP-low : voltage < LP_LOW_V (~0 V, pin driven low — LP-01 or LP-00) HS : voltage in mid-range with high oscillation (rolling std > HS_OSC_STD_V) trans : everything else (transitions between states) """ m = re.match(r"(\d{8}_\d{6})_lp_(\d+)_(clk|dat)\.csv", path.name, re.IGNORECASE) if not m: raise ValueError(f"Filename does not match lp pattern: {path.name}") timestamp, cap_str, channel = m.groups() capture_num = int(cap_str) times, volts = _read_csv(path) dt = float(np.diff(times).mean()) sample_rate = 1.0 / dt duration_us = (float(times[-1]) - float(times[0])) * 1e6 # ── State classification ────────────────────────────────────────────── # Rolling std over ~1 ns window to detect HS oscillation window = max(10, int(1e-9 / dt)) rstd = _rolling_std(volts, window) lp11_mask = volts > LP11_HIGH_V lp_low_mask = (volts < LP_LOW_V) & (rstd < HS_OSC_STD_V) hs_mask = (~lp11_mask) & (~lp_low_mask) & (rstd >= HS_OSC_STD_V) # ── LP-11 region ────────────────────────────────────────────────────── lp11_regions = _find_contiguous_regions(lp11_mask, min_samples=10) lp11_voltage_v = None lp11_duration_us = None if lp11_regions: lp11_voltage_v = round(float(np.concatenate( [volts[s:e] for s, e in lp11_regions]).mean()), 3) lp11_duration_us = round( sum((times[e] - times[s]) for s, e in lp11_regions) * 1e6, 3) # ── LP-low region (between last LP-11 and first HS) ─────────────────── lp_low_duration_ns = None lp_transition_valid = False lp_low_regions = _find_contiguous_regions(lp_low_mask, min_samples=5) hs_regions = _find_contiguous_regions(hs_mask, min_samples=20) if lp11_regions and lp_low_regions and hs_regions: # Find the LP-low gap that sits between the last LP-11 and the first HS burst last_lp11_end = lp11_regions[-1][1] first_hs_start = hs_regions[0][0] bridging = [(s, e) for s, e in lp_low_regions if s >= last_lp11_end and e <= first_hs_start + int(100e-9 / dt)] if bridging: s0, e0 = bridging[0][0], bridging[-1][1] lp_low_duration_ns = round((times[e0] - times[s0]) * 1e9, 1) lp_transition_valid = True # ── HS burst metrics ────────────────────────────────────────────────── n_hs_bursts = len(hs_regions) hs_burst_dur_ns = None hs_amplitude_mv = None if hs_regions: durations = [(times[e] - times[s]) * 1e9 for s, e in hs_regions] hs_burst_dur_ns = round(float(np.mean(durations)), 1) # HS single-ended amplitude: peak-to-peak / 2 of the oscillating signal hs_volts = np.concatenate([volts[s:e] for s, e in hs_regions]) hs_amplitude_mv = round( (float(np.percentile(hs_volts, 95)) - float(np.percentile(hs_volts, 5))) / 2 * 1000, 1 ) # ── Warnings ───────────────────────────────────────────────────────── warnings = [] if not lp11_regions: warnings.append("No LP-11 state detected in capture window") elif lp11_voltage_v is not None: if lp11_voltage_v < LP11_SPEC_MIN_V: warnings.append(f"LP-11 voltage {lp11_voltage_v:.3f} V below spec min {LP11_SPEC_MIN_V} V") if lp11_voltage_v > LP11_SPEC_MAX_V: warnings.append(f"LP-11 voltage {lp11_voltage_v:.3f} V above spec max {LP11_SPEC_MAX_V} V") if lp_low_duration_ns is not None and lp_low_duration_ns < LP_LOW_DUR_MIN_NS: warnings.append( f"LP-low duration {lp_low_duration_ns:.0f} ns below spec min {LP_LOW_DUR_MIN_NS:.0f} ns" ) if not lp_transition_valid: warnings.append("LP-11 → LP-low → HS transition sequence not detected") if n_hs_bursts == 0: warnings.append("No HS bursts detected after LP transition") return LPMetrics( timestamp = timestamp, capture_num = capture_num, channel = channel, sample_rate_gsps = round(sample_rate / 1e9, 1), duration_us = round(duration_us, 2), n_samples = len(times), lp11_voltage_v = lp11_voltage_v, lp11_duration_us = lp11_duration_us, lp_low_duration_ns = lp_low_duration_ns, n_hs_bursts = n_hs_bursts, hs_burst_dur_ns = hs_burst_dur_ns, hs_amplitude_mv = hs_amplitude_mv, lp_transition_valid = lp_transition_valid, warnings = warnings, ) if __name__ == "__main__": import sys data_dir = Path(__file__).parent / "data" if len(sys.argv) > 1: files = [Path(a) for a in sys.argv[1:]] else: files = sorted(data_dir.glob("*.csv"))[:8] # first 8 files as demo for f in files: try: if "_lp_" in f.name: result = analyze_lp_file(f) else: result = analyze_file(f) print(result.summary()) print() except Exception as e: print(f"ERROR {f.name}: {e}")