#!/usr/bin/env python3
"""
BoM Price Checker
=================

Reads every .xlsx / .xlsm file from the BoM/ folder. Each file may have many
sheets, and each sheet may have many tables. A "table" is any block of rows
that begins with a header row containing both a 'Manufacturer' column and an
'MPN' column.

For every unique (Manufacturer, MPN) pair found, the script queries:

    - DigiKey        (v4 API  - developer.digikey.com)
    - Mouser         (v1 API  - mouser.com/api-hub)
    - Farnell        (Element14 API - partner.element14.com)
    - RS Components  (v2 API  - developers.rs-online.com)

All prices are converted to EUR at quantity 1000 using live FX rates from
Frankfurter (api.frankfurter.app - free, no key required).

Results are written back into the original BoM files as a new column
"Unit Cost 1000x Data" placed at the end of each component table. Cells that
already contain a value are left untouched.

Cached results are stored in price_cache.json and reused for 7 days.

Quick start
-----------
    pip install -r requirements.txt
    cp .env.example .env   # then add your API keys
    python bom_price_checker.py
"""
from __future__ import annotations

import json
import logging
import os
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import openpyxl
import requests
from dotenv import load_dotenv

load_dotenv()

# ── Configuration ──────────────────────────────────────────────────────────────

BOM_DIR = Path("BoM")
CACHE_FILE = Path("price_cache.json")
QUANTITY = 1000                 # price-break quantity we quote at
CACHE_TTL_DAYS = 7              # cached prices older than this are refetched
REQUEST_DELAY = 0.35            # seconds between successive API calls
COST_HEADER = "Unit Cost 1000x Data"

# MPN cell values that mean "no part fitted / TBD" – skip these
SKIP_MPNS = {
    "", "tbd", "n/a", "na", "-", "--", "---", "?", "none", "null", "nan",
    "xxx", "x", "dnf", "dnp", "do not fit", "do not populate",
}

# ── Logging ────────────────────────────────────────────────────────────────────

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)

# ── Currency conversion ────────────────────────────────────────────────────────

# Session-level FX cache: currency code -> EUR rate (avoids repeat HTTP calls).
_fx_cache: dict[str, float] = {}


def eur_rate(currency: str) -> float:
    """Return how many EUR equal 1 unit of *currency* (Frankfurter API, no key needed).

    Falls back to 1.0 (and logs a warning) when the lookup fails, so a dead FX
    service degrades to "treat as EUR" rather than aborting the whole run.
    """
    currency = currency.upper().strip()
    if currency == "EUR":
        return 1.0
    if currency in _fx_cache:
        return _fx_cache[currency]
    try:
        r = requests.get(
            "https://api.frankfurter.app/latest",
            params={"from": currency, "to": "EUR"},
            timeout=10,
        )
        r.raise_for_status()
        rate: float = r.json()["rates"]["EUR"]
        _fx_cache[currency] = rate
        log.debug(f"FX 1 {currency} = {rate:.5f} EUR")
        return rate
    except Exception as exc:
        log.warning(f"FX lookup failed ({currency}→EUR): {exc} – defaulting to 1.0")
        return 1.0


def to_eur(amount: float, currency: str) -> float:
    """Convert *amount* of *currency* to EUR, rounded to 5 decimal places."""
    return round(amount * eur_rate(currency), 5)


# ── Price cache ────────────────────────────────────────────────────────────────

class PriceCache:
    """JSON-file cache of per-part price lookups with a TTL of CACHE_TTL_DAYS."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self._data: dict[str, dict] = {}
        self._load()

    def _load(self) -> None:
        # A corrupt or unreadable cache file is treated as an empty cache.
        if self.path.exists():
            try:
                self._data = json.loads(self.path.read_text(encoding="utf-8"))
            except (json.JSONDecodeError, OSError):
                self._data = {}

    def save(self) -> None:
        self.path.write_text(json.dumps(self._data, indent=2), encoding="utf-8")

    @staticmethod
    def _key(manufacturer: str, mpn: str) -> str:
        return f"{manufacturer.strip().lower()}||{mpn.strip().lower()}"

    def get(self, manufacturer: str, mpn: str) -> Optional[dict]:
        """Return the cached entry, or None if missing, malformed, or expired."""
        entry = self._data.get(self._key(manufacturer, mpn))
        if not entry:
            return None
        try:
            age = datetime.now() - datetime.fromisoformat(entry["cached_at"])
        except (KeyError, ValueError):
            return None
        return entry if age <= timedelta(days=CACHE_TTL_DAYS) else None

    def put(self, manufacturer: str, mpn: str, prices: dict) -> None:
        """Store *prices* with a timestamp.  The caller's dict is not mutated."""
        # Copy before stamping so the caller's dict isn't modified as a side effect.
        entry = dict(prices)
        entry["cached_at"] = datetime.now().isoformat()
        self._data[self._key(manufacturer, mpn)] = entry
        self.save()


# ── BoM parsing ────────────────────────────────────────────────────────────────

def _cell(value) -> str:
    """Normalise a worksheet cell value to a stripped string ('' for None)."""
    return str(value).strip() if value is not None else ""


def _skip(mpn: str) -> bool:
    """True when the MPN is a placeholder meaning 'no part fitted / TBD'."""
    return mpn.lower() in SKIP_MPNS


@dataclass
class TableLocation:
    """Records exactly where a component table lives inside a worksheet."""
    sheet_name: str
    header_row: int   # 1-based openpyxl row
    mfr_col: int      # 1-based openpyxl column
    mpn_col: int      # 1-based openpyxl column
    # data entries: (1-based row number, manufacturer string, mpn string)
    data: list[tuple[int, str, str]] = field(default_factory=list)


def _find_tables(indexed_rows: list[tuple[int, tuple]]) -> list[TableLocation]:
    """
    Scan a list of (1-based-row-num, row-values-tuple) pairs for sub-tables
    that have both a 'Manufacturer' and 'MPN' header column.

    A table ends at the next header row, or after 3 consecutive blank
    (mfr+mpn empty) rows.  sheet_name is left blank for the caller to fill in.
    """
    tables: list[TableLocation] = []
    i = 0
    while i < len(indexed_rows):
        row_num, row = indexed_rows[i]
        row_str = [_cell(v) for v in row]
        mfr_col_0 = next((c for c, v in enumerate(row_str) if v.lower() == "manufacturer"), None)
        mpn_col_0 = next((c for c, v in enumerate(row_str) if v.lower() == "mpn"), None)
        if mfr_col_0 is None or mpn_col_0 is None:
            i += 1
            continue
        # Found a header row – consume data rows below it
        data: list[tuple[int, str, str]] = []
        j = i + 1
        empty_streak = 0
        while j < len(indexed_rows):
            dr_num, dr = indexed_rows[j]
            mfr = _cell(dr[mfr_col_0] if mfr_col_0 < len(dr) else None)
            mpn = _cell(dr[mpn_col_0] if mpn_col_0 < len(dr) else None)
            if not mfr and not mpn:
                empty_streak += 1
                if empty_streak >= 3:
                    break
                j += 1
                continue
            empty_streak = 0
            # Another header row signals the end of this table
            if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
                break
            if mpn and not _skip(mpn):
                data.append((dr_num, mfr, mpn))
            j += 1
        tables.append(TableLocation(
            sheet_name="",            # filled by caller
            header_row=row_num,
            mfr_col=mfr_col_0 + 1,    # convert to 1-based
            mpn_col=mpn_col_0 + 1,
            data=data,
        ))
        i = j  # jump past the table we just consumed
    return tables


def extract_parts(
    bom_dir: Path,
) -> tuple[dict[Path, list[TableLocation]], set[tuple[str, str]]]:
    """
    Returns:
        file_map – {file_path: [TableLocation, ...]} for write-back
        parts    – set of unique (manufacturer, mpn) pairs for API lookup
    """
    file_map: dict[Path, list[TableLocation]] = {}
    parts: set[tuple[str, str]] = set()

    all_files = sorted(bom_dir.iterdir())
    xlsx_files = [f for f in all_files if f.suffix.lower() in {".xlsx", ".xlsm"}]
    other_files = [f for f in all_files if f.suffix.lower() in {".xls", ".csv"}]

    for f in other_files:
        log.warning(
            f"Skipping {f.name} "
            f"({'read-only; price write-back requires .xlsx' if f.suffix.lower() == '.xls' else 'CSV – convert to .xlsx for write-back'})"
        )

    if not xlsx_files:
        log.warning(f"No .xlsx/.xlsm files found in {bom_dir}/")
        return file_map, parts

    for f in xlsx_files:
        log.info(f"Reading {f.name}")
        tables_in_file: list[TableLocation] = []
        try:
            # data_only=True resolves formulas to their cached values;
            # read_only=True streams rows for large workbooks.
            wb = openpyxl.load_workbook(f, data_only=True, read_only=True)
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
                indexed = [
                    (row_num, tuple(row))
                    for row_num, row in enumerate(ws.iter_rows(values_only=True), start=1)
                ]
                tables = _find_tables(indexed)
                for t in tables:
                    t.sheet_name = sheet_name
                if tables:
                    total_parts = sum(len(t.data) for t in tables)
                    log.info(f"  Sheet '{sheet_name}': {len(tables)} table(s), {total_parts} part rows")
                    for t in tables:
                        for _, mfr, mpn in t.data:
                            parts.add((mfr, mpn))
                    tables_in_file.extend(tables)
            wb.close()
        except Exception as exc:
            log.error(f"  Failed to read {f.name}: {exc}")
            continue
        if tables_in_file:
            file_map[f] = tables_in_file

    log.info(f"Unique (Manufacturer, MPN) pairs: {len(parts)}")
    return file_map, parts


# ── Distributor API clients ────────────────────────────────────────────────────

def _best_break_price(breaks, qty_keys: tuple[str, ...], price_keys: tuple[str, ...]):
    """Return (qty, price) of the largest price break with quantity <= QUANTITY.

    Shared by all client _price_at_qty helpers.  Picks the break with the
    *largest* qualifying quantity rather than the last one listed, so the
    result is correct even when the API returns breaks unsorted.
    Returns (best_qty, best_price) with best_price None when nothing matches.
    """
    best_qty = -1
    best_price: Optional[float] = None
    for pb in breaks:
        try:
            qty = next((int(pb[k]) for k in qty_keys if pb.get(k) is not None), 0)
            price = next((float(pb[k]) for k in price_keys if pb.get(k) is not None), 0.0)
        except (ValueError, TypeError):
            continue
        if price > 0 and qty <= QUANTITY and qty > best_qty:
            best_qty = qty
            best_price = price
    return best_qty, best_price


class DigiKeyClient:
    """DigiKey API v4. Register at developer.digikey.com for a Client ID + Secret."""

    _TOKEN_URL = "https://api.digikey.com/v1/oauth2/token"
    _SEARCH_URL = "https://api.digikey.com/products/v4/search/keyword"

    def __init__(self) -> None:
        self.client_id = os.getenv("DIGIKEY_CLIENT_ID", "")
        self.client_secret = os.getenv("DIGIKEY_CLIENT_SECRET", "")
        self._access_token: Optional[str] = None
        self._token_expiry = datetime.min  # sentinel: "expired"

    @property
    def configured(self) -> bool:
        return bool(self.client_id and self.client_secret)

    def _get_token(self) -> Optional[str]:
        """Fetch (or reuse) an OAuth2 client-credentials token; None on failure."""
        if not self.configured:
            return None
        if self._access_token and datetime.now() < self._token_expiry:
            return self._access_token
        try:
            r = requests.post(
                self._TOKEN_URL,
                data={
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "grant_type": "client_credentials",
                },
                timeout=15,
            )
            r.raise_for_status()
            data = r.json()
            self._access_token = data["access_token"]
            # Refresh 60 s early so we never present an about-to-expire token.
            self._token_expiry = datetime.now() + timedelta(
                seconds=data.get("expires_in", 1800) - 60
            )
            return self._access_token
        except Exception as exc:
            log.debug(f"DigiKey token error: {exc}")
            return None

    def get_price_eur(self, manufacturer: str, mpn: str) -> Optional[float]:
        """Return the EUR unit price at QUANTITY for *mpn*, or None."""
        token = self._get_token()
        if not token:
            return None
        try:
            r = requests.post(
                self._SEARCH_URL,
                headers={
                    "X-DIGIKEY-Client-Id": self.client_id,
                    "Authorization": f"Bearer {token}",
                    "X-DIGIKEY-Locale-Site": "FR",
                    "X-DIGIKEY-Locale-Language": "fr",
                    "X-DIGIKEY-Locale-Currency": "EUR",
                    "Content-Type": "application/json",
                },
                json={"keywords": mpn, "limit": 10, "offset": 0},
                timeout=15,
            )
            r.raise_for_status()
            data = r.json()
            # BUGFIX: the old fallback read "exactManufacturerProductCount",
            # which is an integer count, not a product list – iterating it
            # raised a TypeError that the broad except silently swallowed.
            candidates = data.get("exactManufacturerProducts") or []
            if not candidates:
                candidates = data.get("products", [])
            for product in candidates:
                if product.get("manufacturerProductNumber", "").lower() == mpn.lower():
                    p = self._price_at_qty(product)
                    if p is not None:
                        return p
            if candidates:
                return self._price_at_qty(candidates[0])
        except Exception as exc:
            log.debug(f"DigiKey error [{mpn}]: {exc}")
        return None

    @staticmethod
    def _price_at_qty(product: dict) -> Optional[float]:
        breaks = product.get("standardPricing") or product.get("priceBreaks") or []
        _, best = _best_break_price(
            breaks, ("breakQuantity", "quantity"), ("unitPrice", "price")
        )
        return best  # already EUR via locale header


class MouserClient:
    """Mouser Electronics API v1. Register at mouser.com/api-hub."""

    _URL = "https://api.mouser.com/api/v1/search/partnumber"

    def __init__(self) -> None:
        self.api_key = os.getenv("MOUSER_API_KEY", "")

    @property
    def configured(self) -> bool:
        return bool(self.api_key)

    def get_price_eur(self, manufacturer: str, mpn: str) -> Optional[float]:
        """Return the EUR unit price at QUANTITY for *mpn*, or None."""
        if not self.configured:
            return None
        try:
            # BUGFIX: the query string previously contained "¤cy=EUR" – an
            # HTML-entity mangling of "&currency=EUR" – so the currency
            # parameter was never actually sent.  Use params= so requests
            # builds and escapes the query string correctly.
            r = requests.post(
                self._URL,
                params={"apiKey": self.api_key, "currency": "EUR"},
                json={"SearchByPartRequest": {"mouserPartNumber": mpn, "partSearchOptions": ""}},
                timeout=15,
            )
            r.raise_for_status()
            part_list = r.json().get("SearchResults", {}).get("Parts", [])
            for part in part_list:
                if part.get("ManufacturerPartNumber", "").lower() == mpn.lower():
                    p = self._price_at_qty(part)
                    if p is not None:
                        return p
            if part_list:
                return self._price_at_qty(part_list[0])
        except Exception as exc:
            log.debug(f"Mouser error [{mpn}]: {exc}")
        return None

    @staticmethod
    def _parse_price(raw: str) -> float:
        """Parse a localised Mouser price string ("0,85 €", "$1,234.56") to float.

        BUGFIX: the old code stripped ',' unconditionally, turning the
        EU-locale price "0,85" into 85.0 (a 100x error).
        """
        for sym in ("€", "$", "£", "¥", " ", "\xa0"):
            raw = raw.replace(sym, "")
        if "," in raw and "." in raw:
            # Both separators present: the right-most one is the decimal point.
            if raw.rfind(",") > raw.rfind("."):
                raw = raw.replace(".", "").replace(",", ".")
            else:
                raw = raw.replace(",", "")
        elif "," in raw:
            # Single comma: EU decimal separator.
            raw = raw.replace(",", ".")
        return float(raw)

    @staticmethod
    def _price_at_qty(part: dict) -> Optional[float]:
        currency = part.get("PricingCurrencyCode", "EUR")
        best_qty = -1
        best: Optional[float] = None
        for pb in part.get("PriceBreaks", []):
            try:
                qty = int(pb.get("Quantity", 0))
                price = MouserClient._parse_price(str(pb.get("Price", "")).strip())
            except (ValueError, TypeError):
                continue
            # Keep the largest break quantity <= QUANTITY (breaks may be unsorted).
            if price > 0 and qty <= QUANTITY and qty > best_qty:
                best_qty = qty
                best = to_eur(price, currency)
        return best


class FarnellClient:
    """Farnell / Element14 API. Register at partner.element14.com."""

    _URL = "https://api.element14.com/catalog/products"

    def __init__(self) -> None:
        self.api_key = os.getenv("FARNELL_API_KEY", "")
        # Change to e.g. "de.farnell.com" for German pricing
        self.store = os.getenv("FARNELL_STORE", "fr.farnell.com")

    @property
    def configured(self) -> bool:
        return bool(self.api_key)

    def get_price_eur(self, manufacturer: str, mpn: str) -> Optional[float]:
        """Return the EUR unit price at QUANTITY for *mpn*, or None."""
        if not self.configured:
            return None
        try:
            r = requests.get(
                self._URL,
                params={
                    "term": f"mfrPartNum:{mpn}",
                    "storeInfo.id": self.store,
                    "resultsSettings.offset": 0,
                    "resultsSettings.numberOfResults": 5,
                    "resultsSettings.responseGroup": "prices",
                    "callInfo.omitXmlSchema": "false",
                    "callInfo.responseDataFormat": "json",
                    "callInfo.apiKey": self.api_key,
                },
                timeout=15,
            )
            r.raise_for_status()
            products = r.json().get("keywordSearchReturn", {}).get("products", [])
            for product in products:
                pn = product.get("translatedManufacturerPartNumber") or product.get("id", "")
                if str(pn).lower() == mpn.lower():
                    p = self._price_at_qty(product)
                    if p is not None:
                        return p
            if products:
                return self._price_at_qty(products[0])
        except Exception as exc:
            log.debug(f"Farnell error [{mpn}]: {exc}")
        return None

    @staticmethod
    def _price_at_qty(product: dict) -> Optional[float]:
        currency = product.get("currency", "EUR")
        _, best = _best_break_price(product.get("prices", []), ("from",), ("cost",))
        return to_eur(best, currency) if best is not None else None


class RSClient:
    """RS Components API v2. Register at developers.rs-online.com."""

    _URL = "https://api.rs-online.com/searchProducts/v2/search"

    def __init__(self) -> None:
        self.api_key = os.getenv("RS_API_KEY", "")

    @property
    def configured(self) -> bool:
        return bool(self.api_key)

    def get_price_eur(self, manufacturer: str, mpn: str) -> Optional[float]:
        """Return the EUR unit price at QUANTITY for *mpn*, or None."""
        if not self.configured:
            return None
        try:
            r = requests.get(
                self._URL,
                params={
                    "term": mpn,
                    "fields": "prices,partNumber,manufacturer,manufacturerPartNumber",
                    "size": 5,
                },
                headers={"apiKey": self.api_key},
                timeout=15,
            )
            r.raise_for_status()
            products = r.json().get("products", [])
            for product in products:
                pn = product.get("manufacturerPartNumber", "")
                if str(pn).lower() == mpn.lower():
                    p = self._price_at_qty(product)
                    if p is not None:
                        return p
            if products:
                return self._price_at_qty(products[0])
        except Exception as exc:
            log.debug(f"RS error [{mpn}]: {exc}")
        return None

    @staticmethod
    def _price_at_qty(product: dict) -> Optional[float]:
        currency = product.get("priceCurrency", "EUR")
        _, best = _best_break_price(
            product.get("prices", []), ("from", "quantity"), ("price", "unitPrice")
        )
        return to_eur(best, currency) if best is not None else None


# ── Result dataclass ───────────────────────────────────────────────────────────

@dataclass
class PartResult:
    """Prices (EUR, qty QUANTITY) for one part from up to four distributors."""
    manufacturer: str
    mpn: str
    digikey_eur: Optional[float] = None
    mouser_eur: Optional[float] = None
    farnell_eur: Optional[float] = None
    rs_eur: Optional[float] = None

    @property
    def average_eur(self) -> Optional[float]:
        """Mean of the available prices, or None when no source returned one."""
        vals = [v for v in (self.digikey_eur, self.mouser_eur, self.farnell_eur, self.rs_eur) if v is not None]
        return round(sum(vals) / len(vals), 5) if vals else None

    def sources_found(self) -> int:
        """How many of the four distributors returned a price."""
        return sum(v is not None for v in (self.digikey_eur, self.mouser_eur, self.farnell_eur, self.rs_eur))
──────────────────────────────────────────────────── def lookup( manufacturer: str, mpn: str, cache: PriceCache, dk: DigiKeyClient, mu: MouserClient, fa: FarnellClient, rs: RSClient, ) -> PartResult: cached = cache.get(manufacturer, mpn) if cached: log.debug(f" cache hit: {mpn}") return PartResult( manufacturer=manufacturer, mpn=mpn, digikey_eur=cached.get("digikey_eur"), mouser_eur=cached.get("mouser_eur"), farnell_eur=cached.get("farnell_eur"), rs_eur=cached.get("rs_eur"), ) result = PartResult(manufacturer=manufacturer, mpn=mpn) result.digikey_eur = dk.get_price_eur(manufacturer, mpn); time.sleep(REQUEST_DELAY) result.mouser_eur = mu.get_price_eur(manufacturer, mpn); time.sleep(REQUEST_DELAY) result.farnell_eur = fa.get_price_eur(manufacturer, mpn); time.sleep(REQUEST_DELAY) result.rs_eur = rs.get_price_eur(manufacturer, mpn) cache.put(manufacturer, mpn, { "digikey_eur": result.digikey_eur, "mouser_eur": result.mouser_eur, "farnell_eur": result.farnell_eur, "rs_eur": result.rs_eur, }) return result # ── Write-back ───────────────────────────────────────────────────────────────── def _part_key(manufacturer: str, mpn: str) -> str: return f"{manufacturer.strip().lower()}||{mpn.strip().lower()}" def write_back( file_map: dict[Path, list[TableLocation]], price_lookup: dict[str, Optional[float]], ) -> None: """ Write the average EUR price at qty 1000 back into each BoM file. For every component table: - Adds a "Unit Cost 1000x Data" column header after the last existing header in the table's header row (if the column doesn't exist already). - Writes the price into each data row where the cell is currently empty. Cells that already contain a value are left unchanged. 
""" for file_path, tables in file_map.items(): log.info(f"Updating {file_path.name}") try: wb = openpyxl.load_workbook(file_path) except Exception as exc: log.error(f" Cannot open {file_path.name} for writing: {exc}") continue for table in tables: ws = wb[table.sheet_name] # ── Find or create the "Unit Cost 1000x Data" column ────────────── cost_col: Optional[int] = None last_used_col = 1 max_col = ws.max_column or 1 for c in range(1, max_col + 1): val = ws.cell(table.header_row, c).value if val is not None: last_used_col = c if str(val).strip().lower() == COST_HEADER.lower(): cost_col = c if cost_col is None: cost_col = last_used_col + 1 ws.cell(table.header_row, cost_col).value = COST_HEADER # ── Fill prices for empty cells ─────────────────────────────────── filled = 0 skipped = 0 for row_num, mfr, mpn in table.data: cell = ws.cell(row_num, cost_col) existing = cell.value if existing is not None and str(existing).strip() != "": skipped += 1 continue price = price_lookup.get(_part_key(mfr, mpn)) if price is not None: cell.value = price filled += 1 log.info( f" Sheet '{table.sheet_name}' (header row {table.header_row}): " f"{filled} filled, {skipped} skipped (already had a value)" ) try: wb.save(file_path) log.info(f" Saved {file_path.name}") except PermissionError: log.error( f" Cannot save {file_path.name} – close the file in Excel first." ) except Exception as exc: log.error(f" Save failed for {file_path.name}: {exc}") # ── Main ─────────────────────────────────────────────────────────────────────── def main() -> None: if not BOM_DIR.exists(): log.error(f"BoM directory '{BOM_DIR}' does not exist.") sys.exit(1) dk = DigiKeyClient() mu = MouserClient() fa = FarnellClient() rs = RSClient() active = [n for n, c in [("DigiKey", dk), ("Mouser", mu), ("Farnell", fa), ("RS", rs)] if c.configured] if active: log.info(f"Configured APIs: {', '.join(active)}") else: log.warning("No API keys found. 
Copy .env.example → .env and fill in your keys.") file_map, parts = extract_parts(BOM_DIR) if not parts: log.error("No valid (Manufacturer, MPN) pairs found in BoM files.") sys.exit(1) cache = PriceCache(CACHE_FILE) price_lookup: dict[str, Optional[float]] = {} total = len(parts) for i, (mfr, mpn) in enumerate(sorted(parts), 1): log.info(f"[{i:>4}/{total}] {mfr or '(no mfr)':35s} {mpn}") r = lookup(mfr, mpn, cache, dk, mu, fa, rs) tag = f"€{r.average_eur:.4f}" if r.average_eur is not None else "—" log.info(f" avg {tag} ({r.sources_found()}/4 sources)") price_lookup[_part_key(mfr, mpn)] = r.average_eur write_back(file_map, price_lookup) found = sum(1 for v in price_lookup.values() if v is not None) missing = [mpn for (_, mpn), v in ((k.split("||", 1), v) for k, v in price_lookup.items()) if v is None] log.info(f"Done – pricing found for {found}/{total} unique parts.") if missing: sample = ", ".join(missing[:15]) tail = f" … (+{len(missing) - 15} more)" if len(missing) > 15 else "" log.info(f"No pricing found for: {sample}{tail}") if __name__ == "__main__": main()