From 2892ea45fffc4f1ff657ad0bf58633b2d1e12707 Mon Sep 17 00:00:00 2001 From: david rice Date: Mon, 27 Apr 2026 13:58:09 +0100 Subject: [PATCH] Updates --- device_server.py | 90 ++++++++++++++++++++++++++++++++++++++-- mipi_test_interactive.py | 31 ++++++++++++-- 2 files changed, 115 insertions(+), 6 deletions(-) diff --git a/device_server.py b/device_server.py index f4e6b53..b8d88b7 100644 --- a/device_server.py +++ b/device_server.py @@ -53,10 +53,63 @@ _settling_log: list = [] _settling_lock: threading.Lock = threading.Lock() +# SN65DSI83 configuration registers to snapshot at start and end of each settling window. +# Grouped by purpose so a reset-to-default is obvious at a glance. +# Register address → human-readable name. +_SN65_SNAPSHOT_REGS: dict[int, str] = { + # Core enable / PLL + 0x09: "CLK_SRC", # DSI clock source / PLL pre-divider + 0x0A: "PLL_STATUS", # PLL_EN_STAT (bit7) + CHA_CLK_DET (bit3) [status] + 0x0D: "PLL_EN", # bit0 = PLL enable; should be 0x01 when running + # DSI receiver config + 0x10: "DSI_LANES", # CHA_DSI_DATA_EQ_SEL + lane count + 0x11: "DSI_CLK_RANGE", # DSI byte-clock frequency range + 0x12: "LVDS_CLK_RANGE", # LVDS output clock range + # Active area + 0x18: "HACT_LOW", # CHA active line length, low byte + 0x19: "HACT_HIGH", # CHA active line length, high byte + 0x1A: "VACT_LOW", # CHA vertical display size, low byte + 0x1B: "VACT_HIGH", # CHA vertical display size, high byte + # Sync timing + 0x20: "SYNC_DLY_LOW", # CHA sync delay, low byte + 0x21: "SYNC_DLY_HIGH", # CHA sync delay, high byte + 0x22: "HSYNC_W_LOW", # CHA HSYNC pulse width, low byte + 0x23: "HSYNC_W_HIGH", # CHA HSYNC pulse width, high byte + 0x24: "VSYNC_W_LOW", # CHA VSYNC pulse width, low byte + 0x25: "VSYNC_W_HIGH", # CHA VSYNC pulse width, high byte + 0x26: "HBP", # CHA horizontal back porch + 0x28: "VBP", # CHA vertical back porch + 0x2A: "HFP", # CHA horizontal front porch + 0x2C: "VFP", # CHA vertical front porch + # Format / output + 0x2D: "TEST_PATTERN", # bit0 = enable colour bar test pattern + 0x3C: "LVDS_FORMAT", # LVDS output format (colour depth, channel swap) + # Live LVDS line counter — changes every frame when bridge is actively outputting + 0xE0: "LINE_CNT_LOW", # CHA line count, low byte [live] + 0xE1: "LINE_CNT_HIGH", # CHA line count, high byte [live] + # Error flags + 0xE5: "CHA_ERR", # DSI error flags [status] +} + + +def _sn65_snapshot() -> dict: + """Read all _SN65_SNAPSHOT_REGS in one pass. Returns {reg_hex: value_hex|None}.""" + result = {} + for reg, name in _SN65_SNAPSHOT_REGS.items(): + val, _ = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, reg) + result[f"0x{reg:02x}"] = {"name": name, "value": f"0x{val:02x}" if val is not None else None} + return result + + def _run_settling_poll() -> None: - """Poll SN65DSI83 csr_0a + csr_e5 at 10 ms intervals for 1.5 s after restart.""" + """Poll SN65DSI83 csr_0a + csr_e5 at ~10 ms intervals for 1.5 s after restart. + Also takes a full configuration register snapshot at t=0 and t=end so callers + can detect bridge re-initialisation or configuration loss.""" t_start = time.time() t_end = t_start + SETTLING_DURATION_S + + snapshot_start = _sn65_snapshot() + readings: list = [] while time.time() < t_end: t_ms = round((time.time() - t_start) * 1000, 1) @@ -71,9 +124,18 @@ def _run_settling_poll() -> None: "any_error": bool(val_e5) if val_e5 is not None else None, }) time.sleep(SETTLING_INTERVAL_S) + + snapshot_end = _sn65_snapshot() + with _settling_lock: _settling_log.clear() _settling_log.extend(readings) + _settling_extra["snapshot_start"] = snapshot_start + _settling_extra["snapshot_end"] = snapshot_end + + +# Stores the two register snapshots from the most recent settling poll. +_settling_extra: dict = {} # Known Samsung DSIM register names (base 0x32E10000, i.MX 8M Mini) _DSIM_NAMES = { @@ -220,15 +282,37 @@ def _i2c_read_byte(bus: int, addr: int, reg: int) -> tuple[int | None, str]: @app.route("/sn65_settling", methods=["GET"]) def get_sn65_settling(): - """Return the most recent post-restart settling poll (csr_0a + csr_e5 over 1.5 s).""" + """Return the most recent post-restart settling poll. + + Includes: + snapshot_start — full register dump taken immediately before polling begins + snapshot_end — full register dump taken immediately after polling ends + readings — csr_0a + csr_e5 sampled every ~10 ms during the window + """ with _settling_lock: - readings = list(_settling_log) + readings = list(_settling_log) + snap_start = dict(_settling_extra.get("snapshot_start") or {}) + snap_end = dict(_settling_extra.get("snapshot_end") or {}) + error_readings = [r for r in readings if r.get("any_error")] + + # Diff the two snapshots so the caller can immediately see what changed. + changed = {} + for reg, info_s in snap_start.items(): + info_e = snap_end.get(reg, {}) + v_s = info_s.get("value") + v_e = info_e.get("value") + if v_s != v_e: + changed[reg] = {"name": info_s.get("name"), "start": v_s, "end": v_e} + return jsonify({ "n_readings": len(readings), "n_error": len(error_readings), "duration_s": SETTLING_DURATION_S, "interval_ms": int(SETTLING_INTERVAL_S * 1000), + "snapshot_start": snap_start, + "snapshot_end": snap_end, + "changed_regs": changed, "readings": readings, }), 200 diff --git a/mipi_test_interactive.py b/mipi_test_interactive.py index e1479f7..6a96526 100644 --- a/mipi_test_interactive.py +++ b/mipi_test_interactive.py @@ -646,7 +646,7 @@ def _fetch_registers(ts: str, iteration: int) -> None: print(f" REGISTERS: SN65DSI83 error — {e}") combined["sn65"] = None - # SN65DSI83 post-restart settling poll + # SN65DSI83 post-restart settling poll + register snapshots try: resp = requests.get(f"{DEVICE_BASE}/sn65_settling", timeout=10) resp.raise_for_status() @@ -656,13 +656,14 @@ def _fetch_registers(ts: str, iteration: int) -> None: n = settling.get("n_readings", 0) n_err = settling.get("n_error", 0) dur = settling.get("duration_s", 0) + + # ── csr_e5 error summary ────────────────────────────────────────── if n_err: - # Print the first and last error readings for quick diagnosis err_readings = [r for r in settling.get("readings", []) if r.get("any_error")] times = [r["t_ms"] for r in err_readings] print(f" SN65 SETTLING: *** {n_err}/{n} readings had csr_e5 errors " f"over {dur:.1f} s (t={times[0]:.0f}–{times[-1]:.0f} ms) ***") - for r in err_readings[:3]: # show up to first 3 error readings + for r in err_readings[:3]: print(f" t={r['t_ms']:6.1f} ms csr_0a={r['csr_0a']} " f"csr_e5={r['csr_e5']} " f"pll={'Y' if r['pll_lock'] else 'N'} " @@ -672,6 +673,30 @@ def _fetch_registers(ts: str, iteration: int) -> None: if r.get("clk_det") is False) print(f" SN65 SETTLING: no csr_e5 errors in {n} readings over {dur:.1f} s" + (f" ({clk_false} readings with clk_det=False)" if clk_false else "")) + + # ── Register snapshot: print start values and flag any changes ─── + snap_start = settling.get("snapshot_start") or {} + snap_end = settling.get("snapshot_end") or {} + changed = settling.get("changed_regs") or {} + + if snap_start: + print(f" SN65 REGS (t=0):", end="") + # Print a compact one-liner of key config registers + _key = ["0x0d", "0x10", "0x11", "0x18", "0x19", "0x1a", "0x1b", + "0x3c", "0xe0", "0xe1"] + parts = [] + for r in _key: + info = snap_start.get(r, {}) + parts.append(f"{info.get('name','?')}={info.get('value','?')}") + print(" " + " ".join(parts)) + + if changed: + print(f" SN65 REGS CHANGED during settling window ({len(changed)} registers):") + for reg, diff in changed.items(): + print(f" {reg} {diff['name']:16s} {diff['start']} → {diff['end']}") + elif snap_start: + print(f" SN65 REGS: stable (no register changes between t=0 and t={dur:.1f}s)") + except requests.exceptions.RequestException as e: print(f" REGISTERS: settling poll fetch failed — {e}") combined["sn65_settling"] = None