diff --git a/Cargo.toml b/Cargo.toml index a91b7a1..f4d6fa1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,10 @@ version = "0.10" [dependencies.futures] version = "0.3" +[dependencies.nix] +version = "0.19" +optional = true + [dependencies.parking_lot] optional = true version = "0.11" @@ -133,6 +137,7 @@ driver = [ "byteorder", "discortp", "flume", + "nix", "parking_lot", "rand", "serenity-voice-model", diff --git a/src/driver/tasks/disposal.rs b/src/driver/tasks/disposal.rs new file mode 100644 index 0000000..b10a56f --- /dev/null +++ b/src/driver/tasks/disposal.rs @@ -0,0 +1,18 @@ +use super::message::*; +use flume::Receiver; +use tracing::instrument; + +/// The mixer's disposal thread is also synchronous, due to tracks, +/// inputs, etc. being based on synchronous I/O. +/// +/// The mixer uses this to offload heavy and expensive drop operations +/// to prevent deadline misses. +#[instrument(skip(mix_rx))] +pub(crate) fn runner(mix_rx: Receiver) { + loop { + match mix_rx.recv() { + Err(_) | Ok(DisposalMessage::Poison) => break, + _ => {}, + } + } +} diff --git a/src/driver/tasks/message/disposal.rs b/src/driver/tasks/message/disposal.rs new file mode 100644 index 0000000..84df7e6 --- /dev/null +++ b/src/driver/tasks/message/disposal.rs @@ -0,0 +1,9 @@ +#![allow(missing_docs)] + +use crate::tracks::Track; + +pub enum DisposalMessage { + Track(Track), + + Poison, +} diff --git a/src/driver/tasks/message/mod.rs b/src/driver/tasks/message/mod.rs index d697e7f..6c20f90 100644 --- a/src/driver/tasks/message/mod.rs +++ b/src/driver/tasks/message/mod.rs @@ -1,13 +1,14 @@ #![allow(missing_docs)] mod core; +mod disposal; mod events; mod mixer; mod udp_rx; mod udp_tx; mod ws; -pub use self::{core::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*}; +pub use self::{core::*, disposal::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*}; use flume::Sender; use tracing::info; diff --git a/src/driver/tasks/mixer.rs b/src/driver/tasks/mixer.rs index cf85707..e6af8d6 100644 --- a/src/driver/tasks/mixer.rs +++ b/src/driver/tasks/mixer.rs @@ -1,4 +1,4 @@ -use super::{error::Result, message::*, Config}; +use super::{disposal, error::Result, message::*, Config}; use crate::{ constants::*, tracks::{PlayMode, Track}, @@ -28,6 +28,7 @@ pub struct Mixer { pub config: Config, pub conn_active: Option, pub deadline: Instant, + pub disposer: Sender, pub encoder: OpusEncoder, pub interconnect: Interconnect, pub mix_rx: Receiver, @@ -74,12 +75,17 @@ impl Mixer { let tracks = Vec::with_capacity(1.max(config.preallocated_tracks)); + // Create an object disposal thread here. + let (disposer, disposal_rx) = flume::unbounded(); + std::thread::spawn(move || disposal::runner(disposal_rx)); + Self { async_handle, bitrate, config, conn_active: None, deadline: Instant::now(), + disposer, encoder, interconnect, mix_rx, @@ -322,12 +328,13 @@ impl Mixer { if track.playing.is_done() { let p_state = track.playing(); - self.tracks.swap_remove(i); + let to_drop = self.tracks.swap_remove(i); to_remove.push(i); self.fire_event(EventMessage::ChangeState( i, TrackStateChange::Mode(p_state), ))?; + let _ = self.disposer.send(DisposalMessage::Track(to_drop)); } else { i += 1; } @@ -580,4 +587,6 @@ pub(crate) fn runner( let mut mixer = Mixer::new(mix_rx, async_handle, interconnect, config); mixer.run(); + + let _ = mixer.disposer.send(DisposalMessage::Poison); } diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index f6e3411..f303231 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -1,5 +1,6 @@ #![allow(missing_docs)] +pub(crate) mod disposal; pub mod error; mod events; pub mod message; diff --git a/src/input/child.rs b/src/input/child.rs index 47d57f9..6cede9e 100644 --- a/src/input/child.rs +++ b/src/input/child.rs @@ -5,6 +5,12 @@ use std::{ }; use tracing::debug; +#[cfg(unix)] +use nix::{ + sys::signal::{self, Signal}, + unistd::Pid, +}; + /// Handle for a child process which ensures that any subprocesses are properly closed /// on drop. #[derive(Debug)] @@ -31,7 +37,18 @@ impl Read for ChildContainer { impl Drop for ChildContainer { fn drop(&mut self) { - if let Err(e) = self.0.kill() { + #[cfg(not(unix))] + let attempt = self.0.kill(); + + #[cfg(unix)] + let attempt = { + let pid = Pid::from_raw(self.0.id() as i32); + let _ = signal::kill(pid, Signal::SIGINT); + + self.0.wait() + }; + + if let Err(e) = attempt { debug!("Error awaiting child process: {:?}", e); } }