This commit is contained in:
david rice
2026-04-21 12:26:10 +01:00
parent 9f1536a157
commit ca0faf79d8
5 changed files with 255 additions and 12 deletions

View File

@@ -47,6 +47,9 @@ LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec
LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec
LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined)
HS_OSC_STD_V = 0.045 # V — rolling-std threshold above which a region is classified as HS
# If rolling-std fires within this margin after LP-low ends, it's detecting HS onset (not LP-11
# return). Mode A only fires when lp11_to_hs exceeds lp_low_duration by more than this margin.
LP_LOW_HS_ONSET_MARGIN_NS = 20.0 # ns
# Flicker detection thresholds
# LP-low plateau below this → SoT sequence too brief for receiver to detect → flicker risk
@@ -1011,21 +1014,24 @@ def analyze_lp_file(path: Path) -> "LPMetrics":
and hs_amplitude_mv < HS_BURST_AMPLITUDE_MIN_MV
and (
# Mode A: LP-low normal, rolling-std fired but HS amplitude is sub-threshold.
# Require amp ≥ HS_MODE_A_MIN_MV to exclude LP-11-return artifacts: when LP-11
# returns after LP-low without any HS attempt the burst window is pure DC ~0 V
# (two LP-11 regions straddling a clean LP-low), giving amp ≈ 03 mV. A genuine
# weak HS attempt leaves measurable oscillations well above this floor.
# Two guards prevent false positives:
# 1. amp ≥ HS_MODE_A_MIN_MV: excludes LP-11-return artifacts where the burst
# window is pure DC ~0 V (amp ≈ 03 mV).
# 2. lp11_to_hs > lp_low + LP_LOW_HS_ONSET_MARGIN_NS: excludes HS-onset firing
# where rolling-std triggers right when LP-low ends (lp11_to_hs ≈ lp_low + 5 ns).
# True LP-11 return or delayed HS would be significantly beyond LP-low end.
(lp11_to_hs_ns is not None and lp11_to_hs_ns >= LP_LOW_DUR_MIN_NS
and hs_amplitude_mv >= HS_MODE_A_MIN_MV)
and hs_amplitude_mv >= HS_MODE_A_MIN_MV
and (lp_low_duration_ns is None
or lp11_to_hs_ns > lp_low_duration_ns + LP_LOW_HS_ONSET_MARGIN_NS))
# Mode A2: rolling-std never fired — HS absent or amplitude below HS_OSC_STD_V;
# weak oscillations are misclassified as LP-low, masking the true HS failure
or lp11_to_hs_ns is None
# Mode B: LP-low anomalously short + low amplitude = marginal HS launch
or _lp_low_short
# Mode D: LP-low normal (≥ 200 ns) but rolling-std fired on LP-11 falling edge
# transition noise (lp11_to_hs < LP_LOW_DUR_MIN_NS). HS amplitude sub-threshold
# confirms the HS burst never formed — bridge entered LP-low but returned to LP-11
# without completing SoT. Confirmed: capture 0035 (lp_low=379 ns, amp=19 mV).
# Mode D: LP-low normal, noise-spike lp11_to_hs (< 50 ns), low HS amplitude.
# Requires dynamic display content (video) during the test — with static/DC content
# the probe noise floor is 1535 mV regardless of HS health, making this unreliable.
or (lp11_to_hs_ns is not None
and lp11_to_hs_ns < LP_LOW_DUR_MIN_NS
and not _lp_low_short)
@@ -1044,7 +1050,11 @@ def analyze_lp_file(path: Path) -> "LPMetrics":
or (
lp_transition_valid
and (
(lp_low_duration_ns is None or lp_low_duration_ns < FLICKER_LP_LOW_MAX_NS)
# LP-00 absent entirely (LP-01/LP-00 skipped — SoT never completed).
# Short-but-non-zero LP-low is handled by Mode B inside hs_burst_absent;
# flagging it standalone (even with healthy HS amplitude) caused false
# positives when noise triggers gave lp_low < 50 ns with normal HS.
lp_low_duration_ns is None
or hs_burst_absent
)
)

View File

@@ -11,11 +11,20 @@ Add addresses to REGISTER_COMMANDS to capture more register ranges.
import os
import re
import subprocess
import threading
from flask import Flask, jsonify, request
app = Flask(__name__)
# ---------------------------------------------------------------------------
# Video playback state (managed as a subprocess)
# ---------------------------------------------------------------------------
KIOSK_SCRIPT = "/root/python/display_test_nexio.py"
_video_proc: subprocess.Popen | None = None
_video_lock = threading.Lock()
# ---------------------------------------------------------------------------
# Register commands to execute on each GET /registers request.
# Each entry is a complete memtool command string.
@@ -180,5 +189,41 @@ def get_sn65_registers():
}), 200
@app.route("/video", methods=["PUT"])
def control_video():
"""Start or stop the kiosk video player.
PUT /video {"action": "start"|"stop"}
"""
global _video_proc
data = request.get_json(force=True) or {}
action = data.get("action", "").lower()
with _video_lock:
if action == "start":
if _video_proc is not None and _video_proc.poll() is None:
return jsonify({"status": "already_running", "pid": _video_proc.pid}), 200
_video_proc = subprocess.Popen(
["python3", KIOSK_SCRIPT],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return jsonify({"status": "started", "pid": _video_proc.pid}), 200
elif action == "stop":
if _video_proc is not None and _video_proc.poll() is None:
_video_proc.terminate()
try:
_video_proc.wait(timeout=3)
except subprocess.TimeoutExpired:
_video_proc.kill()
_video_proc.wait()
_video_proc = None
return jsonify({"status": "stopped"}), 200
else:
return jsonify({"error": "Invalid action. Use 'start' or 'stop'"}), 400
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)

169
display_test_nexio.py Normal file
View File

@@ -0,0 +1,169 @@
import gi
import struct
import os
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
class KioskManager:
def __init__(self, pipeline):
self.pipeline = pipeline
self.videos = [
"file:///root/python/vid.mp4",
"file:///root/python/vid2.mp4"
]
self.current_video_index = 0
# Hardware Paths
self.backlight_path = "/sys/class/backlight/main_backlight/brightness"
self.led_multi_path = "/sys/class/leds/display-indicator/multi_intensity"
self.led_trig_path = "/sys/class/leds/display-indicator/trigger"
self.led_br_path = "/sys/class/leds/display-indicator/brightness"
# Backlight State
self.bl_current = 10
self.bl_direction = 1 # 1 for increasing, -1 for decreasing
# Hex to RGB Tuples
self.colours = [
(95, 1, 111), # #5F016F
(255, 51, 187), # #FF33BB
(255, 128, 212),# #FF80D4
(255, 173, 228) # #FFADE4
]
self.colour_index = 0
self.setup_led()
def setup_led(self):
try:
if os.path.exists(self.led_trig_path):
with open(self.led_trig_path, 'w') as f: f.write("none")
with open(self.led_br_path, 'w') as f: f.write("255")
self.change_led_colour()
except Exception as e:
print(f"LED Setup Error: {e}")
def change_led_colour(self):
"""Cycles the next LED colour"""
r, g, b = self.colours[self.colour_index]
try:
if os.path.exists(self.led_multi_path):
with open(self.led_multi_path, 'w') as f:
f.write(f"{r} {g} {b}")
print(f"Loop/Switch! LED: {r} {g} {b}")
self.colour_index = (self.colour_index + 1) % len(self.colours)
except Exception as e:
print(f"Failed to write LED colour: {e}")
def cycle_backlight(self):
"""Touch Logic: Adjusts screen brightness and cycles LED"""
self.bl_current += self.bl_direction
if self.bl_current >= 20:
self.bl_current = 20
self.bl_direction = -1
elif self.bl_current <= 0:
self.bl_current = 0
self.bl_direction = 1
try:
if os.path.exists(self.backlight_path):
with open(self.backlight_path, 'w') as f:
f.write(str(self.bl_current))
print(f"Touch Event: Brightness now {self.bl_current}")
except Exception as e:
print(f"Backlight Error: {e}")
def switch_video(self):
"""Manual trigger to swap between vid.mp4 and vid2.mp4"""
self.pipeline.set_state(Gst.State.READY)
self.current_video_index = (self.current_video_index + 1) % len(self.videos)
new_uri = self.videos[self.current_video_index]
print(f"Button Trigger: Switching to {new_uri}")
self.pipeline.set_property("uri", new_uri)
self.pipeline.set_state(Gst.State.PLAYING)
def handle_touch(source, condition, manager):
"""Listens to /dev/input/event2"""
EVENT_SIZE = 24
data = source.read(EVENT_SIZE)
if not data or len(data) < EVENT_SIZE:
return True
_, _, ev_type, ev_code, ev_value = struct.unpack('qqHHI', data)
# EV_KEY (1) and BTN_TOUCH (330), value 1 is Press
if ev_type == 1 and ev_code == 330 and ev_value == 1:
manager.cycle_backlight()
return True
def handle_button(source, condition, manager):
"""Lestens to /dev/input/event1"""
EVENT_SIZE = 24
data = source.read(EVENT_SIZE)
if not data or len(data) < EVENT_SIZE:
return True
_, _, ev_type, ev_code, ev_value = struct.unpack('qqHHI', data)
# EV_KEY (1), value 1 is press
if ev_type == 1 and ev_value == 1:
manager.switch_video()
return True
def play_kiosk():
Gst.init(None)
pipeline = Gst.ElementFactory.make("playbin", "player")
SINK_STR = ("videoconvert ! video/x-raw,format=BGRx ! "
"kmssink driver-name=mxsfb-drm connector-id=37 plane-id=31 can-scale=false")
video_sink = Gst.parse_bin_from_description(SINK_STR, True)
pipeline.set_property("video-sink", video_sink)
pipeline.set_property("audio-sink", Gst.ElementFactory.make("fakesink"))
manager = KioskManager(pipeline)
pipeline.set_property("uri", manager.videos[0])
# --- INPUT MONITORING ---
try:
btn_fd = open("/dev/input/event1", "rb")
GLib.io_add_watch(btn_fd, GLib.IO_IN, handle_button, manager)
except Exception as e: print(f"Button Node Error: {e}")
try:
# Touchscreen assigned to event2
tch_fd = open("/dev/input/event2", "rb")
GLib.io_add_watch(tch_fd, GLib.IO_IN, handle_touch, manager)
except Exception as e: print(f"Touch Node Error: {e}")
# --- BUS MESSAGES ---
bus = pipeline.get_bus()
bus.add_signal_watch()
def on_message(bus, msg, manager_instance):
if msg.type == Gst.MessageType.EOS:
# Video ended. cycle LED and loop back to start
manager_instance.change_led_colour()
pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT, 0)
elif msg.type == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
print(f"GStreamer Error: {err}")
loop.quit
bus.connect("message", on_message, manager)
pipeline.set_state(Gst.State.PLAYING)
loop = GLib.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
play_kiosk()

View File

@@ -27,7 +27,6 @@ import sys
import requests
from datetime import datetime
from pathlib import Path
import math
import os
import struct
@@ -51,6 +50,7 @@ load_dotenv(Path(__file__).parent / ".env")
DEVICE_BASE = "http://192.168.45.8:5000"
URL = f"{DEVICE_BASE}/display"
VIDEO_URL = f"{DEVICE_BASE}/video"
SCOPE_IP = "192.168.45.4"
PSU_IP = "192.168.45.3"
@@ -841,6 +841,22 @@ def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None:
])
def _start_video() -> None:
try:
requests.put(VIDEO_URL, json={"action": "start"}, timeout=3)
print(" VIDEO: kiosk player started.")
except Exception as e:
print(f" WARNING: video start failed: {e}")
def _stop_video() -> None:
try:
requests.put(VIDEO_URL, json={"action": "stop"}, timeout=3)
print(" VIDEO: kiosk player stopped.")
except Exception as e:
print(f" WARNING: video stop failed: {e}")
def _play_alarm() -> None:
"""Play three short beeps using a generated WAV tone."""
sample_rate = 44100
@@ -1356,6 +1372,8 @@ def run_interactive_test() -> None:
print("The display stays ON while Claude and the operator assess the frame.")
print("Press Ctrl+C at any time to stop.\n")
_start_video()
try:
while True:
# ── Display ON ─────────────────────────────────────────────────
@@ -1423,7 +1441,7 @@ def run_interactive_test() -> None:
print("\n FLICKER CONFIRMED BY OPERATOR. STOPPING TEST.")
stop_reason = (f"Flicker confirmed by operator at "
f"capture {iteration:04d} [{ts}]")
# Display OFF
_stop_video()
try:
requests.put(URL, json={"state": "off"}, timeout=2)
except Exception:
@@ -1450,6 +1468,7 @@ def run_interactive_test() -> None:
except KeyboardInterrupt:
print("\n\n TEST INTERRUPTED (Ctrl+C).")
stop_reason = "Test interrupted by operator (Ctrl+C)"
_stop_video()
try:
requests.put(URL, json={"state": "off"}, timeout=2)
except Exception: