use crate::{Error, Result, ResultExt}; use gst::{ Bus, Gst, MessageType, MessageView, Sink, Source, app::AppSink, caps::{Caps, CapsType}, element::ElementExt, pipeline::PipelineExt, playback::{PlayFlags, Playbin3}, videoconvertscale::VideoConvert, }; use std::sync::{Arc, Mutex, atomic::AtomicBool}; #[derive(Debug, Clone)] pub struct VideoSource { pub(crate) playbin: Playbin3, pub(crate) appsink: AppSink, pub(crate) bus: Bus, pub(crate) ready: Arc, pub(crate) frame: Arc>, pub(crate) size: std::sync::OnceLock<(i32, i32)>, } impl VideoSource { /// Creates a new video source from the given URL. /// Since this doesn't have to parse the pipeline manually, we aren't sanitizing the URL for /// now. pub fn new(url: impl AsRef) -> Result { Gst::new(); let mut appsink = AppSink::new("iced-video-sink").change_context(Error)?; appsink .drop(true) .sync(true) // .async_(true) .emit_signals(true); let playbin = Playbin3::new("iced-video") .change_context(Error)? .with_uri(url.as_ref()) .with_buffer_duration(core::time::Duration::from_secs(2)) .with_buffer_size(4096 * 4096 * 4 * 3) .with_ring_buffer_max_size(4096 * 4096 * 4 * 3) .with_flags(Playbin3::default_flags() | PlayFlags::DOWNLOAD) .with_video_sink(&appsink); let bus = playbin.bus().change_context(Error)?; playbin.pause().change_context(Error)?; let ready = Arc::new(AtomicBool::new(false)); let frame = Arc::new(Mutex::new(gst::Sample::new())); appsink.on_new_sample({ let ready = Arc::clone(&ready); let frame = Arc::clone(&frame); move |appsink| { let Ok(sample) = appsink.pull_sample() else { tracing::error!("Failed to pull video sample from appsink despite being notified of new frame"); return Ok(()); }; { let mut guard = frame.lock().expect("BUG: Mutex poisoned"); core::mem::replace(&mut *guard, sample); ready.store(true, std::sync::atomic::Ordering::Relaxed); } Ok(()) } }); Ok(Self { playbin, appsink, bus, ready, frame, size: std::sync::OnceLock::new(), }) } pub async fn wait(&self) -> Result<()> { use futures_lite::StreamExt; // self.bus_stream() // .for_each(|msg: gst::Message| { // use gst::gstreamer::prelude::*; // match msg.view() { // MessageView::Eos(_) => { // tracing::info!("Video reached end of stream"); // } // MessageView::Error(err) => { // tracing::error!( // "Video Error from {:?}: {} ({:?})", // err.src().map(|s| s.path_string()), // err.error(), // err.debug() // ); // } // view => tracing::info!("Video Message: {:#?}", view), // } // }) // .await; self.playbin .wait_for_states(&[gst::State::Paused, gst::State::Playing]) .await .change_context(Error) .attach("Failed to wait for video initialisation")?; Ok(()) } pub fn format(&self) -> Result { let caps = self .appsink .sink("sink") .current_caps() .change_context(Error)?; let format = caps .format() .ok_or(Error) .attach("Failed to get video caps structure")?; Ok(format) } pub fn bus_stream(&self) -> impl futures_lite::Stream { self.bus.stream() } pub fn is_playing(&self) -> Result { let state = self.playbin.state(None).change_context(Error)?; Ok(state == gst::State::Playing) } pub fn toggle(&self) -> Result<()> { if self.is_playing()? { self.pause()?; } else { self.play()?; } Ok(()) } pub fn play(&self) -> Result<()> { self.playbin .play() .change_context(Error) .attach("Failed to play video") } pub fn pause(&self) -> Result<()> { self.playbin .pause() .change_context(Error) .attach("Failed to pause video") } pub fn stop(&self) -> Result<()> { self.playbin .stop() .change_context(Error) .attach("Failed to stop video") } pub fn size(&self) -> Result<(i32, i32)> { if let Some(size) = self.size.get() { return Ok(*size); } let caps = self .appsink .sink("sink") .current_caps() .change_context(Error)?; let out = caps .width() .and_then(|width| caps.height().map(|height| (width, height))) .ok_or(Error) .attach("Failed to get width, height")?; self.size.set(out); Ok(out) } }