This commit is contained in:
David Rice
2026-04-01 16:25:54 +01:00
parent 2b41c91cfe
commit a6c417569b
2 changed files with 333 additions and 43 deletions

Binary file not shown.

View File

@@ -1,8 +1,9 @@
"""
List all projects in Altium 365 workspaces via the Nexar GraphQL API.
List all projects in Altium 365 workspaces via the Nexar GraphQL API,
then export schematic sheet components to an xlsx file.
Requirements:
pip install requests
pip install requests openpyxl
Environment variables:
NEXAR_CLIENT_ID - Your Nexar application client ID
@@ -21,6 +22,7 @@ import sys
import webbrowser
from urllib.parse import parse_qs, urlparse
import openpyxl
import requests
TOKEN_URL = "https://identity.nexar.com/connect/token"
@@ -61,6 +63,80 @@ query GetVariants($projectId: ID!) {
}
"""
QUERY_SCHEMATICS = """
query GetSchematics($projectId: ID!) {
desProjectById(id: $projectId) {
design {
workInProgress {
variants {
name
schematics {
documentName
documentId
}
}
}
}
}
}
"""
QUERY_SHEET_COMPONENTS = """
query GetSheetComponents($projectId: ID!, $cursor: String) {
desProjectById(id: $projectId) {
design {
workInProgress {
variants {
name
schematics {
documentId
designItems(first: 100, after: $cursor) {
pageInfo {
hasNextPage
endCursor
}
nodes {
designator
component {
name
description
manufacturerParts {
partNumber
companyName
}
}
}
}
}
}
}
}
}
}
"""
QUERY_FITTED_DESIGNATORS = """
query GetFittedDesignators($projectId: ID!) {
desProjectById(id: $projectId) {
design {
workInProgress {
variants {
name
bom {
bomItems {
bomItemInstances {
designator
isFitted
}
}
}
}
}
}
}
}
"""
_CALLBACK_HTML = """
<html><head><title>Nexar Login</title></head>
<body style="background:#000b24;color:#fff;font-family:sans-serif;text-align:center;padding-top:20%">
@@ -70,10 +146,9 @@ _CALLBACK_HTML = """
def _make_callback_handler(code_bucket):
"""Return an HTTPRequestHandler that captures the OAuth2 auth code."""
class _Handler(http.server.BaseHTTPRequestHandler):
def log_message(self, *_):
pass # silence request logs
pass
def do_GET(self):
parsed = urlparse(self.path)
@@ -92,15 +167,12 @@ def _make_callback_handler(code_bucket):
def get_token(client_id: str, client_secret: str) -> str:
"""Obtain a Nexar access token via Authorization Code + PKCE flow."""
# Generate PKCE verifier and challenge
code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode()
code_verifier = re.sub(r"[^a-zA-Z0-9]+", "", code_verifier)
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")
# Build authorization URL
auth_params = (
f"?response_type=code"
f"&client_id={client_id}"
@@ -111,7 +183,6 @@ def get_token(client_id: str, client_secret: str) -> str:
)
auth_url = AUTH_URL + auth_params
# Start local callback server and open browser
code_bucket = []
server = http.server.HTTPServer(("localhost", 3000), _make_callback_handler(code_bucket))
@@ -127,7 +198,6 @@ def get_token(client_id: str, client_secret: str) -> str:
print("Error: no authorization code received.", file=sys.stderr)
sys.exit(1)
# Exchange auth code for token
response = requests.post(
TOKEN_URL,
data={
@@ -164,33 +234,161 @@ def graphql(token: str, url: str, query: str, variables: dict = None) -> dict:
return result["data"]
def prompt_choice(prompt: str, count: int) -> int:
while True:
try:
choice = int(input(prompt))
if 1 <= choice <= count:
return choice
print(f" Please enter a number between 1 and {count}.")
except ValueError:
print(" Please enter a valid number.")
def normalise(s: str) -> str:
"""Lowercase and strip all non-alphanumeric characters for fuzzy matching."""
return re.sub(r"[^a-z0-9]", "", s.lower())
def _prefix_match(a: str, b: str) -> bool:
"""True if normalised strings match or one is a prefix of the other."""
na, nb = normalise(a), normalise(b)
return bool(na and nb and (na == nb or na.startswith(nb) or nb.startswith(na)))
def load_bom_data(bom_path: str) -> list | None:
"""Load BOM xlsx and return a list of entry dicts.
Returns None if no 'bom' tab is found."""
wb = openpyxl.load_workbook(bom_path, data_only=True)
bom_sheet = next((wb[n] for n in wb.sheetnames if n.lower() == "bom"), None)
if bom_sheet is None:
return None
header_row = next(bom_sheet.iter_rows(min_row=1, max_row=1, values_only=True))
headers = {str(v).lower().strip(): i for i, v in enumerate(header_row) if v is not None}
def find_col(*candidates):
for c in candidates:
if c in headers:
return headers[c]
return None
mfr_col = find_col("manufacturer", "mfr", "manufacturer name")
mpn_col = find_col("mpn", "manufacturer part number", "manufacturer part no", "part number")
cost_col = find_col("unit cost", "unit price", "cost", "price")
entries = []
for row in bom_sheet.iter_rows(min_row=2, values_only=True):
mfr = str(row[mfr_col]).strip() if mfr_col is not None and len(row) > mfr_col and row[mfr_col] is not None else ""
mpn = str(row[mpn_col]).strip() if mpn_col is not None and len(row) > mpn_col and row[mpn_col] is not None else ""
if not mfr and not mpn:
continue
try:
cost = float(row[cost_col]) if cost_col is not None and len(row) > cost_col and row[cost_col] is not None else 0.0
except (ValueError, TypeError):
cost = 0.0
entries.append({"manufacturer": mfr, "mpn": mpn, "unit_cost": cost})
return entries
def _find_bom_match(bom_data: list, mfr_name: str, mpn: str) -> dict | None:
"""Return the highest unit cost BOM entry that prefix-matches both manufacturer and MPN."""
candidates = [
e for e in bom_data
if _prefix_match(e["manufacturer"], mfr_name) and _prefix_match(e["mpn"], mpn)
]
return max(candidates, key=lambda e: e["unit_cost"]) if candidates else None
MAX_MFR_PARTS = 5
def export_to_xlsx(project_name: str, variant_name: str, schematics: list, bom_data: dict | None) -> str:
wb = openpyxl.Workbook()
wb.remove(wb.active)
# Build header row
mfr_headers = []
for n in range(1, MAX_MFR_PARTS + 1):
mfr_headers += [f"Manufacturer {n}", f"Manufacturer Part Number {n}"]
headers = ["Designator", "Reference", "Description"] + mfr_headers
if bom_data is not None:
headers += ["Matched Manufacturer", "Matched MPN", "Unit Cost"]
col_letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
for sheet in schematics:
sheet_name = sheet["documentName"] or sheet["documentId"]
safe_name = re.sub(r"[\\/*?:\[\]]", "_", sheet_name)[:31]
ws = wb.create_sheet(title=safe_name)
ws.append(headers)
for i, h in enumerate(headers):
col = col_letters[i] if i < 26 else col_letters[i // 26 - 1] + col_letters[i % 26]
ws.column_dimensions[col].width = 28 if "Manufacturer" in h or "Part" in h else 16 if i == 0 else 36
nodes = (sheet.get("designItems") or {}).get("nodes") or []
for node in sorted(nodes, key=lambda n: n.get("designator") or ""):
comp = node.get("component") or {}
mfr_parts = comp.get("manufacturerParts") or []
# Collect up to MAX_MFR_PARTS manufacturer/MPN pairs
pairs = []
for mp in mfr_parts[:MAX_MFR_PARTS]:
mfr_name = mp.get("companyName") or "-"
mpn = mp.get("partNumber") or "-"
pairs.append((mfr_name, mpn))
# Pad with '-' if fewer than MAX_MFR_PARTS
while len(pairs) < MAX_MFR_PARTS:
pairs.append(("-", "-"))
flat_pairs = [v for pair in pairs for v in pair]
row = [node.get("designator") or "", comp.get("name") or "", comp.get("description") or ""] + flat_pairs
if bom_data is not None:
match = None
for mfr_name, mpn in pairs:
if mfr_name == "-" and mpn == "-":
continue
match = _find_bom_match(bom_data, mfr_name, mpn)
if match:
break
if match:
row += [match["manufacturer"], match["mpn"], match["unit_cost"]]
else:
row += ["-", "-", "-"]
ws.append(row)
output_dir = os.path.join(os.path.dirname(__file__), "output")
safe_project = re.sub(r'[\\/:*?"<>|]', "_", project_name).strip()
filename = os.path.join(output_dir, f"{safe_project} - {variant_name}.xlsx")
wb.save(filename)
return filename
def main():
client_id = os.environ.get("NEXAR_CLIENT_ID")
client_secret = os.environ.get("NEXAR_CLIENT_SECRET")
if not client_id or not client_secret:
print(
"Error: NEXAR_CLIENT_ID and NEXAR_CLIENT_SECRET must be set.",
file=sys.stderr,
)
print("Error: NEXAR_CLIENT_ID and NEXAR_CLIENT_SECRET must be set.", file=sys.stderr)
sys.exit(1)
token = get_token(client_id, client_secret)
print("Authenticated.\n")
# --- Project selection ---
print(f"Fetching projects from workspace '{WORKSPACE_NAME}'...")
data = graphql(token, NEXAR_API_URL, QUERY_WORKSPACES_AND_PROJECTS, {"workspace": WORKSPACE_NAME})
workspaces = data["desWorkspaces"]
if not workspaces:
print(
"No workspaces found. Ensure the 'design' scope is enabled on your "
"Nexar application and you are a member of an Altium 365 workspace."
)
print("No workspaces found.")
return
projects = workspaces[0].get("projects") or []
if not projects:
print("No projects found in this workspace.")
return
@@ -198,28 +396,18 @@ def main():
print(f"\nProjects in '{WORKSPACE_NAME}':")
print("-" * 60)
for i, project in enumerate(projects, start=1):
description = project.get("description") or ""
desc_str = f"{description}" if description else ""
print(f" [{i}] {project['name']}{desc_str}")
desc = project.get("description") or ""
print(f" [{i}] {project['name']}" + (f"{desc}" if desc else ""))
print()
while True:
try:
choice = int(input(f"Select a project (1-{len(projects)}): "))
if 1 <= choice <= len(projects):
break
print(f" Please enter a number between 1 and {len(projects)}.")
except ValueError:
print(" Please enter a valid number.")
choice = prompt_choice(f"Select a project (1-{len(projects)}): ", len(projects))
selected_project = projects[choice - 1]
print(f"\nSelected project: {selected_project['name']}")
# Fetch variants for the selected project
# --- Variant selection ---
print("\nFetching variants...")
data = graphql(token, NEXAR_API_URL, QUERY_VARIANTS, {"projectId": selected_project["id"]})
wip = data["desProjectById"]["design"]["workInProgress"]
variants = wip.get("variants") or []
variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or []
if not variants:
print("No variants found for this project.")
@@ -231,18 +419,120 @@ def main():
print(f" [{i}] {variant['name']}")
print()
while True:
try:
choice = int(input(f"Select a variant (1-{len(variants)}): "))
if 1 <= choice <= len(variants):
break
print(f" Please enter a number between 1 and {len(variants)}.")
except ValueError:
print(" Please enter a valid number.")
choice = prompt_choice(f"Select a variant (1-{len(variants)}): ", len(variants))
selected_variant = variants[choice - 1]
print(f"\nSelected variant: {selected_variant['name']}")
# --- BOM file selection ---
bom_dir = os.path.join(os.path.dirname(__file__), "BOM")
bom_files = sorted(f for f in os.listdir(bom_dir) if f.lower().endswith(".xlsx"))
if not bom_files:
print(f"\nNo xlsx files found in {bom_dir}.")
return
print(f"\nBOM files available:")
print("-" * 60)
for i, name in enumerate(bom_files, start=1):
print(f" [{i}] {name}")
print()
choice = prompt_choice(f"Select a BOM file (1-{len(bom_files)}): ", len(bom_files))
selected_bom = os.path.join(bom_dir, bom_files[choice - 1])
print(f"\nSelected BOM: {bom_files[choice - 1]}")
# --- Fetch schematic sheet list ---
print("\nFetching schematic sheets...")
data = graphql(token, NEXAR_API_URL, QUERY_SCHEMATICS, {"projectId": selected_project["id"]})
all_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or []
variant_data = next((v for v in all_variants if v["name"] == selected_variant["name"]), None)
if not variant_data:
print("Could not find schematic data for the selected variant.")
return
sheet_list = variant_data.get("schematics") or []
if not sheet_list:
print("No schematic sheets found.")
return
print(f"Found {len(sheet_list)} schematic sheet(s). Fetching components...")
# Build a map of documentId -> {documentName, nodes[]}
sheet_map = {s["documentId"]: {"documentName": s["documentName"], "nodes": []} for s in sheet_list}
# Paginate all components across all sheets
cursor = None
while True:
variables = {"projectId": selected_project["id"]}
if cursor:
variables["cursor"] = cursor
data = graphql(token, NEXAR_API_URL, QUERY_SHEET_COMPONENTS, variables)
fetched_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or []
fetched_variant = next((v for v in fetched_variants if v["name"] == selected_variant["name"]), None)
if not fetched_variant:
break
has_next = False
next_cursor = None
for sheet in fetched_variant.get("schematics") or []:
doc_id = sheet["documentId"]
if doc_id not in sheet_map:
continue
design_items = sheet.get("designItems") or {}
sheet_map[doc_id]["nodes"].extend(design_items.get("nodes") or [])
page_info = design_items.get("pageInfo") or {}
if page_info.get("hasNextPage"):
has_next = True
next_cursor = page_info.get("endCursor")
if not has_next:
break
cursor = next_cursor
# Rebuild schematics list in original order for export
schematics = [
{"documentName": sheet_map[s["documentId"]]["documentName"],
"documentId": s["documentId"],
"designItems": {"nodes": sheet_map[s["documentId"]]["nodes"]}}
for s in sheet_list
]
# --- Fetch fitted designators for selected variant ---
print("Fetching fitted components for variant...")
data = graphql(token, NEXAR_API_URL, QUERY_FITTED_DESIGNATORS, {"projectId": selected_project["id"]})
bom_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or []
bom_variant = next((v for v in bom_variants if v["name"] == selected_variant["name"]), None)
fitted_designators = set()
if bom_variant:
for bom_item in (bom_variant.get("bom") or {}).get("bomItems") or []:
for instance in bom_item.get("bomItemInstances") or []:
if instance.get("isFitted"):
fitted_designators.add(instance["designator"])
# Filter each sheet's nodes to fitted components only
for s in schematics:
s["designItems"]["nodes"] = [
n for n in s["designItems"]["nodes"]
if n.get("designator") in fitted_designators
]
total_components = sum(len(s["designItems"]["nodes"]) for s in schematics)
print(f"Total: {total_components} fitted component(s) across {len(schematics)} sheet(s).")
# --- Load BOM cross-reference data ---
bom_data = load_bom_data(selected_bom)
if bom_data is None:
print("No 'bom' tab found in the selected BOM file — exporting without BOM data.")
else:
print(f"Loaded {len(bom_data)} entries from BOM.")
# --- Export ---
filename = export_to_xlsx(selected_project["name"], selected_variant["name"], schematics, bom_data)
print(f"\nExported to: {filename}")
if __name__ == "__main__":
main()
main()