""" List all projects in Altium 365 workspaces via the Nexar GraphQL API, then export schematic sheet components to an xlsx file. Requirements: pip install requests openpyxl Environment variables: NEXAR_CLIENT_ID - Your Nexar application client ID NEXAR_CLIENT_SECRET - Your Nexar application client secret Your Nexar application must have the 'design' scope enabled. Authentication opens a browser window for Altium 365 login. """ import base64 import hashlib import http.server import os import re import sys import webbrowser from urllib.parse import parse_qs, urlparse import openpyxl import requests TOKEN_URL = "https://identity.nexar.com/connect/token" AUTH_URL = "https://identity.nexar.com/connect/authorize" NEXAR_API_URL = "https://api.nexar.com/graphql" REDIRECT_URI = "http://localhost:3000/login" SCOPES = "openid profile email design.domain user.access" WORKSPACE_NAME = "Flowbird SAS" QUERY_WORKSPACES_AND_PROJECTS = """ query GetWorkspacesAndProjects($workspace: String!) { desWorkspaces(where: {name: {eq: $workspace}}) { url name description projects { id name description } } } """ QUERY_VARIANTS = """ query GetVariants($projectId: ID!) { desProjectById(id: $projectId) { name design { workInProgress { variants { name } } } } } """ QUERY_SCHEMATICS = """ query GetSchematics($projectId: ID!) { desProjectById(id: $projectId) { design { workInProgress { variants { name schematics { documentName documentId } } } } } } """ QUERY_SHEET_COMPONENTS = """ query GetSheetComponents($projectId: ID!, $cursor: String) { desProjectById(id: $projectId) { design { workInProgress { variants { name schematics { documentId designItems(first: 100, after: $cursor) { pageInfo { hasNextPage endCursor } nodes { designator component { name description details { parameters { name value } } } } } } } } } } } """ QUERY_FITTED_DESIGNATORS = """ query GetFittedDesignators($projectId: ID!) { desProjectById(id: $projectId) { design { workInProgress { variants { name bom { bomItems { bomItemInstances { designator isFitted } } } } } } } } """ _CALLBACK_HTML = """
You can close this tab and return to the terminal.
""" def _make_callback_handler(code_bucket): class _Handler(http.server.BaseHTTPRequestHandler): def log_message(self, *_): pass def do_GET(self): parsed = urlparse(self.path) if parsed.path != "/login": self.send_response(404) self.end_headers() return params = parse_qs(parsed.query) self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write(_CALLBACK_HTML.encode()) code_bucket.append(params.get("code", [""])[0]) return _Handler def get_token(client_id: str, client_secret: str) -> str: code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode() code_verifier = re.sub(r"[^a-zA-Z0-9]+", "", code_verifier) digest = hashlib.sha256(code_verifier.encode()).digest() code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=") auth_params = ( f"?response_type=code" f"&client_id={client_id}" f"&redirect_uri={REDIRECT_URI}" f"&scope={SCOPES.replace(' ', '%20')}" f"&code_challenge={code_challenge}" f"&code_challenge_method=S256" ) auth_url = AUTH_URL + auth_params code_bucket = [] server = http.server.HTTPServer(("localhost", 3000), _make_callback_handler(code_bucket)) print("Opening browser for Nexar login...") webbrowser.open(auth_url) while not code_bucket: server.handle_request() server.server_close() auth_code = code_bucket[0] if not auth_code: print("Error: no authorization code received.", file=sys.stderr) sys.exit(1) response = requests.post( TOKEN_URL, data={ "grant_type": "authorization_code", "client_id": client_id, "client_secret": client_secret, "redirect_uri": REDIRECT_URI, "code": auth_code, "code_verifier": code_verifier, }, timeout=30, ) response.raise_for_status() return response.json()["access_token"] def graphql(token: str, url: str, query: str, variables: dict = None) -> dict: headers = {"Authorization": f"Bearer {token}"} payload = {"query": query} if variables: payload["variables"] = variables response = requests.post(url, json=payload, headers=headers, timeout=30) if not response.ok: print(f"HTTP {response.status_code}: {response.text}", file=sys.stderr) response.raise_for_status() result = response.json() if "errors" in result: for err in result["errors"]: print(f"GraphQL error: {err.get('message')}", file=sys.stderr) sys.exit(1) return result["data"] def prompt_choice(prompt: str, count: int) -> int: while True: try: choice = int(input(prompt)) if 1 <= choice <= count: return choice print(f" Please enter a number between 1 and {count}.") except ValueError: print(" Please enter a valid number.") def normalise(s: str) -> str: """Lowercase and strip all non-alphanumeric characters for fuzzy matching.""" return re.sub(r"[^a-z0-9]", "", s.lower()) def _prefix_match(a: str, b: str) -> bool: """True if normalised strings match or one is a prefix of the other.""" na, nb = normalise(a), normalise(b) return bool(na and nb and (na == nb or na.startswith(nb) or nb.startswith(na))) def _mpn_match(a: str, b: str) -> bool: """True if MPNs match, accounting for packaging suffix (last character) differences.""" na, nb = normalise(a), normalise(b) if not na or not nb: return False if na == nb or na.startswith(nb) or nb.startswith(na): return True # Strip last character (e.g. R=reel, T=tape) and retry if len(na) > 1 and len(nb) > 1 and na[:-1] == nb[:-1]: return True return False def _norm_elec(s: str) -> str: """Normalise electrical value: lowercase, µ→u, strip non-alphanumeric.""" if not s: return "" s = s.strip().lower() for frm, to in [('\u00b5', 'u'), ('\u03bc', 'u'), ('µ', 'u'), ('μ', 'u'), ('\u03a9', 'ohm'), ('\u2126', 'ohm'), ('mf', 'uf')]: s = s.replace(frm, to) return re.sub(r'[^a-z0-9]', '', s) def _elec_match(a: str, b: str) -> bool: """True if two electrical values match (prefix-tolerant after normalisation).""" na, nb = _norm_elec(a), _norm_elec(b) if not na or not nb: return False return na == nb or nb.startswith(na) or na.startswith(nb) def load_bom_data(bom_path: str) -> list | None: """Load BOM xlsx and return a list of entry dicts. Returns None if no 'bom' tab is found.""" wb = openpyxl.load_workbook(bom_path, data_only=True) bom_sheet = next((wb[n] for n in wb.sheetnames if n.lower() == "bom"), None) if bom_sheet is None: return None header_row = next(bom_sheet.iter_rows(min_row=1, max_row=1, values_only=True)) headers = {str(v).lower().strip(): i for i, v in enumerate(header_row) if v is not None} def find_col(*candidates): for c in candidates: if c in headers: return headers[c] return None def cell(row, col): return str(row[col]).strip() if col is not None and len(row) > col and row[col] is not None else "" mfr_col = find_col("manufacturer", "mfr", "manufacturer name") mpn_col = find_col("mpn", "manufacturer part number", "manufacturer part no", "part number") cost_col = find_col("unit cost", "unit price", "cost", "price") desc_col = find_col("description", "desc", "component description") res_col = find_col("resistance", "resistance (ohms)") tol_col = find_col("tolerance", "tol") cap_col = find_col("capacitance", "capacitance (farads)") volt_col = find_col("voltage", "voltage rating", "voltage - rated", "rated voltage") fp_col = find_col("footprint", "case", "package", "package / case", "case - imperial", "case - metric") entries = [] for row in bom_sheet.iter_rows(min_row=2, values_only=True): mfr = cell(row, mfr_col) mpn = cell(row, mpn_col) if not mfr and not mpn: continue try: cost = float(row[cost_col]) if cost_col is not None and len(row) > cost_col and row[cost_col] is not None else 0.0 except (ValueError, TypeError): cost = 0.0 entries.append({ "manufacturer": mfr, "mpn": mpn, "unit_cost": cost, "description": cell(row, desc_col), "resistance": cell(row, res_col), "tolerance": cell(row, tol_col), "capacitance": cell(row, cap_col), "voltage": cell(row, volt_col), "footprint": cell(row, fp_col), }) return entries def _find_bom_match(bom_data: list, mfr_name: str, mpn: str) -> dict | None: """Return the highest unit cost BOM entry that prefix-matches both manufacturer and MPN.""" candidates = [ e for e in bom_data if _prefix_match(e["manufacturer"], mfr_name) and _mpn_match(e["mpn"], mpn) ] return max(candidates, key=lambda e: e["unit_cost"]) if candidates else None def _extract_r_vals(s: str) -> tuple: """Extract (resistance_norm, footprint) from a description string using word boundaries.""" rm = re.search(r'\b(\d+\.?\d*\s*(?:kohms?|mohms?|k[\s\-]?ohms?|ohms?|k|m|r))\b', s, re.IGNORECASE) resistance = _norm_elec(rm.group(1)) if rm else "" fm = re.search(r'\b(0\d{3})\b', s) footprint = fm.group(1) if fm else "" return resistance, footprint def _extract_c_vals(s: str) -> tuple: """Extract (capacitance_norm, voltage_norm, case) from a description string.""" cm = re.search(r'\b(\d+\.?\d*\s*(?:p|n|u|µ|μ|m)\s*f)\b', s, re.IGNORECASE) capacitance = _norm_elec(cm.group(1)) if cm else "" vm = re.search(r'\b(\d+\.?\d*\s*v)\b', s, re.IGNORECASE) voltage = _norm_elec(vm.group(1)) if vm else "" fm = re.search(r'\b(0\d{3})\b', s) case = fm.group(1) if fm else "" return capacitance, voltage, case def _find_passive_bom_match(bom_data: list, designator: str, nexar_params: dict, nexar_desc: str) -> dict | None: """Fallback matcher for R/C: extract values from Nexar params/description, match against BOM description.""" # Take only leading letters from designator e.g. C38 → C, R102 → R prefix_match = re.match(r'^([a-zA-Z]+)', designator) prefix = prefix_match.group(1).upper() if prefix_match else "" if prefix not in ("R", "C"): return None pl = {k.lower(): v.strip() for k, v in nexar_params.items() if v and str(v).strip() and str(v).strip() != "-"} def gp(*keys): for k in keys: v = pl.get(k.lower(), "") if v: return v return "" pkg_keys = ("package / case", "package/case", "case - imperial", "case - metric", "supplier device package", "footprint", "case") if prefix == "R": resistance = gp("resistance") footprint = gp(*pkg_keys) if not resistance or not footprint: r_d, fp_d = _extract_r_vals(nexar_desc) if not resistance: resistance = r_d if not footprint: footprint = fp_d if not resistance or not footprint: return None candidates = [] for e in bom_data: desc = e.get("description", "") if not desc or "res" not in desc.lower(): continue bom_r, bom_fp = _extract_r_vals(desc) if not bom_r or not _elec_match(bom_r, resistance): continue if not bom_fp or bom_fp.lower() != footprint.lower().strip(): continue candidates.append(e) else: # C capacitance = gp("capacitance") voltage = gp("voltage - rated", "voltage rating", "rated voltage", "voltage") case = gp(*pkg_keys) if not capacitance or not case: c_d, v_d, cs_d = _extract_c_vals(nexar_desc) if not capacitance: capacitance = c_d if not voltage: voltage = v_d if not case: case = cs_d if not capacitance or not case: return None candidates = [] for e in bom_data: desc = e.get("description", "") if not desc or "cap" not in desc.lower(): continue bom_c, bom_v, bom_case = _extract_c_vals(desc) if not bom_c or not _elec_match(bom_c, capacitance): continue if not bom_case or bom_case.lower() != case.lower().strip(): continue if voltage and bom_v and not _elec_match(bom_v, voltage): continue candidates.append(e) return max(candidates, key=lambda e: e["unit_cost"]) if candidates else None MAX_MFR_PARTS = 5 def export_to_xlsx(project_name: str, variant_name: str, schematics: list, bom_data: dict | None) -> str: wb = openpyxl.Workbook() wb.remove(wb.active) # Build header row mfr_headers = [] for n in range(1, MAX_MFR_PARTS + 1): mfr_headers += [f"Manufacturer {n}", f"Manufacturer Part Number {n}"] headers = ["Designator", "Reference", "Description"] + mfr_headers if bom_data is not None: headers += ["Matched Manufacturer", "Matched MPN", "Unit Cost"] col_letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" for sheet in schematics: sheet_name = sheet["documentName"] or sheet["documentId"] safe_name = re.sub(r"[\\/*?:\[\]]", "_", sheet_name)[:31] ws = wb.create_sheet(title=safe_name) ws.append(headers) for i, h in enumerate(headers): col = col_letters[i] if i < 26 else col_letters[i // 26 - 1] + col_letters[i % 26] ws.column_dimensions[col].width = 28 if "Manufacturer" in h or "Part" in h else 16 if i == 0 else 36 nodes = (sheet.get("designItems") or {}).get("nodes") or [] for node in sorted(nodes, key=lambda n: n.get("designator") or ""): comp = node.get("component") or {} params = {p["name"]: (p.get("value") or "").strip() for p in (comp.get("details") or {}).get("parameters") or [] if p.get("name")} # Collect up to MAX_MFR_PARTS manufacturer/MPN pairs from named parameters pairs = [] for n in range(1, MAX_MFR_PARTS + 1): mfr_name = params.get(f"Manufacturer {n}", "").strip() mpn = params.get(f"Manufacturer Part Number {n}", "").strip() if not mfr_name or not mpn or mfr_name == "-" or mpn == "-": pairs.append(("-", "-")) else: pairs.append((mfr_name, mpn)) flat_pairs = [v for pair in pairs for v in pair] row = [node.get("designator") or "", comp.get("name") or "", comp.get("description") or ""] + flat_pairs if bom_data is not None: match = None for mfr_name, mpn in pairs: if mfr_name == "-" and mpn == "-": continue match = _find_bom_match(bom_data, mfr_name, mpn) if match: break if not match: match = _find_passive_bom_match( bom_data, node.get("designator") or "", params, comp.get("description") or "" ) if match: row += [match["manufacturer"], match["mpn"], match["unit_cost"]] else: row += ["-", "-", "-"] ws.append(row) output_dir = os.path.join(os.path.dirname(__file__), "output") safe_project = re.sub(r'[\\/:*?"<>|]', "_", project_name).strip() filename = os.path.join(output_dir, f"{safe_project} - {variant_name}.xlsx") wb.save(filename) return filename def main(): client_id = os.environ.get("NEXAR_CLIENT_ID") client_secret = os.environ.get("NEXAR_CLIENT_SECRET") if not client_id or not client_secret: print("Error: NEXAR_CLIENT_ID and NEXAR_CLIENT_SECRET must be set.", file=sys.stderr) sys.exit(1) token = get_token(client_id, client_secret) print("Authenticated.\n") # --- Project selection --- print(f"Fetching projects from workspace '{WORKSPACE_NAME}'...") data = graphql(token, NEXAR_API_URL, QUERY_WORKSPACES_AND_PROJECTS, {"workspace": WORKSPACE_NAME}) workspaces = data["desWorkspaces"] if not workspaces: print("No workspaces found.") return projects = workspaces[0].get("projects") or [] if not projects: print("No projects found in this workspace.") return print(f"\nProjects in '{WORKSPACE_NAME}':") print("-" * 60) for i, project in enumerate(projects, start=1): desc = project.get("description") or "" print(f" [{i}] {project['name']}" + (f" — {desc}" if desc else "")) print() choice = prompt_choice(f"Select a project (1-{len(projects)}): ", len(projects)) selected_project = projects[choice - 1] print(f"\nSelected project: {selected_project['name']}") # --- Variant selection --- print("\nFetching variants...") data = graphql(token, NEXAR_API_URL, QUERY_VARIANTS, {"projectId": selected_project["id"]}) variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or [] if not variants: print("No variants found for this project.") return print(f"\nVariants in '{selected_project['name']}':") print("-" * 60) for i, variant in enumerate(variants, start=1): print(f" [{i}] {variant['name']}") print() choice = prompt_choice(f"Select a variant (1-{len(variants)}): ", len(variants)) selected_variant = variants[choice - 1] print(f"\nSelected variant: {selected_variant['name']}") # --- BOM file selection --- bom_dir = os.path.join(os.path.dirname(__file__), "BOM") bom_files = sorted(f for f in os.listdir(bom_dir) if f.lower().endswith(".xlsx")) if not bom_files: print(f"\nNo xlsx files found in {bom_dir}.") return print(f"\nBOM files available:") print("-" * 60) for i, name in enumerate(bom_files, start=1): print(f" [{i}] {name}") print() choice = prompt_choice(f"Select a BOM file (1-{len(bom_files)}): ", len(bom_files)) selected_bom = os.path.join(bom_dir, bom_files[choice - 1]) print(f"\nSelected BOM: {bom_files[choice - 1]}") # --- Fetch schematic sheet list --- print("\nFetching schematic sheets...") data = graphql(token, NEXAR_API_URL, QUERY_SCHEMATICS, {"projectId": selected_project["id"]}) all_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or [] variant_data = next((v for v in all_variants if v["name"] == selected_variant["name"]), None) if not variant_data: print("Could not find schematic data for the selected variant.") return sheet_list = variant_data.get("schematics") or [] if not sheet_list: print("No schematic sheets found.") return print(f"Found {len(sheet_list)} schematic sheet(s). Fetching components...") # Build a map of documentId -> {documentName, nodes[]} sheet_map = {s["documentId"]: {"documentName": s["documentName"], "nodes": []} for s in sheet_list} # Paginate all components across all sheets cursor = None while True: variables = {"projectId": selected_project["id"]} if cursor: variables["cursor"] = cursor data = graphql(token, NEXAR_API_URL, QUERY_SHEET_COMPONENTS, variables) fetched_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or [] fetched_variant = next((v for v in fetched_variants if v["name"] == selected_variant["name"]), None) if not fetched_variant: break has_next = False next_cursor = None for sheet in fetched_variant.get("schematics") or []: doc_id = sheet["documentId"] if doc_id not in sheet_map: continue design_items = sheet.get("designItems") or {} sheet_map[doc_id]["nodes"].extend(design_items.get("nodes") or []) page_info = design_items.get("pageInfo") or {} if page_info.get("hasNextPage"): has_next = True next_cursor = page_info.get("endCursor") if not has_next: break cursor = next_cursor # Rebuild schematics list in original order for export schematics = [ {"documentName": sheet_map[s["documentId"]]["documentName"], "documentId": s["documentId"], "designItems": {"nodes": sheet_map[s["documentId"]]["nodes"]}} for s in sheet_list ] # --- Fetch fitted designators for selected variant --- print("Fetching fitted components for variant...") data = graphql(token, NEXAR_API_URL, QUERY_FITTED_DESIGNATORS, {"projectId": selected_project["id"]}) bom_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or [] bom_variant = next((v for v in bom_variants if v["name"] == selected_variant["name"]), None) fitted_designators = set() if bom_variant: for bom_item in (bom_variant.get("bom") or {}).get("bomItems") or []: for instance in bom_item.get("bomItemInstances") or []: if instance.get("isFitted"): fitted_designators.add(instance["designator"]) # Filter each sheet's nodes to fitted components only for s in schematics: s["designItems"]["nodes"] = [ n for n in s["designItems"]["nodes"] if n.get("designator") in fitted_designators ] total_components = sum(len(s["designItems"]["nodes"]) for s in schematics) print(f"Total: {total_components} fitted component(s) across {len(schematics)} sheet(s).") # --- Load BOM cross-reference data --- bom_data = load_bom_data(selected_bom) if bom_data is None: print("No 'bom' tab found in the selected BOM file — exporting without BOM data.") else: print(f"Loaded {len(bom_data)} entries from BOM.") # --- Export --- filename = export_to_xlsx(selected_project["name"], selected_variant["name"], schematics, bom_data) print(f"\nExported to: {filename}") if __name__ == "__main__": main()