Use symphonia::io::MediaSource for Reader extensions (#61)

This PR does the following:

 * Changes both `Reader::Extension` and `Reader::ExtensionSeek`  to use `symphonia::io::MediaSource`.
 * Removes the `File` and `Vec` variants of readers, instead opting to provide a `from_file` and `from_memory` associated function to create readers from the `File` and `Cursor<Vec<u8>>` implementations of `MediaSource`. 
 * Removes the ReadSeek trait.
 * Added a dependency on `symphonia_core`. This crate has no additional dependencies.
This commit is contained in:
James Liu
2021-04-21 03:17:55 -07:00
committed by Kyle Simpson
parent 0bb2572deb
commit bc9a78e050
3 changed files with 34 additions and 70 deletions

View File

@@ -17,6 +17,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
tracing = { version = "0.1", features = ["log"] } tracing = { version = "0.1", features = ["log"] }
tracing-futures = "0.2" tracing-futures = "0.2"
symphonia-core = "0.2"
[dependencies.async-trait] [dependencies.async-trait]
optional = true optional = true

View File

@@ -1,6 +1,6 @@
use super::{codec::OpusDecoderState, error::DcaError, Codec, Container, Input, Metadata, Reader}; use super::{codec::OpusDecoderState, error::DcaError, Codec, Container, Input, Metadata, Reader};
use serde::Deserialize; use serde::Deserialize;
use std::{ffi::OsStr, io::BufReader, mem}; use std::{ffi::OsStr, mem};
#[cfg(not(feature = "tokio-02-marker"))] #[cfg(not(feature = "tokio-02-marker"))]
use tokio::{fs::File as TokioFile, io::AsyncReadExt}; use tokio::{fs::File as TokioFile, io::AsyncReadExt};
#[cfg(feature = "tokio-02-marker")] #[cfg(feature = "tokio-02-marker")]
@@ -46,7 +46,7 @@ async fn _dca(path: &OsStr) -> Result<Input, DcaError> {
.await .await
.map_err(DcaError::IoError)?; .map_err(DcaError::IoError)?;
let reader = BufReader::new(json_reader.into_inner().into_std().await); let reader = json_reader.into_inner().into_std().await;
let metadata: Metadata = serde_json::from_slice::<DcaMetadata>(raw_json.as_slice()) let metadata: Metadata = serde_json::from_slice::<DcaMetadata>(raw_json.as_slice())
.map_err(DcaError::InvalidMetadata)? .map_err(DcaError::InvalidMetadata)?
@@ -56,7 +56,7 @@ async fn _dca(path: &OsStr) -> Result<Input, DcaError> {
Ok(Input::new( Ok(Input::new(
stereo, stereo,
Reader::File(reader), Reader::from_file(reader),
Codec::Opus(OpusDecoderState::new().map_err(DcaError::Opus)?), Codec::Opus(OpusDecoderState::new().map_err(DcaError::Opus)?),
Container::Dca { Container::Dca {
first_frame: (size as usize) + mem::size_of::<i32>() + header.len(), first_frame: (size as usize) + mem::size_of::<i32>() + header.len(),

View File

@@ -17,14 +17,13 @@ use std::{
result::Result as StdResult, result::Result as StdResult,
}; };
use streamcatcher::{Catcher, TxCatcher}; use streamcatcher::{Catcher, TxCatcher};
pub use symphonia_core::io::MediaSource;
/// Usable data/byte sources for an audio stream. /// Usable data/byte sources for an audio stream.
/// ///
/// Users may define their own data sources using [`Extension`] /// Users may define their own data sources using [`Extension`].
/// and [`ExtensionSeek`].
/// ///
/// [`Extension`]: Reader::Extension /// [`Extension`]: Reader::Extension
/// [`ExtensionSeek`]: Reader::ExtensionSeek
pub enum Reader { pub enum Reader {
/// Piped output of another program (i.e., [`ffmpeg`]). /// Piped output of another program (i.e., [`ffmpeg`]).
/// ///
@@ -44,40 +43,38 @@ pub enum Reader {
/// ///
/// Supports seeking. /// Supports seeking.
Restartable(Restartable), Restartable(Restartable),
/// A source contained in a local file.
///
/// Supports seeking.
File(BufReader<File>),
/// A source contained as an array in memory.
///
/// Supports seeking.
Vec(Cursor<Vec<u8>>),
/// A basic user-provided source. /// A basic user-provided source.
/// ///
/// Does not support seeking. /// Seeking support depends on underlying `MediaSource` implementation.
Extension(Box<dyn Read + Send>), Extension(Box<dyn MediaSource + Send>),
/// A user-provided source which also implements [`Seek`].
///
/// Supports seeking.
///
/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
ExtensionSeek(Box<dyn ReadSeek + Send>),
} }
impl Reader { impl Reader {
/// Returns whether the given source implements [`Seek`]. /// Returns whether the given source implements [`Seek`].
/// ///
/// This might be an expensive operation and might involve blocking IO. In such cases, it is
/// advised to cache the return value when possible.
///
/// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html /// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
pub fn is_seekable(&self) -> bool { pub fn is_seekable(&self) -> bool {
use Reader::*; use Reader::*;
match self { match self {
Restartable(_) | Compressed(_) | Memory(_) => true, Restartable(_) | Compressed(_) | Memory(_) => true,
Extension(_) => false, Extension(source) => source.is_seekable(),
ExtensionSeek(_) => true,
_ => false, _ => false,
} }
} }
/// A source contained in a local file.
pub fn from_file(file: File) -> Self {
Self::Extension(Box::new(file))
}
/// A source contained as an array in memory.
pub fn from_memory(buf: Vec<u8>) -> Self {
Self::Extension(Box::new(Cursor::new(buf)))
}
#[allow(clippy::single_match)] #[allow(clippy::single_match)]
pub(crate) fn prep_with_handle(&mut self, handle: Handle) { pub(crate) fn prep_with_handle(&mut self, handle: Handle) {
use Reader::*; use Reader::*;
@@ -105,10 +102,7 @@ impl Read for Reader {
Memory(a) => Read::read(a, buffer), Memory(a) => Read::read(a, buffer),
Compressed(a) => Read::read(a, buffer), Compressed(a) => Read::read(a, buffer),
Restartable(a) => Read::read(a, buffer), Restartable(a) => Read::read(a, buffer),
File(a) => Read::read(a, buffer),
Vec(a) => Read::read(a, buffer),
Extension(a) => a.read(buffer), Extension(a) => a.read(buffer),
ExtensionSeek(a) => a.read(buffer),
} }
} }
} }
@@ -117,16 +111,22 @@ impl Seek for Reader {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> { fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
use Reader::*; use Reader::*;
match self { match self {
Pipe(_) | Extension(_) => Err(IoError::new( Pipe(_) => Err(IoError::new(
IoErrorKind::InvalidInput, IoErrorKind::InvalidInput,
"Seeking not supported on Reader of this type.", "Seeking not supported on Reader of this type.",
)), )),
Memory(a) => Seek::seek(a, pos), Memory(a) => Seek::seek(a, pos),
Compressed(a) => Seek::seek(a, pos), Compressed(a) => Seek::seek(a, pos),
File(a) => Seek::seek(a, pos),
Restartable(a) => Seek::seek(a, pos), Restartable(a) => Seek::seek(a, pos),
Vec(a) => Seek::seek(a, pos), Extension(a) =>
ExtensionSeek(a) => a.seek(pos), if a.is_seekable() {
a.seek(pos)
} else {
Err(IoError::new(
IoErrorKind::InvalidInput,
"Seeking not supported on Reader of this type.",
))
},
} }
} }
} }
@@ -139,51 +139,14 @@ impl Debug for Reader {
Memory(a) => format!("{:?}", a), Memory(a) => format!("{:?}", a),
Compressed(a) => format!("{:?}", a), Compressed(a) => format!("{:?}", a),
Restartable(a) => format!("{:?}", a), Restartable(a) => format!("{:?}", a),
File(a) => format!("{:?}", a),
Vec(a) => format!("{:?}", a),
Extension(_) => "Extension".to_string(), Extension(_) => "Extension".to_string(),
ExtensionSeek(_) => "ExtensionSeek".to_string(),
}; };
f.debug_tuple("Reader").field(&field).finish() f.debug_tuple("Reader").field(&field).finish()
} }
} }
impl From<Vec<u8>> for Reader { impl From<Vec<u8>> for Reader {
fn from(val: Vec<u8>) -> Reader { fn from(val: Vec<u8>) -> Self {
Reader::Vec(Cursor::new(val)) Self::from_memory(val)
}
}
/// Fusion trait for custom input sources which allow seeking.
pub trait ReadSeek {
/// See [`Read::read`].
///
/// [`Read::read`]: https://doc.rust-lang.org/nightly/std/io/trait.Read.html#tymethod.read
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize>;
/// See [`Seek::seek`].
///
/// [`Seek::seek`]: https://doc.rust-lang.org/nightly/std/io/trait.Seek.html#tymethod.seek
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64>;
}
impl Read for dyn ReadSeek {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
ReadSeek::read(self, buf)
}
}
impl Seek for dyn ReadSeek {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
ReadSeek::seek(self, pos)
}
}
impl<R: Read + Seek> ReadSeek for R {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
Read::read(self, buf)
}
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
Seek::seek(self, pos)
} }
} }