feat: Get iced-video working

This commit is contained in:
uttarayan21
2025-12-25 02:14:56 +05:30
parent 3382aebb1f
commit ebe2312272
18 changed files with 714 additions and 189 deletions

View File

@@ -17,4 +17,11 @@ impl Bus {
pub fn stream(&self) -> gstreamer::bus::BusStream {
self.inner.stream()
}
pub fn filtered_stream<'a>(
&self,
msg_types: &'a [gstreamer::MessageType],
) -> impl futures::stream::FusedStream<Item = gstreamer::Message> + Unpin + Send + 'a {
self.inner.stream_filtered(msg_types)
}
}

View File

@@ -1,4 +1,6 @@
use crate::*;
use gstreamer::Fraction;
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Caps {
pub(crate) inner: gstreamer::caps::Caps,
@@ -16,7 +18,6 @@ pub struct CapsBuilder {
impl CapsBuilder {
pub fn field<V: Into<glib::Value> + Send>(mut self, name: impl AsRef<str>, value: V) -> Self {
use gstreamer::prelude::*;
self.inner = self.inner.field(name.as_ref(), value);
self
}
@@ -53,10 +54,25 @@ impl CapsBuilder {
}
impl Caps {
pub fn format(&self) -> Option<&str> {
use gstreamer::prelude::*;
pub fn format(&self) -> Option<gstreamer_video::VideoFormat> {
self.inner
.structure(0)
.and_then(|s| s.get::<&str>("format").ok())
.map(|s| gstreamer_video::VideoFormat::from_string(s))
}
pub fn width(&self) -> Option<i32> {
self.inner
.structure(0)
.and_then(|s| s.get::<i32>("width").ok())
}
pub fn height(&self) -> Option<i32> {
self.inner
.structure(0)
.and_then(|s| s.get::<i32>("height").ok())
}
pub fn framerate(&self) -> Option<gstreamer::Fraction> {
self.inner
.structure(0)
.and_then(|s| s.get::<Fraction>("framerate").ok())
}
}

View File

@@ -27,6 +27,15 @@ impl Element {
use gstreamer::prelude::*;
self.inner.static_pad(name.as_ref()).map(Pad::from)
}
pub fn bus(&self) -> Result<Bus> {
use gstreamer::prelude::*;
self.inner
.bus()
.map(Bus::from)
.ok_or(Error)
.attach_with(|| format!("Failed to get bus from Element: {}", self.inner.name()))
}
}
pub trait Sink: ChildOf<Element> {
@@ -108,3 +117,17 @@ pub trait Source: ChildOf<Element> {
// Ok(())
// }
}
pub trait ElementExt: ChildOf<Element> + Sync {
#[track_caller]
fn bus(&self) -> Result<Bus> {
self.upcast_ref().bus()
}
#[track_caller]
fn pad(&self, name: impl AsRef<str>) -> Option<Pad> {
self.upcast_ref().pad(name)
}
}
impl<T: ChildOf<Element> + Sync> ElementExt for T {}

View File

@@ -13,6 +13,9 @@ pub use bin::*;
pub use bus::*;
pub use caps::*;
pub use element::*;
pub use gstreamer;
#[doc(inline)]
pub use gstreamer::{Message, MessageType, MessageView, State};
pub use pad::*;
pub use pipeline::*;
pub use plugins::*;
@@ -21,6 +24,7 @@ pub(crate) mod priv_prelude {
pub use crate::errors::*;
pub use crate::wrapper::*;
pub use crate::*;
pub use gstreamer::prelude::ElementExt as _;
pub use gstreamer::prelude::*;
#[track_caller]
pub fn duration_to_clocktime(
@@ -33,15 +37,12 @@ pub(crate) mod priv_prelude {
.attach("Failed to convert duration to ClockTime")?;
Ok(Some(clocktime))
}
None => Ok(None),
None => Ok(gstreamer::ClockTime::NONE),
}
}
}
use errors::*;
use gstreamer::prelude::*;
use std::sync::Arc;
static GST: std::sync::LazyLock<std::sync::Arc<Gst>> = std::sync::LazyLock::new(|| {
gstreamer::init().expect("Failed to initialize GStreamer");
std::sync::Arc::new(Gst {
@@ -49,7 +50,6 @@ static GST: std::sync::LazyLock<std::sync::Arc<Gst>> = std::sync::LazyLock::new(
})
});
/// This should be a global singleton
pub struct Gst {
__private: core::marker::PhantomData<()>,
}
@@ -58,14 +58,4 @@ impl Gst {
pub fn new() -> Arc<Self> {
Arc::clone(&GST)
}
// pub fn pipeline_from_str(&self, s: &str) -> Result<Pipeline> {
// let pipeline = gstreamer::parse::launch(s).change_context(Error)?;
// let pipeline = pipeline.downcast::<gstreamer::Pipeline>();
// let pipeline = match pipeline {
// Err(_e) => return Err(Error).attach("Failed to downcast to Pipeline"),
// Ok(p) => p,
// };
// Ok(Pipeline { inner: pipeline })
// }
}

View File

@@ -1,8 +1,9 @@
use crate::priv_prelude::*;
/// Pads are link points between elements
wrap_gst!(Pad, gstreamer::Pad);
impl Pad {
#[track_caller]
pub fn ghost(target: &Pad) -> Result<Pad> {
let ghost_pad = gstreamer::GhostPad::with_target(&target.inner)
.change_context(Error)
@@ -12,6 +13,7 @@ impl Pad {
})
}
#[track_caller]
pub fn link(&self, peer: &Pad) -> Result<()> {
use gstreamer::prelude::*;
self.inner
@@ -21,6 +23,7 @@ impl Pad {
Ok(())
}
#[track_caller]
pub fn current_caps(&self) -> Result<Caps> {
let caps = self
.inner
@@ -30,6 +33,7 @@ impl Pad {
Ok(Caps { inner: caps })
}
#[track_caller]
pub fn activate(&self, activate: bool) -> Result<()> {
use gstreamer::prelude::*;
self.inner

View File

@@ -12,6 +12,7 @@ impl Drop for Pipeline {
}
impl Pipeline {
#[track_caller]
pub fn bus(&self) -> Result<Bus> {
let bus = self
.inner
@@ -22,15 +23,17 @@ impl Pipeline {
}
/// Get the state
#[track_caller]
pub fn state(
&self,
timeout: impl Into<Option<core::time::Duration>>,
) -> Result<gstreamer::State> {
let (result, current, pending) = self.inner.state(duration_to_clocktime(timeout)?);
let (result, current, _pending) = self.inner.state(duration_to_clocktime(timeout)?);
result.change_context(Error).attach("Failed to get state")?;
Ok(current)
}
#[track_caller]
pub fn play(&self) -> Result<()> {
self.inner
.set_state(gstreamer::State::Playing)
@@ -39,6 +42,7 @@ impl Pipeline {
Ok(())
}
#[track_caller]
pub fn pause(&self) -> Result<()> {
self.inner
.set_state(gstreamer::State::Paused)
@@ -47,6 +51,7 @@ impl Pipeline {
Ok(())
}
#[track_caller]
pub fn ready(&self) -> Result<()> {
self.inner
.set_state(gstreamer::State::Ready)
@@ -55,6 +60,7 @@ impl Pipeline {
Ok(())
}
#[track_caller]
pub fn set_state(&self, state: gstreamer::State) -> Result<gstreamer::StateChangeSuccess> {
let result = self
.inner
@@ -63,37 +69,135 @@ impl Pipeline {
.attach("Failed to set pipeline state")?;
Ok(result)
}
}
pub trait PipelineExt {
fn bus(&self) -> Result<Bus>;
fn play(&self) -> Result<()>;
fn pause(&self) -> Result<()>;
fn ready(&self) -> Result<()>;
fn set_state(&self, state: gstreamer::State) -> Result<gstreamer::StateChangeSuccess>;
fn state(&self, timeout: impl Into<Option<core::time::Duration>>) -> Result<gstreamer::State>;
}
impl<T> PipelineExt for T
where
T: ChildOf<Pipeline>,
{
fn bus(&self) -> Result<Bus> {
self.upcast_ref().bus()
pub async fn wait_for(&self, state: gstreamer::State) -> Result<()> {
let current_state = self.state(core::time::Duration::ZERO)?;
if current_state == state {
Ok(())
} else {
// use futures::stream::StreamExt;
use futures_lite::stream::StreamExt as _;
self.bus()?
.filtered_stream(&[MessageType::StateChanged])
.find(|message: &gstreamer::Message| {
let view = message.view();
if let gstreamer::MessageView::StateChanged(changed) = view {
changed.current() == state
&& changed.src().is_some_and(|s| s == &self.inner)
} else {
false
}
})
.await;
Ok(())
}
}
pub async fn wait_for_states(&self, states: impl AsRef<[gstreamer::State]>) -> Result<()> {
let current_state = self.state(core::time::Duration::ZERO)?;
let states = states.as_ref();
if states.contains(&current_state) {
Ok(())
} else {
use futures_lite::stream::StreamExt as _;
self.bus()?
.filtered_stream(&[MessageType::StateChanged])
.find(|message: &gstreamer::Message| {
let view = message.view();
if let gstreamer::MessageView::StateChanged(changed) = view {
states.contains(&changed.current())
&& changed.src().is_some_and(|s| s == &self.inner)
} else {
false
}
})
.await;
Ok(())
}
}
pub async fn wait_for_message<'a, F2>(
&self,
filter: Option<&'a [gstreamer::MessageType]>,
filter_fn: F2,
) -> Result<gstreamer::Message>
where
F2: Fn(&gstreamer::Message) -> bool + Send + 'a,
{
use futures_lite::stream::StreamExt as _;
match filter {
Some(filter) => {
let message = self.bus()?.filtered_stream(filter).find(filter_fn).await;
match message {
Some(msg) => Ok(msg),
None => {
Err(Error).attach("Failed to find message matching the provided filter")
}
}
}
None => {
let message = self.bus()?.stream().find(filter_fn).await;
match message {
Some(msg) => Ok(msg),
None => {
Err(Error).attach("Failed to find message matching the provided filter")
}
}
}
}
}
}
pub trait PipelineExt: ChildOf<Pipeline> + Sync {
// #[track_caller]
// fn bus(&self) -> Result<Bus> {
// self.upcast_ref().bus()
// }
#[track_caller]
fn play(&self) -> Result<()> {
self.upcast_ref().play()
}
#[track_caller]
fn pause(&self) -> Result<()> {
self.upcast_ref().pause()
}
#[track_caller]
fn ready(&self) -> Result<()> {
self.upcast_ref().ready()
}
#[track_caller]
fn set_state(&self, state: gstreamer::State) -> Result<gstreamer::StateChangeSuccess> {
self.upcast_ref().set_state(state)
}
#[track_caller]
fn state(&self, timeout: impl Into<Option<core::time::Duration>>) -> Result<gstreamer::State> {
self.upcast_ref().state(timeout)
}
fn wait_for(
&self,
state: gstreamer::State,
) -> impl std::future::Future<Output = Result<()>> + Send {
self.upcast_ref().wait_for(state)
}
fn wait_for_states(
&self,
states: impl AsRef<[gstreamer::State]> + Send,
) -> impl std::future::Future<Output = Result<()>> + Send {
self.upcast_ref().wait_for_states(states)
}
fn wait_for_message<'a, F2>(
&self,
filter: Option<&'a [gstreamer::MessageType]>,
filter_fn: F2,
) -> impl std::future::Future<Output = Result<gstreamer::Message>> + Send
where
F2: Fn(&gstreamer::Message) -> bool + Send + 'a,
{
self.upcast_ref().wait_for_message(filter, filter_fn)
}
}
impl<T: ChildOf<Pipeline> + Sync> PipelineExt for T {}

View File

@@ -1,7 +1,9 @@
use crate::priv_prelude::*;
#[doc(inline)]
pub use gstreamer_app::AppSinkCallbacks;
wrap_gst!(AppSink, gstreamer::Element);
parent_child!(Pipeline, AppSink, downcast); // since AppSink is an Element internaly
parent_child!(Element, AppSink);
impl Sink for AppSink {}
@@ -12,6 +14,7 @@ impl AppSink {
.downcast_ref::<gstreamer_app::AppSink>()
.expect("Failed to downcast to AppSink")
}
pub fn new(name: impl AsRef<str>) -> Result<Self> {
use gstreamer::prelude::*;
let inner = gstreamer::ElementFactory::make("appsink")
@@ -22,14 +25,47 @@ impl AppSink {
Ok(AppSink { inner })
}
pub fn with_caps(mut self, caps: Caps) -> Self {
pub fn with_emit_signals(self, emit: bool) -> Self {
self.inner.set_property("emit-signals", emit);
self
}
pub fn with_async(self, async_: bool) -> Self {
self.inner.set_property("async", async_);
self
}
pub fn with_sync(self, sync: bool) -> Self {
self.inner.set_property("sync", sync);
self
}
pub fn with_caps(self, caps: Caps) -> Self {
self.inner.set_property("caps", caps.inner);
self
}
pub fn set_callbacks(&self, callbacks: gstreamer_app::AppSinkCallbacks) -> Result<()> {
pub fn with_callbacks(self, callbacks: gstreamer_app::AppSinkCallbacks) -> Self {
self.appsink().set_callbacks(callbacks);
Ok(())
self
}
pub fn on_new_frame<F>(self, mut f: F) -> Self
where
F: FnMut(&AppSink) -> Result<(), gstreamer::FlowError> + Send + 'static,
{
self.with_emit_signals(true).with_callbacks(
AppSinkCallbacks::builder()
.new_sample(move |appsink| {
use glib::object::Cast;
let element = appsink.upcast_ref::<gstreamer::Element>();
let appsink = AppSink::from_gst_ref(element);
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(appsink)))
.unwrap_or(Err(gstreamer::FlowError::Error))
.map(|_| gstreamer::FlowSuccess::Ok)
})
.build(),
)
}
pub fn pull_sample(&self) -> Result<Sample> {
@@ -151,3 +187,49 @@ fn test_appsink() {
}
// std::thread::sleep(std::time::Duration::from_secs(5));
}
#[test]
fn test_appsink_metadata() {
use tracing_subscriber::prelude::*;
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.with_thread_ids(true)
.with_file(true),
)
.init();
crate::Gst::new();
let url = "https://jellyfin.tsuba.darksailor.dev/Items/6010382cf25273e624d305907010d773/Download?api_key=036c140222464878862231ef66a2bc9c";
let videoconvert = crate::plugins::videoconvertscale::VideoConvert::new("iced-video-convert")
// .unwrap();
// .with_output_format(gst::plugins::videoconvertscale::VideoFormat::Rgba)
.unwrap();
let appsink = crate::plugins::app::AppSink::new("iced-video-sink")
.unwrap()
.with_async(true)
.with_sync(true);
let video_sink = videoconvert.link(&appsink).unwrap();
let playbin = crate::plugins::playback::Playbin3::new("iced-video")
.unwrap()
.with_uri(url)
.with_video_sink(&video_sink);
playbin.pause().unwrap();
smol::block_on(async {
playbin.wait_for(gstreamer::State::Paused).await.unwrap();
});
// std::thread::sleep(core::time::Duration::from_secs(1));
let pad = appsink.pad("sink").unwrap();
let caps = pad.current_caps().unwrap();
let format = caps.format();
let height = caps.height();
let width = caps.width();
let framerate = caps.framerate();
dbg!(&format, height, width, framerate);
dbg!(&caps);
}

View File

@@ -3,6 +3,7 @@ pub trait GstWrapper {
fn from_gst(gst: Self::GstType) -> Self;
// fn into_gst(self) -> Self::GstType;
fn as_gst_ref(&self) -> &Self::GstType;
fn from_gst_ref(gst: &Self::GstType) -> &Self;
}
#[macro_export]
@@ -51,6 +52,10 @@ macro_rules! wrap_gst {
fn as_gst_ref(&self) -> &Self::GstType {
&self.inner
}
fn from_gst_ref(gst: &Self::GstType) -> &Self {
unsafe { &*(gst as *const Self::GstType as *const Self) }
}
}
impl ChildOf<$name> for $name {
@@ -97,7 +102,14 @@ macro_rules! parent_child {
let downcasted = self
.inner
.downcast_ref::<<$parent as GstWrapper>::GstType>()
.expect("BUG: Failed to downcast GStreamer type from child to parent");
.expect(
format!(
"BUG: Failed to downcast GStreamer type from child {} to parent {}",
stringify!($child),
stringify!($parent)
)
.as_str(),
);
unsafe {
&*(downcasted as *const <$parent as GstWrapper>::GstType as *const $parent)
}