use crate::input::AudioStreamError; use async_trait::async_trait; use flume::{Receiver, RecvError, Sender, TryRecvError}; use futures::{future::Either, stream::FuturesUnordered, FutureExt, StreamExt}; use ringbuf::*; use std::{ io::{ Error as IoError, ErrorKind as IoErrorKind, Read, Result as IoResult, Seek, SeekFrom, Write, }, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, }; use symphonia_core::io::MediaSource; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}, sync::Notify, }; struct AsyncAdapterSink { bytes_in: Producer, req_rx: Receiver, resp_tx: Sender, stream: Box, notify_rx: Arc, } impl AsyncAdapterSink { async fn launch(mut self) { let mut inner_buf = [0u8; 10 * 1024]; let mut read_region = 0..0; let mut hit_end = false; let mut blocked = false; let mut pause_buf_moves = false; let mut seek_res = None; let mut seen_bytes = 0; loop { // if read_region is empty, refill from src. // if that read is zero, tell other half. // if WouldBlock, block on msg acquire, // else non_block msg acquire. if !pause_buf_moves { if !hit_end && read_region.is_empty() { if let Ok(n) = self.stream.read(&mut inner_buf).await { read_region = 0..n; if n == 0 { drop(self.resp_tx.send_async(AdapterResponse::ReadZero).await); hit_end = true; } seen_bytes += n as u64; } else { match self.stream.try_resume(seen_bytes).await { Ok(s) => { self.stream = s; }, Err(_e) => break, } } } while !read_region.is_empty() && !blocked { if let Ok(n_moved) = self .bytes_in .write(&inner_buf[read_region.start..read_region.end]) { read_region.start += n_moved; } else { blocked = true; } } } let msg = if blocked || hit_end { let mut fs = FuturesUnordered::new(); fs.push(Either::Left(self.req_rx.recv_async())); fs.push(Either::Right(self.notify_rx.notified().map(|_| { let o: Result = Ok(AdapterRequest::Wake); o }))); match fs.next().await { Some(Ok(a)) => a, _ => break, } } else { match self.req_rx.try_recv() { Ok(a) => a, Err(TryRecvError::Empty) => continue, _ => break, } }; match msg { AdapterRequest::Wake => blocked = false, AdapterRequest::ByteLen => { drop( self.resp_tx .send_async(AdapterResponse::ByteLen(self.stream.byte_len().await)) .await, ); }, AdapterRequest::Seek(pos) => { pause_buf_moves = true; drop(self.resp_tx.send_async(AdapterResponse::SeekClear).await); seek_res = Some(self.stream.seek(pos).await); }, AdapterRequest::SeekCleared => { if let Some(res) = seek_res.take() { drop( self.resp_tx .send_async(AdapterResponse::SeekResult(res)) .await, ); } pause_buf_moves = false; }, } } } } /// An adapter for converting an async media source into a synchronous one /// usable by symphonia. /// /// This adapter takes a source implementing `AsyncRead`, and allows the receive side to /// pass along seek requests needed. This allows for passing bytes from exclusively `AsyncRead` /// streams (e.g., hyper HTTP sessions) to Songbird. pub struct AsyncAdapterStream { bytes_out: Consumer, can_seek: bool, // Note: this is Atomic just to work around the need for // check_messages to take &self rather than &mut. finalised: AtomicBool, req_tx: Sender, resp_rx: Receiver, notify_tx: Arc, } impl AsyncAdapterStream { /// Wrap and pull from an async file stream, with an intermediate ring-buffer of size `buf_len` /// between the async and sync halves. #[must_use] pub fn new(stream: Box, buf_len: usize) -> AsyncAdapterStream { let (bytes_in, bytes_out) = RingBuffer::new(buf_len).split(); let (resp_tx, resp_rx) = flume::unbounded(); let (req_tx, req_rx) = flume::unbounded(); let can_seek = stream.is_seekable(); let notify_rx = Arc::new(Notify::new()); let notify_tx = notify_rx.clone(); let sink = AsyncAdapterSink { bytes_in, req_rx, resp_tx, stream, notify_rx, }; let stream = AsyncAdapterStream { bytes_out, can_seek, finalised: false.into(), req_tx, resp_rx, notify_tx, }; tokio::spawn(async move { sink.launch().await; }); stream } fn handle_messages(&self, block: bool) -> Option { loop { match self.resp_rx.try_recv() { Ok(AdapterResponse::ReadZero) => { self.finalised.store(true, Ordering::Relaxed); }, Ok(a) => break Some(a), Err(TryRecvError::Empty) if !block => break None, Err(TryRecvError::Disconnected) => break None, Err(TryRecvError::Empty) => {}, } } } fn is_dropped_and_clear(&self) -> bool { self.resp_rx.is_empty() && self.resp_rx.is_disconnected() } fn check_dropped(&self) -> IoResult<()> { if self.is_dropped_and_clear() { Err(IoError::new( IoErrorKind::UnexpectedEof, "Async half was dropped.", )) } else { Ok(()) } } } impl Read for AsyncAdapterStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { // TODO: make this run via condvar instead? // This needs to remain blocking or spin loopy // Mainly because this is at odds with "keep CPU low." loop { drop(self.handle_messages(false)); match self.bytes_out.read(buf) { Ok(n) => { self.notify_tx.notify_one(); return Ok(n); }, Err(e) if e.kind() == IoErrorKind::WouldBlock => { // receive side must ABSOLUTELY be unblocked here. self.notify_tx.notify_one(); if self.finalised.load(Ordering::Relaxed) { return Ok(0); } self.check_dropped()?; std::thread::yield_now(); }, a => { println!("Misc err {:?}", a); return a; }, } } } } impl Seek for AsyncAdapterStream { fn seek(&mut self, pos: SeekFrom) -> IoResult { if !self.can_seek { return Err(IoError::new( IoErrorKind::Unsupported, "Async half does not support seek operations.", )); } self.check_dropped()?; let _ = self.req_tx.send(AdapterRequest::Seek(pos)); // wait for async to tell us that it has stopped writing, // then clear buf and allow async to write again. self.finalised.store(false, Ordering::Relaxed); match self.handle_messages(true) { Some(AdapterResponse::SeekClear) => {}, None => self.check_dropped().map(|_| unreachable!())?, _ => unreachable!(), } self.bytes_out.discard(self.bytes_out.capacity()); let _ = self.req_tx.send(AdapterRequest::SeekCleared); match self.handle_messages(true) { Some(AdapterResponse::SeekResult(a)) => a, None => self.check_dropped().map(|_| unreachable!()), _ => unreachable!(), } } } impl MediaSource for AsyncAdapterStream { fn is_seekable(&self) -> bool { self.can_seek } fn byte_len(&self) -> Option { self.check_dropped().ok()?; let _ = self.req_tx.send(AdapterRequest::ByteLen); match self.handle_messages(true) { Some(AdapterResponse::ByteLen(a)) => a, None => self.check_dropped().ok().map(|_| unreachable!()), _ => unreachable!(), } } } enum AdapterRequest { Wake, Seek(SeekFrom), SeekCleared, ByteLen, } enum AdapterResponse { SeekResult(IoResult), SeekClear, ByteLen(Option), ReadZero, } /// An async port of symphonia's [`MediaSource`]. /// /// Streams which are not seekable should implement `AsyncSeek` such that all operations /// fail with `Unsupported`, and implement `fn is_seekable(&self) -> { false }`. /// /// [`MediaSource`]: MediaSource #[async_trait] pub trait AsyncMediaSource: AsyncRead + AsyncSeek + Send + Sync + Unpin { /// Returns if the source is seekable. This may be an expensive operation. fn is_seekable(&self) -> bool; /// Returns the length in bytes, if available. This may be an expensive operation. async fn byte_len(&self) -> Option; /// Tries to recreate this stream in event of an error, resuming from the given offset. async fn try_resume( &mut self, _offset: u64, ) -> Result, AudioStreamError> { Err(AudioStreamError::Unsupported) } }