From a9b4cb7715f104dbc7aedb9859d6553914f32879 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Sun, 14 Mar 2021 17:18:24 +0000 Subject: [PATCH] Prevent mixer thread from waking while inactive (#46) This change prevents mixer threads from waking every 20ms without an active voice connection. This was leading to unacceptably high CPU usage in cases where users needed to preserve this state between many active connections. Additionally, this modifies the documentation of `Songbird::leave` to emphasise why users would prefer to `remove` their calls. This was tested by examining the CPU usage in task manager before and after the change was made, using a control of 10k manually created `Driver` instances. After creation is finished, the Drivers no longer saturate a 6-core laptop Intel i7 (while they very much did so before). Closes #42. --- src/driver/tasks/mixer.rs | 273 ++++++++++++++++++++++---------------- src/manager.rs | 4 + 2 files changed, 160 insertions(+), 117 deletions(-) diff --git a/src/driver/tasks/mixer.rs b/src/driver/tasks/mixer.rs index 77e11fb..22f6275 100644 --- a/src/driver/tasks/mixer.rs +++ b/src/driver/tasks/mixer.rs @@ -109,129 +109,52 @@ impl Mixer { let mut conn_failure = false; 'runner: loop { - loop { - use MixerMessage::*; + if self.conn_active.is_some() { + loop { + use MixerMessage::*; - let error = match self.mix_rx.try_recv() { - Ok(AddTrack(mut t)) => { - t.source.prep_with_handle(self.async_handle.clone()); - self.add_track(t) - }, - Ok(SetTrack(t)) => { - self.tracks.clear(); + match self.mix_rx.try_recv() { + Ok(m) => { + let (events, conn, should_exit) = self.handle_message(m); + events_failure |= events; + conn_failure |= conn; - let mut out = self.fire_event(EventMessage::RemoveAllTracks); - - if let Some(mut t) = t { - t.source.prep_with_handle(self.async_handle.clone()); - - // Do this unconditionally: this affects local state infallibly, - // with the event installation being the remote part. - if let Err(e) = self.add_track(t) { - out = Err(e); + if should_exit { + break 'runner; } - } - - out - }, - Ok(SetBitrate(b)) => { - self.bitrate = b; - if let Err(e) = self.set_bitrate(b) { - error!("Failed to update bitrate {:?}", e); - } - Ok(()) - }, - Ok(SetMute(m)) => { - self.muted = m; - Ok(()) - }, - Ok(SetConn(conn, ssrc)) => { - self.conn_active = Some(conn); - let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( - "Too few bytes in self.packet for RTP header.\ - (Blame: VOICE_PACKET_MAX?)", - ); - rtp.set_ssrc(ssrc); - rtp.set_sequence(random::().into()); - rtp.set_timestamp(random::().into()); - self.deadline = Instant::now(); - Ok(()) - }, - Ok(DropConn) => { - self.conn_active = None; - Ok(()) - }, - Ok(ReplaceInterconnect(i)) => { - self.prevent_events = false; - if let Some(ws) = &self.ws { - conn_failure |= - ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err(); - } - if let Some(conn) = &self.conn_active { - conn_failure |= conn - .udp_rx - .send(UdpRxMessage::ReplaceInterconnect(i.clone())) - .is_err(); - } - self.interconnect = i; - - self.rebuild_tracks() - }, - Ok(SetConfig(new_config)) => { - self.config = new_config.clone(); - - if self.tracks.capacity() < self.config.preallocated_tracks { - self.tracks - .reserve(self.config.preallocated_tracks - self.tracks.len()); - } - - if let Some(conn) = &self.conn_active { - conn_failure |= conn - .udp_rx - .send(UdpRxMessage::SetConfig(new_config)) - .is_err(); - } - - Ok(()) - }, - Ok(RebuildEncoder) => match new_encoder(self.bitrate) { - Ok(encoder) => { - self.encoder = encoder; - Ok(()) }, - Err(e) => { - error!("Failed to rebuild encoder. Resetting bitrate. {:?}", e); - self.bitrate = DEFAULT_BITRATE; - self.encoder = new_encoder(self.bitrate) - .expect("Failed fallback rebuild of OpusEncoder with safe inputs."); - Ok(()) + + Err(TryRecvError::Disconnected) => { + break 'runner; }, - }, - Ok(Ws(new_ws_handle)) => { - self.ws = new_ws_handle; - Ok(()) - }, - Err(TryRecvError::Disconnected) | Ok(Poison) => { - break 'runner; - }, + Err(TryRecvError::Empty) => { + break; + }, + }; + } - Err(TryRecvError::Empty) => { - break; - }, - }; - - if let Err(e) = error { + if let Err(e) = self.cycle().and_then(|_| self.audio_commands_events()) { events_failure |= e.should_trigger_interconnect_rebuild(); conn_failure |= e.should_trigger_connect(); + + error!("Mixer thread cycle: {:?}", e); } - } + } else { + match self.mix_rx.recv() { + Ok(m) => { + let (events, conn, should_exit) = self.handle_message(m); + events_failure |= events; + conn_failure |= conn; - if let Err(e) = self.cycle().and_then(|_| self.audio_commands_events()) { - events_failure |= e.should_trigger_interconnect_rebuild(); - conn_failure |= e.should_trigger_connect(); - - error!("Mixer thread cycle: {:?}", e); + if should_exit { + break 'runner; + } + }, + Err(_) => { + break 'runner; + }, + } } // event failure? rebuild interconnect. @@ -266,6 +189,126 @@ impl Mixer { } } + #[inline] + fn handle_message(&mut self, msg: MixerMessage) -> (bool, bool, bool) { + let mut events_failure = false; + let mut conn_failure = false; + let mut should_exit = false; + + use MixerMessage::*; + + let error = match msg { + AddTrack(mut t) => { + t.source.prep_with_handle(self.async_handle.clone()); + self.add_track(t) + }, + SetTrack(t) => { + self.tracks.clear(); + + let mut out = self.fire_event(EventMessage::RemoveAllTracks); + + if let Some(mut t) = t { + t.source.prep_with_handle(self.async_handle.clone()); + + // Do this unconditionally: this affects local state infallibly, + // with the event installation being the remote part. + if let Err(e) = self.add_track(t) { + out = Err(e); + } + } + + out + }, + SetBitrate(b) => { + self.bitrate = b; + if let Err(e) = self.set_bitrate(b) { + error!("Failed to update bitrate {:?}", e); + } + Ok(()) + }, + SetMute(m) => { + self.muted = m; + Ok(()) + }, + SetConn(conn, ssrc) => { + self.conn_active = Some(conn); + let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( + "Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + rtp.set_ssrc(ssrc); + rtp.set_sequence(random::().into()); + rtp.set_timestamp(random::().into()); + self.deadline = Instant::now(); + Ok(()) + }, + DropConn => { + self.conn_active = None; + Ok(()) + }, + ReplaceInterconnect(i) => { + self.prevent_events = false; + if let Some(ws) = &self.ws { + conn_failure |= ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err(); + } + if let Some(conn) = &self.conn_active { + conn_failure |= conn + .udp_rx + .send(UdpRxMessage::ReplaceInterconnect(i.clone())) + .is_err(); + } + self.interconnect = i; + + self.rebuild_tracks() + }, + SetConfig(new_config) => { + self.config = new_config.clone(); + + if self.tracks.capacity() < self.config.preallocated_tracks { + self.tracks + .reserve(self.config.preallocated_tracks - self.tracks.len()); + } + + if let Some(conn) = &self.conn_active { + conn_failure |= conn + .udp_rx + .send(UdpRxMessage::SetConfig(new_config)) + .is_err(); + } + + Ok(()) + }, + RebuildEncoder => match new_encoder(self.bitrate) { + Ok(encoder) => { + self.encoder = encoder; + Ok(()) + }, + Err(e) => { + error!("Failed to rebuild encoder. Resetting bitrate. {:?}", e); + self.bitrate = DEFAULT_BITRATE; + self.encoder = new_encoder(self.bitrate) + .expect("Failed fallback rebuild of OpusEncoder with safe inputs."); + Ok(()) + }, + }, + Ws(new_ws_handle) => { + self.ws = new_ws_handle; + Ok(()) + }, + Poison => { + should_exit = true; + Ok(()) + }, + }; + + if let Err(e) = error { + events_failure |= e.should_trigger_interconnect_rebuild(); + conn_failure |= e.should_trigger_connect(); + } + + (events_failure, conn_failure, should_exit) + } + #[inline] fn fire_event(&self, event: EventMessage) -> Result<()> { // As this task is responsible for noticing the potential death of an event context, @@ -360,17 +403,13 @@ impl Mixer { return; } + // FIXME: make choice of spin-sleep/imprecise sleep optional in next breaking. self.sleeper .sleep(self.deadline.saturating_duration_since(Instant::now())); self.deadline += TIMESTEP_LENGTH; } pub fn cycle(&mut self) -> Result<()> { - if self.conn_active.is_none() { - self.march_deadline(); - return Ok(()); - } - let mut mix_buffer = [0f32; STEREO_FRAME_SIZE]; // Walk over all the audio files, combining into one audio frame according diff --git a/src/manager.rs b/src/manager.rs index 7e091f7..c25297e 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -267,6 +267,9 @@ impl Songbird { /// associated voice channel, if connected. /// /// This will _not_ drop the handler, and will preserve it and its settings. + /// If you do not need to reuse event handlers, configuration, or active tracks + /// in the underlying driver *consider calling [`remove`]* to release tasks, + /// threads, and memory. /// /// This is a wrapper around [getting][`get`] a handler and calling /// [`leave`] on it. @@ -274,6 +277,7 @@ impl Songbird { /// [`Call`]: Call /// [`get`]: Songbird::get /// [`leave`]: Call::leave + /// [`remove`]: Songbird::remove #[inline] pub async fn leave>(&self, guild_id: G) -> JoinResult<()> { self._leave(guild_id.into()).await