# lighting-controller/networking.py
#
# 54 lines
# 1.6 KiB
# Python

import socket
import asyncio
import httpx
from websockets import connect
async def send_color_data(color_data, servers):
    """Deliver one color payload to every configured server concurrently.

    Args:
        color_data: Payload handed unchanged to ``send_to_server`` for each
            destination.
        servers: Iterable of destinations; each item is passed as-is to
            ``send_to_server``.
    """
    # One coroutine per destination, all awaited together through gather().
    deliveries = (send_to_server(color_data, target) for target in servers)
    await asyncio.gather(*deliveries)
async def send_to_server(color_data, server):
    """Connect to a server, send color data, and close the connection.

    Failures are reported on stdout rather than raised, so one unreachable
    server does not abort a caller that is sending to several.

    Args:
        color_data: Bytes-like payload written to the TCP stream.
        server: ``(ip, port)`` pair identifying the destination.
    """
    server_ip, server_port = server
    try:
        _reader, writer = await asyncio.open_connection(server_ip, server_port)
        try:
            writer.write(color_data)
            await writer.drain()
        finally:
            # Always release the socket, even when write/drain fails midway.
            writer.close()
            await writer.wait_closed()
        print(f"Sent data to {server_ip}:{server_port}")
    except OSError as e:
        # socket.error is an alias of OSError and ConnectionError is a
        # subclass of it, so this covers the same failures as before.
        print(f"Error sending to {server_ip}:{server_port}: {e}")
async def post(server, endpoint, data=""):
    """POST ``data`` to ``http://{server}/{endpoint}`` and print the response.

    Args:
        server: Host part of the target URL.
        endpoint: Path appended after the host.
        data: Request body. ``str``/``bytes`` are sent as the raw body; any
            other value is forwarded to httpx as ``data=``.
    """
    url = f"http://{server}/{endpoint}"
    # httpx reserves ``data=`` for form fields and warns (DeprecationWarning)
    # when given raw text/bytes; ``content=`` sends the identical body.
    if isinstance(data, (str, bytes)):
        body = {"content": data}
    else:
        body = {"data": data}
    async with httpx.AsyncClient() as client:
        r = await client.post(url, **body)
        print(server, r)
async def main():
    """Switch the controller to the external pattern and stream a fixed frame.

    Opens a websocket to the controller, POSTs the ``external`` pattern, then
    sends a 180-byte frame (every byte set to 128) every 5 seconds. The loop
    never ends on its own; when it is interrupted by an error or cancellation
    the websocket is closed and the ``off`` pattern is POSTed.
    """
    host = "10.1.1.200"
    # The payload never changes, so build it once instead of refilling it on
    # every pass. NOTE(review): the meaning of 180 bytes / value 128 depends
    # on the device's frame format -- confirm before changing.
    frame = bytearray([128]) * 180

    # Connect before entering the try block: if the connection fails there is
    # nothing to clean up, and the real error is not masked by an unbound name.
    ws1 = await connect(f"ws://{host}/external")
    try:
        await post(host, "pattern", "external")
        while True:
            print("sync")
            await ws1.send(frame)
            await asyncio.sleep(5)
    finally:
        try:
            await ws1.close()
        finally:
            # Turn the pattern off even if closing the websocket failed.
            await post(host, "pattern", "off")
# Script entry point: run the main() coroutine when executed directly.
if __name__ == "__main__":
    asyncio.run(main())