From 27f26ade994d1f4455d64f214678176151f73183 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Wed, 7 Apr 2021 12:52:05 +0100 Subject: [PATCH] Events: Break out and non-exhaust context body structs (#54) This PR makes many of the types under `EventContext` separate `#[non_exhaustive]` structs. This makes it more feasible to add further information to connection and packet events as required in future. On this note, driver (re)connection events now include the SSRC supplied by Discord and the domain name which was connected to. In addition, this fixes global timed events to return a list of all live tracks, and extensively details/documents events at a high level. This was tested using `cargo make ready`. --- examples/serenity/voice_receive/src/main.rs | 20 ++-- src/driver/connection/mod.rs | 2 + src/driver/tasks/mod.rs | 36 ++++-- src/driver/tasks/udp_rx.rs | 35 +++--- src/events/context/data/connect.rs | 21 ++++ src/events/context/data/mod.rs | 11 ++ src/events/context/data/rtcp.rs | 15 +++ src/events/context/data/speaking.rs | 14 +++ src/events/context/data/voice.rs | 20 ++++ src/events/context/internal_data.rs | 68 +++++++++++ src/events/{context.rs => context/mod.rs} | 118 ++++++-------------- src/events/core.rs | 5 +- src/events/mod.rs | 62 +++++++++- src/events/store.rs | 23 +++- 14 files changed, 321 insertions(+), 129 deletions(-) create mode 100644 src/events/context/data/connect.rs create mode 100644 src/events/context/data/mod.rs create mode 100644 src/events/context/data/rtcp.rs create mode 100644 src/events/context/data/speaking.rs create mode 100644 src/events/context/data/voice.rs create mode 100644 src/events/context/internal_data.rs rename src/events/{context.rs => context/mod.rs} (56%) diff --git a/examples/serenity/voice_receive/src/main.rs b/examples/serenity/voice_receive/src/main.rs index dbaf47a..82cf170 100644 --- a/examples/serenity/voice_receive/src/main.rs +++ b/examples/serenity/voice_receive/src/main.rs @@ -84,35 +84,35 @@ impl VoiceEventHandler for Receiver { speaking, ); }, - Ctx::SpeakingUpdate {ssrc, speaking} => { + Ctx::SpeakingUpdate(data) => { // You can implement logic here which reacts to a user starting // or stopping speaking. println!( "Source {} has {} speaking.", - ssrc, - if *speaking {"started"} else {"stopped"}, + data.ssrc, + if data.speaking {"started"} else {"stopped"}, ); }, - Ctx::VoicePacket {audio, packet, payload_offset, payload_end_pad} => { + Ctx::VoicePacket(data) => { // An event which fires for every received audio packet, // containing the decoded data. - if let Some(audio) = audio { + if let Some(audio) = data.audio { println!("Audio packet's first 5 samples: {:?}", audio.get(..5.min(audio.len()))); println!( "Audio packet sequence {:05} has {:04} bytes (decompressed from {}), SSRC {}", - packet.sequence.0, + data.packet.sequence.0, audio.len() * std::mem::size_of::(), - packet.payload.len(), - packet.ssrc, + data.packet.payload.len(), + data.packet.ssrc, ); } else { println!("RTP packet, but no audio. Driver may not be configured to decode."); } }, - Ctx::RtcpPacket {packet, payload_offset, payload_end_pad} => { + Ctx::RtcpPacket(data) => { // An event which fires for every received rtcp packet, // containing the call statistics and reporting information. - println!("RTCP packet received: {:?}", packet); + println!("RTCP packet received: {:?}", data.packet); }, Ctx::ClientConnect( ClientConnect {audio_ssrc, video_ssrc, user_id, ..} diff --git a/src/driver/connection/mod.rs b/src/driver/connection/mod.rs index 1c89797..85922a5 100644 --- a/src/driver/connection/mod.rs +++ b/src/driver/connection/mod.rs @@ -36,6 +36,7 @@ use ws::create_native_tls_client; pub(crate) struct Connection { pub(crate) info: ConnectionInfo, + pub(crate) ssrc: u32, pub(crate) ws: Sender, } @@ -219,6 +220,7 @@ impl Connection { Ok(Connection { info, + ssrc, ws: ws_msg_tx, }) } diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index f85e3fe..8f9c26c 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -10,7 +10,10 @@ pub(crate) mod udp_tx; pub(crate) mod ws; use super::connection::{error::Error as ConnectionError, Connection}; -use crate::{events::CoreContext, Config}; +use crate::{ + events::{internal_data::InternalConnect, CoreContext}, + Config, +}; use flume::{Receiver, RecvError, Sender}; use message::*; #[cfg(not(feature = "tokio-02-marker"))] @@ -78,9 +81,12 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender, tx: Sender, tx: Sender { diff --git a/src/driver/tasks/udp_rx.rs b/src/driver/tasks/udp_rx.rs index d118fd5..96b1f72 100644 --- a/src/driver/tasks/udp_rx.rs +++ b/src/driver/tasks/udp_rx.rs @@ -3,7 +3,11 @@ use super::{ message::*, Config, }; -use crate::{constants::*, driver::DecodeMode, events::CoreContext}; +use crate::{ + constants::*, + driver::DecodeMode, + events::{internal_data::*, CoreContext}, +}; use audiopus::{ coder::Decoder as OpusDecoder, error::{Error as OpusError, ErrorCode}, @@ -322,30 +326,30 @@ impl UdpRx { match delta { SpeakingDelta::Start => { let _ = interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::SpeakingUpdate { + CoreContext::SpeakingUpdate(InternalSpeakingUpdate { ssrc: rtp.get_ssrc(), speaking: true, - }, + }), )); }, SpeakingDelta::Stop => { let _ = interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::SpeakingUpdate { + CoreContext::SpeakingUpdate(InternalSpeakingUpdate { ssrc: rtp.get_ssrc(), speaking: false, - }, + }), )); }, _ => {}, } let _ = interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::VoicePacket { + CoreContext::VoicePacket(InternalVoicePacket { audio, packet: rtp.from_packet(), payload_offset: rtp_body_start, payload_end_pad: rtp_body_tail, - }, + }), )); } else { warn!("RTP decoding/processing failed."); @@ -371,13 +375,16 @@ impl UdpRx { ) }); - let _ = interconnect.events.send(EventMessage::FireCoreEvent( - CoreContext::RtcpPacket { - packet: rtcp.from_packet(), - payload_offset: start, - payload_end_pad: tail, - }, - )); + let _ = + interconnect + .events + .send(EventMessage::FireCoreEvent(CoreContext::RtcpPacket( + InternalRtcpPacket { + packet: rtcp.from_packet(), + payload_offset: start, + payload_end_pad: tail, + }, + ))); }, DemuxedMut::FailedParse(t) => { warn!("Failed to parse message of type {:?}.", t); diff --git a/src/events/context/data/connect.rs b/src/events/context/data/connect.rs new file mode 100644 index 0000000..c618828 --- /dev/null +++ b/src/events/context/data/connect.rs @@ -0,0 +1,21 @@ +/// Voice connection details gathered at setup/reinstantiation. +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +#[non_exhaustive] +pub struct ConnectData<'a> { + /// The domain name of Discord's voice/TURN server. + /// + /// With the introduction of Discord's automatic voice server selection, + /// this is no longer guaranteed to match a server's settings. This field + /// may be useful if you need/wish to move your voice connection to a node/shard + /// closer to Discord. + pub server: &'a str, + /// The [RTP SSRC] *("Synchronisation source")* assigned by the voice server + /// for the duration of this call. + /// + /// All packets sent will use this SSRC, which is not related to the sender's User + /// ID. These are usually allocated sequentially by Discord, following on from + /// a random starting SSRC. + /// + /// [RTP SSRC]: https://tools.ietf.org/html/rfc3550#section-3 + pub ssrc: u32, +} diff --git a/src/events/context/data/mod.rs b/src/events/context/data/mod.rs new file mode 100644 index 0000000..6ed8211 --- /dev/null +++ b/src/events/context/data/mod.rs @@ -0,0 +1,11 @@ +//! Types containing the main body of an [`EventContext`]. +//! +//! [`EventContext`]: super::EventContext +mod connect; +mod rtcp; +mod speaking; +mod voice; + +use discortp::{rtcp::Rtcp, rtp::Rtp}; + +pub use self::{connect::*, rtcp::*, speaking::*, voice::*}; diff --git a/src/events/context/data/rtcp.rs b/src/events/context/data/rtcp.rs new file mode 100644 index 0000000..348a4f5 --- /dev/null +++ b/src/events/context/data/rtcp.rs @@ -0,0 +1,15 @@ +use super::*; + +#[derive(Clone, Debug, Eq, PartialEq)] +#[non_exhaustive] +/// Telemetry/statistics packet, received from another stream (detailed in `packet`). +/// `payload_offset` contains the true payload location within the raw packet's `payload()`, +/// to allow manual decoding of `Rtcp` packet bodies. +pub struct RtcpData<'a> { + /// Raw RTCP packet data. + pub packet: &'a Rtcp, + /// Byte index into the packet body (after headers) for where the payload begins. + pub payload_offset: usize, + /// Number of bytes at the end of the packet to discard. + pub payload_end_pad: usize, +} diff --git a/src/events/context/data/speaking.rs b/src/events/context/data/speaking.rs new file mode 100644 index 0000000..8b31b99 --- /dev/null +++ b/src/events/context/data/speaking.rs @@ -0,0 +1,14 @@ +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +#[non_exhaustive] +/// Speaking state transition, describing whether a given source has started/stopped +/// transmitting. This fires in response to a silent burst, or the first packet +/// breaking such a burst. +pub struct SpeakingUpdateData { + /// Whether this user is currently speaking. + pub speaking: bool, + /// Synchronisation Source of the user who has begun speaking. + /// + /// This must be combined with another event class to map this back to + /// its original UserId. + pub ssrc: u32, +} diff --git a/src/events/context/data/voice.rs b/src/events/context/data/voice.rs new file mode 100644 index 0000000..46aa071 --- /dev/null +++ b/src/events/context/data/voice.rs @@ -0,0 +1,20 @@ +use super::*; + +#[derive(Clone, Debug, Eq, PartialEq)] +#[non_exhaustive] +/// Opus audio packet, received from another stream (detailed in `packet`). +/// `payload_offset` contains the true payload location within the raw packet's `payload()`, +/// if extensions or raw packet data are required. +/// If `audio.len() == 0`, then this packet arrived out-of-order. +pub struct VoiceData<'a> { + /// Decoded audio from this packet. + pub audio: &'a Option>, + /// Raw RTP packet data. + /// + /// Includes the SSRC (i.e., sender) of this packet. + pub packet: &'a Rtp, + /// Byte index into the packet body (after headers) for where the payload begins. + pub payload_offset: usize, + /// Number of bytes at the end of the packet to discard. + pub payload_end_pad: usize, +} diff --git a/src/events/context/internal_data.rs b/src/events/context/internal_data.rs new file mode 100644 index 0000000..4a26a18 --- /dev/null +++ b/src/events/context/internal_data.rs @@ -0,0 +1,68 @@ +use super::context_data::*; +use discortp::{rtcp::Rtcp, rtp::Rtp}; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct InternalConnect { + pub server: String, + pub ssrc: u32, +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct InternalSpeakingUpdate { + pub ssrc: u32, + pub speaking: bool, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct InternalVoicePacket { + pub audio: Option>, + pub packet: Rtp, + pub payload_offset: usize, + pub payload_end_pad: usize, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct InternalRtcpPacket { + pub packet: Rtcp, + pub payload_offset: usize, + pub payload_end_pad: usize, +} + +impl<'a> From<&'a InternalConnect> for ConnectData<'a> { + fn from(val: &'a InternalConnect) -> Self { + Self { + server: &val.server, + ssrc: val.ssrc, + } + } +} + +impl<'a> From<&'a InternalSpeakingUpdate> for SpeakingUpdateData { + fn from(val: &'a InternalSpeakingUpdate) -> Self { + Self { + speaking: val.speaking, + ssrc: val.ssrc, + } + } +} + +impl<'a> From<&'a InternalVoicePacket> for VoiceData<'a> { + fn from(val: &'a InternalVoicePacket) -> Self { + Self { + audio: &val.audio, + packet: &val.packet, + payload_offset: val.payload_offset, + payload_end_pad: val.payload_end_pad, + } + } +} + +impl<'a> From<&'a InternalRtcpPacket> for RtcpData<'a> { + fn from(val: &'a InternalRtcpPacket) -> Self { + Self { + packet: &val.packet, + payload_offset: val.payload_offset, + payload_end_pad: val.payload_end_pad, + } + } +} diff --git a/src/events/context.rs b/src/events/context/mod.rs similarity index 56% rename from src/events/context.rs rename to src/events/context/mod.rs index 160a6f6..71ee51f 100644 --- a/src/events/context.rs +++ b/src/events/context/mod.rs @@ -1,9 +1,14 @@ +pub mod data; +pub(crate) mod internal_data; + use super::*; use crate::{ model::payload::{ClientConnect, ClientDisconnect, Speaking}, tracks::{TrackHandle, TrackState}, }; -use discortp::{rtcp::Rtcp, rtp::Rtp}; +pub use data as context_data; +use data::*; +use internal_data::*; /// Information about which tracks or data fired an event. /// @@ -28,90 +33,52 @@ pub enum EventContext<'a> { /// Speaking state transition, describing whether a given source has started/stopped /// transmitting. This fires in response to a silent burst, or the first packet /// breaking such a burst. - SpeakingUpdate { - /// Synchronisation Source of the user who has begun speaking. - /// - /// This must be combined with another event class to map this back to - /// its original UserId. - ssrc: u32, - /// Whether this user is currently speaking. - speaking: bool, - }, - /// Opus audio packet, received from another stream (detailed in `packet`). - /// `payload_offset` contains the true payload location within the raw packet's `payload()`, - /// if extensions or raw packet data are required. - /// if `audio.len() == 0`, then this packet arrived out-of-order. - VoicePacket { - /// Decoded audio from this packet. - audio: &'a Option>, - /// Raw RTP packet data. - /// - /// Includes the SSRC (i.e., sender) of this packet. - packet: &'a Rtp, - /// Byte index into the packet body (after headers) for where the payload begins. - payload_offset: usize, - /// Number of bytes at the end of the packet to discard. - payload_end_pad: usize, - }, - /// Telemetry/statistics packet, received from another stream (detailed in `packet`). - /// `payload_offset` contains the true payload location within the raw packet's `payload()`, - /// to allow manual decoding of `Rtcp` packet bodies. - RtcpPacket { - /// Raw RTCP packet data. - packet: &'a Rtcp, - /// Byte index into the packet body (after headers) for where the payload begins. - payload_offset: usize, - /// Number of bytes at the end of the packet to discard. - payload_end_pad: usize, - }, + SpeakingUpdate(SpeakingUpdateData), + /// Opus audio packet, received from another stream. + VoicePacket(VoiceData<'a>), + /// Telemetry/statistics packet, received from another stream. + RtcpPacket(RtcpData<'a>), /// Fired whenever a client connects to a call for the first time, allowing SSRC/UserID /// matching. ClientConnect(ClientConnect), /// Fired whenever a client disconnects. ClientDisconnect(ClientDisconnect), /// Fires when this driver successfully connects to a voice channel. - DriverConnect, + DriverConnect(ConnectData<'a>), /// Fires when this driver successfully reconnects after a network error. - DriverReconnect, + DriverReconnect(ConnectData<'a>), /// Fires when this driver fails to connect to a voice channel. DriverConnectFailed, /// Fires when this driver fails to reconnect to a voice channel after a network error. /// /// Users will need to manually reconnect on receipt of this error. DriverReconnectFailed, + #[deprecated( + since = "0.2.0", + note = "Please use the DriverConnect/Reconnect events instead." + )] /// Fires whenever the driver is assigned a new [RTP SSRC] by the voice server. /// /// This typically fires alongside a [DriverConnect], or a full [DriverReconnect]. + /// **This event is *deprecated* in favour of these alternatives**. /// /// [RTP SSRC]: https://tools.ietf.org/html/rfc3550#section-3 /// [DriverConnect]: Self::DriverConnect /// [DriverReconnect]: Self::DriverReconnect - // TODO: move assigned SSRC into payload of Driver(Re)Connect as part of next breaking, and deprecate this. + // TODO: remove in 0.3.x SsrcKnown(u32), } #[derive(Clone, Debug)] pub enum CoreContext { SpeakingStateUpdate(Speaking), - SpeakingUpdate { - ssrc: u32, - speaking: bool, - }, - VoicePacket { - audio: Option>, - packet: Rtp, - payload_offset: usize, - payload_end_pad: usize, - }, - RtcpPacket { - packet: Rtcp, - payload_offset: usize, - payload_end_pad: usize, - }, + SpeakingUpdate(InternalSpeakingUpdate), + VoicePacket(InternalVoicePacket), + RtcpPacket(InternalRtcpPacket), ClientConnect(ClientConnect), ClientDisconnect(ClientDisconnect), - DriverConnect, - DriverReconnect, + DriverConnect(InternalConnect), + DriverReconnect(InternalConnect), DriverConnectFailed, DriverReconnectFailed, SsrcKnown(u32), @@ -123,36 +90,16 @@ impl<'a> CoreContext { match self { SpeakingStateUpdate(evt) => EventContext::SpeakingStateUpdate(*evt), - SpeakingUpdate { ssrc, speaking } => EventContext::SpeakingUpdate { - ssrc: *ssrc, - speaking: *speaking, - }, - VoicePacket { - audio, - packet, - payload_offset, - payload_end_pad, - } => EventContext::VoicePacket { - audio, - packet, - payload_offset: *payload_offset, - payload_end_pad: *payload_end_pad, - }, - RtcpPacket { - packet, - payload_offset, - payload_end_pad, - } => EventContext::RtcpPacket { - packet, - payload_offset: *payload_offset, - payload_end_pad: *payload_end_pad, - }, + SpeakingUpdate(evt) => EventContext::SpeakingUpdate(SpeakingUpdateData::from(evt)), + VoicePacket(evt) => EventContext::VoicePacket(VoiceData::from(evt)), + RtcpPacket(evt) => EventContext::RtcpPacket(RtcpData::from(evt)), ClientConnect(evt) => EventContext::ClientConnect(*evt), ClientDisconnect(evt) => EventContext::ClientDisconnect(*evt), - DriverConnect => EventContext::DriverConnect, - DriverReconnect => EventContext::DriverReconnect, + DriverConnect(evt) => EventContext::DriverConnect(ConnectData::from(evt)), + DriverReconnect(evt) => EventContext::DriverReconnect(ConnectData::from(evt)), DriverConnectFailed => EventContext::DriverConnectFailed, DriverReconnectFailed => EventContext::DriverReconnectFailed, + #[allow(deprecated)] SsrcKnown(s) => EventContext::SsrcKnown(*s), } } @@ -171,10 +118,11 @@ impl EventContext<'_> { RtcpPacket { .. } => Some(CoreEvent::RtcpPacket), ClientConnect { .. } => Some(CoreEvent::ClientConnect), ClientDisconnect { .. } => Some(CoreEvent::ClientDisconnect), - DriverConnect => Some(CoreEvent::DriverConnect), - DriverReconnect => Some(CoreEvent::DriverReconnect), + DriverConnect(_) => Some(CoreEvent::DriverConnect), + DriverReconnect(_) => Some(CoreEvent::DriverReconnect), DriverConnectFailed => Some(CoreEvent::DriverConnectFailed), DriverReconnectFailed => Some(CoreEvent::DriverReconnectFailed), + #[allow(deprecated)] SsrcKnown(_) => Some(CoreEvent::SsrcKnown), _ => None, } diff --git a/src/events/core.rs b/src/events/core.rs index 218182c..803547f 100644 --- a/src/events/core.rs +++ b/src/events/core.rs @@ -46,6 +46,9 @@ pub enum CoreEvent { /// [RTP SSRC]: https://tools.ietf.org/html/rfc3550#section-3 /// [DriverConnect]: Self::DriverConnect /// [DriverReconnect]: Self::DriverReconnect - // TODO: deprecate in next breaking after fusing with Driver(Re)Connect. + #[deprecated( + since = "0.2.0", + note = "Please use the DriverConnect/Reconnect events instead." + )] SsrcKnown, } diff --git a/src/events/mod.rs b/src/events/mod.rs index 5bd59cb..51e9697 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,4 +1,51 @@ //! Events relating to tracks, timing, and other callers. +//! +//! ## Listening for events +//! Driver events in Songbird are composed of two parts: +//! * An [`Event`] to listen out for. These may be discrete events, +//! or generated by timers. +//! * An [`EventHandler`] to be called on receipt of an event. As event +//! handlers may be shared between several events, the handler is called +//! with an [`EventContext`] describing which event was fired. +//! +//! Event handlers are registered using functions such as [`Driver::add_global_event`], +//! or [`TrackHandle::add_event`], or. Internally, these pairs are stored +//! as [`EventData`]. +//! +//! ## `EventHandler` lifecycle +//! An event handler is essentially just an async function which may return +//! another type of event to listen out for (an `Option`). For instance, +//! [`Some(Event::Cancel)`] will remove that event listener, while `None` won't +//! change it at all. +//! +//! The exception is one-off events like [`Event::Delayed`], which remove themselves +//! after one call *unless* an [`Event`] override is returned. +//! +//! ## Global and local listeners +//! *Global* event listeners are those which are placed onto the [`Driver`], +//! while *local* event listeners are those which are placed on a +//! [`Track`]/[`Handle`]. +//! +//! Track or timed events, when local, return a reference to the parent track. +//! When registered globally, they fire on a per-tick basis, returning references to +//! all relevant tracks in that 20ms window. Global/local timed events use a global +//! timer or a [track's playback time], respectively. +//! +//! [`CoreEvent`]s may only be registered globally. +//! +//! [`Event`]: Event +//! [`EventHandler`]: EventHandler +//! [`EventContext`]: EventContext +//! [`Driver::add_global_event`]: crate::driver::Driver::add_global_event +//! [`Driver`]: crate::driver::Driver::add_global_event +//! [`TrackHandle::add_event`]: crate::tracks::TrackHandle::add_event +//! [`Track`]: crate::tracks::Track::events +//! [`Handle`]: crate::tracks::TrackHandle::add_event +//! [`EventData`]: EventData +//! [`Some(Event::Cancel)`]: Event::Cancel +//! [`Event::Delayed`]: Event::Delayed +//! [track's playback time]: crate::tracks::TrackState::play_time +//! [`CoreEvent`]: CoreEvent mod context; mod core; @@ -7,8 +54,15 @@ mod store; mod track; mod untimed; -pub use self::{context::EventContext, core::*, data::*, store::*, track::*, untimed::*}; -pub(crate) use context::CoreContext; +pub use self::{ + context::{context_data, EventContext}, + core::*, + data::*, + store::*, + track::*, + untimed::*, +}; +pub(crate) use context::{internal_data, CoreContext}; use async_trait::async_trait; use std::time::Duration; @@ -30,9 +84,9 @@ pub trait EventHandler: Send + Sync { /// or stops. In case this is required, global events are a better /// fit. /// -/// Event handlers themselves are described in [`EventData::action`]. +/// Event handlers themselves are described in [`EventData::new`]. /// -/// [`EventData::action`]: EventData::action +/// [`EventData::new`]: EventData::new #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] #[non_exhaustive] pub enum Event { diff --git a/src/events/store.rs b/src/events/store.rs index 67057df..9bec9e2 100644 --- a/src/events/store.rs +++ b/src/events/store.rs @@ -103,6 +103,19 @@ impl EventStore { } } + /// Processes all events due up to and including `now`. + pub(crate) fn timed_event_ready(&self, now: Duration) -> bool { + self.timed + .peek() + .map(|evt| { + evt.fire_time + .as_ref() + .expect("Timed event must have a fire_time.") + <= &now + }) + .unwrap_or(false) + } + /// Processes all events attached to the given track event. pub(crate) async fn process_untimed( &mut self, @@ -176,9 +189,13 @@ impl GlobalEvents { ) { // Global timed events self.time += TIMESTEP_LENGTH; - self.store - .process_timed(self.time, EventContext::Track(&[])) - .await; + if self.store.timed_event_ready(self.time) { + let global_ctx: Vec<(&TrackState, &TrackHandle)> = + states.iter().zip(handles.iter()).collect(); + self.store + .process_timed(self.time, EventContext::Track(&global_ctx[..])) + .await; + } // Local timed events for (i, state) in states.iter_mut().enumerate() {