This commit is contained in:
david rice
2026-04-24 15:24:27 +01:00
parent f8d7727ff7
commit bc1d5bdc30
5 changed files with 119 additions and 25 deletions

Binary file not shown.

View File

@@ -212,16 +212,20 @@ def control_video():
if _video_proc is not None and _video_proc.poll() is None:
return jsonify({"status": "already_running", "pid": _video_proc.pid}), 200
try:
cmd = ["python3", KIOSK_SCRIPT]
mode = data.get("mode", "")
if mode == "static-pink":
cmd.append("--static-pink")
log = open("/tmp/kiosk.log", "w")
_video_proc = subprocess.Popen(
["python3", KIOSK_SCRIPT],
cmd,
stdout=log,
stderr=subprocess.STDOUT,
env=os.environ.copy(),
)
except Exception as e:
return jsonify({"error": f"failed to launch kiosk: {e}"}), 500
return jsonify({"status": "started", "pid": _video_proc.pid}), 200
return jsonify({"status": "started", "mode": mode or "video", "pid": _video_proc.pid}), 200
elif action == "stop":
if _video_proc is not None and _video_proc.poll() is None:

View File

@@ -180,5 +180,53 @@ def play_kiosk():
except KeyboardInterrupt:
pipeline.set_state(Gst.State.NULL)
def play_static_color(r: int, g: int, b: int):
"""Display a solid colour using GStreamer videotestsrc (no video file required).
Uses videotestsrc pattern=solid-color so every DSI line carries the same
repeating RGB triplet — any deviation in the proto_decoder output is a DSI fault.
"""
Gst.init(None)
argb = (0xFF << 24) | (r << 16) | (g << 8) | b
SINK_STR = ("videoconvert ! video/x-raw,format=BGRx ! "
"kmssink driver-name=mxsfb-drm connector-id=37 plane-id=31 can-scale=false")
pipeline_str = (
f"videotestsrc pattern=solid-color foreground-color={argb} ! "
f"video/x-raw,width=1280,height=800,framerate=60/1 ! "
f"{SINK_STR}"
)
pipeline = Gst.parse_launch(pipeline_str)
bus = pipeline.get_bus()
bus.add_signal_watch()
loop = GLib.MainLoop()
def on_message(bus, msg):
if msg.type == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
print(f"GStreamer Error: {err}\nDebug: {debug}", flush=True)
loop.quit()
elif msg.type == Gst.MessageType.STATE_CHANGED:
if msg.src == pipeline:
old, new, _ = msg.parse_state_changed()
print(f"Pipeline: {old.value_nick} -> {new.value_nick}", flush=True)
bus.connect("message", on_message)
pipeline.set_state(Gst.State.PLAYING)
print(f"Static colour R:{r} G:{g} B:{b} (0x{argb:08X}) — running", flush=True)
try:
loop.run()
except KeyboardInterrupt:
pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
import sys
if "--static-pink" in sys.argv:
play_static_color(255, 51, 187) # R:255 G:51 B:187
else:
play_kiosk()

View File

@@ -841,8 +841,8 @@ def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None:
def _start_video() -> None:
try:
requests.put(VIDEO_URL, json={"action": "start"}, timeout=3)
print(" VIDEO: kiosk player started.")
requests.put(VIDEO_URL, json={"action": "start", "mode": "static-pink"}, timeout=3)
print(" VIDEO: static-pink display started.")
except Exception as e:
print(f" WARNING: video start failed: {e}")

View File

@@ -104,23 +104,51 @@ def find_clock_edges(t_clk, v_clk, threshold=0.0):
def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0):
"""
Find the start of the main HS burst in the DAT trace.
Find the start of the post-LP HS burst in the DAT trace.
The proto capture often starts mid-HS (previous packet), so we:
1. Find the LP quiet region (both LP-11 and LP-low have low differential std)
2. Find the first sustained oscillation AFTER that quiet region
For LP-triggered captures (trigger = DAT D+ falling at LP-11→LP-01 transition):
- CLK is in continuous HS mode throughout (215 MHz running)
- DAT shows LP-01 (diff ≈ -1 V) near t=0, preceded by HS data from the
previous line and possibly an earlier LP-01 at the start of the capture
- LP-00 follows LP-01 briefly (~50-200 ns), then the new HS burst begins
- To avoid the LP-01 from the previous line (at capture start), search
from N//4 onwards — the trigger LP-01 is at the capture midpoint (t=0)
Returns: index into t_dat of approximate HS burst start, or None.
Returns index into t_dat just past LP-00, ready for CLK-edge sampling.
Falls back to original std-based method for HS-triggered captures.
"""
dt_ns = float(np.median(np.diff(t_dat))) * 1e9
win = max(1, int(1.0 / dt_ns)) # 1 ns rolling window
min_run = max(5, int(5.0 / dt_ns)) # at least 5 ns continuous
N = len(v_dat)
rstd = np.array([v_dat[max(0, i - win):i + 1].std() for i in range(len(v_dat))])
OSC_THRESH = 0.04 # 40 mV — HS oscillation
QUIET_THRESH = 0.02 # 20 mV — LP quiet (LP-11 differential ≈ 0, low std)
# --- LP-triggered path ---
# LP-01: D+ = 0 V, D- = high → diff strongly negative (< -0.5 V for ≥ 20 ns)
LP01_THRESH = -0.5
min_lp01 = max(2, int(20.0 / dt_ns))
search_from = N // 4 # skip any LP-01 fragment at capture start
run = 0
lp01_end = None
for i in range(search_from, N):
if v_dat[i] < LP01_THRESH:
run += 1
else:
if run >= min_lp01:
lp01_end = i
break
run = 0
if lp01_end is not None:
# Skip 200 ns past LP-01 end to clear LP-00, then hand off to bit decoder
skip = max(1, int(200.0 / dt_ns))
return min(lp01_end + skip, N - 1)
# --- Fallback: HS-triggered captures (original rolling-std method) ---
win = max(1, int(1.0 / dt_ns))
min_run = max(5, int(5.0 / dt_ns))
rstd = np.array([v_dat[max(0, i - win):i + 1].std() for i in range(N)])
OSC_THRESH = 0.04
QUIET_THRESH = 0.02
# Step 1: find a quiet (LP) region of at least 200 ns
quiet_min_run = max(5, int(200.0 / dt_ns))
quiet_end = None
run_len = 0
@@ -133,9 +161,8 @@ def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0):
run_len = 0
if quiet_end is None:
return None # no LP region found
return None
# Step 2: find first sustained oscillation after the LP region
run_start = None
run_len = 0
for i in range(quiet_end, len(rstd)):
@@ -273,7 +300,7 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True):
dt_ns = float(np.median(np.diff(t_dat))) * 1e9
if verbose:
print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs ({len(t_dat)} samples, {dt_ns:.0f} ps/sample)")
print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs ({len(t_dat)} samples, {dt_ns*1000:.0f} ps/sample)")
# Find HS burst start
hs_start_idx = find_hs_start(t_dat, v_dat)
@@ -299,18 +326,33 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True):
print(" ERROR: Too few bits decoded")
return None
# Try all 8 bit-phase offsets to handle framing uncertainty from LP-00 CLK edges.
# LP-00 CLK edges before HS starts produce garbage bits; the correct phase is
# the one where 0xB8 appears earliest in the byte stream.
raw_bytes = None
sync_idx = None
best_phase = 0
best_sync = len(bits) # sentinel: "not found"
for phase in range(8):
rb = bits_to_bytes(bits[phase:])
si = find_sync_byte(rb)
if si is not None and si < best_sync:
best_sync = si
best_phase = phase
raw_bytes = rb
sync_idx = si
if raw_bytes is None:
raw_bytes = bits_to_bytes(bits)
# Find sync byte alignment
sync_idx = find_sync_byte(raw_bytes)
if sync_idx is None:
if verbose:
print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found — using raw byte 0 as start")
print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found in any bit phase — using raw byte 0")
sync_idx = 0
else:
if verbose:
t_sync = raw_bytes[sync_idx][0]
print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns)")
print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})")
# Data bytes after sync
data_bytes = raw_bytes[sync_idx + 1:] # skip the sync byte itself