Driver: Automate (re)connection logic (#81)

This PR adds several enhancements to Driver connection logic:
* Driver (re)connection attempts now have a default timeout of around 10s.
* The driver will now attempt to retry full connection attempts using a user-provided strategy: currently, this defaults to 5 attempts under an exponential backoff strategy.
* The driver will now fire `DriverDisconnect` events at the end of any session -- this unifies (re)connection failure events with session expiry as seen in #76, which should provide users with enough detail to know *which* voice channel to reconnect to. Users still need to be careful to read the session/channel IDs to ensure that they aren't overwriting another join.

This has been tested using `cargo make ready`, and by setting low timeouts to force failures in the voice receive example (with some additional error handlers).

Closes #68.
This commit is contained in:
Kyle Simpson
2021-06-23 17:11:14 +01:00
parent 8381f8c461
commit 210e3ae584
17 changed files with 672 additions and 90 deletions

View File

@@ -2,7 +2,7 @@
use crate::{
driver::{connection::error::Error, Bitrate, Config},
events::EventData,
events::{context_data::DisconnectReason, EventData},
tracks::Track,
ConnectionInfo,
};
@@ -12,6 +12,8 @@ use flume::Sender;
#[derive(Debug)]
pub enum CoreMessage {
ConnectWithResult(ConnectionInfo, Sender<Result<(), Error>>),
RetryConnect(usize),
SignalWsClosure(usize, ConnectionInfo, Option<DisconnectReason>),
Disconnect,
SetTrack(Option<Track>),
AddTrack(Track),

View File

@@ -9,18 +9,25 @@ pub(crate) mod udp_rx;
pub(crate) mod udp_tx;
pub(crate) mod ws;
use std::time::Duration;
use super::connection::{error::Error as ConnectionError, Connection};
use crate::{
events::{internal_data::InternalConnect, CoreContext},
events::{
context_data::{DisconnectKind, DisconnectReason},
internal_data::{InternalConnect, InternalDisconnect},
CoreContext,
},
Config,
ConnectionInfo,
};
use flume::{Receiver, RecvError, Sender};
use message::*;
#[cfg(not(feature = "tokio-02-marker"))]
use tokio::{runtime::Handle, spawn};
use tokio::{runtime::Handle, spawn, time::sleep as tsleep};
#[cfg(feature = "tokio-02-marker")]
use tokio_compat::{runtime::Handle, spawn};
use tracing::{error, instrument, trace};
use tokio_compat::{runtime::Handle, spawn, time::delay_for as tsleep};
use tracing::{debug, instrument, trace};
pub(crate) fn start(config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMessage>) {
spawn(async move {
@@ -61,8 +68,10 @@ fn start_internals(core: Sender<CoreMessage>, config: Config) -> Interconnect {
#[instrument(skip(rx, tx))]
async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMessage>) {
let mut next_config: Option<Config> = None;
let mut connection = None;
let mut connection: Option<Connection> = None;
let mut interconnect = start_internals(tx, config.clone());
let mut retrying = None;
let mut attempt_idx = 0;
loop {
match rx.recv_async().await {
@@ -76,36 +85,69 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
config
};
connection = match Connection::new(info, &interconnect, &config).await {
Ok(connection) => {
// Other side may not be listening: this is fine.
let _ = tx.send(Ok(()));
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverConnect(InternalConnect {
server: connection.info.endpoint.clone(),
ssrc: connection.ssrc,
}),
));
Some(connection)
},
Err(why) => {
// See above.
let _ = tx.send(Err(why));
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverConnectFailed,
));
None
},
};
if connection
.as_ref()
.map(|conn| conn.info != info)
.unwrap_or(true)
{
// Only *actually* reconnect if the conn info changed, or we don't have an
// active connection.
// This allows the gateway component to keep sending join requests independent
// of driver failures.
connection = ConnectionRetryData::connect(tx, info, &mut attempt_idx)
.attempt(&mut retrying, &interconnect, &config)
.await;
} else {
// No reconnection was attempted as there's a valid, identical connection;
// tell the outside listener that the operation was a success.
let _ = tx.send(Ok(()));
}
},
Ok(CoreMessage::RetryConnect(retry_idx)) => {
debug!("Retrying idx: {} (vs. {})", retry_idx, attempt_idx);
if retry_idx == attempt_idx {
if let Some(progress) = retrying.take() {
connection = progress
.attempt(&mut retrying, &interconnect, &config)
.await;
}
}
},
Ok(CoreMessage::Disconnect) => {
connection = None;
let last_conn = connection.take();
let _ = interconnect.mixer.send(MixerMessage::DropConn);
let _ = interconnect.mixer.send(MixerMessage::RebuildEncoder);
if let Some(conn) = last_conn {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverDisconnect(InternalDisconnect {
kind: DisconnectKind::Runtime,
reason: None,
info: conn.info.clone(),
}),
));
}
},
Ok(CoreMessage::SignalWsClosure(ws_idx, ws_info, mut reason)) => {
// if idx is not a match, quash reason
// (i.e., prevent users from mistakenly trying to reconnect for an *old* dead conn).
// if it *is* a match, the conn needs to die!
// (as the WS channel has truly given up the ghost).
if ws_idx != attempt_idx {
reason = None;
} else {
connection = None;
let _ = interconnect.mixer.send(MixerMessage::DropConn);
let _ = interconnect.mixer.send(MixerMessage::RebuildEncoder);
}
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverDisconnect(InternalDisconnect {
kind: DisconnectKind::Runtime,
reason,
info: ws_info,
}),
));
},
Ok(CoreMessage::SetTrack(s)) => {
let _ = interconnect.mixer.send(MixerMessage::SetTrack(s));
@@ -138,7 +180,7 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
// if still issue, full connect.
let info = conn.info.clone();
let full_connect = match conn.reconnect().await {
let full_connect = match conn.reconnect(&config).await {
Ok(()) => {
connection = Some(conn);
false
@@ -146,7 +188,7 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
Err(ConnectionError::InterconnectFailure(_)) => {
interconnect.restart_volatile_internals();
match conn.reconnect().await {
match conn.reconnect(&config).await {
Ok(()) => {
connection = Some(conn);
false
@@ -158,22 +200,13 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
};
if full_connect {
connection = Connection::new(info, &interconnect, &config)
.await
.map_err(|e| {
error!("Catastrophic connection failure. Stopping. {:?}", e);
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnectFailed,
));
e
})
.ok();
}
if let Some(ref connection) = &connection {
connection = ConnectionRetryData::reconnect(info, &mut attempt_idx)
.attempt(&mut retrying, &interconnect, &config)
.await;
} else if let Some(ref connection) = &connection {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnect(InternalConnect {
server: connection.info.endpoint.clone(),
info: connection.info.clone(),
ssrc: connection.ssrc,
}),
));
@@ -184,25 +217,9 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
if let Some(conn) = connection.take() {
let info = conn.info.clone();
connection = Connection::new(info, &interconnect, &config)
.await
.map_err(|e| {
error!("Catastrophic connection failure. Stopping. {:?}", e);
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnectFailed,
));
e
})
.ok();
if let Some(ref connection) = &connection {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnect(InternalConnect {
server: connection.info.endpoint.clone(),
ssrc: connection.ssrc,
}),
));
}
connection = ConnectionRetryData::reconnect(info, &mut attempt_idx)
.attempt(&mut retrying, &interconnect, &config)
.await;
},
Ok(CoreMessage::RebuildInterconnect) => {
interconnect.restart_volatile_internals();
@@ -216,3 +233,138 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
trace!("Main thread exited");
interconnect.poison_all();
}
struct ConnectionRetryData {
flavour: ConnectionFlavour,
attempts: usize,
last_wait: Option<Duration>,
info: ConnectionInfo,
idx: usize,
}
impl ConnectionRetryData {
fn connect(
tx: Sender<Result<(), ConnectionError>>,
info: ConnectionInfo,
idx_src: &mut usize,
) -> Self {
Self::base(ConnectionFlavour::Connect(tx), info, idx_src)
}
fn reconnect(info: ConnectionInfo, idx_src: &mut usize) -> Self {
Self::base(ConnectionFlavour::Reconnect, info, idx_src)
}
fn base(flavour: ConnectionFlavour, info: ConnectionInfo, idx_src: &mut usize) -> Self {
*idx_src = idx_src.wrapping_add(1);
Self {
flavour,
attempts: 0,
last_wait: None,
info,
idx: *idx_src,
}
}
async fn attempt(
mut self,
attempt_slot: &mut Option<Self>,
interconnect: &Interconnect,
config: &Config,
) -> Option<Connection> {
match Connection::new(self.info.clone(), interconnect, config, self.idx).await {
Ok(connection) => {
match self.flavour {
ConnectionFlavour::Connect(tx) => {
// Other side may not be listening: this is fine.
let _ = tx.send(Ok(()));
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverConnect(InternalConnect {
info: connection.info.clone(),
ssrc: connection.ssrc,
}),
));
},
ConnectionFlavour::Reconnect => {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnect(InternalConnect {
info: connection.info.clone(),
ssrc: connection.ssrc,
}),
));
},
}
Some(connection)
},
Err(why) => {
debug!("Failed to connect for {:?}: {}", self.info.guild_id, why);
if let Some(t) = config.driver_retry.retry_in(self.last_wait, self.attempts) {
let remote_ic = interconnect.clone();
let idx = self.idx;
spawn(async move {
tsleep(t).await;
let _ = remote_ic.core.send(CoreMessage::RetryConnect(idx));
});
self.attempts += 1;
self.last_wait = Some(t);
debug!(
"Retrying connection for {:?} in {}s ({}/{:?})",
self.info.guild_id,
t.as_secs_f32(),
self.attempts,
config.driver_retry.retry_limit
);
*attempt_slot = Some(self);
} else {
let reason = Some(DisconnectReason::from(&why));
match self.flavour {
ConnectionFlavour::Connect(tx) => {
// See above.
let _ = tx.send(Err(why));
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverConnectFailed,
));
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverDisconnect(InternalDisconnect {
kind: DisconnectKind::Connect,
reason,
info: self.info,
}),
));
},
ConnectionFlavour::Reconnect => {
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnectFailed,
));
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverDisconnect(InternalDisconnect {
kind: DisconnectKind::Reconnect,
reason,
info: self.info,
}),
));
},
}
}
None
},
}
}
}
enum ConnectionFlavour {
Connect(Sender<Result<(), ConnectionError>>),
Reconnect,
}

View File

@@ -9,6 +9,7 @@ use crate::{
SpeakingState,
},
ws::{Error as WsError, ReceiverExt, SenderExt, WsStream},
ConnectionInfo,
};
#[cfg(not(feature = "tokio-02-marker"))]
use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
@@ -39,6 +40,9 @@ struct AuxNetwork {
speaking: SpeakingState,
last_heartbeat_nonce: Option<u64>,
attempt_idx: usize,
info: ConnectionInfo,
}
impl AuxNetwork {
@@ -47,6 +51,8 @@ impl AuxNetwork {
ws_client: WsStream,
ssrc: u32,
heartbeat_interval: f64,
attempt_idx: usize,
info: ConnectionInfo,
) -> Self {
Self {
rx: evt_rx,
@@ -58,6 +64,9 @@ impl AuxNetwork {
speaking: SpeakingState::empty(),
last_heartbeat_nonce: None,
attempt_idx,
info,
}
}
@@ -68,6 +77,7 @@ impl AuxNetwork {
loop {
let mut ws_error = false;
let mut should_reconnect = false;
let mut ws_reason = None;
let hb = sleep_until(next_heartbeat);
@@ -75,7 +85,8 @@ impl AuxNetwork {
_ = hb => {
ws_error = match self.send_heartbeat().await {
Err(e) => {
should_reconnect = ws_error_is_not_final(e);
should_reconnect = ws_error_is_not_final(&e);
ws_reason = Some((&e).into());
true
},
_ => false,
@@ -89,7 +100,8 @@ impl AuxNetwork {
false
},
Err(e) => {
should_reconnect = ws_error_is_not_final(e);
should_reconnect = ws_error_is_not_final(&e);
ws_reason = Some((&e).into());
true
},
Ok(Some(msg)) => {
@@ -129,7 +141,8 @@ impl AuxNetwork {
ws_error |= match ssu_status {
Err(e) => {
should_reconnect = ws_error_is_not_final(e);
should_reconnect = ws_error_is_not_final(&e);
ws_reason = Some((&e).into());
true
},
_ => false,
@@ -149,6 +162,11 @@ impl AuxNetwork {
if should_reconnect {
let _ = interconnect.core.send(CoreMessage::Reconnect);
} else {
let _ = interconnect.core.send(CoreMessage::SignalWsClosure(
self.attempt_idx,
self.info.clone(),
ws_reason,
));
break;
}
}
@@ -217,15 +235,24 @@ pub(crate) async fn runner(
ws_client: WsStream,
ssrc: u32,
heartbeat_interval: f64,
attempt_idx: usize,
info: ConnectionInfo,
) {
trace!("WS thread started.");
let mut aux = AuxNetwork::new(evt_rx, ws_client, ssrc, heartbeat_interval);
let mut aux = AuxNetwork::new(
evt_rx,
ws_client,
ssrc,
heartbeat_interval,
attempt_idx,
info,
);
aux.run(&mut interconnect).await;
trace!("WS thread finished.");
}
fn ws_error_is_not_final(err: WsError) -> bool {
fn ws_error_is_not_final(err: &WsError) -> bool {
match err {
WsError::WsClosed(Some(frame)) => match frame.code {
CloseCode::Library(l) =>