Merge pull request #596 from pipecat-ai/aleix/prepare-0.0.44
prepare for pipecat 0.0.44
aconchillo authored Oct 16, 2024
2 parents 0c4a513 + 4eb2c95 commit d255b7d
Showing 6 changed files with 97 additions and 43 deletions.
36 changes: 35 additions & 1 deletion CHANGELOG.md
@@ -5,10 +5,14 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.0.44] - 2024-10-15

### Added

- Added support for OpenAI Realtime API with the new
`OpenAILLMServiceRealtimeBeta` processor.
(see https://platform.openai.com/docs/guides/realtime/overview)

- Added `RTVIBotTranscriptionProcessor` which sends the RTVI
`bot-transcription` protocol message. These messages contain TTS text
aggregated into sentences.
@@ -17,14 +21,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`filter_code` to filter code from text and `filter_tables` to filter tables
from text.

- Added `CanonicalMetricsService`. This processor uses the new
`AudioBufferProcessor` to capture conversation audio and later send it to
Canonical AI.
(see https://canonical.chat/)

- Added `AudioBufferProcessor`. This processor buffers mixed user and bot
audio, which can later be saved to an audio file or processed by an audio
analyzer (a usage sketch follows this list).

- Added `on_first_participant_joined` event to `LiveKitTransport`.
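
A minimal usage sketch tying the additions above together. This is not taken
from the commit: the `aiohttp_session` and `audio_buffer_processor` keyword
names and the event-handler wiring follow common Pipecat conventions and are
assumptions, as is the handler signature.

```python
import aiohttp

from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.services.canonical import CanonicalMetricsService


async def build_audio_metrics(transport, session: aiohttp.ClientSession):
    # Buffers mixed user and bot audio while the pipeline runs.
    audio_buffer = AudioBufferProcessor()

    # Uploads the buffered conversation audio to Canonical AI when the call
    # ends. Requires CANONICAL_API_KEY to be set in the environment.
    canonical = CanonicalMetricsService(
        aiohttp_session=session,              # assumed keyword name
        audio_buffer_processor=audio_buffer,  # assumed keyword name
        call_id="your-call-id",
        assistant="my-assistant",
        output_dir="recordings",
    )

    # New LiveKitTransport event from this release; the handler signature is
    # an assumption based on other Pipecat transports.
    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        pass

    return [audio_buffer, canonical]
```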

### Changed

- LLM text responses are now logged properly as Unicode characters.

- `UserStartedSpeakingFrame`, `UserStoppedSpeakingFrame`,
`BotStartedSpeakingFrame`, `BotStoppedSpeakingFrame`, `BotSpeakingFrame` and
`UserImageRequestFrame` are now based on `SystemFrame`.

### Fixed

- Merged `RTVIBotLLMProcessor`/`RTVIBotLLMTextProcessor` and
`RTVIBotTTSProcessor`/`RTVIBotTTSTextProcessor` to avoid out-of-order issues.

- Fixed an issue in RTVI protocol that could cause a `bot-llm-stopped` or
`bot-tts-stopped` message to be sent before a `bot-llm-text` or `bot-tts-text`
message.

- Fixed `DeepgramSTTService` constructor settings not being merged with default
ones.

- Fixed an issue in the Daily transport that would cause tasks to hang if
urgent transport messages were sent from a transport event handler.

- Fixed an issue in `BaseOutputTransport` that would cause `EndFrame` to be
pushed down too early and call `FrameProcessor.cleanup()` before letting the
transport stop properly.

## [0.0.43] - 2024-10-10

### Added
19 changes: 9 additions & 10 deletions src/pipecat/processors/audio/audio_buffer_processor.py
@@ -1,18 +1,17 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import wave
from io import BytesIO

from pipecat.frames.frames import (
AudioRawFrame,
BotInterruptionFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
InputAudioRawFrame,
OutputAudioRawFrame,
StartInterruptionFrame,
StopInterruptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

@@ -39,18 +38,18 @@ def __init__(self, **kwargs):
def _buffer_has_audio(self, buffer: bytearray):
return buffer is not None and len(buffer) > 0

def _has_audio(self):
def has_audio(self):
return (
self._buffer_has_audio(self._user_audio_buffer)
and self._buffer_has_audio(self._assistant_audio_buffer)
and self._sample_rate is not None
)

def _reset_audio_buffer(self):
def reset_audio_buffer(self):
self._user_audio_buffer = bytearray()
self._assistant_audio_buffer = bytearray()

def _merge_audio_buffers(self):
def merge_audio_buffers(self):
with BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setnchannels(2)
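
As a usage reference (not part of this diff), a minimal sketch of the now-public buffer methods after a pipeline run:

```python
# `audio_buffer` is an AudioBufferProcessor instance that was part of a
# running pipeline and has already captured conversation audio.
if audio_buffer.has_audio():
    # merge_audio_buffers() returns the mixed conversation as stereo WAV bytes.
    with open("conversation.wav", "wb") as f:
        f.write(audio_buffer.merge_audio_buffers())
    # Clear both buffers so the next conversation starts fresh.
    audio_buffer.reset_audio_buffer()
```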
38 changes: 20 additions & 18 deletions src/pipecat/services/canonical.py
@@ -1,9 +1,21 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import aiohttp
import os
import uuid

from datetime import datetime
from typing import Dict, List, Tuple

import aiohttp
from pipecat.frames.frames import CancelFrame, EndFrame, Frame
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService

from loguru import logger

try:
@@ -18,27 +30,18 @@
raise Exception(f"Missing module: {e}")


from pipecat.frames.frames import CancelFrame, EndFrame, Frame
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService

# Multipart upload part size in bytes, cannot be smaller than 5MB
PART_SIZE = 1024 * 1024 * 5
"""
This class extends AudioBufferProcessor to handle audio processing and uploading
for the Canonical Voice API.
"""


class CanonicalMetricsService(AIService):
"""
Initialize a CanonicalAudioProcessor instance.
"""Initialize a CanonicalAudioProcessor instance.
This class extends AudioBufferProcessor to handle audio processing and uploading
for the Canonical Voice API.
This class uses an AudioBufferProcessor to get the conversation audio and
uploads it to Canonical Voice API for audio processing.
Args:
call_id (str): Your unique identifier for the call. This is used to match the call in the Canonical Voice system to the call in your system.
assistant (str): Identifier for the AI assistant. This can be whatever you want; it's intended for your convenience so you can distinguish
between different assistants and serves as a grouping mechanism for calls.
@@ -52,7 +55,6 @@ class CanonicalMetricsService(AIService):
output_dir (str): Directory path for saving temporary audio files.
The constructor also ensures that the output directory exists.
This class requires a Canonical API key to be set in the CANONICAL_API_KEY environment variable.
"""

def __init__(
@@ -90,17 +92,17 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):

async def _process_audio(self):
pipeline = self._audio_buffer_processor
if pipeline._has_audio():
if pipeline.has_audio():
os.makedirs(self._output_dir, exist_ok=True)
filename = self._get_output_filename()
wave_data = pipeline._merge_audio_buffers()
wave_data = pipeline.merge_audio_buffers()

async with aiofiles.open(filename, "wb") as file:
await file.write(wave_data)

try:
await self._multipart_upload(filename)
pipeline._reset_audio_buffer()
pipeline.reset_audio_buffer()
await aiofiles.os.remove(filename)
except FileNotFoundError:
pass
33 changes: 21 additions & 12 deletions src/pipecat/transports/base_output.py
@@ -95,15 +95,6 @@ async def start(self, frame: StartFrame):
self._audio_out_task = self.get_event_loop().create_task(self._audio_out_task_handler())

async def stop(self, frame: EndFrame):
# At this point we have enqueued an EndFrame and we need to wait for
# that EndFrame to be processed by the sink tasks. We also need to wait
# for these tasks before cancelling the camera and audio tasks below
# because they might be still rendering.
if self._sink_task:
await self._sink_task
if self._sink_clock_task:
await self._sink_clock_task

# Cancel and wait for the camera output task to finish.
if self._camera_out_task and self._params.camera_out_enabled:
self._camera_out_task.cancel()
@@ -191,9 +182,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await self.push_frame(frame, direction)
# Control frames.
elif isinstance(frame, EndFrame):
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
await self._sink_queue.put(frame)
# Process sink tasks.
await self._stop_sink_tasks(frame)
# Now we can stop.
await self.stop(frame)
# We finally push EndFrame down so PipelineTask stops nicely.
await self.push_frame(frame, direction)
# Other frames.
elif isinstance(frame, OutputAudioRawFrame):
await self._handle_audio(frame)
Expand All @@ -205,6 +199,20 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
else:
await self._sink_queue.put(frame)

async def _stop_sink_tasks(self, frame: EndFrame):
# Let the sink tasks process the queue until they reach this EndFrame.
await self._sink_clock_queue.put((sys.maxsize, frame.id, frame))
await self._sink_queue.put(frame)

# At this point we have enqueued an EndFrame and we need to wait for
# that EndFrame to be processed by the sink tasks. We also need to wait
# for these tasks before cancelling the camera and audio tasks below
# because they might be still rendering.
if self._sink_task:
await self._sink_task
if self._sink_clock_task:
await self._sink_clock_task

async def _handle_interruptions(self, frame: Frame):
if not self.interruptions_allowed:
return
@@ -278,7 +286,8 @@ async def _sink_frame_handler(self, frame: Frame):
elif isinstance(frame, TTSStoppedFrame):
await self._bot_stopped_speaking()
await self.push_frame(frame)
else:
# We will push EndFrame later.
elif not isinstance(frame, EndFrame):
await self.push_frame(frame)

async def _sink_task_handler(self):
8 changes: 6 additions & 2 deletions src/pipecat/transports/services/daily.py
@@ -241,7 +241,7 @@ def set_callbacks(self, callbacks: DailyCallbacks):
self._callbacks = callbacks

async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
if not self._client:
if not self._joined or self._leaving:
return

participant_id = None
@@ -740,13 +740,17 @@ async def stop(self, frame: EndFrame):
self._messages_task.cancel()
await self._messages_task
self._messages_task = None
# Leave the room.
await self._client.leave()

async def cancel(self, frame: CancelFrame):
# Parent stop.
await super().cancel(frame)
# Cancel messages task
if self._messages_task:
self._messages_task.cancel()
await self._messages_task
self._messages_task = None
# Leave the room.
await self._client.leave()

Expand Down
6 changes: 6 additions & 0 deletions src/pipecat/transports/services/livekit.py
@@ -1,3 +1,9 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List