diff --git a/Cargo.lock b/Cargo.lock index 382376a..c7ce408 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -500,6 +500,7 @@ version = "0.4.1" dependencies = [ "glib", "libadwaita", + "libc", "log", "once_cell", "pipewire", diff --git a/Cargo.toml b/Cargo.toml index 673a98f..a18731b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,3 +22,4 @@ log = "0.4.11" once_cell = "1.7.2" +libc = "0.2" diff --git a/src/graph_manager.rs b/src/graph_manager.rs index 874d378..119092c 100644 --- a/src/graph_manager.rs +++ b/src/graph_manager.rs @@ -65,7 +65,8 @@ mod imp { PipewireMessage::LinkFormatChanged { id, media_type } => imp.link_format_changed(id, media_type), PipewireMessage::NodeRemoved { id } => imp.remove_node(id), PipewireMessage::PortRemoved { id, node_id } => imp.remove_port(id, node_id), - PipewireMessage::LinkRemoved { id } => imp.remove_link(id) + PipewireMessage::LinkRemoved { id } => imp.remove_link(id), + PipewireMessage::Disconnected => imp.clear(), }; glib::ControlFlow::Continue } @@ -280,6 +281,11 @@ mod imp { self.obj().graph().remove_link(&link); } + + fn clear(&self) { + self.items.borrow_mut().clear(); + self.obj().graph().clear(); + } } } diff --git a/src/main.rs b/src/main.rs index 9e95930..bfa8824 100644 --- a/src/main.rs +++ b/src/main.rs @@ -74,6 +74,7 @@ pub enum PipewireMessage { LinkRemoved { id: u32, }, + Disconnected, } #[derive(Debug, Clone)] diff --git a/src/pipewire_connection/mod.rs b/src/pipewire_connection/mod.rs index cf7f294..f03b677 100644 --- a/src/pipewire_connection/mod.rs +++ b/src/pipewire_connection/mod.rs @@ -16,10 +16,15 @@ mod state; -use std::{cell::RefCell, collections::HashMap, rc::Rc}; +use std::{ + cell::{Cell, RefCell}, + collections::HashMap, + rc::Rc, + time::Duration, +}; use adw::glib::{self, clone}; -use log::{debug, info, warn}; +use log::{debug, error, info, warn}; use pipewire::{ link::{Link, LinkChangeMask, LinkInfo, LinkListener, LinkState}, port::{Port, PortChangeMask, PortInfo, PortListener}, @@ -28,7 +33,7 @@ use pipewire::{ registry::{GlobalObject, Registry}, spa::{ param::{ParamInfoFlags, ParamType}, - ForeignDict, + ForeignDict, SpaResult, }, types::ObjectType, Context, Core, MainLoop, @@ -51,56 +56,124 @@ enum ProxyItem { /// The "main" function of the pipewire thread. pub(super) fn thread_main( gtk_sender: glib::Sender, - pw_receiver: pipewire::channel::Receiver, + mut pw_receiver: pipewire::channel::Receiver, ) { let mainloop = MainLoop::new().expect("Failed to create mainloop"); - let context = Context::new(&mainloop).expect("Failed to create context"); - let core = Rc::new(context.connect(None).expect("Failed to connect to remote")); - let registry = Rc::new(core.get_registry().expect("Failed to get registry")); + let context = Rc::new(Context::new(&mainloop).expect("Failed to create context")); + let is_stopped = Rc::new(Cell::new(false)); - // Keep proxies and their listeners alive so that we can receive info events. - let proxies = Rc::new(RefCell::new(HashMap::new())); + while !is_stopped.get() { + // Try to connect + let core = match context.connect(None) { + Ok(core) => Rc::new(core), + Err(_) => { + // If connection is failed, try to connect every 200ms + let interval = Some(Duration::from_millis(200)); - let state = Rc::new(RefCell::new(State::new())); - - let _receiver = pw_receiver.attach(&mainloop, { - clone!(@strong mainloop, @weak core, @weak registry, @strong state => move |msg| match msg { - GtkMessage::ToggleLink { port_from, port_to } => toggle_link(port_from, port_to, &core, ®istry, &state), - GtkMessage::Terminate => mainloop.quit(), - }) - }); - - let _listener = registry - .add_listener_local() - .global(clone!(@strong gtk_sender, @weak registry, @strong proxies, @strong state => - move |global| match global.type_ { - ObjectType::Node => handle_node(global, >k_sender, &state), - ObjectType::Port => handle_port(global, >k_sender, ®istry, &proxies, &state), - ObjectType::Link => handle_link(global, >k_sender, ®istry, &proxies, &state), - _ => { - // Other objects are not interesting to us - } - } - )) - .global_remove(clone!(@strong proxies, @strong state => move |id| { - if let Some(item) = state.borrow_mut().remove(id) { - gtk_sender.send(match item { - Item::Node { .. } => PipewireMessage::NodeRemoved {id}, - Item::Port { node_id } => PipewireMessage::PortRemoved {id, node_id}, - Item::Link { .. } => PipewireMessage::LinkRemoved {id}, - }).expect("Failed to send message"); - } else { - warn!( - "Attempted to remove item with id {} that is not saved in state", - id + let core = Rc::new(RefCell::new(None)); + let timer = mainloop.add_timer( + clone!(@strong mainloop, @strong context, @strong core => move |_| { + if let Ok(x) = context.connect(None) { + core.replace(Some(x)); + mainloop.quit(); + } + }), ); + + timer + .update_timer(interval, interval) + .into_result() + .unwrap(); + + let receiver = pw_receiver.attach(&mainloop, { + clone!(@strong mainloop, @strong is_stopped => move |msg| + if let GtkMessage::Terminate = msg { + // main thread requested stop + is_stopped.set(true); + mainloop.quit(); + } + ) + }); + + mainloop.run(); + pw_receiver = receiver.deattach(); + + if is_stopped.get() { + break; + } + + Rc::new(core.take().unwrap()) } + }; - proxies.borrow_mut().remove(&id); - })) - .register(); + let registry = Rc::new(core.get_registry().expect("Failed to get registry")); - mainloop.run(); + // Keep proxies and their listeners alive so that we can receive info events. + let proxies = Rc::new(RefCell::new(HashMap::new())); + let state = Rc::new(RefCell::new(State::new())); + + let receiver = pw_receiver.attach(&mainloop, { + clone!(@strong mainloop, @weak core, @weak registry, @strong state, @strong is_stopped => move |msg| match msg { + GtkMessage::ToggleLink { port_from, port_to } => toggle_link(port_from, port_to, &core, ®istry, &state), + GtkMessage::Terminate => { + // main thread requested stop + is_stopped.set(true); + mainloop.quit(); + } + }) + }); + + let gtk_sender = gtk_sender.clone(); + let _listener = core.add_listener_local() + .error(clone!(@strong mainloop, @strong gtk_sender, @strong is_stopped => move |id, _seq, res, message| { + if id != pipewire::PW_ID_CORE { + return; + } + + if res == -libc::EPIPE { + gtk_sender.send(PipewireMessage::Disconnected) + .expect("Failed to send message"); + mainloop.quit(); + } else { + let serr = SpaResult::from_c(res).into_result().unwrap_err(); + error!("Pipewire Core received error {serr}: {message}"); + } + })) + .register(); + + let _listener = registry + .add_listener_local() + .global(clone!(@strong gtk_sender, @weak registry, @strong proxies, @strong state => + move |global| match global.type_ { + ObjectType::Node => handle_node(global, >k_sender, &state), + ObjectType::Port => handle_port(global, >k_sender, ®istry, &proxies, &state), + ObjectType::Link => handle_link(global, >k_sender, ®istry, &proxies, &state), + _ => { + // Other objects are not interesting to us + } + } + )) + .global_remove(clone!(@strong proxies, @strong state => move |id| { + if let Some(item) = state.borrow_mut().remove(id) { + gtk_sender.send(match item { + Item::Node { .. } => PipewireMessage::NodeRemoved {id}, + Item::Port { node_id } => PipewireMessage::PortRemoved {id, node_id}, + Item::Link { .. } => PipewireMessage::LinkRemoved {id}, + }).expect("Failed to send message"); + } else { + warn!( + "Attempted to remove item with id {} that is not saved in state", + id + ); + } + + proxies.borrow_mut().remove(&id); + })) + .register(); + + mainloop.run(); + pw_receiver = receiver.deattach(); + } } /// Handle a new node being added diff --git a/src/ui/graph/graph_view.rs b/src/ui/graph/graph_view.rs index 611bf67..549a4c5 100644 --- a/src/ui/graph/graph_view.rs +++ b/src/ui/graph/graph_view.rs @@ -796,6 +796,14 @@ impl GraphView { self.queue_draw(); } + pub fn clear(&mut self) { + self.imp().links.borrow_mut().clear(); + for (node, _) in self.imp().nodes.borrow_mut().drain() { + node.unparent(); + } + self.queue_draw(); + } + /// Get the position of the specified node inside the graphview. /// /// The returned position is in canvas-space (non-zoomed, (0, 0) fixed in the middle of the canvas).