feat: Restructure the gst parent<->child relations

This commit is contained in:
uttarayan21
2025-12-23 01:09:01 +05:30
parent 043d1e99f0
commit 8d46bd2b85
11 changed files with 274 additions and 343 deletions

View File

@@ -1,48 +1,28 @@
use crate::*;
use crate::priv_prelude::*;
#[repr(transparent)]
pub struct Bin {
inner: gstreamer::Bin,
}
wrap_gst!(Bin);
parent_child!(Element, Bin);
impl From<gstreamer::Bin> for Bin {
fn from(inner: gstreamer::Bin) -> Self {
Bin { inner }
}
}
impl IsElement for Bin {
fn as_element(&self) -> &Element {
let element = self.inner.upcast_ref::<gstreamer::Element>();
unsafe { core::mem::transmute(element) }
}
fn into_element(self) -> Element {
Element {
inner: self.inner.into(),
}
}
}
impl Bin {
pub fn new(name: impl AsRef<str>) -> Self {
let bin = gstreamer::Bin::with_name(name.as_ref());
Bin { inner: bin }
}
pub fn add(&mut self, element: &impl IsElement) -> Result<&mut Self> {
pub fn add(&mut self, element: &impl ChildOf<Element>) -> Result<&mut Self> {
self.inner
.add(&element.as_element().inner)
.add(&element.upcast_ref().inner)
.change_context(Error)
.attach("Failed to add element to bin")?;
Ok(self)
}
pub fn add_many<'a, E: IsElement + 'a>(
pub fn add_many<'a, E: ChildOf<Element> + 'a>(
&mut self,
elements: impl IntoIterator<Item = &'a E>,
) -> Result<&mut Self> {
self.inner
.add_many(elements.into_iter().map(|e| &e.as_element().inner))
.add_many(elements.into_iter().map(|e| &e.upcast_ref().inner))
.change_context(Error)
.attach("Failed to add elements to bin")?;
Ok(self)

View File

@@ -1,43 +1,45 @@
use crate::{Bin, Error, Pad, Result, ResultExt};
#[repr(transparent)]
pub struct Element {
pub(crate) inner: gstreamer::Element,
}
use crate::priv_prelude::*;
use crate::wrap_gst;
pub trait IsElement {
fn as_element(&self) -> &Element;
fn into_element(self) -> Element;
fn pad(&self, name: &str) -> Option<Pad> {
wrap_gst!(Element, gstreamer::Element);
// pub trait IsElement {
// fn upcast_ref(&self) -> &Element;
// fn into_element(self) -> Element;
// fn pad(&self, name: &str) -> Option<Pad> {
// use gstreamer::prelude::*;
// self.upcast_ref().inner.static_pad(name).map(Pad::from)
// }
// }
// impl IsElement for Element {
// fn upcast_ref(&self) -> &Element {
// self
// }
//
// fn into_element(self) -> Element {
// self
// }
// }
impl Element {
pub fn pad(&self, name: impl AsRef<str>) -> Option<Pad> {
use gstreamer::prelude::*;
self.as_element().inner.static_pad(name).map(Pad::from)
self.inner.static_pad(name.as_ref()).map(Pad::from)
}
}
impl IsElement for Element {
fn as_element(&self) -> &Element {
self
}
fn into_element(self) -> Element {
self
}
}
pub trait Sink: IsElement {
pub trait Sink: ChildOf<Element> {
fn sink(&self, name: impl AsRef<str>) -> Pad {
use gstreamer::prelude::*;
self.as_element()
.pad(name.as_ref())
.map(From::from)
self.upcast_ref()
.pad(name)
.expect("Sink element has no sink pad")
}
}
pub trait Source: IsElement {
pub trait Source: ChildOf<Element> {
fn source(&self, name: impl AsRef<str>) -> Pad {
use gstreamer::prelude::*;
self.as_element()
.pad(name.as_ref())
.map(From::from)
self.upcast_ref()
.pad(name)
.expect("Source element has no src pad")
}
@@ -46,13 +48,13 @@ pub trait Source: IsElement {
Self: Sized,
{
use gstreamer::prelude::*;
if let Ok(bin) = self.as_element().inner.clone().downcast::<gstreamer::Bin>() {
bin.add(&sink.as_element().inner)
if let Ok(bin) = self.upcast_ref().inner.clone().downcast::<gstreamer::Bin>() {
bin.add(&sink.upcast_ref().inner)
.change_context(Error)
.attach("Failed to add sink to bin")?;
self.as_element()
self.upcast_ref()
.inner
.link(&sink.as_element().inner)
.link(&sink.upcast_ref().inner)
.change_context(Error)
.attach("Failed to link elements")?;
Ok(Bin::from(bin))
@@ -60,22 +62,22 @@ pub trait Source: IsElement {
let bin = gstreamer::Bin::builder()
.name(format!(
"{}-link-{}",
self.as_element().inner.name(),
sink.as_element().inner.name()
self.upcast_ref().inner.name(),
sink.upcast_ref().inner.name()
))
.build();
bin.add(&self.as_element().inner)
bin.add(&self.upcast_ref().inner)
.change_context(Error)
.attach("Failed to add source to bin")?;
bin.add(&sink.as_element().inner)
bin.add(&sink.upcast_ref().inner)
.change_context(Error)
.attach("Failed to add sink to bin")?;
self.as_element()
self.upcast_ref()
.inner
.link(&sink.as_element().inner)
.link(&sink.upcast_ref().inner)
.change_context(Error)
.attach("Failed to link elements")?;
if let Some(sink_pad) = self.as_element().pad("sink") {
if let Some(sink_pad) = self.upcast_ref().pad("sink") {
let ghost_pad = Pad::ghost(&sink_pad)?;
bin.add_pad(&ghost_pad.inner)
.change_context(Error)
@@ -89,12 +91,12 @@ pub trait Source: IsElement {
// fn link_pad<S: Sink>(&self, sink: &S, src_pad_name: &str, sink_pad_name: &str) -> Result<()> {
// use gstreamer::prelude::*;
// let src_pad = self
// .as_element()
// .upcast_ref()
// .pad(src_pad_name)
// .ok_or(Error)
// .attach("Source pad not found")?;
// let sink_pad = sink
// .as_element()
// .upcast_ref()
// .pad(sink_pad_name)
// .ok_or(Error)
// .attach("Sink pad not found")?;

View File

@@ -1,19 +0,0 @@
// use crate::errors::*;
// /// This trait is used for implementing parent traits methods on children.
// pub trait Upcastable<T> {
// #[track_caller]
// fn upcast(self) -> T;
// fn upcast_ref(&self) -> &T;
// }
//
// // impl Upcastable<GenericPipeline> for crate::playback::Playbin3 {}
// impl<P, C> core::ops::Deref for C
// where
// C: Upcastable<P>,
// {
// type Target = P;
//
// fn deref(&self) -> &Self::Target {
// todo!()
// }
// }

View File

@@ -3,10 +3,11 @@ pub mod bus;
pub mod caps;
pub mod element;
pub mod errors;
pub mod isa;
pub mod pad;
pub mod pipeline;
pub mod plugins;
#[macro_use]
pub mod wrapper;
pub use bin::*;
pub use bus::*;
@@ -18,6 +19,7 @@ pub use plugins::*;
pub(crate) mod priv_prelude {
pub use crate::errors::*;
pub use crate::wrapper::*;
pub use crate::*;
pub use gstreamer::prelude::*;
#[track_caller]

View File

@@ -1,13 +1,35 @@
use crate::*;
/// Pads are link points between elements
use crate::priv_prelude::*;
#[derive(Debug)]
#[repr(transparent)]
pub struct Pad {
pub(crate) inner: gstreamer::Pad,
}
impl From<gstreamer::Pad> for Pad {
fn from(inner: gstreamer::Pad) -> Self {
Pad { inner }
Self { inner }
}
}
impl From<Pad> for gstreamer::Pad {
fn from(wrapper: Pad) -> Self {
wrapper.inner
}
}
impl Pad {
pub fn into_inner(self) -> gstreamer::Pad {
self.inner
}
}
impl GstWrapper for Pad {
type GstType = gstreamer::Pad;
fn from_gst(gst: Self::GstType) -> Self {
Self { inner: gst }
}
fn into_gst(self) -> Self::GstType {
self.inner
}
fn as_gst_ref(&self) -> &Self::GstType {
&self.inner
}
}
@@ -30,6 +52,15 @@ impl Pad {
Ok(())
}
pub fn current_caps(&self) -> Result<Caps> {
let caps = self
.inner
.current_caps()
.ok_or(Error)
.attach("Failed to get pad caps")?;
Ok(Caps { inner: caps })
}
pub fn activate(&self, activate: bool) -> Result<()> {
use gstreamer::prelude::*;
self.inner

View File

@@ -1,19 +1,9 @@
use crate::{playback::Playbin3, priv_prelude::*};
use gstreamer::State;
#[repr(transparent)]
pub struct Pipeline {
inner: gstreamer::Pipeline,
}
impl core::fmt::Debug for Pipeline {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Pipeline")
.field("pipeline", &self.inner)
// .field("state", &self.pipeline.state(gstreamer::ClockTime::NONE))
.finish()
}
}
wrap_gst!(Pipeline);
parent_child!(Element, Pipeline);
parent_child!(Bin, Pipeline);
impl Drop for Pipeline {
fn drop(&mut self) {
@@ -41,55 +31,56 @@ impl Pipeline {
Ok(current)
}
pub fn wait_non_null_sync(&self) -> Result<()> {
if self
.state(core::time::Duration::ZERO)
.change_context(Error)
.attach("Failed to get video context")?
!= gstreamer::State::Null
{
Ok(())
} else {
let bus = self.bus()?;
for message in bus.iter_timed(core::time::Duration::from_secs(1)) {
let view = message.view();
dbg!(&view);
if let gstreamer::MessageView::StateChanged(change) = view
&& change.current() != State::Null
{
break;
}
}
Ok(())
}
}
// pub fn wait_non_null_sync(&self) -> Result<()> {
// if dbg!(
// self.state(core::time::Duration::ZERO)
// .change_context(Error)
// .attach("Failed to get video context")
// )? != gstreamer::State::Null
// {
// Ok(())
// } else {
// let bus = self.bus()?;
// for message in bus.iter_timed(None) {
// let view = message.view();
// dbg!(&view);
// panic!();
// if let gstreamer::MessageView::StateChanged(change) = view
// && change.current() != State::Null
// {
// break;
// }
// }
// Ok(())
// }
// }
/// Waits for the pipeline to be ready
pub async fn wait_non_null(&self) -> Result<()> {
if self
.state(None)
.change_context(Error)
.attach("Failed to get video context")?
!= gstreamer::State::Null
{
Ok(())
} else {
use futures::StreamExt;
self.bus()?
.stream()
.filter(|message: &gstreamer::Message| {
let view = message.view();
if let gstreamer::MessageView::StateChanged(change) = view {
core::future::ready(change.current() != gstreamer::State::Null)
} else {
core::future::ready(false)
}
})
.next()
.await;
Ok(())
}
}
// Waits for the pipeline to be ready
// pub async fn wait_non_null(&self) -> Result<()> {
// if self
// .state(None)
// .change_context(Error)
// .attach("Failed to get video context")?
// != gstreamer::State::Null
// {
// Ok(())
// } else {
// use futures::StreamExt;
// self.bus()?
// .stream()
// .filter(|message: &gstreamer::Message| {
// let view = message.view();
// if let gstreamer::MessageView::StateChanged(change) = view {
// core::future::ready(change.current() != gstreamer::State::Null)
// } else {
// core::future::ready(false)
// }
// })
// .next()
// .await;
// Ok(())
// }
// }
pub fn play(&self) -> Result<()> {
self.inner
@@ -111,14 +102,11 @@ impl Pipeline {
self.inner
.set_state(gstreamer::State::Ready)
.change_context(Error)
.attach("Failed to set pipeline to Paused state")?;
.attach("Failed to set pipeline to Ready state")?;
Ok(())
}
pub unsafe fn set_state(
&self,
state: gstreamer::State,
) -> Result<gstreamer::StateChangeSuccess> {
pub fn set_state(&self, state: gstreamer::State) -> Result<gstreamer::StateChangeSuccess> {
let result = self
.inner
.set_state(state)
@@ -127,15 +115,3 @@ impl Pipeline {
Ok(result)
}
}
impl core::ops::Deref for Playbin3 {
type Target = Pipeline;
fn deref(&self) -> &Self::Target {
let gp = self
.inner
.downcast_ref::<gstreamer::Pipeline>()
.expect("BUG: Playbin3 must be a pipeline");
unsafe { &*(gp as *const _ as *const Pipeline) }
}
}

View File

@@ -1,19 +1,8 @@
use crate::priv_prelude::*;
#[derive(Debug, Clone)]
pub struct AppSink {
inner: gstreamer::Element,
}
impl IsElement for AppSink {
fn as_element(&self) -> &Element {
unsafe { core::mem::transmute(&self.inner) }
}
fn into_element(self) -> Element {
Element { inner: self.inner }
}
}
wrap_gst!(AppSink, gstreamer::Element);
parent_child!(Pipeline, AppSink, downcast); // since AppSink is an Element internaly
parent_child!(Element, AppSink);
impl Sink for AppSink {}
@@ -136,28 +125,8 @@ fn test_appsink() {
.expect("Link videoconvert to appsink");
let playbin3 = playbin3.with_video_sink(&video_sink);
playbin3.play().expect("Play video");
let bus = playbin3.bus().unwrap();
// playbin3.play().expect("Play playbin3");
std::thread::spawn({
let playbin3 = playbin3.clone();
move || {
loop {
std::thread::sleep(std::time::Duration::from_secs(5));
playbin3.play();
playbin3.wait_non_null_sync();
let sample = appsink
.try_pull_sample(core::time::Duration::from_secs(5))
.expect("Pull sample from appsink")
.expect("No sample received from appsink");
dbg!(sample.caps());
playbin3.play();
tracing::info!("Played");
}
}
});
for msg in bus.iter_timed(None) {
match msg.view() {
gstreamer::MessageView::Eos(..) => {

View File

@@ -1,19 +1,8 @@
use crate::*;
use crate::priv_prelude::*;
#[repr(transparent)]
pub struct AutoVideoSink {
inner: gstreamer::Element,
}
impl IsElement for AutoVideoSink {
fn as_element(&self) -> &Element {
unsafe { core::mem::transmute(&self.inner) }
}
fn into_element(self) -> Element {
Element { inner: self.inner }
}
}
wrap_gst!(AutoVideoSink, gstreamer::Element);
parent_child!(Element, AutoVideoSink);
parent_child!(Bin, AutoVideoSink, downcast);
impl Sink for AutoVideoSink {}

View File

@@ -1,19 +1,13 @@
use crate::priv_prelude::*;
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Playbin3 {
pub(crate) inner: gstreamer::Element,
}
wrap_gst!(Playbin3, gstreamer::Element);
parent_child!(Element, Playbin3);
parent_child!(Pipeline, Playbin3, downcast);
parent_child!(Bin, Playbin3, downcast);
impl Drop for Playbin3 {
fn drop(&mut self) {
let _ = self
.inner
.set_state(gstreamer::State::Null)
.inspect_err(|e| {
tracing::error!("Failed to set playbin3 to Null state on drop: {:?}", e)
});
self.set_state(gstreamer::State::Null).ok();
}
}
@@ -33,24 +27,24 @@ impl Playbin3 {
self
}
pub fn with_video_sink(self, video_sink: &impl IsElement) -> Self {
pub fn with_video_sink(self, video_sink: &impl ChildOf<Element>) -> Self {
use gstreamer::prelude::*;
self.inner
.set_property("video-sink", &video_sink.as_element().inner);
.set_property("video-sink", &video_sink.upcast_ref().inner);
self
}
pub fn with_text_sink(self, text_sink: &impl IsElement) -> Self {
pub fn with_text_sink(self, text_sink: &impl ChildOf<Element>) -> Self {
use gstreamer::prelude::*;
self.inner
.set_property("text-sink", &text_sink.as_element().inner);
.set_property("text-sink", &text_sink.upcast_ref().inner);
self
}
pub fn with_audio_sink(self, audio_sink: &impl IsElement) -> Self {
pub fn with_audio_sink(self, audio_sink: &impl ChildOf<Element>) -> Self {
use gstreamer::prelude::*;
self.inner
.set_property("audio-sink", &audio_sink.as_element().inner);
.set_property("audio-sink", &audio_sink.upcast_ref().inner);
self
}
@@ -63,93 +57,16 @@ impl Playbin3 {
use gstreamer::prelude::*;
self.inner.property::<f64>("volume")
}
// pub fn play(&self) -> Result<()> {
// use gstreamer::prelude::*;
// self.inner
// .set_state(gstreamer::State::Playing)
// .change_context(Error)
// .attach("Failed to set playbin3 to Playing state")?;
// Ok(())
// }
//
// pub fn pause(&self) -> Result<()> {
// use gstreamer::prelude::*;
// self.inner
// .set_state(gstreamer::State::Paused)
// .change_context(Error)
// .attach("Failed to set playbin3 to Paused state")?;
// Ok(())
// }
//
// pub fn bus(&self) -> Result<Bus> {
// let bus = self
// .inner
// .bus()
// .ok_or(Error)
// .attach("Failed to get bus from playbin3")?;
// Ok(Bus { bus })
// }
}
// #[test]
// fn test_playbin3() {
// use gstreamer::prelude::*;
// use tracing_subscriber::prelude::*;
// tracing_subscriber::registry()
// .with(
// tracing_subscriber::fmt::layer()
// .with_thread_ids(true)
// .with_file(true),
// )
// .init();
// tracing::info!("Linking videoconvert to appsink");
// Gst::new();
// let playbin3 = Playbin3::new("pppppppppppppppppppppppppppppp").unwrap().with_uri("https://jellyfin.tsuba.darksailor.dev/Items/6010382cf25273e624d305907010d773/Download?api_key=036c140222464878862231ef66a2bc9c");
//
// let video_convert = plugins::videoconvertscale::VideoConvert::new("vcvcvcvcvcvcvcvcvcvcvcvcvc")
// .expect("Create videoconvert");
// let appsink =
// autodetect::AutoVideoSink::new("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").expect("Create appsink");
//
// let mut video_sink = video_convert
// .link(&appsink)
// .expect("Link videoconvert to appsink");
//
// let playbin3 = playbin3.with_video_sink(&video_sink);
// let bus = playbin3.bus().unwrap();
// playbin3.play().expect("Play playbin3");
// std::thread::spawn(move || loop {
// std::thread::sleep(std::time::Duration::from_secs(15));
// playbin3.play();
// tracing::info!("Played");
// });
// for msg in bus.iter_timed(None) {
// match msg.view() {
// gstreamer::MessageView::Eos(..) => {
// tracing::info!("End of stream reached");
// break;
// }
// gstreamer::MessageView::Error(err) => {
// tracing::error!(
// "Error from {:?}: {} ({:?})",
// err.src().map(|s| s.path_string()),
// err.error(),
// err.debug()
// );
// break;
// }
// gstreamer::MessageView::StateChanged(state) => {
// eprintln!(
// "State changed from {:?} to \x1b[33m{:?}\x1b[0m for {:?}",
// state.old(),
// state.current(),
// state.src().map(|s| s.path_string())
// );
// }
// _ => {}
// }
// // tracing::info!("{:#?}", &msg.view());
// }
// // std::thread::sleep(std::time::Duration::from_secs(5));
// }
impl core::ops::Deref for Playbin3 {
type Target = Pipeline;
fn deref(&self) -> &Self::Target {
let gp = self
.inner
.downcast_ref::<gstreamer::Pipeline>()
.expect("BUG: Playbin3 must be a pipeline");
unsafe { &*(gp as *const _ as *const Pipeline) }
}
}

View File

@@ -1,22 +1,9 @@
use crate::*;
use crate::priv_prelude::*;
#[doc(inline)]
pub use gstreamer_video::VideoFormat;
#[repr(transparent)]
#[derive(Debug, Clone)]
pub struct VideoConvert {
inner: gstreamer::Element,
}
impl IsElement for VideoConvert {
fn as_element(&self) -> &Element {
unsafe { core::mem::transmute(&self.inner) }
}
fn into_element(self) -> Element {
Element { inner: self.inner }
}
}
wrap_gst!(VideoConvert, gstreamer::Element);
parent_child!(Element, VideoConvert);
impl Sink for VideoConvert {}
impl Source for VideoConvert {}

97
gst/src/wrapper.rs Normal file
View File

@@ -0,0 +1,97 @@
pub trait GstWrapper {
type GstType: glib::prelude::ObjectType;
fn from_gst(gst: Self::GstType) -> Self;
fn into_gst(self) -> Self::GstType;
fn as_gst_ref(&self) -> &Self::GstType;
}
#[macro_export]
macro_rules! wrap_gst {
($name:ident) => {
wrap_gst!($name, gstreamer::$name);
};
($name:ident, $inner:ty) => {
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct $name {
pub(crate) inner: $inner,
}
impl From<$inner> for $name {
fn from(inner: $inner) -> Self {
Self { inner }
}
}
impl From<$name> for $inner {
fn from(wrapper: $name) -> Self {
wrapper.into_inner()
}
}
impl $name {
pub fn into_inner(self) -> $inner {
self.inner.clone()
}
}
impl $crate::wrapper::GstWrapper for $name {
type GstType = $inner;
fn from_gst(gst: Self::GstType) -> Self {
Self { inner: gst }
}
fn into_gst(self) -> Self::GstType {
self.inner.clone()
}
fn as_gst_ref(&self) -> &Self::GstType {
&self.inner
}
}
impl ChildOf<$name> for $name {
fn upcast_ref(&self) -> &$name {
self
}
}
};
}
/// A trait for types that can be upcasted to type T.
pub trait ChildOf<T> {
fn upcast_ref(&self) -> &T;
}
#[macro_export]
macro_rules! parent_child {
($parent:ty, $child:ty) => {
impl ChildOf<$parent> for $child
where
$child: GstWrapper,
$parent: GstWrapper,
{
fn upcast_ref(&self) -> &$parent {
let upcasted = self.inner.upcast_ref::<<$parent as GstWrapper>::GstType>();
unsafe { &*(upcasted as *const <$parent as GstWrapper>::GstType as *const $parent) }
}
}
};
($parent:ty, $child:ty, downcast) => {
impl ChildOf<$parent> for $child
where
$child: GstWrapper,
$parent: GstWrapper,
{
fn upcast_ref(&self) -> &$parent {
let downcasted = self
.inner
.downcast_ref::<<$parent as GstWrapper>::GstType>()
.expect("BUG: Failed to downcast GStreamer type from child to parent");
unsafe {
&*(downcasted as *const <$parent as GstWrapper>::GstType as *const $parent)
}
}
}
};
}