From 3fbe59cee3cf189659c56532d0743bce6879cb1d Mon Sep 17 00:00:00 2001 From: koder-ns Date: Sun, 28 Jun 2026 04:29:23 +0100 Subject: [PATCH] queue back pressure --- src/queue/__init__.py | 20 +- .../__pycache__/__init__.cpython-313.pyc | Bin 0 -> 705 bytes .../__pycache__/backpressure.cpython-313.pyc | Bin 0 -> 32042 bytes src/queue/backpressure.py | 527 +++++++++++++++++- 4 files changed, 543 insertions(+), 4 deletions(-) create mode 100644 src/queue/__pycache__/__init__.cpython-313.pyc create mode 100644 src/queue/__pycache__/backpressure.cpython-313.pyc diff --git a/src/queue/__init__.py b/src/queue/__init__.py index 262e8c69..d8b99308 100644 --- a/src/queue/__init__.py +++ b/src/queue/__init__.py @@ -1,6 +1,15 @@ from __future__ import annotations -from queue.backpressure import ( +from .backpressure import ( + # Drop-tail ingestion queue pipeline + BackpressureConfig, + BackpressureQueueManager, + BackpressureSnapshot, + BoundedIngestionQueue, + IngestionPacket, + PacketPriority, + backpressure_queue_manager, + # Token-bucket rate limiter (backward-compat) TokenBucket, TokenBucketConfig, TokenBucketController, @@ -9,6 +18,15 @@ ) __all__ = [ + # Drop-tail ingestion queue pipeline + "PacketPriority", + "IngestionPacket", + "BackpressureConfig", + "BackpressureSnapshot", + "BoundedIngestionQueue", + "BackpressureQueueManager", + "backpressure_queue_manager", + # Token-bucket rate limiter (backward-compat) "TokenBucketConfig", "TokenBucketSnapshot", "TokenBucket", diff --git a/src/queue/__pycache__/__init__.cpython-313.pyc b/src/queue/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f97e145c993d81f970eeb807ce8a6bce1ce4e123 GIT binary patch literal 705 zcmY+B&yLeD5XSAKZQ3SjmOs@ZPOIhA18_$OP~n1CpsoV9NLIFWOK8?EV@GI@JOkp! z3-C&oIC0t=qKCcWal1?K;g^|-{e5H4Hcf}bt>Wv8kKaQ=eqnOGf?YtjGZ%a#Imzh> zVHD^Q$%8yR30Mdr>p+Kfq04&EV-ZBG4}CU(0UN@Q#SpUu5|%>BMld=ek4fG+rI4+< zlinrDPpFMcDU~TrrKG;3SWE2fX*pkSKxo~7c&X&FTG@N;+G!&i@urmJN`SrF{*%jc zqdzEP$5YkFg;>0nE1^-{pRw6ZK64(z*wOAggGxbVF6{lYmN)-|M*Mv56u(nzA*YQ$ zc&nA{dmR5?GN9_(HF&30zUlAKpcX&#VtlUTe)l8+oCypNT||WFBYKD!ks>lgf*2tV z9Ntb2?2z+iW8BMe4sNK#ChaTjG`-S4VQ=Pby}H-*7GBujC3Cmi2eu&;eE(#6v!oOz0c_k~u`J>H+?LR$Ni>4OO;Lz#e h`1J#a-yx;+Cpo_AWHh)U*Vtn8=QxXlFA~jM{@*2GO!aRkvC~mpY*g+#H?C3 zr>Eb4@62ET4k+14b52Lp#o*pMckcb~fB*NrxL;c87I00yy7$FbrUcJZM zA6o?B+d@E)gn%VreZq3sDp^Ikw>@E#Z0yr6+1aNrP9M?Qkf`>+s0b7S7txkh4J!jcrRU2MZM521d1AkfNR{n&8Fs7 zju)}~V&uEGS$Xa@{vk+JtgHlO9<5BOR-Y(EZkd*=KT)HWdQn=wqO?{mtw3qziqbmv zwkniWYh_aXczwH|x85*bgfY|vYIj&z-8$5**XruE8uMy3pjP9GT5DJfYf#ptl}Sx3 zuNiqQdS3H*v!Mqq8-zgXQx>Up#^w{}txiGs{Cs#mykR0Vd3sI?%ksPw_RlSdKRNfR zxX+L$&L<+#NFoxJ#p$>t2I5kDA`}zHp4=}+Vl!bm5sAmdxyW2N8i|EvzuSH6Y+Rg- z&(1}|v*B1GBrS+@Qe+k_oIw*IDU7f2Gm)wARKMH3$uIWw9E+b0$NDDbCr^hHq7+Jm z#VFMeOFcbe_sNs2cpoc0c~aaU=H%>;#}ZOJ8VyS)Px`3uZc)VBS*9!|PKCuDYOzNQ zCB(C*B9o`Y$q?0+#o5pTRg~vv!^oQpO-2$6=>2Bu(U26M>q~?pQNs`!w0Mv9#ph$_ z|FAavrwEYLH+uQWlSb&VqLEN6G^6)W4C(Ldo8T?=5{Tjwde9d-OCUNpeDElGA4*V* zq4`97Hk61^&lg1bRCr1jWARuY<_rLigrZ{4sfe7w^Qa*vBn%}sBhH2s$mkI3!-IPlv;E)B-_|rjDjCirJ$tXk+qJC>9Gx zWpOeZ4oLu67Fvl@K%`Ug=#)4Yk47dJ+=ShOx#Yxnx2nFef?+yST`4!2wMRpLi)b= z^t3n?jsg!NGN2NPy%?TMgr@*0b?EV_`AL9D2C|BBT%4bi69Cz)jzY0eVqRjC0w6Qs z&dT8^(A>|umxYUI_ZZZ*?#0lF7*-9zozEE1H7+xlGsK_{$^h7tY&xG*(ZxS<;Mmyk ze&AlW2?|Vw5+NTw7MYz5Perf}fIm|ac`_scebHjiL>Mr}>WC!7M0}nh1I;?abwA;9 z;Mm~k(BRk*-VlmSA!Gm8@Uh|jgGZ1-IMK&&0!?XXMr$t;n+G)F;zU>sO-_dA08}(~ z3_ZjwgwFtJX;LTVB{?C+r$w5@GodIw6DFdOkQUJ3v=pBe!)GFsXpZCJbR-svO%lG- zz}WH-Y4){Q;fondJ2aV?XVVf3o=z5rNAP0J^$+$FyG2||%4UFXVO@|X8P8>Kp#VH`` z979FS*x6Iz7{g;~QlD@vVl4^{Z4%SwlFS87RV3QfCch`6|#a2a)MoQ1{_jRz$v+AihM4mEElss-iSvRe@jDj zF$&Ot<`4@4K4@%*(>6@;k@#7RED}qEL1@L@y8)M(Q@ER#X2P+_1%`fM23AhjIek)z zX%?z(4UIYIu&B@qV5m0i<8whvL?xkYrHC;QLeN8gqr6w2z zg#m*Ih7ut96Z46%91Kccy!t-=3(Ee<{_*27xE@Q#hr;sdM0{>s#(Il}q-k1!L=nTW zsc~7F9A|?XryLzs<`$$9w8Rpc8Mz+GSA~04yTf(YBa~O1JCyO3BPlKW`GZn}Ry7AJ z?8wDsxqlF8E>?62952R!7wb?OaNy2KcjHb?_S<0rwiF}Z&1#eYZ#=$I#XGE_14p2b zcj2j!55OM;<&Mon!$5n`HxQb$ap^RtZBq=948%jnDd5te4nzZ;aI4f~p&ztK6jixH z7Ei^`GH$^UVU}}x5iD{l3f9F~C__?p?d1T~P0Nh8Hz-X+z`03S3N1wAp((MOQC9Hi zNl=a6ENKLF0T%U&pcj6h-<^oe0zpHwbG)77vB*necrFfFG8>6TBQhS3L0(`q#L%0w zA>b>R8Ifi;n^nc7jZi8A1Qk1@0l3pILS-Ax-)7u=TR5Q?j8RG6m6bdLmOkhbor2Fg z`tg6o^Pn$w>a|il8wb^i$0CyqRXNr)q8-JxD;k=em3gA0>qUdEJ(pD320CL78aZ*Mxg^H&#f!;B8LXn=ff9QRz$77HVLsNZnXd39N5lc-3<{~d&MTLm`u%=Xi zl}$ykG7{j9B(25gXnCJJIZ2uT1O~7%|5O-_aj^w#lO)p+nD2}qD|ZS)Ks<&ru*T4t zcw|Zpi6FozC5>Pjk}H&jNa7S*zH(wJ8ks;7tZ#UTlWwt_Rugy&qyT!57zYWcykEdK z@D@n&oCFYc&$iHu)kvDi+4wvpWmE$lGQ0}X8VRac+e}8Do1Z|37zdJo%3-W)Ns5Er z(EE=q%!LD!Qe-ZH(GSWCu}O6fo>EnkNDS}IAvy^eV}LX}5yt>Yw1sLg&0oI&h5(uC zB}Vmm7CO$=DJ&wtd(ybjiJSvTE%$%G5cIeuVCl#D34u?XPz3-Ga;v7shb_;RN3;1h zmF81qn?d$k+O%W});TeeND2WZ=+)eG40tyudu zBlsMnQYpSJ*35=p3d)g1z?nonh9-x^}ii6rSN{Imj zr449Lu?43-51ON)pHi&jpJG1($x(3xi9#zC!TA^j+z82O!9++tEkn?+vm<+jpnVX%5671+cFJp zFPHtPtLw*&t-rEv$^NqIc1=sVru&1M?zbD>Y59Ij%C{p`(~kmormcO+e#_Ous8$X> zz@otMVJ~L++rn{S3>o>CRS<;}Gy-;^jy~I3o*-oBYmC~=yZV0@I0noBAp^z*ESkAP z6lSdE?TzX4oNem3MSBbIPuOi4({U|ZCt&LXO32ozY6=qyjA@JzC-2h8qiEJodz;WJ zbgNHH0b0{o1+9P##$efPYZay~KF8wMj)91PLWN_TD)w;Mw1<(V_&jivG!G(aZueLi znrMvl!($SodJ(>)`6D86drcGTMWutGs7$m`t;O4i5*&&Wm0TcWiAJCO?$Lv6WgPPr zN!!t9#TE_6q%F9UHqlpS2%1W4N_w1q0Yi^WFZis|4)&G5Una7lVXS*@J`s$?&xRri zMKs6RY>inwC`XW-6Ch=lRbF`R8_(VLR$knC`SDASf8%Gb+TSdDqb%k1XX=_BIE1qL zrRVNCh0=-(ht40maP<7qD+h0SIx?Q}w5R<8Py5wS(zEWCryUs=4xc}q_B1Cw%{M)* znX0-6rFi(lbHDW5Co;(auXOG6+WyU83DWny?tRTgqmw$$qFBfD7zPGRw@yGBcvNxd zxU~;W90lgaK!eQipREy+!Ptd5I@Qwha@L3m=CU>N3&2;~#6C(NwQqfBD`r1#4_N#A z3$C&Py*66~Xvx0AA__+v%|~kXOm^j*`1`{E^D{zlFooe5DM)$O3c5hDVcr#RsB}S} zmja_BK{D^kj?RRBIUj*86oo7x7zgrbLa$B?atskLxP;EYj2W7UhCwBs43X*p!>Z8$ zqN==HeK-c0LZum5%EOfmP#j)oB=k~bc79gWR0~w3#mvk_qwyHg75WaV6RZuy(#Y(D zQk6zA^=SnOBB7jU8iWrBb~5-b8KzWw$^?`cXgS8Qj53aV5ORKz`lw2z+)_J0E_F~s z<$-Swq^dfu&U{eS_oL1wR|eFJh+JjO z#i`4uE}i;T-_@;e4!kjts@#;RT9d3=ml3<)-ulkK_XkqzcP7Q17l$)-;?==#eJRu0 z`JjZ8K#x$z-L>o$s%mrFzg4;EZuw^-oeeta zB#GqvgU+F~w)b`n*5Tuam4hYt_z!E{LtBd`fw%gllhD6G)@s56mKGLq{-rZdb3wpj z{?wSx$g}Nf3o=H}WV7|VF`WqMS-Mw86pmYDShaG+kWS$dma%p5zz``YWc0+Ei-liG zXbX=tj~w={FCB1hb%B9#hiB*Duo(5(IhqrOD@Aj0xLu|biktJ{AgrJoFScVH&gOFB znef9nakj>nl}*-L`Anki6IJ9(=6GT&DaxAP>;Q@~JqjMd!2yBtzn~n06B> zQ%5if+{4PasP$8EQQL8tI#=PMgl={WKkqV?Rc!&=s$A4WzSYqBq2nVCNnqx*v${xZ*GPRADPhCFsFuu|8$b93jSEySAlem!PGqyo&t5R4zRUmjTAs|^dSFQ zog)Mw@&@b%rF1oA&#o(b`vQ>}a!iuXS0ra8JYVs7nf!H}O~9qf?Z~H~%T0t;j}ZCR z;DyN`C#^LUUi;E9vcet0Lv?xm_#XXd6n^9=K6{ps-e^a*7AV zx?DllpH~J;2HR}!wYdj36u_26)LIQ&9@Xsd83~kiaozrTNrH1zgqLgrz9*Q~RQ(sC zFk|zK;rmCg+44oxrwMWfpxem;r8Jtuo67zpJwSmoI;d|$qDx$U>C#Kz+Wo+$FFI0D zUa9G|z3uKV0Jd|e@ectuEIW`o01u%;0U^5pRuzW?z5uRSRTBFgIwR!+!xfQ(Oxh`A z9i08h&#`NPJgiqTo1TpdhSb>%~*euM2D_wE9adG#L%GUmLs|LxBVEsMT-bCsJF zkShXYD8fK#Hpe8j_>T#0>k7eb=*`hLMw3;W4X`7D6N7Ucqua2Tc%%YD)X z3ZYe}AWfjyXJHnk{9a3--a?_N)=Z%vppHy*0r>B&y6~m*U%KIHW;RKqK+M}G5{F#Y ztPP%A#VIj!nPMkl;F_K5edK_VP9ah36Y+SI_t0r5BX0hQSMztDVBVzX*%Q)f6kW5j z`%fD58pFAc$>;GVCJ{unH%YPVa}rs()Hb9U+(T}s-mtcP6$O1%(oU+hkCMZbJV{A_ zl4mF(nNGKqwIeDS%klvrTUFm+<}Qe{fWZzKOK5_0nvJ6y}PMUIZ;MvtTZZi}$r z@;S?0Tce{sQ`^NqH|}AdcZy5@)`9%xy_O=>tQLCwcWqUuSGSgZ-YIVXTL-=`_c$G$ z%k?%#+p^p0Si9`9I=0+(3C^L1-ZAPA4M4wuV{}>W6O3}bW126SVhqL+ zV?c-vV~Z1Bup)eSso#qw7yHh<$tW+8irIIMT3!lYSQ+Yj>3g7DDw(P9RVtPH3|?`V z@~0y+AKylW#U01yV`OFl$=8f5%*Wi%!YKL+2Zi~iHayR!T=5WkYnIoU|MW+6{kJ_* zH!3J)6cZBEBdlho+4W!cC{;52_u%+vV}?>Zf$L}XDwWw%7V8w8m5(u;Qp&=t!$CD% zMD>Px3S-(Cv?%pZa+Z>pC|RInkrEQ`@*ScH%APccms}@5SH=BC5 z%mU5GX$iMB4-E>cqJ>gR6AYgbLjahmcxa5QG=|QEC3qx*s)UH5Ajy9~0ClR13Qskg zWwROmA_gc?LL-%ZNN7fP-KoTs)ns^D&hBgKQ}+5yQ!C~wBX)gg@8Z)%&*n^55i)RD z?*AOphfJ3hF+=*4QLOB^E8s+22(4Uo(pUs-alp;~mFysoHa)>sb)bU#uBDQ354_`L zIezTYfLAJ;Dfd+&P-le2PRbD%*csRTX3#n593@|&WGxcKj_`_vQk4f^836ux3griV zcBM!SLQw2v2UYA+curD^C^{CtVZ{E73_cqS430c`zGI8Z) zd{u0)RB4te>#Uq(m#XD4FZqb8v}*I>H?L5Rx66mnI>bMhC+!lGE-~#|mvpUr%X-tb z{&q>(QX=hLm-Mbnm2|(gHNAdUa{aDU$u6dfbS6EWX-{|3)BRS(EzkP9Ho>!-VQh}s z(vH^Iybu}}nipELbdhIo#*(9U@PeFFgW03bRv#2w7EJS3S~ETy1F>R5EHgKE6Xi3g z6A~>gr9%f_VaiZLASAyv6RQ9${AH?YnaeF*wLV$3KIQFwTfV+Ez2%AImM2mhjwHQD zZn%!{wN3sM5=}4QlQAtW!5AeQf{o8>N_fuEQ;hI@h3-gqmwu5FhChTg(nU&OtrFOX zOj*t^P$?y}c4d-*z9!sR({%3W#Q+81w0GRFH(uGFvUmJg?7BV$13K}W>Xd!wNA=BD zC$Br+J@BJ#$@-@_e|l=!ZgceBbu(ea<`}W4atYn?A=8^C@n65xHEFs3OUTM4XDn=| z97KqP>mV%00V3pJfi+G9=u{iRb!xP59faGkARUmnIz2?EP7l$k*F$t_wGf>^gG%Je z0*#!+NnVh)a)kY_QRnSontFh^kHh^p1<`e{$3&y0!SgN7@JR&B9P-|%(@wF1g z;D8nJa7tY)Q3JH$Y(kJBOd6;~(LBao1FG3b0UdL9XXdin{D)G0O27Ec2%2_B$7 z!FNGWI2Qvq2|#i}iy@an)6)o;Vsf{Lz})CUFKdW<6b(_`>P80LNJAShAe%`M`Ly4? zh4z6Qg$PB_u{@p=uT!C%7dL?jDT+*E;q|0$Q;3BYu6yz%ycF-vbj7wt|FXd@yn zi}t5bZ&X>2MS@XehDfnxNs4l>42UV+Zx9Y># zy~)3k*92POB{LCZ9?NcvF5$RI*2_lw zX}%%E)`6YyD@U%eYPs1@Gw+)w6Xiq$>b@w$TfndQb)rnW8!OlDvY**lz}%=mZsc(@ z6&0cO2*iUv&x{{T`%;{U%?FQU@&wcZO;g^AmZis#D5Y9nkYcf5zS%*!PTr7FsAXmv zrOk-KM%>oJ^bV8MvKNg&hOKK(*KNDmwk6fJ4Uu=A+qLW88hYo*wIk`?1IgY4soI0c zExBDU{)!tnrMH{czW%~@UPv|fQG8w_mtBqUCRj1q^@d%QUG+IaiF#?Ujai=+%^dG$ zK1&vDgs>^-5vnq$wML)E^$1W=DA1A&E-Y7 ze{@V3tIC#YcLB?BYrsmO=~-JfMqq@PT6L^idnWs-t$+{!hNo(?CEA_pTP7ts??SX8 zSx56OJ=#$BpRRBvIc6Mq7j&}xYpgXthajh|BE7Ge?-}PqAu6tHi$V0)9tzPS?TsSA z#H^nkE89lgusz_~VMR2YP3!5@bk4iFRz4Ld&Ux3Toi!}pf2>(MZtFw@UZ)Wo&Bo(4 zpO>+KZWhxw zDl?JFxFIZGgCTiU-GMqKGR;H{Nu%~SCv`(4%iS``_N5N;-KQr{W-YIJ;}E^GibX&A zMO#kAT$KRg!33lY!}e5lLu#&4L3>Q~SLG@v`EOad`xEcTZhF(DQ`+IlT_Y$}V}t>y zZ1BXV4KBu1AoSR+im_rhjZ527M!Pg4XJ`Wwb~?wVzAziyhiOCY`-$!4X2K#@P=5w^fkPUd%+JLTO7%IEn~}kyQQcJ@miZ&phiIn-p_(W z76E%5|KjGnCV02vszPEIbr&=vCY<_x)k;Y) z3n1w`G|ZQfDCI%YhZll6Msf>&3soSoN~)t;&S-oFBDhk7E$9^R$nE<7gvx)Fl5><$ z92j>u)zIBxB#I-<_CeTbn+wc8Bm;53*_Wx*rODSa5nbi?{o!}?5p)9areNeyc-HK#=|E>CM@Q*h3epJ^) zuPnc?^Zd?qNn5g{?dsZei7#2=L*yHkR9@I~e$TJQ(kKoM81$m%ni73+cKI$+`{KojLg(oIjE&AIhLr?| zIjY>OS$C(Z{u8+qc<}ya@z7S=@0E(f4%?61;&7eqM_b*))}r5cxQDBYe!tE=+*KeH za2$y6=(`VN1w#K!MO90{vV+3%Z4;nJ5D;AGXx?SEQA874LWlr+)F44@1Ay`bp$`Hm z3#HRV+UyHe5z<7&9iv?edDn4!RwN2Qv_;I5Di%Q;Q-zR6VYWW|;=daU&w|vjE&nP* zAhu_WQf1roDU%T{a0&C^a#cU&%4k5{j0!qPQP|8fs0Q0TBzl~w%vnB^4dSC7sPE9i z_^_Y#06;U!{Hoa9*^Vzf9-PrG<{M|Ek-06ihRzfqk`7f>AQXjdLRAK$_v) z$q(U!kB7lv6*{KYxI%R7t9Z>hVYyPMsJUGB&9YRvxa7#-e$Ayd=?Y)6!gqb$jSAn5 zivA^MrlcZS(tNe^&E7Y9)9t(Qce}CWb^mw#sm7kSjwc&8Uk@i6b}k*p(AXR-;1SL*?GrXvsAitW~mf`x|N+bD>_)lCoF#Ay{%3Anr!bk zx%aKljpWeP#TP!^dMgyzYN8rul}&`${=Z*w7PjJ;%Ez|e{QXy)&9LG?JB*vrj$0x2 zU>lz2V+_p@)eQeD zqln}rTM>jsnTSU&B$?^v!r!D>CIboEp;ugbq2zo?+SQzNHQ#czu5_U4;3o6gX8|`{ zpO(c9g(EGAv5@ULC*o2YDKY2#kt*dBUd=$IMqVhsg2T?Q`tND4PRy3sG?t5$efk0;xi~PABfQ+{*ZwftNxG%Qi)b!cSJlL z6a2c9-tHT&?vly^ z2_HNriD)`k2ERggKS$*yQdW?n3`04iF>K{g&_ou06^BHAB~|3YKH@VokggCi!vmZY zCl73bTFT;~^2IKfHiT8I>RJ`j9c9=rX_=N*q)S^ax;fdf z`F35?m8ai~z7b8Y-IK&$^PV46r0Vu(nmRHazT4}1-+KD`Q>k^^@0AvnyYC7`#ctYu zA>Jx&hK|O<_Vgt^eQD2@q-V=b&o+iwK*$fX(_b{$Yd>1<-Z+x(+?~XqclQmK z8nLD0IiAv2%_nFF$Cwt{=4pIJ1rgf~ZzV4g+fan(XKc@wkEx}`4>PY3Jq;xvt0QL1 z`alE3Ka7v6_h!>@oEZnO+Z=6ZbYI<_|0+COKhI1QW^I}9LES{Ll|6;NY1%9DYcLzF~hBGlXmuXR#pY5FXvc znhnN=IUbU^u}F)=MhPC;w2=;~%o=X}?!V%Lzq^M8pN>l3!gr?GF{}0B!93e=Kn-uA z5MYC^17R{<{!P==weD=|{i@*{#FFgbK99jpY=cN>2xVC%z2#}WTO)Yt{;Fo(t(tY5+rRG> z_xIX<*z4ZkUj)+02qrVOGuSgqI)PE6=yiN7HtCy7j0}ng!>D^vDB4nb5efG4{UP1Q zDVe2Yj*`zKfj$It3lHR$ewi}t5|iqPOk{&LAG+=28dd+7l>9m+Op*L8x}%+d()*PB z9wpzUgl60{Wb+CYP|`+|ycajL<7BPF_0uOUc87ntdce`}(?eFTqjtI7g{0c)5SQz% zj`n4j#o=3amO0$ZqQmjna;?=d@W55$5ShxtnmmR>o19ppi-;yf!}_I?8q0mI_8U*A zr2Qk~IHPhMDziKFL_&mXQa(GCFi<2pXIwbcvBGdVF`tHr2dso|*>IDFm7vp&Wu-*I zSs2l1kWM3J^P+gw92U_CR=R3beh)e${g{$^N=P5!gKVU`54D8e&5mY7&?^tauc%Gg zYctIv0$`7SXjdUa_07RCfRfy()dJukP-r_R?CQ=5m*ki!#^G312B2tLBO+BgSjtp| zLj{eL#yb_a1|6KydYUpk*qjExX*`jDIYfnx(=>3H!Nv?Me}Z--O6c7(K?-20Zb;c1 zG8{NOt*rgfUdy3LmCXS|!ecIASU7K~ehOcib_!p>ixc@+STCKthn+>aXY#Rg_)y{+ zFGkQW3+~m#W$aa`R)c&?0%dBDuP0Ed2Kkl-JlLV&^wlUO1^`FaU~nqwKn#CU%Z6mj1{QJEvF^=*?+&Cowk+9ix*BjM+T|C%`9i9uYsvnK zd!<=~hH9ft8$F<7pTa|hgYnEHb1r@&WJ(7%+E*_#u(OaUkl+GOxV2}qpUn3}2US@5 zNub;e@0A{W1Tod@y(WSnYMK~CAiUy5nlK3D(I*wCMsvfi>^2djpN7pTn{h_*GN`K9 zp!_jP!4LrWPF`lkhJ(bEBGoselyc<98VH7_rt%EfMcQI8QYXKf*-2<{7A>9=ZsU}( z*wB{F* zUh#HoJ2^^^Tso4jUZ1R9|JJd0#;=XPJCf`hzHhUNl^5-=df>Wk>%8S@zFR7Iws2(9 z!Gewu@EJ1Qz)c}_PnFqGFaOd=ib!rudwwa&+zee>aYv`$nbuH`L(qE#QkJ*3d&Xjt zx0;5pJ3K~@9mFnpwp#xhz2lRrg-`NnG_GKUxgQW#Ns+QjopB|T8C)&cK(olzadrPU zDcz9HULIA&x+`Z=-Y)WzdA{kn>P(ie{kDhBuc4y`aC98J{SE5rH2O(!+Xw4x@0D68 zt#c1{<(&jax(}PBdbZ?0<4JJP5L;-TbXu96uL4ng7PzRv&rawS%_z2krrOwSqmMJj zQ*2BYI8N0p)*7WVGEHrzCOTrkZq!nW`X7{gkdi}4@|Lw%o%>*13X<(#Sz| z*uw4jSDk#r7?336txI}4-nPF}_WiO{_v7y#N_7w6*umR$nBdnBzbaqZra5W{lI;UO zs7kgErRomc^c=ibB6zD-atnQdFQGsG@C#i2Ob0PAb}RjNg7wn`Yha9a4oG=OJ4sHH zF`A5!&M)0Zpzgrq%rr(vnq}7_4vs=h4Ldc8p94iG_e#?i+i$uDExATdDq#!k5MN-X z#*;@OgIN>I!;TfqdT}2e{U6rmqqjuMD5nQH-vlJo3kE4b7cKxgOev2fujUk((a0|j zo&H_4#q_h(3O&QX>)GdcR-Qqf10WM#2D9AO%tk|Sqn7WhMi4yW)m z5ZRD;;-#q)UU_WLV|&lz9&9WS!1Wubv>MEf=R{-S*ik+2f`+2QEnCEd*b0(u%u0>=%V&qR zoi7UpYX)YfljJKLO6mL@w%LT$9VJSGky7RqJwwpmjxiXZ-S+YkO%}I}2*5P|TS^hLJbet7B*vu+6#OX*hsf?|L zYltB4NjCJnwI$WC;i5BB)o^Lxc60Yz)oI`Eq;GesdEnx~jBf+QSYPc(RS}6ig z%4^jK&JuiowMEV0K7@ZhXdT>vBX`~VTt$;AJz~(Md;GWD--jE9HtIC6QZ>nY zwW-TSAbAO}M7MG91i@4)r3_*bvJs%Hiqd8dMT6FUhCtYYrvXCJyV{dp0w0|{u_fu< za{XY+yYq%?CtnqI)YC4y|9cMa*$&M@l{Y-%NSgks505FKx%#I>6_qaYLFQ?n3p7v> z^%-R-ug)3j_M~_FyUvvN&<)q2l>|Va$-Q{|v(2Pt5y~6r`)E)leQX|c=p7X*1?Zh7 z*?);f)r5*Gr*C7@yD{b6e8aVwkBCO0UHXWIkn`C_WZYS*S&FR^!%mVLa;yxFc6>BrXTUwcKCg6KZOWSPhg_!qb+gt+H(+{1X5^z&ej{%vC;HX=eP> zCLm}XspAtM;|FtMmQFk^0$_AMt-45Tobx%@Dc?+jR`Fl4oK`Gna-getw8cvYge&dl z=x?m^O9bsL9NN7b-x;|!lJ4D;?A>#-ckrfn-v_;eZwy@>ylA=Hf2sdlyT0j7_6~mF z-FL&a?|-Gd`xwFewWa|lI>h8ICV?>|m_Uo5vUF6>;u_txo!cEOL4#)-wTT?#tfPd) zHIO@KLrPIYPR@urP;O9Ibah4&P~dZ!Rx;6Q)Hg}2Io1eMZHzB5(W%s@7EATfprgkA zb2VwPg?>Hp1j>3v!*By?ROa(O{H7L-3)-ylN69QSG^RW0JeV;ct}&4fnyXE1bXQN$ zdMRn3q>&OE6;MJlzx;J%k=nHD+%U|cy9b$`)RSr1LE2G|svPw!*TU)k(`t)j7Xl!M zEhDI9a|~N>ER=d9;si*R^Vk7?!=9v^Ww!;VIy!BR?P~4q8F3TU*^VmMXXe=cQ@7L6 z@u1w|*smU_M4dxS3bY6QQ)=vN26o~cKbPSO3Y5kPLtMHVyR9B%!G(O9D(Ma-)Sgn8 zASb-4-;h$S{${XG2@EOQjeB;}N#AlF#Y)s3beSUizA#iAL(72#bUWH-_QA%Vv021>0!xRAgqFur&D(DXjrS_?%*D>6rS5^%W@)=CAfdAMc1zt|0f}0Qy9Z@^EtXB_G13PIt(z?SE%yYLp0WsQHl`Xk-4N>j#_lb# Kex-=P_WuGV1M6)7 literal 0 HcmV?d00001 diff --git a/src/queue/backpressure.py b/src/queue/backpressure.py index 7f5d6e66..74a27f4b 100644 --- a/src/queue/backpressure.py +++ b/src/queue/backpressure.py @@ -1,14 +1,526 @@ +"""queue/backpressure.py — Backpressure utilities for Soroban RPC ingestion pipelines. + +Two complementary primitives are provided: + +1. **Token-bucket rate limiter** (``TokenBucket`` / ``TokenBucketController``) — + controls the *rate* at which callers may consume capacity. + +2. **Drop-tail ingestion queue** (``BoundedIngestionQueue`` / + ``BackpressureQueueManager``) — a capacity-bounded, priority-aware FIFO that + automatically sheds non-essential *historical tracing metric* packets once the + buffer reaches 90 % capacity, keeping the primary live price channels clear. + +Drop-tail threshold policy +-------------------------- +The ``BackpressureQueueManager`` enforces a two-stage policy: + +* **Slow-down stage** (default ≥ 70 % full): a proportional back-off delay is + injected on the producer side so upstream callers naturally pace themselves. +* **Drop-tail stage** (default ≥ 90 % full): any incoming packet whose priority + is ``PacketPriority.METRIC`` (historical tracing data) is immediately discarded + *before* it touches the queue. ``STANDARD`` and ``CRITICAL`` live-price + packets continue to be accepted. This prevents a burst of low-value telemetry + from evicting the financial data that downstream consumers actually need. + +Packet priorities +----------------- +``CRITICAL`` — live price data that must not be lost; blocks until space frees. +``STANDARD`` — ordinary live price / rate-fetch events; dropped only when full. +``METRIC`` — historical tracing metrics; first to be shed under pressure. +""" + from __future__ import annotations import logging import threading import time -from dataclasses import dataclass -from typing import Dict, Optional - +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Any, Dict, Optional logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# Packet model +# --------------------------------------------------------------------------- + + +class PacketPriority(IntEnum): + """Priority levels for ingestion packets. + + Lower integer == higher urgency. The drop-tail policy sheds ``METRIC`` + packets first (highest integer) when the buffer queue nears capacity. + """ + + CRITICAL = 0 # Live price data — blocks until accepted, never auto-dropped + STANDARD = 1 # Ordinary live price / rate-fetch events + METRIC = 2 # Historical tracing metrics — dropped early under pressure + + +@dataclass +class IngestionPacket: + """A single unit of work queued for downstream ingestion. + + Attributes + ---------- + priority: + Controls how the queue manager handles the packet under backpressure. + data: + Arbitrary payload (price record, telemetry frame, etc.). + timestamp: + Unix epoch milliseconds when the packet was created. + """ + + priority: PacketPriority + data: Any + timestamp: float = field(default_factory=lambda: time.monotonic() * 1_000) + + +# --------------------------------------------------------------------------- +# Bounded ingestion queue +# --------------------------------------------------------------------------- + + +class BoundedIngestionQueue: + """Thread-safe, capacity-bounded FIFO queue for ingestion packets. + + Implemented directly on top of ``collections.deque`` and + ``threading.Condition`` to avoid a naming conflict with the stdlib + ``queue`` module (the enclosing package directory shares the name + ``queue``, which would shadow the stdlib import). + + The public interface mirrors the TypeScript ``AsyncBoundedQueue`` in + ``backpressure.ts`` so both sides of the system express the same contract. + """ + + def __init__(self, max_size: int) -> None: + if max_size < 1: + raise ValueError("max_size must be >= 1") + self._max_size = max_size + from collections import deque + self._dq: deque = deque() + self._cond = threading.Condition(threading.Lock()) + self._unfinished_tasks = 0 + + # ------------------------------------------------------------------ + # Producer side + # ------------------------------------------------------------------ + + def put_nowait(self, packet: IngestionPacket) -> bool: + """Try to enqueue *packet* without blocking. + + Returns ``True`` if the packet was accepted, ``False`` if the queue is + already at capacity. + """ + with self._cond: + if len(self._dq) >= self._max_size: + return False + self._dq.append(packet) + self._unfinished_tasks += 1 + self._cond.notify() + return True + + def put_blocking( + self, packet: IngestionPacket, timeout: Optional[float] = None + ) -> bool: + """Enqueue *packet*, blocking until space is available. + + Parameters + ---------- + packet: + The packet to enqueue. + timeout: + Maximum seconds to wait. ``None`` waits indefinitely. Returns + ``False`` if the timeout expires before space becomes available. + """ + deadline = time.monotonic() + timeout if timeout is not None else None + with self._cond: + while len(self._dq) >= self._max_size: + if deadline is not None: + remaining = deadline - time.monotonic() + if remaining <= 0: + return False + self._cond.wait(timeout=remaining) + else: + self._cond.wait() + self._dq.append(packet) + self._unfinished_tasks += 1 + self._cond.notify() + return True + + # ------------------------------------------------------------------ + # Consumer side + # ------------------------------------------------------------------ + + def get_nowait(self) -> Optional[IngestionPacket]: + """Dequeue the next packet without blocking. Returns ``None`` if empty.""" + with self._cond: + if not self._dq: + return None + item = self._dq.popleft() + self._cond.notify_all() + return item + + def get_blocking( + self, timeout: Optional[float] = None + ) -> Optional[IngestionPacket]: + """Dequeue the next packet, blocking until one is available. + + Returns ``None`` if *timeout* expires before an item arrives. + """ + deadline = time.monotonic() + timeout if timeout is not None else None + with self._cond: + while not self._dq: + if deadline is not None: + remaining = deadline - time.monotonic() + if remaining <= 0: + return None + self._cond.wait(timeout=remaining) + else: + self._cond.wait() + item = self._dq.popleft() + self._cond.notify_all() + return item + + def task_done(self) -> None: + """Signal that a previously dequeued packet has been fully processed.""" + with self._cond: + if self._unfinished_tasks <= 0: + raise ValueError("task_done() called too many times") + self._unfinished_tasks -= 1 + self._cond.notify_all() + + # ------------------------------------------------------------------ + # Introspection + # ------------------------------------------------------------------ + + def size(self) -> int: + """Current number of packets in the queue.""" + with self._cond: + return len(self._dq) + + def is_empty(self) -> bool: + with self._cond: + return len(self._dq) == 0 + + def is_full(self) -> bool: + with self._cond: + return len(self._dq) >= self._max_size + + @property + def max_size(self) -> int: + return self._max_size + + +# --------------------------------------------------------------------------- +# Metrics / configuration +# --------------------------------------------------------------------------- + + +@dataclass +class BackpressureConfig: + """Tunable parameters for :class:`BackpressureQueueManager`.""" + + max_capacity: int = 1_000 + """Hard upper bound on the number of queued packets.""" + + drop_threshold: float = 0.90 + """Saturation level (0–1) at which METRIC packets are dropped immediately.""" + + slow_down_threshold: float = 0.70 + """Saturation level (0–1) at which producers experience a back-off delay.""" + + slow_down_delay_ms: float = 100.0 + """Maximum back-off delay (milliseconds) injected at 100 % saturation. + + The actual delay is proportional to how far above ``slow_down_threshold`` + the queue currently sits:: + + delay = slow_down_delay_ms * (sat - slow_down_threshold) + / (1 - slow_down_threshold) + """ + + enable_metrics: bool = True + """Whether to maintain internal drop / slow-down counters.""" + + +@dataclass +class BackpressureSnapshot: + """Immutable view of queue health at a point in time.""" + + queue_length: int + max_capacity: int + saturation: float + dropped_packets: int + slowed_down_ingestions: int + average_processing_time_ms: float + + +# --------------------------------------------------------------------------- +# Mutable internal counters (always accessed under _metrics_lock) +# --------------------------------------------------------------------------- + + +@dataclass +class _Metrics: + dropped_packets: int = 0 + slowed_down_ingestions: int = 0 + processing_times_ms: list = field(default_factory=list) + + _MAX_SAMPLES: int = field(default=100, init=False, repr=False, compare=False) + + def record_processing_time(self, ms: float) -> None: + self.processing_times_ms.append(ms) + if len(self.processing_times_ms) > self._MAX_SAMPLES: + self.processing_times_ms.pop(0) + + @property + def average_processing_time_ms(self) -> float: + if not self.processing_times_ms: + return 0.0 + return sum(self.processing_times_ms) / len(self.processing_times_ms) + + +# --------------------------------------------------------------------------- +# BackpressureQueueManager — the main public API for Python services +# --------------------------------------------------------------------------- + + +class BackpressureQueueManager: + """Queue ingestion pipeline with a structured drop-tail threshold policy. + + Designed for Soroban RPC network-facing services where sudden latency spikes + cause the internal buffer to swell. The manager applies the following rules + in order: + + 1. **Slow-down** — when saturation ≥ ``drop_threshold - 0.2`` (default 70%) + the *calling thread* sleeps for a proportional delay so upstream producers + naturally pace themselves. + 2. **Drop-tail** — when saturation ≥ ``drop_threshold`` (default 90%) any + ``PacketPriority.METRIC`` packet (historical tracing data) is discarded + immediately. This is the *structured drop-tail threshold policy*: the + buffer is protected by shedding the lowest-value traffic class early, + before live price channel packets are at risk. + 3. **Overflow handling** — if the queue is completely full, ``CRITICAL`` + packets block until space is available; all others are dropped. + + Thread safety + ------------- + All public methods are safe to call from multiple threads simultaneously. + The underlying :class:`BoundedIngestionQueue` inherits the thread-safety + guarantees of :class:`queue.Queue`. + + Usage:: + + manager = BackpressureQueueManager(BackpressureConfig(max_capacity=500)) + + # Producer (e.g., Soroban RPC poll loop) + accepted = manager.enqueue(IngestionPacket( + priority=PacketPriority.STANDARD, + data=price_record, + )) + + # Consumer (e.g., DB sink worker) + packet = manager.dequeue(timeout=1.0) + if packet: + process(packet.data) + manager.task_done() + """ + + def __init__(self, config: Optional[BackpressureConfig] = None) -> None: + self._config = config or BackpressureConfig() + self._queue = BoundedIngestionQueue(self._config.max_capacity) + self._metrics = _Metrics() + self._metrics_lock = threading.Lock() + + # ------------------------------------------------------------------ + # Producer API + # ------------------------------------------------------------------ + + def enqueue(self, packet: IngestionPacket) -> bool: + """Submit *packet* to the ingestion pipeline. + + The drop-tail threshold policy is enforced here before the packet + reaches the queue: + + * At ≥ ``slow_down_threshold`` saturation: inject a proportional + back-off delay on the calling thread. + * At ≥ ``drop_threshold`` saturation: immediately discard + ``METRIC`` packets and return ``False``. + + Returns ``True`` if the packet was accepted, ``False`` if it was + dropped (backpressure activated). + """ + saturation = self._saturation() + + # ── Stage 1: slow-down ──────────────────────────────────────── + if saturation >= self._config.slow_down_threshold: + self._apply_slow_down(saturation) + + # ── Stage 2: drop-tail for METRIC packets ───────────────────── + if saturation >= self._config.drop_threshold: + if packet.priority == PacketPriority.METRIC: + logger.warning( + "[Backpressure] Drop-tail active — saturation %.0f%%. " + "Dropping METRIC packet (historical tracing data).", + saturation * 100, + ) + if self._config.enable_metrics: + with self._metrics_lock: + self._metrics.dropped_packets += 1 + return False + + # ── Stage 3: try non-blocking enqueue ───────────────────────── + if self._queue.put_nowait(packet): + return True + + # ── Stage 4: queue is full — priority-based overflow handling ── + if packet.priority == PacketPriority.CRITICAL: + # Block until space is available — live price data must not be lost. + logger.warning( + "[Backpressure] Queue full. CRITICAL packet blocking until " + "space is available." + ) + accepted = self._queue.put_blocking(packet, timeout=None) + if not accepted: + logger.error( + "[Backpressure] Failed to enqueue CRITICAL packet (queue closed?)." + ) + if self._config.enable_metrics: + with self._metrics_lock: + self._metrics.dropped_packets += 1 + return accepted + else: + # Non-critical — drop rather than block. + logger.error( + "[Backpressure] Queue overflow. Dropping %s packet.", + packet.priority.name, + ) + if self._config.enable_metrics: + with self._metrics_lock: + self._metrics.dropped_packets += 1 + return False + + # ------------------------------------------------------------------ + # Consumer API + # ------------------------------------------------------------------ + + def dequeue(self, timeout: Optional[float] = 1.0) -> Optional[IngestionPacket]: + """Remove and return the next packet from the queue. + + Parameters + ---------- + timeout: + Seconds to wait for an item. ``None`` waits indefinitely. + Returns ``None`` if the timeout expires. + """ + start = time.monotonic() + packet = self._queue.get_blocking(timeout=timeout) + if packet is not None and self._config.enable_metrics: + elapsed_ms = (time.monotonic() - start) * 1_000 + with self._metrics_lock: + self._metrics.record_processing_time(elapsed_ms) + return packet + + def try_dequeue(self) -> Optional[IngestionPacket]: + """Non-blocking dequeue. Returns ``None`` immediately if queue is empty.""" + start = time.monotonic() + packet = self._queue.get_nowait() + if packet is not None and self._config.enable_metrics: + elapsed_ms = (time.monotonic() - start) * 1_000 + with self._metrics_lock: + self._metrics.record_processing_time(elapsed_ms) + return packet + + def task_done(self) -> None: + """Signal that the last dequeued packet has been fully processed. + + Must be called once per :meth:`dequeue` / :meth:`try_dequeue` call to + allow :meth:`queue.Queue.join` to unblock. + """ + self._queue.task_done() + + # ------------------------------------------------------------------ + # Introspection + # ------------------------------------------------------------------ + + def get_queue_length(self) -> int: + """Current number of packets waiting in the queue.""" + return self._queue.size() + + def get_max_capacity(self) -> int: + return self._config.max_capacity + + def snapshot(self) -> BackpressureSnapshot: + """Return an immutable snapshot of current queue health metrics.""" + with self._metrics_lock: + avg_ms = self._metrics.average_processing_time_ms + dropped = self._metrics.dropped_packets + slowed = self._metrics.slowed_down_ingestions + length = self._queue.size() + return BackpressureSnapshot( + queue_length=length, + max_capacity=self._config.max_capacity, + saturation=round(length / self._config.max_capacity, 4), + dropped_packets=dropped, + slowed_down_ingestions=slowed, + average_processing_time_ms=round(avg_ms, 4), + ) + + def reset_metrics(self) -> None: + """Reset counters and timing samples (useful for testing).""" + with self._metrics_lock: + self._metrics.dropped_packets = 0 + self._metrics.slowed_down_ingestions = 0 + self._metrics.processing_times_ms.clear() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _saturation(self) -> float: + """Current queue fill ratio as a value in [0.0, 1.0].""" + return self._queue.size() / self._config.max_capacity + + def _apply_slow_down(self, saturation: float) -> None: + """Inject a proportional back-off delay on the calling thread. + + The delay grows linearly from 0 ms at ``slow_down_threshold`` to + ``slow_down_delay_ms`` at 100 % saturation. + """ + headroom = 1.0 - self._config.slow_down_threshold + if headroom <= 0: + return + ratio = (saturation - self._config.slow_down_threshold) / headroom + delay_s = max(0.0, (self._config.slow_down_delay_ms * ratio) / 1_000) + if delay_s > 0: + if self._config.enable_metrics: + with self._metrics_lock: + self._metrics.slowed_down_ingestions += 1 + logger.debug( + "[Backpressure] Slowing down ingestion by %.1f ms " + "(saturation: %.0f%%).", + delay_s * 1_000, + saturation * 100, + ) + time.sleep(delay_s) + + +# --------------------------------------------------------------------------- +# Module-level singleton (mirrors token_bucket_controller pattern) +# --------------------------------------------------------------------------- + +#: Shared queue manager; configure via ``backpressure_queue_manager._config`` +#: or replace with a custom instance before starting workers. +backpressure_queue_manager = BackpressureQueueManager() + + +# =========================================================================== +# Token-bucket rate limiter (unchanged — kept for backward-compat) +# =========================================================================== + + + @dataclass(frozen=True) class TokenBucketConfig: max_tokens: float @@ -146,6 +658,15 @@ def snapshot_all(self) -> Dict[str, TokenBucketSnapshot]: token_bucket_controller = TokenBucketController() __all__ = [ + # Drop-tail ingestion queue pipeline + "PacketPriority", + "IngestionPacket", + "BackpressureConfig", + "BackpressureSnapshot", + "BoundedIngestionQueue", + "BackpressureQueueManager", + "backpressure_queue_manager", + # Token-bucket rate limiter (backward-compat) "TokenBucketConfig", "TokenBucketSnapshot", "TokenBucket",