Files
MiPi_Investigation/server/hw_interface.py

216 lines
6.6 KiB
Python
Raw Normal View History

2026-05-06 15:57:48 +01:00
"""On-target hardware shims used by the Flask app.
Runs ON THE i.MX 8M Mini, not the host PC. Shells out to memtool, i2cget,
and /sys/kernel/debug/regmap. Cache bypass for SN65DSI83 is mandatory
before every IRQ_STAT read see CLAUDE.md invariant 1.
"""
from __future__ import annotations
import logging
import re
import shlex
import signal
import subprocess
import time
from pathlib import Path
from typing import Optional
log = logging.getLogger(__name__)
SN65_REGMAP_DIR = "/sys/kernel/debug/regmap/4-002c"
SN65_I2C_BUS = 4
SN65_I2C_ADDR = 0x2C
DSIM_PHYTIMING_BASE = 0x32E100B4
DSIM_PHYTIMING_LEN = 0x0C
FB_BLANK_PATH = "/sys/class/graphics/fb0/blank"
# Held while a video pipeline is running so PUT /video stop can kill it.
_video_proc: Optional[subprocess.Popen] = None
# ---------------------------------------------------------------------------
# Process helpers
# ---------------------------------------------------------------------------
def _run(cmd: str, check: bool = True, timeout: float = 5.0) -> str:
log.debug("run: %s", cmd)
res = subprocess.run(
shlex.split(cmd),
capture_output=True,
text=True,
timeout=timeout,
)
if check and res.returncode != 0:
raise RuntimeError(
f"Command failed: {cmd}\nstderr: {res.stderr.strip()}"
)
return res.stdout
def _write_sysfs(path: str, value: str) -> None:
Path(path).write_text(value)
# ---------------------------------------------------------------------------
# DSIM PHY_TIMING registers
# ---------------------------------------------------------------------------
def read_dsim_phy_timing() -> dict:
"""memtool md -l 0x32e100b4+0x0c → 3 little-endian 32-bit words.
memtool prints lines like '0x32e100b4: 00000306 03120a04 00040707'
we strip the address prefix (everything up to and including the colon)
before extracting hex words to avoid matching the address itself.
"""
out = _run(f"memtool md -l 0x{DSIM_PHYTIMING_BASE:x}+0x{DSIM_PHYTIMING_LEN:x}")
hex_words: list[str] = []
for line in out.splitlines():
if ":" in line:
line = line.split(":", 1)[1]
hex_words.extend(re.findall(r"\b([0-9a-fA-F]{8})\b", line))
if len(hex_words) < 3:
raise RuntimeError(f"memtool returned unexpected output:\n{out}")
pt, pt1, pt2 = hex_words[:3]
return {
"PHY_TIMING": f"0x{pt}",
"PHY_TIMING1": f"0x{pt1}",
"PHY_TIMING2": f"0x{pt2}",
"raw_hex": f"{pt} {pt1} {pt2}",
}
# ---------------------------------------------------------------------------
# SN65DSI83 register map — regmap-bypassed
# ---------------------------------------------------------------------------
def _bypass_sn65_regmap_cache() -> None:
bypass = Path(SN65_REGMAP_DIR) / "cache_bypass"
try:
bypass.write_text("1\n")
except (FileNotFoundError, PermissionError) as e:
log.warning("Could not bypass regmap cache (%s); falling back to i2cget", e)
def _read_sn65_regmap() -> dict[str, str]:
"""Read /sys/kernel/debug/regmap/2-002c/registers — returns {hex_addr: hex_val}."""
regs_path = Path(SN65_REGMAP_DIR) / "registers"
text = regs_path.read_text()
out: dict[str, str] = {}
for line in text.splitlines():
m = re.match(r"\s*([0-9a-fA-F]+)\s*:?\s*([0-9a-fA-F]+)", line.strip())
if m:
out[m.group(1).lower().lstrip("0").rjust(2, "0")] = m.group(2).lower()
return out
def _read_sn65_via_i2cget(reg: int) -> int:
out = _run(f"i2cget -y -f {SN65_I2C_BUS} 0x{SN65_I2C_ADDR:02x} 0x{reg:02x}")
return int(out.strip(), 16)
def read_sn65_registers() -> dict:
"""Cache-bypassed SN65DSI83 register read with explicit IRQ flags decoded.
Critical: the bypass write must happen on every call without it,
IRQ_STAT (0xE5) returns the last cached value, not the current hardware
state, and flicker events become invisible.
"""
_bypass_sn65_regmap_cache()
try:
regs = _read_sn65_regmap()
irq_raw = int(regs.get("e5", "0"), 16)
pll_raw = int(regs.get("0a", "0"), 16)
clk_raw = int(regs.get("0b", "0"), 16)
except FileNotFoundError:
regs = {}
irq_raw = _read_sn65_via_i2cget(0xE5)
pll_raw = _read_sn65_via_i2cget(0x0A)
clk_raw = _read_sn65_via_i2cget(0x0B)
regs = {
"e5": f"{irq_raw:02x}",
"0a": f"{pll_raw:02x}",
"0b": f"{clk_raw:02x}",
}
return {
"registers": regs,
"pll_locked": bool(pll_raw & 0x80),
"clk_detected": bool(clk_raw & 0x01),
"irq_stat_raw": f"0x{irq_raw:02X}",
"sot_err": bool(irq_raw & (1 << 4)),
"synch_err": bool(irq_raw & (1 << 3)),
"unc_ecc_err": bool(irq_raw & (1 << 6)),
}
def settling_capture(duration_s: float = 2.0, interval_s: float = 0.1) -> list[dict]:
"""Sample SN65 registers at fixed cadence — catches transient LP→HS errors."""
snapshots: list[dict] = []
deadline = time.monotonic() + duration_s
while time.monotonic() < deadline:
snap = read_sn65_registers()
snap["t_s"] = time.monotonic()
snapshots.append(snap)
time.sleep(interval_s)
return snapshots
# ---------------------------------------------------------------------------
# Display / video control
# ---------------------------------------------------------------------------
def display_on() -> None:
_write_sysfs(FB_BLANK_PATH, "0\n")
def display_off() -> None:
_write_sysfs(FB_BLANK_PATH, "4\n")
_VIDEO_PIPELINES = {
"static-pink": (
"gst-launch-1.0 videotestsrc pattern=solid-color foreground-color=0xFFFF69B4 "
"! video/x-raw,width=1280,height=800,framerate=60/1 "
"! fbdevsink device=/dev/fb0"
),
}
def video_start(mode: str = "static-pink") -> None:
global _video_proc
if _video_proc is not None and _video_proc.poll() is None:
video_stop()
pipeline = _VIDEO_PIPELINES.get(mode)
if pipeline is None:
raise ValueError(f"Unknown video mode: {mode}")
_video_proc = subprocess.Popen(
shlex.split(pipeline),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
def video_stop() -> None:
global _video_proc
if _video_proc is None:
return
if _video_proc.poll() is None:
try:
import os
os.killpg(os.getpgid(_video_proc.pid), signal.SIGTERM)
_video_proc.wait(timeout=2.0)
except (ProcessLookupError, subprocess.TimeoutExpired):
try:
_video_proc.kill()
except ProcessLookupError:
pass
_video_proc = None