feat: Added PipelineExt trait for all Children of Pipelines
Some checks failed
build / checks-matrix (push) Has been cancelled
build / codecov (push) Has been cancelled
docs / docs (push) Has been cancelled
build / checks-build (push) Has been cancelled

This commit is contained in:
uttarayan21
2025-12-23 01:33:54 +05:30
parent 8d46bd2b85
commit 3382aebb1f
6 changed files with 93 additions and 136 deletions

View File

@@ -18,7 +18,7 @@ impl Pipeline {
.bus()
.ok_or(Error)
.attach("Failed to get bus from pipeline")?;
Ok(Bus { bus })
Ok(Bus::from_gst(bus))
}
/// Get the state
@@ -31,57 +31,6 @@ impl Pipeline {
Ok(current)
}
// pub fn wait_non_null_sync(&self) -> Result<()> {
// if dbg!(
// self.state(core::time::Duration::ZERO)
// .change_context(Error)
// .attach("Failed to get video context")
// )? != gstreamer::State::Null
// {
// Ok(())
// } else {
// let bus = self.bus()?;
// for message in bus.iter_timed(None) {
// let view = message.view();
// dbg!(&view);
// panic!();
// if let gstreamer::MessageView::StateChanged(change) = view
// && change.current() != State::Null
// {
// break;
// }
// }
// Ok(())
// }
// }
// Waits for the pipeline to be ready
// pub async fn wait_non_null(&self) -> Result<()> {
// if self
// .state(None)
// .change_context(Error)
// .attach("Failed to get video context")?
// != gstreamer::State::Null
// {
// Ok(())
// } else {
// use futures::StreamExt;
// self.bus()?
// .stream()
// .filter(|message: &gstreamer::Message| {
// let view = message.view();
// if let gstreamer::MessageView::StateChanged(change) = view {
// core::future::ready(change.current() != gstreamer::State::Null)
// } else {
// core::future::ready(false)
// }
// })
// .next()
// .await;
// Ok(())
// }
// }
pub fn play(&self) -> Result<()> {
self.inner
.set_state(gstreamer::State::Playing)
@@ -115,3 +64,36 @@ impl Pipeline {
Ok(result)
}
}
pub trait PipelineExt {
fn bus(&self) -> Result<Bus>;
fn play(&self) -> Result<()>;
fn pause(&self) -> Result<()>;
fn ready(&self) -> Result<()>;
fn set_state(&self, state: gstreamer::State) -> Result<gstreamer::StateChangeSuccess>;
fn state(&self, timeout: impl Into<Option<core::time::Duration>>) -> Result<gstreamer::State>;
}
impl<T> PipelineExt for T
where
T: ChildOf<Pipeline>,
{
fn bus(&self) -> Result<Bus> {
self.upcast_ref().bus()
}
fn play(&self) -> Result<()> {
self.upcast_ref().play()
}
fn pause(&self) -> Result<()> {
self.upcast_ref().pause()
}
fn ready(&self) -> Result<()> {
self.upcast_ref().ready()
}
fn set_state(&self, state: gstreamer::State) -> Result<gstreamer::StateChangeSuccess> {
self.upcast_ref().set_state(state)
}
fn state(&self, timeout: impl Into<Option<core::time::Duration>>) -> Result<gstreamer::State> {
self.upcast_ref().state(timeout)
}
}