Compare commits
3 Commits
2892ea45ff
...
75248c9574
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
75248c9574 | ||
|
|
dd93fbd893 | ||
|
|
9c75598728 |
Binary file not shown.
BIN
__pycache__/explode_h5.cpython-312.pyc
Normal file
BIN
__pycache__/explode_h5.cpython-312.pyc
Normal file
Binary file not shown.
Binary file not shown.
@@ -892,12 +892,27 @@ def analyze_lp_file(path: Path) -> "LPMetrics":
|
|||||||
HS : voltage in mid-range with high oscillation (rolling std > HS_OSC_STD_V)
|
HS : voltage in mid-range with high oscillation (rolling std > HS_OSC_STD_V)
|
||||||
trans : everything else (transitions between states)
|
trans : everything else (transitions between states)
|
||||||
"""
|
"""
|
||||||
m = re.match(r"(\d{8}_\d{6})_lp_(\d+)_(clk|dat)\.csv", path.name, re.IGNORECASE)
|
# Accept three filename formats:
|
||||||
|
# legacy: "_lp_0001_"
|
||||||
|
# watch: "_lp_c001_01_"
|
||||||
|
# segmented: "_lp_c001_01_seg005_" (one segment exploded from H5)
|
||||||
|
m = re.match(
|
||||||
|
r"(\d{8}_\d{6})_lp_(c\d+_\d+(?:_seg\d+)?|\d+)_(clk|dat)\.csv",
|
||||||
|
path.name, re.IGNORECASE,
|
||||||
|
)
|
||||||
if not m:
|
if not m:
|
||||||
raise ValueError(f"Filename does not match lp pattern: {path.name}")
|
raise ValueError(f"Filename does not match lp pattern: {path.name}")
|
||||||
|
|
||||||
timestamp, cap_str, channel = m.groups()
|
timestamp, cap_str, channel = m.groups()
|
||||||
capture_num = int(cap_str)
|
# Derive an int capture_num from whatever digits the id contains, so it
|
||||||
|
# remains sortable (e.g., c001_01_seg005 → 1*1_000_000 + 1*1_000 + 5).
|
||||||
|
digit_groups = re.findall(r"\d+", cap_str)
|
||||||
|
if len(digit_groups) == 1:
|
||||||
|
capture_num = int(digit_groups[0])
|
||||||
|
else:
|
||||||
|
capture_num = 0
|
||||||
|
for i, d in enumerate(reversed(digit_groups)):
|
||||||
|
capture_num += int(d) * (1000 ** i)
|
||||||
|
|
||||||
times, volts = _read_csv(path)
|
times, volts = _read_csv(path)
|
||||||
dt = float(np.diff(times).mean())
|
dt = float(np.diff(times).mean())
|
||||||
|
|||||||
231
explode_h5.py
Normal file
231
explode_h5.py
Normal file
@@ -0,0 +1,231 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
explode_h5.py — split a Keysight segmented H5 file into per-segment CSVs.
|
||||||
|
|
||||||
|
When the scope is in segmented memory mode, a single :DISK:SAVE:WAVeform
|
||||||
|
call dumps all N segments into one .h5 file (much faster than saving N CSVs
|
||||||
|
sequentially). This script splits that file back into individual CSVs whose
|
||||||
|
names match the lp_ pattern that csv_preprocessor.analyze_lp_file() expects:
|
||||||
|
|
||||||
|
{ts}_lp_{cap_id}_seg{NNN}_{clk|dat}.csv
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python3 explode_h5.py <file.h5> [<file.h5> ...]
|
||||||
|
|
||||||
|
Or import explode() from this module.
|
||||||
|
|
||||||
|
Notes on Keysight Infiniium H5 layout:
|
||||||
|
The format used by :DISK:SAVE:WAVeform ... ,H5 nests waveform datasets
|
||||||
|
inside a "Waveforms"/"Channel N" group, with attributes XInc, XOrg,
|
||||||
|
YInc, YOrg, NumSegments, NumPoints, etc. We probe the structure
|
||||||
|
dynamically because slight variations exist between firmware versions.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import h5py
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
LP_NAME_RE = re.compile(
|
||||||
|
r"(?P<ts>\d{8}_\d{6})_lp_(?P<id>c\d+_\d+|\d+)_(?P<chan>clk|dat)\.h5",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _walk(grp, depth: int = 0, max_depth: int = 4) -> list[tuple[str, h5py.Group]]:
|
||||||
|
"""Return all groups under `grp` up to max_depth, with their full paths."""
|
||||||
|
out = [(grp.name, grp)]
|
||||||
|
if depth >= max_depth:
|
||||||
|
return out
|
||||||
|
if isinstance(grp, h5py.Group):
|
||||||
|
for k in grp.keys():
|
||||||
|
try:
|
||||||
|
child = grp[k]
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
if isinstance(child, h5py.Group):
|
||||||
|
out.extend(_walk(child, depth + 1, max_depth))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _find_segments(h5_root) -> tuple[h5py.Group, list[str], dict]:
|
||||||
|
"""
|
||||||
|
Locate the group that contains per-segment waveform datasets.
|
||||||
|
|
||||||
|
Returns (group, sorted_dataset_keys, attrs_dict). The attrs dict merges
|
||||||
|
attributes from the root, parent, and target group so we can find
|
||||||
|
XInc / XOrg / YInc / YOrg wherever Keysight chose to put them.
|
||||||
|
"""
|
||||||
|
groups = _walk(h5_root)
|
||||||
|
|
||||||
|
# Score each group by how many child *datasets* it has (segments are
|
||||||
|
# typically datasets named "Waveform 1", "Waveform 2", ... or
|
||||||
|
# "Channel 1", or just "1", "2", ...).
|
||||||
|
best = None
|
||||||
|
best_count = 0
|
||||||
|
for path, grp in groups:
|
||||||
|
if not isinstance(grp, h5py.Group):
|
||||||
|
continue
|
||||||
|
ds_keys = [k for k in grp.keys() if isinstance(grp[k], h5py.Dataset)]
|
||||||
|
# Filter: only datasets whose shape looks like a 1-D voltage trace
|
||||||
|
ds_keys = [
|
||||||
|
k for k in ds_keys
|
||||||
|
if grp[k].ndim == 1 and grp[k].size > 100
|
||||||
|
]
|
||||||
|
if len(ds_keys) > best_count:
|
||||||
|
best_count = len(ds_keys)
|
||||||
|
best = (grp, ds_keys)
|
||||||
|
|
||||||
|
if best is None or best_count == 0:
|
||||||
|
# 2-D dataset case: a single dataset of shape (N_segments, N_points)
|
||||||
|
for path, grp in groups:
|
||||||
|
for k in grp.keys() if isinstance(grp, h5py.Group) else []:
|
||||||
|
ds = grp[k]
|
||||||
|
if isinstance(ds, h5py.Dataset) and ds.ndim == 2 and ds.shape[0] > 1 and ds.shape[1] > 100:
|
||||||
|
return grp, [k], _collect_attrs(h5_root, grp, ds)
|
||||||
|
raise ValueError("No segment datasets found in H5")
|
||||||
|
|
||||||
|
grp, ds_keys = best
|
||||||
|
# Numerical sort if keys end with digits
|
||||||
|
ds_keys.sort(key=lambda s: (
|
||||||
|
int(re.search(r"\d+", s).group()) if re.search(r"\d+", s) else 0
|
||||||
|
))
|
||||||
|
return grp, ds_keys, _collect_attrs(h5_root, grp)
|
||||||
|
|
||||||
|
|
||||||
|
def _collect_attrs(*scopes) -> dict:
|
||||||
|
"""Merge attrs from multiple HDF5 nodes (later overrides earlier)."""
|
||||||
|
out = {}
|
||||||
|
for s in scopes:
|
||||||
|
try:
|
||||||
|
out.update({k: s.attrs[k] for k in s.attrs})
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _attr(attrs: dict, *names, default=None):
|
||||||
|
"""Return the first attribute that exists from a list of candidate names."""
|
||||||
|
for n in names:
|
||||||
|
if n in attrs:
|
||||||
|
v = attrs[n]
|
||||||
|
try:
|
||||||
|
# numpy scalar/bytes to native python
|
||||||
|
if isinstance(v, (bytes, bytearray)):
|
||||||
|
v = v.decode(errors="ignore")
|
||||||
|
if hasattr(v, "item") and getattr(v, "size", 1) == 1:
|
||||||
|
v = v.item()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return v
|
||||||
|
return default
|
||||||
|
|
||||||
|
|
||||||
|
def explode(h5_path: Path, out_dir: Path | None = None,
|
||||||
|
verbose: bool = False) -> list[Path]:
|
||||||
|
"""
|
||||||
|
Split `h5_path` into per-segment CSVs.
|
||||||
|
|
||||||
|
Returns the list of CSV paths written. CSVs are placed in `out_dir`
|
||||||
|
(default: same dir as h5_path).
|
||||||
|
"""
|
||||||
|
h5_path = Path(h5_path)
|
||||||
|
out_dir = Path(out_dir) if out_dir else h5_path.parent
|
||||||
|
out_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
name_match = LP_NAME_RE.match(h5_path.name)
|
||||||
|
if not name_match:
|
||||||
|
raise ValueError(f"Not an LP H5 filename: {h5_path.name}")
|
||||||
|
ts = name_match["ts"]
|
||||||
|
cap_id = name_match["id"]
|
||||||
|
chan = name_match["chan"]
|
||||||
|
|
||||||
|
csvs: list[Path] = []
|
||||||
|
with h5py.File(h5_path, "r") as f:
|
||||||
|
grp, ds_keys, attrs = _find_segments(f)
|
||||||
|
x_inc = float(_attr(attrs, "XInc", "XIncrement", "x_increment", default=1e-10))
|
||||||
|
x_org = float(_attr(attrs, "XOrg", "XOrigin", "x_origin", default=0.0))
|
||||||
|
y_inc = _attr(attrs, "YInc", "YIncrement", "y_increment", default=None)
|
||||||
|
y_org = _attr(attrs, "YOrg", "YOrigin", "y_origin", default=None)
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
print(f" group: {grp.name} segments: {len(ds_keys)} "
|
||||||
|
f"XInc={x_inc:.3e} XOrg={x_org:.3e} YInc={y_inc} YOrg={y_org}")
|
||||||
|
|
||||||
|
# Single 2-D dataset case: shape (N_segments, N_points)
|
||||||
|
if len(ds_keys) == 1 and grp[ds_keys[0]].ndim == 2:
|
||||||
|
ds = grp[ds_keys[0]][:]
|
||||||
|
for i in range(ds.shape[0]):
|
||||||
|
volts = np.asarray(ds[i], dtype=float)
|
||||||
|
if y_inc is not None and y_org is not None:
|
||||||
|
volts = volts * float(y_inc) + float(y_org)
|
||||||
|
csvs.append(_write_segment_csv(
|
||||||
|
out_dir, ts, cap_id, chan, i + 1, x_inc, x_org, volts,
|
||||||
|
))
|
||||||
|
return csvs
|
||||||
|
|
||||||
|
# Multi-dataset case: each dataset is one segment
|
||||||
|
for i, key in enumerate(ds_keys, start=1):
|
||||||
|
volts = np.asarray(grp[key][:], dtype=float)
|
||||||
|
if y_inc is not None and y_org is not None:
|
||||||
|
# Some Keysight files store raw codes that need scaling
|
||||||
|
if np.issubdtype(grp[key].dtype, np.integer):
|
||||||
|
volts = volts * float(y_inc) + float(y_org)
|
||||||
|
csvs.append(_write_segment_csv(
|
||||||
|
out_dir, ts, cap_id, chan, i, x_inc, x_org, volts,
|
||||||
|
))
|
||||||
|
return csvs
|
||||||
|
|
||||||
|
|
||||||
|
def _write_segment_csv(out_dir: Path, ts: str, cap_id: str, chan: str,
|
||||||
|
seg_idx: int, x_inc: float, x_org: float,
|
||||||
|
volts: np.ndarray) -> Path:
|
||||||
|
n = len(volts)
|
||||||
|
times = np.arange(n) * x_inc + x_org
|
||||||
|
csv_path = out_dir / f"{ts}_lp_{cap_id}_seg{seg_idx:03d}_{chan}.csv"
|
||||||
|
np.savetxt(
|
||||||
|
csv_path,
|
||||||
|
np.column_stack([times, volts]),
|
||||||
|
delimiter=",",
|
||||||
|
fmt="%.6e",
|
||||||
|
)
|
||||||
|
return csv_path
|
||||||
|
|
||||||
|
|
||||||
|
def inspect(h5_path: Path) -> None:
|
||||||
|
"""Print the H5 hierarchy + attrs. Useful for debugging unknown files."""
|
||||||
|
with h5py.File(h5_path, "r") as f:
|
||||||
|
def visit(name, obj):
|
||||||
|
if isinstance(obj, h5py.Group):
|
||||||
|
kind = "GROUP"
|
||||||
|
shape = ""
|
||||||
|
else:
|
||||||
|
kind = "DSET"
|
||||||
|
shape = f" shape={obj.shape} dtype={obj.dtype}"
|
||||||
|
print(f" {kind} /{name}{shape}")
|
||||||
|
for k, v in obj.attrs.items():
|
||||||
|
vs = str(v)[:60]
|
||||||
|
print(f" attr {k} = {vs}")
|
||||||
|
f.visititems(visit)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
args = sys.argv[1:]
|
||||||
|
if not args:
|
||||||
|
print(__doc__)
|
||||||
|
sys.exit(0)
|
||||||
|
if args[0] == "--inspect":
|
||||||
|
for p in args[1:]:
|
||||||
|
print(f"\n=== {p} ===")
|
||||||
|
inspect(Path(p))
|
||||||
|
sys.exit(0)
|
||||||
|
for p in args:
|
||||||
|
try:
|
||||||
|
outs = explode(Path(p), verbose=True)
|
||||||
|
print(f"{Path(p).name}: {len(outs)} segment(s) → CSVs")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"{Path(p).name}: ERROR — {e}")
|
||||||
478
flicker_watch.py
Normal file
478
flicker_watch.py
Normal file
@@ -0,0 +1,478 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
flicker_watch.py — Continuous LP capture during video on/off cycles.
|
||||||
|
|
||||||
|
Operator watches the display. Script keeps cycling the video stream on/off
|
||||||
|
and triggering LP captures in the background. Files accumulate on the scope
|
||||||
|
without being transferred (fast).
|
||||||
|
|
||||||
|
Keys (no Enter needed):
|
||||||
|
f — flicker observed: transfer + archive + analyse recent captures
|
||||||
|
g — good baseline: transfer + archive recent captures (no analysis)
|
||||||
|
q — quit
|
||||||
|
|
||||||
|
Captures are organised under data/flicker/{event_ts}/ or data/good/{event_ts}/.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import select
|
||||||
|
import shutil
|
||||||
|
import sys
|
||||||
|
import termios
|
||||||
|
import time
|
||||||
|
import tty
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import requests
|
||||||
|
import vxi11
|
||||||
|
|
||||||
|
import ai_mgmt
|
||||||
|
from csv_preprocessor import analyze_lp_file
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Config
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
SCOPE_IP = "192.168.45.4"
|
||||||
|
DEVICE_BASE = "http://192.168.45.8:5000"
|
||||||
|
VIDEO_URL = f"{DEVICE_BASE}/video"
|
||||||
|
|
||||||
|
DATA_DIR = Path(__file__).parent / "data"
|
||||||
|
FLICKER_DIR = DATA_DIR / "flicker"
|
||||||
|
GOOD_DIR = DATA_DIR / "good"
|
||||||
|
|
||||||
|
# LP capture parameters (matched to mipi_test_interactive.py)
|
||||||
|
LP_SCALE = 1e-6 # 1 µs/div → 20 µs window
|
||||||
|
LP_POINTS = 200_000
|
||||||
|
LP_TRIG_OFFSET = 9e-6 # 1 µs pre / 19 µs post-trigger
|
||||||
|
LP_V_SCALE = 0.2
|
||||||
|
LP_V_OFFSET = 0.6
|
||||||
|
LP_TRIG_LEVEL = 0.6
|
||||||
|
|
||||||
|
# Segmented memory: capture N back-to-back LP triggers per :DIGitize, then
|
||||||
|
# dump the whole acquisition as a single H5 file. Massively higher coverage
|
||||||
|
# than single-shot CSV captures.
|
||||||
|
SEGMENT_COUNT = 100
|
||||||
|
SAVE_FORMAT = "H5" # Keysight native multi-segment format
|
||||||
|
|
||||||
|
CYCLE_S = 10.0 # seconds video is on per cycle
|
||||||
|
# Filling N segments takes ~N × LP-trigger period. LP triggers fire roughly
|
||||||
|
# at line rate (≈48 kHz) so 100 segments fill in ms, but allow margin.
|
||||||
|
TRIG_TIMEOUT_S = max(SEGMENT_COUNT * 0.020 + 5.0, 10.0)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Scope setup
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
scope = vxi11.Instrument(SCOPE_IP)
|
||||||
|
scope.timeout = 30
|
||||||
|
|
||||||
|
|
||||||
|
def setup_scope() -> None:
|
||||||
|
"""One-shot scope init — channels, math, default trigger."""
|
||||||
|
print("CONFIGURING SCOPE...")
|
||||||
|
cmds = [
|
||||||
|
"*RST", ":RUN", ":STOP",
|
||||||
|
":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", ":CHANnel1:PROBe 19.2",
|
||||||
|
":CHANnel1:LABel 'CLK+'",
|
||||||
|
":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", ":CHANnel2:PROBe 19.2",
|
||||||
|
":CHANnel2:LABel 'CLK-'",
|
||||||
|
":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50", ":CHANnel3:PROBe 19.2",
|
||||||
|
":CHANnel3:LABel 'DAT0+'",
|
||||||
|
":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50", ":CHANnel4:PROBe 19.2",
|
||||||
|
":CHANnel4:LABel 'DAT0-'",
|
||||||
|
":TIMebase:REFerence CENTer",
|
||||||
|
":TRIGger:MODE EDGE",
|
||||||
|
":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON",
|
||||||
|
":DISPlay:LAYout STACKED",
|
||||||
|
]
|
||||||
|
for c in cmds:
|
||||||
|
scope.write(c)
|
||||||
|
time.sleep(0.05)
|
||||||
|
print("SCOPE READY.")
|
||||||
|
|
||||||
|
|
||||||
|
def configure_for_lp() -> None:
|
||||||
|
"""LP-mode + segmented memory: N back-to-back LP triggers per acquisition."""
|
||||||
|
for ch in (1, 2, 3, 4):
|
||||||
|
scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}")
|
||||||
|
scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}")
|
||||||
|
scope.write(":TRIGger:EDGE:SOURce CHANnel3")
|
||||||
|
scope.write(":TRIGger:EDGE:SLOPe NEGative")
|
||||||
|
scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}")
|
||||||
|
scope.write(":TRIGger:SWEep NORMal")
|
||||||
|
scope.write(f":TIMebase:SCALe {LP_SCALE:.3E}")
|
||||||
|
scope.write(f":ACQuire:POINts {LP_POINTS}")
|
||||||
|
scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}")
|
||||||
|
# Segmented memory: fill N segments per :DIGitize.
|
||||||
|
scope.write(":ACQuire:MODE SEGMented")
|
||||||
|
scope.write(f":ACQuire:SEGMented:COUNt {SEGMENT_COUNT}")
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
|
||||||
|
def arm_and_wait(timeout_s: float) -> bool:
|
||||||
|
""":DIGitize + *OPC?. Returns True if trigger fired within timeout."""
|
||||||
|
global scope
|
||||||
|
prev = scope.timeout
|
||||||
|
try:
|
||||||
|
scope.timeout = timeout_s + 2
|
||||||
|
scope.write(":DIGitize")
|
||||||
|
return scope.ask("*OPC?").strip() == "1"
|
||||||
|
except Exception:
|
||||||
|
# Trigger timed out or scope locked up — reconnect.
|
||||||
|
try:
|
||||||
|
scope.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
time.sleep(1.0)
|
||||||
|
scope = vxi11.Instrument(SCOPE_IP)
|
||||||
|
scope.timeout = 30
|
||||||
|
try:
|
||||||
|
scope.write(":STOP")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
scope.timeout = prev
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def save_lp(base_name: str) -> None:
|
||||||
|
"""Save all N segments of Ch1 (CLK+) and Ch3 (DAT0+) as a single H5 each."""
|
||||||
|
base = f"C:\\TEMP\\{base_name}"
|
||||||
|
ext = SAVE_FORMAT.lower()
|
||||||
|
scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.{ext}",{SAVE_FORMAT}')
|
||||||
|
time.sleep(3.0)
|
||||||
|
scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.{ext}",{SAVE_FORMAT}')
|
||||||
|
time.sleep(3.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Non-blocking keyboard
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class KeyReader:
|
||||||
|
def __enter__(self):
|
||||||
|
self.fd = sys.stdin.fileno()
|
||||||
|
self.old = termios.tcgetattr(self.fd)
|
||||||
|
tty.setcbreak(self.fd)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def get_key(self) -> str | None:
|
||||||
|
if select.select([sys.stdin], [], [], 0)[0]:
|
||||||
|
return sys.stdin.read(1).lower()
|
||||||
|
return None
|
||||||
|
|
||||||
|
def __exit__(self, *_):
|
||||||
|
termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Video control
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def video_start() -> None:
|
||||||
|
try:
|
||||||
|
requests.put(VIDEO_URL,
|
||||||
|
json={"action": "start", "mode": "static-pink"},
|
||||||
|
timeout=3)
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
print(f" VIDEO START failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def video_stop() -> None:
|
||||||
|
try:
|
||||||
|
requests.put(VIDEO_URL, json={"action": "stop"}, timeout=3)
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
print(f" VIDEO STOP failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# H5 transfer (ai_mgmt only handles CSV — segmented mode produces .h5)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def _transfer_h5_files() -> int:
|
||||||
|
"""SMB-pull every .h5 from the scope share into DATA_DIR; delete on scope."""
|
||||||
|
from smb.SMBConnection import SMBConnection
|
||||||
|
import socket
|
||||||
|
conn = SMBConnection(
|
||||||
|
ai_mgmt.USERNAME, ai_mgmt.PASSWORD,
|
||||||
|
socket.gethostname(), ai_mgmt.SERVER_NAME,
|
||||||
|
use_ntlm_v2=True, is_direct_tcp=True,
|
||||||
|
)
|
||||||
|
if not conn.connect(ai_mgmt.SERVER, 445):
|
||||||
|
print(" H5 transfer: could not connect to scope share")
|
||||||
|
return 0
|
||||||
|
count = 0
|
||||||
|
try:
|
||||||
|
h5_paths: list[str] = []
|
||||||
|
def walk(path: str) -> None:
|
||||||
|
for entry in conn.listPath(ai_mgmt.SHARE, path):
|
||||||
|
if entry.filename in (".", ".."):
|
||||||
|
continue
|
||||||
|
full = f"{path}/{entry.filename}"
|
||||||
|
if entry.isDirectory:
|
||||||
|
walk(full)
|
||||||
|
elif entry.filename.lower().endswith(".h5"):
|
||||||
|
h5_paths.append(full)
|
||||||
|
walk("/")
|
||||||
|
for remote in h5_paths:
|
||||||
|
local = DATA_DIR / Path(remote).name
|
||||||
|
try:
|
||||||
|
with open(local, "wb") as fh:
|
||||||
|
conn.retrieveFile(ai_mgmt.SHARE, remote, fh)
|
||||||
|
conn.deleteFiles(ai_mgmt.SHARE, remote)
|
||||||
|
count += 1
|
||||||
|
except Exception as e:
|
||||||
|
print(f" H5 transfer failed for {Path(remote).name}: {e}")
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
return count
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Register snapshot from device (DSIM PHY + SN65DSI83)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def fetch_registers_snapshot(target_dir: Path, event_ts: str) -> None:
|
||||||
|
"""GET /registers + /sn65_registers, print key indicators, save JSON."""
|
||||||
|
combined: dict = {}
|
||||||
|
for endpoint, key in [("/registers", "dsim"),
|
||||||
|
("/sn65_registers", "sn65")]:
|
||||||
|
try:
|
||||||
|
r = requests.get(f"{DEVICE_BASE}{endpoint}", timeout=5)
|
||||||
|
r.raise_for_status()
|
||||||
|
combined[key] = r.json()
|
||||||
|
except Exception as e:
|
||||||
|
print(f" REGISTERS: {endpoint} failed — {e}")
|
||||||
|
combined[key] = None
|
||||||
|
|
||||||
|
# Quick-look indicators
|
||||||
|
sn65 = combined.get("sn65") or {}
|
||||||
|
regs = sn65.get("registers", {}) if isinstance(sn65, dict) else {}
|
||||||
|
csr_0a = regs.get("csr_0a", {}) or {}
|
||||||
|
csr_e5 = regs.get("csr_e5", {}) or {}
|
||||||
|
|
||||||
|
if csr_0a:
|
||||||
|
pll_str = "LOCKED" if csr_0a.get("pll_lock") else "*** UNLOCKED ***"
|
||||||
|
clk_str = "detected" if csr_0a.get("clk_det") else "NOT detected"
|
||||||
|
print(f" SN65: PLL {pll_str} CLK {clk_str} (CSR 0x0A = {csr_0a.get('value')})")
|
||||||
|
|
||||||
|
if csr_e5:
|
||||||
|
flags = [
|
||||||
|
("pll_unlock", "PLL_UNLOCK"),
|
||||||
|
("cha_sot_bit_err", "SOT_BIT_ERR"),
|
||||||
|
("cha_llp_err", "LLP_ERR"),
|
||||||
|
("cha_ecc_err", "ECC_ERR"),
|
||||||
|
("cha_lp_err", "LP_ERR"),
|
||||||
|
("cha_crc_err", "CRC_ERR"),
|
||||||
|
]
|
||||||
|
active = [label for k, label in flags if csr_e5.get(k)]
|
||||||
|
if active:
|
||||||
|
print(f" SN65: *** ERROR FLAGS: {', '.join(active)} "
|
||||||
|
f"(CSR 0xE5 = {csr_e5.get('value')}) ***")
|
||||||
|
else:
|
||||||
|
print(f" SN65: no error flags (CSR 0xE5 = {csr_e5.get('value')})")
|
||||||
|
|
||||||
|
out = target_dir / f"{event_ts}_registers.json"
|
||||||
|
try:
|
||||||
|
out.write_text(json.dumps(combined, indent=2))
|
||||||
|
print(f" registers → {out.relative_to(DATA_DIR.parent)}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" REGISTERS save failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Event handling: archive recent captures and (for flicker) analyse
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def archive_and_analyse(event: str, since_iso: str) -> None:
|
||||||
|
"""
|
||||||
|
Pull every CSV from the scope, move into data/{event}/{event_ts}/.
|
||||||
|
For flicker events, run csv_preprocessor on each LP capture and print a
|
||||||
|
summary table. Always pulls a register snapshot from the device too.
|
||||||
|
"""
|
||||||
|
event_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
target = (FLICKER_DIR if event == "flicker" else GOOD_DIR) / event_ts
|
||||||
|
target.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
print(f"\n *** {event.upper()} EVENT @ {event_ts} ***")
|
||||||
|
|
||||||
|
# Register snapshot first (fast, before scope transfer which takes longer)
|
||||||
|
fetch_registers_snapshot(target, event_ts)
|
||||||
|
|
||||||
|
print(f" Transferring scope → {target} ...")
|
||||||
|
try:
|
||||||
|
copied, failed = ai_mgmt.transfer_csv_files()
|
||||||
|
except Exception as e:
|
||||||
|
print(f" TRANSFER ERROR: {e}")
|
||||||
|
return
|
||||||
|
print(f" {copied} file(s) transferred ({failed} failed)")
|
||||||
|
|
||||||
|
# ai_mgmt only fetches CSVs. H5 (segmented) files need a separate pass.
|
||||||
|
h5_count = _transfer_h5_files()
|
||||||
|
if h5_count:
|
||||||
|
print(f" {h5_count} H5 file(s) transferred")
|
||||||
|
|
||||||
|
# Move just-arrived files (csv + h5) out of data/ (flat) into the event folder.
|
||||||
|
moved = 0
|
||||||
|
for f in list(DATA_DIR.glob("*.csv")) + list(DATA_DIR.glob("*.h5")):
|
||||||
|
if f.is_file():
|
||||||
|
shutil.move(str(f), target / f.name)
|
||||||
|
moved += 1
|
||||||
|
print(f" {moved} file(s) archived to {target.relative_to(DATA_DIR.parent)}")
|
||||||
|
|
||||||
|
# Explode each H5 into per-segment CSVs so csv_preprocessor can analyse them.
|
||||||
|
from explode_h5 import explode
|
||||||
|
h5_files = sorted(target.glob("*_lp_*.h5"))
|
||||||
|
seg_csv_count = 0
|
||||||
|
for h5 in h5_files:
|
||||||
|
try:
|
||||||
|
csvs = explode(h5)
|
||||||
|
seg_csv_count += len(csvs)
|
||||||
|
except Exception as e:
|
||||||
|
print(f" EXPLODE error on {h5.name}: {e}")
|
||||||
|
if h5_files:
|
||||||
|
print(f" exploded {len(h5_files)} H5 file(s) → {seg_csv_count} segment CSV(s)")
|
||||||
|
|
||||||
|
if event != "flicker":
|
||||||
|
return
|
||||||
|
|
||||||
|
# Analyse every segment CSV. Flag outliers.
|
||||||
|
print("\n Per-segment LP analysis:")
|
||||||
|
rows = []
|
||||||
|
for f in sorted(target.glob("*_lp_*_dat.csv")):
|
||||||
|
try:
|
||||||
|
m = analyze_lp_file(f)
|
||||||
|
rows.append({
|
||||||
|
"file": f.name,
|
||||||
|
"lp_low": float(m.lp_low_duration_ns) if m.lp_low_duration_ns is not None else None,
|
||||||
|
"hs_amp": float(m.hs_amplitude_mv) if m.hs_amplitude_mv is not None else None,
|
||||||
|
"hs_dur": float(m.hs_burst_dur_ns) if m.hs_burst_dur_ns is not None else None,
|
||||||
|
"n_burst": int(m.n_hs_bursts) if m.n_hs_bursts is not None else None,
|
||||||
|
"sus": bool(m.flicker_suspect),
|
||||||
|
})
|
||||||
|
except Exception as e:
|
||||||
|
rows.append({"file": f.name, "error": str(e)})
|
||||||
|
|
||||||
|
n_total = len(rows)
|
||||||
|
n_sus = sum(1 for r in rows if r.get("sus"))
|
||||||
|
print(f" {n_total} segments analysed ({n_sus} flagged as flicker_suspect)")
|
||||||
|
|
||||||
|
# Outlier search across the segments themselves.
|
||||||
|
def _outliers(field: str, lo_thresh: float | None = None,
|
||||||
|
hi_thresh: float | None = None) -> list[dict]:
|
||||||
|
vals = sorted(r[field] for r in rows if r.get(field) is not None)
|
||||||
|
if not vals:
|
||||||
|
return []
|
||||||
|
med = vals[len(vals) // 2]
|
||||||
|
out = []
|
||||||
|
for r in rows:
|
||||||
|
v = r.get(field)
|
||||||
|
if v is None: continue
|
||||||
|
far = (lo_thresh is not None and v < lo_thresh) or \
|
||||||
|
(hi_thresh is not None and v > hi_thresh)
|
||||||
|
if far:
|
||||||
|
out.append({"file": r["file"], field: v, "median": med})
|
||||||
|
return out
|
||||||
|
|
||||||
|
print("\n Anomalies vs segment-set median:")
|
||||||
|
for label, field, lo, hi in [
|
||||||
|
("very-short LP-low (<50 ns)", "lp_low", 50, None),
|
||||||
|
("very-low HS amplitude (<50 mV)", "hs_amp", 50, None),
|
||||||
|
("very-high HS amplitude (>140 mV)","hs_amp", None, 140),
|
||||||
|
("short HS burst (<8000 ns)", "hs_dur", 8000, None),
|
||||||
|
]:
|
||||||
|
ax = _outliers(field, lo, hi)
|
||||||
|
if ax:
|
||||||
|
print(f" {label}: {len(ax)} segment(s)")
|
||||||
|
for x in ax[:8]:
|
||||||
|
print(f" {x['file']} {field}={x[field]:.1f} "
|
||||||
|
f"(set median={x['median']:.1f})")
|
||||||
|
if len(ax) > 8:
|
||||||
|
print(f" ... +{len(ax) - 8} more")
|
||||||
|
else:
|
||||||
|
print(f" {label}: none")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Main loop
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
def main() -> None:
|
||||||
|
DATA_DIR.mkdir(exist_ok=True)
|
||||||
|
FLICKER_DIR.mkdir(exist_ok=True)
|
||||||
|
GOOD_DIR.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
setup_scope()
|
||||||
|
configure_for_lp()
|
||||||
|
|
||||||
|
print("\n" + "=" * 64)
|
||||||
|
print(" FLICKER WATCH — keys: f=flicker g=good q=quit")
|
||||||
|
print("=" * 64 + "\n")
|
||||||
|
|
||||||
|
cycle = 0
|
||||||
|
try:
|
||||||
|
with KeyReader() as keys:
|
||||||
|
while True:
|
||||||
|
cycle += 1
|
||||||
|
cycle_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
cycle_caps = []
|
||||||
|
cycle_end = time.time() + CYCLE_S
|
||||||
|
|
||||||
|
video_start()
|
||||||
|
print(f"\n[cycle {cycle:03d} {cycle_ts}] video ON "
|
||||||
|
f"({CYCLE_S:.0f}s window, {SEGMENT_COUNT} segs/acquire)",
|
||||||
|
flush=True)
|
||||||
|
|
||||||
|
event = None
|
||||||
|
last_tick = 0.0
|
||||||
|
while time.time() < cycle_end:
|
||||||
|
seq = len(cycle_caps) + 1
|
||||||
|
base = f"{cycle_ts}_lp_c{cycle:03d}_{seq:02d}"
|
||||||
|
remaining = lambda: max(0, cycle_end - time.time())
|
||||||
|
|
||||||
|
if arm_and_wait(TRIG_TIMEOUT_S):
|
||||||
|
try:
|
||||||
|
save_lp(base)
|
||||||
|
cycle_caps.append(base)
|
||||||
|
print(f" + acq {seq:02d} ({SEGMENT_COUNT} segs) "
|
||||||
|
f"[{remaining():4.1f}s left]", flush=True)
|
||||||
|
except Exception as e:
|
||||||
|
print(f" save error: {e}", flush=True)
|
||||||
|
else:
|
||||||
|
# Trigger timed out — print a heartbeat at most every 2s
|
||||||
|
if time.time() - last_tick > 2.0:
|
||||||
|
print(f" ... waiting for trigger "
|
||||||
|
f"[{remaining():4.1f}s left]", flush=True)
|
||||||
|
last_tick = time.time()
|
||||||
|
|
||||||
|
key = keys.get_key()
|
||||||
|
if key in ("f", "g", "q"):
|
||||||
|
event = key
|
||||||
|
break
|
||||||
|
|
||||||
|
video_stop()
|
||||||
|
if event is None:
|
||||||
|
print(f"[cycle {cycle:03d}] ended "
|
||||||
|
f"({len(cycle_caps)} acq(s) ≈ "
|
||||||
|
f"{len(cycle_caps) * SEGMENT_COUNT} segments, no event)",
|
||||||
|
flush=True)
|
||||||
|
|
||||||
|
if event == "f":
|
||||||
|
archive_and_analyse("flicker", cycle_ts)
|
||||||
|
elif event == "g":
|
||||||
|
archive_and_analyse("good", cycle_ts)
|
||||||
|
elif event == "q":
|
||||||
|
print("\nQUIT requested.")
|
||||||
|
break
|
||||||
|
|
||||||
|
# Brief pause before next cycle so video stop settles.
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\nInterrupted (Ctrl+C).")
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
video_stop()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -21,7 +21,6 @@ AUTHOR: D. RICE 16/04/2026
|
|||||||
import csv as _csv_mod
|
import csv as _csv_mod
|
||||||
import html
|
import html
|
||||||
import json
|
import json
|
||||||
import subprocess
|
|
||||||
import time
|
import time
|
||||||
import sys
|
import sys
|
||||||
import requests
|
import requests
|
||||||
@@ -38,7 +37,6 @@ import vxi11
|
|||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
import ai_mgmt
|
import ai_mgmt
|
||||||
import rigol_scope
|
|
||||||
from csv_preprocessor import (analyze_lp_file, LPMetrics,
|
from csv_preprocessor import (analyze_lp_file, LPMetrics,
|
||||||
HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS)
|
HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS)
|
||||||
|
|
||||||
@@ -420,7 +418,6 @@ except Exception as e:
|
|||||||
print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}")
|
print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
rigol_scope.connect()
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Scope configuration (identical to mipi_test.py)
|
# Scope configuration (identical to mipi_test.py)
|
||||||
@@ -676,7 +673,6 @@ def _fetch_registers(ts: str, iteration: int) -> None:
|
|||||||
|
|
||||||
# ── Register snapshot: print start values and flag any changes ───
|
# ── Register snapshot: print start values and flag any changes ───
|
||||||
snap_start = settling.get("snapshot_start") or {}
|
snap_start = settling.get("snapshot_start") or {}
|
||||||
snap_end = settling.get("snapshot_end") or {}
|
|
||||||
changed = settling.get("changed_regs") or {}
|
changed = settling.get("changed_regs") or {}
|
||||||
|
|
||||||
if snap_start:
|
if snap_start:
|
||||||
@@ -738,21 +734,11 @@ def dual_capture(iteration: int) -> str:
|
|||||||
_configure_for_lp()
|
_configure_for_lp()
|
||||||
_set_timebase(LP_SCALE, LP_POINTS)
|
_set_timebase(LP_SCALE, LP_POINTS)
|
||||||
scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}")
|
scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}")
|
||||||
if rigol_scope.is_connected():
|
|
||||||
rigol_scope.arm()
|
|
||||||
if _arm_and_wait(timeout=30):
|
if _arm_and_wait(timeout=30):
|
||||||
_save_pass_channels("lp", iteration, ts)
|
_save_pass_channels("lp", iteration, ts)
|
||||||
else:
|
else:
|
||||||
print(" SKIPPING LP SAVE.")
|
print(" SKIPPING LP SAVE.")
|
||||||
scope.write(":TIMebase:POSition 0") # restore centred for subsequent passes
|
scope.write(":TIMebase:POSition 0") # restore centred for subsequent passes
|
||||||
if rigol_scope.is_connected():
|
|
||||||
DATA_DIR.mkdir(exist_ok=True)
|
|
||||||
v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv"
|
|
||||||
n = rigol_scope.read_waveform_csv(v18_path)
|
|
||||||
if n:
|
|
||||||
print(f" SAVED: {v18_path.name} ({n} samples)")
|
|
||||||
else:
|
|
||||||
print(" RIGOL CH1: waveform read failed — check connection and probe.")
|
|
||||||
_restore_hs_config()
|
_restore_hs_config()
|
||||||
|
|
||||||
# ── Pass 2: HS signal quality ──────────────────────────────────────────
|
# ── Pass 2: HS signal quality ──────────────────────────────────────────
|
||||||
@@ -1022,8 +1008,6 @@ def _lp_followup_capture(iteration: int) -> tuple[str, list[str], list[LPMetrics
|
|||||||
ts_fu = datetime.now().strftime("%Y%m%d_%H%M%S")
|
ts_fu = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
_configure_for_lp()
|
_configure_for_lp()
|
||||||
_set_timebase(LP_SCALE, LP_POINTS)
|
_set_timebase(LP_SCALE, LP_POINTS)
|
||||||
if rigol_scope.is_connected():
|
|
||||||
rigol_scope.arm()
|
|
||||||
if _arm_and_wait(timeout=10):
|
if _arm_and_wait(timeout=10):
|
||||||
_save_pass_channels("lp", iteration, ts_fu)
|
_save_pass_channels("lp", iteration, ts_fu)
|
||||||
else:
|
else:
|
||||||
@@ -1545,6 +1529,129 @@ def run_interactive_test() -> None:
|
|||||||
f"({len(events)} total suspect(s) assessed)")
|
f"({len(events)} total suspect(s) assessed)")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Continuous capture mode (periodic flicker — no kiosk restart)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def run_continuous_test() -> None:
|
||||||
|
"""
|
||||||
|
Continuous LP capture loop — pipeline restart per iteration.
|
||||||
|
|
||||||
|
The pipeline (kiosk) is stopped and restarted on every iteration so the
|
||||||
|
scope captures the startup LP-11→LP-01 transition that triggers the flicker.
|
||||||
|
The scope is configured and armed BEFORE _start_video() is called so that
|
||||||
|
the first HS burst after pipeline load is always captured.
|
||||||
|
|
||||||
|
Sequence per iteration:
|
||||||
|
1. _stop_video() — tear down pipeline
|
||||||
|
2. _configure_for_lp() — set scope channels + trigger (takes ~400 ms)
|
||||||
|
3. _start_video() — reload pipeline (LP transition fires ~1-2 s later)
|
||||||
|
4. _arm_and_wait() — scope captures first LP-11→LP-01 on Ch3
|
||||||
|
5. Transfer + LP analysis
|
||||||
|
6. If suspect: LP bit decode + byte comparison vs last clean capture
|
||||||
|
|
||||||
|
Press Ctrl+C to stop. No HTML report is written; raw LP CSVs are kept in data/.
|
||||||
|
"""
|
||||||
|
import proto_decoder as _pd
|
||||||
|
|
||||||
|
print("\n===== CONTINUOUS CAPTURE MODE =====")
|
||||||
|
print("Pipeline restart per iteration — captures startup LP transition.")
|
||||||
|
print("LP bit decode fires automatically on flicker suspects.")
|
||||||
|
print("Press Ctrl+C to stop.\n")
|
||||||
|
|
||||||
|
iteration = 1
|
||||||
|
clean_count = 0
|
||||||
|
flicker_count = 0
|
||||||
|
last_clean_iter: int | None = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
|
||||||
|
# ── Stop pipeline, configure scope, then restart pipeline ─────────
|
||||||
|
_stop_video()
|
||||||
|
time.sleep(0.3)
|
||||||
|
|
||||||
|
# Configure scope while pipeline is down — scope will be ready before
|
||||||
|
# the first LP edge fires after _start_video().
|
||||||
|
_configure_for_lp()
|
||||||
|
_set_timebase(LP_SCALE, LP_POINTS)
|
||||||
|
scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}")
|
||||||
|
|
||||||
|
_start_video()
|
||||||
|
|
||||||
|
# ── LP capture on startup transition ─────────────────────────────
|
||||||
|
ok = _arm_and_wait(timeout=10)
|
||||||
|
scope.write(":TIMebase:POSition 0")
|
||||||
|
_restore_hs_config()
|
||||||
|
|
||||||
|
if not ok:
|
||||||
|
print(f" [{iteration:04d}] LP trigger timeout — retrying")
|
||||||
|
time.sleep(0.5)
|
||||||
|
continue
|
||||||
|
|
||||||
|
_save_pass_channels("lp", iteration, ts)
|
||||||
|
|
||||||
|
# ── Transfer LP files ────────────────────────────────────────────
|
||||||
|
try:
|
||||||
|
ai_mgmt.transfer_csv_files()
|
||||||
|
except Exception as e:
|
||||||
|
print(f" [{iteration:04d}] transfer error: {e}")
|
||||||
|
iteration += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# ── LP analysis ──────────────────────────────────────────────────
|
||||||
|
lp_summaries, suspects = _analyze_lp_files(ts, iteration)
|
||||||
|
|
||||||
|
if not suspects:
|
||||||
|
clean_count += 1
|
||||||
|
last_clean_iter = iteration
|
||||||
|
print(f" [{iteration:04d}] clean "
|
||||||
|
f"({clean_count} clean {flicker_count} flicker)")
|
||||||
|
iteration += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# ── Flicker detected ─────────────────────────────────────────────
|
||||||
|
flicker_count += 1
|
||||||
|
_play_alarm()
|
||||||
|
print(f"\n[{iteration:04d}] *** FLICKER SUSPECT #{flicker_count} ***")
|
||||||
|
for s in lp_summaries:
|
||||||
|
print(s)
|
||||||
|
|
||||||
|
# ── MIPI bit decode from LP files ────────────────────────────────
|
||||||
|
# LP files are already local (transferred above). At 10 GSa/s
|
||||||
|
# (100 ps/sample, ~23 samples/bit at 432 Mbps) they have sufficient
|
||||||
|
# resolution to decode the HS bit stream directly using single-ended
|
||||||
|
# CLK+ / DAT0+ thresholds. No separate proto pass needed.
|
||||||
|
print("\n --- MIPI BIT DECODE (from LP capture) ---")
|
||||||
|
try:
|
||||||
|
result = _pd.decode_lp_capture(iteration, DATA_DIR, verbose=True)
|
||||||
|
anomaly = _pd.analyse_for_anomalies(result)
|
||||||
|
if anomaly["anomalous"]:
|
||||||
|
print(f"\n *** BIT-LEVEL ANOMALIES: "
|
||||||
|
f"{', '.join(anomaly['flags'])} ***")
|
||||||
|
else:
|
||||||
|
print(f"\n Bit decode: no structural or content anomalies "
|
||||||
|
f"(sync OK, packet type OK, pixel content OK)")
|
||||||
|
|
||||||
|
if result and last_clean_iter is not None:
|
||||||
|
print()
|
||||||
|
_pd.compare_lp_captures(last_clean_iter, iteration, DATA_DIR)
|
||||||
|
except Exception as e:
|
||||||
|
print(f" bit decode error: {e}")
|
||||||
|
|
||||||
|
print()
|
||||||
|
iteration += 1
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\n\nContinuous test stopped (Ctrl+C).")
|
||||||
|
|
||||||
|
_stop_video()
|
||||||
|
total = clean_count + flicker_count
|
||||||
|
print(f"\nSummary: {total} iterations — {clean_count} clean, "
|
||||||
|
f"{flicker_count} flicker suspect(s) caught and decoded.")
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Menu
|
# Menu
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -1556,23 +1663,18 @@ def main_menu() -> None:
|
|||||||
print("2. SETUP SCOPE (RUN FIRST)")
|
print("2. SETUP SCOPE (RUN FIRST)")
|
||||||
print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)")
|
print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)")
|
||||||
print("4. PSU OUTPUT ON/OFF (CH1)")
|
print("4. PSU OUTPUT ON/OFF (CH1)")
|
||||||
print("5. START INTERACTIVE FLICKER TEST")
|
print("5. START INTERACTIVE FLICKER TEST (kiosk restart per iteration)")
|
||||||
print("6. EXIT")
|
print("6. START CONTINUOUS CAPTURE TEST (no restart; proto decode on flicker)")
|
||||||
|
print("7. EXIT")
|
||||||
|
|
||||||
choice = input("\nSELECT OPTION (1-6): ").strip()
|
choice = input("\nSELECT OPTION (1-7): ").strip()
|
||||||
|
|
||||||
if choice == '1':
|
if choice == '1':
|
||||||
print(f"PSU : {psu.ask('*IDN?').strip()}")
|
print(f"PSU : {psu.ask('*IDN?').strip()}")
|
||||||
print(f"SCOPE: {scope.ask('*IDN?').strip()}")
|
print(f"SCOPE: {scope.ask('*IDN?').strip()}")
|
||||||
if rigol_scope.is_connected():
|
|
||||||
print(f"RIGOL: {rigol_scope.rigol.ask('*IDN?').strip()}")
|
|
||||||
else:
|
|
||||||
print("RIGOL: NOT CONNECTED")
|
|
||||||
|
|
||||||
elif choice == '2':
|
elif choice == '2':
|
||||||
setup_scope()
|
setup_scope()
|
||||||
if rigol_scope.is_connected():
|
|
||||||
rigol_scope.configure()
|
|
||||||
|
|
||||||
elif choice == '3':
|
elif choice == '3':
|
||||||
psu.write('CH1:VOLT 24.0')
|
psu.write('CH1:VOLT 24.0')
|
||||||
@@ -1591,14 +1693,16 @@ def main_menu() -> None:
|
|||||||
run_interactive_test()
|
run_interactive_test()
|
||||||
|
|
||||||
elif choice == '6':
|
elif choice == '6':
|
||||||
|
run_continuous_test()
|
||||||
|
|
||||||
|
elif choice == '7':
|
||||||
psu.close()
|
psu.close()
|
||||||
scope.close()
|
scope.close()
|
||||||
rigol_scope.disconnect()
|
|
||||||
print("INSTRUMENTS CLOSED. BYE.")
|
print("INSTRUMENTS CLOSED. BYE.")
|
||||||
break
|
break
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print("INVALID ENTRY. PLEASE CHOOSE 1-6.")
|
print("INVALID ENTRY. PLEASE CHOOSE 1-7.")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
507
proto_decoder.py
507
proto_decoder.py
@@ -44,12 +44,31 @@ DSI_DT_RGB888 = 0x3E
|
|||||||
DSI_DT_HSYNC = 0x21 # short packet — H sync start
|
DSI_DT_HSYNC = 0x21 # short packet — H sync start
|
||||||
DSI_DT_VSYNC = 0x01 # short packet — V sync start
|
DSI_DT_VSYNC = 0x01 # short packet — V sync start
|
||||||
|
|
||||||
|
# Known-valid DSI data types used in sync-byte validation (VC=0 + DT in this set)
|
||||||
|
VALID_DSI_DT = {0x01, 0x11, 0x21, 0x31, 0x08, 0x09, 0x19, 0x29, 0x39, 0x3E}
|
||||||
|
|
||||||
# MIPI D-PHY HS sync byte (transmitted at start of each HS burst, all-lanes)
|
# MIPI D-PHY HS sync byte (transmitted at start of each HS burst, all-lanes)
|
||||||
HS_SYNC_BYTE = 0xB8 # 1011_1000 in bit order (LSB first → 00011101 on wire)
|
HS_SYNC_BYTE = 0xB8 # 1011_1000 in bit order (LSB first → 00011101 on wire)
|
||||||
|
|
||||||
# Threshold for differential voltage: >0 = logic-1 (D+ > D-)
|
# Threshold for differential voltage: >0 = logic-1 (D+ > D-)
|
||||||
DAT_THRESH_V = 0.0
|
DAT_THRESH_V = 0.0
|
||||||
|
|
||||||
|
# Single-ended LP file thresholds (CH1=CLK+, CH3=DAT0+).
|
||||||
|
# In HS mode both CLK+ and DAT+ oscillate around the D-PHY common mode (~200 mV).
|
||||||
|
LP_SE_CLK_THRESH_V = 0.20 # CLK+ zero-crossing threshold for edge detection
|
||||||
|
LP_SE_DAT_THRESH_V = 0.20 # DAT+ HS bit threshold (> this = logic 1)
|
||||||
|
LP_SE_LP01_THRESH_V = 0.25 # DAT+ < this during LP-01/LP-00 SoT preamble
|
||||||
|
|
||||||
|
# Expected Lane 0 payload byte pattern for a static-pink display (R=0xFF G=0x33 B=0xBB).
|
||||||
|
# With 4-lane RGB888, Lane 0 carries every 4th byte of the full payload beginning at
|
||||||
|
# offset 0. The 12-byte boundary aligns R/G/B of consecutive pixels so Lane 0 sees:
|
||||||
|
# offset 0 → pixel 0 R = 0xFF
|
||||||
|
# offset 4 → pixel 1 G = 0x33
|
||||||
|
# offset 8 → pixel 2 B = 0xBB
|
||||||
|
# offset 12 → pixel 4 R = 0xFF (repeats)
|
||||||
|
# → 3-byte repeating cycle [0xFF, 0x33, 0xBB] on Lane 0.
|
||||||
|
STATIC_PINK_LANE0 = (0xFF, 0x33, 0xBB)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# I/O
|
# I/O
|
||||||
@@ -72,6 +91,18 @@ def find_proto_files(cap_num: int, data_dir: Path):
|
|||||||
return Path(clk_files[-1]), Path(dat_files[-1])
|
return Path(clk_files[-1]), Path(dat_files[-1])
|
||||||
|
|
||||||
|
|
||||||
|
def find_lp_files(cap_num: int, data_dir: Path):
|
||||||
|
pattern_clk = str(data_dir / f"*_lp_{cap_num:04d}_clk.csv")
|
||||||
|
pattern_dat = str(data_dir / f"*_lp_{cap_num:04d}_dat.csv")
|
||||||
|
clk_files = sorted(glob.glob(pattern_clk))
|
||||||
|
dat_files = sorted(glob.glob(pattern_dat))
|
||||||
|
if not clk_files:
|
||||||
|
raise FileNotFoundError(f"No LP CLK file found for cap {cap_num:04d} in {data_dir}")
|
||||||
|
if not dat_files:
|
||||||
|
raise FileNotFoundError(f"No LP DAT file found for cap {cap_num:04d} in {data_dir}")
|
||||||
|
return Path(clk_files[-1]), Path(dat_files[-1])
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Clock edge detection
|
# Clock edge detection
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -102,25 +133,91 @@ def find_clock_edges(t_clk, v_clk, threshold=0.0):
|
|||||||
# HS burst detection
|
# HS burst detection
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0):
|
def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0, single_ended=False):
|
||||||
"""
|
"""
|
||||||
Find the start of the post-LP HS burst in the DAT trace.
|
Find the start of the post-LP HS burst in the DAT trace.
|
||||||
|
|
||||||
For LP-triggered captures (trigger = DAT D+ falling at LP-11→LP-01 transition):
|
single_ended=True — LP files (CH1=CLK+, CH3=DAT0+): detects LP-01/LP-00
|
||||||
- CLK is in continuous HS mode throughout (215 MHz running)
|
as DAT+ < LP_SE_LP01_THRESH_V for ≥ 20 ns, then returns
|
||||||
- DAT shows LP-01 (diff ≈ -1 V) near t=0, preceded by HS data from the
|
index 50 ns after the plateau ends (HS common-mode rise).
|
||||||
previous line and possibly an earlier LP-01 at the start of the capture
|
Search starts at index 0 — LP-11 pre-trigger (~1.2 V)
|
||||||
- LP-00 follows LP-01 briefly (~50-200 ns), then the new HS burst begins
|
is well above the threshold so no false matches.
|
||||||
- To avoid the LP-01 from the previous line (at capture start), search
|
single_ended=False — Proto files (F2=CH3-CH4 differential): LP-01 detected
|
||||||
from N//4 onwards — the trigger LP-01 is at the capture midpoint (t=0)
|
as diff < -0.5 V for ≥ 20 ns, search from N//4.
|
||||||
|
|
||||||
Returns index into t_dat just past LP-00, ready for CLK-edge sampling.
|
Returns index into t_dat just past the SoT preamble, ready for CLK-edge sampling.
|
||||||
Falls back to original std-based method for HS-triggered captures.
|
Falls back to rolling-std method for HS-triggered captures (differential only).
|
||||||
"""
|
"""
|
||||||
dt_ns = float(np.median(np.diff(t_dat))) * 1e9
|
dt_ns = float(np.median(np.diff(t_dat))) * 1e9
|
||||||
N = len(v_dat)
|
N = len(v_dat)
|
||||||
|
|
||||||
# --- LP-triggered path ---
|
# --- Single-ended LP path ---
|
||||||
|
# LP-01 + LP-00 + HS-PREPARE + HS-ZERO form a continuous "LP-low" region where
|
||||||
|
# DAT+ < 0.25 V and rolling std < 45 mV. The LP-low region ends when the first
|
||||||
|
# '1' bit transition in 0xB8 causes rolling std > 45 mV. Start bit decoding a
|
||||||
|
# few bits BEFORE that spike so the phase search can find complete 0xB8 near byte 0.
|
||||||
|
if single_ended:
|
||||||
|
LP11_THRESH_SE = 0.8 # V — LP-11 state (DAT+ high)
|
||||||
|
LP_LOW_V_SE = 0.25 # V — LP-01/LP-00/HS-ZERO are all below this
|
||||||
|
HS_STD_V_SE = 0.045 # V — rolling std above this → first HS data bit
|
||||||
|
LP_LOW_MIN_NS = 5.0 # ns — ignore LP-low runs shorter than this
|
||||||
|
LP_MARGIN_NS = 25.0 # ns — start decode this far before first data bit
|
||||||
|
|
||||||
|
win_samples = max(10, int(1.0 / dt_ns))
|
||||||
|
try:
|
||||||
|
from numpy.lib.stride_tricks import sliding_window_view
|
||||||
|
rstd = np.zeros(N)
|
||||||
|
wins = sliding_window_view(v_dat, win_samples)
|
||||||
|
rstd[win_samples - 1:win_samples - 1 + len(wins)] = wins.std(axis=-1)
|
||||||
|
except Exception:
|
||||||
|
rstd = np.array([v_dat[max(0, i - win_samples):i + 1].std() for i in range(N)])
|
||||||
|
|
||||||
|
# Find LP-11 end (first sample below LP11_THRESH_SE after LP-11)
|
||||||
|
lp11_end_idx = None
|
||||||
|
in_lp11 = False
|
||||||
|
for i in range(N):
|
||||||
|
if v_dat[i] > LP11_THRESH_SE:
|
||||||
|
in_lp11 = True
|
||||||
|
elif in_lp11:
|
||||||
|
lp11_end_idx = i
|
||||||
|
break
|
||||||
|
if lp11_end_idx is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
search_end = min(lp11_end_idx + int(2000.0 / dt_ns), N)
|
||||||
|
|
||||||
|
# Find LP-low plateau start: first sustained block of v < LP_LOW_V_SE
|
||||||
|
# AND rstd < HS_STD_V_SE (the LP-11 fall edge has high rstd so we skip it).
|
||||||
|
min_lp_run = max(5, int(LP_LOW_MIN_NS / dt_ns))
|
||||||
|
lp_low_start = None
|
||||||
|
run = 0
|
||||||
|
for i in range(lp11_end_idx, search_end):
|
||||||
|
if v_dat[i] < LP_LOW_V_SE and rstd[i] < HS_STD_V_SE:
|
||||||
|
run += 1
|
||||||
|
if run >= min_lp_run:
|
||||||
|
lp_low_start = i - run + 1
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
run = 0
|
||||||
|
if lp_low_start is None:
|
||||||
|
return min(lp11_end_idx + max(1, int(50.0 / dt_ns)), N - 1)
|
||||||
|
|
||||||
|
# Find LP-low plateau end: first rstd > HS_STD_V_SE after the plateau begins.
|
||||||
|
# This is where the first '1' bit in 0xB8 creates a large voltage transition.
|
||||||
|
lp_low_end = None
|
||||||
|
for i in range(lp_low_start, search_end):
|
||||||
|
if rstd[i] > HS_STD_V_SE:
|
||||||
|
lp_low_end = i
|
||||||
|
break
|
||||||
|
if lp_low_end is None:
|
||||||
|
return min(lp_low_start + max(1, int(50.0 / dt_ns)), N - 1)
|
||||||
|
|
||||||
|
# Start decode LP_MARGIN_NS before the first '1' bit of 0xB8 so the 8-phase
|
||||||
|
# search sees the complete sync byte near byte 0.
|
||||||
|
margin = max(1, int(LP_MARGIN_NS / dt_ns))
|
||||||
|
return max(lp_low_start, lp_low_end - margin)
|
||||||
|
|
||||||
|
# --- Differential LP-triggered path ---
|
||||||
# LP-01: D+ = 0 V, D- = high → diff strongly negative (< -0.5 V for ≥ 20 ns)
|
# LP-01: D+ = 0 V, D- = high → diff strongly negative (< -0.5 V for ≥ 20 ns)
|
||||||
LP01_THRESH = -0.5
|
LP01_THRESH = -0.5
|
||||||
min_lp01 = max(2, int(20.0 / dt_ns))
|
min_lp01 = max(2, int(20.0 / dt_ns))
|
||||||
@@ -138,7 +235,6 @@ def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0):
|
|||||||
run = 0
|
run = 0
|
||||||
|
|
||||||
if lp01_end is not None:
|
if lp01_end is not None:
|
||||||
# Skip 200 ns past LP-01 end to clear LP-00, then hand off to bit decoder
|
|
||||||
skip = max(1, int(200.0 / dt_ns))
|
skip = max(1, int(200.0 / dt_ns))
|
||||||
return min(lp01_end + skip, N - 1)
|
return min(lp01_end + skip, N - 1)
|
||||||
|
|
||||||
@@ -182,17 +278,25 @@ def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0):
|
|||||||
# Bit decoding
|
# Bit decoding
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx):
|
def decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx,
|
||||||
|
dat_thresh=None, clk_thresh=None):
|
||||||
"""
|
"""
|
||||||
Sample DAT on every CLK edge (DDR) after hs_start_idx.
|
Sample DAT on every CLK edge (DDR) after hs_start_idx.
|
||||||
|
|
||||||
|
dat_thresh: voltage threshold for bit decisions on DAT (default: DAT_THRESH_V).
|
||||||
|
clk_thresh: voltage threshold for CLK edge detection (default: 0.0).
|
||||||
Returns list of (time_ns, bit) tuples.
|
Returns list of (time_ns, bit) tuples.
|
||||||
"""
|
"""
|
||||||
|
if dat_thresh is None:
|
||||||
|
dat_thresh = DAT_THRESH_V
|
||||||
|
if clk_thresh is None:
|
||||||
|
clk_thresh = 0.0
|
||||||
|
|
||||||
t_hs = t_dat[hs_start_idx]
|
t_hs = t_dat[hs_start_idx]
|
||||||
|
|
||||||
rising, falling = find_clock_edges(t_clk, v_clk)
|
rising, falling = find_clock_edges(t_clk, v_clk, threshold=clk_thresh)
|
||||||
all_edges = np.sort(np.concatenate([rising, falling]))
|
all_edges = np.sort(np.concatenate([rising, falling]))
|
||||||
|
|
||||||
# Only edges after HS start
|
|
||||||
hs_mask = t_clk[all_edges] >= t_hs
|
hs_mask = t_clk[all_edges] >= t_hs
|
||||||
hs_edges = all_edges[hs_mask]
|
hs_edges = all_edges[hs_mask]
|
||||||
|
|
||||||
@@ -204,10 +308,9 @@ def decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx):
|
|||||||
bits = []
|
bits = []
|
||||||
for edge_idx in hs_edges:
|
for edge_idx in hs_edges:
|
||||||
t_edge = t_clk[edge_idx]
|
t_edge = t_clk[edge_idx]
|
||||||
# Find nearest sample in DAT trace
|
|
||||||
dat_idx = int(round((t_edge - t_dat[0]) / (dt_dat * 1e-9)))
|
dat_idx = int(round((t_edge - t_dat[0]) / (dt_dat * 1e-9)))
|
||||||
dat_idx = max(0, min(dat_idx, len(v_dat) - 1))
|
dat_idx = max(0, min(dat_idx, len(v_dat) - 1))
|
||||||
bit = 1 if v_dat[dat_idx] > DAT_THRESH_V else 0
|
bit = 1 if v_dat[dat_idx] > dat_thresh else 0
|
||||||
bits.append((t_edge * 1e9, bit))
|
bits.append((t_edge * 1e9, bit))
|
||||||
|
|
||||||
return bits
|
return bits
|
||||||
@@ -326,21 +429,37 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True):
|
|||||||
print(" ERROR: Too few bits decoded")
|
print(" ERROR: Too few bits decoded")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Try all 8 bit-phase offsets to handle framing uncertainty from LP-00 CLK edges.
|
# Try all 8 bit-phase offsets. Pass 1: find earliest 0xB8 whose next byte has
|
||||||
# LP-00 CLK edges before HS starts produce garbage bits; the correct phase is
|
# VC=0 and a known DSI DT (validated sync). Pass 2 fallback: earliest bare 0xB8.
|
||||||
# the one where 0xB8 appears earliest in the byte stream.
|
raw_bytes = None
|
||||||
raw_bytes = None
|
sync_idx = None
|
||||||
sync_idx = None
|
|
||||||
best_phase = 0
|
best_phase = 0
|
||||||
best_sync = len(bits) # sentinel: "not found"
|
best_sync = len(bits)
|
||||||
|
validated = False
|
||||||
|
|
||||||
for phase in range(8):
|
for phase in range(8):
|
||||||
rb = bits_to_bytes(bits[phase:])
|
rb = bits_to_bytes(bits[phase:])
|
||||||
si = find_sync_byte(rb)
|
for i in range(len(rb) - 1):
|
||||||
if si is not None and si < best_sync:
|
if rb[i][1] == HS_SYNC_BYTE:
|
||||||
best_sync = si
|
next_byte = rb[i + 1][1]
|
||||||
best_phase = phase
|
if (next_byte >> 6) == 0 and (next_byte & 0x3F) in VALID_DSI_DT:
|
||||||
raw_bytes = rb
|
if i < best_sync:
|
||||||
sync_idx = si
|
best_sync = i
|
||||||
|
best_phase = phase
|
||||||
|
raw_bytes = rb
|
||||||
|
sync_idx = i
|
||||||
|
validated = True
|
||||||
|
break # stop at first validated pair for this phase
|
||||||
|
|
||||||
|
if not validated:
|
||||||
|
for phase in range(8):
|
||||||
|
rb = bits_to_bytes(bits[phase:])
|
||||||
|
si = find_sync_byte(rb)
|
||||||
|
if si is not None and si < best_sync:
|
||||||
|
best_sync = si
|
||||||
|
best_phase = phase
|
||||||
|
raw_bytes = rb
|
||||||
|
sync_idx = si
|
||||||
|
|
||||||
if raw_bytes is None:
|
if raw_bytes is None:
|
||||||
raw_bytes = bits_to_bytes(bits)
|
raw_bytes = bits_to_bytes(bits)
|
||||||
@@ -352,7 +471,8 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True):
|
|||||||
else:
|
else:
|
||||||
if verbose:
|
if verbose:
|
||||||
t_sync = raw_bytes[sync_idx][0]
|
t_sync = raw_bytes[sync_idx][0]
|
||||||
print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})")
|
qual = "validated" if validated else "bare"
|
||||||
|
print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase}, {qual})")
|
||||||
|
|
||||||
# Data bytes after sync
|
# Data bytes after sync
|
||||||
data_bytes = raw_bytes[sync_idx + 1:] # skip the sync byte itself
|
data_bytes = raw_bytes[sync_idx + 1:] # skip the sync byte itself
|
||||||
@@ -388,6 +508,18 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True):
|
|||||||
print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})")
|
print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})")
|
||||||
print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}")
|
print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}")
|
||||||
|
|
||||||
|
# Static-pink pixel content check
|
||||||
|
if n_payload >= 12:
|
||||||
|
cc = check_pixel_content(lane0_payload)
|
||||||
|
match_str = (f"{cc['match_pct']:.0f}% of {cc['n_checked']} bytes "
|
||||||
|
f"match static-pink pattern")
|
||||||
|
if cc["first_mismatch"]:
|
||||||
|
mm = cc["first_mismatch"]
|
||||||
|
match_str += (f" (first diff at offset {mm[0]}: "
|
||||||
|
f"got 0x{mm[2]:02X} expected 0x{mm[1]:02X})")
|
||||||
|
print(f"\n Static-pink check : {match_str}")
|
||||||
|
|
||||||
|
pixel_check = check_pixel_content(lane0_payload) if len(lane0_payload) >= 12 else None
|
||||||
return {
|
return {
|
||||||
"cap_num" : cap_num,
|
"cap_num" : cap_num,
|
||||||
"hs_start_ns" : t_hs_start_ns,
|
"hs_start_ns" : t_hs_start_ns,
|
||||||
@@ -397,6 +529,164 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True):
|
|||||||
"sync_idx" : sync_idx,
|
"sync_idx" : sync_idx,
|
||||||
"header" : header,
|
"header" : header,
|
||||||
"lane0_payload" : lane0_payload,
|
"lane0_payload" : lane0_payload,
|
||||||
|
"pixel_check" : pixel_check,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# LP single-ended decode
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True):
|
||||||
|
"""
|
||||||
|
Full decode of an LP capture (CH1=CLK+, CH3=DAT0+) using single-ended thresholds.
|
||||||
|
|
||||||
|
LP files are captured at 10 GSa/s (100 ps/sample, ~23 samples/bit at 432 Mbps) —
|
||||||
|
sufficient resolution to decode the HS bit stream without a separate proto pass.
|
||||||
|
Returns a dict with the same structure as decode_capture().
|
||||||
|
"""
|
||||||
|
clk_path, dat_path = find_lp_files(cap_num, data_dir)
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
print(f"\n{'='*60}")
|
||||||
|
print(f"Cap {cap_num:04d}: {dat_path.name} [LP single-ended]")
|
||||||
|
print(f"{'='*60}")
|
||||||
|
|
||||||
|
t_clk, v_clk = load_csv(clk_path)
|
||||||
|
t_dat, v_dat = load_csv(dat_path)
|
||||||
|
dt_ns = float(np.median(np.diff(t_dat))) * 1e9
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs "
|
||||||
|
f"({len(t_dat)} samples, {dt_ns*1000:.0f} ps/sample)")
|
||||||
|
|
||||||
|
hs_start_idx = find_hs_start(t_dat, v_dat, t_clk, single_ended=True)
|
||||||
|
if hs_start_idx is None:
|
||||||
|
if verbose:
|
||||||
|
print(" ERROR: Could not find HS burst start")
|
||||||
|
return None
|
||||||
|
|
||||||
|
t_hs_start_ns = t_dat[hs_start_idx] * 1e9
|
||||||
|
t_hs_end_ns = t_dat[-1] * 1e9
|
||||||
|
hs_duration_us = (t_hs_end_ns - t_hs_start_ns) / 1000.0
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
print(f" HS burst start: {t_hs_start_ns:.0f} ns "
|
||||||
|
f"({hs_duration_us:.1f} µs available of ~18 µs full burst)")
|
||||||
|
|
||||||
|
# Auto-detect HS common mode from the first 200 ns of the HS burst.
|
||||||
|
# CLK+ common mode (~217 mV) and DAT+ common mode (~104 mV on this board) differ;
|
||||||
|
# hard-coding one value for DAT+ breaks the decode. The median of the HS burst
|
||||||
|
# gives the correct bit threshold for any board without manual calibration.
|
||||||
|
hs_probe_end = min(hs_start_idx + max(1, int(200.0 / dt_ns)), len(v_dat))
|
||||||
|
dat_common_mode = float(np.median(v_dat[hs_start_idx:hs_probe_end]))
|
||||||
|
dat_common_mode = max(0.030, min(0.250, dat_common_mode)) # clamp to 30–250 mV
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
print(f" DAT+ HS common mode: {dat_common_mode*1000:.0f} mV (auto-detected, used as bit threshold)")
|
||||||
|
|
||||||
|
bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx,
|
||||||
|
dat_thresh=dat_common_mode, clk_thresh=LP_SE_CLK_THRESH_V)
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)")
|
||||||
|
|
||||||
|
if len(bits) < 16:
|
||||||
|
if verbose:
|
||||||
|
print(" ERROR: Too few bits decoded")
|
||||||
|
return None
|
||||||
|
|
||||||
|
raw_bytes = None
|
||||||
|
sync_idx = None
|
||||||
|
best_phase = 0
|
||||||
|
best_sync = len(bits)
|
||||||
|
validated = False
|
||||||
|
|
||||||
|
for phase in range(8):
|
||||||
|
rb = bits_to_bytes(bits[phase:])
|
||||||
|
for i in range(len(rb) - 1):
|
||||||
|
if rb[i][1] == HS_SYNC_BYTE:
|
||||||
|
next_byte = rb[i + 1][1]
|
||||||
|
if (next_byte >> 6) == 0 and (next_byte & 0x3F) in VALID_DSI_DT:
|
||||||
|
if i < best_sync:
|
||||||
|
best_sync = i
|
||||||
|
best_phase = phase
|
||||||
|
raw_bytes = rb
|
||||||
|
sync_idx = i
|
||||||
|
validated = True
|
||||||
|
break # stop at first validated pair for this phase
|
||||||
|
|
||||||
|
if not validated:
|
||||||
|
for phase in range(8):
|
||||||
|
rb = bits_to_bytes(bits[phase:])
|
||||||
|
si = find_sync_byte(rb)
|
||||||
|
if si is not None and si < best_sync:
|
||||||
|
best_sync = si
|
||||||
|
best_phase = phase
|
||||||
|
raw_bytes = rb
|
||||||
|
sync_idx = si
|
||||||
|
|
||||||
|
if raw_bytes is None:
|
||||||
|
raw_bytes = bits_to_bytes(bits)
|
||||||
|
|
||||||
|
if sync_idx is None:
|
||||||
|
if verbose:
|
||||||
|
print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found in any bit phase — using raw byte 0")
|
||||||
|
sync_idx = 0
|
||||||
|
else:
|
||||||
|
if verbose:
|
||||||
|
t_sync = raw_bytes[sync_idx][0]
|
||||||
|
qual = "validated" if validated else "bare"
|
||||||
|
print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase}, {qual})")
|
||||||
|
|
||||||
|
data_bytes = raw_bytes[sync_idx + 1:]
|
||||||
|
header = parse_long_packet_header([b for _, b in data_bytes[:8]])
|
||||||
|
|
||||||
|
if verbose and header:
|
||||||
|
print(f"\n DSI Header (lane 0):")
|
||||||
|
print(f" DI = 0x{header['DI_raw']:02X} → VC={header['VC']} DT=0x{header['DT']:02X} ({header['DT_name']})")
|
||||||
|
|
||||||
|
lane0_payload = [b for _, b in data_bytes[1:]]
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
n_payload = len(lane0_payload)
|
||||||
|
n_pixels_partial = n_payload * N_LANES // (BPP // 8)
|
||||||
|
print(f"\n Lane 0 payload: {n_payload} bytes decoded (≈ first {n_pixels_partial} pixels' components)")
|
||||||
|
|
||||||
|
if n_payload >= 16:
|
||||||
|
hex_str = " ".join(f"{b:02X}" for b in lane0_payload[:64])
|
||||||
|
print(f" First 64 payload bytes: {hex_str}")
|
||||||
|
if n_payload > 64:
|
||||||
|
print(f" ...")
|
||||||
|
|
||||||
|
nonzero_idx = next((i for i, b in enumerate(lane0_payload) if b != 0x00), None)
|
||||||
|
if nonzero_idx is None:
|
||||||
|
print(f"\n All {n_payload} payload bytes are 0x00 (blank / border region)")
|
||||||
|
else:
|
||||||
|
print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})")
|
||||||
|
print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}")
|
||||||
|
|
||||||
|
if n_payload >= 12:
|
||||||
|
cc = check_pixel_content(lane0_payload)
|
||||||
|
match_str = (f"{cc['match_pct']:.0f}% of {cc['n_checked']} bytes "
|
||||||
|
f"match static-pink pattern")
|
||||||
|
if cc["first_mismatch"]:
|
||||||
|
mm = cc["first_mismatch"]
|
||||||
|
match_str += (f" (first diff at offset {mm[0]}: "
|
||||||
|
f"got 0x{mm[2]:02X} expected 0x{mm[1]:02X})")
|
||||||
|
print(f"\n Static-pink check : {match_str}")
|
||||||
|
|
||||||
|
pixel_check = check_pixel_content(lane0_payload) if len(lane0_payload) >= 12 else None
|
||||||
|
return {
|
||||||
|
"cap_num" : cap_num,
|
||||||
|
"hs_start_ns" : t_hs_start_ns,
|
||||||
|
"hs_duration_us" : hs_duration_us,
|
||||||
|
"n_bits" : len(bits),
|
||||||
|
"n_bytes" : len(raw_bytes),
|
||||||
|
"sync_idx" : sync_idx,
|
||||||
|
"header" : header,
|
||||||
|
"lane0_payload" : lane0_payload,
|
||||||
|
"pixel_check" : pixel_check,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -450,32 +740,175 @@ def compare_captures(cap_a: int, cap_b: int, data_dir: Path, n_bytes: int = 128)
|
|||||||
print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)")
|
print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)")
|
||||||
|
|
||||||
|
|
||||||
|
def compare_lp_captures(cap_a: int, cap_b: int, data_dir: Path, n_bytes: int = 128):
|
||||||
|
"""
|
||||||
|
Decode both LP captures and report byte-level differences in the first n_bytes.
|
||||||
|
"""
|
||||||
|
print(f"\nComparing LP cap {cap_a:04d} vs cap {cap_b:04d} (first {n_bytes} payload bytes on lane 0)")
|
||||||
|
|
||||||
|
res_a = decode_lp_capture(cap_a, data_dir, verbose=False)
|
||||||
|
res_b = decode_lp_capture(cap_b, data_dir, verbose=False)
|
||||||
|
|
||||||
|
if res_a is None or res_b is None:
|
||||||
|
print(" ERROR: Could not decode one or both LP captures")
|
||||||
|
return
|
||||||
|
|
||||||
|
pa = res_a["lane0_payload"][:n_bytes]
|
||||||
|
pb = res_b["lane0_payload"][:n_bytes]
|
||||||
|
|
||||||
|
n_compare = min(len(pa), len(pb), n_bytes)
|
||||||
|
diffs = [(i, pa[i], pb[i]) for i in range(n_compare) if pa[i] != pb[i]]
|
||||||
|
|
||||||
|
print(f" Cap {cap_a:04d}: {len(pa)} bytes available, DI=0x{res_a['header']['DI_raw']:02X} HS_start={res_a['hs_start_ns']:.0f}ns")
|
||||||
|
print(f" Cap {cap_b:04d}: {len(pb)} bytes available, DI=0x{res_b['header']['DI_raw']:02X} HS_start={res_b['hs_start_ns']:.0f}ns")
|
||||||
|
|
||||||
|
if not diffs:
|
||||||
|
print(f"\n No differences in first {n_compare} bytes — data content matches.")
|
||||||
|
else:
|
||||||
|
print(f"\n {len(diffs)} byte differences in first {n_compare} bytes:")
|
||||||
|
print(f" {'Offset':>8} {'Cap_A':>6} {'Cap_B':>6}")
|
||||||
|
for offset, ba, bb in diffs[:40]:
|
||||||
|
pixel_group = offset * N_LANES // (BPP // 8)
|
||||||
|
print(f" {offset:>8} 0x{ba:02X} 0x{bb:02X} (pixel group ≈ {pixel_group})")
|
||||||
|
if len(diffs) > 40:
|
||||||
|
print(f" ... ({len(diffs) - 40} more)")
|
||||||
|
|
||||||
|
if len(pa) > 8 and len(pb) > 8:
|
||||||
|
pa_arr = np.array(pa[:n_compare], dtype=np.uint8)
|
||||||
|
pb_arr = np.array(pb[:n_compare], dtype=np.uint8)
|
||||||
|
xcorr = np.correlate(pa_arr.astype(float) - pa_arr.mean(),
|
||||||
|
pb_arr.astype(float) - pb_arr.mean(), mode="full")
|
||||||
|
lag = int(np.argmax(np.abs(xcorr))) - (n_compare - 1)
|
||||||
|
if lag != 0 and abs(lag) < n_compare // 2:
|
||||||
|
print(f"\n Cross-correlation peak at lag={lag} bytes → data may be shifted by {lag} bytes between captures")
|
||||||
|
else:
|
||||||
|
print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Pixel content verification and anomaly analysis
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def check_pixel_content(lane0_payload: list, n_check: int = 60) -> dict:
|
||||||
|
"""
|
||||||
|
Verify the first n_check Lane 0 payload bytes against the expected static-pink
|
||||||
|
pattern STATIC_PINK_LANE0. Returns a dict:
|
||||||
|
match_pct — percentage of bytes matching expected pattern
|
||||||
|
n_mismatches — number of mismatching bytes in the checked window
|
||||||
|
first_mismatch — (offset, expected_byte, actual_byte) or None
|
||||||
|
n_checked — number of bytes examined
|
||||||
|
"""
|
||||||
|
check = lane0_payload[:n_check]
|
||||||
|
if not check:
|
||||||
|
return {"match_pct": None, "n_mismatches": 0,
|
||||||
|
"first_mismatch": None, "n_checked": 0}
|
||||||
|
mismatches = [
|
||||||
|
(i, STATIC_PINK_LANE0[i % 3], actual)
|
||||||
|
for i, actual in enumerate(check)
|
||||||
|
if actual != STATIC_PINK_LANE0[i % 3]
|
||||||
|
]
|
||||||
|
return {
|
||||||
|
"match_pct": round((1 - len(mismatches) / len(check)) * 100, 1),
|
||||||
|
"n_mismatches": len(mismatches),
|
||||||
|
"first_mismatch": mismatches[0] if mismatches else None,
|
||||||
|
"n_checked": len(check),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def analyse_for_anomalies(result: dict | None) -> dict:
|
||||||
|
"""
|
||||||
|
Summarise bit-level anomalies from a decode_capture() result.
|
||||||
|
Returns {"anomalous": bool, "flags": list[str]}.
|
||||||
|
|
||||||
|
Checks:
|
||||||
|
sync_byte_not_found — 0xB8 not found in any of 8 bit phases →
|
||||||
|
HS burst may not have started properly
|
||||||
|
sync_byte_late — 0xB8 found but at byte index > 5 →
|
||||||
|
garbage precedes sync → possible byte misalignment
|
||||||
|
unexpected_packet_type — DI data-type not in the expected set
|
||||||
|
pixel_content_mismatch — Lane 0 payload < 90 % match to static-pink pattern
|
||||||
|
"""
|
||||||
|
if result is None:
|
||||||
|
return {"anomalous": True, "flags": ["decode_failed"]}
|
||||||
|
|
||||||
|
flags = []
|
||||||
|
|
||||||
|
sync_idx = result.get("sync_idx")
|
||||||
|
if sync_idx is None:
|
||||||
|
flags.append("sync_byte_not_found — HS burst may not have started")
|
||||||
|
elif sync_idx > 5:
|
||||||
|
flags.append(
|
||||||
|
f"sync_byte_late (found at byte {sync_idx}, expected ≤ 5) — "
|
||||||
|
f"possible byte misalignment"
|
||||||
|
)
|
||||||
|
|
||||||
|
header = result.get("header")
|
||||||
|
if header:
|
||||||
|
dt = header.get("DT", -1)
|
||||||
|
known = {DSI_DT_RGB888, 0x39, DSI_DT_HSYNC, DSI_DT_VSYNC,
|
||||||
|
0x31, 0x11, 0x29, 0x08, 0x09, 0x19}
|
||||||
|
if dt not in known:
|
||||||
|
flags.append(f"unexpected_packet_type DT=0x{dt:02X}")
|
||||||
|
|
||||||
|
payload = result.get("lane0_payload", [])
|
||||||
|
if len(payload) >= 12:
|
||||||
|
cc = check_pixel_content(payload)
|
||||||
|
if cc["match_pct"] is not None and cc["match_pct"] < 90.0:
|
||||||
|
mm = cc["first_mismatch"]
|
||||||
|
detail = (
|
||||||
|
f"first diff at byte {mm[0]}: got 0x{mm[2]:02X} expected 0x{mm[1]:02X}"
|
||||||
|
if mm else ""
|
||||||
|
)
|
||||||
|
flags.append(
|
||||||
|
f"pixel_content_mismatch "
|
||||||
|
f"({cc['match_pct']:.0f}% of {cc['n_checked']} bytes match; {detail})"
|
||||||
|
)
|
||||||
|
|
||||||
|
return {"anomalous": bool(flags), "flags": flags}
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# CLI
|
# CLI
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description="Decode DSI packet content from proto captures")
|
parser = argparse.ArgumentParser(description="Decode DSI packet content from proto or LP captures")
|
||||||
parser.add_argument("--cap" , type=int, default=214, help="Capture number to decode (default: 214)")
|
parser.add_argument("--cap" , type=int, default=214, help="Capture number to decode (default: 214)")
|
||||||
parser.add_argument("--dir" , type=str, default=str(DATA_DIR), help="Data directory")
|
parser.add_argument("--dir" , type=str, default=str(DATA_DIR), help="Data directory")
|
||||||
parser.add_argument("--compare", type=int, default=None,
|
parser.add_argument("--compare", type=int, default=None,
|
||||||
metavar="CAP_B",
|
metavar="CAP_B",
|
||||||
help="Compare --cap against CAP_B byte-by-byte")
|
help="Compare --cap against CAP_B byte-by-byte")
|
||||||
parser.add_argument("--list" , action="store_true", help="List available proto captures")
|
parser.add_argument("--lp" , action="store_true",
|
||||||
|
help="Decode from LP single-ended files instead of proto differential files")
|
||||||
|
parser.add_argument("--list" , action="store_true", help="List available captures")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
data_dir = Path(args.dir)
|
data_dir = Path(args.dir)
|
||||||
|
|
||||||
if args.list:
|
if args.list:
|
||||||
files = sorted(data_dir.glob("*_proto_*_dat.csv"))
|
proto_files = sorted(data_dir.glob("*_proto_*_dat.csv"))
|
||||||
caps = sorted({int(f.stem.split("_")[-2]) for f in files})
|
proto_caps = sorted({int(f.stem.split("_")[-2]) for f in proto_files})
|
||||||
print(f"Available proto captures: {caps}")
|
lp_files = sorted(data_dir.glob("*_lp_*_dat.csv"))
|
||||||
|
lp_caps = sorted({int(f.stem.split("_")[-2]) for f in lp_files})
|
||||||
|
print(f"Available proto captures: {proto_caps}")
|
||||||
|
print(f"Available LP captures: {lp_caps}")
|
||||||
return
|
return
|
||||||
|
|
||||||
if args.compare is not None:
|
if args.compare is not None:
|
||||||
compare_captures(args.cap, args.compare, data_dir)
|
if args.lp:
|
||||||
|
compare_lp_captures(args.cap, args.compare, data_dir)
|
||||||
|
else:
|
||||||
|
compare_captures(args.cap, args.compare, data_dir)
|
||||||
else:
|
else:
|
||||||
decode_capture(args.cap, data_dir, verbose=True)
|
if args.lp:
|
||||||
|
result = decode_lp_capture(args.cap, data_dir, verbose=True)
|
||||||
|
else:
|
||||||
|
result = decode_capture(args.cap, data_dir, verbose=True)
|
||||||
|
anomaly = analyse_for_anomalies(result)
|
||||||
|
if anomaly["anomalous"]:
|
||||||
|
print(f"\n*** BIT-LEVEL ANOMALIES: {', '.join(anomaly['flags'])} ***")
|
||||||
|
else:
|
||||||
|
print(f"\nNo bit-level anomalies detected (sync, packet type, pixel content all OK)")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user