Skip to content

Commit

Permalink
Write some more blog stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
xd009642 committed Dec 25, 2024
1 parent 3692c2a commit a3c1f92
Showing 1 changed file with 91 additions and 0 deletions.
91 changes: 91 additions & 0 deletions doc/04_axum_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,94 @@ let (client_sender, client_receiver) = mpsc::channel(8);
}
}
```

The transcoding task is also fairly simple, we just want to spawn the audio
decoder with our channels passed in.

```rust
let transcoding_task = task::spawn(
decode_audio(start.format, audio_bytes_rx, senders).in_current_span()
);
```

In our current function the last stage is to read the messages from the
websocket and send down the appropriate channel. There's also a small
check after we join the running inferences so I'll include a bit of
the code from a previous sample again:

```rust
'outer: loop {

// Code from setting up the inference and transcoding tasks

let mut got_messages = false;
let mut disconnect = false;
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Binary(audio) => {
got_messages = true;
if let Err(e) = audio_bytes_tx.send(audio.into()).await {
warn!("Transcoding channel closed, this may indicate that inference has finished: {}", e);
break;
}
}
Message::Text(text) => match serde_json::from_str::<RequestMessage>(&text) {
Ok(RequestMessage::Start(start_msg)) => {
got_messages = true;
info!(start=?start, "Reinitialising streamer");
start = start_msg;
break;
}
Ok(RequestMessage::Stop(msg)) => {
got_messages = true;
info!("Stopping current stream, {:?}", msg);
disconnect = msg.disconnect;
break;
}
Err(e) => {
error!(json=%text, error=%e, "invalid json");
}
},
Message::Close(_frame) => {
info!("Finished streaming request");
break 'outer;
}
_ => {} // We don't care about ping and pong
}
}

std::mem::drop(audio_bytes_tx);
for handle in running_inferences.drain(..) {
match handle.await {
Ok(Err(e)) => error!("Inference failed: {}", e),
Err(e) => error!("Inference task panicked: {}", e),
Ok(Ok(_)) => {}
}
}
if let Err(e) = transcoding_task.await.unwrap() {
error!("Failed from transcoding task: {}", e);
}
if !got_messages || disconnect {
break;
}
}
```

When we receive biinary data this is audio so we send into our transcoding task.
If that sending has failed it means one of two things:

1. Transcoding failed with an error
2. We've had a single utterance request and the utterances have finished

In this case we break out of the message handling and when we await the other
tasks we'll see if anything went wrong.

For the text messages these should either be start or stop, these come when our
current stream ends and a new one is starting or the connection will be closed.

If we exited the websocket message receiving without receiving anything then the
client will have disconnected before we did anything and we want to exit same
with a disconnection request hence the little if with a final break at the end.

With this we have a working API and can stream audio into a model and get
results back.

0 comments on commit a3c1f92

Please sign in to comment.