Prevent mixer thread from waking while inactive (#46)

This change prevents mixer threads from waking every 20ms without an active voice connection. This was leading to unacceptably high CPU usage in cases where users needed to preserve this state between many active connections. Additionally, this modifies the documentation of `Songbird::leave` to emphasise why users would prefer to `remove` their calls.

This was tested by examining the CPU usage in task manager before and after the change was made, using a control of 10k manually created `Driver` instances. After creation is finished, the Drivers no longer saturate a 6-core laptop Intel i7 (while they very much did so before).

Closes #42.
This commit is contained in:
Kyle Simpson
2021-03-14 17:18:24 +00:00
committed by GitHub
parent c488ce3dc9
commit a9b4cb7715
2 changed files with 160 additions and 117 deletions

View File

@@ -109,110 +109,22 @@ impl Mixer {
let mut conn_failure = false; let mut conn_failure = false;
'runner: loop { 'runner: loop {
if self.conn_active.is_some() {
loop { loop {
use MixerMessage::*; use MixerMessage::*;
let error = match self.mix_rx.try_recv() { match self.mix_rx.try_recv() {
Ok(AddTrack(mut t)) => { Ok(m) => {
t.source.prep_with_handle(self.async_handle.clone()); let (events, conn, should_exit) = self.handle_message(m);
self.add_track(t) events_failure |= events;
}, conn_failure |= conn;
Ok(SetTrack(t)) => {
self.tracks.clear();
let mut out = self.fire_event(EventMessage::RemoveAllTracks); if should_exit {
break 'runner;
if let Some(mut t) = t {
t.source.prep_with_handle(self.async_handle.clone());
// Do this unconditionally: this affects local state infallibly,
// with the event installation being the remote part.
if let Err(e) = self.add_track(t) {
out = Err(e);
} }
}
out
},
Ok(SetBitrate(b)) => {
self.bitrate = b;
if let Err(e) = self.set_bitrate(b) {
error!("Failed to update bitrate {:?}", e);
}
Ok(())
},
Ok(SetMute(m)) => {
self.muted = m;
Ok(())
},
Ok(SetConn(conn, ssrc)) => {
self.conn_active = Some(conn);
let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect(
"Too few bytes in self.packet for RTP header.\
(Blame: VOICE_PACKET_MAX?)",
);
rtp.set_ssrc(ssrc);
rtp.set_sequence(random::<u16>().into());
rtp.set_timestamp(random::<u32>().into());
self.deadline = Instant::now();
Ok(())
},
Ok(DropConn) => {
self.conn_active = None;
Ok(())
},
Ok(ReplaceInterconnect(i)) => {
self.prevent_events = false;
if let Some(ws) = &self.ws {
conn_failure |=
ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err();
}
if let Some(conn) = &self.conn_active {
conn_failure |= conn
.udp_rx
.send(UdpRxMessage::ReplaceInterconnect(i.clone()))
.is_err();
}
self.interconnect = i;
self.rebuild_tracks()
},
Ok(SetConfig(new_config)) => {
self.config = new_config.clone();
if self.tracks.capacity() < self.config.preallocated_tracks {
self.tracks
.reserve(self.config.preallocated_tracks - self.tracks.len());
}
if let Some(conn) = &self.conn_active {
conn_failure |= conn
.udp_rx
.send(UdpRxMessage::SetConfig(new_config))
.is_err();
}
Ok(())
},
Ok(RebuildEncoder) => match new_encoder(self.bitrate) {
Ok(encoder) => {
self.encoder = encoder;
Ok(())
},
Err(e) => {
error!("Failed to rebuild encoder. Resetting bitrate. {:?}", e);
self.bitrate = DEFAULT_BITRATE;
self.encoder = new_encoder(self.bitrate)
.expect("Failed fallback rebuild of OpusEncoder with safe inputs.");
Ok(())
},
},
Ok(Ws(new_ws_handle)) => {
self.ws = new_ws_handle;
Ok(())
}, },
Err(TryRecvError::Disconnected) | Ok(Poison) => { Err(TryRecvError::Disconnected) => {
break 'runner; break 'runner;
}, },
@@ -220,11 +132,6 @@ impl Mixer {
break; break;
}, },
}; };
if let Err(e) = error {
events_failure |= e.should_trigger_interconnect_rebuild();
conn_failure |= e.should_trigger_connect();
}
} }
if let Err(e) = self.cycle().and_then(|_| self.audio_commands_events()) { if let Err(e) = self.cycle().and_then(|_| self.audio_commands_events()) {
@@ -233,6 +140,22 @@ impl Mixer {
error!("Mixer thread cycle: {:?}", e); error!("Mixer thread cycle: {:?}", e);
} }
} else {
match self.mix_rx.recv() {
Ok(m) => {
let (events, conn, should_exit) = self.handle_message(m);
events_failure |= events;
conn_failure |= conn;
if should_exit {
break 'runner;
}
},
Err(_) => {
break 'runner;
},
}
}
// event failure? rebuild interconnect. // event failure? rebuild interconnect.
// ws or udp failure? full connect // ws or udp failure? full connect
@@ -266,6 +189,126 @@ impl Mixer {
} }
} }
#[inline]
fn handle_message(&mut self, msg: MixerMessage) -> (bool, bool, bool) {
let mut events_failure = false;
let mut conn_failure = false;
let mut should_exit = false;
use MixerMessage::*;
let error = match msg {
AddTrack(mut t) => {
t.source.prep_with_handle(self.async_handle.clone());
self.add_track(t)
},
SetTrack(t) => {
self.tracks.clear();
let mut out = self.fire_event(EventMessage::RemoveAllTracks);
if let Some(mut t) = t {
t.source.prep_with_handle(self.async_handle.clone());
// Do this unconditionally: this affects local state infallibly,
// with the event installation being the remote part.
if let Err(e) = self.add_track(t) {
out = Err(e);
}
}
out
},
SetBitrate(b) => {
self.bitrate = b;
if let Err(e) = self.set_bitrate(b) {
error!("Failed to update bitrate {:?}", e);
}
Ok(())
},
SetMute(m) => {
self.muted = m;
Ok(())
},
SetConn(conn, ssrc) => {
self.conn_active = Some(conn);
let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect(
"Too few bytes in self.packet for RTP header.\
(Blame: VOICE_PACKET_MAX?)",
);
rtp.set_ssrc(ssrc);
rtp.set_sequence(random::<u16>().into());
rtp.set_timestamp(random::<u32>().into());
self.deadline = Instant::now();
Ok(())
},
DropConn => {
self.conn_active = None;
Ok(())
},
ReplaceInterconnect(i) => {
self.prevent_events = false;
if let Some(ws) = &self.ws {
conn_failure |= ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err();
}
if let Some(conn) = &self.conn_active {
conn_failure |= conn
.udp_rx
.send(UdpRxMessage::ReplaceInterconnect(i.clone()))
.is_err();
}
self.interconnect = i;
self.rebuild_tracks()
},
SetConfig(new_config) => {
self.config = new_config.clone();
if self.tracks.capacity() < self.config.preallocated_tracks {
self.tracks
.reserve(self.config.preallocated_tracks - self.tracks.len());
}
if let Some(conn) = &self.conn_active {
conn_failure |= conn
.udp_rx
.send(UdpRxMessage::SetConfig(new_config))
.is_err();
}
Ok(())
},
RebuildEncoder => match new_encoder(self.bitrate) {
Ok(encoder) => {
self.encoder = encoder;
Ok(())
},
Err(e) => {
error!("Failed to rebuild encoder. Resetting bitrate. {:?}", e);
self.bitrate = DEFAULT_BITRATE;
self.encoder = new_encoder(self.bitrate)
.expect("Failed fallback rebuild of OpusEncoder with safe inputs.");
Ok(())
},
},
Ws(new_ws_handle) => {
self.ws = new_ws_handle;
Ok(())
},
Poison => {
should_exit = true;
Ok(())
},
};
if let Err(e) = error {
events_failure |= e.should_trigger_interconnect_rebuild();
conn_failure |= e.should_trigger_connect();
}
(events_failure, conn_failure, should_exit)
}
#[inline] #[inline]
fn fire_event(&self, event: EventMessage) -> Result<()> { fn fire_event(&self, event: EventMessage) -> Result<()> {
// As this task is responsible for noticing the potential death of an event context, // As this task is responsible for noticing the potential death of an event context,
@@ -360,17 +403,13 @@ impl Mixer {
return; return;
} }
// FIXME: make choice of spin-sleep/imprecise sleep optional in next breaking.
self.sleeper self.sleeper
.sleep(self.deadline.saturating_duration_since(Instant::now())); .sleep(self.deadline.saturating_duration_since(Instant::now()));
self.deadline += TIMESTEP_LENGTH; self.deadline += TIMESTEP_LENGTH;
} }
pub fn cycle(&mut self) -> Result<()> { pub fn cycle(&mut self) -> Result<()> {
if self.conn_active.is_none() {
self.march_deadline();
return Ok(());
}
let mut mix_buffer = [0f32; STEREO_FRAME_SIZE]; let mut mix_buffer = [0f32; STEREO_FRAME_SIZE];
// Walk over all the audio files, combining into one audio frame according // Walk over all the audio files, combining into one audio frame according

View File

@@ -267,6 +267,9 @@ impl Songbird {
/// associated voice channel, if connected. /// associated voice channel, if connected.
/// ///
/// This will _not_ drop the handler, and will preserve it and its settings. /// This will _not_ drop the handler, and will preserve it and its settings.
/// If you do not need to reuse event handlers, configuration, or active tracks
/// in the underlying driver *consider calling [`remove`]* to release tasks,
/// threads, and memory.
/// ///
/// This is a wrapper around [getting][`get`] a handler and calling /// This is a wrapper around [getting][`get`] a handler and calling
/// [`leave`] on it. /// [`leave`] on it.
@@ -274,6 +277,7 @@ impl Songbird {
/// [`Call`]: Call /// [`Call`]: Call
/// [`get`]: Songbird::get /// [`get`]: Songbird::get
/// [`leave`]: Call::leave /// [`leave`]: Call::leave
/// [`remove`]: Songbird::remove
#[inline] #[inline]
pub async fn leave<G: Into<GuildId>>(&self, guild_id: G) -> JoinResult<()> { pub async fn leave<G: Into<GuildId>>(&self, guild_id: G) -> JoinResult<()> {
self._leave(guild_id.into()).await self._leave(guild_id.into()).await