diff --git a/src/config.rs b/src/config.rs index 8970562..bc67643 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,7 +2,7 @@ use crate::driver::DecodeMode; #[cfg(feature = "driver")] use crate::{ - driver::{retry::Retry, CryptoMode, MixMode}, + driver::{retry::Retry, tasks::disposal::DisposalThread, CryptoMode, MixMode}, input::codecs::*, }; @@ -143,6 +143,16 @@ pub struct Config { /// /// [`PROBE`]: static@PROBE pub format_registry: &'static Probe, + #[cfg(feature = "driver")] + /// The Sender for a channel that will run the destructor of possibly blocking values. + /// + /// If not set, a thread will be spawned to perform this, but it is recommended to create + /// a long running thread instead of relying on a per-driver thread. + /// + /// Note: When using [`Songbird`] this is overwritten automatically by its disposal thread. + /// + /// [`Songbird`]: crate::Songbird + pub disposer: Option, // Test only attributes #[cfg(feature = "driver")] @@ -181,6 +191,8 @@ impl Default for Config { #[cfg(feature = "driver")] format_registry: &PROBE, #[cfg(feature = "driver")] + disposer: None, + #[cfg(feature = "driver")] #[cfg(test)] tick_style: TickStyle::Timed, #[cfg(feature = "driver")] @@ -264,6 +276,23 @@ impl Config { self } + /// Sets this `Config`'s channel for sending disposal messages. + #[must_use] + pub fn disposer(mut self, disposer: DisposalThread) -> Self { + self.disposer = Some(disposer); + self + } + + /// Ensures a global disposer has been set, initializing one if not. + #[must_use] + pub(crate) fn initialise_disposer(self) -> Self { + if self.disposer.is_some() { + self + } else { + self.disposer(DisposalThread::run()) + } + } + /// This is used to prevent changes which would invalidate the current session. pub(crate) fn make_safe(&mut self, previous: &Config, connected: bool) { if connected { @@ -272,6 +301,13 @@ impl Config { } } +#[cfg(not(feature = "driver"))] +impl Config { + pub(crate) fn initialise_disposer(self) -> Self { + self + } +} + // Test only attributes #[cfg(all(test, feature = "driver"))] impl Config { diff --git a/src/driver/tasks/disposal.rs b/src/driver/tasks/disposal.rs index 0b13014..1e88026 100644 --- a/src/driver/tasks/disposal.rs +++ b/src/driver/tasks/disposal.rs @@ -1,6 +1,32 @@ use super::message::*; -use flume::Receiver; -use tracing::instrument; +use flume::{Receiver, Sender}; +use tracing::{instrument, trace}; + +#[derive(Debug, Clone)] +pub struct DisposalThread(Sender); + +impl Default for DisposalThread { + fn default() -> Self { + Self::run() + } +} + +impl DisposalThread { + pub fn run() -> Self { + let (mix_tx, mix_rx) = flume::unbounded(); + std::thread::spawn(move || { + trace!("Disposal thread started."); + runner(mix_rx); + trace!("Disposal thread finished."); + }); + + Self(mix_tx) + } + + pub(super) fn dispose(&self, message: DisposalMessage) { + drop(self.0.send(message)) + } +} /// The mixer's disposal thread is also synchronous, due to tracks, /// inputs, etc. being based on synchronous I/O. @@ -8,6 +34,6 @@ use tracing::instrument; /// The mixer uses this to offload heavy and expensive drop operations /// to prevent deadline misses. #[instrument(skip(mix_rx))] -pub(crate) fn runner(mix_rx: Receiver) { +fn runner(mix_rx: Receiver) { while mix_rx.recv().is_ok() {} } diff --git a/src/driver/tasks/mixer/mod.rs b/src/driver/tasks/mixer/mod.rs index 648bfd6..04a8d88 100644 --- a/src/driver/tasks/mixer/mod.rs +++ b/src/driver/tasks/mixer/mod.rs @@ -11,7 +11,7 @@ use state::*; pub use track::*; use super::{ - disposal, + disposal::DisposalThread, error::{Error, Result}, message::*, }; @@ -65,7 +65,7 @@ pub struct Mixer { pub conn_active: Option, pub content_prep_sequence: u64, pub deadline: Instant, - pub disposer: Sender, + pub disposer: DisposalThread, pub encoder: OpusEncoder, pub interconnect: Interconnect, pub mix_rx: Receiver, @@ -126,14 +126,11 @@ impl Mixer { let tracks = Vec::with_capacity(1.max(config.preallocated_tracks)); let track_handles = Vec::with_capacity(1.max(config.preallocated_tracks)); - // Create an object disposal thread here. - let (disposer, disposal_rx) = flume::unbounded(); - std::thread::spawn(move || disposal::runner(disposal_rx)); - let thread_pool = BlockyTaskPool::new(async_handle); let symph_layout = config.mix_mode.symph_layout(); + let disposer = config.disposer.clone().unwrap_or_default(); let config = config.into(); let sample_buffer = SampleBuffer::::new( @@ -538,12 +535,11 @@ impl Mixer { if track.playing.is_done() { let p_state = track.playing.clone(); let to_drop = self.tracks.swap_remove(i); - drop( - self.disposer - .send(DisposalMessage::Track(Box::new(to_drop))), - ); + self.disposer + .dispose(DisposalMessage::Track(Box::new(to_drop))); + let to_drop = self.track_handles.swap_remove(i); - drop(self.disposer.send(DisposalMessage::Handle(to_drop))); + self.disposer.dispose(DisposalMessage::Handle(to_drop)); self.fire_event(EventMessage::ChangeState( i, diff --git a/src/manager.rs b/src/manager.rs index f2d6313..34782af 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -49,7 +49,7 @@ pub struct Songbird { client_data: OnceCell, calls: DashMap>>, sharder: Sharder, - config: PRwLock>, + config: PRwLock, } impl Songbird { @@ -76,7 +76,7 @@ impl Songbird { client_data: OnceCell::new(), calls: DashMap::new(), sharder: Sharder::Serenity(SerenitySharder::default()), - config: Some(config).into(), + config: config.initialise_disposer().into(), }) } @@ -114,7 +114,7 @@ impl Songbird { }), calls: DashMap::new(), sharder: Sharder::TwilightCluster(cluster), - config: Some(config).into(), + config: config.initialise_disposer().into(), } } @@ -176,7 +176,7 @@ impl Songbird { guild_id, shard_handle, info.user_id, - self.config.read().clone().unwrap_or_default(), + self.config.read().clone(), ); Arc::new(Mutex::new(call)) @@ -193,7 +193,7 @@ impl Songbird { /// Requires the `"driver"` feature. pub fn set_config(&self, new_config: Config) { let mut config = self.config.write(); - *config = Some(new_config); + *config = new_config; } #[cfg(feature = "driver")]