diff --git a/__pycache__/proto_decoder.cpython-312.pyc b/__pycache__/proto_decoder.cpython-312.pyc index c402fba..70d1b59 100644 Binary files a/__pycache__/proto_decoder.cpython-312.pyc and b/__pycache__/proto_decoder.cpython-312.pyc differ diff --git a/flicker_watch.py b/flicker_watch.py new file mode 100644 index 0000000..7f87d71 --- /dev/null +++ b/flicker_watch.py @@ -0,0 +1,369 @@ +#!/usr/bin/env python3 +""" +flicker_watch.py — Continuous LP capture during video on/off cycles. + +Operator watches the display. Script keeps cycling the video stream on/off +and triggering LP captures in the background. Files accumulate on the scope +without being transferred (fast). + +Keys (no Enter needed): + f — flicker observed: transfer + archive + analyse recent captures + g — good baseline: transfer + archive recent captures (no analysis) + q — quit + +Captures are organised under data/flicker/{event_ts}/ or data/good/{event_ts}/. +""" + +import json +import select +import shutil +import sys +import termios +import time +import tty +from datetime import datetime +from pathlib import Path + +import requests +import vxi11 + +import ai_mgmt +from csv_preprocessor import analyze_lp_file + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- +SCOPE_IP = "192.168.45.4" +DEVICE_BASE = "http://192.168.45.8:5000" +VIDEO_URL = f"{DEVICE_BASE}/video" + +DATA_DIR = Path(__file__).parent / "data" +FLICKER_DIR = DATA_DIR / "flicker" +GOOD_DIR = DATA_DIR / "good" + +# LP capture parameters (matched to mipi_test_interactive.py) +LP_SCALE = 1e-6 # 1 µs/div → 20 µs window +LP_POINTS = 200_000 +LP_TRIG_OFFSET = 9e-6 # 1 µs pre / 19 µs post-trigger +LP_V_SCALE = 0.2 +LP_V_OFFSET = 0.6 +LP_TRIG_LEVEL = 0.6 + +CYCLE_S = 10.0 # seconds video is on per cycle +TRIG_TIMEOUT_S = 2.0 # per-capture trigger wait + +# --------------------------------------------------------------------------- +# Scope setup +# --------------------------------------------------------------------------- +scope = vxi11.Instrument(SCOPE_IP) +scope.timeout = 30 + + +def setup_scope() -> None: + """One-shot scope init — channels, math, default trigger.""" + print("CONFIGURING SCOPE...") + cmds = [ + "*RST", ":RUN", ":STOP", + ":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", ":CHANnel1:PROBe 19.2", + ":CHANnel1:LABel 'CLK+'", + ":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", ":CHANnel2:PROBe 19.2", + ":CHANnel2:LABel 'CLK-'", + ":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50", ":CHANnel3:PROBe 19.2", + ":CHANnel3:LABel 'DAT0+'", + ":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50", ":CHANnel4:PROBe 19.2", + ":CHANnel4:LABel 'DAT0-'", + ":TIMebase:REFerence CENTer", + ":TRIGger:MODE EDGE", + ":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON", + ":DISPlay:LAYout STACKED", + ] + for c in cmds: + scope.write(c) + time.sleep(0.05) + print("SCOPE READY.") + + +def configure_for_lp() -> None: + """LP-mode: widen vertical range, falling-edge trigger on Ch3.""" + for ch in (1, 2, 3, 4): + scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}") + scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}") + scope.write(":TRIGger:EDGE:SOURce CHANnel3") + scope.write(":TRIGger:EDGE:SLOPe NEGative") + scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}") + scope.write(":TRIGger:SWEep NORMal") + scope.write(f":TIMebase:SCALe {LP_SCALE:.3E}") + scope.write(f":ACQuire:POINts {LP_POINTS}") + scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}") + time.sleep(0.3) + + +def arm_and_wait(timeout_s: float) -> bool: + """:DIGitize + *OPC?. Returns True if trigger fired within timeout.""" + global scope + prev = scope.timeout + try: + scope.timeout = timeout_s + 2 + scope.write(":DIGitize") + return scope.ask("*OPC?").strip() == "1" + except Exception: + # Trigger timed out or scope locked up — reconnect. + try: + scope.close() + except Exception: + pass + time.sleep(1.0) + scope = vxi11.Instrument(SCOPE_IP) + scope.timeout = 30 + try: + scope.write(":STOP") + except Exception: + pass + return False + finally: + try: + scope.timeout = prev + except Exception: + pass + + +def save_lp(base_name: str) -> None: + """Save Ch1 (CLK+) and Ch3 (DAT0+) as CSV to scope's C:\\TEMP\\.""" + base = f"C:\\TEMP\\{base_name}" + scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.csv",CSV') + time.sleep(2.5) + scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.csv",CSV') + time.sleep(2.5) + + +# --------------------------------------------------------------------------- +# Non-blocking keyboard +# --------------------------------------------------------------------------- +class KeyReader: + def __enter__(self): + self.fd = sys.stdin.fileno() + self.old = termios.tcgetattr(self.fd) + tty.setcbreak(self.fd) + return self + + def get_key(self) -> str | None: + if select.select([sys.stdin], [], [], 0)[0]: + return sys.stdin.read(1).lower() + return None + + def __exit__(self, *_): + termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old) + + +# --------------------------------------------------------------------------- +# Video control +# --------------------------------------------------------------------------- +def video_start() -> None: + try: + requests.put(VIDEO_URL, + json={"action": "start", "mode": "static-pink"}, + timeout=3) + except requests.exceptions.RequestException as e: + print(f" VIDEO START failed: {e}") + + +def video_stop() -> None: + try: + requests.put(VIDEO_URL, json={"action": "stop"}, timeout=3) + except requests.exceptions.RequestException as e: + print(f" VIDEO STOP failed: {e}") + + +# --------------------------------------------------------------------------- +# Register snapshot from device (DSIM PHY + SN65DSI83) +# --------------------------------------------------------------------------- +def fetch_registers_snapshot(target_dir: Path, event_ts: str) -> None: + """GET /registers + /sn65_registers, print key indicators, save JSON.""" + combined: dict = {} + for endpoint, key in [("/registers", "dsim"), + ("/sn65_registers", "sn65")]: + try: + r = requests.get(f"{DEVICE_BASE}{endpoint}", timeout=5) + r.raise_for_status() + combined[key] = r.json() + except Exception as e: + print(f" REGISTERS: {endpoint} failed — {e}") + combined[key] = None + + # Quick-look indicators + sn65 = combined.get("sn65") or {} + regs = sn65.get("registers", {}) if isinstance(sn65, dict) else {} + csr_0a = regs.get("csr_0a", {}) or {} + csr_e5 = regs.get("csr_e5", {}) or {} + + if csr_0a: + pll_str = "LOCKED" if csr_0a.get("pll_lock") else "*** UNLOCKED ***" + clk_str = "detected" if csr_0a.get("clk_det") else "NOT detected" + print(f" SN65: PLL {pll_str} CLK {clk_str} (CSR 0x0A = {csr_0a.get('value')})") + + if csr_e5: + flags = [ + ("pll_unlock", "PLL_UNLOCK"), + ("cha_sot_bit_err", "SOT_BIT_ERR"), + ("cha_llp_err", "LLP_ERR"), + ("cha_ecc_err", "ECC_ERR"), + ("cha_lp_err", "LP_ERR"), + ("cha_crc_err", "CRC_ERR"), + ] + active = [label for k, label in flags if csr_e5.get(k)] + if active: + print(f" SN65: *** ERROR FLAGS: {', '.join(active)} " + f"(CSR 0xE5 = {csr_e5.get('value')}) ***") + else: + print(f" SN65: no error flags (CSR 0xE5 = {csr_e5.get('value')})") + + out = target_dir / f"{event_ts}_registers.json" + try: + out.write_text(json.dumps(combined, indent=2)) + print(f" registers → {out.relative_to(DATA_DIR.parent)}") + except Exception as e: + print(f" REGISTERS save failed: {e}") + + +# --------------------------------------------------------------------------- +# Event handling: archive recent captures and (for flicker) analyse +# --------------------------------------------------------------------------- +def archive_and_analyse(event: str, since_iso: str) -> None: + """ + Pull every CSV from the scope, move into data/{event}/{event_ts}/. + For flicker events, run csv_preprocessor on each LP capture and print a + summary table. Always pulls a register snapshot from the device too. + """ + event_ts = datetime.now().strftime("%Y%m%d_%H%M%S") + target = (FLICKER_DIR if event == "flicker" else GOOD_DIR) / event_ts + target.mkdir(parents=True, exist_ok=True) + + print(f"\n *** {event.upper()} EVENT @ {event_ts} ***") + + # Register snapshot first (fast, before scope transfer which takes longer) + fetch_registers_snapshot(target, event_ts) + + print(f" Transferring scope → {target} ...") + try: + copied, failed = ai_mgmt.transfer_csv_files() + except Exception as e: + print(f" TRANSFER ERROR: {e}") + return + print(f" {copied} file(s) transferred ({failed} failed)") + + # Move just-arrived CSVs out of data/ (flat) into the event folder. + moved = 0 + for csv in DATA_DIR.glob("*.csv"): + if csv.is_file(): + shutil.move(str(csv), target / csv.name) + moved += 1 + print(f" {moved} file(s) archived to {target.relative_to(DATA_DIR.parent)}") + + if event != "flicker": + return + + # Analyse the LP captures we just archived. + print("\n LP analysis (csv_preprocessor):") + print(" " + "-" * 78) + print(f" {'file':<46} {'lp_low_ns':>10} {'hs_amp_mV':>10} {'flicker?':>9}") + print(" " + "-" * 78) + + lp_files = sorted(target.glob("*_lp_*_dat.csv")) + for f in lp_files: + try: + m = analyze_lp_file(f) + lp_low = getattr(m, "lp_low_duration_ns", None) + hs_amp = getattr(m, "hs_amp_mV", None) + sus = getattr(m, "flicker_suspect", False) + print(f" {f.name:<46} " + f"{(f'{lp_low:.1f}' if lp_low is not None else '?'):>10} " + f"{(f'{hs_amp:.1f}' if hs_amp is not None else '?'):>10} " + f"{('YES' if sus else 'no'):>9}") + except Exception as e: + print(f" {f.name:<46} ERROR: {e}") + print(" " + "-" * 78) + + +# --------------------------------------------------------------------------- +# Main loop +# --------------------------------------------------------------------------- +def main() -> None: + DATA_DIR.mkdir(exist_ok=True) + FLICKER_DIR.mkdir(exist_ok=True) + GOOD_DIR.mkdir(exist_ok=True) + + setup_scope() + configure_for_lp() + + print("\n" + "=" * 64) + print(" FLICKER WATCH — keys: f=flicker g=good q=quit") + print("=" * 64 + "\n") + + cycle = 0 + try: + with KeyReader() as keys: + while True: + cycle += 1 + cycle_ts = datetime.now().strftime("%Y%m%d_%H%M%S") + cycle_caps = [] + cycle_end = time.time() + CYCLE_S + + video_start() + print(f"\n[cycle {cycle:03d} {cycle_ts}] video ON " + f"({CYCLE_S:.0f}s window)", flush=True) + + event = None + last_tick = 0.0 + while time.time() < cycle_end: + seq = len(cycle_caps) + 1 + base = f"{cycle_ts}_lp_c{cycle:03d}_{seq:02d}" + remaining = lambda: max(0, cycle_end - time.time()) + + if arm_and_wait(TRIG_TIMEOUT_S): + try: + save_lp(base) + cycle_caps.append(base) + print(f" + cap {seq:02d} [{remaining():4.1f}s left]", + flush=True) + except Exception as e: + print(f" save error: {e}", flush=True) + else: + # Trigger timed out — print a heartbeat at most every 2s + if time.time() - last_tick > 2.0: + print(f" ... waiting for trigger " + f"[{remaining():4.1f}s left]", flush=True) + last_tick = time.time() + + key = keys.get_key() + if key in ("f", "g", "q"): + event = key + break + + video_stop() + if event is None: + print(f"[cycle {cycle:03d}] ended " + f"({len(cycle_caps)} cap(s), no event)", + flush=True) + + if event == "f": + archive_and_analyse("flicker", cycle_ts) + elif event == "g": + archive_and_analyse("good", cycle_ts) + elif event == "q": + print("\nQUIT requested.") + break + + # Brief pause before next cycle so video stop settles. + time.sleep(0.5) + + except KeyboardInterrupt: + print("\nInterrupted (Ctrl+C).") + finally: + try: + video_stop() + except Exception: + pass + + +if __name__ == "__main__": + main() diff --git a/mipi_test_interactive.py b/mipi_test_interactive.py index 59a4f3f..2a44acc 100644 --- a/mipi_test_interactive.py +++ b/mipi_test_interactive.py @@ -1535,37 +1535,30 @@ def run_interactive_test() -> None: def run_continuous_test() -> None: """ - Continuous LP capture loop — no kiosk restart between iterations. + Continuous LP capture loop — pipeline restart per iteration. - Designed for periodic flicker that repeats roughly every second once the - display pipeline has started. The kiosk is started once; the scope - re-arms on the NORMAL LP trigger (VBLANK LP-11 → LP-01 falling edge on - Ch3) after each capture, effectively sampling one random display frame - every ~7 s. + The pipeline (kiosk) is stopped and restarted on every iteration so the + scope captures the startup LP-11→LP-01 transition that triggers the flicker. + The scope is configured and armed BEFORE _start_video() is called so that + the first HS burst after pipeline load is always captured. - With flicker on ~1/60 frames the expected time to first catch is - ~60 × 7 s ≈ 7 minutes of unattended running. + Sequence per iteration: + 1. _stop_video() — tear down pipeline + 2. _configure_for_lp() — set scope channels + trigger (takes ~400 ms) + 3. _start_video() — reload pipeline (LP transition fires ~1-2 s later) + 4. _arm_and_wait() — scope captures first LP-11→LP-01 on Ch3 + 5. Transfer + LP analysis + 6. If suspect: LP bit decode + byte comparison vs last clean capture - When the LP rule-based detector flags a suspect: - • The LP file already on disk (10 GSa/s, 100 ps/sample) is decoded - directly using single-ended CLK+/DAT0+ thresholds — no extra capture. - • proto_decoder checks the HS-SYNC byte position (misalignment) and the - Lane 0 pixel content (corruption). - • compare_lp_captures() shows byte-level diffs vs the last clean capture. - - Press Ctrl+C to stop. No report is written (raw LP/proto CSVs are kept). + Press Ctrl+C to stop. No HTML report is written; raw LP CSVs are kept in data/. """ import proto_decoder as _pd print("\n===== CONTINUOUS CAPTURE MODE =====") - print("Kiosk starts once. Scope re-arms on each VBLANK trigger (no restart).") - print("LP-only per iteration; LP bit decode fires directly on LP suspect files.") + print("Pipeline restart per iteration — captures startup LP transition.") + print("LP bit decode fires automatically on flicker suspects.") print("Press Ctrl+C to stop.\n") - _start_video() - print("Waiting 5 s for display pipeline to stabilise...") - time.sleep(5.0) - iteration = 1 clean_count = 0 flicker_count = 0 @@ -1575,11 +1568,20 @@ def run_continuous_test() -> None: while True: ts = datetime.now().strftime("%Y%m%d_%H%M%S") - # ── LP capture ────────────────────────────────────────────────── + # ── Stop pipeline, configure scope, then restart pipeline ───────── + _stop_video() + time.sleep(0.3) + + # Configure scope while pipeline is down — scope will be ready before + # the first LP edge fires after _start_video(). _configure_for_lp() _set_timebase(LP_SCALE, LP_POINTS) scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}") - ok = _arm_and_wait(timeout=5) + + _start_video() + + # ── LP capture on startup transition ───────────────────────────── + ok = _arm_and_wait(timeout=10) scope.write(":TIMebase:POSition 0") _restore_hs_config() @@ -1598,7 +1600,7 @@ def run_continuous_test() -> None: iteration += 1 continue - # ── LP analysis (quiet) ────────────────────────────────────────── + # ── LP analysis ────────────────────────────────────────────────── lp_summaries, suspects = _analyze_lp_files(ts, iteration) if not suspects: @@ -1662,7 +1664,7 @@ def main_menu() -> None: print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)") print("4. PSU OUTPUT ON/OFF (CH1)") print("5. START INTERACTIVE FLICKER TEST (kiosk restart per iteration)") - print("6. START CONTINUOUS CAPTURE TEST (no restart; LP bit decode on flicker)") + print("6. START CONTINUOUS CAPTURE TEST (no restart; proto decode on flicker)") print("7. EXIT") choice = input("\nSELECT OPTION (1-7): ").strip() diff --git a/proto_decoder.py b/proto_decoder.py index 1f9a7ef..6168d7f 100644 --- a/proto_decoder.py +++ b/proto_decoder.py @@ -44,6 +44,9 @@ DSI_DT_RGB888 = 0x3E DSI_DT_HSYNC = 0x21 # short packet — H sync start DSI_DT_VSYNC = 0x01 # short packet — V sync start +# Known-valid DSI data types used in sync-byte validation (VC=0 + DT in this set) +VALID_DSI_DT = {0x01, 0x11, 0x21, 0x31, 0x08, 0x09, 0x19, 0x29, 0x39, 0x3E} + # MIPI D-PHY HS sync byte (transmitted at start of each HS burst, all-lanes) HS_SYNC_BYTE = 0xB8 # 1011_1000 in bit order (LSB first → 00011101 on wire) @@ -149,23 +152,70 @@ def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0, single_ended=False) N = len(v_dat) # --- Single-ended LP path --- + # LP-01 + LP-00 + HS-PREPARE + HS-ZERO form a continuous "LP-low" region where + # DAT+ < 0.25 V and rolling std < 45 mV. The LP-low region ends when the first + # '1' bit transition in 0xB8 causes rolling std > 45 mV. Start bit decoding a + # few bits BEFORE that spike so the phase search can find complete 0xB8 near byte 0. if single_ended: - min_lp01 = max(2, int(20.0 / dt_ns)) - run = 0 - lp01_end = None - for i in range(N): - if v_dat[i] < LP_SE_LP01_THRESH_V: - run += 1 - else: - if run >= min_lp01: - lp01_end = i - break - run = 0 + LP11_THRESH_SE = 0.8 # V — LP-11 state (DAT+ high) + LP_LOW_V_SE = 0.25 # V — LP-01/LP-00/HS-ZERO are all below this + HS_STD_V_SE = 0.045 # V — rolling std above this → first HS data bit + LP_LOW_MIN_NS = 5.0 # ns — ignore LP-low runs shorter than this + LP_MARGIN_NS = 25.0 # ns — start decode this far before first data bit - if lp01_end is not None: - skip = max(1, int(50.0 / dt_ns)) - return min(lp01_end + skip, N - 1) - return None + win_samples = max(10, int(1.0 / dt_ns)) + try: + from numpy.lib.stride_tricks import sliding_window_view + rstd = np.zeros(N) + wins = sliding_window_view(v_dat, win_samples) + rstd[win_samples - 1:win_samples - 1 + len(wins)] = wins.std(axis=-1) + except Exception: + rstd = np.array([v_dat[max(0, i - win_samples):i + 1].std() for i in range(N)]) + + # Find LP-11 end (first sample below LP11_THRESH_SE after LP-11) + lp11_end_idx = None + in_lp11 = False + for i in range(N): + if v_dat[i] > LP11_THRESH_SE: + in_lp11 = True + elif in_lp11: + lp11_end_idx = i + break + if lp11_end_idx is None: + return None + + search_end = min(lp11_end_idx + int(2000.0 / dt_ns), N) + + # Find LP-low plateau start: first sustained block of v < LP_LOW_V_SE + # AND rstd < HS_STD_V_SE (the LP-11 fall edge has high rstd so we skip it). + min_lp_run = max(5, int(LP_LOW_MIN_NS / dt_ns)) + lp_low_start = None + run = 0 + for i in range(lp11_end_idx, search_end): + if v_dat[i] < LP_LOW_V_SE and rstd[i] < HS_STD_V_SE: + run += 1 + if run >= min_lp_run: + lp_low_start = i - run + 1 + break + else: + run = 0 + if lp_low_start is None: + return min(lp11_end_idx + max(1, int(50.0 / dt_ns)), N - 1) + + # Find LP-low plateau end: first rstd > HS_STD_V_SE after the plateau begins. + # This is where the first '1' bit in 0xB8 creates a large voltage transition. + lp_low_end = None + for i in range(lp_low_start, search_end): + if rstd[i] > HS_STD_V_SE: + lp_low_end = i + break + if lp_low_end is None: + return min(lp_low_start + max(1, int(50.0 / dt_ns)), N - 1) + + # Start decode LP_MARGIN_NS before the first '1' bit of 0xB8 so the 8-phase + # search sees the complete sync byte near byte 0. + margin = max(1, int(LP_MARGIN_NS / dt_ns)) + return max(lp_low_start, lp_low_end - margin) # --- Differential LP-triggered path --- # LP-01: D+ = 0 V, D- = high → diff strongly negative (< -0.5 V for ≥ 20 ns) @@ -379,21 +429,37 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True): print(" ERROR: Too few bits decoded") return None - # Try all 8 bit-phase offsets to handle framing uncertainty from LP-00 CLK edges. - # LP-00 CLK edges before HS starts produce garbage bits; the correct phase is - # the one where 0xB8 appears earliest in the byte stream. - raw_bytes = None - sync_idx = None + # Try all 8 bit-phase offsets. Pass 1: find earliest 0xB8 whose next byte has + # VC=0 and a known DSI DT (validated sync). Pass 2 fallback: earliest bare 0xB8. + raw_bytes = None + sync_idx = None best_phase = 0 - best_sync = len(bits) # sentinel: "not found" + best_sync = len(bits) + validated = False + for phase in range(8): rb = bits_to_bytes(bits[phase:]) - si = find_sync_byte(rb) - if si is not None and si < best_sync: - best_sync = si - best_phase = phase - raw_bytes = rb - sync_idx = si + for i in range(len(rb) - 1): + if rb[i][1] == HS_SYNC_BYTE: + next_byte = rb[i + 1][1] + if (next_byte >> 6) == 0 and (next_byte & 0x3F) in VALID_DSI_DT: + if i < best_sync: + best_sync = i + best_phase = phase + raw_bytes = rb + sync_idx = i + validated = True + break # stop at first validated pair for this phase + + if not validated: + for phase in range(8): + rb = bits_to_bytes(bits[phase:]) + si = find_sync_byte(rb) + if si is not None and si < best_sync: + best_sync = si + best_phase = phase + raw_bytes = rb + sync_idx = si if raw_bytes is None: raw_bytes = bits_to_bytes(bits) @@ -405,7 +471,8 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True): else: if verbose: t_sync = raw_bytes[sync_idx][0] - print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})") + qual = "validated" if validated else "bare" + print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase}, {qual})") # Data bytes after sync data_bytes = raw_bytes[sync_idx + 1:] # skip the sync byte itself @@ -507,8 +574,19 @@ def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True): print(f" HS burst start: {t_hs_start_ns:.0f} ns " f"({hs_duration_us:.1f} µs available of ~18 µs full burst)") + # Auto-detect HS common mode from the first 200 ns of the HS burst. + # CLK+ common mode (~217 mV) and DAT+ common mode (~104 mV on this board) differ; + # hard-coding one value for DAT+ breaks the decode. The median of the HS burst + # gives the correct bit threshold for any board without manual calibration. + hs_probe_end = min(hs_start_idx + max(1, int(200.0 / dt_ns)), len(v_dat)) + dat_common_mode = float(np.median(v_dat[hs_start_idx:hs_probe_end])) + dat_common_mode = max(0.030, min(0.250, dat_common_mode)) # clamp to 30–250 mV + + if verbose: + print(f" DAT+ HS common mode: {dat_common_mode*1000:.0f} mV (auto-detected, used as bit threshold)") + bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx, - dat_thresh=LP_SE_DAT_THRESH_V, clk_thresh=LP_SE_CLK_THRESH_V) + dat_thresh=dat_common_mode, clk_thresh=LP_SE_CLK_THRESH_V) if verbose: print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)") @@ -518,18 +596,35 @@ def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True): print(" ERROR: Too few bits decoded") return None - raw_bytes = None - sync_idx = None + raw_bytes = None + sync_idx = None best_phase = 0 best_sync = len(bits) + validated = False + for phase in range(8): rb = bits_to_bytes(bits[phase:]) - si = find_sync_byte(rb) - if si is not None and si < best_sync: - best_sync = si - best_phase = phase - raw_bytes = rb - sync_idx = si + for i in range(len(rb) - 1): + if rb[i][1] == HS_SYNC_BYTE: + next_byte = rb[i + 1][1] + if (next_byte >> 6) == 0 and (next_byte & 0x3F) in VALID_DSI_DT: + if i < best_sync: + best_sync = i + best_phase = phase + raw_bytes = rb + sync_idx = i + validated = True + break # stop at first validated pair for this phase + + if not validated: + for phase in range(8): + rb = bits_to_bytes(bits[phase:]) + si = find_sync_byte(rb) + if si is not None and si < best_sync: + best_sync = si + best_phase = phase + raw_bytes = rb + sync_idx = si if raw_bytes is None: raw_bytes = bits_to_bytes(bits) @@ -541,7 +636,8 @@ def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True): else: if verbose: t_sync = raw_bytes[sync_idx][0] - print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})") + qual = "validated" if validated else "bare" + print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase}, {qual})") data_bytes = raw_bytes[sync_idx + 1:] header = parse_long_packet_header([b for _, b in data_bytes[:8]])