Changes
This commit is contained in:
102
mipi_test.py
102
mipi_test.py
@@ -3,7 +3,7 @@
|
|||||||
MIPI TEST APPLICATION - MIPI_TEST.PY
|
MIPI TEST APPLICATION - MIPI_TEST.PY
|
||||||
- ENTRY POINT OF APPLICATION
|
- ENTRY POINT OF APPLICATION
|
||||||
|
|
||||||
VERSION: 0.2
|
VERSION: 0.3
|
||||||
AUTHOR: D. RICE 25/03/2026
|
AUTHOR: D. RICE 25/03/2026
|
||||||
© 2026 ARRIVE
|
© 2026 ARRIVE
|
||||||
"""
|
"""
|
||||||
@@ -12,12 +12,22 @@ import time
|
|||||||
import sys
|
import sys
|
||||||
import requests
|
import requests
|
||||||
import threading
|
import threading
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
# --- Configuration ---
|
# --- Configuration ---
|
||||||
URL = "http://192.168.45.8:5000/display"
|
URL = "http://192.168.45.8:5000/display"
|
||||||
SCOPE_IP = "192.168.45.4"
|
SCOPE_IP = "192.168.45.4"
|
||||||
PSU_IP = "192.168.45.3"
|
PSU_IP = "192.168.45.3"
|
||||||
|
|
||||||
|
# --- Capture settings ---
|
||||||
|
# Pass 1 — signal quality: resolves individual bits at 140 Mbit/s (7.1 ns/bit)
|
||||||
|
SIG_SCALE = 2e-9 # 2 ns/div → 20 ns window
|
||||||
|
SIG_POINTS = 500_000 # 500 k pts → ~25 GSa/s
|
||||||
|
|
||||||
|
# Pass 2 — protocol/frame structure: shows LP↔HS transitions and burst envelope
|
||||||
|
PROTO_SCALE = 1e-6 # 1 µs/div → 10 µs window
|
||||||
|
PROTO_POINTS = 500_000 # 500 k pts → 50 MSa/s (enough to see burst structure)
|
||||||
|
|
||||||
test_running = False # Global flag to control the background thread
|
test_running = False # Global flag to control the background thread
|
||||||
|
|
||||||
# --- Instrument Connection ---
|
# --- Instrument Connection ---
|
||||||
@@ -155,24 +165,87 @@ def setup_math_channels():
|
|||||||
print(f" COULD NOT READ ERROR QUEUE ({e})")
|
print(f" COULD NOT READ ERROR QUEUE ({e})")
|
||||||
|
|
||||||
|
|
||||||
def save_screenshot(iteration):
|
def _set_timebase(scale, points):
|
||||||
"""Save a PNG screenshot to C:\\TEMP on the scope's local disk."""
|
"""Apply timebase scale and record length, then let the scope settle."""
|
||||||
|
scope.write(f":TIMebase:SCALe {scale:.3E}")
|
||||||
|
scope.write(f":ACQuire:POINts {points}")
|
||||||
|
time.sleep(0.3)
|
||||||
|
|
||||||
|
|
||||||
|
def _arm_and_wait(timeout=20):
|
||||||
|
"""
|
||||||
|
Fire a single acquisition and block until the scope stops (trigger + capture done).
|
||||||
|
Returns True on success, False on timeout.
|
||||||
|
"""
|
||||||
|
scope.write(":SINGle")
|
||||||
|
deadline = time.time() + timeout
|
||||||
|
while time.time() < deadline:
|
||||||
try:
|
try:
|
||||||
filename = f"C:\\TEMP\\cap{iteration:04d}.png"
|
# OPERegister bit 3 (mask 0x08) = instrument is running/armed.
|
||||||
print(f"SAVING SCREENSHOT: {filename}")
|
# When it clears the acquisition is complete.
|
||||||
scope.write(f':DISK:SAVE:IMAGe "{filename}",PNG')
|
cond = int(scope.ask(":OPERegister:CONDition?").strip())
|
||||||
time.sleep(5.0) # wait for scope to write file — do not query during this
|
if not (cond & 0x08):
|
||||||
print("SCREENSHOT SAVED.")
|
return True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
time.sleep(0.25)
|
||||||
|
print(" WARNING: ACQUISITION TIMEOUT — SCOPE MAY NOT HAVE TRIGGERED.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _save_pass(tag, iteration, ts):
|
||||||
|
"""
|
||||||
|
Save F1 (CLK diff) and F2 (DAT diff) as CSV, plus a PNG screenshot.
|
||||||
|
Files land in C:\\TEMP on the scope's local disk.
|
||||||
|
Naming: <YYYYMMDD_HHMMSS>_<tag>_<iter>_clk.csv (date-first for easy sorting/deletion)
|
||||||
|
"""
|
||||||
|
base = f"C:\\TEMP\\{ts}_{tag}_{iteration:04d}"
|
||||||
|
try:
|
||||||
|
scope.write(f':DISK:SAVE:WAVeform FUNCtion1,"{base}_clk.csv",CSV')
|
||||||
|
time.sleep(2.5)
|
||||||
|
scope.write(f':DISK:SAVE:WAVeform FUNCtion2,"{base}_dat.csv",CSV')
|
||||||
|
time.sleep(2.5)
|
||||||
|
print(f" SAVED: {base}_clk.csv {base}_dat.csv")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"SCREENSHOT ERROR: {e}")
|
print(f" SAVE ERROR ({tag}): {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def dual_capture(iteration):
|
||||||
|
"""
|
||||||
|
Two-pass capture per test iteration:
|
||||||
|
Pass 1 — signal quality (SIG_SCALE / SIG_POINTS)
|
||||||
|
Pass 2 — frame structure (PROTO_SCALE / PROTO_POINTS)
|
||||||
|
Restores the original 5 ns/div timebase when done.
|
||||||
|
"""
|
||||||
|
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
print(f"DUAL CAPTURE #{iteration:04d} [{ts}]")
|
||||||
|
|
||||||
|
# ── Pass 1: signal quality ─────────────────────────────────────────────
|
||||||
|
print(" PASS 1: SIGNAL QUALITY...")
|
||||||
|
_set_timebase(SIG_SCALE, SIG_POINTS)
|
||||||
|
if _arm_and_wait():
|
||||||
|
_save_pass("sig", iteration, ts)
|
||||||
|
else:
|
||||||
|
print(" SKIPPING PASS 1 SAVE.")
|
||||||
|
|
||||||
|
# ── Pass 2: frame/protocol structure ──────────────────────────────────
|
||||||
|
print(" PASS 2: FRAME STRUCTURE...")
|
||||||
|
_set_timebase(PROTO_SCALE, PROTO_POINTS)
|
||||||
|
if _arm_and_wait():
|
||||||
|
_save_pass("proto", iteration, ts)
|
||||||
|
else:
|
||||||
|
print(" SKIPPING PASS 2 SAVE.")
|
||||||
|
|
||||||
|
# ── Restore original timebase ─────────────────────────────────────────
|
||||||
|
_set_timebase(5e-9, 500_000)
|
||||||
|
scope.write(":RUN")
|
||||||
|
|
||||||
|
|
||||||
def test_worker(on_time):
|
def test_worker(on_time):
|
||||||
"""
|
"""
|
||||||
Background loop:
|
Background loop:
|
||||||
- Arms scope in single trigger mode
|
- Turns display ON, waits on_time seconds for it to stabilise
|
||||||
- Turns display ON for on_time seconds
|
- Runs dual_capture (signal quality + frame structure)
|
||||||
- Saves screenshot
|
|
||||||
- Turns display OFF for 1 second
|
- Turns display OFF for 1 second
|
||||||
- Repeats until test_running = False
|
- Repeats until test_running = False
|
||||||
"""
|
"""
|
||||||
@@ -180,10 +253,9 @@ def test_worker(on_time):
|
|||||||
count = 1
|
count = 1
|
||||||
|
|
||||||
while test_running:
|
while test_running:
|
||||||
scope.write(":SINGle")
|
|
||||||
requests.put(URL, json={"state": "on"}, timeout=2)
|
requests.put(URL, json={"state": "on"}, timeout=2)
|
||||||
time.sleep(on_time + 0.5) # +0.5s to ensure acquisition completes
|
time.sleep(on_time) # let display stabilise before arming
|
||||||
save_screenshot(count)
|
dual_capture(count)
|
||||||
count += 1
|
count += 1
|
||||||
requests.put(URL, json={"state": "off"}, timeout=2)
|
requests.put(URL, json={"state": "off"}, timeout=2)
|
||||||
time.sleep(1.0)
|
time.sleep(1.0)
|
||||||
|
|||||||
Reference in New Issue
Block a user