diff --git a/mipi_test.py b/mipi_test.py index af74bfa..33ee4a5 100644 --- a/mipi_test.py +++ b/mipi_test.py @@ -32,8 +32,11 @@ SIG_SCALE = 2e-9 # 2 ns/div → 20 ns window SIG_POINTS = 500_000 # 500 k pts → ~25 GSa/s # Pass 2 — protocol/frame structure: shows LP↔HS transitions and burst envelope -PROTO_SCALE = 1e-6 # 1 µs/div → 10 µs window -PROTO_POINTS = 500_000 # 500 k pts → 50 MSa/s (enough to see burst structure) +# 1280×800 24bpp 4-lane: full HS burst ≈ 18 µs. 4 µs/div → 40 µs window captures +# the complete line payload for DSI packet decode. 500 k pts @ 40 µs = 80 ps/sample +# (~28 samples/bit at 430 Mbps) — adequate for bit-level decoding. +PROTO_SCALE = 4e-6 # 4 µs/div → 40 µs window (was 1 µs/div) +PROTO_POINTS = 500_000 # 500 k pts # Pass 3 — LP state capture: widens vertical range to show LP-11 (~1.2 V) # Channels reconfigured to 200 mV/div, offset +0.6 V → display spans −0.2 V to 1.4 V. @@ -353,13 +356,18 @@ def dual_capture(iteration): else: print(" SKIPPING SIG SAVE.") - # ── Pass 3: frame/protocol structure ────────────────────────────────── - print(" PASS 3: FRAME STRUCTURE...") + # ── Pass 3: frame/protocol structure (LP-triggered differential) ───────── + # Re-apply LP trigger so the LP-00 → HS transition lands near t=0 in F2. + # This gives a fixed byte-framing anchor: HS sync byte 0xB8 appears at + # t≈380 ns, followed by DI, WC, ECC, then the full pixel payload. + print(" PASS 3: FRAME STRUCTURE (LP-triggered differential)...") + _configure_for_lp() _set_timebase(PROTO_SCALE, PROTO_POINTS) if _arm_and_wait(): _save_pass("proto", iteration, ts) else: print(" SKIPPING PROTO SAVE.") + _restore_hs_config() # ── Fetch DSI register snapshot from device ─────────────────────────── # Display is still ON here; registers reflect the active pipeline state. diff --git a/mipi_test_interactive.py b/mipi_test_interactive.py index eb5ef3f..2c88ff7 100644 --- a/mipi_test_interactive.py +++ b/mipi_test_interactive.py @@ -59,7 +59,7 @@ SIG_SCALE = 2e-9 # 2 ns/div → 20 ns window SIG_POINTS = 500_000 # Pass 2 — protocol/frame structure (HS differential, jitter/freq) -PROTO_SCALE = 1e-6 # 1 µs/div → 10 µs window +PROTO_SCALE = 4e-6 # 4 µs/div → 40 µs window (was 1 µs/div) PROTO_POINTS = 500_000 # Pass 3 — LP state capture (single-ended, widens vertical range to show LP-11) @@ -700,12 +700,17 @@ def dual_capture(iteration: int) -> str: else: print(" SKIPPING SIG SAVE.") - # ── Pass 3: frame/protocol structure ────────────────────────────────── - print(" PASS 3: FRAME STRUCTURE...") + # ── Pass 3: frame/protocol structure (LP-triggered differential) ───────── + # Re-apply LP trigger so the LP-00 → HS transition lands near t=0 in F2. + # This gives a fixed byte-framing anchor: HS sync byte 0xB8 appears at + # t≈380 ns, followed by DI, WC, ECC, then the full pixel payload. + print(" PASS 3: FRAME STRUCTURE (LP-triggered differential)...") + _configure_for_lp() try: _set_timebase(PROTO_SCALE, PROTO_POINTS) except Exception: print(" SKIPPING PROTO SAVE.") + _restore_hs_config() _fetch_registers(ts, iteration) try: _set_timebase(5e-9, 500_000) @@ -717,6 +722,7 @@ def dual_capture(iteration: int) -> str: _save_pass("proto", iteration, ts) else: print(" SKIPPING PROTO SAVE.") + _restore_hs_config() # ── DSI register snapshot ───────────────────────────────────────────── _fetch_registers(ts, iteration) diff --git a/proto_decoder.py b/proto_decoder.py new file mode 100644 index 0000000..0940d33 --- /dev/null +++ b/proto_decoder.py @@ -0,0 +1,440 @@ +#!/usr/bin/env python3 +""" +proto_decoder.py + +Decodes DSI packet content from proto (differential) captures. + +Usage: + python3 proto_decoder.py [--cap CAP_NUM] [--dir DATA_DIR] [--compare] + +The proto_*_clk and proto_*_dat captures are Ch1-Ch2 and Ch3-Ch4 differential +waveforms at ~50-80 ps/sample. CLK runs continuously at ~215 MHz (430 Mbps DDR). +DAT carries MIPI D-PHY HS data, sampled on both CLK edges. + +Decodes: + - DSI long packet header: DI (data type / virtual channel), word count, ECC + - First N payload bytes on lane 0 + - Compares two captures to spot differing byte positions (data-shift detection) +""" + +import argparse +import glob +import sys +from pathlib import Path + +import numpy as np + +DATA_DIR = Path(__file__).parent / "data" +DISPLAY_W = 1280 # pixels per line +DISPLAY_H = 800 +N_LANES = 4 +BPP = 24 # bits per pixel (RGB888) + +# Expected bytes per line on lane 0: +# payload = DISPLAY_W * (BPP//8) bytes total / N_LANES per lane +# header = 4 bytes total (DI, WC_L, WC_H, ECC), 1 per lane +# footer = 2 bytes total (CRC_L, CRC_H), distributed across first 2 lanes +PAYLOAD_BYTES_PER_LANE = (DISPLAY_W * (BPP // 8)) // N_LANES # 960 +HEADER_BYTES_PER_LANE = 1 # DI on lane 0 +FOOTER_BYTES_PER_LANE = 1 # CRC_L on lane 0 +TOTAL_LANE0_BYTES = HEADER_BYTES_PER_LANE + PAYLOAD_BYTES_PER_LANE + FOOTER_BYTES_PER_LANE + +# DSI data type for 24-bit packed pixel stream +DSI_DT_RGB888 = 0x3E +DSI_DT_HSYNC = 0x21 # short packet — H sync start +DSI_DT_VSYNC = 0x01 # short packet — V sync start + +# MIPI D-PHY HS sync byte (transmitted at start of each HS burst, all-lanes) +HS_SYNC_BYTE = 0xB8 # 1011_1000 in bit order (LSB first → 00011101 on wire) + +# Threshold for differential voltage: >0 = logic-1 (D+ > D-) +DAT_THRESH_V = 0.0 + + +# --------------------------------------------------------------------------- +# I/O +# --------------------------------------------------------------------------- + +def load_csv(path: Path): + data = np.genfromtxt(path, delimiter=",") + return data[:, 0], data[:, 1] + + +def find_proto_files(cap_num: int, data_dir: Path): + pattern_clk = str(data_dir / f"*_proto_{cap_num:04d}_clk.csv") + pattern_dat = str(data_dir / f"*_proto_{cap_num:04d}_dat.csv") + clk_files = sorted(glob.glob(pattern_clk)) + dat_files = sorted(glob.glob(pattern_dat)) + if not clk_files: + raise FileNotFoundError(f"No proto CLK file found for cap {cap_num:04d} in {data_dir}") + if not dat_files: + raise FileNotFoundError(f"No proto DAT file found for cap {cap_num:04d} in {data_dir}") + return Path(clk_files[-1]), Path(dat_files[-1]) + + +# --------------------------------------------------------------------------- +# Clock edge detection +# --------------------------------------------------------------------------- + +def find_clock_edges(t_clk, v_clk, threshold=0.0): + """ + Return arrays of (rising_indices, falling_indices) in the CLK trace. + Filters out glitches: only keeps transitions separated by at least 1 ns. + """ + dt_ns = float(np.median(np.diff(t_clk))) * 1e9 + min_gap = max(1, int(1.0 / dt_ns)) # ~1 ns minimum between edges + + crossings = np.where(np.diff((v_clk > threshold).astype(int)))[0] + if len(crossings) < 2: + return np.array([], dtype=int), np.array([], dtype=int) + + # Filter: keep only crossings separated by > min_gap samples + keep = np.concatenate(([True], np.diff(crossings) > min_gap)) + crossings = crossings[keep] + + level = (v_clk > threshold).astype(int) + rising = crossings[np.diff(level)[crossings] > 0] + falling = crossings[np.diff(level)[crossings] < 0] + return rising, falling + + +# --------------------------------------------------------------------------- +# HS burst detection +# --------------------------------------------------------------------------- + +def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0): + """ + Find the start of the main HS burst in the DAT trace. + + The proto capture often starts mid-HS (previous packet), so we: + 1. Find the LP quiet region (both LP-11 and LP-low have low differential std) + 2. Find the first sustained oscillation AFTER that quiet region + + Returns: index into t_dat of approximate HS burst start, or None. + """ + dt_ns = float(np.median(np.diff(t_dat))) * 1e9 + win = max(1, int(1.0 / dt_ns)) # 1 ns rolling window + min_run = max(5, int(5.0 / dt_ns)) # at least 5 ns continuous + + rstd = np.array([v_dat[max(0, i - win):i + 1].std() for i in range(len(v_dat))]) + OSC_THRESH = 0.04 # 40 mV — HS oscillation + QUIET_THRESH = 0.02 # 20 mV — LP quiet (LP-11 differential ≈ 0, low std) + + # Step 1: find a quiet (LP) region of at least 200 ns + quiet_min_run = max(5, int(200.0 / dt_ns)) + quiet_end = None + run_len = 0 + for i, std_val in enumerate(rstd): + if std_val < QUIET_THRESH: + run_len += 1 + if run_len >= quiet_min_run: + quiet_end = i + else: + run_len = 0 + + if quiet_end is None: + return None # no LP region found + + # Step 2: find first sustained oscillation after the LP region + run_start = None + run_len = 0 + for i in range(quiet_end, len(rstd)): + if rstd[i] >= OSC_THRESH: + if run_start is None: + run_start = i + run_len += 1 + if run_len >= min_run: + return run_start + else: + run_start = None + run_len = 0 + return None + + +# --------------------------------------------------------------------------- +# Bit decoding +# --------------------------------------------------------------------------- + +def decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx): + """ + Sample DAT on every CLK edge (DDR) after hs_start_idx. + Returns list of (time_ns, bit) tuples. + """ + t_hs = t_dat[hs_start_idx] + + rising, falling = find_clock_edges(t_clk, v_clk) + all_edges = np.sort(np.concatenate([rising, falling])) + + # Only edges after HS start + hs_mask = t_clk[all_edges] >= t_hs + hs_edges = all_edges[hs_mask] + + if len(hs_edges) == 0: + return [] + + dt_dat = float(np.median(np.diff(t_dat))) * 1e9 + + bits = [] + for edge_idx in hs_edges: + t_edge = t_clk[edge_idx] + # Find nearest sample in DAT trace + dat_idx = int(round((t_edge - t_dat[0]) / (dt_dat * 1e-9))) + dat_idx = max(0, min(dat_idx, len(v_dat) - 1)) + bit = 1 if v_dat[dat_idx] > DAT_THRESH_V else 0 + bits.append((t_edge * 1e9, bit)) + + return bits + + +# --------------------------------------------------------------------------- +# Byte reconstruction +# --------------------------------------------------------------------------- + +def bits_to_bytes(bits): + """ + Pack bits into bytes (LSB first, as MIPI D-PHY transmits). + Returns list of (time_ns_of_first_bit, byte_value). + """ + result = [] + for i in range(0, len(bits) - 7, 8): + byte_bits = [b for _, b in bits[i:i + 8]] + val = sum(byte_bits[j] << j for j in range(8)) + result.append((bits[i][0], val)) + return result + + +# --------------------------------------------------------------------------- +# DSI sync byte search and frame alignment +# --------------------------------------------------------------------------- + +def find_sync_byte(raw_bytes): + """ + Search for the MIPI D-PHY HS sync byte (0xB8) in the decoded byte stream. + The sync byte precedes all data bytes in each HS burst. + Returns index into raw_bytes of the sync byte, or None. + """ + for i, (_, byte_val) in enumerate(raw_bytes): + if byte_val == HS_SYNC_BYTE: + return i + return None + + +def parse_long_packet_header(payload_bytes): + """ + Parse a DSI long packet header from lane-0 perspective. + Lane 0 carries: [DI, then payload bytes 0, 4, 8, ...] + The WC and ECC bytes are on lanes 1-3 (not captured here). + + Returns dict with DI interpretation. + """ + if not payload_bytes: + return None + + di = payload_bytes[0] + vc = (di >> 6) & 0x03 + dt = di & 0x3F + + dt_name = { + 0x01: "VSS (V-Sync Start)", + 0x11: "VSE (V-Sync End)", + 0x21: "HSS (H-Sync Start)", + 0x31: "HSE (H-Sync End)", + 0x08: "EOT (End of Transmission)", + 0x39: "RGB888 (long packet, 24bpp)", + 0x3E: "Packed RGB888 (long packet, 24bpp)", + 0x29: "Generic long write", + }.get(dt, f"unknown (0x{dt:02X})") + + return { + "DI_raw": di, + "VC" : vc, + "DT" : dt, + "DT_name": dt_name, + } + + +# --------------------------------------------------------------------------- +# Main decode function +# --------------------------------------------------------------------------- + +def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True): + """ + Full decode of a proto capture. Returns dict with results. + """ + clk_path, dat_path = find_proto_files(cap_num, data_dir) + + if verbose: + print(f"\n{'='*60}") + print(f"Cap {cap_num:04d}: {dat_path.name}") + print(f"{'='*60}") + + t_clk, v_clk = load_csv(clk_path) + t_dat, v_dat = load_csv(dat_path) + dt_ns = float(np.median(np.diff(t_dat))) * 1e9 + + if verbose: + print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs ({len(t_dat)} samples, {dt_ns:.0f} ps/sample)") + + # Find HS burst start + hs_start_idx = find_hs_start(t_dat, v_dat) + if hs_start_idx is None: + if verbose: + print(" ERROR: Could not find HS burst start") + return None + + t_hs_start_ns = t_dat[hs_start_idx] * 1e9 + t_hs_end_ns = t_dat[-1] * 1e9 + hs_duration_us = (t_hs_end_ns - t_hs_start_ns) / 1000.0 + + if verbose: + print(f" HS burst start: {t_hs_start_ns:.0f} ns ({hs_duration_us:.1f} µs available of ~18 µs full burst)") + + # Decode bits + bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx) + if verbose: + print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)") + + if len(bits) < 16: + if verbose: + print(" ERROR: Too few bits decoded") + return None + + raw_bytes = bits_to_bytes(bits) + + # Find sync byte alignment + sync_idx = find_sync_byte(raw_bytes) + if sync_idx is None: + if verbose: + print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found — using raw byte 0 as start") + sync_idx = 0 + else: + if verbose: + t_sync = raw_bytes[sync_idx][0] + print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns)") + + # Data bytes after sync + data_bytes = raw_bytes[sync_idx + 1:] # skip the sync byte itself + + # Parse header + header = parse_long_packet_header([b for _, b in data_bytes[:8]]) + + if verbose and header: + print(f"\n DSI Header (lane 0):") + print(f" DI = 0x{header['DI_raw']:02X} → VC={header['VC']} DT=0x{header['DT']:02X} ({header['DT_name']})") + + # Payload bytes on lane 0 (every byte after header DI) + # Lane 0 payload: bytes 0, 4, 8, ... of the full pixel stream + # For RGB888: R0, G1, B2, R3, G4, B5, ... + lane0_payload = [b for _, b in data_bytes[1:]] # skip DI + + if verbose: + n_payload = len(lane0_payload) + n_pixels_partial = n_payload * N_LANES // (BPP // 8) + print(f"\n Lane 0 payload: {n_payload} bytes decoded (≈ first {n_pixels_partial} pixels' components)") + + if n_payload >= 16: + hex_str = " ".join(f"{b:02X}" for b in lane0_payload[:64]) + print(f" First 64 payload bytes: {hex_str}") + if n_payload > 64: + print(f" ...") + + # Check for non-zero content and where it first appears + nonzero_idx = next((i for i, b in enumerate(lane0_payload) if b != 0x00), None) + if nonzero_idx is None: + print(f"\n All {n_payload} payload bytes are 0x00 (blank / border region)") + else: + print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})") + print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}") + + return { + "cap_num" : cap_num, + "hs_start_ns" : t_hs_start_ns, + "hs_duration_us" : hs_duration_us, + "n_bits" : len(bits), + "n_bytes" : len(raw_bytes), + "sync_idx" : sync_idx, + "header" : header, + "lane0_payload" : lane0_payload, + } + + +# --------------------------------------------------------------------------- +# Comparison +# --------------------------------------------------------------------------- + +def compare_captures(cap_a: int, cap_b: int, data_dir: Path, n_bytes: int = 128): + """ + Decode both captures and report byte-level differences in the first n_bytes. + """ + print(f"\nComparing cap {cap_a:04d} vs cap {cap_b:04d} (first {n_bytes} payload bytes on lane 0)") + + res_a = decode_capture(cap_a, data_dir, verbose=False) + res_b = decode_capture(cap_b, data_dir, verbose=False) + + if res_a is None or res_b is None: + print(" ERROR: Could not decode one or both captures") + return + + pa = res_a["lane0_payload"][:n_bytes] + pb = res_b["lane0_payload"][:n_bytes] + + n_compare = min(len(pa), len(pb), n_bytes) + diffs = [(i, pa[i], pb[i]) for i in range(n_compare) if pa[i] != pb[i]] + + print(f" Cap {cap_a:04d}: {len(pa)} bytes available, DI=0x{res_a['header']['DI_raw']:02X} HS_start={res_a['hs_start_ns']:.0f}ns") + print(f" Cap {cap_b:04d}: {len(pb)} bytes available, DI=0x{res_b['header']['DI_raw']:02X} HS_start={res_b['hs_start_ns']:.0f}ns") + + if not diffs: + print(f"\n No differences in first {n_compare} bytes — data content matches.") + else: + print(f"\n {len(diffs)} byte differences in first {n_compare} bytes:") + print(f" {'Offset':>8} {'Cap_A':>6} {'Cap_B':>6}") + for offset, ba, bb in diffs[:40]: + pixel_group = offset * N_LANES // (BPP // 8) + print(f" {offset:>8} 0x{ba:02X} 0x{bb:02X} (pixel group ≈ {pixel_group})") + if len(diffs) > 40: + print(f" ... ({len(diffs) - 40} more)") + + # Check for data-shift pattern: does one capture's data appear shifted in the other? + if len(pa) > 8 and len(pb) > 8: + pa_arr = np.array(pa[:n_compare], dtype=np.uint8) + pb_arr = np.array(pb[:n_compare], dtype=np.uint8) + xcorr = np.correlate(pa_arr.astype(float) - pa_arr.mean(), + pb_arr.astype(float) - pb_arr.mean(), mode="full") + lag = int(np.argmax(np.abs(xcorr))) - (n_compare - 1) + if lag != 0 and abs(lag) < n_compare // 2: + print(f"\n Cross-correlation peak at lag={lag} bytes → data may be shifted by {lag} bytes between captures") + else: + print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)") + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser(description="Decode DSI packet content from proto captures") + parser.add_argument("--cap" , type=int, default=214, help="Capture number to decode (default: 214)") + parser.add_argument("--dir" , type=str, default=str(DATA_DIR), help="Data directory") + parser.add_argument("--compare", type=int, default=None, + metavar="CAP_B", + help="Compare --cap against CAP_B byte-by-byte") + parser.add_argument("--list" , action="store_true", help="List available proto captures") + args = parser.parse_args() + + data_dir = Path(args.dir) + + if args.list: + files = sorted(data_dir.glob("*_proto_*_dat.csv")) + caps = sorted({int(f.stem.split("_")[-2]) for f in files}) + print(f"Available proto captures: {caps}") + return + + if args.compare is not None: + compare_captures(args.cap, args.compare, data_dir) + else: + decode_capture(args.cap, data_dir, verbose=True) + + +if __name__ == "__main__": + main()