Skip to content

Commit

Permalink
Change HlsStream to a more generic AsyncReadOnlySource (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
rhgndf authored Sep 28, 2024
1 parent 095feb1 commit fe9b156
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 55 deletions.
58 changes: 3 additions & 55 deletions src/input/sources/hls.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
use std::{
io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
pin::Pin,
task::{Context, Poll},
};

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use pin_project::pin_project;
use reqwest::{header::HeaderMap, Client};
use stream_lib::Event;
use symphonia_core::io::MediaSource;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
use tokio_util::io::StreamReader;

use crate::input::{
AsyncAdapterStream,
AsyncMediaSource,
AsyncReadOnlySource,
AudioStream,
AudioStreamError,
Compose,
Expand Down Expand Up @@ -51,7 +43,7 @@ impl HlsRequest {
}
}

fn create_stream(&mut self) -> Result<HlsStream, AudioStreamError> {
fn create_stream(&mut self) -> Result<AsyncReadOnlySource, AudioStreamError> {
let request = self
.client
.get(&self.request)
Expand All @@ -70,51 +62,7 @@ impl HlsRequest {
)),
})));

Ok(HlsStream { stream })
}
}

#[pin_project]
struct HlsStream {
#[pin]
stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
}

impl AsyncRead for HlsStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
AsyncRead::poll_read(self.project().stream, cx, buf)
}
}

impl AsyncSeek for HlsStream {
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> {
Err(IoErrorKind::Unsupported.into())
}

fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<u64>> {
unreachable!()
}
}

#[async_trait]
impl AsyncMediaSource for HlsStream {
fn is_seekable(&self) -> bool {
false
}

async fn byte_len(&self) -> Option<u64> {
None
}

async fn try_resume(
&mut self,
_offset: u64,
) -> Result<Box<dyn AsyncMediaSource>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
Ok(AsyncReadOnlySource { stream })
}
}

Expand Down
81 changes: 81 additions & 0 deletions src/input/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,84 @@ mod http;
mod ytdl;

pub use self::{file::*, hls::*, http::*, ytdl::*};

use std::{
io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
pin::Pin,
task::{Context, Poll},
};

use async_trait::async_trait;
use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};

use crate::input::{AsyncMediaSource, AudioStreamError};

/// `AsyncReadOnlySource` wraps any source implementing [`tokio::io::AsyncRead`] in an unseekable
/// [`symphonia_core::io::MediaSource`], similar to [`symphonia_core::io::ReadOnlySource`]
#[pin_project]
pub struct AsyncReadOnlySource {
#[pin]
stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
}

impl AsyncReadOnlySource {
/// Instantiates a new `AsyncReadOnlySource` by taking ownership and wrapping the provided
/// `Read`er.
pub fn new<R>(inner: R) -> Self
where
R: AsyncRead + Send + Sync + Unpin + 'static,
{
AsyncReadOnlySource {
stream: Box::new(inner),
}
}

/// Gets a reference to the underlying reader.
pub fn get_ref(&self) -> &Box<dyn AsyncRead + Send + Sync + Unpin> {
&self.stream
}

/// Unwraps this `AsyncReadOnlySource`, returning the underlying reader.
pub fn into_inner<R>(self) -> Box<dyn AsyncRead + Send + Sync + Unpin> {
self.stream.into()
}
}

impl AsyncRead for AsyncReadOnlySource {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
AsyncRead::poll_read(self.project().stream, cx, buf)
}
}

impl AsyncSeek for AsyncReadOnlySource {
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> {
Err(IoErrorKind::Unsupported.into())
}

fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<u64>> {
unreachable!()
}
}

#[async_trait]
impl AsyncMediaSource for AsyncReadOnlySource {
fn is_seekable(&self) -> bool {
false
}

async fn byte_len(&self) -> Option<u64> {
None
}

async fn try_resume(
&mut self,
_offset: u64,
) -> Result<Box<dyn AsyncMediaSource>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
}
}

0 comments on commit fe9b156

Please sign in to comment.