diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
index 78855d5..30647a3 100644
--- a/ARCHITECTURE.md
+++ b/ARCHITECTURE.md
@@ -35,7 +35,7 @@ Audio processing remains synchronous for the following reasons:
 
 Songbird subdivides voice connection handling into several long- and short-lived tasks.
 * **Core**: Handles and directs commands received from the driver. Responsible for connection/reconnection, and creates network tasks.
-* **Mixer**: Combines audio sources together, Opus encodes the result, and encrypts the built packets every 20ms. Responsible for handling track commands/state, and transmitting completed voice packets and keepalive messages. ***Synchronous***.
+* **Mixer**: Combines audio sources together, Opus encodes the result, and encrypts the built packets every 20ms. Responsible for handling track commands/state, and transmitting completed voice packets and keepalive messages. ***Synchronous when live***.
 * **Thread Pool**: A dynamically sized thread-pool for I/O tasks. Creates lazy tracks using `Compose` if sync creation is needed, otherwise spawns a tokio task. Seek operations always go to the thread pool. ***Synchronous***.
 * **Disposer**: Used by mixer thread to dispose of data with potentially long/blocking `Drop` implementations (i.e., audio sources). ***Synchronous***.
 * **Events**: Stores and runs event handlers, tracks event timing, and handles
@@ -46,6 +46,33 @@ Songbird subdivides voice connection handling into several long- and short-lived
 
 ![](images/driver.png)
 
+## Scheduler
+To save threads and memory (e.g., packet buffer allocations), Songbird parks Mixer tasks which do not have any live Tracks.
+These are all co-located on a single async task.
+This task is responsible for sending UDP keepalive messages for each parked mixer, maintaining event state, and handling any control messages a parked mixer receives.
+Whenever a message arrives which adds a `Track`, the affected mixer task is moved to a live thread.
+The Idle task inspects task counts and execution time on each live thread, choosing the first thread with room and creating a new one if none has spare capacity.
+
+Each live thread is responsible for running as many live mixers as it can in a single tick every 20ms: this currently defaults to 16 mixers per thread, but is user-configurable.
+A live thread also stores the RTP packet blocks which each sub-task writes into.
+Audio threads have a budget of 20ms to complete all message handling, mixing, encoding, and encryption.
+*These threads are synchronous, as explained above: the bulk of the cost (e.g., encoding) is compute-bound work and would block the Tokio executor.*
+Mixer logic is handled in this order to minimise deadline variance:
+```
+handle idle->live messages
+handle all driver->mixer messages
+cleanup idle/dead mixers
+mix + encode + encrypt all mixers into packet buffer
+check for excess packet blocks
+sleep 'til next 20ms boundary
+
+send all packets, adjust RTP fields
+handle per-track messages
+```
+Each live thread aims to stay under a conservative limit of 18ms per tick: if all work takes longer than this, it offloads the task with the highest mixing cost, at most once per 20ms tick.
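+
+As a rough sketch of how a user might pick these limits (using the `Scheduler`, `SchedulerConfig`, and `SchedulerMode` re-exports under `songbird::driver` with the `driver` feature enabled; the values shown are purely illustrative):
+```
+use songbird::{
+    driver::{Scheduler, SchedulerConfig, SchedulerMode},
+    Config,
+};
+
+// Allow at most 8 live mixers per worker thread, and keep expensive
+// mixers where they are rather than moving them between threads.
+let mut sc_config = SchedulerConfig::default();
+sc_config.strategy = SchedulerMode::MaxPerThread(8.try_into().unwrap());
+sc_config.move_expensive_tasks = false;
+
+// Every Driver built from this Config shares the same worker pool.
+let config = Config::default().scheduler(Scheduler::new(sc_config));
+```
+If no scheduler is set on a `Config`, the global `DEFAULT_SCHEDULER` is used.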
+ +![](images/scheduler.png) + ``` src/driver/* ``` diff --git a/Cargo.toml b/Cargo.toml index 9e7c675..eb8c101 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ derivative = "2" discortp = { default-features = false, features = ["discord", "pnet", "rtp"], optional = true, version = "0.5" } flume = { optional = true, version = "0.10" } futures = "0.3" +nohash-hasher = { optional = true, version = "0.2.0" } once_cell = { optional = true, version = "1" } parking_lot = { optional = true, version = "0.12" } pin-project = "1" @@ -62,11 +63,11 @@ version = "0.11" optional = true [dev-dependencies] +byteorder = "1" criterion = "0.4" ntest = "0.9" symphonia = { version = "0.5.2", features = ["aac", "isomp4", "mp3"] } -utils = { path = "utils" } -tokio = { version = "1", features = ["rt", "rt-multi-thread"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "test-util"] } [features] # Core features @@ -93,6 +94,7 @@ driver = [ "dep:discortp", "dep:reqwest", "dep:flume", + "dep:nohash-hasher", "dep:once_cell", "dep:parking_lot", "dep:rand", @@ -141,7 +143,10 @@ receive = ["dep:bytes", "discortp?/demux", "discortp?/rtcp"] # Used for docgen/testing/benchmarking. full-doc = ["default", "twilight", "builtin-queue", "receive"] -internals = [] +internals = ["dep:byteorder"] + +[lib] +bench = false [[bench]] name = "base-mixing" diff --git a/benches/base-mixing.rs b/benches/base-mixing.rs index d9e12c3..0eaf7cd 100644 --- a/benches/base-mixing.rs +++ b/benches/base-mixing.rs @@ -6,6 +6,7 @@ use songbird::{ MixMode, }, input::{codecs::*, Input, LiveInput, Parsed}, + test_utils as utils, }; use std::io::Cursor; use symphonia_core::audio::{AudioBuffer, Layout, SampleBuffer, Signal, SignalSpec}; diff --git a/benches/mixing-task.rs b/benches/mixing-task.rs index 2da13d6..7803154 100644 --- a/benches/mixing-task.rs +++ b/benches/mixing-task.rs @@ -16,164 +16,23 @@ use songbird::{ bench_internals::{ self, mixer::{state::InputState, Mixer}, + scheduler::*, task_message::*, CryptoState, }, Bitrate, + DummyMixer, + Listeners, + MockScheduler, }, input::{cached::Compressed, codecs::*, Input, RawAdapter}, tracks, Config, }; -use std::{io::Cursor, net::UdpSocket}; +use std::{io::Cursor, net::UdpSocket, sync::Arc}; use tokio::runtime::{Handle, Runtime}; use xsalsa20poly1305::{KeyInit, XSalsa20Poly1305 as Cipher, KEY_SIZE}; -// create a dummied task + interconnect. -// measure perf at varying numbers of sources (binary 1--64) without passthrough support. 
- -fn dummied_mixer( - handle: Handle, - softclip: bool, -) -> ( - Mixer, - ( - Receiver, - Receiver, - Receiver, - ), -) { - let (mix_tx, mix_rx) = flume::unbounded(); - let (core_tx, core_rx) = flume::unbounded(); - let (event_tx, event_rx) = flume::unbounded(); - - let (udp_receiver_tx, udp_receiver_rx) = flume::unbounded(); - - let ic = Interconnect { - core: core_tx, - events: event_tx, - mixer: mix_tx, - }; - - let config = Config::default().use_softclip(softclip); - - let mut out = Mixer::new(mix_rx, handle, ic, config); - - let udp_tx = UdpSocket::bind("0.0.0.0:0").expect("Failed to create send port."); - udp_tx - .connect("127.0.0.1:5316") - .expect("Failed to connect to local dest port."); - - let fake_conn = MixerConnection { - cipher: Cipher::new_from_slice(&vec![0u8; KEY_SIZE]).unwrap(), - crypto_state: CryptoState::Normal, - udp_rx: udp_receiver_tx, - udp_tx, - }; - - out.conn_active = Some(fake_conn); - - out.skip_sleep = true; - - (out, (core_rx, event_rx, udp_receiver_rx)) -} - -fn mixer_float( - num_tracks: usize, - handle: Handle, - softclip: bool, -) -> ( - Mixer, - ( - Receiver, - Receiver, - Receiver, - ), -) { - let mut out = dummied_mixer(handle, softclip); - - let floats = utils::make_sine(10 * STEREO_FRAME_SIZE, true); - - for i in 0..num_tracks { - let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into(); - let promoted = match input { - Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE), - _ => panic!("Failed to create a guaranteed source."), - }; - let (handle, mut ctx) = - bench_internals::track_context(Input::Live(promoted.unwrap(), None).into()); - out.0.add_track(ctx); - } - - out -} - -fn mixer_float_drop( - num_tracks: usize, - handle: Handle, -) -> ( - Mixer, - ( - Receiver, - Receiver, - Receiver, - ), -) { - let mut out = dummied_mixer(handle, true); - - for i in 0..num_tracks { - let floats = utils::make_sine((i / 5) * STEREO_FRAME_SIZE, true); - let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into(); - let promoted = match input { - Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE), - _ => panic!("Failed to create a guaranteed source."), - }; - let (handle, mut ctx) = - bench_internals::track_context(Input::Live(promoted.unwrap(), None).into()); - out.0.add_track(ctx); - } - - out -} - -fn mixer_opus( - handle: Handle, -) -> ( - Mixer, - ( - Receiver, - Receiver, - Receiver, - ), -) { - // should add a single opus-based track. - // make this fully loaded to prevent any perf cost there. 
- let mut out = dummied_mixer(handle.clone(), false); - - let floats = utils::make_sine(6 * STEREO_FRAME_SIZE, true); - - let input: Input = RawAdapter::new(Cursor::new(floats), 48_000, 2).into(); - - let mut src = handle.block_on(async move { - Compressed::new(input, Bitrate::BitsPerSecond(128_000)) - .await - .expect("These parameters are well-defined.") - }); - - src.raw.load_all(); - - let promoted = match src.into() { - Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE), - _ => panic!("Failed to create a guaranteed source."), - }; - let (handle, mut ctx) = - bench_internals::track_context(Input::Live(promoted.unwrap(), None).into()); - - out.0.add_track(ctx); - - out -} - fn no_passthrough(c: &mut Criterion) { let rt = Runtime::new().unwrap(); @@ -187,9 +46,14 @@ fn no_passthrough(c: &mut Criterion) { &track_count, |b, i| { b.iter_batched_ref( - || black_box(mixer_float(*i, rt.handle().clone(), true)), + || { + black_box(MockScheduler::from_mixers( + None, + vec![Mixer::test_with_float(*i, rt.handle().clone(), true)], + )) + }, |input| { - black_box(input.0.cycle()); + black_box(input.0.core.run_once()); }, BatchSize::SmallInput, ) @@ -200,9 +64,14 @@ fn no_passthrough(c: &mut Criterion) { &track_count, |b, i| { b.iter_batched_ref( - || black_box(mixer_float(*i, rt.handle().clone(), false)), + || { + black_box(MockScheduler::from_mixers( + None, + vec![Mixer::test_with_float(*i, rt.handle().clone(), false)], + )) + }, |input| { - black_box(input.0.cycle()); + black_box(input.0.core.run_once()); }, BatchSize::SmallInput, ) @@ -213,10 +82,71 @@ fn no_passthrough(c: &mut Criterion) { &track_count, |b, i| { b.iter_batched_ref( - || black_box(mixer_float(*i, rt.handle().clone(), true)), + || { + black_box(MockScheduler::from_mixers( + None, + vec![Mixer::test_with_float(*i, rt.handle().clone(), true)], + )) + }, |input| { for i in 0..5 { - black_box(input.0.cycle()); + black_box(input.0.core.run_once()); + } + }, + BatchSize::SmallInput, + ) + }, + ); + } + + group.finish(); +} + +fn no_passthrough_multimix(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + const N_MIXERS: usize = 16; + let mut group = c.benchmark_group(format!("Float Input (No Passthrough, {N_MIXERS} mixers)")); + + for shift in 0..=2 { + let track_count = 1 << shift; + + group.bench_with_input( + BenchmarkId::new("Single Packet (No Soft-Clip)", track_count), + &track_count, + |b, i| { + b.iter_batched_ref( + || { + black_box(MockScheduler::from_mixers( + None, + (0..N_MIXERS) + .map(|_| Mixer::test_with_float(*i, rt.handle().clone(), false)) + .collect(), + )) + }, + |input| { + black_box(input.0.core.run_once()); + }, + BatchSize::SmallInput, + ) + }, + ); + group.bench_with_input( + BenchmarkId::new("n=5 Packets", track_count), + &track_count, + |b, i| { + b.iter_batched_ref( + || { + black_box(MockScheduler::from_mixers( + None, + (0..N_MIXERS) + .map(|_| Mixer::test_with_float(*i, rt.handle().clone(), false)) + .collect(), + )) + }, + |input| { + for i in 0..5 { + black_box(input.0.core.run_once()); } }, BatchSize::SmallInput, @@ -235,19 +165,29 @@ fn passthrough(c: &mut Criterion) { group.bench_function("Single Packet", |b| { b.iter_batched_ref( - || black_box(mixer_opus(rt.handle().clone())), + || { + black_box(MockScheduler::from_mixers( + None, + vec![Mixer::test_with_opus(rt.handle().clone())], + )) + }, |input| { - black_box(input.0.cycle()); + black_box(input.0.core.run_once()); }, BatchSize::SmallInput, ) }); group.bench_function("n=5 Packets", |b| { b.iter_batched_ref( - || 
black_box(mixer_opus(rt.handle().clone())), + || { + black_box(MockScheduler::from_mixers( + None, + vec![Mixer::test_with_opus(rt.handle().clone())], + )) + }, |input| { for i in 0..5 { - black_box(input.0.cycle()); + black_box(input.0.core.run_once()); } }, BatchSize::SmallInput, @@ -257,15 +197,76 @@ fn passthrough(c: &mut Criterion) { group.finish(); } +fn passthrough_multimix(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + const N_MIXERS: usize = 16; + let mut group = c.benchmark_group(format!("Opus Input (Passthrough, {N_MIXERS} mixers)")); + + for shift in 0..=2 { + let track_count = 1 << shift; + + group.bench_with_input( + BenchmarkId::new("Single Packet (No Soft-Clip)", track_count), + &track_count, + |b, i| { + b.iter_batched_ref( + || { + black_box(MockScheduler::from_mixers( + None, + (0..N_MIXERS) + .map(|_| Mixer::test_with_opus(rt.handle().clone())) + .collect(), + )) + }, + |input| { + black_box(input.0.core.run_once()); + }, + BatchSize::SmallInput, + ) + }, + ); + group.bench_with_input( + BenchmarkId::new("n=5 Packets", track_count), + &track_count, + |b, i| { + b.iter_batched_ref( + || { + black_box(MockScheduler::from_mixers( + None, + (0..N_MIXERS) + .map(|_| Mixer::test_with_opus(rt.handle().clone())) + .collect(), + )) + }, + |input| { + for i in 0..5 { + black_box(input.0.core.run_once()); + } + }, + BatchSize::SmallInput, + ) + }, + ); + } + + group.finish(); +} + fn culling(c: &mut Criterion) { let rt = Runtime::new().unwrap(); c.bench_function("Worst-case Track Culling (15 tracks, 5 pkts)", |b| { b.iter_batched_ref( - || black_box(mixer_float_drop(15, rt.handle().clone())), + || { + black_box(MockScheduler::from_mixers( + None, + vec![Mixer::test_with_float_drop(15, rt.handle().clone())], + )) + }, |input| { for i in 0..5 { - black_box(input.0.cycle()); + black_box(input.0.core.run_once()); } }, BatchSize::SmallInput, @@ -273,5 +274,69 @@ fn culling(c: &mut Criterion) { }); } -criterion_group!(benches, no_passthrough, passthrough, culling); -criterion_main!(benches); +fn task_culling(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + const N_MIXERS: usize = 8; + + c.bench_function("Live Mixer Thread Culling", |b| { + b.iter_batched_ref( + || { + black_box(MockScheduler::from_mixers( + None, + (0..N_MIXERS) + .map(|_| Mixer::test_with_opus(rt.handle().clone())) + .collect(), + )) + }, + |input| { + black_box(input.0.core.remove_task(0)); + }, + BatchSize::SmallInput, + ) + }); + + c.bench_function("Live Mixer Thread Culling (Practical)", |b| { + b.iter_batched_ref( + || { + black_box(MockScheduler::from_mixers( + None, + (0..N_MIXERS) + .map(|_| Mixer::test_with_opus(rt.handle().clone())) + .collect(), + )) + }, + |input| { + black_box({ + input.0.core.mark_for_cull(0); + input.0.core.mark_for_cull(1); + input.0.core.mark_for_cull(4); + input.0.core.demote_and_remove_mixers(); + }); + }, + BatchSize::SmallInput, + ) + }); + + c.bench_function("Live Mixer Thread Culling (Practical, NoDel)", |b| { + b.iter_batched_ref( + || { + black_box(MockScheduler::from_mixers( + None, + (0..N_MIXERS) + .map(|_| Mixer::test_with_opus(rt.handle().clone())) + .collect(), + )) + }, + |input| { + black_box(input.0.core.demote_and_remove_mixers()); + }, + BatchSize::SmallInput, + ) + }); +} + +criterion_group!(individual, no_passthrough, passthrough); +criterion_group!(multimix, no_passthrough_multimix, passthrough_multimix); +criterion_group!(deletions, culling, task_culling); +criterion_main!(individual, multimix, deletions); diff --git 
a/examples/serenity/voice/src/main.rs b/examples/serenity/voice/src/main.rs index e66ed21..8e4377b 100644 --- a/examples/serenity/voice/src/main.rs +++ b/examples/serenity/voice/src/main.rs @@ -31,8 +31,7 @@ use serenity::{ framework::{ standard::{ macros::{command, group}, - Args, - CommandResult, + Args, CommandResult, }, StandardFramework, }, diff --git a/images/arch.afdesign b/images/arch.afdesign index d7ab5a0..3d32bf2 100644 Binary files a/images/arch.afdesign and b/images/arch.afdesign differ diff --git a/images/driver.png b/images/driver.png index b06c2c2..8f285e3 100644 Binary files a/images/driver.png and b/images/driver.png differ diff --git a/images/driver.svg b/images/driver.svg index c3b6faa..05298d9 100644 --- a/images/driver.svg +++ b/images/driver.svg @@ -1,6 +1,6 @@ - + @@ -32,15 +32,6 @@ Driver - - - - - Async - - - Sync - @@ -49,37 +40,25 @@ Core - + - - Thread Pool + + Scheduler - - - - - - - Seek - + + + - - - - - - Compose - + + - - - - - - ... - + + + + + Mixer @@ -91,22 +70,6 @@ Events - - - - - - Mixer - - - - - - - - Disposer - - @@ -130,14 +93,8 @@ - - - - - - - - + + @@ -145,8 +102,8 @@ - - + + diff --git a/images/gateway.png b/images/gateway.png index 79f5a4f..50f6b14 100644 Binary files a/images/gateway.png and b/images/gateway.png differ diff --git a/images/scheduler.png b/images/scheduler.png new file mode 100644 index 0000000..14b6566 Binary files /dev/null and b/images/scheduler.png differ diff --git a/images/scheduler.svg b/images/scheduler.svg new file mode 100644 index 0000000..77848fd --- /dev/null +++ b/images/scheduler.svg @@ -0,0 +1,329 @@ + + + + + + + + + Async + + + Sync + + + + + + + Idle Mixers + + + + + + + + + ... + + + + + + + + Mixer + + + + + + + + Mixer + + + + + + + + + + + + ... + + + + + + + + Mixer + + + + + + + + Mixer + + + + + + + + + + + ... + + + + + + + + Mixer + + + + + + + + Mixer + + + + + + + + + + + + + + Thread Pool + + + + + + + ... + + + + + + + + + Seek + + + + + + + + Compose + + + + + + + Live Mixers Thread # + 1 + + + + + + + + + ... + + + + + + + + Mixer + + + + + + + + Mixer + + + + + + + + + + + + + + Thread Pool + + + + + + + ... + + + + + + + + + Seek + + + + + + + + Compose + + + + + + + Live Mixers Thread # + n + + + + + + + + + ... + + + + + + + + Mixer + + + + + + + + Mixer + + + + + + + + + + Disposer + + + + New + Mixer/Driver + + + + Create + + + + + + + + Schedule Mixer + + + + + + + + + + + + + + Deschedule + + + + + + + Move Mixer + + + ... + + + + + + + + + + Destroy + + + + + + + + + + + + Interaction + + + + + + + + Messages + + + + Track cleanup + + + diff --git a/src/config.rs b/src/config.rs index 6a8ec37..ba3ecbe 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,12 +2,21 @@ use crate::driver::DecodeMode; #[cfg(feature = "driver")] use crate::{ - driver::{retry::Retry, tasks::disposal::DisposalThread, CryptoMode, MixMode}, + driver::{ + retry::Retry, + tasks::disposal::DisposalThread, + CryptoMode, + MixMode, + Scheduler, + DEFAULT_SCHEDULER, + }, input::codecs::*, }; #[cfg(test)] use crate::driver::test_config::*; +#[cfg(all(test, feature = "driver"))] +use crate::driver::SchedulerConfig; #[cfg(feature = "driver")] use symphonia::core::{codecs::CodecRegistry, probe::Probe}; @@ -166,6 +175,7 @@ pub struct Config { /// /// [`PROBE`]: static@PROBE pub format_registry: &'static Probe, + #[cfg(feature = "driver")] /// The Sender for a channel that will run the destructor of possibly blocking values. 
/// @@ -177,6 +187,15 @@ pub struct Config { /// [`Songbird`]: crate::Songbird pub disposer: Option, + #[cfg(feature = "driver")] + /// The scheduler is responsible for mapping idle and active [`Driver`] instances + /// to threads. + /// + /// If set to None, then songbird will initialise the [`DEFAULT_SCHEDULER`]. + /// + /// [`Driver`]: crate::Driver + pub scheduler: Option, + // Test only attributes #[cfg(feature = "driver")] #[cfg(test)] @@ -220,6 +239,8 @@ impl Default for Config { #[cfg(feature = "driver")] disposer: None, #[cfg(feature = "driver")] + scheduler: None, + #[cfg(feature = "driver")] #[cfg(test)] tick_style: TickStyle::Timed, #[cfg(feature = "driver")] @@ -326,6 +347,22 @@ impl Config { self } + /// Sets this `Config`'s mixer scheduler. + #[must_use] + pub fn scheduler(mut self, scheduler: Scheduler) -> Self { + self.scheduler = Some(scheduler); + self + } + + /// Returns a lightweight reference to the audio scheduler this `Config` will use. + #[must_use] + pub fn get_scheduler(&self) -> Scheduler { + self.scheduler + .as_ref() + .unwrap_or(&*DEFAULT_SCHEDULER) + .clone() + } + /// Ensures a global disposer has been set, initializing one if not. #[must_use] pub(crate) fn initialise_disposer(self) -> Self { @@ -381,8 +418,13 @@ impl Config { (OutputMode::Rtp(rtp_tx), OutputReceiver::Rtp(rtp_rx)) }; + let mut sc_config = SchedulerConfig::default(); + sc_config.strategy = crate::driver::SchedulerMode::MaxPerThread(1.try_into().unwrap()); + let config = Config::default() .tick_style(TickStyle::UntimedWithExecLimit(tick_rx)) + // give each test its own thread in the scheduler for simplicity. + .scheduler(Scheduler::new(sc_config)) .override_connection(Some(conn)); let handle = DriverTestHandle { rx, tx: tick_tx }; diff --git a/src/constants.rs b/src/constants.rs index 332dc73..f31ff8a 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -122,6 +122,9 @@ pub mod test_data { /// Path to a Wav source which can be read via a File. pub const FILE_WAV_TARGET: &str = "resources/loop.wav"; + /// Path to a shorter MP3 source which can be read via a File. + pub const FILE_SHORT_MP3_TARGET: &str = "resources/ting.mp3"; + /// Path to an MP4 (H264 + AAC) source which can be read via a File. 
pub const FILE_VID_TARGET: &str = "resources/ting-vid.mp4"; } diff --git a/src/driver/bench_internals.rs b/src/driver/bench_internals.rs index a12fa97..3dfb035 100644 --- a/src/driver/bench_internals.rs +++ b/src/driver/bench_internals.rs @@ -17,3 +17,7 @@ use crate::{ pub fn track_context(t: Track) -> (TrackHandle, TrackContext) { t.into_context() } + +pub mod scheduler { + pub use crate::driver::scheduler::*; +} diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 4045978..3e5b58b 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -17,9 +17,12 @@ mod crypto; mod decode_mode; mod mix_mode; pub mod retry; +mod scheduler; pub(crate) mod tasks; #[cfg(test)] pub(crate) mod test_config; +#[cfg(any(test, feature = "internals"))] +mod test_impls; use connection::error::{Error, Result}; pub use crypto::CryptoMode; @@ -27,8 +30,18 @@ pub(crate) use crypto::CryptoState; #[cfg(feature = "receive")] pub use decode_mode::DecodeMode; pub use mix_mode::MixMode; +pub use scheduler::{ + Config as SchedulerConfig, + Error as SchedulerError, + LiveStatBlock, + Mode as SchedulerMode, + Scheduler, + DEFAULT_SCHEDULER, +}; #[cfg(test)] pub use test_config::*; +#[cfg(any(test, feature = "internals"))] +pub use test_impls::*; #[cfg(feature = "builtin-queue")] use crate::tracks::TrackQueue; diff --git a/src/driver/scheduler/config.rs b/src/driver/scheduler/config.rs new file mode 100644 index 0000000..87a3b0f --- /dev/null +++ b/src/driver/scheduler/config.rs @@ -0,0 +1,66 @@ +use std::num::NonZeroUsize; + +use super::*; + +/// Configuration for how a [`Scheduler`] handles tasks. +/// +/// [`Scheduler`]: super::Scheduler +#[derive(Clone, Debug)] +#[non_exhaustive] +pub struct Config { + /// How Live mixer tasks will be mapped to individual threads. + /// + /// Defaults to `Mode::MaxPerThread(16)`. + pub strategy: Mode, + /// Move costly mixers to another thread if their parent worker is at + /// risk of missing its deadlines. + /// + /// Defaults to `true`. + pub move_expensive_tasks: bool, +} + +impl Default for Config { + fn default() -> Self { + Self { + strategy: Mode::default(), + move_expensive_tasks: true, + } + } +} + +/// Strategies for mapping live mixer tasks to individual threads. +/// +/// Defaults to `MaxPerThread(16)`. +#[derive(Clone, Debug)] +#[non_exhaustive] +pub enum Mode { + /// Allows at most `n` tasks to run per thread. + MaxPerThread(NonZeroUsize), +} + +impl Mode { + /// Returns the number of `Mixer`s that a scheduler should preallocate + /// resources for. + pub(crate) fn prealloc_size(&self) -> usize { + match self { + Self::MaxPerThread(n) => n.get(), + } + } + + /// Returns the maximum number of concurrent mixers that a scheduler is + /// allowed to place on a single thread. + /// + /// Future scheduling modes may choose to limit *only* on execution cost. 
+ #[allow(clippy::unnecessary_wraps)] + pub(crate) fn task_limit(&self) -> Option { + match self { + Self::MaxPerThread(n) => Some(n.get()), + } + } +} + +impl Default for Mode { + fn default() -> Self { + Self::MaxPerThread(DEFAULT_MIXERS_PER_THREAD) + } +} diff --git a/src/driver/scheduler/idle.rs b/src/driver/scheduler/idle.rs new file mode 100644 index 0000000..ad4042b --- /dev/null +++ b/src/driver/scheduler/idle.rs @@ -0,0 +1,319 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use flume::{Receiver, Sender}; +use nohash_hasher::{BuildNoHashHasher, IntMap}; +use tokio::time::{Instant as TokInstant, Interval}; + +use crate::constants::*; + +use super::*; + +const THREAD_CULL_TIMER: Duration = Duration::from_secs(60); + +/// An async task responsible for maintaining UDP keepalives and event state for inactive +/// `Mixer` tasks. +pub(crate) struct Idle { + config: Config, + cull_timer: Duration, + tasks: IntMap, + // track taskids which are live to prevent their realloc? unlikely w u64 but still + pub(crate) stats: Arc, + rx: Receiver, + tx: Sender, + next_id: TaskId, + next_worker_id: WorkerId, + workers: Vec, + to_cull: Vec, +} + +impl Idle { + pub fn new(config: Config) -> (Self, Sender) { + let (tx, rx) = flume::unbounded(); + + let stats = Arc::default(); + let tasks = HashMap::with_capacity_and_hasher(128, BuildNoHashHasher::default()); + + // TODO: include heap of keepalive sending times? + let out = Self { + config, + cull_timer: THREAD_CULL_TIMER, + tasks, + stats, + rx, + tx: tx.clone(), + next_id: TaskId::new(), + next_worker_id: WorkerId::new(), + workers: Vec::with_capacity(16), + to_cull: vec![], + }; + + (out, tx) + } + + /// Run the inner task until all external `Scheduler` handles are dropped. + async fn run(&mut self) { + let mut interval = tokio::time::interval(TIMESTEP_LENGTH); + while self.run_once(&mut interval).await {} + } + + /// Run one 'tick' of idle thread maintenance. + /// + /// This is a priority system over 2 main tasks: + /// 1) handle scheduling/upgrade/action requests for mixers + /// 2) [every 20ms]tick the main timer for each task, send keepalive if + /// needed, reclaim & cull workers. + /// + /// Idle mixers spawn an async task each to forward their `MixerMessage`s + /// on to this task to be handled by 1). These tasks self-terminate if a + /// message would make a mixer `now_live`. + async fn run_once(&mut self, interval: &mut Interval) -> bool { + tokio::select! 
{ + biased; + msg = self.rx.recv_async() => match msg { + Ok(SchedulerMessage::NewMixer(rx, ic, cfg)) => { + let mixer = ParkedMixer::new(rx, ic, cfg); + let id = self.next_id.incr(); + + mixer.spawn_forwarder(self.tx.clone(), id); + self.tasks.insert(id, mixer); + self.stats.add_idle_mixer(); + }, + Ok(SchedulerMessage::Demote(id, task)) => { + task.send_gateway_not_speaking(); + + task.spawn_forwarder(self.tx.clone(), id); + self.tasks.insert(id, task); + }, + Ok(SchedulerMessage::Do(id, mix_msg)) => { + let now_live = mix_msg.is_mixer_now_live(); + let task = self.tasks.get_mut(&id).unwrap(); + + match task.handle_message(mix_msg) { + Ok(false) if now_live => { + let task = self.tasks.remove(&id).unwrap(); + self.schedule_mixer(task, id, None); + }, + Ok(false) => {}, + Ok(true) | Err(_) => self.to_cull.push(id), + } + }, + Ok(SchedulerMessage::Overspill(worker_id, id, task)) => { + self.schedule_mixer(task, id, Some(worker_id)); + }, + Ok(SchedulerMessage::GetStats(tx)) => { + _ = tx.send(self.workers.iter().map(Worker::stats).collect()); + }, + Ok(SchedulerMessage::Kill) | Err(_) => { + return false; + }, + }, + _ = interval.tick() => { + // TODO: store keepalive sends in another data structure so + // we don't check every task every 20ms. + // + // if we can also make tick handling lazy(er), we can also optimise for that. + let now = TokInstant::now(); + + for (id, task) in &mut self.tasks { + // NOTE: this is a non-blocking send so safe from async context. + if task.tick_and_keepalive(now.into()).is_err() { + self.to_cull.push(*id); + } + } + + let mut i = 0; + while i < self.workers.len() { + if let Some(then) = self.workers[i].try_mark_empty(now) { + if now.duration_since(then) >= self.cull_timer { + self.workers.swap_remove(i); + continue; + } + } + + i += 1; + } + }, + } + + for id in self.to_cull.drain(..) { + self.tasks.remove(&id); + } + + true + } + + /// Promote a task to a live mixer thread. + fn schedule_mixer(&mut self, mut task: ParkedMixer, id: TaskId, avoid: Option) { + if task.send_gateway_speaking().is_ok() { + // If a worker ever completely fails, then we need to remove it here + // `fetch_worker` will either find another, or generate us a new one if + // none exist. + + // We need to track ownership of the task coming back via SendError using this + // Option. + let mut loop_task = Some(task); + loop { + let task = loop_task.take().unwrap(); + let (worker, idx) = self.fetch_worker(&task, avoid); + match worker.schedule_mixer(id, task) { + Ok(_) => { + self.stats.move_mixer_to_live(); + break; + }, + Err(e) => { + loop_task = Some(e.0 .1); + let worker = self.workers.swap_remove(idx); + + // NOTE: we have incremented worker's live counter for this mixer in + // `schedule_mixer`. + // The only time this branch is ever hit is if a worker crashed, so we + // need to replicate some of their cleanup. + self.stats + .remove_live_mixers(worker.stats().live_mixers().saturating_sub(1)); + self.stats.remove_worker(); + }, + } + } + } + } + + /// Fetch the first `Worker` that has room for a new task, creating one if needed. + /// + /// If an inbound task has spilled from another thread, then do not reschedule it there. 
+ fn fetch_worker( + &mut self, + task: &ParkedMixer, + avoid: Option, + ) -> (&mut Worker, usize) { + let idx = self + .workers + .iter() + .position(|w| w.can_schedule(task, avoid)) + .unwrap_or_else(|| { + self.workers.push(Worker::new( + self.next_worker_id.incr(), + self.config.clone(), + self.tx.clone(), + self.stats.clone(), + )); + self.stats.add_worker(); + self.workers.len() - 1 + }); + + (&mut self.workers[idx], idx) + } + + pub fn spawn(mut self) { + tokio::spawn(async move { self.run().await }); + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + constants::test_data::FILE_WEBM_TARGET, + driver::{tasks::mixer::Mixer, OutputMode}, + input::File, + Driver, + }; + use tokio::runtime::Handle; + + #[tokio::test] + async fn inactive_mixers_dont_need_threads() { + let sched = Scheduler::new(Config::default()); + let cfg = DriverConfig::default().scheduler(sched.clone()); + + let _drivers: Vec = (0..1024).map(|_| Driver::new(cfg.clone())).collect(); + tokio::time::sleep(Duration::from_secs(1)).await; + + assert_eq!(sched.total_tasks(), 1024); + assert_eq!(sched.live_tasks(), 0); + assert_eq!(sched.worker_threads(), 0); + } + + #[tokio::test] + async fn active_mixers_spawn_threads() { + let mut config = Config::default(); + config.move_expensive_tasks = false; + + let sched = Scheduler::new(config); + let (pkt_tx, _pkt_rx) = flume::unbounded(); + let cfg = DriverConfig::default() + .scheduler(sched.clone()) + .override_connection(Some(OutputMode::Rtp(pkt_tx))); + + let n_tasks = 1024; + + let _drivers: Vec = (0..n_tasks) + .map(|_| { + let mut driver = Driver::new(cfg.clone()); + let file = File::new(FILE_WEBM_TARGET); + driver.play_input(file.into()); + driver + }) + .collect(); + tokio::time::sleep(Duration::from_secs(10)).await; + + assert_eq!(sched.total_tasks(), n_tasks); + assert_eq!(sched.live_tasks(), n_tasks); + assert_eq!( + sched.worker_threads(), + n_tasks / (DEFAULT_MIXERS_PER_THREAD.get() as u64) + ); + } + + #[tokio::test] + async fn excess_threads_are_cleaned_up() { + let mut config = Config::default(); + config.strategy = Mode::MaxPerThread(1.try_into().unwrap()); + let (mut core, tx) = Idle::new(config.clone()); + + const TEST_TIMER: Duration = Duration::from_millis(500); + core.cull_timer = TEST_TIMER; + + let mut next_id = TaskId::new(); + let mut thread_id = WorkerId::new(); + let mut handles = vec![]; + for i in 0..2 { + let mut worker = Worker::new( + thread_id.incr(), + config.clone(), + tx.clone(), + core.stats.clone(), + ); + let ((mixer, listeners), track_handle) = + Mixer::test_with_float_unending(Handle::current(), false); + + let send_mixer = ParkedMixer { + mixer: Box::new(mixer), + ssrc: i, + rtp_sequence: i as u16, + rtp_timestamp: i, + park_time: TokInstant::now().into(), + last_cost: None, + }; + core.stats.add_idle_mixer(); + core.stats.move_mixer_to_live(); + worker.schedule_mixer(next_id.incr(), send_mixer).unwrap(); + handles.push((track_handle, listeners)); + core.workers.push(worker); + } + + let mut timer = tokio::time::interval(TIMESTEP_LENGTH); + assert!(core.run_once(&mut timer).await); + + // Stop one of the handles, allow it to exit, and then run core again. 
+ handles[1].0.stop().unwrap(); + while core.workers[1].is_busy() { + assert!(core.run_once(&mut timer).await); + } + + tokio::time::sleep(TEST_TIMER + Duration::from_secs(1)).await; + while core.workers.len() != 1 { + assert!(core.run_once(&mut timer).await); + } + + assert_eq!(core.stats.worker_threads(), 0); + } +} diff --git a/src/driver/scheduler/live.rs b/src/driver/scheduler/live.rs new file mode 100644 index 0000000..c649671 --- /dev/null +++ b/src/driver/scheduler/live.rs @@ -0,0 +1,821 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use discortp::rtp::{MutableRtpPacket, RtpPacket}; +use flume::{Receiver, SendError, Sender, TryRecvError}; +use tokio::time::Instant as TokInstant; + +use crate::{ + constants::*, + driver::tasks::{error::Error as DriverError, mixer::Mixer}, +}; + +#[cfg(test)] +use crate::driver::test_config::TickStyle; + +use super::*; + +/// The send-half of a worker thread, with bookkeeping mechanisms to help +/// the idle task schedule incoming tasks. +pub struct Worker { + id: WorkerId, + stats: Arc, + config: Config, + tx: Sender<(TaskId, ParkedMixer)>, + known_empty_since: Option, +} + +#[allow(missing_docs)] +impl Worker { + pub fn new( + id: WorkerId, + config: Config, + sched_tx: Sender, + global_stats: Arc, + ) -> Self { + let stats = Arc::new(LiveStatBlock::default()); + let (live_tx, live_rx) = flume::unbounded(); + + let core = Live::new( + id, + config.clone(), + global_stats, + stats.clone(), + live_rx, + sched_tx, + ); + core.spawn(); + + Self { + id, + stats, + config, + tx: live_tx, + known_empty_since: None, + } + } + + /// Mark the worker thread as idle from the present time if it reports no tasks. + /// + /// This time information is used for thread culling. + #[inline] + pub fn try_mark_empty(&mut self, now: TokInstant) -> Option { + if self.stats.live_mixers() == 0 { + self.known_empty_since.get_or_insert(now); + } else { + self.mark_busy(); + } + + self.known_empty_since + } + + /// Unset the thread culling time on this worker. + #[inline] + pub fn mark_busy(&mut self) { + self.known_empty_since = None; + } + + #[cfg(test)] + #[inline] + pub fn is_busy(&mut self) -> bool { + self.known_empty_since.is_none() + } + + /// Return whether this thread has enough room (task count, spare cycles) + /// for the given task. + #[inline] + pub fn can_schedule(&self, task: &ParkedMixer, avoid: Option) -> bool { + avoid.map_or(true, |id| !self.has_id(id)) + && self.stats.has_room(&self.config.strategy, task) + } + + #[inline] + pub fn stats(&self) -> Arc { + self.stats.clone() + } + + /// Increment this worker's statistics and hand off a task for execution. + #[inline] + pub fn schedule_mixer( + &mut self, + id: TaskId, + task: ParkedMixer, + ) -> Result<(), SendError<(TaskId, ParkedMixer)>> { + self.mark_busy(); + self.stats.add_mixer(); + self.tx.send((id, task)) + } + + pub fn has_id(&self, id: WorkerId) -> bool { + self.id == id + } +} + +const PACKETS_PER_BLOCK: usize = 16; +const MEMORY_CULL_TIMER: Duration = Duration::from_secs(10); + +/// A synchronous thread responsible for mixing, encoding, encrypting, and +/// sending the audio output of many `Mixer`s. +/// +/// `Mixer`s remain `Box`ed due to large move costs, and unboxing them appeared to have +/// a 5--10% perf cost from benchmarks. 
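+///
+/// The `packet_lens`, `tasks`, `ids`, and `to_cull` vectors are parallel arrays indexed
+/// by the same per-worker task index; `remove_task` keeps them in step by
+/// `swap_remove`ing each of them together.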
+pub struct Live { + packets: Vec>, + packet_lens: Vec, + #[allow(clippy::vec_box)] + tasks: Vec>, + ids: Vec, + to_cull: Vec, + + deadline: Instant, + start_of_work: Option, + + id: WorkerId, + config: Config, + stats: Arc, + global_stats: Arc, + rx: Receiver<(TaskId, ParkedMixer)>, + tx: Sender, + + excess_buffer_cull_time: Option, +} + +#[allow(missing_docs)] +impl Live { + pub fn new( + id: WorkerId, + config: Config, + global_stats: Arc, + stats: Arc, + rx: Receiver<(TaskId, ParkedMixer)>, + tx: Sender, + ) -> Self { + let to_prealloc = config.strategy.prealloc_size(); + + let block_size = config + .strategy + .task_limit() + .unwrap_or(PACKETS_PER_BLOCK) + .min(PACKETS_PER_BLOCK); + + let packets = vec![packet_block(block_size)]; + + Self { + packets, + packet_lens: Vec::with_capacity(to_prealloc), + tasks: Vec::with_capacity(to_prealloc), + ids: Vec::with_capacity(to_prealloc), + to_cull: Vec::with_capacity(to_prealloc), + + deadline: Instant::now(), + start_of_work: None, + + id, + config, + stats, + global_stats, + rx, + tx, + + excess_buffer_cull_time: None, + } + } + + #[inline] + fn run(&mut self) { + while self.run_once() {} + self.global_stats.remove_worker(); + } + + /// Returns whether the loop should exit (i.e., culled by main `Scheduler`). + #[inline] + pub fn run_once(&mut self) -> bool { + // Check for new tasks. + if self.handle_scheduler_msgs().is_err() { + return false; + } + + // Receive commands for each task. + self.handle_task_msgs(); + + // Move any idle calls back to the global pool. + self.demote_and_remove_mixers(); + + // Take a clock measure before and after each packet. + let mut pre_pkt_time = Instant::now(); + let mut worst_task = (0, Duration::default()); + + for (i, (packet_len, mixer)) in self + .packet_lens + .iter_mut() + .zip(self.tasks.iter_mut()) + .enumerate() + { + let (block, inner) = get_memory_indices(i); + match mixer.mix_and_build_packet(&mut self.packets[block][inner..][..VOICE_PACKET_MAX]) + { + Ok(written_sz) => *packet_len = written_sz, + e => { + *packet_len = 0; + rebuild_if_err(mixer, e, &mut self.to_cull, i); + }, + } + let post_pkt_time = Instant::now(); + let cost = post_pkt_time.duration_since(pre_pkt_time); + if cost > worst_task.1 { + worst_task = (i, cost); + } + pre_pkt_time = post_pkt_time; + } + + let end_of_work = pre_pkt_time; + + if let Some(start_of_work) = self.start_of_work { + let ns_cost = self.stats.store_compute_cost(end_of_work - start_of_work); + + if self.config.move_expensive_tasks + && ns_cost >= RESCHEDULE_THRESHOLD + && self.ids.len() > 1 + { + self.offload_mixer(worst_task.0, worst_task.1); + } + } + + self.timed_remove_excess_blocks(end_of_work); + + // Wait till the right time to send this packet: + // usually a 20ms tick, in test modes this is either a finite number of runs or user input. + self.march_deadline(); + + // Send all. 
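+        // `start_of_work` is captured after the sleep: the compute cost measured on the
+        // next tick (packet sends, keepalives, message handling, and mixing) therefore
+        // excludes any time spent waiting for the 20ms boundary.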
+ self.start_of_work = Some(Instant::now()); + for (i, (packet_len, mixer)) in self + .packet_lens + .iter_mut() + .zip(self.tasks.iter_mut()) + .enumerate() + { + let (block, inner) = get_memory_indices(i); + let packet = &mut self.packets[block][inner..]; + if *packet_len > 0 { + let res = mixer.send_packet(&packet[..*packet_len]); + rebuild_if_err(mixer, res, &mut self.to_cull, i); + } + #[cfg(test)] + if *packet_len == 0 { + mixer.test_signal_empty_tick(); + } + advance_rtp_counters(packet); + } + + for (i, mixer) in self.tasks.iter_mut().enumerate() { + let res = mixer + .audio_commands_events() + .and_then(|_| mixer.check_and_send_keepalive(self.start_of_work)); + rebuild_if_err(mixer, res, &mut self.to_cull, i); + } + + true + } + + #[cfg(test)] + fn _march_deadline(&mut self) { + // For testing, assume all will have same tick style. + // Only count 'remaining loops' on one of the nodes. + let mixer = self.tasks.get_mut(0).map(|m| { + let style = m.config.tick_style.clone(); + (m, style) + }); + + match mixer { + None | Some((_, TickStyle::Timed)) => { + std::thread::sleep(self.deadline.saturating_duration_since(Instant::now())); + self.deadline += TIMESTEP_LENGTH; + }, + Some((m, TickStyle::UntimedWithExecLimit(rx))) => { + if m.remaining_loops.is_none() { + if let Ok(new_val) = rx.recv() { + m.remaining_loops = Some(new_val.wrapping_sub(1)); + } + } + + if let Some(cnt) = m.remaining_loops.as_mut() { + if *cnt == 0 { + m.remaining_loops = None; + } else { + *cnt = cnt.wrapping_sub(1); + } + } + }, + } + } + + #[cfg(not(test))] + #[inline(always)] + #[allow(clippy::inline_always)] + fn _march_deadline(&mut self) { + std::thread::sleep(self.deadline.saturating_duration_since(Instant::now())); + self.deadline += TIMESTEP_LENGTH; + } + + #[inline] + fn march_deadline(&mut self) { + #[cfg(feature = "internals")] + { + return; + } + + self._march_deadline(); + } + + #[inline] + fn handle_scheduler_msgs(&mut self) -> Result<(), ()> { + let mut activation_time = None; + loop { + match self.rx.try_recv() { + Ok((id, task)) => { + self.add_task( + task, + id, + *activation_time.get_or_insert_with(|| { + self.deadline + .checked_sub(TIMESTEP_LENGTH) + .unwrap_or(self.deadline) + }), + ); + }, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => return Err(()), + } + } + + Ok(()) + } + + /// Handle messages from each tasks's `Driver`, marking dead tasks for removal. + #[inline] + fn handle_task_msgs(&mut self) { + for (i, (packet, mixer)) in self + .packets + .iter_mut() + .flat_map(|v| v.chunks_exact_mut(VOICE_PACKET_MAX)) + .zip(self.tasks.iter_mut()) + .enumerate() + { + let mut events_failure = false; + let mut conn_failure = false; + + let fatal = loop { + match mixer.mix_rx.try_recv() { + Ok(m) => { + let (events, conn, should_exit) = mixer.handle_message(m, packet); + events_failure |= events; + conn_failure |= conn; + + if should_exit { + break true; + } + }, + Err(TryRecvError::Disconnected) => { + break true; + }, + + Err(TryRecvError::Empty) => { + break false; + }, + } + }; + + if fatal || mixer.do_rebuilds(events_failure, conn_failure).is_err() { + // this is not zipped in because it is *not* needed most ticks. + self.to_cull[i] = true; + } + } + } + + #[cfg(feature = "internals")] + #[inline] + pub fn mark_for_cull(&mut self, idx: usize) { + self.to_cull[idx] = true; + } + + /// Check and demote for any tasks without live audio sources who have sent all + /// necessary silent frames (or remove dead tasks). 
+ /// + /// This must occur *after* handling per-track events to prevent erroneously + /// descheduling tasks. + #[inline] + pub fn demote_and_remove_mixers(&mut self) { + let mut i = 0; + while i < self.tasks.len() { + #[cfg(test)] + let force_conn = self.tasks[i].config.override_connection.is_some(); + #[cfg(not(test))] + let force_conn = false; + + // Benchmarking suggests that these asserts remove some bounds checks for us. + assert!(i < self.tasks.len()); + assert!(i < self.to_cull.len()); + + if self.to_cull[i] + || (self.tasks[i].tracks.is_empty() && self.tasks[i].silence_frames == 0) + || !(self.tasks[i].conn_active.is_some() || force_conn) + { + self.stats.remove_mixer(); + + if let Some((id, parked)) = self.remove_task(i) { + self.global_stats.move_mixer_to_idle(); + let _ = self.tx.send(SchedulerMessage::Demote(id, parked)); + } else { + self.global_stats.remove_live_mixer(); + } + } else { + i += 1; + } + } + } + + /// Return a given mixer to the main scheduler if this worker is overloaded. + #[inline] + pub fn offload_mixer(&mut self, idx: usize, cost: Duration) { + self.stats.remove_mixer(); + + if let Some((id, mut parked)) = self.remove_task(idx) { + self.global_stats.move_mixer_to_idle(); + parked.last_cost = Some(cost); + let _ = self + .tx + .send(SchedulerMessage::Overspill(self.id, id, parked)); + } else { + self.global_stats.remove_live_mixer(); + } + } + + #[inline] + fn needed_blocks(&self) -> usize { + let div = self.ids.len() / PACKETS_PER_BLOCK; + let rem = self.ids.len() % PACKETS_PER_BLOCK; + (rem != 0) as usize + div + } + + #[inline] + fn has_excess_blocks(&self) -> bool { + self.packets.len() > self.needed_blocks() + } + + #[inline] + fn remove_excess_blocks(&mut self) { + self.packets.truncate(self.needed_blocks()); + } + + /// Try to offload excess packet buffers. + /// + /// If there is currently overallocation, then store the first time at which + /// this was seenb. If this condition persists past `MEMORY_CULL_TIMER`, remove + /// unnecessary blocks. + #[inline] + fn timed_remove_excess_blocks(&mut self, now: Instant) { + if self.has_excess_blocks() { + if let Some(mark_time) = self.excess_buffer_cull_time { + if now.duration_since(mark_time) >= MEMORY_CULL_TIMER { + self.remove_excess_blocks(); + self.excess_buffer_cull_time = None; + } + } else { + self.excess_buffer_cull_time = Some(now); + } + } else { + self.excess_buffer_cull_time = None; + } + } + + #[inline] + fn add_task(&mut self, task: ParkedMixer, id: TaskId, activation_time: Instant) { + let idx = self.ids.len(); + + let elapsed = task.park_time - activation_time; + + let samples_f64 = elapsed.as_secs_f64() * (SAMPLE_RATE_RAW as f64); + let mod_samples = (samples_f64 as u64) as u32; + let rtp_timestamp = task.rtp_timestamp.wrapping_add(mod_samples); + + self.ids.push(id); + self.tasks.push(task.mixer); + self.packet_lens.push(0); + self.to_cull.push(false); + + let (block, inner_idx) = get_memory_indices(idx); + + while self.packets.len() <= block { + self.add_packet_block(); + } + let packet = &mut self.packets[block][inner_idx..][..VOICE_PACKET_MAX]; + + let mut rtp = MutableRtpPacket::new(packet).expect( + "FATAL: Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + rtp.set_ssrc(task.ssrc); + rtp.set_timestamp(rtp_timestamp.into()); + rtp.set_sequence(task.rtp_sequence.into()); + } + + /// Allocate and store a new packet block. 
+ /// + /// This will be full-size (`PACKETS_PER_BLOCK`) unless this block + /// is a) the last required for the task limit and b) this limit + /// is not aligned to `PACKETS_PER_BLOCK`. + #[inline] + fn add_packet_block(&mut self) { + let n_packets = if let Some(limit) = self.config.strategy.task_limit() { + let (block, inner) = get_memory_indices_unscaled(limit); + if self.packets.len() < block || inner == 0 { + PACKETS_PER_BLOCK + } else { + inner + } + } else { + PACKETS_PER_BLOCK + }; + self.packets.push(packet_block(n_packets)); + } + + #[cfg(any(test, feature = "internals"))] + #[inline] + pub fn add_task_direct(&mut self, task: Mixer, id: TaskId) { + let id_0 = id.get(); + self.add_task( + ParkedMixer { + mixer: Box::new(task), + ssrc: id_0 as u32, + rtp_sequence: id_0 as u16, + rtp_timestamp: id_0 as u32, + park_time: Instant::now(), + last_cost: None, + }, + id, + Instant::now(), + ); + } + + /// Remove a `Mixer`, returning it to the idle scheduler. + /// + /// This operates by `swap_remove`ing each element of a Mixer's state, including + /// on RTP packet headers. This is achieved by setting up a memcpy between + /// buffer segments. + #[inline] + pub fn remove_task(&mut self, idx: usize) -> Option<(TaskId, ParkedMixer)> { + let end = self.tasks.len() - 1; + + let id = self.ids.swap_remove(idx); + let _len = self.packet_lens.swap_remove(idx); + let mixer = self.tasks.swap_remove(idx); + let alive = !self.to_cull.swap_remove(idx); + + let (block, inner_idx) = get_memory_indices(idx); + + let (removed, replacement) = if end > idx { + let (end_block, end_inner) = get_memory_indices(end); + let (rest, target_block) = self.packets.split_at_mut(end_block); + let (last_block, end_pkt) = target_block[0].split_at_mut(end_inner); + + if end_block == block { + (&mut last_block[inner_idx..], Some(end_pkt)) + } else { + (&mut rest[block][inner_idx..], Some(end_pkt)) + } + } else { + (&mut self.packets[block][inner_idx..], None) + }; + + let rtp = RtpPacket::new(removed).expect( + "FATAL: Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + let ssrc = rtp.get_ssrc(); + let rtp_timestamp = rtp.get_timestamp().into(); + let rtp_sequence = rtp.get_sequence().into(); + + if let Some(replacement) = replacement { + // Copy the whole packet header since we know it'll be 4B aligned. + // 'Just necessary fields' is 2B aligned. + const COPY_LEN: usize = RtpPacket::minimum_packet_size(); + removed[..COPY_LEN].copy_from_slice(&replacement[..COPY_LEN]); + } + + alive.then(move || { + let park_time = Instant::now(); + + ( + id, + ParkedMixer { + mixer, + ssrc, + rtp_sequence, + rtp_timestamp, + park_time, + last_cost: None, + }, + ) + }) + } + + /// Spawn a new sync thread to manage `Mixer`s. + fn spawn(mut self) { + std::thread::spawn(move || { + self.run(); + }); + } +} + +/// Initialises a packet block of the required size, prefilling any constant RTP data. +#[inline] +fn packet_block(n_packets: usize) -> Box<[u8]> { + let mut packets = vec![0u8; VOICE_PACKET_MAX * n_packets].into_boxed_slice(); + + for packet in packets.chunks_exact_mut(VOICE_PACKET_MAX) { + let mut rtp = MutableRtpPacket::new(packet).expect( + "FATAL: Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + rtp.set_version(RTP_VERSION); + rtp.set_payload_type(RTP_PROFILE_TYPE); + } + + packets +} + +/// Returns the block index into `self.packets` and the packet number in +/// the block for a given worker's index. 
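+///
+/// For example, with the current `PACKETS_PER_BLOCK` of 16, task index 20 maps to
+/// `(1, 4)`: the fifth packet slot in the second block.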
+#[inline] +fn get_memory_indices_unscaled(idx: usize) -> (usize, usize) { + let block_size = PACKETS_PER_BLOCK; + (idx / block_size, idx % block_size) +} + +/// Returns the block index into `self.packets` and the byte offset into +/// a packet block for a given worker's index. +#[inline] +fn get_memory_indices(idx: usize) -> (usize, usize) { + let (block, inner_unscaled) = get_memory_indices_unscaled(idx); + (block, inner_unscaled * VOICE_PACKET_MAX) +} + +#[inline] +fn advance_rtp_counters(packet: &mut [u8]) { + let mut rtp = MutableRtpPacket::new(packet).expect( + "FATAL: Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + rtp.set_sequence(rtp.get_sequence() + 1); + rtp.set_timestamp(rtp.get_timestamp() + MONO_FRAME_SIZE as u32); +} + +/// Structured slightly confusingly: we only want to even access `cull_markers` +/// in the event of error. +#[inline] +fn rebuild_if_err( + mixer: &mut Box, + res: Result, + cull_markers: &mut [bool], + idx: usize, +) { + if let Err(e) = res { + cull_markers[idx] |= mixer + .do_rebuilds( + e.should_trigger_interconnect_rebuild(), + e.should_trigger_connect(), + ) + .is_err(); + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::driver::test_impls::*; + use tokio::runtime::Handle; + + fn rtp_has_index(pkt: &[u8], sentinel_val: u16) { + let rtp = RtpPacket::new(pkt).unwrap(); + + assert_eq!(rtp.get_version(), RTP_VERSION); + assert_eq!(rtp.get_padding(), 0); + assert_eq!(rtp.get_extension(), 0); + assert_eq!(rtp.get_csrc_count(), 0); + assert_eq!(rtp.get_marker(), 0); + assert_eq!(rtp.get_payload_type(), RTP_PROFILE_TYPE); + assert_eq!(rtp.get_sequence(), sentinel_val.into()); + assert_eq!(rtp.get_timestamp(), (sentinel_val as u32).into()); + assert_eq!(rtp.get_ssrc(), sentinel_val as u32); + } + + #[tokio::test] + async fn block_alloc_is_partial_small() { + let n_mixers = 1; + let (sched, _listeners) = MockScheduler::from_mixers( + Some(Mode::MaxPerThread(n_mixers.try_into().unwrap())), + (0..n_mixers) + .map(|_| Mixer::test_with_float(1, Handle::current(), false)) + .collect(), + ); + + assert_eq!(sched.core.packets.len(), 1); + assert_eq!(sched.core.packets[0].len(), VOICE_PACKET_MAX); + } + + #[tokio::test] + async fn block_alloc_is_partial_large() { + let n_mixers = 33; + let (sched, _listeners) = MockScheduler::from_mixers( + Some(Mode::MaxPerThread(n_mixers.try_into().unwrap())), + (0..n_mixers) + .map(|_| Mixer::test_with_float(1, Handle::current(), false)) + .collect(), + ); + + assert_eq!(sched.core.packets.len(), 3); + assert_eq!( + sched.core.packets[0].len(), + PACKETS_PER_BLOCK * VOICE_PACKET_MAX + ); + assert_eq!( + sched.core.packets[1].len(), + PACKETS_PER_BLOCK * VOICE_PACKET_MAX + ); + assert_eq!(sched.core.packets[2].len(), VOICE_PACKET_MAX); + } + + #[tokio::test] + async fn deletion_moves_pkt_header() { + let (mut sched, _listeners) = MockScheduler::from_mixers( + None, + (0..PACKETS_PER_BLOCK) + .map(|_| Mixer::test_with_float(1, Handle::current(), false)) + .collect(), + ); + + let last_idx = (PACKETS_PER_BLOCK - 1) as u16; + + // Remove head. + sched.core.remove_task(0); + rtp_has_index(&sched.core.packets[0], last_idx); + + // Remove head. 
+ sched.core.remove_task(5); + rtp_has_index(&sched.core.packets[0][5 * VOICE_PACKET_MAX..], last_idx - 1); + } + + #[tokio::test] + async fn deletion_moves_pkt_header_multiblock() { + let n_pkts = PACKETS_PER_BLOCK + 8; + let (mut sched, _listeners) = MockScheduler::from_mixers( + None, + (0..n_pkts) + .map(|_| Mixer::test_with_float(1, Handle::current(), false)) + .collect(), + ); + + let last_idx = (n_pkts - 1) as u16; + + // Remove head (read from block 1 into block 0). + sched.core.remove_task(0); + rtp_has_index(&sched.core.packets[0], last_idx); + + // Remove later (read from block 1 into block 1). + sched.core.remove_task(17); + rtp_has_index(&sched.core.packets[1][VOICE_PACKET_MAX..], last_idx - 1); + } + + #[tokio::test] + async fn packet_blocks_are_cleaned_up() { + // Allocate 2 blocks. + let n_pkts = PACKETS_PER_BLOCK + 1; + let (mut sched, _listeners) = MockScheduler::from_mixers( + None, + (0..n_pkts) + .map(|_| Mixer::test_with_float(1, Handle::current(), false)) + .collect(), + ); + + // Assert no cleanup at start. + assert!(sched.core.run_once()); + assert_eq!(sched.core.needed_blocks(), 2); + assert!(sched.core.excess_buffer_cull_time.is_none()); + + // Remove only entry in last block. Cleanup should be sched'd. + sched.core.remove_task(n_pkts - 1); + assert!(sched.core.run_once()); + assert!(sched.core.has_excess_blocks()); + assert!(sched.core.excess_buffer_cull_time.is_some()); + + tokio::time::sleep(Duration::from_secs(2) + MEMORY_CULL_TIMER).await; + + // Cleanup should be unsched'd. + assert!(sched.core.run_once()); + assert!(sched.core.excess_buffer_cull_time.is_none()); + assert!(!sched.core.has_excess_blocks()); + } +} diff --git a/src/driver/scheduler/mod.rs b/src/driver/scheduler/mod.rs new file mode 100644 index 0000000..f7d6ec2 --- /dev/null +++ b/src/driver/scheduler/mod.rs @@ -0,0 +1,174 @@ +use std::{error::Error as StdError, fmt::Display, num::NonZeroUsize, sync::Arc}; + +use flume::{Receiver, RecvError, Sender}; +use once_cell::sync::Lazy; + +use crate::{constants::TIMESTEP_LENGTH, Config as DriverConfig}; + +use super::tasks::message::{Interconnect, MixerMessage}; + +mod config; +mod idle; +mod live; +mod stats; +mod task; + +pub use config::*; +use idle::*; +pub use live::*; +pub use stats::*; +pub use task::*; + +/// A soft maximum of 90% of the 20ms budget to account for variance in execution time. +const RESCHEDULE_THRESHOLD: u64 = ((TIMESTEP_LENGTH.subsec_nanos() as u64) * 9) / 10; + +const DEFAULT_MIXERS_PER_THREAD: NonZeroUsize = match NonZeroUsize::new(16) { + Some(v) => v, + None => [][0], +}; + +/// The default shared scheduler instance. +/// +/// This is built using the default value of [`ScheduleMode`]. Users desiring +/// a custom strategy should avoid calling [`Config::default`]. +/// +/// [`Config::default`]: crate::Config::default +/// [`ScheduleMode`]: Mode +pub static DEFAULT_SCHEDULER: Lazy = Lazy::new(Scheduler::default); + +/// A reference to a shared group of threads used for running idle and active +/// audio threads. +#[derive(Clone, Debug)] +pub struct Scheduler { + inner: Arc, +} + +/// Inner contents of a [`Scheduler`] instance. +/// +/// This is an `Arc` around `Arc`'d contents so that we can make use of the +/// drop check on `Scheduler` to clean up resources. +#[derive(Clone, Debug)] +struct InnerScheduler { + tx: Sender, + stats: Arc, +} + +impl Scheduler { + /// Create a new mixer scheduler from the allocation strategy in `config`. 
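+    ///
+    /// Note that this spawns the scheduler's idle task via `tokio::spawn`, so it must be
+    /// called from within a tokio runtime context.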
+ #[must_use] + pub fn new(config: Config) -> Self { + let (core, tx) = Idle::new(config); + + let stats = core.stats.clone(); + core.spawn(); + + let inner = Arc::new(InnerScheduler { tx, stats }); + + Self { inner } + } + + pub(crate) fn new_mixer( + &self, + config: &DriverConfig, + ic: Interconnect, + rx: Receiver, + ) { + self.inner + .tx + .send(SchedulerMessage::NewMixer(rx, ic, config.clone())) + .unwrap(); + } + + /// Returns the total number of calls (idle and active) scheduled. + #[must_use] + pub fn total_tasks(&self) -> u64 { + self.inner.stats.total_mixers() + } + + /// Returns the total number of *active* calls scheduled and processing + /// audio. + #[must_use] + pub fn live_tasks(&self) -> u64 { + self.inner.stats.live_mixers() + } + + /// Returns the total number of threads spawned to process live audio sessions. + #[must_use] + pub fn worker_threads(&self) -> u64 { + self.inner.stats.worker_threads() + } + + /// Request a list of handles to statistics for currently live workers. + pub async fn worker_thread_stats(&self) -> Result>, Error> { + let (tx, rx) = flume::bounded(1); + _ = self.inner.tx.send(SchedulerMessage::GetStats(tx)); + + rx.recv_async().await.map_err(Error::from) + } + + /// Request a list of handles to statistics for currently live workers with a blocking call. + pub fn worker_thread_stats_blocking(&self) -> Result>, Error> { + let (tx, rx) = flume::bounded(1); + _ = self.inner.tx.send(SchedulerMessage::GetStats(tx)); + + rx.recv().map_err(Error::from) + } +} + +impl Drop for InnerScheduler { + fn drop(&mut self) { + _ = self.tx.send(SchedulerMessage::Kill); + } +} + +impl Default for Scheduler { + fn default() -> Self { + Scheduler::new(Config::default()) + } +} + +/// Control messages for a scheduler. +pub enum SchedulerMessage { + /// Build a new `Mixer` as part of the initialisation of a `Driver`. + NewMixer(Receiver, Interconnect, DriverConfig), + /// Forward a command for + Do(TaskId, MixerMessage), + /// Return a `Mixer` from a worker back to the idle pool. + Demote(TaskId, ParkedMixer), + /// Move an expensive `Mixer` to another thread in the worker pool. + Overspill(WorkerId, TaskId, ParkedMixer), + /// Request a copy of all per-worker statistics. + GetStats(Sender>>), + /// Cleanup once all `Scheduler` handles are dropped. + Kill, +} + +/// Errors encountered when communicating with the internals of a [`Scheduler`]. +/// +/// [`Scheduler`]: crate::driver::Scheduler +#[non_exhaustive] +#[derive(Debug)] +pub enum Error { + /// The scheduler exited or crashed while awating the request. + Disconnected, +} + +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Disconnected => f.write_str("the scheduler terminated mid-request"), + } + } +} + +impl StdError for Error { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + None + } +} + +impl From for Error { + fn from(_: RecvError) -> Self { + Self::Disconnected + } +} diff --git a/src/driver/scheduler/stats.rs b/src/driver/scheduler/stats.rs new file mode 100644 index 0000000..d1f10ea --- /dev/null +++ b/src/driver/scheduler/stats.rs @@ -0,0 +1,134 @@ +use std::{ + sync::atomic::{AtomicU64, Ordering}, + time::Duration, +}; + +use super::{Mode, ParkedMixer, RESCHEDULE_THRESHOLD}; + +/// Statistics shared by an entire `Scheduler`. 
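+///
+/// Counters are plain atomics (rather than lock-guarded values) since they are updated
+/// from both the async idle task and the synchronous live worker threads.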
+#[derive(Debug, Default)] +pub struct StatBlock { + total: AtomicU64, + live: AtomicU64, + threads: AtomicU64, +} + +#[allow(missing_docs)] +impl StatBlock { + #[inline] + pub fn total_mixers(&self) -> u64 { + self.total.load(Ordering::Relaxed) + } + + #[inline] + pub fn live_mixers(&self) -> u64 { + self.live.load(Ordering::Relaxed) + } + + #[inline] + pub fn worker_threads(&self) -> u64 { + self.threads.load(Ordering::Relaxed) + } + + #[inline] + pub fn add_idle_mixer(&self) { + self.total.fetch_add(1, Ordering::Relaxed); + } + + #[inline] + pub fn remove_idle_mixers(&self, n: u64) { + self.total.fetch_sub(n, Ordering::Relaxed); + } + + #[inline] + pub fn move_mixer_to_live(&self) { + self.live.fetch_add(1, Ordering::Relaxed); + } + + #[inline] + pub fn move_mixer_to_idle(&self) { + self.move_mixers_to_idle(1); + } + + #[inline] + pub fn move_mixers_to_idle(&self, n: u64) { + self.live.fetch_sub(n, Ordering::Relaxed); + } + + #[inline] + pub fn remove_live_mixer(&self) { + self.remove_live_mixers(1); + } + + #[inline] + pub fn remove_live_mixers(&self, n: u64) { + self.move_mixers_to_idle(n); + self.remove_idle_mixers(n); + } + + #[inline] + pub fn add_worker(&self) { + self.threads.fetch_add(1, Ordering::Relaxed); + } + + #[inline] + pub fn remove_worker(&self) { + self.threads.fetch_sub(1, Ordering::Relaxed); + } +} + +/// Runtime statistics for an individual worker. +/// +/// Individual statistics are measured atomically -- the worker thread +/// may have been cleaned up, or its mixer count may not match the +/// count when [`Self::last_compute_cost_ns`] was set. +#[derive(Debug, Default)] +pub struct LiveStatBlock { + live: AtomicU64, + last_ns: AtomicU64, +} + +impl LiveStatBlock { + /// Returns the number of mixer tasks scheduled on this worker thread. + #[inline] + pub fn live_mixers(&self) -> u64 { + self.live.load(Ordering::Relaxed) + } + + #[inline] + pub(crate) fn add_mixer(&self) { + self.live.fetch_add(1, Ordering::Relaxed); + } + + #[inline] + pub(crate) fn remove_mixer(&self) { + self.live.fetch_sub(1, Ordering::Relaxed); + } + + #[inline] + pub(crate) fn store_compute_cost(&self, work: Duration) -> u64 { + let cost = work.as_nanos() as u64; + self.last_ns.store(cost, Ordering::Relaxed); + cost + } + + /// Returns the number of nanoseconds required to process all worker threads' + /// packet transmission, mixing, encoding, and encryption in the last tick. + #[inline] + pub fn last_compute_cost_ns(&self) -> u64 { + self.last_ns.load(Ordering::Relaxed) + } + + #[inline] + pub(crate) fn has_room(&self, strategy: &Mode, task: &ParkedMixer) -> bool { + let task_room = strategy + .task_limit() + .map_or(true, |limit| self.live_mixers() < limit as u64); + + let exec_room = task.last_cost.map_or(true, |cost| { + cost.as_nanos() as u64 + self.last_compute_cost_ns() < RESCHEDULE_THRESHOLD + }); + + task_room && exec_room + } +} diff --git a/src/driver/scheduler/task.rs b/src/driver/scheduler/task.rs new file mode 100644 index 0000000..a3d7f85 --- /dev/null +++ b/src/driver/scheduler/task.rs @@ -0,0 +1,187 @@ +use std::{ + marker::PhantomData, + time::{Duration, Instant}, +}; + +use flume::{Receiver, Sender}; +use nohash_hasher::IsEnabled; +use rand::random; +use tokio::runtime::Handle; + +use crate::{ + driver::tasks::{ + error::Error as DriverError, + message::{EventMessage, Interconnect, MixerMessage}, + mixer::Mixer, + }, + Config, +}; + +use super::SchedulerMessage; + +/// Typesafe counter used to identify individual mixer/worker instances. 
+#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct ResId<T>(u64, PhantomData<T>);
+#[allow(missing_docs)]
+pub type TaskId = ResId<TaskMarker>;
+#[allow(missing_docs)]
+pub type WorkerId = ResId<WorkerMarker>;
+
+#[allow(missing_docs)]
+#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct TaskMarker;
+#[allow(missing_docs)]
+#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
+pub struct WorkerMarker;
+
+impl<T> IsEnabled for ResId<T> {}
+
+#[allow(missing_docs)]
+impl<T: Copy> ResId<T> {
+    pub fn new() -> Self {
+        ResId(0, PhantomData)
+    }
+
+    pub fn incr(&mut self) -> Self {
+        let out = *self;
+        self.0 = self.0.wrapping_add(1);
+        out
+    }
+
+    #[cfg(any(test, feature = "internals"))]
+    pub fn get(&self) -> u64 {
+        self.0
+    }
+}
+
+/// An idle mixer instance, externally controlled by a `Driver`.
+///
+/// Since we do not allocate packet buffers for idle threads, this
+/// struct includes various RTP fields.
+pub struct ParkedMixer {
+    /// Mixer, track, etc. state as well as message receivers.
+    pub mixer: Box<Mixer>,
+    /// The SSRC assigned to this voice session.
+    pub ssrc: u32,
+    /// The last recorded/generated RTP sequence.
+    pub rtp_sequence: u16,
+    /// The last recorded/generated RTP timestamp.
+    pub rtp_timestamp: u32,
+    /// The time at which this `Mixer` was made idle.
+    ///
+    /// This is used when transitioning to a live state to determine
+    /// how far we should adjust the RTP timestamp by.
+    pub park_time: Instant,
+    /// The last known cost of executing this task, if it had to be moved
+    /// due to a limit on thread resources.
+    pub last_cost: Option<Duration>,
+}
+
+#[allow(missing_docs)]
+impl ParkedMixer {
+    /// Create a new `Mixer` in a parked state.
+    pub fn new(mix_rx: Receiver<MixerMessage>, interconnect: Interconnect, config: Config) -> Self {
+        Self {
+            mixer: Box::new(Mixer::new(mix_rx, Handle::current(), interconnect, config)),
+            ssrc: 0,
+            rtp_sequence: random::<u16>(),
+            rtp_timestamp: random::<u32>(),
+            park_time: Instant::now(),
+            last_cost: None,
+        }
+    }
+
+    /// Spawn a tokio task which forwards any mixer messages to the central `Idle` task pool.
+    ///
+    /// Any requests which would cause this mixer to become live will terminate
+    /// this task.
+    pub fn spawn_forwarder(&self, tx: Sender<SchedulerMessage>, id: TaskId) {
+        let remote_rx = self.mixer.mix_rx.clone();
+        tokio::spawn(async move {
+            while let Ok(msg) = remote_rx.recv_async().await {
+                let exit = msg.is_mixer_now_live();
+                let dead = tx.send_async(SchedulerMessage::Do(id, msg)).await.is_err();
+                if exit || dead {
+                    break;
+                }
+            }
+        });
+    }
+
+    /// Returns whether the mixer should exit and be cleaned up.
+    pub fn handle_message(&mut self, msg: MixerMessage) -> Result<bool, ()> {
+        match msg {
+            MixerMessage::SetConn(conn, ssrc) => {
+                // Overridden because a parked mixer has no packet buffer to write
+                // RTP fields into: just refresh the stored RTP state and keepalive.
+                self.ssrc = ssrc;
+                self.rtp_sequence = random::<u16>();
+                self.rtp_timestamp = random::<u32>();
+                self.mixer.conn_active = Some(conn);
+                self.mixer.update_keepalive(ssrc);
+
+                Ok(false)
+            },
+            MixerMessage::Ws(ws) => {
+                // Overridden so that we don't mistakenly tell Discord we're speaking.
+                self.mixer.ws = ws;
+                self.send_gateway_not_speaking();
+
+                Ok(false)
+            },
+            msg => {
+                let (events_failure, conn_failure, should_exit) =
+                    self.mixer.handle_message(msg, &mut []);
+
+                self.mixer
+                    .do_rebuilds(events_failure, conn_failure)
+                    .map_err(|_| ())
+                    .map(|_| should_exit)
+            },
+        }
+    }
+
+    /// Handle periodic events attached to this `Mixer`, including timer state
+    /// on the event thread and UDP keepalives needed to prevent session termination.
+ /// + /// As we init our UDP sockets as non-blocking via Tokio -> `into_std`, it is + /// safe to call UDP packet sends like this. + pub fn tick_and_keepalive(&mut self, now: Instant) -> Result<(), ()> { + // TODO: should we include an atomic which signals whether the event + // thread *cares*, so we can prevent wakeups? + // Can we do the same for live tracks? + let mut events_failure = self.mixer.fire_event(EventMessage::Tick).is_err(); + + let ka_err = self + .mixer + .check_and_send_keepalive(Some(now)) + .or_else(DriverError::disarm_would_block); + + let conn_failure = if let Err(e) = ka_err { + events_failure |= e.should_trigger_interconnect_rebuild(); + e.should_trigger_connect() + } else { + false + }; + + self.mixer + .do_rebuilds(events_failure, conn_failure) + .map_err(|_| ()) + } + + pub fn send_gateway_speaking(&mut self) -> Result<(), ()> { + if let Err(e) = self.mixer.send_gateway_speaking() { + self.mixer + .do_rebuilds( + e.should_trigger_interconnect_rebuild(), + e.should_trigger_connect(), + ) + .map_err(|_| ()) + } else { + Ok(()) + } + } + + pub fn send_gateway_not_speaking(&self) { + self.mixer.send_gateway_not_speaking(); + } +} diff --git a/src/driver/tasks/message/mixer.rs b/src/driver/tasks/message/mixer.rs index 585e135..0b1c17b 100644 --- a/src/driver/tasks/message/mixer.rs +++ b/src/driver/tasks/message/mixer.rs @@ -39,6 +39,12 @@ pub enum MixerMessage { Poison, } +impl MixerMessage { + pub fn is_mixer_now_live(&self) -> bool { + matches!(self, Self::AddTrack(_) | Self::SetTrack(Some(_))) + } +} + pub enum MixerInputResultMessage { CreateErr(Arc), ParseErr(Arc), diff --git a/src/driver/tasks/mixer/mod.rs b/src/driver/tasks/mixer/mod.rs index 18892e6..6fc4e51 100644 --- a/src/driver/tasks/mixer/mod.rs +++ b/src/driver/tasks/mixer/mod.rs @@ -34,11 +34,12 @@ use discortp::{ rtp::{MutableRtpPacket, RtpPacket}, MutablePacket, }; -use flume::{Receiver, Sender, TryRecvError}; +use flume::{Receiver, SendError, Sender, TryRecvError}; use rand::random; use rubato::{FftFixedOut, Resampler}; use std::{ io::Write, + result::Result as StdResult, sync::Arc, time::{Duration, Instant}, }; @@ -51,11 +52,11 @@ use symphonia_core::{ units::Time, }; use tokio::runtime::Handle; -use tracing::{debug, error, instrument, warn}; +use tracing::error; use xsalsa20poly1305::TAG_SIZE; #[cfg(test)] -use crate::driver::test_config::{OutputMessage, OutputMode, TickStyle}; +use crate::driver::test_config::{OutputMessage, OutputMode}; #[cfg(test)] use discortp::Packet as _; @@ -70,10 +71,9 @@ pub struct Mixer { pub interconnect: Interconnect, pub mix_rx: Receiver, pub muted: bool, - pub packet: [u8; VOICE_PACKET_MAX], + // pub packet: [u8; VOICE_PACKET_MAX], pub prevent_events: bool, pub silence_frames: u8, - pub skip_sleep: bool, pub soft_clip: SoftClip, thread_pool: BlockyTaskPool, pub ws: Option>, @@ -89,7 +89,10 @@ pub struct Mixer { resample_scratch: AudioBuffer, #[cfg(test)] - remaining_loops: Option, + pub remaining_loops: Option, + + #[cfg(test)] + raw_msg: Option, } fn new_encoder(bitrate: Bitrate, mix_mode: MixMode) -> Result { @@ -111,18 +114,8 @@ impl Mixer { .expect("Failed to create encoder in mixing thread with known-good values."); let soft_clip = SoftClip::new(config.mix_mode.to_opus()); - let mut packet = [0u8; VOICE_PACKET_MAX]; let keepalive_packet = [0u8; MutableKeepalivePacket::minimum_packet_size()]; - let mut rtp = MutableRtpPacket::new(&mut packet[..]).expect( - "FATAL: Too few bytes in self.packet for RTP header.\ - (Blame: VOICE_PACKET_MAX?)", - ); - 
rtp.set_version(RTP_VERSION); - rtp.set_payload_type(RTP_PROFILE_TYPE); - rtp.set_sequence(random::().into()); - rtp.set_timestamp(random::().into()); - let tracks = Vec::with_capacity(1.max(config.preallocated_tracks)); let track_handles = Vec::with_capacity(1.max(config.preallocated_tracks)); @@ -165,10 +158,8 @@ impl Mixer { interconnect, mix_rx, muted: false, - packet, prevent_events: false, silence_frames: 0, - skip_sleep: false, soft_clip, thread_pool, ws: None, @@ -185,110 +176,55 @@ impl Mixer { #[cfg(test)] remaining_loops: None, + #[cfg(test)] + raw_msg: None, } } - fn run(&mut self) { - let mut events_failure = false; - let mut conn_failure = false; + fn set_bitrate(&mut self, bitrate: Bitrate) -> Result<()> { + self.encoder.set_bitrate(bitrate).map_err(Into::into) + } - #[cfg(test)] - let ignore_check = self.config.override_connection.is_some(); - #[cfg(not(test))] - let ignore_check = false; - - 'runner: loop { - if self.conn_active.is_some() || ignore_check { - loop { - match self.mix_rx.try_recv() { - Ok(m) => { - let (events, conn, should_exit) = self.handle_message(m); - events_failure |= events; - conn_failure |= conn; - - if should_exit { - break 'runner; - } - }, - - Err(TryRecvError::Disconnected) => { - break 'runner; - }, - - Err(TryRecvError::Empty) => { - break; - }, - }; - } - - // The above action may have invalidated the connection; need to re-check! - // Also, if we're in a test mode we should unconditionally run packet mixing code. - if self.conn_active.is_some() || ignore_check { - if let Err(e) = self - .cycle() - .and_then(|_| self.audio_commands_events()) - .and_then(|_| { - self.check_and_send_keepalive() - .or_else(Error::disarm_would_block) - }) - { - events_failure |= e.should_trigger_interconnect_rebuild(); - conn_failure |= e.should_trigger_connect(); - - debug!("Mixer thread cycle: {:?}", e); - } - } - } else { - match self.mix_rx.recv() { - Ok(m) => { - let (events, conn, should_exit) = self.handle_message(m); - events_failure |= events; - conn_failure |= conn; - - if should_exit { - break 'runner; - } - }, - Err(_) => { - break 'runner; - }, - } - } - - // event failure? rebuild interconnect. - // ws or udp failure? full connect - // (soft reconnect is covered by the ws task.) - // - // in both cases, send failure is fatal, - // but will only occur on disconnect. - // expecting this is fairly noisy, so exit silently. - if events_failure { - self.prevent_events = true; - let sent = self - .interconnect - .core - .send(CoreMessage::RebuildInterconnect); - events_failure = false; - - if sent.is_err() { - break; - } - } - - if conn_failure { - self.conn_active = None; - let sent = self.interconnect.core.send(CoreMessage::FullReconnect); - conn_failure = false; - - if sent.is_err() { - break; - } - } + pub(crate) fn do_rebuilds( + &mut self, + event_failure: bool, + conn_failure: bool, + ) -> StdResult<(), SendError> { + // event failure? rebuild interconnect. + // ws or udp failure? full connect + // (soft reconnect is covered by the ws task.) + // + // in both cases, send failure is fatal, + // but will only occur on disconnect. 
+ if event_failure { + self.rebuild_interconnect()?; } + + if conn_failure { + self.full_reconnect_gateway()?; + } + + Ok(()) + } + + pub(crate) fn rebuild_interconnect(&mut self) -> StdResult<(), SendError> { + self.prevent_events = true; + self.interconnect + .core + .send(CoreMessage::RebuildInterconnect) + } + + pub(crate) fn full_reconnect_gateway(&mut self) -> StdResult<(), SendError> { + self.conn_active = None; + self.interconnect.core.send(CoreMessage::FullReconnect) } #[inline] - fn handle_message(&mut self, msg: MixerMessage) -> (bool, bool, bool) { + pub(crate) fn handle_message( + &mut self, + msg: MixerMessage, + packet: &mut [u8], + ) -> (bool, bool, bool) { let mut events_failure = false; let mut conn_failure = false; let mut should_exit = false; @@ -323,7 +259,7 @@ impl Mixer { }, MixerMessage::SetConn(conn, ssrc) => { self.conn_active = Some(conn); - let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( + let mut rtp = MutableRtpPacket::new(packet).expect( "Too few bytes in self.packet for RTP header.\ (Blame: VOICE_PACKET_MAX?)", ); @@ -332,10 +268,7 @@ impl Mixer { rtp.set_timestamp(random::().into()); self.deadline = Instant::now(); - let mut ka = MutableKeepalivePacket::new(&mut self.keepalive_packet[..]) - .expect("FATAL: Insufficient bytes given to keepalive packet."); - ka.set_ssrc(ssrc); - self.keepalive_deadline = self.deadline + UDP_KEEPALIVE_GAP; + self.update_keepalive(ssrc); Ok(()) }, MixerMessage::DropConn => { @@ -420,6 +353,9 @@ impl Mixer { }, MixerMessage::Ws(new_ws_handle) => { self.ws = new_ws_handle; + if let Err(e) = self.send_gateway_speaking() { + conn_failure |= e.should_trigger_connect(); + } Ok(()) }, MixerMessage::Poison => { @@ -436,8 +372,15 @@ impl Mixer { (events_failure, conn_failure, should_exit) } + pub(crate) fn update_keepalive(&mut self, ssrc: u32) { + let mut ka = MutableKeepalivePacket::new(&mut self.keepalive_packet[..]) + .expect("FATAL: Insufficient bytes given to keepalive packet."); + ka.set_ssrc(ssrc); + self.keepalive_deadline = self.deadline + UDP_KEEPALIVE_GAP; + } + #[inline] - fn fire_event(&self, event: EventMessage) -> Result<()> { + pub(crate) fn fire_event(&self, event: EventMessage) -> Result<()> { // As this task is responsible for noticing the potential death of an event context, // it's responsible for not forcibly recreating said context repeatedly. if !self.prevent_events { @@ -476,7 +419,7 @@ impl Mixer { } #[inline] - fn audio_commands_events(&mut self) -> Result<()> { + pub(crate) fn audio_commands_events(&mut self) -> Result<()> { // Apply user commands. 
for (i, track) in self.tracks.iter_mut().enumerate() { // This causes fallible event system changes, @@ -558,48 +501,19 @@ impl Mixer { } #[cfg(test)] - fn _march_deadline(&mut self) { - match &self.config.tick_style { - TickStyle::Timed => { - std::thread::sleep(self.deadline.saturating_duration_since(Instant::now())); - self.deadline += TIMESTEP_LENGTH; - }, - TickStyle::UntimedWithExecLimit(rx) => { - if self.remaining_loops.is_none() { - if let Ok(new_val) = rx.recv() { - self.remaining_loops = Some(new_val.wrapping_sub(1)); - } - } - - if let Some(cnt) = self.remaining_loops.as_mut() { - if *cnt == 0 { - self.remaining_loops = None; - } else { - *cnt = cnt.wrapping_sub(1); - } - } - }, + #[inline] + pub(crate) fn test_signal_empty_tick(&self) { + match &self.config.override_connection { + Some(OutputMode::Raw(tx)) => + drop(tx.send(crate::driver::test_config::TickMessage::NoEl)), + Some(OutputMode::Rtp(tx)) => + drop(tx.send(crate::driver::test_config::TickMessage::NoEl)), + None => {}, } } - #[cfg(not(test))] - #[inline(always)] - #[allow(clippy::inline_always)] // Justified, this is a very very hot path - fn _march_deadline(&mut self) { - std::thread::sleep(self.deadline.saturating_duration_since(Instant::now())); - self.deadline += TIMESTEP_LENGTH; - } - #[inline] - fn march_deadline(&mut self) { - if self.skip_sleep { - return; - } - - self._march_deadline(); - } - - pub fn cycle(&mut self) -> Result<()> { + pub fn mix_and_build_packet(&mut self, packet: &mut [u8]) -> Result { // symph_mix is an `AudioBuffer` (planar format), we need to convert this // later into an interleaved `SampleBuffer` for libopus. self.symph_mix.clear(); @@ -609,7 +523,7 @@ impl Mixer { // Walk over all the audio files, combining into one audio frame according // to volume, play state, etc. let mut mix_len = { - let out = self.mix_tracks(); + let out = self.mix_tracks(packet); self.sample_buffer.copy_interleaved_typed(&self.symph_mix); @@ -626,7 +540,7 @@ impl Mixer { if mix_len == MixType::MixedPcm(0) { if self.silence_frames > 0 { self.silence_frames -= 1; - let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( + let mut rtp = MutableRtpPacket::new(packet).expect( "FATAL: Too few bytes in self.packet for RTP header.\ (Blame: VOICE_PACKET_MAX?)", ); @@ -638,28 +552,7 @@ impl Mixer { mix_len = MixType::Passthrough(SILENT_FRAME.len()); } else { // Per official guidelines, send 5x silence BEFORE we stop speaking. - if let Some(ws) = &self.ws { - // NOTE: this explicit `drop` should prevent a catastrophic thread pileup. - // A full reconnect might cause an inner closed connection. - // It's safer to leave the central task to clean this up and - // pass the mixer a new channel. - drop(ws.send(WsMessage::Speaking(false))); - } - - self.march_deadline(); - - #[cfg(test)] - match &self.config.override_connection { - Some(OutputMode::Raw(tx)) => - drop(tx.send(crate::driver::test_config::TickMessage::NoEl)), - Some(OutputMode::Rtp(tx)) => - drop(tx.send(crate::driver::test_config::TickMessage::NoEl)), - None => {}, - } - - self.advance_rtp_timestamp(); - - return Ok(()); + return Ok(0); } } else { self.silence_frames = 5; @@ -676,20 +569,14 @@ impl Mixer { } } - if let Some(ws) = &self.ws { - ws.send(WsMessage::Speaking(true))?; - } - - // Wait till the right time to send this packet: - // usually a 20ms tick, in test modes this is either a finite number of runs or user input. - self.march_deadline(); - + // For the benefit of test cases, send the raw un-RTP'd data. 
#[cfg(test)] - let send_status = if let Some(OutputMode::Raw(tx)) = &self.config.override_connection { + let out = if let Some(OutputMode::Raw(_)) = &self.config.override_connection { + // This case has been handled before buffer clearing above. let msg = match mix_len { MixType::Passthrough(len) if len == SILENT_FRAME.len() => OutputMessage::Silent, MixType::Passthrough(len) => { - let rtp = RtpPacket::new(&self.packet[..]).expect( + let rtp = RtpPacket::new(&packet).expect( "FATAL: Too few bytes in self.packet for RTP header.\ (Blame: VOICE_PACKET_MAX?)", ); @@ -704,19 +591,15 @@ impl Mixer { ), }; - drop(tx.send(msg.into())); + self.raw_msg = Some(msg); - Ok(()) + Ok(1) } else { - self.prep_and_send_packet(mix_len) + self.prep_packet(mix_len, packet) }; #[cfg(not(test))] - let send_status = self.prep_and_send_packet(mix_len); - - send_status.or_else(Error::disarm_would_block)?; - - self.advance_rtp_counters(); + let out = self.prep_packet(mix_len, packet); // Zero out all planes of the mix buffer if any audio was written. if matches!(mix_len, MixType::MixedPcm(a) if a > 0) { @@ -725,15 +608,11 @@ impl Mixer { } } - Ok(()) - } - - fn set_bitrate(&mut self, bitrate: Bitrate) -> Result<()> { - self.encoder.set_bitrate(bitrate).map_err(Into::into) + out } #[inline] - fn prep_and_send_packet(&mut self, mix_len: MixType) -> Result<()> { + fn prep_packet(&mut self, mix_len: MixType, packet: &mut [u8]) -> Result { let send_buffer = self.sample_buffer.samples(); let conn = self @@ -741,92 +620,97 @@ impl Mixer { .as_mut() .expect("Shouldn't be mixing packets without access to a cipher + UDP dest."); - let index = { - let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( - "FATAL: Too few bytes in self.packet for RTP header.\ - (Blame: VOICE_PACKET_MAX?)", - ); + let mut rtp = MutableRtpPacket::new(packet).expect( + "FATAL: Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); - let payload = rtp.payload_mut(); - let crypto_mode = conn.crypto_state.kind(); + let payload = rtp.payload_mut(); + let crypto_mode = conn.crypto_state.kind(); - // If passthrough, Opus payload in place already. - // Else encode into buffer with space for AEAD encryption headers. - let payload_len = match mix_len { - MixType::Passthrough(opus_len) => opus_len, - MixType::MixedPcm(_samples) => { - let total_payload_space = payload.len() - crypto_mode.payload_suffix_len(); - self.encoder.encode_float( - &send_buffer[..self.config.mix_mode.sample_count_in_frame()], - &mut payload[TAG_SIZE..total_payload_space], - )? - }, - }; - - let final_payload_size = conn - .crypto_state - .write_packet_nonce(&mut rtp, TAG_SIZE + payload_len); - - // Packet encryption ignored in test modes. - #[cfg(not(test))] - let encrypt = true; - #[cfg(test)] - let encrypt = self.config.override_connection.is_none(); - - if encrypt { - conn.crypto_state.kind().encrypt_in_place( - &mut rtp, - &conn.cipher, - final_payload_size, - )?; - } - - RtpPacket::minimum_packet_size() + final_payload_size + // If passthrough, Opus payload in place already. + // Else encode into buffer with space for AEAD encryption headers. + let payload_len = match mix_len { + MixType::Passthrough(opus_len) => opus_len, + MixType::MixedPcm(_samples) => { + let total_payload_space = payload.len() - crypto_mode.payload_suffix_len(); + self.encoder.encode_float( + &send_buffer[..self.config.mix_mode.sample_count_in_frame()], + &mut payload[TAG_SIZE..total_payload_space], + )? 
+ }, }; + let final_payload_size = conn + .crypto_state + .write_packet_nonce(&mut rtp, TAG_SIZE + payload_len); + + // Packet encryption ignored in test modes. + #[cfg(not(test))] + let encrypt = true; + #[cfg(test)] + let encrypt = self.config.override_connection.is_none(); + + if encrypt { + conn.crypto_state.kind().encrypt_in_place( + &mut rtp, + &conn.cipher, + final_payload_size, + )?; + } + + Ok(RtpPacket::minimum_packet_size() + final_payload_size) + } + + #[inline] + pub(crate) fn send_packet(&self, packet: &[u8]) -> Result<()> { + #[cfg(test)] + let send_status = if let Some(OutputMode::Raw(tx)) = &self.config.override_connection { + // This case has been handled before buffer clearing in `mix_and_build_packet`. + drop(tx.send(self.raw_msg.clone().unwrap().into())); + + Ok(()) + } else { + self._send_packet(packet) + }; + + #[cfg(not(test))] + let send_status = self._send_packet(packet); + + send_status.or_else(Error::disarm_would_block)?; + + Ok(()) + } + + #[inline] + fn _send_packet(&self, packet: &[u8]) -> Result<()> { + let conn = self + .conn_active + .as_ref() + .expect("Shouldn't be mixing packets without access to a cipher + UDP dest."); + #[cfg(test)] if let Some(OutputMode::Rtp(tx)) = &self.config.override_connection { // Test mode: send unencrypted (compressed) packets to local receiver. - drop(tx.send(self.packet[..index].to_vec().into())); + drop(tx.send(packet.to_vec().into())); } else { - conn.udp_tx.send(&self.packet[..index])?; + conn.udp_tx.send(packet)?; } #[cfg(not(test))] { // Normal operation: send encrypted payload to UDP Tx task. - conn.udp_tx.send(&self.packet[..index])?; + conn.udp_tx.send(packet)?; } Ok(()) } #[inline] - fn advance_rtp_counters(&mut self) { - let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( - "FATAL: Too few bytes in self.packet for RTP header.\ - (Blame: VOICE_PACKET_MAX?)", - ); - rtp.set_sequence(rtp.get_sequence() + 1); - rtp.set_timestamp(rtp.get_timestamp() + MONO_FRAME_SIZE as u32); - } - - #[inline] - // Even if we don't send a packet, we *do* need to keep advancing the timestamp - // to make it easier for a receiver to reorder packets and compute jitter measures - // wrt. our clock difference vs. theirs. - fn advance_rtp_timestamp(&mut self) { - let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( - "FATAL: Too few bytes in self.packet for RTP header.\ - (Blame: VOICE_PACKET_MAX?)", - ); - rtp.set_timestamp(rtp.get_timestamp() + MONO_FRAME_SIZE as u32); - } - - #[inline] - fn check_and_send_keepalive(&mut self) -> Result<()> { + pub(crate) fn check_and_send_keepalive(&mut self, now: Option) -> Result<()> { if let Some(conn) = self.conn_active.as_mut() { - if Instant::now() >= self.keepalive_deadline { + let now = now.unwrap_or_else(Instant::now); + if now >= self.keepalive_deadline { conn.udp_tx.send(&self.keepalive_packet)?; self.keepalive_deadline += UDP_KEEPALIVE_GAP; } @@ -836,9 +720,29 @@ impl Mixer { } #[inline] - fn mix_tracks(&mut self) -> MixType { + pub(crate) fn send_gateway_speaking(&self) -> Result<()> { + if let Some(ws) = &self.ws { + ws.send(WsMessage::Speaking(true))?; + } + + Ok(()) + } + + #[inline] + pub(crate) fn send_gateway_not_speaking(&self) { + if let Some(ws) = &self.ws { + // NOTE: this explicit `drop` should prevent a catastrophic thread pileup. + // A full reconnect might cause an inner closed connection. + // It's safer to leave the central task to clean this up and + // pass the mixer a new channel. 
+ drop(ws.send(WsMessage::Speaking(false))); + } + } + + #[inline] + fn mix_tracks(&mut self, packet: &mut [u8]) -> MixType { // Get a slice of bytes to write in data for Opus packet passthrough. - let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( + let mut rtp = MutableRtpPacket::new(packet).expect( "FATAL: Too few bytes in self.packet for RTP header.\ (Blame: VOICE_PACKET_MAX?)", ); @@ -962,17 +866,3 @@ impl Mixer { MixType::MixedPcm(len) } } - -/// The mixing thread is a synchronous context due to its compute-bound nature. -/// -/// We pass in an async handle for the benefit of some Input classes (e.g., restartables) -/// who need to run their restart code elsewhere and return blank data until such time. -#[instrument(skip(interconnect, mix_rx, async_handle))] -pub(crate) fn runner( - interconnect: Interconnect, - mix_rx: Receiver, - async_handle: Handle, - config: Config, -) { - Mixer::new(mix_rx, async_handle, interconnect, config).run(); -} diff --git a/src/driver/tasks/mixer/pool.rs b/src/driver/tasks/mixer/pool.rs index ff32a2e..0052d59 100644 --- a/src/driver/tasks/mixer/pool.rs +++ b/src/driver/tasks/mixer/pool.rs @@ -24,7 +24,7 @@ impl BlockyTaskPool { pub fn new(handle: Handle) -> Self { Self { pool: Arc::new(RwLock::new(rusty_pool::ThreadPool::new( - 1, + 0, 64, Duration::from_secs(300), ))), diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index 584375b..665d0e9 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -23,7 +23,7 @@ use crate::{ }; use flume::{Receiver, Sender}; use message::*; -use tokio::{runtime::Handle, spawn, time::sleep as tsleep}; +use tokio::{spawn, time::sleep as tsleep}; use tracing::{debug, instrument, trace}; pub(crate) fn start(config: Config, rx: Receiver, tx: Sender) { @@ -34,7 +34,7 @@ pub(crate) fn start(config: Config, rx: Receiver, tx: Sender, config: Config) -> Interconnect { +fn start_internals(core: Sender, config: &Config) -> Interconnect { let (evt_tx, evt_rx) = flume::unbounded(); let (mix_tx, mix_rx) = flume::unbounded(); @@ -52,12 +52,7 @@ fn start_internals(core: Sender, config: Config) -> Interconnect { }); let ic = interconnect.clone(); - let handle = Handle::current(); - std::thread::spawn(move || { - trace!("Mixer started."); - mixer::runner(ic, mix_rx, handle, config); - trace!("Mixer finished."); - }); + config.get_scheduler().new_mixer(config, ic, mix_rx); interconnect } @@ -66,7 +61,7 @@ fn start_internals(core: Sender, config: Config) -> Interconnect { async fn runner(mut config: Config, rx: Receiver, tx: Sender) { let mut next_config: Option = None; let mut connection: Option = None; - let mut interconnect = start_internals(tx, config.clone()); + let mut interconnect = start_internals(tx, &config); let mut retrying = None; let mut attempt_idx = 0; diff --git a/src/driver/test_impls.rs b/src/driver/test_impls.rs new file mode 100644 index 0000000..2dd9849 --- /dev/null +++ b/src/driver/test_impls.rs @@ -0,0 +1,225 @@ +#![allow(missing_docs)] + +use crate::{ + constants::*, + input::{ + cached::Compressed, + codecs::{CODEC_REGISTRY, PROBE}, + RawAdapter, + }, + test_utils, + tracks::LoopState, +}; +use flume::{Receiver, Sender}; +use std::{io::Cursor, net::UdpSocket, sync::Arc}; +use tokio::runtime::Handle; +use xsalsa20poly1305::{KeyInit, XSalsa20Poly1305 as Cipher, KEY_SIZE}; + +use super::{ + scheduler::*, + tasks::{message::*, mixer::Mixer}, + *, +}; + +// create a dummied task + interconnect. 
+// measure perf at varying numbers of sources (binary 1--64) without passthrough support. + +#[cfg(feature = "receive")] +pub type Listeners = ( + Receiver, + Receiver, + Receiver, +); + +#[cfg(not(feature = "receive"))] +pub type Listeners = (Receiver, Receiver); + +pub type DummyMixer = (Mixer, Listeners); + +impl Mixer { + pub fn mock(handle: Handle, softclip: bool) -> DummyMixer { + let (mix_tx, mix_rx) = flume::unbounded(); + let (core_tx, core_rx) = flume::unbounded(); + let (event_tx, event_rx) = flume::unbounded(); + + #[cfg(feature = "receive")] + let (udp_receiver_tx, udp_receiver_rx) = flume::unbounded(); + + let ic = Interconnect { + core: core_tx, + events: event_tx, + mixer: mix_tx, + }; + + // Scheduler must be created from a Tokio context... + let (tx, rx) = flume::unbounded(); + handle.spawn_blocking(move || tx.send(crate::Config::default().use_softclip(softclip))); + let config = rx.recv().unwrap(); + + let mut out = Mixer::new(mix_rx, handle, ic, config); + + let udp_tx = UdpSocket::bind("0.0.0.0:0").expect("Failed to create send port."); + udp_tx + .connect("127.0.0.1:5316") + .expect("Failed to connect to local dest port."); + + #[cfg(feature = "receive")] + let fake_conn = MixerConnection { + cipher: Cipher::new_from_slice(&vec![0u8; KEY_SIZE]).unwrap(), + crypto_state: CryptoState::Normal, + udp_rx: udp_receiver_tx, + udp_tx, + }; + + #[cfg(not(feature = "receive"))] + let fake_conn = MixerConnection { + cipher: Cipher::new_from_slice(&vec![0u8; KEY_SIZE]).unwrap(), + crypto_state: CryptoState::Normal, + udp_tx, + }; + + out.conn_active = Some(fake_conn); + + #[cfg(feature = "receive")] + return (out, (core_rx, event_rx, udp_receiver_rx)); + + #[cfg(not(feature = "receive"))] + return (out, (core_rx, event_rx)); + } + + pub fn test_with_float(num_tracks: usize, handle: Handle, softclip: bool) -> DummyMixer { + let mut out = Self::mock(handle, softclip); + + let floats = test_utils::make_sine(10 * STEREO_FRAME_SIZE, true); + + for _ in 0..num_tracks { + let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into(); + let promoted = match input { + Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE), + _ => panic!("Failed to create a guaranteed source."), + }; + let (_, ctx) = Track::from(Input::Live(promoted.unwrap(), None)).into_context(); + _ = out.0.add_track(ctx); + } + + out + } + + pub fn test_with_float_unending(handle: Handle, softclip: bool) -> (DummyMixer, TrackHandle) { + let mut out = Self::mock(handle, softclip); + + let floats = test_utils::make_sine(10 * STEREO_FRAME_SIZE, true); + + let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into(); + let promoted = match input { + Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE), + _ => panic!("Failed to create a guaranteed source."), + }; + let mut track = Track::from(Input::Live(promoted.unwrap(), None)); + track.loops = LoopState::Infinite; + + let (handle, ctx) = track.into_context(); + _ = out.0.add_track(ctx); + + (out, handle) + } + + pub fn test_with_float_drop(num_tracks: usize, handle: Handle) -> DummyMixer { + let mut out = Self::mock(handle, true); + + for i in 0..num_tracks { + let floats = test_utils::make_sine((i / 5) * STEREO_FRAME_SIZE, true); + let input: Input = RawAdapter::new(Cursor::new(floats.clone()), 48_000, 2).into(); + let promoted = match input { + Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE), + _ => panic!("Failed to create a guaranteed source."), + }; + let (_, ctx) = Track::from(Input::Live(promoted.unwrap(), 
None)).into_context(); + _ = out.0.add_track(ctx); + } + + out + } + + pub fn test_with_opus(handle: Handle) -> DummyMixer { + // should add a single opus-based track. + // make this fully loaded to prevent any perf cost there. + let mut out = Self::mock(handle.clone(), false); + + let floats = test_utils::make_sine(6 * STEREO_FRAME_SIZE, true); + + let input: Input = RawAdapter::new(Cursor::new(floats), 48_000, 2).into(); + + let mut src = handle.block_on(async move { + Compressed::new(input, Bitrate::BitsPerSecond(128_000)) + .await + .expect("These parameters are well-defined.") + }); + + src.raw.load_all(); + + let promoted = match src.into() { + Input::Live(l, _) => l.promote(&CODEC_REGISTRY, &PROBE), + _ => panic!("Failed to create a guaranteed source."), + }; + let (_, ctx) = Track::from(Input::Live(promoted.unwrap(), None)).into_context(); + + _ = out.0.add_track(ctx); + + out + } +} + +pub struct MockScheduler { + pub core: Live, + pub stats: Arc, + pub local: Arc, + pub rx: Receiver, + pub tx: Sender<(TaskId, ParkedMixer)>, + pub id: TaskId, +} + +impl MockScheduler { + pub fn new(mode: Option) -> Self { + let stats = Arc::new(StatBlock::default()); + let local = Arc::new(LiveStatBlock::default()); + + let (task_tx, task_rx) = flume::unbounded(); + let (sched_tx, sched_rx) = flume::unbounded(); + + let mut cfg = crate::driver::SchedulerConfig::default(); + cfg.strategy = mode.unwrap_or_default(); + + let core = Live::new( + WorkerId::new(), + cfg, + stats.clone(), + local.clone(), + task_rx, + sched_tx, + ); + Self { + core, + stats, + local, + rx: sched_rx, + tx: task_tx, + id: TaskId::new(), + } + } + + pub fn add_mixer_direct(&mut self, m: Mixer) { + let id = self.id.incr(); + self.core.add_task_direct(m, id); + } + + pub fn from_mixers(mode: Option, mixers: Vec) -> (Self, Vec) { + let mut out = Self::new(mode); + let mut listeners = vec![]; + for (mixer, listener) in mixers { + out.add_mixer_direct(mixer); + listeners.push(listener); + } + (out, listeners) + } +} diff --git a/src/error.rs b/src/error.rs index db4b3e1..8b54ad1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -143,6 +143,9 @@ pub type JoinResult = Result; #[cfg(feature = "driver")] pub use crate::{ - driver::connection::error::{Error as ConnectionError, Result as ConnectionResult}, + driver::{ + connection::error::{Error as ConnectionError, Result as ConnectionResult}, + SchedulerError, + }, tracks::{ControlError, PlayError, TrackResult}, }; diff --git a/src/input/adapters/async_adapter.rs b/src/input/adapters/async_adapter.rs index 6131f12..c6f1199 100644 --- a/src/input/adapters/async_adapter.rs +++ b/src/input/adapters/async_adapter.rs @@ -177,7 +177,7 @@ impl AsyncAdapterStream { }; tokio::spawn(async move { - sink.launch().await; + Box::pin(sink.launch()).await; }); stream diff --git a/src/input/sources/file.rs b/src/input/sources/file.rs index fade69d..2d9fe57 100644 --- a/src/input/sources/file.rs +++ b/src/input/sources/file.rs @@ -1,7 +1,6 @@ -use crate::input::{AudioStream, AudioStreamError, AuxMetadata, Compose, Input}; +use crate::input::{AudioStream, AudioStreamError, Compose, Input}; use std::{error::Error, ffi::OsStr, path::Path}; use symphonia_core::{io::MediaSource, probe::Hint}; -use tokio::process::Command; /// A lazily instantiated local file. #[derive(Clone, Debug)] @@ -56,25 +55,32 @@ impl + Send + Sync> Compose for File

{ true } - /// Probes for metadata about this audio files using `ffprobe`. - async fn aux_metadata(&mut self) -> Result { - let args = [ - "-v", - "quiet", - "-of", - "json", - "-show_format", - "-show_streams", - "-i", - ]; + // SEE: issue #186 + // Below is removed due to issues with: + // 1) deadlocks on small files. + // 2) serde_aux poorly handles missing field names. + // - let mut output = Command::new("ffprobe") - .args(args) - .output() - .await - .map_err(|e| AudioStreamError::Fail(Box::new(e)))?; + // Probes for metadata about this audio file using `ffprobe`. + // async fn aux_metadata(&mut self) -> Result { + // let args = [ + // "-v", + // "quiet", + // "-of", + // "json", + // "-show_format", + // "-show_streams", + // "-i", + // ]; - AuxMetadata::from_ffprobe_json(&mut output.stdout[..]) - .map_err(|e| AudioStreamError::Fail(Box::new(e))) - } + // let mut output = Command::new("ffprobe") + // .args(args) + // .arg(self.path.as_ref().as_os_str()) + // .output() + // .await + // .map_err(|e| AudioStreamError::Fail(Box::new(e)))?; + + // AuxMetadata::from_ffprobe_json(&mut output.stdout[..]) + // .map_err(|e| AudioStreamError::Fail(Box::new(e))) + // } } diff --git a/src/lib.rs b/src/lib.rs index 54324b1..483ffd4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -98,6 +98,8 @@ mod manager; pub mod serenity; #[cfg(feature = "gateway")] pub mod shards; +#[cfg(any(test, feature = "internals"))] +pub mod test_utils; #[cfg(feature = "driver")] pub mod tracks; #[cfg(feature = "driver")] diff --git a/src/shards.rs b/src/shards.rs index d0b6e64..71e3a00 100644 --- a/src/shards.rs +++ b/src/shards.rs @@ -37,16 +37,19 @@ impl TwilightMap { /// Construct a map of shards and command senders to those shards. /// /// For correctness all shards should be in the map. + #[must_use] pub fn new(map: std::collections::HashMap) -> Self { TwilightMap { map } } /// Get the message sender for `shard_id`. + #[must_use] pub fn get(&self, shard_id: u64) -> Option<&MessageSender> { self.map.get(&shard_id) } /// Get the total number of shards in the map. + #[must_use] pub fn shard_count(&self) -> u64 { self.map.len() as u64 } diff --git a/utils/src/lib.rs b/src/test_utils.rs similarity index 98% rename from utils/src/lib.rs rename to src/test_utils.rs index 35bcf34..061612d 100644 --- a/utils/src/lib.rs +++ b/src/test_utils.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + use byteorder::{LittleEndian, WriteBytesExt}; use std::mem; diff --git a/utils/Cargo.toml b/utils/Cargo.toml deleted file mode 100644 index 9bb2698..0000000 --- a/utils/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "utils" -version = "0.1.0" -authors = ["Kyle Simpson "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -byteorder = "1" diff --git a/utils/README.md b/utils/README.md deleted file mode 100644 index fcb910f..0000000 --- a/utils/README.md +++ /dev/null @@ -1 +0,0 @@ -Test utilities for testing and benchmarking songbird.
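As a quick orientation for reviewers, here is a minimal usage sketch of the new public scheduler surface in this patch: building a standalone `Scheduler` and reading the counters exposed through `StatBlock`/`LiveStatBlock`. It is illustrative only and not part of the diff; it assumes the re-exports implied by the patch itself (`Scheduler` and `SchedulerConfig` under `songbird::driver`, matching the `crate::driver::…` paths used in `test_impls.rs`) and that the caller supplies its own Tokio runtime, since the idle pool runs as an async task.

```rust
use songbird::driver::{Scheduler, SchedulerConfig};

fn main() {
    // The scheduler parks idle mixers on an async task, so it must be built
    // from inside a Tokio runtime context.
    let rt = tokio::runtime::Runtime::new().expect("failed to build Tokio runtime");

    rt.block_on(async {
        // One scheduler can be shared by any number of Drivers; a mixer only
        // occupies a worker thread once it holds a live Track.
        let sched = Scheduler::new(SchedulerConfig::default());

        println!("total mixers (idle + live): {}", sched.total_tasks());
        println!("live mixers:                {}", sched.live_tasks());
        println!("worker threads:             {}", sched.worker_threads());

        // Per-worker statistics are fetched over a bounded flume channel.
        if let Ok(workers) = sched.worker_thread_stats().await {
            for (i, stats) in workers.iter().enumerate() {
                println!(
                    "worker {i}: {} mixers, last tick cost {} ns",
                    stats.live_mixers(),
                    stats.last_compute_cost_ns(),
                );
            }
        }
    });
}
```

Drivers presumably opt into a shared pool through their `Config` (the diff routes mixer creation via `config.get_scheduler().new_mixer(...)`); anything left unconfigured falls back to the lazily initialised `DEFAULT_SCHEDULER`.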