diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index d14ac6e..78855d5 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -35,13 +35,12 @@ Audio processing remains synchronous for the following reasons: Songbird subdivides voice connection handling into several long- and short-lived tasks. * **Core**: Handles and directs commands received from the driver. Responsible for connection/reconnection, and creates network tasks. -* **Mixer**: Combines audio sources together, Opus encodes the result, and encrypts the built packets every 20ms. Responsible for handling track commands/state. ***Synchronous***. +* **Mixer**: Combines audio sources together, Opus encodes the result, and encrypts the built packets every 20ms. Responsible for handling track commands/state, and transmitting completed voice packets and keepalive messages. ***Synchronous***. * **Thread Pool**: A dynamically sized thread-pool for I/O tasks. Creates lazy tracks using `Compose` if sync creation is needed, otherwise spawns a tokio task. Seek operations always go to the thread pool. ***Synchronous***. * **Disposer**: Used by mixer thread to dispose of data with potentially long/blocking `Drop` implementations (i.e., audio sources). ***Synchronous***. * **Events**: Stores and runs event handlers, tracks event timing, and handles * **Websocket**: *Network task.* Sends speaking status updates and keepalives to Discord, and receives client (dis)connect events. -* **UDP Tx**: *Network task.* Responsible for transmitting completed voice packets. -* **UDP Rx**: *Network task.* Decrypts/decodes received voice packets and statistics information. +* **UDP Rx**: *Optional network task.* Decrypts/decodes received voice packets and statistics information. 
*Note: all tasks are able to message the permanent tasks via a block of interconnecting channels.* diff --git a/Cargo.toml b/Cargo.toml index 3a7629b..f38dabd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ audiopus = { optional = true, version = "0.3.0-rc.0" } byteorder = { optional = true, version = "1" } dashmap = { optional = true, version = "5" } derivative = "2" -discortp = { features = ["discord-full"], optional = true, version = "0.5" } +discortp = { default-features = false, features = ["discord", "pnet", "rtp"], optional = true, version = "0.5" } flume = { optional = true, version = "0.10" } futures = "0.3" once_cell = { optional = true, version = "1" } @@ -32,8 +32,9 @@ rubato = { optional = true, version = "0.12" } rusty_pool = { optional = true, version = "0.7" } serde = { version = "1", features = ["derive"] } serde-aux = { default-features = false, optional = true, version = "3"} -simd-json = { features = ["serde_impl"], optional = true, version = "0.6.0" } serde_json = "1" +simd-json = { features = ["serde_impl"], optional = true, version = "0.6.0" } +socket2 = { optional = true, version = "0.4" } streamcatcher = { optional = true, version = "1" } tokio = { default-features = false, optional = true, version = "1.0" } tokio-tungstenite = { optional = true, version = "0.17" } @@ -110,6 +111,7 @@ driver = [ "dep:rusty_pool", "dep:serde-aux", "dep:serenity-voice-model", + "dep:socket2", "dep:streamcatcher", "dep:symphonia", "dep:symphonia-core", @@ -145,9 +147,10 @@ twilight = ["dep:twilight-gateway","dep:twilight-model"] # Behaviour altering features. builtin-queue = [] +receive = ["discortp?/demux", "discortp?/rtcp"] # Used for docgen/testing/benchmarking. 
-full-doc = ["default", "twilight", "builtin-queue"] +full-doc = ["default", "twilight", "builtin-queue", "receive"] internals = [] [[bench]] diff --git a/README.md b/README.md index 4ce592b..cb6b230 100644 --- a/README.md +++ b/README.md @@ -12,11 +12,11 @@ The library offers: * A standalone driver for voice calls, via the `"driver"` feature. If you can create a `ConnectionInfo` using any other gateway, or language for your bot, then you can run the songbird voice driver. - * And, by default, a fully featured voice system featuring events, queues, RT(C)P packet - handling, seeking on compatible streams, shared multithreaded audio stream caches, + * Voice receive and RT(C)P packet handling via the `"receive"` feature. + * SIMD-accelerated JSON decoding via the `"simd-json"` feature. + * And, by default, a fully featured voice system featuring events, queues, + seeking on compatible streams, shared multithreaded audio stream caches, and direct Opus data passthrough from DCA files. - * To be able to use `simd-json` from serenity, you will need to enable the `simdjson` - feature on both songbird and serenity. ## Intents Songbird's gateway functionality requires you to specify the `GUILD_VOICE_STATES` intent. 
diff --git a/benches/mixing-task.rs b/benches/mixing-task.rs index 0368392..d07db40 100644 --- a/benches/mixing-task.rs +++ b/benches/mixing-task.rs @@ -25,7 +25,7 @@ use songbird::{ tracks, Config, }; -use std::io::Cursor; +use std::{io::Cursor, net::UdpSocket}; use tokio::runtime::{Handle, Runtime}; use xsalsa20poly1305::{aead::NewAead, XSalsa20Poly1305 as Cipher, KEY_SIZE}; @@ -41,14 +41,12 @@ fn dummied_mixer( Receiver, Receiver, Receiver, - Receiver, ), ) { let (mix_tx, mix_rx) = flume::unbounded(); let (core_tx, core_rx) = flume::unbounded(); let (event_tx, event_rx) = flume::unbounded(); - let (udp_sender_tx, udp_sender_rx) = flume::unbounded(); let (udp_receiver_tx, udp_receiver_rx) = flume::unbounded(); let ic = Interconnect { @@ -61,18 +59,23 @@ fn dummied_mixer( let mut out = Mixer::new(mix_rx, handle, ic, config); + let udp_tx = UdpSocket::bind("0.0.0.0:0").expect("Failed to create send port."); + udp_tx + .connect("127.0.0.1:5316") + .expect("Failed to connect to local dest port."); + let fake_conn = MixerConnection { cipher: Cipher::new_from_slice(&vec![0u8; KEY_SIZE]).unwrap(), crypto_state: CryptoState::Normal, udp_rx: udp_receiver_tx, - udp_tx: udp_sender_tx, + udp_tx, }; out.conn_active = Some(fake_conn); out.skip_sleep = true; - (out, (core_rx, event_rx, udp_receiver_rx, udp_sender_rx)) + (out, (core_rx, event_rx, udp_receiver_rx)) } fn mixer_float( @@ -85,7 +88,6 @@ fn mixer_float( Receiver, Receiver, Receiver, - Receiver, ), ) { let mut out = dummied_mixer(handle, softclip); @@ -115,7 +117,6 @@ fn mixer_float_drop( Receiver, Receiver, Receiver, - Receiver, ), ) { let mut out = dummied_mixer(handle, true); @@ -143,7 +144,6 @@ fn mixer_opus( Receiver, Receiver, Receiver, - Receiver, ), ) { // should add a single opus-based track. 
diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 9737722..02ff869 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -6,3 +6,6 @@ members = [ "serenity/voice_receive", "twilight", ] + +[profile.release] +debug = true diff --git a/examples/serenity/voice_receive/Cargo.toml b/examples/serenity/voice_receive/Cargo.toml index c616efb..f0eec66 100644 --- a/examples/serenity/voice_receive/Cargo.toml +++ b/examples/serenity/voice_receive/Cargo.toml @@ -10,6 +10,7 @@ tracing-subscriber = "0.2" tracing-futures = "0.2" [dependencies.songbird] +features = ["receive"] path = "../../../" [dependencies.serenity] diff --git a/images/arch.afdesign b/images/arch.afdesign index 01947cf..d7ab5a0 100644 Binary files a/images/arch.afdesign and b/images/arch.afdesign differ diff --git a/images/driver.png b/images/driver.png index 56dd68b..b06c2c2 100644 Binary files a/images/driver.png and b/images/driver.png differ diff --git a/images/driver.svg b/images/driver.svg index 0fc7a41..c3b6faa 100644 --- a/images/driver.svg +++ b/images/driver.svg @@ -113,9 +113,6 @@ - - - @@ -132,21 +129,10 @@ UDP Rx - - - - - - UDP Tx - - - - - diff --git a/images/gateway.png b/images/gateway.png index d3e91d2..79f5a4f 100644 Binary files a/images/gateway.png and b/images/gateway.png differ diff --git a/src/config.rs b/src/config.rs index 1264aea..f89ae5c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,8 @@ +#[cfg(feature = "receive")] +use crate::driver::DecodeMode; #[cfg(feature = "driver")] use crate::{ - driver::{retry::Retry, CryptoMode, DecodeMode, MixMode}, + driver::{retry::Retry, CryptoMode, MixMode}, input::codecs::*, }; @@ -29,7 +31,8 @@ pub struct Config { /// /// [`CryptoMode::Normal`]: CryptoMode::Normal pub crypto_mode: CryptoMode, - #[cfg(feature = "driver")] + + #[cfg(all(feature = "driver", feature = "receive"))] /// Configures whether decoding and decryption occur for all received packets. 
/// /// If voice receiving voice packets, generally you should choose [`DecodeMode::Decode`]. @@ -45,6 +48,7 @@ pub struct Config { /// [`DecodeMode::Pass`]: DecodeMode::Pass /// [user speaking events]: crate::events::CoreEvent::SpeakingUpdate pub decode_mode: DecodeMode, + #[cfg(feature = "gateway")] /// Configures the amount of time to wait for Discord to reply with connection information /// if [`Call::join`]/[`join_gateway`] are used. @@ -58,14 +62,16 @@ pub struct Config { /// [`Call::join`]: crate::Call::join /// [`join_gateway`]: crate::Call::join_gateway pub gateway_timeout: Option, + #[cfg(feature = "driver")] - /// Configures the maximum amount of time to wait for an attempted voice - /// connection to Discord. + /// Configures whether the driver will mix and output stereo or mono Opus data + /// over a voice channel. /// /// Defaults to [`Stereo`]. /// /// [`Stereo`]: MixMode::Stereo pub mix_mode: MixMode, + #[cfg(feature = "driver")] /// Number of concurrently active tracks to allocate memory for. /// @@ -79,6 +85,7 @@ pub struct Config { /// Changes to this field in a running driver will only ever increase /// the capacity of the track store. pub preallocated_tracks: usize, + #[cfg(feature = "driver")] /// Connection retry logic for the [`Driver`]. /// @@ -87,6 +94,7 @@ pub struct Config { /// /// [`Driver`]: crate::driver::Driver pub driver_retry: Retry, + #[cfg(feature = "driver")] /// Configures whether or not each mixed audio packet is [soft-clipped] into the /// [-1, 1] audio range. @@ -101,12 +109,14 @@ pub struct Config { /// /// [soft-clipped]: https://opus-codec.org/docs/opus_api-1.3.1/group__opus__decoder.html#gaff99598b352e8939dded08d96e125e0b pub use_softclip: bool, + #[cfg(feature = "driver")] /// Configures the maximum amount of time to wait for an attempted voice /// connection to Discord. /// /// Defaults to 10 seconds. If set to `None`, connections will never time out. 
pub driver_timeout: Option, + #[cfg(feature = "driver")] #[derivative(Debug = "ignore")] /// Registry of the inner codecs supported by the driver, adding audiopus-based @@ -116,6 +126,7 @@ pub struct Config { /// /// [`CODEC_REGISTRY`]: static@CODEC_REGISTRY pub codec_registry: &'static CodecRegistry, + #[cfg(feature = "driver")] #[derivative(Debug = "ignore")] /// Registry of the muxers and container formats supported by the driver. @@ -142,7 +153,7 @@ impl Default for Config { Self { #[cfg(feature = "driver")] crypto_mode: CryptoMode::Normal, - #[cfg(feature = "driver")] + #[cfg(all(feature = "driver", feature = "receive"))] decode_mode: DecodeMode::Decrypt, #[cfg(feature = "gateway")] gateway_timeout: Some(Duration::from_secs(10)), @@ -179,6 +190,7 @@ impl Config { self } + #[cfg(feature = "receive")] /// Sets this `Config`'s received packet decryption/decoding behaviour. #[must_use] pub fn decode_mode(mut self, decode_mode: DecodeMode) -> Self { diff --git a/src/driver/connection/mod.rs b/src/driver/connection/mod.rs index 9a92a25..4c2f1d3 100644 --- a/src/driver/connection/mod.rs +++ b/src/driver/connection/mod.rs @@ -1,7 +1,9 @@ pub mod error; +#[cfg(feature = "receive")] +use super::tasks::udp_rx; use super::{ - tasks::{message::*, udp_rx, udp_tx, ws as ws_task}, + tasks::{message::*, ws as ws_task}, Config, CryptoMode, }; @@ -18,7 +20,8 @@ use crate::{ use discortp::discord::{IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket}; use error::{Error, Result}; use flume::Sender; -use std::{net::IpAddr, str::FromStr, sync::Arc}; +use socket2::Socket; +use std::{net::IpAddr, str::FromStr}; use tokio::{net::UdpSocket, spawn, time::timeout}; use tracing::{debug, info, instrument}; use url::Url; @@ -103,6 +106,16 @@ impl Connection { } let udp = UdpSocket::bind("0.0.0.0:0").await?; + + // Optimisation for non-receive case: set rx buffer size to zero. 
+ let udp = if cfg!(feature = "receive") { + udp + } else { + let socket = Socket::from(udp.into_std()?); + socket.set_recv_buffer_size(0)?; + UdpSocket::from_std(socket.into())? + }; + udp.connect((ready.ip, ready.port)).await?; // Follow Discord's IP Discovery procedures, in case NAT tunnelling is needed. @@ -164,22 +177,36 @@ impl Connection { info!("WS heartbeat duration {}ms.", hello.heartbeat_interval,); let (ws_msg_tx, ws_msg_rx) = flume::unbounded(); - let (udp_sender_msg_tx, udp_sender_msg_rx) = flume::unbounded(); + #[cfg(feature = "receive")] let (udp_receiver_msg_tx, udp_receiver_msg_rx) = flume::unbounded(); + // NOTE: This causes the UDP Socket on "receive" to be non-blocking, + // and the standard to be blocking. A UDP send should only WouldBlock if + // you're sending more data than the OS can handle (not likely, and + // at that point you should scale horizontally). + // + // If this is a problem for anyone, we can make non-blocking sends + // queue up a delayed send up to a limit. 
+ #[cfg(feature = "receive")] let (udp_rx, udp_tx) = { - let udp_rx = Arc::new(udp); - let udp_tx = Arc::clone(&udp_rx); + let udp_tx = udp.into_std()?; + let udp_rx = UdpSocket::from_std(udp_tx.try_clone()?)?; (udp_rx, udp_tx) }; + #[cfg(not(feature = "receive"))] + let udp_tx = udp.into_std()?; let ssrc = ready.ssrc; let mix_conn = MixerConnection { + #[cfg(feature = "receive")] cipher: cipher.clone(), + #[cfg(not(feature = "receive"))] + cipher, crypto_state: config.crypto_mode.into(), + #[cfg(feature = "receive")] udp_rx: udp_receiver_msg_tx, - udp_tx: udp_sender_msg_tx, + udp_tx, }; interconnect @@ -200,6 +227,7 @@ impl Connection { info.clone(), )); + #[cfg(feature = "receive")] spawn(udp_rx::runner( interconnect.clone(), udp_receiver_msg_rx, @@ -207,7 +235,6 @@ impl Connection { config.clone(), udp_rx, )); - spawn(udp_tx::runner(udp_sender_msg_rx, ssrc, udp_tx)); Ok(Connection { info, diff --git a/src/driver/crypto.rs b/src/driver/crypto.rs index d127950..db2e898 100644 --- a/src/driver/crypto.rs +++ b/src/driver/crypto.rs @@ -3,10 +3,11 @@ use byteorder::{NetworkEndian, WriteBytesExt}; use discortp::{rtp::RtpPacket, MutablePacket}; use rand::Rng; use std::num::Wrapping; +#[cfg(any(feature = "receive", test))] +use xsalsa20poly1305::Tag; use xsalsa20poly1305::{ aead::{AeadInPlace, Error as CryptoError}, Nonce, - Tag, XSalsa20Poly1305 as Cipher, NONCE_SIZE, TAG_SIZE, @@ -110,6 +111,7 @@ impl CryptoMode { } } + #[cfg(any(feature = "receive", test))] /// Decrypts a Discord RT(C)P packet using the given key. 
/// /// If successful, this returns the number of bytes to be ignored from the diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 20a3f02..4045978 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -13,6 +13,7 @@ pub mod bench_internals; pub(crate) mod connection; mod crypto; +#[cfg(feature = "receive")] mod decode_mode; mod mix_mode; pub mod retry; @@ -23,6 +24,7 @@ pub(crate) mod test_config; use connection::error::{Error, Result}; pub use crypto::CryptoMode; pub(crate) use crypto::CryptoState; +#[cfg(feature = "receive")] pub use decode_mode::DecodeMode; pub use mix_mode::MixMode; #[cfg(test)] diff --git a/src/driver/tasks/error.rs b/src/driver/tasks/error.rs index 10a1bb7..339f14f 100644 --- a/src/driver/tasks/error.rs +++ b/src/driver/tasks/error.rs @@ -2,7 +2,7 @@ use super::message::*; use crate::ws::Error as WsError; use audiopus::Error as OpusError; use flume::SendError; -use std::io::Error as IoError; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use xsalsa20poly1305::aead::Error as CryptoError; #[derive(Debug)] @@ -10,8 +10,8 @@ pub enum Recipient { AuxNetwork, Event, Mixer, + #[cfg(feature = "receive")] UdpRx, - UdpTx, } pub type Result = std::result::Result; @@ -20,6 +20,7 @@ pub type Result = std::result::Result; #[non_exhaustive] pub enum Error { Crypto(CryptoError), + #[cfg(feature = "receive")] /// Received an illegal voice packet on the voice UDP socket. 
IllegalVoicePacket, InterconnectFailure(Recipient), @@ -30,15 +31,26 @@ pub enum Error { impl Error { pub(crate) fn should_trigger_connect(&self) -> bool { - matches!( - self, - Error::InterconnectFailure(Recipient::AuxNetwork | Recipient::UdpRx | Recipient::UdpTx) - ) + match self { + Error::InterconnectFailure(Recipient::AuxNetwork) => true, + #[cfg(feature = "receive")] + Error::InterconnectFailure(Recipient::UdpRx) => true, + _ => false, + } } pub(crate) fn should_trigger_interconnect_rebuild(&self) -> bool { matches!(self, Error::InterconnectFailure(Recipient::Event)) } + + // This prevents a `WouldBlock` from triggering a full reconnect, + // instead simply dropping the packet. + pub(crate) fn disarm_would_block(self) -> Result<()> { + match self { + Self::Io(i) if i.kind() == IoErrorKind::WouldBlock => Ok(()), + e => Err(e), + } + } } impl From for Error { @@ -77,18 +89,13 @@ impl From> for Error { } } +#[cfg(feature = "receive")] impl From> for Error { fn from(_e: SendError) -> Error { Error::InterconnectFailure(Recipient::UdpRx) } } -impl From> for Error { - fn from(_e: SendError) -> Error { - Error::InterconnectFailure(Recipient::UdpTx) - } -} - impl From for Error { fn from(e: WsError) -> Error { Error::Ws(e) diff --git a/src/driver/tasks/message/mixer.rs b/src/driver/tasks/message/mixer.rs index 175295a..585e135 100644 --- a/src/driver/tasks/message/mixer.rs +++ b/src/driver/tasks/message/mixer.rs @@ -1,21 +1,24 @@ #![allow(missing_docs)] -use super::{Interconnect, TrackContext, UdpRxMessage, UdpTxMessage, WsMessage}; +#[cfg(feature = "receive")] +use super::UdpRxMessage; +use super::{Interconnect, TrackContext, WsMessage}; use crate::{ driver::{Bitrate, Config, CryptoState}, input::{AudioStreamError, Compose, Parsed}, }; use flume::Sender; -use std::sync::Arc; +use std::{net::UdpSocket, sync::Arc}; use symphonia_core::{errors::Error as SymphoniaError, formats::SeekedTo}; use xsalsa20poly1305::XSalsa20Poly1305 as Cipher; pub struct MixerConnection { pub 
cipher: Cipher, pub crypto_state: CryptoState, + #[cfg(feature = "receive")] pub udp_rx: Sender, - pub udp_tx: Sender, + pub udp_tx: UdpSocket, } pub enum MixerMessage { diff --git a/src/driver/tasks/message/mod.rs b/src/driver/tasks/message/mod.rs index 0009c17..ba69101 100644 --- a/src/driver/tasks/message/mod.rs +++ b/src/driver/tasks/message/mod.rs @@ -4,11 +4,13 @@ mod core; mod disposal; mod events; mod mixer; +#[cfg(feature = "receive")] mod udp_rx; -mod udp_tx; mod ws; -pub use self::{core::*, disposal::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*}; +#[cfg(feature = "receive")] +pub use self::udp_rx::*; +pub use self::{core::*, disposal::*, events::*, mixer::*, ws::*}; use flume::Sender; use tokio::spawn; diff --git a/src/driver/tasks/message/udp_tx.rs b/src/driver/tasks/message/udp_tx.rs deleted file mode 100644 index 16b7ad1..0000000 --- a/src/driver/tasks/message/udp_tx.rs +++ /dev/null @@ -1,4 +0,0 @@ -#![allow(missing_docs)] - -// TODO: do something cheaper. -pub type UdpTxMessage = Vec; diff --git a/src/driver/tasks/mixer/mod.rs b/src/driver/tasks/mixer/mod.rs index 7be73d5..648bfd6 100644 --- a/src/driver/tasks/mixer/mod.rs +++ b/src/driver/tasks/mixer/mod.rs @@ -10,7 +10,11 @@ use result::*; use state::*; pub use track::*; -use super::{disposal, error::Result, message::*}; +use super::{ + disposal, + error::{Error, Result}, + message::*, +}; use crate::{ constants::*, driver::MixMode, @@ -26,6 +30,7 @@ use audiopus::{ Bitrate, }; use discortp::{ + discord::MutableKeepalivePacket, rtp::{MutableRtpPacket, RtpPacket}, MutablePacket, }; @@ -73,6 +78,9 @@ pub struct Mixer { thread_pool: BlockyTaskPool, pub ws: Option>, + pub keepalive_deadline: Instant, + pub keepalive_packet: [u8; MutableKeepalivePacket::minimum_packet_size()], + pub tracks: Vec, track_handles: Vec, @@ -104,6 +112,7 @@ impl Mixer { let soft_clip = SoftClip::new(config.mix_mode.to_opus()); let mut packet = [0u8; VOICE_PACKET_MAX]; + let keepalive_packet = [0u8; 
MutableKeepalivePacket::minimum_packet_size()]; let mut rtp = MutableRtpPacket::new(&mut packet[..]).expect( "FATAL: Too few bytes in self.packet for RTP header.\ @@ -146,12 +155,14 @@ impl Mixer { SignalSpec::new_with_layout(SAMPLE_RATE_RAW as u32, Layout::Stereo), ); + let deadline = Instant::now(); + Self { bitrate, config, conn_active: None, content_prep_sequence: 0, - deadline: Instant::now(), + deadline, disposer, encoder, interconnect, @@ -165,6 +176,9 @@ impl Mixer { thread_pool, ws: None, + keepalive_deadline: deadline, + keepalive_packet, + tracks, track_handles, @@ -213,7 +227,14 @@ impl Mixer { // The above action may have invalidated the connection; need to re-check! // Also, if we're in a test mode we should unconditionally run packet mixing code. if self.conn_active.is_some() || ignore_check { - if let Err(e) = self.cycle().and_then(|_| self.audio_commands_events()) { + if let Err(e) = self + .cycle() + .and_then(|_| self.audio_commands_events()) + .and_then(|_| { + self.check_and_send_keepalive() + .or_else(Error::disarm_would_block) + }) + { events_failure |= e.should_trigger_interconnect_rebuild(); conn_failure |= e.should_trigger_connect(); @@ -313,6 +334,11 @@ impl Mixer { rtp.set_sequence(random::().into()); rtp.set_timestamp(random::().into()); self.deadline = Instant::now(); + + let mut ka = MutableKeepalivePacket::new(&mut self.keepalive_packet[..]) + .expect("FATAL: Insufficient bytes given to keepalive packet."); + ka.set_ssrc(ssrc); + self.keepalive_deadline = self.deadline + UDP_KEEPALIVE_GAP; Ok(()) }, MixerMessage::DropConn => { @@ -321,9 +347,12 @@ impl Mixer { }, MixerMessage::ReplaceInterconnect(i) => { self.prevent_events = false; + if let Some(ws) = &self.ws { conn_failure |= ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err(); } + + #[cfg(feature = "receive")] if let Some(conn) = &self.conn_active { conn_failure |= conn .udp_rx @@ -357,13 +386,19 @@ impl Mixer { ); } - self.config = Arc::new(new_config.clone()); + 
self.config = Arc::new( + #[cfg(feature = "receive")] + new_config.clone(), + #[cfg(not(feature = "receive"))] + new_config, + ); if self.tracks.capacity() < self.config.preallocated_tracks { self.tracks .reserve(self.config.preallocated_tracks - self.tracks.len()); } + #[cfg(feature = "receive")] if let Some(conn) = &self.conn_active { conn_failure |= conn .udp_rx @@ -674,7 +709,7 @@ impl Mixer { let send_buffer = self.config.use_softclip.then(|| &softclip_buffer[..]); #[cfg(test)] - if let Some(OutputMode::Raw(tx)) = &self.config.override_connection { + let send_status = if let Some(OutputMode::Raw(tx)) = &self.config.override_connection { let msg = match mix_len { MixType::Passthrough(len) if len == SILENT_FRAME.len() => OutputMessage::Silent, MixType::Passthrough(len) => { @@ -693,12 +728,18 @@ impl Mixer { }; drop(tx.send(msg.into())); + + Ok(()) } else { - self.prep_and_send_packet(send_buffer, mix_len)?; - } + self.prep_and_send_packet(send_buffer, mix_len) + }; #[cfg(not(test))] - self.prep_and_send_packet(send_buffer, mix_len)?; + let send_status = self.prep_and_send_packet(send_buffer, mix_len); + + send_status.or_else(Error::disarm_would_block)?; + + self.advance_rtp_counters(); // Zero out all planes of the mix buffer if any audio was written. if matches!(mix_len, MixType::MixedPcm(a) if a > 0) { @@ -770,25 +811,36 @@ impl Mixer { // Test mode: send unencrypted (compressed) packets to local receiver. drop(tx.send(self.packet[..index].to_vec().into())); } else { - conn.udp_tx.send(self.packet[..index].to_vec())?; + conn.udp_tx.send(&self.packet[..index])?; } #[cfg(not(test))] { // Normal operation: send encrypted payload to UDP Tx task. - - // TODO: This is dog slow, don't do this. - // Can we replace this with a shared ring buffer + semaphore? - // or the BBQueue crate? 
- conn.udp_tx.send(self.packet[..index].to_vec())?; + conn.udp_tx.send(&self.packet[..index])?; } + Ok(()) + } + + #[inline] + fn advance_rtp_counters(&mut self) { let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( "FATAL: Too few bytes in self.packet for RTP header.\ (Blame: VOICE_PACKET_MAX?)", ); rtp.set_sequence(rtp.get_sequence() + 1); rtp.set_timestamp(rtp.get_timestamp() + MONO_FRAME_SIZE as u32); + } + + #[inline] + fn check_and_send_keepalive(&mut self) -> Result<()> { + if let Some(conn) = self.conn_active.as_mut() { + if Instant::now() >= self.keepalive_deadline { + conn.udp_tx.send(&self.keepalive_packet)?; + self.keepalive_deadline += UDP_KEEPALIVE_GAP; + } + } Ok(()) } diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index 945c6e2..584375b 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -5,8 +5,8 @@ pub mod error; mod events; pub mod message; pub mod mixer; +#[cfg(feature = "receive")] pub(crate) mod udp_rx; -pub(crate) mod udp_tx; pub(crate) mod ws; use std::time::Duration; diff --git a/src/driver/tasks/udp_rx.rs b/src/driver/tasks/udp_rx.rs index 2d8e830..3a3d1fc 100644 --- a/src/driver/tasks/udp_rx.rs +++ b/src/driver/tasks/udp_rx.rs @@ -22,7 +22,7 @@ use discortp::{ PacketSize, }; use flume::Receiver; -use std::{collections::HashMap, convert::TryInto, sync::Arc}; +use std::{collections::HashMap, convert::TryInto}; use tokio::{net::UdpSocket, select}; use tracing::{error, instrument, trace, warn}; use xsalsa20poly1305::XSalsa20Poly1305 as Cipher; @@ -240,7 +240,7 @@ struct UdpRx { config: Config, packet_buffer: [u8; VOICE_PACKET_MAX], rx: Receiver, - udp_socket: Arc, + udp_socket: UdpSocket, } impl UdpRx { @@ -395,7 +395,7 @@ pub(crate) async fn runner( rx: Receiver, cipher: Cipher, config: Config, - udp_socket: Arc, + udp_socket: UdpSocket, ) { trace!("UDP receive handle started."); diff --git a/src/driver/tasks/udp_tx.rs b/src/driver/tasks/udp_tx.rs deleted file mode 100644 index 7eb9e85..0000000 
--- a/src/driver/tasks/udp_tx.rs +++ /dev/null @@ -1,63 +0,0 @@ -use super::message::*; -use crate::constants::*; -use discortp::discord::MutableKeepalivePacket; -use flume::Receiver; -use std::sync::Arc; -use tokio::{ - net::UdpSocket, - time::{timeout_at, Instant}, -}; -use tracing::{error, instrument, trace}; - -struct UdpTx { - ssrc: u32, - rx: Receiver, - udp_tx: Arc, -} - -impl UdpTx { - async fn run(&mut self) { - let mut keepalive_bytes = [0u8; MutableKeepalivePacket::minimum_packet_size()]; - let mut ka = MutableKeepalivePacket::new(&mut keepalive_bytes[..]) - .expect("FATAL: Insufficient bytes given to keepalive packet."); - ka.set_ssrc(self.ssrc); - - let mut ka_time = Instant::now() + UDP_KEEPALIVE_GAP; - - loop { - match timeout_at(ka_time, self.rx.recv_async()).await { - Err(_) => { - trace!("Sending UDP Keepalive."); - if let Err(e) = self.udp_tx.send(&keepalive_bytes[..]).await { - error!("Fatal UDP keepalive send error: {:?}.", e); - break; - } - ka_time += UDP_KEEPALIVE_GAP; - }, - Ok(Ok(p)) => - if let Err(e) = self.udp_tx.send(&p[..]).await { - error!("Fatal UDP packet send error: {:?}.", e); - break; - }, - Ok(Err(flume::RecvError::Disconnected)) => { - break; - }, - } - } - } -} - -#[instrument(skip(udp_msg_rx))] -pub(crate) async fn runner(udp_msg_rx: Receiver, ssrc: u32, udp_tx: Arc) { - trace!("UDP transmit handle started."); - - let mut txer = UdpTx { - ssrc, - rx: udp_msg_rx, - udp_tx, - }; - - txer.run().await; - - trace!("UDP transmit handle stopped."); -} diff --git a/src/events/context/data/mod.rs b/src/events/context/data/mod.rs index abc385e..6021e4d 100644 --- a/src/events/context/data/mod.rs +++ b/src/events/context/data/mod.rs @@ -3,10 +3,16 @@ //! 
[`EventContext`]: super::EventContext mod connect; mod disconnect; +#[cfg(feature = "receive")] mod rtcp; +#[cfg(feature = "receive")] mod speaking; +#[cfg(feature = "receive")] mod voice; +#[cfg(feature = "receive")] use discortp::{rtcp::Rtcp, rtp::Rtp}; -pub use self::{connect::*, disconnect::*, rtcp::*, speaking::*, voice::*}; +pub use self::{connect::*, disconnect::*}; +#[cfg(feature = "receive")] +pub use self::{rtcp::*, speaking::*, voice::*}; diff --git a/src/events/context/internal_data.rs b/src/events/context/internal_data.rs index e26d5ad..a3b71b6 100644 --- a/src/events/context/internal_data.rs +++ b/src/events/context/internal_data.rs @@ -1,6 +1,5 @@ use super::context_data::*; use crate::ConnectionInfo; -use discortp::{rtcp::Rtcp, rtp::Rtp}; #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct InternalConnect { @@ -15,27 +14,6 @@ pub struct InternalDisconnect { pub info: ConnectionInfo, } -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct InternalSpeakingUpdate { - pub ssrc: u32, - pub speaking: bool, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct InternalVoicePacket { - pub audio: Option>, - pub packet: Rtp, - pub payload_offset: usize, - pub payload_end_pad: usize, -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct InternalRtcpPacket { - pub packet: Rtcp, - pub payload_offset: usize, - pub payload_end_pad: usize, -} - impl<'a> From<&'a InternalConnect> for ConnectData<'a> { fn from(val: &'a InternalConnect) -> Self { Self { @@ -60,32 +38,62 @@ impl<'a> From<&'a InternalDisconnect> for DisconnectData<'a> { } } -impl<'a> From<&'a InternalSpeakingUpdate> for SpeakingUpdateData { - fn from(val: &'a InternalSpeakingUpdate) -> Self { - Self { - speaking: val.speaking, - ssrc: val.ssrc, +#[cfg(feature = "receive")] +mod receive { + use super::*; + use discortp::{rtcp::Rtcp, rtp::Rtp}; + + #[derive(Clone, Debug, Eq, Hash, PartialEq)] + pub struct InternalSpeakingUpdate { + pub ssrc: u32, + pub speaking: bool, + } + + #[derive(Clone, 
Debug, Eq, PartialEq)] + pub struct InternalVoicePacket { + pub audio: Option>, + pub packet: Rtp, + pub payload_offset: usize, + pub payload_end_pad: usize, + } + + #[derive(Clone, Debug, Eq, PartialEq)] + pub struct InternalRtcpPacket { + pub packet: Rtcp, + pub payload_offset: usize, + pub payload_end_pad: usize, + } + + impl<'a> From<&'a InternalSpeakingUpdate> for SpeakingUpdateData { + fn from(val: &'a InternalSpeakingUpdate) -> Self { + Self { + speaking: val.speaking, + ssrc: val.ssrc, + } + } + } + + impl<'a> From<&'a InternalVoicePacket> for VoiceData<'a> { + fn from(val: &'a InternalVoicePacket) -> Self { + Self { + audio: &val.audio, + packet: &val.packet, + payload_offset: val.payload_offset, + payload_end_pad: val.payload_end_pad, + } + } + } + + impl<'a> From<&'a InternalRtcpPacket> for RtcpData<'a> { + fn from(val: &'a InternalRtcpPacket) -> Self { + Self { + packet: &val.packet, + payload_offset: val.payload_offset, + payload_end_pad: val.payload_end_pad, + } } } } -impl<'a> From<&'a InternalVoicePacket> for VoiceData<'a> { - fn from(val: &'a InternalVoicePacket) -> Self { - Self { - audio: &val.audio, - packet: &val.packet, - payload_offset: val.payload_offset, - payload_end_pad: val.payload_end_pad, - } - } -} - -impl<'a> From<&'a InternalRtcpPacket> for RtcpData<'a> { - fn from(val: &'a InternalRtcpPacket) -> Self { - Self { - packet: &val.packet, - payload_offset: val.payload_offset, - payload_end_pad: val.payload_end_pad, - } - } -} +#[cfg(feature = "receive")] +pub use receive::*; diff --git a/src/events/context/mod.rs b/src/events/context/mod.rs index 693dc03..2ea0921 100644 --- a/src/events/context/mod.rs +++ b/src/events/context/mod.rs @@ -26,24 +26,35 @@ pub enum EventContext<'a> { /// [`EventStore::add_event`]: EventStore::add_event /// [`TrackHandle::add_event`]: TrackHandle::add_event Track(&'a [(&'a TrackState, &'a TrackHandle)]), + /// Speaking state update, typically describing how another voice /// user is transmitting audio data. 
Clients must send at least one such /// packet to allow SSRC/UserID matching. SpeakingStateUpdate(Speaking), + + #[cfg(feature = "receive")] /// Speaking state transition, describing whether a given source has started/stopped /// transmitting. This fires in response to a silent burst, or the first packet /// breaking such a burst. SpeakingUpdate(SpeakingUpdateData), + + #[cfg(feature = "receive")] /// Opus audio packet, received from another stream. VoicePacket(VoiceData<'a>), + + #[cfg(feature = "receive")] /// Telemetry/statistics packet, received from another stream. RtcpPacket(RtcpData<'a>), + /// Fired whenever a client disconnects. ClientDisconnect(ClientDisconnect), + /// Fires when this driver successfully connects to a voice channel. DriverConnect(ConnectData<'a>), + /// Fires when this driver successfully reconnects after a network error. DriverReconnect(ConnectData<'a>), + /// Fires when this driver fails to connect to, or drops from, a voice channel. DriverDisconnect(DisconnectData<'a>), } @@ -51,8 +62,11 @@ pub enum EventContext<'a> { #[derive(Debug)] pub enum CoreContext { SpeakingStateUpdate(Speaking), + #[cfg(feature = "receive")] SpeakingUpdate(InternalSpeakingUpdate), + #[cfg(feature = "receive")] VoicePacket(InternalVoicePacket), + #[cfg(feature = "receive")] RtcpPacket(InternalRtcpPacket), ClientDisconnect(ClientDisconnect), DriverConnect(InternalConnect), @@ -64,9 +78,12 @@ impl<'a> CoreContext { pub(crate) fn to_user_context(&'a self) -> EventContext<'a> { match self { Self::SpeakingStateUpdate(evt) => EventContext::SpeakingStateUpdate(*evt), + #[cfg(feature = "receive")] Self::SpeakingUpdate(evt) => EventContext::SpeakingUpdate(SpeakingUpdateData::from(evt)), + #[cfg(feature = "receive")] Self::VoicePacket(evt) => EventContext::VoicePacket(VoiceData::from(evt)), + #[cfg(feature = "receive")] Self::RtcpPacket(evt) => EventContext::RtcpPacket(RtcpData::from(evt)), Self::ClientDisconnect(evt) => EventContext::ClientDisconnect(*evt), 
Self::DriverConnect(evt) => EventContext::DriverConnect(ConnectData::from(evt)), @@ -84,8 +101,11 @@ impl EventContext<'_> { pub fn to_core_event(&self) -> Option<CoreEvent> { match self { Self::SpeakingStateUpdate(_) => Some(CoreEvent::SpeakingStateUpdate), + #[cfg(feature = "receive")] Self::SpeakingUpdate(_) => Some(CoreEvent::SpeakingUpdate), + #[cfg(feature = "receive")] Self::VoicePacket(_) => Some(CoreEvent::VoicePacket), + #[cfg(feature = "receive")] Self::RtcpPacket(_) => Some(CoreEvent::RtcpPacket), Self::ClientDisconnect(_) => Some(CoreEvent::ClientDisconnect), Self::DriverConnect(_) => Some(CoreEvent::DriverConnect), diff --git a/src/events/core.rs b/src/events/core.rs index b73ac0b..cec06d1 100644 --- a/src/events/core.rs +++ b/src/events/core.rs @@ -6,8 +6,26 @@ /// /// ## Events from other users /// Songbird can observe when a user *speaks for the first time* ([`SpeakingStateUpdate`]), -/// when a client leaves the session ([`ClientDisconnect`]), voice packets ([`VoicePacket`]), and -/// telemetry data ([`RtcpPacket`]). The format of voice packets is described by [`VoiceData`]. +/// and when a client leaves the session ([`ClientDisconnect`]). +/// +/// When the `"receive"` feature is enabled, songbird can also handle voice packets +#[cfg_attr(feature = "receive", doc = "([`VoicePacket`](Self::VoicePacket)),")] +#[cfg_attr(not(feature = "receive"), doc = "(`VoicePacket`),")] +/// detect speech starting/stopping +#[cfg_attr( + feature = "receive", + doc = "([`SpeakingUpdate`](Self::SpeakingUpdate))," +)] +#[cfg_attr(not(feature = "receive"), doc = "(`SpeakingUpdate`),")] +/// and handle telemetry data +#[cfg_attr(feature = "receive", doc = "([`RtcpPacket`](Self::RtcpPacket)).")] +#[cfg_attr(not(feature = "receive"), doc = "(`RtcpPacket`).")] +/// The format of voice packets is described by +#[cfg_attr( + feature = "receive", + doc = "[`VoiceData`](super::context::data::VoiceData)."
+)] +#[cfg_attr(not(feature = "receive"), doc = "`VoiceData`.")] /// /// To detect when a user connects, you must correlate gateway (e.g., `VoiceStateUpdate`) events /// from the main part of your bot. @@ -15,15 +33,12 @@ /// To obtain a user's SSRC, you must use [`SpeakingStateUpdate`] events. /// /// [`EventData`]: super::EventData -/// [`VoiceData`]: super::context::data::VoiceData /// [`SpeakingStateUpdate`]: Self::SpeakingStateUpdate /// [`ClientDisconnect`]: Self::ClientDisconnect -/// [`VoicePacket`]: Self::VoicePacket -/// [`RtcpPacket`]: Self::RtcpPacket #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] #[non_exhaustive] pub enum CoreEvent { - /// Speaking state update, typically describing how another voice + /// Speaking state update from the WS gateway, typically describing how another voice /// user is transmitting audio data. Clients must send at least one such /// packet to allow SSRC/UserID matching. /// @@ -32,24 +47,34 @@ pub enum CoreEvent { /// Note: this will fire when a user starts speaking for the first time, /// or changes their capabilities. SpeakingStateUpdate, + + #[cfg(feature = "receive")] /// Fires when a source starts speaking, or stops speaking /// (*i.e.*, 5 consecutive silent frames). SpeakingUpdate, + + #[cfg(feature = "receive")] /// Fires on receipt of a voice packet from another stream in the voice call. /// /// As RTP packets do not map to Discord's notion of users, SSRCs must be mapped /// back using the user IDs seen through client connection, disconnection, /// or speaking state update. VoicePacket, + + #[cfg(feature = "receive")] /// Fires on receipt of an RTCP packet, containing various call stats /// such as latency reports. RtcpPacket, + /// Fires whenever a user disconnects from the same stream as the bot. ClientDisconnect, + /// Fires when this driver successfully connects to a voice channel. DriverConnect, + /// Fires when this driver successfully reconnects after a network error. 
DriverReconnect, + /// Fires when this driver fails to connect to, or drops from, a voice channel. DriverDisconnect, } diff --git a/src/lib.rs b/src/lib.rs index 6310800..54324b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,11 +13,13 @@ //! `"gateway"` and `"[serenity/twilight]"` plus `"[rustls/native]"` features. You can even run //! driverless, to help manage your [lavalink] sessions. //! * A standalone driver for voice calls, via the `"driver"` feature. If you can create -//! a [`ConnectionInfo`] using any other gateway, or language for your bot, then you +//! a `ConnectionInfo` using any other gateway, or language for your bot, then you //! can run the songbird voice driver. -//! * And, by default, a fully featured voice system featuring events, queues, RT(C)P packet -//! handling, seeking on compatible streams, shared multithreaded audio stream caches, -//! and direct Opus data passthrough. +//! * Voice receive and RT(C)P packet handling via the `"receive"` feature. +//! * SIMD-accelerated JSON decoding via the `"simd-json"` feature. +//! * And, by default, a fully featured voice system featuring events, queues, +//! seeking on compatible streams, shared multithreaded audio stream caches, +//! and direct Opus data passthrough from DCA files. //! //! ## Intents //! Songbird's gateway functionality requires you to specify the `GUILD_VOICE_STATES` intent. @@ -101,7 +103,7 @@ pub mod tracks; #[cfg(feature = "driver")] mod ws; -#[cfg(feature = "driver")] +#[cfg(all(feature = "driver", feature = "receive"))] pub use discortp as packet; #[cfg(feature = "driver")] pub use serenity_voice_model as model;