diff --git a/examples/serenity/voice_receive/src/main.rs b/examples/serenity/voice_receive/src/main.rs index dbaf47a..82cf170 100644 --- a/examples/serenity/voice_receive/src/main.rs +++ b/examples/serenity/voice_receive/src/main.rs @@ -84,35 +84,35 @@ impl VoiceEventHandler for Receiver { speaking, ); }, - Ctx::SpeakingUpdate {ssrc, speaking} => { + Ctx::SpeakingUpdate(data) => { // You can implement logic here which reacts to a user starting // or stopping speaking. println!( "Source {} has {} speaking.", - ssrc, - if *speaking {"started"} else {"stopped"}, + data.ssrc, + if data.speaking {"started"} else {"stopped"}, ); }, - Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => { + Ctx::VoicePacket(data) => { // An event which fires for every received audio packet, // containing the decoded data. - if let Some(audio) = audio { + if let Some(audio) = data.audio { println!("Audio packet's first 5 samples: {:?}", audio.get(..5.min(audio.len()))); println!( "Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}", - packet.sequence.0, + data.packet.sequence.0, audio.len() * std::mem::size_of::(), - packet.payload.len(), - packet.ssrc, + data.packet.payload.len(), + data.packet.ssrc, ); } else { println!("RTP packet, but no audio. Driver may not be configured to decode."); } }, - Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => { + Ctx::RtcpPacket(data) => { // An event which fires for every received rtcp packet, // containing the call statistics and reporting information. - println!("RTCP packet received: {:?}", packet); + println!("RTCP packet received: {:?}", data.packet); }, Ctx::ClientConnect( ClientConnect {audio_ssrc, video_ssrc, user_id, ..} diff --git a/src/driver/connection/mod.rs b/src/driver/connection/mod.rs index 1c89797..85922a5 100644 --- a/src/driver/connection/mod.rs +++ b/src/driver/connection/mod.rs @@ -36,6 +36,7 @@ use ws::create_native_tls_client; pub(crate) struct Connection { pub(crate) info: ConnectionInfo, + pub(crate) ssrc: u32, pub(crate) ws: Sender, } @@ -219,6 +220,7 @@ impl Connection { Ok(Connection { info, + ssrc, ws: ws_msg_tx, }) } diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index f85e3fe..8f9c26c 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -10,7 +10,10 @@ pub(crate) mod udp_tx; pub(crate) mod ws; use super::connection::{error::Error as ConnectionError, Connection}; -use crate::{events::CoreContext, Config}; +use crate::{ + events::{internal_data::InternalConnect, CoreContext}, + Config, +}; use flume::{Receiver, RecvError, Sender}; use message::*; #[cfg(not(feature = "tokio-02-marker"))] @@ -78,9 +81,12 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender, tx: Sender, tx: Sender { diff --git a/src/driver/tasks/udp_rx.rs b/src/driver/tasks/udp_rx.rs index d118fd5..96b1f72 100644 --- a/src/driver/tasks/udp_rx.rs +++ b/src/driver/tasks/udp_rx.rs @@ -3,7 +3,11 @@ use super::{ message::*, Config, }; -use crate::{constants::*, driver::DecodeMode, events::CoreContext}; +use crate::{ + constants::*, + driver::DecodeMode, + events::{internal_data::*, CoreContext}, +}; use audiopus::{ coder::Decoder as OpusDecoder, error::{Error as OpusError, ErrorCode}, @@ -322,30 +326,30 @@ impl UdpRx { match delta { SpeakingDelta::Start => { let _ = interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::SpeakingUpdate { + CoreContext::SpeakingUpdate(InternalSpeakingUpdate { ssrc: rtp.get_ssrc(), speaking: true, - }, + }), )); }, SpeakingDelta::Stop => { let _ = interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::SpeakingUpdate { + CoreContext::SpeakingUpdate(InternalSpeakingUpdate { ssrc: rtp.get_ssrc(), speaking: false, - }, + }), )); }, _ => {}, } let _ = interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::VoicePacket { + CoreContext::VoicePacket(InternalVoicePacket { audio, packet: rtp.from_packet(), payload_offset: rtp_body_start, payload_end_pad: rtp_body_tail, - }, + }), )); } else { warn!("RTP decoding/processing failed."); @@ -371,13 +375,16 @@ impl UdpRx { ) }); - let _ = interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::RtcpPacket { - packet: rtcp.from_packet(), - payload_offset: start, - payload_end_pad: tail, - }, - )); + let _ = + interconnect + .events + .send(EventMessage::FireCoreEvent(CoreContext::RtcpPacket( + InternalRtcpPacket { + packet: rtcp.from_packet(), + payload_offset: start, + payload_end_pad: tail, + }, + ))); }, DemuxedMut::FailedParse(t) => { warn!("Failed to parse message of type {:?}.", t); diff --git a/src/events/context/data/connect.rs b/src/events/context/data/connect.rs new file mode 100644 index 0000000..c618828 --- /dev/null +++ b/src/events/context/data/connect.rs @@ -0,0 +1,21 @@ +/// Voice connection details gathered at setup/reinstantiation. +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +#[non_exhaustive] +pub struct ConnectData<'a> { + /// The domain name of Discord's voice/TURN server. + /// + /// With the introduction of Discord's automatic voice server selection, + /// this is no longer guaranteed to match a server's settings. This field + /// may be useful if you need/wish to move your voice connection to a node/shard + /// closer to Discord. + pub server: &'a str, + /// The [RTP SSRC] *("Synchronisation source")* assigned by the voice server + /// for the duration of this call. + /// + /// All packets sent will use this SSRC, which is not related to the sender's User + /// ID. These are usually allocated sequentially by Discord, following on from + /// a random starting SSRC. + /// + /// [RTP SSRC]: https://tools.ietf.org/html/rfc3550#section-3 + pub ssrc: u32, +} diff --git a/src/events/context/data/mod.rs b/src/events/context/data/mod.rs new file mode 100644 index 0000000..6ed8211 --- /dev/null +++ b/src/events/context/data/mod.rs @@ -0,0 +1,11 @@ +//! Types containing the main body of an [`EventContext`]. +//! +//! [`EventContext`]: super::EventContext +mod connect; +mod rtcp; +mod speaking; +mod voice; + +use discortp::{rtcp::Rtcp, rtp::Rtp}; + +pub use self::{connect::*, rtcp::*, speaking::*, voice::*}; diff --git a/src/events/context/data/rtcp.rs b/src/events/context/data/rtcp.rs new file mode 100644 index 0000000..348a4f5 --- /dev/null +++ b/src/events/context/data/rtcp.rs @@ -0,0 +1,15 @@ +use super::*; + +#[derive(Clone, Debug, Eq, PartialEq)] +#[non_exhaustive] +/// Telemetry/statistics packet, received from another stream (detailed in `packet`). +/// `payload_offset` contains the true payload location within the raw packet's `payload()`, +/// to allow manual decoding of `Rtcp` packet bodies. +pub struct RtcpData<'a> { + /// Raw RTCP packet data. + pub packet: &'a Rtcp, + /// Byte index into the packet body (after headers) for where the payload begins. + pub payload_offset: usize, + /// Number of bytes at the end of the packet to discard. + pub payload_end_pad: usize, +} diff --git a/src/events/context/data/speaking.rs b/src/events/context/data/speaking.rs new file mode 100644 index 0000000..8b31b99 --- /dev/null +++ b/src/events/context/data/speaking.rs @@ -0,0 +1,14 @@ +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +#[non_exhaustive] +/// Speaking state transition, describing whether a given source has started/stopped +/// transmitting. This fires in response to a silent burst, or the first packet +/// breaking such a burst. +pub struct SpeakingUpdateData { + /// Whether this user is currently speaking. + pub speaking: bool, + /// Synchronisation Source of the user who has begun speaking. + /// + /// This must be combined with another event class to map this back to + /// its original UserId. + pub ssrc: u32, +} diff --git a/src/events/context/data/voice.rs b/src/events/context/data/voice.rs new file mode 100644 index 0000000..46aa071 --- /dev/null +++ b/src/events/context/data/voice.rs @@ -0,0 +1,20 @@ +use super::*; + +#[derive(Clone, Debug, Eq, PartialEq)] +#[non_exhaustive] +/// Opus audio packet, received from another stream (detailed in `packet`). +/// `payload_offset` contains the true payload location within the raw packet's `payload()`, +/// if extensions or raw packet data are required. +/// If `audio.len() == 0`, then this packet arrived out-of-order. +pub struct VoiceData<'a> { + /// Decoded audio from this packet. + pub audio: &'a Option>, + /// Raw RTP packet data. + /// + /// Includes the SSRC (i.e., sender) of this packet. + pub packet: &'a Rtp, + /// Byte index into the packet body (after headers) for where the payload begins. + pub payload_offset: usize, + /// Number of bytes at the end of the packet to discard. + pub payload_end_pad: usize, +} diff --git a/src/events/context/internal_data.rs b/src/events/context/internal_data.rs new file mode 100644 index 0000000..4a26a18 --- /dev/null +++ b/src/events/context/internal_data.rs @@ -0,0 +1,68 @@ +use super::context_data::*; +use discortp::{rtcp::Rtcp, rtp::Rtp}; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct InternalConnect { + pub server: String, + pub ssrc: u32, +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct InternalSpeakingUpdate { + pub ssrc: u32, + pub speaking: bool, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct InternalVoicePacket { + pub audio: Option>, + pub packet: Rtp, + pub payload_offset: usize, + pub payload_end_pad: usize, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct InternalRtcpPacket { + pub packet: Rtcp, + pub payload_offset: usize, + pub payload_end_pad: usize, +} + +impl<'a> From<&'a InternalConnect> for ConnectData<'a> { + fn from(val: &'a InternalConnect) -> Self { + Self { + server: &val.server, + ssrc: val.ssrc, + } + } +} + +impl<'a> From<&'a InternalSpeakingUpdate> for SpeakingUpdateData { + fn from(val: &'a InternalSpeakingUpdate) -> Self { + Self { + speaking: val.speaking, + ssrc: val.ssrc, + } + } +} + +impl<'a> From<&'a InternalVoicePacket> for VoiceData<'a> { + fn from(val: &'a InternalVoicePacket) -> Self { + Self { + audio: &val.audio, + packet: &val.packet, + payload_offset: val.payload_offset, + payload_end_pad: val.payload_end_pad, + } + } +} + +impl<'a> From<&'a InternalRtcpPacket> for RtcpData<'a> { + fn from(val: &'a InternalRtcpPacket) -> Self { + Self { + packet: &val.packet, + payload_offset: val.payload_offset, + payload_end_pad: val.payload_end_pad, + } + } +} diff --git a/src/events/context.rs b/src/events/context/mod.rs similarity index 56% rename from src/events/context.rs rename to src/events/context/mod.rs index 160a6f6..71ee51f 100644 --- a/src/events/context.rs +++ b/src/events/context/mod.rs @@ -1,9 +1,14 @@ +pub mod data; +pub(crate) mod internal_data; + use super::*; use crate::{ model::payload::{ClientConnect, ClientDisconnect, Speaking}, tracks::{TrackHandle, TrackState}, }; -use discortp::{rtcp::Rtcp, rtp::Rtp}; +pub use data as context_data; +use data::*; +use internal_data::*; /// Information about which tracks or data fired an event. /// @@ -28,90 +33,52 @@ pub enum EventContext<'a> { /// Speaking state transition, describing whether a given source has started/stopped /// transmitting. This fires in response to a silent burst, or the first packet /// breaking such a burst. - SpeakingUpdate { - /// Synchronisation Source of the user who has begun speaking. - /// - /// This must be combined with another event class to map this back to - /// its original UserId. - ssrc: u32, - /// Whether this user is currently speaking. - speaking: bool, - }, - /// Opus audio packet, received from another stream (detailed in `packet`). - /// `payload_offset` contains the true payload location within the raw packet's `payload()`, - /// if extensions or raw packet data are required. - /// if `audio.len() == 0`, then this packet arrived out-of-order. - VoicePacket { - /// Decoded audio from this packet. - audio: &'a Option>, - /// Raw RTP packet data. - /// - /// Includes the SSRC (i.e., sender) of this packet. - packet: &'a Rtp, - /// Byte index into the packet body (after headers) for where the payload begins. - payload_offset: usize, - /// Number of bytes at the end of the packet to discard. - payload_end_pad: usize, - }, - /// Telemetry/statistics packet, received from another stream (detailed in `packet`). - /// `payload_offset` contains the true payload location within the raw packet's `payload()`, - /// to allow manual decoding of `Rtcp` packet bodies. - RtcpPacket { - /// Raw RTCP packet data. - packet: &'a Rtcp, - /// Byte index into the packet body (after headers) for where the payload begins. - payload_offset: usize, - /// Number of bytes at the end of the packet to discard. - payload_end_pad: usize, - }, + SpeakingUpdate(SpeakingUpdateData), + /// Opus audio packet, received from another stream. + VoicePacket(VoiceData<'a>), + /// Telemetry/statistics packet, received from another stream. + RtcpPacket(RtcpData<'a>), /// Fired whenever a client connects to a call for the first time, allowing SSRC/UserID /// matching. ClientConnect(ClientConnect), /// Fired whenever a client disconnects. ClientDisconnect(ClientDisconnect), /// Fires when this driver successfully connects to a voice channel. - DriverConnect, + DriverConnect(ConnectData<'a>), /// Fires when this driver successfully reconnects after a network error. - DriverReconnect, + DriverReconnect(ConnectData<'a>), /// Fires when this driver fails to connect to a voice channel. DriverConnectFailed, /// Fires when this driver fails to reconnect to a voice channel after a network error. /// /// Users will need to manually reconnect on receipt of this error. DriverReconnectFailed, + #[deprecated( + since = "0.2.0", + note = "Please use the DriverConnect/Reconnect events instead." + )] /// Fires whenever the driver is assigned a new [RTP SSRC] by the voice server. /// /// This typically fires alongside a [DriverConnect], or a full [DriverReconnect]. + /// **This event is *deprecated* in favour of these alternatives**. /// /// [RTP SSRC]: https://tools.ietf.org/html/rfc3550#section-3 /// [DriverConnect]: Self::DriverConnect /// [DriverReconnect]: Self::DriverReconnect - // TODO: move assigned SSRC into payload of Driver(Re)Connect as part of next breaking, and deprecate this. + // TODO: remove in 0.3.x SsrcKnown(u32), } #[derive(Clone, Debug)] pub enum CoreContext { SpeakingStateUpdate(Speaking), - SpeakingUpdate { - ssrc: u32, - speaking: bool, - }, - VoicePacket { - audio: Option>, - packet: Rtp, - payload_offset: usize, - payload_end_pad: usize, - }, - RtcpPacket { - packet: Rtcp, - payload_offset: usize, - payload_end_pad: usize, - }, + SpeakingUpdate(InternalSpeakingUpdate), + VoicePacket(InternalVoicePacket), + RtcpPacket(InternalRtcpPacket), ClientConnect(ClientConnect), ClientDisconnect(ClientDisconnect), - DriverConnect, - DriverReconnect, + DriverConnect(InternalConnect), + DriverReconnect(InternalConnect), DriverConnectFailed, DriverReconnectFailed, SsrcKnown(u32), @@ -123,36 +90,16 @@ impl<'a> CoreContext { match self { SpeakingStateUpdate(evt) => EventContext::SpeakingStateUpdate(*evt), - SpeakingUpdate { ssrc, speaking } => EventContext::SpeakingUpdate { - ssrc: *ssrc, - speaking: *speaking, - }, - VoicePacket { - audio, - packet, - payload_offset, - payload_end_pad, - } => EventContext::VoicePacket { - audio, - packet, - payload_offset: *payload_offset, - payload_end_pad: *payload_end_pad, - }, - RtcpPacket { - packet, - payload_offset, - payload_end_pad, - } => EventContext::RtcpPacket { - packet, - payload_offset: *payload_offset, - payload_end_pad: *payload_end_pad, - }, + SpeakingUpdate(evt) => EventContext::SpeakingUpdate(SpeakingUpdateData::from(evt)), + VoicePacket(evt) => EventContext::VoicePacket(VoiceData::from(evt)), + RtcpPacket(evt) => EventContext::RtcpPacket(RtcpData::from(evt)), ClientConnect(evt) => EventContext::ClientConnect(*evt), ClientDisconnect(evt) => EventContext::ClientDisconnect(*evt), - DriverConnect => EventContext::DriverConnect, - DriverReconnect => EventContext::DriverReconnect, + DriverConnect(evt) => EventContext::DriverConnect(ConnectData::from(evt)), + DriverReconnect(evt) => EventContext::DriverReconnect(ConnectData::from(evt)), DriverConnectFailed => EventContext::DriverConnectFailed, DriverReconnectFailed => EventContext::DriverReconnectFailed, + #[allow(deprecated)] SsrcKnown(s) => EventContext::SsrcKnown(*s), } } @@ -171,10 +118,11 @@ impl EventContext<'_> { RtcpPacket { .. } => Some(CoreEvent::RtcpPacket), ClientConnect { .. } => Some(CoreEvent::ClientConnect), ClientDisconnect { .. } => Some(CoreEvent::ClientDisconnect), - DriverConnect => Some(CoreEvent::DriverConnect), - DriverReconnect => Some(CoreEvent::DriverReconnect), + DriverConnect(_) => Some(CoreEvent::DriverConnect), + DriverReconnect(_) => Some(CoreEvent::DriverReconnect), DriverConnectFailed => Some(CoreEvent::DriverConnectFailed), DriverReconnectFailed => Some(CoreEvent::DriverReconnectFailed), + #[allow(deprecated)] SsrcKnown(_) => Some(CoreEvent::SsrcKnown), _ => None, } diff --git a/src/events/core.rs b/src/events/core.rs index 218182c..803547f 100644 --- a/src/events/core.rs +++ b/src/events/core.rs @@ -46,6 +46,9 @@ pub enum CoreEvent { /// [RTP SSRC]: https://tools.ietf.org/html/rfc3550#section-3 /// [DriverConnect]: Self::DriverConnect /// [DriverReconnect]: Self::DriverReconnect - // TODO: deprecate in next breaking after fusing with Driver(Re)Connect. + #[deprecated( + since = "0.2.0", + note = "Please use the DriverConnect/Reconnect events instead." + )] SsrcKnown, } diff --git a/src/events/mod.rs b/src/events/mod.rs index 5bd59cb..51e9697 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,4 +1,51 @@ //! Events relating to tracks, timing, and other callers. +//! +//! ## Listening for events +//! Driver events in Songbird are composed of two parts: +//! * An [`Event`] to listen out for. These may be discrete events, +//! or generated by timers. +//! * An [`EventHandler`] to be called on receipt of an event. As event +//! handlers may be shared between several events, the handler is called +//! with an [`EventContext`] describing which event was fired. +//! +//! Event handlers are registered using functions such as [`Driver::add_global_event`], +//! or [`TrackHandle::add_event`], or. Internally, these pairs are stored +//! as [`EventData`]. +//! +//! ## `EventHandler` lifecycle +//! An event handler is essentially just an async function which may return +//! another type of event to listen out for (an `Option`). For instance, +//! [`Some(Event::Cancel)`] will remove that event listener, while `None` won't +//! change it at all. +//! +//! The exception is one-off events like [`Event::Delayed`], which remove themselves +//! after one call *unless* an [`Event`] override is returned. +//! +//! ## Global and local listeners +//! *Global* event listeners are those which are placed onto the [`Driver`], +//! while *local* event listeners are those which are placed on a +//! [`Track`]/[`Handle`]. +//! +//! Track or timed events, when local, return a reference to the parent track. +//! When registered globally, they fire on a per-tick basis, returning references to +//! all relevant tracks in that 20ms window. Global/local timed events use a global +//! timer or a [track's playback time], respectively. +//! +//! [`CoreEvent`]s may only be registered globally. +//! +//! [`Event`]: Event +//! [`EventHandler`]: EventHandler +//! [`EventContext`]: EventContext +//! [`Driver::add_global_event`]: crate::driver::Driver::add_global_event +//! [`Driver`]: crate::driver::Driver::add_global_event +//! [`TrackHandle::add_event`]: crate::tracks::TrackHandle::add_event +//! [`Track`]: crate::tracks::Track::events +//! [`Handle`]: crate::tracks::TrackHandle::add_event +//! [`EventData`]: EventData +//! [`Some(Event::Cancel)`]: Event::Cancel +//! [`Event::Delayed`]: Event::Delayed +//! [track's playback time]: crate::tracks::TrackState::play_time +//! [`CoreEvent`]: CoreEvent mod context; mod core; @@ -7,8 +54,15 @@ mod store; mod track; mod untimed; -pub use self::{context::EventContext, core::*, data::*, store::*, track::*, untimed::*}; -pub(crate) use context::CoreContext; +pub use self::{ + context::{context_data, EventContext}, + core::*, + data::*, + store::*, + track::*, + untimed::*, +}; +pub(crate) use context::{internal_data, CoreContext}; use async_trait::async_trait; use std::time::Duration; @@ -30,9 +84,9 @@ pub trait EventHandler: Send + Sync { /// or stops. In case this is required, global events are a better /// fit. /// -/// Event handlers themselves are described in [`EventData::action`]. +/// Event handlers themselves are described in [`EventData::new`]. /// -/// [`EventData::action`]: EventData::action +/// [`EventData::new`]: EventData::new #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] #[non_exhaustive] pub enum Event { diff --git a/src/events/store.rs b/src/events/store.rs index 67057df..9bec9e2 100644 --- a/src/events/store.rs +++ b/src/events/store.rs @@ -103,6 +103,19 @@ impl EventStore { } } + /// Processes all events due up to and including `now`. + pub(crate) fn timed_event_ready(&self, now: Duration) -> bool { + self.timed + .peek() + .map(|evt| { + evt.fire_time + .as_ref() + .expect("Timed event must have a fire_time.") + <= &now + }) + .unwrap_or(false) + } + /// Processes all events attached to the given track event. pub(crate) async fn process_untimed( &mut self, @@ -176,9 +189,13 @@ impl GlobalEvents { ) { // Global timed events self.time += TIMESTEP_LENGTH; - self.store - .process_timed(self.time, EventContext::Track(&[])) - .await; + if self.store.timed_event_ready(self.time) { + let global_ctx: Vec<(&TrackState, &TrackHandle)> = + states.iter().zip(handles.iter()).collect(); + self.store + .process_timed(self.time, EventContext::Track(&global_ctx[..])) + .await; + } // Local timed events for (i, state) in states.iter_mut().enumerate() {