diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 547813c..8f615ac 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -41,7 +41,7 @@ jobs: env: RUSTDOCFLAGS: -D broken_intra_doc_links run: | - cargo doc --no-deps --all-features + cargo doc --no-deps --features default,twilight-rustls,builtin-queue,stock-zlib - name: Prepare docs shell: bash -e -O extglob {0} diff --git a/Cargo.toml b/Cargo.toml index 0248d87..580151c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,10 +158,18 @@ serenity-deps = ["async-trait"] youtube-dlc = [] builtin-queue = [] +internals = [] + [[bench]] -name = "mixing" -path = "benches/mixing.rs" +name = "base-mixing" +path = "benches/base-mixing.rs" +harness = false + +[[bench]] +name = "mixing-task" +path = "benches/mixing-task.rs" +required-features = ["internals"] harness = false [package.metadata.docs.rs] -all-features = true +features = ["default", "twilight-rustls", "builtin-queue", "stock-zlib"] diff --git a/benches/mixing.rs b/benches/base-mixing.rs similarity index 100% rename from benches/mixing.rs rename to benches/base-mixing.rs diff --git a/benches/mixing-task.rs b/benches/mixing-task.rs new file mode 100644 index 0000000..fcf37f3 --- /dev/null +++ b/benches/mixing-task.rs @@ -0,0 +1,237 @@ +use criterion::{ + black_box, + criterion_group, + criterion_main, + BatchSize, + Bencher, + BenchmarkId, + Criterion, +}; +use flume::{Receiver, Sender, TryRecvError}; +use songbird::{ + constants::*, + driver::bench_internals::{mixer::Mixer, task_message::*, CryptoState}, + input::{cached::Compressed, Input}, + tracks, + Bitrate, +}; +use tokio::runtime::{Handle, Runtime}; +use xsalsa20poly1305::{aead::NewAead, XSalsa20Poly1305 as Cipher, KEY_SIZE}; + +// create a dummied task + interconnect. +// measure perf at varying numbers of sources (binary 1--64) without passthrough support. + +fn dummied_mixer( + handle: Handle, +) -> ( + Mixer, + ( + Receiver, + Receiver, + Receiver, + Receiver, + ), +) { + let (mix_tx, mix_rx) = flume::unbounded(); + let (core_tx, core_rx) = flume::unbounded(); + let (event_tx, event_rx) = flume::unbounded(); + + let (udp_sender_tx, udp_sender_rx) = flume::unbounded(); + let (udp_receiver_tx, udp_receiver_rx) = flume::unbounded(); + + let ic = Interconnect { + core: core_tx, + events: event_tx, + mixer: mix_tx, + }; + + let mut out = Mixer::new(mix_rx, handle, ic, Default::default()); + + let fake_conn = MixerConnection { + cipher: Cipher::new_varkey(&vec![0u8; KEY_SIZE]).unwrap(), + crypto_state: CryptoState::Normal, + udp_rx: udp_receiver_tx, + udp_tx: udp_sender_tx, + }; + + out.conn_active = Some(fake_conn); + + out.skip_sleep = true; + + (out, (core_rx, event_rx, udp_receiver_rx, udp_sender_rx)) +} + +fn mixer_float( + num_tracks: usize, + handle: Handle, +) -> ( + Mixer, + ( + Receiver, + Receiver, + Receiver, + Receiver, + ), +) { + let mut out = dummied_mixer(handle); + + let floats = utils::make_sine(10 * STEREO_FRAME_SIZE, true); + + let mut tracks = vec![]; + for i in 0..num_tracks { + let input = Input::float_pcm(true, floats.clone().into()); + tracks.push(tracks::create_player(input).0.into()); + } + + out.0.tracks = tracks; + + out +} + +fn mixer_float_drop( + num_tracks: usize, + handle: Handle, +) -> ( + Mixer, + ( + Receiver, + Receiver, + Receiver, + Receiver, + ), +) { + let mut out = dummied_mixer(handle); + + let mut tracks = vec![]; + for i in 0..num_tracks { + let floats = utils::make_sine((i / 5) * STEREO_FRAME_SIZE, true); + let input = Input::float_pcm(true, floats.clone().into()); + tracks.push(tracks::create_player(input).0.into()); + } + + out.0.tracks = tracks; + + out +} + +fn mixer_opus( + handle: Handle, +) -> ( + Mixer, + ( + Receiver, + Receiver, + Receiver, + Receiver, + ), +) { + // should add a single opus-based track. + // make this fully loaded to prevent any perf cost there. + let mut out = dummied_mixer(handle); + + let floats = utils::make_sine(6 * STEREO_FRAME_SIZE, true); + + let mut tracks = vec![]; + + let mut src = Compressed::new( + Input::float_pcm(true, floats.clone().into()), + Bitrate::BitsPerSecond(128_000), + ) + .expect("These parameters are well-defined."); + src.raw.load_all(); + + tracks.push(tracks::create_player(src.into()).0.into()); + + out.0.tracks = tracks; + + out +} + +fn no_passthrough(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let mut group = c.benchmark_group("Float Input (No Passthrough)"); + + for shift in 0..=6 { + let track_count = 1 << shift; + + group.bench_with_input( + BenchmarkId::new("Single Packet", track_count), + &track_count, + |b, i| { + b.iter_batched_ref( + || black_box(mixer_float(*i, rt.handle().clone())), + |input| { + black_box(input.0.cycle()); + }, + BatchSize::SmallInput, + ) + }, + ); + group.bench_with_input( + BenchmarkId::new("n=5 Packets", track_count), + &track_count, + |b, i| { + b.iter_batched_ref( + || black_box(mixer_float(*i, rt.handle().clone())), + |input| { + for i in 0..5 { + black_box(input.0.cycle()); + } + }, + BatchSize::SmallInput, + ) + }, + ); + } + + group.finish(); +} + +fn passthrough(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + let mut group = c.benchmark_group("Opus Input (Passthrough)"); + + group.bench_function("Single Packet", |b| { + b.iter_batched_ref( + || black_box(mixer_opus(rt.handle().clone())), + |input| { + black_box(input.0.cycle()); + }, + BatchSize::SmallInput, + ) + }); + group.bench_function("n=5 Packets", |b| { + b.iter_batched_ref( + || black_box(mixer_opus(rt.handle().clone())), + |input| { + for i in 0..5 { + black_box(input.0.cycle()); + } + }, + BatchSize::SmallInput, + ) + }); + + group.finish(); +} + +fn culling(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + c.bench_function("Worst-case Track Culling (15 tracks, 5 pkts)", |b| { + b.iter_batched_ref( + || black_box(mixer_float_drop(15, rt.handle().clone())), + |input| { + for i in 0..5 { + black_box(input.0.cycle()); + } + }, + BatchSize::SmallInput, + ) + }); +} + +criterion_group!(benches, no_passthrough, passthrough, culling); +criterion_main!(benches); diff --git a/src/driver/bench_internals.rs b/src/driver/bench_internals.rs new file mode 100644 index 0000000..d335d49 --- /dev/null +++ b/src/driver/bench_internals.rs @@ -0,0 +1,8 @@ +//! Various driver internals which need to be exported for benchmarking. +//! +//! Included if using the `"internals"` feature flag. +//! You should not and/or cannot use these as part of a normal application. + +pub use super::tasks::{message as task_message, mixer}; + +pub use super::crypto::CryptoState; diff --git a/src/driver/crypto.rs b/src/driver/crypto.rs index cfbc813..1ef763b 100644 --- a/src/driver/crypto.rs +++ b/src/driver/crypto.rs @@ -169,9 +169,10 @@ impl CryptoMode { } } +#[allow(missing_docs)] #[derive(Clone, Copy, Debug, Eq, PartialEq)] #[non_exhaustive] -pub(crate) enum CryptoState { +pub enum CryptoState { Normal, Suffix, Lite(Wrapping), @@ -217,6 +218,7 @@ impl CryptoState { endpoint } + /// Returns the underlying (stateless) type of the active crypto mode. pub fn kind(&self) -> CryptoMode { CryptoMode::from(*self) } diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 08a1a1d..5e0ad74 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -8,6 +8,9 @@ //! generation from being slowed down past its deadline, or from affecting other //! asynchronous tasks your bot must handle. +#[cfg(feature = "internals")] +pub mod bench_internals; + mod config; pub(crate) mod connection; mod crypto; @@ -16,7 +19,8 @@ pub(crate) mod tasks; pub use config::Config; use connection::error::{Error, Result}; -pub use crypto::*; +pub use crypto::CryptoMode; +pub(crate) use crypto::CryptoState; pub use decode_mode::DecodeMode; #[cfg(feature = "builtin-queue")] diff --git a/src/driver/tasks/events.rs b/src/driver/tasks/events.rs index bb28895..95ec677 100644 --- a/src/driver/tasks/events.rs +++ b/src/driver/tasks/events.rs @@ -93,9 +93,9 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { info!("Event state for track {} of {} removed.", i, events.len()); - events.remove(i); - states.remove(i); - handles.remove(i); + events.swap_remove(i); + states.swap_remove(i); + handles.swap_remove(i); }, Ok(RemoveAllTracks) => { info!("Event state for all tracks removed."); diff --git a/src/driver/tasks/message/core.rs b/src/driver/tasks/message/core.rs index 270beec..dc02f54 100644 --- a/src/driver/tasks/message/core.rs +++ b/src/driver/tasks/message/core.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + use crate::{ driver::{connection::error::Error, Config}, events::EventData, diff --git a/src/driver/tasks/message/events.rs b/src/driver/tasks/message/events.rs index 197ebe8..c7989ea 100644 --- a/src/driver/tasks/message/events.rs +++ b/src/driver/tasks/message/events.rs @@ -1,10 +1,12 @@ +#![allow(missing_docs)] + use crate::{ events::{CoreContext, EventData, EventStore}, tracks::{LoopState, PlayMode, TrackHandle, TrackState}, }; use std::time::Duration; -pub(crate) enum EventMessage { +pub enum EventMessage { // Event related. // Track events should fire off the back of state changes. AddGlobalEvent(EventData), diff --git a/src/driver/tasks/message/mixer.rs b/src/driver/tasks/message/mixer.rs index 260f400..3c9b0a1 100644 --- a/src/driver/tasks/message/mixer.rs +++ b/src/driver/tasks/message/mixer.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + use super::{Interconnect, UdpRxMessage, UdpTxMessage, WsMessage}; use crate::{ @@ -8,7 +10,7 @@ use crate::{ use flume::Sender; use xsalsa20poly1305::XSalsa20Poly1305 as Cipher; -pub(crate) struct MixerConnection { +pub struct MixerConnection { pub cipher: Cipher, pub crypto_state: CryptoState, pub udp_rx: Sender, @@ -22,7 +24,7 @@ impl Drop for MixerConnection { } } -pub(crate) enum MixerMessage { +pub enum MixerMessage { AddTrack(Track), SetTrack(Option), diff --git a/src/driver/tasks/message/mod.rs b/src/driver/tasks/message/mod.rs index 1831839..f769efb 100644 --- a/src/driver/tasks/message/mod.rs +++ b/src/driver/tasks/message/mod.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + mod core; mod events; mod mixer; @@ -5,13 +7,13 @@ mod udp_rx; mod udp_tx; mod ws; -pub(crate) use self::{core::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*}; +pub use self::{core::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*}; use flume::Sender; use tracing::info; #[derive(Clone, Debug)] -pub(crate) struct Interconnect { +pub struct Interconnect { pub core: Sender, pub events: Sender, pub mixer: Sender, diff --git a/src/driver/tasks/message/udp_rx.rs b/src/driver/tasks/message/udp_rx.rs index 453415d..9034090 100644 --- a/src/driver/tasks/message/udp_rx.rs +++ b/src/driver/tasks/message/udp_rx.rs @@ -1,7 +1,9 @@ +#![allow(missing_docs)] + use super::Interconnect; use crate::driver::Config; -pub(crate) enum UdpRxMessage { +pub enum UdpRxMessage { SetConfig(Config), ReplaceInterconnect(Interconnect), diff --git a/src/driver/tasks/message/udp_tx.rs b/src/driver/tasks/message/udp_tx.rs index 349d524..d3dbf36 100644 --- a/src/driver/tasks/message/udp_tx.rs +++ b/src/driver/tasks/message/udp_tx.rs @@ -1,3 +1,5 @@ +#![allow(missing_docs)] + pub enum UdpTxMessage { Packet(Vec), // TODO: do something cheaper. Poison, diff --git a/src/driver/tasks/message/ws.rs b/src/driver/tasks/message/ws.rs index 7ce5f07..1cd7e49 100644 --- a/src/driver/tasks/message/ws.rs +++ b/src/driver/tasks/message/ws.rs @@ -1,8 +1,10 @@ +#![allow(missing_docs)] + use super::Interconnect; use crate::ws::WsStream; #[allow(dead_code)] -pub(crate) enum WsMessage { +pub enum WsMessage { Ws(Box), ReplaceInterconnect(Interconnect), SetKeepalive(f64), diff --git a/src/driver/tasks/mixer.rs b/src/driver/tasks/mixer.rs index ec8a121..d552e12 100644 --- a/src/driver/tasks/mixer.rs +++ b/src/driver/tasks/mixer.rs @@ -22,23 +22,24 @@ use tokio::runtime::Handle; use tracing::{error, instrument}; use xsalsa20poly1305::TAG_SIZE; -struct Mixer { - async_handle: Handle, - bitrate: Bitrate, - config: Config, - conn_active: Option, - deadline: Instant, - encoder: OpusEncoder, - interconnect: Interconnect, - mix_rx: Receiver, - muted: bool, - packet: [u8; VOICE_PACKET_MAX], - prevent_events: bool, - silence_frames: u8, - sleeper: SpinSleeper, - soft_clip: SoftClip, - tracks: Vec, - ws: Option>, +pub struct Mixer { + pub async_handle: Handle, + pub bitrate: Bitrate, + pub config: Config, + pub conn_active: Option, + pub deadline: Instant, + pub encoder: OpusEncoder, + pub interconnect: Interconnect, + pub mix_rx: Receiver, + pub muted: bool, + pub packet: [u8; VOICE_PACKET_MAX], + pub prevent_events: bool, + pub silence_frames: u8, + pub skip_sleep: bool, + pub sleeper: SpinSleeper, + pub soft_clip: SoftClip, + pub tracks: Vec, + pub ws: Option>, } fn new_encoder(bitrate: Bitrate) -> Result { @@ -49,7 +50,7 @@ fn new_encoder(bitrate: Bitrate) -> Result { } impl Mixer { - fn new( + pub fn new( mix_rx: Receiver, async_handle: Handle, interconnect: Interconnect, @@ -86,6 +87,7 @@ impl Mixer { packet, prevent_events: false, silence_frames: 0, + skip_sleep: false, sleeper: Default::default(), soft_clip, tracks, @@ -288,70 +290,6 @@ impl Mixer { Ok(()) } - #[inline] - fn mix_tracks<'a>( - &mut self, - opus_frame: &'a mut [u8], - mix_buffer: &mut [f32; STEREO_FRAME_SIZE], - ) -> Result<(usize, &'a [u8])> { - let mut len = 0; - - // Opus frame passthrough. - // This requires that we have only one track, who has volume 1.0, and an - // Opus codec type. - let do_passthrough = self.tracks.len() == 1 && { - let track = &self.tracks[0]; - (track.volume - 1.0).abs() < f32::EPSILON && track.source.supports_passthrough() - }; - - for (i, track) in self.tracks.iter_mut().enumerate() { - let vol = track.volume; - let stream = &mut track.source; - - if track.playing != PlayMode::Play { - continue; - } - - let (temp_len, opus_len) = if do_passthrough { - (0, track.source.read_opus_frame(opus_frame).ok()) - } else { - (stream.mix(mix_buffer, vol), None) - }; - - len = len.max(temp_len); - if temp_len > 0 || opus_len.is_some() { - track.step_frame(); - } else if track.do_loop() { - if let Ok(time) = track.seek_time(Default::default()) { - // have to reproduce self.fire_event here - // to circumvent the borrow checker's lack of knowledge. - // - // In event of error, one of the later event calls will - // trigger the event thread rebuild: it is more prudent that - // the mixer works as normal right now. - if !self.prevent_events { - let _ = self.interconnect.events.send(EventMessage::ChangeState( - i, - TrackStateChange::Position(time), - )); - let _ = self.interconnect.events.send(EventMessage::ChangeState( - i, - TrackStateChange::Loops(track.loops, false), - )); - } - } - } else { - track.end(); - } - - if let Some(opus_len) = opus_len { - return Ok((STEREO_FRAME_SIZE, &opus_frame[..opus_len])); - } - } - - Ok((len, &opus_frame[..0])) - } - #[inline] fn audio_commands_events(&mut self) -> Result<()> { // Apply user commands. @@ -374,7 +312,7 @@ impl Mixer { if track.playing.is_done() { let p_state = track.playing(); - self.tracks.remove(i); + self.tracks.swap_remove(i); to_remove.push(i); self.fire_event(EventMessage::ChangeState( i, @@ -398,42 +336,65 @@ impl Mixer { #[inline] fn march_deadline(&mut self) { + if self.skip_sleep { + return; + } + self.sleeper .sleep(self.deadline.saturating_duration_since(Instant::now())); self.deadline += TIMESTEP_LENGTH; } - fn cycle(&mut self) -> Result<()> { + pub fn cycle(&mut self) -> Result<()> { if self.conn_active.is_none() { self.march_deadline(); return Ok(()); } - // TODO: can we make opus_frame_backing *actually* a view over - // some region of self.packet, derived using the encryption mode? - // This saves a copy on Opus passthrough. - let mut opus_frame_backing = [0u8; STEREO_FRAME_SIZE]; let mut mix_buffer = [0f32; STEREO_FRAME_SIZE]; - // Slice which mix tracks may use to passthrough direct Opus frames. - let mut opus_space = &mut opus_frame_backing[..]; - // Walk over all the audio files, combining into one audio frame according // to volume, play state, etc. - let (mut len, mut opus_frame) = self.mix_tracks(&mut opus_space, &mut mix_buffer)?; + let mut mix_len = { + let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( + "FATAL: Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + + let payload = rtp.payload_mut(); + + // self.mix_tracks(&mut payload[TAG_SIZE..], &mut mix_buffer) + mix_tracks( + &mut payload[TAG_SIZE..], + &mut mix_buffer, + &mut self.tracks, + &self.interconnect, + self.prevent_events, + ) + }; self.soft_clip.apply(&mut mix_buffer[..])?; if self.muted { - len = 0; + mix_len = MixType::MixedPcm(0); } - if len == 0 { + if mix_len == MixType::MixedPcm(0) { if self.silence_frames > 0 { self.silence_frames -= 1; // Explicit "Silence" frame. - opus_frame = &SILENT_FRAME[..]; + let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( + "FATAL: Too few bytes in self.packet for RTP header.\ + (Blame: VOICE_PACKET_MAX?)", + ); + + let payload = rtp.payload_mut(); + + (&mut payload[TAG_SIZE..TAG_SIZE + SILENT_FRAME.len()]) + .copy_from_slice(&SILENT_FRAME[..]); + + mix_len = MixType::Passthrough(SILENT_FRAME.len()); } else { // Per official guidelines, send 5x silence BEFORE we stop speaking. if let Some(ws) = &self.ws { @@ -457,7 +418,7 @@ impl Mixer { } self.march_deadline(); - self.prep_and_send_packet(mix_buffer, opus_frame)?; + self.prep_and_send_packet(mix_buffer, mix_len)?; Ok(()) } @@ -466,7 +427,8 @@ impl Mixer { self.encoder.set_bitrate(bitrate).map_err(Into::into) } - fn prep_and_send_packet(&mut self, buffer: [f32; 1920], opus_frame: &[u8]) -> Result<()> { + #[inline] + fn prep_and_send_packet(&mut self, buffer: [f32; 1920], mix_len: MixType) -> Result<()> { let conn = self .conn_active .as_mut() @@ -481,16 +443,15 @@ impl Mixer { let payload = rtp.payload_mut(); let crypto_mode = conn.crypto_state.kind(); - let payload_len = if opus_frame.is_empty() { - let total_payload_space = payload.len() - crypto_mode.payload_suffix_len(); - self.encoder.encode_float( - &buffer[..STEREO_FRAME_SIZE], - &mut payload[TAG_SIZE..total_payload_space], - )? - } else { - let len = opus_frame.len(); - payload[TAG_SIZE..TAG_SIZE + len].clone_from_slice(opus_frame); - len + let payload_len = match mix_len { + MixType::Passthrough(opus_len) => opus_len, + MixType::MixedPcm(_samples) => { + let total_payload_space = payload.len() - crypto_mode.payload_suffix_len(); + self.encoder.encode_float( + &buffer[..STEREO_FRAME_SIZE], + &mut payload[TAG_SIZE..total_payload_space], + )? + }, }; let final_payload_size = conn @@ -523,6 +484,78 @@ impl Mixer { } } +#[derive(Debug, Eq, PartialEq)] +enum MixType { + Passthrough(usize), + MixedPcm(usize), +} + +#[inline] +fn mix_tracks<'a>( + opus_frame: &'a mut [u8], + mix_buffer: &mut [f32; STEREO_FRAME_SIZE], + tracks: &mut Vec, + interconnect: &Interconnect, + prevent_events: bool, +) -> MixType { + let mut len = 0; + + // Opus frame passthrough. + // This requires that we have only one track, who has volume 1.0, and an + // Opus codec type. + let do_passthrough = tracks.len() == 1 && { + let track = &tracks[0]; + (track.volume - 1.0).abs() < f32::EPSILON && track.source.supports_passthrough() + }; + + for (i, track) in tracks.iter_mut().enumerate() { + let vol = track.volume; + let stream = &mut track.source; + + if track.playing != PlayMode::Play { + continue; + } + + let (temp_len, opus_len) = if do_passthrough { + (0, track.source.read_opus_frame(opus_frame).ok()) + } else { + (stream.mix(mix_buffer, vol), None) + }; + + len = len.max(temp_len); + if temp_len > 0 || opus_len.is_some() { + track.step_frame(); + } else if track.do_loop() { + if let Ok(time) = track.seek_time(Default::default()) { + // have to reproduce self.fire_event here + // to circumvent the borrow checker's lack of knowledge. + // + // In event of error, one of the later event calls will + // trigger the event thread rebuild: it is more prudent that + // the mixer works as normal right now. + if !prevent_events { + let _ = interconnect.events.send(EventMessage::ChangeState( + i, + TrackStateChange::Position(time), + )); + let _ = interconnect.events.send(EventMessage::ChangeState( + i, + TrackStateChange::Loops(track.loops, false), + )); + } + } + } else { + track.end(); + } + + if let Some(opus_len) = opus_len { + return MixType::Passthrough(opus_len); + } + } + + MixType::MixedPcm(len) +} + /// The mixing thread is a synchronous context due to its compute-bound nature. /// /// We pass in an async handle for the benefit of some Input classes (e.g., restartables) diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index fe0257c..7dd8960 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -1,7 +1,9 @@ +#![allow(missing_docs)] + pub mod error; mod events; -pub(crate) mod message; -mod mixer; +pub mod message; +pub mod mixer; pub(crate) mod udp_rx; pub(crate) mod udp_tx; pub(crate) mod ws; diff --git a/src/events/context.rs b/src/events/context.rs index 9ff0268..7b9809f 100644 --- a/src/events/context.rs +++ b/src/events/context.rs @@ -72,7 +72,7 @@ pub enum EventContext<'a> { } #[derive(Clone, Debug)] -pub(crate) enum CoreContext { +pub enum CoreContext { SpeakingStateUpdate(Speaking), SpeakingUpdate { ssrc: u32, diff --git a/src/events/mod.rs b/src/events/mod.rs index ea885c0..5bd59cb 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -7,7 +7,8 @@ mod store; mod track; mod untimed; -pub use self::{context::*, core::*, data::*, store::*, track::*, untimed::*}; +pub use self::{context::EventContext, core::*, data::*, store::*, track::*, untimed::*}; +pub(crate) use context::CoreContext; use async_trait::async_trait; use std::time::Duration; diff --git a/src/input/mod.rs b/src/input/mod.rs index 088cac2..0ba6396 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -85,7 +85,7 @@ use tracing::{debug, error}; #[derive(Debug)] pub struct Input { /// Information about the played source. - pub metadata: Metadata, + pub metadata: Box, /// Indicates whether `source` is stereo or mono. pub stereo: bool, /// Underlying audio data bytestream. @@ -119,7 +119,7 @@ impl Input { metadata: Option, ) -> Self { Input { - metadata: metadata.unwrap_or_default(), + metadata: metadata.unwrap_or_default().into(), stereo, reader, kind, diff --git a/src/manager.rs b/src/manager.rs index ca78a91..7bdd28a 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -100,7 +100,7 @@ impl Songbird { /// If this struct is already initialised (e.g., from [`::twilight`]), /// or a previous call, then this function is a no-op. /// - /// [`::twilight`]: Songbird::twilight + /// [`::twilight`]: #method.twilight pub fn initialise_client_data>(&self, shard_count: u64, user_id: U) { let mut client_data = self.client_data.write(); diff --git a/src/tracks/handle.rs b/src/tracks/handle.rs index 71bd40a..aaf92c0 100644 --- a/src/tracks/handle.rs +++ b/src/tracks/handle.rs @@ -17,10 +17,15 @@ use uuid::Uuid; /// /// [`Track`]: Track pub struct TrackHandle { + inner: Arc, +} + +#[derive(Clone, Debug)] +struct InnerHandle { command_channel: UnboundedSender, seekable: bool, uuid: Uuid, - metadata: Arc, + metadata: Box, } impl TrackHandle { @@ -32,14 +37,16 @@ impl TrackHandle { command_channel: UnboundedSender, seekable: bool, uuid: Uuid, - metadata: Metadata, + metadata: Box, ) -> Self { - Self { + let inner = Arc::new(InnerHandle { command_channel, seekable, uuid, - metadata: Arc::new(metadata), - } + metadata, + }); + + Self { inner } } /// Unpauses an audio track. @@ -75,7 +82,7 @@ impl TrackHandle { /// [`seek_time`]: TrackHandle::seek_time /// [`Input`]: crate::input::Input pub fn is_seekable(&self) -> bool { - self.seekable + self.inner.seekable } /// Seeks along the track to the specified position. @@ -86,7 +93,7 @@ impl TrackHandle { /// [`Input`]: crate::input::Input /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported pub fn seek_time(&self, position: Duration) -> TrackResult<()> { - if self.seekable { + if self.is_seekable() { self.send(TrackCommand::Seek(position)) } else { Err(TrackError::SeekUnsupported) @@ -139,7 +146,7 @@ impl TrackHandle { /// [`Input`]: crate::input::Input /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported pub fn enable_loop(&self) -> TrackResult<()> { - if self.seekable { + if self.is_seekable() { self.send(TrackCommand::Loop(LoopState::Infinite)) } else { Err(TrackError::SeekUnsupported) @@ -154,7 +161,7 @@ impl TrackHandle { /// [`Input`]: crate::input::Input /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported pub fn disable_loop(&self) -> TrackResult<()> { - if self.seekable { + if self.is_seekable() { self.send(TrackCommand::Loop(LoopState::Finite(0))) } else { Err(TrackError::SeekUnsupported) @@ -169,7 +176,7 @@ impl TrackHandle { /// [`Input`]: crate::input::Input /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported pub fn loop_for(&self, count: usize) -> TrackResult<()> { - if self.seekable { + if self.is_seekable() { self.send(TrackCommand::Loop(LoopState::Finite(count))) } else { Err(TrackError::SeekUnsupported) @@ -178,7 +185,7 @@ impl TrackHandle { /// Returns this handle's (and track's) unique identifier. pub fn uuid(&self) -> Uuid { - self.uuid + self.inner.uuid } /// Returns the metadata stored in the handle. @@ -188,8 +195,8 @@ impl TrackHandle { /// read-only from then on. /// /// [`Input`]: crate::input::Input - pub fn metadata(&self) -> Arc { - self.metadata.clone() + pub fn metadata(&self) -> &Metadata { + &self.inner.metadata } #[inline] @@ -199,7 +206,8 @@ impl TrackHandle { pub fn send(&self, cmd: TrackCommand) -> TrackResult<()> { // As the send channels are unbounded, we can be reasonably certain // that send failure == cancellation. - self.command_channel + self.inner + .command_channel .send(cmd) .map_err(|_e| TrackError::Finished) } diff --git a/src/tracks/queue.rs b/src/tracks/queue.rs index e62cd0d..c4560b6 100644 --- a/src/tracks/queue.rs +++ b/src/tracks/queue.rs @@ -54,7 +54,7 @@ use tracing::{info, warn}; /// ``` /// /// [`TrackEvent`]: crate::events::TrackEvent -/// [`Driver::queue`]: crate::driver::Driver::queue +/// [`Driver::queue`]: crate::driver::Driver #[derive(Clone, Debug, Default)] pub struct TrackQueue { // NOTE: the choice of a parking lot mutex is quite deliberate