Updates
This commit is contained in:
172
proto_decoder.py
172
proto_decoder.py
@@ -44,6 +44,9 @@ DSI_DT_RGB888 = 0x3E
|
||||
DSI_DT_HSYNC = 0x21 # short packet — H sync start
|
||||
DSI_DT_VSYNC = 0x01 # short packet — V sync start
|
||||
|
||||
# Known-valid DSI data types used in sync-byte validation (VC=0 + DT in this set)
|
||||
VALID_DSI_DT = {0x01, 0x11, 0x21, 0x31, 0x08, 0x09, 0x19, 0x29, 0x39, 0x3E}
|
||||
|
||||
# MIPI D-PHY HS sync byte (transmitted at start of each HS burst, all-lanes)
|
||||
HS_SYNC_BYTE = 0xB8 # 1011_1000 in bit order (LSB first → 00011101 on wire)
|
||||
|
||||
@@ -149,23 +152,70 @@ def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0, single_ended=False)
|
||||
N = len(v_dat)
|
||||
|
||||
# --- Single-ended LP path ---
|
||||
# LP-01 + LP-00 + HS-PREPARE + HS-ZERO form a continuous "LP-low" region where
|
||||
# DAT+ < 0.25 V and rolling std < 45 mV. The LP-low region ends when the first
|
||||
# '1' bit transition in 0xB8 causes rolling std > 45 mV. Start bit decoding a
|
||||
# few bits BEFORE that spike so the phase search can find complete 0xB8 near byte 0.
|
||||
if single_ended:
|
||||
min_lp01 = max(2, int(20.0 / dt_ns))
|
||||
run = 0
|
||||
lp01_end = None
|
||||
for i in range(N):
|
||||
if v_dat[i] < LP_SE_LP01_THRESH_V:
|
||||
run += 1
|
||||
else:
|
||||
if run >= min_lp01:
|
||||
lp01_end = i
|
||||
break
|
||||
run = 0
|
||||
LP11_THRESH_SE = 0.8 # V — LP-11 state (DAT+ high)
|
||||
LP_LOW_V_SE = 0.25 # V — LP-01/LP-00/HS-ZERO are all below this
|
||||
HS_STD_V_SE = 0.045 # V — rolling std above this → first HS data bit
|
||||
LP_LOW_MIN_NS = 5.0 # ns — ignore LP-low runs shorter than this
|
||||
LP_MARGIN_NS = 25.0 # ns — start decode this far before first data bit
|
||||
|
||||
if lp01_end is not None:
|
||||
skip = max(1, int(50.0 / dt_ns))
|
||||
return min(lp01_end + skip, N - 1)
|
||||
return None
|
||||
win_samples = max(10, int(1.0 / dt_ns))
|
||||
try:
|
||||
from numpy.lib.stride_tricks import sliding_window_view
|
||||
rstd = np.zeros(N)
|
||||
wins = sliding_window_view(v_dat, win_samples)
|
||||
rstd[win_samples - 1:win_samples - 1 + len(wins)] = wins.std(axis=-1)
|
||||
except Exception:
|
||||
rstd = np.array([v_dat[max(0, i - win_samples):i + 1].std() for i in range(N)])
|
||||
|
||||
# Find LP-11 end (first sample below LP11_THRESH_SE after LP-11)
|
||||
lp11_end_idx = None
|
||||
in_lp11 = False
|
||||
for i in range(N):
|
||||
if v_dat[i] > LP11_THRESH_SE:
|
||||
in_lp11 = True
|
||||
elif in_lp11:
|
||||
lp11_end_idx = i
|
||||
break
|
||||
if lp11_end_idx is None:
|
||||
return None
|
||||
|
||||
search_end = min(lp11_end_idx + int(2000.0 / dt_ns), N)
|
||||
|
||||
# Find LP-low plateau start: first sustained block of v < LP_LOW_V_SE
|
||||
# AND rstd < HS_STD_V_SE (the LP-11 fall edge has high rstd so we skip it).
|
||||
min_lp_run = max(5, int(LP_LOW_MIN_NS / dt_ns))
|
||||
lp_low_start = None
|
||||
run = 0
|
||||
for i in range(lp11_end_idx, search_end):
|
||||
if v_dat[i] < LP_LOW_V_SE and rstd[i] < HS_STD_V_SE:
|
||||
run += 1
|
||||
if run >= min_lp_run:
|
||||
lp_low_start = i - run + 1
|
||||
break
|
||||
else:
|
||||
run = 0
|
||||
if lp_low_start is None:
|
||||
return min(lp11_end_idx + max(1, int(50.0 / dt_ns)), N - 1)
|
||||
|
||||
# Find LP-low plateau end: first rstd > HS_STD_V_SE after the plateau begins.
|
||||
# This is where the first '1' bit in 0xB8 creates a large voltage transition.
|
||||
lp_low_end = None
|
||||
for i in range(lp_low_start, search_end):
|
||||
if rstd[i] > HS_STD_V_SE:
|
||||
lp_low_end = i
|
||||
break
|
||||
if lp_low_end is None:
|
||||
return min(lp_low_start + max(1, int(50.0 / dt_ns)), N - 1)
|
||||
|
||||
# Start decode LP_MARGIN_NS before the first '1' bit of 0xB8 so the 8-phase
|
||||
# search sees the complete sync byte near byte 0.
|
||||
margin = max(1, int(LP_MARGIN_NS / dt_ns))
|
||||
return max(lp_low_start, lp_low_end - margin)
|
||||
|
||||
# --- Differential LP-triggered path ---
|
||||
# LP-01: D+ = 0 V, D- = high → diff strongly negative (< -0.5 V for ≥ 20 ns)
|
||||
@@ -379,21 +429,37 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True):
|
||||
print(" ERROR: Too few bits decoded")
|
||||
return None
|
||||
|
||||
# Try all 8 bit-phase offsets to handle framing uncertainty from LP-00 CLK edges.
|
||||
# LP-00 CLK edges before HS starts produce garbage bits; the correct phase is
|
||||
# the one where 0xB8 appears earliest in the byte stream.
|
||||
raw_bytes = None
|
||||
sync_idx = None
|
||||
# Try all 8 bit-phase offsets. Pass 1: find earliest 0xB8 whose next byte has
|
||||
# VC=0 and a known DSI DT (validated sync). Pass 2 fallback: earliest bare 0xB8.
|
||||
raw_bytes = None
|
||||
sync_idx = None
|
||||
best_phase = 0
|
||||
best_sync = len(bits) # sentinel: "not found"
|
||||
best_sync = len(bits)
|
||||
validated = False
|
||||
|
||||
for phase in range(8):
|
||||
rb = bits_to_bytes(bits[phase:])
|
||||
si = find_sync_byte(rb)
|
||||
if si is not None and si < best_sync:
|
||||
best_sync = si
|
||||
best_phase = phase
|
||||
raw_bytes = rb
|
||||
sync_idx = si
|
||||
for i in range(len(rb) - 1):
|
||||
if rb[i][1] == HS_SYNC_BYTE:
|
||||
next_byte = rb[i + 1][1]
|
||||
if (next_byte >> 6) == 0 and (next_byte & 0x3F) in VALID_DSI_DT:
|
||||
if i < best_sync:
|
||||
best_sync = i
|
||||
best_phase = phase
|
||||
raw_bytes = rb
|
||||
sync_idx = i
|
||||
validated = True
|
||||
break # stop at first validated pair for this phase
|
||||
|
||||
if not validated:
|
||||
for phase in range(8):
|
||||
rb = bits_to_bytes(bits[phase:])
|
||||
si = find_sync_byte(rb)
|
||||
if si is not None and si < best_sync:
|
||||
best_sync = si
|
||||
best_phase = phase
|
||||
raw_bytes = rb
|
||||
sync_idx = si
|
||||
|
||||
if raw_bytes is None:
|
||||
raw_bytes = bits_to_bytes(bits)
|
||||
@@ -405,7 +471,8 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True):
|
||||
else:
|
||||
if verbose:
|
||||
t_sync = raw_bytes[sync_idx][0]
|
||||
print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})")
|
||||
qual = "validated" if validated else "bare"
|
||||
print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase}, {qual})")
|
||||
|
||||
# Data bytes after sync
|
||||
data_bytes = raw_bytes[sync_idx + 1:] # skip the sync byte itself
|
||||
@@ -507,8 +574,19 @@ def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True):
|
||||
print(f" HS burst start: {t_hs_start_ns:.0f} ns "
|
||||
f"({hs_duration_us:.1f} µs available of ~18 µs full burst)")
|
||||
|
||||
# Auto-detect HS common mode from the first 200 ns of the HS burst.
|
||||
# CLK+ common mode (~217 mV) and DAT+ common mode (~104 mV on this board) differ;
|
||||
# hard-coding one value for DAT+ breaks the decode. The median of the HS burst
|
||||
# gives the correct bit threshold for any board without manual calibration.
|
||||
hs_probe_end = min(hs_start_idx + max(1, int(200.0 / dt_ns)), len(v_dat))
|
||||
dat_common_mode = float(np.median(v_dat[hs_start_idx:hs_probe_end]))
|
||||
dat_common_mode = max(0.030, min(0.250, dat_common_mode)) # clamp to 30–250 mV
|
||||
|
||||
if verbose:
|
||||
print(f" DAT+ HS common mode: {dat_common_mode*1000:.0f} mV (auto-detected, used as bit threshold)")
|
||||
|
||||
bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx,
|
||||
dat_thresh=LP_SE_DAT_THRESH_V, clk_thresh=LP_SE_CLK_THRESH_V)
|
||||
dat_thresh=dat_common_mode, clk_thresh=LP_SE_CLK_THRESH_V)
|
||||
|
||||
if verbose:
|
||||
print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)")
|
||||
@@ -518,18 +596,35 @@ def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True):
|
||||
print(" ERROR: Too few bits decoded")
|
||||
return None
|
||||
|
||||
raw_bytes = None
|
||||
sync_idx = None
|
||||
raw_bytes = None
|
||||
sync_idx = None
|
||||
best_phase = 0
|
||||
best_sync = len(bits)
|
||||
validated = False
|
||||
|
||||
for phase in range(8):
|
||||
rb = bits_to_bytes(bits[phase:])
|
||||
si = find_sync_byte(rb)
|
||||
if si is not None and si < best_sync:
|
||||
best_sync = si
|
||||
best_phase = phase
|
||||
raw_bytes = rb
|
||||
sync_idx = si
|
||||
for i in range(len(rb) - 1):
|
||||
if rb[i][1] == HS_SYNC_BYTE:
|
||||
next_byte = rb[i + 1][1]
|
||||
if (next_byte >> 6) == 0 and (next_byte & 0x3F) in VALID_DSI_DT:
|
||||
if i < best_sync:
|
||||
best_sync = i
|
||||
best_phase = phase
|
||||
raw_bytes = rb
|
||||
sync_idx = i
|
||||
validated = True
|
||||
break # stop at first validated pair for this phase
|
||||
|
||||
if not validated:
|
||||
for phase in range(8):
|
||||
rb = bits_to_bytes(bits[phase:])
|
||||
si = find_sync_byte(rb)
|
||||
if si is not None and si < best_sync:
|
||||
best_sync = si
|
||||
best_phase = phase
|
||||
raw_bytes = rb
|
||||
sync_idx = si
|
||||
|
||||
if raw_bytes is None:
|
||||
raw_bytes = bits_to_bytes(bits)
|
||||
@@ -541,7 +636,8 @@ def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True):
|
||||
else:
|
||||
if verbose:
|
||||
t_sync = raw_bytes[sync_idx][0]
|
||||
print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})")
|
||||
qual = "validated" if validated else "bare"
|
||||
print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase}, {qual})")
|
||||
|
||||
data_bytes = raw_bytes[sync_idx + 1:]
|
||||
header = parse_long_packet_header([b for _, b in data_bytes[:8]])
|
||||
|
||||
Reference in New Issue
Block a user