Files
MiPi_TEST/device_server.py

407 lines
16 KiB
Python
Raw Normal View History

2026-04-09 10:29:53 +01:00
"""
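device_server.py — deploy this on the target device (192.168.45.8).

Provides:
    PUT /display         {"state": "on"|"off"}      blank/unblank framebuffer
    GET /registers       read MIPI DSI PHY registers via memtool
    GET /sn65_registers  read SN65DSI83 CSR 0x0A and 0xE5 via I2C
    GET /sn65_settling   most recent post-restart SN65DSI83 settling poll
    PUT /video           {"action": "start"|"stop"}  start or stop the kiosk video player

Add addresses to REGISTER_COMMANDS to capture more register ranges.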
"""
import os
import re
2026-04-21 16:23:43 +01:00
import socket
2026-04-09 10:29:53 +01:00
import subprocess
2026-04-21 12:26:10 +01:00
import threading
2026-04-24 15:37:12 +01:00
import time
2026-04-09 10:29:53 +01:00
from flask import Flask, jsonify, request
# Flask application object; the @app.route decorators in this file register
# their handlers on it, and the __main__ guard runs it.
app = Flask(__name__)
2026-04-21 12:26:10 +01:00
# ---------------------------------------------------------------------------
# Video playback state (managed as a subprocess)
# ---------------------------------------------------------------------------
# Script launched (via python3) as the kiosk process.
KIOSK_SCRIPT = "/root/display_test_nexio.py"
# Handle of the kiosk subprocess; None until one is started and after a stop.
_video_proc: subprocess.Popen | None = None
# Held by control_display() and control_video() while they inspect or replace
# _video_proc.
_video_lock = threading.Lock()
# Command line of the most recently requested kiosk launch; control_display()
# checks it for "--static-pink" to decide how to handle "on".
_kiosk_args: list[str] = ["python3", KIOSK_SCRIPT] # updated when kiosk is started
2026-04-21 12:26:10 +01:00
2026-04-09 10:29:53 +01:00
# ---------------------------------------------------------------------------
# Register commands to execute on each GET /registers request.
# Each entry is a complete memtool command string; get_registers() splits it
# on whitespace and runs it directly (no shell).
# ---------------------------------------------------------------------------
REGISTER_COMMANDS = [
    "memtool md -l 0x32e100b4+0x0c", # DSIM_PHYTIMING / PHYTIMING1 / PHYTIMING2
]
2026-04-20 13:48:24 +01:00
# ---------------------------------------------------------------------------
# SN65DSI83 I2C configuration
# Both values are passed to i2cget by _i2c_read_byte() for every bridge read.
# ---------------------------------------------------------------------------
SN65_I2C_BUS = 4 # i2c-4 on this board
SN65_I2C_ADDR = 0x2C # SN65DSI83 fixed 7-bit I2C address
2026-04-27 10:35:56 +01:00
# Settling-period poll — started in a background thread immediately after each
# kiosk kill+restart. Samples csr_0a and csr_e5 every SETTLING_INTERVAL_S for
# SETTLING_DURATION_S seconds. Results stored in _settling_log and returned by
# GET /sn65_settling so the host can correlate DSI errors with LP captures.
SETTLING_DURATION_S = 1.5 # seconds to poll after restart
SETTLING_INTERVAL_S = 0.010 # 10 ms between I2C reads
# Readings of the most recently completed poll; _run_settling_poll() replaces
# the whole content when a poll finishes.
_settling_log: list = []
# Guards _settling_log and _settling_extra, which are written by the poll
# thread and read by the /sn65_settling handler.
_settling_lock: threading.Lock = threading.Lock()
2026-04-27 13:58:09 +01:00
# SN65DSI83 configuration registers to snapshot at start and end of each settling window.
# Grouped by purpose so a reset-to-default is obvious at a glance.
# Register address → human-readable name.
# _sn65_snapshot() reads every address listed here and reports the name
# alongside the raw value.
# NOTE(review): the names below could not be verified from this file. The CSR
# map used by the Linux ti-sn65dsi83 driver / TI datasheet appears to differ
# (e.g. 0x18 = LVDS format, 0x20/0x21 = active line length, 0x3C = test
# pattern, 0xE0/0xE1 = IRQ enables) — confirm against the datasheet before
# trusting a label. The raw addresses and values are unaffected.
_SN65_SNAPSHOT_REGS: dict[int, str] = {
    # Core enable / PLL
    0x09: "CLK_SRC", # DSI clock source / PLL pre-divider
    0x0A: "PLL_STATUS", # PLL_EN_STAT (bit7) + CHA_CLK_DET (bit3) [status]
    0x0D: "PLL_EN", # bit0 = PLL enable; should be 0x01 when running
    # DSI receiver config
    0x10: "DSI_LANES", # CHA_DSI_DATA_EQ_SEL + lane count
    0x11: "DSI_CLK_RANGE", # DSI byte-clock frequency range
    0x12: "LVDS_CLK_RANGE", # LVDS output clock range
    # Active area
    0x18: "HACT_LOW", # CHA active line length, low byte
    0x19: "HACT_HIGH", # CHA active line length, high byte
    0x1A: "VACT_LOW", # CHA vertical display size, low byte
    0x1B: "VACT_HIGH", # CHA vertical display size, high byte
    # Sync timing
    0x20: "SYNC_DLY_LOW", # CHA sync delay, low byte
    0x21: "SYNC_DLY_HIGH", # CHA sync delay, high byte
    0x22: "HSYNC_W_LOW", # CHA HSYNC pulse width, low byte
    0x23: "HSYNC_W_HIGH", # CHA HSYNC pulse width, high byte
    0x24: "VSYNC_W_LOW", # CHA VSYNC pulse width, low byte
    0x25: "VSYNC_W_HIGH", # CHA VSYNC pulse width, high byte
    0x26: "HBP", # CHA horizontal back porch
    0x28: "VBP", # CHA vertical back porch
    0x2A: "HFP", # CHA horizontal front porch
    0x2C: "VFP", # CHA vertical front porch
    # Format / output
    0x2D: "TEST_PATTERN", # bit0 = enable colour bar test pattern
    0x3C: "LVDS_FORMAT", # LVDS output format (colour depth, channel swap)
    # Live LVDS line counter — changes every frame when bridge is actively outputting
    0xE0: "LINE_CNT_LOW", # CHA line count, low byte [live]
    0xE1: "LINE_CNT_HIGH", # CHA line count, high byte [live]
    # Error flags
    0xE5: "CHA_ERR", # DSI error flags [status]
}
def _sn65_snapshot() -> dict:
    """Read every register listed in _SN65_SNAPSHOT_REGS, one I2C read each.

    Returns:
        A dict keyed by the register address formatted as "0x%02x". Each value
        is {"name": <label from the table>, "value": <"0x%02x" string, or None
        when the read failed>}.
    """
    snapshot = {}
    for address, label in _SN65_SNAPSHOT_REGS.items():
        # Only the value is kept; the error text of a failed read is discarded.
        byte = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, address)[0]
        if byte is None:
            formatted = None
        else:
            formatted = f"0x{byte:02x}"
        snapshot[f"0x{address:02x}"] = {"name": label, "value": formatted}
    return snapshot
2026-04-27 10:35:56 +01:00
def _run_settling_poll() -> None:
    """Poll SN65DSI83 csr_0a + csr_e5 for SETTLING_DURATION_S seconds after a restart.

    Intended to run in a background thread. A full register snapshot is taken
    before the first reading and another after the last one, so callers can
    detect bridge re-initialisation or configuration loss. When the poll
    finishes, the readings replace the content of _settling_log and the two
    snapshots are stored in _settling_extra, all under _settling_lock.

    The window is measured with time.monotonic() so that a wall-clock step
    (NTP sync, manual date change) during the poll cannot truncate or stretch
    it. Note that the window starts before the first snapshot, so the time
    spent reading the snapshot counts against SETTLING_DURATION_S.
    """
    t_start = time.monotonic()
    t_end = t_start + SETTLING_DURATION_S
    snapshot_start = _sn65_snapshot()
    readings: list = []
    while time.monotonic() < t_end:
        # Timestamp of this sample relative to the start of the window.
        t_ms = round((time.monotonic() - t_start) * 1000, 1)
        val_0a, _ = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, 0x0A)
        val_e5, _ = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, 0xE5)
        # A failed read is recorded as None rather than dropped, so gaps in
        # I2C access stay visible in the log.
        readings.append({
            "t_ms": t_ms,
            "csr_0a": f"0x{val_0a:02x}" if val_0a is not None else None,
            "csr_e5": f"0x{val_e5:02x}" if val_e5 is not None else None,
            "pll_lock": bool(val_0a & 0x80) if val_0a is not None else None,
            # NOTE(review): bit 3 is reported as clock-detect here; confirm the
            # bit assignment against the SN65DSI83 datasheet.
            "clk_det": bool(val_0a & 0x08) if val_0a is not None else None,
            "any_error": bool(val_e5) if val_e5 is not None else None,
        })
        time.sleep(SETTLING_INTERVAL_S)
    snapshot_end = _sn65_snapshot()
    # Publish everything in one locked step so a reader never sees readings
    # from one poll combined with snapshots from another.
    with _settling_lock:
        _settling_log.clear()
        _settling_log.extend(readings)
        _settling_extra["snapshot_start"] = snapshot_start
        _settling_extra["snapshot_end"] = snapshot_end
# Stores the two register snapshots from the most recent settling poll.
# Keys "snapshot_start" / "snapshot_end" are written by _run_settling_poll()
# under _settling_lock; the dict stays empty until the first poll completes.
_settling_extra: dict = {}
2026-04-27 10:35:56 +01:00
2026-04-09 10:29:53 +01:00
# Known Samsung DSIM register names (base 0x32E10000, i.MX 8M Mini)
# NOTE(review): only the PHYTIMING* entries are exercised by the default
# REGISTER_COMMANDS; verify the remaining offsets against the i.MX 8M Mini
# reference manual before relying on them for additional ranges.
_DSIM_NAMES = {
    0x32e10004: "DSIM_STATUS",
    0x32e10008: "DSIM_CLKCTRL",
    0x32e1000c: "DSIM_TIMEOUT",
    0x32e10010: "DSIM_CONFIG",
    0x32e10014: "DSIM_ESCMODE",
    0x32e100ac: "DSIM_PHYACCHR",
    0x32e100b0: "DSIM_PHYACCHR1",
    0x32e100b4: "DSIM_PHYTIMING",
    0x32e100b8: "DSIM_PHYTIMING1",
    0x32e100bc: "DSIM_PHYTIMING2",
}


def _parse_memtool_output(raw: str) -> list:
    """Parse 'memtool md -l' output into a list of register dicts.

    Handles both formats:
        32e100b4: 00000001 12345678 ...
        0x32e100b4: 0x00000001 0x12345678 ...

    Values are read token by token after the colon, and parsing of a line
    stops at the first token that is not exactly one 32-bit hex word. This
    keeps a trailing ASCII dump column (or any other trailing text) from
    being misread as extra registers when it happens to contain eight
    consecutive hex characters.

    Args:
        raw: Text captured from memtool's stdout; may be empty or multi-line.

    Returns:
        One dict per 32-bit word, in output order, with keys "address"
        ("0x%08x"), "value" ("0x" + eight lower-case hex digits) and "name"
        (the _DSIM_NAMES label, or "" for unknown addresses). Lines that do
        not look like "<hex address>: ..." are skipped.
    """
    registers = []
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        m = re.match(r"(?:0x)?([0-9a-fA-F]+)\s*:\s*(.+)", line)
        if not m:
            continue
        base_addr = int(m.group(1), 16)
        for i, token in enumerate(m.group(2).split()):
            word = re.fullmatch(r"(?:0x)?([0-9a-fA-F]{8})", token)
            if word is None:
                # First non-word token: everything after it is not data.
                break
            addr = base_addr + i * 4  # -l dumps consecutive 32-bit words
            registers.append({
                "address": f"0x{addr:08x}",
                "value": f"0x{word.group(1).lower()}",
                "name": _DSIM_NAMES.get(addr, ""),
            })
    return registers
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route("/display", methods=["PUT"])
def control_display():
    """Handle PUT /display {"state": "on"|"off"}.

    "on" does one of three things, decided under _video_lock:
      * kiosk last started in static-pink mode: kill any running kiosk, start
        the SN65DSI83 settling poll thread, and relaunch the kiosk with the
        stored _kiosk_args;
      * another kiosk mode is running: send a UDP "switch" datagram to
        127.0.0.1:5001;
      * no kiosk running: unblank the framebuffer via sysfs.
    "off" is accepted but does nothing.

    Returns:
        A (JSON response, status) pair: 200 on success, 400 for a missing or
        unrecognised state (including a non-object body or non-string state),
        500 when the kiosk relaunch fails.
    """
    global _video_proc
    data = request.get_json(force=True) or {}
    # Reject non-object bodies and non-string states with the normal 400
    # instead of letting an AttributeError become a 500.
    state = data.get("state", "") if isinstance(data, dict) else ""
    state = state.lower() if isinstance(state, str) else ""
    if state == "on":
        with _video_lock:
            if "--static-pink" in _kiosk_args:
                # Full kill+restart so the DSI controller goes through LP-11 → HS
                # startup, giving the scope a clean LP trigger on Pass 1.
                if _video_proc is not None and _video_proc.poll() is None:
                    _video_proc.terminate()
                    try:
                        _video_proc.wait(timeout=3)
                    except subprocess.TimeoutExpired:
                        # Did not exit on SIGTERM within 3 s: force it.
                        _video_proc.kill()
                        _video_proc.wait()
                time.sleep(0.15)  # let DSI reach LP-11
                # Start settling poll immediately — captures csr_e5 error flags
                # during DSI startup so the host can determine root cause of flicker.
                threading.Thread(target=_run_settling_poll, daemon=True).start()
                try:
                    # The child gets its own copy of the descriptor, so the
                    # parent's handle is closed as soon as Popen returns
                    # instead of leaking one open file per restart.
                    with open("/tmp/kiosk.log", "w") as log:
                        _video_proc = subprocess.Popen(
                            _kiosk_args, stdout=log,
                            stderr=subprocess.STDOUT, env=os.environ.copy(),
                        )
                except Exception as e:
                    return jsonify({"error": f"restart failed: {e}"}), 500
                return jsonify({"status": "static-pink restarted"}), 200
            elif _video_proc is not None and _video_proc.poll() is None:
                # Video mode: UDP trigger switches clip and reloads pipeline
                # (the context manager closes the socket even if sendto raises).
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                    sock.sendto(b'switch', ('127.0.0.1', 5001))
                return jsonify({"status": "video switched"}), 200
        # fallback when no kiosk is running
        os.system("echo 0 > /sys/class/graphics/fb0/blank")
        return jsonify({"status": "Display ON"}), 200
    elif state == "off":
        # nothing to do while video is managing the display
        return jsonify({"status": "ok"}), 200
    else:
        return jsonify({"error": "Invalid state. Use 'on' or 'off'"}), 400
@app.route("/registers", methods=["GET"])
def get_registers():
    """Read MIPI DSI PHY timing registers via memtool and return JSON.

    Runs every entry of REGISTER_COMMANDS (split on whitespace, no shell,
    5 s timeout each) and parses whatever each one printed on stdout.

    Returns:
        A (JSON response, 200) pair with keys:
          commands   the command strings that were run
          registers  parsed register dicts from _parse_memtool_output()
          raw        the non-empty stdout texts joined with newlines
          errors     list of "<command>: <reason>" strings, or None if every
                     command succeeded
        Failures of individual commands are reported in "errors"; the status
        is 200 either way.
    """
    all_registers = []
    raw_lines = []
    errors = []
    for cmd_str in REGISTER_COMMANDS:
        try:
            result = subprocess.run(
                cmd_str.split(), capture_output=True, text=True, timeout=5
            )
            raw = result.stdout.strip()
            if raw:
                raw_lines.append(raw)
                all_registers.extend(_parse_memtool_output(raw))
            if result.returncode != 0:
                # Report the failure even when memtool printed nothing on
                # stderr, the same way _i2c_read_byte() does for i2cget.
                detail = result.stderr.strip() or f"exit code {result.returncode}"
                errors.append(f"{cmd_str}: {detail}")
        except FileNotFoundError:
            errors.append(f"{cmd_str}: memtool not found in PATH")
        except subprocess.TimeoutExpired:
            errors.append(f"{cmd_str}: timed out after 5 s")
        except Exception as e:
            errors.append(f"{cmd_str}: {e}")
    return jsonify({
        "commands": REGISTER_COMMANDS,
        "registers": all_registers,
        "raw": "\n".join(raw_lines),
        "errors": errors if errors else None,
    }), 200
2026-04-20 16:06:01 +01:00
def _i2c_read_byte(bus: int, addr: int, reg: int) -> tuple[int | None, str]:
"""Read one byte via i2cget. Returns (value, "") on success or (None, error_str) on failure."""
2026-04-20 13:48:24 +01:00
try:
result = subprocess.run(
2026-04-20 16:06:01 +01:00
["i2cget", "-y", "-f", str(bus), f"0x{addr:02x}", f"0x{reg:02x}"],
2026-04-20 13:48:24 +01:00
capture_output=True, text=True, timeout=3
)
if result.returncode == 0:
2026-04-20 16:06:01 +01:00
return int(result.stdout.strip(), 16), ""
stderr = result.stderr.strip() or f"exit code {result.returncode}"
return None, stderr
except FileNotFoundError:
return None, "i2cget not found in PATH"
except Exception as e:
return None, str(e)
2026-04-20 13:48:24 +01:00
2026-04-27 10:35:56 +01:00
@app.route("/sn65_settling", methods=["GET"])
def get_sn65_settling():
    """Report the most recently completed post-restart settling poll.

    Response keys:
        n_readings      number of samples in the poll
        n_error         number of samples whose "any_error" flag is truthy
        duration_s      SETTLING_DURATION_S
        interval_ms     SETTLING_INTERVAL_S converted to whole milliseconds
        snapshot_start  full register dump taken immediately before polling begins
        snapshot_end    full register dump taken immediately after polling ends
        changed_regs    registers whose value differs between the two dumps
        readings        csr_0a + csr_e5 sampled every ~10 ms during the window

    Everything is empty/zero until a poll has completed.
    """
    # Copy the shared state under the lock, then build the response outside it.
    with _settling_lock:
        samples = list(_settling_log)
        before = dict(_settling_extra.get("snapshot_start") or {})
        after = dict(_settling_extra.get("snapshot_end") or {})

    error_count = sum(1 for sample in samples if sample.get("any_error"))

    # Diff the two snapshots so the caller can immediately see what changed.
    # Only registers present in the start snapshot are compared.
    differences = {
        reg: {
            "name": entry.get("name"),
            "start": entry.get("value"),
            "end": after.get(reg, {}).get("value"),
        }
        for reg, entry in before.items()
        if entry.get("value") != after.get(reg, {}).get("value")
    }

    payload = {
        "n_readings": len(samples),
        "n_error": error_count,
        "duration_s": SETTLING_DURATION_S,
        "interval_ms": int(SETTLING_INTERVAL_S * 1000),
        "snapshot_start": before,
        "snapshot_end": after,
        "changed_regs": differences,
        "readings": samples,
    }
    return jsonify(payload), 200
2026-04-20 13:48:24 +01:00
@app.route("/sn65_registers", methods=["GET"])
def get_sn65_registers():
    """Read SN65DSI83 CSR 0x0A (PLL/CLK status) and 0xE5 (error flags) via I2C.

    Returns:
        A (JSON response, 200) pair with keys:
          i2c_bus    SN65_I2C_BUS
          i2c_addr   SN65_I2C_ADDR as "0x%02x"
          registers  "csr_0a" / "csr_e5" entries (raw value plus decoded
                     bits); an entry is absent when its read failed
          errors     list of "CSR 0x..: <reason>" strings, or None
        The status is 200 even when both reads fail.
    """
    csr_0a, err_0a = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, 0x0A)
    csr_e5, err_e5 = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, 0xE5)
    regs = {}
    errors = []
    if csr_0a is not None:
        regs["csr_0a"] = {
            "value": f"0x{csr_0a:02x}",
            "pll_lock": bool(csr_0a & 0x80),  # bit 7: PLL_EN_STAT (powered + locked)
            # bit 3: CHA_CLK_DET (HS clock detected)
            # NOTE(review): confirm this bit assignment against the datasheet.
            "clk_det": bool(csr_0a & 0x08),
        }
    else:
        errors.append(f"CSR 0x0A: {err_0a}")
    if csr_e5 is not None:
        # Existing key names are kept unchanged for host compatibility.
        # NOTE(review): the TI datasheet appears to name bit 5 CHA_UNC_ECC_ERR
        # and bit 4 CHA_COR_ECC_ERR — confirm what the host expects from
        # "cha_lp_err" / "cha_ecc_err".
        regs["csr_e5"] = {
            "value": f"0x{csr_e5:02x}",
            "pll_unlock": bool(csr_e5 & 0x01),  # default=1 at reset; must clear after init
            "cha_sot_bit_err": bool(csr_e5 & 0x04),
            "cha_llp_err": bool(csr_e5 & 0x08),  # SoT/EoT/lane-merge error
            "cha_ecc_err": bool(csr_e5 & 0x10),
            "cha_lp_err": bool(csr_e5 & 0x20),
            "cha_crc_err": bool(csr_e5 & 0x40),
            # Bit 7 was previously only visible in the raw value; decode it so
            # a sync error is not overlooked (CHA_SYNCH_ERR per TI datasheet).
            "cha_synch_err": bool(csr_e5 & 0x80),
        }
    else:
        errors.append(f"CSR 0xE5: {err_e5}")
    return jsonify({
        "i2c_bus": SN65_I2C_BUS,
        "i2c_addr": f"0x{SN65_I2C_ADDR:02x}",
        "registers": regs,
        "errors": errors if errors else None,
    }), 200
2026-04-21 12:26:10 +01:00
@app.route("/video", methods=["PUT"])
def control_video():
    """Start or stop the kiosk video player.

    PUT /video {"action": "start"|"stop"}
    An optional "mode": "static-pink" on start appends --static-pink to the
    kiosk command line. The command line is stored in _kiosk_args so that
    control_display() knows which mode is active.

    Returns:
        A (JSON response, status) pair: 200 for started / already_running /
        stopped, 400 for a missing or unrecognised action (including a
        non-object body or non-string action), 500 when the launch fails.
    """
    global _video_proc
    data = request.get_json(force=True) or {}
    # Treat a non-object body or non-string action as an invalid action
    # (400) instead of failing with an AttributeError (500).
    if not isinstance(data, dict):
        data = {}
    action = data.get("action", "")
    action = action.lower() if isinstance(action, str) else ""
    with _video_lock:
        if action == "start":
            if _video_proc is not None and _video_proc.poll() is None:
                return jsonify({"status": "already_running", "pid": _video_proc.pid}), 200
            try:
                cmd = ["python3", KIOSK_SCRIPT]
                mode = data.get("mode", "")
                if mode == "static-pink":
                    cmd.append("--static-pink")
                _kiosk_args[:] = cmd  # persist so control_display knows the mode
                # The child gets its own copy of the descriptor, so the
                # parent's handle is closed as soon as Popen returns instead
                # of leaking one open file per launch.
                with open("/tmp/kiosk.log", "w") as log:
                    _video_proc = subprocess.Popen(
                        cmd,
                        stdout=log,
                        stderr=subprocess.STDOUT,
                        env=os.environ.copy(),
                    )
            except Exception as e:
                return jsonify({"error": f"failed to launch kiosk: {e}"}), 500
            return jsonify({"status": "started", "mode": mode or "video", "pid": _video_proc.pid}), 200
        elif action == "stop":
            if _video_proc is not None and _video_proc.poll() is None:
                _video_proc.terminate()
                try:
                    _video_proc.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    # Did not exit on SIGTERM within 3 s: force it.
                    _video_proc.kill()
                    _video_proc.wait()
            _video_proc = None
            return jsonify({"status": "stopped"}), 200
        else:
            return jsonify({"error": "Invalid action. Use 'start' or 'stop'"}), 400
2026-04-09 10:29:53 +01:00
# Script entry point: serve on all interfaces, port 5000, using Flask's
# built-in server.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)