673 lines
24 KiB
Python
673 lines
24 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
BoM Price Checker
=================
Reads every .xlsx / .xlsm file from the BoM/ folder.
Each file may have many sheets, and each sheet may have many tables.
A "table" is any block of rows that begins with a header row containing
both a 'Manufacturer' column and an 'MPN' column.

For every unique (Manufacturer, MPN) pair found, the script queries:
  - DigiKey  (v4 API – developer.digikey.com)
  - Mouser   (v1 API – mouser.com/api-hub)
  - Farnell  (Element14 API – partner.element14.com)

All prices are converted to EUR at quantity 1000 using live FX rates
from Frankfurter (api.frankfurter.app – free, no key required).

The average of the prices found is written back into the original BoM
files as a new column "Unit Cost 1000x Data" placed at the end of each
component table.
Cells that already contain a value are left untouched.

Cached results are stored in price_cache.json and reused for 7 days.

Quick start
-----------
    pip install -r requirements.txt
    cp .env.example .env   # then add your API keys
    python bom_price_checker.py
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import sys
|
||
import json
|
||
import time
|
||
import logging
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
import requests
|
||
import openpyxl
|
||
import pandas as pd
|
||
from dotenv import load_dotenv
|
||
|
||
# Load KEY=value pairs from a local .env file into the process environment;
# the distributor clients below read their credentials via os.getenv().
load_dotenv()
|
||
|
||
# ── Configuration ──────────────────────────────────────────────────────────────
|
||
|
||
# Folder scanned for BoM workbooks, and the JSON file used as price cache.
BOM_DIR = Path("BoM")
CACHE_FILE = Path("price_cache.json")

# Order quantity whose price break is looked up at each distributor.
QUANTITY = 1000

# Cached prices older than this many days are ignored.
CACHE_TTL_DAYS = 7

# Pause, in seconds, between successive API calls.
REQUEST_DELAY = 0.35

# Header text of the column that receives the looked-up price.
COST_HEADER = "Unit Cost 1000x Data"

# Lower-cased MPN cell values that mean "no part fitted / TBD"; rows whose
# MPN matches one of these are skipped.
SKIP_MPNS = {
    "",
    "tbd",
    "n/a",
    "na",
    "-",
    "--",
    "---",
    "?",
    "none",
    "null",
    "nan",
    "xxx",
    "x",
    "dnf",
    "dnp",
    "do not fit",
    "do not populate",
}
|
||
|
||
# ── Logging ────────────────────────────────────────────────────────────────────
|
||
|
||
# Configure the root logger once: INFO level, time-of-day stamp, padded level name.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
|
||
|
||
# ── Currency conversion ────────────────────────────────────────────────────────
|
||
|
||
# Successful lookups of this run: currency code -> EUR per 1 unit.
_fx_cache: dict[str, float] = {}

# Currencies whose lookup already failed during this run.  Remembering them
# avoids repeating a slow, failing HTTP request (and its warning) on every
# subsequent call for the same currency.
_fx_failed: set[str] = set()


def eur_rate(currency: str) -> float:
    """Return how many EUR equal 1 unit of *currency* (Frankfurter API, no key needed).

    The currency code is upper-cased and stripped before use.  "EUR" always
    yields 1.0 without a network call.  Successful lookups are memoised in
    ``_fx_cache`` for the lifetime of the process.

    On any failure (network error, HTTP error, unexpected payload, non-positive
    rate) a warning is logged once and 1.0 is returned, for this call and for
    all later calls with the same currency.

    NOTE: the 1.0 fallback means the amount is treated as if it already were
    EUR, so prices in that currency will be off by the real exchange rate.
    """
    currency = currency.upper().strip()
    if currency == "EUR":
        return 1.0
    if currency in _fx_cache:
        return _fx_cache[currency]
    if currency in _fx_failed:
        # Already warned about this currency; keep the same fallback quietly.
        return 1.0

    try:
        r = requests.get(
            "https://api.frankfurter.app/latest",
            params={"from": currency, "to": "EUR"},
            timeout=10,
        )
        r.raise_for_status()
        rate = float(r.json()["rates"]["EUR"])
        if rate <= 0:
            raise ValueError(f"non-positive rate {rate!r}")
    except Exception as exc:
        _fx_failed.add(currency)
        log.warning(f"FX lookup failed ({currency}→EUR): {exc} – defaulting to 1.0")
        return 1.0

    _fx_cache[currency] = rate
    log.debug(f"FX 1 {currency} = {rate:.5f} EUR")
    return rate
|
||
|
||
|
||
def to_eur(amount: float, currency: str) -> float:
    """Convert *amount* from *currency* to EUR, rounded to 5 decimal places."""
    rate = eur_rate(currency)
    converted = amount * rate
    return round(converted, 5)
|
||
|
||
|
||
# ── Price cache ────────────────────────────────────────────────────────────────
|
||
|
||
class PriceCache:
    """JSON-file-backed cache of per-part price dictionaries.

    Entries are keyed by a normalised ``"manufacturer||mpn"`` string and each
    carries a ``cached_at`` ISO timestamp.  Entries older than the TTL are
    reported as missing by :meth:`get`.

    Args:
        path: Location of the JSON cache file; read on construction if present.
        ttl_days: Maximum entry age in days.  ``None`` (the default) uses the
            module-level ``CACHE_TTL_DAYS``.
    """

    def __init__(self, path: Path, ttl_days: Optional[int] = None) -> None:
        self.path = path
        self.ttl_days = ttl_days
        self._data: dict[str, dict] = {}
        self._load()

    def _load(self) -> None:
        """Populate the cache from disk; start empty on any unreadable content."""
        if not self.path.exists():
            return
        try:
            loaded = json.loads(self.path.read_text(encoding="utf-8"))
        except (ValueError, OSError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            return
        # A valid JSON document that is not an object (e.g. a list) cannot be
        # used as the key -> entry mapping, so it is ignored.
        if isinstance(loaded, dict):
            self._data = loaded

    def save(self) -> None:
        """Write the cache to disk.

        The data goes to a sibling temporary file first and is then moved over
        the real file, so an interrupted run cannot leave a truncated cache.
        """
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        tmp_path.write_text(json.dumps(self._data, indent=2), encoding="utf-8")
        tmp_path.replace(self.path)

    @staticmethod
    def _key(manufacturer: str, mpn: str) -> str:
        """Build the case- and whitespace-insensitive cache key for a part."""
        return f"{manufacturer.strip().lower()}||{mpn.strip().lower()}"

    def get(self, manufacturer: str, mpn: str) -> Optional[dict]:
        """Return the cached entry for the part, or None if absent, stale or malformed."""
        entry = self._data.get(self._key(manufacturer, mpn))
        if not entry or not isinstance(entry, dict):
            return None
        try:
            age = datetime.now() - datetime.fromisoformat(entry["cached_at"])
        except (KeyError, ValueError, TypeError):
            # Missing, unparsable or wrongly typed timestamp: treat as a miss.
            return None
        ttl_days = CACHE_TTL_DAYS if self.ttl_days is None else self.ttl_days
        return entry if age <= timedelta(days=ttl_days) else None

    def put(self, manufacturer: str, mpn: str, prices: dict) -> None:
        """Store *prices* for the part with a fresh timestamp and persist the cache.

        A copy is stored, so the caller's dictionary is not modified.
        """
        entry = dict(prices)
        entry["cached_at"] = datetime.now().isoformat()
        self._data[self._key(manufacturer, mpn)] = entry
        self.save()
|
||
|
||
|
||
# ── BoM parsing ────────────────────────────────────────────────────────────────
|
||
|
||
def _cell(value) -> str:
    """Return a worksheet cell value as a stripped string; None becomes ""."""
    if value is None:
        return ""
    text = str(value)
    return text.strip()
|
||
|
||
|
||
def _skip(mpn: str) -> bool:
    """Tell whether *mpn* (compared lower-cased) is one of the SKIP_MPNS placeholders."""
    lowered = mpn.lower()
    return lowered in SKIP_MPNS
|
||
|
||
|
||
@dataclass
class TableLocation:
    """Position of one component table within a worksheet.

    Attributes:
        sheet_name: Title of the worksheet holding the table.
        header_row: 1-based openpyxl row of the header.
        mfr_col: 1-based openpyxl column of the 'Manufacturer' header.
        mpn_col: 1-based openpyxl column of the 'MPN' header.
        data: One ``(row number, manufacturer, mpn)`` tuple per part row,
            with 1-based row numbers.
    """

    sheet_name: str
    header_row: int
    mfr_col: int
    mpn_col: int
    data: list[tuple[int, str, str]] = field(default_factory=list)
|
||
|
||
|
||
def _header_columns(row: tuple) -> Optional[tuple[int, int]]:
    """Return 0-based (manufacturer, mpn) column indexes if *row* is a header row, else None."""
    cells = [_cell(v).lower() for v in row]
    try:
        return cells.index("manufacturer"), cells.index("mpn")
    except ValueError:
        return None


def _find_tables(indexed_rows: list[tuple[int, tuple]]) -> list[TableLocation]:
    """
    Scan a list of (1-based-row-num, row-values-tuple) pairs for sub-tables
    that have both a 'Manufacturer' and 'MPN' header column.

    A table starts at a header row and ends at the next header row, after
    three consecutive rows whose manufacturer and MPN cells are both empty,
    or at the end of the input.  Rows with an empty or placeholder MPN
    (see ``_skip``) are not recorded.

    Returns:
        One TableLocation per header row found, with ``sheet_name`` left
        empty for the caller to fill in.
    """
    tables: list[TableLocation] = []
    total = len(indexed_rows)
    i = 0
    while i < total:
        row_num, row = indexed_rows[i]
        header = _header_columns(row)
        if header is None:
            i += 1
            continue
        mfr_col_0, mpn_col_0 = header

        # Found a header row – consume data rows below it
        data: list[tuple[int, str, str]] = []
        j = i + 1
        empty_streak = 0
        while j < total:
            dr_num, dr = indexed_rows[j]

            # Any header row ends this table, even when its Manufacturer/MPN
            # columns sit at different positions than in the current table.
            # The outer loop then starts the next table from this row.
            if _header_columns(dr) is not None:
                break

            mfr = _cell(dr[mfr_col_0] if mfr_col_0 < len(dr) else None)
            mpn = _cell(dr[mpn_col_0] if mpn_col_0 < len(dr) else None)

            if not mfr and not mpn:
                empty_streak += 1
                if empty_streak >= 3:
                    break
                j += 1
                continue
            empty_streak = 0

            if mpn and not _skip(mpn):
                data.append((dr_num, mfr, mpn))
            j += 1

        tables.append(TableLocation(
            sheet_name="",  # filled by caller
            header_row=row_num,
            mfr_col=mfr_col_0 + 1,  # convert to 1-based
            mpn_col=mpn_col_0 + 1,
            data=data,
        ))
        i = j  # resume at the row that ended this table
    return tables
|
||
|
||
|
||
def extract_parts(
    bom_dir: Path,
) -> tuple[dict[Path, list[TableLocation]], set[tuple[str, str]]]:
    """
    Read every .xlsx / .xlsm workbook in *bom_dir* and locate its component tables.

    .xls and .csv files are reported with a warning and skipped.  Excel's
    "~$..." lock files and sub-directories are ignored.  A workbook that
    cannot be read is logged and contributes neither tables nor parts.

    Returns:
        file_map – {file_path: [TableLocation, ...]} for write-back
        parts – set of unique (manufacturer, mpn) pairs for API lookup
    """
    file_map: dict[Path, list[TableLocation]] = {}
    parts: set[tuple[str, str]] = set()

    # Excel keeps a "~$name.xlsx" lock file next to an open workbook; it has a
    # workbook suffix but is not a workbook, so it is filtered out here.
    all_files = sorted(
        f for f in bom_dir.iterdir()
        if f.is_file() and not f.name.startswith("~$")
    )
    xlsx_files = [f for f in all_files if f.suffix.lower() in {".xlsx", ".xlsm"}]
    other_files = [f for f in all_files if f.suffix.lower() in {".xls", ".csv"}]

    for f in other_files:
        if f.suffix.lower() == ".xls":
            reason = "read-only; price write-back requires .xlsx"
        else:
            reason = "CSV – convert to .xlsx for write-back"
        log.warning(f"Skipping {f.name} ({reason})")

    if not xlsx_files:
        log.warning(f"No .xlsx/.xlsm files found in {bom_dir}/")
        return file_map, parts

    for f in xlsx_files:
        log.info(f"Reading {f.name}")
        tables_in_file: list[TableLocation] = []
        # Parts are staged per file so that a workbook failing halfway does
        # not trigger API lookups for rows that can never be written back.
        parts_in_file: set[tuple[str, str]] = set()
        wb = None
        try:
            wb = openpyxl.load_workbook(f, data_only=True, read_only=True)
            # .worksheets excludes chartsheets, which have no rows to iterate.
            for ws in wb.worksheets:
                sheet_name = ws.title
                indexed = [
                    (row_num, tuple(row))
                    for row_num, row in enumerate(ws.iter_rows(values_only=True), start=1)
                ]
                tables = _find_tables(indexed)
                if not tables:
                    continue
                for t in tables:
                    t.sheet_name = sheet_name
                    parts_in_file.update((mfr, mpn) for _, mfr, mpn in t.data)
                total_parts = sum(len(t.data) for t in tables)
                log.info(f" Sheet '{sheet_name}': {len(tables)} table(s), {total_parts} part rows")
                tables_in_file.extend(tables)
        except Exception as exc:
            log.error(f" Failed to read {f.name}: {exc}")
            continue
        finally:
            # Read-only workbooks keep the file handle open until closed.
            if wb is not None:
                wb.close()

        parts.update(parts_in_file)
        if tables_in_file:
            file_map[f] = tables_in_file

    log.info(f"Unique (Manufacturer, MPN) pairs: {len(parts)}")
    return file_map, parts
|
||
|
||
|
||
# ── Distributor API clients ────────────────────────────────────────────────────
|
||
|
||
class DigiKeyClient:
    """DigiKey API v4. Register at developer.digikey.com for a Client ID + Secret.

    Credentials come from the DIGIKEY_CLIENT_ID / DIGIKEY_CLIENT_SECRET
    environment variables.  An OAuth2 client-credentials token is fetched on
    demand and reused until shortly before it expires.  Searches request EUR
    pricing through the locale headers, so prices are returned unconverted.
    """

    _TOKEN_URL = "https://api.digikey.com/v1/oauth2/token"
    _SEARCH_URL = "https://api.digikey.com/products/v4/search/keyword"

    def __init__(self) -> None:
        self.client_id = os.getenv("DIGIKEY_CLIENT_ID", "")
        self.client_secret = os.getenv("DIGIKEY_CLIENT_SECRET", "")
        self._access_token: Optional[str] = None
        self._token_expiry = datetime.min

    @property
    def configured(self) -> bool:
        """True when both the client ID and the client secret are set."""
        return bool(self.client_id and self.client_secret)

    def _get_token(self) -> Optional[str]:
        """Return a valid access token, or None if unconfigured or the request fails."""
        if not self.configured:
            return None
        if self._access_token and datetime.now() < self._token_expiry:
            return self._access_token
        try:
            r = requests.post(
                self._TOKEN_URL,
                data={
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "grant_type": "client_credentials",
                },
                timeout=15,
            )
            r.raise_for_status()
            data = r.json()
            self._access_token = data["access_token"]
            # Refresh one minute early so a token never expires mid-request.
            self._token_expiry = datetime.now() + timedelta(
                seconds=data.get("expires_in", 1800) - 60
            )
            return self._access_token
        except Exception as exc:
            log.debug(f"DigiKey token error: {exc}")
            return None

    @staticmethod
    def _candidates(data: dict) -> list:
        """Pick the product list to examine from a search response.

        Exact manufacturer matches are preferred; the general ``products``
        list is the fallback.  ``exactManufacturerProductCount`` is a count,
        not a list, and is deliberately not consulted: treating it as a list
        made iteration fail and hid the ``products`` fallback.
        """
        for key in ("exactManufacturerProducts", "products"):
            found = data.get(key)
            if isinstance(found, list) and found:
                return found
        return []

    def get_price_eur(self, manufacturer: str, mpn: str) -> Optional[float]:
        """Return the EUR unit price for *mpn* at the configured quantity, or None.

        A product whose manufacturer part number equals *mpn* (ignoring case)
        is preferred; otherwise the first candidate is used.  Any error is
        logged at debug level and reported as None.  *manufacturer* is
        accepted for interface parity with the other clients but not used.
        """
        token = self._get_token()
        if not token:
            return None
        try:
            r = requests.post(
                self._SEARCH_URL,
                headers={
                    "X-DIGIKEY-Client-Id": self.client_id,
                    "Authorization": f"Bearer {token}",
                    "X-DIGIKEY-Locale-Site": "FR",
                    "X-DIGIKEY-Locale-Language": "fr",
                    "X-DIGIKEY-Locale-Currency": "EUR",
                    "Content-Type": "application/json",
                },
                json={"keywords": mpn, "limit": 10, "offset": 0},
                timeout=15,
            )
            r.raise_for_status()
            candidates = self._candidates(r.json())

            wanted = mpn.lower()
            for product in candidates:
                if str(product.get("manufacturerProductNumber") or "").lower() == wanted:
                    p = self._price_at_qty(product)
                    if p is not None:
                        return p
            if candidates:
                return self._price_at_qty(candidates[0])
        except Exception as exc:
            log.debug(f"DigiKey error [{mpn}]: {exc}")
        return None

    @staticmethod
    def _price_at_qty(product: dict, quantity: Optional[int] = None) -> Optional[float]:
        """Return the unit price of the largest price break not exceeding *quantity*.

        Args:
            product: One product dictionary from the search response.
            quantity: Order quantity; ``None`` uses the module-level QUANTITY.

        Returns:
            The price as given by the API (already EUR via the locale header),
            or None when no usable break exists.  The result does not depend
            on the order of the breaks in the response.
        """
        limit = QUANTITY if quantity is None else quantity
        breaks = product.get("standardPricing") or product.get("priceBreaks") or []
        best_qty = -1
        best: Optional[float] = None
        for pb in breaks:
            try:
                qty = int(pb.get("breakQuantity") or pb.get("quantity") or 0)
                price = float(pb.get("unitPrice") or pb.get("price") or 0)
            except (ValueError, TypeError, AttributeError):
                continue
            if price > 0 and best_qty <= qty <= limit:
                best_qty, best = qty, price
        return best
|
||
|
||
|
||
class MouserClient:
    """Mouser Electronics API v1. Register at mouser.com/api-hub.

    The API key is read from the MOUSER_API_KEY environment variable.
    """

    _URL = "https://api.mouser.com/api/v1/search/partnumber"

    def __init__(self) -> None:
        self.api_key = os.getenv("MOUSER_API_KEY", "")

    @property
    def configured(self) -> bool:
        """True when an API key is set."""
        return bool(self.api_key)

    def get_price_eur(self, manufacturer: str, mpn: str) -> Optional[float]:
        """Return the EUR unit price for *mpn* at the configured quantity, or None.

        A part whose manufacturer part number equals *mpn* (ignoring case) is
        preferred; otherwise the first returned part is used.  Any error is
        logged at debug level and reported as None.  *manufacturer* is
        accepted for interface parity with the other clients but not used.
        """
        if not self.configured:
            return None
        try:
            r = requests.post(
                self._URL,
                # Passed as query parameters so they are URL-encoded correctly
                # (the hand-built query string had a mis-encoded "&currency").
                params={"apiKey": self.api_key, "currency": "EUR"},
                json={"SearchByPartRequest": {"mouserPartNumber": mpn, "partSearchOptions": ""}},
                timeout=15,
            )
            r.raise_for_status()
            part_list = (r.json().get("SearchResults") or {}).get("Parts") or []

            wanted = mpn.lower()
            for part in part_list:
                if str(part.get("ManufacturerPartNumber") or "").lower() == wanted:
                    p = self._price_at_qty(part)
                    if p is not None:
                        return p
            if part_list:
                return self._price_at_qty(part_list[0])
        except Exception as exc:
            log.debug(f"Mouser error [{mpn}]: {exc}")
        return None

    @staticmethod
    def _parse_price(raw) -> float:
        """Parse a formatted price string such as "$1,234.50" or "0,052 €".

        Currency symbols and spaces are removed.  When both "," and "." occur,
        the right-most one is the decimal separator.  A single lone comma is
        read as a decimal comma; several commas without a dot are read as
        thousands separators.

        NOTE(review): the lone-comma rule assumes dot-decimal prices always
        include their decimal point – confirm against real Mouser responses.

        Raises:
            ValueError: if the remaining text is not a number.
        """
        text = str(raw)
        for junk in ("€", "$", "£", "¥", "\u00a0", " "):
            text = text.replace(junk, "")
        text = text.strip()

        has_comma = "," in text
        has_dot = "." in text
        if has_comma and has_dot:
            if text.rfind(",") > text.rfind("."):
                text = text.replace(".", "").replace(",", ".")
            else:
                text = text.replace(",", "")
        elif has_comma:
            if text.count(",") == 1:
                text = text.replace(",", ".")
            else:
                text = text.replace(",", "")
        return float(text)

    @staticmethod
    def _best_break(breaks: list, limit: int) -> Optional[tuple[float, Optional[str]]]:
        """Return (price, currency-or-None) of the largest break with Quantity <= *limit*.

        Breaks that cannot be parsed or have a non-positive price are ignored.
        The result does not depend on the order of the breaks.
        """
        best_qty = -1
        best: Optional[tuple[float, Optional[str]]] = None
        for pb in breaks:
            try:
                qty = int(pb.get("Quantity", 0))
                price = MouserClient._parse_price(pb.get("Price", ""))
            except (ValueError, TypeError, AttributeError):
                continue
            if price > 0 and best_qty <= qty <= limit:
                best_qty = qty
                best = (price, pb.get("Currency") or None)
        return best

    @staticmethod
    def _price_at_qty(part: dict, quantity: Optional[int] = None) -> Optional[float]:
        """Return the EUR unit price of *part* at *quantity*, or None if no break applies.

        Args:
            part: One part dictionary from the search response.
            quantity: Order quantity; ``None`` uses the module-level QUANTITY.

        The currency is taken from the chosen break's ``Currency`` field when
        present (presumably an ISO code – verify against the API schema), then
        from the part's ``PricingCurrencyCode``, and finally assumed to be EUR.
        Conversion happens once, for the chosen break only.
        """
        limit = QUANTITY if quantity is None else quantity
        chosen = MouserClient._best_break(part.get("PriceBreaks") or [], limit)
        if chosen is None:
            return None
        price, break_currency = chosen
        currency = break_currency or part.get("PricingCurrencyCode") or "EUR"
        return to_eur(price, currency)
|
||
|
||
|
||
class FarnellClient:
    """Farnell / Element14 API. Register at partner.element14.com.

    The API key is read from FARNELL_API_KEY and the store from FARNELL_STORE
    (default "fr.farnell.com").
    """

    _URL = "https://api.element14.com/catalog/products"

    def __init__(self) -> None:
        self.api_key = os.getenv("FARNELL_API_KEY", "")
        # Change to e.g. "de.farnell.com" for German pricing
        self.store = os.getenv("FARNELL_STORE", "fr.farnell.com")

    @property
    def configured(self) -> bool:
        """True when an API key is set."""
        return bool(self.api_key)

    @staticmethod
    def _products(payload: dict) -> list:
        """Extract the product list from a search response.

        Manufacturer-part-number searches are expected to answer under
        ``manufacturerPartNumberSearchReturn``; ``keywordSearchReturn`` is
        kept as a fallback.  NOTE(review): root element names are taken from
        the element14 API documentation – confirm against a live response.
        """
        for root in ("manufacturerPartNumberSearchReturn", "keywordSearchReturn"):
            section = payload.get(root)
            if isinstance(section, dict):
                found = section.get("products")
                if isinstance(found, list) and found:
                    return found
        return []

    def get_price_eur(self, manufacturer: str, mpn: str) -> Optional[float]:
        """Return the EUR unit price for *mpn* at the configured quantity, or None.

        A product whose part number equals *mpn* (ignoring case) is preferred;
        otherwise the first returned product is used.  Any error is logged at
        debug level and reported as None.  *manufacturer* is accepted for
        interface parity with the other clients but not used.
        """
        if not self.configured:
            return None
        try:
            r = requests.get(
                self._URL,
                params={
                    # "manuPartNum:" is the documented prefix for searching by
                    # manufacturer part number.
                    "term": f"manuPartNum:{mpn}",
                    "storeInfo.id": self.store,
                    "resultsSettings.offset": 0,
                    "resultsSettings.numberOfResults": 5,
                    "resultsSettings.responseGroup": "prices",
                    "callInfo.omitXmlSchema": "false",
                    "callInfo.responseDataFormat": "json",
                    "callInfo.apiKey": self.api_key,
                },
                timeout=15,
            )
            r.raise_for_status()
            products = self._products(r.json())

            wanted = mpn.lower()
            for product in products:
                pn = product.get("translatedManufacturerPartNumber") or product.get("id", "")
                if str(pn).lower() == wanted:
                    p = self._price_at_qty(product)
                    if p is not None:
                        return p
            if products:
                return self._price_at_qty(products[0])
        except Exception as exc:
            log.debug(f"Farnell error [{mpn}]: {exc}")
        return None

    @staticmethod
    def _cost_at_qty(prices: list, limit: int) -> Optional[float]:
        """Return the ``cost`` of the largest price break whose ``from`` is <= *limit*.

        Breaks that cannot be parsed or have a non-positive cost are ignored.
        The result does not depend on the order of the breaks.
        """
        best_qty = -1
        best: Optional[float] = None
        for pb in prices:
            try:
                qty = int(pb.get("from", 0))
                cost = float(pb.get("cost", 0))
            except (ValueError, TypeError, AttributeError):
                continue
            if cost > 0 and best_qty <= qty <= limit:
                best_qty, best = qty, cost
        return best

    @staticmethod
    def _price_at_qty(product: dict, quantity: Optional[int] = None) -> Optional[float]:
        """Return the EUR unit price of *product* at *quantity*, or None if no break applies.

        Args:
            product: One product dictionary from the search response.
            quantity: Order quantity; ``None`` uses the module-level QUANTITY.

        The product's ``currency`` field is used for conversion and assumed
        to be EUR when absent.
        """
        limit = QUANTITY if quantity is None else quantity
        cost = FarnellClient._cost_at_qty(product.get("prices") or [], limit)
        if cost is None:
            return None
        return to_eur(cost, product.get("currency") or "EUR")
|
||
|
||
|
||
# ── Result dataclass ───────────────────────────────────────────────────────────
|
||
|
||
@dataclass
class PartResult:
    """Prices found for one (manufacturer, MPN) pair, one optional EUR value per distributor."""

    manufacturer: str
    mpn: str
    digikey_eur: Optional[float] = None
    mouser_eur: Optional[float] = None
    farnell_eur: Optional[float] = None

    def _known_prices(self) -> list:
        """Collect the distributor prices that are not None."""
        found = []
        for price in (self.digikey_eur, self.mouser_eur, self.farnell_eur):
            if price is not None:
                found.append(price)
        return found

    @property
    def average_eur(self) -> Optional[float]:
        """Mean of the available prices rounded to 5 decimals; None if there are none."""
        known = self._known_prices()
        if not known:
            return None
        return round(sum(known) / len(known), 5)

    def sources_found(self) -> int:
        """Number of distributors that returned a price."""
        return len(self._known_prices())
|
||
|
||
|
||
# ── API lookup with caching ────────────────────────────────────────────────────
|
||
|
||
def lookup(
    manufacturer: str,
    mpn: str,
    cache: PriceCache,
    dk: DigiKeyClient,
    mu: MouserClient,
    fa: FarnellClient,
) -> PartResult:
    """Return distributor prices for one part, using the cache when possible.

    On a cache hit the stored values are returned without any API call.
    Otherwise each configured client is queried in turn, with a pause of
    REQUEST_DELAY seconds after every request that was actually sent.

    The result is cached only when at least one distributor returned a
    price.  The clients report errors and "not found" alike as None, so
    caching an all-None result would hide the part for the whole cache
    lifetime after a transient failure or a run without API keys.
    """
    cached = cache.get(manufacturer, mpn)
    if cached:
        log.debug(f" cache hit: {mpn}")
        return PartResult(
            manufacturer=manufacturer,
            mpn=mpn,
            digikey_eur=cached.get("digikey_eur"),
            mouser_eur=cached.get("mouser_eur"),
            farnell_eur=cached.get("farnell_eur"),
        )

    result = PartResult(manufacturer=manufacturer, mpn=mpn)
    for attr, client in (("digikey_eur", dk), ("mouser_eur", mu), ("farnell_eur", fa)):
        if not client.configured:
            # No request is sent, so there is nothing to throttle.
            continue
        setattr(result, attr, client.get_price_eur(manufacturer, mpn))
        time.sleep(REQUEST_DELAY)

    if result.sources_found():
        cache.put(manufacturer, mpn, {
            "digikey_eur": result.digikey_eur,
            "mouser_eur": result.mouser_eur,
            "farnell_eur": result.farnell_eur,
        })
    return result
|
||
|
||
|
||
# ── Write-back ─────────────────────────────────────────────────────────────────
|
||
|
||
def _part_key(manufacturer: str, mpn: str) -> str:
    """Build the lower-cased, whitespace-stripped "manufacturer||mpn" lookup key."""
    mfr_norm = manufacturer.strip().lower()
    mpn_norm = mpn.strip().lower()
    return "||".join((mfr_norm, mpn_norm))
|
||
|
||
|
||
def write_back(
    file_map: dict[Path, list[TableLocation]],
    price_lookup: dict[str, Optional[float]],
) -> None:
    """
    Write the average EUR price at qty 1000 back into each BoM file.

    For every component table:
    - Adds a "Unit Cost 1000x Data" column header after the last existing
      header in the table's header row (if the column doesn't exist already).
    - Writes the price into each data row where the cell is currently empty.
      Cells that already contain a value are left unchanged.

    Args:
        file_map: {file_path: [TableLocation, ...]} as produced by extract_parts.
        price_lookup: {part key (see _part_key): price or None}.

    Files that cannot be opened or saved are logged and skipped; nothing is raised.
    """
    for file_path, tables in file_map.items():
        log.info(f"Updating {file_path.name}")
        try:
            # Macro-enabled workbooks must be loaded with keep_vba=True,
            # otherwise the VBA project is dropped when the file is saved.
            wb = openpyxl.load_workbook(
                file_path,
                keep_vba=file_path.suffix.lower() == ".xlsm",
            )
        except Exception as exc:
            log.error(f" Cannot open {file_path.name} for writing: {exc}")
            continue

        for table in tables:
            if table.sheet_name not in wb.sheetnames:
                # The sheet vanished between reading and writing; skip the
                # table rather than aborting the whole run.
                log.error(f" Sheet '{table.sheet_name}' not found in {file_path.name} – skipped")
                continue
            ws = wb[table.sheet_name]

            # ── Find or create the "Unit Cost 1000x Data" column ──────────────
            cost_col: Optional[int] = None
            last_used_col = 1
            max_col = ws.max_column or 1

            for c in range(1, max_col + 1):
                val = ws.cell(table.header_row, c).value
                if val is not None:
                    last_used_col = c
                    if str(val).strip().lower() == COST_HEADER.lower():
                        cost_col = c

            if cost_col is None:
                cost_col = last_used_col + 1
                ws.cell(table.header_row, cost_col).value = COST_HEADER

            # ── Fill prices for empty cells ───────────────────────────────────
            filled = 0
            skipped = 0
            for row_num, mfr, mpn in table.data:
                cell = ws.cell(row_num, cost_col)
                existing = cell.value
                if existing is not None and str(existing).strip() != "":
                    skipped += 1
                    continue
                price = price_lookup.get(_part_key(mfr, mpn))
                if price is not None:
                    cell.value = price
                    filled += 1

            log.info(
                f" Sheet '{table.sheet_name}' (header row {table.header_row}): "
                f"{filled} filled, {skipped} skipped (already had a value)"
            )

        try:
            wb.save(file_path)
            log.info(f" Saved {file_path.name}")
        except PermissionError:
            log.error(
                f" Cannot save {file_path.name} – close the file in Excel first."
            )
        except Exception as exc:
            log.error(f" Save failed for {file_path.name}: {exc}")
|
||
|
||
|
||
# ── Main ───────────────────────────────────────────────────────────────────────
|
||
|
||
def main() -> None:
    """Run the whole pipeline: read the BoMs, look up prices, write them back, report.

    Exits with status 1 when the BoM directory is missing or no usable
    (Manufacturer, MPN) pair is found.
    """
    if not BOM_DIR.exists():
        log.error(f"BoM directory '{BOM_DIR}' does not exist.")
        sys.exit(1)

    dk = DigiKeyClient()
    mu = MouserClient()
    fa = FarnellClient()

    # Report which distributor APIs have credentials.
    active = []
    for label, client in (("DigiKey", dk), ("Mouser", mu), ("Farnell", fa)):
        if client.configured:
            active.append(label)
    if not active:
        log.warning("No API keys found. Copy .env.example → .env and fill in your keys.")
    else:
        log.info(f"Configured APIs: {', '.join(active)}")

    file_map, parts = extract_parts(BOM_DIR)
    if not parts:
        log.error("No valid (Manufacturer, MPN) pairs found in BoM files.")
        sys.exit(1)

    cache = PriceCache(CACHE_FILE)
    price_lookup: dict[str, Optional[float]] = {}
    total = len(parts)

    # Look up every unique part once, in sorted order.
    for i, (mfr, mpn) in enumerate(sorted(parts), 1):
        log.info(f"[{i:>4}/{total}] {mfr or '(no mfr)':35s} {mpn}")
        r = lookup(mfr, mpn, cache, dk, mu, fa)
        if r.average_eur is None:
            tag = "—"
        else:
            tag = f"€{r.average_eur:.4f}"
        log.info(f" avg {tag} ({r.sources_found()}/3 sources)")
        price_lookup[_part_key(mfr, mpn)] = r.average_eur

    write_back(file_map, price_lookup)

    # Summary: how many parts were priced, and a sample of those that were not.
    found = 0
    missing = []
    for key, avg in price_lookup.items():
        if avg is None:
            # Keys have the form "manufacturer||mpn"; report the MPN half.
            missing.append(key.split("||", 1)[1])
        else:
            found += 1

    log.info(f"Done – pricing found for {found}/{total} unique parts.")
    if missing:
        sample = ", ".join(missing[:15])
        tail = f" … (+{len(missing) - 15} more)" if len(missing) > 15 else ""
        log.info(f"No pricing found for: {sample}{tail}")


if __name__ == "__main__":
    main()
|