"""
List all projects in Altium 365 workspaces via the Nexar GraphQL API,
then export schematic sheet components to an xlsx file.

Requirements:
    pip install requests openpyxl

Environment variables:
    NEXAR_CLIENT_ID     - Your Nexar application client ID
    NEXAR_CLIENT_SECRET - Your Nexar application client secret

Your Nexar application must have the 'design' scope enabled.
Authentication opens a browser window for Altium 365 login.
"""
|
|
|
|
|
|
|
|
|
|
import base64
|
|
|
|
|
import hashlib
|
|
|
|
|
import http.server
|
|
|
|
|
import os
|
|
|
|
|
import re
|
|
|
|
|
import sys
|
|
|
|
|
import webbrowser
|
|
|
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
|
|
2026-04-01 16:25:54 +01:00
|
|
|
import openpyxl
|
2026-03-31 15:38:56 +01:00
|
|
|
import requests
|
|
|
|
|
|
|
|
|
|
# Nexar identity and API endpoints.
TOKEN_URL = "https://identity.nexar.com/connect/token"
AUTH_URL = "https://identity.nexar.com/connect/authorize"
NEXAR_API_URL = "https://api.nexar.com/graphql"

# Local OAuth callback target; must match the redirect URI registered
# with the Nexar application.
REDIRECT_URI = "http://localhost:3000/login"
SCOPES = "openid profile email design.domain user.access"

# Altium 365 workspace this script queries.
WORKSPACE_NAME = "Flowbird SAS"
|
|
|
|
|
|
|
|
|
|
QUERY_WORKSPACES_AND_PROJECTS = """
|
|
|
|
|
query GetWorkspacesAndProjects($workspace: String!) {
|
|
|
|
|
desWorkspaces(where: {name: {eq: $workspace}}) {
|
|
|
|
|
url
|
|
|
|
|
name
|
|
|
|
|
description
|
|
|
|
|
projects {
|
|
|
|
|
id
|
|
|
|
|
name
|
|
|
|
|
description
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
QUERY_VARIANTS = """
|
|
|
|
|
query GetVariants($projectId: ID!) {
|
|
|
|
|
desProjectById(id: $projectId) {
|
|
|
|
|
name
|
|
|
|
|
design {
|
|
|
|
|
workInProgress {
|
|
|
|
|
variants {
|
|
|
|
|
name
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
2026-04-01 16:25:54 +01:00
|
|
|
QUERY_SCHEMATICS = """
|
|
|
|
|
query GetSchematics($projectId: ID!) {
|
|
|
|
|
desProjectById(id: $projectId) {
|
|
|
|
|
design {
|
|
|
|
|
workInProgress {
|
|
|
|
|
variants {
|
|
|
|
|
name
|
|
|
|
|
schematics {
|
|
|
|
|
documentName
|
|
|
|
|
documentId
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
QUERY_SHEET_COMPONENTS = """
|
|
|
|
|
query GetSheetComponents($projectId: ID!, $cursor: String) {
|
|
|
|
|
desProjectById(id: $projectId) {
|
|
|
|
|
design {
|
|
|
|
|
workInProgress {
|
|
|
|
|
variants {
|
|
|
|
|
name
|
|
|
|
|
schematics {
|
|
|
|
|
documentId
|
|
|
|
|
designItems(first: 100, after: $cursor) {
|
|
|
|
|
pageInfo {
|
|
|
|
|
hasNextPage
|
|
|
|
|
endCursor
|
|
|
|
|
}
|
|
|
|
|
nodes {
|
|
|
|
|
designator
|
|
|
|
|
component {
|
|
|
|
|
name
|
|
|
|
|
description
|
2026-04-09 15:27:23 +01:00
|
|
|
details {
|
|
|
|
|
parameters {
|
|
|
|
|
name
|
|
|
|
|
value
|
|
|
|
|
}
|
2026-04-01 16:25:54 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
QUERY_FITTED_DESIGNATORS = """
|
|
|
|
|
query GetFittedDesignators($projectId: ID!) {
|
|
|
|
|
desProjectById(id: $projectId) {
|
|
|
|
|
design {
|
|
|
|
|
workInProgress {
|
|
|
|
|
variants {
|
|
|
|
|
name
|
|
|
|
|
bom {
|
|
|
|
|
bomItems {
|
|
|
|
|
bomItemInstances {
|
|
|
|
|
designator
|
|
|
|
|
isFitted
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
2026-03-31 15:38:56 +01:00
|
|
|
_CALLBACK_HTML = """
|
|
|
|
|
<html><head><title>Nexar Login</title></head>
|
|
|
|
|
<body style="background:#000b24;color:#fff;font-family:sans-serif;text-align:center;padding-top:20%">
|
|
|
|
|
<h1>Login successful</h1><p>You can close this tab and return to the terminal.</p>
|
|
|
|
|
</body></html>
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _make_callback_handler(code_bucket):
    """Build a request-handler class that captures the OAuth authorization code.

    The returned class answers GET /login by showing the confirmation page and
    appending the received ``code`` query parameter to *code_bucket*; every
    other path gets a 404.
    """

    class _CallbackHandler(http.server.BaseHTTPRequestHandler):
        def log_message(self, *_):
            # Silence the default per-request stderr logging.
            pass

        def do_GET(self):
            url = urlparse(self.path)
            if url.path == "/login":
                self.send_response(200)
                self.send_header("Content-type", "text/html")
                self.end_headers()
                self.wfile.write(_CALLBACK_HTML.encode())
                query = parse_qs(url.query)
                code_bucket.append(query.get("code", [""])[0])
            else:
                self.send_response(404)
                self.end_headers()

    return _CallbackHandler
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_token(client_id: str, client_secret: str) -> str:
    """Run the OAuth2 authorization-code + PKCE flow and return an access token.

    Opens the system browser at the Nexar authorize endpoint, waits for the
    redirect on localhost:3000, then exchanges the received code for a token
    at TOKEN_URL. Exits the process if no authorization code arrives.
    """
    # PKCE verifier: random base64url string restricted to unreserved chars.
    code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode()
    code_verifier = re.sub(r"[^a-zA-Z0-9]+", "", code_verifier)

    # S256 challenge: base64url(sha256(verifier)) without '=' padding.
    digest = hashlib.sha256(code_verifier.encode()).digest()
    code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")

    auth_params = (
        f"?response_type=code"
        f"&client_id={client_id}"
        f"&redirect_uri={REDIRECT_URI}"
        f"&scope={SCOPES.replace(' ', '%20')}"
        f"&code_challenge={code_challenge}"
        f"&code_challenge_method=S256"
    )
    auth_url = AUTH_URL + auth_params

    # The callback handler appends the authorization code to this list.
    code_bucket = []
    server = http.server.HTTPServer(("localhost", 3000), _make_callback_handler(code_bucket))

    print("Opening browser for Nexar login...")
    webbrowser.open(auth_url)

    # Serve one request at a time until the /login callback delivers a code
    # (stray requests such as favicon fetches are answered and ignored).
    while not code_bucket:
        server.handle_request()
    server.server_close()

    auth_code = code_bucket[0]
    if not auth_code:
        print("Error: no authorization code received.", file=sys.stderr)
        sys.exit(1)

    # Exchange the one-time code (plus PKCE verifier) for an access token.
    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "authorization_code",
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uri": REDIRECT_URI,
            "code": auth_code,
            "code_verifier": code_verifier,
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["access_token"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def graphql(token: str, url: str, query: str, variables: dict | None = None) -> dict:
    """POST a GraphQL query and return the response's "data" payload.

    Fix: the optional ``variables`` parameter was annotated ``dict`` while
    defaulting to None; it is now ``dict | None`` (the file already uses
    ``X | None`` unions elsewhere).

    Prints HTTP errors to stderr before raising; on GraphQL-level errors
    prints each message and exits the process (fail-fast, matching the
    script's interactive style).
    """
    headers = {"Authorization": f"Bearer {token}"}
    payload = {"query": query}
    if variables:
        payload["variables"] = variables

    response = requests.post(url, json=payload, headers=headers, timeout=30)
    if not response.ok:
        print(f"HTTP {response.status_code}: {response.text}", file=sys.stderr)
        response.raise_for_status()

    result = response.json()
    if "errors" in result:
        for err in result["errors"]:
            print(f"GraphQL error: {err.get('message')}", file=sys.stderr)
        sys.exit(1)

    return result["data"]
|
|
|
|
|
|
|
|
|
|
|
2026-04-01 16:25:54 +01:00
|
|
|
def prompt_choice(prompt: str, count: int) -> int:
    """Keep asking until the user types an integer in [1, count]; return it."""
    while True:
        raw = input(prompt)
        try:
            selection = int(raw)
        except ValueError:
            print(" Please enter a valid number.")
            continue
        if 1 <= selection <= count:
            return selection
        print(f" Please enter a number between 1 and {count}.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Characters retained by normalise(); everything else is stripped.
_ALNUM = frozenset("abcdefghijklmnopqrstuvwxyz0123456789")


def normalise(s: str) -> str:
    """Fuzzy-matching key: lowercase *s* keeping only ASCII letters and digits."""
    return "".join(ch for ch in s.lower() if ch in _ALNUM)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _prefix_match(a: str, b: str) -> bool:
    """True when the normalised strings are equal or one is a prefix of the other."""
    left = normalise(a)
    right = normalise(b)
    if not left or not right:
        return False
    return left == right or left.startswith(right) or right.startswith(left)
|
|
|
|
|
|
|
|
|
|
|
2026-04-09 15:27:23 +01:00
|
|
|
def _mpn_match(a: str, b: str) -> bool:
    """Match two MPNs, tolerating a differing packaging suffix (final character)."""
    left, right = normalise(a), normalise(b)
    if not left or not right:
        return False
    if left == right or left.startswith(right) or right.startswith(left):
        return True
    # Same part, different trailing packaging code (e.g. R=reel, T=tape)?
    return len(left) > 1 and len(right) > 1 and left[:-1] == right[:-1]
|
|
|
|
|
|
|
|
|
|
|
2026-04-10 14:55:30 +01:00
|
|
|
def _norm_elec(s: str) -> str:
|
|
|
|
|
"""Normalise electrical value: lowercase, µ→u, strip non-alphanumeric."""
|
|
|
|
|
if not s:
|
|
|
|
|
return ""
|
|
|
|
|
s = s.strip().lower()
|
|
|
|
|
for frm, to in [('\u00b5', 'u'), ('\u03bc', 'u'), ('µ', 'u'), ('μ', 'u'),
|
|
|
|
|
('\u03a9', 'ohm'), ('\u2126', 'ohm'), ('mf', 'uf')]:
|
|
|
|
|
s = s.replace(frm, to)
|
|
|
|
|
return re.sub(r'[^a-z0-9]', '', s)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _elec_match(a: str, b: str) -> bool:
    """Prefix-tolerant equality of two normalised electrical values."""
    left = _norm_elec(a)
    right = _norm_elec(b)
    return bool(
        left and right
        and (left == right or right.startswith(left) or left.startswith(right))
    )
|
|
|
|
|
|
|
|
|
|
|
2026-04-01 16:25:54 +01:00
|
|
|
def load_bom_data(bom_path: str) -> list | None:
    """Load BOM xlsx and return a list of entry dicts.

    Each entry carries manufacturer, mpn, unit_cost plus the free-text
    attributes used by the passive fallback matcher (description, resistance,
    tolerance, capacitance, voltage, footprint) — missing columns yield "".
    Returns None if no 'bom' tab is found."""
    wb = openpyxl.load_workbook(bom_path, data_only=True)

    # Tab lookup is case-insensitive on the sheet name.
    bom_sheet = next((wb[n] for n in wb.sheetnames if n.lower() == "bom"), None)
    if bom_sheet is None:
        return None

    # Map lowercased header text -> 0-based column index from row 1.
    header_row = next(bom_sheet.iter_rows(min_row=1, max_row=1, values_only=True))
    headers = {str(v).lower().strip(): i for i, v in enumerate(header_row) if v is not None}

    def find_col(*candidates):
        # First candidate header present wins; None when none are found.
        for c in candidates:
            if c in headers:
                return headers[c]
        return None

    def cell(row, col):
        # Safe stringified cell access: "" for missing column or empty cell.
        return str(row[col]).strip() if col is not None and len(row) > col and row[col] is not None else ""

    mfr_col = find_col("manufacturer", "mfr", "manufacturer name")
    mpn_col = find_col("mpn", "manufacturer part number", "manufacturer part no", "part number")
    cost_col = find_col("unit cost", "unit price", "cost", "price")
    desc_col = find_col("description", "desc", "component description")
    res_col = find_col("resistance", "resistance (ohms)")
    tol_col = find_col("tolerance", "tol")
    cap_col = find_col("capacitance", "capacitance (farads)")
    volt_col = find_col("voltage", "voltage rating", "voltage - rated", "rated voltage")
    fp_col = find_col("footprint", "case", "package", "package / case", "case - imperial", "case - metric")

    entries = []
    for row in bom_sheet.iter_rows(min_row=2, values_only=True):
        mfr = cell(row, mfr_col)
        mpn = cell(row, mpn_col)
        # Skip rows with neither manufacturer nor MPN (blank/separator rows).
        if not mfr and not mpn:
            continue
        # Non-numeric cost cells degrade to 0.0 rather than aborting the load.
        try:
            cost = float(row[cost_col]) if cost_col is not None and len(row) > cost_col and row[cost_col] is not None else 0.0
        except (ValueError, TypeError):
            cost = 0.0
        entries.append({
            "manufacturer": mfr,
            "mpn": mpn,
            "unit_cost": cost,
            "description": cell(row, desc_col),
            "resistance": cell(row, res_col),
            "tolerance": cell(row, tol_col),
            "capacitance": cell(row, cap_col),
            "voltage": cell(row, volt_col),
            "footprint": cell(row, fp_col),
        })

    return entries
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _find_bom_match(bom_data: list, mfr_name: str, mpn: str) -> dict | None:
    """Return the highest unit cost BOM entry that prefix-matches both manufacturer and MPN."""
    best = None
    for entry in bom_data:
        if not _prefix_match(entry["manufacturer"], mfr_name):
            continue
        if not _mpn_match(entry["mpn"], mpn):
            continue
        # Keep the priciest match (first one wins on ties, like max()).
        if best is None or entry["unit_cost"] > best["unit_cost"]:
            best = entry
    return best
|
|
|
|
|
|
|
|
|
|
|
2026-04-10 14:55:30 +01:00
|
|
|
def _extract_r_vals(s: str) -> tuple:
    """Extract (resistance_norm, footprint) from a description string using word boundaries."""
    res_hit = re.search(r'\b(\d+\.?\d*\s*(?:kohms?|mohms?|k[\s\-]?ohms?|ohms?|k|m|r))\b', s, re.IGNORECASE)
    fp_hit = re.search(r'\b(0\d{3})\b', s)
    return (
        _norm_elec(res_hit.group(1)) if res_hit else "",
        fp_hit.group(1) if fp_hit else "",
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_c_vals(s: str) -> tuple:
    """Extract (capacitance_norm, voltage_norm, case) from a description string."""
    cap_hit = re.search(r'\b(\d+\.?\d*\s*(?:p|n|u|µ|μ|m)\s*f)\b', s, re.IGNORECASE)
    volt_hit = re.search(r'\b(\d+\.?\d*\s*v)\b', s, re.IGNORECASE)
    case_hit = re.search(r'\b(0\d{3})\b', s)
    capacitance = _norm_elec(cap_hit.group(1)) if cap_hit else ""
    voltage = _norm_elec(volt_hit.group(1)) if volt_hit else ""
    case = case_hit.group(1) if case_hit else ""
    return capacitance, voltage, case
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _find_passive_bom_match(bom_data: list, designator: str, nexar_params: dict, nexar_desc: str) -> dict | None:
    """Fallback matcher for R/C: extract values from Nexar params/description, match against BOM description.

    Applied only to designators starting with R or C. Values come from the
    component parameters first, falling back to regex extraction from the
    free-text description. Returns the highest-unit-cost candidate, or None.
    """
    # Take only leading letters from designator e.g. C38 → C, R102 → R
    prefix_match = re.match(r'^([a-zA-Z]+)', designator)
    prefix = prefix_match.group(1).upper() if prefix_match else ""
    if prefix not in ("R", "C"):
        return None

    # Lowercased parameter map, dropping empty and "-" placeholder values.
    pl = {k.lower(): v.strip() for k, v in nexar_params.items()
          if v and str(v).strip() and str(v).strip() != "-"}

    def gp(*keys):
        # First non-empty parameter among *keys* (case-insensitive).
        for k in keys:
            v = pl.get(k.lower(), "")
            if v:
                return v
        return ""

    # Parameter names that may carry the package/footprint code.
    pkg_keys = ("package / case", "package/case", "case - imperial",
                "case - metric", "supplier device package", "footprint", "case")

    if prefix == "R":
        resistance = gp("resistance")
        footprint = gp(*pkg_keys)
        # Fill gaps from the free-text description.
        if not resistance or not footprint:
            r_d, fp_d = _extract_r_vals(nexar_desc)
            if not resistance: resistance = r_d
            if not footprint: footprint = fp_d
        # Both values are required to attempt a match.
        if not resistance or not footprint:
            return None

        candidates = []
        for e in bom_data:
            desc = e.get("description", "")
            # Only consider resistor-looking BOM rows.
            if not desc or "res" not in desc.lower():
                continue
            bom_r, bom_fp = _extract_r_vals(desc)
            if not bom_r or not _elec_match(bom_r, resistance):
                continue
            # Footprint must match exactly (case-insensitive).
            if not bom_fp or bom_fp.lower() != footprint.lower().strip():
                continue
            candidates.append(e)

    else:  # C
        capacitance = gp("capacitance")
        voltage = gp("voltage - rated", "voltage rating", "rated voltage", "voltage")
        case = gp(*pkg_keys)
        # Fill gaps from the free-text description.
        if not capacitance or not case:
            c_d, v_d, cs_d = _extract_c_vals(nexar_desc)
            if not capacitance: capacitance = c_d
            if not voltage: voltage = v_d
            if not case: case = cs_d
        # Capacitance and case are mandatory; voltage is optional.
        if not capacitance or not case:
            return None

        candidates = []
        for e in bom_data:
            desc = e.get("description", "")
            # Only consider capacitor-looking BOM rows.
            if not desc or "cap" not in desc.lower():
                continue
            bom_c, bom_v, bom_case = _extract_c_vals(desc)
            if not bom_c or not _elec_match(bom_c, capacitance):
                continue
            if not bom_case or bom_case.lower() != case.lower().strip():
                continue
            # Voltage must agree only when both sides state one.
            if voltage and bom_v and not _elec_match(bom_v, voltage):
                continue
            candidates.append(e)

    # Prefer the highest-unit-cost candidate (conservative cost estimate).
    return max(candidates, key=lambda e: e["unit_cost"]) if candidates else None
|
|
|
|
|
|
|
|
|
|
|
2026-04-01 16:25:54 +01:00
|
|
|
# Number of Manufacturer / MPN parameter pairs read and exported per component.
MAX_MFR_PARTS = 5
|
|
|
|
|
|
|
|
|
|
def export_to_xlsx(project_name: str, variant_name: str, schematics: list, bom_data: list | None) -> str:
    """Write one worksheet per schematic sheet and return the output file path.

    Each row lists a component's designator, reference, description and up to
    MAX_MFR_PARTS manufacturer/MPN pairs; when *bom_data* (the list from
    load_bom_data) is given, the matched BOM entry and unit cost are appended
    and unmatched components are reported on stdout with a diagnostic reason.

    Fixes: *bom_data* was annotated ``dict | None`` but is the list produced
    by load_bom_data; the output directory is now created before saving
    (wb.save raised FileNotFoundError when it was missing).
    """
    wb = openpyxl.Workbook()
    wb.remove(wb.active)  # drop the default empty sheet

    # Build header row
    mfr_headers = []
    for n in range(1, MAX_MFR_PARTS + 1):
        mfr_headers += [f"Manufacturer {n}", f"Manufacturer Part Number {n}"]
    headers = ["Designator", "Reference", "Description"] + mfr_headers
    if bom_data is not None:
        headers += ["Matched Manufacturer", "Matched MPN", "Unit Cost"]

    col_letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

    for sheet in schematics:
        sheet_name = sheet["documentName"] or sheet["documentId"]
        # Excel forbids \ / * ? : [ ] in sheet titles and caps them at 31 chars.
        safe_name = re.sub(r"[\\/*?:\[\]]", "_", sheet_name)[:31]
        ws = wb.create_sheet(title=safe_name)
        ws.append(headers)

        # Column widths: wide for manufacturer/MPN, narrow for designator.
        for i, h in enumerate(headers):
            col = col_letters[i] if i < 26 else col_letters[i // 26 - 1] + col_letters[i % 26]
            ws.column_dimensions[col].width = 28 if "Manufacturer" in h or "Part" in h else 16 if i == 0 else 36

        nodes = (sheet.get("designItems") or {}).get("nodes") or []
        for node in sorted(nodes, key=lambda n: n.get("designator") or ""):
            comp = node.get("component") or {}
            # Parameter name -> stripped value map from the component details.
            params = {p["name"]: (p.get("value") or "").strip()
                      for p in (comp.get("details") or {}).get("parameters") or []
                      if p.get("name")}

            # Collect up to MAX_MFR_PARTS manufacturer/MPN pairs from named parameters
            pairs = []
            for n in range(1, MAX_MFR_PARTS + 1):
                mfr_name = params.get(f"Manufacturer {n}", "").strip()
                mpn = params.get(f"Manufacturer Part Number {n}", "").strip()
                if not mfr_name or not mpn or mfr_name == "-" or mpn == "-":
                    pairs.append(("-", "-"))
                else:
                    pairs.append((mfr_name, mpn))
            flat_pairs = [v for pair in pairs for v in pair]
            row = [node.get("designator") or "", comp.get("name") or "", comp.get("description") or ""] + flat_pairs

            if bom_data is not None:
                match = None
                designator = node.get("designator") or ""
                ref = comp.get("name") or ""
                nexar_desc = comp.get("description") or ""

                valid_pairs = [(m, p) for m, p in pairs if m != "-" or p != "-"]

                # --- MPN / manufacturer match ---
                for mfr_name, mpn in valid_pairs:
                    match = _find_bom_match(bom_data, mfr_name, mpn)
                    if match:
                        break

                # --- Passive fallback (R / C) ---
                if not match:
                    match = _find_passive_bom_match(bom_data, designator, params, nexar_desc)

                # --- Diagnostics when no match ---
                if not match:
                    if not valid_pairs:
                        print(f" NO MATCH [{designator}] {ref}: no manufacturer data in Nexar")
                    else:
                        tried_mpns = [p for _, p in valid_pairs]
                        # Check if any Nexar MPN exists in BOM ignoring manufacturer
                        mpn_in_bom = next(
                            (e for e in bom_data for mpn in tried_mpns if _mpn_match(e["mpn"], mpn)),
                            None
                        )
                        if mpn_in_bom:
                            print(
                                f" NO MATCH [{designator}] {ref}: "
                                f"MPN '{mpn_in_bom['mpn']}' found in BOM but manufacturer differs "
                                f"(BOM: '{mpn_in_bom['manufacturer']}' vs Nexar: '{valid_pairs[0][0]}')"
                            )
                        else:
                            # Check if BOM has the manufacturer but with a different MPN
                            mfr_in_bom = next(
                                (e for e in bom_data for mfr, _ in valid_pairs if _prefix_match(e["manufacturer"], mfr)),
                                None
                            )
                            if mfr_in_bom:
                                print(
                                    f" NO MATCH [{designator}] {ref}: "
                                    f"manufacturer '{mfr_in_bom['manufacturer']}' found in BOM "
                                    f"but MPN differs (BOM: '{mfr_in_bom['mpn']}' vs Nexar: {tried_mpns})"
                                )
                            else:
                                print(
                                    f" NO MATCH [{designator}] {ref}: "
                                    f"Nexar MPN(s) {tried_mpns} not found in BOM at all"
                                )

                if match:
                    row += [match["manufacturer"], match["mpn"], match["unit_cost"]]
                else:
                    row += ["-", "-", "-"]

            ws.append(row)

    output_dir = os.path.join(os.path.dirname(__file__), "output")
    os.makedirs(output_dir, exist_ok=True)  # fix: wb.save fails when the dir is absent
    safe_project = re.sub(r'[\\/:*?"<>|]', "_", project_name).strip()
    filename = os.path.join(output_dir, f"{safe_project} - {variant_name}.xlsx")
    wb.save(filename)
    return filename
|
|
|
|
|
|
|
|
|
|
|
2026-03-31 15:38:56 +01:00
|
|
|
def main():
    """Interactive driver: authenticate, pick project/variant/BOM file, export xlsx.

    Fix: the final status line printed the literal placeholder "(unknown)"
    instead of the path returned by export_to_xlsx.
    """
    client_id = os.environ.get("NEXAR_CLIENT_ID")
    client_secret = os.environ.get("NEXAR_CLIENT_SECRET")

    if not client_id or not client_secret:
        print("Error: NEXAR_CLIENT_ID and NEXAR_CLIENT_SECRET must be set.", file=sys.stderr)
        sys.exit(1)

    token = get_token(client_id, client_secret)
    print("Authenticated.\n")

    # --- Project selection ---
    print(f"Fetching projects from workspace '{WORKSPACE_NAME}'...")
    data = graphql(token, NEXAR_API_URL, QUERY_WORKSPACES_AND_PROJECTS, {"workspace": WORKSPACE_NAME})
    workspaces = data["desWorkspaces"]

    if not workspaces:
        print("No workspaces found.")
        return

    projects = workspaces[0].get("projects") or []
    if not projects:
        print("No projects found in this workspace.")
        return

    print(f"\nProjects in '{WORKSPACE_NAME}':")
    print("-" * 60)
    for i, project in enumerate(projects, start=1):
        desc = project.get("description") or ""
        print(f" [{i}] {project['name']}" + (f" — {desc}" if desc else ""))

    print()
    choice = prompt_choice(f"Select a project (1-{len(projects)}): ", len(projects))
    selected_project = projects[choice - 1]
    print(f"\nSelected project: {selected_project['name']}")

    # --- Variant selection ---
    print("\nFetching variants...")
    data = graphql(token, NEXAR_API_URL, QUERY_VARIANTS, {"projectId": selected_project["id"]})
    variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or []

    if not variants:
        print("No variants found for this project.")
        return

    print(f"\nVariants in '{selected_project['name']}':")
    print("-" * 60)
    for i, variant in enumerate(variants, start=1):
        print(f" [{i}] {variant['name']}")

    print()
    choice = prompt_choice(f"Select a variant (1-{len(variants)}): ", len(variants))
    selected_variant = variants[choice - 1]
    print(f"\nSelected variant: {selected_variant['name']}")

    # --- BOM file selection ---
    bom_dir = os.path.join(os.path.dirname(__file__), "BOM")
    bom_files = sorted(f for f in os.listdir(bom_dir) if f.lower().endswith(".xlsx"))

    if not bom_files:
        print(f"\nNo xlsx files found in {bom_dir}.")
        return

    print(f"\nBOM files available:")
    print("-" * 60)
    for i, name in enumerate(bom_files, start=1):
        print(f" [{i}] {name}")

    print()
    choice = prompt_choice(f"Select a BOM file (1-{len(bom_files)}): ", len(bom_files))
    selected_bom = os.path.join(bom_dir, bom_files[choice - 1])
    print(f"\nSelected BOM: {bom_files[choice - 1]}")

    # --- Fetch schematic sheet list ---
    print("\nFetching schematic sheets...")
    data = graphql(token, NEXAR_API_URL, QUERY_SCHEMATICS, {"projectId": selected_project["id"]})
    all_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or []

    variant_data = next((v for v in all_variants if v["name"] == selected_variant["name"]), None)
    if not variant_data:
        print("Could not find schematic data for the selected variant.")
        return

    sheet_list = variant_data.get("schematics") or []
    if not sheet_list:
        print("No schematic sheets found.")
        return

    print(f"Found {len(sheet_list)} schematic sheet(s). Fetching components...")

    # Build a map of documentId -> {documentName, nodes[]}
    sheet_map = {s["documentId"]: {"documentName": s["documentName"], "nodes": []} for s in sheet_list}

    # Paginate all components across all sheets
    cursor = None
    while True:
        variables = {"projectId": selected_project["id"]}
        if cursor:
            variables["cursor"] = cursor

        data = graphql(token, NEXAR_API_URL, QUERY_SHEET_COMPONENTS, variables)
        fetched_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or []
        fetched_variant = next((v for v in fetched_variants if v["name"] == selected_variant["name"]), None)
        if not fetched_variant:
            break

        has_next = False
        next_cursor = None
        for sheet in fetched_variant.get("schematics") or []:
            doc_id = sheet["documentId"]
            if doc_id not in sheet_map:
                continue
            design_items = sheet.get("designItems") or {}
            sheet_map[doc_id]["nodes"].extend(design_items.get("nodes") or [])
            page_info = design_items.get("pageInfo") or {}
            if page_info.get("hasNextPage"):
                has_next = True
                next_cursor = page_info.get("endCursor")

        if not has_next:
            break
        cursor = next_cursor

    # Rebuild schematics list in original order for export
    schematics = [
        {"documentName": sheet_map[s["documentId"]]["documentName"],
         "documentId": s["documentId"],
         "designItems": {"nodes": sheet_map[s["documentId"]]["nodes"]}}
        for s in sheet_list
    ]

    # --- Fetch fitted designators for selected variant ---
    print("Fetching fitted components for variant...")
    data = graphql(token, NEXAR_API_URL, QUERY_FITTED_DESIGNATORS, {"projectId": selected_project["id"]})
    bom_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or []
    bom_variant = next((v for v in bom_variants if v["name"] == selected_variant["name"]), None)

    fitted_designators = set()
    if bom_variant:
        for bom_item in (bom_variant.get("bom") or {}).get("bomItems") or []:
            for instance in bom_item.get("bomItemInstances") or []:
                if instance.get("isFitted"):
                    fitted_designators.add(instance["designator"])

    # Filter each sheet's nodes to fitted components only
    for s in schematics:
        s["designItems"]["nodes"] = [
            n for n in s["designItems"]["nodes"]
            if n.get("designator") in fitted_designators
        ]

    total_components = sum(len(s["designItems"]["nodes"]) for s in schematics)
    print(f"Total: {total_components} fitted component(s) across {len(schematics)} sheet(s).")

    # --- Load BOM cross-reference data ---
    bom_data = load_bom_data(selected_bom)
    if bom_data is None:
        print("No 'bom' tab found in the selected BOM file — exporting without BOM data.")
    else:
        print(f"Loaded {len(bom_data)} entries from BOM.")

    # --- Export ---
    filename = export_to_xlsx(selected_project["name"], selected_variant["name"], schematics, bom_data)
    # Fix: report the actual saved path (previously printed "(unknown)").
    print(f"\nExported to: {filename}")
|
|
|
|
|
|
2026-03-31 15:38:56 +01:00
|
|
|
|
|
|
|
|
# Script entry point: run the interactive export workflow.
if __name__ == "__main__":
    main()
|