Use link info callback instead of properties to get ids.

While the properties for a links node ids are not always set, they always are in the link info.

Also, the work done in this commit will easily allow to get the format of links and ports later,
so that we can get the format of them much more reliably,
and we can also get notified of changes to an existing global via the info callback.
This commit is contained in:
Tom A. Wagner
2021-06-06 10:38:41 +02:00
parent 92101d860c
commit c8f94ae302
2 changed files with 149 additions and 142 deletions

View File

@@ -1,9 +1,11 @@
mod state;
use std::{cell::RefCell, collections::HashMap, rc::Rc}; use std::{cell::RefCell, collections::HashMap, rc::Rc};
use gtk::glib::{self, clone}; use gtk::glib::{self, clone};
use log::{error, info, warn}; use log::{debug, info, warn};
use pipewire::{ use pipewire::{
link::Link, link::{Link, LinkListener},
prelude::*, prelude::*,
properties, properties,
registry::{GlobalObject, Registry}, registry::{GlobalObject, Registry},
@@ -13,6 +15,14 @@ use pipewire::{
}; };
use crate::{GtkMessage, MediaType, PipewireMessage}; use crate::{GtkMessage, MediaType, PipewireMessage};
use state::{Item, State};
enum ProxyItem {
Link {
_proxy: Link,
_listener: LinkListener,
},
}
/// The "main" function of the pipewire thread. /// The "main" function of the pipewire thread.
pub(super) fn thread_main( pub(super) fn thread_main(
@@ -24,6 +34,9 @@ pub(super) fn thread_main(
let core = Rc::new(context.connect(None).expect("Failed to connect to remote")); let core = Rc::new(context.connect(None).expect("Failed to connect to remote"));
let registry = Rc::new(core.get_registry().expect("Failed to get registry")); let registry = Rc::new(core.get_registry().expect("Failed to get registry"));
// Keep proxies and their listeners alive so that we can receive info events.
let proxies = Rc::new(RefCell::new(HashMap::new()));
let state = Rc::new(RefCell::new(State::new())); let state = Rc::new(RefCell::new(State::new()));
let _receiver = pw_receiver.attach(&mainloop, { let _receiver = pw_receiver.attach(&mainloop, {
@@ -35,17 +48,17 @@ pub(super) fn thread_main(
let _listener = registry let _listener = registry
.add_listener_local() .add_listener_local()
.global(clone!(@strong gtk_sender, @strong state => .global(clone!(@strong gtk_sender, @weak registry, @strong proxies, @strong state =>
move |global| match global.type_ { move |global| match global.type_ {
ObjectType::Node => handle_node(global, &gtk_sender, &state), ObjectType::Node => handle_node(global, &gtk_sender, &state),
ObjectType::Port => handle_port(global, &gtk_sender, &state), ObjectType::Port => handle_port(global, &gtk_sender, &state),
ObjectType::Link => handle_link(global, &gtk_sender, &state), ObjectType::Link => handle_link(global, &gtk_sender, &registry, &proxies, &state),
_ => { _ => {
// Other objects are not interesting to us // Other objects are not interesting to us
} }
} }
)) ))
.global_remove(clone!(@strong state => move |id| { .global_remove(clone!(@strong proxies, @strong state => move |id| {
if let Some(item) = state.borrow_mut().remove(id) { if let Some(item) = state.borrow_mut().remove(id) {
gtk_sender.send(match item { gtk_sender.send(match item {
Item::Node { .. } => PipewireMessage::NodeRemoved {id}, Item::Node { .. } => PipewireMessage::NodeRemoved {id},
@@ -58,6 +71,8 @@ pub(super) fn thread_main(
id id
); );
} }
proxies.borrow_mut().remove(&id);
})) }))
.register(); .register();
@@ -161,62 +176,58 @@ fn handle_port(
fn handle_link( fn handle_link(
link: &GlobalObject<ForeignDict>, link: &GlobalObject<ForeignDict>,
sender: &glib::Sender<PipewireMessage>, sender: &glib::Sender<PipewireMessage>,
registry: &Rc<Registry>,
proxies: &Rc<RefCell<HashMap<u32, ProxyItem>>>,
state: &Rc<RefCell<State>>, state: &Rc<RefCell<State>>,
) { ) {
let props = link debug!(
.props "New link (id:{}) appeared, setting up info listener.",
.as_ref() link.id
.expect("Link object is missing properties"); );
let port_from: u32 = props
.get("link.output.port") let proxy: Link = registry.bind(link).expect("Failed to bind to link proxy");
.expect("Link has no link.output.port property") let listener = proxy
.parse() .add_listener_local()
.expect("Could not parse link.output.port property"); .info(clone!(@strong state, @strong sender => move |info| {
let port_to: u32 = props debug!("Received link info: {:?}", info);
.get("link.input.port")
.expect("Link has no link.input.port property") let id = info.id();
.parse()
.expect("Could not parse link.input.port property");
let mut state = state.borrow_mut(); let mut state = state.borrow_mut();
let node_from = *match state.get(port_from) { if let Some(Item::Link { .. }) = state.get(id) {
Some(Item::Port { node_id }) => node_id, // Info was an update - figure out if we should notify the gtk thread
_ => { // TODO
error!( } else {
"Tried to add link (id:{}), but its output port (id:{}) is not known", // First time we get info. We can now notify the gtk thread of a new link.
link.id, port_from let node_from = info.output_node_id();
); let port_from = info.output_port_id();
return; let node_to = info.input_node_id();
} let port_to = info.input_port_id();
};
let node_to = *match state.get(port_to) {
Some(Item::Port { node_id }) => node_id,
_ => {
error!(
"Tried to add link (id:{}), but its input port (id:{}) is not known",
link.id, port_to
);
return;
}
};
state.insert( state.insert(id, Item::Link {
link.id, port_from, port_to
Item::Link { });
output_port: port_from,
input_port: port_to,
},
);
sender sender.send(PipewireMessage::LinkAdded {
.send(PipewireMessage::LinkAdded { id,
id: link.id,
node_from, node_from,
port_from, port_from,
node_to, node_to,
port_to, port_to
}) }).expect(
.expect("Failed to send message"); "Failed to send message"
);
}
}))
.register();
proxies.borrow_mut().insert(
link.id,
ProxyItem::Link {
_proxy: proxy,
_listener: listener,
},
);
} }
/// Toggle a link between the two specified ports. /// Toggle a link between the two specified ports.
@@ -260,88 +271,3 @@ fn toggle_link(
} }
} }
} }
/// Any pipewire item we need to keep track of.
/// These will be saved in the [`Application`]s `state` struct associated with their id.
enum Item {
Node {
// Keep track of the nodes media type to color ports on it.
media_type: Option<MediaType>,
},
Port {
// Save the id of the node this is on so we can remove the port from it
// when it is deleted.
node_id: u32,
},
// We don't need to memorize anything about links right now, but we need to
// be able to find out an id is a link.
Link {
output_port: u32,
input_port: u32,
},
}
/// This struct keeps track of any relevant items and stores them under their IDs.
///
/// Given two port ids, it can also efficiently find the id of the link that connects them.
#[derive(Default)]
struct State {
/// Map pipewire ids to items.
items: HashMap<u32, Item>,
/// Map `(output port id, input port id)` tuples to the id of the link that connects them.
links: HashMap<(u32, u32), u32>,
}
impl State {
/// Create a new, empty state.
fn new() -> Self {
Default::default()
}
/// Add a new item under the specified id.
fn insert(&mut self, id: u32, item: Item) {
if let Item::Link {
output_port,
input_port,
} = item
{
self.links.insert((output_port, input_port), id);
}
self.items.insert(id, item);
}
/// Get the item that has the specified id.
fn get(&self, id: u32) -> Option<&Item> {
self.items.get(&id)
}
/// Get the id of the link that links the two specified ports.
fn get_link_id(&self, output_port: u32, input_port: u32) -> Option<u32> {
self.links.get(&(output_port, input_port)).copied()
}
/// Remove the item with the specified id, returning it if it exists.
fn remove(&mut self, id: u32) -> Option<Item> {
let removed = self.items.remove(&id);
if let Some(Item::Link {
output_port,
input_port,
}) = removed
{
self.links.remove(&(output_port, input_port));
}
removed
}
/// Convenience function: Get the id of the node a port is on
fn get_node_of_port(&self, port: u32) -> Option<u32> {
if let Some(Item::Port { node_id }) = self.get(port) {
Some(*node_id)
} else {
None
}
}
}

View File

@@ -0,0 +1,81 @@
use std::collections::HashMap;
use crate::MediaType;
/// Any pipewire item we need to keep track of.
/// These will be saved in the `State` struct associated with their id.
pub(super) enum Item {
Node {
// Keep track of the nodes media type to color ports on it.
media_type: Option<MediaType>,
},
Port {
// Save the id of the node this is on so we can remove the port from it
// when it is deleted.
node_id: u32,
},
Link {
port_from: u32,
port_to: u32,
},
}
/// This struct keeps track of any relevant items and stores them under their IDs.
///
/// Given two port ids, it can also efficiently find the id of the link that connects them.
#[derive(Default)]
pub(super) struct State {
/// Map pipewire ids to items.
items: HashMap<u32, Item>,
/// Map `(output port id, input port id)` tuples to the id of the link that connects them.
links: HashMap<(u32, u32), u32>,
}
impl State {
/// Create a new, empty state.
pub fn new() -> Self {
Default::default()
}
/// Add a new item under the specified id.
pub fn insert(&mut self, id: u32, item: Item) {
if let Item::Link {
port_from, port_to, ..
} = item
{
self.links.insert((port_from, port_to), id);
}
self.items.insert(id, item);
}
/// Get the item that has the specified id.
pub fn get(&self, id: u32) -> Option<&Item> {
self.items.get(&id)
}
/// Get the id of the link that links the two specified ports.
pub fn get_link_id(&self, output_port: u32, input_port: u32) -> Option<u32> {
self.links.get(&(output_port, input_port)).copied()
}
/// Remove the item with the specified id, returning it if it exists.
pub fn remove(&mut self, id: u32) -> Option<Item> {
let removed = self.items.remove(&id);
if let Some(Item::Link { port_from, port_to }) = removed {
self.links.remove(&(port_from, port_to));
}
removed
}
/// Convenience function: Get the id of the node a port is on
pub fn get_node_of_port(&self, port: u32) -> Option<u32> {
if let Some(Item::Port { node_id }) = self.get(port) {
Some(*node_id)
} else {
None
}
}
}