Files
jello/gst/src/plugins/app/appsink.rs
servius 9dac0b6c78
Some checks failed
build / checks-matrix (push) Has been cancelled
build / checks-build (push) Has been cancelled
build / codecov (push) Has been cancelled
docs / docs (push) Has been cancelled
feat(iced-video): added video format to the video frame
2026-01-14 09:51:56 +05:30

279 lines
9.0 KiB
Rust

use crate::priv_prelude::*;
#[doc(inline)]
pub use gstreamer_app::AppSinkCallbacks;
// NOTE(review): the macro is defined outside this file. The code below relies on an `AppSink`
// type with an `inner` field and an `AppSink::from_gst_ref` constructor, which presumably are
// generated here as a wrapper around `gstreamer::Element` — confirm against the macro definition.
wrap_gst!(AppSink, gstreamer::Element);
// NOTE(review): presumably declares the relationship between the `Element` wrapper and `AppSink`
// (e.g. conversions between them); the macro is defined elsewhere — confirm.
parent_child!(Element, AppSink);
/// Builder that collects callbacks for an [`AppSink`] and installs them in one go.
///
/// Obtained from [`AppSink::builder`]; finished with `build`.
pub struct AppSinkBuilder {
/// The already-created sink that `build` hands back to the caller.
inner: AppSink,
/// Callbacks accumulated so far; `None` until the first `on_*` method is called.
callbacks: Option<gstreamer_app::app_sink::AppSinkCallbacksBuilder>,
}
impl AppSinkBuilder {
    /// Adapts a user closure working on the wrapper [`AppSink`] into the closure shape the
    /// `gstreamer_app` callback builder expects.
    ///
    /// The returned closure:
    /// * converts the raw `gstreamer_app::AppSink` into the wrapper type,
    /// * maps `Ok(())` to `FlowSuccess::Ok` and passes `Err` through unchanged,
    /// * turns a panic inside `f` into `FlowError::Error` so it does not unwind further.
    fn flow_callback<F>(
        mut f: F,
    ) -> impl FnMut(&gstreamer_app::AppSink) -> Result<gstreamer::FlowSuccess, gstreamer::FlowError>
           + Send
           + 'static
    where
        F: FnMut(&AppSink) -> Result<(), gstreamer::FlowError> + Send + 'static,
    {
        move |raw: &gstreamer_app::AppSink| {
            use glib::object::Cast;
            let element = raw.upcast_ref::<gstreamer::Element>();
            let appsink = AppSink::from_gst_ref(element);
            // `AssertUnwindSafe`: `f` is an arbitrary `FnMut`, so the compiler cannot prove it
            // unwind-safe; the caller's closure is responsible for its own state after a panic.
            std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(appsink)))
                .unwrap_or(Err(gstreamer::FlowError::Error))
                .map(|_| gstreamer::FlowSuccess::Ok)
        }
    }

    /// Takes the callbacks builder accumulated so far, or starts a fresh one if none exists.
    fn take_callbacks(&mut self) -> gstreamer_app::app_sink::AppSinkCallbacksBuilder {
        self.callbacks
            .take()
            .unwrap_or_else(gstreamer_app::app_sink::AppSinkCallbacks::builder)
    }

    /// Registers `f` as the `new_sample` callback.
    ///
    /// `f` receives the wrapper [`AppSink`]. A panic inside `f` is caught and reported to
    /// GStreamer as `FlowError::Error`.
    pub fn on_new_sample<F>(mut self, f: F) -> Self
    where
        F: FnMut(&AppSink) -> Result<(), gstreamer::FlowError> + Send + 'static,
    {
        let callbacks_builder = self.take_callbacks();
        self.callbacks = Some(callbacks_builder.new_sample(Self::flow_callback(f)));
        self
    }

    /// Registers `f` as the `new_preroll` callback.
    ///
    /// `f` receives the wrapper [`AppSink`]. A panic inside `f` is caught and reported to
    /// GStreamer as `FlowError::Error`.
    pub fn on_new_preroll<F>(mut self, f: F) -> Self
    where
        F: FnMut(&AppSink) -> Result<(), gstreamer::FlowError> + Send + 'static,
    {
        let callbacks_builder = self.take_callbacks();
        self.callbacks = Some(callbacks_builder.new_preroll(Self::flow_callback(f)));
        self
    }

    /// Finishes the builder and returns the configured [`AppSink`].
    ///
    /// Callbacks are installed on the element only if at least one `on_*` method was called;
    /// otherwise the sink is returned without touching its callbacks.
    pub fn build(self) -> AppSink {
        let AppSinkBuilder { inner, callbacks } = self;
        if let Some(callbacks) = callbacks {
            inner.appsink().set_callbacks(callbacks.build());
        }
        inner
    }
}
// Opts `AppSink` into the `Sink` trait. The impl body is empty, so nothing from the trait is
// overridden here. NOTE(review): `Sink` is declared elsewhere — presumably a marker for elements
// that terminate a pipeline; confirm.
impl Sink for AppSink {}
impl AppSink {
pub fn builder(name: impl AsRef<str>) -> AppSinkBuilder {
let inner = AppSink::new(name).expect("Failed to create AppSink");
AppSinkBuilder {
inner,
callbacks: None,
}
}
fn appsink(&self) -> &gstreamer_app::AppSink {
self.inner
.downcast_ref::<gstreamer_app::AppSink>()
.expect("Failed to downcast to AppSink")
}
pub fn new(name: impl AsRef<str>) -> Result<Self> {
let inner = gstreamer::ElementFactory::make("appsink")
.name(name.as_ref())
.build()
.change_context(Error)
.attach("Failed to create appsink element")?;
Ok(AppSink { inner })
}
pub fn emit_signals(&mut self, emit: bool) -> &mut Self {
self.inner.set_property("emit-signals", emit);
self
}
pub fn async_(&mut self, async_: bool) -> &mut Self {
self.inner.set_property("async", async_);
self
}
pub fn sync(&mut self, sync: bool) -> &mut Self {
self.inner.set_property("sync", sync);
self
}
pub fn drop(&mut self, drop: bool) -> &mut Self {
self.inner.set_property("drop", drop);
self
}
pub fn caps(&mut self, caps: Caps) -> &mut Self {
self.inner.set_property("caps", caps.inner);
self
}
pub fn callbacks(&mut self, callbacks: gstreamer_app::AppSinkCallbacks) -> &mut Self {
self.appsink().set_callbacks(callbacks);
self
}
pub fn on_new_sample<F>(&mut self, mut f: F) -> &mut Self
where
F: FnMut(&AppSink) -> Result<(), gstreamer::FlowError> + Send + 'static,
{
self.emit_signals(true).callbacks(
AppSinkCallbacks::builder()
.new_sample(move |appsink| {
use glib::object::Cast;
let element = appsink.upcast_ref::<gstreamer::Element>();
let appsink = AppSink::from_gst_ref(element);
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(appsink)))
.unwrap_or(Err(gstreamer::FlowError::Error))
.map(|_| gstreamer::FlowSuccess::Ok)
})
.build(),
)
}
pub fn pull_sample(&self) -> Result<Sample> {
self.appsink()
.pull_sample()
.change_context(Error)
.attach("Failed to pull sample from AppSink")
.map(Sample::from)
}
pub fn try_pull_sample(
&self,
timeout: impl Into<Option<core::time::Duration>>,
) -> Result<Option<Sample>> {
Ok(self
.appsink()
.try_pull_sample(duration_to_clocktime(timeout)?)
.map(From::from))
}
pub fn pull_preroll(&self) -> Result<Sample> {
self.appsink()
.pull_preroll()
.change_context(Error)
.attach("Failed to pull preroll sample from AppSink")
.map(Sample::from)
}
pub fn try_pull_preroll(
&self,
timeout: impl Into<Option<core::time::Duration>>,
) -> Result<Option<Sample>> {
Ok(self
.appsink()
.try_pull_preroll(duration_to_clocktime(timeout)?)
.map(From::from))
}
}
/// End-to-end smoke test: plays a remote video through `videoconvert ! appsink` (RGB caps) and
/// watches the bus until EOS or an error is posted.
///
/// NOTE(review): this test needs network access, and the URL below embeds what looks like an
/// API key — consider loading it from the environment and rotating the key.
#[test]
fn test_appsink() {
    use gstreamer::prelude::*;
    use tracing_subscriber::prelude::*;

    // `init()` panics when a global subscriber is already installed, which happens as soon as a
    // second test in this binary sets one up. `try_init()` reports that as an `Err` instead, and
    // an already-installed subscriber is fine for our purposes.
    let _ = tracing_subscriber::registry()
        .with(
            tracing_subscriber::fmt::layer()
                .with_thread_ids(true)
                .with_file(true),
        )
        .try_init();

    tracing::info!("Linking videoconvert to appsink");
    Gst::new();
    let playbin3 = playback::Playbin3::new("pppppppppppppppppppppppppppppp").unwrap().with_uri("https://jellyfin.tsuba.darksailor.dev/Items/6010382cf25273e624d305907010d773/Download?api_key=036c140222464878862231ef66a2bc9c");
    let video_convert = plugins::videoconvertscale::VideoConvert::new("vcvcvcvcvcvcvcvcvcvcvcvcvc")
        .expect("Create videoconvert");
    let mut appsink = app::AppSink::new("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").expect("Create appsink");
    appsink.caps(
        Caps::builder(CapsType::Video)
            .field("format", "RGB")
            .build(),
    );
    let video_sink = video_convert
        .link(&appsink)
        .expect("Link videoconvert to appsink");
    let playbin3 = playbin3.with_video_sink(&video_sink);
    playbin3.play().expect("Play video");

    let bus = playbin3.bus().unwrap();
    // `None` timeout: iterate over bus messages with no time limit; the loop ends on EOS or error.
    for msg in bus.iter_timed(None) {
        match msg.view() {
            gstreamer::MessageView::Eos(..) => {
                tracing::info!("End of stream reached");
                break;
            }
            gstreamer::MessageView::Error(err) => {
                tracing::error!(
                    "Error from {:?}: {} ({:?})",
                    err.src().map(|s| s.path_string()),
                    err.error(),
                    err.debug()
                );
                break;
            }
            gstreamer::MessageView::StateChanged(state) => {
                eprintln!(
                    "State changed from {:?} to \x1b[33m{:?}\x1b[0m for {:?}",
                    state.old(),
                    state.current(),
                    state.src().map(|s| s.path_string())
                );
            }
            _ => {}
        }
    }
}
/// Pauses a playbin wired to `videoconvert ! appsink` and dumps the caps negotiated on the
/// appsink's `sink` pad (format, height, width, framerate).
///
/// NOTE(review): this test needs network access, and the URL below embeds what looks like an
/// API key — consider loading it from the environment and rotating the key.
#[test]
fn test_appsink_metadata() {
    use tracing_subscriber::prelude::*;

    // `init()` panics when a global subscriber is already installed, which happens as soon as a
    // second test in this binary sets one up. `try_init()` reports that as an `Err` instead, and
    // an already-installed subscriber is fine for our purposes.
    let _ = tracing_subscriber::registry()
        .with(
            tracing_subscriber::fmt::layer()
                .with_thread_ids(true)
                .with_file(true),
        )
        .try_init();

    crate::Gst::new();
    let url = "https://jellyfin.tsuba.darksailor.dev/Items/6010382cf25273e624d305907010d773/Download?api_key=036c140222464878862231ef66a2bc9c";
    let videoconvert =
        crate::plugins::videoconvertscale::VideoConvert::new("iced-video-convert").unwrap();
    let appsink = crate::plugins::app::AppSink::new("iced-video-sink")
        .unwrap()
        .with_async(true)
        .with_sync(true);
    let video_sink = videoconvert.link(&appsink).unwrap();
    let playbin = crate::plugins::playback::Playbin3::new("iced-video")
        .unwrap()
        .with_uri(url)
        .with_video_sink(&video_sink);

    playbin.pause().unwrap();
    // Block until the pipeline reports `Paused` before the pad's current caps are read.
    smol::block_on(async {
        playbin.wait_for(gstreamer::State::Paused).await.unwrap();
    });

    let pad = appsink.pad("sink").unwrap();
    let caps = pad.current_caps().unwrap();
    let format = caps.format();
    let height = caps.height();
    let width = caps.width();
    let framerate = caps.framerate();
    dbg!(&format, height, width, framerate);
    dbg!(&caps);
}