Files
MiPi_TEST/explode_h5.py
david rice 75248c9574 updates
2026-05-07 12:10:02 +01:00

232 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
explode_h5.py — split a Keysight segmented H5 file into per-segment CSVs.
When the scope is in segmented memory mode, a single :DISK:SAVE:WAVeform
call dumps all N segments into one .h5 file (much faster than saving N CSVs
sequentially). This script splits that file back into individual CSVs whose
names match the lp_ pattern that csv_preprocessor.analyze_lp_file() expects:
{ts}_lp_{cap_id}_seg{NNN}_{clk|dat}.csv
Usage:
python3 explode_h5.py <file.h5> [<file.h5> ...]
Or import explode() from this module.
Notes on Keysight Infiniium H5 layout:
The format used by :DISK:SAVE:WAVeform ... ,H5 nests waveform datasets
inside a "Waveforms"/"Channel N" group, with attributes XInc, XOrg,
YInc, YOrg, NumSegments, NumPoints, etc. We probe the structure
dynamically because slight variations exist between firmware versions.
"""
from __future__ import annotations
import re
import sys
from pathlib import Path
import h5py
import numpy as np
# File-name pattern for the H5 captures this script accepts (case-insensitive):
#   <8 digits>_<6 digits>_lp_<capture id>_<clk|dat>.h5
# The capture id is either "c<digits>_<digits>" or plain digits. The named
# groups "ts", "id" and "chan" are reused when naming the per-segment CSVs.
LP_NAME_RE = re.compile(r"(?P<ts>\d{8}_\d{6})_lp_(?P<id>c\d+_\d+|\d+)_(?P<chan>clk|dat)\.h5", flags=re.IGNORECASE)
def _walk(grp, depth: int = 0, max_depth: int = 4) -> list[tuple[str, h5py.Group]]:
    """
    Collect `grp` and the groups nested beneath it as (path, node) pairs.

    The entry for `grp` itself always comes first. Child groups are followed
    recursively until `depth` reaches `max_depth`; nodes at that depth are
    still listed but their children are not explored. Only h5py.Group
    children are descended into, so datasets never appear below the start.
    """
    found = [(grp.name, grp)]
    # Stop at the depth limit, or when the node cannot have children.
    if depth >= max_depth or not isinstance(grp, h5py.Group):
        return found
    for child_name in grp.keys():
        try:
            node = grp[child_name]
        except Exception:
            # Best effort: a member that cannot be opened is simply skipped.
            continue
        if not isinstance(node, h5py.Group):
            continue
        found += _walk(node, depth + 1, max_depth)
    return found
def _find_segments(h5_root) -> tuple[h5py.Group, list[str], dict]:
    """
    Locate the group that contains per-segment waveform datasets.

    Two layouts are recognised, tried in this order:

    1. One dataset per segment: the group (within the depth searched by
       _walk) holding the most 1-D datasets of more than 100 samples wins;
       on a tie the first group found is kept. Its dataset keys are
       returned sorted by the last run of digits in each name.
    2. One 2-D dataset of shape (N_segments, N_points) with more than one
       row and more than 100 columns; its single key is returned.

    Returns (group, dataset_keys, attrs_dict). The attrs dict merges the
    attributes of the root and of the chosen group (plus the dataset itself
    in the 2-D layout), later nodes overriding earlier ones, so XInc / XOrg
    / YInc / YOrg are found wherever they were stored among those nodes.

    Raises ValueError when neither layout is present.
    """
    def _segment_number(name: str) -> int:
        # Use the LAST digit run: when a name carries another number before
        # the segment number (e.g. a channel number, as in
        # "Channel 1 Seg10Data"), the first run is identical for every key
        # and would leave the segments in plain name order (10 before 2).
        digit_runs = re.findall(r"\d+", name)
        return int(digit_runs[-1]) if digit_runs else 0

    groups = [g for _path, g in _walk(h5_root) if isinstance(g, h5py.Group)]

    # Layout 1: score each group by how many 1-D trace-like datasets it has.
    best_grp = None
    best_keys: list[str] = []
    for grp in groups:
        trace_keys = []
        for key in grp.keys():
            # One lookup per member; .get() yields None for a member that
            # cannot be resolved instead of raising, matching _walk.
            node = grp.get(key)
            if isinstance(node, h5py.Dataset) and node.ndim == 1 and node.size > 100:
                trace_keys.append(key)
        if len(trace_keys) > len(best_keys):
            best_grp, best_keys = grp, trace_keys

    if best_grp is not None:
        best_keys.sort(key=_segment_number)
        return best_grp, best_keys, _collect_attrs(h5_root, best_grp)

    # Layout 2: a single 2-D dataset of shape (N_segments, N_points).
    for grp in groups:
        for key in grp.keys():
            node = grp.get(key)
            if (isinstance(node, h5py.Dataset) and node.ndim == 2
                    and node.shape[0] > 1 and node.shape[1] > 100):
                return grp, [key], _collect_attrs(h5_root, grp, node)

    raise ValueError("No segment datasets found in H5")
def _collect_attrs(*scopes) -> dict:
"""Merge attrs from multiple HDF5 nodes (later overrides earlier)."""
out = {}
for s in scopes:
try:
out.update({k: s.attrs[k] for k in s.attrs})
except Exception:
pass
return out
def _attr(attrs: dict, *names, default=None):
"""Return the first attribute that exists from a list of candidate names."""
for n in names:
if n in attrs:
v = attrs[n]
try:
# numpy scalar/bytes to native python
if isinstance(v, (bytes, bytearray)):
v = v.decode(errors="ignore")
if hasattr(v, "item") and getattr(v, "size", 1) == 1:
v = v.item()
except Exception:
pass
return v
return default
def _to_volts(raw, y_inc, y_org) -> np.ndarray:
    """Return one stored segment as float volts, scaling raw integer codes."""
    raw = np.asarray(raw)
    volts = raw.astype(float)
    # Only integer data are treated as raw codes needing YInc/YOrg scaling;
    # floating-point data are taken to be volts already and left untouched.
    if (y_inc is not None and y_org is not None
            and np.issubdtype(raw.dtype, np.integer)):
        volts = volts * float(y_inc) + float(y_org)
    return volts


def explode(h5_path: Path, out_dir: Path | None = None,
            verbose: bool = False) -> list[Path]:
    """
    Split `h5_path` into per-segment CSVs.

    The file name must match LP_NAME_RE; its timestamp, capture id and
    channel are reused in the CSV names. Segments are numbered from 1 in
    the order returned by _find_segments (or by row for a single 2-D
    dataset). Integer data are scaled with YInc/YOrg when both attributes
    are present; the same rule is applied to both storage layouts.

    Returns the list of CSV paths written. CSVs are placed in `out_dir`
    (default: same dir as h5_path), which is created if missing.

    Raises ValueError if the file name is not an LP H5 name or if no
    segment datasets are found in the file.
    """
    h5_path = Path(h5_path)
    out_dir = Path(out_dir) if out_dir else h5_path.parent
    out_dir.mkdir(parents=True, exist_ok=True)

    name_match = LP_NAME_RE.match(h5_path.name)
    if not name_match:
        raise ValueError(f"Not an LP H5 filename: {h5_path.name}")
    ts = name_match["ts"]
    cap_id = name_match["id"]
    chan = name_match["chan"]

    csvs: list[Path] = []
    with h5py.File(h5_path, "r") as f:
        grp, ds_keys, attrs = _find_segments(f)
        # NOTE: when no X attributes are found the time axis falls back to
        # a 1e-10 increment starting at 0.0.
        x_inc = float(_attr(attrs, "XInc", "XIncrement", "x_increment", default=1e-10))
        x_org = float(_attr(attrs, "XOrg", "XOrigin", "x_origin", default=0.0))
        y_inc = _attr(attrs, "YInc", "YIncrement", "y_increment", default=None)
        y_org = _attr(attrs, "YOrg", "YOrigin", "y_origin", default=None)
        if verbose:
            print(f" group: {grp.name} segments: {len(ds_keys)} "
                  f"XInc={x_inc:.3e} XOrg={x_org:.3e} YInc={y_inc} YOrg={y_org}")

        first = grp[ds_keys[0]]
        if len(ds_keys) == 1 and first.ndim == 2:
            # Single 2-D dataset case: shape (N_segments, N_points)
            segments = first[:]
        else:
            # Multi-dataset case: each dataset is one segment. Read lazily
            # so only one segment is held in memory at a time.
            segments = (grp[key][:] for key in ds_keys)

        for seg_idx, raw in enumerate(segments, start=1):
            volts = _to_volts(raw, y_inc, y_org)
            csvs.append(_write_segment_csv(
                out_dir, ts, cap_id, chan, seg_idx, x_inc, x_org, volts,
            ))
    return csvs
def _write_segment_csv(out_dir: Path, ts: str, cap_id: str, chan: str,
seg_idx: int, x_inc: float, x_org: float,
volts: np.ndarray) -> Path:
n = len(volts)
times = np.arange(n) * x_inc + x_org
csv_path = out_dir / f"{ts}_lp_{cap_id}_seg{seg_idx:03d}_{chan}.csv"
np.savetxt(
csv_path,
np.column_stack([times, volts]),
delimiter=",",
fmt="%.6e",
)
return csv_path
def inspect(h5_path: Path) -> None:
    """
    Print the H5 hierarchy + attrs. Useful for debugging unknown files.

    One line is printed per node (GROUP, or DSET with shape and dtype),
    followed by one line per attribute with its value cut to 60
    characters. The root group is listed first so that file-level
    attributes are visible too; visititems() only reports the objects
    below the group it is called on.
    """
    def describe(name, obj):
        if isinstance(obj, h5py.Group):
            kind, shape = "GROUP", ""
        elif isinstance(obj, h5py.Dataset):
            kind = "DSET"
            shape = f" shape={obj.shape} dtype={obj.dtype}"
        else:
            # Any other node type (e.g. a committed datatype) may have no
            # shape; label it by type instead of failing.
            kind, shape = type(obj).__name__.upper(), ""
        print(f" {kind} /{name}{shape}")
        for k, v in obj.attrs.items():
            vs = str(v)[:60]
            print(f" attr {k} = {vs}")
        # Implicitly returns None, which tells visititems() to keep going.

    with h5py.File(h5_path, "r") as f:
        describe("", f)
        f.visititems(describe)
if __name__ == "__main__":
    # Command-line entry point:
    #   no arguments          -> print the module docstring (usage) and exit
    #   --inspect <file> ...  -> dump the hierarchy of each file and exit
    #   <file> ...            -> explode each file into per-segment CSVs
    args = sys.argv[1:]
    if not args:
        print(__doc__)
        sys.exit(0)
    if args[0] == "--inspect":
        for p in args[1:]:
            print(f"\n=== {p} ===")
            inspect(Path(p))
        sys.exit(0)
    failures = 0
    for p in args:
        # Keep going after a failure so one bad file does not block the rest.
        try:
            outs = explode(Path(p), verbose=True)
        except Exception as e:
            failures += 1
            print(f"{Path(p).name}: ERROR — {e}")
        else:
            print(f"{Path(p).name}: {len(outs)} segment(s) → CSVs")
    # Report failure to the caller instead of always exiting with status 0.
    sys.exit(1 if failures else 0)