Files
MiPi_TEST/csv_preprocessor.py
david rice c196a2a29c updates
2026-04-17 11:27:41 +01:00

923 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
csv_preprocessor.py
Extracts MIPI HS-TX / LP state metrics from oscilloscope CSV files.
File naming convention: YYYYMMDD_HHMMSS_{sig|proto|lp}_{NNNN}_{clk|dat}.csv
sig — high-res short window (320 GSa/s, ~20 ns) — rise/fall times
Two columns: time_s, vdiff_v (F1/F2 differential, ±250 mV HS swing)
proto — lower-res long window (20 GSa/s, ~10 µs) — jitter, frequency, amplitude
Two columns: time_s, vdiff_v (F1/F2 differential)
lp — LP state capture (~40 GSa/s, ~5 µs) — LP-11/LP-00/HS burst structure
Two columns: time_s, voltage_v (Ch1 or Ch3 single-ended CLK+/DAT0+)
Vertical range: 0.2 V to 1.4 V so LP-11 (~1.2 V) and LP-00 (~0 V) are visible.
Trigger: falling edge at 0.6 V on CLK+ catches LP-11 → LP-01 SoT transition.
"""
import csv
import json
import re
import numpy as np
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
# 1.8 V supply rail spec (i.MX 8M Mini internal regulator, ±5 %)
V18_NOMINAL_V = 1.800
V18_SPEC_MIN_V = 1.710 # nominal -5 %
V18_SPEC_MAX_V = 1.890 # nominal +5 %
V18_DROOP_WARN_MV = 50.0 # mV droop depth worth flagging
V18_RIPPLE_WARN_MV = 20.0 # mV RMS ripple worth flagging
# MIPI D-PHY HS-TX spec limits
HS_VDIFF_MIN_MV = 140.0 # |Vdiff| minimum (mV)
HS_VDIFF_MAX_MV = 270.0 # |Vdiff| maximum (mV)
RISE_FALL_MAX_PS = 500.0 # rise/fall time limit, 20 %-80 % (ps)
# Thresholds for "settled" vs "transitioning"
TRANSITION_BAND_MV = 50.0 # |Vdiff| < this is considered a transition, not settled
# MIPI D-PHY LP state thresholds (single-ended voltage, after probe compensation)
LP11_HIGH_V = 0.8 # V — single-ended voltage above this → LP-11 (both pins high ~1.2 V)
LP_LOW_V = 0.25 # V — single-ended voltage below this → LP-00 or LP-01 pin low
# Note: probe loading can shift LP-low from true 0 V to ~100 mV; 0.25 V clears that offset
# The rolling-std gate (HS_OSC_STD_V) prevents HS minima near 0 V being called LP-low.
LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec
LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec
LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined)
HS_OSC_STD_V = 0.045 # V — rolling-std threshold above which a region is classified as HS
# Flicker detection thresholds
# LP-low plateau below this → SoT sequence too brief for receiver to detect → flicker risk
FLICKER_LP_LOW_MAX_NS = 50.0 # ns
# HS burst amplitude below this (single-ended p-p / 2, mV) → HS burst absent after LP transition.
# On this hardware normal HS = 105-122 mV; confirmed flicker = 14-32 mV (DC / LP-11 recovery).
# Captures where LP-01/LP-00 completed normally but the bridge never entered HS mode show
# essentially zero amplitude (the burst window is DC LP-11), so lp_low alone cannot detect this.
HS_BURST_AMPLITUDE_MIN_MV = 50.0 # mV — below this, no real HS burst is present
@dataclass
class ChannelMetrics:
    """Metrics for one HS differential capture (a ``sig`` or ``proto`` file).

    Voltages are in millivolts and timing figures in picoseconds unless the
    field name says otherwise.  Timing fields stay ``None`` when the capture
    did not contain enough transitions to measure them.
    """

    timestamp: str
    capture_num: int
    file_type: str  # "sig" | "proto"
    channel: str  # "clk" | "dat"
    sample_rate_gsps: float
    duration_ns: float
    n_samples: int
    # HS-TX differential voltage
    vdiff_pos_mv: float  # mean settled positive level (HS "1")
    vdiff_neg_mv: float  # mean settled negative level (HS "0")
    vdiff_amplitude_mv: float  # (|pos| + |neg|) / 2 — spec: 140-270 mV
    vcm_mv: float  # (pos + neg) / 2 — common-mode offset
    # Timing (None when there are too few transitions to measure)
    clock_freq_mhz: Optional[float] = None
    jitter_pp_ps: Optional[float] = None
    jitter_rms_ps: Optional[float] = None
    rise_time_ps: Optional[float] = None
    fall_time_ps: Optional[float] = None
    n_transitions: int = 0
    # Spec violations
    spec_violations: int = 0  # settled samples where |Vdiff| < HS_VDIFF_MIN_MV
    warnings: list = field(default_factory=list)

    def summary(self) -> str:
        """Return a multi-line, human-readable report for this capture.

        Each spec-checked value is followed by a pass/fail mark; optional
        timing lines are emitted only when the corresponding field is set.
        """

        def mark(passed: bool) -> str:
            # Same glyphs the other report lines in this module use; the
            # previous lambda returned "" for both outcomes, hiding the verdict.
            return "✓" if passed else "✗"

        amplitude_ok = HS_VDIFF_MIN_MV <= self.vdiff_amplitude_mv <= HS_VDIFF_MAX_MV
        lines = [
            f"Capture {self.capture_num:04d} {self.timestamp} [{self.file_type}/{self.channel}]",
            f" Vdiff amplitude : {self.vdiff_amplitude_mv:6.1f} mV "
            # Quote the limits from the constants so the text cannot drift
            # away from the values actually being checked.
            f"(spec {HS_VDIFF_MIN_MV:.0f}-{HS_VDIFF_MAX_MV:.0f} mV) {mark(amplitude_ok)}",
            f" Vdiff pos/neg : +{self.vdiff_pos_mv:.1f} / {self.vdiff_neg_mv:.1f} mV",
            f" Common mode : {self.vcm_mv:+.1f} mV",
        ]
        if self.clock_freq_mhz is not None:
            lines.append(
                f" Clock freq : {self.clock_freq_mhz:.2f} MHz DDR "
                f"({self.n_transitions} transitions)"
            )
        if self.jitter_pp_ps is not None:
            lines.append(
                f" Jitter p-p/RMS : {self.jitter_pp_ps:.1f} ps / {self.jitter_rms_ps:.1f} ps"
            )
        if self.rise_time_ps is not None:
            lines.append(
                f" Rise time 20-80%: {self.rise_time_ps:.1f} ps "
                f"{mark(self.rise_time_ps <= RISE_FALL_MAX_PS)}"
            )
        if self.fall_time_ps is not None:
            lines.append(
                f" Fall time 20-80%: {self.fall_time_ps:.1f} ps "
                f"{mark(self.fall_time_ps <= RISE_FALL_MAX_PS)}"
            )
        if self.spec_violations:
            lines.append(f" Spec violations : {self.spec_violations} samples below {HS_VDIFF_MIN_MV:.0f} mV ✗")
        for w in self.warnings:
            lines.append(f" WARNING: {w}")
        return "\n".join(lines)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _read_csv(path: Path) -> tuple[np.ndarray, np.ndarray]:
times, volts = [], []
with open(path) as f:
for row in csv.reader(f):
if len(row) >= 2:
try:
times.append(float(row[0]))
volts.append(float(row[1]))
except ValueError:
pass # skip any header row
t = np.array(times, dtype=np.float64)
v = np.array(volts, dtype=np.float64)
if len(t) < 2:
raise ValueError(f"Insufficient samples in {path.name} ({len(t)} rows parsed)")
return t, v
def _zero_crossings(times: np.ndarray, volts: np.ndarray) -> np.ndarray:
"""Return array of linearly-interpolated zero-crossing times (seconds)."""
signs = np.sign(volts)
change = np.diff(signs)
idx = np.where(change != 0)[0]
ct = []
for i in idx:
if signs[i] != 0 and signs[i + 1] != 0:
frac = -volts[i] / (volts[i + 1] - volts[i])
ct.append(times[i] + frac * (times[i + 1] - times[i]))
return np.array(ct)
def _rise_fall_times(times: np.ndarray, volts: np.ndarray,
v_high: float, v_low: float,
window_samples: int = 60) -> tuple[list, list]:
"""
Measure 20%80% rise and fall times around each zero crossing.
Returns (rise_times_ps, fall_times_ps).
"""
v20 = v_low + 0.20 * (v_high - v_low)
v80 = v_low + 0.80 * (v_high - v_low)
signs = np.sign(volts)
trans_idx = np.where(np.diff(signs) != 0)[0]
rise_ps, fall_ps = [], []
for idx in trans_idx:
s = max(0, idx - window_samples // 2)
e = min(len(times), idx + window_samples // 2)
tw = times[s:e]
vw = volts[s:e]
if len(vw) < 4:
continue
if volts[min(idx + 1, len(volts) - 1)] > volts[idx]: # rising edge
# find where vw first crosses v20 (ascending) then v80
i20 = np.searchsorted(vw, v20)
i80 = np.searchsorted(vw, v80)
if 0 < i20 < len(tw) - 1 and 0 < i80 < len(tw) - 1 and i80 > i20:
# interpolate each threshold
t20 = np.interp(v20, vw[i20 - 1:i20 + 1], tw[i20 - 1:i20 + 1])
t80 = np.interp(v80, vw[i80 - 1:i80 + 1], tw[i80 - 1:i80 + 1])
rise_ps.append((t80 - t20) * 1e12)
else: # falling edge
# descending: reverse the window so searchsorted still works
vw_r = vw[::-1]
tw_r = tw[::-1]
i80 = np.searchsorted(vw_r, v80)
i20 = np.searchsorted(vw_r, v20)
if 0 < i80 < len(tw_r) - 1 and 0 < i20 < len(tw_r) - 1 and i20 > i80:
t80 = np.interp(v80, vw_r[i80 - 1:i80 + 1], tw_r[i80 - 1:i80 + 1])
t20 = np.interp(v20, vw_r[i20 - 1:i20 + 1], tw_r[i20 - 1:i20 + 1])
fall_ps.append((t20 - t80) * 1e12)
return rise_ps, fall_ps
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def analyze_file(path: Path) -> ChannelMetrics:
    """
    Analyse one oscilloscope CSV file and return a ChannelMetrics instance.

    The file name must follow ``YYYYMMDD_HHMMSS_{sig|proto}_{NNNN}_{clk|dat}.csv``
    (matched case-insensitively; the type and channel are stored lower-case).

    Raises:
        ValueError: if the name does not match, if it is an ``lp`` capture
            (use analyze_lp_file() for those), or if the CSV holds fewer than
            two usable rows.
    """
    m = re.match(r"(\d{8}_\d{6})_(sig|proto|lp)_(\d+)_(clk|dat)\.csv",
                 path.name, re.IGNORECASE)
    if not m:
        raise ValueError(f"Filename does not match expected pattern: {path.name}")
    timestamp, file_type, cap_str, channel = m.groups()
    # The pattern is case-insensitive, so normalise before the string
    # comparisons below; otherwise "..._LP_..." slips past the lp check and
    # "..._CLK.csv" never gets frequency/jitter measured.
    file_type = file_type.lower()
    channel = channel.lower()
    if file_type == "lp":
        raise ValueError("Use analyze_lp_file() for lp-type files (single-ended)")
    capture_num = int(cap_str)
    times, volts = _read_csv(path)
    dt = float(np.diff(times).mean())
    sample_rate = 1.0 / dt
    duration_ns = (float(times[-1]) - float(times[0])) * 1e9

    # --- Voltage levels ---
    v_thresh = TRANSITION_BAND_MV / 1000.0
    pos_mask = volts > v_thresh
    neg_mask = volts < -v_thresh
    vdiff_pos = float(volts[pos_mask].mean()) * 1000.0 if pos_mask.any() else 0.0
    vdiff_neg = float(volts[neg_mask].mean()) * 1000.0 if neg_mask.any() else 0.0
    # Classify signal coverage:
    #   no_signal — neither polarity detected (LP state or idle)
    #   one_sided — only one polarity in capture window (short sig window, uniform data)
    no_signal = (not pos_mask.any()) and (not neg_mask.any())
    one_sided = (not no_signal) and ((not pos_mask.any()) or (not neg_mask.any()))
    if no_signal:
        amplitude = 0.0
    elif one_sided:
        amplitude = max(abs(vdiff_pos), abs(vdiff_neg))
    else:
        amplitude = (abs(vdiff_pos) + abs(vdiff_neg)) / 2.0
    vcm = (vdiff_pos + vdiff_neg) / 2.0

    # --- Zero crossings → frequency + jitter (CLK only) ---
    ct = _zero_crossings(times, volts)
    n_transitions = len(ct)
    clock_freq_mhz = jitter_pp_ps = jitter_rms_ps = None
    # Jitter / frequency are only meaningful on the CLK lane.
    # On DAT the bit pattern varies, so half-periods are not uniform by design.
    # Require at least 20 transitions (10 full cycles) for reliable jitter.
    # Sig files (~8 transitions) are too short; proto files (~4000) are fine.
    if channel == "clk" and n_transitions >= 20:
        half_periods = np.diff(ct) * 1e12  # ps
        med = float(np.median(half_periods))
        sd = float(half_periods.std())
        # Remove outliers beyond 3σ (spurious glitches)
        hp = half_periods[np.abs(half_periods - med) < 3.0 * sd] if sd > 0 else half_periods
        if len(hp) >= 20:
            # Two half-periods per clock cycle, hence the factor 2.
            clock_freq_mhz = round(1.0 / (float(np.median(hp)) * 2e-12) / 1e6, 2)
            jitter_pp_ps = round(float(hp.max() - hp.min()), 1)
            jitter_rms_ps = round(float(hp.std()), 1)

    # --- Rise / fall times ---
    v_high = vdiff_pos / 1000.0
    v_low = vdiff_neg / 1000.0
    rise_list, fall_list = _rise_fall_times(times, volts, v_high, v_low)
    rise_time_ps = round(float(np.median(rise_list)), 1) if rise_list else None
    fall_time_ps = round(float(np.median(fall_list)), 1) if fall_list else None

    # --- Spec violations ---
    # Only check samples that are well away from any zero crossing (bit-centres).
    # Transitions naturally pass through sub-140 mV, so counting them as violations
    # would be misleading. We mask out a ±guard window around each crossing.
    guard_s = float(np.median(np.diff(ct))) * 0.35 if n_transitions >= 4 else dt * 10
    in_guard = np.zeros(len(times), dtype=bool)
    for t_cross in ct:
        lo = np.searchsorted(times, t_cross - guard_s)
        hi = np.searchsorted(times, t_cross + guard_s)
        in_guard[lo:hi] = True
    settled = (~in_guard) & (np.abs(volts) > v_thresh)
    # "Transient" violations: settled samples that dip noticeably below the
    # measured settled amplitude (threshold = 85 % of the smaller settled level).
    # This catches genuine dips without flagging cases where the settled level
    # itself is just marginally below spec (which is reported as a WARNING instead).
    floor_v = 0.85 * min(abs(vdiff_pos / 1000.0), abs(vdiff_neg / 1000.0)) if (
        vdiff_pos and vdiff_neg) else HS_VDIFF_MIN_MV / 1000.0
    spec_violations = int(np.sum(settled & (np.abs(volts) < floor_v)))

    # --- Warnings ---
    warnings = []
    if no_signal:
        warnings.append("No HS signal detected — line may be in LP state or idle")
    elif one_sided:
        polarity = "positive" if pos_mask.any() else "negative"
        warnings.append(
            f"Only {polarity} swings in capture window — amplitude may be underestimated"
        )
    if not no_signal and amplitude < HS_VDIFF_MIN_MV:
        warnings.append(f"Vdiff {amplitude:.0f} mV below spec min {HS_VDIFF_MIN_MV:.0f} mV")
    if amplitude > HS_VDIFF_MAX_MV:
        warnings.append(f"Vdiff {amplitude:.0f} mV above spec max {HS_VDIFF_MAX_MV:.0f} mV")
    if rise_time_ps is not None and rise_time_ps > RISE_FALL_MAX_PS:
        warnings.append(f"Rise time {rise_time_ps:.0f} ps exceeds {RISE_FALL_MAX_PS:.0f} ps")
    if fall_time_ps is not None and fall_time_ps > RISE_FALL_MAX_PS:
        warnings.append(f"Fall time {fall_time_ps:.0f} ps exceeds {RISE_FALL_MAX_PS:.0f} ps")
    if spec_violations > 0:
        # NOTE(review): the count uses floor_v (85 % of the settled level when
        # both polarities are present), while this text quotes the spec minimum.
        warnings.append(f"{spec_violations} settled samples below {HS_VDIFF_MIN_MV:.0f} mV")

    return ChannelMetrics(
        timestamp=timestamp,
        capture_num=capture_num,
        file_type=file_type,
        channel=channel,
        sample_rate_gsps=round(sample_rate / 1e9, 1),
        duration_ns=round(duration_ns, 2),
        n_samples=len(times),
        vdiff_pos_mv=round(vdiff_pos, 1),
        vdiff_neg_mv=round(vdiff_neg, 1),
        vdiff_amplitude_mv=round(amplitude, 1),
        vcm_mv=round(vcm, 1),
        clock_freq_mhz=clock_freq_mhz,
        jitter_pp_ps=jitter_pp_ps,
        jitter_rms_ps=jitter_rms_ps,
        rise_time_ps=rise_time_ps,
        fall_time_ps=fall_time_ps,
        n_transitions=n_transitions,
        spec_violations=spec_violations,
        warnings=warnings,
    )
@dataclass
class V1V8Metrics:
    """Metrics for one 1.8 V supply-rail capture (a ``pwr`` / ``1v8`` file)."""

    timestamp: str
    capture_num: int
    sample_rate_mhz: float
    duration_us: float
    n_samples: int
    mean_v: float  # mean supply voltage
    min_v: float  # minimum (worst-case droop)
    max_v: float  # maximum
    droop_mv: float  # mean - min (droop depth)
    ripple_mv_rms: float  # AC ripple (std dev of voltage)
    spec_pass: bool  # mean within ±5 % of 1.8 V
    droop_pass: bool  # minimum above V18_SPEC_MIN_V
    warnings: list = field(default_factory=list)

    def summary(self) -> str:
        """Return a multi-line, human-readable report for this capture."""

        def mark(passed: bool) -> str:
            # The previous lambda returned "" for both outcomes, so the
            # report never showed whether a check passed.
            return "✓" if passed else "✗"

        lines = [
            f"Capture {self.capture_num:04d} {self.timestamp} [pwr/1v8]",
            f" Mean voltage : {self.mean_v:.4f} V "
            # Separator between the two limits was missing ("1.711.89 V").
            f"(spec {V18_SPEC_MIN_V:.2f}-{V18_SPEC_MAX_V:.2f} V) {mark(self.spec_pass)}",
            f" Min voltage : {self.min_v:.4f} V {mark(self.droop_pass)}",
            f" Droop depth : {self.droop_mv:.1f} mV",
            f" Ripple RMS : {self.ripple_mv_rms:.2f} mV",
        ]
        for w in self.warnings:
            lines.append(f" WARNING: {w}")
        return "\n".join(lines)
def analyze_1v8_file(path: Path) -> "V1V8Metrics":
    """Analyse a 1.8 V supply rail CSV captured by the Rigol DS1202Z-E.

    The file name must follow ``YYYYMMDD_HHMMSS_pwr_{NNNN}_1v8.csv``.
    Mean, minimum and maximum voltage, droop depth (mean - min) and RMS
    ripple (standard deviation) are computed over the whole capture and
    compared against the V18_* limits.

    Raises:
        ValueError: if the name does not match or the CSV holds fewer than
            two usable rows.
    """
    m = re.match(r"(\d{8}_\d{6})_pwr_(\d+)_1v8\.csv", path.name, re.IGNORECASE)
    if not m:
        raise ValueError(f"Filename does not match 1v8 pattern: {path.name}")
    timestamp, cap_str = m.groups()
    capture_num = int(cap_str)
    times, volts = _read_csv(path)
    dt = float(np.diff(times).mean())
    sample_rate = 1.0 / dt
    duration_us = (float(times[-1]) - float(times[0])) * 1e6

    mean_v = float(volts.mean())
    min_v = float(volts.min())
    max_v = float(volts.max())
    droop_mv = (mean_v - min_v) * 1000.0
    ripple_mv_rms = float(volts.std()) * 1000.0
    spec_pass = V18_SPEC_MIN_V <= mean_v <= V18_SPEC_MAX_V
    droop_pass = min_v >= V18_SPEC_MIN_V

    warnings = []
    if not spec_pass:
        warnings.append(
            f"Mean supply {mean_v:.4f} V outside spec "
            # Separator between the two limits was missing ("1.711.89 V").
            f"({V18_SPEC_MIN_V:.2f}-{V18_SPEC_MAX_V:.2f} V)"
        )
    if not droop_pass:
        warnings.append(
            f"Supply droops to {min_v:.4f} V — below {V18_SPEC_MIN_V:.2f} V spec min"
        )
    if droop_mv > V18_DROOP_WARN_MV:
        warnings.append(
            f"Droop depth {droop_mv:.1f} mV — possible insufficient decoupling near MIPI PHY"
        )
    if ripple_mv_rms > V18_RIPPLE_WARN_MV:
        warnings.append(f"Ripple {ripple_mv_rms:.1f} mV RMS is elevated")

    return V1V8Metrics(
        timestamp=timestamp,
        capture_num=capture_num,
        sample_rate_mhz=round(sample_rate / 1e6, 1),
        duration_us=round(duration_us, 2),
        n_samples=len(times),
        mean_v=round(mean_v, 4),
        min_v=round(min_v, 4),
        max_v=round(max_v, 4),
        droop_mv=round(droop_mv, 1),
        ripple_mv_rms=round(ripple_mv_rms, 2),
        spec_pass=spec_pass,
        droop_pass=droop_pass,
        warnings=warnings,
    )
# ---------------------------------------------------------------------------
# DSIM PHY timing register decoder (D-PHY v1.1 Table 14 @ 432 Mbit/s, 54 MHz byte clock)
# ---------------------------------------------------------------------------
# Byte-clock period used to convert register fields (in byte-clock units) to nanoseconds.
# 54 MHz byte clock → 18.518 ns per byte clock.
_DSIM_BYTE_PERIOD_NS = 18.518
# Per-field decode table. Key = lowest 2 hex digits of register address.
# Each entry: (field_name, bit_shift, byte_mask, spec)
# The decoder extracts a field as (register_value >> bit_shift) & byte_mask and
# multiplies the result by _DSIM_BYTE_PERIOD_NS to obtain nanoseconds.
# spec = ("min", ns) — field_ns must be ≥ ns
# ("range", lo, hi) — field_ns must be lo ≤ x ≤ hi
# None — not individually checked (part of a combined check only)
_DSIM_PHY_FIELDS: dict[str, list] = {
"b4": [ # PHYTIMING 0x32e100b4
("TLPX", 8, 0xFF, ("min", 50.0)),
("THS_EXIT", 0, 0xFF, ("min", 100.0)),
],
"b8": [ # PHYTIMING1 0x32e100b8
("TCLK_PREPARE", 24, 0xFF, ("range", 38.0, 95.0)),
("TCLK_ZERO", 16, 0xFF, None), # combined with TCLK_PREPARE ≥ 300 ns
("TCLK_POST", 8, 0xFF, ("min", 180.4)),
("TCLK_TRAIL", 0, 0xFF, ("min", 60.0)),
],
"bc": [ # PHYTIMING2 0x32e100bc
# Field order verified against kernel logs (samsung_dsim_set_phy_ctrl):
# [23:16]=THS_PREPARE, [15:8]=THS_ZERO, [7:0]=THS_TRAIL
("THS_PREPARE", 16, 0xFF, ("range", 49.3, 98.9)),
("THS_ZERO", 8, 0xFF, None), # combined with THS_PREPARE ≥ 168.2 ns
("THS_TRAIL", 0, 0xFF, ("min", 69.3)),
],
}
# Combined (sum) checks applied after individual field decoding.
# (field_a, field_b, min_ns, label)
# A check is evaluated only when both named fields were decoded; it passes when
# field_a + field_b (in ns) is at least min_ns.
_DSIM_COMBINED_CHECKS = [
("TCLK_PREPARE", "TCLK_ZERO", 300.0, "TCLK_PREPARE+TCLK_ZERO"),
("THS_PREPARE", "THS_ZERO", 168.2, "THS_PREPARE+THS_ZERO"),
]
def _decode_dsim_registers(registers: list) -> list[str]:
    """
    Decode DSIM PHY timing registers and return a list of annotated strings,
    one per field, with D-PHY v1.1 spec compliance check results.

    ``registers`` is a list of dicts with "address" and "value" hex strings
    and an optional "name".  Registers whose address (last two hex digits) is
    not in _DSIM_PHY_FIELDS are ignored; a value that is not valid hex yields
    a "(parse error)" line.  Combined (sum) checks are appended after all
    registers, for every pair whose two fields were both decoded.
    """

    def mark(passed: bool) -> str:
        return "✓" if passed else "✗ VIOLATION"

    lines = []
    field_ns: dict[str, float] = {}
    for reg in registers:
        # removeprefix drops exactly one leading "0x".  str.lstrip("0x") strips
        # any run of '0' and 'x' characters, which also ate leading zeros of
        # the address itself (e.g. "0x00b4" became "b4").
        addr_str = reg.get("address", "").lower().removeprefix("0x")
        val_str = reg.get("value", "0x0").lower()
        suffix = addr_str[-2:] if len(addr_str) >= 2 else ""
        fields = _DSIM_PHY_FIELDS.get(suffix)
        if fields is None:
            continue  # register not in our decode table
        try:
            val = int(val_str, 16)
        except ValueError:
            lines.append(f" {reg.get('address')} : {reg.get('value')} (parse error)")
            continue
        reg_name = reg.get("name") or f"0x{addr_str}"
        lines.append(f" {reg.get('address')} ({reg_name}) = {val_str}")
        for (fname, shift, mask, spec) in fields:
            raw = (val >> shift) & mask
            ns = raw * _DSIM_BYTE_PERIOD_NS  # byte clocks → nanoseconds
            field_ns[fname] = ns
            if spec is None:
                # shown in combined check only
                lines.append(f" {fname:<16s} = {raw:3d} bc → {ns:6.1f} ns (combined check below)")
            elif spec[0] == "min":
                pass_check = ns >= spec[1]
                lines.append(
                    f" {fname:<16s} = {raw:3d} bc → {ns:6.1f} ns "
                    f"(spec ≥ {spec[1]:.1f} ns) {mark(pass_check)}"
                )
            elif spec[0] == "range":
                pass_check = spec[1] <= ns <= spec[2]
                lines.append(
                    f" {fname:<16s} = {raw:3d} bc → {ns:6.1f} ns "
                    # Separator between the limits was missing ("38.095.0 ns").
                    f"(spec {spec[1]:.1f}-{spec[2]:.1f} ns) {mark(pass_check)}"
                )
    # Combined sum checks
    for (fa, fb, min_ns, label) in _DSIM_COMBINED_CHECKS:
        if fa in field_ns and fb in field_ns:
            total = field_ns[fa] + field_ns[fb]
            pass_check = total >= min_ns
            lines.append(
                f" {label:<28s} = {total:6.1f} ns (spec ≥ {min_ns:.1f} ns) {mark(pass_check)}"
            )
    return lines
@dataclass
class RegDump:
    """Snapshot of DSI controller registers fetched from the device with memtool."""

    timestamp: str
    capture_num: int
    commands: list  # memtool command strings that were executed
    registers: list  # [{"address": "0x...", "value": "0x...", "name": "..."}, ...]
    errors: list  # errors reported on the device side, if any

    def summary(self) -> str:
        """Return a multi-line report: header, errors, then decoded registers.

        When no register matches the decode table, every register is listed
        as a raw ``address : value`` line instead.
        """
        report = [f"Capture {self.capture_num:04d} {self.timestamp} [reg/dsi_phy]"]
        report.extend(f" WARNING: {problem}" for problem in (self.errors or ()))

        if self.registers:
            report.append(f" Commands : {'; '.join(self.commands)}")
            annotated = _decode_dsim_registers(self.registers)
            if not annotated:
                # Nothing matched the decode table: fall back to a raw hex dump.
                for entry in self.registers:
                    label = f" ({entry['name']})" if entry.get("name") else ""
                    annotated.append(f" {entry['address']} : {entry['value']}{label}")
            report.extend(annotated)
        else:
            report.append(" No registers captured")

        return "\n".join(report)
def analyze_reg_file(path: Path) -> "RegDump":
    """Read a register JSON file saved by mipi_test._fetch_registers().

    The file name must follow ``YYYYMMDD_HHMMSS_reg_{NNNN}.json`` and the
    document root must be a JSON object; its "commands", "registers" and
    "errors" entries are copied into the returned RegDump.  A missing or
    ``null`` entry becomes an empty list.

    Raises:
        ValueError: if the name does not match, the content is not valid
            JSON, or the root is not an object.
    """
    m = re.match(r"(\d{8}_\d{6})_reg_(\d+)\.json", path.name, re.IGNORECASE)
    if not m:
        raise ValueError(f"Filename does not match register pattern: {path.name}")
    timestamp, cap_str = m.groups()
    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    data = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError(f"Register file {path.name} does not contain a JSON object")
    return RegDump(
        timestamp=timestamp,
        capture_num=int(cap_str),
        # "or []" also covers an explicit null, which would otherwise break
        # RegDump.summary() when it joins the command list.
        commands=data.get("commands") or [],
        registers=data.get("registers") or [],
        errors=data.get("errors") or [],
    )
def group_captures(data_dir: Path) -> dict[tuple[str, int], dict[str, Path]]:
    """
    Scan data_dir and group capture files by (timestamp, capture_number).

    CSV captures are stored under a lower-case "{file_type}_{channel}" key and
    register dumps (``*_reg_NNNN.json``) under "reg".  Files whose names do not
    match either naming pattern are ignored.

    Returns dict mapping (timestamp, num) → {file_type_channel: Path}.
    Example key: ("20260408_111448", 1)
    Example value: {"sig_clk": Path(...), "sig_dat": ..., "proto_clk": ..., "proto_dat": ...}
    """
    csv_pattern = re.compile(
        r"(\d{8}_\d{6})_(sig|proto|lp|pwr)_(\d+)_(clk|dat|1v8)\.csv", re.IGNORECASE
    )
    reg_pattern = re.compile(
        r"(\d{8}_\d{6})_reg_(\d+)\.json", re.IGNORECASE
    )
    groups: dict[tuple[str, int], dict[str, Path]] = {}
    for f in sorted(data_dir.glob("*.csv")):
        m = csv_pattern.match(f.name)
        if not m:
            continue
        ts, ftype, cap_str, ch = m.groups()
        # The match is case-insensitive; lower-case the key parts so callers
        # can always look up "sig_clk" etc. regardless of the file's spelling.
        slot = f"{ftype.lower()}_{ch.lower()}"
        groups.setdefault((ts, int(cap_str)), {})[slot] = f
    for f in sorted(data_dir.glob("*.json")):
        m = reg_pattern.match(f.name)
        if not m:
            continue
        ts, cap_str = m.groups()
        groups.setdefault((ts, int(cap_str)), {})["reg"] = f
    return groups
# ---------------------------------------------------------------------------
# LP state analysis (lp_clk / lp_dat — single-ended Ch1 / Ch3 captures)
# ---------------------------------------------------------------------------
@dataclass
class LPMetrics:
    """Metrics for one single-ended LP-state capture (an ``lp`` file)."""

    timestamp: str
    capture_num: int
    channel: str  # "clk" | "dat"
    sample_rate_gsps: float
    duration_us: float
    n_samples: int
    # LP-11 (both pins high ~1.2 V)
    lp11_voltage_v: Optional[float]  # mean level in LP-11 region (spec 1.0-1.45 V)
    lp11_duration_us: Optional[float]  # total LP-11 time in capture (pre-trigger)
    # LP exit: gap between LP-11 falling edge and HS oscillation onset
    lp11_to_hs_ns: Optional[float]  # total LP exit time LP-11→HS (includes LP-01+LP-00)
    lp_low_duration_ns: Optional[float]  # LP-low plateau duration if a clear plateau was seen
    # HS bursts detected within the window
    n_hs_bursts: int
    hs_burst_dur_ns: Optional[float]  # mean HS burst duration
    hs_amplitude_mv: Optional[float]  # single-ended HS swing, reported as p-p/2 (mV)
    lp_transition_valid: bool  # LP-11 → LP-low → HS sequence present
    # Flicker detection
    # A capture is flagged when the LP-low plateau is absent or shorter than
    # FLICKER_LP_LOW_MAX_NS. Normal captures show ~340 ns; flicker shows 0-50 ns.
    flicker_suspect: bool = False
    warnings: list = field(default_factory=list)

    def summary(self) -> str:
        """Return a multi-line, human-readable report for this capture.

        Optional measurements are printed only when present.  A flagged
        capture gets one "FLICKER SUSPECT" line naming which signature
        (missing HS burst, or missing/short LP-low plateau) was matched.
        """

        def mark(passed: bool) -> str:
            # The previous lambda returned "" for both outcomes, so the
            # report never showed whether a check passed.
            return "✓" if passed else "✗"

        lines = [
            f"Capture {self.capture_num:04d} {self.timestamp} [lp/{self.channel}]",
        ]
        if self.lp11_voltage_v is not None:
            in_spec = LP11_SPEC_MIN_V <= self.lp11_voltage_v <= LP11_SPEC_MAX_V
            lines.append(
                f" LP-11 voltage : {self.lp11_voltage_v:.3f} V "
                # Separator between the two limits was missing ("1.01.45 V").
                f"(spec {LP11_SPEC_MIN_V:.1f}-{LP11_SPEC_MAX_V:.2f} V) {mark(in_spec)}"
            )
        if self.lp11_duration_us is not None:
            lines.append(f" LP-11 duration : {self.lp11_duration_us:.2f} µs")
        if self.lp11_to_hs_ns is not None:
            ok_exit = self.lp11_to_hs_ns >= LP_LOW_DUR_MIN_NS
            lines.append(
                f" LP exit → HS : {self.lp11_to_hs_ns:.0f} ns "
                f"(spec ≥{LP_LOW_DUR_MIN_NS:.0f} ns) {mark(ok_exit)}"
            )
        if self.lp_low_duration_ns is not None:
            lines.append(f" LP-low plateau : {self.lp_low_duration_ns:.0f} ns")
        lines.append(
            f" LP→HS sequence : {'valid ✓' if self.lp_transition_valid else 'NOT DETECTED ✗'}"
        )
        if self.n_hs_bursts:
            lines.append(f" HS bursts : {self.n_hs_bursts}"
                         + (f" avg {self.hs_burst_dur_ns:.0f} ns" if self.hs_burst_dur_ns else ""))
        if self.hs_amplitude_mv is not None:
            lines.append(f" HS amplitude : {self.hs_amplitude_mv:.0f} mV (single-ended p-p/2)")
        if self.flicker_suspect:
            # Low amplitude after a normal-length LP exit means the HS burst
            # itself is missing; otherwise the LP-low plateau is the culprit.
            burst_absent = (
                self.hs_amplitude_mv is not None
                and self.hs_amplitude_mv < HS_BURST_AMPLITUDE_MIN_MV
                and self.lp11_to_hs_ns is not None
                and self.lp11_to_hs_ns >= LP_LOW_DUR_MIN_NS
            )
            if burst_absent:
                lines.append(
                    f" *** FLICKER SUSPECT: HS burst absent "
                    f"(amplitude {self.hs_amplitude_mv:.0f} mV < {HS_BURST_AMPLITUDE_MIN_MV:.0f} mV, "
                    f"lp11_to_hs={self.lp11_to_hs_ns:.0f} ns) ***"
                )
            else:
                lines.append(
                    f" *** FLICKER SUSPECT: LP-low plateau absent or < {FLICKER_LP_LOW_MAX_NS:.0f} ns ***"
                )
        for w in self.warnings:
            lines.append(f" WARNING: {w}")
        return "\n".join(lines)
def _rolling_std(arr: np.ndarray, window: int) -> np.ndarray:
"""Compute rolling standard deviation using stride_tricks (O(n) memory, fast)."""
from numpy.lib.stride_tricks import sliding_window_view
n = len(arr)
if n <= window:
return np.full(n, arr.std())
windowed = sliding_window_view(arr, window)
stds = windowed.std(axis=1)
# Pad edges to maintain original length
pad_l = window // 2
pad_r = n - len(stds) - pad_l
return np.concatenate([np.full(pad_l, stds[0]), stds, np.full(pad_r, stds[-1])])
def _find_contiguous_regions(mask: np.ndarray, min_samples: int = 5):
"""Return list of (start_idx, end_idx) for True runs ≥ min_samples long."""
padded = np.concatenate([[False], mask, [False]])
diff = np.diff(padded.astype(np.int8))
starts = np.where(diff == 1)[0]
ends = np.where(diff == -1)[0]
return [(s, e) for s, e in zip(starts, ends) if (e - s) >= min_samples]
def analyze_lp_file(path: Path) -> "LPMetrics":
    """
    Analyse a single-ended LP capture CSV (Ch1 or Ch3) and return LPMetrics.

    The file name must follow ``YYYYMMDD_HHMMSS_lp_{NNNN}_{clk|dat}.csv``
    (matched case-insensitively; the channel is stored lower-case).

    How the capture is interpreted:
        LP-11  : runs of >= 10 samples with voltage > LP11_HIGH_V
        HS     : every non-empty gap that follows an LP-11 run (up to the
                 next LP-11 run or the end of the capture)
        LP-low : runs with voltage < LP_LOW_V and rolling std < HS_OSC_STD_V,
                 lasting at least 5 ns, starting within 1 µs after the first
                 LP-11 run ends
        LP exit: time from the end of the first LP-11 run to the first sample
                 (within 500 ns) whose rolling std reaches HS_OSC_STD_V

    Raises:
        ValueError: if the name does not match or the CSV holds fewer than
            two usable rows.
    """
    m = re.match(r"(\d{8}_\d{6})_lp_(\d+)_(clk|dat)\.csv", path.name, re.IGNORECASE)
    if not m:
        raise ValueError(f"Filename does not match lp pattern: {path.name}")
    timestamp, cap_str, channel = m.groups()
    # The pattern is case-insensitive; normalise so the "clk"/"dat" comparisons
    # below also work for upper-case file names.
    channel = channel.lower()
    capture_num = int(cap_str)
    times, volts = _read_csv(path)
    dt = float(np.diff(times).mean())
    sample_rate = 1.0 / dt
    duration_us = (float(times[-1]) - float(times[0])) * 1e6
    last_idx = len(times) - 1

    # ── LP-11 detection ───────────────────────────────────────────────────
    # LP-11 is reliable: voltage is clearly above LP11_HIGH_V (0.8 V).
    lp11_mask = volts > LP11_HIGH_V
    lp11_regions = _find_contiguous_regions(lp11_mask, min_samples=10)
    lp11_voltage_v = None
    lp11_duration_us = None
    if lp11_regions:
        lp11_voltage_v = round(float(np.concatenate(
            [volts[s:e] for s, e in lp11_regions]).mean()), 3)
        # Region ends are exclusive and may equal len(times); clamp for indexing.
        lp11_duration_us = round(
            sum((times[min(e, last_idx)] - times[s])
                for s, e in lp11_regions) * 1e6, 3)

    # ── HS burst detection ────────────────────────────────────────────────
    # On DAT0+ with a uniform-colour display, HS data can look DC (no bit
    # transitions), making oscillation-based HS detection unreliable.
    # Instead: every non-LP-11 gap between LP-11 regions is treated as an
    # HS burst. The first gap starts at the end of the first LP-11 region;
    # subsequent gaps are between consecutive LP-11 regions.
    lp11_to_hs_ns = None
    lp_low_duration_ns = None
    lp_transition_valid = False
    n_hs_bursts = 0
    hs_burst_dur_ns = None
    hs_amplitude_mv = None
    if len(lp11_regions) >= 1:
        # Rolling std over ~1 ns (at least 10 samples) separates quiet LP
        # plateaus from oscillating regions.
        window = max(10, int(1e-9 / dt))
        rstd = _rolling_std(volts, window)

        hs_bursts = []
        for i, (_, lp11_e) in enumerate(lp11_regions):
            # Burst ends at start of next LP-11, or at window end
            burst_end = lp11_regions[i + 1][0] if i + 1 < len(lp11_regions) else last_idx
            lp11_e_idx = min(lp11_e, last_idx)  # guard: region end can == len(times)
            if burst_end <= lp11_e_idx:
                # LP-11 runs to the end of the capture: nothing follows it, so
                # there is no gap to count.  Recording it as a 0 ns burst would
                # inflate n_hs_bursts, drag the mean duration down and mark an
                # all-LP-11 capture as a valid LP→HS sequence.
                continue
            burst_dur_ns = round((times[burst_end] - times[lp11_e_idx]) * 1e9, 1)
            hs_bursts.append((lp11_e_idx, burst_end, burst_dur_ns))

        if hs_bursts:
            n_hs_bursts = len(hs_bursts)
            hs_burst_dur_ns = round(float(np.mean([d for _, _, d in hs_bursts])), 1)
            lp_transition_valid = True

            # LP exit gap: find first rolling-std > threshold after LP-11 ends
            s_end = lp11_regions[0][1]
            lookahead = min(s_end + int(500e-9 / dt), last_idx)
            high_std_idx = np.where(rstd[s_end:lookahead] >= HS_OSC_STD_V)[0]
            if len(high_std_idx):
                lp11_to_hs_ns = round((times[s_end + high_std_idx[0]] - times[s_end]) * 1e9, 1)

            # LP-low plateau: look for a contiguous region in the exit window
            # where voltage < LP_LOW_V and std is low (true LP-01/LP-00 plateau)
            lp_low_mask = (volts < LP_LOW_V) & (rstd < HS_OSC_STD_V)
            # Time-based minimum: reject glitches shorter than 5 ns.
            # At ~40 GSa/s (25 ps/sample) the old min_samples=5 admitted 125 ps noise spikes.
            _min_lp_low = max(5, int(5e-9 / dt))
            lp_low_regions = _find_contiguous_regions(lp_low_mask, min_samples=_min_lp_low)
            exit_window = int(1e-6 / dt)
            for lplow_s, lplow_e in lp_low_regions:
                if s_end <= lplow_s <= s_end + exit_window:
                    lp_low_duration_ns = round(
                        (times[min(lplow_e, last_idx)] - times[lplow_s]) * 1e9, 1)
                    break

            # HS single-ended amplitude from the first burst (where data may vary).
            # Half of the 5th-95th percentile spread, so isolated spikes are ignored.
            s, e, _ = hs_bursts[0]
            burst_volts = volts[s:e]
            if len(burst_volts) >= 2:
                hs_amplitude_mv = round(
                    (float(np.percentile(burst_volts, 95)) -
                     float(np.percentile(burst_volts, 5))) / 2 * 1000, 1
                )

    # ── Warnings ─────────────────────────────────────────────────────────
    warnings = []
    continuous_hs_clk = (not lp11_regions) and (channel == "clk") and (float(volts.max()) < LP11_HIGH_V)
    if continuous_hs_clk:
        warnings.append("CLK lane is in continuous HS mode — LP states not expected on CLK")
    elif not lp11_regions:
        warnings.append("No LP-11 state detected in capture window")
    elif lp11_voltage_v is not None:
        if lp11_voltage_v < LP11_SPEC_MIN_V:
            warnings.append(f"LP-11 voltage {lp11_voltage_v:.3f} V below spec min {LP11_SPEC_MIN_V} V")
        if lp11_voltage_v > LP11_SPEC_MAX_V:
            warnings.append(f"LP-11 voltage {lp11_voltage_v:.3f} V above spec max {LP11_SPEC_MAX_V} V")
    if lp11_to_hs_ns is not None and lp11_to_hs_ns < LP_LOW_DUR_MIN_NS:
        warnings.append(
            f"LP exit duration {lp11_to_hs_ns:.0f} ns below spec min {LP_LOW_DUR_MIN_NS:.0f} ns "
            f"— LP-01/LP-00 states may be absent or too brief"
        )
    if not continuous_hs_clk:
        if not lp_transition_valid:
            warnings.append("LP-11 → LP-low → HS transition sequence not detected")
        if n_hs_bursts == 0:
            warnings.append("No HS bursts detected after LP transition")

    # Flicker suspect: either the LP-low plateau is absent/short, OR the HS burst
    # amplitude is too low. Two confirmed failure modes on this hardware:
    #
    # A) Normal LP-low (~342-380 ns) → bridge misses SoT → returns to LP-11
    #    Signature: lp11_to_hs fires at real LP-low end (~347 ns), hs_amplitude ≈ 15-30 mV.
    #    Guard: lp11_to_hs >= LP_LOW_DUR_MIN_NS prevents DC-content false positives
    #    where the ~3 ns noise spike fires the gate but HS IS present.
    #
    # B) Short LP-low (50-200 ns, vs nominal ~342-380 ns) → marginal SoT timing
    #    → HS burst starts but is weak, hs_amplitude ≈ 40-60 mV (vs normal 100-122 mV).
    #    Signature: lp_low anomalously short, lp11_to_hs fires at noise spike (~3 ns).
    #    The lp11_to_hs guard cannot be used here (noise spike looks the same as mode A
    #    false positives), so LP-low duration itself gates the amplitude check.
    #    Confirmed example: capture 0120 (lp_low=108 ns, lp11_to_hs=1.7 ns, amp=49 mV).
    #
    # Only flag DAT lane (CLK is continuous HS — LP states not expected).
    _lp_low_short = (
        lp_low_duration_ns is not None
        and lp_low_duration_ns < 200.0  # below this, LP-low is anomalously brief
    )
    hs_burst_absent = (
        hs_amplitude_mv is not None
        and hs_amplitude_mv < HS_BURST_AMPLITUDE_MIN_MV
        and (
            # Mode A: LP-low normal, HS never started (rolling-std confirms it)
            (lp11_to_hs_ns is not None and lp11_to_hs_ns >= LP_LOW_DUR_MIN_NS)
            # Mode B: LP-low anomalously short + low amplitude = marginal HS launch
            or _lp_low_short
        )
    )
    flicker_suspect = (
        channel == "dat"
        and lp_transition_valid
        and (
            (lp_low_duration_ns is None or lp_low_duration_ns < FLICKER_LP_LOW_MAX_NS)
            or hs_burst_absent
        )
    )

    return LPMetrics(
        timestamp=timestamp,
        capture_num=capture_num,
        channel=channel,
        sample_rate_gsps=round(sample_rate / 1e9, 1),
        duration_us=round(duration_us, 2),
        n_samples=len(times),
        lp11_voltage_v=lp11_voltage_v,
        lp11_duration_us=lp11_duration_us,
        lp11_to_hs_ns=lp11_to_hs_ns,
        lp_low_duration_ns=lp_low_duration_ns,
        n_hs_bursts=n_hs_bursts,
        hs_burst_dur_ns=hs_burst_dur_ns,
        hs_amplitude_mv=hs_amplitude_mv,
        lp_transition_valid=lp_transition_valid,
        flicker_suspect=flicker_suspect,
        warnings=warnings,
    )
if __name__ == "__main__":
    # Demo / command-line entry point: analyse the files given as arguments,
    # or the first 8 CSV files in ./data next to this script, and print each
    # summary.  A failure on one file is reported and the loop carries on.
    import sys

    data_dir = Path(__file__).parent / "data"
    if len(sys.argv) > 1:
        files = [Path(a) for a in sys.argv[1:]]
    else:
        files = sorted(data_dir.glob("*.csv"))[:8]  # first 8 files as demo
    for f in files:
        try:
            # Pick the analyser from the file-type tag in the name.  The
            # filename patterns are case-insensitive, so compare lower-cased.
            tag_source = f.name.lower()
            if "_lp_" in tag_source:
                result = analyze_lp_file(f)
            elif "_pwr_" in tag_source:
                # Supply-rail captures used to fall through to analyze_file()
                # and were always rejected by its filename check.
                result = analyze_1v8_file(f)
            elif "_reg_" in tag_source:
                result = analyze_reg_file(f)
            else:
                result = analyze_file(f)
            print(result.summary())
            print()
        except Exception as e:
            # Top-level boundary: report and continue with the next file.
            print(f"ERROR {f.name}: {e}")