Files
BoMtoCost/bom_price_checker.py
David Rice 70b2b6acc3 Updates
2026-04-30 12:39:48 +01:00

716 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
BoM Price Checker
=================
Reads every .xlsx / .xlsm file from the BoM/ folder.
Each file may have many sheets, and each sheet may have many tables.
A "table" is any block of rows that begins with a header row containing
both a 'Manufacturer' column and an 'MPN' column.
For every unique (Manufacturer, MPN) pair found, the script queries:
- DigiKey (v4 API, developer.digikey.com)
- Mouser (v1 API, mouser.com/api-hub)
- Farnell (Element14 API, partner.element14.com)
All prices are converted to EUR at quantity 1000 using live FX rates
from Frankfurter (api.frankfurter.app — free, no key required).
Results are written back into the original BoM files as a new column
"Unit Cost 1000x Data" placed at the end of each component table.
Cells that already contain a value are left untouched.
Cached results are stored in price_cache.json and reused for 7 days.
Quick start
-----------
pip install -r requirements.txt
cp .env.example .env # then add your API keys
python bom_price_checker.py
"""
from __future__ import annotations
import os
import sys
import json
import time
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import requests
import openpyxl
from openpyxl.cell.cell import MergedCell
import pandas as pd
from dotenv import load_dotenv
load_dotenv()
# ── Configuration ──────────────────────────────────────────────────────────────
BOM_DIR = Path("BoM")                    # folder scanned for .xlsx/.xlsm BoM files
CACHE_FILE = Path("price_cache.json")    # persisted price cache (see PriceCache)
QUANTITY = 1000                          # price-break quantity used for all lookups
CACHE_TTL_DAYS = 7                       # cached prices older than this are refetched
REQUEST_DELAY = 0.35  # seconds between successive API calls
COST_HEADER = "Unit Cost 1000x Data"     # column header written back into BoM tables
# MPN cell values that mean "no part fitted / TBD" — skip these
SKIP_MPNS = {
    "", "tbd", "n/a", "na", "-", "--", "---", "?", "none",
    "null", "nan", "xxx", "x", "dnf", "dnp", "do not fit",
    "do not populate",
}
# ── Logging ────────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)
# ── Currency conversion ────────────────────────────────────────────────────────
# In-memory FX cache: currency code -> EUR rate, filled lazily per run.
_fx_cache: dict[str, float] = {}


def eur_rate(currency: str) -> float:
    """Return how many EUR equal 1 unit of *currency* (Frankfurter API, no key needed)."""
    code = currency.upper().strip()
    if code == "EUR":
        return 1.0
    cached = _fx_cache.get(code)
    if cached is not None:
        return cached
    try:
        resp = requests.get(
            "https://api.frankfurter.app/latest",
            params={"from": code, "to": "EUR"},
            timeout=10,
        )
        resp.raise_for_status()
        rate: float = resp.json()["rates"]["EUR"]
        _fx_cache[code] = rate
        log.debug(f"FX 1 {code} = {rate:.5f} EUR")
        return rate
    except Exception as exc:
        # Best-effort: on any failure assume parity rather than aborting the run.
        log.warning(f"FX lookup failed ({code}→EUR): {exc} defaulting to 1.0")
        return 1.0


def to_eur(amount: float, currency: str) -> float:
    """Convert *amount* of *currency* to EUR, rounded to 5 decimal places."""
    return round(eur_rate(currency) * amount, 5)
# ── Price cache ────────────────────────────────────────────────────────────────
class PriceCache:
    """Disk-backed JSON cache of distributor prices, keyed by manufacturer+MPN."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self._data: dict[str, dict] = {}
        self._load()

    def _load(self) -> None:
        # A missing or unreadable cache file simply means an empty cache.
        if not self.path.exists():
            return
        try:
            self._data = json.loads(self.path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError):
            self._data = {}

    def save(self) -> None:
        """Persist the full cache to disk as pretty-printed JSON."""
        self.path.write_text(json.dumps(self._data, indent=2), encoding="utf-8")

    @staticmethod
    def _key(manufacturer: str, mpn: str) -> str:
        # Case- and whitespace-insensitive composite key.
        return f"{manufacturer.strip().lower()}||{mpn.strip().lower()}"

    def get(self, manufacturer: str, mpn: str) -> Optional[dict]:
        """Return the cached entry, or None when absent, malformed, or expired."""
        entry = self._data.get(self._key(manufacturer, mpn))
        if not entry:
            return None
        try:
            cached_at = datetime.fromisoformat(entry["cached_at"])
        except (KeyError, ValueError):
            return None
        if datetime.now() - cached_at > timedelta(days=CACHE_TTL_DAYS):
            return None
        return entry

    def put(self, manufacturer: str, mpn: str, prices: dict) -> None:
        """Store *prices* under the part's key with a fresh timestamp, then save."""
        prices["cached_at"] = datetime.now().isoformat()
        self._data[self._key(manufacturer, mpn)] = prices
        self.save()
# ── BoM parsing ────────────────────────────────────────────────────────────────
def _cell(value) -> str:
    """Normalize a raw worksheet cell value to a stripped string ('' for None)."""
    if value is None:
        return ""
    return str(value).strip()


def _skip(mpn: str) -> bool:
    """True when the MPN is a placeholder (empty, TBD, DNP, ...) per SKIP_MPNS."""
    return mpn.lower() in SKIP_MPNS
@dataclass
class TableLocation:
    """Records exactly where a component table lives inside a worksheet."""
    # sheet_name is left "" by _find_tables and filled in afterwards by the
    # caller, which knows which sheet the rows came from.
    sheet_name: str
    header_row: int  # 1-based openpyxl row
    mfr_col: int  # 1-based openpyxl column
    mpn_col: int  # 1-based openpyxl column
    data: list[tuple[int, str, str]] = field(default_factory=list)
    # data entries: (1-based row number, manufacturer string, mpn string)
def _find_tables(indexed_rows: list[tuple[int, tuple]]) -> list[TableLocation]:
    """
    Scan a list of (1-based-row-num, row-values-tuple) pairs for sub-tables
    that have both a 'Manufacturer' and 'MPN' header column.
    Handles multiple tables side-by-side on the same header row.
    """
    tables: list[TableLocation] = []
    i = 0
    while i < len(indexed_rows):
        row_num, row = indexed_rows[i]
        row_str = [_cell(v) for v in row]
        # 0-based column indices whose header text matches (case-insensitive).
        mfr_cols = [c for c, v in enumerate(row_str) if v.lower() == "manufacturer"]
        mpn_cols = [c for c, v in enumerate(row_str) if v.lower() == "mpn"]
        if not mfr_cols or not mpn_cols:
            i += 1
            continue
        # Pair each mfr_col with its nearest unpaired mpn_col
        pairs: list[tuple[int, int]] = []
        used_mpn: set[int] = set()
        for mfr_col_0 in mfr_cols:
            available = [c for c in mpn_cols if c not in used_mpn]
            if not available:
                break
            best_mpn = min(available, key=lambda c: abs(c - mfr_col_0))
            pairs.append((mfr_col_0, best_mpn))
            used_mpn.add(best_mpn)
        # Walk the rows below the header once per column pair, collecting part
        # rows until the table visibly ends.
        max_j = i + 1
        for mfr_col_0, mpn_col_0 in pairs:
            data: list[tuple[int, str, str]] = []
            j = i + 1
            empty_streak = 0
            while j < len(indexed_rows):
                dr_num, dr = indexed_rows[j]
                mfr = _cell(dr[mfr_col_0] if mfr_col_0 < len(dr) else None)
                mpn = _cell(dr[mpn_col_0] if mpn_col_0 < len(dr) else None)
                if not mfr and not mpn:
                    # Three consecutive blank rows end the table; one or two
                    # are tolerated as gaps inside it.
                    empty_streak += 1
                    if empty_streak >= 3:
                        break
                    j += 1
                    continue
                empty_streak = 0
                # A repeated header row marks the start of the next table below.
                if mfr.lower() == "manufacturer" and mpn.lower() == "mpn":
                    break
                # Record the row only when the MPN is a real part number.
                if mpn and not _skip(mpn):
                    data.append((dr_num, mfr, mpn))
                j += 1
            max_j = max(max_j, j)
            tables.append(TableLocation(
                sheet_name="",  # filled in later by the caller
                header_row=row_num,
                mfr_col=mfr_col_0 + 1,  # convert to 1-based
                mpn_col=mpn_col_0 + 1,
                data=data,
            ))
        # Resume scanning below the furthest row any pair consumed.
        i = max_j
    return tables
def extract_parts(
    bom_dir: Path,
) -> tuple[dict[Path, list[TableLocation]], set[tuple[str, str]]]:
    """
    Scan every workbook in *bom_dir* for component tables.
    Returns:
        file_map  {file_path: [TableLocation, ...]} for write-back
        parts     set of unique (manufacturer, mpn) pairs for API lookup
    """
    file_map: dict[Path, list[TableLocation]] = {}
    parts: set[tuple[str, str]] = set()
    entries = sorted(bom_dir.iterdir())
    workbooks = [p for p in entries if p.suffix.lower() in {".xlsx", ".xlsm"}]
    unsupported = [p for p in entries if p.suffix.lower() in {".xls", ".csv"}]
    for p in unsupported:
        reason = (
            "read-only; price write-back requires .xlsx"
            if p.suffix.lower() == ".xls"
            else "CSV convert to .xlsx for write-back"
        )
        log.warning(f"Skipping {p.name} ({reason})")
    if not workbooks:
        log.warning(f"No .xlsx/.xlsm files found in {bom_dir}/")
        return file_map, parts
    for path in workbooks:
        log.info(f"Reading {path.name}")
        found: list[TableLocation] = []
        try:
            # data_only + read_only: fast, values-only scan of possibly large files.
            wb = openpyxl.load_workbook(path, data_only=True, read_only=True)
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
                indexed = [
                    (n, tuple(values))
                    for n, values in enumerate(ws.iter_rows(values_only=True), start=1)
                ]
                tables = _find_tables(indexed)
                # _find_tables cannot know the sheet; stamp it here.
                for t in tables:
                    t.sheet_name = sheet_name
                if tables:
                    n_rows = sum(len(t.data) for t in tables)
                    log.info(f" Sheet '{sheet_name}': {len(tables)} table(s), {n_rows} part rows")
                for t in tables:
                    for _, mfr, mpn in t.data:
                        parts.add((mfr, mpn))
                found.extend(tables)
            wb.close()
        except Exception as exc:
            log.error(f" Failed to read {path.name}: {exc}")
            continue
        if found:
            file_map[path] = found
    log.info(f"Unique (Manufacturer, MPN) pairs: {len(parts)}")
    return file_map, parts
# ── Distributor API clients ────────────────────────────────────────────────────
class DigiKeyClient:
    """DigiKey API v4. Register at developer.digikey.com for a Client ID + Secret.

    Uses the OAuth2 client-credentials flow; the token is cached on the
    instance and refreshed shortly before it expires. Searches are made with
    EUR locale headers, so returned prices need no FX conversion.
    """
    _TOKEN_URL = "https://api.digikey.com/v1/oauth2/token"
    _SEARCH_URL = "https://api.digikey.com/products/v4/search/keyword"

    def __init__(self) -> None:
        self.client_id = os.getenv("DIGIKEY_CLIENT_ID", "")
        self.client_secret = os.getenv("DIGIKEY_CLIENT_SECRET", "")
        self._access_token: Optional[str] = None
        self._token_expiry = datetime.min

    @property
    def configured(self) -> bool:
        """True when both OAuth credentials are present."""
        return bool(self.client_id and self.client_secret)

    def _get_token(self) -> Optional[str]:
        """Return a valid access token, fetching a new one if needed (None on failure)."""
        if not self.configured:
            return None
        if self._access_token and datetime.now() < self._token_expiry:
            return self._access_token
        try:
            r = requests.post(
                self._TOKEN_URL,
                data={
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "grant_type": "client_credentials",
                },
                timeout=15,
            )
            r.raise_for_status()
            data = r.json()
            self._access_token = data["access_token"]
            # Refresh 60 s early so an almost-expired token is never reused.
            self._token_expiry = datetime.now() + timedelta(
                seconds=data.get("expires_in", 1800) - 60
            )
            return self._access_token
        except Exception as exc:
            log.debug(f"DigiKey token error: {exc}")
            return None

    def get_price_eur(self, manufacturer: str, mpn: str) -> Optional[float]:
        """EUR unit price for *mpn* at QUANTITY, or None when not found."""
        token = self._get_token()
        if not token:
            return None
        try:
            r = requests.post(
                self._SEARCH_URL,
                headers={
                    "X-DIGIKEY-Client-Id": self.client_id,
                    "Authorization": f"Bearer {token}",
                    "X-DIGIKEY-Locale-Site": "FR",
                    "X-DIGIKEY-Locale-Language": "fr",
                    "X-DIGIKEY-Locale-Currency": "EUR",
                    "Content-Type": "application/json",
                },
                json={"keywords": mpn, "limit": 10, "offset": 0},
                timeout=15,
            )
            r.raise_for_status()
            data = r.json()
            # BUG FIX: the previous fallback read "exactManufacturerProductCount",
            # which is an integer count, not a product list. Any non-zero count
            # made the for-loop below raise TypeError (swallowed by the broad
            # except), so real matches were silently discarded.
            candidates = data.get("exactManufacturerProducts") or []
            if not candidates:
                candidates = data.get("products", [])
            # Prefer an exact MPN match; otherwise fall back to the first hit.
            for product in candidates:
                if product.get("manufacturerProductNumber", "").lower() == mpn.lower():
                    p = self._price_at_qty(product)
                    if p is not None:
                        return p
            if candidates:
                return self._price_at_qty(candidates[0])
        except Exception as exc:
            log.debug(f"DigiKey error [{mpn}]: {exc}")
        return None

    @staticmethod
    def _price_at_qty(product: dict) -> Optional[float]:
        """Last qualifying price break with quantity <= QUANTITY
        (the break at the highest quantity, when breaks are listed ascending)."""
        breaks = product.get("standardPricing") or product.get("priceBreaks") or []
        best: Optional[float] = None
        for pb in breaks:
            try:
                qty = int(pb.get("breakQuantity") or pb.get("quantity") or 0)
                price = float(pb.get("unitPrice") or pb.get("price") or 0)
                if qty <= QUANTITY and price > 0:
                    best = price  # already EUR via locale header
            except (ValueError, TypeError):
                continue
        return best
class MouserClient:
    """Mouser Electronics API v1. Register at mouser.com/api-hub."""
    _URL = "https://api.mouser.com/api/v1/search/partnumber"

    def __init__(self) -> None:
        self.api_key = os.getenv("MOUSER_API_KEY", "")

    @property
    def configured(self) -> bool:
        """True when an API key is present."""
        return bool(self.api_key)

    def get_price_eur(self, manufacturer: str, mpn: str) -> Optional[float]:
        """EUR unit price for *mpn* at QUANTITY, or None when not found."""
        if not self.configured:
            return None
        try:
            r = requests.post(
                f"{self._URL}?apiKey={self.api_key}&currency=EUR",
                json={"SearchByPartRequest": {"mouserPartNumber": mpn, "partSearchOptions": ""}},
                timeout=15,
            )
            r.raise_for_status()
            part_list = r.json().get("SearchResults", {}).get("Parts", [])
            # Prefer an exact MPN match; otherwise fall back to the first hit.
            for part in part_list:
                if part.get("ManufacturerPartNumber", "").lower() == mpn.lower():
                    p = self._price_at_qty(part)
                    if p is not None:
                        return p
            if part_list:
                return self._price_at_qty(part_list[0])
        except Exception as exc:
            log.debug(f"Mouser error [{mpn}]: {exc}")
        return None

    @staticmethod
    def _price_at_qty(part: dict) -> Optional[float]:
        """Last qualifying price break with quantity <= QUANTITY, converted to EUR.

        Mouser returns prices as formatted strings (e.g. "$1.23"); currency
        symbols and thousands separators are stripped before parsing.
        """
        currency = part.get("PricingCurrencyCode", "EUR")
        best: Optional[float] = None
        for pb in part.get("PriceBreaks", []):
            try:
                qty = int(pb.get("Quantity", 0))
                raw = str(pb.get("Price", "")).strip()
                # BUG FIX: the symbol list previously began with an empty
                # string (a stripped "€"), making that replace a no-op, so
                # euro-formatted price strings failed float() and the break
                # was silently discarded.
                for sym in ("€", "$", "£", "¥", ","):
                    raw = raw.replace(sym, "")
                price = float(raw)
                if qty <= QUANTITY and price > 0:
                    best = to_eur(price, currency)
            except (ValueError, TypeError):
                continue
        return best
class FarnellClient:
    """Farnell / Element14 API. Register at partner.element14.com."""
    _URL = "https://api.element14.com/catalog/products"

    def __init__(self) -> None:
        self.api_key = os.getenv("FARNELL_API_KEY", "")
        # Change to e.g. "de.farnell.com" for German pricing
        self.store = os.getenv("FARNELL_STORE", "fr.farnell.com")

    @property
    def configured(self) -> bool:
        """True when an API key is present."""
        return bool(self.api_key)

    def get_price_eur(self, manufacturer: str, mpn: str) -> Optional[float]:
        """EUR unit price for *mpn* at QUANTITY, or None when not found."""
        if not self.configured:
            return None
        query = {
            "term": f"mfrPartNum:{mpn}",
            "storeInfo.id": self.store,
            "resultsSettings.offset": 0,
            "resultsSettings.numberOfResults": 5,
            "resultsSettings.responseGroup": "prices",
            "callInfo.omitXmlSchema": "false",
            "callInfo.responseDataFormat": "json",
            "callInfo.apiKey": self.api_key,
        }
        try:
            resp = requests.get(self._URL, params=query, timeout=15)
            resp.raise_for_status()
            products = resp.json().get("keywordSearchReturn", {}).get("products", [])
            # Prefer an exact MPN match; otherwise fall back to the first hit.
            for product in products:
                pn = product.get("translatedManufacturerPartNumber") or product.get("id", "")
                if str(pn).lower() == mpn.lower():
                    price = self._price_at_qty(product)
                    if price is not None:
                        return price
            if products:
                return self._price_at_qty(products[0])
        except Exception as exc:
            log.debug(f"Farnell error [{mpn}]: {exc}")
        return None

    @staticmethod
    def _price_at_qty(product: dict) -> Optional[float]:
        """Last qualifying price break with 'from' quantity <= QUANTITY, in EUR."""
        currency = product.get("currency", "EUR")
        best: Optional[float] = None
        for tier in product.get("prices", []):
            try:
                tier_qty = int(tier.get("from", 0))
                tier_cost = float(tier.get("cost", 0))
                if tier_qty <= QUANTITY and tier_cost > 0:
                    best = to_eur(tier_cost, currency)
            except (ValueError, TypeError):
                continue
        return best
# ── Result dataclass ───────────────────────────────────────────────────────────
@dataclass
class PartResult:
    """Prices found for one (manufacturer, MPN) pair, in EUR."""
    manufacturer: str
    mpn: str
    digikey_eur: Optional[float] = None
    mouser_eur: Optional[float] = None
    farnell_eur: Optional[float] = None

    def _found_prices(self) -> list[float]:
        # Distributor prices that actually came back (None means "not found").
        candidates = (self.digikey_eur, self.mouser_eur, self.farnell_eur)
        return [p for p in candidates if p is not None]

    @property
    def average_eur(self) -> Optional[float]:
        """Mean of the prices that were found, rounded to 5 dp; None if none."""
        found = self._found_prices()
        if not found:
            return None
        return round(sum(found) / len(found), 5)

    def sources_found(self) -> int:
        """Number of distributors (0-3) that returned a price."""
        return len(self._found_prices())
# ── API lookup with caching ────────────────────────────────────────────────────
def lookup(
    manufacturer: str,
    mpn: str,
    cache: PriceCache,
    dk: DigiKeyClient,
    mu: MouserClient,
    fa: FarnellClient,
) -> PartResult:
    """Price one part: serve from the cache when fresh, else query all three APIs."""
    hit = cache.get(manufacturer, mpn)
    if hit:
        log.debug(f" cache hit: {mpn}")
        return PartResult(
            manufacturer=manufacturer,
            mpn=mpn,
            digikey_eur=hit.get("digikey_eur"),
            mouser_eur=hit.get("mouser_eur"),
            farnell_eur=hit.get("farnell_eur"),
        )
    result = PartResult(manufacturer=manufacturer, mpn=mpn)
    # Pause between providers to stay under their rate limits.
    result.digikey_eur = dk.get_price_eur(manufacturer, mpn)
    time.sleep(REQUEST_DELAY)
    result.mouser_eur = mu.get_price_eur(manufacturer, mpn)
    time.sleep(REQUEST_DELAY)
    result.farnell_eur = fa.get_price_eur(manufacturer, mpn)
    cache.put(manufacturer, mpn, {
        "digikey_eur": result.digikey_eur,
        "mouser_eur": result.mouser_eur,
        "farnell_eur": result.farnell_eur,
    })
    return result
# ── Write-back ─────────────────────────────────────────────────────────────────
def _part_key(manufacturer: str, mpn: str) -> str:
    """Normalized lookup key for a (manufacturer, mpn) pair (matches PriceCache._key)."""
    mfr_norm = manufacturer.strip().lower()
    mpn_norm = mpn.strip().lower()
    return "||".join((mfr_norm, mpn_norm))
def write_back(
    file_map: dict[Path, list[TableLocation]],
    price_lookup: dict[str, Optional[float]],
) -> None:
    """
    Write the average EUR price at qty 1000 back into each BoM file.
    For every component table:
    - Adds a "Unit Cost 1000x Data" column header after the last existing
    header in the table's header row (if the column doesn't exist already).
    - Writes the price into each data row where the cell is currently empty.
    Cells that already contain a value are left unchanged.
    """
    for file_path, tables in file_map.items():
        log.info(f"Updating {file_path.name}")
        try:
            # NOTE: loaded without data_only/read_only here (unlike the scan in
            # extract_parts) because this workbook is modified and saved back.
            wb = openpyxl.load_workbook(file_path)
        except Exception as exc:
            log.error(f" Cannot open {file_path.name} for writing: {exc}")
            continue
        for table in tables:
            ws = wb[table.sheet_name]
            # ── Find or create the "Unit Cost 1000x Data" column ──────────────
            cost_col: Optional[int] = None
            last_used_col = 1
            max_col = ws.max_column or 1
            # Scan the header row: track the last non-empty header and reuse an
            # existing cost column when one is already present.
            for c in range(1, max_col + 1):
                val = ws.cell(table.header_row, c).value
                if val is not None:
                    last_used_col = c
                if str(val).strip().lower() == COST_HEADER.lower():
                    cost_col = c
            if cost_col is None:
                cost_col = last_used_col + 1
                ws.cell(table.header_row, cost_col).value = COST_HEADER
            # ── Fill prices for empty cells ───────────────────────────────────
            filled = 0
            skipped = 0
            for row_num, mfr, mpn in table.data:
                cell = ws.cell(row_num, cost_col)
                # Merged cells cannot be assigned directly; leave them alone.
                if isinstance(cell, MergedCell):
                    continue
                existing = cell.value
                # Never overwrite a value someone already entered.
                if existing is not None and str(existing).strip() != "":
                    skipped += 1
                    continue
                price = price_lookup.get(_part_key(mfr, mpn))
                if price is not None:
                    cell.value = price
                    filled += 1
            log.info(
                f" Sheet '{table.sheet_name}' (header row {table.header_row}): "
                f"{filled} filled, {skipped} skipped (already had a value)"
            )
        try:
            wb.save(file_path)
            log.info(f" Saved {file_path.name}")
        except PermissionError:
            # Excel holds a lock on open files; tell the user instead of crashing.
            log.error(
                f" Cannot save {file_path.name} close the file in Excel first."
            )
        except Exception as exc:
            log.error(f" Save failed for {file_path.name}: {exc}")
# ── Summary output ────────────────────────────────────────────────────────────
# Stand-alone summary workbook written next to the script.
OUTPUT_FILE = Path("bom_prices.xlsx")


def write_summary(parts: set[tuple[str, str]], price_lookup: dict[str, Optional[float]]) -> None:
    """Write one workbook listing every unique part with its average EUR price."""
    records = [
        {
            "Manufacturer": mfr,
            "MPN": mpn,
            f"Unit Cost EUR @{QUANTITY}": price_lookup.get(_part_key(mfr, mpn)),
        }
        for mfr, mpn in sorted(parts)
    ]
    df = pd.DataFrame(records)
    with pd.ExcelWriter(OUTPUT_FILE, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name="Prices")
        sheet = writer.sheets["Prices"]
        # Auto-size each column to its longest cell, capped at 50 characters.
        for column_cells in sheet.columns:
            longest = max(len(str(c.value or "")) for c in column_cells)
            sheet.column_dimensions[column_cells[0].column_letter].width = min(longest + 3, 50)
    log.info(f"Summary written → {OUTPUT_FILE}")
# ── Main ───────────────────────────────────────────────────────────────────────
def main() -> None:
    """Entry point: scan the BoM folder, price every unique part, write results."""
    if not BOM_DIR.exists():
        log.error(f"BoM directory '{BOM_DIR}' does not exist.")
        sys.exit(1)
    dk = DigiKeyClient()
    mu = MouserClient()
    fa = FarnellClient()
    clients = [("DigiKey", dk), ("Mouser", mu), ("Farnell", fa)]
    active = [name for name, client in clients if client.configured]
    if active:
        log.info(f"Configured APIs: {', '.join(active)}")
    else:
        log.warning("No API keys found. Copy .env.example → .env and fill in your keys.")
    _, parts = extract_parts(BOM_DIR)
    if not parts:
        log.error("No valid (Manufacturer, MPN) pairs found in BoM files.")
        sys.exit(1)
    cache = PriceCache(CACHE_FILE)
    price_lookup: dict[str, Optional[float]] = {}
    total = len(parts)
    for i, (mfr, mpn) in enumerate(sorted(parts), 1):
        log.info(f"[{i:>4}/{total}] {mfr or '(no mfr)':35s} {mpn}")
        result = lookup(mfr, mpn, cache, dk, mu, fa)
        avg = result.average_eur
        tag = f"{avg:.4f}" if avg is not None else ""
        log.info(f" avg {tag} ({result.sources_found()}/3 sources)")
        price_lookup[_part_key(mfr, mpn)] = avg
    write_summary(parts, price_lookup)
    found = sum(1 for v in price_lookup.values() if v is not None)
    # Collect the MPN half of every key that ended up without a price.
    missing = [key.split("||", 1)[1] for key, v in price_lookup.items() if v is None]
    log.info(f"Done pricing found for {found}/{total} unique parts.")
    if missing:
        sample = ", ".join(missing[:15])
        tail = f" … (+{len(missing) - 15} more)" if len(missing) > 15 else ""
        log.info(f"No pricing found for: {sample}{tail}")


if __name__ == "__main__":
    main()