Change HlsStream to a more generic AsyncReadOnlySource (#256)
This commit is contained in:
@@ -1,22 +1,14 @@
|
|||||||
use std::{
|
|
||||||
io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
|
|
||||||
pin::Pin,
|
|
||||||
task::{Context, Poll},
|
|
||||||
};
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use pin_project::pin_project;
|
|
||||||
use reqwest::{header::HeaderMap, Client};
|
use reqwest::{header::HeaderMap, Client};
|
||||||
use stream_lib::Event;
|
use stream_lib::Event;
|
||||||
use symphonia_core::io::MediaSource;
|
use symphonia_core::io::MediaSource;
|
||||||
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
|
|
||||||
use tokio_util::io::StreamReader;
|
use tokio_util::io::StreamReader;
|
||||||
|
|
||||||
use crate::input::{
|
use crate::input::{
|
||||||
AsyncAdapterStream,
|
AsyncAdapterStream,
|
||||||
AsyncMediaSource,
|
AsyncReadOnlySource,
|
||||||
AudioStream,
|
AudioStream,
|
||||||
AudioStreamError,
|
AudioStreamError,
|
||||||
Compose,
|
Compose,
|
||||||
@@ -51,7 +43,7 @@ impl HlsRequest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_stream(&mut self) -> Result<HlsStream, AudioStreamError> {
|
fn create_stream(&mut self) -> Result<AsyncReadOnlySource, AudioStreamError> {
|
||||||
let request = self
|
let request = self
|
||||||
.client
|
.client
|
||||||
.get(&self.request)
|
.get(&self.request)
|
||||||
@@ -70,51 +62,7 @@ impl HlsRequest {
|
|||||||
)),
|
)),
|
||||||
})));
|
})));
|
||||||
|
|
||||||
Ok(HlsStream { stream })
|
Ok(AsyncReadOnlySource { stream })
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[pin_project]
|
|
||||||
struct HlsStream {
|
|
||||||
#[pin]
|
|
||||||
stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncRead for HlsStream {
|
|
||||||
fn poll_read(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut ReadBuf<'_>,
|
|
||||||
) -> Poll<IoResult<()>> {
|
|
||||||
AsyncRead::poll_read(self.project().stream, cx, buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncSeek for HlsStream {
|
|
||||||
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> {
|
|
||||||
Err(IoErrorKind::Unsupported.into())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<u64>> {
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl AsyncMediaSource for HlsStream {
|
|
||||||
fn is_seekable(&self) -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn byte_len(&self) -> Option<u64> {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn try_resume(
|
|
||||||
&mut self,
|
|
||||||
_offset: u64,
|
|
||||||
) -> Result<Box<dyn AsyncMediaSource>, AudioStreamError> {
|
|
||||||
Err(AudioStreamError::Unsupported)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,3 +4,84 @@ mod http;
|
|||||||
mod ytdl;
|
mod ytdl;
|
||||||
|
|
||||||
pub use self::{file::*, hls::*, http::*, ytdl::*};
|
pub use self::{file::*, hls::*, http::*, ytdl::*};
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
|
||||||
|
pin::Pin,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use pin_project::pin_project;
|
||||||
|
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
|
||||||
|
|
||||||
|
use crate::input::{AsyncMediaSource, AudioStreamError};
|
||||||
|
|
||||||
|
/// `AsyncReadOnlySource` wraps any source implementing [`tokio::io::AsyncRead`] in an unseekable
|
||||||
|
/// [`symphonia_core::io::MediaSource`], similar to [`symphonia_core::io::ReadOnlySource`]
|
||||||
|
#[pin_project]
|
||||||
|
pub struct AsyncReadOnlySource {
|
||||||
|
#[pin]
|
||||||
|
stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncReadOnlySource {
|
||||||
|
/// Instantiates a new `AsyncReadOnlySource` by taking ownership and wrapping the provided
|
||||||
|
/// `Read`er.
|
||||||
|
pub fn new<R>(inner: R) -> Self
|
||||||
|
where
|
||||||
|
R: AsyncRead + Send + Sync + Unpin + 'static,
|
||||||
|
{
|
||||||
|
AsyncReadOnlySource {
|
||||||
|
stream: Box::new(inner),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets a reference to the underlying reader.
|
||||||
|
pub fn get_ref(&self) -> &Box<dyn AsyncRead + Send + Sync + Unpin> {
|
||||||
|
&self.stream
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unwraps this `AsyncReadOnlySource`, returning the underlying reader.
|
||||||
|
pub fn into_inner<R>(self) -> Box<dyn AsyncRead + Send + Sync + Unpin> {
|
||||||
|
self.stream.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncRead for AsyncReadOnlySource {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<IoResult<()>> {
|
||||||
|
AsyncRead::poll_read(self.project().stream, cx, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncSeek for AsyncReadOnlySource {
|
||||||
|
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> {
|
||||||
|
Err(IoErrorKind::Unsupported.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<u64>> {
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl AsyncMediaSource for AsyncReadOnlySource {
|
||||||
|
fn is_seekable(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn byte_len(&self) -> Option<u64> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn try_resume(
|
||||||
|
&mut self,
|
||||||
|
_offset: u64,
|
||||||
|
) -> Result<Box<dyn AsyncMediaSource>, AudioStreamError> {
|
||||||
|
Err(AudioStreamError::Unsupported)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user