Files
songbird/src/driver/tasks/mod.rs
Kyle Simpson 1fc3dc2259 Gateway: Add connection timeout, add Config to gateway. (#51)
This change fixes tasks hanging due to rare cases of messages being lost between full Discord reconnections by placing a configurable timeout on the `ConnectionInfo` responses. This is a companion fix to [serenity#1255](https://github.com/serenity-rs/serenity/pull/1255). To make this doable, `Config`s are now used by all versions of `Songbird`/`Call`, and relevant functions are  added to simplify setup with configuration. These are now non-exhaustive, correcting an earlier oversight. For future extensibility, this PR moves the return type of `join`/`join_gateway` into a custom future (no longer leaking flume's `RecvFut` type).

Additionally, this fixes the Makefile's feature sets for driver/gateway-only compilation.

This is a breaking change in:
* the return types of `join`/`join_gateway`
* moving `crate::driver::Config` -> `crate::Config`,
* `Config` and `JoinError` becoming `#[non_breaking]`.

This was tested via `cargo make ready`, and by testing `examples/serenity/voice_receive` with various timeout settings.
2021-07-01 11:30:01 +01:00

207 lines
7.5 KiB
Rust

#![allow(missing_docs)]
pub(crate) mod disposal;
pub mod error;
mod events;
pub mod message;
pub mod mixer;
pub(crate) mod udp_rx;
pub(crate) mod udp_tx;
pub(crate) mod ws;
use super::connection::{error::Error as ConnectionError, Connection};
use crate::{events::CoreContext, Config};
use flume::{Receiver, RecvError, Sender};
use message::*;
#[cfg(not(feature = "tokio-02-marker"))]
use tokio::{runtime::Handle, spawn};
#[cfg(feature = "tokio-02-marker")]
use tokio_compat::{runtime::Handle, spawn};
use tracing::{error, instrument, trace};
pub(crate) fn start(config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMessage>) {
spawn(async move {
trace!("Driver started.");
runner(config, rx, tx).await;
trace!("Driver finished.");
});
}
fn start_internals(core: Sender<CoreMessage>, config: Config) -> Interconnect {
let (evt_tx, evt_rx) = flume::unbounded();
let (mix_tx, mix_rx) = flume::unbounded();
let interconnect = Interconnect {
core,
events: evt_tx,
mixer: mix_tx,
};
let ic = interconnect.clone();
spawn(async move {
trace!("Event processor started.");
events::runner(ic, evt_rx).await;
trace!("Event processor finished.");
});
let ic = interconnect.clone();
let handle = Handle::current();
std::thread::spawn(move || {
trace!("Mixer started.");
mixer::runner(ic, mix_rx, handle, config);
trace!("Mixer finished.");
});
interconnect
}
#[instrument(skip(rx, tx))]
async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMessage>) {
let mut next_config: Option<Config> = None;
let mut connection = None;
let mut interconnect = start_internals(tx, config.clone());
loop {
match rx.recv_async().await {
Ok(CoreMessage::ConnectWithResult(info, tx)) => {
config = if let Some(new_config) = next_config.take() {
let _ = interconnect
.mixer
.send(MixerMessage::SetConfig(new_config.clone()));
new_config
} else {
config
};
connection = match Connection::new(info, &interconnect, &config).await {
Ok(connection) => {
// Other side may not be listening: this is fine.
let _ = tx.send(Ok(()));
let _ = interconnect
.events
.send(EventMessage::FireCoreEvent(CoreContext::DriverConnect));
Some(connection)
},
Err(why) => {
// See above.
let _ = tx.send(Err(why));
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverConnectFailed,
));
None
},
};
},
Ok(CoreMessage::Disconnect) => {
connection = None;
let _ = interconnect.mixer.send(MixerMessage::DropConn);
let _ = interconnect.mixer.send(MixerMessage::RebuildEncoder);
},
Ok(CoreMessage::SetTrack(s)) => {
let _ = interconnect.mixer.send(MixerMessage::SetTrack(s));
},
Ok(CoreMessage::AddTrack(s)) => {
let _ = interconnect.mixer.send(MixerMessage::AddTrack(s));
},
Ok(CoreMessage::SetBitrate(b)) => {
let _ = interconnect.mixer.send(MixerMessage::SetBitrate(b));
},
Ok(CoreMessage::SetConfig(mut new_config)) => {
next_config = Some(new_config.clone());
new_config.make_safe(&config, connection.is_some());
let _ = interconnect.mixer.send(MixerMessage::SetConfig(new_config));
},
Ok(CoreMessage::AddEvent(evt)) => {
let _ = interconnect.events.send(EventMessage::AddGlobalEvent(evt));
},
Ok(CoreMessage::RemoveGlobalEvents) => {
let _ = interconnect.events.send(EventMessage::RemoveGlobalEvents);
},
Ok(CoreMessage::Mute(m)) => {
let _ = interconnect.mixer.send(MixerMessage::SetMute(m));
},
Ok(CoreMessage::Reconnect) => {
if let Some(mut conn) = connection.take() {
// try once: if interconnect, try again.
// if still issue, full connect.
let info = conn.info.clone();
let full_connect = match conn.reconnect().await {
Ok(()) => {
connection = Some(conn);
false
},
Err(ConnectionError::InterconnectFailure(_)) => {
interconnect.restart_volatile_internals();
match conn.reconnect().await {
Ok(()) => {
connection = Some(conn);
false
},
_ => true,
}
},
_ => true,
};
if full_connect {
connection = Connection::new(info, &interconnect, &config)
.await
.map_err(|e| {
error!("Catastrophic connection failure. Stopping. {:?}", e);
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnectFailed,
));
e
})
.ok();
}
if connection.is_some() {
let _ = interconnect
.events
.send(EventMessage::FireCoreEvent(CoreContext::DriverReconnect));
}
}
},
Ok(CoreMessage::FullReconnect) =>
if let Some(conn) = connection.take() {
let info = conn.info.clone();
connection = Connection::new(info, &interconnect, &config)
.await
.map_err(|e| {
error!("Catastrophic connection failure. Stopping. {:?}", e);
let _ = interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnectFailed,
));
e
})
.ok();
if connection.is_some() {
let _ = interconnect
.events
.send(EventMessage::FireCoreEvent(CoreContext::DriverReconnect));
}
},
Ok(CoreMessage::RebuildInterconnect) => {
interconnect.restart_volatile_internals();
},
Err(RecvError::Disconnected) | Ok(CoreMessage::Poison) => {
break;
},
}
}
trace!("Main thread exited");
interconnect.poison_all();
}