pipewire connection: Reconnection to PipeWire server

This commit is contained in:
Denis Drakhnia
2023-09-01 06:50:30 +03:00
committed by Tom Wagner
parent f0da839383
commit 94323510aa
6 changed files with 136 additions and 46 deletions

View File

@@ -16,10 +16,15 @@
mod state;
use std::{cell::RefCell, collections::HashMap, rc::Rc};
use std::{
cell::{Cell, RefCell},
collections::HashMap,
rc::Rc,
time::Duration,
};
use adw::glib::{self, clone};
use log::{debug, info, warn};
use log::{debug, error, info, warn};
use pipewire::{
link::{Link, LinkChangeMask, LinkInfo, LinkListener, LinkState},
port::{Port, PortChangeMask, PortInfo, PortListener},
@@ -28,7 +33,7 @@ use pipewire::{
registry::{GlobalObject, Registry},
spa::{
param::{ParamInfoFlags, ParamType},
ForeignDict,
ForeignDict, SpaResult,
},
types::ObjectType,
Context, Core, MainLoop,
@@ -51,56 +56,124 @@ enum ProxyItem {
/// The "main" function of the pipewire thread.
pub(super) fn thread_main(
gtk_sender: glib::Sender<PipewireMessage>,
pw_receiver: pipewire::channel::Receiver<GtkMessage>,
mut pw_receiver: pipewire::channel::Receiver<GtkMessage>,
) {
let mainloop = MainLoop::new().expect("Failed to create mainloop");
let context = Context::new(&mainloop).expect("Failed to create context");
let core = Rc::new(context.connect(None).expect("Failed to connect to remote"));
let registry = Rc::new(core.get_registry().expect("Failed to get registry"));
let context = Rc::new(Context::new(&mainloop).expect("Failed to create context"));
let is_stopped = Rc::new(Cell::new(false));
// Keep proxies and their listeners alive so that we can receive info events.
let proxies = Rc::new(RefCell::new(HashMap::new()));
while !is_stopped.get() {
// Try to connect
let core = match context.connect(None) {
Ok(core) => Rc::new(core),
Err(_) => {
// If connection is failed, try to connect every 200ms
let interval = Some(Duration::from_millis(200));
let state = Rc::new(RefCell::new(State::new()));
let _receiver = pw_receiver.attach(&mainloop, {
clone!(@strong mainloop, @weak core, @weak registry, @strong state => move |msg| match msg {
GtkMessage::ToggleLink { port_from, port_to } => toggle_link(port_from, port_to, &core, &registry, &state),
GtkMessage::Terminate => mainloop.quit(),
})
});
let _listener = registry
.add_listener_local()
.global(clone!(@strong gtk_sender, @weak registry, @strong proxies, @strong state =>
move |global| match global.type_ {
ObjectType::Node => handle_node(global, &gtk_sender, &state),
ObjectType::Port => handle_port(global, &gtk_sender, &registry, &proxies, &state),
ObjectType::Link => handle_link(global, &gtk_sender, &registry, &proxies, &state),
_ => {
// Other objects are not interesting to us
}
}
))
.global_remove(clone!(@strong proxies, @strong state => move |id| {
if let Some(item) = state.borrow_mut().remove(id) {
gtk_sender.send(match item {
Item::Node { .. } => PipewireMessage::NodeRemoved {id},
Item::Port { node_id } => PipewireMessage::PortRemoved {id, node_id},
Item::Link { .. } => PipewireMessage::LinkRemoved {id},
}).expect("Failed to send message");
} else {
warn!(
"Attempted to remove item with id {} that is not saved in state",
id
let core = Rc::new(RefCell::new(None));
let timer = mainloop.add_timer(
clone!(@strong mainloop, @strong context, @strong core => move |_| {
if let Ok(x) = context.connect(None) {
core.replace(Some(x));
mainloop.quit();
}
}),
);
timer
.update_timer(interval, interval)
.into_result()
.unwrap();
let receiver = pw_receiver.attach(&mainloop, {
clone!(@strong mainloop, @strong is_stopped => move |msg|
if let GtkMessage::Terminate = msg {
// main thread requested stop
is_stopped.set(true);
mainloop.quit();
}
)
});
mainloop.run();
pw_receiver = receiver.deattach();
if is_stopped.get() {
break;
}
Rc::new(core.take().unwrap())
}
};
proxies.borrow_mut().remove(&id);
}))
.register();
let registry = Rc::new(core.get_registry().expect("Failed to get registry"));
mainloop.run();
// Keep proxies and their listeners alive so that we can receive info events.
let proxies = Rc::new(RefCell::new(HashMap::new()));
let state = Rc::new(RefCell::new(State::new()));
let receiver = pw_receiver.attach(&mainloop, {
clone!(@strong mainloop, @weak core, @weak registry, @strong state, @strong is_stopped => move |msg| match msg {
GtkMessage::ToggleLink { port_from, port_to } => toggle_link(port_from, port_to, &core, &registry, &state),
GtkMessage::Terminate => {
// main thread requested stop
is_stopped.set(true);
mainloop.quit();
}
})
});
let gtk_sender = gtk_sender.clone();
let _listener = core.add_listener_local()
.error(clone!(@strong mainloop, @strong gtk_sender, @strong is_stopped => move |id, _seq, res, message| {
if id != pipewire::PW_ID_CORE {
return;
}
if res == -libc::EPIPE {
gtk_sender.send(PipewireMessage::Disconnected)
.expect("Failed to send message");
mainloop.quit();
} else {
let serr = SpaResult::from_c(res).into_result().unwrap_err();
error!("Pipewire Core received error {serr}: {message}");
}
}))
.register();
let _listener = registry
.add_listener_local()
.global(clone!(@strong gtk_sender, @weak registry, @strong proxies, @strong state =>
move |global| match global.type_ {
ObjectType::Node => handle_node(global, &gtk_sender, &state),
ObjectType::Port => handle_port(global, &gtk_sender, &registry, &proxies, &state),
ObjectType::Link => handle_link(global, &gtk_sender, &registry, &proxies, &state),
_ => {
// Other objects are not interesting to us
}
}
))
.global_remove(clone!(@strong proxies, @strong state => move |id| {
if let Some(item) = state.borrow_mut().remove(id) {
gtk_sender.send(match item {
Item::Node { .. } => PipewireMessage::NodeRemoved {id},
Item::Port { node_id } => PipewireMessage::PortRemoved {id, node_id},
Item::Link { .. } => PipewireMessage::LinkRemoved {id},
}).expect("Failed to send message");
} else {
warn!(
"Attempted to remove item with id {} that is not saved in state",
id
);
}
proxies.borrow_mut().remove(&id);
}))
.register();
mainloop.run();
pw_receiver = receiver.deattach();
}
}
/// Handle a new node being added