From dd93fbd89306033c3517398a9b5b208e244708d4 Mon Sep 17 00:00:00 2001 From: david rice Date: Thu, 7 May 2026 09:01:32 +0100 Subject: [PATCH] Updates --- __pycache__/proto_decoder.cpython-312.pyc | Bin 34517 -> 37890 bytes flicker_watch.py | 369 ++++++++++++++++++++++ mipi_test_interactive.py | 54 ++-- proto_decoder.py | 172 +++++++--- 4 files changed, 531 insertions(+), 64 deletions(-) create mode 100644 flicker_watch.py diff --git a/__pycache__/proto_decoder.cpython-312.pyc b/__pycache__/proto_decoder.cpython-312.pyc index c402fbaa98f6fe30b9d6c3e2bc3cb1f5308373b4..70d1b59e7f3cbe7f6ebe0ec13055346b9cdd8ef0 100644 GIT binary patch delta 10861 zcmc&)d3amZm48qBX4$eV$(FS|@0PdNah%mj?8MGO96OFpASS6$e3jUVEjw3owmuiv?R?t1Py=Wf4ypI`f1*=J^C#uu})bR2|-YpxBx_1QDVeEtV#DlS9@IFj{czRyaG zeKwNamrru~3gEw;nEDDyZl8mg{mQ*El6PFu=Oh-ti&!D8AT~(5Gt&8x7RVw}aGdWe zBK5=$zr|XPG>}4wl@P~quCJ6h0n3OBu$(Mrts@|gxcwD-RYW+h?5j* z`65yaI@S9dNZo--j-whG=dXVPy#;~ij>?Y8JWaD#`O~r`YtV^Qg8)LfD_R5-LILJs z=@kz`1HuZ-(@LVk&c7plLGexACf%m&RJtI0>+IvoKAB3u8by*$v)18+Xe?*%V1Rf> z=}2hAyLWKNAEb*Rd%NV3JdxtBE|UA&Rh;D8o9U3hP7qWDBAWWu@X8X;H4t zXn?7rCAuN?uqK?vOMxu6bn>9yW|VPamdJ~SCuH#k;cPK`zeRfYpt)85q~QrZo+}zf z1IaoDzEA#!a9CInvy-)sLAsD5#;}QGhjT-PQg^N;V&=GT4)lQG9=)>@Xfh*t0cI5un*tzzDRT`&pwQ!X(_ zw4!IA41|X*V2c9#I#mr8S;i|v&f;{Z8ER&w)g)$6(;C6{qAKp^q;5UvPg{oGmK*do zGQHD!f_FhD1NhXI@Tm@7@>GK>?9U&c5UuyIkP#|37SRkY*CPzCtF zks1ppjD_Rwsti3+?pMIj<)b32of(dDh|a&n6@;y_0#`-{8Pkv`QGS*?gv*$P{rQqF z&kTD)cW&PD(>eXJeO$k6cUj!J3?W*WU9L=jiaGIlX6Im)K9^Tgos(P^pG%@o_#Zs|2G4rk9 z;gVAYDC?91{9A&n7o?Xn>FJeLlCf79Bgw9k69l>}VrC~NvA~AUES8VUpo8)ox^^eJ zwu|NcGM;0LgLQYRM8O?3mT)0!(x_67-o|s89%m)e-aO-<=XS%NaJ#T1phN}wW#OE| z43lPsjXSwX`Kj!~+!6Us?l7OOmFOX zFRpxEOCqwO~w-Efj0L1OFimw(R?K-d8uib?ni#iOA>HdUEJ0 zD55y7v>9PF!a9UC2x|diw!ql%=+U~N!98`s5FI3bFJS+HAl;6o%42@oG7^l*gCP>r zwjb&DkA?H^8AjeOAA-|_0rVM$BFBDTA!XMf$eOTa(Y{C|N zXUy2Kt)ao&yJ>rS&n9nAdraT4&D+s+n|B9GZ0hm$^tQzlS&5RZn>)RoJuw3$x3+HI z7*FZJnckppcogXXQ&mAxM+M{uWBg#uFfD!?1kb6XwK)m3^SlZ^}n;~LPrp~eLK4-P1#-`f{O zt{FK^-a>(6!njc2oX{^6xDXr@S(r4uZ7rB8KHEQCGNXK?I_j#6S{F^|ugX-~UHnfB z>o1!vUs!%-`Bba4YNG9RbNTex%z>zRF%yBF1PUzm(aA1My?E2}fi^@wvQ`E6EYFRd+Td);Eug2_&h#b2Wr~9I&`fHq8o1gBv z`jzamnbMhq7b-4pd8KOYLXLUz;2F)Ne8Fs;s*0LR7fMQ}gO4nm>3(F@WXJ1y1#eny zXXID2LB`dbxN4WpAo|h%nW86qF6PZCpK?TN*F?+LUb3xy*7R)mbC#%iV`Sd2@hT{3 zpU`D0B9FVkJlT5k@Kj+myL7?nn$)4`YAAvR2J5`u`De%n@!*48%VhUi`BcYD(NQY>!Xd+2WN_A4qR9iEm=9&Fn93jrg>-UkDTq7ob9hUH(qk|{q|Ky zU(|fZ{cTs}DCfg#+hkmRCm)2PjeD-VRll`}d!eGLOQCtGsH}^ZzpSe2Y|y=2pVg(| z3B{V=CY1D6AxpneN7hpF(Iy*dU!Ks#k5rXs7gzqJey*Offz2_m99Hwr4OD zq@RPF0)Do+xS9t_KTu+j@lI*1te)Q@{e4;8?mldbyCLjuW5e_W#5`(x8Z%VX9~>JB z(fydb$u zq35NdN}HW2X=L|~;L$)oYe64_@?L3sWix1bx^kCHpm>2`_a>T+G3=4%ARvdQMoC|N z08BYt?NZJ`#+}j^t55Q7skEk8_Pl0xQ_Xc*gt)`zkPQl1Ace`(4i4`jIK1;hNbXqx z&YVYb?J1rfwxrvI@D>-ZWl2gP0y}0C_Y9B+E(CrJTpL9a0WlN-QW6h%c?5!M#tP6B zGSk(SGP(kdm;O z=S~eNxKjZjHRj;TkPU2%hZ`)f`9#ap3@c@CIA6>k=TEuC{I~+3OqrOUp+dfNaZzKW zG?X%#ls%*%<>%x{kMUq$I49r{Wwb)fj^9v-2Hg-Y5Y1FZDgr#I4BOMxhj_%Chd5Ft z!n*^=suoR!Ok7r97Womj(fT#)soXXeE7L8TPeV4bACw$ zUm<;e$!|;>;*U`m&>DQQ>Xn{pa`8_}KWSQFifLgZ9|T_N2YR;0NBxp@=~C5oS&cxy zBK0h-s{JZ7_%)JvLHOB;2Lxfq`qlJnki4!Igtp$*^+)JMOvweIl0GFpzVw08Cn4QQ zrvNX==!5V>rvZX64?zYyBIsG^&SmBNHR--(kCw8E@_0pdG_cxu(S%bav-$_KhnBD8 z`Q_46D{6T`dSk_%UK@M*iYZ6wU?3Dz1bo1`@#XA~GkxTd>?EY@y!#R+9|edRcC>bE zZu7SFZ1%SGvNMRzNs}vGaA3x!Vl@NjP;@F8w{|+;ikc{c}_=C`Tz!&t#v@DJ%A$<+IP{y-mbWi-~)du`W zLhO)?$%2D1Rmh8VV~T@gzM+^7wD2X)-18`Y8bO6!s<9(&K)Qcbn^ptyARIzTzMUn+ z58(t;==Y6=#;Bi`LA4WD^-IcGXY)gR1lX8$eDi|IGQMfSlskUQWmDc{uT+3pkpFg) z1RIq03FYN1`_z(Y<+N(5Ihs{DAp?r|x!}}d>F~7a%%`KKl53n!Ye&Xvnk+gwhJ5*K zb2PhR!Qq-ZINk8z*n^v=eGhg_Y8UdH(4uqtVALF`oEe-ey;Qg6JI$}ub}iWQr&c@= zUMMS{(k_%$P3acODlk?)rCBKROlg7EzH5O#t|~#FcO9I;a@k^^YLIS6t}INnUpBd> zLeuTj8>Ymlsb*r`Ws`ea28vCr1D#IZHC;S?^viV%PB*CFES}Us%z#n%84VI;K@zrY z(!O3?eP-1~>qMJ0u+5P(Fh4jvKM=SLeYkyHy(!KjZB zq-UNql}Xd#J>Ehr@QP~*=fYc#`DW6?dBg+UF2!(2S+aG46IwIK1H6zD3y`>E6?o80 zHOvrhOO_MIO-NxrJ}t82!hx7RvQnH7%AqBcrE38tDj-#i6GpTtQdmH$87WMi^9fRz z53jo5GpYkdmIq|eEZTq=0y`sSeAi8pLJN?>lmRzbj-2pgmd6c`upzaDD)1MBn6fu) z7wwD`+T#i^QfSXm!4A*mb-+eVtcQ2l)Pp5Zv|N}=i-8E@lx`HwkXj--0Gn>$kRUBS2*d&u z(wXKg9U^?*B4v!RxlX%3!XaH@x9CoBNO#zkvP}TH8b#p-4!M+ZNT6Y4Ia3|&TLGz+ zm;#knLA*ItvKmrr5+ye(>m&|T6FB6b)F@mOM*TjN%UW;Xkj1b*@dbMYS+{=?O|k+_2T&`lJC zm17BeL{8L;Ss9yKYWK~^GbP7G%oPQ2pNHI@K~iB6q(~}@%RD=0vp0A0k$*<}zJc&B z2*_ILGXTfTf^bL2HX#V_!$baBcP7<7#=7W5Or7Zl(LL3_c1n57H0{#c9U6nq&H9zuejQ%geZxB8} z_$|V9gx?{2i12?1%MpH$@CO7An5d_a;wNdUg*eqA{25ID4s5XoA+i<`-@szI@O0kO z?p@!pCHe5a<0H2Lsz>>cVT(Bg117T&j0o8Xu*1PC6vRALAGsP(_#Dj9T!eP4v;iTq z8}U;Jn-K0pfQK56<{{wKAZCE)ZuqXh$Io8Nv+-IHpLAZFbRW4nP&m*zYDIVrWx@AS zj^-n# z1}jvveCKXHV&w8{srO;aoaHHJw03Q@yycRudRUVv>f{=H^3<#DQgPg>shq7An# zd@iV~s}m|g#PA69d*}+ueuo2Y^ULlS_pmjMS5C?n>^hvB?0%kJc!ij^UV8HmYf~#m zR{_Kn!+u{N{^3pqAEbtTM`Ci{p5O&DtwmvV2t`<1!AsS=tSrO@R;H|DK76%@;b`dhV{C6*dk~vrqTa&J7%(twFhIp2)6mVl~TP6EO`GDEC3;JF|-hw#n6gtfb~=PaOEQjNnlaGX%WDgTY~6sDA*y7yJ$d zvxDJrjG-l02!Kbyw%nMhr?<6t^Lp>L&7E7k9j%@1_4Hn>h?B;nVs9OE0Q!I#J>c&@ zK=(^;1WIz8SOs@$2KjO+cjSls!?Qmdc}pQv^3r!htNC6@KDM)tjU_upD_9eRYy_l@ z)QG^wLyxg6>2Jo0UF?Wv^CES;?SoQ|8{g4FezYSMdSWi^bTq>jL2MuB}R z5tt8F!P6Tw_Gsuozz;nkwH;c7Ulro2!zUN@j3W)$crft1V#=A zV(L-f(V-C^p~YBS3J_BPQSl891$9tONCg<8`9p=OFw|HqpMCexDf~Qi*OB*?d3R&W z&tOaSFtj}6qx2r>#9jONd!%38wVXdMRYbO{8nIGPibULM8e=+1k4DB-Y*u)rs`0Ab zc&yNBR80{K`G;f5;ONj`h)utkIm1I=V9MqfSv<6`)5se*U9)?spXz>)8d%g z>m?)oUN1d~eN;<_?{VehUK7)J*&o7ry{t!nAVg0|U%AJwPHaii%lGUO4q=V$oNw3-9d-3z6)zHM=iz}qc>d`yBh(y3j2DNK}3#XGv*TJ z9Ne;)*XU`8#S~<)KNO$k%wuS7tQHKxhfy$b3MIM`a5-GpGy|;|8}hHAPeDca#|8ge zx^nz})io8D)4@yhUcdSl8J}~ND5Rg>Te!MQ#;c~*U*+(3W=Z^auJk>Yes@T*1B-)K ZIsBa=@!z@L_gMPfW75(S4=J$k{|Bw}gYdSlTJy*ZF$Rm1HXm z1Ap}E`{JL@xy!ld-h1x(-S=|fr1XpXrTka(^2`D};TJDNjwy%pUE~_JQn^<;xh;?@{DGcj|Bw{K1HJy}X>!VRPr`qYIRhxEh`rA?$h+(@KpgCOuyEolJ9n@M8@ z__QS4LYhYF1c4eQA-v=?t`a<=uwU9Q`P-Af6CaS+^IE4k%>GgPf@opi)b;C}&|*vS zyl%IocY{bhtk1ZvqzFXSG8BoDz$BfJCjvu}v2ctwK=F3=xbdf=D>-bsP0T65E~N-% z>^*a(=w-P%l}mkCP>xUo5Yq$r_(QqfJZ^VvGHtX8s&8h4IcT&x_q0Z|v)2or$aWz# zHW3;Pgvl_t&lK8!SrHX8u7qsKv>BCPCVIt4^wVH5Zk%-}M#V6?2THYy zIc^e#s1_~HmTLvYbZ32*5Be-04Cup#9NCt!tC*CWX;2hBScaHB4ddW5Anex>V>~y@ z&Q<0Dy-d^4Bc6AaS>?gJ($-*J#o6;xa<7<=^>gM^`uX$GKmfAWZLW?lHJ_}RWUmMd zI9%@htjgT1<;uh55)0V7d1b7hn%8IV)C1;_G%SRqz169iWto5-QS!4_Q!&Hp!m4@% zcIH+)dx02P)M=_WXZAgAA^CADDTo(lS;=+;rgzeVFo!t6jVA^HV;z)_i zP+7YQ2MnUsl8RQv%Hz?dIA>5VtpLL*&)T~v23bEfE|&Fmr7Pcjf$A2EBVDuw zwDVa-_O9Wt+Mn9|*7W1+(!zu5{CTt;no&E#GKA#_YY`17Z z@ko?*V5uf16V()sPK}3YP!9WDs!k4&psea{!ymd98<}p0c}?6Nh{n|1SR^_;77m1? zBur>G){6bA5$pxVCRnJ+7MU3^)=C;M&S`2x`9uf3ZhlIcJ1V%Ge1@ zIeXG;OkC6pM*BHi(cG<%R30TKu77Cktf%3ut?`g?KEL3QX5QvJWIShuYUcV#FqmA2 zy5|e*bJyOx?+s7W&kLI7Ew;H^4;v3_-n11Rtvj-A-s_v|{iWSKU*tTjxljNdE))v6 z_A@!&tBuif&Iu#$J5hBwaZ)_leM)!M)_f+ndA`7w{89NY#KQmns8ewDi#)%-*jeAZ z!kBbdZPyg$!c_bs?Sda3A9M+uuC2HUGdJH@SjGvMbtme4XZe;d?sj^MC#Ir=uE0ju z0I2%lD**NhXJcLlYz%7>vjhJ_Td^;C0wFXtPDerV8>oy0dK!*Rjmh*zRK$QP4yu~l zf@5?B^cb+PzmxL*dYx(Dj5$rFvQh~joO+U(V%6?%}(H9I}r$$ZX) zvHj5y??GpwzJmRvxdXiXee)j4cixIU@V;{%&mF}P&#yuWlf!vGn{FM2sl3}-rh5zu znwhEXUeU!^TZQy}L-N_S&m{Jp<;4TGtgMx<*hryA$Rw1wT?TqcJ&Jvy6f)L~IO~PH z#d8_tt!l*)FH~%Ct74BAC=OP-@?(h4C>+#PpiOvXLn(o)4kTeIA#Wvx3Li8{qY{M=ET zOg(4~sDVq3AU7@4G=sJzT~mGBl(8%dij&l!n_uKnyf}{jNpm8tka3IoLzLo-sd$M} zBKy+i!Zc(}xgw*HVx)ae{iRt`Xi-YQ(N?7tu+w9fI9VNqxY8EtzT06fVmS;_0EB0 z-^95Y*~QNKrauAkQ_Lzp-`jT|@%eUb=%h!{gu&j51@BYAQl z!`yg_$l)q56DagOY=y)@zmD)ewl@!kV{%|}Bp3^;CN5*bhxzdmi4Lk#ETZb=053L^ zphpg-3FIIe!KI`Bj_`rNuG!FM`Viz;6#(n@R{K&!l4iS5a8jP4VG8dg!cO>$y{}Ea zx8ax=HVanAOy9i4KGQpIEu7hM&T60A$=ox2i?;_|$sygjJkQbg6S@=nqaA1S>JCZg z@`{hHKbz-0B+cjK-MjYa;E7eoqVw+JLw)n^vO`8HfEU>r4Fnp$P z}uu2i@6bMeQfKp@zx+{s9a2UqzaN(^(91Wx`Jp;7}A#zZ4D>dS|D00^snPY>%W3%>p9WF6o6#*`yB0_m7u}yC+0t$x+0R`Wwr;f(=yrXYZt>0V zve7x+azf=1*8`-wF};b# z5dI7SFId!v@Fc=_5S~JK8sWPDJK=R{0Dk-FpJUPY5RwRgf$$8%Ujq0gI*xJ_;aP;| z5a7xy(7#F`ejnin2tPzX)}}u~cpl*egdZcM?gA+N1mUL$cqyQNjeu7H`ZI)w5MD%h z7~yXa{ubdSgqHzAz%u+{?i=#h(`)NW>8qggs{;KykYbnK5nkEl_q0M0{d;Wn4+yUz zoI!Y<<=wEO^el>agP?B!#4fuvfB|W_#7w)&6aS2D&LR8?;a?DbjqoPITL^C>{3`-~ ze|iU{e?xc|!0)1XZ0LU=TtN7e_YAdg+m67%%8h}Z?w#om0fAkq!i|9q+qN#0ea?mC zsT*9j?Ks}22>%JNz~`07gQ_*n=kSp>9qlJVNw+}!M|0~6yDH0-Bs%rjB~x5^bGGk(6icp>875mH|q z_&xC=TKNdUj?ls094T718pU=1RXZLIMpNIL^g%j29^9vDf`hSBHflj*Rs;{W{;j|o zB30szY)52I`m=RtVtg`4!Xe4cieG7MCtVbtTp=M`;CD0b({N zW7m3Y5-f>z%B|Z1h4&W4CFb0Gxz9!Hc*{{D^ zX~FZDI;VJ8pOaXj@|t*8@@?fUt@OUYUc9qY+`=>mZffQm#S^9vyD)?!2!|0KM8NGh zZ$fahuOFx=!^lpJyO70(p$I+#?&_j%u~!b1PDjzx5M|#$>9F6 zi6EhRY-7X~(UY(ojEu$1&9vkzg+M$dzyf;HSSB{)5t&DyYDgb3RT`V57$m9&at%Lclt(5%@#q@;7%ceBsvb^cEKF4` z9OfAOKiBu}U)qFAp&EkGiSgjr{wrq3M;;Ht_vHg%$DAzu<7P?BW!-Z%)>hRJ2#|?T zAV8xyMitvPSLVVzq8bDIKehq^J|etf(p%Y6a~?xF-LTi@_9Sk`K6?=MBJlUr5$H_w zQO!fxhUyNKI4SXEX}~g0S-mI?A?!!E2VoZBF@!(DZhQgwQeg7oYeZ#`RQUE03CXFL z<_mK_U*rZ`O zthn None: + """One-shot scope init — channels, math, default trigger.""" + print("CONFIGURING SCOPE...") + cmds = [ + "*RST", ":RUN", ":STOP", + ":CHANnel1:DISPlay ON", ":CHANnel1:INPut DC50", ":CHANnel1:PROBe 19.2", + ":CHANnel1:LABel 'CLK+'", + ":CHANnel2:DISPlay ON", ":CHANnel2:INPut DC50", ":CHANnel2:PROBe 19.2", + ":CHANnel2:LABel 'CLK-'", + ":CHANnel3:DISPlay ON", ":CHANnel3:INPut DC50", ":CHANnel3:PROBe 19.2", + ":CHANnel3:LABel 'DAT0+'", + ":CHANnel4:DISPlay ON", ":CHANnel4:INPut DC50", ":CHANnel4:PROBe 19.2", + ":CHANnel4:LABel 'DAT0-'", + ":TIMebase:REFerence CENTer", + ":TRIGger:MODE EDGE", + ":ACQuire:MODE RTIMe", ":ACQuire:INTerpolate ON", + ":DISPlay:LAYout STACKED", + ] + for c in cmds: + scope.write(c) + time.sleep(0.05) + print("SCOPE READY.") + + +def configure_for_lp() -> None: + """LP-mode: widen vertical range, falling-edge trigger on Ch3.""" + for ch in (1, 2, 3, 4): + scope.write(f":CHANnel{ch}:SCALe {LP_V_SCALE:.3f}") + scope.write(f":CHANnel{ch}:OFFSet {LP_V_OFFSET:.3f}") + scope.write(":TRIGger:EDGE:SOURce CHANnel3") + scope.write(":TRIGger:EDGE:SLOPe NEGative") + scope.write(f":TRIGger:EDGE:LEVel {LP_TRIG_LEVEL:.3f}") + scope.write(":TRIGger:SWEep NORMal") + scope.write(f":TIMebase:SCALe {LP_SCALE:.3E}") + scope.write(f":ACQuire:POINts {LP_POINTS}") + scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}") + time.sleep(0.3) + + +def arm_and_wait(timeout_s: float) -> bool: + """:DIGitize + *OPC?. Returns True if trigger fired within timeout.""" + global scope + prev = scope.timeout + try: + scope.timeout = timeout_s + 2 + scope.write(":DIGitize") + return scope.ask("*OPC?").strip() == "1" + except Exception: + # Trigger timed out or scope locked up — reconnect. + try: + scope.close() + except Exception: + pass + time.sleep(1.0) + scope = vxi11.Instrument(SCOPE_IP) + scope.timeout = 30 + try: + scope.write(":STOP") + except Exception: + pass + return False + finally: + try: + scope.timeout = prev + except Exception: + pass + + +def save_lp(base_name: str) -> None: + """Save Ch1 (CLK+) and Ch3 (DAT0+) as CSV to scope's C:\\TEMP\\.""" + base = f"C:\\TEMP\\{base_name}" + scope.write(f':DISK:SAVE:WAVeform CHANnel1,"{base}_clk.csv",CSV') + time.sleep(2.5) + scope.write(f':DISK:SAVE:WAVeform CHANnel3,"{base}_dat.csv",CSV') + time.sleep(2.5) + + +# --------------------------------------------------------------------------- +# Non-blocking keyboard +# --------------------------------------------------------------------------- +class KeyReader: + def __enter__(self): + self.fd = sys.stdin.fileno() + self.old = termios.tcgetattr(self.fd) + tty.setcbreak(self.fd) + return self + + def get_key(self) -> str | None: + if select.select([sys.stdin], [], [], 0)[0]: + return sys.stdin.read(1).lower() + return None + + def __exit__(self, *_): + termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old) + + +# --------------------------------------------------------------------------- +# Video control +# --------------------------------------------------------------------------- +def video_start() -> None: + try: + requests.put(VIDEO_URL, + json={"action": "start", "mode": "static-pink"}, + timeout=3) + except requests.exceptions.RequestException as e: + print(f" VIDEO START failed: {e}") + + +def video_stop() -> None: + try: + requests.put(VIDEO_URL, json={"action": "stop"}, timeout=3) + except requests.exceptions.RequestException as e: + print(f" VIDEO STOP failed: {e}") + + +# --------------------------------------------------------------------------- +# Register snapshot from device (DSIM PHY + SN65DSI83) +# --------------------------------------------------------------------------- +def fetch_registers_snapshot(target_dir: Path, event_ts: str) -> None: + """GET /registers + /sn65_registers, print key indicators, save JSON.""" + combined: dict = {} + for endpoint, key in [("/registers", "dsim"), + ("/sn65_registers", "sn65")]: + try: + r = requests.get(f"{DEVICE_BASE}{endpoint}", timeout=5) + r.raise_for_status() + combined[key] = r.json() + except Exception as e: + print(f" REGISTERS: {endpoint} failed — {e}") + combined[key] = None + + # Quick-look indicators + sn65 = combined.get("sn65") or {} + regs = sn65.get("registers", {}) if isinstance(sn65, dict) else {} + csr_0a = regs.get("csr_0a", {}) or {} + csr_e5 = regs.get("csr_e5", {}) or {} + + if csr_0a: + pll_str = "LOCKED" if csr_0a.get("pll_lock") else "*** UNLOCKED ***" + clk_str = "detected" if csr_0a.get("clk_det") else "NOT detected" + print(f" SN65: PLL {pll_str} CLK {clk_str} (CSR 0x0A = {csr_0a.get('value')})") + + if csr_e5: + flags = [ + ("pll_unlock", "PLL_UNLOCK"), + ("cha_sot_bit_err", "SOT_BIT_ERR"), + ("cha_llp_err", "LLP_ERR"), + ("cha_ecc_err", "ECC_ERR"), + ("cha_lp_err", "LP_ERR"), + ("cha_crc_err", "CRC_ERR"), + ] + active = [label for k, label in flags if csr_e5.get(k)] + if active: + print(f" SN65: *** ERROR FLAGS: {', '.join(active)} " + f"(CSR 0xE5 = {csr_e5.get('value')}) ***") + else: + print(f" SN65: no error flags (CSR 0xE5 = {csr_e5.get('value')})") + + out = target_dir / f"{event_ts}_registers.json" + try: + out.write_text(json.dumps(combined, indent=2)) + print(f" registers → {out.relative_to(DATA_DIR.parent)}") + except Exception as e: + print(f" REGISTERS save failed: {e}") + + +# --------------------------------------------------------------------------- +# Event handling: archive recent captures and (for flicker) analyse +# --------------------------------------------------------------------------- +def archive_and_analyse(event: str, since_iso: str) -> None: + """ + Pull every CSV from the scope, move into data/{event}/{event_ts}/. + For flicker events, run csv_preprocessor on each LP capture and print a + summary table. Always pulls a register snapshot from the device too. + """ + event_ts = datetime.now().strftime("%Y%m%d_%H%M%S") + target = (FLICKER_DIR if event == "flicker" else GOOD_DIR) / event_ts + target.mkdir(parents=True, exist_ok=True) + + print(f"\n *** {event.upper()} EVENT @ {event_ts} ***") + + # Register snapshot first (fast, before scope transfer which takes longer) + fetch_registers_snapshot(target, event_ts) + + print(f" Transferring scope → {target} ...") + try: + copied, failed = ai_mgmt.transfer_csv_files() + except Exception as e: + print(f" TRANSFER ERROR: {e}") + return + print(f" {copied} file(s) transferred ({failed} failed)") + + # Move just-arrived CSVs out of data/ (flat) into the event folder. + moved = 0 + for csv in DATA_DIR.glob("*.csv"): + if csv.is_file(): + shutil.move(str(csv), target / csv.name) + moved += 1 + print(f" {moved} file(s) archived to {target.relative_to(DATA_DIR.parent)}") + + if event != "flicker": + return + + # Analyse the LP captures we just archived. + print("\n LP analysis (csv_preprocessor):") + print(" " + "-" * 78) + print(f" {'file':<46} {'lp_low_ns':>10} {'hs_amp_mV':>10} {'flicker?':>9}") + print(" " + "-" * 78) + + lp_files = sorted(target.glob("*_lp_*_dat.csv")) + for f in lp_files: + try: + m = analyze_lp_file(f) + lp_low = getattr(m, "lp_low_duration_ns", None) + hs_amp = getattr(m, "hs_amp_mV", None) + sus = getattr(m, "flicker_suspect", False) + print(f" {f.name:<46} " + f"{(f'{lp_low:.1f}' if lp_low is not None else '?'):>10} " + f"{(f'{hs_amp:.1f}' if hs_amp is not None else '?'):>10} " + f"{('YES' if sus else 'no'):>9}") + except Exception as e: + print(f" {f.name:<46} ERROR: {e}") + print(" " + "-" * 78) + + +# --------------------------------------------------------------------------- +# Main loop +# --------------------------------------------------------------------------- +def main() -> None: + DATA_DIR.mkdir(exist_ok=True) + FLICKER_DIR.mkdir(exist_ok=True) + GOOD_DIR.mkdir(exist_ok=True) + + setup_scope() + configure_for_lp() + + print("\n" + "=" * 64) + print(" FLICKER WATCH — keys: f=flicker g=good q=quit") + print("=" * 64 + "\n") + + cycle = 0 + try: + with KeyReader() as keys: + while True: + cycle += 1 + cycle_ts = datetime.now().strftime("%Y%m%d_%H%M%S") + cycle_caps = [] + cycle_end = time.time() + CYCLE_S + + video_start() + print(f"\n[cycle {cycle:03d} {cycle_ts}] video ON " + f"({CYCLE_S:.0f}s window)", flush=True) + + event = None + last_tick = 0.0 + while time.time() < cycle_end: + seq = len(cycle_caps) + 1 + base = f"{cycle_ts}_lp_c{cycle:03d}_{seq:02d}" + remaining = lambda: max(0, cycle_end - time.time()) + + if arm_and_wait(TRIG_TIMEOUT_S): + try: + save_lp(base) + cycle_caps.append(base) + print(f" + cap {seq:02d} [{remaining():4.1f}s left]", + flush=True) + except Exception as e: + print(f" save error: {e}", flush=True) + else: + # Trigger timed out — print a heartbeat at most every 2s + if time.time() - last_tick > 2.0: + print(f" ... waiting for trigger " + f"[{remaining():4.1f}s left]", flush=True) + last_tick = time.time() + + key = keys.get_key() + if key in ("f", "g", "q"): + event = key + break + + video_stop() + if event is None: + print(f"[cycle {cycle:03d}] ended " + f"({len(cycle_caps)} cap(s), no event)", + flush=True) + + if event == "f": + archive_and_analyse("flicker", cycle_ts) + elif event == "g": + archive_and_analyse("good", cycle_ts) + elif event == "q": + print("\nQUIT requested.") + break + + # Brief pause before next cycle so video stop settles. + time.sleep(0.5) + + except KeyboardInterrupt: + print("\nInterrupted (Ctrl+C).") + finally: + try: + video_stop() + except Exception: + pass + + +if __name__ == "__main__": + main() diff --git a/mipi_test_interactive.py b/mipi_test_interactive.py index 59a4f3f..2a44acc 100644 --- a/mipi_test_interactive.py +++ b/mipi_test_interactive.py @@ -1535,37 +1535,30 @@ def run_interactive_test() -> None: def run_continuous_test() -> None: """ - Continuous LP capture loop — no kiosk restart between iterations. + Continuous LP capture loop — pipeline restart per iteration. - Designed for periodic flicker that repeats roughly every second once the - display pipeline has started. The kiosk is started once; the scope - re-arms on the NORMAL LP trigger (VBLANK LP-11 → LP-01 falling edge on - Ch3) after each capture, effectively sampling one random display frame - every ~7 s. + The pipeline (kiosk) is stopped and restarted on every iteration so the + scope captures the startup LP-11→LP-01 transition that triggers the flicker. + The scope is configured and armed BEFORE _start_video() is called so that + the first HS burst after pipeline load is always captured. - With flicker on ~1/60 frames the expected time to first catch is - ~60 × 7 s ≈ 7 minutes of unattended running. + Sequence per iteration: + 1. _stop_video() — tear down pipeline + 2. _configure_for_lp() — set scope channels + trigger (takes ~400 ms) + 3. _start_video() — reload pipeline (LP transition fires ~1-2 s later) + 4. _arm_and_wait() — scope captures first LP-11→LP-01 on Ch3 + 5. Transfer + LP analysis + 6. If suspect: LP bit decode + byte comparison vs last clean capture - When the LP rule-based detector flags a suspect: - • The LP file already on disk (10 GSa/s, 100 ps/sample) is decoded - directly using single-ended CLK+/DAT0+ thresholds — no extra capture. - • proto_decoder checks the HS-SYNC byte position (misalignment) and the - Lane 0 pixel content (corruption). - • compare_lp_captures() shows byte-level diffs vs the last clean capture. - - Press Ctrl+C to stop. No report is written (raw LP/proto CSVs are kept). + Press Ctrl+C to stop. No HTML report is written; raw LP CSVs are kept in data/. """ import proto_decoder as _pd print("\n===== CONTINUOUS CAPTURE MODE =====") - print("Kiosk starts once. Scope re-arms on each VBLANK trigger (no restart).") - print("LP-only per iteration; LP bit decode fires directly on LP suspect files.") + print("Pipeline restart per iteration — captures startup LP transition.") + print("LP bit decode fires automatically on flicker suspects.") print("Press Ctrl+C to stop.\n") - _start_video() - print("Waiting 5 s for display pipeline to stabilise...") - time.sleep(5.0) - iteration = 1 clean_count = 0 flicker_count = 0 @@ -1575,11 +1568,20 @@ def run_continuous_test() -> None: while True: ts = datetime.now().strftime("%Y%m%d_%H%M%S") - # ── LP capture ────────────────────────────────────────────────── + # ── Stop pipeline, configure scope, then restart pipeline ───────── + _stop_video() + time.sleep(0.3) + + # Configure scope while pipeline is down — scope will be ready before + # the first LP edge fires after _start_video(). _configure_for_lp() _set_timebase(LP_SCALE, LP_POINTS) scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}") - ok = _arm_and_wait(timeout=5) + + _start_video() + + # ── LP capture on startup transition ───────────────────────────── + ok = _arm_and_wait(timeout=10) scope.write(":TIMebase:POSition 0") _restore_hs_config() @@ -1598,7 +1600,7 @@ def run_continuous_test() -> None: iteration += 1 continue - # ── LP analysis (quiet) ────────────────────────────────────────── + # ── LP analysis ────────────────────────────────────────────────── lp_summaries, suspects = _analyze_lp_files(ts, iteration) if not suspects: @@ -1662,7 +1664,7 @@ def main_menu() -> None: print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)") print("4. PSU OUTPUT ON/OFF (CH1)") print("5. START INTERACTIVE FLICKER TEST (kiosk restart per iteration)") - print("6. START CONTINUOUS CAPTURE TEST (no restart; LP bit decode on flicker)") + print("6. START CONTINUOUS CAPTURE TEST (no restart; proto decode on flicker)") print("7. EXIT") choice = input("\nSELECT OPTION (1-7): ").strip() diff --git a/proto_decoder.py b/proto_decoder.py index 1f9a7ef..6168d7f 100644 --- a/proto_decoder.py +++ b/proto_decoder.py @@ -44,6 +44,9 @@ DSI_DT_RGB888 = 0x3E DSI_DT_HSYNC = 0x21 # short packet — H sync start DSI_DT_VSYNC = 0x01 # short packet — V sync start +# Known-valid DSI data types used in sync-byte validation (VC=0 + DT in this set) +VALID_DSI_DT = {0x01, 0x11, 0x21, 0x31, 0x08, 0x09, 0x19, 0x29, 0x39, 0x3E} + # MIPI D-PHY HS sync byte (transmitted at start of each HS burst, all-lanes) HS_SYNC_BYTE = 0xB8 # 1011_1000 in bit order (LSB first → 00011101 on wire) @@ -149,23 +152,70 @@ def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0, single_ended=False) N = len(v_dat) # --- Single-ended LP path --- + # LP-01 + LP-00 + HS-PREPARE + HS-ZERO form a continuous "LP-low" region where + # DAT+ < 0.25 V and rolling std < 45 mV. The LP-low region ends when the first + # '1' bit transition in 0xB8 causes rolling std > 45 mV. Start bit decoding a + # few bits BEFORE that spike so the phase search can find complete 0xB8 near byte 0. if single_ended: - min_lp01 = max(2, int(20.0 / dt_ns)) - run = 0 - lp01_end = None - for i in range(N): - if v_dat[i] < LP_SE_LP01_THRESH_V: - run += 1 - else: - if run >= min_lp01: - lp01_end = i - break - run = 0 + LP11_THRESH_SE = 0.8 # V — LP-11 state (DAT+ high) + LP_LOW_V_SE = 0.25 # V — LP-01/LP-00/HS-ZERO are all below this + HS_STD_V_SE = 0.045 # V — rolling std above this → first HS data bit + LP_LOW_MIN_NS = 5.0 # ns — ignore LP-low runs shorter than this + LP_MARGIN_NS = 25.0 # ns — start decode this far before first data bit - if lp01_end is not None: - skip = max(1, int(50.0 / dt_ns)) - return min(lp01_end + skip, N - 1) - return None + win_samples = max(10, int(1.0 / dt_ns)) + try: + from numpy.lib.stride_tricks import sliding_window_view + rstd = np.zeros(N) + wins = sliding_window_view(v_dat, win_samples) + rstd[win_samples - 1:win_samples - 1 + len(wins)] = wins.std(axis=-1) + except Exception: + rstd = np.array([v_dat[max(0, i - win_samples):i + 1].std() for i in range(N)]) + + # Find LP-11 end (first sample below LP11_THRESH_SE after LP-11) + lp11_end_idx = None + in_lp11 = False + for i in range(N): + if v_dat[i] > LP11_THRESH_SE: + in_lp11 = True + elif in_lp11: + lp11_end_idx = i + break + if lp11_end_idx is None: + return None + + search_end = min(lp11_end_idx + int(2000.0 / dt_ns), N) + + # Find LP-low plateau start: first sustained block of v < LP_LOW_V_SE + # AND rstd < HS_STD_V_SE (the LP-11 fall edge has high rstd so we skip it). + min_lp_run = max(5, int(LP_LOW_MIN_NS / dt_ns)) + lp_low_start = None + run = 0 + for i in range(lp11_end_idx, search_end): + if v_dat[i] < LP_LOW_V_SE and rstd[i] < HS_STD_V_SE: + run += 1 + if run >= min_lp_run: + lp_low_start = i - run + 1 + break + else: + run = 0 + if lp_low_start is None: + return min(lp11_end_idx + max(1, int(50.0 / dt_ns)), N - 1) + + # Find LP-low plateau end: first rstd > HS_STD_V_SE after the plateau begins. + # This is where the first '1' bit in 0xB8 creates a large voltage transition. + lp_low_end = None + for i in range(lp_low_start, search_end): + if rstd[i] > HS_STD_V_SE: + lp_low_end = i + break + if lp_low_end is None: + return min(lp_low_start + max(1, int(50.0 / dt_ns)), N - 1) + + # Start decode LP_MARGIN_NS before the first '1' bit of 0xB8 so the 8-phase + # search sees the complete sync byte near byte 0. + margin = max(1, int(LP_MARGIN_NS / dt_ns)) + return max(lp_low_start, lp_low_end - margin) # --- Differential LP-triggered path --- # LP-01: D+ = 0 V, D- = high → diff strongly negative (< -0.5 V for ≥ 20 ns) @@ -379,21 +429,37 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True): print(" ERROR: Too few bits decoded") return None - # Try all 8 bit-phase offsets to handle framing uncertainty from LP-00 CLK edges. - # LP-00 CLK edges before HS starts produce garbage bits; the correct phase is - # the one where 0xB8 appears earliest in the byte stream. - raw_bytes = None - sync_idx = None + # Try all 8 bit-phase offsets. Pass 1: find earliest 0xB8 whose next byte has + # VC=0 and a known DSI DT (validated sync). Pass 2 fallback: earliest bare 0xB8. + raw_bytes = None + sync_idx = None best_phase = 0 - best_sync = len(bits) # sentinel: "not found" + best_sync = len(bits) + validated = False + for phase in range(8): rb = bits_to_bytes(bits[phase:]) - si = find_sync_byte(rb) - if si is not None and si < best_sync: - best_sync = si - best_phase = phase - raw_bytes = rb - sync_idx = si + for i in range(len(rb) - 1): + if rb[i][1] == HS_SYNC_BYTE: + next_byte = rb[i + 1][1] + if (next_byte >> 6) == 0 and (next_byte & 0x3F) in VALID_DSI_DT: + if i < best_sync: + best_sync = i + best_phase = phase + raw_bytes = rb + sync_idx = i + validated = True + break # stop at first validated pair for this phase + + if not validated: + for phase in range(8): + rb = bits_to_bytes(bits[phase:]) + si = find_sync_byte(rb) + if si is not None and si < best_sync: + best_sync = si + best_phase = phase + raw_bytes = rb + sync_idx = si if raw_bytes is None: raw_bytes = bits_to_bytes(bits) @@ -405,7 +471,8 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True): else: if verbose: t_sync = raw_bytes[sync_idx][0] - print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})") + qual = "validated" if validated else "bare" + print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase}, {qual})") # Data bytes after sync data_bytes = raw_bytes[sync_idx + 1:] # skip the sync byte itself @@ -507,8 +574,19 @@ def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True): print(f" HS burst start: {t_hs_start_ns:.0f} ns " f"({hs_duration_us:.1f} µs available of ~18 µs full burst)") + # Auto-detect HS common mode from the first 200 ns of the HS burst. + # CLK+ common mode (~217 mV) and DAT+ common mode (~104 mV on this board) differ; + # hard-coding one value for DAT+ breaks the decode. The median of the HS burst + # gives the correct bit threshold for any board without manual calibration. + hs_probe_end = min(hs_start_idx + max(1, int(200.0 / dt_ns)), len(v_dat)) + dat_common_mode = float(np.median(v_dat[hs_start_idx:hs_probe_end])) + dat_common_mode = max(0.030, min(0.250, dat_common_mode)) # clamp to 30–250 mV + + if verbose: + print(f" DAT+ HS common mode: {dat_common_mode*1000:.0f} mV (auto-detected, used as bit threshold)") + bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx, - dat_thresh=LP_SE_DAT_THRESH_V, clk_thresh=LP_SE_CLK_THRESH_V) + dat_thresh=dat_common_mode, clk_thresh=LP_SE_CLK_THRESH_V) if verbose: print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)") @@ -518,18 +596,35 @@ def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True): print(" ERROR: Too few bits decoded") return None - raw_bytes = None - sync_idx = None + raw_bytes = None + sync_idx = None best_phase = 0 best_sync = len(bits) + validated = False + for phase in range(8): rb = bits_to_bytes(bits[phase:]) - si = find_sync_byte(rb) - if si is not None and si < best_sync: - best_sync = si - best_phase = phase - raw_bytes = rb - sync_idx = si + for i in range(len(rb) - 1): + if rb[i][1] == HS_SYNC_BYTE: + next_byte = rb[i + 1][1] + if (next_byte >> 6) == 0 and (next_byte & 0x3F) in VALID_DSI_DT: + if i < best_sync: + best_sync = i + best_phase = phase + raw_bytes = rb + sync_idx = i + validated = True + break # stop at first validated pair for this phase + + if not validated: + for phase in range(8): + rb = bits_to_bytes(bits[phase:]) + si = find_sync_byte(rb) + if si is not None and si < best_sync: + best_sync = si + best_phase = phase + raw_bytes = rb + sync_idx = si if raw_bytes is None: raw_bytes = bits_to_bytes(bits) @@ -541,7 +636,8 @@ def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True): else: if verbose: t_sync = raw_bytes[sync_idx][0] - print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})") + qual = "validated" if validated else "bare" + print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase}, {qual})") data_bytes = raw_bytes[sync_idx + 1:] header = parse_long_packet_header([b for _, b in data_bytes[:8]])