#!/usr/bin/env python3 import argparse import queue import sys import json import asyncio import sounddevice as sd from termcolor import colored import aiohttp import aiofiles import vlc from time import sleep from vosk import Model, KaldiRecognizer q = queue.Queue() def int_or_str(text): """Helper function for argument parsing.""" try: return int(text) except ValueError: return text def callback(indata, frames, time, status): """This is called (from a separate thread) for each audio block.""" if status: print(status, file=sys.stderr) q.put(bytes(indata)) async def main(): vlc_instance = vlc.Instance() player = vlc_instance.media_player_new() media = vlc_instance.media_new("image.png") player.set_media(media) parser = argparse.ArgumentParser(add_help=False) parser.add_argument( "-l", "--list-devices", action="store_true", help="show list of audio devices and exit") args, remaining = parser.parse_known_args() if args.list_devices: print(sd.query_devices()) parser.exit(0) parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, parents=[parser]) parser.add_argument( "-f", "--filename", type=str, metavar="FILENAME", help="audio file to store recording to") parser.add_argument( "-d", "--device", type=int_or_str, help="input device (numeric ID or substring)") parser.add_argument( "-r", "--samplerate", type=int, help="sampling rate") args = parser.parse_args(remaining) try: if args.samplerate is None: device_info = sd.query_devices(args.device, "input") # soundfile expects an int, sounddevice provides a float: args.samplerate = int(device_info["default_samplerate"]) model = Model(lang="en-us") if args.filename: dump_fn = open(args.filename, "wb") else: dump_fn = None with sd.RawInputStream(samplerate=args.samplerate, blocksize = 8000, device=args.device, dtype="int16", channels=1, callback=callback): print("#" * 80) print("Press Ctrl+C to stop the recording") print("#" * 80) rec = KaldiRecognizer(model, args.samplerate) while True: data = q.get() if rec.AcceptWaveform(data): print(rec.Result()) j = json.loads(rec.Result()) if "text" in j and "result" in j: n = 0 for word in j["result"]: n += float(word["conf"]) if float(word["conf"]) > 0.7: print(colored(word["word"], "green"), end=" ") elif float(word["conf"]) > 0.5: print(colored(word["word"], "yellow"), end=" ") else: print(colored(word["word"], "red"), end=" ") print(n/len(j["result"])) print("Generating Image") if len(j["result"]) > 2: async with aiohttp.ClientSession() as session: url = f'http://192.168.1.95:8000?text={j["text"].replace(" ", "+")}' async with session.get(url) as resp: print(resp.status) if resp.status == 200: f = await aiofiles.open('image.png', mode='wb') await f.write(await resp.read()) await f.close() print("Image generated") player.stop() player.play() sleep(1) player.pause() # else: # print(rec.PartialResult()) if dump_fn is not None: dump_fn.write(data) except KeyboardInterrupt: print("\nDone") parser.exit(0) except Exception as e: parser.exit(type(e).__name__ + ": " + str(e)) if __name__ == '__main__': asyncio.run(main())