def _find_tables(indexed_rows: list[tuple[int, tuple]]) -> list[tuple[str, str]]:
    """
    Return all (manufacturer, mpn) pairs found across every table in the row list.

    A table starts at any row that contains both a "Manufacturer" and an
    "MPN" cell (case-insensitive).  Several tables may sit side-by-side on
    the same header row, and more tables may be stacked below any of them.

    Args:
        indexed_rows: (row number, row values) tuples in sheet order.  Only
            the row values are used here.

    Returns:
        (manufacturer, mpn) tuples in discovery order.  Rows whose MPN is
        empty or listed in SKIP_MPNS are left out; duplicates are kept.
    """
    parts: list[tuple[str, str]] = []
    n_rows = len(indexed_rows)

    # Every row is examined as a potential header row.  Jumping past the
    # longest of several side-by-side tables (the previous behaviour) skipped
    # the header of any table stacked below a *shorter* neighbour, so that
    # table's parts were silently lost.
    for i, (_, row) in enumerate(indexed_rows):
        header = [_cell(v).lower() for v in row]
        mfr_cols = [c for c, v in enumerate(header) if v == "manufacturer"]
        mpn_cols = [c for c, v in enumerate(header) if v == "mpn"]
        if not mfr_cols or not mpn_cols:
            continue

        # Pair each manufacturer column with its nearest unpaired MPN column.
        # NOTE(review): the match is greedy, left to right; a layout such as
        # "MPN .. Manufacturer | MPN .. Manufacturer" can pair across tables.
        pairs: list[tuple[int, int]] = []
        used_mpn: set[int] = set()
        for mfr_col in mfr_cols:
            available = [c for c in mpn_cols if c not in used_mpn]
            if not available:
                break
            best_mpn = min(available, key=lambda c: abs(c - mfr_col))
            pairs.append((mfr_col, best_mpn))
            used_mpn.add(best_mpn)

        for mfr_col, mpn_col in pairs:
            empty_streak = 0
            for j in range(i + 1, n_rows):
                dr = indexed_rows[j][1]
                mfr = _cell(dr[mfr_col] if mfr_col < len(dr) else None)
                mpn = _cell(dr[mpn_col] if mpn_col < len(dr) else None)

                if not mfr and not mpn:
                    # Three blank rows in a row end the table.
                    empty_streak += 1
                    if empty_streak >= 3:
                        break
                    continue
                empty_streak = 0

                # A new header in the same columns starts the next table;
                # the outer loop picks it up when it reaches that row.
                if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
                    break

                if mpn and mpn.lower() not in SKIP_MPNS:
                    parts.append((mfr, mpn))

    return parts
def write_chunks(
    parts: list[tuple[str, str]],
    output_dir: Path,
    chunk_size: "int | None" = None,
) -> None:
    """
    Write the parts list to one or more Excel files of limited size.

    The parts are sorted by manufacturer and MPN, split into consecutive
    chunks and written to ``bom_parts_<k>_of_<n>.xlsx`` inside *output_dir*
    (sheet "Parts", columns "Manufacturer" and "MPN").  Column widths are
    fitted to the longest value, capped at 60.

    Args:
        parts: (manufacturer, mpn) tuples to write.
        output_dir: Target directory; created (with parents) if missing.
        chunk_size: Maximum rows per file.  Defaults to the module-level
            CHUNK_SIZE when omitted.

    Raises:
        ValueError: If the effective chunk size is not positive.
    """
    size = CHUNK_SIZE if chunk_size is None else chunk_size
    if size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {size!r}")

    # parents=True so a nested output path works as well as a flat one.
    output_dir.mkdir(parents=True, exist_ok=True)

    df = pd.DataFrame(parts, columns=["Manufacturer", "MPN"])
    df.sort_values(["Manufacturer", "MPN"], inplace=True, ignore_index=True)

    total = len(df)
    n_files = (total + size - 1) // size  # ceiling division; 0 files for 0 parts
    written = set()

    for idx in range(n_files):
        chunk = df.iloc[idx * size : (idx + 1) * size]
        out = output_dir / f"bom_parts_{idx + 1}_of_{n_files}.xlsx"

        with pd.ExcelWriter(out, engine="openpyxl") as writer:
            chunk.to_excel(writer, index=False, sheet_name="Parts")
            ws = writer.sheets["Parts"]
            for col in ws.columns:
                width = max(len(str(cell.value or "")) for cell in col)
                ws.column_dimensions[col[0].column_letter].width = min(width + 3, 60)

        written.add(out)
        log.info(f" Written → {out} ({len(chunk)} parts)")

    # The file count is part of the file name, so an earlier run with a
    # different count leaves files behind that look like part of this set.
    # They are reported, not deleted.
    stale = sorted(
        p.name for p in output_dir.glob("bom_parts_*_of_*.xlsx") if p not in written
    )
    if stale:
        log.warning(f"Stale chunk files from an earlier run remain in {output_dir}/: {stale}")

    log.info(f"Done – {total} unique parts across {n_files} file(s) in {output_dir}/")
def _find_tables(indexed_rows: list[tuple[int, tuple]]) -> list[TableLocation]:
    """
    Scan a list of (1-based-row-num, row-values-tuple) pairs for sub-tables
    that have both a 'Manufacturer' and 'MPN' header column.

    Handles multiple tables side-by-side on the same header row as well as
    tables stacked below any of them.

    Returns:
        One TableLocation per Manufacturer/MPN column pair, in discovery
        order.  ``mfr_col`` / ``mpn_col`` are 1-based, ``data`` holds
        (row number, manufacturer, mpn) for every row whose MPN is non-empty
        and not rejected by ``_skip``.  ``sheet_name`` is left empty.
    """
    tables: list[TableLocation] = []
    n_rows = len(indexed_rows)

    # Every row is examined as a potential header row.  Jumping past the
    # longest of several side-by-side tables (the previous behaviour) skipped
    # the header of any table stacked below a *shorter* neighbour.
    for i, (row_num, row) in enumerate(indexed_rows):
        header = [_cell(v).lower() for v in row]
        mfr_cols = [c for c, v in enumerate(header) if v == "manufacturer"]
        mpn_cols = [c for c, v in enumerate(header) if v == "mpn"]
        if not mfr_cols or not mpn_cols:
            continue

        # Pair each manufacturer column with its nearest unpaired MPN column.
        # NOTE(review): the match is greedy, left to right; a layout such as
        # "MPN .. Manufacturer | MPN .. Manufacturer" can pair across tables.
        pairs: list[tuple[int, int]] = []
        used_mpn: set[int] = set()
        for mfr_col_0 in mfr_cols:
            available = [c for c in mpn_cols if c not in used_mpn]
            if not available:
                break
            best_mpn = min(available, key=lambda c: abs(c - mfr_col_0))
            pairs.append((mfr_col_0, best_mpn))
            used_mpn.add(best_mpn)

        for mfr_col_0, mpn_col_0 in pairs:
            data: list[tuple[int, str, str]] = []
            empty_streak = 0
            for j in range(i + 1, n_rows):
                dr_num, dr = indexed_rows[j]
                mfr = _cell(dr[mfr_col_0] if mfr_col_0 < len(dr) else None)
                mpn = _cell(dr[mpn_col_0] if mpn_col_0 < len(dr) else None)

                if not mfr and not mpn:
                    # Three blank rows in a row end the table.
                    empty_streak += 1
                    if empty_streak >= 3:
                        break
                    continue
                empty_streak = 0

                # Another header row in the same columns ends this table;
                # the outer loop picks it up when it reaches that row.
                if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
                    break

                if mpn and not _skip(mpn):
                    data.append((dr_num, mfr, mpn))

            tables.append(TableLocation(
                sheet_name="",  # not known here; expected to be set by the caller
                header_row=row_num,
                mfr_col=mfr_col_0 + 1,  # convert to 1-based
                mpn_col=mpn_col_0 + 1,
                data=data,
            ))

    return tables
+ +Usage: + python octo_fill.py +""" + +from __future__ import annotations + +import sys +import logging +from pathlib import Path +from typing import Optional + +import openpyxl +from openpyxl.cell.cell import MergedCell + +# ── Patch openpyxl for newer Excel attribute it doesn't know about ───────────── +from openpyxl.worksheet.dimensions import SheetFormatProperties as _SFP +_sfp_orig = _SFP.__init__ +def _sfp_patched(self, **kw): + kw.pop("defaultColWidthPt", None) + _sfp_orig(self, **kw) +_SFP.__init__ = _sfp_patched +# ────────────────────────────────────────────────────────────────────────────── + +BOM_DIR = Path("BoM") +OCTO_FILE = Path("OCTO/octo.xlsx") +COST_HEADER = "Unit Cost EUR @1000" + +SKIP_MPNS = { + "", "0", "tbd", "n/a", "na", "-", "--", "---", "?", "none", + "null", "nan", "xxx", "x", "dnf", "dnp", "do not fit", + "do not populate", +} + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-8s %(message)s", + datefmt="%H:%M:%S", +) +log = logging.getLogger(__name__) + + +# ── Load Octopart data ───────────────────────────────────────────────────────── + +def load_octo(path: Path) -> tuple[dict[tuple[str,str], float], dict[str, float]]: + """ + Returns: + exact_map – (manufacturer_lower, mpn_lower) → lowest unit price + mpn_map – mpn_lower → lowest unit price (fallback) + """ + log.info(f"Reading Octopart data from {path}") + wb = openpyxl.load_workbook(path, data_only=True, read_only=True) + + exact_map: dict[tuple[str, str], float] = {} + mpn_map: dict[str, float] = {} + + for sheet_name in wb.sheetnames: + ws = wb[sheet_name] + headers: Optional[dict[str, int]] = None # col_name → 0-based index + + for row in ws.iter_rows(values_only=True): + row = list(row) + if headers is None: + # Find header row + row_lower = [str(v).strip().lower() if v is not None else "" for v in row] + if "original part" in row_lower and "original manufacturer" in row_lower: + headers = {str(row[i]).strip(): i for i in range(len(row)) if row[i] 
is not None} + continue + + if not any(row): + continue + + mpn_col = _find_col(headers, "original part") + mfr_col = _find_col(headers, "original manufacturer") + price_col = _find_col(headers, "unit price") + + if mpn_col is None or price_col is None: + continue + + mpn = str(row[mpn_col]).strip() if mpn_col < len(row) and row[mpn_col] is not None else "" + mfr = str(row[mfr_col]).strip() if mfr_col is not None and mfr_col < len(row) and row[mfr_col] is not None else "" + price_raw = row[price_col] if price_col < len(row) else None + + if not mpn or mpn.lower() in SKIP_MPNS: + continue + + try: + price = float(price_raw) + except (TypeError, ValueError): + continue + + if price <= 0: + continue + + key = (mfr.lower(), mpn.lower()) + if key not in exact_map or price < exact_map[key]: + exact_map[key] = price + + mpn_k = mpn.lower() + if mpn_k not in mpn_map or price < mpn_map[mpn_k]: + mpn_map[mpn_k] = price + + wb.close() + + log.info(f" Loaded {len(exact_map)} unique (manufacturer, part) entries from Octopart") + return exact_map, mpn_map + + +def _find_col(headers: dict[str, int], prefix: str) -> Optional[int]: + """Case-insensitive prefix match on header names.""" + for name, idx in headers.items(): + if name.lower().startswith(prefix.lower()): + return idx + return None + + +# ── BoM table finding ────────────────────────────────────────────────────────── + +def _cell(value) -> str: + return str(value).strip() if value is not None else "" + + +def _find_tables(indexed_rows: list[tuple[int, tuple]]): + """ + Yields TableInfo dicts per component table found. + Handles multiple tables side-by-side on the same row by finding ALL + Manufacturer+MPN column pairs in a header row, not just the first. + Includes 'start_col' so the cost-column search stays within each table. 
+ """ + i = 0 + while i < len(indexed_rows): + row_num, row = indexed_rows[i] + row_str = [_cell(v) for v in row] + + # All column positions that are "manufacturer" or "mpn" + mfr_cols = [c for c, v in enumerate(row_str) if v.lower() == "manufacturer"] + mpn_cols = [c for c, v in enumerate(row_str) if v.lower() == "mpn"] + + if not mfr_cols or not mpn_cols: + i += 1 + continue + + # Pair each mfr_col with its nearest unpaired mpn_col + pairs: list[tuple[int, int]] = [] + used_mpn: set[int] = set() + for mfr_col in mfr_cols: + available = [c for c in mpn_cols if c not in used_mpn] + if not available: + break + best_mpn = min(available, key=lambda c: abs(c - mfr_col)) + pairs.append((mfr_col, best_mpn)) + used_mpn.add(best_mpn) + + max_j = i + 1 + for mfr_col, mpn_col in pairs: + data: list[tuple[int, str, str]] = [] + j = i + 1 + empty_streak = 0 + while j < len(indexed_rows): + dr_num, dr = indexed_rows[j] + mfr = _cell(dr[mfr_col] if mfr_col < len(dr) else None) + mpn = _cell(dr[mpn_col] if mpn_col < len(dr) else None) + + if not mfr and not mpn: + empty_streak += 1 + if empty_streak >= 3: + break + j += 1 + continue + empty_streak = 0 + + if mfr.lower() == "manufacturer" and mpn.lower() == "mpn": + break + + if mpn and mpn.lower() not in SKIP_MPNS: + data.append((dr_num, mfr, mpn)) + j += 1 + + max_j = max(max_j, j) + yield { + "header_row": row_num, + "mfr_col": mfr_col + 1, # 1-based + "mpn_col": mpn_col + 1, + "start_col": min(mfr_col, mpn_col) + 1, # leftmost col of this table + "data": data, + } + + i = max_j + + +# ── Write back to BoM files ──────────────────────────────────────────────────── + +def fill_boms( + bom_dir: Path, + exact_map: dict[tuple[str, str], float], + mpn_map: dict[str, float], +) -> None: + files = sorted(f for f in bom_dir.iterdir() if f.suffix.lower() in {".xlsx", ".xlsm"}) + if not files: + log.error(f"No .xlsx/.xlsm files found in {bom_dir}/") + sys.exit(1) + + total_filled = 0 + total_skipped = 0 + total_missing = 0 + + for f in 
def _claim_cost_column(ws, header_row, start_col, known_headers, claimed):
    """
    Find or create the cost column for one table and mark it as taken.

    Searches the header row from *start_col* rightward for the first cell
    whose text is in *known_headers* and whose column is not in *claimed*.
    If none is found, COST_HEADER is written into the first non-merged cell
    after the last used header cell.

    Returns:
        (column, created) – the 1-based column and whether a header was written.
    """
    last_used = start_col
    max_col = ws.max_column or 1
    for c in range(start_col, max_col + 1):
        val = ws.cell(header_row, c).value
        if val is None:
            continue
        last_used = c
        if c not in claimed and str(val).strip().lower() in known_headers:
            claimed.add(c)
            return c, False

    cost_col = last_used + 1
    while isinstance(ws.cell(header_row, cost_col), MergedCell):
        cost_col += 1
    ws.cell(header_row, cost_col).value = COST_HEADER
    claimed.add(cost_col)
    return cost_col, True


def fill_boms(
    bom_dir: Path,
    exact_map: dict[tuple[str, str], float],
    mpn_map: dict[str, float],
) -> None:
    """
    Fill the unit-cost column of every component table in every BoM workbook.

    For each .xlsx/.xlsm file in *bom_dir*, every worksheet is scanned with
    _find_tables.  Each table gets a cost column (an existing one with a known
    header, otherwise a new COST_HEADER column).  Empty or zero cost cells are
    filled from *exact_map* (manufacturer + MPN), falling back to *mpn_map*
    (MPN only).  Cells that already hold a value are left untouched.  A
    workbook is saved only if something in it changed.

    Exits the process with status 1 if *bom_dir* contains no workbook.
    """
    files = sorted(
        f for f in bom_dir.iterdir()
        if f.suffix.lower() in {".xlsx", ".xlsm"}
        and not f.name.startswith("~$")  # Excel lock files are not workbooks
    )
    if not files:
        log.error(f"No .xlsx/.xlsm files found in {bom_dir}/")
        sys.exit(1)

    # Accept either of the two known column names (the primary COST_HEADER or
    # the name used by the earlier write-back script).
    known_cost_headers = {
        COST_HEADER.lower(),
        "unit cost 1000x data",
    }

    total_filled = 0
    total_skipped = 0
    total_missing = 0

    for f in files:
        log.info(f"Processing {f.name}")
        try:
            # Without keep_vba the macro project of an .xlsm is dropped on
            # load and the file saved back under .xlsm is no longer valid.
            wb = openpyxl.load_workbook(f, keep_vba=f.suffix.lower() == ".xlsm")
        except Exception as exc:
            log.error(f" Cannot open {f.name}: {exc}")
            continue

        modified = False

        # wb.worksheets leaves out chart sheets, which have no iter_rows().
        for ws in wb.worksheets:
            sheet_name = ws.title
            indexed = [
                (i, tuple(row))
                for i, row in enumerate(ws.iter_rows(values_only=True), start=1)
            ]

            # Within one header row the rightmost table picks its cost column
            # first, and a column picked by one table is off-limits to the
            # others.  The search runs rightward from each table, so without
            # this a left-hand table lacking a cost column would take over
            # (or share) the column of the table to its right.
            tables = sorted(
                _find_tables(indexed),
                key=lambda t: (t["header_row"], -t["start_col"]),
            )
            claimed_by_row: dict[int, set[int]] = {}

            for table in tables:
                header_row = table["header_row"]
                log.info(
                    f" Sheet '{sheet_name}' row {header_row}: "
                    f"table at col {table['start_col']}, {len(table['data'])} parts"
                )

                cost_col, created = _claim_cost_column(
                    ws,
                    header_row,
                    table["start_col"],
                    known_cost_headers,
                    claimed_by_row.setdefault(header_row, set()),
                )
                modified = modified or created

                for row_num, mfr, mpn in table["data"]:
                    cell = ws.cell(row_num, cost_col)
                    if isinstance(cell, MergedCell):
                        continue
                    existing = cell.value
                    # Blank, "0" and numeric zero count as "not filled yet".
                    if existing is not None and str(existing).strip() not in ("", "0") and existing != 0:
                        total_skipped += 1
                        continue

                    # Look up price: exact match first, then MPN-only fallback
                    price = exact_map.get((mfr.lower(), mpn.lower()))
                    if price is None:
                        price = mpn_map.get(mpn.lower())
                        if price is not None:
                            log.debug(f" MPN-only match: {mpn} (mfr '{mfr}' not matched)")

                    if price is not None:
                        cell.value = price
                        total_filled += 1
                        modified = True
                    else:
                        total_missing += 1
                        log.info(f" No match in Octopart: [{mfr}] [{mpn}]")

        if not modified:
            # Re-saving through openpyxl rewrites the whole file; skip it
            # when there is nothing to write.
            log.info(f" No changes for {f.name} – not rewritten")
            continue

        try:
            wb.save(f)
            log.info(f" Saved {f.name}")
        except PermissionError:
            log.error(f" Cannot save {f.name} – close it in Excel first.")
        except Exception as exc:
            log.error(f" Save failed for {f.name}: {exc}")

    log.info(
        f"Done – filled: {total_filled}, "
        f"already populated (skipped): {total_skipped}, "
        f"no match in Octopart: {total_missing}"
    )


# ── Main ───────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    for p in (BOM_DIR, OCTO_FILE):
        if not p.exists():
            log.error(f"Not found: {p}")
            sys.exit(1)

    exact_map, mpn_map = load_octo(OCTO_FILE)
    fill_boms(BOM_DIR, exact_map, mpn_map)