Input: Allow Restartable sources to be lazy

This change is made with queue users in mind. Since sources
of this kind *know* how to (re)create themselves, they can
avoid being created at all until needed.

This also adds machinery to preload tracks *before* they are
needed, for gapless playback on queues and so on. Queues
make use of the event system to do this.
This commit is contained in:
Kyle Simpson
2020-12-28 17:02:10 +00:00
parent c0d3cb3113
commit 03ae0e7628
10 changed files with 368 additions and 101 deletions

View File

@@ -54,7 +54,7 @@ version = "0.11"
[dependencies.rand]
optional = true
version = "0.7"
version = "0.8"
[dependencies.serenity]
optional = true

View File

@@ -35,7 +35,10 @@ use serenity::{
};
use songbird::{
input,
input::{
self,
restartable::Restartable,
},
Event,
EventContext,
EventHandler as VoiceEventHandler,
@@ -477,7 +480,10 @@ async fn queue(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult {
if let Some(handler_lock) = manager.get(guild_id) {
let mut handler = handler_lock.lock().await;
let source = match input::ytdl(&url).await {
// Here, we use lazy restartable sources to make sure that we don't pay
// for decoding, playback on tracks which aren't actually live yet.
let source = match Restartable::ytdl(url, true).await {
Ok(source) => source,
Err(why) => {
println!("Err starting source: {:?}", why);
@@ -488,7 +494,7 @@ async fn queue(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult {
},
};
handler.enqueue_source(source);
handler.enqueue_source(source.into());
check_msg(
msg.channel_id

View File

@@ -183,7 +183,7 @@ async fn play(msg: Message, state: State) -> Result<(), Box<dyn Error + Send + S
let guild_id = msg.guild_id.unwrap();
if let Ok(song) = Restartable::ytdl(msg.content.clone()).await {
if let Ok(song) = Restartable::ytdl(msg.content.clone(), false).await {
let input = Input::from(song);
let content = format!(

View File

@@ -86,6 +86,15 @@ impl Reader {
_ => {},
}
}
#[allow(clippy::single_match)]
pub(crate) fn make_playable(&mut self) {
use Reader::*;
match self {
Restartable(r) => r.make_playable(),
_ => {},
}
}
}
impl Read for Reader {

View File

@@ -23,6 +23,36 @@ use std::{
type Recreator = Box<dyn Restart + Send + 'static>;
type RecreateChannel = Receiver<Result<(Box<Input>, Recreator)>>;
// Use options here to make "take" more doable from a mut ref.
enum LazyProgress {
Dead(Box<Metadata>, Option<Recreator>, Codec, Container),
Live(Box<Input>, Option<Recreator>),
Working(Codec, Container, bool, RecreateChannel),
}
impl Debug for LazyProgress {
fn fmt(&self, f: &mut Formatter<'_>) -> StdResult<(), FormatError> {
match self {
LazyProgress::Dead(meta, _, codec, container) => f
.debug_tuple("Dead")
.field(meta)
.field(&"<fn>")
.field(codec)
.field(container)
.finish(),
LazyProgress::Live(input, _) =>
f.debug_tuple("Live").field(input).field(&"<fn>").finish(),
LazyProgress::Working(codec, container, stereo, chan) => f
.debug_tuple("Working")
.field(codec)
.field(container)
.field(stereo)
.field(chan)
.finish(),
}
}
}
/// A wrapper around a method to create a new [`Input`] which
/// seeks backward by recreating the source.
///
@@ -40,50 +70,84 @@ type RecreateChannel = Receiver<Result<(Box<Input>, Recreator)>>;
/// [`Input`]: Input
/// [`Memory`]: cached::Memory
/// [`Compressed`]: cached::Compressed
#[derive(Debug)]
pub struct Restartable {
async_handle: Option<Handle>,
awaiting_source: Option<RecreateChannel>,
position: usize,
recreator: Option<Recreator>,
source: Box<Input>,
source: LazyProgress,
}
impl Restartable {
/// Create a new source, which can be restarted using a `recreator` function.
pub async fn new(mut recreator: impl Restart + Send + 'static) -> Result<Self> {
recreator.call_restart(None).await.map(move |source| Self {
async_handle: None,
awaiting_source: None,
position: 0,
recreator: Some(Box::new(recreator)),
source: Box::new(source),
})
///
/// Lazy sources will not run their input recreator until the first byte
/// is needed, or are sent [`Track::make_playable`]/[`TrackHandle::make_playable`].
///
/// [`Track::make_playable`]: crate::tracks::Track::make_playable
/// [`TrackHandle::make_playable`]: crate::tracks::TrackHandle::make_playable
pub async fn new(mut recreator: impl Restart + Send + 'static, lazy: bool) -> Result<Self> {
if lazy {
recreator
.lazy_init()
.await
.map(move |(meta, kind, codec)| Self {
async_handle: None,
position: 0,
source: LazyProgress::Dead(
meta.unwrap_or_default().into(),
Some(Box::new(recreator)),
kind,
codec,
),
})
} else {
recreator.call_restart(None).await.map(move |source| Self {
async_handle: None,
position: 0,
source: LazyProgress::Live(source.into(), Some(Box::new(recreator))),
})
}
}
/// Create a new restartable ffmpeg source for a local file.
pub async fn ffmpeg<P: AsRef<OsStr> + Send + Clone + Sync + 'static>(path: P) -> Result<Self> {
Self::new(FfmpegRestarter { path }).await
pub async fn ffmpeg<P: AsRef<OsStr> + Send + Clone + Sync + 'static>(
path: P,
lazy: bool,
) -> Result<Self> {
Self::new(FfmpegRestarter { path }, lazy).await
}
/// Create a new restartable ytdl source.
///
/// The cost of restarting and seeking will probably be *very* high:
/// expect a pause if you seek backwards.
pub async fn ytdl<P: AsRef<str> + Send + Clone + Sync + 'static>(uri: P) -> Result<Self> {
Self::new(YtdlRestarter { uri }).await
pub async fn ytdl<P: AsRef<str> + Send + Clone + Sync + 'static>(
uri: P,
lazy: bool,
) -> Result<Self> {
Self::new(YtdlRestarter { uri }, lazy).await
}
/// Create a new restartable ytdl source, using the first result of a youtube search.
///
/// The cost of restarting and seeking will probably be *very* high:
/// expect a pause if you seek backwards.
pub async fn ytdl_search(name: &str) -> Result<Self> {
Self::ytdl(format!("ytsearch1:{}", name)).await
pub async fn ytdl_search(name: &str, lazy: bool) -> Result<Self> {
Self::ytdl(format!("ytsearch1:{}", name), lazy).await
}
pub(crate) fn prep_with_handle(&mut self, handle: Handle) {
self.async_handle = Some(handle);
}
pub(crate) fn make_playable(&mut self) {
if matches!(self.source, LazyProgress::Dead(_, _, _, _)) {
// This read triggers creation of a source, and is guaranteed not to modify any internals.
// It will harmlessly write out zeroes into the target buffer.
let mut bytes = [0u8; 0];
let _ = Read::read(self, &mut bytes[..]);
}
}
}
/// Trait used to create an instance of a [`Reader`] at instantiation and when
@@ -94,6 +158,13 @@ impl Restartable {
pub trait Restart {
/// Tries to create a replacement source.
async fn call_restart(&mut self, time: Option<Duration>) -> Result<Input>;
/// Optionally retrieve metadata for a source which has been lazily initialised.
///
/// This is particularly useful for sources intended to be queued, which
/// should occupy few resources when not live BUT have as much information as
/// possible made available at creation.
async fn lazy_init(&mut self) -> Result<(Option<Metadata>, Codec, Container)>;
}
struct FfmpegRestarter<P>
@@ -137,6 +208,12 @@ where
ffmpeg(self.path.as_ref()).await
}
}
async fn lazy_init(&mut self) -> Result<(Option<Metadata>, Codec, Container)> {
is_stereo(self.path.as_ref())
.await
.map(|(_stereo, metadata)| (Some(metadata), Codec::FloatPcm, Container::Raw))
}
}
struct YtdlRestarter<P>
@@ -160,26 +237,31 @@ where
ytdl(self.uri.as_ref()).await
}
}
}
impl Debug for Restartable {
fn fmt(&self, f: &mut Formatter<'_>) -> StdResult<(), FormatError> {
f.debug_struct("Restartable")
.field("async_handle", &self.async_handle)
.field("awaiting_source", &self.awaiting_source)
.field("position", &self.position)
.field("recreator", &"<fn>")
.field("source", &self.source)
.finish()
async fn lazy_init(&mut self) -> Result<(Option<Metadata>, Codec, Container)> {
_ytdl_metadata(self.uri.as_ref())
.await
.map(|m| (Some(m), Codec::FloatPcm, Container::Raw))
}
}
impl From<Restartable> for Input {
fn from(mut src: Restartable) -> Self {
let kind = src.source.kind.clone();
let meta = Some(src.source.metadata.take());
let stereo = src.source.stereo;
let container = src.source.container;
let (meta, stereo, kind, container) = match &mut src.source {
LazyProgress::Dead(ref mut m, _rec, kind, container) => {
let stereo = m.channels == Some(2);
(Some(m.take()), stereo, kind.clone(), *container)
},
LazyProgress::Live(ref mut input, _rec) => (
Some(input.metadata.take()),
input.stereo,
input.kind.clone(),
input.container,
),
// This branch should never be taken: this is an emergency measure.
LazyProgress::Working(kind, container, stereo, _) =>
(None, *stereo, kind.clone(), *container),
};
Input::new(stereo, Reader::Restartable(src), kind, container, meta)
}
}
@@ -190,43 +272,70 @@ impl From<Restartable> for Input {
impl Read for Restartable {
fn read(&mut self, buffer: &mut [u8]) -> IoResult<usize> {
let (out_val, march_pos, remove_async) = if let Some(chan) = &self.awaiting_source {
match chan.try_recv() {
Ok(Ok((new_source, recreator))) => {
self.source = new_source;
self.recreator = Some(recreator);
use LazyProgress::*;
let (out_val, march_pos, next_source) = match &mut self.source {
Dead(meta, rec, kind, container) => {
let stereo = meta.channels == Some(2);
let handle = self.async_handle.clone();
let new_chan = if let Some(rec) = rec.take() {
Some(regenerate_channel(
rec,
0,
stereo,
kind.clone(),
*container,
handle,
)?)
} else {
return Err(IoError::new(
IoErrorKind::UnexpectedEof,
"Illegal state: taken recreator was observed.".to_string(),
));
};
(Read::read(&mut self.source, buffer), true, true)
},
Ok(Err(source_error)) => {
let e = Err(IoError::new(
IoErrorKind::UnexpectedEof,
format!("Failed to create new reader: {:?}.", source_error),
));
(e, false, true)
},
Err(TryRecvError::Empty) => {
// Output all zeroes.
for el in buffer.iter_mut() {
*el = 0;
}
(Ok(buffer.len()), false, false)
},
Err(_) => {
let e = Err(IoError::new(
IoErrorKind::UnexpectedEof,
"Failed to create new reader: dropped.",
));
(e, false, true)
},
}
} else {
// already have a good, valid source.
(Read::read(&mut self.source, buffer), true, false)
// Then, output all zeroes.
for el in buffer.iter_mut() {
*el = 0;
}
(Ok(buffer.len()), false, new_chan)
},
Live(source, _) => (Read::read(source, buffer), true, None),
Working(_, _, _, chan) => {
match chan.try_recv() {
Ok(Ok((mut new_source, recreator))) => {
// Completed!
// Do read, then replace inner progress.
let bytes_read = Read::read(&mut new_source, buffer);
(bytes_read, true, Some(Live(new_source, Some(recreator))))
},
Ok(Err(source_error)) => {
let e = Err(IoError::new(
IoErrorKind::UnexpectedEof,
format!("Failed to create new reader: {:?}.", source_error),
));
(e, false, None)
},
Err(TryRecvError::Empty) => {
// Output all zeroes.
for el in buffer.iter_mut() {
*el = 0;
}
(Ok(buffer.len()), false, None)
},
Err(_) => {
let e = Err(IoError::new(
IoErrorKind::UnexpectedEof,
"Failed to create new reader: dropped.",
));
(e, false, None)
},
}
},
};
if remove_async {
self.awaiting_source = None;
if let Some(src) = next_source {
self.source = src;
}
if march_pos {
@@ -247,45 +356,62 @@ impl Seek for Restartable {
use SeekFrom::*;
match pos {
Start(offset) => {
let stereo = self.source.stereo;
let _current_ts = utils::byte_count_to_timestamp(self.position, stereo);
let offset = offset as usize;
let handle = self.async_handle.clone();
if offset < self.position {
// We're going back in time.
if let Some(handle) = self.async_handle.as_ref() {
let (tx, rx) = flume::bounded(1);
self.awaiting_source = Some(rx);
let recreator = self.recreator.take();
if let Some(mut rec) = recreator {
handle.spawn(async move {
let ret_val = rec
.call_restart(Some(utils::byte_count_to_timestamp(
offset, stereo,
)))
.await;
let _ = tx.send(ret_val.map(Box::new).map(|v| (v, rec)));
});
use LazyProgress::*;
match &mut self.source {
Dead(meta, rec, kind, container) => {
// regen at given start point
self.source = if let Some(rec) = rec.take() {
regenerate_channel(
rec,
offset,
meta.channels == Some(2),
kind.clone(),
*container,
handle,
)?
} else {
return Err(IoError::new(
IoErrorKind::Interrupted,
"Previous seek in progress.",
IoErrorKind::UnexpectedEof,
"Illegal state: taken recreator was observed.".to_string(),
));
}
};
self.position = offset;
} else {
},
Live(input, rec) =>
if offset < self.position {
// regen at given start point
// We're going back in time.
self.source = if let Some(rec) = rec.take() {
regenerate_channel(
rec,
offset,
input.stereo,
input.kind.clone(),
input.container,
handle,
)?
} else {
return Err(IoError::new(
IoErrorKind::UnexpectedEof,
"Illegal state: taken recreator was observed.".to_string(),
));
};
self.position = offset;
} else {
// march on with live source.
self.position += input.consume(offset - self.position);
},
Working(_, _, _, _) => {
return Err(IoError::new(
IoErrorKind::Interrupted,
"Cannot safely call seek until provided an async context handle.",
"Previous seek in progress.",
));
}
} else {
self.position += self.source.consume(offset - self.position);
},
}
Ok(offset as u64)
@@ -298,3 +424,31 @@ impl Seek for Restartable {
}
}
}
fn regenerate_channel(
mut rec: Recreator,
offset: usize,
stereo: bool,
kind: Codec,
container: Container,
handle: Option<Handle>,
) -> IoResult<LazyProgress> {
if let Some(handle) = handle.as_ref() {
let (tx, rx) = flume::bounded(1);
handle.spawn(async move {
let ret_val = rec
.call_restart(Some(utils::byte_count_to_timestamp(offset, stereo)))
.await;
let _ = tx.send(ret_val.map(Box::new).map(|v| (v, rec)));
});
Ok(LazyProgress::Working(kind, container, stereo, rx))
} else {
Err(IoError::new(
IoErrorKind::Interrupted,
"Cannot safely call seek until provided an async context handle.",
))
}
}

View File

@@ -11,7 +11,7 @@ use std::{
io::{BufRead, BufReader, Read},
process::{Command, Stdio},
};
use tokio::task;
use tokio::{process::Command as TokioCommand, task};
use tracing::trace;
const YOUTUBE_DL_COMMAND: &str = if cfg!(feature = "youtube-dlc") {
@@ -66,6 +66,7 @@ pub(crate) async fn _ytdl(uri: &str, pre_args: &[&str]) -> Result<Input> {
.stdout(Stdio::piped())
.spawn()?;
// This rigmarole is required due to the inner synchronous reading context.
let stderr = youtube_dl.stderr.take();
let (returned_stderr, value) = task::spawn_blocking(move || {
let mut s = stderr.unwrap();
@@ -113,6 +114,45 @@ pub(crate) async fn _ytdl(uri: &str, pre_args: &[&str]) -> Result<Input> {
))
}
pub(crate) async fn _ytdl_metadata(uri: &str) -> Result<Metadata> {
// Most of these flags are likely unused, but we want identical search
// and/or selection as the above functions.
let ytdl_args = [
"-j",
"-f",
"webm[abr>0]/bestaudio/best",
"-R",
"infinite",
"--no-playlist",
"--ignore-config",
uri,
"-o",
"-",
];
let youtube_dl_output = TokioCommand::new(YOUTUBE_DL_COMMAND)
.args(&ytdl_args)
.stdin(Stdio::null())
.output()
.await?;
let o_vec = youtube_dl_output.stderr;
let end = (&o_vec)
.iter()
.position(|el| *el == 0xA)
.unwrap_or_else(|| o_vec.len());
let value = serde_json::from_slice(&o_vec[..end]).map_err(|err| Error::Json {
error: err,
parsed_text: std::str::from_utf8(&o_vec).unwrap_or_default().to_string(),
})?;
let metadata = Metadata::from_ytdl_output(value);
Ok(metadata)
}
/// Creates a streamed audio source from YouTube search results with `youtube-dl(c)`,`ffmpeg`, and `ytsearch`.
/// Takes the first video listed from the YouTube search.
///

View File

@@ -30,6 +30,8 @@ pub enum TrackCommand {
Request(OneshotSender<Box<TrackState>>),
/// Change the loop count/strategy of this track.
Loop(LoopState),
/// Prompts a track's input to become live and usable, if it is not already.
MakePlayable,
}
impl std::fmt::Debug for TrackCommand {
@@ -48,6 +50,7 @@ impl std::fmt::Debug for TrackCommand {
Do(_f) => "Do([function])".to_string(),
Request(tx) => format!("Request({:?})", tx),
Loop(loops) => format!("Loop({:?})", loops),
MakePlayable => "MakePlayable".to_string(),
}
)
}

View File

@@ -74,6 +74,16 @@ impl TrackHandle {
self.send(TrackCommand::Volume(volume))
}
/// Ready a track for playing if it is lazily initialised.
///
/// Currently, only [`Restartable`] sources support lazy setup.
/// This call is a no-op for all others.
///
/// [`Restartable`]: crate::input::restartable::Restartable
pub fn make_playable(&self) -> TrackResult<()> {
self.send(TrackCommand::MakePlayable)
}
/// Denotes whether the underlying [`Input`] stream is compatible with arbitrary seeking.
///
/// If this returns `false`, all calls to [`seek_time`] will fail, and the track is

View File

@@ -307,6 +307,7 @@ impl Track {
TrackStateChange::Loops(self.loops, true),
));
},
MakePlayable => self.make_playable(),
}
},
Err(TryRecvError::Closed) => {
@@ -320,6 +321,16 @@ impl Track {
}
}
/// Ready a track for playing if it is lazily initialised.
///
/// Currently, only [`Restartable`] sources support lazy setup.
/// This call is a no-op for all others.
///
/// [`Restartable`]: crate::input::restartable::Restartable
pub fn make_playable(&mut self) {
self.source.reader.make_playable();
}
/// Creates a read-only copy of the audio track's state.
///
/// The primary use-case of this is sending information across

View File

@@ -6,7 +6,7 @@ use crate::{
};
use async_trait::async_trait;
use parking_lot::Mutex;
use std::{collections::VecDeque, ops::Deref, sync::Arc};
use std::{collections::VecDeque, ops::Deref, sync::Arc, time::Duration};
use tracing::{info, warn};
/// A simple queue for several audio sources, designed to
@@ -145,6 +145,23 @@ impl EventHandler for QueueHandler {
}
}
struct SongPreloader {
remote_lock: Arc<Mutex<TrackQueueCore>>,
}
#[async_trait]
impl EventHandler for SongPreloader {
async fn act(&self, _ctx: &EventContext<'_>) -> Option<Event> {
let inner = self.remote_lock.lock();
if let Some(track) = inner.tracks.get(1) {
let _ = track.0.make_playable();
}
None
}
}
impl TrackQueue {
/// Create a new, empty, track queue.
pub fn new() -> Self {
@@ -194,6 +211,23 @@ impl TrackQueue {
track.position,
);
// Attempts to start loading the next track before this one ends.
// Idea is to provide as close to gapless playback as possible,
// while minimising memory use.
if let Some(time) = track.source.metadata.duration {
let preload_time = time.checked_sub(Duration::from_secs(5)).unwrap_or_default();
let remote_lock = self.inner.clone();
track
.events
.as_mut()
.expect("Queue inspecting EventStore on new Track: did not exist.")
.add_event(
EventData::new(Event::Delayed(preload_time), SongPreloader { remote_lock }),
track.position,
);
}
inner.tracks.push_back(Queued(track_handle));
}