-
Notifications
You must be signed in to change notification settings - Fork 0
/
gpt_elevenlabs.py
133 lines (105 loc) · 4.12 KB
/
gpt_elevenlabs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import asyncio
import websockets
import json
import base64
import shutil
import os
import subprocess
from openai import AsyncOpenAI
import asyncio
# API credentials and voice selection.
# Each value is taken from the environment when the variable is set, so real
# secrets never have to be written into this file. The literals are the
# original placeholders and remain the fallback when the variable is unset.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY', 'sk-')
ELEVENLABS_API_KEY = os.environ.get('ELEVENLABS_API_KEY', '')
VOICE_ID = os.environ.get('ELEVENLABS_VOICE_ID', '')

# Shared asynchronous OpenAI client, created once at import time.
aclient = AsyncOpenAI(api_key=OPENAI_API_KEY)
def is_installed(lib_name):
    """Report whether an executable called *lib_name* can be found on PATH."""
    located = shutil.which(lib_name)
    if located is None:
        return False
    return True
async def text_chunker(chunks):
    """Regroup streamed text fragments so each emitted piece ends at a splitter.

    Fragments from *chunks* are accumulated until the accumulated text ends
    with a splitter character, or the next fragment begins with one. Every
    emitted piece has a single trailing space appended. ``None`` fragments are
    ignored, and any text left over at the end is emitted last.
    """
    boundary_marks = (".", ",", "?", "!", ";", ":", "—", "-", "(", ")", "[", "]", "}", " ")
    pending = ""

    async for piece in chunks:
        if piece is None:
            continue

        # The accumulated text already stops at a boundary: emit it and
        # start accumulating again from this fragment.
        if pending.endswith(boundary_marks):
            yield f"{pending} "
            pending = piece
            continue

        # No boundary on either side yet: keep accumulating.
        if not piece.startswith(boundary_marks):
            pending = pending + piece
            continue

        # The fragment opens with a boundary character: attach that one
        # character to the accumulated text and keep the remainder.
        head, tail = piece[0], piece[1:]
        yield f"{pending}{head} "
        pending = tail

    if pending:
        yield f"{pending} "
async def stream(audio_stream):
    """Play an asynchronous stream of audio chunks through the mpv player.

    Args:
        audio_stream: Async iterable yielding audio chunks that are written
            to mpv's standard input; falsy chunks are skipped.

    Raises:
        ValueError: If the ``mpv`` executable cannot be found on PATH.
    """
    if not is_installed("mpv"):
        raise ValueError(
            "mpv not found, necessary to stream audio. "
            "Install instructions: https://mpv.io/installation/"
        )

    # mpv reads the audio from its stdin ("fd://0"); its own output is discarded.
    mpv_process = subprocess.Popen(
        ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"],
        stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )

    print("Started streaming audio")
    try:
        async for chunk in audio_stream:
            if chunk:
                mpv_process.stdin.write(chunk)
                mpv_process.stdin.flush()
    finally:
        # Always close the pipe and reap the player, even when the audio
        # source raises or this coroutine is cancelled; otherwise mpv is left
        # running with its stdin still open.
        if mpv_process.stdin:
            try:
                mpv_process.stdin.close()
            except BrokenPipeError:
                # mpv has already exited, so there is nothing left to flush.
                # Swallowing this keeps the original exception (if any)
                # visible and still lets wait() reap the process.
                pass
        mpv_process.wait()
async def text_to_speech_input_streaming(voice_id, text_iterator):
    """Feed text to the ElevenLabs streaming endpoint and play the audio reply.

    Opens a websocket for *voice_id*, sends an opening message carrying the
    voice settings and API key, then forwards the chunked text from
    *text_iterator* while a background task plays the audio that comes back.
    """
    uri = f"wss://api.elevenlabs.io/v1/text-to-speech/{voice_id}/stream-input?model_id=eleven_monolingual_v1"

    async with websockets.connect(uri) as websocket:
        opening_message = {
            "text": " ",
            "voice_settings": {"stability": 0.5, "similarity_boost": 0.8},
            "xi_api_key": ELEVENLABS_API_KEY,
        }
        await websocket.send(json.dumps(opening_message))

        async def listen():
            """Yield decoded audio from the websocket until the final message."""
            while True:
                try:
                    raw = await websocket.recv()
                except websockets.exceptions.ConnectionClosed:
                    print("Connection closed")
                    return

                payload = json.loads(raw)
                encoded_audio = payload.get("audio")
                if encoded_audio:
                    yield base64.b64decode(encoded_audio)
                elif payload.get('isFinal'):
                    return

        # Playback runs concurrently with the sending loop below.
        playback_task = asyncio.create_task(stream(listen()))

        async for text in text_chunker(text_iterator):
            outgoing = json.dumps({"text": text, "try_trigger_generation": True})
            await websocket.send(outgoing)

        # An empty text message is sent last, after all chunks.
        await websocket.send(json.dumps({"text": ""}))

        await playback_task
async def chat_completion(query):
    """Stream a GPT-4 reply to *query* and hand it to the text-to-speech stage.

    Args:
        query: The user's message, sent as a single 'user' chat message.
    """
    response = await aclient.chat.completions.create(
        model='gpt-4',
        messages=[{'role': 'user', 'content': query}],
        temperature=1,
        stream=True,
    )

    async def text_iterator():
        """Yield the non-empty text fragments of the streamed completion."""
        async for chunk in response:
            # A streamed chunk can arrive with an empty ``choices`` list;
            # indexing [0] on it would raise IndexError and abort the reply.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            if delta and delta.content:
                yield delta.content

    await text_to_speech_input_streaming(VOICE_ID, text_iterator())
# Main execution: prompt for queries until the user types 'exit'.
if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running. A single loop is kept for the whole session, as in
    # the original, and closed on the way out.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        while True:
            try:
                query = input("Enter your query or 'exit' to quit: ")
            except (EOFError, KeyboardInterrupt):
                # Ctrl-D / Ctrl-C at the prompt ends the session cleanly
                # instead of surfacing a traceback.
                print()
                break
            # Tolerate stray whitespace around the exit command.
            if query.strip().lower() == 'exit':
                break
            try:
                loop.run_until_complete(chat_completion(query))
            except Exception as e:
                # Top-level boundary: report the failure and keep prompting.
                print(f"An error occurred: {e}")
    finally:
        loop.close()