Files
NEXARxCLAUDE/nexarxclaude.py

555 lines
18 KiB
Python
Raw Normal View History

2026-03-31 15:38:56 +01:00
"""
2026-04-01 16:25:54 +01:00
List all projects in Altium 365 workspaces via the Nexar GraphQL API,
then export schematic sheet components to an xlsx file.
2026-03-31 15:38:56 +01:00
Requirements:
2026-04-01 16:25:54 +01:00
pip install requests openpyxl
2026-03-31 15:38:56 +01:00
Environment variables:
NEXAR_CLIENT_ID - Your Nexar application client ID
NEXAR_CLIENT_SECRET - Your Nexar application client secret
Your Nexar application must have the 'design' scope enabled.
Authentication opens a browser window for Altium 365 login.
"""
import base64
import hashlib
import http.server
import os
import re
import sys
import webbrowser
from urllib.parse import parse_qs, quote, urlencode, urlparse

import openpyxl
import requests
from openpyxl.utils import get_column_letter
# Endpoints used by get_token() for the authorization-code flow.
TOKEN_URL = "https://identity.nexar.com/connect/token"
AUTH_URL = "https://identity.nexar.com/connect/authorize"
# GraphQL endpoint that main() passes to every graphql() call.
NEXAR_API_URL = "https://api.nexar.com/graphql"
# Local redirect target. Host, port and path must agree with the callback
# server started in get_token() (localhost:3000) and with the "/login" path
# check in the callback handler.
REDIRECT_URI = "http://localhost:3000/login"
# Space-separated scopes requested during authorization.
SCOPES = "openid profile email design.domain user.access"
# Workspace whose projects are listed (exact-match name filter in the query).
WORKSPACE_NAME = "Flowbird SAS"
# GraphQL query: select the workspaces whose name equals $workspace and, for
# each, return url/name/description plus id/name/description of its projects.
# main() uses the first returned workspace to build the project menu.
QUERY_WORKSPACES_AND_PROJECTS = """
query GetWorkspacesAndProjects($workspace: String!) {
desWorkspaces(where: {name: {eq: $workspace}}) {
url
name
description
projects {
id
name
description
}
}
}
"""
# GraphQL query: for the project with id $projectId, return the project name
# and the names of the variants under design.workInProgress.
# main() uses the variant names to build the variant menu.
QUERY_VARIANTS = """
query GetVariants($projectId: ID!) {
desProjectById(id: $projectId) {
name
design {
workInProgress {
variants {
name
}
}
}
}
}
"""
2026-04-01 16:25:54 +01:00
# GraphQL query: for every work-in-progress variant of project $projectId,
# return the variant name and each schematic's documentName/documentId.
# main() picks the entry whose name matches the selected variant.
QUERY_SCHEMATICS = """
query GetSchematics($projectId: ID!) {
desProjectById(id: $projectId) {
design {
workInProgress {
variants {
name
schematics {
documentName
documentId
}
}
}
}
}
}
"""
# GraphQL query: for every work-in-progress variant of project $projectId,
# return each schematic's documentId and one page (100 items, starting after
# $cursor) of its design items: designator plus the component's name,
# description and name/value parameters.
# NOTE(review): the single $cursor is applied to the designItems connection of
# every schematic at once; confirm that cursors are interchangeable between
# sheets for this API.
QUERY_SHEET_COMPONENTS = """
query GetSheetComponents($projectId: ID!, $cursor: String) {
desProjectById(id: $projectId) {
design {
workInProgress {
variants {
name
schematics {
documentId
designItems(first: 100, after: $cursor) {
pageInfo {
hasNextPage
endCursor
}
nodes {
designator
component {
name
description
details {
parameters {
name
value
}
}
}
}
}
}
}
}
}
}
}
"""
# GraphQL query: for every work-in-progress variant of project $projectId,
# return the variant name and, for each BOM item instance, its designator and
# isFitted flag. main() uses this to keep only fitted components.
QUERY_FITTED_DESIGNATORS = """
query GetFittedDesignators($projectId: ID!) {
desProjectById(id: $projectId) {
design {
workInProgress {
variants {
name
bom {
bomItems {
bomItemInstances {
designator
isFitted
}
}
}
}
}
}
}
}
"""
2026-03-31 15:38:56 +01:00
# HTML page the local callback handler writes back to the browser for any
# request to "/login" (it is sent regardless of whether a code was present).
_CALLBACK_HTML = """
<html><head><title>Nexar Login</title></head>
<body style="background:#000b24;color:#fff;font-family:sans-serif;text-align:center;padding-top:20%">
<h1>Login successful</h1><p>You can close this tab and return to the terminal.</p>
</body></html>
"""
def _make_callback_handler(code_bucket):
class _Handler(http.server.BaseHTTPRequestHandler):
def log_message(self, *_):
2026-04-01 16:25:54 +01:00
pass
2026-03-31 15:38:56 +01:00
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path != "/login":
self.send_response(404)
self.end_headers()
return
params = parse_qs(parsed.query)
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(_CALLBACK_HTML.encode())
code_bucket.append(params.get("code", [""])[0])
return _Handler
def get_token(client_id: str, client_secret: str) -> str:
    """Run the browser-based authorization-code flow (with PKCE) and return an access token.

    A temporary HTTP server on localhost:3000 receives the redirect, the
    authorization code is read from it, and the code is exchanged at
    ``TOKEN_URL``.

    Args:
        client_id: Nexar application client ID.
        client_secret: Nexar application client secret.

    Returns:
        The ``access_token`` field of the token response.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error status.
        SystemExit: If the redirect carried no authorization code.
    """
    # PKCE: random verifier (alphanumerics only) and its S256 challenge.
    code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode()
    code_verifier = re.sub(r"[^a-zA-Z0-9]+", "", code_verifier)
    digest = hashlib.sha256(code_verifier.encode()).digest()
    code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")

    # Let urlencode do the escaping so every value (redirect URI, client id,
    # scopes) is percent-encoded; quote_via=quote keeps spaces as %20.
    query = urlencode(
        {
            "response_type": "code",
            "client_id": client_id,
            "redirect_uri": REDIRECT_URI,
            "scope": SCOPES,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        },
        quote_via=quote,
    )
    auth_url = f"{AUTH_URL}?{query}"

    code_bucket = []
    # The context manager closes the listening socket even if the wait is
    # interrupted (Ctrl-C) or a request handler raises.
    with http.server.HTTPServer(("localhost", 3000), _make_callback_handler(code_bucket)) as server:
        print("Opening browser for Nexar login...")
        webbrowser.open(auth_url)
        # Unrelated requests (answered with 404) do not fill the bucket, so
        # keep serving until the /login redirect arrives.
        while not code_bucket:
            server.handle_request()

    auth_code = code_bucket[0]
    if not auth_code:
        print("Error: no authorization code received.", file=sys.stderr)
        sys.exit(1)

    response = requests.post(
        TOKEN_URL,
        data={
            "grant_type": "authorization_code",
            "client_id": client_id,
            "client_secret": client_secret,
            "redirect_uri": REDIRECT_URI,
            "code": auth_code,
            "code_verifier": code_verifier,
        },
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["access_token"]
def graphql(token: str, url: str, query: str, variables: dict = None) -> dict:
    """POST a GraphQL query and return the ``data`` member of the response.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        url: GraphQL endpoint.
        query: GraphQL document.
        variables: Optional variables; omitted from the payload when falsy.

    Returns:
        The ``data`` object of the JSON response.

    Raises:
        requests.HTTPError: On a non-success HTTP status (body is printed to stderr first).
        SystemExit: If the response contains an ``errors`` member.
    """
    body = {"query": query}
    if variables:
        body["variables"] = variables

    reply = requests.post(
        url,
        json=body,
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    if not reply.ok:
        # Show the server's explanation before the exception propagates.
        print(f"HTTP {reply.status_code}: {reply.text}", file=sys.stderr)
        reply.raise_for_status()

    decoded = reply.json()
    if "errors" not in decoded:
        return decoded["data"]

    for problem in decoded["errors"]:
        print(f"GraphQL error: {problem.get('message')}", file=sys.stderr)
    sys.exit(1)
2026-04-01 16:25:54 +01:00
def prompt_choice(prompt: str, count: int) -> int:
    """Ask repeatedly until the user enters an integer in ``1..count`` and return it.

    Args:
        prompt: Text shown by ``input``.
        count: Largest accepted value.

    Returns:
        The accepted number (1-based).
    """
    while True:
        answer = input(prompt)
        try:
            number = int(answer)
        except ValueError:
            print(" Please enter a valid number.")
            continue
        if 1 <= number <= count:
            return number
        print(f" Please enter a number between 1 and {count}.")
def normalise(s: str) -> str:
    """Return *s* lowercased with every character outside a-z / 0-9 removed.

    Used as the canonical form for fuzzy comparisons.
    """
    lowered = s.lower()
    return re.compile(r"[^a-z0-9]").sub("", lowered)
def _prefix_match(a: str, b: str) -> bool:
    """Return True when the normalised inputs are equal or one starts with the other.

    An input that normalises to the empty string never matches.
    """
    left = normalise(a)
    right = normalise(b)
    if not left or not right:
        return False
    # Equality is covered by startswith, so no separate == test is needed.
    return left.startswith(right) or right.startswith(left)
2026-04-09 15:27:23 +01:00
def _mpn_match(a: str, b: str) -> bool:
    """Return True when two MPNs match after normalisation.

    A match is equality, one being a prefix of the other, or the two being
    identical once the final character of each (e.g. a packaging suffix such
    as R=reel, T=tape) is dropped.
    """
    left, right = normalise(a), normalise(b)
    if not (left and right):
        return False
    if left.startswith(right) or right.startswith(left):
        return True
    # Compare with the last character removed from both sides.
    return len(left) > 1 and len(right) > 1 and left[:-1] == right[:-1]
2026-04-01 16:25:54 +01:00
def load_bom_data(bom_path: str) -> list | None:
    """Read the 'bom' worksheet of an xlsx file into a list of entry dicts.

    The first row is treated as the header. Columns are located by
    lowercased, stripped header text. Rows with neither a manufacturer nor an
    MPN are skipped; a missing or non-numeric cost becomes ``0.0``.

    Args:
        bom_path: Path of the workbook to read.

    Returns:
        A list of ``{"manufacturer", "mpn", "unit_cost"}`` dicts, or ``None``
        when the workbook has no sheet named 'bom' (case-insensitive).
    """
    workbook = openpyxl.load_workbook(bom_path, data_only=True)

    bom_sheet = None
    for title in workbook.sheetnames:
        if title.lower() == "bom":
            bom_sheet = workbook[title]
            break
    if bom_sheet is None:
        return None

    first_row = next(bom_sheet.iter_rows(min_row=1, max_row=1, values_only=True))
    column_of = {}
    for index, label in enumerate(first_row):
        if label is not None:
            column_of[str(label).lower().strip()] = index

    def first_known(*names):
        """Index of the first header name that exists, else None."""
        return next((column_of[name] for name in names if name in column_of), None)

    def cell(row, index):
        """Value at *index*, or None when the column is unknown or the row is too short."""
        if index is None or len(row) <= index:
            return None
        return row[index]

    def text(row, index):
        """Stripped string form of a cell, or '' when it is empty or missing."""
        raw = cell(row, index)
        return "" if raw is None else str(raw).strip()

    mfr_col = first_known("manufacturer", "mfr", "manufacturer name")
    mpn_col = first_known("mpn", "manufacturer part number", "manufacturer part no", "part number")
    cost_col = first_known("unit cost", "unit price", "cost", "price")

    entries = []
    for row in bom_sheet.iter_rows(min_row=2, values_only=True):
        manufacturer = text(row, mfr_col)
        part_number = text(row, mpn_col)
        if not (manufacturer or part_number):
            continue
        raw_cost = cell(row, cost_col)
        try:
            unit_cost = 0.0 if raw_cost is None else float(raw_cost)
        except (ValueError, TypeError):
            unit_cost = 0.0
        entries.append({"manufacturer": manufacturer, "mpn": part_number, "unit_cost": unit_cost})
    return entries
def _find_bom_match(bom_data: list, mfr_name: str, mpn: str) -> dict | None:
    """Pick the BOM entry matching both manufacturer and MPN with the highest unit cost.

    Manufacturer names are compared with ``_prefix_match`` and part numbers
    with ``_mpn_match``. On equal cost the earliest entry wins.

    Returns:
        The chosen entry dict, or ``None`` when nothing matches.
    """
    matching = (
        entry
        for entry in bom_data
        if _prefix_match(entry["manufacturer"], mfr_name) and _mpn_match(entry["mpn"], mpn)
    )
    return max(matching, key=lambda entry: entry["unit_cost"], default=None)
# Number of "Manufacturer N" / "Manufacturer Part Number N" parameter pairs
# that export_to_xlsx reads per component and writes as columns.
MAX_MFR_PARTS = 5
def export_to_xlsx(project_name: str, variant_name: str, schematics: list, bom_data: list | None) -> str:
    """Write one worksheet per schematic sheet and save the workbook under ``output/``.

    Each row holds a component's designator, name and description, followed by
    ``MAX_MFR_PARTS`` manufacturer / part-number pairs taken from the
    component parameters ("-" where a pair is missing). When ``bom_data`` is
    given, three more columns carry the first BOM entry that matches any pair.

    Args:
        project_name: Project name; sanitised and used in the file name.
        variant_name: Variant name; sanitised and used in the file name.
        schematics: Dicts with ``documentName``, ``documentId`` and
            ``designItems["nodes"]``; each node has ``designator`` and
            ``component``.
        bom_data: Entries as produced by ``load_bom_data``, or ``None`` to
            leave out the matched-BOM columns.

    Returns:
        Path of the saved ``.xlsx`` file.
    """
    wb = openpyxl.Workbook()
    wb.remove(wb.active)

    # Build header row
    mfr_headers = []
    for n in range(1, MAX_MFR_PARTS + 1):
        mfr_headers += [f"Manufacturer {n}", f"Manufacturer Part Number {n}"]
    headers = ["Designator", "Reference", "Description"] + mfr_headers
    if bom_data is not None:
        headers += ["Matched Manufacturer", "Matched MPN", "Unit Cost"]

    for sheet in schematics:
        sheet_name = sheet["documentName"] or sheet["documentId"]
        # Replace characters Excel rejects in sheet titles; titles are capped at 31 chars.
        safe_name = re.sub(r"[\\/*?:\[\]]", "_", sheet_name)[:31]
        ws = wb.create_sheet(title=safe_name)
        ws.append(headers)
        for i, h in enumerate(headers):
            if "Manufacturer" in h or "Part" in h:
                width = 28
            elif i == 0:
                width = 16
            else:
                width = 36
            ws.column_dimensions[get_column_letter(i + 1)].width = width

        nodes = (sheet.get("designItems") or {}).get("nodes") or []
        for node in sorted(nodes, key=lambda n: n.get("designator") or ""):
            comp = node.get("component") or {}
            params = {
                p["name"]: (p.get("value") or "").strip()
                for p in (comp.get("details") or {}).get("parameters") or []
                if p.get("name")
            }

            # Collect exactly MAX_MFR_PARTS manufacturer/MPN pairs from named
            # parameters; an incomplete pair is written as ("-", "-").
            pairs = []
            for n in range(1, MAX_MFR_PARTS + 1):
                mfr_name = params.get(f"Manufacturer {n}", "").strip()
                mpn = params.get(f"Manufacturer Part Number {n}", "").strip()
                if not mfr_name or not mpn or mfr_name == "-" or mpn == "-":
                    pairs.append(("-", "-"))
                else:
                    pairs.append((mfr_name, mpn))

            flat_pairs = [v for pair in pairs for v in pair]
            row = [node.get("designator") or "", comp.get("name") or "", comp.get("description") or ""] + flat_pairs

            if bom_data is not None:
                # The first pair (in column order) with a BOM match wins.
                match = None
                for mfr_name, mpn in pairs:
                    if mfr_name == "-" and mpn == "-":
                        continue
                    match = _find_bom_match(bom_data, mfr_name, mpn)
                    if match:
                        break
                if match:
                    row += [match["manufacturer"], match["mpn"], match["unit_cost"]]
                else:
                    row += ["-", "-", "-"]
            ws.append(row)

    if not wb.worksheets:
        # A workbook without sheets cannot be saved; emit a header-only sheet.
        wb.create_sheet(title="Components").append(headers)

    output_dir = os.path.join(os.path.dirname(__file__), "output")
    # The directory may not exist on a fresh checkout.
    os.makedirs(output_dir, exist_ok=True)

    # Both name parts come from the remote project, so strip characters that
    # are invalid in file names (or would act as path separators).
    unsafe = r'[\\/:*?"<>|]'
    safe_project = re.sub(unsafe, "_", project_name).strip()
    safe_variant = re.sub(unsafe, "_", variant_name).strip()
    filename = os.path.join(output_dir, f"{safe_project} - {safe_variant}.xlsx")
    wb.save(filename)
    return filename
2026-03-31 15:38:56 +01:00
def _wip_variants(data: dict) -> list:
    """Return the work-in-progress variants of a desProjectById response, or [] if any level is missing."""
    project = (data or {}).get("desProjectById") or {}
    design = project.get("design") or {}
    work_in_progress = design.get("workInProgress") or {}
    return work_in_progress.get("variants") or []


def _find_variant(data: dict, variant_name: str) -> dict | None:
    """Return the variant called *variant_name* from a desProjectById response, or None."""
    return next((v for v in _wip_variants(data) if v.get("name") == variant_name), None)


def main():
    """Interactive entry point.

    Authenticates, lets the user pick a project, a variant and a BOM file,
    downloads the schematic components of the chosen variant, keeps only the
    fitted ones, and exports them (cross-referenced against the BOM file when
    it has a 'bom' tab) to an xlsx file.

    Exits with status 1 when the client credentials are not set in the
    environment; returns early with a message when nothing can be selected.
    """
    client_id = os.environ.get("NEXAR_CLIENT_ID")
    client_secret = os.environ.get("NEXAR_CLIENT_SECRET")
    if not client_id or not client_secret:
        print("Error: NEXAR_CLIENT_ID and NEXAR_CLIENT_SECRET must be set.", file=sys.stderr)
        sys.exit(1)

    token = get_token(client_id, client_secret)
    print("Authenticated.\n")

    # --- Project selection ---
    print(f"Fetching projects from workspace '{WORKSPACE_NAME}'...")
    data = graphql(token, NEXAR_API_URL, QUERY_WORKSPACES_AND_PROJECTS, {"workspace": WORKSPACE_NAME})
    workspaces = (data or {}).get("desWorkspaces") or []
    if not workspaces:
        print("No workspaces found.")
        return
    projects = workspaces[0].get("projects") or []
    if not projects:
        print("No projects found in this workspace.")
        return

    print(f"\nProjects in '{WORKSPACE_NAME}':")
    print("-" * 60)
    for i, project in enumerate(projects, start=1):
        desc = project.get("description") or ""
        # Separate the description from the name so the two stay readable.
        print(f" [{i}] {project['name']}" + (f" - {desc}" if desc else ""))
    print()
    choice = prompt_choice(f"Select a project (1-{len(projects)}): ", len(projects))
    selected_project = projects[choice - 1]
    project_id = selected_project["id"]
    print(f"\nSelected project: {selected_project['name']}")

    # --- Variant selection ---
    print("\nFetching variants...")
    data = graphql(token, NEXAR_API_URL, QUERY_VARIANTS, {"projectId": project_id})
    variants = _wip_variants(data)
    if not variants:
        print("No variants found for this project.")
        return

    print(f"\nVariants in '{selected_project['name']}':")
    print("-" * 60)
    for i, variant in enumerate(variants, start=1):
        print(f" [{i}] {variant['name']}")
    print()
    choice = prompt_choice(f"Select a variant (1-{len(variants)}): ", len(variants))
    selected_variant = variants[choice - 1]
    variant_name = selected_variant["name"]
    print(f"\nSelected variant: {variant_name}")

    # --- BOM file selection ---
    bom_dir = os.path.join(os.path.dirname(__file__), "BOM")
    try:
        # Skip "~$..." lock files that Excel creates next to an open workbook.
        bom_files = sorted(
            f for f in os.listdir(bom_dir)
            if f.lower().endswith(".xlsx") and not f.startswith("~$")
        )
    except FileNotFoundError:
        # A missing BOM directory is treated like an empty one.
        bom_files = []
    if not bom_files:
        print(f"\nNo xlsx files found in {bom_dir}.")
        return

    print("\nBOM files available:")
    print("-" * 60)
    for i, name in enumerate(bom_files, start=1):
        print(f" [{i}] {name}")
    print()
    choice = prompt_choice(f"Select a BOM file (1-{len(bom_files)}): ", len(bom_files))
    selected_bom = os.path.join(bom_dir, bom_files[choice - 1])
    print(f"\nSelected BOM: {bom_files[choice - 1]}")

    # --- Fetch schematic sheet list ---
    print("\nFetching schematic sheets...")
    data = graphql(token, NEXAR_API_URL, QUERY_SCHEMATICS, {"projectId": project_id})
    variant_data = _find_variant(data, variant_name)
    if not variant_data:
        print("Could not find schematic data for the selected variant.")
        return
    sheet_list = variant_data.get("schematics") or []
    if not sheet_list:
        print("No schematic sheets found.")
        return
    print(f"Found {len(sheet_list)} schematic sheet(s). Fetching components...")

    # Build a map of documentId -> {documentName, nodes[]}
    sheet_map = {s["documentId"]: {"documentName": s["documentName"], "nodes": []} for s in sheet_list}

    # Paginate all components across all sheets.
    # NOTE(review): one cursor (the endCursor of the last sheet reporting a
    # next page) is reused for every sheet's designItems connection. Confirm
    # that cursors are interchangeable between sheets for this API; otherwise
    # sheets of different sizes may lose or repeat items.
    cursor = None
    while True:
        variables = {"projectId": project_id}
        if cursor:
            variables["cursor"] = cursor
        data = graphql(token, NEXAR_API_URL, QUERY_SHEET_COMPONENTS, variables)
        fetched_variant = _find_variant(data, variant_name)
        if not fetched_variant:
            break

        has_next = False
        next_cursor = None
        for sheet in fetched_variant.get("schematics") or []:
            doc_id = sheet.get("documentId")
            if doc_id not in sheet_map:
                continue
            design_items = sheet.get("designItems") or {}
            sheet_map[doc_id]["nodes"].extend(design_items.get("nodes") or [])
            page_info = design_items.get("pageInfo") or {}
            if page_info.get("hasNextPage"):
                has_next = True
                next_cursor = page_info.get("endCursor")

        # Stop when done, and also when the cursor is missing or did not
        # advance: repeating the same request would loop forever.
        if not has_next or not next_cursor or next_cursor == cursor:
            break
        cursor = next_cursor

    # Rebuild schematics list in original order for export
    schematics = [
        {
            "documentName": sheet_map[s["documentId"]]["documentName"],
            "documentId": s["documentId"],
            "designItems": {"nodes": sheet_map[s["documentId"]]["nodes"]},
        }
        for s in sheet_list
    ]

    # --- Fetch fitted designators for selected variant ---
    print("Fetching fitted components for variant...")
    data = graphql(token, NEXAR_API_URL, QUERY_FITTED_DESIGNATORS, {"projectId": project_id})
    bom_variant = _find_variant(data, variant_name)
    fitted_designators = set()
    if bom_variant:
        for bom_item in (bom_variant.get("bom") or {}).get("bomItems") or []:
            for instance in bom_item.get("bomItemInstances") or []:
                designator = instance.get("designator")
                if instance.get("isFitted") and designator is not None:
                    fitted_designators.add(designator)

    # Filter each sheet's nodes to fitted components only
    for s in schematics:
        s["designItems"]["nodes"] = [
            n for n in s["designItems"]["nodes"]
            if n.get("designator") in fitted_designators
        ]
    total_components = sum(len(s["designItems"]["nodes"]) for s in schematics)
    print(f"Total: {total_components} fitted component(s) across {len(schematics)} sheet(s).")

    # --- Load BOM cross-reference data ---
    bom_data = load_bom_data(selected_bom)
    if bom_data is None:
        print("No 'bom' tab found in the selected BOM file — exporting without BOM data.")
    else:
        print(f"Loaded {len(bom_data)} entries from BOM.")

    # --- Export ---
    filename = export_to_xlsx(selected_project["name"], variant_name, schematics, bom_data)
    print(f"\nExported to: {filename}")
2026-03-31 15:38:56 +01:00
# Run the interactive workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()