diff --git a/__pycache__/csv_preprocessor.cpython-312.pyc b/__pycache__/csv_preprocessor.cpython-312.pyc index 46c8935..9578d74 100644 Binary files a/__pycache__/csv_preprocessor.cpython-312.pyc and b/__pycache__/csv_preprocessor.cpython-312.pyc differ diff --git a/__pycache__/explode_h5.cpython-312.pyc b/__pycache__/explode_h5.cpython-312.pyc new file mode 100644 index 0000000..5f916bb Binary files /dev/null and b/__pycache__/explode_h5.cpython-312.pyc differ diff --git a/csv_preprocessor.py b/csv_preprocessor.py index 78f42db..e0a368d 100644 --- a/csv_preprocessor.py +++ b/csv_preprocessor.py @@ -892,12 +892,27 @@ def analyze_lp_file(path: Path) -> "LPMetrics": HS : voltage in mid-range with high oscillation (rolling std > HS_OSC_STD_V) trans : everything else (transitions between states) """ - m = re.match(r"(\d{8}_\d{6})_lp_(\d+)_(clk|dat)\.csv", path.name, re.IGNORECASE) + # Accept three filename formats: + # legacy: "_lp_0001_" + # watch: "_lp_c001_01_" + # segmented: "_lp_c001_01_seg005_" (one segment exploded from H5) + m = re.match( + r"(\d{8}_\d{6})_lp_(c\d+_\d+(?:_seg\d+)?|\d+)_(clk|dat)\.csv", + path.name, re.IGNORECASE, + ) if not m: raise ValueError(f"Filename does not match lp pattern: {path.name}") timestamp, cap_str, channel = m.groups() - capture_num = int(cap_str) + # Derive an int capture_num from whatever digits the id contains, so it + # remains sortable (e.g., c001_01_seg005 → 1*1_000_000 + 1*1_000 + 5). + digit_groups = re.findall(r"\d+", cap_str) + if len(digit_groups) == 1: + capture_num = int(digit_groups[0]) + else: + capture_num = 0 + for i, d in enumerate(reversed(digit_groups)): + capture_num += int(d) * (1000 ** i) times, volts = _read_csv(path) dt = float(np.diff(times).mean()) diff --git a/explode_h5.py b/explode_h5.py new file mode 100644 index 0000000..410ad34 --- /dev/null +++ b/explode_h5.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 +""" +explode_h5.py — split a Keysight segmented H5 file into per-segment CSVs. + +When the scope is in segmented memory mode, a single :DISK:SAVE:WAVeform +call dumps all N segments into one .h5 file (much faster than saving N CSVs +sequentially). This script splits that file back into individual CSVs whose +names match the lp_ pattern that csv_preprocessor.analyze_lp_file() expects: + + {ts}_lp_{cap_id}_seg{NNN}_{clk|dat}.csv + +Usage: + python3 explode_h5.py [ ...] + +Or import explode() from this module. + +Notes on Keysight Infiniium H5 layout: + The format used by :DISK:SAVE:WAVeform ... ,H5 nests waveform datasets + inside a "Waveforms"/"Channel N" group, with attributes XInc, XOrg, + YInc, YOrg, NumSegments, NumPoints, etc. We probe the structure + dynamically because slight variations exist between firmware versions. +""" + +from __future__ import annotations + +import re +import sys +from pathlib import Path + +import h5py +import numpy as np + +LP_NAME_RE = re.compile( + r"(?P\d{8}_\d{6})_lp_(?Pc\d+_\d+|\d+)_(?Pclk|dat)\.h5", + re.IGNORECASE, +) + + +def _walk(grp, depth: int = 0, max_depth: int = 4) -> list[tuple[str, h5py.Group]]: + """Return all groups under `grp` up to max_depth, with their full paths.""" + out = [(grp.name, grp)] + if depth >= max_depth: + return out + if isinstance(grp, h5py.Group): + for k in grp.keys(): + try: + child = grp[k] + except Exception: + continue + if isinstance(child, h5py.Group): + out.extend(_walk(child, depth + 1, max_depth)) + return out + + +def _find_segments(h5_root) -> tuple[h5py.Group, list[str], dict]: + """ + Locate the group that contains per-segment waveform datasets. + + Returns (group, sorted_dataset_keys, attrs_dict). The attrs dict merges + attributes from the root, parent, and target group so we can find + XInc / XOrg / YInc / YOrg wherever Keysight chose to put them. + """ + groups = _walk(h5_root) + + # Score each group by how many child *datasets* it has (segments are + # typically datasets named "Waveform 1", "Waveform 2", ... or + # "Channel 1", or just "1", "2", ...). + best = None + best_count = 0 + for path, grp in groups: + if not isinstance(grp, h5py.Group): + continue + ds_keys = [k for k in grp.keys() if isinstance(grp[k], h5py.Dataset)] + # Filter: only datasets whose shape looks like a 1-D voltage trace + ds_keys = [ + k for k in ds_keys + if grp[k].ndim == 1 and grp[k].size > 100 + ] + if len(ds_keys) > best_count: + best_count = len(ds_keys) + best = (grp, ds_keys) + + if best is None or best_count == 0: + # 2-D dataset case: a single dataset of shape (N_segments, N_points) + for path, grp in groups: + for k in grp.keys() if isinstance(grp, h5py.Group) else []: + ds = grp[k] + if isinstance(ds, h5py.Dataset) and ds.ndim == 2 and ds.shape[0] > 1 and ds.shape[1] > 100: + return grp, [k], _collect_attrs(h5_root, grp, ds) + raise ValueError("No segment datasets found in H5") + + grp, ds_keys = best + # Numerical sort if keys end with digits + ds_keys.sort(key=lambda s: ( + int(re.search(r"\d+", s).group()) if re.search(r"\d+", s) else 0 + )) + return grp, ds_keys, _collect_attrs(h5_root, grp) + + +def _collect_attrs(*scopes) -> dict: + """Merge attrs from multiple HDF5 nodes (later overrides earlier).""" + out = {} + for s in scopes: + try: + out.update({k: s.attrs[k] for k in s.attrs}) + except Exception: + pass + return out + + +def _attr(attrs: dict, *names, default=None): + """Return the first attribute that exists from a list of candidate names.""" + for n in names: + if n in attrs: + v = attrs[n] + try: + # numpy scalar/bytes to native python + if isinstance(v, (bytes, bytearray)): + v = v.decode(errors="ignore") + if hasattr(v, "item") and getattr(v, "size", 1) == 1: + v = v.item() + except Exception: + pass + return v + return default + + +def explode(h5_path: Path, out_dir: Path | None = None, + verbose: bool = False) -> list[Path]: + """ + Split `h5_path` into per-segment CSVs. + + Returns the list of CSV paths written. CSVs are placed in `out_dir` + (default: same dir as h5_path). + """ + h5_path = Path(h5_path) + out_dir = Path(out_dir) if out_dir else h5_path.parent + out_dir.mkdir(parents=True, exist_ok=True) + + name_match = LP_NAME_RE.match(h5_path.name) + if not name_match: + raise ValueError(f"Not an LP H5 filename: {h5_path.name}") + ts = name_match["ts"] + cap_id = name_match["id"] + chan = name_match["chan"] + + csvs: list[Path] = [] + with h5py.File(h5_path, "r") as f: + grp, ds_keys, attrs = _find_segments(f) + x_inc = float(_attr(attrs, "XInc", "XIncrement", "x_increment", default=1e-10)) + x_org = float(_attr(attrs, "XOrg", "XOrigin", "x_origin", default=0.0)) + y_inc = _attr(attrs, "YInc", "YIncrement", "y_increment", default=None) + y_org = _attr(attrs, "YOrg", "YOrigin", "y_origin", default=None) + + if verbose: + print(f" group: {grp.name} segments: {len(ds_keys)} " + f"XInc={x_inc:.3e} XOrg={x_org:.3e} YInc={y_inc} YOrg={y_org}") + + # Single 2-D dataset case: shape (N_segments, N_points) + if len(ds_keys) == 1 and grp[ds_keys[0]].ndim == 2: + ds = grp[ds_keys[0]][:] + for i in range(ds.shape[0]): + volts = np.asarray(ds[i], dtype=float) + if y_inc is not None and y_org is not None: + volts = volts * float(y_inc) + float(y_org) + csvs.append(_write_segment_csv( + out_dir, ts, cap_id, chan, i + 1, x_inc, x_org, volts, + )) + return csvs + + # Multi-dataset case: each dataset is one segment + for i, key in enumerate(ds_keys, start=1): + volts = np.asarray(grp[key][:], dtype=float) + if y_inc is not None and y_org is not None: + # Some Keysight files store raw codes that need scaling + if np.issubdtype(grp[key].dtype, np.integer): + volts = volts * float(y_inc) + float(y_org) + csvs.append(_write_segment_csv( + out_dir, ts, cap_id, chan, i, x_inc, x_org, volts, + )) + return csvs + + +def _write_segment_csv(out_dir: Path, ts: str, cap_id: str, chan: str, + seg_idx: int, x_inc: float, x_org: float, + volts: np.ndarray) -> Path: + n = len(volts) + times = np.arange(n) * x_inc + x_org + csv_path = out_dir / f"{ts}_lp_{cap_id}_seg{seg_idx:03d}_{chan}.csv" + np.savetxt( + csv_path, + np.column_stack([times, volts]), + delimiter=",", + fmt="%.6e", + ) + return csv_path + + +def inspect(h5_path: Path) -> None: + """Print the H5 hierarchy + attrs. Useful for debugging unknown files.""" + with h5py.File(h5_path, "r") as f: + def visit(name, obj): + if isinstance(obj, h5py.Group): + kind = "GROUP" + shape = "" + else: + kind = "DSET" + shape = f" shape={obj.shape} dtype={obj.dtype}" + print(f" {kind} /{name}{shape}") + for k, v in obj.attrs.items(): + vs = str(v)[:60] + print(f" attr {k} = {vs}") + f.visititems(visit) + + +if __name__ == "__main__": + args = sys.argv[1:] + if not args: + print(__doc__) + sys.exit(0) + if args[0] == "--inspect": + for p in args[1:]: + print(f"\n=== {p} ===") + inspect(Path(p)) + sys.exit(0) + for p in args: + try: + outs = explode(Path(p), verbose=True) + print(f"{Path(p).name}: {len(outs)} segment(s) → CSVs") + except Exception as e: + print(f"{Path(p).name}: ERROR — {e}") diff --git a/flicker_watch.py b/flicker_watch.py index 7f87d71..02fd1e9 100644 --- a/flicker_watch.py +++ b/flicker_watch.py @@ -49,8 +49,16 @@ LP_V_SCALE = 0.2 LP_V_OFFSET = 0.6 LP_TRIG_LEVEL = 0.6 +# Segmented memory: capture N back-to-back LP triggers per :DIGitize, then +# dump the whole acquisition as a single H5 file. Massively higher coverage +# than single-shot CSV captures. +SEGMENT_COUNT = 100 +SAVE_FORMAT = "H5" # Keysight native multi-segment format + CYCLE_S = 10.0 # seconds video is on per cycle -TRIG_TIMEOUT_S = 2.0 # per-capture trigger wait +# Filling N segments takes ~N × LP-trigger period. LP triggers fire roughly +# at line rate (≈48 kHz) so 100 segments fill in ms, but allow margin. +TRIG_TIMEOUT_S = max(SEGMENT_COUNT * 0.020 + 5.0, 10.0) # --------------------------------------------------------------------------- # Scope setup @@ -84,7 +92,7 @@ def setup_scope() -> None: def configure_for_lp() -> None: - """LP-mode: widen vertical range, falling-edge trigger on Ch3.""" + """LP-mode + segmented memory: N back-to-back LP triggers per acquisition.""" for ch in (1, 2, 3, 4): scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}") scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}") @@ -95,7 +103,10 @@ def configure_for_lp() -> None: scope.write(f":TIMebase:SCALe {LP_SCALE:.3E}") scope.write(f":ACQuire:POINts {LP_POINTS}") scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}") - time.sleep(0.3) + # Segmented memory: fill N segments per :DIGitize. + scope.write(":ACQuire:MODE SEGMented") + scope.write(f":ACQuire:SEGMented:COUNt {SEGMENT_COUNT}") + time.sleep(0.5) def arm_and_wait(timeout_s: float) -> bool: @@ -128,12 +139,13 @@ def arm_and_wait(timeout_s: float) -> bool: def save_lp(base_name: str) -> None: - """Save Ch1 (CLK+) and Ch3 (DAT0+) as CSV to scope's C:\\TEMP\\.""" + """Save all N segments of Ch1 (CLK+) and Ch3 (DAT0+) as a single H5 each.""" base = f"C:\\TEMP\\{base_name}" - scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.csv",CSV') - time.sleep(2.5) - scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.csv",CSV') - time.sleep(2.5) + ext = SAVE_FORMAT.lower() + scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.{ext}",{SAVE_FORMAT}') + time.sleep(3.0) + scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.{ext}",{SAVE_FORMAT}') + time.sleep(3.0) # --------------------------------------------------------------------------- @@ -174,6 +186,48 @@ def video_stop() -> None: print(f" VIDEO STOP failed: {e}") +# --------------------------------------------------------------------------- +# H5 transfer (ai_mgmt only handles CSV — segmented mode produces .h5) +# --------------------------------------------------------------------------- +def _transfer_h5_files() -> int: + """SMB-pull every .h5 from the scope share into DATA_DIR; delete on scope.""" + from smb.SMBConnection import SMBConnection + import socket + conn = SMBConnection( + ai_mgmt.USERNAME, ai_mgmt.PASSWORD, + socket.gethostname(), ai_mgmt.SERVER_NAME, + use_ntlm_v2=True, is_direct_tcp=True, + ) + if not conn.connect(ai_mgmt.SERVER, 445): + print(" H5 transfer: could not connect to scope share") + return 0 + count = 0 + try: + h5_paths: list[str] = [] + def walk(path: str) -> None: + for entry in conn.listPath(ai_mgmt.SHARE, path): + if entry.filename in (".", ".."): + continue + full = f"{path}/{entry.filename}" + if entry.isDirectory: + walk(full) + elif entry.filename.lower().endswith(".h5"): + h5_paths.append(full) + walk("/") + for remote in h5_paths: + local = DATA_DIR / Path(remote).name + try: + with open(local, "wb") as fh: + conn.retrieveFile(ai_mgmt.SHARE, remote, fh) + conn.deleteFiles(ai_mgmt.SHARE, remote) + count += 1 + except Exception as e: + print(f" H5 transfer failed for {Path(remote).name}: {e}") + finally: + conn.close() + return count + + # --------------------------------------------------------------------------- # Register snapshot from device (DSIM PHY + SN65DSI83) # --------------------------------------------------------------------------- @@ -251,37 +305,90 @@ def archive_and_analyse(event: str, since_iso: str) -> None: return print(f" {copied} file(s) transferred ({failed} failed)") - # Move just-arrived CSVs out of data/ (flat) into the event folder. + # ai_mgmt only fetches CSVs. H5 (segmented) files need a separate pass. + h5_count = _transfer_h5_files() + if h5_count: + print(f" {h5_count} H5 file(s) transferred") + + # Move just-arrived files (csv + h5) out of data/ (flat) into the event folder. moved = 0 - for csv in DATA_DIR.glob("*.csv"): - if csv.is_file(): - shutil.move(str(csv), target / csv.name) + for f in list(DATA_DIR.glob("*.csv")) + list(DATA_DIR.glob("*.h5")): + if f.is_file(): + shutil.move(str(f), target / f.name) moved += 1 print(f" {moved} file(s) archived to {target.relative_to(DATA_DIR.parent)}") + # Explode each H5 into per-segment CSVs so csv_preprocessor can analyse them. + from explode_h5 import explode + h5_files = sorted(target.glob("*_lp_*.h5")) + seg_csv_count = 0 + for h5 in h5_files: + try: + csvs = explode(h5) + seg_csv_count += len(csvs) + except Exception as e: + print(f" EXPLODE error on {h5.name}: {e}") + if h5_files: + print(f" exploded {len(h5_files)} H5 file(s) → {seg_csv_count} segment CSV(s)") + if event != "flicker": return - # Analyse the LP captures we just archived. - print("\n LP analysis (csv_preprocessor):") - print(" " + "-" * 78) - print(f" {'file':<46} {'lp_low_ns':>10} {'hs_amp_mV':>10} {'flicker?':>9}") - print(" " + "-" * 78) - - lp_files = sorted(target.glob("*_lp_*_dat.csv")) - for f in lp_files: + # Analyse every segment CSV. Flag outliers. + print("\n Per-segment LP analysis:") + rows = [] + for f in sorted(target.glob("*_lp_*_dat.csv")): try: m = analyze_lp_file(f) - lp_low = getattr(m, "lp_low_duration_ns", None) - hs_amp = getattr(m, "hs_amp_mV", None) - sus = getattr(m, "flicker_suspect", False) - print(f" {f.name:<46} " - f"{(f'{lp_low:.1f}' if lp_low is not None else '?'):>10} " - f"{(f'{hs_amp:.1f}' if hs_amp is not None else '?'):>10} " - f"{('YES' if sus else 'no'):>9}") + rows.append({ + "file": f.name, + "lp_low": float(m.lp_low_duration_ns) if m.lp_low_duration_ns is not None else None, + "hs_amp": float(m.hs_amplitude_mv) if m.hs_amplitude_mv is not None else None, + "hs_dur": float(m.hs_burst_dur_ns) if m.hs_burst_dur_ns is not None else None, + "n_burst": int(m.n_hs_bursts) if m.n_hs_bursts is not None else None, + "sus": bool(m.flicker_suspect), + }) except Exception as e: - print(f" {f.name:<46} ERROR: {e}") - print(" " + "-" * 78) + rows.append({"file": f.name, "error": str(e)}) + + n_total = len(rows) + n_sus = sum(1 for r in rows if r.get("sus")) + print(f" {n_total} segments analysed ({n_sus} flagged as flicker_suspect)") + + # Outlier search across the segments themselves. + def _outliers(field: str, lo_thresh: float | None = None, + hi_thresh: float | None = None) -> list[dict]: + vals = sorted(r[field] for r in rows if r.get(field) is not None) + if not vals: + return [] + med = vals[len(vals) // 2] + out = [] + for r in rows: + v = r.get(field) + if v is None: continue + far = (lo_thresh is not None and v < lo_thresh) or \ + (hi_thresh is not None and v > hi_thresh) + if far: + out.append({"file": r["file"], field: v, "median": med}) + return out + + print("\n Anomalies vs segment-set median:") + for label, field, lo, hi in [ + ("very-short LP-low (<50 ns)", "lp_low", 50, None), + ("very-low HS amplitude (<50 mV)", "hs_amp", 50, None), + ("very-high HS amplitude (>140 mV)","hs_amp", None, 140), + ("short HS burst (<8000 ns)", "hs_dur", 8000, None), + ]: + ax = _outliers(field, lo, hi) + if ax: + print(f" {label}: {len(ax)} segment(s)") + for x in ax[:8]: + print(f" {x['file']} {field}={x[field]:.1f} " + f"(set median={x['median']:.1f})") + if len(ax) > 8: + print(f" ... +{len(ax) - 8} more") + else: + print(f" {label}: none") # --------------------------------------------------------------------------- @@ -310,7 +417,8 @@ def main() -> None: video_start() print(f"\n[cycle {cycle:03d} {cycle_ts}] video ON " - f"({CYCLE_S:.0f}s window)", flush=True) + f"({CYCLE_S:.0f}s window, {SEGMENT_COUNT} segs/acquire)", + flush=True) event = None last_tick = 0.0 @@ -323,8 +431,8 @@ def main() -> None: try: save_lp(base) cycle_caps.append(base) - print(f" + cap {seq:02d} [{remaining():4.1f}s left]", - flush=True) + print(f" + acq {seq:02d} ({SEGMENT_COUNT} segs) " + f"[{remaining():4.1f}s left]", flush=True) except Exception as e: print(f" save error: {e}", flush=True) else: @@ -342,7 +450,8 @@ def main() -> None: video_stop() if event is None: print(f"[cycle {cycle:03d}] ended " - f"({len(cycle_caps)} cap(s), no event)", + f"({len(cycle_caps)} acq(s) ≈ " + f"{len(cycle_caps) * SEGMENT_COUNT} segments, no event)", flush=True) if event == "f":