From 9c75598728caa09c0339291c0b93e819f3d04196 Mon Sep 17 00:00:00 2001 From: david rice Date: Mon, 27 Apr 2026 13:58:09 +0100 Subject: [PATCH] Updated --- __pycache__/proto_decoder.cpython-312.pyc | Bin 20185 -> 34517 bytes device_server.py | 90 ++++- mipi_test_interactive.py | 185 +++++++++-- proto_decoder.py | 385 ++++++++++++++++++++-- 4 files changed, 604 insertions(+), 56 deletions(-) diff --git a/__pycache__/proto_decoder.cpython-312.pyc b/__pycache__/proto_decoder.cpython-312.pyc index c32e0cf6420d08b4244fad56a47843f1bf253b19..c402fbaa98f6fe30b9d6c3e2bc3cb1f5308373b4 100644 GIT binary patch delta 14658 zcmc(GYj_mbm0(rB)eos#-K|IKReG!Seu69zk^q4aLSTe3#`g5Kx>`ce>XxfY7|GRc zIpb`E*YhDt}5|LpywQ}=o9z31M0&bjB@zHy%Z@Q)Pf|6nrdDG1<&hDfo=wl8|L$z% z<B5wBHl&N#dN$`p`Y6LT zu(|LpPNUdH)&a2+Ht$91XepZyu#7DLSk5-ZTeF4UiV+Q4^rHG`CFF);oy4*k$gX0W z*<$FrMDCs~^%jii*fJ<_KCq-5N~+lww!&M(RzjO9Z!L7-gxor|+FQ#irnMABa#d^% z47{GLg&{R~8`-)ORXG&J=@gW=;S$mY0!>XRrWDTB%M`zV>|;jE}l}NbRHZ)zvQF zgp13^#svrp0ZbPR2AF{Z(yui+X09li!R+igB&};~A&$YS#Rw_@@EQON2^{I$MEVBA za>sR0{3O=@CY@scjy=V=a7LP=Vw_VHHww&2Dv-kIun6thvygGGuY9Vhq zTQcd2+qe&eN$4gdO}!x*S~2>DU(U1i$TVbm!48?Ore`eE~hi3UHU@lZ#wlmQy1-w z-lG_$Jc_5wiNIfyzK#2X#DKnlP8v>EnwZG_&bJ3!4$lOlhAwh}c9! zR@go^ptfDWJwk$Lj$;8jJ1FWYToHXROUx2837ykLdu;32MZ0Lljl4#t+4ppv5$oC^ zW_uJgY_fQFFPExd!(pCB5!63RIn9#}rSwBf?_Pe)KkDp^tE&XlWMGSb}+10@OlIu>eV$iH6U!%OLS|_7RJpZC#zw$ zLYZr**EMjUvB`C)?_lpxpX)F)65yDRPyZ>?+{pO(T1GhL^)sATnB@F?tUWMR?&64Dm*K{Eud|-Ce}ozGx;f7=h8Nr% zjL$7FGQ!5#wCb9gmK(kyq6hw3!Y&lcxMTWw}D z(rYMi2&d^DANF~rM{P}dF5vUQQrGcCInj$Y(~%yAgbov{!wq# z`ha~BRgRDOxvf}B$qQ^W&Fh~W_j0fYoHl#wMbMl${zi$nM&%Y_TvKm8oG?dP^X3d&==8e*McHyyezGX+vvbAR`q+;;tNS+VJX#oZJfN;gO?lV0AG84NMML8lskk zZ_ohz-9WlZk&~h^-i}qCsDix&(qV{7ij$PS(xskGt0CF98T*#bP)RIg-s%%OaYn4hPqWHkrkF0x?Y5PRW|jDpEyvanIl*u$Tr$_{U`_^nf_oG>S)7X@2Ob_1fZ=$?_^|+R*?=E8HgE**h!e=ozhPo%^@R@%1LwG@F8r4u$$3ud{prZ zL%n4i;W5tis4;a2L|G)f1Wsyz6Sz)nOqdDbGD!v#V=IA4KP&T@eNZ#1aZgOZNq1Vg z0~C_?QEZNEgX_X#WBe4m#@JI)eUf@neR9@OmEbzYbGxwBZt2~;1GE(w!a-n{L?<@@ zH9-=&fUrC_1(kjd|9mMd%M1l)RhJpj=C0X`XZ5RE3q)7#IkWnk2J>A0HACUd&Rba( z3zK2*()3DJzci3vqP%IgOML#L8EUl?SOzePQ)i22HEYsx{`Rzm{_EEI)9%F{sjwi& zWPmE`>6FntvzN=cyraOVptGa{MU9GFTKY;+-5>150VAn^k4}zru+L$?aCoev8rTuA z+}sgN(5@&w995ol`?wiM^T-!a8Zbl~3uL>4JdIzStqIfJ=A{Mhd#S&H*tJe$h zT?7LI7@PP;|&5djrn?<3tKV1`2H4 zg4E25^*tNP`sR@hMSVTFS$;;A_Wf`33{wSdc=z}O++S;%+rI$b4OJUQHVn)J-yjz_ z2RAAhruX2%{Rcak9f3(ugZu%>SZ}W(=&9~A-5Z$<0}9#jB*zP~+SS2qXi$BF4wUsx zBcDkF_0DP>-%fH$SY|_qEC?%bIGPPDLw1$(HeSg#q&MpeVWK~%FHnC2(w|8;>)Rlm z)ldi%XR3tGjSQkZA{U!D=H|Vf+&7^f;T;><*v-%$rjKkHrpo8`dznUO2TTgW&OJ;g z(|C$|74f3H18w);zrrwwcXV>|kc5#iI}de2jdPe%;s{^Em=(|txEl}`GvS`{1>9^0 z^BG*aD0%3}PJsci(BR|Kv!L`LF*c-+8xB`qRB$+rK`sao63@ejBgz7AI8Y6QCftf^XybZzEd z9|c|j6mpG^@ubd14iHWz8qs<|Z4Zo$@Lr&yu7ep32)JmdlWrKDjQ7+8+&zHSji<7a?jK9(Ejf;a+yP+(nM4-# zV9Y0Vw=m{kU=-Oi_Y#6%NfRyG%*a*Aj^TcdvEN8PXxZe5DmKwC{Kj>)I=n0)fMC*ZE-lD44ZUgApG`r_!h9#0&ay_GT)t(*6 zsk&x&&M8*Sxe=y)DRZgkyY?k5RP3fJS1X<2;tP}Ep@kER`AfrBEZ_BB*|GHGReD`X z7wx6*P_(6KfSyacWwb;o7qzku z%xV!2=)m#5~=SJ^E&qJhP}azP_?z&B6YUBxsCC`m~Wa^kUf@-HE? zg#b#gf~!oCSq0JyRoQeQK%1T|LW>g!NJjcjRdGXc%)+7I3@HstQb)|%gBkE|X48Te zM6nf&EXWy2V2k2aLCvm(zO4e9LP-$QMJsD}Q1Kc;n@}V@?(Cuqq*tAlp;Cc~RixN5 zm|FRHWxOU$L1$(^1M?^pGek3446;nYtdwC^h-R2brI9|vv%p(38b0oB-dw2JxAO`BK%aMO8pa(H4s zTTJwyN7&8Bo$*%5FWdeiCFa4z9+A$~6!&z9g-KlnHz~A#gj_*V7kf(*e^Omxx4uXT zb<+Q?DG1pU7nvflXvUmEqE0f-ZK><#Z%Zlf5{qC4-C~|tlpvQgbv`L|d&DB)fm8wF z$a_}aw{g`gHl6J;b+93!4ibfrY>1tp^!3J8W>-ud+>L3X4w4vA2m3(jBby5by9eU^ ziNWrLRCOW+TkA{e;K0949h8S@5YYHSLQ+)(b<$gN8^93P z)ClTOTvx1Ro^Ni6E0hhWok8W+mS$$(@C5Hf`y0W_PmYX?dB#9Ig5{47_$CFsDuSgf zHU%_ap*{)iL9T%bi+OfTI2M=$bPVr>%K=z{m=N*3fI7j)Y;XrMDXVbE+JqLyo2B`5d0;AcM&WCI0X0KA@~k*?_t(oA-Ig-TL`|5;6DOzD!6fs z`w@Hx!4d-G;oNsa2)~Em`w0FT0cx4t-yrw_f*&Hdf*_$CbN>mG?;}7BnEP7By21XRxJ_B9R!~sxQpOZ1p5(uhTwOU z3TTw7@3qDBJxhY{uDHsV0ZVrS{&@-{$_(|XnQ~MvJiDk{&V5AcZ>`rg_eRopt>|`3 z!B&3<=xp2RO!OFfrRur-`J-WVn7yc9Y>8BEUVIKzwv3(hrxcymOMj}NY{d(9D4)F2 zx?=5<^4kt>gXWtuS2LLfYs69>DeuPy1Ej$|Y+%fuo1FJ9JQnU<+`3lMy3#ZBsggE6 zM$ha-!)b=}YFj6mKy1>DwtuwVv^eJ57u@Hz&UZ#E&YAu-Q@Qllo5qzlOY5Zi&ACK# z6_vNL9MT7y`Or;+?f<6=+idxNQen5=r^4=ysj$X`3R_~_&*(pBG48L>MAfjOF1P#{ z=6-qko9$0%a*_I z<5nW<;2W3+_vF4I*{t2k{T$1XaYYq=ej^X~_h3N!qAQF3O!{$GnPx)?IUCaFUHQH7 zJAB86f?=YX{p9wxq1e_QRYQNS9*Ay|quXy+Ay(vL8!84bad7E|fcMLz-FfZEtuU*; zz8(w+8fHAed7WG{4nj2oqV6IcRiWF5-1lYfi>G!&`it%o7+0jb!m*JZyMPXRg=f9s zVgjNd`{y?#b5CxFoMgNLar>a5d}zt-E_vL@x!#I>swd&9*d9&u1USx%UMQ{u8?|DY zYTWDg%gzjqn;RW>pNcBo!~8`nXGZK61P-kIOM(dofD0#udvfU`()&G6fvv7+dzNN5 z)O$j4dCT?#O3Pg%RDGW^?=4lmZ!rP9e6IaBYTFY;bnYagiy00G$3VtQuWsAjn5o|p z7zd$<@>~}CvR#O?V&^slb^yD$EP#X4O;TM~MJTBs@Eq<6E*tCNmf&(QcJG$paxpKl z6*vbb^AO}CC_qq%pa=njpcp|30bMhnL{I=`&ZVr1s9fU!HTU25(~<;VX_@dH(_iuNdn-} z+8c@qBDm@kl7o$MBF4MxcC2B8J~RY$4y5o6JUdciaQT)1knZOOV=vvlOj z=#?iTgQF4su@%+WEn@-j!6S>4E5^>5-nDyDH?_u@;JkX?w5Bsf(tB2P+b_Sm{Zm?b zORJaWcK$9@Oqm8~4wXQ%gi&qG{DO8}^rw1|TJeD7Ls@zk=?|_Qils#wE<6fR!E0HX z1;-BjE8kFoCk1NAK}d zGDPKrOGVX)@~qR?`8zjK1fM*`Dh7yCAhWk3X_-e=nTOGf3q_(&e z8@#CI)NsXC;8;L^rc>%9@X6C9K5-aBPzhdf?c^(ffE=~MUT$n;iV;Gfjmn0p7=6JE zm}&)#F&g)%d(00eKU7{5_Nf%Z7r_}belbIbdJgT`;X1HqaIb5iXRx=io?-6QUpq*B z2Heac@H3;8jj%}K!=CVR9#Edqs{`6TF`UZoXk-D2zGHpG@c0v!HZNFR2^w&y4uk(2 zP{fUJ*j`3A2KqQm1T9)2u;+m`jo#H5u6r$a1Xq=8tgB6QPYnFEj2o10w~r*84EUV} z1AcE}L@{Gwa$?C5$)oU|a*vON{NNJDI)mi+J5@xBiyBjBBr9=5r#k}+PMF1Q7Gh8v zwWX-#OBCmhV?|WgoEn)0?uC^Hp^6qv*(IVOqYwF_`Z$5S{QZxyWIYU(u_ZkEsC*NL zkI6A#HNQDxES>4SVKA>26fZWdmQ*Z0dPhl{+d(@@YX|<|C|zh<&UOBaT9v8!XQO?# zXZGZ(!LnknSa>GfbYb$Ez3#f9F0`(u^oA8+4eo{FbKYylvW1cBy1I2GG@9wZL&4`$ zBbCQ2Sl${6JO0G=EqQDSB9=JeOHT@9Rq8Qg8AdGH{7Uco3CDY^tHOB zR?P9fUVSG`jg~VtTFwlBiwEV9I$yQjq~ZP$VjdbV8u0*DCz*>3!eo()%xI8rjDoN7 z4aE>V0tnI~bv(g8;?3ZGuDTSQ?2N^Xa!eBdDUtRc=*fG4Ts(ZT##qX2A84gzvg5?B+P&StP?*1~2U z(}R0lC8|%-4;`#kRI#?#RrgpY9!x9yXyTPvP$PtZ>&YScD2mM@MvW(uPVZz{Pzwgb zY~*_{?1{(N9Ad=lfpjjG1a+d0%VHhy;w=x;K*fBB7hoI*$reINw#e8*FlO{dGr_cn zEJJakn-XYR3aPRf(sD5kP*V7$kryPuWWk~Vtlc+7<`0jG;(k&WDxETy zlsPx)S?+Zhts(;9+Oc;|+nBa{It`7wxnYzQ@EXYDWr22xn0UP=0z5wk?}Oy#z~q3o zqyBM_EwLd?`s3mx?uZqpCcK2>gCh+b;A^%E}GP#KO zK}mlI>UtE6Q|wstU4&A$q)1$Gr|{@VNm^NxvJY4WXnZ!ZNDD6X?_v@xnb0 zVu37_Op-UTZ}ka(;8}lE8*{UA8myxO5Y@n|Ls)MfLx@A-G%&r?+)2hjO=4c3VRP)1-H>+5)6fUUV@_lPy)ncC$<_|5jhHYU1OeB`J zS@lhWY3{MpU8_dRnSs*-k-W`IilvstQ<2>66=Tm$zL05a1#H-t{KJ!6uwd<$!mtTh^mdtfd%xye3@=#-Nm$3 z+P>Rkuw8<+!~out#ofhH@ouIl;X{j;f$X%)wea<|^zU}}-q)_EMw@g+vDu>fUROv; zS&pcce!9D|@&HIcCXO@t*u z89wRexMPr-R@K${CZc*?02_}BjP%~=-k6PpNcj8A?y2&YKSePL~8D@4#B-0EHMD!6`kN6kE;0z-zn z64lC96P`$bs8;6HQx9b7s&Tkgb6AO{;a5Mv!q-gppP0(7naUzfBP*s+>308c zr6xSETEGHNQ|EwLT0KJFROx@B%D$$`p4Wv3B3(x!s_Yfj6Vl^*OG8=t^GDu%`rOlz znr+JkUCUYBvxc>-{3|sd6$~yt`i5tr`0a`d6>ryGs15VWB`u5Fua#{6u(17l!QgV% z{#iX#jW^%2T(EUHs}r8`Li4JR3ikfq=6jd324=Nux{xVit`2vEYa{8KS9I-Y-GTJx zurre0x}s}a)urFm8onmJAV#!BtA>m-n_t?z24*DH+qw(7aM`=H7i*Vv9~ygZ8T024 zy?Nx^kpY-v4w&TaeW!QbvX?IGe7o;LU-;OSkZQ%= zJG*Nwvofq%&TN>~t{O7uwp@p|#Q5@}VNvy+wD;1MDz8*t>*!x@+#6{afL{S+KKh~Y zAbFbM`q1e9{F550a6kR|#|3-I3yZ(4t>06p`;n@CPqQwnbGdNUTrTi(p)J#?9F%pD z?ojtGr(*EQK_?9!u-h|OM&CwLoU5Xg2ameTMYbX+t=u!T^xQzMMmFL~uMIpE;+{loPa$|3!D&Fk9Yc_gNfUwr zOdh}(aqEx+Q-gWPNad%e35O1O_9#F9xqzjAfX&F!Acuq;7qkO%Xhe&ude9Xqn+V9! z!Htac!DmM>RpRDCQApfd8@jDv)t>Zuw{c&Gdhkc_x1`tiy%y3@*?AEvb2ZBmht_o) zWv`4-mi2T6T{72tmx9l_K8${Xx zUDI7^pY_%zdecH#*mj|2or35h3;;f#Owi@@Fue}zrZUb>UsvYdQ0cz93mAaPL9~ni E3w;UiJOBUy delta 5307 zcmZ`d32+QrYAN5Xllf&lHFDWo#SdTJvYJzDF^lZG zAe^p}{Ytf54r@UUz`90mmMbpsr)%XFxe}gra_|Cox?T=w}Hp>5IjaawLsYS}GEYClnR8!1jKkIA7x{SO3Lz!^-=~H_Kkyj2%s+ zqB9Xio&xNBfPHlJcYTPjtAuG(08kt*m{{Te{<4+u?wNi;D=8is{0%-o&)Q^5)~-=p zHe{>jc`z3Yfiwj|Ih$-PV!Onq;)s)u7 zICOh7X)X!&^~I zN+jaRDI{rt(%swp!B@Ts&u}l&u#{BeT0E5$2TBne-RRKRBPgz-coIcZNiCktq%tZx zd;-m;WCdx{IF*^2PGvOIv8}fs9XYHcoJl5uyjY-+>t=k_>D0VR`2n8}IUb9l51w8| z-M#3Ph?0tgkv16aLTRi-6=u53eZA9Qc_0gl&Z!j z6N|7wKr0akb@QwwEl1x)swQC#rDALmmC{-U7YTMV)G&&r5{V5NCvjYf0mY6l^@qE+ zg~KSRie2>WBqNbj=Ba>_8LVn(QkjY;)dvBoRn3Dvj4y_ueieacTxnJSMXK}x+v(Spqqjo0HzH*ZAvk%v+*R}MmMY=g;}B`GqVbo zG(`-VHZ20zZ`$VQ1NTv+l6gq(srg7!#XISmA2(fN$HKjllZTIwoH%Scfm~-Kkq-Bo zuDlf^)Ow~BT+(#X{fLs3O+Ic4>P$RMCTd~=O$X$KV5kwZ`r%>qV_Jc_`o_>ID<0;nzgewt&sH~xmpZs zF$J)EW6wHBnS1bz-Ed4%tMWZyP1L% zv5^nS^KOxhbXSn~8fr!?I!K&#O>tSbQ3^>Y1uGA5@i401jdGw-bFg7}4Nm6iInBwC zi>+i|V|BbtqXm);nctjMH%a!OD$2sCt3N3hBi3J3=EQpJX?|ycfD;B?{>^G8xxYJOg;I zz`yDN_gLcAOF6gyQr#P_hNa=V!S?H!73Eg2kEqSfwskw_50b0RpD(l9#AW9@{@|sB z%ZK0q>UVar{Pywuix_5hWg+6tmIGm9fI6CF54H|$tZGYJik(*++f`AO3{ zCnYe3wdw$nS8=EiWK^#RbZC={8uVO?aDInd-PZm(&kvK|Y-zMpGI)+Ob%bhJRC-xd zs*A}edj)?9&VR*`6CHb?se7j5U#Xz;TN~^I_CMjsA8uXdD@kitv*oAu)x%xC5G?o& zvVXLC^=R+cc=Fxt+XZ>wx18h`+iUs##NPKgXevM7H<;6fLsA-H>W`mV1`u-~9rEE9 zSk{3J>Q*c(E?ww3pj&&k#dJ$g4~E`>%P=Khds#)O1Kv@k*)%ktT`2w`WQ0$xJ>i({ zLTUA({DC;Gw<9!i{P@x11L#mHlaNs|MLhkLTN!z7=$RqC9BmXBmW&3FZigyF8CZLI zV;OR!U+kpv4Krtvk?us)uz-N6G`a(-DA-rL=c6lfy}tnzeYL;d_Ejn{`B{HItV25* zKyg|j^fI8{MMC!BS5cCuXojNp#zXnT?1Pz}Y1dOlKm6b|(kV2m493AC4G%mIK&7{Y5c4#wl5;!TZ8`*wO1{3c zW~)m3`7e8Y{JkNV=rst9GMYR3iv078!PaDL=kabJOvx3vSo2(%cQRHq+k^g_c`Y$s z`mTnMN~`0SJt6**BgE?tQ1w6}0d)UjR~YR6{;p03wR?CWL;Snj1<{FvkJS3V9=$-a=+8^eFhR0Hu+dZ1-y2Uayt^8QHr3-!1j6 zQt@cKnuaq(#+*v~A{rwdMK}6`M;rKO$;*%Sm%m7H>=zd{$}H;>kIGF_-sGgqU& ze)vSbF&M<(rrTEmn3klflcCdfe1<%Bx;3X;;imOk=0}J&x(Zg3QH6ET16&!g&kUTtcaMBGKE+skq~)PpzS-0ZRX&t#nZ{2GtHpI*)#1mdZ-YA zlZrO4K+_AxqdhaI`t38-;vZA7=-pvj7V`y^`QA%(xk3RgZ+IJd|4gtiOsBNI0$W8& z=379!gr{bu1=A``s$u|tpW^(0f)E8Skj{xVew-Yen5btZz!y!;rX{RIih|&G0q+XG z`ufB%tDm{VYn<0 z_!?thWCDDVg~s)6@_X4f@=ErV+&{rN{wV;{F{^0OoP_avu+r__-HEuWnWd@*R~6B4 zM$zlXU3!ef`l@l>SaUSqaaLWexK{OC)%CuY_P((9X4P%yz+GqX zl6Ga`^1}71n|*IPclkg0xQweh#50wCRMj)mLVBMJY_W__fifI7lp7NHlOlg_2^M0z z>4-$+R5TL7KZiB0C-YA>RMV(1oe}oiR3yU8rzAD}AxHlF$!dE(ddNRKIYEABt}DmV z_gVTf%m3`MTIg~g1shp^BjM8q$P^}1sYE_HX^b#aQ=YVvfHbsVV%od2vh}HRFQkG@ zLfMB);3nXYQ+EJdxANrI7oH{8o@zLB9|ic@`+X1dyzk?4mO6l=d?{c4adQbws~UOx zM`QdGJa4~xXq}_amA?G*d#(4``lGa!?EFe= dict: + """Read all _SN65_SNAPSHOT_REGS in one pass. Returns {reg_hex: value_hex|None}.""" + result = {} + for reg, name in _SN65_SNAPSHOT_REGS.items(): + val, _ = _i2c_read_byte(SN65_I2C_BUS, SN65_I2C_ADDR, reg) + result[f"0x{reg:02x}"] = {"name": name, "value": f"0x{val:02x}" if val is not None else None} + return result + + def _run_settling_poll() -> None: - """Poll SN65DSI83 csr_0a + csr_e5 at 10 ms intervals for 1.5 s after restart.""" + """Poll SN65DSI83 csr_0a + csr_e5 at ~10 ms intervals for 1.5 s after restart. + Also takes a full configuration register snapshot at t=0 and t=end so callers + can detect bridge re-initialisation or configuration loss.""" t_start = time.time() t_end = t_start + SETTLING_DURATION_S + + snapshot_start = _sn65_snapshot() + readings: list = [] while time.time() < t_end: t_ms = round((time.time() - t_start) * 1000, 1) @@ -71,9 +124,18 @@ def _run_settling_poll() -> None: "any_error": bool(val_e5) if val_e5 is not None else None, }) time.sleep(SETTLING_INTERVAL_S) + + snapshot_end = _sn65_snapshot() + with _settling_lock: _settling_log.clear() _settling_log.extend(readings) + _settling_extra["snapshot_start"] = snapshot_start + _settling_extra["snapshot_end"] = snapshot_end + + +# Stores the two register snapshots from the most recent settling poll. +_settling_extra: dict = {} # Known Samsung DSIM register names (base 0x32E10000, i.MX 8M Mini) _DSIM_NAMES = { @@ -220,15 +282,37 @@ def _i2c_read_byte(bus: int, addr: int, reg: int) -> tuple[int | None, str]: @app.route("/sn65_settling", methods=["GET"]) def get_sn65_settling(): - """Return the most recent post-restart settling poll (csr_0a + csr_e5 over 1.5 s).""" + """Return the most recent post-restart settling poll. + + Includes: + snapshot_start — full register dump taken immediately before polling begins + snapshot_end — full register dump taken immediately after polling ends + readings — csr_0a + csr_e5 sampled every ~10 ms during the window + """ with _settling_lock: - readings = list(_settling_log) + readings = list(_settling_log) + snap_start = dict(_settling_extra.get("snapshot_start") or {}) + snap_end = dict(_settling_extra.get("snapshot_end") or {}) + error_readings = [r for r in readings if r.get("any_error")] + + # Diff the two snapshots so the caller can immediately see what changed. + changed = {} + for reg, info_s in snap_start.items(): + info_e = snap_end.get(reg, {}) + v_s = info_s.get("value") + v_e = info_e.get("value") + if v_s != v_e: + changed[reg] = {"name": info_s.get("name"), "start": v_s, "end": v_e} + return jsonify({ "n_readings": len(readings), "n_error": len(error_readings), "duration_s": SETTLING_DURATION_S, "interval_ms": int(SETTLING_INTERVAL_S * 1000), + "snapshot_start": snap_start, + "snapshot_end": snap_end, + "changed_regs": changed, "readings": readings, }), 200 diff --git a/mipi_test_interactive.py b/mipi_test_interactive.py index e1479f7..59a4f3f 100644 --- a/mipi_test_interactive.py +++ b/mipi_test_interactive.py @@ -21,7 +21,6 @@ AUTHOR: D. RICE 16/04/2026 import csv as _csv_mod import html import json -import subprocess import time import sys import requests @@ -38,7 +37,6 @@ import vxi11 from dotenv import load_dotenv import ai_mgmt -import rigol_scope from csv_preprocessor import (analyze_lp_file, LPMetrics, HS_BURST_AMPLITUDE_MIN_MV, FLICKER_LP_LOW_MAX_NS) @@ -420,7 +418,6 @@ except Exception as e: print(f"ERROR: CANNOT CONNECT TO INSTRUMENTS: {e}") sys.exit(1) -rigol_scope.connect() # --------------------------------------------------------------------------- # Scope configuration (identical to mipi_test.py) @@ -646,7 +643,7 @@ def _fetch_registers(ts: str, iteration: int) -> None: print(f" REGISTERS: SN65DSI83 error — {e}") combined["sn65"] = None - # SN65DSI83 post-restart settling poll + # SN65DSI83 post-restart settling poll + register snapshots try: resp = requests.get(f"{DEVICE_BASE}/sn65_settling", timeout=10) resp.raise_for_status() @@ -656,13 +653,14 @@ def _fetch_registers(ts: str, iteration: int) -> None: n = settling.get("n_readings", 0) n_err = settling.get("n_error", 0) dur = settling.get("duration_s", 0) + + # ── csr_e5 error summary ────────────────────────────────────────── if n_err: - # Print the first and last error readings for quick diagnosis err_readings = [r for r in settling.get("readings", []) if r.get("any_error")] times = [r["t_ms"] for r in err_readings] print(f" SN65 SETTLING: *** {n_err}/{n} readings had csr_e5 errors " f"over {dur:.1f} s (t={times[0]:.0f}–{times[-1]:.0f} ms) ***") - for r in err_readings[:3]: # show up to first 3 error readings + for r in err_readings[:3]: print(f" t={r['t_ms']:6.1f} ms csr_0a={r['csr_0a']} " f"csr_e5={r['csr_e5']} " f"pll={'Y' if r['pll_lock'] else 'N'} " @@ -672,6 +670,29 @@ def _fetch_registers(ts: str, iteration: int) -> None: if r.get("clk_det") is False) print(f" SN65 SETTLING: no csr_e5 errors in {n} readings over {dur:.1f} s" + (f" ({clk_false} readings with clk_det=False)" if clk_false else "")) + + # ── Register snapshot: print start values and flag any changes ─── + snap_start = settling.get("snapshot_start") or {} + changed = settling.get("changed_regs") or {} + + if snap_start: + print(f" SN65 REGS (t=0):", end="") + # Print a compact one-liner of key config registers + _key = ["0x0d", "0x10", "0x11", "0x18", "0x19", "0x1a", "0x1b", + "0x3c", "0xe0", "0xe1"] + parts = [] + for r in _key: + info = snap_start.get(r, {}) + parts.append(f"{info.get('name','?')}={info.get('value','?')}") + print(" " + " ".join(parts)) + + if changed: + print(f" SN65 REGS CHANGED during settling window ({len(changed)} registers):") + for reg, diff in changed.items(): + print(f" {reg} {diff['name']:16s} {diff['start']} → {diff['end']}") + elif snap_start: + print(f" SN65 REGS: stable (no register changes between t=0 and t={dur:.1f}s)") + except requests.exceptions.RequestException as e: print(f" REGISTERS: settling poll fetch failed — {e}") combined["sn65_settling"] = None @@ -713,21 +734,11 @@ def dual_capture(iteration: int) -> str: _configure_for_lp() _set_timebase(LP_SCALE, LP_POINTS) scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}") - if rigol_scope.is_connected(): - rigol_scope.arm() if _arm_and_wait(timeout=30): _save_pass_channels("lp", iteration, ts) else: print(" SKIPPING LP SAVE.") scope.write(":TIMebase:POSition 0") # restore centred for subsequent passes - if rigol_scope.is_connected(): - DATA_DIR.mkdir(exist_ok=True) - v18_path = DATA_DIR / f"{ts}_pwr_{iteration:04d}_1v8.csv" - n = rigol_scope.read_waveform_csv(v18_path) - if n: - print(f" SAVED: {v18_path.name} ({n} samples)") - else: - print(" RIGOL CH1: waveform read failed — check connection and probe.") _restore_hs_config() # ── Pass 2: HS signal quality ────────────────────────────────────────── @@ -997,8 +1008,6 @@ def _lp_followup_capture(iteration: int) -> tuple[str, list[str], list[LPMetrics ts_fu = datetime.now().strftime("%Y%m%d_%H%M%S") _configure_for_lp() _set_timebase(LP_SCALE, LP_POINTS) - if rigol_scope.is_connected(): - rigol_scope.arm() if _arm_and_wait(timeout=10): _save_pass_channels("lp", iteration, ts_fu) else: @@ -1520,6 +1529,127 @@ def run_interactive_test() -> None: f"({len(events)} total suspect(s) assessed)") +# --------------------------------------------------------------------------- +# Continuous capture mode (periodic flicker — no kiosk restart) +# --------------------------------------------------------------------------- + +def run_continuous_test() -> None: + """ + Continuous LP capture loop — no kiosk restart between iterations. + + Designed for periodic flicker that repeats roughly every second once the + display pipeline has started. The kiosk is started once; the scope + re-arms on the NORMAL LP trigger (VBLANK LP-11 → LP-01 falling edge on + Ch3) after each capture, effectively sampling one random display frame + every ~7 s. + + With flicker on ~1/60 frames the expected time to first catch is + ~60 × 7 s ≈ 7 minutes of unattended running. + + When the LP rule-based detector flags a suspect: + • The LP file already on disk (10 GSa/s, 100 ps/sample) is decoded + directly using single-ended CLK+/DAT0+ thresholds — no extra capture. + • proto_decoder checks the HS-SYNC byte position (misalignment) and the + Lane 0 pixel content (corruption). + • compare_lp_captures() shows byte-level diffs vs the last clean capture. + + Press Ctrl+C to stop. No report is written (raw LP/proto CSVs are kept). + """ + import proto_decoder as _pd + + print("\n===== CONTINUOUS CAPTURE MODE =====") + print("Kiosk starts once. Scope re-arms on each VBLANK trigger (no restart).") + print("LP-only per iteration; LP bit decode fires directly on LP suspect files.") + print("Press Ctrl+C to stop.\n") + + _start_video() + print("Waiting 5 s for display pipeline to stabilise...") + time.sleep(5.0) + + iteration = 1 + clean_count = 0 + flicker_count = 0 + last_clean_iter: int | None = None + + try: + while True: + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + + # ── LP capture ────────────────────────────────────────────────── + _configure_for_lp() + _set_timebase(LP_SCALE, LP_POINTS) + scope.write(f":TIMebase:POSition {LP_TRIG_OFFSET:.2E}") + ok = _arm_and_wait(timeout=5) + scope.write(":TIMebase:POSition 0") + _restore_hs_config() + + if not ok: + print(f" [{iteration:04d}] LP trigger timeout — retrying") + time.sleep(0.5) + continue + + _save_pass_channels("lp", iteration, ts) + + # ── Transfer LP files ──────────────────────────────────────────── + try: + ai_mgmt.transfer_csv_files() + except Exception as e: + print(f" [{iteration:04d}] transfer error: {e}") + iteration += 1 + continue + + # ── LP analysis (quiet) ────────────────────────────────────────── + lp_summaries, suspects = _analyze_lp_files(ts, iteration) + + if not suspects: + clean_count += 1 + last_clean_iter = iteration + print(f" [{iteration:04d}] clean " + f"({clean_count} clean {flicker_count} flicker)") + iteration += 1 + continue + + # ── Flicker detected ───────────────────────────────────────────── + flicker_count += 1 + _play_alarm() + print(f"\n[{iteration:04d}] *** FLICKER SUSPECT #{flicker_count} ***") + for s in lp_summaries: + print(s) + + # ── MIPI bit decode from LP files ──────────────────────────────── + # LP files are already local (transferred above). At 10 GSa/s + # (100 ps/sample, ~23 samples/bit at 432 Mbps) they have sufficient + # resolution to decode the HS bit stream directly using single-ended + # CLK+ / DAT0+ thresholds. No separate proto pass needed. + print("\n --- MIPI BIT DECODE (from LP capture) ---") + try: + result = _pd.decode_lp_capture(iteration, DATA_DIR, verbose=True) + anomaly = _pd.analyse_for_anomalies(result) + if anomaly["anomalous"]: + print(f"\n *** BIT-LEVEL ANOMALIES: " + f"{', '.join(anomaly['flags'])} ***") + else: + print(f"\n Bit decode: no structural or content anomalies " + f"(sync OK, packet type OK, pixel content OK)") + + if result and last_clean_iter is not None: + print() + _pd.compare_lp_captures(last_clean_iter, iteration, DATA_DIR) + except Exception as e: + print(f" bit decode error: {e}") + + print() + iteration += 1 + + except KeyboardInterrupt: + print("\n\nContinuous test stopped (Ctrl+C).") + + _stop_video() + total = clean_count + flicker_count + print(f"\nSummary: {total} iterations — {clean_count} clean, " + f"{flicker_count} flicker suspect(s) caught and decoded.") + + # --------------------------------------------------------------------------- # Menu # --------------------------------------------------------------------------- @@ -1531,23 +1661,18 @@ def main_menu() -> None: print("2. SETUP SCOPE (RUN FIRST)") print("3. CONFIGURE PSU (DEFAULT 24V / 1.5A)") print("4. PSU OUTPUT ON/OFF (CH1)") - print("5. START INTERACTIVE FLICKER TEST") - print("6. EXIT") + print("5. START INTERACTIVE FLICKER TEST (kiosk restart per iteration)") + print("6. START CONTINUOUS CAPTURE TEST (no restart; LP bit decode on flicker)") + print("7. EXIT") - choice = input("\nSELECT OPTION (1-6): ").strip() + choice = input("\nSELECT OPTION (1-7): ").strip() if choice == '1': print(f"PSU : {psu.ask('*IDN?').strip()}") print(f"SCOPE: {scope.ask('*IDN?').strip()}") - if rigol_scope.is_connected(): - print(f"RIGOL: {rigol_scope.rigol.ask('*IDN?').strip()}") - else: - print("RIGOL: NOT CONNECTED") elif choice == '2': setup_scope() - if rigol_scope.is_connected(): - rigol_scope.configure() elif choice == '3': psu.write('CH1:VOLT 24.0') @@ -1566,14 +1691,16 @@ def main_menu() -> None: run_interactive_test() elif choice == '6': + run_continuous_test() + + elif choice == '7': psu.close() scope.close() - rigol_scope.disconnect() print("INSTRUMENTS CLOSED. BYE.") break else: - print("INVALID ENTRY. PLEASE CHOOSE 1-6.") + print("INVALID ENTRY. PLEASE CHOOSE 1-7.") if __name__ == "__main__": diff --git a/proto_decoder.py b/proto_decoder.py index 71305a5..1f9a7ef 100644 --- a/proto_decoder.py +++ b/proto_decoder.py @@ -50,6 +50,22 @@ HS_SYNC_BYTE = 0xB8 # 1011_1000 in bit order (LSB first → 00011101 on wire) # Threshold for differential voltage: >0 = logic-1 (D+ > D-) DAT_THRESH_V = 0.0 +# Single-ended LP file thresholds (CH1=CLK+, CH3=DAT0+). +# In HS mode both CLK+ and DAT+ oscillate around the D-PHY common mode (~200 mV). +LP_SE_CLK_THRESH_V = 0.20 # CLK+ zero-crossing threshold for edge detection +LP_SE_DAT_THRESH_V = 0.20 # DAT+ HS bit threshold (> this = logic 1) +LP_SE_LP01_THRESH_V = 0.25 # DAT+ < this during LP-01/LP-00 SoT preamble + +# Expected Lane 0 payload byte pattern for a static-pink display (R=0xFF G=0x33 B=0xBB). +# With 4-lane RGB888, Lane 0 carries every 4th byte of the full payload beginning at +# offset 0. The 12-byte boundary aligns R/G/B of consecutive pixels so Lane 0 sees: +# offset 0 → pixel 0 R = 0xFF +# offset 4 → pixel 1 G = 0x33 +# offset 8 → pixel 2 B = 0xBB +# offset 12 → pixel 4 R = 0xFF (repeats) +# → 3-byte repeating cycle [0xFF, 0x33, 0xBB] on Lane 0. +STATIC_PINK_LANE0 = (0xFF, 0x33, 0xBB) + # --------------------------------------------------------------------------- # I/O @@ -72,6 +88,18 @@ def find_proto_files(cap_num: int, data_dir: Path): return Path(clk_files[-1]), Path(dat_files[-1]) +def find_lp_files(cap_num: int, data_dir: Path): + pattern_clk = str(data_dir / f"*_lp_{cap_num:04d}_clk.csv") + pattern_dat = str(data_dir / f"*_lp_{cap_num:04d}_dat.csv") + clk_files = sorted(glob.glob(pattern_clk)) + dat_files = sorted(glob.glob(pattern_dat)) + if not clk_files: + raise FileNotFoundError(f"No LP CLK file found for cap {cap_num:04d} in {data_dir}") + if not dat_files: + raise FileNotFoundError(f"No LP DAT file found for cap {cap_num:04d} in {data_dir}") + return Path(clk_files[-1]), Path(dat_files[-1]) + + # --------------------------------------------------------------------------- # Clock edge detection # --------------------------------------------------------------------------- @@ -102,25 +130,44 @@ def find_clock_edges(t_clk, v_clk, threshold=0.0): # HS burst detection # --------------------------------------------------------------------------- -def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0): +def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0, single_ended=False): """ Find the start of the post-LP HS burst in the DAT trace. - For LP-triggered captures (trigger = DAT D+ falling at LP-11→LP-01 transition): - - CLK is in continuous HS mode throughout (215 MHz running) - - DAT shows LP-01 (diff ≈ -1 V) near t=0, preceded by HS data from the - previous line and possibly an earlier LP-01 at the start of the capture - - LP-00 follows LP-01 briefly (~50-200 ns), then the new HS burst begins - - To avoid the LP-01 from the previous line (at capture start), search - from N//4 onwards — the trigger LP-01 is at the capture midpoint (t=0) + single_ended=True — LP files (CH1=CLK+, CH3=DAT0+): detects LP-01/LP-00 + as DAT+ < LP_SE_LP01_THRESH_V for ≥ 20 ns, then returns + index 50 ns after the plateau ends (HS common-mode rise). + Search starts at index 0 — LP-11 pre-trigger (~1.2 V) + is well above the threshold so no false matches. + single_ended=False — Proto files (F2=CH3-CH4 differential): LP-01 detected + as diff < -0.5 V for ≥ 20 ns, search from N//4. - Returns index into t_dat just past LP-00, ready for CLK-edge sampling. - Falls back to original std-based method for HS-triggered captures. + Returns index into t_dat just past the SoT preamble, ready for CLK-edge sampling. + Falls back to rolling-std method for HS-triggered captures (differential only). """ dt_ns = float(np.median(np.diff(t_dat))) * 1e9 N = len(v_dat) - # --- LP-triggered path --- + # --- Single-ended LP path --- + if single_ended: + min_lp01 = max(2, int(20.0 / dt_ns)) + run = 0 + lp01_end = None + for i in range(N): + if v_dat[i] < LP_SE_LP01_THRESH_V: + run += 1 + else: + if run >= min_lp01: + lp01_end = i + break + run = 0 + + if lp01_end is not None: + skip = max(1, int(50.0 / dt_ns)) + return min(lp01_end + skip, N - 1) + return None + + # --- Differential LP-triggered path --- # LP-01: D+ = 0 V, D- = high → diff strongly negative (< -0.5 V for ≥ 20 ns) LP01_THRESH = -0.5 min_lp01 = max(2, int(20.0 / dt_ns)) @@ -138,7 +185,6 @@ def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0): run = 0 if lp01_end is not None: - # Skip 200 ns past LP-01 end to clear LP-00, then hand off to bit decoder skip = max(1, int(200.0 / dt_ns)) return min(lp01_end + skip, N - 1) @@ -182,17 +228,25 @@ def find_hs_start(t_dat, v_dat, t_clk=None, window_ns=500.0): # Bit decoding # --------------------------------------------------------------------------- -def decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx): +def decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx, + dat_thresh=None, clk_thresh=None): """ Sample DAT on every CLK edge (DDR) after hs_start_idx. + + dat_thresh: voltage threshold for bit decisions on DAT (default: DAT_THRESH_V). + clk_thresh: voltage threshold for CLK edge detection (default: 0.0). Returns list of (time_ns, bit) tuples. """ + if dat_thresh is None: + dat_thresh = DAT_THRESH_V + if clk_thresh is None: + clk_thresh = 0.0 + t_hs = t_dat[hs_start_idx] - rising, falling = find_clock_edges(t_clk, v_clk) + rising, falling = find_clock_edges(t_clk, v_clk, threshold=clk_thresh) all_edges = np.sort(np.concatenate([rising, falling])) - # Only edges after HS start hs_mask = t_clk[all_edges] >= t_hs hs_edges = all_edges[hs_mask] @@ -204,10 +258,9 @@ def decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx): bits = [] for edge_idx in hs_edges: t_edge = t_clk[edge_idx] - # Find nearest sample in DAT trace dat_idx = int(round((t_edge - t_dat[0]) / (dt_dat * 1e-9))) dat_idx = max(0, min(dat_idx, len(v_dat) - 1)) - bit = 1 if v_dat[dat_idx] > DAT_THRESH_V else 0 + bit = 1 if v_dat[dat_idx] > dat_thresh else 0 bits.append((t_edge * 1e9, bit)) return bits @@ -388,6 +441,18 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True): print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})") print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}") + # Static-pink pixel content check + if n_payload >= 12: + cc = check_pixel_content(lane0_payload) + match_str = (f"{cc['match_pct']:.0f}% of {cc['n_checked']} bytes " + f"match static-pink pattern") + if cc["first_mismatch"]: + mm = cc["first_mismatch"] + match_str += (f" (first diff at offset {mm[0]}: " + f"got 0x{mm[2]:02X} expected 0x{mm[1]:02X})") + print(f"\n Static-pink check : {match_str}") + + pixel_check = check_pixel_content(lane0_payload) if len(lane0_payload) >= 12 else None return { "cap_num" : cap_num, "hs_start_ns" : t_hs_start_ns, @@ -397,6 +462,135 @@ def decode_capture(cap_num: int, data_dir: Path, verbose: bool = True): "sync_idx" : sync_idx, "header" : header, "lane0_payload" : lane0_payload, + "pixel_check" : pixel_check, + } + + +# --------------------------------------------------------------------------- +# LP single-ended decode +# --------------------------------------------------------------------------- + +def decode_lp_capture(cap_num: int, data_dir: Path, verbose: bool = True): + """ + Full decode of an LP capture (CH1=CLK+, CH3=DAT0+) using single-ended thresholds. + + LP files are captured at 10 GSa/s (100 ps/sample, ~23 samples/bit at 432 Mbps) — + sufficient resolution to decode the HS bit stream without a separate proto pass. + Returns a dict with the same structure as decode_capture(). + """ + clk_path, dat_path = find_lp_files(cap_num, data_dir) + + if verbose: + print(f"\n{'='*60}") + print(f"Cap {cap_num:04d}: {dat_path.name} [LP single-ended]") + print(f"{'='*60}") + + t_clk, v_clk = load_csv(clk_path) + t_dat, v_dat = load_csv(dat_path) + dt_ns = float(np.median(np.diff(t_dat))) * 1e9 + + if verbose: + print(f" Window: {t_dat[0]*1e6:.2f}..{t_dat[-1]*1e6:.2f} µs " + f"({len(t_dat)} samples, {dt_ns*1000:.0f} ps/sample)") + + hs_start_idx = find_hs_start(t_dat, v_dat, t_clk, single_ended=True) + if hs_start_idx is None: + if verbose: + print(" ERROR: Could not find HS burst start") + return None + + t_hs_start_ns = t_dat[hs_start_idx] * 1e9 + t_hs_end_ns = t_dat[-1] * 1e9 + hs_duration_us = (t_hs_end_ns - t_hs_start_ns) / 1000.0 + + if verbose: + print(f" HS burst start: {t_hs_start_ns:.0f} ns " + f"({hs_duration_us:.1f} µs available of ~18 µs full burst)") + + bits = decode_bits(t_dat, v_dat, t_clk, v_clk, hs_start_idx, + dat_thresh=LP_SE_DAT_THRESH_V, clk_thresh=LP_SE_CLK_THRESH_V) + + if verbose: + print(f" Decoded {len(bits)} bits ({len(bits)//8} bytes)") + + if len(bits) < 16: + if verbose: + print(" ERROR: Too few bits decoded") + return None + + raw_bytes = None + sync_idx = None + best_phase = 0 + best_sync = len(bits) + for phase in range(8): + rb = bits_to_bytes(bits[phase:]) + si = find_sync_byte(rb) + if si is not None and si < best_sync: + best_sync = si + best_phase = phase + raw_bytes = rb + sync_idx = si + + if raw_bytes is None: + raw_bytes = bits_to_bytes(bits) + + if sync_idx is None: + if verbose: + print(f" WARNING: HS sync byte (0x{HS_SYNC_BYTE:02X}) not found in any bit phase — using raw byte 0") + sync_idx = 0 + else: + if verbose: + t_sync = raw_bytes[sync_idx][0] + print(f" HS sync byte found at byte {sync_idx} (t={t_sync:.0f} ns, bit phase={best_phase})") + + data_bytes = raw_bytes[sync_idx + 1:] + header = parse_long_packet_header([b for _, b in data_bytes[:8]]) + + if verbose and header: + print(f"\n DSI Header (lane 0):") + print(f" DI = 0x{header['DI_raw']:02X} → VC={header['VC']} DT=0x{header['DT']:02X} ({header['DT_name']})") + + lane0_payload = [b for _, b in data_bytes[1:]] + + if verbose: + n_payload = len(lane0_payload) + n_pixels_partial = n_payload * N_LANES // (BPP // 8) + print(f"\n Lane 0 payload: {n_payload} bytes decoded (≈ first {n_pixels_partial} pixels' components)") + + if n_payload >= 16: + hex_str = " ".join(f"{b:02X}" for b in lane0_payload[:64]) + print(f" First 64 payload bytes: {hex_str}") + if n_payload > 64: + print(f" ...") + + nonzero_idx = next((i for i, b in enumerate(lane0_payload) if b != 0x00), None) + if nonzero_idx is None: + print(f"\n All {n_payload} payload bytes are 0x00 (blank / border region)") + else: + print(f"\n First non-zero byte at payload offset {nonzero_idx} (0x{lane0_payload[nonzero_idx]:02X})") + print(f" → Corresponds to pixel group ~{nonzero_idx * N_LANES // (BPP // 8)}") + + if n_payload >= 12: + cc = check_pixel_content(lane0_payload) + match_str = (f"{cc['match_pct']:.0f}% of {cc['n_checked']} bytes " + f"match static-pink pattern") + if cc["first_mismatch"]: + mm = cc["first_mismatch"] + match_str += (f" (first diff at offset {mm[0]}: " + f"got 0x{mm[2]:02X} expected 0x{mm[1]:02X})") + print(f"\n Static-pink check : {match_str}") + + pixel_check = check_pixel_content(lane0_payload) if len(lane0_payload) >= 12 else None + return { + "cap_num" : cap_num, + "hs_start_ns" : t_hs_start_ns, + "hs_duration_us" : hs_duration_us, + "n_bits" : len(bits), + "n_bytes" : len(raw_bytes), + "sync_idx" : sync_idx, + "header" : header, + "lane0_payload" : lane0_payload, + "pixel_check" : pixel_check, } @@ -450,32 +644,175 @@ def compare_captures(cap_a: int, cap_b: int, data_dir: Path, n_bytes: int = 128) print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)") +def compare_lp_captures(cap_a: int, cap_b: int, data_dir: Path, n_bytes: int = 128): + """ + Decode both LP captures and report byte-level differences in the first n_bytes. + """ + print(f"\nComparing LP cap {cap_a:04d} vs cap {cap_b:04d} (first {n_bytes} payload bytes on lane 0)") + + res_a = decode_lp_capture(cap_a, data_dir, verbose=False) + res_b = decode_lp_capture(cap_b, data_dir, verbose=False) + + if res_a is None or res_b is None: + print(" ERROR: Could not decode one or both LP captures") + return + + pa = res_a["lane0_payload"][:n_bytes] + pb = res_b["lane0_payload"][:n_bytes] + + n_compare = min(len(pa), len(pb), n_bytes) + diffs = [(i, pa[i], pb[i]) for i in range(n_compare) if pa[i] != pb[i]] + + print(f" Cap {cap_a:04d}: {len(pa)} bytes available, DI=0x{res_a['header']['DI_raw']:02X} HS_start={res_a['hs_start_ns']:.0f}ns") + print(f" Cap {cap_b:04d}: {len(pb)} bytes available, DI=0x{res_b['header']['DI_raw']:02X} HS_start={res_b['hs_start_ns']:.0f}ns") + + if not diffs: + print(f"\n No differences in first {n_compare} bytes — data content matches.") + else: + print(f"\n {len(diffs)} byte differences in first {n_compare} bytes:") + print(f" {'Offset':>8} {'Cap_A':>6} {'Cap_B':>6}") + for offset, ba, bb in diffs[:40]: + pixel_group = offset * N_LANES // (BPP // 8) + print(f" {offset:>8} 0x{ba:02X} 0x{bb:02X} (pixel group ≈ {pixel_group})") + if len(diffs) > 40: + print(f" ... ({len(diffs) - 40} more)") + + if len(pa) > 8 and len(pb) > 8: + pa_arr = np.array(pa[:n_compare], dtype=np.uint8) + pb_arr = np.array(pb[:n_compare], dtype=np.uint8) + xcorr = np.correlate(pa_arr.astype(float) - pa_arr.mean(), + pb_arr.astype(float) - pb_arr.mean(), mode="full") + lag = int(np.argmax(np.abs(xcorr))) - (n_compare - 1) + if lag != 0 and abs(lag) < n_compare // 2: + print(f"\n Cross-correlation peak at lag={lag} bytes → data may be shifted by {lag} bytes between captures") + else: + print(f"\n Cross-correlation peak at lag={lag} bytes (0 = no shift)") + + +# --------------------------------------------------------------------------- +# Pixel content verification and anomaly analysis +# --------------------------------------------------------------------------- + +def check_pixel_content(lane0_payload: list, n_check: int = 60) -> dict: + """ + Verify the first n_check Lane 0 payload bytes against the expected static-pink + pattern STATIC_PINK_LANE0. Returns a dict: + match_pct — percentage of bytes matching expected pattern + n_mismatches — number of mismatching bytes in the checked window + first_mismatch — (offset, expected_byte, actual_byte) or None + n_checked — number of bytes examined + """ + check = lane0_payload[:n_check] + if not check: + return {"match_pct": None, "n_mismatches": 0, + "first_mismatch": None, "n_checked": 0} + mismatches = [ + (i, STATIC_PINK_LANE0[i % 3], actual) + for i, actual in enumerate(check) + if actual != STATIC_PINK_LANE0[i % 3] + ] + return { + "match_pct": round((1 - len(mismatches) / len(check)) * 100, 1), + "n_mismatches": len(mismatches), + "first_mismatch": mismatches[0] if mismatches else None, + "n_checked": len(check), + } + + +def analyse_for_anomalies(result: dict | None) -> dict: + """ + Summarise bit-level anomalies from a decode_capture() result. + Returns {"anomalous": bool, "flags": list[str]}. + + Checks: + sync_byte_not_found — 0xB8 not found in any of 8 bit phases → + HS burst may not have started properly + sync_byte_late — 0xB8 found but at byte index > 5 → + garbage precedes sync → possible byte misalignment + unexpected_packet_type — DI data-type not in the expected set + pixel_content_mismatch — Lane 0 payload < 90 % match to static-pink pattern + """ + if result is None: + return {"anomalous": True, "flags": ["decode_failed"]} + + flags = [] + + sync_idx = result.get("sync_idx") + if sync_idx is None: + flags.append("sync_byte_not_found — HS burst may not have started") + elif sync_idx > 5: + flags.append( + f"sync_byte_late (found at byte {sync_idx}, expected ≤ 5) — " + f"possible byte misalignment" + ) + + header = result.get("header") + if header: + dt = header.get("DT", -1) + known = {DSI_DT_RGB888, 0x39, DSI_DT_HSYNC, DSI_DT_VSYNC, + 0x31, 0x11, 0x29, 0x08, 0x09, 0x19} + if dt not in known: + flags.append(f"unexpected_packet_type DT=0x{dt:02X}") + + payload = result.get("lane0_payload", []) + if len(payload) >= 12: + cc = check_pixel_content(payload) + if cc["match_pct"] is not None and cc["match_pct"] < 90.0: + mm = cc["first_mismatch"] + detail = ( + f"first diff at byte {mm[0]}: got 0x{mm[2]:02X} expected 0x{mm[1]:02X}" + if mm else "" + ) + flags.append( + f"pixel_content_mismatch " + f"({cc['match_pct']:.0f}% of {cc['n_checked']} bytes match; {detail})" + ) + + return {"anomalous": bool(flags), "flags": flags} + + # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main(): - parser = argparse.ArgumentParser(description="Decode DSI packet content from proto captures") + parser = argparse.ArgumentParser(description="Decode DSI packet content from proto or LP captures") parser.add_argument("--cap" , type=int, default=214, help="Capture number to decode (default: 214)") parser.add_argument("--dir" , type=str, default=str(DATA_DIR), help="Data directory") parser.add_argument("--compare", type=int, default=None, metavar="CAP_B", help="Compare --cap against CAP_B byte-by-byte") - parser.add_argument("--list" , action="store_true", help="List available proto captures") + parser.add_argument("--lp" , action="store_true", + help="Decode from LP single-ended files instead of proto differential files") + parser.add_argument("--list" , action="store_true", help="List available captures") args = parser.parse_args() data_dir = Path(args.dir) if args.list: - files = sorted(data_dir.glob("*_proto_*_dat.csv")) - caps = sorted({int(f.stem.split("_")[-2]) for f in files}) - print(f"Available proto captures: {caps}") + proto_files = sorted(data_dir.glob("*_proto_*_dat.csv")) + proto_caps = sorted({int(f.stem.split("_")[-2]) for f in proto_files}) + lp_files = sorted(data_dir.glob("*_lp_*_dat.csv")) + lp_caps = sorted({int(f.stem.split("_")[-2]) for f in lp_files}) + print(f"Available proto captures: {proto_caps}") + print(f"Available LP captures: {lp_caps}") return if args.compare is not None: - compare_captures(args.cap, args.compare, data_dir) + if args.lp: + compare_lp_captures(args.cap, args.compare, data_dir) + else: + compare_captures(args.cap, args.compare, data_dir) else: - decode_capture(args.cap, data_dir, verbose=True) + if args.lp: + result = decode_lp_capture(args.cap, data_dir, verbose=True) + else: + result = decode_capture(args.cap, data_dir, verbose=True) + anomaly = analyse_for_anomalies(result) + if anomaly["anomalous"]: + print(f"\n*** BIT-LEVEL ANOMALIES: {', '.join(anomaly['flags'])} ***") + else: + print(f"\nNo bit-level anomalies detected (sync, packet type, pixel content all OK)") if __name__ == "__main__":