Files
MiPi_Investigation/analysis/waveform.py
david rice 0edb95d7e1 Updates
2026-05-06 15:57:48 +01:00

402 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""D-PHY timing extraction and Lane 0 packet decode from scope waveforms.
All voltage thresholds in this module are POST-attenuation values (i.e. what
the scope sees after the 19.2× probe divider). Don't rescale them back to
wire voltages — the divider is calibrated and the thresholds were chosen
to give clean LP/HS state separation at probe output.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Optional
import numpy as np
import pandas as pd
from config import DPHY_SPEC
log = logging.getLogger(__name__)
# Post-attenuation thresholds (volts at scope input, after 19.2× divider).
# These are compared directly against the "voltage_v" samples by
# _classify_sample; they are used for both the clock and the data lane.
LP_HIGH_V = 0.040 # "above" → LP-1 (~770 mV on wire)
LP_LOW_V = 0.010 # "below" → LP-0 / HS-0 (~190 mV on wire)
HS_DIFF_V = 0.008 # |P - N| above this (with both legs below LP_HIGH_V) means HS burst is active
@dataclass
class LaneStateSpan:
    """One uninterrupted stretch during which a lane held a single state.

    Timestamps are in seconds and come straight from the capture's
    ``time_s`` axis.
    """

    # State label: "LP-11" | "LP-01" | "LP-10" | "LP-00" | "HS"
    state: str
    # Timestamp of the first sample observed in this state.
    t_start: float
    # Timestamp at which the span was closed.
    t_end: float

    @property
    def duration_ns(self) -> float:
        """Length of the span, converted from seconds to nanoseconds."""
        elapsed_s = self.t_end - self.t_start
        return elapsed_s * 1e9
# ---------------------------------------------------------------------------
# Signal reconstruction
# ---------------------------------------------------------------------------
def differential(lane_p: pd.DataFrame, lane_n: pd.DataFrame) -> pd.Series:
    """Return the sample-by-sample difference P - N of a lane pair.

    Args:
        lane_p: Positive-leg capture; only its "voltage_v" column is read.
        lane_n: Negative-leg capture; only its "voltage_v" column is read.

    Returns:
        A new Series with a fresh default index. The subtraction is done on
        the raw arrays, so the inputs are paired by position, not by index.
    """
    p_volts = lane_p["voltage_v"].values
    n_volts = lane_n["voltage_v"].values
    diff = p_volts - n_volts
    return pd.Series(diff)
def common_mode(lane_p: pd.DataFrame, lane_n: pd.DataFrame) -> pd.Series:
    """Return the sample-by-sample average (P + N) / 2 of a lane pair.

    Args:
        lane_p: Positive-leg capture; only its "voltage_v" column is read.
        lane_n: Negative-leg capture; only its "voltage_v" column is read.

    Returns:
        A new Series with a fresh default index. The inputs are paired by
        position, not by index.
    """
    p_volts = lane_p["voltage_v"].values
    n_volts = lane_n["voltage_v"].values
    total = p_volts + n_volts
    return pd.Series(total / 2.0)
# ---------------------------------------------------------------------------
# Lane state machine
# ---------------------------------------------------------------------------
def _classify_sample(vp: float, vn: float, vdiff: float) -> str:
    """Map one (P, N) voltage pair onto a D-PHY lane state label.

    The HS test runs first: both legs must sit below LP_HIGH_V while the
    differential magnitude exceeds HS_DIFF_V. Otherwise each leg is reduced
    to a logic level (above LP_HIGH_V is "1", below LP_LOW_V is "0") and the
    pair is looked up as an LP state.

    Note that a sample taken mid-way through an LP edge (one leg between the
    two LP thresholds, the other low) can also satisfy the HS test.

    Args:
        vp: Positive-leg voltage, post-attenuation.
        vn: Negative-leg voltage, post-attenuation.
        vdiff: Differential voltage for the same sample.

    Returns:
        "HS", "LP-11", "LP-01", "LP-10", "LP-00", or "TRANS" when either leg
        sits between the LP thresholds and the HS test did not match.
    """
    both_below_lp1 = vp < LP_HIGH_V and vn < LP_HIGH_V
    if both_below_lp1 and abs(vdiff) > HS_DIFF_V:
        return "HS"

    def level(volts):
        # "1"/"0" for a settled leg, None while it is between thresholds.
        if volts > LP_HIGH_V:
            return "1"
        if volts < LP_LOW_V:
            return "0"
        return None

    lp_states = {
        ("1", "1"): "LP-11",
        ("0", "1"): "LP-01",
        ("1", "0"): "LP-10",
        ("0", "0"): "LP-00",
    }
    # Any pair containing an unsettled leg falls through to "TRANS".
    return lp_states.get((level(vp), level(vn)), "TRANS")
def classify_lane(lane_p: pd.DataFrame, lane_n: pd.DataFrame) -> list[LaneStateSpan]:
    """Walk both single-ended traces and emit consecutive state spans.

    Every sample is classified with ``_classify_sample``. Samples labelled
    "TRANS" are skipped, so a run of one settled state that is interrupted
    only by "TRANS" samples is reported as a single span.

    Span boundaries: a span is closed at the timestamp of the first sample
    of the *next* settled state, so time spent in skipped "TRANS" samples is
    counted in the preceding span. The final span is closed at the last
    timestamp of the capture.

    Args:
        lane_p: Positive-leg capture with "time_s" and "voltage_v" columns.
        lane_n: Negative-leg capture; only "voltage_v" is read, and it is
            paired with ``lane_p`` by position.

    Returns:
        Spans in time order. Empty when the capture has no samples or no
        sample settles into a recognised state.
    """
    t = lane_p["time_s"].values
    vp = lane_p["voltage_v"].values
    vn = lane_n["voltage_v"].values

    # An empty capture has no t[0]; report "no spans" instead of raising.
    if len(t) == 0:
        return []

    vd = vp - vn
    spans: list[LaneStateSpan] = []
    cur_state: Optional[str] = None
    cur_start = t[0]

    for i, t_i in enumerate(t):
        s = _classify_sample(vp[i], vn[i], vd[i])
        if s == "TRANS":
            continue
        if cur_state is None:
            # First settled sample opens the first span.
            cur_state = s
            cur_start = t_i
            continue
        if s != cur_state:
            spans.append(LaneStateSpan(cur_state, cur_start, t_i))
            cur_state = s
            cur_start = t_i

    if cur_state is not None:
        spans.append(LaneStateSpan(cur_state, cur_start, t[-1]))
    return spans
def _first_span(spans: list[LaneStateSpan], state: str,
                start_idx: int = 0) -> Optional[tuple[int, LaneStateSpan]]:
    """Find the first span with the given state at or after ``start_idx``.

    Args:
        spans: Spans to search, in order.
        state: State label to look for.
        start_idx: Position in ``spans`` at which the search begins.

    Returns:
        ``(index, span)`` for the first match, or None when there is none.
    """
    matches = (
        (pos, spans[pos])
        for pos in range(start_idx, len(spans))
        if spans[pos].state == state
    )
    return next(matches, None)
# ---------------------------------------------------------------------------
# Per-parameter measurements
# ---------------------------------------------------------------------------
# Each function returns nanoseconds, or NaN if the relevant state span is not
# present in the capture window.
def measure_t_lpx(data_lane_p: pd.DataFrame, data_lane_n: pd.DataFrame) -> float:
    """Duration of LP-01 (Dp low, Dn high) on data lane — HS Request.

    The first LP-01 span found anywhere in the capture is the one measured.

    Returns:
        Span duration in nanoseconds, or NaN when the capture contains no
        LP-01 span.
    """
    for span in classify_lane(data_lane_p, data_lane_n):
        if span.state == "LP-01":
            return span.duration_ns
    return float("nan")
def measure_t_hs_prepare(data_lane_p: pd.DataFrame, data_lane_n: pd.DataFrame) -> float:
    """Duration of LP-00 on data lane immediately before HS-0 entry.

    Returns:
        Duration in nanoseconds of the first LP-00 span that is directly
        followed by an HS span, or NaN when no such pair exists.
    """
    spans = classify_lane(data_lane_p, data_lane_n)
    for current, following in zip(spans, spans[1:]):
        if current.state == "LP-00" and following.state == "HS":
            return current.duration_ns
    return float("nan")
def measure_t_clk_prepare(clk_p: pd.DataFrame, clk_n: pd.DataFrame) -> float:
    """Duration of LP-00 on clock lane immediately before HS clock starts.

    Returns:
        Duration in nanoseconds of the first LP-00 span that is directly
        followed by an HS span, or NaN when no such pair exists.
    """
    spans = classify_lane(clk_p, clk_n)
    lp00_before_hs = (
        current.duration_ns
        for current, following in zip(spans, spans[1:])
        if current.state == "LP-00" and following.state == "HS"
    )
    return next(lp00_before_hs, float("nan"))
def measure_t_clk_zero(clk_p: pd.DataFrame, clk_n: pd.DataFrame) -> float:
    """Duration of HS-0 on clock lane before first clock toggle.

    The measurement starts at the first HS span that directly follows an
    LP-00 span. From there the differential trace is scanned for the first
    sample that exceeds HS_DIFF_V in the polarity opposite to the one seen
    at HS entry. The scan runs to the end of the capture; it is not limited
    to the HS span.

    Returns:
        Nanoseconds from HS entry to that sample, or NaN when there is no
        LP-00 → HS transition or no opposite-polarity sample after it.
    """
    t = clk_p["time_s"].values
    vd = clk_p["voltage_v"].values - clk_n["voltage_v"].values
    spans = classify_lane(clk_p, clk_n)

    hs_start = next(
        (
            following.t_start
            for current, following in zip(spans, spans[1:])
            if current.state == "LP-00" and following.state == "HS"
        ),
        None,
    )
    if hs_start is None:
        return float("nan")

    start_idx = int(np.searchsorted(t, hs_start))
    after_entry = vd[start_idx + 1:]
    # The polarity at HS entry decides which direction counts as a toggle.
    if vd[start_idx] >= 0:
        toggled = after_entry < -HS_DIFF_V
    else:
        toggled = after_entry > HS_DIFF_V

    hits = np.flatnonzero(toggled)
    if hits.size == 0:
        return float("nan")
    first_toggle = start_idx + 1 + int(hits[0])
    return (t[first_toggle] - hs_start) * 1e9
def measure_t_clk_prepare_plus_zero(clk_p: pd.DataFrame, clk_n: pd.DataFrame) -> float:
    """Sum of the clock-lane prepare and zero intervals.

    Returns:
        ``measure_t_clk_prepare + measure_t_clk_zero`` in nanoseconds, or NaN
        when either component could not be measured.
    """
    prepare_ns = measure_t_clk_prepare(clk_p, clk_n)
    zero_ns = measure_t_clk_zero(clk_p, clk_n)
    either_missing = np.isnan(prepare_ns) or np.isnan(zero_ns)
    return float("nan") if either_missing else prepare_ns + zero_ns
def measure_t_hs_zero(data_lane_p: pd.DataFrame, data_lane_n: pd.DataFrame) -> float:
    """HS-0 preamble on data lane before SoT sync byte (00011101 = 0xB8 LSB-first).

    Approximated as the time from HS entry (the first HS span that directly
    follows an LP-00 span) until the first differential sample that exceeds
    HS_DIFF_V in the polarity opposite to the one seen at HS entry. The scan
    runs to the end of the capture.

    Returns:
        That interval in nanoseconds, or NaN when there is no LP-00 → HS
        transition or no opposite-polarity sample after it.
    """
    t = data_lane_p["time_s"].values
    vd = data_lane_p["voltage_v"].values - data_lane_n["voltage_v"].values
    spans = classify_lane(data_lane_p, data_lane_n)

    entry_times = (
        following.t_start
        for current, following in zip(spans, spans[1:])
        if current.state == "LP-00" and following.state == "HS"
    )
    hs_start = next(entry_times, None)
    if hs_start is None:
        return float("nan")

    start_idx = int(np.searchsorted(t, hs_start))
    starts_non_negative = vd[start_idx] >= 0
    tail = vd[start_idx + 1:]
    # Look for a swing past the threshold on the other side of zero.
    flipped = (tail < -HS_DIFF_V) if starts_non_negative else (tail > HS_DIFF_V)

    flip_offsets = np.flatnonzero(flipped)
    if flip_offsets.size == 0:
        return float("nan")
    flip_idx = start_idx + 1 + int(flip_offsets[0])
    return (t[flip_idx] - hs_start) * 1e9
# ---------------------------------------------------------------------------
# Aggregate measurement + spec compliance
# ---------------------------------------------------------------------------
def measure_all(waveforms: dict[str, pd.DataFrame]) -> dict[str, float]:
    """Run every timing measurement on one set of captured waveforms.

    Args:
        waveforms: Mapping that must contain the keys "CLK_P", "CLK_N",
            "DAT0_P" and "DAT0_N".

    Returns:
        Mapping from parameter name to the measured value in nanoseconds
        (NaN where the corresponding measurement function found nothing).

    Raises:
        KeyError: If one of the four required waveforms is missing.
    """
    clock_pair = (waveforms["CLK_P"], waveforms["CLK_N"])
    data_pair = (waveforms["DAT0_P"], waveforms["DAT0_N"])

    # (result key, measurement function, lane pair), in output order.
    plan = (
        ("t_lpx", measure_t_lpx, data_pair),
        ("t_hs_prepare", measure_t_hs_prepare, data_pair),
        ("t_clk_prepare", measure_t_clk_prepare, clock_pair),
        ("t_clk_zero", measure_t_clk_zero, clock_pair),
        ("t_clk_prepare_plus_zero", measure_t_clk_prepare_plus_zero, clock_pair),
        ("t_hs_zero", measure_t_hs_zero, data_pair),
    )
    return {name: measure(*pair) for name, measure, pair in plan}
def check_spec_compliance(measurements: dict[str, float],
                          spec: dict[str, float] = DPHY_SPEC) -> dict:
    """Compare measured timings against per-parameter minimum values.

    Each ``spec`` value is treated as a lower bound only. Measurements whose
    name has no entry in ``spec`` are left out of the report.

    Args:
        measurements: Parameter name → measured nanoseconds (None or NaN
            when the measurement is unavailable).
        spec: Parameter name → minimum nanoseconds.

    Returns:
        Parameter name → dict with keys "measured_ns", "min_ns", "pass" and
        "margin_ns". An unavailable measurement is reported as a failure
        with "measured_ns" and "margin_ns" set to None.
    """
    report: dict[str, dict] = {}
    for name, measured_ns in measurements.items():
        min_ns = spec.get(name)
        if min_ns is None:
            continue

        unavailable = measured_ns is None or np.isnan(measured_ns)
        if unavailable:
            entry = {
                "measured_ns": None,
                "min_ns": min_ns,
                "pass": False,
                "margin_ns": None,
            }
        else:
            entry = {
                "measured_ns": float(measured_ns),
                "min_ns": float(min_ns),
                "pass": bool(measured_ns >= min_ns),
                "margin_ns": float(measured_ns - min_ns),
            }
        report[name] = entry
    return report
# ---------------------------------------------------------------------------
# Lane 0 DSI packet decode
# ---------------------------------------------------------------------------
# Ground-truth fault detector (Falcon prior art, May 2024). The SN65 IRQ
# register is a hint — packet payload position is the verdict.
# On the wire the sync byte appears as the bit sequence 00011101 (LSB first).
DSI_SOT_SYNC = 0xB8 # SoT sync byte after LP-11 → LP-01 → LP-00 → HS-0
DSI_DT_PIXEL = 0x3E # Packed Pixel Stream, 24-bit RGB (long packet)
# Presumably the DSI "H Sync Start" short-packet data type — TODO confirm.
DSI_DT_HSYNC_START = 0x21
@dataclass
class DSIPacket:
    """Header fields and leading payload bytes of one decoded DSI packet."""

    # Position of the originating HS burst in the list of detected bursts.
    burst_idx: int
    # Start time of that HS burst, in seconds.
    timestamp_s: float
    # First header byte.
    data_type: int
    # 16-bit count assembled from header bytes 1 (low) and 2 (high).
    word_count: int
    # Fourth header byte.
    ecc: int
    # Bytes that follow the header, truncated by the decoder.
    payload: bytes
def _find_hs_bursts(clk_p: pd.DataFrame, clk_n: pd.DataFrame,
                    dat_p: pd.DataFrame, dat_n: pd.DataFrame) -> list[tuple[float, float]]:
    """Return (t_start, t_end) for each HS burst on the data lane.

    Only the data-lane traces are inspected; ``clk_p`` and ``clk_n`` are
    accepted but not used.
    """
    windows: list[tuple[float, float]] = []
    for span in classify_lane(dat_p, dat_n):
        if span.state == "HS":
            windows.append((span.t_start, span.t_end))
    return windows
def _sample_bits_in_burst(clk_p: pd.DataFrame, clk_n: pd.DataFrame,
                          dat_p: pd.DataFrame, dat_n: pd.DataFrame,
                          t_start: float, t_end: float) -> list[int]:
    """DDR-sample the data lane at every clock edge inside the burst window.

    A clock edge is any sample within [t_start, t_end) whose differential
    clock sign (>= 0 versus < 0) differs from the previous sample's. For
    each edge the data lane is read at the first data sample whose timestamp
    is not earlier than the edge; a positive differential yields 1, anything
    else yields 0.

    Returns:
        A list of 0/1 bit values, in clock-edge order. Empty when the window
        holds fewer than two clock samples.
    """
    t_clk = clk_p["time_s"].values
    vd_clk = clk_p["voltage_v"].values - clk_n["voltage_v"].values
    t_dat = dat_p["time_s"].values
    vd_dat = dat_p["voltage_v"].values - dat_n["voltage_v"].values

    lo = int(np.searchsorted(t_clk, t_start))
    hi = int(np.searchsorted(t_clk, t_end))
    if hi - lo < 2:
        return []

    # True where the clock differential is non-negative; a change between
    # neighbouring samples marks an edge at the later sample.
    non_negative = vd_clk[lo:hi] >= 0
    edge_offsets = np.flatnonzero(non_negative[1:] != non_negative[:-1]) + 1
    edge_times = t_clk[lo:hi][edge_offsets]

    sample_idx = np.searchsorted(t_dat, edge_times)
    # Edges that fall after the last data sample have nothing to read.
    in_range = sample_idx[sample_idx < len(vd_dat)]
    return [1 if level > 0 else 0 for level in vd_dat[in_range]]
def _bits_to_bytes_msb_first(bits: list[int]) -> bytes:
    """Pack a bit list into bytes, the first bit of each group of 8 being the MSB.

    Only the lowest bit of every element is used. Trailing bits that do not
    fill a whole byte are dropped.
    """
    whole_bytes = len(bits) // 8
    packed = bytearray(whole_bytes)
    for byte_no in range(whole_bytes):
        octet = bits[byte_no * 8:byte_no * 8 + 8]
        packed[byte_no] = sum((bit & 1) << (7 - pos) for pos, bit in enumerate(octet))
    return bytes(packed)
def _find_bit_run(bits: list[int], pattern: list[int]) -> int:
    """Return the index of the first occurrence of ``pattern`` in ``bits``, or -1."""
    width = len(pattern)
    for start in range(len(bits) - width + 1):
        if bits[start:start + width] == pattern:
            return start
    return -1


def _bits_to_bytes_lsb_first(bits: list[int]) -> bytes:
    """Pack bits into bytes, the first bit of each group of 8 being the LSB.

    Trailing bits that do not fill a whole byte are dropped.
    """
    out = bytearray()
    for base in range(0, len(bits) - 7, 8):
        value = 0
        for k in range(8):
            value |= (bits[base + k] & 1) << k
        out.append(value)
    return bytes(out)


def decode_lane0_packets(waveforms: dict[str, pd.DataFrame],
                         max_payload_bytes: int = 16) -> list[DSIPacket]:
    """Best-effort DSI Lane 0 packet decode.

    For every HS burst on the data lane the bit stream is sampled on the
    clock edges, the SoT sync sequence is located, and the four header bytes
    plus the first ``max_payload_bytes`` payload bytes are extracted. Only
    the first few payload bytes are needed to classify Fault A (all-zero
    payload start).

    Bit handling:
      * D-PHY shifts every byte out least-significant bit first, so the sync
        byte 0xB8 appears on the wire as 00011101 and all following bytes
        are packed LSB-first.
      * The HS-0 preamble before the sync has no fixed length in bits, so the
        sync is searched for at bit granularity. Byte framing starts at the
        bit right after the sync sequence.

    NOTE(review): the header is read as four consecutive bytes on Lane 0,
    which presumably assumes a single-lane link (multi-lane links stripe
    bytes across lanes) — confirm against the hardware configuration.

    Args:
        waveforms: Mapping that must contain "CLK_P", "CLK_N", "DAT0_P" and
            "DAT0_N".
        max_payload_bytes: Upper limit on payload bytes kept per packet.

    Returns:
        One DSIPacket per burst in which a sync sequence followed by at
        least four whole bytes was found. Other bursts are skipped.

    Raises:
        KeyError: If one of the four required waveforms is missing.
    """
    clk_p = waveforms["CLK_P"]
    clk_n = waveforms["CLK_N"]
    dat_p = waveforms["DAT0_P"]
    dat_n = waveforms["DAT0_N"]

    # Sync byte in transmission order (bit 0 first).
    sync_bits = [(DSI_SOT_SYNC >> k) & 1 for k in range(8)]

    packets: list[DSIPacket] = []
    bursts = _find_hs_bursts(clk_p, clk_n, dat_p, dat_n)
    for idx, (t0, t1) in enumerate(bursts):
        bits = _sample_bits_in_burst(clk_p, clk_n, dat_p, dat_n, t0, t1)
        sync_at = _find_bit_run(bits, sync_bits)
        if sync_at < 0:
            continue

        # Everything after the sync sequence is byte-aligned packet data.
        body = _bits_to_bytes_lsb_first(bits[sync_at + len(sync_bits):])
        if len(body) < 4:
            continue  # header incomplete within the capture

        data_type = body[0]
        word_count = body[1] | (body[2] << 8)
        ecc = body[3]
        payload = body[4:4 + max_payload_bytes]
        packets.append(DSIPacket(
            burst_idx=idx,
            timestamp_s=t0,
            data_type=data_type,
            word_count=word_count,
            ecc=ecc,
            payload=payload,
        ))
    return packets
def classify_packet_fault(packets: list[DSIPacket]) -> dict:
    """Classify Fault A (zero-payload pixel packet) from decoded packets.

    Only the first packet whose data type equals DSI_DT_PIXEL is examined.
    Fault A is reported when that packet carries at least four payload bytes
    and the first four are all zero.

    Returns:
        With no pixel packet: {"fault_a_detected": False, "reason": ...}.
        Otherwise a dict with "fault_a_detected", "first_pixel_payload_hex"
        (hex of up to the first 8 payload bytes), "n_pixel_packets" and
        "n_total_packets".
    """
    pixel_packets = [pkt for pkt in packets if pkt.data_type == DSI_DT_PIXEL]
    if len(pixel_packets) == 0:
        return {"fault_a_detected": False, "reason": "no pixel packets decoded"}

    # A missing or empty payload is treated as zero bytes of evidence.
    head = (pixel_packets[0].payload or b"")[:8]
    leading = head[:4]
    fault_a = len(head) >= 4 and not any(leading)

    result = {
        "fault_a_detected": bool(fault_a),
        "first_pixel_payload_hex": head.hex(),
        "n_pixel_packets": len(pixel_packets),
        "n_total_packets": len(packets),
    }
    return result
def detect_lane_stall(data_lane_p: pd.DataFrame, data_lane_n: pd.DataFrame,
                      stall_threshold_ms: float = 10.0) -> dict:
    """Fault B: continuous LP-11 longer than threshold during what should be active video.

    Args:
        data_lane_p: Positive-leg capture of the data lane.
        data_lane_n: Negative-leg capture of the data lane.
        stall_threshold_ms: An LP-11 span strictly longer than this many
            milliseconds counts as a stall.

    Returns:
        Dict with "fault_b_detected", "longest_lp11_ms" (0.0 when the capture
        has no LP-11 span) and "threshold_ms".
    """
    lp11_ms = [
        span.duration_ns / 1e6
        for span in classify_lane(data_lane_p, data_lane_n)
        if span.state == "LP-11"
    ]
    # Seeding with 0.0 keeps the floor at zero when there is no LP-11 span.
    longest_lp11_ms = max([0.0, *lp11_ms])
    return {
        "fault_b_detected": bool(longest_lp11_ms > stall_threshold_ms),
        "longest_lp11_ms": float(longest_lp11_ms),
        "threshold_ms": float(stall_threshold_ms),
    }