Move state to pipewire thread instead of the gtk thread.

This will allow easier state-keeping later, when setting up info-listeners on structs.
This commit is contained in:
Tom A. Wagner
2021-06-06 09:20:07 +02:00
parent 118c1ca28c
commit 907ef328d2
4 changed files with 275 additions and 270 deletions

View File

@@ -1,4 +1,4 @@
use std::{cell::RefCell, collections::HashMap}; use std::cell::RefCell;
use gtk::{ use gtk::{
gio, gio,
@@ -6,21 +6,14 @@ use gtk::{
prelude::*, prelude::*,
subclass::prelude::*, subclass::prelude::*,
}; };
use log::{error, info, warn}; use log::{info, warn};
use pipewire::{channel::Sender, spa::Direction}; use pipewire::{channel::Sender, spa::Direction};
use crate::{ use crate::{
view::{self}, view::{self},
GtkMessage, PipewireLink, PipewireMessage, GtkMessage, MediaType, PipewireLink, PipewireMessage,
}; };
#[derive(Debug, Copy, Clone)]
pub enum MediaType {
Audio,
Video,
Midi,
}
static STYLE: &str = include_str!("style.css"); static STYLE: &str = include_str!("style.css");
mod imp { mod imp {
@@ -31,7 +24,6 @@ mod imp {
#[derive(Default)] #[derive(Default)]
pub struct Application { pub struct Application {
pub(super) graphview: view::GraphView, pub(super) graphview: view::GraphView,
pub(super) state: RefCell<State>,
pub(super) pw_sender: OnceCell<RefCell<Sender<GtkMessage>>>, pub(super) pw_sender: OnceCell<RefCell<Sender<GtkMessage>>>,
} }
@@ -116,19 +108,12 @@ impl Application {
@weak app => @default-return Continue(true), @weak app => @default-return Continue(true),
move |msg| { move |msg| {
match msg { match msg {
PipewireMessage::NodeAdded { PipewireMessage::NodeAdded{ id, name } => app.add_node(id,name),
id, PipewireMessage::PortAdded{ id, node_id, name, direction, media_type} => app.add_port(id,name,node_id,direction,media_type),
name, PipewireMessage::LinkAdded{ id, node_from, port_from, node_to, port_to} => app.add_link(id,node_from,port_from,node_to,port_to),
media_type, PipewireMessage::NodeRemoved { id } => app.remove_node(id),
} => app.add_node(id, name, media_type), PipewireMessage::PortRemoved { id, node_id } => app.remove_port(id, node_id),
PipewireMessage::PortAdded { PipewireMessage::LinkRemoved { id } => app.remove_link(id)
id,
node_id,
name,
direction,
} => app.add_port(id, name, node_id, direction),
PipewireMessage::LinkAdded { id, port_from, port_to } => app.add_link(id, port_from, port_to),
PipewireMessage::ObjectRemoved { id } => app.remove_global(id),
}; };
Continue(true) Continue(true)
} }
@@ -139,40 +124,27 @@ impl Application {
} }
/// Add a new node to the view. /// Add a new node to the view.
pub fn add_node(&self, id: u32, name: String, media_type: Option<MediaType>) { pub fn add_node(&self, id: u32, name: String) {
info!("Adding node to graph: id {}", id); info!("Adding node to graph: id {}", id);
let imp = imp::Application::from_instance(self); imp::Application::from_instance(self)
.graphview
imp.state.borrow_mut().insert( .add_node(id, view::Node::new(name.as_str()));
id,
Item::Node {
// widget: node_widget,
media_type,
},
);
imp.graphview.add_node(id, view::Node::new(name.as_str()));
} }
/// Add a new port to the view. /// Add a new port to the view.
pub fn add_port(&self, id: u32, name: String, node_id: u32, direction: Direction) { pub fn add_port(
&self,
id: u32,
name: String,
node_id: u32,
direction: Direction,
media_type: Option<MediaType>,
) {
info!("Adding port to graph: id {}", id); info!("Adding port to graph: id {}", id);
let imp = imp::Application::from_instance(self); let imp = imp::Application::from_instance(self);
// Find out the nodes media type so that the port can be colored.
let media_type =
if let Some(Item::Node { media_type, .. }) = imp.state.borrow().get(node_id) {
media_type.to_owned()
} else {
warn!("Node not found for Port {}", id);
None
};
// Save node_id so we can delete this port easily.
imp.state.borrow_mut().insert(id, Item::Port { node_id });
let port = view::Port::new(id, name.as_str(), direction, media_type); let port = view::Port::new(id, name.as_str(), direction, media_type);
// Create or delete a link if the widget emits the "port-toggled" signal. // Create or delete a link if the widget emits the "port-toggled" signal.
@@ -196,45 +168,13 @@ impl Application {
} }
/// Add a new link to the view. /// Add a new link to the view.
pub fn add_link(&self, id: u32, port_from: u32, port_to: u32) { pub fn add_link(&self, id: u32, node_from: u32, port_from: u32, node_to: u32, port_to: u32) {
info!("Adding link to graph: id {}", id); info!("Adding link to graph: id {}", id);
let imp = imp::Application::from_instance(self);
let mut state = imp.state.borrow_mut();
// FIXME: Links should be colored depending on the data they carry (video, audio, midi) like ports are. // FIXME: Links should be colored depending on the data they carry (video, audio, midi) like ports are.
let node_from = *match state.get(port_from) {
Some(Item::Port { node_id }) => node_id,
_ => {
error!(
"Tried to add link (id:{}), but its output port (id:{}) is not known",
id, port_from
);
return;
}
};
let node_to = *match state.get(port_to) {
Some(Item::Port { node_id }) => node_id,
_ => {
error!(
"Tried to add link (id:{}), but its input port (id:{}) is not known",
id, port_to
);
return;
}
};
state.insert(
id,
Item::Link {
output_port: port_from,
input_port: port_to,
},
);
// Update graph to contain the new link. // Update graph to contain the new link.
imp.graphview.add_link( imp::Application::from_instance(self).graphview.add_link(
id, id,
PipewireLink { PipewireLink {
node_from, node_from,
@@ -249,54 +189,9 @@ impl Application {
fn toggle_link(&self, port_from: u32, port_to: u32) { fn toggle_link(&self, port_from: u32, port_to: u32) {
let imp = imp::Application::from_instance(self); let imp = imp::Application::from_instance(self);
let sender = imp.pw_sender.get().expect("pw_sender not set").borrow_mut(); let sender = imp.pw_sender.get().expect("pw_sender not set").borrow_mut();
let state = imp.state.borrow_mut();
if let Some(id) = state.get_link_id(port_from, port_to) {
info!("Requesting removal of link with id {}", id);
sender sender
.send(GtkMessage::DestroyGlobal(id)) .send(GtkMessage::ToggleLink { port_from, port_to })
.expect("Failed to send message"); .expect("Failed to send message");
} else {
info!(
"Requesting creation of link from port id:{} to port id:{}",
port_from, port_to
);
let node_from = state
.get_node_of_port(port_from)
.expect("Requested port not in state");
let node_to = state
.get_node_of_port(port_to)
.expect("Requested port not in state");
sender
.send(GtkMessage::CreateLink(PipewireLink {
node_from,
port_from,
node_to,
port_to,
}))
.expect("Failed to send message");
}
}
/// Handle a global object being removed.
pub fn remove_global(&self, id: u32) {
let imp = imp::Application::from_instance(self);
if let Some(item) = imp.state.borrow_mut().remove(id) {
match item {
Item::Node { .. } => self.remove_node(id),
Item::Port { node_id } => self.remove_port(id, node_id),
Item::Link { .. } => self.remove_link(id),
}
} else {
warn!(
"Attempted to remove item with id {} that is not saved in state",
id
);
}
} }
/// Remove the node with the specified id from the view. /// Remove the node with the specified id from the view.
@@ -324,83 +219,3 @@ impl Application {
imp.graphview.remove_link(id); imp.graphview.remove_link(id);
} }
} }
/// Any pipewire item we need to keep track of.
/// These will be saved in the [`Application`]s `state` struct associated with their id.
enum Item {
Node {
// Keep track of the nodes media type to color ports on it.
media_type: Option<MediaType>,
},
Port {
// Save the id of the node this is on so we can remove the port from it
// when it is deleted.
node_id: u32,
},
// We don't need to memorize anything about links right now, but we need to
// be able to find out an id is a link.
Link {
output_port: u32,
input_port: u32,
},
}
/// This struct keeps track of any relevant items and stores them under their IDs.
///
/// Given two port ids, it can also efficiently find the id of the link that connects them.
#[derive(Default)]
struct State {
/// Map pipewire ids to items.
items: HashMap<u32, Item>,
/// Map `(output port id, input port id)` tuples to the id of the link that connects them.
links: HashMap<(u32, u32), u32>,
}
impl State {
/// Add a new item under the specified id.
fn insert(&mut self, id: u32, item: Item) {
if let Item::Link {
output_port,
input_port,
} = item
{
self.links.insert((output_port, input_port), id);
}
self.items.insert(id, item);
}
/// Get the item that has the specified id.
fn get(&self, id: u32) -> Option<&Item> {
self.items.get(&id)
}
/// Get the id of the link that links the two specified ports.
fn get_link_id(&self, output_port: u32, input_port: u32) -> Option<u32> {
self.links.get(&(output_port, input_port)).copied()
}
/// Remove the item with the specified id, returning it if it exists.
fn remove(&mut self, id: u32) -> Option<Item> {
let removed = self.items.remove(&id);
if let Some(Item::Link {
output_port,
input_port,
}) = removed
{
self.links.remove(&(output_port, input_port));
}
removed
}
/// Convenience function: Get the id of the node a port is on
fn get_node_of_port(&self, port: u32) -> Option<u32> {
if let Some(Item::Port { node_id }) = self.get(port) {
Some(*node_id)
} else {
None
}
}
}

View File

@@ -2,48 +2,59 @@ mod application;
mod pipewire_connection; mod pipewire_connection;
mod view; mod view;
use application::MediaType;
use gtk::{ use gtk::{
glib::{self, PRIORITY_DEFAULT}, glib::{self, PRIORITY_DEFAULT},
prelude::*, prelude::*,
}; };
use pipewire::spa::Direction; use pipewire::spa::Direction;
/// Messages used GTK thread to command the pipewire thread. /// Messages sent by the GTK thread to notify the pipewire thread.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
enum GtkMessage { enum GtkMessage {
/// Create a new link. /// Toggle a link between the two specified ports.
CreateLink(PipewireLink), ToggleLink { port_from: u32, port_to: u32 },
/// Destroy the global with the specified id.
DestroyGlobal(u32),
/// Quit the event loop and let the thread finish. /// Quit the event loop and let the thread finish.
Terminate, Terminate,
} }
/// Messages used pipewire thread to notify the GTK thread. /// Messages sent by the pipewire thread to notify the GTK thread.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
enum PipewireMessage { enum PipewireMessage {
/// A new node has appeared.
NodeAdded { NodeAdded {
id: u32, id: u32,
name: String, name: String,
media_type: Option<MediaType>,
}, },
/// A new port has appeared.
PortAdded { PortAdded {
id: u32, id: u32,
node_id: u32, node_id: u32,
name: String, name: String,
direction: Direction, direction: Direction,
media_type: Option<MediaType>,
}, },
/// A new link has appeared.
LinkAdded { LinkAdded {
id: u32, id: u32,
node_from: u32,
port_from: u32, port_from: u32,
node_to: u32,
port_to: u32, port_to: u32,
}, },
/// An object was removed NodeRemoved {
ObjectRemoved { id: u32 }, id: u32,
},
PortRemoved {
id: u32,
node_id: u32,
},
LinkRemoved {
id: u32,
},
}
#[derive(Debug, Copy, Clone)]
pub enum MediaType {
Audio,
Video,
Midi,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]

View File

@@ -1,18 +1,18 @@
use std::rc::Rc; use std::{cell::RefCell, collections::HashMap, rc::Rc};
use gtk::glib::{self, clone}; use gtk::glib::{self, clone};
use log::warn; use log::{error, info, warn};
use pipewire::{ use pipewire::{
link::Link, link::Link,
prelude::*, prelude::*,
properties, properties,
registry::GlobalObject, registry::{GlobalObject, Registry},
spa::{Direction, ForeignDict}, spa::{Direction, ForeignDict},
types::ObjectType, types::ObjectType,
Context, MainLoop, Context, Core, MainLoop,
}; };
use crate::{application::MediaType, GtkMessage, PipewireMessage}; use crate::{GtkMessage, MediaType, PipewireMessage};
/// The "main" function of the pipewire thread. /// The "main" function of the pipewire thread.
pub(super) fn thread_main( pub(super) fn thread_main(
@@ -21,59 +21,55 @@ pub(super) fn thread_main(
) { ) {
let mainloop = MainLoop::new().expect("Failed to create mainloop"); let mainloop = MainLoop::new().expect("Failed to create mainloop");
let context = Context::new(&mainloop).expect("Failed to create context"); let context = Context::new(&mainloop).expect("Failed to create context");
let core = context.connect(None).expect("Failed to connect to remote"); let core = Rc::new(context.connect(None).expect("Failed to connect to remote"));
let registry = Rc::new(core.get_registry().expect("Failed to get registry")); let registry = Rc::new(core.get_registry().expect("Failed to get registry"));
let state = Rc::new(RefCell::new(State::new()));
let _receiver = pw_receiver.attach(&mainloop, { let _receiver = pw_receiver.attach(&mainloop, {
let mainloop = mainloop.clone(); clone!(@strong mainloop, @weak core, @weak registry, @strong state => move |msg| match msg {
clone!(@weak registry => move |msg| match msg { GtkMessage::ToggleLink { port_from, port_to } => toggle_link(port_from, port_to, &core, &registry, &state),
GtkMessage::CreateLink(link) => {
if let Err(e) = core.create_object::<Link, _>(
"link-factory",
&properties! {
"link.output.node" => link.node_from.to_string(),
"link.output.port" => link.port_from.to_string(),
"link.input.node" => link.node_to.to_string(),
"link.input.port" => link.port_to.to_string(),
"object.linger" => "1"
},
) {
warn!("Failed to create link: {}", e);
}
}
GtkMessage::DestroyGlobal(id) => {
// FIXME: Handle error
registry.destroy_global(id);
}
GtkMessage::Terminate => mainloop.quit(), GtkMessage::Terminate => mainloop.quit(),
}) })
}); });
let _listener = registry let _listener = registry
.add_listener_local() .add_listener_local()
.global({ .global(clone!(@strong gtk_sender, @strong state =>
let sender = gtk_sender.clone();
move |global| match global.type_ { move |global| match global.type_ {
ObjectType::Node => handle_node(global, &sender), ObjectType::Node => handle_node(global, &gtk_sender, &state),
ObjectType::Port => handle_port(global, &sender), ObjectType::Port => handle_port(global, &gtk_sender, &state),
ObjectType::Link => handle_link(global, &sender), ObjectType::Link => handle_link(global, &gtk_sender, &state),
_ => { _ => {
// Other objects are not interesting to us // Other objects are not interesting to us
} }
} }
}) ))
.global_remove(move |id| { .global_remove(clone!(@strong state => move |id| {
gtk_sender if let Some(item) = state.borrow_mut().remove(id) {
.send(PipewireMessage::ObjectRemoved { id }) gtk_sender.send(match item {
.expect("Failed to send message") Item::Node { .. } => PipewireMessage::NodeRemoved {id},
}) Item::Port { node_id } => PipewireMessage::PortRemoved {id, node_id},
Item::Link { .. } => PipewireMessage::LinkRemoved {id},
}).expect("Failed to send message");
} else {
warn!(
"Attempted to remove item with id {} that is not saved in state",
id
);
}
}))
.register(); .register();
mainloop.run(); mainloop.run();
} }
/// Handle a new node being added /// Handle a new node being added
fn handle_node(node: &GlobalObject<ForeignDict>, sender: &glib::Sender<PipewireMessage>) { fn handle_node(
node: &GlobalObject<ForeignDict>,
sender: &glib::Sender<PipewireMessage>,
state: &Rc<RefCell<State>>,
) {
let props = node let props = node
.props .props
.as_ref() .as_ref()
@@ -104,17 +100,25 @@ fn handle_node(node: &GlobalObject<ForeignDict>, sender: &glib::Sender<PipewireM
}) })
.flatten(); .flatten();
sender state.borrow_mut().insert(
.send(PipewireMessage::NodeAdded { node.id,
id: node.id, Item::Node {
name, // widget: node_widget,
media_type, media_type,
}) },
);
sender
.send(PipewireMessage::NodeAdded { id: node.id, name })
.expect("Failed to send message"); .expect("Failed to send message");
} }
/// Handle a new port being added /// Handle a new port being added
fn handle_port(port: &GlobalObject<ForeignDict>, sender: &glib::Sender<PipewireMessage>) { fn handle_port(
port: &GlobalObject<ForeignDict>,
sender: &glib::Sender<PipewireMessage>,
state: &Rc<RefCell<State>>,
) {
let props = port let props = port
.props .props
.as_ref() .as_ref()
@@ -131,18 +135,34 @@ fn handle_port(port: &GlobalObject<ForeignDict>, sender: &glib::Sender<PipewireM
Direction::Output Direction::Output
}; };
// Find out the nodes media type so that the port can be colored.
let media_type = if let Some(Item::Node { media_type, .. }) = state.borrow().get(node_id) {
media_type.to_owned()
} else {
warn!("Node not found for Port {}", port.id);
None
};
// Save node_id so we can delete this port easily.
state.borrow_mut().insert(port.id, Item::Port { node_id });
sender sender
.send(PipewireMessage::PortAdded { .send(PipewireMessage::PortAdded {
id: port.id, id: port.id,
node_id, node_id,
name, name,
direction, direction,
media_type,
}) })
.expect("Failed to send message"); .expect("Failed to send message");
} }
/// Handle a new link being added /// Handle a new link being added
fn handle_link(link: &GlobalObject<ForeignDict>, sender: &glib::Sender<PipewireMessage>) { fn handle_link(
link: &GlobalObject<ForeignDict>,
sender: &glib::Sender<PipewireMessage>,
state: &Rc<RefCell<State>>,
) {
let props = link let props = link
.props .props
.as_ref() .as_ref()
@@ -158,11 +178,170 @@ fn handle_link(link: &GlobalObject<ForeignDict>, sender: &glib::Sender<PipewireM
.parse() .parse()
.expect("Could not parse link.input.port property"); .expect("Could not parse link.input.port property");
let mut state = state.borrow_mut();
let node_from = *match state.get(port_from) {
Some(Item::Port { node_id }) => node_id,
_ => {
error!(
"Tried to add link (id:{}), but its output port (id:{}) is not known",
link.id, port_from
);
return;
}
};
let node_to = *match state.get(port_to) {
Some(Item::Port { node_id }) => node_id,
_ => {
error!(
"Tried to add link (id:{}), but its input port (id:{}) is not known",
link.id, port_to
);
return;
}
};
state.insert(
link.id,
Item::Link {
output_port: port_from,
input_port: port_to,
},
);
sender sender
.send(PipewireMessage::LinkAdded { .send(PipewireMessage::LinkAdded {
id: link.id, id: link.id,
node_from,
port_from, port_from,
node_to,
port_to, port_to,
}) })
.expect("Failed to send message"); .expect("Failed to send message");
} }
/// Toggle a link between the two specified ports.
fn toggle_link(
port_from: u32,
port_to: u32,
core: &Rc<Core>,
registry: &Rc<Registry>,
state: &Rc<RefCell<State>>,
) {
let state = state.borrow_mut();
if let Some(id) = state.get_link_id(port_from, port_to) {
info!("Requesting removal of link with id {}", id);
// FIXME: Handle error
registry.destroy_global(id);
} else {
info!(
"Requesting creation of link from port id:{} to port id:{}",
port_from, port_to
);
let node_from = state
.get_node_of_port(port_from)
.expect("Requested port not in state");
let node_to = state
.get_node_of_port(port_to)
.expect("Requested port not in state");
if let Err(e) = core.create_object::<Link, _>(
"link-factory",
&properties! {
"link.output.node" => node_from.to_string(),
"link.output.port" => port_from.to_string(),
"link.input.node" => node_to.to_string(),
"link.input.port" => port_to.to_string(),
"object.linger" => "1"
},
) {
warn!("Failed to create link: {}", e);
}
}
}
/// Any pipewire item we need to keep track of.
/// These will be saved in the [`Application`]s `state` struct associated with their id.
enum Item {
Node {
// Keep track of the nodes media type to color ports on it.
media_type: Option<MediaType>,
},
Port {
// Save the id of the node this is on so we can remove the port from it
// when it is deleted.
node_id: u32,
},
// We don't need to memorize anything about links right now, but we need to
// be able to find out an id is a link.
Link {
output_port: u32,
input_port: u32,
},
}
/// This struct keeps track of any relevant items and stores them under their IDs.
///
/// Given two port ids, it can also efficiently find the id of the link that connects them.
#[derive(Default)]
struct State {
/// Map pipewire ids to items.
items: HashMap<u32, Item>,
/// Map `(output port id, input port id)` tuples to the id of the link that connects them.
links: HashMap<(u32, u32), u32>,
}
impl State {
/// Create a new, empty state.
fn new() -> Self {
Default::default()
}
/// Add a new item under the specified id.
fn insert(&mut self, id: u32, item: Item) {
if let Item::Link {
output_port,
input_port,
} = item
{
self.links.insert((output_port, input_port), id);
}
self.items.insert(id, item);
}
/// Get the item that has the specified id.
fn get(&self, id: u32) -> Option<&Item> {
self.items.get(&id)
}
/// Get the id of the link that links the two specified ports.
fn get_link_id(&self, output_port: u32, input_port: u32) -> Option<u32> {
self.links.get(&(output_port, input_port)).copied()
}
/// Remove the item with the specified id, returning it if it exists.
fn remove(&mut self, id: u32) -> Option<Item> {
let removed = self.items.remove(&id);
if let Some(Item::Link {
output_port,
input_port,
}) = removed
{
self.links.remove(&(output_port, input_port));
}
removed
}
/// Convenience function: Get the id of the node a port is on
fn get_node_of_port(&self, port: u32) -> Option<u32> {
if let Some(Item::Port { node_id }) = self.get(port) {
Some(*node_id)
} else {
None
}
}
}

View File

@@ -7,7 +7,7 @@ use gtk::{
use log::warn; use log::warn;
use pipewire::spa::Direction; use pipewire::spa::Direction;
use crate::application::MediaType; use crate::MediaType;
mod imp { mod imp {
use once_cell::{sync::Lazy, unsync::OnceCell}; use once_cell::{sync::Lazy, unsync::OnceCell};