"""Minimal MicroPython-style machine module mock for browser simulation.""" import json _PIN_MODE_OUT = 1 _PIN_MODE_IN = 2 _PIN_MODE_PWM = 4 def _pin_view_get(name): try: import js return getattr(js, name, None) except Exception: return None def _sab_isolated(): """True when the worker has a real SharedArrayBuffer for pin/ADC views. Set by `pyodide-worker.js` at init time. When false, output updates must travel through `print("[pin-out]…")` markers because the main thread can't read worker-local arrays.""" try: import js return bool(getattr(js, "__sab_isolated", False)) except Exception: return False def _pin_out_publish(pin_id, ui_mode, value): """Pack [ui_mode (high byte) | value (low 24 bits)] for the UI panel.""" view = _pin_view_get("__pin_out_view") if view is not None: packed = ((int(ui_mode) & 0xFF) << 24) | (int(value) & 0x00FFFFFF) try: import js js.Atomics.store(view, int(pin_id), packed) except Exception: try: view[int(pin_id)] = packed except Exception: pass if not _sab_isolated(): # Fallback for non-isolated contexts (e.g. mobile over LAN IP): # the main thread reads `[pin-out]` lines from stdout and updates # the Pins panel from there. Slower than SAB but works everywhere. try: print( "[pin-out]" + json.dumps( {"pin": int(pin_id), "mode": int(ui_mode), "value": int(value)} ) ) except Exception: pass def _pin_in_read(pin_id): view = _pin_view_get("__pin_in_view") if view is None: return 0 try: import js return int(js.Atomics.load(view, int(pin_id))) & 1 except Exception: try: return int(view[int(pin_id)]) & 1 except Exception: return 0 def _pin_register(pin_id, ui_mode, **extra): payload = {"pin": int(pin_id), "mode": int(ui_mode)} payload.update({k: v for k, v in extra.items() if v is not None}) try: print("[pin-register]" + json.dumps(payload)) except Exception: pass class Pin: """Browser-simulated `machine.Pin`. A control row appears in the editor UI when a pin is constructed: * `Pin.OUT` pins show a live indicator that mirrors `value()`. * `Pin.IN` pins show a toggle button you can click — the next call to `value()` returns 0 or 1 accordingly. """ IN = 0 OUT = 1 PULL_UP = 2 PULL_DOWN = 3 OPEN_DRAIN = 7 IRQ_FALLING = 1 IRQ_RISING = 2 def __init__(self, pin_id, mode=OUT, pull=-1, value=None, **_kwargs): self.id = int(pin_id) self.mode = int(mode) self.pull = int(pull) if pull != -1 else -1 self._value = 1 if value else 0 self._irq_handler = None self._irq_trigger = 0 self._last_input = 0 self._publish() def _ui_mode(self): if self.mode == self.IN: return _PIN_MODE_IN return _PIN_MODE_OUT def _publish(self): ui_mode = self._ui_mode() _pin_register(self.id, ui_mode, pull=self.pull if self.pull != -1 else None) if ui_mode == _PIN_MODE_OUT: _pin_out_publish(self.id, ui_mode, self._value) def init(self, mode=-1, pull=-1, value=None): if mode != -1: self.mode = int(mode) if pull != -1: self.pull = int(pull) if value is not None: self._value = 1 if int(value) else 0 self._publish() def value(self, new_value=None): if new_value is None: if self.mode == self.IN: v = _pin_in_read(self.id) if self._irq_handler is not None: if v and not self._last_input and (self._irq_trigger & self.IRQ_RISING): self._fire_irq() elif not v and self._last_input and (self._irq_trigger & self.IRQ_FALLING): self._fire_irq() self._last_input = v return v return self._value v = 1 if int(new_value) else 0 self._value = v if self._ui_mode() == _PIN_MODE_OUT: _pin_out_publish(self.id, _PIN_MODE_OUT, v) return v def on(self): return self.value(1) def off(self): return self.value(0) def high(self): return self.value(1) def low(self): return self.value(0) def toggle(self): return self.value(0 if (self._value if self.mode == self.OUT else _pin_in_read(self.id)) else 1) def irq(self, handler=None, trigger=IRQ_RISING | IRQ_FALLING, **_kwargs): self._irq_handler = handler self._irq_trigger = int(trigger) return self def _fire_irq(self): if self._irq_handler is None: return try: self._irq_handler(self) except Exception as exc: try: print(f"[pin] irq handler raised: {exc!r}") except Exception: pass def __call__(self, *args): return self.value(*args) if args else self.value() class PWM: """Browser-simulated `machine.PWM`. Visualises the configured duty cycle as a bar in the Pins panel. `freq()` and `duty()`/`duty_u16()`/`duty_ns()` mirror MicroPython's API. """ def __init__(self, pin, freq=1000, duty=None, duty_u16=None, duty_ns=None): self._pin_id = _adc_pin_id(pin) self._freq = int(freq) if freq else 1000 self._duty_u16 = 0 if duty_u16 is not None: self._duty_u16 = max(0, min(65535, int(duty_u16))) elif duty is not None: self._duty_u16 = max(0, min(65535, int(duty) * 64)) elif duty_ns is not None: self._set_duty_ns(int(duty_ns)) _pin_register(self._pin_id, _PIN_MODE_PWM, freq=self._freq) _pin_out_publish(self._pin_id, _PIN_MODE_PWM, self._duty_u16) def _set_duty_ns(self, ns): period_ns = 1_000_000_000 // max(1, self._freq) if period_ns <= 0: self._duty_u16 = 0 return self._duty_u16 = max(0, min(65535, int(ns) * 65535 // period_ns)) def freq(self, value=None): if value is None: return self._freq self._freq = max(1, int(value)) _pin_register(self._pin_id, _PIN_MODE_PWM, freq=self._freq) def duty(self, value=None): if value is None: return self._duty_u16 // 64 self._duty_u16 = max(0, min(65535, int(value) * 64)) _pin_out_publish(self._pin_id, _PIN_MODE_PWM, self._duty_u16) def duty_u16(self, value=None): if value is None: return self._duty_u16 self._duty_u16 = max(0, min(65535, int(value))) _pin_out_publish(self._pin_id, _PIN_MODE_PWM, self._duty_u16) def duty_ns(self, value=None): period_ns = 1_000_000_000 // max(1, self._freq) if value is None: return (self._duty_u16 * period_ns) // 65535 self._set_duty_ns(int(value)) _pin_out_publish(self._pin_id, _PIN_MODE_PWM, self._duty_u16) def deinit(self): _pin_out_publish(self._pin_id, 0, 0) def _adc_pin_id(pin): if isinstance(pin, int): return pin return int(getattr(pin, "id", 0)) def _adc_read_raw(pin_id: int) -> int: """Read the live slider value (0..65535) shared from the editor UI. Backed by a SharedArrayBuffer so updates from the browser slider propagate instantly without the script having to yield. Falls back to 0 when the runtime is not cross-origin isolated (e.g. older browsers). """ try: import js # only available inside Pyodide view = getattr(js, "__adc_view", None) if view is None: return 0 try: return int(js.Atomics.load(view, pin_id)) & 0xFFFF except Exception: return int(view[pin_id]) & 0xFFFF except Exception: return 0 class ADC: """Browser-simulated `machine.ADC`. Exposes a slider in the editor UI for the given pin. `read_u16()` returns the slider value in the 0..65535 range (matching MicroPython's API); `read()` scales to the configured `width` (defaults to 12-bit, 0..4095). """ WIDTH_9BIT = 9 WIDTH_10BIT = 10 WIDTH_11BIT = 11 WIDTH_12BIT = 12 ATTN_0DB = 0 ATTN_2_5DB = 1 ATTN_6DB = 2 ATTN_11DB = 3 def __init__(self, pin): self._pin = _adc_pin_id(pin) self._atten = self.ATTN_11DB self._width = self.WIDTH_12BIT try: print("[adc-register]" + json.dumps({"pin": self._pin})) except Exception: pass def atten(self, attn): self._atten = int(attn) def width(self, width): self._width = int(width) def read_u16(self): return _adc_read_raw(self._pin) def read(self): bits = self._width if self._width in (9, 10, 11, 12) else 12 max_val = (1 << bits) - 1 return (_adc_read_raw(self._pin) * max_val) // 65535 def read_uv(self): """Return microvolts assuming a 0..3.3V range (rough simulation).""" return (_adc_read_raw(self._pin) * 3_300_000) // 65535 def _serial_drain_pending(buf): """Pull any bytes the editor's serial-monitor has pushed into the SAB ring.""" try: import js import base64 # noqa: F401 (kept for symmetry with write path) indices = getattr(js, "__serial_in_indices", None) data = getattr(js, "__serial_in_data", None) if indices is None or data is None: return cap = int(getattr(js, "__serial_in_capacity", 0)) if cap <= 0: return r = int(js.Atomics.load(indices, 0)) w = int(js.Atomics.load(indices, 1)) while r != w: buf.append(int(data[r]) & 0xFF) r = (r + 1) % cap js.Atomics.store(indices, 0, r) except Exception: pass class UART: """Browser-simulated `machine.UART`. A serial monitor pane appears in the editor when this is constructed. `write()` text shows up there; characters typed into the input box arrive via `read()` / `readline()` / `any()`. """ INV_TX = 1 INV_RX = 2 INV_RTS = 4 INV_CTS = 8 def __init__(self, id=0, baudrate=115200, bits=8, parity=None, stop=1, tx=None, rx=None, timeout=0, **_kwargs): self.id = int(id) if id is not None else 0 self.baudrate = int(baudrate) self.bits = int(bits) self.parity = parity self.stop = int(stop) self.tx = tx self.rx = rx self.timeout = int(timeout) if timeout is not None else 0 self._buf = bytearray() try: print("[serial-register]" + json.dumps({ "id": self.id, "baudrate": self.baudrate, })) except Exception: pass def init(self, *args, **kwargs): if args: try: self.baudrate = int(args[0]) except Exception: pass if "baudrate" in kwargs: self.baudrate = int(kwargs["baudrate"]) def deinit(self): self._buf = bytearray() def any(self): _serial_drain_pending(self._buf) return len(self._buf) def read(self, nbytes=None): _serial_drain_pending(self._buf) if not self._buf: return None if nbytes is None: out = bytes(self._buf) self._buf = bytearray() return out n = min(int(nbytes), len(self._buf)) if n <= 0: return None out = bytes(self._buf[:n]) del self._buf[:n] return out def readinto(self, buf, nbytes=None): n = int(nbytes) if nbytes is not None else len(buf) data = self.read(n) if not data: return None for i, b in enumerate(data): buf[i] = b return len(data) def readline(self): _serial_drain_pending(self._buf) if not self._buf: return None nl = self._buf.find(b"\n") if nl == -1: return None out = bytes(self._buf[: nl + 1]) del self._buf[: nl + 1] return out def write(self, data): if isinstance(data, str): buf = data.encode("utf-8") elif isinstance(data, (bytes, bytearray, memoryview)): buf = bytes(data) else: buf = bytes([int(data) & 0xFF]) try: import base64 encoded = base64.b64encode(buf).decode("ascii") print(f"[serial-out]{encoded}") except Exception: pass return len(buf) def sendbreak(self): pass def flush(self): pass def txdone(self): return True