diff --git a/BOM/Elemaster Details NEXIO.xlsx b/BOM/Elemaster Details NEXIO.xlsx new file mode 100644 index 0000000..bcfcfe6 Binary files /dev/null and b/BOM/Elemaster Details NEXIO.xlsx differ diff --git a/nexarxclaude.py b/nexarxclaude.py index 1f8b004..6f1828b 100644 --- a/nexarxclaude.py +++ b/nexarxclaude.py @@ -1,8 +1,9 @@ """ -List all projects in Altium 365 workspaces via the Nexar GraphQL API. +List all projects in Altium 365 workspaces via the Nexar GraphQL API, +then export schematic sheet components to an xlsx file. Requirements: - pip install requests + pip install requests openpyxl Environment variables: NEXAR_CLIENT_ID - Your Nexar application client ID @@ -21,6 +22,7 @@ import sys import webbrowser from urllib.parse import parse_qs, urlparse +import openpyxl import requests TOKEN_URL = "https://identity.nexar.com/connect/token" @@ -61,6 +63,80 @@ query GetVariants($projectId: ID!) { } """ +QUERY_SCHEMATICS = """ +query GetSchematics($projectId: ID!) { + desProjectById(id: $projectId) { + design { + workInProgress { + variants { + name + schematics { + documentName + documentId + } + } + } + } + } +} +""" + +QUERY_SHEET_COMPONENTS = """ +query GetSheetComponents($projectId: ID!, $cursor: String) { + desProjectById(id: $projectId) { + design { + workInProgress { + variants { + name + schematics { + documentId + designItems(first: 100, after: $cursor) { + pageInfo { + hasNextPage + endCursor + } + nodes { + designator + component { + name + description + manufacturerParts { + partNumber + companyName + } + } + } + } + } + } + } + } + } +} +""" + +QUERY_FITTED_DESIGNATORS = """ +query GetFittedDesignators($projectId: ID!) { + desProjectById(id: $projectId) { + design { + workInProgress { + variants { + name + bom { + bomItems { + bomItemInstances { + designator + isFitted + } + } + } + } + } + } + } +} +""" + _CALLBACK_HTML = """ Nexar Login @@ -70,10 +146,9 @@ _CALLBACK_HTML = """ def _make_callback_handler(code_bucket): - """Return an HTTPRequestHandler that captures the OAuth2 auth code.""" class _Handler(http.server.BaseHTTPRequestHandler): def log_message(self, *_): - pass # silence request logs + pass def do_GET(self): parsed = urlparse(self.path) @@ -92,15 +167,12 @@ def _make_callback_handler(code_bucket): def get_token(client_id: str, client_secret: str) -> str: - """Obtain a Nexar access token via Authorization Code + PKCE flow.""" - # Generate PKCE verifier and challenge code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode() code_verifier = re.sub(r"[^a-zA-Z0-9]+", "", code_verifier) digest = hashlib.sha256(code_verifier.encode()).digest() code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=") - # Build authorization URL auth_params = ( f"?response_type=code" f"&client_id={client_id}" @@ -111,7 +183,6 @@ def get_token(client_id: str, client_secret: str) -> str: ) auth_url = AUTH_URL + auth_params - # Start local callback server and open browser code_bucket = [] server = http.server.HTTPServer(("localhost", 3000), _make_callback_handler(code_bucket)) @@ -127,7 +198,6 @@ def get_token(client_id: str, client_secret: str) -> str: print("Error: no authorization code received.", file=sys.stderr) sys.exit(1) - # Exchange auth code for token response = requests.post( TOKEN_URL, data={ @@ -164,33 +234,161 @@ def graphql(token: str, url: str, query: str, variables: dict = None) -> dict: return result["data"] +def prompt_choice(prompt: str, count: int) -> int: + while True: + try: + choice = int(input(prompt)) + if 1 <= choice <= count: + return choice + print(f" Please enter a number between 1 and {count}.") + except ValueError: + print(" Please enter a valid number.") + + +def normalise(s: str) -> str: + """Lowercase and strip all non-alphanumeric characters for fuzzy matching.""" + return re.sub(r"[^a-z0-9]", "", s.lower()) + + +def _prefix_match(a: str, b: str) -> bool: + """True if normalised strings match or one is a prefix of the other.""" + na, nb = normalise(a), normalise(b) + return bool(na and nb and (na == nb or na.startswith(nb) or nb.startswith(na))) + + +def load_bom_data(bom_path: str) -> list | None: + """Load BOM xlsx and return a list of entry dicts. + Returns None if no 'bom' tab is found.""" + wb = openpyxl.load_workbook(bom_path, data_only=True) + + bom_sheet = next((wb[n] for n in wb.sheetnames if n.lower() == "bom"), None) + if bom_sheet is None: + return None + + header_row = next(bom_sheet.iter_rows(min_row=1, max_row=1, values_only=True)) + headers = {str(v).lower().strip(): i for i, v in enumerate(header_row) if v is not None} + + def find_col(*candidates): + for c in candidates: + if c in headers: + return headers[c] + return None + + mfr_col = find_col("manufacturer", "mfr", "manufacturer name") + mpn_col = find_col("mpn", "manufacturer part number", "manufacturer part no", "part number") + cost_col = find_col("unit cost", "unit price", "cost", "price") + + entries = [] + for row in bom_sheet.iter_rows(min_row=2, values_only=True): + mfr = str(row[mfr_col]).strip() if mfr_col is not None and len(row) > mfr_col and row[mfr_col] is not None else "" + mpn = str(row[mpn_col]).strip() if mpn_col is not None and len(row) > mpn_col and row[mpn_col] is not None else "" + if not mfr and not mpn: + continue + try: + cost = float(row[cost_col]) if cost_col is not None and len(row) > cost_col and row[cost_col] is not None else 0.0 + except (ValueError, TypeError): + cost = 0.0 + entries.append({"manufacturer": mfr, "mpn": mpn, "unit_cost": cost}) + + return entries + + +def _find_bom_match(bom_data: list, mfr_name: str, mpn: str) -> dict | None: + """Return the highest unit cost BOM entry that prefix-matches both manufacturer and MPN.""" + candidates = [ + e for e in bom_data + if _prefix_match(e["manufacturer"], mfr_name) and _prefix_match(e["mpn"], mpn) + ] + return max(candidates, key=lambda e: e["unit_cost"]) if candidates else None + + +MAX_MFR_PARTS = 5 + +def export_to_xlsx(project_name: str, variant_name: str, schematics: list, bom_data: dict | None) -> str: + wb = openpyxl.Workbook() + wb.remove(wb.active) + + # Build header row + mfr_headers = [] + for n in range(1, MAX_MFR_PARTS + 1): + mfr_headers += [f"Manufacturer {n}", f"Manufacturer Part Number {n}"] + headers = ["Designator", "Reference", "Description"] + mfr_headers + if bom_data is not None: + headers += ["Matched Manufacturer", "Matched MPN", "Unit Cost"] + + col_letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + + for sheet in schematics: + sheet_name = sheet["documentName"] or sheet["documentId"] + safe_name = re.sub(r"[\\/*?:\[\]]", "_", sheet_name)[:31] + ws = wb.create_sheet(title=safe_name) + ws.append(headers) + + for i, h in enumerate(headers): + col = col_letters[i] if i < 26 else col_letters[i // 26 - 1] + col_letters[i % 26] + ws.column_dimensions[col].width = 28 if "Manufacturer" in h or "Part" in h else 16 if i == 0 else 36 + + nodes = (sheet.get("designItems") or {}).get("nodes") or [] + for node in sorted(nodes, key=lambda n: n.get("designator") or ""): + comp = node.get("component") or {} + mfr_parts = comp.get("manufacturerParts") or [] + + # Collect up to MAX_MFR_PARTS manufacturer/MPN pairs + pairs = [] + for mp in mfr_parts[:MAX_MFR_PARTS]: + mfr_name = mp.get("companyName") or "-" + mpn = mp.get("partNumber") or "-" + pairs.append((mfr_name, mpn)) + # Pad with '-' if fewer than MAX_MFR_PARTS + while len(pairs) < MAX_MFR_PARTS: + pairs.append(("-", "-")) + + flat_pairs = [v for pair in pairs for v in pair] + row = [node.get("designator") or "", comp.get("name") or "", comp.get("description") or ""] + flat_pairs + + if bom_data is not None: + match = None + for mfr_name, mpn in pairs: + if mfr_name == "-" and mpn == "-": + continue + match = _find_bom_match(bom_data, mfr_name, mpn) + if match: + break + if match: + row += [match["manufacturer"], match["mpn"], match["unit_cost"]] + else: + row += ["-", "-", "-"] + + ws.append(row) + + output_dir = os.path.join(os.path.dirname(__file__), "output") + safe_project = re.sub(r'[\\/:*?"<>|]', "_", project_name).strip() + filename = os.path.join(output_dir, f"{safe_project} - {variant_name}.xlsx") + wb.save(filename) + return filename + + def main(): client_id = os.environ.get("NEXAR_CLIENT_ID") client_secret = os.environ.get("NEXAR_CLIENT_SECRET") if not client_id or not client_secret: - print( - "Error: NEXAR_CLIENT_ID and NEXAR_CLIENT_SECRET must be set.", - file=sys.stderr, - ) + print("Error: NEXAR_CLIENT_ID and NEXAR_CLIENT_SECRET must be set.", file=sys.stderr) sys.exit(1) token = get_token(client_id, client_secret) print("Authenticated.\n") + # --- Project selection --- print(f"Fetching projects from workspace '{WORKSPACE_NAME}'...") data = graphql(token, NEXAR_API_URL, QUERY_WORKSPACES_AND_PROJECTS, {"workspace": WORKSPACE_NAME}) workspaces = data["desWorkspaces"] if not workspaces: - print( - "No workspaces found. Ensure the 'design' scope is enabled on your " - "Nexar application and you are a member of an Altium 365 workspace." - ) + print("No workspaces found.") return projects = workspaces[0].get("projects") or [] - if not projects: print("No projects found in this workspace.") return @@ -198,28 +396,18 @@ def main(): print(f"\nProjects in '{WORKSPACE_NAME}':") print("-" * 60) for i, project in enumerate(projects, start=1): - description = project.get("description") or "" - desc_str = f" — {description}" if description else "" - print(f" [{i}] {project['name']}{desc_str}") + desc = project.get("description") or "" + print(f" [{i}] {project['name']}" + (f" — {desc}" if desc else "")) print() - while True: - try: - choice = int(input(f"Select a project (1-{len(projects)}): ")) - if 1 <= choice <= len(projects): - break - print(f" Please enter a number between 1 and {len(projects)}.") - except ValueError: - print(" Please enter a valid number.") - + choice = prompt_choice(f"Select a project (1-{len(projects)}): ", len(projects)) selected_project = projects[choice - 1] print(f"\nSelected project: {selected_project['name']}") - # Fetch variants for the selected project + # --- Variant selection --- print("\nFetching variants...") data = graphql(token, NEXAR_API_URL, QUERY_VARIANTS, {"projectId": selected_project["id"]}) - wip = data["desProjectById"]["design"]["workInProgress"] - variants = wip.get("variants") or [] + variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or [] if not variants: print("No variants found for this project.") @@ -231,18 +419,120 @@ def main(): print(f" [{i}] {variant['name']}") print() - while True: - try: - choice = int(input(f"Select a variant (1-{len(variants)}): ")) - if 1 <= choice <= len(variants): - break - print(f" Please enter a number between 1 and {len(variants)}.") - except ValueError: - print(" Please enter a valid number.") - + choice = prompt_choice(f"Select a variant (1-{len(variants)}): ", len(variants)) selected_variant = variants[choice - 1] print(f"\nSelected variant: {selected_variant['name']}") + # --- BOM file selection --- + bom_dir = os.path.join(os.path.dirname(__file__), "BOM") + bom_files = sorted(f for f in os.listdir(bom_dir) if f.lower().endswith(".xlsx")) + + if not bom_files: + print(f"\nNo xlsx files found in {bom_dir}.") + return + + print(f"\nBOM files available:") + print("-" * 60) + for i, name in enumerate(bom_files, start=1): + print(f" [{i}] {name}") + + print() + choice = prompt_choice(f"Select a BOM file (1-{len(bom_files)}): ", len(bom_files)) + selected_bom = os.path.join(bom_dir, bom_files[choice - 1]) + print(f"\nSelected BOM: {bom_files[choice - 1]}") + + # --- Fetch schematic sheet list --- + print("\nFetching schematic sheets...") + data = graphql(token, NEXAR_API_URL, QUERY_SCHEMATICS, {"projectId": selected_project["id"]}) + all_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or [] + + variant_data = next((v for v in all_variants if v["name"] == selected_variant["name"]), None) + if not variant_data: + print("Could not find schematic data for the selected variant.") + return + + sheet_list = variant_data.get("schematics") or [] + if not sheet_list: + print("No schematic sheets found.") + return + + print(f"Found {len(sheet_list)} schematic sheet(s). Fetching components...") + + # Build a map of documentId -> {documentName, nodes[]} + sheet_map = {s["documentId"]: {"documentName": s["documentName"], "nodes": []} for s in sheet_list} + + # Paginate all components across all sheets + cursor = None + while True: + variables = {"projectId": selected_project["id"]} + if cursor: + variables["cursor"] = cursor + + data = graphql(token, NEXAR_API_URL, QUERY_SHEET_COMPONENTS, variables) + fetched_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or [] + fetched_variant = next((v for v in fetched_variants if v["name"] == selected_variant["name"]), None) + if not fetched_variant: + break + + has_next = False + next_cursor = None + for sheet in fetched_variant.get("schematics") or []: + doc_id = sheet["documentId"] + if doc_id not in sheet_map: + continue + design_items = sheet.get("designItems") or {} + sheet_map[doc_id]["nodes"].extend(design_items.get("nodes") or []) + page_info = design_items.get("pageInfo") or {} + if page_info.get("hasNextPage"): + has_next = True + next_cursor = page_info.get("endCursor") + + if not has_next: + break + cursor = next_cursor + + # Rebuild schematics list in original order for export + schematics = [ + {"documentName": sheet_map[s["documentId"]]["documentName"], + "documentId": s["documentId"], + "designItems": {"nodes": sheet_map[s["documentId"]]["nodes"]}} + for s in sheet_list + ] + + # --- Fetch fitted designators for selected variant --- + print("Fetching fitted components for variant...") + data = graphql(token, NEXAR_API_URL, QUERY_FITTED_DESIGNATORS, {"projectId": selected_project["id"]}) + bom_variants = data["desProjectById"]["design"]["workInProgress"].get("variants") or [] + bom_variant = next((v for v in bom_variants if v["name"] == selected_variant["name"]), None) + + fitted_designators = set() + if bom_variant: + for bom_item in (bom_variant.get("bom") or {}).get("bomItems") or []: + for instance in bom_item.get("bomItemInstances") or []: + if instance.get("isFitted"): + fitted_designators.add(instance["designator"]) + + # Filter each sheet's nodes to fitted components only + for s in schematics: + s["designItems"]["nodes"] = [ + n for n in s["designItems"]["nodes"] + if n.get("designator") in fitted_designators + ] + + total_components = sum(len(s["designItems"]["nodes"]) for s in schematics) + print(f"Total: {total_components} fitted component(s) across {len(schematics)} sheet(s).") + + # --- Load BOM cross-reference data --- + bom_data = load_bom_data(selected_bom) + if bom_data is None: + print("No 'bom' tab found in the selected BOM file — exporting without BOM data.") + else: + print(f"Loaded {len(bom_data)} entries from BOM.") + + # --- Export --- + filename = export_to_xlsx(selected_project["name"], selected_variant["name"], schematics, bom_data) + print(f"\nExported to: {filename}") + if __name__ == "__main__": - main() \ No newline at end of file + main()