1 Commits

Author SHA1 Message Date
ddbf4a00f4 Add run script 2025-09-08 23:17:50 +12:00
15 changed files with 781 additions and 3129 deletions

14
Pipfile
View File

@@ -5,13 +5,6 @@ name = "pypi"
[packages] [packages]
websockets = "*" websockets = "*"
watchfiles = "*"
async-tkinter-loop = "*"
mido = "*"
python-rtmidi = "*"
pyaudio = "*"
aubio = "*"
websocket-client = "*"
[dev-packages] [dev-packages]
@@ -19,8 +12,5 @@ websocket-client = "*"
python_version = "3.12" python_version = "3.12"
[scripts] [scripts]
ui = "python src/ui_client.py" run = 'watchfiles "python main.py" main.py networking.py settings.py color_utils.py'
control = "python src/control_server.py"
sound = "python src/sound.py"
dev-ui = 'watchfiles "python src/ui_client.py" src'
dev-control = 'watchfiles "python src/control_server.py" src'

321
Pipfile.lock generated
View File

@@ -1,7 +1,7 @@
{ {
"_meta": { "_meta": {
"hash": { "hash": {
"sha256": "4aaef0c08e86d190f036736e98ff0e932788c7b461e725840de5699a8758b9d5" "sha256": "f7607d6d57851b07acb91db3698117d8af9ab03600cda23fd56c9ab927904d31"
}, },
"pipfile-spec": 6, "pipfile-spec": 6,
"requires": { "requires": {
@@ -16,325 +16,6 @@
] ]
}, },
"default": { "default": {
"anyio": {
"hashes": [
"sha256:3f3fae35c96039744587aa5b8371e7e8e603c0702999535961dd336026973ba6",
"sha256:60e474ac86736bbfd6f210f7a61218939c318f43f9972497381f1c5e930ed3d1"
],
"markers": "python_version >= '3.9'",
"version": "==4.10.0"
},
"async-tkinter-loop": {
"hashes": [
"sha256:4c69f46ffdbac48dc44c296c3a7b00c4cc4f852c3d43aa2fa329991d1fadea02",
"sha256:509c418139847bcb2e47a5a6b6d24a2e2dca290bc468dad6b6b8029e8a865bfd"
],
"index": "pypi",
"markers": "python_version >= '3.8' and python_version < '4.0'",
"version": "==0.9.3"
},
"aubio": {
"hashes": [
"sha256:df1244f6c4cf5bea382c8c2d35aa43bc31f4cf631fe325ae3992c219546a4202"
],
"index": "pypi",
"version": "==0.4.9"
},
"idna": {
"hashes": [
"sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9",
"sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"
],
"markers": "python_version >= '3.6'",
"version": "==3.10"
},
"mido": {
"hashes": [
"sha256:01033c9b10b049e4436fca2762194ca839b09a4334091dd3c34e7f4ae674fd8a",
"sha256:1aecb30b7f282404f17e43768cbf74a6a31bf22b3b783bdd117a1ce9d22cb74c"
],
"index": "pypi",
"markers": "python_version ~= '3.7'",
"version": "==1.3.3"
},
"numpy": {
"hashes": [
"sha256:07b62978075b67eee4065b166d000d457c82a1efe726cce608b9db9dd66a73a5",
"sha256:087ffc25890d89a43536f75c5fe8770922008758e8eeeef61733957041ed2f9b",
"sha256:092aeb3449833ea9c0bf0089d70c29ae480685dd2377ec9cdbbb620257f84631",
"sha256:095737ed986e00393ec18ec0b21b47c22889ae4b0cd2d5e88342e08b01141f58",
"sha256:0a4f2021a6da53a0d580d6ef5db29947025ae8b35b3250141805ea9a32bbe86b",
"sha256:103ea7063fa624af04a791c39f97070bf93b96d7af7eb23530cd087dc8dbe9dc",
"sha256:11e58218c0c46c80509186e460d79fbdc9ca1eb8d8aee39d8f2dc768eb781089",
"sha256:122bf5ed9a0221b3419672493878ba4967121514b1d7d4656a7580cd11dddcbf",
"sha256:14a91ebac98813a49bc6aa1a0dfc09513dcec1d97eaf31ca21a87221a1cdcb15",
"sha256:1f91e5c028504660d606340a084db4b216567ded1056ea2b4be4f9d10b67197f",
"sha256:20b8200721840f5621b7bd03f8dcd78de33ec522fc40dc2641aa09537df010c3",
"sha256:240259d6564f1c65424bcd10f435145a7644a65a6811cfc3201c4a429ba79170",
"sha256:2738534837c6a1d0c39340a190177d7d66fdf432894f469728da901f8f6dc910",
"sha256:27c9f90e7481275c7800dc9c24b7cc40ace3fdb970ae4d21eaff983a32f70c91",
"sha256:293b2192c6bcce487dbc6326de5853787f870aeb6c43f8f9c6496db5b1781e45",
"sha256:2c3271cc4097beb5a60f010bcc1cc204b300bb3eafb4399376418a83a1c6373c",
"sha256:2f4f0215edb189048a3c03bd5b19345bdfa7b45a7a6f72ae5945d2a28272727f",
"sha256:3dcf02866b977a38ba3ec10215220609ab9667378a9e2150615673f3ffd6c73b",
"sha256:4209f874d45f921bde2cff1ffcd8a3695f545ad2ffbef6d3d3c6768162efab89",
"sha256:448a66d052d0cf14ce9865d159bfc403282c9bc7bb2a31b03cc18b651eca8b1a",
"sha256:4ae6863868aaee2f57503c7a5052b3a2807cf7a3914475e637a0ecd366ced220",
"sha256:4d002ecf7c9b53240be3bb69d80f86ddbd34078bae04d87be81c1f58466f264e",
"sha256:4e6ecfeddfa83b02318f4d84acf15fbdbf9ded18e46989a15a8b6995dfbf85ab",
"sha256:508b0eada3eded10a3b55725b40806a4b855961040180028f52580c4729916a2",
"sha256:546aaf78e81b4081b2eba1d105c3b34064783027a06b3ab20b6eba21fb64132b",
"sha256:572d5512df5470f50ada8d1972c5f1082d9a0b7aa5944db8084077570cf98370",
"sha256:5ad4ebcb683a1f99f4f392cc522ee20a18b2bb12a2c1c42c3d48d5a1adc9d3d2",
"sha256:66459dccc65d8ec98cc7df61307b64bf9e08101f9598755d42d8ae65d9a7a6ee",
"sha256:6936aff90dda378c09bea075af0d9c675fe3a977a9d2402f95a87f440f59f619",
"sha256:69779198d9caee6e547adb933941ed7520f896fd9656834c300bdf4dd8642712",
"sha256:6f1ae3dcb840edccc45af496f312528c15b1f79ac318169d094e85e4bb35fdf1",
"sha256:71669b5daae692189540cffc4c439468d35a3f84f0c88b078ecd94337f6cb0ec",
"sha256:72c6df2267e926a6d5286b0a6d556ebe49eae261062059317837fda12ddf0c1a",
"sha256:72dbebb2dcc8305c431b2836bcc66af967df91be793d63a24e3d9b741374c450",
"sha256:754d6755d9a7588bdc6ac47dc4ee97867271b17cee39cb87aef079574366db0a",
"sha256:76c3e9501ceb50b2ff3824c3589d5d1ab4ac857b0ee3f8f49629d0de55ecf7c2",
"sha256:7a0e27186e781a69959d0230dd9909b5e26024f8da10683bd6344baea1885168",
"sha256:7d6e390423cc1f76e1b8108c9b6889d20a7a1f59d9a60cac4a050fa734d6c1e2",
"sha256:8145dd6d10df13c559d1e4314df29695613575183fa2e2d11fac4c208c8a1f73",
"sha256:8446acd11fe3dc1830568c941d44449fd5cb83068e5c70bd5a470d323d448296",
"sha256:852ae5bed3478b92f093e30f785c98e0cb62fa0a939ed057c31716e18a7a22b9",
"sha256:87c930d52f45df092f7578889711a0768094debf73cfcde105e2d66954358125",
"sha256:8b1224a734cd509f70816455c3cffe13a4f599b1bf7130f913ba0e2c0b2006c0",
"sha256:8dc082ea901a62edb8f59713c6a7e28a85daddcb67454c839de57656478f5b19",
"sha256:906a30249315f9c8e17b085cc5f87d3f369b35fedd0051d4a84686967bdbbd0b",
"sha256:938065908d1d869c7d75d8ec45f735a034771c6ea07088867f713d1cd3bbbe4f",
"sha256:9c144440db4bf3bb6372d2c3e49834cc0ff7bb4c24975ab33e01199e645416f2",
"sha256:9e196ade2400c0c737d93465327d1ae7c06c7cb8a1756121ebf54b06ca183c7f",
"sha256:a3ef07ec8cbc8fc9e369c8dcd52019510c12da4de81367d8b20bc692aa07573a",
"sha256:a7af9ed2aa9ec5950daf05bb11abc4076a108bd3c7db9aa7251d5f107079b6a6",
"sha256:a9f66e7d2b2d7712410d3bc5684149040ef5f19856f20277cd17ea83e5006286",
"sha256:aa098a5ab53fa407fded5870865c6275a5cd4101cfdef8d6fafc48286a96e981",
"sha256:af58de8745f7fa9ca1c0c7c943616c6fe28e75d0c81f5c295810e3c83b5be92f",
"sha256:b05a89f2fb84d21235f93de47129dd4f11c16f64c87c33f5e284e6a3a54e43f2",
"sha256:b5e40e80299607f597e1a8a247ff8d71d79c5b52baa11cc1cce30aa92d2da6e0",
"sha256:b9d0878b21e3918d76d2209c924ebb272340da1fb51abc00f986c258cd5e957b",
"sha256:bc3186bea41fae9d8e90c2b4fb5f0a1f5a690682da79b92574d63f56b529080b",
"sha256:c63d95dc9d67b676e9108fe0d2182987ccb0f11933c1e8959f42fa0da8d4fa56",
"sha256:c771cfac34a4f2c0de8e8c97312d07d64fd8f8ed45bc9f5726a7e947270152b5",
"sha256:c8d9727f5316a256425892b043736d63e89ed15bbfe6556c5ff4d9d4448ff3b3",
"sha256:cbc95b3813920145032412f7e33d12080f11dc776262df1712e1638207dde9e8",
"sha256:cefc2219baa48e468e3db7e706305fcd0c095534a192a08f31e98d83a7d45fb0",
"sha256:d95f59afe7f808c103be692175008bab926b59309ade3e6d25009e9a171f7036",
"sha256:dd937f088a2df683cbb79dda9a772b62a3e5a8a7e76690612c2737f38c6ef1b6",
"sha256:de6ea4e5a65d5a90c7d286ddff2b87f3f4ad61faa3db8dabe936b34c2275b6f8",
"sha256:e0486a11ec30cdecb53f184d496d1c6a20786c81e55e41640270130056f8ee48",
"sha256:ee807923782faaf60d0d7331f5e86da7d5e3079e28b291973c545476c2b00d07",
"sha256:efc81393f25f14d11c9d161e46e6ee348637c0a1e8a54bf9dedc472a3fae993b",
"sha256:f0a1a8476ad77a228e41619af2fa9505cf69df928e9aaa165746584ea17fed2b",
"sha256:f75018be4980a7324edc5930fe39aa391d5734531b1926968605416ff58c332d",
"sha256:f92d6c2a8535dc4fe4419562294ff957f83a16ebdec66df0805e473ffaad8bd0",
"sha256:fb1752a3bb9a3ad2d6b090b88a9a0ae1cd6f004ef95f75825e2f382c183b2097",
"sha256:fc927d7f289d14f5e037be917539620603294454130b6de200091e23d27dc9be",
"sha256:fed5527c4cf10f16c6d0b6bee1f89958bccb0ad2522c8cadc2efd318bcd545f5"
],
"markers": "python_version >= '3.11'",
"version": "==2.3.2"
},
"packaging": {
"hashes": [
"sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484",
"sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f"
],
"markers": "python_version >= '3.8'",
"version": "==25.0"
},
"pyaudio": {
"hashes": [
"sha256:009f357ee5aa6bc8eb19d69921cd30e98c42cddd34210615d592a71d09c4bd57",
"sha256:126065b5e82a1c03ba16e7c0404d8f54e17368836e7d2d92427358ad44fefe61",
"sha256:12f2f1ba04e06ff95d80700a78967897a489c05e093e3bffa05a84ed9c0a7fa3",
"sha256:2a166fc88d435a2779810dd2678354adc33499e9d4d7f937f28b20cc55893e83",
"sha256:2dac0d6d675fe7e181ba88f2de88d321059b69abd52e3f4934a8878e03a7a074",
"sha256:506b32a595f8693811682ab4b127602d404df7dfc453b499c91a80d0f7bad289",
"sha256:5fce4bcdd2e0e8c063d835dbe2860dac46437506af509353c7f8114d4bacbd5b",
"sha256:692d8c1446f52ed2662120bcd9ddcb5aa2b71f38bda31e58b19fb4672fffba69",
"sha256:78dfff3879b4994d1f4fc6485646a57755c6ee3c19647a491f790a0895bd2f87",
"sha256:858caf35b05c26d8fc62f1efa2e8f53d5fa1a01164842bd622f70ddc41f55000",
"sha256:95328285b4dab57ea8c52a4a996cb52be6d629353315be5bfda403d15932a497",
"sha256:bbeb01d36a2f472ae5ee5e1451cacc42112986abe622f735bb870a5db77cf903",
"sha256:f745109634a7c19fa4d6b8b7d6967c3123d988c9ade0cd35d4295ee1acdb53e9"
],
"index": "pypi",
"version": "==0.2.14"
},
"python-rtmidi": {
"hashes": [
"sha256:052c89933cae4fca354012d8ca7248f4f9e1e3f062471409d48415a7f7d7e59e",
"sha256:1d5da765184150fb946043d59be4039b36a8060ede025f109ef20492dbf99075",
"sha256:25f5a5db7be98911c41ca5bebb262fcf9a7c89600b88fd3c207ceafd3101e721",
"sha256:26149186367341bf5b0a3ac17b495f6a25950bd3da6b4f13d25ac0a9ce8208dd",
"sha256:271d625c489fffb39b3edc5aba67f7c8e29a04a0a0f056ce19e5a888a08b4c59",
"sha256:29661939f9b7bd1a4e29835f50f4790e741dacd21a5cb143297aefb51deefdec",
"sha256:29d9c9d9f82ce679fecad7bb4cb79f3a24574ea84600e377194b4cc1baacec0e",
"sha256:30d117193dcad8af67c600c405f53eb096e4ff84849760be14c97270af334922",
"sha256:46bbf32c8a4bf6c8f0df1c02a68689d0757f13cb7a69f27ccbbed3d7b2365918",
"sha256:4e234dca7f9d783dd3f1e9c9c5c2f295f02b7af3085301d6eed3b428cf49d327",
"sha256:5443634597eb340cdec0734f76267a827c2d366f00a6f9195141c78828016ac2",
"sha256:5966172ed28add6ff2b76d389702931bfc7ff3cc741c0e4b0d1aaae269ab7a8e",
"sha256:7bce7f17c71a71d8ef0bfeae3cb8a7652dd02f0d5067de882e1ee44eb38518db",
"sha256:7f9ade68b068ae09000ecb562ae9521da3a234361ad5449e83fc734544d004fa",
"sha256:82e61bc1b51aa91d9e615827056e80f78dbe364248eecd61698b233f7af903f6",
"sha256:844bd12840c9d4e03dfc89b2cd57c55dcbf5ed7246504d69c6c661732249b19c",
"sha256:878ce085dfb65c0974810a7e919f73708cbb4c0430c7924b78f25aea1dd4ebee",
"sha256:8bbaf7c7164471712a93ac60c8f9ed146b336a294a5103223bbaf8f10709a0bf",
"sha256:a5582983ad57ea7f0a7797ddc3e258efb00f8326113b6ddfa85b5165a4151806",
"sha256:a706e9850e22acc57fa840c60fdc4541baafe462a05ff7631a6d9eb91c65e171",
"sha256:c60dd180e5130fb87571e71aea30e2ef0512131aab45865a7d67063ed8e52ca4",
"sha256:cec30924e305f55284594ccf35a71dee7216fd308dfa2dec1b3ed03e6f243803",
"sha256:cfea32c91752fa7aecfe3d6827535c190ba0e646a9accd6604f4fc70cf4b780f",
"sha256:dd2bcbea822488fca6b8d9fc7e78a91da12914f3b88dc086f051cb65a643449f",
"sha256:efc07413b30b0039c0d35abe25a81d740c7405124eb58eed141a8f24388e6fe0",
"sha256:f2138005c6bd3d8b9af05df383679f6d0827d16056e68a941110732310dcb7dd"
],
"index": "pypi",
"markers": "python_version >= '3.8'",
"version": "==1.5.8"
},
"sniffio": {
"hashes": [
"sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2",
"sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"
],
"markers": "python_version >= '3.7'",
"version": "==1.3.1"
},
"typing-extensions": {
"hashes": [
"sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466",
"sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548"
],
"markers": "python_version >= '3.9'",
"version": "==4.15.0"
},
"watchfiles": {
"hashes": [
"sha256:00645eb79a3faa70d9cb15c8d4187bb72970b2470e938670240c7998dad9f13a",
"sha256:04e4ed5d1cd3eae68c89bcc1a485a109f39f2fd8de05f705e98af6b5f1861f1f",
"sha256:0a7d40b77f07be87c6faa93d0951a0fcd8cbca1ddff60a1b65d741bac6f3a9f6",
"sha256:0ece16b563b17ab26eaa2d52230c9a7ae46cf01759621f4fbbca280e438267b3",
"sha256:11ee4444250fcbeb47459a877e5e80ed994ce8e8d20283857fc128be1715dac7",
"sha256:12b0a02a91762c08f7264e2e79542f76870c3040bbc847fb67410ab81474932a",
"sha256:12fe8eaffaf0faa7906895b4f8bb88264035b3f0243275e0bf24af0436b27259",
"sha256:130fc497b8ee68dce163e4254d9b0356411d1490e868bd8790028bc46c5cc297",
"sha256:17ab167cca6339c2b830b744eaf10803d2a5b6683be4d79d8475d88b4a8a4be1",
"sha256:199207b2d3eeaeb80ef4411875a6243d9ad8bc35b07fc42daa6b801cc39cc41c",
"sha256:20ecc8abbd957046f1fe9562757903f5eaf57c3bce70929fda6c7711bb58074a",
"sha256:239736577e848678e13b201bba14e89718f5c2133dfd6b1f7846fa1b58a8532b",
"sha256:249590eb75ccc117f488e2fabd1bfa33c580e24b96f00658ad88e38844a040bb",
"sha256:27f30e14aa1c1e91cb653f03a63445739919aef84c8d2517997a83155e7a2fcc",
"sha256:29e7bc2eee15cbb339c68445959108803dc14ee0c7b4eea556400131a8de462b",
"sha256:328dbc9bff7205c215a7807da7c18dce37da7da718e798356212d22696404339",
"sha256:32d6d4e583593cb8576e129879ea0991660b935177c0f93c6681359b3654bfa9",
"sha256:3366f56c272232860ab45c77c3ca7b74ee819c8e1f6f35a7125556b198bbc6df",
"sha256:3434e401f3ce0ed6b42569128b3d1e3af773d7ec18751b918b89cd49c14eaafb",
"sha256:37d3d3f7defb13f62ece99e9be912afe9dd8a0077b7c45ee5a57c74811d581a4",
"sha256:3a6fd40bbb50d24976eb275ccb55cd1951dfb63dbc27cae3066a6ca5f4beabd5",
"sha256:3aba215958d88182e8d2acba0fdaf687745180974946609119953c0e112397dc",
"sha256:406520216186b99374cdb58bc48e34bb74535adec160c8459894884c983a149c",
"sha256:4281cd9fce9fc0a9dbf0fc1217f39bf9cf2b4d315d9626ef1d4e87b84699e7e8",
"sha256:42f92befc848bb7a19658f21f3e7bae80d7d005d13891c62c2cd4d4d0abb3433",
"sha256:48aa25e5992b61debc908a61ab4d3f216b64f44fdaa71eb082d8b2de846b7d12",
"sha256:5007f860c7f1f8df471e4e04aaa8c43673429047d63205d1630880f7637bca30",
"sha256:50a51a90610d0845a5931a780d8e51d7bd7f309ebc25132ba975aca016b576a0",
"sha256:51556d5004887045dba3acdd1fdf61dddea2be0a7e18048b5e853dcd37149b86",
"sha256:51b81e55d40c4b4aa8658427a3ee7ea847c591ae9e8b81ef94a90b668999353c",
"sha256:5366164391873ed76bfdf618818c82084c9db7fac82b64a20c44d335eec9ced5",
"sha256:54062ef956807ba806559b3c3d52105ae1827a0d4ab47b621b31132b6b7e2866",
"sha256:60022527e71d1d1fda67a33150ee42869042bce3d0fcc9cc49be009a9cded3fb",
"sha256:622d6b2c06be19f6e89b1d951485a232e3b59618def88dbeda575ed8f0d8dbf2",
"sha256:62cc7a30eeb0e20ecc5f4bd113cd69dcdb745a07c68c0370cea919f373f65d9e",
"sha256:693ed7ec72cbfcee399e92c895362b6e66d63dac6b91e2c11ae03d10d503e575",
"sha256:6d2404af8db1329f9a3c9b79ff63e0ae7131986446901582067d9304ae8aaf7f",
"sha256:7049e52167fc75fc3cc418fc13d39a8e520cbb60ca08b47f6cedb85e181d2f2a",
"sha256:7080c4bb3efd70a07b1cc2df99a7aa51d98685be56be6038c3169199d0a1c69f",
"sha256:7738027989881e70e3723c75921f1efa45225084228788fc59ea8c6d732eb30d",
"sha256:7a7bd57a1bb02f9d5c398c0c1675384e7ab1dd39da0ca50b7f09af45fa435277",
"sha256:7b3443f4ec3ba5aa00b0e9fa90cf31d98321cbff8b925a7c7b84161619870bc9",
"sha256:7c55b0f9f68590115c25272b06e63f0824f03d4fc7d6deed43d8ad5660cabdbf",
"sha256:7fd1b3879a578a8ec2076c7961076df540b9af317123f84569f5a9ddee64ce92",
"sha256:8076a5769d6bdf5f673a19d51da05fc79e2bbf25e9fe755c47595785c06a8c72",
"sha256:80f811146831c8c86ab17b640801c25dc0a88c630e855e2bef3568f30434d52b",
"sha256:8412eacef34cae2836d891836a7fff7b754d6bcac61f6c12ba5ca9bc7e427b68",
"sha256:865c8e95713744cf5ae261f3067861e9da5f1370ba91fc536431e29b418676fa",
"sha256:86b1e28d4c37e89220e924305cd9f82866bb0ace666943a6e4196c5df4d58dcc",
"sha256:891c69e027748b4a73847335d208e374ce54ca3c335907d381fde4e41661b13b",
"sha256:8ac164e20d17cc285f2b94dc31c384bc3aa3dd5e7490473b3db043dd70fbccfd",
"sha256:8c5701dc474b041e2934a26d31d39f90fac8a3dee2322b39f7729867f932b1d4",
"sha256:90ebb429e933645f3da534c89b29b665e285048973b4d2b6946526888c3eb2c7",
"sha256:923fec6e5461c42bd7e3fd5ec37492c6f3468be0499bc0707b4bbbc16ac21792",
"sha256:935f9edd022ec13e447e5723a7d14456c8af254544cefbc533f6dd276c9aa0d9",
"sha256:95ab1594377effac17110e1352989bdd7bdfca9ff0e5eeccd8c69c5389b826d0",
"sha256:9974d2f7dc561cce3bb88dfa8eb309dab64c729de85fba32e98d75cf24b66297",
"sha256:9c733cda03b6d636b4219625a4acb5c6ffb10803338e437fb614fef9516825ef",
"sha256:9dc001c3e10de4725c749d4c2f2bdc6ae24de5a88a339c4bce32300a31ede179",
"sha256:9f811079d2f9795b5d48b55a37aa7773680a5659afe34b54cc1d86590a51507d",
"sha256:a2726d7bfd9f76158c84c10a409b77a320426540df8c35be172444394b17f7ea",
"sha256:a479466da6db5c1e8754caee6c262cd373e6e6c363172d74394f4bff3d84d7b5",
"sha256:a543492513a93b001975ae283a51f4b67973662a375a403ae82f420d2c7205ee",
"sha256:a89c75a5b9bc329131115a409d0acc16e8da8dfd5867ba59f1dd66ae7ea8fa82",
"sha256:a8f6f72974a19efead54195bc9bed4d850fc047bb7aa971268fd9a8387c89011",
"sha256:a9ccbf1f129480ed3044f540c0fdbc4ee556f7175e5ab40fe077ff6baf286d4e",
"sha256:aa0cc8365ab29487eb4f9979fd41b22549853389e22d5de3f134a6796e1b05a4",
"sha256:adb4167043d3a78280d5d05ce0ba22055c266cf8655ce942f2fb881262ff3cdf",
"sha256:af06c863f152005c7592df1d6a7009c836a247c9d8adb78fef8575a5a98699db",
"sha256:b067915e3c3936966a8607f6fe5487df0c9c4afb85226613b520890049deea20",
"sha256:b7c5f6fe273291f4d414d55b2c80d33c457b8a42677ad14b4b47ff025d0893e4",
"sha256:b915daeb2d8c1f5cee4b970f2e2c988ce6514aace3c9296e58dd64dc9aa5d575",
"sha256:ba0e3255b0396cac3cc7bbace76404dd72b5438bf0d8e7cefa2f79a7f3649caa",
"sha256:bda8136e6a80bdea23e5e74e09df0362744d24ffb8cd59c4a95a6ce3d142f79c",
"sha256:bfe3c517c283e484843cb2e357dd57ba009cff351edf45fb455b5fbd1f45b15f",
"sha256:c588c45da9b08ab3da81d08d7987dae6d2a3badd63acdb3e206a42dbfa7cb76f",
"sha256:c600e85f2ffd9f1035222b1a312aff85fd11ea39baff1d705b9b047aad2ce267",
"sha256:c68e9f1fcb4d43798ad8814c4c1b61547b014b667216cb754e606bfade587018",
"sha256:c9649dfc57cc1f9835551deb17689e8d44666315f2e82d337b9f07bd76ae3aa2",
"sha256:cb45350fd1dc75cd68d3d72c47f5b513cb0578da716df5fba02fff31c69d5f2d",
"sha256:cbcf8630ef4afb05dc30107bfa17f16c0896bb30ee48fc24bf64c1f970f3b1fd",
"sha256:cbd949bdd87567b0ad183d7676feb98136cde5bb9025403794a4c0db28ed3a47",
"sha256:cc08ef8b90d78bfac66f0def80240b0197008e4852c9f285907377b2947ffdcb",
"sha256:cd17a1e489f02ce9117b0de3c0b1fab1c3e2eedc82311b299ee6b6faf6c23a29",
"sha256:d05686b5487cfa2e2c28ff1aa370ea3e6c5accfe6435944ddea1e10d93872147",
"sha256:d0e10e6f8f6dc5762adee7dece33b722282e1f59aa6a55da5d493a97282fedd8",
"sha256:d181ef50923c29cf0450c3cd47e2f0557b62218c50b2ab8ce2ecaa02bd97e670",
"sha256:d1caf40c1c657b27858f9774d5c0e232089bca9cb8ee17ce7478c6e9264d2587",
"sha256:d7642b9bc4827b5518ebdb3b82698ada8c14c7661ddec5fe719f3e56ccd13c97",
"sha256:d9481174d3ed982e269c090f780122fb59cee6c3796f74efe74e70f7780ed94c",
"sha256:d9ba68ec283153dead62cbe81872d28e053745f12335d037de9cbd14bd1877f5",
"sha256:da71945c9ace018d8634822f16cbc2a78323ef6c876b1d34bbf5d5222fd6a72e",
"sha256:dc44678a72ac0910bac46fa6a0de6af9ba1355669b3dfaf1ce5f05ca7a74364e",
"sha256:df32d59cb9780f66d165a9a7a26f19df2c7d24e3bd58713108b41d0ff4f929c6",
"sha256:df670918eb7dd719642e05979fc84704af913d563fd17ed636f7c4783003fdcc",
"sha256:e78b6ed8165996013165eeabd875c5dfc19d41b54f94b40e9fff0eb3193e5e8e",
"sha256:ed8fc66786de8d0376f9f913c09e963c66e90ced9aa11997f93bdb30f7c872a8",
"sha256:eff4b8d89f444f7e49136dc695599a591ff769300734446c0a86cba2eb2f9895",
"sha256:f21af781a4a6fbad54f03c598ab620e3a77032c5878f3d780448421a6e1818c7",
"sha256:f2bcdc54ea267fe72bfc7d83c041e4eb58d7d8dc6f578dfddb52f037ce62f432",
"sha256:f2f0498b7d2a3c072766dba3274fe22a183dbea1f99d188f1c6c72209a1063dc",
"sha256:f7208ab6e009c627b7557ce55c465c98967e8caa8b11833531fdf95799372633",
"sha256:f7590d5a455321e53857892ab8879dce62d1f4b04748769f5adf2e707afb9d4f",
"sha256:fa257a4d0d21fcbca5b5fcba9dca5a78011cb93c0323fb8855c6d2dfbc76eb77",
"sha256:fba9b62da882c1be1280a7584ec4515d0a6006a94d6e5819730ec2eab60ffe12",
"sha256:fe4371595edf78c41ef8ac8df20df3943e13defd0efcb732b2e393b5a8a7a71f"
],
"index": "pypi",
"markers": "python_version >= '3.9'",
"version": "==1.1.0"
},
"websocket-client": {
"hashes": [
"sha256:17b44cc997f5c498e809b22cdf2d9c7a9e71c02c8cc2b6c56e7c2d1239bfa526",
"sha256:3239df9f44da632f96012472805d40a23281a991027ce11d2f45a6f24ac4c3da"
],
"index": "pypi",
"markers": "python_version >= '3.8'",
"version": "==1.8.0"
},
"websockets": { "websockets": {
"hashes": [ "hashes": [
"sha256:0701bc3cfcb9164d04a14b149fd74be7347a530ad3bbf15ab2c678a2cd3dd9a2", "sha256:0701bc3cfcb9164d04a14b149fd74be7347a530ad3bbf15ab2c678a2cd3dd9a2",

View File

@@ -1,296 +0,0 @@
# Lighting Controller - Separated Architecture
This version of the lighting controller separates the UI and control logic, communicating via WebSocket. The MIDI controller is now integrated with the UI client.
## Architecture Overview
```
┌─────────────────┐ WebSocket ┌─────────────────┐ WebSocket ┌─────────────────┐
│ UI Client │◄─────────────────►│ Control Server │◄─────────────────►│ LED Server │
│ │ │ │ │ │
│ - MIDI Input │ │ - Lighting Logic│ │ - LED Bars │
│ - User Interface│ │ - Pattern Logic │ │ - ESP-NOW │
│ - Status Display│ │ - Beat Handling │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
│ │ TCP
│ ▼
│ ┌─────────────────┐
│ │ Sound Detector │
│ │ │
│ │ - Audio Input │
│ │ - Beat Detection│
│ │ - BPM Analysis │
│ └─────────────────┘
│ MIDI
┌─────────────────┐
│ MIDI Controller │
│ │
│ - Knobs/Dials │
│ - Buttons │
│ - Pattern Select│
└─────────────────┘
```
## Components
### 1. UI Client (`src/ui_client.py`)
- **Purpose**: User interface and MIDI controller integration
- **Features**:
- MIDI controller input handling
- Real-time status display
- Pattern selection visualization
- Connection status monitoring
- **Communication**: WebSocket client to control server
### 2. Control Server (`src/control_server.py`)
- **Purpose**: Core lighting control logic
- **Features**:
- Pattern execution
- Beat synchronization
- Parameter management
- LED bar communication
- **Communication**:
- WebSocket server for UI clients
- TCP server for sound detector
- WebSocket client to LED server
### 3. Sound Detector (`src/sound.py`)
- **Purpose**: Audio beat detection and BPM analysis
- **Features**:
- Real-time audio processing
- Beat detection
- BPM calculation
- Tempo reset functionality
- **Communication**: TCP client to control server
## WebSocket Protocol
### UI Client → Control Server Messages
```json
{
"type": "pattern_change",
"data": {
"pattern": "pulse"
}
}
```
```json
{
"type": "color_change",
"data": {
"r": 255,
"g": 0,
"b": 0
}
}
```
```json
{
"type": "brightness_change",
"data": {
"brightness": 80
}
}
```
```json
{
"type": "parameter_change",
"data": {
"n1": 15,
"n2": 20
}
}
```
```json
{
"type": "delay_change",
"data": {
"delay": 150
}
}
```
```json
{
"type": "beat_toggle",
"data": {
"enabled": true
}
}
```
```json
{
"type": "reset_tempo",
"data": {}
}
```
## Running the System
### Option 1: Use the startup script (Recommended)
```bash
python start_lighting_controller.py
```
### Option 2: Start components individually
1. **Start Control Server**:
```bash
pipenv run control
# or
python src/control_server.py
```
2. **Start Sound Detector** (in another terminal):
```bash
pipenv run sound
# or
python src/sound.py
```
3. **Start UI Client** (in another terminal):
```bash
pipenv run ui
# or
python src/ui_client.py
```
### Option 3: Development mode with auto-reload
```bash
# Terminal 1 - Control Server
pipenv run dev-control
# Terminal 2 - Sound Detector
pipenv run sound
# Terminal 3 - UI Client
pipenv run dev-ui
```
## Configuration
### MIDI Controller
- MIDI device preferences are saved in `config.json`
- The UI client automatically detects and connects to MIDI devices
- Use the dropdown to select different MIDI ports
### Network Settings
- **Control Server**: `localhost:8765` (WebSocket)
- **Sound Detector**: `127.0.0.1:65432` (TCP)
- **LED Server**: `192.168.4.1:80/ws` (WebSocket)
### Audio Settings
- Audio input device index: 7 (modify in `src/sound.py`)
- Buffer size: 512 samples
- Sample rate: Auto-detected from device
## MIDI Controller Mapping
### Buttons (Notes 36-51)
- **Row 1**: Pulse, Sequential Pulse
- **Row 2**: Alternating, Alternating Phase
- **Row 3**: N Chase, Rainbow
- **Row 4**: Flicker, Radiate
### Dials (CC30-37)
- **CC30**: Red (0-255)
- **CC31**: Green (0-255)
- **CC32**: Blue (0-255)
- **CC33**: Brightness (0-100)
- **CC34**: N1 parameter
- **CC35**: N2 parameter
- **CC36**: N3 parameter
- **CC37**: Delay (0-508ms)
### Additional Knobs (CC38-45)
- **CC38**: Pulse N1
- **CC39**: Pulse N2
- **CC40**: Alternating N1
- **CC41**: Alternating N2
- **CC42**: Radiate N1
- **CC43**: Radiate Delay
- **CC44**: Knob 7
- **CC45**: Knob 8
### Control Buttons
- **CC27**: Beat sending toggle (127=on, 0=off)
- **CC29**: Reset tempo detection
## Troubleshooting
### Connection Issues
1. **UI Client can't connect to Control Server**:
- Ensure control server is running first
- Check firewall settings
- Verify port 8765 is available
2. **Control Server can't connect to LED Server**:
- Check LED server IP address (192.168.4.1)
- Verify LED server is running
- Check network connectivity
3. **Sound Detector can't connect to Control Server**:
- Ensure control server is running
- Check TCP port 65432 is available
### MIDI Issues
1. **No MIDI devices detected**:
- Check MIDI controller connection
- Install MIDI drivers if needed
- Use "Refresh MIDI Ports" button
2. **MIDI input not working**:
- Verify correct MIDI port is selected
- Check MIDI controller is sending data
- Look for error messages in console
### Performance Issues
1. **High CPU usage**:
- Reduce audio buffer size in sound.py
- Increase parameter update interval
- Check for network latency
2. **Audio dropouts**:
- Increase audio buffer size
- Check audio device settings
- Reduce system load
## Development
### Adding New Patterns
1. Add pattern name to `PATTERN_NAMES` in `control_server.py`
2. Implement pattern logic in `LightingController` class
3. Add pattern to MIDI button mapping in `ui_client.py`
### Adding New MIDI Controls
1. Add control change handler in `MidiController.handle_midi_message()`
2. Add corresponding WebSocket message type
3. Implement handler in `LightingController.handle_ui_command()`
### Modifying UI
- Edit `src/ui_client.py` for UI changes
- Use `pipenv run dev-ui` for auto-reload during development
- UI uses tkinter with dark theme
## Migration from Monolithic Version
The separated architecture maintains compatibility with:
- Existing MIDI controller mappings
- LED bar communication protocol
- Sound detection functionality
- Configuration files
Key differences:
- MIDI controller is now part of UI client
- Control logic is isolated in control server
- Communication via WebSocket instead of direct function calls
- Better separation of concerns and modularity

745
main.py Normal file
View File

@@ -0,0 +1,745 @@
import asyncio
import tkinter as tk
from tkinter import ttk, messagebox # Import messagebox for confirmations
import json
from async_tkinter_loop import async_handler, async_mainloop
from networking import WebSocketClient
import color_utils
from settings import Settings
import time
# Dark theme colors (unchanged)
bg_color = "#2e2e2e"
fg_color = "white"
trough_color_red = "#4a0000"
trough_color_green = "#004a00"
trough_color_blue = "#00004a"
trough_color_brightness = "#4a4a4a"
trough_color_delay = "#4a4a4a"
active_bg_color = "#4a4a4a"
highlight_pattern_color = "#6a5acd"
# New color for active color in palette
active_palette_color_border = "#FFD700" # Gold color
class App:
def __init__(self) -> None:
self.settings = Settings()
self.root = tk.Tk()
self.root.attributes("-fullscreen", True)
self.root.configure(bg=bg_color)
# Debouncing variables (remain the same)
self.last_rgb_update_time = 0
self.rgb_update_interval_ms = 100
self.pending_rgb_update_id = None
self.last_brightness_update_time = 0
self.brightness_update_interval_ms = 100
self.pending_brightness_update_id = None
self.last_delay_update_time = 0
self.delay_update_interval_ms = 100
self.pending_delay_update_id = None
# --- WebSocketClient ---
self.websocket_client = WebSocketClient("ws://192.168.4.1:80/ws")
self.root.after(100, async_handler(self.websocket_client.connect))
# Configure ttk style (unchanged)
style = ttk.Style()
style.theme_use("alt")
style.configure(".", background=bg_color, foreground=fg_color, font=("Arial", 14))
style.configure("TNotebook", background=bg_color, borderwidth=0)
style.configure(
"TNotebook.Tab", background=bg_color, foreground=fg_color, font=("Arial", 30), padding=[10, 5]
)
style.map("TNotebook.Tab", background=[("selected", active_bg_color)], foreground=[("selected", fg_color)])
style.configure("TFrame", background=bg_color)
# Create Notebook for tabs (unchanged)
self.notebook = ttk.Notebook(self.root)
self.notebook.pack(expand=1, fill="both")
self.tabs = {}
self.create_tabs()
self.notebook.bind("<<NotebookTabChanged>>", self.on_tab_change)
# Add Reload Config Button (unchanged)
reload_button = tk.Button(
self.root,
text="Reload Config",
command=self.reload_config,
bg=active_bg_color,
fg=fg_color,
font=("Arial", 20),
padx=20,
pady=10,
relief=tk.FLAT,
)
reload_button.pack(side=tk.BOTTOM, pady=20)
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
async_mainloop(self.root)
def on_closing(self):
print("Closing application...")
asyncio.create_task(self.websocket_client.close())
self.root.destroy()
def create_tabs(self):
for tab_name in list(self.tabs.keys()):
self.notebook.forget(self.tabs[tab_name])
del self.tabs[tab_name]
for key, value in self.settings["lights"].items():
tab = ttk.Frame(self.notebook)
self.notebook.add(tab, text=key)
self.create_light_control_widgets(tab, key, value["names"], value["settings"])
self.tabs[key] = tab
def create_light_control_widgets(self, tab, tab_name, ids, initial_settings):
slider_length = 800
slider_width = 50
# Extract initial color, brightness, and delay
initial_colors = initial_settings.get("colors", ["#000000"])
initial_hex_color = initial_colors[0] if initial_colors else "#000000"
initial_brightness = initial_settings.get("brightness", 127)
initial_delay = initial_settings.get("delay", 0)
initial_pattern = initial_settings.get("pattern", "on")
initial_r, initial_g, initial_b = color_utils.hex_to_rgb(initial_hex_color)
# Main frame to hold everything within the tab
main_tab_frame = tk.Frame(tab, bg=bg_color)
main_tab_frame.pack(expand=True, fill="both", padx=10, pady=10)
# Left panel for sliders
slider_panel_frame = tk.Frame(main_tab_frame, bg=bg_color)
slider_panel_frame.pack(side=tk.LEFT, padx=10, pady=10)
# Common slider configuration
slider_config = {
"from_": 255,
"to": 0,
"orient": tk.VERTICAL,
"length": slider_length,
"width": slider_width,
"bg": bg_color,
"fg": fg_color,
"highlightbackground": bg_color,
"activebackground": active_bg_color,
"resolution": 1,
"sliderlength": 70,
}
# Red Slider
red_slider = tk.Scale(slider_panel_frame, label="Red", troughcolor=trough_color_red, **slider_config)
red_slider.set(initial_r)
red_slider.pack(side=tk.LEFT, padx=10)
red_slider.bind("<B1-Motion>", lambda _: self.schedule_update_rgb(tab))
red_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_rgb(tab, force_send=True))
# Green Slider
green_slider = tk.Scale(slider_panel_frame, label="Green", troughcolor=trough_color_green, **slider_config)
green_slider.set(initial_g)
green_slider.pack(side=tk.LEFT, padx=10)
green_slider.bind("<B1-Motion>", lambda _: self.schedule_update_rgb(tab))
green_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_rgb(tab, force_send=True))
# Blue Slider
blue_slider = tk.Scale(slider_panel_frame, label="Blue", troughcolor=trough_color_blue, **slider_config)
blue_slider.set(initial_b)
blue_slider.pack(side=tk.LEFT, padx=10)
blue_slider.bind("<B1-Motion>", lambda _: self.schedule_update_rgb(tab))
blue_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_rgb(tab, force_send=True))
# Brightness Slider
brightness_slider = tk.Scale(
slider_panel_frame, label="Brightness", troughcolor=trough_color_brightness, **slider_config
)
brightness_slider.set(initial_brightness)
brightness_slider.pack(side=tk.LEFT, padx=10)
brightness_slider.bind("<B1-Motion>", lambda _: self.schedule_update_brightness(tab))
brightness_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_brightness(tab, force_send=True))
# Delay Slider
delay_slider_config = slider_config.copy()
delay_slider_config.update(
{
"from_": 1000,
"to": 0,
"resolution": 10,
"label": "Delay (ms)",
"troughcolor": trough_color_delay,
}
)
delay_slider = tk.Scale(slider_panel_frame, **delay_slider_config)
delay_slider.set(initial_delay)
delay_slider.pack(side=tk.LEFT, padx=10)
delay_slider.bind("<B1-Motion>", lambda _: self.schedule_update_delay(tab))
delay_slider.bind("<ButtonRelease-1>", lambda _: self.schedule_update_delay(tab, force_send=True))
# Store references to widgets for this tab
tab.widgets = {
"red_slider": red_slider,
"green_slider": green_slider,
"blue_slider": blue_slider,
"brightness_slider": brightness_slider,
"delay_slider": delay_slider,
"selected_color_index": 0, # Default to the first color
}
tab.colors_in_palette = initial_colors.copy() # Store the list of hex colors for this tab
tab.color_swatch_frames = [] # To hold references to the color swatches
# Right panel for IDs, Patterns, and NEW Color Palette
right_panel_frame = tk.Frame(main_tab_frame, bg=bg_color)
right_panel_frame.pack(side=tk.LEFT, padx=20, pady=10, anchor="n", expand=True, fill="both")
# IDs section - MODIFIED TO BE SIDE-BY-SIDE
ids_frame = tk.Frame(right_panel_frame, bg=bg_color)
ids_frame.pack(pady=10, fill=tk.X)
tk.Label(ids_frame, text="Associated Names:", font=("Arial", 20), bg=bg_color, fg=fg_color).pack(pady=10)
# New inner frame for the IDs to be displayed horizontally
ids_inner_frame = tk.Frame(ids_frame, bg=bg_color)
ids_inner_frame.pack(fill=tk.X, expand=True) # Pack this frame to fill available width
for light_id in ids:
tk.Label(ids_inner_frame, text=str(light_id), font=("Arial", 18), bg=bg_color, fg=fg_color).pack(
side=tk.LEFT, padx=5, pady=2
) # Pack labels horizontally
# --- New Frame to hold Patterns and Color Palette side-by-side ---
patterns_and_palette_frame = tk.Frame(right_panel_frame, bg=bg_color)
patterns_and_palette_frame.pack(pady=20, fill=tk.BOTH, expand=True)
# Patterns section
patterns_frame = tk.Frame(patterns_and_palette_frame, bg=bg_color, bd=2, relief=tk.GROOVE)
patterns_frame.pack(side=tk.LEFT, padx=10, pady=5, fill=tk.BOTH, expand=True) # Pack to the left
tk.Label(patterns_frame, text="Patterns:", font=("Arial", 20), bg=bg_color, fg=fg_color).pack(pady=10)
tab.pattern_buttons = {}
patterns = self.settings.get("patterns", [])
for pattern_name in patterns:
button = tk.Button(
patterns_frame,
text=pattern_name,
command=lambda p=pattern_name: self.send_pattern(tab_name, p),
bg=active_bg_color,
fg=fg_color,
font=("Arial", 18),
padx=15,
pady=5,
relief=tk.FLAT,
)
button.pack(pady=5, fill=tk.X)
tab.pattern_buttons[pattern_name] = button
self.highlight_pattern_button(tab, initial_pattern)
# --- Color Palette Editor Section ---
color_palette_editor_frame = tk.Frame(patterns_and_palette_frame, bg=bg_color, bd=2, relief=tk.GROOVE)
color_palette_editor_frame.pack(side=tk.LEFT, padx=10, pady=5, fill=tk.BOTH, expand=True) # Pack to the left
tab.color_palette_editor_frame = color_palette_editor_frame # Store reference for update_ui_for_pattern
tk.Label(color_palette_editor_frame, text="Color Palette:", font=("Arial", 20), bg=bg_color, fg=fg_color).pack(
pady=10
)
# Frame to hold color swatches (will be dynamic)
tab.color_swatches_container = tk.Frame(color_palette_editor_frame, bg=bg_color)
tab.color_swatches_container.pack(pady=5, fill=tk.BOTH, expand=True)
# Buttons for Add/Remove Color
palette_buttons_frame = tk.Frame(color_palette_editor_frame, bg=bg_color)
palette_buttons_frame.pack(pady=10, fill=tk.X)
add_color_button = tk.Button(
palette_buttons_frame,
text="Add Color",
command=lambda t=tab: self.add_color_to_palette(t),
bg=active_bg_color,
fg=fg_color,
font=("Arial", 16),
padx=10,
pady=5,
relief=tk.FLAT,
)
add_color_button.pack(side=tk.LEFT, expand=True, padx=5)
remove_color_button = tk.Button(
palette_buttons_frame,
text="Remove Selected",
command=lambda t=tab: self.remove_selected_color_from_palette(t),
bg=active_bg_color,
fg=fg_color,
font=("Arial", 16),
padx=10,
pady=5,
relief=tk.FLAT,
)
remove_color_button.pack(side=tk.RIGHT, expand=True, padx=5)
# Initial population of the color palette
self.refresh_color_palette_display(tab)
# The initial call to update_ui_for_pattern now only sets slider values and highlights
self.update_ui_for_pattern(tab, initial_pattern)
def refresh_color_palette_display(self, tab):
"""Clears and repopulates the color swatches in the palette display."""
# Clear existing swatches
for frame in tab.color_swatch_frames:
frame.destroy()
tab.color_swatch_frames.clear()
for i, hex_color in enumerate(tab.colors_in_palette):
swatch_frame = tk.Frame(
tab.color_swatches_container, bg=hex_color, width=100, height=50, bd=2, relief=tk.SOLID
)
swatch_frame.pack(pady=3, padx=5, fill=tk.X)
# Bind click to select this color for editing
swatch_frame.bind("<Button-1>", lambda event, idx=i, t=tab: self.select_color_in_palette(t, idx))
# Add a label inside to make it clickable too
swatch_label = tk.Label(
swatch_frame,
text=f"Color {i+1}",
bg=hex_color,
fg=color_utils.get_contrast_text_color(hex_color),
font=("Arial", 14),
width=5,
height=3,
)
swatch_label.pack(expand=True, fill=tk.BOTH)
swatch_label.bind("<Button-1>", lambda event, idx=i, t=tab: self.select_color_in_palette(t, idx))
tab.color_swatch_frames.append(swatch_frame)
# Re-highlight the currently selected color
self._highlight_selected_color_swatch(tab)
def _highlight_selected_color_swatch(self, tab):
"""Applies/removes highlight border to the selected color swatch."""
current_index = tab.widgets["selected_color_index"]
for i, swatch_frame in enumerate(tab.color_swatch_frames):
if i == current_index:
swatch_frame.config(highlightbackground=active_palette_color_border, highlightthickness=3)
else:
swatch_frame.config(highlightbackground=swatch_frame.cget("bg"), highlightthickness=0) # Reset to no highlight
def select_color_in_palette(self, tab, index: int):
"""Selects a color in the palette, updates sliders, and highlights swatch.
This now also triggers an RGB update to the device."""
if not (0 <= index < len(tab.colors_in_palette)):
return
tab.widgets["selected_color_index"] = index
self._highlight_selected_color_swatch(tab)
# Update RGB sliders with the selected color
hex_color = tab.colors_in_palette[index]
r, g, b = color_utils.hex_to_rgb(hex_color)
tab.widgets["red_slider"].set(r)
tab.widgets["green_slider"].set(g)
tab.widgets["blue_slider"].set(b)
print(f"Selected color index {index}: {hex_color}")
# Immediately send the update, as changing the selected color implies
# a desire to change the light's current color, regardless of pattern.
# This will also save the settings.
self.schedule_update_rgb(tab, force_send=True)
def add_color_to_palette(self, tab):
"""Adds a new black color to the palette and selects it, with a limit of 10 colors."""
MAX_COLORS = 8 # Define the maximum number of colors allowed
if len(tab.colors_in_palette) >= MAX_COLORS:
messagebox.showwarning("Color Limit Reached", f"You can add a maximum of {MAX_COLORS} colors to the palette.")
return
# Simplified: just add black. If unique colors were required globally,
# more complex logic would be needed here.
tab.colors_in_palette.append("#000000") # Add black as default
self.refresh_color_palette_display(tab)
# Select the newly added color
self.select_color_in_palette(tab, len(tab.colors_in_palette) - 1)
self.save_current_tab_settings() # Save changes to settings.json
def remove_selected_color_from_palette(self, tab):
"""Removes the currently selected color from the palette."""
current_index = tab.widgets["selected_color_index"]
if len(tab.colors_in_palette) <= 1:
messagebox.showwarning("Cannot Remove", "There must be at least one color in the palette.")
return
if messagebox.askyesno("Confirm Delete", f"Are you sure you want to remove Color {current_index + 1}?"):
del tab.colors_in_palette[current_index]
# Adjust selected index if the removed color was the last one
if current_index >= len(tab.colors_in_palette):
tab.widgets["selected_color_index"] = len(tab.colors_in_palette) - 1
if tab.widgets["selected_color_index"] < 0: # Should not happen with 1-color check
tab.widgets["selected_color_index"] = 0
self.refresh_color_palette_display(tab)
# Update sliders with the new selected color (if any)
if tab.colors_in_palette:
self.select_color_in_palette(tab, tab.widgets["selected_color_index"])
else: # If palette became empty (shouldn't happen with 1-color check)
tab.widgets["red_slider"].set(0)
tab.widgets["green_slider"].set(0)
tab.widgets["blue_slider"].set(0)
self.save_current_tab_settings() # Save changes to settings.json
def update_ui_for_pattern(self, tab, current_pattern: str):
"""
Manages the state of the UI elements based on the selected pattern.
The Color Palette Editor is always visible. RGB sliders update
based on the currently selected color in the palette, or the first
color if the palette is empty or not in transition mode and a new tab/pattern is selected.
"""
# The color_palette_editor_frame is always packed, so no visibility control needed here.
# When the pattern changes, we need to ensure the RGB sliders reflect
# the appropriate color based on the context.
if tab.colors_in_palette:
# If in 'transition' mode, set sliders to the currently selected color in the palette.
if current_pattern == "transition":
self.select_color_in_palette(tab, tab.widgets["selected_color_index"])
else:
# If not in 'transition' mode, but a color is selected, update sliders to that.
# Or, if this is a fresh load/tab change, ensure it's the first color.
# This ensures the sliders consistently show the color that will be sent
# for 'on'/'blink' based on the palette's first entry.
hex_color = tab.colors_in_palette[tab.widgets["selected_color_index"]]
r, g, b = color_utils.hex_to_rgb(hex_color)
tab.widgets["red_slider"].set(r)
tab.widgets["green_slider"].set(g)
tab.widgets["blue_slider"].set(b)
self._highlight_selected_color_swatch(tab) # Re-highlight even if index didn't change
else:
# Handle empty palette scenario (shouldn't happen with default ["#000000"])
tab.widgets["red_slider"].set(0)
tab.widgets["green_slider"].set(0)
tab.widgets["blue_slider"].set(0)
tab.widgets["selected_color_index"] = 0 # Ensure index is valid
self._highlight_selected_color_swatch(tab)
# Brightness and Delay sliders are always visible.
def highlight_pattern_button(self, tab_widget, active_pattern_name):
if hasattr(tab_widget, "pattern_buttons"):
for pattern_name, button in tab_widget.pattern_buttons.items():
if pattern_name == active_pattern_name:
button.config(bg=highlight_pattern_color)
else:
button.config(bg=active_bg_color)
def on_tab_change(self, event):
selected_tab_name = self.notebook.tab(self.notebook.select(), "text")
current_tab_widget = self.notebook.nametowidget(self.notebook.select())
initial_settings = self.settings["lights"][selected_tab_name]["settings"]
# Ensure current_tab_widget has the necessary attributes
if not hasattr(current_tab_widget, "colors_in_palette"):
# This tab might not have been fully initialized yet, or recreated
# In a full reload, create_tabs ensures it is.
return
# Update the local colors_in_palette list for the tab
current_tab_widget.colors_in_palette = initial_settings.get("colors", ["#000000"]).copy()
current_tab_widget.widgets["selected_color_index"] = 0 # Default to first color
# Refresh the color palette display and select the first color
self.refresh_color_palette_display(current_tab_widget)
if current_tab_widget.colors_in_palette:
self.select_color_in_palette(current_tab_widget, 0)
else: # If palette became empty (shouldn't happen with default ["#000000"])
current_tab_widget.widgets["red_slider"].set(0)
current_tab_widget.widgets["green_slider"].set(0)
current_tab_widget.widgets["blue_slider"].set(0)
# Update brightness and delay sliders
current_tab_widget.widgets["brightness_slider"].set(initial_settings.get("brightness", 127))
current_tab_widget.widgets["delay_slider"].set(initial_settings.get("delay", 0))
# Highlight the active pattern button
initial_pattern = initial_settings.get("pattern", "on")
self.highlight_pattern_button(current_tab_widget, initial_pattern)
# Update UI visibility based on the current pattern
self.update_ui_for_pattern(current_tab_widget, initial_pattern)
def reload_config(self):
print("Reloading configuration...")
self.settings = Settings()
self.create_tabs()
# After recreating, ensure the currently selected tab's sliders are updated
# Trigger on_tab_change manually for the currently selected tab
self.on_tab_change(None)
# --- Debouncing functions (no change to core logic, just how they call update_rgb) ---
def schedule_update_rgb(self, tab, force_send=False):
current_time = time.time() * 1000
if force_send:
if self.pending_rgb_update_id:
self.root.after_cancel(self.pending_rgb_update_id)
self.pending_rgb_update_id = None
self.update_rgb(tab)
self.last_rgb_update_time = current_time
elif current_time - self.last_rgb_update_time >= self.rgb_update_interval_ms:
if self.pending_rgb_update_id:
self.root.after_cancel(self.pending_rgb_update_id)
self.pending_rgb_update_id = None
self.update_rgb(tab)
self.last_rgb_update_time = current_time
else:
if self.pending_rgb_update_id:
self.root.after_cancel(self.pending_rgb_update_id)
time_to_wait = int(self.rgb_update_interval_ms - (current_time - self.last_rgb_update_time))
self.pending_rgb_update_id = self.root.after(time_to_wait, lambda: self.update_rgb(tab))
def schedule_update_brightness(self, tab, force_send=False):
current_time = time.time() * 1000
if force_send:
if self.pending_brightness_update_id:
self.root.after_cancel(self.pending_brightness_update_id)
self.pending_brightness_update_id = None
self.update_brightness(tab)
self.last_brightness_update_time = current_time
elif current_time - self.last_brightness_update_time >= self.brightness_update_interval_ms:
if self.pending_brightness_update_id:
self.root.after_cancel(self.pending_brightness_update_id)
self.pending_brightness_update_id = None
self.update_brightness(tab)
self.last_brightness_update_time = current_time
else:
if self.pending_brightness_update_id:
self.root.after_cancel(self.pending_brightness_update_id)
time_to_wait = int(self.brightness_update_interval_ms - (current_time - self.last_brightness_update_time))
self.pending_brightness_update_id = self.root.after(time_to_wait, lambda: self.update_brightness(tab))
def schedule_update_delay(self, tab, force_send=False):
current_time = time.time() * 1000
if force_send:
if self.pending_delay_update_id:
self.root.after_cancel(self.pending_delay_update_id)
self.pending_delay_update_id = None
self.update_delay(tab)
self.last_delay_update_time = current_time
elif current_time - self.last_delay_update_time >= self.delay_update_interval_ms:
if self.pending_delay_update_id:
self.root.after_cancel(self.pending_delay_update_id)
self.pending_delay_update_id = None
self.update_delay(tab)
self.last_delay_update_time = current_time
else:
if self.pending_delay_update_id:
self.root.after_cancel(self.pending_delay_update_id)
time_to_wait = int(self.delay_update_interval_ms - (current_time - self.last_delay_update_time))
self.pending_delay_update_id = self.root.after(time_to_wait, lambda: self.update_delay(tab))
# --- Asynchronous Update Functions ---
@async_handler
async def update_rgb(self, tab):
"""Update the currently selected color in the palette and send to the server."""
try:
red_slider = tab.widgets["red_slider"]
green_slider = tab.widgets["green_slider"]
blue_slider = tab.widgets["blue_slider"]
r = red_slider.get()
g = green_slider.get()
b = blue_slider.get()
hex_color = f"#{r:02x}{g:02x}{b:02x}"
print(f"Updating selected color to: {hex_color}")
selected_color_index = tab.widgets["selected_color_index"]
if 0 <= selected_color_index < len(tab.colors_in_palette):
tab.colors_in_palette[selected_color_index] = hex_color
self.refresh_color_palette_display(tab) # Update swatch immediately
selected_server = self.notebook.tab(self.notebook.select(), "text")
names = self.settings["lights"][selected_server]["names"]
# Determine which colors to send based on the current pattern.
current_pattern = self.settings["lights"][selected_server]["settings"].get("pattern", "on")
colors_to_send = []
if current_pattern == "transition":
colors_to_send = tab.colors_in_palette.copy()
elif current_pattern in ["on", "blink", "theater_chase", "flicker"]: # Add other patterns that use a single color
if tab.colors_in_palette:
# For non-transition patterns, the device typically uses only the first color.
# However, if a user picks a color from the palette, we want THAT color to be the one
# sent and active. So, the selected color from the palette *becomes* the first color
# in the list we send to the device for these modes.
# This ensures the light matches the selected palette color.
colors_to_send = [hex_color] # Send the color currently set by the sliders
else:
colors_to_send = ["#000000"] # Default if palette is empty
else: # For other patterns like "off", "rainbow" where colors might not be primary
# We still want to send the *current* palette state for saving,
# but the device firmware might ignore it for these patterns.
colors_to_send = tab.colors_in_palette.copy()
payload = {
"save": True, # Always save this change to config
"names": names,
"settings": {
"colors": colors_to_send, # This now dynamically changes based on pattern
"brightness": tab.widgets["brightness_slider"].get(),
"delay": tab.widgets["delay_slider"].get(),
"pattern": current_pattern, # Always send the current pattern
},
}
# Update the settings object with the new color list (and potentially other synced values)
self.settings["lights"][selected_server]["settings"]["colors"] = tab.colors_in_palette.copy()
self.settings["lights"][selected_server]["settings"]["brightness"] = tab.widgets["brightness_slider"].get()
self.settings["lights"][selected_server]["settings"]["delay"] = tab.widgets["delay_slider"].get()
self.settings.save()
await self.websocket_client.send_data(payload)
print(f"Sent RGB payload: {payload}")
except Exception as e:
print(f"Error updating RGB: {e}")
@async_handler
async def update_brightness(self, tab):
try:
brightness_slider = tab.widgets["brightness_slider"]
brightness = brightness_slider.get()
print(f"Brightness: {brightness}")
selected_server = self.notebook.tab(self.notebook.select(), "text")
names = self.settings["lights"][selected_server]["names"]
payload = {
"save": True,
"names": names,
"settings": {
"brightness": brightness,
},
}
# Update the settings object with the new brightness
self.settings["lights"][selected_server]["settings"]["brightness"] = brightness
self.settings.save()
await self.websocket_client.send_data(payload)
print(f"Sent brightness payload: {payload}")
except Exception as e:
print(f"Error updating brightness: {e}")
@async_handler
async def update_delay(self, tab):
try:
delay_slider = tab.widgets["delay_slider"]
delay = delay_slider.get()
print(f"Delay: {delay}")
selected_server = self.notebook.tab(self.notebook.select(), "text")
names = self.settings["lights"][selected_server]["names"]
payload = {
"save": True,
"names": names,
"settings": {
"delay": delay,
},
}
# Update the settings object with the new delay
self.settings["lights"][selected_server]["settings"]["delay"] = delay
self.settings.save()
await self.websocket_client.send_data(payload)
print(f"Sent delay payload: {payload}")
except Exception as e:
print(f"Error updating delay: {e}")
@async_handler
async def send_pattern(self, tab_name: str, pattern_name: str):
try:
names = self.settings["lights"][tab_name]["names"]
# Get the actual tab widget to access its `colors_in_palette` and other attributes
current_tab_widget = None
for key, tab_widget in self.tabs.items():
if key == tab_name:
current_tab_widget = tab_widget
break
if not current_tab_widget:
print(f"Error: Could not find tab widget for {tab_name}")
return
current_settings_for_tab = self.settings["lights"][tab_name]["settings"]
payload_settings = {
"pattern": pattern_name,
"brightness": current_settings_for_tab.get("brightness", 127),
"delay": current_settings_for_tab.get("delay", 0),
}
# Determine colors to send based on the *newly selected* pattern
if pattern_name == "transition":
payload_settings["colors"] = current_tab_widget.colors_in_palette.copy()
elif pattern_name in ["on", "blink"]: # Add other patterns that use a single color here
# When switching TO 'on' or 'blink', ensure the color sent is the one
# currently displayed on the sliders (which reflects the selected palette color).
r = current_tab_widget.widgets["red_slider"].get()
g = current_tab_widget.widgets["green_slider"].get()
b = current_tab_widget.widgets["blue_slider"].get()
hex_color_from_sliders = f"#{r:02x}{g:02x}{b:02x}"
payload_settings["colors"] = [hex_color_from_sliders]
else:
# For other patterns, send the full palette, device might ignore or use default
payload_settings["colors"] = current_tab_widget.colors_in_palette.copy()
payload = {
"save": True,
"names": names,
"settings": payload_settings,
}
# Update the settings object with the new pattern and current colors/brightness/delay
self.settings["lights"][tab_name]["settings"]["pattern"] = pattern_name
# Always save the full current palette state in settings.
self.settings["lights"][tab_name]["settings"]["colors"] = current_tab_widget.colors_in_palette.copy()
self.settings.save()
self.highlight_pattern_button(current_tab_widget, pattern_name)
self.update_ui_for_pattern(current_tab_widget, pattern_name) # Update UI based on new pattern
await self.websocket_client.send_data(payload)
print(f"Sent pattern payload: {payload}")
except Exception as e:
print(f"Error sending pattern: {e}")
def save_current_tab_settings(self):
"""Saves the current state of the active tab's settings (colors, brightness, delay, pattern) to config."""
selected_server = self.notebook.tab(self.notebook.select(), "text")
current_tab_widget = self.notebook.nametowidget(self.notebook.select())
if not hasattr(current_tab_widget, "colors_in_palette"):
return # Tab not fully initialized yet
# Update settings for the current tab in the self.settings object
self.settings["lights"][selected_server]["settings"]["colors"] = current_tab_widget.colors_in_palette
self.settings["lights"][selected_server]["settings"]["brightness"] = current_tab_widget.widgets["brightness_slider"].get()
self.settings["lights"][selected_server]["settings"]["delay"] = current_tab_widget.widgets["delay_slider"].get()
# The pattern is updated in send_pattern already, but ensure consistency
# For simplicity, we assume send_pattern is the primary way to change pattern.
self.settings.save()
print(f"Saved settings for {selected_server}")
if __name__ == "__main__":
app = App()

View File

@@ -2,8 +2,7 @@
"lights": { "lights": {
"sign": { "sign": {
"names": [ "names": [
"tt-sign", "tt-sign"
"1"
], ],
"settings": { "settings": {
"colors": [ "colors": [
@@ -11,9 +10,9 @@
"#c30074", "#c30074",
"#00ff00" "#00ff00"
], ],
"brightness": 9, "brightness": 6,
"pattern": "off", "pattern": "color_transition",
"delay": 50 "delay": 30
} }
}, },
"dj": { "dj": {
@@ -80,7 +79,7 @@
"#000000" "#000000"
], ],
"brightness": 6, "brightness": 6,
"pattern": "on", "pattern": "flicker",
"delay": 520 "delay": 520
} }
}, },
@@ -156,7 +155,6 @@
"rainbow_cycle", "rainbow_cycle",
"color_transition", "color_transition",
"theater_chase", "theater_chase",
"flicker", "flicker"
"pulse"
] ]
} }

27
settings.py Normal file
View File

@@ -0,0 +1,27 @@
import json
class Settings(dict):
SETTINGS_FILE = "settings.json"
def __init__(self):
super().__init__()
self.load() # Load settings from file during initialization
def save(self):
try:
j = json.dumps(self, indent=4)
with open(self.SETTINGS_FILE, 'w') as file:
file.write(j)
print("Settings saved successfully.")
except Exception as e:
print(f"Error saving settings: {e}")
def load(self):
try:
with open(self.SETTINGS_FILE, 'r') as file:
loaded_settings = json.load(file)
self.update(loaded_settings)
print("Settings loaded successfully.")
except Exception as e:
print(f"Error loading settings {e}")
self.save()

View File

@@ -1,38 +0,0 @@
# LED Bar Configuration
# Modify these names as needed for your setup
# LED Bar Names/IDs - 4 left bars + 4 right bars
LED_BAR_NAMES = [
"100", # Left Bar 1
"101", # Left Bar 2
"102", # Left Bar 3
"103", # Left Bar 4
"104", # Right Bar 1
"105", # Right Bar 2
"106", # Right Bar 3
"107", # Right Bar 4
]
# Left and right bar groups for spatial control
LEFT_BARS = ["100", "101", "102", "103"]
RIGHT_BARS = ["104", "105", "106", "107"]
# Number of LED bars
NUM_BARS = len(LED_BAR_NAMES)
# Default settings for all bars
DEFAULT_BAR_SETTINGS = {
"pattern": "pulse",
"delay": 100,
"colors": [(0, 255, 0)], # Default green
"brightness": 10,
"num_leds": 200,
"n1": 10,
"n2": 10,
"n3": 1,
"n": 0,
}
# ESP-NOW broadcast settings
ESP_NOW_CHANNEL = 1
ESP_NOW_ENCRYPTION = False

View File

@@ -1,443 +0,0 @@
#!/usr/bin/env python3
"""
Control Server for Lighting Controller
Handles lighting control logic and communicates with LED bars via WebSocket.
Receives commands from UI client via WebSocket.
"""
import asyncio
import websockets
import json
import logging
import socket
import threading
import time
from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS
from color_utils import adjust_brightness
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuration
LED_SERVER_URI = "ws://192.168.4.1:80/ws"
CONTROL_SERVER_PORT = 8765
SOUND_CONTROL_HOST = "127.0.0.1"
SOUND_CONTROL_PORT = 65433
# Pattern name mapping for shorter JSON payloads
PATTERN_NAMES = {
"flicker": "f",
"fill_range": "fr",
"n_chase": "nc",
"alternating": "a",
"pulse": "p",
"rainbow": "r",
"specto": "s",
"radiate": "rd",
"sequential_pulse": "sp",
"alternating_phase": "ap",
}
class LEDController:
"""Handles communication with LED bars via WebSocket."""
def __init__(self, led_server_uri):
self.led_server_uri = led_server_uri
self.websocket = None
self.is_connected = False
self.reconnect_task = None
async def connect(self):
"""Connect to LED server."""
if self.is_connected and self.websocket:
return
try:
logging.info(f"Connecting to LED server at {self.led_server_uri}...")
self.websocket = await websockets.connect(self.led_server_uri)
self.is_connected = True
logging.info("Connected to LED server")
except Exception as e:
logging.error(f"Failed to connect to LED server: {e}")
self.is_connected = False
self.websocket = None
async def send_data(self, data):
"""Send data to LED server."""
if not self.is_connected or not self.websocket:
logging.warning("Not connected to LED server. Attempting to reconnect...")
await self.connect()
if not self.is_connected:
logging.error("Failed to reconnect to LED server. Cannot send data.")
return
try:
await self.websocket.send(json.dumps(data))
logging.debug(f"Sent to LED server: {data}")
except Exception as e:
logging.error(f"Failed to send data to LED server: {e}")
self.is_connected = False
self.websocket = None
# Attempt to reconnect
await self.connect()
async def close(self):
"""Close LED server connection."""
if self.websocket and self.is_connected:
await self.websocket.close()
self.is_connected = False
self.websocket = None
logging.info("Disconnected from LED server")
class SoundController:
"""Handles communication with sound beat detector."""
def __init__(self, sound_host, sound_port):
self.sound_host = sound_host
self.sound_port = sound_port
async def send_reset_tempo(self):
"""Send reset tempo command to sound controller."""
try:
reader, writer = await asyncio.open_connection(self.sound_host, self.sound_port)
cmd = "RESET_TEMPO\n".encode('utf-8')
writer.write(cmd)
await writer.drain()
resp = await reader.read(100)
logging.info(f"Sent RESET_TEMPO, response: {resp.decode().strip()}")
writer.close()
await writer.wait_closed()
except Exception as e:
logging.error(f"Failed to send RESET_TEMPO: {e}")
class LightingController:
"""Main lighting control logic."""
def __init__(self):
self.led_controller = LEDController(LED_SERVER_URI)
self.sound_controller = SoundController(SOUND_CONTROL_HOST, SOUND_CONTROL_PORT)
# Lighting state
self.current_pattern = ""
self.delay = 100
self.brightness = 100
self.color_r = 0
self.color_g = 255
self.color_b = 0
self.n1 = 10
self.n2 = 10
self.n3 = 1
self.beat_index = 0
self.beat_sending_enabled = True
# Rate limiting
self.last_param_update = 0.0
self.param_update_interval = 0.1
self.pending_param_update = False
def _current_color_rgb(self):
"""Get current RGB color tuple."""
r = max(0, min(255, int(self.color_r)))
g = max(0, min(255, int(self.color_g)))
b = max(0, min(255, int(self.color_b)))
return (r, g, b)
async def _send_full_parameters(self):
"""Send all parameters to LED bars."""
full_payload = {
"d": {
"t": "u", # Message type: update
"pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern),
"dl": self.delay,
"cl": [self._current_color_rgb()],
"br": self.brightness,
"n1": self.n1,
"n2": self.n2,
"n3": self.n3,
"s": self.beat_index % 256,
}
}
# Add empty entries for each bar
for bar_name in LED_BAR_NAMES:
full_payload[bar_name] = {}
await self.led_controller.send_data(full_payload)
async def _request_param_update(self):
"""Request parameter update with rate limiting."""
current_time = time.time()
if current_time - self.last_param_update >= self.param_update_interval:
self.last_param_update = current_time
await self._send_full_parameters()
else:
self.pending_param_update = True
async def _send_normal_pattern(self):
"""Send normal pattern to all bars."""
patterns_needing_params = ["alternating", "flicker", "n_chase", "rainbow", "radiate"]
payload = {
"d": {
"t": "b", # Message type: beat
"pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern),
}
}
if self.current_pattern in patterns_needing_params:
payload["d"].update({
"n1": self.n1,
"n2": self.n2,
"n3": self.n3,
"dl": self.delay,
"s": self.beat_index % 256,
})
for bar_name in LED_BAR_NAMES:
payload[bar_name] = {}
await self.led_controller.send_data(payload)
async def _handle_sequential_pulse(self):
"""Handle sequential pulse pattern."""
from bar_config import LEFT_BARS, RIGHT_BARS
bar_index = self.beat_index % 4
payload = {
"d": {
"t": "b",
"pt": "o", # off
}
}
left_bar = LEFT_BARS[bar_index]
right_bar = RIGHT_BARS[bar_index]
payload[left_bar] = {"pt": "p"} # pulse
payload[right_bar] = {"pt": "p"} # pulse
await self.led_controller.send_data(payload)
async def _handle_alternating_phase(self):
"""Handle alternating pattern with phase offset."""
payload = {
"d": {
"t": "b",
"pt": "a", # alternating
"n1": self.n1,
"n2": self.n2,
"s": self.beat_index % 2,
}
}
swap_bars = ["101", "103", "105", "107"]
for bar_name in LED_BAR_NAMES:
if bar_name in swap_bars:
payload[bar_name] = {"s": (self.beat_index + 1) % 2}
else:
payload[bar_name] = {}
await self.led_controller.send_data(payload)
async def handle_beat(self, bpm_value):
"""Handle beat from sound detector."""
if not self.beat_sending_enabled or not self.current_pattern:
return
self.beat_index = (self.beat_index + 1) % 1000000
# Send periodic parameter updates every 8 beats
if self.beat_index % 8 == 0:
await self._send_full_parameters()
# Check for pending parameter updates
if self.pending_param_update:
current_time = time.time()
if current_time - self.last_param_update >= self.param_update_interval:
self.last_param_update = current_time
self.pending_param_update = False
await self._send_full_parameters()
# Handle pattern-specific beat logic
if self.current_pattern == "sequential_pulse":
await self._handle_sequential_pulse()
elif self.current_pattern == "alternating_phase":
await self._handle_alternating_phase()
elif self.current_pattern:
await self._send_normal_pattern()
async def handle_ui_command(self, message_type, data):
"""Handle command from UI client."""
if message_type == "pattern_change":
self.current_pattern = data.get("pattern", "")
await self._send_full_parameters()
logging.info(f"Pattern changed to: {self.current_pattern}")
elif message_type == "color_change":
self.color_r = data.get("r", self.color_r)
self.color_g = data.get("g", self.color_g)
self.color_b = data.get("b", self.color_b)
await self._request_param_update()
elif message_type == "brightness_change":
self.brightness = data.get("brightness", self.brightness)
await self._request_param_update()
elif message_type == "parameter_change":
if "n1" in data:
self.n1 = data["n1"]
if "n2" in data:
self.n2 = data["n2"]
if "n3" in data:
self.n3 = data["n3"]
await self._request_param_update()
elif message_type == "delay_change":
self.delay = data.get("delay", self.delay)
await self._request_param_update()
elif message_type == "beat_toggle":
self.beat_sending_enabled = data.get("enabled", True)
logging.info(f"Beat sending {'enabled' if self.beat_sending_enabled else 'disabled'}")
elif message_type == "reset_tempo":
await self.sound_controller.send_reset_tempo()
class ControlServer:
"""WebSocket server for UI client communication and TCP server for sound."""
def __init__(self):
self.lighting_controller = LightingController()
self.clients = set()
self.tcp_server = None
async def handle_ui_client(self, websocket):
"""Handle UI client WebSocket connection."""
self.clients.add(websocket)
client_addr = websocket.remote_address
logging.info(f"UI client connected: {client_addr}")
try:
async for message in websocket:
try:
data = json.loads(message)
message_type = data.get("type")
message_data = data.get("data", {})
await self.lighting_controller.handle_ui_command(message_type, message_data)
except json.JSONDecodeError:
logging.error(f"Invalid JSON from client {client_addr}: {message}")
except Exception as e:
logging.error(f"Error handling message from client {client_addr}: {e}")
except websockets.exceptions.ConnectionClosed:
logging.info(f"UI client disconnected: {client_addr}")
except Exception as e:
logging.error(f"Error in UI client handler: {e}")
finally:
self.clients.discard(websocket)
async def handle_tcp_client(self, reader, writer):
"""Handle TCP client (sound detector) connection."""
addr = writer.get_extra_info('peername')
logging.info(f"Sound client connected: {addr}")
try:
while True:
data = await reader.read(4096)
if not data:
logging.info(f"Sound client disconnected: {addr}")
break
message = data.decode().strip()
if self.lighting_controller.beat_sending_enabled:
try:
bpm_value = float(message)
await self.lighting_controller.handle_beat(bpm_value)
except ValueError:
logging.warning(f"Non-BPM message from {addr}: {message}")
except Exception as e:
logging.error(f"Error processing beat from {addr}: {e}")
except asyncio.CancelledError:
logging.info(f"Sound client handler cancelled: {addr}")
except Exception as e:
logging.error(f"Error handling sound client {addr}: {e}")
finally:
logging.info(f"Closing connection for sound client: {addr}")
writer.close()
await writer.wait_closed()
async def start_tcp_server(self):
"""Start TCP server for sound detector."""
self.tcp_server = await asyncio.start_server(
self.handle_tcp_client, "127.0.0.1", 65432
)
addrs = ', '.join(str(sock.getsockname()) for sock in self.tcp_server.sockets)
logging.info(f"TCP server listening on {addrs}")
async def start_websocket_server(self):
"""Start WebSocket server for UI clients."""
server = await websockets.serve(
self.handle_ui_client, "localhost", CONTROL_SERVER_PORT
)
logging.info(f"WebSocket server listening on localhost:{CONTROL_SERVER_PORT}")
async def run(self):
"""Run the control server."""
# Connect to LED server
await self.lighting_controller.led_controller.connect()
# Start servers and heartbeat task
await asyncio.gather(
self.start_websocket_server(),
self.start_tcp_server(),
self._heartbeat_loop()
)
async def _heartbeat_loop(self):
"""Send periodic heartbeats to keep LED connection alive."""
try:
while True:
await asyncio.sleep(5) # Send heartbeat every 5 seconds
if self.lighting_controller.led_controller.is_connected:
# Send a simple heartbeat to keep connection alive
heartbeat_data = {
"d": {
"t": "h", # heartbeat type
}
}
await self.lighting_controller.led_controller.send_data(heartbeat_data)
except asyncio.CancelledError:
logging.info("Heartbeat loop cancelled")
except Exception as e:
logging.error(f"Heartbeat loop error: {e}")
async def main():
"""Main entry point."""
server = ControlServer()
try:
await server.run()
except KeyboardInterrupt:
logging.info("Server interrupted by user")
except Exception as e:
logging.error(f"Server error: {e}")
finally:
await server.lighting_controller.led_controller.close()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,514 +0,0 @@
import asyncio
import tkinter as tk
from tkinter import ttk, messagebox # Import messagebox for confirmations
import json
import os
from async_tkinter_loop import async_handler, async_mainloop
from networking import WebSocketClient
import color_utils
import mido # Import mido for MIDI port detection
import time
from midi import MidiHandler # Import MidiHandler
# Configuration file path
CONFIG_FILE = "config.json"
# Dark theme colors (unchanged)
bg_color = "#2e2e2e"
fg_color = "white"
trough_color_red = "#4a0000"
trough_color_green = "#004a00"
trough_color_blue = "#00004a"
trough_color_brightness = "#4a4a4a"
trough_color_delay = "#4a4a4a"
active_bg_color = "#4a4a4a"
highlight_pattern_color = "#6a5acd"
# New color for active color in palette
active_palette_color_border = "#FFD700" # Gold color
class App:
def __init__(self) -> None:
self.root = tk.Tk()
# self.root.attributes("-fullscreen", True)
self.root.configure(bg=bg_color)
# --- WebSocketClient ---
self.websocket_client = WebSocketClient("ws://192.168.4.1:80/ws")
self.root.after(100, async_handler(self.websocket_client.connect))
# --- MIDI Configuration ---
self.available_midi_ports = self.get_midi_ports()
self.current_midi_port_index = self.load_midi_device_preference() # Load saved preference
self.midi_handler: MidiHandler | None = None
self.midi_task: asyncio.Task | None = None
# Initialize MIDI handler with saved port (will be done after GUI is created)
self.pending_midi_init = True if self.available_midi_ports else False
# Configure ttk style (unchanged)
style = ttk.Style()
style.theme_use("alt")
style.configure(".", background=bg_color, foreground=fg_color, font=("Arial", 14))
style.configure("TNotebook", background=bg_color, borderwidth=0)
style.configure(
"TNotebook.Tab", background=bg_color, foreground=fg_color, font=("Arial", 30), padding=[10, 5]
)
# (Status box removed per request)
# MIDI Controller Selection
midi_frame = ttk.LabelFrame(self.root, text="MIDI Controller")
midi_frame.pack(padx=16, pady=8, fill="x")
# MIDI port dropdown
self.midi_port_var = tk.StringVar()
if self.available_midi_ports:
# Set to saved preference if available, otherwise first port
if 0 <= self.current_midi_port_index < len(self.available_midi_ports):
self.midi_port_var.set(self.available_midi_ports[self.current_midi_port_index])
else:
self.midi_port_var.set(self.available_midi_ports[0])
self.current_midi_port_index = 0
midi_dropdown = ttk.Combobox(
midi_frame,
textvariable=self.midi_port_var,
values=self.available_midi_ports,
state="readonly",
font=("Arial", 12)
)
midi_dropdown.pack(padx=8, pady=4, fill="x")
midi_dropdown.bind("<<ComboboxSelected>>", self.on_midi_port_change)
# Refresh MIDI ports button
refresh_button = ttk.Button(
midi_frame,
text="Refresh MIDI Ports",
command=self.refresh_midi_ports
)
refresh_button.pack(padx=8, pady=4)
# MIDI connection status
self.midi_status_label = tk.Label(
midi_frame,
text="Status: Disconnected",
bg=bg_color,
fg="red",
font=("Arial", 10)
)
self.midi_status_label.pack(padx=8, pady=2)
# Initialize MIDI handler now that GUI is ready
if self.pending_midi_init:
self.initialize_midi_handler()
self.pending_midi_init = False
# Controls overview (dials grid left, buttons grids right)
controls_frame = ttk.Frame(self.root)
controls_frame.pack(padx=16, pady=8, fill="both")
# Dials box: 4 rows by 2 columns (top-left origin):
# Row0: n3 (left), Delay (right)
# Row1: n1 (left), n2 (right)
# Row2: B (left), Brightness (right)
# Row3: R (left), G (right)
dials_frame = ttk.LabelFrame(controls_frame, text="Dials (CC30-37)")
dials_frame.pack(side="left", padx=12)
for c in range(2):
dials_frame.grid_columnconfigure(c, minsize=140)
for rr in range(4):
dials_frame.grid_rowconfigure(rr, minsize=70)
self.dials_boxes: list[tk.Label] = []
# Create with placeholders so they are visible before first update
placeholders = {
(0, 0): "n3\n-",
(0, 1): "Delay\n-",
(1, 0): "n1\n-",
(1, 1): "n2\n-",
(2, 0): "B\n-",
(2, 1): "Bright\n-",
(3, 0): "R\n-",
(3, 1): "G\n-",
}
for r in range(4):
for c in range(2):
lbl = tk.Label(
dials_frame,
text=placeholders.get((r, c), "-"),
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6,
pady=6,
borderwidth=2,
relief="ridge",
width=14,
height=4,
anchor="center",
justify="center",
)
lbl.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
self.dials_boxes.append(lbl)
# Additional knobs box: 4 rows by 2 columns (CC38-45)
# Row0: K1 (left), K2 (right)
# Row1: K3 (left), K4 (right)
# Row2: K5 (left), K6 (right)
# Row3: K7 (left), K8 (right)
knobs_frame = ttk.LabelFrame(controls_frame, text="Knobs (CC38-45)")
knobs_frame.pack(side="left", padx=12)
for c in range(2):
knobs_frame.grid_columnconfigure(c, minsize=140)
for rr in range(4):
knobs_frame.grid_rowconfigure(rr, minsize=70)
self.knobs_boxes: list[tk.Label] = []
# Create with placeholders so they are visible before first update
knob_placeholders = {
(0, 0): "CC44\n-",
(0, 1): "CC45\n-",
(1, 0): "Rad n1\n-",
(1, 1): "Rad delay\n-",
(2, 0): "Alt n1\n-",
(2, 1): "Alt n2\n-",
(3, 0): "Pulse n1\n-",
(3, 1): "Pulse n2\n-",
}
for r in range(4):
for c in range(2):
lbl = tk.Label(
knobs_frame,
text=knob_placeholders.get((r, c), "-"),
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6,
pady=6,
borderwidth=2,
relief="ridge",
width=14,
height=4,
anchor="center",
justify="center",
)
lbl.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
self.knobs_boxes.append(lbl)
# Buttons bank (single)
buttons_frame = ttk.Frame(controls_frame)
buttons_frame.pack(side="left", padx=12)
buttons1_frame = ttk.LabelFrame(buttons_frame, text="Buttons (notes 36-51)")
buttons1_frame.pack(side="top", pady=8)
for c in range(4):
buttons1_frame.grid_columnconfigure(c, minsize=140)
for rr in range(1, 5):
buttons1_frame.grid_rowconfigure(rr, minsize=70)
self.button1_cells: list[tk.Label] = []
for r in range(4):
for c in range(4):
lbl = tk.Label(
buttons1_frame,
text="",
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6,
pady=6,
borderwidth=2,
relief="ridge",
width=14,
height=4,
anchor="center",
justify="center",
)
lbl.grid(row=1 + (3 - r), column=c, padx=6, pady=6, sticky="nsew")
self.button1_cells.append(lbl)
# (No second buttons bank)
# (No status labels to pack)
# schedule periodic UI updates
self.root.after(200, self.update_status_labels)
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
async_mainloop(self.root)
def load_midi_device_preference(self):
"""Load saved MIDI device preference from config file"""
try:
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
saved_index = config.get('midi_device_index', 0)
print(f"Loaded MIDI device preference: index {saved_index}")
return saved_index
except Exception as e:
print(f"Error loading MIDI device preference: {e}")
return 0 # Default to first port
def save_midi_device_preference(self):
"""Save current MIDI device preference to config file"""
try:
config = {
'midi_device_index': self.current_midi_port_index,
'midi_device_name': self.available_midi_ports[self.current_midi_port_index] if self.available_midi_ports else None
}
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=2)
print(f"Saved MIDI device preference: {config['midi_device_name']} (index {config['midi_device_index']})")
except Exception as e:
print(f"Error saving MIDI device preference: {e}")
def get_midi_ports(self):
"""Get list of available MIDI input ports"""
try:
port_names = mido.get_input_names()
return port_names
except Exception as e:
print(f"Error getting MIDI ports: {e}")
return []
def initialize_midi_handler(self):
"""Initialize MIDI handler with current port"""
if not self.available_midi_ports:
print("No MIDI ports available")
return
try:
WEBSOCKET_SERVER_URI = "ws://192.168.4.1:80/ws"
print(f"Initializing MIDI handler with port index {self.current_midi_port_index}")
self.midi_handler = MidiHandler(self.current_midi_port_index, WEBSOCKET_SERVER_URI)
print("MIDI handler initialized")
# Update status
port_name = self.available_midi_ports[self.current_midi_port_index]
if hasattr(self, 'midi_status_label'):
self.midi_status_label.config(text=f"Status: Connected to {port_name}", fg="green")
# Start MIDI in background
self.root.after(0, async_handler(self.start_midi))
except Exception as e:
print(f"Error initializing MIDI handler: {e}")
messagebox.showerror("MIDI Error", f"Failed to initialize MIDI handler:\n{e}")
self.midi_handler = None
if hasattr(self, 'midi_status_label'):
self.midi_status_label.config(text="Status: Error", fg="red")
def refresh_midi_ports(self):
"""Refresh the list of available MIDI ports"""
old_ports = self.available_midi_ports.copy()
self.available_midi_ports = self.get_midi_ports()
# Update dropdown values
midi_dropdown = None
for child in self.root.winfo_children():
if isinstance(child, ttk.LabelFrame) and child.cget("text") == "MIDI Controller":
for widget in child.winfo_children():
if isinstance(widget, ttk.Combobox):
midi_dropdown = widget
break
break
if midi_dropdown:
midi_dropdown['values'] = self.available_midi_ports
if self.available_midi_ports and self.midi_port_var.get() not in self.available_midi_ports:
# Current selection is no longer available, select first available
self.midi_port_var.set(self.available_midi_ports[0])
self.current_midi_port_index = 0
self.save_midi_device_preference() # Save the new preference
self.restart_midi_handler()
elif not self.available_midi_ports:
# No ports available
self.midi_port_var.set("No MIDI ports found")
if hasattr(self, 'midi_status_label'):
self.midi_status_label.config(text="Status: No MIDI ports available", fg="orange")
# Stop current MIDI handler if running
if self.midi_task and not self.midi_task.done():
self.midi_task.cancel()
self.midi_handler = None
print(f"MIDI ports refreshed. Available: {self.available_midi_ports}")
def on_midi_port_change(self, event):
"""Handle MIDI port selection change"""
selected_port = self.midi_port_var.get()
if selected_port in self.available_midi_ports:
self.current_midi_port_index = self.available_midi_ports.index(selected_port)
print(f"MIDI port changed to: {selected_port} (index: {self.current_midi_port_index})")
self.save_midi_device_preference() # Save the new preference
self.restart_midi_handler()
@async_handler
async def restart_midi_handler(self):
"""Restart MIDI handler with new port"""
try:
# Stop current MIDI task
if self.midi_task and not self.midi_task.done():
self.midi_task.cancel()
try:
await self.midi_task
except asyncio.CancelledError:
pass
# Initialize new MIDI handler
self.initialize_midi_handler()
except Exception as e:
print(f"Error restarting MIDI handler: {e}")
messagebox.showerror("MIDI Error", f"Failed to restart MIDI handler:\n{e}")
@async_handler
async def start_midi(self):
# Launch MidiHandler.run() as a background task
if self.midi_handler and (self.midi_task is None or self.midi_task.done()):
self.midi_task = asyncio.create_task(self.midi_handler.run())
elif not self.midi_handler:
print("Cannot start MIDI: no MIDI handler available")
def on_closing(self):
print("Closing application...")
if self.midi_task and not self.midi_task.done():
self.midi_task.cancel()
asyncio.create_task(self.websocket_client.close())
self.root.destroy()
# --- Asynchronous Update Functions ---
@async_handler
async def update_rgb(self, tab):
await asyncio.sleep(0) # Yield control
def update_status_labels(self):
# Check if MIDI handler is available
if not self.midi_handler:
# Update dial displays with placeholder values
placeholders = [
("n3", "-"), ("Delay", "-"),
("n1", "-"), ("n2", "-"),
("B", "-"), ("Brightness", "-"),
("R", "-"), ("G", "-"),
]
for idx, (label, value) in enumerate(placeholders):
if idx < len(self.dials_boxes):
self.dials_boxes[idx].config(text=f"{label}\n{value}")
# Update knobs with placeholder values
knob_placeholders = [
("CC44", "-"), ("CC45", "-"),
("Rad n1", "-"), ("Rad delay", "-"),
("Alt n1", "-"), ("Alt n2", "-"),
("Pulse n1", "-"), ("Pulse n2", "-"),
]
for idx, (label, value) in enumerate(knob_placeholders):
if idx < len(self.knobs_boxes):
self.knobs_boxes[idx].config(text=f"{label}\n{value}")
# Update buttons with no selection
for lbl in self.button1_cells:
lbl.config(text="", bg=bg_color)
# Reschedule
self.root.after(200, self.update_status_labels)
return
# Pull values from midi_handler
delay = self.midi_handler.delay
brightness = self.midi_handler.brightness
r = getattr(self.midi_handler, 'color_r', 0)
g = getattr(self.midi_handler, 'color_g', 0)
b = getattr(self.midi_handler, 'color_b', 0)
# Single bank values
brightness = getattr(self.midi_handler, 'brightness', '-')
r = getattr(self.midi_handler, 'color_r', 0)
g = getattr(self.midi_handler, 'color_g', 0)
b = getattr(self.midi_handler, 'color_b', 0)
pattern = getattr(self.midi_handler, 'current_pattern', '') or '-'
n1 = getattr(self.midi_handler, 'n1', '-')
n2 = getattr(self.midi_handler, 'n2', '-')
n3 = getattr(self.midi_handler, 'n3', '-')
# Update dials 2x4 grid (left→right, top→bottom):
# Row0: n3, Delay
# Row1: n1, n2
# Row2: B, Brightness
# Row3: R, G
dial_values = [
("n3", n3), ("Delay", getattr(self.midi_handler, 'delay', '-')),
("n1", n1), ("n2", n2),
("B", b), ("Brightness", brightness),
("R", r), ("G", g),
]
# Update dial displays
for idx, (label, value) in enumerate(dial_values):
if idx < len(self.dials_boxes):
self.dials_boxes[idx].config(text=f"{label}\n{value}")
# Update additional knobs (CC38-45)
knob_values = [
("CC44", getattr(self.midi_handler, 'knob7', '-')), ("CC45", getattr(self.midi_handler, 'knob8', '-')),
("Rad n1", getattr(self.midi_handler, 'n1', '-')), ("Rad delay", getattr(self.midi_handler, 'delay', '-')),
("Alt n1", getattr(self.midi_handler, 'n1', '-')), ("Alt n2", getattr(self.midi_handler, 'n2', '-')),
("Pulse n1", getattr(self.midi_handler, 'n1', '-')), ("Pulse n2", getattr(self.midi_handler, 'n2', '-')),
]
for idx, (label, value) in enumerate(knob_values):
if idx < len(self.knobs_boxes):
self.knobs_boxes[idx].config(text=f"{label}\n{value}")
# Update buttons bank mappings and selection (single bank)
# Pattern icons for nicer appearance
icon_for = {
"pulse": "💥",
"flicker": "",
"alternating": "↔️",
"n chase": "🏃",
"rainbow": "🌈",
"radiate": "🌟",
"sequential\npulse": "🔄",
"alternating\nphase": "",
"-": "",
}
bank1_patterns = [
# Pulse patterns (row 1)
"pulse", "sequential\npulse",
# Alternating patterns (row 2)
"alternating", "alternating\nphase",
# Chase/movement patterns (row 3)
"n chase", "rainbow",
# Effect patterns (row 4)
"flicker", "radiate",
"-", "-", "-", "-",
"-", "-", "-", "-",
]
# Map MIDI handler pattern names to GUI display names
pattern_name_mapping = {
"sequential_pulse": "sequential\npulse",
"alternating_phase": "alternating\nphase",
"n_chase": "n chase",
}
# Get the display name for the current pattern
display_pattern = pattern_name_mapping.get(pattern, pattern)
# notes numbers per cell (bottom-left origin)
for idx, lbl in enumerate(self.button1_cells):
name = bank1_patterns[idx]
sel = (display_pattern == name and name != "-")
icon = icon_for.get(name, "")
text = f"{icon} {name}" if name != "-" else ""
if sel:
lbl.config(text=text, bg=highlight_pattern_color)
else:
lbl.config(text=text, bg=bg_color)
# (no second bank to update)
# reschedule
self.root.after(200, self.update_status_labels)
if __name__ == "__main__":
app = App()

View File

@@ -1,570 +0,0 @@
import mido
import asyncio
import networking
import socket
import json
import logging # Added logging import
import time # Added for initial state read
import tkinter as tk
from tkinter import ttk, messagebox # Import messagebox for confirmations
from bar_config import LED_BAR_NAMES, DEFAULT_BAR_SETTINGS
# Pattern name mapping for shorter JSON payloads
PATTERN_NAMES = {
"flicker": "f",
"fill_range": "fr",
"n_chase": "nc",
"alternating": "a",
"pulse": "p",
"rainbow": "r",
"specto": "s",
"radiate": "rd",
}
# Configure logging
DEBUG_MODE = True # Set to False for INFO level logging
logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# TCP Server Configuration
TCP_HOST = "127.0.0.1"
TCP_PORT = 65432
# Sound Control Server Configuration (for sending reset)
SOUND_CONTROL_HOST = "127.0.0.1"
SOUND_CONTROL_PORT = 65433
class MidiHandler:
def __init__(self, midi_port_index: int, websocket_uri: str):
self.midi_port_index = midi_port_index
self.websocket_uri = websocket_uri
self.ws_client = networking.WebSocketClient(websocket_uri)
self.delay = 100 # Default delay value, controlled by MIDI controller
self.brightness = 100 # Default brightness value, controlled by MIDI controller
self.tcp_host = TCP_HOST
self.tcp_port = TCP_PORT
self.beat_sending_enabled = True # New: Local flag for beat sending
self.sound_control_host = SOUND_CONTROL_HOST
self.sound_control_port = SOUND_CONTROL_PORT
# RGB controlled by CC 30/31/32 (default green)
self.color_r = 0
self.color_g = 255
self.color_b = 0
# Generic parameters controlled via CC
# Raw CC-driven parameters (0-127)
self.n1 = 10
self.n2 = 10
self.n3 = 1
# Additional knobs (CC38-45)
self.knob1 = 0
self.knob2 = 0
self.knob3 = 0
self.knob4 = 0
self.knob5 = 0
self.knob6 = 0
self.knob7 = 0
self.knob8 = 0
# Current state for GUI display
self.current_bpm: float | None = None
self.current_pattern: str = ""
self.beat_index: int = 0
# Rate limiting for parameter updates
self.last_param_update: float = 0.0
self.param_update_interval: float = 0.1 # 100ms minimum between updates
self.pending_param_update: bool = False
# Sequential pulse pattern state
self.sequential_pulse_enabled: bool = False
self.sequential_pulse_step: int = 0
def _current_color_rgb(self) -> tuple:
r = max(0, min(255, int(self.color_r)))
g = max(0, min(255, int(self.color_g)))
b = max(0, min(255, int(self.color_b)))
return (r, g, b)
async def _handle_sequential_pulse(self):
"""Handle sequential pulse pattern: each bar pulses for 1 beat, then next bar, mirrored"""
from bar_config import LEFT_BARS, RIGHT_BARS
# Calculate which bar should pulse based on beat (1 beat per bar)
bar_index = self.beat_index % 4 # 0-3, cycles every 4 beats
# Create minimal payload - defaults to off
payload = {
"d": { # Defaults - off for all bars
"t": "b", # Message type: beat
"pt": "o", # off
}
}
# Set specific bars to pulse
left_bar = LEFT_BARS[bar_index]
right_bar = RIGHT_BARS[bar_index]
payload[left_bar] = {"pt": "p"} # pulse
payload[right_bar] = {"pt": "p"} # pulse
# logging.debug(f"[Sequential Pulse] Beat {self.beat_index}, pulsing bars {left_bar} and {right_bar}")
await self.ws_client.send_data(payload)
async def _handle_alternating_phase(self):
"""Handle alternating pattern with phase offset: every second bar uses different step"""
from bar_config import LED_BAR_NAMES
# Create minimal payload - same n1/n2 for all bars
payload = {
"d": { # Defaults - pattern and n1/n2
"t": "b", # Message type: beat
"pt": "a", # alternating
"n1": self.n1,
"n2": self.n2,
"s": self.beat_index % 2, # Default step for in-phase bars
}
}
# Set step offset for every second bar (bars 101, 103, 105, 107)
swap_bars = ["101", "103", "105", "107"]
for bar_name in LED_BAR_NAMES:
if bar_name in swap_bars:
# Send step offset for out-of-phase bars
payload[bar_name] = {"s": (self.beat_index + 1) % 2}
else:
# In-phase bars use defaults (no override needed)
payload[bar_name] = {}
# logging.debug(f"[Alternating Phase] Beat {self.beat_index}, step offset for bars {swap_bars}")
await self.ws_client.send_data(payload)
async def _send_full_parameters(self):
"""Send all parameters to bars - may require multiple packets due to size limit"""
from bar_config import LED_BAR_NAMES
# Calculate packet size for full parameters
full_payload = {
"d": {
"t": "u", # Message type: update
"pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern),
"dl": self.delay,
"cl": [self._current_color_rgb()],
"br": self.brightness,
"n1": self.n1,
"n2": self.n2,
"n3": self.n3,
"s": self.beat_index % 256, # Use full range for rainbow patterns
}
}
# Estimate size: ~200 bytes for defaults + 8 bars * 2 bytes = ~216 bytes
# This should fit in one packet, but let's be safe
payload_size = len(str(full_payload))
if payload_size > 200: # Split into 2 packets if too large
# Packet 1: Pattern and timing parameters
payload1 = {
"d": {
"t": "u", # Message type: update
"pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern),
"dl": self.delay,
"br": self.brightness,
}
}
for bar_name in LED_BAR_NAMES:
payload1[bar_name] = {}
# Packet 2: Color and pattern parameters
payload2 = {
"d": {
"t": "u", # Message type: update
"cl": [self._current_color_rgb()],
"n1": self.n1,
"n2": self.n2,
"n3": self.n3,
"s": self.beat_index % 2, # Keep step small (0 or 1) for alternating patterns
}
}
for bar_name in LED_BAR_NAMES:
payload2[bar_name] = {}
# logging.debug(f"[Full Params] Sending in 2 packets due to size ({payload_size} bytes)")
await self.ws_client.send_data(payload1)
await asyncio.sleep(0.01) # Small delay between packets
await self.ws_client.send_data(payload2)
else:
# Single packet
for bar_name in LED_BAR_NAMES:
full_payload[bar_name] = {}
# logging.debug(f"[Full Params] Sending single packet ({payload_size} bytes)")
await self.ws_client.send_data(full_payload)
async def _request_param_update(self):
"""Request a parameter update with rate limiting"""
import time
current_time = time.time()
if current_time - self.last_param_update >= self.param_update_interval:
# Can send immediately
self.last_param_update = current_time
await self._send_full_parameters()
# logging.debug("[Rate Limit] Parameter update sent immediately")
else:
# Rate limited - mark as pending
self.pending_param_update = True
# logging.debug("[Rate Limit] Parameter update queued (rate limited)")
async def _send_normal_pattern(self):
"""Send normal pattern to all bars - include required parameters"""
# Patterns that need specific parameters
patterns_needing_params = ["alternating", "flicker", "n_chase", "rainbow", "radiate"]
payload = {
"d": { # Defaults
"t": "b", # Message type: beat
"pt": PATTERN_NAMES.get(self.current_pattern, self.current_pattern),
}
}
# Add required parameters for patterns that need them
if self.current_pattern in patterns_needing_params:
payload["d"].update({
"n1": self.n1,
"n2": self.n2,
"n3": self.n3,
"dl": self.delay,
"s": self.beat_index % 256, # Use full range for rainbow patterns
})
# Add empty entries for each bar (they'll use defaults)
for bar_name in LED_BAR_NAMES:
payload[bar_name] = {}
# logging.debug(f"[Beat] Triggering '{self.current_pattern}' for {len(LED_BAR_NAMES)} bars")
await self.ws_client.send_data(payload)
async def _send_reset_to_sound(self):
try:
reader, writer = await asyncio.open_connection(self.sound_control_host, self.sound_control_port)
cmd = "RESET_TEMPO\n".encode('utf-8')
writer.write(cmd)
await writer.drain()
resp = await reader.read(100)
logging.info(f"[MidiHandler - Control] Sent RESET_TEMPO, response: {resp.decode().strip()}")
writer.close()
await writer.wait_closed()
except Exception as e:
logging.error(f"[MidiHandler - Control] Failed to send RESET_TEMPO: {e}")
async def _handle_tcp_client(self, reader, writer):
addr = writer.get_extra_info('peername')
logging.info(f"[MidiHandler - TCP Server] Connected by {addr}") # Changed to info
try:
while True:
data = await reader.read(4096) # Read up to 4KB of data
if not data:
logging.info(f"[MidiHandler - TCP Server] Client {addr} disconnected.") # Changed to info
break
message = data.decode().strip()
# logging.debug(f"[MidiHandler - TCP Server] Received from {addr}: {message}") # Changed to debug
if self.beat_sending_enabled:
try:
# Attempt to parse as float (BPM) from sound.py
bpm_value = float(message)
self.current_bpm = bpm_value
# On each beat, trigger currently selected pattern(s)
if not self.current_pattern:
pass # No pattern selected yet; ignoring beat
else:
self.beat_index = (self.beat_index + 1) % 1000000
# Send periodic parameter updates every 8 beats
if self.beat_index % 8 == 0:
await self._send_full_parameters()
# Check for pending parameter updates (rate limited)
if self.pending_param_update:
import time
current_time = time.time()
if current_time - self.last_param_update >= self.param_update_interval:
self.last_param_update = current_time
self.pending_param_update = False
await self._send_full_parameters()
# logging.debug("[Rate Limit] Pending parameter update sent")
if self.current_pattern == "sequential_pulse":
# Sequential pulse pattern: each bar pulses for 1 beat, then next bar, mirrored
await self._handle_sequential_pulse()
elif self.current_pattern == "alternating_phase":
# Alternating pattern with phase offset: every second bar is out of phase
await self._handle_alternating_phase()
elif self.current_pattern:
# Normal pattern mode - run on all bars
await self._send_normal_pattern()
except ValueError:
logging.warning(f"[MidiHandler - TCP Server] Received non-BPM message from {addr}, not forwarding: {message}") # Changed to warning
except Exception as e:
logging.error(f"[MidiHandler - TCP Server] Error processing received message from {addr}: {e}") # Changed to error
else:
pass # Beat sending disabled
except asyncio.CancelledError:
logging.info(f"[MidiHandler - TCP Server] Client handler for {addr} cancelled.") # Changed to info
except Exception as e:
logging.error(f"[MidiHandler - TCP Server] Error handling client {addr}: {e}") # Changed to error
finally:
logging.info(f"[MidiHandler - TCP Server] Closing connection for {addr}") # Changed to info
writer.close()
await writer.wait_closed()
async def _midi_tcp_server(self):
server = await asyncio.start_server(
lambda r, w: self._handle_tcp_client(r, w), self.tcp_host, self.tcp_port)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
logging.info(f"[MidiHandler - TCP Server] Serving on {addrs}") # Changed to info
async with server:
await server.serve_forever()
async def _read_initial_cc_state(self, port, timeout_s: float = 0.5):
"""Read initial CC values from the MIDI device for a short period to populate state."""
start = time.time()
while time.time() - start < timeout_s:
msg = port.receive(block=False)
if msg and msg.type == 'control_change':
if msg.control == 36:
self.n3 = max(1, msg.value)
logging.info(f"[Init] n3 set to {self.n3} from CC36")
elif msg.control == 37:
self.delay = msg.value * 4
logging.info(f"[Init] Delay set to {self.delay} ms from CC37")
elif msg.control == 39:
self.delay = msg.value * 4
logging.info(f"[Init] Delay set to {self.delay} ms from CC39")
elif msg.control == 33:
self.brightness = round((msg.value / 127) * 100)
logging.info(f"[Init] Brightness set to {self.brightness} from CC33")
elif msg.control == 30:
self.color_r = round((msg.value / 127) * 255)
logging.info(f"[Init] Red set to {self.color_r} from CC30")
elif msg.control == 31:
self.color_g = round((msg.value / 127) * 255)
logging.info(f"[Init] Green set to {self.color_g} from CC31")
elif msg.control == 32:
self.color_b = round((msg.value / 127) * 255)
logging.info(f"[Init] Blue set to {self.color_b} from CC32")
elif msg.control == 34:
self.n1 = int(msg.value)
logging.info(f"[Init] n1 set to {self.n1} from CC34")
elif msg.control == 35:
self.n2 = int(msg.value)
logging.info(f"[Init] n2 set to {self.n2} from CC35")
elif msg.control == 27:
self.beat_sending_enabled = (msg.value == 127)
logging.info(f"[Init] Beat sending {'ENABLED' if self.beat_sending_enabled else 'DISABLED'} from CC27")
await asyncio.sleep(0.001)
async def _midi_listener(self):
logging.info("Midi function") # Changed to info
"""
Listens to a specific MIDI port and sends data to a WebSocket server
when Note 32 (and 33) is pressed.
"""
# 1. Get MIDI port name
port_names = mido.get_input_names()
if not port_names:
logging.warning("No MIDI input ports found. Please connect your device.") # Changed to warning
return
if not (0 <= self.midi_port_index < len(port_names)):
logging.error(f"Error: MIDI port index {self.midi_port_index} out of range. Available ports: {port_names}") # Changed to error
logging.info("Available ports:") # Changed to info
for i, name in enumerate(port_names):
logging.info(f" {i}: {name}") # Changed to info
return
midi_port_name = port_names[self.midi_port_index]
logging.info(f"Selected MIDI input port: {midi_port_name}") # Changed to info
try:
with mido.open_input(midi_port_name) as port:
logging.info(f"MIDI port '{midi_port_name}' opened. Press Ctrl+C to stop.") # Changed to info
# Read initial controller state briefly
await self._read_initial_cc_state(port)
while True:
msg = port.receive(block=False) # Non-blocking read
if msg:
# logging.debug(msg) # Changed to debug
match msg.type:
case 'note_on':
# logging.debug(f" Note ON: Note={msg.note}, Velocity={msg.velocity}, Channel={msg.channel}") # Changed to debug
# Bank1 patterns starting at MIDI note 36
pattern_bindings: list[str] = [
# Pulse patterns (row 1)
"pulse",
"sequential_pulse",
# Alternating patterns (row 2)
"alternating",
"alternating_phase",
# Chase/movement patterns (row 3)
"n_chase",
"rainbow",
# Effect patterns (row 4)
"flicker",
"radiate",
]
idx = msg.note - 36
if 0 <= idx < len(pattern_bindings):
pattern_name = pattern_bindings[idx]
self.current_pattern = pattern_name
logging.info(f"[Select] Pattern selected via note {msg.note}: {self.current_pattern} (n1={self.n1}, n2={self.n2})")
# Send full parameters when pattern changes
await self._send_full_parameters()
else:
pass # Note not bound to patterns
case 'control_change':
match msg.control:
case 36:
self.n3 = max(1, msg.value) # Update n3 step rate
logging.info(f"n3 set to {self.n3} by MIDI controller (CC36)")
await self._request_param_update()
case 37:
self.delay = msg.value * 4 # Update instance delay
logging.info(f"Delay set to {self.delay} ms by MIDI controller (CC37)")
await self._request_param_update()
case 38:
self.n1 = msg.value # pulse n1 for pulse patterns
logging.info(f"Pulse n1 set to {self.n1} by MIDI controller (CC38)")
await self._request_param_update()
case 39:
self.n2 = msg.value # pulse n2 for pulse patterns
logging.info(f"Pulse n2 set to {self.n2} by MIDI controller (CC39)")
await self._request_param_update()
case 40:
self.n1 = msg.value # n1 for alternating patterns
logging.info(f"Alternating n1 set to {self.n1} by MIDI controller (CC40)")
await self._request_param_update()
case 41:
self.n2 = msg.value # n2 for alternating patterns
logging.info(f"Alternating n2 set to {self.n2} by MIDI controller (CC41)")
await self._request_param_update()
case 42:
self.n1 = msg.value # radiate n1 for radiate patterns
logging.info(f"Radiate n1 set to {self.n1} by MIDI controller (CC42)")
await self._request_param_update()
case 43:
self.delay = msg.value * 4 # delay for radiate patterns
logging.info(f"Delay set to {self.delay} ms by MIDI controller (CC43)")
await self._request_param_update()
case 44:
self.knob7 = msg.value
logging.info(f"Knob7 set to {self.knob7} by MIDI controller (CC44)")
await self._request_param_update()
case 45:
self.knob8 = msg.value
logging.info(f"Knob8 set to {self.knob8} by MIDI controller (CC45)")
await self._request_param_update()
case 27:
if msg.value == 127:
self.beat_sending_enabled = True
logging.info("[MidiHandler - Listener] Beat sending ENABLED by MIDI control.") # Changed to info
elif msg.value == 0:
self.beat_sending_enabled = False
logging.info("[MidiHandler - Listener] Beat sending DISABLED by MIDI control.") # Changed to info
case 29:
if msg.value == 127:
logging.info("[MidiHandler - Listener] RESET_TEMPO requested by control 29.")
await self._send_reset_to_sound()
case 33:
# Map 0-127 to 0-100 brightness scale
self.brightness = round((msg.value / 127) * 100)
logging.info(f"Brightness set to {self.brightness} by MIDI controller (CC33)")
await self._request_param_update()
case 30:
# Red 0-127 -> 0-255
self.color_r = round((msg.value / 127) * 255)
logging.info(f"Red set to {self.color_r}")
await self._request_param_update()
case 31:
# Green 0-127 -> 0-255
self.color_g = round((msg.value / 127) * 255)
logging.info(f"Green set to {self.color_g}")
await self._request_param_update()
case 32:
# Blue 0-127 -> 0-255
self.color_b = round((msg.value / 127) * 255)
logging.info(f"Blue set to {self.color_b}")
await self._request_param_update()
case 34:
self.n1 = int(msg.value)
logging.info(f"n1 set to {self.n1} by MIDI controller (CC34)")
await self._request_param_update()
case 35:
self.n2 = int(msg.value)
logging.info(f"n2 set to {self.n2} by MIDI controller (CC35)")
await self._request_param_update()
await asyncio.sleep(0.001) # Important: Yield control to asyncio event loop
except mido.PortsError as e:
logging.error(f"Error opening MIDI port '{midi_port_name}': {e}") # Changed to error
except asyncio.CancelledError:
logging.info(f"MIDI listener cancelled.") # Changed to info
except Exception as e:
logging.error(f"An unexpected error occurred in MIDI listener: {e}") # Changed to error
async def run(self):
try:
await self.ws_client.connect()
logging.info(f"[MidiHandler] WebSocket client connected to {self.ws_client.uri}") # Changed to info
# List available MIDI ports for debugging
print(f"Available MIDI input ports: {mido.get_input_names()}")
print(f"Trying to open MIDI port index {self.midi_port_index}")
await asyncio.gather(
self._midi_listener(),
self._midi_tcp_server()
)
except mido.PortsError as e:
logging.error(f"[MidiHandler] Error opening MIDI port: {e}") # Changed to error
print(f"MIDI Port Error: {e}")
print(f"Available MIDI ports: {mido.get_input_names()}")
print("Please check your MIDI device connection and port index")
except asyncio.CancelledError:
logging.info("[MidiHandler] Tasks cancelled due to program shutdown.") # Changed to info
except KeyboardInterrupt:
logging.info("\n[MidiHandler] Program interrupted by user.") # Changed to info
finally:
logging.info("[MidiHandler] Main program finished. Closing WebSocket client...") # Changed to info
await self.ws_client.close()
logging.info("[MidiHandler] WebSocket client closed.") # Changed to info
def print_midi_ports():
logging.info("\n--- Available MIDI Input Ports ---") # Changed to info
port_names = mido.get_input_names()
if not port_names:
logging.warning("No MIDI input ports found.") # Changed to warning
else:
for i, name in enumerate(port_names):
logging.info(f" {i}: {name}") # Changed to info
logging.info("----------------------------------") # Changed to info
async def main():
print_midi_ports()
# --- Configuration ---
MIDI_PORT_INDEX = 1 # <--- IMPORTANT: Change this to the correct index for your device
WEBSOCKET_SERVER_URI = "ws://192.168.4.1:80/ws"
# --- End Configuration ---
midi_handler = MidiHandler(MIDI_PORT_INDEX, WEBSOCKET_SERVER_URI)
await midi_handler.run()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,210 +0,0 @@
#!/usr/bin/python
import pyaudio
import aubio
import numpy as np
from time import sleep
import json
import socket
import time
import logging # Added logging import
import asyncio # Re-added asyncio import
import threading # Added threading for control server
# Configure logging
DEBUG_MODE = True # Set to False for INFO level logging
logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# TCP Server Configuration (assuming midi.py runs this)
MIDI_TCP_HOST = "127.0.0.1"
MIDI_TCP_PORT = 65432
# Sound Control Server Configuration (for midi.py to control sound.py)
SOUND_CONTROL_HOST = "127.0.0.1"
SOUND_CONTROL_PORT = 65433
class SoundBeatDetector:
def __init__(self, tcp_host: str, tcp_port: int):
self.tcp_host = tcp_host
self.tcp_port = tcp_port
self.tcp_socket = None
self.connected_to_midi = False
self.reconnect_delay = 1 # seconds
# Note: beat_sending_enabled is not used in this simplified flow
self.bufferSize = 512
self.windowSizeMultiple = 2
self.audioInputDeviceIndex = 7
self.audioInputChannels = 1
self.pa = pyaudio.PyAudio()
logging.info("Available audio input devices:")
info = self.pa.get_host_api_info_by_index(0)
num_devices = info.get('deviceCount')
found_device = False
for i in range(0, num_devices):
device_info = self.pa.get_device_info_by_host_api_device_index(0, i)
if (device_info.get('maxInputChannels')) > 0:
logging.info(f" Input Device id {i} - {device_info.get('name')}")
if i == self.audioInputDeviceIndex:
found_device = True
if not found_device:
logging.warning(f"Audio input device index {self.audioInputDeviceIndex} not found or has no input channels.")
try:
audioInputDevice = self.pa.get_device_info_by_index(self.audioInputDeviceIndex)
self.audioInputSampleRate = int(audioInputDevice['defaultSampleRate'])
except Exception as e:
logging.error(f"Error getting audio device info for index {self.audioInputDeviceIndex}: {e}")
self.pa.terminate()
exit()
self.hopSize = self.bufferSize
self.winSize = self.hopSize * self.windowSizeMultiple
self.tempoDetection = aubio.tempo(method='default', buf_size=self.winSize, hop_size=self.hopSize, samplerate=self.audioInputSampleRate)
self.inputStream = None
self._control_thread = None
self._connect_to_midi_server()
self._start_control_server() # Start control server in background
def reset_tempo_detection(self):
"""Re-initializes the aubio tempo detection object."""
logging.info("[SoundBeatDetector] Resetting tempo detection.")
self.tempoDetection = aubio.tempo(method='default', buf_size=self.winSize, hop_size=self.hopSize, samplerate=self.audioInputSampleRate)
def _control_server_loop(self):
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind((SOUND_CONTROL_HOST, SOUND_CONTROL_PORT))
srv.listen(5)
logging.info(f"[SoundBeatDetector - Control] Listening on {SOUND_CONTROL_HOST}:{SOUND_CONTROL_PORT}")
while True:
conn, addr = srv.accept()
with conn:
logging.info(f"[SoundBeatDetector - Control] Connection from {addr}")
try:
data = conn.recv(1024)
if not data:
continue
command = data.decode().strip()
logging.debug(f"[SoundBeatDetector - Control] Received command: {command}")
if command == "RESET_TEMPO":
self.reset_tempo_detection()
response = "OK: Tempo reset\n"
else:
response = "ERROR: Unknown command\n"
conn.sendall(response.encode('utf-8'))
except Exception as e:
logging.error(f"[SoundBeatDetector - Control] Error handling control message: {e}")
except Exception as e:
logging.error(f"[SoundBeatDetector - Control] Server error: {e}")
finally:
try:
srv.close()
except Exception:
pass
def _start_control_server(self):
if self._control_thread and self._control_thread.is_alive():
return
self._control_thread = threading.Thread(target=self._control_server_loop, daemon=True)
self._control_thread.start()
def _connect_to_midi_server(self):
if self.tcp_socket:
self.tcp_socket.close()
self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.tcp_socket.settimeout(self.reconnect_delay)
try:
logging.info(f"[SoundBeatDetector] Attempting to connect to MIDI TCP server at {self.tcp_host}:{self.tcp_port}...")
self.tcp_socket.connect((self.tcp_host, self.tcp_port))
self.tcp_socket.setblocking(0)
self.connected_to_midi = True
logging.info(f"[SoundBeatDetector] Successfully connected to MIDI TCP server.")
except (socket.error, socket.timeout) as e:
logging.error(f"[SoundBeatDetector] Failed to connect to MIDI TCP server: {e}")
self.connected_to_midi = False
# Removed _handle_control_client and _control_server (replaced by simple threaded server)
def readAudioFrames(self, in_data, frame_count, time_info, status):
signal = np.frombuffer(in_data, dtype=np.float32)
beat = self.tempoDetection(signal)
if beat:
bpm = self.tempoDetection.get_bpm()
logging.debug(f"beat! (running with {bpm:.2f} bpm)") # Changed to debug
bpm_message = str(bpm)
if self.connected_to_midi and self.tcp_socket:
try:
message_bytes = (bpm_message + "\n").encode('utf-8')
self.tcp_socket.sendall(message_bytes)
logging.debug(f"[SoundBeatDetector] Sent BPM to MIDI TCP server: {bpm_message}") # Changed to debug
except socket.error as e:
logging.error(f"[SoundBeatDetector] Error sending BPM to MIDI TCP server: {e}. Attempting to reconnect...")
self.connected_to_midi = False
self._connect_to_midi_server()
elif not self.connected_to_midi:
logging.warning("[SoundBeatDetector] Not connected to MIDI TCP server, attempting to reconnect...") # Changed to warning
self._connect_to_midi_server()
else:
logging.warning("[SoundBeatDetector] TCP socket not initialized, cannot send BPM.") # Changed to warning
return (in_data, pyaudio.paContinue)
def start_stream(self):
try:
self.inputStream = self.pa.open(format=pyaudio.paFloat32,
input=True,
channels=self.audioInputChannels,
input_device_index=self.audioInputDeviceIndex,
frames_per_buffer=self.bufferSize,
rate=self.audioInputSampleRate,
stream_callback=self.readAudioFrames)
self.inputStream.start_stream()
logging.info("\nAudio stream started. Detecting beats. Press Ctrl+C to stop.")
while self.inputStream.is_active():
sleep(0.1)
except KeyboardInterrupt:
logging.info("\nKeyboardInterrupt: Stopping script gracefully.")
except Exception as e:
logging.error(f"An error occurred with the audio stream: {e}")
finally:
self.stop_stream()
def stop_stream(self):
if self.inputStream and self.inputStream.is_active():
self.inputStream.stop_stream()
if self.inputStream and not self.inputStream.is_stopped():
self.inputStream.close()
self.pa.terminate()
if self.tcp_socket and self.connected_to_midi:
logging.info("[SoundBeatDetector] Closing TCP socket.")
self.tcp_socket.close()
self.connected_to_midi = False
logging.info("SoundBeatDetector stopped.")
# Removed async def run(self)
if __name__ == "__main__":
# TCP Server Configuration (should match midi.py)
MIDI_TCP_HOST = "127.0.0.1"
MIDI_TCP_PORT = 65432
sound_detector = SoundBeatDetector(MIDI_TCP_HOST, MIDI_TCP_PORT)
logging.info("Starting SoundBeatDetector...")
try:
sound_detector.start_stream()
except KeyboardInterrupt:
logging.info("\nProgram interrupted by user.")
except Exception as e:
logging.error(f"An error occurred during main execution: {e}")

View File

@@ -1,602 +0,0 @@
#!/usr/bin/env python3
"""
UI Client for Lighting Controller
Handles the user interface and MIDI controller input.
Communicates with the control server via WebSocket.
"""
import asyncio
import tkinter as tk
from tkinter import ttk, messagebox
import json
import os
import mido
import logging
from async_tkinter_loop import async_handler, async_mainloop
import websockets
import websocket
# Configuration
CONFIG_FILE = "config.json"
CONTROL_SERVER_URI = "ws://localhost:8765"
# Dark theme colors
bg_color = "#2e2e2e"
fg_color = "white"
trough_color_red = "#4a0000"
trough_color_green = "#004a00"
trough_color_blue = "#00004a"
trough_color_brightness = "#4a4a4a"
trough_color_delay = "#4a4a4a"
active_bg_color = "#4a4a4a"
highlight_pattern_color = "#6a5acd"
active_palette_color_border = "#FFD700"
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class WebSocketClient:
"""WebSocket client for communicating with the control server."""
def __init__(self, uri):
self.uri = uri
self.websocket = None
self.is_connected = False
self.reconnect_task = None
async def connect(self):
"""Establish WebSocket connection to control server."""
if self.is_connected and self.websocket:
return
try:
logging.info(f"Connecting to control server at {self.uri}...")
self.websocket = await websockets.connect(self.uri)
self.is_connected = True
logging.info("Connected to control server")
except Exception as e:
logging.error(f"Failed to connect to control server: {e}")
self.is_connected = False
self.websocket = None
async def send_message(self, message_type, data=None):
"""Send a message to the control server."""
if not self.is_connected or not self.websocket:
logging.warning("Not connected to control server")
return
try:
message = {
"type": message_type,
"data": data or {}
}
await self.websocket.send(json.dumps(message))
logging.debug(f"Sent message: {message}")
except Exception as e:
logging.error(f"Failed to send message: {e}")
self.is_connected = False
async def close(self):
"""Close WebSocket connection."""
if self.websocket and self.is_connected:
await self.websocket.close()
self.is_connected = False
self.websocket = None
logging.info("Disconnected from control server")
class MidiController:
"""Handles MIDI controller input and sends commands to control server."""
def __init__(self, websocket_client):
self.websocket_client = websocket_client
self.midi_port_index = 0
self.available_ports = []
self.midi_port = None
self.midi_task = None
# MIDI state
self.current_pattern = ""
self.delay = 100
self.brightness = 100
self.color_r = 0
self.color_g = 255
self.color_b = 0
self.n1 = 10
self.n2 = 10
self.n3 = 1
self.knob7 = 0
self.knob8 = 0
self.beat_sending_enabled = True
def get_midi_ports(self):
"""Get list of available MIDI input ports."""
try:
return mido.get_input_names()
except Exception as e:
logging.error(f"Error getting MIDI ports: {e}")
return []
def load_midi_preference(self):
"""Load saved MIDI device preference."""
try:
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
config = json.load(f)
return config.get('midi_device_index', 0)
except Exception as e:
logging.error(f"Error loading MIDI preference: {e}")
return 0
def save_midi_preference(self):
"""Save current MIDI device preference."""
try:
config = {
'midi_device_index': self.midi_port_index,
'midi_device_name': self.available_ports[self.midi_port_index] if self.available_ports else None
}
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=2)
except Exception as e:
logging.error(f"Error saving MIDI preference: {e}")
async def initialize_midi(self):
"""Initialize MIDI port connection."""
self.available_ports = self.get_midi_ports()
self.midi_port_index = self.load_midi_preference()
if not self.available_ports:
logging.warning("No MIDI ports available")
return False
if not (0 <= self.midi_port_index < len(self.available_ports)):
self.midi_port_index = 0
try:
port_name = self.available_ports[self.midi_port_index]
self.midi_port = mido.open_input(port_name)
logging.info(f"Connected to MIDI port: {port_name}")
return True
except Exception as e:
logging.error(f"Failed to open MIDI port: {e}")
return False
async def start_midi_listener(self):
"""Start listening for MIDI messages."""
if not self.midi_port:
return
try:
while True:
msg = self.midi_port.receive(block=False)
if msg:
await self.handle_midi_message(msg)
await asyncio.sleep(0.001)
except asyncio.CancelledError:
logging.info("MIDI listener cancelled")
except Exception as e:
logging.error(f"MIDI listener error: {e}")
async def handle_midi_message(self, msg):
"""Handle incoming MIDI message and send to control server."""
if msg.type == 'note_on':
# Pattern selection (notes 36-51)
logging.info(f"MIDI Note {msg.note}: {msg.velocity}")
pattern_bindings = [
"pulse", "sequential_pulse", "alternating", "alternating_phase",
"n_chase", "rainbow", "flicker", "radiate"
]
idx = msg.note - 36
if 0 <= idx < len(pattern_bindings):
self.current_pattern = pattern_bindings[idx]
await self.websocket_client.send_message("pattern_change", {
"pattern": self.current_pattern
})
logging.info(f"Pattern changed to: {self.current_pattern}")
elif msg.type == 'control_change':
# Handle control change messages
control = msg.control
value = msg.value
logging.info(f"MIDI CC {control}: {value}")
if control == 30: # Red
self.color_r = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 31: # Green
self.color_g = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 32: # Blue
self.color_b = round((value / 127) * 255)
await self.websocket_client.send_message("color_change", {
"r": self.color_r, "g": self.color_g, "b": self.color_b
})
elif control == 33: # Brightness
self.brightness = round((value / 127) * 100)
await self.websocket_client.send_message("brightness_change", {
"brightness": self.brightness
})
elif control == 34: # n1
self.n1 = int(value)
await self.websocket_client.send_message("parameter_change", {
"n1": self.n1
})
elif control == 35: # n2
self.n2 = int(value)
await self.websocket_client.send_message("parameter_change", {
"n2": self.n2
})
elif control == 36: # n3
self.n3 = max(1, value)
await self.websocket_client.send_message("parameter_change", {
"n3": self.n3
})
elif control == 37: # Delay
self.delay = value * 4
await self.websocket_client.send_message("delay_change", {
"delay": self.delay
})
elif control == 27: # Beat sending toggle
self.beat_sending_enabled = (value == 127)
await self.websocket_client.send_message("beat_toggle", {
"enabled": self.beat_sending_enabled
})
def close(self):
"""Close MIDI connection."""
if self.midi_port:
self.midi_port.close()
self.midi_port = None
class UIClient:
"""Main UI client application."""
def __init__(self):
self.root = tk.Tk()
self.root.configure(bg=bg_color)
self.root.title("Lighting Controller - UI Client")
# WebSocket client
self.websocket_client = WebSocketClient(CONTROL_SERVER_URI)
# MIDI controller
self.midi_controller = MidiController(self.websocket_client)
# UI state
self.current_pattern = ""
self.delay = 100
self.brightness = 100
self.color_r = 0
self.color_g = 255
self.color_b = 0
self.n1 = 10
self.n2 = 10
self.n3 = 1
self.setup_ui()
self.setup_async_tasks()
def setup_ui(self):
"""Setup the user interface."""
# Configure ttk style
style = ttk.Style()
style.theme_use("alt")
style.configure(".", background=bg_color, foreground=fg_color, font=("Arial", 14))
style.configure("TNotebook", background=bg_color, borderwidth=0)
style.configure("TNotebook.Tab", background=bg_color, foreground=fg_color, font=("Arial", 30), padding=[10, 5])
# MIDI Controller Selection
midi_frame = ttk.LabelFrame(self.root, text="MIDI Controller")
midi_frame.pack(padx=16, pady=8, fill="x")
# MIDI port dropdown
self.midi_port_var = tk.StringVar()
midi_dropdown = ttk.Combobox(
midi_frame,
textvariable=self.midi_port_var,
values=[],
state="readonly",
font=("Arial", 12)
)
midi_dropdown.pack(padx=8, pady=4, fill="x")
midi_dropdown.bind("<<ComboboxSelected>>", self.on_midi_port_change)
# Refresh MIDI ports button
refresh_button = ttk.Button(
midi_frame,
text="Refresh MIDI Ports",
command=self.refresh_midi_ports
)
refresh_button.pack(padx=8, pady=4)
# MIDI connection status
self.midi_status_label = tk.Label(
midi_frame,
text="Status: Disconnected",
bg=bg_color,
fg="red",
font=("Arial", 10)
)
self.midi_status_label.pack(padx=8, pady=2)
# Controls overview
controls_frame = ttk.Frame(self.root)
controls_frame.pack(padx=16, pady=8, fill="both")
# Dials display
dials_frame = ttk.LabelFrame(controls_frame, text="Dials (CC30-37)")
dials_frame.pack(side="left", padx=12)
for c in range(2):
dials_frame.grid_columnconfigure(c, minsize=140)
for rr in range(4):
dials_frame.grid_rowconfigure(rr, minsize=70)
self.dials_boxes = []
placeholders = {
(0, 0): "n3\n-", (0, 1): "Delay\n-",
(1, 0): "n1\n-", (1, 1): "n2\n-",
(2, 0): "B\n-", (2, 1): "Bright\n-",
(3, 0): "R\n-", (3, 1): "G\n-",
}
for r in range(4):
for c in range(2):
lbl = tk.Label(
dials_frame,
text=placeholders.get((r, c), "-"),
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6, pady=6,
borderwidth=2, relief="ridge",
width=14, height=4,
anchor="center", justify="center",
)
lbl.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
self.dials_boxes.append(lbl)
# Knobs display
knobs_frame = ttk.LabelFrame(controls_frame, text="Knobs (CC38-45)")
knobs_frame.pack(side="left", padx=12)
for c in range(2):
knobs_frame.grid_columnconfigure(c, minsize=140)
for rr in range(4):
knobs_frame.grid_rowconfigure(rr, minsize=70)
self.knobs_boxes = []
knob_placeholders = {
(0, 0): "CC44\n-", (0, 1): "CC45\n-",
(1, 0): "Rad n1\n-", (1, 1): "Rad delay\n-",
(2, 0): "Alt n1\n-", (2, 1): "Alt n2\n-",
(3, 0): "Pulse n1\n-", (3, 1): "Pulse n2\n-",
}
for r in range(4):
for c in range(2):
lbl = tk.Label(
knobs_frame,
text=knob_placeholders.get((r, c), "-"),
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6, pady=6,
borderwidth=2, relief="ridge",
width=14, height=4,
anchor="center", justify="center",
)
lbl.grid(row=r, column=c, padx=6, pady=6, sticky="nsew")
self.knobs_boxes.append(lbl)
# Buttons display
buttons_frame = ttk.Frame(controls_frame)
buttons_frame.pack(side="left", padx=12)
buttons1_frame = ttk.LabelFrame(buttons_frame, text="Buttons (notes 36-51)")
buttons1_frame.pack(side="top", pady=8)
for c in range(4):
buttons1_frame.grid_columnconfigure(c, minsize=140)
for rr in range(1, 5):
buttons1_frame.grid_rowconfigure(rr, minsize=70)
self.button1_cells = []
for r in range(4):
for c in range(4):
lbl = tk.Label(
buttons1_frame,
text="",
bg=bg_color,
fg=fg_color,
font=("Arial", 14),
padx=6, pady=6,
borderwidth=2, relief="ridge",
width=14, height=4,
anchor="center", justify="center",
)
lbl.grid(row=1 + (3 - r), column=c, padx=6, pady=6, sticky="nsew")
self.button1_cells.append(lbl)
# Connection status
self.connection_status = tk.Label(
self.root,
text="Control Server: Disconnected",
bg=bg_color,
fg="red",
font=("Arial", 12)
)
self.connection_status.pack(pady=8)
# Schedule periodic UI updates
self.root.after(200, self.update_status_labels)
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def setup_async_tasks(self):
"""Setup async tasks for WebSocket and MIDI."""
# Connect to control server
self.root.after(100, async_handler(self.websocket_client.connect))
# Initialize MIDI
self.root.after(200, async_handler(self.initialize_midi))
@async_handler
async def initialize_midi(self):
"""Initialize MIDI controller."""
success = await self.midi_controller.initialize_midi()
if success:
# Update UI
self.midi_controller.available_ports = self.midi_controller.get_midi_ports()
if self.midi_controller.available_ports:
self.midi_port_var.set(self.midi_controller.available_ports[self.midi_controller.midi_port_index])
# Update dropdown
for child in self.root.winfo_children():
if isinstance(child, ttk.LabelFrame) and child.cget("text") == "MIDI Controller":
for widget in child.winfo_children():
if isinstance(widget, ttk.Combobox):
widget['values'] = self.midi_controller.available_ports
break
break
self.midi_status_label.config(
text=f"Status: Connected to {self.midi_controller.available_ports[self.midi_controller.midi_port_index]}",
fg="green"
)
# Start MIDI listener
self.midi_controller.midi_task = asyncio.create_task(
self.midi_controller.start_midi_listener()
)
def refresh_midi_ports(self):
"""Refresh MIDI ports list."""
old_ports = self.midi_controller.available_ports.copy()
self.midi_controller.available_ports = self.midi_controller.get_midi_ports()
# Update dropdown
for child in self.root.winfo_children():
if isinstance(child, ttk.LabelFrame) and child.cget("text") == "MIDI Controller":
for widget in child.winfo_children():
if isinstance(widget, ttk.Combobox):
widget['values'] = self.midi_controller.available_ports
if (self.midi_controller.available_ports and
self.midi_port_var.get() not in self.midi_controller.available_ports):
self.midi_port_var.set(self.midi_controller.available_ports[0])
self.midi_controller.midi_port_index = 0
self.midi_controller.save_midi_preference()
break
break
def on_midi_port_change(self, event):
"""Handle MIDI port selection change."""
selected_port = self.midi_port_var.get()
if selected_port in self.midi_controller.available_ports:
self.midi_controller.midi_port_index = self.midi_controller.available_ports.index(selected_port)
self.midi_controller.save_midi_preference()
# Restart MIDI connection
asyncio.create_task(self.restart_midi())
@async_handler
async def restart_midi(self):
"""Restart MIDI connection with new port."""
if self.midi_controller.midi_task:
self.midi_controller.midi_task.cancel()
if self.midi_controller.midi_port:
self.midi_controller.midi_port.close()
success = await self.midi_controller.initialize_midi()
if success:
self.midi_controller.midi_task = asyncio.create_task(
self.midi_controller.start_midi_listener()
)
def update_status_labels(self):
"""Update UI status labels."""
# Update connection status
if self.websocket_client.is_connected:
self.connection_status.config(text="Control Server: Connected", fg="green")
else:
self.connection_status.config(text="Control Server: Disconnected", fg="red")
# Update dial displays
dial_values = [
("n3", self.midi_controller.n3), ("Delay", self.midi_controller.delay),
("n1", self.midi_controller.n1), ("n2", self.midi_controller.n2),
("B", self.midi_controller.color_b), ("Brightness", self.midi_controller.brightness),
("R", self.midi_controller.color_r), ("G", self.midi_controller.color_g),
]
for idx, (label, value) in enumerate(dial_values):
if idx < len(self.dials_boxes):
self.dials_boxes[idx].config(text=f"{label}\n{value}")
# Update knobs
knob_values = [
("CC44", self.midi_controller.knob7), ("CC45", self.midi_controller.knob8),
("Rad n1", self.midi_controller.n1), ("Rad delay", self.midi_controller.delay),
("Alt n1", self.midi_controller.n1), ("Alt n2", self.midi_controller.n2),
("Pulse n1", self.midi_controller.n1), ("Pulse n2", self.midi_controller.n2),
]
for idx, (label, value) in enumerate(knob_values):
if idx < len(self.knobs_boxes):
self.knobs_boxes[idx].config(text=f"{label}\n{value}")
# Update buttons
icon_for = {
"pulse": "💥", "flicker": "", "alternating": "↔️",
"n_chase": "🏃", "rainbow": "🌈", "radiate": "🌟",
"sequential_pulse": "🔄", "alternating_phase": "", "-": "",
}
bank1_patterns = [
"pulse", "sequential_pulse", "alternating", "alternating_phase",
"n_chase", "rainbow", "flicker", "radiate",
"-", "-", "-", "-", "-", "-", "-", "-",
]
# Display names for UI (with line breaks for better display)
display_names = {
"pulse": "pulse",
"sequential_pulse": "sequential\npulse",
"alternating": "alternating",
"alternating_phase": "alternating\nphase",
"n_chase": "n chase",
"rainbow": "rainbow",
"flicker": "flicker",
"radiate": "radiate",
}
current_pattern = self.midi_controller.current_pattern
for idx, lbl in enumerate(self.button1_cells):
pattern_name = bank1_patterns[idx]
is_selected = (current_pattern == pattern_name and pattern_name != "-")
display_name = display_names.get(pattern_name, pattern_name)
icon = icon_for.get(pattern_name, "")
text = f"{icon} {display_name}" if pattern_name != "-" else ""
if is_selected:
lbl.config(text=text, bg=highlight_pattern_color)
else:
lbl.config(text=text, bg=bg_color)
# Reschedule
self.root.after(200, self.update_status_labels)
def on_closing(self):
"""Handle application closing."""
logging.info("Closing UI client...")
if self.midi_controller.midi_task:
self.midi_controller.midi_task.cancel()
self.midi_controller.close()
asyncio.create_task(self.websocket_client.close())
self.root.destroy()
def run(self):
"""Run the UI client."""
async_mainloop(self.root)
if __name__ == "__main__":
app = UIClient()
app.run()

View File

@@ -1,116 +0,0 @@
#!/usr/bin/env python3
"""
Startup script for the separated lighting controller architecture.
Starts the control server, sound detector, and UI client.
"""
import subprocess
import sys
import time
import signal
import os
from pathlib import Path
def start_process(command, name, cwd=None):
"""Start a subprocess and return the process object."""
print(f"Starting {name}...")
try:
process = subprocess.Popen(
command,
shell=True,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid if os.name != 'nt' else None
)
print(f"{name} started with PID {process.pid}")
return process
except Exception as e:
print(f"Failed to start {name}: {e}")
return None
def main():
"""Main startup function."""
print("Starting Lighting Controller (Separated Architecture)")
print("=" * 50)
# Get the project directory
project_dir = Path(__file__).parent
processes = []
try:
# Start control server
control_process = start_process(
"python src/control_server.py",
"Control Server",
cwd=project_dir
)
if control_process:
processes.append(("Control Server", control_process))
# Wait a moment for the control server to start
time.sleep(2)
# Start sound detector
sound_process = start_process(
"python src/sound.py",
"Sound Detector",
cwd=project_dir
)
if sound_process:
processes.append(("Sound Detector", sound_process))
# Wait a moment for the sound detector to start
time.sleep(1)
# Start UI client
ui_process = start_process(
"python src/ui_client.py",
"UI Client",
cwd=project_dir
)
if ui_process:
processes.append(("UI Client", ui_process))
print("\nAll components started successfully!")
print("Press Ctrl+C to stop all components...")
# Wait for processes
try:
while True:
time.sleep(1)
# Check if any process has died
for name, process in processes:
if process.poll() is not None:
print(f"Warning: {name} has stopped unexpectedly")
except KeyboardInterrupt:
print("\nShutting down all components...")
except Exception as e:
print(f"Error during startup: {e}")
finally:
# Clean up all processes
for name, process in processes:
if process and process.poll() is None:
print(f"Stopping {name}...")
try:
if os.name != 'nt':
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
else:
process.terminate()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
print(f"Force killing {name}...")
if os.name != 'nt':
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
else:
process.kill()
except Exception as e:
print(f"Error stopping {name}: {e}")
print("All components stopped.")
if __name__ == "__main__":
main()