chenages
This commit is contained in:
140
bom_extract.py
Normal file
140
bom_extract.py
Normal file
@@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
BoM Extractor
|
||||
=============
|
||||
Reads every .xlsx / .xlsm file from the BoM/ folder, extracts all
|
||||
(Manufacturer, MPN) pairs from every table in every sheet, deduplicates,
|
||||
and writes the result to bom_parts.xlsx.
|
||||
|
||||
Usage:
|
||||
python bom_extract.py
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import openpyxl
|
||||
import pandas as pd
|
||||
|
||||
BOM_DIR = Path("BoM")
|
||||
OUTPUT_FILE = Path("bom_parts.xlsx")
|
||||
|
||||
SKIP_MPNS = {
|
||||
"", "tbd", "n/a", "na", "-", "--", "---", "?", "none",
|
||||
"null", "nan", "xxx", "x", "dnf", "dnp", "do not fit",
|
||||
"do not populate",
|
||||
}
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s %(levelname)-8s %(message)s",
|
||||
datefmt="%H:%M:%S",
|
||||
)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _cell(value) -> str:
|
||||
return str(value).strip() if value is not None else ""
|
||||
|
||||
|
||||
def _find_tables(indexed_rows: list[tuple[int, tuple]]) -> list[tuple[str, str]]:
|
||||
"""Return all (manufacturer, mpn) pairs found across every table in the row list."""
|
||||
parts: list[tuple[str, str]] = []
|
||||
i = 0
|
||||
while i < len(indexed_rows):
|
||||
_, row = indexed_rows[i]
|
||||
row_str = [_cell(v) for v in row]
|
||||
|
||||
mfr_col = next((c for c, v in enumerate(row_str) if v.lower() == "manufacturer"), None)
|
||||
mpn_col = next((c for c, v in enumerate(row_str) if v.lower() == "mpn"), None)
|
||||
|
||||
if mfr_col is None or mpn_col is None:
|
||||
i += 1
|
||||
continue
|
||||
|
||||
j = i + 1
|
||||
empty_streak = 0
|
||||
while j < len(indexed_rows):
|
||||
_, dr = indexed_rows[j]
|
||||
mfr = _cell(dr[mfr_col] if mfr_col < len(dr) else None)
|
||||
mpn = _cell(dr[mpn_col] if mpn_col < len(dr) else None)
|
||||
|
||||
if not mfr and not mpn:
|
||||
empty_streak += 1
|
||||
if empty_streak >= 3:
|
||||
break
|
||||
j += 1
|
||||
continue
|
||||
empty_streak = 0
|
||||
|
||||
if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
|
||||
break
|
||||
|
||||
if mpn and mpn.lower() not in SKIP_MPNS:
|
||||
parts.append((mfr, mpn))
|
||||
j += 1
|
||||
|
||||
i = j
|
||||
return parts
|
||||
|
||||
|
||||
def extract(bom_dir: Path) -> list[tuple[str, str]]:
|
||||
files = sorted(f for f in bom_dir.iterdir() if f.suffix.lower() in {".xlsx", ".xlsm"})
|
||||
|
||||
if not files:
|
||||
log.error(f"No .xlsx/.xlsm files found in {bom_dir}/")
|
||||
sys.exit(1)
|
||||
|
||||
seen: set[tuple[str, str]] = set()
|
||||
parts: list[tuple[str, str]] = []
|
||||
|
||||
for f in files:
|
||||
log.info(f"Reading {f.name}")
|
||||
try:
|
||||
wb = openpyxl.load_workbook(f, data_only=True, read_only=True)
|
||||
for sheet_name in wb.sheetnames:
|
||||
ws = wb[sheet_name]
|
||||
indexed = [
|
||||
(i, tuple(row))
|
||||
for i, row in enumerate(ws.iter_rows(values_only=True), start=1)
|
||||
]
|
||||
found = _find_tables(indexed)
|
||||
new = [(mfr, mpn) for mfr, mpn in found
|
||||
if (mfr.lower(), mpn.lower()) not in seen]
|
||||
for mfr, mpn in new:
|
||||
seen.add((mfr.lower(), mpn.lower()))
|
||||
parts.append((mfr, mpn))
|
||||
if found:
|
||||
log.info(f" Sheet '{sheet_name}': {len(found)} rows, {len(new)} new unique")
|
||||
wb.close()
|
||||
except Exception as exc:
|
||||
log.error(f" Failed to read {f.name}: {exc}")
|
||||
|
||||
log.info(f"Total unique parts: {len(parts)}")
|
||||
return parts
|
||||
|
||||
|
||||
def write(parts: list[tuple[str, str]], output: Path) -> None:
|
||||
df = pd.DataFrame(parts, columns=["Manufacturer", "MPN"])
|
||||
df.sort_values(["Manufacturer", "MPN"], inplace=True, ignore_index=True)
|
||||
|
||||
with pd.ExcelWriter(output, engine="openpyxl") as writer:
|
||||
df.to_excel(writer, index=False, sheet_name="Parts")
|
||||
ws = writer.sheets["Parts"]
|
||||
for col in ws.columns:
|
||||
width = max(len(str(cell.value or "")) for cell in col)
|
||||
ws.column_dimensions[col[0].column_letter].width = min(width + 3, 60)
|
||||
|
||||
log.info(f"Written → {output} ({len(parts)} unique parts)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not BOM_DIR.exists():
|
||||
log.error(f"BoM directory '{BOM_DIR}' not found.")
|
||||
sys.exit(1)
|
||||
|
||||
parts = extract(BOM_DIR)
|
||||
write(parts, OUTPUT_FILE)
|
||||
Reference in New Issue
Block a user