forked from matatonic/openedai-speech
-
Notifications
You must be signed in to change notification settings - Fork 0
/
audio_reader.py
executable file
·127 lines (105 loc) · 3.63 KB
/
audio_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
try:
import dotenv
dotenv.load_dotenv()
except ImportError:
pass
import argparse
import os
import pysbd
import queue
import sys
import tempfile
import threading
import shutil
import sys
import tempfile
import contextlib
import openai
try:
from playsound import playsound
except ImportError:
print("Error: missing required package 'playsound'. !pip install playsound")
sys.exit(1)
@contextlib.contextmanager
def tempdir():
    """Create a temporary directory and remove it when the ``with`` block exits.

    Yields:
        The path of the new directory, as returned by ``tempfile.mkdtemp()``.

    The directory tree is removed on exit, including when the body raises.
    A failed removal is reported on stderr instead of being raised, so it
    cannot mask an exception coming out of the ``with`` body.
    """
    path = tempfile.mkdtemp()
    try:
        yield path
    finally:
        try:
            shutil.rmtree(path)
        except OSError:
            # Best-effort cleanup. The message is newline-terminated so it
            # does not run into whatever is written to stderr next.
            sys.stderr.write('Failed to clean up temp dir {}\n'.format(path))
class SimpleAudioPlayer:
    """Play queued audio files one after another on a background thread.

    Files handed to ``put()`` are played in FIFO order with ``playsound`` and
    deleted once playback has been attempted. The worker is a daemon thread,
    so it does not keep the process alive on its own.
    """

    def __init__(self):
        """Create the queue and start the playback worker immediately."""
        self._queue = queue.Queue()
        self.running = True
        self._thread = threading.Thread(target=self.__play_audio_loop, daemon=True)
        self._thread.start()

    def put(self, file):
        """Queue ``file`` (a path) for playback; the player deletes it afterwards."""
        self._queue.put(file)

    def stop(self):
        """Ask the worker to finish, wait for it, then delete leftover files.

        The worker only re-checks ``running`` once the queue is empty, so
        this call blocks until everything already queued has been handled.
        """
        self.running = False
        self._thread.join()
        # Normally the worker has drained the queue by now; anything still
        # here was never played, so just remove the files.
        while True:
            try:
                leftover = self._queue.get_nowait()
            except queue.Empty:
                break
            self._discard(leftover)

    @staticmethod
    def _discard(file):
        """Delete ``file``, tolerating that it is already gone."""
        try:
            os.unlink(file)
        except FileNotFoundError:
            pass

    def __play_audio_loop(self):
        """Worker loop: play and delete queued files until stopped and drained."""
        while self.running:
            try:
                while True:
                    # Short timeout so the outer loop can notice `running`
                    # being cleared while the queue is idle.
                    file = self._queue.get(block=True, timeout=0.01)
                    try:
                        playsound(file)
                    except Exception as err:
                        # One unplayable file must not kill the worker and
                        # silently drop everything queued after it.
                        sys.stderr.write('Failed to play {}: {}\n'.format(file, err))
                    finally:
                        self._discard(file)
            except queue.Empty:
                continue
class OpenAI_tts:
    """Text-to-speech client that writes each utterance to a temporary file."""

    def __init__(self, model, voice, speed, base_dir):
        """Set up the API client and the fixed request parameters.

        Args:
            model: Value sent as the request's ``model``.
            voice: Value sent as the request's ``voice``.
            speed: Value sent as the request's ``speed``.
            base_dir: Directory in which the audio files are created.
        """
        self.base_dir = base_dir
        self.openai_client = openai.OpenAI(
            # export OPENAI_API_KEY=sk-11111111111
            # export OPENAI_BASE_URL=http://localhost:8000/v1
            api_key=os.environ.get("OPENAI_API_KEY", "sk-ip"),
            base_url=os.environ.get("OPENAI_BASE_URL", "http://localhost:8000/v1"),
        )
        self.params = {
            'model': model,
            'voice': voice,
            'speed': speed,
        }

    def speech_to_file(self, text: str) -> str:
        """Synthesize ``text`` and stream the audio into a new file in ``base_dir``.

        Args:
            text: The text to speak.

        Returns:
            The path of the file that was written.
        """
        with self.openai_client.audio.speech.with_streaming_response.create(
            input=text, response_format='opus', **self.params
        ) as response:
            # NOTE(review): the request asks for 'opus' but the file is named
            # '.wav'; kept as-is since playback may rely on it - confirm.
            fd, output_filename = tempfile.mkstemp(
                suffix='.wav', prefix="audio_reader_", dir=self.base_dir)
            # mkstemp() hands back an open descriptor; only the name is
            # needed, so close it instead of leaking one per utterance.
            os.close(fd)
            response.stream_to_file(output_filename)
            return output_filename
def build_arg_parser():
    """Build the command-line parser for the text-to-speech player."""
    parser = argparse.ArgumentParser(
        description='Text to speech player',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-m', '--model', action='store', default="tts-1", help="The OpenAI model")
    parser.add_argument('-v', '--voice', action='store', default="alloy", help="The voice to use")
    # type=float so a user-supplied speed is a number like the default,
    # rather than being passed on to the API as a string.
    parser.add_argument('-s', '--speed', action='store', type=float, default=1.0, help="How fast to read the audio")
    return parser


# Read text from stdin, split it into sentences, and speak each one.
if __name__ == "__main__":
    args = build_arg_parser().parse_args()
    try:
        with tempdir() as base_dir:
            player = SimpleAudioPlayer()
            reader = OpenAI_tts(voice=args.voice, model=args.model, speed=args.speed, base_dir=base_dir)
            seg = pysbd.Segmenter(language='en', clean=True)  # text is dirty, clean it up.
            for raw_line in sys.stdin:
                for line in seg.segment(raw_line):
                    if not line:
                        continue
                    print(line)
                    player.put(reader.speech_to_file(line))
            # Stop while the temp dir still exists: stop() waits for the
            # queued files to be played before it returns.
            player.stop()
    except KeyboardInterrupt:
        # Ctrl-C ends the program quietly.
        pass