From 712a42ecb7502acfdc9c1013fb567345f257e87a Mon Sep 17 00:00:00 2001 From: david rice Date: Mon, 20 Apr 2026 13:48:24 +0100 Subject: [PATCH] Changes --- device_server.py | 59 ++++++++++++++ mipi_test_interactive.py | 170 +++++++++++++++++---------------------- 2 files changed, 134 insertions(+), 95 deletions(-) diff --git a/device_server.py b/device_server.py index c5640ce..a9f0946 100644 --- a/device_server.py +++ b/device_server.py @@ -24,6 +24,12 @@ REGISTER_COMMANDS = [ "memtool md -l 0x32e100b4+0x0c", # DSIM_PHYTIMING / PHYTIMING1 / PHYTIMING2 ] +# --------------------------------------------------------------------------- +# SN65DSI83 I2C configuration +# --------------------------------------------------------------------------- +SN65_I2C_BUS = 2 # i2c-2 on i.MX 8M Mini — change if bridge is on a different bus +SN65_I2C_ADDR = 0x2C # SN65DSI83 fixed 7-bit I2C address + # Known Samsung DSIM register names (base 0x32E10000, i.MX 8M Mini) _DSIM_NAMES = { 0x32e10004: "DSIM_STATUS", @@ -118,5 +124,58 @@ def get_registers(): }), 200 +def _i2c_read_byte(bus: int, addr: int, reg: int) -> int | None: + """Read one byte from an I2C device using i2cget. Returns None on failure.""" + try: + result = subprocess.run( + ["i2cget", "-y", str(bus), f"0x{addr:02x}", f"0x{reg:02x}"], + capture_output=True, text=True, timeout=3 + ) + if result.returncode == 0: + return int(result.stdout.strip(), 16) + except Exception: + pass + return None + + +@app.route("/sn65_registers", methods=["GET"]) +def get_sn65_registers(): + """Read SN65DSI83 CSR 0x0A (PLL/CLK status) and 0xE5 (error flags) via I2C.""" + csr_0a = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, 0x0A) + csr_e5 = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, 0xE5) + + regs = {} + errors = [] + + if csr_0a is not None: + regs["csr_0a"] = { + "value": f"0x{csr_0a:02x}", + "pll_lock": bool(csr_0a & 0x80), # bit 7: PLL_EN_STAT (powered + locked) + "clk_det": bool(csr_0a & 0x40), # bit 6: high-speed CLK lane detected + } + else: + errors.append("CSR 0x0A: i2cget failed") + + if csr_e5 is not None: + regs["csr_e5"] = { + "value": f"0x{csr_e5:02x}", + "pll_unlock": bool(csr_e5 & 0x01), # default=1 at reset; must clear after init + "cha_sot_bit_err": bool(csr_e5 & 0x04), + "cha_llp_err": bool(csr_e5 & 0x08), # SoT/EoT/lane-merge error + "cha_ecc_err": bool(csr_e5 & 0x10), + "cha_lp_err": bool(csr_e5 & 0x20), + "cha_crc_err": bool(csr_e5 & 0x40), + } + else: + errors.append("CSR 0xE5: i2cget failed") + + return jsonify({ + "i2c_bus": SN65_I2C_BUS, + "i2c_addr": f"0x{SN65_I2C_ADDR:02x}", + "registers": regs, + "errors": errors if errors else None, + }), 200 + + if __name__ == "__main__": app.run(host="0.0.0.0", port=5000) diff --git a/mipi_test_interactive.py b/mipi_test_interactive.py index 7902535..c5eba7c 100644 --- a/mipi_test_interactive.py +++ b/mipi_test_interactive.py @@ -38,7 +38,7 @@ import ai_mgmt import rigol_scope from csv_preprocessor import (analyze_lp_file, LPMetrics, HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS, - analyze_int_file, CLK_LP_LOW_MIN_NS) + analyze_int_file) load_dotenv(Path(__file__).parent / ".env") @@ -577,104 +577,81 @@ def _restore_hs_config(): scope.write(":TRIGger:EDGE:SOURce CHANnel1") scope.write(":TRIGger:EDGE:SLOPe POSitive") scope.write(":TRIGger:EDGE:LEVel 0.05") + scope.write(":TRIGger:SWEep AUTO") # CLK startup sets NORMAL — restore AUTO so :DIGitize captures HS data time.sleep(0.1) def _fetch_registers(ts: str, iteration: int) -> None: - """GET /registers from device server and save to data/ as JSON.""" + """GET /registers (DSIM PHY) and /sn65_registers (SN65DSI83 CSRs) then save combined JSON.""" + combined: dict = {} + + # DSIM PHY timing registers (memtool / memory-mapped) try: resp = requests.get(f"{DEVICE_BASE}/registers", timeout=5) resp.raise_for_status() - data = resp.json() - if data.get("errors"): - print(f" REGISTERS: device warnings — {data['errors']}") + dsim = resp.json() + combined["dsim"] = dsim + if dsim.get("errors"): + print(f" REGISTERS: DSIM warnings — {dsim['errors']}") + except requests.exceptions.RequestException as e: + print(f" REGISTERS: DSIM fetch failed — {e}") + combined["dsim"] = None + except Exception as e: + print(f" REGISTERS: DSIM error — {e}") + combined["dsim"] = None + + # SN65DSI83 CSR snapshot (I2C) + try: + resp = requests.get(f"{DEVICE_BASE}/sn65_registers", timeout=5) + resp.raise_for_status() + sn65 = resp.json() + combined["sn65"] = sn65 + + regs = sn65.get("registers", {}) + + csr_0a = regs.get("csr_0a", {}) + if csr_0a: + pll_str = "LOCKED " if csr_0a["pll_lock"] else "*** UNLOCKED ***" + clk_str = "detected " if csr_0a["clk_det"] else "NOT detected" + print(f" SN65DSI83: PLL {pll_str} CLK {clk_str} (CSR 0x0A = {csr_0a['value']})") + + csr_e5 = regs.get("csr_e5", {}) + if csr_e5: + _error_bits = [ + ("pll_unlock", "PLL_UNLOCK"), + ("cha_sot_bit_err", "SOT_BIT_ERR"), + ("cha_llp_err", "LLP_ERR"), + ("cha_ecc_err", "ECC_ERR"), + ("cha_lp_err", "LP_ERR"), + ("cha_crc_err", "CRC_ERR"), + ] + active = [label for key, label in _error_bits if csr_e5.get(key)] + if active: + print(f" SN65DSI83: *** ERROR FLAGS SET: {', '.join(active)} (CSR 0xE5 = {csr_e5['value']}) ***") + else: + print(f" SN65DSI83: no error flags (CSR 0xE5 = {csr_e5['value']})") + + if sn65.get("errors"): + print(f" SN65DSI83: I2C warnings — {sn65['errors']}") + + except requests.exceptions.RequestException as e: + print(f" REGISTERS: SN65DSI83 fetch failed — {e}") + combined["sn65"] = None + except Exception as e: + print(f" REGISTERS: SN65DSI83 error — {e}") + combined["sn65"] = None + + # Save combined JSON + try: DATA_DIR.mkdir(exist_ok=True) reg_path = DATA_DIR / f"{ts}_reg_{iteration:04d}.json" - reg_path.write_text(json.dumps(data, indent=2)) - print(f" SAVED: {reg_path.name} ({len(data.get('registers', []))} registers)") - except requests.exceptions.RequestException as e: - print(f" REGISTERS: fetch failed — {e}") + reg_path.write_text(json.dumps(combined, indent=2)) + dsim_count = len((combined.get("dsim") or {}).get("registers", [])) + print(f" SAVED: {reg_path.name} ({dsim_count} DSIM registers)") except Exception as e: - print(f" REGISTERS: error — {e}") + print(f" REGISTERS: save error — {e}") -def _arm_scope_for_clk_startup() -> None: - """ - Configure scope for CLK lane LP startup and arm with :SINGle (non-blocking). - Trigger: CLK+ (Ch1) falling edge — fires as CLK leaves LP-11, before DAT0+. - Call this BEFORE display ON so the trigger is armed when the LP-11→HS sequence starts. - """ - for ch in (1, 2, 3, 4): - scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}") - scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}") - time.sleep(0.05) - scope.write(":TRIGger:EDGE:SOURce CHANnel1") - scope.write(":TRIGger:EDGE:SLOPe NEGative") - scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}") - scope.write(":TRIGger:SWEep NORMal") - scope.write(f":TIMebase:SCALe {LP_SCALE:.3E}") - scope.write(f":ACQuire:POINts {LP_POINTS}") - time.sleep(0.3) - scope.write(":SINGle") - time.sleep(0.1) - print(" CLK STARTUP: scope armed on CLK+ falling edge.") - - -def _collect_clk_startup(ts: str, iteration: int, timeout: float = 3.0) -> list[str]: - """ - Poll for CLK startup trigger, save, transfer, and analyse the capture. - Returns LP summary strings (empty list if trigger timed out). - The CLK LP-00 duration is checked against the 300 ns SN65DSI83 lock minimum. - """ - print(" CLK STARTUP: waiting for trigger...") - deadline = time.time() + timeout - triggered = False - while time.time() < deadline: - try: - status = scope.ask(":TRIGger:STATus?").strip().upper() - if status in ("STOP", "TD"): - triggered = True - break - except Exception: - pass - time.sleep(0.1) - - if not triggered: - print(" CLK STARTUP: trigger timeout — CLK may already be in continuous HS.") - try: - scope.write(":STOP") # abort the pending :SINGle before reconfiguring - time.sleep(0.2) - except Exception: - pass - _restore_hs_config() - return [] - - _save_pass_channels("lp", iteration, ts) - _restore_hs_config() - - try: - copied, _ = ai_mgmt.transfer_csv_files() - print(f" CLK STARTUP: {copied} file(s) transferred.") - except Exception as e: - print(f" CLK STARTUP TRANSFER ERROR: {e}") - - summaries = [] - for channel in ("clk", "dat"): - path = DATA_DIR / f"{ts}_lp_{iteration:04d}_{channel}.csv" - if not path.exists(): - continue - try: - m = analyze_lp_file(path) - summaries.append(m.summary()) - if m.clk_lp_startup_ok is False: - print(f"\n *** CLK STARTUP WARNING: CLK LP-00 too short " - f"({m.lp_low_duration_ns:.0f} ns < {CLK_LP_LOW_MIN_NS:.0f} ns) — " - f"SN65DSI83 may fail to lock CLK lane ***\n") - except Exception as e: - print(f" CLK STARTUP ANALYSIS ERROR ({channel}): {e}") - - return summaries - def _analyze_int_file(ts: str, iteration: int) -> None: """Print IRQ pin summary and alert if the SN65DSI83 asserted the IRQ line.""" @@ -748,7 +725,17 @@ def dual_capture(iteration: int) -> str: # ── Pass 3: frame/protocol structure ────────────────────────────────── print(" PASS 3: FRAME STRUCTURE...") - _set_timebase(PROTO_SCALE, PROTO_POINTS) + try: + _set_timebase(PROTO_SCALE, PROTO_POINTS) + except Exception: + print(" SKIPPING PROTO SAVE.") + _fetch_registers(ts, iteration) + try: + _set_timebase(5e-9, 500_000) + scope.write(":RUN") + except Exception: + pass + return ts if _arm_and_wait(): _save_pass("proto", iteration, ts) else: @@ -1336,19 +1323,12 @@ def run_interactive_test() -> None: try: while True: - # ── Arm scope for CLK startup BEFORE display ON ──────────────── - ts_startup = datetime.now().strftime("%Y%m%d_%H%M%S") - _arm_scope_for_clk_startup() - # ── Display ON ───────────────────────────────────────────────── try: requests.put(URL, json={"state": "on"}, timeout=2) except requests.exceptions.RequestException as e: print(f" WARNING: display ON failed: {e}") - # ── Collect CLK startup (polls, saves, transfers, analyses) ──── - _collect_clk_startup(ts_startup, iteration) - # ── Three-pass capture ───────────────────────────────────────── ts = dual_capture(iteration)