Discord will send `GatewayEvent::Speaking` (opcode 5) messages after the Hello+Ready exchange, but will happily interleave them with crypto mode negotiation. We were previously not expecting such messages and dropping them -- this hurts receive-based bots' ability to map SSRCs to UserIds when joining a call with existing users. This PR feeds all unexpected messages into the WS task directly, which will handle them once all tasks are fully started.
330 lines
11 KiB
Rust
330 lines
11 KiB
Rust
//! Requires the "client", "standard_framework", and "voice" features be enabled
|
|
//! in your Cargo.toml, like so:
|
|
//!
|
|
//! ```toml
|
|
//! [dependencies.serenity]
|
|
//! git = "https://github.com/serenity-rs/serenity.git"
|
|
//! features = ["client", "standard_framework", "voice"]
|
|
//! ```
|
|
use std::{
|
|
env,
|
|
sync::{
|
|
atomic::{AtomicBool, Ordering},
|
|
Arc,
|
|
},
|
|
};
|
|
|
|
use dashmap::DashMap;
|
|
|
|
use serenity::{
|
|
async_trait,
|
|
client::{Client, Context, EventHandler},
|
|
framework::{
|
|
standard::{
|
|
macros::{command, group},
|
|
Args,
|
|
CommandResult,
|
|
Configuration,
|
|
},
|
|
StandardFramework,
|
|
},
|
|
model::{channel::Message, gateway::Ready, id::ChannelId},
|
|
prelude::{GatewayIntents, Mentionable},
|
|
Result as SerenityResult,
|
|
};
|
|
|
|
use songbird::{
|
|
driver::DecodeMode,
|
|
model::{
|
|
id::UserId,
|
|
payload::{ClientDisconnect, Speaking},
|
|
},
|
|
packet::Packet,
|
|
Config,
|
|
CoreEvent,
|
|
Event,
|
|
EventContext,
|
|
EventHandler as VoiceEventHandler,
|
|
SerenityInit,
|
|
};
|
|
|
|
struct Handler;
|
|
|
|
#[async_trait]
|
|
impl EventHandler for Handler {
|
|
async fn ready(&self, _: Context, ready: Ready) {
|
|
println!("{} is connected!", ready.user.name);
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct Receiver {
|
|
inner: Arc<InnerReceiver>,
|
|
}
|
|
|
|
struct InnerReceiver {
|
|
last_tick_was_empty: AtomicBool,
|
|
known_ssrcs: DashMap<u32, UserId>,
|
|
}
|
|
|
|
impl Receiver {
|
|
pub fn new() -> Self {
|
|
// You can manage state here, such as a buffer of audio packet bytes so
|
|
// you can later store them in intervals.
|
|
Self {
|
|
inner: Arc::new(InnerReceiver {
|
|
last_tick_was_empty: AtomicBool::default(),
|
|
known_ssrcs: DashMap::new(),
|
|
}),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl VoiceEventHandler for Receiver {
|
|
#[allow(unused_variables)]
|
|
async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
|
|
use EventContext as Ctx;
|
|
match ctx {
|
|
Ctx::SpeakingStateUpdate(Speaking {
|
|
speaking,
|
|
ssrc,
|
|
user_id,
|
|
..
|
|
}) => {
|
|
// Discord voice calls use RTP, where every sender uses a randomly allocated
|
|
// *Synchronisation Source* (SSRC) to allow receivers to tell which audio
|
|
// stream a received packet belongs to. As this number is not derived from
|
|
// the sender's user_id, only Discord Voice Gateway messages like this one
|
|
// inform us about which random SSRC a user has been allocated. Future voice
|
|
// packets will contain *only* the SSRC.
|
|
//
|
|
// You can implement logic here so that you can differentiate users'
|
|
// SSRCs and map the SSRC to the User ID and maintain this state.
|
|
// Using this map, you can map the `ssrc` in `voice_packet`
|
|
// to the user ID and handle their audio packets separately.
|
|
println!(
|
|
"Speaking state update: user {:?} has SSRC {:?}, using {:?}",
|
|
user_id, ssrc, speaking,
|
|
);
|
|
|
|
if let Some(user) = user_id {
|
|
self.inner.known_ssrcs.insert(*ssrc, *user);
|
|
}
|
|
},
|
|
Ctx::VoiceTick(tick) => {
|
|
let speaking = tick.speaking.len();
|
|
let total_participants = speaking + tick.silent.len();
|
|
let last_tick_was_empty = self.inner.last_tick_was_empty.load(Ordering::SeqCst);
|
|
|
|
if speaking == 0 && !last_tick_was_empty {
|
|
println!("No speakers");
|
|
|
|
self.inner.last_tick_was_empty.store(true, Ordering::SeqCst);
|
|
} else if speaking != 0 {
|
|
self.inner
|
|
.last_tick_was_empty
|
|
.store(false, Ordering::SeqCst);
|
|
|
|
println!("Voice tick ({speaking}/{total_participants} live):");
|
|
|
|
// You can also examine tick.silent to see users who are present
|
|
// but haven't spoken in this tick.
|
|
for (ssrc, data) in &tick.speaking {
|
|
let user_id_str = if let Some(id) = self.inner.known_ssrcs.get(ssrc) {
|
|
format!("{:?}", *id)
|
|
} else {
|
|
"?".into()
|
|
};
|
|
|
|
// This field should *always* exist under DecodeMode::Decode.
|
|
// The `else` allows you to see how the other modes are affected.
|
|
if let Some(decoded_voice) = data.decoded_voice.as_ref() {
|
|
let voice_len = decoded_voice.len();
|
|
let audio_str = format!(
|
|
"first samples from {}: {:?}",
|
|
voice_len,
|
|
&decoded_voice[..voice_len.min(5)]
|
|
);
|
|
|
|
if let Some(packet) = &data.packet {
|
|
let rtp = packet.rtp();
|
|
println!(
|
|
"\t{ssrc}/{user_id_str}: packet seq {} ts {} -- {audio_str}",
|
|
rtp.get_sequence().0,
|
|
rtp.get_timestamp().0
|
|
);
|
|
} else {
|
|
println!("\t{ssrc}/{user_id_str}: Missed packet -- {audio_str}");
|
|
}
|
|
} else {
|
|
println!("\t{ssrc}/{user_id_str}: Decode disabled.");
|
|
}
|
|
}
|
|
}
|
|
},
|
|
Ctx::RtpPacket(packet) => {
|
|
// An event which fires for every received audio packet,
|
|
// containing the decoded data.
|
|
let rtp = packet.rtp();
|
|
println!(
|
|
"Received voice packet from SSRC {}, sequence {}, timestamp {} -- {}B long",
|
|
rtp.get_ssrc(),
|
|
rtp.get_sequence().0,
|
|
rtp.get_timestamp().0,
|
|
rtp.payload().len()
|
|
);
|
|
},
|
|
Ctx::RtcpPacket(data) => {
|
|
// An event which fires for every received rtcp packet,
|
|
// containing the call statistics and reporting information.
|
|
println!("RTCP packet received: {:?}", data.packet);
|
|
},
|
|
Ctx::ClientDisconnect(ClientDisconnect { user_id, .. }) => {
|
|
// You can implement your own logic here to handle a user who has left the
|
|
// voice channel e.g., finalise processing of statistics etc.
|
|
// You will typically need to map the User ID to their SSRC; observed when
|
|
// first speaking.
|
|
|
|
println!("Client disconnected: user {:?}", user_id);
|
|
},
|
|
_ => {
|
|
// We won't be registering this struct for any more event classes.
|
|
unimplemented!()
|
|
},
|
|
}
|
|
|
|
None
|
|
}
|
|
}
|
|
|
|
#[group]
|
|
#[commands(join, leave, ping)]
|
|
struct General;
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
tracing_subscriber::fmt::init();
|
|
|
|
// Configure the client with your Discord bot token in the environment.
|
|
let token = env::var("DISCORD_TOKEN").expect("Expected a token in the environment");
|
|
|
|
let framework = StandardFramework::new().group(&GENERAL_GROUP);
|
|
framework.configure(Configuration::new().prefix("~"));
|
|
|
|
let intents = GatewayIntents::non_privileged() | GatewayIntents::MESSAGE_CONTENT;
|
|
|
|
// Here, we need to configure Songbird to decode all incoming voice packets.
|
|
// If you want, you can do this on a per-call basis---here, we need it to
|
|
// read the audio data that other people are sending us!
|
|
let songbird_config = Config::default().decode_mode(DecodeMode::Decode);
|
|
|
|
let mut client = Client::builder(&token, intents)
|
|
.event_handler(Handler)
|
|
.framework(framework)
|
|
.register_songbird_from_config(songbird_config)
|
|
.await
|
|
.expect("Err creating client");
|
|
|
|
let _ = client
|
|
.start()
|
|
.await
|
|
.map_err(|why| println!("Client ended: {:?}", why));
|
|
}
|
|
|
|
#[command]
|
|
#[only_in(guilds)]
|
|
async fn join(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult {
|
|
let Ok(connect_to) = args.single::<ChannelId>() else {
|
|
check_msg(
|
|
msg.reply(ctx, "Requires a valid voice channel ID be given")
|
|
.await,
|
|
);
|
|
|
|
return Ok(());
|
|
};
|
|
|
|
let guild_id = msg.guild_id.unwrap();
|
|
|
|
let manager = songbird::get(ctx)
|
|
.await
|
|
.expect("Songbird Voice client placed in at initialisation.")
|
|
.clone();
|
|
|
|
// Some events relating to voice receive fire *while joining*.
|
|
// We must make sure that any event handlers are installed before we attempt to join.
|
|
{
|
|
let handler_lock = manager.get_or_insert(guild_id);
|
|
let mut handler = handler_lock.lock().await;
|
|
|
|
let evt_receiver = Receiver::new();
|
|
|
|
handler.add_global_event(CoreEvent::SpeakingStateUpdate.into(), evt_receiver.clone());
|
|
handler.add_global_event(CoreEvent::RtpPacket.into(), evt_receiver.clone());
|
|
handler.add_global_event(CoreEvent::RtcpPacket.into(), evt_receiver.clone());
|
|
handler.add_global_event(CoreEvent::ClientDisconnect.into(), evt_receiver.clone());
|
|
handler.add_global_event(CoreEvent::VoiceTick.into(), evt_receiver);
|
|
}
|
|
|
|
if let Ok(handler_lock) = manager.join(guild_id, connect_to).await {
|
|
check_msg(
|
|
msg.channel_id
|
|
.say(&ctx.http, &format!("Joined {}", connect_to.mention()))
|
|
.await,
|
|
);
|
|
} else {
|
|
// Although we failed to join, we need to clear out existing event handlers on the call.
|
|
_ = manager.remove(guild_id).await;
|
|
|
|
check_msg(
|
|
msg.channel_id
|
|
.say(&ctx.http, "Error joining the channel")
|
|
.await,
|
|
);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[command]
|
|
#[only_in(guilds)]
|
|
async fn leave(ctx: &Context, msg: &Message) -> CommandResult {
|
|
let guild_id = msg.guild_id.unwrap();
|
|
|
|
let manager = songbird::get(ctx)
|
|
.await
|
|
.expect("Songbird Voice client placed in at initialisation.")
|
|
.clone();
|
|
let has_handler = manager.get(guild_id).is_some();
|
|
|
|
if has_handler {
|
|
if let Err(e) = manager.remove(guild_id).await {
|
|
check_msg(
|
|
msg.channel_id
|
|
.say(&ctx.http, format!("Failed: {:?}", e))
|
|
.await,
|
|
);
|
|
}
|
|
|
|
check_msg(msg.channel_id.say(&ctx.http, "Left voice channel").await);
|
|
} else {
|
|
check_msg(msg.reply(ctx, "Not in a voice channel").await);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[command]
|
|
async fn ping(ctx: &Context, msg: &Message) -> CommandResult {
|
|
check_msg(msg.channel_id.say(&ctx.http, "Pong!").await);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Checks that a message successfully sent; if not, then logs why to stdout.
|
|
fn check_msg(result: SerenityResult<Message>) {
|
|
if let Err(why) = result {
|
|
println!("Error sending message: {:?}", why);
|
|
}
|
|
}
|