use crate::{Error, Result, ResultExt}; use gst::{ Bus, Gst, MessageType, MessageView, Sink, Source, app::AppSink, caps::{Caps, CapsType}, element::ElementExt, pipeline::PipelineExt, playback::{PlayFlags, Playbin, Playbin3}, videoconvertscale::VideoConvert, }; use std::sync::{Arc, Mutex, atomic::AtomicBool}; #[derive(Debug, Clone)] pub struct VideoSource { pub(crate) playbin: Playbin3, pub(crate) videoconvert: VideoConvert, pub(crate) appsink: AppSink, pub(crate) bus: Bus, pub(crate) ready: Arc, pub(crate) frame: Arc>, } impl VideoSource { /// Creates a new video source from the given URL. /// Since this doesn't have to parse the pipeline manually, we aren't sanitizing the URL for /// now. pub fn new(url: impl AsRef) -> Result { Gst::new(); let videoconvert = VideoConvert::new("iced-video-convert") // .change_context(Error)? // .with_output_format(gst::plugins::videoconvertscale::VideoFormat::Rgba) .change_context(Error)?; let mut appsink = AppSink::new("iced-video-sink").change_context(Error)?; appsink .drop(true) .sync(true) // .async_(true) .emit_signals(true) .caps( Caps::builder(CapsType::Video) .field("format", "RGB10A2_LE") // Forced for now .build(), ); let video_sink = videoconvert.link(&appsink).change_context(Error)?; let playbin = Playbin3::new("iced-video") .change_context(Error)? .with_uri(url.as_ref()) .with_buffer_duration(core::time::Duration::from_secs(2)) .with_buffer_size(4096 * 4096 * 4 * 3) .with_ring_buffer_max_size(4096 * 4096 * 4 * 3) .with_flags(PlayFlags::default() | PlayFlags::DOWNLOAD) .with_video_sink(&video_sink); let bus = playbin.bus().change_context(Error)?; playbin.pause().change_context(Error)?; let ready = Arc::new(AtomicBool::new(false)); let frame = Arc::new(Mutex::new(gst::Sample::new())); appsink.on_new_frame({ let ready = Arc::clone(&ready); let frame = Arc::clone(&frame); move |appsink| { let Ok(sample) = appsink.pull_sample() else { tracing::error!("Failed to pull video sample from appsink despite being notified of new frame"); return Ok(()); }; { let mut guard = frame.lock().expect("BUG: Mutex poisoned"); core::mem::replace(&mut *guard, sample); ready.store(true, std::sync::atomic::Ordering::Relaxed); } Ok(()) } }); Ok(Self { playbin, videoconvert, appsink, bus, ready, frame, }) } pub async fn wait(self) -> Result<()> { self.playbin .wait_for_states(&[gst::State::Paused, gst::State::Playing]) .await .change_context(Error) .attach("Failed to wait for video initialisation") } pub fn is_playing(&self) -> Result { let state = self.playbin.state(None).change_context(Error)?; Ok(state == gst::State::Playing) } pub fn toggle(&self) -> Result<()> { if self.is_playing()? { self.pause()?; } else { self.play()?; } Ok(()) } pub fn play(&self) -> Result<()> { self.playbin .play() .change_context(Error) .attach("Failed to play video") } pub fn pause(&self) -> Result<()> { self.playbin .pause() .change_context(Error) .attach("Failed to pause video") } pub fn stop(&self) -> Result<()> { self.playbin .stop() .change_context(Error) .attach("Failed to stop video") } pub fn size(&self) -> Result<(i32, i32)> { let caps = self .appsink .sink("sink") .current_caps() .change_context(Error)?; caps.width() .and_then(|width| caps.height().map(|height| (width, height))) .ok_or(Error) .attach("Failed to get width, height") } }