diff --git a/deepgram/audio/__init__.py b/deepgram/audio/__init__.py
index aa9b266e..dcadaa4c 100644
--- a/deepgram/audio/__init__.py
+++ b/deepgram/audio/__init__.py
@@ -18,4 +18,5 @@
     CHANNELS as OUTPUT_CHANNELS,
     RATE as OUTPUT_RATE,
     CHUNK as OUTPUT_CHUNK,
+    PLAYBACK_DELTA as OUTPUT_PLAYBACK_DELTA,
 )
diff --git a/deepgram/audio/speaker/__init__.py b/deepgram/audio/speaker/__init__.py
index e0bc56d7..f9adc8e5 100644
--- a/deepgram/audio/speaker/__init__.py
+++ b/deepgram/audio/speaker/__init__.py
@@ -4,4 +4,4 @@
 
 from .speaker import Speaker
 from .errors import DeepgramSpeakerError
-from .constants import LOGGING, CHANNELS, RATE, CHUNK
+from .constants import LOGGING, CHANNELS, RATE, CHUNK, PLAYBACK_DELTA
diff --git a/deepgram/audio/speaker/constants.py b/deepgram/audio/speaker/constants.py
index 15c9baa0..a033c721 100644
--- a/deepgram/audio/speaker/constants.py
+++ b/deepgram/audio/speaker/constants.py
@@ -4,9 +4,12 @@
 
 from ...utils import verboselogs
 
-# Constants for microphone
+# Constants for speaker
 LOGGING = verboselogs.WARNING
 TIMEOUT = 0.050
 CHANNELS = 1
 RATE = 16000
 CHUNK = 8194
+
+# Idle time (ms) after the last played chunk before playback counts as complete
+PLAYBACK_DELTA = 2000
diff --git a/deepgram/audio/speaker/speaker.py b/deepgram/audio/speaker/speaker.py
index 100abdde..faee51c4 100644
--- a/deepgram/audio/speaker/speaker.py
+++ b/deepgram/audio/speaker/speaker.py
@@ -8,15 +8,18 @@
 import threading
 from typing import Optional, Callable, Union, TYPE_CHECKING
 import logging
+import time
 
 import websockets
 
 from ...utils import verboselogs
-from .constants import LOGGING, CHANNELS, RATE, CHUNK, TIMEOUT
+from .constants import LOGGING, CHANNELS, RATE, CHUNK, TIMEOUT, PLAYBACK_DELTA
 
 if TYPE_CHECKING:
     import pyaudio
 
+HALF_SECOND = 0.5
+
 
 class Speaker:  # pylint: disable=too-many-instance-attributes
     """
@@ -33,6 +36,11 @@ class Speaker: # pylint: disable=too-many-instance-attributes
     _channels: int
     _output_device_index: Optional[int] = None
 
+    # monotonic timestamp (seconds) of the last audio chunk handed to playback
+    _last_datagram: float = 0.0
+    _last_play_delta_in_ms: int
+    _lock_wait: threading.Lock
+
     _queue: queue.Queue
     _exit: threading.Event
 
@@ -56,6 +64,7 @@ def __init__(
         rate: int = RATE,
         chunk: int = CHUNK,
         channels: int = CHANNELS,
+        last_play_delta_in_ms: int = PLAYBACK_DELTA,
         output_device_index: Optional[int] = None,
     ):  # pylint: disable=too-many-positional-arguments
         # dynamic import of pyaudio as not to force the requirements on the SDK (and users)
@@ -68,11 +77,15 @@ def __init__(
         self._exit = threading.Event()
         self._queue = queue.Queue()
 
+        self._last_datagram = time.monotonic()
+        self._lock_wait = threading.Lock()
+
         self._audio = pyaudio.PyAudio()
         self._chunk = chunk
         self._rate = rate
         self._format = pyaudio.paInt16
         self._channels = channels
+        self._last_play_delta_in_ms = last_play_delta_in_ms
         self._output_device_index = output_device_index
 
         self._push_callback_org = push_callback
@@ -192,6 +205,42 @@ def start(self, active_loop: Optional[asyncio.AbstractEventLoop] = None) -> bool
 
         return True
 
+    def wait_for_complete(self):
+        """
+        Block until no audio has played for last_play_delta_in_ms ms, or exit is set.
+        """
+        self._logger.debug("Speaker.wait_for_complete ENTER")
+
+        delta_in_ms = float(self._last_play_delta_in_ms)
+        self._logger.debug("Last Play delta: %f", delta_in_ms)
+
+        # restart the idle timer from now (monotonic clock: immune to wall-clock jumps)
+        with self._lock_wait:
+            self._last_datagram = time.monotonic()
+
+        while True:
+            # sleep for a bit
+            self._exit.wait(HALF_SECOND)
+
+            # check if we should exit
+            if self._exit.is_set():
+                self._logger.debug("Exiting wait_for_complete _exit is set")
+                break
+
+            # check the time
+            with self._lock_wait:
+                delta = time.monotonic() - self._last_datagram
+                diff_in_ms = delta * 1000
+                if diff_in_ms < delta_in_ms:
+                    self._logger.debug("LastPlay delta is less than threshold")
+                    continue
+
+            # if we get here, we are done playing audio
+            self._logger.debug("LastPlay delta is greater than threshold. Exit wait!")
+            break
+
+        self._logger.debug("Speaker.wait_for_complete LEAVE")
+
     def _start_receiver(self):
         # Check if the socket is an asyncio WebSocket
         if inspect.iscoroutinefunction(self._pull_callback_org):
@@ -315,6 +364,8 @@ def _play(self, audio_out, stream, stop):
         while not stop.is_set():
             try:
                 data = audio_out.get(True, TIMEOUT)
+                with self._lock_wait:
+                    self._last_datagram = time.monotonic()
                 stream.write(data)
             except queue.Empty:
                 pass
diff --git a/deepgram/clients/speak/v1/websocket/async_client.py b/deepgram/clients/speak/v1/websocket/async_client.py
index 1f0eee2c..c5c1cb16 100644
--- a/deepgram/clients/speak/v1/websocket/async_client.py
+++ b/deepgram/clients/speak/v1/websocket/async_client.py
@@ -27,7 +27,7 @@
 )
 from .options import SpeakWSOptions
 
-from .....audio.speaker import Speaker, RATE, CHANNELS
+from .....audio.speaker import Speaker, RATE, CHANNELS, PLAYBACK_DELTA
 
 ONE_SECOND = 1
 HALF_SECOND = 0.5
@@ -93,6 +93,11 @@ def __init__(self, config: DeepgramClientOptions):
             channels = self._config.options.get("speaker_playback_channels")
             if channels is None:
                 channels = CHANNELS
+            playback_delta_in_ms = self._config.options.get(
+                "speaker_playback_delta_in_ms"
+            )
+            if playback_delta_in_ms is None:
+                playback_delta_in_ms = PLAYBACK_DELTA
             device_index = self._config.options.get("speaker_playback_device_index")
 
             self._logger.debug("rate: %s", rate)
@@ -103,6 +108,7 @@ def __init__(self, config: DeepgramClientOptions):
                 self._speaker = Speaker(
                     rate=rate,
                     channels=channels,
+                    last_play_delta_in_ms=playback_delta_in_ms,
                     verbose=self._config.verbose,
                     output_device_index=device_index,
                 )
@@ -110,6 +116,7 @@ def __init__(self, config: DeepgramClientOptions):
                 self._speaker = Speaker(
                     rate=rate,
                     channels=channels,
+                    last_play_delta_in_ms=playback_delta_in_ms,
                     verbose=self._config.verbose,
                 )
 
@@ -590,6 +597,21 @@ async def clear(self) -> bool:
 
         return True
 
+    async def wait_for_complete(self):
+        """
+        This method will block until the speaker is done playing sound.
+        """
+        self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete ENTER")
+
+        if self._speaker is None:
+            self._logger.error("speaker is None. Return immediately")
+            return
+
+        loop = asyncio.get_running_loop()
+        await loop.run_in_executor(None, self._speaker.wait_for_complete)
+        self._logger.notice("wait_for_complete succeeded")
+        self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete LEAVE")
+
     async def _close_message(self) -> bool:
         return await self.send_control(SpeakWebSocketMessage.Close)
 
diff --git a/deepgram/clients/speak/v1/websocket/client.py b/deepgram/clients/speak/v1/websocket/client.py
index c0b1e23e..f334bb9b 100644
--- a/deepgram/clients/speak/v1/websocket/client.py
+++ b/deepgram/clients/speak/v1/websocket/client.py
@@ -27,7 +27,7 @@
 )
 from .options import SpeakWSOptions
 
-from .....audio.speaker import Speaker, RATE, CHANNELS
+from .....audio.speaker import Speaker, RATE, CHANNELS, PLAYBACK_DELTA
 
 ONE_SECOND = 1
 HALF_SECOND = 0.5
@@ -96,6 +96,11 @@ def __init__(self, config: DeepgramClientOptions):
             channels = self._config.options.get("speaker_playback_channels")
             if channels is None:
                 channels = CHANNELS
+            playback_delta_in_ms = self._config.options.get(
+                "speaker_playback_delta_in_ms"
+            )
+            if playback_delta_in_ms is None:
+                playback_delta_in_ms = PLAYBACK_DELTA
             device_index = self._config.options.get("speaker_playback_device_index")
 
             self._logger.debug("rate: %s", rate)
@@ -106,6 +111,7 @@ def __init__(self, config: DeepgramClientOptions):
                 self._speaker = Speaker(
                     rate=rate,
                     channels=channels,
+                    last_play_delta_in_ms=playback_delta_in_ms,
                     verbose=self._config.verbose,
                     output_device_index=device_index,
                 )
@@ -113,6 +119,7 @@ def __init__(self, config: DeepgramClientOptions):
                 self._speaker = Speaker(
                     rate=rate,
                     channels=channels,
+                    last_play_delta_in_ms=playback_delta_in_ms,
                     verbose=self._config.verbose,
                 )
 
@@ -589,6 +596,20 @@ def clear(self) -> bool:
 
         return True
 
+    def wait_for_complete(self):
+        """
+        This method will block until the speaker is done playing sound.
+        """
+        self._logger.spam("SpeakWebSocketClient.wait_for_complete ENTER")
+
+        if self._speaker is None:
+            self._logger.error("speaker is None. Return immediately")
+            raise DeepgramError("Speaker is not initialized")
+
+        self._speaker.wait_for_complete()
+        self._logger.notice("wait_for_complete succeeded")
+        self._logger.spam("SpeakWebSocketClient.wait_for_complete LEAVE")
+
     def _close_message(self) -> bool:
         return self.send_control(SpeakWebSocketMessage.Close)
 
diff --git a/examples/text-to-speech/websocket/async_complete/main.py b/examples/text-to-speech/websocket/async_complete/main.py
index 46819ce2..87f238d3 100644
--- a/examples/text-to-speech/websocket/async_complete/main.py
+++ b/examples/text-to-speech/websocket/async_complete/main.py
@@ -32,8 +32,11 @@ async def main():
 
         # example of setting up a client config. logging values: WARNING, VERBOSE, DEBUG, SPAM
         config: DeepgramClientOptions = DeepgramClientOptions(
-            options={"auto_flush_speak_delta": "500", "speaker_playback": "true"},
-            # verbose=verboselogs.SPAM,
+            options={
+                # "auto_flush_speak_delta": "500",
+                "speaker_playback": "true"
+            },
+            verbose=verboselogs.SPAM,
         )
         deepgram: DeepgramClient = DeepgramClient("", config)
 
@@ -99,11 +102,12 @@ async def on_unhandled(self, unhandled, **kwargs):
 
         # send the text to Deepgram
         await dg_connection.send_text(TTS_TEXT)
+
         # if auto_flush_speak_delta is not used, you must flush the connection by calling flush()
         await dg_connection.flush()
 
         # Indicate that we've finished
-        await asyncio.sleep(7)
+        await dg_connection.wait_for_complete()
 
         # Close the connection
         await dg_connection.finish()
diff --git a/examples/text-to-speech/websocket/complete/main.py b/examples/text-to-speech/websocket/complete/main.py
index 3e10f0fe..612ae7da 100644
--- a/examples/text-to-speech/websocket/complete/main.py
+++ b/examples/text-to-speech/websocket/complete/main.py
@@ -93,11 +93,13 @@ def on_unhandled(self, unhandled, **kwargs):
 
         # send the text to Deepgram
         dg_connection.send_text(TTS_TEXT)
+
         # if auto_flush_speak_delta is not used, you must flush the connection by calling flush()
         dg_connection.flush()
 
         # Indicate that we've finished
-        time.sleep(5)
+        dg_connection.wait_for_complete()
+
         print("\n\nPress Enter to stop...\n\n")
         input()
 