Driver: Split receive into its own feature (#141)

Adds the "receive" feature, which is disabled by default. When this is disabled, the UDP receive task is not compiled and not run, and as an optimisation the UDP receive buffer size is set to 0. All related events are also removed.

This also removes the UDP Tx task, and moves packet and keepalive sends back into the mixer thread. This allows us to entirely remove channels and various allocations between the mixer and an async task created only for sending data (i.e., fewer memcopies).

If "receive" is enabled, UDP sends are now non-blocking due to technical constraints -- failure to send is non-fatal, but *will* drop affected packets. Given that blocking on a UDP send indicates that the OS cannot clear send buffers fast enough, this should alleviate OS load.

Closes #131.
This commit is contained in:
Kyle Simpson
2022-08-01 15:54:20 +01:00
parent c1d93f790c
commit 2277595be4
27 changed files with 299 additions and 206 deletions

View File

@@ -1,7 +1,9 @@
pub mod error;
#[cfg(feature = "receive")]
use super::tasks::udp_rx;
use super::{
tasks::{message::*, udp_rx, udp_tx, ws as ws_task},
tasks::{message::*, ws as ws_task},
Config,
CryptoMode,
};
@@ -18,7 +20,8 @@ use crate::{
use discortp::discord::{IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket};
use error::{Error, Result};
use flume::Sender;
use std::{net::IpAddr, str::FromStr, sync::Arc};
use socket2::Socket;
use std::{net::IpAddr, str::FromStr};
use tokio::{net::UdpSocket, spawn, time::timeout};
use tracing::{debug, info, instrument};
use url::Url;
@@ -103,6 +106,16 @@ impl Connection {
}
let udp = UdpSocket::bind("0.0.0.0:0").await?;
// Optimisation for non-receive case: set rx buffer size to zero.
let udp = if cfg!(feature = "receive") {
udp
} else {
let socket = Socket::from(udp.into_std()?);
socket.set_recv_buffer_size(0)?;
UdpSocket::from_std(socket.into())?
};
udp.connect((ready.ip, ready.port)).await?;
// Follow Discord's IP Discovery procedures, in case NAT tunnelling is needed.
@@ -164,22 +177,36 @@ impl Connection {
info!("WS heartbeat duration {}ms.", hello.heartbeat_interval,);
let (ws_msg_tx, ws_msg_rx) = flume::unbounded();
let (udp_sender_msg_tx, udp_sender_msg_rx) = flume::unbounded();
#[cfg(feature = "receive")]
let (udp_receiver_msg_tx, udp_receiver_msg_rx) = flume::unbounded();
// NOTE: This causes the UDP Socket on "receive" to be non-blocking,
// and the standard to be blocking. A UDP send should only WouldBlock if
// you're sending more data than the OS can handle (not likely, and
// at that point you should scale horizontally).
//
// If this is a problem for anyone, we can make non-blocking sends
// queue up a delayed send up to a limit.
#[cfg(feature = "receive")]
let (udp_rx, udp_tx) = {
let udp_rx = Arc::new(udp);
let udp_tx = Arc::clone(&udp_rx);
let udp_tx = udp.into_std()?;
let udp_rx = UdpSocket::from_std(udp_tx.try_clone()?)?;
(udp_rx, udp_tx)
};
#[cfg(not(feature = "receive"))]
let udp_tx = udp.into_std()?;
let ssrc = ready.ssrc;
let mix_conn = MixerConnection {
#[cfg(feature = "receive")]
cipher: cipher.clone(),
#[cfg(not(feature = "receive"))]
cipher,
crypto_state: config.crypto_mode.into(),
#[cfg(feature = "receive")]
udp_rx: udp_receiver_msg_tx,
udp_tx: udp_sender_msg_tx,
udp_tx,
};
interconnect
@@ -200,6 +227,7 @@ impl Connection {
info.clone(),
));
#[cfg(feature = "receive")]
spawn(udp_rx::runner(
interconnect.clone(),
udp_receiver_msg_rx,
@@ -207,7 +235,6 @@ impl Connection {
config.clone(),
udp_rx,
));
spawn(udp_tx::runner(udp_sender_msg_rx, ssrc, udp_tx));
Ok(Connection {
info,

View File

@@ -3,10 +3,11 @@ use byteorder::{NetworkEndian, WriteBytesExt};
use discortp::{rtp::RtpPacket, MutablePacket};
use rand::Rng;
use std::num::Wrapping;
#[cfg(any(feature = "receive", test))]
use xsalsa20poly1305::Tag;
use xsalsa20poly1305::{
aead::{AeadInPlace, Error as CryptoError},
Nonce,
Tag,
XSalsa20Poly1305 as Cipher,
NONCE_SIZE,
TAG_SIZE,
@@ -110,6 +111,7 @@ impl CryptoMode {
}
}
#[cfg(any(feature = "receive", test))]
/// Decrypts a Discord RT(C)P packet using the given key.
///
/// If successful, this returns the number of bytes to be ignored from the

View File

@@ -13,6 +13,7 @@ pub mod bench_internals;
pub(crate) mod connection;
mod crypto;
#[cfg(feature = "receive")]
mod decode_mode;
mod mix_mode;
pub mod retry;
@@ -23,6 +24,7 @@ pub(crate) mod test_config;
use connection::error::{Error, Result};
pub use crypto::CryptoMode;
pub(crate) use crypto::CryptoState;
#[cfg(feature = "receive")]
pub use decode_mode::DecodeMode;
pub use mix_mode::MixMode;
#[cfg(test)]

View File

@@ -2,7 +2,7 @@ use super::message::*;
use crate::ws::Error as WsError;
use audiopus::Error as OpusError;
use flume::SendError;
use std::io::Error as IoError;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use xsalsa20poly1305::aead::Error as CryptoError;
#[derive(Debug)]
@@ -10,8 +10,8 @@ pub enum Recipient {
AuxNetwork,
Event,
Mixer,
#[cfg(feature = "receive")]
UdpRx,
UdpTx,
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -20,6 +20,7 @@ pub type Result<T> = std::result::Result<T, Error>;
#[non_exhaustive]
pub enum Error {
Crypto(CryptoError),
#[cfg(feature = "receive")]
/// Received an illegal voice packet on the voice UDP socket.
IllegalVoicePacket,
InterconnectFailure(Recipient),
@@ -30,15 +31,26 @@ pub enum Error {
impl Error {
pub(crate) fn should_trigger_connect(&self) -> bool {
matches!(
self,
Error::InterconnectFailure(Recipient::AuxNetwork | Recipient::UdpRx | Recipient::UdpTx)
)
match self {
Error::InterconnectFailure(Recipient::AuxNetwork) => true,
#[cfg(feature = "receive")]
Error::InterconnectFailure(Recipient::UdpRx) => true,
_ => false,
}
}
pub(crate) fn should_trigger_interconnect_rebuild(&self) -> bool {
matches!(self, Error::InterconnectFailure(Recipient::Event))
}
// This prevents a `WouldBlock` from triggering a full reconnect,
// instead simply dropping the packet.
pub(crate) fn disarm_would_block(self) -> Result<()> {
match self {
Self::Io(i) if i.kind() == IoErrorKind::WouldBlock => Ok(()),
e => Err(e),
}
}
}
impl From<CryptoError> for Error {
@@ -77,18 +89,13 @@ impl From<SendError<MixerMessage>> for Error {
}
}
#[cfg(feature = "receive")]
impl From<SendError<UdpRxMessage>> for Error {
fn from(_e: SendError<UdpRxMessage>) -> Error {
Error::InterconnectFailure(Recipient::UdpRx)
}
}
impl From<SendError<UdpTxMessage>> for Error {
fn from(_e: SendError<UdpTxMessage>) -> Error {
Error::InterconnectFailure(Recipient::UdpTx)
}
}
impl From<WsError> for Error {
fn from(e: WsError) -> Error {
Error::Ws(e)

View File

@@ -1,21 +1,24 @@
#![allow(missing_docs)]
use super::{Interconnect, TrackContext, UdpRxMessage, UdpTxMessage, WsMessage};
#[cfg(feature = "receive")]
use super::UdpRxMessage;
use super::{Interconnect, TrackContext, WsMessage};
use crate::{
driver::{Bitrate, Config, CryptoState},
input::{AudioStreamError, Compose, Parsed},
};
use flume::Sender;
use std::sync::Arc;
use std::{net::UdpSocket, sync::Arc};
use symphonia_core::{errors::Error as SymphoniaError, formats::SeekedTo};
use xsalsa20poly1305::XSalsa20Poly1305 as Cipher;
pub struct MixerConnection {
pub cipher: Cipher,
pub crypto_state: CryptoState,
#[cfg(feature = "receive")]
pub udp_rx: Sender<UdpRxMessage>,
pub udp_tx: Sender<UdpTxMessage>,
pub udp_tx: UdpSocket,
}
pub enum MixerMessage {

View File

@@ -4,11 +4,13 @@ mod core;
mod disposal;
mod events;
mod mixer;
#[cfg(feature = "receive")]
mod udp_rx;
mod udp_tx;
mod ws;
pub use self::{core::*, disposal::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*};
#[cfg(feature = "receive")]
pub use self::udp_rx::*;
pub use self::{core::*, disposal::*, events::*, mixer::*, ws::*};
use flume::Sender;
use tokio::spawn;

View File

@@ -1,4 +0,0 @@
#![allow(missing_docs)]
// TODO: do something cheaper.
pub type UdpTxMessage = Vec<u8>;

View File

@@ -10,7 +10,11 @@ use result::*;
use state::*;
pub use track::*;
use super::{disposal, error::Result, message::*};
use super::{
disposal,
error::{Error, Result},
message::*,
};
use crate::{
constants::*,
driver::MixMode,
@@ -26,6 +30,7 @@ use audiopus::{
Bitrate,
};
use discortp::{
discord::MutableKeepalivePacket,
rtp::{MutableRtpPacket, RtpPacket},
MutablePacket,
};
@@ -73,6 +78,9 @@ pub struct Mixer {
thread_pool: BlockyTaskPool,
pub ws: Option<Sender<WsMessage>>,
pub keepalive_deadline: Instant,
pub keepalive_packet: [u8; MutableKeepalivePacket::minimum_packet_size()],
pub tracks: Vec<InternalTrack>,
track_handles: Vec<TrackHandle>,
@@ -104,6 +112,7 @@ impl Mixer {
let soft_clip = SoftClip::new(config.mix_mode.to_opus());
let mut packet = [0u8; VOICE_PACKET_MAX];
let keepalive_packet = [0u8; MutableKeepalivePacket::minimum_packet_size()];
let mut rtp = MutableRtpPacket::new(&mut packet[..]).expect(
"FATAL: Too few bytes in self.packet for RTP header.\
@@ -146,12 +155,14 @@ impl Mixer {
SignalSpec::new_with_layout(SAMPLE_RATE_RAW as u32, Layout::Stereo),
);
let deadline = Instant::now();
Self {
bitrate,
config,
conn_active: None,
content_prep_sequence: 0,
deadline: Instant::now(),
deadline,
disposer,
encoder,
interconnect,
@@ -165,6 +176,9 @@ impl Mixer {
thread_pool,
ws: None,
keepalive_deadline: deadline,
keepalive_packet,
tracks,
track_handles,
@@ -213,7 +227,14 @@ impl Mixer {
// The above action may have invalidated the connection; need to re-check!
// Also, if we're in a test mode we should unconditionally run packet mixing code.
if self.conn_active.is_some() || ignore_check {
if let Err(e) = self.cycle().and_then(|_| self.audio_commands_events()) {
if let Err(e) = self
.cycle()
.and_then(|_| self.audio_commands_events())
.and_then(|_| {
self.check_and_send_keepalive()
.or_else(Error::disarm_would_block)
})
{
events_failure |= e.should_trigger_interconnect_rebuild();
conn_failure |= e.should_trigger_connect();
@@ -313,6 +334,11 @@ impl Mixer {
rtp.set_sequence(random::<u16>().into());
rtp.set_timestamp(random::<u32>().into());
self.deadline = Instant::now();
let mut ka = MutableKeepalivePacket::new(&mut self.keepalive_packet[..])
.expect("FATAL: Insufficient bytes given to keepalive packet.");
ka.set_ssrc(ssrc);
self.keepalive_deadline = self.deadline + UDP_KEEPALIVE_GAP;
Ok(())
},
MixerMessage::DropConn => {
@@ -321,9 +347,12 @@ impl Mixer {
},
MixerMessage::ReplaceInterconnect(i) => {
self.prevent_events = false;
if let Some(ws) = &self.ws {
conn_failure |= ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err();
}
#[cfg(feature = "receive")]
if let Some(conn) = &self.conn_active {
conn_failure |= conn
.udp_rx
@@ -357,13 +386,19 @@ impl Mixer {
);
}
self.config = Arc::new(new_config.clone());
self.config = Arc::new(
#[cfg(feature = "receive")]
new_config.clone(),
#[cfg(not(feature = "receive"))]
new_config,
);
if self.tracks.capacity() < self.config.preallocated_tracks {
self.tracks
.reserve(self.config.preallocated_tracks - self.tracks.len());
}
#[cfg(feature = "receive")]
if let Some(conn) = &self.conn_active {
conn_failure |= conn
.udp_rx
@@ -674,7 +709,7 @@ impl Mixer {
let send_buffer = self.config.use_softclip.then(|| &softclip_buffer[..]);
#[cfg(test)]
if let Some(OutputMode::Raw(tx)) = &self.config.override_connection {
let send_status = if let Some(OutputMode::Raw(tx)) = &self.config.override_connection {
let msg = match mix_len {
MixType::Passthrough(len) if len == SILENT_FRAME.len() => OutputMessage::Silent,
MixType::Passthrough(len) => {
@@ -693,12 +728,18 @@ impl Mixer {
};
drop(tx.send(msg.into()));
Ok(())
} else {
self.prep_and_send_packet(send_buffer, mix_len)?;
}
self.prep_and_send_packet(send_buffer, mix_len)
};
#[cfg(not(test))]
self.prep_and_send_packet(send_buffer, mix_len)?;
let send_status = self.prep_and_send_packet(send_buffer, mix_len);
send_status.or_else(Error::disarm_would_block)?;
self.advance_rtp_counters();
// Zero out all planes of the mix buffer if any audio was written.
if matches!(mix_len, MixType::MixedPcm(a) if a > 0) {
@@ -770,25 +811,36 @@ impl Mixer {
// Test mode: send unencrypted (compressed) packets to local receiver.
drop(tx.send(self.packet[..index].to_vec().into()));
} else {
conn.udp_tx.send(self.packet[..index].to_vec())?;
conn.udp_tx.send(&self.packet[..index])?;
}
#[cfg(not(test))]
{
// Normal operation: send encrypted payload to UDP Tx task.
// TODO: This is dog slow, don't do this.
// Can we replace this with a shared ring buffer + semaphore?
// or the BBQueue crate?
conn.udp_tx.send(self.packet[..index].to_vec())?;
conn.udp_tx.send(&self.packet[..index])?;
}
Ok(())
}
#[inline]
fn advance_rtp_counters(&mut self) {
let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect(
"FATAL: Too few bytes in self.packet for RTP header.\
(Blame: VOICE_PACKET_MAX?)",
);
rtp.set_sequence(rtp.get_sequence() + 1);
rtp.set_timestamp(rtp.get_timestamp() + MONO_FRAME_SIZE as u32);
}
#[inline]
fn check_and_send_keepalive(&mut self) -> Result<()> {
if let Some(conn) = self.conn_active.as_mut() {
if Instant::now() >= self.keepalive_deadline {
conn.udp_tx.send(&self.keepalive_packet)?;
self.keepalive_deadline += UDP_KEEPALIVE_GAP;
}
}
Ok(())
}

View File

@@ -5,8 +5,8 @@ pub mod error;
mod events;
pub mod message;
pub mod mixer;
#[cfg(feature = "receive")]
pub(crate) mod udp_rx;
pub(crate) mod udp_tx;
pub(crate) mod ws;
use std::time::Duration;

View File

@@ -22,7 +22,7 @@ use discortp::{
PacketSize,
};
use flume::Receiver;
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use std::{collections::HashMap, convert::TryInto};
use tokio::{net::UdpSocket, select};
use tracing::{error, instrument, trace, warn};
use xsalsa20poly1305::XSalsa20Poly1305 as Cipher;
@@ -240,7 +240,7 @@ struct UdpRx {
config: Config,
packet_buffer: [u8; VOICE_PACKET_MAX],
rx: Receiver<UdpRxMessage>,
udp_socket: Arc<UdpSocket>,
udp_socket: UdpSocket,
}
impl UdpRx {
@@ -395,7 +395,7 @@ pub(crate) async fn runner(
rx: Receiver<UdpRxMessage>,
cipher: Cipher,
config: Config,
udp_socket: Arc<UdpSocket>,
udp_socket: UdpSocket,
) {
trace!("UDP receive handle started.");

View File

@@ -1,63 +0,0 @@
use super::message::*;
use crate::constants::*;
use discortp::discord::MutableKeepalivePacket;
use flume::Receiver;
use std::sync::Arc;
use tokio::{
net::UdpSocket,
time::{timeout_at, Instant},
};
use tracing::{error, instrument, trace};
struct UdpTx {
ssrc: u32,
rx: Receiver<UdpTxMessage>,
udp_tx: Arc<UdpSocket>,
}
impl UdpTx {
async fn run(&mut self) {
let mut keepalive_bytes = [0u8; MutableKeepalivePacket::minimum_packet_size()];
let mut ka = MutableKeepalivePacket::new(&mut keepalive_bytes[..])
.expect("FATAL: Insufficient bytes given to keepalive packet.");
ka.set_ssrc(self.ssrc);
let mut ka_time = Instant::now() + UDP_KEEPALIVE_GAP;
loop {
match timeout_at(ka_time, self.rx.recv_async()).await {
Err(_) => {
trace!("Sending UDP Keepalive.");
if let Err(e) = self.udp_tx.send(&keepalive_bytes[..]).await {
error!("Fatal UDP keepalive send error: {:?}.", e);
break;
}
ka_time += UDP_KEEPALIVE_GAP;
},
Ok(Ok(p)) =>
if let Err(e) = self.udp_tx.send(&p[..]).await {
error!("Fatal UDP packet send error: {:?}.", e);
break;
},
Ok(Err(flume::RecvError::Disconnected)) => {
break;
},
}
}
}
}
#[instrument(skip(udp_msg_rx))]
pub(crate) async fn runner(udp_msg_rx: Receiver<UdpTxMessage>, ssrc: u32, udp_tx: Arc<UdpSocket>) {
trace!("UDP transmit handle started.");
let mut txer = UdpTx {
ssrc,
rx: udp_msg_rx,
udp_tx,
};
txer.run().await;
trace!("UDP transmit handle stopped.");
}