Events: Break out and non-exhaust context body structs (#54)

This PR makes many of the types under `EventContext` separate `#[non_exhaustive]` structs. This makes it more feasible to add further information to connection and packet events as required in future. On this note, driver (re)connection events now include the SSRC supplied by Discord and the domain name which was connected to.

In addition, this fixes global timed events to return a list of all live tracks, and extensively details/documents events at a high level.

This was tested using `cargo make ready`.
This commit is contained in:
Kyle Simpson
2021-04-07 12:52:05 +01:00
parent 1bfee1b989
commit 27f26ade99
14 changed files with 321 additions and 129 deletions

View File

@@ -0,0 +1,21 @@
/// Voice connection details gathered at setup/reinstantiation.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub struct ConnectData<'a> {
/// The domain name of Discord's voice/TURN server.
///
/// With the introduction of Discord's automatic voice server selection,
/// this is no longer guaranteed to match a server's settings. This field
/// may be useful if you need/wish to move your voice connection to a node/shard
/// closer to Discord.
pub server: &'a str,
/// The [RTP SSRC] *("Synchronisation source")* assigned by the voice server
/// for the duration of this call.
///
/// All packets sent will use this SSRC, which is not related to the sender's User
/// ID. These are usually allocated sequentially by Discord, following on from
/// a random starting SSRC.
///
/// [RTP SSRC]: https://tools.ietf.org/html/rfc3550#section-3
pub ssrc: u32,
}

View File

@@ -0,0 +1,11 @@
//! Types containing the main body of an [`EventContext`].
//!
//! [`EventContext`]: super::EventContext
mod connect;
mod rtcp;
mod speaking;
mod voice;
use discortp::{rtcp::Rtcp, rtp::Rtp};
pub use self::{connect::*, rtcp::*, speaking::*, voice::*};

View File

@@ -0,0 +1,15 @@
use super::*;
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
/// Telemetry/statistics packet, received from another stream (detailed in `packet`).
/// `payload_offset` contains the true payload location within the raw packet's `payload()`,
/// to allow manual decoding of `Rtcp` packet bodies.
pub struct RtcpData<'a> {
/// Raw RTCP packet data.
pub packet: &'a Rtcp,
/// Byte index into the packet body (after headers) for where the payload begins.
pub payload_offset: usize,
/// Number of bytes at the end of the packet to discard.
pub payload_end_pad: usize,
}

View File

@@ -0,0 +1,14 @@
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
/// Speaking state transition, describing whether a given source has started/stopped
/// transmitting. This fires in response to a silent burst, or the first packet
/// breaking such a burst.
pub struct SpeakingUpdateData {
/// Whether this user is currently speaking.
pub speaking: bool,
/// Synchronisation Source of the user who has begun speaking.
///
/// This must be combined with another event class to map this back to
/// its original UserId.
pub ssrc: u32,
}

View File

@@ -0,0 +1,20 @@
use super::*;
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
/// Opus audio packet, received from another stream (detailed in `packet`).
/// `payload_offset` contains the true payload location within the raw packet's `payload()`,
/// if extensions or raw packet data are required.
/// If `audio.len() == 0`, then this packet arrived out-of-order.
pub struct VoiceData<'a> {
/// Decoded audio from this packet.
pub audio: &'a Option<Vec<i16>>,
/// Raw RTP packet data.
///
/// Includes the SSRC (i.e., sender) of this packet.
pub packet: &'a Rtp,
/// Byte index into the packet body (after headers) for where the payload begins.
pub payload_offset: usize,
/// Number of bytes at the end of the packet to discard.
pub payload_end_pad: usize,
}

View File

@@ -0,0 +1,68 @@
use super::context_data::*;
use discortp::{rtcp::Rtcp, rtp::Rtp};
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct InternalConnect {
pub server: String,
pub ssrc: u32,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct InternalSpeakingUpdate {
pub ssrc: u32,
pub speaking: bool,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct InternalVoicePacket {
pub audio: Option<Vec<i16>>,
pub packet: Rtp,
pub payload_offset: usize,
pub payload_end_pad: usize,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct InternalRtcpPacket {
pub packet: Rtcp,
pub payload_offset: usize,
pub payload_end_pad: usize,
}
impl<'a> From<&'a InternalConnect> for ConnectData<'a> {
fn from(val: &'a InternalConnect) -> Self {
Self {
server: &val.server,
ssrc: val.ssrc,
}
}
}
impl<'a> From<&'a InternalSpeakingUpdate> for SpeakingUpdateData {
fn from(val: &'a InternalSpeakingUpdate) -> Self {
Self {
speaking: val.speaking,
ssrc: val.ssrc,
}
}
}
impl<'a> From<&'a InternalVoicePacket> for VoiceData<'a> {
fn from(val: &'a InternalVoicePacket) -> Self {
Self {
audio: &val.audio,
packet: &val.packet,
payload_offset: val.payload_offset,
payload_end_pad: val.payload_end_pad,
}
}
}
impl<'a> From<&'a InternalRtcpPacket> for RtcpData<'a> {
fn from(val: &'a InternalRtcpPacket) -> Self {
Self {
packet: &val.packet,
payload_offset: val.payload_offset,
payload_end_pad: val.payload_end_pad,
}
}
}

View File

@@ -1,9 +1,14 @@
pub mod data;
pub(crate) mod internal_data;
use super::*;
use crate::{
model::payload::{ClientConnect, ClientDisconnect, Speaking},
tracks::{TrackHandle, TrackState},
};
use discortp::{rtcp::Rtcp, rtp::Rtp};
pub use data as context_data;
use data::*;
use internal_data::*;
/// Information about which tracks or data fired an event.
///
@@ -28,90 +33,52 @@ pub enum EventContext<'a> {
/// Speaking state transition, describing whether a given source has started/stopped
/// transmitting. This fires in response to a silent burst, or the first packet
/// breaking such a burst.
SpeakingUpdate {
/// Synchronisation Source of the user who has begun speaking.
///
/// This must be combined with another event class to map this back to
/// its original UserId.
ssrc: u32,
/// Whether this user is currently speaking.
speaking: bool,
},
/// Opus audio packet, received from another stream (detailed in `packet`).
/// `payload_offset` contains the true payload location within the raw packet's `payload()`,
/// if extensions or raw packet data are required.
/// if `audio.len() == 0`, then this packet arrived out-of-order.
VoicePacket {
/// Decoded audio from this packet.
audio: &'a Option<Vec<i16>>,
/// Raw RTP packet data.
///
/// Includes the SSRC (i.e., sender) of this packet.
packet: &'a Rtp,
/// Byte index into the packet body (after headers) for where the payload begins.
payload_offset: usize,
/// Number of bytes at the end of the packet to discard.
payload_end_pad: usize,
},
/// Telemetry/statistics packet, received from another stream (detailed in `packet`).
/// `payload_offset` contains the true payload location within the raw packet's `payload()`,
/// to allow manual decoding of `Rtcp` packet bodies.
RtcpPacket {
/// Raw RTCP packet data.
packet: &'a Rtcp,
/// Byte index into the packet body (after headers) for where the payload begins.
payload_offset: usize,
/// Number of bytes at the end of the packet to discard.
payload_end_pad: usize,
},
SpeakingUpdate(SpeakingUpdateData),
/// Opus audio packet, received from another stream.
VoicePacket(VoiceData<'a>),
/// Telemetry/statistics packet, received from another stream.
RtcpPacket(RtcpData<'a>),
/// Fired whenever a client connects to a call for the first time, allowing SSRC/UserID
/// matching.
ClientConnect(ClientConnect),
/// Fired whenever a client disconnects.
ClientDisconnect(ClientDisconnect),
/// Fires when this driver successfully connects to a voice channel.
DriverConnect,
DriverConnect(ConnectData<'a>),
/// Fires when this driver successfully reconnects after a network error.
DriverReconnect,
DriverReconnect(ConnectData<'a>),
/// Fires when this driver fails to connect to a voice channel.
DriverConnectFailed,
/// Fires when this driver fails to reconnect to a voice channel after a network error.
///
/// Users will need to manually reconnect on receipt of this error.
DriverReconnectFailed,
#[deprecated(
since = "0.2.0",
note = "Please use the DriverConnect/Reconnect events instead."
)]
/// Fires whenever the driver is assigned a new [RTP SSRC] by the voice server.
///
/// This typically fires alongside a [DriverConnect], or a full [DriverReconnect].
/// **This event is *deprecated* in favour of these alternatives**.
///
/// [RTP SSRC]: https://tools.ietf.org/html/rfc3550#section-3
/// [DriverConnect]: Self::DriverConnect
/// [DriverReconnect]: Self::DriverReconnect
// TODO: move assigned SSRC into payload of Driver(Re)Connect as part of next breaking, and deprecate this.
// TODO: remove in 0.3.x
SsrcKnown(u32),
}
#[derive(Clone, Debug)]
pub enum CoreContext {
SpeakingStateUpdate(Speaking),
SpeakingUpdate {
ssrc: u32,
speaking: bool,
},
VoicePacket {
audio: Option<Vec<i16>>,
packet: Rtp,
payload_offset: usize,
payload_end_pad: usize,
},
RtcpPacket {
packet: Rtcp,
payload_offset: usize,
payload_end_pad: usize,
},
SpeakingUpdate(InternalSpeakingUpdate),
VoicePacket(InternalVoicePacket),
RtcpPacket(InternalRtcpPacket),
ClientConnect(ClientConnect),
ClientDisconnect(ClientDisconnect),
DriverConnect,
DriverReconnect,
DriverConnect(InternalConnect),
DriverReconnect(InternalConnect),
DriverConnectFailed,
DriverReconnectFailed,
SsrcKnown(u32),
@@ -123,36 +90,16 @@ impl<'a> CoreContext {
match self {
SpeakingStateUpdate(evt) => EventContext::SpeakingStateUpdate(*evt),
SpeakingUpdate { ssrc, speaking } => EventContext::SpeakingUpdate {
ssrc: *ssrc,
speaking: *speaking,
},
VoicePacket {
audio,
packet,
payload_offset,
payload_end_pad,
} => EventContext::VoicePacket {
audio,
packet,
payload_offset: *payload_offset,
payload_end_pad: *payload_end_pad,
},
RtcpPacket {
packet,
payload_offset,
payload_end_pad,
} => EventContext::RtcpPacket {
packet,
payload_offset: *payload_offset,
payload_end_pad: *payload_end_pad,
},
SpeakingUpdate(evt) => EventContext::SpeakingUpdate(SpeakingUpdateData::from(evt)),
VoicePacket(evt) => EventContext::VoicePacket(VoiceData::from(evt)),
RtcpPacket(evt) => EventContext::RtcpPacket(RtcpData::from(evt)),
ClientConnect(evt) => EventContext::ClientConnect(*evt),
ClientDisconnect(evt) => EventContext::ClientDisconnect(*evt),
DriverConnect => EventContext::DriverConnect,
DriverReconnect => EventContext::DriverReconnect,
DriverConnect(evt) => EventContext::DriverConnect(ConnectData::from(evt)),
DriverReconnect(evt) => EventContext::DriverReconnect(ConnectData::from(evt)),
DriverConnectFailed => EventContext::DriverConnectFailed,
DriverReconnectFailed => EventContext::DriverReconnectFailed,
#[allow(deprecated)]
SsrcKnown(s) => EventContext::SsrcKnown(*s),
}
}
@@ -171,10 +118,11 @@ impl EventContext<'_> {
RtcpPacket { .. } => Some(CoreEvent::RtcpPacket),
ClientConnect { .. } => Some(CoreEvent::ClientConnect),
ClientDisconnect { .. } => Some(CoreEvent::ClientDisconnect),
DriverConnect => Some(CoreEvent::DriverConnect),
DriverReconnect => Some(CoreEvent::DriverReconnect),
DriverConnect(_) => Some(CoreEvent::DriverConnect),
DriverReconnect(_) => Some(CoreEvent::DriverReconnect),
DriverConnectFailed => Some(CoreEvent::DriverConnectFailed),
DriverReconnectFailed => Some(CoreEvent::DriverReconnectFailed),
#[allow(deprecated)]
SsrcKnown(_) => Some(CoreEvent::SsrcKnown),
_ => None,
}

View File

@@ -46,6 +46,9 @@ pub enum CoreEvent {
/// [RTP SSRC]: https://tools.ietf.org/html/rfc3550#section-3
/// [DriverConnect]: Self::DriverConnect
/// [DriverReconnect]: Self::DriverReconnect
// TODO: deprecate in next breaking after fusing with Driver(Re)Connect.
#[deprecated(
since = "0.2.0",
note = "Please use the DriverConnect/Reconnect events instead."
)]
SsrcKnown,
}

View File

@@ -1,4 +1,51 @@
//! Events relating to tracks, timing, and other callers.
//!
//! ## Listening for events
//! Driver events in Songbird are composed of two parts:
//! * An [`Event`] to listen out for. These may be discrete events,
//! or generated by timers.
//! * An [`EventHandler`] to be called on receipt of an event. As event
//! handlers may be shared between several events, the handler is called
//! with an [`EventContext`] describing which event was fired.
//!
//! Event handlers are registered using functions such as [`Driver::add_global_event`],
//! or [`TrackHandle::add_event`], or. Internally, these pairs are stored
//! as [`EventData`].
//!
//! ## `EventHandler` lifecycle
//! An event handler is essentially just an async function which may return
//! another type of event to listen out for (an `Option<Event>`). For instance,
//! [`Some(Event::Cancel)`] will remove that event listener, while `None` won't
//! change it at all.
//!
//! The exception is one-off events like [`Event::Delayed`], which remove themselves
//! after one call *unless* an [`Event`] override is returned.
//!
//! ## Global and local listeners
//! *Global* event listeners are those which are placed onto the [`Driver`],
//! while *local* event listeners are those which are placed on a
//! [`Track`]/[`Handle`].
//!
//! Track or timed events, when local, return a reference to the parent track.
//! When registered globally, they fire on a per-tick basis, returning references to
//! all relevant tracks in that 20ms window. Global/local timed events use a global
//! timer or a [track's playback time], respectively.
//!
//! [`CoreEvent`]s may only be registered globally.
//!
//! [`Event`]: Event
//! [`EventHandler`]: EventHandler
//! [`EventContext`]: EventContext
//! [`Driver::add_global_event`]: crate::driver::Driver::add_global_event
//! [`Driver`]: crate::driver::Driver::add_global_event
//! [`TrackHandle::add_event`]: crate::tracks::TrackHandle::add_event
//! [`Track`]: crate::tracks::Track::events
//! [`Handle`]: crate::tracks::TrackHandle::add_event
//! [`EventData`]: EventData
//! [`Some(Event::Cancel)`]: Event::Cancel
//! [`Event::Delayed`]: Event::Delayed
//! [track's playback time]: crate::tracks::TrackState::play_time
//! [`CoreEvent`]: CoreEvent
mod context;
mod core;
@@ -7,8 +54,15 @@ mod store;
mod track;
mod untimed;
pub use self::{context::EventContext, core::*, data::*, store::*, track::*, untimed::*};
pub(crate) use context::CoreContext;
pub use self::{
context::{context_data, EventContext},
core::*,
data::*,
store::*,
track::*,
untimed::*,
};
pub(crate) use context::{internal_data, CoreContext};
use async_trait::async_trait;
use std::time::Duration;
@@ -30,9 +84,9 @@ pub trait EventHandler: Send + Sync {
/// or stops. In case this is required, global events are a better
/// fit.
///
/// Event handlers themselves are described in [`EventData::action`].
/// Event handlers themselves are described in [`EventData::new`].
///
/// [`EventData::action`]: EventData::action
/// [`EventData::new`]: EventData::new
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub enum Event {

View File

@@ -103,6 +103,19 @@ impl EventStore {
}
}
/// Processes all events due up to and including `now`.
pub(crate) fn timed_event_ready(&self, now: Duration) -> bool {
self.timed
.peek()
.map(|evt| {
evt.fire_time
.as_ref()
.expect("Timed event must have a fire_time.")
<= &now
})
.unwrap_or(false)
}
/// Processes all events attached to the given track event.
pub(crate) async fn process_untimed(
&mut self,
@@ -176,9 +189,13 @@ impl GlobalEvents {
) {
// Global timed events
self.time += TIMESTEP_LENGTH;
self.store
.process_timed(self.time, EventContext::Track(&[]))
.await;
if self.store.timed_event_ready(self.time) {
let global_ctx: Vec<(&TrackState, &TrackHandle)> =
states.iter().zip(handles.iter()).collect();
self.store
.process_timed(self.time, EventContext::Track(&global_ctx[..]))
.await;
}
// Local timed events
for (i, state) in states.iter_mut().enumerate() {