2026-04-21 12:26:10 +01:00
|
|
|
import gi
|
2026-04-21 16:23:43 +01:00
|
|
|
import socket
|
2026-04-21 12:26:10 +01:00
|
|
|
import struct
|
2026-04-21 16:23:43 +01:00
|
|
|
import threading
|
2026-04-21 12:26:10 +01:00
|
|
|
import os
|
|
|
|
|
|
|
|
|
|
gi.require_version('Gst', '1.0')
|
|
|
|
|
from gi.repository import Gst, GLib
|
|
|
|
|
|
|
|
|
|
class KioskManager:
|
|
|
|
|
def __init__(self, pipeline):
|
|
|
|
|
self.pipeline = pipeline
|
|
|
|
|
self.videos = [
|
2026-04-21 16:05:58 +01:00
|
|
|
"file:///root/vid.mp4",
|
|
|
|
|
"file:///root/vid2.mp4"
|
2026-04-21 12:26:10 +01:00
|
|
|
]
|
|
|
|
|
self.current_video_index = 0
|
|
|
|
|
|
|
|
|
|
# Hardware Paths
|
|
|
|
|
self.backlight_path = "/sys/class/backlight/main_backlight/brightness"
|
|
|
|
|
self.led_multi_path = "/sys/class/leds/display-indicator/multi_intensity"
|
|
|
|
|
self.led_trig_path = "/sys/class/leds/display-indicator/trigger"
|
|
|
|
|
self.led_br_path = "/sys/class/leds/display-indicator/brightness"
|
|
|
|
|
|
|
|
|
|
# Backlight State
|
|
|
|
|
self.bl_current = 10
|
|
|
|
|
self.bl_direction = 1 # 1 for increasing, -1 for decreasing
|
|
|
|
|
|
|
|
|
|
# Hex to RGB Tuples
|
|
|
|
|
self.colours = [
|
|
|
|
|
(95, 1, 111), # #5F016F
|
|
|
|
|
(255, 51, 187), # #FF33BB
|
|
|
|
|
(255, 128, 212),# #FF80D4
|
|
|
|
|
(255, 173, 228) # #FFADE4
|
|
|
|
|
]
|
|
|
|
|
self.colour_index = 0
|
|
|
|
|
|
|
|
|
|
self.setup_led()
|
|
|
|
|
|
|
|
|
|
def setup_led(self):
|
|
|
|
|
try:
|
|
|
|
|
if os.path.exists(self.led_trig_path):
|
|
|
|
|
with open(self.led_trig_path, 'w') as f: f.write("none")
|
|
|
|
|
with open(self.led_br_path, 'w') as f: f.write("255")
|
|
|
|
|
self.change_led_colour()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"LED Setup Error: {e}")
|
|
|
|
|
|
|
|
|
|
def change_led_colour(self):
|
|
|
|
|
"""Cycles the next LED colour"""
|
|
|
|
|
r, g, b = self.colours[self.colour_index]
|
|
|
|
|
try:
|
|
|
|
|
if os.path.exists(self.led_multi_path):
|
|
|
|
|
with open(self.led_multi_path, 'w') as f:
|
|
|
|
|
f.write(f"{r} {g} {b}")
|
|
|
|
|
print(f"Loop/Switch! LED: {r} {g} {b}")
|
|
|
|
|
self.colour_index = (self.colour_index + 1) % len(self.colours)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"Failed to write LED colour: {e}")
|
|
|
|
|
|
|
|
|
|
def cycle_backlight(self):
|
|
|
|
|
"""Touch Logic: Adjusts screen brightness and cycles LED"""
|
|
|
|
|
self.bl_current += self.bl_direction
|
|
|
|
|
if self.bl_current >= 20:
|
|
|
|
|
self.bl_current = 20
|
|
|
|
|
self.bl_direction = -1
|
|
|
|
|
elif self.bl_current <= 0:
|
|
|
|
|
self.bl_current = 0
|
|
|
|
|
self.bl_direction = 1
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
if os.path.exists(self.backlight_path):
|
|
|
|
|
with open(self.backlight_path, 'w') as f:
|
|
|
|
|
f.write(str(self.bl_current))
|
|
|
|
|
print(f"Touch Event: Brightness now {self.bl_current}")
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"Backlight Error: {e}")
|
|
|
|
|
|
|
|
|
|
def switch_video(self):
|
|
|
|
|
"""Manual trigger to swap between vid.mp4 and vid2.mp4"""
|
|
|
|
|
self.pipeline.set_state(Gst.State.READY)
|
|
|
|
|
self.current_video_index = (self.current_video_index + 1) % len(self.videos)
|
|
|
|
|
new_uri = self.videos[self.current_video_index]
|
|
|
|
|
|
|
|
|
|
print(f"Button Trigger: Switching to {new_uri}")
|
|
|
|
|
self.pipeline.set_property("uri", new_uri)
|
|
|
|
|
self.pipeline.set_state(Gst.State.PLAYING)
|
|
|
|
|
|
|
|
|
|
def handle_touch(source, condition, manager):
|
|
|
|
|
"""Listens to /dev/input/event2"""
|
|
|
|
|
EVENT_SIZE = 24
|
|
|
|
|
data = source.read(EVENT_SIZE)
|
|
|
|
|
if not data or len(data) < EVENT_SIZE:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
_, _, ev_type, ev_code, ev_value = struct.unpack('qqHHI', data)
|
|
|
|
|
|
|
|
|
|
# EV_KEY (1) and BTN_TOUCH (330), value 1 is Press
|
|
|
|
|
if ev_type == 1 and ev_code == 330 and ev_value == 1:
|
|
|
|
|
manager.cycle_backlight()
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_button(source, condition, manager):
|
|
|
|
|
"""Lestens to /dev/input/event1"""
|
|
|
|
|
EVENT_SIZE = 24
|
|
|
|
|
data = source.read(EVENT_SIZE)
|
|
|
|
|
if not data or len(data) < EVENT_SIZE:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
_, _, ev_type, ev_code, ev_value = struct.unpack('qqHHI', data)
|
|
|
|
|
|
|
|
|
|
# EV_KEY (1), value 1 is press
|
|
|
|
|
if ev_type == 1 and ev_value == 1:
|
|
|
|
|
manager.switch_video()
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def play_kiosk():
|
|
|
|
|
Gst.init(None)
|
|
|
|
|
|
|
|
|
|
pipeline = Gst.ElementFactory.make("playbin", "player")
|
|
|
|
|
|
|
|
|
|
SINK_STR = ("videoconvert ! video/x-raw,format=BGRx ! "
|
|
|
|
|
"kmssink driver-name=mxsfb-drm connector-id=37 plane-id=31 can-scale=false")
|
|
|
|
|
video_sink = Gst.parse_bin_from_description(SINK_STR, True)
|
|
|
|
|
pipeline.set_property("video-sink", video_sink)
|
|
|
|
|
pipeline.set_property("audio-sink", Gst.ElementFactory.make("fakesink"))
|
|
|
|
|
|
|
|
|
|
manager = KioskManager(pipeline)
|
|
|
|
|
pipeline.set_property("uri", manager.videos[0])
|
|
|
|
|
|
2026-04-21 16:23:43 +01:00
|
|
|
# UDP trigger → switch video (device_server sends a packet to port 5001)
|
|
|
|
|
def _udp_listener():
|
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
|
sock.bind(('127.0.0.1', 5001))
|
|
|
|
|
while True:
|
|
|
|
|
sock.recv(64)
|
|
|
|
|
GLib.idle_add(manager.switch_video)
|
|
|
|
|
|
|
|
|
|
threading.Thread(target=_udp_listener, daemon=True).start()
|
2026-04-21 15:38:17 +01:00
|
|
|
|
2026-04-21 12:26:10 +01:00
|
|
|
# --- INPUT MONITORING ---
|
|
|
|
|
try:
|
|
|
|
|
btn_fd = open("/dev/input/event1", "rb")
|
|
|
|
|
GLib.io_add_watch(btn_fd, GLib.IO_IN, handle_button, manager)
|
|
|
|
|
except Exception as e: print(f"Button Node Error: {e}")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
# Touchscreen assigned to event2
|
|
|
|
|
tch_fd = open("/dev/input/event2", "rb")
|
|
|
|
|
GLib.io_add_watch(tch_fd, GLib.IO_IN, handle_touch, manager)
|
|
|
|
|
except Exception as e: print(f"Touch Node Error: {e}")
|
|
|
|
|
|
|
|
|
|
# --- BUS MESSAGES ---
|
|
|
|
|
bus = pipeline.get_bus()
|
|
|
|
|
bus.add_signal_watch()
|
|
|
|
|
|
|
|
|
|
def on_message(bus, msg, manager_instance):
|
|
|
|
|
if msg.type == Gst.MessageType.EOS:
|
|
|
|
|
manager_instance.change_led_colour()
|
|
|
|
|
pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT, 0)
|
|
|
|
|
elif msg.type == Gst.MessageType.ERROR:
|
|
|
|
|
err, debug = msg.parse_error()
|
2026-04-21 16:16:39 +01:00
|
|
|
print(f"GStreamer Error: {err}\nDebug: {debug}", flush=True)
|
|
|
|
|
loop.quit()
|
|
|
|
|
elif msg.type == Gst.MessageType.STATE_CHANGED:
|
|
|
|
|
if msg.src == pipeline:
|
|
|
|
|
old, new, _ = msg.parse_state_changed()
|
|
|
|
|
print(f"Pipeline: {old.value_nick} -> {new.value_nick}", flush=True)
|
2026-04-21 12:26:10 +01:00
|
|
|
|
|
|
|
|
bus.connect("message", on_message, manager)
|
|
|
|
|
|
|
|
|
|
pipeline.set_state(Gst.State.PLAYING)
|
|
|
|
|
loop = GLib.MainLoop()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
loop.run()
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
pipeline.set_state(Gst.State.NULL)
|
|
|
|
|
|
2026-04-24 15:24:27 +01:00
|
|
|
def play_static_color(r: int, g: int, b: int):
|
|
|
|
|
"""Display a solid colour using GStreamer videotestsrc (no video file required).
|
|
|
|
|
|
|
|
|
|
Uses videotestsrc pattern=solid-color so every DSI line carries the same
|
|
|
|
|
repeating RGB triplet — any deviation in the proto_decoder output is a DSI fault.
|
2026-04-24 15:37:12 +01:00
|
|
|
|
|
|
|
|
Listens on UDP port 5001 for a trigger packet (same as play_kiosk), which
|
|
|
|
|
briefly cycles the pipeline through READY→PLAYING to generate the LP→HS
|
|
|
|
|
startup sequence that the scope captures on Pass 1.
|
2026-04-24 15:24:27 +01:00
|
|
|
"""
|
|
|
|
|
Gst.init(None)
|
|
|
|
|
|
|
|
|
|
argb = (0xFF << 24) | (r << 16) | (g << 8) | b
|
|
|
|
|
|
|
|
|
|
SINK_STR = ("videoconvert ! video/x-raw,format=BGRx ! "
|
|
|
|
|
"kmssink driver-name=mxsfb-drm connector-id=37 plane-id=31 can-scale=false")
|
|
|
|
|
pipeline_str = (
|
|
|
|
|
f"videotestsrc pattern=solid-color foreground-color={argb} ! "
|
|
|
|
|
f"video/x-raw,width=1280,height=800,framerate=60/1 ! "
|
|
|
|
|
f"{SINK_STR}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
pipeline = Gst.parse_launch(pipeline_str)
|
|
|
|
|
bus = pipeline.get_bus()
|
|
|
|
|
bus.add_signal_watch()
|
|
|
|
|
|
|
|
|
|
loop = GLib.MainLoop()
|
|
|
|
|
|
|
|
|
|
def on_message(bus, msg):
|
|
|
|
|
if msg.type == Gst.MessageType.ERROR:
|
|
|
|
|
err, debug = msg.parse_error()
|
|
|
|
|
print(f"GStreamer Error: {err}\nDebug: {debug}", flush=True)
|
|
|
|
|
loop.quit()
|
|
|
|
|
elif msg.type == Gst.MessageType.STATE_CHANGED:
|
|
|
|
|
if msg.src == pipeline:
|
|
|
|
|
old, new, _ = msg.parse_state_changed()
|
|
|
|
|
print(f"Pipeline: {old.value_nick} -> {new.value_nick}", flush=True)
|
|
|
|
|
|
2026-04-24 15:37:12 +01:00
|
|
|
def _restart_pipeline():
|
|
|
|
|
"""Cycle READY→PLAYING to produce the LP→HS startup the scope triggers on."""
|
|
|
|
|
print("UDP trigger: restarting pipeline (READY → PLAYING)", flush=True)
|
|
|
|
|
pipeline.set_state(Gst.State.READY)
|
|
|
|
|
pipeline.set_state(Gst.State.PLAYING)
|
|
|
|
|
return False # GLib.idle_add one-shot
|
|
|
|
|
|
|
|
|
|
def _udp_listener():
|
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
|
sock.bind(('127.0.0.1', 5001))
|
|
|
|
|
while True:
|
|
|
|
|
sock.recv(64)
|
|
|
|
|
GLib.idle_add(_restart_pipeline)
|
|
|
|
|
|
|
|
|
|
threading.Thread(target=_udp_listener, daemon=True).start()
|
|
|
|
|
|
2026-04-24 15:24:27 +01:00
|
|
|
bus.connect("message", on_message)
|
|
|
|
|
pipeline.set_state(Gst.State.PLAYING)
|
|
|
|
|
print(f"Static colour R:{r} G:{g} B:{b} (0x{argb:08X}) — running", flush=True)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
loop.run()
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
pipeline.set_state(Gst.State.NULL)
|
|
|
|
|
|
|
|
|
|
|
2026-04-21 12:26:10 +01:00
|
|
|
if __name__ == "__main__":
|
2026-04-24 15:24:27 +01:00
|
|
|
import sys
|
|
|
|
|
if "--static-pink" in sys.argv:
|
|
|
|
|
play_static_color(255, 51, 187) # R:255 G:51 B:187
|
|
|
|
|
else:
|
|
|
|
|
play_kiosk()
|