diff --git a/lib/microdot/__init__.py b/lib/microdot/__init__.py deleted file mode 100644 index 68cb381..0000000 --- a/lib/microdot/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from microdot.microdot import Microdot, Request, Response, abort, redirect, \ - send_file # noqa: F401 \ No newline at end of file diff --git a/lib/microdot/helpers.py b/lib/microdot/helpers.py deleted file mode 100644 index 664e58c..0000000 --- a/lib/microdot/helpers.py +++ /dev/null @@ -1,8 +0,0 @@ -try: - from functools import wraps -except ImportError: # pragma: no cover - # MicroPython does not currently implement functools.wraps - def wraps(wrapped): - def _(wrapper): - return wrapper - return _ diff --git a/lib/microdot/microdot.py b/lib/microdot/microdot.py deleted file mode 100644 index 0513f21..0000000 --- a/lib/microdot/microdot.py +++ /dev/null @@ -1,1450 +0,0 @@ -""" -microdot --------- - -The ``microdot`` module defines a few classes that help implement HTTP-based -servers for MicroPython and standard Python. -""" -import asyncio -import io -import json -import time - -try: - from inspect import iscoroutinefunction, iscoroutine - from functools import partial - - async def invoke_handler(handler, *args, **kwargs): - """Invoke a handler and return the result. - - This method runs sync handlers in a thread pool executor. - """ - if iscoroutinefunction(handler): - ret = await handler(*args, **kwargs) - else: - ret = await asyncio.get_running_loop().run_in_executor( - None, partial(handler, *args, **kwargs)) - return ret -except ImportError: # pragma: no cover - def iscoroutine(coro): - return hasattr(coro, 'send') and hasattr(coro, 'throw') - - async def invoke_handler(handler, *args, **kwargs): - """Invoke a handler and return the result. - - This method runs sync handlers in the asyncio thread, which can - potentially cause blocking and performance issues. - """ - ret = handler(*args, **kwargs) - if iscoroutine(ret): - ret = await ret - return ret - -try: - from sys import print_exception -except ImportError: # pragma: no cover - import traceback - - def print_exception(exc): - traceback.print_exc() - -MUTED_SOCKET_ERRORS = [ - 32, # Broken pipe - 54, # Connection reset by peer - 104, # Connection reset by peer - 128, # Operation on closed socket -] - - -def urldecode_str(s): - s = s.replace('+', ' ') - parts = s.split('%') - if len(parts) == 1: - return s - result = [parts[0]] - for item in parts[1:]: - if item == '': - result.append('%') - else: - code = item[:2] - result.append(chr(int(code, 16))) - result.append(item[2:]) - return ''.join(result) - - -def urldecode_bytes(s): - s = s.replace(b'+', b' ') - parts = s.split(b'%') - if len(parts) == 1: - return s.decode() - result = [parts[0]] - for item in parts[1:]: - if item == b'': - result.append(b'%') - else: - code = item[:2] - result.append(bytes([int(code, 16)])) - result.append(item[2:]) - return b''.join(result).decode() - - -def urlencode(s): - return s.replace('+', '%2B').replace(' ', '+').replace( - '%', '%25').replace('?', '%3F').replace('#', '%23').replace( - '&', '%26').replace('=', '%3D') - - -class NoCaseDict(dict): - """A subclass of dictionary that holds case-insensitive keys. - - :param initial_dict: an initial dictionary of key/value pairs to - initialize this object with. - - Example:: - - >>> d = NoCaseDict() - >>> d['Content-Type'] = 'text/html' - >>> print(d['Content-Type']) - text/html - >>> print(d['content-type']) - text/html - >>> print(d['CONTENT-TYPE']) - text/html - >>> del d['cOnTeNt-TyPe'] - >>> print(d) - {} - """ - def __init__(self, initial_dict=None): - super().__init__(initial_dict or {}) - self.keymap = {k.lower(): k for k in self.keys() if k.lower() != k} - - def __setitem__(self, key, value): - kl = key.lower() - key = self.keymap.get(kl, key) - if kl != key: - self.keymap[kl] = key - super().__setitem__(key, value) - - def __getitem__(self, key): - kl = key.lower() - return super().__getitem__(self.keymap.get(kl, kl)) - - def __delitem__(self, key): - kl = key.lower() - super().__delitem__(self.keymap.get(kl, kl)) - - def __contains__(self, key): - kl = key.lower() - return self.keymap.get(kl, kl) in self.keys() - - def get(self, key, default=None): - kl = key.lower() - return super().get(self.keymap.get(kl, kl), default) - - def update(self, other_dict): - for key, value in other_dict.items(): - self[key] = value - - -def mro(cls): # pragma: no cover - """Return the method resolution order of a class. - - This is a helper function that returns the method resolution order of a - class. It is used by Microdot to find the best error handler to invoke for - the raised exception. - - In CPython, this function returns the ``__mro__`` attribute of the class. - In MicroPython, this function implements a recursive depth-first scanning - of the class hierarchy. - """ - if hasattr(cls, 'mro'): - return cls.__mro__ - - def _mro(cls): - m = [cls] - for base in cls.__bases__: - m += _mro(base) - return m - - mro_list = _mro(cls) - - # If a class appears multiple times (due to multiple inheritance) remove - # all but the last occurence. This matches the method resolution order - # of MicroPython, but not CPython. - mro_pruned = [] - for i in range(len(mro_list)): - base = mro_list.pop(0) - if base not in mro_list: - mro_pruned.append(base) - return mro_pruned - - -class MultiDict(dict): - """A subclass of dictionary that can hold multiple values for the same - key. It is used to hold key/value pairs decoded from query strings and - form submissions. - - :param initial_dict: an initial dictionary of key/value pairs to - initialize this object with. - - Example:: - - >>> d = MultiDict() - >>> d['sort'] = 'name' - >>> d['sort'] = 'email' - >>> print(d['sort']) - 'name' - >>> print(d.getlist('sort')) - ['name', 'email'] - """ - def __init__(self, initial_dict=None): - super().__init__() - if initial_dict: - for key, value in initial_dict.items(): - self[key] = value - - def __setitem__(self, key, value): - if key not in self: - super().__setitem__(key, []) - super().__getitem__(key).append(value) - - def __getitem__(self, key): - return super().__getitem__(key)[0] - - def get(self, key, default=None, type=None): - """Return the value for a given key. - - :param key: The key to retrieve. - :param default: A default value to use if the key does not exist. - :param type: A type conversion callable to apply to the value. - - If the multidict contains more than one value for the requested key, - this method returns the first value only. - - Example:: - - >>> d = MultiDict() - >>> d['age'] = '42' - >>> d.get('age') - '42' - >>> d.get('age', type=int) - 42 - >>> d.get('name', default='noname') - 'noname' - """ - if key not in self: - return default - value = self[key] - if type is not None: - value = type(value) - return value - - def getlist(self, key, type=None): - """Return all the values for a given key. - - :param key: The key to retrieve. - :param type: A type conversion callable to apply to the values. - - If the requested key does not exist in the dictionary, this method - returns an empty list. - - Example:: - - >>> d = MultiDict() - >>> d.getlist('items') - [] - >>> d['items'] = '3' - >>> d.getlist('items') - ['3'] - >>> d['items'] = '56' - >>> d.getlist('items') - ['3', '56'] - >>> d.getlist('items', type=int) - [3, 56] - """ - if key not in self: - return [] - values = super().__getitem__(key) - if type is not None: - values = [type(value) for value in values] - return values - - -class AsyncBytesIO: - """An async wrapper for BytesIO.""" - def __init__(self, data): - self.stream = io.BytesIO(data) - - async def read(self, n=-1): - return self.stream.read(n) - - async def readline(self): # pragma: no cover - return self.stream.readline() - - async def readexactly(self, n): # pragma: no cover - return self.stream.read(n) - - async def readuntil(self, separator=b'\n'): # pragma: no cover - return self.stream.readuntil(separator=separator) - - async def awrite(self, data): # pragma: no cover - return self.stream.write(data) - - async def aclose(self): # pragma: no cover - pass - - -class Request: - """An HTTP request.""" - #: Specify the maximum payload size that is accepted. Requests with larger - #: payloads will be rejected with a 413 status code. Applications can - #: change this maximum as necessary. - #: - #: Example:: - #: - #: Request.max_content_length = 1 * 1024 * 1024 # 1MB requests allowed - max_content_length = 16 * 1024 - - #: Specify the maximum payload size that can be stored in ``body``. - #: Requests with payloads that are larger than this size and up to - #: ``max_content_length`` bytes will be accepted, but the application will - #: only be able to access the body of the request by reading from - #: ``stream``. Set to 0 if you always access the body as a stream. - #: - #: Example:: - #: - #: Request.max_body_length = 4 * 1024 # up to 4KB bodies read - max_body_length = 16 * 1024 - - #: Specify the maximum length allowed for a line in the request. Requests - #: with longer lines will not be correctly interpreted. Applications can - #: change this maximum as necessary. - #: - #: Example:: - #: - #: Request.max_readline = 16 * 1024 # 16KB lines allowed - max_readline = 2 * 1024 - - class G: - pass - - def __init__(self, app, client_addr, method, url, http_version, headers, - body=None, stream=None, sock=None): - #: The application instance to which this request belongs. - self.app = app - #: The address of the client, as a tuple (host, port). - self.client_addr = client_addr - #: The HTTP method of the request. - self.method = method - #: The request URL, including the path and query string. - self.url = url - #: The path portion of the URL. - self.path = url - #: The query string portion of the URL. - self.query_string = None - #: The parsed query string, as a - #: :class:`MultiDict ` object. - self.args = {} - #: A dictionary with the headers included in the request. - self.headers = headers - #: A dictionary with the cookies included in the request. - self.cookies = {} - #: The parsed ``Content-Length`` header. - self.content_length = 0 - #: The parsed ``Content-Type`` header. - self.content_type = None - #: A general purpose container for applications to store data during - #: the life of the request. - self.g = Request.G() - - self.http_version = http_version - if '?' in self.path: - self.path, self.query_string = self.path.split('?', 1) - self.args = self._parse_urlencoded(self.query_string) - - if 'Content-Length' in self.headers: - self.content_length = int(self.headers['Content-Length']) - if 'Content-Type' in self.headers: - self.content_type = self.headers['Content-Type'] - if 'Cookie' in self.headers: - for cookie in self.headers['Cookie'].split(';'): - name, value = cookie.strip().split('=', 1) - self.cookies[name] = value - - self._body = body - self.body_used = False - self._stream = stream - self.sock = sock - self._json = None - self._form = None - self.after_request_handlers = [] - - @staticmethod - async def create(app, client_reader, client_writer, client_addr): - """Create a request object. - - :param app: The Microdot application instance. - :param client_reader: An input stream from where the request data can - be read. - :param client_writer: An output stream where the response data can be - written. - :param client_addr: The address of the client, as a tuple. - - This method is a coroutine. It returns a newly created ``Request`` - object. - """ - # request line - line = (await Request._safe_readline(client_reader)).strip().decode() - if not line: # pragma: no cover - return None - method, url, http_version = line.split() - http_version = http_version.split('/', 1)[1] - - # headers - headers = NoCaseDict() - content_length = 0 - while True: - line = (await Request._safe_readline( - client_reader)).strip().decode() - if line == '': - break - header, value = line.split(':', 1) - value = value.strip() - headers[header] = value - if header.lower() == 'content-length': - content_length = int(value) - - # body - body = b'' - if content_length and content_length <= Request.max_body_length: - body = await client_reader.readexactly(content_length) - stream = None - else: - body = b'' - stream = client_reader - - return Request(app, client_addr, method, url, http_version, headers, - body=body, stream=stream, - sock=(client_reader, client_writer)) - - def _parse_urlencoded(self, urlencoded): - data = MultiDict() - if len(urlencoded) > 0: # pragma: no branch - if isinstance(urlencoded, str): - for kv in [pair.split('=', 1) - for pair in urlencoded.split('&') if pair]: - data[urldecode_str(kv[0])] = urldecode_str(kv[1]) \ - if len(kv) > 1 else '' - elif isinstance(urlencoded, bytes): # pragma: no branch - for kv in [pair.split(b'=', 1) - for pair in urlencoded.split(b'&') if pair]: - data[urldecode_bytes(kv[0])] = urldecode_bytes(kv[1]) \ - if len(kv) > 1 else b'' - return data - - @property - def body(self): - """The body of the request, as bytes.""" - return self._body - - @property - def stream(self): - """The body of the request, as a bytes stream.""" - if self._stream is None: - self._stream = AsyncBytesIO(self._body) - return self._stream - - @property - def json(self): - """The parsed JSON body, or ``None`` if the request does not have a - JSON body.""" - if self._json is None: - if self.content_type is None: - return None - mime_type = self.content_type.split(';')[0] - if mime_type != 'application/json': - return None - self._json = json.loads(self.body.decode()) - return self._json - - @property - def form(self): - """The parsed form submission body, as a - :class:`MultiDict ` object, or ``None`` if the - request does not have a form submission.""" - if self._form is None: - if self.content_type is None: - return None - mime_type = self.content_type.split(';')[0] - if mime_type != 'application/x-www-form-urlencoded': - return None - self._form = self._parse_urlencoded(self.body) - return self._form - - def after_request(self, f): - """Register a request-specific function to run after the request is - handled. Request-specific after request handlers run at the very end, - after the application's own after request handlers. The function must - take two arguments, the request and response objects. The return value - of the function must be the updated response object. - - Example:: - - @app.route('/') - def index(request): - # register a request-specific after request handler - @req.after_request - def func(request, response): - # ... - return response - - return 'Hello, World!' - - Note that the function is not called if the request handler raises an - exception and an error response is returned instead. - """ - self.after_request_handlers.append(f) - return f - - @staticmethod - async def _safe_readline(stream): - line = (await stream.readline()) - if len(line) > Request.max_readline: - raise ValueError('line too long') - return line - - -class Response: - """An HTTP response class. - - :param body: The body of the response. If a dictionary or list is given, - a JSON formatter is used to generate the body. If a file-like - object or an async generator is given, a streaming response is - used. If a string is given, it is encoded from UTF-8. Else, - the body should be a byte sequence. - :param status_code: The numeric HTTP status code of the response. The - default is 200. - :param headers: A dictionary of headers to include in the response. - :param reason: A custom reason phrase to add after the status code. The - default is "OK" for responses with a 200 status code and - "N/A" for any other status codes. - """ - types_map = { - 'css': 'text/css', - 'gif': 'image/gif', - 'html': 'text/html', - 'jpg': 'image/jpeg', - 'js': 'application/javascript', - 'json': 'application/json', - 'png': 'image/png', - 'txt': 'text/plain', - } - - send_file_buffer_size = 1024 - - #: The content type to use for responses that do not explicitly define a - #: ``Content-Type`` header. - default_content_type = 'text/plain' - - #: The default cache control max age used by :meth:`send_file`. A value - #: of ``None`` means that no ``Cache-Control`` header is added. - default_send_file_max_age = None - - #: Special response used to signal that a response does not need to be - #: written to the client. Used to exit WebSocket connections cleanly. - already_handled = None - - def __init__(self, body='', status_code=200, headers=None, reason=None): - if body is None and status_code == 200: - body = '' - status_code = 204 - self.status_code = status_code - self.headers = NoCaseDict(headers or {}) - self.reason = reason - if isinstance(body, (dict, list)): - self.body = json.dumps(body).encode() - self.headers['Content-Type'] = 'application/json; charset=UTF-8' - elif isinstance(body, str): - self.body = body.encode() - else: - # this applies to bytes, file-like objects or generators - self.body = body - self.is_head = False - - def set_cookie(self, cookie, value, path=None, domain=None, expires=None, - max_age=None, secure=False, http_only=False, - partitioned=False): - """Add a cookie to the response. - - :param cookie: The cookie's name. - :param value: The cookie's value. - :param path: The cookie's path. - :param domain: The cookie's domain. - :param expires: The cookie expiration time, as a ``datetime`` object - or a correctly formatted string. - :param max_age: The cookie's ``Max-Age`` value. - :param secure: The cookie's ``secure`` flag. - :param http_only: The cookie's ``HttpOnly`` flag. - :param partitioned: Whether the cookie is partitioned. - """ - http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value) - if path: - http_cookie += '; Path=' + path - if domain: - http_cookie += '; Domain=' + domain - if expires: - if isinstance(expires, str): - http_cookie += '; Expires=' + expires - else: # pragma: no cover - http_cookie += '; Expires=' + time.strftime( - '%a, %d %b %Y %H:%M:%S GMT', expires.timetuple()) - if max_age is not None: - http_cookie += '; Max-Age=' + str(max_age) - if secure: - http_cookie += '; Secure' - if http_only: - http_cookie += '; HttpOnly' - if partitioned: - http_cookie += '; Partitioned' - if 'Set-Cookie' in self.headers: - self.headers['Set-Cookie'].append(http_cookie) - else: - self.headers['Set-Cookie'] = [http_cookie] - - def delete_cookie(self, cookie, **kwargs): - """Delete a cookie. - - :param cookie: The cookie's name. - :param kwargs: Any cookie opens and flags supported by - ``set_cookie()`` except ``expires`` and ``max_age``. - """ - self.set_cookie(cookie, '', expires='Thu, 01 Jan 1970 00:00:01 GMT', - max_age=0, **kwargs) - - def complete(self): - if isinstance(self.body, bytes) and \ - 'Content-Length' not in self.headers: - self.headers['Content-Length'] = str(len(self.body)) - if 'Content-Type' not in self.headers: - self.headers['Content-Type'] = self.default_content_type - if 'charset=' not in self.headers['Content-Type']: - self.headers['Content-Type'] += '; charset=UTF-8' - - async def write(self, stream): - self.complete() - - try: - # status code - reason = self.reason if self.reason is not None else \ - ('OK' if self.status_code == 200 else 'N/A') - await stream.awrite('HTTP/1.0 {status_code} {reason}\r\n'.format( - status_code=self.status_code, reason=reason).encode()) - - # headers - for header, value in self.headers.items(): - values = value if isinstance(value, list) else [value] - for value in values: - await stream.awrite('{header}: {value}\r\n'.format( - header=header, value=value).encode()) - await stream.awrite(b'\r\n') - - # body - if not self.is_head: - iter = self.body_iter() - async for body in iter: - if isinstance(body, str): # pragma: no cover - body = body.encode() - try: - await stream.awrite(body) - except OSError as exc: # pragma: no cover - if exc.errno in MUTED_SOCKET_ERRORS or \ - exc.args[0] == 'Connection lost': - if hasattr(iter, 'aclose'): - await iter.aclose() - raise - if hasattr(iter, 'aclose'): # pragma: no branch - await iter.aclose() - - except OSError as exc: # pragma: no cover - if exc.errno in MUTED_SOCKET_ERRORS or \ - exc.args[0] == 'Connection lost': - pass - else: - raise - - def body_iter(self): - if hasattr(self.body, '__anext__'): - # response body is an async generator - return self.body - - response = self - - class iter: - ITER_UNKNOWN = 0 - ITER_SYNC_GEN = 1 - ITER_FILE_OBJ = 2 - ITER_NO_BODY = -1 - - def __aiter__(self): - if response.body: - self.i = self.ITER_UNKNOWN # need to determine type - else: - self.i = self.ITER_NO_BODY - return self - - async def __anext__(self): - if self.i == self.ITER_NO_BODY: - await self.aclose() - raise StopAsyncIteration - if self.i == self.ITER_UNKNOWN: - if hasattr(response.body, 'read'): - self.i = self.ITER_FILE_OBJ - elif hasattr(response.body, '__next__'): - self.i = self.ITER_SYNC_GEN - return next(response.body) - else: - self.i = self.ITER_NO_BODY - return response.body - elif self.i == self.ITER_SYNC_GEN: - try: - return next(response.body) - except StopIteration: - await self.aclose() - raise StopAsyncIteration - buf = response.body.read(response.send_file_buffer_size) - if iscoroutine(buf): # pragma: no cover - buf = await buf - if len(buf) < response.send_file_buffer_size: - self.i = self.ITER_NO_BODY - return buf - - async def aclose(self): - if hasattr(response.body, 'close'): - result = response.body.close() - if iscoroutine(result): # pragma: no cover - await result - - return iter() - - @classmethod - def redirect(cls, location, status_code=302): - """Return a redirect response. - - :param location: The URL to redirect to. - :param status_code: The 3xx status code to use for the redirect. The - default is 302. - """ - if '\x0d' in location or '\x0a' in location: - raise ValueError('invalid redirect URL') - return cls(status_code=status_code, headers={'Location': location}) - - @classmethod - def send_file(cls, filename, status_code=200, content_type=None, - stream=None, max_age=None, compressed=False, - file_extension=''): - """Send file contents in a response. - - :param filename: The filename of the file. - :param status_code: The 3xx status code to use for the redirect. The - default is 302. - :param content_type: The ``Content-Type`` header to use in the - response. If omitted, it is generated - automatically from the file extension of the - ``filename`` parameter. - :param stream: A file-like object to read the file contents from. If - a stream is given, the ``filename`` parameter is only - used when generating the ``Content-Type`` header. - :param max_age: The ``Cache-Control`` header's ``max-age`` value in - seconds. If omitted, the value of the - :attr:`Response.default_send_file_max_age` attribute is - used. - :param compressed: Whether the file is compressed. If ``True``, the - ``Content-Encoding`` header is set to ``gzip``. A - string with the header value can also be passed. - Note that when using this option the file must have - been compressed beforehand. This option only sets - the header. - :param file_extension: A file extension to append to the ``filename`` - parameter when opening the file, including the - dot. The extension given here is not considered - when generating the ``Content-Type`` header. - - Security note: The filename is assumed to be trusted. Never pass - filenames provided by the user without validating and sanitizing them - first. - """ - if content_type is None: - if compressed and filename.endswith('.gz'): - ext = filename[:-3].split('.')[-1] - else: - ext = filename.split('.')[-1] - if ext in Response.types_map: - content_type = Response.types_map[ext] - else: - content_type = 'application/octet-stream' - headers = {'Content-Type': content_type} - - if max_age is None: - max_age = cls.default_send_file_max_age - if max_age is not None: - headers['Cache-Control'] = 'max-age={}'.format(max_age) - - if compressed: - headers['Content-Encoding'] = compressed \ - if isinstance(compressed, str) else 'gzip' - - f = stream or open(filename + file_extension, 'rb') - return cls(body=f, status_code=status_code, headers=headers) - - -class URLPattern(): - def __init__(self, url_pattern): - self.url_pattern = url_pattern - self.segments = [] - self.regex = None - pattern = '' - use_regex = False - for segment in url_pattern.lstrip('/').split('/'): - if segment and segment[0] == '<': - if segment[-1] != '>': - raise ValueError('invalid URL pattern') - segment = segment[1:-1] - if ':' in segment: - type_, name = segment.rsplit(':', 1) - else: - type_ = 'string' - name = segment - parser = None - if type_ == 'string': - parser = self._string_segment - pattern += '/([^/]+)' - elif type_ == 'int': - parser = self._int_segment - pattern += '/(-?\\d+)' - elif type_ == 'path': - use_regex = True - pattern += '/(.+)' - elif type_.startswith('re:'): - use_regex = True - pattern += '/({pattern})'.format(pattern=type_[3:]) - else: - raise ValueError('invalid URL segment type') - self.segments.append({'parser': parser, 'name': name, - 'type': type_}) - else: - pattern += '/' + segment - self.segments.append({'parser': self._static_segment(segment)}) - if use_regex: - import re - self.regex = re.compile('^' + pattern + '$') - - def match(self, path): - args = {} - if self.regex: - g = self.regex.match(path) - if not g: - return - i = 1 - for segment in self.segments: - if 'name' not in segment: - continue - value = g.group(i) - if segment['type'] == 'int': - value = int(value) - args[segment['name']] = value - i += 1 - else: - if len(path) == 0 or path[0] != '/': - return - path = path[1:] - args = {} - for segment in self.segments: - if path is None: - return - arg, path = segment['parser'](path) - if arg is None: - return - if 'name' in segment: - args[segment['name']] = arg - if path is not None: - return - return args - - def _static_segment(self, segment): - def _static(value): - s = value.split('/', 1) - if s[0] == segment: - return '', s[1] if len(s) > 1 else None - return None, None - return _static - - def _string_segment(self, value): - s = value.split('/', 1) - if len(s[0]) == 0: - return None, None - return s[0], s[1] if len(s) > 1 else None - - def _int_segment(self, value): - s = value.split('/', 1) - try: - return int(s[0]), s[1] if len(s) > 1 else None - except ValueError: - return None, None - - -class HTTPException(Exception): - def __init__(self, status_code, reason=None): - self.status_code = status_code - self.reason = reason or str(status_code) + ' error' - - def __repr__(self): # pragma: no cover - return 'HTTPException: {}'.format(self.status_code) - - -class Microdot: - """An HTTP application class. - - This class implements an HTTP application instance and is heavily - influenced by the ``Flask`` class of the Flask framework. It is typically - declared near the start of the main application script. - - Example:: - - from microdot import Microdot - - app = Microdot() - """ - - def __init__(self): - self.url_map = [] - self.before_request_handlers = [] - self.after_request_handlers = [] - self.after_error_request_handlers = [] - self.error_handlers = {} - self.shutdown_requested = False - self.options_handler = self.default_options_handler - self.debug = False - self.server = None - - def route(self, url_pattern, methods=None): - """Decorator that is used to register a function as a request handler - for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - :param methods: The list of HTTP methods to be handled by the - decorated function. If omitted, only ``GET`` requests - are handled. - - The URL pattern can be a static path (for example, ``/users`` or - ``/api/invoices/search``) or a path with dynamic components enclosed - in ``<`` and ``>`` (for example, ``/users/`` or - ``/invoices//products``). Dynamic path components can also - include a type prefix, separated from the name with a colon (for - example, ``/users/``). The type can be ``string`` (the - default), ``int``, ``path`` or ``re:[regular-expression]``. - - The first argument of the decorated function must be - the request object. Any path arguments that are specified in the URL - pattern are passed as keyword arguments. The return value of the - function must be a :class:`Response` instance, or the arguments to - be passed to this class. - - Example:: - - @app.route('/') - def index(request): - return 'Hello, world!' - """ - def decorated(f): - self.url_map.append( - ([m.upper() for m in (methods or ['GET'])], - URLPattern(url_pattern), f)) - return f - return decorated - - def get(self, url_pattern): - """Decorator that is used to register a function as a ``GET`` request - handler for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - - This decorator can be used as an alias to the ``route`` decorator with - ``methods=['GET']``. - - Example:: - - @app.get('/users/') - def get_user(request, id): - # ... - """ - return self.route(url_pattern, methods=['GET']) - - def post(self, url_pattern): - """Decorator that is used to register a function as a ``POST`` request - handler for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - - This decorator can be used as an alias to the``route`` decorator with - ``methods=['POST']``. - - Example:: - - @app.post('/users') - def create_user(request): - # ... - """ - return self.route(url_pattern, methods=['POST']) - - def put(self, url_pattern): - """Decorator that is used to register a function as a ``PUT`` request - handler for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - - This decorator can be used as an alias to the ``route`` decorator with - ``methods=['PUT']``. - - Example:: - - @app.put('/users/') - def edit_user(request, id): - # ... - """ - return self.route(url_pattern, methods=['PUT']) - - def patch(self, url_pattern): - """Decorator that is used to register a function as a ``PATCH`` request - handler for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - - This decorator can be used as an alias to the ``route`` decorator with - ``methods=['PATCH']``. - - Example:: - - @app.patch('/users/') - def edit_user(request, id): - # ... - """ - return self.route(url_pattern, methods=['PATCH']) - - def delete(self, url_pattern): - """Decorator that is used to register a function as a ``DELETE`` - request handler for a given URL. - - :param url_pattern: The URL pattern that will be compared against - incoming requests. - - This decorator can be used as an alias to the ``route`` decorator with - ``methods=['DELETE']``. - - Example:: - - @app.delete('/users/') - def delete_user(request, id): - # ... - """ - return self.route(url_pattern, methods=['DELETE']) - - def before_request(self, f): - """Decorator to register a function to run before each request is - handled. The decorated function must take a single argument, the - request object. - - Example:: - - @app.before_request - def func(request): - # ... - """ - self.before_request_handlers.append(f) - return f - - def after_request(self, f): - """Decorator to register a function to run after each request is - handled. The decorated function must take two arguments, the request - and response objects. The return value of the function must be an - updated response object. - - Example:: - - @app.after_request - def func(request, response): - # ... - return response - """ - self.after_request_handlers.append(f) - return f - - def after_error_request(self, f): - """Decorator to register a function to run after an error response is - generated. The decorated function must take two arguments, the request - and response objects. The return value of the function must be an - updated response object. The handler is invoked for error responses - generated by Microdot, as well as those returned by application-defined - error handlers. - - Example:: - - @app.after_error_request - def func(request, response): - # ... - return response - """ - self.after_error_request_handlers.append(f) - return f - - def errorhandler(self, status_code_or_exception_class): - """Decorator to register a function as an error handler. Error handler - functions for numeric HTTP status codes must accept a single argument, - the request object. Error handler functions for Python exceptions - must accept two arguments, the request object and the exception - object. - - :param status_code_or_exception_class: The numeric HTTP status code or - Python exception class to - handle. - - Examples:: - - @app.errorhandler(404) - def not_found(request): - return 'Not found' - - @app.errorhandler(RuntimeError) - def runtime_error(request, exception): - return 'Runtime error' - """ - def decorated(f): - self.error_handlers[status_code_or_exception_class] = f - return f - return decorated - - def mount(self, subapp, url_prefix=''): - """Mount a sub-application, optionally under the given URL prefix. - - :param subapp: The sub-application to mount. - :param url_prefix: The URL prefix to mount the application under. - """ - for methods, pattern, handler in subapp.url_map: - self.url_map.append( - (methods, URLPattern(url_prefix + pattern.url_pattern), - handler)) - for handler in subapp.before_request_handlers: - self.before_request_handlers.append(handler) - for handler in subapp.after_request_handlers: - self.after_request_handlers.append(handler) - for handler in subapp.after_error_request_handlers: - self.after_error_request_handlers.append(handler) - for status_code, handler in subapp.error_handlers.items(): - self.error_handlers[status_code] = handler - - @staticmethod - def abort(status_code, reason=None): - """Abort the current request and return an error response with the - given status code. - - :param status_code: The numeric status code of the response. - :param reason: The reason for the response, which is included in the - response body. - - Example:: - - from microdot import abort - - @app.route('/users/') - def get_user(id): - user = get_user_by_id(id) - if user is None: - abort(404) - return user.to_dict() - """ - raise HTTPException(status_code, reason) - - async def start_server(self, host='0.0.0.0', port=5000, debug=False, - ssl=None): - """Start the Microdot web server as a coroutine. This coroutine does - not normally return, as the server enters an endless listening loop. - The :func:`shutdown` function provides a method for terminating the - server gracefully. - - :param host: The hostname or IP address of the network interface that - will be listening for requests. A value of ``'0.0.0.0'`` - (the default) indicates that the server should listen for - requests on all the available interfaces, and a value of - ``127.0.0.1`` indicates that the server should listen - for requests only on the internal networking interface of - the host. - :param port: The port number to listen for requests. The default is - port 5000. - :param debug: If ``True``, the server logs debugging information. The - default is ``False``. - :param ssl: An ``SSLContext`` instance or ``None`` if the server should - not use TLS. The default is ``None``. - - This method is a coroutine. - - Example:: - - import asyncio - from microdot import Microdot - - app = Microdot() - - @app.route('/') - async def index(request): - return 'Hello, world!' - - async def main(): - await app.start_server(debug=True) - - asyncio.run(main()) - """ - self.debug = debug - - async def serve(reader, writer): - if not hasattr(writer, 'awrite'): # pragma: no cover - # CPython provides the awrite and aclose methods in 3.8+ - async def awrite(self, data): - self.write(data) - await self.drain() - - async def aclose(self): - self.close() - await self.wait_closed() - - from types import MethodType - writer.awrite = MethodType(awrite, writer) - writer.aclose = MethodType(aclose, writer) - - await self.handle_request(reader, writer) - - if self.debug: # pragma: no cover - print('Starting async server on {host}:{port}...'.format( - host=host, port=port)) - - try: - self.server = await asyncio.start_server(serve, host, port, - ssl=ssl) - except TypeError: # pragma: no cover - self.server = await asyncio.start_server(serve, host, port) - - while True: - try: - if hasattr(self.server, 'serve_forever'): # pragma: no cover - try: - await self.server.serve_forever() - except asyncio.CancelledError: - pass - await self.server.wait_closed() - break - except AttributeError: # pragma: no cover - # the task hasn't been initialized in the server object yet - # wait a bit and try again - await asyncio.sleep(0.1) - - def run(self, host='0.0.0.0', port=5000, debug=False, ssl=None): - """Start the web server. This function does not normally return, as - the server enters an endless listening loop. The :func:`shutdown` - function provides a method for terminating the server gracefully. - - :param host: The hostname or IP address of the network interface that - will be listening for requests. A value of ``'0.0.0.0'`` - (the default) indicates that the server should listen for - requests on all the available interfaces, and a value of - ``127.0.0.1`` indicates that the server should listen - for requests only on the internal networking interface of - the host. - :param port: The port number to listen for requests. The default is - port 5000. - :param debug: If ``True``, the server logs debugging information. The - default is ``False``. - :param ssl: An ``SSLContext`` instance or ``None`` if the server should - not use TLS. The default is ``None``. - - Example:: - - from microdot import Microdot - - app = Microdot() - - @app.route('/') - async def index(request): - return 'Hello, world!' - - app.run(debug=True) - """ - asyncio.run(self.start_server(host=host, port=port, debug=debug, - ssl=ssl)) # pragma: no cover - - def shutdown(self): - """Request a server shutdown. The server will then exit its request - listening loop and the :func:`run` function will return. This function - can be safely called from a route handler, as it only schedules the - server to terminate as soon as the request completes. - - Example:: - - @app.route('/shutdown') - def shutdown(request): - request.app.shutdown() - return 'The server is shutting down...' - """ - self.server.close() - - def find_route(self, req): - method = req.method.upper() - if method == 'OPTIONS' and self.options_handler: - return self.options_handler(req) - if method == 'HEAD': - method = 'GET' - f = 404 - for route_methods, route_pattern, route_handler in self.url_map: - req.url_args = route_pattern.match(req.path) - if req.url_args is not None: - if method in route_methods: - f = route_handler - break - else: - f = 405 - return f - - def default_options_handler(self, req): - allow = [] - for route_methods, route_pattern, route_handler in self.url_map: - if route_pattern.match(req.path) is not None: - allow.extend(route_methods) - if 'GET' in allow: - allow.append('HEAD') - allow.append('OPTIONS') - return {'Allow': ', '.join(allow)} - - async def handle_request(self, reader, writer): - req = None - try: - req = await Request.create(self, reader, writer, - writer.get_extra_info('peername')) - except Exception as exc: # pragma: no cover - print_exception(exc) - - res = await self.dispatch_request(req) - if res != Response.already_handled: # pragma: no branch - await res.write(writer) - try: - await writer.aclose() - except OSError as exc: # pragma: no cover - if exc.errno in MUTED_SOCKET_ERRORS: - pass - else: - raise - if self.debug and req: # pragma: no cover - print('{method} {path} {status_code}'.format( - method=req.method, path=req.path, - status_code=res.status_code)) - - async def dispatch_request(self, req): - after_request_handled = False - if req: - if req.content_length > req.max_content_length: - if 413 in self.error_handlers: - res = await invoke_handler(self.error_handlers[413], req) - else: - res = 'Payload too large', 413 - else: - f = self.find_route(req) - try: - res = None - if callable(f): - for handler in self.before_request_handlers: - res = await invoke_handler(handler, req) - if res: - break - if res is None: - res = await invoke_handler( - f, req, **req.url_args) - if isinstance(res, int): - res = '', res - if isinstance(res, tuple): - if isinstance(res[0], int): - res = ('', res[0], - res[1] if len(res) > 1 else {}) - body = res[0] - if isinstance(res[1], int): - status_code = res[1] - headers = res[2] if len(res) > 2 else {} - else: - status_code = 200 - headers = res[1] - res = Response(body, status_code, headers) - elif not isinstance(res, Response): - res = Response(res) - for handler in self.after_request_handlers: - res = await invoke_handler( - handler, req, res) or res - for handler in req.after_request_handlers: - res = await invoke_handler( - handler, req, res) or res - after_request_handled = True - elif isinstance(f, dict): - res = Response(headers=f) - elif f in self.error_handlers: - res = await invoke_handler(self.error_handlers[f], req) - else: - res = 'Not found', f - except HTTPException as exc: - if exc.status_code in self.error_handlers: - res = self.error_handlers[exc.status_code](req) - else: - res = exc.reason, exc.status_code - except Exception as exc: - print_exception(exc) - exc_class = None - res = None - if exc.__class__ in self.error_handlers: - exc_class = exc.__class__ - else: - for c in mro(exc.__class__)[1:]: - if c in self.error_handlers: - exc_class = c - break - if exc_class: - try: - res = await invoke_handler( - self.error_handlers[exc_class], req, exc) - except Exception as exc2: # pragma: no cover - print_exception(exc2) - if res is None: - if 500 in self.error_handlers: - res = await invoke_handler( - self.error_handlers[500], req) - else: - res = 'Internal server error', 500 - else: - if 400 in self.error_handlers: - res = await invoke_handler(self.error_handlers[400], req) - else: - res = 'Bad request', 400 - if isinstance(res, tuple): - res = Response(*res) - elif not isinstance(res, Response): - res = Response(res) - if not after_request_handled: - for handler in self.after_error_request_handlers: - res = await invoke_handler( - handler, req, res) or res - res.is_head = (req and req.method == 'HEAD') - return res - - -Response.already_handled = Response() - -abort = Microdot.abort -redirect = Response.redirect -send_file = Response.send_file \ No newline at end of file diff --git a/lib/microdot/utemplate.py b/lib/microdot/utemplate.py deleted file mode 100644 index 16d0398..0000000 --- a/lib/microdot/utemplate.py +++ /dev/null @@ -1,70 +0,0 @@ -from utemplate import recompile - -_loader = None - - -class Template: - """A template object. - - :param template: The filename of the template to render, relative to the - configured template directory. - """ - @classmethod - def initialize(cls, template_dir='templates', - loader_class=recompile.Loader): - """Initialize the templating subsystem. - - :param template_dir: the directory where templates are stored. This - argument is optional. The default is to load - templates from a *templates* subdirectory. - :param loader_class: the ``utemplate.Loader`` class to use when loading - templates. This argument is optional. The default - is the ``recompile.Loader`` class, which - automatically recompiles templates when they - change. - """ - global _loader - _loader = loader_class(None, template_dir) - - def __init__(self, template): - if _loader is None: # pragma: no cover - self.initialize() - #: The name of the template - self.name = template - self.template = _loader.load(template) - - def generate(self, *args, **kwargs): - """Return a generator that renders the template in chunks, with the - given arguments.""" - return self.template(*args, **kwargs) - - def render(self, *args, **kwargs): - """Render the template with the given arguments and return it as a - string.""" - return ''.join(self.generate(*args, **kwargs)) - - def generate_async(self, *args, **kwargs): - """Return an asynchronous generator that renders the template in - chunks, using the given arguments.""" - class sync_to_async_iter(): - def __init__(self, iter): - self.iter = iter - - def __aiter__(self): - return self - - async def __anext__(self): - try: - return next(self.iter) - except StopIteration: - raise StopAsyncIteration - - return sync_to_async_iter(self.generate(*args, **kwargs)) - - async def render_async(self, *args, **kwargs): - """Render the template with the given arguments asynchronously and - return it as a string.""" - response = '' - async for chunk in self.generate_async(*args, **kwargs): - response += chunk - return response diff --git a/lib/microdot/websocket.py b/lib/microdot/websocket.py deleted file mode 100644 index 0fb6f7c..0000000 --- a/lib/microdot/websocket.py +++ /dev/null @@ -1,231 +0,0 @@ -import binascii -import hashlib -from microdot import Request, Response -from microdot.microdot import MUTED_SOCKET_ERRORS, print_exception -from microdot.helpers import wraps - - -class WebSocketError(Exception): - """Exception raised when an error occurs in a WebSocket connection.""" - pass - - -class WebSocket: - """A WebSocket connection object. - - An instance of this class is sent to handler functions to manage the - WebSocket connection. - """ - CONT = 0 - TEXT = 1 - BINARY = 2 - CLOSE = 8 - PING = 9 - PONG = 10 - - #: Specify the maximum message size that can be received when calling the - #: ``receive()`` method. Messages with payloads that are larger than this - #: size will be rejected and the connection closed. Set to 0 to disable - #: the size check (be aware of potential security issues if you do this), - #: or to -1 to use the value set in - #: ``Request.max_body_length``. The default is -1. - #: - #: Example:: - #: - #: WebSocket.max_message_length = 4 * 1024 # up to 4KB messages - max_message_length = -1 - - def __init__(self, request): - self.request = request - self.closed = False - - async def handshake(self): - response = self._handshake_response() - await self.request.sock[1].awrite( - b'HTTP/1.1 101 Switching Protocols\r\n') - await self.request.sock[1].awrite(b'Upgrade: websocket\r\n') - await self.request.sock[1].awrite(b'Connection: Upgrade\r\n') - await self.request.sock[1].awrite( - b'Sec-WebSocket-Accept: ' + response + b'\r\n\r\n') - - async def receive(self): - """Receive a message from the client.""" - while True: - opcode, payload = await self._read_frame() - send_opcode, data = self._process_websocket_frame(opcode, payload) - if send_opcode: # pragma: no cover - await self.send(data, send_opcode) - elif data: # pragma: no branch - return data - - async def send(self, data, opcode=None): - """Send a message to the client. - - :param data: the data to send, given as a string or bytes. - :param opcode: a custom frame opcode to use. If not given, the opcode - is ``TEXT`` or ``BINARY`` depending on the type of the - data. - """ - frame = self._encode_websocket_frame( - opcode or (self.TEXT if isinstance(data, str) else self.BINARY), - data) - await self.request.sock[1].awrite(frame) - - async def close(self): - """Close the websocket connection.""" - if not self.closed: # pragma: no cover - self.closed = True - await self.send(b'', self.CLOSE) - - def _handshake_response(self): - connection = False - upgrade = False - websocket_key = None - for header, value in self.request.headers.items(): - h = header.lower() - if h == 'connection': - connection = True - if 'upgrade' not in value.lower(): - return self.request.app.abort(400) - elif h == 'upgrade': - upgrade = True - if not value.lower() == 'websocket': - return self.request.app.abort(400) - elif h == 'sec-websocket-key': - websocket_key = value - if not connection or not upgrade or not websocket_key: - return self.request.app.abort(400) - d = hashlib.sha1(websocket_key.encode()) - d.update(b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11') - return binascii.b2a_base64(d.digest())[:-1] - - @classmethod - def _parse_frame_header(cls, header): - fin = header[0] & 0x80 - opcode = header[0] & 0x0f - if fin == 0 or opcode == cls.CONT: # pragma: no cover - raise WebSocketError('Continuation frames not supported') - has_mask = header[1] & 0x80 - length = header[1] & 0x7f - if length == 126: - length = -2 - elif length == 127: - length = -8 - return fin, opcode, has_mask, length - - def _process_websocket_frame(self, opcode, payload): - if opcode == self.TEXT: - payload = payload.decode() - elif opcode == self.BINARY: - pass - elif opcode == self.CLOSE: - raise WebSocketError('Websocket connection closed') - elif opcode == self.PING: - return self.PONG, payload - elif opcode == self.PONG: # pragma: no branch - return None, None - return None, payload - - @classmethod - def _encode_websocket_frame(cls, opcode, payload): - frame = bytearray() - frame.append(0x80 | opcode) - if opcode == cls.TEXT: - payload = payload.encode() - if len(payload) < 126: - frame.append(len(payload)) - elif len(payload) < (1 << 16): - frame.append(126) - frame.extend(len(payload).to_bytes(2, 'big')) - else: - frame.append(127) - frame.extend(len(payload).to_bytes(8, 'big')) - frame.extend(payload) - return frame - - async def _read_frame(self): - header = await self.request.sock[0].read(2) - if len(header) != 2: # pragma: no cover - raise WebSocketError('Websocket connection closed') - fin, opcode, has_mask, length = self._parse_frame_header(header) - if length == -2: - length = await self.request.sock[0].read(2) - length = int.from_bytes(length, 'big') - elif length == -8: - length = await self.request.sock[0].read(8) - length = int.from_bytes(length, 'big') - max_allowed_length = Request.max_body_length \ - if self.max_message_length == -1 else self.max_message_length - if length > max_allowed_length: - raise WebSocketError('Message too large') - if has_mask: # pragma: no cover - mask = await self.request.sock[0].read(4) - payload = await self.request.sock[0].read(length) - if has_mask: # pragma: no cover - payload = bytes(x ^ mask[i % 4] for i, x in enumerate(payload)) - return opcode, payload - - -async def websocket_upgrade(request): - """Upgrade a request handler to a websocket connection. - - This function can be called directly inside a route function to process a - WebSocket upgrade handshake, for example after the user's credentials are - verified. The function returns the websocket object:: - - @app.route('/echo') - async def echo(request): - if not authenticate_user(request): - abort(401) - ws = await websocket_upgrade(request) - while True: - message = await ws.receive() - await ws.send(message) - """ - ws = WebSocket(request) - await ws.handshake() - - @request.after_request - async def after_request(request, response): - return Response.already_handled - - return ws - - -def websocket_wrapper(f, upgrade_function): - @wraps(f) - async def wrapper(request, *args, **kwargs): - ws = await upgrade_function(request) - try: - await f(request, ws, *args, **kwargs) - except OSError as exc: - if exc.errno not in MUTED_SOCKET_ERRORS: # pragma: no cover - raise - except WebSocketError: - pass - except Exception as exc: - print_exception(exc) - finally: # pragma: no cover - try: - await ws.close() - except Exception: - pass - return Response.already_handled - return wrapper - - -def with_websocket(f): - """Decorator to make a route a WebSocket endpoint. - - This decorator is used to define a route that accepts websocket - connections. The route then receives a websocket object as a second - argument that it can use to send and receive messages:: - - @app.route('/echo') - @with_websocket - async def echo(request, ws): - while True: - message = await ws.receive() - await ws.send(message) - """ - return websocket_wrapper(f, websocket_upgrade) diff --git a/lib/utemplate/__init__.py b/lib/utemplate/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/lib/utemplate/compiled.py b/lib/utemplate/compiled.py deleted file mode 100644 index 006e6f5..0000000 --- a/lib/utemplate/compiled.py +++ /dev/null @@ -1,14 +0,0 @@ -class Loader: - - def __init__(self, pkg, dir): - if dir == ".": - dir = "" - else: - dir = dir.replace("/", ".") + "." - if pkg and pkg != "__main__": - dir = pkg + "." + dir - self.p = dir - - def load(self, name): - name = name.replace(".", "_") - return __import__(self.p + name, None, None, (name,)).render \ No newline at end of file diff --git a/lib/utemplate/recompile.py b/lib/utemplate/recompile.py deleted file mode 100644 index b9bae4e..0000000 --- a/lib/utemplate/recompile.py +++ /dev/null @@ -1,21 +0,0 @@ -# (c) 2014-2020 Paul Sokolovsky. MIT license. -try: - from uos import stat, remove -except: - from os import stat, remove -from . import source - - -class Loader(source.Loader): - - def load(self, name): - o_path = self.pkg_path + self.compiled_path(name) - i_path = self.pkg_path + self.dir + "/" + name - try: - o_stat = stat(o_path) - i_stat = stat(i_path) - if i_stat[8] > o_stat[8]: - # input file is newer, remove output to force recompile - remove(o_path) - finally: - return super().load(name) \ No newline at end of file diff --git a/lib/utemplate/source.py b/lib/utemplate/source.py deleted file mode 100644 index 0ff4651..0000000 --- a/lib/utemplate/source.py +++ /dev/null @@ -1,188 +0,0 @@ -# (c) 2014-2019 Paul Sokolovsky. MIT license. -from . import compiled - - -class Compiler: - - START_CHAR = "{" - STMNT = "%" - STMNT_END = "%}" - EXPR = "{" - EXPR_END = "}}" - - def __init__(self, file_in, file_out, indent=0, seq=0, loader=None): - self.file_in = file_in - self.file_out = file_out - self.loader = loader - self.seq = seq - self._indent = indent - self.stack = [] - self.in_literal = False - self.flushed_header = False - self.args = "*a, **d" - - def indent(self, adjust=0): - if not self.flushed_header: - self.flushed_header = True - self.indent() - self.file_out.write("def render%s(%s):\n" % (str(self.seq) if self.seq else "", self.args)) - self.stack.append("def") - self.file_out.write(" " * (len(self.stack) + self._indent + adjust)) - - def literal(self, s): - if not s: - return - if not self.in_literal: - self.indent() - self.file_out.write('yield """') - self.in_literal = True - self.file_out.write(s.replace('"', '\\"')) - - def close_literal(self): - if self.in_literal: - self.file_out.write('"""\n') - self.in_literal = False - - def render_expr(self, e): - self.indent() - self.file_out.write('yield str(' + e + ')\n') - - def parse_statement(self, stmt): - tokens = stmt.split(None, 1) - if tokens[0] == "args": - if len(tokens) > 1: - self.args = tokens[1] - else: - self.args = "" - elif tokens[0] == "set": - self.indent() - self.file_out.write(stmt[3:].strip() + "\n") - elif tokens[0] == "include": - if not self.flushed_header: - # If there was no other output, we still need a header now - self.indent() - tokens = tokens[1].split(None, 1) - args = "" - if len(tokens) > 1: - args = tokens[1] - if tokens[0][0] == "{": - self.indent() - # "1" as fromlist param is uPy hack - self.file_out.write('_ = __import__(%s.replace(".", "_"), None, None, 1)\n' % tokens[0][2:-2]) - self.indent() - self.file_out.write("yield from _.render(%s)\n" % args) - return - - with self.loader.input_open(tokens[0][1:-1]) as inc: - self.seq += 1 - c = Compiler(inc, self.file_out, len(self.stack) + self._indent, self.seq) - inc_id = self.seq - self.seq = c.compile() - self.indent() - self.file_out.write("yield from render%d(%s)\n" % (inc_id, args)) - elif len(tokens) > 1: - if tokens[0] == "elif": - assert self.stack[-1] == "if" - self.indent(-1) - self.file_out.write(stmt + ":\n") - else: - self.indent() - self.file_out.write(stmt + ":\n") - self.stack.append(tokens[0]) - else: - if stmt.startswith("end"): - assert self.stack[-1] == stmt[3:] - self.stack.pop(-1) - elif stmt == "else": - assert self.stack[-1] == "if" - self.indent(-1) - self.file_out.write("else:\n") - else: - assert False - - def parse_line(self, l): - while l: - start = l.find(self.START_CHAR) - if start == -1: - self.literal(l) - return - self.literal(l[:start]) - self.close_literal() - sel = l[start + 1] - #print("*%s=%s=" % (sel, EXPR)) - if sel == self.STMNT: - end = l.find(self.STMNT_END) - assert end > 0 - stmt = l[start + len(self.START_CHAR + self.STMNT):end].strip() - self.parse_statement(stmt) - end += len(self.STMNT_END) - l = l[end:] - if not self.in_literal and l == "\n": - break - elif sel == self.EXPR: - # print("EXPR") - end = l.find(self.EXPR_END) - assert end > 0 - expr = l[start + len(self.START_CHAR + self.EXPR):end].strip() - self.render_expr(expr) - end += len(self.EXPR_END) - l = l[end:] - else: - self.literal(l[start]) - l = l[start + 1:] - - def header(self): - self.file_out.write("# Autogenerated file\n") - - def compile(self): - self.header() - for l in self.file_in: - self.parse_line(l) - self.close_literal() - return self.seq - - -class Loader(compiled.Loader): - - def __init__(self, pkg, dir): - super().__init__(pkg, dir) - self.dir = dir - if pkg == "__main__": - # if pkg isn't really a package, don't bother to use it - # it means we're running from "filesystem directory", not - # from a package. - pkg = None - - self.pkg_path = "" - if pkg: - p = __import__(pkg) - if isinstance(p.__path__, str): - # uPy - self.pkg_path = p.__path__ - else: - # CPy - self.pkg_path = p.__path__[0] - self.pkg_path += "/" - - def input_open(self, template): - path = self.pkg_path + self.dir + "/" + template - return open(path) - - def compiled_path(self, template): - return self.dir + "/" + template.replace(".", "_") + ".py" - - def load(self, name): - try: - return super().load(name) - except (OSError, ImportError): - pass - - compiled_path = self.pkg_path + self.compiled_path(name) - - f_in = self.input_open(name) - f_out = open(compiled_path, "w") - c = Compiler(f_in, f_out, loader=self) - c.compile() - f_in.close() - f_out.close() - return super().load(name) \ No newline at end of file diff --git a/src/boot.py b/src/boot.py deleted file mode 100644 index 43c56e5..0000000 --- a/src/boot.py +++ /dev/null @@ -1,9 +0,0 @@ -import settings -import wifi -from settings import Settings - -s = Settings() - -name = s.get('name', 'led') -password = s.get("ap_password", "") -wifi.ap(name, password) diff --git a/src/p2p.py b/src/p2p.py deleted file mode 100644 index d73636c..0000000 --- a/src/p2p.py +++ /dev/null @@ -1,20 +0,0 @@ -import asyncio -import aioespnow -import json - -async def p2p(settings, patterns): - e = aioespnow.AIOESPNow() # Returns AIOESPNow enhanced with async support - e.active(True) - async for mac, msg in e: - try: - data = json.loads(msg) - except: - print(f"Failed to load espnow data {msg}") - continue - print(data) - if "names" not in data or settings.get("name") in data.get("names", []): - if "step" in settings and isinstance(settings["step"], int): - patterns.set_pattern_step(settings["step"]) - else: - settings.set_settings(data.get("settings", {}), patterns, data.get("save", False)) - print("should not print") diff --git a/src/static/main.css b/src/static/main.css deleted file mode 100644 index 15226a5..0000000 --- a/src/static/main.css +++ /dev/null @@ -1,109 +0,0 @@ -body { - font-family: Arial, sans-serif; - max-width: 600px; - margin: 0 auto; - padding: 20px; - line-height: 1.6; -} -h1 { - text-align: center; -} - -form { - margin-bottom: 20px; -} -label { - display: block; - margin-bottom: 5px; -} -input[type="text"], -input[type="submit"], -input[type="range"], -input[type="color"] { - width: 100%; - - margin-bottom: 10px; - box-sizing: border-box; -} -input[type="range"] { - -webkit-appearance: none; - appearance: none; - height: 25px; - background: #d3d3d3; - outline: none; - opacity: 0.7; - transition: opacity 0.2s; -} -input[type="range"]:hover { - opacity: 1; -} -input[type="range"]::-webkit-slider-thumb { - -webkit-appearance: none; - appearance: none; - width: 25px; - height: 25px; - background: #4caf50; - cursor: pointer; - border-radius: 50%; -} -input[type="range"]::-moz-range-thumb { - width: 25px; - height: 25px; - background: #4caf50; - cursor: pointer; - border-radius: 50%; -} -#pattern_buttons { - display: flex; - flex-wrap: wrap; - gap: 10px; - margin-bottom: 20px; -} -#pattern_buttons button { - flex: 1 0 calc(33.333% - 10px); - padding: 10px; - background-color: #4caf50; - color: white; - border: none; - cursor: pointer; - transition: background-color 0.3s; -} -#pattern_buttons button:hover { - background-color: #45a049; -} -@media (max-width: 480px) { - #pattern_buttons button { - flex: 1 0 calc(50% - 10px); - } -} -#connection-status { - width: 15px; - height: 15px; - border-radius: 50%; - display: inline-block; /* Or block, depending on where you put it */ - margin-left: 10px; /* Adjust spacing as needed */ - vertical-align: middle; /* Align with nearby text */ - background-color: grey; /* Default: Unknown */ -} - -#connection-status.connecting { - background-color: yellow; -} - -#connection-status.open { - background-color: green; -} - -#connection-status.closing, -#connection-status.closed { - background-color: red; -} - -#color_order_form label, -#color_order_form input[type="radio"] { - /* Ensures they behave as inline elements */ - display: inline-block; - /* Adds some space between them for readability */ - margin-right: 10px; - vertical-align: middle; /* Aligns them nicely if heights vary */ -} diff --git a/src/static/main.js b/src/static/main.js deleted file mode 100644 index 9e00994..0000000 --- a/src/static/main.js +++ /dev/null @@ -1,244 +0,0 @@ -let delayTimeout; -let brightnessTimeout; -let colorTimeout; -let color2Timeout; -let ws; // Variable to hold the WebSocket connection -let connectionStatusElement; // Variable to hold the connection status element - -// Function to update the connection status indicator -function updateConnectionStatus(status) { - if (!connectionStatusElement) { - connectionStatusElement = document.getElementById("connection-status"); - } - if (connectionStatusElement) { - connectionStatusElement.className = ""; // Clear existing classes - connectionStatusElement.classList.add(status); - // Optionally, you could also update text content based on status - // connectionStatusElement.textContent = status.charAt(0).toUpperCase() + status.slice(1); - } -} - -// Function to establish WebSocket connection -function connectWebSocket() { - // Determine the WebSocket URL based on the current location - const wsUrl = `ws://${window.location.host}/ws`; - ws = new WebSocket(wsUrl); - - updateConnectionStatus("connecting"); // Indicate connecting state - - ws.onopen = function (event) { - console.log("WebSocket connection opened:", event); - updateConnectionStatus("open"); // Indicate open state - // Optionally, you could send an initial message here - }; - - ws.onmessage = function (event) { - console.log("WebSocket message received:", event.data); - }; - - ws.onerror = function (event) { - console.error("WebSocket error:", event); - updateConnectionStatus("closed"); // Indicate error state (treat as closed) - }; - - ws.onclose = function (event) { - if (event.wasClean) { - console.log( - `WebSocket connection closed cleanly, code=${event.code}, reason=${event.reason}`, - ); - updateConnectionStatus("closed"); // Indicate closed state - } else { - console.error("WebSocket connection died"); - updateConnectionStatus("closed"); // Indicate closed state - } - // Attempt to reconnect after a delay - setTimeout(connectWebSocket, 1000); - }; -} - -// Function to send data over WebSocket -function sendWebSocketData(data) { - if (ws && ws.readyState === WebSocket.OPEN) { - console.log("Sending data over WebSocket:", data); - ws.send(JSON.stringify(data)); - } else { - console.error("WebSocket is not connected. Cannot send data:", data); - // You might want to queue messages or handle this in a different way - } -} - -// Keep the post and get functions for now, they might still be useful -async function post(path, data) { - console.log(`POST to ${path}`, data); - try { - const response = await fetch(path, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(data), - }); - if (!response.ok) { - throw new Error(`HTTP error! Status: ${response.status}`); - } - } catch (error) { - console.error("Error during POST request:", error); - } -} - -async function get(path) { - try { - const response = await fetch(path); - if (!response.ok) { - throw new Error(`HTTP error! Status: ${response.status}`); - } - return await response.json(); - } catch (error) { - console.error("Error during GET request:", error); - } -} - -async function updateColor(event) { - event.preventDefault(); - clearTimeout(colorTimeout); - colorTimeout = setTimeout(function () { - const color = document.getElementById("color").value; - sendWebSocketData({ color1: color }); - }, 500); -} - -async function updateColor2(event) { - event.preventDefault(); - clearTimeout(color2Timeout); - color2Timeout = setTimeout(function () { - const color = document.getElementById("color2").value; - sendWebSocketData({ color2: color }); - }, 500); -} - -async function updatePattern(pattern) { - sendWebSocketData({ pattern: pattern }); -} - -async function updateBrightness(event) { - event.preventDefault(); - clearTimeout(brightnessTimeout); - brightnessTimeout = setTimeout(function () { - const brightness = document.getElementById("brightness").value; - sendWebSocketData({ brightness: brightness }); - }, 500); -} - -async function updateDelay(event) { - event.preventDefault(); - clearTimeout(delayTimeout); - delayTimeout = setTimeout(function () { - const delay = document.getElementById("delay").value; - sendWebSocketData({ delay: delay }); - }, 500); -} - -async function updateNumLeds(event) { - event.preventDefault(); - const numLeds = document.getElementById("num_leds").value; - sendWebSocketData({ num_leds: parseInt(numLeds) }); -} - -async function updateName(event) { - event.preventDefault(); - const name = document.getElementById("name").value; - sendWebSocketData({ name: name }); -} - -async function updateID(event) { - event.preventDefault(); - const id = document.getElementById("id").value; - sendWebSocketData({ id: parseInt(id) }); -} - -async function updateLedPin(event) { - event.preventDefault(); - const ledpin = document.getElementById("led_pin").value; - sendWebSocketData({ led_pin: parseInt(ledpin) }); -} - -function handleRadioChange(event) { - event.preventDefault(); - console.log("Selected color order:", event.target.value); - // Add your specific logic here - if (event.target.value === "rgb") { - console.log("RGB order selected!"); - } else if (event.target.value === "rbg") { - console.log("RBG order selected!"); - } - sendWebSocketData({ color_order: event.target.value }); -} - -function createPatternButtons(patterns) { - const container = document.getElementById("pattern_buttons"); - container.innerHTML = ""; // Clear previous buttons - - patterns.forEach((pattern) => { - const button = document.createElement("button"); - button.type = "button"; - button.textContent = pattern; - button.value = pattern; - button.addEventListener("click", async function (event) { - event.preventDefault(); - await updatePattern(pattern); - }); - container.appendChild(button); - }); -} - -document.addEventListener("DOMContentLoaded", async function () { - // Get the connection status element once the DOM is ready - connectionStatusElement = document.getElementById("connection-status"); - - // Establish WebSocket connection on page load - connectWebSocket(); - - document.getElementById("color").addEventListener("input", updateColor); - document.getElementById("color2").addEventListener("input", updateColor2); - document.getElementById("delay").addEventListener("input", updateDelay); - document - .getElementById("brightness") - .addEventListener("input", updateBrightness); - document - .getElementById("num_leds_form") - .addEventListener("submit", updateNumLeds); - document.getElementById("name_form").addEventListener("submit", updateName); - document.getElementById("id_form").addEventListener("submit", updateID); - document - .getElementById("led_pin_form") - .addEventListener("submit", updateLedPin); - document.getElementById("delay").addEventListener("touchend", updateDelay); - document - .getElementById("brightness") - .addEventListener("touchend", updateBrightness); - - document.getElementById("rgb").addEventListener("change", handleRadioChange); - document.getElementById("rbg").addEventListener("change", handleRadioChange); - document.querySelectorAll(".pattern_button").forEach((button) => { - console.log(button.value); - button.addEventListener("click", async (event) => { - event.preventDefault(); - await updatePattern(button.value); - }); - }); -}); - -// Function to toggle the display of the settings menu -function selectSettings() { - const settingsMenu = document.getElementById("settings_menu"); - controls = document.getElementById("controls"); - settingsMenu.style.display = "block"; - controls.style.display = "none"; -} - -function selectControls() { - const settingsMenu = document.getElementById("settings_menu"); - controls = document.getElementById("controls"); - settingsMenu.style.display = "none"; - controls.style.display = "block"; -} diff --git a/src/templates/index.html b/src/templates/index.html deleted file mode 100644 index 4e1a112..0000000 --- a/src/templates/index.html +++ /dev/null @@ -1,124 +0,0 @@ -{% args settings, patterns, mac %} - - - - - - {{settings['name']}} - - - - -

{{settings['name']}}

- - - - -
-
- {% for p in patterns %} - - {% endfor %} - - -
-
- - -
-
- - -
-
- -
-
- -
-
- - - - -
- - diff --git a/src/web.py b/src/web.py deleted file mode 100644 index a66c4f0..0000000 --- a/src/web.py +++ /dev/null @@ -1,43 +0,0 @@ -from microdot import Microdot, send_file, Response -from microdot.utemplate import Template -from microdot.websocket import with_websocket -import machine -import wifi -import json - -def web(settings, patterns): - app = Microdot() - Response.default_content_type = 'text/html' - - @app.route('/') - async def index_hnadler(request): - mac = wifi.get_mac().hex() - return Template('/index.html').render(settings=settings, patterns=patterns.patterns.keys(), mac=mac) - - @app.route("/static/") - def static_handler(request, path): - if '..' in path: - # Directory traversal is not allowed - return 'Not found', 404 - return send_file('static/' + path) - - @app.post("/settings") - def settings_handler(request): - # Keep the POST handler for compatibility or alternative usage if needed - # For WebSocket updates, the /ws handler is now primary - return settings.set_settings(request.body.decode('utf-8'), patterns) - - @app.route("/ws") - @with_websocket - async def ws(request, ws): - while True: - data = await ws.receive() - if data: - - # Process the received data - _, status_code = settings.set_settings(json.loads(data), patterns, True) - #await ws.send(status_code) - else: - break - - return app diff --git a/src/wifi.py b/src/wifi.py deleted file mode 100644 index 2ca86aa..0000000 --- a/src/wifi.py +++ /dev/null @@ -1,39 +0,0 @@ -import network -from time import sleep - -def connect(ssid, password, ip, gateway): - - try: - sta_if = network.WLAN(network.STA_IF) - if not sta_if.isconnected(): - if ssid == "" or password == "": - print("Missing ssid or password") - return None - if ip != "" and gateway != "": - sta_if.ifconfig((ip, '255.255.255.0', gateway, '1.1.1.1')) - print('connecting to network...') - sta_if.active(True) - sta_if.connect(ssid, password) - sleep(0.1) - if sta_if.isconnected(): - return sta_if.ifconfig() - return None - return sta_if.ifconfig() - except Exception as e: - print(f"Failed to connect to wifi {e}") - return None - - -def ap(ssid, password): - ap_if = network.WLAN(network.AP_IF) - ap_mac = ap_if.config('mac') - print(ssid) - ap_if.active(True) - ap_if.config(essid=ssid, password=password) - ap_if.active(False) - ap_if.active(True) - print(ap_if.ifconfig()) - -def get_mac(): - ap_if = network.WLAN(network.AP_IF) - return ap_if.config('mac')