212 lines
6.4 KiB
Rust
212 lines
6.4 KiB
Rust
use crate::priv_prelude::*;
|
|
|
|
// Presumably generates the `Pipeline` wrapper type; the methods in this file reach the
// wrapped gstreamer object through `self.inner` — confirm against the `wrap_gst!` definition.
wrap_gst!(Pipeline);
|
|
// Presumably registers `Pipeline` as a child of `Element` for the crate's upcasting
// machinery — confirm against the `parent_child!` definition.
parent_child!(Element, Pipeline);
|
|
// Presumably registers `Pipeline` as a child of `Bin` for the crate's upcasting
// machinery — confirm against the `parent_child!` definition.
parent_child!(Bin, Pipeline);
|
|
|
|
impl Drop for Pipeline {
|
|
fn drop(&mut self) {
|
|
let _ = self.inner.set_state(gstreamer::State::Null);
|
|
}
|
|
}
|
|
|
|
impl Pipeline {
|
|
#[track_caller]
|
|
pub fn bus(&self) -> Result<Bus> {
|
|
let bus = self
|
|
.inner
|
|
.bus()
|
|
.ok_or(Error)
|
|
.attach("Failed to get bus from pipeline")?;
|
|
Ok(Bus::from_gst(bus))
|
|
}
|
|
|
|
/// Get the state
|
|
pub fn state(
|
|
&self,
|
|
timeout: impl Into<Option<core::time::Duration>>,
|
|
) -> Result<gstreamer::State> {
|
|
let (result, current, _pending) = self.inner.state(duration_to_clocktime(timeout)?);
|
|
result.change_context(Error).attach("Failed to get state")?;
|
|
Ok(current)
|
|
}
|
|
|
|
pub fn play(&self) -> Result<()> {
|
|
self.inner
|
|
.set_state(gstreamer::State::Playing)
|
|
.change_context(Error)
|
|
.attach("Failed to set pipeline to Playing state")?;
|
|
Ok(())
|
|
}
|
|
|
|
pub fn pause(&self) -> Result<()> {
|
|
self.inner
|
|
.set_state(gstreamer::State::Paused)
|
|
.change_context(Error)
|
|
.attach("Failed to set pipeline to Paused state")?;
|
|
Ok(())
|
|
}
|
|
|
|
pub fn ready(&self) -> Result<()> {
|
|
self.inner
|
|
.set_state(gstreamer::State::Ready)
|
|
.change_context(Error)
|
|
.attach("Failed to set pipeline to Ready state")?;
|
|
Ok(())
|
|
}
|
|
|
|
pub fn stop(&self) -> Result<()> {
|
|
self.inner
|
|
.set_state(gstreamer::State::Null)
|
|
.change_context(Error)
|
|
.attach("Failed to set pipeline to Null state")?;
|
|
Ok(())
|
|
}
|
|
|
|
pub fn set_state(&self, state: gstreamer::State) -> Result<gstreamer::StateChangeSuccess> {
|
|
let result = self
|
|
.inner
|
|
.set_state(state)
|
|
.change_context(Error)
|
|
.attach("Failed to set pipeline state")?;
|
|
Ok(result)
|
|
}
|
|
|
|
pub async fn wait_for(&self, state: gstreamer::State) -> Result<()> {
|
|
let current_state = self.state(core::time::Duration::ZERO)?;
|
|
if current_state == state {
|
|
Ok(())
|
|
} else {
|
|
// use futures::stream::StreamExt;
|
|
use futures_lite::stream::StreamExt as _;
|
|
self.bus()?
|
|
.filtered_stream(&[MessageType::StateChanged])
|
|
.find(|message: &gstreamer::Message| {
|
|
let view = message.view();
|
|
if let gstreamer::MessageView::StateChanged(changed) = view {
|
|
changed.current() == state
|
|
&& changed.src().is_some_and(|s| s == &self.inner)
|
|
} else {
|
|
false
|
|
}
|
|
})
|
|
.await;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub async fn wait_for_states(&self, states: impl AsRef<[gstreamer::State]>) -> Result<()> {
|
|
let current_state = self.state(core::time::Duration::ZERO)?;
|
|
let states = states.as_ref();
|
|
if states.contains(¤t_state) {
|
|
Ok(())
|
|
} else {
|
|
use futures_lite::stream::StreamExt as _;
|
|
self.bus()?
|
|
.filtered_stream(&[MessageType::StateChanged])
|
|
.find(|message: &gstreamer::Message| {
|
|
let view = message.view();
|
|
if let gstreamer::MessageView::StateChanged(changed) = view {
|
|
states.contains(&changed.current())
|
|
&& changed.src().is_some_and(|s| s == &self.inner)
|
|
} else {
|
|
false
|
|
}
|
|
})
|
|
.await;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub async fn wait_for_message<'a, F2>(
|
|
&self,
|
|
filter: Option<&'a [gstreamer::MessageType]>,
|
|
filter_fn: F2,
|
|
) -> Result<gstreamer::Message>
|
|
where
|
|
F2: Fn(&gstreamer::Message) -> bool + Send + 'a,
|
|
{
|
|
use futures_lite::stream::StreamExt as _;
|
|
match filter {
|
|
Some(filter) => {
|
|
let message = self.bus()?.filtered_stream(filter).find(filter_fn).await;
|
|
match message {
|
|
Some(msg) => Ok(msg),
|
|
None => {
|
|
Err(Error).attach("Failed to find message matching the provided filter")
|
|
}
|
|
}
|
|
}
|
|
None => {
|
|
let message = self.bus()?.stream().find(filter_fn).await;
|
|
match message {
|
|
Some(msg) => Ok(msg),
|
|
None => {
|
|
Err(Error).attach("Failed to find message matching the provided filter")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub trait PipelineExt: ChildOf<Pipeline> + Sync {
|
|
// #[track_caller]
|
|
// fn bus(&self) -> Result<Bus> {
|
|
// self.upcast_ref().bus()
|
|
// }
|
|
#[track_caller]
|
|
fn play(&self) -> Result<()> {
|
|
self.upcast_ref().play()
|
|
}
|
|
#[track_caller]
|
|
fn pause(&self) -> Result<()> {
|
|
self.upcast_ref().pause()
|
|
}
|
|
#[track_caller]
|
|
fn ready(&self) -> Result<()> {
|
|
self.upcast_ref().ready()
|
|
}
|
|
|
|
#[track_caller]
|
|
fn stop(&self) -> Result<()> {
|
|
self.upcast_ref().stop()
|
|
}
|
|
|
|
#[track_caller]
|
|
fn set_state(&self, state: gstreamer::State) -> Result<gstreamer::StateChangeSuccess> {
|
|
self.upcast_ref().set_state(state)
|
|
}
|
|
#[track_caller]
|
|
fn state(&self, timeout: impl Into<Option<core::time::Duration>>) -> Result<gstreamer::State> {
|
|
self.upcast_ref().state(timeout)
|
|
}
|
|
|
|
fn wait_for(
|
|
&self,
|
|
state: gstreamer::State,
|
|
) -> impl std::future::Future<Output = Result<()>> + Send {
|
|
self.upcast_ref().wait_for(state)
|
|
}
|
|
|
|
fn wait_for_states(
|
|
&self,
|
|
states: impl AsRef<[gstreamer::State]> + Send,
|
|
) -> impl std::future::Future<Output = Result<()>> + Send {
|
|
self.upcast_ref().wait_for_states(states)
|
|
}
|
|
|
|
fn wait_for_message<'a, F2>(
|
|
&self,
|
|
filter: Option<&'a [gstreamer::MessageType]>,
|
|
filter_fn: F2,
|
|
) -> impl std::future::Future<Output = Result<gstreamer::Message>> + Send
|
|
where
|
|
F2: Fn(&gstreamer::Message) -> bool + Send + 'a,
|
|
{
|
|
self.upcast_ref().wait_for_message(filter, filter_fn)
|
|
}
|
|
}
|
|
|
|
/// Blanket implementation: every `Sync` type that is a `ChildOf<Pipeline>` gets the
/// provided `PipelineExt` methods.
impl<T> PipelineExt for T where T: ChildOf<Pipeline> + Sync {}
|