From 86b28a1b9c54393de6c8e7a45e9642fab6f45b88 Mon Sep 17 00:00:00 2001 From: Jimmy Date: Thu, 19 Feb 2026 18:14:01 +1300 Subject: [PATCH] Restructure: remove old flat src/lib layout Co-authored-by: Cursor --- box.scad | 20 - box.stl | Bin 25484 -> 0 bytes lib/microdot/__init__.py | 2 - lib/microdot/helpers.py | 8 - lib/microdot/microdot.py | 1450 ----------------------------------- lib/microdot/utemplate.py | 70 -- lib/microdot/websocket.py | 231 ------ lib/utemplate/__init__.py | 0 lib/utemplate/compiled.py | 14 - lib/utemplate/recompile.py | 21 - lib/utemplate/source.py | 188 ----- src/boot.py | 19 - src/dma.py | 113 --- src/main.py | 34 - src/patterns.py | 95 --- src/settings.py | 53 -- src/static/main.css | 75 -- src/static/main.js | 147 ---- src/templates/index.html | 41 - src/templates/index_html.py | 94 --- src/web.py | 101 --- src/wifi.py | 46 -- src/ws2812.py | 68 -- test.py | 1 - 24 files changed, 2891 deletions(-) delete mode 100644 box.scad delete mode 100644 box.stl delete mode 100644 lib/microdot/__init__.py delete mode 100644 lib/microdot/helpers.py delete mode 100644 lib/microdot/microdot.py delete mode 100644 lib/microdot/utemplate.py delete mode 100644 lib/microdot/websocket.py delete mode 100644 lib/utemplate/__init__.py delete mode 100644 lib/utemplate/compiled.py delete mode 100644 lib/utemplate/recompile.py delete mode 100644 lib/utemplate/source.py delete mode 100644 src/boot.py delete mode 100644 src/dma.py delete mode 100644 src/main.py delete mode 100644 src/patterns.py delete mode 100644 src/settings.py delete mode 100644 src/static/main.css delete mode 100644 src/static/main.js delete mode 100644 src/templates/index.html delete mode 100644 src/templates/index_html.py delete mode 100644 src/web.py delete mode 100644 src/wifi.py delete mode 100644 src/ws2812.py delete mode 100644 test.py diff --git a/box.scad b/box.scad deleted file mode 100644 index e4ea3c2..0000000 --- a/box.scad +++ /dev/null @@ -1,20 +0,0 @@ -difference() { -cube([30,25,20]); - //hoop - translate([15,12.5,-905]){ - rotate([90,0,0]) - rotate_extrude($fn=300) - translate([900,0,0]) - circle(d=25, $fn=100); - }; - //pico - translate([3.25,3.5,0]) - cube([23.5,18,12]); - //pico usb port - translate([26.75,8,6.5]) - cube([3.25,9,5.5]); - //wifi - translate([2.5,5,12]) - cube([25,15,5]); - -}; \ No newline at end of file diff --git a/box.stl b/box.stl deleted file mode 100644 index 4cff76d132cd1292b657cb428ccafc23716c83a6..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 25484 zcmbuH3wTt;*~do|W34ow1eHfYqtz;CwQ8xhyJTnNs-iC91u6gNMWqOe zm4bp%i3$oO383I@LpFj~1Pc;0Dnt#5gs3QMtXg@zeD7S&zW+HJj^Fcq`!sTT=Ka0z z%$#%P%=~BOTu?fC{E$AXZ}E9Ij2eCYf#U!B`)(iYA(hbo(MLjLRej3}HTR^0q&^m6 z9sTKFa9nEszaeZwF2!1XBQi=2DUEC5>rNR*yBO^fvu>+V;#WONG{JNfapfPX)lto7 zP_DCz=_umHC(^3;n#&ErbQDp0_)F^V2R>&ArlW{A#=oc*|8A`zn2sXOT)0x*I(A>F z%fWOMf!BuV_6dEZvZ~+v%hegF6HN@JqljZWEmyBKoo)!Gqlo8Ut5n^~1{;FuC}Q4@ z3iZL9VTNEjidZyyvHE`YO@?4Pin#X1MXKni+YP~V6mfUW0(IxY8HQjwiugs%Q)>IK zA20;d`NV9x76Z-ouuqh|HbRusIsFGjN?*0_-kz^+8&qce`b6Lf2%_=n^l8pvnDAo+ zezWTQe?6|Qoc%b(4ilWB<1x`;`6KG#WsBP(IK?M=&Y7pWt*B^+;1r*D^`bedW%cvz z5S-!@J5HLdn(uh29fIp!6K|45aBYGzg-ecB*ALnxtzS!DwYB{i?04tD8u`umr4%Qt zn2w5Z$�(t)Hx7I*K?lxme!)&=V#G(^16wrUlYo=d5Dd5h-DQx`gHAQ{SWEx(eO9 zKxnkIJ;Hg4%9UmVnsXQ?I7R1*iP3V7Jm$t(QlIhPQJmrv>k1x}_6ZIXoZ=I|IC+Zv z@sKI)#NZU4s2x5@+LkCBgHwEB%g~W>))7~>6N6KH;>;~WrEO8eF*wC1ZYV2}55F60 zCkChZ#77UEA?=$X9D`GQ;)wo7$tCx8X(tA!_(bycuG02C!ZAedmsMe)t+V%_e-t9j(Vf$KPf>V6r#o`A{J>|#X_Ei%>%W02Cxi&HARR%Q> zv>Dn8+m+zFj$(hBKr4L~tpeJa(?6-q8LK{3t(}8Dr>430iha?5No90PbJDzg!uJ%| z>lpSp@XPeB7dxuEFAF^FjR;$P1wG_|3Fk_m58`2&0(+Y)onn8}e_5$&2TXTYEvxVf z1~xhd6S@`>{|AM3R*HG(34N#7yt?MPnVke&L?^wwn7E1zNM`d(msKs1|-mvPs}=L znF`)sNT4U5z)=Gd=*cHq{UN^yCvQ zOP^FhuLueBIyyiM8VMeRsRG3l}Df_pZI>$ zG*#NPF^@n`KJna)zo~I6^%z9|_zI#YpQvw{qOKqPjx+!e=*bW%YoY(ketnwX)P9Uq z*_~=?%e&HgU@N43VpsJ<)ppGXc?5d$iKfGEQNNq>VIF~=d}7zcaq2hC&3OcR@`-_) z#;BXqALS9~$tMaAzEU-8-kC?BC!bg_@(OjpMXh-Rdh&@6XI`W_pYll_fu4Ngo^6BG zU4P6c(34N-vGNh$<`d{COkiJ)={bY?sm1@cp7u_lRq=#au6%nZJgeF>>HYYzsgb=C zDX_|Zesb&n|>k&T6O7jXzsb$ zdknOCYe|v1v!&rDVxZMUryZDUF~Ttf{p>pftqK2w+1{O)L!__u=neMep#!@ zkHNV@tIs>$FRj)aMW7XHHBkgwy;^spw3>AkfmW>5L=k9p(fYyCYVT15TCr9WMWEHY zhnyBOi1hX#1egn59ugaIDWe!XD;Q0xNggoDYuMFTE!8FUXaDY}%6)YZhpwEdyE~pFjD; z_}5Q~-bD?QP20#li==$JM<4ae)t^ebi={r_+D{+PCE~UmfmT8=Ed=*!uD;{M_;HVl z-d$Y`PDzr#fogZ*E)#>}9ZJNLkBQ!g83L`)ub^Mhx{h1dGk*Aj+TPpVyTg=&doM}3 zWB%{d3&V9UEGhN*CtG!o^Y=vbC1R@~(28mZdq}yS)0>-n#tT;0_MYrwaGwh62R(g9 z#N)371X|H`&H2T8oAZ|*@kPChOFF+j$&`b8NJ%;W_@1h8wC-Ogr9S`Tx4L(}gNUg) z0JOKELq}J(9^1aej_KDyC|g**3Fe$7UOYQ<9SF2R$80L^mQ@41reY7whfTQAfv58e3V? zw6)ZfgL{)n`Lm}xs`q>89&1wS^QV5hiH=7SaVinp4S`ldFf9c4WPc>0l8EB%>C zpeIdK5wXF=;95k#SZ^b5?H->!bVJF7pN}!+;2vvI)^7h&et3-T6(^-WKkSFKbX+&F zd%Ti}a}9x3LNF}^_m(dn*FFBj`5Q_`XKf5l!TLc@6-3k#QECXZLcdsV*_N*HVG~+Q zwj46bl!JT4Njcz<_hn1jE2bR!{F*s0)A3>=Mia5g5NIU?(?W2Md}(vn_)#~smW(>o z#^4mJAM{lCQP+5Od~3-KhCnOyi}m(WRZ;wl3pymWUNYR2gL~ddx%m6ndQD?4TXBzsOijrwyVW&ilKbiR;?K|~yE2qAbRr29dZ z@I5uoZHo;i;-GejH(nVp%a>2^;t6pQ#khcoLPI1aTVXtUm*qxxvlg|*8usgznB2iH zhoARSQ;x<>V<=uF5j%;v#Sqk5=B^a_#cT5^5sgGlG6Y&7jkz7y)D(MTf0_9B>>j3e z^@vVndsB2FaCFCG!^MqFu?LAb@f=I=^`N&5j5E2`*Ag+Eh|>*$R(Lf_6~`WOZe8rN z{^ul)Z|P=orN@Bc4~Lpu;dqcolS2pA#U>K*vLo1+5Nw5II){i!M7&@Kw8ET}npFfg z_UVnav5x5>iJO`WO|JB)QC#?5Le~|JAbI>5Yp9J)e0fNs??;xvaZFB1xP96`L`1c;eQiCD@k` zY=vbyoQRo3+-wN6!km;Uj@|k7vRJa~sKn0F)*Ah+nUSejXjUQMD3!;!r@vYjd-bqU zi39#(3HBufTT#o9TLEz@5w8(ZU0f}WXOh^@GLTpetA@!?LRH{ z>=$DaS6rM=a5*r>P9mNoVptx5RyZ@>VStTsIT89^3jBt1$hn%>cUtVTFUDjBU9_Pl zyjlQfr+IGs`F_)4UE9WFrc=pvjKIpLOYKTobDa)3Z(3|2#V9km;*>DKD#p&*s_iALuTrKWtQH?Vt5E7lKnbp1yn7(}C;DVqXw(nIX^$X)d1- z&n&HqZNGGQX0Cfa^sIOA)Zz>{&z~PfPZZtlnc!~c6wFDfS&eHw?RmN?b`}x;bx#!M zmwFj6!_T#F+&0seWP-Lt&w2+f9L|9AjQfi1wXq2=56PVGS_V$RoRpfC+1Aq&L<}Ng zs%w3)}>6vxg0}ujd*?EqBcym)MMMT2AaX1BYQmWX~nMCv> zBIVv{oL{<^U=;z^LYIoR*yZ{?>-r2l7oU`iPFte7Ty~w*<0yUJ_OA;5*%li>M3L(= zu$2%@3&HCEZl2v1dwIWJnI~Nj0139zy_yQ8s2?Q66LU;2BV&6Rn1ZF%y^IQFdl|hb z#*Y-klto~TB8|DyF}4wLr^^*bM;hb(x7Ar0a{al@Lq|!Rs0Nz98b4(;ZS_dX|`i*8tqC-DDnm{YMYOtz<>+Ox!PVvU3!c3hTtMS^1r0n_nqpJCk zg{B<(yd87yOCzuEh_ItOv=V}8A$VQJ{a3BhdysLfD4~5iti$#!Hu2SigZ9& z3(jjluA%Yy85efQOmSmpPNBT#>IdU(uPlle5;50}o!N@C3|O7Q^|p|7hT#)hYc@MA zgV%T@<&^zrsVU#om~!azR+Bl0h;xZ}$7vaCB?Qw#@cNQdc6N>L9^YCs!D$(sLV9bi zelXr8L=@c6S~J#Z8El1qvEJ16-Q)9zZm1dTv?X3gl9bDa-=~g#>vdBOecozh3y64@ zh(S(UVk;q-7J}EX)Zf-U-uZ$JHCH-qiBpo2>j&elCZd3dtDLsPR_GV&t#I_w@t$KV zYnq)7$16{gvitO@>d3?vQx1LJYL#e*gNh@#i+pthwK5ue`b?DJy5E)S8pGnR4j!Rzv=Qh;2kn zb=oUiQU4lN+3?Dmv82|Xux(~dv(sKVB`LXnFy8Ag9vfe~b!N?%PJ3l5()3~N57*ni zq&g3~Vnj`#h4TuUq};UMO={gE+if{$jgHmyi-;JJBhZS<0V{cU_09vg93Q{w!Vxv! zIW3%1l9KBOJq;kD8xi|#(F#3U3BfehTQ`~o*xI|e=3qB(z^inU@`GouRg-SlD@2k~ zpSN=?%f|JHR}x`oGti234p=S3D}>r;Hsguk7T1h(F*t>Cn5!T3w1tRUiMYYdO|TV> z0bvaj*V~zOJ>#(jwX5E9^EkW`i0<9QaJ6BxUQLsf`n;XtmA9L@yH$s}c{R2Y^b6+Fgy5AT2kRdbxfooF z=ojnls-0b{4ly>(K?kn;Mv_iVU(5=g~r*A5Ys`rqF zyuiHG8kE*sAD*uDXxx32HRtWFq~ofJs`n!edA=dg3h4&5<;Yj$#8~q`s&|sI z-Duvdc{g=R^fguR=F1vF(yZ#71H%;&FFii9lcyyA*w$k^?a-uv0zHpT$^AA zD91aEh>wXFX9%=H8m~>|oVIj5>AO?S$g5t}jJaCcab{NWo-bWjBV}$pm}b=(Q+(}J z6LviKB*oZDG3@9rYxH)zXAEhKN8`aXjRzg!Td@j^@fw!2rIVzd|ICbp{c`X%z!*nX zw52a5;s8Tr%@wq}<&ef})7;dQX4N^Xc)yyj@3MaVJ`qz$r|#`E0M?L^&KjNC?r=M= zu_?WYh>wgifmT>rtivM*)umZ=&MMyF=Igs0`*0#|A)Pwy^b*#Pk;dI_4-TkH?;_$? zhCnMUIhNz>?X_uEopJA*ukUi~fkgb3bm~4%TVf3v>8#PI?Vh+3w$-NhC-wYhqn@J` zmK@7**3(sKR-Lno_r&@7F30Xm#Dk<$PjEUMYsg5`la|wvw=S(pzd`ExDx(gf6_y;! zamV_y^jcD#I~vtFt9TckukUi~$wZ*C&009pS*wwoZyda~x-5MGspl<5NkuCxIhI47 zH!ZDI=SxYg^>=5X(u_Hr+;3X?WYWTmNfGu5%n9xcyNQTj6LEskklD(oF~&^;rltQz z8uD?MY^d>f%Vpu445;Eh-hWy;P@R1Ot$0VB5EFY(OP5lNSrkLR!vkg8r}3*1pA4Or zKJq_fGN+tliQsz@VS;x9&RtWM7Nl$!JJp%>-K^p-MP!XmZFgth`)XO5m2I^0HDr@p zqqlc_y)1nR#dynIbM}iR=kf`$w6ZE)OR96RyW3gcrS=iNOu_nYMU{D?oU)BpzJ|=@ z>%OQeogwwS&OIOO7t4Vq|1Uk~Wu!WHb}a+zyVMTCw=7uSt!k)Ev$Bm=zJ|=@8_`&s z{(@r6c5Mm!#d2ifdl?-E)}@~z)%kYUin6}zfBk~>-3N%chlqc=R+PVT;cLiTzKhST zOVg}n#?D%zUo1x!znwzwkCD_TxzfFHSdY#s{^CW}=+yQdd>0X{Y@-$Gt(Zfa;Z1L# z74~ou*&^85sQd;+!1;O|C4S6!327G zkBIAtcr%ZvS4*nOm0gR2-!d6Y#Bd_sGXz?hv4M-H?}c*peWM;t+<(6re`G#$BNp__ z*8pQ|zptSBFQn-=x$y^EVLZGx?|$5=dMPQ}=esedR(^xAGk*Pq72$0jkyUKfpqOq@ zqehgeE|q#^)uA*>rPWlKSKOGBQ}Amo=;@l>ovQbt7W5@UpcVSX>(RKPsCpN@396h1 zpcUajXTUG4u#)}|d_}~&P6OatM88;XKPSE9e@H!VG3vR#GiIxgjmnWVx#BOk%vSa*1wMgRNMo5u z%cNE(_+cw!TOuJO#l==BOmHje6WsEJ34YFff<1)^Zhd@$J%tH=K74{bg$aJve1bhW zB4umKxAJMd=SSCDevFZlYqVTir2rL#D1sFa9BoGtthV4-J&ItZ07t1&1gjJ{#*HFa z572cOXaM;H*N?WsdJAe-*RcNSdsxo{2lbwRZE{3P+gk#ykhU|*Il=n9rsr}X` zWJQ#HGw`0caIQGTkH_yq4O z3lp5;6TG`BOmK=%@Sd+Q!6`n0dz$=f&M7{@`?10?IK?M;*H)O|6rbR|Qegu7CAfo! zciMyr_T|UmJy>A^W8iM5{~N(M!M#i27?^jA$2)Bt0Z%RW!q^JS6prCrG0i)M!UU&i zzf9;^K)dEHOmK=%@NS+k!6`n$z5X!4DL%nFZNdbn_yqU*!vv@J1nxcYpASy)30-ei zzY^vbj8wH%(0fI#5Bu{3r09Rt1J^&aLfRe&{atNo311gOzvxNZJ0cZ~k#&3C9_^LG z^+$2kp#27On^+4v0R7JqXoa*r_R67&l)q~S{h}wb)W$9bTeU}HISiGdon@5wH>G~F zY0c9AeqwN52hjHP4-sgEv{fmrC$`o{s9%KX;Cqcgr1E31cA|5b`>G>BLCLk>Aip+O zc1;Q9P@6M>R!H09+}Cue#6jti!1W$}jNrZ{&XwHOiM9&VNL0MYy^i62+A63Mv_ics zieOC+)u<=}wLVtDB8i-mws$qkF9+&ytc2yq;QGN@(Df$7H2TXY@R|o!7&}bxN+4~; zw0>V%oiR-CDkPs^Pho;r>G%YD3KRPMVU=y4U{7H}zb)rzbCNCbXO4ChIk(jWcwU60X`Hpi4Rcg+*pTr*B#kv1i;l4Kbi z17YkrC1-^+9qaS<7d(eaI;-=ZBV0~`82LnyqN5{?<@00cRzHY=R!IAVwX(ArNZVsO z)8G?mg*3)I}r)=WPn^too|J-RaJqY7^Y5S zeooL65Oip+2NAjJ5u$@DsORnI&y`;LD5dM7-vH%A=>> d = NoCaseDict() - >>> d['Content-Type'] = 'text/html' - >>> print(d['Content-Type']) - text/html - >>> print(d['content-type']) - text/html - >>> print(d['CONTENT-TYPE']) - text/html - >>> del d['cOnTeNt-TyPe'] - >>> print(d) - {} - """ - def __init__(self, initial_dict=None): - super().__init__(initial_dict or {}) - self.keymap = {k.lower(): k for k in self.keys() if k.lower() != k} - - def __setitem__(self, key, value): - kl = key.lower() - key = self.keymap.get(kl, key) - if kl != key: - self.keymap[kl] = key - super().__setitem__(key, value) - - def __getitem__(self, key): - kl = key.lower() - return super().__getitem__(self.keymap.get(kl, kl)) - - def __delitem__(self, key): - kl = key.lower() - super().__delitem__(self.keymap.get(kl, kl)) - - def __contains__(self, key): - kl = key.lower() - return self.keymap.get(kl, kl) in self.keys() - - def get(self, key, default=None): - kl = key.lower() - return super().get(self.keymap.get(kl, kl), default) - - def update(self, other_dict): - for key, value in other_dict.items(): - self[key] = value - - -def mro(cls): # pragma: no cover - """Return the method resolution order of a class. - - This is a helper function that returns the method resolution order of a - class. It is used by Microdot to find the best error handler to invoke for - the raised exception. - - In CPython, this function returns the ``__mro__`` attribute of the class. - In MicroPython, this function implements a recursive depth-first scanning - of the class hierarchy. - """ - if hasattr(cls, 'mro'): - return cls.__mro__ - - def _mro(cls): - m = [cls] - for base in cls.__bases__: - m += _mro(base) - return m - - mro_list = _mro(cls) - - # If a class appears multiple times (due to multiple inheritance) remove - # all but the last occurence. This matches the method resolution order - # of MicroPython, but not CPython. - mro_pruned = [] - for i in range(len(mro_list)): - base = mro_list.pop(0) - if base not in mro_list: - mro_pruned.append(base) - return mro_pruned - - -class MultiDict(dict): - """A subclass of dictionary that can hold multiple values for the same - key. It is used to hold key/value pairs decoded from query strings and - form submissions. - - :param initial_dict: an initial dictionary of key/value pairs to - initialize this object with. - - Example:: - - >>> d = MultiDict() - >>> d['sort'] = 'name' - >>> d['sort'] = 'email' - >>> print(d['sort']) - 'name' - >>> print(d.getlist('sort')) - ['name', 'email'] - """ - def __init__(self, initial_dict=None): - super().__init__() - if initial_dict: - for key, value in initial_dict.items(): - self[key] = value - - def __setitem__(self, key, value): - if key not in self: - super().__setitem__(key, []) - super().__getitem__(key).append(value) - - def __getitem__(self, key): - return super().__getitem__(key)[0] - - def get(self, key, default=None, type=None): - """Return the value for a given key. - - :param key: The key to retrieve. - :param default: A default value to use if the key does not exist. - :param type: A type conversion callable to apply to the value. - - If the multidict contains more than one value for the requested key, - this method returns the first value only. - - Example:: - - >>> d = MultiDict() - >>> d['age'] = '42' - >>> d.get('age') - '42' - >>> d.get('age', type=int) - 42 - >>> d.get('name', default='noname') - 'noname' - """ - if key not in self: - return default - value = self[key] - if type is not None: - value = type(value) - return value - - def getlist(self, key, type=None): - """Return all the values for a given key. - - :param key: The key to retrieve. - :param type: A type conversion callable to apply to the values. - - If the requested key does not exist in the dictionary, this method - returns an empty list. - - Example:: - - >>> d = MultiDict() - >>> d.getlist('items') - [] - >>> d['items'] = '3' - >>> d.getlist('items') - ['3'] - >>> d['items'] = '56' - >>> d.getlist('items') - ['3', '56'] - >>> d.getlist('items', type=int) - [3, 56] - """ - if key not in self: - return [] - values = super().__getitem__(key) - if type is not None: - values = [type(value) for value in values] - return values - - -class AsyncBytesIO: - """An async wrapper for BytesIO.""" - def __init__(self, data): - self.stream = io.BytesIO(data) - - async def read(self, n=-1): - return self.stream.read(n) - - async def readline(self): # pragma: no cover - return self.stream.readline() - - async def readexactly(self, n): # pragma: no cover - return self.stream.read(n) - - async def readuntil(self, separator=b'\n'): # pragma: no cover - return self.stream.readuntil(separator=separator) - - async def awrite(self, data): # pragma: no cover - return self.stream.write(data) - - async def aclose(self): # pragma: no cover - pass - - -class Request: - """An HTTP request.""" - #: Specify the maximum payload size that is accepted. Requests with larger - #: payloads will be rejected with a 413 status code. Applications can - #: change this maximum as necessary. - #: - #: Example:: - #: - #: Request.max_content_length = 1 * 1024 * 1024 # 1MB requests allowed - max_content_length = 16 * 1024 - - #: Specify the maximum payload size that can be stored in ``body``. - #: Requests with payloads that are larger than this size and up to - #: ``max_content_length`` bytes will be accepted, but the application will - #: only be able to access the body of the request by reading from - #: ``stream``. Set to 0 if you always access the body as a stream. - #: - #: Example:: - #: - #: Request.max_body_length = 4 * 1024 # up to 4KB bodies read - max_body_length = 16 * 1024 - - #: Specify the maximum length allowed for a line in the request. Requests - #: with longer lines will not be correctly interpreted. Applications can - #: change this maximum as necessary. - #: - #: Example:: - #: - #: Request.max_readline = 16 * 1024 # 16KB lines allowed - max_readline = 2 * 1024 - - class G: - pass - - def __init__(self, app, client_addr, method, url, http_version, headers, - body=None, stream=None, sock=None): - #: The application instance to which this request belongs. - self.app = app - #: The address of the client, as a tuple (host, port). - self.client_addr = client_addr - #: The HTTP method of the request. - self.method = method - #: The request URL, including the path and query string. - self.url = url - #: The path portion of the URL. - self.path = url - #: The query string portion of the URL. - self.query_string = None - #: The parsed query string, as a - #: :class:`MultiDict ` object. - self.args = {} - #: A dictionary with the headers included in the request. - self.headers = headers - #: A dictionary with the cookies included in the request. - self.cookies = {} - #: The parsed ``Content-Length`` header. - self.content_length = 0 - #: The parsed ``Content-Type`` header. - self.content_type = None - #: A general purpose container for applications to store data during - #: the life of the request. - self.g = Request.G() - - self.http_version = http_version - if '?' in self.path: - self.path, self.query_string = self.path.split('?', 1) - self.args = self._parse_urlencoded(self.query_string) - - if 'Content-Length' in self.headers: - self.content_length = int(self.headers['Content-Length']) - if 'Content-Type' in self.headers: - self.content_type = self.headers['Content-Type'] - if 'Cookie' in self.headers: - for cookie in self.headers['Cookie'].split(';'): - name, value = cookie.strip().split('=', 1) - self.cookies[name] = value - - self._body = body - self.body_used = False - self._stream = stream - self.sock = sock - self._json = None - self._form = None - self.after_request_handlers = [] - - @staticmethod - async def create(app, client_reader, client_writer, client_addr): - """Create a request object. - - :param app: The Microdot application instance. - :param client_reader: An input stream from where the request data can - be read. - :param client_writer: An output stream where the response data can be - written. - :param client_addr: The address of the client, as a tuple. - - This method is a coroutine. It returns a newly created ``Request`` - object. - """ - # request line - line = (await Request._safe_readline(client_reader)).strip().decode() - if not line: # pragma: no cover - return None - method, url, http_version = line.split() - http_version = http_version.split('/', 1)[1] - - # headers - headers = NoCaseDict() - content_length = 0 - while True: - line = (await Request._safe_readline( - client_reader)).strip().decode() - if line == '': - break - header, value = line.split(':', 1) - value = value.strip() - headers[header] = value - if header.lower() == 'content-length': - content_length = int(value) - - # body - body = b'' - if content_length and content_length <= Request.max_body_length: - body = await client_reader.readexactly(content_length) - stream = None - else: - body = b'' - stream = client_reader - - return Request(app, client_addr, method, url, http_version, headers, - body=body, stream=stream, - sock=(client_reader, client_writer)) - - def _parse_urlencoded(self, urlencoded): - data = MultiDict() - if len(urlencoded) > 0: # pragma: no branch - if isinstance(urlencoded, str): - for kv in [pair.split('=', 1) - for pair in urlencoded.split('&') if pair]: - data[urldecode_str(kv[0])] = urldecode_str(kv[1]) \ - if len(kv) > 1 else '' - elif isinstance(urlencoded, bytes): # pragma: no branch - for kv in [pair.split(b'=', 1) - for pair in urlencoded.split(b'&') if pair]: - data[urldecode_bytes(kv[0])] = urldecode_bytes(kv[1]) \ - if len(kv) > 1 else b'' - return data - - @property - def body(self): - """The body of the request, as bytes.""" - return self._body - - @property - def stream(self): - """The body of the request, as a bytes stream.""" - if self._stream is None: - self._stream = AsyncBytesIO(self._body) - return self._stream - - @property - def json(self): - """The parsed JSON body, or ``None`` if the request does not have a - JSON body.""" - if self._json is None: - if self.content_type is None: - return None - mime_type = self.content_type.split(';')[0] - if mime_type != 'application/json': - return None - self._json = json.loads(self.body.decode()) - return self._json - - @property - def form(self): - """The parsed form submission body, as a - :class:`MultiDict ` object, or ``None`` if the - request does not have a form submission.""" - if self._form is None: - if self.content_type is None: - return None - mime_type = self.content_type.split(';')[0] - if mime_type != 'application/x-www-form-urlencoded': - return None - self._form = self._parse_urlencoded(self.body) - return self._form - - def after_request(self, f): - """Register a request-specific function to run after the request is - handled. Request-specific after request handlers run at the very end, - after the application's own after request handlers. The function must - take two arguments, the request and response objects. The return value - of the function must be the updated response object. - - Example:: - - @app.route('/') - def index(request): - # register a request-specific after request handler - @req.after_request - def func(request, response): - # ... - return response - - return 'Hello, World!' - - Note that the function is not called if the request handler raises an - exception and an error response is returned instead. - """ - self.after_request_handlers.append(f) - return f - - @staticmethod - async def _safe_readline(stream): - line = (await stream.readline()) - if len(line) > Request.max_readline: - raise ValueError('line too long') - return line - - -class Response: - """An HTTP response class. - - :param body: The body of the response. If a dictionary or list is given, - a JSON formatter is used to generate the body. If a file-like - object or an async generator is given, a streaming response is - used. If a string is given, it is encoded from UTF-8. Else, - the body should be a byte sequence. - :param status_code: The numeric HTTP status code of the response. The - default is 200. - :param headers: A dictionary of headers to include in the response. - :param reason: A custom reason phrase to add after the status code. The - default is "OK" for responses with a 200 status code and - "N/A" for any other status codes. - """ - types_map = { - 'css': 'text/css', - 'gif': 'image/gif', - 'html': 'text/html', - 'jpg': 'image/jpeg', - 'js': 'application/javascript', - 'json': 'application/json', - 'png': 'image/png', - 'txt': 'text/plain', - } - - send_file_buffer_size = 1024 - - #: The content type to use for responses that do not explicitly define a - #: ``Content-Type`` header. - default_content_type = 'text/plain' - - #: The default cache control max age used by :meth:`send_file`. A value - #: of ``None`` means that no ``Cache-Control`` header is added. - default_send_file_max_age = None - - #: Special response used to signal that a response does not need to be - #: written to the client. Used to exit WebSocket connections cleanly. - already_handled = None - - def __init__(self, body='', status_code=200, headers=None, reason=None): - if body is None and status_code == 200: - body = '' - status_code = 204 - self.status_code = status_code - self.headers = NoCaseDict(headers or {}) - self.reason = reason - if isinstance(body, (dict, list)): - self.body = json.dumps(body).encode() - self.headers['Content-Type'] = 'application/json; charset=UTF-8' - elif isinstance(body, str): - self.body = body.encode() - else: - # this applies to bytes, file-like objects or generators - self.body = body - self.is_head = False - - def set_cookie(self, cookie, value, path=None, domain=None, expires=None, - max_age=None, secure=False, http_only=False, - partitioned=False): - """Add a cookie to the response. - - :param cookie: The cookie's name. - :param value: The cookie's value. - :param path: The cookie's path. - :param domain: The cookie's domain. - :param expires: The cookie expiration time, as a ``datetime`` object - or a correctly formatted string. - :param max_age: The cookie's ``Max-Age`` value. - :param secure: The cookie's ``secure`` flag. - :param http_only: The cookie's ``HttpOnly`` flag. - :param partitioned: Whether the cookie is partitioned. - """ - http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value) - if path: - http_cookie += '; Path=' + path - if domain: - http_cookie += '; Domain=' + domain - if expires: - if isinstance(expires, str): - http_cookie += '; Expires=' + expires - else: # pragma: no cover - http_cookie += '; Expires=' + time.strftime( - '%a, %d %b %Y %H:%M:%S GMT', expires.timetuple()) - if max_age is not None: - http_cookie += '; Max-Age=' + str(max_age) - if secure: - http_cookie += '; Secure' - if http_only: - http_cookie += '; HttpOnly' - if partitioned: - http_cookie += '; Partitioned' - if 'Set-Cookie' in self.headers: - self.headers['Set-Cookie'].append(http_cookie) - else: - self.headers['Set-Cookie'] = [http_cookie] - - def delete_cookie(self, cookie, **kwargs): - """Delete a cookie. - - :param cookie: The cookie's name. - :param kwargs: Any cookie opens and flags supported by - ``set_cookie()`` except ``expires`` and ``max_age``. - """ - self.set_cookie(cookie, '', expires='Thu, 01 Jan 1970 00:00:01 GMT', - max_age=0, **kwargs) - - def complete(self): - if isinstance(self.body, bytes) and \ - 'Content-Length' not in self.headers: - self.headers['Content-Length'] = str(len(self.body)) - if 'Content-Type' not in self.headers: - self.headers['Content-Type'] = self.default_content_type - if 'charset=' not in self.headers['Content-Type']: - self.headers['Content-Type'] += '; charset=UTF-8' - - async def write(self, stream): - self.complete() - - try: - # status code - reason = self.reason if self.reason is not None else \ - ('OK' if self.status_code == 200 else 'N/A') - await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format( - status_code=self.status_code, reason=reason).encode()) - - # headers - for header, value in self.headers.items(): - values = value if isinstance(value, list) else [value] - for value in values: - await stream.awrite('{header}: {value}\r\n'.format( - header=header, value=value).encode()) - await stream.awrite(b'\r\n') - - # body - if not self.is_head: - iter = self.body_iter() - async for body in iter: - if isinstance(body, str): # pragma: no cover - body = body.encode() - try: - await stream.awrite(body) - except OSError as exc: # pragma: no cover - if exc.errno in MUTED_SOCKET_ERRORS or \ - exc.args[0] == 'Connection lost': - if hasattr(iter, 'aclose'): - await iter.aclose() - raise - if hasattr(iter, 'aclose'): # pragma: no branch - await iter.aclose() - - except OSError as exc: # pragma: no cover - if exc.errno in MUTED_SOCKET_ERRORS or \ - exc.args[0] == 'Connection lost': - pass - else: - raise - - def body_iter(self): - if hasattr(self.body, '__anext__'): - # response body is an async generator - return self.body - - response = self - - class iter: - ITER_UNKNOWN = 0 - ITER_SYNC_GEN = 1 - ITER_FILE_OBJ = 2 - ITER_NO_BODY = -1 - - def __aiter__(self): - if response.body: - self.i = self.ITER_UNKNOWN # need to determine type - else: - self.i = self.ITER_NO_BODY - return self - - async def __anext__(self): - if self.i == self.ITER_NO_BODY: - await self.aclose() - raise StopAsyncIteration - if self.i == self.ITER_UNKNOWN: - if hasattr(response.body, 'read'): - self.i = self.ITER_FILE_OBJ - elif hasattr(response.body, '__next__'): - self.i = self.ITER_SYNC_GEN - return next(response.body) - else: - self.i = self.ITER_NO_BODY - return response.body - elif self.i == self.ITER_SYNC_GEN: - try: - return next(response.body) - except StopIteration: - await self.aclose() - raise StopAsyncIteration - buf = response.body.read(response.send_file_buffer_size) - if iscoroutine(buf): # pragma: no cover - buf = await buf - if len(buf) < response.send_file_buffer_size: - self.i = self.ITER_NO_BODY - return buf - - async def aclose(self): - if hasattr(response.body, 'close'): - result = response.body.close() - if iscoroutine(result): # pragma: no cover - await result - - return iter() - - @classmethod - def redirect(cls, location, status_code=302): - """Return a redirect response. - - :param location: The URL to redirect to. - :param status_code: The 3xx status code to use for the redirect. The - default is 302. - """ - if '\x0d' in location or '\x0a' in location: - raise ValueError('invalid redirect URL') - return cls(status_code=status_code, headers={'Location': location}) - - @classmethod - def send_file(cls, filename, status_code=200, content_type=None, - stream=None, max_age=None, compressed=False, - file_extension=''): - """Send file contents in a response. - - :param filename: The filename of the file. - :param status_code: The 3xx status code to use for the redirect. The - default is 302. - :param content_type: The ``Content-Type`` header to use in the - response. If omitted, it is generated - automatically from the file extension of the - ``filename`` parameter. - :param stream: A file-like object to read the file contents from. If - a stream is given, the ``filename`` parameter is only - used when generating the ``Content-Type`` header. - :param max_age: The ``Cache-Control`` header's ``max-age`` value in - seconds. If omitted, the value of the - :attr:`Response.default_send_file_max_age` attribute is - used. - :param compressed: Whether the file is compressed. If ``True``, the - ``Content-Encoding`` header is set to ``gzip``. A - string with the header value can also be passed. - Note that when using this option the file must have - been compressed beforehand. This option only sets - the header. - :param file_extension: A file extension to append to the ``filename`` - parameter when opening the file, including the - dot. The extension given here is not considered - when generating the ``Content-Type`` header. - - Security note: The filename is assumed to be trusted. Never pass - filenames provided by the user without validating and sanitizing them - first. - """ - if content_type is None: - if compressed and filename.endswith('.gz'): - ext = filename[:-3].split('.')[-1] - else: - ext = filename.split('.')[-1] - if ext in Response.types_map: - content_type = Response.types_map[ext] - else: - content_type = 'application/octet-stream' - headers = {'Content-Type': content_type} - - if max_age is None: - max_age = cls.default_send_file_max_age - if max_age is not None: - headers['Cache-Control'] = 'max-age={}'.format(max_age) - - if compressed: - headers['Content-Encoding'] = compressed \ - if isinstance(compressed, str) else 'gzip' - - f = stream or open(filename + file_extension, 'rb') - return cls(body=f, status_code=status_code, headers=headers) - - -class URLPattern(): - def __init__(self, url_pattern): - self.url_pattern = url_pattern - self.segments = [] - self.regex = None - pattern = '' - use_regex = False - for segment in url_pattern.lstrip('/').split('/'): - if segment and segment[0] == '<': - if segment[-1] != '>': - raise ValueError('invalid URL pattern') - segment = segment[1:-1] - if ':' in segment: - type_, name = segment.rsplit(':', 1) - else: - type_ = 'string' - name = segment - parser = None - if type_ == 'string': - parser = self._string_segment - pattern += '/([^/]+)' - elif type_ == 'int': - parser = self._int_segment - pattern += '/(-?\\d+)' - elif type_ == 'path': - use_regex = True - pattern += '/(.+)' - elif type_.startswith('re:'): - use_regex = True - pattern += '/({pattern})'.format(pattern=type_[3:]) - else: - raise ValueError('invalid URL segment type') - self.segments.append({'parser': parser, 'name': name, - 'type': type_}) - else: - pattern += '/' + segment - self.segments.append({'parser': self._static_segment(segment)}) - if use_regex: - import re - self.regex = re.compile('^' + pattern + '$') - - def match(self, path): - args = {} - if self.regex: - g = self.regex.match(path) - if not g: - return - i = 1 - for segment in self.segments: - if 'name' not in segment: - continue - value = g.group(i) - if segment['type'] == 'int': - value = int(value) - args[segment['name']] = value - i += 1 - else: - if len(path) == 0 or path[0] != '/': - return - path = path[1:] - args = {} - for segment in self.segments: - if path is None: - return - arg, path = segment['parser'](path) - if arg is None: - return - if 'name' in segment: - args[segment['name']] = arg - if path is not None: - return - return args - - def _static_segment(self, segment): - def _static(value): - s = value.split('/', 1) - if s[0] == segment: - return '', s[1] if len(s) > 1 else None - return None, None - return _static - - def _string_segment(self, value): - s = value.split('/', 1) - if len(s[0]) == 0: - return None, None - return s[0], s[1] if len(s) > 1 else None - - def _int_segment(self, value): - s = value.split('/', 1) - try: - return int(s[0]), s[1] if len(s) > 1 else None - except ValueError: - return None, None - - -class HTTPException(Exception): - def __init__(self, status_code, reason=None): - self.status_code = status_code - self.reason = reason or str(status_code) + ' error' - - def __repr__(self): # pragma: no cover - return 'HTTPException: {}'.format(self.status_code) - - -class Microdot: - """An HTTP application class. - - This class implements an HTTP application instance and is heavily - influenced by the ``Flask`` class of the Flask framework. It is typically - declared near the start of the main application script. - - Example:: - - from microdot import Microdot - - app = Microdot() - """ - - def __init__(self): - self.url_map = [] - self.before_request_handlers = [] - self.after_request_handlers = [] - self.after_error_request_handlers = [] - self.error_handlers = {} - self.shutdown_requested = False - self.options_handler = self.default_options_handler - self.debug = False - self.server = None - - def route(self, url_pattern, methods=None): - """Decorator that is used to register a function as a request handler - for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - :param methods: The list of HTTP methods to be handled by the - decorated function. If omitted, only ``GET`` requests - are handled. - - The URL pattern can be a static path (for example, ``/users`` or - ``/api/invoices/search``) or a path with dynamic components enclosed - in ``<`` and ``>`` (for example, ``/users/`` or - ``/invoices//products``). Dynamic path components can also - include a type prefix, separated from the name with a colon (for - example, ``/users/``). The type can be ``string`` (the - default), ``int``, ``path`` or ``re:[regular-expression]``. - - The first argument of the decorated function must be - the request object. Any path arguments that are specified in the URL - pattern are passed as keyword arguments. The return value of the - function must be a :class:`Response` instance, or the arguments to - be passed to this class. - - Example:: - - @app.route('/') - def index(request): - return 'Hello, world!' - """ - def decorated(f): - self.url_map.append( - ([m.upper() for m in (methods or ['GET'])], - URLPattern(url_pattern), f)) - return f - return decorated - - def get(self, url_pattern): - """Decorator that is used to register a function as a ``GET`` request - handler for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - - This decorator can be used as an alias to the ``route`` decorator with - ``methods=['GET']``. - - Example:: - - @app.get('/users/') - def get_user(request, id): - # ... - """ - return self.route(url_pattern, methods=['GET']) - - def post(self, url_pattern): - """Decorator that is used to register a function as a ``POST`` request - handler for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - - This decorator can be used as an alias to the``route`` decorator with - ``methods=['POST']``. - - Example:: - - @app.post('/users') - def create_user(request): - # ... - """ - return self.route(url_pattern, methods=['POST']) - - def put(self, url_pattern): - """Decorator that is used to register a function as a ``PUT`` request - handler for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - - This decorator can be used as an alias to the ``route`` decorator with - ``methods=['PUT']``. - - Example:: - - @app.put('/users/') - def edit_user(request, id): - # ... - """ - return self.route(url_pattern, methods=['PUT']) - - def patch(self, url_pattern): - """Decorator that is used to register a function as a ``PATCH`` request - handler for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - - This decorator can be used as an alias to the ``route`` decorator with - ``methods=['PATCH']``. - - Example:: - - @app.patch('/users/') - def edit_user(request, id): - # ... - """ - return self.route(url_pattern, methods=['PATCH']) - - def delete(self, url_pattern): - """Decorator that is used to register a function as a ``DELETE`` - request handler for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - - This decorator can be used as an alias to the ``route`` decorator with - ``methods=['DELETE']``. - - Example:: - - @app.delete('/users/') - def delete_user(request, id): - # ... - """ - return self.route(url_pattern, methods=['DELETE']) - - def before_request(self, f): - """Decorator to register a function to run before each request is - handled. The decorated function must take a single argument, the - request object. - - Example:: - - @app.before_request - def func(request): - # ... - """ - self.before_request_handlers.append(f) - return f - - def after_request(self, f): - """Decorator to register a function to run after each request is - handled. The decorated function must take two arguments, the request - and response objects. The return value of the function must be an - updated response object. - - Example:: - - @app.after_request - def func(request, response): - # ... - return response - """ - self.after_request_handlers.append(f) - return f - - def after_error_request(self, f): - """Decorator to register a function to run after an error response is - generated. The decorated function must take two arguments, the request - and response objects. The return value of the function must be an - updated response object. The handler is invoked for error responses - generated by Microdot, as well as those returned by application-defined - error handlers. - - Example:: - - @app.after_error_request - def func(request, response): - # ... - return response - """ - self.after_error_request_handlers.append(f) - return f - - def errorhandler(self, status_code_or_exception_class): - """Decorator to register a function as an error handler. Error handler - functions for numeric HTTP status codes must accept a single argument, - the request object. Error handler functions for Python exceptions - must accept two arguments, the request object and the exception - object. - - :param status_code_or_exception_class: The numeric HTTP status code or - Python exception class to - handle. - - Examples:: - - @app.errorhandler(404) - def not_found(request): - return 'Not found' - - @app.errorhandler(RuntimeError) - def runtime_error(request, exception): - return 'Runtime error' - """ - def decorated(f): - self.error_handlers[status_code_or_exception_class] = f - return f - return decorated - - def mount(self, subapp, url_prefix=''): - """Mount a sub-application, optionally under the given URL prefix. - - :param subapp: The sub-application to mount. - :param url_prefix: The URL prefix to mount the application under. - """ - for methods, pattern, handler in subapp.url_map: - self.url_map.append( - (methods, URLPattern(url_prefix + pattern.url_pattern), - handler)) - for handler in subapp.before_request_handlers: - self.before_request_handlers.append(handler) - for handler in subapp.after_request_handlers: - self.after_request_handlers.append(handler) - for handler in subapp.after_error_request_handlers: - self.after_error_request_handlers.append(handler) - for status_code, handler in subapp.error_handlers.items(): - self.error_handlers[status_code] = handler - - @staticmethod - def abort(status_code, reason=None): - """Abort the current request and return an error response with the - given status code. - - :param status_code: The numeric status code of the response. - :param reason: The reason for the response, which is included in the - response body. - - Example:: - - from microdot import abort - - @app.route('/users/') - def get_user(id): - user = get_user_by_id(id) - if user is None: - abort(404) - return user.to_dict() - """ - raise HTTPException(status_code, reason) - - async def start_server(self, host='0.0.0.0', port=5000, debug=False, - ssl=None): - """Start the Microdot web server as a coroutine. This coroutine does - not normally return, as the server enters an endless listening loop. - The :func:`shutdown` function provides a method for terminating the - server gracefully. - - :param host: The hostname or IP address of the network interface that - will be listening for requests. A value of ``'0.0.0.0'`` - (the default) indicates that the server should listen for - requests on all the available interfaces, and a value of - ``127.0.0.1`` indicates that the server should listen - for requests only on the internal networking interface of - the host. - :param port: The port number to listen for requests. The default is - port 5000. - :param debug: If ``True``, the server logs debugging information. The - default is ``False``. - :param ssl: An ``SSLContext`` instance or ``None`` if the server should - not use TLS. The default is ``None``. - - This method is a coroutine. - - Example:: - - import asyncio - from microdot import Microdot - - app = Microdot() - - @app.route('/') - async def index(request): - return 'Hello, world!' - - async def main(): - await app.start_server(debug=True) - - asyncio.run(main()) - """ - self.debug = debug - - async def serve(reader, writer): - if not hasattr(writer, 'awrite'): # pragma: no cover - # CPython provides the awrite and aclose methods in 3.8+ - async def awrite(self, data): - self.write(data) - await self.drain() - - async def aclose(self): - self.close() - await self.wait_closed() - - from types import MethodType - writer.awrite = MethodType(awrite, writer) - writer.aclose = MethodType(aclose, writer) - - await self.handle_request(reader, writer) - - if self.debug: # pragma: no cover - print('Starting async server on {host}:{port}...'.format( - host=host, port=port)) - - try: - self.server = await asyncio.start_server(serve, host, port, - ssl=ssl) - except TypeError: # pragma: no cover - self.server = await asyncio.start_server(serve, host, port) - - while True: - try: - if hasattr(self.server, 'serve_forever'): # pragma: no cover - try: - await self.server.serve_forever() - except asyncio.CancelledError: - pass - await self.server.wait_closed() - break - except AttributeError: # pragma: no cover - # the task hasn't been initialized in the server object yet - # wait a bit and try again - await asyncio.sleep(0.1) - - def run(self, host='0.0.0.0', port=5000, debug=False, ssl=None): - """Start the web server. This function does not normally return, as - the server enters an endless listening loop. The :func:`shutdown` - function provides a method for terminating the server gracefully. - - :param host: The hostname or IP address of the network interface that - will be listening for requests. A value of ``'0.0.0.0'`` - (the default) indicates that the server should listen for - requests on all the available interfaces, and a value of - ``127.0.0.1`` indicates that the server should listen - for requests only on the internal networking interface of - the host. - :param port: The port number to listen for requests. The default is - port 5000. - :param debug: If ``True``, the server logs debugging information. The - default is ``False``. - :param ssl: An ``SSLContext`` instance or ``None`` if the server should - not use TLS. The default is ``None``. - - Example:: - - from microdot import Microdot - - app = Microdot() - - @app.route('/') - async def index(request): - return 'Hello, world!' - - app.run(debug=True) - """ - asyncio.run(self.start_server(host=host, port=port, debug=debug, - ssl=ssl)) # pragma: no cover - - def shutdown(self): - """Request a server shutdown. The server will then exit its request - listening loop and the :func:`run` function will return. This function - can be safely called from a route handler, as it only schedules the - server to terminate as soon as the request completes. - - Example:: - - @app.route('/shutdown') - def shutdown(request): - request.app.shutdown() - return 'The server is shutting down...' - """ - self.server.close() - - def find_route(self, req): - method = req.method.upper() - if method == 'OPTIONS' and self.options_handler: - return self.options_handler(req) - if method == 'HEAD': - method = 'GET' - f = 404 - for route_methods, route_pattern, route_handler in self.url_map: - req.url_args = route_pattern.match(req.path) - if req.url_args is not None: - if method in route_methods: - f = route_handler - break - else: - f = 405 - return f - - def default_options_handler(self, req): - allow = [] - for route_methods, route_pattern, route_handler in self.url_map: - if route_pattern.match(req.path) is not None: - allow.extend(route_methods) - if 'GET' in allow: - allow.append('HEAD') - allow.append('OPTIONS') - return {'Allow': ', '.join(allow)} - - async def handle_request(self, reader, writer): - req = None - try: - req = await Request.create(self, reader, writer, - writer.get_extra_info('peername')) - except Exception as exc: # pragma: no cover - print_exception(exc) - - res = await self.dispatch_request(req) - if res != Response.already_handled: # pragma: no branch - await res.write(writer) - try: - await writer.aclose() - except OSError as exc: # pragma: no cover - if exc.errno in MUTED_SOCKET_ERRORS: - pass - else: - raise - if self.debug and req: # pragma: no cover - print('{method} {path} {status_code}'.format( - method=req.method, path=req.path, - status_code=res.status_code)) - - async def dispatch_request(self, req): - after_request_handled = False - if req: - if req.content_length > req.max_content_length: - if 413 in self.error_handlers: - res = await invoke_handler(self.error_handlers[413], req) - else: - res = 'Payload too large', 413 - else: - f = self.find_route(req) - try: - res = None - if callable(f): - for handler in self.before_request_handlers: - res = await invoke_handler(handler, req) - if res: - break - if res is None: - res = await invoke_handler( - f, req, **req.url_args) - if isinstance(res, int): - res = '', res - if isinstance(res, tuple): - if isinstance(res[0], int): - res = ('', res[0], - res[1] if len(res) > 1 else {}) - body = res[0] - if isinstance(res[1], int): - status_code = res[1] - headers = res[2] if len(res) > 2 else {} - else: - status_code = 200 - headers = res[1] - res = Response(body, status_code, headers) - elif not isinstance(res, Response): - res = Response(res) - for handler in self.after_request_handlers: - res = await invoke_handler( - handler, req, res) or res - for handler in req.after_request_handlers: - res = await invoke_handler( - handler, req, res) or res - after_request_handled = True - elif isinstance(f, dict): - res = Response(headers=f) - elif f in self.error_handlers: - res = await invoke_handler(self.error_handlers[f], req) - else: - res = 'Not found', f - except HTTPException as exc: - if exc.status_code in self.error_handlers: - res = self.error_handlers[exc.status_code](req) - else: - res = exc.reason, exc.status_code - except Exception as exc: - print_exception(exc) - exc_class = None - res = None - if exc.__class__ in self.error_handlers: - exc_class = exc.__class__ - else: - for c in mro(exc.__class__)[1:]: - if c in self.error_handlers: - exc_class = c - break - if exc_class: - try: - res = await invoke_handler( - self.error_handlers[exc_class], req, exc) - except Exception as exc2: # pragma: no cover - print_exception(exc2) - if res is None: - if 500 in self.error_handlers: - res = await invoke_handler( - self.error_handlers[500], req) - else: - res = 'Internal server error', 500 - else: - if 400 in self.error_handlers: - res = await invoke_handler(self.error_handlers[400], req) - else: - res = 'Bad request', 400 - if isinstance(res, tuple): - res = Response(*res) - elif not isinstance(res, Response): - res = Response(res) - if not after_request_handled: - for handler in self.after_error_request_handlers: - res = await invoke_handler( - handler, req, res) or res - res.is_head = (req and req.method == 'HEAD') - return res - - -Response.already_handled = Response() - -abort = Microdot.abort -redirect = Response.redirect -send_file = Response.send_file \ No newline at end of file diff --git a/lib/microdot/utemplate.py b/lib/microdot/utemplate.py deleted file mode 100644 index 16d0398..0000000 --- a/lib/microdot/utemplate.py +++ /dev/null @@ -1,70 +0,0 @@ -from utemplate import recompile - -_loader = None - - -class Template: - """A template object. - - :param template: The filename of the template to render, relative to the - configured template directory. - """ - @classmethod - def initialize(cls, template_dir='templates', - loader_class=recompile.Loader): - """Initialize the templating subsystem. - - :param template_dir: the directory where templates are stored. This - argument is optional. The default is to load - templates from a *templates* subdirectory. - :param loader_class: the ``utemplate.Loader`` class to use when loading - templates. This argument is optional. The default - is the ``recompile.Loader`` class, which - automatically recompiles templates when they - change. - """ - global _loader - _loader = loader_class(None, template_dir) - - def __init__(self, template): - if _loader is None: # pragma: no cover - self.initialize() - #: The name of the template - self.name = template - self.template = _loader.load(template) - - def generate(self, *args, **kwargs): - """Return a generator that renders the template in chunks, with the - given arguments.""" - return self.template(*args, **kwargs) - - def render(self, *args, **kwargs): - """Render the template with the given arguments and return it as a - string.""" - return ''.join(self.generate(*args, **kwargs)) - - def generate_async(self, *args, **kwargs): - """Return an asynchronous generator that renders the template in - chunks, using the given arguments.""" - class sync_to_async_iter(): - def __init__(self, iter): - self.iter = iter - - def __aiter__(self): - return self - - async def __anext__(self): - try: - return next(self.iter) - except StopIteration: - raise StopAsyncIteration - - return sync_to_async_iter(self.generate(*args, **kwargs)) - - async def render_async(self, *args, **kwargs): - """Render the template with the given arguments asynchronously and - return it as a string.""" - response = '' - async for chunk in self.generate_async(*args, **kwargs): - response += chunk - return response diff --git a/lib/microdot/websocket.py b/lib/microdot/websocket.py deleted file mode 100644 index 0fb6f7c..0000000 --- a/lib/microdot/websocket.py +++ /dev/null @@ -1,231 +0,0 @@ -import binascii -import hashlib -from microdot import Request, Response -from microdot.microdot import MUTED_SOCKET_ERRORS, print_exception -from microdot.helpers import wraps - - -class WebSocketError(Exception): - """Exception raised when an error occurs in a WebSocket connection.""" - pass - - -class WebSocket: - """A WebSocket connection object. - - An instance of this class is sent to handler functions to manage the - WebSocket connection. - """ - CONT = 0 - TEXT = 1 - BINARY = 2 - CLOSE = 8 - PING = 9 - PONG = 10 - - #: Specify the maximum message size that can be received when calling the - #: ``receive()`` method. Messages with payloads that are larger than this - #: size will be rejected and the connection closed. Set to 0 to disable - #: the size check (be aware of potential security issues if you do this), - #: or to -1 to use the value set in - #: ``Request.max_body_length``. The default is -1. - #: - #: Example:: - #: - #: WebSocket.max_message_length = 4 * 1024 # up to 4KB messages - max_message_length = -1 - - def __init__(self, request): - self.request = request - self.closed = False - - async def handshake(self): - response = self._handshake_response() - await self.request.sock[1].awrite( - b'HTTP/1.1 101 Switching Protocols\r\n') - await self.request.sock[1].awrite(b'Upgrade: websocket\r\n') - await self.request.sock[1].awrite(b'Connection: Upgrade\r\n') - await self.request.sock[1].awrite( - b'Sec-WebSocket-Accept: ' + response + b'\r\n\r\n') - - async def receive(self): - """Receive a message from the client.""" - while True: - opcode, payload = await self._read_frame() - send_opcode, data = self._process_websocket_frame(opcode, payload) - if send_opcode: # pragma: no cover - await self.send(data, send_opcode) - elif data: # pragma: no branch - return data - - async def send(self, data, opcode=None): - """Send a message to the client. - - :param data: the data to send, given as a string or bytes. - :param opcode: a custom frame opcode to use. If not given, the opcode - is ``TEXT`` or ``BINARY`` depending on the type of the - data. - """ - frame = self._encode_websocket_frame( - opcode or (self.TEXT if isinstance(data, str) else self.BINARY), - data) - await self.request.sock[1].awrite(frame) - - async def close(self): - """Close the websocket connection.""" - if not self.closed: # pragma: no cover - self.closed = True - await self.send(b'', self.CLOSE) - - def _handshake_response(self): - connection = False - upgrade = False - websocket_key = None - for header, value in self.request.headers.items(): - h = header.lower() - if h == 'connection': - connection = True - if 'upgrade' not in value.lower(): - return self.request.app.abort(400) - elif h == 'upgrade': - upgrade = True - if not value.lower() == 'websocket': - return self.request.app.abort(400) - elif h == 'sec-websocket-key': - websocket_key = value - if not connection or not upgrade or not websocket_key: - return self.request.app.abort(400) - d = hashlib.sha1(websocket_key.encode()) - d.update(b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11') - return binascii.b2a_base64(d.digest())[:-1] - - @classmethod - def _parse_frame_header(cls, header): - fin = header[0] & 0x80 - opcode = header[0] & 0x0f - if fin == 0 or opcode == cls.CONT: # pragma: no cover - raise WebSocketError('Continuation frames not supported') - has_mask = header[1] & 0x80 - length = header[1] & 0x7f - if length == 126: - length = -2 - elif length == 127: - length = -8 - return fin, opcode, has_mask, length - - def _process_websocket_frame(self, opcode, payload): - if opcode == self.TEXT: - payload = payload.decode() - elif opcode == self.BINARY: - pass - elif opcode == self.CLOSE: - raise WebSocketError('Websocket connection closed') - elif opcode == self.PING: - return self.PONG, payload - elif opcode == self.PONG: # pragma: no branch - return None, None - return None, payload - - @classmethod - def _encode_websocket_frame(cls, opcode, payload): - frame = bytearray() - frame.append(0x80 | opcode) - if opcode == cls.TEXT: - payload = payload.encode() - if len(payload) < 126: - frame.append(len(payload)) - elif len(payload) < (1 << 16): - frame.append(126) - frame.extend(len(payload).to_bytes(2, 'big')) - else: - frame.append(127) - frame.extend(len(payload).to_bytes(8, 'big')) - frame.extend(payload) - return frame - - async def _read_frame(self): - header = await self.request.sock[0].read(2) - if len(header) != 2: # pragma: no cover - raise WebSocketError('Websocket connection closed') - fin, opcode, has_mask, length = self._parse_frame_header(header) - if length == -2: - length = await self.request.sock[0].read(2) - length = int.from_bytes(length, 'big') - elif length == -8: - length = await self.request.sock[0].read(8) - length = int.from_bytes(length, 'big') - max_allowed_length = Request.max_body_length \ - if self.max_message_length == -1 else self.max_message_length - if length > max_allowed_length: - raise WebSocketError('Message too large') - if has_mask: # pragma: no cover - mask = await self.request.sock[0].read(4) - payload = await self.request.sock[0].read(length) - if has_mask: # pragma: no cover - payload = bytes(x ^ mask[i % 4] for i, x in enumerate(payload)) - return opcode, payload - - -async def websocket_upgrade(request): - """Upgrade a request handler to a websocket connection. - - This function can be called directly inside a route function to process a - WebSocket upgrade handshake, for example after the user's credentials are - verified. The function returns the websocket object:: - - @app.route('/echo') - async def echo(request): - if not authenticate_user(request): - abort(401) - ws = await websocket_upgrade(request) - while True: - message = await ws.receive() - await ws.send(message) - """ - ws = WebSocket(request) - await ws.handshake() - - @request.after_request - async def after_request(request, response): - return Response.already_handled - - return ws - - -def websocket_wrapper(f, upgrade_function): - @wraps(f) - async def wrapper(request, *args, **kwargs): - ws = await upgrade_function(request) - try: - await f(request, ws, *args, **kwargs) - except OSError as exc: - if exc.errno not in MUTED_SOCKET_ERRORS: # pragma: no cover - raise - except WebSocketError: - pass - except Exception as exc: - print_exception(exc) - finally: # pragma: no cover - try: - await ws.close() - except Exception: - pass - return Response.already_handled - return wrapper - - -def with_websocket(f): - """Decorator to make a route a WebSocket endpoint. - - This decorator is used to define a route that accepts websocket - connections. The route then receives a websocket object as a second - argument that it can use to send and receive messages:: - - @app.route('/echo') - @with_websocket - async def echo(request, ws): - while True: - message = await ws.receive() - await ws.send(message) - """ - return websocket_wrapper(f, websocket_upgrade) diff --git a/lib/utemplate/__init__.py b/lib/utemplate/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/lib/utemplate/compiled.py b/lib/utemplate/compiled.py deleted file mode 100644 index 006e6f5..0000000 --- a/lib/utemplate/compiled.py +++ /dev/null @@ -1,14 +0,0 @@ -class Loader: - - def __init__(self, pkg, dir): - if dir == ".": - dir = "" - else: - dir = dir.replace("/", ".") + "." - if pkg and pkg != "__main__": - dir = pkg + "." + dir - self.p = dir - - def load(self, name): - name = name.replace(".", "_") - return __import__(self.p + name, None, None, (name,)).render \ No newline at end of file diff --git a/lib/utemplate/recompile.py b/lib/utemplate/recompile.py deleted file mode 100644 index b9bae4e..0000000 --- a/lib/utemplate/recompile.py +++ /dev/null @@ -1,21 +0,0 @@ -# (c) 2014-2020 Paul Sokolovsky. MIT license. -try: - from uos import stat, remove -except: - from os import stat, remove -from . import source - - -class Loader(source.Loader): - - def load(self, name): - o_path = self.pkg_path + self.compiled_path(name) - i_path = self.pkg_path + self.dir + "/" + name - try: - o_stat = stat(o_path) - i_stat = stat(i_path) - if i_stat[8] > o_stat[8]: - # input file is newer, remove output to force recompile - remove(o_path) - finally: - return super().load(name) \ No newline at end of file diff --git a/lib/utemplate/source.py b/lib/utemplate/source.py deleted file mode 100644 index 0ff4651..0000000 --- a/lib/utemplate/source.py +++ /dev/null @@ -1,188 +0,0 @@ -# (c) 2014-2019 Paul Sokolovsky. MIT license. -from . import compiled - - -class Compiler: - - START_CHAR = "{" - STMNT = "%" - STMNT_END = "%}" - EXPR = "{" - EXPR_END = "}}" - - def __init__(self, file_in, file_out, indent=0, seq=0, loader=None): - self.file_in = file_in - self.file_out = file_out - self.loader = loader - self.seq = seq - self._indent = indent - self.stack = [] - self.in_literal = False - self.flushed_header = False - self.args = "*a, **d" - - def indent(self, adjust=0): - if not self.flushed_header: - self.flushed_header = True - self.indent() - self.file_out.write("def render%s(%s):\n" % (str(self.seq) if self.seq else "", self.args)) - self.stack.append("def") - self.file_out.write(" " * (len(self.stack) + self._indent + adjust)) - - def literal(self, s): - if not s: - return - if not self.in_literal: - self.indent() - self.file_out.write('yield """') - self.in_literal = True - self.file_out.write(s.replace('"', '\\"')) - - def close_literal(self): - if self.in_literal: - self.file_out.write('"""\n') - self.in_literal = False - - def render_expr(self, e): - self.indent() - self.file_out.write('yield str(' + e + ')\n') - - def parse_statement(self, stmt): - tokens = stmt.split(None, 1) - if tokens[0] == "args": - if len(tokens) > 1: - self.args = tokens[1] - else: - self.args = "" - elif tokens[0] == "set": - self.indent() - self.file_out.write(stmt[3:].strip() + "\n") - elif tokens[0] == "include": - if not self.flushed_header: - # If there was no other output, we still need a header now - self.indent() - tokens = tokens[1].split(None, 1) - args = "" - if len(tokens) > 1: - args = tokens[1] - if tokens[0][0] == "{": - self.indent() - # "1" as fromlist param is uPy hack - self.file_out.write('_ = __import__(%s.replace(".", "_"), None, None, 1)\n' % tokens[0][2:-2]) - self.indent() - self.file_out.write("yield from _.render(%s)\n" % args) - return - - with self.loader.input_open(tokens[0][1:-1]) as inc: - self.seq += 1 - c = Compiler(inc, self.file_out, len(self.stack) + self._indent, self.seq) - inc_id = self.seq - self.seq = c.compile() - self.indent() - self.file_out.write("yield from render%d(%s)\n" % (inc_id, args)) - elif len(tokens) > 1: - if tokens[0] == "elif": - assert self.stack[-1] == "if" - self.indent(-1) - self.file_out.write(stmt + ":\n") - else: - self.indent() - self.file_out.write(stmt + ":\n") - self.stack.append(tokens[0]) - else: - if stmt.startswith("end"): - assert self.stack[-1] == stmt[3:] - self.stack.pop(-1) - elif stmt == "else": - assert self.stack[-1] == "if" - self.indent(-1) - self.file_out.write("else:\n") - else: - assert False - - def parse_line(self, l): - while l: - start = l.find(self.START_CHAR) - if start == -1: - self.literal(l) - return - self.literal(l[:start]) - self.close_literal() - sel = l[start + 1] - #print("*%s=%s=" % (sel, EXPR)) - if sel == self.STMNT: - end = l.find(self.STMNT_END) - assert end > 0 - stmt = l[start + len(self.START_CHAR + self.STMNT):end].strip() - self.parse_statement(stmt) - end += len(self.STMNT_END) - l = l[end:] - if not self.in_literal and l == "\n": - break - elif sel == self.EXPR: - # print("EXPR") - end = l.find(self.EXPR_END) - assert end > 0 - expr = l[start + len(self.START_CHAR + self.EXPR):end].strip() - self.render_expr(expr) - end += len(self.EXPR_END) - l = l[end:] - else: - self.literal(l[start]) - l = l[start + 1:] - - def header(self): - self.file_out.write("# Autogenerated file\n") - - def compile(self): - self.header() - for l in self.file_in: - self.parse_line(l) - self.close_literal() - return self.seq - - -class Loader(compiled.Loader): - - def __init__(self, pkg, dir): - super().__init__(pkg, dir) - self.dir = dir - if pkg == "__main__": - # if pkg isn't really a package, don't bother to use it - # it means we're running from "filesystem directory", not - # from a package. - pkg = None - - self.pkg_path = "" - if pkg: - p = __import__(pkg) - if isinstance(p.__path__, str): - # uPy - self.pkg_path = p.__path__ - else: - # CPy - self.pkg_path = p.__path__[0] - self.pkg_path += "/" - - def input_open(self, template): - path = self.pkg_path + self.dir + "/" + template - return open(path) - - def compiled_path(self, template): - return self.dir + "/" + template.replace(".", "_") + ".py" - - def load(self, name): - try: - return super().load(name) - except (OSError, ImportError): - pass - - compiled_path = self.pkg_path + self.compiled_path(name) - - f_in = self.input_open(name) - f_out = open(compiled_path, "w") - c = Compiler(f_in, f_out, loader=self) - c.compile() - f_in.close() - f_out.close() - return super().load(name) \ No newline at end of file diff --git a/src/boot.py b/src/boot.py deleted file mode 100644 index bc0859a..0000000 --- a/src/boot.py +++ /dev/null @@ -1,19 +0,0 @@ -import wifi -import time -from settings import Settings - -print(wifi.ap('qwerty')) - - -settings = Settings() -ssid = settings.get('wifi', {}).get('ssid', None) -password = settings.get('wifi', {}).get('password', None) -ip = settings.get('wifi', {}).get('ip', None) -gateway = settings.get('wifi', {}).get('gateway', None) - -# for i in range(10): -# config = wifi.connect(ssid, password, ip, gateway) -# if config: -# print(config) -# break -# time.sleep(0.1) \ No newline at end of file diff --git a/src/dma.py b/src/dma.py deleted file mode 100644 index 0e7b158..0000000 --- a/src/dma.py +++ /dev/null @@ -1,113 +0,0 @@ -# DMA driver for Raspberry Pi Pico 2 (RP2350) only. -from machine import Pin -from rp2 import PIO, StateMachine, asm_pio -import array -import uctypes -from uctypes import BF_POS, BF_LEN, UINT32, BFUINT32, struct - -PIO0_BASE = 0x50200000 -PIO1_BASE = 0x50300000 -PIO2_BASE = 0x50400000 -DMA_BASE = 0x50000000 -DMA_CHAN_WIDTH = 0x40 -DMA_CHAN_COUNT = 12 - -DMA_SIZE_BYTE = 0x0 -DMA_SIZE_HALFWORD = 0x1 -DMA_SIZE_WORD = 0x2 - -# RP2350 DMA CTRL_TRIG bit positions -DMA_CTRL_TRIG_FIELDS = { - "AHB_ERROR": 31< 0: - self.strip[i - 1] = (0, 0, 0) - # If it's the first pixel, ensure the last one from previous cycle is off (if applicable) - elif i == 0 and num_pixels > 1: # Only relevant if scanning backwards too - self.strip[last_pixel_index] = (0,0,0) - - - self.strip.write() # Write changes to the strip - if delay_ms > 0: - utime.sleep_ms(delay_ms) - - # Ensure the last pixel of the forward scan is turned off - if self.run and num_pixels > 0: - self.strip[last_pixel_index] = (0, 0, 0) - self.strip.write() # Write this final change - - - # --- Scan Backward (optional, remove this loop if you only want forward) --- - for i in range(num_pixels - 1, -1, -1): # From last_pixel_index down to 0 - if not self.run: - break - - # Turn on the current pixel - self.strip[i] = color - - # Turn off the next pixel (which was the previous one in reverse scan) - if i < last_pixel_index: - self.strip[i + 1] = (0, 0, 0) - # If it's the last pixel of the reverse scan, ensure the first one from previous cycle is off (if applicable) - elif i == last_pixel_index and num_pixels > 1: # Only relevant if scanning forward too - self.strip[0] = (0,0,0) - - self.strip.write() # Write changes to the strip - if delay_ms > 0: - utime.sleep_ms(delay_ms) - - # Ensure the first pixel of the backward scan is turned off - if self.run and num_pixels > 0: - self.strip[0] = (0, 0, 0) - self.strip.write() # Write this final change - - - def off(self): - print("Turning off LEDs.") - self.run = False - self.strip.fill((0,0,0)) - self.strip.write() - utime.sleep_ms(50) - -# Example Usage (for MicroPython on actual hardware): -# (Same as before, just removed from the main block for brevity) diff --git a/src/settings.py b/src/settings.py deleted file mode 100644 index 7ad5c81..0000000 --- a/src/settings.py +++ /dev/null @@ -1,53 +0,0 @@ -import json - -class Settings(dict): - SETTINGS_FILE = "/settings.json" - - def __init__(self): - super().__init__() - self.load() # Load settings from file during initialization - - def set_defaults(self): - self["num_leds"] = 50 - self["selected_pattern"] = "blink" - self["color1"] = "#000f00" - self["color2"] = "#0f0000" - self["delay"] = 100 - self["brightness"] = 100 - self["wifi"] = {"ssid": "", "password": ""} - - def save(self): - try: - j = json.dumps(self) - with open(self.SETTINGS_FILE, 'w') as file: - file.write(j) - print("Settings saved successfully.") - except Exception as e: - print(f"Error saving settings: {e}") - - def load(self): - try: - with open(self.SETTINGS_FILE, 'r') as file: - loaded_settings = json.load(file) - self.update(loaded_settings) - print("Settings loaded successfully.") - except Exception as e: - print(f"Error loading settings") - self.set_defaults() - -# Example usage -def main(): - settings = Settings() - print(f"Number of LEDs: {settings['num_leds']}") - settings['num_leds'] = 100 - print(f"Updated number of LEDs: {settings['num_leds']}") - settings.save() - - # Create a new Settings object to test loading - new_settings = Settings() - print(f"Loaded number of LEDs: {new_settings['num_leds']}") - print(settings) - -# Run the example -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/src/static/main.css b/src/static/main.css deleted file mode 100644 index 0c56bb7..0000000 --- a/src/static/main.css +++ /dev/null @@ -1,75 +0,0 @@ -body { - font-family: Arial, sans-serif; - max-width: 600px; - margin: 0 auto; - padding: 20px; - line-height: 1.6; - } - h1 { - text-align: center; - } - - form { - margin-bottom: 20px; - } - label { - display: block; - margin-bottom: 5px; - } - input[type="text"], input[type="submit"], input[type="range"], input[type="color"] { - width: 100%; - - margin-bottom: 10px; - box-sizing: border-box; - } - input[type="range"] { - -webkit-appearance: none; - appearance: none; - height: 25px; - background: #d3d3d3; - outline: none; - opacity: 0.7; - transition: opacity .2s; - } - input[type="range"]:hover { - opacity: 1; - } - input[type="range"]::-webkit-slider-thumb { - -webkit-appearance: none; - appearance: none; - width: 25px; - height: 25px; - background: #4CAF50; - cursor: pointer; - border-radius: 50%; - } - input[type="range"]::-moz-range-thumb { - width: 25px; - height: 25px; - background: #4CAF50; - cursor: pointer; - border-radius: 50%; - } - #pattern_buttons { - display: flex; - flex-wrap: wrap; - gap: 10px; - margin-bottom: 20px; - } - #pattern_buttons button { - flex: 1 0 calc(33.333% - 10px); - padding: 10px; - background-color: #4CAF50; - color: white; - border: none; - cursor: pointer; - transition: background-color 0.3s; - } - #pattern_buttons button:hover { - background-color: #45a049; - } - @media (max-width: 480px) { - #pattern_buttons button { - flex: 1 0 calc(50% - 10px); - } - } \ No newline at end of file diff --git a/src/static/main.js b/src/static/main.js deleted file mode 100644 index 4cdcc55..0000000 --- a/src/static/main.js +++ /dev/null @@ -1,147 +0,0 @@ -let delayTimeout; -let brightnessTimeout; -let colorTimeout; -let color2Timeout; - -async function post(path, data) { - console.log(`POST to ${path}`, data); - try { - const response = await fetch(path, { - method: "POST", - headers: { - 'Content-Type': 'application/json' - }, - body: JSON.stringify(data) // Convert data to JSON string - }); - if (!response.ok) { - throw new Error(`HTTP error! Status: ${response.status}`); - } - } catch (error) { - console.error('Error during POST request:', error); - } -} - -async function get(path) { - try { - const response = await fetch(path); - if (!response.ok) { - throw new Error(`HTTP error! Status: ${response.status}`); - } - return await response.json(); // Assuming you are expecting JSON response - } catch (error) { - console.error('Error during GET request:', error); - } -} - -async function updateColor(event) { - event.preventDefault(); - clearTimeout(colorTimeout); - colorTimeout = setTimeout(async function() { - const color = document.getElementById('color').value; - await post("/color", { color }); // Send as JSON - }, 500); -} - -async function updateColor2(event) { - event.preventDefault(); - clearTimeout(color2Timeout); - color2Timeout = setTimeout(async function() { - const color = document.getElementById('color2').value; - await post("/color2", { color }); // Send as JSON - }, 500); -} - -async function updatePattern(pattern) { - event.preventDefault(); - await post("/pattern", { pattern }); // Send as JSON -} - -async function updateBrightness(event) { - event.preventDefault(); - clearTimeout(brightnessTimeout); - brightnessTimeout = setTimeout(async function() { - const brightness = document.getElementById('brightness').value; - await post('/brightness', { brightness }); // Send as JSON - }, 500); -} - -async function updateDelay(event) { - event.preventDefault(); - clearTimeout(delayTimeout); - delayTimeout = setTimeout(async function() { - const delay = document.getElementById('delay').value; - await post('/delay', { delay }); // Send as JSON - }, 500); -} - -async function updateNumLeds(event) { - event.preventDefault(); - const numLeds = document.getElementById('num_leds').value; - await post('/num_leds', { num_leds: numLeds }); // Send as JSON -} - -async function updateWifi(event) { - event.preventDefault(); - const ssid = document.getElementById('ssid').value; - const password = document.getElementById('password').value; - const ip = document.getElementById('ip').value; - const gateway = document.getElementById('gateway').value; - - const wifiSettings = { ssid, password, ip, gateway }; // Create JSON object - console.log(wifiSettings); - const response = await post('/wifi_settings', wifiSettings); // Send as JSON - if (response === 500) { - alert("Failed to connect to Wi-Fi"); - } -} - -function createPatternButtons(patterns) { - const container = document.getElementById('pattern_buttons'); - container.innerHTML = ''; // Clear previous buttons - - patterns.forEach(pattern => { - const button = document.createElement('button'); - button.type = 'button'; // Use 'button' instead of 'submit' - button.textContent = pattern; - button.value = pattern; - button.addEventListener('click', async function(event) { - event.preventDefault(); - await updatePattern(pattern); - }); - container.appendChild(button); - }); -} - -document.addEventListener('DOMContentLoaded', async function() { - document.getElementById('color').addEventListener('input', updateColor); - document.getElementById('color2').addEventListener('input', updateColor2); - document.getElementById('delay').addEventListener('input', updateDelay); - document.getElementById('brightness').addEventListener('input', updateBrightness); - document.getElementById('num_leds_form').addEventListener('submit', updateNumLeds); - document.getElementById('wifi_form').addEventListener('submit', updateWifi); - document.getElementById('delay').addEventListener('touchend', updateDelay); - document.getElementById('brightness').addEventListener('touchend', updateBrightness); - - document.querySelectorAll(".pattern_button").forEach(button => { - console.log(button.value); - button.addEventListener('click', async event => { - event.preventDefault(); - await updatePattern(button.value); - }); - }); -}); - -// Function to toggle the display of the settings menu -function selectSettings() { - const settingsMenu = document.getElementById('settings_menu'); - controls = document.getElementById('controls'); - settingsMenu.style.display = 'block'; - controls.style.display = 'none'; -} - -function selectControls() { - const settingsMenu = document.getElementById('settings_menu'); - controls = document.getElementById('controls'); - settingsMenu.style.display = 'none'; - controls.style.display = 'block'; -} diff --git a/src/templates/index.html b/src/templates/index.html deleted file mode 100644 index 6063d30..0000000 --- a/src/templates/index.html +++ /dev/null @@ -1,41 +0,0 @@ -{% args settings, patterns %} - - - - - - LED Control - - - - -

Control LEDs

- - - - -
-
- {% for p in patterns %} - - {% endfor %} - - -
-
- - -
-
- - -
-
- -
-
- -
-
- - diff --git a/src/templates/index_html.py b/src/templates/index_html.py deleted file mode 100644 index 8ba57fe..0000000 --- a/src/templates/index_html.py +++ /dev/null @@ -1,94 +0,0 @@ -# Autogenerated file -def render(settings, patterns): - yield """ - - - - - LED Control - - - - -

Control LEDs

- - - - -
-
- """ - for p in patterns: - yield """ - """ - yield """ - -
-
- - -
-
- - -
-
- -
-
- -
-
- - - -
-

Settings

- - -
- - - -
- - -
- - -
- - -
- - -
- - -
- -
-
- - -""" diff --git a/src/web.py b/src/web.py deleted file mode 100644 index d4d2d4f..0000000 --- a/src/web.py +++ /dev/null @@ -1,101 +0,0 @@ -from microdot import Microdot, send_file, Response -from microdot.utemplate import Template -from microdot.websocket import with_websocket - -import json -import wifi - -def web(settings, patterns, patterns2): - app = Microdot() - Response.default_content_type = 'text/html' - - @app.route('/') - async def index(request): - return Template('/index.html').render(settings=settings, patterns=patterns.patterns.keys()) - - @app.route("/static/") - def static(request, path): - if '..' in path: - # Directory traversal is not allowed - return 'Not found', 404 - return send_file('static/' + path) - - @app.post("/pattern") - def pattern(request): - try: - data = json.loads(request.body.decode('utf-8')) - pattern = data["pattern"] - if patterns.select(pattern): - patterns2.select(pattern) - settings["selected_pattern"] = pattern - settings.save() - return "OK", 200 - else: - return "Bad request", 400 - except (KeyError, json.JSONDecodeError): - return "Bad request", 400 - - @app.post("/delay") - def delay(request): - try: - data = json.loads(request.body.decode('utf-8')) - delay = int(data["delay"]) - patterns.set_delay(delay) - patterns2.set_delay(delay) - settings["delay"] = delay - settings.save() - return "OK", 200 - except (ValueError, KeyError, json.JSONDecodeError): - return "Bad request", 400 - - @app.post("/brightness") - def brightness(request): - try: - data = json.loads(request.body.decode('utf-8')) - brightness = int(data["brightness"]) - patterns.set_brightness(brightness) - patterns2.set_brightness(brightness) - settings["brightness"] = brightness - settings.save() - return "OK", 200 - except (ValueError, KeyError, json.JSONDecodeError): - return "Bad request", 400 - - @app.post("/color") - def color(request): - try: - data = json.loads(request.body.decode('utf-8')) - color = data["color"] - patterns.set_color1(tuple(int(color[i:i+2], 16) for i in (1, 3, 5))) # Convert hex to RGB - patterns2.set_color1(tuple(int(color[i:i+2], 16) for i in (1, 3, 5))) # Convert hex to RGB - settings["color1"] = color - settings.save() - return "OK", 200 - except (KeyError, json.JSONDecodeError, ValueError): - return "Bad request", 400 - - @app.post("/color2") - def color2(request): - try: - data = json.loads(request.body.decode('utf-8')) - color = data["color2"] - patterns.set_color2(tuple(int(color[i:i+2], 16) for i in (1, 3, 5))) # Convert hex to RGB - patterns2.set_color2(tuple(int(color[i:i+2], 16) for i in (1, 3, 5))) # Convert hex to RGB - settings["color2"] = color - settings.save() - return "OK", 200 - except (KeyError, json.JSONDecodeError, ValueError): - return "Bad request", 400 - - @app.route("/external") - @with_websocket - async def ws(request, ws): - patterns.select("external") - while True: - data = await ws.receive() - print(data) - for i in range(min(patterns.num_leds, int(len(data)/3))): - patterns.set(i, (data[i*3], data[i*3+1], data[i*3+2])) - patterns.write() - - return app diff --git a/src/wifi.py b/src/wifi.py deleted file mode 100644 index 32a9036..0000000 --- a/src/wifi.py +++ /dev/null @@ -1,46 +0,0 @@ -import network -from machine import Pin -from time import sleep -import ubinascii -from settings import Settings - -def connect(ssid, password, ip, gateway): - if ssid is None or password is None: - print("Missing ssid or password") - return None - try: - sta_if = network.WLAN(network.STA_IF) - if ip is not None and gateway is not None: - sta_if.ifconfig((ip, '255.255.255.0', gateway, '1.1.1.1')) - if not sta_if.isconnected(): - print('connecting to network...') - sta_if.active(True) - sta_if.connect(ssid, password) - sleep(0.1) - if sta_if.isconnected(): - return sta_if.ifconfig() - return None - return sta_if.ifconfig() - except Exception as e: - print(f"Failed to connect to wifi {e}") - return None - - -def ap(password): - ap_if = network.WLAN(network.AP_IF) - ap_mac = ap_if.config('mac') - ssid = f"led-{ubinascii.hexlify(ap_mac).decode()}" - print(ssid) - ap_if.active(True) - ap_if.config(essid=ssid, password="qwerty1234") - ap_if.active(False) - ap_if.active(True) - print(ap_if.ifconfig()) - - - - - - - - diff --git a/src/ws2812.py b/src/ws2812.py deleted file mode 100644 index c9d9c2f..0000000 --- a/src/ws2812.py +++ /dev/null @@ -1,68 +0,0 @@ - -import array, time -from machine import Pin -import rp2 -from time import sleep -import dma - -@rp2.asm_pio(sideset_init=rp2.PIO.OUT_LOW, out_shiftdir=rp2.PIO.SHIFT_LEFT, autopull=True, pull_thresh=8) -def ws2812(): - T1 = 2 - T2 = 5 - T3 = 3 - wrap_target() - label("bitloop") - out(x, 1) .side(0) [T3 - 1] - jmp(not_x, "do_zero") .side(1) [T1 - 1] - jmp("bitloop") .side(1) [T2 - 1] - label("do_zero") - nop() .side(0) [T2 - 1] - wrap() - -class WS2812B: - def __init__(self, num_leds, pin, state_machine, brightness=0.1, invert=False): - self.sm = rp2.StateMachine(state_machine, ws2812, freq=8_000_000, sideset_base=Pin(pin)) - self.sm.active(1) - self.ar = bytearray(num_leds*3) - self.num_leds = num_leds - self.brightness = brightness - self.invert = invert - self.pio_dma = dma.PIO_DMA_Transfer(state_machine+4, state_machine, 8, num_leds*3) - - def show(self): - self.pio_dma.start_transfer(self.ar) - - def set(self, i, color): - self.ar[i*3] = int(color[1]*self.brightness) - self.ar[i*3+1] = int(color[0]*self.brightness) - self.ar[i*3+2] = int(color[2]*self.brightness) - - def fill(self, color): - for i in range(self.num_leds): - self.set(i, color) - - def busy(self): - return self.pio_dma.busy() - - BLACK = (0, 0, 0) - RED = (255, 0, 0) - YELLOW = (255, 150, 0) - GREEN = (0, 255, 0) - CYAN = (0, 255, 255) - BLUE = (0, 0, 255) - PURPLE = (180, 0, 255) - WHITE = (255, 255, 255) - COLORS = (BLACK, RED, YELLOW, GREEN, CYAN, BLUE, PURPLE, WHITE) - -if __name__ == "__main__": - num_leds, pin, sm, brightness = 10, 0, 0, 1 - ws0 = WS2812B(num_leds, pin, sm, brightness) - while True: - for color in ws0.COLORS: - ws0.fill(color) - ws0.show() - time.sleep(1) - - - - diff --git a/test.py b/test.py deleted file mode 100644 index a3ea3c6..0000000 --- a/test.py +++ /dev/null @@ -1 +0,0 @@ -print("Hello") \ No newline at end of file