speechtoimage/pi/local.py

124 lines
4.3 KiB
Python

#!/usr/bin/env python3
import argparse
import queue
import sys
import json
import asyncio
import sounddevice as sd
from termcolor import colored
import aiohttp
import aiofiles
import vlc
from time import sleep
from vosk import Model, KaldiRecognizer
q = queue.Queue()
def int_or_str(text):
"""Helper function for argument parsing."""
try:
return int(text)
except ValueError:
return text
def callback(indata, frames, time, status):
"""This is called (from a separate thread) for each audio block."""
if status:
print(status, file=sys.stderr)
q.put(bytes(indata))
async def main():
vlc_instance = vlc.Instance()
player = vlc_instance.media_player_new()
media = vlc_instance.media_new("image.png")
player.set_media(media)
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
"-l", "--list-devices", action="store_true",
help="show list of audio devices and exit")
args, remaining = parser.parse_known_args()
if args.list_devices:
print(sd.query_devices())
parser.exit(0)
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
parents=[parser])
parser.add_argument(
"-f", "--filename", type=str, metavar="FILENAME",
help="audio file to store recording to")
parser.add_argument(
"-d", "--device", type=int_or_str,
help="input device (numeric ID or substring)")
parser.add_argument(
"-r", "--samplerate", type=int, help="sampling rate")
args = parser.parse_args(remaining)
try:
if args.samplerate is None:
device_info = sd.query_devices(args.device, "input")
# soundfile expects an int, sounddevice provides a float:
args.samplerate = int(device_info["default_samplerate"])
model = Model(lang="en-us")
if args.filename:
dump_fn = open(args.filename, "wb")
else:
dump_fn = None
with sd.RawInputStream(samplerate=args.samplerate, blocksize = 8000, device=args.device,
dtype="int16", channels=1, callback=callback):
print("#" * 80)
print("Press Ctrl+C to stop the recording")
print("#" * 80)
rec = KaldiRecognizer(model, args.samplerate)
while True:
data = q.get()
if rec.AcceptWaveform(data):
print(rec.Result())
j = json.loads(rec.Result())
if "text" in j and "result" in j:
n = 0
for word in j["result"]:
n += float(word["conf"])
if float(word["conf"]) > 0.7:
print(colored(word["word"], "green"), end=" ")
elif float(word["conf"]) > 0.5:
print(colored(word["word"], "yellow"), end=" ")
else:
print(colored(word["word"], "red"), end=" ")
print(n/len(j["result"]))
print("Generating Image")
if len(j["result"]) > 2:
async with aiohttp.ClientSession() as session:
url = f'http://192.168.1.95:8000?text={j["text"].replace(" ", "+")}'
async with session.get(url) as resp:
print(resp.status)
if resp.status == 200:
f = await aiofiles.open('image.png', mode='wb')
await f.write(await resp.read())
await f.close()
print("Image generated")
player.stop()
player.play()
sleep(1)
player.pause()
# else:
# print(rec.PartialResult())
if dump_fn is not None:
dump_fn.write(data)
except KeyboardInterrupt:
print("\nDone")
parser.exit(0)
except Exception as e:
parser.exit(type(e).__name__ + ": " + str(e))
if __name__ == '__main__':
asyncio.run(main())