Modify architecture to run pipewire loop in second thread.

The pipewire loop now runs without interruption in a second thread and communicates with
the GTK thread via a channel in each direction, instead of checking for events once a second and using callbacks.

This allows changes to appear instantly in the view, instead of having to wait.
This commit is contained in:
Tom A. Wagner
2021-05-05 21:43:52 +02:00
parent 75aa0a30d0
commit 076fec7eb4
9 changed files with 292 additions and 346 deletions

View File

@@ -1,13 +1,12 @@
use std::{cell::RefCell, collections::HashMap, rc::Rc};
use gtk::glib::{self, clone};
use libspa::{ForeignDict, ReadableDict};
use gtk::glib::{self, clone, Continue, Receiver};
use log::{info, warn};
use pipewire::{port::Direction, registry::GlobalObject, types::ObjectType};
use pipewire::spa::Direction;
use crate::{pipewire_connection::PipewireConnection, view};
use crate::{view, PipewireLink, PipewireMessage};
#[derive(Copy, Clone)]
#[derive(Debug, Copy, Clone)]
pub enum MediaType {
Audio,
Video,
@@ -41,7 +40,6 @@ enum Item {
///
/// It also keeps and manages a state object that contains the current state of objects present on the remote.
pub struct Controller {
con: Rc<RefCell<PipewireConnection>>,
state: HashMap<u32, Item>,
view: Rc<view::View>,
}
@@ -56,95 +54,52 @@ impl Controller {
/// will also drop the controller, unless the `Rc` is cloned outside of this function.
pub(super) fn new(
view: Rc<view::View>,
con: Rc<RefCell<PipewireConnection>>,
gtk_receiver: Receiver<PipewireMessage>,
) -> Rc<RefCell<Controller>> {
let result = Rc::new(RefCell::new(Controller {
con,
view,
state: HashMap::new(),
}));
result
.borrow()
.con
.borrow_mut()
.on_global_add(Some(Box::new(
clone!(@weak result as this => move |global| {
this.borrow_mut().global_add(global);
}),
)));
result
.borrow()
.con
.borrow_mut()
.on_global_remove(Some(Box::new(clone!(@weak result as this => move |id| {
this.borrow_mut().global_remove(id);
}))));
// React to messages received from the pipewire thread.
gtk_receiver.attach(
None,
clone!(
@weak result as controller => @default-return Continue(true),
move |msg| {
match msg {
PipewireMessage::NodeAdded {
id,
name,
media_type,
} => controller.borrow_mut().add_node(id, name, media_type),
PipewireMessage::PortAdded {
id,
node_id,
name,
direction,
} => controller
.borrow_mut()
.add_port(id, node_id, name, direction),
PipewireMessage::LinkAdded { id, link } => controller.borrow_mut().add_link(id, link),
PipewireMessage::ObjectRemoved { id } => controller.borrow_mut().remove_global(id),
};
Continue(true)
}
)
);
result
}
/// Handle a new global object being added.
/// Relevant objects are displayed to the user and/or stored to the state.
///
/// It is called from the `PipewireConnection` via callback.
fn global_add(&mut self, global: &GlobalObject<ForeignDict>) {
match global.type_ {
ObjectType::Node => {
self.add_node(global);
}
ObjectType::Port => {
self.add_port(global);
}
ObjectType::Link => {
self.add_link(global);
}
_ => {}
}
}
/// Handle a node object being added.
fn add_node(&mut self, node: &GlobalObject<ForeignDict>) {
info!("Adding node to graph: id {}", node.id);
pub(super) fn add_node(&mut self, id: u32, name: String, media_type: Option<MediaType>) {
info!("Adding node to graph: id {}", id);
// Get the nicest possible name for the node, using a fallback chain of possible name attributes.
let node_name = &node
.props
.as_ref()
.map(|dict| {
String::from(
dict.get("node.nick")
.or_else(|| dict.get("node.description"))
.or_else(|| dict.get("node.name"))
.unwrap_or_default(),
)
})
.unwrap_or_default();
// FIXME: This relies on the node being passed to us by the pipwire server before its port.
let media_type = node
.props
.as_ref()
.map(|props| {
props.get("media.class").map(|class| {
if class.contains("Audio") {
Some(MediaType::Audio)
} else if class.contains("Video") {
Some(MediaType::Video)
} else if class.contains("Midi") {
Some(MediaType::Midi)
} else {
None
}
})
})
.flatten()
.flatten();
self.view.add_node(node.id, node_name);
self.view.add_node(id, name.as_str());
self.state.insert(
node.id,
id,
Item::Node {
// widget: node_widget,
media_type,
@@ -153,94 +108,43 @@ impl Controller {
}
/// Handle a port object being added.
fn add_port(&mut self, port: &GlobalObject<ForeignDict>) {
info!("Adding port to graph: id {}", port.id);
pub(super) fn add_port(&mut self, id: u32, node_id: u32, name: String, direction: Direction) {
info!("Adding port to graph: id {}", id);
// Update graph to contain the new port.
let props = port
.props
.as_ref()
.expect("Port object is missing properties");
let port_label = props.get("port.name").unwrap_or_default().to_string();
let node_id: u32 = props
.get("node.id")
.expect("Port has no node.id property!")
.parse()
.expect("Could not parse node.id property");
// Find out the nodes media type so that the port can be colored.
let media_type = if let Some(Item::Node { media_type, .. }) = self.state.get(&node_id) {
media_type.to_owned()
} else {
warn!("Node not found for Port {}", port.id);
warn!("Node not found for Port {}", id);
None
};
self.view.add_port(
node_id,
port.id,
&port_label,
if matches!(props.get("port.direction"), Some("in")) {
Direction::Input
} else {
Direction::Output
},
media_type,
);
self.view
.add_port(node_id, id, &name, direction, media_type);
// Save node_id so we can delete this port easily.
self.state.insert(port.id, Item::Port { node_id });
self.state.insert(id, Item::Port { node_id });
}
/// Handle a link object being added.
fn add_link(&mut self, link: &GlobalObject<ForeignDict>) {
info!("Adding link to graph: id {}", link.id);
pub(super) fn add_link(&mut self, id: u32, link: PipewireLink) {
info!("Adding link to graph: id {}", id);
// FIXME: Links should be colored depending on the data they carry (video, audio, midi) like ports are.
self.state.insert(link.id, Item::Link);
self.state.insert(id, Item::Link);
// Update graph to contain the new link.
let props = link
.props
.as_ref()
.expect("Link object is missing properties");
let input_node: u32 = props
.get("link.input.node")
.expect("Link has no link.input.node property")
.parse()
.expect("Could not parse link.input.node property");
let input_port: u32 = props
.get("link.input.port")
.expect("Link has no link.input.port property")
.parse()
.expect("Could not parse link.input.port property");
let output_node: u32 = props
.get("link.output.node")
.expect("Link has no link.input.node property")
.parse()
.expect("Could not parse link.input.node property");
let output_port: u32 = props
.get("link.output.port")
.expect("Link has no link.output.port property")
.parse()
.expect("Could not parse link.output.port property");
self.view.add_link(
link.id,
crate::PipewireLink {
node_from: output_node,
port_from: output_port,
node_to: input_node,
port_to: input_port,
},
);
self.view.add_link(id, link);
}
/// Handle a globalobject being removed.
/// Relevant objects are removed from the view and/or the state.
///
/// This is called from the `PipewireConnection` via callback.
fn global_remove(&mut self, id: u32) {
pub(super) fn remove_global(&mut self, id: u32) {
if let Some(item) = self.state.remove(&id) {
match item {
Item::Node { .. } => {

View File

@@ -4,7 +4,38 @@ mod view;
use std::rc::Rc;
use gtk::{glib, prelude::*};
use controller::MediaType;
use gtk::glib::{self, PRIORITY_DEFAULT};
use pipewire::spa::Direction;
/// Messages used GTK thread to command the pipewire thread.
#[derive(Debug)]
enum GtkMessage {
/// Quit the event loop and let the thread finish.
Terminate,
}
/// Messages used pipewire thread to notify the GTK thread.
#[derive(Debug)]
enum PipewireMessage {
/// A new node has appeared.
NodeAdded {
id: u32,
name: String,
media_type: Option<MediaType>,
},
/// A new port has appeared.
PortAdded {
id: u32,
node_id: u32,
name: String,
direction: Direction,
},
/// A new link has appeared.
LinkAdded { id: u32, link: PipewireLink },
/// An object was removed
ObjectRemoved { id: u32 },
}
#[derive(Debug)]
pub struct PipewireLink {
@@ -18,20 +49,22 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
gtk::init()?;
let view = Rc::new(view::View::new());
let pw_con = pipewire_connection::PipewireConnection::new()?;
let _controller = controller::Controller::new(view.clone(), pw_con.clone());
// Start the pipewire thread with channels in both directions.
let (gtk_sender, gtk_receiver) = glib::MainContext::channel(PRIORITY_DEFAULT);
let (pw_sender, pw_receiver) = pipewire::channel::channel();
let pw_thread =
std::thread::spawn(move || pipewire_connection::thread_main(gtk_sender, pw_receiver));
// Do an initial roundtrip before showing the view,
// so that the graph is already populated when the window opens.
pw_con.borrow().roundtrip();
// From now on, call roundtrip() every second.
glib::timeout_add_seconds_local(1, move || {
pw_con.borrow().roundtrip();
Continue(true)
});
let view = Rc::new(view::View::new());
let _controller = controller::Controller::new(view.clone(), gtk_receiver);
view.run();
pw_sender
.send(GtkMessage::Terminate)
.expect("Failed to send message");
pw_thread.join().expect("Pipewire thread panicked");
Ok(())
}

View File

@@ -1,123 +1,159 @@
use gtk::glib::{self, clone};
use libspa::ForeignDict;
use log::trace;
use once_cell::unsync::OnceCell;
use pipewire as pw;
use pw::registry::GlobalObject;
use std::{
cell::{Cell, RefCell},
rc::Rc,
use gtk::glib;
use pipewire::{
prelude::*,
registry::GlobalObject,
spa::{Direction, ForeignDict},
types::ObjectType,
Context, MainLoop,
};
/// This struct is responsible for communication with the pipewire server.
/// The owner of this struct can subscribe to notifications for globals added or removed.
///
/// It's `roundtrip` function must be called regularly to receive updates.
pub struct PipewireConnection {
mainloop: pw::MainLoop,
_context: pw::Context<pw::MainLoop>,
core: pw::Core,
registry: pw::registry::Registry,
listeners: OnceCell<pw::registry::Listener>,
on_global_add: Option<Box<dyn Fn(&GlobalObject<ForeignDict>)>>,
on_global_remove: Option<Box<dyn Fn(u32)>>,
}
use crate::{controller::MediaType, GtkMessage, PipewireMessage};
impl PipewireConnection {
/// Create a new Pipewire Connection.
///
/// This returns an `Rc`, because weak references to the result are needed inside closures set up during creation.
pub fn new() -> Result<Rc<RefCell<Self>>, pw::Error> {
// Initialize pipewire lib and obtain needed pipewire objects.
pw::init();
let mainloop = pw::MainLoop::new()?;
let context = pw::Context::new(&mainloop)?;
let core = context.connect(None)?;
let registry = core.get_registry()?;
/// The "main" function of the pipewire thread.
pub(super) fn thread_main(
gtk_sender: glib::Sender<PipewireMessage>,
pw_receiver: pipewire::channel::Receiver<GtkMessage>,
) {
let mainloop = MainLoop::new().expect("Failed to create mainloop");
let context = Context::new(&mainloop).expect("Failed to create context");
let core = context.connect(None).expect("Failed to connect to remote");
let registry = core.get_registry().expect("Failed to get registry");
let result = Rc::new(RefCell::new(Self {
mainloop,
_context: context,
core,
registry,
listeners: OnceCell::new(),
on_global_add: None,
on_global_remove: None,
}));
// Notify state on globals added / removed
let listeners = result
.borrow()
.registry
.add_listener_local()
.global(clone!(@weak result as this => move |global| {
trace!("Global is added: {}", global.id);
let con = this.borrow();
if let Some(callback) = con.on_global_add.as_ref() {
callback(global)
} else {
trace!("No on_global_add callback registered");
}
}))
.global_remove(clone!(@weak result as this => move |id| {
trace!("Global is removed: {}", id);
let con = this.borrow();
if let Some(callback) = con.on_global_remove.as_ref() {
callback(id)
} else {
trace!("No on_global_remove callback registered");
}
}))
.register();
// Makeshift `expect()`: listeners does not implement `Debug`, so we can not use `expect`.
assert!(
result.borrow_mut().listeners.set(listeners).is_ok(),
"PipewireConnection.listeners field already set"
);
Ok(result)
}
/// Receive all events from the pipewire server, sending them to the `pipewire_state` struct for processing.
pub fn roundtrip(&self) {
trace!("Starting roundtrip");
let done = Rc::new(Cell::new(false));
let pending = self
.core
.sync(0)
.expect("Failed to trigger core sync event");
let done_clone = done.clone();
let loop_clone = self.mainloop.clone();
let _listener = self
.core
.add_listener_local()
.done(move |id, seq| {
if id == pw::PW_ID_CORE && seq == pending {
done_clone.set(true);
loop_clone.quit();
}
})
.register();
while !done.get() {
self.mainloop.run();
let _receiver = pw_receiver.attach(&mainloop, {
let mainloop = mainloop.clone();
move |msg| match msg {
GtkMessage::Terminate => mainloop.quit(),
}
});
trace!("Roundtrip finished");
}
let _listener = registry
.add_listener_local()
.global({
let sender = gtk_sender.clone();
move |global| match global.type_ {
ObjectType::Node => handle_node(global, &sender),
ObjectType::Port => handle_port(global, &sender),
ObjectType::Link => handle_link(global, &sender),
_ => {
// Other objects are not interesting to us
}
}
})
.global_remove(move |id| {
gtk_sender
.send(PipewireMessage::ObjectRemoved { id })
.expect("Failed to send message")
})
.register();
/// Set or unset a callback that gets called when a new global is added.
pub fn on_global_add(&mut self, callback: Option<Box<dyn Fn(&GlobalObject<ForeignDict>)>>) {
self.on_global_add = callback;
}
/// Set or unset a callback that gets called when a global is removed.
pub fn on_global_remove(&mut self, callback: Option<Box<dyn Fn(u32)>>) {
self.on_global_remove = callback;
}
mainloop.run();
}
/// Handle a new node being added
fn handle_node(node: &GlobalObject<ForeignDict>, sender: &glib::Sender<PipewireMessage>) {
let props = node
.props
.as_ref()
.expect("Node object is missing properties");
// Get the nicest possible name for the node, using a fallback chain of possible name attributes.
let name = String::from(
props
.get("node.nick")
.or_else(|| props.get("node.description"))
.or_else(|| props.get("node.name"))
.unwrap_or_default(),
);
// FIXME: This relies on the node being passed to us by the pipwire server before its port.
let media_type = props
.get("media.class")
.map(|class| {
if class.contains("Audio") {
Some(MediaType::Audio)
} else if class.contains("Video") {
Some(MediaType::Video)
} else if class.contains("Midi") {
Some(MediaType::Midi)
} else {
None
}
})
.flatten();
sender
.send(PipewireMessage::NodeAdded {
id: node.id,
name,
media_type,
})
.expect("Failed to send message");
}
/// Handle a new port being added
fn handle_port(port: &GlobalObject<ForeignDict>, sender: &glib::Sender<PipewireMessage>) {
let props = port
.props
.as_ref()
.expect("Port object is missing properties");
let name = props.get("port.name").unwrap_or_default().to_string();
let node_id: u32 = props
.get("node.id")
.expect("Port has no node.id property!")
.parse()
.expect("Could not parse node.id property");
let direction = if matches!(props.get("port.direction"), Some("in")) {
Direction::Input
} else {
Direction::Output
};
sender
.send(PipewireMessage::PortAdded {
id: port.id,
node_id,
name,
direction,
})
.expect("Failed to send message");
}
/// Handle a new link being added
fn handle_link(link: &GlobalObject<ForeignDict>, sender: &glib::Sender<PipewireMessage>) {
let props = link
.props
.as_ref()
.expect("Link object is missing properties");
let node_from: u32 = props
.get("link.output.node")
.expect("Link has no link.input.node property")
.parse()
.expect("Could not parse link.input.node property");
let port_from: u32 = props
.get("link.output.port")
.expect("Link has no link.output.port property")
.parse()
.expect("Could not parse link.output.port property");
let node_to: u32 = props
.get("link.input.node")
.expect("Link has no link.input.node property")
.parse()
.expect("Could not parse link.input.node property");
let port_to: u32 = props
.get("link.input.port")
.expect("Link has no link.input.port property")
.parse()
.expect("Could not parse link.input.port property");
sender
.send(PipewireMessage::LinkAdded {
id: link.id,
link: crate::PipewireLink {
node_from,
port_from,
node_to,
port_to,
},
})
.expect("Failed to send message");
}

View File

@@ -13,7 +13,7 @@ use gtk::{
glib::{self, clone},
prelude::*,
};
use pipewire::port::Direction;
use pipewire::spa::Direction;
use crate::controller::MediaType;

View File

@@ -1,7 +1,7 @@
use super::graph_view::GraphView;
use gtk::{glib, prelude::*, subclass::prelude::*, WidgetExt};
use pipewire::port::Direction;
use pipewire::spa::Direction;
use std::{collections::HashMap, rc::Rc};

View File

@@ -1,9 +1,11 @@
use gtk::{glib, prelude::*, subclass::prelude::*};
use pipewire::spa::Direction;
use crate::controller::MediaType;
mod imp {
use once_cell::unsync::OnceCell;
use pipewire::spa::Direction;
use super::*;
@@ -11,7 +13,7 @@ mod imp {
#[derive(Default)]
pub struct Port {
pub(super) id: OnceCell<u32>,
pub(super) direction: OnceCell<pipewire::port::Direction>,
pub(super) direction: OnceCell<Direction>,
}
#[glib::object_subclass]
@@ -32,12 +34,7 @@ glib::wrapper! {
}
impl Port {
pub fn new(
id: u32,
name: &str,
direction: pipewire::port::Direction,
media_type: Option<MediaType>,
) -> Self {
pub fn new(id: u32, name: &str, direction: Direction, media_type: Option<MediaType>) -> Self {
// Create the widget and initialize needed fields
let res: Self = glib::Object::new(&[]).expect("Failed to create Port");
let private = imp::Port::from_instance(&res);
@@ -60,7 +57,7 @@ impl Port {
res
}
pub fn direction(&self) -> &pipewire::port::Direction {
pub fn direction(&self) -> &Direction {
let private = imp::Port::from_instance(self);
private.direction.get().expect("Port direction is not set")
}