Input: Fix high CPU use when initialising long files over HTTP (#163)
Fixes the possibility of a spinlock while reading bytes from an async->sync adapter. This case can be observed with long (e.g., 10 hours of silence) videos which seem to be served slower than we would like to parse their headers. The fix moves most communication to blocking: `read` calls first parse all messages form the async context in a non-blocking way, then swap to blocking if no bytes are available. Tested using `cargo make ready` and "examples/serenity/voice" against the URL https://www.youtube.com/watch?v=g4mHPeMGTJM.
This commit is contained in:
@@ -34,7 +34,7 @@ struct AsyncAdapterSink {
|
|||||||
|
|
||||||
impl AsyncAdapterSink {
|
impl AsyncAdapterSink {
|
||||||
async fn launch(mut self) {
|
async fn launch(mut self) {
|
||||||
let mut inner_buf = [0u8; 10 * 1024];
|
let mut inner_buf = [0u8; 32 * 1024];
|
||||||
let mut read_region = 0..0;
|
let mut read_region = 0..0;
|
||||||
let mut hit_end = false;
|
let mut hit_end = false;
|
||||||
let mut blocked = false;
|
let mut blocked = false;
|
||||||
@@ -73,6 +73,7 @@ impl AsyncAdapterSink {
|
|||||||
.write(&inner_buf[read_region.start..read_region.end])
|
.write(&inner_buf[read_region.start..read_region.end])
|
||||||
{
|
{
|
||||||
read_region.start += n_moved;
|
read_region.start += n_moved;
|
||||||
|
drop(self.resp_tx.send_async(AdapterResponse::ReadOccurred).await);
|
||||||
} else {
|
} else {
|
||||||
blocked = true;
|
blocked = true;
|
||||||
}
|
}
|
||||||
@@ -137,9 +138,10 @@ impl AsyncAdapterSink {
|
|||||||
pub struct AsyncAdapterStream {
|
pub struct AsyncAdapterStream {
|
||||||
bytes_out: HeapConsumer<u8>,
|
bytes_out: HeapConsumer<u8>,
|
||||||
can_seek: bool,
|
can_seek: bool,
|
||||||
// Note: this is Atomic just to work around the need for
|
// Note: these are Atomic just to work around the need for
|
||||||
// check_messages to take &self rather than &mut.
|
// check_messages to take &self rather than &mut.
|
||||||
finalised: AtomicBool,
|
finalised: AtomicBool,
|
||||||
|
bytes_known_present: AtomicBool,
|
||||||
req_tx: Sender<AdapterRequest>,
|
req_tx: Sender<AdapterRequest>,
|
||||||
resp_rx: Receiver<AdapterResponse>,
|
resp_rx: Receiver<AdapterResponse>,
|
||||||
notify_tx: Arc<Notify>,
|
notify_tx: Arc<Notify>,
|
||||||
@@ -168,6 +170,7 @@ impl AsyncAdapterStream {
|
|||||||
bytes_out,
|
bytes_out,
|
||||||
can_seek,
|
can_seek,
|
||||||
finalised: false.into(),
|
finalised: false.into(),
|
||||||
|
bytes_known_present: false.into(),
|
||||||
req_tx,
|
req_tx,
|
||||||
resp_rx,
|
resp_rx,
|
||||||
notify_tx,
|
notify_tx,
|
||||||
@@ -180,16 +183,29 @@ impl AsyncAdapterStream {
|
|||||||
stream
|
stream
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_messages(&self, block: bool) -> Option<AdapterResponse> {
|
fn handle_messages(&self, op: Operation) -> Option<AdapterResponse> {
|
||||||
loop {
|
loop {
|
||||||
match self.resp_rx.try_recv() {
|
let msg = if op.will_block() {
|
||||||
Ok(AdapterResponse::ReadZero) => {
|
self.resp_rx.recv().ok()
|
||||||
|
} else {
|
||||||
|
self.resp_rx.try_recv().ok()
|
||||||
|
};
|
||||||
|
|
||||||
|
let msg = if let Some(msg) = msg { msg } else { break None };
|
||||||
|
|
||||||
|
// state changes
|
||||||
|
match &msg {
|
||||||
|
AdapterResponse::ReadZero => {
|
||||||
self.finalised.store(true, Ordering::Relaxed);
|
self.finalised.store(true, Ordering::Relaxed);
|
||||||
},
|
},
|
||||||
Ok(a) => break Some(a),
|
AdapterResponse::ReadOccurred => {
|
||||||
Err(TryRecvError::Empty) if !block => break None,
|
self.bytes_known_present.store(true, Ordering::Relaxed);
|
||||||
Err(TryRecvError::Disconnected) => break None,
|
},
|
||||||
Err(TryRecvError::Empty) => {},
|
_ => {},
|
||||||
|
}
|
||||||
|
|
||||||
|
if op.expected_msg(&msg) {
|
||||||
|
break Some(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -212,11 +228,10 @@ impl AsyncAdapterStream {
|
|||||||
|
|
||||||
impl Read for AsyncAdapterStream {
|
impl Read for AsyncAdapterStream {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
|
||||||
// TODO: make this run via condvar instead?
|
|
||||||
// This needs to remain blocking or spin loopy
|
|
||||||
// Mainly because this is at odds with "keep CPU low."
|
|
||||||
loop {
|
loop {
|
||||||
drop(self.handle_messages(false));
|
let block = !(self.bytes_known_present.load(Ordering::Relaxed)
|
||||||
|
|| self.finalised.load(Ordering::Relaxed));
|
||||||
|
drop(self.handle_messages(Operation::Read { block }));
|
||||||
|
|
||||||
match self.bytes_out.read(buf) {
|
match self.bytes_out.read(buf) {
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
@@ -229,9 +244,8 @@ impl Read for AsyncAdapterStream {
|
|||||||
if self.finalised.load(Ordering::Relaxed) {
|
if self.finalised.load(Ordering::Relaxed) {
|
||||||
return Ok(0);
|
return Ok(0);
|
||||||
}
|
}
|
||||||
|
self.bytes_known_present.store(false, Ordering::Relaxed);
|
||||||
self.check_dropped()?;
|
self.check_dropped()?;
|
||||||
std::thread::yield_now();
|
|
||||||
},
|
},
|
||||||
a => {
|
a => {
|
||||||
println!("Misc err {a:?}");
|
println!("Misc err {a:?}");
|
||||||
@@ -258,7 +272,7 @@ impl Seek for AsyncAdapterStream {
|
|||||||
// wait for async to tell us that it has stopped writing,
|
// wait for async to tell us that it has stopped writing,
|
||||||
// then clear buf and allow async to write again.
|
// then clear buf and allow async to write again.
|
||||||
self.finalised.store(false, Ordering::Relaxed);
|
self.finalised.store(false, Ordering::Relaxed);
|
||||||
match self.handle_messages(true) {
|
match self.handle_messages(Operation::Seek) {
|
||||||
Some(AdapterResponse::SeekClear) => {},
|
Some(AdapterResponse::SeekClear) => {},
|
||||||
None => self.check_dropped().map(|_| unreachable!())?,
|
None => self.check_dropped().map(|_| unreachable!())?,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
@@ -268,7 +282,7 @@ impl Seek for AsyncAdapterStream {
|
|||||||
|
|
||||||
let _ = self.req_tx.send(AdapterRequest::SeekCleared);
|
let _ = self.req_tx.send(AdapterRequest::SeekCleared);
|
||||||
|
|
||||||
match self.handle_messages(true) {
|
match self.handle_messages(Operation::Seek) {
|
||||||
Some(AdapterResponse::SeekResult(a)) => a,
|
Some(AdapterResponse::SeekResult(a)) => a,
|
||||||
None => self.check_dropped().map(|_| unreachable!()),
|
None => self.check_dropped().map(|_| unreachable!()),
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
@@ -286,7 +300,7 @@ impl MediaSource for AsyncAdapterStream {
|
|||||||
|
|
||||||
let _ = self.req_tx.send(AdapterRequest::ByteLen);
|
let _ = self.req_tx.send(AdapterRequest::ByteLen);
|
||||||
|
|
||||||
match self.handle_messages(true) {
|
match self.handle_messages(Operation::Len) {
|
||||||
Some(AdapterResponse::ByteLen(a)) => a,
|
Some(AdapterResponse::ByteLen(a)) => a,
|
||||||
None => self.check_dropped().ok().map(|_| unreachable!()),
|
None => self.check_dropped().ok().map(|_| unreachable!()),
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
@@ -306,6 +320,37 @@ enum AdapterResponse {
|
|||||||
SeekClear,
|
SeekClear,
|
||||||
ByteLen(Option<u64>),
|
ByteLen(Option<u64>),
|
||||||
ReadZero,
|
ReadZero,
|
||||||
|
ReadOccurred,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Copy, Clone)]
|
||||||
|
enum Operation {
|
||||||
|
Read { block: bool },
|
||||||
|
Seek,
|
||||||
|
Len,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Operation {
|
||||||
|
fn will_block(self) -> bool {
|
||||||
|
match self {
|
||||||
|
Self::Read { block } => block,
|
||||||
|
_ => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn expected_msg(self, msg: &AdapterResponse) -> bool {
|
||||||
|
match self {
|
||||||
|
Self::Read { .. } => matches!(
|
||||||
|
msg,
|
||||||
|
AdapterResponse::ReadOccurred | AdapterResponse::ReadZero
|
||||||
|
),
|
||||||
|
Self::Seek => matches!(
|
||||||
|
msg,
|
||||||
|
AdapterResponse::SeekResult(_) | AdapterResponse::SeekClear
|
||||||
|
),
|
||||||
|
Self::Len => matches!(msg, AdapterResponse::ByteLen(_)),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An async port of symphonia's [`MediaSource`].
|
/// An async port of symphonia's [`MediaSource`].
|
||||||
|
|||||||
Reference in New Issue
Block a user