"""Kiosk video player.

Plays a looping video through a GStreamer ``playbin`` and reacts to two
evdev input nodes: one swaps the video, the other steps the backlight
brightness.  An RGB indicator LED changes colour on touch and at the end of
each video loop.
"""
import os
import struct

import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib  # noqa: E402  (must follow require_version)

# Linux evdev constants (linux/input-event-codes.h).
EV_KEY = 1
BTN_TOUCH = 330
KEY_PRESS = 1  # EV_KEY value: 0 = release, 1 = press, 2 = auto-repeat

# struct input_event: timeval (two native longs), u16 type, u16 code,
# s32 value.  Native 'l' follows the platform word size, so the record is
# 24 bytes on 64-bit builds and 16 bytes on 32-bit builds.
INPUT_EVENT = struct.Struct('llHHi')
EVENT_SIZE = INPUT_EVENT.size

# Upper bound on how many queued events are drained per wake-up.
MAX_EVENTS_PER_READ = 64


class KioskManager:
    """Holds kiosk state and drives the backlight, indicator LED and video.

    Args:
        pipeline: The playbin-like element whose ``uri`` property and state
            are changed by :meth:`switch_video`.
    """

    # Inclusive range the backlight value bounces between.
    BL_MIN = 0
    BL_MAX = 20

    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.videos = [
            "file:///root/python/vid.mp4",
            "file:///root/python/vid2.mp4",
        ]
        self.current_video_index = 0

        # Hardware Paths
        self.backlight_path = "/sys/class/backlight/main_backlight/brightness"
        self.led_multi_path = "/sys/class/leds/display-indicator/multi_intensity"
        self.led_trig_path = "/sys/class/leds/display-indicator/trigger"
        self.led_br_path = "/sys/class/leds/display-indicator/brightness"

        # Backlight State
        self.bl_current = 10
        self.bl_direction = 1  # 1 for increasing, -1 for decreasing

        # LED State
        self.colors = [(95, 1, 111), (255, 51, 187), (255, 128, 212), (255, 173, 228)]
        self.color_index = 0

        self.setup_led()

    def setup_led(self):
        """Disable the LED trigger, set full brightness and show a colour.

        Nothing is written when the trigger node does not exist.  I/O
        failures are reported and otherwise ignored.
        """
        try:
            if os.path.exists(self.led_trig_path):
                with open(self.led_trig_path, 'w') as f:
                    f.write("none")
                with open(self.led_br_path, 'w') as f:
                    f.write("255")
            self.change_led_colour()
        except OSError as e:
            print(f"LED Setup Error: {e}")

    def change_led_colour(self):
        """Write the current colour to the LED, then advance to the next one."""
        r, g, b = self.colors[self.color_index]
        try:
            if os.path.exists(self.led_multi_path):
                with open(self.led_multi_path, 'w') as f:
                    f.write(f"{r} {g} {b}")
            self.color_index = (self.color_index + 1) % len(self.colors)
        except OSError as e:
            print(f"Failed to write LED colour: {e}")

    def cycle_backlight(self):
        """Touch logic: step the brightness by one and cycle the LED colour.

        The brightness bounces between ``BL_MIN`` and ``BL_MAX``, reversing
        direction at either end.  If the backlight write fails, the error is
        reported and the LED is left unchanged.
        """
        self.bl_current += self.bl_direction
        if self.bl_current >= self.BL_MAX:
            self.bl_current = self.BL_MAX
            self.bl_direction = -1
        elif self.bl_current <= self.BL_MIN:
            self.bl_current = self.BL_MIN
            self.bl_direction = 1

        try:
            if os.path.exists(self.backlight_path):
                with open(self.backlight_path, 'w') as f:
                    f.write(str(self.bl_current))
            print(f"Touch Event: Brightness now {self.bl_current}")
            self.change_led_colour()  # Visual feedback for touch
        except OSError as e:
            print(f"Backlight Error: {e}")

    def switch_video(self):
        """Button logic: advance to the next entry in ``videos`` and play it."""
        # The pipeline is dropped to READY before the uri is changed.
        self.pipeline.set_state(Gst.State.READY)
        self.current_video_index = (self.current_video_index + 1) % len(self.videos)
        new_uri = self.videos[self.current_video_index]
        print(f"Physical Button: Swapping to {new_uri}")
        self.pipeline.set_property("uri", new_uri)
        self.pipeline.set_state(Gst.State.PLAYING)


def _read_key_presses(source):
    """Yield the key code of each EV_KEY press currently readable on *source*.

    A single ``os.read`` on the underlying descriptor drains up to
    ``MAX_EVENTS_PER_READ`` queued events, bypassing any Python-level
    buffering on *source*.  Reading one record per wake-up through a buffered
    file would leave the rest of a burst stuck in the buffer, where the
    main-loop watch on the descriptor cannot see it.

    Args:
        source: Object exposing ``fileno()`` for an evdev device.

    Raises:
        OSError: If the read on the descriptor fails.
    """
    data = os.read(source.fileno(), EVENT_SIZE * MAX_EVENTS_PER_READ)
    # Ignore a trailing partial record rather than mis-decoding it.
    usable = len(data) - len(data) % EVENT_SIZE
    for _sec, _usec, ev_type, ev_code, ev_value in INPUT_EVENT.iter_unpack(data[:usable]):
        if ev_type == EV_KEY and ev_value == KEY_PRESS:
            yield ev_code


def handle_touch(source, condition, manager):
    """I/O watch callback for the touchscreen node (/dev/input/event2).

    Calls ``manager.cycle_backlight()`` once per BTN_TOUCH press.

    Returns:
        True to keep the watch installed; False after a read error so a
        failing device is not polled again.
    """
    try:
        for code in _read_key_presses(source):
            if code == BTN_TOUCH:
                manager.cycle_backlight()
    except OSError as e:
        print(f"Touch Read Error: {e}")
        return False
    return True


def handle_button(source, condition, manager):
    """I/O watch callback for the physical button node (/dev/input/event1).

    Calls ``manager.switch_video()`` once per key press.  The key code is
    deliberately not checked: any EV_KEY press on this node counts.

    Returns:
        True to keep the watch installed; False after a read error so a
        failing device is not polled again.
    """
    try:
        for _code in _read_key_presses(source):
            manager.switch_video()
    except OSError as e:
        print(f"Button Read Error: {e}")
        return False
    return True


def play_kiosk():
    """Build the pipeline, attach input watches and run the main loop.

    The loop runs until Ctrl-C or a GStreamer error message.  On either exit
    the pipeline is set to NULL and the input device files are closed.
    """
    Gst.init(None)

    pipeline = Gst.ElementFactory.make("playbin", "player")
    if pipeline is None:
        print("GStreamer Error: could not create playbin element")
        return

    sink_description = (
        "videoconvert ! video/x-raw,format=BGRx ! "
        "kmssink driver-name=mxsfb-drm connector-id=37 plane-id=31 can-scale=false"
    )
    video_sink = Gst.parse_bin_from_description(sink_description, True)
    pipeline.set_property("video-sink", video_sink)
    pipeline.set_property("audio-sink", Gst.ElementFactory.make("fakesink", None))

    manager = KioskManager(pipeline)
    pipeline.set_property("uri", manager.videos[0])

    # Created before on_message so the closure never sees an unbound name.
    loop = GLib.MainLoop()

    # --- INPUT ASSIGNMENT ---
    input_files = []
    for label, path, handler in (
        ("Button", "/dev/input/event1", handle_button),  # Physical button
        ("Touch", "/dev/input/event2", handle_touch),    # Touchscreen
    ):
        try:
            device = open(path, "rb", buffering=0)
        except OSError as e:
            print(f"{label} Node Error: {e}")
            continue
        input_files.append(device)
        GLib.io_add_watch(device, GLib.PRIORITY_DEFAULT, GLib.IO_IN, handler, manager)

    # --- LOOPING LOGIC ---
    bus = pipeline.get_bus()
    bus.add_signal_watch()

    def on_message(bus, msg, manager_instance):
        if msg.type == Gst.MessageType.EOS:
            # Video ended, cycle LED and loop back to start
            manager_instance.change_led_colour()
            pipeline.seek_simple(
                Gst.Format.TIME,
                Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT,
                0,
            )
        elif msg.type == Gst.MessageType.ERROR:
            err, _debug = msg.parse_error()
            print(f"GStreamer Error: {err}")
            loop.quit()
        return True

    bus.connect("message", on_message, manager)

    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        pass
    finally:
        # Runs for Ctrl-C and for loop.quit() after a pipeline error alike.
        pipeline.set_state(Gst.State.NULL)
        bus.remove_signal_watch()
        for device in input_files:
            device.close()


if __name__ == "__main__":
    play_kiosk()