diff --git a/__pycache__/csv_preprocessor.cpython-312.pyc b/__pycache__/csv_preprocessor.cpython-312.pyc index 7142b24..ee65eac 100644 Binary files a/__pycache__/csv_preprocessor.cpython-312.pyc and b/__pycache__/csv_preprocessor.cpython-312.pyc differ diff --git a/csv_preprocessor.py b/csv_preprocessor.py index c72f945..a8751eb 100644 --- a/csv_preprocessor.py +++ b/csv_preprocessor.py @@ -47,6 +47,9 @@ LP11_SPEC_MIN_V = 1.0 # V — LP-11 minimum voltage spec LP11_SPEC_MAX_V = 1.45 # V — LP-11 maximum voltage spec LP_LOW_DUR_MIN_NS = 50.0 # ns — minimum LP-low duration per D-PHY spec (LP-01 + LP-00 combined) HS_OSC_STD_V = 0.045 # V — rolling-std threshold above which a region is classified as HS +# If rolling-std fires within this margin after LP-low ends, it's detecting HS onset (not LP-11 +# return). Mode A only fires when lp11_to_hs exceeds lp_low_duration by more than this margin. +LP_LOW_HS_ONSET_MARGIN_NS = 20.0 # ns # Flicker detection thresholds # LP-low plateau below this → SoT sequence too brief for receiver to detect → flicker risk @@ -1011,21 +1014,24 @@ def analyze_lp_file(path: Path) -> "LPMetrics": and hs_amplitude_mv < HS_BURST_AMPLITUDE_MIN_MV and ( # Mode A: LP-low normal, rolling-std fired but HS amplitude is sub-threshold. - # Require amp ≥ HS_MODE_A_MIN_MV to exclude LP-11-return artifacts: when LP-11 - # returns after LP-low without any HS attempt the burst window is pure DC ~0 V - # (two LP-11 regions straddling a clean LP-low), giving amp ≈ 0–3 mV. A genuine - # weak HS attempt leaves measurable oscillations well above this floor. + # Two guards prevent false positives: + # 1. amp ≥ HS_MODE_A_MIN_MV: excludes LP-11-return artifacts where the burst + # window is pure DC ~0 V (amp ≈ 0–3 mV). + # 2. lp11_to_hs > lp_low + LP_LOW_HS_ONSET_MARGIN_NS: excludes HS-onset firing + # where rolling-std triggers right when LP-low ends (lp11_to_hs ≈ lp_low + 5 ns). + # True LP-11 return or delayed HS would be significantly beyond LP-low end. (lp11_to_hs_ns is not None and lp11_to_hs_ns >= LP_LOW_DUR_MIN_NS - and hs_amplitude_mv >= HS_MODE_A_MIN_MV) + and hs_amplitude_mv >= HS_MODE_A_MIN_MV + and (lp_low_duration_ns is None + or lp11_to_hs_ns > lp_low_duration_ns + LP_LOW_HS_ONSET_MARGIN_NS)) # Mode A2: rolling-std never fired — HS absent or amplitude below HS_OSC_STD_V; # weak oscillations are misclassified as LP-low, masking the true HS failure or lp11_to_hs_ns is None # Mode B: LP-low anomalously short + low amplitude = marginal HS launch or _lp_low_short - # Mode D: LP-low normal (≥ 200 ns) but rolling-std fired on LP-11 falling edge - # transition noise (lp11_to_hs < LP_LOW_DUR_MIN_NS). HS amplitude sub-threshold - # confirms the HS burst never formed — bridge entered LP-low but returned to LP-11 - # without completing SoT. Confirmed: capture 0035 (lp_low=379 ns, amp=19 mV). + # Mode D: LP-low normal, noise-spike lp11_to_hs (< 50 ns), low HS amplitude. + # Requires dynamic display content (video) during the test — with static/DC content + # the probe noise floor is 15–35 mV regardless of HS health, making this unreliable. or (lp11_to_hs_ns is not None and lp11_to_hs_ns < LP_LOW_DUR_MIN_NS and not _lp_low_short) @@ -1044,7 +1050,11 @@ def analyze_lp_file(path: Path) -> "LPMetrics": or ( lp_transition_valid and ( - (lp_low_duration_ns is None or lp_low_duration_ns < FLICKER_LP_LOW_MAX_NS) + # LP-00 absent entirely (LP-01/LP-00 skipped — SoT never completed). + # Short-but-non-zero LP-low is handled by Mode B inside hs_burst_absent; + # flagging it standalone (even with healthy HS amplitude) caused false + # positives when noise triggers gave lp_low < 50 ns with normal HS. + lp_low_duration_ns is None or hs_burst_absent ) ) diff --git a/device_server.py b/device_server.py index 8013fbe..d049ed2 100644 --- a/device_server.py +++ b/device_server.py @@ -11,11 +11,20 @@ Add addresses to REGISTER_COMMANDS to capture more register ranges. import os import re import subprocess +import threading from flask import Flask, jsonify, request app = Flask(__name__) +# --------------------------------------------------------------------------- +# Video playback state (managed as a subprocess) +# --------------------------------------------------------------------------- +KIOSK_SCRIPT = "/root/python/display_test_nexio.py" + +_video_proc: subprocess.Popen | None = None +_video_lock = threading.Lock() + # --------------------------------------------------------------------------- # Register commands to execute on each GET /registers request. # Each entry is a complete memtool command string. @@ -180,5 +189,41 @@ def get_sn65_registers(): }), 200 +@app.route("/video", methods=["PUT"]) +def control_video(): + """Start or stop the kiosk video player. + + PUT /video {"action": "start"|"stop"} + """ + global _video_proc + data = request.get_json(force=True) or {} + action = data.get("action", "").lower() + + with _video_lock: + if action == "start": + if _video_proc is not None and _video_proc.poll() is None: + return jsonify({"status": "already_running", "pid": _video_proc.pid}), 200 + _video_proc = subprocess.Popen( + ["python3", KIOSK_SCRIPT], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + return jsonify({"status": "started", "pid": _video_proc.pid}), 200 + + elif action == "stop": + if _video_proc is not None and _video_proc.poll() is None: + _video_proc.terminate() + try: + _video_proc.wait(timeout=3) + except subprocess.TimeoutExpired: + _video_proc.kill() + _video_proc.wait() + _video_proc = None + return jsonify({"status": "stopped"}), 200 + + else: + return jsonify({"error": "Invalid action. Use 'start' or 'stop'"}), 400 + + if __name__ == "__main__": app.run(host="0.0.0.0", port=5000) diff --git a/display_test_nexio.py b/display_test_nexio.py new file mode 100644 index 0000000..618efcb --- /dev/null +++ b/display_test_nexio.py @@ -0,0 +1,169 @@ +import gi +import struct +import os + +gi.require_version('Gst', '1.0') +from gi.repository import Gst, GLib + +class KioskManager: + def __init__(self, pipeline): + self.pipeline = pipeline + self.videos = [ + "file:///root/python/vid.mp4", + "file:///root/python/vid2.mp4" + ] + self.current_video_index = 0 + + # Hardware Paths + self.backlight_path = "/sys/class/backlight/main_backlight/brightness" + self.led_multi_path = "/sys/class/leds/display-indicator/multi_intensity" + self.led_trig_path = "/sys/class/leds/display-indicator/trigger" + self.led_br_path = "/sys/class/leds/display-indicator/brightness" + + # Backlight State + self.bl_current = 10 + self.bl_direction = 1 # 1 for increasing, -1 for decreasing + + # Hex to RGB Tuples + self.colours = [ + (95, 1, 111), # #5F016F + (255, 51, 187), # #FF33BB + (255, 128, 212),# #FF80D4 + (255, 173, 228) # #FFADE4 + ] + self.colour_index = 0 + + self.setup_led() + + def setup_led(self): + try: + if os.path.exists(self.led_trig_path): + with open(self.led_trig_path, 'w') as f: f.write("none") + with open(self.led_br_path, 'w') as f: f.write("255") + self.change_led_colour() + except Exception as e: + print(f"LED Setup Error: {e}") + + def change_led_colour(self): + """Cycles the next LED colour""" + r, g, b = self.colours[self.colour_index] + try: + if os.path.exists(self.led_multi_path): + with open(self.led_multi_path, 'w') as f: + f.write(f"{r} {g} {b}") + print(f"Loop/Switch! LED: {r} {g} {b}") + self.colour_index = (self.colour_index + 1) % len(self.colours) + except Exception as e: + print(f"Failed to write LED colour: {e}") + + def cycle_backlight(self): + """Touch Logic: Adjusts screen brightness and cycles LED""" + self.bl_current += self.bl_direction + if self.bl_current >= 20: + self.bl_current = 20 + self.bl_direction = -1 + elif self.bl_current <= 0: + self.bl_current = 0 + self.bl_direction = 1 + + try: + if os.path.exists(self.backlight_path): + with open(self.backlight_path, 'w') as f: + f.write(str(self.bl_current)) + print(f"Touch Event: Brightness now {self.bl_current}") + + except Exception as e: + print(f"Backlight Error: {e}") + + def switch_video(self): + """Manual trigger to swap between vid.mp4 and vid2.mp4""" + self.pipeline.set_state(Gst.State.READY) + self.current_video_index = (self.current_video_index + 1) % len(self.videos) + new_uri = self.videos[self.current_video_index] + + print(f"Button Trigger: Switching to {new_uri}") + self.pipeline.set_property("uri", new_uri) + self.pipeline.set_state(Gst.State.PLAYING) + +def handle_touch(source, condition, manager): + """Listens to /dev/input/event2""" + EVENT_SIZE = 24 + data = source.read(EVENT_SIZE) + if not data or len(data) < EVENT_SIZE: + return True + + _, _, ev_type, ev_code, ev_value = struct.unpack('qqHHI', data) + + # EV_KEY (1) and BTN_TOUCH (330), value 1 is Press + if ev_type == 1 and ev_code == 330 and ev_value == 1: + manager.cycle_backlight() + return True + + +def handle_button(source, condition, manager): + """Lestens to /dev/input/event1""" + EVENT_SIZE = 24 + data = source.read(EVENT_SIZE) + if not data or len(data) < EVENT_SIZE: + return True + + _, _, ev_type, ev_code, ev_value = struct.unpack('qqHHI', data) + + # EV_KEY (1), value 1 is press + if ev_type == 1 and ev_value == 1: + manager.switch_video() + + return True + +def play_kiosk(): + Gst.init(None) + + pipeline = Gst.ElementFactory.make("playbin", "player") + + SINK_STR = ("videoconvert ! video/x-raw,format=BGRx ! " + "kmssink driver-name=mxsfb-drm connector-id=37 plane-id=31 can-scale=false") + video_sink = Gst.parse_bin_from_description(SINK_STR, True) + pipeline.set_property("video-sink", video_sink) + pipeline.set_property("audio-sink", Gst.ElementFactory.make("fakesink")) + + manager = KioskManager(pipeline) + pipeline.set_property("uri", manager.videos[0]) + + # --- INPUT MONITORING --- + try: + btn_fd = open("/dev/input/event1", "rb") + GLib.io_add_watch(btn_fd, GLib.IO_IN, handle_button, manager) + except Exception as e: print(f"Button Node Error: {e}") + + try: + # Touchscreen assigned to event2 + tch_fd = open("/dev/input/event2", "rb") + GLib.io_add_watch(tch_fd, GLib.IO_IN, handle_touch, manager) + except Exception as e: print(f"Touch Node Error: {e}") + + # --- BUS MESSAGES --- + bus = pipeline.get_bus() + bus.add_signal_watch() + + def on_message(bus, msg, manager_instance): + if msg.type == Gst.MessageType.EOS: + # Video ended. cycle LED and loop back to start + manager_instance.change_led_colour() + pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT, 0) + elif msg.type == Gst.MessageType.ERROR: + err, debug = msg.parse_error() + print(f"GStreamer Error: {err}") + loop.quit + + bus.connect("message", on_message, manager) + + pipeline.set_state(Gst.State.PLAYING) + loop = GLib.MainLoop() + + try: + loop.run() + except KeyboardInterrupt: + pipeline.set_state(Gst.State.NULL) + +if __name__ == "__main__": + play_kiosk() diff --git a/mipi_test_interactive.py b/mipi_test_interactive.py index 6aa8dc6..d386e0e 100644 --- a/mipi_test_interactive.py +++ b/mipi_test_interactive.py @@ -27,7 +27,6 @@ import sys import requests from datetime import datetime from pathlib import Path - import math import os import struct @@ -51,6 +50,7 @@ load_dotenv(Path(__file__).parent / ".env") DEVICE_BASE = "http://192.168.45.8:5000" URL = f"{DEVICE_BASE}/display" +VIDEO_URL = f"{DEVICE_BASE}/video" SCOPE_IP = "192.168.45.4" PSU_IP = "192.168.45.3" @@ -841,6 +841,22 @@ def _append_flicker_log(ts: str, iteration: int, m: LPMetrics) -> None: ]) +def _start_video() -> None: + try: + requests.put(VIDEO_URL, json={"action": "start"}, timeout=3) + print(" VIDEO: kiosk player started.") + except Exception as e: + print(f" WARNING: video start failed: {e}") + + +def _stop_video() -> None: + try: + requests.put(VIDEO_URL, json={"action": "stop"}, timeout=3) + print(" VIDEO: kiosk player stopped.") + except Exception as e: + print(f" WARNING: video stop failed: {e}") + + def _play_alarm() -> None: """Play three short beeps using a generated WAV tone.""" sample_rate = 44100 @@ -1356,6 +1372,8 @@ def run_interactive_test() -> None: print("The display stays ON while Claude and the operator assess the frame.") print("Press Ctrl+C at any time to stop.\n") + _start_video() + try: while True: # ── Display ON ───────────────────────────────────────────────── @@ -1423,7 +1441,7 @@ def run_interactive_test() -> None: print("\n FLICKER CONFIRMED BY OPERATOR. STOPPING TEST.") stop_reason = (f"Flicker confirmed by operator at " f"capture {iteration:04d} [{ts}]") - # Display OFF + _stop_video() try: requests.put(URL, json={"state": "off"}, timeout=2) except Exception: @@ -1450,6 +1468,7 @@ def run_interactive_test() -> None: except KeyboardInterrupt: print("\n\n TEST INTERRUPTED (Ctrl+C).") stop_reason = "Test interrupted by operator (Ctrl+C)" + _stop_video() try: requests.put(URL, json={"state": "off"}, timeout=2) except Exception: