Files
BoMtoCost/octo_fill.py
David Rice 70b2b6acc3 Updates
2026-04-30 12:39:48 +01:00

321 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Octo Fill
=========
Reads the Octopart export (OCTO/octo.xlsx) and fills the
"Unit Cost EUR @1000" column in every component table across every
sheet/tab of every BoM file in the BoM/ folder.

Matching strategy:
  1. Exact match on both Original Manufacturer + Original Part (preferred)
  2. Fallback: match on Original Part alone (handles slight manufacturer
     name differences between BoM and Octopart)

Where a part appears more than once in octo.xlsx (multiple distributor
offers), the lowest price is used.

Cells that already contain a non-zero value are left untouched.

Usage:
    python octo_fill.py
"""
from __future__ import annotations
import sys
import logging
from pathlib import Path
from typing import Optional
import openpyxl
from openpyxl.cell.cell import MergedCell
# ── Patch openpyxl for newer Excel attribute it doesn't know about ─────────────
from openpyxl.worksheet.dimensions import SheetFormatProperties as _SFP
_sfp_orig = _SFP.__init__


def _sfp_patched(self, **kw):
    """Call the original initialiser with ``defaultColWidthPt`` removed.

    Every other keyword argument is forwarded unchanged.
    """
    forwarded = {name: value for name, value in kw.items() if name != "defaultColWidthPt"}
    _sfp_orig(self, **forwarded)


# Install the wrapper in place of the stock initialiser.
_SFP.__init__ = _sfp_patched
# ──────────────────────────────────────────────────────────────────────────────
# Input locations (relative paths).
BOM_DIR = Path("BoM")
OCTO_FILE = Path("OCTO/octo.xlsx")

# Header text written above the column that receives the prices.
COST_HEADER = "Unit Cost EUR @1000"

# Part-number values (compared lower-cased) that are skipped rather than
# looked up.
SKIP_MPNS = {
    "",
    "0",
    "tbd",
    "n/a",
    "na",
    "-",
    "--",
    "---",
    "?",
    "none",
    "null",
    "nan",
    "xxx",
    "x",
    "dnf",
    "dnp",
    "do not fit",
    "do not populate",
}

logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
log = logging.getLogger(__name__)
# ── Load Octopart data ─────────────────────────────────────────────────────────
def _row_text(row, col: Optional[int]) -> str:
    """Return ``row[col]`` as stripped text, or "" if the column is absent or empty."""
    if col is None or col >= len(row) or row[col] is None:
        return ""
    return str(row[col]).strip()


def load_octo(path: Path) -> tuple[dict[tuple[str, str], float], dict[str, float]]:
    """
    Build the price lookup tables from the Octopart export workbook.

    Each worksheet is scanned top-down for a header row containing both
    "Original Part" and "Original Manufacturer" (case-insensitive).  Rows
    above the header are ignored; every non-empty row below it is read as
    one offer.  An offer is dropped when its part number is blank or listed
    in SKIP_MPNS, or when its price is missing, non-numeric, not finite, or
    not strictly positive.  When a part has several offers, the lowest price
    is kept.

    Args:
        path: Location of the Octopart .xlsx export.

    Returns:
        exact_map (manufacturer_lower, mpn_lower) → lowest unit price
        mpn_map   mpn_lower → lowest unit price (fallback)
    """
    log.info(f"Reading Octopart data from {path}")
    exact_map: dict[tuple[str, str], float] = {}
    mpn_map: dict[str, float] = {}
    infinity = float("inf")

    wb = openpyxl.load_workbook(path, data_only=True, read_only=True)
    try:
        # ``worksheets`` leaves out chartsheets, which have no rows to iterate.
        for ws in wb.worksheets:
            columns = None  # (mpn, mfr, price) 0-based indices once the header is found
            for row in ws.iter_rows(values_only=True):
                if columns is None:
                    row_lower = [str(v).strip().lower() if v is not None else "" for v in row]
                    if "original part" in row_lower and "original manufacturer" in row_lower:
                        headers = {str(v).strip(): i for i, v in enumerate(row) if v is not None}
                        # Resolve the columns once per sheet instead of once per row.
                        columns = (
                            _find_col(headers, "original part"),
                            _find_col(headers, "original manufacturer"),
                            _find_col(headers, "unit price"),
                        )
                        if columns[0] is None or columns[2] is None:
                            # No row on this sheet could ever produce a price.
                            log.warning(
                                f" Sheet '{ws.title}': part or unit price column not found, sheet skipped"
                            )
                            break
                    continue

                if not any(row):
                    continue

                mpn_col, mfr_col, price_col = columns
                mpn = _row_text(row, mpn_col)
                if not mpn or mpn.lower() in SKIP_MPNS:
                    continue

                price_raw = row[price_col] if price_col < len(row) else None
                try:
                    price = float(price_raw)
                except (TypeError, ValueError):
                    continue
                # Rejects zero and negatives, and also NaN/inf: a stored NaN
                # would never be replaced because every comparison with it
                # is False.
                if not 0 < price < infinity:
                    continue

                mpn_k = mpn.lower()
                key = (_row_text(row, mfr_col).lower(), mpn_k)
                if price < exact_map.get(key, infinity):
                    exact_map[key] = price
                if price < mpn_map.get(mpn_k, infinity):
                    mpn_map[mpn_k] = price
    finally:
        # Close the workbook even if parsing fails part-way through.
        wb.close()

    log.info(f" Loaded {len(exact_map)} unique (manufacturer, part) entries from Octopart")
    return exact_map, mpn_map
def _find_col(headers: dict[str, int], prefix: str) -> Optional[int]:
"""Case-insensitive prefix match on header names."""
for name, idx in headers.items():
if name.lower().startswith(prefix.lower()):
return idx
return None
# ── BoM table finding ──────────────────────────────────────────────────────────
def _cell(value) -> str:
return str(value).strip() if value is not None else ""
def _find_tables(indexed_rows: list[tuple[int, tuple]]):
    """
    Yield one dict per component table found in a sheet.

    A header row is any row holding at least one "Manufacturer" cell and at
    least one "MPN" cell (case-insensitive, exact text).  Several tables may
    sit side-by-side on the same header row: every Manufacturer column is
    paired with its nearest still-unpaired MPN column, and each pair is
    treated as its own table.

    A table's data runs from the row after the header until either three
    consecutive rows are empty in both of its columns, or another
    Manufacturer/MPN header appears in those same columns.

    Args:
        indexed_rows: (sheet_row_number, row_values) tuples in sheet order.

    Yields:
        dict with keys
          "header_row" – sheet row number of the header
          "mfr_col"    – 1-based Manufacturer column
          "mpn_col"    – 1-based MPN column
          "start_col"  – 1-based leftmost of the two columns, so the
                         cost-column search can start at this table
          "data"       – list of (sheet_row_number, manufacturer, mpn) for
                         rows whose MPN is not blank and not in SKIP_MPNS
    """
    total = len(indexed_rows)
    i = 0
    while i < total:
        row_num, row = indexed_rows[i]
        labels = [_cell(v).lower() for v in row]
        # All column positions that are "manufacturer" or "mpn"
        mfr_cols = [c for c, v in enumerate(labels) if v == "manufacturer"]
        mpn_cols = [c for c, v in enumerate(labels) if v == "mpn"]
        if not mfr_cols or not mpn_cols:
            i += 1
            continue

        # Pair each mfr_col with its nearest unpaired mpn_col
        pairs: list[tuple[int, int]] = []
        used_mpn: set[int] = set()
        for mfr_col in mfr_cols:
            available = [c for c in mpn_cols if c not in used_mpn]
            if not available:
                break
            best_mpn = min(available, key=lambda c, anchor=mfr_col: abs(c - anchor))
            pairs.append((mfr_col, best_mpn))
            used_mpn.add(best_mpn)

        # Resume the header search where the SHORTEST table stopped.
        # Resuming after the longest one would jump over a header row that
        # sits beneath a shorter neighbouring table.
        resume_at = total
        for mfr_col, mpn_col in pairs:
            data: list[tuple[int, str, str]] = []
            j = i + 1
            empty_streak = 0
            while j < total:
                dr_num, dr = indexed_rows[j]
                mfr = _cell(dr[mfr_col] if mfr_col < len(dr) else None)
                mpn = _cell(dr[mpn_col] if mpn_col < len(dr) else None)
                if not mfr and not mpn:
                    empty_streak += 1
                    if empty_streak >= 3:
                        break
                    j += 1
                    continue
                empty_streak = 0
                if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
                    # Next stacked table starts here; leave j on its header.
                    break
                if mpn and mpn.lower() not in SKIP_MPNS:
                    data.append((dr_num, mfr, mpn))
                j += 1
            resume_at = min(resume_at, j)
            yield {
                "header_row": row_num,
                "mfr_col": mfr_col + 1,  # 1-based
                "mpn_col": mpn_col + 1,
                "start_col": min(mfr_col, mpn_col) + 1,  # leftmost col of this table
                "data": data,
            }
        # j always starts at i + 1, so this moves forward on every pass.
        i = max(i + 1, resume_at)
# ── Write back to BoM files ────────────────────────────────────────────────────
def _cost_cell_is_blank(value) -> bool:
    """Return True when a cost cell holds nothing to preserve: None, "", "0" or 0."""
    return value is None or value == 0 or str(value).strip() in ("", "0")


def _lookup_price(
    exact_map: dict[tuple[str, str], float],
    mpn_map: dict[str, float],
    mfr: str,
    mpn: str,
) -> Optional[float]:
    """Return the price for a part: exact (manufacturer, MPN) match first, then MPN alone."""
    mpn_key = mpn.lower()
    price = exact_map.get((mfr.lower(), mpn_key))
    if price is None:
        price = mpn_map.get(mpn_key)
        if price is not None:
            log.debug(f" MPN-only match: {mpn} (mfr '{mfr}' not matched)")
    return price


def _resolve_cost_column(ws, header_row: int, start_col: int) -> tuple[int, bool]:
    """
    Find the cost column of the table whose header is on *header_row*,
    creating it when absent.

    The header row is searched from *start_col* rightward for either known
    cost header (the primary COST_HEADER or the name used by the earlier
    write-back script).  If neither is found, COST_HEADER is written into
    the first non-merged cell after the last non-empty header cell.

    Returns:
        (1-based column index, True if the header cell was just written)
    """
    known_headers = {COST_HEADER.lower(), "unit cost 1000x data"}
    last_used = start_col
    # NOTE(review): the search runs to the sheet's last column, so a table
    # with no cost header of its own can pick up one belonging to a table
    # further to the right on the same row.
    for col in range(start_col, (ws.max_column or 1) + 1):
        val = ws.cell(header_row, col).value
        if val is None:
            continue
        last_used = col
        if str(val).strip().lower() in known_headers:
            return col, False

    cost_col = last_used + 1
    # The non-anchor cells of a merged range cannot take a value; step past them.
    while isinstance(ws.cell(header_row, cost_col), MergedCell):
        cost_col += 1
    ws.cell(header_row, cost_col).value = COST_HEADER
    return cost_col, True


def fill_boms(
    bom_dir: Path,
    exact_map: dict[tuple[str, str], float],
    mpn_map: dict[str, float],
) -> None:
    """
    Fill the cost column of every component table in every BoM workbook.

    Each .xlsx/.xlsm file in *bom_dir* is opened, every worksheet is scanned
    for component tables, and each table row whose cost cell is blank or
    zero receives the looked-up price.  Cells that already hold a non-zero
    value are left alone.  A workbook is written back only if something in
    it changed, and .xlsm files are loaded with their VBA project kept so
    that saving does not strip the macros.

    Args:
        bom_dir:   Folder containing the BoM workbooks.
        exact_map: (manufacturer_lower, mpn_lower) → price.
        mpn_map:   mpn_lower → price, used when the exact lookup misses.

    Exits the process with status 1 when *bom_dir* holds no workbook.
    """
    files = sorted(
        f
        for f in bom_dir.iterdir()
        if f.suffix.lower() in {".xlsx", ".xlsm"}
        # "~$name.xlsx" is the lock file Excel keeps next to an open workbook.
        and not f.name.startswith("~$")
    )
    if not files:
        log.error(f"No .xlsx/.xlsm files found in {bom_dir}/")
        sys.exit(1)

    total_filled = 0
    total_skipped = 0
    total_missing = 0

    for f in files:
        log.info(f"Processing {f.name}")
        try:
            # Without keep_vba, saving under an .xlsm name drops the macros.
            wb = openpyxl.load_workbook(f, keep_vba=f.suffix.lower() == ".xlsm")
        except Exception as exc:
            log.error(f" Cannot open {f.name}: {exc}")
            continue

        modified = False
        # ``worksheets`` leaves out chartsheets, which have no cell grid.
        for ws in wb.worksheets:
            sheet_name = ws.title
            indexed = [
                (i, tuple(row))
                for i, row in enumerate(ws.iter_rows(values_only=True), start=1)
            ]
            for table in _find_tables(indexed):
                header_row = table["header_row"]
                log.info(
                    f" Sheet '{sheet_name}' row {header_row}: "
                    f"table at col {table['start_col']}, {len(table['data'])} parts"
                )
                cost_col, created = _resolve_cost_column(ws, header_row, table["start_col"])
                modified = modified or created

                for row_num, mfr, mpn in table["data"]:
                    cell = ws.cell(row_num, cost_col)
                    if isinstance(cell, MergedCell):
                        continue
                    if not _cost_cell_is_blank(cell.value):
                        total_skipped += 1
                        continue
                    price = _lookup_price(exact_map, mpn_map, mfr, mpn)
                    if price is None:
                        total_missing += 1
                        log.info(f" No match in Octopart: [{mfr}] [{mpn}]")
                        continue
                    cell.value = price
                    total_filled += 1
                    modified = True

        if not modified:
            # A save round-trips the whole workbook through openpyxl; skip it
            # when there is nothing new to write.
            log.info(f" No changes for {f.name}, file left untouched")
            continue

        try:
            wb.save(f)
        except PermissionError:
            log.error(f" Cannot save {f.name} — close it in Excel first.")
        except Exception as exc:
            log.error(f" Save failed for {f.name}: {exc}")
        else:
            log.info(f" Saved {f.name}")

    log.info(
        f"Done — filled: {total_filled}, "
        f"already populated (skipped): {total_skipped}, "
        f"no match in Octopart: {total_missing}"
    )
# ── Main ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Both inputs must exist; report the first one that is missing and stop.
    missing = next((p for p in (BOM_DIR, OCTO_FILE) if not p.exists()), None)
    if missing is not None:
        log.error(f"Not found: {missing}")
        sys.exit(1)

    exact_map, mpn_map = load_octo(OCTO_FILE)
    fill_boms(BOM_DIR, exact_map, mpn_map)