diff --git a/Cargo.toml b/Cargo.toml index 529ef25..2f85c7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ parking_lot = { optional = true, version = "0.12" } pin-project = "1" rand = { optional = true, version = "0.8" } reqwest = { default-features = false, features = ["stream"], optional = true, version = "0.11" } -ringbuf = { optional = true, version = "0.3" } +ringbuf = { optional = true, version = "0.4" } rubato = { optional = true, version = "0.14.1" } rusty_pool = { optional = true, version = "0.7" } serde = { version = "1", features = ["derive"] } diff --git a/src/input/adapters/async_adapter.rs b/src/input/adapters/async_adapter.rs index a25fe0a..8c6d920 100644 --- a/src/input/adapters/async_adapter.rs +++ b/src/input/adapters/async_adapter.rs @@ -2,7 +2,8 @@ use crate::input::AudioStreamError; use async_trait::async_trait; use flume::{Receiver, RecvError, Sender, TryRecvError}; use futures::{future::Either, stream::FuturesUnordered, FutureExt, StreamExt}; -use ringbuf::*; +use parking_lot::Mutex; +use ringbuf::{traits::*, *}; use std::{ io::{ Error as IoError, @@ -25,7 +26,7 @@ use tokio::{ }; struct AsyncAdapterSink { - bytes_in: HeapProducer, + bytes_in: HeapProd, req_rx: Receiver, resp_tx: Sender, stream: Box, @@ -136,7 +137,10 @@ impl AsyncAdapterSink { /// pass along seek requests needed. This allows for passing bytes from exclusively `AsyncRead` /// streams (e.g., hyper HTTP sessions) to Songbird. pub struct AsyncAdapterStream { - bytes_out: HeapConsumer, + // Note: this mutex is here to appease symphonia's Send + Sync bound. + // Only one thread should own and pull from this stream, so in practice + // there is no contention. + bytes_out: Mutex>, can_seek: bool, // Note: these are Atomic just to work around the need for // check_messages to take &self rather than &mut. @@ -153,6 +157,7 @@ impl AsyncAdapterStream { #[must_use] pub fn new(stream: Box, buf_len: usize) -> AsyncAdapterStream { let (bytes_in, bytes_out) = SharedRb::new(buf_len).split(); + let bytes_out = bytes_out.into(); let (resp_tx, resp_rx) = flume::unbounded(); let (req_tx, req_rx) = flume::unbounded(); let can_seek = stream.is_seekable(); @@ -233,7 +238,8 @@ impl Read for AsyncAdapterStream { || self.finalised.load(Ordering::Relaxed)); drop(self.handle_messages(Operation::Read { block })); - match self.bytes_out.read(buf) { + let mut rb = self.bytes_out.lock(); + match rb.read(buf) { Ok(n) => { self.notify_tx.notify_one(); return Ok(n); @@ -278,7 +284,9 @@ impl Seek for AsyncAdapterStream { _ => unreachable!(), } - self.bytes_out.skip(self.bytes_out.capacity()); + let mut rb = self.bytes_out.lock(); + let cap = rb.capacity(); + rb.skip(cap.into()); _ = self.req_tx.send(AdapterRequest::SeekCleared);