feat(input): Support HLS streams (#242)

This patch adds support for yt-dl streams with the protocol m3u8_native which includes sites like Soundcloud.

Closes: #241
This commit is contained in:
Erk
2024-07-07 19:37:25 +02:00
committed by GitHub
parent 2e683380fa
commit 8e92c49b2b
8 changed files with 187 additions and 18 deletions

View File

@@ -41,6 +41,7 @@ serenity-voice-model = { optional = true, version = "0.2" }
simd-json = { features = ["serde_impl"], optional = true, version = "0.13" }
socket2 = { optional = true, version = "0.5" }
streamcatcher = { optional = true, version = "1" }
stream_lib = { optional = true, version = "0.4.1" }
symphonia = { default_features = false, optional = true, version = "0.5.2" }
symphonia-core = { optional = true, version = "0.5.2" }
tokio = { default-features = false, optional = true, version = "1.0" }
@@ -83,20 +84,22 @@ driver = [
"dep:async-trait",
"dep:audiopus",
"dep:byteorder",
"dep:bytes",
"dep:crypto_secretbox",
"dep:discortp",
"dep:reqwest",
"dep:flume",
"dep:nohash-hasher",
"dep:once_cell",
"dep:parking_lot",
"dep:rand",
"dep:reqwest",
"dep:ringbuf",
"dep:rubato",
"dep:rusty_pool",
"dep:serde-aux",
"dep:serenity-voice-model",
"dep:socket2",
"dep:stream_lib",
"dep:streamcatcher",
"dep:symphonia",
"dep:symphonia-core",

View File

@@ -101,8 +101,9 @@ impl fmt::Display for Error {
Self::CryptoModeInvalid => write!(f, "server changed negotiated encryption mode"),
Self::CryptoModeUnavailable => write!(f, "server did not offer chosen encryption mode"),
Self::EndpointUrl => write!(f, "endpoint URL received from gateway was invalid"),
Self::IllegalDiscoveryResponse =>
write!(f, "IP discovery/NAT punching response was invalid"),
Self::IllegalDiscoveryResponse => {
write!(f, "IP discovery/NAT punching response was invalid")
},
Self::IllegalIp => write!(f, "IP discovery/NAT punching response had bad IP value"),
Self::Io(e) => e.fmt(f),
Self::Json(e) => e.fmt(f),

View File

@@ -242,7 +242,7 @@ impl<'a> InternalTrack {
},
Ok(MixerInputResultMessage::Seek(parsed, rec, seek_res)) => {
match seek_res {
Ok(pos) =>
Ok(pos) => {
if let Some(time_base) = parsed.decoder.codec_params().time_base {
// Update track's position to match the actual timestamp the
// seek landed at.
@@ -282,7 +282,8 @@ impl<'a> InternalTrack {
SymphError::Unsupported("Track had no recorded time base.")
.into(),
))
},
}
},
Err(e) => Err(InputReadyingError::Seeking(e)),
}
},

View File

@@ -177,10 +177,12 @@ impl DriverTestHandle {
OutputPacket::Empty => eprintln!("pkt: Nothing"),
OutputPacket::Rtp(p) => eprintln!("pkt: RTP[{}B]", p.len()),
OutputPacket::Raw(OutputMessage::Silent) => eprintln!("pkt: Raw-Silent"),
OutputPacket::Raw(OutputMessage::Passthrough(p)) =>
eprintln!("pkt: Raw-Passthrough[{}B]", p.len()),
OutputPacket::Raw(OutputMessage::Mixed(p)) =>
eprintln!("pkt: Raw-Mixed[{}B]", p.len()),
OutputPacket::Raw(OutputMessage::Passthrough(p)) => {
eprintln!("pkt: Raw-Passthrough[{}B]", p.len())
},
OutputPacket::Raw(OutputMessage::Mixed(p)) => {
eprintln!("pkt: Raw-Mixed[{}B]", p.len())
},
}
}
}

View File

@@ -19,6 +19,7 @@ pub struct Output {
pub uploader: Option<String>,
pub url: String,
pub webpage_url: Option<String>,
pub protocol: Option<String>,
}
impl Output {

149
src/input/sources/hls.rs Normal file
View File

@@ -0,0 +1,149 @@
use std::{
io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
pin::Pin,
task::{Context, Poll},
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use pin_project::pin_project;
use reqwest::{header::HeaderMap, Client};
use stream_lib::Event;
use symphonia_core::io::MediaSource;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
use tokio_util::io::StreamReader;
use crate::input::{
AsyncAdapterStream,
AsyncMediaSource,
AudioStream,
AudioStreamError,
Compose,
Input,
};
/// Lazy HLS stream
#[derive(Debug)]
pub struct HlsRequest {
/// HTTP client
client: Client,
/// URL of hls playlist
request: String,
/// Headers of the request
headers: HeaderMap,
}
impl HlsRequest {
#[must_use]
/// Create a lazy HLS request.
pub fn new(client: Client, request: String) -> Self {
Self::new_with_headers(client, request, HeaderMap::default())
}
#[must_use]
/// Create a lazy HTTP request.
pub fn new_with_headers(client: Client, request: String, headers: HeaderMap) -> Self {
HlsRequest {
client,
request,
headers,
}
}
fn create_stream(&mut self) -> Result<HlsStream, AudioStreamError> {
let request = self
.client
.get(&self.request)
.headers(self.headers.clone())
.build()
.map_err(|why| AudioStreamError::Fail(why.into()))?;
let hls = stream_lib::download_hls(self.client.clone(), request, None);
let stream = Box::new(StreamReader::new(hls.map(|ev| match ev {
Event::Bytes { bytes } => Ok(bytes),
Event::End => Ok(Bytes::new()),
Event::Error { error } => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
error,
)),
})));
Ok(HlsStream { stream })
}
}
#[pin_project]
struct HlsStream {
#[pin]
stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
}
impl AsyncRead for HlsStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
AsyncRead::poll_read(self.project().stream, cx, buf)
}
}
impl AsyncSeek for HlsStream {
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> {
Err(IoErrorKind::Unsupported.into())
}
fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<u64>> {
unreachable!()
}
}
#[async_trait]
impl AsyncMediaSource for HlsStream {
fn is_seekable(&self) -> bool {
false
}
async fn byte_len(&self) -> Option<u64> {
None
}
async fn try_resume(
&mut self,
_offset: u64,
) -> Result<Box<dyn AsyncMediaSource>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
}
}
#[async_trait]
impl Compose for HlsRequest {
fn create(&mut self) -> Result<AudioStream<Box<dyn MediaSource>>, AudioStreamError> {
self.create_stream().map(|input| {
let stream = AsyncAdapterStream::new(Box::new(input), 64 * 1024);
AudioStream {
input: Box::new(stream) as Box<dyn MediaSource>,
hint: None,
}
})
}
async fn create_async(
&mut self,
) -> Result<AudioStream<Box<dyn MediaSource>>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
}
fn should_create_async(&self) -> bool {
false
}
}
impl From<HlsRequest> for Input {
fn from(val: HlsRequest) -> Self {
Input::Lazy(Box::new(val))
}
}

View File

@@ -1,5 +1,6 @@
mod file;
mod hls;
mod http;
mod ytdl;
pub use self::{file::*, http::*, ytdl::*};
pub use self::{file::*, hls::*, http::*, ytdl::*};

View File

@@ -16,6 +16,8 @@ use std::{error::Error, io::ErrorKind};
use symphonia_core::io::MediaSource;
use tokio::process::Command;
use super::HlsRequest;
const YOUTUBE_DL_COMMAND: &str = "yt-dlp";
#[derive(Clone, Debug)]
@@ -194,14 +196,23 @@ impl Compose for YoutubeDl {
}));
}
let mut req = HttpRequest {
client: self.client.clone(),
request: result.url,
headers,
content_length: result.filesize,
};
req.create_async().await
#[allow(clippy::single_match_else)]
match result.protocol.as_deref() {
Some("m3u8_native") => {
let mut req =
HlsRequest::new_with_headers(self.client.clone(), result.url, headers);
req.create()
},
_ => {
let mut req = HttpRequest {
client: self.client.clone(),
request: result.url,
headers,
content_length: result.filesize,
};
req.create_async().await
},
}
}
fn should_create_async(&self) -> bool {