Gateway: Generic Shard and Twilight v0.8 Support (#109)

This PR adds support for twilight v0.8, mainly adapting to significant API changes introduced by v0.7. As a result of these, twilight no longer accepts arbitrary JSON input, so it seemed sensible to adapt our `Shard` design to no longer require the same.

Adding to this, I've added in a trait to allow an arbitrary `Shard` to be installed, given only an implementation of a method to send a `VoiceStateUpdate`. Together, `Sharder::Generic` (songbird::shards::VoiceUpdate) and `Shard::Generic` (songbird::shards::GenericSharder) should allow any library to be hooked in to Songbird.

This PR was tested using `cargo make ready` and by manually testing `examples/twilight`.
This commit is contained in:
Kyle Simpson
2022-01-13 12:17:41 +00:00
parent 12c76a9046
commit b4ce84546b
9 changed files with 187 additions and 54 deletions

View File

@@ -7,7 +7,7 @@ use serenity::gateway::InterMessage;
#[cfg(feature = "gateway-core")]
use std::{error::Error, fmt};
#[cfg(feature = "twilight")]
use twilight_gateway::shard::CommandError;
use twilight_gateway::{cluster::ClusterCommandError, shard::CommandError};
#[cfg(feature = "gateway-core")]
#[derive(Debug)]
@@ -36,6 +36,10 @@ pub enum JoinError {
///
/// [the `Call`'s configuration]: crate::Config
TimedOut,
/// The given guild ID was zero.
IllegalGuild,
/// The given channel ID was zero.
IllegalChannel,
#[cfg(feature = "driver-core")]
/// The driver failed to establish a voice connection.
///
@@ -46,8 +50,11 @@ pub enum JoinError {
/// Serenity-specific WebSocket send error.
Serenity(TrySendError<InterMessage>),
#[cfg(feature = "twilight")]
/// Twilight-specific WebSocket send error.
Twilight(CommandError),
/// Twilight-specific WebSocket send error returned when using a shard cluster.
TwilightCluster(ClusterCommandError),
#[cfg(feature = "twilight")]
/// Twilight-specific WebSocket send error when explicitly using a single shard.
TwilightShard(CommandError),
}
#[cfg(feature = "gateway-core")]
@@ -84,12 +91,16 @@ impl fmt::Display for JoinError {
JoinError::NoSender => write!(f, "no gateway destination"),
JoinError::NoCall => write!(f, "tried to leave a non-existent call"),
JoinError::TimedOut => write!(f, "gateway response from Discord timed out"),
JoinError::IllegalGuild => write!(f, "target guild ID was zero"),
JoinError::IllegalChannel => write!(f, "target channel ID was zero"),
#[cfg(feature = "driver-core")]
JoinError::Driver(_) => write!(f, "establishing connection failed"),
#[cfg(feature = "serenity")]
JoinError::Serenity(e) => e.fmt(f),
#[cfg(feature = "twilight")]
JoinError::Twilight(e) => e.fmt(f),
JoinError::TwilightCluster(e) => e.fmt(f),
#[cfg(feature = "twilight")]
JoinError::TwilightShard(e) => e.fmt(f),
}
}
}
@@ -102,12 +113,16 @@ impl Error for JoinError {
JoinError::NoSender => None,
JoinError::NoCall => None,
JoinError::TimedOut => None,
JoinError::IllegalGuild => None,
JoinError::IllegalChannel => None,
#[cfg(feature = "driver-core")]
JoinError::Driver(e) => Some(e),
#[cfg(feature = "serenity")]
JoinError::Serenity(e) => e.source(),
#[cfg(feature = "twilight")]
JoinError::Twilight(e) => e.source(),
JoinError::TwilightCluster(e) => e.source(),
#[cfg(feature = "twilight")]
JoinError::TwilightShard(e) => e.source(),
}
}
}
@@ -122,7 +137,14 @@ impl From<TrySendError<InterMessage>> for JoinError {
#[cfg(all(feature = "twilight", feature = "gateway-core"))]
impl From<CommandError> for JoinError {
fn from(e: CommandError) -> Self {
JoinError::Twilight(e)
JoinError::TwilightShard(e)
}
}
#[cfg(all(feature = "twilight", feature = "gateway-core"))]
impl From<ClusterCommandError> for JoinError {
fn from(e: ClusterCommandError) -> Self {
JoinError::TwilightCluster(e)
}
}

View File

@@ -5,11 +5,10 @@ use crate::{
id::{ChannelId, GuildId, UserId},
info::{ConnectionInfo, ConnectionProgress},
join::*,
shards::Shard,
shards::{Shard, VoiceUpdate},
Config,
};
use flume::Sender;
use serde_json::json;
use std::fmt::Debug;
use tracing::instrument;
@@ -448,17 +447,13 @@ impl Call {
#[instrument(skip(self))]
async fn update(&mut self) -> JoinResult<()> {
if let Some(ws) = self.ws.as_mut() {
let map = json!({
"op": 4,
"d": {
"channel_id": self.connection.as_ref().map(|c| c.0.channel_id().0),
"guild_id": self.guild_id.0,
"self_deaf": self.self_deaf,
"self_mute": self.self_mute,
}
});
ws.send(map).await
ws.update_voice_state(
self.guild_id,
self.connection.as_ref().map(|c| c.0.channel_id()),
self.self_deaf,
self.self_mute,
)
.await
} else {
Err(JoinError::NoSender)
}

View File

@@ -50,7 +50,7 @@ impl From<SerenityChannel> for ChannelId {
#[cfg(feature = "twilight")]
impl From<TwilightChannel> for ChannelId {
fn from(id: TwilightChannel) -> Self {
Self(id.0)
Self(id.0.into())
}
}
@@ -83,7 +83,7 @@ impl From<GuildId> for DriverGuild {
#[cfg(feature = "twilight")]
impl From<TwilightGuild> for GuildId {
fn from(id: TwilightGuild) -> Self {
Self(id.0)
Self(id.0.into())
}
}
@@ -116,6 +116,6 @@ impl From<UserId> for DriverUser {
#[cfg(feature = "twilight")]
impl From<TwilightUser> for UserId {
fn from(id: TwilightUser) -> Self {
Self(id.0)
Self(id.0.into())
}
}

View File

@@ -65,6 +65,7 @@ async fn _dca(path: &OsStr) -> Result<Input, DcaError> {
))
}
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
pub(crate) struct DcaMetadata {
pub(crate) dca: Dca,
@@ -74,12 +75,14 @@ pub(crate) struct DcaMetadata {
pub(crate) extra: Option<serde_json::Value>,
}
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
pub(crate) struct Dca {
pub(crate) version: u64,
pub(crate) tool: Tool,
}
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
pub(crate) struct Tool {
pub(crate) name: String,
@@ -88,6 +91,7 @@ pub(crate) struct Tool {
pub(crate) author: String,
}
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
pub(crate) struct Opus {
pub(crate) mode: String,
@@ -98,6 +102,7 @@ pub(crate) struct Opus {
pub(crate) channels: u8,
}
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
pub(crate) struct Info {
pub(crate) title: Option<String>,
@@ -107,6 +112,7 @@ pub(crate) struct Info {
pub(crate) cover: Option<String>,
}
#[allow(dead_code)]
#[derive(Debug, Deserialize)]
pub(crate) struct Origin {
pub(crate) source: Option<String>,

View File

@@ -87,7 +87,7 @@ impl Songbird {
/// [`process`].
///
/// [`process`]: Songbird::process
pub fn twilight<U>(cluster: Cluster, user_id: U) -> Self
pub fn twilight<U>(cluster: Arc<Cluster>, user_id: U) -> Self
where
U: Into<UserId>,
{
@@ -102,7 +102,7 @@ impl Songbird {
/// [`process`].
///
/// [`process`]: Songbird::process
pub fn twilight_from_config<U>(cluster: Cluster, user_id: U, config: Config) -> Self
pub fn twilight_from_config<U>(cluster: Arc<Cluster>, user_id: U, config: Config) -> Self
where
U: Into<UserId>,
{
@@ -117,7 +117,7 @@ impl Songbird {
user_id: user_id.into(),
}),
calls: Default::default(),
sharder: Sharder::Twilight(cluster),
sharder: Sharder::TwilightCluster(cluster),
config: Some(config).into(),
}
}
@@ -378,7 +378,7 @@ impl Songbird {
}
},
TwilightEvent::VoiceStateUpdate(v) => {
if v.0.user_id.0 != self.client_data.read().user_id.0 {
if v.0.user_id.0.get() != self.client_data.read().user_id.0 {
return;
}

View File

@@ -1,20 +1,32 @@
//! Handlers for sending packets over sharded connections.
use crate::error::{JoinError, JoinResult};
use crate::{
error::{JoinError, JoinResult},
id::*,
};
use async_trait::async_trait;
use derivative::Derivative;
#[cfg(feature = "serenity")]
use futures::channel::mpsc::{TrySendError, UnboundedSender as Sender};
#[cfg(feature = "serenity")]
use parking_lot::{lock_api::RwLockWriteGuard, Mutex as PMutex, RwLock as PRwLock};
use serde_json::Value;
use serde_json::json;
#[cfg(feature = "serenity")]
use serenity::gateway::InterMessage;
#[cfg(feature = "serenity")]
use std::{collections::HashMap, result::Result as StdResult, sync::Arc};
use std::{collections::HashMap, result::Result as StdResult};
use std::{num::NonZeroU64, sync::Arc};
use tracing::{debug, error};
#[cfg(feature = "twilight")]
use twilight_gateway::{Cluster, Shard as TwilightShard};
#[cfg(feature = "twilight")]
use twilight_model::{
gateway::payload::outgoing::update_voice_state::UpdateVoiceState as TwilightVoiceState,
id::ChannelId as TwilightChannel,
};
#[derive(Debug)]
#[derive(Derivative)]
#[derivative(Debug)]
#[non_exhaustive]
/// Source of individual shard connection handles.
pub enum Sharder {
@@ -23,19 +35,35 @@ pub enum Sharder {
Serenity(SerenitySharder),
#[cfg(feature = "twilight")]
/// Twilight-specific wrapper for sharder state initialised by the user.
Twilight(Cluster),
TwilightCluster(Arc<Cluster>),
#[cfg(feature = "twilight")]
/// Twilight-specific wrapper for a single shard initialised by the user.
TwilightShard(Arc<TwilightShard>),
/// A generic shard handle source.
Generic(#[derivative(Debug = "ignore")] Arc<dyn GenericSharder + Send + Sync>),
}
/// Trait for a generic shard cluster or other handle source.
///
/// This allows any Discord library to be integrated with Songbird, and offers a source
/// of generic shard handles.
#[async_trait]
pub trait GenericSharder {
/// Get access to a new shard
fn get_shard(&self, shard_id: u64) -> Option<Arc<dyn VoiceUpdate + Send + Sync>>;
}
impl Sharder {
#[allow(unreachable_patterns)]
/// Returns a new handle to the required inner shard.
pub fn get_shard(&self, shard_id: u64) -> Option<Shard> {
match self {
#[cfg(feature = "serenity")]
Sharder::Serenity(s) => Some(Shard::Serenity(s.get_or_insert_shard_handle(shard_id))),
#[cfg(feature = "twilight")]
Sharder::Twilight(t) => t.shard(shard_id).map(Shard::Twilight),
_ => None,
Sharder::TwilightCluster(t) => Some(Shard::TwilightCluster(t.clone(), shard_id)),
#[cfg(feature = "twilight")]
Sharder::TwilightShard(t) => Some(Shard::TwilightShard(t.clone())),
Sharder::Generic(src) => src.get_shard(shard_id).map(Shard::Generic),
}
}
}
@@ -95,7 +123,8 @@ impl SerenitySharder {
}
}
#[derive(Clone, Debug)]
#[derive(Derivative)]
#[derivative(Debug)]
#[non_exhaustive]
/// A reference to an individual websocket connection.
pub enum Shard {
@@ -104,24 +133,106 @@ pub enum Shard {
Serenity(Arc<SerenityShardHandle>),
#[cfg(feature = "twilight")]
/// Handle to a twilight shard spawned from a cluster.
Twilight(TwilightShard),
TwilightCluster(Arc<Cluster>, u64),
#[cfg(feature = "twilight")]
/// Handle to a twilight shard spawned from a cluster.
TwilightShard(Arc<TwilightShard>),
/// Handle to a generic shard instance.
Generic(#[derivative(Debug = "ignore")] Arc<dyn VoiceUpdate + Send + Sync>),
}
impl Shard {
#[allow(unreachable_patterns)]
/// Send a JSON message to the inner shard handle.
pub async fn send(&mut self, msg: Value) -> JoinResult<()> {
impl Clone for Shard {
fn clone(&self) -> Self {
use Shard::*;
match self {
#[cfg(feature = "serenity")]
Shard::Serenity(s) => s.send(InterMessage::Json(msg))?,
Serenity(handle) => Serenity(Arc::clone(handle)),
#[cfg(feature = "twilight")]
Shard::Twilight(t) => t.command(&msg).await?,
_ => return Err(JoinError::NoSender),
TwilightCluster(handle, id) => TwilightCluster(Arc::clone(handle), *id),
#[cfg(feature = "twilight")]
TwilightShard(handle) => TwilightShard(Arc::clone(handle)),
Generic(handle) => Generic(Arc::clone(handle)),
}
Ok(())
}
}
#[async_trait]
impl VoiceUpdate for Shard {
async fn update_voice_state(
&self,
guild_id: GuildId,
channel_id: Option<ChannelId>,
self_deaf: bool,
self_mute: bool,
) -> JoinResult<()> {
let nz_guild_id = NonZeroU64::new(guild_id.0).ok_or(JoinError::IllegalGuild)?;
let nz_channel_id = match channel_id {
Some(c) => Some(NonZeroU64::new(c.0).ok_or(JoinError::IllegalChannel)?),
None => None,
};
match self {
#[cfg(feature = "serenity")]
Shard::Serenity(handle) => {
let map = json!({
"op": 4,
"d": {
"channel_id": channel_id.map(|c| c.0),
"guild_id": guild_id.0,
"self_deaf": self_deaf,
"self_mute": self_mute,
}
});
handle.send(InterMessage::Json(map))?;
Ok(())
},
#[cfg(feature = "twilight")]
Shard::TwilightCluster(handle, shard_id) => {
let channel_id = nz_channel_id.map(TwilightChannel);
let cmd = TwilightVoiceState::new(nz_guild_id, channel_id, self_deaf, self_mute);
handle.command(*shard_id, &cmd).await?;
Ok(())
},
#[cfg(feature = "twilight")]
Shard::TwilightShard(handle) => {
let channel_id = nz_channel_id.map(TwilightChannel);
let cmd = TwilightVoiceState::new(nz_guild_id, channel_id, self_deaf, self_mute);
handle.command(&cmd).await?;
Ok(())
},
Shard::Generic(g) =>
g.update_voice_state(guild_id, channel_id, self_deaf, self_mute)
.await,
}
}
}
/// Trait for a generic shard handle to send voice state updates to Discord.
///
/// This allows any Discord library to be integrated with Songbird, and is intended to
/// wrap a message channel to a single shard. Songbird only needs to send `VoiceStateUpdate`s
/// to Discord to function.
///
/// Generic libraries must be sure to call [`Call::update_server`] and [`Call::update_state`]
/// in response to their own received messages.
///
/// [`Call::update_server`]: crate::Call::update_server
/// [`Call::update_state`]: crate::Call::update_state
#[async_trait]
pub trait VoiceUpdate {
/// Send a voice update message to the inner shard handle.
async fn update_voice_state(
&self,
guild_id: GuildId,
channel_id: Option<ChannelId>,
self_deaf: bool,
self_mute: bool,
) -> JoinResult<()>;
}
#[cfg(feature = "serenity")]
/// Handle to an individual shard designed to buffer unsent messages while
/// a reconnect/rebalance is ongoing.