#!/usr/bin/env python3
"""
Octo Fill – Silicon Expert
==========================
Reads the Silicon Expert export (OCTO/seout.xlsx) and fills the
"Unit Cost EUR @1000" column in every component table across every
sheet/tab of every BoM file in the BoM/ folder.

Silicon Expert column mapping:
    Manufacturer → UPLOADED MFG
    MPN          → UPLOADED PART
    Price        → BUDGETARY PRICES column, parses "Min X & Avg Y" → uses Avg (EUR)

Matching strategy:
    1. Exact match on both Uploaded Mfg + Uploaded Part (preferred)
    2. Fallback: match on Uploaded Part alone

Where a part appears more than once, the lowest price is used.
Cells that already contain a non-zero value are left untouched; empty,
zero and formula cells are filled when a price is found.

Usage:
    python octo_fill.py
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import sys
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
import re
|
||
import openpyxl
|
||
from openpyxl.cell.cell import MergedCell
|
||
|
||
# ── Patch openpyxl for newer Excel attribute it doesn't know about ─────────────
|
||
from openpyxl.worksheet.dimensions import SheetFormatProperties as _SFP
|
||
# Keep a handle on the real initializer so the wrapper below can delegate to it.
_sfp_orig = _SFP.__init__


def _sfp_patched(self, **kw):
    """Initialize ``SheetFormatProperties`` while ignoring ``defaultColWidthPt``.

    The ``defaultColWidthPt`` keyword is dropped (if present) before the
    original ``__init__`` is called with the remaining keyword arguments.
    Per the section comment above, this is an attribute written by newer
    Excel versions that openpyxl does not know about.
    """
    kw.pop("defaultColWidthPt", None)
    _sfp_orig(self, **kw)


# Install the tolerant initializer for every SheetFormatProperties instance.
_SFP.__init__ = _sfp_patched
|
||
# ──────────────────────────────────────────────────────────────────────────────
|
||
|
||
# Folder scanned for BoM workbooks (.xlsx / .xlsm), relative to the working dir.
BOM_DIR = Path("BoM")
# Folder holding the Silicon Expert export.
OCTO_DIR = Path("OCTO")
SEOUT_FILE = OCTO_DIR / "seout.xlsx"
# Header text of the BoM column that receives the prices.
COST_HEADER = "Unit Cost EUR @1000"

# Lower-cased MPN cell values that are placeholders rather than part numbers;
# rows carrying one of these are ignored when loading prices and when filling.
SKIP_MPNS = {
    "", "0", "tbd", "n/a", "na", "-", "--", "---", "?", "none",
    "null", "nan", "xxx", "x", "dnf", "dnp", "do not fit",
    "do not populate", "total",
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)
|
||
|
||
|
||
# ── Load Silicon Expert data ───────────────────────────────────────────────────
|
||
|
||
def _find_col(headers: dict[str, int], substring: str) -> Optional[int]:
|
||
"""Return the index of the first header whose name contains substring (case-insensitive)."""
|
||
for name, idx in headers.items():
|
||
if substring.lower() in name.lower():
|
||
return idx
|
||
return None
|
||
|
||
|
||
def _row_text(row, col) -> str:
    """Return the stripped text of ``row[col]``, or "" if the cell is absent or None."""
    if col is None or col >= len(row) or row[col] is None:
        return ""
    return str(row[col]).strip()


def load_seout(path: Path) -> tuple[dict[tuple[str, str], float], dict[str, float]]:
    """
    Load seout.xlsx into lookup maps.

    Every sheet is scanned for a header row, i.e. the first row that has a
    cell containing "uploaded part" and a cell containing "uploaded mfg"
    (case-insensitive). Rows after it are read as data; rows before it are
    ignored. The price is taken from the first column whose header contains
    "budgetary", falling back to the first one containing "price", and only
    the number following "Avg" in that cell is used.

    Rows are skipped when the part number is empty or listed in SKIP_MPNS,
    when no "Avg <number>" can be parsed, or when the price is not positive.

    Args:
        path: Location of the Silicon Expert export workbook.

    Returns:
        ``(exact_map, mpn_map)`` where

        exact_map – (mfg_lower, part_lower) → lowest unit price (EUR)
        mpn_map   – part_lower → lowest unit price (EUR) [fallback]
    """
    exact_map: dict[tuple[str, str], float] = {}
    mpn_map: dict[str, float] = {}
    avg_col_name = None
    # Parse "Min 0.818 & Avg 1.3225562077" → capture the Avg value.
    avg_pattern = re.compile(r'Avg\s+([\d.]+)', re.IGNORECASE)

    wb = openpyxl.load_workbook(path, data_only=True, read_only=True)
    try:
        for sheet_name in wb.sheetnames:
            headers = None
            mpn_col = mfr_col = price_col = None

            for row in wb[sheet_name].iter_rows(values_only=True):
                if headers is None:
                    row_lower = [str(v).strip().lower() if v is not None else "" for v in row]
                    has_part = any("uploaded part" in v for v in row_lower)
                    has_mfg = any("uploaded mfg" in v for v in row_lower)
                    if not (has_part and has_mfg):
                        continue  # still above the header row

                    headers = {str(v).strip(): i for i, v in enumerate(row) if v is not None}
                    log.info(f" Sheet '{sheet_name}' headers: {list(headers.keys())}")

                    # Resolve the columns once per sheet instead of once per row.
                    mpn_col = _find_col(headers, "uploaded part")
                    mfr_col = _find_col(headers, "uploaded mfg")
                    # Silicon Expert stores prices as "Min X & Avg Y" in a
                    # BUDGETARY PRICES column. Compare against None explicitly:
                    # column index 0 is falsy, so `a or b` would wrongly fall
                    # through to the "price" lookup.
                    price_col = _find_col(headers, "budgetary")
                    if price_col is None:
                        price_col = _find_col(headers, "price")

                    if mpn_col is None or price_col is None:
                        log.warning(f" Sheet '{sheet_name}': no part/price column found, sheet skipped")
                        break  # no row of this sheet can yield a price

                    # Report the header of the column that is actually parsed.
                    avg_col_name = next(name for name, idx in headers.items() if idx == price_col)
                    continue

                if not any(row):
                    continue  # blank row

                mpn = _row_text(row, mpn_col)
                if not mpn or mpn.lower() in SKIP_MPNS:
                    continue

                avg_match = avg_pattern.search(_row_text(row, price_col))
                if not avg_match:
                    continue
                try:
                    price = float(avg_match.group(1))
                except ValueError:
                    # e.g. "1.2.3" satisfies the pattern but is not a number
                    continue
                if price <= 0:
                    continue

                mpn_key = mpn.lower()
                key = (_row_text(row, mfr_col).lower(), mpn_key)
                # Keep the lowest price seen for each key.
                exact_map[key] = min(price, exact_map.get(key, price))
                mpn_map[mpn_key] = min(price, mpn_map.get(mpn_key, price))
    finally:
        # Read-only workbooks keep the file handle open until closed.
        wb.close()

    if not exact_map:
        log.warning(
            f"No entries loaded from {path.name}. "
            "Check that the file has columns containing 'Uploaded Part' and 'Uploaded Mfg', "
            "and a 'BUDGETARY PRICES' column with values like 'Min X & Avg Y'."
        )
    log.info(
        f"Silicon Expert ({path.name}): {len(exact_map)} unique (mfg, part) entries "
        f"— avg price column: '{avg_col_name}'"
    )
    return exact_map, mpn_map
|
||
|
||
|
||
# ── BoM table finding ──────────────────────────────────────────────────────────
|
||
|
||
def _cell(value) -> str:
|
||
return str(value).strip() if value is not None else ""
|
||
|
||
|
||
def _find_tables(indexed_rows: list[tuple[int, tuple]]):
    """
    Yield one table-info dict per component table found in *indexed_rows*.

    A header row is any row holding at least one cell equal to "manufacturer"
    and one equal to "mpn" (case-insensitive, after stripping). Several
    tables side-by-side on the same row are supported: each "manufacturer"
    column is paired with the nearest still-unpaired "mpn" column.

    For each pair the rows below the header are read until the end of the
    input, three consecutive rows with both cells empty, or a row that
    repeats the header in the same two columns (a stacked sub-table).

    Args:
        indexed_rows: ``(row_number, row_values)`` tuples in sheet order.

    Yields:
        Dicts with the keys

        header_row – row number of the header row, as given in indexed_rows
        mfr_col    – 1-based column of the "manufacturer" header
        mpn_col    – 1-based column of the "mpn" header
        start_col  – 1-based leftmost of the two columns, so the cost-column
                     search can start at this table
        data       – list of ``(row_number, manufacturer, mpn)`` for rows
                     whose MPN is non-empty and not in SKIP_MPNS
    """
    total = len(indexed_rows)
    i = 0
    while i < total:
        row_num, row = indexed_rows[i]
        labels = [_cell(v).lower() for v in row]

        mfr_cols = [c for c, v in enumerate(labels) if v == "manufacturer"]
        mpn_cols = [c for c, v in enumerate(labels) if v == "mpn"]
        if not mfr_cols or not mpn_cols:
            i += 1
            continue

        # Pair every manufacturer column with its closest unused MPN column.
        pairs: list[tuple[int, int]] = []
        free_mpn = list(mpn_cols)
        for mfr_col in mfr_cols:
            if not free_mpn:
                break
            best_mpn = min(free_mpn, key=lambda c: abs(c - mfr_col))
            pairs.append((mfr_col, best_mpn))
            free_mpn.remove(best_mpn)

        max_j = i + 1
        new_header_j = None  # earliest row where a same-column header reappeared

        for mfr_col, mpn_col in pairs:
            data: list[tuple[int, str, str]] = []
            empty_streak = 0
            j = i + 1
            while j < total:
                dr_num, dr = indexed_rows[j]
                mfr = _cell(dr[mfr_col] if mfr_col < len(dr) else None)
                mpn = _cell(dr[mpn_col] if mpn_col < len(dr) else None)

                if not mfr and not mpn:
                    empty_streak += 1
                    if empty_streak >= 3:
                        break  # three blank rows in a row end the table
                    j += 1
                    continue
                empty_streak = 0

                # Same-column header detected — record it but let other pairs
                # continue reading past it so their data isn't truncated.
                if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
                    if new_header_j is None or j < new_header_j:
                        new_header_j = j
                    break

                if mpn and mpn.lower() not in SKIP_MPNS:
                    data.append((dr_num, mfr, mpn))
                j += 1

            max_j = max(max_j, j)
            yield {
                "header_row": row_num,
                "mfr_col": mfr_col + 1,
                "mpn_col": mpn_col + 1,
                "start_col": min(mfr_col, mpn_col) + 1,
                "data": data,
            }

        # Rewind to the earliest sub-table header so the outer loop can pick it
        # up, while still allowing wider tables (other columns) to have yielded
        # their full data above.
        i = new_header_j if new_header_j is not None else max_j
|
||
|
||
|
||
# ── Write back to BoM files ────────────────────────────────────────────────────
|
||
|
||
def _locate_cost_column(ws, header_row: int, start_col: int, known_headers) -> int:
    """Return the 1-based cost column for a table, writing COST_HEADER if none exists.

    The header row is scanned from *start_col* to the sheet's last column for
    a cell whose text (lower-cased) is in *known_headers*. If none is found,
    COST_HEADER is written into the first non-merged cell to the right of the
    last non-formula header cell.
    """
    # NOTE(review): the scan is bounded on the left by start_col only. With
    # tables side-by-side on one row it can therefore pick, or append after,
    # a column belonging to the table on the right — confirm this is intended.
    last_used = start_col
    for c in range(start_col, (ws.max_column or 1) + 1):
        val = ws.cell(header_row, c).value
        if val is None:
            continue
        val_str = str(val).strip()
        if not val_str.startswith("="):
            last_used = c
        if val_str.lower() in known_headers:
            return c

    cost_col = last_used + 1
    # A merged cell cannot hold a value of its own; step right past it.
    while isinstance(ws.cell(header_row, cost_col), MergedCell):
        cost_col += 1
    ws.cell(header_row, cost_col).value = COST_HEADER
    return cost_col


def _fill_table(ws, table, cost_col: int, exact_map, mpn_map) -> tuple[int, int, int]:
    """Write prices for one table and return ``(filled, skipped, missing)`` counts.

    Cells are written only when they are empty, zero ("0" or 0) or hold a
    formula; any other existing value is kept and counted as skipped. Merged
    cells are passed over without being counted.
    """
    filled = skipped = missing = 0
    for row_num, mfr, mpn in table["data"]:
        cell = ws.cell(row_num, cost_col)
        if isinstance(cell, MergedCell):
            continue

        existing = cell.value
        is_formula = isinstance(existing, str) and existing.startswith("=")
        is_empty = (
            existing is None
            or (isinstance(existing, str) and existing.strip() in ("", "0"))
            or (isinstance(existing, (int, float)) and existing == 0)
        )
        if not is_empty and not is_formula:
            log.debug(f" Skip row {row_num} [{mpn}]: cell already has {repr(existing)}")
            skipped += 1
            continue

        # Prefer the (manufacturer, part) match; fall back to the part alone.
        price = exact_map.get((mfr.lower(), mpn.lower()))
        if price is None:
            price = mpn_map.get(mpn.lower())
            if price is not None:
                log.debug(f" MPN-only match: {mpn} (mfr '{mfr}' not matched)")

        if price is not None:
            cell.value = price
            cell.number_format = "0.000000"
            filled += 1
        else:
            missing += 1
            log.debug(f" No match: [{mfr}] [{mpn}]")
    return filled, skipped, missing


def fill_boms(
    bom_dir: Path,
    exact_map: dict[tuple[str, str], float],
    mpn_map: dict[str, float],
) -> None:
    """
    Fill the cost column of every component table in every BoM workbook.

    Each .xlsx/.xlsm file in *bom_dir* is opened twice: read-only with
    ``data_only=True`` to detect tables from computed cell values, and
    writable to store the prices. The workbook is saved back over the
    original file. Files that cannot be opened or saved are logged and
    skipped.

    Args:
        bom_dir: Folder containing the BoM workbooks.
        exact_map: (mfg_lower, part_lower) → unit price, tried first.
        mpn_map: part_lower → unit price, used when the exact lookup fails.

    Raises:
        SystemExit: with code 1 when *bom_dir* contains no workbook.
    """
    # "~$name.xlsx" is the lock file Excel keeps next to an open workbook; it
    # has a workbook suffix but is not a workbook.
    files = sorted(
        f
        for f in bom_dir.iterdir()
        if f.suffix.lower() in {".xlsx", ".xlsm"} and not f.name.startswith("~$")
    )
    if not files:
        log.error(f"No .xlsx/.xlsm files found in {bom_dir}/")
        sys.exit(1)

    known_cost_headers = {
        COST_HEADER.lower(),
        "unit cost 1000x data",
    }

    total_filled = 0
    total_skipped = 0
    total_missing = 0

    for f in files:
        log.info(f"Processing {f.name}")
        wb_ro = None
        try:
            # data_only resolves formula cells (e.g. =UPPER("Mfr")) to their values
            # for detection; the writable wb is used for writing prices.
            wb_ro = openpyxl.load_workbook(f, data_only=True, read_only=True)
            # keep_vba preserves the macro project; without it, saving back to
            # an .xlsm path drops the macros.
            wb = openpyxl.load_workbook(f, keep_vba=f.suffix.lower() == ".xlsm")
        except Exception as exc:
            log.error(f" Cannot open {f.name}: {exc}")
            if wb_ro is not None:
                wb_ro.close()
            continue

        try:
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
                indexed = [
                    (i, tuple(row))
                    for i, row in enumerate(wb_ro[sheet_name].iter_rows(values_only=True), start=1)
                ]

                # Reuse the same cost column for all stacked tables at the same
                # start_col on this sheet, so a second sub-table doesn't create a
                # new column one position to the right.
                sheet_cost_cols: dict[int, int] = {}

                for table in _find_tables(indexed):
                    header_row = table["header_row"]
                    start_col = table["start_col"]
                    data_rows = [r for r, _, _ in table["data"]]
                    row_range = (
                        f" (Excel rows {data_rows[0]}–{data_rows[-1]})"
                        if data_rows else " (no data rows detected)"
                    )
                    log.info(
                        f" Sheet '{sheet_name}' row {header_row}: "
                        f"table at col {table['start_col']}, {len(table['data'])} parts{row_range}"
                    )

                    if start_col not in sheet_cost_cols:
                        sheet_cost_cols[start_col] = _locate_cost_column(
                            ws, header_row, start_col, known_cost_headers
                        )
                    cost_col = sheet_cost_cols[start_col]
                    log.info(f" Cost column: {cost_col} ('{ws.cell(header_row, cost_col).value}')")

                    tbl_filled, tbl_skipped, tbl_missing = _fill_table(
                        ws, table, cost_col, exact_map, mpn_map
                    )
                    total_filled += tbl_filled
                    total_skipped += tbl_skipped
                    total_missing += tbl_missing
                    log.info(
                        f" → filled {tbl_filled}, skipped {tbl_skipped}, "
                        f"no match {tbl_missing}"
                    )
        finally:
            # Release the read-only handle before writing to the same path,
            # and also when processing a sheet raises.
            wb_ro.close()

        try:
            wb.save(f)
            log.info(f" Saved {f.name}")
        except PermissionError:
            log.error(f" Cannot save {f.name} – close it in Excel first.")
        except Exception as exc:
            log.error(f" Save failed for {f.name}: {exc}")

    log.info(
        f"Done – filled: {total_filled}, "
        f"already populated (skipped): {total_skipped}, "
        f"no match in Silicon Expert: {total_missing}"
    )
|
||
|
||
|
||
# ── Main ───────────────────────────────────────────────────────────────────────
|
||
|
||
def main() -> None:
    """Check the input locations, load the Silicon Expert prices and fill the BoMs.

    Exits with status 1 (after logging the reason) when the BoM folder, the
    OCTO folder or the Silicon Expert export is missing.
    """
    for p in (BOM_DIR, OCTO_DIR):
        # Both paths are used as folders later on, so a plain file with the
        # same name is treated as missing as well.
        if not p.is_dir():
            log.error(f"Not found: {p}")
            sys.exit(1)

    if not SEOUT_FILE.is_file():
        log.error(f"Silicon Expert export not found: {SEOUT_FILE}")
        sys.exit(1)

    exact_map, mpn_map = load_seout(SEOUT_FILE)
    fill_boms(BOM_DIR, exact_map, mpn_map)


if __name__ == "__main__":
    main()
|