TrackQueues: Convenience methods and extension (#7)

* Adds a uuid field to tracks and handles to make it easier to identify and match event sources after the fact.
* Adds optional feature "builtin-queue" to expose a queue on every driver, as a convenience for users who can guarantee they'll need a queue for every driver/call.
* Adds methods to queues to allow access to the currently running track handle, remove a specified queue entry, as well as to mutate the underlying queue from a closure.
This commit is contained in:
Kyle Simpson
2020-11-16 08:57:54 +00:00
committed by GitHub
parent 047ce0379a
commit de652250d8
7 changed files with 387 additions and 119 deletions

View File

@@ -19,10 +19,12 @@ use connection::error::Result;
pub use crypto::*;
pub use decode_mode::DecodeMode;
#[cfg(feature = "builtin-queue")]
use crate::tracks::TrackQueue;
use crate::{
events::EventData,
input::Input,
tracks::{Track, TrackHandle},
tracks::{self, Track, TrackHandle},
ConnectionInfo,
Event,
EventHandler,
@@ -34,11 +36,16 @@ use tracing::instrument;
/// The control object for a Discord voice connection, handling connection,
/// mixing, encoding, en/decryption, and event generation.
///
/// When compiled with the `"builtin-queue"` feature, each driver includes a track queue
/// as a convenience to prevent the additional overhead of per-guild state management.
#[derive(Clone, Debug)]
pub struct Driver {
config: Config,
self_mute: bool,
sender: Sender<CoreMessage>,
#[cfg(feature = "builtin-queue")]
queue: TrackQueue,
}
impl Driver {
@@ -53,6 +60,8 @@ impl Driver {
config,
self_mute: false,
sender,
#[cfg(feature = "builtin-queue")]
queue: Default::default(),
}
}
@@ -226,6 +235,42 @@ impl Driver {
}
}
#[cfg(feature = "builtin-queue")]
impl Driver {
/// Returns a reference to this driver's built-in queue.
///
/// Requires the `"builtin-queue"` feature.
/// Queue additions should be made via [`enqueue`] and
/// [`enqueue_source`].
///
/// [`enqueue`]: #method.enqueue
/// [`enqueue_source`]: #method.enqueue_source
pub fn queue(&self) -> &TrackQueue {
&self.queue
}
/// Adds an audio [`Input`] to this driver's built-in queue.
///
/// Requires the `"builtin-queue"` feature.
///
/// [`Input`]: ../input/struct.input.html
pub fn enqueue_source(&mut self, source: Input) {
let (mut track, _) = tracks::create_player(source);
self.queue.add_raw(&mut track);
self.play(track);
}
/// Adds an existing [`Track`] to this driver's built-in queue.
///
/// Requires the `"builtin-queue"` feature.
///
/// [`Track`]: ../tracks/struct.track.html
pub fn enqueue(&mut self, mut track: Track) {
self.queue.add_raw(&mut track);
self.play(track);
}
}
impl Default for Driver {
fn default() -> Self {
Self::new(Default::default())

View File

@@ -5,6 +5,7 @@ use tokio::sync::{
mpsc::{error::SendError, UnboundedSender},
oneshot,
};
use uuid::Uuid;
#[derive(Clone, Debug)]
/// Handle for safe control of a [`Track`] track from other threads, outside
@@ -18,6 +19,7 @@ use tokio::sync::{
pub struct TrackHandle {
command_channel: UnboundedSender<TrackCommand>,
seekable: bool,
uuid: Uuid,
}
impl TrackHandle {
@@ -25,10 +27,11 @@ impl TrackHandle {
/// the underlying [`Input`] supports seek operations.
///
/// [`Input`]: ../input/struct.Input.html
pub fn new(command_channel: UnboundedSender<TrackCommand>, seekable: bool) -> Self {
pub fn new(command_channel: UnboundedSender<TrackCommand>, seekable: bool, uuid: Uuid) -> Self {
Self {
command_channel,
seekable,
uuid,
}
}
@@ -149,6 +152,11 @@ impl TrackHandle {
}
}
/// Returns this handle's (and track's) unique identifier.
pub fn uuid(&self) -> Uuid {
self.uuid
}
#[inline]
/// Send a raw command to the [`Track`] object.
///

View File

@@ -33,6 +33,7 @@ use tokio::sync::{
},
oneshot::Receiver as OneshotReceiver,
};
use uuid::Uuid;
/// Control object for audio playback.
///
@@ -117,6 +118,9 @@ pub struct Track {
/// Count of remaining loops.
pub loops: LoopState,
/// Unique identifier for this track.
pub(crate) uuid: Uuid,
}
impl Track {
@@ -131,6 +135,8 @@ impl Track {
commands: UnboundedReceiver<TrackCommand>,
handle: TrackHandle,
) -> Self {
let uuid = handle.uuid();
Self {
playing: Default::default(),
volume: 1.0,
@@ -141,6 +147,7 @@ impl Track {
commands,
handle,
loops: LoopState::Finite(0),
uuid,
}
}
@@ -341,6 +348,11 @@ impl Track {
out
}
/// Returns this track's unique identifier.
pub fn uuid(&self) -> Uuid {
self.uuid
}
}
/// Creates a [`Track`] object to pass into the audio context, and a [`TrackHandle`]
@@ -354,9 +366,11 @@ impl Track {
pub fn create_player(source: Input) -> (Track, TrackHandle) {
let (tx, rx) = mpsc::unbounded_channel();
let can_seek = source.is_seekable();
let player = Track::new_raw(source, rx, TrackHandle::new(tx.clone(), can_seek));
let handle = TrackHandle::new(tx, can_seek, Uuid::new_v4());
(player, TrackHandle::new(tx, can_seek))
let player = Track::new_raw(source, rx, handle.clone());
(player, handle)
}
/// Alias for most result-free calls to a [`TrackHandle`].

View File

@@ -6,16 +6,18 @@ use crate::{
};
use async_trait::async_trait;
use parking_lot::Mutex;
use std::{collections::VecDeque, sync::Arc};
use std::{collections::VecDeque, ops::Deref, sync::Arc};
use tracing::{info, warn};
#[derive(Default)]
/// A simple queue for several audio sources, designed to
/// play in sequence.
///
/// This makes use of [`TrackEvent`]s to determine when the current
/// song or audio file has finished before playing the next entry.
///
/// One of these is automatically included via [`Driver::queue`] when
/// the `"builtin-queue"` feature is enabled.
///
/// `examples/serenity/voice_events_queue` demonstrates how a user might manage,
/// track and use this to run a song queue in many guilds in parallel.
/// This code is trivial to extend if extra functionality is needed.
@@ -50,15 +52,37 @@ use tracing::{info, warn};
/// queue.add_source(source, &mut driver);
/// # };
/// ```
///
/// [`TrackEvent`]: ../events/enum.TrackEvent.html
/// [`Driver::queue`]: ../driver/struct.Driver.html#method.queue
#[derive(Clone, Debug, Default)]
pub struct TrackQueue {
// NOTE: the choice of a parking lot mutex is quite deliberate
inner: Arc<Mutex<TrackQueueCore>>,
}
#[derive(Default)]
/// Reference to a track which is known to be part of a queue.
///
/// Instances *should not* be moved from one queue to another.
#[derive(Debug)]
pub struct Queued(TrackHandle);
impl Deref for Queued {
type Target = TrackHandle;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Queued {
/// Clones the inner handle
pub fn handle(&self) -> TrackHandle {
self.0.clone()
}
}
#[derive(Debug, Default)]
/// Inner portion of a [`TrackQueue`].
///
/// This abstracts away thread-safety from the user,
@@ -66,7 +90,7 @@ pub struct TrackQueue {
///
/// [`TrackQueue`]: struct.TrackQueue.html
struct TrackQueueCore {
tracks: VecDeque<TrackHandle>,
tracks: VecDeque<Queued>,
}
struct QueueHandler {
@@ -77,13 +101,33 @@ struct QueueHandler {
impl EventHandler for QueueHandler {
async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
let mut inner = self.remote_lock.lock();
// Due to possibility that users might remove, reorder,
// or dequeue+stop tracks, we need to verify that the FIRST
// track is the one who has ended.
let front_ended = match ctx {
EventContext::Track(ts) => {
// This slice should have exactly one entry.
// If the ended track has same id as the queue head, then
// we can progress the queue.
let queue_uuid = inner.tracks.front().map(|handle| handle.uuid());
let ended_uuid = ts.first().map(|handle| handle.1.uuid());
queue_uuid.is_some() && queue_uuid == ended_uuid
},
_ => false,
};
if !front_ended {
return None;
}
let _old = inner.tracks.pop_front();
info!("Queued track ended: {:?}.", ctx);
info!("{} tracks remain.", inner.tracks.len());
// If any audio files die unexpectedly, then keep going until we
// find one which works, or we run out.
// Keep going until we find one track which works, or we run out.
let mut keep_looking = true;
while keep_looking && !inner.tracks.is_empty() {
if let Some(new) = inner.tracks.front() {
@@ -113,8 +157,8 @@ impl TrackQueue {
/// Adds an audio source to the queue, to be played in the channel managed by `handler`.
pub fn add_source(&self, source: Input, handler: &mut Driver) {
let (audio, audio_handle) = tracks::create_player(source);
self.add(audio, audio_handle, handler);
let (audio, _) = tracks::create_player(source);
self.add(audio, handler);
}
/// Adds a [`Track`] object to the queue, to be played in the channel managed by `handler`.
@@ -124,11 +168,19 @@ impl TrackQueue {
///
/// [`Track`]: struct.Track.html
/// [`voice::create_player`]: fn.create_player.html
pub fn add(&self, mut track: Track, track_handle: TrackHandle, handler: &mut Driver) {
pub fn add(&self, mut track: Track, handler: &mut Driver) {
self.add_raw(&mut track);
handler.play(track);
}
#[inline]
pub(crate) fn add_raw(&self, track: &mut Track) {
info!("Track added to queue.");
let remote_lock = self.inner.clone();
let mut inner = self.inner.lock();
let track_handle = track.handle.clone();
if !inner.tracks.is_empty() {
track.pause();
}
@@ -142,8 +194,23 @@ impl TrackQueue {
track.position,
);
handler.play(track);
inner.tracks.push_back(track_handle);
inner.tracks.push_back(Queued(track_handle));
}
/// Returns a handle to the currently playing track.
pub fn current(&self) -> Option<TrackHandle> {
let inner = self.inner.lock();
inner.tracks.front().map(|h| h.handle())
}
/// Attempts to remove a track from the specified index.
///
/// The returned entry can be readded to *this* queue via [`modify_queue`].
///
/// [`modify_queue`]: #method.modify_queue
pub fn dequeue(&self, index: usize) -> Option<Queued> {
self.modify_queue(|vq| vq.remove(index))
}
/// Returns the number of tracks currently in the queue.
@@ -160,6 +227,18 @@ impl TrackQueue {
inner.tracks.is_empty()
}
/// Allows modification of the inner queue (i.e., deletion, reordering).
///
/// Users must be careful to `stop` removed tracks, so as to prevent
/// resource leaks.
pub fn modify_queue<F, O>(&self, func: F) -> O
where
F: FnOnce(&mut VecDeque<Queued>) -> O,
{
let mut inner = self.inner.lock();
func(&mut inner.tracks)
}
/// Pause the track at the head of the queue.
pub fn pause(&self) -> TrackResult {
let inner = self.inner.lock();
@@ -183,14 +262,14 @@ impl TrackQueue {
}
/// Stop the currently playing track, and clears the queue.
pub fn stop(&self) -> TrackResult {
pub fn stop(&self) {
let mut inner = self.inner.lock();
let out = inner.stop_current();
inner.tracks.clear();
out
for track in inner.tracks.drain(..) {
// Errors when removing tracks don't really make
// a difference: an error just implies it's already gone.
let _ = track.stop();
}
}
/// Skip to the next track in the queue, if it exists.