Files
BoMtoCost/octo_fill.py
David Rice 1ebd78f538 Finished
2026-05-05 15:02:49 +01:00

404 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Octo Fill Silicon Expert
==========================
Reads the Silicon Expert export (OCTO/seout.xlsx) and fills the
"Unit Cost EUR @1000" column in every component table across every
sheet/tab of every BoM file in the BoM/ folder.
Silicon Expert column mapping:
Manufacturer → UPLOADED MFG
MPN → UPLOADED PART
Price → BUDGETARY PRICES column, parses "Min X & Avg Y" → uses Avg (EUR)
Matching strategy:
1. Exact match on both Uploaded Mfg + Uploaded Part (preferred)
2. Fallback: match on Uploaded Part alone
Where a part appears more than once, the lowest price is used.
Cells that already contain a value are left untouched.
Usage:
python octo_fill.py
"""
from __future__ import annotations
import sys
import logging
from pathlib import Path
from typing import Optional
import re
import openpyxl
from openpyxl.cell.cell import MergedCell
# ── Patch openpyxl for newer Excel attribute it doesn't know about ─────────────
from openpyxl.worksheet.dimensions import SheetFormatProperties as _SFP

# Keep a reference to the unpatched initialiser so the wrapper can delegate.
_sfp_orig = _SFP.__init__


def _sfp_patched(self, **kw):
    # Newer Excel files may carry a 'defaultColWidthPt' sheet-format attribute
    # that this openpyxl version does not recognise; drop it before delegating
    # so load_workbook() doesn't raise a TypeError.
    kw.pop("defaultColWidthPt", None)
    _sfp_orig(self, **kw)


_SFP.__init__ = _sfp_patched
# ──────────────────────────────────────────────────────────────────────────────
# ── Configuration ──────────────────────────────────────────────────────────────
BOM_DIR = Path("BoM")                 # folder holding the BoM workbooks to fill
OCTO_DIR = Path("OCTO")               # folder holding the Silicon Expert export
SEOUT_FILE = OCTO_DIR / "seout.xlsx"  # the Silicon Expert export workbook
COST_HEADER = "Unit Cost EUR @1000"   # header written when a cost column is created

# Placeholder MPN values (lower-cased) that mark a row as having no real part.
SKIP_MPNS = {
    "", "0", "tbd", "n/a", "na", "-", "--", "---", "?", "none",
    "null", "nan", "xxx", "x", "dnf", "dnp", "do not fit",
    "do not populate", "total",
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)
# ── Load Silicon Expert data ───────────────────────────────────────────────────
def _find_col(headers: dict[str, int], substring: str) -> Optional[int]:
"""Return the index of the first header whose name contains substring (case-insensitive)."""
for name, idx in headers.items():
if substring.lower() in name.lower():
return idx
return None
def load_seout(path: Path) -> tuple[dict[tuple[str, str], float], dict[str, float]]:
    """
    Load seout.xlsx into lookup maps.

    exact_map (mfg_lower, part_lower) → lowest unit price (EUR)
    mpn_map part_lower → lowest unit price (EUR) [fallback]

    A header row is any row containing both an "Uploaded Part" and an
    "Uploaded Mfg" column (case-insensitive); prices come from the
    BUDGETARY PRICES (or any "price") column, parsed from the
    "Min X & Avg Y" text — the Avg figure is used.  When a key appears
    more than once, the lowest price wins.
    """
    exact_map: dict[tuple[str, str], float] = {}
    mpn_map: dict[str, float] = {}
    wb = openpyxl.load_workbook(path, data_only=True, read_only=True)
    avg_col_name = None
    for sheet_name in wb.sheetnames:
        ws = wb[sheet_name]
        headers: Optional[dict[str, int]] = None
        # Column indices, resolved once per sheet when the header row is found
        # (the original resolved them on every data row).
        mpn_col: Optional[int] = None
        mfr_col: Optional[int] = None
        price_col: Optional[int] = None
        for row in ws.iter_rows(values_only=True):
            row = list(row)
            if headers is None:
                row_lower = [str(v).strip().lower() if v is not None else "" for v in row]
                has_part = any("uploaded part" in v for v in row_lower)
                has_mfg = any("uploaded mfg" in v for v in row_lower)
                if has_part and has_mfg:
                    headers = {
                        str(row[i]).strip(): i
                        for i in range(len(row))
                        if row[i] is not None
                    }
                    log.info(f" Sheet '{sheet_name}' headers: {list(headers.keys())}")
                    mpn_col = _find_col(headers, "uploaded part")
                    mfr_col = _find_col(headers, "uploaded mfg")
                    # Silicon Expert stores prices as "Min X & Avg Y" in a
                    # BUDGETARY PRICES column.  NB: explicit None tests here —
                    # the previous `a or b` form discarded a valid match at
                    # column index 0 (falsy).
                    price_col = _find_col(headers, "budgetary")
                    if price_col is None:
                        price_col = _find_col(headers, "price")
                    for h in headers:
                        if "budgetary" in h.lower() or "price" in h.lower():
                            avg_col_name = h
                            break
                continue
            if not any(row):
                continue
            if mpn_col is None or price_col is None:
                continue
            mpn = (
                str(row[mpn_col]).strip()
                if mpn_col < len(row) and row[mpn_col] is not None
                else ""
            )
            mfr = (
                str(row[mfr_col]).strip()
                if mfr_col is not None and mfr_col < len(row) and row[mfr_col] is not None
                else ""
            )
            price_raw = str(row[price_col]).strip() if price_col < len(row) and row[price_col] is not None else ""
            if not mpn or mpn.lower() in SKIP_MPNS:
                continue
            # Parse "Min 0.818 & Avg 1.3225562077" → extract the Avg value
            avg_match = re.search(r'Avg\s+([\d.]+)', price_raw, re.IGNORECASE)
            if not avg_match:
                continue
            try:
                price = float(avg_match.group(1))
            except ValueError:
                continue
            if price <= 0:
                continue
            # Keep the lowest price seen for each key.
            key = (mfr.lower(), mpn.lower())
            if key not in exact_map or price < exact_map[key]:
                exact_map[key] = price
            mpn_k = mpn.lower()
            if mpn_k not in mpn_map or price < mpn_map[mpn_k]:
                mpn_map[mpn_k] = price
    wb.close()
    if not exact_map:
        log.warning(
            f"No entries loaded from {path.name}. "
            "Check that the file has columns containing 'Uploaded Part', 'Uploaded Mfg', "
            "Expected a 'BUDGETARY PRICES' column with values like 'Min X & Avg Y'."
        )
    log.info(
        f"Silicon Expert ({path.name}): {len(exact_map)} unique (mfg, part) entries "
        f"— avg price column: '{avg_col_name}'"
    )
    return exact_map, mpn_map
# ── BoM table finding ──────────────────────────────────────────────────────────
def _cell(value) -> str:
return str(value).strip() if value is not None else ""
def _find_tables(indexed_rows: list[tuple[int, tuple]]):
    """
    Yields TableInfo dicts per component table found.
    Handles multiple tables side-by-side on the same row.
    Includes 'start_col' so the cost-column search stays within each table.

    Each yielded dict contains:
        header_row  1-based Excel row of the "Manufacturer"/"MPN" header
        mfr_col     1-based column of the Manufacturer header
        mpn_col     1-based column of the MPN header
        start_col   1-based leftmost of the two columns (table anchor)
        data        list of (excel_row, manufacturer, mpn) tuples
    """
    i = 0
    while i < len(indexed_rows):
        row_num, row = indexed_rows[i]
        row_str = [_cell(v) for v in row]
        # A header row is any row containing the literal cell values
        # "Manufacturer" and "MPN" (case-insensitive exact match).
        mfr_cols = [c for c, v in enumerate(row_str) if v.lower() == "manufacturer"]
        mpn_cols = [c for c, v in enumerate(row_str) if v.lower() == "mpn"]
        if not mfr_cols or not mpn_cols:
            i += 1
            continue
        # Pair each Manufacturer column with the nearest still-unused MPN
        # column — supports several tables laid out side-by-side on one row.
        pairs: list[tuple[int, int]] = []
        used_mpn: set[int] = set()
        for mfr_col in mfr_cols:
            available = [c for c in mpn_cols if c not in used_mpn]
            if not available:
                break
            best_mpn = min(available, key=lambda c: abs(c - mfr_col))
            pairs.append((mfr_col, best_mpn))
            used_mpn.add(best_mpn)
        max_j = i + 1
        new_header_j = None  # earliest row where a same-column header reappeared
        for mfr_col, mpn_col in pairs:
            data: list[tuple[int, str, str]] = []
            j = i + 1
            empty_streak = 0
            while j < len(indexed_rows):
                dr_num, dr = indexed_rows[j]
                mfr = _cell(dr[mfr_col] if mfr_col < len(dr) else None)
                mpn = _cell(dr[mpn_col] if mpn_col < len(dr) else None)
                if not mfr and not mpn:
                    # Three consecutive blank rows in this column pair
                    # terminate the table.
                    empty_streak += 1
                    if empty_streak >= 3:
                        break
                    j += 1
                    continue
                empty_streak = 0
                # Same-column header detected — record it but let other pairs
                # continue reading past it so their data isn't truncated.
                if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
                    if new_header_j is None or j < new_header_j:
                        new_header_j = j
                    break
                # Only keep rows with a real MPN (placeholders are skipped).
                if mpn and mpn.lower() not in SKIP_MPNS:
                    data.append((dr_num, mfr, mpn))
                j += 1
            max_j = max(max_j, j)
            yield {
                "header_row": row_num,
                "mfr_col": mfr_col + 1,
                "mpn_col": mpn_col + 1,
                "start_col": min(mfr_col, mpn_col) + 1,
                "data": data,
            }
        # Rewind to the earliest sub-table header so the outer loop can pick it
        # up, while still allowing wider tables (other columns) to have yielded
        # their full data above.
        i = new_header_j if new_header_j is not None else max_j
# ── Write back to BoM files ────────────────────────────────────────────────────
def _resolve_cost_col(ws, header_row: int, start_col: int, known_headers: set[str]) -> int:
    """Return the 1-based cost column for the table headed at *header_row*.

    Scans the header row rightwards from *start_col* for an existing cost
    header (any name in *known_headers*, compared lower-cased).  If none is
    found, writes COST_HEADER one column after the last non-formula header
    cell — stepping past merged cells — and returns that column.
    """
    cost_col = None
    last_used = start_col
    max_col = ws.max_column or 1
    for c in range(start_col, max_col + 1):
        val = ws.cell(header_row, c).value
        if val is not None:
            val_str = str(val).strip()
            # Formula cells (e.g. '=UPPER(...)') don't count as real headers.
            if not val_str.startswith("="):
                last_used = c
                if val_str.lower() in known_headers:
                    cost_col = c
                    break
    if cost_col is None:
        cost_col = last_used + 1
        # Never write into a merged region — step right until a normal cell.
        while isinstance(ws.cell(header_row, cost_col), MergedCell):
            cost_col += 1
        ws.cell(header_row, cost_col).value = COST_HEADER
    return cost_col


def fill_boms(
    bom_dir: Path,
    exact_map: dict[tuple[str, str], float],
    mpn_map: dict[str, float],
) -> None:
    """Fill the cost column of every component table in every BoM workbook.

    For each table found by _find_tables(), an existing cost column is reused
    (or created) and each empty/zero/formula cell is filled with the lowest
    known price — exact (mfg, part) match first, part-only fallback second.
    Cells that already hold a real value are left untouched.  Workbooks are
    saved in place; exits the process if bom_dir contains no workbooks.
    """
    files = sorted(f for f in bom_dir.iterdir() if f.suffix.lower() in {".xlsx", ".xlsm"})
    if not files:
        log.error(f"No .xlsx/.xlsm files found in {bom_dir}/")
        sys.exit(1)
    # Header names (lower-cased) recognised as an existing cost column.
    # Built once here instead of once per sheet.
    known_cost_headers = {
        COST_HEADER.lower(),
        "unit cost 1000x data",
    }
    total_filled = 0
    total_skipped = 0
    total_missing = 0
    for f in files:
        log.info(f"Processing {f.name}")
        try:
            # data_only resolves formula cells (e.g. =UPPER("Mfr")) to their values
            # for detection; the writable wb is used for writing prices.
            wb_ro = openpyxl.load_workbook(f, data_only=True, read_only=True)
            wb = openpyxl.load_workbook(f)
        except Exception as exc:
            log.error(f" Cannot open {f.name}: {exc}")
            continue
        for sheet_name in wb.sheetnames:
            ws = wb[sheet_name]
            indexed = [
                (i, tuple(row))
                for i, row in enumerate(wb_ro[sheet_name].iter_rows(values_only=True), start=1)
            ]
            # Reuse the same cost column for all stacked tables at the same
            # start_col on this sheet, so a second sub-table doesn't create a
            # new column one position to the right.
            sheet_cost_cols: dict[int, int] = {}
            for table in _find_tables(indexed):
                header_row = table["header_row"]
                data_rows = [r for r, _, _ in table["data"]]
                row_range = (
                    f" (Excel rows {data_rows[0]}-{data_rows[-1]})"
                    if data_rows else " (no data rows detected)"
                )
                log.info(
                    f" Sheet '{sheet_name}' row {header_row}: "
                    f"table at col {table['start_col']}, {len(table['data'])} parts{row_range}"
                )
                if table["start_col"] in sheet_cost_cols:
                    # Stacked table — reuse the cost column found/created by the
                    # first table at this column position on this sheet.
                    cost_col = sheet_cost_cols[table["start_col"]]
                else:
                    cost_col = _resolve_cost_col(ws, header_row, table["start_col"], known_cost_headers)
                    sheet_cost_cols[table["start_col"]] = cost_col
                log.info(f" Cost column: {cost_col} ('{ws.cell(header_row, cost_col).value}')")
                tbl_filled = tbl_skipped = tbl_missing = 0
                for row_num, mfr, mpn in table["data"]:
                    cell = ws.cell(row_num, cost_col)
                    if isinstance(cell, MergedCell):
                        continue
                    existing = cell.value
                    is_formula = isinstance(existing, str) and existing.startswith("=")
                    # None, blank strings and literal zeros count as fillable;
                    # formulas are overwritten with the resolved price too.
                    is_empty = (
                        existing is None
                        or (isinstance(existing, str) and existing.strip() in ("", "0"))
                        or (isinstance(existing, (int, float)) and existing == 0)
                    )
                    if not is_empty and not is_formula:
                        log.debug(f" Skip row {row_num} [{mpn}]: cell already has {repr(existing)}")
                        total_skipped += 1
                        tbl_skipped += 1
                        continue
                    # Exact (manufacturer, part) match first, MPN-only fallback.
                    price = exact_map.get((mfr.lower(), mpn.lower()))
                    if price is None:
                        price = mpn_map.get(mpn.lower())
                        if price is not None:
                            log.debug(f" MPN-only match: {mpn} (mfr '{mfr}' not matched)")
                    if price is not None:
                        cell.value = price
                        cell.number_format = "0.000000"
                        total_filled += 1
                        tbl_filled += 1
                    else:
                        total_missing += 1
                        tbl_missing += 1
                        log.debug(f" No match: [{mfr}] [{mpn}]")
                log.info(
                    f" → filled {tbl_filled}, skipped {tbl_skipped}, "
                    f"no match {tbl_missing}"
                )
        wb_ro.close()
        try:
            wb.save(f)
            log.info(f" Saved {f.name}")
        except PermissionError:
            log.error(f" Cannot save {f.name} - close it in Excel first.")
        except Exception as exc:
            log.error(f" Save failed for {f.name}: {exc}")
    log.info(
        f"Done - filled: {total_filled}, "
        f"already populated (skipped): {total_skipped}, "
        f"no match in Silicon Expert: {total_missing}"
    )
# ── Main ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Fail fast when the expected folder layout is absent.
    for required in (BOM_DIR, OCTO_DIR):
        if not required.exists():
            log.error(f"Not found: {required}")
            sys.exit(1)
    if not SEOUT_FILE.exists():
        log.error(f"Silicon Expert export not found: {SEOUT_FILE}")
        sys.exit(1)
    # Build the price lookups once, then apply them to every BoM workbook.
    exact_map, mpn_map = load_seout(SEOUT_FILE)
    fill_boms(BOM_DIR, exact_map, mpn_map)