Files
BoMtoCost/bom_extract.py

189 lines
6.0 KiB
Python
Raw Normal View History

2026-04-30 09:49:48 +01:00
#!/usr/bin/env python3
"""
BoM Extractor
=============
Reads every .xlsx / .xlsm file from the BoM/ folder, extracts all
(Manufacturer, MPN) pairs from every table in every sheet, deduplicates,
and writes the result to bom_parts.xlsx.
Usage:
python bom_extract.py
"""
from __future__ import annotations
import sys
import logging
from pathlib import Path
import openpyxl
import pandas as pd
BOM_DIR = Path("BoM")
2026-04-30 12:39:48 +01:00
OUTPUT_DIR = Path("OUTPUT")
CHUNK_SIZE = 500
2026-04-30 09:49:48 +01:00
SKIP_MPNS = {
"", "tbd", "n/a", "na", "-", "--", "---", "?", "none",
"null", "nan", "xxx", "x", "dnf", "dnp", "do not fit",
"do not populate",
}
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)
def _cell(value) -> str:
return str(value).strip() if value is not None else ""
def _find_tables(indexed_rows: list[tuple[int, tuple]]) -> list[tuple[str, str]]:
2026-04-30 12:39:48 +01:00
"""
Return all (manufacturer, mpn) pairs found across every table in the row list.
Handles multiple tables side-by-side on the same header row.
"""
2026-04-30 09:49:48 +01:00
parts: list[tuple[str, str]] = []
i = 0
while i < len(indexed_rows):
_, row = indexed_rows[i]
row_str = [_cell(v) for v in row]
2026-04-30 12:39:48 +01:00
mfr_cols = [c for c, v in enumerate(row_str) if v.lower() == "manufacturer"]
mpn_cols = [c for c, v in enumerate(row_str) if v.lower() == "mpn"]
2026-04-30 09:49:48 +01:00
2026-04-30 12:39:48 +01:00
if not mfr_cols or not mpn_cols:
2026-04-30 09:49:48 +01:00
i += 1
continue
2026-04-30 12:39:48 +01:00
# Pair each mfr_col with its nearest unpaired mpn_col
pairs: list[tuple[int, int]] = []
used_mpn: set[int] = set()
for mfr_col in mfr_cols:
available = [c for c in mpn_cols if c not in used_mpn]
if not available:
break
best_mpn = min(available, key=lambda c: abs(c - mfr_col))
pairs.append((mfr_col, best_mpn))
used_mpn.add(best_mpn)
2026-04-30 09:49:48 +01:00
2026-04-30 12:39:48 +01:00
max_j = i + 1
for mfr_col, mpn_col in pairs:
j = i + 1
2026-04-30 09:49:48 +01:00
empty_streak = 0
2026-04-30 12:39:48 +01:00
while j < len(indexed_rows):
_, dr = indexed_rows[j]
mfr = _cell(dr[mfr_col] if mfr_col < len(dr) else None)
mpn = _cell(dr[mpn_col] if mpn_col < len(dr) else None)
if not mfr and not mpn:
empty_streak += 1
if empty_streak >= 3:
break
j += 1
continue
empty_streak = 0
if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
break
2026-04-30 09:49:48 +01:00
2026-04-30 12:39:48 +01:00
if mpn and mpn.lower() not in SKIP_MPNS:
parts.append((mfr, mpn))
j += 1
2026-04-30 09:49:48 +01:00
2026-04-30 12:39:48 +01:00
max_j = max(max_j, j)
2026-04-30 09:49:48 +01:00
2026-04-30 12:39:48 +01:00
i = max_j
2026-04-30 09:49:48 +01:00
return parts
def extract(bom_dir: Path) -> list[tuple[str, str]]:
files = sorted(f for f in bom_dir.iterdir() if f.suffix.lower() in {".xlsx", ".xlsm"})
if not files:
log.error(f"No .xlsx/.xlsm files found in {bom_dir}/")
sys.exit(1)
seen: set[tuple[str, str]] = set()
parts: list[tuple[str, str]] = []
for f in files:
log.info(f"Reading {f.name}")
try:
wb = openpyxl.load_workbook(f, data_only=True, read_only=True)
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
indexed = [
(i, tuple(row))
for i, row in enumerate(ws.iter_rows(values_only=True), start=1)
]
found = _find_tables(indexed)
new = [(mfr, mpn) for mfr, mpn in found
if (mfr.lower(), mpn.lower()) not in seen]
for mfr, mpn in new:
seen.add((mfr.lower(), mpn.lower()))
parts.append((mfr, mpn))
if found:
log.info(f" Sheet '{sheet_name}': {len(found)} rows, {len(new)} new unique")
wb.close()
except Exception as exc:
log.error(f" Failed to read {f.name}: {exc}")
log.info(f"Total unique parts: {len(parts)}")
return parts
2026-04-30 12:39:48 +01:00
def write_chunks(parts: list[tuple[str, str]], output_dir: Path) -> None:
output_dir.mkdir(exist_ok=True)
2026-04-30 09:49:48 +01:00
df = pd.DataFrame(parts, columns=["Manufacturer", "MPN"])
df.sort_values(["Manufacturer", "MPN"], inplace=True, ignore_index=True)
2026-04-30 12:39:48 +01:00
total = len(df)
n_files = (total + CHUNK_SIZE - 1) // CHUNK_SIZE
for idx in range(n_files):
chunk = df.iloc[idx * CHUNK_SIZE : (idx + 1) * CHUNK_SIZE]
out = output_dir / f"bom_parts_{idx + 1}_of_{n_files}.xlsx"
with pd.ExcelWriter(out, engine="openpyxl") as writer:
chunk.to_excel(writer, index=False, sheet_name="Parts")
ws = writer.sheets["Parts"]
for col in ws.columns:
width = max(len(str(cell.value or "")) for cell in col)
ws.column_dimensions[col[0].column_letter].width = min(width + 3, 60)
log.info(f" Written → {out} ({len(chunk)} parts)")
2026-04-30 09:49:48 +01:00
2026-04-30 12:39:48 +01:00
log.info(f"Done {total} unique parts across {n_files} file(s) in {output_dir}/")
2026-04-30 09:49:48 +01:00
2026-05-01 10:31:54 +01:00
def write_siliconexpert(parts: list[tuple[str, str]], output_dir: Path) -> None:
output_dir.mkdir(exist_ok=True)
df = pd.DataFrame(parts, columns=["Manufacturer", "MPN"])
df.sort_values(["Manufacturer", "MPN"], inplace=True, ignore_index=True)
out = output_dir / "siliconexpert.xlsx"
with pd.ExcelWriter(out, engine="openpyxl") as writer:
df.to_excel(writer, index=False, sheet_name="Parts")
ws = writer.sheets["Parts"]
for col in ws.columns:
width = max(len(str(cell.value or "")) for cell in col)
ws.column_dimensions[col[0].column_letter].width = min(width + 3, 60)
log.info(f" Written → {out} ({len(df)} parts)")
2026-04-30 09:49:48 +01:00
if __name__ == "__main__":
if not BOM_DIR.exists():
log.error(f"BoM directory '{BOM_DIR}' not found.")
sys.exit(1)
parts = extract(BOM_DIR)
2026-04-30 12:39:48 +01:00
write_chunks(parts, OUTPUT_DIR)
2026-05-01 10:31:54 +01:00
write_siliconexpert(parts, OUTPUT_DIR)