use crate::priv_prelude::*; wrap_gst!(Pipeline); parent_child!(Element, Pipeline); parent_child!(Bin, Pipeline); impl Drop for Pipeline { fn drop(&mut self) { let _ = self.inner.set_state(gstreamer::State::Null); } } impl Pipeline { #[track_caller] pub fn bus(&self) -> Result { let bus = self .inner .bus() .ok_or(Error) .attach("Failed to get bus from pipeline")?; Ok(Bus::from_gst(bus)) } /// Get the state pub fn state( &self, timeout: impl Into>, ) -> Result { let (result, current, _pending) = self.inner.state(duration_to_clocktime(timeout)?); result.change_context(Error).attach("Failed to get state")?; Ok(current) } pub fn play(&self) -> Result<()> { self.inner .set_state(gstreamer::State::Playing) .change_context(Error) .attach("Failed to set pipeline to Playing state")?; Ok(()) } pub fn pause(&self) -> Result<()> { self.inner .set_state(gstreamer::State::Paused) .change_context(Error) .attach("Failed to set pipeline to Paused state")?; Ok(()) } pub fn ready(&self) -> Result<()> { self.inner .set_state(gstreamer::State::Ready) .change_context(Error) .attach("Failed to set pipeline to Ready state")?; Ok(()) } pub fn stop(&self) -> Result<()> { self.inner .set_state(gstreamer::State::Null) .change_context(Error) .attach("Failed to set pipeline to Null state")?; Ok(()) } pub fn set_state(&self, state: gstreamer::State) -> Result { let result = self .inner .set_state(state) .change_context(Error) .attach("Failed to set pipeline state")?; Ok(result) } pub async fn wait_for(&self, state: gstreamer::State) -> Result<()> { let current_state = self.state(core::time::Duration::ZERO)?; if current_state == state { Ok(()) } else { // use futures::stream::StreamExt; use futures_lite::stream::StreamExt as _; self.bus()? .filtered_stream(&[MessageType::StateChanged]) .find(|message: &gstreamer::Message| { let view = message.view(); if let gstreamer::MessageView::StateChanged(changed) = view { changed.current() == state && changed.src().is_some_and(|s| s == &self.inner) } else { false } }) .await; Ok(()) } } pub async fn wait_for_states(&self, states: impl AsRef<[gstreamer::State]>) -> Result<()> { let current_state = self.state(core::time::Duration::ZERO)?; let states = states.as_ref(); if states.contains(¤t_state) { Ok(()) } else { use futures_lite::stream::StreamExt as _; self.bus()? .filtered_stream(&[MessageType::StateChanged]) .find(|message: &gstreamer::Message| { let view = message.view(); if let gstreamer::MessageView::StateChanged(changed) = view { states.contains(&changed.current()) && changed.src().is_some_and(|s| s == &self.inner) } else { false } }) .await; Ok(()) } } pub async fn wait_for_message<'a, F2>( &self, filter: Option<&'a [gstreamer::MessageType]>, filter_fn: F2, ) -> Result where F2: Fn(&gstreamer::Message) -> bool + Send + 'a, { use futures_lite::stream::StreamExt as _; match filter { Some(filter) => { let message = self.bus()?.filtered_stream(filter).find(filter_fn).await; match message { Some(msg) => Ok(msg), None => { Err(Error).attach("Failed to find message matching the provided filter") } } } None => { let message = self.bus()?.stream().find(filter_fn).await; match message { Some(msg) => Ok(msg), None => { Err(Error).attach("Failed to find message matching the provided filter") } } } } } } pub trait PipelineExt: ChildOf + Sync { // #[track_caller] // fn bus(&self) -> Result { // self.upcast_ref().bus() // } #[track_caller] fn play(&self) -> Result<()> { self.upcast_ref().play() } #[track_caller] fn pause(&self) -> Result<()> { self.upcast_ref().pause() } #[track_caller] fn ready(&self) -> Result<()> { self.upcast_ref().ready() } #[track_caller] fn stop(&self) -> Result<()> { self.upcast_ref().stop() } #[track_caller] fn set_state(&self, state: gstreamer::State) -> Result { self.upcast_ref().set_state(state) } #[track_caller] fn state(&self, timeout: impl Into>) -> Result { self.upcast_ref().state(timeout) } fn wait_for( &self, state: gstreamer::State, ) -> impl std::future::Future> + Send { self.upcast_ref().wait_for(state) } fn wait_for_states( &self, states: impl AsRef<[gstreamer::State]> + Send, ) -> impl std::future::Future> + Send { self.upcast_ref().wait_for_states(states) } fn wait_for_message<'a, F2>( &self, filter: Option<&'a [gstreamer::MessageType]>, filter_fn: F2, ) -> impl std::future::Future> + Send where F2: Fn(&gstreamer::Message) -> bool + Send + 'a, { self.upcast_ref().wait_for_message(filter, filter_fn) } } impl + Sync> PipelineExt for T {}