Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

整理: CancellableEngine を明確化 #1448

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion voicevox_engine/app/routers/tts_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def cancellable_synthesis(
)
try:
version = core_version or LATEST_VERSION
f_name = cancellable_engine._synthesis_impl(
f_name = cancellable_engine.synthesize_wave(
query, style_id, request, version=version
)
except CancellableEngineInternalError as e:
Expand Down
170 changes: 72 additions & 98 deletions voicevox_engine/cancellable_engine.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""キャンセル可能な音声合成"""

import asyncio
import queue
import sys
from multiprocessing import Pipe, Process
from queue import Queue

if sys.platform == "win32":
from multiprocessing.connection import PipeConnection as ConnectionType
Expand All @@ -29,24 +29,7 @@ class CancellableEngineInternalError(Exception):


class CancellableEngine:
"""
音声合成のキャンセル機能に関するクラス
初期化後は、synthesis関数で音声合成できる
(オリジナルと比べ引数が増えているので注意)

パラメータ use_gpu, voicelib_dirs, voicevox_dir,
runtime_dirs, cpu_num_threads, enable_mock は、 core_initializer を参照

Attributes
----------
watch_con_list: list[tuple[Request, Process]]
Requestは接続の監視に使用され、Processは通信切断時のプロセスキルに使用される
クライアントから接続があるとlistにtupleが追加される
接続が切断、もしくは音声合成が終了すると削除される
procs_and_cons: queue.Queue[tuple[Process, ConnectionType]]
音声合成の準備が終わっているプロセスのList
(音声合成中のプロセスは入っていない)
"""
"""マルチプロセスでの合成・キャンセル可能な合成をサポートする音声合成エンジン"""

def __init__(
self,
Expand All @@ -60,7 +43,9 @@ def __init__(
) -> None:
"""
変数の初期化を行う
また、init_processesの数だけプロセスを起動し、procs_and_consに格納する
パラメータ use_gpu, voicelib_dirs, voicevox_dir,
runtime_dirs, cpu_num_threads, enable_mock は、 core_initializer を参照
init_processesの数だけプロセスを起動し、procs_and_consに格納する
Comment on lines +46 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(細かいけど)

「その他の引数はcore_initializerを参照」とかにすると、引数名とドキュメントの密結合を防げるかもです。

"""

self.use_gpu = use_gpu
Expand All @@ -70,28 +55,21 @@ def __init__(
self.cpu_num_threads = cpu_num_threads
self.enable_mock = enable_mock

self.watch_con_list: list[tuple[Request, Process]] = []
# Requestは接続の監視に使用され、Processは通信切断時のプロセスキルに使用される
# クライアントから接続があるとlistにtupleが追加される
# 接続が切断、もしくは音声合成が終了すると削除される
Comment on lines +58 to +60
Copy link
Member

@Hiroshiba Hiroshiba Jun 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ここ、平易な説明でわかりやすいなと感じました!!!

「接続の監視」を「切断の監視」とするとよりわかりやすいかも。

Suggested change
# Requestは接続の監視に使用され、Processは通信切断時のプロセスキルに使用される
# クライアントから接続があるとlistにtupleが追加される
# 接続が切断、もしくは音声合成が終了すると削除される
# Requestは切断の監視に使用され、Processは切断時のプロセスキルに使用される
# クライアントから接続があるとlistにtupleが追加される
# 切断、もしくは音声合成が終了すると削除される

self._watching_reqs_and_procs: list[tuple[Request, Process]] = []

procs_and_cons: queue.Queue[tuple[Process, ConnectionType]] = queue.Queue()
# 待機しているサブプロセスと、それと通信できるコネクション
procs_and_cons: Queue[tuple[Process, ConnectionType]] = Queue()
for _ in range(init_processes):
procs_and_cons.put(self.start_new_proc())
self.procs_and_cons = procs_and_cons
procs_and_cons.put(self._start_new_process())
self._waiting_procs_and_cons = procs_and_cons

def start_new_proc(
self,
) -> tuple[Process, ConnectionType]:
"""
新しく開始したプロセスを返す関数

Returns
-------
ret_proc: Process
新規のプロセス
sub_proc_con1: ConnectionType
ret_procのプロセスと通信するためのPipe
"""
sub_proc_con1, sub_proc_con2 = Pipe(True)
ret_proc = Process(
def _start_new_process(self) -> tuple[Process, ConnectionType]:
"""音声合成可能な新しいプロセスを開始し、そのプロセスとそこへのコネクションを返す。"""
Copy link
Member

@Hiroshiba Hiroshiba Jun 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不明瞭な代名詞は展開してあげるとより確実

Suggested change
"""音声合成可能な新しいプロセスを開始し、そのプロセスとそこへのコネクションを返す。"""
"""音声合成可能な新しいプロセスを開始し、そのプロセスと、プロセスへのコネクションを返す。"""

connection_outer, connection_inner = Pipe(True)
new_process = Process(
target=start_synthesis_subprocess,
kwargs={
"use_gpu": self.use_gpu,
Expand All @@ -100,93 +78,82 @@ def start_new_proc(
"runtime_dirs": self.runtime_dirs,
"cpu_num_threads": self.cpu_num_threads,
"enable_mock": self.enable_mock,
"sub_proc_con": sub_proc_con2,
"sub_proc_con": connection_inner,
},
daemon=True,
)
ret_proc.start()
return ret_proc, sub_proc_con1
new_process.start()
return new_process, connection_outer

def finalize_con(
self,
req: Request,
proc: Process,
sub_proc_con: ConnectionType | None,
def _finalize_con(
self, req: Request, proc: Process, sub_proc_con: ConnectionType | None
) -> None:
"""
接続が切断された時の処理を行う関数
watch_con_listからの削除、プロセスの後処理を行う
プロセスが生きている場合はそのままprocs_and_consに加える
死んでいる場合は新しく生成したものをprocs_and_consに加える
合成の後処理をおこなう。
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

音声合成の後処理と勘違いされるかも


Parameters
----------
req: fastapi.Request
接続確立時に受け取ったものをそのまま渡せばよい
https://fastapi.tiangolo.com/advanced/using-request-directly/
proc: Process
req:
HTTP 接続状態に関するオブジェクト
proc:
音声合成を行っていたプロセス
sub_proc_con: ConnectionType, optional
音声合成を行っていたプロセスとのPipe
sub_proc_con:
音声合成を行っていたプロセスとのコネクション
指定されていない場合、プロセスは再利用されず終了される
"""
# 監視対象リストから除外する
try:
self.watch_con_list.remove((req, proc))
self._watching_reqs_and_procs.remove((req, proc))
except ValueError:
pass

# 待機中リストへ再登録する
try:
Comment on lines +104 to 111
Copy link
Member

@Hiroshiba Hiroshiba Jun 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

この辺りちょっとややこしいかもですね!
監視も待機の1つなのと、登録ってなんだっけ?となりました。

案ですが、_watching_reqs_and_procs_waiting_procs_and_consをそれぞれ_request_pool_process_poolにしちゃうのとかどうでしょう・・・・?
そうするとここのコメントは、1つ目の方はコメントがいらないほど明確で、2つ目は「プロセスを待機中に戻す」とかにできるなーと。

ただそうすると、(Request, Process)のtupleを_request_poolに入れることになってなんか変な感じはあるのですが。。。
そこはこう。。コメントでうまいこと説明する感じ・・・は難しいですかね。。。。。

if not proc.is_alive() or sub_proc_con is None:
proc.close()
raise ValueError
# プロセスが死んでいない場合は再利用する
self.procs_and_cons.put((proc, sub_proc_con))
self._waiting_procs_and_cons.put((proc, sub_proc_con))
except ValueError:
# プロセスが死んでいるので新しく作り直す
self.procs_and_cons.put(self.start_new_proc())
self._waiting_procs_and_cons.put(self._start_new_process())

def _synthesis_impl(
def synthesize_wave(
self,
query: AudioQuery,
style_id: StyleId,
request: Request,
version: str | LatestVersion,
) -> str:
"""
音声合成を行う関数
通常エンジンの引数に比べ、requestが必要になっている
また、返り値がファイル名になっている
サブプロセス上において、音声合成用のクエリ・スタイルIDに基づいて音声波形を生成し、音声ファイル名を返す。
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

平易に。

Suggested change
サブプロセス上において音声合成用のクエリスタイルIDに基づいて音声波形を生成し音声ファイル名を返す
サブプロセスで音声合成用のクエリスタイルIDから音声を生成し音声ファイル名を返す


Parameters
----------
query: AudioQuery
style_id: StyleId
request: fastapi.Request
接続確立時に受け取ったものをそのまま渡せばよい
https://fastapi.tiangolo.com/advanced/using-request-directly/
version

Returns
-------
f_name: str
生成された音声ファイルの名前
request:
HTTP 接続状態に関するオブジェクト
version:
合成に用いる TTSEngine のバージョン
"""
proc, sub_proc_con1 = self.procs_and_cons.get()
self.watch_con_list.append((request, proc))
# 待機中のプロセスとそのコネクションを取得し、監視対象リストへ登録する
synth_process, synth_connection = self._waiting_procs_and_cons.get()
self._watching_reqs_and_procs.append((request, synth_process))

# サブプロセスへ入力を渡して音声を合成する
try:
sub_proc_con1.send((query, style_id, version))
f_name = sub_proc_con1.recv()
if isinstance(f_name, str):
audio_file_name = f_name
else:
synth_connection.send((query, style_id, version))
audio_file_name = synth_connection.recv()

if not isinstance(audio_file_name, str):
# ここには来ないはず
raise CancellableEngineInternalError("不正な値が生成されました")
except EOFError:
raise CancellableEngineInternalError("既にサブプロセスは終了されています")
except Exception:
self.finalize_con(request, proc, sub_proc_con1)
self._finalize_con(request, synth_process, synth_connection)
raise
self._finalize_con(request, synth_process, synth_connection)

self.finalize_con(request, proc, sub_proc_con1)
return audio_file_name

async def catch_disconnection(self) -> None:
Expand All @@ -195,7 +162,7 @@ async def catch_disconnection(self) -> None:
"""
while True:
await asyncio.sleep(1)
for con in self.watch_con_list:
for con in self._watching_reqs_and_procs:
req, proc = con
if await req.is_disconnected():
try:
Expand All @@ -206,31 +173,31 @@ async def catch_disconnection(self) -> None:
except ValueError:
pass
finally:
self.finalize_con(req, proc, None)
self._finalize_con(req, proc, None)


# NOTE: pickle化の関係でグローバルに書いている
def start_synthesis_subprocess(
use_gpu: bool,
voicelib_dirs: list[Path] | None,
voicevox_dir: Path | None,
runtime_dirs: list[Path] | None,
cpu_num_threads: int | None,
enable_mock: bool,
sub_proc_con: ConnectionType,
connection: ConnectionType,
) -> None:
"""
音声合成を行うサブプロセスで行うための関数
pickle化の関係でグローバルに書いている
コネクションへの入力に応答して音声合成をおこなうループを実行する

引数 use_gpu, voicelib_dirs, voicevox_dir,
runtime_dirs, cpu_num_threads, enable_mock は、 core_initializer を参照

Parameters
----------
sub_proc_con: ConnectionType
メインプロセスと通信するためのPipe
connection:
メインプロセスと通信するためのコネクション
"""

# 音声合成エンジンを用意する
core_manager = initialize_cores(
use_gpu=use_gpu,
voicelib_dirs=voicelib_dirs,
Expand All @@ -240,16 +207,20 @@ def start_synthesis_subprocess(
enable_mock=enable_mock,
)
tts_engines = make_tts_engines_from_cores(core_manager)

assert len(tts_engines.versions()) != 0, "音声合成エンジンがありません。"

# 「キュー入力待機 → キュー入力受付 → 音声合成 → ファイル名送信」をループする
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

入力待機と入力受付の違いがわからず混乱するかも

while True:
try:
query, style_id, version = sub_proc_con.recv()
# キューへ入力が来たらそれを受け取る
query, style_id, version = connection.recv()

# 音声を合成しファイルへ保存する
try:
_engine = tts_engines.get_engine(version)
except Exception:
# バージョンが見つからないエラー
sub_proc_con.send("")
# コネクションを介して「バージョンが見つからないエラー」を送信する
connection.send("") # `""` をエラーして扱う
continue
# FIXME: enable_interrogative_upspeakフラグをWebAPIから受け渡してくる
wave = _engine.synthesize_wave(
Expand All @@ -259,7 +230,10 @@ def start_synthesis_subprocess(
soundfile.write(
file=f, data=wave, samplerate=query.outputSamplingRate, format="WAV"
)
sub_proc_con.send(f.name)

# コネクションを介してファイル名を送信する
connection.send(f.name)

except Exception:
sub_proc_con.close()
connection.close()
raise