"""Per-run artefact writers and the master flicker_log.csv appender.""" from __future__ import annotations import csv import json import os from datetime import datetime from pathlib import Path from typing import Optional import pandas as pd from config import CAPTURE_ROOT FLICKER_LOG_NAME = "flicker_log.csv" FLICKER_LOG_COLUMNS = [ "run_id", "timestamp", "flicker_detected", "sot_err", "synch_err", "pll_locked", "t_lpx_ns", "t_hs_prepare_ns", "t_hs_prepare_pass", "t_clk_prepare_ns", "t_clk_zero_ns", "t_clk_prep_plus_zero_ns", "t_clk_prep_zero_pass", "phy_timing_raw", "phy_timing1_raw", "phy_timing2_raw", "notes", ] def make_run_dir(root: str = CAPTURE_ROOT, run_idx: Optional[int] = None) -> Path: base = Path(root) base.mkdir(parents=True, exist_ok=True) if run_idx is None: run_idx = _next_run_index(base) stamp = datetime.now().strftime("%Y%m%d_%H%M%S") run_id = f"run_{run_idx:03d}_{stamp}" path = base / run_id path.mkdir(parents=True, exist_ok=False) return path def _next_run_index(base: Path) -> int: existing = [p.name for p in base.iterdir() if p.is_dir() and p.name.startswith("run_")] if not existing: return 1 nums: list[int] = [] for n in existing: try: nums.append(int(n.split("_")[1])) except (IndexError, ValueError): continue return (max(nums) + 1) if nums else 1 def save_waveforms(run_dir: Path, waveforms: dict[str, pd.DataFrame]) -> None: """Save each channel as waveform_chN.csv per spec ยง8.3.""" label_to_ch = {"CLK_P": 1, "CLK_N": 2, "DAT0_P": 3, "DAT0_N": 4} for label, df in waveforms.items(): ch = label_to_ch.get(label) if ch is None: continue df.to_csv(run_dir / f"waveform_ch{ch}.csv", index=False) def save_registers(run_dir: Path, dsim: dict, sn65: dict, settling: dict | list) -> None: payload = {"dsim": dsim, "sn65": sn65, "settling": settling} (run_dir / "registers.json").write_text(json.dumps(payload, indent=2)) def save_timing_analysis(run_dir: Path, measurements: dict, spec_pass: dict, packet_fault: dict, lane_stall: dict) -> None: payload = { "measurements_ns": measurements, "spec_compliance": spec_pass, "packet_fault_a": packet_fault, "lane_stall_b": lane_stall, } (run_dir / "timing_analysis.json").write_text(json.dumps(payload, indent=2)) def save_summary(run_dir: Path, summary_text: str) -> None: (run_dir / "summary.txt").write_text(summary_text) def append_flicker_log(root: str, row: dict) -> None: log_path = Path(root) / FLICKER_LOG_NAME is_new = not log_path.exists() with log_path.open("a", newline="") as f: writer = csv.DictWriter(f, fieldnames=FLICKER_LOG_COLUMNS, extrasaction="ignore") if is_new: writer.writeheader() writer.writerow(row) def build_summary(run_id: str, sn65_parsed: dict, measurements: dict, spec_pass: dict, packet_fault: dict, lane_stall: dict, dsim_parsed: dict, note: str = "") -> str: lines = [ f"Run: {run_id}", f"Timestamp: {datetime.now().isoformat(timespec='seconds')}", "", "[ SN65DSI83 ]", f" PLL locked: {sn65_parsed.get('pll_locked')}", f" Clock detect: {sn65_parsed.get('clk_detected')}", f" IRQ_STAT: {sn65_parsed.get('irq_stat_raw')}", f" SOT_ERR: {sn65_parsed.get('sot_err')}", f" SYNCH_ERR: {sn65_parsed.get('synch_err')}", f" UNC_ECC_ERR: {sn65_parsed.get('unc_ecc_err')}", f" FLICKER: {sn65_parsed.get('flicker_detected')}", "", "[ D-PHY timings (ns) ]", ] for k, v in measurements.items(): sp = spec_pass.get(k, {}) marker = "OK" if sp.get("pass") else "VIOLATION" margin = sp.get("margin_ns") margin_str = f"margin={margin:+.2f}" if margin is not None else "margin=n/a" v_str = f"{v:.2f}" if v is not None and v == v else "nan" # NaN check lines.append(f" {k:30s} {v_str:>8s} [{marker}] (min={sp.get('min_ns')}, {margin_str})") lines += [ "", "[ Packet decode (Lane 0) ]", f" Fault A (zero-payload pixel pkt): {packet_fault.get('fault_a_detected')}", f" First payload bytes: {packet_fault.get('first_pixel_payload_hex')}", f" Pixel packets / total: " f"{packet_fault.get('n_pixel_packets')} / {packet_fault.get('n_total_packets')}", "", "[ Lane stall ]", f" Fault B (LP-11 stall): {lane_stall.get('fault_b_detected')}", f" Longest LP-11 (ms): {lane_stall.get('longest_lp11_ms')}", "", "[ DSIM raw / decoded ]", f" PHY_TIMING: {dsim_parsed.get('PHY_TIMING_raw')}", f" PHY_TIMING1: {dsim_parsed.get('PHY_TIMING1_raw')}", f" PHY_TIMING2: {dsim_parsed.get('PHY_TIMING2_raw')}", ] if note: lines += ["", f"Note: {note}"] return os.linesep.join(lines) + os.linesep def build_log_row(run_id: str, sn65_parsed: dict, measurements: dict, spec_pass: dict, dsim_parsed: dict, note: str = "") -> dict: return { "run_id": run_id, "timestamp": datetime.now().isoformat(timespec="seconds"), "flicker_detected": sn65_parsed.get("flicker_detected"), "sot_err": sn65_parsed.get("sot_err"), "synch_err": sn65_parsed.get("synch_err"), "pll_locked": sn65_parsed.get("pll_locked"), "t_lpx_ns": measurements.get("t_lpx"), "t_hs_prepare_ns": measurements.get("t_hs_prepare"), "t_hs_prepare_pass": spec_pass.get("t_hs_prepare", {}).get("pass"), "t_clk_prepare_ns": measurements.get("t_clk_prepare"), "t_clk_zero_ns": measurements.get("t_clk_zero"), "t_clk_prep_plus_zero_ns": measurements.get("t_clk_prepare_plus_zero"), "t_clk_prep_zero_pass": spec_pass.get("t_clk_prepare_plus_zero", {}).get("pass"), "phy_timing_raw": dsim_parsed.get("PHY_TIMING_raw"), "phy_timing1_raw": dsim_parsed.get("PHY_TIMING1_raw"), "phy_timing2_raw": dsim_parsed.get("PHY_TIMING2_raw"), "notes": note, }