Files
MiPi_TEST/proto_decoder.py
david rice dd93fbd893 Updates
2026-05-07 09:01:32 +01:00

916 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
proto_decoder.py
Decodes DSI packet content from proto (differential) captures.
Usage:
python3 proto_decoder.py [--cap CAP_NUM] [--dir DATA_DIR] [--compare CAP_B] [--lp] [--list]
The proto_*_clk and proto_*_dat captures are Ch1-Ch2 and Ch3-Ch4 differential
waveforms at ~50-80 ps/sample. CLK runs continuously at ~215 MHz (430 Mbps DDR).
DAT carries MIPI D-PHY HS data, sampled on both CLK edges.
Decodes:
- DSI long packet header: DI (data type / virtual channel), word count, ECC
- First N payload bytes on lane 0
- Compares two captures to spot differing byte positions (data-shift detection)
"""
import argparse
import glob
import sys
from pathlib import Path
import numpy as np
# Default capture directory: a "data" folder next to this script.
DATA_DIR = Path(__file__).parent / "data"

# Display / link geometry used to size one video line.
DISPLAY_W = 1280 # pixels per line
DISPLAY_H = 800
N_LANES = 4
BPP = 24 # bits per pixel (RGB888)

# Expected bytes per line on lane 0:
#   payload = DISPLAY_W * (BPP // 8) bytes in total, split evenly over N_LANES
#   header  = 4 bytes in total (DI, WC_L, WC_H, ECC), 1 per lane
#   footer  = 2 bytes in total (CRC_L, CRC_H), distributed across the first 2 lanes
PAYLOAD_BYTES_PER_LANE = (DISPLAY_W * (BPP // 8)) // N_LANES # 960
HEADER_BYTES_PER_LANE = 1 # DI on lane 0
FOOTER_BYTES_PER_LANE = 1 # CRC_L on lane 0
TOTAL_LANE0_BYTES = HEADER_BYTES_PER_LANE + PAYLOAD_BYTES_PER_LANE + FOOTER_BYTES_PER_LANE

# DSI data type for 24-bit packed pixel stream
DSI_DT_RGB888 = 0x3E
DSI_DT_HSYNC = 0x21 # short packet — H sync start
DSI_DT_VSYNC = 0x01 # short packet — V sync start

# Data types accepted when validating a sync byte: the byte that follows 0xB8
# must have VC=0 (top two bits clear) and its low six bits must be in this set.
VALID_DSI_DT = {0x01, 0x11, 0x21, 0x31, 0x08, 0x09, 0x19, 0x29, 0x39, 0x3E}

# MIPI D-PHY HS sync byte (transmitted at start of each HS burst, all-lanes)
HS_SYNC_BYTE = 0xB8 # 1011_1000 in bit order (LSB first → 00011101 on wire)

# Threshold for differential voltage: >0 = logic-1 (D+ > D-)
DAT_THRESH_V = 0.0

# Single-ended LP file thresholds (CH1=CLK+, CH3=DAT0+).
# In HS mode both CLK+ and DAT+ oscillate around the D-PHY common mode (~200 mV).
LP_SE_CLK_THRESH_V = 0.20 # CLK+ zero-crossing threshold for edge detection
# NOTE(review): the two DAT thresholds below are not read by any function in the
# reviewed part of this file (the LP decoder auto-detects its DAT threshold and
# find_hs_start uses local constants) — confirm whether they are still needed.
LP_SE_DAT_THRESH_V = 0.20 # DAT+ HS bit threshold (> this = logic 1)
LP_SE_LP01_THRESH_V = 0.25 # DAT+ < this during LP-01/LP-00 SoT preamble

# Expected Lane 0 payload byte pattern for a static-pink display (R=0xFF G=0x33 B=0xBB).
# With 4-lane RGB888, Lane 0 carries every 4th byte of the full payload beginning at
# offset 0. Pixels are 3 bytes wide, so the lane/pixel alignment repeats every
# 12 payload bytes (lcm of 3 and 4) and Lane 0 sees:
#   offset 0  → pixel 0 R = 0xFF
#   offset 4  → pixel 1 G = 0x33
#   offset 8  → pixel 2 B = 0xBB
#   offset 12 → pixel 4 R = 0xFF (repeats)
# → 3-byte repeating cycle [0xFF, 0x33, 0xBB] on Lane 0.
STATIC_PINK_LANE0 = (0xFF, 0x33, 0xBB)
# ---------------------------------------------------------------------------
# I/O
# ---------------------------------------------------------------------------
def load_csv(path: Path):
    """
    Load a two-column (time, voltage) CSV capture.

    Only the first two comma-separated columns are read. Rows that do not
    parse as numbers (for example a textual header line) are dropped so they
    cannot poison later median/diff computations with NaN, and a file that
    contains a single data row still yields 1-element arrays instead of
    raising on 2-D indexing.

    Returns a tuple ``(t, v)`` of 1-D float arrays of equal length.
    """
    # reshape(-1, 2) normalises the 1-D result genfromtxt gives for a single
    # row (shape (2,)) and for an empty file (shape (0,)) to a 2-D table.
    table = np.genfromtxt(path, delimiter=",", usecols=(0, 1)).reshape(-1, 2)
    # Non-numeric fields are parsed as NaN; keep only fully numeric rows.
    numeric = np.isfinite(table).all(axis=1)
    table = table[numeric]
    return table[:, 0], table[:, 1]
def find_proto_files(cap_num: int, data_dir: Path):
    """
    Locate the proto (differential) CLK and DAT CSV files for one capture.

    Files are matched as ``*_proto_NNNN_clk.csv`` / ``*_proto_NNNN_dat.csv``
    (NNNN = zero-padded ``cap_num``) directly inside ``data_dir``. When several
    files match, the one that sorts last by path is returned.

    Returns ``(clk_path, dat_path)`` as Path objects.
    Raises FileNotFoundError when either file is missing (CLK is checked first).
    """
    # Escape the directory part so glob metacharacters in the directory name
    # (e.g. "runs[1]") are matched literally; only the filename is a pattern.
    safe_dir = Path(glob.escape(str(data_dir)))
    clk_files = sorted(glob.glob(str(safe_dir / f"*_proto_{cap_num:04d}_clk.csv")))
    dat_files = sorted(glob.glob(str(safe_dir / f"*_proto_{cap_num:04d}_dat.csv")))
    if not clk_files:
        raise FileNotFoundError(f"No proto CLK file found for cap {cap_num:04d} in {data_dir}")
    if not dat_files:
        raise FileNotFoundError(f"No proto DAT file found for cap {cap_num:04d} in {data_dir}")
    return Path(clk_files[-1]), Path(dat_files[-1])
def find_lp_files(cap_num: int, data_dir: Path):
    """
    Locate the LP (single-ended) CLK and DAT CSV files for one capture.

    Files are matched as ``*_lp_NNNN_clk.csv`` / ``*_lp_NNNN_dat.csv``
    (NNNN = zero-padded ``cap_num``) directly inside ``data_dir``. When several
    files match, the one that sorts last by path is returned.

    Returns ``(clk_path, dat_path)`` as Path objects.
    Raises FileNotFoundError when either file is missing (CLK is checked first).
    """
    # Escape the directory part so glob metacharacters in the directory name
    # (e.g. "runs[1]") are matched literally; only the filename is a pattern.
    safe_dir = Path(glob.escape(str(data_dir)))
    clk_files = sorted(glob.glob(str(safe_dir / f"*_lp_{cap_num:04d}_clk.csv")))
    dat_files = sorted(glob.glob(str(safe_dir / f"*_lp_{cap_num:04d}_dat.csv")))
    if not clk_files:
        raise FileNotFoundError(f"No LP CLK file found for cap {cap_num:04d} in {data_dir}")
    if not dat_files:
        raise FileNotFoundError(f"No LP DAT file found for cap {cap_num:04d} in {data_dir}")
    return Path(clk_files[-1]), Path(dat_files[-1])
# ---------------------------------------------------------------------------
# Clock edge detection
# ---------------------------------------------------------------------------
def find_clock_edges(t_clk, v_clk, threshold=0.0):
    """
    Locate threshold crossings of the CLK trace.

    A crossing at index ``i`` means the trace is on one side of ``threshold``
    at sample ``i`` and on the other side at sample ``i + 1``. Crossings that
    follow the previous crossing by no more than ~1 ns worth of samples are
    treated as glitches and discarded (the first crossing is always kept).

    Returns ``(rising_indices, falling_indices)`` as integer arrays; both are
    empty when fewer than two crossings exist.
    """
    sample_ns = float(np.median(np.diff(t_clk))) * 1e9
    min_gap = max(1, int(1.0 / sample_ns))  # samples spanning roughly 1 ns

    above = (v_clk > threshold).astype(int)
    step = np.diff(above)  # +1 at a rising crossing, -1 at a falling one
    candidates = np.flatnonzero(step)
    if candidates.size < 2:
        return np.array([], dtype=int), np.array([], dtype=int)

    # Glitch filter: a crossing survives only if it is far enough from the
    # crossing immediately before it in the unfiltered list.
    far_enough = np.ones(candidates.size, dtype=bool)
    far_enough[1:] = np.diff(candidates) > min_gap
    kept = candidates[far_enough]

    direction = step[kept]
    return kept[direction > 0], kept[direction < 0]
# ---------------------------------------------------------------------------
# HS burst detection
# ---------------------------------------------------------------------------
def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0, single_ended=False):
    """
    Find the start of the post-LP HS burst in the DAT trace.

    Parameters
      t_dat, v_dat  — time (seconds) and voltage arrays of the DAT trace.
      t_clk         — accepted for call compatibility; not used by this function.
      window_ns     — accepted for call compatibility; not used by this function.
      single_ended  — selects the detection path (see below).

    single_ended=True — LP files (CH1=CLK+, CH3=DAT0+):
      1. Scan from index 0 for the first sample that is not above 0.8 V after
         at least one sample above 0.8 V (end of LP-11). None if never seen.
      2. Within 2000 ns of that point, find the first run of ≥ 5 ns where
         DAT+ < 0.25 V and the ~1 ns rolling std < 45 mV (LP-low plateau).
         If no plateau is found, return LP-11 end + 50 ns.
      3. From the plateau start, find the first sample whose rolling std
         exceeds 45 mV (first HS data transition). If none is found, return
         plateau start + 50 ns.
      4. Otherwise return 25 ns before that transition, but never earlier
         than the plateau start, so the bit-phase search sees the whole
         sync byte.

    single_ended=False — Proto files (differential):
      Starting at N//4, look for a run of samples below -0.5 V lasting
      ≥ 20 ns (LP-01) that is followed by a sample at or above -0.5 V;
      return the index 200 ns after the run ends. If no such run exists,
      fall back to the rolling-std method: take the last sample belonging to
      a quiet stretch (std < 20 mV for ≥ 200 ns), then return the start of
      the first subsequent run of ≥ 5 ns with std ≥ 40 mV.

    Returns an index into t_dat (fallback/offset results are clamped to
    N - 1 where the code applies min(..., N - 1)), or None when no burst
    start can be determined.
    """
    dt_ns = float(np.median(np.diff(t_dat))) * 1e9
    N = len(v_dat)
    # --- Single-ended LP path ---
    # LP-01 + LP-00 + HS-PREPARE + HS-ZERO form a continuous "LP-low" region where
    # DAT+ < 0.25 V and rolling std < 45 mV. The LP-low region ends when the first
    # '1' bit transition in 0xB8 causes rolling std > 45 mV. Start bit decoding a
    # few bits BEFORE that spike so the phase search can find complete 0xB8 near byte 0.
    if single_ended:
        LP11_THRESH_SE = 0.8 # V — LP-11 state (DAT+ high)
        LP_LOW_V_SE = 0.25 # V — LP-01/LP-00/HS-ZERO are all below this
        HS_STD_V_SE = 0.045 # V — rolling std above this → first HS data bit
        LP_LOW_MIN_NS = 5.0 # ns — ignore LP-low runs shorter than this
        LP_MARGIN_NS = 25.0 # ns — start decode this far before first data bit
        # Rolling-std window: ~1 ns of samples, but never fewer than 10.
        win_samples = max(10, int(1.0 / dt_ns))
        try:
            from numpy.lib.stride_tricks import sliding_window_view
            # rstd[i] = std of the win_samples samples ending at i; the first
            # win_samples - 1 entries have no full window and stay 0.
            rstd = np.zeros(N)
            wins = sliding_window_view(v_dat, win_samples)
            rstd[win_samples - 1:win_samples - 1 + len(wins)] = wins.std(axis=-1)
        except Exception:
            # Slow per-sample fallback if the vectorised path fails for any reason.
            # NOTE(review): this window holds up to win_samples + 1 samples and is
            # also evaluated for the leading samples, so values differ slightly
            # from the vectorised path.
            rstd = np.array([v_dat[max(0, i - win_samples):i + 1].std() for i in range(N)])
        # Find LP-11 end (first sample below LP11_THRESH_SE after LP-11)
        lp11_end_idx = None
        in_lp11 = False
        for i in range(N):
            if v_dat[i] > LP11_THRESH_SE:
                in_lp11 = True
            elif in_lp11:
                lp11_end_idx = i
                break
        if lp11_end_idx is None:
            return None
        # Limit all further searching to 2000 ns after the LP-11 exit.
        search_end = min(lp11_end_idx + int(2000.0 / dt_ns), N)
        # Find LP-low plateau start: first sustained block of v < LP_LOW_V_SE
        # AND rstd < HS_STD_V_SE (the LP-11 fall edge has high rstd so we skip it).
        min_lp_run = max(5, int(LP_LOW_MIN_NS / dt_ns))
        lp_low_start = None
        run = 0
        for i in range(lp11_end_idx, search_end):
            if v_dat[i] < LP_LOW_V_SE and rstd[i] < HS_STD_V_SE:
                run += 1
                if run >= min_lp_run:
                    # i is the last sample of the qualifying run; step back to its first.
                    lp_low_start = i - run + 1
                    break
            else:
                run = 0
        if lp_low_start is None:
            # No plateau found: fall back to a fixed 50 ns offset after LP-11 end.
            return min(lp11_end_idx + max(1, int(50.0 / dt_ns)), N - 1)
        # Find LP-low plateau end: first rstd > HS_STD_V_SE after the plateau begins.
        # This is where the first '1' bit in 0xB8 creates a large voltage transition.
        lp_low_end = None
        for i in range(lp_low_start, search_end):
            if rstd[i] > HS_STD_V_SE:
                lp_low_end = i
                break
        if lp_low_end is None:
            # Plateau never ends inside the search window: fixed 50 ns offset.
            return min(lp_low_start + max(1, int(50.0 / dt_ns)), N - 1)
        # Start decode LP_MARGIN_NS before the first '1' bit of 0xB8 so the 8-phase
        # search sees the complete sync byte near byte 0.
        margin = max(1, int(LP_MARGIN_NS / dt_ns))
        return max(lp_low_start, lp_low_end - margin)
    # --- Differential LP-triggered path ---
    # LP-01: D+ = 0 V, D- = high → diff strongly negative (< -0.5 V for ≥ 20 ns)
    LP01_THRESH = -0.5
    min_lp01 = max(2, int(20.0 / dt_ns))
    search_from = N // 4 # skip any LP-01 fragment at capture start
    run = 0
    lp01_end = None
    for i in range(search_from, N):
        if v_dat[i] < LP01_THRESH:
            run += 1
        else:
            # First sample back above the threshold: accept only if the run
            # that just ended was long enough to be LP-01.
            if run >= min_lp01:
                lp01_end = i
                break
            run = 0
    if lp01_end is not None:
        # Skip a fixed 200 ns past the end of LP-01 before sampling starts.
        skip = max(1, int(200.0 / dt_ns))
        return min(lp01_end + skip, N - 1)
    # --- Fallback: HS-triggered captures (original rolling-std method) ---
    win = max(1, int(1.0 / dt_ns))
    min_run = max(5, int(5.0 / dt_ns))
    # Per-sample Python loop over the whole trace; each window holds up to win + 1 samples.
    rstd = np.array([v_dat[max(0, i - win):i + 1].std() for i in range(N)])
    OSC_THRESH = 0.04
    QUIET_THRESH = 0.02
    quiet_min_run = max(5, int(200.0 / dt_ns))
    quiet_end = None
    run_len = 0
    # No break here: quiet_end keeps advancing, so it ends up at the last
    # sample of the last sufficiently long quiet stretch in the trace.
    for i, std_val in enumerate(rstd):
        if std_val < QUIET_THRESH:
            run_len += 1
            if run_len >= quiet_min_run:
                quiet_end = i
        else:
            run_len = 0
    if quiet_end is None:
        return None
    run_start = None
    run_len = 0
    # First sustained oscillation (std ≥ OSC_THRESH for min_run samples) after the quiet stretch.
    for i in range(quiet_end, len(rstd)):
        if rstd[i] >= OSC_THRESH:
            if run_start is None:
                run_start = i
            run_len += 1
            if run_len >= min_run:
                return run_start
        else:
            run_start = None
            run_len = 0
    return None
# ---------------------------------------------------------------------------
# Bit decoding
# ---------------------------------------------------------------------------
def decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx,
                dat_thresh=None, clk_thresh=None):
    """
    Sample DAT at every CLK edge (rising and falling) from hs_start_idx onward.

    dat_thresh: DAT voltage above which a sample is read as 1
                (default: DAT_THRESH_V).
    clk_thresh: CLK voltage used for edge detection (default: 0.0).

    Each CLK edge time is mapped to a DAT sample index assuming DAT is
    uniformly sampled at its median time step; indices are clamped to the
    valid range of v_dat.

    Returns a list of (time_ns, bit) tuples in time order, or [] when no CLK
    edge lies at or after the HS start time.
    """
    dat_level = DAT_THRESH_V if dat_thresh is None else dat_thresh
    clk_level = 0.0 if clk_thresh is None else clk_thresh

    burst_t0 = t_dat[hs_start_idx]
    rising, falling = find_clock_edges(t_clk, v_clk, threshold=clk_level)
    edges = np.sort(np.concatenate([rising, falling]))
    edges = edges[t_clk[edges] >= burst_t0]
    if len(edges) == 0:
        return []

    dt_dat = float(np.median(np.diff(t_dat))) * 1e9
    edge_times = t_clk[edges]
    # Nearest DAT sample for every edge, computed for all edges at once.
    sample_idx = np.rint((edge_times - t_dat[0]) / (dt_dat * 1e-9)).astype(int)
    sample_idx = np.clip(sample_idx, 0, len(v_dat) - 1)
    bit_values = (np.asarray(v_dat)[sample_idx] > dat_level).astype(int).tolist()
    return list(zip(edge_times * 1e9, bit_values))
# ---------------------------------------------------------------------------
# Byte reconstruction
# ---------------------------------------------------------------------------
def bits_to_bytes(bits):
    """
    Group (time_ns, bit) tuples into bytes, first bit = least significant
    (the order in which MIPI D-PHY puts bits on the wire).

    Trailing bits that do not fill a whole byte are ignored.
    Returns a list of (time_ns_of_first_bit, byte_value) tuples.
    """
    packed = []
    for k in range(len(bits) // 8):
        first = 8 * k
        value = 0
        for shift, (_, bit) in enumerate(bits[first:first + 8]):
            value += bit << shift
        packed.append((bits[first][0], value))
    return packed
# ---------------------------------------------------------------------------
# DSI sync byte search and frame alignment
# ---------------------------------------------------------------------------
def find_sync_byte(raw_bytes):
    """
    Locate the first HS sync byte (HS_SYNC_BYTE) in a decoded byte stream.

    raw_bytes is a sequence of (time_ns, byte_value) pairs.
    Returns the index of the first pair whose value equals HS_SYNC_BYTE,
    or None when the stream contains no such byte.
    """
    return next(
        (pos for pos, (_, value) in enumerate(raw_bytes) if value == HS_SYNC_BYTE),
        None,
    )
def parse_long_packet_header(payload_bytes):
    """
    Interpret the first byte of a lane-0 byte sequence as a DSI data identifier.

    Lane 0 carries: [DI, then payload bytes 0, 4, 8, ...]. The word-count and
    ECC bytes travel on lanes 1-3 and are not available here, so only the DI
    byte is decoded. Despite the name, short-packet data types (sync events,
    EoT) are recognised as well.

    Returns None for an empty input, otherwise a dict with
      DI_raw  — the raw DI byte
      VC      — virtual channel (DI bits 7..6)
      DT      — data type (DI bits 5..0)
      DT_name — human-readable name, or "unknown (0xNN)"
    """
    if not payload_bytes:
        return None
    di = payload_bytes[0]
    vc = (di >> 6) & 0x03
    dt = di & 0x3F
    # Names follow the MIPI DSI data-type table. 0x39 is the DCS long write
    # command (it was previously mislabelled as an RGB888 pixel stream);
    # 0x3E is the 24-bit packed pixel stream.
    dt_name = {
        0x01: "VSS (V-Sync Start)",
        0x11: "VSE (V-Sync End)",
        0x21: "HSS (H-Sync Start)",
        0x31: "HSE (H-Sync End)",
        0x08: "EOT (End of Transmission)",
        0x09: "Null packet (long)",
        0x19: "Blanking packet (long)",
        0x29: "Generic long write",
        0x39: "DCS long write",
        0x3E: "Packed RGB888 (long packet, 24bpp)",
    }.get(dt, f"unknown (0x{dt:02X})")
    return {
        "DI_raw": di,
        "VC": vc,
        "DT": dt,
        "DT_name": dt_name,
    }
# ---------------------------------------------------------------------------
# Main decode function
# ---------------------------------------------------------------------------
def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True):
    """
    Full decode of a proto (differential) capture.

    Steps: load the CLK/DAT CSV pair for ``cap_num``, locate the HS burst,
    sample DAT on every CLK edge, byte-align the bit stream on the HS sync
    byte (trying all 8 bit phases), then interpret the byte after the sync
    byte as the DSI data identifier and the rest as lane-0 payload.

    Returns None when the HS burst start cannot be found or fewer than 16
    bits were decoded. Otherwise returns a dict with keys
      cap_num, hs_start_ns, hs_duration_us, n_bits, n_bytes,
      sync_idx, header, lane0_payload, pixel_check.
    ``sync_idx`` is the byte index of the sync byte, or None when no sync
    byte exists in any bit phase. In that case decoding still proceeds as if
    raw byte 0 were the sync byte, but the None is preserved so that
    analyse_for_anomalies() can report "sync_byte_not_found".

    Raises FileNotFoundError (via find_proto_files) when a capture file is missing.
    When ``verbose`` is true a human-readable report is printed as well.
    """
    clk_path, dat_path = find_proto_files(cap_num, data_dir)
    if verbose:
        print(f"\n{'='*60}")
        print(f"Cap {cap_num:04d}: {dat_path.name}")
        print(f"{'='*60}")
    t_clk, v_clk = load_csv(clk_path)
    t_dat, v_dat = load_csv(dat_path)
    dt_ns = float(np.median(np.diff(t_dat))) * 1e9
    if verbose:
        print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs ({len(t_dat)} samples, {dt_ns*1000:.0f} ps/sample)")

    # Find HS burst start
    hs_start_idx = find_hs_start(t_dat, v_dat)
    if hs_start_idx is None:
        if verbose:
            print(" ERROR: Could not find HS burst start")
        return None
    t_hs_start_ns = t_dat[hs_start_idx] * 1e9
    t_hs_end_ns = t_dat[-1] * 1e9
    hs_duration_us = (t_hs_end_ns - t_hs_start_ns) / 1000.0
    if verbose:
        print(f" HS burst start: {t_hs_start_ns:.0f} ns ({hs_duration_us:.1f} µs available of ~18 µs full burst)")

    # Decode bits
    bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx)
    if verbose:
        print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)")
    if len(bits) < 16:
        if verbose:
            print(" ERROR: Too few bits decoded")
        return None

    # Try all 8 bit-phase offsets. Pass 1: find earliest 0xB8 whose next byte has
    # VC=0 and a known DSI DT (validated sync). Pass 2 fallback: earliest bare 0xB8.
    raw_bytes = None
    sync_idx = None
    best_phase = 0
    best_sync = len(bits)
    validated = False
    for phase in range(8):
        candidate = bits_to_bytes(bits[phase:])
        for i in range(len(candidate) - 1):
            if candidate[i][1] != HS_SYNC_BYTE:
                continue
            next_byte = candidate[i + 1][1]
            if (next_byte >> 6) == 0 and (next_byte & 0x3F) in VALID_DSI_DT:
                # Strict '<' keeps the lowest phase on ties.
                if i < best_sync:
                    best_sync, best_phase = i, phase
                    raw_bytes, sync_idx = candidate, i
                validated = True
                break  # stop at first validated pair for this phase
    if not validated:
        for phase in range(8):
            candidate = bits_to_bytes(bits[phase:])
            si = find_sync_byte(candidate)
            if si is not None and si < best_sync:
                best_sync, best_phase = si, phase
                raw_bytes, sync_idx = candidate, si
    if raw_bytes is None:
        raw_bytes = bits_to_bytes(bits)

    if sync_idx is None:
        if verbose:
            print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found in any bit phase — using raw byte 0")
        # Treat raw byte 0 as the sync position for slicing only; sync_idx
        # itself stays None so the failure remains visible to callers.
        data_start = 1
    else:
        if verbose:
            t_sync = raw_bytes[sync_idx][0]
            qual = "validated" if validated else "bare"
            print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase}, {qual})")
        data_start = sync_idx + 1  # skip the sync byte itself

    # Data bytes after sync
    data_bytes = raw_bytes[data_start:]

    # Parse header
    header = parse_long_packet_header([b for _, b in data_bytes[:8]])
    if verbose and header:
        print(f"\n DSI Header (lane 0):")
        print(f" DI = 0x{header['DI_raw']:02X} → VC={header['VC']} DT=0x{header['DT']:02X} ({header['DT_name']})")

    # Payload bytes on lane 0 (every byte after header DI)
    # Lane 0 payload: bytes 0, 4, 8, ... of the full pixel stream
    lane0_payload = [b for _, b in data_bytes[1:]]  # skip DI
    # Static-pink check is computed once and shared by the report and the result.
    pixel_check = check_pixel_content(lane0_payload) if len(lane0_payload) >= 12 else None

    if verbose:
        n_payload = len(lane0_payload)
        n_pixels_partial = n_payload * N_LANES // (BPP // 8)
        print(f"\n Lane 0 payload: {n_payload} bytes decoded (≈ first {n_pixels_partial} pixels' components)")
        if n_payload >= 16:
            hex_str = " ".join(f"{b:02X}" for b in lane0_payload[:64])
            print(f" First 64 payload bytes: {hex_str}")
            if n_payload > 64:
                print(f" ...")
            # Check for non-zero content and where it first appears
            nonzero_idx = next((i for i, b in enumerate(lane0_payload) if b != 0x00), None)
            if nonzero_idx is None:
                print(f"\n All {n_payload} payload bytes are 0x00 (blank / border region)")
            else:
                print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})")
                print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}")
        # Static-pink pixel content check
        if pixel_check is not None:
            match_str = (f"{pixel_check['match_pct']:.0f}% of {pixel_check['n_checked']} bytes "
                         f"match static-pink pattern")
            if pixel_check["first_mismatch"]:
                mm = pixel_check["first_mismatch"]
                match_str += (f" (first diff at offset {mm[0]}: "
                              f"got 0x{mm[2]:02X} expected 0x{mm[1]:02X})")
            print(f"\n Static-pink check : {match_str}")

    return {
        "cap_num": cap_num,
        "hs_start_ns": t_hs_start_ns,
        "hs_duration_us": hs_duration_us,
        "n_bits": len(bits),
        "n_bytes": len(raw_bytes),
        "sync_idx": sync_idx,
        "header": header,
        "lane0_payload": lane0_payload,
        "pixel_check": pixel_check,
    }
# ---------------------------------------------------------------------------
# LP single-ended decode
# ---------------------------------------------------------------------------
def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True):
    """
    Full decode of an LP capture (CH1=CLK+, CH3=DAT0+) using single-ended thresholds.

    Per the original author's note, LP files are captured at 10 GSa/s
    (100 ps/sample, ~23 samples/bit at 432 Mbps), which is enough resolution
    to decode the HS bit stream without a separate proto pass.

    The DAT bit threshold is the median DAT+ voltage over the first 200 ns of
    the HS burst, clamped to 30–250 mV; CLK edges use LP_SE_CLK_THRESH_V.

    Returns None when the HS burst start cannot be found or fewer than 16
    bits were decoded. Otherwise returns a dict with the same keys as
    decode_capture(). ``sync_idx`` is None when no sync byte exists in any
    bit phase; decoding then proceeds as if raw byte 0 were the sync byte,
    and the None lets analyse_for_anomalies() report "sync_byte_not_found".

    Raises FileNotFoundError (via find_lp_files) when a capture file is missing.
    """
    clk_path, dat_path = find_lp_files(cap_num, data_dir)
    if verbose:
        print(f"\n{'='*60}")
        print(f"Cap {cap_num:04d}: {dat_path.name} [LP single-ended]")
        print(f"{'='*60}")
    t_clk, v_clk = load_csv(clk_path)
    t_dat, v_dat = load_csv(dat_path)
    dt_ns = float(np.median(np.diff(t_dat))) * 1e9
    if verbose:
        print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs "
              f"({len(t_dat)} samples, {dt_ns*1000:.0f} ps/sample)")

    hs_start_idx = find_hs_start(t_dat, v_dat, t_clk, single_ended=True)
    if hs_start_idx is None:
        if verbose:
            print(" ERROR: Could not find HS burst start")
        return None
    t_hs_start_ns = t_dat[hs_start_idx] * 1e9
    t_hs_end_ns = t_dat[-1] * 1e9
    hs_duration_us = (t_hs_end_ns - t_hs_start_ns) / 1000.0
    if verbose:
        print(f" HS burst start: {t_hs_start_ns:.0f} ns "
              f"({hs_duration_us:.1f} µs available of ~18 µs full burst)")

    # Auto-detect HS common mode from the first 200 ns of the HS burst.
    # CLK+ common mode (~217 mV) and DAT+ common mode (~104 mV on this board) differ;
    # hard-coding one value for DAT+ breaks the decode. The median of the HS burst
    # gives the correct bit threshold for any board without manual calibration.
    hs_probe_end = min(hs_start_idx + max(1, int(200.0 / dt_ns)), len(v_dat))
    dat_common_mode = float(np.median(v_dat[hs_start_idx:hs_probe_end]))
    dat_common_mode = max(0.030, min(0.250, dat_common_mode))  # clamp to 30–250 mV
    if verbose:
        print(f" DAT+ HS common mode: {dat_common_mode*1000:.0f} mV (auto-detected, used as bit threshold)")

    bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx,
                       dat_thresh=dat_common_mode, clk_thresh=LP_SE_CLK_THRESH_V)
    if verbose:
        print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)")
    if len(bits) < 16:
        if verbose:
            print(" ERROR: Too few bits decoded")
        return None

    # Try all 8 bit-phase offsets. Pass 1: earliest 0xB8 whose next byte has
    # VC=0 and a known DSI DT (validated sync). Pass 2 fallback: earliest bare 0xB8.
    raw_bytes = None
    sync_idx = None
    best_phase = 0
    best_sync = len(bits)
    validated = False
    for phase in range(8):
        candidate = bits_to_bytes(bits[phase:])
        for i in range(len(candidate) - 1):
            if candidate[i][1] != HS_SYNC_BYTE:
                continue
            next_byte = candidate[i + 1][1]
            if (next_byte >> 6) == 0 and (next_byte & 0x3F) in VALID_DSI_DT:
                # Strict '<' keeps the lowest phase on ties.
                if i < best_sync:
                    best_sync, best_phase = i, phase
                    raw_bytes, sync_idx = candidate, i
                validated = True
                break  # stop at first validated pair for this phase
    if not validated:
        for phase in range(8):
            candidate = bits_to_bytes(bits[phase:])
            si = find_sync_byte(candidate)
            if si is not None and si < best_sync:
                best_sync, best_phase = si, phase
                raw_bytes, sync_idx = candidate, si
    if raw_bytes is None:
        raw_bytes = bits_to_bytes(bits)

    if sync_idx is None:
        if verbose:
            print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found in any bit phase — using raw byte 0")
        # Treat raw byte 0 as the sync position for slicing only; sync_idx
        # itself stays None so the failure remains visible to callers.
        data_start = 1
    else:
        if verbose:
            t_sync = raw_bytes[sync_idx][0]
            qual = "validated" if validated else "bare"
            print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase}, {qual})")
        data_start = sync_idx + 1  # skip the sync byte itself

    data_bytes = raw_bytes[data_start:]
    header = parse_long_packet_header([b for _, b in data_bytes[:8]])
    if verbose and header:
        print(f"\n DSI Header (lane 0):")
        print(f" DI = 0x{header['DI_raw']:02X} → VC={header['VC']} DT=0x{header['DT']:02X} ({header['DT_name']})")

    lane0_payload = [b for _, b in data_bytes[1:]]  # skip DI
    # Static-pink check is computed once and shared by the report and the result.
    pixel_check = check_pixel_content(lane0_payload) if len(lane0_payload) >= 12 else None

    if verbose:
        n_payload = len(lane0_payload)
        n_pixels_partial = n_payload * N_LANES // (BPP // 8)
        print(f"\n Lane 0 payload: {n_payload} bytes decoded (≈ first {n_pixels_partial} pixels' components)")
        if n_payload >= 16:
            hex_str = " ".join(f"{b:02X}" for b in lane0_payload[:64])
            print(f" First 64 payload bytes: {hex_str}")
            if n_payload > 64:
                print(f" ...")
            nonzero_idx = next((i for i, b in enumerate(lane0_payload) if b != 0x00), None)
            if nonzero_idx is None:
                print(f"\n All {n_payload} payload bytes are 0x00 (blank / border region)")
            else:
                print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})")
                print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}")
        if pixel_check is not None:
            match_str = (f"{pixel_check['match_pct']:.0f}% of {pixel_check['n_checked']} bytes "
                         f"match static-pink pattern")
            if pixel_check["first_mismatch"]:
                mm = pixel_check["first_mismatch"]
                match_str += (f" (first diff at offset {mm[0]}: "
                              f"got 0x{mm[2]:02X} expected 0x{mm[1]:02X})")
            print(f"\n Static-pink check : {match_str}")

    return {
        "cap_num": cap_num,
        "hs_start_ns": t_hs_start_ns,
        "hs_duration_us": hs_duration_us,
        "n_bits": len(bits),
        "n_bytes": len(raw_bytes),
        "sync_idx": sync_idx,
        "header": header,
        "lane0_payload": lane0_payload,
        "pixel_check": pixel_check,
    }
# ---------------------------------------------------------------------------
# Comparison
# ---------------------------------------------------------------------------
def compare_captures(cap_a: int, cap_b: int, data_dir: Path, n_bytes: int = 128):
    """
    Decode two proto captures and print byte-level differences between the
    first ``n_bytes`` lane-0 payload bytes of each.

    Prints a per-capture summary, up to 40 differing offsets, and the lag of
    the cross-correlation peak between the two payloads as a hint for a
    byte-shift between captures. Returns None; all output goes to stdout.

    A capture whose header could not be parsed is reported with DI=n/a
    instead of raising, and the cross-correlation is skipped when either
    payload is constant (its mean-removed correlation is identically zero,
    so the peak position would be meaningless).
    """
    print(f"\nComparing cap {cap_a:04d} vs cap {cap_b:04d} (first {n_bytes} payload bytes on lane 0)")
    res_a = decode_capture(cap_a, data_dir, verbose=False)
    res_b = decode_capture(cap_b, data_dir, verbose=False)
    if res_a is None or res_b is None:
        print(" ERROR: Could not decode one or both captures")
        return

    pa = res_a["lane0_payload"][:n_bytes]
    pb = res_b["lane0_payload"][:n_bytes]
    n_compare = min(len(pa), len(pb), n_bytes)
    diffs = [(i, pa[i], pb[i]) for i in range(n_compare) if pa[i] != pb[i]]

    for cap, res, payload in ((cap_a, res_a, pa), (cap_b, res_b, pb)):
        hdr = res["header"]
        # header is None when no byte followed the sync byte.
        di_txt = f"0x{hdr['DI_raw']:02X}" if hdr else "n/a"
        print(f" Cap {cap:04d}: {len(payload)} bytes available, DI={di_txt} HS_start={res['hs_start_ns']:.0f}ns")

    if not diffs:
        print(f"\n No differences in first {n_compare} bytes — data content matches.")
    else:
        print(f"\n {len(diffs)} byte differences in first {n_compare} bytes:")
        print(f" {'Offset':>8} {'Cap_A':>6} {'Cap_B':>6}")
        for offset, ba, bb in diffs[:40]:
            pixel_group = offset * N_LANES // (BPP // 8)
            print(f" {offset:>8} 0x{ba:02X} 0x{bb:02X} (pixel group ≈ {pixel_group})")
        if len(diffs) > 40:
            print(f" ... ({len(diffs) - 40} more)")

    # Check for data-shift pattern: does one capture's data appear shifted in the other?
    if len(pa) > 8 and len(pb) > 8:
        pa_arr = np.array(pa[:n_compare], dtype=np.uint8)
        pb_arr = np.array(pb[:n_compare], dtype=np.uint8)
        if pa_arr.min() == pa_arr.max() or pb_arr.min() == pb_arr.max():
            print(f"\n Cross-correlation skipped: payload is constant, no shift information")
            return
        xcorr = np.correlate(pa_arr.astype(float) - pa_arr.mean(),
                             pb_arr.astype(float) - pb_arr.mean(), mode="full")
        # In "full" mode index n_compare - 1 corresponds to zero lag.
        lag = int(np.argmax(np.abs(xcorr))) - (n_compare - 1)
        if lag != 0 and abs(lag) < n_compare // 2:
            print(f"\n Cross-correlation peak at lag={lag} bytes → data may be shifted by {lag} bytes between captures")
        else:
            print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)")
def compare_lp_captures(cap_a: int, cap_b: int, data_dir: Path, n_bytes: int = 128):
    """
    Decode two LP (single-ended) captures and print byte-level differences
    between the first ``n_bytes`` lane-0 payload bytes of each.

    Prints a per-capture summary, up to 40 differing offsets, and the lag of
    the cross-correlation peak between the two payloads as a hint for a
    byte-shift between captures. Returns None; all output goes to stdout.

    A capture whose header could not be parsed is reported with DI=n/a
    instead of raising, and the cross-correlation is skipped when either
    payload is constant (its mean-removed correlation is identically zero,
    so the peak position would be meaningless).
    """
    print(f"\nComparing LP cap {cap_a:04d} vs cap {cap_b:04d} (first {n_bytes} payload bytes on lane 0)")
    res_a = decode_lp_capture(cap_a, data_dir, verbose=False)
    res_b = decode_lp_capture(cap_b, data_dir, verbose=False)
    if res_a is None or res_b is None:
        print(" ERROR: Could not decode one or both LP captures")
        return

    pa = res_a["lane0_payload"][:n_bytes]
    pb = res_b["lane0_payload"][:n_bytes]
    n_compare = min(len(pa), len(pb), n_bytes)
    diffs = [(i, pa[i], pb[i]) for i in range(n_compare) if pa[i] != pb[i]]

    for cap, res, payload in ((cap_a, res_a, pa), (cap_b, res_b, pb)):
        hdr = res["header"]
        # header is None when no byte followed the sync byte.
        di_txt = f"0x{hdr['DI_raw']:02X}" if hdr else "n/a"
        print(f" Cap {cap:04d}: {len(payload)} bytes available, DI={di_txt} HS_start={res['hs_start_ns']:.0f}ns")

    if not diffs:
        print(f"\n No differences in first {n_compare} bytes — data content matches.")
    else:
        print(f"\n {len(diffs)} byte differences in first {n_compare} bytes:")
        print(f" {'Offset':>8} {'Cap_A':>6} {'Cap_B':>6}")
        for offset, ba, bb in diffs[:40]:
            pixel_group = offset * N_LANES // (BPP // 8)
            print(f" {offset:>8} 0x{ba:02X} 0x{bb:02X} (pixel group ≈ {pixel_group})")
        if len(diffs) > 40:
            print(f" ... ({len(diffs) - 40} more)")

    # Check for data-shift pattern: does one capture's data appear shifted in the other?
    if len(pa) > 8 and len(pb) > 8:
        pa_arr = np.array(pa[:n_compare], dtype=np.uint8)
        pb_arr = np.array(pb[:n_compare], dtype=np.uint8)
        if pa_arr.min() == pa_arr.max() or pb_arr.min() == pb_arr.max():
            print(f"\n Cross-correlation skipped: payload is constant, no shift information")
            return
        xcorr = np.correlate(pa_arr.astype(float) - pa_arr.mean(),
                             pb_arr.astype(float) - pb_arr.mean(), mode="full")
        # In "full" mode index n_compare - 1 corresponds to zero lag.
        lag = int(np.argmax(np.abs(xcorr))) - (n_compare - 1)
        if lag != 0 and abs(lag) < n_compare // 2:
            print(f"\n Cross-correlation peak at lag={lag} bytes → data may be shifted by {lag} bytes between captures")
        else:
            print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)")
# ---------------------------------------------------------------------------
# Pixel content verification and anomaly analysis
# ---------------------------------------------------------------------------
def check_pixel_content(lane0_payload: list, n_check: int = 60, pattern=None) -> dict:
    """
    Verify the first n_check Lane 0 payload bytes against a repeating byte pattern.

    lane0_payload — decoded lane-0 payload bytes (DI already removed)
    n_check       — number of leading bytes to examine
    pattern       — repeating sequence of expected bytes; byte i is compared
                    with pattern[i % len(pattern)]. Defaults to
                    STATIC_PINK_LANE0 (the static-pink test image).

    Returns a dict:
      match_pct      — percentage of bytes matching the expected pattern
                       (rounded to 0.1), or None when nothing was checked
      n_mismatches   — number of mismatching bytes in the checked window
      first_mismatch — (offset, expected_byte, actual_byte) or None
      n_checked      — number of bytes examined

    Raises ValueError when an explicitly supplied pattern is empty.
    """
    if pattern is None:
        pattern = STATIC_PINK_LANE0
    period = len(pattern)
    if period == 0:
        raise ValueError("pattern must contain at least one byte")

    check = lane0_payload[:n_check]
    if not check:
        return {"match_pct": None, "n_mismatches": 0,
                "first_mismatch": None, "n_checked": 0}

    mismatches = [
        (i, pattern[i % period], actual)
        for i, actual in enumerate(check)
        if actual != pattern[i % period]
    ]
    return {
        "match_pct": round((1 - len(mismatches) / len(check)) * 100, 1),
        "n_mismatches": len(mismatches),
        "first_mismatch": mismatches[0] if mismatches else None,
        "n_checked": len(check),
    }
def analyse_for_anomalies(result: dict | None) -> dict:
"""
Summarise bit-level anomalies from a decode_capture() result.
Returns {"anomalous": bool, "flags": list[str]}.
Checks:
sync_byte_not_found — 0xB8 not found in any of 8 bit phases →
HS burst may not have started properly
sync_byte_late — 0xB8 found but at byte index > 5 →
garbage precedes sync → possible byte misalignment
unexpected_packet_type — DI data-type not in the expected set
pixel_content_mismatch — Lane 0 payload < 90 % match to static-pink pattern
"""
if result is None:
return {"anomalous": True, "flags": ["decode_failed"]}
flags = []
sync_idx = result.get("sync_idx")
if sync_idx is None:
flags.append("sync_byte_not_found — HS burst may not have started")
elif sync_idx > 5:
flags.append(
f"sync_byte_late (found at byte {sync_idx}, expected ≤ 5) — "
f"possible byte misalignment"
)
header = result.get("header")
if header:
dt = header.get("DT", -1)
known = {DSI_DT_RGB888, 0x39, DSI_DT_HSYNC, DSI_DT_VSYNC,
0x31, 0x11, 0x29, 0x08, 0x09, 0x19}
if dt not in known:
flags.append(f"unexpected_packet_type DT=0x{dt:02X}")
payload = result.get("lane0_payload", [])
if len(payload) >= 12:
cc = check_pixel_content(payload)
if cc["match_pct"] is not None and cc["match_pct"] < 90.0:
mm = cc["first_mismatch"]
detail = (
f"first diff at byte {mm[0]}: got 0x{mm[2]:02X} expected 0x{mm[1]:02X}"
if mm else ""
)
flags.append(
f"pixel_content_mismatch "
f"({cc['match_pct']:.0f}% of {cc['n_checked']} bytes match; {detail})"
)
return {"anomalous": bool(flags), "flags": flags}
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """
    Command-line entry point.

    --list             print the capture numbers found in the data directory
    --cap N            decode capture N (proto files, or LP files with --lp)
    --compare CAP_B    compare --cap against CAP_B byte-by-byte instead of
                       printing a single decode
    After a single decode, the anomaly summary is printed as well.
    """
    parser = argparse.ArgumentParser(description="Decode DSI packet content from proto or LP captures")
    parser.add_argument("--cap", type=int, default=214, help="Capture number to decode (default: 214)")
    parser.add_argument("--dir", type=str, default=str(DATA_DIR), help="Data directory")
    parser.add_argument("--compare", type=int, default=None,
                        metavar="CAP_B",
                        help="Compare --cap against CAP_B byte-by-byte")
    parser.add_argument("--lp", action="store_true",
                        help="Decode from LP single-ended files instead of proto differential files")
    parser.add_argument("--list", action="store_true", help="List available captures")
    args = parser.parse_args()
    data_dir = Path(args.dir)

    if args.list:
        def capture_numbers(kind):
            # File stems look like "<prefix>_<kind>_<NNNN>_dat"; the capture
            # number is the second-to-last "_" field. Files whose field is
            # not an integer are skipped rather than aborting the listing.
            numbers = set()
            for f in data_dir.glob(f"*_{kind}_*_dat.csv"):
                try:
                    numbers.add(int(f.stem.split("_")[-2]))
                except ValueError:
                    continue
            return sorted(numbers)

        proto_caps = capture_numbers("proto")
        lp_caps = capture_numbers("lp")
        print(f"Available proto captures: {proto_caps}")
        print(f"Available LP captures: {lp_caps}")
        return

    if args.compare is not None:
        if args.lp:
            compare_lp_captures(args.cap, args.compare, data_dir)
        else:
            compare_captures(args.cap, args.compare, data_dir)
        return

    if args.lp:
        result = decode_lp_capture(args.cap, data_dir, verbose=True)
    else:
        result = decode_capture(args.cap, data_dir, verbose=True)
    anomaly = analyse_for_anomalies(result)
    if anomaly["anomalous"]:
        print(f"\n*** BIT-LEVEL ANOMALIES: {', '.join(anomaly['flags'])} ***")
    else:
        print("\nNo bit-level anomalies detected (sync, packet type, pixel content all OK)")
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()