Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement wait_for_complete() for Speaker class #471

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deepgram/audio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
CHANNELS as OUTPUT_CHANNELS,
RATE as OUTPUT_RATE,
CHUNK as OUTPUT_CHUNK,
PLAYBACK_DELTA as OUTPUT_PLAYBACK_DELTA,
)
2 changes: 1 addition & 1 deletion deepgram/audio/speaker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

from .speaker import Speaker
from .errors import DeepgramSpeakerError
from .constants import LOGGING, CHANNELS, RATE, CHUNK
from .constants import LOGGING, CHANNELS, RATE, CHUNK, PLAYBACK_DELTA
5 changes: 4 additions & 1 deletion deepgram/audio/speaker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

from ...utils import verboselogs

# Constants for microphone
# Constants for speaker
LOGGING = verboselogs.WARNING
TIMEOUT = 0.050
CHANNELS = 1
RATE = 16000
CHUNK = 8194

davidvonthenen marked this conversation as resolved.
Show resolved Hide resolved
# Constants for speaker
PLAYBACK_DELTA = 2000
53 changes: 52 additions & 1 deletion deepgram/audio/speaker/speaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@
import threading
from typing import Optional, Callable, Union, TYPE_CHECKING
import logging
from datetime import datetime

import websockets

from ...utils import verboselogs
from .constants import LOGGING, CHANNELS, RATE, CHUNK, TIMEOUT
from .constants import LOGGING, CHANNELS, RATE, CHUNK, TIMEOUT, PLAYBACK_DELTA

if TYPE_CHECKING:
import pyaudio

HALF_SECOND = 0.5


class Speaker: # pylint: disable=too-many-instance-attributes
"""
Expand All @@ -33,6 +36,11 @@ class Speaker: # pylint: disable=too-many-instance-attributes
_channels: int
_output_device_index: Optional[int] = None

# last time we received audio
_last_datagram: datetime = datetime.now()
_last_play_delta_in_ms: int
_lock_wait: threading.Lock

_queue: queue.Queue
_exit: threading.Event

Expand All @@ -56,6 +64,7 @@ def __init__(
rate: int = RATE,
chunk: int = CHUNK,
channels: int = CHANNELS,
last_play_delta_in_ms: int = PLAYBACK_DELTA,
davidvonthenen marked this conversation as resolved.
Show resolved Hide resolved
output_device_index: Optional[int] = None,
): # pylint: disable=too-many-positional-arguments
# dynamic import of pyaudio as not to force the requirements on the SDK (and users)
Expand All @@ -68,11 +77,15 @@ def __init__(
self._exit = threading.Event()
self._queue = queue.Queue()

self._last_datagram = datetime.now()
self._lock_wait = threading.Lock()

self._audio = pyaudio.PyAudio()
self._chunk = chunk
self._rate = rate
self._format = pyaudio.paInt16
self._channels = channels
self._last_play_delta_in_ms = last_play_delta_in_ms
self._output_device_index = output_device_index

self._push_callback_org = push_callback
Expand Down Expand Up @@ -192,6 +205,42 @@ def start(self, active_loop: Optional[asyncio.AbstractEventLoop] = None) -> bool

return True

def wait_for_complete(self):
"""
This method will block until the speak is done playing sound.
"""
self._logger.debug("Speaker.wait_for_complete ENTER")

delta_in_ms = float(self._last_play_delta_in_ms)
self._logger.debug("Last Play delta: %f", delta_in_ms)

# set to now
with self._lock_wait:
self._last_datagram = datetime.now()

while True:
# sleep for a bit
self._exit.wait(HALF_SECOND)

# check if we should exit
if self._exit.is_set():
self._logger.debug("Exiting wait_for_complete _exit is set")
break

# check the time
with self._lock_wait:
delta = datetime.now() - self._last_datagram
diff_in_ms = delta.total_seconds() * 1000
if diff_in_ms < delta_in_ms:
self._logger.debug("LastPlay delta is less than threshold")
continue

# if we get here, we are done playing audio
self._logger.debug("LastPlay delta is greater than threshold. Exit wait!")
break

self._logger.debug("Speaker.wait_for_complete LEAVE")

def _start_receiver(self):
# Check if the socket is an asyncio WebSocket
if inspect.iscoroutinefunction(self._pull_callback_org):
Expand Down Expand Up @@ -315,6 +364,8 @@ def _play(self, audio_out, stream, stop):
while not stop.is_set():
try:
data = audio_out.get(True, TIMEOUT)
with self._lock_wait:
self._last_datagram = datetime.now()
stream.write(data)
except queue.Empty:
pass
Expand Down
24 changes: 23 additions & 1 deletion deepgram/clients/speak/v1/websocket/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from .options import SpeakWSOptions

from .....audio.speaker import Speaker, RATE, CHANNELS
from .....audio.speaker import Speaker, RATE, CHANNELS, PLAYBACK_DELTA

ONE_SECOND = 1
HALF_SECOND = 0.5
Expand Down Expand Up @@ -93,6 +93,11 @@ def __init__(self, config: DeepgramClientOptions):
channels = self._config.options.get("speaker_playback_channels")
if channels is None:
channels = CHANNELS
playback_delta_in_ms = self._config.options.get(
"speaker_playback_delta_in_ms"
)
if playback_delta_in_ms is None:
playback_delta_in_ms = PLAYBACK_DELTA
davidvonthenen marked this conversation as resolved.
Show resolved Hide resolved
device_index = self._config.options.get("speaker_playback_device_index")

self._logger.debug("rate: %s", rate)
Expand All @@ -103,13 +108,15 @@ def __init__(self, config: DeepgramClientOptions):
self._speaker = Speaker(
rate=rate,
channels=channels,
last_play_delta_in_ms=playback_delta_in_ms,
verbose=self._config.verbose,
output_device_index=device_index,
)
else:
self._speaker = Speaker(
rate=rate,
channels=channels,
last_play_delta_in_ms=playback_delta_in_ms,
verbose=self._config.verbose,
)

Expand Down Expand Up @@ -590,6 +597,21 @@ async def clear(self) -> bool:

return True

async def wait_for_complete(self):
"""
This method will block until the speak is done playing sound.
"""
self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete ENTER")

if self._speaker is None:
self._logger.error("speaker is None. Return immediately")
return

loop = asyncio.get_event_loop()
davidvonthenen marked this conversation as resolved.
Show resolved Hide resolved
await loop.run_in_executor(None, self._speaker.wait_for_complete)
self._logger.notice("wait_for_complete succeeded")
self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete LEAVE")

async def _close_message(self) -> bool:
return await self.send_control(SpeakWebSocketMessage.Close)

Expand Down
23 changes: 22 additions & 1 deletion deepgram/clients/speak/v1/websocket/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from .options import SpeakWSOptions

from .....audio.speaker import Speaker, RATE, CHANNELS
from .....audio.speaker import Speaker, RATE, CHANNELS, PLAYBACK_DELTA

ONE_SECOND = 1
HALF_SECOND = 0.5
Expand Down Expand Up @@ -96,6 +96,11 @@ def __init__(self, config: DeepgramClientOptions):
channels = self._config.options.get("speaker_playback_channels")
if channels is None:
channels = CHANNELS
playback_delta_in_ms = self._config.options.get(
"speaker_playback_delta_in_ms"
)
if playback_delta_in_ms is None:
playback_delta_in_ms = PLAYBACK_DELTA
davidvonthenen marked this conversation as resolved.
Show resolved Hide resolved
device_index = self._config.options.get("speaker_playback_device_index")

self._logger.debug("rate: %s", rate)
Expand All @@ -106,13 +111,15 @@ def __init__(self, config: DeepgramClientOptions):
self._speaker = Speaker(
rate=rate,
channels=channels,
last_play_delta_in_ms=playback_delta_in_ms,
verbose=self._config.verbose,
output_device_index=device_index,
)
else:
self._speaker = Speaker(
rate=rate,
channels=channels,
last_play_delta_in_ms=playback_delta_in_ms,
verbose=self._config.verbose,
)

Expand Down Expand Up @@ -589,6 +596,20 @@ def clear(self) -> bool:

return True

def wait_for_complete(self):
"""
This method will block until the speak is done playing sound.
"""
self._logger.spam("SpeakWebSocketClient.wait_for_complete ENTER")

if self._speaker is None:
self._logger.error("speaker is None. Return immediately")
raise DeepgramError("Speaker is not initialized")

self._speaker.wait_for_complete()
self._logger.notice("wait_for_complete succeeded")
self._logger.spam("SpeakWebSocketClient.wait_for_complete LEAVE")

def _close_message(self) -> bool:
return self.send_control(SpeakWebSocketMessage.Close)

Expand Down
10 changes: 7 additions & 3 deletions examples/text-to-speech/websocket/async_complete/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ async def main():

# example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
config: DeepgramClientOptions = DeepgramClientOptions(
options={"auto_flush_speak_delta": "500", "speaker_playback": "true"},
# verbose=verboselogs.SPAM,
options={
# "auto_flush_speak_delta": "500",
"speaker_playback": "true"
},
verbose=verboselogs.SPAM,
)
deepgram: DeepgramClient = DeepgramClient("", config)

Expand Down Expand Up @@ -99,11 +102,12 @@ async def on_unhandled(self, unhandled, **kwargs):

# send the text to Deepgram
await dg_connection.send_text(TTS_TEXT)

# if auto_flush_speak_delta is not used, you must flush the connection by calling flush()
await dg_connection.flush()

# Indicate that we've finished
await asyncio.sleep(7)
await dg_connection.wait_for_complete()

# Close the connection
await dg_connection.finish()
Expand Down
4 changes: 3 additions & 1 deletion examples/text-to-speech/websocket/complete/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ def on_unhandled(self, unhandled, **kwargs):

# send the text to Deepgram
dg_connection.send_text(TTS_TEXT)

# if auto_flush_speak_delta is not used, you must flush the connection by calling flush()
dg_connection.flush()

# Indicate that we've finished
time.sleep(5)
dg_connection.wait_for_complete()

print("\n\nPress Enter to stop...\n\n")
input()

Expand Down
Loading