"""On-target hardware shims used by the Flask app. Runs ON THE i.MX 8M Mini, not the host PC. Shells out to memtool, i2cget, and /sys/kernel/debug/regmap. Cache bypass for SN65DSI83 is mandatory before every IRQ_STAT read — see CLAUDE.md invariant 1. """ from __future__ import annotations import logging import re import shlex import signal import subprocess import time from pathlib import Path from typing import Optional log = logging.getLogger(__name__) SN65_REGMAP_DIR = "/sys/kernel/debug/regmap/4-002c" SN65_I2C_BUS = 4 SN65_I2C_ADDR = 0x2C DSIM_PHYTIMING_BASE = 0x32E100B4 DSIM_PHYTIMING_LEN = 0x0C FB_BLANK_PATH = "/sys/class/graphics/fb0/blank" # Held while a video pipeline is running so PUT /video stop can kill it. _video_proc: Optional[subprocess.Popen] = None # --------------------------------------------------------------------------- # Process helpers # --------------------------------------------------------------------------- def _run(cmd: str, check: bool = True, timeout: float = 5.0) -> str: log.debug("run: %s", cmd) res = subprocess.run( shlex.split(cmd), capture_output=True, text=True, timeout=timeout, ) if check and res.returncode != 0: raise RuntimeError( f"Command failed: {cmd}\nstderr: {res.stderr.strip()}" ) return res.stdout def _write_sysfs(path: str, value: str) -> None: Path(path).write_text(value) # --------------------------------------------------------------------------- # DSIM PHY_TIMING registers # --------------------------------------------------------------------------- def read_dsim_phy_timing() -> dict: """memtool md -l 0x32e100b4+0x0c → 3 little-endian 32-bit words. memtool prints lines like '0x32e100b4: 00000306 03120a04 00040707' — we strip the address prefix (everything up to and including the colon) before extracting hex words to avoid matching the address itself. """ out = _run(f"memtool md -l 0x{DSIM_PHYTIMING_BASE:x}+0x{DSIM_PHYTIMING_LEN:x}") hex_words: list[str] = [] for line in out.splitlines(): if ":" in line: line = line.split(":", 1)[1] hex_words.extend(re.findall(r"\b([0-9a-fA-F]{8})\b", line)) if len(hex_words) < 3: raise RuntimeError(f"memtool returned unexpected output:\n{out}") pt, pt1, pt2 = hex_words[:3] return { "PHY_TIMING": f"0x{pt}", "PHY_TIMING1": f"0x{pt1}", "PHY_TIMING2": f"0x{pt2}", "raw_hex": f"{pt} {pt1} {pt2}", } # --------------------------------------------------------------------------- # SN65DSI83 register map — regmap-bypassed # --------------------------------------------------------------------------- def _bypass_sn65_regmap_cache() -> None: bypass = Path(SN65_REGMAP_DIR) / "cache_bypass" try: bypass.write_text("1\n") except (FileNotFoundError, PermissionError) as e: log.warning("Could not bypass regmap cache (%s); falling back to i2cget", e) def _read_sn65_regmap() -> dict[str, str]: """Read /sys/kernel/debug/regmap/2-002c/registers — returns {hex_addr: hex_val}.""" regs_path = Path(SN65_REGMAP_DIR) / "registers" text = regs_path.read_text() out: dict[str, str] = {} for line in text.splitlines(): m = re.match(r"\s*([0-9a-fA-F]+)\s*:?\s*([0-9a-fA-F]+)", line.strip()) if m: out[m.group(1).lower().lstrip("0").rjust(2, "0")] = m.group(2).lower() return out def _read_sn65_via_i2cget(reg: int) -> int: out = _run(f"i2cget -y -f {SN65_I2C_BUS} 0x{SN65_I2C_ADDR:02x} 0x{reg:02x}") return int(out.strip(), 16) def read_sn65_registers() -> dict: """Cache-bypassed SN65DSI83 register read with explicit IRQ flags decoded. Critical: the bypass write must happen on every call — without it, IRQ_STAT (0xE5) returns the last cached value, not the current hardware state, and flicker events become invisible. """ _bypass_sn65_regmap_cache() try: regs = _read_sn65_regmap() irq_raw = int(regs.get("e5", "0"), 16) pll_raw = int(regs.get("0a", "0"), 16) clk_raw = int(regs.get("0b", "0"), 16) except FileNotFoundError: regs = {} irq_raw = _read_sn65_via_i2cget(0xE5) pll_raw = _read_sn65_via_i2cget(0x0A) clk_raw = _read_sn65_via_i2cget(0x0B) regs = { "e5": f"{irq_raw:02x}", "0a": f"{pll_raw:02x}", "0b": f"{clk_raw:02x}", } return { "registers": regs, "pll_locked": bool(pll_raw & 0x80), "clk_detected": bool(clk_raw & 0x01), "irq_stat_raw": f"0x{irq_raw:02X}", "sot_err": bool(irq_raw & (1 << 4)), "synch_err": bool(irq_raw & (1 << 3)), "unc_ecc_err": bool(irq_raw & (1 << 6)), } def settling_capture(duration_s: float = 2.0, interval_s: float = 0.1) -> list[dict]: """Sample SN65 registers at fixed cadence — catches transient LP→HS errors.""" snapshots: list[dict] = [] deadline = time.monotonic() + duration_s while time.monotonic() < deadline: snap = read_sn65_registers() snap["t_s"] = time.monotonic() snapshots.append(snap) time.sleep(interval_s) return snapshots # --------------------------------------------------------------------------- # Display / video control # --------------------------------------------------------------------------- def display_on() -> None: _write_sysfs(FB_BLANK_PATH, "0\n") def display_off() -> None: _write_sysfs(FB_BLANK_PATH, "4\n") _VIDEO_PIPELINES = { "static-pink": ( "gst-launch-1.0 videotestsrc pattern=solid-color foreground-color=0xFFFF69B4 " "! video/x-raw,width=1280,height=800,framerate=60/1 " "! fbdevsink device=/dev/fb0" ), } def video_start(mode: str = "static-pink") -> None: global _video_proc if _video_proc is not None and _video_proc.poll() is None: video_stop() pipeline = _VIDEO_PIPELINES.get(mode) if pipeline is None: raise ValueError(f"Unknown video mode: {mode}") _video_proc = subprocess.Popen( shlex.split(pipeline), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, ) def video_stop() -> None: global _video_proc if _video_proc is None: return if _video_proc.poll() is None: try: import os os.killpg(os.getpgid(_video_proc.pid), signal.SIGTERM) _video_proc.wait(timeout=2.0) except (ProcessLookupError, subprocess.TimeoutExpired): try: _video_proc.kill() except ProcessLookupError: pass _video_proc = None