Songbird: Tokio 1.0 (#36)
Migrates to the new version of tokio, requiring channel and sleep changes in a few locations. Additionally points to the in-tree v0.3 version of twilight.
This commit is contained in:
17
Cargo.toml
17
Cargo.toml
@@ -26,7 +26,7 @@ version = "0.1"
|
|||||||
default-features = false
|
default-features = false
|
||||||
features = ["tokio-runtime"]
|
features = ["tokio-runtime"]
|
||||||
optional = true
|
optional = true
|
||||||
version = "0.9"
|
version = "0.11"
|
||||||
|
|
||||||
[dependencies.audiopus]
|
[dependencies.audiopus]
|
||||||
optional = true
|
optional = true
|
||||||
@@ -58,6 +58,7 @@ version = "0.8"
|
|||||||
|
|
||||||
[dependencies.serenity]
|
[dependencies.serenity]
|
||||||
optional = true
|
optional = true
|
||||||
|
#version = "0.10"
|
||||||
default-features = false
|
default-features = false
|
||||||
features = ["voice", "gateway"]
|
features = ["voice", "gateway"]
|
||||||
git = "https://github.com/serenity-rs/serenity"
|
git = "https://github.com/serenity-rs/serenity"
|
||||||
@@ -65,6 +66,7 @@ branch = "current"
|
|||||||
|
|
||||||
[dependencies.serenity-voice-model]
|
[dependencies.serenity-voice-model]
|
||||||
optional = true
|
optional = true
|
||||||
|
#version = "0.10"
|
||||||
git = "https://github.com/serenity-rs/serenity"
|
git = "https://github.com/serenity-rs/serenity"
|
||||||
branch = "current"
|
branch = "current"
|
||||||
|
|
||||||
@@ -78,18 +80,22 @@ version = "0.1"
|
|||||||
|
|
||||||
[dependencies.tokio]
|
[dependencies.tokio]
|
||||||
optional = true
|
optional = true
|
||||||
version = "0.2"
|
version = "1.0"
|
||||||
default-features = false
|
default-features = false
|
||||||
|
|
||||||
[dependencies.twilight-gateway]
|
[dependencies.twilight-gateway]
|
||||||
optional = true
|
optional = true
|
||||||
version = "0.2"
|
#version = "0.3"
|
||||||
default-features = false
|
default-features = false
|
||||||
|
git = "https://github.com/twilight-rs/twilight"
|
||||||
|
branch = "v0.3"
|
||||||
|
|
||||||
[dependencies.twilight-model]
|
[dependencies.twilight-model]
|
||||||
optional = true
|
optional = true
|
||||||
version = "0.2"
|
#version = "0.3"
|
||||||
default-features = false
|
default-features = false
|
||||||
|
git = "https://github.com/twilight-rs/twilight"
|
||||||
|
branch = "v0.3"
|
||||||
|
|
||||||
[dependencies.typemap_rev]
|
[dependencies.typemap_rev]
|
||||||
optional = true
|
optional = true
|
||||||
@@ -135,13 +141,12 @@ driver = [
|
|||||||
"serenity-voice-model",
|
"serenity-voice-model",
|
||||||
"spin_sleep",
|
"spin_sleep",
|
||||||
"streamcatcher",
|
"streamcatcher",
|
||||||
"tokio/blocking",
|
|
||||||
"tokio/fs",
|
"tokio/fs",
|
||||||
"tokio/io-util",
|
"tokio/io-util",
|
||||||
"tokio/macros",
|
"tokio/macros",
|
||||||
"tokio/net",
|
"tokio/net",
|
||||||
"tokio/process",
|
"tokio/process",
|
||||||
"tokio/rt-core",
|
"tokio/rt",
|
||||||
"tokio/sync",
|
"tokio/sync",
|
||||||
"tokio/time",
|
"tokio/time",
|
||||||
"typemap_rev",
|
"typemap_rev",
|
||||||
|
|||||||
@@ -18,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity"
|
|||||||
branch = "current"
|
branch = "current"
|
||||||
|
|
||||||
[dependencies.tokio]
|
[dependencies.tokio]
|
||||||
version = "0.2"
|
version = "1.0"
|
||||||
features = ["macros"]
|
features = ["macros", "rt-multi-thread"]
|
||||||
|
|||||||
@@ -19,5 +19,5 @@ git = "https://github.com/serenity-rs/serenity"
|
|||||||
branch = "current"
|
branch = "current"
|
||||||
|
|
||||||
[dependencies.tokio]
|
[dependencies.tokio]
|
||||||
version = "0.2"
|
version = "1.0"
|
||||||
features = ["macros"]
|
features = ["macros", "rt-multi-thread"]
|
||||||
|
|||||||
@@ -5,8 +5,9 @@ authors = ["my name <my@email.address>"]
|
|||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
env_logger = "~0.6"
|
tracing = "0.1"
|
||||||
log = "~0.4"
|
tracing-subscriber = "0.2"
|
||||||
|
tracing-futures = "0.2"
|
||||||
|
|
||||||
[dependencies.songbird]
|
[dependencies.songbird]
|
||||||
path = "../../../"
|
path = "../../../"
|
||||||
@@ -17,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity"
|
|||||||
branch = "current"
|
branch = "current"
|
||||||
|
|
||||||
[dependencies.tokio]
|
[dependencies.tokio]
|
||||||
version = "0.2"
|
version = "1.0"
|
||||||
features = ["macros"]
|
features = ["macros", "rt-multi-thread"]
|
||||||
|
|||||||
@@ -153,6 +153,8 @@ struct General;
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
// Configure the client with your Discord bot token in the environment.
|
// Configure the client with your Discord bot token in the environment.
|
||||||
let token = env::var("DISCORD_TOKEN")
|
let token = env::var("DISCORD_TOKEN")
|
||||||
.expect("Expected a token in the environment");
|
.expect("Expected a token in the environment");
|
||||||
|
|||||||
@@ -18,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity"
|
|||||||
branch = "current"
|
branch = "current"
|
||||||
|
|
||||||
[dependencies.tokio]
|
[dependencies.tokio]
|
||||||
version = "0.2"
|
version = "1.0"
|
||||||
features = ["macros"]
|
features = ["macros", "rt-multi-thread"]
|
||||||
|
|||||||
@@ -9,11 +9,11 @@ futures = "0.3"
|
|||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-subscriber = "0.2"
|
tracing-subscriber = "0.2"
|
||||||
serde_json = { version = "1" }
|
serde_json = { version = "1" }
|
||||||
tokio = { features = ["macros", "rt-threaded", "sync"], version = "0.2" }
|
tokio = { features = ["macros", "rt-multi-thread", "sync"], version = "1" }
|
||||||
twilight-gateway = "0.2"
|
twilight-gateway = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }
|
||||||
twilight-http = "0.2"
|
twilight-http = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }
|
||||||
twilight-model = "0.2"
|
twilight-model = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }
|
||||||
twilight-standby = "0.2"
|
twilight-standby = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }
|
||||||
|
|
||||||
[dependencies.songbird]
|
[dependencies.songbird]
|
||||||
path = "../.."
|
path = "../.."
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ use crate::{
|
|||||||
use discortp::discord::{IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket};
|
use discortp::discord::{IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket};
|
||||||
use error::{Error, Result};
|
use error::{Error, Result};
|
||||||
use flume::Sender;
|
use flume::Sender;
|
||||||
use std::{net::IpAddr, str::FromStr};
|
use std::{net::IpAddr, str::FromStr, sync::Arc};
|
||||||
use tokio::net::UdpSocket;
|
use tokio::net::UdpSocket;
|
||||||
use tracing::{debug, info, instrument};
|
use tracing::{debug, info, instrument};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
@@ -97,7 +97,7 @@ impl Connection {
|
|||||||
return Err(Error::CryptoModeUnavailable);
|
return Err(Error::CryptoModeUnavailable);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut udp = UdpSocket::bind("0.0.0.0:0").await?;
|
let udp = UdpSocket::bind("0.0.0.0:0").await?;
|
||||||
udp.connect((ready.ip, ready.port)).await?;
|
udp.connect((ready.ip, ready.port)).await?;
|
||||||
|
|
||||||
// Follow Discord's IP Discovery procedures, in case NAT tunnelling is needed.
|
// Follow Discord's IP Discovery procedures, in case NAT tunnelling is needed.
|
||||||
@@ -161,7 +161,9 @@ impl Connection {
|
|||||||
let (ws_msg_tx, ws_msg_rx) = flume::unbounded();
|
let (ws_msg_tx, ws_msg_rx) = flume::unbounded();
|
||||||
let (udp_sender_msg_tx, udp_sender_msg_rx) = flume::unbounded();
|
let (udp_sender_msg_tx, udp_sender_msg_rx) = flume::unbounded();
|
||||||
let (udp_receiver_msg_tx, udp_receiver_msg_rx) = flume::unbounded();
|
let (udp_receiver_msg_tx, udp_receiver_msg_rx) = flume::unbounded();
|
||||||
let (udp_rx, udp_tx) = udp.split();
|
|
||||||
|
let udp_rx = Arc::new(udp);
|
||||||
|
let udp_tx = Arc::clone(&udp_rx);
|
||||||
|
|
||||||
let ssrc = ready.ssrc;
|
let ssrc = ready.ssrc;
|
||||||
|
|
||||||
|
|||||||
@@ -20,8 +20,8 @@ use discortp::{
|
|||||||
PacketSize,
|
PacketSize,
|
||||||
};
|
};
|
||||||
use flume::Receiver;
|
use flume::Receiver;
|
||||||
use std::collections::HashMap;
|
use std::{collections::HashMap, sync::Arc};
|
||||||
use tokio::net::udp::RecvHalf;
|
use tokio::net::UdpSocket;
|
||||||
use tracing::{error, info, instrument, warn};
|
use tracing::{error, info, instrument, warn};
|
||||||
use xsalsa20poly1305::XSalsa20Poly1305 as Cipher;
|
use xsalsa20poly1305::XSalsa20Poly1305 as Cipher;
|
||||||
|
|
||||||
@@ -236,7 +236,7 @@ struct UdpRx {
|
|||||||
config: Config,
|
config: Config,
|
||||||
packet_buffer: [u8; VOICE_PACKET_MAX],
|
packet_buffer: [u8; VOICE_PACKET_MAX],
|
||||||
rx: Receiver<UdpRxMessage>,
|
rx: Receiver<UdpRxMessage>,
|
||||||
udp_socket: RecvHalf,
|
udp_socket: Arc<UdpSocket>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UdpRx {
|
impl UdpRx {
|
||||||
@@ -391,7 +391,7 @@ pub(crate) async fn runner(
|
|||||||
rx: Receiver<UdpRxMessage>,
|
rx: Receiver<UdpRxMessage>,
|
||||||
cipher: Cipher,
|
cipher: Cipher,
|
||||||
config: Config,
|
config: Config,
|
||||||
udp_socket: RecvHalf,
|
udp_socket: Arc<UdpSocket>,
|
||||||
) {
|
) {
|
||||||
info!("UDP receive handle started.");
|
info!("UDP receive handle started.");
|
||||||
|
|
||||||
|
|||||||
@@ -2,14 +2,15 @@ use super::message::*;
|
|||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
use discortp::discord::MutableKeepalivePacket;
|
use discortp::discord::MutableKeepalivePacket;
|
||||||
use flume::Receiver;
|
use flume::Receiver;
|
||||||
|
use std::sync::Arc;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
net::udp::SendHalf,
|
net::UdpSocket,
|
||||||
time::{timeout_at, Elapsed, Instant},
|
time::{timeout_at, Instant},
|
||||||
};
|
};
|
||||||
use tracing::{error, info, instrument, trace};
|
use tracing::{error, info, instrument, trace};
|
||||||
|
|
||||||
#[instrument(skip(udp_msg_rx))]
|
#[instrument(skip(udp_msg_rx))]
|
||||||
pub(crate) async fn runner(udp_msg_rx: Receiver<UdpTxMessage>, ssrc: u32, mut udp_tx: SendHalf) {
|
pub(crate) async fn runner(udp_msg_rx: Receiver<UdpTxMessage>, ssrc: u32, udp_tx: Arc<UdpSocket>) {
|
||||||
info!("UDP transmit handle started.");
|
info!("UDP transmit handle started.");
|
||||||
|
|
||||||
let mut keepalive_bytes = [0u8; MutableKeepalivePacket::minimum_packet_size()];
|
let mut keepalive_bytes = [0u8; MutableKeepalivePacket::minimum_packet_size()];
|
||||||
@@ -22,7 +23,7 @@ pub(crate) async fn runner(udp_msg_rx: Receiver<UdpTxMessage>, ssrc: u32, mut ud
|
|||||||
loop {
|
loop {
|
||||||
use UdpTxMessage::*;
|
use UdpTxMessage::*;
|
||||||
match timeout_at(ka_time, udp_msg_rx.recv_async()).await {
|
match timeout_at(ka_time, udp_msg_rx.recv_async()).await {
|
||||||
Err(Elapsed { .. }) => {
|
Err(_) => {
|
||||||
trace!("Sending UDP Keepalive.");
|
trace!("Sending UDP Keepalive.");
|
||||||
if let Err(e) = udp_tx.send(&keepalive_bytes[..]).await {
|
if let Err(e) = udp_tx.send(&keepalive_bytes[..]).await {
|
||||||
error!("Fatal UDP keepalive send error: {:?}.", e);
|
error!("Fatal UDP keepalive send error: {:?}.", e);
|
||||||
|
|||||||
@@ -57,7 +57,7 @@ impl AuxNetwork {
|
|||||||
let mut ws_error = false;
|
let mut ws_error = false;
|
||||||
let mut should_reconnect = false;
|
let mut should_reconnect = false;
|
||||||
|
|
||||||
let hb = time::delay_until(next_heartbeat);
|
let hb = time::sleep_until(next_heartbeat);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = hb => {
|
_ = hb => {
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::events::EventData;
|
use crate::events::EventData;
|
||||||
|
use flume::Sender;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::oneshot::Sender as OneshotSender;
|
|
||||||
|
|
||||||
/// A request from external code using a [`TrackHandle`] to modify
|
/// A request from external code using a [`TrackHandle`] to modify
|
||||||
/// or act upon an [`Track`] object.
|
/// or act upon an [`Track`] object.
|
||||||
@@ -27,7 +27,7 @@ pub enum TrackCommand {
|
|||||||
/// Run some closure on this track, with direct access to the core object.
|
/// Run some closure on this track, with direct access to the core object.
|
||||||
Do(Box<dyn FnOnce(&mut Track) + Send + Sync + 'static>),
|
Do(Box<dyn FnOnce(&mut Track) + Send + Sync + 'static>),
|
||||||
/// Request a read-only view of this track's state.
|
/// Request a read-only view of this track's state.
|
||||||
Request(OneshotSender<Box<TrackState>>),
|
Request(Sender<Box<TrackState>>),
|
||||||
/// Change the loop count/strategy of this track.
|
/// Change the loop count/strategy of this track.
|
||||||
Loop(LoopState),
|
Loop(LoopState),
|
||||||
/// Prompts a track's input to become live and usable, if it is not already.
|
/// Prompts a track's input to become live and usable, if it is not already.
|
||||||
|
|||||||
@@ -3,8 +3,9 @@ use crate::{
|
|||||||
events::{Event, EventData, EventHandler},
|
events::{Event, EventData, EventHandler},
|
||||||
input::Metadata,
|
input::Metadata,
|
||||||
};
|
};
|
||||||
|
use flume::Sender;
|
||||||
use std::{fmt, sync::Arc, time::Duration};
|
use std::{fmt, sync::Arc, time::Duration};
|
||||||
use tokio::sync::{mpsc::UnboundedSender, oneshot, RwLock};
|
use tokio::sync::RwLock;
|
||||||
use typemap_rev::TypeMap;
|
use typemap_rev::TypeMap;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
@@ -25,7 +26,7 @@ pub struct TrackHandle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct InnerHandle {
|
struct InnerHandle {
|
||||||
command_channel: UnboundedSender<TrackCommand>,
|
command_channel: Sender<TrackCommand>,
|
||||||
seekable: bool,
|
seekable: bool,
|
||||||
uuid: Uuid,
|
uuid: Uuid,
|
||||||
metadata: Box<Metadata>,
|
metadata: Box<Metadata>,
|
||||||
@@ -50,7 +51,7 @@ impl TrackHandle {
|
|||||||
///
|
///
|
||||||
/// [`Input`]: crate::input::Input
|
/// [`Input`]: crate::input::Input
|
||||||
pub fn new(
|
pub fn new(
|
||||||
command_channel: UnboundedSender<TrackCommand>,
|
command_channel: Sender<TrackCommand>,
|
||||||
seekable: bool,
|
seekable: bool,
|
||||||
uuid: Uuid,
|
uuid: Uuid,
|
||||||
metadata: Box<Metadata>,
|
metadata: Box<Metadata>,
|
||||||
@@ -159,10 +160,10 @@ impl TrackHandle {
|
|||||||
|
|
||||||
/// Request playback information and state from the audio context.
|
/// Request playback information and state from the audio context.
|
||||||
pub async fn get_info(&self) -> TrackResult<Box<TrackState>> {
|
pub async fn get_info(&self) -> TrackResult<Box<TrackState>> {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = flume::bounded(1);
|
||||||
self.send(TrackCommand::Request(tx))?;
|
self.send(TrackCommand::Request(tx))?;
|
||||||
|
|
||||||
rx.await.map_err(|_| TrackError::Finished)
|
rx.recv_async().await.map_err(|_| TrackError::Finished)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set an audio track to loop indefinitely.
|
/// Set an audio track to loop indefinitely.
|
||||||
|
|||||||
@@ -25,8 +25,8 @@ mod state;
|
|||||||
pub use self::{command::*, error::*, handle::*, looping::*, mode::*, queue::*, state::*};
|
pub use self::{command::*, error::*, handle::*, looping::*, mode::*, queue::*, state::*};
|
||||||
|
|
||||||
use crate::{constants::*, driver::tasks::message::*, events::EventStore, input::Input};
|
use crate::{constants::*, driver::tasks::message::*, events::EventStore, input::Input};
|
||||||
|
use flume::{Receiver, TryRecvError};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::mpsc::{self, error::TryRecvError, UnboundedReceiver};
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
/// Control object for audio playback.
|
/// Control object for audio playback.
|
||||||
@@ -102,7 +102,7 @@ pub struct Track {
|
|||||||
/// Track commands are sent in this manner to ensure that access
|
/// Track commands are sent in this manner to ensure that access
|
||||||
/// occurs in a thread-safe manner, without allowing any external
|
/// occurs in a thread-safe manner, without allowing any external
|
||||||
/// code to lock access to audio objects and block packet generation.
|
/// code to lock access to audio objects and block packet generation.
|
||||||
pub(crate) commands: UnboundedReceiver<TrackCommand>,
|
pub(crate) commands: Receiver<TrackCommand>,
|
||||||
|
|
||||||
/// Handle for safe control of this audio track from other threads.
|
/// Handle for safe control of this audio track from other threads.
|
||||||
///
|
///
|
||||||
@@ -124,11 +124,7 @@ impl Track {
|
|||||||
/// In general, you should probably use [`create_player`].
|
/// In general, you should probably use [`create_player`].
|
||||||
///
|
///
|
||||||
/// [`create_player`]: fn.create_player.html
|
/// [`create_player`]: fn.create_player.html
|
||||||
pub fn new_raw(
|
pub fn new_raw(source: Input, commands: Receiver<TrackCommand>, handle: TrackHandle) -> Self {
|
||||||
source: Input,
|
|
||||||
commands: UnboundedReceiver<TrackCommand>,
|
|
||||||
handle: TrackHandle,
|
|
||||||
) -> Self {
|
|
||||||
let uuid = handle.uuid();
|
let uuid = handle.uuid();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
@@ -310,7 +306,7 @@ impl Track {
|
|||||||
MakePlayable => self.make_playable(),
|
MakePlayable => self.make_playable(),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(TryRecvError::Closed) => {
|
Err(TryRecvError::Disconnected) => {
|
||||||
// this branch will never be visited.
|
// this branch will never be visited.
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
@@ -389,7 +385,7 @@ pub fn create_player(source: Input) -> (Track, TrackHandle) {
|
|||||||
/// [`Track`]: Track
|
/// [`Track`]: Track
|
||||||
/// [`TrackHandle`]: TrackHandle
|
/// [`TrackHandle`]: TrackHandle
|
||||||
pub fn create_player_with_uuid(source: Input, uuid: Uuid) -> (Track, TrackHandle) {
|
pub fn create_player_with_uuid(source: Input, uuid: Uuid) -> (Track, TrackHandle) {
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (tx, rx) = flume::unbounded();
|
||||||
let can_seek = source.is_seekable();
|
let can_seek = source.is_seekable();
|
||||||
let metadata = source.metadata.clone();
|
let metadata = source.metadata.clone();
|
||||||
let handle = TrackHandle::new(tx, can_seek, uuid, metadata);
|
let handle = TrackHandle::new(tx, can_seek, uuid, metadata);
|
||||||
|
|||||||
Reference in New Issue
Block a user