# Source: MiPi_TEST/proto_decoder.py (repository web-view snapshot, 441 lines, 16 KiB)
# Snapshot date: 2026-04-24 14:30:48 +01:00
#!/usr/bin/env python3
"""
proto_decoder.py
Decodes DSI packet content from proto (differential) captures.
Usage:
python3 proto_decoder.py [--cap CAP_NUM] [--dir DATA_DIR] [--compare]
The proto_*_clk and proto_*_dat captures are Ch1-Ch2 and Ch3-Ch4 differential
waveforms at ~50-80 ps/sample. CLK runs continuously at ~215 MHz (430 Mbps DDR).
DAT carries MIPI D-PHY HS data, sampled on both CLK edges.
Decodes:
- DSI long packet header: DI (data type / virtual channel), word count, ECC
- First N payload bytes on lane 0
- Compares two captures to spot differing byte positions (data-shift detection)
"""
import argparse
import glob
import sys
from pathlib import Path
import numpy as np
# Default location of the capture CSVs, relative to this script.
DATA_DIR = Path(__file__).parent / "data"

# Panel geometry and DSI link configuration.
DISPLAY_W = 1280  # pixels per line
DISPLAY_H = 800
N_LANES = 4
BPP = 24  # bits per pixel (RGB888)

# Expected bytes per line on lane 0:
# payload = DISPLAY_W * (BPP//8) bytes total / N_LANES per lane
# header = 4 bytes total (DI, WC_L, WC_H, ECC), 1 per lane
# footer = 2 bytes total (CRC_L, CRC_H), distributed across first 2 lanes
PAYLOAD_BYTES_PER_LANE = (DISPLAY_W * (BPP // 8)) // N_LANES  # 960
HEADER_BYTES_PER_LANE = 1  # DI on lane 0
FOOTER_BYTES_PER_LANE = 1  # CRC_L on lane 0
TOTAL_LANE0_BYTES = HEADER_BYTES_PER_LANE + PAYLOAD_BYTES_PER_LANE + FOOTER_BYTES_PER_LANE

# DSI data type for 24-bit packed pixel stream
DSI_DT_RGB888 = 0x3E
DSI_DT_HSYNC = 0x21  # short packet — H sync start
DSI_DT_VSYNC = 0x01  # short packet — V sync start

# MIPI D-PHY HS sync byte (transmitted at start of each HS burst, all-lanes)
HS_SYNC_BYTE = 0xB8  # 1011_1000 in bit order (LSB first → 00011101 on wire)

# Threshold for differential voltage: >0 = logic-1 (D+ > D-)
DAT_THRESH_V = 0.0
# ---------------------------------------------------------------------------
# I/O
# ---------------------------------------------------------------------------
def load_csv(path: Path):
    """Load a two-column (time, voltage) CSV and return the columns as arrays."""
    table = np.genfromtxt(path, delimiter=",")
    times, volts = table[:, 0], table[:, 1]
    return times, volts
def find_proto_files(cap_num: int, data_dir: Path):
    """
    Locate the CLK and DAT CSV files for a given capture number.

    When several files match a pattern, the lexicographically last (newest
    timestamp prefix) one is used.  Raises FileNotFoundError when either
    trace is missing.
    """
    selected = {}
    for kind in ("clk", "dat"):
        pattern = str(data_dir / f"*_proto_{cap_num:04d}_{kind}.csv")
        found = sorted(glob.glob(pattern))
        if not found:
            raise FileNotFoundError(
                f"No proto {kind.upper()} file found for cap {cap_num:04d} in {data_dir}"
            )
        selected[kind] = Path(found[-1])
    return selected["clk"], selected["dat"]
# ---------------------------------------------------------------------------
# Clock edge detection
# ---------------------------------------------------------------------------
def find_clock_edges(t_clk, v_clk, threshold=0.0):
    """
    Return arrays of (rising_indices, falling_indices) in the CLK trace.
    Filters out glitches: only keeps transitions separated by at least 1 ns.
    """
    sample_ns = float(np.median(np.diff(t_clk))) * 1e9
    gap_samples = max(1, int(1.0 / sample_ns))  # ~1 ns minimum between edges
    # Binarize once; a non-zero step marks a threshold crossing.
    level = (v_clk > threshold).astype(int)
    step = np.diff(level)
    xings = np.where(step != 0)[0]
    if len(xings) < 2:
        empty = np.array([], dtype=int)
        return empty, empty
    # Glitch filter: drop crossings closer than gap_samples to their predecessor.
    keep_mask = np.concatenate(([True], np.diff(xings) > gap_samples))
    xings = xings[keep_mask]
    rising = xings[step[xings] > 0]
    falling = xings[step[xings] < 0]
    return rising, falling
# ---------------------------------------------------------------------------
# HS burst detection
# ---------------------------------------------------------------------------
def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0):
    """
    Find the start of the main HS burst in the DAT trace.

    The proto capture often starts mid-HS (previous packet), so we:
      1. Find the FIRST LP quiet region (LP-11 and LP-low both have low
         differential std)
      2. Find the first sustained oscillation AFTER that quiet region

    Parameters:
        t_dat, v_dat: time (s) and differential voltage (V) arrays of DAT.
        t_clk:        unused; kept for call-signature compatibility.
        window_ns:    unused; kept for call-signature compatibility.

    Returns: index into t_dat of approximate HS burst start, or None if no
    LP quiet region — or no HS burst after it — can be found.
    """
    dt_ns = float(np.median(np.diff(t_dat))) * 1e9
    win = max(1, int(1.0 / dt_ns))      # 1 ns rolling window
    min_run = max(5, int(5.0 / dt_ns))  # at least 5 ns continuous oscillation
    # Rolling standard deviation over the trailing ~1 ns window.
    rstd = np.array([v_dat[max(0, i - win):i + 1].std() for i in range(len(v_dat))])
    OSC_THRESH = 0.04    # 40 mV — HS oscillation
    QUIET_THRESH = 0.02  # 20 mV — LP quiet (LP-11 differential ≈ 0, low std)
    # Step 1: find the FIRST quiet (LP) region of at least 200 ns.
    # BUGFIX: stop at the first qualifying region instead of letting quiet_end
    # track later quiet runs — otherwise a trailing LP period after the burst
    # made the burst itself undetectable (step 2 would start past it).
    quiet_min_run = max(5, int(200.0 / dt_ns))
    quiet_end = None
    run_len = 0
    for i, std_val in enumerate(rstd):
        if std_val < QUIET_THRESH:
            run_len += 1
            if run_len >= quiet_min_run:
                quiet_end = i
                break
        else:
            run_len = 0
    if quiet_end is None:
        return None  # no LP region found
    # Step 2: find first sustained oscillation after the LP region.
    run_start = None
    run_len = 0
    for i in range(quiet_end, len(rstd)):
        if rstd[i] >= OSC_THRESH:
            if run_start is None:
                run_start = i
            run_len += 1
            if run_len >= min_run:
                return run_start
        else:
            run_start = None
            run_len = 0
    return None
# ---------------------------------------------------------------------------
# Bit decoding
# ---------------------------------------------------------------------------
def decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx):
    """
    Sample DAT on every CLK edge (DDR) after hs_start_idx.
    Returns list of (time_ns, bit) tuples.
    """
    burst_t0 = t_dat[hs_start_idx]
    rise, fall = find_clock_edges(t_clk, v_clk)
    edges = np.sort(np.concatenate([rise, fall]))
    # Keep only edges inside the HS burst.
    edges = edges[t_clk[edges] >= burst_t0]
    if edges.size == 0:
        return []
    step_ns = float(np.median(np.diff(t_dat))) * 1e9
    last = len(v_dat) - 1
    decoded = []
    for e in edges:
        t_edge = t_clk[e]
        # Nearest-neighbour lookup into the (uniformly sampled) DAT trace.
        j = int(round((t_edge - t_dat[0]) / (step_ns * 1e-9)))
        j = max(0, min(j, last))
        decoded.append((t_edge * 1e9, 1 if v_dat[j] > DAT_THRESH_V else 0))
    return decoded
# ---------------------------------------------------------------------------
# Byte reconstruction
# ---------------------------------------------------------------------------
def bits_to_bytes(bits):
    """
    Pack bits into bytes (LSB first, as MIPI D-PHY transmits).
    Incomplete trailing groups (fewer than 8 bits) are discarded.
    Returns list of (time_ns_of_first_bit, byte_value).
    """
    packed = []
    for k in range(len(bits) // 8):
        group = bits[8 * k:8 * k + 8]
        value = 0
        for pos, (_, bit) in enumerate(group):
            value |= bit << pos
        packed.append((group[0][0], value))
    return packed
# ---------------------------------------------------------------------------
# DSI sync byte search and frame alignment
# ---------------------------------------------------------------------------
def find_sync_byte(raw_bytes):
    """
    Search for the MIPI D-PHY HS sync byte (0xB8) in the decoded byte stream.
    The sync byte precedes all data bytes in each HS burst.
    Returns index into raw_bytes of the sync byte, or None.
    """
    return next(
        (idx for idx, (_, value) in enumerate(raw_bytes) if value == HS_SYNC_BYTE),
        None,
    )
def parse_long_packet_header(payload_bytes):
    """
    Parse a DSI long packet header from lane-0 perspective.
    Lane 0 carries: [DI, then payload bytes 0, 4, 8, ...]
    The WC and ECC bytes are on lanes 1-3 (not captured here).
    Returns dict with DI interpretation, or None for an empty input.
    """
    if not payload_bytes:
        return None
    di_byte = payload_bytes[0]
    # DI byte layout: bits 7:6 = virtual channel, bits 5:0 = data type.
    virtual_channel = (di_byte >> 6) & 0x03
    data_type = di_byte & 0x3F
    known_types = {
        0x01: "VSS (V-Sync Start)",
        0x11: "VSE (V-Sync End)",
        0x21: "HSS (H-Sync Start)",
        0x31: "HSE (H-Sync End)",
        0x08: "EOT (End of Transmission)",
        0x39: "RGB888 (long packet, 24bpp)",
        0x3E: "Packed RGB888 (long packet, 24bpp)",
        0x29: "Generic long write",
    }
    label = known_types.get(data_type, f"unknown (0x{data_type:02X})")
    return {
        "DI_raw": di_byte,
        "VC": virtual_channel,
        "DT": data_type,
        "DT_name": label,
    }
# ---------------------------------------------------------------------------
# Main decode function
# ---------------------------------------------------------------------------
def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True):
    """
    Full decode of a proto capture. Returns dict with results.

    Pipeline: locate the CLK/DAT CSV pair → find the HS burst start on DAT →
    sample DAT on every CLK edge (DDR) → pack bits into bytes (LSB first) →
    align on the HS sync byte (0xB8) → parse the lane-0 DSI header byte.

    Parameters:
        cap_num:  capture number used to glob for *_proto_NNNN_{clk,dat}.csv
        data_dir: directory containing the capture CSVs
        verbose:  print a human-readable decode report to stdout

    Returns:
        dict with HS timing, bit/byte counts, sync index, parsed header and
        lane-0 payload bytes — or None when no HS burst start is found or
        fewer than 16 bits decode.
    """
    clk_path, dat_path = find_proto_files(cap_num, data_dir)
    if verbose:
        print(f"\n{'='*60}")
        print(f"Cap {cap_num:04d}: {dat_path.name}")
        print(f"{'='*60}")
    t_clk, v_clk = load_csv(clk_path)
    t_dat, v_dat = load_csv(dat_path)
    # Median sample spacing of the DAT trace, for the report only.
    dt_ns = float(np.median(np.diff(t_dat))) * 1e9
    if verbose:
        print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs ({len(t_dat)} samples, {dt_ns:.0f} ps/sample)")
    # Find HS burst start
    hs_start_idx = find_hs_start(t_dat, v_dat)
    if hs_start_idx is None:
        if verbose:
            print(" ERROR: Could not find HS burst start")
        return None
    t_hs_start_ns = t_dat[hs_start_idx] * 1e9
    # The usable burst extends from the detected start to the end of the trace.
    t_hs_end_ns = t_dat[-1] * 1e9
    hs_duration_us = (t_hs_end_ns - t_hs_start_ns) / 1000.0
    if verbose:
        print(f" HS burst start: {t_hs_start_ns:.0f} ns ({hs_duration_us:.1f} µs available of ~18 µs full burst)")
    # Decode bits on every CLK edge (DDR) from the burst start onward.
    bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx)
    if verbose:
        print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)")
    if len(bits) < 16:
        # Fewer than two bytes decoded — nothing meaningful to parse.
        if verbose:
            print(" ERROR: Too few bits decoded")
        return None
    raw_bytes = bits_to_bytes(bits)
    # Find sync byte alignment; without it we fall back to raw byte 0 and the
    # byte boundaries may be arbitrary.
    sync_idx = find_sync_byte(raw_bytes)
    if sync_idx is None:
        if verbose:
            print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found — using raw byte 0 as start")
        sync_idx = 0
    else:
        if verbose:
            t_sync = raw_bytes[sync_idx][0]
            print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns)")
    # Data bytes after sync
    data_bytes = raw_bytes[sync_idx + 1:]  # skip the sync byte itself
    # Parse header (only the DI byte travels on lane 0; WC/ECC are on other lanes).
    header = parse_long_packet_header([b for _, b in data_bytes[:8]])
    if verbose and header:
        print(f"\n DSI Header (lane 0):")
        print(f" DI = 0x{header['DI_raw']:02X} → VC={header['VC']} DT=0x{header['DT']:02X} ({header['DT_name']})")
    # Payload bytes on lane 0 (every byte after header DI)
    # Lane 0 payload: bytes 0, 4, 8, ... of the full pixel stream
    # For RGB888: R0, G1, B2, R3, G4, B5, ...
    lane0_payload = [b for _, b in data_bytes[1:]]  # skip DI
    if verbose:
        n_payload = len(lane0_payload)
        # Rough pixel count covered by the decoded lane-0 bytes (4 lanes, 3 B/px).
        n_pixels_partial = n_payload * N_LANES // (BPP // 8)
        print(f"\n Lane 0 payload: {n_payload} bytes decoded (≈ first {n_pixels_partial} pixels' components)")
        if n_payload >= 16:
            hex_str = " ".join(f"{b:02X}" for b in lane0_payload[:64])
            print(f" First 64 payload bytes: {hex_str}")
            if n_payload > 64:
                print(f" ...")
        # Check for non-zero content and where it first appears
        nonzero_idx = next((i for i, b in enumerate(lane0_payload) if b != 0x00), None)
        if nonzero_idx is None:
            print(f"\n All {n_payload} payload bytes are 0x00 (blank / border region)")
        else:
            print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})")
            print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}")
    return {
        "cap_num" : cap_num,
        "hs_start_ns" : t_hs_start_ns,
        "hs_duration_us" : hs_duration_us,
        "n_bits" : len(bits),
        "n_bytes" : len(raw_bytes),
        "sync_idx" : sync_idx,
        "header" : header,
        "lane0_payload" : lane0_payload,
    }
# ---------------------------------------------------------------------------
# Comparison
# ---------------------------------------------------------------------------
def compare_captures(cap_a: int, cap_b: int, data_dir: Path, n_bytes: int = 128):
    """
    Decode both captures and report byte-level differences in the first n_bytes.

    Prints a per-offset diff table (capped at 40 rows), then a byte-level
    cross-correlation to detect whether one capture's payload is a shifted
    copy of the other (data-shift detection).
    """
    print(f"\nComparing cap {cap_a:04d} vs cap {cap_b:04d} (first {n_bytes} payload bytes on lane 0)")
    res_a = decode_capture(cap_a, data_dir, verbose=False)
    res_b = decode_capture(cap_b, data_dir, verbose=False)
    if res_a is None or res_b is None:
        print(" ERROR: Could not decode one or both captures")
        return
    pa = res_a["lane0_payload"][:n_bytes]
    pb = res_b["lane0_payload"][:n_bytes]
    # Compare only the overlap of the two (possibly short) payloads.
    n_compare = min(len(pa), len(pb), n_bytes)
    diffs = [(i, pa[i], pb[i]) for i in range(n_compare) if pa[i] != pb[i]]
    # NOTE(review): assumes both headers parsed; decode_capture can return
    # header=None for an empty byte stream, which would raise TypeError on the
    # DI prints below — confirm upstream guarantees.
    print(f" Cap {cap_a:04d}: {len(pa)} bytes available, DI=0x{res_a['header']['DI_raw']:02X} HS_start={res_a['hs_start_ns']:.0f}ns")
    print(f" Cap {cap_b:04d}: {len(pb)} bytes available, DI=0x{res_b['header']['DI_raw']:02X} HS_start={res_b['hs_start_ns']:.0f}ns")
    if not diffs:
        print(f"\n No differences in first {n_compare} bytes — data content matches.")
    else:
        print(f"\n {len(diffs)} byte differences in first {n_compare} bytes:")
        print(f" {'Offset':>8} {'Cap_A':>6} {'Cap_B':>6}")
        for offset, ba, bb in diffs[:40]:
            # Translate lane-0 byte offset to an approximate pixel group.
            pixel_group = offset * N_LANES // (BPP // 8)
            print(f" {offset:>8} 0x{ba:02X} 0x{bb:02X} (pixel group ≈ {pixel_group})")
        if len(diffs) > 40:
            print(f" ... ({len(diffs) - 40} more)")
    # Check for data-shift pattern: does one capture's data appear shifted in the other?
    if len(pa) > 8 and len(pb) > 8:
        pa_arr = np.array(pa[:n_compare], dtype=np.uint8)
        pb_arr = np.array(pb[:n_compare], dtype=np.uint8)
        # Mean-removed full cross-correlation; the peak lag estimates the shift.
        # NOTE(review): the sign convention of the reported lag follows
        # np.correlate(a, b, mode="full") — verify direction before acting on it.
        xcorr = np.correlate(pa_arr.astype(float) - pa_arr.mean(),
                             pb_arr.astype(float) - pb_arr.mean(), mode="full")
        lag = int(np.argmax(np.abs(xcorr))) - (n_compare - 1)
        if lag != 0 and abs(lag) < n_compare // 2:
            print(f"\n Cross-correlation peak at lag={lag} bytes → data may be shifted by {lag} bytes between captures")
        else:
            print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """Command-line entry point: list, compare, or decode proto captures."""
    ap = argparse.ArgumentParser(description="Decode DSI packet content from proto captures")
    ap.add_argument("--cap", type=int, default=214, help="Capture number to decode (default: 214)")
    ap.add_argument("--dir", type=str, default=str(DATA_DIR), help="Data directory")
    ap.add_argument("--compare", type=int, default=None,
                    metavar="CAP_B",
                    help="Compare --cap against CAP_B byte-by-byte")
    ap.add_argument("--list", action="store_true", help="List available proto captures")
    opts = ap.parse_args()
    data_dir = Path(opts.dir)
    if opts.list:
        # Capture numbers are the second-to-last underscore field of each DAT file stem.
        dat_files = sorted(data_dir.glob("*_proto_*_dat.csv"))
        caps = sorted({int(p.stem.split("_")[-2]) for p in dat_files})
        print(f"Available proto captures: {caps}")
    elif opts.compare is not None:
        compare_captures(opts.cap, opts.compare, data_dir)
    else:
        decode_capture(opts.cap, data_dir, verbose=True)


if __name__ == "__main__":
    main()