172 lines
5.8 KiB
Python
172 lines
5.8 KiB
Python
import argparse
|
|
import gi
|
|
import glob
|
|
import os
|
|
import re
|
|
import struct
|
|
|
|
gi.require_version('Gst', '1.0')
|
|
from gi.repository import Gst, GLib
|
|
|
|
# Default directory that build_video_list() scans for .mp4 files.
VIDEO_DIR = "/root/python"
|
|
|
|
|
|
def _natural_key(path):
|
|
"""Sort key so vid_2.mp4 comes before vid_10.mp4."""
|
|
name = os.path.basename(path)
|
|
return [int(t) if t.isdigit() else t.lower() for t in re.split(r'(\d+)', name)]
|
|
|
|
|
|
def build_video_list(video_dir=VIDEO_DIR):
    """Scan *video_dir* for ``.mp4`` files and return them as file:// URIs.

    Files are ordered with ``_natural_key`` so numbered clips play in numeric
    order.

    Args:
        video_dir: Directory to scan (not recursive).

    Returns:
        A non-empty list of URIs produced by ``Gst.filename_to_uri``.

    Raises:
        SystemExit: If the directory contains no ``*.mp4`` file.
    """
    # Escape the directory part so characters like "[" or "*" in the path are
    # matched literally instead of being interpreted as glob wildcards.
    pattern = os.path.join(glob.escape(video_dir), "*.mp4")
    files = sorted(glob.glob(pattern), key=_natural_key)
    if not files:
        raise SystemExit(f"No .mp4 files found in {video_dir!r}")
    return [Gst.filename_to_uri(os.path.abspath(f)) for f in files]
|
|
|
|
|
|
class KioskManager:
    """Keeps the kiosk's playlist position and indicator-LED colour in step.

    Attributes are plain and public: ``pipeline`` (the object whose ``uri``
    property and state are driven), ``videos`` (list of URIs),
    ``current_video_index`` and ``colour_index``.
    """

    def __init__(self, pipeline, videos):
        """Store the pipeline and playlist, then initialise the LED.

        Args:
            pipeline: Object exposing ``set_state`` and ``set_property``.
            videos: Sequence of URIs to cycle through.
        """
        self.pipeline = pipeline
        self.videos = videos
        self.current_video_index = 0

        # sysfs nodes of the indicator LED
        self.led_multi_path = "/sys/class/leds/display-indicator/multi_intensity"
        self.led_trig_path = "/sys/class/leds/display-indicator/trigger"
        self.led_br_path = "/sys/class/leds/display-indicator/brightness"

        # Palette as (R, G, B) tuples; the hex form is noted beside each entry.
        self.colours = [
            (95, 1, 111),     # #5F016F
            (255, 51, 187),   # #FF33BB
            (255, 128, 212),  # #FF80D4
            (255, 173, 228),  # #FFADE4
        ]
        self.colour_index = 0

        self.setup_led()

    @staticmethod
    def _write_node(path, text):
        """Overwrite the file at *path* with *text*."""
        with open(path, 'w') as node:
            node.write(text)

    def setup_led(self):
        """Write "none" to the trigger node and "255" to brightness, then show
        the first colour.

        The two writes are skipped when the trigger node does not exist. Any
        exception is reported on stdout and otherwise ignored.
        """
        try:
            if os.path.exists(self.led_trig_path):
                self._write_node(self.led_trig_path, "none")
                self._write_node(self.led_br_path, "255")
            # NOTE(review): the source had lost its indentation; this call is
            # assumed to sit outside the `if` above - confirm.
            self.change_led_colour()
        except Exception as err:
            print(f"LED Setup Error: {err}")

    def change_led_colour(self):
        """Apply the colour at ``colour_index`` and advance to the next one.

        The index wraps around at the end of the palette. Any exception raised
        while writing is reported on stdout and leaves the index unchanged.
        """
        red, green, blue = self.colours[self.colour_index]
        try:
            if os.path.exists(self.led_multi_path):
                self._write_node(self.led_multi_path, f"{red} {green} {blue}")
            # NOTE(review): the source had lost its indentation; the print and
            # the index update are assumed to run even when the LED node is
            # absent - confirm.
            print(f"Loop/Switch! LED: {red} {green} {blue}")
            self.colour_index = (self.colour_index + 1) % len(self.colours)
        except Exception as err:
            print(f"Failed to write LED colour: {err}")

    def switch_video(self):
        """Move to the next playlist entry, wrapping to the start at the end.

        The pipeline is put into READY, given the new URI, then set to PLAYING.
        """
        self.pipeline.set_state(Gst.State.READY)
        next_index = (self.current_video_index + 1) % len(self.videos)
        self.current_video_index = next_index
        new_uri = self.videos[next_index]

        print(f"Switching to {new_uri}")
        self.pipeline.set_property("uri", new_uri)
        self.pipeline.set_state(Gst.State.PLAYING)
|
|
|
|
|
|
# Native layout of the kernel's ``struct input_event``: two C longs for the
# timestamp, then u16 type, u16 code, s32 value. Using native "l" keeps the
# record size right on both 64-bit (24 bytes) and 32-bit (16 bytes) builds.
_INPUT_EVENT = struct.Struct('llHHi')
_EV_KEY = 1        # event type for key/button events
_KEY_PRESSED = 1   # value 1 = press (0 = release, 2 = auto-repeat)


def handle_button(source, condition, manager):
    """I/O-watch callback: any key press skips to the next video.

    Reads one input event record from *source*. On a key press it cycles the
    LED colour and advances the playlist; every other event is ignored.

    Args:
        source: Readable binary file object for the input device node.
        condition: I/O condition passed by the watch (unused).
        manager: Object providing ``change_led_colour()`` and
            ``switch_video()``.

    Returns:
        Always ``True`` so the watch that invoked the callback stays active.
    """
    data = source.read(_INPUT_EVENT.size)
    # Empty or short read: nothing decodable, wait for the next wake-up.
    if not data or len(data) < _INPUT_EVENT.size:
        return True

    _sec, _usec, ev_type, _code, ev_value = _INPUT_EVENT.unpack(data)

    if ev_type == _EV_KEY and ev_value == _KEY_PRESSED:
        manager.change_led_colour()
        manager.switch_video()

    return True
|
|
|
|
|
|
def _resolve_start_index(start_name: str, videos: list) -> int:
|
|
"""Map a basename like 'vid.mp4' to its index in the videos list."""
|
|
target = os.path.basename(start_name).lower()
|
|
for i, uri in enumerate(videos):
|
|
if os.path.basename(uri).lower() == target:
|
|
return i
|
|
raise SystemExit(
|
|
f"--start {start_name!r} not in kiosk video list: "
|
|
+ ", ".join(os.path.basename(u) for u in videos)
|
|
)
|
|
|
|
|
|
def play_kiosk(start_index: int = 0):
    """Build the playbin pipeline and run the kiosk until interrupted.

    Videos from ``build_video_list()`` are played in a loop: end-of-stream or
    a key press on ``/dev/input/event1`` advances to the next one and cycles
    the LED colour. A GStreamer error message stops the main loop.

    Args:
        start_index: Playlist index to start on; values past the end of the
            list fall back to 0.

    Raises:
        SystemExit: If the ``playbin`` element cannot be created, or (from
            ``build_video_list``) if no videos are found.
    """
    Gst.init(None)

    pipeline = Gst.ElementFactory.make("playbin", "player")
    if pipeline is None:
        # make() returns None when the element/plugin is missing; fail with a
        # clear message instead of an AttributeError on the next line.
        raise SystemExit("GStreamer element 'playbin' is not available")

    SINK_STR = ("videoconvert ! video/x-raw,format=BGRx ! "
                "kmssink driver-name=mxsfb-drm connector-id=37 plane-id=31 can-scale=false")
    video_sink = Gst.parse_bin_from_description(SINK_STR, True)
    pipeline.set_property("video-sink", video_sink)
    # Audio is discarded.
    pipeline.set_property("audio-sink", Gst.ElementFactory.make("fakesink"))

    videos = build_video_list()
    if start_index >= len(videos):
        start_index = 0
    manager = KioskManager(pipeline, videos)
    manager.current_video_index = start_index
    pipeline.set_property("uri", manager.videos[start_index])
    print(f"Starting on: {manager.videos[start_index]}")

    # Created before the callbacks that reference it.
    loop = GLib.MainLoop()

    # --- INPUT MONITORING ---
    btn_fd = None
    try:
        # Unbuffered: each read() in the callback maps to one read on the
        # device. A buffered reader could pull several events into Python's
        # buffer, where the IO_IN watch cannot see them until new input
        # arrives.
        btn_fd = open("/dev/input/event1", "rb", buffering=0)
        GLib.io_add_watch(btn_fd, GLib.IO_IN, handle_button, manager)
    except Exception as e:
        # Best effort: the kiosk still plays without the button.
        print(f"Button Node Error: {e}")

    # --- BUS MESSAGES ---
    bus = pipeline.get_bus()
    bus.add_signal_watch()

    def on_message(bus, msg, manager_instance):
        if msg.type == Gst.MessageType.EOS:
            # Video ended. Cycle LED and advance to the next video in the list.
            manager_instance.change_led_colour()
            manager_instance.switch_video()
        elif msg.type == Gst.MessageType.ERROR:
            err, debug = msg.parse_error()
            print(f"GStreamer Error: {err}")
            loop.quit()

    bus.connect("message", on_message, manager)

    pipeline.set_state(Gst.State.PLAYING)

    try:
        loop.run()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the pipeline on every exit path, including after an ERROR
        # message has quit the loop, and close the button device.
        pipeline.set_state(Gst.State.NULL)
        bus.remove_signal_watch()
        if btn_fd is not None:
            btn_fd.close()
|
|
|
|
def main():
    """Parse the command line, resolve the starting video and run the kiosk."""
    parser = argparse.ArgumentParser(description="Kiosk video player")
    parser.add_argument(
        "--start",
        default="vid_1.mp4",
        help="Initial video filename (basename match against kiosk list)",
    )
    # parse_known_args: unrecognised (legacy) flags are ignored rather than
    # aborting the kiosk.
    args, _ignored = parser.parse_known_args()

    # GStreamer has to be initialised before build_video_list(), which calls
    # Gst.filename_to_uri.
    Gst.init(None)
    playlist = build_video_list()
    first = _resolve_start_index(args.start, playlist)
    play_kiosk(start_index=first)


if __name__ == "__main__":
    main()
|