import argparse import gi import glob import os import re import struct gi.require_version('Gst', '1.0') from gi.repository import Gst, GLib VIDEO_DIR = "/root/python" def _natural_key(path): """Sort key so vid_2.mp4 comes before vid_10.mp4.""" name = os.path.basename(path) return [int(t) if t.isdigit() else t.lower() for t in re.split(r'(\d+)', name)] def build_video_list(video_dir=VIDEO_DIR): """Scan video_dir for .mp4 files and return them as sorted file:// URIs.""" files = sorted(glob.glob(os.path.join(video_dir, "*.mp4")), key=_natural_key) if not files: raise SystemExit(f"No .mp4 files found in {video_dir!r}") return [Gst.filename_to_uri(os.path.abspath(f)) for f in files] class KioskManager: def __init__(self, pipeline, videos): self.pipeline = pipeline self.videos = videos self.current_video_index = 0 # Hardware Paths self.led_multi_path = "/sys/class/leds/display-indicator/multi_intensity" self.led_trig_path = "/sys/class/leds/display-indicator/trigger" self.led_br_path = "/sys/class/leds/display-indicator/brightness" # Hex to RGB Tuples self.colours = [ (95, 1, 111), # #5F016F (255, 51, 187), # #FF33BB (255, 128, 212),# #FF80D4 (255, 173, 228) # #FFADE4 ] self.colour_index = 0 self.setup_led() def setup_led(self): try: if os.path.exists(self.led_trig_path): with open(self.led_trig_path, 'w') as f: f.write("none") with open(self.led_br_path, 'w') as f: f.write("255") self.change_led_colour() except Exception as e: print(f"LED Setup Error: {e}") def change_led_colour(self): """Cycles the next LED colour""" r, g, b = self.colours[self.colour_index] try: if os.path.exists(self.led_multi_path): with open(self.led_multi_path, 'w') as f: f.write(f"{r} {g} {b}") print(f"Loop/Switch! LED: {r} {g} {b}") self.colour_index = (self.colour_index + 1) % len(self.colours) except Exception as e: print(f"Failed to write LED colour: {e}") def switch_video(self): """Advance to the next video in the list (loops around)""" self.pipeline.set_state(Gst.State.READY) self.current_video_index = (self.current_video_index + 1) % len(self.videos) new_uri = self.videos[self.current_video_index] print(f"Switching to {new_uri}") self.pipeline.set_property("uri", new_uri) self.pipeline.set_state(Gst.State.PLAYING) def handle_button(source, condition, manager): """Listens to /dev/input/event1 - any key press skips to the next video.""" EVENT_SIZE = 24 data = source.read(EVENT_SIZE) if not data or len(data) < EVENT_SIZE: return True _, _, ev_type, ev_code, ev_value = struct.unpack('qqHHI', data) # EV_KEY (1), value 1 is press if ev_type == 1 and ev_value == 1: manager.change_led_colour() manager.switch_video() return True def _resolve_start_index(start_name: str, videos: list) -> int: """Map a basename like 'vid.mp4' to its index in the videos list.""" target = os.path.basename(start_name).lower() for i, uri in enumerate(videos): if os.path.basename(uri).lower() == target: return i raise SystemExit( f"--start {start_name!r} not in kiosk video list: " + ", ".join(os.path.basename(u) for u in videos) ) def play_kiosk(start_index: int = 0): Gst.init(None) pipeline = Gst.ElementFactory.make("playbin", "player") SINK_STR = ("videoconvert ! video/x-raw,format=BGRx ! " "kmssink driver-name=mxsfb-drm connector-id=37 plane-id=31 can-scale=false") video_sink = Gst.parse_bin_from_description(SINK_STR, True) pipeline.set_property("video-sink", video_sink) pipeline.set_property("audio-sink", Gst.ElementFactory.make("fakesink")) videos = build_video_list() if start_index >= len(videos): start_index = 0 manager = KioskManager(pipeline, videos) manager.current_video_index = start_index pipeline.set_property("uri", manager.videos[start_index]) print(f"Starting on: {manager.videos[start_index]}") # --- INPUT MONITORING --- try: btn_fd = open("/dev/input/event1", "rb") GLib.io_add_watch(btn_fd, GLib.IO_IN, handle_button, manager) except Exception as e: print(f"Button Node Error: {e}") # --- BUS MESSAGES --- bus = pipeline.get_bus() bus.add_signal_watch() def on_message(bus, msg, manager_instance): if msg.type == Gst.MessageType.EOS: # Video ended. Cycle LED and advance to the next video in the list. manager_instance.change_led_colour() manager_instance.switch_video() elif msg.type == Gst.MessageType.ERROR: err, debug = msg.parse_error() print(f"GStreamer Error: {err}") loop.quit() bus.connect("message", on_message, manager) pipeline.set_state(Gst.State.PLAYING) loop = GLib.MainLoop() try: loop.run() except KeyboardInterrupt: pipeline.set_state(Gst.State.NULL) if __name__ == "__main__": p = argparse.ArgumentParser(description="Kiosk video player") p.add_argument("--start", default="vid_1.mp4", help="Initial video filename (basename match against kiosk list)") # parse_known_args so legacy flags don't crash the kiosk args, _unknown = p.parse_known_args() # Gst must be initialised before build_video_list (it uses filename_to_uri). Gst.init(None) start_index = _resolve_start_index(args.start, build_video_list()) play_kiosk(start_index=start_index)