Skip to content

Commit

Permalink
Merge pull request #188 from shiguredo/feature/support-streaming-header
Browse files Browse the repository at this point in the history
audio streaming header に対応する
  • Loading branch information
Hexa authored Nov 7, 2024
2 parents dd4fff8 + 16189a3 commit 3302a08
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@

## develop

- [ADD] audio streaming header に対応する
- @Hexa
- [ADD] クライアントから送られてくるデータにヘッダーが付与されている場合に対応する audio_streaming_header 設定を追加する
- デフォルト値: false
- @Hexa

### misc

## 2024.3.0
Expand Down
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Config struct {
ListenAddr string `ini:"listen_addr"`
ListenPort int `ini:"listen_port"`

AudioStreamingHeader bool `ini:"audio_streaming_header"`

TLSFullchainFile string `ini:"tls_fullchain_file"`
TLSPrivkeyFile string `ini:"tls_privkey_file"`
TLSVerifyCacertPath string `ini:"tls_verify_cacert_path"` // クライアント認証用
Expand Down
3 changes: 3 additions & 0 deletions config_example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ exporter_https = false
exporter_listen_addr = 0.0.0.0
exporter_listen_port = 48081

# クライアントから受信する音声データにヘッダーが含まれている想定かどうかです
audio_streaming_header = false

# Suzu のサーバ証明書ファイルです
# tls_fullchain_file =
# Suzu の秘密鍵ファイルです
Expand Down
115 changes: 114 additions & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package suzu

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -230,6 +231,107 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
}
}

func readPacketWithHeader(reader io.Reader) (io.Reader, error) {
r, w := io.Pipe()

go func() {
length := 0
payloadLength := 0
var payload []byte

for {
buf := make([]byte, 20+0xffff)
n, err := reader.Read(buf)
if err != nil {
w.CloseWithError(err)
return
}

payload = append(payload, buf[:n]...)
length += n

if length > 20 {
// timestamp(64), sequence number(64), length(32)
h := payload[0:20]
p := payload[20:length]

payloadLength = int(binary.BigEndian.Uint32(h[16:20]))

if length == (20 + payloadLength) {
if _, err := w.Write(p); err != nil {
w.CloseWithError(err)
return
}
payload = []byte{}
length = 0
continue
}

// payload が足りないのでさらに読み込む
if length < (20 + payloadLength) {
// 前の payload へ追加して次へ
payload = append(payload, p...)
continue
}

// 次の frame が含まれている場合
if length > (20 + payloadLength) {
if _, err := w.Write(p[:payloadLength]); err != nil {
w.CloseWithError(err)
return
}
// 次の payload 処理へ
payload = p[payloadLength:]
length = len(payload)

// 次の payload がすでにある場合の処理
for {
if length > 20 {
h = payload[0:20]
p = payload[20:length]

payloadLength = int(binary.BigEndian.Uint32(h[16:20]))

// すでに次の payload が全てある場合
if length == (20 + payloadLength) {
if _, err := w.Write(p); err != nil {
w.CloseWithError(err)
return
}
payload = []byte{}
length = 0
continue
}

if length > (20 + payloadLength) {
if _, err := w.Write(p[:payloadLength]); err != nil {
w.CloseWithError(err)
return
}

// 次の payload 処理へ
payload = p[payloadLength:]
length = len(payload)
continue
}
} else {
// payload が足りないので、次の読み込みへ
break
}
}

continue
}
} else {
// ヘッダー分に足りなければ次の読み込みへ
continue
}
}
}()

return r, nil
}

func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error {
o, err := NewWith(oggWriter, sampleRate, channelCount)
if err != nil {
Expand All @@ -247,15 +349,26 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa
return err
}

var r io.Reader
if c.AudioStreamingHeader {
r, err = readPacketWithHeader(opusReader)
if err != nil {
return err
}
} else {
r = opusReader
}

for {
buf := make([]byte, FrameSize)
n, err := opusReader.Read(buf)
n, err := r.Read(buf)
if err != nil {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
}
return err
}

if n > 0 {
opus := codecs.OpusPacket{}
_, err := opus.Unmarshal(buf[:n])
Expand Down

0 comments on commit 3302a08

Please sign in to comment.