Handle Voice close codes, prevent Songbird spinning WS threads (#1068)
Voice `CloseCode`s now map to a type rather than a collection of constants. Correct close code handling in this way terminates the websocket task when there is no likelihood of resuming, which was causing leftover tasks to spin at the `tokio::select` in some circumstances (i.e., ::leave, which keeps the `Driver` alive).
This commit is contained in:
@@ -1,13 +1,16 @@
|
|||||||
use super::{error::Result, message::*};
|
use super::message::*;
|
||||||
use crate::{
|
use crate::{
|
||||||
events::CoreContext,
|
events::CoreContext,
|
||||||
model::{
|
model::{
|
||||||
payload::{Heartbeat, Speaking},
|
payload::{Heartbeat, Speaking},
|
||||||
|
CloseCode as VoiceCloseCode,
|
||||||
Event as GatewayEvent,
|
Event as GatewayEvent,
|
||||||
|
FromPrimitive,
|
||||||
SpeakingState,
|
SpeakingState,
|
||||||
},
|
},
|
||||||
ws::{Error as WsError, ReceiverExt, SenderExt, WsStream},
|
ws::{Error as WsError, ReceiverExt, SenderExt, WsStream},
|
||||||
};
|
};
|
||||||
|
use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
|
||||||
use flume::Receiver;
|
use flume::Receiver;
|
||||||
use rand::random;
|
use rand::random;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -52,6 +55,7 @@ impl AuxNetwork {
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut ws_error = false;
|
let mut ws_error = false;
|
||||||
|
let mut should_reconnect = false;
|
||||||
|
|
||||||
let hb = time::delay_until(next_heartbeat);
|
let hb = time::delay_until(next_heartbeat);
|
||||||
|
|
||||||
@@ -59,7 +63,7 @@ impl AuxNetwork {
|
|||||||
_ = hb => {
|
_ = hb => {
|
||||||
ws_error = match self.send_heartbeat().await {
|
ws_error = match self.send_heartbeat().await {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Heartbeat send failure {:?}.", e);
|
should_reconnect = ws_error_is_not_final(e);
|
||||||
true
|
true
|
||||||
},
|
},
|
||||||
_ => false,
|
_ => false,
|
||||||
@@ -73,7 +77,7 @@ impl AuxNetwork {
|
|||||||
false
|
false
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Error processing ws {:?}.", e);
|
should_reconnect = ws_error_is_not_final(e);
|
||||||
true
|
true
|
||||||
},
|
},
|
||||||
Ok(Some(msg)) => {
|
Ok(Some(msg)) => {
|
||||||
@@ -113,7 +117,7 @@ impl AuxNetwork {
|
|||||||
|
|
||||||
ws_error |= match ssu_status {
|
ws_error |= match ssu_status {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Issue sending speaking update {:?}.", e);
|
should_reconnect = ws_error_is_not_final(e);
|
||||||
true
|
true
|
||||||
},
|
},
|
||||||
_ => false,
|
_ => false,
|
||||||
@@ -128,8 +132,13 @@ impl AuxNetwork {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ws_error {
|
if ws_error {
|
||||||
let _ = interconnect.core.send(CoreMessage::Reconnect);
|
|
||||||
self.dont_send = true;
|
self.dont_send = true;
|
||||||
|
|
||||||
|
if should_reconnect {
|
||||||
|
let _ = interconnect.core.send(CoreMessage::Reconnect);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -138,7 +147,7 @@ impl AuxNetwork {
|
|||||||
Instant::now() + self.heartbeat_interval
|
Instant::now() + self.heartbeat_interval
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_heartbeat(&mut self) -> Result<()> {
|
async fn send_heartbeat(&mut self) -> Result<(), WsError> {
|
||||||
let nonce = random::<u64>();
|
let nonce = random::<u64>();
|
||||||
self.last_heartbeat_nonce = Some(nonce);
|
self.last_heartbeat_nonce = Some(nonce);
|
||||||
|
|
||||||
@@ -203,3 +212,21 @@ pub(crate) async fn runner(
|
|||||||
aux.run(&mut interconnect).await;
|
aux.run(&mut interconnect).await;
|
||||||
info!("WS thread finished.");
|
info!("WS thread finished.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn ws_error_is_not_final(err: WsError) -> bool {
|
||||||
|
match err {
|
||||||
|
WsError::WsClosed(Some(frame)) => match frame.code {
|
||||||
|
CloseCode::Library(l) =>
|
||||||
|
if let Some(code) = VoiceCloseCode::from_u16(l) {
|
||||||
|
code.should_resume()
|
||||||
|
} else {
|
||||||
|
true
|
||||||
|
},
|
||||||
|
_ => true,
|
||||||
|
},
|
||||||
|
e => {
|
||||||
|
error!("Error sending/receiving ws {:?}.", e);
|
||||||
|
true
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user