diff --git a/BACKUP/Axio4_Nexio_Conduent Subsystem Cost Analysis.xlsx b/BACKUP/Axio4_Nexio_Conduent Subsystem Cost Analysis.xlsx index 7eaff75..c180145 100644 Binary files a/BACKUP/Axio4_Nexio_Conduent Subsystem Cost Analysis.xlsx and b/BACKUP/Axio4_Nexio_Conduent Subsystem Cost Analysis.xlsx differ diff --git a/BoM/Axio4_Nexio_Conduent Subsystem Cost Analysis.xlsx b/BoM/Axio4_Nexio_Conduent Subsystem Cost Analysis.xlsx deleted file mode 100644 index 7a7930c..0000000 Binary files a/BoM/Axio4_Nexio_Conduent Subsystem Cost Analysis.xlsx and /dev/null differ diff --git a/inspect_seout.py b/inspect_seout.py new file mode 100644 index 0000000..2296d26 --- /dev/null +++ b/inspect_seout.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +"""Quick diagnostic – prints every sheet name, its headers, and 3 sample rows.""" +from pathlib import Path +import openpyxl + +path = Path("OCTO") / "seout.xlsx" +if not path.exists(): + print(f"NOT FOUND: {path}") + raise SystemExit(1) + +wb = openpyxl.load_workbook(path, data_only=True, read_only=True) +for sheet in wb.sheetnames: + ws = wb[sheet] + rows = list(ws.iter_rows(values_only=True)) + print(f"\n=== Sheet: {sheet!r} ({len(rows)} rows) ===") + # Print first 15 rows so we can see past the metadata + for i, row in enumerate(rows[:15]): + vals = [v for v in row if v is not None] + if vals: + print(f" row {i+1}: {list(row)}") + # Also find any row that looks like a header (contains text with "part" or "mfg" or "price") + print("\n --- Searching for header row ---") + for i, row in enumerate(rows): + row_str = [str(v).lower() for v in row if v is not None] + if any("part" in v or "mfg" in v or "price" in v or "avg" in v for v in row_str): + print(f" row {i+1}: {list(row)}") + break +wb.close() diff --git a/octo_fill.py b/octo_fill.py index 3a4a753..b830e41 100644 --- a/octo_fill.py +++ b/octo_fill.py @@ -1,19 +1,21 @@ #!/usr/bin/env python3 """ -Octo Fill -========= -Reads the Octopart export (OCTO/octo.xlsx) and fills the +Octo Fill – Silicon Expert +========================== +Reads the Silicon Expert export (OCTO/seout.xlsx) and fills the "Unit Cost EUR @1000" column in every component table across every sheet/tab of every BoM file in the BoM/ folder. +Silicon Expert column mapping: + Manufacturer → UPLOADED MFG + MPN → UPLOADED PART + Price → BUDGETARY PRICES column, parses "Min X & Avg Y" → uses Avg (EUR) + Matching strategy: - 1. Exact match on both Original Manufacturer + Original Part (preferred) - 2. Fallback: match on Original Part alone (handles slight manufacturer - name differences between BoM and Octopart) - -Where a part appears more than once in octo.xlsx (multiple distributor -offers), the lowest price is used. + 1. Exact match on both Uploaded Mfg + Uploaded Part (preferred) + 2. Fallback: match on Uploaded Part alone +Where a part appears more than once, the lowest price is used. Cells that already contain a value are left untouched. Usage: @@ -27,6 +29,7 @@ import logging from pathlib import Path from typing import Optional +import re import openpyxl from openpyxl.cell.cell import MergedCell @@ -39,8 +42,9 @@ def _sfp_patched(self, **kw): _SFP.__init__ = _sfp_patched # ────────────────────────────────────────────────────────────────────────────── -BOM_DIR = Path("BoM") -OCTO_DIR = Path("OCTO") +BOM_DIR = Path("BoM") +OCTO_DIR = Path("OCTO") +SEOUT_FILE = OCTO_DIR / "seout.xlsx" COST_HEADER = "Unit Cost EUR @1000" SKIP_MPNS = { @@ -57,12 +61,28 @@ logging.basicConfig( log = logging.getLogger(__name__) -# ── Load Octopart data ───────────────────────────────────────────────────────── +# ── Load Silicon Expert data ─────────────────────────────────────────────────── + +def _find_col(headers: dict[str, int], substring: str) -> Optional[int]: + """Return the index of the first header whose name contains substring (case-insensitive).""" + for name, idx in headers.items(): + if substring.lower() in name.lower(): + return idx + return None + + +def load_seout(path: Path) -> tuple[dict[tuple[str, str], float], dict[str, float]]: + """ + Load seout.xlsx into lookup maps. + exact_map – (mfg_lower, part_lower) → lowest unit price (EUR) + mpn_map – part_lower → lowest unit price (EUR) [fallback] + """ + exact_map: dict[tuple[str, str], float] = {} + mpn_map: dict[str, float] = {} -def _load_single(path: Path, exact_map: dict, mpn_map: dict) -> int: - """Load one Octopart xlsx into the shared maps. Returns number of entries added.""" wb = openpyxl.load_workbook(path, data_only=True, read_only=True) added = 0 + avg_col_name = None for sheet_name in wb.sheetnames: ws = wb[sheet_name] @@ -72,30 +92,54 @@ def _load_single(path: Path, exact_map: dict, mpn_map: dict) -> int: row = list(row) if headers is None: row_lower = [str(v).strip().lower() if v is not None else "" for v in row] - if "original part" in row_lower and "original manufacturer" in row_lower: - headers = {str(row[i]).strip(): i for i in range(len(row)) if row[i] is not None} + has_part = any("uploaded part" in v for v in row_lower) + has_mfg = any("uploaded mfg" in v for v in row_lower) + if has_part and has_mfg: + headers = { + str(row[i]).strip(): i + for i in range(len(row)) + if row[i] is not None + } + log.info(f" Sheet '{sheet_name}' headers: {list(headers.keys())}") + for h in headers: + if "budgetary" in h.lower() or "price" in h.lower(): + avg_col_name = h + break continue if not any(row): continue - mpn_col = _find_col(headers, "original part") - mfr_col = _find_col(headers, "original manufacturer") - price_col = _find_col(headers, "unit price") + mpn_col = _find_col(headers, "uploaded part") + mfr_col = _find_col(headers, "uploaded mfg") + # Silicon Expert stores prices as "Min X & Avg Y" in a BUDGETARY PRICES column + price_col = _find_col(headers, "budgetary") or _find_col(headers, "price") if mpn_col is None or price_col is None: continue - mpn = str(row[mpn_col]).strip() if mpn_col < len(row) and row[mpn_col] is not None else "" - mfr = str(row[mfr_col]).strip() if mfr_col is not None and mfr_col < len(row) and row[mfr_col] is not None else "" - price_raw = row[price_col] if price_col < len(row) else None + mpn = ( + str(row[mpn_col]).strip() + if mpn_col < len(row) and row[mpn_col] is not None + else "" + ) + mfr = ( + str(row[mfr_col]).strip() + if mfr_col is not None and mfr_col < len(row) and row[mfr_col] is not None + else "" + ) + price_raw = str(row[price_col]).strip() if price_col < len(row) and row[price_col] is not None else "" if not mpn or mpn.lower() in SKIP_MPNS: continue + # Parse "Min 0.818 & Avg 1.3225562077" → extract the Avg value + avg_match = re.search(r'Avg\s+([\d.]+)', price_raw, re.IGNORECASE) + if not avg_match: + continue try: - price = float(price_raw) - except (TypeError, ValueError): + price = float(avg_match.group(1)) + except ValueError: continue if price <= 0: @@ -111,39 +155,19 @@ def _load_single(path: Path, exact_map: dict, mpn_map: dict) -> int: mpn_map[mpn_k] = price wb.close() - return added - - -def load_octo(octo_dir: Path) -> tuple[dict[tuple[str, str], float], dict[str, float]]: - """ - Reads every .xlsx file in octo_dir into shared lookup maps. - exact_map – (manufacturer_lower, mpn_lower) → lowest unit price - mpn_map – mpn_lower → lowest unit price (fallback) - """ - files = sorted(octo_dir.glob("*.xlsx")) - if not files: - log.error(f"No .xlsx files found in {octo_dir}/") - sys.exit(1) - - exact_map: dict[tuple[str, str], float] = {} - mpn_map: dict[str, float] = {} - - for f in files: - added = _load_single(f, exact_map, mpn_map) - log.info(f" {f.name}: {added} entries loaded") - - log.info(f"Octopart total: {len(exact_map)} unique (manufacturer, part) entries") + if not exact_map: + log.warning( + f"No entries loaded from {path.name}. " + "Check that the file has columns containing 'Uploaded Part', 'Uploaded Mfg', " + "Expected a 'BUDGETARY PRICES' column with values like 'Min X & Avg Y'." + ) + log.info( + f"Silicon Expert ({path.name}): {len(exact_map)} unique (mfg, part) entries " + f"— avg price column: '{avg_col_name}'" + ) return exact_map, mpn_map -def _find_col(headers: dict[str, int], prefix: str) -> Optional[int]: - """Case-insensitive prefix match on header names.""" - for name, idx in headers.items(): - if name.lower().startswith(prefix.lower()): - return idx - return None - - # ── BoM table finding ────────────────────────────────────────────────────────── def _cell(value) -> str: @@ -153,8 +177,7 @@ def _cell(value) -> str: def _find_tables(indexed_rows: list[tuple[int, tuple]]): """ Yields TableInfo dicts per component table found. - Handles multiple tables side-by-side on the same row by finding ALL - Manufacturer+MPN column pairs in a header row, not just the first. + Handles multiple tables side-by-side on the same row. Includes 'start_col' so the cost-column search stays within each table. """ i = 0 @@ -162,7 +185,6 @@ def _find_tables(indexed_rows: list[tuple[int, tuple]]): row_num, row = indexed_rows[i] row_str = [_cell(v) for v in row] - # All column positions that are "manufacturer" or "mpn" mfr_cols = [c for c, v in enumerate(row_str) if v.lower() == "manufacturer"] mpn_cols = [c for c, v in enumerate(row_str) if v.lower() == "mpn"] @@ -170,7 +192,6 @@ def _find_tables(indexed_rows: list[tuple[int, tuple]]): i += 1 continue - # Pair each mfr_col with its nearest unpaired mpn_col pairs: list[tuple[int, int]] = [] used_mpn: set[int] = set() for mfr_col in mfr_cols: @@ -181,7 +202,9 @@ def _find_tables(indexed_rows: list[tuple[int, tuple]]): pairs.append((mfr_col, best_mpn)) used_mpn.add(best_mpn) - max_j = i + 1 + max_j = i + 1 + new_header_j = None # earliest row where a same-column header reappeared + for mfr_col, mpn_col in pairs: data: list[tuple[int, str, str]] = [] j = i + 1 @@ -199,10 +222,11 @@ def _find_tables(indexed_rows: list[tuple[int, tuple]]): continue empty_streak = 0 - # Detect a new table header anywhere in the row (handles sub-tables - # at different column positions than the current table) - dr_str_lower = [_cell(v).lower() for v in dr] - if "manufacturer" in dr_str_lower and "mpn" in dr_str_lower: + # Same-column header detected — record it but let other pairs + # continue reading past it so their data isn't truncated. + if mfr.lower() == "manufacturer" and mpn.lower() == "mpn": + if new_header_j is None or j < new_header_j: + new_header_j = j break if mpn and mpn.lower() not in SKIP_MPNS: @@ -212,13 +236,16 @@ def _find_tables(indexed_rows: list[tuple[int, tuple]]): max_j = max(max_j, j) yield { "header_row": row_num, - "mfr_col": mfr_col + 1, # 1-based + "mfr_col": mfr_col + 1, "mpn_col": mpn_col + 1, - "start_col": min(mfr_col, mpn_col) + 1, # leftmost col of this table + "start_col": min(mfr_col, mpn_col) + 1, "data": data, } - i = max_j + # Rewind to the earliest sub-table header so the outer loop can pick it + # up, while still allowing wider tables (other columns) to have yielded + # their full data above. + i = new_header_j if new_header_j is not None else max_j # ── Write back to BoM files ──────────────────────────────────────────────────── @@ -240,8 +267,8 @@ def fill_boms( for f in files: log.info(f"Processing {f.name}") try: - # data_only gives us resolved cell values (not formula strings) for - # table/part detection; the writable wb is used for reading/writing prices. + # data_only resolves formula cells (e.g. =UPPER("Mfr")) to their values + # for detection; the writable wb is used for writing prices. wb_ro = openpyxl.load_workbook(f, data_only=True, read_only=True) wb = openpyxl.load_workbook(f) except Exception as exc: @@ -255,10 +282,20 @@ def fill_boms( for i, row in enumerate(wb_ro[sheet_name].iter_rows(values_only=True), start=1) ] + # Reuse the same cost column for all stacked tables at the same + # start_col on this sheet, so a second sub-table doesn't create a + # new column one position to the right. + sheet_cost_cols: dict[int, int] = {} + + KNOWN_COST_HEADERS = { + COST_HEADER.lower(), + "unit cost 1000x data", + } + for table in _find_tables(indexed): header_row = table["header_row"] - data_rows = [r for r, _, _ in table["data"]] - row_range = ( + data_rows = [r for r, _, _ in table["data"]] + row_range = ( f" (Excel rows {data_rows[0]}–{data_rows[-1]})" if data_rows else " (no data rows detected)" ) @@ -267,41 +304,40 @@ def fill_boms( f"table at col {table['start_col']}, {len(table['data'])} parts{row_range}" ) - # Find or create the cost column. - # Accept either of the two known column names (the primary - # COST_HEADER or the name used by the earlier write-back script). - KNOWN_COST_HEADERS = { - COST_HEADER.lower(), - "unit cost 1000x data", - } - cost_col = None - last_used = table["start_col"] - max_col = ws.max_column or 1 - # Search only within this table's column range (from its - # leftmost column rightward) so side-by-side tables don't - # steal each other's cost column. - for c in range(table["start_col"], max_col + 1): - val = ws.cell(header_row, c).value - if val is not None: - val_str = str(val).strip() - # Don't count formula placeholders as "used" columns - if not val_str.startswith("="): - last_used = c - if val_str.lower() in KNOWN_COST_HEADERS: - cost_col = c - break + if table["start_col"] in sheet_cost_cols: + # Stacked table — reuse the cost column found/created by the + # first table at this column position on this sheet. + cost_col = sheet_cost_cols[table["start_col"]] + else: + cost_col = None + last_used = table["start_col"] + max_col = ws.max_column or 1 + for c in range(table["start_col"], max_col + 1): + val = ws.cell(header_row, c).value + if val is not None: + val_str = str(val).strip() + if not val_str.startswith("="): + last_used = c + if val_str.lower() in KNOWN_COST_HEADERS: + cost_col = c + break - if cost_col is None: - cost_col = last_used + 1 - while isinstance(ws.cell(header_row, cost_col), MergedCell): - cost_col += 1 - ws.cell(header_row, cost_col).value = COST_HEADER + if cost_col is None: + cost_col = last_used + 1 + while isinstance(ws.cell(header_row, cost_col), MergedCell): + cost_col += 1 + ws.cell(header_row, cost_col).value = COST_HEADER + sheet_cost_cols[table["start_col"]] = cost_col + + log.info(f" Cost column: {cost_col} ('{ws.cell(header_row, cost_col).value}')") + + tbl_filled = tbl_skipped = tbl_missing = 0 for row_num, mfr, mpn in table["data"]: cell = ws.cell(row_num, cost_col) if isinstance(cell, MergedCell): continue - existing = cell.value + existing = cell.value is_formula = isinstance(existing, str) and existing.startswith("=") is_empty = ( existing is None @@ -309,11 +345,11 @@ def fill_boms( or (isinstance(existing, (int, float)) and existing == 0) ) if not is_empty and not is_formula: - log.info(f" Skip row {row_num} [{mpn}]: cell already has {repr(existing)}") + log.debug(f" Skip row {row_num} [{mpn}]: cell already has {repr(existing)}") total_skipped += 1 + tbl_skipped += 1 continue - # Look up price: exact match first, then MPN-only fallback price = exact_map.get((mfr.lower(), mpn.lower())) if price is None: price = mpn_map.get(mpn.lower()) @@ -321,11 +357,19 @@ def fill_boms( log.debug(f" MPN-only match: {mpn} (mfr '{mfr}' not matched)") if price is not None: - cell.value = price + cell.value = price + cell.number_format = "0.000000" total_filled += 1 + tbl_filled += 1 else: total_missing += 1 - log.info(f" No match in Octopart: [{mfr}] [{mpn}]") + tbl_missing += 1 + log.debug(f" No match: [{mfr}] [{mpn}]") + + log.info( + f" → filled {tbl_filled}, skipped {tbl_skipped}, " + f"no match {tbl_missing}" + ) wb_ro.close() try: @@ -339,7 +383,7 @@ def fill_boms( log.info( f"Done – filled: {total_filled}, " f"already populated (skipped): {total_skipped}, " - f"no match in Octopart: {total_missing}" + f"no match in Silicon Expert: {total_missing}" ) @@ -351,5 +395,9 @@ if __name__ == "__main__": log.error(f"Not found: {p}") sys.exit(1) - exact_map, mpn_map = load_octo(OCTO_DIR) + if not SEOUT_FILE.exists(): + log.error(f"Silicon Expert export not found: {SEOUT_FILE}") + sys.exit(1) + + exact_map, mpn_map = load_seout(SEOUT_FILE) fill_boms(BOM_DIR, exact_map, mpn_map)