diff --git a/src/driver/tasks/mixer.rs b/src/driver/tasks/mixer.rs index 77e11fb..22f6275 100644 --- a/src/driver/tasks/mixer.rs +++ b/src/driver/tasks/mixer.rs @@ -109,129 +109,52 @@ impl Mixer { let mut conn_failure = false; 'runner: loop { - loop { - use MixerMessage::*; + if self.conn_active.is_some() { + loop { + use MixerMessage::*; - let error = match self.mix_rx.try_recv() { - Ok(AddTrack(mut t)) => { - t.source.prep_with_handle(self.async_handle.clone()); - self.add_track(t) - }, - Ok(SetTrack(t)) => { - self.tracks.clear(); + match self.mix_rx.try_recv() { + Ok(m) => { + let (events, conn, should_exit) = self.handle_message(m); + events_failure |= events; + conn_failure |= conn; - let mut out = self.fire_event(EventMessage::RemoveAllTracks); - - if let Some(mut t) = t { - t.source.prep_with_handle(self.async_handle.clone()); - - // Do this unconditionally: this affects local state infallibly, - // with the event installation being the remote part. - if let Err(e) = self.add_track(t) { - out = Err(e); + if should_exit { + break 'runner; } - } - - out - }, - Ok(SetBitrate(b)) => { - self.bitrate = b; - if let Err(e) = self.set_bitrate(b) { - error!("Failed to update bitrate {:?}", e); - } - Ok(()) - }, - Ok(SetMute(m)) => { - self.muted = m; - Ok(()) - }, - Ok(SetConn(conn, ssrc)) => { - self.conn_active = Some(conn); - let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( - "Too few bytes in self.packet for RTP header.\ - (Blame: VOICE_PACKET_MAX?)", - ); - rtp.set_ssrc(ssrc); - rtp.set_sequence(random::().into()); - rtp.set_timestamp(random::().into()); - self.deadline = Instant::now(); - Ok(()) - }, - Ok(DropConn) => { - self.conn_active = None; - Ok(()) - }, - Ok(ReplaceInterconnect(i)) => { - self.prevent_events = false; - if let Some(ws) = &self.ws { - conn_failure |= - ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err(); - } - if let Some(conn) = &self.conn_active { - conn_failure |= conn - .udp_rx - .send(UdpRxMessage::ReplaceInterconnect(i.clone())) - .is_err(); - } - self.interconnect = i; - - self.rebuild_tracks() - }, - Ok(SetConfig(new_config)) => { - self.config = new_config.clone(); - - if self.tracks.capacity() < self.config.preallocated_tracks { - self.tracks - .reserve(self.config.preallocated_tracks - self.tracks.len()); - } - - if let Some(conn) = &self.conn_active { - conn_failure |= conn - .udp_rx - .send(UdpRxMessage::SetConfig(new_config)) - .is_err(); - } - - Ok(()) - }, - Ok(RebuildEncoder) => match new_encoder(self.bitrate) { - Ok(encoder) => { - self.encoder = encoder; - Ok(()) }, - Err(e) => { - error!("Failed to rebuild encoder. Resetting bitrate. {:?}", e); - self.bitrate = DEFAULT_BITRATE; - self.encoder = new_encoder(self.bitrate) - .expect("Failed fallback rebuild of OpusEncoder with safe inputs."); - Ok(()) + + Err(TryRecvError::Disconnected) => { + break 'runner; }, - }, - Ok(Ws(new_ws_handle)) => { - self.ws = new_ws_handle; - Ok(()) - }, - Err(TryRecvError::Disconnected) | Ok(Poison) => { - break 'runner; - }, + Err(TryRecvError::Empty) => { + break; + }, + }; + } - Err(TryRecvError::Empty) => { - break; - }, - }; - - if let Err(e) = error { + if let Err(e) = self.cycle().and_then(|_| self.audio_commands_events()) { events_failure |= e.should_trigger_interconnect_rebuild(); conn_failure |= e.should_trigger_connect(); + + error!("Mixer thread cycle: {:?}", e); } - } + } else { + match self.mix_rx.recv() { + Ok(m) => { + let (events, conn, should_exit) = self.handle_message(m); + events_failure |= events; + conn_failure |= conn; - if let Err(e) = self.cycle().and_then(|_| self.audio_commands_events()) { - events_failure |= e.should_trigger_interconnect_rebuild(); - conn_failure |= e.should_trigger_connect(); - - error!("Mixer thread cycle: {:?}", e); + if should_exit { + break 'runner; + } + }, + Err(_) => { + break 'runner; + }, + } } // event failure? rebuild interconnect. @@ -266,6 +189,126 @@ impl Mixer { } } + #[inline] + fn handle_message(&mut self, msg: MixerMessage) -> (bool, bool, bool) { + let mut events_failure = false; + let mut conn_failure = false; + let mut should_exit = false; + + use MixerMessage::*; + + let error = match msg { + AddTrack(mut t) => { + t.source.prep_with_handle(self.async_handle.clone()); + self.add_track(t) + }, + SetTrack(t) => { + self.tracks.clear(); + + let mut out = self.fire_event(EventMessage::RemoveAllTracks); + + if let Some(mut t) = t { + t.source.prep_with_handle(self.async_handle.clone()); + + // Do this unconditionally: this affects local state infallibly, + // with the event installation being the remote part. + if let Err(e) = self.add_track(t) { + out = Err(e); + } + } + + out + }, + SetBitrate(b) => { + self.bitrate = b; + if let Err(e) = self.set_bitrate(b) { + error!("Failed to update bitrate {:?}", e); + } + Ok(()) + }, + SetMute(m) => { + self.muted = m; + Ok(()) + }, + SetConn(conn, ssrc) => { + self.conn_active = Some(conn); + let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( + "Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + rtp.set_ssrc(ssrc); + rtp.set_sequence(random::().into()); + rtp.set_timestamp(random::().into()); + self.deadline = Instant::now(); + Ok(()) + }, + DropConn => { + self.conn_active = None; + Ok(()) + }, + ReplaceInterconnect(i) => { + self.prevent_events = false; + if let Some(ws) = &self.ws { + conn_failure |= ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err(); + } + if let Some(conn) = &self.conn_active { + conn_failure |= conn + .udp_rx + .send(UdpRxMessage::ReplaceInterconnect(i.clone())) + .is_err(); + } + self.interconnect = i; + + self.rebuild_tracks() + }, + SetConfig(new_config) => { + self.config = new_config.clone(); + + if self.tracks.capacity() < self.config.preallocated_tracks { + self.tracks + .reserve(self.config.preallocated_tracks - self.tracks.len()); + } + + if let Some(conn) = &self.conn_active { + conn_failure |= conn + .udp_rx + .send(UdpRxMessage::SetConfig(new_config)) + .is_err(); + } + + Ok(()) + }, + RebuildEncoder => match new_encoder(self.bitrate) { + Ok(encoder) => { + self.encoder = encoder; + Ok(()) + }, + Err(e) => { + error!("Failed to rebuild encoder. Resetting bitrate. {:?}", e); + self.bitrate = DEFAULT_BITRATE; + self.encoder = new_encoder(self.bitrate) + .expect("Failed fallback rebuild of OpusEncoder with safe inputs."); + Ok(()) + }, + }, + Ws(new_ws_handle) => { + self.ws = new_ws_handle; + Ok(()) + }, + Poison => { + should_exit = true; + Ok(()) + }, + }; + + if let Err(e) = error { + events_failure |= e.should_trigger_interconnect_rebuild(); + conn_failure |= e.should_trigger_connect(); + } + + (events_failure, conn_failure, should_exit) + } + #[inline] fn fire_event(&self, event: EventMessage) -> Result<()> { // As this task is responsible for noticing the potential death of an event context, @@ -360,17 +403,13 @@ impl Mixer { return; } + // FIXME: make choice of spin-sleep/imprecise sleep optional in next breaking. self.sleeper .sleep(self.deadline.saturating_duration_since(Instant::now())); self.deadline += TIMESTEP_LENGTH; } pub fn cycle(&mut self) -> Result<()> { - if self.conn_active.is_none() { - self.march_deadline(); - return Ok(()); - } - let mut mix_buffer = [0f32; STEREO_FRAME_SIZE]; // Walk over all the audio files, combining into one audio frame according diff --git a/src/manager.rs b/src/manager.rs index 7e091f7..c25297e 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -267,6 +267,9 @@ impl Songbird { /// associated voice channel, if connected. /// /// This will _not_ drop the handler, and will preserve it and its settings. + /// If you do not need to reuse event handlers, configuration, or active tracks + /// in the underlying driver *consider calling [`remove`]* to release tasks, + /// threads, and memory. /// /// This is a wrapper around [getting][`get`] a handler and calling /// [`leave`] on it. @@ -274,6 +277,7 @@ impl Songbird { /// [`Call`]: Call /// [`get`]: Songbird::get /// [`leave`]: Call::leave + /// [`remove`]: Songbird::remove #[inline] pub async fn leave>(&self, guild_id: G) -> JoinResult<()> { self._leave(guild_id.into()).await