chore(deps): Update ringbuf -> 0.4
This commit is contained in:
@@ -30,7 +30,7 @@ parking_lot = { optional = true, version = "0.12" }
|
|||||||
pin-project = "1"
|
pin-project = "1"
|
||||||
rand = { optional = true, version = "0.8" }
|
rand = { optional = true, version = "0.8" }
|
||||||
reqwest = { default-features = false, features = ["stream"], optional = true, version = "0.11" }
|
reqwest = { default-features = false, features = ["stream"], optional = true, version = "0.11" }
|
||||||
ringbuf = { optional = true, version = "0.3" }
|
ringbuf = { optional = true, version = "0.4" }
|
||||||
rubato = { optional = true, version = "0.14.1" }
|
rubato = { optional = true, version = "0.14.1" }
|
||||||
rusty_pool = { optional = true, version = "0.7" }
|
rusty_pool = { optional = true, version = "0.7" }
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
|||||||
@@ -2,7 +2,8 @@ use crate::input::AudioStreamError;
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use flume::{Receiver, RecvError, Sender, TryRecvError};
|
use flume::{Receiver, RecvError, Sender, TryRecvError};
|
||||||
use futures::{future::Either, stream::FuturesUnordered, FutureExt, StreamExt};
|
use futures::{future::Either, stream::FuturesUnordered, FutureExt, StreamExt};
|
||||||
use ringbuf::*;
|
use parking_lot::Mutex;
|
||||||
|
use ringbuf::{traits::*, *};
|
||||||
use std::{
|
use std::{
|
||||||
io::{
|
io::{
|
||||||
Error as IoError,
|
Error as IoError,
|
||||||
@@ -25,7 +26,7 @@ use tokio::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
struct AsyncAdapterSink {
|
struct AsyncAdapterSink {
|
||||||
bytes_in: HeapProducer<u8>,
|
bytes_in: HeapProd<u8>,
|
||||||
req_rx: Receiver<AdapterRequest>,
|
req_rx: Receiver<AdapterRequest>,
|
||||||
resp_tx: Sender<AdapterResponse>,
|
resp_tx: Sender<AdapterResponse>,
|
||||||
stream: Box<dyn AsyncMediaSource>,
|
stream: Box<dyn AsyncMediaSource>,
|
||||||
@@ -136,7 +137,10 @@ impl AsyncAdapterSink {
|
|||||||
/// pass along seek requests needed. This allows for passing bytes from exclusively `AsyncRead`
|
/// pass along seek requests needed. This allows for passing bytes from exclusively `AsyncRead`
|
||||||
/// streams (e.g., hyper HTTP sessions) to Songbird.
|
/// streams (e.g., hyper HTTP sessions) to Songbird.
|
||||||
pub struct AsyncAdapterStream {
|
pub struct AsyncAdapterStream {
|
||||||
bytes_out: HeapConsumer<u8>,
|
// Note: this mutex is here to appease symphonia's Send + Sync bound.
|
||||||
|
// Only one thread should own and pull from this stream, so in practice
|
||||||
|
// there is no contention.
|
||||||
|
bytes_out: Mutex<HeapCons<u8>>,
|
||||||
can_seek: bool,
|
can_seek: bool,
|
||||||
// Note: these are Atomic just to work around the need for
|
// Note: these are Atomic just to work around the need for
|
||||||
// check_messages to take &self rather than &mut.
|
// check_messages to take &self rather than &mut.
|
||||||
@@ -153,6 +157,7 @@ impl AsyncAdapterStream {
|
|||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn new(stream: Box<dyn AsyncMediaSource>, buf_len: usize) -> AsyncAdapterStream {
|
pub fn new(stream: Box<dyn AsyncMediaSource>, buf_len: usize) -> AsyncAdapterStream {
|
||||||
let (bytes_in, bytes_out) = SharedRb::new(buf_len).split();
|
let (bytes_in, bytes_out) = SharedRb::new(buf_len).split();
|
||||||
|
let bytes_out = bytes_out.into();
|
||||||
let (resp_tx, resp_rx) = flume::unbounded();
|
let (resp_tx, resp_rx) = flume::unbounded();
|
||||||
let (req_tx, req_rx) = flume::unbounded();
|
let (req_tx, req_rx) = flume::unbounded();
|
||||||
let can_seek = stream.is_seekable();
|
let can_seek = stream.is_seekable();
|
||||||
@@ -233,7 +238,8 @@ impl Read for AsyncAdapterStream {
|
|||||||
|| self.finalised.load(Ordering::Relaxed));
|
|| self.finalised.load(Ordering::Relaxed));
|
||||||
drop(self.handle_messages(Operation::Read { block }));
|
drop(self.handle_messages(Operation::Read { block }));
|
||||||
|
|
||||||
match self.bytes_out.read(buf) {
|
let mut rb = self.bytes_out.lock();
|
||||||
|
match rb.read(buf) {
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
self.notify_tx.notify_one();
|
self.notify_tx.notify_one();
|
||||||
return Ok(n);
|
return Ok(n);
|
||||||
@@ -278,7 +284,9 @@ impl Seek for AsyncAdapterStream {
|
|||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
|
|
||||||
self.bytes_out.skip(self.bytes_out.capacity());
|
let mut rb = self.bytes_out.lock();
|
||||||
|
let cap = rb.capacity();
|
||||||
|
rb.skip(cap.into());
|
||||||
|
|
||||||
_ = self.req_tx.send(AdapterRequest::SeekCleared);
|
_ = self.req_tx.send(AdapterRequest::SeekCleared);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user