feat: Modify gst crate to add lot of more granularity

This commit is contained in:
uttarayan21
2025-12-22 13:27:30 +05:30
parent d42ef3b550
commit 043d1e99f0
23 changed files with 947 additions and 392 deletions

141
gst/src/pipeline.rs Normal file
View File

@@ -0,0 +1,141 @@
use crate::{playback::Playbin3, priv_prelude::*};
use gstreamer::State;
#[repr(transparent)]
pub struct Pipeline {
inner: gstreamer::Pipeline,
}
impl core::fmt::Debug for Pipeline {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Pipeline")
.field("pipeline", &self.inner)
// .field("state", &self.pipeline.state(gstreamer::ClockTime::NONE))
.finish()
}
}
impl Drop for Pipeline {
fn drop(&mut self) {
let _ = self.inner.set_state(gstreamer::State::Null);
}
}
impl Pipeline {
pub fn bus(&self) -> Result<Bus> {
let bus = self
.inner
.bus()
.ok_or(Error)
.attach("Failed to get bus from pipeline")?;
Ok(Bus { bus })
}
/// Get the state
pub fn state(
&self,
timeout: impl Into<Option<core::time::Duration>>,
) -> Result<gstreamer::State> {
let (result, current, pending) = self.inner.state(duration_to_clocktime(timeout)?);
result.change_context(Error).attach("Failed to get state")?;
Ok(current)
}
pub fn wait_non_null_sync(&self) -> Result<()> {
if self
.state(core::time::Duration::ZERO)
.change_context(Error)
.attach("Failed to get video context")?
!= gstreamer::State::Null
{
Ok(())
} else {
let bus = self.bus()?;
for message in bus.iter_timed(core::time::Duration::from_secs(1)) {
let view = message.view();
dbg!(&view);
if let gstreamer::MessageView::StateChanged(change) = view
&& change.current() != State::Null
{
break;
}
}
Ok(())
}
}
/// Waits for the pipeline to be ready
pub async fn wait_non_null(&self) -> Result<()> {
if self
.state(None)
.change_context(Error)
.attach("Failed to get video context")?
!= gstreamer::State::Null
{
Ok(())
} else {
use futures::StreamExt;
self.bus()?
.stream()
.filter(|message: &gstreamer::Message| {
let view = message.view();
if let gstreamer::MessageView::StateChanged(change) = view {
core::future::ready(change.current() != gstreamer::State::Null)
} else {
core::future::ready(false)
}
})
.next()
.await;
Ok(())
}
}
pub fn play(&self) -> Result<()> {
self.inner
.set_state(gstreamer::State::Playing)
.change_context(Error)
.attach("Failed to set pipeline to Playing state")?;
Ok(())
}
pub fn pause(&self) -> Result<()> {
self.inner
.set_state(gstreamer::State::Paused)
.change_context(Error)
.attach("Failed to set pipeline to Paused state")?;
Ok(())
}
pub fn ready(&self) -> Result<()> {
self.inner
.set_state(gstreamer::State::Ready)
.change_context(Error)
.attach("Failed to set pipeline to Paused state")?;
Ok(())
}
pub unsafe fn set_state(
&self,
state: gstreamer::State,
) -> Result<gstreamer::StateChangeSuccess> {
let result = self
.inner
.set_state(state)
.change_context(Error)
.attach("Failed to set pipeline state")?;
Ok(result)
}
}
impl core::ops::Deref for Playbin3 {
type Target = Pipeline;
fn deref(&self) -> &Self::Target {
let gp = self
.inner
.downcast_ref::<gstreamer::Pipeline>()
.expect("BUG: Playbin3 must be a pipeline");
unsafe { &*(gp as *const _ as *const Pipeline) }
}
}