diff --git a/__pycache__/proto_decoder.cpython-312.pyc b/__pycache__/proto_decoder.cpython-312.pyc new file mode 100644 index 0000000..c32e0cf Binary files /dev/null and b/__pycache__/proto_decoder.cpython-312.pyc differ diff --git a/device_server.py b/device_server.py index 11d3b00..f5b6642 100644 --- a/device_server.py +++ b/device_server.py @@ -212,16 +212,20 @@ def control_video(): if _video_proc is not None and _video_proc.poll() is None: return jsonify({"status": "already_running", "pid": _video_proc.pid}), 200 try: + cmd = ["python3", KIOSK_SCRIPT] + mode = data.get("mode", "") + if mode == "static-pink": + cmd.append("--static-pink") log = open("/tmp/kiosk.log", "w") _video_proc = subprocess.Popen( - ["python3", KIOSK_SCRIPT], + cmd, stdout=log, stderr=subprocess.STDOUT, env=os.environ.copy(), ) except Exception as e: return jsonify({"error": f"failed to launch kiosk: {e}"}), 500 - return jsonify({"status": "started", "pid": _video_proc.pid}), 200 + return jsonify({"status": "started", "mode": mode or "video", "pid": _video_proc.pid}), 200 elif action == "stop": if _video_proc is not None and _video_proc.poll() is None: diff --git a/display_test_nexio.py b/display_test_nexio.py index 21150dd..720a0b5 100644 --- a/display_test_nexio.py +++ b/display_test_nexio.py @@ -180,5 +180,53 @@ def play_kiosk(): except KeyboardInterrupt: pipeline.set_state(Gst.State.NULL) +def play_static_color(r: int, g: int, b: int): + """Display a solid colour using GStreamer videotestsrc (no video file required). + + Uses videotestsrc pattern=solid-color so every DSI line carries the same + repeating RGB triplet — any deviation in the proto_decoder output is a DSI fault. + """ + Gst.init(None) + + argb = (0xFF << 24) | (r << 16) | (g << 8) | b + + SINK_STR = ("videoconvert ! video/x-raw,format=BGRx ! " + "kmssink driver-name=mxsfb-drm connector-id=37 plane-id=31 can-scale=false") + pipeline_str = ( + f"videotestsrc pattern=solid-color foreground-color={argb} ! " + f"video/x-raw,width=1280,height=800,framerate=60/1 ! " + f"{SINK_STR}" + ) + + pipeline = Gst.parse_launch(pipeline_str) + bus = pipeline.get_bus() + bus.add_signal_watch() + + loop = GLib.MainLoop() + + def on_message(bus, msg): + if msg.type == Gst.MessageType.ERROR: + err, debug = msg.parse_error() + print(f"GStreamer Error: {err}\nDebug: {debug}", flush=True) + loop.quit() + elif msg.type == Gst.MessageType.STATE_CHANGED: + if msg.src == pipeline: + old, new, _ = msg.parse_state_changed() + print(f"Pipeline: {old.value_nick} -> {new.value_nick}", flush=True) + + bus.connect("message", on_message) + pipeline.set_state(Gst.State.PLAYING) + print(f"Static colour R:{r} G:{g} B:{b} (0x{argb:08X}) — running", flush=True) + + try: + loop.run() + except KeyboardInterrupt: + pipeline.set_state(Gst.State.NULL) + + if __name__ == "__main__": - play_kiosk() + import sys + if "--static-pink" in sys.argv: + play_static_color(255, 51, 187) # R:255 G:51 B:187 + else: + play_kiosk() diff --git a/mipi_test_interactive.py b/mipi_test_interactive.py index 2c88ff7..42b91c3 100644 --- a/mipi_test_interactive.py +++ b/mipi_test_interactive.py @@ -841,8 +841,8 @@ def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None: def _start_video() -> None: try: - requests.put(VIDEO_URL, json={"action": "start"}, timeout=3) - print(" VIDEO: kiosk player started.") + requests.put(VIDEO_URL, json={"action": "start", "mode": "static-pink"}, timeout=3) + print(" VIDEO: static-pink display started.") except Exception as e: print(f" WARNING: video start failed: {e}") diff --git a/proto_decoder.py b/proto_decoder.py index 0940d33..71305a5 100644 --- a/proto_decoder.py +++ b/proto_decoder.py @@ -104,23 +104,51 @@ def find_clock_edges(t_clk, v_clk, threshold=0.0): def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0): """ - Find the start of the main HS burst in the DAT trace. + Find the start of the post-LP HS burst in the DAT trace. - The proto capture often starts mid-HS (previous packet), so we: - 1. Find the LP quiet region (both LP-11 and LP-low have low differential std) - 2. Find the first sustained oscillation AFTER that quiet region + For LP-triggered captures (trigger = DAT D+ falling at LP-11→LP-01 transition): + - CLK is in continuous HS mode throughout (215 MHz running) + - DAT shows LP-01 (diff ≈ -1 V) near t=0, preceded by HS data from the + previous line and possibly an earlier LP-01 at the start of the capture + - LP-00 follows LP-01 briefly (~50-200 ns), then the new HS burst begins + - To avoid the LP-01 from the previous line (at capture start), search + from N//4 onwards — the trigger LP-01 is at the capture midpoint (t=0) - Returns: index into t_dat of approximate HS burst start, or None. + Returns index into t_dat just past LP-00, ready for CLK-edge sampling. + Falls back to original std-based method for HS-triggered captures. """ - dt_ns = float(np.median(np.diff(t_dat))) * 1e9 - win = max(1, int(1.0 / dt_ns)) # 1 ns rolling window - min_run = max(5, int(5.0 / dt_ns)) # at least 5 ns continuous + dt_ns = float(np.median(np.diff(t_dat))) * 1e9 + N = len(v_dat) - rstd = np.array([v_dat[max(0, i - win):i + 1].std() for i in range(len(v_dat))]) - OSC_THRESH = 0.04 # 40 mV — HS oscillation - QUIET_THRESH = 0.02 # 20 mV — LP quiet (LP-11 differential ≈ 0, low std) + # --- LP-triggered path --- + # LP-01: D+ = 0 V, D- = high → diff strongly negative (< -0.5 V for ≥ 20 ns) + LP01_THRESH = -0.5 + min_lp01 = max(2, int(20.0 / dt_ns)) + search_from = N // 4 # skip any LP-01 fragment at capture start + + run = 0 + lp01_end = None + for i in range(search_from, N): + if v_dat[i] < LP01_THRESH: + run += 1 + else: + if run >= min_lp01: + lp01_end = i + break + run = 0 + + if lp01_end is not None: + # Skip 200 ns past LP-01 end to clear LP-00, then hand off to bit decoder + skip = max(1, int(200.0 / dt_ns)) + return min(lp01_end + skip, N - 1) + + # --- Fallback: HS-triggered captures (original rolling-std method) --- + win = max(1, int(1.0 / dt_ns)) + min_run = max(5, int(5.0 / dt_ns)) + rstd = np.array([v_dat[max(0, i - win):i + 1].std() for i in range(N)]) + OSC_THRESH = 0.04 + QUIET_THRESH = 0.02 - # Step 1: find a quiet (LP) region of at least 200 ns quiet_min_run = max(5, int(200.0 / dt_ns)) quiet_end = None run_len = 0 @@ -133,9 +161,8 @@ def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0): run_len = 0 if quiet_end is None: - return None # no LP region found + return None - # Step 2: find first sustained oscillation after the LP region run_start = None run_len = 0 for i in range(quiet_end, len(rstd)): @@ -273,7 +300,7 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True): dt_ns = float(np.median(np.diff(t_dat))) * 1e9 if verbose: - print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs ({len(t_dat)} samples, {dt_ns:.0f} ps/sample)") + print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs ({len(t_dat)} samples, {dt_ns*1000:.0f} ps/sample)") # Find HS burst start hs_start_idx = find_hs_start(t_dat, v_dat) @@ -299,18 +326,33 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True): print(" ERROR: Too few bits decoded") return None - raw_bytes = bits_to_bytes(bits) + # Try all 8 bit-phase offsets to handle framing uncertainty from LP-00 CLK edges. + # LP-00 CLK edges before HS starts produce garbage bits; the correct phase is + # the one where 0xB8 appears earliest in the byte stream. + raw_bytes = None + sync_idx = None + best_phase = 0 + best_sync = len(bits) # sentinel: "not found" + for phase in range(8): + rb = bits_to_bytes(bits[phase:]) + si = find_sync_byte(rb) + if si is not None and si < best_sync: + best_sync = si + best_phase = phase + raw_bytes = rb + sync_idx = si + + if raw_bytes is None: + raw_bytes = bits_to_bytes(bits) - # Find sync byte alignment - sync_idx = find_sync_byte(raw_bytes) if sync_idx is None: if verbose: - print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found — using raw byte 0 as start") + print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found in any bit phase — using raw byte 0") sync_idx = 0 else: if verbose: t_sync = raw_bytes[sync_idx][0] - print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns)") + print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})") # Data bytes after sync data_bytes = raw_bytes[sync_idx + 1:] # skip the sync byte itself