"""
device_server.py — deploy this on the target device (192.168.45.8)

Routes defined in this file:
    PUT /display         {"state": "on"|"off"}    — blank/unblank framebuffer
    GET /registers       — read MIPI DSI PHY registers via memtool
    GET /sn65_settling   — most recent post-restart SN65DSI83 settling poll
    GET /sn65_registers  — read SN65DSI83 CSR 0x0A and 0xE5 over I2C
    PUT /video           {"action": "start"|"stop"} — start/stop the kiosk

Add addresses to REGISTER_COMMANDS to capture more register ranges.
"""

import os
import re
import socket
import subprocess
import threading
import time

from flask import Flask, jsonify, request

app = Flask(__name__)

# ---------------------------------------------------------------------------
# Video playback state (managed as a subprocess)
# ---------------------------------------------------------------------------
KIOSK_SCRIPT = "/root/display_test_nexio.py"

# Handle of the kiosk child process, or None when none has been launched.
_video_proc: subprocess.Popen | None = None
# Held by the routes that inspect or replace the kiosk process handle.
_video_lock = threading.Lock()
# Command line of the kiosk; updated when the kiosk is started.
_kiosk_args: list[str] = ["python3", KIOSK_SCRIPT]

# ---------------------------------------------------------------------------
# Register commands to execute on each GET /registers request.
# Each entry is a complete memtool command string.
# ---------------------------------------------------------------------------
REGISTER_COMMANDS = [
    "memtool md -l 0x32e100b4+0x0c",  # DSIM_PHYTIMING / PHYTIMING1 / PHYTIMING2
]

# ---------------------------------------------------------------------------
# SN65DSI83 I2C configuration
# ---------------------------------------------------------------------------
SN65_I2C_BUS = 4      # i2c-4 on this board
SN65_I2C_ADDR = 0x2C  # SN65DSI83 fixed 7-bit I2C address

# Settling-period poll — started in a background thread immediately after each
# kiosk kill+restart. Samples csr_0a and csr_e5 every SETTLING_INTERVAL_S for
# SETTLING_DURATION_S seconds. Results are stored in _settling_log and returned
# by GET /sn65_settling so the host can correlate DSI errors with LP captures.
SETTLING_DURATION_S = 1.5     # seconds to poll after restart
SETTLING_INTERVAL_S = 0.010   # 10 ms between I2C reads

# Readings from the most recent settling poll; guarded by _settling_lock.
_settling_log: list = []
_settling_lock: threading.Lock = threading.Lock()


def _run_settling_poll() -> None:
    """Sample SN65DSI83 csr_0a and csr_e5 for SETTLING_DURATION_S seconds.

    One reading is taken per loop iteration, followed by a sleep of
    SETTLING_INTERVAL_S. Each reading records the elapsed time in ms, both raw
    register values as hex strings (None when the I2C read failed) and the
    decoded pll_lock / clk_det / any_error flags. When the poll finishes, the
    readings replace the contents of _settling_log under _settling_lock.
    """
    # Monotonic clock: a wall-clock step (e.g. time sync) must not stretch,
    # shorten or reorder the poll window.
    started = time.monotonic()
    deadline = started + SETTLING_DURATION_S
    readings: list = []

    while time.monotonic() < deadline:
        t_ms = round((time.monotonic() - started) * 1000, 1)
        val_0a, _ = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, 0x0A)
        val_e5, _ = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, 0xE5)
        readings.append({
            "t_ms": t_ms,
            "csr_0a": f"0x{val_0a:02x}" if val_0a is not None else None,
            "csr_e5": f"0x{val_e5:02x}" if val_e5 is not None else None,
            "pll_lock": bool(val_0a & 0x80) if val_0a is not None else None,
            "clk_det": bool(val_0a & 0x08) if val_0a is not None else None,
            "any_error": bool(val_e5) if val_e5 is not None else None,
        })
        time.sleep(SETTLING_INTERVAL_S)

    # Publish in one step so a reader never sees a half-written log.
    with _settling_lock:
        _settling_log[:] = readings


# Known Samsung DSIM register names (base 0x32E10000, i.MX 8M Mini)
_DSIM_NAMES = {
    0x32e10004: "DSIM_STATUS",
    0x32e10008: "DSIM_CLKCTRL",
    0x32e1000c: "DSIM_TIMEOUT",
    0x32e10010: "DSIM_CONFIG",
    0x32e10014: "DSIM_ESCMODE",
    0x32e100ac: "DSIM_PHYACCHR",
    0x32e100b0: "DSIM_PHYACCHR1",
    0x32e100b4: "DSIM_PHYTIMING",
    0x32e100b8: "DSIM_PHYTIMING1",
    0x32e100bc: "DSIM_PHYTIMING2",
}

# "<addr>: <rest of line>", with an optional 0x prefix on the address.
_MEMTOOL_LINE_RE = re.compile(r"(?:0x)?([0-9a-fA-F]+)\s*:\s*(.+)")
# One 32-bit word written as eight hex digits.
_HEX_WORD_RE = re.compile(r"[0-9a-fA-F]{8}")


def _parse_memtool_output(raw: str) -> list:
    """
    Parse 'memtool md -l' output into a list of dicts.

    Handles both formats:
        32e100b4: 00000001 12345678 ...
        0x32e100b4: 0x00000001 0x12345678 ...

    Each 8-hex-digit word on a line becomes one entry with the keys
    "address" (line address + 4 * word index, formatted as 0x%08x),
    "value" (lower-cased, 0x-prefixed) and "name" (looked up in _DSIM_NAMES,
    "" when unknown). Lines that do not match the address pattern are skipped.
    """
    registers = []
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        m = _MEMTOOL_LINE_RE.match(line)
        if not m:
            continue
        base_addr = int(m.group(1), 16)
        for i, val in enumerate(_HEX_WORD_RE.findall(m.group(2))):
            addr = base_addr + i * 4
            registers.append({
                "address": f"0x{addr:08x}",
                "value": f"0x{val.lower()}",
                "name": _DSIM_NAMES.get(addr, ""),
            })
    return registers


# ---------------------------------------------------------------------------
# Request / kiosk helpers
# ---------------------------------------------------------------------------

def _json_payload() -> dict:
    """Return the request's JSON body, or {} when it is not a JSON object."""
    payload = request.get_json(force=True)
    return payload if isinstance(payload, dict) else {}


def _lower_str(value) -> str:
    """Return value lower-cased when it is a string, otherwise ""."""
    return value.lower() if isinstance(value, str) else ""


def _stop_kiosk() -> None:
    """Terminate the kiosk process if it is running; kill it after 3 s.

    Call with _video_lock held. Leaves _video_proc itself untouched.
    """
    proc = _video_proc
    if proc is None or proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=3)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def _launch_kiosk(cmd: list[str]) -> subprocess.Popen:
    """Start cmd with stdout and stderr redirected to /tmp/kiosk.log.

    The log file is truncated on every launch. The child receives its own
    copy of the descriptor, so the parent's handle is closed on return rather
    than leaked once per launch.
    """
    with open("/tmp/kiosk.log", "w") as log:
        return subprocess.Popen(
            cmd,
            stdout=log,
            stderr=subprocess.STDOUT,
            env=os.environ.copy(),
        )


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.route("/display", methods=["PUT"])
def control_display():
    """Handle PUT /display {"state": "on"|"off"}.

    "off" is a no-op that returns 200. "on" does one of three things:
      * kiosk args contain --static-pink: stop the kiosk, wait 150 ms, start
        the settling poll thread and relaunch the kiosk;
      * kiosk running in video mode: send a UDP 'switch' datagram to
        127.0.0.1:5001;
      * otherwise: unblank the framebuffer via sysfs.
    Any other (or missing, or non-string) state yields HTTP 400.
    """
    global _video_proc
    state = _lower_str(_json_payload().get("state", ""))

    if state == "off":
        # nothing to do while video is managing the display
        return jsonify({"status": "ok"}), 200
    if state != "on":
        return jsonify({"error": "Invalid state. Use 'on' or 'off'"}), 400

    with _video_lock:
        if "--static-pink" in _kiosk_args:
            # Full kill+restart so the DSI controller goes through LP-11 → HS
            # startup, giving the scope a clean LP trigger on Pass 1.
            _stop_kiosk()
            time.sleep(0.15)  # let DSI reach LP-11
            # Start settling poll immediately — captures csr_e5 error flags
            # during DSI startup so the host can determine root cause of flicker.
            threading.Thread(target=_run_settling_poll, daemon=True).start()
            try:
                _video_proc = _launch_kiosk(_kiosk_args)
            except Exception as e:
                return jsonify({"error": f"restart failed: {e}"}), 500
            return jsonify({"status": "static-pink restarted"}), 200

        if _video_proc is not None and _video_proc.poll() is None:
            # Video mode: UDP trigger switches clip and reloads pipeline
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.sendto(b'switch', ('127.0.0.1', 5001))
            return jsonify({"status": "video switched"}), 200

    # fallback when no kiosk is running (best-effort; exit status is ignored)
    os.system("echo 0 > /sys/class/graphics/fb0/blank")
    return jsonify({"status": "Display ON"}), 200


@app.route("/registers", methods=["GET"])
def get_registers():
    """Read MIPI DSI PHY timing registers via memtool and return JSON.

    Runs every entry of REGISTER_COMMANDS with a 5 s timeout. The response
    holds the command list, the parsed registers, the concatenated raw stdout
    and a list of per-command errors (None when there were none). The HTTP
    status is always 200; failures are reported in "errors".
    """
    all_registers = []
    raw_lines = []
    errors = []

    for cmd_str in REGISTER_COMMANDS:
        try:
            result = subprocess.run(
                cmd_str.split(), capture_output=True, text=True, timeout=5
            )
        except FileNotFoundError:
            errors.append(f"{cmd_str}: memtool not found in PATH")
            continue
        except subprocess.TimeoutExpired:
            errors.append(f"{cmd_str}: timed out after 5 s")
            continue
        except Exception as e:
            errors.append(f"{cmd_str}: {e}")
            continue

        raw = result.stdout.strip()
        if raw:
            raw_lines.append(raw)
            all_registers.extend(_parse_memtool_output(raw))
        stderr = result.stderr.strip()
        if result.returncode != 0 and stderr:
            errors.append(f"{cmd_str}: {stderr}")

    return jsonify({
        "commands": REGISTER_COMMANDS,
        "registers": all_registers,
        "raw": "\n".join(raw_lines),
        "errors": errors if errors else None,
    }), 200


def _i2c_read_byte(bus: int, addr: int, reg: int) -> tuple[int | None, str]:
    """Read one byte via i2cget.

    Returns (value, "") on success or (None, error_str) on failure. Failures
    include a non-zero exit status, a missing i2cget binary, the 3 s timeout
    and output that does not parse as a hexadecimal integer.
    """
    try:
        result = subprocess.run(
            ["i2cget", "-y", "-f", str(bus), f"0x{addr:02x}", f"0x{reg:02x}"],
            capture_output=True, text=True, timeout=3
        )
    except FileNotFoundError:
        return None, "i2cget not found in PATH"
    except subprocess.TimeoutExpired:
        return None, "i2cget timed out after 3 s"
    except Exception as e:
        return None, str(e)

    if result.returncode != 0:
        return None, result.stderr.strip() or f"exit code {result.returncode}"

    text = result.stdout.strip()
    try:
        return int(text, 16), ""
    except ValueError:
        return None, f"unparsable i2cget output: {text!r}"


@app.route("/sn65_settling", methods=["GET"])
def get_sn65_settling():
    """Return the most recent post-restart settling poll (csr_0a + csr_e5).

    "n_error" counts the readings whose "any_error" flag is true.
    """
    # Copy under the lock so a poll finishing mid-request cannot mutate the
    # list while it is being counted and serialised.
    with _settling_lock:
        readings = list(_settling_log)

    n_error = sum(1 for r in readings if r.get("any_error"))
    return jsonify({
        "n_readings": len(readings),
        "n_error": n_error,
        "duration_s": SETTLING_DURATION_S,
        # round, not int: float error must not truncate e.g. 9.999… to 9
        "interval_ms": round(SETTLING_INTERVAL_S * 1000),
        "readings": readings,
    }), 200


@app.route("/sn65_registers", methods=["GET"])
def get_sn65_registers():
    """Read SN65DSI83 CSR 0x0A (PLL/CLK status) and 0xE5 (error flags) via I2C.

    A register that could not be read is left out of "registers" and its
    error text is added to "errors" (None when both reads succeeded). The
    HTTP status is always 200.
    """
    csr_0a, err_0a = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, 0x0A)
    csr_e5, err_e5 = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, 0xE5)

    regs = {}
    errors = []

    if csr_0a is not None:
        regs["csr_0a"] = {
            "value": f"0x{csr_0a:02x}",
            "pll_lock": bool(csr_0a & 0x80),  # bit 7: PLL_EN_STAT (powered + locked)
            "clk_det": bool(csr_0a & 0x08),   # bit 3: CHA_CLK_DET (HS clock detected)
        }
    else:
        errors.append(f"CSR 0x0A: {err_0a}")

    if csr_e5 is not None:
        regs["csr_e5"] = {
            "value": f"0x{csr_e5:02x}",
            "pll_unlock": bool(csr_e5 & 0x01),  # default=1 at reset; must clear after init
            "cha_sot_bit_err": bool(csr_e5 & 0x04),
            "cha_llp_err": bool(csr_e5 & 0x08),  # SoT/EoT/lane-merge error
            "cha_ecc_err": bool(csr_e5 & 0x10),
            "cha_lp_err": bool(csr_e5 & 0x20),
            "cha_crc_err": bool(csr_e5 & 0x40),
        }
    else:
        errors.append(f"CSR 0xE5: {err_e5}")

    return jsonify({
        "i2c_bus": SN65_I2C_BUS,
        "i2c_addr": f"0x{SN65_I2C_ADDR:02x}",
        "registers": regs,
        "errors": errors if errors else None,
    }), 200


@app.route("/video", methods=["PUT"])
def control_video():
    """Start or stop the kiosk video player.

    PUT /video {"action": "start"|"stop"}

    "start" accepts an optional "mode"; "static-pink" appends --static-pink
    to the kiosk command line. If the kiosk is already running, "start"
    returns "already_running" without relaunching. Any other (or missing, or
    non-string) action yields HTTP 400.
    """
    global _video_proc
    data = _json_payload()
    action = _lower_str(data.get("action", ""))

    with _video_lock:
        if action == "start":
            if _video_proc is not None and _video_proc.poll() is None:
                return jsonify({"status": "already_running", "pid": _video_proc.pid}), 200
            try:
                cmd = ["python3", KIOSK_SCRIPT]
                mode = data.get("mode", "")
                if mode == "static-pink":
                    cmd.append("--static-pink")
                _kiosk_args[:] = cmd  # persist so control_display knows the mode
                _video_proc = _launch_kiosk(cmd)
            except Exception as e:
                return jsonify({"error": f"failed to launch kiosk: {e}"}), 500
            return jsonify({"status": "started", "mode": mode or "video", "pid": _video_proc.pid}), 200

        if action == "stop":
            _stop_kiosk()
            _video_proc = None
            return jsonify({"status": "stopped"}), 200

        return jsonify({"error": "Invalid action. Use 'start' or 'stop'"}), 400


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)