From e0931d41c2497fb85cf719ccc96dd5a63dd03005 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Wed, 6 Nov 2024 16:00:02 +0900 Subject: [PATCH 1/7] =?UTF-8?q?audio=5Fstreaming=5Fheader=20=E3=81=AB?= =?UTF-8?q?=E5=AF=BE=E5=BF=9C=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.go | 2 + config_example.ini | 3 ++ handler.go | 95 +++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 95 insertions(+), 5 deletions(-) diff --git a/config.go b/config.go index e0e6e98..3b12e60 100644 --- a/config.go +++ b/config.go @@ -41,6 +41,8 @@ type Config struct { ListenAddr string `ini:"listen_addr"` ListenPort int `ini:"listen_port"` + AudioStreamingHeader bool `ini:"audio_streaming_header"` + TLSFullchainFile string `ini:"tls_fullchain_file"` TLSPrivkeyFile string `ini:"tls_privkey_file"` TLSVerifyCacertPath string `ini:"tls_verify_cacert_path"` // クライアント認証用 diff --git a/config_example.ini b/config_example.ini index 2d5b5d2..19b71db 100644 --- a/config_example.ini +++ b/config_example.ini @@ -11,6 +11,9 @@ exporter_https = false exporter_listen_addr = 0.0.0.0 exporter_listen_port = 48081 +# クライアントから受信する音声データにヘッダーが含まれている想定かどうかです +audio_streaming_header = false + # Suzu のサーバ証明書ファイルです # tls_fullchain_file = # Suzu の秘密鍵ファイルです diff --git a/handler.go b/handler.go index cec4245..ce96dd4 100644 --- a/handler.go +++ b/handler.go @@ -2,6 +2,7 @@ package suzu import ( "context" + "encoding/binary" "errors" "fmt" "io" @@ -230,6 +231,73 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte } } +func readPacketWithHeader(reader io.Reader) (io.Reader, error) { + r, w := io.Pipe() + + go func() { + defer w.Close() + + length := 0 + payloadLength := 0 + var payload []byte + + for { + buf := make([]byte, 20+0xffff) + n, err := reader.Read(buf) + if err != nil { + // TODO: ログ出力 + return + } + + payload = append(payload, buf[:n]...) + length += n + + if length > 20 { + // timestamp(64), sequence number(64), length(32) + h := payload[0:20] + p := payload[20:length] + + payloadLength = int(binary.BigEndian.Uint32(h[16:20])) + + if length == (20 + payloadLength) { + if _, err := w.Write(p); err != nil { + // TODO: ログ出力 + return + } + payload = []byte{} + length = 0 + continue + } + + // payload が足りないのでさらに読み込む + if length < (20 + payloadLength) { + // 前の payload へ追加して次へ + payload = append(payload, p...) + continue + } + + // 次の frame が含まれている場合 + if length > (20 + payloadLength) { + payload = append(payload, p[:payloadLength]...) + if _, err := w.Write(payload); err != nil { + // TODO: ログ出力 + return + } + // 次の payload 処理へ + payload = p[payloadLength:] + length = len(payload) + continue + } + } else { + // ヘッダー分に足りなければ次の読み込みへ + continue + } + } + }() + + return r, nil +} + func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error { o, err := NewWith(oggWriter, sampleRate, channelCount) if err != nil { @@ -249,13 +317,30 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa for { buf := make([]byte, FrameSize) - n, err := opusReader.Read(buf) - if err != nil { - if w, ok := oggWriter.(*io.PipeWriter); ok { - w.CloseWithError(err) + var n int + if c.AudioStreamingHeader { + r, err := readPacketWithHeader(opusReader) + if err != nil { + return err + } + + n, err = r.Read(buf) + if err != nil { + if w, ok := oggWriter.(*io.PipeWriter); ok { + w.CloseWithError(err) + } + return err + } + } else { + n, err = opusReader.Read(buf) + if err != nil { + if w, ok := oggWriter.(*io.PipeWriter); ok { + w.CloseWithError(err) + } + return err } - return err } + if n > 0 { opus := codecs.OpusPacket{} _, err := opus.Unmarshal(buf[:n]) From 7bda005754a2e148e746cc6ac5407ca1fcc89bac Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Thu, 7 Nov 2024 10:56:04 +0900 Subject: [PATCH 2/7] =?UTF-8?q?=E3=81=99=E3=81=A7=E3=81=AB=20append=20?= =?UTF-8?q?=E3=81=97=E3=81=A6=E3=81=84=E3=82=8B=E3=81=9F=E3=82=81=E5=89=8A?= =?UTF-8?q?=E9=99=A4=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/handler.go b/handler.go index ce96dd4..7ac6070 100644 --- a/handler.go +++ b/handler.go @@ -278,8 +278,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { // 次の frame が含まれている場合 if length > (20 + payloadLength) { - payload = append(payload, p[:payloadLength]...) - if _, err := w.Write(payload); err != nil { + if _, err := w.Write(p[:payloadLength]); err != nil { // TODO: ログ出力 return } From a8e23cf57fd4a09ff9ff1076bdaab22d29cbe0d6 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Thu, 7 Nov 2024 12:15:08 +0900 Subject: [PATCH 3/7] =?UTF-8?q?=E5=87=A6=E7=90=86=E5=8F=AF=E8=83=BD?= =?UTF-8?q?=E3=81=AA=E8=AA=AD=E3=81=BF=E8=BE=BC=E3=81=BF=E6=B8=88=E3=81=BF?= =?UTF-8?q?=E3=81=AE=E3=83=87=E3=83=BC=E3=82=BF=E3=81=8C=E3=81=BE=E3=81=A0?= =?UTF-8?q?=E3=81=82=E3=82=8B=E5=A0=B4=E5=90=88=E3=81=AF=E3=80=81=E3=81=9D?= =?UTF-8?q?=E3=81=AE=E3=83=87=E3=83=BC=E3=82=BF=E3=82=92=E5=87=A6=E7=90=86?= =?UTF-8?q?=E3=81=97=E3=81=A6=E3=81=8B=E3=82=89=E6=AC=A1=E3=81=AE=E8=AA=AD?= =?UTF-8?q?=E3=81=BF=E8=BE=BC=E3=81=BF=E3=81=AB=E9=80=B2=E3=82=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/handler.go b/handler.go index 7ac6070..d2dca31 100644 --- a/handler.go +++ b/handler.go @@ -285,6 +285,43 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { // 次の payload 処理へ payload = p[payloadLength:] length = len(payload) + + // 次の payload がすでにある場合の処理 + for { + if length > 20 { + h = payload[0:20] + p = payload[20:length] + + payloadLength = int(binary.BigEndian.Uint32(h[16:20])) + + // すでに次の payload が全てある場合 + if length == (20 + payloadLength) { + if _, err := w.Write(p); err != nil { + // TODO: ログ出力 + return + } + payload = []byte{} + length = 0 + continue + } + + if length > (20 + payloadLength) { + if _, err := w.Write(p[:payloadLength]); err != nil { + // TODO: ログ出力 + return + } + + // 次の payload 処理へ + payload = p[payloadLength:] + length = len(payload) + continue + } + } else { + // payload が足りないので、次の読み込みへ + break + } + } + continue } } else { From 74ae761a498757d7377d4b8c7a761e8a670fa7a7 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Thu, 7 Nov 2024 12:54:17 +0900 Subject: [PATCH 4/7] =?UTF-8?q?reader=20=E5=8F=96=E5=BE=97=E7=AE=87?= =?UTF-8?q?=E6=89=80=E3=82=92=E4=BF=AE=E6=AD=A3=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/handler.go b/handler.go index d2dca31..f9675b1 100644 --- a/handler.go +++ b/handler.go @@ -351,30 +351,25 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa return err } + var r io.Reader + if c.AudioStreamingHeader { + r, err = readPacketWithHeader(opusReader) + if err != nil { + return err + } + } else { + r = opusReader + } + for { buf := make([]byte, FrameSize) var n int - if c.AudioStreamingHeader { - r, err := readPacketWithHeader(opusReader) - if err != nil { - return err - } - - n, err = r.Read(buf) - if err != nil { - if w, ok := oggWriter.(*io.PipeWriter); ok { - w.CloseWithError(err) - } - return err - } - } else { - n, err = opusReader.Read(buf) - if err != nil { - if w, ok := oggWriter.(*io.PipeWriter); ok { - w.CloseWithError(err) - } - return err + n, err = r.Read(buf) + if err != nil { + if w, ok := oggWriter.(*io.PipeWriter); ok { + w.CloseWithError(err) } + return err } if n > 0 { From be7f386c99bcd8767dd40e275eedbe5599bcfeda Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Thu, 7 Nov 2024 15:37:47 +0900 Subject: [PATCH 5/7] =?UTF-8?q?=E3=82=A8=E3=83=A9=E3=83=BC=E6=99=82?= =?UTF-8?q?=E3=81=AE=20close=20=E5=87=A6=E7=90=86=E3=82=92=E5=85=A5?= =?UTF-8?q?=E3=82=8C=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/handler.go b/handler.go index f9675b1..209d825 100644 --- a/handler.go +++ b/handler.go @@ -235,8 +235,6 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { r, w := io.Pipe() go func() { - defer w.Close() - length := 0 payloadLength := 0 var payload []byte @@ -245,7 +243,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { buf := make([]byte, 20+0xffff) n, err := reader.Read(buf) if err != nil { - // TODO: ログ出力 + w.CloseWithError(err) return } @@ -261,7 +259,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { if length == (20 + payloadLength) { if _, err := w.Write(p); err != nil { - // TODO: ログ出力 + w.CloseWithError(err) return } payload = []byte{} @@ -279,7 +277,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { // 次の frame が含まれている場合 if length > (20 + payloadLength) { if _, err := w.Write(p[:payloadLength]); err != nil { - // TODO: ログ出力 + w.CloseWithError(err) return } // 次の payload 処理へ @@ -297,7 +295,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { // すでに次の payload が全てある場合 if length == (20 + payloadLength) { if _, err := w.Write(p); err != nil { - // TODO: ログ出力 + w.CloseWithError(err) return } payload = []byte{} @@ -307,7 +305,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { if length > (20 + payloadLength) { if _, err := w.Write(p[:payloadLength]); err != nil { - // TODO: ログ出力 + w.CloseWithError(err) return } From 2b244a728682b2a5c181d5cf2ca1867fc065c050 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Thu, 7 Nov 2024 15:39:18 +0900 Subject: [PATCH 6/7] =?UTF-8?q?=E5=A4=89=E6=9B=B4=E5=B1=A5=E6=AD=B4?= =?UTF-8?q?=E3=82=92=E6=9B=B4=E6=96=B0=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGES.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 3d10411..d0d0cc8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,12 @@ ## develop +- [ADD] audio streaming header に対応する + - @Hexa +- [ADD] クライアントから送られてくるデータにヘッダーが付与されている場合に対応する audio_streaming_header 設定を追加する + - デフォルト値: false + - @Hexa + ### misc ## 2024.3.0 From 16189a364e80975ae9c1cccf58d1cf1a47c5a9c1 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Thu, 7 Nov 2024 15:55:31 +0900 Subject: [PATCH 7/7] =?UTF-8?q?=E3=82=B3=E3=82=B9=E3=83=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/handler.go b/handler.go index 209d825..abe6eab 100644 --- a/handler.go +++ b/handler.go @@ -361,8 +361,7 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa for { buf := make([]byte, FrameSize) - var n int - n, err = r.Read(buf) + n, err := r.Read(buf) if err != nil { if w, ok := oggWriter.(*io.PipeWriter); ok { w.CloseWithError(err)