402 lines
14 KiB
Python
402 lines
14 KiB
Python
|
|
"""D-PHY timing extraction and Lane 0 packet decode from scope waveforms.
|
|||
|
|
|
|||
|
|
All voltage thresholds in this module are POST-attenuation values (i.e. what
the scope sees after the 19.2× probe divider). Don't rescale them back to
wire voltages — the divider is calibrated and the thresholds were chosen
to give clean LP/HS state separation at probe output.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import logging
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
from typing import Optional
|
|||
|
|
|
|||
|
|
import numpy as np
|
|||
|
|
import pandas as pd
|
|||
|
|
|
|||
|
|
from config import DPHY_SPEC
|
|||
|
|
|
|||
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Post-attenuation thresholds (volts at scope input, after 19.2× divider).
# The "on wire" figures in the trailing comments are simply threshold × 19.2.
LP_HIGH_V = 0.040  # "above" → LP-1 (~770 mV on wire)
LP_LOW_V = 0.010  # "below" → LP-0 / HS-0 (~190 mV on wire)
# |P − N| above this means an HS burst is active. The threshold is applied to
# whichever lane pair is being classified (clock or data), not only CLK_P/CLK_N.
HS_DIFF_V = 0.008
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
class LaneStateSpan:
    """One uninterrupted stretch of time during which a lane held one state.

    Timestamps are in seconds; ``duration_ns`` converts their difference to
    nanoseconds.
    """

    state: str  # one of "LP-11", "LP-01", "LP-10", "LP-00", "HS"
    t_start: float  # timestamp at which the state was first seen, seconds
    t_end: float  # timestamp at which the span was closed, seconds

    @property
    def duration_ns(self) -> float:
        """Length of the span in nanoseconds."""
        elapsed_s = self.t_end - self.t_start
        return elapsed_s * 1e9
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Signal reconstruction
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
def differential(lane_p: pd.DataFrame, lane_n: pd.DataFrame) -> pd.Series:
    """Return the sample-by-sample P − N voltage of a lane pair.

    The two ``voltage_v`` columns are subtracted positionally as raw arrays,
    so the frames' own indexes are ignored and the result carries a fresh
    default index.
    """
    p_volts = lane_p["voltage_v"].values
    n_volts = lane_n["voltage_v"].values
    return pd.Series(p_volts - n_volts)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def common_mode(lane_p: pd.DataFrame, lane_n: pd.DataFrame) -> pd.Series:
    """Return the sample-by-sample mean of the P and N voltages.

    The two ``voltage_v`` columns are combined positionally as raw arrays,
    so the frames' own indexes are ignored and the result carries a fresh
    default index.
    """
    p_volts = lane_p["voltage_v"].values
    n_volts = lane_n["voltage_v"].values
    total = p_volts + n_volts
    return pd.Series(total / 2.0)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Lane state machine
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
def _classify_sample(vp: float, vn: float, vdiff: float) -> str:
    """Map one (P, N, P−N) voltage sample onto a D-PHY lane state.

    The HS test runs first: both lines below ``LP_HIGH_V`` with a differential
    magnitude above ``HS_DIFF_V`` is reported as ``"HS"``. Otherwise each line
    is resolved to an LP level (1 above ``LP_HIGH_V``, 0 below ``LP_LOW_V``)
    and the pair is reported as ``"LP-<p><n>"``. If either line sits between
    the two thresholds the sample is ``"TRANS"`` (not yet a settled state).
    """
    both_below_lp_high = vp < LP_HIGH_V and vn < LP_HIGH_V
    if both_below_lp_high and abs(vdiff) > HS_DIFF_V:
        return "HS"

    # Resolve each line to 1 / 0 / None (undetermined); "high" wins over "low".
    p_level = 1 if vp > LP_HIGH_V else (0 if vp < LP_LOW_V else None)
    n_level = 1 if vn > LP_HIGH_V else (0 if vn < LP_LOW_V else None)
    if p_level is None or n_level is None:
        return "TRANS"  # in-between, not yet a settled state

    # Index is the two-bit value <p><n>.
    return ("LP-00", "LP-01", "LP-10", "LP-11")[2 * p_level + n_level]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def classify_lane(lane_p: pd.DataFrame, lane_n: pd.DataFrame) -> list[LaneStateSpan]:
    """Walk both single-ended traces and emit consecutive state spans.

    Every sample is classified with ``_classify_sample``. Samples classified
    as "TRANS" are skipped, so they never open or close a span; consecutive
    samples with the same settled state extend the current span.

    A span is closed at the timestamp of the first sample of the *next*
    settled state, so any transition time between two states is attributed to
    the earlier span. The final span is closed at the last timestamp of the
    capture.

    Args:
        lane_p: P-line capture with ``time_s`` and ``voltage_v`` columns.
        lane_n: N-line capture with a ``voltage_v`` column, sample-aligned
            with ``lane_p`` (timestamps are taken from ``lane_p`` only).

    Returns:
        Spans in time order; empty if the capture has no samples or no sample
        ever reaches a settled state.

    Raises:
        ValueError: if the two traces do not have the same number of samples.
    """
    t = lane_p["time_s"].values
    vp = lane_p["voltage_v"].values
    vn = lane_n["voltage_v"].values

    if len(vp) != len(vn):
        raise ValueError(
            f"lane_p and lane_n must have the same number of samples "
            f"(got {len(vp)} and {len(vn)})"
        )
    if len(t) == 0:
        # Nothing captured: there is no t[0] to anchor a span on.
        return []

    vd = vp - vn

    spans: list[LaneStateSpan] = []
    cur_state: Optional[str] = None
    cur_start = t[0]

    for ts, p, n, d in zip(t, vp, vn, vd):
        s = _classify_sample(p, n, d)
        if s == "TRANS" or s == cur_state:
            continue
        if cur_state is not None:
            # State changed: close the running span at this sample's time.
            spans.append(LaneStateSpan(cur_state, cur_start, ts))
        cur_state = s
        cur_start = ts

    if cur_state is not None:
        spans.append(LaneStateSpan(cur_state, cur_start, t[-1]))

    return spans
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _first_span(spans: list[LaneStateSpan], state: str,
|
|||
|
|
start_idx: int = 0) -> Optional[tuple[int, LaneStateSpan]]:
|
|||
|
|
for i in range(start_idx, len(spans)):
|
|||
|
|
if spans[i].state == state:
|
|||
|
|
return i, spans[i]
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Per-parameter measurements
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Each function returns nanoseconds, or NaN if the relevant state span is not
|
|||
|
|
# present in the capture window.
|
|||
|
|
|
|||
|
|
def measure_t_lpx(data_lane_p: pd.DataFrame, data_lane_n: pd.DataFrame) -> float:
    """Length of the first LP-01 span (Dp low, Dn high — HS Request) on the data lane.

    Returns the duration in nanoseconds, or NaN when the capture contains no
    LP-01 span.
    """
    found = _first_span(classify_lane(data_lane_p, data_lane_n), "LP-01")
    if not found:
        return float("nan")
    _, lp01_span = found
    return lp01_span.duration_ns
|
|||
|
|
|
|||
|
|
|
|||
|
|
def measure_t_hs_prepare(data_lane_p: pd.DataFrame, data_lane_n: pd.DataFrame) -> float:
    """Length of the LP-00 span that directly precedes HS entry on the data lane.

    Returns the duration in nanoseconds of the first LP-00 span whose
    successor is an HS span, or NaN when no such pair exists in the capture.
    """
    spans = classify_lane(data_lane_p, data_lane_n)
    for before, after in zip(spans, spans[1:]):
        if (before.state, after.state) == ("LP-00", "HS"):
            return before.duration_ns
    return float("nan")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def measure_t_clk_prepare(clk_p: pd.DataFrame, clk_n: pd.DataFrame) -> float:
    """Length of the LP-00 span that directly precedes the HS clock on the clock lane.

    Returns the duration in nanoseconds of the first LP-00 span whose
    successor is an HS span, or NaN when no such pair exists in the capture.
    """
    spans = classify_lane(clk_p, clk_n)
    lp00_before_hs = (
        prev
        for prev, nxt in zip(spans, spans[1:])
        if prev.state == "LP-00" and nxt.state == "HS"
    )
    first = next(lp00_before_hs, None)
    if first is None:
        return float("nan")
    return first.duration_ns
|
|||
|
|
|
|||
|
|
|
|||
|
|
def measure_t_clk_zero(clk_p: pd.DataFrame, clk_n: pd.DataFrame) -> float:
    """Duration of HS-0 on the clock lane before the first clock toggle.

    The first LP-00 → HS hand-over marks HS entry. From the sample after HS
    entry, the differential is scanned for the first sample that has swung
    past ``HS_DIFF_V`` in the polarity opposite to the one seen at HS entry;
    that sample is taken as the first clock toggle.

    Returns the elapsed time in nanoseconds, or NaN when there is no
    LP-00 → HS hand-over or no opposite-polarity sample after it.
    """
    times = clk_p["time_s"].values
    diff = clk_p["voltage_v"].values - clk_n["voltage_v"].values

    spans = classify_lane(clk_p, clk_n)
    hs_start = next(
        (
            nxt.t_start
            for prev, nxt in zip(spans, spans[1:])
            if prev.state == "LP-00" and nxt.state == "HS"
        ),
        None,
    )
    if hs_start is None:
        return float("nan")

    entry_idx = int(np.searchsorted(times, hs_start))
    after_entry = diff[entry_idx + 1:]
    # Look for a swing to the polarity opposite to the one at HS entry.
    if diff[entry_idx] >= 0:
        toggled = after_entry < -HS_DIFF_V
    else:
        toggled = after_entry > HS_DIFF_V

    hits = np.flatnonzero(toggled)
    if hits.size == 0:
        return float("nan")
    return (times[entry_idx + 1 + hits[0]] - hs_start) * 1e9
|
|||
|
|
|
|||
|
|
|
|||
|
|
def measure_t_clk_prepare_plus_zero(clk_p: pd.DataFrame, clk_n: pd.DataFrame) -> float:
    """Sum of the clock-lane prepare and zero durations, in nanoseconds.

    Both terms are measured independently with ``measure_t_clk_prepare`` and
    ``measure_t_clk_zero``; if either one is NaN the sum is reported as NaN.
    """
    prepare_ns = measure_t_clk_prepare(clk_p, clk_n)
    zero_ns = measure_t_clk_zero(clk_p, clk_n)
    if any(np.isnan(term) for term in (prepare_ns, zero_ns)):
        return float("nan")
    return prepare_ns + zero_ns
|
|||
|
|
|
|||
|
|
|
|||
|
|
def measure_t_hs_zero(data_lane_p: pd.DataFrame, data_lane_n: pd.DataFrame) -> float:
    """HS-0 preamble on data lane before SoT sync byte (00011101 = 0xB8 LSB-first).

    Approximated as the time from HS entry (first LP-00 → HS hand-over) until
    the first sample whose differential has swung past ``HS_DIFF_V`` in the
    polarity opposite to the one seen at HS entry, i.e. the first bit flip.

    Returns the elapsed time in nanoseconds, or NaN when there is no
    LP-00 → HS hand-over or no opposite-polarity sample after it.
    """
    times = data_lane_p["time_s"].values
    diff = data_lane_p["voltage_v"].values - data_lane_n["voltage_v"].values

    spans = classify_lane(data_lane_p, data_lane_n)
    hs_start: Optional[float] = None
    for prev, nxt in zip(spans, spans[1:]):
        if prev.state == "LP-00" and nxt.state == "HS":
            hs_start = nxt.t_start
            break
    if hs_start is None:
        return float("nan")

    entry_idx = int(np.searchsorted(times, hs_start))
    after_entry = diff[entry_idx + 1:]
    # Entry polarity decides which side of the threshold counts as a flip.
    entered_non_negative = diff[entry_idx] >= 0
    flipped = (after_entry < -HS_DIFF_V) if entered_non_negative else (after_entry > HS_DIFF_V)

    hits = np.flatnonzero(flipped)
    if hits.size == 0:
        return float("nan")
    return (times[entry_idx + 1 + hits[0]] - hs_start) * 1e9
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Aggregate measurement + spec compliance
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
|
|||
|
|
def measure_all(waveforms: dict[str, pd.DataFrame]) -> dict[str, float]:
    """Run every timing measurement on one capture.

    ``waveforms`` must contain the "CLK_P", "CLK_N", "DAT0_P" and "DAT0_N"
    traces. The result maps each parameter name to its value in nanoseconds
    (NaN where the relevant state sequence was not found).
    """
    clk = (waveforms["CLK_P"], waveforms["CLK_N"])
    dat = (waveforms["DAT0_P"], waveforms["DAT0_N"])

    results: dict[str, float] = {}
    results["t_lpx"] = measure_t_lpx(*dat)
    results["t_hs_prepare"] = measure_t_hs_prepare(*dat)
    results["t_clk_prepare"] = measure_t_clk_prepare(*clk)
    results["t_clk_zero"] = measure_t_clk_zero(*clk)
    results["t_clk_prepare_plus_zero"] = measure_t_clk_prepare_plus_zero(*clk)
    results["t_hs_zero"] = measure_t_hs_zero(*dat)
    return results
|
|||
|
|
|
|||
|
|
|
|||
|
|
def check_spec_compliance(measurements: dict[str, float],
                          spec: dict[str, float] = DPHY_SPEC) -> dict:
    """Compare each measurement against its minimum allowed value.

    Only a lower bound is checked: a parameter passes when
    ``measured >= spec[name]``. Parameters with no entry in ``spec`` are left
    out of the report. A measurement that is ``None`` or NaN is reported as a
    failure with ``measured_ns`` and ``margin_ns`` set to ``None``.

    Returns a dict keyed by parameter name; each value holds "measured_ns",
    "min_ns", "pass" and "margin_ns" (measured minus minimum).
    """
    report: dict[str, dict] = {}
    for name, measured_ns in measurements.items():
        min_ns = spec.get(name)
        if min_ns is None:
            continue

        not_measured = measured_ns is None or np.isnan(measured_ns)
        if not_measured:
            entry = {
                "measured_ns": None,
                "min_ns": min_ns,
                "pass": False,
                "margin_ns": None,
            }
        else:
            entry = {
                "measured_ns": float(measured_ns),
                "min_ns": float(min_ns),
                "pass": bool(measured_ns >= min_ns),
                "margin_ns": float(measured_ns - min_ns),
            }
        report[name] = entry
    return report
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Lane 0 DSI packet decode
|
|||
|
|
# ---------------------------------------------------------------------------
|
|||
|
|
# Ground-truth fault detector (Falcon prior art, May 2024). The SN65 IRQ
|
|||
|
|
# register is a hint — packet payload position is the verdict.
|
|||
|
|
|
|||
|
|
# SoT sync byte after LP-11 → LP-01 → LP-00 → HS-0. On the wire the sequence
# is 00011101; 0xB8 is that sequence with the first bit taken as bit 0.
DSI_SOT_SYNC = 0xB8
DSI_DT_PIXEL = 0x3E  # Packed Pixel Stream, 24-bit RGB (long packet)
# Presumably the H Sync Start data type, going by the name; it is not
# referenced by the decode code visible in this module. TODO confirm.
DSI_DT_HSYNC_START = 0x21
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
class DSIPacket:
    """Header fields and leading payload bytes of one decoded Lane 0 packet."""

    burst_idx: int  # position of the source HS burst in the list of bursts
    timestamp_s: float  # start time of that burst, seconds
    data_type: int  # data-type value taken from the first header byte
    word_count: int  # 16-bit value from header bytes 2 (low) and 3 (high)
    ecc: int  # fourth header byte
    payload: bytes  # bytes following the header (possibly truncated)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _find_hs_bursts(clk_p: pd.DataFrame, clk_n: pd.DataFrame,
                    dat_p: pd.DataFrame, dat_n: pd.DataFrame) -> list[tuple[float, float]]:
    """List the (t_start, t_end) window of every HS span on the data lane.

    Only the data-lane traces are examined; ``clk_p`` and ``clk_n`` are
    accepted but not used.
    """
    hs_spans = (span for span in classify_lane(dat_p, dat_n) if span.state == "HS")
    return [(span.t_start, span.t_end) for span in hs_spans]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _sample_bits_in_burst(clk_p: pd.DataFrame, clk_n: pd.DataFrame,
|
|||
|
|
dat_p: pd.DataFrame, dat_n: pd.DataFrame,
|
|||
|
|
t_start: float, t_end: float) -> list[int]:
|
|||
|
|
"""DDR-sample the data lane at every clock edge inside the burst window.
|
|||
|
|
|
|||
|
|
Returns a list of 0/1 bit values, in clock-edge order.
|
|||
|
|
"""
|
|||
|
|
t_clk = clk_p["time_s"].values
|
|||
|
|
vd_clk = clk_p["voltage_v"].values - clk_n["voltage_v"].values
|
|||
|
|
t_dat = dat_p["time_s"].values
|
|||
|
|
vd_dat = dat_p["voltage_v"].values - dat_n["voltage_v"].values
|
|||
|
|
|
|||
|
|
i0 = int(np.searchsorted(t_clk, t_start))
|
|||
|
|
i1 = int(np.searchsorted(t_clk, t_end))
|
|||
|
|
if i1 - i0 < 2:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
edges: list[float] = []
|
|||
|
|
prev_sign = 1 if vd_clk[i0] >= 0 else -1
|
|||
|
|
for k in range(i0 + 1, i1):
|
|||
|
|
cur_sign = 1 if vd_clk[k] >= 0 else -1
|
|||
|
|
if cur_sign != prev_sign:
|
|||
|
|
edges.append(t_clk[k])
|
|||
|
|
prev_sign = cur_sign
|
|||
|
|
|
|||
|
|
bits: list[int] = []
|
|||
|
|
for et in edges:
|
|||
|
|
idx = int(np.searchsorted(t_dat, et))
|
|||
|
|
if 0 <= idx < len(vd_dat):
|
|||
|
|
bits.append(1 if vd_dat[idx] > 0 else 0)
|
|||
|
|
return bits
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _bits_to_bytes_msb_first(bits: list[int]) -> bytes:
|
|||
|
|
out = bytearray()
|
|||
|
|
for i in range(0, len(bits) - 7, 8):
|
|||
|
|
b = 0
|
|||
|
|
for k in range(8):
|
|||
|
|
b = (b << 1) | (bits[i + k] & 1)
|
|||
|
|
out.append(b)
|
|||
|
|
return bytes(out)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def decode_lane0_packets(waveforms: dict[str, pd.DataFrame],
                         max_payload_bytes: int = 16) -> list[DSIPacket]:
    """Best-effort DSI Lane 0 packet decode.

    Scope window at 5 ns/div × 500 kpts is ~2.5 µs — enough for SoT + header
    + first ~200 bytes of payload. We only need the first few payload bytes
    to classify Fault A (all-zero payload start).

    For every HS burst on the data lane the bits are sampled at each clock
    edge, the SoT sync sequence is located at *bit* granularity (the HS-0
    preamble in front of it has no fixed length, so sync is generally not on
    a byte boundary relative to the first sampled bit), and everything after
    it is packed into bytes LSB-first — the same bit order under which the
    sync sequence 00011101 reads as ``DSI_SOT_SYNC`` (0xB8).

    The four bytes after sync are taken as the packet header: Data ID,
    word count low, word count high, ECC. The Data ID carries the virtual
    channel in its top two bits, so ``data_type`` is its low six bits.

    Args:
        waveforms: must contain "CLK_P", "CLK_N", "DAT0_P" and "DAT0_N".
        max_payload_bytes: upper bound on payload bytes kept per packet.

    Returns:
        One ``DSIPacket`` per burst in which sync and a complete 4-byte
        header were found, in burst order. Bursts without them are skipped.
    """
    clk_p = waveforms["CLK_P"]
    clk_n = waveforms["CLK_N"]
    dat_p = waveforms["DAT0_P"]
    dat_n = waveforms["DAT0_N"]

    bursts = _find_hs_bursts(clk_p, clk_n, dat_p, dat_n)
    packets: list[DSIPacket] = []

    for idx, (t0, t1) in enumerate(bursts):
        bits = _sample_bits_in_burst(clk_p, clk_n, dat_p, dat_n, t0, t1)

        sync_at = _sot_bit_offset(bits, DSI_SOT_SYNC)
        if sync_at < 0:
            continue

        # Byte boundaries are defined by the sync sequence, not by bit 0 of
        # the burst, so packing starts right after it.
        body = _pack_bits_lsb_first(bits[sync_at + 8:])
        if len(body) < 4:
            continue  # burst ends before the header is complete

        data_id, wc_low, wc_high, ecc = body[:4]

        packets.append(DSIPacket(
            burst_idx=idx,
            timestamp_s=t0,
            data_type=data_id & 0x3F,  # drop the virtual-channel bits [7:6]
            word_count=wc_low | (wc_high << 8),
            ecc=ecc,
            payload=body[4:4 + max_payload_bytes],
        ))

    return packets


def _sot_bit_offset(bits: list[int], sync_byte: int) -> int:
    """Return the bit index at which the sync sequence starts, or -1 if absent."""
    # Wire order is LSB first, so bit k of the byte is the k-th bit sent.
    wire_order = [(sync_byte >> k) & 1 for k in range(8)]
    for offset in range(len(bits) - 7):
        if bits[offset:offset + 8] == wire_order:
            return offset
    return -1


def _pack_bits_lsb_first(bits: list[int]) -> bytes:
    """Pack bits into bytes with the first bit as bit 0; drop a trailing partial byte."""
    return bytes(
        sum((bits[base + k] & 1) << k for k in range(8))
        for base in range(0, len(bits) - 7, 8)
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def classify_packet_fault(packets: list[DSIPacket]) -> dict:
    """Decide whether the first pixel packet shows Fault A (zeroed payload start).

    Only packets whose ``data_type`` equals ``DSI_DT_PIXEL`` are considered,
    and of those only the first. Fault A is flagged when at least four payload
    bytes are available and the first four are all 0x00.

    Returns a dict with "fault_a_detected" plus either a "reason" (when no
    pixel packet was decoded) or the first eight payload bytes as hex and the
    pixel/total packet counts.
    """
    pixel_packets = [pkt for pkt in packets if pkt.data_type == DSI_DT_PIXEL]
    if len(pixel_packets) == 0:
        return {"fault_a_detected": False, "reason": "no pixel packets decoded"}

    head = (pixel_packets[0].payload or b"")[:8]
    leading = head[:4]
    # Fault A needs four real bytes, none of them non-zero.
    fault_a = len(head) >= 4 and not any(leading)

    summary = {"fault_a_detected": bool(fault_a)}
    summary["first_pixel_payload_hex"] = head.hex()
    summary["n_pixel_packets"] = len(pixel_packets)
    summary["n_total_packets"] = len(packets)
    return summary
|
|||
|
|
|
|||
|
|
|
|||
|
|
def detect_lane_stall(data_lane_p: pd.DataFrame, data_lane_n: pd.DataFrame,
                      stall_threshold_ms: float = 10.0) -> dict:
    """Fault B: a single LP-11 span on the data lane longer than the threshold.

    Returns a dict with "fault_b_detected", the longest LP-11 span found in
    milliseconds ("longest_lp11_ms", 0.0 when there is none) and the
    threshold that was applied ("threshold_ms").
    """
    lp11_ms = [
        span.duration_ns / 1e6
        for span in classify_lane(data_lane_p, data_lane_n)
        if span.state == "LP-11"
    ]
    # Seed with 0.0 so an empty capture reports zero rather than failing.
    longest_lp11_ms = max([0.0, *lp11_ms])

    return {
        "fault_b_detected": bool(longest_lp11_ms > stall_threshold_ms),
        "longest_lp11_ms": float(longest_lp11_ms),
        "threshold_ms": float(stall_threshold_ms),
    }
|