232 lines
8.0 KiB
Python
232 lines
8.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
explode_h5.py — split a Keysight segmented H5 file into per-segment CSVs.
|
|
|
|
When the scope is in segmented memory mode, a single :DISK:SAVE:WAVeform
|
|
call dumps all N segments into one .h5 file (much faster than saving N CSVs
|
|
sequentially). This script splits that file back into individual CSVs whose
|
|
names match the lp_ pattern that csv_preprocessor.analyze_lp_file() expects:
|
|
|
|
{ts}_lp_{cap_id}_seg{NNN}_{clk|dat}.csv
|
|
|
|
Usage:
    python3 explode_h5.py <file.h5> [<file.h5> ...]
    python3 explode_h5.py --inspect <file.h5> [<file.h5> ...]

Or import explode() from this module.
|
|
|
|
Notes on Keysight Infiniium H5 layout:
|
|
The format used by :DISK:SAVE:WAVeform ... ,H5 nests waveform datasets
|
|
inside a "Waveforms"/"Channel N" group, with attributes XInc, XOrg,
|
|
YInc, YOrg, NumSegments, NumPoints, etc. We probe the structure
|
|
dynamically because slight variations exist between firmware versions.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import h5py
|
|
import numpy as np
|
|
|
|
# Filename pattern that explode() requires of its input file:
#     <ts>_lp_<id>_<chan>.h5
#   ts   - 8 digits, underscore, 6 digits
#          (presumably YYYYMMDD_HHMMSS; TODO confirm against the capture script)
#   id   - capture id: either "c<digits>_<digits>" or plain digits
#   chan - "clk" or "dat"
# Matching is case-insensitive. The pattern carries no end anchor and explode()
# applies it with .match(), so only the start of the name is anchored.
LP_NAME_RE = re.compile(
    r"(?P<ts>\d{8}_\d{6})_lp_(?P<id>c\d+_\d+|\d+)_(?P<chan>clk|dat)\.h5",
    re.IGNORECASE,
)
|
|
|
|
|
|
def _walk(grp, depth: int = 0, max_depth: int = 4) -> list[tuple[str, h5py.Group]]:
    """
    Collect `grp` and the groups nested beneath it, depth-first.

    The starting node is always the first entry. Recursion stops once
    `depth` reaches `max_depth`. Children that cannot be looked up are
    skipped, and only children that are groups are descended into.

    Returns a list of (node.name, node) pairs.
    """
    found = [(grp.name, grp)]

    # Nothing to descend into: depth budget exhausted, or not a group at all.
    if depth >= max_depth or not isinstance(grp, h5py.Group):
        return found

    for key in grp.keys():
        try:
            node = grp[key]
        except Exception:
            # Best effort: a member that fails to resolve is simply ignored.
            continue
        if not isinstance(node, h5py.Group):
            continue
        found += _walk(node, depth + 1, max_depth)

    return found
|
|
|
|
|
|
def _find_segments(h5_root) -> tuple[h5py.Group, list[str], dict]:
    """
    Locate the group that contains per-segment waveform datasets.

    Strategy:
      1. Among all groups reachable through _walk(), pick the one holding
         the most 1-D datasets with more than 100 samples (the first such
         group wins ties). Each of those datasets is treated as one segment.
      2. If no group has any such dataset, fall back to the first 2-D
         dataset with more than one row and more than 100 columns, which is
         treated as a single (N_segments, N_points) array.

    Returns (group, dataset_keys, attrs_dict).
      * In case 1 the keys are sorted in natural (numeric-aware) order and
        the attrs dict merges the attributes of the root and of the group.
      * In case 2 the key list has exactly one entry and the attrs dict
        additionally merges the attributes of that dataset.

    Raises ValueError when neither layout is found.
    """
    groups = [node for _, node in _walk(h5_root) if isinstance(node, h5py.Group)]

    # Score each group by how many child datasets look like a 1-D trace.
    best_grp = None
    best_keys: list[str] = []
    for grp in groups:
        trace_keys = []
        for key in grp.keys():
            node = grp[key]  # resolve each member once
            if isinstance(node, h5py.Dataset) and node.ndim == 1 and node.size > 100:
                trace_keys.append(key)
        # Strict '>' keeps the first group encountered when counts tie.
        if len(trace_keys) > len(best_keys):
            best_grp, best_keys = grp, trace_keys

    if best_grp is None:
        # 2-D dataset case: a single dataset of shape (N_segments, N_points)
        for grp in groups:
            for key in grp.keys():
                ds = grp[key]
                if (isinstance(ds, h5py.Dataset) and ds.ndim == 2
                        and ds.shape[0] > 1 and ds.shape[1] > 100):
                    return grp, [key], _collect_attrs(h5_root, grp, ds)
        raise ValueError("No segment datasets found in H5")

    # Natural sort over every run of digits in the name. Keying on the first
    # run only would make names that carry two numbers (for example a channel
    # number followed by a segment number) all compare equal, leaving them in
    # lexicographic order where "...10" sorts before "...2".
    best_keys.sort(key=lambda name: tuple(int(d) for d in re.findall(r"\d+", name)))
    return best_grp, best_keys, _collect_attrs(h5_root, best_grp)
|
|
|
|
|
|
def _collect_attrs(*scopes) -> dict:
    """
    Flatten the `.attrs` mappings of several nodes into one plain dict.

    Nodes are processed in argument order, so a key present on a later node
    replaces the value taken from an earlier one. A node whose attributes
    cannot be read contributes nothing.
    """
    merged = {}
    for node in scopes:
        try:
            snapshot = {name: node.attrs[name] for name in node.attrs}
        except Exception:
            # Best effort: skip nodes without readable attributes.
            continue
        merged.update(snapshot)
    return merged
|
|
|
|
|
|
def _attr(attrs: dict, *names, default=None):
    """
    Look up the first of `names` that is present in `attrs`.

    The value found is normalised on a best-effort basis: bytes/bytearray
    are decoded to str (undecodable bytes dropped), and anything exposing
    `.item()` with a `size` of 1 is unwrapped to its scalar. If
    normalisation fails the value is returned as far as it got.

    Returns `default` when none of the names is present.
    """
    not_found = object()
    key = next((name for name in names if name in attrs), not_found)
    if key is not_found:
        return default

    value = attrs[key]
    try:
        if isinstance(value, (bytes, bytearray)):
            value = value.decode(errors="ignore")
        is_single = hasattr(value, "item") and getattr(value, "size", 1) == 1
        if is_single:
            value = value.item()
    except Exception:
        pass
    return value
|
|
|
|
|
|
def _scale_to_volts(raw, y_inc, y_org) -> np.ndarray:
    """
    Return one segment as a float array, scaling integer data only.

    Integer samples are treated as raw codes and converted with
    ``code * y_inc + y_org``. Floating-point samples are passed through
    unchanged. No scaling is applied when either scale value is None.
    """
    raw = np.asarray(raw)
    volts = np.asarray(raw, dtype=float)
    if y_inc is None or y_org is None:
        return volts
    if np.issubdtype(raw.dtype, np.integer):
        volts = volts * float(y_inc) + float(y_org)
    return volts


def explode(h5_path: Path, out_dir: Path | None = None,
            verbose: bool = False) -> list[Path]:
    """
    Split `h5_path` into per-segment CSVs.

    Parameters:
        h5_path: input file; its name must match LP_NAME_RE, from which the
            timestamp, capture id and channel of the output names are taken.
        out_dir: destination directory, created if missing
            (default: same dir as h5_path).
        verbose: when true, print the chosen group and scaling attributes.

    Returns the list of CSV paths written, one per segment, numbered from 1.

    Raises ValueError if the filename does not match LP_NAME_RE, or
    (from _find_segments) if no segment data is found in the file.

    Time axis: XInc defaults to 1e-10 and XOrg to 0.0 when the file carries
    neither attribute under any of the probed names.
    """
    h5_path = Path(h5_path)

    # Validate the name first so a rejected input leaves no directory behind.
    name_match = LP_NAME_RE.match(h5_path.name)
    if not name_match:
        raise ValueError(f"Not an LP H5 filename: {h5_path.name}")
    ts = name_match["ts"]
    cap_id = name_match["id"]
    chan = name_match["chan"]

    out_dir = Path(out_dir) if out_dir else h5_path.parent
    out_dir.mkdir(parents=True, exist_ok=True)

    with h5py.File(h5_path, "r") as f:
        grp, ds_keys, attrs = _find_segments(f)
        x_inc = float(_attr(attrs, "XInc", "XIncrement", "x_increment", default=1e-10))
        x_org = float(_attr(attrs, "XOrg", "XOrigin", "x_origin", default=0.0))
        y_inc = _attr(attrs, "YInc", "YIncrement", "y_increment", default=None)
        y_org = _attr(attrs, "YOrg", "YOrigin", "y_origin", default=None)

        if verbose:
            print(f" group: {grp.name} segments: {len(ds_keys)} "
                  f"XInc={x_inc:.3e} XOrg={x_org:.3e} YInc={y_inc} YOrg={y_org}")

        if len(ds_keys) == 1 and grp[ds_keys[0]].ndim == 2:
            # Single 2-D dataset case: shape (N_segments, N_points); each row
            # is one segment.
            segments = iter(grp[ds_keys[0]][:])
        else:
            # Multi-dataset case: each dataset is one segment.
            segments = (grp[key][:] for key in ds_keys)

        # Both layouts go through the same scaling rule, so float data is
        # never rescaled regardless of how the segments are stored. The list
        # is built while the file is still open.
        return [
            _write_segment_csv(
                out_dir, ts, cap_id, chan, seg_idx, x_inc, x_org,
                _scale_to_volts(raw, y_inc, y_org),
            )
            for seg_idx, raw in enumerate(segments, start=1)
        ]
|
|
|
|
|
|
def _write_segment_csv(out_dir: Path, ts: str, cap_id: str, chan: str,
                       seg_idx: int, x_inc: float, x_org: float,
                       volts: np.ndarray) -> Path:
    """
    Write one segment as a two-column (time, volts) CSV without a header.

    The file is named ``{ts}_lp_{cap_id}_seg{seg_idx:03d}_{chan}.csv`` inside
    `out_dir`. Sample i is given the time ``i * x_inc + x_org``.

    Returns the path of the file written.
    """
    times = np.arange(len(volts)) * x_inc + x_org
    csv_path = out_dir / f"{ts}_lp_{cap_id}_seg{seg_idx:03d}_{chan}.csv"
    np.savetxt(
        csv_path,
        np.column_stack([times, volts]),
        delimiter=",",
        # Per-column formats: 7 significant digits are not enough for the
        # time axis once |t| is large relative to x_inc (neighbouring samples
        # would print as the same timestamp), so time gets 13 digits while
        # the voltage column keeps its original precision.
        fmt=["%.12e", "%.6e"],
    )
    return csv_path
|
|
|
|
|
|
def inspect(h5_path: Path) -> None:
    """
    Dump the layout of an H5 file to stdout for debugging unknown files.

    Every object reported by `visititems` gets one line giving its kind and
    path (plus shape and dtype when it is not a group), followed by one line
    per attribute with the value truncated to 60 characters.
    """
    def _describe(name, obj):
        is_group = isinstance(obj, h5py.Group)
        kind = "GROUP" if is_group else "DSET"
        shape = "" if is_group else f" shape={obj.shape} dtype={obj.dtype}"
        print(f" {kind} /{name}{shape}")
        for attr_name, attr_value in obj.attrs.items():
            vs = str(attr_value)[:60]
            print(f" attr {attr_name} = {vs}")

    with h5py.File(h5_path, "r") as f:
        f.visititems(_describe)
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point:
    #   no arguments         -> print the module docstring and exit
    #   --inspect FILE...    -> dump the structure of each file and exit
    #   FILE...              -> explode each file, reporting per-file results
    cli_args = sys.argv[1:]

    if not cli_args:
        print(__doc__)
        sys.exit(0)

    if cli_args[0] == "--inspect":
        for target in cli_args[1:]:
            print(f"\n=== {target} ===")
            inspect(Path(target))
        sys.exit(0)

    for target in cli_args:
        # A failure on one file is reported and does not stop the others.
        try:
            written = explode(Path(target), verbose=True)
            print(f"{Path(target).name}: {len(written)} segment(s) → CSVs")
        except Exception as err:
            print(f"{Path(target).name}: ERROR — {err}")
|