Files
songbird/src/input/adapters/async_adapter.rs
Kyle Simpson 3daf11f5d1 Driver: Implement audio scheduler (#179)
This PR implements a custom scheduler for audio threads, which reduces thread use and (often) memory consumption.

To save threads and memory (e.g., packet buffer allocations), Songbird parks Mixer tasks which do not have any live Tracks.
These are now all co-located on a single async 'Idle' task.
This task is responsible for managing UDP keepalive messages for each task, maintaining event state, and executing any Mixer task messages.
Whenever any message arrives which adds a `Track`, the mixer task is moved to a live thread.
The Idle task inspects task counts and execution time on each thread, choosing the first live thread with room, and creating a new one if needed.

Each live thread is responsible for running as many live mixers as it can in a single tick every 20ms: this currently defaults to 16 mixers per thread, but is user-configurable.
A live thread also stores RTP packet blocks to be written into by each sub-task.
Each live thread has a conservative limit of 18ms that it will aim to stay under: if all work takes longer than this, it will offload the task with the highest mixing cost once per tick onto another (possibly new) live worker thread.
2023-11-20 00:02:57 +00:00

378 lines
12 KiB
Rust

use crate::input::AudioStreamError;
use async_trait::async_trait;
use flume::{Receiver, RecvError, Sender, TryRecvError};
use futures::{future::Either, stream::FuturesUnordered, FutureExt, StreamExt};
use ringbuf::*;
use std::{
io::{
Error as IoError,
ErrorKind as IoErrorKind,
Read,
Result as IoResult,
Seek,
SeekFrom,
Write,
},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use symphonia_core::io::MediaSource;
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt},
sync::Notify,
};
/// Async half of the adapter: owns the wrapped async stream and feeds its bytes
/// into the ring buffer read by [`AsyncAdapterStream`].
struct AsyncAdapterSink {
/// Producer side of the byte ring buffer shared with the sync half.
bytes_in: HeapProducer<u8>,
/// Requests (seek, byte length, ...) arriving from the sync half.
req_rx: Receiver<AdapterRequest>,
/// Replies and read notifications sent back to the sync half.
resp_tx: Sender<AdapterResponse>,
/// The async source being adapted.
stream: Box<dyn AsyncMediaSource>,
/// Awaited while the sink is idle; the sync half notifies it on every read attempt.
notify_rx: Arc<Notify>,
}
impl AsyncAdapterSink {
    /// Runs the async half of the adapter until either side of the channel pair
    /// is dropped or the stream fails without being resumable.
    ///
    /// Each loop iteration:
    ///
    /// 1. refills a local staging buffer from the async stream when it is empty
    ///    (reporting [`AdapterResponse::ReadZero`] on end-of-stream, or calling
    ///    `try_resume` with the current byte offset on a read error),
    /// 2. moves staged bytes into the ring buffer until it is full, reporting
    ///    [`AdapterResponse::ReadOccurred`] after each successful move,
    /// 3. handles one request from the sync half, waiting for it only when no
    ///    further progress is possible without one.
    ///
    /// Seeking is a two-step handshake: on `Seek` the sink stops moving bytes,
    /// acknowledges with `SeekClear` and performs the seek; on `SeekCleared`
    /// (sent once the sync half has emptied the ring buffer) it reports the
    /// seek result and resumes.
    async fn launch(mut self) {
        let mut inner_buf = [0u8; 32 * 1024];
        // Portion of `inner_buf` which still has to be moved into the ring buffer.
        let mut read_region = 0..0;
        let mut hit_end = false;
        let mut blocked = false;
        let mut pause_buf_moves = false;
        let mut seek_res = None;
        // Absolute stream offset of the next byte to be read; handed to `try_resume`.
        let mut seen_bytes = 0u64;

        loop {
            // if read_region is empty, refill from src.
            //  if that read is zero, tell other half.
            // if WouldBlock, block on msg acquire,
            // else non_block msg acquire.
            if !pause_buf_moves {
                if !hit_end && read_region.is_empty() {
                    if let Ok(n) = self.stream.read(&mut inner_buf).await {
                        read_region = 0..n;
                        if n == 0 {
                            drop(self.resp_tx.send_async(AdapterResponse::ReadZero).await);
                            hit_end = true;
                        }
                        seen_bytes += n as u64;
                    } else {
                        match self.stream.try_resume(seen_bytes).await {
                            Ok(s) => {
                                self.stream = s;
                            },
                            Err(_e) => break,
                        }
                    }
                }

                while !read_region.is_empty() && !blocked {
                    if let Ok(n_moved) = self
                        .bytes_in
                        .write(&inner_buf[read_region.start..read_region.end])
                    {
                        read_region.start += n_moved;
                        drop(self.resp_tx.send_async(AdapterResponse::ReadOccurred).await);
                    } else {
                        // Ring buffer is full: wait for the sync half to wake us.
                        blocked = true;
                    }
                }
            }

            // While a seek is in flight nothing can progress until the sync half
            // answers, so wait for a message rather than spinning on `try_recv`.
            let msg = if blocked || hit_end || pause_buf_moves {
                let mut fs = FuturesUnordered::new();
                fs.push(Either::Left(self.req_rx.recv_async()));
                fs.push(Either::Right(self.notify_rx.notified().map(|_| {
                    let o: Result<AdapterRequest, RecvError> = Ok(AdapterRequest::Wake);
                    o
                })));

                match fs.next().await {
                    Some(Ok(a)) => a,
                    _ => break,
                }
            } else {
                match self.req_rx.try_recv() {
                    Ok(a) => a,
                    Err(TryRecvError::Empty) => continue,
                    _ => break,
                }
            };

            match msg {
                AdapterRequest::Wake => blocked = false,
                AdapterRequest::ByteLen => {
                    drop(
                        self.resp_tx
                            .send_async(AdapterResponse::ByteLen(self.stream.byte_len().await))
                            .await,
                    );
                },
                AdapterRequest::Seek(pos) => {
                    pause_buf_moves = true;
                    drop(self.resp_tx.send_async(AdapterResponse::SeekClear).await);

                    let res = self.stream.seek(pos).await;
                    if let Ok(new_pos) = &res {
                        // The stream now continues from `new_pos`: keep the resume
                        // offset in sync, discard bytes staged before the seek so
                        // they cannot leak into the cleared ring buffer, and allow
                        // reading again if we had previously reached the end.
                        seen_bytes = *new_pos;
                        read_region = 0..0;
                        hit_end = false;
                    }
                    seek_res = Some(res);
                },
                AdapterRequest::SeekCleared => {
                    if let Some(res) = seek_res.take() {
                        drop(
                            self.resp_tx
                                .send_async(AdapterResponse::SeekResult(res))
                                .await,
                        );
                    }
                    // The sync half empties the ring buffer before sending this
                    // message, so there is room to write again.
                    blocked = false;
                    pause_buf_moves = false;
                },
            }
        }
    }
}
/// An adapter for converting an async media source into a synchronous one
/// usable by symphonia.
///
/// This adapter takes a source implementing `AsyncRead`, and allows the receive side to
/// pass along seek requests needed. This allows for passing bytes from exclusively `AsyncRead`
/// streams (e.g., hyper HTTP sessions) to Songbird.
pub struct AsyncAdapterStream {
/// Consumer side of the byte ring buffer filled by the async half.
bytes_out: HeapConsumer<u8>,
/// Whether the wrapped stream reported itself as seekable at construction time.
can_seek: bool,
// Note: these are Atomic just to work around the need for
// handle_messages to take &self rather than &mut.
/// Set once the async half reports a zero-length read; cleared again when a seek starts.
finalised: AtomicBool,
/// Set when the async half reports that bytes were written; cleared when a read
/// finds the ring buffer empty.
bytes_known_present: AtomicBool,
/// Requests sent to the async half.
req_tx: Sender<AdapterRequest>,
/// Replies and read notifications received from the async half.
resp_rx: Receiver<AdapterResponse>,
/// Notified on every read attempt so the async half can resume filling the buffer.
notify_tx: Arc<Notify>,
}
impl AsyncAdapterStream {
/// Wrap and pull from an async file stream, with an intermediate ring-buffer of size `buf_len`
/// between the async and sync halves.
#[must_use]
pub fn new(stream: Box<dyn AsyncMediaSource>, buf_len: usize) -> AsyncAdapterStream {
let (bytes_in, bytes_out) = SharedRb::new(buf_len).split();
let (resp_tx, resp_rx) = flume::unbounded();
let (req_tx, req_rx) = flume::unbounded();
let can_seek = stream.is_seekable();
let notify_rx = Arc::new(Notify::new());
let notify_tx = notify_rx.clone();
let sink = AsyncAdapterSink {
bytes_in,
req_rx,
resp_tx,
stream,
notify_rx,
};
let stream = AsyncAdapterStream {
bytes_out,
can_seek,
finalised: false.into(),
bytes_known_present: false.into(),
req_tx,
resp_rx,
notify_tx,
};
tokio::spawn(async move {
Box::pin(sink.launch()).await;
});
stream
}
fn handle_messages(&self, op: Operation) -> Option<AdapterResponse> {
loop {
let msg = if op.will_block() {
self.resp_rx.recv().ok()
} else {
self.resp_rx.try_recv().ok()
};
let msg = if let Some(msg) = msg { msg } else { break None };
// state changes
match &msg {
AdapterResponse::ReadZero => {
self.finalised.store(true, Ordering::Relaxed);
},
AdapterResponse::ReadOccurred => {
self.bytes_known_present.store(true, Ordering::Relaxed);
},
_ => {},
}
if op.expected_msg(&msg) {
break Some(msg);
}
}
}
fn is_dropped_and_clear(&self) -> bool {
self.resp_rx.is_empty() && self.resp_rx.is_disconnected()
}
fn check_dropped(&self) -> IoResult<()> {
if self.is_dropped_and_clear() {
Err(IoError::new(
IoErrorKind::UnexpectedEof,
"Async half was dropped.",
))
} else {
Ok(())
}
}
}
impl Read for AsyncAdapterStream {
    /// Reads bytes which the async half has placed in the ring buffer.
    ///
    /// Blocks until the async half reports new data or end-of-stream when neither
    /// is currently known. Returns `Ok(0)` once the buffer is empty and the async
    /// half has signalled end-of-stream.
    ///
    /// # Errors
    ///
    /// Returns `UnexpectedEof` if the buffer is empty and the async half has been
    /// dropped, or passes through any other error reported by the ring buffer.
    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
        loop {
            // Only wait on the response channel when we have no reason to believe
            // that data (or end-of-stream) is already available.
            let block = !(self.bytes_known_present.load(Ordering::Relaxed)
                || self.finalised.load(Ordering::Relaxed));
            drop(self.handle_messages(Operation::Read { block }));

            match self.bytes_out.read(buf) {
                Ok(n) => {
                    // Space was freed: let the async half continue filling.
                    self.notify_tx.notify_one();
                    return Ok(n);
                },
                Err(e) if e.kind() == IoErrorKind::WouldBlock => {
                    // receive side must ABSOLUTELY be unblocked here.
                    self.notify_tx.notify_one();
                    if self.finalised.load(Ordering::Relaxed) {
                        return Ok(0);
                    }

                    self.bytes_known_present.store(false, Ordering::Relaxed);
                    self.check_dropped()?;
                },
                // Any other error goes straight to the caller; library code
                // must not write diagnostics to stdout.
                other => return other,
            }
        }
    }
}
impl Seek for AsyncAdapterStream {
    /// Forwards a seek to the async half and returns its result.
    ///
    /// The exchange has two steps: first the async half confirms that it has
    /// stopped writing, then this side discards everything buffered and tells
    /// the async half to carry on, after which the seek result is received.
    ///
    /// # Errors
    ///
    /// Returns `Unsupported` if the source is not seekable, `UnexpectedEof` if
    /// the async half has been dropped, or the error produced by the seek itself.
    fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
        if !self.can_seek {
            let unsupported = IoError::new(
                IoErrorKind::Unsupported,
                "Async half does not support seek operations.",
            );
            return Err(unsupported);
        }

        self.check_dropped()?;

        let _ = self.req_tx.send(AdapterRequest::Seek(pos));

        // wait for async to tell us that it has stopped writing,
        // then clear buf and allow async to write again.
        self.finalised.store(false, Ordering::Relaxed);

        match self.handle_messages(Operation::Seek) {
            Some(AdapterResponse::SeekClear) => {},
            None => {
                // A blocking receive only comes back empty-handed once the
                // channel is closed, so this is expected to return an error.
                self.check_dropped()?;
                unreachable!()
            },
            _ => unreachable!(),
        }

        let buffered_cap = self.bytes_out.capacity();
        self.bytes_out.skip(buffered_cap);

        let _ = self.req_tx.send(AdapterRequest::SeekCleared);

        match self.handle_messages(Operation::Seek) {
            Some(AdapterResponse::SeekResult(outcome)) => outcome,
            None => {
                self.check_dropped()?;
                unreachable!()
            },
            _ => unreachable!(),
        }
    }
}
impl MediaSource for AsyncAdapterStream {
    /// Reports the seekability recorded when the adapter was created.
    fn is_seekable(&self) -> bool {
        self.can_seek
    }

    /// Asks the async half for the stream's length in bytes.
    ///
    /// Returns `None` if the length is unknown or the async half has been dropped.
    fn byte_len(&self) -> Option<u64> {
        if self.check_dropped().is_err() {
            return None;
        }

        let _ = self.req_tx.send(AdapterRequest::ByteLen);

        match self.handle_messages(Operation::Len) {
            Some(AdapterResponse::ByteLen(len)) => len,
            None => match self.check_dropped() {
                Err(_) => None,
                Ok(()) => unreachable!(),
            },
            _ => unreachable!(),
        }
    }
}
/// Messages handled by the async half of the adapter.
enum AdapterRequest {
/// Clears the async half's "ring buffer full" state. Produced by the async half
/// itself whenever its `Notify` fires, rather than being sent over the channel.
Wake,
/// Stop moving bytes into the ring buffer and seek the underlying stream.
Seek(SeekFrom),
/// The sync half has emptied the ring buffer; report the seek result and resume.
SeekCleared,
/// Report the byte length of the underlying stream.
ByteLen,
}
/// Messages sent from the async half of the adapter to the sync half.
enum AdapterResponse {
    /// Outcome of a requested seek.
    SeekResult(IoResult<u64>),
    /// The async half has stopped writing into the ring buffer ahead of a seek.
    SeekClear,
    /// Byte length of the underlying stream, if known.
    ByteLen(Option<u64>),
    /// The underlying stream produced a zero-length read.
    ReadZero,
    /// Bytes were written into the ring buffer.
    ReadOccurred,
}

/// The kind of call the sync half is currently servicing.
#[derive(Copy, Clone)]
enum Operation {
    /// A read; `block` says whether to wait for a response.
    Read { block: bool },
    /// A seek.
    Seek,
    /// A byte-length query.
    Len,
}

impl Operation {
    /// Whether the sync half should wait for a response: always, except for
    /// reads which carry `block: false`.
    fn will_block(self) -> bool {
        if let Self::Read { block } = self {
            block
        } else {
            true
        }
    }

    /// Whether `msg` is a response which this operation is waiting for.
    fn expected_msg(self, msg: &AdapterResponse) -> bool {
        matches!(
            (self, msg),
            (
                Self::Read { .. },
                AdapterResponse::ReadOccurred | AdapterResponse::ReadZero
            ) | (
                Self::Seek,
                AdapterResponse::SeekResult(_) | AdapterResponse::SeekClear
            ) | (Self::Len, AdapterResponse::ByteLen(_))
        )
    }
}
/// An async port of symphonia's [`MediaSource`].
///
/// Streams which are not seekable should implement `AsyncSeek` such that all operations
/// fail with `Unsupported`, and implement `fn is_seekable(&self) -> bool { false }`.
///
/// [`MediaSource`]: MediaSource
#[async_trait]
pub trait AsyncMediaSource: AsyncRead + AsyncSeek + Send + Sync + Unpin {
/// Returns if the source is seekable. This may be an expensive operation.
fn is_seekable(&self) -> bool;
/// Returns the length in bytes, if available. This may be an expensive operation.
async fn byte_len(&self) -> Option<u64>;
/// Tries to recreate this stream in event of an error, resuming from the given offset.
///
/// The default implementation does not support resuming and always returns
/// [`AudioStreamError::Unsupported`].
async fn try_resume(
&mut self,
_offset: u64,
) -> Result<Box<dyn AsyncMediaSource>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
}
}