Fix: hand off process killing to blocking thread, await all children.

This should make dropping `ChildContainer`s and their parent `Input`s safer in async contexts.

It seems like SIGINT is insufficient to make wait terminate, but SIGKILL suffices. This introduced a new problem, namely that we have to remember and wait on *every* pid we create. This should, hopefully, put the issue of zombie processes to bed for good.
This commit is contained in:
Kyle Simpson
2021-02-01 13:43:26 +00:00
parent 7d4891d32c
commit b2453091e7
4 changed files with 54 additions and 33 deletions

View File

@@ -52,10 +52,6 @@ version = "0.10"
[dependencies.futures] [dependencies.futures]
version = "0.3" version = "0.3"
[dependencies.nix]
version = "0.19"
optional = true
[dependencies.parking_lot] [dependencies.parking_lot]
optional = true optional = true
version = "0.11" version = "0.11"
@@ -137,7 +133,6 @@ driver = [
"byteorder", "byteorder",
"discortp", "discortp",
"flume", "flume",
"nix",
"parking_lot", "parking_lot",
"rand", "rand",
"serenity-voice-model", "serenity-voice-model",

View File

@@ -1,55 +1,79 @@
use super::*; use super::*;
use std::{ use std::{
io::{BufReader, Read}, io::{BufReader, Read},
mem,
process::Child, process::Child,
}; };
use tokio::runtime::Handle;
use tracing::debug; use tracing::debug;
#[cfg(unix)]
use nix::{
sys::signal::{self, Signal},
unistd::Pid,
};
/// Handle for a child process which ensures that any subprocesses are properly closed /// Handle for a child process which ensures that any subprocesses are properly closed
/// on drop. /// on drop.
///
/// # Warning
/// To allow proper cleanup of child processes, if you create a process chain you must
/// make sure to use `From<Vec<Child>>`. Here, the *last* process in the `Vec` will be
/// used as the audio byte source.
#[derive(Debug)] #[derive(Debug)]
pub struct ChildContainer(Child); pub struct ChildContainer(Vec<Child>);
pub(crate) fn child_to_reader<T>(child: Child) -> Reader { pub(crate) fn children_to_reader<T>(children: Vec<Child>) -> Reader {
Reader::Pipe(BufReader::with_capacity( Reader::Pipe(BufReader::with_capacity(
STEREO_FRAME_SIZE * mem::size_of::<T>() * CHILD_BUFFER_LEN, STEREO_FRAME_SIZE * mem::size_of::<T>() * CHILD_BUFFER_LEN,
ChildContainer(child), ChildContainer(children),
)) ))
} }
impl From<Child> for Reader { impl From<Child> for Reader {
fn from(container: Child) -> Self { fn from(container: Child) -> Self {
child_to_reader::<f32>(container) children_to_reader::<f32>(vec![container])
}
}
impl From<Vec<Child>> for Reader {
fn from(container: Vec<Child>) -> Self {
children_to_reader::<f32>(container)
} }
} }
impl Read for ChildContainer { impl Read for ChildContainer {
fn read(&mut self, buffer: &mut [u8]) -> IoResult<usize> { fn read(&mut self, buffer: &mut [u8]) -> IoResult<usize> {
self.0.stdout.as_mut().unwrap().read(buffer) match self.0.last_mut() {
Some(ref mut child) => child.stdout.as_mut().unwrap().read(buffer),
None => Ok(0),
}
} }
} }
impl Drop for ChildContainer { impl Drop for ChildContainer {
fn drop(&mut self) { fn drop(&mut self) {
#[cfg(not(unix))] let children = mem::take(&mut self.0);
let attempt = self.0.kill();
#[cfg(unix)] if let Ok(handle) = Handle::try_current() {
let attempt = { handle.spawn_blocking(move || {
let pid = Pid::from_raw(self.0.id() as i32); cleanup_child_processes(children);
let _ = signal::kill(pid, Signal::SIGINT); });
} else {
cleanup_child_processes(children);
}
}
}
self.0.wait() fn cleanup_child_processes(mut children: Vec<Child>) {
let attempt = if let Some(child) = children.last_mut() {
child.kill()
} else {
return;
}; };
let attempt = attempt.and_then(|_| {
children
.iter_mut()
.rev()
.try_for_each(|child| child.wait().map(|_| ()))
});
if let Err(e) = attempt { if let Err(e) = attempt {
debug!("Error awaiting child process: {:?}", e); debug!("Error awaiting child process: {:?}", e);
} }
} }
}

View File

@@ -1,5 +1,5 @@
use super::{ use super::{
child_to_reader, children_to_reader,
error::{Error, Result}, error::{Error, Result},
Codec, Codec,
Container, Container,
@@ -115,7 +115,7 @@ pub(crate) async fn _ffmpeg_optioned(
Ok(Input::new( Ok(Input::new(
is_stereo, is_stereo,
child_to_reader::<f32>(command), children_to_reader::<f32>(vec![command]),
Codec::FloatPcm, Codec::FloatPcm,
Container::Raw, Container::Raw,
Some(metadata), Some(metadata),

View File

@@ -1,5 +1,5 @@
use super::{ use super::{
child_to_reader, children_to_reader,
error::{Error, Result}, error::{Error, Result},
Codec, Codec,
Container, Container,
@@ -92,12 +92,14 @@ pub(crate) async fn _ytdl(uri: &str, pre_args: &[&str]) -> Result<Input> {
youtube_dl.stderr = Some(returned_stderr); youtube_dl.stderr = Some(returned_stderr);
let taken_stdout = youtube_dl.stdout.take().ok_or(Error::Stdout)?;
let ffmpeg = Command::new("ffmpeg") let ffmpeg = Command::new("ffmpeg")
.args(pre_args) .args(pre_args)
.arg("-i") .arg("-i")
.arg("-") .arg("-")
.args(&ffmpeg_args) .args(&ffmpeg_args)
.stdin(youtube_dl.stdout.ok_or(Error::Stdout)?) .stdin(taken_stdout)
.stderr(Stdio::null()) .stderr(Stdio::null())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.spawn()?; .spawn()?;
@@ -108,7 +110,7 @@ pub(crate) async fn _ytdl(uri: &str, pre_args: &[&str]) -> Result<Input> {
Ok(Input::new( Ok(Input::new(
true, true,
child_to_reader::<f32>(ffmpeg), children_to_reader::<f32>(vec![youtube_dl, ffmpeg]),
Codec::FloatPcm, Codec::FloatPcm,
Container::Raw, Container::Raw,
Some(metadata), Some(metadata),