Add P2P communication module
- Implement ESP-NOW async communication - Support sending string, dict, or bytes data - Use asend for async broadcast messaging
This commit is contained in:
39
src/p2p.py
Normal file
39
src/p2p.py
Normal file
@@ -0,0 +1,39 @@
|
||||
import network
|
||||
import aioespnow
|
||||
import asyncio
|
||||
import json
|
||||
from time import sleep
|
||||
|
||||
|
||||
class P2P:
|
||||
def __init__(self):
|
||||
network.WLAN(network.STA_IF).active(True)
|
||||
self.broadcast = bytes.fromhex("ffffffffffff")
|
||||
self.e = aioespnow.AIOESPNow()
|
||||
self.e.active(True)
|
||||
try:
|
||||
self.e.add_peer(self.broadcast)
|
||||
except:
|
||||
pass
|
||||
|
||||
async def send(self, data):
|
||||
# Convert data to bytes if it's a string or dict
|
||||
if isinstance(data, str):
|
||||
payload = data.encode()
|
||||
elif isinstance(data, dict):
|
||||
payload = json.dumps(data).encode()
|
||||
else:
|
||||
payload = data # Assume it's already bytes
|
||||
|
||||
# Use asend for async sending - returns boolean indicating success
|
||||
result = await self.e.asend(self.broadcast, payload)
|
||||
return result
|
||||
|
||||
|
||||
|
||||
async def main():
|
||||
p = P2P()
|
||||
await p.send(json.dumps({"dj": {"p": "on", "colors": ["#ff0000"]}}))
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user