diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c3d3a20 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +settings.json \ No newline at end of file diff --git a/dev.py b/dev.py new file mode 100755 index 0000000..7b3ab91 --- /dev/null +++ b/dev.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 + +import subprocess +import serial +import sys + +print(sys.argv) + +port = sys.argv[1] + +cmd = sys.argv[1] + +for cmd in sys.argv[1:]: + print(cmd) + match cmd: + case "src": + subprocess.call(["mpremote", "connect", port, "fs", "cp", "-r", ".", ":" ], cwd="src") + case "lib": + subprocess.call(["mpremote", "connect", port, "fs", "cp", "-r", "lib", ":" ]) + case "ls": + subprocess.call(["mpremote", "connect", port, "fs", "ls", ":" ]) + case "reset": + with serial.Serial(port, baudrate=115200) as ser: + ser.write(b'\x03\x03\x04') + case "follow": + with serial.Serial(port, baudrate=115200) as ser: + while True: + if ser.in_waiting > 0: # Check if there is data in the buffer + data = ser.readline().decode('utf-8').strip() # Read and decode the data + print(data) + + + diff --git a/lib/microdot/__init__.py b/lib/microdot/__init__.py new file mode 100644 index 0000000..68cb381 --- /dev/null +++ b/lib/microdot/__init__.py @@ -0,0 +1,2 @@ +from microdot.microdot import Microdot, Request, Response, abort, redirect, \ + send_file # noqa: F401 \ No newline at end of file diff --git a/lib/microdot/helpers.py b/lib/microdot/helpers.py new file mode 100644 index 0000000..664e58c --- /dev/null +++ b/lib/microdot/helpers.py @@ -0,0 +1,8 @@ +try: + from functools import wraps +except ImportError: # pragma: no cover + # MicroPython does not currently implement functools.wraps + def wraps(wrapped): + def _(wrapper): + return wrapper + return _ diff --git a/lib/microdot/microdot.py b/lib/microdot/microdot.py new file mode 100644 index 0000000..0513f21 --- /dev/null +++ b/lib/microdot/microdot.py @@ -0,0 +1,1450 @@ +""" +microdot +-------- + +The ``microdot`` module defines a few classes that help implement HTTP-based +servers for MicroPython and standard Python. +""" +import asyncio +import io +import json +import time + +try: + from inspect import iscoroutinefunction, iscoroutine + from functools import partial + + async def invoke_handler(handler, *args, **kwargs): + """Invoke a handler and return the result. + + This method runs sync handlers in a thread pool executor. + """ + if iscoroutinefunction(handler): + ret = await handler(*args, **kwargs) + else: + ret = await asyncio.get_running_loop().run_in_executor( + None, partial(handler, *args, **kwargs)) + return ret +except ImportError: # pragma: no cover + def iscoroutine(coro): + return hasattr(coro, 'send') and hasattr(coro, 'throw') + + async def invoke_handler(handler, *args, **kwargs): + """Invoke a handler and return the result. + + This method runs sync handlers in the asyncio thread, which can + potentially cause blocking and performance issues. + """ + ret = handler(*args, **kwargs) + if iscoroutine(ret): + ret = await ret + return ret + +try: + from sys import print_exception +except ImportError: # pragma: no cover + import traceback + + def print_exception(exc): + traceback.print_exc() + +MUTED_SOCKET_ERRORS = [ + 32, # Broken pipe + 54, # Connection reset by peer + 104, # Connection reset by peer + 128, # Operation on closed socket +] + + +def urldecode_str(s): + s = s.replace('+', ' ') + parts = s.split('%') + if len(parts) == 1: + return s + result = [parts[0]] + for item in parts[1:]: + if item == '': + result.append('%') + else: + code = item[:2] + result.append(chr(int(code, 16))) + result.append(item[2:]) + return ''.join(result) + + +def urldecode_bytes(s): + s = s.replace(b'+', b' ') + parts = s.split(b'%') + if len(parts) == 1: + return s.decode() + result = [parts[0]] + for item in parts[1:]: + if item == b'': + result.append(b'%') + else: + code = item[:2] + result.append(bytes([int(code, 16)])) + result.append(item[2:]) + return b''.join(result).decode() + + +def urlencode(s): + return s.replace('+', '%2B').replace(' ', '+').replace( + '%', '%25').replace('?', '%3F').replace('#', '%23').replace( + '&', '%26').replace('=', '%3D') + + +class NoCaseDict(dict): + """A subclass of dictionary that holds case-insensitive keys. + + :param initial_dict: an initial dictionary of key/value pairs to + initialize this object with. + + Example:: + + >>> d = NoCaseDict() + >>> d['Content-Type'] = 'text/html' + >>> print(d['Content-Type']) + text/html + >>> print(d['content-type']) + text/html + >>> print(d['CONTENT-TYPE']) + text/html + >>> del d['cOnTeNt-TyPe'] + >>> print(d) + {} + """ + def __init__(self, initial_dict=None): + super().__init__(initial_dict or {}) + self.keymap = {k.lower(): k for k in self.keys() if k.lower() != k} + + def __setitem__(self, key, value): + kl = key.lower() + key = self.keymap.get(kl, key) + if kl != key: + self.keymap[kl] = key + super().__setitem__(key, value) + + def __getitem__(self, key): + kl = key.lower() + return super().__getitem__(self.keymap.get(kl, kl)) + + def __delitem__(self, key): + kl = key.lower() + super().__delitem__(self.keymap.get(kl, kl)) + + def __contains__(self, key): + kl = key.lower() + return self.keymap.get(kl, kl) in self.keys() + + def get(self, key, default=None): + kl = key.lower() + return super().get(self.keymap.get(kl, kl), default) + + def update(self, other_dict): + for key, value in other_dict.items(): + self[key] = value + + +def mro(cls): # pragma: no cover + """Return the method resolution order of a class. + + This is a helper function that returns the method resolution order of a + class. It is used by Microdot to find the best error handler to invoke for + the raised exception. + + In CPython, this function returns the ``__mro__`` attribute of the class. + In MicroPython, this function implements a recursive depth-first scanning + of the class hierarchy. + """ + if hasattr(cls, 'mro'): + return cls.__mro__ + + def _mro(cls): + m = [cls] + for base in cls.__bases__: + m += _mro(base) + return m + + mro_list = _mro(cls) + + # If a class appears multiple times (due to multiple inheritance) remove + # all but the last occurence. This matches the method resolution order + # of MicroPython, but not CPython. + mro_pruned = [] + for i in range(len(mro_list)): + base = mro_list.pop(0) + if base not in mro_list: + mro_pruned.append(base) + return mro_pruned + + +class MultiDict(dict): + """A subclass of dictionary that can hold multiple values for the same + key. It is used to hold key/value pairs decoded from query strings and + form submissions. + + :param initial_dict: an initial dictionary of key/value pairs to + initialize this object with. + + Example:: + + >>> d = MultiDict() + >>> d['sort'] = 'name' + >>> d['sort'] = 'email' + >>> print(d['sort']) + 'name' + >>> print(d.getlist('sort')) + ['name', 'email'] + """ + def __init__(self, initial_dict=None): + super().__init__() + if initial_dict: + for key, value in initial_dict.items(): + self[key] = value + + def __setitem__(self, key, value): + if key not in self: + super().__setitem__(key, []) + super().__getitem__(key).append(value) + + def __getitem__(self, key): + return super().__getitem__(key)[0] + + def get(self, key, default=None, type=None): + """Return the value for a given key. + + :param key: The key to retrieve. + :param default: A default value to use if the key does not exist. + :param type: A type conversion callable to apply to the value. + + If the multidict contains more than one value for the requested key, + this method returns the first value only. + + Example:: + + >>> d = MultiDict() + >>> d['age'] = '42' + >>> d.get('age') + '42' + >>> d.get('age', type=int) + 42 + >>> d.get('name', default='noname') + 'noname' + """ + if key not in self: + return default + value = self[key] + if type is not None: + value = type(value) + return value + + def getlist(self, key, type=None): + """Return all the values for a given key. + + :param key: The key to retrieve. + :param type: A type conversion callable to apply to the values. + + If the requested key does not exist in the dictionary, this method + returns an empty list. + + Example:: + + >>> d = MultiDict() + >>> d.getlist('items') + [] + >>> d['items'] = '3' + >>> d.getlist('items') + ['3'] + >>> d['items'] = '56' + >>> d.getlist('items') + ['3', '56'] + >>> d.getlist('items', type=int) + [3, 56] + """ + if key not in self: + return [] + values = super().__getitem__(key) + if type is not None: + values = [type(value) for value in values] + return values + + +class AsyncBytesIO: + """An async wrapper for BytesIO.""" + def __init__(self, data): + self.stream = io.BytesIO(data) + + async def read(self, n=-1): + return self.stream.read(n) + + async def readline(self): # pragma: no cover + return self.stream.readline() + + async def readexactly(self, n): # pragma: no cover + return self.stream.read(n) + + async def readuntil(self, separator=b'\n'): # pragma: no cover + return self.stream.readuntil(separator=separator) + + async def awrite(self, data): # pragma: no cover + return self.stream.write(data) + + async def aclose(self): # pragma: no cover + pass + + +class Request: + """An HTTP request.""" + #: Specify the maximum payload size that is accepted. Requests with larger + #: payloads will be rejected with a 413 status code. Applications can + #: change this maximum as necessary. + #: + #: Example:: + #: + #: Request.max_content_length = 1 * 1024 * 1024 # 1MB requests allowed + max_content_length = 16 * 1024 + + #: Specify the maximum payload size that can be stored in ``body``. + #: Requests with payloads that are larger than this size and up to + #: ``max_content_length`` bytes will be accepted, but the application will + #: only be able to access the body of the request by reading from + #: ``stream``. Set to 0 if you always access the body as a stream. + #: + #: Example:: + #: + #: Request.max_body_length = 4 * 1024 # up to 4KB bodies read + max_body_length = 16 * 1024 + + #: Specify the maximum length allowed for a line in the request. Requests + #: with longer lines will not be correctly interpreted. Applications can + #: change this maximum as necessary. + #: + #: Example:: + #: + #: Request.max_readline = 16 * 1024 # 16KB lines allowed + max_readline = 2 * 1024 + + class G: + pass + + def __init__(self, app, client_addr, method, url, http_version, headers, + body=None, stream=None, sock=None): + #: The application instance to which this request belongs. + self.app = app + #: The address of the client, as a tuple (host, port). + self.client_addr = client_addr + #: The HTTP method of the request. + self.method = method + #: The request URL, including the path and query string. + self.url = url + #: The path portion of the URL. + self.path = url + #: The query string portion of the URL. + self.query_string = None + #: The parsed query string, as a + #: :class:`MultiDict <microdot.MultiDict>` object. + self.args = {} + #: A dictionary with the headers included in the request. + self.headers = headers + #: A dictionary with the cookies included in the request. + self.cookies = {} + #: The parsed ``Content-Length`` header. + self.content_length = 0 + #: The parsed ``Content-Type`` header. + self.content_type = None + #: A general purpose container for applications to store data during + #: the life of the request. + self.g = Request.G() + + self.http_version = http_version + if '?' in self.path: + self.path, self.query_string = self.path.split('?', 1) + self.args = self._parse_urlencoded(self.query_string) + + if 'Content-Length' in self.headers: + self.content_length = int(self.headers['Content-Length']) + if 'Content-Type' in self.headers: + self.content_type = self.headers['Content-Type'] + if 'Cookie' in self.headers: + for cookie in self.headers['Cookie'].split(';'): + name, value = cookie.strip().split('=', 1) + self.cookies[name] = value + + self._body = body + self.body_used = False + self._stream = stream + self.sock = sock + self._json = None + self._form = None + self.after_request_handlers = [] + + @staticmethod + async def create(app, client_reader, client_writer, client_addr): + """Create a request object. + + :param app: The Microdot application instance. + :param client_reader: An input stream from where the request data can + be read. + :param client_writer: An output stream where the response data can be + written. + :param client_addr: The address of the client, as a tuple. + + This method is a coroutine. It returns a newly created ``Request`` + object. + """ + # request line + line = (await Request._safe_readline(client_reader)).strip().decode() + if not line: # pragma: no cover + return None + method, url, http_version = line.split() + http_version = http_version.split('/', 1)[1] + + # headers + headers = NoCaseDict() + content_length = 0 + while True: + line = (await Request._safe_readline( + client_reader)).strip().decode() + if line == '': + break + header, value = line.split(':', 1) + value = value.strip() + headers[header] = value + if header.lower() == 'content-length': + content_length = int(value) + + # body + body = b'' + if content_length and content_length <= Request.max_body_length: + body = await client_reader.readexactly(content_length) + stream = None + else: + body = b'' + stream = client_reader + + return Request(app, client_addr, method, url, http_version, headers, + body=body, stream=stream, + sock=(client_reader, client_writer)) + + def _parse_urlencoded(self, urlencoded): + data = MultiDict() + if len(urlencoded) > 0: # pragma: no branch + if isinstance(urlencoded, str): + for kv in [pair.split('=', 1) + for pair in urlencoded.split('&') if pair]: + data[urldecode_str(kv[0])] = urldecode_str(kv[1]) \ + if len(kv) > 1 else '' + elif isinstance(urlencoded, bytes): # pragma: no branch + for kv in [pair.split(b'=', 1) + for pair in urlencoded.split(b'&') if pair]: + data[urldecode_bytes(kv[0])] = urldecode_bytes(kv[1]) \ + if len(kv) > 1 else b'' + return data + + @property + def body(self): + """The body of the request, as bytes.""" + return self._body + + @property + def stream(self): + """The body of the request, as a bytes stream.""" + if self._stream is None: + self._stream = AsyncBytesIO(self._body) + return self._stream + + @property + def json(self): + """The parsed JSON body, or ``None`` if the request does not have a + JSON body.""" + if self._json is None: + if self.content_type is None: + return None + mime_type = self.content_type.split(';')[0] + if mime_type != 'application/json': + return None + self._json = json.loads(self.body.decode()) + return self._json + + @property + def form(self): + """The parsed form submission body, as a + :class:`MultiDict <microdot.MultiDict>` object, or ``None`` if the + request does not have a form submission.""" + if self._form is None: + if self.content_type is None: + return None + mime_type = self.content_type.split(';')[0] + if mime_type != 'application/x-www-form-urlencoded': + return None + self._form = self._parse_urlencoded(self.body) + return self._form + + def after_request(self, f): + """Register a request-specific function to run after the request is + handled. Request-specific after request handlers run at the very end, + after the application's own after request handlers. The function must + take two arguments, the request and response objects. The return value + of the function must be the updated response object. + + Example:: + + @app.route('/') + def index(request): + # register a request-specific after request handler + @req.after_request + def func(request, response): + # ... + return response + + return 'Hello, World!' + + Note that the function is not called if the request handler raises an + exception and an error response is returned instead. + """ + self.after_request_handlers.append(f) + return f + + @staticmethod + async def _safe_readline(stream): + line = (await stream.readline()) + if len(line) > Request.max_readline: + raise ValueError('line too long') + return line + + +class Response: + """An HTTP response class. + + :param body: The body of the response. If a dictionary or list is given, + a JSON formatter is used to generate the body. If a file-like + object or an async generator is given, a streaming response is + used. If a string is given, it is encoded from UTF-8. Else, + the body should be a byte sequence. + :param status_code: The numeric HTTP status code of the response. The + default is 200. + :param headers: A dictionary of headers to include in the response. + :param reason: A custom reason phrase to add after the status code. The + default is "OK" for responses with a 200 status code and + "N/A" for any other status codes. + """ + types_map = { + 'css': 'text/css', + 'gif': 'image/gif', + 'html': 'text/html', + 'jpg': 'image/jpeg', + 'js': 'application/javascript', + 'json': 'application/json', + 'png': 'image/png', + 'txt': 'text/plain', + } + + send_file_buffer_size = 1024 + + #: The content type to use for responses that do not explicitly define a + #: ``Content-Type`` header. + default_content_type = 'text/plain' + + #: The default cache control max age used by :meth:`send_file`. A value + #: of ``None`` means that no ``Cache-Control`` header is added. + default_send_file_max_age = None + + #: Special response used to signal that a response does not need to be + #: written to the client. Used to exit WebSocket connections cleanly. + already_handled = None + + def __init__(self, body='', status_code=200, headers=None, reason=None): + if body is None and status_code == 200: + body = '' + status_code = 204 + self.status_code = status_code + self.headers = NoCaseDict(headers or {}) + self.reason = reason + if isinstance(body, (dict, list)): + self.body = json.dumps(body).encode() + self.headers['Content-Type'] = 'application/json; charset=UTF-8' + elif isinstance(body, str): + self.body = body.encode() + else: + # this applies to bytes, file-like objects or generators + self.body = body + self.is_head = False + + def set_cookie(self, cookie, value, path=None, domain=None, expires=None, + max_age=None, secure=False, http_only=False, + partitioned=False): + """Add a cookie to the response. + + :param cookie: The cookie's name. + :param value: The cookie's value. + :param path: The cookie's path. + :param domain: The cookie's domain. + :param expires: The cookie expiration time, as a ``datetime`` object + or a correctly formatted string. + :param max_age: The cookie's ``Max-Age`` value. + :param secure: The cookie's ``secure`` flag. + :param http_only: The cookie's ``HttpOnly`` flag. + :param partitioned: Whether the cookie is partitioned. + """ + http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value) + if path: + http_cookie += '; Path=' + path + if domain: + http_cookie += '; Domain=' + domain + if expires: + if isinstance(expires, str): + http_cookie += '; Expires=' + expires + else: # pragma: no cover + http_cookie += '; Expires=' + time.strftime( + '%a, %d %b %Y %H:%M:%S GMT', expires.timetuple()) + if max_age is not None: + http_cookie += '; Max-Age=' + str(max_age) + if secure: + http_cookie += '; Secure' + if http_only: + http_cookie += '; HttpOnly' + if partitioned: + http_cookie += '; Partitioned' + if 'Set-Cookie' in self.headers: + self.headers['Set-Cookie'].append(http_cookie) + else: + self.headers['Set-Cookie'] = [http_cookie] + + def delete_cookie(self, cookie, **kwargs): + """Delete a cookie. + + :param cookie: The cookie's name. + :param kwargs: Any cookie opens and flags supported by + ``set_cookie()`` except ``expires`` and ``max_age``. + """ + self.set_cookie(cookie, '', expires='Thu, 01 Jan 1970 00:00:01 GMT', + max_age=0, **kwargs) + + def complete(self): + if isinstance(self.body, bytes) and \ + 'Content-Length' not in self.headers: + self.headers['Content-Length'] = str(len(self.body)) + if 'Content-Type' not in self.headers: + self.headers['Content-Type'] = self.default_content_type + if 'charset=' not in self.headers['Content-Type']: + self.headers['Content-Type'] += '; charset=UTF-8' + + async def write(self, stream): + self.complete() + + try: + # status code + reason = self.reason if self.reason is not None else \ + ('OK' if self.status_code == 200 else 'N/A') + await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format( + status_code=self.status_code, reason=reason).encode()) + + # headers + for header, value in self.headers.items(): + values = value if isinstance(value, list) else [value] + for value in values: + await stream.awrite('{header}: {value}\r\n'.format( + header=header, value=value).encode()) + await stream.awrite(b'\r\n') + + # body + if not self.is_head: + iter = self.body_iter() + async for body in iter: + if isinstance(body, str): # pragma: no cover + body = body.encode() + try: + await stream.awrite(body) + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS or \ + exc.args[0] == 'Connection lost': + if hasattr(iter, 'aclose'): + await iter.aclose() + raise + if hasattr(iter, 'aclose'): # pragma: no branch + await iter.aclose() + + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS or \ + exc.args[0] == 'Connection lost': + pass + else: + raise + + def body_iter(self): + if hasattr(self.body, '__anext__'): + # response body is an async generator + return self.body + + response = self + + class iter: + ITER_UNKNOWN = 0 + ITER_SYNC_GEN = 1 + ITER_FILE_OBJ = 2 + ITER_NO_BODY = -1 + + def __aiter__(self): + if response.body: + self.i = self.ITER_UNKNOWN # need to determine type + else: + self.i = self.ITER_NO_BODY + return self + + async def __anext__(self): + if self.i == self.ITER_NO_BODY: + await self.aclose() + raise StopAsyncIteration + if self.i == self.ITER_UNKNOWN: + if hasattr(response.body, 'read'): + self.i = self.ITER_FILE_OBJ + elif hasattr(response.body, '__next__'): + self.i = self.ITER_SYNC_GEN + return next(response.body) + else: + self.i = self.ITER_NO_BODY + return response.body + elif self.i == self.ITER_SYNC_GEN: + try: + return next(response.body) + except StopIteration: + await self.aclose() + raise StopAsyncIteration + buf = response.body.read(response.send_file_buffer_size) + if iscoroutine(buf): # pragma: no cover + buf = await buf + if len(buf) < response.send_file_buffer_size: + self.i = self.ITER_NO_BODY + return buf + + async def aclose(self): + if hasattr(response.body, 'close'): + result = response.body.close() + if iscoroutine(result): # pragma: no cover + await result + + return iter() + + @classmethod + def redirect(cls, location, status_code=302): + """Return a redirect response. + + :param location: The URL to redirect to. + :param status_code: The 3xx status code to use for the redirect. The + default is 302. + """ + if '\x0d' in location or '\x0a' in location: + raise ValueError('invalid redirect URL') + return cls(status_code=status_code, headers={'Location': location}) + + @classmethod + def send_file(cls, filename, status_code=200, content_type=None, + stream=None, max_age=None, compressed=False, + file_extension=''): + """Send file contents in a response. + + :param filename: The filename of the file. + :param status_code: The 3xx status code to use for the redirect. The + default is 302. + :param content_type: The ``Content-Type`` header to use in the + response. If omitted, it is generated + automatically from the file extension of the + ``filename`` parameter. + :param stream: A file-like object to read the file contents from. If + a stream is given, the ``filename`` parameter is only + used when generating the ``Content-Type`` header. + :param max_age: The ``Cache-Control`` header's ``max-age`` value in + seconds. If omitted, the value of the + :attr:`Response.default_send_file_max_age` attribute is + used. + :param compressed: Whether the file is compressed. If ``True``, the + ``Content-Encoding`` header is set to ``gzip``. A + string with the header value can also be passed. + Note that when using this option the file must have + been compressed beforehand. This option only sets + the header. + :param file_extension: A file extension to append to the ``filename`` + parameter when opening the file, including the + dot. The extension given here is not considered + when generating the ``Content-Type`` header. + + Security note: The filename is assumed to be trusted. Never pass + filenames provided by the user without validating and sanitizing them + first. + """ + if content_type is None: + if compressed and filename.endswith('.gz'): + ext = filename[:-3].split('.')[-1] + else: + ext = filename.split('.')[-1] + if ext in Response.types_map: + content_type = Response.types_map[ext] + else: + content_type = 'application/octet-stream' + headers = {'Content-Type': content_type} + + if max_age is None: + max_age = cls.default_send_file_max_age + if max_age is not None: + headers['Cache-Control'] = 'max-age={}'.format(max_age) + + if compressed: + headers['Content-Encoding'] = compressed \ + if isinstance(compressed, str) else 'gzip' + + f = stream or open(filename + file_extension, 'rb') + return cls(body=f, status_code=status_code, headers=headers) + + +class URLPattern(): + def __init__(self, url_pattern): + self.url_pattern = url_pattern + self.segments = [] + self.regex = None + pattern = '' + use_regex = False + for segment in url_pattern.lstrip('/').split('/'): + if segment and segment[0] == '<': + if segment[-1] != '>': + raise ValueError('invalid URL pattern') + segment = segment[1:-1] + if ':' in segment: + type_, name = segment.rsplit(':', 1) + else: + type_ = 'string' + name = segment + parser = None + if type_ == 'string': + parser = self._string_segment + pattern += '/([^/]+)' + elif type_ == 'int': + parser = self._int_segment + pattern += '/(-?\\d+)' + elif type_ == 'path': + use_regex = True + pattern += '/(.+)' + elif type_.startswith('re:'): + use_regex = True + pattern += '/({pattern})'.format(pattern=type_[3:]) + else: + raise ValueError('invalid URL segment type') + self.segments.append({'parser': parser, 'name': name, + 'type': type_}) + else: + pattern += '/' + segment + self.segments.append({'parser': self._static_segment(segment)}) + if use_regex: + import re + self.regex = re.compile('^' + pattern + '$') + + def match(self, path): + args = {} + if self.regex: + g = self.regex.match(path) + if not g: + return + i = 1 + for segment in self.segments: + if 'name' not in segment: + continue + value = g.group(i) + if segment['type'] == 'int': + value = int(value) + args[segment['name']] = value + i += 1 + else: + if len(path) == 0 or path[0] != '/': + return + path = path[1:] + args = {} + for segment in self.segments: + if path is None: + return + arg, path = segment['parser'](path) + if arg is None: + return + if 'name' in segment: + args[segment['name']] = arg + if path is not None: + return + return args + + def _static_segment(self, segment): + def _static(value): + s = value.split('/', 1) + if s[0] == segment: + return '', s[1] if len(s) > 1 else None + return None, None + return _static + + def _string_segment(self, value): + s = value.split('/', 1) + if len(s[0]) == 0: + return None, None + return s[0], s[1] if len(s) > 1 else None + + def _int_segment(self, value): + s = value.split('/', 1) + try: + return int(s[0]), s[1] if len(s) > 1 else None + except ValueError: + return None, None + + +class HTTPException(Exception): + def __init__(self, status_code, reason=None): + self.status_code = status_code + self.reason = reason or str(status_code) + ' error' + + def __repr__(self): # pragma: no cover + return 'HTTPException: {}'.format(self.status_code) + + +class Microdot: + """An HTTP application class. + + This class implements an HTTP application instance and is heavily + influenced by the ``Flask`` class of the Flask framework. It is typically + declared near the start of the main application script. + + Example:: + + from microdot import Microdot + + app = Microdot() + """ + + def __init__(self): + self.url_map = [] + self.before_request_handlers = [] + self.after_request_handlers = [] + self.after_error_request_handlers = [] + self.error_handlers = {} + self.shutdown_requested = False + self.options_handler = self.default_options_handler + self.debug = False + self.server = None + + def route(self, url_pattern, methods=None): + """Decorator that is used to register a function as a request handler + for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + :param methods: The list of HTTP methods to be handled by the + decorated function. If omitted, only ``GET`` requests + are handled. + + The URL pattern can be a static path (for example, ``/users`` or + ``/api/invoices/search``) or a path with dynamic components enclosed + in ``<`` and ``>`` (for example, ``/users/<id>`` or + ``/invoices/<number>/products``). Dynamic path components can also + include a type prefix, separated from the name with a colon (for + example, ``/users/<int:id>``). The type can be ``string`` (the + default), ``int``, ``path`` or ``re:[regular-expression]``. + + The first argument of the decorated function must be + the request object. Any path arguments that are specified in the URL + pattern are passed as keyword arguments. The return value of the + function must be a :class:`Response` instance, or the arguments to + be passed to this class. + + Example:: + + @app.route('/') + def index(request): + return 'Hello, world!' + """ + def decorated(f): + self.url_map.append( + ([m.upper() for m in (methods or ['GET'])], + URLPattern(url_pattern), f)) + return f + return decorated + + def get(self, url_pattern): + """Decorator that is used to register a function as a ``GET`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['GET']``. + + Example:: + + @app.get('/users/<int:id>') + def get_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['GET']) + + def post(self, url_pattern): + """Decorator that is used to register a function as a ``POST`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the``route`` decorator with + ``methods=['POST']``. + + Example:: + + @app.post('/users') + def create_user(request): + # ... + """ + return self.route(url_pattern, methods=['POST']) + + def put(self, url_pattern): + """Decorator that is used to register a function as a ``PUT`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['PUT']``. + + Example:: + + @app.put('/users/<int:id>') + def edit_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['PUT']) + + def patch(self, url_pattern): + """Decorator that is used to register a function as a ``PATCH`` request + handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['PATCH']``. + + Example:: + + @app.patch('/users/<int:id>') + def edit_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['PATCH']) + + def delete(self, url_pattern): + """Decorator that is used to register a function as a ``DELETE`` + request handler for a given URL. + + :param url_pattern: The URL pattern that will be compared against + incoming requests. + + This decorator can be used as an alias to the ``route`` decorator with + ``methods=['DELETE']``. + + Example:: + + @app.delete('/users/<int:id>') + def delete_user(request, id): + # ... + """ + return self.route(url_pattern, methods=['DELETE']) + + def before_request(self, f): + """Decorator to register a function to run before each request is + handled. The decorated function must take a single argument, the + request object. + + Example:: + + @app.before_request + def func(request): + # ... + """ + self.before_request_handlers.append(f) + return f + + def after_request(self, f): + """Decorator to register a function to run after each request is + handled. The decorated function must take two arguments, the request + and response objects. The return value of the function must be an + updated response object. + + Example:: + + @app.after_request + def func(request, response): + # ... + return response + """ + self.after_request_handlers.append(f) + return f + + def after_error_request(self, f): + """Decorator to register a function to run after an error response is + generated. The decorated function must take two arguments, the request + and response objects. The return value of the function must be an + updated response object. The handler is invoked for error responses + generated by Microdot, as well as those returned by application-defined + error handlers. + + Example:: + + @app.after_error_request + def func(request, response): + # ... + return response + """ + self.after_error_request_handlers.append(f) + return f + + def errorhandler(self, status_code_or_exception_class): + """Decorator to register a function as an error handler. Error handler + functions for numeric HTTP status codes must accept a single argument, + the request object. Error handler functions for Python exceptions + must accept two arguments, the request object and the exception + object. + + :param status_code_or_exception_class: The numeric HTTP status code or + Python exception class to + handle. + + Examples:: + + @app.errorhandler(404) + def not_found(request): + return 'Not found' + + @app.errorhandler(RuntimeError) + def runtime_error(request, exception): + return 'Runtime error' + """ + def decorated(f): + self.error_handlers[status_code_or_exception_class] = f + return f + return decorated + + def mount(self, subapp, url_prefix=''): + """Mount a sub-application, optionally under the given URL prefix. + + :param subapp: The sub-application to mount. + :param url_prefix: The URL prefix to mount the application under. + """ + for methods, pattern, handler in subapp.url_map: + self.url_map.append( + (methods, URLPattern(url_prefix + pattern.url_pattern), + handler)) + for handler in subapp.before_request_handlers: + self.before_request_handlers.append(handler) + for handler in subapp.after_request_handlers: + self.after_request_handlers.append(handler) + for handler in subapp.after_error_request_handlers: + self.after_error_request_handlers.append(handler) + for status_code, handler in subapp.error_handlers.items(): + self.error_handlers[status_code] = handler + + @staticmethod + def abort(status_code, reason=None): + """Abort the current request and return an error response with the + given status code. + + :param status_code: The numeric status code of the response. + :param reason: The reason for the response, which is included in the + response body. + + Example:: + + from microdot import abort + + @app.route('/users/<int:id>') + def get_user(id): + user = get_user_by_id(id) + if user is None: + abort(404) + return user.to_dict() + """ + raise HTTPException(status_code, reason) + + async def start_server(self, host='0.0.0.0', port=5000, debug=False, + ssl=None): + """Start the Microdot web server as a coroutine. This coroutine does + not normally return, as the server enters an endless listening loop. + The :func:`shutdown` function provides a method for terminating the + server gracefully. + + :param host: The hostname or IP address of the network interface that + will be listening for requests. A value of ``'0.0.0.0'`` + (the default) indicates that the server should listen for + requests on all the available interfaces, and a value of + ``127.0.0.1`` indicates that the server should listen + for requests only on the internal networking interface of + the host. + :param port: The port number to listen for requests. The default is + port 5000. + :param debug: If ``True``, the server logs debugging information. The + default is ``False``. + :param ssl: An ``SSLContext`` instance or ``None`` if the server should + not use TLS. The default is ``None``. + + This method is a coroutine. + + Example:: + + import asyncio + from microdot import Microdot + + app = Microdot() + + @app.route('/') + async def index(request): + return 'Hello, world!' + + async def main(): + await app.start_server(debug=True) + + asyncio.run(main()) + """ + self.debug = debug + + async def serve(reader, writer): + if not hasattr(writer, 'awrite'): # pragma: no cover + # CPython provides the awrite and aclose methods in 3.8+ + async def awrite(self, data): + self.write(data) + await self.drain() + + async def aclose(self): + self.close() + await self.wait_closed() + + from types import MethodType + writer.awrite = MethodType(awrite, writer) + writer.aclose = MethodType(aclose, writer) + + await self.handle_request(reader, writer) + + if self.debug: # pragma: no cover + print('Starting async server on {host}:{port}...'.format( + host=host, port=port)) + + try: + self.server = await asyncio.start_server(serve, host, port, + ssl=ssl) + except TypeError: # pragma: no cover + self.server = await asyncio.start_server(serve, host, port) + + while True: + try: + if hasattr(self.server, 'serve_forever'): # pragma: no cover + try: + await self.server.serve_forever() + except asyncio.CancelledError: + pass + await self.server.wait_closed() + break + except AttributeError: # pragma: no cover + # the task hasn't been initialized in the server object yet + # wait a bit and try again + await asyncio.sleep(0.1) + + def run(self, host='0.0.0.0', port=5000, debug=False, ssl=None): + """Start the web server. This function does not normally return, as + the server enters an endless listening loop. The :func:`shutdown` + function provides a method for terminating the server gracefully. + + :param host: The hostname or IP address of the network interface that + will be listening for requests. A value of ``'0.0.0.0'`` + (the default) indicates that the server should listen for + requests on all the available interfaces, and a value of + ``127.0.0.1`` indicates that the server should listen + for requests only on the internal networking interface of + the host. + :param port: The port number to listen for requests. The default is + port 5000. + :param debug: If ``True``, the server logs debugging information. The + default is ``False``. + :param ssl: An ``SSLContext`` instance or ``None`` if the server should + not use TLS. The default is ``None``. + + Example:: + + from microdot import Microdot + + app = Microdot() + + @app.route('/') + async def index(request): + return 'Hello, world!' + + app.run(debug=True) + """ + asyncio.run(self.start_server(host=host, port=port, debug=debug, + ssl=ssl)) # pragma: no cover + + def shutdown(self): + """Request a server shutdown. The server will then exit its request + listening loop and the :func:`run` function will return. This function + can be safely called from a route handler, as it only schedules the + server to terminate as soon as the request completes. + + Example:: + + @app.route('/shutdown') + def shutdown(request): + request.app.shutdown() + return 'The server is shutting down...' + """ + self.server.close() + + def find_route(self, req): + method = req.method.upper() + if method == 'OPTIONS' and self.options_handler: + return self.options_handler(req) + if method == 'HEAD': + method = 'GET' + f = 404 + for route_methods, route_pattern, route_handler in self.url_map: + req.url_args = route_pattern.match(req.path) + if req.url_args is not None: + if method in route_methods: + f = route_handler + break + else: + f = 405 + return f + + def default_options_handler(self, req): + allow = [] + for route_methods, route_pattern, route_handler in self.url_map: + if route_pattern.match(req.path) is not None: + allow.extend(route_methods) + if 'GET' in allow: + allow.append('HEAD') + allow.append('OPTIONS') + return {'Allow': ', '.join(allow)} + + async def handle_request(self, reader, writer): + req = None + try: + req = await Request.create(self, reader, writer, + writer.get_extra_info('peername')) + except Exception as exc: # pragma: no cover + print_exception(exc) + + res = await self.dispatch_request(req) + if res != Response.already_handled: # pragma: no branch + await res.write(writer) + try: + await writer.aclose() + except OSError as exc: # pragma: no cover + if exc.errno in MUTED_SOCKET_ERRORS: + pass + else: + raise + if self.debug and req: # pragma: no cover + print('{method} {path} {status_code}'.format( + method=req.method, path=req.path, + status_code=res.status_code)) + + async def dispatch_request(self, req): + after_request_handled = False + if req: + if req.content_length > req.max_content_length: + if 413 in self.error_handlers: + res = await invoke_handler(self.error_handlers[413], req) + else: + res = 'Payload too large', 413 + else: + f = self.find_route(req) + try: + res = None + if callable(f): + for handler in self.before_request_handlers: + res = await invoke_handler(handler, req) + if res: + break + if res is None: + res = await invoke_handler( + f, req, **req.url_args) + if isinstance(res, int): + res = '', res + if isinstance(res, tuple): + if isinstance(res[0], int): + res = ('', res[0], + res[1] if len(res) > 1 else {}) + body = res[0] + if isinstance(res[1], int): + status_code = res[1] + headers = res[2] if len(res) > 2 else {} + else: + status_code = 200 + headers = res[1] + res = Response(body, status_code, headers) + elif not isinstance(res, Response): + res = Response(res) + for handler in self.after_request_handlers: + res = await invoke_handler( + handler, req, res) or res + for handler in req.after_request_handlers: + res = await invoke_handler( + handler, req, res) or res + after_request_handled = True + elif isinstance(f, dict): + res = Response(headers=f) + elif f in self.error_handlers: + res = await invoke_handler(self.error_handlers[f], req) + else: + res = 'Not found', f + except HTTPException as exc: + if exc.status_code in self.error_handlers: + res = self.error_handlers[exc.status_code](req) + else: + res = exc.reason, exc.status_code + except Exception as exc: + print_exception(exc) + exc_class = None + res = None + if exc.__class__ in self.error_handlers: + exc_class = exc.__class__ + else: + for c in mro(exc.__class__)[1:]: + if c in self.error_handlers: + exc_class = c + break + if exc_class: + try: + res = await invoke_handler( + self.error_handlers[exc_class], req, exc) + except Exception as exc2: # pragma: no cover + print_exception(exc2) + if res is None: + if 500 in self.error_handlers: + res = await invoke_handler( + self.error_handlers[500], req) + else: + res = 'Internal server error', 500 + else: + if 400 in self.error_handlers: + res = await invoke_handler(self.error_handlers[400], req) + else: + res = 'Bad request', 400 + if isinstance(res, tuple): + res = Response(*res) + elif not isinstance(res, Response): + res = Response(res) + if not after_request_handled: + for handler in self.after_error_request_handlers: + res = await invoke_handler( + handler, req, res) or res + res.is_head = (req and req.method == 'HEAD') + return res + + +Response.already_handled = Response() + +abort = Microdot.abort +redirect = Response.redirect +send_file = Response.send_file \ No newline at end of file diff --git a/lib/microdot/utemplate.py b/lib/microdot/utemplate.py new file mode 100644 index 0000000..16d0398 --- /dev/null +++ b/lib/microdot/utemplate.py @@ -0,0 +1,70 @@ +from utemplate import recompile + +_loader = None + + +class Template: + """A template object. + + :param template: The filename of the template to render, relative to the + configured template directory. + """ + @classmethod + def initialize(cls, template_dir='templates', + loader_class=recompile.Loader): + """Initialize the templating subsystem. + + :param template_dir: the directory where templates are stored. This + argument is optional. The default is to load + templates from a *templates* subdirectory. + :param loader_class: the ``utemplate.Loader`` class to use when loading + templates. This argument is optional. The default + is the ``recompile.Loader`` class, which + automatically recompiles templates when they + change. + """ + global _loader + _loader = loader_class(None, template_dir) + + def __init__(self, template): + if _loader is None: # pragma: no cover + self.initialize() + #: The name of the template + self.name = template + self.template = _loader.load(template) + + def generate(self, *args, **kwargs): + """Return a generator that renders the template in chunks, with the + given arguments.""" + return self.template(*args, **kwargs) + + def render(self, *args, **kwargs): + """Render the template with the given arguments and return it as a + string.""" + return ''.join(self.generate(*args, **kwargs)) + + def generate_async(self, *args, **kwargs): + """Return an asynchronous generator that renders the template in + chunks, using the given arguments.""" + class sync_to_async_iter(): + def __init__(self, iter): + self.iter = iter + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.iter) + except StopIteration: + raise StopAsyncIteration + + return sync_to_async_iter(self.generate(*args, **kwargs)) + + async def render_async(self, *args, **kwargs): + """Render the template with the given arguments asynchronously and + return it as a string.""" + response = '' + async for chunk in self.generate_async(*args, **kwargs): + response += chunk + return response diff --git a/lib/microdot/websocket.py b/lib/microdot/websocket.py new file mode 100644 index 0000000..0fb6f7c --- /dev/null +++ b/lib/microdot/websocket.py @@ -0,0 +1,231 @@ +import binascii +import hashlib +from microdot import Request, Response +from microdot.microdot import MUTED_SOCKET_ERRORS, print_exception +from microdot.helpers import wraps + + +class WebSocketError(Exception): + """Exception raised when an error occurs in a WebSocket connection.""" + pass + + +class WebSocket: + """A WebSocket connection object. + + An instance of this class is sent to handler functions to manage the + WebSocket connection. + """ + CONT = 0 + TEXT = 1 + BINARY = 2 + CLOSE = 8 + PING = 9 + PONG = 10 + + #: Specify the maximum message size that can be received when calling the + #: ``receive()`` method. Messages with payloads that are larger than this + #: size will be rejected and the connection closed. Set to 0 to disable + #: the size check (be aware of potential security issues if you do this), + #: or to -1 to use the value set in + #: ``Request.max_body_length``. The default is -1. + #: + #: Example:: + #: + #: WebSocket.max_message_length = 4 * 1024 # up to 4KB messages + max_message_length = -1 + + def __init__(self, request): + self.request = request + self.closed = False + + async def handshake(self): + response = self._handshake_response() + await self.request.sock[1].awrite( + b'HTTP/1.1 101 Switching Protocols\r\n') + await self.request.sock[1].awrite(b'Upgrade: websocket\r\n') + await self.request.sock[1].awrite(b'Connection: Upgrade\r\n') + await self.request.sock[1].awrite( + b'Sec-WebSocket-Accept: ' + response + b'\r\n\r\n') + + async def receive(self): + """Receive a message from the client.""" + while True: + opcode, payload = await self._read_frame() + send_opcode, data = self._process_websocket_frame(opcode, payload) + if send_opcode: # pragma: no cover + await self.send(data, send_opcode) + elif data: # pragma: no branch + return data + + async def send(self, data, opcode=None): + """Send a message to the client. + + :param data: the data to send, given as a string or bytes. + :param opcode: a custom frame opcode to use. If not given, the opcode + is ``TEXT`` or ``BINARY`` depending on the type of the + data. + """ + frame = self._encode_websocket_frame( + opcode or (self.TEXT if isinstance(data, str) else self.BINARY), + data) + await self.request.sock[1].awrite(frame) + + async def close(self): + """Close the websocket connection.""" + if not self.closed: # pragma: no cover + self.closed = True + await self.send(b'', self.CLOSE) + + def _handshake_response(self): + connection = False + upgrade = False + websocket_key = None + for header, value in self.request.headers.items(): + h = header.lower() + if h == 'connection': + connection = True + if 'upgrade' not in value.lower(): + return self.request.app.abort(400) + elif h == 'upgrade': + upgrade = True + if not value.lower() == 'websocket': + return self.request.app.abort(400) + elif h == 'sec-websocket-key': + websocket_key = value + if not connection or not upgrade or not websocket_key: + return self.request.app.abort(400) + d = hashlib.sha1(websocket_key.encode()) + d.update(b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11') + return binascii.b2a_base64(d.digest())[:-1] + + @classmethod + def _parse_frame_header(cls, header): + fin = header[0] & 0x80 + opcode = header[0] & 0x0f + if fin == 0 or opcode == cls.CONT: # pragma: no cover + raise WebSocketError('Continuation frames not supported') + has_mask = header[1] & 0x80 + length = header[1] & 0x7f + if length == 126: + length = -2 + elif length == 127: + length = -8 + return fin, opcode, has_mask, length + + def _process_websocket_frame(self, opcode, payload): + if opcode == self.TEXT: + payload = payload.decode() + elif opcode == self.BINARY: + pass + elif opcode == self.CLOSE: + raise WebSocketError('Websocket connection closed') + elif opcode == self.PING: + return self.PONG, payload + elif opcode == self.PONG: # pragma: no branch + return None, None + return None, payload + + @classmethod + def _encode_websocket_frame(cls, opcode, payload): + frame = bytearray() + frame.append(0x80 | opcode) + if opcode == cls.TEXT: + payload = payload.encode() + if len(payload) < 126: + frame.append(len(payload)) + elif len(payload) < (1 << 16): + frame.append(126) + frame.extend(len(payload).to_bytes(2, 'big')) + else: + frame.append(127) + frame.extend(len(payload).to_bytes(8, 'big')) + frame.extend(payload) + return frame + + async def _read_frame(self): + header = await self.request.sock[0].read(2) + if len(header) != 2: # pragma: no cover + raise WebSocketError('Websocket connection closed') + fin, opcode, has_mask, length = self._parse_frame_header(header) + if length == -2: + length = await self.request.sock[0].read(2) + length = int.from_bytes(length, 'big') + elif length == -8: + length = await self.request.sock[0].read(8) + length = int.from_bytes(length, 'big') + max_allowed_length = Request.max_body_length \ + if self.max_message_length == -1 else self.max_message_length + if length > max_allowed_length: + raise WebSocketError('Message too large') + if has_mask: # pragma: no cover + mask = await self.request.sock[0].read(4) + payload = await self.request.sock[0].read(length) + if has_mask: # pragma: no cover + payload = bytes(x ^ mask[i % 4] for i, x in enumerate(payload)) + return opcode, payload + + +async def websocket_upgrade(request): + """Upgrade a request handler to a websocket connection. + + This function can be called directly inside a route function to process a + WebSocket upgrade handshake, for example after the user's credentials are + verified. The function returns the websocket object:: + + @app.route('/echo') + async def echo(request): + if not authenticate_user(request): + abort(401) + ws = await websocket_upgrade(request) + while True: + message = await ws.receive() + await ws.send(message) + """ + ws = WebSocket(request) + await ws.handshake() + + @request.after_request + async def after_request(request, response): + return Response.already_handled + + return ws + + +def websocket_wrapper(f, upgrade_function): + @wraps(f) + async def wrapper(request, *args, **kwargs): + ws = await upgrade_function(request) + try: + await f(request, ws, *args, **kwargs) + except OSError as exc: + if exc.errno not in MUTED_SOCKET_ERRORS: # pragma: no cover + raise + except WebSocketError: + pass + except Exception as exc: + print_exception(exc) + finally: # pragma: no cover + try: + await ws.close() + except Exception: + pass + return Response.already_handled + return wrapper + + +def with_websocket(f): + """Decorator to make a route a WebSocket endpoint. + + This decorator is used to define a route that accepts websocket + connections. The route then receives a websocket object as a second + argument that it can use to send and receive messages:: + + @app.route('/echo') + @with_websocket + async def echo(request, ws): + while True: + message = await ws.receive() + await ws.send(message) + """ + return websocket_wrapper(f, websocket_upgrade) diff --git a/lib/utemplate/__init__.py b/lib/utemplate/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/utemplate/compiled.py b/lib/utemplate/compiled.py new file mode 100644 index 0000000..006e6f5 --- /dev/null +++ b/lib/utemplate/compiled.py @@ -0,0 +1,14 @@ +class Loader: + + def __init__(self, pkg, dir): + if dir == ".": + dir = "" + else: + dir = dir.replace("/", ".") + "." + if pkg and pkg != "__main__": + dir = pkg + "." + dir + self.p = dir + + def load(self, name): + name = name.replace(".", "_") + return __import__(self.p + name, None, None, (name,)).render \ No newline at end of file diff --git a/lib/utemplate/recompile.py b/lib/utemplate/recompile.py new file mode 100644 index 0000000..b9bae4e --- /dev/null +++ b/lib/utemplate/recompile.py @@ -0,0 +1,21 @@ +# (c) 2014-2020 Paul Sokolovsky. MIT license. +try: + from uos import stat, remove +except: + from os import stat, remove +from . import source + + +class Loader(source.Loader): + + def load(self, name): + o_path = self.pkg_path + self.compiled_path(name) + i_path = self.pkg_path + self.dir + "/" + name + try: + o_stat = stat(o_path) + i_stat = stat(i_path) + if i_stat[8] > o_stat[8]: + # input file is newer, remove output to force recompile + remove(o_path) + finally: + return super().load(name) \ No newline at end of file diff --git a/lib/utemplate/source.py b/lib/utemplate/source.py new file mode 100644 index 0000000..0ff4651 --- /dev/null +++ b/lib/utemplate/source.py @@ -0,0 +1,188 @@ +# (c) 2014-2019 Paul Sokolovsky. MIT license. +from . import compiled + + +class Compiler: + + START_CHAR = "{" + STMNT = "%" + STMNT_END = "%}" + EXPR = "{" + EXPR_END = "}}" + + def __init__(self, file_in, file_out, indent=0, seq=0, loader=None): + self.file_in = file_in + self.file_out = file_out + self.loader = loader + self.seq = seq + self._indent = indent + self.stack = [] + self.in_literal = False + self.flushed_header = False + self.args = "*a, **d" + + def indent(self, adjust=0): + if not self.flushed_header: + self.flushed_header = True + self.indent() + self.file_out.write("def render%s(%s):\n" % (str(self.seq) if self.seq else "", self.args)) + self.stack.append("def") + self.file_out.write(" " * (len(self.stack) + self._indent + adjust)) + + def literal(self, s): + if not s: + return + if not self.in_literal: + self.indent() + self.file_out.write('yield """') + self.in_literal = True + self.file_out.write(s.replace('"', '\\"')) + + def close_literal(self): + if self.in_literal: + self.file_out.write('"""\n') + self.in_literal = False + + def render_expr(self, e): + self.indent() + self.file_out.write('yield str(' + e + ')\n') + + def parse_statement(self, stmt): + tokens = stmt.split(None, 1) + if tokens[0] == "args": + if len(tokens) > 1: + self.args = tokens[1] + else: + self.args = "" + elif tokens[0] == "set": + self.indent() + self.file_out.write(stmt[3:].strip() + "\n") + elif tokens[0] == "include": + if not self.flushed_header: + # If there was no other output, we still need a header now + self.indent() + tokens = tokens[1].split(None, 1) + args = "" + if len(tokens) > 1: + args = tokens[1] + if tokens[0][0] == "{": + self.indent() + # "1" as fromlist param is uPy hack + self.file_out.write('_ = __import__(%s.replace(".", "_"), None, None, 1)\n' % tokens[0][2:-2]) + self.indent() + self.file_out.write("yield from _.render(%s)\n" % args) + return + + with self.loader.input_open(tokens[0][1:-1]) as inc: + self.seq += 1 + c = Compiler(inc, self.file_out, len(self.stack) + self._indent, self.seq) + inc_id = self.seq + self.seq = c.compile() + self.indent() + self.file_out.write("yield from render%d(%s)\n" % (inc_id, args)) + elif len(tokens) > 1: + if tokens[0] == "elif": + assert self.stack[-1] == "if" + self.indent(-1) + self.file_out.write(stmt + ":\n") + else: + self.indent() + self.file_out.write(stmt + ":\n") + self.stack.append(tokens[0]) + else: + if stmt.startswith("end"): + assert self.stack[-1] == stmt[3:] + self.stack.pop(-1) + elif stmt == "else": + assert self.stack[-1] == "if" + self.indent(-1) + self.file_out.write("else:\n") + else: + assert False + + def parse_line(self, l): + while l: + start = l.find(self.START_CHAR) + if start == -1: + self.literal(l) + return + self.literal(l[:start]) + self.close_literal() + sel = l[start + 1] + #print("*%s=%s=" % (sel, EXPR)) + if sel == self.STMNT: + end = l.find(self.STMNT_END) + assert end > 0 + stmt = l[start + len(self.START_CHAR + self.STMNT):end].strip() + self.parse_statement(stmt) + end += len(self.STMNT_END) + l = l[end:] + if not self.in_literal and l == "\n": + break + elif sel == self.EXPR: + # print("EXPR") + end = l.find(self.EXPR_END) + assert end > 0 + expr = l[start + len(self.START_CHAR + self.EXPR):end].strip() + self.render_expr(expr) + end += len(self.EXPR_END) + l = l[end:] + else: + self.literal(l[start]) + l = l[start + 1:] + + def header(self): + self.file_out.write("# Autogenerated file\n") + + def compile(self): + self.header() + for l in self.file_in: + self.parse_line(l) + self.close_literal() + return self.seq + + +class Loader(compiled.Loader): + + def __init__(self, pkg, dir): + super().__init__(pkg, dir) + self.dir = dir + if pkg == "__main__": + # if pkg isn't really a package, don't bother to use it + # it means we're running from "filesystem directory", not + # from a package. + pkg = None + + self.pkg_path = "" + if pkg: + p = __import__(pkg) + if isinstance(p.__path__, str): + # uPy + self.pkg_path = p.__path__ + else: + # CPy + self.pkg_path = p.__path__[0] + self.pkg_path += "/" + + def input_open(self, template): + path = self.pkg_path + self.dir + "/" + template + return open(path) + + def compiled_path(self, template): + return self.dir + "/" + template.replace(".", "_") + ".py" + + def load(self, name): + try: + return super().load(name) + except (OSError, ImportError): + pass + + compiled_path = self.pkg_path + self.compiled_path(name) + + f_in = self.input_open(name) + f_out = open(compiled_path, "w") + c = Compiler(f_in, f_out, loader=self) + c.compile() + f_in.close() + f_out.close() + return super().load(name) \ No newline at end of file diff --git a/src/boot.py b/src/boot.py new file mode 100644 index 0000000..bd892c1 --- /dev/null +++ b/src/boot.py @@ -0,0 +1,19 @@ +import wifi +import time +from settings import Settings + +print(wifi.ap('qwerty')) + + +settings = Settings() +ssid = settings.get('wifi', {}).get('ssid', None) +password = settings.get('wifi', {}).get('password', None) +ip = settings.get('wifi', {}).get('ip', None) +gateway = settings.get('wifi', {}).get('gateway', None) + +for i in range(10): + config = wifi.connect(ssid, password, ip, gateway) + if config: + print(config) + break + time.sleep(0.1) \ No newline at end of file diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..1386884 --- /dev/null +++ b/src/main.py @@ -0,0 +1,54 @@ +import asyncio +from settings import Settings +from web import web +from patterns import Patterns +import gc +import utime +import machine +import ntptime +import time +import wifi + + +async def main(): + + + + settings = Settings() + + patterns = Patterns(4, settings["num_leds"], selected=settings["selected_pattern"]) + patterns.set_color1(tuple(int(settings["color1"][i:i+2], 16) for i in (1, 5, 3))) + patterns.set_color2(tuple(int(settings["color2"][i:i+2], 16) for i in (1, 5, 3))) + patterns.set_brightness(int(settings["brightness"])) + patterns.set_delay(int(settings["delay"])) + + w = web(settings, patterns) + print(settings) + # start the server in a bacakground task + print("Starting") + server = asyncio.create_task(w.start_server(host="0.0.0.0", port=80)) + wdt = machine.WDT(timeout=10000) + + async def tick(): + while True: + patterns.tick() + await asyncio.sleep_ms(1) + + asyncio.create_task(tick()) + + first = True + + while True: + #print(time.localtime()) + + # gc.collect() + for i in range(60): + wdt.feed() + await asyncio.sleep_ms(500) + + # cleanup before ending the application + await server + +asyncio.run(main()) + + diff --git a/src/patterns.py b/src/patterns.py new file mode 100644 index 0000000..8e5fd5d --- /dev/null +++ b/src/patterns.py @@ -0,0 +1,292 @@ +from machine import Pin +from neopixel import NeoPixel +import utime +import random + +class Patterns: + def __init__(self, pin, num_leds, color1=(0,0,0), color2=(0,0,0), brightness=127, selected="rainbow_cycle", delay=100): + self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds) + self.num_leds = num_leds + self.pattern_step = 0 + self.last_update = utime.ticks_ms() + self.delay = delay + self.brightness = brightness + self.patterns = { + "off": self.off, + "on" : self.on, + "color_wipe": self.color_wipe_step, + "rainbow_cycle": self.rainbow_cycle_step, + "theater_chase": self.theater_chase_step, + "blink": self.blink_step, + "random_color_wipe": self.random_color_wipe_step, + "random_rainbow_cycle": self.random_rainbow_cycle_step, + "random_theater_chase": self.random_theater_chase_step, + "random_blink": self.random_blink_step, + "color_transition": self.color_transition_step, + "2 step forward 1 step back": self.two_steps_forward_one_step_back_step, + "external": None + } + self.selected = selected + self.color1 = color1 + self.color2 = color2 + self.transition_duration = 50 # Duration of color transition in milliseconds + self.transition_step = 0 + + def sync(self): + self.pattern_step=0 + self.last_update = utime.ticks_ms() + + def tick(self): + if self.patterns[self.selected]: + self.patterns[self.selected]() + + def update_num_leds(self, pin, num_leds): + self.n = NeoPixel(Pin(pin, Pin.OUT), num_leds) + self.num_leds = num_leds + self.pattern_step = 0 + + def set_delay(self, delay): + self.delay = delay + + def set_brightness(self, brightness): + self.brightness = brightness + + def set_color1(self, color): + print(color) + self.color1 = self.apply_brightness(color) + + def set_color2(self, color): + self.color2 = self.apply_brightness(color) + + def apply_brightness(self, color): + return tuple(int(c * self.brightness / 255) for c in color) + + def select(self, pattern): + if pattern in self.patterns: + self.selected = pattern + return True + return False + + def set(self, i, color): + self.n[i] = color + + def write(self): + self.n.write() + + def fill(self): + for i in range(self.num_leds): + self.n[i] = self.color1 + self.n.write() + + def off(self): + color = self.color1 + self.color1 = (0,0,0) + self.fill() + self.color1 = color + + def on(self): + color = self.color1 + self.color1 = self.apply_brightness(self.color1) + self.fill() + self.color1 = color + + + def color_wipe_step(self): + color = self.apply_brightness(self.color1) + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, self.last_update) >= self.delay: + if self.pattern_step < self.num_leds: + for i in range(self.num_leds): + self.n[i] = (0, 0, 0) + self.n[self.pattern_step] = self.apply_brightness(color) + self.n.write() + self.pattern_step += 1 + else: + self.pattern_step = 0 + self.last_update = current_time + + def rainbow_cycle_step(self): + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, self.last_update) >= self.delay/5: + def wheel(pos): + if pos < 85: + return (pos * 3, 255 - pos * 3, 0) + elif pos < 170: + pos -= 85 + return (255 - pos * 3, 0, pos * 3) + else: + pos -= 170 + return (0, pos * 3, 255 - pos * 3) + + for i in range(self.num_leds): + rc_index = (i * 256 // self.num_leds) + self.pattern_step + self.n[i] = self.apply_brightness(wheel(rc_index & 255)) + self.n.write() + self.pattern_step = (self.pattern_step + 1) % 256 + self.last_update = current_time + + def theater_chase_step(self): + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, self.last_update) >= self.delay: + for i in range(self.num_leds): + if (i + self.pattern_step) % 3 == 0: + self.n[i] = self.apply_brightness(self.color1) + else: + self.n[i] = (0, 0, 0) + self.n.write() + self.pattern_step = (self.pattern_step + 1) % 3 + self.last_update = current_time + + def blink_step(self): + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, self.last_update) >= self.delay: + if self.pattern_step % 2 == 0: + for i in range(self.num_leds): + self.n[i] = self.apply_brightness(self.color1) + else: + for i in range(self.num_leds): + self.n[i] = (0, 0, 0) + self.n.write() + self.pattern_step = (self.pattern_step + 1) % 2 + self.last_update = current_time + + def random_color_wipe_step(self): + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, self.last_update) >= self.delay: + color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)) + if self.pattern_step < self.num_leds: + for i in range(self.num_leds): + self.n[i] = (0, 0, 0) + self.n[self.pattern_step] = self.apply_brightness(color) + self.n.write() + self.pattern_step += 1 + else: + self.pattern_step = 0 + self.last_update = current_time + + def random_rainbow_cycle_step(self): + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, self.last_update) >= self.delay: + def wheel(pos): + if pos < 85: + return (pos * 3, 255 - pos * 3, 0) + elif pos < 170: + pos -= 85 + return (255 - pos * 3, 0, pos * 3) + else: + pos -= 170 + return (0, pos * 3, 255 - pos * 3) + + random_offset = random.randint(0, 255) + for i in range(self.num_leds): + rc_index = (i * 256 // self.num_leds) + self.pattern_step + random_offset + self.n[i] = self.apply_brightness(wheel(rc_index & 255)) + self.n.write() + self.pattern_step = (self.pattern_step + 1) % 256 + self.last_update = current_time + + def random_theater_chase_step(self): + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, self.last_update) >= self.delay: + color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)) + for i in range(self.num_leds): + if (i + self.pattern_step) % 3 == 0: + self.n[i] = self.apply_brightness(color) + else: + self.n[i] = (0, 0, 0) + self.n.write() + self.pattern_step = (self.pattern_step + 1) % 3 + self.last_update = current_time + + def random_blink_step(self): + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, self.last_update) >= self.delay: + color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)) + if self.pattern_step % 2 == 0: + for i in range(self.num_leds): + self.n[i] = self.apply_brightness(color) + else: + for i in range(self.num_leds): + self.n[i] = (0, 0, 0) + self.n.write() + self.pattern_step = (self.pattern_step + 1) % 2 + self.last_update = current_time + + def color_transition_step(self): + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, self.last_update) >= self.delay: + # Calculate transition factor based on elapsed time + transition_factor = (self.pattern_step * 100) / self.transition_duration + if transition_factor > 100: + transition_factor = 100 + color = self.interpolate_color(self.color1, self.color2, transition_factor / 100) + + # Apply the interpolated color to all LEDs + for i in range(self.num_leds): + self.n[i] = self.apply_brightness(color) + self.n.write() + + self.pattern_step += self.delay + if self.pattern_step > self.transition_duration: + self.pattern_step = 0 + + self.last_update = current_time + + def interpolate_color(self, color1, color2, factor): + return ( + int(color1[0] + (color2[0] - color1[0]) * factor), + int(color1[1] + (color2[1] - color1[1]) * factor), + int(color1[2] + (color2[2] - color1[2]) * factor) + ) + + def two_steps_forward_one_step_back_step(self): + current_time = utime.ticks_ms() + if utime.ticks_diff(current_time, self.last_update) >= self.delay: + # Move forward 2 steps and backward 1 step + if self.direction == 1: # Moving forward + if self.scanner_position < self.num_leds - 2: + self.scanner_position += 2 # Move forward 2 steps + else: + self.direction = -1 # Change direction to backward + else: # Moving backward + if self.scanner_position > 0: + self.scanner_position -= 1 # Move backward 1 step + else: + self.direction = 1 # Change direction to forward + + # Set all LEDs to off + for i in range(self.num_leds): + self.n[i] = (0, 0, 0) + + # Set the current position to the color + self.n[self.scanner_position] = self.apply_brightness(self.color1) + + # Apply the color transition + transition_factor = (self.pattern_step * 100) / self.transition_duration + if transition_factor > 100: + transition_factor = 100 + color = self.interpolate_color(self.color1, self.color2, transition_factor / 100) + self.n[self.scanner_position] = self.apply_brightness(color) + + self.n.write() + self.pattern_step += self.delay + if self.pattern_step > self.transition_duration: + self.pattern_step = 0 + + self.last_update = current_time + +if __name__ == "__main__": + p = Patterns(4, 180) + p.set_color1((255,0,0)) + p.set_color2((0,255,0)) + #p.set_delay(10) + try: + while True: + for key in p.patterns: + print(key) + p.select(key) + for _ in range(2000): + p.tick() + utime.sleep_ms(1) + except KeyboardInterrupt: + p.fill((0, 0, 0)) diff --git a/src/settings.py b/src/settings.py new file mode 100644 index 0000000..7ad5c81 --- /dev/null +++ b/src/settings.py @@ -0,0 +1,53 @@ +import json + +class Settings(dict): + SETTINGS_FILE = "/settings.json" + + def __init__(self): + super().__init__() + self.load() # Load settings from file during initialization + + def set_defaults(self): + self["num_leds"] = 50 + self["selected_pattern"] = "blink" + self["color1"] = "#000f00" + self["color2"] = "#0f0000" + self["delay"] = 100 + self["brightness"] = 100 + self["wifi"] = {"ssid": "", "password": ""} + + def save(self): + try: + j = json.dumps(self) + with open(self.SETTINGS_FILE, 'w') as file: + file.write(j) + print("Settings saved successfully.") + except Exception as e: + print(f"Error saving settings: {e}") + + def load(self): + try: + with open(self.SETTINGS_FILE, 'r') as file: + loaded_settings = json.load(file) + self.update(loaded_settings) + print("Settings loaded successfully.") + except Exception as e: + print(f"Error loading settings") + self.set_defaults() + +# Example usage +def main(): + settings = Settings() + print(f"Number of LEDs: {settings['num_leds']}") + settings['num_leds'] = 100 + print(f"Updated number of LEDs: {settings['num_leds']}") + settings.save() + + # Create a new Settings object to test loading + new_settings = Settings() + print(f"Loaded number of LEDs: {new_settings['num_leds']}") + print(settings) + +# Run the example +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/static/main.css b/src/static/main.css new file mode 100644 index 0000000..0c56bb7 --- /dev/null +++ b/src/static/main.css @@ -0,0 +1,75 @@ +body { + font-family: Arial, sans-serif; + max-width: 600px; + margin: 0 auto; + padding: 20px; + line-height: 1.6; + } + h1 { + text-align: center; + } + + form { + margin-bottom: 20px; + } + label { + display: block; + margin-bottom: 5px; + } + input[type="text"], input[type="submit"], input[type="range"], input[type="color"] { + width: 100%; + + margin-bottom: 10px; + box-sizing: border-box; + } + input[type="range"] { + -webkit-appearance: none; + appearance: none; + height: 25px; + background: #d3d3d3; + outline: none; + opacity: 0.7; + transition: opacity .2s; + } + input[type="range"]:hover { + opacity: 1; + } + input[type="range"]::-webkit-slider-thumb { + -webkit-appearance: none; + appearance: none; + width: 25px; + height: 25px; + background: #4CAF50; + cursor: pointer; + border-radius: 50%; + } + input[type="range"]::-moz-range-thumb { + width: 25px; + height: 25px; + background: #4CAF50; + cursor: pointer; + border-radius: 50%; + } + #pattern_buttons { + display: flex; + flex-wrap: wrap; + gap: 10px; + margin-bottom: 20px; + } + #pattern_buttons button { + flex: 1 0 calc(33.333% - 10px); + padding: 10px; + background-color: #4CAF50; + color: white; + border: none; + cursor: pointer; + transition: background-color 0.3s; + } + #pattern_buttons button:hover { + background-color: #45a049; + } + @media (max-width: 480px) { + #pattern_buttons button { + flex: 1 0 calc(50% - 10px); + } + } \ No newline at end of file diff --git a/src/static/main.js b/src/static/main.js new file mode 100644 index 0000000..4cdcc55 --- /dev/null +++ b/src/static/main.js @@ -0,0 +1,147 @@ +let delayTimeout; +let brightnessTimeout; +let colorTimeout; +let color2Timeout; + +async function post(path, data) { + console.log(`POST to ${path}`, data); + try { + const response = await fetch(path, { + method: "POST", + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify(data) // Convert data to JSON string + }); + if (!response.ok) { + throw new Error(`HTTP error! Status: ${response.status}`); + } + } catch (error) { + console.error('Error during POST request:', error); + } +} + +async function get(path) { + try { + const response = await fetch(path); + if (!response.ok) { + throw new Error(`HTTP error! Status: ${response.status}`); + } + return await response.json(); // Assuming you are expecting JSON response + } catch (error) { + console.error('Error during GET request:', error); + } +} + +async function updateColor(event) { + event.preventDefault(); + clearTimeout(colorTimeout); + colorTimeout = setTimeout(async function() { + const color = document.getElementById('color').value; + await post("/color", { color }); // Send as JSON + }, 500); +} + +async function updateColor2(event) { + event.preventDefault(); + clearTimeout(color2Timeout); + color2Timeout = setTimeout(async function() { + const color = document.getElementById('color2').value; + await post("/color2", { color }); // Send as JSON + }, 500); +} + +async function updatePattern(pattern) { + event.preventDefault(); + await post("/pattern", { pattern }); // Send as JSON +} + +async function updateBrightness(event) { + event.preventDefault(); + clearTimeout(brightnessTimeout); + brightnessTimeout = setTimeout(async function() { + const brightness = document.getElementById('brightness').value; + await post('/brightness', { brightness }); // Send as JSON + }, 500); +} + +async function updateDelay(event) { + event.preventDefault(); + clearTimeout(delayTimeout); + delayTimeout = setTimeout(async function() { + const delay = document.getElementById('delay').value; + await post('/delay', { delay }); // Send as JSON + }, 500); +} + +async function updateNumLeds(event) { + event.preventDefault(); + const numLeds = document.getElementById('num_leds').value; + await post('/num_leds', { num_leds: numLeds }); // Send as JSON +} + +async function updateWifi(event) { + event.preventDefault(); + const ssid = document.getElementById('ssid').value; + const password = document.getElementById('password').value; + const ip = document.getElementById('ip').value; + const gateway = document.getElementById('gateway').value; + + const wifiSettings = { ssid, password, ip, gateway }; // Create JSON object + console.log(wifiSettings); + const response = await post('/wifi_settings', wifiSettings); // Send as JSON + if (response === 500) { + alert("Failed to connect to Wi-Fi"); + } +} + +function createPatternButtons(patterns) { + const container = document.getElementById('pattern_buttons'); + container.innerHTML = ''; // Clear previous buttons + + patterns.forEach(pattern => { + const button = document.createElement('button'); + button.type = 'button'; // Use 'button' instead of 'submit' + button.textContent = pattern; + button.value = pattern; + button.addEventListener('click', async function(event) { + event.preventDefault(); + await updatePattern(pattern); + }); + container.appendChild(button); + }); +} + +document.addEventListener('DOMContentLoaded', async function() { + document.getElementById('color').addEventListener('input', updateColor); + document.getElementById('color2').addEventListener('input', updateColor2); + document.getElementById('delay').addEventListener('input', updateDelay); + document.getElementById('brightness').addEventListener('input', updateBrightness); + document.getElementById('num_leds_form').addEventListener('submit', updateNumLeds); + document.getElementById('wifi_form').addEventListener('submit', updateWifi); + document.getElementById('delay').addEventListener('touchend', updateDelay); + document.getElementById('brightness').addEventListener('touchend', updateBrightness); + + document.querySelectorAll(".pattern_button").forEach(button => { + console.log(button.value); + button.addEventListener('click', async event => { + event.preventDefault(); + await updatePattern(button.value); + }); + }); +}); + +// Function to toggle the display of the settings menu +function selectSettings() { + const settingsMenu = document.getElementById('settings_menu'); + controls = document.getElementById('controls'); + settingsMenu.style.display = 'block'; + controls.style.display = 'none'; +} + +function selectControls() { + const settingsMenu = document.getElementById('settings_menu'); + controls = document.getElementById('controls'); + settingsMenu.style.display = 'none'; + controls.style.display = 'block'; +} diff --git a/src/templates/index.html b/src/templates/index.html new file mode 100644 index 0000000..57b9eac --- /dev/null +++ b/src/templates/index.html @@ -0,0 +1,71 @@ +{% args settings, patterns %} +<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> + <title>LED Control</title> + <script src="static/main.js"></script> + <link rel="stylesheet" href="static/main.css"> +</head> +<body> + <h1>Control LEDs</h1> + <button onclick="selectControls()">Controls</button> + <button onclick="selectSettings()">Settings</button> + + <!-- Main LED Controls --> + <div id="controls"> + <div id="pattern_buttons"> + {% for p in patterns %} + <button class="pattern_button" value="{{p}}">{{p}}</button> + {% endfor %} + + <!-- Pattern buttons will be inserted here --> + </div> + <form id="delay_form" method="post" action="/delay"> + <label for="delay">Delay:</label> + <input type="range" id="delay" name="delay" min="1" max="1000" value="{{settings['delay']}}" step="10"> + </form> + <form id="brightness_form" method="post" action="/brightness"> + <label for="brightness">Brightness:</label> + <input type="range" id="brightness" name="brightness" min="0" max="100" value="{{settings['brightness']}}" step="1"> + </form> + <form id="color_form" method="post" action="/color"> + <input type="color" id="color" name="color" value="{{settings['color1']}}"> + </form> + <form id="color2_form" method="post" action="/color2"> + <input type="color" id="color2" name="color2" value="{{settings['color2']}}"> + </form> + </div> + + <!-- Settings Menu for num_leds, Wi-Fi SSID, and Password --> + + <div id="settings_menu" style="display: none;"> + <h2>Settings</h2> + + <!-- Separate form for submitting num_leds --> + <form id="num_leds_form" method="post" action="/num_leds"> + <label for="num_leds">Number of LEDs:</label> + <input type="text" id="num_leds" name="num_leds" value="{{settings['num_leds']}}"> + <input type="submit" value="Update Number of LEDs"> + </form> + + <!-- Form for Wi-Fi SSID and password --> + <form id="wifi_form" method="post" action="/wifi_settings"> + <label for="ssid">Wi-Fi SSID:</label> + <input type="text" id="ssid" name="ssid" value="{{settings['wifi']['ssid']}}"> + <br> + <label for="password">Wi-Fi Password:</label> + <input type="password" id="password" name="password"> + <br> + <label for="ip">Wi-Fi IP:</label> + <input type="ip" id="ip" name="ip" value="{{settings.get('wifi', {}).get('ip', '')}}"> + <br> + <label for="gateway">Wi-Fi Gateway:</label> + <input type="gateway" id="gateway" name="gateway" value="{{settings.get('wifi', {}).get('gateway', '')}}"> + <br> + <input type="submit" value="Save Wi-Fi Settings"> + </form> + </div> +</body> +</html> diff --git a/src/templates/index_html.py b/src/templates/index_html.py new file mode 100644 index 0000000..8ba57fe --- /dev/null +++ b/src/templates/index_html.py @@ -0,0 +1,94 @@ +# Autogenerated file +def render(settings, patterns): + yield """<!DOCTYPE html> +<html lang=\"en\"> +<head> + <meta charset=\"UTF-8\"> + <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\"> + <title>LED Control</title> + <script src=\"static/main.js\"></script> + <link rel=\"stylesheet\" href=\"static/main.css\"> +</head> +<body> + <h1>Control LEDs</h1> + <button onclick=\"selectControls()\">Controls</button> + <button onclick=\"selectSettings()\">Settings</button> + + <!-- Main LED Controls --> + <div id=\"controls\"> + <div id=\"pattern_buttons\"> + """ + for p in patterns: + yield """ <button class=\"pattern_button\" value=\"""" + yield str(p) + yield """\">""" + yield str(p) + yield """</button> + """ + yield """ + <!-- Pattern buttons will be inserted here --> + </div> + <form id=\"delay_form\" method=\"post\" action=\"/delay\"> + <label for=\"delay\">Delay:</label> + <input type=\"range\" id=\"delay\" name=\"delay\" min=\"1\" max=\"1000\" value=\"""" + yield str(settings['delay']) + yield """\" step=\"10\"> + </form> + <form id=\"brightness_form\" method=\"post\" action=\"/brightness\"> + <label for=\"brightness\">Brightness:</label> + <input type=\"range\" id=\"brightness\" name=\"brightness\" min=\"0\" max=\"100\" value=\"""" + yield str(settings['brightness']) + yield """\" step=\"1\"> + </form> + <form id=\"color_form\" method=\"post\" action=\"/color\"> + <input type=\"color\" id=\"color\" name=\"color\" value=\"""" + yield str(settings['color1']) + yield """\"> + </form> + <form id=\"color2_form\" method=\"post\" action=\"/color2\"> + <input type=\"color\" id=\"color2\" name=\"color2\" value=\"""" + yield str(settings['color2']) + yield """\"> + </form> + </div> + + <!-- Settings Menu for num_leds, Wi-Fi SSID, and Password --> + + <div id=\"settings_menu\" style=\"display: none;\"> + <h2>Settings</h2> + + <!-- Separate form for submitting num_leds --> + <form id=\"num_leds_form\" method=\"post\" action=\"/num_leds\"> + <label for=\"num_leds\">Number of LEDs:</label> + <input type=\"text\" id=\"num_leds\" name=\"num_leds\" value=\"""" + yield str(settings['num_leds']) + yield """\"> + <input type=\"submit\" value=\"Update Number of LEDs\"> + </form> + + <!-- Form for Wi-Fi SSID and password --> + <form id=\"wifi_form\" method=\"post\" action=\"/wifi_settings\"> + <label for=\"ssid\">Wi-Fi SSID:</label> + <input type=\"text\" id=\"ssid\" name=\"ssid\" value=\"""" + yield str(settings['wifi']['ssid']) + yield """\"> + <br> + <label for=\"password\">Wi-Fi Password:</label> + <input type=\"password\" id=\"password\" name=\"password\"> + <br> + <label for=\"ip\">Wi-Fi IP:</label> + <input type=\"ip\" id=\"ip\" name=\"ip\" value=\"""" + yield str(settings.get('wifi', {}).get('ip', '')) + yield """\"> + <br> + <label for=\"gateway\">Wi-Fi Gateway:</label> + <input type=\"gateway\" id=\"gateway\" name=\"gateway\" value=\"""" + yield str(settings.get('wifi', {}).get('gateway', '')) + yield """\"> + <br> + <input type=\"submit\" value=\"Save Wi-Fi Settings\"> + </form> + </div> +</body> +</html> +""" diff --git a/src/web.py b/src/web.py new file mode 100644 index 0000000..048e60d --- /dev/null +++ b/src/web.py @@ -0,0 +1,135 @@ +from microdot import Microdot, send_file, Response +from microdot.utemplate import Template +from microdot.websocket import with_websocket + +import json +import wifi + +def web(settings, patterns): + app = Microdot() + Response.default_content_type = 'text/html' + + @app.route('/') + async def index(request): + return Template('/index.html').render(settings=settings, patterns=patterns.patterns.keys()) + + @app.route("/static/<path:path>") + def static(request, path): + if '..' in path: + # Directory traversal is not allowed + return 'Not found', 404 + return send_file('static/' + path) + + @app.post("/num_leds") + def num_leds(request): + try: + data = json.loads(request.body.decode('utf-8')) + num_leds = int(data["num_leds"]) + patterns.update_num_leds(4, num_leds) + settings["num_leds"] = num_leds + settings.save() + return "OK", 200 + except (ValueError, KeyError, json.JSONDecodeError): + return "Bad request", 400 + + @app.post("/pattern") + def pattern(request): + try: + data = json.loads(request.body.decode('utf-8')) + pattern = data["pattern"] + if patterns.select(pattern): + settings["selected_pattern"] = pattern + settings.save() + return "OK", 200 + else: + return "Bad request", 400 + except (KeyError, json.JSONDecodeError): + return "Bad request", 400 + + @app.post("/delay") + def delay(request): + try: + data = json.loads(request.body.decode('utf-8')) + delay = int(data["delay"]) + patterns.set_delay(delay) + settings["delay"] = delay + settings.save() + return "OK", 200 + except (ValueError, KeyError, json.JSONDecodeError): + return "Bad request", 400 + + @app.post("/brightness") + def brightness(request): + try: + data = json.loads(request.body.decode('utf-8')) + brightness = int(data["brightness"]) + patterns.set_brightness(brightness) + settings["brightness"] = brightness + settings.save() + return "OK", 200 + except (ValueError, KeyError, json.JSONDecodeError): + return "Bad request", 400 + + @app.post("/color") + def color(request): + try: + data = json.loads(request.body.decode('utf-8')) + color = data["color"] + patterns.set_color1(tuple(int(color[i:i+2], 16) for i in (1, 3, 5))) # Convert hex to RGB + settings["color1"] = color + settings.save() + return "OK", 200 + except (KeyError, json.JSONDecodeError, ValueError): + return "Bad request", 400 + + @app.post("/color2") + def color2(request): + try: + data = json.loads(request.body.decode('utf-8')) + color = data["color2"] + patterns.set_color2(tuple(int(color[i:i+2], 16) for i in (1, 3, 5))) # Convert hex to RGB + settings["color2"] = color + settings.save() + return "OK", 200 + except (KeyError, json.JSONDecodeError, ValueError): + return "Bad request", 400 + + + @app.post("/wifi_settings") + def wifi_settings(request): + try: + data = json.loads(request.body.decode('utf-8')) + print(data) + ssid = settings['wifi']['ssid'] = data['ssid'] + password = settings['wifi']['password'] = data.get('password', settings['wifi']['password']) + ip = settings['wifi']['ip'] = data.get('ip', None) + gateway = settings['wifi']['gateway'] = data.get('gateway', None) + print(settings) + + if config := wifi.connect(ssid, password, ip, gateway): + print(config) + settings.save() + + return "OK", 200 + except Exception as e: + print(f"Wifi {e}") + return "Bad request", 400 + + + @app.post("/sync") + def sync(request): + patterns.sync() + return "OK", 200 + + @app.route("/external") + @with_websocket + async def ws(request, ws): + patterns.select("external") + while True: + data = await ws.receive() + print(data) + for i in range(min(patterns.num_leds, int(len(data)/3))): + patterns.set(i, (data[i*3], data[i*3+1], data[i*3+2])) + patterns.write() + + return app diff --git a/src/wifi.py b/src/wifi.py new file mode 100644 index 0000000..32a9036 --- /dev/null +++ b/src/wifi.py @@ -0,0 +1,46 @@ +import network +from machine import Pin +from time import sleep +import ubinascii +from settings import Settings + +def connect(ssid, password, ip, gateway): + if ssid is None or password is None: + print("Missing ssid or password") + return None + try: + sta_if = network.WLAN(network.STA_IF) + if ip is not None and gateway is not None: + sta_if.ifconfig((ip, '255.255.255.0', gateway, '1.1.1.1')) + if not sta_if.isconnected(): + print('connecting to network...') + sta_if.active(True) + sta_if.connect(ssid, password) + sleep(0.1) + if sta_if.isconnected(): + return sta_if.ifconfig() + return None + return sta_if.ifconfig() + except Exception as e: + print(f"Failed to connect to wifi {e}") + return None + + +def ap(password): + ap_if = network.WLAN(network.AP_IF) + ap_mac = ap_if.config('mac') + ssid = f"led-{ubinascii.hexlify(ap_mac).decode()}" + print(ssid) + ap_if.active(True) + ap_if.config(essid=ssid, password="qwerty1234") + ap_if.active(False) + ap_if.active(True) + print(ap_if.ifconfig()) + + + + + + + + diff --git a/test.py b/test.py new file mode 100644 index 0000000..a3ea3c6 --- /dev/null +++ b/test.py @@ -0,0 +1 @@ +print("Hello") \ No newline at end of file