#!/usr/bin/env python3
"""
BoM Extractor
=============
Reads every .xlsx / .xlsm file from the BoM/ folder, extracts all
(Manufacturer, MPN) pairs from every table in every sheet, deduplicates
them case-insensitively, and writes the result to the OUTPUT/ folder as
bom_parts_<i>_of_<n>.xlsx files holding at most 500 parts each.

Usage:
    python bom_extract.py
"""

from __future__ import annotations

import logging
import sys
from pathlib import Path

import openpyxl
import pandas as pd

# Folder scanned for input workbooks, and folder that receives the output.
BOM_DIR = Path("BoM")
OUTPUT_DIR = Path("OUTPUT")
# Maximum number of parts written to a single output workbook.
CHUNK_SIZE = 500

# Placeholder MPN values that are dropped during extraction.  Entries are
# lower-case because they are compared against the lower-cased MPN text.
SKIP_MPNS = {
    "", "tbd", "n/a", "na", "-", "--", "---", "?", "none",
    "null", "nan", "xxx", "x", "dnf", "dnp", "do not fit",
    "do not populate",
}

# Configure root logging once at import time: INFO level, time-of-day stamps.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)


def _cell(value) -> str:
    """
    Return a cell value as stripped text; None becomes the empty string.

    Integral floats are rendered without the trailing ".0" so that a part
    number read back as 1234.0 becomes "1234" rather than "1234.0".
    """
    if value is None:
        return ""
    # is_integer() is False for nan/inf, so those still go through str().
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value).strip()


def _pair_columns(mfr_cols: list[int], mpn_cols: list[int]) -> list[tuple[int, int]]:
    """Pair each manufacturer column with its nearest still-unpaired MPN column."""
    pairs: list[tuple[int, int]] = []
    free = list(mpn_cols)
    for mfr_col in mfr_cols:
        if not free:
            break
        # Ties resolve to the left-most candidate because `free` is ascending.
        nearest = min(free, key=lambda c: abs(c - mfr_col))
        free.remove(nearest)
        pairs.append((mfr_col, nearest))
    return pairs


def _read_column_pair(
    indexed_rows: list[tuple[int, tuple]],
    start: int,
    mfr_col: int,
    mpn_col: int,
    max_blank_rows: int = 3,
) -> list[tuple[str, str]]:
    """Read (manufacturer, mpn) values downwards from list position `start`."""
    found: list[tuple[str, str]] = []
    blank_run = 0
    for pos in range(start, len(indexed_rows)):
        row = indexed_rows[pos][1]
        # Rows may be shorter than the header row; treat missing cells as empty.
        mfr = _cell(row[mfr_col]) if mfr_col < len(row) else ""
        mpn = _cell(row[mpn_col]) if mpn_col < len(row) else ""

        if not mfr and not mpn:
            blank_run += 1
            if blank_run >= max_blank_rows:
                break
            continue
        blank_run = 0

        # A repeated header in these same columns starts the next table.
        if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
            break

        if mpn and mpn.lower() not in SKIP_MPNS:
            found.append((mfr, mpn))
    return found


def _find_tables(indexed_rows: list[tuple[int, tuple]]) -> list[tuple[str, str]]:
    """
    Return all (manufacturer, mpn) pairs found across every table in the row list.

    A header row is any row holding at least one cell equal to "manufacturer"
    and at least one equal to "mpn" (case-insensitive, surrounding whitespace
    ignored).  Handles multiple tables side-by-side on the same header row:
    each "Manufacturer" column is paired with the nearest unpaired "MPN" column.

    Under a header, each column pair is read until three consecutive rows are
    empty in both columns, a repeated header appears in those columns, or the
    rows run out.  Rows whose MPN is empty or listed in SKIP_MPNS are dropped.

    Args:
        indexed_rows: (row_number, cell_values) tuples in sheet order; only
            the cell values are used here.

    Returns:
        (manufacturer, mpn) string pairs in discovery order, NOT deduplicated.
    """
    parts: list[tuple[str, str]] = []
    # Every row is tested as a potential header.  Jumping past the longest
    # table found under a header would skip a second table stacked beneath a
    # shorter side-by-side neighbour.
    for pos, (_, row) in enumerate(indexed_rows):
        labels = [_cell(v).lower() for v in row]
        mfr_cols = [c for c, v in enumerate(labels) if v == "manufacturer"]
        mpn_cols = [c for c, v in enumerate(labels) if v == "mpn"]
        if not mfr_cols or not mpn_cols:
            continue

        for mfr_col, mpn_col in _pair_columns(mfr_cols, mpn_cols):
            parts.extend(_read_column_pair(indexed_rows, pos + 1, mfr_col, mpn_col))
    return parts


def extract(bom_dir: Path) -> list[tuple[str, str]]:
    """
    Collect the unique (manufacturer, mpn) pairs from all workbooks in a folder.

    Every .xlsx / .xlsm file directly inside `bom_dir` is opened read-only
    with cached formula values, and every worksheet is passed to
    `_find_tables`.  Pairs are deduplicated case-insensitively across all
    files and sheets; the first spelling encountered is kept.  A workbook that
    fails to load or parse is logged and skipped, keeping whatever was already
    collected from it.

    Args:
        bom_dir: Folder containing the BoM workbooks.

    Returns:
        Unique (manufacturer, mpn) pairs in discovery order (files sorted by path).

    Raises:
        SystemExit: With status 1 when the folder holds no matching workbook.
    """
    files = sorted(
        f
        for f in bom_dir.iterdir()
        if f.is_file()
        and f.suffix.lower() in {".xlsx", ".xlsm"}
        # "~$name.xlsx" is the lock file Excel creates while a workbook is
        # open; it is not a readable workbook.
        and not f.name.startswith("~$")
    )

    if not files:
        log.error(f"No .xlsx/.xlsm files found in {bom_dir}/")
        sys.exit(1)

    seen: set[tuple[str, str]] = set()
    parts: list[tuple[str, str]] = []

    for f in files:
        log.info(f"Reading {f.name}")
        wb = None
        try:
            wb = openpyxl.load_workbook(f, data_only=True, read_only=True)
            # `worksheets` leaves out chartsheets, which have no rows to read.
            for ws in wb.worksheets:
                sheet_name = ws.title
                indexed = [
                    (i, tuple(row))
                    for i, row in enumerate(ws.iter_rows(values_only=True), start=1)
                ]
                found = _find_tables(indexed)

                # Check and record each key in one pass so that repeats inside
                # the same sheet are dropped as well.
                new_count = 0
                for mfr, mpn in found:
                    key = (mfr.lower(), mpn.lower())
                    if key in seen:
                        continue
                    seen.add(key)
                    parts.append((mfr, mpn))
                    new_count += 1

                if found:
                    log.info(f" Sheet '{sheet_name}': {len(found)} rows, {new_count} new unique")
        except Exception as exc:
            # Deliberate best-effort: one unreadable workbook must not stop the run.
            log.error(f" Failed to read {f.name}: {exc}")
        finally:
            # Read-only workbooks keep the file open until closed explicitly.
            if wb is not None:
                wb.close()

    log.info(f"Total unique parts: {len(parts)}")
    return parts


def write_chunks(parts: list[tuple[str, str]], output_dir: Path) -> None:
    """
    Write the parts, sorted, to one or more .xlsx files of CHUNK_SIZE rows.

    Files are named bom_parts_<i>_of_<n>.xlsx and contain a single "Parts"
    sheet with "Manufacturer" and "MPN" columns.  Each column is widened to
    its longest cell text plus 3, capped at 60.  When `parts` is empty no
    file is written.

    Args:
        parts: (manufacturer, mpn) pairs to write.
        output_dir: Destination folder; created (with parents) if missing.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    df = pd.DataFrame(parts, columns=["Manufacturer", "MPN"])
    df.sort_values(["Manufacturer", "MPN"], inplace=True, ignore_index=True)

    total = len(df)
    # Ceiling division: number of files needed for `total` rows.
    n_files = (total + CHUNK_SIZE - 1) // CHUNK_SIZE

    for idx in range(n_files):
        chunk = df.iloc[idx * CHUNK_SIZE : (idx + 1) * CHUNK_SIZE]
        out = output_dir / f"bom_parts_{idx + 1}_of_{n_files}.xlsx"

        with pd.ExcelWriter(out, engine="openpyxl") as writer:
            chunk.to_excel(writer, index=False, sheet_name="Parts")
            ws = writer.sheets["Parts"]
            for col in ws.columns:
                width = max(len(str(cell.value or "")) for cell in col)
                ws.column_dimensions[col[0].column_letter].width = min(width + 3, 60)

        log.info(f" Written → {out} ({len(chunk)} parts)")

    log.info(f"Done – {total} unique parts across {n_files} file(s) in {output_dir}/")


if __name__ == "__main__":
    # is_dir() also rejects a plain file named like the folder, which would
    # otherwise make the directory listing in extract() raise.
    if not BOM_DIR.is_dir():
        log.error(f"BoM directory '{BOM_DIR}' not found.")
        sys.exit(1)

    parts = extract(BOM_DIR)
    write_chunks(parts, OUTPUT_DIR)