diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 87d2a78..66d1935 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: uses: actions-rs/clippy-check@v1 with: token: ${{ secrets.GITHUB_TOKEN }} - args: --all-features + args: --features full-doc test: name: Test @@ -39,6 +39,7 @@ jobs: - Windows - driver only - gateway only + - legacy tokio include: - name: beta @@ -51,8 +52,13 @@ jobs: os: windows-latest - name: driver only features: driver rustls + dont-test: true - name: gateway only features: serenity-rustls + dont-test: true + - name: legacy tokio + features: serenity-rustls-tokio-02 driver-tokio-02 + dont-test: true steps: - name: Checkout sources @@ -84,18 +90,18 @@ jobs: - name: Build all features if: matrix.features == '' - run: cargo build --all-features + run: cargo build --features full-doc - name: Test all features if: matrix.features == '' - run: cargo test --all-features + run: cargo test --features full-doc - name: Build some features if: matrix.features run: cargo build --no-default-features --features "${{ matrix.features }}" - name: Test some features - if: matrix.features + if: ${{ !matrix.dont-test && matrix.features }} run: cargo test --no-default-features --features "${{ matrix.features }}" doc: @@ -131,7 +137,7 @@ jobs: env: RUSTDOCFLAGS: -D broken_intra_doc_links run: | - cargo doc --no-deps --all-features + cargo doc --no-deps --features full-doc examples: name: Examples diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 8f615ac..1e5fc21 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -41,7 +41,7 @@ jobs: env: RUSTDOCFLAGS: -D broken_intra_doc_links run: | - cargo doc --no-deps --features default,twilight-rustls,builtin-queue,stock-zlib + cargo doc --no-deps --features full-doc - name: Prepare docs shell: bash -e -O extglob {0} diff --git a/Cargo.toml b/Cargo.toml index e2319b6..6cf8657 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,13 @@ features = ["tokio-runtime"] optional = true version = "0.11" +[dependencies.async-tungstenite-compat] +package = "async-tungstenite" +default-features = false +features = ["tokio-runtime"] +optional = true +version = "0.9" + [dependencies.audiopus] optional = true version = "0.2" @@ -62,7 +69,7 @@ version = "0.8" [dependencies.serenity] optional = true -version = "0.10" +version = "^0.10.2" default-features = false features = ["voice", "gateway"] @@ -83,6 +90,12 @@ optional = true version = "1.0" default-features = false +[dependencies.tokio-compat] +optional = true +package = "tokio" +version = "0.2" +default-features = false + [dependencies.twilight-gateway] optional = true version = "0.3" @@ -115,20 +128,35 @@ criterion = "0.3" utils = { path = "utils" } [features] +# Core features default = [ "serenity-rustls", "driver", "gateway", ] gateway = [ + "gateway-core", + "tokio/sync", +] +gateway-core = [ "dashmap", "flume", "parking_lot", - "tokio/sync", ] driver = [ - "async-trait", "async-tungstenite", + "driver-core", + "tokio/fs", + "tokio/io-util", + "tokio/macros", + "tokio/net", + "tokio/process", + "tokio/rt", + "tokio/sync", + "tokio/time", +] +driver-core = [ + "async-trait", "audiopus", "byteorder", "discortp", @@ -138,21 +166,13 @@ driver = [ "serenity-voice-model", "spin_sleep", "streamcatcher", - "tokio/fs", - "tokio/io-util", - "tokio/macros", - "tokio/net", - "tokio/process", - "tokio/rt", - "tokio/sync", - "tokio/time", "typemap_rev", "url", "uuid", "xsalsa20poly1305", ] -rustls = ["async-tungstenite/tokio-rustls"] -native = ["async-tungstenite/tokio-native-tls"] +rustls = ["async-tungstenite/tokio-rustls", "rustls-marker"] +native = ["async-tungstenite/tokio-native-tls", "native-marker"] serenity-rustls = ["serenity/rustls_backend", "rustls", "gateway", "serenity-deps"] serenity-native = ["serenity/native_tls_backend", "native", "gateway", "serenity-deps"] twilight-rustls = ["twilight", "twilight-gateway/rustls", "rustls", "gateway"] @@ -162,9 +182,41 @@ simd-zlib = ["twilight-gateway/simd-zlib"] stock-zlib = ["twilight-gateway/stock-zlib"] serenity-deps = ["async-trait"] +rustls-marker = [] +native-marker = [] + +# Tokio 0.2 Compatibility features +# These should probably be dropped around the same time as serenity drop them. +rustls-tokio-02 = ["async-tungstenite-compat/tokio-rustls", "rustls-marker", "tokio-02-marker"] +native-tokio-02 = ["async-tungstenite-compat/tokio-native-tls", "native-marker", "tokio-02-marker"] +serenity-rustls-tokio-02 = ["serenity/rustls_tokio_0_2_backend", "rustls-tokio-02", "gateway-tokio-02", "serenity-deps"] +serenity-native-tokio-02 = ["serenity/native_tls_tokio_0_2_backend", "native-tokio-02", "gateway-tokio-02", "serenity-deps"] +gateway-tokio-02 = [ + "gateway-core", + "tokio-02-marker", + "tokio-compat/sync", +] +driver-tokio-02 = [ + "async-tungstenite-compat", + "driver-core", + "tokio-02-marker", + "tokio-compat/fs", + "tokio-compat/io-util", + "tokio-compat/macros", + "tokio-compat/net", + "tokio-compat/process", + "tokio-compat/rt-core", + "tokio-compat/sync", + "tokio-compat/time", +] +tokio-02-marker = [] + +# Behaviour altering features. youtube-dlc = [] builtin-queue = [] +# Used for docgen/testing/benchmarking. +full-doc = ["default", "twilight-rustls", "builtin-queue", "stock-zlib"] internals = [] [[bench]] @@ -179,4 +231,4 @@ required-features = ["internals"] harness = false [package.metadata.docs.rs] -features = ["default", "twilight-rustls", "builtin-queue", "stock-zlib"] +features = ["full-doc"] diff --git a/build.rs b/build.rs index 85b9e90..f969781 100644 --- a/build.rs +++ b/build.rs @@ -1,4 +1,7 @@ -#[cfg(all(feature = "driver", not(any(feature = "rustls", feature = "native"))))] +#[cfg(all( + feature = "driver", + not(any(feature = "rustls-marker", feature = "native-marker")) +))] compile_error!( "You have the `driver` feature enabled: \ either the `rustls` or `native` feature must be diff --git a/src/constants.rs b/src/constants.rs index d6b757a..9c2e834 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,16 +1,16 @@ //! Constants affecting driver function and API handling. -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] use audiopus::{Bitrate, SampleRate}; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] use discortp::rtp::RtpType; use std::time::Duration; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] /// The voice gateway version used by the library. pub const VOICE_GATEWAY_VERSION: u8 = crate::model::constants::GATEWAY_VERSION; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] /// Sample rate of audio to be sent to Discord. pub const SAMPLE_RATE: SampleRate = SampleRate::Hz48000; @@ -23,7 +23,7 @@ pub const AUDIO_FRAME_RATE: usize = 50; /// Length of time between any two audio frames. pub const TIMESTEP_LENGTH: Duration = Duration::from_millis(1000 / AUDIO_FRAME_RATE as u64); -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] /// Default bitrate for audio. pub const DEFAULT_BITRATE: Bitrate = Bitrate::BitsPerSecond(128_000); @@ -70,6 +70,6 @@ pub const SILENT_FRAME: [u8; 3] = [0xf8, 0xff, 0xfe]; /// The one (and only) RTP version. pub const RTP_VERSION: u8 = 2; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] /// Profile type used by Discord's Opus audio traffic. pub const RTP_PROFILE_TYPE: RtpType = RtpType::Dynamic(120); diff --git a/src/driver/connection/mod.rs b/src/driver/connection/mod.rs index 3563559..3f33a16 100644 --- a/src/driver/connection/mod.rs +++ b/src/driver/connection/mod.rs @@ -19,15 +19,18 @@ use discortp::discord::{IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPa use error::{Error, Result}; use flume::Sender; use std::{net::IpAddr, str::FromStr, sync::Arc}; -use tokio::net::UdpSocket; +#[cfg(not(feature = "tokio-02-marker"))] +use tokio::{net::UdpSocket, spawn}; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::{net::UdpSocket, spawn}; use tracing::{debug, info, instrument}; use url::Url; use xsalsa20poly1305::{aead::NewAead, XSalsa20Poly1305 as Cipher}; -#[cfg(all(feature = "rustls", not(feature = "native")))] +#[cfg(all(feature = "rustls-marker", not(feature = "native-marker")))] use ws::create_rustls_client; -#[cfg(feature = "native")] +#[cfg(feature = "native-marker")] use ws::create_native_tls_client; pub(crate) struct Connection { @@ -43,10 +46,10 @@ impl Connection { ) -> Result { let url = generate_url(&mut info.endpoint)?; - #[cfg(all(feature = "rustls", not(feature = "native")))] + #[cfg(all(feature = "rustls-marker", not(feature = "native-marker")))] let mut client = create_rustls_client(url).await?; - #[cfg(feature = "native")] + #[cfg(feature = "native-marker")] let mut client = create_native_tls_client(url).await?; let mut hello = None; @@ -97,7 +100,11 @@ impl Connection { return Err(Error::CryptoModeUnavailable); } + #[cfg(not(feature = "tokio-02-marker"))] let udp = UdpSocket::bind("0.0.0.0:0").await?; + #[cfg(feature = "tokio-02-marker")] + let mut udp = UdpSocket::bind("0.0.0.0:0").await?; + udp.connect((ready.ip, ready.port)).await?; // Follow Discord's IP Discovery procedures, in case NAT tunnelling is needed. @@ -124,7 +131,7 @@ impl Connection { } // We could do something clever like binary search, - // but possibility of UDP spoofing preclueds us from + // but possibility of UDP spoofing precludes us from // making the assumption we can find a "left edge" of '\0's. let nul_byte_index = view .get_address_raw() @@ -162,8 +169,14 @@ impl Connection { let (udp_sender_msg_tx, udp_sender_msg_rx) = flume::unbounded(); let (udp_receiver_msg_tx, udp_receiver_msg_rx) = flume::unbounded(); - let udp_rx = Arc::new(udp); - let udp_tx = Arc::clone(&udp_rx); + #[cfg(not(feature = "tokio-02-marker"))] + let (udp_rx, udp_tx) = { + let udp_rx = Arc::new(udp); + let udp_tx = Arc::clone(&udp_rx); + (udp_rx, udp_tx) + }; + #[cfg(feature = "tokio-02-marker")] + let (udp_rx, udp_tx) = udp.split(); let ssrc = ready.ssrc; @@ -182,7 +195,7 @@ impl Connection { .mixer .send(MixerMessage::SetConn(mix_conn, ready.ssrc))?; - tokio::spawn(ws_task::runner( + spawn(ws_task::runner( interconnect.clone(), ws_msg_rx, client, @@ -190,14 +203,14 @@ impl Connection { hello.heartbeat_interval, )); - tokio::spawn(udp_rx::runner( + spawn(udp_rx::runner( interconnect.clone(), udp_receiver_msg_rx, cipher, config.clone(), udp_rx, )); - tokio::spawn(udp_tx::runner(udp_sender_msg_rx, ssrc, udp_tx)); + spawn(udp_tx::runner(udp_sender_msg_rx, ssrc, udp_tx)); Ok(Connection { info, @@ -212,10 +225,10 @@ impl Connection { // Thread may have died, we want to send to prompt a clean exit // (if at all possible) and then proceed as normal. - #[cfg(all(feature = "rustls", not(feature = "native")))] + #[cfg(all(feature = "rustls-marker", not(feature = "native-marker")))] let mut client = create_rustls_client(url).await?; - #[cfg(feature = "native")] + #[cfg(feature = "native-marker")] let mut client = create_native_tls_client(url).await?; client diff --git a/src/driver/tasks/message/mod.rs b/src/driver/tasks/message/mod.rs index 6c20f90..9129db5 100644 --- a/src/driver/tasks/message/mod.rs +++ b/src/driver/tasks/message/mod.rs @@ -11,6 +11,10 @@ mod ws; pub use self::{core::*, disposal::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*}; use flume::Sender; +#[cfg(not(feature = "tokio-02-marker"))] +use tokio::spawn; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::spawn; use tracing::info; #[derive(Clone, Debug)] @@ -38,7 +42,7 @@ impl Interconnect { self.events = evt_tx; let ic = self.clone(); - tokio::spawn(async move { + spawn(async move { info!("Event processor restarted."); super::events::runner(ic, evt_rx).await; info!("Event processor finished."); diff --git a/src/driver/tasks/mixer.rs b/src/driver/tasks/mixer.rs index e6af8d6..77e11fb 100644 --- a/src/driver/tasks/mixer.rs +++ b/src/driver/tasks/mixer.rs @@ -18,7 +18,10 @@ use flume::{Receiver, Sender, TryRecvError}; use rand::random; use spin_sleep::SpinSleeper; use std::time::Instant; +#[cfg(not(feature = "tokio-02-marker"))] use tokio::runtime::Handle; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::runtime::Handle; use tracing::{error, instrument}; use xsalsa20poly1305::TAG_SIZE; diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index f303231..183ff6a 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -16,11 +16,14 @@ use super::{ use crate::events::CoreContext; use flume::{Receiver, RecvError, Sender}; use message::*; -use tokio::runtime::Handle; +#[cfg(not(feature = "tokio-02-marker"))] +use tokio::{runtime::Handle, spawn}; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::{runtime::Handle, spawn}; use tracing::{error, info, instrument}; pub(crate) fn start(config: Config, rx: Receiver, tx: Sender) { - tokio::spawn(async move { + spawn(async move { info!("Driver started."); runner(config, rx, tx).await; info!("Driver finished."); @@ -38,7 +41,7 @@ fn start_internals(core: Sender, config: Config) -> Interconnect { }; let ic = interconnect.clone(); - tokio::spawn(async move { + spawn(async move { info!("Event processor started."); events::runner(ic, evt_rx).await; info!("Event processor finished."); diff --git a/src/driver/tasks/udp_rx.rs b/src/driver/tasks/udp_rx.rs index 2ead054..48067be 100644 --- a/src/driver/tasks/udp_rx.rs +++ b/src/driver/tasks/udp_rx.rs @@ -21,7 +21,10 @@ use discortp::{ }; use flume::Receiver; use std::{collections::HashMap, sync::Arc}; -use tokio::net::UdpSocket; +#[cfg(not(feature = "tokio-02-marker"))] +use tokio::{net::UdpSocket, select}; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::{net::udp::RecvHalf, select}; use tracing::{error, info, instrument, warn}; use xsalsa20poly1305::XSalsa20Poly1305 as Cipher; @@ -236,14 +239,18 @@ struct UdpRx { config: Config, packet_buffer: [u8; VOICE_PACKET_MAX], rx: Receiver, + + #[cfg(not(feature = "tokio-02-marker"))] udp_socket: Arc, + #[cfg(feature = "tokio-02-marker")] + udp_socket: RecvHalf, } impl UdpRx { #[instrument(skip(self))] async fn run(&mut self, interconnect: &mut Interconnect) { loop { - tokio::select! { + select! { Ok((len, _addr)) = self.udp_socket.recv_from(&mut self.packet_buffer[..]) => { self.process_udp_message(interconnect, len); } @@ -385,6 +392,7 @@ impl UdpRx { } } +#[cfg(not(feature = "tokio-02-marker"))] #[instrument(skip(interconnect, rx, cipher))] pub(crate) async fn runner( mut interconnect: Interconnect, @@ -409,6 +417,31 @@ pub(crate) async fn runner( info!("UDP receive handle stopped."); } +#[cfg(feature = "tokio-02-marker")] +#[instrument(skip(interconnect, rx, cipher))] +pub(crate) async fn runner( + mut interconnect: Interconnect, + rx: Receiver, + cipher: Cipher, + config: Config, + udp_socket: RecvHalf, +) { + info!("UDP receive handle started."); + + let mut state = UdpRx { + cipher, + decoder_map: Default::default(), + config, + packet_buffer: [0u8; VOICE_PACKET_MAX], + rx, + udp_socket, + }; + + state.run(&mut interconnect).await; + + info!("UDP receive handle stopped."); +} + #[inline] fn rtp_valid(packet: RtpPacket<'_>) -> bool { packet.get_version() == RTP_VERSION && packet.get_payload_type() == RTP_PROFILE_TYPE diff --git a/src/driver/tasks/udp_tx.rs b/src/driver/tasks/udp_tx.rs index 4fd49f2..f6343a9 100644 --- a/src/driver/tasks/udp_tx.rs +++ b/src/driver/tasks/udp_tx.rs @@ -3,48 +3,93 @@ use crate::constants::*; use discortp::discord::MutableKeepalivePacket; use flume::Receiver; use std::sync::Arc; +#[cfg(not(feature = "tokio-02-marker"))] use tokio::{ net::UdpSocket, time::{timeout_at, Instant}, }; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::{ + net::udp::SendHalf, + time::{timeout_at, Instant}, +}; use tracing::{error, info, instrument, trace}; +struct UdpTx { + ssrc: u32, + rx: Receiver, + + #[cfg(not(feature = "tokio-02-marker"))] + udp_tx: Arc, + #[cfg(feature = "tokio-02-marker")] + udp_tx: SendHalf, +} + +impl UdpTx { + async fn run(&mut self) { + let mut keepalive_bytes = [0u8; MutableKeepalivePacket::minimum_packet_size()]; + let mut ka = MutableKeepalivePacket::new(&mut keepalive_bytes[..]) + .expect("FATAL: Insufficient bytes given to keepalive packet."); + ka.set_ssrc(self.ssrc); + + let mut ka_time = Instant::now() + UDP_KEEPALIVE_GAP; + + loop { + use UdpTxMessage::*; + match timeout_at(ka_time, self.rx.recv_async()).await { + Err(_) => { + trace!("Sending UDP Keepalive."); + if let Err(e) = self.udp_tx.send(&keepalive_bytes[..]).await { + error!("Fatal UDP keepalive send error: {:?}.", e); + break; + } + ka_time += UDP_KEEPALIVE_GAP; + }, + Ok(Ok(Packet(p))) => + if let Err(e) = self.udp_tx.send(&p[..]).await { + error!("Fatal UDP packet send error: {:?}.", e); + break; + }, + Ok(Err(e)) => { + error!("Fatal UDP packet receive error: {:?}.", e); + break; + }, + Ok(Ok(Poison)) => { + break; + }, + } + } + } +} + +#[cfg(not(feature = "tokio-02-marker"))] #[instrument(skip(udp_msg_rx))] pub(crate) async fn runner(udp_msg_rx: Receiver, ssrc: u32, udp_tx: Arc) { info!("UDP transmit handle started."); - let mut keepalive_bytes = [0u8; MutableKeepalivePacket::minimum_packet_size()]; - let mut ka = MutableKeepalivePacket::new(&mut keepalive_bytes[..]) - .expect("FATAL: Insufficient bytes given to keepalive packet."); - ka.set_ssrc(ssrc); + let mut txer = UdpTx { + ssrc, + rx: udp_msg_rx, + udp_tx, + }; - let mut ka_time = Instant::now() + UDP_KEEPALIVE_GAP; - - loop { - use UdpTxMessage::*; - match timeout_at(ka_time, udp_msg_rx.recv_async()).await { - Err(_) => { - trace!("Sending UDP Keepalive."); - if let Err(e) = udp_tx.send(&keepalive_bytes[..]).await { - error!("Fatal UDP keepalive send error: {:?}.", e); - break; - } - ka_time += UDP_KEEPALIVE_GAP; - }, - Ok(Ok(Packet(p))) => - if let Err(e) = udp_tx.send(&p[..]).await { - error!("Fatal UDP packet send error: {:?}.", e); - break; - }, - Ok(Err(e)) => { - error!("Fatal UDP packet receive error: {:?}.", e); - break; - }, - Ok(Ok(Poison)) => { - break; - }, - } - } + txer.run().await; + + info!("UDP transmit handle stopped."); +} + +#[cfg(feature = "tokio-02-marker")] +#[instrument(skip(udp_msg_rx))] +pub(crate) async fn runner(udp_msg_rx: Receiver, ssrc: u32, udp_tx: SendHalf) { + info!("UDP transmit handle started."); + + let mut txer = UdpTx { + ssrc, + rx: udp_msg_rx, + udp_tx, + }; + + txer.run().await; info!("UDP transmit handle stopped."); } diff --git a/src/driver/tasks/ws.rs b/src/driver/tasks/ws.rs index 7d8de5f..0eb26d9 100644 --- a/src/driver/tasks/ws.rs +++ b/src/driver/tasks/ws.rs @@ -10,11 +10,23 @@ use crate::{ }, ws::{Error as WsError, ReceiverExt, SenderExt, WsStream}, }; +#[cfg(not(feature = "tokio-02-marker"))] use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode; +#[cfg(feature = "tokio-02-marker")] +use async_tungstenite_compat::tungstenite::protocol::frame::coding::CloseCode; use flume::Receiver; use rand::random; use std::time::Duration; -use tokio::time::{self, Instant}; +#[cfg(not(feature = "tokio-02-marker"))] +use tokio::{ + select, + time::{sleep_until, Instant}, +}; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::{ + select, + time::{delay_until as sleep_until, Instant}, +}; use tracing::{error, info, instrument, trace, warn}; struct AuxNetwork { @@ -57,9 +69,9 @@ impl AuxNetwork { let mut ws_error = false; let mut should_reconnect = false; - let hb = time::sleep_until(next_heartbeat); + let hb = sleep_until(next_heartbeat); - tokio::select! { + select! { _ = hb => { ws_error = match self.send_heartbeat().await { Err(e) => { diff --git a/src/error.rs b/src/error.rs index c8d77f0..7fbc86b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,12 +4,12 @@ use futures::channel::mpsc::TrySendError; #[cfg(feature = "serenity")] use serenity::gateway::InterMessage; -#[cfg(feature = "gateway")] +#[cfg(feature = "gateway-core")] use std::{error::Error, fmt}; #[cfg(feature = "twilight")] use twilight_gateway::shard::CommandError; -#[cfg(feature = "gateway")] +#[cfg(feature = "gateway-core")] #[derive(Debug)] /// Error returned when a manager or call handler is /// unable to send messages over Discord's gateway. @@ -23,7 +23,7 @@ pub enum JoinError { /// /// [`Call`]: crate::Call NoCall, - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] /// The driver failed to establish a voice connection. Driver(ConnectionError), #[cfg(feature = "serenity")] @@ -34,7 +34,7 @@ pub enum JoinError { Twilight(CommandError), } -#[cfg(feature = "gateway")] +#[cfg(feature = "gateway-core")] impl fmt::Display for JoinError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Failed to Join Voice channel: ")?; @@ -42,7 +42,7 @@ impl fmt::Display for JoinError { JoinError::Dropped => write!(f, "request was cancelled/dropped."), JoinError::NoSender => write!(f, "no gateway destination."), JoinError::NoCall => write!(f, "tried to leave a non-existent call."), - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] JoinError::Driver(t) => write!(f, "internal driver error {}.", t), #[cfg(feature = "serenity")] JoinError::Serenity(t) => write!(f, "serenity failure {}.", t), @@ -52,35 +52,35 @@ impl fmt::Display for JoinError { } } -#[cfg(feature = "gateway")] +#[cfg(feature = "gateway-core")] impl Error for JoinError {} -#[cfg(all(feature = "serenity", feature = "gateway"))] +#[cfg(all(feature = "serenity", feature = "gateway-core"))] impl From> for JoinError { fn from(e: TrySendError) -> Self { JoinError::Serenity(e) } } -#[cfg(all(feature = "twilight", feature = "gateway"))] +#[cfg(all(feature = "twilight", feature = "gateway-core"))] impl From for JoinError { fn from(e: CommandError) -> Self { JoinError::Twilight(e) } } -#[cfg(all(feature = "driver", feature = "gateway"))] +#[cfg(all(feature = "driver-core", feature = "gateway-core"))] impl From for JoinError { fn from(e: ConnectionError) -> Self { JoinError::Driver(e) } } -#[cfg(feature = "gateway")] +#[cfg(feature = "gateway-core")] /// Convenience type for Discord gateway error handling. pub type JoinResult = Result; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] pub use crate::{ driver::connection::error::{Error as ConnectionError, Result as ConnectionResult}, tracks::{TrackError, TrackResult}, diff --git a/src/handler.rs b/src/handler.rs index e150272..f1159c3 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,4 +1,4 @@ -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] use crate::{ driver::{Config, Driver}, error::ConnectionResult, @@ -13,13 +13,13 @@ use flume::{r#async::RecvFut, Sender}; use serde_json::json; use tracing::instrument; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] use std::ops::{Deref, DerefMut}; #[derive(Clone, Debug)] enum Return { Info(Sender), - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] Conn(Sender>), } @@ -34,7 +34,7 @@ enum Return { pub struct Call { connection: Option<(ChannelId, ConnectionProgress, Return)>, - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] /// The internal controller of the voice connection monitor thread. driver: Driver, @@ -64,7 +64,7 @@ impl Call { Self::new_raw(guild_id, Some(ws), user_id) } - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] /// Creates a new Call, configuring the driver as specified. #[inline] #[instrument] @@ -91,7 +91,7 @@ impl Call { Self::new_raw(guild_id, None, user_id) } - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] /// Creates a new standalone Call, configuring the driver as specified. #[inline] #[instrument] @@ -106,7 +106,7 @@ impl Call { fn new_raw(guild_id: GuildId, ws: Option, user_id: UserId) -> Self { Call { connection: None, - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] driver: Default::default(), guild_id, self_deaf: false, @@ -116,7 +116,7 @@ impl Call { } } - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] fn new_raw_cfg(guild_id: GuildId, ws: Option, user_id: UserId, config: Config) -> Self { Call { connection: None, @@ -136,7 +136,7 @@ impl Call { // It's okay if the receiver hung up. let _ = tx.send(c.clone()); }, - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] Some((_, ConnectionProgress::Complete(c), Return::Conn(tx))) => { self.driver.raw_connect(c.clone(), tx.clone()); }, @@ -171,7 +171,7 @@ impl Call { self.self_deaf } - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] /// Connect or switch to the given voice channel by its Id. /// /// This function acts as a future in two stages: @@ -245,7 +245,7 @@ impl Call { // Only send an update if we were in a voice channel. self.connection = None; - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] self.driver.leave(); self.update().await @@ -264,7 +264,7 @@ impl Call { pub async fn mute(&mut self, mute: bool) -> JoinResult<()> { self.self_mute = mute; - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] self.driver.mute(mute); self.update().await @@ -339,7 +339,7 @@ impl Call { } } -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] impl Deref for Call { type Target = Driver; @@ -348,7 +348,7 @@ impl Deref for Call { } } -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] impl DerefMut for Call { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.driver diff --git a/src/id.rs b/src/id.rs index f28e108..b823437 100644 --- a/src/id.rs +++ b/src/id.rs @@ -1,6 +1,6 @@ //! Newtypes around Discord IDs for library cross-compatibility. -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] use crate::model::id::{GuildId as DriverGuild, UserId as DriverUser}; #[cfg(feature = "serenity")] use serenity::model::id::{ @@ -73,7 +73,7 @@ impl From for GuildId { } } -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] impl From for DriverGuild { fn from(id: GuildId) -> Self { Self(id.0) @@ -106,7 +106,7 @@ impl From for UserId { } } -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] impl From for DriverUser { fn from(id: UserId) -> Self { Self(id.0) diff --git a/src/input/child.rs b/src/input/child.rs index 565f4f5..d63fa4f 100644 --- a/src/input/child.rs +++ b/src/input/child.rs @@ -4,7 +4,10 @@ use std::{ mem, process::Child, }; +#[cfg(not(feature = "tokio-02-marker"))] use tokio::runtime::Handle; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::runtime::Handle; use tracing::debug; /// Handle for a child process which ensures that any subprocesses are properly closed diff --git a/src/input/dca.rs b/src/input/dca.rs index ea46331..87e6ea6 100644 --- a/src/input/dca.rs +++ b/src/input/dca.rs @@ -1,7 +1,10 @@ use super::{codec::OpusDecoderState, error::DcaError, Codec, Container, Input, Metadata, Reader}; use serde::Deserialize; use std::{ffi::OsStr, io::BufReader, mem}; +#[cfg(not(feature = "tokio-02-marker"))] use tokio::{fs::File as TokioFile, io::AsyncReadExt}; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::{fs::File as TokioFile, io::AsyncReadExt}; /// Creates a streamed audio source from a DCA file. /// Currently only accepts the [DCA1 format](https://github.com/bwmarrin/dca). diff --git a/src/input/ffmpeg_src.rs b/src/input/ffmpeg_src.rs index a8e73ca..1ea14f1 100644 --- a/src/input/ffmpeg_src.rs +++ b/src/input/ffmpeg_src.rs @@ -11,7 +11,10 @@ use std::{ ffi::OsStr, process::{Command, Stdio}, }; +#[cfg(not(feature = "tokio-02-marker"))] use tokio::process::Command as TokioCommand; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::process::Command as TokioCommand; use tracing::debug; /// Opens an audio file through `ffmpeg` and creates an audio source. diff --git a/src/input/mod.rs b/src/input/mod.rs index 0ba6396..d3c35a8 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -58,7 +58,10 @@ use audiopus::coder::GenericCtl; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use cached::OpusCompressor; use error::{Error, Result}; +#[cfg(not(feature = "tokio-02-marker"))] use tokio::runtime::Handle; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::runtime::Handle; use std::{ convert::TryFrom, diff --git a/src/input/ytdl_src.rs b/src/input/ytdl_src.rs index 4464a89..2aa3764 100644 --- a/src/input/ytdl_src.rs +++ b/src/input/ytdl_src.rs @@ -11,7 +11,10 @@ use std::{ io::{BufRead, BufReader, Read}, process::{Command, Stdio}, }; +#[cfg(not(feature = "tokio-02-marker"))] use tokio::{process::Command as TokioCommand, task}; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::{process::Command as TokioCommand, task}; use tracing::trace; const YOUTUBE_DL_COMMAND: &str = if cfg!(feature = "youtube-dlc") { diff --git a/src/lib.rs b/src/lib.rs index 177472d..91fb875 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,41 +38,41 @@ //! [lavalink]: https://github.com/Frederikam/Lavalink pub mod constants; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] pub mod driver; pub mod error; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] pub mod events; -#[cfg(feature = "gateway")] +#[cfg(feature = "gateway-core")] mod handler; pub mod id; pub(crate) mod info; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] pub mod input; -#[cfg(feature = "gateway")] +#[cfg(feature = "gateway-core")] mod manager; #[cfg(feature = "serenity")] pub mod serenity; -#[cfg(feature = "gateway")] +#[cfg(feature = "gateway-core")] pub mod shards; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] pub mod tracks; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] mod ws; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] pub use audiopus::{self as opus, Bitrate}; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] pub use discortp as packet; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] pub use serenity_voice_model as model; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] pub use typemap_rev as typemap; #[cfg(test)] use utils as test_utils; -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] pub use crate::{ driver::Driver, events::{CoreEvent, Event, EventContext, EventHandler, TrackEvent}, @@ -80,7 +80,7 @@ pub use crate::{ tracks::create_player, }; -#[cfg(feature = "gateway")] +#[cfg(feature = "gateway-core")] pub use crate::{handler::*, manager::*}; #[cfg(feature = "serenity")] diff --git a/src/manager.rs b/src/manager.rs index 2df38c8..7e091f7 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -1,4 +1,4 @@ -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] use crate::driver::Config; use crate::{ error::{JoinError, JoinResult}, @@ -23,7 +23,10 @@ use serenity::{ }, }; use std::sync::Arc; +#[cfg(not(feature = "tokio-02-marker"))] use tokio::sync::Mutex; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::sync::Mutex; #[cfg(feature = "twilight")] use twilight_gateway::Cluster; #[cfg(feature = "twilight")] @@ -48,7 +51,7 @@ pub struct Songbird { calls: DashMap>>, sharder: Sharder, - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] driver_config: PRwLock>, } @@ -65,7 +68,7 @@ impl Songbird { calls: Default::default(), sharder: Sharder::Serenity(Default::default()), - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] driver_config: Default::default(), }) } @@ -91,7 +94,7 @@ impl Songbird { calls: Default::default(), sharder: Sharder::Twilight(cluster), - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] driver_config: Default::default(), }) } @@ -141,7 +144,7 @@ impl Songbird { .get_shard(shard) .expect("Failed to get shard handle: shard_count incorrect?"); - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] let call = Call::from_driver_config( guild_id, shard_handle, @@ -149,7 +152,7 @@ impl Songbird { self.driver_config.read().clone().unwrap_or_default(), ); - #[cfg(not(feature = "driver"))] + #[cfg(not(feature = "driver-core"))] let call = Call::new(guild_id, shard_handle, info.user_id); Arc::new(Mutex::new(call)) @@ -164,7 +167,7 @@ impl Songbird { *client_data } - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] /// Connects to a target by retrieving its relevant [`Call`] and /// connecting, or creating the handler if required. /// @@ -196,7 +199,7 @@ impl Songbird { self._join(guild_id.into(), channel_id.into()).await } - #[cfg(feature = "driver")] + #[cfg(feature = "driver-core")] async fn _join( &self, guild_id: GuildId, @@ -388,7 +391,7 @@ impl VoiceGatewayManager for Songbird { } } -#[cfg(feature = "driver")] +#[cfg(feature = "driver-core")] impl Songbird { /// Sets a shared configuration for all drivers created from this /// manager. diff --git a/src/tracks/handle.rs b/src/tracks/handle.rs index 8b1a999..89ea214 100644 --- a/src/tracks/handle.rs +++ b/src/tracks/handle.rs @@ -5,7 +5,10 @@ use crate::{ }; use flume::Sender; use std::{fmt, sync::Arc, time::Duration}; +#[cfg(not(feature = "tokio-02-marker"))] use tokio::sync::RwLock; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::sync::RwLock; use typemap_rev::TypeMap; use uuid::Uuid; diff --git a/src/ws.rs b/src/ws.rs index f0100e2..43c8194 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -6,14 +6,26 @@ use crate::model::Event; use async_trait::async_trait; +#[cfg(not(feature = "tokio-02-marker"))] use async_tungstenite::{ + self as tungstenite, + tokio::ConnectStream, + tungstenite::{error::Error as TungsteniteError, protocol::CloseFrame, Message}, + WebSocketStream, +}; +#[cfg(feature = "tokio-02-marker")] +use async_tungstenite_compat::{ + self as tungstenite, tokio::ConnectStream, tungstenite::{error::Error as TungsteniteError, protocol::CloseFrame, Message}, WebSocketStream, }; use futures::{SinkExt, StreamExt, TryStreamExt}; use serde_json::Error as JsonError; -use tokio::time::timeout; +#[cfg(not(feature = "tokio-02-marker"))] +use tokio::time::{timeout, Duration}; +#[cfg(feature = "tokio-02-marker")] +use tokio_compat::time::{timeout, Duration}; use tracing::{instrument, warn}; pub type WsStream = WebSocketStream; @@ -23,7 +35,7 @@ pub type Result = std::result::Result; #[derive(Debug)] pub enum Error { Json(JsonError), - #[cfg(all(feature = "rustls", not(feature = "native")))] + #[cfg(all(feature = "rustls-marker", not(feature = "native-marker")))] Tls(RustlsError), /// The discord voice gateway does not support or offer zlib compression. @@ -41,7 +53,7 @@ impl From for Error { } } -#[cfg(all(feature = "rustls", not(feature = "native")))] +#[cfg(all(feature = "rustls-marker", not(feature = "native-marker")))] impl From for Error { fn from(e: RustlsError) -> Error { Error::Tls(e) @@ -55,7 +67,7 @@ impl From for Error { } use futures::stream::SplitSink; -#[cfg(all(feature = "rustls", not(feature = "native")))] +#[cfg(all(feature = "rustls-marker", not(feature = "native-marker")))] use std::{ error::Error as StdError, fmt::{Display, Formatter, Result as FmtResult}, @@ -77,7 +89,7 @@ pub trait SenderExt { #[async_trait] impl ReceiverExt for WsStream { async fn recv_json(&mut self) -> Result> { - const TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_millis(500); + const TIMEOUT: Duration = Duration::from_millis(500); let ws_message = match timeout(TIMEOUT, self.next()).await { Ok(Some(Ok(v))) => Some(v), @@ -138,7 +150,7 @@ pub(crate) fn convert_ws_message(message: Option) -> Result for RustlsError { fn from(e: IoError) -> Self { RustlsError::Io(e) } } -#[cfg(all(feature = "rustls", not(feature = "native")))] +#[cfg(all(feature = "rustls-marker", not(feature = "native-marker")))] impl Display for RustlsError { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { match self { @@ -164,7 +176,7 @@ impl Display for RustlsError { } } -#[cfg(all(feature = "rustls", not(feature = "native")))] +#[cfg(all(feature = "rustls-marker", not(feature = "native-marker")))] impl StdError for RustlsError { fn source(&self) -> Option<&(dyn StdError + 'static)> { match self { @@ -174,12 +186,12 @@ impl StdError for RustlsError { } } -#[cfg(all(feature = "rustls", not(feature = "native")))] +#[cfg(all(feature = "rustls-marker", not(feature = "native-marker")))] #[instrument] pub(crate) async fn create_rustls_client(url: Url) -> Result { - let (stream, _) = async_tungstenite::tokio::connect_async_with_config::( + let (stream, _) = tungstenite::tokio::connect_async_with_config::( url, - Some(async_tungstenite::tungstenite::protocol::WebSocketConfig { + Some(tungstenite::tungstenite::protocol::WebSocketConfig { max_message_size: None, max_frame_size: None, max_send_queue: None, @@ -191,12 +203,12 @@ pub(crate) async fn create_rustls_client(url: Url) -> Result { Ok(stream) } -#[cfg(feature = "native")] +#[cfg(feature = "native-marker")] #[instrument] pub(crate) async fn create_native_tls_client(url: Url) -> Result { - let (stream, _) = async_tungstenite::tokio::connect_async_with_config::( + let (stream, _) = tungstenite::tokio::connect_async_with_config::( url, - Some(async_tungstenite::tungstenite::protocol::WebSocketConfig { + Some(tungstenite::tungstenite::protocol::WebSocketConfig { max_message_size: None, max_frame_size: None, max_send_queue: None,