diff --git a/voicevox_engine/app/routers/tts_pipeline.py b/voicevox_engine/app/routers/tts_pipeline.py index 6555c844c..54d225ad1 100644 --- a/voicevox_engine/app/routers/tts_pipeline.py +++ b/voicevox_engine/app/routers/tts_pipeline.py @@ -295,7 +295,7 @@ def cancellable_synthesis( ) try: version = core_version or LATEST_VERSION - f_name = cancellable_engine._synthesis_impl( + f_name = cancellable_engine.synthesize_wave( query, style_id, request, version=version ) except CancellableEngineInternalError as e: diff --git a/voicevox_engine/cancellable_engine.py b/voicevox_engine/cancellable_engine.py index a812892f3..d15146fd7 100644 --- a/voicevox_engine/cancellable_engine.py +++ b/voicevox_engine/cancellable_engine.py @@ -1,9 +1,9 @@ """キャンセル可能な音声合成""" import asyncio -import queue import sys from multiprocessing import Pipe, Process +from queue import Queue if sys.platform == "win32": from multiprocessing.connection import PipeConnection as ConnectionType @@ -29,24 +29,7 @@ class CancellableEngineInternalError(Exception): class CancellableEngine: - """ - 音声合成のキャンセル機能に関するクラス - 初期化後は、synthesis関数で音声合成できる - (オリジナルと比べ引数が増えているので注意) - - パラメータ use_gpu, voicelib_dirs, voicevox_dir, - runtime_dirs, cpu_num_threads, enable_mock は、 core_initializer を参照 - - Attributes - ---------- - watch_con_list: list[tuple[Request, Process]] - Requestは接続の監視に使用され、Processは通信切断時のプロセスキルに使用される - クライアントから接続があるとlistにtupleが追加される - 接続が切断、もしくは音声合成が終了すると削除される - procs_and_cons: queue.Queue[tuple[Process, ConnectionType]] - 音声合成の準備が終わっているプロセスのList - (音声合成中のプロセスは入っていない) - """ + """マルチプロセスでの合成・キャンセル可能な合成をサポートする音声合成エンジン""" def __init__( self, @@ -60,7 +43,9 @@ def __init__( ) -> None: """ 変数の初期化を行う - また、init_processesの数だけプロセスを起動し、procs_and_consに格納する + パラメータ use_gpu, voicelib_dirs, voicevox_dir, + runtime_dirs, cpu_num_threads, enable_mock は、 core_initializer を参照 + init_processesの数だけプロセスを起動し、procs_and_consに格納する """ self.use_gpu = use_gpu @@ -70,28 +55,21 @@ def __init__( self.cpu_num_threads = cpu_num_threads self.enable_mock = enable_mock - self.watch_con_list: list[tuple[Request, Process]] = [] + # Requestは接続の監視に使用され、Processは通信切断時のプロセスキルに使用される + # クライアントから接続があるとlistにtupleが追加される + # 接続が切断、もしくは音声合成が終了すると削除される + self._watching_reqs_and_procs: list[tuple[Request, Process]] = [] - procs_and_cons: queue.Queue[tuple[Process, ConnectionType]] = queue.Queue() + # 待機しているサブプロセスと、それと通信できるコネクション + procs_and_cons: Queue[tuple[Process, ConnectionType]] = Queue() for _ in range(init_processes): - procs_and_cons.put(self.start_new_proc()) - self.procs_and_cons = procs_and_cons + procs_and_cons.put(self._start_new_process()) + self._waiting_procs_and_cons = procs_and_cons - def start_new_proc( - self, - ) -> tuple[Process, ConnectionType]: - """ - 新しく開始したプロセスを返す関数 - - Returns - ------- - ret_proc: Process - 新規のプロセス - sub_proc_con1: ConnectionType - ret_procのプロセスと通信するためのPipe - """ - sub_proc_con1, sub_proc_con2 = Pipe(True) - ret_proc = Process( + def _start_new_process(self) -> tuple[Process, ConnectionType]: + """音声合成可能な新しいプロセスを開始し、そのプロセスとそこへのコネクションを返す。""" + connection_outer, connection_inner = Pipe(True) + new_process = Process( target=start_synthesis_subprocess, kwargs={ "use_gpu": self.use_gpu, @@ -100,51 +78,47 @@ def start_new_proc( "runtime_dirs": self.runtime_dirs, "cpu_num_threads": self.cpu_num_threads, "enable_mock": self.enable_mock, - "sub_proc_con": sub_proc_con2, + "sub_proc_con": connection_inner, }, daemon=True, ) - ret_proc.start() - return ret_proc, sub_proc_con1 + new_process.start() + return new_process, connection_outer - def finalize_con( - self, - req: Request, - proc: Process, - sub_proc_con: ConnectionType | None, + def _finalize_con( + self, req: Request, proc: Process, sub_proc_con: ConnectionType | None ) -> None: """ - 接続が切断された時の処理を行う関数 - watch_con_listからの削除、プロセスの後処理を行う - プロセスが生きている場合はそのままprocs_and_consに加える - 死んでいる場合は新しく生成したものをprocs_and_consに加える + 合成の後処理をおこなう。 Parameters ---------- - req: fastapi.Request - 接続確立時に受け取ったものをそのまま渡せばよい - https://fastapi.tiangolo.com/advanced/using-request-directly/ - proc: Process + req: + HTTP 接続状態に関するオブジェクト + proc: 音声合成を行っていたプロセス - sub_proc_con: ConnectionType, optional - 音声合成を行っていたプロセスとのPipe + sub_proc_con: + 音声合成を行っていたプロセスとのコネクション 指定されていない場合、プロセスは再利用されず終了される """ + # 監視対象リストから除外する try: - self.watch_con_list.remove((req, proc)) + self._watching_reqs_and_procs.remove((req, proc)) except ValueError: pass + + # 待機中リストへ再登録する try: if not proc.is_alive() or sub_proc_con is None: proc.close() raise ValueError # プロセスが死んでいない場合は再利用する - self.procs_and_cons.put((proc, sub_proc_con)) + self._waiting_procs_and_cons.put((proc, sub_proc_con)) except ValueError: # プロセスが死んでいるので新しく作り直す - self.procs_and_cons.put(self.start_new_proc()) + self._waiting_procs_and_cons.put(self._start_new_process()) - def _synthesis_impl( + def synthesize_wave( self, query: AudioQuery, style_id: StyleId, @@ -152,41 +126,34 @@ def _synthesis_impl( version: str | LatestVersion, ) -> str: """ - 音声合成を行う関数 - 通常エンジンの引数に比べ、requestが必要になっている - また、返り値がファイル名になっている + サブプロセス上において、音声合成用のクエリ・スタイルIDに基づいて音声波形を生成し、音声ファイル名を返す。 Parameters ---------- - query: AudioQuery - style_id: StyleId - request: fastapi.Request - 接続確立時に受け取ったものをそのまま渡せばよい - https://fastapi.tiangolo.com/advanced/using-request-directly/ - version - - Returns - ------- - f_name: str - 生成された音声ファイルの名前 + request: + HTTP 接続状態に関するオブジェクト + version: + 合成に用いる TTSEngine のバージョン """ - proc, sub_proc_con1 = self.procs_and_cons.get() - self.watch_con_list.append((request, proc)) + # 待機中のプロセスとそのコネクションを取得し、監視対象リストへ登録する + synth_process, synth_connection = self._waiting_procs_and_cons.get() + self._watching_reqs_and_procs.append((request, synth_process)) + + # サブプロセスへ入力を渡して音声を合成する try: - sub_proc_con1.send((query, style_id, version)) - f_name = sub_proc_con1.recv() - if isinstance(f_name, str): - audio_file_name = f_name - else: + synth_connection.send((query, style_id, version)) + audio_file_name = synth_connection.recv() + + if not isinstance(audio_file_name, str): # ここには来ないはず raise CancellableEngineInternalError("不正な値が生成されました") except EOFError: raise CancellableEngineInternalError("既にサブプロセスは終了されています") except Exception: - self.finalize_con(request, proc, sub_proc_con1) + self._finalize_con(request, synth_process, synth_connection) raise + self._finalize_con(request, synth_process, synth_connection) - self.finalize_con(request, proc, sub_proc_con1) return audio_file_name async def catch_disconnection(self) -> None: @@ -195,7 +162,7 @@ async def catch_disconnection(self) -> None: """ while True: await asyncio.sleep(1) - for con in self.watch_con_list: + for con in self._watching_reqs_and_procs: req, proc = con if await req.is_disconnected(): try: @@ -206,9 +173,10 @@ async def catch_disconnection(self) -> None: except ValueError: pass finally: - self.finalize_con(req, proc, None) + self._finalize_con(req, proc, None) +# NOTE: pickle化の関係でグローバルに書いている def start_synthesis_subprocess( use_gpu: bool, voicelib_dirs: list[Path] | None, @@ -216,21 +184,20 @@ def start_synthesis_subprocess( runtime_dirs: list[Path] | None, cpu_num_threads: int | None, enable_mock: bool, - sub_proc_con: ConnectionType, + connection: ConnectionType, ) -> None: """ - 音声合成を行うサブプロセスで行うための関数 - pickle化の関係でグローバルに書いている + コネクションへの入力に応答して音声合成をおこなうループを実行する 引数 use_gpu, voicelib_dirs, voicevox_dir, runtime_dirs, cpu_num_threads, enable_mock は、 core_initializer を参照 Parameters ---------- - sub_proc_con: ConnectionType - メインプロセスと通信するためのPipe + connection: + メインプロセスと通信するためのコネクション """ - + # 音声合成エンジンを用意する core_manager = initialize_cores( use_gpu=use_gpu, voicelib_dirs=voicelib_dirs, @@ -240,16 +207,20 @@ def start_synthesis_subprocess( enable_mock=enable_mock, ) tts_engines = make_tts_engines_from_cores(core_manager) - assert len(tts_engines.versions()) != 0, "音声合成エンジンがありません。" + + # 「キュー入力待機 → キュー入力受付 → 音声合成 → ファイル名送信」をループする while True: try: - query, style_id, version = sub_proc_con.recv() + # キューへ入力が来たらそれを受け取る + query, style_id, version = connection.recv() + + # 音声を合成しファイルへ保存する try: _engine = tts_engines.get_engine(version) except Exception: - # バージョンが見つからないエラー - sub_proc_con.send("") + # コネクションを介して「バージョンが見つからないエラー」を送信する + connection.send("") # `""` をエラーして扱う continue # FIXME: enable_interrogative_upspeakフラグをWebAPIから受け渡してくる wave = _engine.synthesize_wave( @@ -259,7 +230,10 @@ def start_synthesis_subprocess( soundfile.write( file=f, data=wave, samplerate=query.outputSamplingRate, format="WAV" ) - sub_proc_con.send(f.name) + + # コネクションを介してファイル名を送信する + connection.send(f.name) + except Exception: - sub_proc_con.close() + connection.close() raise