Driver, Input: Performance & Benchmarks (#27)

* Driver Benchmarks

Benchmarks driver use cases for single packet send,
multiple packet send, float vs opus, and the cost of
head-of-queue track removal.

Mix costs for large packet counts are also included.

This is a prelude to the optimisations discussed in
#21.

* Typo in benchmark

* Place Opus packet directly into packet buffer

Cleans up some other logic surrounding this, too. Gets a 16.9% perf improvement on opus packet passthrough (sub 5us here).

* Better track removal

In theory this should be faster, but it aint. Keeping in case
reducing struct sizes down the line magically makes this
faster.

* Reduce size of Input, TrackHandle

Metadata is now boxed away. Similarly, TrackHandles are neatly Arc'd to reduce their size to pointer length (and mitigate the impact of copies if we add in more fields).
This commit is contained in:
Kyle Simpson
2020-12-26 23:08:35 +00:00
committed by GitHub
parent 2fc88a6ef1
commit 504b8dfaef
23 changed files with 462 additions and 145 deletions

View File

@@ -22,23 +22,24 @@ use tokio::runtime::Handle;
use tracing::{error, instrument};
use xsalsa20poly1305::TAG_SIZE;
struct Mixer {
async_handle: Handle,
bitrate: Bitrate,
config: Config,
conn_active: Option<MixerConnection>,
deadline: Instant,
encoder: OpusEncoder,
interconnect: Interconnect,
mix_rx: Receiver<MixerMessage>,
muted: bool,
packet: [u8; VOICE_PACKET_MAX],
prevent_events: bool,
silence_frames: u8,
sleeper: SpinSleeper,
soft_clip: SoftClip,
tracks: Vec<Track>,
ws: Option<Sender<WsMessage>>,
pub struct Mixer {
pub async_handle: Handle,
pub bitrate: Bitrate,
pub config: Config,
pub conn_active: Option<MixerConnection>,
pub deadline: Instant,
pub encoder: OpusEncoder,
pub interconnect: Interconnect,
pub mix_rx: Receiver<MixerMessage>,
pub muted: bool,
pub packet: [u8; VOICE_PACKET_MAX],
pub prevent_events: bool,
pub silence_frames: u8,
pub skip_sleep: bool,
pub sleeper: SpinSleeper,
pub soft_clip: SoftClip,
pub tracks: Vec<Track>,
pub ws: Option<Sender<WsMessage>>,
}
fn new_encoder(bitrate: Bitrate) -> Result<OpusEncoder> {
@@ -49,7 +50,7 @@ fn new_encoder(bitrate: Bitrate) -> Result<OpusEncoder> {
}
impl Mixer {
fn new(
pub fn new(
mix_rx: Receiver<MixerMessage>,
async_handle: Handle,
interconnect: Interconnect,
@@ -86,6 +87,7 @@ impl Mixer {
packet,
prevent_events: false,
silence_frames: 0,
skip_sleep: false,
sleeper: Default::default(),
soft_clip,
tracks,
@@ -288,70 +290,6 @@ impl Mixer {
Ok(())
}
#[inline]
fn mix_tracks<'a>(
&mut self,
opus_frame: &'a mut [u8],
mix_buffer: &mut [f32; STEREO_FRAME_SIZE],
) -> Result<(usize, &'a [u8])> {
let mut len = 0;
// Opus frame passthrough.
// This requires that we have only one track, who has volume 1.0, and an
// Opus codec type.
let do_passthrough = self.tracks.len() == 1 && {
let track = &self.tracks[0];
(track.volume - 1.0).abs() < f32::EPSILON && track.source.supports_passthrough()
};
for (i, track) in self.tracks.iter_mut().enumerate() {
let vol = track.volume;
let stream = &mut track.source;
if track.playing != PlayMode::Play {
continue;
}
let (temp_len, opus_len) = if do_passthrough {
(0, track.source.read_opus_frame(opus_frame).ok())
} else {
(stream.mix(mix_buffer, vol), None)
};
len = len.max(temp_len);
if temp_len > 0 || opus_len.is_some() {
track.step_frame();
} else if track.do_loop() {
if let Ok(time) = track.seek_time(Default::default()) {
// have to reproduce self.fire_event here
// to circumvent the borrow checker's lack of knowledge.
//
// In event of error, one of the later event calls will
// trigger the event thread rebuild: it is more prudent that
// the mixer works as normal right now.
if !self.prevent_events {
let _ = self.interconnect.events.send(EventMessage::ChangeState(
i,
TrackStateChange::Position(time),
));
let _ = self.interconnect.events.send(EventMessage::ChangeState(
i,
TrackStateChange::Loops(track.loops, false),
));
}
}
} else {
track.end();
}
if let Some(opus_len) = opus_len {
return Ok((STEREO_FRAME_SIZE, &opus_frame[..opus_len]));
}
}
Ok((len, &opus_frame[..0]))
}
#[inline]
fn audio_commands_events(&mut self) -> Result<()> {
// Apply user commands.
@@ -374,7 +312,7 @@ impl Mixer {
if track.playing.is_done() {
let p_state = track.playing();
self.tracks.remove(i);
self.tracks.swap_remove(i);
to_remove.push(i);
self.fire_event(EventMessage::ChangeState(
i,
@@ -398,42 +336,65 @@ impl Mixer {
#[inline]
fn march_deadline(&mut self) {
if self.skip_sleep {
return;
}
self.sleeper
.sleep(self.deadline.saturating_duration_since(Instant::now()));
self.deadline += TIMESTEP_LENGTH;
}
fn cycle(&mut self) -> Result<()> {
pub fn cycle(&mut self) -> Result<()> {
if self.conn_active.is_none() {
self.march_deadline();
return Ok(());
}
// TODO: can we make opus_frame_backing *actually* a view over
// some region of self.packet, derived using the encryption mode?
// This saves a copy on Opus passthrough.
let mut opus_frame_backing = [0u8; STEREO_FRAME_SIZE];
let mut mix_buffer = [0f32; STEREO_FRAME_SIZE];
// Slice which mix tracks may use to passthrough direct Opus frames.
let mut opus_space = &mut opus_frame_backing[..];
// Walk over all the audio files, combining into one audio frame according
// to volume, play state, etc.
let (mut len, mut opus_frame) = self.mix_tracks(&mut opus_space, &mut mix_buffer)?;
let mut mix_len = {
let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect(
"FATAL: Too few bytes in self.packet for RTP header.\
(Blame: VOICE_PACKET_MAX?)",
);
let payload = rtp.payload_mut();
// self.mix_tracks(&mut payload[TAG_SIZE..], &mut mix_buffer)
mix_tracks(
&mut payload[TAG_SIZE..],
&mut mix_buffer,
&mut self.tracks,
&self.interconnect,
self.prevent_events,
)
};
self.soft_clip.apply(&mut mix_buffer[..])?;
if self.muted {
len = 0;
mix_len = MixType::MixedPcm(0);
}
if len == 0 {
if mix_len == MixType::MixedPcm(0) {
if self.silence_frames > 0 {
self.silence_frames -= 1;
// Explicit "Silence" frame.
opus_frame = &SILENT_FRAME[..];
let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect(
"FATAL: Too few bytes in self.packet for RTP header.\
(Blame: VOICE_PACKET_MAX?)",
);
let payload = rtp.payload_mut();
(&mut payload[TAG_SIZE..TAG_SIZE + SILENT_FRAME.len()])
.copy_from_slice(&SILENT_FRAME[..]);
mix_len = MixType::Passthrough(SILENT_FRAME.len());
} else {
// Per official guidelines, send 5x silence BEFORE we stop speaking.
if let Some(ws) = &self.ws {
@@ -457,7 +418,7 @@ impl Mixer {
}
self.march_deadline();
self.prep_and_send_packet(mix_buffer, opus_frame)?;
self.prep_and_send_packet(mix_buffer, mix_len)?;
Ok(())
}
@@ -466,7 +427,8 @@ impl Mixer {
self.encoder.set_bitrate(bitrate).map_err(Into::into)
}
fn prep_and_send_packet(&mut self, buffer: [f32; 1920], opus_frame: &[u8]) -> Result<()> {
#[inline]
fn prep_and_send_packet(&mut self, buffer: [f32; 1920], mix_len: MixType) -> Result<()> {
let conn = self
.conn_active
.as_mut()
@@ -481,16 +443,15 @@ impl Mixer {
let payload = rtp.payload_mut();
let crypto_mode = conn.crypto_state.kind();
let payload_len = if opus_frame.is_empty() {
let total_payload_space = payload.len() - crypto_mode.payload_suffix_len();
self.encoder.encode_float(
&buffer[..STEREO_FRAME_SIZE],
&mut payload[TAG_SIZE..total_payload_space],
)?
} else {
let len = opus_frame.len();
payload[TAG_SIZE..TAG_SIZE + len].clone_from_slice(opus_frame);
len
let payload_len = match mix_len {
MixType::Passthrough(opus_len) => opus_len,
MixType::MixedPcm(_samples) => {
let total_payload_space = payload.len() - crypto_mode.payload_suffix_len();
self.encoder.encode_float(
&buffer[..STEREO_FRAME_SIZE],
&mut payload[TAG_SIZE..total_payload_space],
)?
},
};
let final_payload_size = conn
@@ -523,6 +484,78 @@ impl Mixer {
}
}
#[derive(Debug, Eq, PartialEq)]
enum MixType {
Passthrough(usize),
MixedPcm(usize),
}
#[inline]
fn mix_tracks<'a>(
opus_frame: &'a mut [u8],
mix_buffer: &mut [f32; STEREO_FRAME_SIZE],
tracks: &mut Vec<Track>,
interconnect: &Interconnect,
prevent_events: bool,
) -> MixType {
let mut len = 0;
// Opus frame passthrough.
// This requires that we have only one track, who has volume 1.0, and an
// Opus codec type.
let do_passthrough = tracks.len() == 1 && {
let track = &tracks[0];
(track.volume - 1.0).abs() < f32::EPSILON && track.source.supports_passthrough()
};
for (i, track) in tracks.iter_mut().enumerate() {
let vol = track.volume;
let stream = &mut track.source;
if track.playing != PlayMode::Play {
continue;
}
let (temp_len, opus_len) = if do_passthrough {
(0, track.source.read_opus_frame(opus_frame).ok())
} else {
(stream.mix(mix_buffer, vol), None)
};
len = len.max(temp_len);
if temp_len > 0 || opus_len.is_some() {
track.step_frame();
} else if track.do_loop() {
if let Ok(time) = track.seek_time(Default::default()) {
// have to reproduce self.fire_event here
// to circumvent the borrow checker's lack of knowledge.
//
// In event of error, one of the later event calls will
// trigger the event thread rebuild: it is more prudent that
// the mixer works as normal right now.
if !prevent_events {
let _ = interconnect.events.send(EventMessage::ChangeState(
i,
TrackStateChange::Position(time),
));
let _ = interconnect.events.send(EventMessage::ChangeState(
i,
TrackStateChange::Loops(track.loops, false),
));
}
}
} else {
track.end();
}
if let Some(opus_len) = opus_len {
return MixType::Passthrough(opus_len);
}
}
MixType::MixedPcm(len)
}
/// The mixing thread is a synchronous context due to its compute-bound nature.
///
/// We pass in an async handle for the benefit of some Input classes (e.g., restartables)