#!/usr/bin/env python3 """ proto_decoder.py Decodes DSI packet content from proto (differential) captures. Usage: python3 proto_decoder.py [--cap CAP_NUM] [--dir DATA_DIR] [--compare] The proto_*_clk and proto_*_dat captures are Ch1-Ch2 and Ch3-Ch4 differential waveforms at ~50-80 ps/sample. CLK runs continuously at ~215 MHz (430 Mbps DDR). DAT carries MIPI D-PHY HS data, sampled on both CLK edges. Decodes: - DSI long packet header: DI (data type / virtual channel), word count, ECC - First N payload bytes on lane 0 - Compares two captures to spot differing byte positions (data-shift detection) """ import argparse import glob import sys from pathlib import Path import numpy as np DATA_DIR = Path(__file__).parent / "data" DISPLAY_W = 1280 # pixels per line DISPLAY_H = 800 N_LANES = 4 BPP = 24 # bits per pixel (RGB888) # Expected bytes per line on lane 0: # payload = DISPLAY_W * (BPP//8) bytes total / N_LANES per lane # header = 4 bytes total (DI, WC_L, WC_H, ECC), 1 per lane # footer = 2 bytes total (CRC_L, CRC_H), distributed across first 2 lanes PAYLOAD_BYTES_PER_LANE = (DISPLAY_W * (BPP // 8)) // N_LANES # 960 HEADER_BYTES_PER_LANE = 1 # DI on lane 0 FOOTER_BYTES_PER_LANE = 1 # CRC_L on lane 0 TOTAL_LANE0_BYTES = HEADER_BYTES_PER_LANE + PAYLOAD_BYTES_PER_LANE + FOOTER_BYTES_PER_LANE # DSI data type for 24-bit packed pixel stream DSI_DT_RGB888 = 0x3E DSI_DT_HSYNC = 0x21 # short packet — H sync start DSI_DT_VSYNC = 0x01 # short packet — V sync start # MIPI D-PHY HS sync byte (transmitted at start of each HS burst, all-lanes) HS_SYNC_BYTE = 0xB8 # 1011_1000 in bit order (LSB first → 00011101 on wire) # Threshold for differential voltage: >0 = logic-1 (D+ > D-) DAT_THRESH_V = 0.0 # Single-ended LP file thresholds (CH1=CLK+, CH3=DAT0+). # In HS mode both CLK+ and DAT+ oscillate around the D-PHY common mode (~200 mV). LP_SE_CLK_THRESH_V = 0.20 # CLK+ zero-crossing threshold for edge detection LP_SE_DAT_THRESH_V = 0.20 # DAT+ HS bit threshold (> this = logic 1) LP_SE_LP01_THRESH_V = 0.25 # DAT+ < this during LP-01/LP-00 SoT preamble # Expected Lane 0 payload byte pattern for a static-pink display (R=0xFF G=0x33 B=0xBB). # With 4-lane RGB888, Lane 0 carries every 4th byte of the full payload beginning at # offset 0. The 12-byte boundary aligns R/G/B of consecutive pixels so Lane 0 sees: # offset 0 → pixel 0 R = 0xFF # offset 4 → pixel 1 G = 0x33 # offset 8 → pixel 2 B = 0xBB # offset 12 → pixel 4 R = 0xFF (repeats) # → 3-byte repeating cycle [0xFF, 0x33, 0xBB] on Lane 0. STATIC_PINK_LANE0 = (0xFF, 0x33, 0xBB) # --------------------------------------------------------------------------- # I/O # --------------------------------------------------------------------------- def load_csv(path: Path): data = np.genfromtxt(path, delimiter=",") return data[:, 0], data[:, 1] def find_proto_files(cap_num: int, data_dir: Path): pattern_clk = str(data_dir / f"*_proto_{cap_num:04d}_clk.csv") pattern_dat = str(data_dir / f"*_proto_{cap_num:04d}_dat.csv") clk_files = sorted(glob.glob(pattern_clk)) dat_files = sorted(glob.glob(pattern_dat)) if not clk_files: raise FileNotFoundError(f"No proto CLK file found for cap {cap_num:04d} in {data_dir}") if not dat_files: raise FileNotFoundError(f"No proto DAT file found for cap {cap_num:04d} in {data_dir}") return Path(clk_files[-1]), Path(dat_files[-1]) def find_lp_files(cap_num: int, data_dir: Path): pattern_clk = str(data_dir / f"*_lp_{cap_num:04d}_clk.csv") pattern_dat = str(data_dir / f"*_lp_{cap_num:04d}_dat.csv") clk_files = sorted(glob.glob(pattern_clk)) dat_files = sorted(glob.glob(pattern_dat)) if not clk_files: raise FileNotFoundError(f"No LP CLK file found for cap {cap_num:04d} in {data_dir}") if not dat_files: raise FileNotFoundError(f"No LP DAT file found for cap {cap_num:04d} in {data_dir}") return Path(clk_files[-1]), Path(dat_files[-1]) # --------------------------------------------------------------------------- # Clock edge detection # --------------------------------------------------------------------------- def find_clock_edges(t_clk, v_clk, threshold=0.0): """ Return arrays of (rising_indices, falling_indices) in the CLK trace. Filters out glitches: only keeps transitions separated by at least 1 ns. """ dt_ns = float(np.median(np.diff(t_clk))) * 1e9 min_gap = max(1, int(1.0 / dt_ns)) # ~1 ns minimum between edges crossings = np.where(np.diff((v_clk > threshold).astype(int)))[0] if len(crossings) < 2: return np.array([], dtype=int), np.array([], dtype=int) # Filter: keep only crossings separated by > min_gap samples keep = np.concatenate(([True], np.diff(crossings) > min_gap)) crossings = crossings[keep] level = (v_clk > threshold).astype(int) rising = crossings[np.diff(level)[crossings] > 0] falling = crossings[np.diff(level)[crossings] < 0] return rising, falling # --------------------------------------------------------------------------- # HS burst detection # --------------------------------------------------------------------------- def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0, single_ended=False): """ Find the start of the post-LP HS burst in the DAT trace. single_ended=True — LP files (CH1=CLK+, CH3=DAT0+): detects LP-01/LP-00 as DAT+ < LP_SE_LP01_THRESH_V for ≥ 20 ns, then returns index 50 ns after the plateau ends (HS common-mode rise). Search starts at index 0 — LP-11 pre-trigger (~1.2 V) is well above the threshold so no false matches. single_ended=False — Proto files (F2=CH3-CH4 differential): LP-01 detected as diff < -0.5 V for ≥ 20 ns, search from N//4. Returns index into t_dat just past the SoT preamble, ready for CLK-edge sampling. Falls back to rolling-std method for HS-triggered captures (differential only). """ dt_ns = float(np.median(np.diff(t_dat))) * 1e9 N = len(v_dat) # --- Single-ended LP path --- if single_ended: min_lp01 = max(2, int(20.0 / dt_ns)) run = 0 lp01_end = None for i in range(N): if v_dat[i] < LP_SE_LP01_THRESH_V: run += 1 else: if run >= min_lp01: lp01_end = i break run = 0 if lp01_end is not None: skip = max(1, int(50.0 / dt_ns)) return min(lp01_end + skip, N - 1) return None # --- Differential LP-triggered path --- # LP-01: D+ = 0 V, D- = high → diff strongly negative (< -0.5 V for ≥ 20 ns) LP01_THRESH = -0.5 min_lp01 = max(2, int(20.0 / dt_ns)) search_from = N // 4 # skip any LP-01 fragment at capture start run = 0 lp01_end = None for i in range(search_from, N): if v_dat[i] < LP01_THRESH: run += 1 else: if run >= min_lp01: lp01_end = i break run = 0 if lp01_end is not None: skip = max(1, int(200.0 / dt_ns)) return min(lp01_end + skip, N - 1) # --- Fallback: HS-triggered captures (original rolling-std method) --- win = max(1, int(1.0 / dt_ns)) min_run = max(5, int(5.0 / dt_ns)) rstd = np.array([v_dat[max(0, i - win):i + 1].std() for i in range(N)]) OSC_THRESH = 0.04 QUIET_THRESH = 0.02 quiet_min_run = max(5, int(200.0 / dt_ns)) quiet_end = None run_len = 0 for i, std_val in enumerate(rstd): if std_val < QUIET_THRESH: run_len += 1 if run_len >= quiet_min_run: quiet_end = i else: run_len = 0 if quiet_end is None: return None run_start = None run_len = 0 for i in range(quiet_end, len(rstd)): if rstd[i] >= OSC_THRESH: if run_start is None: run_start = i run_len += 1 if run_len >= min_run: return run_start else: run_start = None run_len = 0 return None # --------------------------------------------------------------------------- # Bit decoding # --------------------------------------------------------------------------- def decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx, dat_thresh=None, clk_thresh=None): """ Sample DAT on every CLK edge (DDR) after hs_start_idx. dat_thresh: voltage threshold for bit decisions on DAT (default: DAT_THRESH_V). clk_thresh: voltage threshold for CLK edge detection (default: 0.0). Returns list of (time_ns, bit) tuples. """ if dat_thresh is None: dat_thresh = DAT_THRESH_V if clk_thresh is None: clk_thresh = 0.0 t_hs = t_dat[hs_start_idx] rising, falling = find_clock_edges(t_clk, v_clk, threshold=clk_thresh) all_edges = np.sort(np.concatenate([rising, falling])) hs_mask = t_clk[all_edges] >= t_hs hs_edges = all_edges[hs_mask] if len(hs_edges) == 0: return [] dt_dat = float(np.median(np.diff(t_dat))) * 1e9 bits = [] for edge_idx in hs_edges: t_edge = t_clk[edge_idx] dat_idx = int(round((t_edge - t_dat[0]) / (dt_dat * 1e-9))) dat_idx = max(0, min(dat_idx, len(v_dat) - 1)) bit = 1 if v_dat[dat_idx] > dat_thresh else 0 bits.append((t_edge * 1e9, bit)) return bits # --------------------------------------------------------------------------- # Byte reconstruction # --------------------------------------------------------------------------- def bits_to_bytes(bits): """ Pack bits into bytes (LSB first, as MIPI D-PHY transmits). Returns list of (time_ns_of_first_bit, byte_value). """ result = [] for i in range(0, len(bits) - 7, 8): byte_bits = [b for _, b in bits[i:i + 8]] val = sum(byte_bits[j] << j for j in range(8)) result.append((bits[i][0], val)) return result # --------------------------------------------------------------------------- # DSI sync byte search and frame alignment # --------------------------------------------------------------------------- def find_sync_byte(raw_bytes): """ Search for the MIPI D-PHY HS sync byte (0xB8) in the decoded byte stream. The sync byte precedes all data bytes in each HS burst. Returns index into raw_bytes of the sync byte, or None. """ for i, (_, byte_val) in enumerate(raw_bytes): if byte_val == HS_SYNC_BYTE: return i return None def parse_long_packet_header(payload_bytes): """ Parse a DSI long packet header from lane-0 perspective. Lane 0 carries: [DI, then payload bytes 0, 4, 8, ...] The WC and ECC bytes are on lanes 1-3 (not captured here). Returns dict with DI interpretation. """ if not payload_bytes: return None di = payload_bytes[0] vc = (di >> 6) & 0x03 dt = di & 0x3F dt_name = { 0x01: "VSS (V-Sync Start)", 0x11: "VSE (V-Sync End)", 0x21: "HSS (H-Sync Start)", 0x31: "HSE (H-Sync End)", 0x08: "EOT (End of Transmission)", 0x39: "RGB888 (long packet, 24bpp)", 0x3E: "Packed RGB888 (long packet, 24bpp)", 0x29: "Generic long write", }.get(dt, f"unknown (0x{dt:02X})") return { "DI_raw": di, "VC" : vc, "DT" : dt, "DT_name": dt_name, } # --------------------------------------------------------------------------- # Main decode function # --------------------------------------------------------------------------- def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True): """ Full decode of a proto capture. Returns dict with results. """ clk_path, dat_path = find_proto_files(cap_num, data_dir) if verbose: print(f"\n{'='*60}") print(f"Cap {cap_num:04d}: {dat_path.name}") print(f"{'='*60}") t_clk, v_clk = load_csv(clk_path) t_dat, v_dat = load_csv(dat_path) dt_ns = float(np.median(np.diff(t_dat))) * 1e9 if verbose: print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs ({len(t_dat)} samples, {dt_ns*1000:.0f} ps/sample)") # Find HS burst start hs_start_idx = find_hs_start(t_dat, v_dat) if hs_start_idx is None: if verbose: print(" ERROR: Could not find HS burst start") return None t_hs_start_ns = t_dat[hs_start_idx] * 1e9 t_hs_end_ns = t_dat[-1] * 1e9 hs_duration_us = (t_hs_end_ns - t_hs_start_ns) / 1000.0 if verbose: print(f" HS burst start: {t_hs_start_ns:.0f} ns ({hs_duration_us:.1f} µs available of ~18 µs full burst)") # Decode bits bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx) if verbose: print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)") if len(bits) < 16: if verbose: print(" ERROR: Too few bits decoded") return None # Try all 8 bit-phase offsets to handle framing uncertainty from LP-00 CLK edges. # LP-00 CLK edges before HS starts produce garbage bits; the correct phase is # the one where 0xB8 appears earliest in the byte stream. raw_bytes = None sync_idx = None best_phase = 0 best_sync = len(bits) # sentinel: "not found" for phase in range(8): rb = bits_to_bytes(bits[phase:]) si = find_sync_byte(rb) if si is not None and si < best_sync: best_sync = si best_phase = phase raw_bytes = rb sync_idx = si if raw_bytes is None: raw_bytes = bits_to_bytes(bits) if sync_idx is None: if verbose: print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found in any bit phase — using raw byte 0") sync_idx = 0 else: if verbose: t_sync = raw_bytes[sync_idx][0] print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})") # Data bytes after sync data_bytes = raw_bytes[sync_idx + 1:] # skip the sync byte itself # Parse header header = parse_long_packet_header([b for _, b in data_bytes[:8]]) if verbose and header: print(f"\n DSI Header (lane 0):") print(f" DI = 0x{header['DI_raw']:02X} → VC={header['VC']} DT=0x{header['DT']:02X} ({header['DT_name']})") # Payload bytes on lane 0 (every byte after header DI) # Lane 0 payload: bytes 0, 4, 8, ... of the full pixel stream # For RGB888: R0, G1, B2, R3, G4, B5, ... lane0_payload = [b for _, b in data_bytes[1:]] # skip DI if verbose: n_payload = len(lane0_payload) n_pixels_partial = n_payload * N_LANES // (BPP // 8) print(f"\n Lane 0 payload: {n_payload} bytes decoded (≈ first {n_pixels_partial} pixels' components)") if n_payload >= 16: hex_str = " ".join(f"{b:02X}" for b in lane0_payload[:64]) print(f" First 64 payload bytes: {hex_str}") if n_payload > 64: print(f" ...") # Check for non-zero content and where it first appears nonzero_idx = next((i for i, b in enumerate(lane0_payload) if b != 0x00), None) if nonzero_idx is None: print(f"\n All {n_payload} payload bytes are 0x00 (blank / border region)") else: print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})") print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}") # Static-pink pixel content check if n_payload >= 12: cc = check_pixel_content(lane0_payload) match_str = (f"{cc['match_pct']:.0f}% of {cc['n_checked']} bytes " f"match static-pink pattern") if cc["first_mismatch"]: mm = cc["first_mismatch"] match_str += (f" (first diff at offset {mm[0]}: " f"got 0x{mm[2]:02X} expected 0x{mm[1]:02X})") print(f"\n Static-pink check : {match_str}") pixel_check = check_pixel_content(lane0_payload) if len(lane0_payload) >= 12 else None return { "cap_num" : cap_num, "hs_start_ns" : t_hs_start_ns, "hs_duration_us" : hs_duration_us, "n_bits" : len(bits), "n_bytes" : len(raw_bytes), "sync_idx" : sync_idx, "header" : header, "lane0_payload" : lane0_payload, "pixel_check" : pixel_check, } # --------------------------------------------------------------------------- # LP single-ended decode # --------------------------------------------------------------------------- def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True): """ Full decode of an LP capture (CH1=CLK+, CH3=DAT0+) using single-ended thresholds. LP files are captured at 10 GSa/s (100 ps/sample, ~23 samples/bit at 432 Mbps) — sufficient resolution to decode the HS bit stream without a separate proto pass. Returns a dict with the same structure as decode_capture(). """ clk_path, dat_path = find_lp_files(cap_num, data_dir) if verbose: print(f"\n{'='*60}") print(f"Cap {cap_num:04d}: {dat_path.name} [LP single-ended]") print(f"{'='*60}") t_clk, v_clk = load_csv(clk_path) t_dat, v_dat = load_csv(dat_path) dt_ns = float(np.median(np.diff(t_dat))) * 1e9 if verbose: print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs " f"({len(t_dat)} samples, {dt_ns*1000:.0f} ps/sample)") hs_start_idx = find_hs_start(t_dat, v_dat, t_clk, single_ended=True) if hs_start_idx is None: if verbose: print(" ERROR: Could not find HS burst start") return None t_hs_start_ns = t_dat[hs_start_idx] * 1e9 t_hs_end_ns = t_dat[-1] * 1e9 hs_duration_us = (t_hs_end_ns - t_hs_start_ns) / 1000.0 if verbose: print(f" HS burst start: {t_hs_start_ns:.0f} ns " f"({hs_duration_us:.1f} µs available of ~18 µs full burst)") bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx, dat_thresh=LP_SE_DAT_THRESH_V, clk_thresh=LP_SE_CLK_THRESH_V) if verbose: print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)") if len(bits) < 16: if verbose: print(" ERROR: Too few bits decoded") return None raw_bytes = None sync_idx = None best_phase = 0 best_sync = len(bits) for phase in range(8): rb = bits_to_bytes(bits[phase:]) si = find_sync_byte(rb) if si is not None and si < best_sync: best_sync = si best_phase = phase raw_bytes = rb sync_idx = si if raw_bytes is None: raw_bytes = bits_to_bytes(bits) if sync_idx is None: if verbose: print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found in any bit phase — using raw byte 0") sync_idx = 0 else: if verbose: t_sync = raw_bytes[sync_idx][0] print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})") data_bytes = raw_bytes[sync_idx + 1:] header = parse_long_packet_header([b for _, b in data_bytes[:8]]) if verbose and header: print(f"\n DSI Header (lane 0):") print(f" DI = 0x{header['DI_raw']:02X} → VC={header['VC']} DT=0x{header['DT']:02X} ({header['DT_name']})") lane0_payload = [b for _, b in data_bytes[1:]] if verbose: n_payload = len(lane0_payload) n_pixels_partial = n_payload * N_LANES // (BPP // 8) print(f"\n Lane 0 payload: {n_payload} bytes decoded (≈ first {n_pixels_partial} pixels' components)") if n_payload >= 16: hex_str = " ".join(f"{b:02X}" for b in lane0_payload[:64]) print(f" First 64 payload bytes: {hex_str}") if n_payload > 64: print(f" ...") nonzero_idx = next((i for i, b in enumerate(lane0_payload) if b != 0x00), None) if nonzero_idx is None: print(f"\n All {n_payload} payload bytes are 0x00 (blank / border region)") else: print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})") print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}") if n_payload >= 12: cc = check_pixel_content(lane0_payload) match_str = (f"{cc['match_pct']:.0f}% of {cc['n_checked']} bytes " f"match static-pink pattern") if cc["first_mismatch"]: mm = cc["first_mismatch"] match_str += (f" (first diff at offset {mm[0]}: " f"got 0x{mm[2]:02X} expected 0x{mm[1]:02X})") print(f"\n Static-pink check : {match_str}") pixel_check = check_pixel_content(lane0_payload) if len(lane0_payload) >= 12 else None return { "cap_num" : cap_num, "hs_start_ns" : t_hs_start_ns, "hs_duration_us" : hs_duration_us, "n_bits" : len(bits), "n_bytes" : len(raw_bytes), "sync_idx" : sync_idx, "header" : header, "lane0_payload" : lane0_payload, "pixel_check" : pixel_check, } # --------------------------------------------------------------------------- # Comparison # --------------------------------------------------------------------------- def compare_captures(cap_a: int, cap_b: int, data_dir: Path, n_bytes: int = 128): """ Decode both captures and report byte-level differences in the first n_bytes. """ print(f"\nComparing cap {cap_a:04d} vs cap {cap_b:04d} (first {n_bytes} payload bytes on lane 0)") res_a = decode_capture(cap_a, data_dir, verbose=False) res_b = decode_capture(cap_b, data_dir, verbose=False) if res_a is None or res_b is None: print(" ERROR: Could not decode one or both captures") return pa = res_a["lane0_payload"][:n_bytes] pb = res_b["lane0_payload"][:n_bytes] n_compare = min(len(pa), len(pb), n_bytes) diffs = [(i, pa[i], pb[i]) for i in range(n_compare) if pa[i] != pb[i]] print(f" Cap {cap_a:04d}: {len(pa)} bytes available, DI=0x{res_a['header']['DI_raw']:02X} HS_start={res_a['hs_start_ns']:.0f}ns") print(f" Cap {cap_b:04d}: {len(pb)} bytes available, DI=0x{res_b['header']['DI_raw']:02X} HS_start={res_b['hs_start_ns']:.0f}ns") if not diffs: print(f"\n No differences in first {n_compare} bytes — data content matches.") else: print(f"\n {len(diffs)} byte differences in first {n_compare} bytes:") print(f" {'Offset':>8} {'Cap_A':>6} {'Cap_B':>6}") for offset, ba, bb in diffs[:40]: pixel_group = offset * N_LANES // (BPP // 8) print(f" {offset:>8} 0x{ba:02X} 0x{bb:02X} (pixel group ≈ {pixel_group})") if len(diffs) > 40: print(f" ... ({len(diffs) - 40} more)") # Check for data-shift pattern: does one capture's data appear shifted in the other? if len(pa) > 8 and len(pb) > 8: pa_arr = np.array(pa[:n_compare], dtype=np.uint8) pb_arr = np.array(pb[:n_compare], dtype=np.uint8) xcorr = np.correlate(pa_arr.astype(float) - pa_arr.mean(), pb_arr.astype(float) - pb_arr.mean(), mode="full") lag = int(np.argmax(np.abs(xcorr))) - (n_compare - 1) if lag != 0 and abs(lag) < n_compare // 2: print(f"\n Cross-correlation peak at lag={lag} bytes → data may be shifted by {lag} bytes between captures") else: print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)") def compare_lp_captures(cap_a: int, cap_b: int, data_dir: Path, n_bytes: int = 128): """ Decode both LP captures and report byte-level differences in the first n_bytes. """ print(f"\nComparing LP cap {cap_a:04d} vs cap {cap_b:04d} (first {n_bytes} payload bytes on lane 0)") res_a = decode_lp_capture(cap_a, data_dir, verbose=False) res_b = decode_lp_capture(cap_b, data_dir, verbose=False) if res_a is None or res_b is None: print(" ERROR: Could not decode one or both LP captures") return pa = res_a["lane0_payload"][:n_bytes] pb = res_b["lane0_payload"][:n_bytes] n_compare = min(len(pa), len(pb), n_bytes) diffs = [(i, pa[i], pb[i]) for i in range(n_compare) if pa[i] != pb[i]] print(f" Cap {cap_a:04d}: {len(pa)} bytes available, DI=0x{res_a['header']['DI_raw']:02X} HS_start={res_a['hs_start_ns']:.0f}ns") print(f" Cap {cap_b:04d}: {len(pb)} bytes available, DI=0x{res_b['header']['DI_raw']:02X} HS_start={res_b['hs_start_ns']:.0f}ns") if not diffs: print(f"\n No differences in first {n_compare} bytes — data content matches.") else: print(f"\n {len(diffs)} byte differences in first {n_compare} bytes:") print(f" {'Offset':>8} {'Cap_A':>6} {'Cap_B':>6}") for offset, ba, bb in diffs[:40]: pixel_group = offset * N_LANES // (BPP // 8) print(f" {offset:>8} 0x{ba:02X} 0x{bb:02X} (pixel group ≈ {pixel_group})") if len(diffs) > 40: print(f" ... ({len(diffs) - 40} more)") if len(pa) > 8 and len(pb) > 8: pa_arr = np.array(pa[:n_compare], dtype=np.uint8) pb_arr = np.array(pb[:n_compare], dtype=np.uint8) xcorr = np.correlate(pa_arr.astype(float) - pa_arr.mean(), pb_arr.astype(float) - pb_arr.mean(), mode="full") lag = int(np.argmax(np.abs(xcorr))) - (n_compare - 1) if lag != 0 and abs(lag) < n_compare // 2: print(f"\n Cross-correlation peak at lag={lag} bytes → data may be shifted by {lag} bytes between captures") else: print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)") # --------------------------------------------------------------------------- # Pixel content verification and anomaly analysis # --------------------------------------------------------------------------- def check_pixel_content(lane0_payload: list, n_check: int = 60) -> dict: """ Verify the first n_check Lane 0 payload bytes against the expected static-pink pattern STATIC_PINK_LANE0. Returns a dict: match_pct — percentage of bytes matching expected pattern n_mismatches — number of mismatching bytes in the checked window first_mismatch — (offset, expected_byte, actual_byte) or None n_checked — number of bytes examined """ check = lane0_payload[:n_check] if not check: return {"match_pct": None, "n_mismatches": 0, "first_mismatch": None, "n_checked": 0} mismatches = [ (i, STATIC_PINK_LANE0[i % 3], actual) for i, actual in enumerate(check) if actual != STATIC_PINK_LANE0[i % 3] ] return { "match_pct": round((1 - len(mismatches) / len(check)) * 100, 1), "n_mismatches": len(mismatches), "first_mismatch": mismatches[0] if mismatches else None, "n_checked": len(check), } def analyse_for_anomalies(result: dict | None) -> dict: """ Summarise bit-level anomalies from a decode_capture() result. Returns {"anomalous": bool, "flags": list[str]}. Checks: sync_byte_not_found — 0xB8 not found in any of 8 bit phases → HS burst may not have started properly sync_byte_late — 0xB8 found but at byte index > 5 → garbage precedes sync → possible byte misalignment unexpected_packet_type — DI data-type not in the expected set pixel_content_mismatch — Lane 0 payload < 90 % match to static-pink pattern """ if result is None: return {"anomalous": True, "flags": ["decode_failed"]} flags = [] sync_idx = result.get("sync_idx") if sync_idx is None: flags.append("sync_byte_not_found — HS burst may not have started") elif sync_idx > 5: flags.append( f"sync_byte_late (found at byte {sync_idx}, expected ≤ 5) — " f"possible byte misalignment" ) header = result.get("header") if header: dt = header.get("DT", -1) known = {DSI_DT_RGB888, 0x39, DSI_DT_HSYNC, DSI_DT_VSYNC, 0x31, 0x11, 0x29, 0x08, 0x09, 0x19} if dt not in known: flags.append(f"unexpected_packet_type DT=0x{dt:02X}") payload = result.get("lane0_payload", []) if len(payload) >= 12: cc = check_pixel_content(payload) if cc["match_pct"] is not None and cc["match_pct"] < 90.0: mm = cc["first_mismatch"] detail = ( f"first diff at byte {mm[0]}: got 0x{mm[2]:02X} expected 0x{mm[1]:02X}" if mm else "" ) flags.append( f"pixel_content_mismatch " f"({cc['match_pct']:.0f}% of {cc['n_checked']} bytes match; {detail})" ) return {"anomalous": bool(flags), "flags": flags} # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description="Decode DSI packet content from proto or LP captures") parser.add_argument("--cap" , type=int, default=214, help="Capture number to decode (default: 214)") parser.add_argument("--dir" , type=str, default=str(DATA_DIR), help="Data directory") parser.add_argument("--compare", type=int, default=None, metavar="CAP_B", help="Compare --cap against CAP_B byte-by-byte") parser.add_argument("--lp" , action="store_true", help="Decode from LP single-ended files instead of proto differential files") parser.add_argument("--list" , action="store_true", help="List available captures") args = parser.parse_args() data_dir = Path(args.dir) if args.list: proto_files = sorted(data_dir.glob("*_proto_*_dat.csv")) proto_caps = sorted({int(f.stem.split("_")[-2]) for f in proto_files}) lp_files = sorted(data_dir.glob("*_lp_*_dat.csv")) lp_caps = sorted({int(f.stem.split("_")[-2]) for f in lp_files}) print(f"Available proto captures: {proto_caps}") print(f"Available LP captures: {lp_caps}") return if args.compare is not None: if args.lp: compare_lp_captures(args.cap, args.compare, data_dir) else: compare_captures(args.cap, args.compare, data_dir) else: if args.lp: result = decode_lp_capture(args.cap, data_dir, verbose=True) else: result = decode_capture(args.cap, data_dir, verbose=True) anomaly = analyse_for_anomalies(result) if anomaly["anomalous"]: print(f"\n*** BIT-LEVEL ANOMALIES: {', '.join(anomaly['flags'])} ***") else: print(f"\nNo bit-level anomalies detected (sync, packet type, pixel content all OK)") if __name__ == "__main__": main()