#!/usr/bin/env python3
"""
BoM Extractor
=============

Reads every .xlsx / .xlsm file from the BoM/ folder, extracts all
(Manufacturer, MPN) pairs from every table in every sheet, deduplicates,
and writes the result to the OUTPUT/ folder as chunked bom_parts_*.xlsx
workbooks plus a single combined siliconexpert.xlsx.

Usage:
    python bom_extract.py
"""
from __future__ import annotations

import logging
import sys
from pathlib import Path

import openpyxl
import pandas as pd
BOM_DIR = Path("BoM")
OUTPUT_DIR = Path("OUTPUT")
CHUNK_SIZE = 500
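
# Placeholder MPN values that mean "no real part number"; rows whose MPN
# matches one of these (case-insensitively) are skipped during extraction.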
SKIP_MPNS = {
    "", "tbd", "n/a", "na", "-", "--", "---", "?", "none",
    "null", "nan", "xxx", "x", "dnf", "dnp", "do not fit",
    "do not populate",
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)
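
# Normalise a cell value: stripped string, or "" for None/empty cells.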
def _cell(value) -> str:
    return str(value).strip() if value is not None else ""

def _find_tables(indexed_rows: list[tuple[int, tuple]]) -> list[tuple[str, str]]:
    """
    Return all (manufacturer, mpn) pairs found across every table in the row list.
    Handles multiple tables side-by-side on the same header row.
    """
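    # Illustrative sheet layout this scanner is meant to handle (assumed
    # example, not taken from a real BoM):
    #
    #   Ref | Manufacturer | MPN       | ... | Manufacturer | MPN
    #   C1  | Murata       | GRM155... | ... | TDK          | C1005...
    #
    # Two (Manufacturer, MPN) header pairs on the same row are treated as
    # two separate side-by-side tables.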
    parts: list[tuple[str, str]] = []
    i = 0
    while i < len(indexed_rows):
        _, row = indexed_rows[i]
        row_str = [_cell(v) for v in row]
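
        # Column indices of every header cell named exactly "Manufacturer" /
        # "MPN" (case-insensitive); a row with at least one of each is a header row.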
        mfr_cols = [c for c, v in enumerate(row_str) if v.lower() == "manufacturer"]
        mpn_cols = [c for c, v in enumerate(row_str) if v.lower() == "mpn"]

        if not mfr_cols or not mpn_cols:
            i += 1
            continue

        # Pair each mfr_col with its nearest unpaired mpn_col
        pairs: list[tuple[int, int]] = []
        used_mpn: set[int] = set()
        for mfr_col in mfr_cols:
            available = [c for c in mpn_cols if c not in used_mpn]
            if not available:
                break
            best_mpn = min(available, key=lambda c: abs(c - mfr_col))
            pairs.append((mfr_col, best_mpn))
            used_mpn.add(best_mpn)
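
        # Walk each paired column downwards, collecting parts until the table
        # visibly ends (three consecutive empty rows) or a new header row starts.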
        max_j = i + 1
        for mfr_col, mpn_col in pairs:
            j = i + 1
            empty_streak = 0
            while j < len(indexed_rows):
                _, dr = indexed_rows[j]
                mfr = _cell(dr[mfr_col] if mfr_col < len(dr) else None)
                mpn = _cell(dr[mpn_col] if mpn_col < len(dr) else None)

                if not mfr and not mpn:
                    empty_streak += 1
                    if empty_streak >= 3:
                        break
                    j += 1
                    continue
                empty_streak = 0
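
                # A fresh header row marks the start of the next table;
                # stop scanning this one here.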
                if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
                    break

                if mpn and mpn.lower() not in SKIP_MPNS:
                    parts.append((mfr, mpn))
                j += 1

            max_j = max(max_j, j)

        i = max_j
    return parts

def extract(bom_dir: Path) -> list[tuple[str, str]]:
    files = sorted(f for f in bom_dir.iterdir() if f.suffix.lower() in {".xlsx", ".xlsm"})

    if not files:
        log.error(f"No .xlsx/.xlsm files found in {bom_dir}/")
        sys.exit(1)

    seen: set[tuple[str, str]] = set()
    parts: list[tuple[str, str]] = []

    for f in files:
        log.info(f"Reading {f.name}")
        try:
            wb = openpyxl.load_workbook(f, data_only=True, read_only=True)
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
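                # Materialise the sheet once, keeping 1-based row numbers
                # alongside each row of cell values.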
                indexed = [
                    (i, tuple(row))
                    for i, row in enumerate(ws.iter_rows(values_only=True), start=1)
                ]
                found = _find_tables(indexed)
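                # Deduplicate case-insensitively across all files and sheets,
                # keeping the first-seen capitalisation of each pair.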
                new = [(mfr, mpn) for mfr, mpn in found
                       if (mfr.lower(), mpn.lower()) not in seen]
                for mfr, mpn in new:
                    seen.add((mfr.lower(), mpn.lower()))
                    parts.append((mfr, mpn))
                if found:
                    log.info(f" Sheet '{sheet_name}': {len(found)} rows, {len(new)} new unique")
            wb.close()
        except Exception as exc:
            log.error(f" Failed to read {f.name}: {exc}")

    log.info(f"Total unique parts: {len(parts)}")
    return parts

def write_chunks(parts: list[tuple[str, str]], output_dir: Path) -> None:
    output_dir.mkdir(exist_ok=True)

    df = pd.DataFrame(parts, columns=["Manufacturer", "MPN"])
    df.sort_values(["Manufacturer", "MPN"], inplace=True, ignore_index=True)

    total = len(df)
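    # Ceiling division: how many CHUNK_SIZE-row workbooks are needed in total.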
    n_files = (total + CHUNK_SIZE - 1) // CHUNK_SIZE

    for idx in range(n_files):
        chunk = df.iloc[idx * CHUNK_SIZE : (idx + 1) * CHUNK_SIZE]
        out = output_dir / f"bom_parts_{idx + 1}_of_{n_files}.xlsx"

        with pd.ExcelWriter(out, engine="openpyxl") as writer:
            chunk.to_excel(writer, index=False, sheet_name="Parts")
            ws = writer.sheets["Parts"]
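            # openpyxl has no auto-fit, so approximate each column width from
            # its longest value (capped at 60 characters).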
            for col in ws.columns:
                width = max(len(str(cell.value or "")) for cell in col)
                ws.column_dimensions[col[0].column_letter].width = min(width + 3, 60)

        log.info(f" Written → {out} ({len(chunk)} parts)")

    log.info(f"Done – {total} unique parts across {n_files} file(s) in {output_dir}/")
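
# Single combined workbook with every unique part in one sheet, e.g. for a
# bulk upload to an external tool such as SiliconExpert (assumed from the name).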
def write_siliconexpert(parts: list[tuple[str, str]], output_dir: Path) -> None:
    output_dir.mkdir(exist_ok=True)

    df = pd.DataFrame(parts, columns=["Manufacturer", "MPN"])
    df.sort_values(["Manufacturer", "MPN"], inplace=True, ignore_index=True)
    out = output_dir / "siliconexpert.xlsx"

    with pd.ExcelWriter(out, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name="Parts")
        ws = writer.sheets["Parts"]
        for col in ws.columns:
            width = max(len(str(cell.value or "")) for cell in col)
            ws.column_dimensions[col[0].column_letter].width = min(width + 3, 60)

    log.info(f" Written → {out} ({len(df)} parts)")

if __name__ == "__main__":
    if not BOM_DIR.exists():
        log.error(f"BoM directory '{BOM_DIR}' not found.")
        sys.exit(1)

    parts = extract(BOM_DIR)
    write_chunks(parts, OUTPUT_DIR)
    write_siliconexpert(parts, OUTPUT_DIR)